From 8d67177a1b1698e2b1f1a105919634eb2b0fa629 Mon Sep 17 00:00:00 2001 From: zhayujie Date: Tue, 26 May 2026 23:36:09 +0800 Subject: [PATCH] feat(agent): support user-initiated cancel for in-flight agent runs --- agent/protocol/__init__.py | 12 ++- agent/protocol/agent.py | 12 ++- agent/protocol/agent_stream.py | 128 ++++++++++++++++++++++++- agent/protocol/cancel.py | 121 +++++++++++++++++++++++ bridge/agent_bridge.py | 31 +++++- channel/chat_channel.py | 36 +++++++ channel/feishu/feishu_channel.py | 33 ++++++- channel/web/chat.html | 2 +- channel/web/static/css/console.css | 32 +++++++ channel/web/static/js/console.js | 117 ++++++++++++++++++++-- channel/web/web_channel.py | 105 ++++++++++++++++++++ channel/wecom_bot/wecom_bot_channel.py | 11 +++ docs/cli/general.mdx | 8 ++ docs/cli/index.mdx | 2 + docs/en/cli/general.mdx | 8 ++ docs/en/cli/index.mdx | 2 + docs/ja/cli/general.mdx | 8 ++ docs/ja/cli/index.mdx | 2 + plugins/cow_cli/cow_cli.py | 43 ++++++++- 19 files changed, 691 insertions(+), 22 deletions(-) create mode 100644 agent/protocol/cancel.py diff --git a/agent/protocol/__init__.py b/agent/protocol/__init__.py index a9fe5a3e..f0a7a4e2 100644 --- a/agent/protocol/__init__.py +++ b/agent/protocol/__init__.py @@ -3,6 +3,11 @@ from .agent_stream import AgentStreamExecutor from .task import Task, TaskType, TaskStatus from .result import AgentResult, AgentAction, AgentActionType, ToolResult from .models import LLMModel, LLMRequest, ModelFactory +from .cancel import ( + AgentCancelledError, + CancelTokenRegistry, + get_cancel_registry, +) __all__ = [ 'Agent', @@ -16,5 +21,8 @@ __all__ = [ 'ToolResult', 'LLMModel', 'LLMRequest', - 'ModelFactory' -] \ No newline at end of file + 'ModelFactory', + 'AgentCancelledError', + 'CancelTokenRegistry', + 'get_cancel_registry', +] diff --git a/agent/protocol/agent.py b/agent/protocol/agent.py index 285a9732..d944660b 100644 --- a/agent/protocol/agent.py +++ b/agent/protocol/agent.py @@ -365,7 +365,8 @@ class Agent: return 
action - def run_stream(self, user_message: str, on_event=None, clear_history: bool = False, skill_filter=None) -> str: + def run_stream(self, user_message: str, on_event=None, clear_history: bool = False, + skill_filter=None, cancel_event=None) -> str: """ Execute single agent task with streaming (based on tool-call) @@ -374,6 +375,7 @@ class Agent: - Multi-turn reasoning based on tool-call - Event callbacks - Persistent conversation history across calls + - User-initiated cancellation via ``cancel_event`` Args: user_message: User message @@ -381,6 +383,11 @@ class Agent: event = {"type": str, "timestamp": float, "data": dict} clear_history: If True, clear conversation history before this call (default: False) skill_filter: Optional list of skill names to include in this run + cancel_event: Optional threading.Event polled at agent checkpoints. + When set, the loop exits at the next safe point, injects a + "_(Cancelled by user)_" assistant note, and returns the + partial response. ``messages`` stays in a valid state + (tool_use/tool_result pairs preserved). 
Returns: Final response text @@ -424,7 +431,8 @@ class Agent: max_turns=self.max_steps, on_event=on_event, messages=messages_copy, # Pass copied message history - max_context_turns=max_context_turns + max_context_turns=max_context_turns, + cancel_event=cancel_event, ) # Execute diff --git a/agent/protocol/agent_stream.py b/agent/protocol/agent_stream.py index ef4f975b..701b00de 100644 --- a/agent/protocol/agent_stream.py +++ b/agent/protocol/agent_stream.py @@ -7,6 +7,7 @@ import json import time from typing import List, Dict, Any, Optional, Callable, Tuple +from agent.protocol.cancel import AgentCancelledError from agent.protocol.models import LLMRequest, LLMModel from agent.protocol.message_utils import sanitize_claude_messages, compress_turn_to_text_only from agent.tools.base_tool import BaseTool, ToolResult @@ -64,7 +65,8 @@ class AgentStreamExecutor: max_turns: int = 50, on_event: Optional[Callable] = None, messages: Optional[List[Dict]] = None, - max_context_turns: int = 30 + max_context_turns: int = 30, + cancel_event=None, ): """ Initialize stream executor @@ -78,6 +80,10 @@ class AgentStreamExecutor: on_event: Event callback function messages: Optional existing message history (for persistent conversations) max_context_turns: Maximum number of conversation turns to keep in context + cancel_event: Optional threading.Event used to signal user cancel. + Checked at every safe point (turn boundary, before tool execution, + during LLM streaming). When set, raises AgentCancelledError which + run_stream catches to gracefully wind down. 
""" self.agent = agent self.model = model @@ -87,6 +93,7 @@ class AgentStreamExecutor: self.max_turns = max_turns self.on_event = on_event self.max_context_turns = max_context_turns + self.cancel_event = cancel_event # Message history - use provided messages or create new list self.messages = messages if messages is not None else [] @@ -97,6 +104,73 @@ class AgentStreamExecutor: # Track files to send (populated by read tool) self.files_to_send = [] # List of file metadata dicts + def _check_cancelled(self) -> None: + """Raise AgentCancelledError if the user requested cancellation. + + Called at safe points (turn start, between tool calls, between LLM + chunks). Cheap to call: just an Event.is_set() probe. + """ + if self.cancel_event is not None and self.cancel_event.is_set(): + raise AgentCancelledError("agent cancelled by user") + + def _handle_cancelled(self, partial_response: str) -> None: + """Wind down ``self.messages`` after a user-initiated cancel. + + The messages list may be in any of these states when we get here: + (a) Last message is an assistant message containing tool_use + blocks but the matching tool_result has not been appended yet. + (b) Last message is an assistant text-only reply (cancel happened + right before the next turn started). + (c) Last message is a user tool_result message and we cancelled + between turns. + + For (a) we MUST synthesise tool_result blocks, otherwise the next + request will fail Claude/OpenAI's strict pairing validation. For + (b)/(c) the state is already valid and we just append a small + cancellation note so the user/LLM both see the boundary clearly. + """ + try: + # Step 1: close any orphaned tool_use in the trailing assistant + # message by injecting matching tool_result blocks. 
+ if self.messages and isinstance(self.messages[-1], dict) \ + and self.messages[-1].get("role") == "assistant": + last = self.messages[-1] + content = last.get("content") + if isinstance(content, list): + pending_tool_use_ids = [ + block.get("id") + for block in content + if isinstance(block, dict) and block.get("type") == "tool_use" + ] + pending_tool_use_ids = [tid for tid in pending_tool_use_ids if tid] + if pending_tool_use_ids: + tool_result_blocks = [ + { + "type": "tool_result", + "tool_use_id": tid, + "content": "Cancelled by user before this tool finished.", + "is_error": True, + } + for tid in pending_tool_use_ids + ] + self.messages.append({ + "role": "user", + "content": tool_result_blocks, + }) + logger.info( + f"[Agent] Injected {len(tool_result_blocks)} cancellation " + f"tool_result blocks to keep message history valid" + ) + + # Step 2: append a stable "interrupted" marker so the LLM sees a + # clear stop boundary on the next turn. + self.messages.append({ + "role": "assistant", + "content": [{"type": "text", "text": "_(Cancelled by user)_"}], + }) + except Exception as e: + logger.warning(f"[Agent] _handle_cancelled cleanup failed: {e}") + def _emit_event(self, event_type: str, data: dict = None): """Emit event""" if self.on_event: @@ -270,8 +344,13 @@ class AgentStreamExecutor: final_response = "" turn = 0 + cancelled = False try: while turn < self.max_turns: + # Check at the very top of every turn so a cancel arriving + # between turns short-circuits cleanly. 
+ self._check_cancelled() + turn += 1 logger.info(f"[Agent] 第 {turn} 轮") self._emit_event("turn_start", {"turn": turn}) @@ -375,6 +454,8 @@ class AgentStreamExecutor: try: for tool_call in tool_calls: + # Honour cancel between tool invocations within the same turn + self._check_cancelled() result = self._execute_tool(tool_call) tool_results.append(result) @@ -557,6 +638,15 @@ class AgentStreamExecutor: self.messages.pop(prompt_insert_idx) logger.debug("[Agent] Removed injected max-steps prompt from message history") + except AgentCancelledError: + # User-initiated stop: wind down message history cleanly so the + # next turn is unaffected; channels emit a "cancelled" UI event. + cancelled = True + logger.info(f"[Agent] 🛑 已被用户中止 (第 {turn} 轮)") + self._handle_cancelled(final_response) + if not final_response or not final_response.strip(): + final_response = "_(Cancelled)_" + except Exception as e: logger.error(f"❌ Agent执行错误: {e}") self._emit_event("error", {"error": str(e)}) @@ -564,8 +654,11 @@ class AgentStreamExecutor: finally: final_response = final_response.strip() if final_response else final_response - logger.info(f"[Agent] 🏁 完成 ({turn}轮)") - self._emit_event("agent_end", {"final_response": final_response}) + if cancelled: + # Emit before agent_end so channels can mark UI as cancelled + self._emit_event("agent_cancelled", {"final_response": final_response}) + logger.info(f"[Agent] 🏁 完成 ({turn}轮)" + (" [cancelled]" if cancelled else "")) + self._emit_event("agent_end", {"final_response": final_response, "cancelled": cancelled}) return final_response @@ -644,7 +737,32 @@ class AgentStreamExecutor: try: stream = self.model.call_stream(request) + # Probe cancel every N chunks to bound reaction time without + # checking on every token. 
+ _cancel_probe_counter = 0 + _CANCEL_PROBE_EVERY = 8 + for chunk in stream: + _cancel_probe_counter += 1 + if _cancel_probe_counter >= _CANCEL_PROBE_EVERY: + _cancel_probe_counter = 0 + if self.cancel_event is not None and self.cancel_event.is_set(): + # Persist partial text only; tool_use args may be + # truncated mid-stream and would fail validation. + logger.info("[Agent] cancel detected mid-stream, aborting LLM call") + if full_content: + partial_msg = { + "role": "assistant", + "content": [{"type": "text", "text": full_content}], + } + self.messages.append(partial_msg) + self._emit_event("message_end", { + "content": full_content, + "tool_calls": [], + "cancelled": True, + }) + raise AgentCancelledError("cancelled during LLM streaming") + # Check for errors if isinstance(chunk, dict) and chunk.get("error"): # Extract error message from nested structure @@ -738,6 +856,10 @@ class AgentStreamExecutor: elif isinstance(choice, dict) and choice.get("_gemini_raw_parts"): gemini_raw_parts = choice["_gemini_raw_parts"] + except AgentCancelledError: + # Must propagate untouched; never treat as a retryable error. + raise + except Exception as e: error_str = str(e) error_str_lower = error_str.lower() diff --git a/agent/protocol/cancel.py b/agent/protocol/cancel.py new file mode 100644 index 00000000..6354cd38 --- /dev/null +++ b/agent/protocol/cancel.py @@ -0,0 +1,121 @@ +""" +Cancel token registry for aborting in-flight agent runs. + +A user cancel (web Cancel button, /cancel command) sets a threading.Event +that the agent loop polls at safe checkpoints. Tokens are keyed by +request_id (preferred) and tracked under session_id as a fallback. Entries +are released after the run completes to keep the registry bounded. + +No project deps — importable from any layer without circular imports. 
+""" + +from __future__ import annotations + +import threading +from typing import Dict, Optional + + +class AgentCancelledError(Exception): + """Raised inside the agent loop when a stop has been requested. + + The agent stream executor catches this, injects a "_(Cancelled by user)_" note + into the message history (preserving tool_use/tool_result integrity) + and returns a partial response to the caller. + """ + + +class _CancelEntry: + __slots__ = ("event", "session_id") + + def __init__(self, session_id: Optional[str]): + self.event = threading.Event() + self.session_id = session_id + + +class CancelTokenRegistry: + """In-process registry mapping request_id -> cancel Event. + + Thread-safe. Singleton via module-level ``_registry``. + """ + + def __init__(self): + self._lock = threading.Lock() + self._by_request: Dict[str, _CancelEntry] = {} + # session_id -> set of request_ids currently in flight (usually 1). + self._by_session: Dict[str, set] = {} + + def register(self, request_id: str, session_id: Optional[str] = None) -> threading.Event: + """Create (or return existing) cancel event for a request. + + Returns the threading.Event the caller should poll via ``is_set()``. + """ + if not request_id: + return threading.Event() + with self._lock: + entry = self._by_request.get(request_id) + if entry is None: + entry = _CancelEntry(session_id) + self._by_request[request_id] = entry + if session_id: + self._by_session.setdefault(session_id, set()).add(request_id) + return entry.event + + def get_event(self, request_id: str) -> Optional[threading.Event]: + if not request_id: + return None + with self._lock: + entry = self._by_request.get(request_id) + return entry.event if entry else None + + def cancel_request(self, request_id: str) -> bool: + """Trigger cancel for a specific request. 
Returns True when matched.""" + if not request_id: + return False + with self._lock: + entry = self._by_request.get(request_id) + if entry is None: + return False + entry.event.set() + return True + + def cancel_session(self, session_id: str) -> int: + """Trigger cancel for every in-flight request of a session. + + Returns the number of requests cancelled (0 when nothing was running). + """ + if not session_id: + return 0 + with self._lock: + request_ids = list(self._by_session.get(session_id, ())) + entries = [self._by_request[r] for r in request_ids if r in self._by_request] + for entry in entries: + entry.event.set() + return len(entries) + + def unregister(self, request_id: str) -> None: + """Remove an entry once the agent run is done. Safe to call twice.""" + if not request_id: + return + with self._lock: + entry = self._by_request.pop(request_id, None) + if entry and entry.session_id: + bucket = self._by_session.get(entry.session_id) + if bucket is not None: + bucket.discard(request_id) + if not bucket: + self._by_session.pop(entry.session_id, None) + + def has_active(self, session_id: str) -> bool: + if not session_id: + return False + with self._lock: + bucket = self._by_session.get(session_id) + return bool(bucket) + + +_registry = CancelTokenRegistry() + + +def get_cancel_registry() -> CancelTokenRegistry: + """Module-level accessor for the singleton registry.""" + return _registry diff --git a/bridge/agent_bridge.py b/bridge/agent_bridge.py index a20659a8..a924dab2 100644 --- a/bridge/agent_bridge.py +++ b/bridge/agent_bridge.py @@ -5,7 +5,7 @@ Agent Bridge - Integrates Agent system with existing COW bridge import os from typing import Optional, List -from agent.protocol import Agent, LLMModel, LLMRequest +from agent.protocol import Agent, LLMModel, LLMRequest, get_cancel_registry from bridge.agent_event_handler import AgentEventHandler from bridge.agent_initializer import AgentInitializer from bridge.bridge import Bridge @@ -399,11 +399,22 @@ class 
AgentBridge: """ session_id = None agent = None + request_id = None + cancel_event = None try: # Extract session_id from context for user isolation if context: session_id = context.kwargs.get("session_id") or context.get("session_id") - + request_id = context.kwargs.get("request_id") or context.get("request_id") + + # Register a cancel token. Prefer per-turn request_id (web), + # fall back to session_id (IM channels). The Event is polled by + # AgentStreamExecutor at safe checkpoints. + registry = get_cancel_registry() + token_key = request_id or session_id + if token_key: + cancel_event = registry.register(token_key, session_id=session_id) + # Get agent for this session (will auto-initialize if needed) agent = self.get_agent(session_id=session_id) if not agent: @@ -458,7 +469,8 @@ class AgentBridge: response = agent.run_stream( user_message=query, on_event=event_handler.handle_event, - clear_history=clear_history + clear_history=clear_history, + cancel_event=cancel_event, ) finally: # Restore original tools @@ -468,6 +480,13 @@ class AgentBridge: # Log execution summary event_handler.log_summary() + # Release cancel token; keep registry bounded. + if token_key: + try: + registry.unregister(token_key) + except Exception: + pass + # Persist new messages generated during this run if session_id: channel_type = (context.get("channel_type") or "") if context else "" @@ -521,6 +540,12 @@ class AgentBridge: logger.info(f"[AgentBridge] Cleared DB for session after error: {session_id}") except Exception as db_err: logger.warning(f"[AgentBridge] Failed to clear DB after error: {db_err}") + # Release cancel token on error path too (idempotent). 
+ if cancel_event is not None and (request_id or session_id): + try: + get_cancel_registry().unregister(request_id or session_id) + except Exception: + pass return Reply(ReplyType.ERROR, f"Agent error: {str(e)}") def _schedule_mcp_hot_reload(self, agent): diff --git a/channel/chat_channel.py b/channel/chat_channel.py index c38dd7c8..6a9a1952 100644 --- a/channel/chat_channel.py +++ b/channel/chat_channel.py @@ -438,8 +438,21 @@ class ChatChannel(Channel): return func + # Chat commands that must bypass the per-session serial queue, + # otherwise /cancel would queue behind the task it tries to cancel. + # Use /cancel (not /stop) to avoid colliding with `cow stop` CLI. + _BYPASS_QUEUE_COMMANDS = ("/cancel",) + def produce(self, context: Context): session_id = context["session_id"] + + # Fast path: /cancel must not enter the queue. + if context.type == ContextType.TEXT and context.content: + stripped = context.content.strip().lower() + if stripped in self._BYPASS_QUEUE_COMMANDS: + self._handle_cancel_command(context, session_id) + return + with self.lock: if session_id not in self.sessions: self.sessions[session_id] = [ @@ -451,6 +464,29 @@ class ChatChannel(Channel): else: self.sessions[session_id][0].put(context) + def _handle_cancel_command(self, context: Context, session_id: str) -> None: + """Cancel any in-flight agent run for *session_id* and reply inline. + + Runs synchronously on the caller's thread. Reply is sent through + _send_reply so plugins (e.g. logging) still observe it. 
+ """ + try: + from agent.protocol import get_cancel_registry + from bridge.reply import Reply, ReplyType + + cancelled = get_cancel_registry().cancel_session(session_id) + text = ( + "🛑 已中止" + if cancelled > 0 + else "当前没有可中止的任务。" + ) + logger.info( + f"[chat_channel] /cancel fast-path: session={session_id}, cancelled={cancelled}" + ) + self._send_reply(context, Reply(ReplyType.TEXT, text)) + except Exception as e: + logger.warning(f"[chat_channel] /cancel fast-path failed: {e}") + # 消费者函数,单独线程,用于从消息队列中取出消息并处理 def consume(self): while True: diff --git a/channel/feishu/feishu_channel.py b/channel/feishu/feishu_channel.py index ca18e64b..9a9f3307 100644 --- a/channel/feishu/feishu_channel.py +++ b/channel/feishu/feishu_channel.py @@ -752,6 +752,9 @@ class FeiShuChanel(ChatChannel): init_in_flight = [False] # 一旦初始化失败就长期标记为 disabled,本次回复不再尝试任何流式调用 disabled = [False] + # True after agent_cancelled: agent_end stops rewriting the card + # with stale final_response and just finalizes current content. + cancelled = [False] lock = threading.Lock() # ---- 异步推送队列 ---------------------------------------------------- @@ -1076,18 +1079,42 @@ class FeiShuChanel(ChatChannel): message_id[0] = None sequence[0] = 0 + elif event_type == "agent_cancelled": + # Lock channel into "no-rewrite" mode: the subsequent + # agent_end's final_response is from the last *completed* + # turn (the user already saw it), so rewriting the card + # would duplicate it visually. 
+ with lock: + cancelled[0] = True + elif event_type == "agent_end": # 最终回复:用 final_response 覆盖当前流式卡片,然后关闭流式模式。 final_response = data.get("final_response", "") - if not final_response: - return - final_text = str(final_response) # 标记 streamed 让 chat_channel 跳过 send() context["feishu_streamed"] = True with lock: + was_cancelled = cancelled[0] has_card = card_id[0] is not None init_busy = init_in_flight[0] + pending_text = current_text[0] + + if was_cancelled: + # Cancelled path: finalize the in-flight card with + # partial output (or a short marker if empty); drop + # stale final_response to avoid duplicating last turn. + if has_card: + _drain_push_queue() + partial = (pending_text or "").rstrip() + final_text = partial or "_(已中止)_" + _stream_update_text(final_text) + _close_streaming_mode(final_text) + push_queue.put(None) + return + + if not final_response: + return + final_text = str(final_response) # 罕见情况:agent_end 触发时还没创建过卡片(极快返回 / 没有 # message_update),主动创建一张承载 final_text。 diff --git a/channel/web/chat.html b/channel/web/chat.html index 947e07b7..d90adb15 100644 --- a/channel/web/chat.html +++ b/channel/web/chat.html @@ -445,7 +445,7 @@ bg-primary-400 text-white hover:bg-primary-500 disabled:bg-slate-300 dark:disabled:bg-slate-600 disabled:cursor-not-allowed cursor-pointer transition-colors duration-150" - disabled onclick="sendMessage()"> + disabled> diff --git a/channel/web/static/css/console.css b/channel/web/static/css/console.css index 35fc307b..957db3e0 100644 --- a/channel/web/static/css/console.css +++ b/channel/web/static/css/console.css @@ -1367,3 +1367,35 @@ text-align: right; } .voice-pill audio { display: none; } + +/* Send button toggles into a Stop button while an SSE stream is in flight. + Match the look of the disabled send button (light grey block + white + glyph) so it reads as the same visual element, just paused/idle from + sending perspective and clickable to stop. 
*/ +#send-btn.send-btn-cancel { + background-color: rgb(203 213 225) !important; /* slate-300, == disabled send-btn */ + color: white !important; +} +#send-btn.send-btn-cancel:hover { + background-color: rgb(148 163 184) !important; /* slate-400 */ +} +#send-btn.send-btn-cancel:disabled { + background-color: rgb(226 232 240) !important; /* slate-200, while stop is in flight */ + color: white !important; + cursor: progress; +} +.dark #send-btn.send-btn-cancel { + background-color: rgb(71 85 105) !important; /* slate-600, == dark disabled send-btn */ + color: white !important; +} +.dark #send-btn.send-btn-cancel:hover { + background-color: rgb(100 116 139) !important; /* slate-500 */ +} +.dark #send-btn.send-btn-cancel:disabled { + background-color: rgb(51 65 85) !important; /* slate-700 */ + color: rgb(203 213 225) !important; +} + +.agent-cancelled-tag { + font-style: italic; +} diff --git a/channel/web/static/js/console.js b/channel/web/static/js/console.js index 7376e29c..6d0a66fc 100644 --- a/channel/web/static/js/console.js +++ b/channel/web/static/js/console.js @@ -1016,7 +1016,60 @@ const inputHistory = []; let historyIdx = -1; let historySavedDraft = ''; +// While an SSE stream is in flight, the send button morphs into a cancel +// button. Only one in-flight request is supported at a time. +let activeRequestId = null; +let sendBtnMode = 'send'; // 'send' | 'cancel' + +function setSendBtnCancelMode(requestId) { + activeRequestId = requestId; + sendBtnMode = 'cancel'; + sendBtn.disabled = false; + sendBtn.classList.add('send-btn-cancel'); + sendBtn.title = (currentLang === 'zh' ? 
'中止' : 'Cancel'); + sendBtn.innerHTML = ''; +} + +function resetSendBtnSendMode() { + activeRequestId = null; + sendBtnMode = 'send'; + sendBtn.classList.remove('send-btn-cancel'); + sendBtn.title = ''; + sendBtn.innerHTML = ''; + updateSendBtnState(); +} + +function requestCancel() { + const reqId = activeRequestId; + if (!reqId) return; + fetch('/cancel', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ request_id: reqId, session_id: sessionId, lang: currentLang }), + }).catch(err => { + console.warn('[cancel] request failed', err); + }); + // Optimistic UI lock so the click visibly registers before the SSE + // "cancelled" event arrives. + sendBtn.disabled = true; + sendBtn.title = (currentLang === 'zh' ? '已中止' : 'Cancelled'); +} + +// Button click is the only path to Cancel. Pressing Enter still calls +// sendMessage() so users can submit "/cancel" as a regular slash command. +sendBtn.addEventListener('click', () => { + if (sendBtnMode === 'cancel') { + requestCancel(); + } else { + sendMessage(); + } +}); + function updateSendBtnState() { + if (sendBtnMode === 'cancel') { + // Don't downgrade a Cancel button on input edits. 
+ return; + } sendBtn.disabled = uploadingCount > 0 || (!chatInput.value.trim() && pendingAttachments.length === 0); } @@ -1264,6 +1317,7 @@ const SLASH_COMMANDS = [ { cmd: '/knowledge on', desc: '开启知识库' }, { cmd: '/knowledge off', desc: '关闭知识库' }, { cmd: '/config', desc: '查看当前配置' }, + { cmd: '/cancel', desc: '中止当前正在运行的 Agent 任务' }, { cmd: '/logs', desc: '查看最近日志' }, { cmd: '/version', desc: '查看版本' }, ]; @@ -1534,6 +1588,7 @@ function sendVoiceMessage(text, audioUrl) { stream: true, timestamp: timestamp.toISOString(), is_voice: true, + lang: currentLang, }; const MAX_RETRIES = 2; @@ -1547,7 +1602,12 @@ function sendVoiceMessage(text, audioUrl) { .then(r => r.json()) .then(data => { if (data.status === 'success') { - if (data.stream) { + if (data.inline_reply) { + // Synchronous fast-path reply (e.g. /cancel); skip SSE. + loadingEl.remove(); + addBotMessage(data.inline_reply, new Date()); + } else if (data.stream) { + setSendBtnCancelMode(data.request_id); startSSE(data.request_id, loadingEl, timestamp, titleInfo); } else { loadingContainers[data.request_id] = loadingEl; @@ -1555,6 +1615,7 @@ function sendVoiceMessage(text, audioUrl) { } else { loadingEl.remove(); addBotMessage(t('error_send'), new Date()); + resetSendBtnSendMode(); } }) .catch(err => { @@ -1591,6 +1652,10 @@ function addUserVoiceMessage(audioUrl, caption, timestamp) { } function sendMessage() { + // Do NOT branch on sendBtnMode here: Enter should always send (so + // typing "/cancel" submits normally). Cancel is wired only to the + // send button's pointer click — see send-btn listener above. 
+ const text = chatInput.value.trim(); if (!text && pendingAttachments.length === 0) return; @@ -1619,7 +1684,7 @@ function sendMessage() { renderAttachmentPreview(); sendBtn.disabled = true; - const body = { session_id: sessionId, message: text, stream: true, timestamp: timestamp.toISOString() }; + const body = { session_id: sessionId, message: text, stream: true, timestamp: timestamp.toISOString(), lang: currentLang }; if (attachments.length > 0) { body.attachments = attachments.map(a => ({ file_path: a.file_path, @@ -1641,7 +1706,13 @@ function sendMessage() { .then(r => r.json()) .then(data => { if (data.status === 'success') { - if (data.stream) { + if (data.inline_reply) { + // Channel handled synchronously (e.g. /cancel fast-path); + // render as a bot bubble and skip SSE entirely. + loadingEl.remove(); + addBotMessage(data.inline_reply, new Date()); + } else if (data.stream) { + setSendBtnCancelMode(data.request_id); startSSE(data.request_id, loadingEl, timestamp, titleInfo); } else { loadingContainers[data.request_id] = loadingEl; @@ -1649,12 +1720,14 @@ function sendMessage() { } else { loadingEl.remove(); addBotMessage(t('error_send'), new Date()); + resetSendBtnSendMode(); } }) .catch(err => { if (err.name === 'AbortError') { loadingEl.remove(); addBotMessage(t('error_timeout'), new Date()); + resetSendBtnSendMode(); return; } if (attempt < MAX_RETRIES) { @@ -1664,6 +1737,7 @@ function sendMessage() { } loadingEl.remove(); addBotMessage(t('error_send'), new Date()); + resetSendBtnSendMode(); }); } @@ -1919,14 +1993,33 @@ function startSSE(requestId, loadingEl, timestamp, titleInfo) { stepsEl.appendChild(wrap); scrollChatToBottom(); + } else if (item.type === 'cancelled') { + // Agent acknowledged the stop; mark the bubble. A trailing + // "done" still arrives with the partial answer. 
+ ensureBotEl(); + if (currentReasoningEl) { + finalizeThinking(currentReasoningEl, reasoningStartTime, reasoningText); + currentReasoningEl = null; + reasoningText = ''; + } + if (!botEl.querySelector('.agent-cancelled-tag')) { + const tag = document.createElement('div'); + tag.className = 'agent-cancelled-tag text-xs text-amber-600 dark:text-amber-400 mt-1'; + tag.textContent = (currentLang === 'zh') ? '已中止' : 'Cancelled'; + stepsEl.appendChild(tag); + } + resetSendBtnSendMode(); + } else if (item.type === 'done') { // Don't close the stream yet: the backend keeps it open // for a short tail to deliver async attachments such as // TTS audio (`voice_attach`). It will close the stream on // its own via onerror once the tail expires. done = true; + resetSendBtnSendMode(); - const finalText = item.content || accumulatedText; + const finalTextRaw = item.content || accumulatedText; + const finalText = localizeCancelMarker(finalTextRaw); if (!botEl && finalText) { if (loadingEl) { loadingEl.remove(); loadingEl = null; } @@ -1934,7 +2027,7 @@ function startSSE(requestId, loadingEl, timestamp, titleInfo) { } else if (botEl) { contentEl.classList.remove('sse-streaming'); if (finalText) contentEl.innerHTML = renderMarkdown(finalText); - contentEl.dataset.rawMd = finalText || ''; + contentEl.dataset.rawMd = finalTextRaw || ''; const copyBtn = botEl.querySelector('.copy-msg-btn'); if (copyBtn && finalText) copyBtn.style.display = ''; applyHighlighting(botEl); @@ -1964,6 +2057,7 @@ function startSSE(requestId, loadingEl, timestamp, titleInfo) { delete activeStreams[requestId]; if (loadingEl) { loadingEl.remove(); loadingEl = null; } addBotMessage(t('error_send'), new Date()); + resetSendBtnSendMode(); } }; @@ -2000,6 +2094,7 @@ function startSSE(requestId, loadingEl, timestamp, titleInfo) { applyHighlighting(botEl); bindChatKnowledgeLinks(botEl); } + resetSendBtnSendMode(); }; } @@ -2238,13 +2333,23 @@ function _renderSentFileFromToolResult(step) { ` ${escapeHtml(fileName)}`; 
} +// Cosmetic translator for cancel markers persisted in history. +// History keeps the English canonical form for the LLM; only display is localized. +function localizeCancelMarker(text) { + if (!text) return text; + if (currentLang !== 'zh') return text; + return text + .replace(/_\(Cancelled by user\)_/g, '_(用户已中止)_') + .replace(/_\(Cancelled\)_/g, '_(已中止)_'); +} + function createBotMessageEl(content, timestamp, requestId, msg) { const el = document.createElement('div'); el.className = 'flex gap-3 px-4 sm:px-6 py-3'; if (requestId) el.dataset.requestId = requestId; let stepsHtml = ''; - let displayContent = content; + let displayContent = localizeCancelMarker(content); if (msg && msg.steps && msg.steps.length > 0) { // New format: ordered steps with interleaved content diff --git a/channel/web/web_channel.py b/channel/web/web_channel.py index 371f0271..ab1d6915 100644 --- a/channel/web/web_channel.py +++ b/channel/web/web_channel.py @@ -93,6 +93,15 @@ def _require_auth(): json.dumps({"status": "error", "message": "Unauthorized"})) +# Localized text for /cancel system replies. Web is the only channel that +# honors a per-request `lang`; other channels reply in Chinese by default. +def _cancel_reply_text(cancelled: int, lang: str) -> str: + en = lang.startswith("en") + if cancelled > 0: + return "🛑 Cancelled." if en else "🛑 已中止" + return "Nothing to cancel." if en else "当前没有可中止的任务。" + + def _get_upload_dir() -> str: from common.utils import expand_path ws_root = expand_path(conf().get("agent_workspace", "~/cow")) @@ -437,6 +446,18 @@ class WebChannel(ChatChannel): "timestamp": time.time(), }) + elif event_type == "agent_cancelled": + # Push an explicit cancelled SSE event so the frontend + # marks the bubble as stopped. A trailing "done" still + # arrives with the partial answer. 
+ final_response = data.get("final_response", "") + q.put({ + "type": "cancelled", + "content": final_response, + "request_id": request_id, + "timestamp": time.time(), + }) + elif event_type == "agent_end": # Safety net: if the agent finishes with an empty final_response, # chat_channel skips _send_reply (because reply.content is empty), @@ -756,6 +777,25 @@ class WebChannel(ChatChannel): # desire_rtype concept used by other channels). is_voice_input = bool(json_data.get('is_voice', False)) + # Fast path for /cancel: bypass the session queue and SSE setup. + # Web frontend (stream=true) only listens to SSE, so we return an + # inline_reply payload to be rendered synchronously. + stripped_prompt = (prompt or "").strip().lower() + if stripped_prompt == "/cancel": + from agent.protocol import get_cancel_registry + cancelled = get_cancel_registry().cancel_session(session_id) + lang = (json_data.get('lang') or 'zh').lower() + msg_text = _cancel_reply_text(cancelled, lang) + logger.info( + f"[WebChannel] /cancel fast-path: session={session_id}, cancelled={cancelled}, lang={lang}" + ) + return json.dumps({ + "status": "success", + "request_id": "", + "stream": False, + "inline_reply": msg_text, + }) + # Append file references to the prompt (same format as QQ channel) if attachments: file_refs = [] @@ -862,6 +902,11 @@ class WebChannel(ChatChannel): if itype == "done": post_done = True post_deadline = time.time() + POST_DONE_TAIL_SECONDS + elif itype == "cancelled": + # Close SSE tail quickly after cancel; don't wait for the + # full TTS tail since the user already pressed Stop. + post_done = True + post_deadline = time.time() + 3 elif itype == "voice_attach": # WSGI buffers the previous chunk until the next yield; # shrink the tail so the generator wakes up quickly to @@ -872,6 +917,59 @@ class WebChannel(ChatChannel): finally: self.sse_queues.pop(request_id, None) + def cancel_request(self): + """ + Cancel an in-flight agent run. 
+ + Body: {"request_id": "...", "session_id": "..."} + Either field is sufficient; request_id is preferred when known. + Always returns success even when nothing was running, so the + client's UX is idempotent. + """ + try: + from agent.protocol import get_cancel_registry + + data = web.data() + try: + json_data = json.loads(data) if data else {} + except Exception: + json_data = {} + + request_id = (json_data.get("request_id") or "").strip() + session_id = (json_data.get("session_id") or "").strip() + lang = (json_data.get("lang") or "zh").lower() + + registry = get_cancel_registry() + cancelled = 0 + + if request_id: + if registry.cancel_request(request_id): + cancelled = 1 + + if cancelled == 0 and session_id: + cancelled = registry.cancel_session(session_id) + + if request_id and request_id in self.sse_queues: + self.sse_queues[request_id].put({ + "type": "cancelled", + "content": "Cancelled" if lang.startswith("en") else "已中止", + "request_id": request_id, + "timestamp": time.time(), + }) + + logger.info( + f"[WebChannel] cancel request: request_id={request_id!r}, " + f"session_id={session_id!r}, cancelled={cancelled}" + ) + return json.dumps({ + "status": "success", + "cancelled": cancelled, + }) + + except Exception as e: + logger.error(f"[WebChannel] cancel_request error: {e}") + return json.dumps({"status": "error", "message": str(e)}) + def poll_response(self): """ Poll for responses using the session_id. 
@@ -967,6 +1065,7 @@ class WebChannel(ChatChannel): '/api/voice/tts', 'VoiceTtsHandler', '/poll', 'PollHandler', '/stream', 'StreamHandler', + '/cancel', 'CancelHandler', '/chat', 'ChatHandler', '/config', 'ConfigHandler', '/api/models', 'ModelsHandler',
@@ -1240,6 +1339,12 @@ class PollHandler:
         return WebChannel().poll_response()
 
 
