docs: add CLI system docs

This commit is contained in:
zhayujie
2026-03-29 17:57:12 +08:00
parent e06925ab85
commit 3cb5a0fbd6
62 changed files with 2995 additions and 750 deletions

View File

@@ -6,8 +6,10 @@ import subprocess
import click
MIN_PLAYWRIGHT_VERSION = "1.49.0"
MIN_GLIBC_VERSION = (2, 28)
PLAYWRIGHT_VERSION = "1.49.0"
PLAYWRIGHT_LEGACY_VERSION = "1.28.0"
GLIBC_THRESHOLD = (2, 28)
CHINA_MIRROR = "https://registry.npmmirror.com/-/binary/playwright"
def _has_display() -> bool:
@@ -16,16 +18,13 @@ def _has_display() -> bool:
def _is_headless_linux() -> bool:
"""True when running on a Linux server without a display."""
return sys.platform == "linux" and not _has_display()
def _get_installed_version() -> str:
"""Return installed playwright version string, or empty if not installed."""
python = sys.executable
try:
out = subprocess.check_output(
[python, "-c", "import playwright; print(playwright.__version__)"],
[sys.executable, "-c", "import playwright; print(playwright.__version__)"],
stderr=subprocess.DEVNULL,
)
return out.decode().strip()
@@ -34,7 +33,6 @@ def _get_installed_version() -> str:
def _version_tuple(v: str):
"""Parse '1.49.0' into (1, 49, 0)."""
try:
return tuple(int(x) for x in v.split(".")[:3])
except (ValueError, AttributeError):
@@ -42,7 +40,6 @@ def _version_tuple(v: str):
def _get_glibc_version():
"""Return glibc version as (major, minor) tuple, or None if unavailable."""
if sys.platform != "linux":
return None
try:
@@ -57,41 +54,62 @@ def _get_glibc_version():
return None
def _is_china_network() -> bool:
try:
out = subprocess.check_output(
[sys.executable, "-m", "pip", "config", "get", "global.index-url"],
stderr=subprocess.DEVNULL,
)
url = out.decode().strip().lower()
return any(kw in url for kw in ("tsinghua", "aliyun", "npmmirror", "douban", "ustc", "huawei", "tencentyun"))
except Exception:
return False
def _pip_install(package_spec: str) -> int:
"""Install a package, retrying with --user on permission failure."""
python = sys.executable
ret = subprocess.call([python, "-m", "pip", "install", package_spec])
if ret != 0:
click.echo(" Retrying with --user flag...")
ret = subprocess.call([python, "-m", "pip", "install", "--user", package_spec])
return ret
@click.command("install-browser")
def install_browser():
"""Install browser tool dependencies (Playwright + Chromium)."""
python = sys.executable
legacy_mode = False
# Pre-check: glibc version on Linux
if sys.platform == "linux":
glibc = _get_glibc_version()
if glibc and glibc < MIN_GLIBC_VERSION:
glibc_str = f"{glibc[0]}.{glibc[1]}"
click.echo(click.style(
f"Your system glibc version is {glibc_str}, "
f"but Playwright requires glibc >= {MIN_GLIBC_VERSION[0]}.{MIN_GLIBC_VERSION[1]}.\n"
f"(e.g. Ubuntu 18.04 ships glibc 2.27, CentOS 7 ships glibc 2.17)\n\n"
f"Options:\n"
f" 1. Upgrade your OS (e.g. Ubuntu 20.04+, Debian 10+, CentOS 8+)\n"
f" 2. Use Docker with a newer Linux image\n"
f" 3. Install an older playwright version manually (not recommended):\n"
f" pip install playwright==1.30.0 && playwright install chromium",
fg="red",
))
raise SystemExit(1)
# Determine playwright version based on glibc
glibc = _get_glibc_version()
if glibc and glibc < GLIBC_THRESHOLD:
legacy_mode = True
glibc_str = f"{glibc[0]}.{glibc[1]}"
click.echo(click.style(
f"glibc {glibc_str} detected (< 2.28). "
f"Will install playwright {PLAYWRIGHT_LEGACY_VERSION} for compatibility.",
fg="yellow",
))
click.echo(click.style(
" Note: upgrade your OS for full browser tool support.",
fg="yellow",
))
click.echo()
# Step 1: Install / upgrade playwright package
target_version = PLAYWRIGHT_LEGACY_VERSION if legacy_mode else PLAYWRIGHT_VERSION
# Step 1: Install playwright package
click.echo(click.style("[1/3] Installing playwright Python package...", fg="yellow"))
ret = subprocess.call([python, "-m", "pip", "install", f"playwright>={MIN_PLAYWRIGHT_VERSION}"])
ret = _pip_install(f"playwright=={target_version}" if legacy_mode else f"playwright>={target_version}")
if ret != 0:
click.echo(click.style("Failed to install playwright package.", fg="red"))
raise SystemExit(1)
installed = _get_installed_version()
if installed:
click.echo(click.style(f"playwright {installed} installed.", fg="green"))
else:
click.echo(click.style("playwright package installed.", fg="green"))
click.echo(click.style(f" playwright {installed} installed.", fg="green"))
click.echo()
# Step 2: System dependencies (Linux only)
@@ -100,7 +118,7 @@ def install_browser():
ret = subprocess.call([python, "-m", "playwright", "install-deps", "chromium"])
if ret != 0:
click.echo(click.style(
"Could not auto-install system deps (may need sudo).\n"
" Could not auto-install system deps (may need sudo).\n"
f" Run manually: sudo {python} -m playwright install-deps chromium",
fg="yellow",
))
@@ -113,20 +131,42 @@ def install_browser():
cmd = [python, "-m", "playwright", "install", "chromium"]
# --only-shell requires playwright >= 1.57
if _is_headless_linux():
ver = _version_tuple(_get_installed_version())
if _is_headless_linux() and not legacy_mode:
ver = _version_tuple(installed or "")
if ver >= (1, 57, 0):
cmd.append("--only-shell")
click.echo(" (headless shell for Linux server)")
else:
click.echo(" (full Chromium - upgrade to playwright>=1.57 for smaller headless-only install)")
elif sys.platform == "linux":
click.echo(" (full Chromium)")
elif sys.platform == "linux" and _has_display():
click.echo(" (full browser for Linux desktop)")
ret = subprocess.call(cmd)
# Use China mirror if pip is configured with a domestic index
env = os.environ.copy()
if _is_china_network():
env["PLAYWRIGHT_DOWNLOAD_HOST"] = CHINA_MIRROR
click.echo(f" (using China mirror: {CHINA_MIRROR})")
ret = subprocess.call(cmd, env=env)
if ret != 0:
click.echo(click.style("Failed to install Chromium.", fg="red"))
raise SystemExit(1)
# Quick smoke test
click.echo()
click.echo("Verifying browser installation...")
ret = subprocess.call(
[python, "-c", "from playwright.sync_api import sync_playwright; print('OK')"],
stderr=subprocess.DEVNULL,
)
if ret != 0:
click.echo(click.style(
" Warning: playwright import failed. Browser tool may not work on this system.\n"
" Consider upgrading your OS or using Docker.",
fg="yellow",
))
else:
click.echo(click.style(" Verification passed.", fg="green"))
click.echo()
click.echo(click.style("Browser tool ready! Restart CowAgent to enable it.", fg="green"))

