From c5a3f991c59f02158217bcedef3bdfa0679e8072 Mon Sep 17 00:00:00 2001 From: zhayujie Date: Mon, 25 May 2026 12:15:57 +0800 Subject: [PATCH] fix(scheduler): make cron pushes survive restart on weixin channel --- agent/tools/scheduler/integration.py | 348 +++++++++++---------- agent/tools/scheduler/scheduler_service.py | 83 ++--- app.py | 12 + bridge/agent_bridge.py | 9 + channel/weixin/weixin_channel.py | 128 +++++++- 5 files changed, 354 insertions(+), 226 deletions(-) diff --git a/agent/tools/scheduler/integration.py b/agent/tools/scheduler/integration.py index 9e559a43..7421a525 100644 --- a/agent/tools/scheduler/integration.py +++ b/agent/tools/scheduler/integration.py @@ -57,34 +57,44 @@ def init_scheduler(agent_bridge) -> bool: _task_store = TaskStore(store_path) logger.debug(f"[Scheduler] Task store initialized: {store_path}") - # Create execute callback + # Create execute callback. Returns True on success, False to ask + # the scheduler to retry on the next tick (e.g. channel not yet + # ready right after process start). def execute_task_callback(task: dict): - """Callback to execute a scheduled task""" try: action = task.get("action", {}) action_type = action.get("type") + channel_type = action.get("channel_type", "unknown") + receiver = action.get("receiver", "") + + if not _is_channel_ready(channel_type, receiver): + logger.warning( + f"[Scheduler] Task {task.get('id')}: channel " + f"'{channel_type}' not ready for receiver={receiver} " + f"(no inbound msg cached since restart?); deferring" + ) + return False if action_type == "agent_task": - _execute_agent_task(task, agent_bridge) + return _execute_agent_task(task, agent_bridge) elif action_type == "send_message": - # Legacy support for old tasks - _execute_send_message(task, agent_bridge) + return _execute_send_message(task, agent_bridge) elif action_type == "tool_call": - # Legacy support for old tasks - _execute_tool_call(task, agent_bridge) + return _execute_tool_call(task, agent_bridge) elif action_type == "skill_call": - # Legacy support for old tasks - _execute_skill_call(task, agent_bridge) + return _execute_skill_call(task, agent_bridge) else: logger.warning(f"[Scheduler] Unknown action type: {action_type}") + return True except Exception as e: logger.error(f"[Scheduler] Error executing task {task.get('id')}: {e}") + return False # Create scheduler service _scheduler_service = SchedulerService(_task_store, execute_task_callback) _scheduler_service.start() - logger.debug("[Scheduler] Scheduler service initialized and started") + logger.info("[Scheduler] Service initialized and started") return True except Exception as e: @@ -92,6 +102,40 @@ def init_scheduler(agent_bridge) -> bool: return False +def _is_channel_ready(channel_type: str, receiver: str) -> bool: + """Best-effort readiness probe for outbound channels. + + Returns False when we know the send will drop (e.g. weixin not yet + logged in, web session has no polling queue), so the scheduler can + defer instead of consuming the task. Unknown channels return True + to preserve previous behaviour. + """ + if not channel_type or channel_type == "unknown": + return True + try: + from channel.channel_factory import create_channel + channel = create_channel(channel_type) + if channel is None: + return False + + if channel_type == "weixin": + tokens = getattr(channel, "_context_tokens", None) + if not tokens or receiver not in tokens: + return False + return True + + if channel_type == "web": + queues = getattr(channel, "session_queues", None) + if not queues or receiver not in queues: + return False + return True + + return True + except Exception as e: + logger.warning(f"[Scheduler] Channel readiness check failed for {channel_type}: {e}") + return True + + def get_task_store(): """Get the global task store instance""" return _task_store @@ -145,13 +189,10 @@ def _remember_delivered_output( ) -def _execute_agent_task(task: dict, agent_bridge): +def _execute_agent_task(task: dict, agent_bridge) -> bool: """ - Execute an agent_task action - let Agent handle the task - - Args: - task: Task dictionary - agent_bridge: AgentBridge instance + Execute an agent_task action - let Agent handle the task. + Returns True on successful delivery, False to retry next tick. """ try: action = task.get("action", {}) @@ -162,11 +203,11 @@ def _execute_agent_task(task: dict, agent_bridge): if not task_description: logger.error(f"[Scheduler] Task {task['id']}: No task_description specified") - return + return True # malformed task, don't loop forever if not receiver: logger.error(f"[Scheduler] Task {task['id']}: No receiver specified") - return + return True # Check for unsupported channels if channel_type == "dingtalk": @@ -209,51 +250,47 @@ def _execute_agent_task(task: dict, agent_bridge): try: # Don't clear history - scheduler tasks use isolated session_id so they won't pollute user conversations reply = agent_bridge.agent_reply(task_description, context=context, on_event=None, clear_history=False) - - if reply and reply.content: - # Send the reply via channel - from channel.channel_factory import create_channel - - try: - channel = create_channel(channel_type) - if channel: - # For web channel, register request_id - if channel_type == "web" and hasattr(channel, 'request_to_session'): - request_id = context.get("request_id") - if request_id: - channel.request_to_session[request_id] = receiver - logger.debug(f"[Scheduler] Registered request_id {request_id} -> session {receiver}") - - # Send the reply - channel.send(reply, context) - _remember_delivered_output(agent_bridge, task, channel_type, reply.content) - logger.info(f"[Scheduler] Task {task['id']} executed successfully, result sent to {receiver}") - else: - logger.error(f"[Scheduler] Failed to create channel: {channel_type}") - except Exception as e: - logger.error(f"[Scheduler] Failed to send result: {e}") - else: + + if not (reply and reply.content): logger.error(f"[Scheduler] Task {task['id']}: No result from agent execution") - + return True # agent ran but produced nothing; don't loop + + from channel.channel_factory import create_channel + channel = create_channel(channel_type) + if not channel: + logger.error(f"[Scheduler] Failed to create channel: {channel_type}") + return False + + if channel_type == "web" and hasattr(channel, 'request_to_session'): + request_id = context.get("request_id") + if request_id: + channel.request_to_session[request_id] = receiver + + try: + channel.send(reply, context) + except Exception as e: + logger.error(f"[Scheduler] Failed to send result: {e}") + return False + + _remember_delivered_output(agent_bridge, task, channel_type, reply.content) + logger.info(f"[Scheduler] Task {task['id']} executed successfully, result sent to {receiver}") + return True + except Exception as e: logger.error(f"[Scheduler] Failed to execute task via Agent: {e}") import traceback logger.error(f"[Scheduler] Traceback: {traceback.format_exc()}") - + return False + except Exception as e: logger.error(f"[Scheduler] Error in _execute_agent_task: {e}") import traceback logger.error(f"[Scheduler] Traceback: {traceback.format_exc()}") + return False -def _execute_send_message(task: dict, agent_bridge): - """ - Execute a send_message action - - Args: - task: Task dictionary - agent_bridge: AgentBridge instance - """ +def _execute_send_message(task: dict, agent_bridge) -> bool: + """Execute a send_message action. Returns True/False for delivery.""" try: action = task.get("action", {}) content = action.get("content", "") @@ -263,7 +300,7 @@ def _execute_send_message(task: dict, agent_bridge): if not receiver: logger.error(f"[Scheduler] Task {task['id']}: No receiver specified") - return + return True # Create context for sending message context = Context(ContextType.TEXT, content) @@ -308,169 +345,135 @@ def _execute_send_message(task: dict, agent_bridge): # Get channel and send from channel.channel_factory import create_channel + channel = create_channel(channel_type) + if not channel: + logger.error(f"[Scheduler] Failed to create channel: {channel_type}") + return False + + if channel_type == "web" and hasattr(channel, 'request_to_session'): + channel.request_to_session[request_id] = receiver + try: - channel = create_channel(channel_type) - if channel: - # For web channel, register the request_id to session mapping - if channel_type == "web" and hasattr(channel, 'request_to_session'): - channel.request_to_session[request_id] = receiver - logger.debug(f"[Scheduler] Registered request_id {request_id} -> session {receiver}") - - channel.send(reply, context) - _remember_delivered_output(agent_bridge, task, channel_type, content) - logger.info(f"[Scheduler] Task {task['id']} executed: sent message to {receiver}") - else: - logger.error(f"[Scheduler] Failed to create channel: {channel_type}") + channel.send(reply, context) except Exception as e: logger.error(f"[Scheduler] Failed to send message: {e}") - import traceback - logger.error(f"[Scheduler] Traceback: {traceback.format_exc()}") - + return False + + _remember_delivered_output(agent_bridge, task, channel_type, content) + logger.info(f"[Scheduler] Task {task['id']} executed: sent message to {receiver}") + return True + except Exception as e: logger.error(f"[Scheduler] Error in _execute_send_message: {e}") import traceback logger.error(f"[Scheduler] Traceback: {traceback.format_exc()}") + return False -def _execute_tool_call(task: dict, agent_bridge): - """ - Execute a tool_call action - - Args: - task: Task dictionary - agent_bridge: AgentBridge instance - """ +def _execute_tool_call(task: dict, agent_bridge) -> bool: + """Execute a tool_call action. Returns True/False for delivery.""" try: action = task.get("action", {}) - # Support both old and new field names tool_name = action.get("call_name") or action.get("tool_name") tool_params = action.get("call_params") or action.get("tool_params", {}) result_prefix = action.get("result_prefix", "") receiver = action.get("receiver") is_group = action.get("is_group", False) channel_type = action.get("channel_type", "unknown") - + if not tool_name: logger.error(f"[Scheduler] Task {task['id']}: No tool_name specified") - return - + return True if not receiver: logger.error(f"[Scheduler] Task {task['id']}: No receiver specified") - return - - # Get tool manager and create tool instance + return True + from agent.tools.tool_manager import ToolManager - tool_manager = ToolManager() - tool = tool_manager.create_tool(tool_name) - + tool = ToolManager().create_tool(tool_name) if not tool: logger.error(f"[Scheduler] Task {task['id']}: Tool '{tool_name}' not found") - return - - # Execute tool + return True + logger.info(f"[Scheduler] Task {task['id']}: Executing tool '{tool_name}' with params {tool_params}") result = tool.execute(tool_params) - - # Get result content - if hasattr(result, 'result'): - content = result.result - else: - content = str(result) - - # Add prefix if specified + content = result.result if hasattr(result, 'result') else str(result) if result_prefix: content = f"{result_prefix}\n\n{content}" - - # Send result as message + context = Context(ContextType.TEXT, content) context["receiver"] = receiver context["isgroup"] = is_group context["session_id"] = receiver - - # Channel-specific context setup + + request_id = None if channel_type == "web": - # Web channel needs request_id import uuid request_id = f"scheduler_{task['id']}_{uuid.uuid4().hex[:8]}" context["request_id"] = request_id - logger.debug(f"[Scheduler] Generated request_id for web channel: {request_id}") elif channel_type == "feishu": context["receive_id_type"] = "chat_id" if is_group else "open_id" context["msg"] = None - logger.debug(f"[Scheduler] Feishu: receive_id_type={context['receive_id_type']}, is_group={is_group}, receiver={receiver}") elif channel_type == "wecom_bot": context["msg"] = None reply = Reply(ReplyType.TEXT, content) - # Get channel and send from channel.channel_factory import create_channel + channel = create_channel(channel_type) + if not channel: + logger.error(f"[Scheduler] Failed to create channel: {channel_type}") + return False + + if channel_type == "web" and request_id and hasattr(channel, 'request_to_session'): + channel.request_to_session[request_id] = receiver try: - channel = create_channel(channel_type) - if channel: - if channel_type == "web" and hasattr(channel, 'request_to_session'): - channel.request_to_session[request_id] = receiver - logger.debug(f"[Scheduler] Registered request_id {request_id} -> session {receiver}") - - channel.send(reply, context) - _remember_delivered_output(agent_bridge, task, channel_type, content) - logger.info(f"[Scheduler] Task {task['id']} executed: sent tool result to {receiver}") - else: - logger.error(f"[Scheduler] Failed to create channel: {channel_type}") + channel.send(reply, context) except Exception as e: logger.error(f"[Scheduler] Failed to send tool result: {e}") + return False + + _remember_delivered_output(agent_bridge, task, channel_type, content) + logger.info(f"[Scheduler] Task {task['id']} executed: sent tool result to {receiver}") + return True except Exception as e: logger.error(f"[Scheduler] Error in _execute_tool_call: {e}") + return False -def _execute_skill_call(task: dict, agent_bridge): - """ - Execute a skill_call action by asking Agent to run the skill - - Args: - task: Task dictionary - agent_bridge: AgentBridge instance - """ +def _execute_skill_call(task: dict, agent_bridge) -> bool: + """Execute a skill_call action by asking Agent to run the skill. + Returns True/False for delivery.""" try: action = task.get("action", {}) - # Support both old and new field names skill_name = action.get("call_name") or action.get("skill_name") skill_params = action.get("call_params") or action.get("skill_params", {}) result_prefix = action.get("result_prefix", "") receiver = action.get("receiver") is_group = action.get("isgroup", False) channel_type = action.get("channel_type", "unknown") - + if not skill_name: logger.error(f"[Scheduler] Task {task['id']}: No skill_name specified") - return - + return True if not receiver: logger.error(f"[Scheduler] Task {task['id']}: No receiver specified") - return - + return True + logger.info(f"[Scheduler] Task {task['id']}: Executing skill '{skill_name}' with params {skill_params}") - - # Create a unique session_id for this scheduled task to avoid polluting user's conversation - # Format: scheduler__ to ensure isolation + scheduler_session_id = f"scheduler_{receiver}_{task['id']}" - - # Build a natural language query for the Agent to execute the skill - # Format: "Use skill-name to do something with params" param_str = ", ".join([f"{k}={v}" for k, v in skill_params.items()]) query = f"Use {skill_name} skill" if param_str: query += f" with {param_str}" - - # Create context for Agent + context = Context(ContextType.TEXT, query) context["receiver"] = receiver context["isgroup"] = is_group context["session_id"] = scheduler_session_id - - # Channel-specific setup + if channel_type == "web": import uuid request_id = f"scheduler_{task['id']}_{uuid.uuid4().hex[:8]}" @@ -481,49 +484,48 @@ def _execute_skill_call(task: dict, agent_bridge): elif channel_type == "wecom_bot": context["msg"] = None - # Use Agent to execute the skill try: - # Don't clear history - scheduler tasks use isolated session_id so they won't pollute user conversations reply = agent_bridge.agent_reply(query, context=context, on_event=None, clear_history=False) - - if reply and reply.content: - content = reply.content - - # Add prefix if specified - if result_prefix: - content = f"{result_prefix}\n\n{content}" - - # Send the result via channel - from channel.channel_factory import create_channel - - try: - channel = create_channel(channel_type) - if channel: - # For web channel, register request_id - if channel_type == "web" and hasattr(channel, 'request_to_session'): - req_id = context.get("request_id") - if req_id: - channel.request_to_session[req_id] = receiver - logger.debug(f"[Scheduler] Registered request_id {req_id} -> session {receiver}") - - channel.send(Reply(ReplyType.TEXT, content), context) - _remember_delivered_output(agent_bridge, task, channel_type, content) - except Exception as e: - logger.error(f"[Scheduler] Failed to send skill result: {e}") - - logger.info(f"[Scheduler] Task {task['id']} executed: skill result sent to {receiver}") - else: - logger.error(f"[Scheduler] Task {task['id']}: No result from skill execution") - except Exception as e: logger.error(f"[Scheduler] Failed to execute skill via Agent: {e}") import traceback logger.error(f"[Scheduler] Traceback: {traceback.format_exc()}") - + return False + + if not (reply and reply.content): + logger.error(f"[Scheduler] Task {task['id']}: No result from skill execution") + return True + + content = reply.content + if result_prefix: + content = f"{result_prefix}\n\n{content}" + + from channel.channel_factory import create_channel + channel = create_channel(channel_type) + if not channel: + logger.error(f"[Scheduler] Failed to create channel: {channel_type}") + return False + + if channel_type == "web" and hasattr(channel, 'request_to_session'): + req_id = context.get("request_id") + if req_id: + channel.request_to_session[req_id] = receiver + + try: + channel.send(Reply(ReplyType.TEXT, content), context) + except Exception as e: + logger.error(f"[Scheduler] Failed to send skill result: {e}") + return False + + _remember_delivered_output(agent_bridge, task, channel_type, content) + logger.info(f"[Scheduler] Task {task['id']} executed: skill result sent to {receiver}") + return True + except Exception as e: logger.error(f"[Scheduler] Error in _execute_skill_call: {e}") import traceback logger.error(f"[Scheduler] Traceback: {traceback.format_exc()}") + return False def attach_scheduler_to_tool(tool, context: Context = None): diff --git a/agent/tools/scheduler/scheduler_service.py b/agent/tools/scheduler/scheduler_service.py index dd5369cb..1f4bc6fb 100644 --- a/agent/tools/scheduler/scheduler_service.py +++ b/agent/tools/scheduler/scheduler_service.py @@ -52,7 +52,6 @@ class SchedulerService: self.running = True self.thread = threading.Thread(target=self._run_loop, daemon=True) self.thread.start() - logger.debug("[Scheduler] Service started") def stop(self): """Stop the scheduler service""" @@ -67,7 +66,7 @@ class SchedulerService: def _run_loop(self): """Main scheduler loop""" - logger.debug("[Scheduler] Scheduler loop started") + logger.info("[Scheduler] Scheduler loop started") while self.running: try: @@ -84,12 +83,18 @@ class SchedulerService: for task in tasks: try: - # Check if task is due if self._is_task_due(task, now): logger.info(f"[Scheduler] Executing task: {task['id']} - {task['name']}") - self._execute_task(task) - - # Update next run time + ok = self._execute_task(task) + if not ok: + # Leave next_run_at as-is so the next loop retries. + # Cron tasks within the catch-up window will keep + # firing; beyond it _is_task_due will reschedule. + logger.warning( + f"[Scheduler] Task {task['id']} delivery failed, will retry next tick" + ) + continue + next_run = self._calculate_next_run(task, now) if next_run: self.task_store.update_task(task['id'], { @@ -97,7 +102,6 @@ class SchedulerService: "last_run_at": now.isoformat() }) else: - # One-time task completed, remove it self.task_store.delete_task(task['id']) logger.info(f"[Scheduler] One-time task completed and removed: {task['id']}") except Exception as e: @@ -128,30 +132,35 @@ class SchedulerService: try: next_run = _parse_naive_local(next_run_str) - # Check if task is overdue (e.g., service restart) if next_run < now: time_diff = (now - next_run).total_seconds() - - # If overdue by more than 5 minutes, skip this run and schedule next - if time_diff > 300: # 5 minutes - logger.warning(f"[Scheduler] Task {task['id']} is overdue by {int(time_diff)}s, skipping and scheduling next run") - - # For one-time tasks, remove them directly - schedule = task.get("schedule", {}) - if schedule.get("type") == "once": - self.task_store.delete_task(task['id']) - logger.info(f"[Scheduler] One-time task {task['id']} expired, removed") - return False - - # For recurring tasks, calculate next run from now - next_next_run = self._calculate_next_run(task, now) - if next_next_run: - self.task_store.update_task(task['id'], { - "next_run_at": next_next_run.isoformat() - }) - logger.info(f"[Scheduler] Rescheduled task {task['id']} to {next_next_run}") + schedule = task.get("schedule", {}) + schedule_type = schedule.get("type") + + # Catch-up window: fire if we're within 10 minutes of the + # scheduled tick. Beyond that we'd rather skip than push a + # stale daily report to the user. + if time_diff <= 600: + return True + + logger.warning( + f"[Scheduler] Task {task['id']} is overdue by {int(time_diff)}s, " + f"skipping and scheduling next run" + ) + + if schedule_type == "once": + self.task_store.delete_task(task['id']) + logger.info(f"[Scheduler] One-time task {task['id']} expired, removed") return False - + + next_next_run = self._calculate_next_run(task, now) + if next_next_run: + self.task_store.update_task(task['id'], { + "next_run_at": next_next_run.isoformat() + }) + logger.info(f"[Scheduler] Rescheduled task {task['id']} to {next_next_run}") + return False + return now >= next_run except Exception as e: logger.error( @@ -213,20 +222,22 @@ class SchedulerService: return None - def _execute_task(self, task: dict): + def _execute_task(self, task: dict) -> bool: """ - Execute a task - - Args: - task: Task dictionary + Execute a task. + + Returns True if delivery succeeded (caller should advance state), + False if it failed (caller should keep next_run_at so the next + loop iteration retries). Callback may return None for legacy + behaviour, treated as success. """ try: - # Call the execute callback - self.execute_callback(task) + result = self.execute_callback(task) + return False if result is False else True except Exception as e: logger.error(f"[Scheduler] Error executing task {task['id']}: {e}") - # Update task with error self.task_store.update_task(task['id'], { "last_error": str(e), "last_error_at": datetime.now().isoformat() }) + return False diff --git a/app.py b/app.py index be4ad2eb..eeb44e45 100644 --- a/app.py +++ b/app.py @@ -288,6 +288,16 @@ def _warmup_mcp_tools(): logger.warning(f"[App] MCP warmup failed (non-fatal): {e}") +def _warmup_scheduler(): + """Eager-init AgentBridge so the scheduler thread starts at process + boot rather than waiting for the first user message.""" + try: + from bridge.bridge import Bridge + Bridge().get_agent_bridge() + except Exception as e: + logger.warning(f"[App] Scheduler warmup failed: {e}") + + def _sync_builtin_skills(): """Sync builtin skills from project skills/ to workspace skills/ on startup.""" import shutil @@ -353,6 +363,8 @@ def run(): # latency isn't dominated by npx package downloads. _warmup_mcp_tools() + _warmup_scheduler() + logger.info(f"[App] Starting channels: {channel_names}") _channel_mgr = ChannelManager() diff --git a/bridge/agent_bridge.py b/bridge/agent_bridge.py index e60ffd9d..a20659a8 100644 --- a/bridge/agent_bridge.py +++ b/bridge/agent_bridge.py @@ -285,6 +285,15 @@ class AgentBridge: # Create helper instances self.initializer = AgentInitializer(bridge, self) + + # Eager-start the scheduler so cron tasks fire without waiting + # for the first user message. init_scheduler is idempotent. + try: + from agent.tools.scheduler.integration import init_scheduler + if init_scheduler(self): + self.scheduler_initialized = True + except Exception as e: + logger.warning(f"[AgentBridge] Eager scheduler init failed: {e}") def create_agent(self, system_prompt: str, tools: List = None, **kwargs) -> Agent: """ Create the super agent with COW integration diff --git a/channel/weixin/weixin_channel.py b/channel/weixin/weixin_channel.py index 61f5cbb1..309aecab 100644 --- a/channel/weixin/weixin_channel.py +++ b/channel/weixin/weixin_channel.py @@ -47,14 +47,16 @@ def _load_credentials(cred_path: str) -> dict: def _save_credentials(cred_path: str, data: dict): - """Save credentials to JSON file.""" + """Atomically save credentials to JSON file (tmp + rename).""" os.makedirs(os.path.dirname(cred_path), exist_ok=True) - with open(cred_path, "w") as f: + tmp_path = f"{cred_path}.tmp" + with open(tmp_path, "w") as f: json.dump(data, f, indent=2) try: - os.chmod(cred_path, 0o600) + os.chmod(tmp_path, 0o600) except Exception: pass + os.replace(tmp_path, cred_path) @singleton @@ -73,7 +75,10 @@ class WeixinChannel(ChatChannel): self.api = None self._stop_event = threading.Event() self._poll_thread = None - self._context_tokens = {} # user_id -> context_token + # user_id -> context_token. Guarded by _context_tokens_lock for any + # mutation that races with disk persistence. + self._context_tokens = {} + self._context_tokens_lock = threading.Lock() self._received_msgs = ExpiredDict(60 * 60 * 7.1) self._get_updates_buf = "" self._credentials_path = "" @@ -95,12 +100,19 @@ class WeixinChannel(ChatChannel): conf().get("weixin_credentials_path", "~/.weixin_cow_credentials.json") ) + # Always load credentials so we can restore context_tokens even when + # the bot token itself comes from config. + creds = _load_credentials(self._credentials_path) if not token: - creds = _load_credentials(self._credentials_path) token = creds.get("token", "") if creds.get("base_url"): base_url = creds["base_url"] + # Restore persisted context_tokens so scheduler can deliver pushes + # immediately after restart, without waiting for the user to ping + # the bot first. + self._restore_context_tokens_from_creds(creds) + if not token: token, base_url = self._login_with_retry(base_url) if not token: @@ -140,11 +152,16 @@ class WeixinChannel(ChatChannel): def _relogin(self) -> bool: """Re-login after session expiry. Returns True on success.""" base_url = self.api.base_url if self.api else DEFAULT_BASE_URL - if os.path.exists(self._credentials_path): - try: - os.remove(self._credentials_path) - except Exception: - pass + # Clearing the whole credentials file is intentional: the new login + # will issue a fresh `token` and persisted context_tokens belong to + # the previous bot identity, so they must not survive. + with self._context_tokens_lock: + self._context_tokens.clear() + if os.path.exists(self._credentials_path): + try: + os.remove(self._credentials_path) + except Exception: + pass self.login_status = self.LOGIN_STATUS_WAITING result = self._qr_login(base_url) if not result: @@ -156,9 +173,62 @@ class WeixinChannel(ChatChannel): cdn_base_url=self.api.cdn_base_url if self.api else CDN_BASE_URL, ) self.login_status = self.LOGIN_STATUS_OK - self._context_tokens.clear() return True + # ── Context token persistence ────────────────────────────────────── + # ilink requires every outbound send to echo the context_token from the + # user's latest inbound message. We mirror the in-memory map into the + # credentials JSON so scheduled pushes survive process restarts. + # All mutation + disk IO is serialized via _context_tokens_lock so that + # concurrent updates can never lose each other's writes. + + def _restore_context_tokens_from_creds(self, creds: dict) -> None: + if not isinstance(creds, dict): + return + tokens = creds.get("context_tokens") + if not isinstance(tokens, dict): + return + restored = 0 + with self._context_tokens_lock: + for user_id, token in tokens.items(): + if isinstance(user_id, str) and isinstance(token, str) and token: + self._context_tokens[user_id] = token + restored += 1 + if restored: + logger.info(f"[Weixin] Restored {restored} context_tokens from credentials") + + def _persist_context_tokens_locked(self) -> None: + """Flush the token map to disk. Caller must hold _context_tokens_lock.""" + if not self._credentials_path: + return + try: + creds = _load_credentials(self._credentials_path) or {} + creds["context_tokens"] = dict(self._context_tokens) + _save_credentials(self._credentials_path, creds) + except Exception as e: + logger.warning(f"[Weixin] Failed to persist context_tokens: {e}") + + def _update_context_token(self, user_id: str, token: str) -> None: + """Update the in-memory token for a user; flush to disk only on change.""" + if not user_id or not token: + return + with self._context_tokens_lock: + if self._context_tokens.get(user_id) == token: + return + self._context_tokens[user_id] = token + self._persist_context_tokens_locked() + + def _invalidate_context_token(self, user_id: str) -> None: + """Drop the cached token for a user (used after -14 / send rejection).""" + if not user_id: + return + with self._context_tokens_lock: + if user_id not in self._context_tokens: + return + del self._context_tokens[user_id] + logger.info(f"[Weixin] Invalidated stale context_token for {user_id}") + self._persist_context_tokens_locked() + # ── QR Login ─────────────────────────────────────────────────────── @staticmethod @@ -391,7 +461,7 @@ class WeixinChannel(ChatChannel): context_token = raw_msg.get("context_token", "") if context_token and from_user: - self._context_tokens[from_user] = context_token + self._update_context_token(from_user, context_token) cdn_base_url = self.api.cdn_base_url if self.api else CDN_BASE_URL try: @@ -510,10 +580,30 @@ class WeixinChannel(ChatChannel): return msg.context_token return self._context_tokens.get(receiver, "") + def _check_send_response(self, resp, receiver: str) -> None: + """Inspect a send-API response; drop stale context_token on -14. + + ilink uses ret/errcode = -14 to signal that the session (and any + cached context_token) is no longer valid. The plugin keeps running + because the bot itself can re-login; we just need to forget the + per-user token so the next push won't retry forever. + """ + if not isinstance(resp, dict): + return + ret = resp.get("ret") + errcode = resp.get("errcode") + if ret == -14 or errcode == -14: + logger.warning( + f"[Weixin] Send returned -14 (session expired) for " + f"receiver={receiver}; dropping cached context_token" + ) + self._invalidate_context_token(receiver) + def _send_text(self, text: str, receiver: str, context_token: str): if len(text) <= TEXT_CHUNK_LIMIT: try: - self.api.send_text(receiver, text, context_token) + resp = self.api.send_text(receiver, text, context_token) + self._check_send_response(resp, receiver) logger.debug(f"[Weixin] Text sent to {receiver}, len={len(text)}") except Exception as e: logger.error(f"[Weixin] Failed to send text: {e}") @@ -522,7 +612,8 @@ class WeixinChannel(ChatChannel): chunks = self._split_text(text, TEXT_CHUNK_LIMIT) for i, chunk in enumerate(chunks): try: - self.api.send_text(receiver, chunk, context_token) + resp = self.api.send_text(receiver, chunk, context_token) + self._check_send_response(resp, receiver) logger.debug(f"[Weixin] Text chunk {i+1}/{len(chunks)} sent to {receiver}, len={len(chunk)}") except Exception as e: logger.error(f"[Weixin] Failed to send text chunk {i+1}/{len(chunks)}: {e}") @@ -556,13 +647,14 @@ class WeixinChannel(ChatChannel): return try: result = upload_media_to_cdn(self.api, local_path, receiver, media_type=1) - self.api.send_image_item( + resp = self.api.send_image_item( to=receiver, context_token=context_token, encrypt_query_param=result["encrypt_query_param"], aes_key_b64=result["aes_key_b64"], ciphertext_size=result["ciphertext_size"], ) + self._check_send_response(resp, receiver) logger.info(f"[Weixin] Image sent to {receiver}") except Exception as e: logger.error(f"[Weixin] Image send failed: {e}") @@ -575,7 +667,7 @@ class WeixinChannel(ChatChannel): return try: result = upload_media_to_cdn(self.api, local_path, receiver, media_type=3) - self.api.send_file_item( + resp = self.api.send_file_item( to=receiver, context_token=context_token, encrypt_query_param=result["encrypt_query_param"], @@ -583,6 +675,7 @@ class WeixinChannel(ChatChannel): file_name=os.path.basename(local_path), file_size=result["raw_size"], ) + self._check_send_response(resp, receiver) logger.info(f"[Weixin] File sent to {receiver}") except Exception as e: logger.error(f"[Weixin] File send failed: {e}") @@ -595,13 +688,14 @@ class WeixinChannel(ChatChannel): return try: result = upload_media_to_cdn(self.api, local_path, receiver, media_type=2) - self.api.send_video_item( + resp = self.api.send_video_item( to=receiver, context_token=context_token, encrypt_query_param=result["encrypt_query_param"], aes_key_b64=result["aes_key_b64"], ciphertext_size=result["ciphertext_size"], ) + self._check_send_response(resp, receiver) logger.info(f"[Weixin] Video sent to {receiver}") except Exception as e: logger.error(f"[Weixin] Video send failed: {e}")