fix: remember scheduled task outputs in receiver session (v2)

Address review feedback from #2794:

1. Use notify_session_id instead of receiver for correct group chat mapping
   - Task creation should store the real session_id in action.notify_session_id
   - Falls back to receiver for backward compatibility with old tasks

2. Add injection to all four execution branches:
   - _execute_agent_task
   - _execute_send_message
   - _execute_tool_call
   - _execute_skill_call (also fixed missing channel.send)

3. Add config switch and content truncation:
   - scheduler_inject_to_session (default: true) to toggle the feature
   - 2000 char limit prevents high-frequency tasks from bloating sessions

Fixes #2793
This commit is contained in:
tingchim2pro
2026-05-02 19:00:50 +08:00
parent 02bfe30848
commit f150d7d83a
2 changed files with 100 additions and 0 deletions

View File

@@ -84,6 +84,35 @@ def get_scheduler_service():
return _scheduler_service return _scheduler_service
def _remember_delivered_output(
agent_bridge,
task: dict,
channel_type: str,
content: str,
) -> None:
"""Best-effort persistence of the message the scheduler sent to a user.
Uses notify_session_id (the real chat session_id stored at task creation time)
so that group chats correctly associate the output with the user's conversation.
Falls back to receiver for backward compatibility with old tasks.
"""
if not content:
return
action = task.get("action", {})
session_id = action.get("notify_session_id") or action.get("receiver")
if not session_id:
return
try:
remember = getattr(agent_bridge, "remember_scheduled_output", None)
if remember:
task_desc = action.get("task_description") or action.get("content", "")
remember(session_id, str(content), channel_type=channel_type, task_description=task_desc)
except Exception as e:
logger.warning(
f"[Scheduler] Failed to remember delivered output for {session_id}: {e}"
)
def _execute_agent_task(task: dict, agent_bridge): def _execute_agent_task(task: dict, agent_bridge):
""" """
Execute an agent_task action - let Agent handle the task Execute an agent_task action - let Agent handle the task
@@ -165,6 +194,7 @@ def _execute_agent_task(task: dict, agent_bridge):
# Send the reply # Send the reply
channel.send(reply, context) channel.send(reply, context)
_remember_delivered_output(agent_bridge, task, channel_type, reply.content)
logger.info(f"[Scheduler] Task {task['id']} executed successfully, result sent to {receiver}") logger.info(f"[Scheduler] Task {task['id']} executed successfully, result sent to {receiver}")
else: else:
logger.error(f"[Scheduler] Failed to create channel: {channel_type}") logger.error(f"[Scheduler] Failed to create channel: {channel_type}")
@@ -255,6 +285,7 @@ def _execute_send_message(task: dict, agent_bridge):
logger.debug(f"[Scheduler] Registered request_id {request_id} -> session {receiver}") logger.debug(f"[Scheduler] Registered request_id {request_id} -> session {receiver}")
channel.send(reply, context) channel.send(reply, context)
_remember_delivered_output(agent_bridge, task, channel_type, content)
logger.info(f"[Scheduler] Task {task['id']} executed: sent message to {receiver}") logger.info(f"[Scheduler] Task {task['id']} executed: sent message to {receiver}")
else: else:
logger.error(f"[Scheduler] Failed to create channel: {channel_type}") logger.error(f"[Scheduler] Failed to create channel: {channel_type}")
@@ -351,6 +382,7 @@ def _execute_tool_call(task: dict, agent_bridge):
logger.debug(f"[Scheduler] Registered request_id {request_id} -> session {receiver}") logger.debug(f"[Scheduler] Registered request_id {request_id} -> session {receiver}")
channel.send(reply, context) channel.send(reply, context)
_remember_delivered_output(agent_bridge, task, channel_type, content)
logger.info(f"[Scheduler] Task {task['id']} executed: sent tool result to {receiver}") logger.info(f"[Scheduler] Task {task['id']} executed: sent tool result to {receiver}")
else: else:
logger.error(f"[Scheduler] Failed to create channel: {channel_type}") logger.error(f"[Scheduler] Failed to create channel: {channel_type}")
@@ -429,6 +461,24 @@ def _execute_skill_call(task: dict, agent_bridge):
if result_prefix: if result_prefix:
content = f"{result_prefix}\n\n{content}" content = f"{result_prefix}\n\n{content}"
# Send the result via channel
from channel.channel_factory import create_channel
try:
channel = create_channel(channel_type)
if channel:
# For web channel, register request_id
if channel_type == "web" and hasattr(channel, 'request_to_session'):
req_id = context.get("request_id")
if req_id:
channel.request_to_session[req_id] = receiver
logger.debug(f"[Scheduler] Registered request_id {req_id} -> session {receiver}")
channel.send(Reply(ReplyType.TEXT, content), context)
_remember_delivered_output(agent_bridge, task, channel_type, content)
except Exception as e:
logger.error(f"[Scheduler] Failed to send skill result: {e}")
logger.info(f"[Scheduler] Task {task['id']} executed: skill result sent to {receiver}") logger.info(f"[Scheduler] Task {task['id']} executed: skill result sent to {receiver}")
else: else:
logger.error(f"[Scheduler] Task {task['id']}: No result from skill execution") logger.error(f"[Scheduler] Task {task['id']}: No result from skill execution")

View File

@@ -634,6 +634,56 @@ class AgentBridge:
f"[AgentBridge] Failed to persist messages for session={session_id}: {e}" f"[AgentBridge] Failed to persist messages for session={session_id}: {e}"
) )
def remember_scheduled_output(
self,
session_id: str,
content: str,
channel_type: str = "",
task_description: str = "",
) -> None:
"""Add the visible output of a scheduled task to the receiver's session.
Scheduled task execution uses an isolated session so internal planning and
tool calls do not leak into the user's chat. The final message is still
part of the conversation from the user's point of view, so keep a small
visible turn in the receiver session for follow-up questions.
Controlled by config key `scheduler_inject_to_session` (default: True).
Content is truncated to 2000 chars to prevent session bloat from high-frequency tasks.
"""
if not conf().get("scheduler_inject_to_session", True):
return
if not session_id or not content:
return
# Truncate to prevent high-frequency tasks from bloating the session
max_len = 2000
if len(content) > max_len:
content = content[:max_len] + "..."
user_text = "Scheduled task"
if task_description:
user_text = f"{user_text}: {task_description}"
messages = [
{"role": "user", "content": [{"type": "text", "text": user_text}]},
{"role": "assistant", "content": [{"type": "text", "text": content}]},
]
# Update in-memory agent if it exists
agent = self.agents.get(session_id)
if agent:
try:
with agent.messages_lock:
agent.messages.extend(messages)
except Exception as e:
logger.warning(
f"[AgentBridge] Failed to update in-memory scheduled output "
f"for session={session_id}: {e}"
)
self._persist_messages(session_id, messages, channel_type)
@staticmethod @staticmethod
def _strip_thinking_blocks(messages: list) -> list: def _strip_thinking_blocks(messages: list) -> list:
"""Return a shallow copy of messages with assistant "thinking" blocks removed.""" """Return a shallow copy of messages with assistant "thinking" blocks removed."""