新增 mongo 与 redis 数据库连接检查
This commit is contained in:
parent
9ceeacb97b
commit
ebc0095ca6
@ -12,6 +12,7 @@ from motor.motor_asyncio import AsyncIOMotorClient
|
|||||||
from application.settings import REDIS_DB_URL, MONGO_DB_URL, MONGO_DB_NAME, EVENTS
|
from application.settings import REDIS_DB_URL, MONGO_DB_URL, MONGO_DB_NAME, EVENTS
|
||||||
from utils.cache import Cache
|
from utils.cache import Cache
|
||||||
from redis import asyncio as aioredis
|
from redis import asyncio as aioredis
|
||||||
|
from redis.exceptions import AuthenticationError, TimeoutError, RedisError
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from utils.tools import import_modules_async
|
from utils.tools import import_modules_async
|
||||||
from sqlalchemy.exc import ProgrammingError
|
from sqlalchemy.exc import ProgrammingError
|
||||||
@ -64,15 +65,27 @@ async def connect_redis(app: FastAPI, status: bool):
|
|||||||
:return:
|
:return:
|
||||||
"""
|
"""
|
||||||
if status:
|
if status:
|
||||||
print("Connecting to Redis")
|
rd = aioredis.from_url(REDIS_DB_URL, decode_responses=True, health_check_interval=1)
|
||||||
app.state.redis = aioredis.from_url(REDIS_DB_URL, decode_responses=True, health_check_interval=1)
|
app.state.redis = rd
|
||||||
|
try:
|
||||||
|
response = await rd.ping()
|
||||||
|
if response:
|
||||||
|
print("Redis 连接成功")
|
||||||
|
else:
|
||||||
|
print("Redis 连接失败")
|
||||||
|
except AuthenticationError as e:
|
||||||
|
raise AuthenticationError(f"Redis 连接认证失败,用户名或密码错误: {e}")
|
||||||
|
except TimeoutError as e:
|
||||||
|
raise AuthenticationError(f"Redis 连接超时,地址或者端口错误: {e}")
|
||||||
|
except RedisError as e:
|
||||||
|
raise RedisError(f"Redis 连接失败: {e}")
|
||||||
try:
|
try:
|
||||||
await Cache(app.state.redis).cache_tab_names()
|
await Cache(app.state.redis).cache_tab_names()
|
||||||
except ProgrammingError as e:
|
except ProgrammingError as e:
|
||||||
logger.error(f"sqlalchemy.exc.ProgrammingError: {e}")
|
logger.error(f"sqlalchemy.exc.ProgrammingError: {e}")
|
||||||
print(f"sqlalchemy.exc.ProgrammingError: {e}")
|
print(f"sqlalchemy.exc.ProgrammingError: {e}")
|
||||||
else:
|
else:
|
||||||
print("Redis connection closed")
|
print("Redis 连接关闭")
|
||||||
await app.state.redis.close()
|
await app.state.redis.close()
|
||||||
|
|
||||||
|
|
||||||
@ -88,14 +101,21 @@ async def connect_mongo(app: FastAPI, status: bool):
|
|||||||
:return:
|
:return:
|
||||||
"""
|
"""
|
||||||
if status:
|
if status:
|
||||||
client: AsyncIOMotorClient = AsyncIOMotorClient(MONGO_DB_URL, maxPoolSize=10, minPoolSize=10)
|
client: AsyncIOMotorClient = AsyncIOMotorClient(
|
||||||
|
MONGO_DB_URL,
|
||||||
|
maxPoolSize=10,
|
||||||
|
minPoolSize=10,
|
||||||
|
serverSelectionTimeoutMS=5000
|
||||||
|
)
|
||||||
app.state.mongo_client = client
|
app.state.mongo_client = client
|
||||||
app.state.mongo = client[MONGO_DB_NAME]
|
app.state.mongo = client[MONGO_DB_NAME]
|
||||||
print("Connecting to Mongo")
|
# 尝试连接并捕获可能的超时异常
|
||||||
|
try:
|
||||||
|
# 触发一次服务器通信来确认连接
|
||||||
|
data = await client.server_info()
|
||||||
|
print("MongoDB 连接成功", data)
|
||||||
|
except Exception as e:
|
||||||
|
raise ValueError(f"MongoDB 连接失败: {e}")
|
||||||
else:
|
else:
|
||||||
print("Mongo connection closed")
|
print("MongoDB 连接关闭")
|
||||||
app.state.mongo_client.close()
|
app.state.mongo_client.close()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -6,6 +6,7 @@ from pymongo import MongoClient
|
|||||||
from pymongo.results import InsertOneResult, UpdateResult
|
from pymongo.results import InsertOneResult, UpdateResult
|
||||||
from pymongo.mongo_client import MongoClient as MongoClientType
|
from pymongo.mongo_client import MongoClient as MongoClientType
|
||||||
from pymongo.database import Database
|
from pymongo.database import Database
|
||||||
|
from pymongo.errors import ServerSelectionTimeoutError
|
||||||
|
|
||||||
|
|
||||||
class MongoManage:
|
class MongoManage:
|
||||||
@ -25,8 +26,30 @@ class MongoManage:
|
|||||||
:param db_name: 数据库名称
|
:param db_name: 数据库名称
|
||||||
:return:
|
:return:
|
||||||
"""
|
"""
|
||||||
self.client = MongoClient(path)
|
# 设置连接超时时长为5秒
|
||||||
|
self.client = MongoClient(path, serverSelectionTimeoutMS=5000)
|
||||||
self.db = self.client[db_name]
|
self.db = self.client[db_name]
|
||||||
|
self.test_connect()
|
||||||
|
|
||||||
|
def get_databases(self):
|
||||||
|
"""
|
||||||
|
获取数据库列表,用来测试是否真的连接成功
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
return self.client.list_database_names()
|
||||||
|
|
||||||
|
def test_connect(self):
|
||||||
|
"""
|
||||||
|
测试连接是否成功
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
# 尝试连接并捕获可能的超时异常
|
||||||
|
try:
|
||||||
|
# 触发一次服务器通信来确认连接
|
||||||
|
self.client.server_info()
|
||||||
|
print("MongoDB 连接成功")
|
||||||
|
except ServerSelectionTimeoutError as e:
|
||||||
|
raise ServerSelectionTimeoutError(f"MongoDB 连接失败: {e}")
|
||||||
|
|
||||||
def close_database_connection(self) -> None:
|
def close_database_connection(self) -> None:
|
||||||
"""
|
"""
|
||||||
|
@ -12,10 +12,27 @@ class RedisManage:
|
|||||||
"""
|
"""
|
||||||
连接 redis 数据库
|
连接 redis 数据库
|
||||||
|
|
||||||
:param path: mongodb 链接地址
|
:param path: redis 链接地址
|
||||||
:return:
|
:return:
|
||||||
"""
|
"""
|
||||||
self.rd = redis.from_url(path)
|
self.rd = redis.from_url(path)
|
||||||
|
self.test_connect()
|
||||||
|
|
||||||
|
def test_connect(self) -> None:
|
||||||
|
"""
|
||||||
|
测试链接
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# 发送PING命令
|
||||||
|
response = self.rd.ping()
|
||||||
|
if response:
|
||||||
|
print("Redis 连接成功")
|
||||||
|
else:
|
||||||
|
print("Redis 连接失败")
|
||||||
|
except redis.exceptions.RedisError as e:
|
||||||
|
# 捕获并处理任何Redis错误
|
||||||
|
raise redis.exceptions.RedisError(f"Redis 连接失败: {e}")
|
||||||
|
|
||||||
def close_database_connection(self) -> None:
|
def close_database_connection(self) -> None:
|
||||||
"""
|
"""
|
||||||
|
@ -10,6 +10,7 @@ from application.settings import MONGO_DB_NAME, MONGO_DB_URL, REDIS_DB_URL, SUBS
|
|||||||
SCHEDULER_TASK_RECORD
|
SCHEDULER_TASK_RECORD
|
||||||
from core.redis import get_database as get_redis
|
from core.redis import get_database as get_redis
|
||||||
from core.logger import logger
|
from core.logger import logger
|
||||||
|
from core.redis.redis_manage import RedisManage
|
||||||
|
|
||||||
|
|
||||||
class ScheduledTask:
|
class ScheduledTask:
|
||||||
@ -98,6 +99,8 @@ class ScheduledTask:
|
|||||||
self.start_scheduler()
|
self.start_scheduler()
|
||||||
self.start_redis()
|
self.start_redis()
|
||||||
|
|
||||||
|
assert isinstance(self.rd, RedisManage)
|
||||||
|
|
||||||
pubsub = self.rd.subscribe(SUBSCRIBE)
|
pubsub = self.rd.subscribe(SUBSCRIBE)
|
||||||
|
|
||||||
logger.info("已成功启动程序,等待接收消息...")
|
logger.info("已成功启动程序,等待接收消息...")
|
||||||
@ -124,7 +127,6 @@ class ScheduledTask:
|
|||||||
"""
|
"""
|
||||||
self.mongo = get_mongo()
|
self.mongo = get_mongo()
|
||||||
self.mongo.connect_to_database(MONGO_DB_URL, MONGO_DB_NAME)
|
self.mongo.connect_to_database(MONGO_DB_URL, MONGO_DB_NAME)
|
||||||
print("成功连接 MongoDB")
|
|
||||||
|
|
||||||
def start_scheduler(self) -> None:
|
def start_scheduler(self) -> None:
|
||||||
"""
|
"""
|
||||||
@ -134,7 +136,7 @@ class ScheduledTask:
|
|||||||
"""
|
"""
|
||||||
self.scheduler = Scheduler()
|
self.scheduler = Scheduler()
|
||||||
self.scheduler.start()
|
self.scheduler.start()
|
||||||
print("成功启动 Scheduler")
|
print("Scheduler 启动成功")
|
||||||
|
|
||||||
def start_redis(self) -> None:
|
def start_redis(self) -> None:
|
||||||
"""
|
"""
|
||||||
@ -144,7 +146,6 @@ class ScheduledTask:
|
|||||||
"""
|
"""
|
||||||
self.rd = get_redis()
|
self.rd = get_redis()
|
||||||
self.rd.connect_to_database(REDIS_DB_URL)
|
self.rd.connect_to_database(REDIS_DB_URL)
|
||||||
print("成功连接 Redis")
|
|
||||||
|
|
||||||
def close(self) -> None:
|
def close(self) -> None:
|
||||||
"""
|
"""
|
||||||
@ -155,6 +156,7 @@ class ScheduledTask:
|
|||||||
:return:
|
:return:
|
||||||
"""
|
"""
|
||||||
self.mongo.close_database_connection()
|
self.mongo.close_database_connection()
|
||||||
|
if self.scheduler:
|
||||||
self.scheduler.shutdown()
|
self.scheduler.shutdown()
|
||||||
self.rd.close_database_connection()
|
self.rd.close_database_connection()
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user