From aea081703f75fcd213806fe73cfe12d35b70148b Mon Sep 17 00:00:00 2001
From: zhayujie
Date: Sun, 3 May 2026 21:27:24 +0800
Subject: [PATCH] fix(scheduler): inject delivered output into receiver session with sliding window
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Further refinements on top of #2795:
- persist real session_id (notify_session_id) at task creation so group chats correctly map back to the user's actual conversation
- mark scheduler turns with [SCHEDULED] (recognise legacy "Scheduled task" prefix too for backward-compatible pruning)
- prune both DB and in-memory to scheduler_inject_max_per_session (default 3), only marker-tagged pairs are touched; regular user turns never deleted
- send_message type gated by scheduler_inject_send_message (default false) — fixed reminder text rarely benefits follow-up Q&A

Co-authored-by: huangrichao2020
---
 agent/memory/conversation_store.py      | 101 ++++++++++++++++++++++++
 agent/tools/scheduler/integration.py    |  14 ++++
 agent/tools/scheduler/scheduler_tool.py |  11 ++-
 bridge/agent_bridge.py                  |  99 +++++++++++++++++++++--
 docs/tools/scheduler.mdx                |  30 +++++++
 5 files changed, 246 insertions(+), 9 deletions(-)

diff --git a/agent/memory/conversation_store.py b/agent/memory/conversation_store.py
index b4b21a74..b620986d 100644
--- a/agent/memory/conversation_store.py
+++ b/agent/memory/conversation_store.py
@@ -499,6 +499,107 @@ class ConversationStore:
             finally:
                 conn.close()
 
