Compare commits

...

33 Commits
2.0.0 ... 2.0.1

Author SHA1 Message Date
zhayujie
a24b26a1ef Merge pull request #2667 from cowagent/fix-wechatcom-image-support
fix: 支持企业微信图片消息识别功能
2026-02-12 16:44:18 +08:00
zhayujie
6f8421cdd5 fix: 支持企业微信图片消息识别功能
- 在 ChatGPTBot 中添加 ContextType.IMAGE 处理分支
- 新增 reply_image() 方法,支持 OpenAI Vision API
- 自动 Base64 编码图片并检测格式
- 自动清理临时文件

修复 #2625
2026-02-12 12:00:24 +08:00
zhayujie
284cd9bca9 Merge pull request #2666 from cowagent/fix-model-type-validation
fix: handle non-string model_type to prevent AttributeError
2026-02-10 11:31:45 +08:00
cowagent
23fd6b8d2b fix: handle non-string model_type to prevent AttributeError
When numeric model names (e.g., '1') are used with vLLM and configured
in YAML without quotes, they are parsed as integers. This causes
AttributeError when calling startswith() method.

Changes:
- Add type checking for model_type
- Convert non-string model_type to string with warning log
- Prevents crash when using custom numeric model names

Fixes #2664
2026-02-10 11:07:10 +08:00
zhayujie
4f0ea5d756 feat: make web search a built-in tool 2026-02-09 11:37:11 +08:00
zhayujie
6c218331b1 fix: improve skill system prompts and simplify tool descriptions
- Simplify skill-creator installation flow
- Refine skill selection prompt for better matching
- Add parameter alias and env variable hints for tools
- Skip linkai-agent when unconfigured
- Create skills/ dir in workspace on init
2026-02-08 18:59:59 +08:00
zhayujie
cea7fb7490 fix: add intelligent context cleanup #2663 2026-02-07 20:42:41 +08:00
zhayujie
8acf2dbdfe fix: chat context overflow #2663 2026-02-07 20:36:24 +08:00
zhayujie
0542700f90 fix: issues with empty tool calls and handling excessively long tool results 2026-02-07 20:25:05 +08:00
zhayujie
5264f7ce18 fix: getuid not found in windows 2026-02-07 11:17:58 +08:00
zhayujie
051ffd78a3 fix: windows path and encoding adaptation 2026-02-06 18:37:05 +08:00
zhayujie
bea95d4fae Merge pull request #2661 from cowagent/feat-add-claude-opus-4-6
feat: 添加 Claude Opus 4.6 模型支持
2026-02-06 15:09:49 +08:00
cowagent
fdf7bc312f feat: 添加 Claude Opus 4.6 模型支持
- 在 common/const.py 中添加 CLAUDE_4_6_OPUS 常量
- 将 claude-opus-4-6 添加到 MODEL_LIST
- 在 README.md 中更新 Agent 推荐模型列表
- 在 Claude 配置说明中添加 claude-opus-4-6 支持

Claude Opus 4.6 是 Anthropic 于 2026年2月5日发布的最新模型,
具有更强的规划能力和代码能力,适合作为 Agent 推荐模型。
2026-02-06 15:07:43 +08:00
vision
5b094e1097 Merge pull request #2660 from cowagent/fix-zhipuai-api-base-support
fix: 支持智谱AI自定义API base URL配置
2026-02-05 19:18:49 +08:00
cowagent
9ad3968084 fix: 支持智谱AI自定义API base URL配置
- 修复 ZhipuAiClient 初始化时未传入 base_url 参数的问题
- 使配置文件中的 zhipu_ai_api_base 配置项生效
- 支持智谱国际版(z.ai)等自定义API端点
- 同时修复对话和图片生成功能
- 添加日志输出便于确认使用的API地址

Fixes #2659
2026-02-05 19:06:46 +08:00
zhayujie
3958b6aae1 Merge pull request #2657 from cowagent/fix-missing-runtime-info-parameter
fix: 补充缺失的 runtime_info 参数传递
2026-02-04 22:51:53 +08:00
cowagent
eaa413caf0 fix: 补充缺失的 runtime_info 参数传递
问题:
PR #2655 已合并,但遗漏了关键的参数传递环节。runtime_info 在 agent_initializer.py 中创建并传递给 create_agent(),但 agent_bridge.py 的 create_agent() 方法中没有将其传递给 Agent 实例,导致动态时间更新功能无法生效。

影响:
- Agent 实例的 self.runtime_info 为 None
- get_full_system_prompt() 无法检测到动态时间函数
- 时间戳仍然是静态的,不会实时更新

修复:
在 agent_bridge.py 第 236 行添加:
runtime_info=kwargs.get("runtime_info")

这确保了完整的参数传递链路:
agent_initializer → agent_bridge.create_agent → Agent.__init__

---

