1. 修复(kinit-api):utils/cache.py 日志模块导入问题修复 2. 修复(kinit-api):token解析失败会报错问题修复 3. 优化(kinit-api):用户登录认证失败返回值优化 4. 优化(kinit-api):获取redis方式统一改为redis_getter(request) 5. 优化(kini-api):文件IO修改为异步操作 6. 优化(kinit-api):关联创建人时将user_id改为create_user_id 7. 文档(kinit-api):kinit-api/README.md 加入查询数据文档 8. 修复(kinit-admin):用户无法导出问题修复 9. 优化(kinit-admin):角色新增与编辑框使用默认父子联动 10. 更新(kinit-admin):usePermissionStore 改为 useRouterStoreWithOut,因为此文件主要记录路由 11. 更新(kinit-admin):取消用户信息的持久化存储,改为仅保存在pinia store共享中,并添加roles,permissions信息 12. 修复(kinit-admin):菜单新增与编辑框,目录与菜单切换时会出现抖动问题修复 13. 优化(kinit-admin):src\hooks\web\useTable.ts 优化删除数据方法 14. 优化(kinit-admin):config/services.ts 新增返回403状态码时直接退出系统 15. 优化(kinit-admin):将store中的本文件使用store调用的,改为this 16. 更新(kinit-admin):路由拦截更新 17. 更新(kinit-api,kinit-admin,kinit-uni):取消接口地址最后面的 /
147 lines
6.1 KiB
Python
147 lines
6.1 KiB
Python
# -*- coding: utf-8 -*-
|
||
# @version : 1.0
|
||
# @Create Time : 2021/10/24 16:44
|
||
# @File : views.py
|
||
# @IDE : PyCharm
|
||
# @desc : 安全认证视图
|
||
|
||
"""
|
||
JWT 表示 「JSON Web Tokens」。https://jwt.io/
|
||
|
||
它是一个将 JSON 对象编码为密集且没有空格的长字符串的标准。
|
||
|
||
通过这种方式,你可以创建一个有效期为 1 周的令牌。然后当用户第二天使用令牌重新访问时,你知道该用户仍然处于登入状态。
|
||
一周后令牌将会过期,用户将不会通过认证,必须再次登录才能获得一个新令牌。
|
||
|
||
我们需要安装 python-jose 以在 Python 中生成和校验 JWT 令牌:pip install python-jose[cryptography]
|
||
|
||
PassLib 是一个用于处理哈希密码的很棒的 Python 包。它支持许多安全哈希算法以及配合算法使用的实用程序。
|
||
推荐的算法是 「Bcrypt」:pip install passlib[bcrypt]
|
||
"""
|
||
|
||
from datetime import timedelta
|
||
from aioredis import Redis
|
||
from fastapi import APIRouter, Depends, Request, Body
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
from core.database import db_getter, redis_getter
|
||
from utils import status
|
||
from utils.response import SuccessResponse, ErrorResponse
|
||
from application import settings
|
||
from .login_manage import LoginManage
|
||
from .validation import LoginForm, WXLoginForm
|
||
from apps.vadmin.record.models import VadminLoginRecord
|
||
from apps.vadmin.auth.crud import MenuDal, UserDal
|
||
from .current import FullAdminAuth
|
||
from .validation.auth import Auth
|
||
from utils.wx.oauth import WXOAuth
|
||
import jwt
|
||
|
||
app = APIRouter()
|
||
|
||
|
||
@app.post("/login", summary="手机号密码登录", description="员工登录通道,限制最多输错次数,达到最大值后将is_active=False")
|
||
async def login_for_access_token(
|
||
request: Request,
|
||
data: LoginForm,
|
||
manage: LoginManage = Depends(),
|
||
db: AsyncSession = Depends(db_getter)
|
||
):
|
||
try:
|
||
if data.method == "0":
|
||
result = await manage.password_login(data, db, request)
|
||
elif data.method == "1":
|
||
result = await manage.sms_login(data, db, request)
|
||
else:
|
||
raise ValueError("无效参数")
|
||
|
||
if not result.status:
|
||
raise ValueError(result.msg)
|
||
|
||
access_token = LoginManage.create_token({"sub": result.user.telephone, "is_refresh": False})
|
||
expires = timedelta(minutes=settings.REFRESH_TOKEN_EXPIRE_MINUTES)
|
||
refresh_token = LoginManage.create_token({"sub": result.user.telephone, "is_refresh": True}, expires=expires)
|
||
resp = {
|
||
"access_token": access_token,
|
||
"refresh_token": refresh_token,
|
||
"token_type": "bearer",
|
||
"is_reset_password": result.user.is_reset_password,
|
||
"is_wx_server_openid": result.user.is_wx_server_openid
|
||
}
|
||
await VadminLoginRecord.create_login_record(db, data, True, request, resp)
|
||
return SuccessResponse(resp)
|
||
except ValueError as e:
|
||
await VadminLoginRecord.create_login_record(db, data, False, request, {"message": str(e)})
|
||
return ErrorResponse(msg=str(e))
|
||
|
||
|
||
@app.post("/wx/login", summary="微信服务端一键登录", description="员工登录通道")
|
||
async def wx_login_for_access_token(
|
||
request: Request,
|
||
data: WXLoginForm,
|
||
db: AsyncSession = Depends(db_getter),
|
||
rd: Redis = Depends(redis_getter)
|
||
):
|
||
try:
|
||
if data.platform != "1" or data.method != "2":
|
||
raise ValueError("无效参数")
|
||
wx = WXOAuth(rd, 0)
|
||
telephone = await wx.parsing_phone_number(data.code)
|
||
if not telephone:
|
||
raise ValueError("无效Code")
|
||
data.telephone = telephone
|
||
user = await UserDal(db).get_data(telephone=telephone, v_return_none=True)
|
||
if not user:
|
||
raise ValueError("手机号不存在")
|
||
elif not user.is_active:
|
||
raise ValueError("手机号已被冻结")
|
||
except ValueError as e:
|
||
await VadminLoginRecord.create_login_record(db, data, False, request, {"message": str(e)})
|
||
return ErrorResponse(msg=str(e))
|
||
|
||
# 更新登录时间
|
||
await user.update_login_info(db, request.client.host)
|
||
|
||
# 登录成功创建 token
|
||
access_token = LoginManage.create_token({"sub": user.telephone, "is_refresh": False})
|
||
expires = timedelta(minutes=settings.REFRESH_TOKEN_EXPIRE_MINUTES)
|
||
refresh_token = LoginManage.create_token({"sub": user.telephone, "is_refresh": True}, expires=expires)
|
||
resp = {
|
||
"access_token": access_token,
|
||
"refresh_token": refresh_token,
|
||
"token_type": "bearer",
|
||
"is_reset_password": user.is_reset_password,
|
||
"is_wx_server_openid": user.is_wx_server_openid
|
||
}
|
||
await VadminLoginRecord.create_login_record(db, data, True, request, resp)
|
||
return SuccessResponse(resp)
|
||
|
||
|
||
@app.get("/getMenuList", summary="获取当前用户菜单树")
|
||
async def get_menu_list(auth: Auth = Depends(FullAdminAuth())):
|
||
return SuccessResponse(await MenuDal(auth.db).get_routers(auth.user))
|
||
|
||
|
||
@app.post("/token/refresh", summary="刷新Token")
|
||
async def token_refresh(refresh: str = Body(..., title="刷新Token")):
|
||
error_code = status.HTTP_401_UNAUTHORIZED
|
||
try:
|
||
payload = jwt.decode(refresh, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
|
||
telephone: str = payload.get("sub")
|
||
is_refresh: bool = payload.get("is_refresh")
|
||
if telephone is None or not is_refresh:
|
||
return ErrorResponse("未认证,请您重新登录", code=error_code, status=error_code)
|
||
except jwt.exceptions.InvalidSignatureError:
|
||
return ErrorResponse("无效认证,请您重新登录", code=error_code, status=error_code)
|
||
except jwt.exceptions.ExpiredSignatureError:
|
||
return ErrorResponse("登录已超时,请您重新登录", code=error_code, status=error_code)
|
||
|
||
access_token = LoginManage.create_token({"sub": telephone, "is_refresh": False})
|
||
expires = timedelta(minutes=settings.REFRESH_TOKEN_EXPIRE_MINUTES)
|
||
refresh_token = LoginManage.create_token({"sub": telephone, "is_refresh": True}, expires=expires)
|
||
resp = {
|
||
"access_token": access_token,
|
||
"refresh_token": refresh_token,
|
||
"token_type": "bearer"
|
||
}
|
||
return SuccessResponse(resp)
|