feat(terminal): add agent streaming UX with reasoning/tool-call rendering

This commit is contained in:
zhayujie
2026-05-30 19:10:56 +08:00
parent c5b8e06891
commit 29c4be6a3a
3 changed files with 315 additions and 28 deletions

View File

@@ -1,4 +1,7 @@
import json
import os
import sys
import time
from bridge.context import *
from bridge.reply import Reply, ReplyType
@@ -8,6 +11,164 @@ from common.log import logger
from config import conf
class _Style:
"""ANSI escape codes for terminal styling. Disabled when not a tty."""
enabled = sys.stdout.isatty()
RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"
ITALIC = "\033[3m"
GRAY = "\033[90m"
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
BLUE = "\033[34m"
MAGENTA = "\033[35m"
CYAN = "\033[36m"
@classmethod
def wrap(cls, text, *codes):
if not cls.enabled or not codes:
return text
return "".join(codes) + text + cls.RESET
class TerminalAgentRenderer:
"""Render agent stream events to the terminal in real time.
Reuses the same `on_event` mechanism as the web channel so the terminal
can show reasoning, tool calls and streaming answer text just like the web UI.
"""
def __init__(self):
self._reasoning_active = False
self._answer_active = False
self._has_output = False
# Track tool execution start time as a fallback when the event omits it
self._tool_started_at = {}
def _print(self, text, end="", flush=True):
sys.stdout.write(text)
if end:
sys.stdout.write(end)
if flush:
sys.stdout.flush()
self._has_output = True
def _close_section(self):
"""Finish the currently open streaming section (reasoning or answer)."""
if self._reasoning_active:
self._print("", end="\n")
self._reasoning_active = False
if self._answer_active:
self._print("", end="\n")
self._answer_active = False
def _format_arguments(self, arguments):
try:
if isinstance(arguments, (dict, list)):
text = json.dumps(arguments, ensure_ascii=False)
else:
text = str(arguments)
except Exception:
text = str(arguments)
# Keep tool input compact in the terminal
if len(text) > 300:
text = text[:300] + ""
return text
def handle_event(self, event: dict):
try:
self._handle_event(event)
except Exception as e:
logger.debug(f"[Terminal] render event error: {e}")
def _handle_event(self, event: dict):
event_type = event.get("type")
data = event.get("data", {}) or {}
if event_type == "agent_start":
self._print("\n" + _Style.wrap("Agent: ", _Style.BOLD, _Style.GREEN), end="\n")
elif event_type == "reasoning_update":
delta = data.get("delta", "")
if not delta:
return
if self._answer_active:
self._close_section()
if not self._reasoning_active:
self._print(_Style.wrap("💭 思考 ", _Style.DIM, _Style.MAGENTA), end="\n")
self._reasoning_active = True
self._print(_Style.wrap(delta, _Style.DIM, _Style.ITALIC))
elif event_type == "message_update":
delta = data.get("delta", "")
if not delta:
return
if self._reasoning_active:
self._close_section()
self._answer_active = True
self._print(delta)
elif event_type == "tool_execution_start":
self._close_section()
tool_name = data.get("tool_name", "tool")
tool_id = data.get("tool_call_id")
arguments = data.get("arguments", {})
self._tool_started_at[tool_id] = time.time()
header = _Style.wrap(f"🔧 {tool_name}", _Style.BOLD, _Style.CYAN)
args_str = self._format_arguments(arguments)
self._print(f"{header} {_Style.wrap(args_str, _Style.GRAY)}", end="\n")
elif event_type == "tool_execution_end":
tool_name = data.get("tool_name", "tool")
tool_id = data.get("tool_call_id")
status = data.get("status", "success")
result = data.get("result", "")
exec_time = data.get("execution_time")
if exec_time is None and tool_id in self._tool_started_at:
exec_time = time.time() - self._tool_started_at.pop(tool_id, time.time())
success = status == "success"
icon = "" if success else ""
color = _Style.GREEN if success else _Style.RED
result_str = str(result)
if len(result_str) > 500:
result_str = result_str[:500] + ""
# Indent multi-line tool output for readability
result_str = result_str.replace("\n", "\n ")
cost = f" ({exec_time:.2f}s)" if isinstance(exec_time, (int, float)) else ""
self._print(
_Style.wrap(f" {icon} {tool_name}{cost}", color) + " " + _Style.wrap(result_str, _Style.GRAY),
end="\n",
)
elif event_type == "file_to_send":
self._close_section()
file_path = data.get("path", "")
file_name = data.get("file_name", "")
label = file_name or file_path
self._print(_Style.wrap(f"📎 文件: {label}", _Style.BLUE), end="\n")
elif event_type == "error":
self._close_section()
err_msg = data.get("error") or "unknown error"
self._print(_Style.wrap(f"{err_msg}", _Style.BOLD, _Style.RED), end="\n")
elif event_type == "agent_cancelled":
self._close_section()
self._print(_Style.wrap("⏹ 已中止", _Style.YELLOW), end="\n")
elif event_type == "agent_end":
self._close_section()
def finish(self):
"""Ensure any open section is closed at the end of a turn."""
self._close_section()
class TerminalMessage(ChatMessage):
def __init__(
self,
@@ -29,17 +190,33 @@ class TerminalMessage(ChatMessage):
class TerminalChannel(ChatChannel):
NOT_SUPPORT_REPLYTYPE = [ReplyType.VOICE]
def __init__(self):
super().__init__()
# Per-request renderers keyed by request_id; used to detect whether
# agent text was already streamed so send() can avoid duplicate output.
self._renderers = {}
# Callback that restores TTY attributes on exit (set in startup).
self._restore_terminal = None
def send(self, reply: Reply, context: Context):
print("\nBot:")
request_id = context.get("request_id") if context else None
renderer = self._renderers.pop(request_id, None) if request_id else None
streamed = renderer is not None and renderer._has_output
if renderer is not None:
renderer.finish()
if reply.type == ReplyType.IMAGE:
from PIL import Image
image_storage = reply.content
image_storage.seek(0)
img = Image.open(image_storage)
if not streamed:
print("\nAgent: ")
print("<IMAGE>")
img.show()
elif reply.type == ReplyType.IMAGE_URL: # 从网络下载图片
elif reply.type == ReplyType.IMAGE_URL: # download image from url
import io
import requests
@@ -52,38 +229,122 @@ class TerminalChannel(ChatChannel):
image_storage.write(block)
image_storage.seek(0)
img = Image.open(image_storage)
if not streamed:
print("\nAgent: ")
print(img_url)
img.show()
else:
# When agent already streamed the answer, skip re-printing the
# final text to avoid duplication; just emit a trailing newline.
if streamed:
print()
else:
print("\nAgent: ")
print(reply.content)
print("\nUser:", end="")
print("\nUser: ", end="")
sys.stdout.flush()
return
def _silence_console_logging(self):
"""Mute console log output so background-thread logs (web/MCP/scheduler)
don't flood the interactive terminal. Logs still go to run.log in full.
Configurable via `terminal_log_level` (default ERROR). The file handler
is untouched, so run.log keeps the complete log.
"""
import logging
level_name = str(conf().get("terminal_log_level", "ERROR")).upper()
level = getattr(logging, level_name, logging.ERROR)
root_logger = logging.getLogger("log")
for handler in root_logger.handlers:
# Only raise the level of the stdout/stderr stream handler;
# keep FileHandler at the logger's level so run.log stays complete.
if isinstance(handler, logging.StreamHandler) and not isinstance(handler, logging.FileHandler):
handler.setLevel(level)
def _install_terminal_guard(self):
"""Save TTY attributes and register restore hooks so the terminal is
never left in a broken state (no echo / raw mode / leftover ANSI) after
the process exits, especially when Ctrl+C interrupts a blocking input().
"""
if not sys.stdin.isatty():
return
try:
import atexit
import termios
saved_attrs = termios.tcgetattr(sys.stdin.fileno())
def _restore():
try:
termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, saved_attrs)
except Exception:
pass
try:
if _Style.enabled:
sys.stdout.write(_Style.RESET)
sys.stdout.flush()
except Exception:
pass
self._restore_terminal = _restore
atexit.register(_restore)
except Exception as e:
# termios is unavailable on Windows; skip the guard there.
logger.debug(f"[Terminal] terminal guard not installed: {e}")
self._restore_terminal = None
def startup(self):
context = Context()
logger.setLevel("WARN")
print("\nPlease input your question:\nUser:", end="")
self._silence_console_logging()
self._install_terminal_guard()
print("\nPlease input your question:\nUser: ", end="")
sys.stdout.flush()
msg_id = 0
while True:
try:
prompt = self.get_input()
except KeyboardInterrupt:
print("\nExiting...")
sys.exit()
except (KeyboardInterrupt, EOFError):
self._shutdown()
msg_id += 1
trigger_prefixs = conf().get("single_chat_prefix", [""])
if check_prefix(prompt, trigger_prefixs) is None:
prompt = trigger_prefixs[0] + prompt # 给没触发的消息加上触发前缀
prompt = trigger_prefixs[0] + prompt # add trigger prefix to untriggered messages
context = self._compose_context(ContextType.TEXT, prompt, msg=TerminalMessage(msg_id, prompt))
context["isgroup"] = False
if context:
# Attach an agent event renderer so reasoning / tool calls /
# streaming answer show up live in the terminal (web-like UX).
request_id = str(msg_id)
context["request_id"] = request_id
renderer = TerminalAgentRenderer()
self._renderers[request_id] = renderer
context["on_event"] = renderer.handle_event
self.produce(context)
else:
raise Exception("context is None")
def _shutdown(self):
"""Restore terminal state and terminate the whole process.
startup() runs in a daemon sub-thread, so sys.exit() would only kill
this thread and leave the main process (and web/MCP/scheduler threads)
alive, holding the terminal in a half-occupied state -> laggy input.
We reset any leftover ANSI styling and hard-exit the process instead.
"""
# Restore TTY attributes and reset any leftover ANSI styling
# (e.g. interrupted mid-stream output) before terminating.
if self._restore_terminal:
self._restore_terminal()
elif _Style.enabled:
sys.stdout.write(_Style.RESET)
sys.stdout.write("\nExiting...\n")
sys.stdout.flush()
# Hard-exit the entire process from a daemon thread.
os._exit(0)
def get_input(self):
"""
Multi-line input function

View File

@@ -1018,27 +1018,30 @@ class WebChannel(ChatChannel):
self._cleanup_stale_voice_recordings()
# 打印可用渠道类型提示
# Print available channel types
logger.info(
"[WebChannel] 全部可用通道如下,可修改 config.json 配置文件中的 channel_type 字段进行切换,多个通道用逗号分隔:")
logger.info("[WebChannel] 1. weixin - 微信")
logger.info("[WebChannel] 2. web - 网页")
logger.info("[WebChannel] 3. terminal - 终端")
logger.info("[WebChannel] 4. feishu - 飞书")
logger.info("[WebChannel] 5. dingtalk - 钉钉")
logger.info("[WebChannel] 6. wecom_bot - 企微智能机器人")
logger.info("[WebChannel] 7. wechatcom_app - 企微自建应用")
logger.info("[WebChannel] 8. wechat_kf - 微信客服")
logger.info("[WebChannel] 9. wechatmp - 个人公众号")
logger.info("[WebChannel] 10. wechatmp_service - 企业公众号")
logger.info("[WebChannel] ✅ Web控制台已运行")
logger.info(f"[WebChannel] 🌐 本地访问: http://localhost:{port}")
"[WebChannel] Available channels (edit `channel_type` in config.json to switch, separate multiple with commas):")
logger.info("[WebChannel] 1. web - Web")
logger.info("[WebChannel] 2. terminal - Terminal")
logger.info("[WebChannel] 3. weixin - WeChat")
logger.info("[WebChannel] 4. feishu - Feishu")
logger.info("[WebChannel] 5. dingtalk - DingTalk")
logger.info("[WebChannel] 6. wecom_bot - WeCom Bot")
logger.info("[WebChannel] 7. wechatcom_app - WeCom App")
logger.info("[WebChannel] 8. wechat_kf - WeChat Customer Service")
logger.info("[WebChannel] 9. wechatmp - WeChat Official Account")
logger.info("[WebChannel] 10. wechatmp_service - WeChat Official Account (Service)")
logger.info("[WebChannel] 11. telegram - Telegram")
logger.info("[WebChannel] 12. slack - Slack")
logger.info("[WebChannel] 13. discord - Discord")
logger.info("[WebChannel] ✅ Web console is running")
logger.info(f"[WebChannel] 🌐 Local access: http://localhost:{port}")
if is_public_bind:
logger.info(f"[WebChannel] 🌍 服务器访问: http://YOUR_IP:{port} (将YOUR_IP替换为服务器IP)")
logger.info(f"[WebChannel] 🌍 Server access: http://YOUR_IP:{port} (replace YOUR_IP with your server IP)")
if not _is_password_enabled():
logger.info("[WebChannel] ⚠️ 当前监听 0.0.0.0 且未设置 web_password,公网部署建议在 config.json 中配置访问密码")
logger.info("[WebChannel] ⚠️ Listening on 0.0.0.0 without web_password set; set an access password in config.json for public deployment")
else:
logger.info(f"[WebChannel] 🔒 当前仅监听 {host},仅本机可访问。如需公网访问,请将 web_host 改为 0.0.0.0 并配置 web_password 密码")
logger.info(f"[WebChannel] 🔒 Listening on {host} only (local access). For public access, set web_host to 0.0.0.0 and configure web_password")
try:
import webbrowser

View File

@@ -8,11 +8,28 @@ from typing import Optional
import click
from cli.utils import get_project_root
from cli.utils import get_project_root, load_config_json
_IS_WIN = sys.platform == "win32"
def _is_terminal_only() -> bool:
"""Whether terminal is the only configured channel.
Terminal needs an interactive stdin/tty, which is incompatible with the
background daemon mode (stdout/stdin detached). When terminal is the only
channel, `start` must run in the foreground so it can own the tty.
"""
channel = load_config_json().get("channel_type", "")
if isinstance(channel, str):
names = [c.strip() for c in channel.split(",") if c.strip()]
elif isinstance(channel, (list, tuple)):
names = [str(c).strip() for c in channel if str(c).strip()]
else:
names = []
return names == ["terminal"]
def _get_pid_file():
return os.path.join(get_project_root(), ".cow.pid")
@@ -103,6 +120,12 @@ def start(foreground, no_logs):
python = sys.executable
# Terminal-only setups need an interactive tty; force foreground so the
# terminal channel can read stdin instead of fighting the shell over the tty.
if not foreground and _is_terminal_only():
foreground = True
click.echo("Detected terminal-only channel, starting in foreground...")
if foreground:
click.echo("Starting CowAgent in foreground...")
if _IS_WIN: