From ebc0095ca6129913566770d60e2cb448ed5eec09 Mon Sep 17 00:00:00 2001 From: ktianc Date: Sun, 21 Jan 2024 10:50:27 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=20mongo=20=E4=B8=8E=20redis?= =?UTF-8?q?=20=E6=95=B0=E6=8D=AE=E5=BA=93=E8=BF=9E=E6=8E=A5=E6=A3=80?= =?UTF-8?q?=E6=9F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- kinit-api/core/event.py | 40 ++++++++++++++++++++------- kinit-task/core/mongo/mongo_manage.py | 25 ++++++++++++++++- kinit-task/core/redis/redis_manage.py | 19 ++++++++++++- kinit-task/main.py | 10 ++++--- 4 files changed, 78 insertions(+), 16 deletions(-) diff --git a/kinit-api/core/event.py b/kinit-api/core/event.py index a26d2da..4842b98 100644 --- a/kinit-api/core/event.py +++ b/kinit-api/core/event.py @@ -12,6 +12,7 @@ from motor.motor_asyncio import AsyncIOMotorClient from application.settings import REDIS_DB_URL, MONGO_DB_URL, MONGO_DB_NAME, EVENTS from utils.cache import Cache from redis import asyncio as aioredis +from redis.exceptions import AuthenticationError, TimeoutError, RedisError from contextlib import asynccontextmanager from utils.tools import import_modules_async from sqlalchemy.exc import ProgrammingError @@ -64,15 +65,27 @@ async def connect_redis(app: FastAPI, status: bool): :return: """ if status: - print("Connecting to Redis") - app.state.redis = aioredis.from_url(REDIS_DB_URL, decode_responses=True, health_check_interval=1) + rd = aioredis.from_url(REDIS_DB_URL, decode_responses=True, health_check_interval=1) + app.state.redis = rd + try: + response = await rd.ping() + if response: + print("Redis 连接成功") + else: + print("Redis 连接失败") + except AuthenticationError as e: + raise AuthenticationError(f"Redis 连接认证失败,用户名或密码错误: {e}") + except TimeoutError as e: + raise AuthenticationError(f"Redis 连接超时,地址或者端口错误: {e}") + except RedisError as e: + raise RedisError(f"Redis 连接失败: {e}") try: await Cache(app.state.redis).cache_tab_names() except ProgrammingError as e: logger.error(f"sqlalchemy.exc.ProgrammingError: {e}") print(f"sqlalchemy.exc.ProgrammingError: {e}") else: - print("Redis connection closed") + print("Redis 连接关闭") await app.state.redis.close() @@ -88,14 +101,21 @@ async def connect_mongo(app: FastAPI, status: bool): :return: """ if status: - client: AsyncIOMotorClient = AsyncIOMotorClient(MONGO_DB_URL, maxPoolSize=10, minPoolSize=10) + client: AsyncIOMotorClient = AsyncIOMotorClient( + MONGO_DB_URL, + maxPoolSize=10, + minPoolSize=10, + serverSelectionTimeoutMS=5000 + ) app.state.mongo_client = client app.state.mongo = client[MONGO_DB_NAME] - print("Connecting to Mongo") + # 尝试连接并捕获可能的超时异常 + try: + # 触发一次服务器通信来确认连接 + data = await client.server_info() + print("MongoDB 连接成功", data) + except Exception as e: + raise ValueError(f"MongoDB 连接失败: {e}") else: - print("Mongo connection closed") + print("MongoDB 连接关闭") app.state.mongo_client.close() - - - - diff --git a/kinit-task/core/mongo/mongo_manage.py b/kinit-task/core/mongo/mongo_manage.py index 9e8770e..25acbe5 100644 --- a/kinit-task/core/mongo/mongo_manage.py +++ b/kinit-task/core/mongo/mongo_manage.py @@ -6,6 +6,7 @@ from pymongo import MongoClient from pymongo.results import InsertOneResult, UpdateResult from pymongo.mongo_client import MongoClient as MongoClientType from pymongo.database import Database +from pymongo.errors import ServerSelectionTimeoutError class MongoManage: @@ -25,8 +26,30 @@ class MongoManage: :param db_name: 数据库名称 :return: """ - self.client = MongoClient(path) + # 设置连接超时时长为5秒 + self.client = MongoClient(path, serverSelectionTimeoutMS=5000) self.db = self.client[db_name] + self.test_connect() + + def get_databases(self): + """ + 获取数据库列表,用来测试是否真的连接成功 + :return: + """ + return self.client.list_database_names() + + def test_connect(self): + """ + 测试连接是否成功 + :return: + """ + # 尝试连接并捕获可能的超时异常 + try: + # 触发一次服务器通信来确认连接 + self.client.server_info() + print("MongoDB 连接成功") + except ServerSelectionTimeoutError as e: + raise ServerSelectionTimeoutError(f"MongoDB 连接失败: {e}") def close_database_connection(self) -> None: """ diff --git a/kinit-task/core/redis/redis_manage.py b/kinit-task/core/redis/redis_manage.py index a7d5a37..108126c 100644 --- a/kinit-task/core/redis/redis_manage.py +++ b/kinit-task/core/redis/redis_manage.py @@ -12,10 +12,27 @@ class RedisManage: """ 连接 redis 数据库 - :param path: mongodb 链接地址 + :param path: redis 链接地址 :return: """ self.rd = redis.from_url(path) + self.test_connect() + + def test_connect(self) -> None: + """ + 测试链接 + :return: + """ + try: + # 发送PING命令 + response = self.rd.ping() + if response: + print("Redis 连接成功") + else: + print("Redis 连接失败") + except redis.exceptions.RedisError as e: + # 捕获并处理任何Redis错误 + raise redis.exceptions.RedisError(f"Redis 连接失败: {e}") def close_database_connection(self) -> None: """ diff --git a/kinit-task/main.py b/kinit-task/main.py index 41ad4a1..622ec48 100644 --- a/kinit-task/main.py +++ b/kinit-task/main.py @@ -10,6 +10,7 @@ from application.settings import MONGO_DB_NAME, MONGO_DB_URL, REDIS_DB_URL, SUBS SCHEDULER_TASK_RECORD from core.redis import get_database as get_redis from core.logger import logger +from core.redis.redis_manage import RedisManage class ScheduledTask: @@ -98,6 +99,8 @@ class ScheduledTask: self.start_scheduler() self.start_redis() + assert isinstance(self.rd, RedisManage) + pubsub = self.rd.subscribe(SUBSCRIBE) logger.info("已成功启动程序,等待接收消息...") @@ -124,7 +127,6 @@ class ScheduledTask: """ self.mongo = get_mongo() self.mongo.connect_to_database(MONGO_DB_URL, MONGO_DB_NAME) - print("成功连接 MongoDB") def start_scheduler(self) -> None: """ @@ -134,7 +136,7 @@ class ScheduledTask: """ self.scheduler = Scheduler() self.scheduler.start() - print("成功启动 Scheduler") + print("Scheduler 启动成功") def start_redis(self) -> None: """ @@ -144,7 +146,6 @@ class ScheduledTask: """ self.rd = get_redis() self.rd.connect_to_database(REDIS_DB_URL) - print("成功连接 Redis") def close(self) -> None: """ @@ -155,7 +156,8 @@ class ScheduledTask: :return: """ self.mongo.close_database_connection() - self.scheduler.shutdown() + if self.scheduler: + self.scheduler.shutdown() self.rd.close_database_connection()