diff --git a/kinit-api/apps/vadmin/auth/crud.py b/kinit-api/apps/vadmin/auth/crud.py index f8f5c05..97d66c0 100644 --- a/kinit-api/apps/vadmin/auth/crud.py +++ b/kinit-api/apps/vadmin/auth/crud.py @@ -9,11 +9,12 @@ from typing import Any from redis.asyncio import Redis from fastapi import UploadFile -from sqlalchemy.orm import joinedload -from sqlalchemy.orm.strategy_options import _AbstractLoad +from sqlalchemy.exc import StatementError +from sqlalchemy.orm import joinedload, aliased +from sqlalchemy.orm.strategy_options import _AbstractLoad, contains_eager from core.exception import CustomException from fastapi.encoders import jsonable_encoder -from sqlalchemy import select, false +from sqlalchemy import select, false, and_ from core.crud import DalBase from sqlalchemy.ext.asyncio import AsyncSession from core.validator import vali_telephone @@ -28,6 +29,7 @@ from . import models, schemas from application import settings from utils.excel.excel_manage import ExcelManage from apps.vadmin.system import crud as vadmin_system_crud +from apps.vadmin.help import models as vadmin_help_models import copy from utils import status from utils.wx.oauth import WXOAuth @@ -138,7 +140,7 @@ class UserDal(DalBase): :param v_schema: :return: """ - obj = await self.get_data(data_id, v_options=[joinedload(self.model.roles)]) + obj = await self.get_data(data_id, v_options=[joinedload(self.model.roles), joinedload(self.model.depts)]) data_dict = jsonable_encoder(data) for key, value in data_dict.items(): if key == "role_ids": @@ -743,22 +745,309 @@ class DeptDal(DalBase): class TestDal(DalBase): def __init__(self, db: AsyncSession): - super(TestDal, self).__init__(db, models.VadminUser, schemas.UserSimpleOut) + super(TestDal, self).__init__() + self.db = db + self.model = models.VadminUser - async def test(self): - # print("-----------------------开始------------------------") - options = [joinedload(self.model.roles)] - v_join = [[self.model.roles]] - v_where = [self.model.id == 1, models.VadminRole.id == 1] - v_start_sql = select(self.model) - result, count = await self.get_datas( - v_start_sql=v_start_sql, - v_join=v_join, - v_options=options, - v_where=v_where, - v_return_count=True + async def test_session_cache(self): + """ + SQLAlchemy 会话(Session)缓存机制: + 当你通过一个会话查询数据库时,SQLAlchemy 首先检查这个对象是否已经在会话缓存中。如果是,它会直接从缓存中返回对象,而不是从数据库重新加载。 + 在一个会话中,对于具有相同主键的实体,会话缓存确保只有一个唯一的对象实例。这有助于维护数据的一致性。 + + 会话(Session)缓存:https://blog.csdn.net/k_genius/article/details/135491059 + :return: + """ + print("==================================会话缓存====================================") + await self.test_session_cache1() + print("=============查询出单个对象结果后,即使没有通过.访问属性,同样会产生缓存=============") + await self.test_session_cache2() + print("=============================数据列表会话缓存==================================") + await self.test_session_cache3() + print("=============================expire 单个对象过期==============================") + await self.test_session_cache4() + print("=========expire 单个对象过期后,重新访问之前对象的属性也会重新查询数据库,但是不会重新加载关系==========") + await self.test_session_cache5() + + async def test_session_cache1(self): + """ + SQLAlchemy 会话(Session)缓存机制: + 当你通过一个会话查询数据库时,SQLAlchemy 首先检查这个对象是否已经在会话缓存中。如果是,它会直接从缓存中返回对象,而不是从数据库重新加载。 + 在一个会话中,对于具有相同主键的实体,会话缓存确保只有一个唯一的对象实例。这有助于维护数据的一致性。 + + 会话(Session)缓存:https://blog.csdn.net/k_genius/article/details/135491059 + + 示例:会话缓存 + + :return: + """ + # 第一次查询,并加载用户的所有关联部门项 + sql1 = select(models.VadminUser).where(models.VadminUser.id == 1).options(joinedload(models.VadminUser.depts)) + queryset1 = await self.db.scalars(sql1) + user1 = queryset1.unique().first() + print(f"用户编号:{user1.id} 用户姓名:{user1.name} 关联部门 {[i.name for i in user1.depts]}") + + # 第二次即使没有加载用户关联的部门,同样可以访问,因为这里会默认从会话缓存中获取 + sql2 = select(models.VadminUser).where(models.VadminUser.id == 1) + queryset2 = await self.db.scalars(sql2) + user2 = queryset2.first() + print(f"用户编号:{user2.id} 用户姓名:{user2.name} 关联部门 {[i.name for i in user2.depts]}") + + async def test_session_cache2(self): + """ + SQLAlchemy 会话(Session)缓存机制: + 当你通过一个会话查询数据库时,SQLAlchemy 首先检查这个对象是否已经在会话缓存中。如果是,它会直接从缓存中返回对象,而不是从数据库重新加载。 + 在一个会话中,对于具有相同主键的实体,会话缓存确保只有一个唯一的对象实例。这有助于维护数据的一致性。 + + 会话(Session)缓存:https://blog.csdn.net/k_genius/article/details/135491059 + + 示例:查询出单个对象结果后,即使没有通过.访问属性,同样会产生缓存 + + :return: + """ + # 第一次查询,并加载用户的所有关联部门项,但是不访问用户的属性 + sql1 = select(models.VadminUser).where(models.VadminUser.id == 1).options(joinedload(models.VadminUser.depts)) + queryset1 = await self.db.scalars(sql1) + user1 = queryset1.unique().first() + print(f"没有访问属性,也会产生缓存") + + # 第二次即使没有加载用户关联的部门,同样可以访问,因为这里会默认从会话缓存中获取 + sql2 = select(models.VadminUser).where(models.VadminUser.id == 1) + queryset2 = await self.db.scalars(sql2) + user2 = queryset2.first() + print(f"用户编号:{user2.id} 用户姓名:{user2.name} 关联部门 {[i.name for i in user2.depts]}") + + async def test_session_cache3(self): + """ + SQLAlchemy 会话(Session)缓存机制: + 当你通过一个会话查询数据库时,SQLAlchemy 首先检查这个对象是否已经在会话缓存中。如果是,它会直接从缓存中返回对象,而不是从数据库重新加载。 + 在一个会话中,对于具有相同主键的实体,会话缓存确保只有一个唯一的对象实例。这有助于维护数据的一致性。 + + 会话(Session)缓存:https://blog.csdn.net/k_genius/article/details/135491059 + + 示例:数据列表会话缓存 + + :return: + """ + # 第一次查询出所有用户,并加载用户的所有关联部门项 + sql1 = select(models.VadminUser).options(joinedload(models.VadminUser.depts)) + queryset1 = await self.db.scalars(sql1) + datas1 = queryset1.unique().all() + for data in datas1: + print(f"用户编号:{data.id} 用户姓名:{data.name} 关联部门 {[i.name for i in data.depts]}") + + # 第二次即使没有加载用户关联的部门,同样可以访问,因为这里会默认从会话缓存中获取 + sql2 = select(models.VadminUser) + queryset2 = await self.db.scalars(sql2) + datas2 = queryset2.all() + for data in datas2: + print(f"用户编号:{data.id} 用户姓名:{data.name} 关联部门 {[i.name for i in data.depts]}") + + async def test_session_cache4(self): + """ + SQLAlchemy 会话(Session)缓存机制: + 当你通过一个会话查询数据库时,SQLAlchemy 首先检查这个对象是否已经在会话缓存中。如果是,它会直接从缓存中返回对象,而不是从数据库重新加载。 + 在一个会话中,对于具有相同主键的实体,会话缓存确保只有一个唯一的对象实例。这有助于维护数据的一致性。 + + 会话(Session)缓存:https://blog.csdn.net/k_genius/article/details/135491059 + + 示例:expire 单个对象过期 + + :return: + """ + # 第一次查询,并加载用户的所有关联部门项 + sql1 = select(models.VadminUser).where(models.VadminUser.id == 1).options(joinedload(models.VadminUser.depts)) + queryset1 = await self.db.scalars(sql1) + user1 = queryset1.unique().first() + print(f"用户编号:{user1.id} 用户姓名:{user1.name} 关联部门 {[i.name for i in user1.depts]}") + + # 使当前会话(Session)中的 user1 对象过期,再次访问就会重新查询数据库数据 + self.db.expire(user1) + + # 第二次查询会发现会话中没有该对象的缓存,会重新在数据库中查询 + sql2 = select(models.VadminUser).where(models.VadminUser.id == 1) + queryset2 = await self.db.scalars(sql2) + user2 = queryset2.first() + try: + print(f"用户编号:{user2.id} 用户姓名:{user2.name} 关联部门 {[i.name for i in user2.depts]}") + except StatementError: + print("访问部门报错了!!!!!") + + async def test_session_cache5(self): + """ + SQLAlchemy 会话(Session)缓存机制: + 当你通过一个会话查询数据库时,SQLAlchemy 首先检查这个对象是否已经在会话缓存中。如果是,它会直接从缓存中返回对象,而不是从数据库重新加载。 + 在一个会话中,对于具有相同主键的实体,会话缓存确保只有一个唯一的对象实例。这有助于维护数据的一致性。 + + 会话(Session)缓存:https://blog.csdn.net/k_genius/article/details/135491059 + + 示例:expire 单个对象过期后,重新访问之前对象的属性也会重新查询数据库,但是不会重新加载关系 + + :return: + """ + # 第一次查询,并加载用户的所有关联部门项 + sql = select(models.VadminUser).where(models.VadminUser.id == 1).options(joinedload(models.VadminUser.depts)) + queryset = await self.db.scalars(sql) + user = queryset.unique().first() + print(f"用户编号:{user.id} 用户姓名:{user.name} 关联部门 {[i.name for i in user.depts]}") + + # 使当前会话(Session)中的 user9 对象过期,再次访问就会重新查询数据库数据 + self.db.expire(user) + + # 第二次查询会发现会话中没有该对象的缓存,会重新在数据库中查询,但是不会重新加载关系 + try: + print(f"用户编号:{user.id} 用户姓名:{user.name} 关联部门 {[i.name for i in user.depts]}") + except StatementError: + print("访问部门报错了!!!!!") + + async def test_join_form(self): + """ + join_form 使用示例:通过关联表的查询条件反查询出主表的数据 + + 官方描述:在当前 Select 的左侧不符合我们想要从中进行连接的情况下,可以使用 Select.join_from() 方法 + 官方文档:https://docs.sqlalchemy.org/en/20/orm/queryguide/select.html#setting-the-leftmost-from-clause-in-a-join + + 查询条件:获取指定用户所关联的所有部门列表数据,只返回关联的部门列表数据 + :return: + """ + # 设定用户编号为:1 + user_id = 1 + + sql = select(models.VadminDept).where(models.VadminDept.is_delete == false()) + sql = sql.join_from(models.VadminUser, models.VadminUser.depts).where(models.VadminUser.id == user_id) + queryset = await self.db.scalars(sql) + result = queryset.unique().all() + for dept in result: + print(f"部门编号:{dept.id} 部门名称:{dept.name} 部门负责人:{dept.owner}") + + # 转换后的 SQL: + # SELECT + # vadmin_auth_dept.NAME, + # vadmin_auth_dept.dept_key, + # vadmin_auth_dept.disabled, + # vadmin_auth_dept.order, + # vadmin_auth_dept.desc, + # vadmin_auth_dept.OWNER, + # vadmin_auth_dept.phone, + # vadmin_auth_dept.email, + # vadmin_auth_dept.parent_id, + # vadmin_auth_dept.id, + # vadmin_auth_dept.create_datetime, + # vadmin_auth_dept.update_datetime, + # vadmin_auth_dept.delete_datetime, + # vadmin_auth_dept.is_delete + # FROM + # vadmin_auth_user + # JOIN vadmin_auth_user_depts ON vadmin_auth_user.id = vadmin_auth_user_depts.user_id + # JOIN vadmin_auth_dept ON vadmin_auth_dept.id = vadmin_auth_user_depts.dept_id + # WHERE + # vadmin_auth_dept.is_delete = FALSE + # AND vadmin_auth_user.id = 1 + + async def test_left_join(self): + """ + 多对多左连接查询示例: + 查询出所有用户信息,并加载用户关联所有部门,左连接条件:只需要查询出该用户关联的部门负责人为"张伟"的部门即可,其他部门不需要显示, + :return: + """ + # 封装查询语句 + dept_alias = aliased(models.VadminDept) + v_options = [contains_eager(self.model.depts, alias=dept_alias)] + v_outer_join = [ + [models.vadmin_auth_user_depts, self.model.id == models.vadmin_auth_user_depts.c.user_id], + [dept_alias, and_(dept_alias.id == models.vadmin_auth_user_depts.c.dept_id, dept_alias.owner == "张伟")] + ] + datas: list[models.VadminUser] = await self.get_datas( + limit=0, + v_outer_join=v_outer_join, + v_options=v_options, + v_return_objs=True, + v_expire_all=True ) - if result: - print(result) - print(count) - # print("-----------------------结束------------------------") + for data in datas: + print(f"用户编号:{data.id} 用户名称:{data.name} 共查询出关联的部门负责人为‘张伟’的部门有如下:") + for dept in data.depts: + print(f" 部门编号:{dept.id} 部门名称:{dept.name} 部门负责人:{dept.owner}") + + # 原查询语句: + # DeptAlias = aliased(models.VadminDept) + # sql = select(self.model).where(self.model.is_delete == false()) + # sql = sql.options(contains_eager(self.model.depts, alias=DeptAlias)) + # sql = sql.outerjoin(models.vadmin_auth_user_depts, self.model.id == models.vadmin_auth_user_depts.c.user_id) + # sql = sql.outerjoin( + # DeptAlias, + # and_(DeptAlias.id == models.vadmin_auth_user_depts.c.dept_id, DeptAlias.owner == "张伟") + # ) + # self.db.expire_all() + # queryset = await self.db.scalars(sql) + # result = queryset.unique().all() + # for data in result: + # print(f"用户编号:{data.id} 用户名称:{data.name} 共查询出关联的部门负责人为‘张伟’的部门有如下:") + # for dept in data.depts: + # print(f" 部门编号:{dept.id} 部门名称:{dept.name} 部门负责人:{dept.owner}") + + async def get_user_depts(self): + """ + 获取用户部门列表 + :return: + """ + sql1 = select(models.VadminUser).options(joinedload(models.VadminUser.depts)) + queryset1 = await self.db.scalars(sql1) + datas1 = queryset1.unique().all() + for data in datas1: + print(f"用户编号:{data.id} 用户姓名:{data.name} 关联部门 {[i.name for i in data.depts]}") + + async def relationship_where_operations_any(self): + """ + 关系运算符操作:any 方法使用示例 + 官方文档: https://docs.sqlalchemy.org/en/20/orm/queryguide/select.html#relationship-where-operators + + any 方法用于一对多关系中,允许在 any 方法中指定一个条件,该条件会生成一个 SQL 表达式,只有满足该条件的元素才会被查询出来。 + :return: + """ + print("==============================any 方法使用案例1=========================================") + # 用户表(models.VadminUser)与 部门表(VadminDept)为多对多关系 + # 查找出只有满足关联了部门名称为 "人事一部" 的所有用户,没有关联的则不会查询出来 + sql1 = select(models.VadminUser).where(models.VadminUser.depts.any(models.VadminDept.name == "人事一部")) + queryset1 = await self.db.scalars(sql1) + result1 = queryset1.unique().all() + for data in result1: + print(f"用户编号:{data.id} 用户名称:{data.name}") + + print("==============================any 方法使用案例2=========================================") + # 案例1 取反,查找出只有满足没有关联了部门名称为 "人事一部" 的所有用户,关联的则不会查询出来 + sql2 = select(models.VadminUser).where(~models.VadminUser.depts.any(models.VadminDept.name == "人事一部")) + queryset2 = await self.db.scalars(sql2) + result2 = queryset2.unique().all() + for data in result2: + print(f"用户编号:{data.id} 用户名称:{data.name}") + + print("==============================any 方法使用案例3=========================================") + # 查询出没有关联部门的所有用户 + sql3 = select(models.VadminUser).where(~models.VadminUser.depts.any()) + queryset3 = await self.db.scalars(sql3) + result3 = queryset3.unique().all() + for data in result3: + print(f"用户编号:{data.id} 用户名称:{data.name}") + + async def relationship_where_operations_has(self): + """ + 关系运算符操作: has 方法使用示例 + 官方文档: https://docs.sqlalchemy.org/en/20/orm/queryguide/select.html#relationship-where-operators + + has 方法用于多对一关系中,与 any 方法使用方式同理,只有满足条件的元素才会被查询出来。 + + 对多关系中使用 has 方法会报错,报错内容如下: + sqlalchemy.exc.InvalidRequestError: 'has()' not implemented for collections. Use any(). + :return: + """ + print("==============================has 方法使用案例1=========================================") + # 用户(models.VadminUser)与 帮助问题(models.VadminIssue)为多对一关系 + # 查找出只有满足关联了用户名称为 "kinit" 的所有帮助问题,没有关联的则不会查询出来 + sql1 = select(vadmin_help_models.VadminIssue).where( + vadmin_help_models.VadminIssue.create_user.has(models.VadminUser.name == "kinit") + ) + queryset1 = await self.db.scalars(sql1) + result1 = queryset1.unique().all() + for data in result1: + print(f"问题编号:{data.id} 问题标题:{data.title}") diff --git a/kinit-api/apps/vadmin/auth/views.py b/kinit-api/apps/vadmin/auth/views.py index 3d4108f..d756330 100644 --- a/kinit-api/apps/vadmin/auth/views.py +++ b/kinit-api/apps/vadmin/auth/views.py @@ -24,9 +24,8 @@ app = APIRouter() # 接口测试 ########################################################### @app.get("/test", summary="接口测试") -async def test(auth: Auth = Depends(FullAdminAuth())): - print(auth) - return SuccessResponse() +async def test(auth: Auth = Depends(OpenAuth())): + return SuccessResponse(await crud.TestDal(auth.db).relationship_where_operations_has()) ###########################################################