fix(scheduler): inject delivered output into receiver session with sliding window

Further refinements on top of #2795:

- persist real session_id (notify_session_id) at task creation so group chats
  correctly map back to the user's actual conversation
- mark scheduler turns with [SCHEDULED] (recognise legacy "Scheduled task"
  prefix too for backward-compatible pruning)
- prune both DB and in-memory to scheduler_inject_max_per_session (default 3),
  only marker-tagged pairs are touched; regular user turns never deleted
- send_message type gated by scheduler_inject_send_message (default false) —
  fixed reminder text rarely benefits follow-up Q&A

Co-authored-by: huangrichao2020 <grdomai43881@gmail.com>
This commit is contained in:
zhayujie
2026-05-03 21:27:24 +08:00
parent f300d2a2d5
commit aea081703f
5 changed files with 246 additions and 9 deletions

View File

@@ -499,6 +499,107 @@ class ConversationStore:
finally:
conn.close()
def prune_scheduled_messages(
self,
session_id: str,
keep_last_n: int,
markers: Optional[List[str]] = None,
) -> int:
"""
Keep at most ``keep_last_n`` scheduler-injected user/assistant pairs in
the session, deleting the older ones.
A scheduler-injected pair is identified by a user message whose first
text block starts with one of ``markers``; the immediately following
assistant message (next seq) is treated as its paired output.
Only scheduler-tagged messages are touched; regular user turns are
never deleted. Safe to call repeatedly; no-op if nothing to prune.
Args:
session_id: Session to prune.
keep_last_n: Maximum scheduler pairs to retain (must be >= 0).
markers: Text prefixes that identify scheduler user messages.
Defaults to ``["[SCHEDULED]", "Scheduled task"]`` so that
pairs written by older versions are also recognised.
Returns:
Number of message rows deleted.
"""
if keep_last_n < 0:
keep_last_n = 0
if markers is None:
markers = ["[SCHEDULED]", "Scheduled task"]
def _matches_marker(raw_content: str) -> bool:
try:
parsed = json.loads(raw_content)
except Exception:
parsed = raw_content
text = _extract_display_text(parsed) if not isinstance(parsed, str) else parsed
if not text:
return False
return any(text.startswith(m) for m in markers)
with self._lock:
conn = self._connect()
try:
rows = conn.execute(
"""
SELECT seq, role, content
FROM messages
WHERE session_id = ?
ORDER BY seq ASC
""",
(session_id,),
).fetchall()
# Find scheduler pairs: each is (user_seq, assistant_seq?)
pairs: List[tuple] = [] # list of (user_seq, assistant_seq_or_None)
for idx, (seq, role, raw_content) in enumerate(rows):
if role != "user" or not _matches_marker(raw_content):
continue
assistant_seq = None
# Pair with the very next message if it's an assistant turn.
if idx + 1 < len(rows):
next_seq, next_role, _ = rows[idx + 1]
if next_role == "assistant":
assistant_seq = next_seq
pairs.append((seq, assistant_seq))
if len(pairs) <= keep_last_n:
return 0
to_delete_pairs = pairs[: len(pairs) - keep_last_n]
seqs_to_delete: List[int] = []
for user_seq, assistant_seq in to_delete_pairs:
seqs_to_delete.append(user_seq)
if assistant_seq is not None:
seqs_to_delete.append(assistant_seq)
if not seqs_to_delete:
return 0
placeholders = ",".join("?" * len(seqs_to_delete))
with conn:
conn.execute(
f"DELETE FROM messages WHERE session_id = ? AND seq IN ({placeholders})",
(session_id, *seqs_to_delete),
)
conn.execute(
"""
UPDATE sessions
SET msg_count = (
SELECT COUNT(*) FROM messages WHERE session_id = ?
)
WHERE session_id = ?
""",
(session_id, session_id),
)
return len(seqs_to_delete)
finally:
conn.close()
def cleanup_old_sessions(self, max_age_days: Optional[int] = None) -> int:
"""
Delete sessions that have not been active within max_age_days.

View File

@@ -95,10 +95,24 @@ def _remember_delivered_output(
Uses notify_session_id (the real chat session_id stored at task creation time)
so that group chats correctly associate the output with the user's conversation.
Falls back to receiver for backward compatibility with old tasks.
Per-action-type behaviour:
- agent_task / tool_call / skill_call: gated by ``scheduler_inject_to_session``
(default True). These produce AI-generated content worth remembering.
- send_message: additionally gated by ``scheduler_inject_send_message``
(default False). Fixed reminder text rarely benefits follow-up Q&A and
would just consume context tokens.
"""
if not content:
return
action = task.get("action", {})
action_type = action.get("type", "")
# send_message defaults to NOT being injected; explicit opt-in via config.
if action_type == "send_message":
if not conf().get("scheduler_inject_send_message", False):
return
session_id = action.get("notify_session_id") or action.get("receiver")
if not session_id:
return

