# Mirror of https://github.com/zhayujie/chatgpt-on-wechat.git
# Synced 2026-06-02 00:57:41 +08:00 (296 lines, 13 KiB, Python)
"""
|
|
Bash tool - Execute bash commands
|
|
"""
|
|
|
|
import os
import re
import shlex
import subprocess
import sys
import tempfile
from types import SimpleNamespace
from typing import Dict, Any, Optional

from agent.tools.base_tool import BaseTool, ToolResult
from agent.tools.utils.truncate import truncate_tail, format_size, DEFAULT_MAX_LINES, DEFAULT_MAX_BYTES
from common.log import logger
from common.utils import expand_path


class Bash(BaseTool):
    """
    Tool for executing bash commands.

    Commands are run through the system shell (``shell=True``) in the
    configured working directory. Variables found in ``~/.cow/.env`` are
    injected into the child environment, and the combined output is
    tail-truncated before being returned.
    """

    # True on Windows, where the shell is cmd.exe and $VAR references need
    # to be rewritten as %VAR%.
    _IS_WIN = sys.platform == "win32"

    name: str = "bash"
    description: str = f"""Execute a bash command in the current working directory. Returns stdout and stderr. Output is truncated to last {DEFAULT_MAX_LINES} lines or {DEFAULT_MAX_BYTES // 1024}KB (whichever is hit first). If truncated, full output is saved to a temp file.
{'''
PLATFORM: Windows (cmd.exe). Do NOT use Unix-only commands like grep, head, tail, sed, awk.
''' if _IS_WIN else ''}
ENVIRONMENT: All API keys from env_config are auto-injected. Use $VAR_NAME directly.

SAFETY:
- Freely create/modify/delete files within the workspace
- For destructive commands out of workspace, explain and confirm first"""

    params: dict = {
        "type": "object",
        "properties": {
            "command": {
                "type": "string",
                "description": "Bash command to execute"
            },
            "timeout": {
                "type": "integer",
                "description": "Timeout in seconds (optional, default: 30)"
            }
        },
        "required": ["command"]
    }

    def __init__(self, config: Optional[dict] = None):
        """
        Create the tool.

        :param config: Optional settings. Recognised keys: ``cwd`` (working
            directory, defaults to the process cwd), ``timeout`` (default
            timeout in seconds, 30) and ``safety_mode`` (bool, default True).
        """
        self.config = config or {}
        self.cwd = self.config.get("cwd", os.getcwd())
        # Ensure working directory exists
        if not os.path.exists(self.cwd):
            os.makedirs(self.cwd, exist_ok=True)
        self.default_timeout = self.config.get("timeout", 30)
        # Enable safety mode by default (can be disabled in config)
        self.safety_mode = self.config.get("safety_mode", True)

    def execute(self, args: Dict[str, Any]) -> ToolResult:
        """
        Execute a bash command

        :param args: Dictionary containing the command and optional timeout
        :return: Command output or error
        """
        # A missing, null or non-string command is reported as "required"
        # instead of raising AttributeError on .strip().
        raw_command = args.get("command")
        command = raw_command.strip() if isinstance(raw_command, str) else ""
        timeout = self._resolve_timeout(args.get("timeout"))

        if not command:
            return ToolResult.fail("Error: command parameter is required")

        # Security check: Prevent accessing sensitive config files.
        # ("~/.cow" also covers "~/.cow/.env".)
        if "~/.cow" in command:
            return ToolResult.fail(
                "Error: Access denied. API keys and credentials must be accessed through the env_config tool only."
            )

        # Optional safety check - only warn about extremely dangerous commands
        if self.safety_mode:
            warning = self._get_safety_warning(command)
            if warning:
                return ToolResult.fail(
                    f"Safety Warning: {warning}\n\nIf you believe this command is safe and necessary, please ask the user for confirmation first, explaining what the command does and why it's needed.")

        try:
            # Prepare environment with .env file variables
            env, dotenv_vars = self._build_env()

            # getuid() only exists on Unix-like systems
            if hasattr(os, 'getuid'):
                logger.debug(f"[Bash] Process UID: {os.getuid()}")
            else:
                logger.debug(f"[Bash] Process User: {os.environ.get('USERNAME', os.environ.get('USER', 'unknown'))}")

            # On Windows, convert $VAR references to %VAR% for cmd.exe
            if self._IS_WIN:
                env["PYTHONIOENCODING"] = "utf-8"
                command = self._convert_env_vars_for_windows(command, dotenv_vars)
                # Switch the console code page to UTF-8 so the output decodes
                # cleanly, unless the command already starts with chcp.
                if command and not command.strip().lower().startswith("chcp"):
                    command = f"chcp 65001 >nul 2>&1 && {command}"

            # Options shared by the shell run and the exit-126 retry.
            run_kwargs = dict(
                cwd=self.cwd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                encoding="utf-8",
                errors="replace",
                timeout=timeout,
                env=env,
            )
            result = subprocess.run(command, shell=True, **run_kwargs)

            logger.debug(f"[Bash] Exit code: {result.returncode}")
            logger.debug(f"[Bash] Stdout length: {len(result.stdout)}")
            logger.debug(f"[Bash] Stderr length: {len(result.stderr)}")

            # Workaround for exit code 126 with no output
            if result.returncode == 126 and not result.stdout and not result.stderr:
                result = self._retry_exit_126(command, result, run_kwargs)

            # When command succeeds with stdout, keep output clean (stderr goes to server log only).
            # When command fails or stdout is empty, include stderr so the agent can diagnose.
            output = result.stdout
            if result.returncode == 0 and result.stdout.strip():
                if result.stderr:
                    logger.info(f"[Bash] stderr (not forwarded): {result.stderr[:500]}")
            elif result.stderr:
                output += "\n" + result.stderr

            # Apply tail truncation
            truncation = truncate_tail(output)
            output_text = truncation.content or "(no output)"

            # Build result
            details = {}

            if truncation.truncated:
                # Save the full output whenever anything was cut off, whether
                # the line limit or the byte limit triggered the truncation.
                temp_file_path = self._save_full_output(output)
                details["truncation"] = truncation.to_dict()
                if temp_file_path:
                    details["full_output_path"] = temp_file_path

                # Only mention the file when it was actually written.
                full_ref = f" Full output: {temp_file_path}" if temp_file_path else ""

                # Build notice
                start_line = truncation.total_lines - truncation.output_lines + 1
                end_line = truncation.total_lines

                if truncation.last_line_partial:
                    # Edge case: the last line alone was cut, so report its size.
                    last_line = output.split('\n')[-1] if output else ""
                    last_line_size = format_size(len(last_line.encode('utf-8')))
                    output_text += f"\n\n[Showing last {format_size(truncation.output_bytes)} of line {end_line} (line is {last_line_size}).{full_ref}]"
                elif truncation.truncated_by == "lines":
                    output_text += f"\n\n[Showing lines {start_line}-{end_line} of {truncation.total_lines}.{full_ref}]"
                else:
                    output_text += f"\n\n[Showing lines {start_line}-{end_line} of {truncation.total_lines} ({format_size(DEFAULT_MAX_BYTES)} limit).{full_ref}]"

            # Check exit code
            if result.returncode != 0:
                output_text += f"\n\nCommand exited with code {result.returncode}"
                return ToolResult.fail({
                    "output": output_text,
                    "exit_code": result.returncode,
                    "details": details if details else None
                })

            return ToolResult.success({
                "output": output_text,
                "exit_code": result.returncode,
                "details": details if details else None
            })

        except subprocess.TimeoutExpired:
            return ToolResult.fail(f"Error: Command timed out after {timeout} seconds")
        except Exception as e:
            return ToolResult.fail(f"Error executing command: {str(e)}")

    def _resolve_timeout(self, requested: Any) -> Any:
        """
        Pick the timeout to use for one command.

        :param requested: Raw ``timeout`` argument (may be missing/None,
            a number, or a numeric string)
        :return: ``requested`` when it is a positive number, otherwise the
            configured default timeout
        """
        # bool is an int subclass; True/False is never a meaningful timeout.
        if requested is None or isinstance(requested, bool):
            return self.default_timeout
        if isinstance(requested, (int, float)):
            seconds = requested
        else:
            try:
                seconds = float(requested)
            except (TypeError, ValueError):
                return self.default_timeout
        # NaN compares False here and therefore also falls back to the default.
        return seconds if seconds > 0 else self.default_timeout

    def _build_env(self):
        """
        Build the child-process environment.

        :return: Tuple ``(env, dotenv_vars)``: a copy of ``os.environ``
            extended with the variables from ``~/.cow/.env``, and the dict of
            variables that came from that file (empty if none were loaded)
        """
        env = os.environ.copy()
        dotenv_vars = {}

        # Load environment variables from ~/.cow/.env if it exists
        env_file = expand_path("~/.cow/.env")
        if os.path.exists(env_file):
            try:
                from dotenv import dotenv_values
                loaded = dotenv_values(env_file)
                # Keys declared without a value come back as None, and the
                # process environment only accepts strings, so drop them.
                dotenv_vars = {key: value for key, value in loaded.items() if value is not None}
                env.update(dotenv_vars)
                logger.debug(f"[Bash] Loaded {len(dotenv_vars)} variables from {env_file}")
            except ImportError:
                logger.debug("[Bash] python-dotenv not installed, skipping .env loading")
            except Exception as e:
                logger.debug(f"[Bash] Failed to load .env: {e}")

        return env, dotenv_vars

    def _retry_exit_126(self, command: str, result: Any, run_kwargs: dict) -> Any:
        """
        Retry a command that exited with 126 and produced no output.

        The command is split into an argument list and run without a shell.

        :param command: Command string that was run through the shell
        :param result: Result of the failed shell run
        :param run_kwargs: Keyword arguments to reuse for ``subprocess.run``
        :return: The retry result if it succeeded or produced output, a
            substitute error result for the vision skill, otherwise ``result``
        """
        logger.warning("[Bash] Exit 126 with no output - trying alternative execution method")
        try:
            # Try using argument list instead of shell=True
            parts = shlex.split(command)
            if not parts:
                return result

            logger.info(f"[Bash] Retrying with argument list: {parts[:3]}...")
            retry_result = subprocess.run(parts, **run_kwargs)
            logger.debug(f"[Bash] Retry exit code: {retry_result.returncode}, stdout: {len(retry_result.stdout)}, stderr: {len(retry_result.stderr)}")

            # If retry succeeded, use retry result
            if retry_result.returncode == 0 or retry_result.stdout or retry_result.stderr:
                return retry_result

            # Both attempts failed - check if this is openai-image-vision skill
            if 'openai-image-vision' in command or 'vision.sh' in command:
                logger.info("[Bash] Converted exit 126 to user-friendly image error message for vision skill")
                # Create a mock result with helpful error message
                return SimpleNamespace(
                    returncode=1,
                    stdout='{"error": "图片无法解析", "reason": "该图片格式可能不受支持,或图片文件存在问题", "suggestion": "请尝试其他图片"}',
                    stderr=''
                )
        except Exception as retry_err:
            # Best effort only: keep the original exit-126 result.
            logger.warning(f"[Bash] Retry failed: {retry_err}")
        return result

    @staticmethod
    def _save_full_output(output: str) -> Optional[str]:
        """
        Write the untruncated output to a temp file.

        :param output: Full command output
        :return: Path of the written file, or None if it could not be written
        """
        try:
            # Explicit UTF-8: the platform default encoding (e.g. on Windows)
            # may not be able to represent every character in the output.
            with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.log',
                                             prefix='bash-', encoding='utf-8') as f:
                f.write(output)
                return f.name
        except OSError as e:
            # Losing the side file must not discard the command result itself.
            logger.warning(f"[Bash] Failed to save full output: {e}")
            return None

    def _get_safety_warning(self, command: str) -> str:
        """
        Get safety warning for absolutely catastrophic commands only.
        Keep the blocklist minimal so the agent retains maximum freedom.

        :param command: Command to check
        :return: Warning message if dangerous, empty string if safe
        """
        lowered = command.lower()

        # Tokenize to avoid substring false positives (e.g. `rm -rf /tmp/x`
        # must not match `rm -rf /`).
        tokens = lowered.split()

        # `rm -rf /` or `rm -rf /*` targeting the real root.
        for i, tok in enumerate(tokens):
            if tok != "rm":
                continue
            recursive = force = False
            for t in tokens[i + 1:]:
                if t in ("/", "/*"):
                    if recursive and force:
                        return "This command will delete the entire filesystem"
                    break
                if t.startswith("--"):
                    # Long options (and the bare `--` separator) never end the
                    # scan, so `--no-preserve-root` cannot hide the target.
                    if t == "--recursive":
                        recursive = True
                    elif t == "--force":
                        force = True
                    continue
                if t.startswith("-") and len(t) > 1:
                    # Short flags may be bundled (-rf) or separate (-r -f);
                    # the command is lower-cased, so -R counts as recursive.
                    recursive = recursive or "r" in t
                    force = force or "f" in t
                    continue
                # First operand is something other than the root: stop here.
                break

        # Disk wiping (dd matched as a whole word, so e.g. "add" does not count)
        if "if=/dev/zero" in lowered and re.search(r'\bdd\b', lowered):
            return "This command can destroy disk data"

        # Power control - match only as a standalone word (\b enforces word boundary)
        if re.search(r'\b(shutdown|reboot|halt|poweroff)\b', lowered):
            return "This command will shut down or restart the system"

        return ""

    @staticmethod
    def _convert_env_vars_for_windows(command: str, dotenv_vars: dict) -> str:
        """
        Convert bash-style $VAR / ${VAR} references to cmd.exe %VAR% syntax.
        Only converts variables loaded from .env (user-configured API keys etc.)
        to avoid breaking $PATH, jq expressions, regex, etc.

        :param command: Command string to rewrite
        :param dotenv_vars: Variables loaded from the .env file
        :return: Command with known variable references rewritten
        """
        if not dotenv_vars:
            return command

        def replace_match(m):
            # Group 1 is the ${VAR} form, group 2 the $VAR form.
            var_name = m.group(1) or m.group(2)
            if var_name in dotenv_vars:
                return f"%{var_name}%"
            return m.group(0)

        return re.sub(r'\$\{(\w+)\}|\$(\w+)', replace_match, command)