From 29c4be6a3afa4586d5138913886c43bfa9496ee0 Mon Sep 17 00:00:00 2001 From: zhayujie Date: Sat, 30 May 2026 19:10:56 +0800 Subject: [PATCH] feat(terminal): add agent streaming UX with reasoning/tool-call rendering --- channel/terminal/terminal_channel.py | 281 ++++++++++++++++++++++++++- channel/web/web_channel.py | 37 ++-- cli/commands/process.py | 25 ++- 3 files changed, 315 insertions(+), 28 deletions(-) diff --git a/channel/terminal/terminal_channel.py b/channel/terminal/terminal_channel.py index 9b64eb47..f439888b 100644 --- a/channel/terminal/terminal_channel.py +++ b/channel/terminal/terminal_channel.py @@ -1,4 +1,7 @@ +import json +import os import sys +import time from bridge.context import * from bridge.reply import Reply, ReplyType @@ -8,6 +11,164 @@ from common.log import logger from config import conf +class _Style: + """ANSI escape codes for terminal styling. Disabled when not a tty.""" + + enabled = sys.stdout.isatty() + + RESET = "\033[0m" + BOLD = "\033[1m" + DIM = "\033[2m" + ITALIC = "\033[3m" + + GRAY = "\033[90m" + RED = "\033[31m" + GREEN = "\033[32m" + YELLOW = "\033[33m" + BLUE = "\033[34m" + MAGENTA = "\033[35m" + CYAN = "\033[36m" + + @classmethod + def wrap(cls, text, *codes): + if not cls.enabled or not codes: + return text + return "".join(codes) + text + cls.RESET + + +class TerminalAgentRenderer: + """Render agent stream events to the terminal in real time. + + Reuses the same `on_event` mechanism as the web channel so the terminal + can show reasoning, tool calls and streaming answer text just like the web UI. + """ + + def __init__(self): + self._reasoning_active = False + self._answer_active = False + self._has_output = False + # Track tool execution start time as a fallback when the event omits it + self._tool_started_at = {} + + def _print(self, text, end="", flush=True): + sys.stdout.write(text) + if end: + sys.stdout.write(end) + if flush: + sys.stdout.flush() + self._has_output = True + + def _close_section(self): + """Finish the currently open streaming section (reasoning or answer).""" + if self._reasoning_active: + self._print("", end="\n") + self._reasoning_active = False + if self._answer_active: + self._print("", end="\n") + self._answer_active = False + + def _format_arguments(self, arguments): + try: + if isinstance(arguments, (dict, list)): + text = json.dumps(arguments, ensure_ascii=False) + else: + text = str(arguments) + except Exception: + text = str(arguments) + # Keep tool input compact in the terminal + if len(text) > 300: + text = text[:300] + "…" + return text + + def handle_event(self, event: dict): + try: + self._handle_event(event) + except Exception as e: + logger.debug(f"[Terminal] render event error: {e}") + + def _handle_event(self, event: dict): + event_type = event.get("type") + data = event.get("data", {}) or {} + + if event_type == "agent_start": + self._print("\n" + _Style.wrap("Agent: ", _Style.BOLD, _Style.GREEN), end="\n") + + elif event_type == "reasoning_update": + delta = data.get("delta", "") + if not delta: + return + if self._answer_active: + self._close_section() + if not self._reasoning_active: + self._print(_Style.wrap("💭 思考 ", _Style.DIM, _Style.MAGENTA), end="\n") + self._reasoning_active = True + self._print(_Style.wrap(delta, _Style.DIM, _Style.ITALIC)) + + elif event_type == "message_update": + delta = data.get("delta", "") + if not delta: + return + if self._reasoning_active: + self._close_section() + self._answer_active = True + self._print(delta) + + elif event_type == "tool_execution_start": + self._close_section() + tool_name = data.get("tool_name", "tool") + tool_id = data.get("tool_call_id") + arguments = data.get("arguments", {}) + self._tool_started_at[tool_id] = time.time() + header = _Style.wrap(f"🔧 {tool_name}", _Style.BOLD, _Style.CYAN) + args_str = self._format_arguments(arguments) + self._print(f"{header} {_Style.wrap(args_str, _Style.GRAY)}", end="\n") + + elif event_type == "tool_execution_end": + tool_name = data.get("tool_name", "tool") + tool_id = data.get("tool_call_id") + status = data.get("status", "success") + result = data.get("result", "") + exec_time = data.get("execution_time") + if exec_time is None and tool_id in self._tool_started_at: + exec_time = time.time() - self._tool_started_at.pop(tool_id, time.time()) + success = status == "success" + icon = "✓" if success else "✗" + color = _Style.GREEN if success else _Style.RED + result_str = str(result) + if len(result_str) > 500: + result_str = result_str[:500] + "…" + # Indent multi-line tool output for readability + result_str = result_str.replace("\n", "\n ") + cost = f" ({exec_time:.2f}s)" if isinstance(exec_time, (int, float)) else "" + self._print( + _Style.wrap(f" {icon} {tool_name}{cost}", color) + " " + _Style.wrap(result_str, _Style.GRAY), + end="\n", + ) + + elif event_type == "file_to_send": + self._close_section() + file_path = data.get("path", "") + file_name = data.get("file_name", "") + label = file_name or file_path + self._print(_Style.wrap(f"📎 文件: {label}", _Style.BLUE), end="\n") + + elif event_type == "error": + self._close_section() + err_msg = data.get("error") or "unknown error" + self._print(_Style.wrap(f"❌ {err_msg}", _Style.BOLD, _Style.RED), end="\n") + + elif event_type == "agent_cancelled": + self._close_section() + self._print(_Style.wrap("⏹ 已中止", _Style.YELLOW), end="\n") + + elif event_type == "agent_end": + self._close_section() + + def finish(self): + """Ensure any open section is closed at the end of a turn.""" + self._close_section() + + class TerminalMessage(ChatMessage): def __init__( self, @@ -29,17 +190,33 @@ class TerminalMessage(ChatMessage): class TerminalChannel(ChatChannel): NOT_SUPPORT_REPLYTYPE = [ReplyType.VOICE] + def __init__(self): + super().__init__() + # Per-request renderers keyed by request_id; used to detect whether + # agent text was already streamed so send() can avoid duplicate output. + self._renderers = {} + # Callback that restores TTY attributes on exit (set in startup). + self._restore_terminal = None + def send(self, reply: Reply, context: Context): - print("\nBot:") + request_id = context.get("request_id") if context else None + renderer = self._renderers.pop(request_id, None) if request_id else None + streamed = renderer is not None and renderer._has_output + + if renderer is not None: + renderer.finish() + if reply.type == ReplyType.IMAGE: from PIL import Image image_storage = reply.content image_storage.seek(0) img = Image.open(image_storage) + if not streamed: + print("\nAgent: ") print("") img.show() - elif reply.type == ReplyType.IMAGE_URL: # 从网络下载图片 + elif reply.type == ReplyType.IMAGE_URL: # download image from url import io import requests @@ -52,38 +229,122 @@ class TerminalChannel(ChatChannel): image_storage.write(block) image_storage.seek(0) img = Image.open(image_storage) + if not streamed: + print("\nAgent: ") print(img_url) img.show() else: - print(reply.content) - print("\nUser:", end="") + # When agent already streamed the answer, skip re-printing the + # final text to avoid duplication; just emit a trailing newline. + if streamed: + print() + else: + print("\nAgent: ") + print(reply.content) + print("\nUser: ", end="") sys.stdout.flush() return + def _silence_console_logging(self): + """Mute console log output so background-thread logs (web/MCP/scheduler) + don't flood the interactive terminal. Logs still go to run.log in full. + + Configurable via `terminal_log_level` (default ERROR). The file handler + is untouched, so run.log keeps the complete log. + """ + import logging + + level_name = str(conf().get("terminal_log_level", "ERROR")).upper() + level = getattr(logging, level_name, logging.ERROR) + root_logger = logging.getLogger("log") + for handler in root_logger.handlers: + # Only raise the level of the stdout/stderr stream handler; + # keep FileHandler at the logger's level so run.log stays complete. + if isinstance(handler, logging.StreamHandler) and not isinstance(handler, logging.FileHandler): + handler.setLevel(level) + + def _install_terminal_guard(self): + """Save TTY attributes and register restore hooks so the terminal is + never left in a broken state (no echo / raw mode / leftover ANSI) after + the process exits, especially when Ctrl+C interrupts a blocking input(). + """ + if not sys.stdin.isatty(): + return + try: + import atexit + import termios + + saved_attrs = termios.tcgetattr(sys.stdin.fileno()) + + def _restore(): + try: + termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, saved_attrs) + except Exception: + pass + try: + if _Style.enabled: + sys.stdout.write(_Style.RESET) + sys.stdout.flush() + except Exception: + pass + + self._restore_terminal = _restore + atexit.register(_restore) + except Exception as e: + # termios is unavailable on Windows; skip the guard there. + logger.debug(f"[Terminal] terminal guard not installed: {e}") + self._restore_terminal = None + def startup(self): context = Context() - logger.setLevel("WARN") - print("\nPlease input your question:\nUser:", end="") + self._silence_console_logging() + self._install_terminal_guard() + print("\nPlease input your question:\nUser: ", end="") sys.stdout.flush() msg_id = 0 while True: try: prompt = self.get_input() - except KeyboardInterrupt: - print("\nExiting...") - sys.exit() + except (KeyboardInterrupt, EOFError): + self._shutdown() msg_id += 1 trigger_prefixs = conf().get("single_chat_prefix", [""]) if check_prefix(prompt, trigger_prefixs) is None: - prompt = trigger_prefixs[0] + prompt # 给没触发的消息加上触发前缀 + prompt = trigger_prefixs[0] + prompt # add trigger prefix to untriggered messages context = self._compose_context(ContextType.TEXT, prompt, msg=TerminalMessage(msg_id, prompt)) context["isgroup"] = False if context: + # Attach an agent event renderer so reasoning / tool calls / + # streaming answer show up live in the terminal (web-like UX). + request_id = str(msg_id) + context["request_id"] = request_id + renderer = TerminalAgentRenderer() + self._renderers[request_id] = renderer + context["on_event"] = renderer.handle_event self.produce(context) else: raise Exception("context is None") + def _shutdown(self): + """Restore terminal state and terminate the whole process. + + startup() runs in a daemon sub-thread, so sys.exit() would only kill + this thread and leave the main process (and web/MCP/scheduler threads) + alive, holding the terminal in a half-occupied state -> laggy input. + We reset any leftover ANSI styling and hard-exit the process instead. + """ + # Restore TTY attributes and reset any leftover ANSI styling + # (e.g. interrupted mid-stream output) before terminating. + if self._restore_terminal: + self._restore_terminal() + elif _Style.enabled: + sys.stdout.write(_Style.RESET) + sys.stdout.write("\nExiting...\n") + sys.stdout.flush() + # Hard-exit the entire process from a daemon thread. + os._exit(0) + def get_input(self): """ Multi-line input function diff --git a/channel/web/web_channel.py b/channel/web/web_channel.py index 6cf6bc0b..120d8efd 100644 --- a/channel/web/web_channel.py +++ b/channel/web/web_channel.py @@ -1018,27 +1018,30 @@ class WebChannel(ChatChannel): self._cleanup_stale_voice_recordings() - # 打印可用渠道类型提示 + # Print available channel types logger.info( - "[WebChannel] 全部可用通道如下,可修改 config.json 配置文件中的 channel_type 字段进行切换,多个通道用逗号分隔:") - logger.info("[WebChannel] 1. weixin - 微信") - logger.info("[WebChannel] 2. web - 网页") - logger.info("[WebChannel] 3. terminal - 终端") - logger.info("[WebChannel] 4. feishu - 飞书") - logger.info("[WebChannel] 5. dingtalk - 钉钉") - logger.info("[WebChannel] 6. wecom_bot - 企微智能机器人") - logger.info("[WebChannel] 7. wechatcom_app - 企微自建应用") - logger.info("[WebChannel] 8. wechat_kf - 微信客服") - logger.info("[WebChannel] 9. wechatmp - 个人公众号") - logger.info("[WebChannel] 10. wechatmp_service - 企业公众号") - logger.info("[WebChannel] ✅ Web控制台已运行") - logger.info(f"[WebChannel] 🌐 本地访问: http://localhost:{port}") + "[WebChannel] Available channels (edit `channel_type` in config.json to switch, separate multiple with commas):") + logger.info("[WebChannel] 1. web - Web") + logger.info("[WebChannel] 2. terminal - Terminal") + logger.info("[WebChannel] 3. weixin - WeChat") + logger.info("[WebChannel] 4. feishu - Feishu") + logger.info("[WebChannel] 5. dingtalk - DingTalk") + logger.info("[WebChannel] 6. wecom_bot - WeCom Bot") + logger.info("[WebChannel] 7. wechatcom_app - WeCom App") + logger.info("[WebChannel] 8. wechat_kf - WeChat Customer Service") + logger.info("[WebChannel] 9. wechatmp - WeChat Official Account") + logger.info("[WebChannel] 10. wechatmp_service - WeChat Official Account (Service)") + logger.info("[WebChannel] 11. telegram - Telegram") + logger.info("[WebChannel] 12. slack - Slack") + logger.info("[WebChannel] 13. discord - Discord") + logger.info("[WebChannel] ✅ Web console is running") + logger.info(f"[WebChannel] 🌐 Local access: http://localhost:{port}") if is_public_bind: - logger.info(f"[WebChannel] 🌍 服务器访问: http://YOUR_IP:{port} (将YOUR_IP替换为服务器IP)") + logger.info(f"[WebChannel] 🌍 Server access: http://YOUR_IP:{port} (replace YOUR_IP with your server IP)") if not _is_password_enabled(): - logger.info("[WebChannel] ⚠️ 当前监听 0.0.0.0 且未设置 web_password,公网部署建议在 config.json 中配置访问密码") + logger.info("[WebChannel] ⚠️ Listening on 0.0.0.0 without web_password set; set an access password in config.json for public deployment") else: - logger.info(f"[WebChannel] 🔒 当前仅监听 {host},仅本机可访问。如需公网访问,请将 web_host 改为 0.0.0.0 并配置 web_password 密码") + logger.info(f"[WebChannel] 🔒 Listening on {host} only (local access). For public access, set web_host to 0.0.0.0 and configure web_password") try: import webbrowser diff --git a/cli/commands/process.py b/cli/commands/process.py index 0996ffca..2176fbf7 100644 --- a/cli/commands/process.py +++ b/cli/commands/process.py @@ -8,11 +8,28 @@ from typing import Optional import click -from cli.utils import get_project_root +from cli.utils import get_project_root, load_config_json _IS_WIN = sys.platform == "win32" +def _is_terminal_only() -> bool: + """Whether terminal is the only configured channel. + + Terminal needs an interactive stdin/tty, which is incompatible with the + background daemon mode (stdout/stdin detached). When terminal is the only + channel, `start` must run in the foreground so it can own the tty. + """ + channel = load_config_json().get("channel_type", "") + if isinstance(channel, str): + names = [c.strip() for c in channel.split(",") if c.strip()] + elif isinstance(channel, (list, tuple)): + names = [str(c).strip() for c in channel if str(c).strip()] + else: + names = [] + return names == ["terminal"] + + def _get_pid_file(): return os.path.join(get_project_root(), ".cow.pid") @@ -103,6 +120,12 @@ def start(foreground, no_logs): python = sys.executable + # Terminal-only setups need an interactive tty; force foreground so the + # terminal channel can read stdin instead of fighting the shell over the tty. + if not foreground and _is_terminal_only(): + foreground = True + click.echo("Detected terminal-only channel, starting in foreground...") + if foreground: click.echo("Starting CowAgent in foreground...") if _IS_WIN: