+
${attachHtml}${textHtml}
${formatTime(timestamp)}
diff --git a/cli/commands/skill.py b/cli/commands/skill.py
index 26be0d79..e0652e4e 100644
--- a/cli/commands/skill.py
+++ b/cli/commands/skill.py
@@ -23,10 +23,68 @@ from cli.utils import (
)
_SAFE_NAME_RE = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9_\-]{0,63}$")
+_GITHUB_URL_RE = re.compile(
+ r"^https?://github\.com/([^/]+)/([^/]+?)(?:\.git)?(?:/(?:tree|blob)/([^/]+)(?:/(.+))?)?/?$"
+)
-def _register_installed_skill(name: str):
- """Register a newly installed skill into skills_config.json."""
+def _parse_github_url(url: str):
+ """Parse a full GitHub URL into (owner, repo, branch, subpath).
+
+ Returns None if the URL doesn't match.
+ Supported formats:
+ https://github.com/owner/repo
+ https://github.com/owner/repo/tree/branch
+ https://github.com/owner/repo/tree/branch/path/to/skill
+ https://github.com/owner/repo/blob/branch/path/to/skill
+ """
+ m = _GITHUB_URL_RE.match(url.strip())
+ if not m:
+ return None
+ owner, repo, branch, subpath = m.groups()
+ return owner, repo, branch or "main", subpath
+
+
+def _download_github_dir(owner, repo, branch, subpath, dest_dir):
+ """Download a subdirectory from GitHub using the Contents API.
+
+ Recursively fetches all files under the given subpath and writes them
+ to dest_dir. Raises on any network or API error.
+ """
+ api_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{subpath}?ref={branch}"
+ resp = requests.get(api_url, timeout=30, headers={"Accept": "application/vnd.github.v3+json"})
+ resp.raise_for_status()
+ items = resp.json()
+
+ if isinstance(items, dict):
+ items = [items]
+
+ for item in items:
+ rel_path = item["path"]
+ if subpath:
+ rel_path = rel_path[len(subpath.strip("/")):].lstrip("/")
+ local_path = os.path.join(dest_dir, rel_path)
+
+ if item["type"] == "file":
+ os.makedirs(os.path.dirname(local_path), exist_ok=True)
+ dl_url = item.get("download_url")
+ if not dl_url:
+ continue
+ file_resp = requests.get(dl_url, timeout=30)
+ file_resp.raise_for_status()
+ with open(local_path, "wb") as f:
+ f.write(file_resp.content)
+ elif item["type"] == "dir":
+ os.makedirs(local_path, exist_ok=True)
+ child_subpath = item["path"]
+ _download_github_dir(owner, repo, branch, child_subpath, dest_dir)
+
+
+def _register_installed_skill(name: str, source: str = "cowhub"):
+ """Register a newly installed skill into skills_config.json.
+
+ source values: builtin, cow, github, clawhub, linkai, local, url
+ """
skills_dir = get_skills_dir()
config_path = os.path.join(skills_dir, "skills_config.json")
@@ -47,7 +105,7 @@ def _register_installed_skill(name: str):
config[name] = {
"name": name,
"description": description,
- "source": "custom",
+ "source": source,
"enabled": True,
"category": "skill",
}
@@ -59,6 +117,21 @@ def _register_installed_skill(name: str):
pass
+def _parse_skill_frontmatter(content: str) -> dict:
+ """Parse YAML frontmatter from SKILL.md content and return a dict with name/description."""
+ result = {}
+ match = re.match(r'^---\s*\n(.*?)\n---\s*\n', content, re.DOTALL)
+ if not match:
+ return result
+ for line in match.group(1).split('\n'):
+ line = line.strip()
+ for key in ('name', 'description'):
+ if line.startswith(f'{key}:'):
+ val = line[len(key) + 1:].strip()
+ result[key] = val.strip('"').strip("'")
+ return result
+
+
def _read_skill_description(skill_dir: str) -> str:
"""Read the description from a skill's SKILL.md frontmatter."""
skill_md = os.path.join(skill_dir, "SKILL.md")
@@ -67,18 +140,56 @@ def _read_skill_description(skill_dir: str) -> str:
try:
with open(skill_md, "r", encoding="utf-8") as f:
content = f.read()
- import re as re_mod
- match = re_mod.match(r'^---\s*\n(.*?)\n---\s*\n', content, re_mod.DOTALL)
- if not match:
- return ""
- for line in match.group(1).split('\n'):
- line = line.strip()
- if line.startswith('description:'):
- desc = line[len('description:'):].strip()
- return desc.strip('"').strip("'")
+ return _parse_skill_frontmatter(content).get("description", "")
except Exception:
- pass
- return ""
+ return ""
+
+
+def _install_url(url: str):
+ """Install a skill from a direct SKILL.md URL."""
+ click.echo(f"Downloading SKILL.md from {url} ...")
+ try:
+ resp = requests.get(url, timeout=30)
+ resp.raise_for_status()
+ except Exception as e:
+ click.echo(f"Error: Failed to download SKILL.md: {e}", err=True)
+ sys.exit(1)
+
+ content = resp.text
+ fm = _parse_skill_frontmatter(content)
+ skill_name = fm.get("name")
+ if not skill_name:
+ click.echo("Error: SKILL.md missing 'name' field in frontmatter.", err=True)
+ sys.exit(1)
+
+ skill_name = skill_name.strip()
+ _validate_skill_name(skill_name)
+
+ skills_dir = get_skills_dir()
+ os.makedirs(skills_dir, exist_ok=True)
+ skill_dir = os.path.join(skills_dir, skill_name)
+
+ if os.path.isdir(skill_dir):
+ click.echo(f"Skill '{skill_name}' already exists. Overwriting SKILL.md ...")
+ os.makedirs(skill_dir, exist_ok=True)
+
+ with open(os.path.join(skill_dir, "SKILL.md"), "w", encoding="utf-8") as f:
+ f.write(content)
+
+ _register_installed_skill(skill_name, source="url")
+ _print_install_success(skill_name, "url")
+
+
+def _print_install_success(name: str, source: str):
+ """Print a unified install success message with description and source."""
+ skills_dir = get_skills_dir()
+ desc = _read_skill_description(os.path.join(skills_dir, name))
+ click.echo(click.style(f"✓ {name}", fg="green"))
+ if desc:
+ if len(desc) > 60:
+ desc = desc[:57] + "…"
+ click.echo(f" {desc}")
+ click.echo(f" 来源: {source}")
def _validate_skill_name(name: str):
@@ -306,7 +417,7 @@ def search(query):
@skill.command()
@click.argument("name")
def install(name):
- """Install a skill from Skill Hub or GitHub.
+ """Install a skill from Skill Hub, GitHub, or a SKILL.md URL.
Examples:
@@ -315,8 +426,31 @@ def install(name):
cow skill install github:owner/repo
cow skill install github:owner/repo#path/to/skill
+
+ cow skill install https://github.com/owner/repo/tree/main/path/to/skill
+
+ cow skill install https://example.com/path/to/SKILL.md
"""
- if name.startswith("github:"):
+ if name.startswith(("http://", "https://")) and name.rstrip("/").endswith("SKILL.md"):
+ # GitHub SKILL.md → strip filename and install the whole directory
+ dir_url = re.sub(r'/SKILL\.md/?$', '', name)
+ gh = _parse_github_url(dir_url)
+ if gh:
+ owner, repo, branch, subpath = gh
+ spec = f"{owner}/{repo}"
+ skill_name = subpath.rstrip("/").split("/")[-1] if subpath else repo
+ _install_github(spec, subpath=subpath, skill_name=skill_name, branch=branch)
+ return
+ _install_url(name)
+ return
+
+ parsed = _parse_github_url(name)
+ if parsed:
+ owner, repo, branch, subpath = parsed
+ spec = f"{owner}/{repo}"
+ skill_name = subpath.rstrip("/").split("/")[-1] if subpath else repo
+ _install_github(spec, subpath=subpath, skill_name=skill_name, branch=branch)
+ elif name.startswith("github:"):
_install_github(name[7:])
else:
_validate_skill_name(name)
@@ -351,10 +485,15 @@ def _install_hub(name):
if source_type == "github":
source_url = data.get("source_url", "")
- _validate_github_spec(source_url)
- source_path = data.get("source_path")
- click.echo(f"Source: GitHub ({source_url})")
- _install_github(source_url, subpath=source_path, skill_name=name)
+ parsed_url = _parse_github_url(source_url)
+ if parsed_url:
+ owner, repo, branch, subpath = parsed_url
+ click.echo(f"Source: GitHub ({source_url})")
+ _install_github(f"{owner}/{repo}", subpath=subpath, skill_name=name, branch=branch)
+ else:
+ _validate_github_spec(source_url)
+ click.echo(f"Source: GitHub ({source_url})")
+ _install_github(source_url, skill_name=name)
return
if source_type == "registry":
@@ -376,8 +515,8 @@ def _install_hub(name):
sys.exit(1)
_verify_checksum(dl_resp.content, expected_checksum)
_install_zip_bytes(dl_resp.content, name, skills_dir)
- _register_installed_skill(name)
- click.echo(click.style(f"✓ Skill '{name}' installed successfully!", fg="green"))
+ _register_installed_skill(name, source=provider)
+ _print_install_success(name, provider)
else:
click.echo(f"Error: Unsupported registry provider.", err=True)
sys.exit(1)
@@ -385,10 +524,15 @@ def _install_hub(name):
if "redirect" in data:
source_url = data.get("source_url", "")
- _validate_github_spec(source_url)
- source_path = data.get("source_path")
- click.echo(f"Source: GitHub ({source_url})")
- _install_github(source_url, subpath=source_path, skill_name=name)
+ parsed_url = _parse_github_url(source_url)
+ if parsed_url:
+ owner, repo, branch, subpath = parsed_url
+ click.echo(f"Source: GitHub ({source_url})")
+ _install_github(f"{owner}/{repo}", subpath=subpath, skill_name=name, branch=branch)
+ else:
+ _validate_github_spec(source_url)
+ click.echo(f"Source: GitHub ({source_url})")
+ _install_github(source_url, skill_name=name)
return
elif "application/zip" in content_type:
@@ -397,14 +541,14 @@ def _install_hub(name):
_verify_checksum(resp.content, expected_checksum)
_install_zip_bytes(resp.content, name, skills_dir)
_register_installed_skill(name)
- click.echo(click.style(f"✓ Skill '{name}' installed successfully!", fg="green"))
+ _print_install_success(name, "cowhub")
return
click.echo(f"Error: Unexpected response from Skill Hub.", err=True)
sys.exit(1)
-def _install_github(spec, subpath=None, skill_name=None):
+def _install_github(spec, subpath=None, skill_name=None, branch="main", source="github"):
"""Install a skill from a GitHub repo.
spec format: owner/repo or owner/repo#path
@@ -420,9 +564,30 @@ def _install_github(spec, subpath=None, skill_name=None):
skills_dir = get_skills_dir()
os.makedirs(skills_dir, exist_ok=True)
+ target_dir = os.path.join(skills_dir, skill_name)
- zip_url = f"https://github.com/{spec}/archive/refs/heads/main.zip"
- click.echo(f"Downloading from GitHub: {spec}...")
+ owner, repo = spec.split("/", 1)
+
+ # For subpath installs, try GitHub Contents API first (avoids downloading entire repo)
+ if subpath:
+ click.echo(f"Downloading from GitHub: {spec}/{subpath} (branch: {branch})...")
+ try:
+ with tempfile.TemporaryDirectory() as tmp_dir:
+ api_dest = os.path.join(tmp_dir, skill_name)
+ os.makedirs(api_dest)
+ _download_github_dir(owner, repo, branch, subpath.strip("/"), api_dest)
+ if os.path.exists(target_dir):
+ shutil.rmtree(target_dir)
+ shutil.copytree(api_dest, target_dir)
+ _register_installed_skill(skill_name, source=source)
+ _print_install_success(skill_name, source)
+ return
+ except Exception:
+ click.echo("Contents API unavailable, falling back to zip download...")
+
+ # Fallback: download full repo zip
+ zip_url = f"https://github.com/{spec}/archive/refs/heads/{branch}.zip"
+ click.echo(f"Downloading from GitHub: {spec} (branch: {branch})...")
try:
resp = requests.get(zip_url, timeout=60, allow_redirects=True)
@@ -440,7 +605,6 @@ def _install_github(spec, subpath=None, skill_name=None):
with zipfile.ZipFile(zip_path, "r") as zf:
_safe_extractall(zf, extract_dir)
- # GitHub archives have a top-level dir like "repo-main/"
top_items = [d for d in os.listdir(extract_dir) if not d.startswith(".")]
repo_root = extract_dir
if len(top_items) == 1 and os.path.isdir(os.path.join(extract_dir, top_items[0])):
@@ -454,13 +618,12 @@ def _install_github(spec, subpath=None, skill_name=None):
else:
source_dir = repo_root
- target_dir = os.path.join(skills_dir, skill_name)
if os.path.exists(target_dir):
shutil.rmtree(target_dir)
shutil.copytree(source_dir, target_dir)
- _register_installed_skill(skill_name)
- click.echo(click.style(f"✓ Skill '{skill_name}' installed successfully!", fg="green"))
+ _register_installed_skill(skill_name, source=source)
+ _print_install_success(skill_name, source)
def _install_zip_bytes(content, name, skills_dir):
diff --git a/plugins/cow_cli/cow_cli.py b/plugins/cow_cli/cow_cli.py
index ffbe5a4f..cf72390e 100644
--- a/plugins/cow_cli/cow_cli.py
+++ b/plugins/cow_cli/cow_cli.py
@@ -486,10 +486,11 @@ class CowCliPlugin(Plugin):
desc = entry.get("description", "")
if len(desc) > 50:
desc = desc[:47] + "…"
- source_tag = f" · {source}" if source else ""
- line = f"{icon} {name}{source_tag}"
+ line = f"{icon} {name}"
if desc:
line += f"\n {desc}"
+ if source:
+ line += f"\n 来源: {source}"
lines.append(line)
lines.append("")
@@ -598,10 +599,9 @@ class CowCliPlugin(Plugin):
if not name:
return "请指定要安装的技能: /skill install <名称>"
- # Run installation in a thread to avoid blocking
- # For now, invoke the CLI logic directly
try:
from cli.utils import get_skills_dir, SKILL_HUB_API
+ from cli.commands.skill import _parse_github_url, _download_github_dir
import requests
import shutil
import zipfile
@@ -610,6 +610,28 @@ class CowCliPlugin(Plugin):
skills_dir = get_skills_dir()
os.makedirs(skills_dir, exist_ok=True)
+ if name.startswith(("http://", "https://")) and name.rstrip("/").endswith("SKILL.md"):
+ import re as re_mod
+ dir_url = re_mod.sub(r'/SKILL\.md/?$', '', name)
+ gh = _parse_github_url(dir_url)
+ if gh:
+ owner, repo, branch, subpath = gh
+ spec = f"{owner}/{repo}"
+ skill_name = subpath.rstrip("/").split("/")[-1] if subpath else repo
+ return self._skill_install_github(
+ spec, skills_dir, subpath=subpath, skill_name=skill_name, branch=branch
+ )
+ return self._skill_install_url(name, skills_dir)
+
+ parsed = _parse_github_url(name)
+ if parsed:
+ owner, repo, branch, subpath = parsed
+ spec = f"{owner}/{repo}"
+ skill_name = subpath.rstrip("/").split("/")[-1] if subpath else repo
+ return self._skill_install_github(
+ spec, skills_dir, subpath=subpath, skill_name=skill_name, branch=branch
+ )
+
if name.startswith("github:"):
return self._skill_install_github(name[7:], skills_dir)
@@ -623,8 +645,14 @@ class CowCliPlugin(Plugin):
source_type = data.get("source_type")
if source_type == "github" or "redirect" in data:
source_url = data.get("source_url", "")
- source_path = data.get("source_path")
- return self._skill_install_github(source_url, skills_dir, subpath=source_path, skill_name=name)
+ parsed_url = _parse_github_url(source_url)
+ if parsed_url:
+ owner, repo, branch, subpath = parsed_url
+ return self._skill_install_github(
+ f"{owner}/{repo}", skills_dir, subpath=subpath,
+ skill_name=name, branch=branch
+ )
+ return self._skill_install_github(source_url, skills_dir, skill_name=name)
if source_type == "registry":
download_url = data.get("download_url")
if not download_url:
@@ -639,13 +667,13 @@ class CowCliPlugin(Plugin):
except Exception as e:
return f"从 {provider} 下载失败: {e}"
self._extract_zip(dl_resp.content, name, skills_dir)
- self._report_install(name)
- return f"✅ 技能 '{name}' 安装成功!"
+ self._register_skill(name, source=provider)
+ return self._format_install_success(name, provider)
elif "application/zip" in content_type:
self._extract_zip(resp.content, name, skills_dir)
- self._report_install(name)
- return f"✅ 技能 '{name}' 安装成功!"
+ self._register_skill(name, source="cowhub")
+ return self._format_install_success(name, "cowhub")
return "技能商店返回了未预期的响应格式"
@@ -656,19 +684,67 @@ class CowCliPlugin(Plugin):
except Exception as e:
return f"安装失败: {e}"
+ def _skill_install_url(self, url: str, skills_dir: str) -> str:
+ """Install a skill from a direct SKILL.md URL."""
+ import requests
+ from cli.commands.skill import _parse_skill_frontmatter
+
+ try:
+ resp = requests.get(url, timeout=30)
+ resp.raise_for_status()
+ except Exception as e:
+ return f"下载 SKILL.md 失败: {e}"
+
+ content = resp.text
+ fm = _parse_skill_frontmatter(content)
+ skill_name = fm.get("name")
+ if not skill_name:
+ return "SKILL.md 中未找到 name 字段,无法安装"
+
+ skill_name = skill_name.strip()
+ skill_dir = os.path.join(skills_dir, skill_name)
+ os.makedirs(skill_dir, exist_ok=True)
+
+ with open(os.path.join(skill_dir, "SKILL.md"), "w", encoding="utf-8") as f:
+ f.write(content)
+
+ self._register_skill(skill_name, source="url")
+ return self._format_install_success(skill_name, "url")
+
def _skill_install_github(self, spec: str, skills_dir: str,
- subpath: str = None, skill_name: str = None) -> str:
+ subpath: str = None, skill_name: str = None,
+ branch: str = "main") -> str:
import requests
import shutil
import zipfile
import tempfile
+ from cli.commands.skill import _download_github_dir
if "#" in spec and not subpath:
spec, subpath = spec.split("#", 1)
if not skill_name:
skill_name = subpath.rstrip("/").split("/")[-1] if subpath else spec.split("/")[-1]
- zip_url = f"https://github.com/{spec}/archive/refs/heads/main.zip"
+ owner, repo = spec.split("/", 1)
+ target_dir = os.path.join(skills_dir, skill_name)
+
+ # For subpath installs, try Contents API first
+ if subpath:
+ try:
+ with tempfile.TemporaryDirectory() as tmp_dir:
+ api_dest = os.path.join(tmp_dir, skill_name)
+ os.makedirs(api_dest)
+ _download_github_dir(owner, repo, branch, subpath.strip("/"), api_dest)
+ if os.path.exists(target_dir):
+ shutil.rmtree(target_dir)
+ shutil.copytree(api_dest, target_dir)
+ self._register_skill(skill_name, source="github")
+ return self._format_install_success(skill_name, "github")
+ except Exception:
+ pass # fall through to zip download
+
+ # Fallback: download full repo zip
+ zip_url = f"https://github.com/{spec}/archive/refs/heads/{branch}.zip"
try:
resp = requests.get(zip_url, timeout=60, allow_redirects=True)
resp.raise_for_status()
@@ -696,15 +772,12 @@ class CowCliPlugin(Plugin):
else:
source_dir = repo_root
- target_dir = os.path.join(skills_dir, skill_name)
if os.path.exists(target_dir):
- import shutil
shutil.rmtree(target_dir)
- import shutil
shutil.copytree(source_dir, target_dir)
- self._report_install(skill_name)
- return f"✅ 技能 '{skill_name}' 安装成功!"
+ self._register_skill(skill_name, source="github")
+ return self._format_install_success(skill_name, "github")
def _extract_zip(self, content: bytes, name: str, skills_dir: str):
import zipfile
@@ -730,14 +803,27 @@ class CowCliPlugin(Plugin):
shutil.rmtree(target)
shutil.copytree(source, target)
- def _report_install(self, name: str):
+ @staticmethod
+ def _register_skill(name: str, source: str = "cowhub"):
try:
- import requests
- from cli.utils import SKILL_HUB_API
- requests.post(f"{SKILL_HUB_API}/skills/{name}/install", json={}, timeout=5)
+ from cli.commands.skill import _register_installed_skill
+ _register_installed_skill(name, source=source)
except Exception:
pass
+ @staticmethod
+ def _format_install_success(name: str, source: str) -> str:
+ from cli.commands.skill import _read_skill_description
+ from cli.utils import get_skills_dir
+ desc = _read_skill_description(os.path.join(get_skills_dir(), name))
+ lines = [f"✅ {name}"]
+ if desc:
+ if len(desc) > 60:
+ desc = desc[:57] + "…"
+ lines.append(f" {desc}")
+ lines.append(f" 来源: {source}")
+ return "\n".join(lines)
+
def _skill_uninstall(self, name: str) -> str:
if not name:
return "请指定要卸载的技能: /skill uninstall <名称>"