*来自 [CowAgent](https://github.com/zhayujie/chatgpt-on-wechat) 项目的 AI Agent*
2026-02-04 22:49:54 +08:00
zhayujie
9095225b5b Merge pull request #2656 from 6vision/master
Update: improve script interaction and configuration
2026-02-04 22:46:02 +08:00
zhayujie
c529f86dbc Merge pull request #2655 from cowagent/fix-runtime-timestamp-update
fix: 动态更新系统提示词中的运行时信息(时间戳)
2026-02-04 22:38:51 +08:00
cowagent
e4fcfa356a refactor: 改用动态函数实现运行时信息更新(更健壮的方案)
改进点:
1. builder.py: _build_runtime_section() 支持 callable 动态时间函数
2. agent_initializer.py: 传入 get_current_time 函数而非静态时间值
3. agent.py: _rebuild_runtime_section() 动态调用时间函数并重建该部分

优势:
- 解耦模板:不依赖具体的提示词格式
- 健壮性:提示词模板改变不会导致功能失效
- 向后兼容:保留对静态时间的支持
- 性能优化:只在需要时才计算时间

相比之前的正则匹配方案,这个方案更加优雅和可维护。
2026-02-04 22:37:19 +08:00
vision
8218cff7c1 Merge branch 'zhayujie:master' into master 2026-02-04 22:32:20 +08:00
6vision
6949bbcf39 update: Improve script interaction and configuration 2026-02-04 22:31:40 +08:00
cowagent
480c60c0a7 fix: 动态更新系统提示词中的运行时信息(时间戳)
问题:
- system_prompt 在 Agent 初始化时固定,导致模型获取的时间信息过时
- 长时间运行的会话中,模型对时间判断不准确

解决方案:
- 在 get_full_system_prompt() 中添加动态更新逻辑
- 每次获取系统提示词时,使用正则表达式替换运行时信息中的时间戳
- 保持其他运行时信息(模型、工作空间等)不变

测试:
- 创建测试脚本验证时间动态更新功能
- 等待3秒后时间正确更新(22:19:45 -> 22:19:48)
2026-02-04 22:27:24 +08:00
zhayujie
eec10cb5db fix: claude remove toolname 2026-02-04 22:15:10 +08:00
zhayujie
02c83d8689 docs: update agent.md 2026-02-04 21:42:52 +08:00
zhayujie
72b1cacea1 fix: hiding the thought process 2026-02-04 19:36:01 +08:00
zhayujie
c72cda3386 fix: minimax reasoning content optimization 2026-02-04 19:26:36 +08:00
zhayujie
867442155e fix: lark connection issue 2026-02-04 17:05:30 +08:00
zhayujie
229b14b6fc fix: feishu cert error 2026-02-04 16:15:38 +08:00
zhayujie
158c87ab8b fix: openai function call 2026-02-04 15:42:43 +08:00
zhayujie
cb303e6109 fix: add decision round log 2026-02-03 21:27:30 +08:00
saboteur7
a77a8741b5 fix: memory loss issue caused by scheduler 2026-02-03 20:45:22 +08:00
zhayujie
3d63459c25 docs: update README.md 2026-02-03 15:44:00 +08:00
48 changed files with 1527 additions and 663 deletions

196
README.md
View File

@@ -12,13 +12,13 @@
# 简介
> 该项目既是一个可以开箱即用的超级AI助理也是一个支持高FTS5 not available, using LIKE-based keyword searc度扩展的Agent框架可以通过为项目扩展大模型接口、接入渠道、内置工具、Skills系统来灵活实现各种定制需求。核心能力如下
> 该项目既是一个可以开箱即用的超级AI助理也是一个支持高扩展的Agent框架可以通过为项目扩展大模型接口、接入渠道、内置工具、Skills系统来灵活实现各种定制需求。核心能力如下
-**复杂任务规划**:能够理解复杂任务并自主规划执行,持续思考和调用工具直到完成目标,支持通过工具操作访问文件、终端、浏览器、定时任务等系统资源
-**长期记忆:** 自动将对话记忆持久化至本地文件和数据库中,包括全局记忆和天级记忆,支持关键词及向量检索
-**技能系统:** 实现了Skills创建和运行的引擎内置多种技能并支持通过自然语言对话完成自定义Skills开发
-**多模态消息:** 支持对文本、图片、语音、文件等多类型消息进行解析、处理、生成、发送等操作
-**多模型接入:** 支持OpenAI, Claude, Gemini, DeepSeek, MiniMax、GLM、通义千问, Kimi等国内外主流模型厂商
-**多模型接入:** 支持OpenAI, Claude, Gemini, DeepSeek, MiniMax、GLM、Qwen、Kimi等国内外主流模型厂商
-**多端部署:** 支持运行在本地计算机或服务器,可集成到网页、飞书、钉钉、微信公众号、企业微信应用中使用
-**知识库:** 集成企业知识库能力让Agent成为专属数字员工基于[LinkAI](https://link-ai.tech)平台实现
@@ -90,7 +90,7 @@ bash <(curl -sS https://cdn.link-ai.tech/code/cow/run.sh)
项目支持国内外主流厂商的模型接口,可选模型及配置说明参考:[模型说明](#模型说明)。
> Agent模式下推荐使用以下模型可根据效果及成本综合选择 Claude(claude-sonnet-4-5、claude-sonnet-4-0)、Gemini(gemini-3-flash-preview、gemini-3-pro-preview)、GLM(glm-4.7)、MiniMAx(MiniMax-M2.1)、Qwen(qwen3-max)
> Agent模式下推荐使用以下模型可根据效果及成本综合选择GLM(glm-4.7)、MiniMAx(MiniMax-M2.1)、Qwen(qwen3-max)、Claude(claude-opus-4-6、claude-sonnet-4-5、claude-sonnet-4-0)、Gemini(gemini-3-flash-preview、gemini-3-pro-preview)
同时支持使用 **LinkAI平台** 接口,可灵活切换 OpenAI、Claude、Gemini、DeepSeek、Qwen、Kimi 等多种常用模型并支持知识库、工作流、插件等Agent能力参考 [接口文档](https://docs.link-ai.tech/platform/api)。
@@ -136,16 +136,16 @@ pip3 install -r requirements-optional.txt
# config.json 文件内容示例
{
"channel_type": "web", # 接入渠道类型默认为web支持修改为:feishu,dingtalk,wechatcom_app,terminal,wechatmp,wechatmp_service
"model": "claude-sonnet-4-5", # 模型名称
"model": "MiniMax-M2.1", # 模型名称
"minimax_api_key": "", # MiniMax API Key
"zhipu_ai_api_key": "", # 智谱GLM API Key
"dashscope_api_key": "", # 百炼(通义千问)API Key
"claude_api_key": "", # Claude API Key
"claude_api_base": "https://api.anthropic.com/v1", # Claude API 地址,修改可接入三方代理平台
"open_ai_api_key": "", # OpenAI API Key
"open_ai_api_base": "https://api.openai.com/v1", # OpenAI API 地址
"gemini_api_key": "", # Gemini API Key
"gemini_api_base": "https://generativelanguage.googleapis.com", # Gemini API地址
"zhipu_ai_api_key": "", # 智谱GLM API Key
"minimax_api_key": "", # MiniMax API Key
"dashscope_api_key": "", # 百炼(通义千问)API Key
"open_ai_api_key": "", # OpenAI API Key
"open_ai_api_base": "https://api.openai.com/v1", # OpenAI API 地址
"linkai_api_key": "", # LinkAI API Key
"proxy": "", # 代理客户端的ip和端口国内环境需要开启代理的可填写该项如 "127.0.0.1:7890"
"speech_recognition": false, # 是否开启语音识别
@@ -173,7 +173,7 @@ pip3 install -r requirements-optional.txt
<details>
<summary>2. 其他配置</summary>
+ `model`: 模型名称Agent模式下推荐使用 `claude-sonnet-4-5``claude-sonnet-4-0``gemini-3-flash-preview``gemini-3-pro-preview``glm-4.7``MiniMax-M2.1``qwen3-max`,全部模型名称参考[common/const.py](https://github.com/zhayujie/chatgpt-on-wechat/blob/master/common/const.py)文件
+ `model`: 模型名称Agent模式下推荐使用 `glm-4.7``MiniMax-M2.1``qwen3-max``claude-opus-4-6``claude-sonnet-4-5``claude-sonnet-4-0``gemini-3-flash-preview``gemini-3-pro-preview`,全部模型名称参考[common/const.py](https://github.com/zhayujie/chatgpt-on-wechat/blob/master/common/const.py)文件
+ `character_desc`普通对话模式下的机器人系统提示词。在Agent模式下该配置不生效由工作空间中的文件内容构成。
+ `subscribe_msg`订阅消息公众号和企业微信channel中请填写当被订阅时会自动回复 可使用特殊占位符。目前支持的占位符有{trigger_prefix}在程序中它会自动替换成bot的触发词。
</details>
@@ -302,6 +302,93 @@ volumes:
+ `model`: model字段填写空则直接使用智能体的模型可在平台中灵活切换[模型列表](https://link-ai.tech/console/models)中的全部模型均可使用
</details>
<details>
<summary>MiniMax</summary>
方式一:官方接入,配置如下(推荐)
```json
{
"model": "MiniMax-M2.1",
"minimax_api_key": ""
}
```
- `model`: 可填写 `MiniMax-M2.1、MiniMax-M2.1-lightning、MiniMax-M2、abab6.5-chat`
- `minimax_api_key`MiniMax平台的API-KEY在 [控制台](https://platform.minimaxi.com/user-center/basic-information/interface-key) 创建
方式二OpenAI兼容方式接入配置如下
```json
{
"bot_type": "chatGPT",
"model": "MiniMax-M2.1",
"open_ai_api_base": "https://api.minimaxi.com/v1",
"open_ai_api_key": ""
}
```
- `bot_type`: OpenAI兼容方式
- `model`: 可填 `MiniMax-M2.1、MiniMax-M2.1-lightning、MiniMax-M2`,参考[API文档](https://platform.minimaxi.com/document/%E5%AF%B9%E8%AF%9D?key=66701d281d57f38758d581d0#QklxsNSbaf6kM4j6wjO5eEek)
- `open_ai_api_base`: MiniMax平台API的 BASE URL
- `open_ai_api_key`: MiniMax平台的API-KEY
</details>
<details>
<summary>智谱AI (GLM)</summary>
方式一:官方接入,配置如下(推荐)
```json
{
"model": "glm-4.7",
"zhipu_ai_api_key": ""
}
```
- `model`: 可填 `glm-4.7、glm-4-plus、glm-4-flash、glm-4-air、glm-4-airx、glm-4-long` 等, 参考 [glm-4系列模型编码](https://bigmodel.cn/dev/api/normal-model/glm-4)
- `zhipu_ai_api_key`: 智谱AI平台的 API KEY在 [控制台](https://www.bigmodel.cn/usercenter/proj-mgmt/apikeys) 创建
方式二OpenAI兼容方式接入配置如下
```json
{
"bot_type": "chatGPT",
"model": "glm-4.7",
"open_ai_api_base": "https://open.bigmodel.cn/api/paas/v4",
"open_ai_api_key": ""
}
```
- `bot_type`: OpenAI兼容方式
- `model`: 可填 `glm-4.7、glm-4.6、glm-4-plus、glm-4-flash、glm-4-air、glm-4-airx、glm-4-long`
- `open_ai_api_base`: 智谱AI平台的 BASE URL
- `open_ai_api_key`: 智谱AI平台的 API KEY
</details>
<details>
<summary>通义千问 (Qwen)</summary>
方式一官方SDK接入配置如下(推荐)
```json
{
"model": "qwen3-max",
"dashscope_api_key": "sk-qVxxxxG"
}
```
- `model`: 可填写 `qwen3-max、qwen-max、qwen-plus、qwen-turbo、qwen-long、qwq-plus`
- `dashscope_api_key`: 通义千问的 API-KEY参考 [官方文档](https://bailian.console.aliyun.com/?tab=api#/api) ,在 [控制台](https://bailian.console.aliyun.com/?tab=model#/api-key) 创建
方式二OpenAI兼容方式接入配置如下
```json
{
"bot_type": "chatGPT",
"model": "qwen3-max",
"open_ai_api_base": "https://dashscope.aliyuncs.com/compatible-mode/v1",
"open_ai_api_key": "sk-qVxxxxG"
}
```
- `bot_type`: OpenAI兼容方式
- `model`: 支持官方所有模型,参考[模型列表](https://help.aliyun.com/zh/model-studio/models?spm=a2c4g.11186623.0.0.78d84823Kth5on#9f8890ce29g5u)
- `open_ai_api_base`: 通义千问API的 BASE URL
- `open_ai_api_key`: 通义千问的 API-KEY
</details>
<details>
<summary>Claude</summary>
@@ -315,7 +402,7 @@ volumes:
"claude_api_key": "YOUR_API_KEY"
}
```
- `model`: 参考 [官方模型ID](https://docs.anthropic.com/en/docs/about-claude/models/overview#model-aliases) ,支持 `claude-sonnet-4-5、claude-sonnet-4-0、claude-opus-4-0、claude-3-5-sonnet-latest`
- `model`: 参考 [官方模型ID](https://docs.anthropic.com/en/docs/about-claude/models/overview#model-aliases) ,支持 `claude-opus-4-6、claude-sonnet-4-5、claude-sonnet-4-0、claude-opus-4-0、claude-3-5-sonnet-latest`
</details>
<details>
@@ -354,93 +441,6 @@ API Key创建在 [控制台](https://aistudio.google.com/app/apikey?hl=zh-cn)
- `open_ai_api_base`: DeepSeek平台 BASE URL
</details>
<details>
<summary>通义千问 (Qwen)</summary>
方式一官方SDK接入配置如下(推荐)
```json
{
"model": "qwen3-max",
"dashscope_api_key": "sk-qVxxxxG"
}
```
- `model`: 可填写 `qwen3-max、qwen-max、qwen-plus、qwen-turbo、qwen-long、qwq-plus`
- `dashscope_api_key`: 通义千问的 API-KEY参考 [官方文档](https://bailian.console.aliyun.com/?tab=api#/api) ,在 [控制台](https://bailian.console.aliyun.com/?tab=model#/api-key) 创建
方式二OpenAI兼容方式接入配置如下
```json
{
"bot_type": "chatGPT",
"model": "qwen3-max",
"open_ai_api_base": "https://dashscope.aliyuncs.com/compatible-mode/v1",
"open_ai_api_key": "sk-qVxxxxG"
}
```
- `bot_type`: OpenAI兼容方式
- `model`: 支持官方所有模型,参考[模型列表](https://help.aliyun.com/zh/model-studio/models?spm=a2c4g.11186623.0.0.78d84823Kth5on#9f8890ce29g5u)
- `open_ai_api_base`: 通义千问API的 BASE URL
- `open_ai_api_key`: 通义千问的 API-KEY
</details>
<details>
<summary>MiniMax</summary>
方式一:官方接入,配置如下(推荐)
```json
{
"model": "MiniMax-M2.1",
"minimax_api_key": ""
}
```
- `model`: 可填写 `MiniMax-M2.1、MiniMax-M2.1-lightning、MiniMax-M2、abab6.5-chat`
- `minimax_api_key`MiniMax平台的API-KEY在 [控制台](https://platform.minimaxi.com/user-center/basic-information/interface-key) 创建
方式二OpenAI兼容方式接入配置如下
```json
{
"bot_type": "chatGPT",
"model": "MiniMax-M2.1",
"open_ai_api_base": "https://api.minimaxi.com/v1",
"open_ai_api_key": ""
}
```
- `bot_type`: OpenAI兼容方式
- `model`: 可填 `MiniMax-M2.1、MiniMax-M2.1-lightning、MiniMax-M2`,参考[API文档](https://platform.minimaxi.com/document/%E5%AF%B9%E8%AF%9D?key=66701d281d57f38758d581d0#QklxsNSbaf6kM4j6wjO5eEek)
- `open_ai_api_base`: MiniMax平台API的 BASE URL
- `open_ai_api_key`: MiniMax平台的API-KEY
</details>
<details>
<summary>智谱AI (GLM)</summary>
方式一:官方接入,配置如下(推荐)
```json
{
"model": "glm-4.7",
"zhipu_ai_api_key": ""
}
```
- `model`: 可填 `glm-4.7、glm-4-plus、glm-4-flash、glm-4-air、glm-4-airx、glm-4-long` 等, 参考 [glm-4系列模型编码](https://bigmodel.cn/dev/api/normal-model/glm-4)
- `zhipu_ai_api_key`: 智谱AI平台的 API KEY在 [控制台](https://www.bigmodel.cn/usercenter/proj-mgmt/apikeys) 创建
方式二OpenAI兼容方式接入配置如下
```json
{
"bot_type": "chatGPT",
"model": "glm-4.7",
"open_ai_api_base": "https://open.bigmodel.cn/api/paas/v4",
"open_ai_api_key": ""
}
```
- `bot_type`: OpenAI兼容方式
- `model`: 可填 `glm-4.7、glm-4.6、glm-4-plus、glm-4-flash、glm-4-air、glm-4-airx、glm-4-long`
- `open_ai_api_base`: 智谱AI平台的 BASE URL
- `open_ai_api_key`: 智谱AI平台的 API KEY
</details>
<details>
<summary>Kimi (Moonshot)</summary>

View File

@@ -11,12 +11,18 @@ from typing import Optional, List
from pathlib import Path
def _default_workspace():
"""Get default workspace path with proper Windows support"""
from common.utils import expand_path
return expand_path("~/cow")
@dataclass
class MemoryConfig:
"""Configuration for memory storage and search"""
# Storage paths (default: ~/cow)
workspace_root: str = field(default_factory=lambda: os.path.expanduser("~/cow"))
workspace_root: str = field(default_factory=_default_workspace)
# Embedding config
embedding_provider: str = "openai" # "openai" | "local"

View File

@@ -304,7 +304,7 @@ class MemoryManager:
):
"""Sync a single file"""
# Compute file hash
content = file_path.read_text()
content = file_path.read_text(encoding='utf-8')
file_hash = MemoryStorage.compute_hash(content)
# Get relative path

View File

@@ -157,96 +157,66 @@ def _build_identity_section(base_persona: Optional[str], language: str) -> List[
def _build_tooling_section(tools: List[Any], language: str) -> List[str]:
"""构建工具说明section"""
"""Build tooling section with concise tool list and call style guide."""
# One-line summaries for known tools (details are in the tool schema)
core_summaries = {
"read": "读取文件内容",
"write": "创建或覆盖文件",
"edit": "精确编辑文件",
"ls": "列出目录内容",
"grep": "搜索文件内容",
"find": "按模式查找文件",
"bash": "执行shell命令",
"terminal": "管理后台进程",
"web_search": "网络搜索",
"web_fetch": "获取URL内容",
"browser": "控制浏览器",
"memory_search": "搜索记忆",
"memory_get": "读取记忆内容",
"env_config": "管理API密钥和技能配置",
"scheduler": "管理定时任务和提醒",
"send": "发送文件给用户",
}
# Preferred display order
tool_order = [
"read", "write", "edit", "ls", "grep", "find",
"bash", "terminal",
"web_search", "web_fetch", "browser",
"memory_search", "memory_get",
"env_config", "scheduler", "send",
]
# Build name -> summary mapping for available tools
available = {}
for tool in tools:
name = tool.name if hasattr(tool, 'name') else str(tool)
available[name] = core_summaries.get(name, "")
# Generate tool lines: ordered tools first, then extras
tool_lines = []
for name in tool_order:
if name in available:
summary = available.pop(name)
tool_lines.append(f"- {name}: {summary}" if summary else f"- {name}")
for name in sorted(available):
summary = available[name]
tool_lines.append(f"- {name}: {summary}" if summary else f"- {name}")
lines = [
"## 工具系统",
"",
"你可以使用以下工具来完成任务。工具名称大小写敏感的,请严格按列表中的名称调用。",
"可用工具(名称大小写敏感严格按列表调用):",
"\n".join(tool_lines),
"",
"### 可用工具",
"工具调用风格:",
"",
"- 在多步骤任务、敏感操作或用户要求时简要解释决策过程",
"- 持续推进直到任务完成,完成后向用户报告结果。",
"- 回复中涉及密钥、令牌等敏感信息必须脱敏。",
"",
]
# 工具分类和排序
tool_categories = {
"文件操作": ["read", "write", "edit", "ls", "grep", "find"],
"命令执行": ["bash", "terminal"],
"网络搜索": ["web_search", "web_fetch", "browser"],
"记忆系统": ["memory_search", "memory_get"],
"其他": []
}
# 构建工具映射
tool_map = {}
tool_descriptions = {
"read": "读取文件内容",
"write": "创建新文件或完全覆盖现有文件(会删除原内容!追加内容请用 edit。注意单次 write 内容不要超过 10KB超大文件请分步创建",
"edit": "精确编辑文件(追加、修改、删除部分内容)",
"ls": "列出目录内容",
"grep": "在文件中搜索内容",
"find": "按照模式查找文件",
"bash": "执行shell命令",
"terminal": "管理后台进程",
"web_search": "网络搜索(使用搜索引擎)",
"web_fetch": "获取URL内容",
"browser": "控制浏览器",
"memory_search": "搜索记忆文件",
"memory_get": "获取记忆文件内容",
"calculator": "计算器",
"current_time": "获取当前时间",
}
for tool in tools:
tool_name = tool.name if hasattr(tool, 'name') else str(tool)
tool_desc = tool.description if hasattr(tool, 'description') else tool_descriptions.get(tool_name, "")
tool_map[tool_name] = tool_desc
# 按分类添加工具
for category, tool_names in tool_categories.items():
category_tools = [(name, tool_map.get(name, "")) for name in tool_names if name in tool_map]
if category_tools:
lines.append(f"**{category}**:")
for name, desc in category_tools:
if desc:
lines.append(f"- `{name}`: {desc}")
else:
lines.append(f"- `{name}`")
del tool_map[name] # 移除已添加的工具
lines.append("")
# 添加其他未分类的工具
if tool_map:
lines.append("**其他工具**:")
for name, desc in sorted(tool_map.items()):
if desc:
lines.append(f"- `{name}`: {desc}")
else:
lines.append(f"- `{name}`")
lines.append("")
# 工具使用指南
lines.extend([
"### 工具调用风格",
"",
"默认规则: 对于常规、低风险的工具调用,直接调用即可,无需叙述。",
"",
"需要叙述的情况:",
"- 多步骤、复杂的任务",
"- 敏感操作(如删除文件)",
"- 用户明确要求解释过程",
"",
"叙述要求: 保持简洁、信息密度高,避免重复显而易见的步骤。",
"",
"完成标准:",
"- 确保用户的需求得到实际解决,而不仅仅是制定计划。",
"- 当任务需要多次工具调用时,持续推进直到完成, 解决完后向用户报告结果或回复用户的问题",
"- 每次工具调用后,评估是否已获得足够信息来推进或完成任务",
"- 避免重复调用相同的工具和相同参数获取相同的信息,除非用户明确要求",
"",
"**安全提醒**: 回复中涉及密钥、令牌、密码等敏感信息时,必须脱敏处理,禁止直接显示完整内容。",
"",
])
return lines
@@ -265,16 +235,17 @@ def _build_skills_section(skill_manager: Any, tools: Optional[List[Any]], langua
break
lines = [
"## 技能系统",
"## 技能系统mandatory",
"",
"在回复之前:扫描下方 <available_skills> 中的 <description> 条目。",
"",
f"- 如果恰好有一个技能明确适用:使用 `{read_tool_name}` 工具读取其 <location> 路径下的 SKILL.md 文件,然后遵循它",
"- 如果多个技能都适用选择最具体的一个,然后读取并遵循",
"- 如果没有明确适用的:不要读取任何 SKILL.md",
f"- 如果恰好有一个技能(Skill)明确适用:使用 `{read_tool_name}` 读取其 <location> 的 SKILL.md然后严格遵循它",
"- 如果多个技能都适用选择最匹配的一个,如果没有明确适用的则不要读取任何 SKILL.md",
"- 读取 SKILL.md 后直接按其指令执行,无需多余的预检查",
"",
"**约束**: 永远不要一次性读取多个技能只在选择后再读取。",
"**注意**: 永远不要一次性读取多个技能只在选择后再读取。技能和工具不同必须先读取其SKILL.md并按照文件内容运行。",
"",
"以下是可用技能:"
]
# 添加技能列表通过skill_manager获取
@@ -461,7 +432,7 @@ def _build_context_files_section(context_files: List[ContextFile], language: str
def _build_runtime_section(runtime_info: Dict[str, Any], language: str) -> List[str]:
"""构建运行时信息section"""
"""构建运行时信息section - 支持动态时间"""
if not runtime_info:
return []
@@ -471,7 +442,17 @@ def _build_runtime_section(runtime_info: Dict[str, Any], language: str) -> List[
]
# Add current time if available
if runtime_info.get("current_time"):
# Support dynamic time via callable function
if callable(runtime_info.get("_get_current_time")):
try:
time_info = runtime_info["_get_current_time"]()
time_line = f"当前时间: {time_info['time']} {time_info['weekday']} ({time_info['timezone']})"
lines.append(time_line)
lines.append("")
except Exception as e:
logger.warning(f"[PromptBuilder] Failed to get dynamic time: {e}")
elif runtime_info.get("current_time"):
# Fallback to static time for backward compatibility
time_str = runtime_info["current_time"]
weekday = runtime_info.get("weekday", "")
timezone = runtime_info.get("timezone", "")

View File

@@ -57,6 +57,10 @@ def ensure_workspace(workspace_dir: str, create_templates: bool = True) -> Works
# 创建memory子目录
os.makedirs(memory_dir, exist_ok=True)
# 创建skills子目录 (for workspace-level skills installed by agent)
skills_dir = os.path.join(workspace_dir, "skills")
os.makedirs(skills_dir, exist_ok=True)
# 如果需要,创建模板文件
if create_templates:

View File

@@ -13,7 +13,8 @@ class Agent:
def __init__(self, system_prompt: str, description: str = "AI Agent", model: LLMModel = None,
tools=None, output_mode="print", max_steps=100, max_context_tokens=None,
context_reserve_tokens=None, memory_manager=None, name: str = None,
workspace_dir: str = None, skill_manager=None, enable_skills: bool = True):
workspace_dir: str = None, skill_manager=None, enable_skills: bool = True,
runtime_info: dict = None):
"""
Initialize the Agent with system prompt, model, description.
@@ -31,6 +32,7 @@ class Agent:
:param workspace_dir: Optional workspace directory for workspace-specific skills
:param skill_manager: Optional SkillManager instance (will be created if None and enable_skills=True)
:param enable_skills: Whether to enable skills support (default: True)
:param runtime_info: Optional runtime info dict (with _get_current_time callable for dynamic time)
"""
self.name = name or "Agent"
self.system_prompt = system_prompt
@@ -48,6 +50,7 @@ class Agent:
self.memory_manager = memory_manager # Memory manager for auto memory flush
self.workspace_dir = workspace_dir # Workspace directory
self.enable_skills = enable_skills # Skills enabled flag
self.runtime_info = runtime_info # Runtime info for dynamic time update
# Initialize skill manager
self.skill_manager = None
@@ -96,18 +99,98 @@ class Agent:
def get_full_system_prompt(self, skill_filter=None) -> str:
"""
Get the full system prompt including skills.
Note: Skills are now built into the system prompt by PromptBuilder,
so we just return the base prompt directly. This method is kept for
backward compatibility.
:param skill_filter: Optional list of skill names to include (deprecated)
:return: Complete system prompt
"""
# Skills are now included in system_prompt by PromptBuilder
# No need to append them here
return self.system_prompt
prompt = self.system_prompt
# Rebuild tool list section to reflect current self.tools
prompt = self._rebuild_tool_list_section(prompt)
# If runtime_info contains dynamic time function, rebuild runtime section
if self.runtime_info and callable(self.runtime_info.get('_get_current_time')):
prompt = self._rebuild_runtime_section(prompt)
return prompt
def _rebuild_runtime_section(self, prompt: str) -> str:
"""
Rebuild runtime info section with current time.
This method dynamically updates the runtime info section by calling
the _get_current_time function from runtime_info.
:param prompt: Original system prompt
:return: Updated system prompt with current runtime info
"""
try:
# Get current time dynamically
time_info = self.runtime_info['_get_current_time']()
# Build new runtime section
runtime_lines = [
"\n## 运行时信息\n",
"\n",
f"当前时间: {time_info['time']} {time_info['weekday']} ({time_info['timezone']})\n",
"\n"
]
# Add other runtime info
runtime_parts = []
if self.runtime_info.get("model"):
runtime_parts.append(f"模型={self.runtime_info['model']}")
if self.runtime_info.get("workspace"):
# Replace backslashes with forward slashes for Windows paths
workspace_path = str(self.runtime_info['workspace']).replace('\\', '/')
runtime_parts.append(f"工作空间={workspace_path}")
if self.runtime_info.get("channel") and self.runtime_info.get("channel") != "web":
runtime_parts.append(f"渠道={self.runtime_info['channel']}")
if runtime_parts:
runtime_lines.append("运行时: " + " | ".join(runtime_parts) + "\n")
runtime_lines.append("\n")
new_runtime_section = "".join(runtime_lines)
# Find and replace the runtime section
import re
pattern = r'\n## 运行时信息\s*\n.*?(?=\n##|\Z)'
updated_prompt = re.sub(pattern, new_runtime_section.rstrip('\n'), prompt, flags=re.DOTALL)
return updated_prompt
except Exception as e:
logger.warning(f"Failed to rebuild runtime section: {e}")
return prompt
def _rebuild_tool_list_section(self, prompt: str) -> str:
"""
Rebuild the tool list inside the '## 工具系统' section so that it
always reflects the current ``self.tools`` (handles dynamic add/remove
of conditional tools like web_search).
"""
import re
from agent.prompt.builder import _build_tooling_section
try:
if not self.tools:
return prompt
new_lines = _build_tooling_section(self.tools, "zh")
new_section = "\n".join(new_lines).rstrip("\n")
# Replace existing tooling section
pattern = r'## 工具系统\s*\n.*?(?=\n## |\Z)'
updated = re.sub(pattern, new_section, prompt, count=1, flags=re.DOTALL)
return updated
except Exception as e:
logger.warning(f"Failed to rebuild tool list section: {e}")
return prompt
def refresh_skills(self):
"""Refresh the loaded skills."""
if self.skill_manager:
@@ -193,27 +276,67 @@ class Agent:
def _estimate_message_tokens(self, message: dict) -> int:
"""
Estimate token count for a message using chars/4 heuristic.
This is a conservative estimate (tends to overestimate).
Estimate token count for a message.
Uses chars/3 for Chinese-heavy content and chars/4 for ASCII-heavy content,
plus per-block overhead for tool_use / tool_result structures.
:param message: Message dict with 'role' and 'content'
:return: Estimated token count
"""
content = message.get('content', '')
if isinstance(content, str):
return max(1, len(content) // 4)
return max(1, self._estimate_text_tokens(content))
elif isinstance(content, list):
# Handle multi-part content (text + images)
total_chars = 0
total_tokens = 0
for part in content:
if isinstance(part, dict) and part.get('type') == 'text':
total_chars += len(part.get('text', ''))
elif isinstance(part, dict) and part.get('type') == 'image':
# Estimate images as ~1200 tokens
total_chars += 4800
return max(1, total_chars // 4)
if not isinstance(part, dict):
continue
block_type = part.get('type', '')
if block_type == 'text':
total_tokens += self._estimate_text_tokens(part.get('text', ''))
elif block_type == 'image':
total_tokens += 1200
elif block_type == 'tool_use':
# tool_use has id + name + input (JSON-encoded)
total_tokens += 50 # overhead for structure
input_data = part.get('input', {})
if isinstance(input_data, dict):
import json
input_str = json.dumps(input_data, ensure_ascii=False)
total_tokens += self._estimate_text_tokens(input_str)
elif block_type == 'tool_result':
# tool_result has tool_use_id + content
total_tokens += 30 # overhead for structure
result_content = part.get('content', '')
if isinstance(result_content, str):
total_tokens += self._estimate_text_tokens(result_content)
else:
# Unknown block type, estimate conservatively
total_tokens += 10
return max(1, total_tokens)
return 1
@staticmethod
def _estimate_text_tokens(text: str) -> int:
"""
Estimate token count for a text string.
Chinese / CJK characters typically use ~1.5 tokens each,
while ASCII uses ~0.25 tokens per char (4 chars/token).
We use a weighted average based on the character mix.
:param text: Input text
:return: Estimated token count
"""
if not text:
return 0
# Count non-ASCII characters (CJK, emoji, etc.)
non_ascii = sum(1 for c in text if ord(c) > 127)
ascii_count = len(text) - non_ascii
# CJK chars: ~1.5 tokens each; ASCII: ~0.25 tokens per char
return int(non_ascii * 1.5 + ascii_count * 0.25) + 1
def _find_tool(self, tool_name: str):
"""Find and return a tool with the specified name"""
for tool in self.tools:
@@ -370,7 +493,17 @@ class Agent:
)
# Execute
response = executor.run_stream(user_message)
try:
response = executor.run_stream(user_message)
except Exception:
# If executor cleared its messages (context overflow / message format error),
# sync that back to the Agent's own message list so the next request
# starts fresh instead of hitting the same overflow forever.
if len(executor.messages) == 0:
with self.messages_lock:
self.messages.clear()
logger.info("[Agent] Cleared Agent message history after executor recovery")
raise
# Append only the NEW messages from this execution (thread-safe)
# This allows concurrent requests to both contribute to history

View File

@@ -76,6 +76,20 @@ class AgentStreamExecutor:
})
except Exception as e:
logger.error(f"Event callback error: {e}")
def _filter_think_tags(self, text: str) -> str:
"""
Remove <think> and </think> tags but keep the content inside.
Some LLM providers (e.g., MiniMax) may return thinking process wrapped in <think> tags.
We only remove the tags themselves, keeping the actual thinking content.
"""
if not text:
return text
import re
# Remove only the <think> and </think> tags, keep the content
text = re.sub(r'<think>', '', text)
text = re.sub(r'</think>', '', text)
return text
def _hash_args(self, args: dict) -> str:
"""Generate a simple hash for tool arguments"""
@@ -184,7 +198,7 @@ class AgentStreamExecutor:
try:
while turn < self.max_turns:
turn += 1
logger.debug(f"{turn}")
logger.info(f"[Agent] {turn}")
self._emit_event("turn_start", {"turn": turn})
# Check if memory flush is needed (before calling LLM)
@@ -322,7 +336,7 @@ class AgentStreamExecutor:
# Build tool result block (Claude format)
# Format content in a way that's easy for LLM to understand
is_error = result.get("status") == "error"
if is_error:
# For errors, provide clear error message
result_content = f"Error: {result.get('result', 'Unknown error')}"
@@ -335,7 +349,16 @@ class AgentStreamExecutor:
else:
# Fallback to full JSON
result_content = json.dumps(result, ensure_ascii=False)
# Truncate excessively large tool results for the current turn
# Historical turns will be further truncated in _trim_messages()
MAX_CURRENT_TURN_RESULT_CHARS = 50000
if len(result_content) > MAX_CURRENT_TURN_RESULT_CHARS:
truncated_len = len(result_content)
result_content = result_content[:MAX_CURRENT_TURN_RESULT_CHARS] + \
f"\n\n[Output truncated: {truncated_len} chars total, showing first {MAX_CURRENT_TURN_RESULT_CHARS} chars]"
logger.info(f"📎 Truncated tool result for '{tool_call['name']}': {truncated_len} -> {MAX_CURRENT_TURN_RESULT_CHARS} chars")
tool_result_block = {
"type": "tool_result",
"tool_use_id": tool_call["id"],
@@ -447,7 +470,7 @@ class AgentStreamExecutor:
raise
finally:
logger.debug(f"🏁 完成({turn}轮)")
logger.info(f"[Agent] 🏁 完成 ({turn}轮)")
self._emit_event("agent_end", {"final_response": final_response})
# 每轮对话结束后增加计数(用户消息+AI回复=1轮
@@ -456,7 +479,8 @@ class AgentStreamExecutor:
return final_response
def _call_llm_stream(self, retry_on_empty=True, retry_count=0, max_retries=3) -> Tuple[str, List[Dict]]:
def _call_llm_stream(self, retry_on_empty=True, retry_count=0, max_retries=3,
_overflow_retry: bool = False) -> Tuple[str, List[Dict]]:
"""
Call LLM with streaming and automatic retry on errors
@@ -464,6 +488,7 @@ class AgentStreamExecutor:
retry_on_empty: Whether to retry once if empty response is received
retry_count: Current retry attempt (internal use)
max_retries: Maximum number of retries for API errors
_overflow_retry: Internal flag indicating this is a retry after context overflow
Returns:
(response_text, tool_calls)
@@ -561,8 +586,11 @@ class AgentStreamExecutor:
# Handle text content
content_delta = delta.get("content") or ""
if content_delta:
full_content += content_delta
self._emit_event("message_update", {"delta": content_delta})
# Filter out <think> tags from content
filtered_delta = self._filter_think_tags(content_delta)
full_content += filtered_delta
if filtered_delta: # Only emit if there's content after filtering
self._emit_event("message_update", {"delta": filtered_delta})
# Handle tool calls
if "tool_calls" in delta and delta["tool_calls"]:
@@ -612,10 +640,23 @@ class AgentStreamExecutor:
if is_context_overflow or is_message_format_error:
error_type = "context overflow" if is_context_overflow else "message format error"
logger.error(f"💥 {error_type} detected: {e}")
# Clear message history to recover
# Strategy: try aggressive trimming first, only clear as last resort
if is_context_overflow and not _overflow_retry:
trimmed = self._aggressive_trim_for_overflow()
if trimmed:
logger.warning("🔄 Aggressively trimmed context, retrying...")
return self._call_llm_stream(
retry_on_empty=retry_on_empty,
retry_count=retry_count,
max_retries=max_retries,
_overflow_retry=True
)
# Aggressive trim didn't help or this is a message format error
# -> clear everything
logger.warning("🔄 Clearing conversation history to recover")
self.messages.clear()
# Raise special exception with user-friendly message
if is_context_overflow:
raise Exception(
"抱歉,对话历史过长导致上下文溢出。我已清空历史记录,请重新描述你的需求。"
@@ -661,6 +702,13 @@ class AgentStreamExecutor:
tool_calls = []
for idx in sorted(tool_calls_buffer.keys()):
tc = tool_calls_buffer[idx]
# Ensure tool call has a valid ID (some providers return empty/None IDs)
tool_id = tc.get("id") or ""
if not tool_id:
import uuid
tool_id = f"call_{uuid.uuid4().hex[:24]}"
try:
# Safely get arguments, handle None case
args_str = tc.get("arguments") or ""
@@ -673,11 +721,11 @@ class AgentStreamExecutor:
logger.error(f"Arguments length: {len(args_str)} chars")
logger.error(f"Arguments preview: {args_preview}...")
logger.error(f"JSON decode error: {e}")
# Return a clear error message to the LLM instead of empty dict
# This helps the LLM understand what went wrong
tool_calls.append({
"id": tc["id"],
"id": tool_id,
"name": tc["name"],
"arguments": {},
"_parse_error": f"Invalid JSON in tool arguments: {args_preview}... Error: {str(e)}. Tip: For large content, consider splitting into smaller chunks or using a different approach."
@@ -685,7 +733,7 @@ class AgentStreamExecutor:
continue
tool_calls.append({
"id": tc["id"],
"id": tool_id,
"name": tc["name"],
"arguments": arguments
})
@@ -706,6 +754,9 @@ class AgentStreamExecutor:
max_retries=max_retries
)
# Filter full_content one more time (in case tags were split across chunks)
full_content = self._filter_think_tags(full_content)
# Add assistant message to history (Claude format uses content blocks)
assistant_msg = {"role": "assistant", "content": []}
@@ -926,10 +977,164 @@ class AgentStreamExecutor:
for msg in turn['messages']
)
def _truncate_historical_tool_results(self):
"""
Truncate tool_result content in historical messages to reduce context size.
Current turn results are kept at 30K chars (truncated at creation time).
Historical turn results are further truncated to 10K chars here.
This runs before token-based trimming so that we first shrink oversized
results, potentially avoiding the need to drop entire turns.
"""
MAX_HISTORY_RESULT_CHARS = 20000
if len(self.messages) < 2:
return
# Find where the last user text message starts (= current turn boundary)
# We skip the current turn's messages to preserve their full content
current_turn_start = len(self.messages)
for i in range(len(self.messages) - 1, -1, -1):
msg = self.messages[i]
if msg.get("role") == "user":
content = msg.get("content", [])
if isinstance(content, list) and any(
isinstance(b, dict) and b.get("type") == "text" for b in content
):
current_turn_start = i
break
elif isinstance(content, str):
current_turn_start = i
break
truncated_count = 0
for i in range(current_turn_start):
msg = self.messages[i]
if msg.get("role") != "user":
continue
content = msg.get("content", [])
if not isinstance(content, list):
continue
for block in content:
if not isinstance(block, dict) or block.get("type") != "tool_result":
continue
result_str = block.get("content", "")
if isinstance(result_str, str) and len(result_str) > MAX_HISTORY_RESULT_CHARS:
original_len = len(result_str)
block["content"] = result_str[:MAX_HISTORY_RESULT_CHARS] + \
f"\n\n[Historical output truncated: {original_len} -> {MAX_HISTORY_RESULT_CHARS} chars]"
truncated_count += 1
if truncated_count > 0:
logger.info(f"📎 Truncated {truncated_count} historical tool result(s) to {MAX_HISTORY_RESULT_CHARS} chars")
def _aggressive_trim_for_overflow(self) -> bool:
"""
Aggressively trim context when a real overflow error is returned by the API.
This method goes beyond normal _trim_messages by:
1. Truncating all tool results (including current turn) to a small limit
2. Keeping only the last 5 complete conversation turns
3. Truncating overly long user messages
Returns:
True if messages were trimmed (worth retrying), False if nothing left to trim
"""
if not self.messages:
return False
original_count = len(self.messages)
# Step 1: Aggressively truncate ALL tool results to 5K chars
AGGRESSIVE_LIMIT = 10000
truncated = 0
for msg in self.messages:
content = msg.get("content", [])
if not isinstance(content, list):
continue
for block in content:
if not isinstance(block, dict):
continue
# Truncate tool_result blocks
if block.get("type") == "tool_result":
result_str = block.get("content", "")
if isinstance(result_str, str) and len(result_str) > AGGRESSIVE_LIMIT:
block["content"] = (
result_str[:AGGRESSIVE_LIMIT]
+ f"\n\n[Truncated for context recovery: "
f"{len(result_str)} -> {AGGRESSIVE_LIMIT} chars]"
)
truncated += 1
# Truncate tool_use input blocks (e.g. large write content)
if block.get("type") == "tool_use" and isinstance(block.get("input"), dict):
input_str = json.dumps(block["input"], ensure_ascii=False)
if len(input_str) > AGGRESSIVE_LIMIT:
# Keep only a summary of the input
for key, val in block["input"].items():
if isinstance(val, str) and len(val) > 1000:
block["input"][key] = (
val[:1000]
+ f"... [truncated {len(val)} chars]"
)
truncated += 1
# Step 2: Truncate overly long user text messages (e.g. pasted content)
USER_MSG_LIMIT = 10000
for msg in self.messages:
if msg.get("role") != "user":
continue
content = msg.get("content", [])
if isinstance(content, list):
for block in content:
if isinstance(block, dict) and block.get("type") == "text":
text = block.get("text", "")
if len(text) > USER_MSG_LIMIT:
block["text"] = (
text[:USER_MSG_LIMIT]
+ f"\n\n[Message truncated for context recovery: "
f"{len(text)} -> {USER_MSG_LIMIT} chars]"
)
truncated += 1
elif isinstance(content, str) and len(content) > USER_MSG_LIMIT:
msg["content"] = (
content[:USER_MSG_LIMIT]
+ f"\n\n[Message truncated for context recovery: "
f"{len(content)} -> {USER_MSG_LIMIT} chars]"
)
truncated += 1
# Step 3: Keep only the last 5 complete turns
turns = self._identify_complete_turns()
if len(turns) > 5:
kept_turns = turns[-5:]
new_messages = []
for turn in kept_turns:
new_messages.extend(turn["messages"])
removed = len(turns) - 5
self.messages[:] = new_messages
logger.info(
f"🔧 Aggressive trim: removed {removed} old turns, "
f"truncated {truncated} large blocks, "
f"{original_count} -> {len(self.messages)} messages"
)
return True
if truncated > 0:
logger.info(
f"🔧 Aggressive trim: truncated {truncated} large blocks "
f"(no turns removed, only {len(turns)} turn(s) left)"
)
return True
# Nothing left to trim
logger.warning("🔧 Aggressive trim: nothing to trim, will clear history")
return False
def _trim_messages(self):
"""
智能清理消息历史,保持对话完整性
使用完整轮次作为清理单位,确保:
1. 不会在对话中间截断
2. 工具调用链tool_use + tool_result保持完整
@@ -938,6 +1143,9 @@ class AgentStreamExecutor:
if not self.messages or not self.agent:
return
# Step 0: Truncate large tool results in historical turns (30K -> 10K)
self._truncate_historical_tool_results()
# Step 1: 识别完整轮次
turns = self._identify_complete_turns()

View File

@@ -23,18 +23,15 @@ def format_skills_for_prompt(skills: List[Skill]) -> str:
return ""
lines = [
"\n\nThe following skills provide specialized instructions for specific tasks.",
"Use the read tool to load a skill's file when the task matches its description.",
"",
"<available_skills>",
]
for skill in visible_skills:
lines.append(" <skill>")
lines.append(f" <name>{_escape_xml(skill.name)}</name>")
lines.append(f" <description>{_escape_xml(skill.description)}</description>")
lines.append(f" <location>{_escape_xml(skill.file_path)}</location>")
lines.append(f" <base_dir>{_escape_xml(skill.base_dir)}</base_dir>")
lines.append(" </skill>")
lines.append("</available_skills>")

View File

@@ -188,16 +188,14 @@ class SkillLoader:
import json
config_path = os.path.join(skill_dir, "config.json")
template_path = os.path.join(skill_dir, "config.json.template")
# Try to load config.json or fallback to template
config_file = config_path if os.path.exists(config_path) else template_path
if not os.path.exists(config_file):
return default_description
# Without config.json, skip this skill entirely (return empty to trigger exclusion)
if not os.path.exists(config_path):
logger.debug(f"[SkillLoader] linkai-agent skipped: no config.json found")
return ""
try:
with open(config_file, 'r', encoding='utf-8') as f:
with open(config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
apps = config.get("apps", [])

View File

@@ -45,16 +45,25 @@ def _import_optional_tools():
)
except Exception as e:
logger.error(f"[Tools] Scheduler tool failed to load: {e}")
# WebSearch Tool (conditionally loaded based on API key availability at init time)
try:
from agent.tools.web_search.web_search import WebSearch
tools['WebSearch'] = WebSearch
except ImportError as e:
logger.error(f"[Tools] WebSearch not loaded - missing dependency: {e}")
except Exception as e:
logger.error(f"[Tools] WebSearch failed to load: {e}")
return tools
# Load optional tools
_optional_tools = _import_optional_tools()
EnvConfig = _optional_tools.get('EnvConfig')
SchedulerTool = _optional_tools.get('SchedulerTool')
WebSearch = _optional_tools.get('WebSearch')
GoogleSearch = _optional_tools.get('GoogleSearch')
FileSave = _optional_tools.get('FileSave')
FileSave = _optional_tools.get('FileSave')
Terminal = _optional_tools.get('Terminal')
@@ -92,6 +101,7 @@ __all__ = [
'MemoryGetTool',
'EnvConfig',
'SchedulerTool',
'WebSearch',
# Optional tools (may be None if dependencies not available)
# 'BrowserTool'
]

View File

@@ -11,6 +11,7 @@ from typing import Dict, Any
from agent.tools.base_tool import BaseTool, ToolResult
from agent.tools.utils.truncate import truncate_tail, format_size, DEFAULT_MAX_LINES, DEFAULT_MAX_BYTES
from common.log import logger
from common.utils import expand_path
class Bash(BaseTool):
@@ -19,10 +20,11 @@ class Bash(BaseTool):
name: str = "bash"
description: str = f"""Execute a bash command in the current working directory. Returns stdout and stderr. Output is truncated to last {DEFAULT_MAX_LINES} lines or {DEFAULT_MAX_BYTES // 1024}KB (whichever is hit first). If truncated, full output is saved to a temp file.
IMPORTANT SAFETY GUIDELINES:
- You can freely create, modify, and delete files within the current workspace
- For operations outside the workspace or potentially destructive commands (rm -rf, system commands, etc.), always explain what you're about to do and ask for user confirmation first
- When in doubt, describe the command's purpose and ask for permission before executing"""
ENVIRONMENT: All API keys from env_config are auto-injected. Use $VAR_NAME directly.
SAFETY:
- Freely create/modify/delete files within the workspace
- For destructive and out-of-workspace commands, explain and confirm first"""
params: dict = {
"type": "object",
@@ -80,7 +82,7 @@ IMPORTANT SAFETY GUIDELINES:
env = os.environ.copy()
# Load environment variables from ~/.cow/.env if it exists
env_file = os.path.expanduser("~/.cow/.env")
env_file = expand_path("~/.cow/.env")
if os.path.exists(env_file):
try:
from dotenv import dotenv_values
@@ -91,14 +93,12 @@ IMPORTANT SAFETY GUIDELINES:
logger.debug("[Bash] python-dotenv not installed, skipping .env loading")
except Exception as e:
logger.debug(f"[Bash] Failed to load .env: {e}")
# Debug logging
logger.debug(f"[Bash] CWD: {self.cwd}")
logger.debug(f"[Bash] Command: {command[:500]}")
logger.debug(f"[Bash] OPENAI_API_KEY in env: {'OPENAI_API_KEY' in env}")
logger.debug(f"[Bash] SHELL: {env.get('SHELL', 'not set')}")
logger.debug(f"[Bash] Python executable: {sys.executable}")
logger.debug(f"[Bash] Process UID: {os.getuid()}")
# getuid() only exists on Unix-like systems
if hasattr(os, 'getuid'):
logger.debug(f"[Bash] Process UID: {os.getuid()}")
else:
logger.debug(f"[Bash] Process User: {os.environ.get('USERNAME', os.environ.get('USER', 'unknown'))}")
# Execute command with inherited environment variables
result = subprocess.run(

View File

@@ -7,6 +7,7 @@ import os
from typing import Dict, Any
from agent.tools.base_tool import BaseTool, ToolResult
from common.utils import expand_path
from agent.tools.utils.diff import (
strip_bom,
detect_line_ending,
@@ -178,7 +179,7 @@ class Edit(BaseTool):
:return: Absolute path
"""
# Expand ~ to user home directory
path = os.path.expanduser(path)
path = expand_path(path)
if os.path.isabs(path):
return path
return os.path.abspath(os.path.join(self.cwd, path))

View File

@@ -9,6 +9,7 @@ from pathlib import Path
from agent.tools.base_tool import BaseTool, ToolResult
from common.log import logger
from common.utils import expand_path
# API Key 知识库:常见的环境变量及其描述
@@ -66,7 +67,7 @@ class EnvConfig(BaseTool):
def __init__(self, config: dict = None):
self.config = config or {}
# Store env config in ~/.cow directory (outside workspace for security)
self.env_dir = os.path.expanduser("~/.cow")
self.env_dir = expand_path("~/.cow")
self.env_path = os.path.join(self.env_dir, '.env')
self.agent_bridge = self.config.get("agent_bridge") # Reference to AgentBridge for hot reload
# Don't create .env file in __init__ to avoid issues during tool discovery
@@ -201,7 +202,8 @@ class EnvConfig(BaseTool):
"key": key,
"value": self._mask_value(value),
"description": description,
"exists": True
"exists": True,
"note": f"Value is masked for security. In bash, use ${key} directly — it is auto-injected."
})
else:
return ToolResult.success({

View File

@@ -7,6 +7,7 @@ from typing import Dict, Any
from agent.tools.base_tool import BaseTool, ToolResult
from agent.tools.utils.truncate import truncate_head, format_size, DEFAULT_MAX_BYTES
from common.utils import expand_path
DEFAULT_LIMIT = 500
@@ -51,7 +52,7 @@ class Ls(BaseTool):
absolute_path = self._resolve_path(path)
# Security check: Prevent accessing sensitive config directory
env_config_dir = os.path.expanduser("~/.cow")
env_config_dir = expand_path("~/.cow")
if os.path.abspath(absolute_path) == os.path.abspath(env_config_dir):
return ToolResult.fail(
"Error: Access denied. API keys and credentials must be accessed through the env_config tool only."
@@ -133,7 +134,7 @@ class Ls(BaseTool):
def _resolve_path(self, path: str) -> str:
"""Resolve path to absolute path"""
# Expand ~ to user home directory
path = os.path.expanduser(path)
path = expand_path(path)
if os.path.isabs(path):
return path
return os.path.abspath(os.path.join(self.cwd, path))

View File

@@ -77,7 +77,7 @@ class MemoryGetTool(BaseTool):
if not file_path.exists():
return ToolResult.fail(f"Error: File not found: {path}")
content = file_path.read_text()
content = file_path.read_text(encoding='utf-8')
lines = content.split('\n')
# Handle line range

View File

@@ -9,6 +9,7 @@ from pathlib import Path
from agent.tools.base_tool import BaseTool, ToolResult
from agent.tools.utils.truncate import truncate_head, format_size, DEFAULT_MAX_LINES, DEFAULT_MAX_BYTES
from common.utils import expand_path
class Read(BaseTool):
@@ -66,10 +67,12 @@ class Read(BaseTool):
:param args: Contains file path and optional offset/limit parameters
:return: File content or error message
"""
path = args.get("path", "").strip()
# Support 'location' as alias for 'path' (LLM may use it from skill listing)
path = args.get("path", "") or args.get("location", "")
path = path.strip() if isinstance(path, str) else ""
offset = args.get("offset")
limit = args.get("limit")
if not path:
return ToolResult.fail("Error: path parameter is required")
@@ -77,7 +80,7 @@ class Read(BaseTool):
absolute_path = self._resolve_path(path)
# Security check: Prevent reading sensitive config files
env_config_path = os.path.expanduser("~/.cow/.env")
env_config_path = expand_path("~/.cow/.env")
if os.path.abspath(absolute_path) == os.path.abspath(env_config_path):
return ToolResult.fail(
"Error: Access denied. API keys and credentials must be accessed through the env_config tool only."
@@ -129,7 +132,7 @@ class Read(BaseTool):
:return: Absolute path
"""
# Expand ~ to user home directory
path = os.path.expanduser(path)
path = expand_path(path)
if os.path.isabs(path):
return path
return os.path.abspath(os.path.join(self.cwd, path))

View File

@@ -6,6 +6,7 @@ import os
from typing import Optional
from config import conf
from common.log import logger
from common.utils import expand_path
from bridge.context import Context, ContextType
from bridge.reply import Reply, ReplyType
@@ -31,7 +32,7 @@ def init_scheduler(agent_bridge) -> bool:
from agent.tools.scheduler.scheduler_service import SchedulerService
# Get workspace from config
workspace_root = os.path.expanduser(conf().get("agent_workspace", "~/cow"))
workspace_root = expand_path(conf().get("agent_workspace", "~/cow"))
store_path = os.path.join(workspace_root, "scheduler", "tasks.json")
# Create task store
@@ -112,11 +113,15 @@ def _execute_agent_task(task: dict, agent_bridge):
logger.info(f"[Scheduler] Task {task['id']}: Executing agent task '{task_description}'")
# Create a unique session_id for this scheduled task to avoid polluting user's conversation
# Format: scheduler_<receiver>_<task_id> to ensure isolation
scheduler_session_id = f"scheduler_{receiver}_{task['id']}"
# Create context for Agent
context = Context(ContextType.TEXT, task_description)
context["receiver"] = receiver
context["isgroup"] = is_group
context["session_id"] = receiver
context["session_id"] = scheduler_session_id
# Channel-specific setup
if channel_type == "web":
@@ -140,7 +145,8 @@ def _execute_agent_task(task: dict, agent_bridge):
context["is_scheduled_task"] = True
try:
reply = agent_bridge.agent_reply(task_description, context=context, on_event=None, clear_history=True)
# Don't clear history - scheduler tasks use isolated session_id so they won't pollute user conversations
reply = agent_bridge.agent_reply(task_description, context=context, on_event=None, clear_history=False)
if reply and reply.content:
# Send the reply via channel
@@ -378,6 +384,10 @@ def _execute_skill_call(task: dict, agent_bridge):
logger.info(f"[Scheduler] Task {task['id']}: Executing skill '{skill_name}' with params {skill_params}")
# Create a unique session_id for this scheduled task to avoid polluting user's conversation
# Format: scheduler_<receiver>_<task_id> to ensure isolation
scheduler_session_id = f"scheduler_{receiver}_{task['id']}"
# Build a natural language query for the Agent to execute the skill
# Format: "Use skill-name to do something with params"
param_str = ", ".join([f"{k}={v}" for k, v in skill_params.items()])
@@ -389,7 +399,7 @@ def _execute_skill_call(task: dict, agent_bridge):
context = Context(ContextType.TEXT, query)
context["receiver"] = receiver
context["isgroup"] = is_group
context["session_id"] = receiver
context["session_id"] = scheduler_session_id
# Channel-specific setup
if channel_type == "web":
@@ -402,7 +412,8 @@ def _execute_skill_call(task: dict, agent_bridge):
# Use Agent to execute the skill
try:
reply = agent_bridge.agent_reply(query, context=context, on_event=None, clear_history=True)
# Don't clear history - scheduler tasks use isolated session_id so they won't pollute user conversations
reply = agent_bridge.agent_reply(query, context=context, on_event=None, clear_history=False)
if reply and reply.content:
content = reply.content

View File

@@ -20,7 +20,8 @@ class SchedulerTool(BaseTool):
name: str = "scheduler"
description: str = (
"创建、查询和管理定时任务。支持固定消息和AI任务两种类型\n\n"
"创建、查询和管理定时任务(提醒、周期性任务等)\n\n"
"⚠️ 重要:仅当需要「定时/提醒/每天/每周/X分钟后/X点」等延迟或周期执行时才使用此工具。"
"使用方法:\n"
"- 创建action='create', name='任务名', message/ai_task='内容', schedule_type='once/interval/cron', schedule_value='...'\n"
"- 查询action='list' / action='get', task_id='任务ID'\n"
@@ -53,7 +54,7 @@ class SchedulerTool(BaseTool):
},
"ai_task": {
"type": "string",
"description": "AI任务描述 (与message二选一)'搜索今日新闻''查询天气'"
"description": "AI任务描述 (与message二选一)用于定时让AI执行的任务"
},
"schedule_type": {
"type": "string",

View File

@@ -8,6 +8,7 @@ import threading
from datetime import datetime
from typing import Dict, List, Optional
from pathlib import Path
from common.utils import expand_path
class TaskStore:
@@ -24,7 +25,7 @@ class TaskStore:
"""
if store_path is None:
# Default to ~/cow/scheduler/tasks.json
home = os.path.expanduser("~")
home = expand_path("~")
store_path = os.path.join(home, "cow", "scheduler", "tasks.json")
self.store_path = store_path

View File

@@ -7,6 +7,7 @@ from typing import Dict, Any
from pathlib import Path
from agent.tools.base_tool import BaseTool, ToolResult
from common.utils import expand_path
class Send(BaseTool):
@@ -102,7 +103,7 @@ class Send(BaseTool):
def _resolve_path(self, path: str) -> str:
"""Resolve path to absolute path"""
path = os.path.expanduser(path)
path = expand_path(path)
if os.path.isabs(path):
return path
return os.path.abspath(os.path.join(self.cwd, path))

View File

@@ -0,0 +1,3 @@
from agent.tools.web_search.web_search import WebSearch
__all__ = ["WebSearch"]

View File

@@ -0,0 +1,322 @@
"""
Web Search tool - Search the web using Bocha or LinkAI search API.
Supports two backends with unified response format:
1. Bocha Search (primary, requires BOCHA_API_KEY)
2. LinkAI Search (fallback, requires LINKAI_API_KEY)
"""
import os
import json
from typing import Dict, Any, Optional
import requests
from agent.tools.base_tool import BaseTool, ToolResult
from common.log import logger
# Default timeout for API requests (seconds)
DEFAULT_TIMEOUT = 30
class WebSearch(BaseTool):
"""Tool for searching the web using Bocha or LinkAI search API"""
name: str = "web_search"
description: str = (
"Search the web for current information, news, research topics, or any real-time data. "
"Returns web page titles, URLs, snippets, and optional summaries. "
"Use this when the user asks about recent events, needs fact-checking, or wants up-to-date information."
)
params: dict = {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query string"
},
"count": {
"type": "integer",
"description": "Number of results to return (1-50, default: 10)"
},
"freshness": {
"type": "string",
"description": (
"Time range filter. Options: "
"'noLimit' (default), 'oneDay', 'oneWeek', 'oneMonth', 'oneYear', "
"or date range like '2025-01-01..2025-02-01'"
)
},
"summary": {
"type": "boolean",
"description": "Whether to include text summary for each result (default: false)"
}
},
"required": ["query"]
}
def __init__(self, config: dict = None):
self.config = config or {}
self._backend = None # Will be resolved on first execute
@staticmethod
def is_available() -> bool:
"""Check if web search is available (at least one API key is configured)"""
return bool(os.environ.get("BOCHA_API_KEY") or os.environ.get("LINKAI_API_KEY"))
def _resolve_backend(self) -> Optional[str]:
"""
Determine which search backend to use.
Priority: Bocha > LinkAI
:return: 'bocha', 'linkai', or None
"""
if os.environ.get("BOCHA_API_KEY"):
return "bocha"
if os.environ.get("LINKAI_API_KEY"):
return "linkai"
return None
def execute(self, args: Dict[str, Any]) -> ToolResult:
"""
Execute web search
:param args: Search parameters (query, count, freshness, summary)
:return: Search results
"""
query = args.get("query", "").strip()
if not query:
return ToolResult.fail("Error: 'query' parameter is required")
count = args.get("count", 10)
freshness = args.get("freshness", "noLimit")
summary = args.get("summary", False)
# Validate count
if not isinstance(count, int) or count < 1 or count > 50:
count = 10
# Resolve backend
backend = self._resolve_backend()
if not backend:
return ToolResult.fail(
"Error: No search API key configured. "
"Please set BOCHA_API_KEY or LINKAI_API_KEY using env_config tool.\n"
" - Bocha Search: https://open.bocha.cn\n"
" - LinkAI Search: https://link-ai.tech"
)
try:
if backend == "bocha":
return self._search_bocha(query, count, freshness, summary)
else:
return self._search_linkai(query, count, freshness)
except requests.Timeout:
return ToolResult.fail(f"Error: Search request timed out after {DEFAULT_TIMEOUT}s")
except requests.ConnectionError:
return ToolResult.fail("Error: Failed to connect to search API")
except Exception as e:
logger.error(f"[WebSearch] Unexpected error: {e}", exc_info=True)
return ToolResult.fail(f"Error: Search failed - {str(e)}")
def _search_bocha(self, query: str, count: int, freshness: str, summary: bool) -> ToolResult:
"""
Search using Bocha API
:param query: Search query
:param count: Number of results
:param freshness: Time range filter
:param summary: Whether to include summary
:return: Formatted search results
"""
api_key = os.environ.get("BOCHA_API_KEY", "")
url = "https://api.bocha.cn/v1/web-search"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
"Accept": "application/json"
}
payload = {
"query": query,
"count": count,
"freshness": freshness,
"summary": summary
}
logger.debug(f"[WebSearch] Bocha search: query='{query}', count={count}")
response = requests.post(url, headers=headers, json=payload, timeout=DEFAULT_TIMEOUT)
if response.status_code == 401:
return ToolResult.fail("Error: Invalid BOCHA_API_KEY. Please check your API key.")
if response.status_code == 403:
return ToolResult.fail("Error: Bocha API - insufficient balance. Please top up at https://open.bocha.cn")
if response.status_code == 429:
return ToolResult.fail("Error: Bocha API rate limit reached. Please try again later.")
if response.status_code != 200:
return ToolResult.fail(f"Error: Bocha API returned HTTP {response.status_code}")
data = response.json()
# Check API-level error code
api_code = data.get("code")
if api_code is not None and api_code != 200:
msg = data.get("msg") or "Unknown error"
return ToolResult.fail(f"Error: Bocha API error (code={api_code}): {msg}")
# Extract and format results
return self._format_bocha_results(data, query)
def _format_bocha_results(self, data: dict, query: str) -> ToolResult:
"""
Format Bocha API response into unified result structure
:param data: Raw API response
:param query: Original query
:return: Formatted ToolResult
"""
search_data = data.get("data", {})
web_pages = search_data.get("webPages", {})
pages = web_pages.get("value", [])
if not pages:
return ToolResult.success({
"query": query,
"backend": "bocha",
"total": 0,
"results": [],
"message": "No results found"
})
results = []
for page in pages:
result = {
"title": page.get("name", ""),
"url": page.get("url", ""),
"snippet": page.get("snippet", ""),
"siteName": page.get("siteName", ""),
"datePublished": page.get("datePublished") or page.get("dateLastCrawled", ""),
}
# Include summary only if present
if page.get("summary"):
result["summary"] = page["summary"]
results.append(result)
total = web_pages.get("totalEstimatedMatches", len(results))
return ToolResult.success({
"query": query,
"backend": "bocha",
"total": total,
"count": len(results),
"results": results
})
def _search_linkai(self, query: str, count: int, freshness: str) -> ToolResult:
"""
Search using LinkAI plugin API
:param query: Search query
:param count: Number of results
:param freshness: Time range filter
:return: Formatted search results
"""
api_key = os.environ.get("LINKAI_API_KEY", "")
url = "https://api.link-ai.tech/v1/plugin/execute"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"
}
payload = {
"code": "web-search",
"args": {
"query": query,
"count": count,
"freshness": freshness
}
}
logger.debug(f"[WebSearch] LinkAI search: query='{query}', count={count}")
response = requests.post(url, headers=headers, json=payload, timeout=DEFAULT_TIMEOUT)
if response.status_code == 401:
return ToolResult.fail("Error: Invalid LINKAI_API_KEY. Please check your API key.")
if response.status_code != 200:
return ToolResult.fail(f"Error: LinkAI API returned HTTP {response.status_code}")
data = response.json()
if not data.get("success"):
msg = data.get("message") or "Unknown error"
return ToolResult.fail(f"Error: LinkAI search failed: {msg}")
return self._format_linkai_results(data, query)
def _format_linkai_results(self, data: dict, query: str) -> ToolResult:
"""
Format LinkAI API response into unified result structure.
LinkAI returns the search data in data.data field, which follows
the same Bing-compatible format as Bocha.
:param data: Raw API response
:param query: Original query
:return: Formatted ToolResult
"""
raw_data = data.get("data", "")
# LinkAI may return data as a JSON string
if isinstance(raw_data, str):
try:
raw_data = json.loads(raw_data)
except (json.JSONDecodeError, TypeError):
# If data is plain text, return it as a single result
return ToolResult.success({
"query": query,
"backend": "linkai",
"total": 1,
"count": 1,
"results": [{"content": raw_data}]
})
# If the response follows Bing-compatible structure
if isinstance(raw_data, dict):
web_pages = raw_data.get("webPages", {})
pages = web_pages.get("value", [])
if pages:
results = []
for page in pages:
result = {
"title": page.get("name", ""),
"url": page.get("url", ""),
"snippet": page.get("snippet", ""),
"siteName": page.get("siteName", ""),
"datePublished": page.get("datePublished") or page.get("dateLastCrawled", ""),
}
if page.get("summary"):
result["summary"] = page["summary"]
results.append(result)
total = web_pages.get("totalEstimatedMatches", len(results))
return ToolResult.success({
"query": query,
"backend": "linkai",
"total": total,
"count": len(results),
"results": results
})
# Fallback: return raw data
return ToolResult.success({
"query": query,
"backend": "linkai",
"total": 1,
"count": 1,
"results": [{"content": str(raw_data)}]
})

View File

@@ -8,6 +8,7 @@ from typing import Dict, Any
from pathlib import Path
from agent.tools.base_tool import BaseTool, ToolResult
from common.utils import expand_path
class Write(BaseTool):
@@ -90,7 +91,7 @@ class Write(BaseTool):
:return: Absolute path
"""
# Expand ~ to user home directory
path = os.path.expanduser(path)
path = expand_path(path)
if os.path.isabs(path):
return path
return os.path.abspath(os.path.join(self.cwd, path))

View File

@@ -13,6 +13,7 @@ from bridge.context import Context
from bridge.reply import Reply, ReplyType
from common import const
from common.log import logger
from common.utils import expand_path
from models.openai_compatible_bot import OpenAICompatibleBot
@@ -233,7 +234,8 @@ class AgentBridge:
enable_skills=kwargs.get("enable_skills", True), # Enable skills by default
memory_manager=kwargs.get("memory_manager"), # Pass memory manager
max_context_tokens=kwargs.get("max_context_tokens"),
context_reserve_tokens=kwargs.get("context_reserve_tokens")
context_reserve_tokens=kwargs.get("context_reserve_tokens"),
runtime_info=kwargs.get("runtime_info") # Pass runtime_info for dynamic time updates
)
# Log skill loading details
@@ -420,7 +422,7 @@ class AgentBridge:
}
# Use fixed secure location for .env file
env_file = os.path.expanduser("~/.cow/.env")
env_file = expand_path("~/.cow/.env")
# Read existing env vars from .env file
existing_env_vars = {}
@@ -492,39 +494,70 @@ class AgentBridge:
def refresh_all_skills(self) -> int:
"""
Refresh skills in all agent instances after environment variable changes.
This allows hot-reload of skills without restarting the agent.
Refresh skills and conditional tools in all agent instances after
environment variable changes. This allows hot-reload without restarting.
Returns:
Number of agent instances refreshed
"""
import os
from dotenv import load_dotenv
from config import conf
# Reload environment variables from .env file
workspace_root = os.path.expanduser(conf().get("agent_workspace", "~/cow"))
workspace_root = expand_path(conf().get("agent_workspace", "~/cow"))
env_file = os.path.join(workspace_root, '.env')
if os.path.exists(env_file):
load_dotenv(env_file, override=True)
logger.info(f"[AgentBridge] Reloaded environment variables from {env_file}")
refreshed_count = 0
# Refresh default agent
if self.default_agent and hasattr(self.default_agent, 'skill_manager'):
self.default_agent.skill_manager.refresh_skills()
refreshed_count += 1
logger.info("[AgentBridge] Refreshed skills in default agent")
# Refresh all session agents
# Collect all agent instances to refresh
agents_to_refresh = []
if self.default_agent:
agents_to_refresh.append(("default", self.default_agent))
for session_id, agent in self.agents.items():
if hasattr(agent, 'skill_manager'):
agents_to_refresh.append((session_id, agent))
for label, agent in agents_to_refresh:
# Refresh skills
if hasattr(agent, 'skill_manager') and agent.skill_manager:
agent.skill_manager.refresh_skills()
refreshed_count += 1
# Refresh conditional tools (e.g. web_search depends on API keys)
self._refresh_conditional_tools(agent)
refreshed_count += 1
if refreshed_count > 0:
logger.info(f"[AgentBridge] Refreshed skills in {refreshed_count} agent instance(s)")
return refreshed_count
logger.info(f"[AgentBridge] Refreshed skills & tools in {refreshed_count} agent instance(s)")
return refreshed_count
@staticmethod
def _refresh_conditional_tools(agent):
"""
Add or remove conditional tools based on current environment variables.
For example, web_search should only be present when BOCHA_API_KEY or
LINKAI_API_KEY is set.
"""
try:
from agent.tools.web_search.web_search import WebSearch
has_tool = any(t.name == "web_search" for t in agent.tools)
available = WebSearch.is_available()
if available and not has_tool:
# API key was added - inject the tool
tool = WebSearch()
tool.model = agent.model
agent.tools.append(tool)
logger.info("[AgentBridge] web_search tool added (API key now available)")
elif not available and has_tool:
# API key was removed - remove the tool
agent.tools = [t for t in agent.tools if t.name != "web_search"]
logger.info("[AgentBridge] web_search tool removed (API key no longer available)")
except Exception as e:
logger.debug(f"[AgentBridge] Failed to refresh conditional tools: {e}")

View File

@@ -11,6 +11,7 @@ from typing import Optional, List
from agent.protocol import Agent
from agent.tools import ToolManager
from common.log import logger
from common.utils import expand_path
class AgentInitializer:
@@ -46,7 +47,7 @@ class AgentInitializer:
from config import conf
# Get workspace from config
workspace_root = os.path.expanduser(conf().get("agent_workspace", "~/cow"))
workspace_root = expand_path(conf().get("agent_workspace", "~/cow"))
# Migrate API keys
self._migrate_config_to_env(workspace_root)
@@ -110,7 +111,8 @@ class AgentInitializer:
workspace_dir=workspace_root,
skill_manager=skill_manager,
enable_skills=True,
max_context_tokens=max_context_tokens
max_context_tokens=max_context_tokens,
runtime_info=runtime_info # Pass runtime_info for dynamic time updates
)
# Attach memory manager
@@ -121,7 +123,7 @@ class AgentInitializer:
def _load_env_file(self):
"""Load environment variables from .env file"""
env_file = os.path.expanduser("~/.cow/.env")
env_file = expand_path("~/.cow/.env")
if os.path.exists(env_file):
try:
from dotenv import load_dotenv
@@ -217,13 +219,20 @@ class AgentInitializer:
for tool_name in tool_manager.tool_classes.keys():
try:
# Skip web_search if no API key is available
if tool_name == "web_search":
from agent.tools.web_search.web_search import WebSearch
if not WebSearch.is_available():
logger.debug("[AgentInitializer] WebSearch skipped - no BOCHA_API_KEY or LINKAI_API_KEY")
continue
# Special handling for EnvConfig tool
if tool_name == "env_config":
from agent.tools import EnvConfig
tool = EnvConfig({"agent_bridge": self.agent_bridge})
else:
tool = tool_manager.create_tool(tool_name)
if tool:
# Apply workspace config to file operation tools
if tool_name in ['read', 'write', 'edit', 'bash', 'grep', 'find', 'ls']:
@@ -289,34 +298,40 @@ class AgentInitializer:
return None
def _get_runtime_info(self, workspace_root: str):
"""Get runtime information"""
"""Get runtime information with dynamic time support"""
from config import conf
now = datetime.datetime.now()
# Get timezone info
try:
offset = -time.timezone if not time.daylight else -time.altzone
hours = offset // 3600
minutes = (offset % 3600) // 60
timezone_name = f"UTC{hours:+03d}:{minutes:02d}" if minutes else f"UTC{hours:+03d}"
except Exception:
timezone_name = "UTC"
# Chinese weekday mapping
weekday_map = {
'Monday': '星期一', 'Tuesday': '星期二', 'Wednesday': '星期三',
'Thursday': '星期四', 'Friday': '星期五', 'Saturday': '星期六', 'Sunday': '星期日'
}
weekday_zh = weekday_map.get(now.strftime("%A"), now.strftime("%A"))
def get_current_time():
"""Get current time dynamically - called each time system prompt is accessed"""
now = datetime.datetime.now()
# Get timezone info
try:
offset = -time.timezone if not time.daylight else -time.altzone
hours = offset // 3600
minutes = (offset % 3600) // 60
timezone_name = f"UTC{hours:+03d}:{minutes:02d}" if minutes else f"UTC{hours:+03d}"
except Exception:
timezone_name = "UTC"
# Chinese weekday mapping
weekday_map = {
'Monday': '星期一', 'Tuesday': '星期二', 'Wednesday': '星期三',
'Thursday': '星期四', 'Friday': '星期五', 'Saturday': '星期六', 'Sunday': '星期日'
}
weekday_zh = weekday_map.get(now.strftime("%A"), now.strftime("%A"))
return {
'time': now.strftime("%Y-%m-%d %H:%M:%S"),
'weekday': weekday_zh,
'timezone': timezone_name
}
return {
"model": conf().get("model", "unknown"),
"workspace": workspace_root,
"channel": conf().get("channel_type", "unknown"),
"current_time": now.strftime("%Y-%m-%d %H:%M:%S"),
"weekday": weekday_zh,
"timezone": timezone_name
"_get_current_time": get_current_time # Dynamic time function
}
def _migrate_config_to_env(self, workspace_root: str):
@@ -331,7 +346,7 @@ class AgentInitializer:
"linkai_api_key": "LINKAI_API_KEY",
}
env_file = os.path.expanduser("~/.cow/.env")
env_file = expand_path("~/.cow/.env")
# Read existing env vars
existing_env_vars = {}

View File

@@ -24,6 +24,13 @@ class Bridge(object):
self.btype["chat"] = bot_type
else:
model_type = conf().get("model") or const.GPT_41_MINI
# Ensure model_type is string to prevent AttributeError when using startswith()
# This handles cases where numeric model names (e.g., "1") are parsed as integers from YAML
if not isinstance(model_type, str):
logger.warning(f"[Bridge] model_type is not a string: {model_type} (type: {type(model_type).__name__}), converting to string")
model_type = str(model_type)
if model_type in ["text-davinci-003"]:
self.btype["chat"] = const.OPEN_AI
if conf().get("use_azure_chatgpt", False):

View File

@@ -21,6 +21,7 @@ from dingtalk_stream.card_replier import CardReplier
from bridge.context import Context, ContextType
from bridge.reply import Reply, ReplyType
from channel.chat_channel import ChatChannel
from common.utils import expand_path
from channel.dingtalk.dingtalk_message import DingTalkMessage
from common.expired_dict import ExpiredDict
from common.log import logger
@@ -276,7 +277,7 @@ class DingTalkChanel(ChatChannel, dingtalk_stream.ChatbotHandler):
# 保存到临时文件
file_name = os.path.basename(file_path) or f"media_{uuid.uuid4()}"
workspace_root = os.path.expanduser(conf().get("agent_workspace", "~/cow"))
workspace_root = expand_path(conf().get("agent_workspace", "~/cow"))
tmp_dir = os.path.join(workspace_root, "tmp")
os.makedirs(tmp_dir, exist_ok=True)
temp_file = os.path.join(tmp_dir, file_name)
@@ -607,7 +608,7 @@ class DingTalkChanel(ChatChannel, dingtalk_stream.ChatbotHandler):
def send(self, reply: Reply, context: Context):
logger.info(f"[DingTalk] send() called with reply.type={reply.type}, content_length={len(str(reply.content))}")
logger.debug(f"[DingTalk] send() called with reply.type={reply.type}, content_length={len(str(reply.content))}")
receiver = context["receiver"]
# Check if msg exists (for scheduled tasks, msg might be None)
@@ -647,7 +648,7 @@ class DingTalkChanel(ChatChannel, dingtalk_stream.ChatbotHandler):
robot_code = msg.robot_code
if robot_code and robot_code != self._robot_code:
self._robot_code = robot_code
logger.info(f"[DingTalk] Cached robot_code: {robot_code}")
logger.debug(f"[DingTalk] Cached robot_code: {robot_code}")
isgroup = msg.is_group
incoming_message = msg.incoming_message

View File

@@ -9,6 +9,7 @@ from channel.chat_message import ChatMessage
# -*- coding=utf-8 -*-
from common.log import logger
from common.tmp_dir import TmpDir
from common.utils import expand_path
from config import conf
@@ -49,7 +50,7 @@ class DingTalkMessage(ChatMessage):
download_url = image_download_handler.get_image_download_url(download_code)
# 下载到工作空间 tmp 目录
workspace_root = os.path.expanduser(conf().get("agent_workspace", "~/cow"))
workspace_root = expand_path(conf().get("agent_workspace", "~/cow"))
tmp_dir = os.path.join(workspace_root, "tmp")
os.makedirs(tmp_dir, exist_ok=True)
@@ -67,7 +68,7 @@ class DingTalkMessage(ChatMessage):
self.ctype = ContextType.TEXT
# 下载到工作空间 tmp 目录
workspace_root = os.path.expanduser(conf().get("agent_workspace", "~/cow"))
workspace_root = expand_path(conf().get("agent_workspace", "~/cow"))
tmp_dir = os.path.join(workspace_root, "tmp")
os.makedirs(tmp_dir, exist_ok=True)

View File

@@ -140,6 +140,23 @@ python3 app.py
**解决**: 安装依赖 `pip install lark-oapi`
### SSL证书验证失败
```
[Lark][ERROR] connect failed, err:[SSL:CERTIFICATE_VERIFY_FAILED] certificate verify failed: self signed certificate in certificate chain
```
**原因**: 网络环境中存在自签名证书或SSL中间人代理(如企业代理、VPN等)
**解决**: 程序会自动检测SSL证书验证失败并自动重试禁用证书验证的连接。无需手动配置。
当遇到证书错误时,日志会显示:
```
[FeiShu] SSL certificate verification disabled due to certificate error. This may happen when using corporate proxy or self-signed certificates.
```
这是正常现象,程序会自动处理并继续运行。
### Webhook模式端口被占用
```

View File

@@ -13,6 +13,7 @@
import json
import os
import ssl
import threading
# -*- coding=utf-8 -*-
import uuid
@@ -107,23 +108,65 @@ class FeiShuChanel(ChatChannel):
.register_p2_im_message_receive_v1(handle_message_event) \
.build()
# 创建长连接客户端
ws_client = lark.ws.Client(
self.feishu_app_id,
self.feishu_app_secret,
event_handler=event_handler,
log_level=lark.LogLevel.DEBUG if conf().get("debug") else lark.LogLevel.INFO
)
# 尝试连接如果遇到SSL错误则自动禁用证书验证
def start_client_with_retry():
"""启动websocket客户端自动处理SSL证书错误"""
# 全局禁用SSL证书验证在导入lark_oapi之前设置
import ssl as ssl_module
# 保存原始的SSL上下文创建方法
original_create_default_context = ssl_module.create_default_context
def create_unverified_context(*args, **kwargs):
"""创建一个不验证证书的SSL上下文"""
context = original_create_default_context(*args, **kwargs)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
return context
# 尝试正常连接如果失败则禁用SSL验证
for attempt in range(2):
try:
if attempt == 1:
# 第二次尝试禁用SSL验证
logger.warning("[FeiShu] SSL certificate verification disabled due to certificate error. "
"This may happen when using corporate proxy or self-signed certificates.")
ssl_module.create_default_context = create_unverified_context
ssl_module._create_unverified_context = create_unverified_context
ws_client = lark.ws.Client(
self.feishu_app_id,
self.feishu_app_secret,
event_handler=event_handler,
log_level=lark.LogLevel.DEBUG if conf().get("debug") else lark.LogLevel.INFO
)
logger.debug("[FeiShu] Websocket client starting...")
ws_client.start()
# 如果成功启动,跳出循环
break
except Exception as e:
error_msg = str(e)
# 检查是否是SSL证书验证错误
is_ssl_error = "CERTIFICATE_VERIFY_FAILED" in error_msg or "certificate verify failed" in error_msg.lower()
if is_ssl_error and attempt == 0:
# 第一次遇到SSL错误记录日志并继续循环下次会禁用验证
logger.warning(f"[FeiShu] SSL certificate verification failed: {error_msg}")
logger.info("[FeiShu] Retrying connection with SSL verification disabled...")
continue
else:
# 其他错误或禁用验证后仍失败,抛出异常
logger.error(f"[FeiShu] Websocket client error: {e}", exc_info=True)
# 恢复原始方法
ssl_module.create_default_context = original_create_default_context
raise
# 注意不恢复原始方法因为ws_client.start()会持续运行
# 在新线程中启动客户端,避免阻塞主线程
def start_client():
try:
logger.debug("[FeiShu] Websocket client starting...")
ws_client.start()
except Exception as e:
logger.error(f"[FeiShu] Websocket client error: {e}", exc_info=True)
ws_thread = threading.Thread(target=start_client, daemon=True)
ws_thread = threading.Thread(target=start_client_with_retry, daemon=True)
ws_thread.start()
# 保持主线程运行
@@ -176,7 +219,7 @@ class FeiShuChanel(ChatChannel):
# 处理文件缓存逻辑
from channel.file_cache import get_file_cache
file_cache = get_file_cache()
# 获取 session_id用于缓存关联
if is_group:
if conf().get("group_shared_session", True):
@@ -185,7 +228,7 @@ class FeiShuChanel(ChatChannel):
session_id = feishu_msg.from_user_id + "_" + msg.get("chat_id")
else:
session_id = feishu_msg.from_user_id
# 如果是单张图片消息,缓存起来
if feishu_msg.ctype == ContextType.IMAGE:
if hasattr(feishu_msg, 'image_path') and feishu_msg.image_path:
@@ -193,7 +236,7 @@ class FeiShuChanel(ChatChannel):
logger.info(f"[FeiShu] Image cached for session {session_id}, waiting for user query...")
# 单张图片不直接处理,等待用户提问
return
# 如果是文本消息,检查是否有缓存的文件
if feishu_msg.ctype == ContextType.TEXT:
cached_files = file_cache.get(session_id)
@@ -209,7 +252,7 @@ class FeiShuChanel(ChatChannel):
file_refs.append(f"[视频: {file_path}]")
else:
file_refs.append(f"[文件: {file_path}]")
feishu_msg.content = feishu_msg.content + "\n" + "\n".join(file_refs)
logger.info(f"[FeiShu] Attached {len(cached_files)} cached file(s) to user query")
# 清除缓存
@@ -258,26 +301,27 @@ class FeiShuChanel(ChatChannel):
self._send(text_reply, context)
import time
time.sleep(0.3) # 短暂延迟,确保文本先到达
# 判断是否为视频文件
file_path = reply.content
if file_path.startswith("file://"):
file_path = file_path[7:]
is_video = file_path.lower().endswith(('.mp4', '.avi', '.mov', '.wmv', '.flv'))
if is_video:
# 视频上传包含duration信息
upload_data = self._upload_video_url(reply.content, access_token)
if not upload_data or not upload_data.get('file_key'):
logger.warning("[FeiShu] upload video failed")
return
# 视频使用 media 类型(根据官方文档)
# 错误码 230055 说明:上传 mp4 时必须使用 msg_type="media"
msg_type = "media"
reply_content = upload_data # 完整的上传响应数据包含file_key和duration
logger.info(f"[FeiShu] Sending video: file_key={upload_data.get('file_key')}, duration={upload_data.get('duration')}ms")
logger.info(
f"[FeiShu] Sending video: file_key={upload_data.get('file_key')}, duration={upload_data.get('duration')}ms")
content_key = None # 直接序列化整个对象
else:
# 其他文件使用 file 类型
@@ -288,14 +332,14 @@ class FeiShuChanel(ChatChannel):
reply_content = file_key
msg_type = "file"
content_key = "file_key"
# Check if we can reply to an existing message (need msg_id)
can_reply = is_group and msg and hasattr(msg, 'msg_id') and msg.msg_id
# Build content JSON
content_json = json.dumps(reply_content) if content_key is None else json.dumps({content_key: reply_content})
logger.debug(f"[FeiShu] Sending message: msg_type={msg_type}, content={content_json[:200]}")
if can_reply:
# 群聊中回复已有消息
url = f"https://open.feishu.cn/open-apis/im/v1/messages/{msg.msg_id}/reply"
@@ -320,7 +364,6 @@ class FeiShuChanel(ChatChannel):
else:
logger.error(f"[FeiShu] send message failed, code={res.get('code')}, msg={res.get('msg')}")
def fetch_access_token(self) -> str:
url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
headers = {
@@ -342,35 +385,34 @@ class FeiShuChanel(ChatChannel):
else:
logger.error(f"[FeiShu] fetch token error, res={response}")
def _upload_image_url(self, img_url, access_token):
logger.debug(f"[FeiShu] start process image, img_url={img_url}")
# Check if it's a local file path (file:// protocol)
if img_url.startswith("file://"):
local_path = img_url[7:] # Remove "file://" prefix
logger.info(f"[FeiShu] uploading local file: {local_path}")
if not os.path.exists(local_path):
logger.error(f"[FeiShu] local file not found: {local_path}")
return None
# Upload directly from local file
upload_url = "https://open.feishu.cn/open-apis/im/v1/images"
data = {'image_type': 'message'}
headers = {'Authorization': f'Bearer {access_token}'}
with open(local_path, "rb") as file:
upload_response = requests.post(upload_url, files={"image": file}, data=data, headers=headers)
logger.info(f"[FeiShu] upload file, res={upload_response.content}")
response_data = upload_response.json()
if response_data.get("code") == 0:
return response_data.get("data").get("image_key")
else:
logger.error(f"[FeiShu] upload failed: {response_data}")
return None
# Original logic for HTTP URLs
response = requests.get(img_url)
suffix = utils.get_path_suffix(img_url)
@@ -406,7 +448,7 @@ class FeiShuChanel(ChatChannel):
"""
try:
import subprocess
# 使用 ffprobe 获取视频时长
cmd = [
'ffprobe',
@@ -415,7 +457,7 @@ class FeiShuChanel(ChatChannel):
'-of', 'default=noprint_wrappers=1:nokey=1',
file_path
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
if result.returncode == 0:
duration_seconds = float(result.stdout.strip())
@@ -444,7 +486,7 @@ class FeiShuChanel(ChatChannel):
"""
local_path = None
temp_file = None
try:
# For file:// URLs (local files), upload directly
if video_url.startswith("file://"):
@@ -459,65 +501,67 @@ class FeiShuChanel(ChatChannel):
if response.status_code != 200:
logger.error(f"[FeiShu] download video failed, status={response.status_code}")
return None
# Save to temp file
import uuid
file_name = os.path.basename(video_url) or "video.mp4"
temp_file = str(uuid.uuid4()) + "_" + file_name
with open(temp_file, "wb") as file:
file.write(response.content)
logger.info(f"[FeiShu] Video downloaded, size={len(response.content)} bytes")
local_path = temp_file
# Get video duration
duration = self._get_video_duration(local_path)
# Upload to Feishu
file_name = os.path.basename(local_path)
file_ext = os.path.splitext(file_name)[1].lower()
file_type_map = {'.mp4': 'mp4'}
file_type = file_type_map.get(file_ext, 'mp4')
upload_url = "https://open.feishu.cn/open-apis/im/v1/files"
data = {
'file_type': file_type,
'file_type': file_type,
'file_name': file_name
}
# Add duration only if available (required for video/audio)
if duration:
data['duration'] = duration # Must be int, not string
headers = {'Authorization': f'Bearer {access_token}'}
logger.info(f"[FeiShu] Uploading video: file_name={file_name}, duration={duration}ms")
with open(local_path, "rb") as file:
upload_response = requests.post(
upload_url,
files={"file": file},
data=data,
headers=headers,
upload_url,
files={"file": file},
data=data,
headers=headers,
timeout=(5, 60)
)
logger.info(f"[FeiShu] upload video response, status={upload_response.status_code}, res={upload_response.content}")
logger.info(
f"[FeiShu] upload video response, status={upload_response.status_code}, res={upload_response.content}")
response_data = upload_response.json()
if response_data.get("code") == 0:
# Add duration to the response data (API doesn't return it)
upload_data = response_data.get("data")
upload_data['duration'] = duration # Add our calculated duration
logger.info(f"[FeiShu] Upload complete: file_key={upload_data.get('file_key')}, duration={duration}ms")
logger.info(
f"[FeiShu] Upload complete: file_key={upload_data.get('file_key')}, duration={duration}ms")
return upload_data
else:
logger.error(f"[FeiShu] upload video failed: {response_data}")
return None
except Exception as e:
logger.error(f"[FeiShu] upload video exception: {e}")
return None
finally:
# Clean up temp file
if temp_file and os.path.exists(temp_file):
@@ -532,20 +576,20 @@ class FeiShuChanel(ChatChannel):
Supports both local files (file://) and HTTP URLs
"""
logger.debug(f"[FeiShu] start process file, file_url={file_url}")
# Check if it's a local file path (file:// protocol)
if file_url.startswith("file://"):
local_path = file_url[7:] # Remove "file://" prefix
logger.info(f"[FeiShu] uploading local file: {local_path}")
if not os.path.exists(local_path):
logger.error(f"[FeiShu] local file not found: {local_path}")
return None
# Get file info
file_name = os.path.basename(local_path)
file_ext = os.path.splitext(file_name)[1].lower()
# Determine file type for Feishu API
# Feishu supports: opus, mp4, pdf, doc, xls, ppt, stream (other types)
file_type_map = {
@@ -557,23 +601,24 @@ class FeiShuChanel(ChatChannel):
'.ppt': 'ppt', '.pptx': 'ppt',
}
file_type = file_type_map.get(file_ext, 'stream') # Default to stream for other types
# Upload file to Feishu
upload_url = "https://open.feishu.cn/open-apis/im/v1/files"
data = {'file_type': file_type, 'file_name': file_name}
headers = {'Authorization': f'Bearer {access_token}'}
try:
with open(local_path, "rb") as file:
upload_response = requests.post(
upload_url,
files={"file": file},
data=data,
upload_url,
files={"file": file},
data=data,
headers=headers,
timeout=(5, 30) # 5s connect, 30s read timeout
)
logger.info(f"[FeiShu] upload file response, status={upload_response.status_code}, res={upload_response.content}")
logger.info(
f"[FeiShu] upload file response, status={upload_response.status_code}, res={upload_response.content}")
response_data = upload_response.json()
if response_data.get("code") == 0:
return response_data.get("data").get("file_key")
@@ -583,22 +628,22 @@ class FeiShuChanel(ChatChannel):
except Exception as e:
logger.error(f"[FeiShu] upload file exception: {e}")
return None
# For HTTP URLs, download first then upload
try:
response = requests.get(file_url, timeout=(5, 30))
if response.status_code != 200:
logger.error(f"[FeiShu] download file failed, status={response.status_code}")
return None
# Save to temp file
import uuid
file_name = os.path.basename(file_url)
temp_name = str(uuid.uuid4()) + "_" + file_name
with open(temp_name, "wb") as file:
file.write(response.content)
# Upload
file_ext = os.path.splitext(file_name)[1].lower()
file_type_map = {
@@ -608,18 +653,18 @@ class FeiShuChanel(ChatChannel):
'.ppt': 'ppt', '.pptx': 'ppt',
}
file_type = file_type_map.get(file_ext, 'stream')
upload_url = "https://open.feishu.cn/open-apis/im/v1/files"
data = {'file_type': file_type, 'file_name': file_name}
headers = {'Authorization': f'Bearer {access_token}'}
with open(temp_name, "rb") as file:
upload_response = requests.post(upload_url, files={"file": file}, data=data, headers=headers)
logger.info(f"[FeiShu] upload file, res={upload_response.content}")
response_data = upload_response.json()
os.remove(temp_name) # Clean up temp file
if response_data.get("code") == 0:
return response_data.get("data").get("file_key")
else:
@@ -636,7 +681,7 @@ class FeiShuChanel(ChatChannel):
context["origin_ctype"] = ctype
cmsg = context["msg"]
# Set session_id based on chat type
if cmsg.is_group:
# Group chat: check if group_shared_session is enabled
@@ -652,7 +697,7 @@ class FeiShuChanel(ChatChannel):
else:
# Private chat: use user_id only
context["session_id"] = cmsg.from_user_id
context["receiver"] = cmsg.other_user_id
if ctype == ContextType.TEXT:

View File

@@ -6,6 +6,7 @@ import requests
from common.log import logger
from common.tmp_dir import TmpDir
from common import utils
from common.utils import expand_path
from config import conf
@@ -31,7 +32,7 @@ class FeishuMessage(ChatMessage):
image_key = content.get("image_key")
# 下载图片到工作空间临时目录
workspace_root = os.path.expanduser(conf().get("agent_workspace", "~/cow"))
workspace_root = expand_path(conf().get("agent_workspace", "~/cow"))
tmp_dir = os.path.join(workspace_root, "tmp")
os.makedirs(tmp_dir, exist_ok=True)
image_path = os.path.join(tmp_dir, f"{image_key}.png")
@@ -97,7 +98,7 @@ class FeishuMessage(ChatMessage):
if image_keys:
# 如果包含图片,下载并在文本中引用本地路径
workspace_root = os.path.expanduser(conf().get("agent_workspace", "~/cow"))
workspace_root = expand_path(conf().get("agent_workspace", "~/cow"))
tmp_dir = os.path.join(workspace_root, "tmp")
os.makedirs(tmp_dir, exist_ok=True)

View File

@@ -25,6 +25,7 @@ CLAUDE_35_SONNET = "claude-3-5-sonnet-latest" # 带 latest 标签的模型名
CLAUDE_35_SONNET_1022 = "claude-3-5-sonnet-20241022" # 带具体日期的模型名称,会固定为该日期发布的模型
CLAUDE_35_SONNET_0620 = "claude-3-5-sonnet-20240620"
CLAUDE_4_OPUS = "claude-opus-4-0"
CLAUDE_4_6_OPUS = "claude-opus-4-6" # Claude Opus 4.6 - Agent推荐模型
CLAUDE_4_SONNET = "claude-sonnet-4-0" # Claude Sonnet 4.0 - Agent推荐模型
CLAUDE_4_5_SONNET = "claude-sonnet-4-5" # Claude Sonnet 4.5 - Agent推荐模型
@@ -120,7 +121,7 @@ MODELSCOPE_MODEL_LIST = ["LLM-Research/c4ai-command-r-plus-08-2024","mistralai/M
MODEL_LIST = [
# Claude
CLAUDE3, CLAUDE_4_OPUS, CLAUDE_4_5_SONNET, CLAUDE_4_SONNET, CLAUDE_3_OPUS, CLAUDE_3_OPUS_0229,
CLAUDE3, CLAUDE_4_6_OPUS, CLAUDE_4_OPUS, CLAUDE_4_5_SONNET, CLAUDE_4_SONNET, CLAUDE_3_OPUS, CLAUDE_3_OPUS_0229,
CLAUDE_35_SONNET, CLAUDE_35_SONNET_1022, CLAUDE_35_SONNET_0620, CLAUDE_3_SONNET, CLAUDE_3_HAIKU,
"claude", "claude-3-haiku", "claude-3-sonnet", "claude-3-opus", "claude-3.5-sonnet",

View File

@@ -76,3 +76,42 @@ def remove_markdown_symbol(text: str):
if not text:
return text
return re.sub(r'\*\*(.*?)\*\*', r'\1', text)
def expand_path(path: str) -> str:
"""
Expand user path with proper Windows support.
On Windows, os.path.expanduser('~') may not work properly in some shells (like PowerShell).
This function provides a more robust path expansion.
Args:
path: Path string that may contain ~
Returns:
Expanded absolute path
"""
if not path:
return path
# Try standard expansion first
expanded = os.path.expanduser(path)
# If expansion didn't work (path still starts with ~), use HOME or USERPROFILE
if expanded.startswith('~'):
import platform
if platform.system() == 'Windows':
# On Windows, try USERPROFILE first, then HOME
home = os.environ.get('USERPROFILE') or os.environ.get('HOME')
else:
# On Unix-like systems, use HOME
home = os.environ.get('HOME')
if home:
# Replace ~ with home directory
if path == '~':
expanded = home
elif path.startswith('~/') or path.startswith('~\\'):
expanded = os.path.join(home, path[2:])
return expanded

View File

@@ -1,6 +1,6 @@
{
"channel_type": "web",
"model": "claude-sonnet-4-5",
"model": "glm-4.7",
"claude_api_key": "",
"claude_api_base": "https://api.anthropic.com/v1",
"open_ai_api_key": "",

View File

@@ -245,9 +245,9 @@ class Config(dict):
self.user_datas = pickle.load(f)
logger.debug("[Config] User datas loaded.")
except FileNotFoundError as e:
logger.info("[Config] User datas file not found, ignore.")
logger.debug("[Config] User datas file not found, ignore.")
except Exception as e:
logger.info("[Config] User datas error: {}".format(e))
logger.warning("[Config] User datas error: {}".format(e))
self.user_datas = {}
def save_user_datas(self):

View File

@@ -70,7 +70,7 @@ Cow项目从简单的聊天机器人全面升级为超级智能助理 **CowAgent
- **内置技能:** 在项目的`skills`目录下包含技能创造器、网络搜索、图像识别openai-image-vision、LinkAI智能体、网页抓取等。内置Skill根据依赖条件 (API Key、系统命令等) 自动判断是否启用。通过技能创造器可以快速创建自定义技能。
- **自定义技能:** 由用户通过对话创建,存放在工作空间中 (`~/cow/skills/`),基于自定义技能可以实现任何
- **自定义技能:** 由用户通过对话创建,存放在工作空间中 (`~/cow/skills/`),基于自定义技能可以实现任何复杂的业务流程和第三方系统对接。
#### 3.1 创建技能
@@ -82,7 +82,7 @@ Cow项目从简单的聊天机器人全面升级为超级智能助理 **CowAgent
#### 3.2 搜索和图像识别
- **搜索技能:** 系统内置实现了 `bocha-search`(博查搜索)的Skill依赖环境变量 `BOCHA_SEARCH_API_KEY`,可在[控制台]()进行创建并发送给Agent完成配置
- **搜索技能:** 系统内置实现了 `bocha-search`(博查搜索)的Skill依赖环境变量 `BOCHA_SEARCH_API_KEY`,可在[控制台](https://open.bochaai.com/)进行创建并发送给Agent完成配置
- **图像识别技能:** 实现了 `openai-image-vision` 插件,可使用 gpt-4.1-mini、gpt-4.1 等图像识别模型。依赖秘钥 `OPENAI_API_KEY`可通过config.json或env_config工具进行维护。
<img width="800" src="https://cdn.link-ai.tech/doc/20260202213219.png">
@@ -137,11 +137,11 @@ bash <(curl -sS https://cdn.link-ai.tech/code/cow/run.sh)
Agent模式推荐使用以下模型可根据效果及成本综合选择
- **MiniMax**: `MiniMax-M2.1`
- **GLM**: `glm-4.7`
- **Qwen**: `qwen3-max`
- **Claude**: `claude-sonnet-4-5``claude-sonnet-4-0`
- **Gemini**: `gemini-3-flash-preview``gemini-3-pro-preview`
- **GLM**: `glm-4.7`
- **MiniMax**: `MiniMax-M2.1`
- **Qwen**: `qwen3-max`
详细模型配置方式参考 [README.md 模型说明](../README.md#模型说明)

View File

@@ -125,10 +125,100 @@ class ChatGPTBot(Bot, OpenAIImage, OpenAICompatibleBot):
else:
reply = Reply(ReplyType.ERROR, retstring)
return reply
elif context.type == ContextType.IMAGE:
logger.info("[CHATGPT] Image message received")
reply = self.reply_image(context)
return reply
else:
reply = Reply(ReplyType.ERROR, "Bot不支持处理{}类型的消息".format(context.type))
return reply
def reply_image(self, context):
"""
Process image message using OpenAI Vision API
"""
import base64
import os
try:
image_path = context.content
logger.info(f"[CHATGPT] Processing image: {image_path}")
# Check if file exists
if not os.path.exists(image_path):
logger.error(f"[CHATGPT] Image file not found: {image_path}")
return Reply(ReplyType.ERROR, "图片文件不存在")
# Read and encode image
with open(image_path, "rb") as f:
image_data = f.read()
image_base64 = base64.b64encode(image_data).decode("utf-8")
# Detect image format
extension = os.path.splitext(image_path)[1].lower()
mime_type_map = {
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".png": "image/png",
".gif": "image/gif",
".webp": "image/webp"
}
mime_type = mime_type_map.get(extension, "image/jpeg")
# Get model and API config
model = context.get("gpt_model") or conf().get("model", "gpt-4o")
api_key = context.get("openai_api_key") or conf().get("open_ai_api_key")
api_base = conf().get("open_ai_api_base")
# Build vision request
messages = [
{
"role": "user",
"content": [
{"type": "text", "text": "请描述这张图片的内容"},
{
"type": "image_url",
"image_url": {
"url": f"data:{mime_type};base64,{image_base64}"
}
}
]
}
]
logger.info(f"[CHATGPT] Calling vision API with model: {model}")
# Call OpenAI API
kwargs = {
"model": model,
"messages": messages,
"max_tokens": 1000
}
if api_key:
kwargs["api_key"] = api_key
if api_base:
kwargs["api_base"] = api_base
response = openai.ChatCompletion.create(**kwargs)
content = response.choices[0]["message"]["content"]
logger.info(f"[CHATGPT] Vision API response: {content[:100]}...")
# Clean up temp file
try:
os.remove(image_path)
logger.debug(f"[CHATGPT] Removed temp image file: {image_path}")
except:
pass
return Reply(ReplyType.TEXT, content)
except Exception as e:
logger.error(f"[CHATGPT] Image processing error: {e}")
import traceback
logger.error(traceback.format_exc())
return Reply(ReplyType.ERROR, f"图片识别失败: {str(e)}")
def reply_text(self, session: ChatGPTSession, api_key=None, args=None, retry_count=0) -> dict:
"""
call openai's ChatCompletion to get the answer

View File

@@ -628,9 +628,28 @@ def _handle_linkai_stream_response(self, base_url, headers, body):
break
try:
chunk = json.loads(line)
yield chunk
except json.JSONDecodeError:
continue
# Check for error responses within the stream
# Some providers (e.g., MiniMax via LinkAI) return errors as:
# {'type': 'error', 'error': {'type': '...', 'message': '...', 'http_code': '400'}}
if chunk.get("type") == "error" or (
isinstance(chunk.get("error"), dict) and "message" in chunk.get("error", {})
):
error_data = chunk.get("error", {})
error_msg = error_data.get("message", "Unknown error") if isinstance(error_data, dict) else str(error_data)
http_code = error_data.get("http_code", "") if isinstance(error_data, dict) else ""
status_code = int(http_code) if http_code and str(http_code).isdigit() else 400
logger.error(f"[LinkAI] stream error: {error_msg} (http_code={http_code})")
yield {
"error": True,
"message": error_msg,
"status_code": status_code
}
return
yield chunk
except Exception as e:
logger.error(f"[LinkAI] stream response error: {e}")

View File

@@ -2,6 +2,7 @@
import time
import json
from pydantic.types import T
import requests
from models.bot import Bot
@@ -276,7 +277,7 @@ class MinimaxBot(Bot):
if isinstance(content, list):
# Extract text from content blocks
text_parts = []
tool_result = None
tool_results = []
for block in content:
if isinstance(block, dict):
@@ -284,11 +285,17 @@ class MinimaxBot(Bot):
text_parts.append(block.get("text", ""))
elif block.get("type") == "tool_result":
# Tool result should be a separate message with role="tool"
tool_result = {
tool_call_id = block.get("tool_use_id") or ""
if not tool_call_id:
logger.warning(f"[MINIMAX] tool_result missing tool_use_id")
result_content = block.get("content", "")
if not isinstance(result_content, str):
result_content = json.dumps(result_content, ensure_ascii=False)
tool_results.append({
"role": "tool",
"tool_call_id": block.get("tool_use_id"),
"content": str(block.get("content", ""))
}
"tool_call_id": tool_call_id,
"content": result_content
})
if text_parts:
converted.append({
@@ -296,7 +303,8 @@ class MinimaxBot(Bot):
"content": "\n".join(text_parts)
})
if tool_result:
# Add all tool results (not just the last one)
for tool_result in tool_results:
converted.append(tool_result)
else:
# Simple text content
@@ -546,16 +554,14 @@ class MinimaxBot(Bot):
# Optionally yield thinking as visible content
if show_thinking:
# Format thinking text for display
formatted_thinking = f"💭 {reasoning_text}"
# Yield as OpenAI-format content delta
# Yield thinking text as-is (without emoji decoration)
# The reasoning text will be displayed to users
yield {
"choices": [{
"index": 0,
"delta": {
"role": "assistant",
"content": formatted_thinking
"content": reasoning_text
}
}]
}

View File

@@ -230,43 +230,69 @@ class OpenAICompatibleBot:
if isinstance(content, list):
# Check if this is a tool result message (user role with tool_result blocks)
if role == "user" and any(block.get("type") == "tool_result" for block in content):
# Convert each tool_result block to a separate tool message
# Separate text content and tool_result blocks
text_parts = []
tool_results = []
for block in content:
if block.get("type") == "tool_result":
openai_messages.append({
"role": "tool",
"tool_call_id": block.get("tool_use_id"),
"content": block.get("content", "")
})
if block.get("type") == "text":
text_parts.append(block.get("text", ""))
elif block.get("type") == "tool_result":
tool_results.append(block)
# First, add tool result messages (must come immediately after assistant with tool_calls)
for block in tool_results:
tool_call_id = block.get("tool_use_id") or ""
if not tool_call_id:
logger.warning(f"[OpenAICompatible] tool_result missing tool_use_id, using empty string")
# Ensure content is a string (some providers require string content)
result_content = block.get("content", "")
if not isinstance(result_content, str):
result_content = json.dumps(result_content, ensure_ascii=False)
openai_messages.append({
"role": "tool",
"tool_call_id": tool_call_id,
"content": result_content
})
# Then, add text content as a separate user message if present
if text_parts:
openai_messages.append({
"role": "user",
"content": " ".join(text_parts)
})
# Check if this is an assistant message with tool_use blocks
elif role == "assistant":
# Separate text content and tool_use blocks
text_parts = []
tool_calls = []
for block in content:
if block.get("type") == "text":
text_parts.append(block.get("text", ""))
elif block.get("type") == "tool_use":
tool_id = block.get("id") or ""
if not tool_id:
logger.warning(f"[OpenAICompatible] tool_use missing id for '{block.get('name')}'")
tool_calls.append({
"id": block.get("id"),
"id": tool_id,
"type": "function",
"function": {
"name": block.get("name"),
"arguments": json.dumps(block.get("input", {}))
}
})
# Build OpenAI format assistant message
openai_msg = {
"role": "assistant",
"content": " ".join(text_parts) if text_parts else None
}
if tool_calls:
openai_msg["tool_calls"] = tool_calls
openai_messages.append(openai_msg)
else:
# Other list content, keep as is

View File

@@ -7,7 +7,16 @@ from config import conf
class ZhipuAIImage(object):
def __init__(self):
from zai import ZhipuAiClient
self.client = ZhipuAiClient(api_key=conf().get("zhipu_ai_api_key"))
# 初始化客户端,支持自定义 API base URL例如智谱国际版 z.ai
api_key = conf().get("zhipu_ai_api_key")
api_base = conf().get("zhipu_ai_api_base")
if api_base:
self.client = ZhipuAiClient(api_key=api_key, base_url=api_base)
logger.info(f"[ZHIPU_AI_IMAGE] 使用自定义 API Base URL: {api_base}")
else:
self.client = ZhipuAiClient(api_key=api_key)
logger.info("[ZHIPU_AI_IMAGE] 使用默认 API Base URL")
def create_img(self, query, retry_count=0, api_key=None, api_base=None):
try:

View File

@@ -24,7 +24,16 @@ class ZHIPUAIBot(Bot, ZhipuAIImage):
"temperature": conf().get("temperature", 0.9), # 值在(0,1)之间(智谱AI 的温度不能取 0 或者 1)
"top_p": conf().get("top_p", 0.7), # 值在(0,1)之间(智谱AI 的 top_p 不能取 0 或者 1)
}
self.client = ZhipuAiClient(api_key=conf().get("zhipu_ai_api_key"))
# 初始化客户端,支持自定义 API base URL例如智谱国际版 z.ai
api_key = conf().get("zhipu_ai_api_key")
api_base = conf().get("zhipu_ai_api_base")
if api_base:
self.client = ZhipuAiClient(api_key=api_key, base_url=api_base)
logger.info(f"[ZHIPU_AI] 使用自定义 API Base URL: {api_base}")
else:
self.client = ZhipuAiClient(api_key=api_key)
logger.info("[ZHIPU_AI] 使用默认 API Base URL")
def reply(self, query, context=None):
# acquire reply content
@@ -205,7 +214,7 @@ class ZHIPUAIBot(Bot, ZhipuAIImage):
request_params["thinking"] = thinking
elif "glm-4.7" in request_params["model"]:
# Enable thinking by default for GLM-4.7
request_params["thinking"] = {"type": "enabled"}
request_params["thinking"] = {"type": "disabled"}
# Make API call with ZhipuAI SDK
if stream:

View File

@@ -38,7 +38,7 @@ class AgentPlugin(Plugin):
"""Load configuration from config.yaml file."""
config_path = os.path.join(self.path, "config.yaml")
if not os.path.exists(config_path):
logger.warning(f"Config file not found at {config_path}")
logger.debug(f"Config file not found at {config_path}")
return {}
with open(config_path, 'r', encoding='utf-8') as f:

View File

@@ -51,7 +51,7 @@ class Banwords(Plugin):
self.reply_action = conf.get("reply_action", "ignore")
logger.debug("[Banwords] inited")
except Exception as e:
logger.warn("[Banwords] init failed, ignore or see https://github.com/zhayujie/chatgpt-on-wechat/tree/master/plugins/banwords .")
logger.debug("[Banwords] init failed, ignore or see https://github.com/zhayujie/chatgpt-on-wechat/tree/master/plugins/banwords .")
raise e
def on_handle_context(self, e_context: EventContext):

112
run.sh
View File

@@ -14,6 +14,15 @@ CYAN='\033[0;36m'
BOLD='\033[1m'
NC='\033[0m'
# Emojis
EMOJI_ROCKET="🚀"
EMOJI_COW="🐄"
EMOJI_CHECK="✅"
EMOJI_CROSS="❌"
EMOJI_WARN="⚠️"
EMOJI_STOP="🛑"
EMOJI_WRENCH="🔧"
# Check if using Bash
if [ -z "$BASH_VERSION" ]; then
echo -e "${RED}❌ Please run this script with Bash.${NC}"
@@ -123,7 +132,8 @@ clone_project() {
if [ -d "chatgpt-on-wechat" ]; then
echo -e "${YELLOW}⚠️ Directory 'chatgpt-on-wechat' already exists.${NC}"
read -p "Choose action: overwrite(o), backup(b), or quit(q)? [o/b/q]: " choice
read -p "Choose action: overwrite(o), backup(b), or quit(q)? [press Enter for default: b]: " choice
choice=${choice:-b}
case "$choice" in
o|O)
echo -e "${YELLOW}🗑️ Overwriting 'chatgpt-on-wechat' directory...${NC}"
@@ -260,17 +270,18 @@ select_model() {
echo -e "${CYAN}${BOLD}=========================================${NC}"
echo -e "${CYAN}${BOLD} Select AI Model${NC}"
echo -e "${CYAN}${BOLD}=========================================${NC}"
echo -e "${YELLOW}1) Claude (claude-sonnet-4-5, claude-opus-4-0, etc.)${NC}"
echo -e "${YELLOW}1) MiniMax (MiniMax-M2.1, MiniMax-M2.1-lightning, etc.)${NC}"
echo -e "${YELLOW}2) Zhipu AI (glm-4.7, glm-4.6, etc.)${NC}"
echo -e "${YELLOW}3) Gemini (gemini-3-flash-preview, gemini-2.5-pro, etc.)${NC}"
echo -e "${YELLOW}4) OpenAI GPT (gpt-5.2, gpt-4.1, etc.)${NC}"
echo -e "${YELLOW}5) Qwen (qwen3-max, qwen-plus, qwq-plus, etc.)${NC}"
echo -e "${YELLOW}6) MiniMax (MiniMax-M2.1, MiniMax-M2.1-lightning, etc.)${NC}"
echo -e "${YELLOW}3) Qwen (qwen3-max, qwen-plus, qwq-plus, etc.)${NC}"
echo -e "${YELLOW}4) Claude (claude-sonnet-4-5, claude-opus-4-0, etc.)${NC}"
echo -e "${YELLOW}5) Gemini (gemini-3-flash-preview, gemini-2.5-pro, etc.)${NC}"
echo -e "${YELLOW}6) OpenAI GPT (gpt-5.2, gpt-4.1, etc.)${NC}"
echo -e "${YELLOW}7) LinkAI (access multiple models via one API)${NC}"
echo ""
while true; do
read -p "Enter your choice [1-7]: " model_choice
read -p "Enter your choice [press Enter for default: 1 - MiniMax]: " model_choice
model_choice=${model_choice:-1}
case "$model_choice" in
1|2|3|4|5|6|7)
break
@@ -286,17 +297,14 @@ select_model() {
configure_model() {
case "$model_choice" in
1)
# Claude
echo -e "${GREEN}Configuring Claude...${NC}"
read -p "Enter Claude API Key: " claude_key
read -p "Enter model name [press Enter for default: claude-sonnet-4-5]: " model_name
model_name=${model_name:-claude-sonnet-4-5}
read -p "Enter API Base URL [press Enter for default: https://api.anthropic.com/v1]: " api_base
api_base=${api_base:-https://api.anthropic.com/v1}
# MiniMax
echo -e "${GREEN}Configuring MiniMax...${NC}"
read -p "Enter MiniMax API Key: " minimax_key
read -p "Enter model name [press Enter for default: MiniMax-M2.1]: " model_name
model_name=${model_name:-MiniMax-M2.1}
MODEL_NAME="$model_name"
CLAUDE_KEY="$claude_key"
CLAUDE_BASE="$api_base"
MINIMAX_KEY="$minimax_key"
;;
2)
# Zhipu AI
@@ -309,6 +317,29 @@ configure_model() {
ZHIPU_KEY="$zhipu_key"
;;
3)
# Qwen (DashScope)
echo -e "${GREEN}Configuring Qwen (DashScope)...${NC}"
read -p "Enter DashScope API Key: " dashscope_key
read -p "Enter model name [press Enter for default: qwen3-max]: " model_name
model_name=${model_name:-qwen3-max}
MODEL_NAME="$model_name"
DASHSCOPE_KEY="$dashscope_key"
;;
4)
# Claude
echo -e "${GREEN}Configuring Claude...${NC}"
read -p "Enter Claude API Key: " claude_key
read -p "Enter model name [press Enter for default: claude-sonnet-4-5]: " model_name
model_name=${model_name:-claude-sonnet-4-5}
read -p "Enter API Base URL [press Enter for default: https://api.anthropic.com/v1]: " api_base
api_base=${api_base:-https://api.anthropic.com/v1}
MODEL_NAME="$model_name"
CLAUDE_KEY="$claude_key"
CLAUDE_BASE="$api_base"
;;
5)
# Gemini
echo -e "${GREEN}Configuring Gemini...${NC}"
read -p "Enter Gemini API Key: " gemini_key
@@ -321,7 +352,7 @@ configure_model() {
GEMINI_KEY="$gemini_key"
GEMINI_BASE="$api_base"
;;
4)
6)
# OpenAI
echo -e "${GREEN}Configuring OpenAI GPT...${NC}"
read -p "Enter OpenAI API Key: " openai_key
@@ -334,32 +365,12 @@ configure_model() {
OPENAI_KEY="$openai_key"
OPENAI_BASE="$api_base"
;;
5)
# Qwen (DashScope)
echo -e "${GREEN}Configuring Qwen (DashScope)...${NC}"
read -p "Enter DashScope API Key: " dashscope_key
read -p "Enter model name [press Enter for default: qwen3-max]: " model_name
model_name=${model_name:-qwen3-max}
MODEL_NAME="$model_name"
DASHSCOPE_KEY="$dashscope_key"
;;
6)
# MiniMax
echo -e "${GREEN}Configuring MiniMax...${NC}"
read -p "Enter MiniMax API Key: " minimax_key
read -p "Enter model name [press Enter for default: MiniMax-M2.1]: " model_name
model_name=${model_name:-MiniMax-M2.1}
MODEL_NAME="$model_name"
MINIMAX_KEY="$minimax_key"
;;
7)
# LinkAI
echo -e "${GREEN}Configuring LinkAI...${NC}"
read -p "Enter LinkAI API Key: " linkai_key
read -p "Enter model name [press Enter for default: claude-sonnet-4-5]: " model_name
model_name=${model_name:-claude-sonnet-4-5}
read -p "Enter model name [press Enter for default: MiniMax-M2.1]: " model_name
model_name=${model_name:-MiniMax-M2.1}
MODEL_NAME="$model_name"
USE_LINKAI="true"
@@ -381,7 +392,7 @@ select_channel() {
echo ""
while true; do
read -p "Enter your choice [1-4, default: 1]: " channel_choice
read -p "Enter your choice [press Enter for default: 1 - Feishu]: " channel_choice
channel_choice=${channel_choice:-1}
case "$channel_choice" in
1|2|3|4)
@@ -498,7 +509,7 @@ EOF
cat > config.json <<EOF
{
"channel_type": "web",
"web_port": 9899,
"web_port": ${WEB_PORT},
"model": "${MODEL_NAME}",
"open_ai_api_key": "${OPENAI_KEY:-}",
"open_ai_api_base": "${OPENAI_BASE:-https://api.openai.com/v1}",
@@ -876,13 +887,20 @@ install_mode() {
configure_channel
create_config_file
# Show completion message
echo ""
echo -e "${CYAN}${BOLD}=========================================${NC}"
echo -e "${GREEN}${BOLD}✅ Installation Complete!${NC}"
echo -e "${CYAN}${BOLD}=========================================${NC}"
echo ""
show_usage
read -p "Start CowAgent now? [Y/n]: " start_now
if [[ ! $start_now == [Nn]* ]]; then
start_project
else
echo -e "${GREEN}✅ Installation complete!${NC}"
echo ""
echo -e "${CYAN}${BOLD}To start manually:${NC}"
echo -e "${YELLOW} cd ${BASE_DIR}${NC}"
echo -e "${YELLOW} ./run.sh start${NC}"
echo ""
echo -e "${CYAN}Or use nohup directly:${NC}"
echo -e "${YELLOW} nohup $PYTHON_CMD app.py > nohup.out 2>&1 & tail -f nohup.out${NC}"
fi
}
# Main function

View File

@@ -1,91 +0,0 @@
---
name: bocha-search
description: High-quality web search with AI-optimized results. Use when user needs to search the internet for current information, news, or research topics.
homepage: https://open.bocha.cn/
metadata:
emoji: 🔍
requires:
bins: ["curl"]
env: ["BOCHA_API_KEY"]
primaryEnv: "BOCHA_API_KEY"
---
# Bocha Search
High-quality web search powered by Bocha AI, optimized for AI consumption. Returns web pages, images, and detailed metadata.
## Setup
This skill requires a Bocha API key. If not configured:
1. Visit https://open.bocha.cn to get an API key
2. Set the key using: `env_config(action="set", key="BOCHA_API_KEY", value="your-key")`
3. Or manually add to `~/cow/.env`: `BOCHA_API_KEY=your-key`
## Usage
**Important**: Scripts are located relative to this skill's base directory.
When you see this skill in `<available_skills>`, note the `<base_dir>` path.
```bash
# General pattern:
bash "<base_dir>/scripts/search.sh" "<query>" [count] [freshness] [summary]
# Parameters:
# - query: Search query (required)
# - count: Number of results (1-50, default: 10)
# - freshness: Time range filter (default: noLimit)
# Options: noLimit, oneDay, oneWeek, oneMonth, oneYear, YYYY-MM-DD..YYYY-MM-DD
# - summary: Include text summary (true/false, default: false)
```
## Examples
### Basic search
```bash
bash "<base_dir>/scripts/search.sh" "latest AI news"
```
### Search with more results
```bash
bash "<base_dir>/scripts/search.sh" "Python tutorials" 20
```
### Search recent content with summary
```bash
bash "<base_dir>/scripts/search.sh" "阿里巴巴ESG报告" 10 oneWeek true
```
### Search specific date range
```bash
bash "<base_dir>/scripts/search.sh" "tech news" 15 "2025-01-01..2025-02-01"
```
## Response Format
The API returns structured data compatible with Bing Search API:
**Web Pages** (in `data.webPages.value`):
- `name`: Page title
- `url`: Page URL
- `snippet`: Short description
- `summary`: Full text summary (if requested)
- `siteName`: Website name
- `siteIcon`: Website icon URL
- `datePublished`: Publication date (UTC+8)
- `language`: Page language
**Images** (in `data.images.value`):
- `contentUrl`: Image URL
- `hostPageUrl`: Source page URL
- `width`, `height`: Image dimensions
- `thumbnailUrl`: Thumbnail URL
## Notes
- **Optimized for AI**: Results include summaries and structured metadata
- **Time range**: Use `noLimit` for best results (algorithm auto-optimizes time range)
- **Timeout**: 30 seconds
- **Rate limits**: Check your API plan at https://open.bocha.cn
- **Response format**: Compatible with Bing Search API for easy integration

View File

@@ -1,75 +0,0 @@
#!/usr/bin/env bash
# Bocha Web Search API wrapper
# API Docs: https://open.bocha.cn/
set -euo pipefail
query="${1:-}"
count="${2:-10}"
freshness="${3:-noLimit}"
summary="${4:-false}"
if [ -z "$query" ]; then
echo '{"error": "Query is required", "usage": "bash search.sh <query> [count] [freshness] [summary]"}'
exit 1
fi
if [ -z "${BOCHA_API_KEY:-}" ]; then
echo '{"error": "BOCHA_API_KEY environment variable is not set", "help": "Visit https://open.bocha.cn to get an API key"}'
exit 1
fi
# Validate count (1-50)
if ! [[ "$count" =~ ^[0-9]+$ ]] || [ "$count" -lt 1 ] || [ "$count" -gt 50 ]; then
count=10
fi
# Build JSON request body
request_body=$(cat <<EOF
{
"query": "$query",
"count": $count,
"freshness": "$freshness",
"summary": $summary
}
EOF
)
# Call Bocha API
response=$(curl -sS --max-time 30 \
-X POST \
-H "Authorization: Bearer $BOCHA_API_KEY" \
-H "Content-Type: application/json" \
-H "Accept: application/json" \
-d "$request_body" \
"https://api.bocha.cn/v1/web-search" 2>&1)
curl_exit_code=$?
if [ $curl_exit_code -ne 0 ]; then
echo "{\"error\": \"Failed to call Bocha API\", \"details\": \"$response\"}"
exit 1
fi
# Simple JSON validation - check if response starts with { or [
if [[ ! "$response" =~ ^[[:space:]]*[\{\[] ]]; then
echo "{\"error\": \"Invalid JSON response from API\", \"response\": \"$response\"}"
exit 1
fi
# Extract API code using grep and sed (basic JSON parsing)
api_code=$(echo "$response" | grep -o '"code"[[:space:]]*:[[:space:]]*[0-9]*' | grep -o '[0-9]*' | head -1)
# If code extraction failed or code is not 200, check for error
if [ -n "$api_code" ] && [ "$api_code" != "200" ]; then
# Try to extract error message
api_msg=$(echo "$response" | grep -o '"msg"[[:space:]]*:[[:space:]]*"[^"]*"' | sed 's/"msg"[[:space:]]*:[[:space:]]*"\(.*\)"/\1/' | head -1)
if [ -z "$api_msg" ]; then
api_msg="Unknown error"
fi
echo "{\"error\": \"API returned error\", \"code\": $api_code, \"message\": \"$api_msg\"}"
exit 1
fi
# Return the full response
echo "$response"

View File

@@ -1,6 +1,6 @@
---
name: skill-creator
description: Create or update skills. Use when designing, structuring, or packaging skills with scripts, references, and assets. COW simplified version - skills are used locally in workspace.
description: Create, install, or update skills in the workspace. Use when (1) installing a skill from a URL or remote source, (2) creating a new skill from scratch, (3) updating or restructuring existing skills. Always use this skill for any skill installation or creation task.
license: Complete terms in LICENSE.txt
---
@@ -93,9 +93,16 @@ Do NOT create auxiliary documentation files:
**Critical Rule**: Only create files that the agent will actually execute (scripts) or that are too large for SKILL.md (references). Documentation, examples, and guides ALL belong in SKILL.md.
## Skill Creation Process
## Installing a Skill from URL
**COW Simplified Version** - Skills are used locally, no packaging/sharing needed.
1. Fetch the URL content (curl or web-fetch skill)
2. Extract `name` from YAML frontmatter
3. Create directory `<workspace>/skills/<name>/` and save content as `SKILL.md`
4. Check the saved SKILL.md for an installation/setup section — if it defines additional steps (e.g., downloading scripts, installing dependencies), execute them; otherwise installation is complete
The `<workspace>` is the working directory from the "工作空间" section.
## Skill Creation Process (from scratch)
1. **Understand** - Clarify use cases with concrete examples
2. **Plan** - Identify needed scripts, references, assets
@@ -181,11 +188,13 @@ scripts/init_skill.py <skill-name> --path <output-directory> [--resources script
Examples:
```bash
scripts/init_skill.py my-skill --path ~/cow/skills
scripts/init_skill.py my-skill --path ~/cow/skills --resources scripts,references
scripts/init_skill.py my-skill --path ~/cow/skills --resources scripts --examples
scripts/init_skill.py my-skill --path <workspace>/skills
scripts/init_skill.py my-skill --path <workspace>/skills --resources scripts,references
scripts/init_skill.py my-skill --path <workspace>/skills --resources scripts --examples
```
Where `<workspace>` is your workspace directory shown in the "工作空间" section of the system prompt.
The script:
- Creates the skill directory at the specified path
@@ -195,7 +204,7 @@ The script:
After initialization, customize the SKILL.md and add resources as needed. If you used `--examples`, replace or delete placeholder files.
**Important**: Always create skills in workspace directory (`~/cow/skills`), NOT in project directory.
**Important**: Always create skills in workspace skills directory (`<workspace>/skills`), NOT in project directory. Check the "工作空间" section for the actual workspace path.
### Step 4: Edit the Skill
@@ -335,7 +344,7 @@ scripts/quick_validate.py <path/to/skill-folder>
Example:
```bash
scripts/quick_validate.py ~/cow/skills/weather-api
scripts/quick_validate.py <workspace>/skills/weather-api
```
Validation checks: