fix(cli): add security hardening for skill install and process management

This commit is contained in:
zhayujie
2026-03-27 17:59:15 +08:00
parent 158510cbbe
commit db16bdf8cb
5 changed files with 635 additions and 39 deletions

View File

@@ -22,7 +22,7 @@ Commands:
context View or manage conversation context.
skill Manage CowAgent skills.
Tip: You can also send /help, /skill list, etc. in chat."""
Tip: You can also send /help, /skill list, etc. in agent chat."""
class CowCLI(click.Group):

View File

@@ -2,7 +2,6 @@
import os
import sys
import signal
import subprocess
import time
from typing import Optional
@@ -11,6 +10,8 @@ import click
from cli.utils import get_project_root
_IS_WIN = sys.platform == "win32"
def _get_pid_file():
return os.path.join(get_project_root(), ".cow.pid")
@@ -20,6 +21,40 @@ def _get_log_file():
return os.path.join(get_project_root(), "nohup.out")
def _is_pid_alive(pid: int) -> bool:
"""Check whether a process is still running (cross-platform)."""
if _IS_WIN:
try:
out = subprocess.check_output(
["tasklist", "/FI", f"PID eq {pid}", "/NH"],
stderr=subprocess.DEVNULL,
)
return str(pid) in out.decode(errors="ignore")
except Exception:
return False
else:
try:
os.kill(pid, 0)
return True
except (ProcessLookupError, PermissionError):
return False
def _kill_pid(pid: int, force: bool = False):
"""Terminate a process by PID (cross-platform)."""
if _IS_WIN:
flag = "/F" if force else ""
cmd = ["taskkill"]
if force:
cmd.append("/F")
cmd.extend(["/PID", str(pid)])
subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
else:
import signal
sig = signal.SIGKILL if force else signal.SIGTERM
os.kill(pid, sig)
def _read_pid() -> Optional[int]:
pid_file = _get_pid_file()
if not os.path.exists(pid_file):
@@ -27,11 +62,16 @@ def _read_pid() -> Optional[int]:
try:
with open(pid_file, "r") as f:
pid = int(f.read().strip())
os.kill(pid, 0)
return pid
except (ValueError, ProcessLookupError, PermissionError):
if _is_pid_alive(pid):
return pid
os.remove(pid_file)
return None
except (ValueError, OSError):
try:
os.remove(pid_file)
except OSError:
pass
return None
def _write_pid(pid: int):
@@ -65,18 +105,29 @@ def start(foreground, no_logs):
if foreground:
click.echo("Starting CowAgent in foreground...")
os.execv(python, [python, app_py])
if _IS_WIN:
sys.exit(subprocess.call([python, app_py], cwd=root))
else:
os.execv(python, [python, app_py])
else:
log_file = _get_log_file()
click.echo("Starting CowAgent...")
popen_kwargs = dict(cwd=root)
if _IS_WIN:
CREATE_NO_WINDOW = 0x08000000
popen_kwargs["creationflags"] = (
subprocess.CREATE_NEW_PROCESS_GROUP | CREATE_NO_WINDOW
)
else:
popen_kwargs["start_new_session"] = True
with open(log_file, "a") as log:
proc = subprocess.Popen(
[python, app_py],
cwd=root,
stdout=log,
stderr=log,
start_new_session=True,
**popen_kwargs,
)
_write_pid(proc.pid)
click.echo(click.style(f"✓ CowAgent started (PID: {proc.pid})", fg="green"))
@@ -97,16 +148,14 @@ def stop():
click.echo(f"Stopping CowAgent (PID: {pid})...")
try:
os.kill(pid, signal.SIGTERM)
_kill_pid(pid)
for _ in range(30):
time.sleep(0.1)
try:
os.kill(pid, 0)
except ProcessLookupError:
if not _is_pid_alive(pid):
break
else:
os.kill(pid, signal.SIGKILL)
except ProcessLookupError:
_kill_pid(pid, force=True)
except (ProcessLookupError, OSError):
pass
_remove_pid()
@@ -161,21 +210,32 @@ def logs(follow, lines):
if follow:
_tail_log(log_file, lines)
else:
subprocess.run(
["tail", "-n", str(lines), log_file],
stdout=sys.stdout,
stderr=sys.stderr,
)
_print_last_lines(log_file, lines)
def _print_last_lines(file_path: str, n: int = 50):
"""Print the last N lines of a file (cross-platform)."""
try:
with open(file_path, "r", encoding="utf-8", errors="replace") as f:
all_lines = f.readlines()
for line in all_lines[-n:]:
click.echo(line, nl=False)
except Exception as e:
click.echo(f"Error reading log file: {e}", err=True)
def _tail_log(log_file: str, lines: int = 50):
"""Follow log file output. Blocks until Ctrl+C."""
"""Follow log file output. Blocks until Ctrl+C (cross-platform)."""
_print_last_lines(log_file, lines)
try:
proc = subprocess.Popen(
["tail", "-f", "-n", str(lines), log_file],
stdout=sys.stdout,
stderr=sys.stderr,
)
proc.wait()
with open(log_file, "r", encoding="utf-8", errors="replace") as f:
f.seek(0, 2)
while True:
line = f.readline()
if line:
click.echo(line, nl=False)
else:
time.sleep(0.3)
except KeyboardInterrupt:
pass

View File

@@ -1,12 +1,16 @@
"""cow skill - Skill management commands."""
import os
import re
import sys
import json
import hashlib
import shutil
import zipfile
import tempfile
from urllib.parse import urlparse
import click
import requests
@@ -18,6 +22,57 @@ from cli.utils import (
SKILL_HUB_API,
)
_SAFE_NAME_RE = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9_\-]{0,63}$")
def _validate_skill_name(name: str):
"""Reject names that contain path traversal or special characters."""
if not _SAFE_NAME_RE.match(name):
click.echo(
f"Error: Invalid skill name '{name}'. "
"Use only letters, digits, hyphens, and underscores.",
err=True,
)
sys.exit(1)
def _validate_github_spec(spec: str):
"""Reject specs that don't look like owner/repo."""
if not re.match(r"^[a-zA-Z0-9_\-]+/[a-zA-Z0-9_.\-]+$", spec):
click.echo(f"Error: Invalid GitHub spec '{spec}'. Expected format: owner/repo", err=True)
sys.exit(1)
def _safe_extractall(zf: zipfile.ZipFile, dest: str):
"""Extract zip while guarding against Zip Slip (path traversal)."""
dest = os.path.realpath(dest)
for member in zf.infolist():
target = os.path.realpath(os.path.join(dest, member.filename))
if not target.startswith(dest + os.sep) and target != dest:
raise ValueError(f"Unsafe zip entry detected: {member.filename}")
zf.extractall(dest)
def _verify_checksum(content: bytes, expected: str):
"""Verify SHA-256 checksum of downloaded content.
Returns True if checksum matches or no expected value provided.
Exits with error if mismatch.
"""
if not expected:
return True
actual = hashlib.sha256(content).hexdigest()
if actual != expected.lower():
click.echo(
f"Error: Checksum mismatch!\n"
f" Expected: {expected}\n"
f" Actual: {actual}\n"
f"The downloaded package may have been tampered with.",
err=True,
)
sys.exit(1)
return True
@click.group()
def skill():
@@ -208,6 +263,7 @@ def install(name):
if name.startswith("github:"):
_install_github(name[7:])
else:
_validate_skill_name(name)
_install_hub(name)
@@ -239,18 +295,40 @@ def _install_hub(name):
if source_type == "github":
source_url = data.get("source_url", "")
_validate_github_spec(source_url)
source_path = data.get("source_path")
click.echo(f"Source: GitHub ({source_url})")
_install_github(source_url, subpath=source_path, skill_name=name)
return
if source_type == "registry":
click.echo(f"This skill is from an external registry: {data.get('source_url', '')}")
click.echo("Please install it through the corresponding platform.")
download_url = data.get("download_url")
if download_url:
parsed = urlparse(download_url)
if parsed.scheme != "https":
click.echo(f"Error: Refusing to download from non-HTTPS URL.", err=True)
sys.exit(1)
provider = data.get("source_provider", "registry")
expected_checksum = data.get("checksum") or data.get("sha256")
click.echo(f"Source: {provider}")
click.echo("Downloading skill package...")
try:
dl_resp = requests.get(download_url, timeout=60, allow_redirects=True)
dl_resp.raise_for_status()
except Exception as e:
click.echo(f"Error: Failed to download from {provider}: {e}", err=True)
sys.exit(1)
_verify_checksum(dl_resp.content, expected_checksum)
_install_zip_bytes(dl_resp.content, name, skills_dir)
click.echo(click.style(f"✓ Skill '{name}' installed successfully!", fg="green"))
else:
click.echo(f"Error: Unsupported registry provider.", err=True)
sys.exit(1)
return
if "redirect" in data:
source_url = data.get("source_url", "")
_validate_github_spec(source_url)
source_path = data.get("source_path")
click.echo(f"Source: GitHub ({source_url})")
_install_github(source_url, subpath=source_path, skill_name=name)
@@ -258,8 +336,9 @@ def _install_hub(name):
elif "application/zip" in content_type:
click.echo("Downloading skill package...")
expected_checksum = resp.headers.get("X-Checksum-Sha256")
_verify_checksum(resp.content, expected_checksum)
_install_zip_bytes(resp.content, name, skills_dir)
_report_install(name)
click.echo(click.style(f"✓ Skill '{name}' installed successfully!", fg="green"))
return
@@ -275,8 +354,11 @@ def _install_github(spec, subpath=None, skill_name=None):
if "#" in spec and not subpath:
spec, subpath = spec.split("#", 1)
_validate_github_spec(spec)
if not skill_name:
skill_name = subpath.rstrip("/").split("/")[-1] if subpath else spec.split("/")[-1]
_validate_skill_name(skill_name)
skills_dir = get_skills_dir()
os.makedirs(skills_dir, exist_ok=True)
@@ -298,7 +380,7 @@ def _install_github(spec, subpath=None, skill_name=None):
extract_dir = os.path.join(tmp_dir, "extracted")
with zipfile.ZipFile(zip_path, "r") as zf:
zf.extractall(extract_dir)
_safe_extractall(zf, extract_dir)
# GitHub archives have a top-level dir like "repo-main/"
top_items = [d for d in os.listdir(extract_dir) if not d.startswith(".")]
@@ -319,7 +401,6 @@ def _install_github(spec, subpath=None, skill_name=None):
shutil.rmtree(target_dir)
shutil.copytree(source_dir, target_dir)
_report_install(skill_name)
click.echo(click.style(f"✓ Skill '{skill_name}' installed successfully!", fg="green"))
@@ -332,7 +413,7 @@ def _install_zip_bytes(content, name, skills_dir):
extract_dir = os.path.join(tmp_dir, "extracted")
with zipfile.ZipFile(zip_path, "r") as zf:
zf.extractall(extract_dir)
_safe_extractall(zf, extract_dir)
top_items = [d for d in os.listdir(extract_dir) if not d.startswith(".")]
source = extract_dir
@@ -345,12 +426,6 @@ def _install_zip_bytes(content, name, skills_dir):
shutil.copytree(source, target)
def _report_install(name):
"""Report installation to Skill Hub for download counting."""
try:
requests.post(f"{SKILL_HUB_API}/skills/{name}/install", json={}, timeout=5)
except Exception:
pass
# ------------------------------------------------------------------
@@ -361,6 +436,7 @@ def _report_install(name):
@click.option("--yes", "-y", is_flag=True, help="Skip confirmation")
def uninstall(name, yes):
"""Uninstall a skill."""
_validate_skill_name(name)
skills_dir = get_skills_dir()
skill_dir = os.path.join(skills_dir, name)
@@ -405,6 +481,7 @@ def disable(name):
def _set_enabled(name, enabled):
_validate_skill_name(name)
skills_dir = get_skills_dir()
config_path = os.path.join(skills_dir, "skills_config.json")
@@ -440,6 +517,7 @@ def _set_enabled(name, enabled):
@click.argument("name")
def info(name):
"""Show details about an installed skill."""
_validate_skill_name(name)
skills_dir = get_skills_dir()
builtin_dir = get_builtin_skills_dir()