""" Bash tool - Execute bash commands """ import os import re import sys import subprocess import tempfile from typing import Dict, Any from agent.tools.base_tool import BaseTool, ToolResult from agent.tools.utils.truncate import truncate_tail, format_size, DEFAULT_MAX_LINES, DEFAULT_MAX_BYTES from common.log import logger from common.utils import expand_path class Bash(BaseTool): """Tool for executing bash commands""" _IS_WIN = sys.platform == "win32" name: str = "bash" description: str = f"""Execute a bash command in the current working directory. Returns stdout and stderr. Output is truncated to last {DEFAULT_MAX_LINES} lines or {DEFAULT_MAX_BYTES // 1024}KB (whichever is hit first). If truncated, full output is saved to a temp file. {''' PLATFORM: Windows (cmd.exe). Do NOT use Unix-only commands like grep, head, tail, sed, awk. ''' if _IS_WIN else ''} ENVIRONMENT: All API keys from env_config are auto-injected. Use $VAR_NAME directly. SAFETY: - Freely create/modify/delete files within the workspace - For destructive commands out of workspace, explain and confirm first""" params: dict = { "type": "object", "properties": { "command": { "type": "string", "description": "Bash command to execute" }, "timeout": { "type": "integer", "description": "Timeout in seconds (optional, default: 30)" } }, "required": ["command"] } def __init__(self, config: dict = None): self.config = config or {} self.cwd = self.config.get("cwd", os.getcwd()) # Ensure working directory exists if not os.path.exists(self.cwd): os.makedirs(self.cwd, exist_ok=True) self.default_timeout = self.config.get("timeout", 30) # Enable safety mode by default (can be disabled in config) self.safety_mode = self.config.get("safety_mode", True) def execute(self, args: Dict[str, Any]) -> ToolResult: """ Execute a bash command :param args: Dictionary containing the command and optional timeout :return: Command output or error """ command = args.get("command", "").strip() timeout = args.get("timeout", self.default_timeout) if not command: return ToolResult.fail("Error: command parameter is required") # Security check: Prevent accessing sensitive config files if "~/.cow/.env" in command or "~/.cow" in command: return ToolResult.fail( "Error: Access denied. API keys and credentials must be accessed through the env_config tool only." ) # Optional safety check - only warn about extremely dangerous commands if self.safety_mode: warning = self._get_safety_warning(command) if warning: return ToolResult.fail( f"Safety Warning: {warning}\n\nIf you believe this command is safe and necessary, please ask the user for confirmation first, explaining what the command does and why it's needed.") try: # Prepare environment with .env file variables env = os.environ.copy() # Load environment variables from ~/.cow/.env if it exists env_file = expand_path("~/.cow/.env") dotenv_vars = {} if os.path.exists(env_file): try: from dotenv import dotenv_values dotenv_vars = dotenv_values(env_file) env.update(dotenv_vars) logger.debug(f"[Bash] Loaded {len(dotenv_vars)} variables from {env_file}") except ImportError: logger.debug("[Bash] python-dotenv not installed, skipping .env loading") except Exception as e: logger.debug(f"[Bash] Failed to load .env: {e}") # getuid() only exists on Unix-like systems if hasattr(os, 'getuid'): logger.debug(f"[Bash] Process UID: {os.getuid()}") else: logger.debug(f"[Bash] Process User: {os.environ.get('USERNAME', os.environ.get('USER', 'unknown'))}") # On Windows, convert $VAR references to %VAR% for cmd.exe if self._IS_WIN: env["PYTHONIOENCODING"] = "utf-8" command = self._convert_env_vars_for_windows(command, dotenv_vars) if command and not command.strip().lower().startswith("chcp"): command = f"chcp 65001 >nul 2>&1 && {command}" result = subprocess.run( command, shell=True, cwd=self.cwd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, encoding="utf-8", errors="replace", timeout=timeout, env=env, ) logger.debug(f"[Bash] Exit code: {result.returncode}") logger.debug(f"[Bash] Stdout length: {len(result.stdout)}") logger.debug(f"[Bash] Stderr length: {len(result.stderr)}") # Workaround for exit code 126 with no output if result.returncode == 126 and not result.stdout and not result.stderr: logger.warning(f"[Bash] Exit 126 with no output - trying alternative execution method") # Try using argument list instead of shell=True import shlex try: parts = shlex.split(command) if len(parts) > 0: logger.info(f"[Bash] Retrying with argument list: {parts[:3]}...") retry_result = subprocess.run( parts, cwd=self.cwd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, encoding="utf-8", errors="replace", timeout=timeout, env=env ) logger.debug(f"[Bash] Retry exit code: {retry_result.returncode}, stdout: {len(retry_result.stdout)}, stderr: {len(retry_result.stderr)}") # If retry succeeded, use retry result if retry_result.returncode == 0 or retry_result.stdout or retry_result.stderr: result = retry_result else: # Both attempts failed - check if this is openai-image-vision skill if 'openai-image-vision' in command or 'vision.sh' in command: # Create a mock result with helpful error message from types import SimpleNamespace result = SimpleNamespace( returncode=1, stdout='{"error": "图片无法解析", "reason": "该图片格式可能不受支持,或图片文件存在问题", "suggestion": "请尝试其他图片"}', stderr='' ) logger.info(f"[Bash] Converted exit 126 to user-friendly image error message for vision skill") except Exception as retry_err: logger.warning(f"[Bash] Retry failed: {retry_err}") # When command succeeds with stdout, keep output clean (stderr goes to server log only). # When command fails or stdout is empty, include stderr so the agent can diagnose. if result.returncode == 0 and result.stdout.strip(): output = result.stdout if result.stderr: logger.info(f"[Bash] stderr (not forwarded): {result.stderr[:500]}") else: output = result.stdout if result.stderr: output += "\n" + result.stderr # Check if we need to save full output to temp file temp_file_path = None total_bytes = len(output.encode('utf-8')) if total_bytes > DEFAULT_MAX_BYTES: # Save full output to temp file with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.log', prefix='bash-') as f: f.write(output) temp_file_path = f.name # Apply tail truncation truncation = truncate_tail(output) output_text = truncation.content or "(no output)" # Build result details = {} if truncation.truncated: details["truncation"] = truncation.to_dict() if temp_file_path: details["full_output_path"] = temp_file_path # Build notice start_line = truncation.total_lines - truncation.output_lines + 1 end_line = truncation.total_lines if truncation.last_line_partial: # Edge case: last line alone > 30KB last_line = output.split('\n')[-1] if output else "" last_line_size = format_size(len(last_line.encode('utf-8'))) output_text += f"\n\n[Showing last {format_size(truncation.output_bytes)} of line {end_line} (line is {last_line_size}). Full output: {temp_file_path}]" elif truncation.truncated_by == "lines": output_text += f"\n\n[Showing lines {start_line}-{end_line} of {truncation.total_lines}. Full output: {temp_file_path}]" else: output_text += f"\n\n[Showing lines {start_line}-{end_line} of {truncation.total_lines} ({format_size(DEFAULT_MAX_BYTES)} limit). Full output: {temp_file_path}]" # Check exit code if result.returncode != 0: output_text += f"\n\nCommand exited with code {result.returncode}" return ToolResult.fail({ "output": output_text, "exit_code": result.returncode, "details": details if details else None }) return ToolResult.success({ "output": output_text, "exit_code": result.returncode, "details": details if details else None }) except subprocess.TimeoutExpired: return ToolResult.fail(f"Error: Command timed out after {timeout} seconds") except Exception as e: return ToolResult.fail(f"Error executing command: {str(e)}") def _get_safety_warning(self, command: str) -> str: """ Get safety warning for absolutely catastrophic commands only. Keep the blocklist minimal so the agent retains maximum freedom. :param command: Command to check :return: Warning message if dangerous, empty string if safe """ # Tokenize to avoid substring false positives (e.g. `rm -rf /tmp/x` # must not match `rm -rf /`). tokens = command.lower().split() # `rm -rf /` or `rm -rf /*` targeting the real root. for i, tok in enumerate(tokens): if tok != "rm": continue has_rf = False for j in range(i + 1, len(tokens)): t = tokens[j] if t.startswith("-") and "r" in t and "f" in t: has_rf = True elif t in ("--recursive", "--force"): continue elif t in ("/", "/*"): if has_rf: return "This command will delete the entire filesystem" break else: break # Disk wiping if "if=/dev/zero" in command.lower() and "dd " in command.lower(): return "This command can destroy disk data" # Power control - match only as a standalone word (\b enforces word boundary) if re.search(r'\b(shutdown|reboot|halt|poweroff)\b', command.lower()): return "This command will shut down or restart the system" return "" @staticmethod def _convert_env_vars_for_windows(command: str, dotenv_vars: dict) -> str: """ Convert bash-style $VAR / ${VAR} references to cmd.exe %VAR% syntax. Only converts variables loaded from .env (user-configured API keys etc.) to avoid breaking $PATH, jq expressions, regex, etc. """ if not dotenv_vars: return command def replace_match(m): var_name = m.group(1) or m.group(2) if var_name in dotenv_vars: return f"%{var_name}%" return m.group(0) return re.sub(r'\$\{(\w+)\}|\$(\w+)', replace_match, command)