+    def prune_scheduled_messages(
+        self,
+        session_id: str,
+        keep_last_n: int,
+        markers: Optional[List[str]] = None,
+    ) -> int:
+        """
+        Keep at most ``keep_last_n`` scheduler-injected user/assistant pairs in
+        the session, deleting the older ones.
+
+        A scheduler-injected pair is identified by a user message whose display
+        text starts with one of ``markers``; the immediately following
+        assistant message (next seq) is treated as its paired output.
+
+        Only scheduler-tagged messages are touched; regular user turns are
+        never deleted. Safe to call repeatedly; no-op if nothing to prune.
+
+        Args:
+            session_id: Session to prune.
+            keep_last_n: Maximum scheduler pairs to retain; negative means 0.
+            markers: Text prefixes that identify scheduler user messages.
+                Defaults to ``["[SCHEDULED]", "Scheduled task"]`` so that
+                pairs written by older versions are also recognised.
+
+        Returns:
+            Number of message rows deleted.
+        """
+        keep_last_n = max(keep_last_n, 0)
+        if markers is None:
+            markers = ["[SCHEDULED]", "Scheduled task"]
+        prefixes = tuple(markers)  # str.startswith accepts a tuple of prefixes
+
+        def _matches_marker(raw_content: str) -> bool:
+            try:
+                parsed = json.loads(raw_content)
+            except (TypeError, ValueError):
+                # Not a JSON document: fall back to the stored value itself.
+                parsed = raw_content
+            text = parsed if isinstance(parsed, str) else _extract_display_text(parsed)
+            # Empty or non-string text can never carry a marker.
+            return isinstance(text, str) and bool(text) and text.startswith(prefixes)
+
+        with self._lock:
+            conn = self._connect()
+            try:
+                rows = conn.execute(
+                    """
+                    SELECT seq, role, content
+                    FROM messages
+                    WHERE session_id = ?
+                    ORDER BY seq ASC
+                    """,
+                    (session_id,),
+                ).fetchall()
+
+                # Collect scheduler pairs as (user_seq, assistant_seq_or_None).
+                pairs: List[tuple] = []
+                for idx, (seq, role, raw_content) in enumerate(rows):
+                    if role != "user" or not _matches_marker(raw_content):
+                        continue
+                    assistant_seq = None
+                    # Pair with the very next message if it's an assistant turn.
+                    if idx + 1 < len(rows):
+                        next_seq, next_role, _ = rows[idx + 1]
+                        if next_role == "assistant":
+                            assistant_seq = next_seq
+                    pairs.append((seq, assistant_seq))
+
+                if len(pairs) <= keep_last_n:
+                    return 0
+
+                # Everything but the newest ``keep_last_n`` pairs is deleted.
+                stale_pairs = pairs[: len(pairs) - keep_last_n]
+                seqs_to_delete: List[int] = [
+                    s for pair in stale_pairs for s in pair if s is not None
+                ]
+                if not seqs_to_delete:
+                    return 0
+
+                with conn:
+                    # One statement per row instead of a single ``seq IN (...)`` list:
+                    # with a long legacy backlog that list could exceed SQLite's bound-
+                    # parameter limit (999 before 3.32). NOTE(review): assumes sqlite3.
+                    conn.executemany(
+                        "DELETE FROM messages WHERE session_id = ? AND seq = ?",
+                        [(session_id, s) for s in seqs_to_delete],
+                    )
+                    conn.execute(
+                        """
+                        UPDATE sessions
+                        SET msg_count = (
+                            SELECT COUNT(*) FROM messages WHERE session_id = ?
+                        )
+                        WHERE session_id = ?
+                        """,
+                        (session_id, session_id),
+                    )
+                return len(seqs_to_delete)
+            finally:
+                conn.close()
+
     def cleanup_old_sessions(self, max_age_days: Optional[int] = None) -> int:
         """
         Delete sessions that have not been active within max_age_days.
diff --git a/agent/tools/scheduler/integration.py b/agent/tools/scheduler/integration.py
index ac16a460..49d2fdfa 100644
--- a/agent/tools/scheduler/integration.py
+++ b/agent/tools/scheduler/integration.py
@@ -95,10 +95,24 @@ def _remember_delivered_output(
     Uses notify_session_id (the real chat session_id stored at task creation time)
     so that group chats correctly associate the output with the user's conversation.
     Falls back to receiver for backward compatibility with old tasks.
+
+    Per-action-type behaviour:
+    - agent_task / tool_call / skill_call: gated by ``scheduler_inject_to_session``
+      (default True). These produce AI-generated content worth remembering.
+    - send_message: additionally gated by ``scheduler_inject_send_message``
+      (default False). Fixed reminder text rarely benefits follow-up Q&A and
+      would just consume context tokens.
     """
     if not content:
         return
     action = task.get("action", {})