View File

@@ -419,6 +419,87 @@ def _install_url(url: str, result: InstallResult):
result.messages.append(f"Installed '{skill_name}' from URL.")
def _install_archive_url(url: str, result: InstallResult):
"""Install skill(s) from a remote zip/tar.gz archive URL."""
parsed = urlparse(url)
if parsed.scheme != "https":
raise SkillInstallError("Refusing to download from non-HTTPS URL.")
filename = os.path.basename(parsed.path).split("?")[0]
fallback_name = re.sub(r'\.(zip|tar\.gz|tgz)$', '', filename, flags=re.IGNORECASE)
if not fallback_name or not _SAFE_NAME_RE.match(fallback_name):
fallback_name = "skill-package"
result.messages.append(f"Downloading archive from {url} ...")
try:
resp = requests.get(url, timeout=120, allow_redirects=True)
resp.raise_for_status()
except Exception as e:
raise SkillInstallError(f"Failed to download archive: {e}")
skills_dir = get_skills_dir()
os.makedirs(skills_dir, exist_ok=True)
content_type = resp.headers.get("Content-Type", "")
lower_url = url.lower()
if lower_url.endswith((".tar.gz", ".tgz")) or "gzip" in content_type:
_install_targz_bytes(resp.content, fallback_name, skills_dir, result)
else:
_install_zip_bytes(resp.content, fallback_name, skills_dir, result=result, source_label="url")
def _install_targz_bytes(content: bytes, name: str, skills_dir: str, result: InstallResult):
"""Extract a tar.gz archive and install skill(s)."""
with tempfile.TemporaryDirectory() as tmp_dir:
tar_path = os.path.join(tmp_dir, "package.tar.gz")
with open(tar_path, "wb") as f:
f.write(content)
import tarfile
extract_dir = os.path.join(tmp_dir, "extracted")
os.makedirs(extract_dir)
with tarfile.open(tar_path, "r:gz") as tf:
for member in tf.getmembers():
resolved = os.path.realpath(os.path.join(extract_dir, member.name))
if not resolved.startswith(os.path.realpath(extract_dir)):
raise SkillInstallError("Archive contains path traversal, aborting.")
tf.extractall(extract_dir)
top_items = [d for d in os.listdir(extract_dir) if not d.startswith(".")]
pkg_root = extract_dir
if len(top_items) == 1 and os.path.isdir(os.path.join(extract_dir, top_items[0])):
pkg_root = os.path.join(extract_dir, top_items[0])
discovered = _scan_skills_in_repo(pkg_root) or _scan_skills_in_dir(pkg_root)
if discovered and len(discovered) > 1:
_batch_install_skills(discovered, name, skills_dir, "url", result)
return
if discovered and len(discovered) == 1:
sname, sdir = discovered[0]
safe_name = re.sub(r'[^a-zA-Z0-9_\\-]', '-', sname)[:64]
if not _SAFE_NAME_RE.match(safe_name):
safe_name = name
target = os.path.join(skills_dir, safe_name)
if os.path.exists(target):
shutil.rmtree(target)
shutil.copytree(sdir, target)
_register_installed_skill(safe_name, source="url")
result.installed.append(safe_name)
result.messages.append(f"Installed '{safe_name}' from URL.")
return
target = os.path.join(skills_dir, name)
if os.path.exists(target):
shutil.rmtree(target)
shutil.copytree(pkg_root, target)
_register_installed_skill(name, source="url")
result.installed.append(name)
result.messages.append(f"Installed '{name}' from URL.")
def _print_install_success(name: str, source: str):
"""Print a unified install success message with description and source."""
skills_dir = get_skills_dir()
@@ -715,6 +796,11 @@ def _route_install(name: str, result: InstallResult):
_install_url(name, result)
return
# --- Zip / tar.gz archive URL ---
if name.startswith(("http://", "https://")) and re.search(r'\.(zip|tar\.gz|tgz)(\?.*)?$', name, re.IGNORECASE):
_install_archive_url(name, result)
return
# --- Full GitHub URL ---
parsed = _parse_github_url(name)
if parsed: