# File: chatgpt-on-wechat/bridge/agent_bridge.py (997 lines, 41 KiB, Python)

"""
Agent Bridge - Integrates Agent system with existing COW bridge
"""
import os
from typing import Optional, List
from agent.protocol import Agent, LLMModel, LLMRequest
from bridge.agent_event_handler import AgentEventHandler
from bridge.agent_initializer import AgentInitializer
from bridge.bridge import Bridge
from bridge.context import Context
from bridge.reply import Reply, ReplyType
from common import const
from common.log import logger
from common.utils import expand_path
from config import conf
from models.openai_compatible_bot import OpenAICompatibleBot
def add_openai_compatible_support(bot_instance):
    """
    Give ``bot_instance`` OpenAI-compatible tool calling, in place.

    A bot that already exposes ``call_with_tools`` (e.g. ZHIPUAIBot) is
    returned untouched. Any other bot has its class swapped for a freshly
    built subclass that mixes in ``OpenAICompatibleBot``, so the bot's own
    source never has to change.

    Args:
        bot_instance: The bot object to enhance.

    Returns:
        The same ``bot_instance`` object (possibly with a new class).
    """
    if hasattr(bot_instance, 'call_with_tools'):
        # Native support present; nothing to graft on.
        logger.debug(f"[AgentBridge] {type(bot_instance).__name__} already has native tool calling support")
        return bot_instance

    # Remember the concrete class before it is replaced below.
    base_cls = bot_instance.__class__

    class EnhancedBot(base_cls, OpenAICompatibleBot):
        """Dynamically enhanced bot with OpenAI-compatible tool calling"""

        def get_api_config(self):
            """
            Build the API settings from the shared ``open_ai_*`` / sampling
            configuration keys.
            """
            from config import conf
            api_config = {}
            api_config['api_key'] = conf().get("open_ai_api_key")
            api_config['api_base'] = conf().get("open_ai_api_base")
            api_config['model'] = conf().get("model", "gpt-3.5-turbo")
            api_config['default_temperature'] = conf().get("temperature", 0.9)
            api_config['default_top_p'] = conf().get("top_p", 1.0)
            api_config['default_frequency_penalty'] = conf().get("frequency_penalty", 0.0)
            api_config['default_presence_penalty'] = conf().get("presence_penalty", 0.0)
            return api_config

    # Re-class the live instance so existing references pick up the mixin.
    bot_instance.__class__ = EnhancedBot
    logger.info(
        f"[AgentBridge] Enhanced {base_cls.__name__} with OpenAI-compatible tool calling")
    return bot_instance
