Compare commits

...

9 Commits

Author SHA1 Message Date
zhayujie
8ea2455766 feat(cli): add browser install cmd 2026-03-29 15:09:07 +08:00
zhayujie
3458621147 feat: add browser tool 2026-03-29 14:59:06 +08:00
zhayujie
079df5a47c feat: support batch skill install from zip and github 2026-03-29 14:38:11 +08:00
zhayujie
ddb07c65a1 feat: support github zip-first download, gitLab, git@ ssh, local path 2026-03-29 13:45:15 +08:00
zhayujie
9b21cd222b fix: update run.sh 2026-03-28 19:36:51 +08:00
zhayujie
90f736843f fix: add click dependencies 2026-03-28 19:35:15 +08:00
zhayujie
13c020eb61 fix(cli): cli output in wecom_bot 2026-03-28 19:26:59 +08:00
zhayujie
dbc06dbe95 fix: use new run.sh when updating 2026-03-28 19:16:41 +08:00
zhayujie
23d097bc1c Merge pull request #2726 from zhayujie/feat-cow-cli
feat: cow cli in terminal and chat
2026-03-28 19:01:56 +08:00
13 changed files with 1533 additions and 424 deletions

View File

@@ -87,25 +87,25 @@ FileSave = _optional_tools.get('FileSave')
Terminal = _optional_tools.get('Terminal')
# Delayed import for BrowserTool
# BrowserTool (requires playwright)
def _import_browser_tool():
from common.log import logger
try:
from agent.tools.browser.browser_tool import BrowserTool
return BrowserTool
except ImportError:
# Return a placeholder class that will prompt the user to install dependencies when instantiated
class BrowserToolPlaceholder:
def __init__(self, *args, **kwargs):
raise ImportError(
"The 'browser-use' package is required to use BrowserTool. "
"Please install it with 'pip install browser-use>=0.1.40'."
)
except ImportError as e:
logger.info(
f"[Tools] BrowserTool not loaded - missing dependency: {e}\n"
f" To enable browser tool, run:\n"
f" pip install playwright\n"
f" playwright install chromium"
)
return None
except Exception as e:
logger.error(f"[Tools] BrowserTool failed to load: {e}")
return None
return BrowserToolPlaceholder
# Dynamically set BrowserTool
# BrowserTool = _import_browser_tool()
BrowserTool = _import_browser_tool()
# Export all tools (including optional ones that might be None)
__all__ = [
@@ -124,8 +124,7 @@ __all__ = [
'WebSearch',
'WebFetch',
'Vision',
# Optional tools (may be None if dependencies not available)
# 'BrowserTool'
'BrowserTool',
]
"""

View File

@@ -0,0 +1,3 @@
from agent.tools.browser.browser_tool import BrowserTool
__all__ = ["BrowserTool"]

View File

@@ -0,0 +1,509 @@
"""
Browser service - Playwright wrapper managing browser lifecycle and page operations.
Lazily launches a Chromium instance on first use, reuses it across tool calls,
and cleans up on close(). Headless mode is auto-detected based on platform and
display availability.
"""
import os
import sys
import re
import uuid
from typing import Optional, Dict, Any, List
from common.log import logger
from playwright.sync_api import sync_playwright, Browser, BrowserContext, Page, Playwright
# ---------------------------------------------------------------------------
# Snapshot DOM helpers
# ---------------------------------------------------------------------------
# Tags that typically carry useful content for an agent
_INTERACTIVE_TAGS = {
"a", "button", "input", "textarea", "select", "option",
"label", "details", "summary",
}
_SEMANTIC_TAGS = {
"h1", "h2", "h3", "h4", "h5", "h6",
"p", "li", "td", "th", "caption", "figcaption", "blockquote", "pre", "code",
"nav", "main", "article", "section", "header", "footer", "form", "table",
"img", "video", "audio",
}
_KEEP_TAGS = _INTERACTIVE_TAGS | _SEMANTIC_TAGS
_SNAPSHOT_JS = """
() => {
const KEEP = new Set(%s);
const INTERACTIVE = new Set(%s);
const SKIP = new Set(["script","style","noscript","svg","path","meta","link","br","hr"]);
let refCounter = 0;
const refMap = {};
function visible(el) {
if (!(el instanceof HTMLElement)) return true;
const st = window.getComputedStyle(el);
if (st.display === "none" || st.visibility === "hidden") return false;
if (parseFloat(st.opacity) === 0) return false;
return true;
}
function walk(node) {
if (node.nodeType === Node.TEXT_NODE) {
const t = node.textContent.trim();
return t ? t : null;
}
if (node.nodeType !== Node.ELEMENT_NODE) return null;
const tag = node.tagName.toLowerCase();
if (SKIP.has(tag)) return null;
if (!visible(node)) return null;
const children = [];
for (const ch of node.childNodes) {
const r = walk(ch);
if (r !== null) {
if (typeof r === "string") children.push(r);
else children.push(r);
}
}
const keep = KEEP.has(tag);
if (!keep) {
// Unwrap: promote children
if (children.length === 0) return null;
if (children.length === 1) return children[0];
return children;
}
const obj = { tag };
if (INTERACTIVE.has(tag)) {
refCounter++;
obj.ref = refCounter;
refMap[refCounter] = node;
}
// Attributes
if (tag === "a" && node.href) obj.href = node.getAttribute("href");
if (tag === "img") {
obj.alt = node.alt || "";
obj.src = node.getAttribute("src") || "";
}
if (tag === "input" || tag === "textarea" || tag === "select") {
obj.type = node.type || "text";
obj.name = node.name || undefined;
obj.value = node.value || undefined;
obj.placeholder = node.placeholder || undefined;
if (node.disabled) obj.disabled = true;
if (tag === "input" && node.type === "checkbox") obj.checked = node.checked;
}
if (tag === "button") {
if (node.disabled) obj.disabled = true;
}
if (tag === "option") {
obj.value = node.value;
if (node.selected) obj.selected = true;
}
if (tag === "label" && node.htmlFor) obj.for = node.htmlFor;
// Role / aria-label
const role = node.getAttribute("role");
if (role) obj.role = role;
const ariaLabel = node.getAttribute("aria-label");
if (ariaLabel) obj.ariaLabel = ariaLabel;
// Children
if (children.length === 1 && typeof children[0] === "string") {
obj.text = children[0];
} else if (children.length > 0) {
obj.children = children;
}
return obj;
}
// Store refMap on window for later use by click/fill actions
const result = walk(document.body);
window.__cowRefMap = refMap;
return { tree: result, refCount: refCounter };
}
""" % (
str(list(_KEEP_TAGS)),
str(list(_INTERACTIVE_TAGS)),
)
def _should_use_headless() -> bool:
"""Decide headless mode: headless on Linux servers without display, headed elsewhere."""
if sys.platform in ("win32", "darwin"):
return False
# Linux: check for display
if os.environ.get("DISPLAY") or os.environ.get("WAYLAND_DISPLAY"):
return False
return True
def _flatten_tree(node, indent=0) -> List[str]:
"""Convert snapshot tree to compact text lines for LLM consumption."""
if node is None:
return []
if isinstance(node, str):
return [" " * indent + node]
if isinstance(node, list):
lines = []
for child in node:
lines.extend(_flatten_tree(child, indent))
return lines
if not isinstance(node, dict):
return []
tag = node.get("tag", "?")
ref = node.get("ref")
parts = [tag]
if ref:
parts[0] = f"[{ref}] {tag}"
# Inline attributes
for attr in ("type", "name", "href", "alt", "role", "ariaLabel", "placeholder", "value"):
val = node.get(attr)
if val:
# Truncate long values
s = str(val)
if len(s) > 80:
s = s[:77] + "..."
parts.append(f'{attr}="{s}"')
for flag in ("disabled", "checked", "selected"):
if node.get(flag):
parts.append(flag)
prefix = " " * indent
header = prefix + " ".join(parts)
text = node.get("text")
if text:
# Truncate long text
if len(text) > 120:
text = text[:117] + "..."
header += f": {text}"
lines = [header]
children = node.get("children", [])
for child in children:
lines.extend(_flatten_tree(child, indent + 2))
return lines
class BrowserService:
"""Manages a single Playwright browser instance with page operations."""
def __init__(self, config: Optional[Dict[str, Any]] = None):
self._config = config or {}
self._playwright: Optional[Playwright] = None
self._browser: Optional[Browser] = None
self._context: Optional[BrowserContext] = None
self._page: Optional[Page] = None
self._headless: Optional[bool] = None
self._screenshot_dir: Optional[str] = None
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
def _ensure_browser(self):
"""Lazily launch browser on first use."""
if self._page and not self._page.is_closed():
return
if self._headless is None:
headless_cfg = self._config.get("headless")
self._headless = headless_cfg if headless_cfg is not None else _should_use_headless()
launch_args = ["--disable-dev-shm-usage"]
if self._headless:
launch_args.append("--no-sandbox")
extra_args = self._config.get("launch_args", [])
if extra_args:
launch_args.extend(extra_args)
viewport_w = self._config.get("viewport_width", 1280)
viewport_h = self._config.get("viewport_height", 720)
if not self._playwright:
self._playwright = sync_playwright().start()
logger.info(f"[Browser] Launching Chromium (headless={self._headless})")
self._browser = self._playwright.chromium.launch(
headless=self._headless,
args=launch_args,
)
self._context = self._browser.new_context(
viewport={"width": viewport_w, "height": viewport_h},
user_agent=(
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/131.0.0.0 Safari/537.36"
),
)
self._page = self._context.new_page()
logger.info("[Browser] Browser ready")
@property
def page(self) -> Page:
self._ensure_browser()
return self._page
def close(self):
"""Release all browser resources."""
try:
if self._context:
self._context.close()
except Exception as e:
logger.debug(f"[Browser] context close error: {e}")
try:
if self._browser:
self._browser.close()
except Exception as e:
logger.debug(f"[Browser] browser close error: {e}")
try:
if self._playwright:
self._playwright.stop()
except Exception as e:
logger.debug(f"[Browser] playwright stop error: {e}")
self._page = None
self._context = None
self._browser = None
self._playwright = None
logger.info("[Browser] Browser closed")
# ------------------------------------------------------------------
# Actions
# ------------------------------------------------------------------
def navigate(self, url: str, timeout: int = 30000) -> Dict[str, Any]:
"""Navigate to a URL and return page info."""
page = self.page
try:
resp = page.goto(url, wait_until="domcontentloaded", timeout=timeout)
status = resp.status if resp else None
except Exception as e:
return {"error": f"Navigation failed: {e}"}
return {
"url": page.url,
"title": page.title(),
"status": status,
}
def snapshot(self, selector: Optional[str] = None) -> str:
"""
Return a compact text representation of the page DOM for LLM consumption.
Interactive elements get numeric refs usable in click/fill actions.
"""
page = self.page
try:
target = selector or "body"
result = page.evaluate(_SNAPSHOT_JS)
except Exception as e:
return f"[Snapshot error: {e}]"
tree = result.get("tree")
ref_count = result.get("refCount", 0)
lines = _flatten_tree(tree)
header = f"Page: {page.title()} ({page.url})\nInteractive elements: {ref_count}\n---"
body = "\n".join(lines)
# Limit output size
max_chars = self._config.get("snapshot_max_chars", 30000)
if len(body) > max_chars:
body = body[:max_chars] + "\n... [snapshot truncated]"
return f"{header}\n{body}"
def screenshot(self, full_page: bool = False, cwd: str = "") -> str:
"""Take a screenshot and save to workspace/tmp. Returns file path."""
page = self.page
save_dir = self._get_screenshot_dir(cwd)
filename = f"screenshot_{uuid.uuid4().hex[:8]}.png"
filepath = os.path.join(save_dir, filename)
page.screenshot(path=filepath, full_page=full_page)
logger.info(f"[Browser] Screenshot saved: {filepath}")
return filepath
def click(self, ref: Optional[int] = None, selector: Optional[str] = None,
timeout: int = 5000) -> Dict[str, Any]:
"""Click an element by snapshot ref or CSS selector."""
page = self.page
try:
if ref is not None:
result = page.evaluate(f"""
() => {{
const el = window.__cowRefMap && window.__cowRefMap[{ref}];
if (!el) return {{ error: "ref {ref} not found. Run snapshot first." }};
el.click();
return {{ clicked: true, tag: el.tagName.toLowerCase() }};
}}
""")
if result.get("error"):
return result
page.wait_for_timeout(500)
return result
elif selector:
page.click(selector, timeout=timeout)
return {"clicked": True, "selector": selector}
else:
return {"error": "Provide either ref (from snapshot) or selector"}
except Exception as e:
return {"error": f"Click failed: {e}"}
def fill(self, text: str, ref: Optional[int] = None,
selector: Optional[str] = None, timeout: int = 5000) -> Dict[str, Any]:
"""Fill text into an input/textarea by snapshot ref or CSS selector."""
page = self.page
try:
if ref is not None:
result = page.evaluate(f"""
() => {{
const el = window.__cowRefMap && window.__cowRefMap[{ref}];
if (!el) return {{ error: "ref {ref} not found. Run snapshot first." }};
el.focus();
el.value = "";
return {{ tag: el.tagName.toLowerCase(), name: el.name || "" }};
}}
""")
if result.get("error"):
return result
page.keyboard.type(text)
return {"filled": True, "ref": ref, "text": text}
elif selector:
page.fill(selector, text, timeout=timeout)
return {"filled": True, "selector": selector, "text": text}
else:
return {"error": "Provide either ref (from snapshot) or selector"}
except Exception as e:
return {"error": f"Fill failed: {e}"}
def select(self, value: str, ref: Optional[int] = None,
selector: Optional[str] = None, timeout: int = 5000) -> Dict[str, Any]:
"""Select an option in a <select> element."""
page = self.page
try:
if ref is not None:
result = page.evaluate(f"""
() => {{
const el = window.__cowRefMap && window.__cowRefMap[{ref}];
if (!el || el.tagName.toLowerCase() !== "select")
return {{ error: "ref {ref} is not a <select> element" }};
el.value = {repr(value)};
el.dispatchEvent(new Event("change", {{ bubbles: true }}));
return {{ selected: true, value: el.value }};
}}
""")
return result
elif selector:
page.select_option(selector, value, timeout=timeout)
return {"selected": True, "selector": selector, "value": value}
else:
return {"error": "Provide either ref (from snapshot) or selector"}
except Exception as e:
return {"error": f"Select failed: {e}"}
def scroll(self, direction: str = "down", amount: int = 500) -> Dict[str, Any]:
"""Scroll the page."""
page = self.page
delta_map = {
"down": (0, amount),
"up": (0, -amount),
"right": (amount, 0),
"left": (-amount, 0),
}
dx, dy = delta_map.get(direction, (0, amount))
try:
page.mouse.wheel(dx, dy)
page.wait_for_timeout(300)
scroll_info = page.evaluate("""
() => ({
scrollX: window.scrollX,
scrollY: window.scrollY,
scrollHeight: document.documentElement.scrollHeight,
clientHeight: document.documentElement.clientHeight
})
""")
return {"scrolled": direction, "amount": amount, **scroll_info}
except Exception as e:
return {"error": f"Scroll failed: {e}"}
def wait(self, selector: Optional[str] = None, timeout: int = 5000,
state: str = "visible") -> Dict[str, Any]:
"""Wait for a selector to appear or a fixed timeout."""
page = self.page
try:
if selector:
page.wait_for_selector(selector, timeout=timeout, state=state)
return {"waited": True, "selector": selector, "state": state}
else:
page.wait_for_timeout(timeout)
return {"waited": True, "timeout_ms": timeout}
except Exception as e:
return {"error": f"Wait failed: {e}"}
def go_back(self) -> Dict[str, Any]:
page = self.page
try:
page.go_back(wait_until="domcontentloaded", timeout=10000)
return {"url": page.url, "title": page.title()}
except Exception as e:
return {"error": f"Go back failed: {e}"}
def go_forward(self) -> Dict[str, Any]:
page = self.page
try:
page.go_forward(wait_until="domcontentloaded", timeout=10000)
return {"url": page.url, "title": page.title()}
except Exception as e:
return {"error": f"Go forward failed: {e}"}
def get_text(self, selector: str) -> Dict[str, Any]:
"""Get text content of an element."""
page = self.page
try:
text = page.text_content(selector, timeout=5000)
return {"text": text or ""}
except Exception as e:
return {"error": f"Get text failed: {e}"}
def evaluate(self, script: str) -> Dict[str, Any]:
"""Execute JavaScript in the page context."""
page = self.page
try:
result = page.evaluate(script)
return {"result": result}
except Exception as e:
return {"error": f"Evaluate failed: {e}"}
def press(self, key: str) -> Dict[str, Any]:
"""Press a keyboard key (e.g. Enter, Tab, Escape)."""
page = self.page
try:
page.keyboard.press(key)
page.wait_for_timeout(300)
return {"pressed": key}
except Exception as e:
return {"error": f"Press failed: {e}"}
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _get_screenshot_dir(self, cwd: str = "") -> str:
if self._screenshot_dir and os.path.isdir(self._screenshot_dir):
return self._screenshot_dir
base = cwd or os.getcwd()
d = os.path.join(base, "tmp")
os.makedirs(d, exist_ok=True)
self._screenshot_dir = d
return d

View File

@@ -0,0 +1,287 @@
"""
Browser tool - Control a Chromium browser for web navigation and interaction.
Uses Playwright under the hood. Browser instance is lazily started on first
use, reused across tool calls within the same session, and cleaned up via
close().
"""
import json
import os
from typing import Dict, Any, Optional
from agent.tools.base_tool import BaseTool, ToolResult
from agent.tools.browser.browser_service import BrowserService
from common.log import logger
class BrowserTool(BaseTool):
"""Single tool exposing all browser actions via an 'action' parameter."""
name: str = "browser"
description: str = (
"Control a browser to navigate web pages, interact with elements, and extract content. "
"Actions: navigate, snapshot, click, fill, select, scroll, screenshot, wait, back, forward, "
"get_text, press, evaluate.\n\n"
"Workflow: navigate to a URL → snapshot to see the page (elements get numeric refs) → "
"use refs in click/fill/select actions → snapshot again to verify.\n\n"
"Use snapshot (not screenshot) as the primary way to read page content."
)
params: dict = {
"type": "object",
"properties": {
"action": {
"type": "string",
"description": (
"The browser action to perform. One of: "
"navigate, snapshot, click, fill, select, scroll, "
"screenshot, wait, back, forward, get_text, press, evaluate"
),
"enum": [
"navigate", "snapshot", "click", "fill", "select", "scroll",
"screenshot", "wait", "back", "forward", "get_text", "press",
"evaluate"
]
},
"url": {
"type": "string",
"description": "URL to navigate to (for 'navigate' action)"
},
"ref": {
"type": "integer",
"description": "Element ref number from snapshot (for click/fill/select)"
},
"selector": {
"type": "string",
"description": "CSS selector as fallback when ref is unavailable (for click/fill/select/wait/get_text)"
},
"text": {
"type": "string",
"description": "Text to type (for 'fill' action)"
},
"value": {
"type": "string",
"description": "Option value (for 'select' action)"
},
"key": {
"type": "string",
"description": "Key to press, e.g. Enter, Tab, Escape (for 'press' action)"
},
"direction": {
"type": "string",
"description": "Scroll direction: up, down, left, right (for 'scroll' action, default: down)"
},
"script": {
"type": "string",
"description": "JavaScript code to execute (for 'evaluate' action)"
},
"full_page": {
"type": "boolean",
"description": "Capture full page screenshot (for 'screenshot' action, default: false)"
},
"timeout": {
"type": "integer",
"description": "Timeout in milliseconds (optional, default varies by action)"
}
},
"required": ["action"]
}
_shared_service: Optional[BrowserService] = None
def __init__(self, config: dict = None):
self.config = config or {}
self.cwd = self.config.get("cwd", os.getcwd())
self._service: Optional[BrowserService] = None
def _get_service(self) -> BrowserService:
"""Get or create the browser service, sharing across copies."""
if self._service is not None:
return self._service
# Reuse shared service across tool copies within the same session
if BrowserTool._shared_service is not None:
self._service = BrowserTool._shared_service
return self._service
self._service = BrowserService(self.config)
BrowserTool._shared_service = self._service
return self._service
def execute(self, args: Dict[str, Any]) -> ToolResult:
action = args.get("action", "").strip().lower()
if not action:
return ToolResult.fail("Error: 'action' parameter is required")
handler = self._ACTION_MAP.get(action)
if not handler:
valid = ", ".join(sorted(self._ACTION_MAP.keys()))
return ToolResult.fail(f"Unknown action '{action}'. Valid actions: {valid}")
try:
return handler(self, args)
except Exception as e:
logger.error(f"[Browser] Action '{action}' error: {e}")
return ToolResult.fail(f"Browser error ({action}): {e}")
# ------------------------------------------------------------------
# Action handlers
# ------------------------------------------------------------------
def _do_navigate(self, args: Dict[str, Any]) -> ToolResult:
url = args.get("url", "").strip()
if not url:
return ToolResult.fail("Error: 'url' is required for navigate action")
if not url.startswith(("http://", "https://")):
url = "https://" + url
timeout = args.get("timeout", 30000)
result = self._get_service().navigate(url, timeout=timeout)
if "error" in result:
return ToolResult.fail(result["error"])
return ToolResult.success(
f"Navigated to: {result['url']}\nTitle: {result['title']}\nStatus: {result['status']}\n\n"
f"Use action 'snapshot' to see the page content."
)
def _do_snapshot(self, args: Dict[str, Any]) -> ToolResult:
selector = args.get("selector")
text = self._get_service().snapshot(selector=selector)
return ToolResult.success(text)
def _do_click(self, args: Dict[str, Any]) -> ToolResult:
ref = args.get("ref")
selector = args.get("selector")
timeout = args.get("timeout", 5000)
result = self._get_service().click(ref=ref, selector=selector, timeout=timeout)
if "error" in result:
return ToolResult.fail(result["error"])
return ToolResult.success(f"Clicked successfully. Use 'snapshot' to see updated page.")
def _do_fill(self, args: Dict[str, Any]) -> ToolResult:
text = args.get("text", "")
ref = args.get("ref")
selector = args.get("selector")
timeout = args.get("timeout", 5000)
if not text and text != "":
return ToolResult.fail("Error: 'text' is required for fill action")
result = self._get_service().fill(text, ref=ref, selector=selector, timeout=timeout)
if "error" in result:
return ToolResult.fail(result["error"])
return ToolResult.success(f"Filled text into element. Use 'snapshot' to verify.")
def _do_select(self, args: Dict[str, Any]) -> ToolResult:
value = args.get("value", "")
ref = args.get("ref")
selector = args.get("selector")
timeout = args.get("timeout", 5000)
if not value:
return ToolResult.fail("Error: 'value' is required for select action")
result = self._get_service().select(value, ref=ref, selector=selector, timeout=timeout)
if "error" in result:
return ToolResult.fail(result["error"])
return ToolResult.success(f"Selected option '{value}'.")
def _do_scroll(self, args: Dict[str, Any]) -> ToolResult:
direction = args.get("direction", "down")
amount = args.get("timeout", 500) # reuse timeout field or default
if "amount" in args:
amount = args["amount"]
result = self._get_service().scroll(direction=direction, amount=amount)
if "error" in result:
return ToolResult.fail(result["error"])
pos = f"scrollY={result.get('scrollY', '?')}/{result.get('scrollHeight', '?')}"
return ToolResult.success(f"Scrolled {direction}. Position: {pos}")
def _do_screenshot(self, args: Dict[str, Any]) -> ToolResult:
full_page = args.get("full_page", False)
filepath = self._get_service().screenshot(full_page=full_page, cwd=self.cwd)
return ToolResult.success(f"Screenshot saved to: {filepath}")
def _do_wait(self, args: Dict[str, Any]) -> ToolResult:
selector = args.get("selector")
timeout = args.get("timeout", 5000)
result = self._get_service().wait(selector=selector, timeout=timeout)
if "error" in result:
return ToolResult.fail(result["error"])
return ToolResult.success(f"Wait completed.")
def _do_back(self, args: Dict[str, Any]) -> ToolResult:
result = self._get_service().go_back()
if "error" in result:
return ToolResult.fail(result["error"])
return ToolResult.success(f"Navigated back to: {result['url']}")
def _do_forward(self, args: Dict[str, Any]) -> ToolResult:
result = self._get_service().go_forward()
if "error" in result:
return ToolResult.fail(result["error"])
return ToolResult.success(f"Navigated forward to: {result['url']}")
def _do_get_text(self, args: Dict[str, Any]) -> ToolResult:
selector = args.get("selector", "").strip()
if not selector:
return ToolResult.fail("Error: 'selector' is required for get_text action")
result = self._get_service().get_text(selector)
if "error" in result:
return ToolResult.fail(result["error"])
return ToolResult.success(result["text"])
def _do_press(self, args: Dict[str, Any]) -> ToolResult:
key = args.get("key", "").strip()
if not key:
return ToolResult.fail("Error: 'key' is required for press action")
result = self._get_service().press(key)
if "error" in result:
return ToolResult.fail(result["error"])
return ToolResult.success(f"Pressed key: {key}")
def _do_evaluate(self, args: Dict[str, Any]) -> ToolResult:
script = args.get("script", "").strip()
if not script:
return ToolResult.fail("Error: 'script' is required for evaluate action")
result = self._get_service().evaluate(script)
if "error" in result:
return ToolResult.fail(result["error"])
val = result.get("result")
if isinstance(val, (dict, list)):
return ToolResult.success(json.dumps(val, ensure_ascii=False, indent=2))
return ToolResult.success(str(val) if val is not None else "(no return value)")
# Action dispatch table
_ACTION_MAP = {
"navigate": _do_navigate,
"snapshot": _do_snapshot,
"click": _do_click,
"fill": _do_fill,
"select": _do_select,
"scroll": _do_scroll,
"screenshot": _do_screenshot,
"wait": _do_wait,
"back": _do_back,
"forward": _do_forward,
"get_text": _do_get_text,
"press": _do_press,
"evaluate": _do_evaluate,
}
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
def copy(self):
"""Share browser instance across tool copies (avoids re-launching)."""
new_tool = BrowserTool(self.config)
new_tool.model = self.model
new_tool.context = getattr(self, "context", None)
new_tool.cwd = self.cwd
new_tool._service = self._service
return new_tool
def close(self):
"""Release browser resources."""
if self._service:
self._service.close()
self._service = None
BrowserTool._shared_service = None
logger.info("[Browser] BrowserTool closed")

View File

@@ -1,18 +0,0 @@
def copy(self):
"""
Special copy method for browser tool to avoid recreating browser instance.
:return: A new instance with shared browser reference but unique model
"""
new_tool = self.__class__()
# Copy essential attributes
new_tool.model = self.model
new_tool.context = getattr(self, 'context', None)
new_tool.config = getattr(self, 'config', None)
# Share the browser instance instead of creating a new one
if hasattr(self, 'browser'):
new_tool.browser = self.browser
return new_tool

View File

@@ -84,11 +84,11 @@ class ToolManager:
except ImportError as e:
# Handle missing dependencies with helpful messages
error_msg = str(e)
if "browser-use" in error_msg or "browser_use" in error_msg:
if "playwright" in error_msg:
logger.warning(
f"[ToolManager] Browser tool not loaded - missing dependencies.\n"
f" To enable browser tool, run:\n"
f" pip install browser-use markdownify playwright\n"
f" pip install playwright\n"
f" playwright install chromium"
)
elif "markdownify" in error_msg:
@@ -154,11 +154,11 @@ class ToolManager:
except ImportError as e:
# Handle missing dependencies with helpful messages
error_msg = str(e)
if "browser-use" in error_msg or "browser_use" in error_msg:
if "playwright" in error_msg:
logger.warning(
f"[ToolManager] Browser tool not loaded - missing dependencies.\n"
f" To enable browser tool, run:\n"
f" pip install browser-use markdownify playwright\n"
f" pip install playwright\n"
f" playwright install chromium"
)
elif "markdownify" in error_msg:
@@ -197,7 +197,7 @@ class ToolManager:
logger.warning(
f"[ToolManager] Browser tool is configured but not loaded.\n"
f" To enable browser tool, run:\n"
f" pip install browser-use markdownify playwright\n"
f" pip install playwright\n"
f" playwright install chromium"
)
elif tool_name == "google_search":

View File

@@ -452,7 +452,7 @@ class WecomBotChannel(ChatChannel):
if req_id:
state = self._stream_states.pop(req_id, None)
if state:
final_content = state["committed"]
final_content = state["committed"] or content
stream_id = state["stream_id"]
else:
final_content = content

View File

@@ -5,6 +5,7 @@ from cli import __version__
from cli.commands.skill import skill
from cli.commands.process import start, stop, restart, update, status, logs
from cli.commands.context import context
from cli.commands.install import install_browser
HELP_TEXT = """Usage: cow COMMAND [ARGS]...
@@ -21,6 +22,7 @@ Commands:
status Show CowAgent running status.
logs View CowAgent logs.
skill Manage CowAgent skills.
install-browser Install browser tool (Playwright + Chromium).
Tip: You can also send /help, /skill list, etc. in agent chat."""
@@ -67,6 +69,7 @@ main.add_command(update)
main.add_command(status)
main.add_command(logs)
main.add_command(context)
main.add_command(install_browser)
if __name__ == '__main__':

63
cli/commands/install.py Normal file
View File

@@ -0,0 +1,63 @@
"""cow install-browser - Install Playwright + Chromium for the browser tool."""
import os
import sys
import subprocess
import click
def _has_display() -> bool:
"""Check if a graphical display is available (Linux only)."""
return bool(os.environ.get("DISPLAY") or os.environ.get("WAYLAND_DISPLAY"))
def _is_headless_linux() -> bool:
"""True when running on a Linux server without a display."""
return sys.platform == "linux" and not _has_display()
@click.command("install-browser")
def install_browser():
"""Install browser tool dependencies (Playwright + Chromium)."""
python = sys.executable
# Step 1: Install playwright package
click.echo(click.style("[1/3] Installing playwright Python package...", fg="yellow"))
ret = subprocess.call([python, "-m", "pip", "install", "playwright"])
if ret != 0:
click.echo(click.style("Failed to install playwright package.", fg="red"))
raise SystemExit(1)
click.echo(click.style("playwright package installed.", fg="green"))
click.echo()
# Step 2: System dependencies (Linux only)
if sys.platform == "linux":
click.echo(click.style("[2/3] Installing system dependencies (Linux)...", fg="yellow"))
ret = subprocess.call([python, "-m", "playwright", "install-deps", "chromium"])
if ret != 0:
click.echo(click.style(
"Could not auto-install system deps (may need sudo).\n"
f" Run manually: sudo {python} -m playwright install-deps chromium",
fg="yellow",
))
else:
click.echo(click.style(f"[2/3] Skipping system deps (not needed on {sys.platform}).", fg="yellow"))
click.echo()
# Step 3: Install Chromium (headless shell on Linux servers, full elsewhere)
click.echo(click.style("[3/3] Installing Chromium browser...", fg="yellow"))
cmd = [python, "-m", "playwright", "install", "chromium"]
if _is_headless_linux():
cmd.append("--only-shell")
click.echo(" (headless-only mode for Linux server)")
elif sys.platform == "linux":
click.echo(" (full browser for Linux desktop)")
ret = subprocess.call(cmd)
if ret != 0:
click.echo(click.style("Failed to install Chromium.", fg="red"))
raise SystemExit(1)
click.echo()
click.echo(click.style("Browser tool ready! Restart CowAgent to enable it.", fg="green"))

View File

@@ -8,6 +8,8 @@ import hashlib
import shutil
import zipfile
import tempfile
from dataclasses import dataclass, field
from typing import Optional, List
from urllib.parse import urlparse
@@ -22,10 +24,34 @@ from cli.utils import (
SKILL_HUB_API,
)
# ======================================================================
# Public types for the core install API (used by CLI and chat plugin)
# ======================================================================
class SkillInstallError(Exception):
"""Raised when skill installation fails."""
pass
@dataclass
class InstallResult:
"""Result of a skill installation operation."""
installed: List[str] = field(default_factory=list)
messages: List[str] = field(default_factory=list)
error: Optional[str] = None
_SAFE_NAME_RE = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9_\-]{0,63}$")
_GITHUB_URL_RE = re.compile(
r"^https?://github\.com/([^/]+)/([^/]+?)(?:\.git)?(?:/(?:tree|blob)/([^/]+)(?:/(.+))?)?/?$"
)
_GITLAB_URL_RE = re.compile(
r"^https?://gitlab\.com/([^/]+)/([^/]+?)(?:\.git)?(?:/-/tree/([^/]+)(?:/(.+))?)?/?$"
)
_GIT_SSH_RE = re.compile(
r"^git@([^:]+):([^/]+)/([^/]+?)(?:\.git)?$"
)
def _parse_github_url(url: str):
@@ -45,11 +71,92 @@ def _parse_github_url(url: str):
return owner, repo, branch or "main", subpath
def _parse_gitlab_url(url: str):
"""Parse a GitLab URL into (owner, repo, branch, subpath).
Returns None if the URL doesn't match.
Supported formats:
https://gitlab.com/owner/repo
https://gitlab.com/owner/repo/-/tree/branch
https://gitlab.com/owner/repo/-/tree/branch/path/to/skill
"""
m = _GITLAB_URL_RE.match(url.strip())
if not m:
return None
owner, repo, branch, subpath = m.groups()
return owner, repo, branch or "main", subpath
def _parse_git_ssh_url(url: str):
"""Parse a git@ SSH URL into (host, owner, repo).
Returns None if the URL doesn't match.
Supported format: git@github.com:owner/repo.git
"""
m = _GIT_SSH_RE.match(url.strip())
if not m:
return None
host, owner, repo = m.groups()
return host, owner, repo
def _clone_repo(git_url: str):
"""Shallow-clone a git repo and return (tmp_dir, repo_root).
Requires git to be installed. The caller must clean up tmp_dir.
"""
tmp_dir = tempfile.mkdtemp(prefix="cow-skill-")
repo_dir = os.path.join(tmp_dir, "repo")
try:
import subprocess
subprocess.run(
["git", "clone", "--depth", "1", git_url, repo_dir],
check=True, capture_output=True, timeout=120,
)
except FileNotFoundError:
shutil.rmtree(tmp_dir, ignore_errors=True)
raise RuntimeError("git is not installed")
except Exception as e:
shutil.rmtree(tmp_dir, ignore_errors=True)
raise RuntimeError(f"git clone failed: {e}")
return tmp_dir, repo_dir
def _download_repo_zip(spec: str, branch: str = "main", host: str = "github"):
"""Download a GitHub/GitLab repo as zip and extract it.
Returns (tmp_dir, repo_root) where tmp_dir is the temp directory to clean up
and repo_root is the extracted repository root path.
"""
if host == "gitlab":
zip_url = f"https://gitlab.com/{spec}/-/archive/{branch}/{spec.split('/')[-1]}-{branch}.zip"
else:
zip_url = f"https://github.com/{spec}/archive/refs/heads/{branch}.zip"
resp = requests.get(zip_url, timeout=120, allow_redirects=True)
resp.raise_for_status()
tmp_dir = tempfile.mkdtemp(prefix="cow-skill-")
zip_path = os.path.join(tmp_dir, "repo.zip")
with open(zip_path, "wb") as f:
f.write(resp.content)
extract_dir = os.path.join(tmp_dir, "extracted")
with zipfile.ZipFile(zip_path, "r") as zf:
_safe_extractall(zf, extract_dir)
# GitHub zips have a single top-level dir like "repo-main/"
top_items = [d for d in os.listdir(extract_dir) if not d.startswith(".")]
if len(top_items) == 1 and os.path.isdir(os.path.join(extract_dir, top_items[0])):
return tmp_dir, os.path.join(extract_dir, top_items[0])
return tmp_dir, extract_dir
def _download_github_dir(owner, repo, branch, subpath, dest_dir):
"""Download a subdirectory from GitHub using the Contents API.
Recursively fetches all files under the given subpath and writes them
to dest_dir. Raises on any network or API error.
to dest_dir. Used as a fallback when zip download fails.
Costs one API request per directory (60/hr unauthenticated).
"""
api_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{subpath}?ref={branch}"
resp = requests.get(api_url, timeout=30, headers={"Accept": "application/vnd.github.v3+json"})
@@ -80,6 +187,139 @@ def _download_github_dir(owner, repo, branch, subpath, dest_dir):
_download_github_dir(owner, repo, branch, child_subpath, dest_dir)
# Directories to search for skills following the Agent Skills convention
_SKILL_SCAN_DIRS = [
"skills",
"skills/.curated",
"skills/.experimental",
]
_SKILL_SCAN_SKIP = {
"node_modules", "__pycache__", ".git", ".github", "venv", ".venv",
}
def _scan_skills_in_repo(repo_root: str) -> list:
"""Scan a repo for skill directories containing SKILL.md.
Searches in conventional locations (skills/, skills/.curated/, etc.)
and also checks the repo root itself.
Returns a list of (skill_name, skill_dir_path) tuples.
"""
found = []
# Check repo root for a SKILL.md (single-skill repo)
if os.path.isfile(os.path.join(repo_root, "SKILL.md")):
fm = _parse_skill_frontmatter(_read_file_text(os.path.join(repo_root, "SKILL.md")))
name = fm.get("name") or os.path.basename(repo_root)
found.append((name, repo_root))
return found
for scan_dir in _SKILL_SCAN_DIRS:
search_root = os.path.join(repo_root, scan_dir)
if not os.path.isdir(search_root):
continue
for entry in os.listdir(search_root):
if entry.startswith(".") and entry not in (".curated", ".experimental"):
continue
if entry in _SKILL_SCAN_SKIP:
continue
entry_path = os.path.join(search_root, entry)
if os.path.isdir(entry_path) and os.path.isfile(os.path.join(entry_path, "SKILL.md")):
fm = _parse_skill_frontmatter(
_read_file_text(os.path.join(entry_path, "SKILL.md"))
)
name = fm.get("name") or entry
found.append((name, entry_path))
return found
def _scan_skills_in_dir(directory: str) -> list:
"""Scan immediate subdirectories for SKILL.md files.
Unlike _scan_skills_in_repo which checks conventional locations,
this scans all direct children of the given directory.
Returns a list of (skill_name, skill_dir_path) tuples.
"""
found = []
if not os.path.isdir(directory):
return found
for entry in os.listdir(directory):
if entry.startswith(".") or entry in _SKILL_SCAN_SKIP:
continue
entry_path = os.path.join(directory, entry)
if os.path.isdir(entry_path) and os.path.isfile(os.path.join(entry_path, "SKILL.md")):
fm = _parse_skill_frontmatter(
_read_file_text(os.path.join(entry_path, "SKILL.md"))
)
name = fm.get("name") or entry
found.append((name, entry_path))
return found
def _batch_install_skills(discovered, spec, skills_dir, source, result: InstallResult):
"""Install a list of discovered skills into skills_dir."""
result.messages.append(f"Found {len(discovered)} skill(s) in {spec}:")
for sname, sdir in discovered:
safe_name = re.sub(r'[^a-zA-Z0-9_\-]', '-', sname)[:64]
if not _SAFE_NAME_RE.match(safe_name):
result.messages.append(f" Skipping '{sname}' (invalid name)")
continue
target_dir = os.path.join(skills_dir, safe_name)
if os.path.exists(target_dir):
shutil.rmtree(target_dir)
shutil.copytree(sdir, target_dir)
_register_installed_skill(safe_name, source=source)
result.installed.append(safe_name)
result.messages.append(f" + {safe_name}")
if result.installed:
result.messages.append(f"{len(result.installed)} skill(s) installed from {spec}.")
else:
result.messages.append("No valid skills found.")
def _read_file_text(path: str) -> str:
"""Read a file as UTF-8 text, returning empty string on failure."""
try:
with open(path, "r", encoding="utf-8") as f:
return f.read()
except Exception:
return ""
def _install_local(path: str, result: InstallResult):
"""Install skill(s) from a local directory."""
path = os.path.abspath(os.path.expanduser(path))
if not os.path.isdir(path):
raise SkillInstallError(f"'{path}' is not a directory.")
skills_dir = get_skills_dir()
os.makedirs(skills_dir, exist_ok=True)
if os.path.isfile(os.path.join(path, "SKILL.md")):
fm = _parse_skill_frontmatter(_read_file_text(os.path.join(path, "SKILL.md")))
skill_name = fm.get("name") or os.path.basename(path)
skill_name = re.sub(r'[^a-zA-Z0-9_\-]', '-', skill_name)[:64]
_check_skill_name(skill_name)
target_dir = os.path.join(skills_dir, skill_name)
if os.path.exists(target_dir):
shutil.rmtree(target_dir)
shutil.copytree(path, target_dir)
_register_installed_skill(skill_name, source="local")
result.installed.append(skill_name)
result.messages.append(f"Installed '{skill_name}' from local path.")
return
discovered = _scan_skills_in_repo(path) or _scan_skills_in_dir(path)
if not discovered:
raise SkillInstallError(f"No skills found in '{path}'.")
_batch_install_skills(discovered, path, skills_dir, "local", result)
def _register_installed_skill(name: str, source: str = "cowhub"):
"""Register a newly installed skill into skills_config.json.
@@ -145,39 +385,38 @@ def _read_skill_description(skill_dir: str) -> str:
return ""
def _install_url(url: str):
def _install_url(url: str, result: InstallResult):
"""Install a skill from a direct SKILL.md URL."""
click.echo(f"Downloading SKILL.md from {url} ...")
result.messages.append(f"Downloading SKILL.md from {url} ...")
try:
resp = requests.get(url, timeout=30)
resp.raise_for_status()
except Exception as e:
click.echo(f"Error: Failed to download SKILL.md: {e}", err=True)
sys.exit(1)
raise SkillInstallError(f"Failed to download SKILL.md: {e}")
content = resp.text
fm = _parse_skill_frontmatter(content)
skill_name = fm.get("name")
if not skill_name:
click.echo("Error: SKILL.md missing 'name' field in frontmatter.", err=True)
sys.exit(1)
raise SkillInstallError("SKILL.md missing 'name' field in frontmatter.")
skill_name = skill_name.strip()
_validate_skill_name(skill_name)
_check_skill_name(skill_name)
skills_dir = get_skills_dir()
os.makedirs(skills_dir, exist_ok=True)
skill_dir = os.path.join(skills_dir, skill_name)
if os.path.isdir(skill_dir):
click.echo(f"Skill '{skill_name}' already exists. Overwriting SKILL.md ...")
result.messages.append(f"Skill '{skill_name}' already exists. Overwriting SKILL.md ...")
os.makedirs(skill_dir, exist_ok=True)
with open(os.path.join(skill_dir, "SKILL.md"), "w", encoding="utf-8") as f:
f.write(content)
_register_installed_skill(skill_name, source="url")
_print_install_success(skill_name, "url")
result.installed.append(skill_name)
result.messages.append(f"Installed '{skill_name}' from URL.")
def _print_install_success(name: str, source: str):
@@ -210,6 +449,20 @@ def _validate_github_spec(spec: str):
sys.exit(1)
def _check_skill_name(name: str):
"""Raise SkillInstallError if name is invalid."""
if not _SAFE_NAME_RE.match(name):
raise SkillInstallError(
f"Invalid skill name '{name}'. Use only letters, digits, hyphens, and underscores."
)
def _check_github_spec(spec: str):
"""Raise SkillInstallError if spec is not owner/repo."""
if not re.match(r"^[a-zA-Z0-9_\-]+/[a-zA-Z0-9_.\-]+$", spec):
raise SkillInstallError(f"Invalid GitHub spec '{spec}'. Expected format: owner/repo")
def _safe_extractall(zf: zipfile.ZipFile, dest: str):
"""Extract zip while guarding against Zip Slip (path traversal)."""
dest = os.path.realpath(dest)
@@ -241,6 +494,18 @@ def _verify_checksum(content: bytes, expected: str):
return True
def _check_checksum(content: bytes, expected: str):
"""Raise SkillInstallError on SHA-256 checksum mismatch."""
if not expected:
return
actual = hashlib.sha256(content).hexdigest()
if actual != expected.lower():
raise SkillInstallError(
f"Checksum mismatch! Expected: {expected}, Actual: {actual}. "
"The downloaded package may have been tampered with."
)
@click.group()
def skill():
"""Manage CowAgent skills."""
@@ -412,63 +677,140 @@ def search(query):
# ------------------------------------------------------------------
# cow skill install
# Core install function — reusable from CLI and chat plugin
# ------------------------------------------------------------------
@skill.command()
@click.argument("name")
def install(name):
"""Install a skill from Skill Hub, GitHub, or a SKILL.md URL.
Examples:
def install_skill(name: str) -> InstallResult:
"""Core install logic, usable from CLI and chat plugin.
cow skill install pptx
cow skill install github:owner/repo
cow skill install github:owner/repo#path/to/skill
cow skill install https://github.com/owner/repo/tree/main/path/to/skill
cow skill install https://example.com/path/to/SKILL.md
Accepts all formats: Skill Hub name, owner/repo, GitHub/GitLab URL,
git@ SSH, local path, SKILL.md URL.
Returns InstallResult with installed skill names and messages.
"""
result = InstallResult()
try:
_route_install(name, result)
except SkillInstallError as e:
result.error = str(e)
return result
def _route_install(name: str, result: InstallResult):
"""Dispatch to the appropriate installer based on input format."""
# --- Local path ---
if name.startswith(("./", "../", "/", "~/")):
_install_local(name, result)
return
# --- Direct SKILL.md URL ---
if name.startswith(("http://", "https://")) and name.rstrip("/").endswith("SKILL.md"):
# GitHub SKILL.md → strip filename and install the whole directory
dir_url = re.sub(r'/SKILL\.md/?$', '', name)
gh = _parse_github_url(dir_url)
if gh:
owner, repo, branch, subpath = gh
spec = f"{owner}/{repo}"
skill_name = subpath.rstrip("/").split("/")[-1] if subpath else repo
_install_github(spec, subpath=subpath, skill_name=skill_name, branch=branch)
_install_github(f"{owner}/{repo}", result, subpath=subpath, skill_name=(
subpath.rstrip("/").split("/")[-1] if subpath else repo
), branch=branch)
return
_install_url(name)
_install_url(name, result)
return
# --- Full GitHub URL ---
parsed = _parse_github_url(name)
if parsed:
owner, repo, branch, subpath = parsed
spec = f"{owner}/{repo}"
skill_name = subpath.rstrip("/").split("/")[-1] if subpath else repo
_install_github(spec, subpath=subpath, skill_name=skill_name, branch=branch)
elif name.startswith("github:"):
skill_name = name[7:]
_validate_skill_name(skill_name)
_install_hub(skill_name)
elif name.startswith("clawhub:"):
_install_github(f"{owner}/{repo}", result, subpath=subpath, branch=branch)
return
# --- Full GitLab URL ---
gl = _parse_gitlab_url(name)
if gl:
owner, repo, branch, subpath = gl
_install_gitlab(f"{owner}/{repo}", result, subpath=subpath, branch=branch)
return
# --- git@host:owner/repo.git SSH URL ---
ssh = _parse_git_ssh_url(name)
if ssh:
host, owner, repo = ssh
_install_git_clone(name, result, display_name=f"{owner}/{repo}")
return
# --- github: prefix ---
if name.startswith("github:"):
raw = name[7:]
subpath = None
if "#" in raw:
raw, subpath = raw.split("#", 1)
_check_github_spec(raw)
_install_github(raw, result, subpath=subpath)
return
# --- clawhub: prefix ---
if name.startswith("clawhub:"):
skill_name = name[8:]
_validate_skill_name(skill_name)
_install_hub(skill_name, provider="clawhub")
else:
_validate_skill_name(name)
_install_hub(name)
_check_skill_name(skill_name)
_install_hub(skill_name, result, provider="clawhub")
return
# --- owner/repo or owner/repo#subpath shorthand ---
if re.match(r"^[a-zA-Z0-9_\-]+/[a-zA-Z0-9_.\-]+(?:#.+)?$", name):
subpath = None
spec = name
if "#" in spec:
spec, subpath = spec.split("#", 1)
_install_github(spec, result, subpath=subpath)
return
# --- Fallback: Skill Hub by name ---
_check_skill_name(name)
_install_hub(name, result)
def _install_hub(name, provider=None):
# ------------------------------------------------------------------
# cow skill install (CLI thin wrapper)
# ------------------------------------------------------------------
@skill.command()
@click.argument("name")
def install(name):
"""Install skill(s) from Skill Hub, GitHub, GitLab, git URL, or local path.
When given an owner/repo (or full URL), downloads the repo and
auto-discovers all skills/ subdirectories containing SKILL.md,
installing them in batch. Use a subpath to install a single skill.
Examples:
cow skill install pptx (from Skill Hub)
cow skill install larksuite/cli (GitHub shorthand, all skills)
cow skill install larksuite/cli#skills/lark-im (single skill by subpath)
cow skill install https://github.com/owner/repo
cow skill install https://gitlab.com/org/repo
cow skill install git@github.com:owner/repo.git
cow skill install ./my-local-skills (local directory)
cow skill install https://example.com/path/to/SKILL.md
"""
result = install_skill(name)
for msg in result.messages:
click.echo(msg)
if result.error:
click.echo(f"Error: {result.error}", err=True)
sys.exit(1)
def _install_hub(name, result: InstallResult, provider=None):
"""Install a skill from Skill Hub."""
skills_dir = get_skills_dir()
os.makedirs(skills_dir, exist_ok=True)
click.echo(f"Fetching skill info for '{name}'...")
result.messages.append(f"Fetching skill info for '{name}'...")
try:
body = {}
@@ -482,13 +824,12 @@ def _install_hub(name, provider=None):
resp.raise_for_status()
except requests.HTTPError as e:
if e.response is not None and e.response.status_code == 404:
click.echo(f"Error: Skill '{name}' not found on Skill Hub.", err=True)
else:
click.echo(f"Error: Failed to fetch skill: {e}", err=True)
sys.exit(1)
raise SkillInstallError(f"Skill '{name}' not found on Skill Hub.")
raise SkillInstallError(f"Failed to fetch skill: {e}")
except SkillInstallError:
raise
except Exception as e:
click.echo(f"Error: Failed to connect to Skill Hub: {e}", err=True)
sys.exit(1)
raise SkillInstallError(f"Failed to connect to Skill Hub: {e}")
content_type = resp.headers.get("Content-Type", "")
@@ -501,12 +842,12 @@ def _install_hub(name, provider=None):
parsed_url = _parse_github_url(source_url)
if parsed_url:
owner, repo, branch, subpath = parsed_url
click.echo(f"Source: GitHub ({source_url})")
_install_github(f"{owner}/{repo}", subpath=subpath, skill_name=name, branch=branch)
result.messages.append(f"Source: GitHub ({source_url})")
_install_github(f"{owner}/{repo}", result, subpath=subpath, skill_name=name, branch=branch)
else:
_validate_github_spec(source_url)
click.echo(f"Source: GitHub ({source_url})")
_install_github(source_url, skill_name=name)
_check_github_spec(source_url)
result.messages.append(f"Source: GitHub ({source_url})")
_install_github(source_url, result, skill_name=name)
return
if source_type == "registry":
@@ -514,25 +855,25 @@ def _install_hub(name, provider=None):
if download_url:
parsed = urlparse(download_url)
if parsed.scheme != "https":
click.echo(f"Error: Refusing to download from non-HTTPS URL.", err=True)
sys.exit(1)
provider = data.get("source_provider", "registry")
raise SkillInstallError("Refusing to download from non-HTTPS URL.")
src_provider = data.get("source_provider", "registry")
expected_checksum = data.get("checksum") or data.get("sha256")
click.echo(f"Source: {provider}")
click.echo("Downloading skill package...")
result.messages.append(f"Source: {src_provider}")
result.messages.append("Downloading skill package...")
try:
dl_resp = requests.get(download_url, timeout=60, allow_redirects=True)
dl_resp.raise_for_status()
except Exception as e:
click.echo(f"Error: Failed to download from {provider}: {e}", err=True)
sys.exit(1)
_verify_checksum(dl_resp.content, expected_checksum)
_install_zip_bytes(dl_resp.content, name, skills_dir)
_register_installed_skill(name, source=provider)
_print_install_success(name, provider)
raise SkillInstallError(f"Failed to download from {src_provider}: {e}")
_check_checksum(dl_resp.content, expected_checksum)
installed_before = len(result.installed)
_install_zip_bytes(dl_resp.content, name, skills_dir, result=result, source_label=src_provider)
if len(result.installed) == installed_before:
_register_installed_skill(name, source=src_provider)
result.installed.append(name)
result.messages.append(f"Installed '{name}' from {src_provider}.")
else:
click.echo(f"Error: Unsupported registry provider.", err=True)
sys.exit(1)
raise SkillInstallError("Unsupported registry provider.")
return
if "redirect" in data:
@@ -540,107 +881,189 @@ def _install_hub(name, provider=None):
parsed_url = _parse_github_url(source_url)
if parsed_url:
owner, repo, branch, subpath = parsed_url
click.echo(f"Source: GitHub ({source_url})")
_install_github(f"{owner}/{repo}", subpath=subpath, skill_name=name, branch=branch)
result.messages.append(f"Source: GitHub ({source_url})")
_install_github(f"{owner}/{repo}", result, subpath=subpath, skill_name=name, branch=branch)
else:
_validate_github_spec(source_url)
click.echo(f"Source: GitHub ({source_url})")
_install_github(source_url, skill_name=name)
_check_github_spec(source_url)
result.messages.append(f"Source: GitHub ({source_url})")
_install_github(source_url, result, skill_name=name)
return
elif "application/zip" in content_type:
click.echo("Downloading skill package...")
result.messages.append("Downloading skill package...")
expected_checksum = resp.headers.get("X-Checksum-Sha256")
_verify_checksum(resp.content, expected_checksum)
_install_zip_bytes(resp.content, name, skills_dir)
_register_installed_skill(name)
_print_install_success(name, "cowhub")
_check_checksum(resp.content, expected_checksum)
installed_before = len(result.installed)
_install_zip_bytes(resp.content, name, skills_dir, result=result, source_label="cowhub")
if len(result.installed) == installed_before:
_register_installed_skill(name)
result.installed.append(name)
result.messages.append(f"Installed '{name}' from Skill Hub.")
return
click.echo(f"Error: Unexpected response from Skill Hub.", err=True)
sys.exit(1)
raise SkillInstallError("Unexpected response from Skill Hub.")
def _install_github(spec, subpath=None, skill_name=None, branch="main", source="github"):
"""Install a skill from a GitHub repo.
def _install_github(spec, result: InstallResult, subpath=None, skill_name=None, branch="main", source="github"):
"""Install skill(s) from a GitHub repo.
spec format: owner/repo or owner/repo#path
Strategy: zip download first (no API rate limit), Contents API as fallback.
"""
if "#" in spec and not subpath:
spec, subpath = spec.split("#", 1)
_validate_github_spec(spec)
if not skill_name:
skill_name = subpath.rstrip("/").split("/")[-1] if subpath else spec.split("/")[-1]
_validate_skill_name(skill_name)
_check_github_spec(spec)
skills_dir = get_skills_dir()
os.makedirs(skills_dir, exist_ok=True)
target_dir = os.path.join(skills_dir, skill_name)
owner, repo = spec.split("/", 1)
# For subpath installs, try GitHub Contents API first (avoids downloading entire repo)
if subpath:
click.echo(f"Downloading from GitHub: {spec}/{subpath} (branch: {branch})...")
try:
with tempfile.TemporaryDirectory() as tmp_dir:
api_dest = os.path.join(tmp_dir, skill_name)
os.makedirs(api_dest)
_download_github_dir(owner, repo, branch, subpath.strip("/"), api_dest)
if os.path.exists(target_dir):
shutil.rmtree(target_dir)
shutil.copytree(api_dest, target_dir)
_register_installed_skill(skill_name, source=source)
_print_install_success(skill_name, source)
return
except Exception:
click.echo("Contents API unavailable, falling back to zip download...")
result.messages.append(f"Downloading from GitHub: {spec} (branch: {branch})...")
# Fallback: download full repo zip
zip_url = f"https://github.com/{spec}/archive/refs/heads/{branch}.zip"
click.echo(f"Downloading from GitHub: {spec} (branch: {branch})...")
tmp_dir = None
repo_root = None
try:
tmp_dir, repo_root = _download_repo_zip(spec, branch)
except Exception:
result.messages.append("Zip download failed, falling back to Contents API...")
if repo_root:
try:
_install_from_repo_root(repo_root, spec, subpath, skill_name, skills_dir, source, result)
return
except SkillInstallError:
raise
except Exception as e:
result.messages.append(f"Error processing zip: {e}")
result.messages.append("Falling back to Contents API...")
finally:
if tmp_dir:
shutil.rmtree(tmp_dir, ignore_errors=True)
if not subpath:
raise SkillInstallError(
f"Zip download failed and batch install requires zip. "
f"Try again or specify a subpath: {spec}#skills/<name>"
)
if not skill_name:
skill_name = subpath.rstrip("/").split("/")[-1]
_check_skill_name(skill_name)
result.messages.append(f"Downloading via Contents API: {spec}/{subpath} ...")
target_dir = os.path.join(skills_dir, skill_name)
try:
with tempfile.TemporaryDirectory() as api_tmp:
api_dest = os.path.join(api_tmp, skill_name)
os.makedirs(api_dest)
_download_github_dir(owner, repo, branch, subpath.strip("/"), api_dest)
if os.path.exists(target_dir):
shutil.rmtree(target_dir)
shutil.copytree(api_dest, target_dir)
_register_installed_skill(skill_name, source=source)
result.installed.append(skill_name)
result.messages.append(f"Installed '{skill_name}' from GitHub.")
except Exception as e:
raise SkillInstallError(f"Contents API also failed: {e}")
def _install_from_repo_root(repo_root, spec, subpath, skill_name, skills_dir, source, result: InstallResult):
"""Install skill(s) from an already-extracted repo root directory."""
if subpath:
source_dir = os.path.join(repo_root, subpath.strip("/"))
if not os.path.isdir(source_dir):
raise SkillInstallError(f"Path '{subpath}' not found in repository.")
if os.path.isfile(os.path.join(source_dir, "SKILL.md")):
if not skill_name:
fm = _parse_skill_frontmatter(
_read_file_text(os.path.join(source_dir, "SKILL.md"))
)
skill_name = fm.get("name") or subpath.rstrip("/").split("/")[-1]
_check_skill_name(skill_name)
target_dir = os.path.join(skills_dir, skill_name)
if os.path.exists(target_dir):
shutil.rmtree(target_dir)
shutil.copytree(source_dir, target_dir)
_register_installed_skill(skill_name, source=source)
result.installed.append(skill_name)
result.messages.append(f"Installed '{skill_name}' from {source}.")
return
discovered = _scan_skills_in_dir(source_dir)
if discovered:
_batch_install_skills(discovered, spec, skills_dir, source, result)
return
raise SkillInstallError(f"No SKILL.md found in '{subpath}' or its subdirectories.")
else:
discovered = _scan_skills_in_repo(repo_root)
if not discovered:
if skill_name:
_check_skill_name(skill_name)
else:
skill_name = spec.split("/")[-1]
_check_skill_name(skill_name)
target_dir = os.path.join(skills_dir, skill_name)
if os.path.exists(target_dir):
shutil.rmtree(target_dir)
shutil.copytree(repo_root, target_dir)
_register_installed_skill(skill_name, source=source)
result.installed.append(skill_name)
result.messages.append(f"Installed '{skill_name}' from {source}.")
return
_batch_install_skills(discovered, spec, skills_dir, source, result)
def _install_gitlab(spec, result: InstallResult, subpath=None, branch="main"):
"""Install skill(s) from a GitLab repo via zip download."""
_check_github_spec(spec)
skills_dir = get_skills_dir()
os.makedirs(skills_dir, exist_ok=True)
result.messages.append(f"Downloading from GitLab: {spec} (branch: {branch})...")
try:
resp = requests.get(zip_url, timeout=60, allow_redirects=True)
resp.raise_for_status()
tmp_dir, repo_root = _download_repo_zip(spec, branch, host="gitlab")
except Exception as e:
click.echo(f"Error: Failed to download from GitHub: {e}", err=True)
sys.exit(1)
raise SkillInstallError(f"Failed to download from GitLab: {e}")
with tempfile.TemporaryDirectory() as tmp_dir:
zip_path = os.path.join(tmp_dir, "repo.zip")
with open(zip_path, "wb") as f:
f.write(resp.content)
extract_dir = os.path.join(tmp_dir, "extracted")
with zipfile.ZipFile(zip_path, "r") as zf:
_safe_extractall(zf, extract_dir)
top_items = [d for d in os.listdir(extract_dir) if not d.startswith(".")]
repo_root = extract_dir
if len(top_items) == 1 and os.path.isdir(os.path.join(extract_dir, top_items[0])):
repo_root = os.path.join(extract_dir, top_items[0])
if subpath:
source_dir = os.path.join(repo_root, subpath.strip("/"))
if not os.path.isdir(source_dir):
click.echo(f"Error: Path '{subpath}' not found in repository.", err=True)
sys.exit(1)
else:
source_dir = repo_root
if os.path.exists(target_dir):
shutil.rmtree(target_dir)
shutil.copytree(source_dir, target_dir)
_register_installed_skill(skill_name, source=source)
_print_install_success(skill_name, source)
try:
_install_from_repo_root(repo_root, spec, subpath, None, skills_dir, "gitlab", result)
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)
def _install_zip_bytes(content, name, skills_dir):
"""Extract a zip archive into the skills directory."""
def _install_git_clone(git_url: str, result: InstallResult, display_name: str = ""):
"""Install skill(s) from any git URL via shallow clone."""
skills_dir = get_skills_dir()
os.makedirs(skills_dir, exist_ok=True)
result.messages.append(f"Cloning {display_name or git_url} ...")
try:
tmp_dir, repo_root = _clone_repo(git_url)
except RuntimeError as e:
raise SkillInstallError(str(e))
try:
_install_from_repo_root(repo_root, display_name or git_url, None, None, skills_dir, "git", result)
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)
def _install_zip_bytes(content, name, skills_dir, result: InstallResult = None, source_label: str = "zip"):
"""Extract a zip archive and install skill(s).
Supports three scenarios:
1. Root contains SKILL.md → single skill install
2. Contains multiple skill dirs (skills/, or immediate children with SKILL.md) → batch install
3. Fallback → treat the entire archive as a single skill named `name`
"""
with tempfile.TemporaryDirectory() as tmp_dir:
zip_path = os.path.join(tmp_dir, "package.zip")
with open(zip_path, "wb") as f:
@@ -651,14 +1074,35 @@ def _install_zip_bytes(content, name, skills_dir):
_safe_extractall(zf, extract_dir)
top_items = [d for d in os.listdir(extract_dir) if not d.startswith(".")]
source = extract_dir
pkg_root = extract_dir
if len(top_items) == 1 and os.path.isdir(os.path.join(extract_dir, top_items[0])):
source = os.path.join(extract_dir, top_items[0])
pkg_root = os.path.join(extract_dir, top_items[0])
discovered = _scan_skills_in_repo(pkg_root) or _scan_skills_in_dir(pkg_root)
if discovered and len(discovered) > 1 and result is not None:
_batch_install_skills(discovered, name, skills_dir, source_label, result)
return
if discovered and len(discovered) == 1:
sname, sdir = discovered[0]
safe_name = re.sub(r'[^a-zA-Z0-9_\-]', '-', sname)[:64]
if not _SAFE_NAME_RE.match(safe_name):
safe_name = name
target = os.path.join(skills_dir, safe_name)
if os.path.exists(target):
shutil.rmtree(target)
shutil.copytree(sdir, target)
_register_installed_skill(safe_name, source=source_label)
if result is not None:
result.installed.append(safe_name)
result.messages.append(f"Installed '{safe_name}' from {source_label}.")
return
target = os.path.join(skills_dir, name)
if os.path.exists(target):
shutil.rmtree(target)
shutil.copytree(source, target)
shutil.copytree(pkg_root, target)