+class CancelHandler:  # handler registered for the '/cancel' URL
+    def POST(self):  # runs _require_auth(), then delegates the cancel to the channel
+        _require_auth()
+        return WebChannel().cancel_request()
+
+
 class StreamHandler:
     def GET(self):
         _require_auth()
diff --git a/channel/wecom_bot/wecom_bot_channel.py b/channel/wecom_bot/wecom_bot_channel.py index 0fe4500b..ebc1104b 100644 --- a/channel/wecom_bot/wecom_bot_channel.py +++ b/channel/wecom_bot/wecom_bot_channel.py @@ -440,6 +440,17 @@ class WecomBotChannel(ChatChannel): state["current"] = "" _push_stream(state, force=True) + elif event_type == "agent_cancelled": + # Flush partial output and strip trailing "---" separator + # left over from previous turn, to avoid a dangling divider. + if state["current"]: + state["committed"] += state["current"] + state["current"] = "" + state["committed"] = state["committed"].rstrip() + if state["committed"].endswith("---"): + state["committed"] = state["committed"][:-3].rstrip() + _push_stream(state, force=True) + return on_event # ------------------------------------------------------------------ diff --git a/docs/cli/general.mdx b/docs/cli/general.mdx index cb3f933d..36af1783 100644 --- a/docs/cli/general.mdx +++ b/docs/cli/general.mdx @@ -39,6 +39,14 @@ Mode: agent Session: 12 messages | 8 skills loaded ``` +## cancel + +中止当前会话正在运行的 Agent 任务。在 Agent 执行长时间任务(例如多轮工具调用、长流式输出)时,可随时发送 `/cancel`,Agent 会在下一次工具执行前停止。Web 端、微信、企业微信、飞书等各通道均可使用。 + +```text +/cancel +``` + ## config 查看或修改运行时配置。修改后立即生效,无需重启服务。 diff --git a/docs/cli/index.mdx b/docs/cli/index.mdx index ce67be27..f6462ecb 100644 --- a/docs/cli/index.mdx +++ b/docs/cli/index.mdx @@ -57,6 +57,7 @@ Others: | --- | --- | | `/help` | 显示命令帮助 | | `/status` | 查看服务状态和配置 | +| `/cancel` | 中止当前正在运行的 Agent 任务 |
| `/config` | 查看或修改运行时配置 | | `/skill` | 管理技能(安装、卸载、启用、禁用等) | | `/memory dream [N]` | 手动触发记忆蒸馏(默认 3 天,最大 30) | @@ -82,6 +83,7 @@ Others: | version | ✓ | ✓ | | status | ✓ | ✓ | | logs | ✓ | ✓ | +| cancel | ✗ | ✓ | | config | ✗ | ✓ | | context | — | ✓ | | memory (子命令) | ✗ | ✓ | diff --git a/docs/en/cli/general.mdx b/docs/en/cli/general.mdx index ae78f3e4..8107fcb5 100644 --- a/docs/en/cli/general.mdx +++ b/docs/en/cli/general.mdx @@ -25,6 +25,14 @@ View current session and service status, including process info, model configura /status ``` +## cancel + +Abort the agent task currently running in this session. When the agent is busy with a long task (e.g. multi-turn tool calls or a long streaming response), send `/cancel` and the agent will stop before the next tool execution. Available across all channels — Web, WeChat, WeCom, Feishu, etc. + +```text +/cancel +``` + ## config View or modify runtime configuration. Changes take effect immediately without restarting. diff --git a/docs/en/cli/index.mdx b/docs/en/cli/index.mdx index 36147261..e13b45a3 100644 --- a/docs/en/cli/index.mdx +++ b/docs/en/cli/index.mdx @@ -57,6 +57,7 @@ In the Web console or any connected channel, type `/` to see command suggestions | --- | --- | | `/help` | Show command help | | `/status` | View service status and configuration | +| `/cancel` | Abort the currently running agent task | | `/config` | View or modify runtime configuration | | `/skill` | Manage skills (install, uninstall, enable, disable, etc.) 
| | `/memory dream [N]` | Manually trigger memory distillation (default 3 days, max 30) | @@ -80,6 +81,7 @@ In the Web console or any connected channel, type `/` to see command suggestions | version | ✓ | ✓ | | status | ✓ | ✓ | | logs | ✓ | ✓ | +| cancel | ✗ | ✓ | | config | ✗ | ✓ | | context | — | ✓ | | memory (subcommands) | ✗ | ✓ | diff --git a/docs/ja/cli/general.mdx b/docs/ja/cli/general.mdx index ab24a14e..e31d7cf3 100644 --- a/docs/ja/cli/general.mdx +++ b/docs/ja/cli/general.mdx @@ -25,6 +25,14 @@ description: ステータスの確認、設定管理、コンテキスト制御 /status ``` +## cancel + +現在のセッションで実行中の Agent タスクを中止します。Agent が長時間のタスク(マルチターンのツール呼び出しや長いストリーミング応答など)を実行している間、`/cancel` を送信すると、次のツール実行の前に停止します。Web、WeChat、企業微信、Feishu など、すべてのチャネルで利用可能です。 + +```text +/cancel +``` + ## config 実行時設定の表示または変更を行います。変更は即座に反映され、再起動は不要です。 diff --git a/docs/ja/cli/index.mdx b/docs/ja/cli/index.mdx index f8e28eff..4d00a654 100644 --- a/docs/ja/cli/index.mdx +++ b/docs/ja/cli/index.mdx @@ -57,6 +57,7 @@ Web コンソールや接続されたチャネルの会話で `/` を入力す | --- | --- | | `/help` | コマンドヘルプを表示 | | `/status` | サービスの状態と設定を表示 | +| `/cancel` | 実行中の Agent タスクを中止 | | `/config` | 実行時設定の表示・変更 | | `/skill` | スキル管理(インストール、アンインストール、有効化、無効化など) | | `/memory dream [N]` | 記憶蒸留を手動トリガー(デフォルト 3 日、最大 30) | @@ -80,6 +81,7 @@ Web コンソールや接続されたチャネルの会話で `/` を入力す | version | ✓ | ✓ | | status | ✓ | ✓ | | logs | ✓ | ✓ | +| cancel | ✗ | ✓ | | config | ✗ | ✓ | | context | — | ✓ | | memory(サブコマンド) | ✗ | ✓ | diff --git a/plugins/cow_cli/cow_cli.py b/plugins/cow_cli/cow_cli.py index fc721f51..b7dc6371 100644 --- a/plugins/cow_cli/cow_cli.py +++ b/plugins/cow_cli/cow_cli.py @@ -26,16 +26,19 @@ from common.log import logger from cli import __version__ -# Known top-level subcommands that cow supports +# Known top-level subcommands that cow supports. +# "start" / "stop" / "restart" refer to daemon lifecycle on the host shell; +# in chat, "/cancel" aborts the in-flight agent run instead. 
KNOWN_COMMANDS = { "help", "version", "status", "logs", "start", "stop", "restart", + "cancel", "skill", "context", "config", "knowledge", "memory", "install-browser", } -# Commands that can only run from the CLI (terminal), not in chat +# Commands that can only run from the CLI (terminal), not in chat. CLI_ONLY_COMMANDS = {"start", "stop", "restart"} # Commands that can only run from chat (need access to in-process memory) @@ -225,6 +228,7 @@ class CowCliPlugin(Plugin): " /help 显示此帮助", " /version 查看版本", " /status 查看运行状态", + " /cancel 中止当前正在运行的 Agent 任务", " /logs [N] 查看最近N条日志 (默认20)", " /context 查看当前对话上下文信息", " /context clear 清除当前对话上下文",
@@ -250,6 +254,43 @@ class CowCliPlugin(Plugin):
     def _cmd_version(self, args: str, e_context, **_) -> str:
         return f"CowAgent v{__version__}"
 