class AgentLLMModel(LLMModel):
    """
    LLM model adapter that routes agent requests through COW's bot infrastructure.

    The model name is always read live from the configuration (see ``model``),
    and the underlying bot is created lazily and rebuilt whenever the model
    name or the resolved bot type changes (see ``bot``).
    """

    # Exact model name -> bot type.
    _MODEL_BOT_TYPE_MAP = {
        "wenxin": const.BAIDU, "wenxin-4": const.BAIDU,
        "xunfei": const.XUNFEI, const.QWEN: const.QWEN_DASHSCOPE,
        const.QIANFAN: const.QIANFAN,
        const.MODELSCOPE: const.MODELSCOPE,
    }

    # (lower-case model-name prefix, bot type) pairs, checked in order.
    _MODEL_PREFIX_MAP = [
        ("qwen", const.QWEN_DASHSCOPE), ("qwq", const.QWEN_DASHSCOPE), ("qvq", const.QWEN_DASHSCOPE),
        ("gemini", const.GEMINI), ("glm", const.ZHIPU_AI), ("claude", const.CLAUDEAPI),
        ("moonshot", const.MOONSHOT), ("kimi", const.MOONSHOT),
        ("doubao", const.DOUBAO), ("deepseek", const.DEEPSEEK),
        ("ernie", const.QIANFAN),
    ]

    def __init__(self, bridge: Bridge, bot_type: str = "chat"):
        """
        Args:
            bridge: The COW bridge this adapter belongs to.
            bot_type: Caller-supplied bot category, stored as-is.
        """
        super().__init__(model=conf().get("model", const.GPT_41))
        self.bridge = bridge
        self.bot_type = bot_type
        # Lazily created bot plus the (model, bot type) pair it was built for.
        self._bot = None
        self._bot_model = None
        self._bot_type = None

    @property
    def model(self):
        """Current model name, read from the configuration on every access."""
        return conf().get("model", const.GPT_41)

    @model.setter
    def model(self, value):
        # Assignments are deliberately ignored: the configuration is the
        # single source of truth for the model name.
        pass

    def _resolve_bot_type(self, model_name: str) -> str:
        """Resolve bot type from model name, matching Bridge.__init__ logic."""
        if conf().get("use_linkai", False) and conf().get("linkai_api_key"):
            return const.LINKAI
        # An explicitly configured bot type wins over any name-based guess.
        # (This also covers bot_type == "modelscope", so no separate check
        # for it is needed further down.)
        configured_bot_type = conf().get("bot_type")
        if configured_bot_type:
            return configured_bot_type
        if not model_name or not isinstance(model_name, str):
            return const.OPENAI
        if model_name in self._MODEL_BOT_TYPE_MAP:
            return self._MODEL_BOT_TYPE_MAP[model_name]
        lowered_model = model_name.lower()
        if lowered_model.startswith("minimax") or model_name in ["abab6.5-chat"]:
            return const.MiniMax
        if model_name in [const.QWEN_TURBO, const.QWEN_PLUS, const.QWEN_MAX]:
            return const.QWEN_DASHSCOPE
        if model_name in [const.MOONSHOT, "moonshot-v1-8k", "moonshot-v1-32k", "moonshot-v1-128k"]:
            return const.MOONSHOT
        for prefix, btype in self._MODEL_PREFIX_MAP:
            if lowered_model.startswith(prefix):
                return btype
        return const.OPENAI

    @property
    def bot(self):
        """Lazy load the bot, re-create when model or bot_type changes"""
        from models.bot_factory import create_bot
        cur_model = self.model
        cur_bot_type = self._resolve_bot_type(cur_model)
        if self._bot is None or self._bot_model != cur_model or self._bot_type != cur_bot_type:
            self._bot = add_openai_compatible_support(create_bot(cur_bot_type))
            self._bot_model = cur_model
            self._bot_type = cur_bot_type
        return self._bot

    def _build_call_kwargs(self, request: LLMRequest, stream: bool) -> dict:
        """
        Translate ``request`` into keyword arguments for ``call_with_tools``.

        Shared by ``call`` and ``call_stream`` so both paths always send the
        same set of options.

        Args:
            request: The agent's LLM request.
            stream: Value for the ``stream`` kwarg.

        Returns:
            Keyword-argument dict for the bot's ``call_with_tools``.
        """
        kwargs = {
            'messages': request.messages,
            'tools': getattr(request, 'tools', None),
            'stream': stream,
            'model': self.model,
        }
        # Only pass max_tokens if explicitly set, let the bot use its default
        if request.max_tokens is not None:
            kwargs['max_tokens'] = request.max_tokens
        system_prompt = getattr(request, 'system', None)
        if system_prompt:
            kwargs['system'] = system_prompt
        # Context metadata is attached to this object from outside (it is
        # not set in __init__), hence the getattr defaults.
        channel_type = getattr(self, 'channel_type', None) or ''
        if channel_type:
            kwargs['channel_type'] = channel_type
        session_id = getattr(self, 'session_id', None)
        if session_id:
            kwargs['session_id'] = session_id
        # Thinking mode is a global toggle independent of the channel.
        # IM channels (WeChat/WeCom/DingTalk/Feishu) won't render the
        # reasoning trace, but still benefit from the higher answer
        # quality the thinking pass produces.
        thinking_enabled = bool(conf().get("enable_thinking", False))
        kwargs['thinking'] = (
            {"type": "enabled"} if thinking_enabled
            else {"type": "disabled"}
        )
        # Reasoning effort is only meaningful when thinking is on, and only
        # the values "high" and "max" are forwarded.
        if thinking_enabled:
            effort = conf().get("reasoning_effort", "high")
            if effort in ("high", "max"):
                kwargs['reasoning_effort'] = effort
        return kwargs

    def call(self, request: LLMRequest):
        """
        Call the model (non-streaming) using COW's bot infrastructure.

        Raises:
            NotImplementedError: If the bot has no ``call_with_tools``.
            Exception: Any error from the bot is logged and re-raised.
        """
        try:
            # Resolve the bot once so the capability check and the call are
            # made against the same object.
            bot = self.bot
            if not hasattr(bot, 'call_with_tools'):
                raise NotImplementedError("Regular call not implemented yet")
            response = bot.call_with_tools(**self._build_call_kwargs(request, stream=False))
            return self._format_response(response)
        except Exception as e:
            logger.error(f"AgentLLMModel call error: {e}", exc_info=True)
            raise

    def call_stream(self, request: LLMRequest):
        """
        Call the model with streaming using COW's bot infrastructure.

        This is a generator: nothing runs until the first chunk is requested.

        Yields:
            Each chunk from the bot, passed through ``_format_stream_chunk``.

        Raises:
            NotImplementedError: If the bot has no ``call_with_tools``.
            Exception: Any error from the bot is logged and re-raised.
        """
        try:
            bot = self.bot
            if not hasattr(bot, 'call_with_tools'):
                bot_type = type(bot).__name__
                raise NotImplementedError(f"Bot {bot_type} does not support call_with_tools. Please add the method.")
            stream = bot.call_with_tools(**self._build_call_kwargs(request, stream=True))
            for chunk in stream:
                yield self._format_stream_chunk(chunk)
        except Exception as e:
            logger.error(f"AgentLLMModel call_stream error: {e}", exc_info=True)
            raise

    def _format_response(self, response):
        """Hook for adapting a bot response; currently returns it unchanged."""
        return response

    def _format_stream_chunk(self, chunk):
        """Hook for adapting a stream chunk; currently returns it unchanged."""
        return chunk
