From f150d7d83a3beda9990a4b13b24a6c6dd475ad49 Mon Sep 17 00:00:00 2001 From: tingchim2pro Date: Sat, 2 May 2026 19:00:50 +0800 Subject: [PATCH] fix: remember scheduled task outputs in receiver session (v2) Address review feedback from #2794: 1. Use notify_session_id instead of receiver for correct group chat mapping - Task creation should store the real session_id in action.notify_session_id - Falls back to receiver for backward compatibility with old tasks 2. Add injection to all four execution branches: - _execute_agent_task - _execute_send_message - _execute_tool_call - _execute_skill_call (also fixed missing channel.send) 3. Add config switch and content truncation: - scheduler_inject_to_session (default: true) to toggle the feature - 2000 char limit prevents high-frequency tasks from bloating sessions Fixes #2793 --- agent/tools/scheduler/integration.py | 50 ++++++++++++++++++++++++++++ bridge/agent_bridge.py | 50 ++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+) diff --git a/agent/tools/scheduler/integration.py b/agent/tools/scheduler/integration.py index 949c9ff3..ac16a460 100644 --- a/agent/tools/scheduler/integration.py +++ b/agent/tools/scheduler/integration.py @@ -84,6 +84,35 @@ def get_scheduler_service(): return _scheduler_service +def _remember_delivered_output( + agent_bridge, + task: dict, + channel_type: str, + content: str, +) -> None: + """Best-effort persistence of the message the scheduler sent to a user. + + Uses notify_session_id (the real chat session_id stored at task creation time) + so that group chats correctly associate the output with the user's conversation. + Falls back to receiver for backward compatibility with old tasks. + """ + if not content: + return + action = task.get("action", {}) + session_id = action.get("notify_session_id") or action.get("receiver") + if not session_id: + return + try: + remember = getattr(agent_bridge, "remember_scheduled_output", None) + if remember: + task_desc = action.get("task_description") or action.get("content", "") + remember(session_id, str(content), channel_type=channel_type, task_description=task_desc) + except Exception as e: + logger.warning( + f"[Scheduler] Failed to remember delivered output for {session_id}: {e}" + ) + + def _execute_agent_task(task: dict, agent_bridge): """ Execute an agent_task action - let Agent handle the task @@ -165,6 +194,7 @@ def _execute_agent_task(task: dict, agent_bridge): # Send the reply channel.send(reply, context) + _remember_delivered_output(agent_bridge, task, channel_type, reply.content) logger.info(f"[Scheduler] Task {task['id']} executed successfully, result sent to {receiver}") else: logger.error(f"[Scheduler] Failed to create channel: {channel_type}") @@ -255,6 +285,7 @@ def _execute_send_message(task: dict, agent_bridge): logger.debug(f"[Scheduler] Registered request_id {request_id} -> session {receiver}") channel.send(reply, context) + _remember_delivered_output(agent_bridge, task, channel_type, content) logger.info(f"[Scheduler] Task {task['id']} executed: sent message to {receiver}") else: logger.error(f"[Scheduler] Failed to create channel: {channel_type}") @@ -351,6 +382,7 @@ def _execute_tool_call(task: dict, agent_bridge): logger.debug(f"[Scheduler] Registered request_id {request_id} -> session {receiver}") channel.send(reply, context) + _remember_delivered_output(agent_bridge, task, channel_type, content) logger.info(f"[Scheduler] Task {task['id']} executed: sent tool result to {receiver}") else: logger.error(f"[Scheduler] Failed to create channel: {channel_type}") @@ -429,6 +461,24 @@ def _execute_skill_call(task: dict, agent_bridge): if result_prefix: content = f"{result_prefix}\n\n{content}" + # Send the result via channel + from channel.channel_factory import create_channel + + try: + channel = create_channel(channel_type) + if channel: + # For web channel, register request_id + if channel_type == "web" and hasattr(channel, 'request_to_session'): + req_id = context.get("request_id") + if req_id: + channel.request_to_session[req_id] = receiver + logger.debug(f"[Scheduler] Registered request_id {req_id} -> session {receiver}") + + channel.send(Reply(ReplyType.TEXT, content), context) + _remember_delivered_output(agent_bridge, task, channel_type, content) + except Exception as e: + logger.error(f"[Scheduler] Failed to send skill result: {e}") + logger.info(f"[Scheduler] Task {task['id']} executed: skill result sent to {receiver}") else: logger.error(f"[Scheduler] Task {task['id']}: No result from skill execution") diff --git a/bridge/agent_bridge.py b/bridge/agent_bridge.py index 3701df77..f9266111 100644 --- a/bridge/agent_bridge.py +++ b/bridge/agent_bridge.py @@ -634,6 +634,56 @@ class AgentBridge: f"[AgentBridge] Failed to persist messages for session={session_id}: {e}" ) + def remember_scheduled_output( + self, + session_id: str, + content: str, + channel_type: str = "", + task_description: str = "", + ) -> None: + """Add the visible output of a scheduled task to the receiver's session. + + Scheduled task execution uses an isolated session so internal planning and + tool calls do not leak into the user's chat. The final message is still + part of the conversation from the user's point of view, so keep a small + visible turn in the receiver session for follow-up questions. + + Controlled by config key `scheduler_inject_to_session` (default: True). + Content is truncated to 2000 chars to prevent session bloat from high-frequency tasks. + """ + if not conf().get("scheduler_inject_to_session", True): + return + if not session_id or not content: + return + + # Truncate to prevent high-frequency tasks from bloating the session + max_len = 2000 + if len(content) > max_len: + content = content[:max_len] + "..." + + user_text = "Scheduled task" + if task_description: + user_text = f"{user_text}: {task_description}" + + messages = [ + {"role": "user", "content": [{"type": "text", "text": user_text}]}, + {"role": "assistant", "content": [{"type": "text", "text": content}]}, + ] + + # Update in-memory agent if it exists + agent = self.agents.get(session_id) + if agent: + try: + with agent.messages_lock: + agent.messages.extend(messages) + except Exception as e: + logger.warning( + f"[AgentBridge] Failed to update in-memory scheduled output " + f"for session={session_id}: {e}" + ) + + self._persist_messages(session_id, messages, channel_type) + @staticmethod def _strip_thinking_blocks(messages: list) -> list: """Return a shallow copy of messages with assistant "thinking" blocks removed."""