fix(scheduler): make cron pushes survive restart on weixin channel

This commit is contained in:
zhayujie
2026-05-25 12:15:57 +08:00
parent eb74b73351
commit c5a3f991c5
5 changed files with 354 additions and 226 deletions

View File

@@ -57,34 +57,44 @@ def init_scheduler(agent_bridge) -> bool:
_task_store = TaskStore(store_path) _task_store = TaskStore(store_path)
logger.debug(f"[Scheduler] Task store initialized: {store_path}") logger.debug(f"[Scheduler] Task store initialized: {store_path}")
# Create execute callback # Create execute callback. Returns True on success, False to ask
# the scheduler to retry on the next tick (e.g. channel not yet
# ready right after process start).
def execute_task_callback(task: dict): def execute_task_callback(task: dict):
"""Callback to execute a scheduled task"""
try: try:
action = task.get("action", {}) action = task.get("action", {})
action_type = action.get("type") action_type = action.get("type")
channel_type = action.get("channel_type", "unknown")
receiver = action.get("receiver", "")
if not _is_channel_ready(channel_type, receiver):
logger.warning(
f"[Scheduler] Task {task.get('id')}: channel "
f"'{channel_type}' not ready for receiver={receiver} "
f"(no inbound msg cached since restart?); deferring"
)
return False
if action_type == "agent_task": if action_type == "agent_task":
_execute_agent_task(task, agent_bridge) return _execute_agent_task(task, agent_bridge)
elif action_type == "send_message": elif action_type == "send_message":
# Legacy support for old tasks return _execute_send_message(task, agent_bridge)
_execute_send_message(task, agent_bridge)
elif action_type == "tool_call": elif action_type == "tool_call":
# Legacy support for old tasks return _execute_tool_call(task, agent_bridge)
_execute_tool_call(task, agent_bridge)
elif action_type == "skill_call": elif action_type == "skill_call":
# Legacy support for old tasks return _execute_skill_call(task, agent_bridge)
_execute_skill_call(task, agent_bridge)
else: else:
logger.warning(f"[Scheduler] Unknown action type: {action_type}") logger.warning(f"[Scheduler] Unknown action type: {action_type}")
return True
except Exception as e: except Exception as e:
logger.error(f"[Scheduler] Error executing task {task.get('id')}: {e}") logger.error(f"[Scheduler] Error executing task {task.get('id')}: {e}")
return False
# Create scheduler service # Create scheduler service
_scheduler_service = SchedulerService(_task_store, execute_task_callback) _scheduler_service = SchedulerService(_task_store, execute_task_callback)
_scheduler_service.start() _scheduler_service.start()
logger.debug("[Scheduler] Scheduler service initialized and started") logger.info("[Scheduler] Service initialized and started")
return True return True
except Exception as e: except Exception as e:
@@ -92,6 +102,40 @@ def init_scheduler(agent_bridge) -> bool:
return False return False
def _is_channel_ready(channel_type: str, receiver: str) -> bool:
"""Best-effort readiness probe for outbound channels.
Returns False when we know the send will drop (e.g. weixin not yet
logged in, web session has no polling queue), so the scheduler can
defer instead of consuming the task. Unknown channels return True
to preserve previous behaviour.
"""
if not channel_type or channel_type == "unknown":
return True
try:
from channel.channel_factory import create_channel
channel = create_channel(channel_type)
if channel is None:
return False
if channel_type == "weixin":
tokens = getattr(channel, "_context_tokens", None)
if not tokens or receiver not in tokens:
return False
return True
if channel_type == "web":
queues = getattr(channel, "session_queues", None)
if not queues or receiver not in queues:
return False
return True
return True
except Exception as e:
logger.warning(f"[Scheduler] Channel readiness check failed for {channel_type}: {e}")
return True
def get_task_store(): def get_task_store():
"""Get the global task store instance""" """Get the global task store instance"""
return _task_store return _task_store
@@ -145,13 +189,10 @@ def _remember_delivered_output(
) )
def _execute_agent_task(task: dict, agent_bridge): def _execute_agent_task(task: dict, agent_bridge) -> bool:
""" """
Execute an agent_task action - let Agent handle the task Execute an agent_task action - let Agent handle the task.
Returns True on successful delivery, False to retry next tick.
Args:
task: Task dictionary
agent_bridge: AgentBridge instance
""" """
try: try:
action = task.get("action", {}) action = task.get("action", {})
@@ -162,11 +203,11 @@ def _execute_agent_task(task: dict, agent_bridge):
if not task_description: if not task_description:
logger.error(f"[Scheduler] Task {task['id']}: No task_description specified") logger.error(f"[Scheduler] Task {task['id']}: No task_description specified")
return return True # malformed task, don't loop forever
if not receiver: if not receiver:
logger.error(f"[Scheduler] Task {task['id']}: No receiver specified") logger.error(f"[Scheduler] Task {task['id']}: No receiver specified")
return return True
# Check for unsupported channels # Check for unsupported channels
if channel_type == "dingtalk": if channel_type == "dingtalk":
@@ -210,50 +251,46 @@ def _execute_agent_task(task: dict, agent_bridge):
# Don't clear history - scheduler tasks use isolated session_id so they won't pollute user conversations # Don't clear history - scheduler tasks use isolated session_id so they won't pollute user conversations
reply = agent_bridge.agent_reply(task_description, context=context, on_event=None, clear_history=False) reply = agent_bridge.agent_reply(task_description, context=context, on_event=None, clear_history=False)
if reply and reply.content: if not (reply and reply.content):
# Send the reply via channel logger.error(f"[Scheduler] Task {task['id']}: No result from agent execution")
from channel.channel_factory import create_channel return True # agent ran but produced nothing; don't loop
try: from channel.channel_factory import create_channel
channel = create_channel(channel_type) channel = create_channel(channel_type)
if channel: if not channel:
# For web channel, register request_id logger.error(f"[Scheduler] Failed to create channel: {channel_type}")
return False
if channel_type == "web" and hasattr(channel, 'request_to_session'): if channel_type == "web" and hasattr(channel, 'request_to_session'):
request_id = context.get("request_id") request_id = context.get("request_id")
if request_id: if request_id:
channel.request_to_session[request_id] = receiver channel.request_to_session[request_id] = receiver
logger.debug(f"[Scheduler] Registered request_id {request_id} -> session {receiver}")
# Send the reply try:
channel.send(reply, context) channel.send(reply, context)
_remember_delivered_output(agent_bridge, task, channel_type, reply.content)
logger.info(f"[Scheduler] Task {task['id']} executed successfully, result sent to {receiver}")
else:
logger.error(f"[Scheduler] Failed to create channel: {channel_type}")
except Exception as e: except Exception as e:
logger.error(f"[Scheduler] Failed to send result: {e}") logger.error(f"[Scheduler] Failed to send result: {e}")
else: return False
logger.error(f"[Scheduler] Task {task['id']}: No result from agent execution")
_remember_delivered_output(agent_bridge, task, channel_type, reply.content)
logger.info(f"[Scheduler] Task {task['id']} executed successfully, result sent to {receiver}")
return True
except Exception as e: except Exception as e:
logger.error(f"[Scheduler] Failed to execute task via Agent: {e}") logger.error(f"[Scheduler] Failed to execute task via Agent: {e}")
import traceback import traceback
logger.error(f"[Scheduler] Traceback: {traceback.format_exc()}") logger.error(f"[Scheduler] Traceback: {traceback.format_exc()}")
return False
except Exception as e: except Exception as e:
logger.error(f"[Scheduler] Error in _execute_agent_task: {e}") logger.error(f"[Scheduler] Error in _execute_agent_task: {e}")
import traceback import traceback
logger.error(f"[Scheduler] Traceback: {traceback.format_exc()}") logger.error(f"[Scheduler] Traceback: {traceback.format_exc()}")
return False
def _execute_send_message(task: dict, agent_bridge): def _execute_send_message(task: dict, agent_bridge) -> bool:
""" """Execute a send_message action. Returns True/False for delivery."""
Execute a send_message action
Args:
task: Task dictionary
agent_bridge: AgentBridge instance
"""
try: try:
action = task.get("action", {}) action = task.get("action", {})
content = action.get("content", "") content = action.get("content", "")
@@ -263,7 +300,7 @@ def _execute_send_message(task: dict, agent_bridge):
if not receiver: if not receiver:
logger.error(f"[Scheduler] Task {task['id']}: No receiver specified") logger.error(f"[Scheduler] Task {task['id']}: No receiver specified")
return return True
# Create context for sending message # Create context for sending message
context = Context(ContextType.TEXT, content) context = Context(ContextType.TEXT, content)
@@ -308,41 +345,35 @@ def _execute_send_message(task: dict, agent_bridge):
# Get channel and send # Get channel and send
from channel.channel_factory import create_channel from channel.channel_factory import create_channel
try:
channel = create_channel(channel_type) channel = create_channel(channel_type)
if channel: if not channel:
# For web channel, register the request_id to session mapping logger.error(f"[Scheduler] Failed to create channel: {channel_type}")
return False
if channel_type == "web" and hasattr(channel, 'request_to_session'): if channel_type == "web" and hasattr(channel, 'request_to_session'):
channel.request_to_session[request_id] = receiver channel.request_to_session[request_id] = receiver
logger.debug(f"[Scheduler] Registered request_id {request_id} -> session {receiver}")
try:
channel.send(reply, context) channel.send(reply, context)
_remember_delivered_output(agent_bridge, task, channel_type, content)
logger.info(f"[Scheduler] Task {task['id']} executed: sent message to {receiver}")
else:
logger.error(f"[Scheduler] Failed to create channel: {channel_type}")
except Exception as e: except Exception as e:
logger.error(f"[Scheduler] Failed to send message: {e}") logger.error(f"[Scheduler] Failed to send message: {e}")
import traceback return False
logger.error(f"[Scheduler] Traceback: {traceback.format_exc()}")
_remember_delivered_output(agent_bridge, task, channel_type, content)
logger.info(f"[Scheduler] Task {task['id']} executed: sent message to {receiver}")
return True
except Exception as e: except Exception as e:
logger.error(f"[Scheduler] Error in _execute_send_message: {e}") logger.error(f"[Scheduler] Error in _execute_send_message: {e}")
import traceback import traceback
logger.error(f"[Scheduler] Traceback: {traceback.format_exc()}") logger.error(f"[Scheduler] Traceback: {traceback.format_exc()}")
return False
def _execute_tool_call(task: dict, agent_bridge): def _execute_tool_call(task: dict, agent_bridge) -> bool:
""" """Execute a tool_call action. Returns True/False for delivery."""
Execute a tool_call action
Args:
task: Task dictionary
agent_bridge: AgentBridge instance
"""
try: try:
action = task.get("action", {}) action = task.get("action", {})
# Support both old and new field names
tool_name = action.get("call_name") or action.get("tool_name") tool_name = action.get("call_name") or action.get("tool_name")
tool_params = action.get("call_params") or action.get("tool_params", {}) tool_params = action.get("call_params") or action.get("tool_params", {})
result_prefix = action.get("result_prefix", "") result_prefix = action.get("result_prefix", "")
@@ -352,90 +383,70 @@ def _execute_tool_call(task: dict, agent_bridge):
if not tool_name: if not tool_name:
logger.error(f"[Scheduler] Task {task['id']}: No tool_name specified") logger.error(f"[Scheduler] Task {task['id']}: No tool_name specified")
return return True
if not receiver: if not receiver:
logger.error(f"[Scheduler] Task {task['id']}: No receiver specified") logger.error(f"[Scheduler] Task {task['id']}: No receiver specified")
return return True
# Get tool manager and create tool instance
from agent.tools.tool_manager import ToolManager from agent.tools.tool_manager import ToolManager
tool_manager = ToolManager() tool = ToolManager().create_tool(tool_name)
tool = tool_manager.create_tool(tool_name)
if not tool: if not tool:
logger.error(f"[Scheduler] Task {task['id']}: Tool '{tool_name}' not found") logger.error(f"[Scheduler] Task {task['id']}: Tool '{tool_name}' not found")
return return True
# Execute tool
logger.info(f"[Scheduler] Task {task['id']}: Executing tool '{tool_name}' with params {tool_params}") logger.info(f"[Scheduler] Task {task['id']}: Executing tool '{tool_name}' with params {tool_params}")
result = tool.execute(tool_params) result = tool.execute(tool_params)
content = result.result if hasattr(result, 'result') else str(result)
# Get result content
if hasattr(result, 'result'):
content = result.result
else:
content = str(result)
# Add prefix if specified
if result_prefix: if result_prefix:
content = f"{result_prefix}\n\n{content}" content = f"{result_prefix}\n\n{content}"
# Send result as message
context = Context(ContextType.TEXT, content) context = Context(ContextType.TEXT, content)
context["receiver"] = receiver context["receiver"] = receiver
context["isgroup"] = is_group context["isgroup"] = is_group
context["session_id"] = receiver context["session_id"] = receiver
# Channel-specific context setup request_id = None
if channel_type == "web": if channel_type == "web":
# Web channel needs request_id
import uuid import uuid
request_id = f"scheduler_{task['id']}_{uuid.uuid4().hex[:8]}" request_id = f"scheduler_{task['id']}_{uuid.uuid4().hex[:8]}"
context["request_id"] = request_id context["request_id"] = request_id
logger.debug(f"[Scheduler] Generated request_id for web channel: {request_id}")
elif channel_type == "feishu": elif channel_type == "feishu":
context["receive_id_type"] = "chat_id" if is_group else "open_id" context["receive_id_type"] = "chat_id" if is_group else "open_id"
context["msg"] = None context["msg"] = None
logger.debug(f"[Scheduler] Feishu: receive_id_type={context['receive_id_type']}, is_group={is_group}, receiver={receiver}")
elif channel_type == "wecom_bot": elif channel_type == "wecom_bot":
context["msg"] = None context["msg"] = None
reply = Reply(ReplyType.TEXT, content) reply = Reply(ReplyType.TEXT, content)
# Get channel and send
from channel.channel_factory import create_channel from channel.channel_factory import create_channel
channel = create_channel(channel_type)
if not channel:
logger.error(f"[Scheduler] Failed to create channel: {channel_type}")
return False
if channel_type == "web" and request_id and hasattr(channel, 'request_to_session'):
channel.request_to_session[request_id] = receiver
try: try:
channel = create_channel(channel_type)
if channel:
if channel_type == "web" and hasattr(channel, 'request_to_session'):
channel.request_to_session[request_id] = receiver
logger.debug(f"[Scheduler] Registered request_id {request_id} -> session {receiver}")
channel.send(reply, context) channel.send(reply, context)
_remember_delivered_output(agent_bridge, task, channel_type, content)
logger.info(f"[Scheduler] Task {task['id']} executed: sent tool result to {receiver}")
else:
logger.error(f"[Scheduler] Failed to create channel: {channel_type}")
except Exception as e: except Exception as e:
logger.error(f"[Scheduler] Failed to send tool result: {e}") logger.error(f"[Scheduler] Failed to send tool result: {e}")
return False
_remember_delivered_output(agent_bridge, task, channel_type, content)
logger.info(f"[Scheduler] Task {task['id']} executed: sent tool result to {receiver}")
return True
except Exception as e: except Exception as e:
logger.error(f"[Scheduler] Error in _execute_tool_call: {e}") logger.error(f"[Scheduler] Error in _execute_tool_call: {e}")
return False
def _execute_skill_call(task: dict, agent_bridge): def _execute_skill_call(task: dict, agent_bridge) -> bool:
""" """Execute a skill_call action by asking Agent to run the skill.
Execute a skill_call action by asking Agent to run the skill Returns True/False for delivery."""
Args:
task: Task dictionary
agent_bridge: AgentBridge instance
"""
try: try:
action = task.get("action", {}) action = task.get("action", {})
# Support both old and new field names
skill_name = action.get("call_name") or action.get("skill_name") skill_name = action.get("call_name") or action.get("skill_name")
skill_params = action.get("call_params") or action.get("skill_params", {}) skill_params = action.get("call_params") or action.get("skill_params", {})
result_prefix = action.get("result_prefix", "") result_prefix = action.get("result_prefix", "")
@@ -445,32 +456,24 @@ def _execute_skill_call(task: dict, agent_bridge):
if not skill_name: if not skill_name:
logger.error(f"[Scheduler] Task {task['id']}: No skill_name specified") logger.error(f"[Scheduler] Task {task['id']}: No skill_name specified")
return return True
if not receiver: if not receiver:
logger.error(f"[Scheduler] Task {task['id']}: No receiver specified") logger.error(f"[Scheduler] Task {task['id']}: No receiver specified")
return return True
logger.info(f"[Scheduler] Task {task['id']}: Executing skill '{skill_name}' with params {skill_params}") logger.info(f"[Scheduler] Task {task['id']}: Executing skill '{skill_name}' with params {skill_params}")
# Create a unique session_id for this scheduled task to avoid polluting user's conversation
# Format: scheduler_<receiver>_<task_id> to ensure isolation
scheduler_session_id = f"scheduler_{receiver}_{task['id']}" scheduler_session_id = f"scheduler_{receiver}_{task['id']}"
# Build a natural language query for the Agent to execute the skill
# Format: "Use skill-name to do something with params"
param_str = ", ".join([f"{k}={v}" for k, v in skill_params.items()]) param_str = ", ".join([f"{k}={v}" for k, v in skill_params.items()])
query = f"Use {skill_name} skill" query = f"Use {skill_name} skill"
if param_str: if param_str:
query += f" with {param_str}" query += f" with {param_str}"
# Create context for Agent
context = Context(ContextType.TEXT, query) context = Context(ContextType.TEXT, query)
context["receiver"] = receiver context["receiver"] = receiver
context["isgroup"] = is_group context["isgroup"] = is_group
context["session_id"] = scheduler_session_id context["session_id"] = scheduler_session_id
# Channel-specific setup
if channel_type == "web": if channel_type == "web":
import uuid import uuid
request_id = f"scheduler_{task['id']}_{uuid.uuid4().hex[:8]}" request_id = f"scheduler_{task['id']}_{uuid.uuid4().hex[:8]}"
@@ -481,49 +484,48 @@ def _execute_skill_call(task: dict, agent_bridge):
elif channel_type == "wecom_bot": elif channel_type == "wecom_bot":
context["msg"] = None context["msg"] = None
# Use Agent to execute the skill
try: try:
# Don't clear history - scheduler tasks use isolated session_id so they won't pollute user conversations
reply = agent_bridge.agent_reply(query, context=context, on_event=None, clear_history=False) reply = agent_bridge.agent_reply(query, context=context, on_event=None, clear_history=False)
if reply and reply.content:
content = reply.content
# Add prefix if specified
if result_prefix:
content = f"{result_prefix}\n\n{content}"
# Send the result via channel
from channel.channel_factory import create_channel
try:
channel = create_channel(channel_type)
if channel:
# For web channel, register request_id
if channel_type == "web" and hasattr(channel, 'request_to_session'):
req_id = context.get("request_id")
if req_id:
channel.request_to_session[req_id] = receiver
logger.debug(f"[Scheduler] Registered request_id {req_id} -> session {receiver}")
channel.send(Reply(ReplyType.TEXT, content), context)
_remember_delivered_output(agent_bridge, task, channel_type, content)
except Exception as e:
logger.error(f"[Scheduler] Failed to send skill result: {e}")
logger.info(f"[Scheduler] Task {task['id']} executed: skill result sent to {receiver}")
else:
logger.error(f"[Scheduler] Task {task['id']}: No result from skill execution")
except Exception as e: except Exception as e:
logger.error(f"[Scheduler] Failed to execute skill via Agent: {e}") logger.error(f"[Scheduler] Failed to execute skill via Agent: {e}")
import traceback import traceback
logger.error(f"[Scheduler] Traceback: {traceback.format_exc()}") logger.error(f"[Scheduler] Traceback: {traceback.format_exc()}")
return False
if not (reply and reply.content):
logger.error(f"[Scheduler] Task {task['id']}: No result from skill execution")
return True
content = reply.content
if result_prefix:
content = f"{result_prefix}\n\n{content}"
from channel.channel_factory import create_channel
channel = create_channel(channel_type)
if not channel:
logger.error(f"[Scheduler] Failed to create channel: {channel_type}")
return False
if channel_type == "web" and hasattr(channel, 'request_to_session'):
req_id = context.get("request_id")
if req_id:
channel.request_to_session[req_id] = receiver
try:
channel.send(Reply(ReplyType.TEXT, content), context)
except Exception as e:
logger.error(f"[Scheduler] Failed to send skill result: {e}")
return False
_remember_delivered_output(agent_bridge, task, channel_type, content)
logger.info(f"[Scheduler] Task {task['id']} executed: skill result sent to {receiver}")
return True
except Exception as e: except Exception as e:
logger.error(f"[Scheduler] Error in _execute_skill_call: {e}") logger.error(f"[Scheduler] Error in _execute_skill_call: {e}")
import traceback import traceback
logger.error(f"[Scheduler] Traceback: {traceback.format_exc()}") logger.error(f"[Scheduler] Traceback: {traceback.format_exc()}")
return False
def attach_scheduler_to_tool(tool, context: Context = None): def attach_scheduler_to_tool(tool, context: Context = None):

View File

@@ -52,7 +52,6 @@ class SchedulerService:
self.running = True self.running = True
self.thread = threading.Thread(target=self._run_loop, daemon=True) self.thread = threading.Thread(target=self._run_loop, daemon=True)
self.thread.start() self.thread.start()
logger.debug("[Scheduler] Service started")
def stop(self): def stop(self):
"""Stop the scheduler service""" """Stop the scheduler service"""
@@ -67,7 +66,7 @@ class SchedulerService:
def _run_loop(self): def _run_loop(self):
"""Main scheduler loop""" """Main scheduler loop"""
logger.debug("[Scheduler] Scheduler loop started") logger.info("[Scheduler] Scheduler loop started")
while self.running: while self.running:
try: try:
@@ -84,12 +83,18 @@ class SchedulerService:
for task in tasks: for task in tasks:
try: try:
# Check if task is due
if self._is_task_due(task, now): if self._is_task_due(task, now):
logger.info(f"[Scheduler] Executing task: {task['id']} - {task['name']}") logger.info(f"[Scheduler] Executing task: {task['id']} - {task['name']}")
self._execute_task(task) ok = self._execute_task(task)
if not ok:
# Leave next_run_at as-is so the next loop retries.
# Cron tasks within the catch-up window will keep
# firing; beyond it _is_task_due will reschedule.
logger.warning(
f"[Scheduler] Task {task['id']} delivery failed, will retry next tick"
)
continue
# Update next run time
next_run = self._calculate_next_run(task, now) next_run = self._calculate_next_run(task, now)
if next_run: if next_run:
self.task_store.update_task(task['id'], { self.task_store.update_task(task['id'], {
@@ -97,7 +102,6 @@ class SchedulerService:
"last_run_at": now.isoformat() "last_run_at": now.isoformat()
}) })
else: else:
# One-time task completed, remove it
self.task_store.delete_task(task['id']) self.task_store.delete_task(task['id'])
logger.info(f"[Scheduler] One-time task completed and removed: {task['id']}") logger.info(f"[Scheduler] One-time task completed and removed: {task['id']}")
except Exception as e: except Exception as e:
@@ -128,22 +132,27 @@ class SchedulerService:
try: try:
next_run = _parse_naive_local(next_run_str) next_run = _parse_naive_local(next_run_str)
# Check if task is overdue (e.g., service restart)
if next_run < now: if next_run < now:
time_diff = (now - next_run).total_seconds() time_diff = (now - next_run).total_seconds()
# If overdue by more than 5 minutes, skip this run and schedule next
if time_diff > 300: # 5 minutes
logger.warning(f"[Scheduler] Task {task['id']} is overdue by {int(time_diff)}s, skipping and scheduling next run")
# For one-time tasks, remove them directly
schedule = task.get("schedule", {}) schedule = task.get("schedule", {})
if schedule.get("type") == "once": schedule_type = schedule.get("type")
# Catch-up window: fire if we're within 10 minutes of the
# scheduled tick. Beyond that we'd rather skip than push a
# stale daily report to the user.
if time_diff <= 600:
return True
logger.warning(
f"[Scheduler] Task {task['id']} is overdue by {int(time_diff)}s, "
f"skipping and scheduling next run"
)
if schedule_type == "once":
self.task_store.delete_task(task['id']) self.task_store.delete_task(task['id'])
logger.info(f"[Scheduler] One-time task {task['id']} expired, removed") logger.info(f"[Scheduler] One-time task {task['id']} expired, removed")
return False return False
# For recurring tasks, calculate next run from now
next_next_run = self._calculate_next_run(task, now) next_next_run = self._calculate_next_run(task, now)
if next_next_run: if next_next_run:
self.task_store.update_task(task['id'], { self.task_store.update_task(task['id'], {
@@ -213,20 +222,22 @@ class SchedulerService:
return None return None
def _execute_task(self, task: dict): def _execute_task(self, task: dict) -> bool:
""" """
Execute a task Execute a task.
Args: Returns True if delivery succeeded (caller should advance state),
task: Task dictionary False if it failed (caller should keep next_run_at so the next
loop iteration retries). Callback may return None for legacy
behaviour, treated as success.
""" """
try: try:
# Call the execute callback result = self.execute_callback(task)
self.execute_callback(task) return False if result is False else True
except Exception as e: except Exception as e:
logger.error(f"[Scheduler] Error executing task {task['id']}: {e}") logger.error(f"[Scheduler] Error executing task {task['id']}: {e}")
# Update task with error
self.task_store.update_task(task['id'], { self.task_store.update_task(task['id'], {
"last_error": str(e), "last_error": str(e),
"last_error_at": datetime.now().isoformat() "last_error_at": datetime.now().isoformat()
}) })
return False

12
app.py
View File

@@ -288,6 +288,16 @@ def _warmup_mcp_tools():
logger.warning(f"[App] MCP warmup failed (non-fatal): {e}") logger.warning(f"[App] MCP warmup failed (non-fatal): {e}")
def _warmup_scheduler():
"""Eager-init AgentBridge so the scheduler thread starts at process
boot rather than waiting for the first user message."""
try:
from bridge.bridge import Bridge
Bridge().get_agent_bridge()
except Exception as e:
logger.warning(f"[App] Scheduler warmup failed: {e}")
def _sync_builtin_skills(): def _sync_builtin_skills():
"""Sync builtin skills from project skills/ to workspace skills/ on startup.""" """Sync builtin skills from project skills/ to workspace skills/ on startup."""
import shutil import shutil
@@ -353,6 +363,8 @@ def run():
# latency isn't dominated by npx package downloads. # latency isn't dominated by npx package downloads.
_warmup_mcp_tools() _warmup_mcp_tools()
_warmup_scheduler()
logger.info(f"[App] Starting channels: {channel_names}") logger.info(f"[App] Starting channels: {channel_names}")
_channel_mgr = ChannelManager() _channel_mgr = ChannelManager()

View File

@@ -285,6 +285,15 @@ class AgentBridge:
# Create helper instances # Create helper instances
self.initializer = AgentInitializer(bridge, self) self.initializer = AgentInitializer(bridge, self)
# Eager-start the scheduler so cron tasks fire without waiting
# for the first user message. init_scheduler is idempotent.
try:
from agent.tools.scheduler.integration import init_scheduler
if init_scheduler(self):
self.scheduler_initialized = True
except Exception as e:
logger.warning(f"[AgentBridge] Eager scheduler init failed: {e}")
def create_agent(self, system_prompt: str, tools: List = None, **kwargs) -> Agent: def create_agent(self, system_prompt: str, tools: List = None, **kwargs) -> Agent:
""" """
Create the super agent with COW integration Create the super agent with COW integration

View File

@@ -47,14 +47,16 @@ def _load_credentials(cred_path: str) -> dict:
def _save_credentials(cred_path: str, data: dict): def _save_credentials(cred_path: str, data: dict):
"""Save credentials to JSON file.""" """Atomically save credentials to JSON file (tmp + rename)."""
os.makedirs(os.path.dirname(cred_path), exist_ok=True) os.makedirs(os.path.dirname(cred_path), exist_ok=True)
with open(cred_path, "w") as f: tmp_path = f"{cred_path}.tmp"
with open(tmp_path, "w") as f:
json.dump(data, f, indent=2) json.dump(data, f, indent=2)
try: try:
os.chmod(cred_path, 0o600) os.chmod(tmp_path, 0o600)
except Exception: except Exception:
pass pass
os.replace(tmp_path, cred_path)
@singleton @singleton
@@ -73,7 +75,10 @@ class WeixinChannel(ChatChannel):
self.api = None self.api = None
self._stop_event = threading.Event() self._stop_event = threading.Event()
self._poll_thread = None self._poll_thread = None
self._context_tokens = {} # user_id -> context_token # user_id -> context_token. Guarded by _context_tokens_lock for any
# mutation that races with disk persistence.
self._context_tokens = {}
self._context_tokens_lock = threading.Lock()
self._received_msgs = ExpiredDict(60 * 60 * 7.1) self._received_msgs = ExpiredDict(60 * 60 * 7.1)
self._get_updates_buf = "" self._get_updates_buf = ""
self._credentials_path = "" self._credentials_path = ""
@@ -95,12 +100,19 @@ class WeixinChannel(ChatChannel):
conf().get("weixin_credentials_path", "~/.weixin_cow_credentials.json") conf().get("weixin_credentials_path", "~/.weixin_cow_credentials.json")
) )
if not token: # Always load credentials so we can restore context_tokens even when
# the bot token itself comes from config.
creds = _load_credentials(self._credentials_path) creds = _load_credentials(self._credentials_path)
if not token:
token = creds.get("token", "") token = creds.get("token", "")
if creds.get("base_url"): if creds.get("base_url"):
base_url = creds["base_url"] base_url = creds["base_url"]
# Restore persisted context_tokens so scheduler can deliver pushes
# immediately after restart, without waiting for the user to ping
# the bot first.
self._restore_context_tokens_from_creds(creds)
if not token: if not token:
token, base_url = self._login_with_retry(base_url) token, base_url = self._login_with_retry(base_url)
if not token: if not token:
@@ -140,6 +152,11 @@ class WeixinChannel(ChatChannel):
def _relogin(self) -> bool: def _relogin(self) -> bool:
"""Re-login after session expiry. Returns True on success.""" """Re-login after session expiry. Returns True on success."""
base_url = self.api.base_url if self.api else DEFAULT_BASE_URL base_url = self.api.base_url if self.api else DEFAULT_BASE_URL
# Clearing the whole credentials file is intentional: the new login
# will issue a fresh `token` and persisted context_tokens belong to
# the previous bot identity, so they must not survive.
with self._context_tokens_lock:
self._context_tokens.clear()
if os.path.exists(self._credentials_path): if os.path.exists(self._credentials_path):
try: try:
os.remove(self._credentials_path) os.remove(self._credentials_path)
@@ -156,9 +173,62 @@ class WeixinChannel(ChatChannel):
cdn_base_url=self.api.cdn_base_url if self.api else CDN_BASE_URL, cdn_base_url=self.api.cdn_base_url if self.api else CDN_BASE_URL,
) )
self.login_status = self.LOGIN_STATUS_OK self.login_status = self.LOGIN_STATUS_OK
self._context_tokens.clear()
return True return True
# ── Context token persistence ──────────────────────────────────────
# ilink requires every outbound send to echo the context_token from the
# user's latest inbound message. We mirror the in-memory map into the
# credentials JSON so scheduled pushes survive process restarts.
# All mutation + disk IO is serialized via _context_tokens_lock so that
# concurrent updates can never lose each other's writes.
def _restore_context_tokens_from_creds(self, creds: dict) -> None:
if not isinstance(creds, dict):
return
tokens = creds.get("context_tokens")
if not isinstance(tokens, dict):
return
restored = 0
with self._context_tokens_lock:
for user_id, token in tokens.items():
if isinstance(user_id, str) and isinstance(token, str) and token:
self._context_tokens[user_id] = token
restored += 1
if restored:
logger.info(f"[Weixin] Restored {restored} context_tokens from credentials")
def _persist_context_tokens_locked(self) -> None:
"""Flush the token map to disk. Caller must hold _context_tokens_lock."""
if not self._credentials_path:
return
try:
creds = _load_credentials(self._credentials_path) or {}
creds["context_tokens"] = dict(self._context_tokens)
_save_credentials(self._credentials_path, creds)
except Exception as e:
logger.warning(f"[Weixin] Failed to persist context_tokens: {e}")
def _update_context_token(self, user_id: str, token: str) -> None:
"""Update the in-memory token for a user; flush to disk only on change."""
if not user_id or not token:
return
with self._context_tokens_lock:
if self._context_tokens.get(user_id) == token:
return
self._context_tokens[user_id] = token
self._persist_context_tokens_locked()
def _invalidate_context_token(self, user_id: str) -> None:
"""Drop the cached token for a user (used after -14 / send rejection)."""
if not user_id:
return
with self._context_tokens_lock:
if user_id not in self._context_tokens:
return
del self._context_tokens[user_id]
logger.info(f"[Weixin] Invalidated stale context_token for {user_id}")
self._persist_context_tokens_locked()
# ── QR Login ─────────────────────────────────────────────────────── # ── QR Login ───────────────────────────────────────────────────────
@staticmethod @staticmethod
@@ -391,7 +461,7 @@ class WeixinChannel(ChatChannel):
context_token = raw_msg.get("context_token", "") context_token = raw_msg.get("context_token", "")
if context_token and from_user: if context_token and from_user:
self._context_tokens[from_user] = context_token self._update_context_token(from_user, context_token)
cdn_base_url = self.api.cdn_base_url if self.api else CDN_BASE_URL cdn_base_url = self.api.cdn_base_url if self.api else CDN_BASE_URL
try: try:
@@ -510,10 +580,30 @@ class WeixinChannel(ChatChannel):
return msg.context_token return msg.context_token
return self._context_tokens.get(receiver, "") return self._context_tokens.get(receiver, "")
def _check_send_response(self, resp, receiver: str) -> None:
"""Inspect a send-API response; drop stale context_token on -14.
ilink uses ret/errcode = -14 to signal that the session (and any
cached context_token) is no longer valid. The plugin keeps running
because the bot itself can re-login; we just need to forget the
per-user token so the next push won't retry forever.
"""
if not isinstance(resp, dict):
return
ret = resp.get("ret")
errcode = resp.get("errcode")
if ret == -14 or errcode == -14:
logger.warning(
f"[Weixin] Send returned -14 (session expired) for "
f"receiver={receiver}; dropping cached context_token"
)
self._invalidate_context_token(receiver)
def _send_text(self, text: str, receiver: str, context_token: str): def _send_text(self, text: str, receiver: str, context_token: str):
if len(text) <= TEXT_CHUNK_LIMIT: if len(text) <= TEXT_CHUNK_LIMIT:
try: try:
self.api.send_text(receiver, text, context_token) resp = self.api.send_text(receiver, text, context_token)
self._check_send_response(resp, receiver)
logger.debug(f"[Weixin] Text sent to {receiver}, len={len(text)}") logger.debug(f"[Weixin] Text sent to {receiver}, len={len(text)}")
except Exception as e: except Exception as e:
logger.error(f"[Weixin] Failed to send text: {e}") logger.error(f"[Weixin] Failed to send text: {e}")
@@ -522,7 +612,8 @@ class WeixinChannel(ChatChannel):
chunks = self._split_text(text, TEXT_CHUNK_LIMIT) chunks = self._split_text(text, TEXT_CHUNK_LIMIT)
for i, chunk in enumerate(chunks): for i, chunk in enumerate(chunks):
try: try:
self.api.send_text(receiver, chunk, context_token) resp = self.api.send_text(receiver, chunk, context_token)
self._check_send_response(resp, receiver)
logger.debug(f"[Weixin] Text chunk {i+1}/{len(chunks)} sent to {receiver}, len={len(chunk)}") logger.debug(f"[Weixin] Text chunk {i+1}/{len(chunks)} sent to {receiver}, len={len(chunk)}")
except Exception as e: except Exception as e:
logger.error(f"[Weixin] Failed to send text chunk {i+1}/{len(chunks)}: {e}") logger.error(f"[Weixin] Failed to send text chunk {i+1}/{len(chunks)}: {e}")
@@ -556,13 +647,14 @@ class WeixinChannel(ChatChannel):
return return
try: try:
result = upload_media_to_cdn(self.api, local_path, receiver, media_type=1) result = upload_media_to_cdn(self.api, local_path, receiver, media_type=1)
self.api.send_image_item( resp = self.api.send_image_item(
to=receiver, to=receiver,
context_token=context_token, context_token=context_token,
encrypt_query_param=result["encrypt_query_param"], encrypt_query_param=result["encrypt_query_param"],
aes_key_b64=result["aes_key_b64"], aes_key_b64=result["aes_key_b64"],
ciphertext_size=result["ciphertext_size"], ciphertext_size=result["ciphertext_size"],
) )
self._check_send_response(resp, receiver)
logger.info(f"[Weixin] Image sent to {receiver}") logger.info(f"[Weixin] Image sent to {receiver}")
except Exception as e: except Exception as e:
logger.error(f"[Weixin] Image send failed: {e}") logger.error(f"[Weixin] Image send failed: {e}")
@@ -575,7 +667,7 @@ class WeixinChannel(ChatChannel):
return return
try: try:
result = upload_media_to_cdn(self.api, local_path, receiver, media_type=3) result = upload_media_to_cdn(self.api, local_path, receiver, media_type=3)
self.api.send_file_item( resp = self.api.send_file_item(
to=receiver, to=receiver,
context_token=context_token, context_token=context_token,
encrypt_query_param=result["encrypt_query_param"], encrypt_query_param=result["encrypt_query_param"],
@@ -583,6 +675,7 @@ class WeixinChannel(ChatChannel):
file_name=os.path.basename(local_path), file_name=os.path.basename(local_path),
file_size=result["raw_size"], file_size=result["raw_size"],
) )
self._check_send_response(resp, receiver)
logger.info(f"[Weixin] File sent to {receiver}") logger.info(f"[Weixin] File sent to {receiver}")
except Exception as e: except Exception as e:
logger.error(f"[Weixin] File send failed: {e}") logger.error(f"[Weixin] File send failed: {e}")
@@ -595,13 +688,14 @@ class WeixinChannel(ChatChannel):
return return
try: try:
result = upload_media_to_cdn(self.api, local_path, receiver, media_type=2) result = upload_media_to_cdn(self.api, local_path, receiver, media_type=2)
self.api.send_video_item( resp = self.api.send_video_item(
to=receiver, to=receiver,
context_token=context_token, context_token=context_token,
encrypt_query_param=result["encrypt_query_param"], encrypt_query_param=result["encrypt_query_param"],
aes_key_b64=result["aes_key_b64"], aes_key_b64=result["aes_key_b64"],
ciphertext_size=result["ciphertext_size"], ciphertext_size=result["ciphertext_size"],
) )
self._check_send_response(resp, receiver)
logger.info(f"[Weixin] Video sent to {receiver}") logger.info(f"[Weixin] Video sent to {receiver}")
except Exception as e: except Exception as e:
logger.error(f"[Weixin] Video send failed: {e}") logger.error(f"[Weixin] Video send failed: {e}")