View File

@@ -158,6 +158,11 @@ class SchedulerTool(BaseTool):
# Create task
task_id = str(uuid.uuid4())[:8]
# Capture the real chat session_id at task creation time so that scheduler
# can later inject the delivered output into the user's actual conversation
# (in group chats, session_id != receiver, e.g. "user_id:group_id" on feishu).
notify_session_id = context.get("session_id")
# Build action based on message or ai_task
if message:
action = {
@@ -166,7 +171,8 @@ class SchedulerTool(BaseTool):
"receiver": context.get("receiver"),
"receiver_name": self._get_receiver_name(context),
"is_group": context.get("isgroup", False),
"channel_type": self.config.get("channel_type", "unknown")
"channel_type": self.config.get("channel_type", "unknown"),
"notify_session_id": notify_session_id,
}
else: # ai_task
action = {
@@ -175,7 +181,8 @@ class SchedulerTool(BaseTool):
"receiver": context.get("receiver"),
"receiver_name": self._get_receiver_name(context),
"is_group": context.get("isgroup", False),
"channel_type": self.config.get("channel_type", "unknown")
"channel_type": self.config.get("channel_type", "unknown"),
"notify_session_id": notify_session_id,
}
# 针对钉钉单聊,额外存储 sender_staff_id

View File

