Files
chatgpt-on-wechat/agent/protocol/agent_stream.py

1719 lines
79 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Agent Stream Execution Module - Multi-turn reasoning based on tool-call
Provides streaming output, event system, and complete tool-call loop
"""
import json
import time
from typing import List, Dict, Any, Optional, Callable, Tuple
from agent.protocol.cancel import AgentCancelledError
from agent.protocol.models import LLMRequest, LLMModel
from agent.protocol.message_utils import sanitize_claude_messages, compress_turn_to_text_only
from agent.tools.base_tool import BaseTool, ToolResult
from common.log import logger
from common.i18n import t as _t
# Optional: repair malformed JSON args from non-strict providers (e.g. unescaped quotes in long content).
try:
from json_repair import repair_json as _repair_json
_HAS_JSON_REPAIR = True
except ImportError:
_HAS_JSON_REPAIR = False
# Maximum number of characters of model "reasoning / thinking" content to persist
# in conversation history. The full reasoning is still streamed to the UI in real
# time (subject to its own SSE / rendering limits); this bound only controls what
# is stored in DB and replayed in history. Long reasoning is not useful for later
# context (the LLM never sees thinking blocks anyway) and bloats DB.
# Keep aligned with the frontend REASONING_RENDER_CAP and the SSE
# MAX_REASONING_STREAM_CHARS so that storage / stream / display all match.
MAX_STORED_REASONING_CHARS = 4 * 1024 # 4 KB
# Marker inserted between head and tail when reasoning is truncated.
_REASONING_TRUNCATE_MARKER = "\n\n... [reasoning truncated, {omitted} chars omitted] ...\n\n"
def _truncate_reasoning_for_storage(text: str) -> str:
"""Trim long reasoning to head + tail with an omission marker.
Keeps the first and last halves of MAX_STORED_REASONING_CHARS so both the
initial chain-of-thought and the final conclusions are preserved for UI
replay, without storing the entire (often very large) middle.
"""
if not text:
return text
if len(text) <= MAX_STORED_REASONING_CHARS:
return text
half = MAX_STORED_REASONING_CHARS // 2
head = text[:half]
tail = text[-half:]
omitted = len(text) - len(head) - len(tail)
return head + _REASONING_TRUNCATE_MARKER.format(omitted=omitted) + tail
def _parse_tool_args(args_str: str, finish_reason: Optional[str]) -> Tuple[dict, Optional[str]]:
"""Parse tool args JSON. Returns (args, error_msg); error_msg is None on success.
On JSONDecodeError: detect truncation first (skip repair, surface max_tokens hint);
otherwise try json-repair for escape issues; finally fall back to the raw decoder error.
"""
if not args_str:
return {}, None
try:
return json.loads(args_str), None
except json.JSONDecodeError as e:
if finish_reason in ("length", "max_tokens") or not args_str.rstrip().endswith("}"):
return {}, "Output truncated (max_tokens reached). Split content into smaller chunks across multiple tool calls."
if _HAS_JSON_REPAIR:
try:
repaired = _repair_json(args_str, return_objects=True)
if isinstance(repaired, dict):
logger.warning(f"Tool args JSON repaired ({len(args_str)} chars)")
return repaired, None
except Exception:
pass
return {}, f"Invalid JSON in tool arguments: {e.msg}"
class AgentStreamExecutor:
"""
Agent Stream Executor
Handles multi-turn reasoning loop based on tool-call:
1. LLM generates response (may include tool calls)
2. Execute tools
3. Return results to LLM
4. Repeat until no more tool calls
"""
def __init__(
self,
agent, # Agent instance
model: LLMModel,
system_prompt: str,
tools: List[BaseTool],
max_turns: int = 50,
on_event: Optional[Callable] = None,
messages: Optional[List[Dict]] = None,
max_context_turns: int = 30,
cancel_event=None,
):
"""
Initialize stream executor
Args:
agent: Agent instance (for accessing context)
model: LLM model
system_prompt: System prompt
tools: List of available tools
max_turns: Maximum number of turns
on_event: Event callback function
messages: Optional existing message history (for persistent conversations)
max_context_turns: Maximum number of conversation turns to keep in context
cancel_event: Optional threading.Event used to signal user cancel.
Checked at every safe point (turn boundary, before tool execution,
during LLM streaming). When set, raises AgentCancelledError which
run_stream catches to gracefully wind down.
"""
self.agent = agent
self.model = model
self.system_prompt = system_prompt
# Convert tools list to dict
self.tools = {tool.name: tool for tool in tools} if isinstance(tools, list) else tools
self.max_turns = max_turns
self.on_event = on_event
self.max_context_turns = max_context_turns
self.cancel_event = cancel_event
# Message history - use provided messages or create new list
self.messages = messages if messages is not None else []
# Tool failure tracking for retry protection
self.tool_failure_history = [] # List of (tool_name, args_hash, success) tuples
# Track files to send (populated by read tool)
self.files_to_send = [] # List of file metadata dicts
def _check_cancelled(self) -> None:
"""Raise AgentCancelledError if the user requested cancellation.
Called at safe points (turn start, between tool calls, between LLM
chunks). Cheap to call: just an Event.is_set() probe.
"""
if self.cancel_event is not None and self.cancel_event.is_set():
raise AgentCancelledError("agent cancelled by user")
def _handle_cancelled(self, partial_response: str) -> None:
"""Wind down ``self.messages`` after a user-initiated cancel.
The messages list may be in any of these states when we get here:
(a) Last message is an assistant message containing tool_use
blocks but the matching tool_result has not been appended yet.
(b) Last message is an assistant text-only reply (cancel happened
right before the next turn started).
(c) Last message is a user tool_result message and we cancelled
between turns.
For (a) we MUST synthesise tool_result blocks, otherwise the next
request will fail Claude/OpenAI's strict pairing validation. For
(b)/(c) the state is already valid and we just append a small
cancellation note so the user/LLM both see the boundary clearly.
"""
try:
# Step 1: close any orphaned tool_use in the trailing assistant
# message by injecting matching tool_result blocks.
if self.messages and isinstance(self.messages[-1], dict) \
and self.messages[-1].get("role") == "assistant":
last = self.messages[-1]
content = last.get("content")
if isinstance(content, list):
pending_tool_use_ids = [
block.get("id")
for block in content
if isinstance(block, dict) and block.get("type") == "tool_use"
]
pending_tool_use_ids = [tid for tid in pending_tool_use_ids if tid]
if pending_tool_use_ids:
tool_result_blocks = [
{
"type": "tool_result",
"tool_use_id": tid,
"content": "Cancelled by user before this tool finished.",
"is_error": True,
}
for tid in pending_tool_use_ids
]
self.messages.append({
"role": "user",
"content": tool_result_blocks,
})
logger.info(
f"[Agent] Injected {len(tool_result_blocks)} cancellation "
f"tool_result blocks to keep message history valid"
)
# Step 2: append a stable "interrupted" marker so the LLM sees a
# clear stop boundary on the next turn.
self.messages.append({
"role": "assistant",
"content": [{"type": "text", "text": "_(Cancelled by user)_"}],
})
except Exception as e:
logger.warning(f"[Agent] _handle_cancelled cleanup failed: {e}")
def _emit_event(self, event_type: str, data: dict = None):
"""Emit event"""
if self.on_event:
try:
self.on_event({
"type": event_type,
"timestamp": time.time(),
"data": data or {}
})
except Exception as e:
logger.error(f"Event callback error: {e}")
def _is_thinking_enabled(self) -> bool:
"""Whether deep-thinking mode is on at the model layer.
Mirrors the global toggle used by ``bridge.agent_bridge`` when deciding
whether to send ``thinking={"type": "enabled"}`` to the model. Used for
logging and reasoning-update event emission across all channels.
"""
from config import conf
return bool(conf().get("enable_thinking", False))
def _should_render_thinking_inline(self) -> bool:
"""Whether ``<think>...</think>`` blocks embedded directly in ``content``
(MiniMax, some third-party proxies) should be surfaced to the channel.
Only the Web console can render them in a collapsible panel. IM channels
(WeChat/WeCom/DingTalk/Feishu) must strip them, otherwise users see raw
XML tags in their chat.
"""
from config import conf
channel_type = getattr(self.model, 'channel_type', '') or ''
return conf().get("enable_thinking", False) and channel_type == 'web'
def _filter_think_tags(self, text: str) -> str:
"""
Handle <think>...</think> blocks in content returned by some LLM providers
(e.g., MiniMax).
- When inline thinking rendering is allowed (Web + thinking enabled):
remove only the tags, keep the content inside.
- Otherwise (IM channels, or thinking disabled globally): remove both
the tags and the content entirely.
"""
if not text:
return text
import re
if self._should_render_thinking_inline():
text = re.sub(r'<think>', '', text)
text = re.sub(r'</think>', '', text)
else:
text = re.sub(r'<think>[\s\S]*?</think>', '', text)
# Also strip unclosed <think> tag at the end (streaming partial)
text = re.sub(r'<think>[\s\S]*$', '', text)
return text
def _hash_args(self, args: dict) -> str:
"""Generate a simple hash for tool arguments"""
import hashlib
# Sort keys for consistent hashing
args_str = json.dumps(args, sort_keys=True, ensure_ascii=False)
return hashlib.md5(args_str.encode()).hexdigest()[:8]
def _check_consecutive_failures(self, tool_name: str, args: dict) -> Tuple[bool, str, bool]:
"""
Check if tool has failed too many times consecutively or called repeatedly with same args
Returns:
(should_stop, reason, is_critical)
- should_stop: Whether to stop tool execution
- reason: Reason for stopping
- is_critical: Whether to abort entire conversation (True for 8+ failures)
"""
args_hash = self._hash_args(args)
# Count consecutive calls (both success and failure) for same tool + args
# This catches infinite loops where tool succeeds but LLM keeps calling it
same_args_calls = 0
for name, ahash, success in reversed(self.tool_failure_history):
if name == tool_name and ahash == args_hash:
same_args_calls += 1
else:
break # Different tool or args, stop counting
# Stop at 5 consecutive calls with same args (whether success or failure)
if same_args_calls >= 5:
return True, f"工具 '{tool_name}' 使用相同参数已被调用 {same_args_calls} 次,停止执行以防止无限循环。如果需要查看配置,结果已在之前的调用中返回。", False
# Count consecutive failures for same tool + args
same_args_failures = 0
for name, ahash, success in reversed(self.tool_failure_history):
if name == tool_name and ahash == args_hash:
if not success:
same_args_failures += 1
else:
break # Stop at first success
else:
break # Different tool or args, stop counting
if same_args_failures >= 3:
return True, f"工具 '{tool_name}' 使用相同参数连续失败 {same_args_failures} 次,停止执行以防止无限循环", False
# Count consecutive failures for same tool (any args)
same_tool_failures = 0
for name, ahash, success in reversed(self.tool_failure_history):
if name == tool_name:
if not success:
same_tool_failures += 1
else:
break # Stop at first success
else:
break # Different tool, stop counting
# Hard stop at 8 failures - abort with critical message
if same_tool_failures >= 8:
return True, _t(
"抱歉,我没能完成这个任务。可能是我理解有误或者当前方法不太合适。\n\n建议你:\n• 换个方式描述需求试试\n• 把任务拆分成更小的步骤\n• 或者换个思路来解决",
"Sorry, I couldn't complete this task. I may have misunderstood, or my current approach isn't quite right.\n\nYou could try:\n• Rephrasing your request\n• Breaking the task into smaller steps\n• Taking a different approach",
), True
# Warning at 6 failures
if same_tool_failures >= 6:
return True, f"工具 '{tool_name}' 连续失败 {same_tool_failures} 次(使用不同参数),停止执行以防止无限循环", False
return False, "", False
def _record_tool_result(self, tool_name: str, args: dict, success: bool):
"""Record tool execution result for failure tracking"""
args_hash = self._hash_args(args)
self.tool_failure_history.append((tool_name, args_hash, success))
# Keep only last 50 records to avoid memory bloat
if len(self.tool_failure_history) > 50:
self.tool_failure_history = self.tool_failure_history[-50:]
def run_stream(self, user_message: str) -> str:
"""
Execute streaming reasoning loop
Args:
user_message: User message
Returns:
Final response text
"""
# Log user message with model info
thinking_enabled = self._is_thinking_enabled()
thinking_label = " | 💭 thinking" if thinking_enabled else ""
logger.info(f"🤖 {self.model.model}{thinking_label} | 👤 {user_message}")
# Add user message (Claude format - use content blocks for consistency)
self.messages.append({
"role": "user",
"content": [
{
"type": "text",
"text": user_message
}
]
})
# Trim context ONCE before the agent loop starts, not during tool steps.
# This ensures tool_use/tool_result chains created during the current run
# are never stripped mid-execution (which would cause LLM loops).
self._trim_messages()
# Validate after trimming: trimming may leave orphaned tool_use at the
# boundary (e.g. the last kept turn ends with an assistant tool_use whose
# tool_result was in a discarded turn).
self._validate_and_fix_messages()
self._emit_event("agent_start")
final_response = ""
turn = 0
cancelled = False
try:
while turn < self.max_turns:
# Check at the very top of every turn so a cancel arriving
# between turns short-circuits cleanly.
self._check_cancelled()
turn += 1
logger.info(f"[Agent] Turn {turn}")
self._emit_event("turn_start", {"turn": turn})
# Call LLM (enable retry_on_empty for better reliability)
assistant_msg, tool_calls = self._call_llm_stream(retry_on_empty=True)
final_response = assistant_msg
# No tool calls, end loop
if not tool_calls:
# 检查是否返回了空响应
if not assistant_msg:
logger.warning(f"[Agent] LLM returned empty response after retry (no content and no tool calls)")
logger.info(f"[Agent] This usually happens when LLM thinks the task is complete after tool execution")
# 如果之前有工具调用,强制要求 LLM 生成文本回复
if turn > 1:
logger.info(f"[Agent] Requesting explicit response from LLM...")
# Remember position so we can remove the injected prompt later
prompt_insert_idx = len(self.messages)
# 添加一条消息,明确要求回复用户
self.messages.append({
"role": "user",
"content": [{
"type": "text",
"text": "请向用户说明刚才工具执行的结果或回答用户的问题。"
}]
})
# 再调用一次 LLM
assistant_msg, tool_calls = self._call_llm_stream(retry_on_empty=False)
final_response = assistant_msg
# Remove the injected prompt from history so it doesn't
# appear as a user message in persisted conversations.
# _call_llm_stream may have appended an assistant message
# after the prompt, so we locate and remove only the prompt.
if (prompt_insert_idx < len(self.messages)
and self.messages[prompt_insert_idx].get("role") == "user"):
self.messages.pop(prompt_insert_idx)
logger.debug("[Agent] Removed injected explicit-response prompt from message history")
# If LLM responded with tool_calls instead of text, fall through
# to the tool execution path below (don't break the loop).
if tool_calls:
logger.info(
f"[Agent] LLM returned tool_calls in explicit-response retry, "
f"continuing to execute tools instead of breaking"
)
elif not assistant_msg:
# Still empty (no text and no tool_calls): use fallback
logger.warning(f"[Agent] Still empty after explicit request")
final_response = _t(
"抱歉,我暂时无法生成回复。请尝试换一种方式描述你的需求,或稍后再试。",
"Sorry, I can't generate a reply right now. Please try rephrasing your request, or try again later.",
)
logger.info(f"Generated fallback response for empty LLM output")
else:
# First-turn empty reply, fall back directly
final_response = _t(
"抱歉,我暂时无法生成回复。请尝试换一种方式描述你的需求,或稍后再试。",
"Sorry, I can't generate a reply right now. Please try rephrasing your request, or try again later.",
)
logger.info(f"Generated fallback response for empty LLM output")
else:
logger.info(f"💭 {assistant_msg[:150]}{'...' if len(assistant_msg) > 150 else ''}")
# If the explicit-response retry produced tool_calls, skip the break
# and continue down to the tool execution branch in this same iteration.
if not tool_calls:
logger.debug(f"✅ Done (no tool calls)")
self._emit_event("turn_end", {
"turn": turn,
"has_tool_calls": False
})
break
# Log tool calls with arguments (truncate long values like base64)
tool_calls_str = []
for tc in tool_calls:
args = tc.get('arguments') or {}
if isinstance(args, dict):
parts = []
for k, v in args.items():
v_str = str(v)
if len(v_str) > 200:
v_str = v_str[:200] + f"...({len(v_str)} chars)"
parts.append(f"{k}={v_str}")
args_str = ', '.join(parts)
if args_str:
tool_calls_str.append(f"{tc['name']}({args_str})")
else:
tool_calls_str.append(tc['name'])
else:
tool_calls_str.append(tc['name'])
logger.info(f"🔧 {', '.join(tool_calls_str)}")
# Execute tools
tool_results = []
tool_result_blocks = []
try:
for tool_call in tool_calls:
# Honour cancel between tool invocations within the same turn
self._check_cancelled()
result = self._execute_tool(tool_call)
tool_results.append(result)
# Debug: Check if tool is being called repeatedly with same args
if turn > 2:
# Check last N tool calls for repeats
repeat_count = sum(
1 for name, ahash, _ in self.tool_failure_history[-10:]
if name == tool_call["name"] and ahash == self._hash_args(tool_call["arguments"])
)
if repeat_count >= 3:
logger.warning(
f"⚠️ Tool '{tool_call['name']}' has been called {repeat_count} times "
f"with same arguments. This may indicate a loop."
)
# Check if this is a file to send
if result.get("status") == "success" and isinstance(result.get("result"), dict):
result_data = result.get("result")
if result_data.get("type") == "file_to_send":
self.files_to_send.append(result_data)
logger.info(f"📎 File queued for sending: {result_data.get('file_name', result_data.get('path'))}")
self._emit_event("file_to_send", result_data)
# Check for critical error - abort entire conversation
if result.get("status") == "critical_error":
logger.error(f"💥 Fatal error detected, aborting conversation")
final_response = result.get('result') or _t("任务执行失败", "Task execution failed")
return final_response
# Log tool result in compact format
status_emoji = "" if result.get("status") == "success" else ""
result_data = result.get('result', '')
# Format result string with proper Chinese character support
if isinstance(result_data, (dict, list)):
result_str = json.dumps(result_data, ensure_ascii=False)
else:
result_str = str(result_data)
logger.info(f" {status_emoji} {tool_call['name']} ({result.get('execution_time', 0):.2f}s): {result_str[:200]}{'...' if len(result_str) > 200 else ''}")
# Build tool result block (Claude format)
# Format content in a way that's easy for LLM to understand
is_error = result.get("status") == "error"
if is_error:
# For errors, provide clear error message
result_content = f"Error: {result.get('result', 'Unknown error')}"
elif isinstance(result.get('result'), dict):
# For dict results, use JSON format
result_content = json.dumps(result.get('result'), ensure_ascii=False)
elif isinstance(result.get('result'), str):
# For string results, use directly
result_content = result.get('result')
else:
# Fallback to full JSON
result_content = json.dumps(result, ensure_ascii=False)
# Truncate excessively large tool results for the current turn
# Historical turns will be further truncated in _trim_messages()
MAX_CURRENT_TURN_RESULT_CHARS = 50000
if len(result_content) > MAX_CURRENT_TURN_RESULT_CHARS:
truncated_len = len(result_content)
result_content = result_content[:MAX_CURRENT_TURN_RESULT_CHARS] + \
f"\n\n[Output truncated: {truncated_len} chars total, showing first {MAX_CURRENT_TURN_RESULT_CHARS} chars]"
logger.info(f"📎 Truncated tool result for '{tool_call['name']}': {truncated_len} -> {MAX_CURRENT_TURN_RESULT_CHARS} chars")
tool_result_block = {
"type": "tool_result",
"tool_use_id": tool_call["id"],
"content": result_content
}
# Add is_error field for Claude API (helps model understand failures)
if is_error:
tool_result_block["is_error"] = True
tool_result_blocks.append(tool_result_block)
finally:
# CRITICAL: Always add tool_result to maintain message history integrity
# Even if tool execution fails, we must add error results to match tool_use
if tool_result_blocks:
# Add tool results to message history as user message (Claude format)
self.messages.append({
"role": "user",
"content": tool_result_blocks
})
# Detect potential infinite loop: same tool called multiple times with success
# If detected, add a hint to LLM to stop calling tools and provide response
if turn >= 3 and len(tool_calls) > 0:
tool_name = tool_calls[0]["name"]
args_hash = self._hash_args(tool_calls[0]["arguments"])
# Count recent successful calls with same tool+args
recent_success_count = 0
for name, ahash, success in reversed(self.tool_failure_history[-10:]):
if name == tool_name and ahash == args_hash and success:
recent_success_count += 1
# If tool was called successfully 3+ times with same args, add hint to stop loop
if recent_success_count >= 3:
logger.warning(
f"⚠️ Detected potential loop: '{tool_name}' called {recent_success_count} times "
f"with same args. Adding hint to LLM to provide final response."
)
# Add a gentle hint message to guide LLM to respond
self.messages.append({
"role": "user",
"content": [{
"type": "text",
"text": "工具已成功执行并返回结果。请基于这些信息向用户做出回复,不要重复调用相同的工具。"
}]
})
elif tool_calls:
# If we have tool_calls but no tool_result_blocks (unexpected error),
# create error results for all tool calls to maintain message integrity
logger.warning("⚠️ Tool execution interrupted, adding error results to maintain message history")
emergency_blocks = []
for tool_call in tool_calls:
emergency_blocks.append({
"type": "tool_result",
"tool_use_id": tool_call["id"],
"content": "Error: Tool execution was interrupted",
"is_error": True
})
self.messages.append({
"role": "user",
"content": emergency_blocks
})
self._emit_event("turn_end", {
"turn": turn,
"has_tool_calls": True,
"tool_count": len(tool_calls)
})
if turn >= self.max_turns:
logger.warning(f"⚠️ Reached max decision step limit: {self.max_turns}")
# Force model to summarize without tool calls
logger.info(f"[Agent] Requesting summary from LLM after reaching max steps...")
# Remember position before injecting the prompt so we can remove it later
prompt_insert_idx = len(self.messages)
# Add a temporary prompt to force summary
self.messages.append({
"role": "user",
"content": [{
"type": "text",
"text": f"你已经执行了{turn}个决策步骤,达到了单次运行的最大步数限制。请总结一下你目前的执行过程和结果,告诉用户当前的进展情况。不要再调用工具,直接用文字回复。"
}]
})
# Call LLM one more time to get summary (without retry to avoid loops)
try:
summary_response, summary_tools = self._call_llm_stream(retry_on_empty=False)
if summary_response:
final_response = summary_response
logger.info(f"💭 Summary: {summary_response[:150]}{'...' if len(summary_response) > 150 else ''}")
else:
# Fallback if model still doesn't respond
final_response = _t(
f"我已经执行了{turn}个决策步骤,达到了单次运行的步数上限。任务可能还未完全完成,建议你将任务拆分成更小的步骤,或者换一种方式描述需求。",
f"I've taken {turn} decision steps and reached the per-run limit. The task may not be fully complete — try breaking it into smaller steps, or describe your request differently.",
)
except Exception as e:
logger.warning(f"Failed to get summary from LLM: {e}")
final_response = _t(
f"我已经执行了{turn}个决策步骤,达到了单次运行的步数上限。任务可能还未完全完成,建议你将任务拆分成更小的步骤,或者换一种方式描述需求。",
f"I've taken {turn} decision steps and reached the per-run limit. The task may not be fully complete — try breaking it into smaller steps, or describe your request differently.",
)
finally:
# Remove the injected user prompt from history to avoid polluting
# persisted conversation records. The assistant summary (if any)
# was already appended by _call_llm_stream and is kept.
if (prompt_insert_idx < len(self.messages)
and self.messages[prompt_insert_idx].get("role") == "user"):
self.messages.pop(prompt_insert_idx)
logger.debug("[Agent] Removed injected max-steps prompt from message history")
except AgentCancelledError:
# User-initiated stop: wind down message history cleanly so the
# next turn is unaffected; channels emit a "cancelled" UI event.
cancelled = True
logger.info(f"[Agent] 🛑 Cancelled by user (turn {turn})")
self._handle_cancelled(final_response)
if not final_response or not final_response.strip():
final_response = "_(Cancelled)_"
except Exception as e:
logger.error(f"❌ Agent execution error: {e}")
self._emit_event("error", {"error": str(e)})
raise
finally:
final_response = final_response.strip() if final_response else final_response
if cancelled:
# Emit before agent_end so channels can mark UI as cancelled
self._emit_event("agent_cancelled", {"final_response": final_response})
logger.info(f"[Agent] 🏁 Done ({turn} turns)" + (" [cancelled]" if cancelled else ""))
self._emit_event("agent_end", {"final_response": final_response, "cancelled": cancelled})
return final_response
def _call_llm_stream(self, retry_on_empty=True, retry_count=0, max_retries=3,
_overflow_retry: bool = False) -> Tuple[str, List[Dict]]:
"""
Call LLM with streaming and automatic retry on errors
Args:
retry_on_empty: Whether to retry once if empty response is received
retry_count: Current retry attempt (internal use)
max_retries: Maximum number of retries for API errors
_overflow_retry: Internal flag indicating this is a retry after context overflow
Returns:
(response_text, tool_calls)
"""
# Validate and fix message history (e.g. orphaned tool_result blocks).
# Context trimming is done once in run_stream() before the loop starts,
# NOT here — trimming mid-execution would strip the current run's
# tool_use/tool_result chains and cause LLM loops.
self._validate_and_fix_messages()
# Prepare messages
messages = self._prepare_messages()
turns = self._identify_complete_turns()
logger.info(f"Sending {len(messages)} messages ({len(turns)} turns) to LLM")
# Pull in any MCP tools that finished loading since this turn started.
# Cheap dict reconciliation (microseconds) — lets the agent pick up
# newly available MCP tools mid-conversation without a session restart.
try:
from agent.tools import ToolManager
ToolManager().sync_mcp_into_agent(self)
except Exception as e:
logger.debug(f"[Agent] MCP sync skipped: {e}")
# Prepare tool definitions. Prefer get_json_schema() when it yields
# real properties (lets tools augment schema at runtime), otherwise
# fall back to the static `tool.params` (MCP tools rely on this).
tools_schema = None
if self.tools:
tools_schema = []
for tool in self.tools.values():
input_schema = tool.params
try:
dynamic = (tool.get_json_schema() or {}).get("parameters") or {}
if dynamic.get("properties"):
input_schema = dynamic
except Exception:
pass
tools_schema.append({
"name": tool.name,
"description": tool.description,
"input_schema": input_schema,
})
# Debug: dump the full system prompt and messages sent to the LLM.
# Gated behind `debug` config to avoid flooding normal logs.
# try:
# from config import conf
# if conf().get("debug", False):
# logger.debug(
# "[Agent][debug] system_prompt sent to LLM "
# f"({len(self.system_prompt or '')} chars):\n"
# "================ SYSTEM PROMPT BEGIN ================\n"
# f"{self.system_prompt}\n"
# "================ SYSTEM PROMPT END =================="
# )
# logger.info(f"[Agent][debug] messages sent to LLM: {messages}")
# except Exception:
# pass
# Create request
request = LLMRequest(
messages=messages,
temperature=0,
stream=True,
tools=tools_schema,
system=self.system_prompt # Pass system prompt separately for Claude API
)
self._emit_event("message_start", {"role": "assistant"})
# Streaming response
full_content = ""
full_reasoning = ""
tool_calls_buffer = {} # {index: {id, name, arguments}}
gemini_raw_parts = None # Preserve Gemini thoughtSignature for round-trip
stop_reason = None # Track why the stream stopped
try:
stream = self.model.call_stream(request)
# Probe cancel every N chunks to bound reaction time without
# checking on every token.
_cancel_probe_counter = 0
_CANCEL_PROBE_EVERY = 8
for chunk in stream:
_cancel_probe_counter += 1
if _cancel_probe_counter >= _CANCEL_PROBE_EVERY:
_cancel_probe_counter = 0
if self.cancel_event is not None and self.cancel_event.is_set():
# Persist partial text only; tool_use args may be
# truncated mid-stream and would fail validation.
logger.info("[Agent] cancel detected mid-stream, aborting LLM call")
if full_content:
partial_msg = {
"role": "assistant",
"content": [{"type": "text", "text": full_content}],
}
self.messages.append(partial_msg)
self._emit_event("message_end", {
"content": full_content,
"tool_calls": [],
"cancelled": True,
})
raise AgentCancelledError("cancelled during LLM streaming")
# Check for errors
if isinstance(chunk, dict) and chunk.get("error"):
# Extract error message from nested structure
error_data = chunk.get("error", {})
if isinstance(error_data, dict):
error_msg = error_data.get("message", chunk.get("message", "Unknown error"))
error_code = error_data.get("code", "")
error_type = error_data.get("type", "")
else:
error_msg = chunk.get("message", str(error_data))
error_code = ""
error_type = ""
status_code = chunk.get("status_code", "N/A")
# Log error with all available information
logger.error(f"🔴 Stream API Error:")
logger.error(f" Message: {error_msg}")
logger.error(f" Status Code: {status_code}")
logger.error(f" Error Code: {error_code}")
logger.error(f" Error Type: {error_type}")
logger.error(f" Full chunk: {chunk}")
# Check if this is a context overflow error (keyword-based, works for all models)
# Don't rely on specific status codes as different providers use different codes
error_msg_lower = error_msg.lower()
is_overflow = any(keyword in error_msg_lower for keyword in [
'context length exceeded', 'maximum context length', 'prompt is too long',
'context overflow', 'context window', 'too large', 'exceeds model context',
'request_too_large', 'request exceeds the maximum size', 'tokens exceed'
])
if is_overflow:
# Mark as context overflow for special handling
raise Exception(f"[CONTEXT_OVERFLOW] {error_msg} (Status: {status_code})")
else:
# Raise exception with full error message for retry logic
raise Exception(f"{error_msg} (Status: {status_code}, Code: {error_code}, Type: {error_type})")
# Parse chunk
if isinstance(chunk, dict) and chunk.get("choices"):
choice = chunk["choices"][0]
delta = choice.get("delta", {})
# Capture finish_reason if present
finish_reason = choice.get("finish_reason")
if finish_reason:
stop_reason = finish_reason
reasoning_delta = delta.get("reasoning_content") or ""
if reasoning_delta:
full_reasoning += reasoning_delta
if self._is_thinking_enabled():
self._emit_event("reasoning_update", {"delta": reasoning_delta})
# Handle text content
content_delta = delta.get("content") or ""
if content_delta:
# Filter out <think> tags from content
filtered_delta = self._filter_think_tags(content_delta)
full_content += filtered_delta
if filtered_delta: # Only emit if there's content after filtering
self._emit_event("message_update", {"delta": filtered_delta})
# Handle tool calls
if "tool_calls" in delta and delta["tool_calls"]:
for tc_delta in delta["tool_calls"]:
index = tc_delta.get("index", 0)
if index not in tool_calls_buffer:
tool_calls_buffer[index] = {
"id": "",
"name": "",
"arguments": ""
}
if tc_delta.get("id"):
tool_calls_buffer[index]["id"] = tc_delta["id"]
if "function" in tc_delta:
func = tc_delta["function"]
if func.get("name"):
tool_calls_buffer[index]["name"] = func["name"]
if func.get("arguments"):
tool_calls_buffer[index]["arguments"] += func["arguments"]
# Preserve _gemini_raw_parts for Gemini thoughtSignature round-trip
# (direct Gemini: list of parts; LinkAI proxy: base64 string of JSON parts)
if "_gemini_raw_parts" in delta:
gemini_raw_parts = delta["_gemini_raw_parts"]
elif isinstance(choice, dict) and choice.get("_gemini_raw_parts"):
gemini_raw_parts = choice["_gemini_raw_parts"]
except AgentCancelledError:
# Must propagate untouched; never treat as a retryable error.
raise
except Exception as e:
error_str = str(e)
error_str_lower = error_str.lower()
# Check if error is context overflow (non-retryable, needs session reset)
# Method 1: Check for special marker (set in stream error handling above)
is_context_overflow = '[context_overflow]' in error_str_lower
# Method 2: Fallback to keyword matching for non-stream errors
if not is_context_overflow:
is_context_overflow = any(keyword in error_str_lower for keyword in [
'context length exceeded', 'maximum context length', 'prompt is too long',
'context overflow', 'context window', 'too large', 'exceeds model context',
'request_too_large', 'request exceeds the maximum size'
])
# Check if error is message format error (incomplete tool_use/tool_result pairs)
# This happens when previous conversation had tool failures or context trimming
# broke tool_use/tool_result pairs.
# Note: MiniMax returns error 2013 "tool result's tool id(...) not found" for
# tool_call_id mismatches — the keywords below are intentionally broad to catch
# both standard (Claude/OpenAI) and provider-specific (MiniMax) variants.
is_message_format_error = any(keyword in error_str_lower for keyword in [
'tool_use', 'tool_result', 'tool result', 'without', 'immediately after',
'corresponding', 'must have', 'each',
'tool_call_id', 'tool id', 'is not found', 'not found', 'tool_calls',
'must be a response to a preceeding message',
'2013', # MiniMax error code for tool_call_id mismatch
]) and ('400' in error_str_lower or 'status: 400' in error_str_lower
or 'invalid_request' in error_str_lower
or 'invalidparameter' in error_str_lower)
if is_context_overflow or is_message_format_error:
error_type = "context overflow" if is_context_overflow else "message format error"
logger.error(f"💥 {error_type} detected: {e}")
# Flush memory before trimming to preserve context that will be lost
if is_context_overflow and self.agent.memory_manager:
user_id = getattr(self.agent, '_current_user_id', None)
self.agent.memory_manager.flush_memory(
messages=self.messages, user_id=user_id,
reason="overflow", max_messages=0
)
# Strategy: try aggressive trimming first, only clear as last resort
if is_context_overflow and not _overflow_retry:
trimmed = self._aggressive_trim_for_overflow()
if trimmed:
logger.warning("🔄 Aggressively trimmed context, retrying...")
return self._call_llm_stream(
retry_on_empty=retry_on_empty,
retry_count=retry_count,
max_retries=max_retries,
_overflow_retry=True
)
# Aggressive trim didn't help or this is a message format error
# -> clear everything and also purge DB to prevent reload of dirty data
logger.warning("🔄 Clearing conversation history to recover")
self.messages.clear()
self._clear_session_db()
if is_context_overflow:
raise Exception(_t(
"抱歉,对话历史过长导致上下文溢出。我已清空历史记录,请重新描述你的需求。",
"Sorry, the conversation history got too long and overflowed the context. I've cleared the history — please describe your request again.",
))
else:
raise Exception(_t(
"抱歉,之前的对话出现了问题。我已清空历史记录,请重新发送你的消息。",
"Sorry, something went wrong with the earlier conversation. I've cleared the history — please send your message again.",
))
# Check if error is rate limit (429)
is_rate_limit = '429' in error_str_lower or 'rate limit' in error_str_lower
# Check if error is retryable (timeout, connection, server busy, etc.)
is_retryable = any(keyword in error_str_lower for keyword in [
'timeout', 'timed out', 'connection', 'network',
'rate limit', 'overloaded', 'unavailable', 'busy', 'retry',
'429', '500', '502', '503', '504', '512'
])
if is_retryable and retry_count < max_retries:
# Rate limit needs longer wait time
if is_rate_limit:
wait_time = 30 + (retry_count * 15) # 30s, 45s, 60s for rate limit
else:
wait_time = (retry_count + 1) * 2 # 2s, 4s, 6s for other errors
logger.warning(f"⚠️ LLM API error (attempt {retry_count + 1}/{max_retries}): {e}")
logger.info(f"Retrying in {wait_time}s...")
time.sleep(wait_time)
return self._call_llm_stream(
retry_on_empty=retry_on_empty,
retry_count=retry_count + 1,
max_retries=max_retries
)
else:
if retry_count >= max_retries:
logger.error(f"❌ LLM API error after {max_retries} retries: {e}", exc_info=True)
else:
logger.error(f"❌ LLM call error (non-retryable): {e}", exc_info=True)
raise
# Parse tool calls
tool_calls = []
for idx in sorted(tool_calls_buffer.keys()):
tc = tool_calls_buffer[idx]
# Ensure tool call has a valid ID (some providers return empty/None IDs)
tool_id = tc.get("id") or ""
if not tool_id:
import uuid
tool_id = f"call_{uuid.uuid4().hex[:24]}"
args_str = tc.get("arguments") or ""
arguments, parse_err = _parse_tool_args(args_str, stop_reason)
if parse_err:
logger.error(
f"Tool args parse failed for {tc['name']} ({len(args_str)} chars): {parse_err}"
)
tool_calls.append({
"id": tool_id,
"name": tc["name"],
"arguments": {},
"_parse_error": parse_err,
})
continue
tool_calls.append({
"id": tool_id,
"name": tc["name"],
"arguments": arguments
})
# Check for empty response and retry once if enabled
if retry_on_empty and not full_content and not tool_calls:
logger.warning(f"⚠️ LLM returned empty response (stop_reason: {stop_reason}), retrying once...")
self._emit_event("message_end", {
"content": "",
"tool_calls": [],
"empty_retry": True,
"stop_reason": stop_reason
})
# Retry without retry flag to avoid infinite loop
return self._call_llm_stream(
retry_on_empty=False,
retry_count=retry_count,
max_retries=max_retries
)
# Filter full_content one more time (in case tags were split across chunks)
full_content = self._filter_think_tags(full_content)
# Add assistant message to history (Claude format uses content blocks)
assistant_msg = {"role": "assistant", "content": []}
if full_reasoning:
stored_reasoning = _truncate_reasoning_for_storage(full_reasoning)
if len(stored_reasoning) < len(full_reasoning):
logger.info(
f"[reasoning] truncated for storage: "
f"{len(full_reasoning)} -> {len(stored_reasoning)} chars"
)
assistant_msg["content"].append({
"type": "thinking",
"thinking": stored_reasoning
})
if full_content:
assistant_msg["content"].append({
"type": "text",
"text": full_content
})
# Add tool_use blocks if present
if tool_calls:
for tc in tool_calls:
assistant_msg["content"].append({
"type": "tool_use",
"id": tc.get("id", ""),
"name": tc.get("name", ""),
"input": tc.get("arguments", {})
})
if gemini_raw_parts:
assistant_msg["_gemini_raw_parts"] = gemini_raw_parts
# Only append if content is not empty
if assistant_msg["content"]:
self.messages.append(assistant_msg)
self._emit_event("message_end", {
"content": full_content,
"tool_calls": tool_calls
})
return full_content, tool_calls
def _execute_tool(self, tool_call: Dict) -> Dict[str, Any]:
"""
Execute tool
Args:
tool_call: {"id": str, "name": str, "arguments": dict}
Returns:
Tool execution result
"""
tool_name = tool_call["name"]
tool_id = tool_call["id"]
arguments = tool_call["arguments"]
if "_parse_error" in tool_call:
result = {
"status": "error",
"result": tool_call["_parse_error"],
"execution_time": 0,
}
self._record_tool_result(tool_name, arguments, False)
return result
# Check for consecutive failures (retry protection)
should_stop, stop_reason, is_critical = self._check_consecutive_failures(tool_name, arguments)
if should_stop:
logger.error(f"🛑 {stop_reason}")
self._record_tool_result(tool_name, arguments, False)
if is_critical:
# Critical failure - abort entire conversation
result = {
"status": "critical_error",
"result": stop_reason,
"execution_time": 0
}
else:
# Normal failure - let LLM try different approach
result = {
"status": "error",
"result": f"{stop_reason}\n\n当前方法行不通,请尝试完全不同的方法或向用户询问更多信息。",
"execution_time": 0
}
return result
self._emit_event("tool_execution_start", {
"tool_call_id": tool_id,
"tool_name": tool_name,
"arguments": arguments
})
try:
tool = self.tools.get(tool_name)
if not tool:
raise ValueError(self._build_tool_not_found_message(tool_name))
# Set tool context
tool.model = self.model
tool.context = self.agent
# Execute tool
start_time = time.time()
result: ToolResult = tool.execute_tool(arguments)
execution_time = time.time() - start_time
result_dict = {
"status": result.status,
"result": result.result,
"execution_time": execution_time
}
# Record tool result for failure tracking
success = result.status == "success"
self._record_tool_result(tool_name, arguments, success)
# Auto-refresh skills after skill creation
if tool_name == "bash" and result.status == "success":
command = arguments.get("command", "")
if "init_skill.py" in command and self.agent.skill_manager:
logger.info("Detected skill creation, refreshing skills...")
self.agent.refresh_skills()
logger.info(f"Skills refreshed! Now have {len(self.agent.skill_manager.skills)} skills")
self._emit_event("tool_execution_end", {
"tool_call_id": tool_id,
"tool_name": tool_name,
**result_dict
})
return result_dict
except Exception as e:
logger.error(f"Tool execution error: {e}")
error_result = {
"status": "error",
"result": str(e),
"execution_time": 0
}
# Record failure
self._record_tool_result(tool_name, arguments, False)
self._emit_event("tool_execution_end", {
"tool_call_id": tool_id,
"tool_name": tool_name,
**error_result
})
return error_result
def _build_tool_not_found_message(self, tool_name: str) -> str:
"""Build a helpful error message when a tool is not found.
If a skill with the same name exists in skill_manager, read its
SKILL.md and include the content so the LLM knows how to use it.
"""
available_tools = list(self.tools.keys())
base_msg = f"Tool '{tool_name}' not found. Available tools: {available_tools}"
skill_manager = getattr(self.agent, 'skill_manager', None)
if not skill_manager:
return base_msg
skill_entry = skill_manager.get_skill(tool_name)
if not skill_entry:
return base_msg
skill = skill_entry.skill
skill_md_path = skill.file_path
skill_content = ""
try:
with open(skill_md_path, 'r', encoding='utf-8') as f:
skill_content = f.read()
except Exception:
skill_content = skill.description
logger.info(
f"[Agent] Tool '{tool_name}' not found, but matched skill '{skill.name}'. "
f"Guiding LLM to use the skill instead."
)
return (
f"Tool '{tool_name}' is not a built-in tool, but a matching skill "
f"'{skill.name}' is available. You should use existing tools (e.g. bash with curl) "
f"to accomplish this task following the skill instructions below:\n\n"
f"--- SKILL: {skill.name} (path: {skill_md_path}) ---\n"
f"{skill_content}\n"
f"--- END SKILL ---\n\n"
f"Available tools: {available_tools}"
)
def _validate_and_fix_messages(self):
"""Delegate to the shared sanitizer (see message_sanitizer.py)."""
sanitize_claude_messages(self.messages)
def _identify_complete_turns(self) -> List[Dict]:
"""
识别完整的对话轮次
一个完整轮次包括:
1. 用户消息text
2. AI 回复(可能包含 tool_use
3. 工具结果tool_result如果有
4. 后续 AI 回复(如果有)
Returns:
List of turns, each turn is a dict with 'messages' list
"""
turns = []
current_turn = {'messages': []}
for msg in self.messages:
role = msg.get('role')
content = msg.get('content', [])
if role == 'user':
# Determine if this is a real user query (not a tool_result injection
# or an internal hint message injected by the agent loop).
is_user_query = False
has_tool_result = False
if isinstance(content, list):
has_text = any(
isinstance(block, dict) and block.get('type') == 'text'
for block in content
)
has_tool_result = any(
isinstance(block, dict) and block.get('type') == 'tool_result'
for block in content
)
# A message with tool_result is always internal, even if it
# also contains text blocks (shouldn't happen, but be safe).
is_user_query = has_text and not has_tool_result
elif isinstance(content, str):
is_user_query = True
if is_user_query:
if current_turn['messages']:
turns.append(current_turn)
current_turn = {'messages': [msg]}
else:
current_turn['messages'].append(msg)
else:
# AI 回复,属于当前轮次
current_turn['messages'].append(msg)
# 添加最后一个轮次
if current_turn['messages']:
turns.append(current_turn)
return turns
def _estimate_turn_tokens(self, turn: Dict) -> int:
"""估算一个轮次的 tokens"""
return sum(
self.agent._estimate_message_tokens(msg)
for msg in turn['messages']
)
def _truncate_historical_tool_results(self):
"""
Truncate tool_result content in historical messages to reduce context size.
Current turn results are kept at 30K chars (truncated at creation time).
Historical turn results are further truncated to 10K chars here.
This runs before token-based trimming so that we first shrink oversized
results, potentially avoiding the need to drop entire turns.
"""
MAX_HISTORY_RESULT_CHARS = 20000
if len(self.messages) < 2:
return
# Find where the last user text message starts (= current turn boundary)
# We skip the current turn's messages to preserve their full content
current_turn_start = len(self.messages)
for i in range(len(self.messages) - 1, -1, -1):
msg = self.messages[i]
if msg.get("role") == "user":
content = msg.get("content", [])
if isinstance(content, list) and any(
isinstance(b, dict) and b.get("type") == "text" for b in content
):
current_turn_start = i
break
elif isinstance(content, str):
current_turn_start = i
break
truncated_count = 0
for i in range(current_turn_start):
msg = self.messages[i]
if msg.get("role") != "user":
continue
content = msg.get("content", [])
if not isinstance(content, list):
continue
for block in content:
if not isinstance(block, dict) or block.get("type") != "tool_result":
continue
result_str = block.get("content", "")
if isinstance(result_str, str) and len(result_str) > MAX_HISTORY_RESULT_CHARS:
original_len = len(result_str)
block["content"] = result_str[:MAX_HISTORY_RESULT_CHARS] + \
f"\n\n[Historical output truncated: {original_len} -> {MAX_HISTORY_RESULT_CHARS} chars]"
truncated_count += 1
if truncated_count > 0:
logger.info(f"📎 Truncated {truncated_count} historical tool result(s) to {MAX_HISTORY_RESULT_CHARS} chars")
def _aggressive_trim_for_overflow(self) -> bool:
"""
Aggressively trim context when a real overflow error is returned by the API.
This method goes beyond normal _trim_messages by:
1. Truncating all tool results (including current turn) to a small limit
2. Keeping only the last 5 complete conversation turns
3. Truncating overly long user messages
Returns:
True if messages were trimmed (worth retrying), False if nothing left to trim
"""
if not self.messages:
return False
original_count = len(self.messages)
# Step 1: Aggressively truncate ALL tool results to 5K chars
AGGRESSIVE_LIMIT = 10000
truncated = 0
for msg in self.messages:
content = msg.get("content", [])
if not isinstance(content, list):
continue
for block in content:
if not isinstance(block, dict):
continue
# Truncate tool_result blocks
if block.get("type") == "tool_result":
result_str = block.get("content", "")
if isinstance(result_str, str) and len(result_str) > AGGRESSIVE_LIMIT:
block["content"] = (
result_str[:AGGRESSIVE_LIMIT]
+ f"\n\n[Truncated for context recovery: "
f"{len(result_str)} -> {AGGRESSIVE_LIMIT} chars]"
)
truncated += 1
# Truncate tool_use input blocks (e.g. large write content)
if block.get("type") == "tool_use" and isinstance(block.get("input"), dict):
input_str = json.dumps(block["input"], ensure_ascii=False)
if len(input_str) > AGGRESSIVE_LIMIT:
# Keep only a summary of the input
for key, val in block["input"].items():
if isinstance(val, str) and len(val) > 1000:
block["input"][key] = (
val[:1000]
+ f"... [truncated {len(val)} chars]"
)
truncated += 1
# Step 2: Truncate overly long user text messages (e.g. pasted content)
USER_MSG_LIMIT = 10000
for msg in self.messages:
if msg.get("role") != "user":
continue
content = msg.get("content", [])
if isinstance(content, list):
for block in content:
if isinstance(block, dict) and block.get("type") == "text":
text = block.get("text", "")
if len(text) > USER_MSG_LIMIT:
block["text"] = (
text[:USER_MSG_LIMIT]
+ f"\n\n[Message truncated for context recovery: "
f"{len(text)} -> {USER_MSG_LIMIT} chars]"
)
truncated += 1
elif isinstance(content, str) and len(content) > USER_MSG_LIMIT:
msg["content"] = (
content[:USER_MSG_LIMIT]
+ f"\n\n[Message truncated for context recovery: "
f"{len(content)} -> {USER_MSG_LIMIT} chars]"
)
truncated += 1
# Step 3: Keep only the last 5 complete turns
turns = self._identify_complete_turns()
if len(turns) > 5:
kept_turns = turns[-5:]
new_messages = []
for turn in kept_turns:
new_messages.extend(turn["messages"])
removed = len(turns) - 5
self.messages[:] = new_messages
logger.info(
f"🔧 Aggressive trim: removed {removed} old turns, "
f"truncated {truncated} large blocks, "
f"{original_count} -> {len(self.messages)} messages"
)
return True
if truncated > 0:
logger.info(
f"🔧 Aggressive trim: truncated {truncated} large blocks "
f"(no turns removed, only {len(turns)} turn(s) left)"
)
return True
# Nothing left to trim
logger.warning("🔧 Aggressive trim: nothing to trim, will clear history")
return False
def _build_context_summary_callback(self, discarded_turns: list, kept_turns: list):
"""
Build a callback that injects an LLM summary into the first user
message of *kept_turns*. Returns None if no valid injection target.
The callback is passed to flush_from_messages so that the same LLM
call that writes daily memory also provides the in-context summary.
"""
if not kept_turns:
return None
# Find the first user text block in kept_turns as injection target
target_block = None
for turn in kept_turns:
for msg in turn["messages"]:
if msg.get("role") == "user":
content = msg.get("content", [])
if isinstance(content, list):
for block in content:
if isinstance(block, dict) and block.get("type") == "text":
target_block = block
break
if target_block:
break
if target_block:
break
if not target_block:
return None
turn_count = len(discarded_turns)
original_text = target_block["text"]
def _on_summary_ready(summary: str):
if not summary or not summary.strip():
return
target_block["text"] = (
f"[System: Previous conversation summary — "
f"{turn_count} turns were compacted]\n\n"
f"{summary.strip()}\n\n"
f"The recent conversation continues below.\n\n---\n\n"
f"{original_text}"
)
logger.info(
f"📝 Context summary injected "
f"({len(summary)} chars, {turn_count} turns)"
)
return _on_summary_ready
def _trim_messages(self):
"""
智能清理消息历史,保持对话完整性
使用完整轮次作为清理单位,确保:
1. 不会在对话中间截断
2. 工具调用链tool_use + tool_result保持完整
3. 每轮对话都是完整的(用户消息 + AI回复 + 工具调用)
"""
if not self.messages or not self.agent:
return
# Step 0: Truncate large tool results in historical turns (30K -> 10K)
self._truncate_historical_tool_results()
# Step 1: 识别完整轮次
turns = self._identify_complete_turns()
if not turns:
return
# Step 2: 轮次限制 - 超出时移除前一半,保留后一半
if len(turns) > self.max_context_turns:
removed_count = len(turns) // 2
keep_count = len(turns) - removed_count
discarded_turns = turns[:removed_count]
turns = turns[-keep_count:]
logger.info(
f"💾 Context turns exceeded: {keep_count + removed_count} > {self.max_context_turns}, "
f"trimmed to {keep_count} turns (removed {removed_count})"
)
# Flush to daily memory + inject context summary (single async LLM call)
if self.agent.memory_manager:
discarded_messages = []
for turn in discarded_turns:
discarded_messages.extend(turn["messages"])
if discarded_messages:
user_id = getattr(self.agent, '_current_user_id', None)
cb = self._build_context_summary_callback(discarded_turns, turns)
self.agent.memory_manager.flush_memory(
messages=discarded_messages, user_id=user_id,
reason="trim", max_messages=0,
context_summary_callback=cb,
)
# Step 3: Token 限制 - 保留完整轮次
# Get context window from agent (based on model)
context_window = self.agent._get_model_context_window()
# Use configured max_context_tokens if available
if hasattr(self.agent, 'max_context_tokens') and self.agent.max_context_tokens:
max_tokens = self.agent.max_context_tokens
else:
# Reserve 10% for response generation
reserve_tokens = int(context_window * 0.1)
max_tokens = context_window - reserve_tokens
# Estimate system prompt tokens
system_tokens = self.agent._estimate_message_tokens({"role": "system", "content": self.system_prompt})
available_tokens = max_tokens - system_tokens
# Calculate current tokens
current_tokens = sum(self._estimate_turn_tokens(turn) for turn in turns)
# If under limit, reconstruct messages and return
if current_tokens + system_tokens <= max_tokens:
# Reconstruct message list from turns
new_messages = []
for turn in turns:
new_messages.extend(turn['messages'])
old_count = len(self.messages)
self.messages = new_messages
# Log if we removed messages due to turn limit
if old_count > len(self.messages):
logger.info(f" Rebuilt message list: {old_count} -> {len(self.messages)} messages")
return
# Token limit exceeded — tiered strategy based on turn count:
#
# Few turns (<5): Compress ALL turns to text-only (strip tool chains,
# keep user query + final reply). Never discard turns
# — losing even one is too painful when context is thin.
#
# Many turns (>=5): Directly discard the first half of turns.
# With enough turns the oldest ones are less
# critical, and keeping the recent half intact
# (with full tool chains) is more useful.
COMPRESS_THRESHOLD = 5
if len(turns) < COMPRESS_THRESHOLD:
# --- Few turns: compress ALL turns to text-only, never discard ---
compressed_turns = []
for t in turns:
compressed = compress_turn_to_text_only(t)
if compressed["messages"]:
compressed_turns.append(compressed)
new_messages = []
for turn in compressed_turns:
new_messages.extend(turn["messages"])
new_tokens = sum(self._estimate_turn_tokens(t) for t in compressed_turns)
old_count = len(self.messages)
self.messages = new_messages
logger.info(
f"📦 Context tokens exceeded (turns<{COMPRESS_THRESHOLD}): "
f"~{current_tokens + system_tokens} > {max_tokens}, "
f"compressed all {len(turns)} turns to plain text "
f"({old_count} -> {len(self.messages)} messages, "
f"~{current_tokens + system_tokens} -> ~{new_tokens + system_tokens} tokens)"
)
return
# --- Many turns (>=5): discard the older half, keep the newer half ---
removed_count = len(turns) // 2
keep_count = len(turns) - removed_count
discarded_turns = turns[:removed_count]
kept_turns = turns[-keep_count:]
kept_tokens = sum(self._estimate_turn_tokens(t) for t in kept_turns)
logger.info(
f"🔄 Context tokens exceeded: ~{current_tokens + system_tokens} > {max_tokens}, "
f"trimmed to {keep_count} turns (removed {removed_count})"
)
if self.agent.memory_manager:
discarded_messages = []
for turn in discarded_turns:
discarded_messages.extend(turn["messages"])
if discarded_messages:
user_id = getattr(self.agent, '_current_user_id', None)
cb = self._build_context_summary_callback(discarded_turns, kept_turns)
self.agent.memory_manager.flush_memory(
messages=discarded_messages, user_id=user_id,
reason="trim", max_messages=0,
context_summary_callback=cb,
)
new_messages = []
for turn in kept_turns:
new_messages.extend(turn['messages'])
old_count = len(self.messages)
self.messages = new_messages
logger.info(
f" Removed {removed_count} turns "
f"({old_count} -> {len(self.messages)} messages, "
f"~{current_tokens + system_tokens} -> ~{kept_tokens + system_tokens} tokens)"
)
def _clear_session_db(self):
"""
Clear the current session's persisted messages from SQLite DB.
This prevents dirty data (broken tool_use/tool_result pairs) from being
reloaded on the next request or after a restart.
"""
try:
session_id = getattr(self.agent, '_current_session_id', None)
if not session_id:
return
from agent.memory import get_conversation_store
store = get_conversation_store()
store.clear_session(session_id)
logger.info(f"🗑️ Cleared dirty session data from DB: {session_id}")
except Exception as e:
logger.warning(f"Failed to clear session DB: {e}")
def _prepare_messages(self) -> List[Dict[str, Any]]:
"""
Prepare messages to send to LLM
Note: For Claude API, system prompt should be passed separately via system parameter,
not as a message. The AgentLLMModel will handle this.
"""
# Don't add system message here - it will be handled separately by the LLM adapter
return self.messages