diff --git a/agent/memory/conversation_store.py b/agent/memory/conversation_store.py
index b4b21a74..b620986d 100644
--- a/agent/memory/conversation_store.py
+++ b/agent/memory/conversation_store.py
@@ -499,6 +499,107 @@ class ConversationStore:
finally:
conn.close()
+ def prune_scheduled_messages(
+ self,
+ session_id: str,
+ keep_last_n: int,
+ markers: Optional[List[str]] = None,
+ ) -> int:
+ """
+ Keep at most ``keep_last_n`` scheduler-injected user/assistant pairs in
+ the session, deleting the older ones.
+
+ A scheduler-injected pair is identified by a user message whose first
+ text block starts with one of ``markers``; the immediately following
+ assistant message (next seq) is treated as its paired output.
+
+ Only scheduler-tagged messages are touched; regular user turns are
+ never deleted. Safe to call repeatedly; no-op if nothing to prune.
+
+ Args:
+ session_id: Session to prune.
+ keep_last_n: Maximum scheduler pairs to retain (must be >= 0).
+ markers: Text prefixes that identify scheduler user messages.
+ Defaults to ``["[SCHEDULED]", "Scheduled task"]`` so that
+ pairs written by older versions are also recognised.
+
+ Returns:
+ Number of message rows deleted.
+ """
+ if keep_last_n < 0:
+ keep_last_n = 0
+ if markers is None:
+ markers = ["[SCHEDULED]", "Scheduled task"]
+
+ def _matches_marker(raw_content: str) -> bool:
+ try:
+ parsed = json.loads(raw_content)
+ except Exception:
+ parsed = raw_content
+ text = _extract_display_text(parsed) if not isinstance(parsed, str) else parsed
+ if not text:
+ return False
+ return any(text.startswith(m) for m in markers)
+
+ with self._lock:
+ conn = self._connect()
+ try:
+ rows = conn.execute(
+ """
+ SELECT seq, role, content
+ FROM messages
+ WHERE session_id = ?
+ ORDER BY seq ASC
+ """,
+ (session_id,),
+ ).fetchall()
+
+ # Find scheduler pairs: each is (user_seq, assistant_seq?)
+ pairs: List[tuple] = [] # list of (user_seq, assistant_seq_or_None)
+ for idx, (seq, role, raw_content) in enumerate(rows):
+ if role != "user" or not _matches_marker(raw_content):
+ continue
+ assistant_seq = None
+ # Pair with the very next message if it's an assistant turn.
+ if idx + 1 < len(rows):
+ next_seq, next_role, _ = rows[idx + 1]
+ if next_role == "assistant":
+ assistant_seq = next_seq
+ pairs.append((seq, assistant_seq))
+
+ if len(pairs) <= keep_last_n:
+ return 0
+
+ to_delete_pairs = pairs[: len(pairs) - keep_last_n]
+ seqs_to_delete: List[int] = []
+ for user_seq, assistant_seq in to_delete_pairs:
+ seqs_to_delete.append(user_seq)
+ if assistant_seq is not None:
+ seqs_to_delete.append(assistant_seq)
+
+ if not seqs_to_delete:
+ return 0
+
+ placeholders = ",".join("?" * len(seqs_to_delete))
+ with conn:
+ conn.execute(
+ f"DELETE FROM messages WHERE session_id = ? AND seq IN ({placeholders})",
+ (session_id, *seqs_to_delete),
+ )
+ conn.execute(
+ """
+ UPDATE sessions
+ SET msg_count = (
+ SELECT COUNT(*) FROM messages WHERE session_id = ?
+ )
+ WHERE session_id = ?
+ """,
+ (session_id, session_id),
+ )
+ return len(seqs_to_delete)
+ finally:
+ conn.close()
+
def cleanup_old_sessions(self, max_age_days: Optional[int] = None) -> int:
"""
Delete sessions that have not been active within max_age_days.
diff --git a/agent/tools/scheduler/integration.py b/agent/tools/scheduler/integration.py
index ac16a460..49d2fdfa 100644
--- a/agent/tools/scheduler/integration.py
+++ b/agent/tools/scheduler/integration.py
@@ -95,10 +95,24 @@ def _remember_delivered_output(
Uses notify_session_id (the real chat session_id stored at task creation time)
so that group chats correctly associate the output with the user's conversation.
Falls back to receiver for backward compatibility with old tasks.
+
+ Per-action-type behaviour:
+ - agent_task / tool_call / skill_call: gated by ``scheduler_inject_to_session``
+ (default True). These produce AI-generated content worth remembering.
+ - send_message: additionally gated by ``scheduler_inject_send_message``
+ (default False). Fixed reminder text rarely benefits follow-up Q&A and
+ would just consume context tokens.
"""
if not content:
return
action = task.get("action", {})
+ action_type = action.get("type", "")
+
+ # send_message defaults to NOT being injected; explicit opt-in via config.
+ if action_type == "send_message":
+ if not conf().get("scheduler_inject_send_message", False):
+ return
+
session_id = action.get("notify_session_id") or action.get("receiver")
if not session_id:
return
diff --git a/agent/tools/scheduler/scheduler_tool.py b/agent/tools/scheduler/scheduler_tool.py
index 885f4a7a..0f35b9ec 100644
--- a/agent/tools/scheduler/scheduler_tool.py
+++ b/agent/tools/scheduler/scheduler_tool.py
@@ -158,6 +158,11 @@ class SchedulerTool(BaseTool):
# Create task
task_id = str(uuid.uuid4())[:8]
+ # Capture the real chat session_id at task creation time so that scheduler
+ # can later inject the delivered output into the user's actual conversation
+ # (in group chats, session_id != receiver, e.g. "user_id:group_id" on feishu).
+ notify_session_id = context.get("session_id")
+
# Build action based on message or ai_task
if message:
action = {
@@ -166,7 +171,8 @@ class SchedulerTool(BaseTool):
"receiver": context.get("receiver"),
"receiver_name": self._get_receiver_name(context),
"is_group": context.get("isgroup", False),
- "channel_type": self.config.get("channel_type", "unknown")
+ "channel_type": self.config.get("channel_type", "unknown"),
+ "notify_session_id": notify_session_id,
}
else: # ai_task
action = {
@@ -175,7 +181,8 @@ class SchedulerTool(BaseTool):
"receiver": context.get("receiver"),
"receiver_name": self._get_receiver_name(context),
"is_group": context.get("isgroup", False),
- "channel_type": self.config.get("channel_type", "unknown")
+ "channel_type": self.config.get("channel_type", "unknown"),
+ "notify_session_id": notify_session_id,
}
# 针对钉钉单聊,额外存储 sender_staff_id
diff --git a/bridge/agent_bridge.py b/bridge/agent_bridge.py
index f9266111..7a1abfa3 100644
--- a/bridge/agent_bridge.py
+++ b/bridge/agent_bridge.py
@@ -634,6 +634,13 @@ class AgentBridge:
f"[AgentBridge] Failed to persist messages for session={session_id}: {e}"
)
+ # Marker used to identify scheduler-injected user messages so we can apply
+ # a sliding window without touching real user turns. The legacy prefix
+ # "Scheduled task" (written by the v2 PR) is also recognised when pruning,
+ # so old data can be aged out instead of leaking forever.
+ _SCHEDULED_MARKER = "[SCHEDULED]"
+ _SCHEDULED_LEGACY_MARKERS = ("Scheduled task",)
+
def remember_scheduled_output(
self,
session_id: str,
@@ -648,41 +655,119 @@ class AgentBridge:
part of the conversation from the user's point of view, so keep a small
visible turn in the receiver session for follow-up questions.
- Controlled by config key `scheduler_inject_to_session` (default: True).
- Content is truncated to 2000 chars to prevent session bloat from high-frequency tasks.
+ Configuration:
+ scheduler_inject_to_session (bool, default True):
+ Master switch. When False, this method is a no-op.
+ scheduler_inject_max_per_session (int, default 3):
+ Maximum scheduler-injected user/assistant pairs retained per
+ session. Older injections are pruned automatically.
+
+ Content is truncated to 2000 chars to prevent a single high-volume task
+ from bloating one entry.
"""
+ from config import conf
if not conf().get("scheduler_inject_to_session", True):
return
if not session_id or not content:
return
- # Truncate to prevent high-frequency tasks from bloating the session
max_len = 2000
if len(content) > max_len:
content = content[:max_len] + "..."
- user_text = "Scheduled task"
+ user_text = self._SCHEDULED_MARKER
if task_description:
- user_text = f"{user_text}: {task_description}"
+ user_text = f"{self._SCHEDULED_MARKER} {task_description}"
messages = [
{"role": "user", "content": [{"type": "text", "text": user_text}]},
{"role": "assistant", "content": [{"type": "text", "text": content}]},
]
- # Update in-memory agent if it exists
+ # Persist first so the new pair gets a stable seq, then prune old
+ # scheduler pairs in DB, then sync the in-memory agent.messages buffer.
+ self._persist_messages(session_id, messages, channel_type)
+
+ keep_last_n = max(int(conf().get("scheduler_inject_max_per_session", 3) or 0), 0)
+ try:
+ from agent.memory import get_conversation_store
+ deleted = get_conversation_store().prune_scheduled_messages(
+ session_id, keep_last_n=keep_last_n
+ )
+ if deleted:
+ logger.debug(
+ f"[AgentBridge] Pruned {deleted} old scheduler messages "
+ f"for session={session_id} (keep_last_n={keep_last_n})"
+ )
+ except Exception as e:
+ logger.warning(
+ f"[AgentBridge] Failed to prune scheduled messages "
+ f"for session={session_id}: {e}"
+ )
+
agent = self.agents.get(session_id)
if agent:
try:
with agent.messages_lock:
agent.messages.extend(messages)
+ self._prune_scheduled_in_memory(agent, keep_last_n)
except Exception as e:
logger.warning(
f"[AgentBridge] Failed to update in-memory scheduled output "
f"for session={session_id}: {e}"
)
- self._persist_messages(session_id, messages, channel_type)
+ @classmethod
+ def _prune_scheduled_in_memory(cls, agent, keep_last_n: int) -> None:
+ """Mirror conversation_store.prune_scheduled_messages on agent.messages.
+
+ Caller must hold ``agent.messages_lock``.
+ """
+ if keep_last_n < 0:
+ keep_last_n = 0
+
+ markers = (cls._SCHEDULED_MARKER,) + cls._SCHEDULED_LEGACY_MARKERS
+
+ def _is_marker_user(msg) -> bool:
+ if not isinstance(msg, dict) or msg.get("role") != "user":
+ return False
+ content = msg.get("content")
+ text = ""
+ if isinstance(content, str):
+ text = content
+ elif isinstance(content, list):
+ for block in content:
+ if isinstance(block, dict) and block.get("type") == "text":
+ text = block.get("text", "")
+ break
+ return any(text.startswith(m) for m in markers)
+
+ msgs = agent.messages
+ pair_indices = [] # list of (user_idx, assistant_idx_or_None)
+ for idx, msg in enumerate(msgs):
+ if not _is_marker_user(msg):
+ continue
+ assistant_idx = None
+ if idx + 1 < len(msgs):
+ nxt = msgs[idx + 1]
+ if isinstance(nxt, dict) and nxt.get("role") == "assistant":
+ assistant_idx = idx + 1
+ pair_indices.append((idx, assistant_idx))
+
+ if len(pair_indices) <= keep_last_n:
+ return
+
+ to_drop = pair_indices[: len(pair_indices) - keep_last_n]
+ drop_set = set()
+ for u_idx, a_idx in to_drop:
+ drop_set.add(u_idx)
+ if a_idx is not None:
+ drop_set.add(a_idx)
+
+ # Rebuild the list in place to keep external references stable.
+ kept = [m for i, m in enumerate(msgs) if i not in drop_set]
+ msgs.clear()
+ msgs.extend(kept)
@staticmethod
def _strip_thinking_blocks(messages: list) -> list:
diff --git a/docs/tools/scheduler.mdx b/docs/tools/scheduler.mdx
index 91b1789e..7cd6221b 100644
--- a/docs/tools/scheduler.mdx
+++ b/docs/tools/scheduler.mdx
@@ -38,3 +38,33 @@ description: 创建和管理定时任务
+
+## 结果进入会话上下文
+
+定时任务在隔离 session 中执行(内部规划与 tool 调用不污染用户会话),但**最终输出**会作为一对消息回写到接收者的真实会话,用户可以直接追问"刚才那条第二点展开说说"。
+
+**默认策略**
+
+- Agent 动态任务的输出进入上下文
+- 固定消息类任务默认不进入上下文(可通过配置打开)
+- 每个会话最多保留最近 **3 对** scheduler 消息,更早的自动清理;普通用户消息不受影响
+
+**配置项**
+
+| 配置项 | 默认值 | 说明 |
+| --- | --- | --- |
+| `scheduler_inject_to_session` | `true` | 总开关 |
+| `scheduler_inject_max_per_session` | `3` | 每会话保留 scheduler 消息对数上限 |
+| `scheduler_inject_send_message` | `false` | 是否同时注入固定消息类任务 |
+
+```json
+{
+ "scheduler_inject_to_session": true,
+ "scheduler_inject_max_per_session": 3,
+ "scheduler_inject_send_message": false
+}
+```
+
+
+群聊场景(飞书 / 企微群机器人 / 钉钉等)下用户的真实 session_id 形如 `user_id:group_id`,与 receiver 不同。创建任务时会自动记录正确的 session_id;老的 `tasks.json` 缺该字段时回落到 receiver,行为与历史版本一致。
+