fix: bug fixes

This commit is contained in:
zhayujie
2026-02-02 22:22:10 +08:00
parent 5d02acbf37
commit 50e60e6d05
15 changed files with 865 additions and 575 deletions

View File

@@ -10,6 +10,8 @@ from models.openai_compatible_bot import OpenAICompatibleBot
from bridge.bridge import Bridge
from bridge.context import Context
from bridge.reply import Reply, ReplyType
from bridge.agent_event_handler import AgentEventHandler
from bridge.agent_initializer import AgentInitializer
from common import const
from common.log import logger
@@ -127,9 +129,6 @@ class AgentLLMModel(LLMModel):
try:
if hasattr(self.bot, 'call_with_tools'):
# Use tool-enabled streaming call if available
# Ensure max_tokens is an integer, use default if None
max_tokens = request.max_tokens if request.max_tokens is not None else 4096
# Extract system prompt if present
system_prompt = getattr(request, 'system', None)
@@ -138,10 +137,13 @@ class AgentLLMModel(LLMModel):
'messages': request.messages,
'tools': getattr(request, 'tools', None),
'stream': True,
'max_tokens': max_tokens,
'model': self.model # Pass model parameter
}
# Only pass max_tokens if explicitly set, let the bot use its default
if request.max_tokens is not None:
kwargs['max_tokens'] = request.max_tokens
# Add system prompt if present
if system_prompt:
kwargs['system'] = system_prompt
@@ -182,6 +184,9 @@ class AgentBridge:
self.default_agent = None # For backward compatibility (no session_id)
self.agent: Optional[Agent] = None
self.scheduler_initialized = False
# Create helper instances
self.initializer = AgentInitializer(bridge, self)
def create_agent(self, system_prompt: str, tools: List = None, **kwargs) -> Agent:
"""
Create the super agent with COW integration
@@ -252,492 +257,19 @@ class AgentBridge:
# Check if agent exists for this session
if session_id not in self.agents:
logger.info(f"[AgentBridge] Creating new agent for session: {session_id}")
self._init_agent_for_session(session_id)
return self.agents[session_id]
def _init_default_agent(self):
"""Initialize default super agent with new prompt system"""
from config import conf
import os
# Get workspace from config
workspace_root = os.path.expanduser(conf().get("agent_workspace", "~/cow"))
# Migrate API keys from config.json to environment variables (if not already set)
self._migrate_config_to_env(workspace_root)
# Load environment variables from secure .env file location
env_file = os.path.expanduser("~/.cow/.env")
if os.path.exists(env_file):
try:
from dotenv import load_dotenv
load_dotenv(env_file, override=True)
logger.info(f"[AgentBridge] Loaded environment variables from {env_file}")
except ImportError:
logger.warning("[AgentBridge] python-dotenv not installed, skipping .env file loading")
except Exception as e:
logger.warning(f"[AgentBridge] Failed to load .env file: {e}")
# Initialize workspace and create template files
from agent.prompt import ensure_workspace, load_context_files, PromptBuilder
workspace_files = ensure_workspace(workspace_root, create_templates=True)
logger.info(f"[AgentBridge] Workspace initialized at: {workspace_root}")
# Setup memory system
memory_manager = None
memory_tools = []
try:
# Try to initialize memory system
from agent.memory import MemoryManager, MemoryConfig
from agent.tools import MemorySearchTool, MemoryGetTool
# 从 config.json 读取 OpenAI 配置
openai_api_key = conf().get("open_ai_api_key", "")
openai_api_base = conf().get("open_ai_api_base", "")
# 尝试初始化 OpenAI embedding provider
embedding_provider = None
if openai_api_key:
try:
from agent.memory import create_embedding_provider
embedding_provider = create_embedding_provider(
provider="openai",
model="text-embedding-3-small",
api_key=openai_api_key,
api_base=openai_api_base or "https://api.openai.com/v1"
)
logger.info(f"[AgentBridge] OpenAI embedding initialized")
except Exception as embed_error:
logger.warning(f"[AgentBridge] OpenAI embedding failed: {embed_error}")
logger.info(f"[AgentBridge] Using keyword-only search")
else:
logger.info(f"[AgentBridge] No OpenAI API key, using keyword-only search")
# 创建 memory config
memory_config = MemoryConfig(workspace_root=workspace_root)
# 创建 memory manager
memory_manager = MemoryManager(memory_config, embedding_provider=embedding_provider)
# 初始化时执行一次 sync确保数据库有数据
import asyncio
try:
# 尝试在当前事件循环中执行
loop = asyncio.get_event_loop()
if loop.is_running():
# 如果事件循环正在运行,创建任务
asyncio.create_task(memory_manager.sync())
logger.info("[AgentBridge] Memory sync scheduled")
else:
# 如果没有运行的循环,直接执行
loop.run_until_complete(memory_manager.sync())
logger.info("[AgentBridge] Memory synced successfully")
except RuntimeError:
# 没有事件循环,创建新的
asyncio.run(memory_manager.sync())
logger.info("[AgentBridge] Memory synced successfully")
except Exception as e:
logger.warning(f"[AgentBridge] Memory sync failed: {e}")
# Create memory tools
memory_tools = [
MemorySearchTool(memory_manager),
MemoryGetTool(memory_manager)
]
logger.info(f"[AgentBridge] Memory system initialized")
except Exception as e:
logger.warning(f"[AgentBridge] Memory system not available: {e}")
logger.info("[AgentBridge] Continuing without memory features")
# Use ToolManager to dynamically load all available tools
from agent.tools import ToolManager
tool_manager = ToolManager()
tool_manager.load_tools()
# Create tool instances for all available tools
tools = []
file_config = {
"cwd": workspace_root,
"memory_manager": memory_manager
} if memory_manager else {"cwd": workspace_root}
for tool_name in tool_manager.tool_classes.keys():
try:
# Special handling for EnvConfig tool - pass agent_bridge reference
if tool_name == "env_config":
from agent.tools import EnvConfig
tool = EnvConfig({
"agent_bridge": self # Pass self reference for hot reload
})
else:
tool = tool_manager.create_tool(tool_name)
if tool:
# Apply workspace config to file operation tools
if tool_name in ['read', 'write', 'edit', 'bash', 'grep', 'find', 'ls']:
tool.config = file_config
tool.cwd = file_config.get("cwd", tool.cwd if hasattr(tool, 'cwd') else None)
if 'memory_manager' in file_config:
tool.memory_manager = file_config['memory_manager']
tools.append(tool)
logger.debug(f"[AgentBridge] Loaded tool: {tool_name}")
except Exception as e:
logger.warning(f"[AgentBridge] Failed to load tool {tool_name}: {e}")
# Add memory tools
if memory_tools:
tools.extend(memory_tools)
logger.info(f"[AgentBridge] Added {len(memory_tools)} memory tools")
# Initialize scheduler service (once)
if not self.scheduler_initialized:
try:
from agent.tools.scheduler.integration import init_scheduler
if init_scheduler(self):
self.scheduler_initialized = True
logger.info("[AgentBridge] Scheduler service initialized")
except Exception as e:
logger.warning(f"[AgentBridge] Failed to initialize scheduler: {e}")
# Inject scheduler dependencies into SchedulerTool instances
if self.scheduler_initialized:
try:
from agent.tools.scheduler.integration import get_task_store, get_scheduler_service
from agent.tools import SchedulerTool
task_store = get_task_store()
scheduler_service = get_scheduler_service()
for tool in tools:
if isinstance(tool, SchedulerTool):
tool.task_store = task_store
tool.scheduler_service = scheduler_service
if not tool.config:
tool.config = {}
tool.config["channel_type"] = conf().get("channel_type", "unknown")
logger.debug("[AgentBridge] Injected scheduler dependencies into SchedulerTool")
except Exception as e:
logger.warning(f"[AgentBridge] Failed to inject scheduler dependencies: {e}")
logger.info(f"[AgentBridge] Loaded {len(tools)} tools: {[t.name for t in tools]}")
# Load context files (SOUL.md, USER.md, etc.)
context_files = load_context_files(workspace_root)
logger.info(f"[AgentBridge] Loaded {len(context_files)} context files: {[f.path for f in context_files]}")
# Check if this is the first conversation
from agent.prompt.workspace import is_first_conversation, mark_conversation_started
is_first = is_first_conversation(workspace_root)
if is_first:
logger.info("[AgentBridge] First conversation detected")
# Build system prompt using new prompt builder
prompt_builder = PromptBuilder(
workspace_dir=workspace_root,
language="zh"
)
# Get runtime info
runtime_info = {
"model": conf().get("model", "unknown"),
"workspace": workspace_root,
"channel": conf().get("channel_type", "unknown") # Get from config
}
system_prompt = prompt_builder.build(
tools=tools,
context_files=context_files,
memory_manager=memory_manager,
runtime_info=runtime_info,
is_first_conversation=is_first
)
# Mark conversation as started (will be saved after first user message)
if is_first:
mark_conversation_started(workspace_root)
logger.info("[AgentBridge] System prompt built successfully")
# Get cost control parameters from config
max_steps = conf().get("agent_max_steps", 20)
max_context_tokens = conf().get("agent_max_context_tokens", 50000)
# Create agent with configured tools and workspace
agent = self.create_agent(
system_prompt=system_prompt,
tools=tools,
max_steps=max_steps,
output_mode="logger",
workspace_dir=workspace_root, # Pass workspace to agent for skills loading
enable_skills=True, # Enable skills auto-loading
max_context_tokens=max_context_tokens
)
# Attach memory manager to agent if available
if memory_manager:
agent.memory_manager = memory_manager
logger.info(f"[AgentBridge] Memory manager attached to agent")
# Store as default agent
"""Initialize default super agent"""
agent = self.initializer.initialize_agent(session_id=None)
self.default_agent = agent
def _init_agent_for_session(self, session_id: str):
"""
Initialize agent for a specific session
Reuses the same configuration as default agent
"""
from config import conf
import os
# Get workspace from config
workspace_root = os.path.expanduser(conf().get("agent_workspace", "~/cow"))
# Migrate API keys from config.json to environment variables (if not already set)
self._migrate_config_to_env(workspace_root)
# Load environment variables from secure .env file location
env_file = os.path.expanduser("~/.cow/.env")
if os.path.exists(env_file):
try:
from dotenv import load_dotenv
load_dotenv(env_file, override=True)
logger.debug(f"[AgentBridge] Loaded environment variables from {env_file} for session {session_id}")
except ImportError:
logger.warning(f"[AgentBridge] python-dotenv not installed, skipping .env file loading for session {session_id}")
except Exception as e:
logger.warning(f"[AgentBridge] Failed to load .env file for session {session_id}: {e}")
# Migrate API keys from config.json to environment variables (if not already set)
self._migrate_config_to_env(workspace_root)
# Initialize workspace
from agent.prompt import ensure_workspace, load_context_files, PromptBuilder
workspace_files = ensure_workspace(workspace_root, create_templates=True)
# Setup memory system
memory_manager = None
memory_tools = []
try:
from agent.memory import MemoryManager, MemoryConfig, create_embedding_provider
from agent.tools import MemorySearchTool, MemoryGetTool
# 从 config.json 读取 OpenAI 配置
openai_api_key = conf().get("open_ai_api_key", "")
openai_api_base = conf().get("open_ai_api_base", "")
# 尝试初始化 OpenAI embedding provider
embedding_provider = None
if openai_api_key:
try:
embedding_provider = create_embedding_provider(
provider="openai",
model="text-embedding-3-small",
api_key=openai_api_key,
api_base=openai_api_base or "https://api.openai.com/v1"
)
logger.debug(f"[AgentBridge] OpenAI embedding initialized for session {session_id}")
except Exception as embed_error:
logger.warning(f"[AgentBridge] OpenAI embedding failed for session {session_id}: {embed_error}")
logger.info(f"[AgentBridge] Using keyword-only search for session {session_id}")
else:
logger.debug(f"[AgentBridge] No OpenAI API key, using keyword-only search for session {session_id}")
# 创建 memory config
memory_config = MemoryConfig(workspace_root=workspace_root)
# 创建 memory manager
memory_manager = MemoryManager(memory_config, embedding_provider=embedding_provider)
# 初始化时执行一次 sync确保数据库有数据
import asyncio
try:
# 尝试在当前事件循环中执行
loop = asyncio.get_event_loop()
if loop.is_running():
# 如果事件循环正在运行,创建任务
asyncio.create_task(memory_manager.sync())
logger.debug(f"[AgentBridge] Memory sync scheduled for session {session_id}")
else:
# 如果没有运行的循环,直接执行
loop.run_until_complete(memory_manager.sync())
logger.debug(f"[AgentBridge] Memory synced successfully for session {session_id}")
except RuntimeError:
# 没有事件循环,创建新的
asyncio.run(memory_manager.sync())
logger.debug(f"[AgentBridge] Memory synced successfully for session {session_id}")
except Exception as sync_error:
logger.warning(f"[AgentBridge] Memory sync failed for session {session_id}: {sync_error}")
memory_tools = [
MemorySearchTool(memory_manager),
MemoryGetTool(memory_manager)
]
except Exception as e:
logger.warning(f"[AgentBridge] Memory system not available for session {session_id}: {e}")
import traceback
logger.warning(f"[AgentBridge] Memory init traceback: {traceback.format_exc()}")
# Load tools
from agent.tools import ToolManager
tool_manager = ToolManager()
tool_manager.load_tools()
tools = []
file_config = {
"cwd": workspace_root,
"memory_manager": memory_manager
} if memory_manager else {"cwd": workspace_root}
for tool_name in tool_manager.tool_classes.keys():
try:
tool = tool_manager.create_tool(tool_name)
if tool:
if tool_name in ['read', 'write', 'edit', 'bash', 'grep', 'find', 'ls']:
tool.config = file_config
tool.cwd = file_config.get("cwd", tool.cwd if hasattr(tool, 'cwd') else None)
if 'memory_manager' in file_config:
tool.memory_manager = file_config['memory_manager']
tools.append(tool)
except Exception as e:
logger.warning(f"[AgentBridge] Failed to load tool {tool_name} for session {session_id}: {e}")
if memory_tools:
tools.extend(memory_tools)
# Initialize scheduler service (once, if not already initialized)
if not self.scheduler_initialized:
try:
from agent.tools.scheduler.integration import init_scheduler
if init_scheduler(self):
self.scheduler_initialized = True
logger.debug(f"[AgentBridge] Scheduler service initialized for session {session_id}")
except Exception as e:
logger.warning(f"[AgentBridge] Failed to initialize scheduler for session {session_id}: {e}")
# Inject scheduler dependencies into SchedulerTool instances
if self.scheduler_initialized:
try:
from agent.tools.scheduler.integration import get_task_store, get_scheduler_service
from agent.tools import SchedulerTool
task_store = get_task_store()
scheduler_service = get_scheduler_service()
for tool in tools:
if isinstance(tool, SchedulerTool):
tool.task_store = task_store
tool.scheduler_service = scheduler_service
if not tool.config:
tool.config = {}
tool.config["channel_type"] = conf().get("channel_type", "unknown")
logger.debug(f"[AgentBridge] Injected scheduler dependencies for session {session_id}")
except Exception as e:
logger.warning(f"[AgentBridge] Failed to inject scheduler dependencies for session {session_id}: {e}")
# Load context files
context_files = load_context_files(workspace_root)
# Initialize skill manager
skill_manager = None
try:
from agent.skills import SkillManager
skill_manager = SkillManager(workspace_dir=workspace_root)
logger.debug(f"[AgentBridge] Initialized SkillManager with {len(skill_manager.skills)} skills for session {session_id}")
except Exception as e:
logger.warning(f"[AgentBridge] Failed to initialize SkillManager for session {session_id}: {e}")
# Check if this is the first conversation
from agent.prompt.workspace import is_first_conversation, mark_conversation_started
is_first = is_first_conversation(workspace_root)
# Build system prompt
prompt_builder = PromptBuilder(
workspace_dir=workspace_root,
language="zh"
)
# Get current time and timezone info
import datetime
import time
now = datetime.datetime.now()
# Get timezone info
try:
offset = -time.timezone if not time.daylight else -time.altzone
hours = offset // 3600
minutes = (offset % 3600) // 60
if minutes:
timezone_name = f"UTC{hours:+03d}:{minutes:02d}"
else:
timezone_name = f"UTC{hours:+03d}"
except Exception:
timezone_name = "UTC"
# Chinese weekday mapping
weekday_map = {
'Monday': '星期一',
'Tuesday': '星期二',
'Wednesday': '星期三',
'Thursday': '星期四',
'Friday': '星期五',
'Saturday': '星期六',
'Sunday': '星期日'
}
weekday_zh = weekday_map.get(now.strftime("%A"), now.strftime("%A"))
runtime_info = {
"model": conf().get("model", "unknown"),
"workspace": workspace_root,
"channel": conf().get("channel_type", "unknown"),
"current_time": now.strftime("%Y-%m-%d %H:%M:%S"),
"weekday": weekday_zh,
"timezone": timezone_name
}
system_prompt = prompt_builder.build(
tools=tools,
context_files=context_files,
skill_manager=skill_manager,
memory_manager=memory_manager,
runtime_info=runtime_info,
is_first_conversation=is_first
)
if is_first:
mark_conversation_started(workspace_root)
# Get cost control parameters from config
max_steps = conf().get("agent_max_steps", 20)
max_context_tokens = conf().get("agent_max_context_tokens", 50000)
# Create agent for this session
agent = self.create_agent(
system_prompt=system_prompt,
tools=tools,
max_steps=max_steps,
output_mode="logger",
workspace_dir=workspace_root,
skill_manager=skill_manager,
enable_skills=True,
max_context_tokens=max_context_tokens
)
if memory_manager:
agent.memory_manager = memory_manager
# Store agent for this session
"""Initialize agent for a specific session"""
agent = self.initializer.initialize_agent(session_id=session_id)
self.agents[session_id] = agent
logger.info(f"[AgentBridge] Agent created for session: {session_id}")
def agent_reply(self, query: str, context: Context = None,
on_event=None, clear_history: bool = False) -> Reply:
@@ -764,6 +296,9 @@ class AgentBridge:
if not agent:
return Reply(ReplyType.ERROR, "Failed to initialize super agent")
# Create event handler for logging and channel communication
event_handler = AgentEventHandler(context=context, original_callback=on_event)
# Filter tools based on context
original_tools = agent.tools
filtered_tools = original_tools
@@ -786,16 +321,19 @@ class AgentBridge:
break
try:
# Use agent's run_stream method
# Use agent's run_stream method with event handler
response = agent.run_stream(
user_message=query,
on_event=on_event,
on_event=event_handler.handle_event,
clear_history=clear_history
)
finally:
# Restore original tools
if context and context.get("is_scheduled_task"):
agent.tools = original_tools
# Log execution summary
event_handler.log_summary()
# Check if there are files to send (from read tool)
if hasattr(agent, 'stream_executor') and hasattr(agent.stream_executor, 'files_to_send'):
@@ -843,17 +381,18 @@ class AgentBridge:
reply.text_content = text_response # Store accompanying text
return reply
# For documents (PDF, Excel, Word, PPT), use FILE type
if file_type == "document":
# For all file types (document, video, audio), use FILE type
if file_type in ["document", "video", "audio"]:
file_url = f"file://{file_path}"
logger.info(f"[AgentBridge] Sending document: {file_url}")
logger.info(f"[AgentBridge] Sending {file_type}: {file_url}")
reply = Reply(ReplyType.FILE, file_url)
reply.file_name = file_info.get("file_name", os.path.basename(file_path))
# Attach text message if present
if text_response:
reply.text_content = text_response
return reply
# For other files (video, audio), we need channel-specific handling
# For now, return text with file info
# TODO: Implement video/audio sending when channel supports it
# For other unknown file types, return text with file info
message = text_response or file_info.get("message", "文件已准备")
message += f"\n\n[文件: {file_info.get('file_name', file_path)}]"
return Reply(ReplyType.TEXT, message)

View File

@@ -0,0 +1,115 @@
"""
Agent Event Handler - Handles agent events and thinking process output
"""
from common.log import logger
class AgentEventHandler:
"""
Handles agent events and optionally sends intermediate messages to channel
"""
def __init__(self, context=None, original_callback=None):
"""
Initialize event handler
Args:
context: COW context (for accessing channel)
original_callback: Original event callback to chain
"""
self.context = context
self.original_callback = original_callback
# Get channel for sending intermediate messages
self.channel = None
if context:
self.channel = context.kwargs.get("channel") if hasattr(context, "kwargs") else None
# Track current thinking for channel output
self.current_thinking = ""
self.turn_number = 0
def handle_event(self, event):
"""
Main event handler
Args:
event: Event dict with type and data
"""
event_type = event.get("type")
data = event.get("data", {})
# Dispatch to specific handlers
if event_type == "turn_start":
self._handle_turn_start(data)
elif event_type == "message_update":
self._handle_message_update(data)
elif event_type == "message_end":
self._handle_message_end(data)
elif event_type == "tool_execution_start":
self._handle_tool_execution_start(data)
elif event_type == "tool_execution_end":
self._handle_tool_execution_end(data)
# Call original callback if provided
if self.original_callback:
self.original_callback(event)
def _handle_turn_start(self, data):
"""Handle turn start event"""
self.turn_number = data.get("turn", 0)
self.has_tool_calls_in_turn = False
self.current_thinking = ""
def _handle_message_update(self, data):
"""Handle message update event (streaming text)"""
delta = data.get("delta", "")
self.current_thinking += delta
def _handle_message_end(self, data):
"""Handle message end event"""
tool_calls = data.get("tool_calls", [])
# Only send thinking process if followed by tool calls
if tool_calls:
if self.current_thinking.strip():
logger.debug(f"💭 {self.current_thinking.strip()[:200]}{'...' if len(self.current_thinking) > 200 else ''}")
# Send thinking process to channel
self._send_to_channel(f"{self.current_thinking.strip()}")
else:
# No tool calls = final response (logged at agent_stream level)
if self.current_thinking.strip():
logger.debug(f"💬 {self.current_thinking.strip()[:200]}{'...' if len(self.current_thinking) > 200 else ''}")
self.current_thinking = ""
def _handle_tool_execution_start(self, data):
"""Handle tool execution start event - logged by agent_stream.py"""
pass
def _handle_tool_execution_end(self, data):
"""Handle tool execution end event - logged by agent_stream.py"""
pass
def _send_to_channel(self, message):
"""
Try to send message to channel
Args:
message: Message to send
"""
if self.channel:
try:
from bridge.reply import Reply, ReplyType
# Create a Reply object for the message
reply = Reply(ReplyType.TEXT, message)
self.channel._send(reply, self.context)
except Exception as e:
logger.debug(f"[AgentEventHandler] Failed to send to channel: {e}")
def log_summary(self):
"""Log execution summary - simplified"""
# Summary removed as per user request
# Real-time logging during execution is sufficient
pass

375
bridge/agent_initializer.py Normal file
View File

@@ -0,0 +1,375 @@
"""
Agent Initializer - Handles agent initialization logic
"""
import os
import asyncio
import datetime
import time
from typing import Optional, List
from agent.protocol import Agent
from agent.tools import ToolManager
from common.log import logger
class AgentInitializer:
"""
Handles agent initialization including:
- Workspace setup
- Memory system initialization
- Tool loading
- System prompt building
"""
def __init__(self, bridge, agent_bridge):
"""
Initialize agent initializer
Args:
bridge: COW bridge instance
agent_bridge: AgentBridge instance (for create_agent method)
"""
self.bridge = bridge
self.agent_bridge = agent_bridge
def initialize_agent(self, session_id: Optional[str] = None) -> Agent:
"""
Initialize agent for a session
Args:
session_id: Session ID (None for default agent)
Returns:
Initialized agent instance
"""
from config import conf
# Get workspace from config
workspace_root = os.path.expanduser(conf().get("agent_workspace", "~/cow"))
# Migrate API keys
self._migrate_config_to_env(workspace_root)
# Load environment variables
self._load_env_file()
# Initialize workspace
from agent.prompt import ensure_workspace, load_context_files, PromptBuilder
workspace_files = ensure_workspace(workspace_root, create_templates=True)
if session_id is None:
logger.info(f"[AgentInitializer] Workspace initialized at: {workspace_root}")
# Setup memory system
memory_manager, memory_tools = self._setup_memory_system(workspace_root, session_id)
# Load tools
tools = self._load_tools(workspace_root, memory_manager, memory_tools, session_id)
# Initialize scheduler if needed
self._initialize_scheduler(tools, session_id)
# Load context files
context_files = load_context_files(workspace_root)
# Initialize skill manager
skill_manager = self._initialize_skill_manager(workspace_root, session_id)
# Check if first conversation
from agent.prompt.workspace import is_first_conversation, mark_conversation_started
is_first = is_first_conversation(workspace_root)
# Build system prompt
prompt_builder = PromptBuilder(workspace_dir=workspace_root, language="zh")
runtime_info = self._get_runtime_info(workspace_root)
system_prompt = prompt_builder.build(
tools=tools,
context_files=context_files,
skill_manager=skill_manager,
memory_manager=memory_manager,
runtime_info=runtime_info,
is_first_conversation=is_first
)
if is_first:
mark_conversation_started(workspace_root)
# Get cost control parameters
from config import conf
max_steps = conf().get("agent_max_steps", 20)
max_context_tokens = conf().get("agent_max_context_tokens", 50000)
# Create agent
agent = self.agent_bridge.create_agent(
system_prompt=system_prompt,
tools=tools,
max_steps=max_steps,
output_mode="logger",
workspace_dir=workspace_root,
skill_manager=skill_manager,
enable_skills=True,
max_context_tokens=max_context_tokens
)
# Attach memory manager
if memory_manager:
agent.memory_manager = memory_manager
return agent
def _load_env_file(self):
"""Load environment variables from .env file"""
env_file = os.path.expanduser("~/.cow/.env")
if os.path.exists(env_file):
try:
from dotenv import load_dotenv
load_dotenv(env_file, override=True)
except ImportError:
logger.warning("[AgentInitializer] python-dotenv not installed")
except Exception as e:
logger.warning(f"[AgentInitializer] Failed to load .env file: {e}")
def _setup_memory_system(self, workspace_root: str, session_id: Optional[str] = None):
"""
Setup memory system
Returns:
(memory_manager, memory_tools) tuple
"""
memory_manager = None
memory_tools = []
try:
from agent.memory import MemoryManager, MemoryConfig, create_embedding_provider
from agent.tools import MemorySearchTool, MemoryGetTool
from config import conf
# Get OpenAI config
openai_api_key = conf().get("open_ai_api_key", "")
openai_api_base = conf().get("open_ai_api_base", "")
# Initialize embedding provider
embedding_provider = None
if openai_api_key and openai_api_key not in ["", "YOUR API KEY", "YOUR_API_KEY"]:
try:
embedding_provider = create_embedding_provider(
provider="openai",
model="text-embedding-3-small",
api_key=openai_api_key,
api_base=openai_api_base or "https://api.openai.com/v1"
)
if session_id is None:
logger.info("[AgentInitializer] OpenAI embedding initialized")
except Exception as e:
logger.warning(f"[AgentInitializer] OpenAI embedding failed: {e}")
# Create memory manager
memory_config = MemoryConfig(workspace_root=workspace_root)
memory_manager = MemoryManager(memory_config, embedding_provider=embedding_provider)
# Sync memory
self._sync_memory(memory_manager, session_id)
# Create memory tools
memory_tools = [
MemorySearchTool(memory_manager),
MemoryGetTool(memory_manager)
]
if session_id is None:
logger.info("[AgentInitializer] Memory system initialized")
except Exception as e:
logger.warning(f"[AgentInitializer] Memory system not available: {e}")
return memory_manager, memory_tools
def _sync_memory(self, memory_manager, session_id: Optional[str] = None):
"""Sync memory database"""
try:
loop = asyncio.get_event_loop()
if loop.is_closed():
raise RuntimeError("Event loop is closed")
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
if loop.is_running():
asyncio.create_task(memory_manager.sync())
else:
loop.run_until_complete(memory_manager.sync())
except Exception as e:
logger.warning(f"[AgentInitializer] Memory sync failed: {e}")
def _load_tools(self, workspace_root: str, memory_manager, memory_tools: List, session_id: Optional[str] = None):
"""Load all tools"""
tool_manager = ToolManager()
tool_manager.load_tools()
tools = []
file_config = {
"cwd": workspace_root,
"memory_manager": memory_manager
} if memory_manager else {"cwd": workspace_root}
for tool_name in tool_manager.tool_classes.keys():
try:
# Special handling for EnvConfig tool
if tool_name == "env_config":
from agent.tools import EnvConfig
tool = EnvConfig({"agent_bridge": self.agent_bridge})
else:
tool = tool_manager.create_tool(tool_name)
if tool:
# Apply workspace config to file operation tools
if tool_name in ['read', 'write', 'edit', 'bash', 'grep', 'find', 'ls']:
tool.config = file_config
tool.cwd = file_config.get("cwd", getattr(tool, 'cwd', None))
if 'memory_manager' in file_config:
tool.memory_manager = file_config['memory_manager']
tools.append(tool)
except Exception as e:
logger.warning(f"[AgentInitializer] Failed to load tool {tool_name}: {e}")
# Add memory tools
if memory_tools:
tools.extend(memory_tools)
if session_id is None:
logger.info(f"[AgentInitializer] Added {len(memory_tools)} memory tools")
if session_id is None:
logger.info(f"[AgentInitializer] Loaded {len(tools)} tools: {[t.name for t in tools]}")
return tools
def _initialize_scheduler(self, tools: List, session_id: Optional[str] = None):
"""Initialize scheduler service if needed"""
if not self.agent_bridge.scheduler_initialized:
try:
from agent.tools.scheduler.integration import init_scheduler
if init_scheduler(self.agent_bridge):
self.agent_bridge.scheduler_initialized = True
if session_id is None:
logger.info("[AgentInitializer] Scheduler service initialized")
except Exception as e:
logger.warning(f"[AgentInitializer] Failed to initialize scheduler: {e}")
# Inject scheduler dependencies
if self.agent_bridge.scheduler_initialized:
try:
from agent.tools.scheduler.integration import get_task_store, get_scheduler_service
from agent.tools import SchedulerTool
from config import conf
task_store = get_task_store()
scheduler_service = get_scheduler_service()
for tool in tools:
if isinstance(tool, SchedulerTool):
tool.task_store = task_store
tool.scheduler_service = scheduler_service
if not tool.config:
tool.config = {}
tool.config["channel_type"] = conf().get("channel_type", "unknown")
except Exception as e:
logger.warning(f"[AgentInitializer] Failed to inject scheduler dependencies: {e}")
def _initialize_skill_manager(self, workspace_root: str, session_id: Optional[str] = None):
"""Initialize skill manager"""
try:
from agent.skills import SkillManager
skill_manager = SkillManager(workspace_dir=workspace_root)
return skill_manager
except Exception as e:
logger.warning(f"[AgentInitializer] Failed to initialize SkillManager: {e}")
return None
def _get_runtime_info(self, workspace_root: str):
"""Get runtime information"""
from config import conf
now = datetime.datetime.now()
# Get timezone info
try:
offset = -time.timezone if not time.daylight else -time.altzone
hours = offset // 3600
minutes = (offset % 3600) // 60
timezone_name = f"UTC{hours:+03d}:{minutes:02d}" if minutes else f"UTC{hours:+03d}"
except Exception:
timezone_name = "UTC"
# Chinese weekday mapping
weekday_map = {
'Monday': '星期一', 'Tuesday': '星期二', 'Wednesday': '星期三',
'Thursday': '星期四', 'Friday': '星期五', 'Saturday': '星期六', 'Sunday': '星期日'
}
weekday_zh = weekday_map.get(now.strftime("%A"), now.strftime("%A"))
return {
"model": conf().get("model", "unknown"),
"workspace": workspace_root,
"channel": conf().get("channel_type", "unknown"),
"current_time": now.strftime("%Y-%m-%d %H:%M:%S"),
"weekday": weekday_zh,
"timezone": timezone_name
}
def _migrate_config_to_env(self, workspace_root: str):
"""Migrate API keys from config.json to .env file"""
from config import conf
key_mapping = {
"open_ai_api_key": "OPENAI_API_KEY",
"open_ai_api_base": "OPENAI_API_BASE",
"gemini_api_key": "GEMINI_API_KEY",
"claude_api_key": "CLAUDE_API_KEY",
"linkai_api_key": "LINKAI_API_KEY",
}
env_file = os.path.expanduser("~/.cow/.env")
# Read existing env vars
existing_env_vars = {}
if os.path.exists(env_file):
try:
with open(env_file, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#') and '=' in line:
key, _ = line.split('=', 1)
existing_env_vars[key.strip()] = True
except Exception as e:
logger.warning(f"[AgentInitializer] Failed to read .env file: {e}")
# Check which keys need migration
keys_to_migrate = {}
for config_key, env_key in key_mapping.items():
if env_key in existing_env_vars:
continue
value = conf().get(config_key, "")
if value and value.strip():
keys_to_migrate[env_key] = value.strip()
# Write new keys
if keys_to_migrate:
try:
env_dir = os.path.dirname(env_file)
if not os.path.exists(env_dir):
os.makedirs(env_dir, exist_ok=True)
if not os.path.exists(env_file):
open(env_file, 'a').close()
with open(env_file, 'a', encoding='utf-8') as f:
f.write('\n# Auto-migrated from config.json\n')
for key, value in keys_to_migrate.items():
f.write(f'{key}={value}\n')
os.environ[key] = value
logger.info(f"[AgentInitializer] Migrated {len(keys_to_migrate)} API keys to .env: {list(keys_to_migrate.keys())}")
except Exception as e:
logger.warning(f"[AgentInitializer] Failed to migrate API keys: {e}")