@@ -634,6 +634,13 @@ class AgentBridge:
f"[AgentBridge] Failed to persist messages for session={session_id}: {e}"
)
# Marker used to identify scheduler-injected user messages so we can apply
# a sliding window without touching real user turns. The legacy prefix
# "Scheduled task" (written by the v2 PR) is also recognised when pruning,
# so old data can be aged out instead of leaking forever.
_SCHEDULED_MARKER = "[SCHEDULED]"
_SCHEDULED_LEGACY_MARKERS = ("Scheduled task",)
def remember_scheduled_output(
self,
session_id: str,
@@ -648,41 +655,119 @@ class AgentBridge:
part of the conversation from the user's point of view, so keep a small
visible turn in the receiver session for follow-up questions.
Controlled by config key `scheduler_inject_to_session` (default: True).
Content is truncated to 2000 chars to prevent session bloat from high-frequency tasks.
Configuration:
scheduler_inject_to_session (bool, default True):
Master switch. When False, this method is a no-op.
scheduler_inject_max_per_session (int, default 3):
Maximum scheduler-injected user/assistant pairs retained per
session. Older injections are pruned automatically.
Content is truncated to 2000 chars to prevent a single high-volume task
from bloating one entry.
"""
from config import conf
if not conf().get("scheduler_inject_to_session", True):
return
if not session_id or not content:
return
# Truncate to prevent high-frequency tasks from bloating the session
max_len = 2000
if len(content) > max_len:
content = content[:max_len] + "..."
user_text = "Scheduled task"
user_text = self._SCHEDULED_MARKER
if task_description:
user_text = f"{user_text}: {task_description}"
user_text = f"{self._SCHEDULED_MARKER} {task_description}"
messages = [
{"role": "user", "content": [{"type": "text", "text": user_text}]},
{"role": "assistant", "content": [{"type": "text", "text": content}]},
]
# Update in-memory agent if it exists
# Persist first so the new pair gets a stable seq, then prune old
# scheduler pairs in DB, then sync the in-memory agent.messages buffer.
self._persist_messages(session_id, messages, channel_type)
keep_last_n = max(int(conf().get("scheduler_inject_max_per_session", 3) or 0), 0)
try:
from agent.memory import get_conversation_store
deleted = get_conversation_store().prune_scheduled_messages(
session_id, keep_last_n=keep_last_n
)
if deleted:
logger.debug(
f"[AgentBridge] Pruned {deleted} old scheduler messages "
f"for session={session_id} (keep_last_n={keep_last_n})"
)
except Exception as e:
logger.warning(
f"[AgentBridge] Failed to prune scheduled messages "
f"for session={session_id}: {e}"
)
agent = self.agents.get(session_id)
if agent:
try:
with agent.messages_lock:
agent.messages.extend(messages)
self._prune_scheduled_in_memory(agent, keep_last_n)
except Exception as e:
logger.warning(
f"[AgentBridge] Failed to update in-memory scheduled output "
f"for session={session_id}: {e}"
)
self._persist_messages(session_id, messages, channel_type)
@classmethod
def _prune_scheduled_in_memory(cls, agent, keep_last_n: int) -> None:
"""Mirror conversation_store.prune_scheduled_messages on agent.messages.
Caller must hold ``agent.messages_lock``.
"""
if keep_last_n < 0:
keep_last_n = 0
markers = (cls._SCHEDULED_MARKER,) + cls._SCHEDULED_LEGACY_MARKERS
def _is_marker_user(msg) -> bool:
if not isinstance(msg, dict) or msg.get("role") != "user":
return False
content = msg.get("content")
text = ""
if isinstance(content, str):
text = content
elif isinstance(content, list):
for block in content:
if isinstance(block, dict) and block.get("type") == "text":
text = block.get("text", "")
break
return any(text.startswith(m) for m in markers)
msgs = agent.messages
pair_indices = [] # list of (user_idx, assistant_idx_or_None)
for idx, msg in enumerate(msgs):
if not _is_marker_user(msg):
continue
assistant_idx = None
if idx + 1 < len(msgs):
nxt = msgs[idx + 1]
if isinstance(nxt, dict) and nxt.get("role") == "assistant":
assistant_idx = idx + 1
pair_indices.append((idx, assistant_idx))
if len(pair_indices) <= keep_last_n:
return
to_drop = pair_indices[: len(pair_indices) - keep_last_n]
drop_set = set()
for u_idx, a_idx in to_drop:
drop_set.add(u_idx)
if a_idx is not None:
drop_set.add(a_idx)
# Rebuild the list in place to keep external references stable.
kept = [m for i, m in enumerate(msgs) if i not in drop_set]
msgs.clear()
msgs.extend(kept)
@staticmethod
def _strip_thinking_blocks(messages: list) -> list:

View File

@@ -38,3 +38,33 @@ description: 创建和管理定时任务
<Frame>
<img src="https://cdn.link-ai.tech/doc/20260202195402.png" width="800" />
</Frame>
## 结果进入会话上下文
定时任务在隔离 session 中执行(内部规划与 tool 调用不污染用户会话),但**最终输出**会作为一对消息回写到接收者的真实会话,用户可以直接追问"刚才那条第二点展开说说"。
**默认策略**
- Agent 动态任务的输出进入上下文
- 固定消息类任务默认不进入上下文(可通过配置打开)
- 每个会话最多保留最近 **3 对** scheduler 消息,更早的自动清理;普通用户消息不受影响
**配置项**
| 配置项 | 默认值 | 说明 |
| --- | --- | --- |
| `scheduler_inject_to_session` | `true` | 总开关 |
| `scheduler_inject_max_per_session` | `3` | 每会话保留 scheduler 消息对数上限 |
| `scheduler_inject_send_message` | `false` | 是否同时注入固定消息类任务 |
```json
{
"scheduler_inject_to_session": true,
"scheduler_inject_max_per_session": 3,
"scheduler_inject_send_message": false
}
```
<Note>
群聊场景(飞书 / 企微群机器人 / 钉钉等)下用户的真实 session_id 形如 `user_id:group_id`,与 receiver 不同。创建任务时会自动记录正确的 session_id老的 `tasks.json` 缺该字段时回落到 receiver行为与历史版本一致。
</Note>