+    action_type = action.get("type", "")
+
+    # send_message defaults to NOT being injected; explicit opt-in via config.
+ if action_type == "send_message": + if not conf().get("scheduler_inject_send_message", False): + return + session_id = action.get("notify_session_id") or action.get("receiver") if not session_id: return diff --git a/agent/tools/scheduler/scheduler_tool.py b/agent/tools/scheduler/scheduler_tool.py index 885f4a7a..0f35b9ec 100644 --- a/agent/tools/scheduler/scheduler_tool.py +++ b/agent/tools/scheduler/scheduler_tool.py @@ -158,6 +158,11 @@ class SchedulerTool(BaseTool): # Create task task_id = str(uuid.uuid4())[:8] + # Capture the real chat session_id at task creation time so that scheduler + # can later inject the delivered output into the user's actual conversation + # (in group chats, session_id != receiver, e.g. "user_id:group_id" on feishu). + notify_session_id = context.get("session_id") + # Build action based on message or ai_task if message: action = { @@ -166,7 +171,8 @@ class SchedulerTool(BaseTool): "receiver": context.get("receiver"), "receiver_name": self._get_receiver_name(context), "is_group": context.get("isgroup", False), - "channel_type": self.config.get("channel_type", "unknown") + "channel_type": self.config.get("channel_type", "unknown"), + "notify_session_id": notify_session_id, } else: # ai_task action = { @@ -175,7 +181,8 @@ class SchedulerTool(BaseTool): "receiver": context.get("receiver"), "receiver_name": self._get_receiver_name(context), "is_group": context.get("isgroup", False), - "channel_type": self.config.get("channel_type", "unknown") + "channel_type": self.config.get("channel_type", "unknown"), + "notify_session_id": notify_session_id, } # 针对钉钉单聊,额外存储 sender_staff_id diff --git a/bridge/agent_bridge.py b/bridge/agent_bridge.py index f9266111..7a1abfa3 100644 --- a/bridge/agent_bridge.py +++ b/bridge/agent_bridge.py @@ -634,6 +634,13 @@ class AgentBridge: f"[AgentBridge] Failed to persist messages for session={session_id}: {e}" ) + # Marker used to identify scheduler-injected user messages so we can apply + # a sliding window 
without touching real user turns. The legacy prefix + # "Scheduled task" (written by the v2 PR) is also recognised when pruning, + # so old data can be aged out instead of leaking forever. + _SCHEDULED_MARKER = "[SCHEDULED]" + _SCHEDULED_LEGACY_MARKERS = ("Scheduled task",) + def remember_scheduled_output( self, session_id: str, @@ -648,41 +655,119 @@ class AgentBridge: part of the conversation from the user's point of view, so keep a small visible turn in the receiver session for follow-up questions. - Controlled by config key `scheduler_inject_to_session` (default: True). - Content is truncated to 2000 chars to prevent session bloat from high-frequency tasks. + Configuration: + scheduler_inject_to_session (bool, default True): + Master switch. When False, this method is a no-op. + scheduler_inject_max_per_session (int, default 3): + Maximum scheduler-injected user/assistant pairs retained per + session. Older injections are pruned automatically. + + Content is truncated to 2000 chars to prevent a single high-volume task + from bloating one entry. """ + from config import conf if not conf().get("scheduler_inject_to_session", True): return if not session_id or not content: return - # Truncate to prevent high-frequency tasks from bloating the session max_len = 2000 if len(content) > max_len: content = content[:max_len] + "..." - user_text = "Scheduled task" + user_text = self._SCHEDULED_MARKER if task_description: - user_text = f"{user_text}: {task_description}" + user_text = f"{self._SCHEDULED_MARKER} {task_description}" messages = [ {"role": "user", "content": [{"type": "text", "text": user_text}]}, {"role": "assistant", "content": [{"type": "text", "text": content}]}, ] - # Update in-memory agent if it exists + # Persist first so the new pair gets a stable seq, then prune old + # scheduler pairs in DB, then sync the in-memory agent.messages buffer. 
+        self._persist_messages(session_id, messages, channel_type)
+
+        keep_last_n = max(int(conf().get("scheduler_inject_max_per_session", 3) or 0), 0)
+        try:
+            from agent.memory import get_conversation_store
+            deleted = get_conversation_store().prune_scheduled_messages(
+                session_id, keep_last_n=keep_last_n
+            )
+            if deleted:
+                logger.debug(
+                    f"[AgentBridge] Pruned {deleted} old scheduler messages "
+                    f"for session={session_id} (keep_last_n={keep_last_n})"
+                )
+        except Exception as e:
+            logger.warning(
+                f"[AgentBridge] Failed to prune scheduled messages "
+                f"for session={session_id}: {e}"
+            )
+
         agent = self.agents.get(session_id)
         if agent:
             try:
                 with agent.messages_lock:
                     agent.messages.extend(messages)
+                    self._prune_scheduled_in_memory(agent, keep_last_n)
             except Exception as e:
                 logger.warning(
                     f"[AgentBridge] Failed to update in-memory scheduled output "
                     f"for session={session_id}: {e}"
                 )
 
