Compare commits

..

9 Commits

Author SHA1 Message Date
zhayujie
8ea2455766 feat(cli): add browser install cmd 2026-03-29 15:09:07 +08:00
zhayujie
3458621147 feat: add browser tool 2026-03-29 14:59:06 +08:00
zhayujie
079df5a47c feat: support batch skill install from zip and github 2026-03-29 14:38:11 +08:00
zhayujie
ddb07c65a1 feat: support github zip-first download, gitLab, git@ ssh, local path 2026-03-29 13:45:15 +08:00
zhayujie
9b21cd222b fix: update run.sh 2026-03-28 19:36:51 +08:00
zhayujie
90f736843f fix: add click dependencies 2026-03-28 19:35:15 +08:00
zhayujie
13c020eb61 fix(cli): cli output in wecom_bot 2026-03-28 19:26:59 +08:00
zhayujie
dbc06dbe95 fix: use new run.sh when updating 2026-03-28 19:16:41 +08:00
zhayujie
23d097bc1c Merge pull request #2726 from zhayujie/feat-cow-cli
feat: cow cli in terminal and chat
2026-03-28 19:01:56 +08:00
13 changed files with 1533 additions and 424 deletions

View File

@@ -87,25 +87,25 @@ FileSave = _optional_tools.get('FileSave')
Terminal = _optional_tools.get('Terminal') Terminal = _optional_tools.get('Terminal')
# Delayed import for BrowserTool # BrowserTool (requires playwright)
def _import_browser_tool(): def _import_browser_tool():
from common.log import logger
try: try:
from agent.tools.browser.browser_tool import BrowserTool from agent.tools.browser.browser_tool import BrowserTool
return BrowserTool return BrowserTool
except ImportError: except ImportError as e:
# Return a placeholder class that will prompt the user to install dependencies when instantiated logger.info(
class BrowserToolPlaceholder: f"[Tools] BrowserTool not loaded - missing dependency: {e}\n"
def __init__(self, *args, **kwargs): f" To enable browser tool, run:\n"
raise ImportError( f" pip install playwright\n"
"The 'browser-use' package is required to use BrowserTool. " f" playwright install chromium"
"Please install it with 'pip install browser-use>=0.1.40'." )
) return None
except Exception as e:
logger.error(f"[Tools] BrowserTool failed to load: {e}")
return None
return BrowserToolPlaceholder BrowserTool = _import_browser_tool()
# Dynamically set BrowserTool
# BrowserTool = _import_browser_tool()
# Export all tools (including optional ones that might be None) # Export all tools (including optional ones that might be None)
__all__ = [ __all__ = [
@@ -124,8 +124,7 @@ __all__ = [
'WebSearch', 'WebSearch',
'WebFetch', 'WebFetch',
'Vision', 'Vision',
# Optional tools (may be None if dependencies not available) 'BrowserTool',
# 'BrowserTool'
] ]
""" """

View File

@@ -0,0 +1,3 @@
from agent.tools.browser.browser_tool import BrowserTool
__all__ = ["BrowserTool"]

View File

@@ -0,0 +1,509 @@
"""
Browser service - Playwright wrapper managing browser lifecycle and page operations.
Lazily launches a Chromium instance on first use, reuses it across tool calls,
and cleans up on close(). Headless mode is auto-detected based on platform and
display availability.
"""
import os
import sys
import re
import uuid
from typing import Optional, Dict, Any, List
from common.log import logger
from playwright.sync_api import sync_playwright, Browser, BrowserContext, Page, Playwright
# ---------------------------------------------------------------------------
# Snapshot DOM helpers
# ---------------------------------------------------------------------------
# Tags that typically carry useful content for an agent
_INTERACTIVE_TAGS = {
"a", "button", "input", "textarea", "select", "option",
"label", "details", "summary",
}
_SEMANTIC_TAGS = {
"h1", "h2", "h3", "h4", "h5", "h6",
"p", "li", "td", "th", "caption", "figcaption", "blockquote", "pre", "code",
"nav", "main", "article", "section", "header", "footer", "form", "table",
"img", "video", "audio",
}
_KEEP_TAGS = _INTERACTIVE_TAGS | _SEMANTIC_TAGS
_SNAPSHOT_JS = """
() => {
const KEEP = new Set(%s);
const INTERACTIVE = new Set(%s);
const SKIP = new Set(["script","style","noscript","svg","path","meta","link","br","hr"]);
let refCounter = 0;
const refMap = {};
function visible(el) {
if (!(el instanceof HTMLElement)) return true;
const st = window.getComputedStyle(el);
if (st.display === "none" || st.visibility === "hidden") return false;
if (parseFloat(st.opacity) === 0) return false;
return true;
}
function walk(node) {
if (node.nodeType === Node.TEXT_NODE) {
const t = node.textContent.trim();
return t ? t : null;
}
if (node.nodeType !== Node.ELEMENT_NODE) return null;
const tag = node.tagName.toLowerCase();
if (SKIP.has(tag)) return null;
if (!visible(node)) return null;
const children = [];
for (const ch of node.childNodes) {
const r = walk(ch);
if (r !== null) {
if (typeof r === "string") children.push(r);
else children.push(r);
}
}
const keep = KEEP.has(tag);
if (!keep) {
// Unwrap: promote children
if (children.length === 0) return null;
if (children.length === 1) return children[0];
return children;
}
const obj = { tag };
if (INTERACTIVE.has(tag)) {
refCounter++;
obj.ref = refCounter;
refMap[refCounter] = node;
}
// Attributes
if (tag === "a" && node.href) obj.href = node.getAttribute("href");
if (tag === "img") {
obj.alt = node.alt || "";
obj.src = node.getAttribute("src") || "";
}
if (tag === "input" || tag === "textarea" || tag === "select") {
obj.type = node.type || "text";
obj.name = node.name || undefined;
obj.value = node.value || undefined;
obj.placeholder = node.placeholder || undefined;
if (node.disabled) obj.disabled = true;
if (tag === "input" && node.type === "checkbox") obj.checked = node.checked;
}
if (tag === "button") {
if (node.disabled) obj.disabled = true;
}
if (tag === "option") {
obj.value = node.value;
if (node.selected) obj.selected = true;
}
if (tag === "label" && node.htmlFor) obj.for = node.htmlFor;
// Role / aria-label
const role = node.getAttribute("role");
if (role) obj.role = role;
const ariaLabel = node.getAttribute("aria-label");
if (ariaLabel) obj.ariaLabel = ariaLabel;
// Children
if (children.length === 1 && typeof children[0] === "string") {
obj.text = children[0];
} else if (children.length > 0) {
obj.children = children;
}
return obj;
}
// Store refMap on window for later use by click/fill actions
const result = walk(document.body);
window.__cowRefMap = refMap;
return { tree: result, refCount: refCounter };
}
""" % (
str(list(_KEEP_TAGS)),
str(list(_INTERACTIVE_TAGS)),
)
def _should_use_headless() -> bool:
"""Decide headless mode: headless on Linux servers without display, headed elsewhere."""
if sys.platform in ("win32", "darwin"):
return False
# Linux: check for display
if os.environ.get("DISPLAY") or os.environ.get("WAYLAND_DISPLAY"):
return False
return True
def _flatten_tree(node, indent=0) -> List[str]:
"""Convert snapshot tree to compact text lines for LLM consumption."""
if node is None:
return []
if isinstance(node, str):
return [" " * indent + node]
if isinstance(node, list):
lines = []
for child in node:
lines.extend(_flatten_tree(child, indent))
return lines
if not isinstance(node, dict):
return []
tag = node.get("tag", "?")
ref = node.get("ref")
parts = [tag]
if ref:
parts[0] = f"[{ref}] {tag}"
# Inline attributes
for attr in ("type", "name", "href", "alt", "role", "ariaLabel", "placeholder", "value"):
val = node.get(attr)
if val:
# Truncate long values
s = str(val)
if len(s) > 80:
s = s[:77] + "..."
parts.append(f'{attr}="{s}"')
for flag in ("disabled", "checked", "selected"):
if node.get(flag):
parts.append(flag)
prefix = " " * indent
header = prefix + " ".join(parts)
text = node.get("text")
if text:
# Truncate long text
if len(text) > 120:
text = text[:117] + "..."
header += f": {text}"
lines = [header]
children = node.get("children", [])
for child in children:
lines.extend(_flatten_tree(child, indent + 2))
return lines
class BrowserService:
"""Manages a single Playwright browser instance with page operations."""
def __init__(self, config: Optional[Dict[str, Any]] = None):
self._config = config or {}
self._playwright: Optional[Playwright] = None
self._browser: Optional[Browser] = None
self._context: Optional[BrowserContext] = None
self._page: Optional[Page] = None
self._headless: Optional[bool] = None
self._screenshot_dir: Optional[str] = None
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
def _ensure_browser(self):
"""Lazily launch browser on first use."""
if self._page and not self._page.is_closed():
return
if self._headless is None:
headless_cfg = self._config.get("headless")
self._headless = headless_cfg if headless_cfg is not None else _should_use_headless()
launch_args = ["--disable-dev-shm-usage"]
if self._headless:
launch_args.append("--no-sandbox")
extra_args = self._config.get("launch_args", [])
if extra_args:
launch_args.extend(extra_args)
viewport_w = self._config.get("viewport_width", 1280)
viewport_h = self._config.get("viewport_height", 720)
if not self._playwright:
self._playwright = sync_playwright().start()
logger.info(f"[Browser] Launching Chromium (headless={self._headless})")
self._browser = self._playwright.chromium.launch(
headless=self._headless,
args=launch_args,
)
self._context = self._browser.new_context(
viewport={"width": viewport_w, "height": viewport_h},
user_agent=(
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/131.0.0.0 Safari/537.36"
),
)
self._page = self._context.new_page()
logger.info("[Browser] Browser ready")
@property
def page(self) -> Page:
self._ensure_browser()
return self._page
def close(self):
"""Release all browser resources."""
try:
if self._context:
self._context.close()
except Exception as e:
logger.debug(f"[Browser] context close error: {e}")
try:
if self._browser:
self._browser.close()
except Exception as e:
logger.debug(f"[Browser] browser close error: {e}")
try:
if self._playwright:
self._playwright.stop()
except Exception as e:
logger.debug(f"[Browser] playwright stop error: {e}")
self._page = None
self._context = None
self._browser = None
self._playwright = None
logger.info("[Browser] Browser closed")
# ------------------------------------------------------------------
# Actions
# ------------------------------------------------------------------
def navigate(self, url: str, timeout: int = 30000) -> Dict[str, Any]:
"""Navigate to a URL and return page info."""
page = self.page
try:
resp = page.goto(url, wait_until="domcontentloaded", timeout=timeout)
status = resp.status if resp else None
except Exception as e:
return {"error": f"Navigation failed: {e}"}
return {
"url": page.url,
"title": page.title(),
"status": status,
}
def snapshot(self, selector: Optional[str] = None) -> str:
"""
Return a compact text representation of the page DOM for LLM consumption.
Interactive elements get numeric refs usable in click/fill actions.
"""
page = self.page
try:
target = selector or "body"
result = page.evaluate(_SNAPSHOT_JS)
except Exception as e:
return f"[Snapshot error: {e}]"
tree = result.get("tree")
ref_count = result.get("refCount", 0)
lines = _flatten_tree(tree)
header = f"Page: {page.title()} ({page.url})\nInteractive elements: {ref_count}\n---"
body = "\n".join(lines)
# Limit output size
max_chars = self._config.get("snapshot_max_chars", 30000)
if len(body) > max_chars:
body = body[:max_chars] + "\n... [snapshot truncated]"
return f"{header}\n{body}"
def screenshot(self, full_page: bool = False, cwd: str = "") -> str:
"""Take a screenshot and save to workspace/tmp. Returns file path."""
page = self.page
save_dir = self._get_screenshot_dir(cwd)
filename = f"screenshot_{uuid.uuid4().hex[:8]}.png"
filepath = os.path.join(save_dir, filename)
page.screenshot(path=filepath, full_page=full_page)
logger.info(f"[Browser] Screenshot saved: {filepath}")
return filepath
def click(self, ref: Optional[int] = None, selector: Optional[str] = None,
timeout: int = 5000) -> Dict[str, Any]:
"""Click an element by snapshot ref or CSS selector."""
page = self.page
try:
if ref is not None:
result = page.evaluate(f"""
() => {{
const el = window.__cowRefMap && window.__cowRefMap[{ref}];
if (!el) return {{ error: "ref {ref} not found. Run snapshot first." }};
el.click();
return {{ clicked: true, tag: el.tagName.toLowerCase() }};
}}
""")
if result.get("error"):
return result
page.wait_for_timeout(500)
return result
elif selector:
page.click(selector, timeout=timeout)
return {"clicked": True, "selector": selector}
else:
return {"error": "Provide either ref (from snapshot) or selector"}
except Exception as e:
return {"error": f"Click failed: {e}"}
def fill(self, text: str, ref: Optional[int] = None,
selector: Optional[str] = None, timeout: int = 5000) -> Dict[str, Any]:
"""Fill text into an input/textarea by snapshot ref or CSS selector."""
page = self.page
try:
if ref is not None:
result = page.evaluate(f"""
() => {{
const el = window.__cowRefMap && window.__cowRefMap[{ref}];
if (!el) return {{ error: "ref {ref} not found. Run snapshot first." }};
el.focus();
el.value = "";
return {{ tag: el.tagName.toLowerCase(), name: el.name || "" }};
}}
""")
if result.get("error"):
return result
page.keyboard.type(text)
return {"filled": True, "ref": ref, "text": text}
elif selector:
page.fill(selector, text, timeout=timeout)
return {"filled": True, "selector": selector, "text": text}
else:
return {"error": "Provide either ref (from snapshot) or selector"}
except Exception as e:
return {"error": f"Fill failed: {e}"}
def select(self, value: str, ref: Optional[int] = None,
selector: Optional[str] = None, timeout: int = 5000) -> Dict[str, Any]:
"""Select an option in a <select> element."""
page = self.page
try:
if ref is not None:
result = page.evaluate(f"""
() => {{
const el = window.__cowRefMap && window.__cowRefMap[{ref}];
if (!el || el.tagName.toLowerCase() !== "select")
return {{ error: "ref {ref} is not a <select> element" }};
el.value = {repr(value)};
el.dispatchEvent(new Event("change", {{ bubbles: true }}));
return {{ selected: true, value: el.value }};
}}
""")
return result
elif selector:
page.select_option(selector, value, timeout=timeout)
return {"selected": True, "selector": selector, "value": value}
else:
return {"error": "Provide either ref (from snapshot) or selector"}
except Exception as e:
return {"error": f"Select failed: {e}"}
def scroll(self, direction: str = "down", amount: int = 500) -> Dict[str, Any]:
"""Scroll the page."""
page = self.page
delta_map = {
"down": (0, amount),
"up": (0, -amount),
"right": (amount, 0),
"left": (-amount, 0),
}
dx, dy = delta_map.get(direction, (0, amount))
try:
page.mouse.wheel(dx, dy)
page.wait_for_timeout(300)
scroll_info = page.evaluate("""
() => ({
scrollX: window.scrollX,
scrollY: window.scrollY,
scrollHeight: document.documentElement.scrollHeight,
clientHeight: document.documentElement.clientHeight
})
""")
return {"scrolled": direction, "amount": amount, **scroll_info}
except Exception as e:
return {"error": f"Scroll failed: {e}"}
def wait(self, selector: Optional[str] = None, timeout: int = 5000,
state: str = "visible") -> Dict[str, Any]:
"""Wait for a selector to appear or a fixed timeout."""
page = self.page
try:
if selector:
page.wait_for_selector(selector, timeout=timeout, state=state)
return {"waited": True, "selector": selector, "state": state}
else:
page.wait_for_timeout(timeout)
return {"waited": True, "timeout_ms": timeout}
except Exception as e:
return {"error": f"Wait failed: {e}"}
def go_back(self) -> Dict[str, Any]:
page = self.page
try:
page.go_back(wait_until="domcontentloaded", timeout=10000)
return {"url": page.url, "title": page.title()}
except Exception as e:
return {"error": f"Go back failed: {e}"}
def go_forward(self) -> Dict[str, Any]:
page = self.page
try:
page.go_forward(wait_until="domcontentloaded", timeout=10000)
return {"url": page.url, "title": page.title()}
except Exception as e:
return {"error": f"Go forward failed: {e}"}
def get_text(self, selector: str) -> Dict[str, Any]:
"""Get text content of an element."""
page = self.page
try:
text = page.text_content(selector, timeout=5000)
return {"text": text or ""}
except Exception as e:
return {"error": f"Get text failed: {e}"}
def evaluate(self, script: str) -> Dict[str, Any]:
"""Execute JavaScript in the page context."""
page = self.page
try:
result = page.evaluate(script)
return {"result": result}
except Exception as e:
return {"error": f"Evaluate failed: {e}"}
def press(self, key: str) -> Dict[str, Any]:
"""Press a keyboard key (e.g. Enter, Tab, Escape)."""
page = self.page
try:
page.keyboard.press(key)
page.wait_for_timeout(300)
return {"pressed": key}
except Exception as e:
return {"error": f"Press failed: {e}"}
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _get_screenshot_dir(self, cwd: str = "") -> str:
if self._screenshot_dir and os.path.isdir(self._screenshot_dir):
return self._screenshot_dir
base = cwd or os.getcwd()
d = os.path.join(base, "tmp")
os.makedirs(d, exist_ok=True)
self._screenshot_dir = d
return d

View File

@@ -0,0 +1,287 @@
"""
Browser tool - Control a Chromium browser for web navigation and interaction.
Uses Playwright under the hood. Browser instance is lazily started on first
use, reused across tool calls within the same session, and cleaned up via
close().
"""
import json
import os
from typing import Dict, Any, Optional
from agent.tools.base_tool import BaseTool, ToolResult
from agent.tools.browser.browser_service import BrowserService
from common.log import logger
class BrowserTool(BaseTool):
"""Single tool exposing all browser actions via an 'action' parameter."""
name: str = "browser"
description: str = (
"Control a browser to navigate web pages, interact with elements, and extract content. "
"Actions: navigate, snapshot, click, fill, select, scroll, screenshot, wait, back, forward, "
"get_text, press, evaluate.\n\n"
"Workflow: navigate to a URL → snapshot to see the page (elements get numeric refs) → "
"use refs in click/fill/select actions → snapshot again to verify.\n\n"
"Use snapshot (not screenshot) as the primary way to read page content."
)
params: dict = {
"type": "object",
"properties": {
"action": {
"type": "string",
"description": (
"The browser action to perform. One of: "
"navigate, snapshot, click, fill, select, scroll, "
"screenshot, wait, back, forward, get_text, press, evaluate"
),
"enum": [
"navigate", "snapshot", "click", "fill", "select", "scroll",
"screenshot", "wait", "back", "forward", "get_text", "press",
"evaluate"
]
},
"url": {
"type": "string",
"description": "URL to navigate to (for 'navigate' action)"
},
"ref": {
"type": "integer",
"description": "Element ref number from snapshot (for click/fill/select)"
},
"selector": {
"type": "string",
"description": "CSS selector as fallback when ref is unavailable (for click/fill/select/wait/get_text)"
},
"text": {
"type": "string",
"description": "Text to type (for 'fill' action)"
},
"value": {
"type": "string",
"description": "Option value (for 'select' action)"
},
"key": {
"type": "string",
"description": "Key to press, e.g. Enter, Tab, Escape (for 'press' action)"
},
"direction": {
"type": "string",
"description": "Scroll direction: up, down, left, right (for 'scroll' action, default: down)"
},
"script": {
"type": "string",
"description": "JavaScript code to execute (for 'evaluate' action)"
},
"full_page": {
"type": "boolean",
"description": "Capture full page screenshot (for 'screenshot' action, default: false)"
},
"timeout": {
"type": "integer",
"description": "Timeout in milliseconds (optional, default varies by action)"
}
},
"required": ["action"]
}
_shared_service: Optional[BrowserService] = None
def __init__(self, config: dict = None):
self.config = config or {}
self.cwd = self.config.get("cwd", os.getcwd())
self._service: Optional[BrowserService] = None
def _get_service(self) -> BrowserService:
"""Get or create the browser service, sharing across copies."""
if self._service is not None:
return self._service
# Reuse shared service across tool copies within the same session
if BrowserTool._shared_service is not None:
self._service = BrowserTool._shared_service
return self._service
self._service = BrowserService(self.config)
BrowserTool._shared_service = self._service
return self._service
def execute(self, args: Dict[str, Any]) -> ToolResult:
action = args.get("action", "").strip().lower()
if not action:
return ToolResult.fail("Error: 'action' parameter is required")
handler = self._ACTION_MAP.get(action)
if not handler:
valid = ", ".join(sorted(self._ACTION_MAP.keys()))
return ToolResult.fail(f"Unknown action '{action}'. Valid actions: {valid}")
try:
return handler(self, args)
except Exception as e:
logger.error(f"[Browser] Action '{action}' error: {e}")
return ToolResult.fail(f"Browser error ({action}): {e}")
# ------------------------------------------------------------------
# Action handlers
# ------------------------------------------------------------------
def _do_navigate(self, args: Dict[str, Any]) -> ToolResult:
url = args.get("url", "").strip()
if not url:
return ToolResult.fail("Error: 'url' is required for navigate action")
if not url.startswith(("http://", "https://")):
url = "https://" + url
timeout = args.get("timeout", 30000)
result = self._get_service().navigate(url, timeout=timeout)
if "error" in result:
return ToolResult.fail(result["error"])
return ToolResult.success(
f"Navigated to: {result['url']}\nTitle: {result['title']}\nStatus: {result['status']}\n\n"
f"Use action 'snapshot' to see the page content."
)
def _do_snapshot(self, args: Dict[str, Any]) -> ToolResult:
selector = args.get("selector")
text = self._get_service().snapshot(selector=selector)
return ToolResult.success(text)
def _do_click(self, args: Dict[str, Any]) -> ToolResult:
ref = args.get("ref")
selector = args.get("selector")
timeout = args.get("timeout", 5000)
result = self._get_service().click(ref=ref, selector=selector, timeout=timeout)
if "error" in result:
return ToolResult.fail(result["error"])
return ToolResult.success(f"Clicked successfully. Use 'snapshot' to see updated page.")
def _do_fill(self, args: Dict[str, Any]) -> ToolResult:
text = args.get("text", "")
ref = args.get("ref")
selector = args.get("selector")
timeout = args.get("timeout", 5000)
if not text and text != "":
return ToolResult.fail("Error: 'text' is required for fill action")
result = self._get_service().fill(text, ref=ref, selector=selector, timeout=timeout)
if "error" in result:
return ToolResult.fail(result["error"])
return ToolResult.success(f"Filled text into element. Use 'snapshot' to verify.")
def _do_select(self, args: Dict[str, Any]) -> ToolResult:
value = args.get("value", "")
ref = args.get("ref")
selector = args.get("selector")
timeout = args.get("timeout", 5000)
if not value:
return ToolResult.fail("Error: 'value' is required for select action")
result = self._get_service().select(value, ref=ref, selector=selector, timeout=timeout)
if "error" in result:
return ToolResult.fail(result["error"])
return ToolResult.success(f"Selected option '{value}'.")
def _do_scroll(self, args: Dict[str, Any]) -> ToolResult:
direction = args.get("direction", "down")
amount = args.get("timeout", 500) # reuse timeout field or default
if "amount" in args:
amount = args["amount"]
result = self._get_service().scroll(direction=direction, amount=amount)
if "error" in result:
return ToolResult.fail(result["error"])
pos = f"scrollY={result.get('scrollY', '?')}/{result.get('scrollHeight', '?')}"
return ToolResult.success(f"Scrolled {direction}. Position: {pos}")
def _do_screenshot(self, args: Dict[str, Any]) -> ToolResult:
full_page = args.get("full_page", False)
filepath = self._get_service().screenshot(full_page=full_page, cwd=self.cwd)
return ToolResult.success(f"Screenshot saved to: {filepath}")
def _do_wait(self, args: Dict[str, Any]) -> ToolResult:
selector = args.get("selector")
timeout = args.get("timeout", 5000)
result = self._get_service().wait(selector=selector, timeout=timeout)
if "error" in result:
return ToolResult.fail(result["error"])
return ToolResult.success(f"Wait completed.")
def _do_back(self, args: Dict[str, Any]) -> ToolResult:
result = self._get_service().go_back()
if "error" in result:
return ToolResult.fail(result["error"])
return ToolResult.success(f"Navigated back to: {result['url']}")
def _do_forward(self, args: Dict[str, Any]) -> ToolResult:
result = self._get_service().go_forward()
if "error" in result:
return ToolResult.fail(result["error"])
return ToolResult.success(f"Navigated forward to: {result['url']}")
def _do_get_text(self, args: Dict[str, Any]) -> ToolResult:
selector = args.get("selector", "").strip()
if not selector:
return ToolResult.fail("Error: 'selector' is required for get_text action")
result = self._get_service().get_text(selector)
if "error" in result:
return ToolResult.fail(result["error"])
return ToolResult.success(result["text"])
def _do_press(self, args: Dict[str, Any]) -> ToolResult:
key = args.get("key", "").strip()
if not key:
return ToolResult.fail("Error: 'key' is required for press action")
result = self._get_service().press(key)
if "error" in result:
return ToolResult.fail(result["error"])
return ToolResult.success(f"Pressed key: {key}")
def _do_evaluate(self, args: Dict[str, Any]) -> ToolResult:
script = args.get("script", "").strip()
if not script:
return ToolResult.fail("Error: 'script' is required for evaluate action")
result = self._get_service().evaluate(script)
if "error" in result:
return ToolResult.fail(result["error"])
val = result.get("result")
if isinstance(val, (dict, list)):
return ToolResult.success(json.dumps(val, ensure_ascii=False, indent=2))
return ToolResult.success(str(val) if val is not None else "(no return value)")
# Action dispatch table
_ACTION_MAP = {
"navigate": _do_navigate,
"snapshot": _do_snapshot,
"click": _do_click,
"fill": _do_fill,
"select": _do_select,
"scroll": _do_scroll,
"screenshot": _do_screenshot,
"wait": _do_wait,
"back": _do_back,
"forward": _do_forward,
"get_text": _do_get_text,
"press": _do_press,
"evaluate": _do_evaluate,
}
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
def copy(self):
"""Share browser instance across tool copies (avoids re-launching)."""
new_tool = BrowserTool(self.config)
new_tool.model = self.model
new_tool.context = getattr(self, "context", None)
new_tool.cwd = self.cwd
new_tool._service = self._service
return new_tool
def close(self):
"""Release browser resources."""
if self._service:
self._service.close()
self._service = None
BrowserTool._shared_service = None
logger.info("[Browser] BrowserTool closed")

View File

@@ -1,18 +0,0 @@
def copy(self):
"""
Special copy method for browser tool to avoid recreating browser instance.
:return: A new instance with shared browser reference but unique model
"""
new_tool = self.__class__()
# Copy essential attributes
new_tool.model = self.model
new_tool.context = getattr(self, 'context', None)
new_tool.config = getattr(self, 'config', None)
# Share the browser instance instead of creating a new one
if hasattr(self, 'browser'):
new_tool.browser = self.browser
return new_tool

View File

@@ -84,11 +84,11 @@ class ToolManager:
except ImportError as e: except ImportError as e:
# Handle missing dependencies with helpful messages # Handle missing dependencies with helpful messages
error_msg = str(e) error_msg = str(e)
if "browser-use" in error_msg or "browser_use" in error_msg: if "playwright" in error_msg:
logger.warning( logger.warning(
f"[ToolManager] Browser tool not loaded - missing dependencies.\n" f"[ToolManager] Browser tool not loaded - missing dependencies.\n"
f" To enable browser tool, run:\n" f" To enable browser tool, run:\n"
f" pip install browser-use markdownify playwright\n" f" pip install playwright\n"
f" playwright install chromium" f" playwright install chromium"
) )
elif "markdownify" in error_msg: elif "markdownify" in error_msg:
@@ -154,11 +154,11 @@ class ToolManager:
except ImportError as e: except ImportError as e:
# Handle missing dependencies with helpful messages # Handle missing dependencies with helpful messages
error_msg = str(e) error_msg = str(e)
if "browser-use" in error_msg or "browser_use" in error_msg: if "playwright" in error_msg:
logger.warning( logger.warning(
f"[ToolManager] Browser tool not loaded - missing dependencies.\n" f"[ToolManager] Browser tool not loaded - missing dependencies.\n"
f" To enable browser tool, run:\n" f" To enable browser tool, run:\n"
f" pip install browser-use markdownify playwright\n" f" pip install playwright\n"
f" playwright install chromium" f" playwright install chromium"
) )
elif "markdownify" in error_msg: elif "markdownify" in error_msg:
@@ -197,7 +197,7 @@ class ToolManager:
logger.warning( logger.warning(
f"[ToolManager] Browser tool is configured but not loaded.\n" f"[ToolManager] Browser tool is configured but not loaded.\n"
f" To enable browser tool, run:\n" f" To enable browser tool, run:\n"
f" pip install browser-use markdownify playwright\n" f" pip install playwright\n"
f" playwright install chromium" f" playwright install chromium"
) )
elif tool_name == "google_search": elif tool_name == "google_search":

View File

@@ -452,7 +452,7 @@ class WecomBotChannel(ChatChannel):
if req_id: if req_id:
state = self._stream_states.pop(req_id, None) state = self._stream_states.pop(req_id, None)
if state: if state:
final_content = state["committed"] final_content = state["committed"] or content
stream_id = state["stream_id"] stream_id = state["stream_id"]
else: else:
final_content = content final_content = content

View File

@@ -5,6 +5,7 @@ from cli import __version__
from cli.commands.skill import skill from cli.commands.skill import skill
from cli.commands.process import start, stop, restart, update, status, logs from cli.commands.process import start, stop, restart, update, status, logs
from cli.commands.context import context from cli.commands.context import context
from cli.commands.install import install_browser
HELP_TEXT = """Usage: cow COMMAND [ARGS]... HELP_TEXT = """Usage: cow COMMAND [ARGS]...
@@ -21,6 +22,7 @@ Commands:
status Show CowAgent running status. status Show CowAgent running status.
logs View CowAgent logs. logs View CowAgent logs.
skill Manage CowAgent skills. skill Manage CowAgent skills.
install-browser Install browser tool (Playwright + Chromium).
Tip: You can also send /help, /skill list, etc. in agent chat.""" Tip: You can also send /help, /skill list, etc. in agent chat."""
@@ -67,6 +69,7 @@ main.add_command(update)
main.add_command(status) main.add_command(status)
main.add_command(logs) main.add_command(logs)
main.add_command(context) main.add_command(context)
main.add_command(install_browser)
if __name__ == '__main__': if __name__ == '__main__':

63
cli/commands/install.py Normal file
View File

@@ -0,0 +1,63 @@
"""cow install-browser - Install Playwright + Chromium for the browser tool."""
import os
import sys
import subprocess
import click
def _has_display() -> bool:
"""Check if a graphical display is available (Linux only)."""
return bool(os.environ.get("DISPLAY") or os.environ.get("WAYLAND_DISPLAY"))
def _is_headless_linux() -> bool:
"""True when running on a Linux server without a display."""
return sys.platform == "linux" and not _has_display()
@click.command("install-browser")
def install_browser():
"""Install browser tool dependencies (Playwright + Chromium)."""
python = sys.executable
# Step 1: Install playwright package
click.echo(click.style("[1/3] Installing playwright Python package...", fg="yellow"))
ret = subprocess.call([python, "-m", "pip", "install", "playwright"])
if ret != 0:
click.echo(click.style("Failed to install playwright package.", fg="red"))
raise SystemExit(1)
click.echo(click.style("playwright package installed.", fg="green"))
click.echo()
# Step 2: System dependencies (Linux only)
if sys.platform == "linux":
click.echo(click.style("[2/3] Installing system dependencies (Linux)...", fg="yellow"))
ret = subprocess.call([python, "-m", "playwright", "install-deps", "chromium"])
if ret != 0:
click.echo(click.style(
"Could not auto-install system deps (may need sudo).\n"
f" Run manually: sudo {python} -m playwright install-deps chromium",
fg="yellow",
))
else:
click.echo(click.style(f"[2/3] Skipping system deps (not needed on {sys.platform}).", fg="yellow"))
click.echo()
# Step 3: Install Chromium (headless shell on Linux servers, full elsewhere)
click.echo(click.style("[3/3] Installing Chromium browser...", fg="yellow"))
cmd = [python, "-m", "playwright", "install", "chromium"]
if _is_headless_linux():
cmd.append("--only-shell")
click.echo(" (headless-only mode for Linux server)")
elif sys.platform == "linux":
click.echo(" (full browser for Linux desktop)")
ret = subprocess.call(cmd)
if ret != 0:
click.echo(click.style("Failed to install Chromium.", fg="red"))
raise SystemExit(1)
click.echo()
click.echo(click.style("Browser tool ready! Restart CowAgent to enable it.", fg="green"))

View File

@@ -8,6 +8,8 @@ import hashlib
import shutil import shutil
import zipfile import zipfile
import tempfile import tempfile
from dataclasses import dataclass, field
from typing import Optional, List
from urllib.parse import urlparse from urllib.parse import urlparse
@@ -22,10 +24,34 @@ from cli.utils import (
SKILL_HUB_API, SKILL_HUB_API,
) )
# ======================================================================
# Public types for the core install API (used by CLI and chat plugin)
# ======================================================================
class SkillInstallError(Exception):
"""Raised when skill installation fails."""
pass
@dataclass
class InstallResult:
"""Result of a skill installation operation."""
installed: List[str] = field(default_factory=list)
messages: List[str] = field(default_factory=list)
error: Optional[str] = None
_SAFE_NAME_RE = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9_\-]{0,63}$") _SAFE_NAME_RE = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9_\-]{0,63}$")
_GITHUB_URL_RE = re.compile( _GITHUB_URL_RE = re.compile(
r"^https?://github\.com/([^/]+)/([^/]+?)(?:\.git)?(?:/(?:tree|blob)/([^/]+)(?:/(.+))?)?/?$" r"^https?://github\.com/([^/]+)/([^/]+?)(?:\.git)?(?:/(?:tree|blob)/([^/]+)(?:/(.+))?)?/?$"
) )
_GITLAB_URL_RE = re.compile(
r"^https?://gitlab\.com/([^/]+)/([^/]+?)(?:\.git)?(?:/-/tree/([^/]+)(?:/(.+))?)?/?$"
)
_GIT_SSH_RE = re.compile(
r"^git@([^:]+):([^/]+)/([^/]+?)(?:\.git)?$"
)
def _parse_github_url(url: str): def _parse_github_url(url: str):
@@ -45,11 +71,92 @@ def _parse_github_url(url: str):
return owner, repo, branch or "main", subpath return owner, repo, branch or "main", subpath
def _parse_gitlab_url(url: str):
"""Parse a GitLab URL into (owner, repo, branch, subpath).
Returns None if the URL doesn't match.
Supported formats:
https://gitlab.com/owner/repo
https://gitlab.com/owner/repo/-/tree/branch
https://gitlab.com/owner/repo/-/tree/branch/path/to/skill
"""
m = _GITLAB_URL_RE.match(url.strip())
if not m:
return None
owner, repo, branch, subpath = m.groups()
return owner, repo, branch or "main", subpath
def _parse_git_ssh_url(url: str):
"""Parse a git@ SSH URL into (host, owner, repo).
Returns None if the URL doesn't match.
Supported format: git@github.com:owner/repo.git
"""
m = _GIT_SSH_RE.match(url.strip())
if not m:
return None
host, owner, repo = m.groups()
return host, owner, repo
def _clone_repo(git_url: str):
"""Shallow-clone a git repo and return (tmp_dir, repo_root).
Requires git to be installed. The caller must clean up tmp_dir.
"""
tmp_dir = tempfile.mkdtemp(prefix="cow-skill-")
repo_dir = os.path.join(tmp_dir, "repo")
try:
import subprocess
subprocess.run(
["git", "clone", "--depth", "1", git_url, repo_dir],
check=True, capture_output=True, timeout=120,
)
except FileNotFoundError:
shutil.rmtree(tmp_dir, ignore_errors=True)
raise RuntimeError("git is not installed")
except Exception as e:
shutil.rmtree(tmp_dir, ignore_errors=True)
raise RuntimeError(f"git clone failed: {e}")
return tmp_dir, repo_dir
def _download_repo_zip(spec: str, branch: str = "main", host: str = "github"):
"""Download a GitHub/GitLab repo as zip and extract it.
Returns (tmp_dir, repo_root) where tmp_dir is the temp directory to clean up
and repo_root is the extracted repository root path.
"""
if host == "gitlab":
zip_url = f"https://gitlab.com/{spec}/-/archive/{branch}/{spec.split('/')[-1]}-{branch}.zip"
else:
zip_url = f"https://github.com/{spec}/archive/refs/heads/{branch}.zip"
resp = requests.get(zip_url, timeout=120, allow_redirects=True)
resp.raise_for_status()
tmp_dir = tempfile.mkdtemp(prefix="cow-skill-")
zip_path = os.path.join(tmp_dir, "repo.zip")
with open(zip_path, "wb") as f:
f.write(resp.content)
extract_dir = os.path.join(tmp_dir, "extracted")
with zipfile.ZipFile(zip_path, "r") as zf:
_safe_extractall(zf, extract_dir)
# GitHub zips have a single top-level dir like "repo-main/"
top_items = [d for d in os.listdir(extract_dir) if not d.startswith(".")]
if len(top_items) == 1 and os.path.isdir(os.path.join(extract_dir, top_items[0])):
return tmp_dir, os.path.join(extract_dir, top_items[0])
return tmp_dir, extract_dir
def _download_github_dir(owner, repo, branch, subpath, dest_dir): def _download_github_dir(owner, repo, branch, subpath, dest_dir):
"""Download a subdirectory from GitHub using the Contents API. """Download a subdirectory from GitHub using the Contents API.
Recursively fetches all files under the given subpath and writes them Recursively fetches all files under the given subpath and writes them
to dest_dir. Raises on any network or API error. to dest_dir. Used as a fallback when zip download fails.
Costs one API request per directory (60/hr unauthenticated).
""" """
api_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{subpath}?ref={branch}" api_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{subpath}?ref={branch}"
resp = requests.get(api_url, timeout=30, headers={"Accept": "application/vnd.github.v3+json"}) resp = requests.get(api_url, timeout=30, headers={"Accept": "application/vnd.github.v3+json"})
@@ -80,6 +187,139 @@ def _download_github_dir(owner, repo, branch, subpath, dest_dir):
_download_github_dir(owner, repo, branch, child_subpath, dest_dir) _download_github_dir(owner, repo, branch, child_subpath, dest_dir)
# Directories to search for skills following the Agent Skills convention
_SKILL_SCAN_DIRS = [
"skills",
"skills/.curated",
"skills/.experimental",
]
_SKILL_SCAN_SKIP = {
"node_modules", "__pycache__", ".git", ".github", "venv", ".venv",
}
def _scan_skills_in_repo(repo_root: str) -> list:
"""Scan a repo for skill directories containing SKILL.md.
Searches in conventional locations (skills/, skills/.curated/, etc.)
and also checks the repo root itself.
Returns a list of (skill_name, skill_dir_path) tuples.
"""
found = []
# Check repo root for a SKILL.md (single-skill repo)
if os.path.isfile(os.path.join(repo_root, "SKILL.md")):
fm = _parse_skill_frontmatter(_read_file_text(os.path.join(repo_root, "SKILL.md")))
name = fm.get("name") or os.path.basename(repo_root)
found.append((name, repo_root))
return found
for scan_dir in _SKILL_SCAN_DIRS:
search_root = os.path.join(repo_root, scan_dir)
if not os.path.isdir(search_root):
continue
for entry in os.listdir(search_root):
if entry.startswith(".") and entry not in (".curated", ".experimental"):
continue
if entry in _SKILL_SCAN_SKIP:
continue
entry_path = os.path.join(search_root, entry)
if os.path.isdir(entry_path) and os.path.isfile(os.path.join(entry_path, "SKILL.md")):
fm = _parse_skill_frontmatter(
_read_file_text(os.path.join(entry_path, "SKILL.md"))
)
name = fm.get("name") or entry
found.append((name, entry_path))
return found
def _scan_skills_in_dir(directory: str) -> list:
"""Scan immediate subdirectories for SKILL.md files.
Unlike _scan_skills_in_repo which checks conventional locations,
this scans all direct children of the given directory.
Returns a list of (skill_name, skill_dir_path) tuples.
"""
found = []
if not os.path.isdir(directory):
return found
for entry in os.listdir(directory):
if entry.startswith(".") or entry in _SKILL_SCAN_SKIP:
continue
entry_path = os.path.join(directory, entry)
if os.path.isdir(entry_path) and os.path.isfile(os.path.join(entry_path, "SKILL.md")):
fm = _parse_skill_frontmatter(
_read_file_text(os.path.join(entry_path, "SKILL.md"))
)
name = fm.get("name") or entry
found.append((name, entry_path))
return found
def _batch_install_skills(discovered, spec, skills_dir, source, result: InstallResult):
"""Install a list of discovered skills into skills_dir."""
result.messages.append(f"Found {len(discovered)} skill(s) in {spec}:")
for sname, sdir in discovered:
safe_name = re.sub(r'[^a-zA-Z0-9_\-]', '-', sname)[:64]
if not _SAFE_NAME_RE.match(safe_name):
result.messages.append(f" Skipping '{sname}' (invalid name)")
continue
target_dir = os.path.join(skills_dir, safe_name)
if os.path.exists(target_dir):
shutil.rmtree(target_dir)
shutil.copytree(sdir, target_dir)
_register_installed_skill(safe_name, source=source)
result.installed.append(safe_name)
result.messages.append(f" + {safe_name}")
if result.installed:
result.messages.append(f"{len(result.installed)} skill(s) installed from {spec}.")
else:
result.messages.append("No valid skills found.")
def _read_file_text(path: str) -> str:
"""Read a file as UTF-8 text, returning empty string on failure."""
try:
with open(path, "r", encoding="utf-8") as f:
return f.read()
except Exception:
return ""
def _install_local(path: str, result: InstallResult):
"""Install skill(s) from a local directory."""
path = os.path.abspath(os.path.expanduser(path))
if not os.path.isdir(path):
raise SkillInstallError(f"'{path}' is not a directory.")
skills_dir = get_skills_dir()
os.makedirs(skills_dir, exist_ok=True)
if os.path.isfile(os.path.join(path, "SKILL.md")):
fm = _parse_skill_frontmatter(_read_file_text(os.path.join(path, "SKILL.md")))
skill_name = fm.get("name") or os.path.basename(path)
skill_name = re.sub(r'[^a-zA-Z0-9_\-]', '-', skill_name)[:64]
_check_skill_name(skill_name)
target_dir = os.path.join(skills_dir, skill_name)
if os.path.exists(target_dir):
shutil.rmtree(target_dir)
shutil.copytree(path, target_dir)
_register_installed_skill(skill_name, source="local")
result.installed.append(skill_name)
result.messages.append(f"Installed '{skill_name}' from local path.")
return
discovered = _scan_skills_in_repo(path) or _scan_skills_in_dir(path)
if not discovered:
raise SkillInstallError(f"No skills found in '{path}'.")
_batch_install_skills(discovered, path, skills_dir, "local", result)
def _register_installed_skill(name: str, source: str = "cowhub"): def _register_installed_skill(name: str, source: str = "cowhub"):
"""Register a newly installed skill into skills_config.json. """Register a newly installed skill into skills_config.json.
@@ -145,39 +385,38 @@ def _read_skill_description(skill_dir: str) -> str:
return "" return ""
def _install_url(url: str): def _install_url(url: str, result: InstallResult):
"""Install a skill from a direct SKILL.md URL.""" """Install a skill from a direct SKILL.md URL."""
click.echo(f"Downloading SKILL.md from {url} ...") result.messages.append(f"Downloading SKILL.md from {url} ...")
try: try:
resp = requests.get(url, timeout=30) resp = requests.get(url, timeout=30)
resp.raise_for_status() resp.raise_for_status()
except Exception as e: except Exception as e:
click.echo(f"Error: Failed to download SKILL.md: {e}", err=True) raise SkillInstallError(f"Failed to download SKILL.md: {e}")
sys.exit(1)
content = resp.text content = resp.text
fm = _parse_skill_frontmatter(content) fm = _parse_skill_frontmatter(content)
skill_name = fm.get("name") skill_name = fm.get("name")
if not skill_name: if not skill_name:
click.echo("Error: SKILL.md missing 'name' field in frontmatter.", err=True) raise SkillInstallError("SKILL.md missing 'name' field in frontmatter.")
sys.exit(1)
skill_name = skill_name.strip() skill_name = skill_name.strip()
_validate_skill_name(skill_name) _check_skill_name(skill_name)
skills_dir = get_skills_dir() skills_dir = get_skills_dir()
os.makedirs(skills_dir, exist_ok=True) os.makedirs(skills_dir, exist_ok=True)
skill_dir = os.path.join(skills_dir, skill_name) skill_dir = os.path.join(skills_dir, skill_name)
if os.path.isdir(skill_dir): if os.path.isdir(skill_dir):
click.echo(f"Skill '{skill_name}' already exists. Overwriting SKILL.md ...") result.messages.append(f"Skill '{skill_name}' already exists. Overwriting SKILL.md ...")
os.makedirs(skill_dir, exist_ok=True) os.makedirs(skill_dir, exist_ok=True)
with open(os.path.join(skill_dir, "SKILL.md"), "w", encoding="utf-8") as f: with open(os.path.join(skill_dir, "SKILL.md"), "w", encoding="utf-8") as f:
f.write(content) f.write(content)
_register_installed_skill(skill_name, source="url") _register_installed_skill(skill_name, source="url")
_print_install_success(skill_name, "url") result.installed.append(skill_name)
result.messages.append(f"Installed '{skill_name}' from URL.")
def _print_install_success(name: str, source: str): def _print_install_success(name: str, source: str):
@@ -210,6 +449,20 @@ def _validate_github_spec(spec: str):
sys.exit(1) sys.exit(1)
def _check_skill_name(name: str):
"""Raise SkillInstallError if name is invalid."""
if not _SAFE_NAME_RE.match(name):
raise SkillInstallError(
f"Invalid skill name '{name}'. Use only letters, digits, hyphens, and underscores."
)
def _check_github_spec(spec: str):
"""Raise SkillInstallError if spec is not owner/repo."""
if not re.match(r"^[a-zA-Z0-9_\-]+/[a-zA-Z0-9_.\-]+$", spec):
raise SkillInstallError(f"Invalid GitHub spec '{spec}'. Expected format: owner/repo")
def _safe_extractall(zf: zipfile.ZipFile, dest: str): def _safe_extractall(zf: zipfile.ZipFile, dest: str):
"""Extract zip while guarding against Zip Slip (path traversal).""" """Extract zip while guarding against Zip Slip (path traversal)."""
dest = os.path.realpath(dest) dest = os.path.realpath(dest)
@@ -241,6 +494,18 @@ def _verify_checksum(content: bytes, expected: str):
return True return True
def _check_checksum(content: bytes, expected: str):
"""Raise SkillInstallError on SHA-256 checksum mismatch."""
if not expected:
return
actual = hashlib.sha256(content).hexdigest()
if actual != expected.lower():
raise SkillInstallError(
f"Checksum mismatch! Expected: {expected}, Actual: {actual}. "
"The downloaded package may have been tampered with."
)
@click.group() @click.group()
def skill(): def skill():
"""Manage CowAgent skills.""" """Manage CowAgent skills."""
@@ -412,63 +677,140 @@ def search(query):
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# cow skill install # Core install function — reusable from CLI and chat plugin
# ------------------------------------------------------------------ # ------------------------------------------------------------------
@skill.command()
@click.argument("name")
def install(name):
"""Install a skill from Skill Hub, GitHub, or a SKILL.md URL.
Examples: def install_skill(name: str) -> InstallResult:
"""Core install logic, usable from CLI and chat plugin.
cow skill install pptx Accepts all formats: Skill Hub name, owner/repo, GitHub/GitLab URL,
git@ SSH, local path, SKILL.md URL.
cow skill install github:owner/repo Returns InstallResult with installed skill names and messages.
cow skill install github:owner/repo#path/to/skill
cow skill install https://github.com/owner/repo/tree/main/path/to/skill
cow skill install https://example.com/path/to/SKILL.md
""" """
result = InstallResult()
try:
_route_install(name, result)
except SkillInstallError as e:
result.error = str(e)
return result
def _route_install(name: str, result: InstallResult):
"""Dispatch to the appropriate installer based on input format."""
# --- Local path ---
if name.startswith(("./", "../", "/", "~/")):
_install_local(name, result)
return
# --- Direct SKILL.md URL ---
if name.startswith(("http://", "https://")) and name.rstrip("/").endswith("SKILL.md"): if name.startswith(("http://", "https://")) and name.rstrip("/").endswith("SKILL.md"):
# GitHub SKILL.md → strip filename and install the whole directory
dir_url = re.sub(r'/SKILL\.md/?$', '', name) dir_url = re.sub(r'/SKILL\.md/?$', '', name)
gh = _parse_github_url(dir_url) gh = _parse_github_url(dir_url)
if gh: if gh:
owner, repo, branch, subpath = gh owner, repo, branch, subpath = gh
spec = f"{owner}/{repo}" _install_github(f"{owner}/{repo}", result, subpath=subpath, skill_name=(
skill_name = subpath.rstrip("/").split("/")[-1] if subpath else repo subpath.rstrip("/").split("/")[-1] if subpath else repo
_install_github(spec, subpath=subpath, skill_name=skill_name, branch=branch) ), branch=branch)
return return
_install_url(name) _install_url(name, result)
return return
# --- Full GitHub URL ---
parsed = _parse_github_url(name) parsed = _parse_github_url(name)
if parsed: if parsed:
owner, repo, branch, subpath = parsed owner, repo, branch, subpath = parsed
spec = f"{owner}/{repo}" _install_github(f"{owner}/{repo}", result, subpath=subpath, branch=branch)
skill_name = subpath.rstrip("/").split("/")[-1] if subpath else repo return
_install_github(spec, subpath=subpath, skill_name=skill_name, branch=branch)
elif name.startswith("github:"): # --- Full GitLab URL ---
skill_name = name[7:] gl = _parse_gitlab_url(name)
_validate_skill_name(skill_name) if gl:
_install_hub(skill_name) owner, repo, branch, subpath = gl
elif name.startswith("clawhub:"): _install_gitlab(f"{owner}/{repo}", result, subpath=subpath, branch=branch)
return
# --- git@host:owner/repo.git SSH URL ---
ssh = _parse_git_ssh_url(name)
if ssh:
host, owner, repo = ssh
_install_git_clone(name, result, display_name=f"{owner}/{repo}")
return
# --- github: prefix ---
if name.startswith("github:"):
raw = name[7:]
subpath = None
if "#" in raw:
raw, subpath = raw.split("#", 1)
_check_github_spec(raw)
_install_github(raw, result, subpath=subpath)
return
# --- clawhub: prefix ---
if name.startswith("clawhub:"):
skill_name = name[8:] skill_name = name[8:]
_validate_skill_name(skill_name) _check_skill_name(skill_name)
_install_hub(skill_name, provider="clawhub") _install_hub(skill_name, result, provider="clawhub")
else: return
_validate_skill_name(name)
_install_hub(name) # --- owner/repo or owner/repo#subpath shorthand ---
if re.match(r"^[a-zA-Z0-9_\-]+/[a-zA-Z0-9_.\-]+(?:#.+)?$", name):
subpath = None
spec = name
if "#" in spec:
spec, subpath = spec.split("#", 1)
_install_github(spec, result, subpath=subpath)
return
# --- Fallback: Skill Hub by name ---
_check_skill_name(name)
_install_hub(name, result)
def _install_hub(name, provider=None): # ------------------------------------------------------------------
# cow skill install (CLI thin wrapper)
# ------------------------------------------------------------------
@skill.command()
@click.argument("name")
def install(name):
"""Install skill(s) from Skill Hub, GitHub, GitLab, git URL, or local path.
When given an owner/repo (or full URL), downloads the repo and
auto-discovers all skills/ subdirectories containing SKILL.md,
installing them in batch. Use a subpath to install a single skill.
Examples:
cow skill install pptx (from Skill Hub)
cow skill install larksuite/cli (GitHub shorthand, all skills)
cow skill install larksuite/cli#skills/lark-im (single skill by subpath)
cow skill install https://github.com/owner/repo
cow skill install https://gitlab.com/org/repo
cow skill install git@github.com:owner/repo.git
cow skill install ./my-local-skills (local directory)
cow skill install https://example.com/path/to/SKILL.md
"""
result = install_skill(name)
for msg in result.messages:
click.echo(msg)
if result.error:
click.echo(f"Error: {result.error}", err=True)
sys.exit(1)
def _install_hub(name, result: InstallResult, provider=None):
"""Install a skill from Skill Hub.""" """Install a skill from Skill Hub."""
skills_dir = get_skills_dir() skills_dir = get_skills_dir()
os.makedirs(skills_dir, exist_ok=True) os.makedirs(skills_dir, exist_ok=True)
click.echo(f"Fetching skill info for '{name}'...") result.messages.append(f"Fetching skill info for '{name}'...")
try: try:
body = {} body = {}
@@ -482,13 +824,12 @@ def _install_hub(name, provider=None):
resp.raise_for_status() resp.raise_for_status()
except requests.HTTPError as e: except requests.HTTPError as e:
if e.response is not None and e.response.status_code == 404: if e.response is not None and e.response.status_code == 404:
click.echo(f"Error: Skill '{name}' not found on Skill Hub.", err=True) raise SkillInstallError(f"Skill '{name}' not found on Skill Hub.")
else: raise SkillInstallError(f"Failed to fetch skill: {e}")
click.echo(f"Error: Failed to fetch skill: {e}", err=True) except SkillInstallError:
sys.exit(1) raise
except Exception as e: except Exception as e:
click.echo(f"Error: Failed to connect to Skill Hub: {e}", err=True) raise SkillInstallError(f"Failed to connect to Skill Hub: {e}")
sys.exit(1)
content_type = resp.headers.get("Content-Type", "") content_type = resp.headers.get("Content-Type", "")
@@ -501,12 +842,12 @@ def _install_hub(name, provider=None):
parsed_url = _parse_github_url(source_url) parsed_url = _parse_github_url(source_url)
if parsed_url: if parsed_url:
owner, repo, branch, subpath = parsed_url owner, repo, branch, subpath = parsed_url
click.echo(f"Source: GitHub ({source_url})") result.messages.append(f"Source: GitHub ({source_url})")
_install_github(f"{owner}/{repo}", subpath=subpath, skill_name=name, branch=branch) _install_github(f"{owner}/{repo}", result, subpath=subpath, skill_name=name, branch=branch)
else: else:
_validate_github_spec(source_url) _check_github_spec(source_url)
click.echo(f"Source: GitHub ({source_url})") result.messages.append(f"Source: GitHub ({source_url})")
_install_github(source_url, skill_name=name) _install_github(source_url, result, skill_name=name)
return return
if source_type == "registry": if source_type == "registry":
@@ -514,25 +855,25 @@ def _install_hub(name, provider=None):
if download_url: if download_url:
parsed = urlparse(download_url) parsed = urlparse(download_url)
if parsed.scheme != "https": if parsed.scheme != "https":
click.echo(f"Error: Refusing to download from non-HTTPS URL.", err=True) raise SkillInstallError("Refusing to download from non-HTTPS URL.")
sys.exit(1) src_provider = data.get("source_provider", "registry")
provider = data.get("source_provider", "registry")
expected_checksum = data.get("checksum") or data.get("sha256") expected_checksum = data.get("checksum") or data.get("sha256")
click.echo(f"Source: {provider}") result.messages.append(f"Source: {src_provider}")
click.echo("Downloading skill package...") result.messages.append("Downloading skill package...")
try: try:
dl_resp = requests.get(download_url, timeout=60, allow_redirects=True) dl_resp = requests.get(download_url, timeout=60, allow_redirects=True)
dl_resp.raise_for_status() dl_resp.raise_for_status()
except Exception as e: except Exception as e:
click.echo(f"Error: Failed to download from {provider}: {e}", err=True) raise SkillInstallError(f"Failed to download from {src_provider}: {e}")
sys.exit(1) _check_checksum(dl_resp.content, expected_checksum)
_verify_checksum(dl_resp.content, expected_checksum) installed_before = len(result.installed)
_install_zip_bytes(dl_resp.content, name, skills_dir) _install_zip_bytes(dl_resp.content, name, skills_dir, result=result, source_label=src_provider)
_register_installed_skill(name, source=provider) if len(result.installed) == installed_before:
_print_install_success(name, provider) _register_installed_skill(name, source=src_provider)
result.installed.append(name)
result.messages.append(f"Installed '{name}' from {src_provider}.")
else: else:
click.echo(f"Error: Unsupported registry provider.", err=True) raise SkillInstallError("Unsupported registry provider.")
sys.exit(1)
return return
if "redirect" in data: if "redirect" in data:
@@ -540,107 +881,189 @@ def _install_hub(name, provider=None):
parsed_url = _parse_github_url(source_url) parsed_url = _parse_github_url(source_url)
if parsed_url: if parsed_url:
owner, repo, branch, subpath = parsed_url owner, repo, branch, subpath = parsed_url
click.echo(f"Source: GitHub ({source_url})") result.messages.append(f"Source: GitHub ({source_url})")
_install_github(f"{owner}/{repo}", subpath=subpath, skill_name=name, branch=branch) _install_github(f"{owner}/{repo}", result, subpath=subpath, skill_name=name, branch=branch)
else: else:
_validate_github_spec(source_url) _check_github_spec(source_url)
click.echo(f"Source: GitHub ({source_url})") result.messages.append(f"Source: GitHub ({source_url})")
_install_github(source_url, skill_name=name) _install_github(source_url, result, skill_name=name)
return return
elif "application/zip" in content_type: elif "application/zip" in content_type:
click.echo("Downloading skill package...") result.messages.append("Downloading skill package...")
expected_checksum = resp.headers.get("X-Checksum-Sha256") expected_checksum = resp.headers.get("X-Checksum-Sha256")
_verify_checksum(resp.content, expected_checksum) _check_checksum(resp.content, expected_checksum)
_install_zip_bytes(resp.content, name, skills_dir) installed_before = len(result.installed)
_register_installed_skill(name) _install_zip_bytes(resp.content, name, skills_dir, result=result, source_label="cowhub")
_print_install_success(name, "cowhub") if len(result.installed) == installed_before:
_register_installed_skill(name)
result.installed.append(name)
result.messages.append(f"Installed '{name}' from Skill Hub.")
return return
click.echo(f"Error: Unexpected response from Skill Hub.", err=True) raise SkillInstallError("Unexpected response from Skill Hub.")
sys.exit(1)
def _install_github(spec, subpath=None, skill_name=None, branch="main", source="github"): def _install_github(spec, result: InstallResult, subpath=None, skill_name=None, branch="main", source="github"):
"""Install a skill from a GitHub repo. """Install skill(s) from a GitHub repo.
spec format: owner/repo or owner/repo#path Strategy: zip download first (no API rate limit), Contents API as fallback.
""" """
if "#" in spec and not subpath: if "#" in spec and not subpath:
spec, subpath = spec.split("#", 1) spec, subpath = spec.split("#", 1)
_validate_github_spec(spec) _check_github_spec(spec)
if not skill_name:
skill_name = subpath.rstrip("/").split("/")[-1] if subpath else spec.split("/")[-1]
_validate_skill_name(skill_name)
skills_dir = get_skills_dir() skills_dir = get_skills_dir()
os.makedirs(skills_dir, exist_ok=True) os.makedirs(skills_dir, exist_ok=True)
target_dir = os.path.join(skills_dir, skill_name)
owner, repo = spec.split("/", 1) owner, repo = spec.split("/", 1)
# For subpath installs, try GitHub Contents API first (avoids downloading entire repo) result.messages.append(f"Downloading from GitHub: {spec} (branch: {branch})...")
if subpath:
click.echo(f"Downloading from GitHub: {spec}/{subpath} (branch: {branch})...")
try:
with tempfile.TemporaryDirectory() as tmp_dir:
api_dest = os.path.join(tmp_dir, skill_name)
os.makedirs(api_dest)
_download_github_dir(owner, repo, branch, subpath.strip("/"), api_dest)
if os.path.exists(target_dir):
shutil.rmtree(target_dir)
shutil.copytree(api_dest, target_dir)
_register_installed_skill(skill_name, source=source)
_print_install_success(skill_name, source)
return
except Exception:
click.echo("Contents API unavailable, falling back to zip download...")
# Fallback: download full repo zip tmp_dir = None
zip_url = f"https://github.com/{spec}/archive/refs/heads/{branch}.zip" repo_root = None
click.echo(f"Downloading from GitHub: {spec} (branch: {branch})...") try:
tmp_dir, repo_root = _download_repo_zip(spec, branch)
except Exception:
result.messages.append("Zip download failed, falling back to Contents API...")
if repo_root:
try:
_install_from_repo_root(repo_root, spec, subpath, skill_name, skills_dir, source, result)
return
except SkillInstallError:
raise
except Exception as e:
result.messages.append(f"Error processing zip: {e}")
result.messages.append("Falling back to Contents API...")
finally:
if tmp_dir:
shutil.rmtree(tmp_dir, ignore_errors=True)
if not subpath:
raise SkillInstallError(
f"Zip download failed and batch install requires zip. "
f"Try again or specify a subpath: {spec}#skills/<name>"
)
if not skill_name:
skill_name = subpath.rstrip("/").split("/")[-1]
_check_skill_name(skill_name)
result.messages.append(f"Downloading via Contents API: {spec}/{subpath} ...")
target_dir = os.path.join(skills_dir, skill_name)
try:
with tempfile.TemporaryDirectory() as api_tmp:
api_dest = os.path.join(api_tmp, skill_name)
os.makedirs(api_dest)
_download_github_dir(owner, repo, branch, subpath.strip("/"), api_dest)
if os.path.exists(target_dir):
shutil.rmtree(target_dir)
shutil.copytree(api_dest, target_dir)
_register_installed_skill(skill_name, source=source)
result.installed.append(skill_name)
result.messages.append(f"Installed '{skill_name}' from GitHub.")
except Exception as e:
raise SkillInstallError(f"Contents API also failed: {e}")
def _install_from_repo_root(repo_root, spec, subpath, skill_name, skills_dir, source, result: InstallResult):
"""Install skill(s) from an already-extracted repo root directory."""
if subpath:
source_dir = os.path.join(repo_root, subpath.strip("/"))
if not os.path.isdir(source_dir):
raise SkillInstallError(f"Path '{subpath}' not found in repository.")
if os.path.isfile(os.path.join(source_dir, "SKILL.md")):
if not skill_name:
fm = _parse_skill_frontmatter(
_read_file_text(os.path.join(source_dir, "SKILL.md"))
)
skill_name = fm.get("name") or subpath.rstrip("/").split("/")[-1]
_check_skill_name(skill_name)
target_dir = os.path.join(skills_dir, skill_name)
if os.path.exists(target_dir):
shutil.rmtree(target_dir)
shutil.copytree(source_dir, target_dir)
_register_installed_skill(skill_name, source=source)
result.installed.append(skill_name)
result.messages.append(f"Installed '{skill_name}' from {source}.")
return
discovered = _scan_skills_in_dir(source_dir)
if discovered:
_batch_install_skills(discovered, spec, skills_dir, source, result)
return
raise SkillInstallError(f"No SKILL.md found in '{subpath}' or its subdirectories.")
else:
discovered = _scan_skills_in_repo(repo_root)
if not discovered:
if skill_name:
_check_skill_name(skill_name)
else:
skill_name = spec.split("/")[-1]
_check_skill_name(skill_name)
target_dir = os.path.join(skills_dir, skill_name)
if os.path.exists(target_dir):
shutil.rmtree(target_dir)
shutil.copytree(repo_root, target_dir)
_register_installed_skill(skill_name, source=source)
result.installed.append(skill_name)
result.messages.append(f"Installed '{skill_name}' from {source}.")
return
_batch_install_skills(discovered, spec, skills_dir, source, result)
def _install_gitlab(spec, result: InstallResult, subpath=None, branch="main"):
"""Install skill(s) from a GitLab repo via zip download."""
_check_github_spec(spec)
skills_dir = get_skills_dir()
os.makedirs(skills_dir, exist_ok=True)
result.messages.append(f"Downloading from GitLab: {spec} (branch: {branch})...")
try: try:
resp = requests.get(zip_url, timeout=60, allow_redirects=True) tmp_dir, repo_root = _download_repo_zip(spec, branch, host="gitlab")
resp.raise_for_status()
except Exception as e: except Exception as e:
click.echo(f"Error: Failed to download from GitHub: {e}", err=True) raise SkillInstallError(f"Failed to download from GitLab: {e}")
sys.exit(1)
with tempfile.TemporaryDirectory() as tmp_dir: try:
zip_path = os.path.join(tmp_dir, "repo.zip") _install_from_repo_root(repo_root, spec, subpath, None, skills_dir, "gitlab", result)
with open(zip_path, "wb") as f: finally:
f.write(resp.content) shutil.rmtree(tmp_dir, ignore_errors=True)
extract_dir = os.path.join(tmp_dir, "extracted")
with zipfile.ZipFile(zip_path, "r") as zf:
_safe_extractall(zf, extract_dir)
top_items = [d for d in os.listdir(extract_dir) if not d.startswith(".")]
repo_root = extract_dir
if len(top_items) == 1 and os.path.isdir(os.path.join(extract_dir, top_items[0])):
repo_root = os.path.join(extract_dir, top_items[0])
if subpath:
source_dir = os.path.join(repo_root, subpath.strip("/"))
if not os.path.isdir(source_dir):
click.echo(f"Error: Path '{subpath}' not found in repository.", err=True)
sys.exit(1)
else:
source_dir = repo_root
if os.path.exists(target_dir):
shutil.rmtree(target_dir)
shutil.copytree(source_dir, target_dir)
_register_installed_skill(skill_name, source=source)
_print_install_success(skill_name, source)
def _install_zip_bytes(content, name, skills_dir): def _install_git_clone(git_url: str, result: InstallResult, display_name: str = ""):
"""Extract a zip archive into the skills directory.""" """Install skill(s) from any git URL via shallow clone."""
skills_dir = get_skills_dir()
os.makedirs(skills_dir, exist_ok=True)
result.messages.append(f"Cloning {display_name or git_url} ...")
try:
tmp_dir, repo_root = _clone_repo(git_url)
except RuntimeError as e:
raise SkillInstallError(str(e))
try:
_install_from_repo_root(repo_root, display_name or git_url, None, None, skills_dir, "git", result)
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)
def _install_zip_bytes(content, name, skills_dir, result: InstallResult = None, source_label: str = "zip"):
"""Extract a zip archive and install skill(s).
Supports three scenarios:
1. Root contains SKILL.md → single skill install
2. Contains multiple skill dirs (skills/, or immediate children with SKILL.md) → batch install
3. Fallback → treat the entire archive as a single skill named `name`
"""
with tempfile.TemporaryDirectory() as tmp_dir: with tempfile.TemporaryDirectory() as tmp_dir:
zip_path = os.path.join(tmp_dir, "package.zip") zip_path = os.path.join(tmp_dir, "package.zip")
with open(zip_path, "wb") as f: with open(zip_path, "wb") as f:
@@ -651,14 +1074,35 @@ def _install_zip_bytes(content, name, skills_dir):
_safe_extractall(zf, extract_dir) _safe_extractall(zf, extract_dir)
top_items = [d for d in os.listdir(extract_dir) if not d.startswith(".")] top_items = [d for d in os.listdir(extract_dir) if not d.startswith(".")]
source = extract_dir pkg_root = extract_dir
if len(top_items) == 1 and os.path.isdir(os.path.join(extract_dir, top_items[0])): if len(top_items) == 1 and os.path.isdir(os.path.join(extract_dir, top_items[0])):
source = os.path.join(extract_dir, top_items[0]) pkg_root = os.path.join(extract_dir, top_items[0])
discovered = _scan_skills_in_repo(pkg_root) or _scan_skills_in_dir(pkg_root)
if discovered and len(discovered) > 1 and result is not None:
_batch_install_skills(discovered, name, skills_dir, source_label, result)
return
if discovered and len(discovered) == 1:
sname, sdir = discovered[0]
safe_name = re.sub(r'[^a-zA-Z0-9_\-]', '-', sname)[:64]
if not _SAFE_NAME_RE.match(safe_name):
safe_name = name
target = os.path.join(skills_dir, safe_name)
if os.path.exists(target):
shutil.rmtree(target)
shutil.copytree(sdir, target)
_register_installed_skill(safe_name, source=source_label)
if result is not None:
result.installed.append(safe_name)
result.messages.append(f"Installed '{safe_name}' from {source_label}.")
return
target = os.path.join(skills_dir, name) target = os.path.join(skills_dir, name)
if os.path.exists(target): if os.path.exists(target):
shutil.rmtree(target) shutil.rmtree(target)
shutil.copytree(source, target) shutil.copytree(pkg_root, target)