+    # ------------------------------------------------------------------
+    # cancel — abort the in-flight agent run for the current session.
+    # Fallback handler; in practice chat_channel/web_channel intercept
+    # /cancel earlier so it bypasses the per-session serial queue.
+    # ------------------------------------------------------------------
+
+    def _cmd_cancel(self, args: str, e_context: EventContext, session_id: str = "", **_) -> str:
+        """Abort the agent run in flight for the current session.
+
+        The request_id found on the message context is tried first; if
+        that cancels nothing, the cancel is applied to the whole session.
+
+        Returns:
+            The reply text for the command.
+        """
+        from agent.protocol import get_cancel_registry
+
+        session_key = self._get_session_id(e_context, fallback=session_id)
+        cancel_registry = get_cancel_registry()
+
+        # Best-effort read of the per-turn request_id; any failure means "none".
+        turn_id = ""
+        if e_context is not None:
+            try:
+                context = e_context["context"]
+                turn_id = context.kwargs.get("request_id") or context.get("request_id", "")
+            except Exception:
+                turn_id = ""
+
+        hit_count = 1 if turn_id and cancel_registry.cancel_request(turn_id) else 0
+
+        # Nothing cancelled by request_id: fall back to the session-wide cancel.
+        if hit_count == 0 and session_key:
+            hit_count = cancel_registry.cancel_session(session_key)
+
+        return "🛑 已中止" if hit_count > 0 else "当前没有可中止的任务。"
+
     # ------------------------------------------------------------------
     # status
     # ------------------------------------------------------------------