diff --git a/cli/commands/skill.py b/cli/commands/skill.py index 2a568665..01065a72 100644 --- a/cli/commands/skill.py +++ b/cli/commands/skill.py @@ -26,6 +26,12 @@ _SAFE_NAME_RE = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9_\-]{0,63}$") _GITHUB_URL_RE = re.compile( r"^https?://github\.com/([^/]+)/([^/]+?)(?:\.git)?(?:/(?:tree|blob)/([^/]+)(?:/(.+))?)?/?$" ) +_GITLAB_URL_RE = re.compile( + r"^https?://gitlab\.com/([^/]+)/([^/]+?)(?:\.git)?(?:/-/tree/([^/]+)(?:/(.+))?)?/?$" +) +_GIT_SSH_RE = re.compile( + r"^git@([^:]+):([^/]+)/([^/]+?)(?:\.git)?$" +) def _parse_github_url(url: str): @@ -45,11 +51,92 @@ def _parse_github_url(url: str): return owner, repo, branch or "main", subpath +def _parse_gitlab_url(url: str): + """Parse a GitLab URL into (owner, repo, branch, subpath). + + Returns None if the URL doesn't match. + Supported formats: + https://gitlab.com/owner/repo + https://gitlab.com/owner/repo/-/tree/branch + https://gitlab.com/owner/repo/-/tree/branch/path/to/skill + """ + m = _GITLAB_URL_RE.match(url.strip()) + if not m: + return None + owner, repo, branch, subpath = m.groups() + return owner, repo, branch or "main", subpath + + +def _parse_git_ssh_url(url: str): + """Parse a git@ SSH URL into (host, owner, repo). + + Returns None if the URL doesn't match. + Supported format: git@github.com:owner/repo.git + """ + m = _GIT_SSH_RE.match(url.strip()) + if not m: + return None + host, owner, repo = m.groups() + return host, owner, repo + + +def _clone_repo(git_url: str): + """Shallow-clone a git repo and return (tmp_dir, repo_root). + + Requires git to be installed. The caller must clean up tmp_dir. + """ + tmp_dir = tempfile.mkdtemp(prefix="cow-skill-") + repo_dir = os.path.join(tmp_dir, "repo") + try: + import subprocess + subprocess.run( + ["git", "clone", "--depth", "1", git_url, repo_dir], + check=True, capture_output=True, timeout=120, + ) + except FileNotFoundError: + shutil.rmtree(tmp_dir, ignore_errors=True) + raise RuntimeError("git is not installed") + except Exception as e: + shutil.rmtree(tmp_dir, ignore_errors=True) + raise RuntimeError(f"git clone failed: {e}") + return tmp_dir, repo_dir + + +def _download_repo_zip(spec: str, branch: str = "main", host: str = "github"): + """Download a GitHub/GitLab repo as zip and extract it. + + Returns (tmp_dir, repo_root) where tmp_dir is the temp directory to clean up + and repo_root is the extracted repository root path. + """ + if host == "gitlab": + zip_url = f"https://gitlab.com/{spec}/-/archive/{branch}/{spec.split('/')[-1]}-{branch}.zip" + else: + zip_url = f"https://github.com/{spec}/archive/refs/heads/{branch}.zip" + resp = requests.get(zip_url, timeout=120, allow_redirects=True) + resp.raise_for_status() + + tmp_dir = tempfile.mkdtemp(prefix="cow-skill-") + zip_path = os.path.join(tmp_dir, "repo.zip") + with open(zip_path, "wb") as f: + f.write(resp.content) + + extract_dir = os.path.join(tmp_dir, "extracted") + with zipfile.ZipFile(zip_path, "r") as zf: + _safe_extractall(zf, extract_dir) + + # GitHub zips have a single top-level dir like "repo-main/" + top_items = [d for d in os.listdir(extract_dir) if not d.startswith(".")] + if len(top_items) == 1 and os.path.isdir(os.path.join(extract_dir, top_items[0])): + return tmp_dir, os.path.join(extract_dir, top_items[0]) + return tmp_dir, extract_dir + + def _download_github_dir(owner, repo, branch, subpath, dest_dir): """Download a subdirectory from GitHub using the Contents API. Recursively fetches all files under the given subpath and writes them - to dest_dir. Raises on any network or API error. + to dest_dir. Used as a fallback when zip download fails. + Costs one API request per directory (60/hr unauthenticated). """ api_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{subpath}?ref={branch}" resp = requests.get(api_url, timeout=30, headers={"Accept": "application/vnd.github.v3+json"}) @@ -80,6 +167,119 @@ def _download_github_dir(owner, repo, branch, subpath, dest_dir): _download_github_dir(owner, repo, branch, child_subpath, dest_dir) +# Directories to search for skills following the Agent Skills convention +_SKILL_SCAN_DIRS = [ + "skills", + "skills/.curated", + "skills/.experimental", +] + +_SKILL_SCAN_SKIP = { + "node_modules", "__pycache__", ".git", ".github", "venv", ".venv", +} + + +def _scan_skills_in_repo(repo_root: str) -> list: + """Scan a repo for skill directories containing SKILL.md. + + Searches in conventional locations (skills/, skills/.curated/, etc.) + and also checks the repo root itself. + + Returns a list of (skill_name, skill_dir_path) tuples. + """ + found = [] + + # Check repo root for a SKILL.md (single-skill repo) + if os.path.isfile(os.path.join(repo_root, "SKILL.md")): + fm = _parse_skill_frontmatter(_read_file_text(os.path.join(repo_root, "SKILL.md"))) + name = fm.get("name") or os.path.basename(repo_root) + found.append((name, repo_root)) + return found + + for scan_dir in _SKILL_SCAN_DIRS: + search_root = os.path.join(repo_root, scan_dir) + if not os.path.isdir(search_root): + continue + for entry in os.listdir(search_root): + if entry.startswith(".") and entry not in (".curated", ".experimental"): + continue + if entry in _SKILL_SCAN_SKIP: + continue + entry_path = os.path.join(search_root, entry) + if os.path.isdir(entry_path) and os.path.isfile(os.path.join(entry_path, "SKILL.md")): + fm = _parse_skill_frontmatter( + _read_file_text(os.path.join(entry_path, "SKILL.md")) + ) + name = fm.get("name") or entry + found.append((name, entry_path)) + + return found + + +def _read_file_text(path: str) -> str: + """Read a file as UTF-8 text, returning empty string on failure.""" + try: + with open(path, "r", encoding="utf-8") as f: + return f.read() + except Exception: + return "" + + +def _install_local(path: str): + """Install skill(s) from a local directory. + + If the path contains SKILL.md directly, install it as a single skill. + Otherwise scan for skills/ subdirectories with SKILL.md. + """ + path = os.path.abspath(os.path.expanduser(path)) + if not os.path.isdir(path): + click.echo(f"Error: '{path}' is not a directory.", err=True) + sys.exit(1) + + skills_dir = get_skills_dir() + os.makedirs(skills_dir, exist_ok=True) + + if os.path.isfile(os.path.join(path, "SKILL.md")): + fm = _parse_skill_frontmatter(_read_file_text(os.path.join(path, "SKILL.md"))) + skill_name = fm.get("name") or os.path.basename(path) + skill_name = re.sub(r'[^a-zA-Z0-9_\-]', '-', skill_name)[:64] + _validate_skill_name(skill_name) + target_dir = os.path.join(skills_dir, skill_name) + if os.path.exists(target_dir): + shutil.rmtree(target_dir) + shutil.copytree(path, target_dir) + _register_installed_skill(skill_name, source="local") + _print_install_success(skill_name, "local") + return + + discovered = _scan_skills_in_repo(path) + if not discovered: + click.echo(f"Error: No skills found in '{path}'.", err=True) + sys.exit(1) + + click.echo(f"Found {len(discovered)} skill(s) in {path}:\n") + installed_names = [] + for sname, sdir in discovered: + safe_name = re.sub(r'[^a-zA-Z0-9_\-]', '-', sname)[:64] + if not _SAFE_NAME_RE.match(safe_name): + click.echo(click.style(f" ✗ Skipping '{sname}' (invalid name)", fg="yellow")) + continue + target_dir = os.path.join(skills_dir, safe_name) + if os.path.exists(target_dir): + shutil.rmtree(target_dir) + shutil.copytree(sdir, target_dir) + _register_installed_skill(safe_name, source="local") + installed_names.append(safe_name) + + if installed_names: + click.echo("") + for n in installed_names: + _print_install_success(n, "local") + click.echo(f"\n {len(installed_names)} skill(s) installed from local path.") + else: + click.echo("No valid skills found in directory.") + + def _register_installed_skill(name: str, source: str = "cowhub"): """Register a newly installed skill into skills_config.json. @@ -417,50 +617,98 @@ def search(query): @skill.command() @click.argument("name") def install(name): - """Install a skill from Skill Hub, GitHub, or a SKILL.md URL. + """Install skill(s) from Skill Hub, GitHub, GitLab, git URL, or local path. + + When given an owner/repo (or full URL), downloads the repo and + auto-discovers all skills/ subdirectories containing SKILL.md, + installing them in batch. Use a subpath to install a single skill. Examples: - cow skill install pptx + cow skill install pptx (from Skill Hub) - cow skill install github:owner/repo + cow skill install larksuite/cli (GitHub shorthand, all skills) - cow skill install github:owner/repo#path/to/skill + cow skill install larksuite/cli#skills/lark-im (single skill by subpath) - cow skill install https://github.com/owner/repo/tree/main/path/to/skill + cow skill install https://github.com/owner/repo + + cow skill install https://gitlab.com/org/repo + + cow skill install git@github.com:owner/repo.git + + cow skill install ./my-local-skills (local directory) cow skill install https://example.com/path/to/SKILL.md """ + # --- Local path --- + if name.startswith(("./", "../", "/", "~/")): + _install_local(name) + return + + # --- Direct SKILL.md URL --- if name.startswith(("http://", "https://")) and name.rstrip("/").endswith("SKILL.md"): - # GitHub SKILL.md → strip filename and install the whole directory dir_url = re.sub(r'/SKILL\.md/?$', '', name) gh = _parse_github_url(dir_url) if gh: owner, repo, branch, subpath = gh - spec = f"{owner}/{repo}" - skill_name = subpath.rstrip("/").split("/")[-1] if subpath else repo - _install_github(spec, subpath=subpath, skill_name=skill_name, branch=branch) + _install_github(f"{owner}/{repo}", subpath=subpath, skill_name=( + subpath.rstrip("/").split("/")[-1] if subpath else repo + ), branch=branch) return _install_url(name) return + # --- Full GitHub URL --- parsed = _parse_github_url(name) if parsed: owner, repo, branch, subpath = parsed - spec = f"{owner}/{repo}" - skill_name = subpath.rstrip("/").split("/")[-1] if subpath else repo - _install_github(spec, subpath=subpath, skill_name=skill_name, branch=branch) - elif name.startswith("github:"): - skill_name = name[7:] - _validate_skill_name(skill_name) - _install_hub(skill_name) - elif name.startswith("clawhub:"): + _install_github(f"{owner}/{repo}", subpath=subpath, branch=branch) + return + + # --- Full GitLab URL --- + gl = _parse_gitlab_url(name) + if gl: + owner, repo, branch, subpath = gl + _install_gitlab(f"{owner}/{repo}", subpath=subpath, branch=branch) + return + + # --- git@host:owner/repo.git SSH URL --- + ssh = _parse_git_ssh_url(name) + if ssh: + host, owner, repo = ssh + _install_git_clone(name, f"{owner}/{repo}") + return + + # --- github: prefix --- + if name.startswith("github:"): + raw = name[7:] + subpath = None + if "#" in raw: + raw, subpath = raw.split("#", 1) + _validate_github_spec(raw) + _install_github(raw, subpath=subpath) + return + + # --- clawhub: prefix --- + if name.startswith("clawhub:"): skill_name = name[8:] _validate_skill_name(skill_name) _install_hub(skill_name, provider="clawhub") - else: - _validate_skill_name(name) - _install_hub(name) + return + + # --- owner/repo or owner/repo#subpath shorthand --- + if re.match(r"^[a-zA-Z0-9_\-]+/[a-zA-Z0-9_.\-]+(?:#.+)?$", name): + subpath = None + spec = name + if "#" in spec: + spec, subpath = spec.split("#", 1) + _install_github(spec, subpath=subpath) + return + + # --- Fallback: Skill Hub by name --- + _validate_skill_name(name) + _install_hub(name) def _install_hub(name, provider=None): @@ -562,7 +810,13 @@ def _install_hub(name, provider=None): def _install_github(spec, subpath=None, skill_name=None, branch="main", source="github"): - """Install a skill from a GitHub repo. + """Install skill(s) from a GitHub repo. + + Strategy: zip download first (no API rate limit), Contents API as fallback. + + When subpath is given, install that single skill directory. + When subpath is None, scan the repo for all skills/ subdirectories containing + SKILL.md and batch-install them. spec format: owner/repo or owner/repo#path """ @@ -571,72 +825,160 @@ def _install_github(spec, subpath=None, skill_name=None, branch="main", source=" _validate_github_spec(spec) - if not skill_name: - skill_name = subpath.rstrip("/").split("/")[-1] if subpath else spec.split("/")[-1] - _validate_skill_name(skill_name) - skills_dir = get_skills_dir() os.makedirs(skills_dir, exist_ok=True) - target_dir = os.path.join(skills_dir, skill_name) - owner, repo = spec.split("/", 1) - # For subpath installs, try GitHub Contents API first (avoids downloading entire repo) - if subpath: - click.echo(f"Downloading from GitHub: {spec}/{subpath} (branch: {branch})...") - try: - with tempfile.TemporaryDirectory() as tmp_dir: - api_dest = os.path.join(tmp_dir, skill_name) - os.makedirs(api_dest) - _download_github_dir(owner, repo, branch, subpath.strip("/"), api_dest) - if os.path.exists(target_dir): - shutil.rmtree(target_dir) - shutil.copytree(api_dest, target_dir) - _register_installed_skill(skill_name, source=source) - _print_install_success(skill_name, source) - return - except Exception: - click.echo("Contents API unavailable, falling back to zip download...") - - # Fallback: download full repo zip - zip_url = f"https://github.com/{spec}/archive/refs/heads/{branch}.zip" + # --- Primary: zip download (no rate limit) --- click.echo(f"Downloading from GitHub: {spec} (branch: {branch})...") + tmp_dir = None + repo_root = None try: - resp = requests.get(zip_url, timeout=60, allow_redirects=True) - resp.raise_for_status() - except Exception as e: - click.echo(f"Error: Failed to download from GitHub: {e}", err=True) + tmp_dir, repo_root = _download_repo_zip(spec, branch) + except Exception: + click.echo("Zip download failed, falling back to Contents API...") + + if repo_root: + try: + _install_from_repo_root(repo_root, spec, subpath, skill_name, skills_dir, source) + return + except SystemExit: + raise + except Exception as e: + click.echo(f"Error processing zip: {e}", err=True) + click.echo("Falling back to Contents API...") + finally: + if tmp_dir: + shutil.rmtree(tmp_dir, ignore_errors=True) + + # --- Fallback: Contents API (rate-limited but works for single skill) --- + if not subpath: + click.echo("Error: Zip download failed and batch install requires zip.", err=True) + click.echo(f" Try again or specify a subpath: cow skill install {spec}#skills/", err=True) sys.exit(1) - with tempfile.TemporaryDirectory() as tmp_dir: - zip_path = os.path.join(tmp_dir, "repo.zip") - with open(zip_path, "wb") as f: - f.write(resp.content) + if not skill_name: + skill_name = subpath.rstrip("/").split("/")[-1] + _validate_skill_name(skill_name) - extract_dir = os.path.join(tmp_dir, "extracted") - with zipfile.ZipFile(zip_path, "r") as zf: - _safe_extractall(zf, extract_dir) + click.echo(f"Downloading via Contents API: {spec}/{subpath} ...") + target_dir = os.path.join(skills_dir, skill_name) + try: + with tempfile.TemporaryDirectory() as api_tmp: + api_dest = os.path.join(api_tmp, skill_name) + os.makedirs(api_dest) + _download_github_dir(owner, repo, branch, subpath.strip("/"), api_dest) + if os.path.exists(target_dir): + shutil.rmtree(target_dir) + shutil.copytree(api_dest, target_dir) + _register_installed_skill(skill_name, source=source) + _print_install_success(skill_name, source) + except Exception as e: + click.echo(f"Error: Contents API also failed: {e}", err=True) + sys.exit(1) - top_items = [d for d in os.listdir(extract_dir) if not d.startswith(".")] - repo_root = extract_dir - if len(top_items) == 1 and os.path.isdir(os.path.join(extract_dir, top_items[0])): - repo_root = os.path.join(extract_dir, top_items[0]) - if subpath: - source_dir = os.path.join(repo_root, subpath.strip("/")) - if not os.path.isdir(source_dir): - click.echo(f"Error: Path '{subpath}' not found in repository.", err=True) - sys.exit(1) - else: - source_dir = repo_root +def _install_from_repo_root(repo_root, spec, subpath, skill_name, skills_dir, source): + """Install skill(s) from an already-extracted repo root directory.""" + if subpath: + source_dir = os.path.join(repo_root, subpath.strip("/")) + if not os.path.isdir(source_dir): + click.echo(f"Error: Path '{subpath}' not found in repository.", err=True) + sys.exit(1) + if not skill_name: + fm = _parse_skill_frontmatter( + _read_file_text(os.path.join(source_dir, "SKILL.md")) + ) + skill_name = fm.get("name") or subpath.rstrip("/").split("/")[-1] + _validate_skill_name(skill_name) + + target_dir = os.path.join(skills_dir, skill_name) if os.path.exists(target_dir): shutil.rmtree(target_dir) shutil.copytree(source_dir, target_dir) + _register_installed_skill(skill_name, source=source) + _print_install_success(skill_name, source) + else: + # Auto-discover all skills in the repo + discovered = _scan_skills_in_repo(repo_root) - _register_installed_skill(skill_name, source=source) - _print_install_success(skill_name, source) + if not discovered: + if skill_name: + _validate_skill_name(skill_name) + else: + skill_name = spec.split("/")[-1] + _validate_skill_name(skill_name) + target_dir = os.path.join(skills_dir, skill_name) + if os.path.exists(target_dir): + shutil.rmtree(target_dir) + shutil.copytree(repo_root, target_dir) + _register_installed_skill(skill_name, source=source) + _print_install_success(skill_name, source) + return + + click.echo(f"Found {len(discovered)} skill(s) in {spec}:\n") + installed_names = [] + for sname, sdir in discovered: + safe_name = re.sub(r'[^a-zA-Z0-9_\-]', '-', sname)[:64] + if not _SAFE_NAME_RE.match(safe_name): + click.echo(click.style(f" ✗ Skipping '{sname}' (invalid name)", fg="yellow")) + continue + target_dir = os.path.join(skills_dir, safe_name) + if os.path.exists(target_dir): + shutil.rmtree(target_dir) + shutil.copytree(sdir, target_dir) + _register_installed_skill(safe_name, source=source) + installed_names.append(safe_name) + + if installed_names: + click.echo("") + for n in installed_names: + _print_install_success(n, source) + click.echo(f"\n {len(installed_names)} skill(s) installed from {spec}.") + else: + click.echo("No valid skills found in repository.") + + +def _install_gitlab(spec, subpath=None, branch="main"): + """Install skill(s) from a GitLab repo via zip download.""" + _validate_github_spec(spec) + + skills_dir = get_skills_dir() + os.makedirs(skills_dir, exist_ok=True) + + click.echo(f"Downloading from GitLab: {spec} (branch: {branch})...") + + try: + tmp_dir, repo_root = _download_repo_zip(spec, branch, host="gitlab") + except Exception as e: + click.echo(f"Error: Failed to download from GitLab: {e}", err=True) + sys.exit(1) + + try: + _install_from_repo_root(repo_root, spec, subpath, None, skills_dir, "gitlab") + finally: + shutil.rmtree(tmp_dir, ignore_errors=True) + + +def _install_git_clone(git_url: str, display_name: str = ""): + """Install skill(s) from any git URL via shallow clone.""" + skills_dir = get_skills_dir() + os.makedirs(skills_dir, exist_ok=True) + + click.echo(f"Cloning {display_name or git_url} ...") + + try: + tmp_dir, repo_root = _clone_repo(git_url) + except RuntimeError as e: + click.echo(f"Error: {e}", err=True) + sys.exit(1) + + try: + _install_from_repo_root(repo_root, display_name or git_url, None, None, skills_dir, "git") + finally: + shutil.rmtree(tmp_dir, ignore_errors=True) def _install_zip_bytes(content, name, skills_dir):