class AgentBridge:
"""
Bridge class that integrates super Agent with COW
Manages multiple agent instances per session for conversation isolation
"""
def __init__(self, bridge: Bridge):
    """
    Set up empty agent registries and the initializer helper.

    Args:
        bridge: The COW bridge that agents created here will use.
    """
    self.bridge = bridge
    # One agent per session_id, plus a shared fallback used when the caller
    # gives no session_id.
    self.agents: dict = {}
    self.default_agent = None
    self.agent: Optional[Agent] = None
    self.scheduler_initialized: bool = False
    # Built last: the initializer is handed this bridge object itself, so
    # the attributes above should exist before it runs.
    self.initializer = AgentInitializer(bridge, self)
def create_agent(self, system_prompt: str, tools: List = None, **kwargs) -> Agent:
    """
    Build an ``Agent`` wired to COW's bot infrastructure.

    Args:
        system_prompt: System prompt for the agent.
        tools: Tools to give the agent. When None, every tool class known
            to ``ToolManager`` is instantiated; tools that fail to load are
            skipped with a warning.
        **kwargs: Optional agent settings read here: description, max_steps,
            output_mode, workspace_dir, skill_manager, enable_skills,
            memory_manager, max_context_tokens, context_reserve_tokens,
            runtime_info.

    Returns:
        The new Agent instance.
    """
    option = kwargs.get
    llm = AgentLLMModel(self.bridge)

    if tools is None:
        from agent.tools import ToolManager
        manager = ToolManager()
        manager.load_tools()
        tools = []
        cwd_override = option("workspace_dir")
        for name in manager.tool_classes.keys():
            try:
                instance = manager.create_tool(name)
                if not instance:
                    continue
                # Point tools that have a working directory at the
                # agent's workspace, when one was given.
                if cwd_override and hasattr(instance, 'cwd'):
                    instance.cwd = cwd_override
                tools.append(instance)
            except Exception as e:
                logger.warning(f"[AgentBridge] Failed to load tool {name}: {e}")

    agent = Agent(
        system_prompt=system_prompt,
        description=option("description", "AI Super Agent"),
        model=llm,
        tools=tools,
        max_steps=option("max_steps", 15),
        output_mode=option("output_mode", "logger"),
        workspace_dir=option("workspace_dir"),
        skill_manager=option("skill_manager"),
        enable_skills=option("enable_skills", True),
        memory_manager=option("memory_manager"),
        max_context_tokens=option("max_context_tokens"),
        context_reserve_tokens=option("context_reserve_tokens"),
        runtime_info=option("runtime_info"),
    )

    skills = agent.skill_manager
    if skills:
        logger.debug(f"[AgentBridge] SkillManager initialized with {len(skills.skills)} skills")
    return agent
