feat(agent): support user-initiated cancel for in-flight agent runs

This commit is contained in:
zhayujie
2026-05-26 23:36:09 +08:00
parent ad2db1a776
commit 8d67177a1b
19 changed files with 691 additions and 22 deletions

View File

@@ -3,6 +3,11 @@ from .agent_stream import AgentStreamExecutor
from .task import Task, TaskType, TaskStatus
from .result import AgentResult, AgentAction, AgentActionType, ToolResult
from .models import LLMModel, LLMRequest, ModelFactory
from .cancel import (
AgentCancelledError,
CancelTokenRegistry,
get_cancel_registry,
)
__all__ = [
'Agent',
@@ -16,5 +21,8 @@ __all__ = [
'ToolResult',
'LLMModel',
'LLMRequest',
'ModelFactory'
'ModelFactory',
'AgentCancelledError',
'CancelTokenRegistry',
'get_cancel_registry',
]

View File

@@ -365,7 +365,8 @@ class Agent:
return action
def run_stream(self, user_message: str, on_event=None, clear_history: bool = False, skill_filter=None) -> str:
def run_stream(self, user_message: str, on_event=None, clear_history: bool = False,
skill_filter=None, cancel_event=None) -> str:
"""
Execute single agent task with streaming (based on tool-call)
@@ -374,6 +375,7 @@ class Agent:
- Multi-turn reasoning based on tool-call
- Event callbacks
- Persistent conversation history across calls
- User-initiated cancellation via ``cancel_event``
Args:
user_message: User message
@@ -381,6 +383,11 @@ class Agent:
event = {"type": str, "timestamp": float, "data": dict}
clear_history: If True, clear conversation history before this call (default: False)
skill_filter: Optional list of skill names to include in this run
cancel_event: Optional threading.Event polled at agent checkpoints.
When set, the loop exits at the next safe point, injects a
"[Interrupted by user]" assistant note, and returns the
partial response. ``messages`` stays in a valid state
(tool_use/tool_result pairs preserved).
Returns:
Final response text
@@ -424,7 +431,8 @@ class Agent:
max_turns=self.max_steps,
on_event=on_event,
messages=messages_copy, # Pass copied message history
max_context_turns=max_context_turns
max_context_turns=max_context_turns,
cancel_event=cancel_event,
)
# Execute

View File

@@ -7,6 +7,7 @@ import json
import time
from typing import List, Dict, Any, Optional, Callable, Tuple
from agent.protocol.cancel import AgentCancelledError
from agent.protocol.models import LLMRequest, LLMModel
from agent.protocol.message_utils import sanitize_claude_messages, compress_turn_to_text_only
from agent.tools.base_tool import BaseTool, ToolResult
@@ -64,7 +65,8 @@ class AgentStreamExecutor:
max_turns: int = 50,
on_event: Optional[Callable] = None,
messages: Optional[List[Dict]] = None,
max_context_turns: int = 30
max_context_turns: int = 30,
cancel_event=None,
):
"""
Initialize stream executor
@@ -78,6 +80,10 @@ class AgentStreamExecutor:
on_event: Event callback function
messages: Optional existing message history (for persistent conversations)
max_context_turns: Maximum number of conversation turns to keep in context
cancel_event: Optional threading.Event used to signal user cancel.
Checked at every safe point (turn boundary, before tool execution,
during LLM streaming). When set, raises AgentCancelledError which
run_stream catches to gracefully wind down.
"""
self.agent = agent
self.model = model
@@ -87,6 +93,7 @@ class AgentStreamExecutor:
self.max_turns = max_turns
self.on_event = on_event
self.max_context_turns = max_context_turns
self.cancel_event = cancel_event
# Message history - use provided messages or create new list
self.messages = messages if messages is not None else []
@@ -97,6 +104,73 @@ class AgentStreamExecutor:
# Track files to send (populated by read tool)
self.files_to_send = [] # List of file metadata dicts
def _check_cancelled(self) -> None:
"""Raise AgentCancelledError if the user requested cancellation.
Called at safe points (turn start, between tool calls, between LLM
chunks). Cheap to call: just an Event.is_set() probe.
"""
if self.cancel_event is not None and self.cancel_event.is_set():
raise AgentCancelledError("agent cancelled by user")
def _handle_cancelled(self, partial_response: str) -> None:
"""Wind down ``self.messages`` after a user-initiated cancel.
The messages list may be in any of these states when we get here:
(a) Last message is an assistant message containing tool_use
blocks but the matching tool_result has not been appended yet.
(b) Last message is an assistant text-only reply (cancel happened
right before the next turn started).
(c) Last message is a user tool_result message and we cancelled
between turns.
For (a) we MUST synthesise tool_result blocks, otherwise the next
request will fail Claude/OpenAI's strict pairing validation. For
(b)/(c) the state is already valid and we just append a small
cancellation note so the user/LLM both see the boundary clearly.
"""
try:
# Step 1: close any orphaned tool_use in the trailing assistant
# message by injecting matching tool_result blocks.
if self.messages and isinstance(self.messages[-1], dict) \
and self.messages[-1].get("role") == "assistant":
last = self.messages[-1]
content = last.get("content")
if isinstance(content, list):
pending_tool_use_ids = [
block.get("id")
for block in content
if isinstance(block, dict) and block.get("type") == "tool_use"
]
pending_tool_use_ids = [tid for tid in pending_tool_use_ids if tid]
if pending_tool_use_ids:
tool_result_blocks = [
{
"type": "tool_result",
"tool_use_id": tid,
"content": "Cancelled by user before this tool finished.",
"is_error": True,
}
for tid in pending_tool_use_ids
]
self.messages.append({
"role": "user",
"content": tool_result_blocks,
})
logger.info(
f"[Agent] Injected {len(tool_result_blocks)} cancellation "
f"tool_result blocks to keep message history valid"
)
# Step 2: append a stable "interrupted" marker so the LLM sees a
# clear stop boundary on the next turn.
self.messages.append({
"role": "assistant",
"content": [{"type": "text", "text": "_(Cancelled by user)_"}],
})
except Exception as e:
logger.warning(f"[Agent] _handle_cancelled cleanup failed: {e}")
def _emit_event(self, event_type: str, data: dict = None):
"""Emit event"""
if self.on_event:
@@ -270,8 +344,13 @@ class AgentStreamExecutor:
final_response = ""
turn = 0
cancelled = False
try:
while turn < self.max_turns:
# Check at the very top of every turn so a cancel arriving
# between turns short-circuits cleanly.
self._check_cancelled()
turn += 1
logger.info(f"[Agent] 第 {turn}")
self._emit_event("turn_start", {"turn": turn})
@@ -375,6 +454,8 @@ class AgentStreamExecutor:
try:
for tool_call in tool_calls:
# Honour cancel between tool invocations within the same turn
self._check_cancelled()
result = self._execute_tool(tool_call)
tool_results.append(result)
@@ -557,6 +638,15 @@ class AgentStreamExecutor:
self.messages.pop(prompt_insert_idx)
logger.debug("[Agent] Removed injected max-steps prompt from message history")
except AgentCancelledError:
# User-initiated stop: wind down message history cleanly so the
# next turn is unaffected; channels emit a "cancelled" UI event.
cancelled = True
logger.info(f"[Agent] 🛑 已被用户中止 (第 {turn} 轮)")
self._handle_cancelled(final_response)
if not final_response or not final_response.strip():
final_response = "_(Cancelled)_"
except Exception as e:
logger.error(f"❌ Agent执行错误: {e}")
self._emit_event("error", {"error": str(e)})
@@ -564,8 +654,11 @@ class AgentStreamExecutor:
finally:
final_response = final_response.strip() if final_response else final_response
logger.info(f"[Agent] 🏁 完成 ({turn}轮)")
self._emit_event("agent_end", {"final_response": final_response})
if cancelled:
# Emit before agent_end so channels can mark UI as cancelled
self._emit_event("agent_cancelled", {"final_response": final_response})
logger.info(f"[Agent] 🏁 完成 ({turn}轮)" + (" [cancelled]" if cancelled else ""))
self._emit_event("agent_end", {"final_response": final_response, "cancelled": cancelled})
return final_response
@@ -644,7 +737,32 @@ class AgentStreamExecutor:
try:
stream = self.model.call_stream(request)
# Probe cancel every N chunks to bound reaction time without
# checking on every token.
_cancel_probe_counter = 0
_CANCEL_PROBE_EVERY = 8
for chunk in stream:
_cancel_probe_counter += 1
if _cancel_probe_counter >= _CANCEL_PROBE_EVERY:
_cancel_probe_counter = 0
if self.cancel_event is not None and self.cancel_event.is_set():
# Persist partial text only; tool_use args may be
# truncated mid-stream and would fail validation.
logger.info("[Agent] cancel detected mid-stream, aborting LLM call")
if full_content:
partial_msg = {
"role": "assistant",
"content": [{"type": "text", "text": full_content}],
}
self.messages.append(partial_msg)
self._emit_event("message_end", {
"content": full_content,
"tool_calls": [],
"cancelled": True,
})
raise AgentCancelledError("cancelled during LLM streaming")
# Check for errors
if isinstance(chunk, dict) and chunk.get("error"):
# Extract error message from nested structure
@@ -738,6 +856,10 @@ class AgentStreamExecutor:
elif isinstance(choice, dict) and choice.get("_gemini_raw_parts"):
gemini_raw_parts = choice["_gemini_raw_parts"]
except AgentCancelledError:
# Must propagate untouched; never treat as a retryable error.
raise
except Exception as e:
error_str = str(e)
error_str_lower = error_str.lower()

121
agent/protocol/cancel.py Normal file
View File

@@ -0,0 +1,121 @@
"""
Cancel token registry for aborting in-flight agent runs.
A user cancel (web Cancel button, /cancel command) sets a threading.Event
that the agent loop polls at safe checkpoints. Tokens are keyed by
request_id (preferred) and tracked under session_id as a fallback. Entries
are released after the run completes to keep the registry bounded.
No project deps — importable from any layer without circular imports.
"""
from __future__ import annotations
import threading
from typing import Dict, Optional
class AgentCancelledError(Exception):
"""Raised inside the agent loop when a stop has been requested.
The agent stream executor catches this, injects a "[Interrupted]" note
into the message history (preserving tool_use/tool_result integrity)
and returns a partial response to the caller.
"""
class _CancelEntry:
__slots__ = ("event", "session_id")
def __init__(self, session_id: Optional[str]):
self.event = threading.Event()
self.session_id = session_id
class CancelTokenRegistry:
"""In-process registry mapping request_id -> cancel Event.
Thread-safe. Singleton via module-level ``_registry``.
"""
def __init__(self):
self._lock = threading.Lock()
self._by_request: Dict[str, _CancelEntry] = {}
# session_id -> set of request_ids currently in flight (usually 1).
self._by_session: Dict[str, set] = {}
def register(self, request_id: str, session_id: Optional[str] = None) -> threading.Event:
"""Create (or return existing) cancel event for a request.
Returns the threading.Event the caller should poll via ``is_set()``.
"""
if not request_id:
return threading.Event()
with self._lock:
entry = self._by_request.get(request_id)
if entry is None:
entry = _CancelEntry(session_id)
self._by_request[request_id] = entry
if session_id:
self._by_session.setdefault(session_id, set()).add(request_id)
return entry.event
def get_event(self, request_id: str) -> Optional[threading.Event]:
if not request_id:
return None
with self._lock:
entry = self._by_request.get(request_id)
return entry.event if entry else None
def cancel_request(self, request_id: str) -> bool:
"""Trigger cancel for a specific request. Returns True when matched."""
if not request_id:
return False
with self._lock:
entry = self._by_request.get(request_id)
if entry is None:
return False
entry.event.set()
return True
def cancel_session(self, session_id: str) -> int:
"""Trigger cancel for every in-flight request of a session.
Returns the number of requests cancelled (0 when nothing was running).
"""
if not session_id:
return 0
with self._lock:
request_ids = list(self._by_session.get(session_id, ()))
entries = [self._by_request[r] for r in request_ids if r in self._by_request]
for entry in entries:
entry.event.set()
return len(entries)
def unregister(self, request_id: str) -> None:
"""Remove an entry once the agent run is done. Safe to call twice."""
if not request_id:
return
with self._lock:
entry = self._by_request.pop(request_id, None)
if entry and entry.session_id:
bucket = self._by_session.get(entry.session_id)
if bucket is not None:
bucket.discard(request_id)
if not bucket:
self._by_session.pop(entry.session_id, None)
def has_active(self, session_id: str) -> bool:
if not session_id:
return False
with self._lock:
bucket = self._by_session.get(session_id)
return bool(bucket)
_registry = CancelTokenRegistry()
def get_cancel_registry() -> CancelTokenRegistry:
"""Module-level accessor for the singleton registry."""
return _registry

View File

@@ -5,7 +5,7 @@ Agent Bridge - Integrates Agent system with existing COW bridge
import os
from typing import Optional, List
from agent.protocol import Agent, LLMModel, LLMRequest
from agent.protocol import Agent, LLMModel, LLMRequest, get_cancel_registry
from bridge.agent_event_handler import AgentEventHandler
from bridge.agent_initializer import AgentInitializer
from bridge.bridge import Bridge
@@ -399,10 +399,21 @@ class AgentBridge:
"""
session_id = None
agent = None
request_id = None
cancel_event = None
try:
# Extract session_id from context for user isolation
if context:
session_id = context.kwargs.get("session_id") or context.get("session_id")
request_id = context.kwargs.get("request_id") or context.get("request_id")
# Register a cancel token. Prefer per-turn request_id (web),
# fall back to session_id (IM channels). The Event is polled by
# AgentStreamExecutor at safe checkpoints.
registry = get_cancel_registry()
token_key = request_id or session_id
if token_key:
cancel_event = registry.register(token_key, session_id=session_id)
# Get agent for this session (will auto-initialize if needed)
agent = self.get_agent(session_id=session_id)
@@ -458,7 +469,8 @@ class AgentBridge:
response = agent.run_stream(
user_message=query,
on_event=event_handler.handle_event,
clear_history=clear_history
clear_history=clear_history,
cancel_event=cancel_event,
)
finally:
# Restore original tools
@@ -468,6 +480,13 @@ class AgentBridge:
# Log execution summary
event_handler.log_summary()
# Release cancel token; keep registry bounded.
if token_key:
try:
registry.unregister(token_key)
except Exception:
pass
# Persist new messages generated during this run
if session_id:
channel_type = (context.get("channel_type") or "") if context else ""
@@ -521,6 +540,12 @@ class AgentBridge:
logger.info(f"[AgentBridge] Cleared DB for session after error: {session_id}")
except Exception as db_err:
logger.warning(f"[AgentBridge] Failed to clear DB after error: {db_err}")
# Release cancel token on error path too (idempotent).
if cancel_event is not None and (request_id or session_id):
try:
get_cancel_registry().unregister(request_id or session_id)
except Exception:
pass
return Reply(ReplyType.ERROR, f"Agent error: {str(e)}")
def _schedule_mcp_hot_reload(self, agent):

View File

@@ -438,8 +438,21 @@ class ChatChannel(Channel):
return func
# Chat commands that must bypass the per-session serial queue,
# otherwise /cancel would queue behind the task it tries to cancel.
# Use /cancel (not /stop) to avoid colliding with `cow stop` CLI.
_BYPASS_QUEUE_COMMANDS = ("/cancel",)
def produce(self, context: Context):
session_id = context["session_id"]
# Fast path: /cancel must not enter the queue.
if context.type == ContextType.TEXT and context.content:
stripped = context.content.strip().lower()
if stripped in self._BYPASS_QUEUE_COMMANDS:
self._handle_cancel_command(context, session_id)
return
with self.lock:
if session_id not in self.sessions:
self.sessions[session_id] = [
@@ -451,6 +464,29 @@ class ChatChannel(Channel):
else:
self.sessions[session_id][0].put(context)
def _handle_cancel_command(self, context: Context, session_id: str) -> None:
"""Cancel any in-flight agent run for *session_id* and reply inline.
Runs synchronously on the caller's thread. Reply is sent through
_send_reply so plugins (e.g. logging) still observe it.
"""
try:
from agent.protocol import get_cancel_registry
from bridge.reply import Reply, ReplyType
cancelled = get_cancel_registry().cancel_session(session_id)
text = (
"🛑 已中止"
if cancelled > 0
else "当前没有可中止的任务。"
)
logger.info(
f"[chat_channel] /cancel fast-path: session={session_id}, cancelled={cancelled}"
)
self._send_reply(context, Reply(ReplyType.TEXT, text))
except Exception as e:
logger.warning(f"[chat_channel] /cancel fast-path failed: {e}")
# 消费者函数,单独线程,用于从消息队列中取出消息并处理
def consume(self):
while True:

View File

@@ -752,6 +752,9 @@ class FeiShuChanel(ChatChannel):
init_in_flight = [False]
# 一旦初始化失败就长期标记为 disabled本次回复不再尝试任何流式调用
disabled = [False]
# True after agent_cancelled: agent_end stops rewriting the card
# with stale final_response and just finalizes current content.
cancelled = [False]
lock = threading.Lock()
# ---- 异步推送队列 ----------------------------------------------------
@@ -1076,18 +1079,42 @@ class FeiShuChanel(ChatChannel):
message_id[0] = None
sequence[0] = 0
elif event_type == "agent_cancelled":
# Lock channel into "no-rewrite" mode: the subsequent
# agent_end's final_response is from the last *completed*
# turn (the user already saw it), so rewriting the card
# would duplicate it visually.
with lock:
cancelled[0] = True
elif event_type == "agent_end":
# 最终回复:用 final_response 覆盖当前流式卡片,然后关闭流式模式。
final_response = data.get("final_response", "")
if not final_response:
return
final_text = str(final_response)
# 标记 streamed 让 chat_channel 跳过 send()
context["feishu_streamed"] = True
with lock:
was_cancelled = cancelled[0]
has_card = card_id[0] is not None
init_busy = init_in_flight[0]
pending_text = current_text[0]
if was_cancelled:
# Cancelled path: finalize the in-flight card with
# partial output (or a short marker if empty); drop
# stale final_response to avoid duplicating last turn.
if has_card:
_drain_push_queue()
partial = (pending_text or "").rstrip()
final_text = partial or "_(已中止)_"
_stream_update_text(final_text)
_close_streaming_mode(final_text)
push_queue.put(None)
return
if not final_response:
return
final_text = str(final_response)
# 罕见情况agent_end 触发时还没创建过卡片(极快返回 / 没有
# message_update主动创建一张承载 final_text。

View File

@@ -445,7 +445,7 @@
bg-primary-400 text-white hover:bg-primary-500
disabled:bg-slate-300 dark:disabled:bg-slate-600
disabled:cursor-not-allowed cursor-pointer transition-colors duration-150"
disabled onclick="sendMessage()">
disabled>
<i class="fas fa-paper-plane text-sm"></i>
</button>
</div>

View File

@@ -1367,3 +1367,35 @@
text-align: right;
}
.voice-pill audio { display: none; }
/* Send button toggles into a Stop button while an SSE stream is in flight.
Match the look of the disabled send button (light grey block + white
glyph) so it reads as the same visual element, just paused/idle from
sending perspective and clickable to stop. */
#send-btn.send-btn-cancel {
background-color: rgb(203 213 225) !important; /* slate-300, == disabled send-btn */
color: white !important;
}
#send-btn.send-btn-cancel:hover {
background-color: rgb(148 163 184) !important; /* slate-400 */
}
#send-btn.send-btn-cancel:disabled {
background-color: rgb(226 232 240) !important; /* slate-200, while stop is in flight */
color: white !important;
cursor: progress;
}
.dark #send-btn.send-btn-cancel {
background-color: rgb(71 85 105) !important; /* slate-600, == dark disabled send-btn */
color: white !important;
}
.dark #send-btn.send-btn-cancel:hover {
background-color: rgb(100 116 139) !important; /* slate-500 */
}
.dark #send-btn.send-btn-cancel:disabled {
background-color: rgb(51 65 85) !important; /* slate-700 */
color: rgb(203 213 225) !important;
}
.agent-cancelled-tag {
font-style: italic;
}

View File

@@ -1016,7 +1016,60 @@ const inputHistory = [];
let historyIdx = -1;
let historySavedDraft = '';
// While an SSE stream is in flight, the send button morphs into a cancel
// button. Only one in-flight request is supported at a time.
let activeRequestId = null;
let sendBtnMode = 'send'; // 'send' | 'cancel'
function setSendBtnCancelMode(requestId) {
activeRequestId = requestId;
sendBtnMode = 'cancel';
sendBtn.disabled = false;
sendBtn.classList.add('send-btn-cancel');
sendBtn.title = (currentLang === 'zh' ? '中止' : 'Cancel');
sendBtn.innerHTML = '<i class="fas fa-stop text-sm"></i>';
}
function resetSendBtnSendMode() {
activeRequestId = null;
sendBtnMode = 'send';
sendBtn.classList.remove('send-btn-cancel');
sendBtn.title = '';
sendBtn.innerHTML = '<i class="fas fa-paper-plane text-sm"></i>';
updateSendBtnState();
}
function requestCancel() {
const reqId = activeRequestId;
if (!reqId) return;
fetch('/cancel', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ request_id: reqId, session_id: sessionId, lang: currentLang }),
}).catch(err => {
console.warn('[cancel] request failed', err);
});
// Optimistic UI lock so the click visibly registers before the SSE
// "cancelled" event arrives.
sendBtn.disabled = true;
sendBtn.title = (currentLang === 'zh' ? '已中止' : 'Cancelled');
}
// Button click is the only path to Cancel. Pressing Enter still calls
// sendMessage() so users can submit "/cancel" as a regular slash command.
sendBtn.addEventListener('click', () => {
if (sendBtnMode === 'cancel') {
requestCancel();
} else {
sendMessage();
}
});
function updateSendBtnState() {
if (sendBtnMode === 'cancel') {
// Don't downgrade a Cancel button on input edits.
return;
}
sendBtn.disabled = uploadingCount > 0 || (!chatInput.value.trim() && pendingAttachments.length === 0);
}
@@ -1264,6 +1317,7 @@ const SLASH_COMMANDS = [
{ cmd: '/knowledge on', desc: '开启知识库' },
{ cmd: '/knowledge off', desc: '关闭知识库' },
{ cmd: '/config', desc: '查看当前配置' },
{ cmd: '/cancel', desc: '中止当前正在运行的 Agent 任务' },
{ cmd: '/logs', desc: '查看最近日志' },
{ cmd: '/version', desc: '查看版本' },
];
@@ -1534,6 +1588,7 @@ function sendVoiceMessage(text, audioUrl) {
stream: true,
timestamp: timestamp.toISOString(),
is_voice: true,
lang: currentLang,
};
const MAX_RETRIES = 2;
@@ -1547,7 +1602,12 @@ function sendVoiceMessage(text, audioUrl) {
.then(r => r.json())
.then(data => {
if (data.status === 'success') {
if (data.stream) {
if (data.inline_reply) {
// Synchronous fast-path reply (e.g. /cancel); skip SSE.
loadingEl.remove();
addBotMessage(data.inline_reply, new Date());
} else if (data.stream) {
setSendBtnCancelMode(data.request_id);
startSSE(data.request_id, loadingEl, timestamp, titleInfo);
} else {
loadingContainers[data.request_id] = loadingEl;
@@ -1555,6 +1615,7 @@ function sendVoiceMessage(text, audioUrl) {
} else {
loadingEl.remove();
addBotMessage(t('error_send'), new Date());
resetSendBtnSendMode();
}
})
.catch(err => {
@@ -1591,6 +1652,10 @@ function addUserVoiceMessage(audioUrl, caption, timestamp) {
}
function sendMessage() {
// Do NOT branch on sendBtnMode here: Enter should always send (so
// typing "/cancel" submits normally). Cancel is wired only to the
// send button's pointer click — see send-btn listener above.
const text = chatInput.value.trim();
if (!text && pendingAttachments.length === 0) return;
@@ -1619,7 +1684,7 @@ function sendMessage() {
renderAttachmentPreview();
sendBtn.disabled = true;
const body = { session_id: sessionId, message: text, stream: true, timestamp: timestamp.toISOString() };
const body = { session_id: sessionId, message: text, stream: true, timestamp: timestamp.toISOString(), lang: currentLang };
if (attachments.length > 0) {
body.attachments = attachments.map(a => ({
file_path: a.file_path,
@@ -1641,7 +1706,13 @@ function sendMessage() {
.then(r => r.json())
.then(data => {
if (data.status === 'success') {
if (data.stream) {
if (data.inline_reply) {
// Channel handled synchronously (e.g. /cancel fast-path);
// render as a bot bubble and skip SSE entirely.
loadingEl.remove();
addBotMessage(data.inline_reply, new Date());
} else if (data.stream) {
setSendBtnCancelMode(data.request_id);
startSSE(data.request_id, loadingEl, timestamp, titleInfo);
} else {
loadingContainers[data.request_id] = loadingEl;
@@ -1649,12 +1720,14 @@ function sendMessage() {
} else {
loadingEl.remove();
addBotMessage(t('error_send'), new Date());
resetSendBtnSendMode();
}
})
.catch(err => {
if (err.name === 'AbortError') {
loadingEl.remove();
addBotMessage(t('error_timeout'), new Date());
resetSendBtnSendMode();
return;
}
if (attempt < MAX_RETRIES) {
@@ -1664,6 +1737,7 @@ function sendMessage() {
}
loadingEl.remove();
addBotMessage(t('error_send'), new Date());
resetSendBtnSendMode();
});
}
@@ -1919,14 +1993,33 @@ function startSSE(requestId, loadingEl, timestamp, titleInfo) {
stepsEl.appendChild(wrap);
scrollChatToBottom();
} else if (item.type === 'cancelled') {
// Agent acknowledged the stop; mark the bubble. A trailing
// "done" still arrives with the partial answer.
ensureBotEl();
if (currentReasoningEl) {
finalizeThinking(currentReasoningEl, reasoningStartTime, reasoningText);
currentReasoningEl = null;
reasoningText = '';
}
if (!botEl.querySelector('.agent-cancelled-tag')) {
const tag = document.createElement('div');
tag.className = 'agent-cancelled-tag text-xs text-amber-600 dark:text-amber-400 mt-1';
tag.textContent = (currentLang === 'zh') ? '已中止' : 'Cancelled';
stepsEl.appendChild(tag);
}
resetSendBtnSendMode();
} else if (item.type === 'done') {
// Don't close the stream yet: the backend keeps it open
// for a short tail to deliver async attachments such as
// TTS audio (`voice_attach`). It will close the stream on
// its own via onerror once the tail expires.
done = true;
resetSendBtnSendMode();
const finalText = item.content || accumulatedText;
const finalTextRaw = item.content || accumulatedText;
const finalText = localizeCancelMarker(finalTextRaw);
if (!botEl && finalText) {
if (loadingEl) { loadingEl.remove(); loadingEl = null; }
@@ -1934,7 +2027,7 @@ function startSSE(requestId, loadingEl, timestamp, titleInfo) {
} else if (botEl) {
contentEl.classList.remove('sse-streaming');
if (finalText) contentEl.innerHTML = renderMarkdown(finalText);
contentEl.dataset.rawMd = finalText || '';
contentEl.dataset.rawMd = finalTextRaw || '';
const copyBtn = botEl.querySelector('.copy-msg-btn');
if (copyBtn && finalText) copyBtn.style.display = '';
applyHighlighting(botEl);
@@ -1964,6 +2057,7 @@ function startSSE(requestId, loadingEl, timestamp, titleInfo) {
delete activeStreams[requestId];
if (loadingEl) { loadingEl.remove(); loadingEl = null; }
addBotMessage(t('error_send'), new Date());
resetSendBtnSendMode();
}
};
@@ -2000,6 +2094,7 @@ function startSSE(requestId, loadingEl, timestamp, titleInfo) {
applyHighlighting(botEl);
bindChatKnowledgeLinks(botEl);
}
resetSendBtnSendMode();
};
}
@@ -2238,13 +2333,23 @@ function _renderSentFileFromToolResult(step) {
`<i class="fas fa-file-download" style="color:#6b7280;"></i> ${escapeHtml(fileName)}</a></div>`;
}
// Cosmetic translator for cancel markers persisted in history.
// History keeps the English canonical form for the LLM; only display is localized.
function localizeCancelMarker(text) {
if (!text) return text;
if (currentLang !== 'zh') return text;
return text
.replace(/_\(Cancelled by user\)_/g, '_(用户已中止)_')
.replace(/_\(Cancelled\)_/g, '_(已中止)_');
}
function createBotMessageEl(content, timestamp, requestId, msg) {
const el = document.createElement('div');
el.className = 'flex gap-3 px-4 sm:px-6 py-3';
if (requestId) el.dataset.requestId = requestId;
let stepsHtml = '';
let displayContent = content;
let displayContent = localizeCancelMarker(content);
if (msg && msg.steps && msg.steps.length > 0) {
// New format: ordered steps with interleaved content

View File

@@ -93,6 +93,15 @@ def _require_auth():
json.dumps({"status": "error", "message": "Unauthorized"}))
# Localized text for /cancel system replies. Web is the only channel that
# honors a per-request `lang`; other channels reply in Chinese by default.
def _cancel_reply_text(cancelled: int, lang: str) -> str:
en = lang.startswith("en")
if cancelled > 0:
return "🛑 Cancelled." if en else "🛑 已中止"
return "Nothing to cancel." if en else "当前没有可中止的任务。"
def _get_upload_dir() -> str:
from common.utils import expand_path
ws_root = expand_path(conf().get("agent_workspace", "~/cow"))
@@ -437,6 +446,18 @@ class WebChannel(ChatChannel):
"timestamp": time.time(),
})
elif event_type == "agent_cancelled":
# Push an explicit cancelled SSE event so the frontend
# marks the bubble as stopped. A trailing "done" still
# arrives with the partial answer.
final_response = data.get("final_response", "")
q.put({
"type": "cancelled",
"content": final_response,
"request_id": request_id,
"timestamp": time.time(),
})
elif event_type == "agent_end":
# Safety net: if the agent finishes with an empty final_response,
# chat_channel skips _send_reply (because reply.content is empty),
@@ -756,6 +777,25 @@ class WebChannel(ChatChannel):
# desire_rtype concept used by other channels).
is_voice_input = bool(json_data.get('is_voice', False))
# Fast path for /cancel: bypass the session queue and SSE setup.
# Web frontend (stream=true) only listens to SSE, so we return an
# inline_reply payload to be rendered synchronously.
stripped_prompt = (prompt or "").strip().lower()
if stripped_prompt == "/cancel":
from agent.protocol import get_cancel_registry
cancelled = get_cancel_registry().cancel_session(session_id)
lang = (json_data.get('lang') or 'zh').lower()
msg_text = _cancel_reply_text(cancelled, lang)
logger.info(
f"[WebChannel] /cancel fast-path: session={session_id}, cancelled={cancelled}, lang={lang}"
)
return json.dumps({
"status": "success",
"request_id": "",
"stream": False,
"inline_reply": msg_text,
})
# Append file references to the prompt (same format as QQ channel)
if attachments:
file_refs = []
@@ -862,6 +902,11 @@ class WebChannel(ChatChannel):
if itype == "done":
post_done = True
post_deadline = time.time() + POST_DONE_TAIL_SECONDS
elif itype == "cancelled":
# Close SSE tail quickly after cancel; don't wait for the
# full TTS tail since the user already pressed Stop.
post_done = True
post_deadline = time.time() + 3
elif itype == "voice_attach":
# WSGI buffers the previous chunk until the next yield;
# shrink the tail so the generator wakes up quickly to
@@ -872,6 +917,59 @@ class WebChannel(ChatChannel):
finally:
self.sse_queues.pop(request_id, None)
def cancel_request(self):
"""
Cancel an in-flight agent run.
Body: {"request_id": "...", "session_id": "..."}
Either field is sufficient; request_id is preferred when known.
Always returns success even when nothing was running, so the
client's UX is idempotent.
"""
try:
from agent.protocol import get_cancel_registry
data = web.data()
try:
json_data = json.loads(data) if data else {}
except Exception:
json_data = {}
request_id = (json_data.get("request_id") or "").strip()
session_id = (json_data.get("session_id") or "").strip()
lang = (json_data.get("lang") or "zh").lower()
registry = get_cancel_registry()
cancelled = 0
if request_id:
if registry.cancel_request(request_id):
cancelled = 1
if cancelled == 0 and session_id:
cancelled = registry.cancel_session(session_id)
if request_id and request_id in self.sse_queues:
self.sse_queues[request_id].put({
"type": "cancelled",
"content": "Cancelled" if lang.startswith("en") else "已中止",
"request_id": request_id,
"timestamp": time.time(),
})
logger.info(
f"[WebChannel] cancel request: request_id={request_id!r}, "
f"session_id={session_id!r}, cancelled={cancelled}"
)
return json.dumps({
"status": "success",
"cancelled": cancelled,
})
except Exception as e:
logger.error(f"[WebChannel] cancel_request error: {e}")
return json.dumps({"status": "error", "message": str(e)})
def poll_response(self):
"""
Poll for responses using the session_id.
@@ -967,6 +1065,7 @@ class WebChannel(ChatChannel):
'/api/voice/tts', 'VoiceTtsHandler',
'/poll', 'PollHandler',
'/stream', 'StreamHandler',
'/cancel', 'CancelHandler',
'/chat', 'ChatHandler',
'/config', 'ConfigHandler',
'/api/models', 'ModelsHandler',
@@ -1240,6 +1339,12 @@ class PollHandler:
return WebChannel().poll_response()
class CancelHandler:
def POST(self):
_require_auth()
return WebChannel().cancel_request()
class StreamHandler:
def GET(self):
_require_auth()

View File

@@ -440,6 +440,17 @@ class WecomBotChannel(ChatChannel):
state["current"] = ""
_push_stream(state, force=True)
elif event_type == "agent_cancelled":
# Flush partial output and strip trailing "---" separator
# left over from previous turn, to avoid a dangling divider.
if state["current"]:
state["committed"] += state["current"]
state["current"] = ""
state["committed"] = state["committed"].rstrip()
if state["committed"].endswith("---"):
state["committed"] = state["committed"][:-3].rstrip()
_push_stream(state, force=True)
return on_event
# ------------------------------------------------------------------

View File

@@ -39,6 +39,14 @@ Mode: agent
Session: 12 messages | 8 skills loaded
```
## cancel
中止当前会话正在运行的 Agent 任务。在 Agent 执行长时间任务(例如多轮工具调用、长流式输出)时,可随时发送 `/cancel`Agent 会在下一次工具执行前停止。Web 端、微信、企业微信、飞书等各通道均可使用。
```text
/cancel
```
## config
查看或修改运行时配置。修改后立即生效,无需重启服务。

View File

@@ -57,6 +57,7 @@ Others:
| --- | --- |
| `/help` | 显示命令帮助 |
| `/status` | 查看服务状态和配置 |
| `/cancel` | 中止当前正在运行的 Agent 任务 |
| `/config` | 查看或修改运行时配置 |
| `/skill` | 管理技能(安装、卸载、启用、禁用等) |
| `/memory dream [N]` | 手动触发记忆蒸馏(默认 3 天,最大 30 |
@@ -82,6 +83,7 @@ Others:
| version | ✓ | ✓ |
| status | ✓ | ✓ |
| logs | ✓ | ✓ |
| cancel | ✗ | ✓ |
| config | ✗ | ✓ |
| context | — | ✓ |
| memory (子命令) | ✗ | ✓ |

View File

@@ -25,6 +25,14 @@ View current session and service status, including process info, model configura
/status
```
## cancel
Abort the agent task currently running in this session. When the agent is busy with a long task (e.g. multi-turn tool calls or a long streaming response), send `/cancel` and the agent will stop before the next tool execution. Available across all channels — Web, WeChat, WeCom, Feishu, etc.
```text
/cancel
```
## config
View or modify runtime configuration. Changes take effect immediately without restarting.

View File

@@ -57,6 +57,7 @@ In the Web console or any connected channel, type `/` to see command suggestions
| --- | --- |
| `/help` | Show command help |
| `/status` | View service status and configuration |
| `/cancel` | Abort the currently running agent task |
| `/config` | View or modify runtime configuration |
| `/skill` | Manage skills (install, uninstall, enable, disable, etc.) |
| `/memory dream [N]` | Manually trigger memory distillation (default 3 days, max 30) |
@@ -80,6 +81,7 @@ In the Web console or any connected channel, type `/` to see command suggestions
| version | ✓ | ✓ |
| status | ✓ | ✓ |
| logs | ✓ | ✓ |
| cancel | ✗ | ✓ |
| config | ✗ | ✓ |
| context | — | ✓ |
| memory (subcommands) | ✗ | ✓ |

View File

@@ -25,6 +25,14 @@ description: ステータスの確認、設定管理、コンテキスト制御
/status
```
## cancel
現在のセッションで実行中の Agent タスクを中止します。Agent が長時間のタスク(マルチターンのツール呼び出しや長いストリーミング応答など)を実行している間、`/cancel` を送信すると、次のツール実行の前に停止します。Web、WeChat、企業微信、Feishu など、すべてのチャネルで利用可能です。
```text
/cancel
```
## config
実行時設定の表示または変更を行います。変更は即座に反映され、再起動は不要です。

View File

@@ -57,6 +57,7 @@ Web コンソールや接続されたチャネルの会話で `/` を入力す
| --- | --- |
| `/help` | コマンドヘルプを表示 |
| `/status` | サービスの状態と設定を表示 |
| `/cancel` | 実行中の Agent タスクを中止 |
| `/config` | 実行時設定の表示・変更 |
| `/skill` | スキル管理(インストール、アンインストール、有効化、無効化など) |
| `/memory dream [N]` | 記憶蒸留を手動トリガー(デフォルト 3 日、最大 30 |
@@ -80,6 +81,7 @@ Web コンソールや接続されたチャネルの会話で `/` を入力す
| version | ✓ | ✓ |
| status | ✓ | ✓ |
| logs | ✓ | ✓ |
| cancel | ✗ | ✓ |
| config | ✗ | ✓ |
| context | — | ✓ |
| memoryサブコマンド | ✗ | ✓ |

View File

@@ -26,16 +26,19 @@ from common.log import logger
from cli import __version__
# Known top-level subcommands that cow supports
# Known top-level subcommands that cow supports.
# "start" / "stop" / "restart" refer to daemon lifecycle on the host shell;
# in chat, "/cancel" aborts the in-flight agent run instead.
KNOWN_COMMANDS = {
"help", "version", "status", "logs",
"start", "stop", "restart",
"cancel",
"skill", "context", "config",
"knowledge", "memory",
"install-browser",
}
# Commands that can only run from the CLI (terminal), not in chat
# Commands that can only run from the CLI (terminal), not in chat.
CLI_ONLY_COMMANDS = {"start", "stop", "restart"}
# Commands that can only run from chat (need access to in-process memory)
@@ -225,6 +228,7 @@ class CowCliPlugin(Plugin):
" /help 显示此帮助",
" /version 查看版本",
" /status 查看运行状态",
" /cancel 中止当前正在运行的 Agent 任务",
" /logs [N] 查看最近N条日志 (默认20)",
" /context 查看当前对话上下文信息",
" /context clear 清除当前对话上下文",
@@ -250,6 +254,41 @@ class CowCliPlugin(Plugin):
def _cmd_version(self, args: str, e_context, **_) -> str:
return f"CowAgent v{__version__}"
# ------------------------------------------------------------------
# cancel — abort the in-flight agent run for the current session.
# Fallback handler; in practice chat_channel/web_channel intercept
# /cancel earlier so it bypasses the per-session serial queue.
# ------------------------------------------------------------------
def _cmd_cancel(self, args: str, e_context: EventContext, session_id: str = "", **_) -> str:
"""Signal the running agent to halt at its next checkpoint."""
from agent.protocol import get_cancel_registry
target_session = self._get_session_id(e_context, fallback=session_id)
registry = get_cancel_registry()
# Prefer per-turn request_id (matches the key agent_bridge registered)
cancelled = 0
request_id = ""
if e_context is not None:
try:
ctx = e_context["context"]
request_id = ctx.kwargs.get("request_id") or ctx.get("request_id", "")
except Exception:
request_id = ""
if request_id and registry.cancel_request(request_id):
cancelled = 1
# Fall back to session-wide cancel
if cancelled == 0 and target_session:
cancelled = registry.cancel_session(target_session)
if cancelled <= 0:
return "当前没有可中止的任务。"
return "🛑 已中止"
# ------------------------------------------------------------------
# status
# ------------------------------------------------------------------