Compare commits

...

23 Commits

Author SHA1 Message Date
zhayujie
db16bdf8cb fix(cli): add security hardening for skill install and process management 2026-03-27 17:59:15 +08:00
zhayujie
158510cbbe feat(cli): imporve cow cli and skill hub integration 2026-03-26 16:49:42 +08:00
zhayujie
8fd029a4a1 feat(cli): support cow cli 2026-03-26 10:08:51 +08:00
zhayujie
3eb8348708 fix: docker volume permission issue and clean up unused dependencies 2026-03-25 01:25:34 +08:00
zhayujie
393f0c007c fix: context loss after trim 2026-03-24 20:49:28 +08:00
zhayujie
c062ca8c66 Merge pull request #2720 from 6vision/fix/deepseek-docs
Docs: update
2026-03-24 00:25:17 +08:00
6vision
76dcb25103 docs(deepseek): update model descriptions to V3.2 with thinking/non-thinking mode
Made-with: Cursor
2026-03-24 00:05:39 +08:00
6vision
c5b4f236db docs(deepseek): remove migration notes from zh and en docs
Made-with: Cursor
2026-03-24 00:05:39 +08:00
zhayujie
0974c940a8 Merge pull request #2719 from 6vision/feat/deepseek-bot
feat: add independent DeepSeek bot module with dedicated config
2026-03-23 22:42:58 +08:00
6vision
cffa20d37e docs(deepseek): remove migration notes to reduce user cognitive load
Made-with: Cursor
2026-03-23 22:39:15 +08:00
6vision
ef009edd29 docs(deepseek): update config guides for independent DeepSeek module
Update DeepSeek docs (zh/en/ja) and README to reflect the new dedicated deepseek_api_key / deepseek_api_base config fields, with backward compatibility notes.

Made-with: Cursor
2026-03-23 21:43:51 +08:00
zhayujie
3ca52b118d fix(weixin): qrcode url log 2026-03-23 21:33:53 +08:00
zhayujie
13f5fde4fb fix: rebuild system prompt from scratch on every turn 2026-03-23 21:27:44 +08:00
6vision
f512b55ec2 feat(deepseek): add independent DeepSeek bot module with dedicated config
Separate DeepSeek from ChatGPTBot into its own module (models/deepseek/) with dedicated deepseek_api_key and deepseek_api_base config fields, avoiding config conflicts when switching between providers. Backward compatible with old users who configured DeepSeek via open_ai_api_key/open_ai_api_base through automatic fallback.

Made-with: Cursor
2026-03-23 21:23:35 +08:00
zhayujie
22b8ca0095 feat: optimize vision image compression 2026-03-23 21:18:04 +08:00
zhayujie
baf66a103d fix(weixin): preserve original filename for received files 2026-03-23 01:18:02 +08:00
zhayujie
45faa9c1ff fix(wexin): resolve image/file send and receive failures 2026-03-23 00:13:41 +08:00
zhayujie
304381a88d fix: hide breadcrumb on mobile for better space utilization 2026-03-22 23:36:34 +08:00
zhayujie
fc9f54dbc8 feat(weixin): optimize login qrcode generate 2026-03-22 23:04:50 +08:00
zhayujie
7199dc187f fix: default gemini model 2026-03-22 22:52:37 +08:00
zhayujie
e9ae066d53 Merge pull request #2716 from cowagent/fix-gemini-model-attribute
fix: add missing model property to GoogleGeminiBot
2026-03-22 22:49:00 +08:00
cowagent
d71ae406ff fix: add missing model property to GoogleGeminiBot
api_key and api_base were refactored to @property but model was not
migrated, causing AttributeError: 'GoogleGeminiBot' object has no
attribute 'model' when using any Gemini model.
2026-03-22 22:43:26 +08:00
zhayujie
f3216904b3 feat(weixin): optimize weixin login qrcode 2026-03-22 21:34:47 +08:00
49 changed files with 2258 additions and 348 deletions

8
.gitignore vendored
View File

@@ -37,3 +37,11 @@ client_config.json
ref/
.cursor/
local/
# Build artifacts
dist/
build/
*.egg-info/
# CLI runtime
.cow.pid

View File

@@ -129,6 +129,9 @@ pip3 install -r requirements.txt
```bash
pip3 install -r requirements-optional.txt
```
> 国内网络可使用镜像源加速:`pip3 install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple`
如果某项依赖安装失败可注释掉对应的行后重试。
## 二、配置
@@ -155,6 +158,8 @@ pip3 install -r requirements-optional.txt
"claude_api_base": "https://api.anthropic.com/v1", # Claude API 地址,修改可接入三方代理平台
"gemini_api_key": "", # Gemini API Key
"gemini_api_base": "https://generativelanguage.googleapis.com", # Gemini API地址
"deepseek_api_key": "", # DeepSeek API Key
"deepseek_api_base": "https://api.deepseek.com/v1", # DeepSeek API 地址,可修改为第三方代理
"open_ai_api_key": "", # OpenAI API Key
"open_ai_api_base": "https://api.openai.com/v1", # OpenAI API 地址
"linkai_api_key": "", # LinkAI API Key
@@ -472,21 +477,31 @@ API Key创建在 [控制台](https://aistudio.google.com/app/apikey?hl=zh-cn)
2. 填写配置
方式一:官方接入(推荐):
```json
{
"model": "deepseek-chat",
"open_ai_api_key": "sk-xxxxxxxxxxx",
"open_ai_api_base": "https://api.deepseek.com/v1",
"bot_type": "openai"
"deepseek_api_key": "sk-xxxxxxxxxxx"
}
```
- `bot_type`: OpenAI兼容方式
- `model`: 可填 `deepseek-chat、deepseek-reasoner`,分别对应的是 DeepSeek-V3 和 DeepSeek-R1 模型
- `open_ai_api_key`: DeepSeek平台的 API Key
- `open_ai_api_base`: DeepSeek平台 BASE URL
</details>
- `model`: 可填 `deepseek-chat、deepseek-reasoner`,分别对应的是 DeepSeek-V3.2(非思考模式)和 DeepSeek-R1思考模式
- `deepseek_api_key`: DeepSeek平台的 API Key
- `deepseek_api_base`: 可选,默认为 `https://api.deepseek.com/v1`,可修改为第三方代理地址
方式二OpenAI兼容方式接入
```json
{
"model": "deepseek-chat",
"bot_type": "openai",
"open_ai_api_key": "sk-xxxxxxxxxxx",
"open_ai_api_base": "https://api.deepseek.com/v1"
}
```
</details>
<details>
<summary>Azure</summary>

View File

@@ -166,10 +166,56 @@ class ChatService:
logger.info("[ChatService] Cleared agent message history after executor recovery")
raise
# Append only the NEW messages from this execution (thread-safe)
# Sync executor messages back to agent (thread-safe).
# The executor may have trimmed context, making its list shorter than
# original_length. In that case we must replace entirely — just
# appending would leave stale pre-trim messages in agent.messages
# and cause the same trim to fire on every subsequent request.
with agent.messages_lock:
new_messages = executor.messages[original_length:]
agent.messages.extend(new_messages)
trimmed = len(executor.messages) < original_length
if trimmed:
# Context was trimmed: the executor appended the new user
# query *before* trimming, so the new messages (user +
# assistant + tools) sit at the tail of the trimmed list.
# We cannot simply slice at original_length (it exceeds the
# list length). Instead, count how many messages the
# executor added on top of the post-trim baseline.
#
# Timeline inside executor.run_stream:
# 1. messages had `original_length` items
# 2. append user query → original_length + 1
# 3. _trim_messages() → some smaller number (includes the
# user query because it belongs to the last turn)
# 4. LLM replies / tool calls appended
#
# The user query message is always the first message of the
# last turn (it cannot be trimmed away), so we locate it to
# find where "new" messages begin.
new_start = original_length # fallback
for idx in range(len(executor.messages) - 1, -1, -1):
msg = executor.messages[idx]
if msg.get("role") == "user":
content = msg.get("content", [])
is_user_query = False
if isinstance(content, list):
has_text = any(
isinstance(b, dict) and b.get("type") == "text"
for b in content
)
has_tool_result = any(
isinstance(b, dict) and b.get("type") == "tool_result"
for b in content
)
is_user_query = has_text and not has_tool_result
elif isinstance(content, str):
is_user_query = True
if is_user_query:
new_start = idx
break
new_messages = list(executor.messages[new_start:])
else:
new_messages = list(executor.messages[original_length:])
agent.messages = list(executor.messages)
# Persist new messages to SQLite so they survive restarts and
# can be queried via the HISTORY interface.

View File

@@ -376,9 +376,9 @@ def _build_workspace_section(workspace_dir: str, language: str) -> List[str]:
"",
"以下文件在会话启动时**已经自动加载**到系统提示词的「项目上下文」section 中,你**无需再用 read 工具读取它们**",
"",
"- ✅ `AGENT.md`: 已加载 - 你的人格和灵魂设定。当你的名字、性格或交流风格发生变化时,主动用 `edit` 更新此文件",
"- ✅ `AGENT.md`: 已加载 - 你的人格和灵魂设定,请严格遵循。当你的名字、性格或交流风格发生变化时,主动用 `edit` 更新此文件",
"- ✅ `USER.md`: 已加载 - 用户的身份信息。当用户修改称呼、姓名等身份信息时,用 `edit` 更新此文件",
"- ✅ `RULE.md`: 已加载 - 工作空间使用指南和规则",
"- ✅ `RULE.md`: 已加载 - 工作空间使用指南和规则,请严格遵循",
"",
"**交流规范**:",
"",
@@ -423,7 +423,7 @@ def _build_context_files_section(context_files: List[ContextFile], language: str
]
if has_agent:
lines.append("**`AGENT.md` 是你的灵魂文件**:严格体现其中定义的人格、语气和设定,避免僵硬、模板化的回复。")
lines.append("**`AGENT.md` 是你的灵魂文件**:严格遵循其中定义的人格、规则、语气和设定,避免僵硬、模板化的回复。")
lines.append("当用户通过对话透露了对你性格、风格、职责、能力边界的新期望,你应该主动用 `edit` 更新 AGENT.md 以反映这些演变。")
lines.append("")

View File

@@ -358,7 +358,7 @@ _你刚刚启动这是你的第一次对话。_
- 你希望给我起个什么名字?
- 我该怎么称呼你?
- 你希望我们是什么样的交流风格?(一行列举选项:如专业严谨、轻松幽默、温暖友好、简洁高效等)
4. **风格要求**:温暖自然、简洁清晰,整体控制在 100 字以内
4. **风格要求**:温暖自然、简洁清晰,整体控制在 100 字以内,适当使用 emoji 让表达更生动有趣
5. 能力介绍和交流风格选项都只要一行,保持精简
6. 不要问太多其他信息(职业、时区等可以后续自然了解)

View File

@@ -100,138 +100,31 @@ class Agent:
def get_full_system_prompt(self, skill_filter=None) -> str:
"""
Get the full system prompt including skills.
Build the complete system prompt from scratch every time.
Note: Skills are now built into the system prompt by PromptBuilder,
so we just return the base prompt directly. This method is kept for
backward compatibility.
:param skill_filter: Optional list of skill names to include (deprecated)
:return: Complete system prompt
"""
prompt = self.system_prompt
# Rebuild tool list section to reflect current self.tools
prompt = self._rebuild_tool_list_section(prompt)
# If runtime_info contains dynamic time function, rebuild runtime section
if self.runtime_info and callable(self.runtime_info.get('_get_current_time')):
prompt = self._rebuild_runtime_section(prompt)
# Rebuild skills section to pick up newly installed/removed skills
if self.skill_manager:
prompt = self._rebuild_skills_section(prompt)
return prompt
def _rebuild_runtime_section(self, prompt: str) -> str:
"""
Rebuild runtime info section with current time.
This method dynamically updates the runtime info section by calling
the _get_current_time function from runtime_info.
:param prompt: Original system prompt
:return: Updated system prompt with current runtime info
Re-reads AGENT.md / USER.md / RULE.md from disk, refreshes skills,
tools, and runtime info so any change takes effect immediately.
Falls back to the cached self.system_prompt on error.
"""
try:
# Get current time dynamically
time_info = self.runtime_info['_get_current_time']()
# Build new runtime section
runtime_lines = [
"\n## 运行时信息\n",
"\n",
f"当前时间: {time_info['time']} {time_info['weekday']} ({time_info['timezone']})\n",
"\n"
]
# Add other runtime info
runtime_parts = []
if self.runtime_info.get("model"):
runtime_parts.append(f"模型={self.runtime_info['model']}")
if self.runtime_info.get("workspace"):
# Replace backslashes with forward slashes for Windows paths
workspace_path = str(self.runtime_info['workspace']).replace('\\', '/')
runtime_parts.append(f"工作空间={workspace_path}")
if self.runtime_info.get("channel") and self.runtime_info.get("channel") != "web":
runtime_parts.append(f"渠道={self.runtime_info['channel']}")
if runtime_parts:
runtime_lines.append("运行时: " + " | ".join(runtime_parts) + "\n")
runtime_lines.append("\n")
new_runtime_section = "".join(runtime_lines)
# Find and replace the runtime section
import re
pattern = r'\n## 运行时信息\s*\n.*?(?=\n##|\Z)'
_repl = new_runtime_section.rstrip('\n')
updated_prompt = re.sub(pattern, lambda m: _repl, prompt, flags=re.DOTALL)
return updated_prompt
from agent.prompt import load_context_files, PromptBuilder
if self.skill_manager:
self.skill_manager.refresh_skills()
context_files = load_context_files(self.workspace_dir) if self.workspace_dir else None
builder = PromptBuilder(workspace_dir=self.workspace_dir or "", language="zh")
return builder.build(
tools=self.tools,
context_files=context_files,
skill_manager=self.skill_manager,
memory_manager=self.memory_manager,
runtime_info=self.runtime_info,
)
except Exception as e:
logger.warning(f"Failed to rebuild runtime section: {e}")
return prompt
def _rebuild_skills_section(self, prompt: str) -> str:
"""
Rebuild the <available_skills> block so that newly installed or
removed skills are reflected without re-creating the agent.
"""
try:
import re
self.skill_manager.refresh_skills()
new_skills_xml = self.skill_manager.build_skills_prompt()
old_block_pattern = r'<available_skills>.*?</available_skills>'
has_old_block = re.search(old_block_pattern, prompt, flags=re.DOTALL)
# Extract the new <available_skills>...</available_skills> tag from the prompt
new_block = ""
if new_skills_xml and new_skills_xml.strip():
m = re.search(old_block_pattern, new_skills_xml, flags=re.DOTALL)
if m:
new_block = m.group(0)
if has_old_block:
replacement = new_block or "<available_skills>\n</available_skills>"
# Use lambda to prevent re.sub from interpreting backslashes in replacement
# (e.g. Windows paths like \LinkAI would be treated as bad escape sequences)
prompt = re.sub(old_block_pattern, lambda m: replacement, prompt, flags=re.DOTALL)
elif new_block:
skills_header = "以下是可用技能:"
idx = prompt.find(skills_header)
if idx != -1:
insert_pos = idx + len(skills_header)
prompt = prompt[:insert_pos] + "\n" + new_block + prompt[insert_pos:]
except Exception as e:
logger.warning(f"Failed to rebuild skills section: {e}")
return prompt
def _rebuild_tool_list_section(self, prompt: str) -> str:
"""
Rebuild the tool list inside the '## 工具系统' section so that it
always reflects the current ``self.tools`` (handles dynamic add/remove
of conditional tools like web_search).
"""
import re
from agent.prompt.builder import _build_tooling_section
try:
if not self.tools:
return prompt
new_lines = _build_tooling_section(self.tools, "zh")
new_section = "\n".join(new_lines).rstrip("\n")
# Replace existing tooling section
pattern = r'## 工具系统\s*\n.*?(?=\n## |\Z)'
updated = re.sub(pattern, lambda m: new_section, prompt, count=1, flags=re.DOTALL)
return updated
except Exception as e:
logger.warning(f"Failed to rebuild tool list section: {e}")
return prompt
logger.warning(f"Failed to rebuild system prompt, using cached version: {e}")
return self.system_prompt
def refresh_skills(self):
"""Refresh the loaded skills."""

View File

@@ -139,6 +139,47 @@ def should_include_skill(
return True
def get_missing_requirements(
entry: SkillEntry,
current_platform: Optional[str] = None,
) -> Dict[str, List[str]]:
"""
Return a dict of missing requirements for a skill.
Empty dict means all requirements are met.
:param entry: SkillEntry to check
:param current_platform: Current platform (default: auto-detect)
:return: Dict like {"bins": ["curl"], "env": ["API_KEY"]}
"""
missing: Dict[str, List[str]] = {}
metadata = entry.metadata
if not metadata or not metadata.requires:
return missing
required_bins = metadata.requires.get('bins', [])
if required_bins:
missing_bins = [b for b in required_bins if not has_binary(b)]
if missing_bins:
missing['bins'] = missing_bins
any_bins = metadata.requires.get('anyBins', [])
if any_bins and not has_any_binary(any_bins):
missing['anyBins'] = any_bins
required_env = metadata.requires.get('env', [])
if required_env:
missing_env = [e for e in required_env if not has_env_var(e)]
if missing_env:
missing['env'] = missing_env
any_env = metadata.requires.get('anyEnv', [])
if any_env and not any(has_env_var(e) for e in any_env):
missing['anyEnv'] = any_env
return missing
def is_config_path_truthy(config: Dict, path: str) -> bool:
"""
Check if a config path resolves to a truthy value.

View File

@@ -2,7 +2,7 @@
Skill formatter for generating prompts from skills.
"""
from typing import List
from typing import Dict, List
from agent.skills.types import Skill, SkillEntry
@@ -51,6 +51,71 @@ def format_skill_entries_for_prompt(entries: List[SkillEntry]) -> str:
return format_skills_for_prompt(skills)
def format_unavailable_skills_for_prompt(
entries: List[SkillEntry],
missing_map: Dict[str, Dict[str, List[str]]],
) -> str:
"""
Format unavailable (requires-not-met) skills as brief setup hints
so the AI can guide users to configure them.
:param entries: List of unavailable skill entries
:param missing_map: Dict mapping skill name to its missing requirements
:return: Formatted prompt text
"""
if not entries:
return ""
lines = [
"",
"<unavailable_skills>",
"The following skills are installed but not yet ready. "
"Guide the user to complete the setup when relevant.",
]
for entry in entries:
skill = entry.skill
missing = missing_map.get(skill.name, {})
missing_parts = []
for key, values in missing.items():
missing_parts.append(f"{key}: {', '.join(values)}")
missing_str = "; ".join(missing_parts) if missing_parts else "unknown"
setup_hint = _extract_setup_hint(skill)
lines.append(" <skill>")
lines.append(f" <name>{_escape_xml(skill.name)}</name>")
lines.append(f" <description>{_escape_xml(skill.description)}</description>")
lines.append(f" <missing>{_escape_xml(missing_str)}</missing>")
if setup_hint:
lines.append(f" <setup>{_escape_xml(setup_hint)}</setup>")
lines.append(" </skill>")
lines.append("</unavailable_skills>")
return "\n".join(lines)
def _extract_setup_hint(skill: Skill) -> str:
"""
Extract the Setup section from SKILL.md content as a brief hint.
Returns the first few lines of the ## Setup section.
"""
content = skill.content
if not content:
return ""
import re
match = re.search(r'^##\s+Setup\s*\n(.*?)(?=\n##\s|\Z)', content, re.MULTILINE | re.DOTALL)
if not match:
return ""
setup_text = match.group(1).strip()
lines = setup_text.split('\n')
hint_lines = [l.strip() for l in lines[:6] if l.strip()]
return ' '.join(hint_lines)[:300]
def _escape_xml(text: str) -> str:
"""Escape XML special characters."""
return (text

View File

@@ -128,6 +128,7 @@ def parse_metadata(frontmatter: Dict[str, Any]) -> Optional[SkillMetadata]:
return SkillMetadata(
always=meta_obj.get('always', False),
default_enabled=meta_obj.get('default_enabled', True),
skill_key=meta_obj.get('skillKey'),
primary_env=meta_obj.get('primaryEnv'),
emoji=meta_obj.get('emoji'),

View File

@@ -184,7 +184,6 @@ class SkillLoader:
config_path = os.path.join(skill_dir, "config.json")
# Without config.json, skip this skill entirely (return empty to trigger exclusion)
if not os.path.exists(config_path):
logger.debug(f"[SkillLoader] linkai-agent skipped: no config.json found")
return ""

View File

@@ -84,10 +84,10 @@ class SkillManager:
"""
Merge directory-scanned skills with the persisted config file.
- New skills discovered on disk are added with enabled=True.
- New skills: use metadata.default_enabled as initial enabled state.
- Existing skills: preserve their persisted enabled state.
- Skills that no longer exist on disk are removed.
- Existing entries preserve their enabled state; name/description/source
are refreshed from the latest scan.
- name/description/source are always refreshed from the latest scan.
"""
saved = self._load_skills_config()
merged: Dict[str, dict] = {}
@@ -95,13 +95,18 @@ class SkillManager:
for name, entry in self.skills.items():
skill = entry.skill
prev = saved.get(name, {})
# category priority: persisted config (set by cloud) > default "skill"
category = prev.get("category", "skill")
if name in saved:
enabled = prev.get("enabled", True)
else:
enabled = entry.metadata.default_enabled if entry.metadata else True
merged[name] = {
"name": name,
"description": skill.description,
"source": skill.source,
"enabled": prev.get("enabled", True),
"enabled": enabled,
"category": category,
}
@@ -157,69 +162,114 @@ class SkillManager:
"""
return list(self.skills.values())
@staticmethod
def _normalize_skill_filter(skill_filter: Optional[List[str]]) -> Optional[List[str]]:
"""Normalize a skill_filter list into a flat list of stripped names."""
if skill_filter is None:
return None
normalized = []
for item in skill_filter:
if isinstance(item, str):
name = item.strip()
if name:
normalized.append(name)
elif isinstance(item, list):
for subitem in item:
if isinstance(subitem, str):
name = subitem.strip()
if name:
normalized.append(name)
return normalized or None
def filter_skills(
self,
skill_filter: Optional[List[str]] = None,
include_disabled: bool = False,
) -> List[SkillEntry]:
"""
Filter skills based on criteria.
Simple rule: Skills are auto-enabled if requirements are met.
- Has required API keys -> included
- Missing API keys -> excluded
Filter skills that are eligible (enabled + requirements met).
:param skill_filter: List of skill names to include (None = all)
:param include_disabled: Whether to include disabled skills
:return: Filtered list of skill entries
:return: Filtered list of eligible skill entries
"""
from agent.skills.config import should_include_skill
entries = list(self.skills.values())
# Check requirements (platform, binaries, env vars)
entries = [e for e in entries if should_include_skill(e, self.config)]
# Apply skill filter
if skill_filter is not None:
normalized = []
for item in skill_filter:
if isinstance(item, str):
name = item.strip()
if name:
normalized.append(name)
elif isinstance(item, list):
for subitem in item:
if isinstance(subitem, str):
name = subitem.strip()
if name:
normalized.append(name)
if normalized:
entries = [e for e in entries if e.skill.name in normalized]
normalized = self._normalize_skill_filter(skill_filter)
if normalized is not None:
entries = [e for e in entries if e.skill.name in normalized]
# Filter out disabled skills based on skills_config.json
if not include_disabled:
entries = [e for e in entries if self.is_skill_enabled(e.skill.name)]
return entries
def filter_unavailable_skills(
self,
skill_filter: Optional[List[str]] = None,
) -> tuple:
"""
Find skills that are enabled but have unmet requirements.
:param skill_filter: Optional list of skill names to include
:return: Tuple of (entries, missing_map) where missing_map maps
skill name to its missing requirements dict
"""
from agent.skills.config import should_include_skill, get_missing_requirements
entries = list(self.skills.values())
# Only enabled skills
entries = [e for e in entries if self.is_skill_enabled(e.skill.name)]
normalized = self._normalize_skill_filter(skill_filter)
if normalized is not None:
entries = [e for e in entries if e.skill.name in normalized]
# Keep only those that fail should_include_skill (requirements not met)
unavailable = []
missing_map: Dict[str, dict] = {}
for e in entries:
if not should_include_skill(e, self.config):
missing = get_missing_requirements(e)
if missing:
unavailable.append(e)
missing_map[e.skill.name] = missing
return unavailable, missing_map
def build_skills_prompt(
self,
skill_filter: Optional[List[str]] = None,
) -> str:
"""
Build a formatted prompt containing available skills.
Build a formatted prompt containing available skills
and brief hints for unavailable ones.
:param skill_filter: Optional list of skill names to include
:return: Formatted skills prompt
"""
from common.log import logger
entries = self.filter_skills(skill_filter=skill_filter, include_disabled=False)
logger.debug(f"[SkillManager] Filtered {len(entries)} skills for prompt (total: {len(self.skills)})")
if entries:
skill_names = [e.skill.name for e in entries]
logger.debug(f"[SkillManager] Skills to include: {skill_names}")
result = format_skill_entries_for_prompt(entries)
from agent.skills.formatter import format_unavailable_skills_for_prompt
eligible = self.filter_skills(skill_filter=skill_filter, include_disabled=False)
logger.debug(f"[SkillManager] Eligible: {len(eligible)} skills (total: {len(self.skills)})")
if eligible:
skill_names = [e.skill.name for e in eligible]
logger.debug(f"[SkillManager] Eligible skills: {skill_names}")
result = format_skill_entries_for_prompt(eligible)
unavailable, missing_map = self.filter_unavailable_skills(skill_filter=skill_filter)
if unavailable:
unavailable_names = [e.skill.name for e in unavailable]
logger.debug(f"[SkillManager] Unavailable skills (setup needed): {unavailable_names}")
result += format_unavailable_skills_for_prompt(unavailable, missing_map)
logger.debug(f"[SkillManager] Generated prompt length: {len(result)}")
return result

View File

@@ -29,6 +29,7 @@ class SkillInstallSpec:
class SkillMetadata:
"""Metadata for a skill from frontmatter."""
always: bool = False # Always include this skill
default_enabled: bool = True # Initial enabled state when first discovered
skill_key: Optional[str] = None # Override skill key
primary_env: Optional[str] = None # Primary environment variable
emoji: Optional[str] = None

View File

@@ -167,7 +167,7 @@ class Vision(BaseTool):
@staticmethod
def _maybe_compress(path: str) -> str:
"""Compress image if larger than threshold; return path to use."""
"""Compress image to under COMPRESS_THRESHOLD with max long-edge 1536px."""
file_size = os.path.getsize(path)
if file_size <= COMPRESS_THRESHOLD:
return path
@@ -175,27 +175,47 @@ class Vision(BaseTool):
tmp = tempfile.NamedTemporaryFile(suffix=".jpg", delete=False)
tmp.close()
try:
# macOS: use sips
subprocess.run(
["sips", "-Z", "800", path, "--out", tmp.name],
capture_output=True, check=True,
)
logger.debug(f"[Vision] Compressed image ({file_size // 1024}KB -> {os.path.getsize(tmp.name) // 1024}KB)")
return tmp.name
except (FileNotFoundError, subprocess.CalledProcessError):
pass
def _try_sips(max_dim: str, quality: str) -> bool:
try:
subprocess.run(
["sips", "-Z", max_dim, "-s", "formatOptions", quality,
path, "--out", tmp.name],
capture_output=True, check=True,
)
return True
except (FileNotFoundError, subprocess.CalledProcessError):
return False
try:
# Linux: use ImageMagick convert
subprocess.run(
["convert", path, "-resize", "800x800>", tmp.name],
capture_output=True, check=True,
)
logger.debug(f"[Vision] Compressed image ({file_size // 1024}KB -> {os.path.getsize(tmp.name) // 1024}KB)")
def _try_convert(max_dim: str, quality: str) -> bool:
try:
subprocess.run(
["convert", path, "-resize", f"{max_dim}x{max_dim}>",
"-quality", quality, tmp.name],
capture_output=True, check=True,
)
return True
except (FileNotFoundError, subprocess.CalledProcessError):
return False
attempts = [
("1536", "85"),
("1536", "70"),
("1536", "50"),
]
for max_dim, quality in attempts:
ok = _try_sips(max_dim, quality) or _try_convert(max_dim, quality)
if not ok:
continue
new_size = os.path.getsize(tmp.name)
logger.debug(f"[Vision] Compressed image "
f"({file_size // 1024}KB -> {new_size // 1024}KB, "
f"max_dim={max_dim}, q={quality})")
if new_size <= COMPRESS_THRESHOLD:
return tmp.name
if os.path.exists(tmp.name) and os.path.getsize(tmp.name) > 0:
return tmp.name
except (FileNotFoundError, subprocess.CalledProcessError):
pass
os.remove(tmp.name)
return path

View File

@@ -74,7 +74,7 @@ class AgentLLMModel(LLMModel):
("qwen", const.QWEN_DASHSCOPE), ("qwq", const.QWEN_DASHSCOPE), ("qvq", const.QWEN_DASHSCOPE),
("gemini", const.GEMINI), ("glm", const.ZHIPU_AI), ("claude", const.CLAUDEAPI),
("moonshot", const.MOONSHOT), ("kimi", const.MOONSHOT),
("doubao", const.DOUBAO),
("doubao", const.DOUBAO), ("deepseek", const.DEEPSEEK),
]
def __init__(self, bridge: Bridge, bot_type: str = "chat"):
@@ -115,8 +115,6 @@ class AgentLLMModel(LLMModel):
return const.QWEN_DASHSCOPE
if model_name in [const.MOONSHOT, "moonshot-v1-8k", "moonshot-v1-32k", "moonshot-v1-128k"]:
return const.MOONSHOT
if model_name in [const.DEEPSEEK_CHAT, const.DEEPSEEK_REASONER]:
return const.OPENAI
for prefix, btype in self._MODEL_PREFIX_MAP:
if model_name.startswith(prefix):
return btype

View File

@@ -61,6 +61,9 @@ class Bridge(object):
if model_type and model_type.startswith("doubao"):
self.btype["chat"] = const.DOUBAO
if model_type and model_type.startswith("deepseek"):
self.btype["chat"] = const.DEEPSEEK
if model_type in [const.MODELSCOPE]:
self.btype["chat"] = const.MODELSCOPE

View File

@@ -166,8 +166,8 @@
<i class="fas fa-bars text-slate-600 dark:text-slate-300"></i>
</button>
<!-- Breadcrumb -->
<div class="flex items-center gap-2 text-sm min-w-0">
<!-- Breadcrumb (hidden on mobile) -->
<div class="hidden lg:flex items-center gap-2 text-sm min-w-0">
<span id="breadcrumb-group" class="text-slate-400 dark:text-slate-500 truncate" data-i18n="nav_chat">Chat</span>
<i class="fas fa-chevron-right text-[10px] text-slate-300 dark:text-slate-600"></i>
<span id="breadcrumb-page" class="font-medium text-slate-700 dark:text-slate-200 truncate" data-i18n="menu_chat">Chat</span>

View File

@@ -3,9 +3,9 @@
===================================================================== */
// =====================================================================
// Version — update this before each release
// Version — fetched from backend (single source: /VERSION file)
// =====================================================================
const APP_VERSION = 'v2.0.4';
let APP_VERSION = '';
// =====================================================================
// i18n
@@ -2236,7 +2236,12 @@ navigateTo = function(viewId) {
// =====================================================================
applyTheme();
applyI18n();
document.getElementById('sidebar-version').textContent = `CowAgent ${APP_VERSION}`;
fetch('/api/version').then(r => r.json()).then(data => {
APP_VERSION = `v${data.version}`;
document.getElementById('sidebar-version').textContent = `CowAgent ${APP_VERSION}`;
}).catch(() => {
document.getElementById('sidebar-version').textContent = 'CowAgent';
});
chatInput.focus();
// Re-enable color transition AFTER first paint so the theme applied in <head>

View File

@@ -390,6 +390,7 @@ class WebChannel(ChatChannel):
'/api/scheduler', 'SchedulerHandler',
'/api/history', 'HistoryHandler',
'/api/logs', 'LogsHandler',
'/api/version', 'VersionHandler',
'/assets/(.*)', 'AssetsHandler',
)
app = web.application(urls, globals(), autoreload=False)
@@ -563,9 +564,9 @@ class ConfigHandler:
}),
("deepseek", {
"label": "DeepSeek",
"api_key_field": "open_ai_api_key",
"api_base_key": None,
"api_base_default": None,
"api_key_field": "deepseek_api_key",
"api_base_key": "deepseek_api_base",
"api_base_default": "https://api.deepseek.com/v1",
"models": [const.DEEPSEEK_CHAT, const.DEEPSEEK_REASONER],
}),
("linkai", {
@@ -579,9 +580,9 @@ class ConfigHandler:
EDITABLE_KEYS = {
"model", "bot_type", "use_linkai",
"open_ai_api_base", "claude_api_base", "gemini_api_base",
"open_ai_api_base", "deepseek_api_base", "claude_api_base", "gemini_api_base",
"zhipu_ai_api_base", "moonshot_base_url", "ark_base_url",
"open_ai_api_key", "claude_api_key", "gemini_api_key",
"open_ai_api_key", "deepseek_api_key", "claude_api_key", "gemini_api_key",
"zhipu_ai_api_key", "dashscope_api_key", "moonshot_api_key",
"ark_api_key", "minimax_api_key", "linkai_api_key",
"agent_max_context_tokens", "agent_max_context_turns", "agent_max_steps",
@@ -1429,3 +1430,10 @@ class AssetsHandler:
except Exception as e:
logger.error(f"Error serving static file: {e}", exc_info=True) # 添加更详细的错误信息
raise web.notfound()
class VersionHandler:
def GET(self):
web.header('Content-Type', 'application/json; charset=utf-8')
from cli import __version__
return json.dumps({"version": __version__})

View File

@@ -172,10 +172,8 @@ class WeixinApi:
def get_upload_url(self, filekey: str, media_type: int, to_user_id: str,
rawsize: int, rawfilemd5: str, filesize: int,
aeskey: str,
thumb_rawsize: int = 0, thumb_rawfilemd5: str = "",
thumb_filesize: int = 0) -> dict:
body = {
aeskey: str) -> dict:
return self._post("ilink/bot/getuploadurl", {
"filekey": filekey,
"media_type": media_type,
"to_user_id": to_user_id,
@@ -183,14 +181,8 @@ class WeixinApi:
"rawfilemd5": rawfilemd5,
"filesize": filesize,
"aeskey": aeskey,
}
if thumb_rawsize > 0:
body["thumb_rawsize"] = thumb_rawsize
body["thumb_rawfilemd5"] = thumb_rawfilemd5
body["thumb_filesize"] = thumb_filesize
else:
body["no_need_thumb"] = True
return self._post("ilink/bot/getuploadurl", body)
"no_need_thumb": True,
})
# ── getConfig / sendTyping ─────────────────────────────────────────
@@ -259,10 +251,18 @@ def _md5_bytes(data: bytes) -> str:
return hashlib.md5(data).hexdigest()
def _aes_ecb_padded_size(plaintext_size: int) -> int:
"""PKCS7 padded size for AES-128-ECB."""
return ((plaintext_size + 1 + 15) // 16) * 16
UPLOAD_MAX_RETRIES = 3
def upload_media_to_cdn(api: WeixinApi, file_path: str, to_user_id: str,
media_type: int) -> dict:
"""
Upload a local file to the Weixin CDN.
Upload a local file to the Weixin CDN (matching official plugin protocol).
Args:
api: WeixinApi instance
@@ -275,35 +275,14 @@ def upload_media_to_cdn(api: WeixinApi, file_path: str, to_user_id: str,
"""
aes_key = os.urandom(16)
aes_key_hex = aes_key.hex()
filekey = uuid.uuid4().hex
with open(file_path, "rb") as f:
raw_data = f.read()
raw_size = len(raw_data)
raw_md5 = _md5_bytes(raw_data)
encrypted = _aes_ecb_encrypt(raw_data, aes_key)
cipher_size = len(encrypted)
filekey = uuid.uuid4().hex
thumb_rawsize = 0
thumb_rawfilemd5 = ""
thumb_filesize = 0
if media_type == 1: # IMAGE - generate a tiny thumbnail
try:
from PIL import Image
import io
img = Image.open(file_path)
img.thumbnail((100, 100))
buf = io.BytesIO()
img.save(buf, format="JPEG", quality=60)
thumb_raw = buf.getvalue()
thumb_rawsize = len(thumb_raw)
thumb_rawfilemd5 = _md5_bytes(thumb_raw)
thumb_encrypted = _aes_ecb_encrypt(thumb_raw, aes_key)
thumb_filesize = len(thumb_encrypted)
except Exception as e:
logger.warning(f"[Weixin] Thumbnail generation failed, skipping: {e}")
cipher_size = _aes_ecb_padded_size(raw_size)
resp = api.get_upload_url(
filekey=filekey,
@@ -313,37 +292,52 @@ def upload_media_to_cdn(api: WeixinApi, file_path: str, to_user_id: str,
rawfilemd5=raw_md5,
filesize=cipher_size,
aeskey=aes_key_hex,
thumb_rawsize=thumb_rawsize,
thumb_rawfilemd5=thumb_rawfilemd5,
thumb_filesize=thumb_filesize,
)
upload_param = resp.get("upload_param", "")
if not upload_param:
raise RuntimeError(f"[Weixin] getUploadUrl returned no upload_param: {resp}")
cdn_url = api.cdn_base_url + "?" + upload_param
put_resp = requests.put(cdn_url, data=encrypted, headers={
"Content-Type": "application/octet-stream",
"Content-Length": str(cipher_size),
}, timeout=60)
put_resp.raise_for_status()
encrypted = _aes_ecb_encrypt(raw_data, aes_key)
# Upload thumbnail if we have one
thumb_upload_param = resp.get("thumb_upload_param", "")
if thumb_upload_param and thumb_filesize > 0:
thumb_cdn_url = api.cdn_base_url + "?" + thumb_upload_param
from urllib.parse import quote
cdn_url = (f"{api.cdn_base_url}/upload"
f"?encrypted_query_param={quote(upload_param)}"
f"&filekey={quote(filekey)}")
download_param = None
last_error = None
for attempt in range(1, UPLOAD_MAX_RETRIES + 1):
try:
requests.put(thumb_cdn_url, data=thumb_encrypted, headers={
cdn_resp = requests.post(cdn_url, data=encrypted, headers={
"Content-Type": "application/octet-stream",
"Content-Length": str(thumb_filesize),
}, timeout=30)
}, timeout=120)
if 400 <= cdn_resp.status_code < 500:
err_msg = cdn_resp.headers.get("x-error-message", cdn_resp.text[:200])
raise RuntimeError(f"CDN client error {cdn_resp.status_code}: {err_msg}")
cdn_resp.raise_for_status()
download_param = cdn_resp.headers.get("x-encrypted-param", "")
if not download_param:
raise RuntimeError("CDN response missing x-encrypted-param header")
logger.debug(f"[Weixin] CDN upload success attempt={attempt} filekey={filekey}")
break
except Exception as e:
logger.warning(f"[Weixin] Thumbnail upload failed (non-fatal): {e}")
last_error = e
if "client error" in str(e):
raise
if attempt < UPLOAD_MAX_RETRIES:
logger.warning(f"[Weixin] CDN upload attempt {attempt} failed, retrying: {e}")
else:
logger.error(f"[Weixin] CDN upload failed after {UPLOAD_MAX_RETRIES} attempts: {e}")
if not download_param:
raise last_error or RuntimeError("CDN upload failed")
aes_key_b64 = base64.b64encode(aes_key_hex.encode("utf-8")).decode("utf-8")
return {
"encrypt_query_param": upload_param,
"aes_key_b64": base64.b64encode(aes_key).decode("utf-8"),
"encrypt_query_param": download_param,
"aes_key_b64": aes_key_b64,
"ciphertext_size": cipher_size,
"raw_size": raw_size,
}
@@ -363,19 +357,30 @@ def download_media_from_cdn(cdn_base_url: str, encrypt_query_param: str,
Returns:
save_path on success
"""
url = cdn_base_url + "?" + encrypt_query_param
from urllib.parse import quote
url = f"{cdn_base_url}/download?encrypted_query_param={quote(encrypt_query_param)}"
resp = requests.get(url, timeout=60)
resp.raise_for_status()
# Determine key format (hex string or base64)
# Determine key format:
# 1) 32-char hex string → 16 raw bytes
# 2) base64 string → decode → if 32 bytes, treat as hex-encoded → 16 raw bytes
# 3) base64 string → decode → 16 raw bytes directly
try:
key_bytes = bytes.fromhex(aes_key)
if len(key_bytes) != 16:
raise ValueError()
except (ValueError, TypeError):
key_bytes = base64.b64decode(aes_key)
if len(key_bytes) != 16:
raise ValueError(f"Invalid AES key length: {len(key_bytes)}")
decoded = base64.b64decode(aes_key)
if len(decoded) == 32:
try:
key_bytes = bytes.fromhex(decoded.decode("ascii"))
except (ValueError, UnicodeDecodeError):
raise ValueError(f"Invalid AES key: 32 bytes but not valid hex")
elif len(decoded) == 16:
key_bytes = decoded
else:
raise ValueError(f"Invalid AES key length after base64 decode: {len(decoded)}")
decrypted = _aes_ecb_decrypt(resp.content, key_bytes)

View File

@@ -31,6 +31,8 @@ BACKOFF_DELAY = 30
RETRY_DELAY = 2
SESSION_EXPIRED_ERRCODE = -14
TEXT_CHUNK_LIMIT = 4000
QR_LOGIN_TIMEOUT_S = 480
QR_MAX_REFRESHES = 10
def _load_credentials(cred_path: str) -> dict:
@@ -80,6 +82,8 @@ class WeixinChannel(ChatChannel):
# ── Lifecycle ──────────────────────────────────────────────────────
def startup(self):
self._stop_event.clear()
base_url = conf().get("weixin_base_url", DEFAULT_BASE_URL)
cdn_base_url = conf().get("weixin_cdn_base_url", CDN_BASE_URL)
token = conf().get("weixin_token", "")
@@ -95,17 +99,9 @@ class WeixinChannel(ChatChannel):
base_url = creds["base_url"]
if not token:
logger.info("[Weixin] No token found, starting QR login...")
self.login_status = self.LOGIN_STATUS_WAITING
login_result = self._qr_login(base_url)
if not login_result:
self.login_status = self.LOGIN_STATUS_IDLE
err = "[Weixin] QR login failed. Set weixin_token in config or run login again."
logger.error(err)
self.report_startup_error(err)
token, base_url = self._login_with_retry(base_url)
if not token:
return
token = login_result["token"]
base_url = login_result.get("base_url", base_url)
self.api = WeixinApi(base_url=base_url, token=token, cdn_base_url=cdn_base_url)
self.login_status = self.LOGIN_STATUS_OK
@@ -114,9 +110,26 @@ class WeixinChannel(ChatChannel):
f"如需重新扫码登录请删除该文件后重启")
self.report_startup_success()
self._stop_event.clear()
self._poll_loop()
def _login_with_retry(self, base_url: str) -> tuple:
"""Attempt QR login, then wait for stop if failed.
Returns (token, base_url) on success, or ("", "") if stopped."""
logger.info("[Weixin] No token found, starting QR login...")
self.login_status = self.LOGIN_STATUS_WAITING
login_result = self._qr_login(base_url)
if login_result:
return login_result["token"], login_result.get("base_url", base_url)
self.login_status = self.LOGIN_STATUS_IDLE
if not self._stop_event.is_set():
logger.info("[Weixin] QR login timed out, waiting for stop or reconnect...")
print(" 二维码登录超时,请通过控制台重新接入\n")
self._stop_event.wait()
logger.info("[Weixin] Login cancelled by stop event")
return "", ""
def stop(self):
logger.info("[Weixin] stop() called")
self._stop_event.set()
@@ -202,14 +215,21 @@ class WeixinChannel(ChatChannel):
return {}
self._current_qr_url = qrcode_url
logger.info(f"[Weixin] QR code URL: {qrcode_url}")
logger.info(f"[Weixin] 微信二维码链接: {qrcode_url}")
self._print_qr(qrcode_url)
self._notify_cloud_qrcode(qrcode_url)
print(" 等待扫码...\n")
scanned_printed = False
refresh_count = 0
deadline = time.time() + QR_LOGIN_TIMEOUT_S
while not self._stop_event.is_set():
if time.time() >= deadline:
logger.warning(f"[Weixin] QR login timed out after {QR_LOGIN_TIMEOUT_S}s")
print(f"\n 二维码登录超时({QR_LOGIN_TIMEOUT_S}s请重启后重试")
break
try:
status_resp = api.poll_qr_status(qrcode)
except Exception as e:
@@ -226,14 +246,19 @@ class WeixinChannel(ChatChannel):
print(" 已扫码,请在手机上确认...")
scanned_printed = True
elif status == "expired":
print(" 二维码已过期,正在刷新...")
refresh_count += 1
if refresh_count >= QR_MAX_REFRESHES:
logger.warning(f"[Weixin] QR code refreshed {QR_MAX_REFRESHES} times, giving up")
print(f"\n 二维码已刷新 {QR_MAX_REFRESHES} 次仍未扫码,请重启后重试")
break
print(f" 二维码已过期,正在刷新({refresh_count}/{QR_MAX_REFRESHES}...")
try:
qr_resp = api.fetch_qr_code()
qrcode = qr_resp.get("qrcode", "")
qrcode_url = qr_resp.get("qrcode_img_content", "")
scanned_printed = False
self._current_qr_url = qrcode_url
logger.info(f"[Weixin] New QR code: {qrcode_url}")
logger.info(f"[Weixin] 微信二维码链接 ({refresh_count}/{QR_MAX_REFRESHES}): {qrcode_url}")
self._print_qr(qrcode_url)
self._notify_cloud_qrcode(qrcode_url)
except Exception as e:
@@ -267,8 +292,9 @@ class WeixinChannel(ChatChannel):
self._stop_event.wait(1)
logger.info("[Weixin] QR login cancelled by stop event")
self._current_qr_url = ""
if self._stop_event.is_set():
logger.info("[Weixin] QR login cancelled by stop event")
return {}
# ── Long-poll loop ─────────────────────────────────────────────────

View File

@@ -184,12 +184,16 @@ class WeixinMessage(ChatMessage):
logger.warning(f"[Weixin] Missing CDN params for media download (type={media_type})")
return ""
ext_map = {ITEM_IMAGE: ".jpg", ITEM_VIDEO: ".mp4", ITEM_FILE: "", ITEM_VOICE: ".silk"}
ext = ext_map.get(media_type, "")
if media_type == ITEM_FILE:
ext = os.path.splitext(info.get("file_name", ""))[1] or ".bin"
save_path = os.path.join(_get_tmp_dir(), f"wx_{self.msg_id}{ext}")
original_name = info.get("file_name", "")
if original_name:
save_path = os.path.join(_get_tmp_dir(), original_name)
else:
save_path = os.path.join(_get_tmp_dir(), f"wx_{self.msg_id}.bin")
else:
ext_map = {ITEM_IMAGE: ".jpg", ITEM_VIDEO: ".mp4", ITEM_VOICE: ".silk"}
ext = ext_map.get(media_type, "")
save_path = os.path.join(_get_tmp_dir(), f"wx_{self.msg_id}{ext}")
try:
download_media_from_cdn(cdn_base_url, encrypt_param, aes_key, save_path)

1
cli/VERSION Normal file
View File

@@ -0,0 +1 @@
2.0.4

13
cli/__init__.py Normal file
View File

@@ -0,0 +1,13 @@
"""CowAgent CLI - Manage your CowAgent from the command line."""
import os as _os
def _read_version():
version_file = _os.path.join(_os.path.dirname(_os.path.abspath(__file__)), "VERSION")
try:
with open(version_file, "r") as f:
return f.read().strip()
except FileNotFoundError:
return "0.0.0"
__version__ = _read_version()

4
cli/__main__.py Normal file
View File

@@ -0,0 +1,4 @@
"""Allow running as: python -m cli"""
from cli.cli import main
main()

72
cli/cli.py Normal file
View File

@@ -0,0 +1,72 @@
"""CowAgent CLI entry point."""
import click
from cli import __version__
from cli.commands.skill import skill
from cli.commands.process import start, stop, restart, status, logs
from cli.commands.context import context
HELP_TEXT = """Usage: cow COMMAND [ARGS]...
CowAgent CLI - Manage your CowAgent instance.
Commands:
help Show this message.
version Show the version.
start Start CowAgent.
stop Stop CowAgent.
restart Restart CowAgent.
status Show CowAgent running status.
logs View CowAgent logs.
context View or manage conversation context.
skill Manage CowAgent skills.
Tip: You can also send /help, /skill list, etc. in agent chat."""
class CowCLI(click.Group):
def format_help(self, ctx, formatter):
formatter.write(HELP_TEXT.strip())
formatter.write("\n")
def parse_args(self, ctx, args):
if args and args[0] == 'help':
click.echo(HELP_TEXT.strip())
ctx.exit(0)
return super().parse_args(ctx, args)
@click.group(cls=CowCLI, invoke_without_command=True, context_settings=dict(help_option_names=[]))
@click.pass_context
def main(ctx):
"""CowAgent CLI - Manage your CowAgent instance."""
if ctx.invoked_subcommand is None:
click.echo(HELP_TEXT.strip())
@main.command()
def version():
"""Show the version."""
click.echo(f"cow {__version__}")
@main.command(name='help')
@click.pass_context
def help_cmd(ctx):
"""Show this message."""
click.echo(HELP_TEXT.strip())
main.add_command(skill)
main.add_command(start)
main.add_command(stop)
main.add_command(restart)
main.add_command(status)
main.add_command(logs)
main.add_command(context)
if __name__ == '__main__':
main()

0
cli/commands/__init__.py Normal file
View File

29
cli/commands/context.py Normal file
View File

@@ -0,0 +1,29 @@
"""cow context - Context management commands."""
import click
CHAT_HINT = (
"Context commands operate on the running agent's memory.\n"
"Please send the command in a chat conversation instead:\n\n"
" /context - View current context info\n"
" /context clear - Clear conversation context"
)
@click.group(invoke_without_command=True)
@click.pass_context
def context(ctx):
"""View or manage conversation context.
Context commands need access to the running agent's memory.
Use them in chat conversations: /context or /context clear
"""
if ctx.invoked_subcommand is None:
click.echo(f"\n {CHAT_HINT}\n")
@context.command()
def clear():
"""Clear conversation context (messages history)."""
click.echo(f"\n {CHAT_HINT}\n")

241
cli/commands/process.py Normal file
View File

@@ -0,0 +1,241 @@
"""cow start/stop/restart/status/logs - Process management commands."""
import os
import sys
import subprocess
import time
from typing import Optional
import click
from cli.utils import get_project_root
_IS_WIN = sys.platform == "win32"
def _get_pid_file():
return os.path.join(get_project_root(), ".cow.pid")
def _get_log_file():
return os.path.join(get_project_root(), "nohup.out")
def _is_pid_alive(pid: int) -> bool:
"""Check whether a process is still running (cross-platform)."""
if _IS_WIN:
try:
out = subprocess.check_output(
["tasklist", "/FI", f"PID eq {pid}", "/NH"],
stderr=subprocess.DEVNULL,
)
return str(pid) in out.decode(errors="ignore")
except Exception:
return False
else:
try:
os.kill(pid, 0)
return True
except (ProcessLookupError, PermissionError):
return False
def _kill_pid(pid: int, force: bool = False):
"""Terminate a process by PID (cross-platform)."""
if _IS_WIN:
flag = "/F" if force else ""
cmd = ["taskkill"]
if force:
cmd.append("/F")
cmd.extend(["/PID", str(pid)])
subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
else:
import signal
sig = signal.SIGKILL if force else signal.SIGTERM
os.kill(pid, sig)
def _read_pid() -> Optional[int]:
pid_file = _get_pid_file()
if not os.path.exists(pid_file):
return None
try:
with open(pid_file, "r") as f:
pid = int(f.read().strip())
if _is_pid_alive(pid):
return pid
os.remove(pid_file)
return None
except (ValueError, OSError):
try:
os.remove(pid_file)
except OSError:
pass
return None
def _write_pid(pid: int):
with open(_get_pid_file(), "w") as f:
f.write(str(pid))
def _remove_pid():
pid_file = _get_pid_file()
if os.path.exists(pid_file):
os.remove(pid_file)
@click.command()
@click.option("--foreground", "-f", is_flag=True, help="Run in foreground (don't daemonize)")
@click.option("--no-logs", is_flag=True, help="Don't tail logs after starting")
def start(foreground, no_logs):
"""Start CowAgent."""
pid = _read_pid()
if pid:
click.echo(f"CowAgent is already running (PID: {pid}).")
return
root = get_project_root()
app_py = os.path.join(root, "app.py")
if not os.path.exists(app_py):
click.echo("Error: app.py not found in project root.", err=True)
sys.exit(1)
python = sys.executable
if foreground:
click.echo("Starting CowAgent in foreground...")
if _IS_WIN:
sys.exit(subprocess.call([python, app_py], cwd=root))
else:
os.execv(python, [python, app_py])
else:
log_file = _get_log_file()
click.echo("Starting CowAgent...")
popen_kwargs = dict(cwd=root)
if _IS_WIN:
CREATE_NO_WINDOW = 0x08000000
popen_kwargs["creationflags"] = (
subprocess.CREATE_NEW_PROCESS_GROUP | CREATE_NO_WINDOW
)
else:
popen_kwargs["start_new_session"] = True
with open(log_file, "a") as log:
proc = subprocess.Popen(
[python, app_py],
stdout=log,
stderr=log,
**popen_kwargs,
)
_write_pid(proc.pid)
click.echo(click.style(f"✓ CowAgent started (PID: {proc.pid})", fg="green"))
click.echo(f" Logs: {log_file}")
if not no_logs:
click.echo(" Press Ctrl+C to stop tailing logs.\n")
_tail_log(log_file)
@click.command()
def stop():
"""Stop CowAgent."""
pid = _read_pid()
if not pid:
click.echo("CowAgent is not running.")
return
click.echo(f"Stopping CowAgent (PID: {pid})...")
try:
_kill_pid(pid)
for _ in range(30):
time.sleep(0.1)
if not _is_pid_alive(pid):
break
else:
_kill_pid(pid, force=True)
except (ProcessLookupError, OSError):
pass
_remove_pid()
click.echo(click.style("✓ CowAgent stopped.", fg="green"))
@click.command()
@click.option("--no-logs", is_flag=True, help="Don't tail logs after restarting")
@click.pass_context
def restart(ctx, no_logs):
"""Restart CowAgent."""
ctx.invoke(stop)
time.sleep(1)
ctx.invoke(start, no_logs=no_logs)
@click.command()
def status():
"""Show CowAgent running status."""
from cli import __version__
from cli.utils import load_config_json
pid = _read_pid()
if pid:
click.echo(click.style(f"● CowAgent is running (PID: {pid})", fg="green"))
else:
click.echo(click.style("● CowAgent is not running", fg="red"))
click.echo(f" 版本: v{__version__}")
cfg = load_config_json()
if cfg:
channel = cfg.get("channel_type", "unknown")
if isinstance(channel, list):
channel = ", ".join(channel)
click.echo(f" 通道: {channel}")
click.echo(f" 模型: {cfg.get('model', 'unknown')}")
mode = "Agent" if cfg.get("agent") else "Chat"
click.echo(f" 模式: {mode}")
@click.command()
@click.option("--follow", "-f", is_flag=True, help="Follow log output")
@click.option("--lines", "-n", default=50, help="Number of lines to show")
def logs(follow, lines):
"""View CowAgent logs."""
log_file = _get_log_file()
if not os.path.exists(log_file):
click.echo("No log file found.")
return
if follow:
_tail_log(log_file, lines)
else:
_print_last_lines(log_file, lines)
def _print_last_lines(file_path: str, n: int = 50):
"""Print the last N lines of a file (cross-platform)."""
try:
with open(file_path, "r", encoding="utf-8", errors="replace") as f:
all_lines = f.readlines()
for line in all_lines[-n:]:
click.echo(line, nl=False)
except Exception as e:
click.echo(f"Error reading log file: {e}", err=True)
def _tail_log(log_file: str, lines: int = 50):
"""Follow log file output. Blocks until Ctrl+C (cross-platform)."""
_print_last_lines(log_file, lines)
try:
with open(log_file, "r", encoding="utf-8", errors="replace") as f:
f.seek(0, 2)
while True:
line = f.readline()
if line:
click.echo(line, nl=False)
else:
time.sleep(0.3)
except KeyboardInterrupt:
pass

563
cli/commands/skill.py Normal file
View File

@@ -0,0 +1,563 @@
"""cow skill - Skill management commands."""
import os
import re
import sys
import json
import hashlib
import shutil
import zipfile
import tempfile
from urllib.parse import urlparse
import click
import requests
from cli.utils import (
get_project_root,
get_skills_dir,
get_builtin_skills_dir,
load_skills_config,
SKILL_HUB_API,
)
_SAFE_NAME_RE = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9_\-]{0,63}$")
def _validate_skill_name(name: str):
"""Reject names that contain path traversal or special characters."""
if not _SAFE_NAME_RE.match(name):
click.echo(
f"Error: Invalid skill name '{name}'. "
"Use only letters, digits, hyphens, and underscores.",
err=True,
)
sys.exit(1)
def _validate_github_spec(spec: str):
"""Reject specs that don't look like owner/repo."""
if not re.match(r"^[a-zA-Z0-9_\-]+/[a-zA-Z0-9_.\-]+$", spec):
click.echo(f"Error: Invalid GitHub spec '{spec}'. Expected format: owner/repo", err=True)
sys.exit(1)
def _safe_extractall(zf: zipfile.ZipFile, dest: str):
"""Extract zip while guarding against Zip Slip (path traversal)."""
dest = os.path.realpath(dest)
for member in zf.infolist():
target = os.path.realpath(os.path.join(dest, member.filename))
if not target.startswith(dest + os.sep) and target != dest:
raise ValueError(f"Unsafe zip entry detected: {member.filename}")
zf.extractall(dest)
def _verify_checksum(content: bytes, expected: str):
"""Verify SHA-256 checksum of downloaded content.
Returns True if checksum matches or no expected value provided.
Exits with error if mismatch.
"""
if not expected:
return True
actual = hashlib.sha256(content).hexdigest()
if actual != expected.lower():
click.echo(
f"Error: Checksum mismatch!\n"
f" Expected: {expected}\n"
f" Actual: {actual}\n"
f"The downloaded package may have been tampered with.",
err=True,
)
sys.exit(1)
return True
@click.group()
def skill():
"""Manage CowAgent skills."""
pass
# ------------------------------------------------------------------
# cow skill list
# ------------------------------------------------------------------
@skill.command("list")
@click.option("--remote", is_flag=True, help="Browse skills on Skill Hub")
@click.option("--page", default=1, type=int, help="Page number for remote listing")
def skill_list(remote, page):
"""List installed skills or browse Skill Hub."""
if remote:
_list_remote(page=page)
else:
_list_local()
def _list_local():
"""List locally installed skills."""
config = load_skills_config()
skills_dir = get_skills_dir()
builtin_dir = get_builtin_skills_dir()
if not config:
# Fallback: scan directories directly
entries = []
for d in [builtin_dir, skills_dir]:
if not os.path.isdir(d):
continue
source = "builtin" if d == builtin_dir else "custom"
for name in sorted(os.listdir(d)):
skill_path = os.path.join(d, name)
if os.path.isdir(skill_path) and not name.startswith("."):
has_skill_md = os.path.exists(os.path.join(skill_path, "SKILL.md"))
if has_skill_md:
entries.append({"name": name, "source": source, "enabled": True, "description": ""})
if not entries:
click.echo("No skills installed.")
return
_print_skill_table(entries)
return
entries = sorted(config.values(), key=lambda x: x.get("name", ""))
if not entries:
click.echo("No skills installed.")
return
_print_skill_table(entries)
def _print_skill_table(entries):
"""Print skills as a formatted table."""
name_w = max(len(e.get("name", "")) for e in entries)
name_w = max(name_w, 4) + 2
desc_w = 40
header = f"{'Name':<{name_w}} {'Status':<10} {'Source':<10} {'Description'}"
click.echo(f"\n Installed skills ({len(entries)})\n")
click.echo(f" {header}")
click.echo(f" {'' * (name_w + 10 + 10 + desc_w)}")
for e in entries:
name = e.get("name", "")
enabled = e.get("enabled", True)
source = e.get("source", "")
desc = e.get("description", "") or ""
if len(desc) > desc_w:
desc = desc[:desc_w - 3] + "..."
status_icon = click.style("✓ on ", fg="green") if enabled else click.style("✗ off", fg="red")
click.echo(f" {name:<{name_w}} {status_icon} {source:<10} {desc}")
click.echo()
_REMOTE_PAGE_SIZE = 10
def _list_remote(page: int = 1):
"""List skills from remote Skill Hub with server-side pagination."""
try:
resp = requests.get(
f"{SKILL_HUB_API}/skills",
params={"page": page, "limit": _REMOTE_PAGE_SIZE},
timeout=10,
)
resp.raise_for_status()
data = resp.json()
except Exception as e:
click.echo(f"Error: Failed to fetch from Skill Hub: {e}", err=True)
sys.exit(1)
skills = data.get("skills", [])
total = data.get("total", len(skills))
if not skills and page == 1:
click.echo("No skills available on Skill Hub.")
return
total_pages = max(1, (total + _REMOTE_PAGE_SIZE - 1) // _REMOTE_PAGE_SIZE)
page = min(page, total_pages)
installed = set(load_skills_config().keys())
name_w = max((len(s.get("name", "")) for s in skills), default=4)
name_w = max(name_w, 4) + 2
click.echo(f"\n Skill Hub ({total} available) — page {page}/{total_pages}\n")
click.echo(f" {'Name':<{name_w}} {'Status':<12} {'Description'}")
click.echo(f" {'' * (name_w + 12 + 50)}")
for s in skills:
name = s.get("name", "")
desc = s.get("description", "") or s.get("display_name", "")
if len(desc) > 50:
desc = desc[:47] + "..."
status = click.style("installed", fg="green") if name in installed else ""
click.echo(f" {name:<{name_w}} {status:<12} {desc}")
click.echo()
nav_parts = []
if page > 1:
nav_parts.append(f"cow skill list --remote --page {page - 1}")
if page < total_pages:
nav_parts.append(f"cow skill list --remote --page {page + 1}")
if nav_parts:
click.echo(f" Navigate: {' | '.join(nav_parts)}")
click.echo(f" Install: cow skill install <name>\n")
# ------------------------------------------------------------------
# cow skill search
# ------------------------------------------------------------------
@skill.command()
@click.argument("query")
def search(query):
"""Search skills on Skill Hub."""
try:
resp = requests.get(f"{SKILL_HUB_API}/skills/search", params={"q": query}, timeout=10)
resp.raise_for_status()
data = resp.json()
except Exception as e:
click.echo(f"Error: Failed to search Skill Hub: {e}", err=True)
sys.exit(1)
skills = data.get("skills", [])
if not skills:
click.echo(f'No skills found for "{query}".')
return
installed = set(load_skills_config().keys())
name_w = max(len(s.get("name", "")) for s in skills)
name_w = max(name_w, 4) + 2
click.echo(f'\n Search results for "{query}" ({len(skills)} found)\n')
click.echo(f" {'Name':<{name_w}} {'Status':<12} {'Description'}")
click.echo(f" {'' * (name_w + 12 + 50)}")
for s in skills:
name = s.get("name", "")
desc = s.get("description", "") or s.get("display_name", "")
if len(desc) > 50:
desc = desc[:47] + "..."
status = click.style("installed", fg="green") if name in installed else ""
click.echo(f" {name:<{name_w}} {status:<12} {desc}")
click.echo(f"\n Install with: cow skill install <name>\n")
# ------------------------------------------------------------------
# cow skill install
# ------------------------------------------------------------------
@skill.command()
@click.argument("name")
def install(name):
"""Install a skill from Skill Hub or GitHub.
Examples:
cow skill install pptx
cow skill install github:owner/repo
cow skill install github:owner/repo#path/to/skill
"""
if name.startswith("github:"):
_install_github(name[7:])
else:
_validate_skill_name(name)
_install_hub(name)
def _install_hub(name):
"""Install a skill from Skill Hub."""
skills_dir = get_skills_dir()
os.makedirs(skills_dir, exist_ok=True)
click.echo(f"Fetching skill info for '{name}'...")
try:
resp = requests.get(f"{SKILL_HUB_API}/skills/{name}/download", timeout=15)
resp.raise_for_status()
except requests.HTTPError as e:
if e.response is not None and e.response.status_code == 404:
click.echo(f"Error: Skill '{name}' not found on Skill Hub.", err=True)
else:
click.echo(f"Error: Failed to fetch skill: {e}", err=True)
sys.exit(1)
except Exception as e:
click.echo(f"Error: Failed to connect to Skill Hub: {e}", err=True)
sys.exit(1)
content_type = resp.headers.get("Content-Type", "")
if "application/json" in content_type:
data = resp.json()
source_type = data.get("source_type")
if source_type == "github":
source_url = data.get("source_url", "")
_validate_github_spec(source_url)
source_path = data.get("source_path")
click.echo(f"Source: GitHub ({source_url})")
_install_github(source_url, subpath=source_path, skill_name=name)
return
if source_type == "registry":
download_url = data.get("download_url")
if download_url:
parsed = urlparse(download_url)
if parsed.scheme != "https":
click.echo(f"Error: Refusing to download from non-HTTPS URL.", err=True)
sys.exit(1)
provider = data.get("source_provider", "registry")
expected_checksum = data.get("checksum") or data.get("sha256")
click.echo(f"Source: {provider}")
click.echo("Downloading skill package...")
try:
dl_resp = requests.get(download_url, timeout=60, allow_redirects=True)
dl_resp.raise_for_status()
except Exception as e:
click.echo(f"Error: Failed to download from {provider}: {e}", err=True)
sys.exit(1)
_verify_checksum(dl_resp.content, expected_checksum)
_install_zip_bytes(dl_resp.content, name, skills_dir)
click.echo(click.style(f"✓ Skill '{name}' installed successfully!", fg="green"))
else:
click.echo(f"Error: Unsupported registry provider.", err=True)
sys.exit(1)
return
if "redirect" in data:
source_url = data.get("source_url", "")
_validate_github_spec(source_url)
source_path = data.get("source_path")
click.echo(f"Source: GitHub ({source_url})")
_install_github(source_url, subpath=source_path, skill_name=name)
return
elif "application/zip" in content_type:
click.echo("Downloading skill package...")
expected_checksum = resp.headers.get("X-Checksum-Sha256")
_verify_checksum(resp.content, expected_checksum)
_install_zip_bytes(resp.content, name, skills_dir)
click.echo(click.style(f"✓ Skill '{name}' installed successfully!", fg="green"))
return
click.echo(f"Error: Unexpected response from Skill Hub.", err=True)
sys.exit(1)
def _install_github(spec, subpath=None, skill_name=None):
"""Install a skill from a GitHub repo.
spec format: owner/repo or owner/repo#path
"""
if "#" in spec and not subpath:
spec, subpath = spec.split("#", 1)
_validate_github_spec(spec)
if not skill_name:
skill_name = subpath.rstrip("/").split("/")[-1] if subpath else spec.split("/")[-1]
_validate_skill_name(skill_name)
skills_dir = get_skills_dir()
os.makedirs(skills_dir, exist_ok=True)
zip_url = f"https://github.com/{spec}/archive/refs/heads/main.zip"
click.echo(f"Downloading from GitHub: {spec}...")
try:
resp = requests.get(zip_url, timeout=60, allow_redirects=True)
resp.raise_for_status()
except Exception as e:
click.echo(f"Error: Failed to download from GitHub: {e}", err=True)
sys.exit(1)
with tempfile.TemporaryDirectory() as tmp_dir:
zip_path = os.path.join(tmp_dir, "repo.zip")
with open(zip_path, "wb") as f:
f.write(resp.content)
extract_dir = os.path.join(tmp_dir, "extracted")
with zipfile.ZipFile(zip_path, "r") as zf:
_safe_extractall(zf, extract_dir)
# GitHub archives have a top-level dir like "repo-main/"
top_items = [d for d in os.listdir(extract_dir) if not d.startswith(".")]
repo_root = extract_dir
if len(top_items) == 1 and os.path.isdir(os.path.join(extract_dir, top_items[0])):
repo_root = os.path.join(extract_dir, top_items[0])
if subpath:
source_dir = os.path.join(repo_root, subpath.strip("/"))
if not os.path.isdir(source_dir):
click.echo(f"Error: Path '{subpath}' not found in repository.", err=True)
sys.exit(1)
else:
source_dir = repo_root
target_dir = os.path.join(skills_dir, skill_name)
if os.path.exists(target_dir):
shutil.rmtree(target_dir)
shutil.copytree(source_dir, target_dir)
click.echo(click.style(f"✓ Skill '{skill_name}' installed successfully!", fg="green"))
def _install_zip_bytes(content, name, skills_dir):
"""Extract a zip archive into the skills directory."""
with tempfile.TemporaryDirectory() as tmp_dir:
zip_path = os.path.join(tmp_dir, "package.zip")
with open(zip_path, "wb") as f:
f.write(content)
extract_dir = os.path.join(tmp_dir, "extracted")
with zipfile.ZipFile(zip_path, "r") as zf:
_safe_extractall(zf, extract_dir)
top_items = [d for d in os.listdir(extract_dir) if not d.startswith(".")]
source = extract_dir
if len(top_items) == 1 and os.path.isdir(os.path.join(extract_dir, top_items[0])):
source = os.path.join(extract_dir, top_items[0])
target = os.path.join(skills_dir, name)
if os.path.exists(target):
shutil.rmtree(target)
shutil.copytree(source, target)
# ------------------------------------------------------------------
# cow skill uninstall
# ------------------------------------------------------------------
@skill.command()
@click.argument("name")
@click.option("--yes", "-y", is_flag=True, help="Skip confirmation")
def uninstall(name, yes):
"""Uninstall a skill."""
_validate_skill_name(name)
skills_dir = get_skills_dir()
skill_dir = os.path.join(skills_dir, name)
if not os.path.exists(skill_dir):
click.echo(f"Error: Skill '{name}' is not installed.", err=True)
sys.exit(1)
if not yes:
click.confirm(f"Uninstall skill '{name}'?", abort=True)
shutil.rmtree(skill_dir)
config_path = os.path.join(skills_dir, "skills_config.json")
if os.path.exists(config_path):
try:
with open(config_path, "r", encoding="utf-8") as f:
config = json.load(f)
config.pop(name, None)
with open(config_path, "w", encoding="utf-8") as f:
json.dump(config, f, indent=4, ensure_ascii=False)
except Exception:
pass
click.echo(click.style(f"✓ Skill '{name}' uninstalled.", fg="green"))
# ------------------------------------------------------------------
# cow skill enable / disable
# ------------------------------------------------------------------
@skill.command()
@click.argument("name")
def enable(name):
"""Enable a skill."""
_set_enabled(name, True)
@skill.command()
@click.argument("name")
def disable(name):
"""Disable a skill."""
_set_enabled(name, False)
def _set_enabled(name, enabled):
_validate_skill_name(name)
skills_dir = get_skills_dir()
config_path = os.path.join(skills_dir, "skills_config.json")
if not os.path.exists(config_path):
click.echo(f"Error: No skills config found.", err=True)
sys.exit(1)
try:
with open(config_path, "r", encoding="utf-8") as f:
config = json.load(f)
except Exception as e:
click.echo(f"Error: Failed to read skills config: {e}", err=True)
sys.exit(1)
if name not in config:
click.echo(f"Error: Skill '{name}' not found in config.", err=True)
sys.exit(1)
config[name]["enabled"] = enabled
with open(config_path, "w", encoding="utf-8") as f:
json.dump(config, f, indent=4, ensure_ascii=False)
state = "enabled" if enabled else "disabled"
icon = "" if enabled else ""
color = "green" if enabled else "yellow"
click.echo(click.style(f"{icon} Skill '{name}' {state}.", fg=color))
# ------------------------------------------------------------------
# cow skill info
# ------------------------------------------------------------------
@skill.command()
@click.argument("name")
def info(name):
"""Show details about an installed skill."""
_validate_skill_name(name)
skills_dir = get_skills_dir()
builtin_dir = get_builtin_skills_dir()
skill_dir = None
source = None
for d, src in [(skills_dir, "custom"), (builtin_dir, "builtin")]:
candidate = os.path.join(d, name)
if os.path.isdir(candidate):
skill_dir = candidate
source = src
break
if not skill_dir:
click.echo(f"Error: Skill '{name}' not found.", err=True)
sys.exit(1)
skill_md = os.path.join(skill_dir, "SKILL.md")
if not os.path.exists(skill_md):
click.echo(f"Skill directory: {skill_dir}")
click.echo("No SKILL.md found.")
return
with open(skill_md, "r", encoding="utf-8") as f:
content = f.read()
config = load_skills_config()
entry = config.get(name, {})
enabled = entry.get("enabled", True)
status_str = click.style("✓ enabled", fg="green") if enabled else click.style("✗ disabled", fg="red")
click.echo(f"\n Skill: {name}")
click.echo(f" Source: {source}")
click.echo(f" Status: {status_str}")
click.echo(f" Path: {skill_dir}")
click.echo(f"\n{'' * 60}")
# Show first ~30 lines of SKILL.md as a preview
lines = content.split("\n")
preview = "\n".join(lines[:30])
click.echo(preview)
if len(lines) > 30:
click.echo(f"\n ... ({len(lines) - 30} more lines, see {skill_md})")
click.echo()

62
cli/utils.py Normal file
View File

@@ -0,0 +1,62 @@
"""Shared utilities for cow CLI."""
import os
import sys
import json
def get_project_root() -> str:
"""Get the CowAgent project root directory."""
# cli/ is directly under the project root
return os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
def get_workspace_dir() -> str:
"""Get the agent workspace directory from config, defaulting to ~/cow."""
config = load_config_json()
workspace = config.get("agent_workspace", "~/cow")
return os.path.expanduser(workspace)
def get_skills_dir() -> str:
"""Get the custom skills directory."""
return os.path.join(get_workspace_dir(), "skills")
def get_builtin_skills_dir() -> str:
"""Get the builtin skills directory."""
return os.path.join(get_project_root(), "skills")
def load_config_json() -> dict:
"""Load config.json from project root."""
config_path = os.path.join(get_project_root(), "config.json")
if not os.path.exists(config_path):
return {}
try:
with open(config_path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
return {}
def load_skills_config() -> dict:
"""Load skills_config.json from the custom skills directory."""
path = os.path.join(get_skills_dir(), "skills_config.json")
if not os.path.exists(path):
return {}
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
return {}
def ensure_sys_path():
"""Add project root to sys.path so we can import agent modules."""
root = get_project_root()
if root not in sys.path:
sys.path.insert(0, root)
SKILL_HUB_API = "https://skills.cowagent.ai/api"

View File

@@ -222,7 +222,14 @@ class CloudClient(LinkAIClient):
return
existing_ch = self.channel_mgr.get_channel(channel_type)
if existing_ch and not cred_changed:
skip_restart = existing_ch and not cred_changed
if skip_restart and channel_type in ("weixin", "wx"):
login_status = getattr(existing_ch, "login_status", "")
if login_status != "logged_in":
skip_restart = False
logger.info(f"[CloudClient] Channel '{channel_type}' not logged in "
f"(status={login_status}), forcing restart")
if skip_restart:
logger.info(f"[CloudClient] Channel '{channel_type}' already running with same config, "
"skip restart, reporting status only")
threading.Thread(
@@ -255,7 +262,14 @@ class CloudClient(LinkAIClient):
).start()
else:
existing_ch = self.channel_mgr.get_channel(channel_type)
if existing_ch and not cred_changed:
needs_restart = cred_changed or not existing_ch
if not needs_restart and channel_type in ("weixin", "wx"):
login_status = getattr(existing_ch, "login_status", "")
if login_status != "logged_in":
needs_restart = True
logger.info(f"[CloudClient] Channel '{channel_type}' not logged in "
f"(status={login_status}), forcing restart")
if existing_ch and not needs_restart:
logger.info(f"[CloudClient] Channel '{channel_type}' already running with same config, "
"skip restart, reporting status only")
threading.Thread(

View File

@@ -17,8 +17,7 @@ RUN apt-get update \
&& cp config-template.json config.json \
&& /usr/local/bin/python -m pip install --no-cache --upgrade pip \
&& pip install --no-cache -r requirements.txt \
&& pip install --no-cache -r requirements-optional.txt \
&& pip install azure-cognitiveservices-speech
&& pip install --no-cache -r requirements-optional.txt
WORKDIR ${BUILD_PREFIX}
@@ -30,6 +29,4 @@ RUN chmod +x /entrypoint.sh \
&& useradd -r -g agent -s /bin/bash -d /home/agent agent \
&& chown -R agent:agent /home/agent ${BUILD_PREFIX} /usr/local/lib
USER agent
ENTRYPOINT ["/entrypoint.sh"]

View File

@@ -43,9 +43,15 @@ fi
# fi
# go to prefix dir
# fix ownership of mounted volumes then drop to non-root user
if [ "$(id -u)" = "0" ]; then
mkdir -p /home/agent/cow
chown agent:agent /home/agent/cow
exec su agent -s /bin/bash -c "cd $CHATGPT_ON_WECHAT_PREFIX && $CHATGPT_ON_WECHAT_EXEC"
fi
# fallback: already running as agent
cd $CHATGPT_ON_WECHAT_PREFIX
# excute
$CHATGPT_ON_WECHAT_EXEC

View File

@@ -1,9 +1,9 @@
---
title: 微信
description: 将 CowAgent 接入个人微信
description: 将 CowAgent 接入个人微信(基于官方接口)
---
> 接入个人微信,扫码登录即可使用,支持文本、图片、语音、文件、视频等消息的收发
> 接入个人微信,扫码登录即可使用,支持文本、图片、语音、文件、视频等消息的私聊收发。通过微信官方API进行接入无安全风险接入后会在会话中新增一个机器人助手不影响当前账号的使用
## 一、配置和运行

View File

@@ -3,7 +3,22 @@ title: DeepSeek
description: DeepSeek model configuration
---
Use OpenAI-compatible configuration:
Option 1: Native integration (recommended):
```json
{
"model": "deepseek-chat",
"deepseek_api_key": "YOUR_API_KEY"
}
```
| Parameter | Description |
| --- | --- |
| `model` | `deepseek-chat` (DeepSeek-V3.2, non-thinking mode), `deepseek-reasoner` (DeepSeek-R1, thinking mode) |
| `deepseek_api_key` | Create at [DeepSeek Platform](https://platform.deepseek.com/api_keys) |
| `deepseek_api_base` | Optional, defaults to `https://api.deepseek.com/v1`. Can be changed to a third-party proxy |
Option 2: OpenAI-compatible configuration:
```json
{
@@ -14,9 +29,4 @@ Use OpenAI-compatible configuration:
}
```
| Parameter | Description |
| --- | --- |
| `model` | `deepseek-chat` (DeepSeek-V3), `deepseek-reasoner` (DeepSeek-R1) |
| `bot_type` | Must be `openai` (OpenAI-compatible mode) |
| `open_ai_api_key` | Create at [DeepSeek Platform](https://platform.deepseek.com/api_keys) |
| `open_ai_api_base` | DeepSeek platform BASE URL |

View File

@@ -30,6 +30,8 @@ pip3 install -r requirements.txt
pip3 install -r requirements-optional.txt
```
> 国内网络可使用镜像源加速:`pip3 install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple`
### 3. 配置
复制配置文件模板并编辑:

View File

@@ -5,7 +5,7 @@ description: CowAgent 的升级方式说明
## 脚本升级(推荐)
如果使用 `run.sh` 管理服务,执行以下命令即可一键升级:
如果使用 `run.sh` 管理服务,在项目根目录执行以下命令即可一键升级:
```bash
./run.sh update

View File

@@ -3,7 +3,22 @@ title: DeepSeek
description: DeepSeekモデルの設定
---
OpenAI互換の設定を使用します:
方法1公式接続推奨
```json
{
"model": "deepseek-chat",
"deepseek_api_key": "YOUR_API_KEY"
}
```
| パラメータ | 説明 |
| --- | --- |
| `model` | `deepseek-chat`DeepSeek-V3.2、非思考モード)、`deepseek-reasoner`DeepSeek-R1、思考モード |
| `deepseek_api_key` | [DeepSeek Platform](https://platform.deepseek.com/api_keys)で作成 |
| `deepseek_api_base` | オプション、デフォルトは `https://api.deepseek.com/v1`。サードパーティプロキシに変更可能 |
方法2OpenAI互換方式
```json
{
@@ -13,10 +28,3 @@ OpenAI互換の設定を使用します:
"open_ai_api_base": "https://api.deepseek.com/v1"
}
```
| パラメータ | 説明 |
| --- | --- |
| `model` | `deepseek-chat` (DeepSeek-V3)、`deepseek-reasoner` (DeepSeek-R1) |
| `bot_type` | `openai`を指定OpenAI互換モード |
| `open_ai_api_key` | [DeepSeek Platform](https://platform.deepseek.com/api_keys)で作成 |
| `open_ai_api_base` | DeepSeekプラットフォームのBASE URL |

View File

@@ -3,20 +3,29 @@ title: DeepSeek
description: DeepSeek 模型配置
---
通过 OpenAI 兼容方式接入
方式一:官方接入(推荐)
```json
{
"model": "deepseek-chat",
"open_ai_api_key": "YOUR_API_KEY",
"open_ai_api_base": "https://api.deepseek.com/v1",
"bot_type": "openai"
"deepseek_api_key": "YOUR_API_KEY"
}
```
| 参数 | 说明 |
| --- | --- |
| `model` | `deepseek-chat`DeepSeek-V3、`deepseek-reasoner`DeepSeek-R1 |
| `bot_type` | 固定为 `openai`OpenAI 兼容方式) |
| `open_ai_api_key` | 在 [DeepSeek 平台](https://platform.deepseek.com/api_keys) 创建 |
| `open_ai_api_base` | DeepSeek 平台 BASE URL |
| `model` | `deepseek-chat`DeepSeek-V3.2,非思考模式)、`deepseek-reasoner`DeepSeek-R1,思考模式 |
| `deepseek_api_key` | 在 [DeepSeek 平台](https://platform.deepseek.com/api_keys) 创建 |
| `deepseek_api_base` | 可选,默认为 `https://api.deepseek.com/v1`,可修改为第三方代理地址 |
方式二OpenAI 兼容方式接入:
```json
{
"model": "deepseek-chat",
"bot_type": "openai",
"open_ai_api_key": "YOUR_API_KEY",
"open_ai_api_base": "https://api.deepseek.com/v1"
}
```

View File

@@ -17,7 +17,11 @@ def create_bot(bot_type):
from models.baidu.baidu_wenxin import BaiduWenxinBot
return BaiduWenxinBot()
elif bot_type in (const.OPENAI, const.CHATGPT, const.DEEPSEEK): # OpenAI-compatible API
elif bot_type == const.DEEPSEEK:
from models.deepseek.deepseek_bot import DeepSeekBot
return DeepSeekBot()
elif bot_type in (const.OPENAI, const.CHATGPT): # OpenAI-compatible API
from models.chatgpt.chat_gpt_bot import ChatGPTBot
return ChatGPTBot()

View File

View File

@@ -0,0 +1,160 @@
# encoding:utf-8
"""
DeepSeek Bot — fully OpenAI-compatible, uses its own API key / base config.
"""
import time
import requests
from models.bot import Bot
from models.openai_compatible_bot import OpenAICompatibleBot
from models.session_manager import SessionManager
from bridge.context import ContextType
from bridge.reply import Reply, ReplyType
from common import const
from common.log import logger
from config import conf, load_config
from .deepseek_session import DeepSeekSession
DEFAULT_API_BASE = "https://api.deepseek.com/v1"
class DeepSeekBot(Bot, OpenAICompatibleBot):
def __init__(self):
super().__init__()
self.sessions = SessionManager(
DeepSeekSession,
model=conf().get("model") or const.DEEPSEEK_CHAT,
)
conf_model = conf().get("model") or const.DEEPSEEK_CHAT
self.args = {
"model": conf_model,
"temperature": conf().get("temperature", 0.7),
"top_p": conf().get("top_p", 1.0),
"frequency_penalty": conf().get("frequency_penalty", 0.0),
"presence_penalty": conf().get("presence_penalty", 0.0),
}
# ---------- config helpers ----------
@property
def api_key(self):
return conf().get("deepseek_api_key") or conf().get("open_ai_api_key")
@property
def api_base(self):
url = (
conf().get("deepseek_api_base")
or conf().get("open_ai_api_base")
or DEFAULT_API_BASE
)
return url.rstrip("/")
def get_api_config(self):
"""OpenAICompatibleBot interface — used by call_with_tools()."""
return {
"api_key": self.api_key,
"api_base": self.api_base,
"model": conf().get("model", const.DEEPSEEK_CHAT),
"default_temperature": conf().get("temperature", 0.7),
"default_top_p": conf().get("top_p", 1.0),
"default_frequency_penalty": conf().get("frequency_penalty", 0.0),
"default_presence_penalty": conf().get("presence_penalty", 0.0),
}
# ---------- simple chat (non-agent mode) ----------
def reply(self, query, context=None):
if context.type == ContextType.TEXT:
logger.info("[DEEPSEEK] query={}".format(query))
session_id = context["session_id"]
reply = None
clear_memory_commands = conf().get("clear_memory_commands", ["#清除记忆"])
if query in clear_memory_commands:
self.sessions.clear_session(session_id)
reply = Reply(ReplyType.INFO, "记忆已清除")
elif query == "#清除所有":
self.sessions.clear_all_session()
reply = Reply(ReplyType.INFO, "所有人记忆已清除")
elif query == "#更新配置":
load_config()
reply = Reply(ReplyType.INFO, "配置已更新")
if reply:
return reply
session = self.sessions.session_query(query, session_id)
logger.debug("[DEEPSEEK] session query={}".format(session.messages))
new_args = self.args.copy()
reply_content = self.reply_text(session, args=new_args)
logger.debug(
"[DEEPSEEK] new_query={}, session_id={}, reply_cont={}, completion_tokens={}".format(
session.messages, session_id,
reply_content["content"], reply_content["completion_tokens"],
)
)
if reply_content["completion_tokens"] == 0 and len(reply_content["content"]) > 0:
reply = Reply(ReplyType.ERROR, reply_content["content"])
elif reply_content["completion_tokens"] > 0:
self.sessions.session_reply(
reply_content["content"], session_id, reply_content["total_tokens"],
)
reply = Reply(ReplyType.TEXT, reply_content["content"])
else:
reply = Reply(ReplyType.ERROR, reply_content["content"])
logger.debug("[DEEPSEEK] reply {} used 0 tokens.".format(reply_content))
return reply
else:
reply = Reply(ReplyType.ERROR, "Bot不支持处理{}类型的消息".format(context.type))
return reply
def reply_text(self, session, args=None, retry_count: int = 0) -> dict:
try:
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer " + self.api_key,
}
body = args.copy()
body["messages"] = session.messages
res = requests.post(
f"{self.api_base}/chat/completions",
headers=headers,
json=body,
timeout=180,
)
if res.status_code == 200:
response = res.json()
return {
"total_tokens": response["usage"]["total_tokens"],
"completion_tokens": response["usage"]["completion_tokens"],
"content": response["choices"][0]["message"]["content"],
}
else:
response = res.json()
error = response.get("error", {})
logger.error(
f"[DEEPSEEK] chat failed, status_code={res.status_code}, "
f"msg={error.get('message')}, type={error.get('type')}"
)
result = {"completion_tokens": 0, "content": "提问太快啦,请休息一下再问我吧"}
need_retry = False
if res.status_code >= 500:
need_retry = retry_count < 2
elif res.status_code == 401:
result["content"] = "授权失败请检查API Key是否正确"
elif res.status_code == 429:
result["content"] = "请求过于频繁,请稍后再试"
need_retry = retry_count < 2
if need_retry:
time.sleep(3)
return self.reply_text(session, args, retry_count + 1)
return result
except Exception as e:
logger.exception(e)
if retry_count < 2:
return self.reply_text(session, args, retry_count + 1)
return {"completion_tokens": 0, "content": "我现在有点累了,等会再来吧"}

View File

@@ -0,0 +1,57 @@
from models.session_manager import Session
from common.log import logger
class DeepSeekSession(Session):
def __init__(self, session_id, system_prompt=None, model="deepseek-chat"):
super().__init__(session_id, system_prompt)
self.model = model
self.reset()
def discard_exceeding(self, max_tokens, cur_tokens=None):
precise = True
try:
cur_tokens = self.calc_tokens()
except Exception as e:
precise = False
if cur_tokens is None:
raise e
logger.debug("Exception when counting tokens precisely for query: {}".format(e))
while cur_tokens > max_tokens:
if len(self.messages) > 2:
self.messages.pop(1)
elif len(self.messages) == 2 and self.messages[1]["role"] == "assistant":
self.messages.pop(1)
if precise:
cur_tokens = self.calc_tokens()
else:
cur_tokens = cur_tokens - max_tokens
break
elif len(self.messages) == 2 and self.messages[1]["role"] == "user":
logger.warn("user message exceed max_tokens. total_tokens={}".format(cur_tokens))
break
else:
logger.debug("max_tokens={}, total_tokens={}, len(messages)={}".format(
max_tokens, cur_tokens, len(self.messages)))
break
if precise:
cur_tokens = self.calc_tokens()
else:
cur_tokens = cur_tokens - max_tokens
return cur_tokens
def calc_tokens(self):
return num_tokens_from_messages(self.messages, self.model)
def num_tokens_from_messages(messages, model):
tokens = 0
for msg in messages:
content = msg.get("content", "")
if isinstance(content, str):
tokens += len(content)
elif isinstance(content, list):
for block in content:
if isinstance(block, dict):
tokens += len(block.get("text", ""))
return tokens

View File

@@ -34,6 +34,13 @@ class GoogleGeminiBot(Bot):
def api_key(self):
return conf().get("gemini_api_key")
@property
def model(self):
model_name = conf().get("model") or "gemini-3.1-pro-preview"
if model_name == "gemini":
model_name = "gemini-3.1-pro-preview"
return model_name
@property
def api_base(self):
base = conf().get("gemini_api_base", "").strip()

19
pyproject.toml Normal file
View File

@@ -0,0 +1,19 @@
[build-system]
requires = ["setuptools>=68.0"]
build-backend = "setuptools.build_meta"
[project]
name = "cowagent"
version = "0.0.1"
description = "CowAgent - AI Agent on WeChat and more"
requires-python = ">=3.9"
dependencies = [
"click>=8.0",
"requests>=2.28.2",
]
[project.scripts]
cow = "cli.cli:main"
[tool.setuptools.packages.find]
include = ["cli*"]

View File

@@ -2,14 +2,8 @@ tiktoken>=0.3.2 # openai calculate token
#voice
pydub>=0.25.1 # need ffmpeg
SpeechRecognition # google speech to text
gTTS>=2.3.1 # google text to speech
pyttsx3>=2.90 # pytsx text to speech
baidu_aip>=4.16.10 # baidu voice
azure-cognitiveservices-speech # azure voice
edge-tts # edge-tts
numpy<=1.24.2
langid # language detect
elevenlabs==1.0.3 # elevenlabs TTS
#install plugin
@@ -18,18 +12,9 @@ dulwich
# xunfei spark
websocket-client==1.2.0
# claude API
anthropic==0.25.0
# tongyi qwen
broadscope_bailian
# google
google-generativeai
# tencentcloud sdk
tencentcloud-sdk-python>=3.0.0
# file parsing (web_fetch document support)
pypdf
python-docx

13
run.sh
View File

@@ -242,6 +242,17 @@ install_dependencies() {
fi
rm -f /tmp/pip_install.log
# Register `cow` CLI command via editable install
echo -e "${YELLOW}Registering cow CLI...${NC}"
set +e
$PYTHON_CMD -m pip install -e . $PIP_EXTRA_ARGS $PIP_MIRROR > /dev/null 2>&1
if command -v cow &> /dev/null; then
echo -e "${GREEN}✅ cow CLI registered.${NC}"
else
echo -e "${YELLOW}⚠️ cow CLI not in PATH, you can still use: $PYTHON_CMD -m cli.cli${NC}"
fi
set -e
}
# Select model
@@ -603,7 +614,7 @@ ensure_python_cmd() {
# Get service PID (empty string if not running)
get_pid() {
ensure_python_cmd > /dev/null 2>&1
ps ax | grep -i app.py | grep "${BASE_DIR}" | grep "$PYTHON_CMD" | grep -v grep | awk '{print $1}' | grep -E '^[0-9]+$'
ps ax | grep -i app.py | grep "${BASE_DIR}" | grep "$PYTHON_CMD" | grep -v grep | awk '{print $1}' | grep -E '^[0-9]+$' | head -1
}
# Check if service is running

447
scripts/run.ps1 Normal file
View File

@@ -0,0 +1,447 @@
#Requires -Version 5.1
<#
.SYNOPSIS
CowAgent installer & management script for Windows.
.DESCRIPTION
One-liner install:
irm https://raw.githubusercontent.com/zhayujie/chatgpt-on-wechat/master/scripts/run.ps1 | iex
Or from a local clone:
.\scripts\run.ps1 # install / configure
.\scripts\run.ps1 start # start service (delegates to cow CLI)
.\scripts\run.ps1 stop|restart|status|logs|config|update|help
#>
param(
[Parameter(Position = 0)]
[string]$Command = ""
)
$ErrorActionPreference = "Stop"
# ── colours ──────────────────────────────────────────────────────
function Write-Cow { param([string]$M) Write-Host $M -ForegroundColor Green }
function Write-Warn { param([string]$M) Write-Host $M -ForegroundColor Yellow }
function Write-Err { param([string]$M) Write-Host $M -ForegroundColor Red }
function Write-Info { param([string]$M) Write-Host $M -ForegroundColor Cyan }
# ── detect project directory ─────────────────────────────────────
$ScriptDir = if ($PSScriptRoot) { $PSScriptRoot } else { $PWD.Path }
$BaseDir = Split-Path $ScriptDir -Parent
$IsProjectDir = (Test-Path "$BaseDir\app.py") -and (Test-Path "$BaseDir\config-template.json")
if (-not $IsProjectDir) {
$BaseDir = $PWD.Path
$IsProjectDir = (Test-Path "$BaseDir\app.py") -and (Test-Path "$BaseDir\config-template.json")
}
# ── Python detection ─────────────────────────────────────────────
function Find-Python {
foreach ($cmd in @("python3", "python")) {
$bin = Get-Command $cmd -ErrorAction SilentlyContinue
if (-not $bin) { continue }
try {
$ver = & $bin.Source -c "import sys; v=sys.version_info; print(f'{v.major}.{v.minor}')" 2>$null
$parts = $ver -split '\.'
$major = [int]$parts[0]; $minor = [int]$parts[1]
if ($major -eq 3 -and $minor -ge 9 -and $minor -le 13) {
return $bin.Source
}
} catch {}
}
return $null
}
$PythonCmd = Find-Python
function Assert-Python {
if (-not $PythonCmd) {
Write-Err "Python 3.9-3.13 not found. Please install from https://www.python.org/downloads/"
exit 1
}
Write-Cow "Found Python: $PythonCmd"
}
# ── clone project ────────────────────────────────────────────────
function Install-Project {
if (Test-Path "chatgpt-on-wechat") {
Write-Warn "Directory 'chatgpt-on-wechat' already exists."
$choice = Read-Host "Overwrite(o), backup(b), or quit(q)? [default: b]"
if (-not $choice) { $choice = "b" }
switch ($choice.ToLower()) {
"o" { Remove-Item -Recurse -Force "chatgpt-on-wechat" }
"b" {
$backup = "chatgpt-on-wechat_backup_$(Get-Date -Format 'yyyyMMddHHmmss')"
Rename-Item "chatgpt-on-wechat" $backup
Write-Cow "Backed up to '$backup'"
}
"q" { Write-Err "Installation cancelled."; exit 1 }
default { Write-Err "Invalid choice."; exit 1 }
}
}
$gitBin = Get-Command git -ErrorAction SilentlyContinue
if (-not $gitBin) {
Write-Err "Git not found. Please install from https://git-scm.com/download/win"
exit 1
}
Write-Cow "Cloning CowAgent project..."
git clone https://github.com/zhayujie/chatgpt-on-wechat.git 2>$null
if ($LASTEXITCODE -ne 0) {
Write-Warn "GitHub failed, trying Gitee..."
git clone https://gitee.com/zhayujie/chatgpt-on-wechat.git
if ($LASTEXITCODE -ne 0) {
Write-Err "Clone failed. Check your network."
exit 1
}
}
Set-Location "chatgpt-on-wechat"
$script:BaseDir = $PWD.Path
$script:IsProjectDir = $true
Write-Cow "Project cloned: $BaseDir"
}
# ── install dependencies ─────────────────────────────────────────
function Install-Dependencies {
Write-Cow "Installing dependencies..."
& $PythonCmd -m pip install --upgrade pip setuptools wheel 2>$null | Out-Null
& $PythonCmd -m pip install -r "$BaseDir\requirements.txt" 2>&1 | ForEach-Object { Write-Host $_ }
if ($LASTEXITCODE -ne 0) {
Write-Warn "Some dependencies may have issues, but continuing..."
}
Write-Cow "Registering cow CLI..."
& $PythonCmd -m pip install -e $BaseDir 2>$null | Out-Null
$cowBin = Get-Command cow -ErrorAction SilentlyContinue
if ($cowBin) {
Write-Cow "cow CLI registered."
} else {
Write-Warn "cow CLI not in PATH. You can use: $PythonCmd -m cli.cli"
}
}
# ── model selection ──────────────────────────────────────────────
$ModelChoices = @{
"1" = @{ Provider = "MiniMax"; Default = "MiniMax-M2.7"; Key = "MINIMAX_KEY" }
"2" = @{ Provider = "Zhipu AI"; Default = "glm-5-turbo"; Key = "ZHIPU_KEY" }
"3" = @{ Provider = "Kimi (Moonshot)"; Default = "kimi-k2.5"; Key = "MOONSHOT_KEY" }
"4" = @{ Provider = "Doubao (Volcengine Ark)"; Default = "doubao-seed-2-0-code-preview-260215"; Key = "ARK_KEY" }
"5" = @{ Provider = "Qwen (DashScope)"; Default = "qwen3.5-plus"; Key = "DASHSCOPE_KEY" }
"6" = @{ Provider = "Claude"; Default = "claude-sonnet-4-6"; Key = "CLAUDE_KEY"; Base = "https://api.anthropic.com/v1" }
"7" = @{ Provider = "Gemini"; Default = "gemini-3.1-pro-preview"; Key = "GEMINI_KEY"; Base = "https://generativelanguage.googleapis.com" }
"8" = @{ Provider = "OpenAI GPT"; Default = "gpt-5.4"; Key = "OPENAI_KEY"; Base = "https://api.openai.com/v1" }
"9" = @{ Provider = "LinkAI"; Default = "MiniMax-M2.7"; Key = "LINKAI_KEY" }
}
function Select-Model {
Write-Info "========================================="
Write-Info " Select AI Model"
Write-Info "========================================="
Write-Host "1) MiniMax (MiniMax-M2.7, MiniMax-M2.5, etc.)"
Write-Host "2) Zhipu AI (glm-5-turbo, glm-5, etc.)"
Write-Host "3) Kimi (kimi-k2.5, kimi-k2, etc.)"
Write-Host "4) Doubao (doubao-seed-2-0-code-preview-260215, etc.)"
Write-Host "5) Qwen (qwen3.5-plus, qwen3-max, qwq-plus, etc.)"
Write-Host "6) Claude (claude-sonnet-4-6, claude-opus-4-6, etc.)"
Write-Host "7) Gemini (gemini-3.1-flash-lite-preview, gemini-3.1-pro-preview, etc.)"
Write-Host "8) OpenAI GPT (gpt-5.4, gpt-5.2, gpt-4.1, etc.)"
Write-Host "9) LinkAI (access multiple models via one API)"
Write-Host ""
do {
$choice = Read-Host "Enter your choice [default: 1 - MiniMax]"
if (-not $choice) { $choice = "1" }
} while ($choice -notmatch '^[1-9]$')
$m = $ModelChoices[$choice]
Write-Cow "Configuring $($m.Provider)..."
$script:ApiKey = Read-Host "Enter $($m.Provider) API Key"
$model = Read-Host "Enter model name [default: $($m.Default)]"
if (-not $model) { $model = $m.Default }
$script:ModelName = $model
$script:KeyName = $m.Key
$script:UseLinkai = ($choice -eq "9")
if ($m.Base) {
$base = Read-Host "Enter API Base URL [default: $($m.Base)]"
if (-not $base) { $base = $m.Base }
$script:ApiBase = $base
} else {
$script:ApiBase = ""
}
$script:ModelChoice = $choice
}
# ── channel selection ────────────────────────────────────────────
function Select-Channel {
Write-Host ""
Write-Info "========================================="
Write-Info " Select Communication Channel"
Write-Info "========================================="
Write-Host "1) Weixin"
Write-Host "2) Feishu"
Write-Host "3) DingTalk"
Write-Host "4) WeCom Bot"
Write-Host "5) QQ"
Write-Host "6) WeCom App"
Write-Host "7) Web"
Write-Host ""
do {
$choice = Read-Host "Enter your choice [default: 1 - Weixin]"
if (-not $choice) { $choice = "1" }
} while ($choice -notmatch '^[1-7]$')
$script:ChannelExtra = @{}
switch ($choice) {
"1" { $script:ChannelType = "weixin" }
"2" {
$script:ChannelType = "feishu"
$script:ChannelExtra["feishu_app_id"] = Read-Host "Enter Feishu App ID"
$script:ChannelExtra["feishu_app_secret"] = Read-Host "Enter Feishu App Secret"
}
"3" {
$script:ChannelType = "dingtalk"
$script:ChannelExtra["dingtalk_client_id"] = Read-Host "Enter DingTalk Client ID"
$script:ChannelExtra["dingtalk_client_secret"] = Read-Host "Enter DingTalk Client Secret"
}
"4" {
$script:ChannelType = "wecom_bot"
$script:ChannelExtra["wecom_bot_id"] = Read-Host "Enter WeCom Bot ID"
$script:ChannelExtra["wecom_bot_secret"] = Read-Host "Enter WeCom Bot Secret"
}
"5" {
$script:ChannelType = "qq"
$script:ChannelExtra["qq_app_id"] = Read-Host "Enter QQ App ID"
$script:ChannelExtra["qq_app_secret"] = Read-Host "Enter QQ App Secret"
}
"6" {
$script:ChannelType = "wechatcom_app"
$script:ChannelExtra["wechatcom_corp_id"] = Read-Host "Enter WeChat Corp ID"
$script:ChannelExtra["wechatcomapp_token"] = Read-Host "Enter WeChat Com App Token"
$script:ChannelExtra["wechatcomapp_secret"] = Read-Host "Enter WeChat Com App Secret"
$script:ChannelExtra["wechatcomapp_agent_id"] = Read-Host "Enter WeChat Com App Agent ID"
$script:ChannelExtra["wechatcomapp_aes_key"] = Read-Host "Enter WeChat Com App AES Key"
$port = Read-Host "Enter port [default: 9898]"
if (-not $port) { $port = "9898" }
$script:ChannelExtra["wechatcomapp_port"] = [int]$port
}
"7" {
$script:ChannelType = "web"
$port = Read-Host "Enter web port [default: 9899]"
if (-not $port) { $port = "9899" }
$script:ChannelExtra["web_port"] = [int]$port
}
}
}
# ── generate config.json ─────────────────────────────────────────
function New-ConfigFile {
Write-Cow "Generating config.json..."
$config = [ordered]@{
channel_type = $ChannelType
model = $ModelName
open_ai_api_key = ""
open_ai_api_base = "https://api.openai.com/v1"
claude_api_key = ""
claude_api_base = "https://api.anthropic.com/v1"
gemini_api_key = ""
gemini_api_base = "https://generativelanguage.googleapis.com"
zhipu_ai_api_key = ""
moonshot_api_key = ""
ark_api_key = ""
dashscope_api_key = ""
minimax_api_key = ""
voice_to_text = "openai"
text_to_voice = "openai"
voice_reply_voice = $false
speech_recognition = $true
group_speech_recognition = $false
use_linkai = $UseLinkai
linkai_api_key = ""
linkai_app_code = ""
agent = $true
agent_max_context_tokens = 40000
agent_max_context_turns = 30
agent_max_steps = 15
}
# Set the correct API key field
$keyMap = @{
OPENAI_KEY = "open_ai_api_key"
CLAUDE_KEY = "claude_api_key"
GEMINI_KEY = "gemini_api_key"
ZHIPU_KEY = "zhipu_ai_api_key"
MOONSHOT_KEY = "moonshot_api_key"
ARK_KEY = "ark_api_key"
DASHSCOPE_KEY = "dashscope_api_key"
MINIMAX_KEY = "minimax_api_key"
LINKAI_KEY = "linkai_api_key"
}
if ($keyMap.ContainsKey($KeyName)) {
$config[$keyMap[$KeyName]] = $ApiKey
}
# Set API base if provided
$baseMap = @{
"6" = "claude_api_base"
"7" = "gemini_api_base"
"8" = "open_ai_api_base"
}
if ($ApiBase -and $baseMap.ContainsKey($ModelChoice)) {
$config[$baseMap[$ModelChoice]] = $ApiBase
}
# Merge channel-specific fields
foreach ($k in $ChannelExtra.Keys) {
$config[$k] = $ChannelExtra[$k]
}
$config | ConvertTo-Json -Depth 5 | Set-Content -Path "$BaseDir\config.json" -Encoding UTF8
Write-Cow "Configuration file created."
}
# ── start via cow CLI ─────────────────────────────────────────────
function Start-CowAgent {
Write-Cow "Starting CowAgent..."
$cowBin = Get-Command cow -ErrorAction SilentlyContinue
if ($cowBin) {
& cow start
} else {
Write-Warn "cow CLI not found, starting directly..."
& $PythonCmd "$BaseDir\app.py"
}
}
# ── delegate management commands to cow CLI ──────────────────────
function Invoke-CowCommand {
param([string]$Cmd)
$cowBin = Get-Command cow -ErrorAction SilentlyContinue
if ($cowBin) {
& cow $Cmd
} else {
Write-Err "cow CLI not found. Run this script without arguments first to install."
exit 1
}
}
# ── usage ─────────────────────────────────────────────────────────
function Show-Usage {
Write-Info "========================================="
Write-Info " CowAgent Management Script (Windows)"
Write-Info "========================================="
Write-Host ""
Write-Host "Usage:"
Write-Host " .\run.ps1 # Install / Configure"
Write-Host " .\run.ps1 <command> # Management command"
Write-Host ""
Write-Host "Commands:"
Write-Host " start Start the service"
Write-Host " stop Stop the service"
Write-Host " restart Restart the service"
Write-Host " status Check service status"
Write-Host " logs View logs"
Write-Host " config Reconfigure project"
Write-Host " update Update and restart"
Write-Host " help Show this message"
Write-Host ""
}
# ── install mode ──────────────────────────────────────────────────
function Install-Mode {
Clear-Host
Write-Info "========================================="
Write-Info " CowAgent Installation (Windows)"
Write-Info "========================================="
Write-Host ""
if ($IsProjectDir) {
Write-Cow "Detected existing project directory."
if (Test-Path "$BaseDir\config.json") {
Write-Cow "Project already configured."
Write-Host ""
Show-Usage
return
}
Write-Warn "No config.json found. Let's configure your project!"
Write-Host ""
Assert-Python
} else {
Assert-Python
Install-Project
}
Install-Dependencies
Select-Model
Select-Channel
New-ConfigFile
Write-Host ""
$startNow = Read-Host "Start CowAgent now? [Y/n]"
if ($startNow -ne "n" -and $startNow -ne "N") {
Start-CowAgent
} else {
Write-Cow "Installation complete!"
Write-Host ""
Write-Host "To start manually:"
Write-Host " cd $BaseDir"
Write-Host " cow start"
}
}
# ── update ────────────────────────────────────────────────────────
function Update-Project {
Write-Cow "Updating CowAgent..."
Set-Location $BaseDir
# Stop if running
$cowBin = Get-Command cow -ErrorAction SilentlyContinue
if ($cowBin) { & cow stop 2>$null }
if (Test-Path "$BaseDir\.git") {
Write-Cow "Pulling latest code..."
git pull 2>$null
if ($LASTEXITCODE -ne 0) {
Write-Warn "GitHub failed, trying Gitee..."
git remote set-url origin https://gitee.com/zhayujie/chatgpt-on-wechat.git
git pull
}
} else {
Write-Warn "Not a git repository, skipping code update."
}
Assert-Python
Install-Dependencies
Start-CowAgent
}
# ── main ──────────────────────────────────────────────────────────
switch ($Command.ToLower()) {
"" { Install-Mode }
"start" { Invoke-CowCommand "start" }
"stop" { Invoke-CowCommand "stop" }
"restart" { Invoke-CowCommand "restart" }
"status" { Invoke-CowCommand "status" }
"logs" { Invoke-CowCommand "logs" }
"config" {
Assert-Python
Install-Dependencies
Select-Model
Select-Channel
New-ConfigFile
$r = Read-Host "Restart service now? [Y/n]"
if ($r -ne "n" -and $r -ne "N") { Invoke-CowCommand "restart" }
}
"update" { Update-Project }
"help" { Show-Usage }
default {
Write-Err "Unknown command: $Command"
Show-Usage
exit 1
}
}

View File

@@ -4,6 +4,7 @@ description: Call LinkAI applications and workflows. Use bash with curl to invok
homepage: https://link-ai.tech
metadata:
emoji: 🤖
default_enabled: false
requires:
bins: ["curl"]
env: ["LINKAI_API_KEY"]