def get_agent(self, session_id: str = None) -> Optional[Agent]:
"""
Get agent instance for the given session
Args:
session_id: Session identifier (e.g., user_id). If None, returns default agent.
Returns:
Agent instance for this session
"""
# If no session_id, use default agent (backward compatibility)
if session_id is None:
if self.default_agent is None:
self._init_default_agent()
return self.default_agent
# Check if agent exists for this session
if session_id not in self.agents:
self._init_agent_for_session(session_id)
return self.agents[session_id]
def _init_default_agent(self):
"""Initialize default super agent"""
agent = self.initializer.initialize_agent(session_id=None)
self.default_agent = agent
def _init_agent_for_session(self, session_id: str):
"""Initialize agent for a specific session"""
agent = self.initializer.initialize_agent(session_id=session_id)
self.agents[session_id] = agent
def agent_reply(self, query: str, context: Context = None,
                on_event=None, clear_history: bool = False) -> Reply:
    """
    Run the session's agent on ``query`` and wrap the result in a Reply.

    Args:
        query: User query.
        context: COW context (optional). Supplies the session_id used for
            per-user isolation, the channel type, and the
            ``is_scheduled_task`` flag.
        on_event: Event callback (optional), forwarded to the event handler.
        clear_history: Whether to clear conversation history before the run.

    Returns:
        A TEXT reply with the agent's response, a file/image reply when the
        run queued a file to send, or an ERROR reply if anything raised.
    """
    session_id = None
    agent = None
    try:
        # Extract session_id from context for user isolation
        if context:
            session_id = context.kwargs.get("session_id") or context.get("session_id")

        # Get agent for this session (will auto-initialize if needed)
        agent = self.get_agent(session_id=session_id)
        if not agent:
            return Reply(ReplyType.ERROR, "Failed to initialize super agent")

        # Create event handler for logging and channel communication
        event_handler = AgentEventHandler(context=context, original_callback=on_event)

        # Read the flag once so the swap below and the restore in the
        # ``finally`` always agree, even if the context changes mid-run.
        is_scheduled_task = bool(context and context.get("is_scheduled_task"))
        original_tools = agent.tools

        # Everything after the tool list may have been swapped runs inside
        # this try, so the original list is restored on *any* failure - not
        # only on failures raised by run_stream itself.
        try:
            if is_scheduled_task:
                # Exclude the scheduler tool to prevent recursion.
                filtered_tools = [tool for tool in agent.tools if tool.name != "scheduler"]
                agent.tools = filtered_tools
                logger.info(f"[AgentBridge] Scheduled task execution: excluded scheduler tool ({len(filtered_tools)}/{len(original_tools)} tools)")
            elif context and agent.tools:
                # Attach context to scheduler tool if present
                for tool in agent.tools:
                    if tool.name == "scheduler":
                        try:
                            from agent.tools.scheduler.integration import attach_scheduler_to_tool
                            attach_scheduler_to_tool(tool, context)
                        except Exception as e:
                            logger.warning(f"[AgentBridge] Failed to attach context to scheduler: {e}")
                        break

            # Pass context metadata to model for downstream API requests
            if context and hasattr(agent, 'model'):
                agent.model.channel_type = context.get("channel_type", "")
                agent.model.session_id = session_id or ""

            # Store session_id on agent so executor can clear DB on fatal errors
            agent._current_session_id = session_id

            # Bound the in-memory context for scheduler sessions before each run.
            # Scheduler sessions are stable per-task and append every trigger,
            # so without trimming they would grow unbounded across runs and
            # blow up prompt cost. Regular user chats are not touched here -
            # the agent's own context manager handles that path.
            if session_id and session_id.startswith("scheduler_"):
                scheduler_keep_turns = max(
                    1, int(conf().get("agent_max_context_turns", 20)) // 5
                )
                self._trim_in_memory_to_turns(agent, scheduler_keep_turns)

            try:
                # Use agent's run_stream method with event handler
                response = agent.run_stream(
                    user_message=query,
                    on_event=event_handler.handle_event,
                    clear_history=clear_history
                )
            finally:
                # Log execution summary whether or not the run succeeded.
                event_handler.log_summary()
        finally:
            # Restore original tools
            if is_scheduled_task:
                agent.tools = original_tools

        # Persist new messages generated during this run
        if session_id:
            channel_type = (context.get("channel_type") or "") if context else ""
            new_messages = getattr(agent, '_last_run_new_messages', [])
            if new_messages:
                self._persist_messages(session_id, list(new_messages), channel_type)
            else:
                # No new messages and an empty history: mirror that in the
                # conversation store.
                with agent.messages_lock:
                    msg_count = len(agent.messages)
                if msg_count == 0:
                    try:
                        from agent.memory import get_conversation_store
                        get_conversation_store().clear_session(session_id)
                        logger.info(f"[AgentBridge] Cleared DB for recovered session: {session_id}")
                    except Exception as e:
                        logger.warning(f"[AgentBridge] Failed to clear DB after recovery: {e}")

        # Post-message hot-reload: detect edits to ~/cow/mcp.json and
        # sync any new/removed MCP tools into the live agent in the
        # background. Off the critical path so user latency is unaffected;
        # changes take effect on the user's next message.
        self._schedule_mcp_hot_reload(agent)

        # Check if there are files to send (from send/read tool)
        if hasattr(agent, 'stream_executor') and hasattr(agent.stream_executor, 'files_to_send'):
            files_to_send = agent.stream_executor.files_to_send
            if files_to_send:
                # Send the first file (for now, handle one file at a time)
                file_info = files_to_send[0]
                logger.info(f"[AgentBridge] Sending file: {file_info.get('path')}")
                # Clear files_to_send for next request
                agent.stream_executor.files_to_send = []
                # Return file reply based on file type
                return self._create_file_reply(file_info, response, context)

        return Reply(ReplyType.TEXT, response)
    except Exception as e:
        # Keep the traceback: the message alone rarely identifies the cause.
        logger.error(f"Agent reply error: {e}", exc_info=True)
        # If the agent cleared its messages due to format error / overflow,
        # also purge the DB so the next request starts clean.
        if session_id and agent:
            try:
                with agent.messages_lock:
                    msg_count = len(agent.messages)
                if msg_count == 0:
                    from agent.memory import get_conversation_store
                    get_conversation_store().clear_session(session_id)
                    logger.info(f"[AgentBridge] Cleared DB for session after error: {session_id}")
            except Exception as db_err:
                logger.warning(f"[AgentBridge] Failed to clear DB after error: {db_err}")
        return Reply(ReplyType.ERROR, f"Agent error: {str(e)}")
def _schedule_mcp_hot_reload(self, agent):
    """
    Fire-and-forget: detect mcp.json edits and reconcile the agent's
    tools on a daemon thread, so the work never adds to user latency.

    Failures are isolated and never raise into the message pipeline. To
    honour that, the ToolManager import happens inside the worker's
    try-block and starting the thread is guarded as well.

    Args:
        agent: The live agent whose MCP tools should be synced.
    """
    import threading

    def _reconcile():
        try:
            from agent.tools import ToolManager
            tm = ToolManager()
            tm.refresh_mcp_if_changed()
            added, removed = tm.sync_mcp_into_agent(agent)
            if added or removed:
                logger.info(
                    f"[AgentBridge] Agent tools synced — "
                    f"added={added}, removed={removed}"
                )
        except Exception as e:
            logger.warning(f"[AgentBridge] MCP hot-reload failed (non-fatal): {e}")

    try:
        threading.Thread(target=_reconcile, daemon=True, name="mcp-hot-reload").start()
    except Exception as e:
        # e.g. the interpreter cannot start another thread; the reply that
        # triggered this must still go out.
        logger.warning(f"[AgentBridge] MCP hot-reload failed (non-fatal): {e}")
def _create_file_reply(self, file_info: dict, text_response: str, context: Context = None) -> Reply:
    """
    Turn a queued file into a Reply the channel can send.

    Args:
        file_info: File metadata; the keys read here are "file_type"
            (default "file"), "path" and, for non-images, "file_name".
        text_response: Agent text to attach to the reply, if non-empty.
        context: Context object (not used by this method).

    Returns:
        An IMAGE_URL reply for images, otherwise a FILE reply whose
        ``file_name`` defaults to the basename of the path.
    """
    kind = file_info.get("file_type", "file")
    local_path = file_info.get("path")
    # Local paths are handed to the channel as file:// URLs.
    url = f"file://{local_path}"

    if kind == "image":
        logger.info(f"[AgentBridge] Sending image: {url}")
        reply = Reply(ReplyType.IMAGE_URL, url)
    else:
        # Known media kinds are logged by name; anything else (tar.gz,
        # zip, ...) is logged as a generic file. Both are sent as FILE.
        label = kind if kind in ("document", "video", "audio") else "generic file"
        logger.info(f"[AgentBridge] Sending {label}: {url}")
        reply = Reply(ReplyType.FILE, url)
        reply.file_name = file_info.get("file_name", os.path.basename(local_path))

    # Accompanying text, for channels that support text plus attachment.
    if text_response:
        reply.text_content = text_response
    return reply
