From a85c5f9d4e6979942fc1ed938227e0c750e9bda4 Mon Sep 17 00:00:00 2001 From: zhayujie Date: Mon, 18 May 2026 18:36:48 +0800 Subject: [PATCH] fix(scheduler): make scheduler init idempotent to prevent duplicate task runs --- agent/tools/scheduler/integration.py | 116 ++++++++++++++++----------- bridge/agent_initializer.py | 29 ++++--- 2 files changed, 87 insertions(+), 58 deletions(-) diff --git a/agent/tools/scheduler/integration.py b/agent/tools/scheduler/integration.py index 49d2fdfa..9e559a43 100644 --- a/agent/tools/scheduler/integration.py +++ b/agent/tools/scheduler/integration.py @@ -3,6 +3,7 @@ Integration module for scheduler with AgentBridge """ import os +import threading from typing import Optional from config import conf from common.log import logger @@ -13,65 +14,82 @@ from bridge.reply import Reply, ReplyType # Global scheduler service instance _scheduler_service = None _task_store = None +# Module-level lock to guard idempotent initialization across threads +_init_lock = threading.Lock() def init_scheduler(agent_bridge) -> bool: """ - Initialize scheduler service - + Initialize scheduler service (idempotent). + + Safe to call multiple times and from multiple threads: only the first + successful call creates the singleton ``SchedulerService`` + background + scanning thread. Subsequent calls return immediately. + Args: agent_bridge: AgentBridge instance - + Returns: - True if initialized successfully + True if scheduler is initialized (newly created or already running) """ global _scheduler_service, _task_store - - try: - from agent.tools.scheduler.task_store import TaskStore - from agent.tools.scheduler.scheduler_service import SchedulerService - - # Get workspace from config - workspace_root = expand_path(conf().get("agent_workspace", "~/cow")) - store_path = os.path.join(workspace_root, "scheduler", "tasks.json") - - # Create task store - _task_store = TaskStore(store_path) - logger.debug(f"[Scheduler] Task store initialized: {store_path}") - - # Create execute callback - def execute_task_callback(task: dict): - """Callback to execute a scheduled task""" - try: - action = task.get("action", {}) - action_type = action.get("type") - - if action_type == "agent_task": - _execute_agent_task(task, agent_bridge) - elif action_type == "send_message": - # Legacy support for old tasks - _execute_send_message(task, agent_bridge) - elif action_type == "tool_call": - # Legacy support for old tasks - _execute_tool_call(task, agent_bridge) - elif action_type == "skill_call": - # Legacy support for old tasks - _execute_skill_call(task, agent_bridge) - else: - logger.warning(f"[Scheduler] Unknown action type: {action_type}") - except Exception as e: - logger.error(f"[Scheduler] Error executing task {task.get('id')}: {e}") - - # Create scheduler service - _scheduler_service = SchedulerService(_task_store, execute_task_callback) - _scheduler_service.start() - - logger.debug("[Scheduler] Scheduler service initialized and started") + + # Fast path: already initialized and running + if _scheduler_service is not None and getattr(_scheduler_service, "running", False): return True - - except Exception as e: - logger.error(f"[Scheduler] Failed to initialize scheduler: {e}") - return False + + with _init_lock: + # Re-check under the lock to avoid races where multiple threads + # passed the fast-path check before any of them acquired the lock. + if _scheduler_service is not None and getattr(_scheduler_service, "running", False): + return True + + try: + from agent.tools.scheduler.task_store import TaskStore + from agent.tools.scheduler.scheduler_service import SchedulerService + + # Get workspace from config + workspace_root = expand_path(conf().get("agent_workspace", "~/cow")) + store_path = os.path.join(workspace_root, "scheduler", "tasks.json") + + # Create task store (reuse if already created) + if _task_store is None: + _task_store = TaskStore(store_path) + logger.debug(f"[Scheduler] Task store initialized: {store_path}") + + # Create execute callback + def execute_task_callback(task: dict): + """Callback to execute a scheduled task""" + try: + action = task.get("action", {}) + action_type = action.get("type") + + if action_type == "agent_task": + _execute_agent_task(task, agent_bridge) + elif action_type == "send_message": + # Legacy support for old tasks + _execute_send_message(task, agent_bridge) + elif action_type == "tool_call": + # Legacy support for old tasks + _execute_tool_call(task, agent_bridge) + elif action_type == "skill_call": + # Legacy support for old tasks + _execute_skill_call(task, agent_bridge) + else: + logger.warning(f"[Scheduler] Unknown action type: {action_type}") + except Exception as e: + logger.error(f"[Scheduler] Error executing task {task.get('id')}: {e}") + + # Create scheduler service + _scheduler_service = SchedulerService(_task_store, execute_task_callback) + _scheduler_service.start() + + logger.debug("[Scheduler] Scheduler service initialized and started") + return True + + except Exception as e: + logger.error(f"[Scheduler] Failed to initialize scheduler: {e}") + return False def get_task_store(): diff --git a/bridge/agent_initializer.py b/bridge/agent_initializer.py index 78f01e51..70e34a5f 100644 --- a/bridge/agent_initializer.py +++ b/bridge/agent_initializer.py @@ -5,6 +5,7 @@ Agent Initializer - Handles agent initialization logic import os import asyncio import datetime +import threading import time from typing import Optional, List @@ -13,6 +14,9 @@ from agent.tools import ToolManager from common.log import logger from common.utils import expand_path +# Module-level lock to serialize scheduler init across concurrent sessions +_scheduler_init_lock = threading.Lock() + class AgentInitializer: """ @@ -406,16 +410,23 @@ class AgentInitializer: return tools def _initialize_scheduler(self, tools: List, session_id: Optional[str] = None): - """Initialize scheduler service if needed""" + """Initialize scheduler service if needed. + + Serialize the check-and-set under a module-level lock so concurrent + first-time session inits cannot each create a new SchedulerService + (which would leak background scanning threads). + """ if not self.agent_bridge.scheduler_initialized: - try: - from agent.tools.scheduler.integration import init_scheduler - if init_scheduler(self.agent_bridge): - self.agent_bridge.scheduler_initialized = True - if session_id is None: - logger.info("[AgentInitializer] Scheduler service initialized") - except Exception as e: - logger.warning(f"[AgentInitializer] Failed to initialize scheduler: {e}") + with _scheduler_init_lock: + if not self.agent_bridge.scheduler_initialized: + try: + from agent.tools.scheduler.integration import init_scheduler + if init_scheduler(self.agent_bridge): + self.agent_bridge.scheduler_initialized = True + if session_id is None: + logger.info("[AgentInitializer] Scheduler service initialized") + except Exception as e: + logger.warning(f"[AgentInitializer] Failed to initialize scheduler: {e}") # Inject scheduler dependencies if self.agent_bridge.scheduler_initialized: