新增多种数据操作测试用例,可用来作为参考

This commit is contained in:
ktianc 2024-01-21 10:53:37 +08:00
parent 7eb590a697
commit a5476a92d3
2 changed files with 312 additions and 24 deletions

View File

@ -9,11 +9,12 @@
from typing import Any from typing import Any
from redis.asyncio import Redis from redis.asyncio import Redis
from fastapi import UploadFile from fastapi import UploadFile
from sqlalchemy.orm import joinedload from sqlalchemy.exc import StatementError
from sqlalchemy.orm.strategy_options import _AbstractLoad from sqlalchemy.orm import joinedload, aliased
from sqlalchemy.orm.strategy_options import _AbstractLoad, contains_eager
from core.exception import CustomException from core.exception import CustomException
from fastapi.encoders import jsonable_encoder from fastapi.encoders import jsonable_encoder
from sqlalchemy import select, false from sqlalchemy import select, false, and_
from core.crud import DalBase from core.crud import DalBase
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from core.validator import vali_telephone from core.validator import vali_telephone
@ -28,6 +29,7 @@ from . import models, schemas
from application import settings from application import settings
from utils.excel.excel_manage import ExcelManage from utils.excel.excel_manage import ExcelManage
from apps.vadmin.system import crud as vadmin_system_crud from apps.vadmin.system import crud as vadmin_system_crud
from apps.vadmin.help import models as vadmin_help_models
import copy import copy
from utils import status from utils import status
from utils.wx.oauth import WXOAuth from utils.wx.oauth import WXOAuth
@ -138,7 +140,7 @@ class UserDal(DalBase):
:param v_schema: :param v_schema:
:return: :return:
""" """
obj = await self.get_data(data_id, v_options=[joinedload(self.model.roles)]) obj = await self.get_data(data_id, v_options=[joinedload(self.model.roles), joinedload(self.model.depts)])
data_dict = jsonable_encoder(data) data_dict = jsonable_encoder(data)
for key, value in data_dict.items(): for key, value in data_dict.items():
if key == "role_ids": if key == "role_ids":
@ -743,22 +745,309 @@ class DeptDal(DalBase):
class TestDal(DalBase): class TestDal(DalBase):
def __init__(self, db: AsyncSession): def __init__(self, db: AsyncSession):
super(TestDal, self).__init__(db, models.VadminUser, schemas.UserSimpleOut) super(TestDal, self).__init__()
self.db = db
self.model = models.VadminUser
async def test(self): async def test_session_cache(self):
# print("-----------------------开始------------------------") """
options = [joinedload(self.model.roles)] SQLAlchemy 会话Session缓存机制
v_join = [[self.model.roles]] 当你通过一个会话查询数据库时SQLAlchemy 首先检查这个对象是否已经在会话缓存中如果是它会直接从缓存中返回对象而不是从数据库重新加载
v_where = [self.model.id == 1, models.VadminRole.id == 1] 在一个会话中对于具有相同主键的实体会话缓存确保只有一个唯一的对象实例这有助于维护数据的一致性
v_start_sql = select(self.model)
result, count = await self.get_datas( 会话Session缓存https://blog.csdn.net/k_genius/article/details/135491059
v_start_sql=v_start_sql, :return:
v_join=v_join, """
v_options=options, print("==================================会话缓存====================================")
v_where=v_where, await self.test_session_cache1()
v_return_count=True print("=============查询出单个对象结果后,即使没有通过.访问属性,同样会产生缓存=============")
await self.test_session_cache2()
print("=============================数据列表会话缓存==================================")
await self.test_session_cache3()
print("=============================expire 单个对象过期==============================")
await self.test_session_cache4()
print("=========expire 单个对象过期后,重新访问之前对象的属性也会重新查询数据库,但是不会重新加载关系==========")
await self.test_session_cache5()
async def test_session_cache1(self):
"""
SQLAlchemy 会话Session缓存机制
当你通过一个会话查询数据库时SQLAlchemy 首先检查这个对象是否已经在会话缓存中如果是它会直接从缓存中返回对象而不是从数据库重新加载
在一个会话中对于具有相同主键的实体会话缓存确保只有一个唯一的对象实例这有助于维护数据的一致性
会话Session缓存https://blog.csdn.net/k_genius/article/details/135491059
示例会话缓存
:return:
"""
# 第一次查询,并加载用户的所有关联部门项
sql1 = select(models.VadminUser).where(models.VadminUser.id == 1).options(joinedload(models.VadminUser.depts))
queryset1 = await self.db.scalars(sql1)
user1 = queryset1.unique().first()
print(f"用户编号:{user1.id} 用户姓名:{user1.name} 关联部门 {[i.name for i in user1.depts]}")
# 第二次即使没有加载用户关联的部门,同样可以访问,因为这里会默认从会话缓存中获取
sql2 = select(models.VadminUser).where(models.VadminUser.id == 1)
queryset2 = await self.db.scalars(sql2)
user2 = queryset2.first()
print(f"用户编号:{user2.id} 用户姓名:{user2.name} 关联部门 {[i.name for i in user2.depts]}")
async def test_session_cache2(self):
"""
SQLAlchemy 会话Session缓存机制
当你通过一个会话查询数据库时SQLAlchemy 首先检查这个对象是否已经在会话缓存中如果是它会直接从缓存中返回对象而不是从数据库重新加载
在一个会话中对于具有相同主键的实体会话缓存确保只有一个唯一的对象实例这有助于维护数据的一致性
会话Session缓存https://blog.csdn.net/k_genius/article/details/135491059
示例查询出单个对象结果后即使没有通过.访问属性同样会产生缓存
:return:
"""
# 第一次查询,并加载用户的所有关联部门项,但是不访问用户的属性
sql1 = select(models.VadminUser).where(models.VadminUser.id == 1).options(joinedload(models.VadminUser.depts))
queryset1 = await self.db.scalars(sql1)
user1 = queryset1.unique().first()
print(f"没有访问属性,也会产生缓存")
# 第二次即使没有加载用户关联的部门,同样可以访问,因为这里会默认从会话缓存中获取
sql2 = select(models.VadminUser).where(models.VadminUser.id == 1)
queryset2 = await self.db.scalars(sql2)
user2 = queryset2.first()
print(f"用户编号:{user2.id} 用户姓名:{user2.name} 关联部门 {[i.name for i in user2.depts]}")
async def test_session_cache3(self):
"""
SQLAlchemy 会话Session缓存机制
当你通过一个会话查询数据库时SQLAlchemy 首先检查这个对象是否已经在会话缓存中如果是它会直接从缓存中返回对象而不是从数据库重新加载
在一个会话中对于具有相同主键的实体会话缓存确保只有一个唯一的对象实例这有助于维护数据的一致性
会话Session缓存https://blog.csdn.net/k_genius/article/details/135491059
示例数据列表会话缓存
:return:
"""
# 第一次查询出所有用户,并加载用户的所有关联部门项
sql1 = select(models.VadminUser).options(joinedload(models.VadminUser.depts))
queryset1 = await self.db.scalars(sql1)
datas1 = queryset1.unique().all()
for data in datas1:
print(f"用户编号:{data.id} 用户姓名:{data.name} 关联部门 {[i.name for i in data.depts]}")
# 第二次即使没有加载用户关联的部门,同样可以访问,因为这里会默认从会话缓存中获取
sql2 = select(models.VadminUser)
queryset2 = await self.db.scalars(sql2)
datas2 = queryset2.all()
for data in datas2:
print(f"用户编号:{data.id} 用户姓名:{data.name} 关联部门 {[i.name for i in data.depts]}")
async def test_session_cache4(self):
"""
SQLAlchemy 会话Session缓存机制
当你通过一个会话查询数据库时SQLAlchemy 首先检查这个对象是否已经在会话缓存中如果是它会直接从缓存中返回对象而不是从数据库重新加载
在一个会话中对于具有相同主键的实体会话缓存确保只有一个唯一的对象实例这有助于维护数据的一致性
会话Session缓存https://blog.csdn.net/k_genius/article/details/135491059
示例expire 单个对象过期
:return:
"""
# 第一次查询,并加载用户的所有关联部门项
sql1 = select(models.VadminUser).where(models.VadminUser.id == 1).options(joinedload(models.VadminUser.depts))
queryset1 = await self.db.scalars(sql1)
user1 = queryset1.unique().first()
print(f"用户编号:{user1.id} 用户姓名:{user1.name} 关联部门 {[i.name for i in user1.depts]}")
# 使当前会话Session中的 user1 对象过期,再次访问就会重新查询数据库数据
self.db.expire(user1)
# 第二次查询会发现会话中没有该对象的缓存,会重新在数据库中查询
sql2 = select(models.VadminUser).where(models.VadminUser.id == 1)
queryset2 = await self.db.scalars(sql2)
user2 = queryset2.first()
try:
print(f"用户编号:{user2.id} 用户姓名:{user2.name} 关联部门 {[i.name for i in user2.depts]}")
except StatementError:
print("访问部门报错了!!!!!")
async def test_session_cache5(self):
"""
SQLAlchemy 会话Session缓存机制
当你通过一个会话查询数据库时SQLAlchemy 首先检查这个对象是否已经在会话缓存中如果是它会直接从缓存中返回对象而不是从数据库重新加载
在一个会话中对于具有相同主键的实体会话缓存确保只有一个唯一的对象实例这有助于维护数据的一致性
会话Session缓存https://blog.csdn.net/k_genius/article/details/135491059
示例expire 单个对象过期后重新访问之前对象的属性也会重新查询数据库但是不会重新加载关系
:return:
"""
# 第一次查询,并加载用户的所有关联部门项
sql = select(models.VadminUser).where(models.VadminUser.id == 1).options(joinedload(models.VadminUser.depts))
queryset = await self.db.scalars(sql)
user = queryset.unique().first()
print(f"用户编号:{user.id} 用户姓名:{user.name} 关联部门 {[i.name for i in user.depts]}")
# 使当前会话Session中的 user9 对象过期,再次访问就会重新查询数据库数据
self.db.expire(user)
# 第二次查询会发现会话中没有该对象的缓存,会重新在数据库中查询,但是不会重新加载关系
try:
print(f"用户编号:{user.id} 用户姓名:{user.name} 关联部门 {[i.name for i in user.depts]}")
except StatementError:
print("访问部门报错了!!!!!")
async def test_join_form(self):
"""
join_form 使用示例通过关联表的查询条件反查询出主表的数据
官方描述在当前 Select 的左侧不符合我们想要从中进行连接的情况下可以使用 Select.join_from() 方法
官方文档https://docs.sqlalchemy.org/en/20/orm/queryguide/select.html#setting-the-leftmost-from-clause-in-a-join
查询条件获取指定用户所关联的所有部门列表数据只返回关联的部门列表数据
:return:
"""
# 设定用户编号为1
user_id = 1
sql = select(models.VadminDept).where(models.VadminDept.is_delete == false())
sql = sql.join_from(models.VadminUser, models.VadminUser.depts).where(models.VadminUser.id == user_id)
queryset = await self.db.scalars(sql)
result = queryset.unique().all()
for dept in result:
print(f"部门编号:{dept.id} 部门名称:{dept.name} 部门负责人:{dept.owner}")
# 转换后的 SQL
# SELECT
# vadmin_auth_dept.NAME,
# vadmin_auth_dept.dept_key,
# vadmin_auth_dept.disabled,
# vadmin_auth_dept.order,
# vadmin_auth_dept.desc,
# vadmin_auth_dept.OWNER,
# vadmin_auth_dept.phone,
# vadmin_auth_dept.email,
# vadmin_auth_dept.parent_id,
# vadmin_auth_dept.id,
# vadmin_auth_dept.create_datetime,
# vadmin_auth_dept.update_datetime,
# vadmin_auth_dept.delete_datetime,
# vadmin_auth_dept.is_delete
# FROM
# vadmin_auth_user
# JOIN vadmin_auth_user_depts ON vadmin_auth_user.id = vadmin_auth_user_depts.user_id
# JOIN vadmin_auth_dept ON vadmin_auth_dept.id = vadmin_auth_user_depts.dept_id
# WHERE
# vadmin_auth_dept.is_delete = FALSE
# AND vadmin_auth_user.id = 1
async def test_left_join(self):
"""
多对多左连接查询示例
查询出所有用户信息并加载用户关联所有部门左连接条件只需要查询出该用户关联的部门负责人为"张伟"的部门即可其他部门不需要显示
:return:
"""
# 封装查询语句
dept_alias = aliased(models.VadminDept)
v_options = [contains_eager(self.model.depts, alias=dept_alias)]
v_outer_join = [
[models.vadmin_auth_user_depts, self.model.id == models.vadmin_auth_user_depts.c.user_id],
[dept_alias, and_(dept_alias.id == models.vadmin_auth_user_depts.c.dept_id, dept_alias.owner == "张伟")]
]
datas: list[models.VadminUser] = await self.get_datas(
limit=0,
v_outer_join=v_outer_join,
v_options=v_options,
v_return_objs=True,
v_expire_all=True
) )
if result: for data in datas:
print(result) print(f"用户编号:{data.id} 用户名称:{data.name} 共查询出关联的部门负责人为‘张伟’的部门有如下:")
print(count) for dept in data.depts:
# print("-----------------------结束------------------------") print(f" 部门编号:{dept.id} 部门名称:{dept.name} 部门负责人:{dept.owner}")
# 原查询语句:
# DeptAlias = aliased(models.VadminDept)
# sql = select(self.model).where(self.model.is_delete == false())
# sql = sql.options(contains_eager(self.model.depts, alias=DeptAlias))
# sql = sql.outerjoin(models.vadmin_auth_user_depts, self.model.id == models.vadmin_auth_user_depts.c.user_id)
# sql = sql.outerjoin(
# DeptAlias,
# and_(DeptAlias.id == models.vadmin_auth_user_depts.c.dept_id, DeptAlias.owner == "张伟")
# )
# self.db.expire_all()
# queryset = await self.db.scalars(sql)
# result = queryset.unique().all()
# for data in result:
# print(f"用户编号:{data.id} 用户名称:{data.name} 共查询出关联的部门负责人为‘张伟’的部门有如下:")
# for dept in data.depts:
# print(f" 部门编号:{dept.id} 部门名称:{dept.name} 部门负责人:{dept.owner}")
async def get_user_depts(self):
"""
获取用户部门列表
:return:
"""
sql1 = select(models.VadminUser).options(joinedload(models.VadminUser.depts))
queryset1 = await self.db.scalars(sql1)
datas1 = queryset1.unique().all()
for data in datas1:
print(f"用户编号:{data.id} 用户姓名:{data.name} 关联部门 {[i.name for i in data.depts]}")
async def relationship_where_operations_any(self):
"""
关系运算符操作any 方法使用示例
官方文档 https://docs.sqlalchemy.org/en/20/orm/queryguide/select.html#relationship-where-operators
any 方法用于一对多关系中允许在 any 方法中指定一个条件该条件会生成一个 SQL 表达式只有满足该条件的元素才会被查询出来
:return:
"""
print("==============================any 方法使用案例1=========================================")
# 用户表models.VadminUser与 部门表VadminDept为多对多关系
# 查找出只有满足关联了部门名称为 "人事一部" 的所有用户,没有关联的则不会查询出来
sql1 = select(models.VadminUser).where(models.VadminUser.depts.any(models.VadminDept.name == "人事一部"))
queryset1 = await self.db.scalars(sql1)
result1 = queryset1.unique().all()
for data in result1:
print(f"用户编号:{data.id} 用户名称:{data.name}")
print("==============================any 方法使用案例2=========================================")
# 案例1 取反,查找出只有满足没有关联了部门名称为 "人事一部" 的所有用户,关联的则不会查询出来
sql2 = select(models.VadminUser).where(~models.VadminUser.depts.any(models.VadminDept.name == "人事一部"))
queryset2 = await self.db.scalars(sql2)
result2 = queryset2.unique().all()
for data in result2:
print(f"用户编号:{data.id} 用户名称:{data.name}")
print("==============================any 方法使用案例3=========================================")
# 查询出没有关联部门的所有用户
sql3 = select(models.VadminUser).where(~models.VadminUser.depts.any())
queryset3 = await self.db.scalars(sql3)
result3 = queryset3.unique().all()
for data in result3:
print(f"用户编号:{data.id} 用户名称:{data.name}")
async def relationship_where_operations_has(self):
"""
关系运算符操作 has 方法使用示例
官方文档 https://docs.sqlalchemy.org/en/20/orm/queryguide/select.html#relationship-where-operators
has 方法用于多对一关系中 any 方法使用方式同理只有满足条件的元素才会被查询出来
对多关系中使用 has 方法会报错报错内容如下
sqlalchemy.exc.InvalidRequestError: 'has()' not implemented for collections. Use any().
:return:
"""
print("==============================has 方法使用案例1=========================================")
# 用户models.VadminUser与 帮助问题models.VadminIssue为多对一关系
# 查找出只有满足关联了用户名称为 "kinit" 的所有帮助问题,没有关联的则不会查询出来
sql1 = select(vadmin_help_models.VadminIssue).where(
vadmin_help_models.VadminIssue.create_user.has(models.VadminUser.name == "kinit")
)
queryset1 = await self.db.scalars(sql1)
result1 = queryset1.unique().all()
for data in result1:
print(f"问题编号:{data.id} 问题标题:{data.title}")

View File

@ -24,9 +24,8 @@ app = APIRouter()
# 接口测试 # 接口测试
########################################################### ###########################################################
@app.get("/test", summary="接口测试") @app.get("/test", summary="接口测试")
async def test(auth: Auth = Depends(FullAdminAuth())): async def test(auth: Auth = Depends(OpenAuth())):
print(auth) return SuccessResponse(await crud.TestDal(auth.db).relationship_where_operations_has())
return SuccessResponse()
########################################################### ###########################################################