View File

@@ -599,239 +599,38 @@ class CowCliPlugin(Plugin):
return "请指定要安装的技能: /skill install <名称>"
try:
from cli.utils import get_skills_dir, SKILL_HUB_API
from cli.commands.skill import _parse_github_url, _download_github_dir
import requests
import shutil
import zipfile
import tempfile
from cli.commands.skill import install_skill
result = install_skill(name)
skills_dir = get_skills_dir()
os.makedirs(skills_dir, exist_ok=True)
if result.error:
return f"安装失败: {result.error}"
if name.startswith(("http://", "https://")) and name.rstrip("/").endswith("SKILL.md"):
import re as re_mod
dir_url = re_mod.sub(r'/SKILL\.md/?$', '', name)
gh = _parse_github_url(dir_url)
if gh:
owner, repo, branch, subpath = gh
spec = f"{owner}/{repo}"
skill_name = subpath.rstrip("/").split("/")[-1] if subpath else repo
return self._skill_install_github(
spec, skills_dir, subpath=subpath, skill_name=skill_name, branch=branch
)
return self._skill_install_url(name, skills_dir)
if not result.installed:
return "\n".join(result.messages) if result.messages else "未找到可安装的技能"
parsed = _parse_github_url(name)
if parsed:
owner, repo, branch, subpath = parsed
spec = f"{owner}/{repo}"
skill_name = subpath.rstrip("/").split("/")[-1] if subpath else repo
return self._skill_install_github(
spec, skills_dir, subpath=subpath, skill_name=skill_name, branch=branch
)
provider = None
if name.startswith("github:"):
name = name[7:]
elif name.startswith("clawhub:"):
name = name[8:]
provider = "clawhub"
body = {}
if provider:
body["provider"] = provider
resp = requests.post(
f"{SKILL_HUB_API}/skills/{name}/download",
json=body,
timeout=15,
)
resp.raise_for_status()
content_type = resp.headers.get("Content-Type", "")
if "application/json" in content_type:
data = resp.json()
source_type = data.get("source_type")
if source_type == "github" or "redirect" in data:
source_url = data.get("source_url", "")
parsed_url = _parse_github_url(source_url)
if parsed_url:
owner, repo, branch, subpath = parsed_url
return self._skill_install_github(
f"{owner}/{repo}", skills_dir, subpath=subpath,
skill_name=name, branch=branch
)
return self._skill_install_github(source_url, skills_dir, skill_name=name)
if source_type == "registry":
download_url = data.get("download_url")
if not download_url:
return f"此技能来自不支持的注册表,无法自动安装。"
from urllib.parse import urlparse
if urlparse(download_url).scheme != "https":
return "安装失败: 下载地址不安全 (非 HTTPS)"
provider = data.get("source_provider", "registry")
try:
dl_resp = requests.get(download_url, timeout=60, allow_redirects=True)
dl_resp.raise_for_status()
except Exception as e:
return f"{provider} 下载失败: {e}"
self._extract_zip(dl_resp.content, name, skills_dir)
self._register_skill(name, source=provider)
return self._format_install_success(name, provider)
elif "application/zip" in content_type:
self._extract_zip(resp.content, name, skills_dir)
self._register_skill(name, source="cowhub")
return self._format_install_success(name, "cowhub")
return "技能商店返回了未预期的响应格式"
except requests.HTTPError as e:
if e.response is not None and e.response.status_code == 404:
return f"技能 '{name}' 未在技能商店中找到"
return f"安装失败: {e}"
return self._format_install_result(result)
except Exception as e:
return f"安装失败: {e}"
def _skill_install_url(self, url: str, skills_dir: str) -> str:
"""Install a skill from a direct SKILL.md URL."""
import requests
from cli.commands.skill import _parse_skill_frontmatter
try:
resp = requests.get(url, timeout=30)
resp.raise_for_status()
except Exception as e:
return f"下载 SKILL.md 失败: {e}"
content = resp.text
fm = _parse_skill_frontmatter(content)
skill_name = fm.get("name")
if not skill_name:
return "SKILL.md 中未找到 name 字段,无法安装"
skill_name = skill_name.strip()
skill_dir = os.path.join(skills_dir, skill_name)
os.makedirs(skill_dir, exist_ok=True)
with open(os.path.join(skill_dir, "SKILL.md"), "w", encoding="utf-8") as f:
f.write(content)
self._register_skill(skill_name, source="url")
return self._format_install_success(skill_name, "url")
def _skill_install_github(self, spec: str, skills_dir: str,
subpath: str = None, skill_name: str = None,
branch: str = "main") -> str:
import requests
import shutil
import zipfile
import tempfile
from cli.commands.skill import _download_github_dir
if "#" in spec and not subpath:
spec, subpath = spec.split("#", 1)
if not skill_name:
skill_name = subpath.rstrip("/").split("/")[-1] if subpath else spec.split("/")[-1]
owner, repo = spec.split("/", 1)
target_dir = os.path.join(skills_dir, skill_name)
# For subpath installs, try Contents API first
if subpath:
try:
with tempfile.TemporaryDirectory() as tmp_dir:
api_dest = os.path.join(tmp_dir, skill_name)
os.makedirs(api_dest)
_download_github_dir(owner, repo, branch, subpath.strip("/"), api_dest)
if os.path.exists(target_dir):
shutil.rmtree(target_dir)
shutil.copytree(api_dest, target_dir)
self._register_skill(skill_name, source="github")
return self._format_install_success(skill_name, "github")
except Exception:
pass # fall through to zip download
# Fallback: download full repo zip
zip_url = f"https://github.com/{spec}/archive/refs/heads/{branch}.zip"
try:
resp = requests.get(zip_url, timeout=60, allow_redirects=True)
resp.raise_for_status()
except Exception as e:
return f"从 GitHub 下载失败: {e}"
with tempfile.TemporaryDirectory() as tmp_dir:
zip_path = os.path.join(tmp_dir, "repo.zip")
with open(zip_path, "wb") as f:
f.write(resp.content)
extract_dir = os.path.join(tmp_dir, "extracted")
with zipfile.ZipFile(zip_path, "r") as zf:
zf.extractall(extract_dir)
top_items = [d for d in os.listdir(extract_dir) if not d.startswith(".")]
repo_root = extract_dir
if len(top_items) == 1 and os.path.isdir(os.path.join(extract_dir, top_items[0])):
repo_root = os.path.join(extract_dir, top_items[0])
if subpath:
source_dir = os.path.join(repo_root, subpath.strip("/"))
if not os.path.isdir(source_dir):
return f"路径 '{subpath}' 在仓库中不存在"
else:
source_dir = repo_root
if os.path.exists(target_dir):
shutil.rmtree(target_dir)
shutil.copytree(source_dir, target_dir)
self._register_skill(skill_name, source="github")
return self._format_install_success(skill_name, "github")
def _extract_zip(self, content: bytes, name: str, skills_dir: str):
import zipfile
import tempfile
import shutil
with tempfile.TemporaryDirectory() as tmp_dir:
zip_path = os.path.join(tmp_dir, "package.zip")
with open(zip_path, "wb") as f:
f.write(content)
extract_dir = os.path.join(tmp_dir, "extracted")
with zipfile.ZipFile(zip_path, "r") as zf:
zf.extractall(extract_dir)
top_items = [d for d in os.listdir(extract_dir) if not d.startswith(".")]
source = extract_dir
if len(top_items) == 1 and os.path.isdir(os.path.join(extract_dir, top_items[0])):
source = os.path.join(extract_dir, top_items[0])
target = os.path.join(skills_dir, name)
if os.path.exists(target):
shutil.rmtree(target)
shutil.copytree(source, target)
@staticmethod
def _register_skill(name: str, source: str = "cowhub"):
try:
from cli.commands.skill import _register_installed_skill
_register_installed_skill(name, source=source)
except Exception:
pass
@staticmethod
def _format_install_success(name: str, source: str) -> str:
def _format_install_result(result) -> str:
"""Format InstallResult into a chat-friendly message."""
from cli.commands.skill import _read_skill_description
from cli.utils import get_skills_dir
desc = _read_skill_description(os.path.join(get_skills_dir(), name))
lines = [f"{name}"]
if desc:
if len(desc) > 60:
desc = desc[:57] + ""
lines.append(f" {desc}")
lines.append(f" 来源: {source}")
skills_dir = get_skills_dir()
lines = []
for skill_name in result.installed:
desc = _read_skill_description(os.path.join(skills_dir, skill_name))
lines.append(f" {skill_name}")
if desc:
if len(desc) > 60:
desc = desc[:57] + ""
lines.append(f" {desc}")
if len(result.installed) > 1:
lines.append(f"\n共安装 {len(result.installed)} 个技能")
return "\n".join(lines)
def _skill_uninstall(self, name: str) -> str:

View File

@@ -9,6 +9,7 @@ agentmesh-sdk>=0.1.3
python-dotenv>=1.0.0
PyYAML>=6.0
croniter>=2.0.0
click>=8.0
qrcode
# wechatcom & wechatmp

45
run.sh
View File

@@ -584,13 +584,15 @@ start_project() {
echo -e " ${GREEN}cow restart${NC} Restart the service"
echo -e " ${GREEN}cow status${NC} Check status"
echo -e " ${GREEN}cow logs${NC} View logs"
echo -e " ${GREEN}cow update${NC} Update and restart"
echo -e " ${GREEN}cow install-browser${NC} Install browser tool"
else
echo -e " ${GREEN}./run.sh stop${NC} Stop the service"
echo -e " ${GREEN}./run.sh restart${NC} Restart the service"
echo -e " ${GREEN}./run.sh status${NC} Check status"
echo -e " ${GREEN}./run.sh logs${NC} View logs"
echo -e " ${GREEN}./run.sh update${NC} Update and restart"
fi
echo -e " ${GREEN}./run.sh update${NC} Update and restart"
echo -e "${CYAN}${BOLD}=========================================${NC}"
echo ""
@@ -792,27 +794,43 @@ cmd_update() {
echo -e "${GREEN}${EMOJI_WRENCH} Updating CowAgent...${NC}"
cd "${BASE_DIR}"
# Stop service
if is_running; then
cmd_stop
fi
# Update code
# Pull latest code first (service still running)
local pull_ok=false
if [ -d .git ]; then
echo -e "${GREEN}🔄 Pulling latest code...${NC}"
git pull || {
echo -e "${YELLOW}⚠️ GitHub failed, trying Gitee...${NC}"
if git pull; then
pull_ok=true
else
echo -e "${YELLOW}⚠️ git pull failed, trying Gitee mirror...${NC}"
git remote set-url origin https://gitee.com/zhayujie/chatgpt-on-wechat.git
git pull
}
if git pull; then
pull_ok=true
else
echo -e "${RED}❌ Failed to pull code. Update aborted.${NC}"
exit 1
fi
fi
else
echo -e "${YELLOW}⚠️ Not a git repository, skipping code update${NC}"
fi
# Re-exec with the updated run.sh to pick up new logic
exec "$0" _post_update
}
# Post-update: called by cmd_update after git pull to run with new code
cmd_post_update() {
cd "${BASE_DIR}"
# Stop service
if is_running; then
cmd_stop
fi
# Reinstall dependencies
check_python_version
install_dependencies
# Restart service
cmd_start
}
@@ -882,7 +900,7 @@ require_project_dir() {
# Main function
main() {
case "$1" in
start|stop|restart|status|logs|config|update)
start|stop|restart|status|logs|config|update|_post_update)
require_project_dir
;;
esac
@@ -895,6 +913,7 @@ main() {
logs) cmd_logs ;;
config) cmd_config ;;
update) cmd_update ;;
_post_update) cmd_post_update ;;
help|--help|-h)
show_usage
;;