-        self._persist_messages(session_id, messages, channel_type)
+    @classmethod
+    def _prune_scheduled_in_memory(cls, agent, keep_last_n: int) -> None:
+        """Mirror conversation_store.prune_scheduled_messages on agent.messages.
+
+        Drops all but the newest ``keep_last_n`` marker-tagged user messages,
+        each together with an immediately following assistant message, and
+        rewrites ``agent.messages`` in place. Other messages are kept.
+
+        Caller must hold ``agent.messages_lock``.
+        """
+        keep_last_n = max(keep_last_n, 0)
+        markers = (cls._SCHEDULED_MARKER,) + cls._SCHEDULED_LEGACY_MARKERS
+
+        def _is_marker_user(msg) -> bool:
+            if not isinstance(msg, dict) or msg.get("role") != "user":
+                return False
+            content = msg.get("content")
+            text = content
+            if isinstance(content, list):
+                # Only the first text block decides whether this is a marker turn.
+                text_blocks = (
+                    b for b in content if isinstance(b, dict) and b.get("type") == "text"
+                )
+                text = next(text_blocks, {}).get("text")
+            # A missing or non-string text must not make ``startswith`` raise (which
+            # would abort the prune); such a message is simply not a scheduler turn.
+            return isinstance(text, str) and text.startswith(markers)
+
+        msgs = agent.messages
+        pair_indices = []  # list of (user_idx, assistant_idx_or_None)
+        for idx, msg in enumerate(msgs):
+            if not _is_marker_user(msg):
+                continue
+            assistant_idx = None
+            if idx + 1 < len(msgs):
+                nxt = msgs[idx + 1]
+                if isinstance(nxt, dict) and nxt.get("role") == "assistant":
+                    assistant_idx = idx + 1
+            pair_indices.append((idx, assistant_idx))
+
+        if len(pair_indices) <= keep_last_n:
+            return
+
+        # Everything but the newest ``keep_last_n`` pairs is dropped.
+        stale = pair_indices[: len(pair_indices) - keep_last_n]
+        drop_set = {i for pair in stale for i in pair if i is not None}
+
+        # Rebuild the list in place to keep external references stable.
+        kept = [m for i, m in enumerate(msgs) if i not in drop_set]
+        msgs.clear()
+        msgs.extend(kept)
 
     @staticmethod
     def _strip_thinking_blocks(messages: list) -> list:
diff --git a/docs/tools/scheduler.mdx b/docs/tools/scheduler.mdx
index 91b1789e..7cd6221b 100644
--- a/docs/tools/scheduler.mdx
+++ b/docs/tools/scheduler.mdx
@@ -38,3 +38,33 @@ description: 创建和管理定时任务
+
+## 结果进入会话上下文
+
+定时任务在隔离 session 中执行(内部规划与 tool 调用不污染用户会话),但**最终输出**会作为一对消息回写到接收者的真实会话,用户可以直接追问"刚才那条第二点展开说说"。
+
+**默认策略**
+
+- Agent 动态任务的输出进入上下文
+- 固定消息类任务默认不进入上下文(可通过配置打开)
+- 每个会话最多保留最近 **3 对** scheduler 消息,更早的自动清理;普通用户消息不受影响
+
+**配置项**
+
+| 配置项 | 默认值 | 说明 |
+| --- | --- | --- |
+| `scheduler_inject_to_session` | `true` | 总开关 |
+| `scheduler_inject_max_per_session` | `3` | 每会话保留 scheduler 消息对数上限 |
+| `scheduler_inject_send_message` | `false` | 是否同时注入固定消息类任务 |
+
+```json
+{
+  "scheduler_inject_to_session": true,
+  "scheduler_inject_max_per_session": 3,
+  "scheduler_inject_send_message": false
+}
+```
+
+
+群聊场景(飞书 / 企微群机器人 / 钉钉等)下用户的真实 session_id 形如 `user_id:group_id`,与 receiver 不同。创建任务时会自动记录正确的 session_id;老的 `tasks.json` 缺该字段时回落到 receiver,行为与历史版本一致。
+