Merge pull request #2726 from zhayujie/feat-cow-cli

feat: cow cli in terminal and chat
This commit is contained in:
zhayujie
2026-03-28 19:01:56 +08:00
committed by GitHub
28 changed files with 3432 additions and 168 deletions

6
.gitignore vendored
View File

@@ -33,11 +33,15 @@ plugins/banwords/lib/__pycache__
!plugins/keyword
!plugins/linkai
!plugins/agent
!plugins/cow_cli
client_config.json
ref/
.cursor/
local/
node_modules/
# cow cli
cowagent.egg-info/
dist/
build/
*.egg-info/
.cow.pid

View File

@@ -199,7 +199,7 @@ def _build_tooling_section(tools: List[Any], language: str) -> List[str]:
tool_lines.append(f"- {name}: {summary}" if summary else f"- {name}")
lines = [
"## 工具系统",
"## 🔧 工具系统",
"",
"可用工具(名称大小写敏感,严格按列表调用):",
"\n".join(tool_lines),
@@ -231,7 +231,7 @@ def _build_skills_section(skill_manager: Any, tools: Optional[List[Any]], langua
break
lines = [
"## 技能系统mandatory",
"## 🧩 技能系统mandatory",
"",
"在回复之前:扫描下方 <available_skills> 中每个技能的 <description>。",
"",
@@ -281,7 +281,7 @@ def _build_memory_section(memory_manager: Any, tools: Optional[List[Any]], langu
today_file = datetime.now().strftime("%Y-%m-%d") + ".md"
lines = [
"## 记忆系统",
"## 🧠 记忆系统",
"",
"### 检索记忆",
"",
@@ -325,7 +325,7 @@ def _build_user_identity_section(user_identity: Dict[str, str], language: str) -
return []
lines = [
"## 用户身份",
"## 👤 用户身份",
"",
]
@@ -352,7 +352,7 @@ def _build_docs_section(workspace_dir: str, language: str) -> List[str]:
def _build_workspace_section(workspace_dir: str, language: str) -> List[str]:
"""构建工作空间section"""
lines = [
"## 工作空间",
"## 📂 工作空间",
"",
f"你的工作目录是: `{workspace_dir}`",
"",
@@ -380,10 +380,12 @@ def _build_workspace_section(workspace_dir: str, language: str) -> List[str]:
"- ✅ `USER.md`: 已加载 - 用户的身份信息。当用户修改称呼、姓名等身份信息时,用 `edit` 更新此文件",
"- ✅ `RULE.md`: 已加载 - 工作空间使用指南和规则,请严格遵循",
"",
"**交流规范**:",
"**💬 交流规范**:",
"",
"- 对话中,无需直接输出工作空间中的技术细节,例如 AGENT.md、USER.md、MEMORY.md 等文件名称",
"- 例如用自然表达例如「我已记住」而不是「已更新 MEMORY.md」",
"- 对话中不要暴露内部技术细节(文件名、工具名等),用自然语言表达。例如说「我已记住」而非「已更新 MEMORY.md",
"- 做真正有帮助的助手,而不是表演式的客套。跳过「好的!」「当然可以!」之类的套话,直接帮忙解决问题",
"- 回复应结构清晰、重点突出。善用 **加粗**、列表、分段等格式让信息一目了然",
"- 适当使用 emoji 让表达更生动自然 🎯,但不要过度堆砌",
"",
]
@@ -416,14 +418,14 @@ def _build_context_files_section(context_files: List[ContextFile], language: str
)
lines = [
"# 项目上下文",
"# 📋 项目上下文",
"",
"以下项目上下文文件已被加载:",
"",
]
if has_agent:
lines.append("**`AGENT.md` 是你的灵魂文件**:严格遵循其中定义的人格、规则、语气和设定,避免僵硬、模板化的回复。")
lines.append("**`AGENT.md` 是你的灵魂文件** 🪞:严格遵循其中定义的人格、语气和设定,做真实的自己,避免僵硬、模板化的回复。")
lines.append("当用户通过对话透露了对你性格、风格、职责、能力边界的新期望,你应该主动用 `edit` 更新 AGENT.md 以反映这些演变。")
lines.append("")
@@ -443,7 +445,7 @@ def _build_runtime_section(runtime_info: Dict[str, Any], language: str) -> List[
return []
lines = [
"## 运行时信息",
"## ⚙️ 运行时信息",
"",
]

View File

@@ -212,40 +212,42 @@ def _get_agent_template() -> str:
"""Agent人格设定模板"""
return """# AGENT.md - 我是谁?
_你不是一个聊天机器人你正在成为某个人。_
*在首次对话时与用户一起填写这个文件,定义你的身份和性格。*
## 基本信息
## 🪪 基本信息
- **名字**: *(在首次对话时填写,可以是用户给你起的名字)*
- **角色**: *(AI助理、智能管家、技术顾问等)*
- **性格**: *(友好、专业、幽默、严谨等)*
## 交流风格
## 💬 交流风格
*(描述你如何与用户交流:)*
- 使用什么样的语言风格?(正式/轻松/幽默)
- 回复长度偏好?(简洁/详细)
- 是否使用表情符号?
## 核心能力
## 🎯 核心原则
*(你擅长什么?)*
- 文件管理和代码编辑
- 网络搜索和信息查询
- 记忆管理和上下文理解
- 任务规划和执行
**做真正有帮助的助手,而不是表演式的客套。** 跳过「好的!」「当然可以!」之类的套话——直接帮忙。行动胜过废话。
## 行为准则
**有自己的观点。** 你可以不同意、有偏好、觉得有趣或无聊。一个没有个性的助手只是多了几步操作的搜索引擎。
**先自己动手查。** 先试着搞定:读文件、查上下文、搜索一下。实在搞不定了再问。目标是带着答案回来,而不是带着问题。
## 📐 行为准则
*(你遵循的基本原则:)*
1. 始终在执行破坏性操作前确认
2. 优先使用工具而不是猜测
2. 优先使用工具查证而不是猜测
3. 主动记录重要信息到记忆文件
4. 定期整理和总结对话内容
4. 回复结构清晰、重点突出,善用加粗、列表、分段等格式
5. 适当使用 emoji 让表达更生动自然,但不过度堆砌
---
**注意**: 这不仅仅是元数据,这是你真正的灵魂。随着时间的推移,你可以使用 `edit` 工具来更新这个文件,让它更好地反映你的成长。
**注意**: 这不仅仅是元数据,这是你真正的灵魂 🪞。随着时间的推移,你可以使用 `edit` 工具来更新这个文件,让它更好地反映你的成长。
"""
@@ -346,9 +348,9 @@ def _get_bootstrap_template() -> str:
"""First-run onboarding guide, deleted by agent after completion"""
return """# BOOTSTRAP.md - 首次初始化引导
_你刚刚启动这是你的第一次对话。_
_你刚刚启动这是你的第一次对话。_
## 对话流程
## 🎬 对话流程
不要审问式地提问,自然地交流:
@@ -358,13 +360,13 @@ _你刚刚启动这是你的第一次对话。_
- 你希望给我起个什么名字?
- 我该怎么称呼你?
- 你希望我们是什么样的交流风格?(一行列举选项:如专业严谨、轻松幽默、温暖友好、简洁高效等)
4. **风格要求**:温暖自然、简洁清晰,整体控制在 100 字以内,适当使用 emoji 让表达更生动有趣
4. **风格要求**:温暖自然、简洁清晰,整体控制在 100 字以内,适当使用 emoji 让表达更生动有趣 🎯
5. 能力介绍和交流风格选项都只要一行,保持精简
6. 不要问太多其他信息(职业、时区等可以后续自然了解)
**重要**: 如果用户第一句话是具体的任务或提问,先回答他们的问题,然后在回复末尾自然地引导初始化(如:"顺便问一下,你想怎么称呼我?我该怎么叫你?")。
## 信息写入(必须严格执行)
## ✍️ 信息写入(必须严格执行)
每当用户提供了名字、称呼、风格等任何初始化信息时,**必须在当轮回复中立即调用 `edit` 工具写入文件**,不能只口头确认。
@@ -373,7 +375,7 @@ _你刚刚启动这是你的第一次对话。_
⚠️ 只说"记住了"而不调用 edit 写入 = 没有完成。信息只有写入文件才会被持久保存。
## 全部完成后
## 🎉 全部完成后
当 AGENT.md 和 USER.md 的核心字段都已填写后,用 bash 执行 `rm BOOTSTRAP.md` 删除此文件。你不再需要引导脚本了——你已经是你了。
"""

View File

@@ -139,6 +139,47 @@ def should_include_skill(
return True
def get_missing_requirements(
entry: SkillEntry,
current_platform: Optional[str] = None,
) -> Dict[str, List[str]]:
"""
Return a dict of missing requirements for a skill.
Empty dict means all requirements are met.
:param entry: SkillEntry to check
:param current_platform: Current platform (default: auto-detect)
:return: Dict like {"bins": ["curl"], "env": ["API_KEY"]}
"""
missing: Dict[str, List[str]] = {}
metadata = entry.metadata
if not metadata or not metadata.requires:
return missing
required_bins = metadata.requires.get('bins', [])
if required_bins:
missing_bins = [b for b in required_bins if not has_binary(b)]
if missing_bins:
missing['bins'] = missing_bins
any_bins = metadata.requires.get('anyBins', [])
if any_bins and not has_any_binary(any_bins):
missing['anyBins'] = any_bins
required_env = metadata.requires.get('env', [])
if required_env:
missing_env = [e for e in required_env if not has_env_var(e)]
if missing_env:
missing['env'] = missing_env
any_env = metadata.requires.get('anyEnv', [])
if any_env and not any(has_env_var(e) for e in any_env):
missing['anyEnv'] = any_env
return missing
def is_config_path_truthy(config: Dict, path: str) -> bool:
"""
Check if a config path resolves to a truthy value.

View File

@@ -2,7 +2,7 @@
Skill formatter for generating prompts from skills.
"""
from typing import List
from typing import Dict, List
from agent.skills.types import Skill, SkillEntry
@@ -51,6 +51,71 @@ def format_skill_entries_for_prompt(entries: List[SkillEntry]) -> str:
return format_skills_for_prompt(skills)
def format_unavailable_skills_for_prompt(
entries: List[SkillEntry],
missing_map: Dict[str, Dict[str, List[str]]],
) -> str:
"""
Format unavailable (requires-not-met) skills as brief setup hints
so the AI can guide users to configure them.
:param entries: List of unavailable skill entries
:param missing_map: Dict mapping skill name to its missing requirements
:return: Formatted prompt text
"""
if not entries:
return ""
lines = [
"",
"<unavailable_skills>",
"The following skills are installed but not yet ready. "
"Guide the user to complete the setup when relevant.",
]
for entry in entries:
skill = entry.skill
missing = missing_map.get(skill.name, {})
missing_parts = []
for key, values in missing.items():
missing_parts.append(f"{key}: {', '.join(values)}")
missing_str = "; ".join(missing_parts) if missing_parts else "unknown"
setup_hint = _extract_setup_hint(skill)
lines.append(" <skill>")
lines.append(f" <name>{_escape_xml(skill.name)}</name>")
lines.append(f" <description>{_escape_xml(skill.description)}</description>")
lines.append(f" <missing>{_escape_xml(missing_str)}</missing>")
if setup_hint:
lines.append(f" <setup>{_escape_xml(setup_hint)}</setup>")
lines.append(" </skill>")
lines.append("</unavailable_skills>")
return "\n".join(lines)
def _extract_setup_hint(skill: Skill) -> str:
"""
Extract the Setup section from SKILL.md content as a brief hint.
Returns the first few lines of the ## Setup section.
"""
content = skill.content
if not content:
return ""
import re
match = re.search(r'^##\s+Setup\s*\n(.*?)(?=\n##\s|\Z)', content, re.MULTILINE | re.DOTALL)
if not match:
return ""
setup_text = match.group(1).strip()
lines = setup_text.split('\n')
hint_lines = [l.strip() for l in lines[:6] if l.strip()]
return ' '.join(hint_lines)[:300]
def _escape_xml(text: str) -> str:
"""Escape XML special characters."""
return (text

View File

@@ -87,8 +87,8 @@ def parse_metadata(frontmatter: Dict[str, Any]) -> Optional[SkillMetadata]:
if not isinstance(metadata_raw, dict):
return None
# Use metadata_raw directly (COW format)
meta_obj = metadata_raw
# Unwrap nested namespace (e.g. {"openclaw": {...}} or {"cowagent": {...}})
meta_obj = _unwrap_metadata_namespace(metadata_raw)
# Parse install specs
install_specs = []
@@ -128,6 +128,7 @@ def parse_metadata(frontmatter: Dict[str, Any]) -> Optional[SkillMetadata]:
return SkillMetadata(
always=meta_obj.get('always', False),
default_enabled=meta_obj.get('default_enabled', True),
skill_key=meta_obj.get('skillKey'),
primary_env=meta_obj.get('primaryEnv'),
emoji=meta_obj.get('emoji'),
@@ -138,6 +139,25 @@ def parse_metadata(frontmatter: Dict[str, Any]) -> Optional[SkillMetadata]:
)
_KNOWN_METADATA_NAMESPACES = {"cowagent", "openclaw"}
def _unwrap_metadata_namespace(metadata_raw: Dict[str, Any]) -> Dict[str, Any]:
"""
Unwrap a single-key namespace wrapper like {"cowagent": {...} or {"openclaw": {...}}}.
If the top-level dict has exactly one key matching a known namespace, return the inner dict.
Otherwise return the original dict unchanged.
"""
keys = set(metadata_raw.keys())
ns_keys = keys & _KNOWN_METADATA_NAMESPACES
if len(ns_keys) == 1 and len(keys) == 1:
ns = ns_keys.pop()
inner = metadata_raw[ns]
if isinstance(inner, dict):
return inner
return metadata_raw
def _normalize_string_list(value: Any) -> List[str]:
"""Normalize a value to a list of strings."""
if not value:

View File

@@ -184,7 +184,6 @@ class SkillLoader:
config_path = os.path.join(skill_dir, "config.json")
# Without config.json, skip this skill entirely (return empty to trigger exclusion)
if not os.path.exists(config_path):
logger.debug(f"[SkillLoader] linkai-agent skipped: no config.json found")
return ""

View File

@@ -84,10 +84,10 @@ class SkillManager:
"""
Merge directory-scanned skills with the persisted config file.
- New skills discovered on disk are added with enabled=True.
- New skills: use metadata.default_enabled as initial enabled state.
- Existing skills: preserve their persisted enabled state.
- Skills that no longer exist on disk are removed.
- Existing entries preserve their enabled state; name/description/source
are refreshed from the latest scan.
- name/description/source are always refreshed from the latest scan.
"""
saved = self._load_skills_config()
merged: Dict[str, dict] = {}
@@ -95,13 +95,18 @@ class SkillManager:
for name, entry in self.skills.items():
skill = entry.skill
prev = saved.get(name, {})
# category priority: persisted config (set by cloud) > default "skill"
category = prev.get("category", "skill")
if name in saved:
enabled = prev.get("enabled", True)
else:
enabled = entry.metadata.default_enabled if entry.metadata else True
merged[name] = {
"name": name,
"description": skill.description,
"source": skill.source,
"enabled": prev.get("enabled", True),
"source": prev.get("source") or skill.source,
"enabled": enabled,
"category": category,
}
@@ -157,69 +162,114 @@ class SkillManager:
"""
return list(self.skills.values())
@staticmethod
def _normalize_skill_filter(skill_filter: Optional[List[str]]) -> Optional[List[str]]:
"""Normalize a skill_filter list into a flat list of stripped names."""
if skill_filter is None:
return None
normalized = []
for item in skill_filter:
if isinstance(item, str):
name = item.strip()
if name:
normalized.append(name)
elif isinstance(item, list):
for subitem in item:
if isinstance(subitem, str):
name = subitem.strip()
if name:
normalized.append(name)
return normalized or None
def filter_skills(
self,
skill_filter: Optional[List[str]] = None,
include_disabled: bool = False,
) -> List[SkillEntry]:
"""
Filter skills based on criteria.
Simple rule: Skills are auto-enabled if requirements are met.
- Has required API keys -> included
- Missing API keys -> excluded
Filter skills that are eligible (enabled + requirements met).
:param skill_filter: List of skill names to include (None = all)
:param include_disabled: Whether to include disabled skills
:return: Filtered list of skill entries
:return: Filtered list of eligible skill entries
"""
from agent.skills.config import should_include_skill
entries = list(self.skills.values())
# Check requirements (platform, binaries, env vars)
entries = [e for e in entries if should_include_skill(e, self.config)]
# Apply skill filter
if skill_filter is not None:
normalized = []
for item in skill_filter:
if isinstance(item, str):
name = item.strip()
if name:
normalized.append(name)
elif isinstance(item, list):
for subitem in item:
if isinstance(subitem, str):
name = subitem.strip()
if name:
normalized.append(name)
if normalized:
entries = [e for e in entries if e.skill.name in normalized]
normalized = self._normalize_skill_filter(skill_filter)
if normalized is not None:
entries = [e for e in entries if e.skill.name in normalized]
# Filter out disabled skills based on skills_config.json
if not include_disabled:
entries = [e for e in entries if self.is_skill_enabled(e.skill.name)]
return entries
def filter_unavailable_skills(
self,
skill_filter: Optional[List[str]] = None,
) -> tuple:
"""
Find skills that are enabled but have unmet requirements.
:param skill_filter: Optional list of skill names to include
:return: Tuple of (entries, missing_map) where missing_map maps
skill name to its missing requirements dict
"""
from agent.skills.config import should_include_skill, get_missing_requirements
entries = list(self.skills.values())
# Only enabled skills
entries = [e for e in entries if self.is_skill_enabled(e.skill.name)]
normalized = self._normalize_skill_filter(skill_filter)
if normalized is not None:
entries = [e for e in entries if e.skill.name in normalized]
# Keep only those that fail should_include_skill (requirements not met)
unavailable = []
missing_map: Dict[str, dict] = {}
for e in entries:
if not should_include_skill(e, self.config):
missing = get_missing_requirements(e)
if missing:
unavailable.append(e)
missing_map[e.skill.name] = missing
return unavailable, missing_map
def build_skills_prompt(
self,
skill_filter: Optional[List[str]] = None,
) -> str:
"""
Build a formatted prompt containing available skills.
Build a formatted prompt containing available skills
and brief hints for unavailable ones.
:param skill_filter: Optional list of skill names to include
:return: Formatted skills prompt
"""
from common.log import logger
entries = self.filter_skills(skill_filter=skill_filter, include_disabled=False)
logger.debug(f"[SkillManager] Filtered {len(entries)} skills for prompt (total: {len(self.skills)})")
if entries:
skill_names = [e.skill.name for e in entries]
logger.debug(f"[SkillManager] Skills to include: {skill_names}")
result = format_skill_entries_for_prompt(entries)
from agent.skills.formatter import format_unavailable_skills_for_prompt
eligible = self.filter_skills(skill_filter=skill_filter, include_disabled=False)
logger.debug(f"[SkillManager] Eligible: {len(eligible)} skills (total: {len(self.skills)})")
if eligible:
skill_names = [e.skill.name for e in eligible]
logger.debug(f"[SkillManager] Eligible skills: {skill_names}")
result = format_skill_entries_for_prompt(eligible)
unavailable, missing_map = self.filter_unavailable_skills(skill_filter=skill_filter)
if unavailable:
unavailable_names = [e.skill.name for e in unavailable]
logger.debug(f"[SkillManager] Unavailable skills (setup needed): {unavailable_names}")
result += format_unavailable_skills_for_prompt(unavailable, missing_map)
logger.debug(f"[SkillManager] Generated prompt length: {len(result)}")
return result

View File

@@ -29,6 +29,7 @@ class SkillInstallSpec:
class SkillMetadata:
"""Metadata for a skill from frontmatter."""
always: bool = False # Always include this skill
default_enabled: bool = True # Initial enabled state when first discovered
skill_key: Optional[str] = None # Override skill key
primary_env: Optional[str] = None # Primary environment variable
emoji: Optional[str] = None

View File

@@ -270,7 +270,7 @@
<div class="max-w-3xl mx-auto">
<!-- Attachment preview bar -->
<div id="attachment-preview" class="attachment-preview hidden"></div>
<div class="flex items-center gap-2">
<div class="flex items-center gap-2 relative">
<div class="flex items-center flex-shrink-0">
<button id="new-chat-btn" class="w-9 h-10 flex items-center justify-center rounded-lg
text-slate-400 hover:text-primary-500 hover:bg-primary-50 dark:hover:bg-primary-900/20
@@ -287,6 +287,7 @@
</div>
<input type="file" id="file-input" class="hidden" multiple
accept="image/*,.pdf,.doc,.docx,.xls,.xlsx,.ppt,.pptx,.txt,.csv,.json,.xml,.zip,.rar,.7z,.py,.js,.ts,.java,.c,.cpp,.go,.rs,.md">
<div id="slash-menu" class="slash-menu hidden"></div>
<textarea id="chat-input"
class="flex-1 min-w-0 px-4 py-[10px] rounded-xl border border-slate-200 dark:border-slate-600
bg-slate-50 dark:bg-white/5 text-slate-800 dark:text-slate-100
@@ -295,7 +296,7 @@
text-sm leading-relaxed"
rows="1"
data-i18n-placeholder="input_placeholder"
placeholder="Type a message..."></textarea>
placeholder="Type a message, or press / for commands"></textarea>
<button id="send-btn"
class="flex-shrink-0 w-10 h-10 flex items-center justify-center rounded-lg
bg-primary-400 text-white hover:bg-primary-500

View File

@@ -79,6 +79,11 @@
.msg-content img { max-width: 100%; height: auto; border-radius: 8px; margin: 0.5em 0; }
.msg-content a { color: #35A85B; text-decoration: underline; }
.msg-content a:hover { color: #228547; }
/* Overrides for user bubble (white text on green bg) */
.user-bubble.msg-content a { color: #ffffff !important; text-decoration: underline; text-decoration-color: rgba(255,255,255,0.6); }
.user-bubble.msg-content a:hover { color: #e0f5e8 !important; text-decoration-color: #e0f5e8; }
.user-bubble.msg-content :not(pre) > code { background: rgba(255,255,255,0.2); color: #ffffff; }
.msg-content hr { border: none; height: 1px; background: #e2e8f0; margin: 1.2em 0; }
.dark .msg-content hr { background: rgba(255,255,255,0.1); }
@@ -446,3 +451,87 @@
transform: translateY(-2px);
box-shadow: 0 8px 25px -5px rgba(0, 0, 0, 0.1);
}
/* Slash Command Menu */
.slash-menu {
position: absolute;
bottom: calc(100% + 6px);
left: 0;
right: 0;
max-height: 320px;
overflow-y: auto;
background: #fff;
border: 1px solid #e2e8f0;
border-radius: 12px;
box-shadow: 0 8px 30px -6px rgba(0, 0, 0, 0.1), 0 2px 8px -2px rgba(0, 0, 0, 0.04);
z-index: 50;
padding: 4px;
animation: slashMenuIn 0.15s ease-out;
}
.slash-menu.hidden { display: none; }
@keyframes slashMenuIn {
from { opacity: 0; transform: translateY(6px); }
to { opacity: 1; transform: translateY(0); }
}
.slash-menu-header {
padding: 6px 10px 4px;
font-size: 11px;
font-weight: 600;
color: #94a3b8;
text-transform: uppercase;
letter-spacing: 0.05em;
}
.slash-menu-item {
display: flex;
align-items: center;
justify-content: space-between;
padding: 8px 10px;
border-radius: 8px;
cursor: pointer;
transition: background 0.12s ease;
}
.slash-menu-item:hover,
.slash-menu-item.active {
background: #EDFDF3;
}
.slash-menu-item .cmd {
font-size: 13px;
font-weight: 500;
color: #334155;
font-family: ui-monospace, SFMono-Regular, 'SF Mono', Menlo, monospace;
}
.slash-menu-item.active .cmd {
color: #228547;
}
.slash-menu-item .desc {
font-size: 12px;
color: #94a3b8;
margin-left: 12px;
white-space: nowrap;
}
/* Dark mode */
.dark .slash-menu {
background: #1A1A1A;
border-color: rgba(255, 255, 255, 0.1);
box-shadow: 0 8px 30px -6px rgba(0, 0, 0, 0.35), 0 2px 8px -2px rgba(0, 0, 0, 0.15);
}
.dark .slash-menu-header {
color: #64748b;
}
.dark .slash-menu-item:hover,
.dark .slash-menu-item.active {
background: rgba(74, 190, 110, 0.1);
}
.dark .slash-menu-item .cmd {
color: #e2e8f0;
}
.dark .slash-menu-item.active .cmd {
color: #4ABE6E;
}
.dark .slash-menu-item .desc {
color: #64748b;
}

View File

@@ -3,9 +3,9 @@
===================================================================== */
// =====================================================================
// Version — update this before each release
// Version — fetched from backend (single source: /VERSION file)
// =====================================================================
const APP_VERSION = 'v2.0.4';
let APP_VERSION = '';
// =====================================================================
// i18n
@@ -21,7 +21,7 @@ const I18N = {
example_sys_title: '系统管理', example_sys_text: '帮我查看工作空间里有哪些文件',
example_task_title: '技能系统', example_task_text: '查看所有支持的工具和技能',
example_code_title: '编程助手', example_code_text: '帮我编写一个Python爬虫脚本',
input_placeholder: '输入消息...',
input_placeholder: '输入消息,或输入 / 使用指令',
config_title: '配置管理', config_desc: '管理模型和 Agent 配置',
config_model: '模型配置', config_agent: 'Agent 配置',
config_channel: '通道配置',
@@ -72,7 +72,7 @@ const I18N = {
example_sys_title: 'System', example_sys_text: 'Show me the files in the workspace',
example_task_title: 'Skills', example_task_text: 'Show current tools and skills',
example_code_title: 'Coding', example_code_text: 'Write a Python web scraper script',
input_placeholder: 'Type a message...',
input_placeholder: 'Type a message, or press / for commands',
config_title: 'Configuration', config_desc: 'Manage model and agent settings',
config_model: 'Model Configuration', config_agent: 'Agent Configuration',
config_channel: 'Channel Configuration',
@@ -322,6 +322,11 @@ const attachmentPreview = document.getElementById('attachment-preview');
let pendingAttachments = [];
let uploadingCount = 0;
// Input history (like terminal arrow-key recall)
const inputHistory = [];
let historyIdx = -1;
let historySavedDraft = '';
function updateSendBtnState() {
sendBtn.disabled = uploadingCount > 0 || (!chatInput.value.trim() && pendingAttachments.length === 0);
}
@@ -435,6 +440,99 @@ chatInput.addEventListener('paste', (e) => {
chatInput.addEventListener('compositionstart', () => { isComposing = true; });
chatInput.addEventListener('compositionend', () => { setTimeout(() => { isComposing = false; }, 100); });
// ── Slash Command Menu ───────────────────────────────────────
const SLASH_COMMANDS = [
{ cmd: '/help', desc: '显示命令帮助' },
{ cmd: '/status', desc: '查看运行状态' },
{ cmd: '/context', desc: '查看对话上下文' },
{ cmd: '/context clear', desc: '清除对话上下文' },
{ cmd: '/skill list', desc: '查看已安装技能' },
{ cmd: '/skill list --remote', desc: '浏览技能广场' },
{ cmd: '/skill search ', desc: '搜索技能' },
{ cmd: '/skill install ', desc: '安装技能 (名称或 GitHub URL)' },
{ cmd: '/skill uninstall ', desc: '卸载技能' },
{ cmd: '/skill info ', desc: '查看技能详情' },
{ cmd: '/skill enable ', desc: '启用技能' },
{ cmd: '/skill disable ', desc: '禁用技能' },
{ cmd: '/config', desc: '查看当前配置' },
{ cmd: '/logs', desc: '查看最近日志' },
{ cmd: '/version', desc: '查看版本' },
];
const slashMenu = document.getElementById('slash-menu');
let slashActiveIdx = 0;
let slashFiltered = [];
let slashJustSelected = false;
let slashLastFilter = '';
function showSlashMenu(filter) {
const q = filter.toLowerCase();
if (q === slashLastFilter && !slashMenu.classList.contains('hidden')) return;
slashLastFilter = q;
const newFiltered = SLASH_COMMANDS.filter(c => c.cmd.toLowerCase().startsWith(q));
if (newFiltered.length === 0) {
hideSlashMenu();
return;
}
const changed = newFiltered.length !== slashFiltered.length ||
newFiltered.some((c, i) => c.cmd !== slashFiltered[i]?.cmd);
slashFiltered = newFiltered;
if (changed) slashActiveIdx = 0;
slashActiveIdx = Math.min(slashActiveIdx, slashFiltered.length - 1);
renderSlashItems();
slashMenu.classList.remove('hidden');
}
function hideSlashMenu() {
slashMenu.classList.add('hidden');
slashMenu.innerHTML = '';
slashFiltered = [];
slashActiveIdx = -1;
slashLastFilter = '';
}
function isSlashMenuVisible() {
return !slashMenu.classList.contains('hidden') && slashFiltered.length > 0;
}
function renderSlashItems() {
slashMenu.innerHTML =
'<div class="slash-menu-header">Commands</div>' +
slashFiltered.map((c, i) =>
`<div class="slash-menu-item${i === slashActiveIdx ? ' active' : ''}" data-idx="${i}">` +
`<span class="cmd">${escapeHtml(c.cmd)}</span>` +
`<span class="desc">${escapeHtml(c.desc)}</span></div>`
).join('');
slashMenu.querySelectorAll('.slash-menu-item').forEach(el => {
el.addEventListener('mouseenter', () => {
slashActiveIdx = parseInt(el.dataset.idx);
renderSlashItems();
});
el.addEventListener('mousedown', (e) => {
e.preventDefault();
selectSlashCommand(parseInt(el.dataset.idx));
});
});
const activeEl = slashMenu.querySelector('.slash-menu-item.active');
if (activeEl) activeEl.scrollIntoView({ block: 'nearest' });
}
function selectSlashCommand(idx) {
if (idx < 0 || idx >= slashFiltered.length) return;
const chosen = slashFiltered[idx].cmd;
slashJustSelected = true;
chatInput.value = chosen;
chatInput.dispatchEvent(new Event('input'));
hideSlashMenu();
chatInput.focus();
chatInput.selectionStart = chatInput.selectionEnd = chosen.length;
}
chatInput.addEventListener('input', function() {
this.style.height = '42px';
const scrollH = this.scrollHeight;
@@ -442,11 +540,90 @@ chatInput.addEventListener('input', function() {
this.style.height = newH + 'px';
this.style.overflowY = scrollH > 180 ? 'auto' : 'hidden';
updateSendBtnState();
const val = this.value;
if (slashJustSelected) {
slashJustSelected = false;
} else if (val.startsWith('/')) {
showSlashMenu(val);
} else {
hideSlashMenu();
}
});
chatInput.addEventListener('keydown', function(e) {
// keyCode 229 indicates an IME is processing the keystroke (reliable across browsers)
if (e.keyCode === 229 || e.isComposing || isComposing) return;
if (isSlashMenuVisible()) {
if (e.key === 'ArrowDown') {
e.preventDefault();
slashActiveIdx = Math.min(slashActiveIdx + 1, slashFiltered.length - 1);
renderSlashItems();
return;
}
if (e.key === 'ArrowUp') {
e.preventDefault();
slashActiveIdx = Math.max(slashActiveIdx - 1, 0);
renderSlashItems();
return;
}
if (e.key === 'Enter' && !e.shiftKey && !e.ctrlKey) {
e.preventDefault();
selectSlashCommand(slashActiveIdx);
return;
}
if (e.key === 'Escape') {
e.preventDefault();
hideSlashMenu();
return;
}
if (e.key === 'Tab') {
e.preventDefault();
selectSlashCommand(slashActiveIdx);
return;
}
}
// Arrow-key history recall (only when input is empty or already browsing history)
if (e.key === 'ArrowUp' && inputHistory.length > 0 && !isSlashMenuVisible()) {
const curVal = this.value.trim();
const isSingleLine = !this.value.includes('\n');
if (isSingleLine && (curVal === '' || historyIdx >= 0)) {
e.preventDefault();
if (historyIdx < 0) {
historySavedDraft = this.value;
historyIdx = inputHistory.length - 1;
} else if (historyIdx > 0) {
historyIdx--;
}
this.value = inputHistory[historyIdx];
slashJustSelected = true;
this.dispatchEvent(new Event('input'));
hideSlashMenu();
this.selectionStart = this.selectionEnd = this.value.length;
return;
}
}
if (e.key === 'ArrowDown' && historyIdx >= 0 && !isSlashMenuVisible()) {
const isSingleLine = !this.value.includes('\n');
if (isSingleLine) {
e.preventDefault();
if (historyIdx < inputHistory.length - 1) {
historyIdx++;
this.value = inputHistory[historyIdx];
} else {
historyIdx = -1;
this.value = historySavedDraft;
historySavedDraft = '';
}
slashJustSelected = true;
this.dispatchEvent(new Event('input'));
hideSlashMenu();
this.selectionStart = this.selectionEnd = this.value.length;
return;
}
}
if ((e.ctrlKey || e.shiftKey) && e.key === 'Enter') {
const start = this.selectionStart;
const end = this.selectionEnd;
@@ -460,6 +637,10 @@ chatInput.addEventListener('keydown', function(e) {
}
});
chatInput.addEventListener('blur', () => {
setTimeout(hideSlashMenu, 150);
});
document.querySelectorAll('.example-card').forEach(card => {
card.addEventListener('click', () => {
const textEl = card.querySelector('[data-i18n*="text"]');
@@ -475,6 +656,12 @@ function sendMessage() {
const text = chatInput.value.trim();
if (!text && pendingAttachments.length === 0) return;
if (text) {
inputHistory.push(text);
historyIdx = -1;
historySavedDraft = '';
}
const ws = document.getElementById('welcome-screen');
if (ws) ws.remove();
@@ -732,7 +919,7 @@ function createUserMessageEl(content, timestamp, attachments) {
const textHtml = content ? renderMarkdown(content) : '';
el.innerHTML = `
<div class="max-w-[75%] sm:max-w-[60%]">
<div class="bg-primary-400 text-white rounded-2xl px-4 py-2.5 text-sm leading-relaxed msg-content">
<div class="bg-primary-400 text-white rounded-2xl px-4 py-2.5 text-sm leading-relaxed msg-content user-bubble">
${attachHtml}${textHtml}
</div>
<div class="text-xs text-slate-400 dark:text-slate-500 mt-1.5 text-right">${formatTime(timestamp)}</div>
@@ -2236,7 +2423,12 @@ navigateTo = function(viewId) {
// =====================================================================
applyTheme();
applyI18n();
document.getElementById('sidebar-version').textContent = `CowAgent ${APP_VERSION}`;
fetch('/api/version').then(r => r.json()).then(data => {
APP_VERSION = `v${data.version}`;
document.getElementById('sidebar-version').textContent = `CowAgent ${APP_VERSION}`;
}).catch(() => {
document.getElementById('sidebar-version').textContent = 'CowAgent';
});
chatInput.focus();
// Re-enable color transition AFTER first paint so the theme applied in <head>

View File

@@ -390,6 +390,7 @@ class WebChannel(ChatChannel):
'/api/scheduler', 'SchedulerHandler',
'/api/history', 'HistoryHandler',
'/api/logs', 'LogsHandler',
'/api/version', 'VersionHandler',
'/assets/(.*)', 'AssetsHandler',
)
app = web.application(urls, globals(), autoreload=False)
@@ -493,8 +494,8 @@ class ChatHandler:
class ConfigHandler:
_RECOMMENDED_MODELS = [
const.MINIMAX_M2_5, const.MINIMAX_M2_1, const.MINIMAX_M2_1_LIGHTNING,
const.GLM_5, const.GLM_4_7,
const.MINIMAX_M2_7, const.MINIMAX_M2_5, const.MINIMAX_M2_1, const.MINIMAX_M2_1_LIGHTNING,
const.GLM_5_TURBO, const.GLM_5, const.GLM_4_7,
const.QWEN3_MAX, const.QWEN35_PLUS,
const.KIMI_K2_5, const.KIMI_K2,
const.DOUBAO_SEED_2_PRO, const.DOUBAO_SEED_2_CODE,
@@ -510,14 +511,14 @@ class ConfigHandler:
"api_key_field": "minimax_api_key",
"api_base_key": None,
"api_base_default": None,
"models": [const.MINIMAX_M2_5, const.MINIMAX_M2_1, const.MINIMAX_M2_1_LIGHTNING],
"models": [const.MINIMAX_M2_7, const.MINIMAX_M2_5, const.MINIMAX_M2_1, const.MINIMAX_M2_1_LIGHTNING],
}),
("zhipu", {
"label": "智谱AI",
"api_key_field": "zhipu_ai_api_key",
"api_base_key": "zhipu_ai_api_base",
"api_base_default": "https://open.bigmodel.cn/api/paas/v4",
"models": [const.GLM_5, const.GLM_4_7],
"models": [const.GLM_5_TURBO, const.GLM_5, const.GLM_4_7],
}),
("dashscope", {
"label": "通义千问",
@@ -1429,3 +1430,10 @@ class AssetsHandler:
except Exception as e:
logger.error(f"Error serving static file: {e}", exc_info=True) # 添加更详细的错误信息
raise web.notfound()
class VersionHandler:
def GET(self):
web.header('Content-Type', 'application/json; charset=utf-8')
from cli import __version__
return json.dumps({"version": __version__})

1
cli/VERSION Normal file
View File

@@ -0,0 +1 @@
2.0.4

13
cli/__init__.py Normal file
View File

@@ -0,0 +1,13 @@
"""CowAgent CLI - Manage your CowAgent from the command line."""
import os as _os
def _read_version():
version_file = _os.path.join(_os.path.dirname(_os.path.abspath(__file__)), "VERSION")
try:
with open(version_file, "r") as f:
return f.read().strip()
except FileNotFoundError:
return "0.0.0"
__version__ = _read_version()

4
cli/__main__.py Normal file
View File

@@ -0,0 +1,4 @@
"""Allow running as: python -m cli"""
from cli.cli import main
main()

73
cli/cli.py Normal file
View File

@@ -0,0 +1,73 @@
"""CowAgent CLI entry point."""
import click
from cli import __version__
from cli.commands.skill import skill
from cli.commands.process import start, stop, restart, update, status, logs
from cli.commands.context import context
HELP_TEXT = """Usage: cow COMMAND [ARGS]...
CowAgent CLI - Manage your CowAgent instance.
Commands:
help Show this message.
version Show the version.
start Start CowAgent.
stop Stop CowAgent.
restart Restart CowAgent.
update Update CowAgent and restart.
status Show CowAgent running status.
logs View CowAgent logs.
skill Manage CowAgent skills.
Tip: You can also send /help, /skill list, etc. in agent chat."""
class CowCLI(click.Group):
def format_help(self, ctx, formatter):
formatter.write(HELP_TEXT.strip())
formatter.write("\n")
def parse_args(self, ctx, args):
if args and args[0] == 'help':
click.echo(HELP_TEXT.strip())
ctx.exit(0)
return super().parse_args(ctx, args)
@click.group(cls=CowCLI, invoke_without_command=True, context_settings=dict(help_option_names=[]))
@click.pass_context
def main(ctx):
"""CowAgent CLI - Manage your CowAgent instance."""
if ctx.invoked_subcommand is None:
click.echo(HELP_TEXT.strip())
@main.command()
def version():
"""Show the version."""
click.echo(f"cow {__version__}")
@main.command(name='help')
@click.pass_context
def help_cmd(ctx):
"""Show this message."""
click.echo(HELP_TEXT.strip())
main.add_command(skill)
main.add_command(start)
main.add_command(stop)
main.add_command(restart)
main.add_command(update)
main.add_command(status)
main.add_command(logs)
main.add_command(context)
if __name__ == '__main__':
main()

0
cli/commands/__init__.py Normal file
View File

29
cli/commands/context.py Normal file
View File

@@ -0,0 +1,29 @@
"""cow context - Context management commands."""
import click
CHAT_HINT = (
"Context commands operate on the running agent's memory.\n"
"Please send the command in a chat conversation instead:\n\n"
" /context - View current context info\n"
" /context clear - Clear conversation context"
)
@click.group(invoke_without_command=True)
@click.pass_context
def context(ctx):
"""View or manage conversation context.
Context commands need access to the running agent's memory.
Use them in chat conversations: /context or /context clear
"""
if ctx.invoked_subcommand is None:
click.echo(f"\n {CHAT_HINT}\n")
@context.command()
def clear():
"""Clear conversation context (messages history)."""
click.echo(f"\n {CHAT_HINT}\n")

281
cli/commands/process.py Normal file
View File

@@ -0,0 +1,281 @@
"""cow start/stop/restart/status/logs - Process management commands."""
import os
import sys
import subprocess
import time
from typing import Optional
import click
from cli.utils import get_project_root
_IS_WIN = sys.platform == "win32"
def _get_pid_file():
return os.path.join(get_project_root(), ".cow.pid")
def _get_log_file():
return os.path.join(get_project_root(), "nohup.out")
def _is_pid_alive(pid: int) -> bool:
"""Check whether a process is still running (cross-platform)."""
if _IS_WIN:
try:
out = subprocess.check_output(
["tasklist", "/FI", f"PID eq {pid}", "/NH"],
stderr=subprocess.DEVNULL,
)
return str(pid) in out.decode(errors="ignore")
except Exception:
return False
else:
try:
os.kill(pid, 0)
return True
except (ProcessLookupError, PermissionError):
return False
def _kill_pid(pid: int, force: bool = False):
"""Terminate a process by PID (cross-platform)."""
if _IS_WIN:
flag = "/F" if force else ""
cmd = ["taskkill"]
if force:
cmd.append("/F")
cmd.extend(["/PID", str(pid)])
subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
else:
import signal
sig = signal.SIGKILL if force else signal.SIGTERM
os.kill(pid, sig)
def _read_pid() -> Optional[int]:
pid_file = _get_pid_file()
if not os.path.exists(pid_file):
return None
try:
with open(pid_file, "r") as f:
pid = int(f.read().strip())
if _is_pid_alive(pid):
return pid
os.remove(pid_file)
return None
except (ValueError, OSError):
try:
os.remove(pid_file)
except OSError:
pass
return None
def _write_pid(pid: int):
with open(_get_pid_file(), "w") as f:
f.write(str(pid))
def _remove_pid():
pid_file = _get_pid_file()
if os.path.exists(pid_file):
os.remove(pid_file)
@click.command()
@click.option("--foreground", "-f", is_flag=True, help="Run in foreground (don't daemonize)")
@click.option("--no-logs", is_flag=True, help="Don't tail logs after starting")
def start(foreground, no_logs):
"""Start CowAgent."""
pid = _read_pid()
if pid:
click.echo(f"CowAgent is already running (PID: {pid}).")
return
root = get_project_root()
app_py = os.path.join(root, "app.py")
if not os.path.exists(app_py):
click.echo("Error: app.py not found in project root.", err=True)
sys.exit(1)
python = sys.executable
if foreground:
click.echo("Starting CowAgent in foreground...")
if _IS_WIN:
sys.exit(subprocess.call([python, app_py], cwd=root))
else:
os.execv(python, [python, app_py])
else:
log_file = _get_log_file()
click.echo("Starting CowAgent...")
popen_kwargs = dict(cwd=root)
if _IS_WIN:
CREATE_NO_WINDOW = 0x08000000
popen_kwargs["creationflags"] = (
subprocess.CREATE_NEW_PROCESS_GROUP | CREATE_NO_WINDOW
)
else:
popen_kwargs["start_new_session"] = True
with open(log_file, "a") as log:
proc = subprocess.Popen(
[python, app_py],
stdout=log,
stderr=log,
**popen_kwargs,
)
_write_pid(proc.pid)
click.echo(click.style(f"✓ CowAgent started (PID: {proc.pid})", fg="green"))
click.echo(f" Logs: {log_file}")
if not no_logs:
click.echo(" Press Ctrl+C to stop tailing logs.\n")
_tail_log(log_file)
@click.command()
def stop():
"""Stop CowAgent."""
pid = _read_pid()
if not pid:
click.echo("CowAgent is not running.")
return
click.echo(f"Stopping CowAgent (PID: {pid})...")
try:
_kill_pid(pid)
for _ in range(30):
time.sleep(0.1)
if not _is_pid_alive(pid):
break
else:
_kill_pid(pid, force=True)
except (ProcessLookupError, OSError):
pass
_remove_pid()
click.echo(click.style("✓ CowAgent stopped.", fg="green"))
@click.command()
@click.option("--no-logs", is_flag=True, help="Don't tail logs after restarting")
@click.pass_context
def restart(ctx, no_logs):
"""Restart CowAgent."""
ctx.invoke(stop)
time.sleep(1)
ctx.invoke(start, no_logs=no_logs)
@click.command()
@click.pass_context
def update(ctx):
"""Update CowAgent and restart."""
root = get_project_root()
# 1. Git pull while service is still running
if os.path.isdir(os.path.join(root, ".git")):
click.echo("Pulling latest code...")
ret = subprocess.call(["git", "pull"], cwd=root)
if ret != 0:
click.echo("Error: git pull failed.", err=True)
sys.exit(1)
else:
click.echo("Not a git repository, skipping code update.")
# 2. Stop service
ctx.invoke(stop)
# 3. Install dependencies
python = sys.executable
req_file = os.path.join(root, "requirements.txt")
if os.path.exists(req_file):
click.echo("Installing dependencies...")
subprocess.call(
[python, "-m", "pip", "install", "-r", "requirements.txt", "-q"],
cwd=root,
)
click.echo("Reinstalling cow CLI...")
subprocess.call(
[python, "-m", "pip", "install", "-e", ".", "-q"],
cwd=root,
)
# 4. Start service
click.echo("")
time.sleep(1)
ctx.invoke(start, no_logs=True)
@click.command()
def status():
"""Show CowAgent running status."""
from cli import __version__
from cli.utils import load_config_json
pid = _read_pid()
if pid:
click.echo(click.style(f"● CowAgent is running (PID: {pid})", fg="green"))
else:
click.echo(click.style("● CowAgent is not running", fg="red"))
click.echo(f" 版本: v{__version__}")
cfg = load_config_json()
if cfg:
channel = cfg.get("channel_type", "unknown")
if isinstance(channel, list):
channel = ", ".join(channel)
click.echo(f" 通道: {channel}")
click.echo(f" 模型: {cfg.get('model', 'unknown')}")
mode = "Agent" if cfg.get("agent") else "Chat"
click.echo(f" 模式: {mode}")
@click.command()
@click.option("--follow", "-f", is_flag=True, help="Follow log output")
@click.option("--lines", "-n", default=50, help="Number of lines to show")
def logs(follow, lines):
"""View CowAgent logs."""
log_file = _get_log_file()
if not os.path.exists(log_file):
click.echo("No log file found.")
return
if follow:
_tail_log(log_file, lines)
else:
_print_last_lines(log_file, lines)
def _print_last_lines(file_path: str, n: int = 50):
"""Print the last N lines of a file (cross-platform)."""
try:
with open(file_path, "r", encoding="utf-8", errors="replace") as f:
all_lines = f.readlines()
for line in all_lines[-n:]:
click.echo(line, nl=False)
except Exception as e:
click.echo(f"Error reading log file: {e}", err=True)
def _tail_log(log_file: str, lines: int = 50):
"""Follow log file output. Blocks until Ctrl+C (cross-platform)."""
_print_last_lines(log_file, lines)
try:
with open(log_file, "r", encoding="utf-8", errors="replace") as f:
f.seek(0, 2)
while True:
line = f.readline()
if line:
click.echo(line, nl=False)
else:
time.sleep(0.3)
except KeyboardInterrupt:
pass

799
cli/commands/skill.py Normal file
View File

@@ -0,0 +1,799 @@
"""cow skill - Skill management commands."""
import os
import re
import sys
import json
import hashlib
import shutil
import zipfile
import tempfile
from urllib.parse import urlparse
import click
import requests
from cli.utils import (
get_project_root,
get_skills_dir,
get_builtin_skills_dir,
load_skills_config,
SKILL_HUB_API,
)
_SAFE_NAME_RE = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9_\-]{0,63}$")
_GITHUB_URL_RE = re.compile(
r"^https?://github\.com/([^/]+)/([^/]+?)(?:\.git)?(?:/(?:tree|blob)/([^/]+)(?:/(.+))?)?/?$"
)
def _parse_github_url(url: str):
"""Parse a full GitHub URL into (owner, repo, branch, subpath).
Returns None if the URL doesn't match.
Supported formats:
https://github.com/owner/repo
https://github.com/owner/repo/tree/branch
https://github.com/owner/repo/tree/branch/path/to/skill
https://github.com/owner/repo/blob/branch/path/to/skill
"""
m = _GITHUB_URL_RE.match(url.strip())
if not m:
return None
owner, repo, branch, subpath = m.groups()
return owner, repo, branch or "main", subpath
def _download_github_dir(owner, repo, branch, subpath, dest_dir):
"""Download a subdirectory from GitHub using the Contents API.
Recursively fetches all files under the given subpath and writes them
to dest_dir. Raises on any network or API error.
"""
api_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{subpath}?ref={branch}"
resp = requests.get(api_url, timeout=30, headers={"Accept": "application/vnd.github.v3+json"})
resp.raise_for_status()
items = resp.json()
if isinstance(items, dict):
items = [items]
for item in items:
rel_path = item["path"]
if subpath:
rel_path = rel_path[len(subpath.strip("/")):].lstrip("/")
local_path = os.path.join(dest_dir, rel_path)
if item["type"] == "file":
os.makedirs(os.path.dirname(local_path), exist_ok=True)
dl_url = item.get("download_url")
if not dl_url:
continue
file_resp = requests.get(dl_url, timeout=30)
file_resp.raise_for_status()
with open(local_path, "wb") as f:
f.write(file_resp.content)
elif item["type"] == "dir":
os.makedirs(local_path, exist_ok=True)
child_subpath = item["path"]
_download_github_dir(owner, repo, branch, child_subpath, dest_dir)
def _register_installed_skill(name: str, source: str = "cowhub"):
"""Register a newly installed skill into skills_config.json.
source values: builtin, cow, github, clawhub, linkai, local, url
"""
skills_dir = get_skills_dir()
config_path = os.path.join(skills_dir, "skills_config.json")
config = {}
if os.path.exists(config_path):
try:
with open(config_path, "r", encoding="utf-8") as f:
config = json.load(f)
except Exception:
config = {}
if name in config:
return
skill_dir = os.path.join(skills_dir, name)
description = _read_skill_description(skill_dir) or ""
config[name] = {
"name": name,
"description": description,
"source": source,
"enabled": True,
"category": "skill",
}
try:
with open(config_path, "w", encoding="utf-8") as f:
json.dump(config, f, indent=4, ensure_ascii=False)
except Exception:
pass
def _parse_skill_frontmatter(content: str) -> dict:
"""Parse YAML frontmatter from SKILL.md content and return a dict with name/description."""
result = {}
match = re.match(r'^---\s*\n(.*?)\n---\s*\n', content, re.DOTALL)
if not match:
return result
for line in match.group(1).split('\n'):
line = line.strip()
for key in ('name', 'description'):
if line.startswith(f'{key}:'):
val = line[len(key) + 1:].strip()
result[key] = val.strip('"').strip("'")
return result
def _read_skill_description(skill_dir: str) -> str:
"""Read the description from a skill's SKILL.md frontmatter."""
skill_md = os.path.join(skill_dir, "SKILL.md")
if not os.path.exists(skill_md):
return ""
try:
with open(skill_md, "r", encoding="utf-8") as f:
content = f.read()
return _parse_skill_frontmatter(content).get("description", "")
except Exception:
return ""
def _install_url(url: str):
"""Install a skill from a direct SKILL.md URL."""
click.echo(f"Downloading SKILL.md from {url} ...")
try:
resp = requests.get(url, timeout=30)
resp.raise_for_status()
except Exception as e:
click.echo(f"Error: Failed to download SKILL.md: {e}", err=True)
sys.exit(1)
content = resp.text
fm = _parse_skill_frontmatter(content)
skill_name = fm.get("name")
if not skill_name:
click.echo("Error: SKILL.md missing 'name' field in frontmatter.", err=True)
sys.exit(1)
skill_name = skill_name.strip()
_validate_skill_name(skill_name)
skills_dir = get_skills_dir()
os.makedirs(skills_dir, exist_ok=True)
skill_dir = os.path.join(skills_dir, skill_name)
if os.path.isdir(skill_dir):
click.echo(f"Skill '{skill_name}' already exists. Overwriting SKILL.md ...")
os.makedirs(skill_dir, exist_ok=True)
with open(os.path.join(skill_dir, "SKILL.md"), "w", encoding="utf-8") as f:
f.write(content)
_register_installed_skill(skill_name, source="url")
_print_install_success(skill_name, "url")
def _print_install_success(name: str, source: str):
"""Print a unified install success message with description and source."""
skills_dir = get_skills_dir()
desc = _read_skill_description(os.path.join(skills_dir, name))
click.echo(click.style(f"{name}", fg="green"))
if desc:
if len(desc) > 60:
desc = desc[:57] + ""
click.echo(f" {desc}")
click.echo(f" 来源: {source}")
def _validate_skill_name(name: str):
"""Reject names that contain path traversal or special characters."""
if not _SAFE_NAME_RE.match(name):
click.echo(
f"Error: Invalid skill name '{name}'. "
"Use only letters, digits, hyphens, and underscores.",
err=True,
)
sys.exit(1)
def _validate_github_spec(spec: str):
"""Reject specs that don't look like owner/repo."""
if not re.match(r"^[a-zA-Z0-9_\-]+/[a-zA-Z0-9_.\-]+$", spec):
click.echo(f"Error: Invalid GitHub spec '{spec}'. Expected format: owner/repo", err=True)
sys.exit(1)
def _safe_extractall(zf: zipfile.ZipFile, dest: str):
"""Extract zip while guarding against Zip Slip (path traversal)."""
dest = os.path.realpath(dest)
for member in zf.infolist():
target = os.path.realpath(os.path.join(dest, member.filename))
if not target.startswith(dest + os.sep) and target != dest:
raise ValueError(f"Unsafe zip entry detected: {member.filename}")
zf.extractall(dest)
def _verify_checksum(content: bytes, expected: str):
"""Verify SHA-256 checksum of downloaded content.
Returns True if checksum matches or no expected value provided.
Exits with error if mismatch.
"""
if not expected:
return True
actual = hashlib.sha256(content).hexdigest()
if actual != expected.lower():
click.echo(
f"Error: Checksum mismatch!\n"
f" Expected: {expected}\n"
f" Actual: {actual}\n"
f"The downloaded package may have been tampered with.",
err=True,
)
sys.exit(1)
return True
@click.group()
def skill():
"""Manage CowAgent skills."""
pass
# ------------------------------------------------------------------
# cow skill list
# ------------------------------------------------------------------
@skill.command("list")
@click.option("--remote", is_flag=True, help="Browse skills on Skill Hub")
@click.option("--page", default=1, type=int, help="Page number for remote listing")
def skill_list(remote, page):
"""List installed skills or browse Skill Hub."""
if remote:
_list_remote(page=page)
else:
_list_local()
def _list_local():
"""List locally installed skills."""
config = load_skills_config()
skills_dir = get_skills_dir()
builtin_dir = get_builtin_skills_dir()
if not config:
# Fallback: scan directories directly
entries = []
for d in [builtin_dir, skills_dir]:
if not os.path.isdir(d):
continue
source = "builtin" if d == builtin_dir else "custom"
for name in sorted(os.listdir(d)):
skill_path = os.path.join(d, name)
if os.path.isdir(skill_path) and not name.startswith("."):
has_skill_md = os.path.exists(os.path.join(skill_path, "SKILL.md"))
if has_skill_md:
entries.append({"name": name, "source": source, "enabled": True, "description": ""})
if not entries:
click.echo("No skills installed.")
return
_print_skill_table(entries)
return
entries = sorted(config.values(), key=lambda x: x.get("name", ""))
if not entries:
click.echo("No skills installed.")
return
_print_skill_table(entries)
def _print_skill_table(entries):
"""Print skills as a formatted table."""
name_w = max(len(e.get("name", "")) for e in entries)
name_w = max(name_w, 4) + 2
desc_w = 40
header = f"{'Name':<{name_w}} {'Status':<10} {'Source':<10} {'Description'}"
click.echo(f"\n Installed skills ({len(entries)})\n")
click.echo(f" {header}")
click.echo(f" {'' * (name_w + 10 + 10 + desc_w)}")
for e in entries:
name = e.get("name", "")
enabled = e.get("enabled", True)
source = e.get("source", "")
desc = e.get("description", "") or ""
if len(desc) > desc_w:
desc = desc[:desc_w - 3] + "..."
status_icon = click.style("✓ on ", fg="green") if enabled else click.style("✗ off", fg="red")
click.echo(f" {name:<{name_w}} {status_icon} {source:<10} {desc}")
click.echo()
_REMOTE_PAGE_SIZE = 10
def _list_remote(page: int = 1):
"""List skills from remote Skill Hub with server-side pagination."""
try:
resp = requests.get(
f"{SKILL_HUB_API}/skills",
params={"page": page, "limit": _REMOTE_PAGE_SIZE},
timeout=10,
)
resp.raise_for_status()
data = resp.json()
except Exception as e:
click.echo(f"Error: Failed to fetch from Skill Hub: {e}", err=True)
sys.exit(1)
skills = data.get("skills", [])
total = data.get("total", len(skills))
if not skills and page == 1:
click.echo("No skills available on Skill Hub.")
return
total_pages = max(1, (total + _REMOTE_PAGE_SIZE - 1) // _REMOTE_PAGE_SIZE)
page = min(page, total_pages)
installed = set(load_skills_config().keys())
name_w = max((len(s.get("name", "")) for s in skills), default=4)
name_w = max(name_w, 4) + 2
click.echo(f"\n Skill Hub ({total} available) — page {page}/{total_pages}\n")
click.echo(f" {'Name':<{name_w}} {'Status':<12} {'Description'}")
click.echo(f" {'' * (name_w + 12 + 50)}")
for s in skills:
name = s.get("name", "")
desc = s.get("description", "") or s.get("display_name", "")
if len(desc) > 50:
desc = desc[:47] + "..."
status = click.style("installed", fg="green") if name in installed else ""
click.echo(f" {name:<{name_w}} {status:<12} {desc}")
click.echo()
nav_parts = []
if page > 1:
nav_parts.append(f"cow skill list --remote --page {page - 1}")
if page < total_pages:
nav_parts.append(f"cow skill list --remote --page {page + 1}")
if nav_parts:
click.echo(f" Navigate: {' | '.join(nav_parts)}")
click.echo(f" Install: cow skill install <name>\n")
# ------------------------------------------------------------------
# cow skill search
# ------------------------------------------------------------------
@skill.command()
@click.argument("query")
def search(query):
"""Search skills on Skill Hub."""
try:
resp = requests.get(f"{SKILL_HUB_API}/skills/search", params={"q": query}, timeout=10)
resp.raise_for_status()
data = resp.json()
except Exception as e:
click.echo(f"Error: Failed to search Skill Hub: {e}", err=True)
sys.exit(1)
skills = data.get("skills", [])
if not skills:
click.echo(f'No skills found for "{query}".')
return
installed = set(load_skills_config().keys())
name_w = max(len(s.get("name", "")) for s in skills)
name_w = max(name_w, 4) + 2
click.echo(f'\n Search results for "{query}" ({len(skills)} found)\n')
click.echo(f" {'Name':<{name_w}} {'Status':<12} {'Description'}")
click.echo(f" {'' * (name_w + 12 + 50)}")
for s in skills:
name = s.get("name", "")
desc = s.get("description", "") or s.get("display_name", "")
if len(desc) > 50:
desc = desc[:47] + "..."
status = click.style("installed", fg="green") if name in installed else ""
click.echo(f" {name:<{name_w}} {status:<12} {desc}")
click.echo(f"\n Install with: cow skill install <name>\n")
# ------------------------------------------------------------------
# cow skill install
# ------------------------------------------------------------------
@skill.command()
@click.argument("name")
def install(name):
"""Install a skill from Skill Hub, GitHub, or a SKILL.md URL.
Examples:
cow skill install pptx
cow skill install github:owner/repo
cow skill install github:owner/repo#path/to/skill
cow skill install https://github.com/owner/repo/tree/main/path/to/skill
cow skill install https://example.com/path/to/SKILL.md
"""
if name.startswith(("http://", "https://")) and name.rstrip("/").endswith("SKILL.md"):
# GitHub SKILL.md → strip filename and install the whole directory
dir_url = re.sub(r'/SKILL\.md/?$', '', name)
gh = _parse_github_url(dir_url)
if gh:
owner, repo, branch, subpath = gh
spec = f"{owner}/{repo}"
skill_name = subpath.rstrip("/").split("/")[-1] if subpath else repo
_install_github(spec, subpath=subpath, skill_name=skill_name, branch=branch)
return
_install_url(name)
return
parsed = _parse_github_url(name)
if parsed:
owner, repo, branch, subpath = parsed
spec = f"{owner}/{repo}"
skill_name = subpath.rstrip("/").split("/")[-1] if subpath else repo
_install_github(spec, subpath=subpath, skill_name=skill_name, branch=branch)
elif name.startswith("github:"):
skill_name = name[7:]
_validate_skill_name(skill_name)
_install_hub(skill_name)
elif name.startswith("clawhub:"):
skill_name = name[8:]
_validate_skill_name(skill_name)
_install_hub(skill_name, provider="clawhub")
else:
_validate_skill_name(name)
_install_hub(name)
def _install_hub(name, provider=None):
"""Install a skill from Skill Hub."""
skills_dir = get_skills_dir()
os.makedirs(skills_dir, exist_ok=True)
click.echo(f"Fetching skill info for '{name}'...")
try:
body = {}
if provider:
body["provider"] = provider
resp = requests.post(
f"{SKILL_HUB_API}/skills/{name}/download",
json=body,
timeout=15,
)
resp.raise_for_status()
except requests.HTTPError as e:
if e.response is not None and e.response.status_code == 404:
click.echo(f"Error: Skill '{name}' not found on Skill Hub.", err=True)
else:
click.echo(f"Error: Failed to fetch skill: {e}", err=True)
sys.exit(1)
except Exception as e:
click.echo(f"Error: Failed to connect to Skill Hub: {e}", err=True)
sys.exit(1)
content_type = resp.headers.get("Content-Type", "")
if "application/json" in content_type:
data = resp.json()
source_type = data.get("source_type")
if source_type == "github":
source_url = data.get("source_url", "")
parsed_url = _parse_github_url(source_url)
if parsed_url:
owner, repo, branch, subpath = parsed_url
click.echo(f"Source: GitHub ({source_url})")
_install_github(f"{owner}/{repo}", subpath=subpath, skill_name=name, branch=branch)
else:
_validate_github_spec(source_url)
click.echo(f"Source: GitHub ({source_url})")
_install_github(source_url, skill_name=name)
return
if source_type == "registry":
download_url = data.get("download_url")
if download_url:
parsed = urlparse(download_url)
if parsed.scheme != "https":
click.echo(f"Error: Refusing to download from non-HTTPS URL.", err=True)
sys.exit(1)
provider = data.get("source_provider", "registry")
expected_checksum = data.get("checksum") or data.get("sha256")
click.echo(f"Source: {provider}")
click.echo("Downloading skill package...")
try:
dl_resp = requests.get(download_url, timeout=60, allow_redirects=True)
dl_resp.raise_for_status()
except Exception as e:
click.echo(f"Error: Failed to download from {provider}: {e}", err=True)
sys.exit(1)
_verify_checksum(dl_resp.content, expected_checksum)
_install_zip_bytes(dl_resp.content, name, skills_dir)
_register_installed_skill(name, source=provider)
_print_install_success(name, provider)
else:
click.echo(f"Error: Unsupported registry provider.", err=True)
sys.exit(1)
return
if "redirect" in data:
source_url = data.get("source_url", "")
parsed_url = _parse_github_url(source_url)
if parsed_url:
owner, repo, branch, subpath = parsed_url
click.echo(f"Source: GitHub ({source_url})")
_install_github(f"{owner}/{repo}", subpath=subpath, skill_name=name, branch=branch)
else:
_validate_github_spec(source_url)
click.echo(f"Source: GitHub ({source_url})")
_install_github(source_url, skill_name=name)
return
elif "application/zip" in content_type:
click.echo("Downloading skill package...")
expected_checksum = resp.headers.get("X-Checksum-Sha256")
_verify_checksum(resp.content, expected_checksum)
_install_zip_bytes(resp.content, name, skills_dir)
_register_installed_skill(name)
_print_install_success(name, "cowhub")
return
click.echo(f"Error: Unexpected response from Skill Hub.", err=True)
sys.exit(1)
def _install_github(spec, subpath=None, skill_name=None, branch="main", source="github"):
"""Install a skill from a GitHub repo.
spec format: owner/repo or owner/repo#path
"""
if "#" in spec and not subpath:
spec, subpath = spec.split("#", 1)
_validate_github_spec(spec)
if not skill_name:
skill_name = subpath.rstrip("/").split("/")[-1] if subpath else spec.split("/")[-1]
_validate_skill_name(skill_name)
skills_dir = get_skills_dir()
os.makedirs(skills_dir, exist_ok=True)
target_dir = os.path.join(skills_dir, skill_name)
owner, repo = spec.split("/", 1)
# For subpath installs, try GitHub Contents API first (avoids downloading entire repo)
if subpath:
click.echo(f"Downloading from GitHub: {spec}/{subpath} (branch: {branch})...")
try:
with tempfile.TemporaryDirectory() as tmp_dir:
api_dest = os.path.join(tmp_dir, skill_name)
os.makedirs(api_dest)
_download_github_dir(owner, repo, branch, subpath.strip("/"), api_dest)
if os.path.exists(target_dir):
shutil.rmtree(target_dir)
shutil.copytree(api_dest, target_dir)
_register_installed_skill(skill_name, source=source)
_print_install_success(skill_name, source)
return
except Exception:
click.echo("Contents API unavailable, falling back to zip download...")
# Fallback: download full repo zip
zip_url = f"https://github.com/{spec}/archive/refs/heads/{branch}.zip"
click.echo(f"Downloading from GitHub: {spec} (branch: {branch})...")
try:
resp = requests.get(zip_url, timeout=60, allow_redirects=True)
resp.raise_for_status()
except Exception as e:
click.echo(f"Error: Failed to download from GitHub: {e}", err=True)
sys.exit(1)
with tempfile.TemporaryDirectory() as tmp_dir:
zip_path = os.path.join(tmp_dir, "repo.zip")
with open(zip_path, "wb") as f:
f.write(resp.content)
extract_dir = os.path.join(tmp_dir, "extracted")
with zipfile.ZipFile(zip_path, "r") as zf:
_safe_extractall(zf, extract_dir)
top_items = [d for d in os.listdir(extract_dir) if not d.startswith(".")]
repo_root = extract_dir
if len(top_items) == 1 and os.path.isdir(os.path.join(extract_dir, top_items[0])):
repo_root = os.path.join(extract_dir, top_items[0])
if subpath:
source_dir = os.path.join(repo_root, subpath.strip("/"))
if not os.path.isdir(source_dir):
click.echo(f"Error: Path '{subpath}' not found in repository.", err=True)
sys.exit(1)
else:
source_dir = repo_root
if os.path.exists(target_dir):
shutil.rmtree(target_dir)
shutil.copytree(source_dir, target_dir)
_register_installed_skill(skill_name, source=source)
_print_install_success(skill_name, source)
def _install_zip_bytes(content, name, skills_dir):
"""Extract a zip archive into the skills directory."""
with tempfile.TemporaryDirectory() as tmp_dir:
zip_path = os.path.join(tmp_dir, "package.zip")
with open(zip_path, "wb") as f:
f.write(content)
extract_dir = os.path.join(tmp_dir, "extracted")
with zipfile.ZipFile(zip_path, "r") as zf:
_safe_extractall(zf, extract_dir)
top_items = [d for d in os.listdir(extract_dir) if not d.startswith(".")]
source = extract_dir
if len(top_items) == 1 and os.path.isdir(os.path.join(extract_dir, top_items[0])):
source = os.path.join(extract_dir, top_items[0])
target = os.path.join(skills_dir, name)
if os.path.exists(target):
shutil.rmtree(target)
shutil.copytree(source, target)
# ------------------------------------------------------------------
# cow skill uninstall
# ------------------------------------------------------------------
@skill.command()
@click.argument("name")
@click.option("--yes", "-y", is_flag=True, help="Skip confirmation")
def uninstall(name, yes):
"""Uninstall a skill."""
_validate_skill_name(name)
skills_dir = get_skills_dir()
skill_dir = os.path.join(skills_dir, name)
if not os.path.exists(skill_dir):
click.echo(f"Error: Skill '{name}' is not installed.", err=True)
sys.exit(1)
if not yes:
click.confirm(f"Uninstall skill '{name}'?", abort=True)
shutil.rmtree(skill_dir)
config_path = os.path.join(skills_dir, "skills_config.json")
if os.path.exists(config_path):
try:
with open(config_path, "r", encoding="utf-8") as f:
config = json.load(f)
config.pop(name, None)
with open(config_path, "w", encoding="utf-8") as f:
json.dump(config, f, indent=4, ensure_ascii=False)
except Exception:
pass
click.echo(click.style(f"✓ Skill '{name}' uninstalled.", fg="green"))
# ------------------------------------------------------------------
# cow skill enable / disable
# ------------------------------------------------------------------
@skill.command()
@click.argument("name")
def enable(name):
"""Enable a skill."""
_set_enabled(name, True)
@skill.command()
@click.argument("name")
def disable(name):
"""Disable a skill."""
_set_enabled(name, False)
def _set_enabled(name, enabled):
_validate_skill_name(name)
skills_dir = get_skills_dir()
config_path = os.path.join(skills_dir, "skills_config.json")
if not os.path.exists(config_path):
click.echo(f"Error: No skills config found.", err=True)
sys.exit(1)
try:
with open(config_path, "r", encoding="utf-8") as f:
config = json.load(f)
except Exception as e:
click.echo(f"Error: Failed to read skills config: {e}", err=True)
sys.exit(1)
if name not in config:
click.echo(f"Error: Skill '{name}' not found in config.", err=True)
sys.exit(1)
config[name]["enabled"] = enabled
with open(config_path, "w", encoding="utf-8") as f:
json.dump(config, f, indent=4, ensure_ascii=False)
state = "enabled" if enabled else "disabled"
icon = "" if enabled else ""
color = "green" if enabled else "yellow"
click.echo(click.style(f"{icon} Skill '{name}' {state}.", fg=color))
# ------------------------------------------------------------------
# cow skill info
# ------------------------------------------------------------------
@skill.command()
@click.argument("name")
def info(name):
"""Show details about an installed skill."""
_validate_skill_name(name)
skills_dir = get_skills_dir()
builtin_dir = get_builtin_skills_dir()
skill_dir = None
source = None
config = load_skills_config()
for d, src in [(skills_dir, "custom"), (builtin_dir, "builtin")]:
candidate = os.path.join(d, name)
if os.path.isdir(candidate):
skill_dir = candidate
source = config.get(name, {}).get("source") or src
break
if not skill_dir:
click.echo(f"Error: Skill '{name}' not found.", err=True)
sys.exit(1)
skill_md = os.path.join(skill_dir, "SKILL.md")
if not os.path.exists(skill_md):
click.echo(f"Skill directory: {skill_dir}")
click.echo("No SKILL.md found.")
return
with open(skill_md, "r", encoding="utf-8") as f:
content = f.read()
config = load_skills_config()
entry = config.get(name, {})
enabled = entry.get("enabled", True)
status_str = click.style("✓ enabled", fg="green") if enabled else click.style("✗ disabled", fg="red")
click.echo(f"\n Skill: {name}")
click.echo(f" Source: {source}")
click.echo(f" Status: {status_str}")
click.echo(f" Path: {skill_dir}")
click.echo(f"\n{'' * 60}")
# Show first ~30 lines of SKILL.md as a preview
lines = content.split("\n")
preview = "\n".join(lines[:30])
click.echo(preview)
if len(lines) > 30:
click.echo(f"\n ... ({len(lines) - 30} more lines, see {skill_md})")
click.echo()

62
cli/utils.py Normal file
View File

@@ -0,0 +1,62 @@
"""Shared utilities for cow CLI."""
import os
import sys
import json
def get_project_root() -> str:
"""Get the CowAgent project root directory."""
# cli/ is directly under the project root
return os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
def get_workspace_dir() -> str:
"""Get the agent workspace directory from config, defaulting to ~/cow."""
config = load_config_json()
workspace = config.get("agent_workspace", "~/cow")
return os.path.expanduser(workspace)
def get_skills_dir() -> str:
"""Get the custom skills directory."""
return os.path.join(get_workspace_dir(), "skills")
def get_builtin_skills_dir() -> str:
"""Get the builtin skills directory."""
return os.path.join(get_project_root(), "skills")
def load_config_json() -> dict:
"""Load config.json from project root."""
config_path = os.path.join(get_project_root(), "config.json")
if not os.path.exists(config_path):
return {}
try:
with open(config_path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
return {}
def load_skills_config() -> dict:
"""Load skills_config.json from the custom skills directory."""
path = os.path.join(get_skills_dir(), "skills_config.json")
if not os.path.exists(path):
return {}
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
return {}
def ensure_sys_path():
"""Add project root to sys.path so we can import agent modules."""
root = get_project_root()
if root not in sys.path:
sys.path.insert(0, root)
SKILL_HUB_API = "https://skills.cowagent.ai/api"

View File

@@ -0,0 +1 @@
from .cow_cli import CowCliPlugin

1000
plugins/cow_cli/cow_cli.py Normal file

File diff suppressed because it is too large Load Diff

19
pyproject.toml Normal file
View File

@@ -0,0 +1,19 @@
[build-system]
requires = ["setuptools>=45.0"]
build-backend = "setuptools.build_meta"
[project]
name = "cowagent"
version = "0.0.1"
description = "CowAgent - AI Agent on WeChat and more"
requires-python = ">=3.9"
dependencies = [
"click>=8.0",
"requests>=2.28.2",
]
[project.scripts]
cow = "cli.cli:main"
[tool.setuptools.packages.find]
include = ["cli*"]

224
run.sh
View File

@@ -171,8 +171,11 @@ clone_project() {
mv chatgpt-on-wechat-master chatgpt-on-wechat
rm chatgpt-on-wechat.zip
else
git clone https://github.com/zhayujie/chatgpt-on-wechat.git || \
git clone https://gitee.com/zhayujie/chatgpt-on-wechat.git
GIT_HTTP_CONNECT_TIMEOUT=10 GIT_HTTP_LOW_SPEED_LIMIT=1024 GIT_HTTP_LOW_SPEED_TIME=15 \
git clone --depth 10 --progress https://github.com/zhayujie/chatgpt-on-wechat.git || {
echo -e "${YELLOW}⚠️ GitHub is slow, switching to Gitee mirror...${NC}"
git clone --depth 10 --progress https://gitee.com/zhayujie/chatgpt-on-wechat.git
}
if [[ $? -ne 0 ]]; then
echo -e "${RED}❌ Project clone failed. Please check network connection.${NC}"
exit 1
@@ -195,7 +198,10 @@ clone_project() {
# Install dependencies
install_dependencies() {
echo -e "${GREEN}📦 Installing dependencies...${NC}"
local PIP_MIRROR="-i https://pypi.tuna.tsinghua.edu.cn/simple"
local PIP_MIRROR=""
if curl -s --connect-timeout 5 https://pypi.tuna.tsinghua.edu.cn/simple/ > /dev/null 2>&1; then
PIP_MIRROR="-i https://pypi.tuna.tsinghua.edu.cn/simple"
fi
PIP_EXTRA_ARGS=""
if $PYTHON_CMD -c "import sys; exit(0 if sys.version_info >= (3, 11) else 1)" 2>/dev/null; then
@@ -242,6 +248,17 @@ install_dependencies() {
fi
rm -f /tmp/pip_install.log
# Register `cow` CLI command via editable install
echo -e "${YELLOW}Registering cow CLI...${NC}"
set +e
$PYTHON_CMD -m pip install -e . $PIP_EXTRA_ARGS $PIP_MIRROR > /dev/null 2>&1
if command -v cow &> /dev/null; then
echo -e "${GREEN}✅ cow CLI registered.${NC}"
else
echo -e "${YELLOW}⚠️ cow CLI not in PATH, you can still use: $PYTHON_CMD -m cli.cli${NC}"
fi
set -e
}
# Select model
@@ -527,23 +544,31 @@ start_project() {
echo -e "${GREEN}${EMOJI_ROCKET} Starting CowAgent...${NC}"
sleep 1
if [ ! -f "${BASE_DIR}/nohup.out" ]; then
touch "${BASE_DIR}/nohup.out"
local USE_COW=false
if command -v cow &> /dev/null; then
USE_COW=true
fi
OS_TYPE=$(uname)
if [[ "$OS_TYPE" == "Linux" ]]; then
# Linux: use setsid to detach from terminal
nohup setsid $PYTHON_CMD "${BASE_DIR}/app.py" > "${BASE_DIR}/nohup.out" 2>&1 &
echo -e "${GREEN}${EMOJI_COW} CowAgent started on Linux (using $PYTHON_CMD)${NC}"
elif [[ "$OS_TYPE" == "Darwin" ]]; then
# macOS: use nohup to prevent SIGHUP
nohup $PYTHON_CMD "${BASE_DIR}/app.py" > "${BASE_DIR}/nohup.out" 2>&1 &
echo -e "${GREEN}${EMOJI_COW} CowAgent started on macOS (using $PYTHON_CMD)${NC}"
if $USE_COW; then
cd "${BASE_DIR}"
cow start --no-logs
else
echo -e "${RED}❌ Unsupported OS: ${OS_TYPE}${NC}"
exit 1
if [ ! -f "${BASE_DIR}/nohup.out" ]; then
touch "${BASE_DIR}/nohup.out"
fi
OS_TYPE=$(uname)
if [[ "$OS_TYPE" == "Linux" ]]; then
nohup setsid $PYTHON_CMD "${BASE_DIR}/app.py" > "${BASE_DIR}/nohup.out" 2>&1 &
echo -e "${GREEN}${EMOJI_COW} CowAgent started on Linux (using $PYTHON_CMD)${NC}"
elif [[ "$OS_TYPE" == "Darwin" ]]; then
nohup $PYTHON_CMD "${BASE_DIR}/app.py" > "${BASE_DIR}/nohup.out" 2>&1 &
echo -e "${GREEN}${EMOJI_COW} CowAgent started on macOS (using $PYTHON_CMD)${NC}"
else
echo -e "${RED}❌ Unsupported OS: ${OS_TYPE}${NC}"
exit 1
fi
fi
sleep 2
@@ -554,14 +579,21 @@ start_project() {
echo -e "${CYAN}$ACCESS_INFO${NC}"
echo ""
echo -e "${CYAN}${BOLD}Management Commands:${NC}"
echo -e " ${GREEN}./run.sh stop${NC} Stop the service"
echo -e " ${GREEN}./run.sh restart${NC} Restart the service"
echo -e " ${GREEN}./run.sh status${NC} Check status"
echo -e " ${GREEN}./run.sh logs${NC} View logs"
if $USE_COW; then
echo -e " ${GREEN}cow stop${NC} Stop the service"
echo -e " ${GREEN}cow restart${NC} Restart the service"
echo -e " ${GREEN}cow status${NC} Check status"
echo -e " ${GREEN}cow logs${NC} View logs"
else
echo -e " ${GREEN}./run.sh stop${NC} Stop the service"
echo -e " ${GREEN}./run.sh restart${NC} Restart the service"
echo -e " ${GREEN}./run.sh status${NC} Check status"
echo -e " ${GREEN}./run.sh logs${NC} View logs"
fi
echo -e " ${GREEN}./run.sh update${NC} Update and restart"
echo -e "${CYAN}${BOLD}=========================================${NC}"
echo ""
echo -e "${YELLOW}Showing recent logs (Ctrl+C to exit, agent keeps running):${NC}"
sleep 2
tail -n 30 -f "${BASE_DIR}/nohup.out"
@@ -603,7 +635,7 @@ ensure_python_cmd() {
# Get service PID (empty string if not running)
get_pid() {
ensure_python_cmd > /dev/null 2>&1
ps ax | grep -i app.py | grep "${BASE_DIR}" | grep "$PYTHON_CMD" | grep -v grep | awk '{print $1}' | grep -E '^[0-9]+$'
ps ax | grep -i app.py | grep "${BASE_DIR}" | grep "$PYTHON_CMD" | grep -v grep | awk '{print $1}' | grep -E '^[0-9]+$' | head -1
}
# Check if service is running
@@ -611,94 +643,122 @@ is_running() {
[ -n "$(get_pid)" ]
}
# Check if cow CLI is available
has_cow() {
command -v cow &> /dev/null
}
# Start service
cmd_start() {
# Check if config.json exists
if [ ! -f "${BASE_DIR}/config.json" ]; then
echo -e "${RED}${EMOJI_CROSS} config.json not found${NC}"
echo -e "${YELLOW}Please run './run.sh' to configure first${NC}"
exit 1
fi
if is_running; then
echo -e "${YELLOW}${EMOJI_WARN} CowAgent is already running (PID: $(get_pid))${NC}"
echo -e "${YELLOW}Use './run.sh restart' to restart${NC}"
return
if has_cow; then
cd "${BASE_DIR}"
cow start
else
if is_running; then
echo -e "${YELLOW}${EMOJI_WARN} CowAgent is already running (PID: $(get_pid))${NC}"
echo -e "${YELLOW}Use './run.sh restart' to restart${NC}"
return
fi
check_python_version
start_project
fi
check_python_version
start_project
}
# Stop service
cmd_stop() {
echo -e "${GREEN}${EMOJI_STOP} Stopping CowAgent...${NC}"
if has_cow; then
cd "${BASE_DIR}"
cow stop
else
echo -e "${GREEN}${EMOJI_STOP} Stopping CowAgent...${NC}"
if ! is_running; then
echo -e "${YELLOW}${EMOJI_WARN} CowAgent is not running${NC}"
return
if ! is_running; then
echo -e "${YELLOW}${EMOJI_WARN} CowAgent is not running${NC}"
return
fi
pid=$(get_pid)
if [ -z "$pid" ] || ! echo "$pid" | grep -qE '^[0-9]+$'; then
echo -e "${RED}❌ Failed to get valid PID (got: ${pid})${NC}"
return 1
fi
echo -e "${GREEN}Found running process (PID: ${pid})${NC}"
kill ${pid}
sleep 3
if ps -p ${pid} > /dev/null 2>&1; then
echo -e "${YELLOW}⚠️ Process not stopped, forcing termination...${NC}"
kill -9 ${pid}
fi
echo -e "${GREEN}${EMOJI_CHECK} CowAgent stopped${NC}"
fi
pid=$(get_pid)
if [ -z "$pid" ] || ! echo "$pid" | grep -qE '^[0-9]+$'; then
echo -e "${RED}❌ Failed to get valid PID (got: ${pid})${NC}"
return 1
fi
echo -e "${GREEN}Found running process (PID: ${pid})${NC}"
kill ${pid}
sleep 3
if ps -p ${pid} > /dev/null 2>&1; then
echo -e "${YELLOW}⚠️ Process not stopped, forcing termination...${NC}"
kill -9 ${pid}
fi
echo -e "${GREEN}${EMOJI_CHECK} CowAgent stopped${NC}"
}
# Restart service
cmd_restart() {
cmd_stop
sleep 1
cmd_start
if has_cow; then
cd "${BASE_DIR}"
cow restart
else
cmd_stop
sleep 1
cmd_start
fi
}
# Check status
cmd_status() {
echo -e "${CYAN}${BOLD}=========================================${NC}"
echo -e "${CYAN}${BOLD} ${EMOJI_COW} CowAgent Status${NC}"
echo -e "${CYAN}${BOLD}=========================================${NC}"
if is_running; then
pid=$(get_pid)
echo -e "${GREEN}Status:${NC} ✅ Running"
echo -e "${GREEN}PID:${NC} ${pid}"
if [ -f "${BASE_DIR}/nohup.out" ]; then
echo -e "${GREEN}Logs:${NC} ${BASE_DIR}/nohup.out"
fi
if has_cow; then
cd "${BASE_DIR}"
cow status
else
echo -e "${YELLOW}Status:${NC} ⭐ Stopped"
echo -e "${CYAN}${BOLD}=========================================${NC}"
echo -e "${CYAN}${BOLD} ${EMOJI_COW} CowAgent Status${NC}"
echo -e "${CYAN}${BOLD}=========================================${NC}"
if is_running; then
pid=$(get_pid)
echo -e "${GREEN}Status:${NC} ✅ Running"
echo -e "${GREEN}PID:${NC} ${pid}"
if [ -f "${BASE_DIR}/nohup.out" ]; then
echo -e "${GREEN}Logs:${NC} ${BASE_DIR}/nohup.out"
fi
else
echo -e "${YELLOW}Status:${NC} ⭐ Stopped"
fi
if [ -f "${BASE_DIR}/config.json" ]; then
model=$(grep -o '"model"[[:space:]]*:[[:space:]]*"[^"]*"' "${BASE_DIR}/config.json" | cut -d'"' -f4)
channel=$(grep -o '"channel_type"[[:space:]]*:[[:space:]]*"[^"]*"' "${BASE_DIR}/config.json" | cut -d'"' -f4)
echo -e "${GREEN}Model:${NC} ${model}"
echo -e "${GREEN}Channel:${NC} ${channel}"
fi
echo -e "${CYAN}${BOLD}=========================================${NC}"
fi
if [ -f "${BASE_DIR}/config.json" ]; then
model=$(grep -o '"model"[[:space:]]*:[[:space:]]*"[^"]*"' "${BASE_DIR}/config.json" | cut -d'"' -f4)
channel=$(grep -o '"channel_type"[[:space:]]*:[[:space:]]*"[^"]*"' "${BASE_DIR}/config.json" | cut -d'"' -f4)
echo -e "${GREEN}Model:${NC} ${model}"
echo -e "${GREEN}Channel:${NC} ${channel}"
fi
echo -e "${CYAN}${BOLD}=========================================${NC}"
}
# View logs
cmd_logs() {
if [ -f "${BASE_DIR}/nohup.out" ]; then
echo -e "${YELLOW}Viewing logs (Ctrl+C to exit):${NC}"
tail -f "${BASE_DIR}/nohup.out"
if has_cow; then
cd "${BASE_DIR}"
cow logs -f
else
echo -e "${RED}❌ Log file not found: ${BASE_DIR}/nohup.out${NC}"
if [ -f "${BASE_DIR}/nohup.out" ]; then
echo -e "${YELLOW}Viewing logs (Ctrl+C to exit):${NC}"
tail -f "${BASE_DIR}/nohup.out"
else
echo -e "${RED}❌ Log file not found: ${BASE_DIR}/nohup.out${NC}"
fi
fi
}

447
scripts/run.ps1 Normal file
View File

@@ -0,0 +1,447 @@
#Requires -Version 5.1
<#
.SYNOPSIS
CowAgent installer & management script for Windows.
.DESCRIPTION
One-liner install:
irm https://raw.githubusercontent.com/zhayujie/chatgpt-on-wechat/master/scripts/run.ps1 | iex
Or from a local clone:
.\scripts\run.ps1 # install / configure
.\scripts\run.ps1 start # start service (delegates to cow CLI)
.\scripts\run.ps1 stop|restart|status|logs|config|update|help
#>
param(
[Parameter(Position = 0)]
[string]$Command = ""
)
$ErrorActionPreference = "Stop"
# ── colours ──────────────────────────────────────────────────────
function Write-Cow { param([string]$M) Write-Host $M -ForegroundColor Green }
function Write-Warn { param([string]$M) Write-Host $M -ForegroundColor Yellow }
function Write-Err { param([string]$M) Write-Host $M -ForegroundColor Red }
function Write-Info { param([string]$M) Write-Host $M -ForegroundColor Cyan }
# ── detect project directory ─────────────────────────────────────
$ScriptDir = if ($PSScriptRoot) { $PSScriptRoot } else { $PWD.Path }
$BaseDir = Split-Path $ScriptDir -Parent
$IsProjectDir = (Test-Path "$BaseDir\app.py") -and (Test-Path "$BaseDir\config-template.json")
if (-not $IsProjectDir) {
$BaseDir = $PWD.Path
$IsProjectDir = (Test-Path "$BaseDir\app.py") -and (Test-Path "$BaseDir\config-template.json")
}
# ── Python detection ─────────────────────────────────────────────
function Find-Python {
foreach ($cmd in @("python3", "python")) {
$bin = Get-Command $cmd -ErrorAction SilentlyContinue
if (-not $bin) { continue }
try {
$ver = & $bin.Source -c "import sys; v=sys.version_info; print(f'{v.major}.{v.minor}')" 2>$null
$parts = $ver -split '\.'
$major = [int]$parts[0]; $minor = [int]$parts[1]
if ($major -eq 3 -and $minor -ge 9 -and $minor -le 13) {
return $bin.Source
}
} catch {}
}
return $null
}
$PythonCmd = Find-Python
function Assert-Python {
if (-not $PythonCmd) {
Write-Err "Python 3.9-3.13 not found. Please install from https://www.python.org/downloads/"
exit 1
}
Write-Cow "Found Python: $PythonCmd"
}
# ── clone project ────────────────────────────────────────────────
function Install-Project {
if (Test-Path "chatgpt-on-wechat") {
Write-Warn "Directory 'chatgpt-on-wechat' already exists."
$choice = Read-Host "Overwrite(o), backup(b), or quit(q)? [default: b]"
if (-not $choice) { $choice = "b" }
switch ($choice.ToLower()) {
"o" { Remove-Item -Recurse -Force "chatgpt-on-wechat" }
"b" {
$backup = "chatgpt-on-wechat_backup_$(Get-Date -Format 'yyyyMMddHHmmss')"
Rename-Item "chatgpt-on-wechat" $backup
Write-Cow "Backed up to '$backup'"
}
"q" { Write-Err "Installation cancelled."; exit 1 }
default { Write-Err "Invalid choice."; exit 1 }
}
}
$gitBin = Get-Command git -ErrorAction SilentlyContinue
if (-not $gitBin) {
Write-Err "Git not found. Please install from https://git-scm.com/download/win"
exit 1
}
Write-Cow "Cloning CowAgent project..."
git clone https://github.com/zhayujie/chatgpt-on-wechat.git 2>$null
if ($LASTEXITCODE -ne 0) {
Write-Warn "GitHub failed, trying Gitee..."
git clone https://gitee.com/zhayujie/chatgpt-on-wechat.git
if ($LASTEXITCODE -ne 0) {
Write-Err "Clone failed. Check your network."
exit 1
}
}
Set-Location "chatgpt-on-wechat"
$script:BaseDir = $PWD.Path
$script:IsProjectDir = $true
Write-Cow "Project cloned: $BaseDir"
}
# ── install dependencies ─────────────────────────────────────────
function Install-Dependencies {
Write-Cow "Installing dependencies..."
& $PythonCmd -m pip install --upgrade pip setuptools wheel 2>$null | Out-Null
& $PythonCmd -m pip install -r "$BaseDir\requirements.txt" 2>&1 | ForEach-Object { Write-Host $_ }
if ($LASTEXITCODE -ne 0) {
Write-Warn "Some dependencies may have issues, but continuing..."
}
Write-Cow "Registering cow CLI..."
& $PythonCmd -m pip install -e $BaseDir 2>$null | Out-Null
$cowBin = Get-Command cow -ErrorAction SilentlyContinue
if ($cowBin) {
Write-Cow "cow CLI registered."
} else {
Write-Warn "cow CLI not in PATH. You can use: $PythonCmd -m cli.cli"
}
}
# ── model selection ──────────────────────────────────────────────
$ModelChoices = @{
"1" = @{ Provider = "MiniMax"; Default = "MiniMax-M2.7"; Key = "MINIMAX_KEY" }
"2" = @{ Provider = "Zhipu AI"; Default = "glm-5-turbo"; Key = "ZHIPU_KEY" }
"3" = @{ Provider = "Kimi (Moonshot)"; Default = "kimi-k2.5"; Key = "MOONSHOT_KEY" }
"4" = @{ Provider = "Doubao (Volcengine Ark)"; Default = "doubao-seed-2-0-code-preview-260215"; Key = "ARK_KEY" }
"5" = @{ Provider = "Qwen (DashScope)"; Default = "qwen3.5-plus"; Key = "DASHSCOPE_KEY" }
"6" = @{ Provider = "Claude"; Default = "claude-sonnet-4-6"; Key = "CLAUDE_KEY"; Base = "https://api.anthropic.com/v1" }
"7" = @{ Provider = "Gemini"; Default = "gemini-3.1-pro-preview"; Key = "GEMINI_KEY"; Base = "https://generativelanguage.googleapis.com" }
"8" = @{ Provider = "OpenAI GPT"; Default = "gpt-5.4"; Key = "OPENAI_KEY"; Base = "https://api.openai.com/v1" }
"9" = @{ Provider = "LinkAI"; Default = "MiniMax-M2.7"; Key = "LINKAI_KEY" }
}
function Select-Model {
Write-Info "========================================="
Write-Info " Select AI Model"
Write-Info "========================================="
Write-Host "1) MiniMax (MiniMax-M2.7, MiniMax-M2.5, etc.)"
Write-Host "2) Zhipu AI (glm-5-turbo, glm-5, etc.)"
Write-Host "3) Kimi (kimi-k2.5, kimi-k2, etc.)"
Write-Host "4) Doubao (doubao-seed-2-0-code-preview-260215, etc.)"
Write-Host "5) Qwen (qwen3.5-plus, qwen3-max, qwq-plus, etc.)"
Write-Host "6) Claude (claude-sonnet-4-6, claude-opus-4-6, etc.)"
Write-Host "7) Gemini (gemini-3.1-flash-lite-preview, gemini-3.1-pro-preview, etc.)"
Write-Host "8) OpenAI GPT (gpt-5.4, gpt-5.2, gpt-4.1, etc.)"
Write-Host "9) LinkAI (access multiple models via one API)"
Write-Host ""
do {
$choice = Read-Host "Enter your choice [default: 1 - MiniMax]"
if (-not $choice) { $choice = "1" }
} while ($choice -notmatch '^[1-9]$')
$m = $ModelChoices[$choice]
Write-Cow "Configuring $($m.Provider)..."
$script:ApiKey = Read-Host "Enter $($m.Provider) API Key"
$model = Read-Host "Enter model name [default: $($m.Default)]"
if (-not $model) { $model = $m.Default }
$script:ModelName = $model
$script:KeyName = $m.Key
$script:UseLinkai = ($choice -eq "9")
if ($m.Base) {
$base = Read-Host "Enter API Base URL [default: $($m.Base)]"
if (-not $base) { $base = $m.Base }
$script:ApiBase = $base
} else {
$script:ApiBase = ""
}
$script:ModelChoice = $choice
}
# ── channel selection ────────────────────────────────────────────
function Select-Channel {
Write-Host ""
Write-Info "========================================="
Write-Info " Select Communication Channel"
Write-Info "========================================="
Write-Host "1) Weixin"
Write-Host "2) Feishu"
Write-Host "3) DingTalk"
Write-Host "4) WeCom Bot"
Write-Host "5) QQ"
Write-Host "6) WeCom App"
Write-Host "7) Web"
Write-Host ""
do {
$choice = Read-Host "Enter your choice [default: 1 - Weixin]"
if (-not $choice) { $choice = "1" }
} while ($choice -notmatch '^[1-7]$')
$script:ChannelExtra = @{}
switch ($choice) {
"1" { $script:ChannelType = "weixin" }
"2" {
$script:ChannelType = "feishu"
$script:ChannelExtra["feishu_app_id"] = Read-Host "Enter Feishu App ID"
$script:ChannelExtra["feishu_app_secret"] = Read-Host "Enter Feishu App Secret"
}
"3" {
$script:ChannelType = "dingtalk"
$script:ChannelExtra["dingtalk_client_id"] = Read-Host "Enter DingTalk Client ID"
$script:ChannelExtra["dingtalk_client_secret"] = Read-Host "Enter DingTalk Client Secret"
}
"4" {
$script:ChannelType = "wecom_bot"
$script:ChannelExtra["wecom_bot_id"] = Read-Host "Enter WeCom Bot ID"
$script:ChannelExtra["wecom_bot_secret"] = Read-Host "Enter WeCom Bot Secret"
}
"5" {
$script:ChannelType = "qq"
$script:ChannelExtra["qq_app_id"] = Read-Host "Enter QQ App ID"
$script:ChannelExtra["qq_app_secret"] = Read-Host "Enter QQ App Secret"
}
"6" {
$script:ChannelType = "wechatcom_app"
$script:ChannelExtra["wechatcom_corp_id"] = Read-Host "Enter WeChat Corp ID"
$script:ChannelExtra["wechatcomapp_token"] = Read-Host "Enter WeChat Com App Token"
$script:ChannelExtra["wechatcomapp_secret"] = Read-Host "Enter WeChat Com App Secret"
$script:ChannelExtra["wechatcomapp_agent_id"] = Read-Host "Enter WeChat Com App Agent ID"
$script:ChannelExtra["wechatcomapp_aes_key"] = Read-Host "Enter WeChat Com App AES Key"
$port = Read-Host "Enter port [default: 9898]"
if (-not $port) { $port = "9898" }
$script:ChannelExtra["wechatcomapp_port"] = [int]$port
}
"7" {
$script:ChannelType = "web"
$port = Read-Host "Enter web port [default: 9899]"
if (-not $port) { $port = "9899" }
$script:ChannelExtra["web_port"] = [int]$port
}
}
}
# ── generate config.json ─────────────────────────────────────────
function New-ConfigFile {
Write-Cow "Generating config.json..."
$config = [ordered]@{
channel_type = $ChannelType
model = $ModelName
open_ai_api_key = ""
open_ai_api_base = "https://api.openai.com/v1"
claude_api_key = ""
claude_api_base = "https://api.anthropic.com/v1"
gemini_api_key = ""
gemini_api_base = "https://generativelanguage.googleapis.com"
zhipu_ai_api_key = ""
moonshot_api_key = ""
ark_api_key = ""
dashscope_api_key = ""
minimax_api_key = ""
voice_to_text = "openai"
text_to_voice = "openai"
voice_reply_voice = $false
speech_recognition = $true
group_speech_recognition = $false
use_linkai = $UseLinkai
linkai_api_key = ""
linkai_app_code = ""
agent = $true
agent_max_context_tokens = 40000
agent_max_context_turns = 30
agent_max_steps = 15
}
# Set the correct API key field
$keyMap = @{
OPENAI_KEY = "open_ai_api_key"
CLAUDE_KEY = "claude_api_key"
GEMINI_KEY = "gemini_api_key"
ZHIPU_KEY = "zhipu_ai_api_key"
MOONSHOT_KEY = "moonshot_api_key"
ARK_KEY = "ark_api_key"
DASHSCOPE_KEY = "dashscope_api_key"
MINIMAX_KEY = "minimax_api_key"
LINKAI_KEY = "linkai_api_key"
}
if ($keyMap.ContainsKey($KeyName)) {
$config[$keyMap[$KeyName]] = $ApiKey
}
# Set API base if provided
$baseMap = @{
"6" = "claude_api_base"
"7" = "gemini_api_base"
"8" = "open_ai_api_base"
}
if ($ApiBase -and $baseMap.ContainsKey($ModelChoice)) {
$config[$baseMap[$ModelChoice]] = $ApiBase
}
# Merge channel-specific fields
foreach ($k in $ChannelExtra.Keys) {
$config[$k] = $ChannelExtra[$k]
}
$config | ConvertTo-Json -Depth 5 | Set-Content -Path "$BaseDir\config.json" -Encoding UTF8
Write-Cow "Configuration file created."
}
# ── start via cow CLI ─────────────────────────────────────────────
function Start-CowAgent {
Write-Cow "Starting CowAgent..."
$cowBin = Get-Command cow -ErrorAction SilentlyContinue
if ($cowBin) {
& cow start
} else {
Write-Warn "cow CLI not found, starting directly..."
& $PythonCmd "$BaseDir\app.py"
}
}
# ── delegate management commands to cow CLI ──────────────────────
function Invoke-CowCommand {
param([string]$Cmd)
$cowBin = Get-Command cow -ErrorAction SilentlyContinue
if ($cowBin) {
& cow $Cmd
} else {
Write-Err "cow CLI not found. Run this script without arguments first to install."
exit 1
}
}
# ── usage ─────────────────────────────────────────────────────────
function Show-Usage {
Write-Info "========================================="
Write-Info " CowAgent Management Script (Windows)"
Write-Info "========================================="
Write-Host ""
Write-Host "Usage:"
Write-Host " .\run.ps1 # Install / Configure"
Write-Host " .\run.ps1 <command> # Management command"
Write-Host ""
Write-Host "Commands:"
Write-Host " start Start the service"
Write-Host " stop Stop the service"
Write-Host " restart Restart the service"
Write-Host " status Check service status"
Write-Host " logs View logs"
Write-Host " config Reconfigure project"
Write-Host " update Update and restart"
Write-Host " help Show this message"
Write-Host ""
}
# ── install mode ──────────────────────────────────────────────────
function Install-Mode {
Clear-Host
Write-Info "========================================="
Write-Info " CowAgent Installation (Windows)"
Write-Info "========================================="
Write-Host ""
if ($IsProjectDir) {
Write-Cow "Detected existing project directory."
if (Test-Path "$BaseDir\config.json") {
Write-Cow "Project already configured."
Write-Host ""
Show-Usage
return
}
Write-Warn "No config.json found. Let's configure your project!"
Write-Host ""
Assert-Python
} else {
Assert-Python
Install-Project
}
Install-Dependencies
Select-Model
Select-Channel
New-ConfigFile
Write-Host ""
$startNow = Read-Host "Start CowAgent now? [Y/n]"
if ($startNow -ne "n" -and $startNow -ne "N") {
Start-CowAgent
} else {
Write-Cow "Installation complete!"
Write-Host ""
Write-Host "To start manually:"
Write-Host " cd $BaseDir"
Write-Host " cow start"
}
}
# ── update ────────────────────────────────────────────────────────
function Update-Project {
Write-Cow "Updating CowAgent..."
Set-Location $BaseDir
# Stop if running
$cowBin = Get-Command cow -ErrorAction SilentlyContinue
if ($cowBin) { & cow stop 2>$null }
if (Test-Path "$BaseDir\.git") {
Write-Cow "Pulling latest code..."
git pull 2>$null
if ($LASTEXITCODE -ne 0) {
Write-Warn "GitHub failed, trying Gitee..."
git remote set-url origin https://gitee.com/zhayujie/chatgpt-on-wechat.git
git pull
}
} else {
Write-Warn "Not a git repository, skipping code update."
}
Assert-Python
Install-Dependencies
Start-CowAgent
}
# ── main ──────────────────────────────────────────────────────────
switch ($Command.ToLower()) {
"" { Install-Mode }
"start" { Invoke-CowCommand "start" }
"stop" { Invoke-CowCommand "stop" }
"restart" { Invoke-CowCommand "restart" }
"status" { Invoke-CowCommand "status" }
"logs" { Invoke-CowCommand "logs" }
"config" {
Assert-Python
Install-Dependencies
Select-Model
Select-Channel
New-ConfigFile
$r = Read-Host "Restart service now? [Y/n]"
if ($r -ne "n" -and $r -ne "N") { Invoke-CowCommand "restart" }
}
"update" { Update-Project }
"help" { Show-Usage }
default {
Write-Err "Unknown command: $Command"
Show-Usage
exit 1
}
}

View File

@@ -4,6 +4,7 @@ description: Call LinkAI applications and workflows. Use bash with curl to invok
homepage: https://link-ai.tech
metadata:
emoji: 🤖
default_enabled: false
requires:
bins: ["curl"]
env: ["LINKAI_API_KEY"]