fix(scheduler): make scheduler init idempotent to prevent duplicate task runs

This commit is contained in:
zhayujie
2026-05-18 18:36:48 +08:00
parent 2720bba5b7
commit a85c5f9d4e
2 changed files with 87 additions and 58 deletions

View File

@@ -3,6 +3,7 @@ Integration module for scheduler with AgentBridge
"""
import os
import threading
from typing import Optional
from config import conf
from common.log import logger
@@ -13,65 +14,82 @@ from bridge.reply import Reply, ReplyType
# Global scheduler service instance
_scheduler_service = None
_task_store = None
# Module-level lock to guard idempotent initialization across threads
_init_lock = threading.Lock()
def init_scheduler(agent_bridge) -> bool:
"""
Initialize scheduler service
Initialize scheduler service (idempotent).
Safe to call multiple times and from multiple threads: only the first
successful call creates the singleton ``SchedulerService`` + background
scanning thread. Subsequent calls return immediately.
Args:
agent_bridge: AgentBridge instance
Returns:
True if initialized successfully
True if scheduler is initialized (newly created or already running)
"""
global _scheduler_service, _task_store
try:
from agent.tools.scheduler.task_store import TaskStore
from agent.tools.scheduler.scheduler_service import SchedulerService
# Get workspace from config
workspace_root = expand_path(conf().get("agent_workspace", "~/cow"))
store_path = os.path.join(workspace_root, "scheduler", "tasks.json")
# Create task store
_task_store = TaskStore(store_path)
logger.debug(f"[Scheduler] Task store initialized: {store_path}")
# Create execute callback
def execute_task_callback(task: dict):
"""Callback to execute a scheduled task"""
try:
action = task.get("action", {})
action_type = action.get("type")
if action_type == "agent_task":
_execute_agent_task(task, agent_bridge)
elif action_type == "send_message":
# Legacy support for old tasks
_execute_send_message(task, agent_bridge)
elif action_type == "tool_call":
# Legacy support for old tasks
_execute_tool_call(task, agent_bridge)
elif action_type == "skill_call":
# Legacy support for old tasks
_execute_skill_call(task, agent_bridge)
else:
logger.warning(f"[Scheduler] Unknown action type: {action_type}")
except Exception as e:
logger.error(f"[Scheduler] Error executing task {task.get('id')}: {e}")
# Create scheduler service
_scheduler_service = SchedulerService(_task_store, execute_task_callback)
_scheduler_service.start()
logger.debug("[Scheduler] Scheduler service initialized and started")
# Fast path: already initialized and running
if _scheduler_service is not None and getattr(_scheduler_service, "running", False):
return True
except Exception as e:
logger.error(f"[Scheduler] Failed to initialize scheduler: {e}")
return False
with _init_lock:
# Re-check under the lock to avoid races where multiple threads
# passed the fast-path check before any of them acquired the lock.
if _scheduler_service is not None and getattr(_scheduler_service, "running", False):
return True
try:
from agent.tools.scheduler.task_store import TaskStore
from agent.tools.scheduler.scheduler_service import SchedulerService
# Get workspace from config
workspace_root = expand_path(conf().get("agent_workspace", "~/cow"))
store_path = os.path.join(workspace_root, "scheduler", "tasks.json")
# Create task store (reuse if already created)
if _task_store is None:
_task_store = TaskStore(store_path)
logger.debug(f"[Scheduler] Task store initialized: {store_path}")
# Create execute callback
def execute_task_callback(task: dict):
"""Callback to execute a scheduled task"""
try:
action = task.get("action", {})
action_type = action.get("type")
if action_type == "agent_task":
_execute_agent_task(task, agent_bridge)
elif action_type == "send_message":
# Legacy support for old tasks
_execute_send_message(task, agent_bridge)
elif action_type == "tool_call":
# Legacy support for old tasks
_execute_tool_call(task, agent_bridge)
elif action_type == "skill_call":
# Legacy support for old tasks
_execute_skill_call(task, agent_bridge)
else:
logger.warning(f"[Scheduler] Unknown action type: {action_type}")
except Exception as e:
logger.error(f"[Scheduler] Error executing task {task.get('id')}: {e}")
# Create scheduler service
_scheduler_service = SchedulerService(_task_store, execute_task_callback)
_scheduler_service.start()
logger.debug("[Scheduler] Scheduler service initialized and started")
return True
except Exception as e:
logger.error(f"[Scheduler] Failed to initialize scheduler: {e}")
return False
def get_task_store():

View File

@@ -5,6 +5,7 @@ Agent Initializer - Handles agent initialization logic
import os
import asyncio
import datetime
import threading
import time
from typing import Optional, List
@@ -13,6 +14,9 @@ from agent.tools import ToolManager
from common.log import logger
from common.utils import expand_path
# Module-level lock to serialize scheduler init across concurrent sessions
_scheduler_init_lock = threading.Lock()
class AgentInitializer:
"""
@@ -406,16 +410,23 @@ class AgentInitializer:
return tools
def _initialize_scheduler(self, tools: List, session_id: Optional[str] = None):
"""Initialize scheduler service if needed"""
"""Initialize scheduler service if needed.
Serialize the check-and-set under a module-level lock so concurrent
first-time session inits cannot each create a new SchedulerService
(which would leak background scanning threads).
"""
if not self.agent_bridge.scheduler_initialized:
try:
from agent.tools.scheduler.integration import init_scheduler
if init_scheduler(self.agent_bridge):
self.agent_bridge.scheduler_initialized = True
if session_id is None:
logger.info("[AgentInitializer] Scheduler service initialized")
except Exception as e:
logger.warning(f"[AgentInitializer] Failed to initialize scheduler: {e}")
with _scheduler_init_lock:
if not self.agent_bridge.scheduler_initialized:
try:
from agent.tools.scheduler.integration import init_scheduler
if init_scheduler(self.agent_bridge):
self.agent_bridge.scheduler_initialized = True
if session_id is None:
logger.info("[AgentInitializer] Scheduler service initialized")
except Exception as e:
logger.warning(f"[AgentInitializer] Failed to initialize scheduler: {e}")
# Inject scheduler dependencies
if self.agent_bridge.scheduler_initialized: