diff --git a/agent/protocol/__init__.py b/agent/protocol/__init__.py
index a9fe5a3e..f0a7a4e2 100644
--- a/agent/protocol/__init__.py
+++ b/agent/protocol/__init__.py
@@ -3,6 +3,11 @@ from .agent_stream import AgentStreamExecutor
from .task import Task, TaskType, TaskStatus
from .result import AgentResult, AgentAction, AgentActionType, ToolResult
from .models import LLMModel, LLMRequest, ModelFactory
+from .cancel import (
+ AgentCancelledError,
+ CancelTokenRegistry,
+ get_cancel_registry,
+)
__all__ = [
'Agent',
@@ -16,5 +21,8 @@ __all__ = [
'ToolResult',
'LLMModel',
'LLMRequest',
- 'ModelFactory'
-]
\ No newline at end of file
+ 'ModelFactory',
+ 'AgentCancelledError',
+ 'CancelTokenRegistry',
+ 'get_cancel_registry',
+]
diff --git a/agent/protocol/agent.py b/agent/protocol/agent.py
index 285a9732..d944660b 100644
--- a/agent/protocol/agent.py
+++ b/agent/protocol/agent.py
@@ -365,7 +365,8 @@ class Agent:
return action
- def run_stream(self, user_message: str, on_event=None, clear_history: bool = False, skill_filter=None) -> str:
+ def run_stream(self, user_message: str, on_event=None, clear_history: bool = False,
+ skill_filter=None, cancel_event=None) -> str:
"""
Execute single agent task with streaming (based on tool-call)
@@ -374,6 +375,7 @@ class Agent:
- Multi-turn reasoning based on tool-call
- Event callbacks
- Persistent conversation history across calls
+ - User-initiated cancellation via ``cancel_event``
Args:
user_message: User message
@@ -381,6 +383,11 @@ class Agent:
event = {"type": str, "timestamp": float, "data": dict}
clear_history: If True, clear conversation history before this call (default: False)
skill_filter: Optional list of skill names to include in this run
+ cancel_event: Optional threading.Event polled at agent checkpoints.
+ When set, the loop exits at the next safe point, injects a
+ "[Interrupted by user]" assistant note, and returns the
+ partial response. ``messages`` stays in a valid state
+ (tool_use/tool_result pairs preserved).
Returns:
Final response text
@@ -424,7 +431,8 @@ class Agent:
max_turns=self.max_steps,
on_event=on_event,
messages=messages_copy, # Pass copied message history
- max_context_turns=max_context_turns
+ max_context_turns=max_context_turns,
+ cancel_event=cancel_event,
)
# Execute
diff --git a/agent/protocol/agent_stream.py b/agent/protocol/agent_stream.py
index ef4f975b..701b00de 100644
--- a/agent/protocol/agent_stream.py
+++ b/agent/protocol/agent_stream.py
@@ -7,6 +7,7 @@ import json
import time
from typing import List, Dict, Any, Optional, Callable, Tuple
+from agent.protocol.cancel import AgentCancelledError
from agent.protocol.models import LLMRequest, LLMModel
from agent.protocol.message_utils import sanitize_claude_messages, compress_turn_to_text_only
from agent.tools.base_tool import BaseTool, ToolResult
@@ -64,7 +65,8 @@ class AgentStreamExecutor:
max_turns: int = 50,
on_event: Optional[Callable] = None,
messages: Optional[List[Dict]] = None,
- max_context_turns: int = 30
+ max_context_turns: int = 30,
+ cancel_event=None,
):
"""
Initialize stream executor
@@ -78,6 +80,10 @@ class AgentStreamExecutor:
on_event: Event callback function
messages: Optional existing message history (for persistent conversations)
max_context_turns: Maximum number of conversation turns to keep in context
+ cancel_event: Optional threading.Event used to signal user cancel.
+ Checked at every safe point (turn boundary, before tool execution,
+ during LLM streaming). When set, raises AgentCancelledError which
+ run_stream catches to gracefully wind down.
"""
self.agent = agent
self.model = model
@@ -87,6 +93,7 @@ class AgentStreamExecutor:
self.max_turns = max_turns
self.on_event = on_event
self.max_context_turns = max_context_turns
+ self.cancel_event = cancel_event
# Message history - use provided messages or create new list
self.messages = messages if messages is not None else []
@@ -97,6 +104,73 @@ class AgentStreamExecutor:
# Track files to send (populated by read tool)
self.files_to_send = [] # List of file metadata dicts
+ def _check_cancelled(self) -> None:
+ """Raise AgentCancelledError if the user requested cancellation.
+
+ Called at safe points (turn start, between tool calls, between LLM
+ chunks). Cheap to call: just an Event.is_set() probe.
+ """
+ if self.cancel_event is not None and self.cancel_event.is_set():
+ raise AgentCancelledError("agent cancelled by user")
+
+ def _handle_cancelled(self, partial_response: str) -> None:
+ """Wind down ``self.messages`` after a user-initiated cancel.
+
+ The messages list may be in any of these states when we get here:
+ (a) Last message is an assistant message containing tool_use
+ blocks but the matching tool_result has not been appended yet.
+ (b) Last message is an assistant text-only reply (cancel happened
+ right before the next turn started).
+ (c) Last message is a user tool_result message and we cancelled
+ between turns.
+
+ For (a) we MUST synthesise tool_result blocks, otherwise the next
+ request will fail Claude/OpenAI's strict pairing validation. For
+ (b)/(c) the state is already valid and we just append a small
+ cancellation note so the user/LLM both see the boundary clearly.
+ """
+ try:
+ # Step 1: close any orphaned tool_use in the trailing assistant
+ # message by injecting matching tool_result blocks.
+ if self.messages and isinstance(self.messages[-1], dict) \
+ and self.messages[-1].get("role") == "assistant":
+ last = self.messages[-1]
+ content = last.get("content")
+ if isinstance(content, list):
+ pending_tool_use_ids = [
+ block.get("id")
+ for block in content
+ if isinstance(block, dict) and block.get("type") == "tool_use"
+ ]
+ pending_tool_use_ids = [tid for tid in pending_tool_use_ids if tid]
+ if pending_tool_use_ids:
+ tool_result_blocks = [
+ {
+ "type": "tool_result",
+ "tool_use_id": tid,
+ "content": "Cancelled by user before this tool finished.",
+ "is_error": True,
+ }
+ for tid in pending_tool_use_ids
+ ]
+ self.messages.append({
+ "role": "user",
+ "content": tool_result_blocks,
+ })
+ logger.info(
+ f"[Agent] Injected {len(tool_result_blocks)} cancellation "
+ f"tool_result blocks to keep message history valid"
+ )
+
+ # Step 2: append a stable "interrupted" marker so the LLM sees a
+ # clear stop boundary on the next turn.
+ self.messages.append({
+ "role": "assistant",
+ "content": [{"type": "text", "text": "_(Cancelled by user)_"}],
+ })
+ except Exception as e:
+ logger.warning(f"[Agent] _handle_cancelled cleanup failed: {e}")
+
def _emit_event(self, event_type: str, data: dict = None):
"""Emit event"""
if self.on_event:
@@ -270,8 +344,13 @@ class AgentStreamExecutor:
final_response = ""
turn = 0
+ cancelled = False
try:
while turn < self.max_turns:
+ # Check at the very top of every turn so a cancel arriving
+ # between turns short-circuits cleanly.
+ self._check_cancelled()
+
turn += 1
logger.info(f"[Agent] 第 {turn} 轮")
self._emit_event("turn_start", {"turn": turn})
@@ -375,6 +454,8 @@ class AgentStreamExecutor:
try:
for tool_call in tool_calls:
+ # Honour cancel between tool invocations within the same turn
+ self._check_cancelled()
result = self._execute_tool(tool_call)
tool_results.append(result)
@@ -557,6 +638,15 @@ class AgentStreamExecutor:
self.messages.pop(prompt_insert_idx)
logger.debug("[Agent] Removed injected max-steps prompt from message history")
+ except AgentCancelledError:
+ # User-initiated stop: wind down message history cleanly so the
+ # next turn is unaffected; channels emit a "cancelled" UI event.
+ cancelled = True
+ logger.info(f"[Agent] 🛑 已被用户中止 (第 {turn} 轮)")
+ self._handle_cancelled(final_response)
+ if not final_response or not final_response.strip():
+ final_response = "_(Cancelled)_"
+
except Exception as e:
logger.error(f"❌ Agent执行错误: {e}")
self._emit_event("error", {"error": str(e)})
@@ -564,8 +654,11 @@ class AgentStreamExecutor:
finally:
final_response = final_response.strip() if final_response else final_response
- logger.info(f"[Agent] 🏁 完成 ({turn}轮)")
- self._emit_event("agent_end", {"final_response": final_response})
+ if cancelled:
+ # Emit before agent_end so channels can mark UI as cancelled
+ self._emit_event("agent_cancelled", {"final_response": final_response})
+ logger.info(f"[Agent] 🏁 完成 ({turn}轮)" + (" [cancelled]" if cancelled else ""))
+ self._emit_event("agent_end", {"final_response": final_response, "cancelled": cancelled})
return final_response
@@ -644,7 +737,32 @@ class AgentStreamExecutor:
try:
stream = self.model.call_stream(request)
+ # Probe cancel every N chunks to bound reaction time without
+ # checking on every token.
+ _cancel_probe_counter = 0
+ _CANCEL_PROBE_EVERY = 8
+
for chunk in stream:
+ _cancel_probe_counter += 1
+ if _cancel_probe_counter >= _CANCEL_PROBE_EVERY:
+ _cancel_probe_counter = 0
+ if self.cancel_event is not None and self.cancel_event.is_set():
+ # Persist partial text only; tool_use args may be
+ # truncated mid-stream and would fail validation.
+ logger.info("[Agent] cancel detected mid-stream, aborting LLM call")
+ if full_content:
+ partial_msg = {
+ "role": "assistant",
+ "content": [{"type": "text", "text": full_content}],
+ }
+ self.messages.append(partial_msg)
+ self._emit_event("message_end", {
+ "content": full_content,
+ "tool_calls": [],
+ "cancelled": True,
+ })
+ raise AgentCancelledError("cancelled during LLM streaming")
+
# Check for errors
if isinstance(chunk, dict) and chunk.get("error"):
# Extract error message from nested structure
@@ -738,6 +856,10 @@ class AgentStreamExecutor:
elif isinstance(choice, dict) and choice.get("_gemini_raw_parts"):
gemini_raw_parts = choice["_gemini_raw_parts"]
+ except AgentCancelledError:
+ # Must propagate untouched; never treat as a retryable error.
+ raise
+
except Exception as e:
error_str = str(e)
error_str_lower = error_str.lower()
diff --git a/agent/protocol/cancel.py b/agent/protocol/cancel.py
new file mode 100644
index 00000000..6354cd38
--- /dev/null
+++ b/agent/protocol/cancel.py
@@ -0,0 +1,121 @@
+"""
+Cancel token registry for aborting in-flight agent runs.
+
+A user cancel (web Cancel button, /cancel command) sets a threading.Event
+that the agent loop polls at safe checkpoints. Tokens are keyed by
+request_id (preferred) and tracked under session_id as a fallback. Entries
+are released after the run completes to keep the registry bounded.
+
+No project deps — importable from any layer without circular imports.
+"""
+
+from __future__ import annotations
+
+import threading
+from typing import Dict, Optional
+
+
+class AgentCancelledError(Exception):
+ """Raised inside the agent loop when a stop has been requested.
+
+ The agent stream executor catches this, injects a "[Interrupted]" note
+ into the message history (preserving tool_use/tool_result integrity)
+ and returns a partial response to the caller.
+ """
+
+
+class _CancelEntry:
+ __slots__ = ("event", "session_id")
+
+ def __init__(self, session_id: Optional[str]):
+ self.event = threading.Event()
+ self.session_id = session_id
+
+
+class CancelTokenRegistry:
+ """In-process registry mapping request_id -> cancel Event.
+
+ Thread-safe. Singleton via module-level ``_registry``.
+ """
+
+ def __init__(self):
+ self._lock = threading.Lock()
+ self._by_request: Dict[str, _CancelEntry] = {}
+ # session_id -> set of request_ids currently in flight (usually 1).
+ self._by_session: Dict[str, set] = {}
+
+ def register(self, request_id: str, session_id: Optional[str] = None) -> threading.Event:
+ """Create (or return existing) cancel event for a request.
+
+ Returns the threading.Event the caller should poll via ``is_set()``.
+ """
+ if not request_id:
+ return threading.Event()
+ with self._lock:
+ entry = self._by_request.get(request_id)
+ if entry is None:
+ entry = _CancelEntry(session_id)
+ self._by_request[request_id] = entry
+ if session_id:
+ self._by_session.setdefault(session_id, set()).add(request_id)
+ return entry.event
+
+ def get_event(self, request_id: str) -> Optional[threading.Event]:
+ if not request_id:
+ return None
+ with self._lock:
+ entry = self._by_request.get(request_id)
+ return entry.event if entry else None
+
+ def cancel_request(self, request_id: str) -> bool:
+ """Trigger cancel for a specific request. Returns True when matched."""
+ if not request_id:
+ return False
+ with self._lock:
+ entry = self._by_request.get(request_id)
+ if entry is None:
+ return False
+ entry.event.set()
+ return True
+
+ def cancel_session(self, session_id: str) -> int:
+ """Trigger cancel for every in-flight request of a session.
+
+ Returns the number of requests cancelled (0 when nothing was running).
+ """
+ if not session_id:
+ return 0
+ with self._lock:
+ request_ids = list(self._by_session.get(session_id, ()))
+ entries = [self._by_request[r] for r in request_ids if r in self._by_request]
+ for entry in entries:
+ entry.event.set()
+ return len(entries)
+
+ def unregister(self, request_id: str) -> None:
+ """Remove an entry once the agent run is done. Safe to call twice."""
+ if not request_id:
+ return
+ with self._lock:
+ entry = self._by_request.pop(request_id, None)
+ if entry and entry.session_id:
+ bucket = self._by_session.get(entry.session_id)
+ if bucket is not None:
+ bucket.discard(request_id)
+ if not bucket:
+ self._by_session.pop(entry.session_id, None)
+
+ def has_active(self, session_id: str) -> bool:
+ if not session_id:
+ return False
+ with self._lock:
+ bucket = self._by_session.get(session_id)
+ return bool(bucket)
+
+
+_registry = CancelTokenRegistry()
+
+
+def get_cancel_registry() -> CancelTokenRegistry:
+ """Module-level accessor for the singleton registry."""
+ return _registry
diff --git a/bridge/agent_bridge.py b/bridge/agent_bridge.py
index a20659a8..a924dab2 100644
--- a/bridge/agent_bridge.py
+++ b/bridge/agent_bridge.py
@@ -5,7 +5,7 @@ Agent Bridge - Integrates Agent system with existing COW bridge
import os
from typing import Optional, List
-from agent.protocol import Agent, LLMModel, LLMRequest
+from agent.protocol import Agent, LLMModel, LLMRequest, get_cancel_registry
from bridge.agent_event_handler import AgentEventHandler
from bridge.agent_initializer import AgentInitializer
from bridge.bridge import Bridge
@@ -399,11 +399,22 @@ class AgentBridge:
"""
session_id = None
agent = None
+ request_id = None
+ cancel_event = None
try:
# Extract session_id from context for user isolation
if context:
session_id = context.kwargs.get("session_id") or context.get("session_id")
-
+ request_id = context.kwargs.get("request_id") or context.get("request_id")
+
+ # Register a cancel token. Prefer per-turn request_id (web),
+ # fall back to session_id (IM channels). The Event is polled by
+ # AgentStreamExecutor at safe checkpoints.
+ registry = get_cancel_registry()
+ token_key = request_id or session_id
+ if token_key:
+ cancel_event = registry.register(token_key, session_id=session_id)
+
# Get agent for this session (will auto-initialize if needed)
agent = self.get_agent(session_id=session_id)
if not agent:
@@ -458,7 +469,8 @@ class AgentBridge:
response = agent.run_stream(
user_message=query,
on_event=event_handler.handle_event,
- clear_history=clear_history
+ clear_history=clear_history,
+ cancel_event=cancel_event,
)
finally:
# Restore original tools
@@ -468,6 +480,13 @@ class AgentBridge:
# Log execution summary
event_handler.log_summary()
+ # Release cancel token; keep registry bounded.
+ if token_key:
+ try:
+ registry.unregister(token_key)
+ except Exception:
+ pass
+
# Persist new messages generated during this run
if session_id:
channel_type = (context.get("channel_type") or "") if context else ""
@@ -521,6 +540,12 @@ class AgentBridge:
logger.info(f"[AgentBridge] Cleared DB for session after error: {session_id}")
except Exception as db_err:
logger.warning(f"[AgentBridge] Failed to clear DB after error: {db_err}")
+ # Release cancel token on error path too (idempotent).
+ if cancel_event is not None and (request_id or session_id):
+ try:
+ get_cancel_registry().unregister(request_id or session_id)
+ except Exception:
+ pass
return Reply(ReplyType.ERROR, f"Agent error: {str(e)}")
def _schedule_mcp_hot_reload(self, agent):
diff --git a/channel/chat_channel.py b/channel/chat_channel.py
index c38dd7c8..6a9a1952 100644
--- a/channel/chat_channel.py
+++ b/channel/chat_channel.py
@@ -438,8 +438,21 @@ class ChatChannel(Channel):
return func
+ # Chat commands that must bypass the per-session serial queue,
+ # otherwise /cancel would queue behind the task it tries to cancel.
+ # Use /cancel (not /stop) to avoid colliding with `cow stop` CLI.
+ _BYPASS_QUEUE_COMMANDS = ("/cancel",)
+
def produce(self, context: Context):
session_id = context["session_id"]
+
+ # Fast path: /cancel must not enter the queue.
+ if context.type == ContextType.TEXT and context.content:
+ stripped = context.content.strip().lower()
+ if stripped in self._BYPASS_QUEUE_COMMANDS:
+ self._handle_cancel_command(context, session_id)
+ return
+
with self.lock:
if session_id not in self.sessions:
self.sessions[session_id] = [
@@ -451,6 +464,29 @@ class ChatChannel(Channel):
else:
self.sessions[session_id][0].put(context)
+ def _handle_cancel_command(self, context: Context, session_id: str) -> None:
+ """Cancel any in-flight agent run for *session_id* and reply inline.
+
+ Runs synchronously on the caller's thread. Reply is sent through
+ _send_reply so plugins (e.g. logging) still observe it.
+ """
+ try:
+ from agent.protocol import get_cancel_registry
+ from bridge.reply import Reply, ReplyType
+
+ cancelled = get_cancel_registry().cancel_session(session_id)
+ text = (
+ "🛑 已中止"
+ if cancelled > 0
+ else "当前没有可中止的任务。"
+ )
+ logger.info(
+ f"[chat_channel] /cancel fast-path: session={session_id}, cancelled={cancelled}"
+ )
+ self._send_reply(context, Reply(ReplyType.TEXT, text))
+ except Exception as e:
+ logger.warning(f"[chat_channel] /cancel fast-path failed: {e}")
+
# 消费者函数,单独线程,用于从消息队列中取出消息并处理
def consume(self):
while True:
diff --git a/channel/feishu/feishu_channel.py b/channel/feishu/feishu_channel.py
index ca18e64b..9a9f3307 100644
--- a/channel/feishu/feishu_channel.py
+++ b/channel/feishu/feishu_channel.py
@@ -752,6 +752,9 @@ class FeiShuChanel(ChatChannel):
init_in_flight = [False]
# 一旦初始化失败就长期标记为 disabled,本次回复不再尝试任何流式调用
disabled = [False]
+ # True after agent_cancelled: agent_end stops rewriting the card
+ # with stale final_response and just finalizes current content.
+ cancelled = [False]
lock = threading.Lock()
# ---- 异步推送队列 ----------------------------------------------------
@@ -1076,18 +1079,42 @@ class FeiShuChanel(ChatChannel):
message_id[0] = None
sequence[0] = 0
+ elif event_type == "agent_cancelled":
+ # Lock channel into "no-rewrite" mode: the subsequent
+ # agent_end's final_response is from the last *completed*
+ # turn (the user already saw it), so rewriting the card
+ # would duplicate it visually.
+ with lock:
+ cancelled[0] = True
+
elif event_type == "agent_end":
# 最终回复:用 final_response 覆盖当前流式卡片,然后关闭流式模式。
final_response = data.get("final_response", "")
- if not final_response:
- return
- final_text = str(final_response)
# 标记 streamed 让 chat_channel 跳过 send()
context["feishu_streamed"] = True
with lock:
+ was_cancelled = cancelled[0]
has_card = card_id[0] is not None
init_busy = init_in_flight[0]
+ pending_text = current_text[0]
+
+ if was_cancelled:
+ # Cancelled path: finalize the in-flight card with
+ # partial output (or a short marker if empty); drop
+ # stale final_response to avoid duplicating last turn.
+ if has_card:
+ _drain_push_queue()
+ partial = (pending_text or "").rstrip()
+ final_text = partial or "_(已中止)_"
+ _stream_update_text(final_text)
+ _close_streaming_mode(final_text)
+ push_queue.put(None)
+ return
+
+ if not final_response:
+ return
+ final_text = str(final_response)
# 罕见情况:agent_end 触发时还没创建过卡片(极快返回 / 没有
# message_update),主动创建一张承载 final_text。
diff --git a/channel/web/chat.html b/channel/web/chat.html
index 947e07b7..d90adb15 100644
--- a/channel/web/chat.html
+++ b/channel/web/chat.html
@@ -445,7 +445,7 @@
bg-primary-400 text-white hover:bg-primary-500
disabled:bg-slate-300 dark:disabled:bg-slate-600
disabled:cursor-not-allowed cursor-pointer transition-colors duration-150"
- disabled onclick="sendMessage()">
+ disabled>
diff --git a/channel/web/static/css/console.css b/channel/web/static/css/console.css
index 35fc307b..957db3e0 100644
--- a/channel/web/static/css/console.css
+++ b/channel/web/static/css/console.css
@@ -1367,3 +1367,35 @@
text-align: right;
}
.voice-pill audio { display: none; }
+
+/* Send button toggles into a Stop button while an SSE stream is in flight.
+ Match the look of the disabled send button (light grey block + white
+ glyph) so it reads as the same visual element, just paused/idle from
+ sending perspective and clickable to stop. */
+#send-btn.send-btn-cancel {
+ background-color: rgb(203 213 225) !important; /* slate-300, == disabled send-btn */
+ color: white !important;
+}
+#send-btn.send-btn-cancel:hover {
+ background-color: rgb(148 163 184) !important; /* slate-400 */
+}
+#send-btn.send-btn-cancel:disabled {
+ background-color: rgb(226 232 240) !important; /* slate-200, while stop is in flight */
+ color: white !important;
+ cursor: progress;
+}
+.dark #send-btn.send-btn-cancel {
+ background-color: rgb(71 85 105) !important; /* slate-600, == dark disabled send-btn */
+ color: white !important;
+}
+.dark #send-btn.send-btn-cancel:hover {
+ background-color: rgb(100 116 139) !important; /* slate-500 */
+}
+.dark #send-btn.send-btn-cancel:disabled {
+ background-color: rgb(51 65 85) !important; /* slate-700 */
+ color: rgb(203 213 225) !important;
+}
+
+.agent-cancelled-tag {
+ font-style: italic;
+}
diff --git a/channel/web/static/js/console.js b/channel/web/static/js/console.js
index 7376e29c..6d0a66fc 100644
--- a/channel/web/static/js/console.js
+++ b/channel/web/static/js/console.js
@@ -1016,7 +1016,60 @@ const inputHistory = [];
let historyIdx = -1;
let historySavedDraft = '';
+// While an SSE stream is in flight, the send button morphs into a cancel
+// button. Only one in-flight request is supported at a time.
+let activeRequestId = null;
+let sendBtnMode = 'send'; // 'send' | 'cancel'
+
+function setSendBtnCancelMode(requestId) {
+ activeRequestId = requestId;
+ sendBtnMode = 'cancel';
+ sendBtn.disabled = false;
+ sendBtn.classList.add('send-btn-cancel');
+ sendBtn.title = (currentLang === 'zh' ? '中止' : 'Cancel');
+ sendBtn.innerHTML = '';
+}
+
+function resetSendBtnSendMode() {
+ activeRequestId = null;
+ sendBtnMode = 'send';
+ sendBtn.classList.remove('send-btn-cancel');
+ sendBtn.title = '';
+ sendBtn.innerHTML = '';
+ updateSendBtnState();
+}
+
+function requestCancel() {
+ const reqId = activeRequestId;
+ if (!reqId) return;
+ fetch('/cancel', {
+ method: 'POST',
+ headers: { 'Content-Type': 'application/json' },
+ body: JSON.stringify({ request_id: reqId, session_id: sessionId, lang: currentLang }),
+ }).catch(err => {
+ console.warn('[cancel] request failed', err);
+ });
+ // Optimistic UI lock so the click visibly registers before the SSE
+ // "cancelled" event arrives.
+ sendBtn.disabled = true;
+ sendBtn.title = (currentLang === 'zh' ? '已中止' : 'Cancelled');
+}
+
+// Button click is the only path to Cancel. Pressing Enter still calls
+// sendMessage() so users can submit "/cancel" as a regular slash command.
+sendBtn.addEventListener('click', () => {
+ if (sendBtnMode === 'cancel') {
+ requestCancel();
+ } else {
+ sendMessage();
+ }
+});
+
function updateSendBtnState() {
+ if (sendBtnMode === 'cancel') {
+ // Don't downgrade a Cancel button on input edits.
+ return;
+ }
sendBtn.disabled = uploadingCount > 0 || (!chatInput.value.trim() && pendingAttachments.length === 0);
}
@@ -1264,6 +1317,7 @@ const SLASH_COMMANDS = [
{ cmd: '/knowledge on', desc: '开启知识库' },
{ cmd: '/knowledge off', desc: '关闭知识库' },
{ cmd: '/config', desc: '查看当前配置' },
+ { cmd: '/cancel', desc: '中止当前正在运行的 Agent 任务' },
{ cmd: '/logs', desc: '查看最近日志' },
{ cmd: '/version', desc: '查看版本' },
];
@@ -1534,6 +1588,7 @@ function sendVoiceMessage(text, audioUrl) {
stream: true,
timestamp: timestamp.toISOString(),
is_voice: true,
+ lang: currentLang,
};
const MAX_RETRIES = 2;
@@ -1547,7 +1602,12 @@ function sendVoiceMessage(text, audioUrl) {
.then(r => r.json())
.then(data => {
if (data.status === 'success') {
- if (data.stream) {
+ if (data.inline_reply) {
+ // Synchronous fast-path reply (e.g. /cancel); skip SSE.
+ loadingEl.remove();
+ addBotMessage(data.inline_reply, new Date());
+ } else if (data.stream) {
+ setSendBtnCancelMode(data.request_id);
startSSE(data.request_id, loadingEl, timestamp, titleInfo);
} else {
loadingContainers[data.request_id] = loadingEl;
@@ -1555,6 +1615,7 @@ function sendVoiceMessage(text, audioUrl) {
} else {
loadingEl.remove();
addBotMessage(t('error_send'), new Date());
+ resetSendBtnSendMode();
}
})
.catch(err => {
@@ -1591,6 +1652,10 @@ function addUserVoiceMessage(audioUrl, caption, timestamp) {
}
function sendMessage() {
+ // Do NOT branch on sendBtnMode here: Enter should always send (so
+ // typing "/cancel" submits normally). Cancel is wired only to the
+ // send button's pointer click — see send-btn listener above.
+
const text = chatInput.value.trim();
if (!text && pendingAttachments.length === 0) return;
@@ -1619,7 +1684,7 @@ function sendMessage() {
renderAttachmentPreview();
sendBtn.disabled = true;
- const body = { session_id: sessionId, message: text, stream: true, timestamp: timestamp.toISOString() };
+ const body = { session_id: sessionId, message: text, stream: true, timestamp: timestamp.toISOString(), lang: currentLang };
if (attachments.length > 0) {
body.attachments = attachments.map(a => ({
file_path: a.file_path,
@@ -1641,7 +1706,13 @@ function sendMessage() {
.then(r => r.json())
.then(data => {
if (data.status === 'success') {
- if (data.stream) {
+ if (data.inline_reply) {
+ // Channel handled synchronously (e.g. /cancel fast-path);
+ // render as a bot bubble and skip SSE entirely.
+ loadingEl.remove();
+ addBotMessage(data.inline_reply, new Date());
+ } else if (data.stream) {
+ setSendBtnCancelMode(data.request_id);
startSSE(data.request_id, loadingEl, timestamp, titleInfo);
} else {
loadingContainers[data.request_id] = loadingEl;
@@ -1649,12 +1720,14 @@ function sendMessage() {
} else {
loadingEl.remove();
addBotMessage(t('error_send'), new Date());
+ resetSendBtnSendMode();
}
})
.catch(err => {
if (err.name === 'AbortError') {
loadingEl.remove();
addBotMessage(t('error_timeout'), new Date());
+ resetSendBtnSendMode();
return;
}
if (attempt < MAX_RETRIES) {
@@ -1664,6 +1737,7 @@ function sendMessage() {
}
loadingEl.remove();
addBotMessage(t('error_send'), new Date());
+ resetSendBtnSendMode();
});
}
@@ -1919,14 +1993,33 @@ function startSSE(requestId, loadingEl, timestamp, titleInfo) {
stepsEl.appendChild(wrap);
scrollChatToBottom();
+ } else if (item.type === 'cancelled') {
+ // Agent acknowledged the stop; mark the bubble. A trailing
+ // "done" still arrives with the partial answer.
+ ensureBotEl();
+ if (currentReasoningEl) {
+ finalizeThinking(currentReasoningEl, reasoningStartTime, reasoningText);
+ currentReasoningEl = null;
+ reasoningText = '';
+ }
+ if (!botEl.querySelector('.agent-cancelled-tag')) {
+ const tag = document.createElement('div');
+ tag.className = 'agent-cancelled-tag text-xs text-amber-600 dark:text-amber-400 mt-1';
+ tag.textContent = (currentLang === 'zh') ? '已中止' : 'Cancelled';
+ stepsEl.appendChild(tag);
+ }
+ resetSendBtnSendMode();
+
} else if (item.type === 'done') {
// Don't close the stream yet: the backend keeps it open
// for a short tail to deliver async attachments such as
// TTS audio (`voice_attach`). It will close the stream on
// its own via onerror once the tail expires.
done = true;
+ resetSendBtnSendMode();
- const finalText = item.content || accumulatedText;
+ const finalTextRaw = item.content || accumulatedText;
+ const finalText = localizeCancelMarker(finalTextRaw);
if (!botEl && finalText) {
if (loadingEl) { loadingEl.remove(); loadingEl = null; }
@@ -1934,7 +2027,7 @@ function startSSE(requestId, loadingEl, timestamp, titleInfo) {
} else if (botEl) {
contentEl.classList.remove('sse-streaming');
if (finalText) contentEl.innerHTML = renderMarkdown(finalText);
- contentEl.dataset.rawMd = finalText || '';
+ contentEl.dataset.rawMd = finalTextRaw || '';
const copyBtn = botEl.querySelector('.copy-msg-btn');
if (copyBtn && finalText) copyBtn.style.display = '';
applyHighlighting(botEl);
@@ -1964,6 +2057,7 @@ function startSSE(requestId, loadingEl, timestamp, titleInfo) {
delete activeStreams[requestId];
if (loadingEl) { loadingEl.remove(); loadingEl = null; }
addBotMessage(t('error_send'), new Date());
+ resetSendBtnSendMode();
}
};
@@ -2000,6 +2094,7 @@ function startSSE(requestId, loadingEl, timestamp, titleInfo) {
applyHighlighting(botEl);
bindChatKnowledgeLinks(botEl);
}
+ resetSendBtnSendMode();
};
}
@@ -2238,13 +2333,23 @@ function _renderSentFileFromToolResult(step) {
` ${escapeHtml(fileName)}`;
}
+// Cosmetic translator for cancel markers persisted in history.
+// History keeps the English canonical form for the LLM; only display is localized.
+function localizeCancelMarker(text) {
+ if (!text) return text;
+ if (currentLang !== 'zh') return text;
+ return text
+ .replace(/_\(Cancelled by user\)_/g, '_(用户已中止)_')
+ .replace(/_\(Cancelled\)_/g, '_(已中止)_');
+}
+
function createBotMessageEl(content, timestamp, requestId, msg) {
const el = document.createElement('div');
el.className = 'flex gap-3 px-4 sm:px-6 py-3';
if (requestId) el.dataset.requestId = requestId;
let stepsHtml = '';
- let displayContent = content;
+ let displayContent = localizeCancelMarker(content);
if (msg && msg.steps && msg.steps.length > 0) {
// New format: ordered steps with interleaved content
diff --git a/channel/web/web_channel.py b/channel/web/web_channel.py
index 371f0271..ab1d6915 100644
--- a/channel/web/web_channel.py
+++ b/channel/web/web_channel.py
@@ -93,6 +93,15 @@ def _require_auth():
json.dumps({"status": "error", "message": "Unauthorized"}))
+# Localized text for /cancel system replies. Web is the only channel that
+# honors a per-request `lang`; other channels reply in Chinese by default.
+def _cancel_reply_text(cancelled: int, lang: str) -> str:
+ en = lang.startswith("en")
+ if cancelled > 0:
+ return "🛑 Cancelled." if en else "🛑 已中止"
+ return "Nothing to cancel." if en else "当前没有可中止的任务。"
+
+
def _get_upload_dir() -> str:
from common.utils import expand_path
ws_root = expand_path(conf().get("agent_workspace", "~/cow"))
@@ -437,6 +446,18 @@ class WebChannel(ChatChannel):
"timestamp": time.time(),
})
+ elif event_type == "agent_cancelled":
+ # Push an explicit cancelled SSE event so the frontend
+ # marks the bubble as stopped. A trailing "done" still
+ # arrives with the partial answer.
+ final_response = data.get("final_response", "")
+ q.put({
+ "type": "cancelled",
+ "content": final_response,
+ "request_id": request_id,
+ "timestamp": time.time(),
+ })
+
elif event_type == "agent_end":
# Safety net: if the agent finishes with an empty final_response,
# chat_channel skips _send_reply (because reply.content is empty),
@@ -756,6 +777,25 @@ class WebChannel(ChatChannel):
# desire_rtype concept used by other channels).
is_voice_input = bool(json_data.get('is_voice', False))
+ # Fast path for /cancel: bypass the session queue and SSE setup.
+ # Web frontend (stream=true) only listens to SSE, so we return an
+ # inline_reply payload to be rendered synchronously.
+ stripped_prompt = (prompt or "").strip().lower()
+ if stripped_prompt == "/cancel":
+ from agent.protocol import get_cancel_registry
+ cancelled = get_cancel_registry().cancel_session(session_id)
+ lang = (json_data.get('lang') or 'zh').lower()
+ msg_text = _cancel_reply_text(cancelled, lang)
+ logger.info(
+ f"[WebChannel] /cancel fast-path: session={session_id}, cancelled={cancelled}, lang={lang}"
+ )
+ return json.dumps({
+ "status": "success",
+ "request_id": "",
+ "stream": False,
+ "inline_reply": msg_text,
+ })
+
# Append file references to the prompt (same format as QQ channel)
if attachments:
file_refs = []
@@ -862,6 +902,11 @@ class WebChannel(ChatChannel):
if itype == "done":
post_done = True
post_deadline = time.time() + POST_DONE_TAIL_SECONDS
+ elif itype == "cancelled":
+ # Close SSE tail quickly after cancel; don't wait for the
+ # full TTS tail since the user already pressed Stop.
+ post_done = True
+ post_deadline = time.time() + 3
elif itype == "voice_attach":
# WSGI buffers the previous chunk until the next yield;
# shrink the tail so the generator wakes up quickly to
@@ -872,6 +917,59 @@ class WebChannel(ChatChannel):
finally:
self.sse_queues.pop(request_id, None)
+ def cancel_request(self):
+ """
+ Cancel an in-flight agent run.
+
+ Body: {"request_id": "...", "session_id": "..."}
+ Either field is sufficient; request_id is preferred when known.
+ Always returns success even when nothing was running, so the
+ client's UX is idempotent.
+ """
+ try:
+ from agent.protocol import get_cancel_registry
+
+ data = web.data()
+ try:
+ json_data = json.loads(data) if data else {}
+ except Exception:
+ json_data = {}
+
+ request_id = (json_data.get("request_id") or "").strip()
+ session_id = (json_data.get("session_id") or "").strip()
+ lang = (json_data.get("lang") or "zh").lower()
+
+ registry = get_cancel_registry()
+ cancelled = 0
+
+ if request_id:
+ if registry.cancel_request(request_id):
+ cancelled = 1
+
+ if cancelled == 0 and session_id:
+ cancelled = registry.cancel_session(session_id)
+
+ if request_id and request_id in self.sse_queues:
+ self.sse_queues[request_id].put({
+ "type": "cancelled",
+ "content": "Cancelled" if lang.startswith("en") else "已中止",
+ "request_id": request_id,
+ "timestamp": time.time(),
+ })
+
+ logger.info(
+ f"[WebChannel] cancel request: request_id={request_id!r}, "
+ f"session_id={session_id!r}, cancelled={cancelled}"
+ )
+ return json.dumps({
+ "status": "success",
+ "cancelled": cancelled,
+ })
+
+ except Exception as e:
+ logger.error(f"[WebChannel] cancel_request error: {e}")
+ return json.dumps({"status": "error", "message": str(e)})
+
def poll_response(self):
"""
Poll for responses using the session_id.
@@ -967,6 +1065,7 @@ class WebChannel(ChatChannel):
'/api/voice/tts', 'VoiceTtsHandler',
'/poll', 'PollHandler',
'/stream', 'StreamHandler',
+ '/cancel', 'CancelHandler',
'/chat', 'ChatHandler',
'/config', 'ConfigHandler',
'/api/models', 'ModelsHandler',
@@ -1240,6 +1339,12 @@ class PollHandler:
return WebChannel().poll_response()
+class CancelHandler:
+ def POST(self):
+ _require_auth()
+ return WebChannel().cancel_request()
+
+
class StreamHandler:
def GET(self):
_require_auth()
diff --git a/channel/wecom_bot/wecom_bot_channel.py b/channel/wecom_bot/wecom_bot_channel.py
index 0fe4500b..ebc1104b 100644
--- a/channel/wecom_bot/wecom_bot_channel.py
+++ b/channel/wecom_bot/wecom_bot_channel.py
@@ -440,6 +440,17 @@ class WecomBotChannel(ChatChannel):
state["current"] = ""
_push_stream(state, force=True)
+ elif event_type == "agent_cancelled":
+ # Flush partial output and strip trailing "---" separator
+ # left over from previous turn, to avoid a dangling divider.
+ if state["current"]:
+ state["committed"] += state["current"]
+ state["current"] = ""
+ state["committed"] = state["committed"].rstrip()
+ if state["committed"].endswith("---"):
+ state["committed"] = state["committed"][:-3].rstrip()
+ _push_stream(state, force=True)
+
return on_event
# ------------------------------------------------------------------
diff --git a/docs/cli/general.mdx b/docs/cli/general.mdx
index cb3f933d..36af1783 100644
--- a/docs/cli/general.mdx
+++ b/docs/cli/general.mdx
@@ -39,6 +39,14 @@ Mode: agent
Session: 12 messages | 8 skills loaded
```
+## cancel
+
+中止当前会话正在运行的 Agent 任务。在 Agent 执行长时间任务(例如多轮工具调用、长流式输出)时,可随时发送 `/cancel`,Agent 会在下一次工具执行前停止。Web 端、微信、企业微信、飞书等各通道均可使用。
+
+```text
+/cancel
+```
+
## config
查看或修改运行时配置。修改后立即生效,无需重启服务。
diff --git a/docs/cli/index.mdx b/docs/cli/index.mdx
index ce67be27..f6462ecb 100644
--- a/docs/cli/index.mdx
+++ b/docs/cli/index.mdx
@@ -57,6 +57,7 @@ Others:
| --- | --- |
| `/help` | 显示命令帮助 |
| `/status` | 查看服务状态和配置 |
+| `/cancel` | 中止当前正在运行的 Agent 任务 |
| `/config` | 查看或修改运行时配置 |
| `/skill` | 管理技能(安装、卸载、启用、禁用等) |
| `/memory dream [N]` | 手动触发记忆蒸馏(默认 3 天,最大 30) |
@@ -82,6 +83,7 @@ Others:
| version | ✓ | ✓ |
| status | ✓ | ✓ |
| logs | ✓ | ✓ |
+| cancel | ✗ | ✓ |
| config | ✗ | ✓ |
| context | — | ✓ |
| memory (子命令) | ✗ | ✓ |
diff --git a/docs/en/cli/general.mdx b/docs/en/cli/general.mdx
index ae78f3e4..8107fcb5 100644
--- a/docs/en/cli/general.mdx
+++ b/docs/en/cli/general.mdx
@@ -25,6 +25,14 @@ View current session and service status, including process info, model configura
/status
```
+## cancel
+
+Abort the agent task currently running in this session. When the agent is busy with a long task (e.g. multi-turn tool calls or a long streaming response), send `/cancel` and the agent will stop before the next tool execution. Available across all channels — Web, WeChat, WeCom, Feishu, etc.
+
+```text
+/cancel
+```
+
## config
View or modify runtime configuration. Changes take effect immediately without restarting.
diff --git a/docs/en/cli/index.mdx b/docs/en/cli/index.mdx
index 36147261..e13b45a3 100644
--- a/docs/en/cli/index.mdx
+++ b/docs/en/cli/index.mdx
@@ -57,6 +57,7 @@ In the Web console or any connected channel, type `/` to see command suggestions
| --- | --- |
| `/help` | Show command help |
| `/status` | View service status and configuration |
+| `/cancel` | Abort the currently running agent task |
| `/config` | View or modify runtime configuration |
| `/skill` | Manage skills (install, uninstall, enable, disable, etc.) |
| `/memory dream [N]` | Manually trigger memory distillation (default 3 days, max 30) |
@@ -80,6 +81,7 @@ In the Web console or any connected channel, type `/` to see command suggestions
| version | ✓ | ✓ |
| status | ✓ | ✓ |
| logs | ✓ | ✓ |
+| cancel | ✗ | ✓ |
| config | ✗ | ✓ |
| context | — | ✓ |
| memory (subcommands) | ✗ | ✓ |
diff --git a/docs/ja/cli/general.mdx b/docs/ja/cli/general.mdx
index ab24a14e..e31d7cf3 100644
--- a/docs/ja/cli/general.mdx
+++ b/docs/ja/cli/general.mdx
@@ -25,6 +25,14 @@ description: ステータスの確認、設定管理、コンテキスト制御
/status
```
+## cancel
+
+現在のセッションで実行中の Agent タスクを中止します。Agent が長時間のタスク(マルチターンのツール呼び出しや長いストリーミング応答など)を実行している間、`/cancel` を送信すると、次のツール実行の前に停止します。Web、WeChat、企業微信、Feishu など、すべてのチャネルで利用可能です。
+
+```text
+/cancel
+```
+
## config
実行時設定の表示または変更を行います。変更は即座に反映され、再起動は不要です。
diff --git a/docs/ja/cli/index.mdx b/docs/ja/cli/index.mdx
index f8e28eff..4d00a654 100644
--- a/docs/ja/cli/index.mdx
+++ b/docs/ja/cli/index.mdx
@@ -57,6 +57,7 @@ Web コンソールや接続されたチャネルの会話で `/` を入力す
| --- | --- |
| `/help` | コマンドヘルプを表示 |
| `/status` | サービスの状態と設定を表示 |
+| `/cancel` | 実行中の Agent タスクを中止 |
| `/config` | 実行時設定の表示・変更 |
| `/skill` | スキル管理(インストール、アンインストール、有効化、無効化など) |
| `/memory dream [N]` | 記憶蒸留を手動トリガー(デフォルト 3 日、最大 30) |
@@ -80,6 +81,7 @@ Web コンソールや接続されたチャネルの会話で `/` を入力す
| version | ✓ | ✓ |
| status | ✓ | ✓ |
| logs | ✓ | ✓ |
+| cancel | ✗ | ✓ |
| config | ✗ | ✓ |
| context | — | ✓ |
| memory(サブコマンド) | ✗ | ✓ |
diff --git a/plugins/cow_cli/cow_cli.py b/plugins/cow_cli/cow_cli.py
index fc721f51..b7dc6371 100644
--- a/plugins/cow_cli/cow_cli.py
+++ b/plugins/cow_cli/cow_cli.py
@@ -26,16 +26,19 @@ from common.log import logger
from cli import __version__
-# Known top-level subcommands that cow supports
+# Known top-level subcommands that cow supports.
+# "start" / "stop" / "restart" refer to daemon lifecycle on the host shell;
+# in chat, "/cancel" aborts the in-flight agent run instead.
KNOWN_COMMANDS = {
"help", "version", "status", "logs",
"start", "stop", "restart",
+ "cancel",
"skill", "context", "config",
"knowledge", "memory",
"install-browser",
}
-# Commands that can only run from the CLI (terminal), not in chat
+# Commands that can only run from the CLI (terminal), not in chat.
CLI_ONLY_COMMANDS = {"start", "stop", "restart"}
# Commands that can only run from chat (need access to in-process memory)
@@ -225,6 +228,7 @@ class CowCliPlugin(Plugin):
" /help 显示此帮助",
" /version 查看版本",
" /status 查看运行状态",
+ " /cancel 中止当前正在运行的 Agent 任务",
" /logs [N] 查看最近N条日志 (默认20)",
" /context 查看当前对话上下文信息",
" /context clear 清除当前对话上下文",
@@ -250,6 +254,41 @@ class CowCliPlugin(Plugin):
def _cmd_version(self, args: str, e_context, **_) -> str:
return f"CowAgent v{__version__}"
+ # ------------------------------------------------------------------
+ # cancel — abort the in-flight agent run for the current session.
+ # Fallback handler; in practice chat_channel/web_channel intercept
+ # /cancel earlier so it bypasses the per-session serial queue.
+ # ------------------------------------------------------------------
+
+ def _cmd_cancel(self, args: str, e_context: EventContext, session_id: str = "", **_) -> str:
+ """Signal the running agent to halt at its next checkpoint."""
+ from agent.protocol import get_cancel_registry
+
+ target_session = self._get_session_id(e_context, fallback=session_id)
+ registry = get_cancel_registry()
+
+ # Prefer per-turn request_id (matches the key agent_bridge registered)
+ cancelled = 0
+ request_id = ""
+ if e_context is not None:
+ try:
+ ctx = e_context["context"]
+ request_id = ctx.kwargs.get("request_id") or ctx.get("request_id", "")
+ except Exception:
+ request_id = ""
+
+ if request_id and registry.cancel_request(request_id):
+ cancelled = 1
+
+ # Fall back to session-wide cancel
+ if cancelled == 0 and target_session:
+ cancelled = registry.cancel_session(target_session)
+
+ if cancelled <= 0:
+ return "当前没有可中止的任务。"
+
+ return "🛑 已中止"
+
# ------------------------------------------------------------------
# status
# ------------------------------------------------------------------