mirror of
https://github.com/zhayujie/chatgpt-on-wechat.git
synced 2026-06-02 00:57:41 +08:00
feat: support github zip-first download, gitLab, git@ ssh, local path
This commit is contained in:
@@ -26,6 +26,12 @@ _SAFE_NAME_RE = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9_\-]{0,63}$")
|
||||
_GITHUB_URL_RE = re.compile(
|
||||
r"^https?://github\.com/([^/]+)/([^/]+?)(?:\.git)?(?:/(?:tree|blob)/([^/]+)(?:/(.+))?)?/?$"
|
||||
)
|
||||
_GITLAB_URL_RE = re.compile(
|
||||
r"^https?://gitlab\.com/([^/]+)/([^/]+?)(?:\.git)?(?:/-/tree/([^/]+)(?:/(.+))?)?/?$"
|
||||
)
|
||||
_GIT_SSH_RE = re.compile(
|
||||
r"^git@([^:]+):([^/]+)/([^/]+?)(?:\.git)?$"
|
||||
)
|
||||
|
||||
|
||||
def _parse_github_url(url: str):
|
||||
@@ -45,11 +51,92 @@ def _parse_github_url(url: str):
|
||||
return owner, repo, branch or "main", subpath
|
||||
|
||||
|
||||
def _parse_gitlab_url(url: str):
|
||||
"""Parse a GitLab URL into (owner, repo, branch, subpath).
|
||||
|
||||
Returns None if the URL doesn't match.
|
||||
Supported formats:
|
||||
https://gitlab.com/owner/repo
|
||||
https://gitlab.com/owner/repo/-/tree/branch
|
||||
https://gitlab.com/owner/repo/-/tree/branch/path/to/skill
|
||||
"""
|
||||
m = _GITLAB_URL_RE.match(url.strip())
|
||||
if not m:
|
||||
return None
|
||||
owner, repo, branch, subpath = m.groups()
|
||||
return owner, repo, branch or "main", subpath
|
||||
|
||||
|
||||
def _parse_git_ssh_url(url: str):
|
||||
"""Parse a git@ SSH URL into (host, owner, repo).
|
||||
|
||||
Returns None if the URL doesn't match.
|
||||
Supported format: git@github.com:owner/repo.git
|
||||
"""
|
||||
m = _GIT_SSH_RE.match(url.strip())
|
||||
if not m:
|
||||
return None
|
||||
host, owner, repo = m.groups()
|
||||
return host, owner, repo
|
||||
|
||||
|
||||
def _clone_repo(git_url: str):
|
||||
"""Shallow-clone a git repo and return (tmp_dir, repo_root).
|
||||
|
||||
Requires git to be installed. The caller must clean up tmp_dir.
|
||||
"""
|
||||
tmp_dir = tempfile.mkdtemp(prefix="cow-skill-")
|
||||
repo_dir = os.path.join(tmp_dir, "repo")
|
||||
try:
|
||||
import subprocess
|
||||
subprocess.run(
|
||||
["git", "clone", "--depth", "1", git_url, repo_dir],
|
||||
check=True, capture_output=True, timeout=120,
|
||||
)
|
||||
except FileNotFoundError:
|
||||
shutil.rmtree(tmp_dir, ignore_errors=True)
|
||||
raise RuntimeError("git is not installed")
|
||||
except Exception as e:
|
||||
shutil.rmtree(tmp_dir, ignore_errors=True)
|
||||
raise RuntimeError(f"git clone failed: {e}")
|
||||
return tmp_dir, repo_dir
|
||||
|
||||
|
||||
def _download_repo_zip(spec: str, branch: str = "main", host: str = "github"):
|
||||
"""Download a GitHub/GitLab repo as zip and extract it.
|
||||
|
||||
Returns (tmp_dir, repo_root) where tmp_dir is the temp directory to clean up
|
||||
and repo_root is the extracted repository root path.
|
||||
"""
|
||||
if host == "gitlab":
|
||||
zip_url = f"https://gitlab.com/{spec}/-/archive/{branch}/{spec.split('/')[-1]}-{branch}.zip"
|
||||
else:
|
||||
zip_url = f"https://github.com/{spec}/archive/refs/heads/{branch}.zip"
|
||||
resp = requests.get(zip_url, timeout=120, allow_redirects=True)
|
||||
resp.raise_for_status()
|
||||
|
||||
tmp_dir = tempfile.mkdtemp(prefix="cow-skill-")
|
||||
zip_path = os.path.join(tmp_dir, "repo.zip")
|
||||
with open(zip_path, "wb") as f:
|
||||
f.write(resp.content)
|
||||
|
||||
extract_dir = os.path.join(tmp_dir, "extracted")
|
||||
with zipfile.ZipFile(zip_path, "r") as zf:
|
||||
_safe_extractall(zf, extract_dir)
|
||||
|
||||
# GitHub zips have a single top-level dir like "repo-main/"
|
||||
top_items = [d for d in os.listdir(extract_dir) if not d.startswith(".")]
|
||||
if len(top_items) == 1 and os.path.isdir(os.path.join(extract_dir, top_items[0])):
|
||||
return tmp_dir, os.path.join(extract_dir, top_items[0])
|
||||
return tmp_dir, extract_dir
|
||||
|
||||
|
||||
def _download_github_dir(owner, repo, branch, subpath, dest_dir):
|
||||
"""Download a subdirectory from GitHub using the Contents API.
|
||||
|
||||
Recursively fetches all files under the given subpath and writes them
|
||||
to dest_dir. Raises on any network or API error.
|
||||
to dest_dir. Used as a fallback when zip download fails.
|
||||
Costs one API request per directory (60/hr unauthenticated).
|
||||
"""
|
||||
api_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{subpath}?ref={branch}"
|
||||
resp = requests.get(api_url, timeout=30, headers={"Accept": "application/vnd.github.v3+json"})
|
||||
@@ -80,6 +167,119 @@ def _download_github_dir(owner, repo, branch, subpath, dest_dir):
|
||||
_download_github_dir(owner, repo, branch, child_subpath, dest_dir)
|
||||
|
||||
|
||||
# Directories to search for skills following the Agent Skills convention
|
||||
_SKILL_SCAN_DIRS = [
|
||||
"skills",
|
||||
"skills/.curated",
|
||||
"skills/.experimental",
|
||||
]
|
||||
|
||||
_SKILL_SCAN_SKIP = {
|
||||
"node_modules", "__pycache__", ".git", ".github", "venv", ".venv",
|
||||
}
|
||||
|
||||
|
||||
def _scan_skills_in_repo(repo_root: str) -> list:
|
||||
"""Scan a repo for skill directories containing SKILL.md.
|
||||
|
||||
Searches in conventional locations (skills/, skills/.curated/, etc.)
|
||||
and also checks the repo root itself.
|
||||
|
||||
Returns a list of (skill_name, skill_dir_path) tuples.
|
||||
"""
|
||||
found = []
|
||||
|
||||
# Check repo root for a SKILL.md (single-skill repo)
|
||||
if os.path.isfile(os.path.join(repo_root, "SKILL.md")):
|
||||
fm = _parse_skill_frontmatter(_read_file_text(os.path.join(repo_root, "SKILL.md")))
|
||||
name = fm.get("name") or os.path.basename(repo_root)
|
||||
found.append((name, repo_root))
|
||||
return found
|
||||
|
||||
for scan_dir in _SKILL_SCAN_DIRS:
|
||||
search_root = os.path.join(repo_root, scan_dir)
|
||||
if not os.path.isdir(search_root):
|
||||
continue
|
||||
for entry in os.listdir(search_root):
|
||||
if entry.startswith(".") and entry not in (".curated", ".experimental"):
|
||||
continue
|
||||
if entry in _SKILL_SCAN_SKIP:
|
||||
continue
|
||||
entry_path = os.path.join(search_root, entry)
|
||||
if os.path.isdir(entry_path) and os.path.isfile(os.path.join(entry_path, "SKILL.md")):
|
||||
fm = _parse_skill_frontmatter(
|
||||
_read_file_text(os.path.join(entry_path, "SKILL.md"))
|
||||
)
|
||||
name = fm.get("name") or entry
|
||||
found.append((name, entry_path))
|
||||
|
||||
return found
|
||||
|
||||
|
||||
def _read_file_text(path: str) -> str:
|
||||
"""Read a file as UTF-8 text, returning empty string on failure."""
|
||||
try:
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
return f.read()
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
|
||||
def _install_local(path: str):
|
||||
"""Install skill(s) from a local directory.
|
||||
|
||||
If the path contains SKILL.md directly, install it as a single skill.
|
||||
Otherwise scan for skills/ subdirectories with SKILL.md.
|
||||
"""
|
||||
path = os.path.abspath(os.path.expanduser(path))
|
||||
if not os.path.isdir(path):
|
||||
click.echo(f"Error: '{path}' is not a directory.", err=True)
|
||||
sys.exit(1)
|
||||
|
||||
skills_dir = get_skills_dir()
|
||||
os.makedirs(skills_dir, exist_ok=True)
|
||||
|
||||
if os.path.isfile(os.path.join(path, "SKILL.md")):
|
||||
fm = _parse_skill_frontmatter(_read_file_text(os.path.join(path, "SKILL.md")))
|
||||
skill_name = fm.get("name") or os.path.basename(path)
|
||||
skill_name = re.sub(r'[^a-zA-Z0-9_\-]', '-', skill_name)[:64]
|
||||
_validate_skill_name(skill_name)
|
||||
target_dir = os.path.join(skills_dir, skill_name)
|
||||
if os.path.exists(target_dir):
|
||||
shutil.rmtree(target_dir)
|
||||
shutil.copytree(path, target_dir)
|
||||
_register_installed_skill(skill_name, source="local")
|
||||
_print_install_success(skill_name, "local")
|
||||
return
|
||||
|
||||
discovered = _scan_skills_in_repo(path)
|
||||
if not discovered:
|
||||
click.echo(f"Error: No skills found in '{path}'.", err=True)
|
||||
sys.exit(1)
|
||||
|
||||
click.echo(f"Found {len(discovered)} skill(s) in {path}:\n")
|
||||
installed_names = []
|
||||
for sname, sdir in discovered:
|
||||
safe_name = re.sub(r'[^a-zA-Z0-9_\-]', '-', sname)[:64]
|
||||
if not _SAFE_NAME_RE.match(safe_name):
|
||||
click.echo(click.style(f" ✗ Skipping '{sname}' (invalid name)", fg="yellow"))
|
||||
continue
|
||||
target_dir = os.path.join(skills_dir, safe_name)
|
||||
if os.path.exists(target_dir):
|
||||
shutil.rmtree(target_dir)
|
||||
shutil.copytree(sdir, target_dir)
|
||||
_register_installed_skill(safe_name, source="local")
|
||||
installed_names.append(safe_name)
|
||||
|
||||
if installed_names:
|
||||
click.echo("")
|
||||
for n in installed_names:
|
||||
_print_install_success(n, "local")
|
||||
click.echo(f"\n {len(installed_names)} skill(s) installed from local path.")
|
||||
else:
|
||||
click.echo("No valid skills found in directory.")
|
||||
|
||||
|
||||
def _register_installed_skill(name: str, source: str = "cowhub"):
|
||||
"""Register a newly installed skill into skills_config.json.
|
||||
|
||||
@@ -417,50 +617,98 @@ def search(query):
|
||||
@skill.command()
|
||||
@click.argument("name")
|
||||
def install(name):
|
||||
"""Install a skill from Skill Hub, GitHub, or a SKILL.md URL.
|
||||
"""Install skill(s) from Skill Hub, GitHub, GitLab, git URL, or local path.
|
||||
|
||||
When given an owner/repo (or full URL), downloads the repo and
|
||||
auto-discovers all skills/ subdirectories containing SKILL.md,
|
||||
installing them in batch. Use a subpath to install a single skill.
|
||||
|
||||
Examples:
|
||||
|
||||
cow skill install pptx
|
||||
cow skill install pptx (from Skill Hub)
|
||||
|
||||
cow skill install github:owner/repo
|
||||
cow skill install larksuite/cli (GitHub shorthand, all skills)
|
||||
|
||||
cow skill install github:owner/repo#path/to/skill
|
||||
cow skill install larksuite/cli#skills/lark-im (single skill by subpath)
|
||||
|
||||
cow skill install https://github.com/owner/repo/tree/main/path/to/skill
|
||||
cow skill install https://github.com/owner/repo
|
||||
|
||||
cow skill install https://gitlab.com/org/repo
|
||||
|
||||
cow skill install git@github.com:owner/repo.git
|
||||
|
||||
cow skill install ./my-local-skills (local directory)
|
||||
|
||||
cow skill install https://example.com/path/to/SKILL.md
|
||||
"""
|
||||
# --- Local path ---
|
||||
if name.startswith(("./", "../", "/", "~/")):
|
||||
_install_local(name)
|
||||
return
|
||||
|
||||
# --- Direct SKILL.md URL ---
|
||||
if name.startswith(("http://", "https://")) and name.rstrip("/").endswith("SKILL.md"):
|
||||
# GitHub SKILL.md → strip filename and install the whole directory
|
||||
dir_url = re.sub(r'/SKILL\.md/?$', '', name)
|
||||
gh = _parse_github_url(dir_url)
|
||||
if gh:
|
||||
owner, repo, branch, subpath = gh
|
||||
spec = f"{owner}/{repo}"
|
||||
skill_name = subpath.rstrip("/").split("/")[-1] if subpath else repo
|
||||
_install_github(spec, subpath=subpath, skill_name=skill_name, branch=branch)
|
||||
_install_github(f"{owner}/{repo}", subpath=subpath, skill_name=(
|
||||
subpath.rstrip("/").split("/")[-1] if subpath else repo
|
||||
), branch=branch)
|
||||
return
|
||||
_install_url(name)
|
||||
return
|
||||
|
||||
# --- Full GitHub URL ---
|
||||
parsed = _parse_github_url(name)
|
||||
if parsed:
|
||||
owner, repo, branch, subpath = parsed
|
||||
spec = f"{owner}/{repo}"
|
||||
skill_name = subpath.rstrip("/").split("/")[-1] if subpath else repo
|
||||
_install_github(spec, subpath=subpath, skill_name=skill_name, branch=branch)
|
||||
elif name.startswith("github:"):
|
||||
skill_name = name[7:]
|
||||
_validate_skill_name(skill_name)
|
||||
_install_hub(skill_name)
|
||||
elif name.startswith("clawhub:"):
|
||||
_install_github(f"{owner}/{repo}", subpath=subpath, branch=branch)
|
||||
return
|
||||
|
||||
# --- Full GitLab URL ---
|
||||
gl = _parse_gitlab_url(name)
|
||||
if gl:
|
||||
owner, repo, branch, subpath = gl
|
||||
_install_gitlab(f"{owner}/{repo}", subpath=subpath, branch=branch)
|
||||
return
|
||||
|
||||
# --- git@host:owner/repo.git SSH URL ---
|
||||
ssh = _parse_git_ssh_url(name)
|
||||
if ssh:
|
||||
host, owner, repo = ssh
|
||||
_install_git_clone(name, f"{owner}/{repo}")
|
||||
return
|
||||
|
||||
# --- github: prefix ---
|
||||
if name.startswith("github:"):
|
||||
raw = name[7:]
|
||||
subpath = None
|
||||
if "#" in raw:
|
||||
raw, subpath = raw.split("#", 1)
|
||||
_validate_github_spec(raw)
|
||||
_install_github(raw, subpath=subpath)
|
||||
return
|
||||
|
||||
# --- clawhub: prefix ---
|
||||
if name.startswith("clawhub:"):
|
||||
skill_name = name[8:]
|
||||
_validate_skill_name(skill_name)
|
||||
_install_hub(skill_name, provider="clawhub")
|
||||
else:
|
||||
_validate_skill_name(name)
|
||||
_install_hub(name)
|
||||
return
|
||||
|
||||
# --- owner/repo or owner/repo#subpath shorthand ---
|
||||
if re.match(r"^[a-zA-Z0-9_\-]+/[a-zA-Z0-9_.\-]+(?:#.+)?$", name):
|
||||
subpath = None
|
||||
spec = name
|
||||
if "#" in spec:
|
||||
spec, subpath = spec.split("#", 1)
|
||||
_install_github(spec, subpath=subpath)
|
||||
return
|
||||
|
||||
# --- Fallback: Skill Hub by name ---
|
||||
_validate_skill_name(name)
|
||||
_install_hub(name)
|
||||
|
||||
|
||||
def _install_hub(name, provider=None):
|
||||
@@ -562,7 +810,13 @@ def _install_hub(name, provider=None):
|
||||
|
||||
|
||||
def _install_github(spec, subpath=None, skill_name=None, branch="main", source="github"):
|
||||
"""Install a skill from a GitHub repo.
|
||||
"""Install skill(s) from a GitHub repo.
|
||||
|
||||
Strategy: zip download first (no API rate limit), Contents API as fallback.
|
||||
|
||||
When subpath is given, install that single skill directory.
|
||||
When subpath is None, scan the repo for all skills/ subdirectories containing
|
||||
SKILL.md and batch-install them.
|
||||
|
||||
spec format: owner/repo or owner/repo#path
|
||||
"""
|
||||
@@ -571,72 +825,160 @@ def _install_github(spec, subpath=None, skill_name=None, branch="main", source="
|
||||
|
||||
_validate_github_spec(spec)
|
||||
|
||||
if not skill_name:
|
||||
skill_name = subpath.rstrip("/").split("/")[-1] if subpath else spec.split("/")[-1]
|
||||
_validate_skill_name(skill_name)
|
||||
|
||||
skills_dir = get_skills_dir()
|
||||
os.makedirs(skills_dir, exist_ok=True)
|
||||
target_dir = os.path.join(skills_dir, skill_name)
|
||||
|
||||
owner, repo = spec.split("/", 1)
|
||||
|
||||
# For subpath installs, try GitHub Contents API first (avoids downloading entire repo)
|
||||
if subpath:
|
||||
click.echo(f"Downloading from GitHub: {spec}/{subpath} (branch: {branch})...")
|
||||
try:
|
||||
with tempfile.TemporaryDirectory() as tmp_dir:
|
||||
api_dest = os.path.join(tmp_dir, skill_name)
|
||||
os.makedirs(api_dest)
|
||||
_download_github_dir(owner, repo, branch, subpath.strip("/"), api_dest)
|
||||
if os.path.exists(target_dir):
|
||||
shutil.rmtree(target_dir)
|
||||
shutil.copytree(api_dest, target_dir)
|
||||
_register_installed_skill(skill_name, source=source)
|
||||
_print_install_success(skill_name, source)
|
||||
return
|
||||
except Exception:
|
||||
click.echo("Contents API unavailable, falling back to zip download...")
|
||||
|
||||
# Fallback: download full repo zip
|
||||
zip_url = f"https://github.com/{spec}/archive/refs/heads/{branch}.zip"
|
||||
# --- Primary: zip download (no rate limit) ---
|
||||
click.echo(f"Downloading from GitHub: {spec} (branch: {branch})...")
|
||||
|
||||
tmp_dir = None
|
||||
repo_root = None
|
||||
try:
|
||||
resp = requests.get(zip_url, timeout=60, allow_redirects=True)
|
||||
resp.raise_for_status()
|
||||
except Exception as e:
|
||||
click.echo(f"Error: Failed to download from GitHub: {e}", err=True)
|
||||
tmp_dir, repo_root = _download_repo_zip(spec, branch)
|
||||
except Exception:
|
||||
click.echo("Zip download failed, falling back to Contents API...")
|
||||
|
||||
if repo_root:
|
||||
try:
|
||||
_install_from_repo_root(repo_root, spec, subpath, skill_name, skills_dir, source)
|
||||
return
|
||||
except SystemExit:
|
||||
raise
|
||||
except Exception as e:
|
||||
click.echo(f"Error processing zip: {e}", err=True)
|
||||
click.echo("Falling back to Contents API...")
|
||||
finally:
|
||||
if tmp_dir:
|
||||
shutil.rmtree(tmp_dir, ignore_errors=True)
|
||||
|
||||
# --- Fallback: Contents API (rate-limited but works for single skill) ---
|
||||
if not subpath:
|
||||
click.echo("Error: Zip download failed and batch install requires zip.", err=True)
|
||||
click.echo(f" Try again or specify a subpath: cow skill install {spec}#skills/<name>", err=True)
|
||||
sys.exit(1)
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmp_dir:
|
||||
zip_path = os.path.join(tmp_dir, "repo.zip")
|
||||
with open(zip_path, "wb") as f:
|
||||
f.write(resp.content)
|
||||
if not skill_name:
|
||||
skill_name = subpath.rstrip("/").split("/")[-1]
|
||||
_validate_skill_name(skill_name)
|
||||
|
||||
extract_dir = os.path.join(tmp_dir, "extracted")
|
||||
with zipfile.ZipFile(zip_path, "r") as zf:
|
||||
_safe_extractall(zf, extract_dir)
|
||||
click.echo(f"Downloading via Contents API: {spec}/{subpath} ...")
|
||||
target_dir = os.path.join(skills_dir, skill_name)
|
||||
try:
|
||||
with tempfile.TemporaryDirectory() as api_tmp:
|
||||
api_dest = os.path.join(api_tmp, skill_name)
|
||||
os.makedirs(api_dest)
|
||||
_download_github_dir(owner, repo, branch, subpath.strip("/"), api_dest)
|
||||
if os.path.exists(target_dir):
|
||||
shutil.rmtree(target_dir)
|
||||
shutil.copytree(api_dest, target_dir)
|
||||
_register_installed_skill(skill_name, source=source)
|
||||
_print_install_success(skill_name, source)
|
||||
except Exception as e:
|
||||
click.echo(f"Error: Contents API also failed: {e}", err=True)
|
||||
sys.exit(1)
|
||||
|
||||
top_items = [d for d in os.listdir(extract_dir) if not d.startswith(".")]
|
||||
repo_root = extract_dir
|
||||
if len(top_items) == 1 and os.path.isdir(os.path.join(extract_dir, top_items[0])):
|
||||
repo_root = os.path.join(extract_dir, top_items[0])
|
||||
|
||||
if subpath:
|
||||
source_dir = os.path.join(repo_root, subpath.strip("/"))
|
||||
if not os.path.isdir(source_dir):
|
||||
click.echo(f"Error: Path '{subpath}' not found in repository.", err=True)
|
||||
sys.exit(1)
|
||||
else:
|
||||
source_dir = repo_root
|
||||
def _install_from_repo_root(repo_root, spec, subpath, skill_name, skills_dir, source):
|
||||
"""Install skill(s) from an already-extracted repo root directory."""
|
||||
if subpath:
|
||||
source_dir = os.path.join(repo_root, subpath.strip("/"))
|
||||
if not os.path.isdir(source_dir):
|
||||
click.echo(f"Error: Path '{subpath}' not found in repository.", err=True)
|
||||
sys.exit(1)
|
||||
|
||||
if not skill_name:
|
||||
fm = _parse_skill_frontmatter(
|
||||
_read_file_text(os.path.join(source_dir, "SKILL.md"))
|
||||
)
|
||||
skill_name = fm.get("name") or subpath.rstrip("/").split("/")[-1]
|
||||
_validate_skill_name(skill_name)
|
||||
|
||||
target_dir = os.path.join(skills_dir, skill_name)
|
||||
if os.path.exists(target_dir):
|
||||
shutil.rmtree(target_dir)
|
||||
shutil.copytree(source_dir, target_dir)
|
||||
_register_installed_skill(skill_name, source=source)
|
||||
_print_install_success(skill_name, source)
|
||||
else:
|
||||
# Auto-discover all skills in the repo
|
||||
discovered = _scan_skills_in_repo(repo_root)
|
||||
|
||||
_register_installed_skill(skill_name, source=source)
|
||||
_print_install_success(skill_name, source)
|
||||
if not discovered:
|
||||
if skill_name:
|
||||
_validate_skill_name(skill_name)
|
||||
else:
|
||||
skill_name = spec.split("/")[-1]
|
||||
_validate_skill_name(skill_name)
|
||||
target_dir = os.path.join(skills_dir, skill_name)
|
||||
if os.path.exists(target_dir):
|
||||
shutil.rmtree(target_dir)
|
||||
shutil.copytree(repo_root, target_dir)
|
||||
_register_installed_skill(skill_name, source=source)
|
||||
_print_install_success(skill_name, source)
|
||||
return
|
||||
|
||||
click.echo(f"Found {len(discovered)} skill(s) in {spec}:\n")
|
||||
installed_names = []
|
||||
for sname, sdir in discovered:
|
||||
safe_name = re.sub(r'[^a-zA-Z0-9_\-]', '-', sname)[:64]
|
||||
if not _SAFE_NAME_RE.match(safe_name):
|
||||
click.echo(click.style(f" ✗ Skipping '{sname}' (invalid name)", fg="yellow"))
|
||||
continue
|
||||
target_dir = os.path.join(skills_dir, safe_name)
|
||||
if os.path.exists(target_dir):
|
||||
shutil.rmtree(target_dir)
|
||||
shutil.copytree(sdir, target_dir)
|
||||
_register_installed_skill(safe_name, source=source)
|
||||
installed_names.append(safe_name)
|
||||
|
||||
if installed_names:
|
||||
click.echo("")
|
||||
for n in installed_names:
|
||||
_print_install_success(n, source)
|
||||
click.echo(f"\n {len(installed_names)} skill(s) installed from {spec}.")
|
||||
else:
|
||||
click.echo("No valid skills found in repository.")
|
||||
|
||||
|
||||
def _install_gitlab(spec, subpath=None, branch="main"):
|
||||
"""Install skill(s) from a GitLab repo via zip download."""
|
||||
_validate_github_spec(spec)
|
||||
|
||||
skills_dir = get_skills_dir()
|
||||
os.makedirs(skills_dir, exist_ok=True)
|
||||
|
||||
click.echo(f"Downloading from GitLab: {spec} (branch: {branch})...")
|
||||
|
||||
try:
|
||||
tmp_dir, repo_root = _download_repo_zip(spec, branch, host="gitlab")
|
||||
except Exception as e:
|
||||
click.echo(f"Error: Failed to download from GitLab: {e}", err=True)
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
_install_from_repo_root(repo_root, spec, subpath, None, skills_dir, "gitlab")
|
||||
finally:
|
||||
shutil.rmtree(tmp_dir, ignore_errors=True)
|
||||
|
||||
|
||||
def _install_git_clone(git_url: str, display_name: str = ""):
|
||||
"""Install skill(s) from any git URL via shallow clone."""
|
||||
skills_dir = get_skills_dir()
|
||||
os.makedirs(skills_dir, exist_ok=True)
|
||||
|
||||
click.echo(f"Cloning {display_name or git_url} ...")
|
||||
|
||||
try:
|
||||
tmp_dir, repo_root = _clone_repo(git_url)
|
||||
except RuntimeError as e:
|
||||
click.echo(f"Error: {e}", err=True)
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
_install_from_repo_root(repo_root, display_name or git_url, None, None, skills_dir, "git")
|
||||
finally:
|
||||
shutil.rmtree(tmp_dir, ignore_errors=True)
|
||||
|
||||
|
||||
def _install_zip_bytes(content, name, skills_dir):
|
||||
|
||||
Reference in New Issue
Block a user