def _migrate_config_to_env(self, workspace_root: str):
    """
    Sync API keys from config.json to the .env file and ``os.environ``.

    For each mapped key: a non-empty config value that differs from the
    .env entry is written (and exported to ``os.environ``); an empty config
    value removes an existing .env entry (and the environment variable).
    The file is rewritten only when at least one entry actually changed.

    Args:
        workspace_root: Workspace directory path (not used, kept for compatibility)
    """
    key_mapping = {
        "open_ai_api_key": "OPENAI_API_KEY",
        "open_ai_api_base": "OPENAI_API_BASE",
        "gemini_api_key": "GEMINI_API_KEY",
        "claude_api_key": "CLAUDE_API_KEY",
        "linkai_api_key": "LINKAI_API_KEY",
    }
    # NOTE(review): refresh_all_skills() reloads "<agent_workspace>/.env"
    # (default "~/cow/.env"), which is a different path from this one -
    # confirm which location is intended.
    env_file = expand_path("~/.cow/.env")

    # Read existing env vars (key -> value); comments and blanks are skipped.
    existing_env_vars = {}
    if os.path.exists(env_file):
        try:
            with open(env_file, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if line and not line.startswith('#') and '=' in line:
                        key, val = line.split('=', 1)
                        existing_env_vars[key.strip()] = val.strip()
        except Exception as e:
            logger.warning(f"[AgentBridge] Failed to read .env file: {e}")

    # Sync config.json values into .env (add/update/remove)
    updated = False
    for config_key, env_key in key_mapping.items():
        raw = conf().get(config_key, "")
        value = raw.strip() if raw else ""
        old_value = existing_env_vars.get(env_key)
        if value:
            if old_value == value:
                continue  # already in sync
            existing_env_vars[env_key] = value
            os.environ[env_key] = value
        else:
            if old_value is None:
                continue  # nothing configured, nothing stored
            existing_env_vars.pop(env_key, None)
            os.environ.pop(env_key, None)
        # Reached only when this entry was added, changed or removed.
        updated = True

    if updated:
        try:
            env_dir = os.path.dirname(env_file)
            os.makedirs(env_dir, exist_ok=True)
            with open(env_file, 'w', encoding='utf-8') as f:
                f.write('# Environment variables for agent\n')
                f.write('# Auto-managed - synced from config.json on startup\n\n')
                for key, value in sorted(existing_env_vars.items()):
                    f.write(f'{key}={value}\n')
            logger.info(f"[AgentBridge] Synced API keys from config.json to .env")
        except Exception as e:
            logger.warning(f"[AgentBridge] Failed to sync API keys: {e}")
def _persist_messages(
self, session_id: str, new_messages: list, channel_type: str = ""
) -> None:
"""
Persist new messages to the conversation store after each agent run.
Failures are logged but never propagate — they must not interrupt replies.
"""
if not new_messages:
return
try:
from config import conf
if not conf().get("conversation_persistence", True):
return
# When deep-thinking display is disabled, strip "thinking" content
# blocks before persisting so they don't resurface on history reload.
# The in-memory message list keeps them intact for this run's
# multi-turn LLM context.
thinking_enabled = bool(conf().get("enable_thinking", False))
except Exception:
thinking_enabled = False
messages_to_store = new_messages
if not thinking_enabled:
messages_to_store = self._strip_thinking_blocks(new_messages)
try:
from agent.memory import get_conversation_store
get_conversation_store().append_messages(
session_id, messages_to_store, channel_type=channel_type
)
except Exception as e:
logger.warning(
f"[AgentBridge] Failed to persist messages for session={session_id}: {e}"
)
# Prefix that tags a user message as injected by the scheduler rather than
# typed by a person. Pruning keys off this prefix, which lets a sliding
# window be applied to injected turns without touching real user turns.
# The legacy prefix "Scheduled task" (written by the v2 PR) is also
# recognised when pruning, so old data can be aged out instead of leaking
# forever.
_SCHEDULED_MARKER = "[SCHEDULED]"
_SCHEDULED_LEGACY_MARKERS = ("Scheduled task",)
def remember_scheduled_output(
    self,
    session_id: str,
    content: str,
    channel_type: str = "",
    task_description: str = "",
) -> None:
    """Record a scheduled task's visible output in the receiver's session.

    A marker-prefixed user message plus an assistant message carrying
    ``content`` are persisted, older scheduler pairs are pruned from the
    store, and - if an agent for ``session_id`` is live - the same pair is
    appended to and pruned from its in-memory history.

    Configuration:
        scheduler_inject_to_session (bool, default True):
            Master switch. When False, this method is a no-op.
        scheduler_inject_max_per_session (int, default 3):
            Maximum scheduler-injected user/assistant pairs retained per
            session. Older injections are pruned automatically.

    Content longer than 2000 chars is cut to 2000 chars plus "...".

    Args:
        session_id: Receiver session; empty means no-op.
        content: Text the scheduled task produced; empty means no-op.
        channel_type: Channel label forwarded when persisting.
        task_description: Optional text appended after the marker.
    """
    from config import conf

    if not conf().get("scheduler_inject_to_session", True):
        return
    if not (session_id and content):
        return

    limit = 2000
    if len(content) > limit:
        content = content[:limit] + "..."

    if task_description:
        prompt_text = f"{self._SCHEDULED_MARKER} {task_description}"
    else:
        prompt_text = self._SCHEDULED_MARKER

    pair = [
        {"role": "user", "content": [{"type": "text", "text": prompt_text}]},
        {"role": "assistant", "content": [{"type": "text", "text": content}]},
    ]

    # Order matters: persist, then prune the store, then sync memory.
    self._persist_messages(session_id, pair, channel_type)

    configured_cap = conf().get("scheduler_inject_max_per_session", 3) or 0
    keep_last_n = max(int(configured_cap), 0)

    try:
        from agent.memory import get_conversation_store
        store = get_conversation_store()
        deleted = store.prune_scheduled_messages(session_id, keep_last_n=keep_last_n)
        if deleted:
            logger.debug(
                f"[AgentBridge] Pruned {deleted} old scheduler messages "
                f"for session={session_id} (keep_last_n={keep_last_n})"
            )
    except Exception as e:
        logger.warning(
            f"[AgentBridge] Failed to prune scheduled messages "
            f"for session={session_id}: {e}"
        )

    live_agent = self.agents.get(session_id)
    if not live_agent:
        return
    try:
        with live_agent.messages_lock:
            live_agent.messages.extend(pair)
            self._prune_scheduled_in_memory(live_agent, keep_last_n)
    except Exception as e:
        logger.warning(
            f"[AgentBridge] Failed to update in-memory scheduled output "
            f"for session={session_id}: {e}"
        )
@staticmethod
def _trim_in_memory_to_turns(agent, keep_turns: int) -> None:
"""Bound ``agent.messages`` to the most recent ``keep_turns`` real
user/assistant turns, dropping older history together with any
intermediate tool_use/tool_result blocks that belonged to it.
A "real" user message is any user message whose content is not solely a
tool_result block — matches the heuristic used elsewhere when filtering
history (see ``AgentInitializer._filter_text_only_messages``).
No-op when the session is already within budget. Caller does not need
to hold the lock; this method acquires it itself.
"""
if keep_turns <= 0:
return
def _is_real_user(msg) -> bool:
if not isinstance(msg, dict) or msg.get("role") != "user":
return False
content = msg.get("content")
if isinstance(content, list):
if any(
isinstance(b, dict) and b.get("type") == "tool_result"
for b in content
):
return False
return any(
isinstance(b, dict) and b.get("type") == "text" and b.get("text")
for b in content
)
if isinstance(content, str):
return bool(content.strip())
return False
with agent.messages_lock:
msgs = agent.messages
real_user_indices = [i for i, m in enumerate(msgs) if _is_real_user(m)]
if len(real_user_indices) <= keep_turns:
return
# Cut at the (k-th from the end) real user message; keep everything
# from there onwards so the surviving slice is still a valid
# user/assistant sequence.
cut_idx = real_user_indices[-keep_turns]
if cut_idx == 0:
return
kept = msgs[cut_idx:]
msgs.clear()
msgs.extend(kept)
logger.debug(
f"[AgentBridge] Trimmed in-memory messages to last "
f"{keep_turns} turns ({len(kept)} messages remain)"
)
@classmethod
def _prune_scheduled_in_memory(cls, agent, keep_last_n: int) -> None:
"""Mirror conversation_store.prune_scheduled_messages on agent.messages.
Caller must hold ``agent.messages_lock``.
"""
if keep_last_n < 0:
keep_last_n = 0
markers = (cls._SCHEDULED_MARKER,) + cls._SCHEDULED_LEGACY_MARKERS
def _is_marker_user(msg) -> bool:
if not isinstance(msg, dict) or msg.get("role") != "user":
return False
content = msg.get("content")
text = ""
if isinstance(content, str):
text = content
elif isinstance(content, list):
for block in content:
if isinstance(block, dict) and block.get("type") == "text":
text = block.get("text", "")
break
return any(text.startswith(m) for m in markers)
msgs = agent.messages
pair_indices = [] # list of (user_idx, assistant_idx_or_None)
for idx, msg in enumerate(msgs):
if not _is_marker_user(msg):
continue
assistant_idx = None
if idx + 1 < len(msgs):
nxt = msgs[idx + 1]
if isinstance(nxt, dict) and nxt.get("role") == "assistant":
assistant_idx = idx + 1
pair_indices.append((idx, assistant_idx))
if len(pair_indices) <= keep_last_n:
return
to_drop = pair_indices[: len(pair_indices) - keep_last_n]
drop_set = set()
for u_idx, a_idx in to_drop:
drop_set.add(u_idx)
if a_idx is not None:
drop_set.add(a_idx)
# Rebuild the list in place to keep external references stable.
kept = [m for i, m in enumerate(msgs) if i not in drop_set]
msgs.clear()
msgs.extend(kept)
@staticmethod
def _strip_thinking_blocks(messages: list) -> list:
"""Return a shallow copy of messages with assistant "thinking" blocks removed."""
cleaned = []
for msg in messages:
if not isinstance(msg, dict):
cleaned.append(msg)
continue
if msg.get("role") != "assistant":
cleaned.append(msg)
continue
content = msg.get("content")
if not isinstance(content, list):
cleaned.append(msg)
continue
filtered_blocks = [
b for b in content
if not (isinstance(b, dict) and b.get("type") == "thinking")
]
if len(filtered_blocks) == len(content):
cleaned.append(msg)
else:
new_msg = dict(msg)
new_msg["content"] = filtered_blocks
cleaned.append(new_msg)
return cleaned
def clear_session(self, session_id: str):
"""
Clear a specific session's agent and conversation history
Args:
session_id: Session identifier to clear
"""
if session_id in self.agents:
logger.info(f"[AgentBridge] Clearing session: {session_id}")
del self.agents[session_id]
def clear_all_sessions(self):
    """Drop every per-session agent and the default agent."""
    session_total = len(self.agents)
    logger.info(f"[AgentBridge] Clearing all sessions ({session_total} total)")
    # Empty the dict in place rather than rebinding it.
    self.agents.clear()
    self.default_agent = None
def refresh_all_skills(self) -> int:
    """
    Reload the workspace .env file, then refresh skills and conditional
    tools on every live agent (the default agent and all session agents).

    Returns:
        Number of agent instances refreshed
    """
    import os
    from dotenv import load_dotenv
    from config import conf

    # NOTE(review): _migrate_config_to_env() writes "~/.cow/.env", which is
    # not the path reloaded here - confirm which location is intended.
    workspace_root = expand_path(conf().get("agent_workspace", "~/cow"))
    env_file = os.path.join(workspace_root, '.env')
    if os.path.exists(env_file):
        load_dotenv(env_file, override=True)
        logger.info(f"[AgentBridge] Reloaded environment variables from {env_file}")

    # Snapshot the agents first so the registry is not iterated while the
    # refresh work runs.
    targets = []
    if self.default_agent:
        targets.append(self.default_agent)
    targets.extend(agent for _, agent in self.agents.items())

    refreshed_count = 0
    for agent in targets:
        skills = getattr(agent, 'skill_manager', None)
        if skills:
            skills.refresh_skills()
        # Conditional tools (e.g. web_search) depend on API keys that the
        # .env reload above may have changed.
        self._refresh_conditional_tools(agent)
        refreshed_count += 1

    if refreshed_count > 0:
        logger.info(f"[AgentBridge] Refreshed skills & tools in {refreshed_count} agent instance(s)")
    return refreshed_count
@staticmethod
def _refresh_conditional_tools(agent):
    """
    Bring the agent's web_search tool in line with ``WebSearch.is_available()``.

    The tool is appended when it is available but missing, and removed
    when it is present but no longer available. Any failure is logged at
    debug level and otherwise ignored.
    """
    try:
        from agent.tools.web_search.web_search import WebSearch

        present = any(tool.name == "web_search" for tool in agent.tools)
        usable = WebSearch.is_available()

        if present and not usable:
            # Availability was lost: rebuild the list without the tool.
            agent.tools = [tool for tool in agent.tools if tool.name != "web_search"]
            logger.info("[AgentBridge] web_search tool removed (API key no longer available)")
        elif usable and not present:
            # Availability was gained: add a tool bound to the agent's model.
            search_tool = WebSearch()
            search_tool.model = agent.model
            agent.tools.append(search_tool)
            logger.info("[AgentBridge] web_search tool added (API key now available)")
    except Exception as e:
        logger.debug(f"[AgentBridge] Failed to refresh conditional tools: {e}")