kinit/kinit-api/utils/wx/wx_access_token.py
ktianc 7fbdcb3b0f 版本升级
1. 新增:微信小程序端新增微信手机号登录功能(必须为企业认证小程序)
2. 新增:加入动态更新常见问题
3. 新增:新增小程序分享功能
4. 新增:小程序新增第一次登录需要修改密码
5. 新增:新增接口权限控制
6. 新增:用户新增is_staff用来判断是否为工作人员
7. 新增:软删除新增is_delete字段来判断,delete_datetime当前主要来记录时间
8. 更新:部分接口删除功能已更新,需要试用软删除的才会试用软删除
9. 更新:更新系统配置缓存功能
10. 更新:接口认证依赖项更新
11. 更新:获取系统基础配置信息与用户协议与隐私协议更新
12. 优化:优化接口与数据库操作
2023-02-27 17:28:27 +08:00

62 lines
2.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/python
# -*- coding: utf-8 -*-
# @version : 1.0
# @Creaet Time : 2021/11/27 18:37
# @File : wx_access_token.py
# @IDE : PyCharm
# @desc : 获取小程序全局唯一后台接口调用凭据
import requests
from aioredis import Redis
from core.logger import logger
class WxAccessToken:
"""
获取到的access_token存储在redis数据库中
获取小程序全局唯一后台接口调用凭据access_token。调用绝大多数后台接口时都需使用 access_token开发者需要进行妥善保存。
官方文档https://developers.weixin.qq.com/miniprogram/dev/api-backend/open-api/access-token/auth.getAccessToken.html
"""
def __init__(self, appid: str, secret: str, redis: Redis, grant_type: str = "client_credential", *args, **kwargs):
self.__url = "https://api.weixin.qq.com/cgi-bin/token"
self.__method = "get"
self.appidKey = f"{appid}_access_token"
self.redis = redis
self.params = {
"appid": appid,
"secret": secret,
"grant_type": grant_type
}
async def get(self) -> dict:
"""
获取小程序access_token
"""
token = await self.redis.get(self.appidKey)
if not token:
return await self.update()
return {"status": True, "token": token}
async def update(self) -> dict:
"""
更新小程序access_token
"""
print("开始更新 access_token")
method = getattr(requests, self.__method)
response = method(url=self.__url, params=self.params)
result = response.json()
if result.get("errcode", "0") != "0":
print("获取access_token失败", result)
logger.error(f"获取access_token失败{result}")
return {"status": False, "token": None}
print("成功获取到", result)
await self.redis.set(self.appidKey, result.get("access_token"), ex=2000)
logger.info(f"获取access_token成功{result}")
return {"status": True, "token": result.get("access_token")}