View File

@@ -599,239 +599,38 @@ class CowCliPlugin(Plugin):
return "请指定要安装的技能: /skill install <名称>" return "请指定要安装的技能: /skill install <名称>"
try: try:
from cli.utils import get_skills_dir, SKILL_HUB_API from cli.commands.skill import install_skill
from cli.commands.skill import _parse_github_url, _download_github_dir result = install_skill(name)
import requests
import shutil
import zipfile
import tempfile
skills_dir = get_skills_dir() if result.error:
os.makedirs(skills_dir, exist_ok=True) return f"安装失败: {result.error}"
if name.startswith(("http://", "https://")) and name.rstrip("/").endswith("SKILL.md"): if not result.installed:
import re as re_mod return "\n".join(result.messages) if result.messages else "未找到可安装的技能"
dir_url = re_mod.sub(r'/SKILL\.md/?$', '', name)
gh = _parse_github_url(dir_url)
if gh:
owner, repo, branch, subpath = gh
spec = f"{owner}/{repo}"
skill_name = subpath.rstrip("/").split("/")[-1] if subpath else repo
return self._skill_install_github(
spec, skills_dir, subpath=subpath, skill_name=skill_name, branch=branch
)
return self._skill_install_url(name, skills_dir)
parsed = _parse_github_url(name) return self._format_install_result(result)
if parsed:
owner, repo, branch, subpath = parsed
spec = f"{owner}/{repo}"
skill_name = subpath.rstrip("/").split("/")[-1] if subpath else repo
return self._skill_install_github(
spec, skills_dir, subpath=subpath, skill_name=skill_name, branch=branch
)
provider = None
if name.startswith("github:"):
name = name[7:]
elif name.startswith("clawhub:"):
name = name[8:]
provider = "clawhub"
body = {}
if provider:
body["provider"] = provider
resp = requests.post(
f"{SKILL_HUB_API}/skills/{name}/download",
json=body,
timeout=15,
)
resp.raise_for_status()
content_type = resp.headers.get("Content-Type", "")
if "application/json" in content_type:
data = resp.json()
source_type = data.get("source_type")
if source_type == "github" or "redirect" in data:
source_url = data.get("source_url", "")
parsed_url = _parse_github_url(source_url)
if parsed_url:
owner, repo, branch, subpath = parsed_url
return self._skill_install_github(
f"{owner}/{repo}", skills_dir, subpath=subpath,
skill_name=name, branch=branch
)
return self._skill_install_github(source_url, skills_dir, skill_name=name)
if source_type == "registry":
download_url = data.get("download_url")
if not download_url:
return f"此技能来自不支持的注册表,无法自动安装。"
from urllib.parse import urlparse
if urlparse(download_url).scheme != "https":
return "安装失败: 下载地址不安全 (非 HTTPS)"
provider = data.get("source_provider", "registry")
try:
dl_resp = requests.get(download_url, timeout=60, allow_redirects=True)
dl_resp.raise_for_status()
except Exception as e:
return f"{provider} 下载失败: {e}"
self._extract_zip(dl_resp.content, name, skills_dir)
self._register_skill(name, source=provider)
return self._format_install_success(name, provider)
elif "application/zip" in content_type:
self._extract_zip(resp.content, name, skills_dir)
self._register_skill(name, source="cowhub")
return self._format_install_success(name, "cowhub")
return "技能商店返回了未预期的响应格式"
except requests.HTTPError as e:
if e.response is not None and e.response.status_code == 404:
return f"技能 '{name}' 未在技能商店中找到"
return f"安装失败: {e}"
except Exception as e: except Exception as e:
return f"安装失败: {e}" return f"安装失败: {e}"
def _skill_install_url(self, url: str, skills_dir: str) -> str:
"""Install a skill from a direct SKILL.md URL."""
import requests
from cli.commands.skill import _parse_skill_frontmatter
try:
resp = requests.get(url, timeout=30)
resp.raise_for_status()
except Exception as e:
return f"下载 SKILL.md 失败: {e}"
content = resp.text
fm = _parse_skill_frontmatter(content)
skill_name = fm.get("name")
if not skill_name:
return "SKILL.md 中未找到 name 字段,无法安装"
skill_name = skill_name.strip()
skill_dir = os.path.join(skills_dir, skill_name)
os.makedirs(skill_dir, exist_ok=True)
with open(os.path.join(skill_dir, "SKILL.md"), "w", encoding="utf-8") as f:
f.write(content)
self._register_skill(skill_name, source="url")
return self._format_install_success(skill_name, "url")
def _skill_install_github(self, spec: str, skills_dir: str,
subpath: str = None, skill_name: str = None,
branch: str = "main") -> str:
import requests
import shutil
import zipfile
import tempfile
from cli.commands.skill import _download_github_dir
if "#" in spec and not subpath:
spec, subpath = spec.split("#", 1)
if not skill_name:
skill_name = subpath.rstrip("/").split("/")[-1] if subpath else spec.split("/")[-1]
owner, repo = spec.split("/", 1)
target_dir = os.path.join(skills_dir, skill_name)
# For subpath installs, try Contents API first
if subpath:
try:
with tempfile.TemporaryDirectory() as tmp_dir:
api_dest = os.path.join(tmp_dir, skill_name)
os.makedirs(api_dest)
_download_github_dir(owner, repo, branch, subpath.strip("/"), api_dest)
if os.path.exists(target_dir):
shutil.rmtree(target_dir)
shutil.copytree(api_dest, target_dir)
self._register_skill(skill_name, source="github")
return self._format_install_success(skill_name, "github")
except Exception:
pass # fall through to zip download
# Fallback: download full repo zip
zip_url = f"https://github.com/{spec}/archive/refs/heads/{branch}.zip"
try:
resp = requests.get(zip_url, timeout=60, allow_redirects=True)
resp.raise_for_status()
except Exception as e:
return f"从 GitHub 下载失败: {e}"
with tempfile.TemporaryDirectory() as tmp_dir:
zip_path = os.path.join(tmp_dir, "repo.zip")
with open(zip_path, "wb") as f:
f.write(resp.content)
extract_dir = os.path.join(tmp_dir, "extracted")
with zipfile.ZipFile(zip_path, "r") as zf:
zf.extractall(extract_dir)
top_items = [d for d in os.listdir(extract_dir) if not d.startswith(".")]
repo_root = extract_dir
if len(top_items) == 1 and os.path.isdir(os.path.join(extract_dir, top_items[0])):
repo_root = os.path.join(extract_dir, top_items[0])
if subpath:
source_dir = os.path.join(repo_root, subpath.strip("/"))
if not os.path.isdir(source_dir):
return f"路径 '{subpath}' 在仓库中不存在"
else:
source_dir = repo_root
if os.path.exists(target_dir):
shutil.rmtree(target_dir)
shutil.copytree(source_dir, target_dir)
self._register_skill(skill_name, source="github")
return self._format_install_success(skill_name, "github")
def _extract_zip(self, content: bytes, name: str, skills_dir: str):
import zipfile
import tempfile
import shutil
with tempfile.TemporaryDirectory() as tmp_dir:
zip_path = os.path.join(tmp_dir, "package.zip")
with open(zip_path, "wb") as f:
f.write(content)
extract_dir = os.path.join(tmp_dir, "extracted")
with zipfile.ZipFile(zip_path, "r") as zf:
zf.extractall(extract_dir)
top_items = [d for d in os.listdir(extract_dir) if not d.startswith(".")]
source = extract_dir
if len(top_items) == 1 and os.path.isdir(os.path.join(extract_dir, top_items[0])):
source = os.path.join(extract_dir, top_items[0])
target = os.path.join(skills_dir, name)
if os.path.exists(target):
shutil.rmtree(target)
shutil.copytree(source, target)
@staticmethod @staticmethod
def _register_skill(name: str, source: str = "cowhub"): def _format_install_result(result) -> str:
try: """Format InstallResult into a chat-friendly message."""
from cli.commands.skill import _register_installed_skill
_register_installed_skill(name, source=source)
except Exception:
pass
@staticmethod
def _format_install_success(name: str, source: str) -> str:
from cli.commands.skill import _read_skill_description from cli.commands.skill import _read_skill_description
from cli.utils import get_skills_dir from cli.utils import get_skills_dir
desc = _read_skill_description(os.path.join(get_skills_dir(), name)) skills_dir = get_skills_dir()
lines = [f"{name}"]
if desc: lines = []
if len(desc) > 60: for skill_name in result.installed:
desc = desc[:57] + "" desc = _read_skill_description(os.path.join(skills_dir, skill_name))
lines.append(f" {desc}") lines.append(f" {skill_name}")
lines.append(f" 来源: {source}") if desc:
if len(desc) > 60:
desc = desc[:57] + ""
lines.append(f" {desc}")
if len(result.installed) > 1:
lines.append(f"\n共安装 {len(result.installed)} 个技能")
return "\n".join(lines) return "\n".join(lines)
def _skill_uninstall(self, name: str) -> str: def _skill_uninstall(self, name: str) -> str:

View File

@@ -9,6 +9,7 @@ agentmesh-sdk>=0.1.3
python-dotenv>=1.0.0 python-dotenv>=1.0.0
PyYAML>=6.0 PyYAML>=6.0
croniter>=2.0.0 croniter>=2.0.0
click>=8.0
qrcode qrcode
# wechatcom & wechatmp # wechatcom & wechatmp

45
run.sh
View File

@@ -584,13 +584,15 @@ start_project() {
echo -e " ${GREEN}cow restart${NC} Restart the service" echo -e " ${GREEN}cow restart${NC} Restart the service"
echo -e " ${GREEN}cow status${NC} Check status" echo -e " ${GREEN}cow status${NC} Check status"
echo -e " ${GREEN}cow logs${NC} View logs" echo -e " ${GREEN}cow logs${NC} View logs"
echo -e " ${GREEN}cow update${NC} Update and restart"
echo -e " ${GREEN}cow install-browser${NC} Install browser tool"
else else
echo -e " ${GREEN}./run.sh stop${NC} Stop the service" echo -e " ${GREEN}./run.sh stop${NC} Stop the service"
echo -e " ${GREEN}./run.sh restart${NC} Restart the service" echo -e " ${GREEN}./run.sh restart${NC} Restart the service"
echo -e " ${GREEN}./run.sh status${NC} Check status" echo -e " ${GREEN}./run.sh status${NC} Check status"
echo -e " ${GREEN}./run.sh logs${NC} View logs" echo -e " ${GREEN}./run.sh logs${NC} View logs"
echo -e " ${GREEN}./run.sh update${NC} Update and restart"
fi fi
echo -e " ${GREEN}./run.sh update${NC} Update and restart"
echo -e "${CYAN}${BOLD}=========================================${NC}" echo -e "${CYAN}${BOLD}=========================================${NC}"
echo "" echo ""
@@ -792,27 +794,43 @@ cmd_update() {
echo -e "${GREEN}${EMOJI_WRENCH} Updating CowAgent...${NC}" echo -e "${GREEN}${EMOJI_WRENCH} Updating CowAgent...${NC}"
cd "${BASE_DIR}" cd "${BASE_DIR}"
# Stop service # Pull latest code first (service still running)
if is_running; then local pull_ok=false
cmd_stop
fi
# Update code
if [ -d .git ]; then if [ -d .git ]; then
echo -e "${GREEN}🔄 Pulling latest code...${NC}" echo -e "${GREEN}🔄 Pulling latest code...${NC}"
git pull || { if git pull; then
echo -e "${YELLOW}⚠️ GitHub failed, trying Gitee...${NC}" pull_ok=true
else
echo -e "${YELLOW}⚠️ git pull failed, trying Gitee mirror...${NC}"
git remote set-url origin https://gitee.com/zhayujie/chatgpt-on-wechat.git git remote set-url origin https://gitee.com/zhayujie/chatgpt-on-wechat.git
git pull if git pull; then
} pull_ok=true
else
echo -e "${RED}❌ Failed to pull code. Update aborted.${NC}"
exit 1
fi
fi
else else
echo -e "${YELLOW}⚠️ Not a git repository, skipping code update${NC}" echo -e "${YELLOW}⚠️ Not a git repository, skipping code update${NC}"
fi fi
# Re-exec with the updated run.sh to pick up new logic
exec "$0" _post_update
}
# Post-update: called by cmd_update after git pull to run with new code
cmd_post_update() {
cd "${BASE_DIR}"
# Stop service
if is_running; then
cmd_stop
fi
# Reinstall dependencies # Reinstall dependencies
check_python_version check_python_version
install_dependencies install_dependencies
# Restart service # Restart service
cmd_start cmd_start
} }
@@ -882,7 +900,7 @@ require_project_dir() {
# Main function # Main function
main() { main() {
case "$1" in case "$1" in
start|stop|restart|status|logs|config|update) start|stop|restart|status|logs|config|update|_post_update)
require_project_dir require_project_dir
;; ;;
esac esac
@@ -895,6 +913,7 @@ main() {
logs) cmd_logs ;; logs) cmd_logs ;;
config) cmd_config ;; config) cmd_config ;;
update) cmd_update ;; update) cmd_update ;;
_post_update) cmd_post_update ;;
help|--help|-h) help|--help|-h)
show_usage show_usage
;; ;;