Files
chatgpt-on-wechat/channel/web/web_channel.py
zhayujie fff7326209 feat(memory): hot-swap embedding provider on rebuild-index
Switching embedding provider in the web console no longer requires a
restart and no longer drops the running conversation
2026-05-20 21:32:53 +08:00

3052 lines
130 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import hashlib
import hmac
import time
import json
import logging
import mimetypes
import os
import threading
import time
import uuid
from queue import Queue, Empty
from typing import List, Tuple
import web
from bridge.context import *
from bridge.reply import Reply, ReplyType
from channel.chat_channel import ChatChannel, check_prefix
from channel.chat_message import ChatMessage
from collections import OrderedDict
from common import const
from common.log import logger
from common.singleton import singleton
from config import conf
# Lower-case file extensions (with leading dot) that upload_file reports as file_type "image".
IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".svg"}
# Lower-case file extensions (with leading dot) that upload_file reports as file_type "video".
VIDEO_EXTENSIONS = {".mp4", ".webm", ".avi", ".mov", ".mkv"}
def _is_password_enabled():
    """Tell whether a non-empty ``web_password`` is present in the configuration."""
    configured_password = conf().get("web_password", "")
    return True if configured_password else False
def _session_expire_seconds():
    """Return the login lifetime in seconds, from ``web_session_expire_days`` (default 30)."""
    seconds_per_day = 86400
    days = int(conf().get("web_session_expire_days", 30))
    return days * seconds_per_day
def _create_auth_token():
    """Build a stateless signed token shaped ``<timestamp_hex>.<hmac_hex>``.

    The signature is an HMAC-SHA256 over the hex timestamp, keyed with the
    configured ``web_password``.
    """
    issued_at_hex = f"{int(time.time()):x}"
    key = conf().get("web_password", "").encode()
    mac = hmac.new(key, issued_at_hex.encode(), hashlib.sha256)
    return issued_at_hex + "." + mac.hexdigest()
def _verify_auth_token(token):
"""Verify a signed token is valid and not expired.
The token is derived from the password, so it survives server restarts
and automatically invalidates when the password changes.
"""
if not token or "." not in token:
return False
ts_hex, sig = token.split(".", 1)
try:
ts = int(ts_hex, 16)
except ValueError:
return False
if time.time() - ts > _session_expire_seconds():
return False
expected = hmac.new(
conf().get("web_password", "").encode(),
ts_hex.encode(),
hashlib.sha256,
).hexdigest()
return hmac.compare_digest(sig, expected)
def _check_auth():
    """Report whether the current request may proceed.

    Always True when no password is configured; otherwise the result of
    validating the ``cow_auth_token`` cookie.
    """
    if _is_password_enabled():
        cookie_token = web.cookies().get("cow_auth_token", "")
        return _verify_auth_token(cookie_token)
    return True
def _require_auth():
    """Abort the request with a JSON 401 unless it is authenticated.

    Intended as the first statement of every protected handler.
    """
    if _check_auth():
        return
    headers = {"Content-Type": "application/json; charset=utf-8"}
    body = json.dumps({"status": "error", "message": "Unauthorized"})
    raise web.HTTPError("401 Unauthorized", headers, body)
def _get_upload_dir() -> str:
    """Return the ``tmp`` directory under the agent workspace, creating it if needed."""
    from common.utils import expand_path

    workspace_root = expand_path(conf().get("agent_workspace", "~/cow"))
    upload_dir = os.path.join(workspace_root, "tmp")
    os.makedirs(upload_dir, exist_ok=True)
    return upload_dir
def _sanitize_upload_relative_path(relative_path: str) -> str:
"""Normalize relative upload path and reject escapes / absolute paths."""
relative_path = (relative_path or "").replace("\\", "/").strip("/")
if not relative_path:
raise ValueError("Empty relative path")
parts = []
for part in relative_path.split("/"):
if part in ("", "."):
continue
if part == "..":
raise ValueError("Invalid relative path")
parts.append(part)
if not parts:
raise ValueError("Invalid relative path")
norm_path = "/".join(parts)
if os.path.isabs(norm_path):
raise ValueError("Invalid relative path")
return norm_path
def _sanitize_upload_id(upload_id: str) -> str:
"""Allow only simple batch ids for directory uploads."""
sanitized = "".join(ch for ch in (upload_id or "") if ch.isalnum() or ch in ("-", "_"))
if not sanitized:
raise ValueError("Invalid upload id")
return sanitized[:80]
def _is_within_directory(root_path: str, target_path: str) -> bool:
try:
return os.path.commonpath([root_path, target_path]) == root_path
except ValueError:
return False
def _resolve_upload_path(upload_root: str, relative_path: str) -> Tuple[str, str]:
    """Map a client-supplied relative path to a real path below ``upload_root``.

    Returns:
        ``(sanitized_relative_path, resolved_save_path)``.

    Raises:
        ValueError: if the relative path is invalid or the resolved location
            is not inside the resolved upload root.
    """
    safe_rel_path = _sanitize_upload_relative_path(relative_path)
    root_real = os.path.realpath(upload_root)
    candidate = os.path.join(root_real, *safe_rel_path.split("/"))
    # Resolve before checking containment so the check sees the final location.
    resolved = os.path.realpath(candidate)
    if _is_within_directory(root_real, resolved):
        return safe_rel_path, resolved
    raise ValueError("Invalid directory upload path")
def _read_uploaded_file_bytes(file_obj) -> bytes:
"""Return uploaded content as bytes across web.py upload object variants."""
if isinstance(file_obj, bytes):
return file_obj
if isinstance(file_obj, str):
return file_obj.encode("utf-8")
content = None
if hasattr(file_obj, "file") and hasattr(file_obj.file, "read"):
content = file_obj.file.read()
elif hasattr(file_obj, "read"):
content = file_obj.read()
elif hasattr(file_obj, "value"):
content = file_obj.value
if content is None:
raise ValueError("Unable to read uploaded file content")
if isinstance(content, bytes):
return content
if isinstance(content, str):
return content.encode("utf-8")
raise TypeError(f"Unsupported uploaded content type: {type(content).__name__}")
def _raw_web_input():
    """Fetch the unprocessed POST form data via ``web.webapi.rawinput``.

    Raises:
        RuntimeError: if ``rawinput`` is missing or not callable.
    """
    webapi = getattr(web, "webapi", None)
    rawinput = getattr(webapi, "rawinput", None)
    if callable(rawinput):
        try:
            return rawinput(method="post")
        except TypeError:
            # Retry without the keyword when the call rejects it.
            return rawinput()
    raise RuntimeError("web.py rawinput is not available")
def _ensure_list(value):
if value is None:
return []
if isinstance(value, list):
return value
return [value]
def _generate_session_title(user_message: str, assistant_reply: str = "") -> str:
    """Thin wrapper that forwards to ``agent.chat.session_service.generate_session_title``."""
    from agent.chat.session_service import generate_session_title as _shared_title

    return _shared_title(user_message, assistant_reply)
class WebMessage(ChatMessage):
    """Chat message originating from the web console."""

    def __init__(
        self,
        msg_id,
        content,
        ctype=ContextType.TEXT,
        from_user_id="User",
        to_user_id="Chatgpt",
        other_user_id="Chatgpt",
    ):
        """Store the message fields directly on the instance.

        The base-class initialiser is not invoked here.
        """
        # Identity and payload.
        self.msg_id = msg_id
        self.ctype = ctype
        self.content = content
        # Routing information.
        self.from_user_id = from_user_id
        self.to_user_id = to_user_id
        self.other_user_id = other_user_id
@singleton
class WebChannel(ChatChannel):
NOT_SUPPORT_REPLYTYPE = [ReplyType.VOICE]
_instance = None
# def __new__(cls):
# if cls._instance is None:
# cls._instance = super(WebChannel, cls).__new__(cls)
# return cls._instance
def __init__(self):
    """Initialise the channel with empty routing tables and no HTTP server."""
    super().__init__()
    self._http_server = None
    self.msg_id_counter = 0
    # session_id -> Queue, consumed by the polling fallback.
    self.session_queues = dict()
    # request_id -> session_id.
    self.request_to_session = dict()
    # request_id -> Queue, consumed by the SSE stream.
    self.sse_queues = dict()
def _generate_msg_id(self):
"""生成唯一的消息ID"""
self.msg_id_counter += 1
return str(int(time.time())) + str(self.msg_id_counter)
def _generate_request_id(self):
"""生成唯一的请求ID"""
return str(uuid.uuid4())
def send(self, reply: Reply, context: Context):
    """Deliver a reply to the browser via the request's SSE queue or the session poll queue.

    Routing:
      * Reply types listed in ``NOT_SUPPORT_REPLYTYPE`` are dropped with a warning.
      * If an SSE queue exists for ``context["request_id"]`` the reply becomes a
        ``phase`` event (when ``reply.sse_phase`` is truthy) or a ``done`` event.
        ``file://`` and ``http(s)://`` media replies are not re-sent.
      * Otherwise the reply is put on the session's polling queue.

    Args:
        reply: Reply to deliver; ``reply.content`` of ``None`` is sent as ``""``.
        context: Request context; must carry ``request_id``.

    Errors are logged and swallowed; this method never raises.
    """
    try:
        if reply.type in self.NOT_SUPPORT_REPLYTYPE:
            logger.warning(f"Web channel doesn't support {reply.type} yet")
            return
        if reply.type == ReplyType.IMAGE_URL:
            # NOTE(review): the reason for this short delay is not visible in this file.
            time.sleep(0.5)
        request_id = context.get("request_id", None)
        if not request_id:
            logger.error("No request_id found in context, cannot send message")
            return
        session_id = self.request_to_session.get(request_id)
        if not session_id:
            logger.error(f"No session_id found for request {request_id}")
            return

        content = reply.content if reply.content is not None else ""
        is_media_reply = reply.type in (ReplyType.IMAGE_URL, ReplyType.FILE)

        # Take one snapshot of the queue instead of "check membership, then
        # index": stream_response() removes the entry from another thread once
        # "done" is consumed, and a lookup after that removal would raise
        # KeyError and drop the reply. With the snapshot a reply that arrives
        # after the stream closed simply falls through to polling mode.
        sse_queue = self.sse_queues.get(request_id)
        if sse_queue is not None:
            # Intermediate status lines (e.g. /install-browser phases) must NOT use "done",
            # or the frontend closes EventSource and drops subsequent events.
            if getattr(reply, "sse_phase", False):
                sse_queue.put({
                    "type": "phase",
                    "content": content,
                    "request_id": request_id,
                    "timestamp": time.time(),
                })
                logger.debug(f"SSE phase for request {request_id}")
                return
            # Files are already pushed via on_event (file_to_send) during agent execution.
            # Skip duplicate file pushes here; just let the done event through.
            if is_media_reply and content.startswith("file://"):
                text_content = getattr(reply, 'text_content', '')
                if text_content:
                    sse_queue.put({
                        "type": "done",
                        "content": text_content,
                        "request_id": request_id,
                        "timestamp": time.time()
                    })
                logger.debug(f"SSE skipped duplicate file for request {request_id}")
                return
            # Skip http-URL FILE/IMAGE_URL replies produced by chat_channel's media extraction:
            # the text reply (already sent as "done") contains the URL and the frontend will
            # render it via renderMarkdown/injectVideoPlayers, so no separate SSE event needed.
            if is_media_reply and content.startswith(("http://", "https://")):
                logger.debug(f"SSE skipped http media reply for request {request_id}")
                return
            sse_queue.put({
                "type": "done",
                "content": content,
                "request_id": request_id,
                "timestamp": time.time()
            })
            logger.debug(f"SSE done sent for request {request_id}")
            return

        # Fallback: polling mode
        poll_queue = self.session_queues.get(session_id)
        if poll_queue is None:
            logger.warning(f"No response queue found for session {session_id}, response dropped")
            return
        # Skip file:// IMAGE_URL/FILE replies originating from an SSE-enabled
        # request: they were already pushed via the `file_to_send` event during
        # agent execution. By the time the chat_channel sends the IMAGE_URL reply,
        # the SSE stream has typically closed (after the text "done") and the
        # request_id is gone from sse_queues, so we'd otherwise duplicate the file
        # as a polling bubble. Scheduler/push tasks have no on_event and must
        # still go through polling normally.
        if (
            is_media_reply
            and content.startswith("file://")
            and context.get("on_event") is not None
        ):
            logger.debug(f"Polling skipped duplicate file reply for session {session_id}")
            return
        poll_queue.put({
            "type": str(reply.type),
            "content": content,
            "timestamp": time.time(),
            "request_id": request_id
        })
        logger.debug(f"Response sent to poll queue for session {session_id}, request {request_id}")
    except Exception as e:
        # Keep the traceback so delivery failures can be diagnosed.
        logger.error(f"Error in send method: {e}", exc_info=True)
def _make_sse_callback(self, request_id: str):
"""Build an on_event callback that pushes agent stream events into the SSE queue."""
# Cap reasoning bytes pushed to the frontend per request to avoid
# browser stalls / crashes on very long chains-of-thought. Anything
# beyond the cap is dropped from the stream (DB still persists a
# truncated copy via _truncate_reasoning_for_storage).
# Keep aligned with frontend REASONING_RENDER_CAP and backend
# MAX_STORED_REASONING_CHARS.
MAX_REASONING_STREAM_CHARS = 4 * 1024 # 4 KB
# Use a single-element list as a mutable counter accessible from closure.
reasoning_chars_sent = [0]
reasoning_capped_notified = [False]
def on_event(event: dict):
if request_id not in self.sse_queues:
return
q = self.sse_queues[request_id]
event_type = event.get("type")
data = event.get("data", {})
if event_type == "reasoning_update":
delta = data.get("delta", "")
if not delta:
return
remaining = MAX_REASONING_STREAM_CHARS - reasoning_chars_sent[0]
if remaining <= 0:
if not reasoning_capped_notified[0]:
reasoning_capped_notified[0] = True
q.put({
"type": "reasoning",
"content": "\n\n... [reasoning truncated for display] ...",
})
return
if len(delta) > remaining:
delta = delta[:remaining]
reasoning_chars_sent[0] += len(delta)
q.put({"type": "reasoning", "content": delta})
elif event_type == "message_update":
delta = data.get("delta", "")
if delta:
q.put({"type": "delta", "content": delta})
elif event_type == "tool_execution_start":
tool_name = data.get("tool_name", "tool")
arguments = data.get("arguments", {})
q.put({"type": "tool_start", "tool": tool_name, "arguments": arguments})
elif event_type == "tool_execution_end":
tool_name = data.get("tool_name", "tool")
status = data.get("status", "success")
result = data.get("result", "")
exec_time = data.get("execution_time", 0)
# Truncate long results to avoid huge SSE payloads
result_str = str(result)
if len(result_str) > 2000:
result_str = result_str[:2000] + ""
q.put({
"type": "tool_end",
"tool": tool_name,
"status": status,
"result": result_str,
"execution_time": round(exec_time, 2)
})
elif event_type == "message_end":
tool_calls = data.get("tool_calls", [])
if tool_calls:
q.put({"type": "message_end", "has_tool_calls": True})
elif event_type == "agent_end":
# Safety net: if the agent finishes with an empty final_response,
# chat_channel skips _send_reply (because reply.content is empty),
# which means no "done" event is ever emitted and the SSE stream
# would hang until the 10-min idle timeout. Push a fallback "done"
# here so the frontend always gets closure.
final_response = data.get("final_response", "")
if not final_response or not str(final_response).strip():
logger.warning(
f"[WebChannel] agent_end with empty final_response for "
f"request {request_id}, sending fallback done"
)
q.put({
"type": "done",
"content": "(模型未返回任何内容,请重试或换一种方式描述你的需求)",
"request_id": request_id,
"timestamp": time.time(),
})
elif event_type == "file_to_send":
file_path = data.get("path", "")
file_name = data.get("file_name", os.path.basename(file_path))
file_type = data.get("file_type", "file")
from urllib.parse import quote
web_url = f"/api/file?path={quote(file_path)}"
is_image = file_type == "image"
q.put({
"type": "image" if is_image else "file",
"content": web_url,
"file_name": file_name,
})
return on_event
def upload_file(self):
    """Handle a single-file or directory upload sent as multipart/form-data.

    Form fields read: ``file``, ``files``, ``relative_path``,
    ``relative_paths``, ``upload_id``. The request is treated as a directory
    upload as soon as any of the directory-related fields is present.

    Returns:
        A JSON string. On success it carries ``status: "success"`` plus the
        saved path and metadata; on any failure ``status: "error"`` with a message.
    """
    try:
        form = _raw_web_input()
        single_file = form.get("file")
        single_rel_path = form.get("relative_path", "")
        batch_id = form.get("upload_id", "")

        batch_files = _ensure_list(form.get("files"))
        # NOTE: cgi.FieldStorage raises TypeError on truthy checks for single-file
        # uploads (Python 3.9+). Always compare against None instead of truth-testing.
        if not batch_files and single_file is not None and single_rel_path:
            batch_files = [single_file]
        batch_rel_paths = _ensure_list(form.get("relative_paths"))
        if not batch_rel_paths and single_rel_path:
            batch_rel_paths = [single_rel_path]

        is_directory_upload = any((batch_files, batch_rel_paths, single_rel_path, batch_id))
        upload_dir = _get_upload_dir()

        if is_directory_upload:
            if not batch_id:
                return json.dumps({"status": "error", "message": "Missing upload_id for directory upload"})
            if not batch_files:
                return json.dumps({"status": "error", "message": "No files uploaded"})
            if len(batch_files) != len(batch_rel_paths):
                return json.dumps({"status": "error", "message": "Directory upload payload mismatch"})

            safe_upload_id = _sanitize_upload_id(batch_id)
            upload_root_real = os.path.realpath(os.path.join(upload_dir, f"webdir_{safe_upload_id}"))
            root_name = None
            saved_files = 0
            for part, rel_path in zip(batch_files, batch_rel_paths):
                if part is None:
                    raise ValueError("Invalid uploaded file")
                safe_rel_path, target_path = _resolve_upload_path(upload_root_real, rel_path)
                # Every entry must live under the same top-level folder.
                top_level = safe_rel_path.split("/", 1)[0]
                if root_name is None:
                    root_name = top_level
                elif top_level != root_name:
                    raise ValueError("Directory upload must use a single root folder")
                os.makedirs(os.path.dirname(target_path), exist_ok=True)
                payload = _read_uploaded_file_bytes(part)
                with open(target_path, "wb") as out:
                    out.write(payload)
                saved_files += 1

            if not root_name:
                raise ValueError("Directory root path missing")
            root_path = os.path.realpath(os.path.join(upload_root_real, root_name))
            if not _is_within_directory(upload_root_real, root_path):
                raise ValueError("Invalid directory upload path")
            logger.info(f"[WebChannel] Directory uploaded: {root_name} -> {root_path} ({saved_files} files)")
            return json.dumps({
                "status": "success",
                "file_path": root_path,
                "file_name": root_name,
                "file_type": "directory",
                "file_count": saved_files,
                "root_path": root_path,
                "root_name": root_name,
                "upload_type": "directory",
            }, ensure_ascii=False)

        # ---- single-file upload ----
        if single_file is None or not hasattr(single_file, "filename") or not single_file.filename:
            return json.dumps({"status": "error", "message": "No file uploaded"})
        original_name = single_file.filename
        ext = os.path.splitext(original_name)[1].lower()
        # Stored under a generated name; only the extension of the client name is kept.
        safe_name = f"web_{uuid.uuid4().hex[:8]}{ext}"
        save_path = os.path.join(upload_dir, safe_name)
        payload = _read_uploaded_file_bytes(single_file)
        with open(save_path, "wb") as out:
            out.write(payload)

        file_type = "image" if ext in IMAGE_EXTENSIONS else ("video" if ext in VIDEO_EXTENSIONS else "file")
        from urllib.parse import quote
        preview_url = f"/uploads/{quote(safe_name, safe='/')}"
        logger.info(f"[WebChannel] File uploaded: {original_name} -> {save_path} ({file_type})")
        return json.dumps({
            "status": "success",
            "file_path": save_path,
            "file_name": original_name,
            "file_type": file_type,
            "preview_url": preview_url,
        }, ensure_ascii=False)
    except Exception as e:
        logger.error(f"[WebChannel] File upload error: {e}", exc_info=True)
        return json.dumps({"status": "error", "message": str(e)})
def post_message(self):
    """Handle an incoming user message sent via POST.

    Reads a JSON body with ``session_id``, ``message``, ``stream`` (default
    True) and optional ``attachments`` (file paths from /upload), registers
    the request, and starts ``self.produce`` on a new thread.

    Returns:
        A JSON string: ``{"status": "success", "request_id": ..., "stream": ...}``
        or ``{"status": "error", "message": ...}``.
    """
    try:
        data = web.data()
        json_data = json.loads(data)
        session_id = json_data.get('session_id', f'session_{int(time.time())}')
        prompt = json_data.get('message', '')
        use_sse = json_data.get('stream', True)
        attachments = json_data.get('attachments', [])

        # Append file references to the prompt (same format as QQ channel)
        if attachments:
            file_refs = []
            for att in attachments:
                ftype = att.get("file_type", "file")
                fpath = att.get("file_path", "")
                if not fpath:
                    continue
                if ftype == "image":
                    file_refs.append(f"[图片: {fpath}]")
                elif ftype == "video":
                    file_refs.append(f"[视频: {fpath}]")
                elif ftype == "directory":
                    file_refs.append(f"[目录: {fpath}]")
                else:
                    file_refs.append(f"[文件: {fpath}]")
            if file_refs:
                prompt = prompt + "\n" + "\n".join(file_refs)
                logger.info(f"[WebChannel] Attached {len(file_refs)} file(s) to message")

        request_id = self._generate_request_id()
        self.request_to_session[request_id] = session_id
        if session_id not in self.session_queues:
            self.session_queues[session_id] = Queue()
        if use_sse:
            self.sse_queues[request_id] = Queue()

        # Prepend the first configured trigger prefix when the prompt has none.
        trigger_prefixs = conf().get("single_chat_prefix", [""])
        if check_prefix(prompt, trigger_prefixs) is None:
            if trigger_prefixs:
                prompt = trigger_prefixs[0] + prompt
                logger.debug(f"[WebChannel] Added prefix to message: {prompt}")

        msg = WebMessage(self._generate_msg_id(), prompt)
        msg.from_user_id = session_id
        context = self._compose_context(ContextType.TEXT, prompt, msg=msg, isgroup=False)
        if context is None:
            logger.warning(f"[WebChannel] Context is None for session {session_id}, message may be filtered")
            # No worker is started for a filtered message, so nothing will ever
            # look this request up again: drop both per-request entries instead
            # of leaving the request_to_session mapping behind forever.
            self.sse_queues.pop(request_id, None)
            self.request_to_session.pop(request_id, None)
            return json.dumps({"status": "error", "message": "Message was filtered"})

        context["session_id"] = session_id
        context["receiver"] = session_id
        context["request_id"] = request_id
        if use_sse:
            context["on_event"] = self._make_sse_callback(request_id)

        threading.Thread(target=self.produce, args=(context,)).start()
        return json.dumps({"status": "success", "request_id": request_id, "stream": use_sse})
    except Exception as e:
        logger.error(f"Error processing message: {e}")
        return json.dumps({"status": "error", "message": str(e)})
def stream_response(self, request_id: str):
    """Generate the Server-Sent-Events byte stream for one request.

    Each queued event is emitted as a UTF-8 encoded ``data: <json>`` frame
    (bytes, to avoid WSGI Latin-1 mangling). While the queue is empty a
    keepalive comment is emitted about once per second. The stream ends after
    a ``done`` event or after 10 minutes without any real event.

    The queue is discarded only once ``done`` has been delivered, so a client
    that reconnects with the same request_id can continue reading.
    """
    try:
        events = self.sse_queues[request_id]
    except KeyError:
        yield b"data: {\"type\": \"error\", \"message\": \"invalid request_id\"}\n\n"
        return

    idle_limit = 600  # seconds without a real event before giving up
    expires_at = time.time() + idle_limit
    finished = False
    try:
        while not finished and time.time() < expires_at:
            try:
                event = events.get(timeout=1)
            except Empty:
                yield b": keepalive\n\n"
            else:
                # A real event arrived: restart the idle countdown.
                expires_at = time.time() + idle_limit
                frame = "data: " + json.dumps(event, ensure_ascii=False) + "\n\n"
                yield frame.encode("utf-8")
                finished = event.get("type") == "done"
    finally:
        if finished:
            self.sse_queues.pop(request_id, None)
def poll_response(self):
    """Return the next queued response for a session without blocking.

    Expects a JSON body containing ``session_id``.

    Returns:
        A JSON string. ``has_content`` is True together with ``content``,
        ``request_id`` and ``timestamp`` when a response was waiting, False
        when the queue was empty. Errors are reported as ``status: "error"``.
    """
    try:
        request_body = json.loads(web.data())
        session_id = request_body.get('session_id')
        if not (session_id and session_id in self.session_queues):
            return json.dumps({"status": "error", "message": "Invalid session ID"})

        try:
            # Non-blocking read; the item is removed from the queue.
            response = self.session_queues[session_id].get_nowait()
        except Empty:
            # Nothing new for this session.
            return json.dumps({"status": "success", "has_content": False})
        else:
            # request_id lets the client tell responses of different requests apart.
            return json.dumps({
                "status": "success",
                "has_content": True,
                "content": response["content"],
                "request_id": response["request_id"],
                "timestamp": response["timestamp"]
            })
    except Exception as e:
        logger.error(f"Error polling response: {e}")
        return json.dumps({"status": "error", "message": str(e)})
def chat_page(self):
    """Return the contents of ``chat.html`` located next to this module."""
    module_dir = os.path.dirname(__file__)
    page_path = os.path.join(module_dir, 'chat.html')
    with open(page_path, 'r', encoding='utf-8') as page:
        html = page.read()
    return html
def startup(self):
    """Configure and start the web console HTTP server (blocks until it stops).

    Host selection: the configured ``web_host`` if set, otherwise ``0.0.0.0``
    when a password is configured and ``127.0.0.1`` when it is not. The port
    comes from ``web_port`` (default 9899).

    Raises:
        OSError: re-raised if the server cannot start; an "address already in
            use" failure is logged with a hint first.
    """
    import errno

    configured_host = conf().get("web_host", "")
    host = configured_host or ("0.0.0.0" if _is_password_enabled() else "127.0.0.1")
    port = conf().get("web_port", 9899)
    is_public_bind = host in ("0.0.0.0", "::")

    # Print the list of available channel types.
    logger.info(
        "[WebChannel] 全部可用通道如下,可修改 config.json 配置文件中的 channel_type 字段进行切换,多个通道用逗号分隔:")
    logger.info("[WebChannel]   1. weixin           - 微信")
    logger.info("[WebChannel]   2. web              - 网页")
    logger.info("[WebChannel]   3. terminal         - 终端")
    logger.info("[WebChannel]   4. feishu           - 飞书")
    logger.info("[WebChannel]   5. dingtalk         - 钉钉")
    logger.info("[WebChannel]   6. wecom_bot        - 企微智能机器人")
    logger.info("[WebChannel]   7. wechatcom_app    - 企微自建应用")
    logger.info("[WebChannel]   8. wechatmp         - 个人公众号")
    logger.info("[WebChannel]   9. wechatmp_service - 企业公众号")
    logger.info("[WebChannel] ✅ Web控制台已运行")
    logger.info(f"[WebChannel] 🌐 本地访问: http://localhost:{port}")
    if is_public_bind:
        logger.info(f"[WebChannel] 🌍 服务器访问: http://YOUR_IP:{port} (将YOUR_IP替换为服务器IP)")
        if not _is_password_enabled():
            logger.info("[WebChannel] ⚠️ 当前监听 0.0.0.0 且未设置 web_password公网部署建议在 config.json 中配置访问密码")
    else:
        logger.info(f"[WebChannel] 🔒 当前仅监听 {host},仅本机可访问。如需公网访问,请将 web_host 改为 0.0.0.0 并配置 web_password 密码")

    # Best effort: failing to open a browser must not prevent startup.
    try:
        import webbrowser
        webbrowser.open(f"http://localhost:{port}")
        logger.debug(f"[WebChannel] Opened browser at http://localhost:{port}")
    except Exception as e:
        logger.debug(f"[WebChannel] Could not open browser: {e}")

    # Make sure the static file directory exists.
    static_dir = os.path.join(os.path.dirname(__file__), 'static')
    if not os.path.exists(static_dir):
        os.makedirs(static_dir)
        logger.debug(f"[WebChannel] Created static directory: {static_dir}")

    # URL pattern -> handler class name (resolved through globals()).
    urls = (
        '/', 'RootHandler',
        '/auth/login', 'AuthLoginHandler',
        '/auth/check', 'AuthCheckHandler',
        '/auth/logout', 'AuthLogoutHandler',
        '/message', 'MessageHandler',
        '/upload', 'UploadHandler',
        '/uploads/(.*)', 'UploadsHandler',
        '/api/file', 'FileServeHandler',
        '/poll', 'PollHandler',
        '/stream', 'StreamHandler',
        '/chat', 'ChatHandler',
        '/config', 'ConfigHandler',
        '/api/models', 'ModelsHandler',
        '/api/channels', 'ChannelsHandler',
        '/api/weixin/qrlogin', 'WeixinQrHandler',
        '/api/feishu/register', 'FeishuRegisterHandler',
        '/api/tools', 'ToolsHandler',
        '/api/skills', 'SkillsHandler',
        '/api/memory', 'MemoryHandler',
        '/api/memory/content', 'MemoryContentHandler',
        '/api/knowledge/list', 'KnowledgeListHandler',
        '/api/knowledge/read', 'KnowledgeReadHandler',
        '/api/knowledge/graph', 'KnowledgeGraphHandler',
        '/api/scheduler', 'SchedulerHandler',
        '/api/sessions', 'SessionsHandler',
        '/api/sessions/(.*)/generate_title', 'SessionTitleHandler',
        '/api/sessions/(.*)/clear_context', 'SessionClearContextHandler',
        '/api/sessions/(.*)', 'SessionDetailHandler',
        '/api/history', 'HistoryHandler',
        '/api/logs', 'LogsHandler',
        '/api/version', 'VersionHandler',
        '/assets/(.*)', 'AssetsHandler',
    )
    app = web.application(urls, globals(), autoreload=False)

    # Completely silence web.py's per-request HTTP log output.
    web.httpserver.LogMiddleware.log = lambda self, status, environ: None
    # Only let web.py loggers report errors.
    logging.getLogger("web").setLevel(logging.ERROR)
    logging.getLogger("web.httpserver").setLevel(logging.ERROR)

    # Build WSGI app with middleware (same as runsimple but without print)
    func = web.httpserver.StaticMiddleware(app.wsgifunc())
    func = web.httpserver.LogMiddleware(func)
    server = web.httpserver.WSGIServer((host, port), func)
    server.daemon_threads = True
    # Default request_queue_size(5) / timeout(10s) / numthreads(10) are
    # too small: when SSE streams occupy many threads, the backlog fills
    # and new connections get refused (ERR_CONNECTION_ABORTED).
    server.request_queue_size = 128
    server.timeout = 300
    server.requests.min = 20
    server.requests.max = 80
    self._http_server = server

    # 48 (macOS) and 98 (Linux) are kept for backward compatibility;
    # errno.EADDRINUSE adds whatever value the running platform uses.
    address_in_use = {48, 98, errno.EADDRINUSE}
    try:
        server.start()
    except (KeyboardInterrupt, SystemExit):
        server.stop()
    except OSError as e:
        if e.errno in address_in_use:
            logger.error(
                f"[WebChannel] 端口 {port} 已被占用,可执行 `cow restart` 清理残留进程,"
                f"或在 config.json 中修改 web_port"
            )
        raise
def stop(self):
    """Stop the HTTP server if one is running and forget the reference.

    Failures while stopping are logged as warnings, never raised.
    """
    server = self._http_server
    if not server:
        return
    try:
        server.stop()
        logger.info("[WebChannel] HTTP server stopped")
    except Exception as e:
        logger.warning(f"[WebChannel] Error stopping HTTP server: {e}")
    self._http_server = None
class RootHandler:
    """Handler for ``/``."""

    def GET(self):
        """Redirect to the chat page at ``/chat``."""
        redirect = web.seeother('/chat')
        raise redirect
class AuthCheckHandler:
    """Handler for ``/auth/check``."""

    def GET(self):
        """Report as JSON whether a password is required and, if so, whether the caller is logged in."""
        web.header('Content-Type', 'application/json; charset=utf-8')
        result = {"status": "success", "auth_required": _is_password_enabled()}
        if result["auth_required"]:
            result["authenticated"] = bool(_check_auth())
        return json.dumps(result)
class AuthLoginHandler:
    """Handler for ``/auth/login``."""

    def POST(self):
        """Check the submitted password and, on success, set the auth cookie.

        Expects a JSON object body with a string ``password`` field.

        Returns:
            A JSON string: ``{"status": "success"}`` when no password is
            configured or the password matches; otherwise ``status: "error"``
            with ``"Invalid request"`` or ``"Wrong password"``.
        """
        web.header('Content-Type', 'application/json; charset=utf-8')
        if not _is_password_enabled():
            return json.dumps({"status": "success"})
        try:
            data = json.loads(web.data())
        except Exception:
            return json.dumps({"status": "error", "message": "Invalid request"})
        # Valid JSON that is not an object (e.g. a list or a number) has no fields.
        if not isinstance(data, dict):
            return json.dumps({"status": "error", "message": "Invalid request"})

        password = data.get("password", "")
        expected = conf().get("web_password", "")
        # hmac.compare_digest raises TypeError for str arguments containing
        # non-ASCII characters and for non-string values, so reject non-strings
        # and compare the UTF-8 bytes instead.
        matches = isinstance(password, str) and hmac.compare_digest(
            password.encode("utf-8"), expected.encode("utf-8")
        )
        if not matches:
            logger.warning("[WebChannel] Invalid login attempt")
            return json.dumps({"status": "error", "message": "Wrong password"})

        token = _create_auth_token()
        web.setcookie("cow_auth_token", token, expires=_session_expire_seconds(),
                      path="/", httponly=True, samesite="Lax")
        return json.dumps({"status": "success"})
class AuthLogoutHandler:
    """Handler for ``/auth/logout``."""

    def POST(self):
        """Expire the ``cow_auth_token`` cookie and answer with a JSON success status."""
        web.header('Content-Type', 'application/json; charset=utf-8')
        # An empty value with a negative expiry makes the browser drop the cookie.
        web.setcookie("cow_auth_token", "", expires=-1, path="/")
        outcome = {"status": "success"}
        return json.dumps(outcome)
class MessageHandler:
    """Handler for ``/message``."""

    def POST(self):
        """Authenticate, then hand the request to ``WebChannel.post_message``."""
        _require_auth()
        channel = WebChannel()
        return channel.post_message()
class UploadHandler:
    """Handler for ``/upload``."""

    def POST(self):
        """Authenticate, then hand the request to ``WebChannel.upload_file`` (JSON response)."""
        _require_auth()
        web.header('Content-Type', 'application/json; charset=utf-8')
        channel = WebChannel()
        return channel.upload_file()
class UploadsHandler:
    """Handler for ``/uploads/<file_name>``: serves files from the upload directory."""

    def GET(self, file_name):
        """Return the bytes of an uploaded file.

        Args:
            file_name: Path captured from the URL, relative to the upload directory.

        Raises:
            web.HTTPError: 401 when unauthenticated; 404 when the path leaves
                the upload directory, is not a regular file, or cannot be read.
        """
        _require_auth()
        try:
            upload_root = os.path.realpath(_get_upload_dir())
            full_path = os.path.realpath(os.path.join(upload_root, file_name))
            # A plain string-prefix test would accept sibling directories that
            # merely share the prefix (e.g. ".../tmp_other" vs ".../tmp").
            # Compare whole path components on the resolved paths instead.
            if not _is_within_directory(upload_root, full_path):
                raise web.notfound()
            if not os.path.isfile(full_path):
                raise web.notfound()
            content_type = mimetypes.guess_type(full_path)[0] or "application/octet-stream"
            web.header('Content-Type', content_type)
            web.header('Cache-Control', 'public, max-age=86400')
            with open(full_path, 'rb') as f:
                return f.read()
        except web.HTTPError:
            raise
        except Exception as e:
            logger.error(f"[WebChannel] Error serving upload: {e}")
            raise web.notfound()
class FileServeHandler:
    """Handler for ``/api/file``: serves a file addressed by absolute path."""

    def GET(self):
        """Return the bytes of the file named by the ``path`` query parameter.

        Responds 404 when the parameter is missing, not absolute, not a
        regular file, or the file cannot be read.
        """
        # NOTE(review): any absolute path readable by the process is served to
        # an authenticated caller; confirm that this breadth is intended.
        _require_auth()
        try:
            requested = web.input(path="").path
            if not (requested and os.path.isabs(requested)):
                raise web.notfound()
            file_path = os.path.normpath(requested)
            if not os.path.isfile(file_path):
                raise web.notfound()

            guessed_type, _ = mimetypes.guess_type(file_path)
            content_type = guessed_type or "application/octet-stream"
            file_name = os.path.basename(file_path)
            from urllib.parse import quote
            response_headers = (
                ('Content-Type', content_type),
                ('Content-Disposition', f"inline; filename*=UTF-8''{quote(file_name)}"),
                ('Cache-Control', 'public, max-age=3600'),
            )
            for header_name, header_value in response_headers:
                web.header(header_name, header_value)
            with open(file_path, 'rb') as stream:
                return stream.read()
        except web.HTTPError:
            raise
        except Exception as e:
            logger.error(f"[WebChannel] Error serving file: {e}")
            raise web.notfound()
class PollHandler:
    """Handler for ``/poll``."""

    def POST(self):
        """Authenticate, then hand the request to ``WebChannel.poll_response``."""
        _require_auth()
        channel = WebChannel()
        return channel.poll_response()
class StreamHandler:
    """Handler for ``/stream``: the Server-Sent-Events endpoint."""

    def GET(self):
        """Stream events for the ``request_id`` query parameter (400 when it is missing)."""
        _require_auth()
        request_id = web.input(request_id='').request_id
        if not request_id:
            raise web.badrequest()
        sse_headers = (
            ('Content-Type', 'text/event-stream; charset=utf-8'),
            ('Cache-Control', 'no-cache'),
            ('X-Accel-Buffering', 'no'),
            ('Access-Control-Allow-Origin', '*'),
        )
        for header_name, header_value in sse_headers:
            web.header(header_name, header_value)
        return WebChannel().stream_response(request_id)
class ChatHandler:
    """Handler for ``/chat``: serves the console page."""

    def GET(self):
        """Return ``chat.html`` with a time-based ``?v=`` suffix on the console JS/CSS URLs."""
        web.header('Cache-Control', 'no-cache, no-store, must-revalidate')
        web.header('Pragma', 'no-cache')
        page_path = os.path.join(os.path.dirname(__file__), 'chat.html')
        with open(page_path, 'r', encoding='utf-8') as page:
            html = page.read()
        # A changing query string forces browsers to refetch the assets.
        cache_bust = str(int(time.time()))
        for asset in ('assets/js/console.js', 'assets/css/console.css'):
            html = html.replace(asset, f'{asset}?v={cache_bust}')
        return html
class ConfigHandler:
_RECOMMENDED_MODELS = [
const.DEEPSEEK_V4_FLASH, const.DEEPSEEK_V4_PRO, const.DEEPSEEK_CHAT, const.DEEPSEEK_REASONER,
const.MINIMAX_M2_7_HIGHSPEED, const.MINIMAX_M2_7, const.MINIMAX_M2_5, const.MINIMAX_M2_1, const.MINIMAX_M2_1_LIGHTNING,
const.CLAUDE_4_6_SONNET, const.CLAUDE_4_7_OPUS, const.CLAUDE_4_6_OPUS, const.CLAUDE_4_5_SONNET,
const.GEMINI_31_FLASH_LITE_PRE, const.GEMINI_31_PRO_PRE, const.GEMINI_3_FLASH_PRE,
const.GPT_54, const.GPT_54_MINI, const.GPT_54_NANO, const.GPT_5, const.GPT_41, const.GPT_4o,
const.GLM_5_1, const.GLM_5_TURBO, const.GLM_5, const.GLM_4_7,
const.QWEN36_PLUS, const.QWEN35_PLUS, const.QWEN3_MAX,
const.DOUBAO_SEED_2_PRO, const.DOUBAO_SEED_2_CODE,
const.KIMI_K2_6, const.KIMI_K2_5, const.KIMI_K2,
const.ERNIE_5_1, const.ERNIE_5, const.ERNIE_X1_1, const.ERNIE_45_TURBO_128K, const.ERNIE_45_TURBO_32K,
]
# Generic placeholder hints surfaced in the web console. We deliberately
# show the version-path tail (e.g. "/v1") so users are reminded to type
# the full base URL. The form is intentionally vague (`...../v1`) so it
# never looks like a real default a user might paste verbatim — and we
# never auto-rewrite anything on the server side.
_PLACEHOLDER_V1 = "https://...../v1"
_PLACEHOLDER_QIANFAN = "https://...../v2"
_PLACEHOLDER_ZHIPU = "https://...../api/paas/v4"
_PLACEHOLDER_DOUBAO = "https://...../api/v3"
_PLACEHOLDER_GEMINI = "https://....."
PROVIDER_MODELS = OrderedDict([
("deepseek", {
"label": "DeepSeek",
"api_key_field": "deepseek_api_key",
"api_base_key": "deepseek_api_base",
"api_base_default": "https://api.deepseek.com/v1",
"api_base_placeholder": _PLACEHOLDER_V1,
"models": [const.DEEPSEEK_V4_FLASH, const.DEEPSEEK_V4_PRO, const.DEEPSEEK_CHAT, const.DEEPSEEK_REASONER],
}),
("minimax", {
"label": "MiniMax",
"api_key_field": "minimax_api_key",
"api_base_key": None,
"api_base_default": None,
"api_base_placeholder": "",
"models": [const.MINIMAX_M2_7, const.MINIMAX_M2_7_HIGHSPEED, const.MINIMAX_M2_5, const.MINIMAX_M2_1, const.MINIMAX_M2_1_LIGHTNING],
}),
("claudeAPI", {
"label": "Claude",
"api_key_field": "claude_api_key",
"api_base_key": "claude_api_base",
"api_base_default": "https://api.anthropic.com/v1",
"api_base_placeholder": _PLACEHOLDER_V1,
"models": [const.CLAUDE_4_6_SONNET, const.CLAUDE_4_7_OPUS, const.CLAUDE_4_6_OPUS, const.CLAUDE_4_5_SONNET],
}),
("gemini", {
"label": "Gemini",
"api_key_field": "gemini_api_key",
"api_base_key": "gemini_api_base",
"api_base_default": "https://generativelanguage.googleapis.com",
"api_base_placeholder": _PLACEHOLDER_GEMINI,
"models": [const.GEMINI_31_FLASH_LITE_PRE, const.GEMINI_31_PRO_PRE, const.GEMINI_3_FLASH_PRE],
}),
("openai", {
"label": "OpenAI",
"api_key_field": "open_ai_api_key",
"api_base_key": "open_ai_api_base",
"api_base_default": "https://api.openai.com/v1",
"api_base_placeholder": _PLACEHOLDER_V1,
"models": [const.GPT_54, const.GPT_54_MINI, const.GPT_54_NANO, const.GPT_5, const.GPT_41, const.GPT_4o],
}),
("zhipu", {
"label": "智谱AI",
"api_key_field": "zhipu_ai_api_key",
"api_base_key": "zhipu_ai_api_base",
"api_base_default": "https://open.bigmodel.cn/api/paas/v4",
"api_base_placeholder": _PLACEHOLDER_ZHIPU,
"models": [const.GLM_5_1, const.GLM_5_TURBO, const.GLM_5, const.GLM_4_7],
}),
("dashscope", {
"label": "通义千问",
"api_key_field": "dashscope_api_key",
"api_base_key": None,
"api_base_default": None,
"api_base_placeholder": "",
"models": [const.QWEN36_PLUS, const.QWEN35_PLUS, const.QWEN3_MAX],
}),
("doubao", {
"label": "豆包",
"api_key_field": "ark_api_key",
"api_base_key": "ark_base_url",
"api_base_default": "https://ark.cn-beijing.volces.com/api/v3",
"api_base_placeholder": _PLACEHOLDER_DOUBAO,
"models": [const.DOUBAO_SEED_2_PRO, const.DOUBAO_SEED_2_CODE],
}),
("moonshot", {
"label": "Kimi",
"api_key_field": "moonshot_api_key",
"api_base_key": "moonshot_base_url",
"api_base_default": "https://api.moonshot.cn/v1",
"api_base_placeholder": _PLACEHOLDER_V1,
"models": [const.KIMI_K2_6, const.KIMI_K2_5, const.KIMI_K2],
}),
("qianfan", {
"label": "百度千帆",
"api_key_field": "qianfan_api_key",
"api_base_key": "qianfan_api_base",
"api_base_default": "https://qianfan.baidubce.com/v2",
"api_base_placeholder": _PLACEHOLDER_QIANFAN,
"models": [const.ERNIE_5_1, const.ERNIE_5, const.ERNIE_X1_1, const.ERNIE_45_TURBO_128K, const.ERNIE_45_TURBO_32K],
}),
("modelscope", {
"label": "ModelScope",
"api_key_field": "modelscope_api_key",
"api_base_key": None,
"api_base_default": None,
"api_base_placeholder": "",
"models": [const.QWEN3_5_27B, const.QWEN3_235B_A22B_INSTRUCT_2507],
}),
("linkai", {
"label": "LinkAI",
"api_key_field": "linkai_api_key",
"api_base_key": None,
"api_base_default": None,
"api_base_placeholder": "",
"models": _RECOMMENDED_MODELS,
}),
("custom", {
"label": "自定义",
"api_key_field": "custom_api_key",
"api_base_key": "custom_api_base",
"api_base_default": "",
"api_base_placeholder": _PLACEHOLDER_V1,
"models": [],
}),
])
EDITABLE_KEYS = {
"model", "bot_type", "use_linkai",
"open_ai_api_base", "deepseek_api_base", "qianfan_api_base", "claude_api_base", "gemini_api_base",
"zhipu_ai_api_base", "moonshot_base_url", "ark_base_url", "custom_api_base",
"open_ai_api_key", "deepseek_api_key", "qianfan_api_key", "claude_api_key", "gemini_api_key",
"zhipu_ai_api_key", "dashscope_api_key", "moonshot_api_key",
"ark_api_key", "minimax_api_key", "linkai_api_key", "custom_api_key",
"agent_max_context_tokens", "agent_max_context_turns", "agent_max_steps",
"enable_thinking", "web_password",
}
@staticmethod
def _mask_key(value: str) -> str:
"""Mask the middle part of an API key for display."""
if not value or len(value) <= 8:
return value
return value[:4] + "*" * (len(value) - 8) + value[-4:]
def GET(self):
    """Return the current console configuration as a JSON string.

    The payload carries the chat model settings, the agent limits, the
    per-provider base URLs, the masked API keys, the provider catalog and a
    fully starred web password. Any exception is logged and reported as
    ``{"status": "error", "message": ...}``.
    """
    _require_auth()
    web.header('Content-Type', 'application/json; charset=utf-8')
    try:
        cfg = conf()
        agent_mode = cfg.get("agent", False)

        # Base URLs and masked keys, keyed by their config field names. A key
        # field shared by several providers is only resolved once.
        base_urls = {}
        masked_keys = {}
        for spec in self.PROVIDER_MODELS.values():
            base_field = spec.get("api_base_key")
            if base_field:
                base_urls[base_field] = cfg.get(base_field, spec["api_base_default"])
            key_field = spec.get("api_key_field")
            if not key_field or key_field in masked_keys:
                continue
            secret = cfg.get(key_field, "")
            masked_keys[key_field] = self._mask_key(secret) if secret else ""

        # Static provider metadata exposed to the UI (no secrets in here).
        provider_cards = {
            pid: {
                "label": spec["label"],
                "models": spec["models"],
                "api_base_key": spec["api_base_key"],
                "api_base_default": spec["api_base_default"],
                "api_base_placeholder": spec.get("api_base_placeholder", ""),
                "api_key_field": spec.get("api_key_field"),
            }
            for pid, spec in self.PROVIDER_MODELS.items()
        }

        password = cfg.get("web_password", "")
        bot_type = cfg.get("bot_type", "")
        payload = {
            "status": "success",
            "use_agent": agent_mode,
            "title": "CowAgent" if agent_mode else "AI Assistant",
            "model": cfg.get("model", ""),
            # The stored "chatGPT" bot type is presented to the UI as "openai".
            "bot_type": "openai" if bot_type == "chatGPT" else bot_type,
            "use_linkai": bool(cfg.get("use_linkai", False)),
            "channel_type": cfg.get("channel_type", ""),
            "agent_max_context_tokens": cfg.get("agent_max_context_tokens", 50000),
            "agent_max_context_turns": cfg.get("agent_max_context_turns", 20),
            "agent_max_steps": cfg.get("agent_max_steps", 20),
            "enable_thinking": bool(cfg.get("enable_thinking", False)),
            "api_bases": base_urls,
            "api_keys": masked_keys,
            "providers": provider_cards,
            "web_password_masked": "*" * len(password) if password else "",
        }
        return json.dumps(payload, ensure_ascii=False)
    except Exception as e:
        logger.error(f"Error getting config: {e}")
        return json.dumps({"status": "error", "message": str(e)})
def POST(self):
    """Apply configuration updates sent by the web console.

    Expects a JSON body of the form ``{"updates": {key: value, ...}}``. Keys
    outside ``EDITABLE_KEYS`` are ignored. The agent limit keys are coerced
    with ``int()`` and the ``use_linkai`` / ``enable_thinking`` flags with
    ``bool()``.

    All values are validated and coerced *before* anything is written, so a
    malformed value (e.g. a non-numeric ``agent_max_steps``) rejects the
    whole request instead of leaving the in-memory config partially updated
    and out of sync with ``config.json``.

    Returns a JSON string: ``{"status": "success", "applied": {...}}`` or
    ``{"status": "error", "message": ...}``.
    """
    _require_auth()
    web.header('Content-Type', 'application/json; charset=utf-8')
    try:
        data = json.loads(web.data())
        updates = data.get("updates", {})
        if not updates:
            return json.dumps({"status": "error", "message": "no updates provided"})
        local_config = conf()

        # Phase 1: validate / coerce only. Nothing is mutated yet, so an
        # exception here leaves both the live config and the file untouched.
        int_keys = ("agent_max_context_tokens", "agent_max_context_turns", "agent_max_steps")
        bool_keys = ("use_linkai", "enable_thinking")
        applied = {}
        for key, value in updates.items():
            if key not in self.EDITABLE_KEYS:
                continue
            if key in int_keys:
                value = int(value)
            elif key in bool_keys:
                value = bool(value)
            applied[key] = value
        if not applied:
            return json.dumps({"status": "error", "message": "no valid keys to update"})

        # Phase 2: update the live config, then persist to config.json.
        for key, value in applied.items():
            local_config[key] = value
        config_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(
            os.path.abspath(__file__)))), "config.json")
        if os.path.exists(config_path):
            with open(config_path, "r", encoding="utf-8") as f:
                file_cfg = json.load(f)
        else:
            file_cfg = {}
        file_cfg.update(applied)
        with open(config_path, "w", encoding="utf-8") as f:
            json.dump(file_cfg, f, indent=4, ensure_ascii=False)
        logger.info(f"[WebChannel] Config updated: {list(applied.keys())}")

        # Reset Bridge so that bot routing reflects the new config.
        # Without this, Bridge keeps its cached bot instance (e.g. LinkAIBot)
        # even after the user switches bot_type / use_linkai / model in UI.
        bridge_routing_keys = {"bot_type", "use_linkai", "model"}
        if any(k in applied for k in bridge_routing_keys):
            try:
                from bridge.bridge import Bridge
                Bridge().reset_bot()
                logger.info("[WebChannel] Bridge bot routing reset due to config change")
            except Exception as reset_err:
                # Best effort: the config is already saved, so only warn.
                logger.warning(f"[WebChannel] Failed to reset bridge: {reset_err}")
        return json.dumps({"status": "success", "applied": applied}, ensure_ascii=False)
    except Exception as e:
        logger.error(f"Error updating config: {e}")
        return json.dumps({"status": "error", "message": str(e)})
class ModelsHandler:
"""API for the unified Models console.
Layered model:
Layer 1 (providers): vendor credentials shared across capabilities.
Stored as flat *_api_key / *_api_base fields in
config.json — the same fields ConfigHandler
already manages.
Layer 2 (capabilities): which provider/model is used by chat / vision /
asr / tts / embedding / image / search.
GET /api/models -> overview (providers + capabilities)
POST /api/models/provider -> upsert a vendor credential
DELETE /api/models/provider -> clear a vendor credential
POST /api/models/capability -> set provider/model for a capability
"""
# Provider ids offered per capability. Each list is returned verbatim as the
# "providers" entry of the matching *_capability() payload; the embedding
# list is additionally walked in order to pick a suggested provider, so its
# ordering is significant.
_ASR_PROVIDERS = ["openai", "linkai", "baidu", "ali", "xunfei", "azure", "google"]
_TTS_PROVIDERS = ["openai", "linkai", "minimax", "baidu", "ali", "xunfei", "azure", "google", "elevenlabs", "edge", "pytts"]
_EMBEDDING_PROVIDERS = ["openai", "dashscope", "doubao", "zhipu", "linkai"]
# Capability-scoped model catalogs. The chat dropdown can reuse the
# provider's generic model list, but vision and image generation are
# served by a narrower subset that the runtime actually dispatches to —
# see agent/tools/vision/vision.py and skills/image-generation/SKILL.md.
# Anything not listed here intentionally hides the model dropdown so
# users cannot pin a chat-only model and silently get a 4xx at runtime.
_VISION_PROVIDER_MODELS = {
# OpenAI ordering matches the recommended GPT-5.4 family first, then
# GPT-5 and the GPT-4.1/4o backstops.
"openai": [
const.GPT_54_MINI,
const.GPT_54_NANO,
const.GPT_54,
const.GPT_5,
const.GPT_41,
const.GPT_41_MINI,
const.GPT_4o,
],
"doubao": [const.DOUBAO_SEED_2_PRO],
"moonshot": [const.KIMI_K2_6],
"dashscope": [const.QWEN36_PLUS, const.QWEN35_PLUS, const.QWEN3_MAX],
"claudeAPI": [const.CLAUDE_4_6_SONNET, const.CLAUDE_4_7_OPUS, const.CLAUDE_4_6_OPUS],
"gemini": [const.GEMINI_31_FLASH_LITE_PRE, const.GEMINI_31_PRO_PRE, const.GEMINI_3_FLASH_PRE],
"qianfan": [const.ERNIE_45_TURBO_VL],
# Zhipu's bot hard-codes the call to glm-5v-turbo regardless of what
# name is passed in (see models/zhipuai/zhipuai_bot.py::call_vision),
# so listing the chat models here would silently route to the same
# endpoint. Surface only the model the runtime can truly dispatch to.
"zhipu": [const.GLM_5V_TURBO],
# MiniMax's vision endpoint is similarly hard-coded to MiniMax-Text-01
# (see models/minimax/minimax_bot.py::call_vision); the M2.x chat
# family is text-only.
"minimax": [const.MINIMAX_TEXT_01],
# LinkAI proxies the underlying vendor; surface a curated set of
# multimodal models. Order: gpt-4.1-mini → gpt-5.4-mini as the
# cross-vendor baselines, then each vendor's recommended default.
"linkai": [
const.GPT_41_MINI,
const.GPT_54_MINI,
const.QWEN36_PLUS,
const.DOUBAO_SEED_2_PRO,
const.KIMI_K2_6,
const.CLAUDE_4_6_SONNET,
const.GEMINI_31_FLASH_LITE_PRE,
],
}
# Image-generation catalog. Source of truth: skills/image-generation/SKILL.md.
# Listed verbatim (not via const.*) because these are skill-side names
# the script forwards directly to the vendor's image endpoint.
#
# Two shapes are accepted per model entry:
# - bare string → the model id, no hint
# - {"value": ..., "hint": "..."} → model id + dim secondary
# label rendered on the right
# of the dropdown row. Useful
# for surfacing brand names
# (e.g. "Nano Banana 2" next
# to gemini-3.1-flash-image-preview).
# The skill itself maps either form to the real vendor endpoint, so the
# hint is purely cosmetic.
_IMAGE_PROVIDER_MODELS = {
"openai": ["gpt-image-2", "gpt-image-1"],
"gemini": [
{"value": "gemini-3.1-flash-image-preview", "hint": "Nano Banana 2"},
{"value": "gemini-3-pro-image-preview", "hint": "Nano Banana Pro"},
{"value": "gemini-2.5-flash-image", "hint": "Nano Banana"},
],
"doubao": ["seedream-5.0-lite", "seedream-4.5"],
"dashscope": ["qwen-image-2.0-pro", "qwen-image-2.0"],
"minimax": ["image-01"],
"linkai": ["gpt-image-2", "gemini-3-pro-image-preview", "seedream-5.0-lite"],
}
@staticmethod
def _config_path() -> str:
return os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
"config.json",
)
@classmethod
def _read_file_config(cls) -> dict:
path = cls._config_path()
if not os.path.exists(path):
return {}
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
@classmethod
def _write_file_config(cls, data: dict) -> None:
    """Overwrite ``config.json`` with *data* (4-space indent, non-ASCII kept as-is)."""
    target = cls._config_path()
    with open(target, "w", encoding="utf-8") as handle:
        json.dump(data, handle, indent=4, ensure_ascii=False)
@staticmethod
def _is_real_key(value: str) -> bool:
return bool(value) and value not in ("", "YOUR API KEY", "YOUR_API_KEY")
@classmethod
def _provider_overview(cls) -> List[dict]:
    """Describe every provider in ``ConfigHandler.PROVIDER_MODELS`` for the UI.

    Returns one dict per provider with its id, label, config field names,
    masked API key (only when a real key is configured), effective base URL
    and model list. Configured providers come first; within each group the
    declaration order of ``PROVIDER_MODELS`` is kept.
    """
    local_config = conf()
    items = []
    for pid, p in ConfigHandler.PROVIDER_MODELS.items():
        key_field = p.get("api_key_field")
        base_field = p.get("api_base_key")
        raw_key = local_config.get(key_field, "") if key_field else ""
        raw_base = local_config.get(base_field, "") if base_field else ""
        configured = cls._is_real_key(raw_key)
        items.append({
            "id": pid,
            "label": p["label"],
            "configured": configured,
            "api_key_field": key_field,
            "api_base_field": base_field,
            "api_key_masked": ConfigHandler._mask_key(raw_key) if configured else "",
            # An empty stored base URL falls back to the provider default.
            "api_base": raw_base or (p.get("api_base_default") or ""),
            "api_base_default": p.get("api_base_default") or "",
            "api_base_placeholder": p.get("api_base_placeholder") or "",
            "models": list(p.get("models") or []),
        })
    # `items` is already in PROVIDER_MODELS order and list.sort() is stable,
    # so sorting on the configured flag alone yields "configured first, then
    # declaration order" without rebuilding the key list for every element.
    items.sort(key=lambda it: not it["configured"])
    return items
@classmethod
def _chat_capability(cls, local_config: dict) -> dict:
    """Describe the main chat model; ``bot_type`` is translated to a provider id.

    The stored ``"chatGPT"`` bot type is reported as provider ``"openai"``.
    When the resulting id is not a known provider and ``use_linkai`` is set,
    the provider is reported as ``"linkai"``.
    """
    known_providers = ConfigHandler.PROVIDER_MODELS
    raw_type = local_config.get("bot_type") or ""
    if raw_type == "chatGPT":
        provider = "openai"
    else:
        provider = raw_type
    if local_config.get("use_linkai") and provider not in known_providers:
        provider = "linkai"

    summary = {
        "editable": True,
        "current_provider": provider,
        "current_model": local_config.get("model", ""),
        "providers": list(known_providers.keys()),
        "use_linkai": bool(local_config.get("use_linkai", False)),
    }
    return summary
# Auto-fallback order for vision when no explicit model is pinned.
# Mirrors agent/tools/vision/vision.py::_resolve_providers — DeepSeek and
# other text-only chat bots are intentionally absent, since they cannot
# actually serve a vision request. Each entry is
# (provider_id, api_key_field, default_vision_model)
# and lookups are case-insensitive on the api_key_field. LinkAI and
# OpenAI are handled separately below so use_linkai can promote LinkAI
# to the front of the chain.
# NOTE: within this class, _predict_vision_auto only consumes the provider
# id and the default model of each entry; it resolves the key field through
# ConfigHandler.PROVIDER_MODELS rather than the middle column here.
# NOTE(review): the case-insensitivity remark presumably describes vision.py;
# nothing in this class lower-cases the field name — confirm.
_VISION_AUTO_ORDER = [
    ("moonshot", "moonshot_api_key", const.KIMI_K2_6),
    ("doubao", "ark_api_key", const.DOUBAO_SEED_2_PRO),
    ("dashscope", "dashscope_api_key", const.QWEN36_PLUS),
    ("claudeAPI", "claude_api_key", const.CLAUDE_4_6_SONNET),
    ("gemini", "gemini_api_key", const.GEMINI_31_FLASH_LITE_PRE),
    ("qianfan", "qianfan_api_key", const.ERNIE_45_TURBO_VL),
    ("zhipu", "zhipu_ai_api_key", const.GLM_5V_TURBO),
    ("minimax", "minimax_api_key", const.MINIMAX_TEXT_01),
]
@classmethod
def _predict_vision_auto(cls, local_config: dict) -> dict:
"""Predict which provider vision.py will actually dispatch to when
no tool.vision.model is set. Mirrors the fallback order in
agent/tools/vision/vision.py::_resolve_providers so the UI hint
matches reality."""
chat = cls._chat_capability(local_config)
main_provider = chat["current_provider"]
main_model = chat["current_model"]
use_linkai_flag = bool(local_config.get("use_linkai", False))
linkai_configured = cls._is_real_key(local_config.get("linkai_api_key", ""))
def _try(pid: str, model_default: str):
# Look up the api_key for this provider via the canonical
# provider table so we don't hardcode field names here.
meta = ConfigHandler.PROVIDER_MODELS.get(pid) or {}
key_field = meta.get("api_key_field")
if not key_field:
return None
if not cls._is_real_key(local_config.get(key_field, "")):
return None
# Pick a model that the vision runtime can actually dispatch to
# for this provider. Using `main_model` here is unsafe — for
# vendors like Zhipu/MiniMax the bot hard-codes the vision model
# name regardless of the chat-model name, so surfacing the chat
# model name in the hint is misleading. Trust the curated
# _VISION_PROVIDER_MODELS list: prefer the main model only if
# it appears there; otherwise show the vendor's first vision-
# capable model.
allowed = cls._VISION_PROVIDER_MODELS.get(pid, [])
if pid == main_provider and main_model and main_model in allowed:
return {"provider": pid, "model": main_model}
fallback = allowed[0] if allowed else model_default
return {"provider": pid, "model": fallback}
# 1. use_linkai → suppress the hint entirely. LinkAI is a proxy and
# we don't observe which underlying model it picks; surfacing
# "LinkAI" with no model would not tell the user anything useful.
if use_linkai_flag and linkai_configured:
return {"provider": "", "model": ""}
# 2. Main bot — only when it natively supports vision. We approximate
# "natively supports" by membership in _VISION_PROVIDER_MODELS,
# which is the same set vision.py's _DISCOVERABLE_MODELS covers
# (minus the chat-only DeepSeek family).
if main_provider in cls._VISION_PROVIDER_MODELS:
hit = _try(main_provider, main_model)
if hit:
return hit
# 3. Other discoverable providers in declared order
for pid, _key, default_model in cls._VISION_AUTO_ORDER:
hit = _try(pid, default_model)
if hit:
return hit
# 4. OpenAI raw HTTP
if cls._is_real_key(local_config.get("open_ai_api_key", "")):
return {"provider": "openai", "model": const.GPT_41_MINI}
# 5. LinkAI as last resort (only reached when use_linkai is off)
if linkai_configured:
return {"provider": "linkai", "model": const.GPT_41_MINI}
return {"provider": "", "model": ""}
@classmethod
def _vision_capability(cls, local_config: dict) -> dict:
"""Vision model. tool.vision.model is the explicit override; otherwise
the runtime fallback chain in agent/tools/vision/vision.py decides."""
tool_conf = local_config.get("tool") or {}
if not isinstance(tool_conf, dict):
tool_conf = {}
vision_conf = tool_conf.get("vision") or {}
if not isinstance(vision_conf, dict):
vision_conf = {}
user_specified = (vision_conf.get("model") or "").strip()
# When the user pinned a specific model, infer which vendor card to
# highlight by scanning the per-provider model lists. Falls back to
# an empty provider so the dropdown stays on "auto" if we can't tell.
inferred_provider = ""
if user_specified:
for pid, models in cls._VISION_PROVIDER_MODELS.items():
if user_specified in models:
inferred_provider = pid
break
# In auto mode the hint should reflect what vision.py will actually
# dispatch to — surface that prediction via fallback_* so the UI
# shows e.g. "openai / gpt-4.1-mini" instead of the chat-model name.
predicted = cls._predict_vision_auto(local_config)
return {
"editable": True,
"strategy": "specified" if user_specified else "auto",
"user_specified_model": user_specified,
"current_provider": inferred_provider,
"current_model": user_specified,
"fallback_provider": predicted["provider"],
"fallback_model": predicted["model"],
"providers": list(cls._VISION_PROVIDER_MODELS.keys()),
"provider_models": cls._VISION_PROVIDER_MODELS,
}
@classmethod
def _asr_capability(cls, local_config: dict) -> dict:
provider_id = (local_config.get("voice_to_text") or "openai").strip().lower()
return {
"editable": True,
"current_provider": provider_id,
"current_model": "",
"providers": cls._ASR_PROVIDERS,
}
@classmethod
def _tts_capability(cls, local_config: dict) -> dict:
provider_id = (local_config.get("text_to_voice") or "openai").strip().lower()
return {
"editable": True,
"current_provider": provider_id,
"current_model": local_config.get("text_to_voice_model", "") or "",
"providers": cls._TTS_PROVIDERS,
}
@classmethod
def _embedding_capability(cls, local_config: dict) -> dict:
# Embedding is "pick or empty" — runtime's legacy openai/linkai
# fallback is a safety net, not a UX-visible auto mode.
# `suggested_provider` is a UI-only hint (NOT persisted) that
# preselects the dropdown to whichever configured vendor we'd
# recommend, so users don't have to expand the menu to find it.
explicit = (local_config.get("embedding_provider") or "").strip().lower()
suggested = ""
if not explicit:
for pid in cls._EMBEDDING_PROVIDERS:
meta = ConfigHandler.PROVIDER_MODELS.get(pid) or {}
key_field = meta.get("api_key_field")
if key_field and cls._is_real_key(local_config.get(key_field, "")):
suggested = pid
break
return {
"editable": True,
"current_provider": explicit,
"suggested_provider": suggested,
"current_model": local_config.get("embedding_model", "") or "",
"current_dim": int(local_config.get("embedding_dimensions") or 0) or None,
"providers": cls._EMBEDDING_PROVIDERS,
}
# Auto-fallback order for image generation. Mirrors the global priority
# used inside skills/image-generation/scripts/generate.py
# (`_DEFAULT_PROVIDER_ORDER`): OpenAI → Gemini → Seedream(Ark/doubao) →
# Qwen(dashscope) → MiniMax → LinkAI. Each entry maps the
# provider-card id to the script's per-provider DEFAULT_MODEL so the
# hint matches what the runtime would actually request.
# _predict_image_auto walks this list top to bottom and reports the first
# provider whose API key (field name taken from ConfigHandler.PROVIDER_MODELS)
# passes _is_real_key, so the ordering here is significant.
_IMAGE_AUTO_ORDER = [
    ("openai", "gpt-image-2"),
    ("gemini", "gemini-3.1-flash-image-preview"),  # nano-banana-2
    ("doubao", "seedream-5.0-lite"),
    ("dashscope", "qwen-image-2.0"),
    ("minimax", "image-01"),
    ("linkai", "gpt-image-2"),
]
@classmethod
def _predict_image_auto(cls, local_config: dict) -> dict:
"""Predict which provider/model the image-generation skill will hit
when no SKILL_IMAGE_GENERATION_MODEL override is set. Mirrors
skills/image-generation/scripts/generate.py::_build_providers so
the UI hint matches reality. Chat-only providers (DeepSeek etc.)
are absent by design — image generation never falls back to a chat
bot regardless of the main model.
When use_linkai is enabled the hint is suppressed entirely — LinkAI
proxies to whichever backend it deems appropriate and surfacing
"LinkAI" alone tells the user nothing actionable."""
use_linkai_flag = bool(local_config.get("use_linkai", False))
linkai_configured = cls._is_real_key(local_config.get("linkai_api_key", ""))
if use_linkai_flag and linkai_configured:
return {"provider": "", "model": ""}
for pid, default_model in cls._IMAGE_AUTO_ORDER:
meta = ConfigHandler.PROVIDER_MODELS.get(pid) or {}
key_field = meta.get("api_key_field")
if not key_field:
continue
if cls._is_real_key(local_config.get(key_field, "")):
return {"provider": pid, "model": default_model}
return {"provider": "", "model": ""}
@classmethod
def _image_capability(cls, local_config: dict) -> dict:
"""Image generation. Source of truth: config["skill"]["image-generation"]["model"]
(mirrors the per-skill config schema documented in skills/image-generation).
The runtime resolver in skills/image-generation/scripts/generate.py
reads this via the SKILL_IMAGE_GENERATION_MODEL env var that the
agent_initializer syncs at startup; provider is inferred from the
model name prefix, mirroring vision.py's design.
"""
skill_node = local_config.get("skill") or {}
if not isinstance(skill_node, dict):
skill_node = {}
img_node = skill_node.get("image-generation") or {}
if not isinstance(img_node, dict):
img_node = {}
explicit_model = (img_node.get("model") or "").strip()
# Infer the provider card to highlight by scanning per-provider
# model lists, including alias values inside {value, hint} entries.
inferred_provider = ""
if explicit_model:
for pid, models in cls._IMAGE_PROVIDER_MODELS.items():
for entry in models:
val = entry if isinstance(entry, str) else (entry.get("value") or "")
if val == explicit_model:
inferred_provider = pid
break
if inferred_provider:
break
# In auto mode the hint should reflect what generate.py will actually
# dispatch to — surface that prediction via fallback_* so the UI
# never claims a chat-only bot (e.g. minimax/MiniMax-M2.7) "would
# generate the image", which is impossible.
predicted = cls._predict_image_auto(local_config)
return {
"editable": True,
"strategy": "specified" if explicit_model else "auto",
"current_provider": inferred_provider,
"current_model": explicit_model,
"fallback_provider": predicted["provider"],
"fallback_model": predicted["model"],
"providers": list(cls._IMAGE_PROVIDER_MODELS.keys()),
"provider_models": cls._IMAGE_PROVIDER_MODELS,
# The dispatcher that honors a pinned provider isn't wired up
# yet; advertise this so the UI can show a "saved but not active"
# banner until the runtime catches up.
"runtime_active": False,
"note": "router_pending",
}
@classmethod
def _search_capability(cls, local_config: dict) -> dict:
"""Web search resolves at runtime via env vars (BOCHA -> LINKAI)."""
if cls._is_real_key(os.environ.get("BOCHA_API_KEY", "")):
current = "bocha"
elif cls._is_real_key(local_config.get("linkai_api_key", "")) or cls._is_real_key(os.environ.get("LINKAI_API_KEY", "")):
current = "linkai"
else:
current = ""
return {
"editable": False,
"current_provider": current,
"available": bool(current),
"note": "set_BOCHA_API_KEY_env" if not current else "",
}
@classmethod
def _capabilities(cls, local_config: dict) -> dict:
return {
"chat": cls._chat_capability(local_config),
"vision": cls._vision_capability(local_config),
"asr": cls._asr_capability(local_config),
"tts": cls._tts_capability(local_config),
"embedding": cls._embedding_capability(local_config),
"image": cls._image_capability(local_config),
"search": cls._search_capability(local_config),
}
def GET(self):
    """Return the models overview (providers + capabilities) as a JSON string.

    Any exception is logged and reported as
    ``{"status": "error", "message": ...}``.
    """
    _require_auth()
    web.header("Content-Type", "application/json; charset=utf-8")
    try:
        local_config = conf()
        overview = self._provider_overview()
        capabilities = self._capabilities(local_config)
        payload = {
            "status": "success",
            "providers": overview,
            "capabilities": capabilities,
        }
        return json.dumps(payload, ensure_ascii=False)
    except Exception as e:
        logger.error(f"[ModelsHandler] GET failed: {e}")
        return json.dumps({"status": "error", "message": str(e)})
def POST(self):
    """Dispatch a models-console mutation based on the ``action`` field.

    The JSON body's ``action`` selects the handler: ``set_provider``,
    ``delete_provider`` or ``set_capability``. Anything else yields an error
    payload. Exceptions are logged and returned as an error payload too.
    """
    _require_auth()
    web.header("Content-Type", "application/json; charset=utf-8")
    try:
        # An empty request body is treated as an empty JSON object.
        payload = json.loads(web.data() or b"{}")
        action = payload.get("action") or ""
        routes = (
            ("set_provider", self._handle_set_provider),
            ("delete_provider", self._handle_delete_provider),
            ("set_capability", self._handle_set_capability),
        )
        for name, handler in routes:
            if action == name:
                return handler(payload)
        return json.dumps({"status": "error", "message": f"unknown action: {action!r}"})
    except Exception as e:
        logger.error(f"[ModelsHandler] POST failed: {e}")
        return json.dumps({"status": "error", "message": str(e)})
def _handle_set_provider(self, data: dict) -> str:
    """Store an API key and/or base URL for one provider.

    * ``api_key`` missing, empty or not a string leaves the stored key alone
      (clearing a key is done through ``delete_provider``).
    * ``api_base`` is applied whenever the field is present in *data*; an
      explicit empty value is stored as ``""``. A missing field means "no
      change".

    Returns a JSON string; a request that changes nothing is reported as a
    success with ``"noop": true`` and writes nothing.
    """
    provider_id = (data.get("provider_id") or "").strip()
    meta = ConfigHandler.PROVIDER_MODELS.get(provider_id)
    if not meta:
        return json.dumps({"status": "error", "message": f"unknown provider: {provider_id}"})

    raw_key = data.get("api_key")
    new_key = raw_key.strip() if isinstance(raw_key, str) else ""
    base_given = "api_base" in data
    new_base = (data.get("api_base") or "").strip() if base_given else None

    local_config = conf()
    file_cfg = self._read_file_config()
    applied = {}

    def _store(field: str, value: str) -> None:
        # Keep the live config and the on-disk copy in step.
        local_config[field] = value
        file_cfg[field] = value
        applied[field] = True

    key_field = meta.get("api_key_field")
    if key_field and new_key:
        _store(key_field, new_key)
    base_field = meta.get("api_base_key")
    if base_field and base_given:
        _store(base_field, new_base)

    if not applied:
        # Saving an untouched form is a successful no-op, not an error.
        return json.dumps({"status": "success", "provider": provider_id, "noop": True})

    self._write_file_config(file_cfg)
    logger.info(f"[ModelsHandler] provider {provider_id} updated: {sorted(applied)}")
    # Credentials may affect bot routing; reset the Bridge so bots are rebuilt.
    self._reset_bridge()
    return json.dumps({"status": "success", "provider": provider_id})
def _handle_delete_provider(self, data: dict) -> str:
    """Blank out the stored API key and base URL of one provider.

    ``data["provider_id"]`` must be a key of ``ConfigHandler.PROVIDER_MODELS``;
    otherwise an error payload is returned and nothing is written. On success
    the fields are set to ``""``, ``config.json`` is rewritten, the Bridge is
    reset, and the returned JSON string lists the cleared field names.
    """
    provider_id = (data.get("provider_id") or "").strip()
    meta = ConfigHandler.PROVIDER_MODELS.get(provider_id)
    if not meta:
        return json.dumps({"status": "error", "message": f"unknown provider: {provider_id}"})
    local_config = conf()
    file_cfg = self._read_file_config()
    cleared = []
    for field_name in (meta.get("api_key_field"), meta.get("api_base_key")):
        if not field_name:
            # Some providers declare no base-URL field (None in the table).
            continue
        # NOTE(review): the nesting below is reconstructed — only the
        # in-memory write is assumed to be guarded by the membership test,
        # while the file copy is always blanked. Confirm against the repo.
        if field_name in local_config:
            local_config[field_name] = ""
        file_cfg[field_name] = ""
        cleared.append(field_name)
    self._write_file_config(file_cfg)
    logger.info(f"[ModelsHandler] provider {provider_id} cleared: {cleared}")
    self._reset_bridge()
    return json.dumps({"status": "success", "provider": provider_id, "cleared": cleared})
def _handle_set_capability(self, data: dict) -> str:
capability = (data.get("capability") or "").strip()
provider_id = (data.get("provider_id") or "").strip()
model = (data.get("model") or "").strip()
if capability == "chat":
return self._set_chat(provider_id, model)
if capability == "vision":
return self._set_vision(provider_id, model)
if capability == "asr":
return self._set_simple("voice_to_text", provider_id)
if capability == "tts":
return self._set_tts(provider_id, model)
if capability == "embedding":
return self._set_embedding(provider_id, model)
if capability == "image":
return self._set_image(provider_id, model)
return json.dumps({"status": "error", "message": f"capability not editable: {capability}"})
def _set_image(self, provider_id: str, model: str) -> str:
    """Persist the pinned image-generation model.

    The value is stored at ``config["skill"]["image-generation"]["model"]`` in
    both the live config and ``config.json``; an empty *model* stores ``""``.
    *provider_id* is not stored — it is only logged and echoed back in the
    response. The ``SKILL_IMAGE_GENERATION_MODEL`` environment variable is
    updated to match (removed when *model* is empty).
    """
    local_config = conf()
    file_cfg = self._read_file_config()

    def _image_node(cfg: dict) -> dict:
        # Return cfg["skill"]["image-generation"], (re)creating either level
        # when it is missing, empty or not a dict, and re-attaching both.
        skills = cfg.get("skill") or {}
        if not isinstance(skills, dict):
            skills = {}
        node = skills.get("image-generation") or {}
        if not isinstance(node, dict):
            node = {}
        skills["image-generation"] = node
        cfg["skill"] = skills
        return node

    stored_value = model or ""
    for cfg in (local_config, file_cfg):
        _image_node(cfg)["model"] = stored_value
    self._write_file_config(file_cfg)

    # Mirror the setting into this process's environment so the change is
    # visible without waiting for the startup-time sync from config["skill"].
    env_key = "SKILL_IMAGE_GENERATION_MODEL"
    if model:
        os.environ[env_key] = model
    else:
        os.environ.pop(env_key, None)

    logger.info(f"[ModelsHandler] image updated: provider_hint={provider_id!r} model={model!r}")
    response = {
        "status": "success",
        "provider": provider_id,
        "model": model,
        "router_pending": True,
    }
    return json.dumps(response)
def _set_chat(self, provider_id: str, model: str) -> str:
    """Apply the chat provider and/or model to the live and on-disk config.

    A non-empty ``provider_id`` must be a key of
    ``ConfigHandler.PROVIDER_MODELS``; it sets ``bot_type`` (``"openai"``
    maps to ``"chatGPT"``) and ``use_linkai``. A non-empty ``model`` sets
    ``model``. When neither is given nothing is written and a ``noop``
    success payload is returned.
    """
    if provider_id and provider_id not in ConfigHandler.PROVIDER_MODELS:
        return json.dumps({"status": "error", "message": f"unknown provider: {provider_id}"})

    runtime_cfg = conf()
    disk_cfg = self._read_file_config()

    # Gather the changes first, then mirror them into both configs.
    applied = {}
    if provider_id:
        applied["bot_type"] = "chatGPT" if provider_id == "openai" else provider_id
        applied["use_linkai"] = (provider_id == "linkai")
    if model:
        applied["model"] = model

    if not applied:
        # No-op save (nothing to write). Report success so the UI can
        # confirm the click without showing a misleading error.
        return json.dumps({"status": "success", "applied": {}, "noop": True})

    for key, value in applied.items():
        runtime_cfg[key] = value
        disk_cfg[key] = value

    self._write_file_config(disk_cfg)
    logger.info(f"[ModelsHandler] chat updated: {applied}")
    self._reset_bridge()
    return json.dumps({"status": "success", "applied": applied})
def _set_vision(self, provider_id: str, model: str) -> str:
    """Write ``tool.vision.model`` into the on-disk and in-memory config.

    ``provider_id`` is accepted for signature parity but not stored; per
    the original notes the runtime resolver routes by model name prefix.
    Returns a JSON success payload echoing the model.
    """
    def _store_model(cfg) -> None:
        # Set cfg["tool"]["vision"]["model"], creating (or replacing a
        # non-dict value of) either nesting level.
        tool = cfg.get("tool") or {}
        if not isinstance(tool, dict):
            tool = {}
        vision = tool.get("vision") or {}
        if not isinstance(vision, dict):
            vision = {}
        vision["model"] = model
        tool["vision"] = vision
        cfg["tool"] = tool

    runtime_cfg = conf()
    disk_cfg = self._read_file_config()
    _store_model(disk_cfg)
    # Mirror into the in-memory config so the live agent sees the change.
    _store_model(runtime_cfg)
    self._write_file_config(disk_cfg)
    logger.info(f"[ModelsHandler] vision model set: {model!r}")
    return json.dumps({"status": "success", "model": model})
def _set_simple(self, key: str, value: str) -> str:
    """Set one flat config ``key`` to ``value`` in memory and on disk.

    Returns a JSON success payload that echoes the pair as ``{key: value}``.
    """
    runtime_cfg = conf()
    disk_cfg = self._read_file_config()
    for cfg in (runtime_cfg, disk_cfg):
        cfg[key] = value
    self._write_file_config(disk_cfg)
    logger.info(f"[ModelsHandler] {key} set: {value!r}")
    return json.dumps({"status": "success", key: value})
def _set_tts(self, provider_id: str, model: str) -> str:
    """Update the text-to-speech provider and/or model.

    Non-empty ``provider_id`` is stored as ``text_to_voice`` and non-empty
    ``model`` as ``text_to_voice_model``; empty arguments leave the
    corresponding key untouched. The config file is rewritten either way.
    """
    runtime_cfg = conf()
    disk_cfg = self._read_file_config()

    pending = {}
    if provider_id:
        pending["text_to_voice"] = provider_id
    if model:
        pending["text_to_voice_model"] = model
    for key, value in pending.items():
        runtime_cfg[key] = value
        disk_cfg[key] = value

    self._write_file_config(disk_cfg)
    logger.info(f"[ModelsHandler] tts updated: provider={provider_id!r} model={model!r}")
    return json.dumps({"status": "success", "provider": provider_id, "model": model})
def _set_embedding(self, provider_id: str, model: str) -> str:
    """Store the embedding provider and model in memory and on disk.

    ``provider_id=""`` together with ``model=""`` means "switch back to
    legacy auto mode". A falsy ``model`` is stored as the empty string.
    """
    runtime_cfg = conf()
    disk_cfg = self._read_file_config()
    settings = {
        "embedding_provider": provider_id,
        "embedding_model": model or "",
    }
    for key, value in settings.items():
        runtime_cfg[key] = value
        disk_cfg[key] = value
    self._write_file_config(disk_cfg)
    logger.info(f"[ModelsHandler] embedding updated: provider={provider_id!r} model={model!r}")
    # Nothing is reloaded here. The index dimension may no longer match the
    # new provider, so a rebuild is needed; the frontend surfaces this via a
    # confirm + post-save dialog.
    # NOTE(review): the original comment says MemoryManager only picks the
    # new provider up on the next process restart - confirm this is still
    # accurate.
    return json.dumps({"status": "success", "provider": provider_id, "model": model})
@staticmethod
def _reset_bridge() -> None:
    """Ask the ``Bridge`` to reset its bot routing (best effort).

    Any failure, including the import itself, is logged as a warning and
    otherwise ignored so a config save never fails because of it.
    """
    try:
        # Imported lazily, inside the guarded region.
        from bridge.bridge import Bridge

        bridge = Bridge()
        bridge.reset_bot()
        logger.info("[ModelsHandler] Bridge bot routing reset")
    except Exception as exc:
        logger.warning(f"[ModelsHandler] Bridge reset failed: {exc}")
class ChannelsHandler:
    """API for managing external channel configurations (feishu, dingtalk, etc).

    ``GET`` lists every channel in ``CHANNEL_DEFS`` with its current field
    values (secrets masked). ``POST`` dispatches the ``save``, ``connect``
    and ``disconnect`` actions for one channel.
    """

    # Channel name -> UI metadata and the config keys that channel owns.
    CHANNEL_DEFS = OrderedDict([
        ("weixin", {
            "label": {"zh": "微信", "en": "WeChat"},
            "icon": "fa-comment",
            "color": "emerald",
            "fields": [],
        }),
        ("feishu", {
            "label": {"zh": "飞书", "en": "Feishu"},
            "icon": "fa-paper-plane",
            "color": "blue",
            "fields": [
                {"key": "feishu_app_id", "label": "App ID", "type": "text"},
                {"key": "feishu_app_secret", "label": "App Secret", "type": "secret"},
            ],
        }),
        ("dingtalk", {
            "label": {"zh": "钉钉", "en": "DingTalk"},
            "icon": "fa-comments",
            "color": "blue",
            "fields": [
                {"key": "dingtalk_client_id", "label": "Client ID", "type": "text"},
                {"key": "dingtalk_client_secret", "label": "Client Secret", "type": "secret"},
            ],
        }),
        ("wecom_bot", {
            "label": {"zh": "企微智能机器人", "en": "WeCom Bot"},
            "icon": "fa-robot",
            "color": "emerald",
            "fields": [
                {"key": "wecom_bot_id", "label": "Bot ID", "type": "text"},
                {"key": "wecom_bot_secret", "label": "Secret", "type": "secret"},
            ],
        }),
        ("qq", {
            "label": {"zh": "QQ 机器人", "en": "QQ Bot"},
            "icon": "fa-comment",
            "color": "blue",
            "fields": [
                {"key": "qq_app_id", "label": "App ID", "type": "text"},
                {"key": "qq_app_secret", "label": "App Secret", "type": "secret"},
            ],
        }),
        ("wechatcom_app", {
            "label": {"zh": "企微自建应用", "en": "WeCom App"},
            "icon": "fa-building",
            "color": "emerald",
            "fields": [
                {"key": "wechatcom_corp_id", "label": "Corp ID", "type": "text"},
                {"key": "wechatcomapp_agent_id", "label": "Agent ID", "type": "text"},
                {"key": "wechatcomapp_secret", "label": "Secret", "type": "secret"},
                {"key": "wechatcomapp_token", "label": "Token", "type": "secret"},
                {"key": "wechatcomapp_aes_key", "label": "AES Key", "type": "secret"},
                {"key": "wechatcomapp_port", "label": "Port", "type": "number", "default": 9898},
            ],
        }),
        ("wechatmp", {
            "label": {"zh": "公众号", "en": "WeChat MP"},
            "icon": "fa-comment-dots",
            "color": "emerald",
            "fields": [
                {"key": "wechatmp_app_id", "label": "App ID", "type": "text"},
                {"key": "wechatmp_app_secret", "label": "App Secret", "type": "secret"},
                {"key": "wechatmp_token", "label": "Token", "type": "secret"},
                {"key": "wechatmp_aes_key", "label": "AES Key", "type": "secret"},
                {"key": "wechatmp_port", "label": "Port", "type": "number", "default": 8080},
            ],
        }),
    ])

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _app_attr(name: str):
        """Return attribute ``name`` of the entry module (``__main__`` or ``app``), or None."""
        import sys
        app_module = sys.modules.get('__main__') or sys.modules.get('app')
        return getattr(app_module, name, None) if app_module else None

    @staticmethod
    def _config_path() -> str:
        """Path of ``config.json``, three directory levels above this file."""
        return os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(
            os.path.abspath(__file__)))), "config.json")

    @classmethod
    def _persist(cls, changes: dict) -> None:
        """Merge ``changes`` into ``config.json`` (created if absent) and rewrite it."""
        config_path = cls._config_path()
        if os.path.exists(config_path):
            with open(config_path, "r", encoding="utf-8") as f:
                file_cfg = json.load(f)
        else:
            file_cfg = {}
        file_cfg.update(changes)
        with open(config_path, "w", encoding="utf-8") as f:
            json.dump(file_cfg, f, indent=4, ensure_ascii=False)

    @staticmethod
    def _get_weixin_login_status() -> str:
        """Return the running weixin channel's ``login_status``, or ``"unknown"``."""
        try:
            mgr = ChannelsHandler._app_attr('_channel_mgr')
            if mgr:
                ch = mgr.get_channel("weixin")
                if ch and hasattr(ch, 'login_status'):
                    return ch.login_status
        except Exception as e:
            # Best effort only, but leave a trace instead of swallowing silently.
            logger.debug(f"[WebChannel] weixin login status lookup failed: {e}")
        return "unknown"

    @staticmethod
    def _mask_secret(value: str) -> str:
        """Mask all but the first and last four characters of ``value``.

        Values of eight characters or fewer (and empty values) are returned
        unchanged.
        """
        if not value or len(value) <= 8:
            return value
        return value[:4] + "*" * (len(value) - 8) + value[-4:]

    @staticmethod
    def _is_masked_secret(value, current) -> bool:
        """Tell whether ``value`` is a masked placeholder rather than a new secret.

        ``current`` is the secret presently stored for the field. Two cases
        count as a placeholder:

        * the historical heuristic: longer than 8 characters and containing
          ``"****"``;
        * ``value`` equals ``_mask_secret(current)`` for a stored secret
          longer than 8 characters. This covers 9-11 character secrets whose
          mask has fewer than four stars and would otherwise be written back
          over the real secret.
        """
        if not isinstance(value, str):
            return False
        if len(value) > 8 and "*" * 4 in value:
            return True
        stored = "" if current is None else str(current)
        return len(stored) > 8 and value == ChannelsHandler._mask_secret(stored)

    @staticmethod
    def _parse_channel_list(raw) -> list:
        """Normalise ``channel_type`` (comma string or list) to a list of names.

        Blank entries are dropped; non-string list items are ignored rather
        than raising. Any other input type yields an empty list.
        """
        if isinstance(raw, str):
            raw = raw.split(",")
        elif not isinstance(raw, list):
            return []
        return [ch.strip() for ch in raw if isinstance(ch, str) and ch.strip()]

    @classmethod
    def _active_channel_set(cls) -> set:
        """Set of channel names currently listed in the ``channel_type`` config."""
        return set(cls._parse_channel_list(conf().get("channel_type", "")))

    @classmethod
    def _collect_updates(cls, channel_name: str, updates, extra_keys=()) -> dict:
        """Filter and coerce ``updates``, apply them to the live config, return them.

        Only keys declared in the channel's ``fields`` (plus ``extra_keys``)
        are accepted. Secret fields are skipped when empty or when the value
        is a masked placeholder. ``number`` fields are converted with ``int``
        and ``bool`` fields with ``bool``.

        Raises:
            TypeError: a secret field carries a truthy non-string value.
            ValueError: a ``number`` field cannot be converted by ``int``.
        """
        if not isinstance(updates, dict):
            updates = {}
        fields = {f["key"]: f for f in cls.CHANNEL_DEFS[channel_name]["fields"]}
        local_config = conf()
        applied = {}
        for key, value in updates.items():
            field_def = fields.get(key)
            if field_def is None and key not in extra_keys:
                continue
            if field_def is not None:
                field_type = field_def["type"]
                if field_type == "secret":
                    if not value:
                        continue
                    if not isinstance(value, str):
                        raise TypeError(f"{key} must be a string")
                    if cls._is_masked_secret(value, local_config.get(key, "")):
                        continue
                elif field_type == "number":
                    value = int(value)
                elif field_type == "bool":
                    value = bool(value)
            local_config[key] = value
            applied[key] = value
        return applied

    # ------------------------------------------------------------------
    # HTTP verbs
    # ------------------------------------------------------------------

    def GET(self):
        """Return every channel definition with current values as JSON."""
        _require_auth()
        web.header('Content-Type', 'application/json; charset=utf-8')
        try:
            local_config = conf()
            active_channels = self._active_channel_set()
            channels = []
            for ch_name, ch_def in self.CHANNEL_DEFS.items():
                fields_out = []
                for f in ch_def["fields"]:
                    raw_val = local_config.get(f["key"], f.get("default", ""))
                    if f["type"] == "secret" and raw_val:
                        display_val = self._mask_secret(str(raw_val))
                    else:
                        display_val = raw_val
                    fields_out.append({
                        "key": f["key"],
                        "label": f["label"],
                        "type": f["type"],
                        "value": display_val,
                        "default": f.get("default", ""),
                    })
                ch_info = {
                    "name": ch_name,
                    "label": ch_def["label"],
                    "icon": ch_def["icon"],
                    "color": ch_def["color"],
                    "active": ch_name in active_channels,
                    "fields": fields_out,
                }
                # Login status is only reported for an active weixin channel.
                if ch_name == "weixin" and ch_name in active_channels:
                    ch_info["login_status"] = self._get_weixin_login_status()
                channels.append(ch_info)
            return json.dumps({"status": "success", "channels": channels}, ensure_ascii=False)
        except Exception as e:
            logger.error(f"[WebChannel] Channels API error: {e}")
            return json.dumps({"status": "error", "message": str(e)})

    def POST(self):
        """Dispatch ``save`` / ``connect`` / ``disconnect`` for one channel."""
        _require_auth()
        web.header('Content-Type', 'application/json; charset=utf-8')
        try:
            body = json.loads(web.data())
            action = body.get("action")
            channel_name = body.get("channel")
            if not action or not channel_name:
                return json.dumps({"status": "error", "message": "action and channel required"})
            if channel_name not in self.CHANNEL_DEFS:
                return json.dumps({"status": "error", "message": f"unknown channel: {channel_name}"})
            # A missing or null "config" is treated as "no field changes".
            config = body.get("config") or {}
            if action == "save":
                return self._handle_save(channel_name, config)
            elif action == "connect":
                return self._handle_connect(channel_name, config)
            elif action == "disconnect":
                return self._handle_disconnect(channel_name)
            else:
                return json.dumps({"status": "error", "message": f"unknown action: {action}"})
        except Exception as e:
            logger.error(f"[WebChannel] Channels POST error: {e}")
            return json.dumps({"status": "error", "message": str(e)})

    # ------------------------------------------------------------------
    # Actions
    # ------------------------------------------------------------------

    def _handle_save(self, channel_name: str, updates: dict):
        """Persist field changes and restart the channel if it is active."""
        applied = self._collect_updates(channel_name, updates)
        if not applied:
            return json.dumps({"status": "error", "message": "no valid fields to update"})

        self._persist(applied)
        logger.info(f"[WebChannel] Channel '{channel_name}' config updated: {list(applied.keys())}")

        should_restart = channel_name in self._active_channel_set()
        if should_restart:
            try:
                mgr = self._app_attr('_channel_mgr')
                if mgr:
                    # Restart off the request thread so the response is not delayed.
                    threading.Thread(
                        target=mgr.restart,
                        args=(channel_name,),
                        daemon=True,
                    ).start()
                    logger.info(f"[WebChannel] Channel '{channel_name}' restart triggered")
                else:
                    logger.warning(
                        f"[WebChannel] ChannelManager not available, cannot restart '{channel_name}'")
            except Exception as e:
                logger.warning(f"[WebChannel] Failed to restart channel '{channel_name}': {e}")

        return json.dumps({
            "status": "success",
            "applied": list(applied.keys()),
            "restarted": should_restart,
        }, ensure_ascii=False)

    def _handle_connect(self, channel_name: str, updates: dict):
        """Save config fields, add channel to channel_type, and start it."""
        # Work on a copy so the caller's dict is never mutated.
        updates = dict(updates) if isinstance(updates, dict) else {}
        extra_keys = ()
        # Feishu connected via web console must use websocket (long connection) mode
        if channel_name == "feishu":
            updates.setdefault("feishu_event_mode", "websocket")
            extra_keys = ("feishu_event_mode",)

        applied = self._collect_updates(channel_name, updates, extra_keys)

        existing = self._parse_channel_list(conf().get("channel_type", ""))
        if channel_name not in existing:
            existing.append(channel_name)
        new_channel_type = ",".join(existing)
        conf()["channel_type"] = new_channel_type

        self._persist({**applied, "channel_type": new_channel_type})
        logger.info(f"[WebChannel] Channel '{channel_name}' connecting, channel_type={new_channel_type}")

        def _do_start():
            try:
                clear_fn = self._app_attr('_clear_singleton_cache')
                mgr = self._app_attr('_channel_mgr')
                if mgr is None:
                    logger.warning(f"[WebChannel] ChannelManager not available, cannot start '{channel_name}'")
                    return
                # Stop existing instance first if still running (e.g. re-connect without disconnect)
                existing_ch = mgr.get_channel(channel_name)
                if existing_ch is not None:
                    logger.info(f"[WebChannel] Stopping existing '{channel_name}' before reconnect...")
                    mgr.stop(channel_name)
                # Always wait for the remote service to release the old connection before
                # establishing a new one (DingTalk drops callbacks on duplicate connections)
                logger.info(f"[WebChannel] Waiting for '{channel_name}' old connection to close...")
                time.sleep(5)
                if clear_fn:
                    clear_fn(channel_name)
                logger.info(f"[WebChannel] Starting channel '{channel_name}'...")
                mgr.start([channel_name], first_start=False)
                logger.info(f"[WebChannel] Channel '{channel_name}' start completed")
            except Exception as e:
                logger.error(f"[WebChannel] Failed to start channel '{channel_name}': {e}",
                             exc_info=True)

        threading.Thread(target=_do_start, daemon=True).start()

        return json.dumps({
            "status": "success",
            "channel_type": new_channel_type,
        }, ensure_ascii=False)

    def _handle_disconnect(self, channel_name: str):
        """Remove the channel from ``channel_type`` and stop it in the background."""
        existing = self._parse_channel_list(conf().get("channel_type", ""))
        new_channel_type = ",".join(ch for ch in existing if ch != channel_name)
        conf()["channel_type"] = new_channel_type

        self._persist({"channel_type": new_channel_type})

        def _do_stop():
            try:
                mgr = self._app_attr('_channel_mgr')
                clear_fn = self._app_attr('_clear_singleton_cache')
                if mgr:
                    mgr.stop(channel_name)
                else:
                    logger.warning(f"[WebChannel] ChannelManager not found, cannot stop '{channel_name}'")
                if clear_fn:
                    clear_fn(channel_name)
                logger.info(f"[WebChannel] Channel '{channel_name}' disconnected, "
                            f"channel_type={new_channel_type}")
            except Exception as e:
                logger.warning(f"[WebChannel] Failed to stop channel '{channel_name}': {e}",
                               exc_info=True)

        threading.Thread(target=_do_stop, daemon=True).start()

        return json.dumps({
            "status": "success",
            "channel_type": new_channel_type,
        }, ensure_ascii=False)
class WeixinQrHandler:
    """WeChat QR-code login endpoints for the web console.

    GET  /api/weixin/qrlogin  → fetch a new QR code
    POST /api/weixin/qrlogin  → poll QR status or start channel after login
    """

    # Pending QR session shared by all requests: {qrcode, qrcode_url, base_url}.
    _qr_state = {}

    @staticmethod
    def _qr_to_data_uri(data: str) -> str:
        """Render ``data`` as a QR code and return it as a PNG data URI.

        Returns an empty string when any step raises ``ImportError``.
        """
        try:
            import base64
            import io
            import qrcode as qr_lib

            code = qr_lib.QRCode(error_correction=qr_lib.constants.ERROR_CORRECT_L, box_size=6, border=2)
            code.add_data(data)
            code.make(fit=True)
            picture = code.make_image(fill_color="black", back_color="white")
            png = io.BytesIO()
            picture.save(png, format="PNG")
            encoded = base64.b64encode(png.getvalue()).decode("ascii")
            return f"data:image/png;base64,{encoded}"
        except ImportError:
            return ""

    @staticmethod
    def _get_running_channel():
        """Return the running ``weixin`` channel from the channel manager, or None."""
        try:
            import sys
            host_module = sys.modules.get('__main__') or sys.modules.get('app')
            manager = getattr(host_module, '_channel_mgr', None) if host_module else None
            if manager:
                return manager.get_channel("weixin")
        except Exception:
            pass
        return None

    def GET(self):
        """Return a QR code: the running channel's if it has one, else a fresh one."""
        _require_auth()
        web.header('Content-Type', 'application/json; charset=utf-8')
        try:
            channel = self._get_running_channel()
            if channel and hasattr(channel, '_current_qr_url') and channel._current_qr_url:
                # The running channel already shows a QR code; reuse it.
                image = self._qr_to_data_uri(channel._current_qr_url)
                return json.dumps({
                    "status": "success",
                    "qrcode_url": channel._current_qr_url,
                    "qr_image": image,
                    "source": "channel",
                })

            from channel.weixin.weixin_api import WeixinApi, DEFAULT_BASE_URL
            base_url = conf().get("weixin_base_url", DEFAULT_BASE_URL)
            client = WeixinApi(base_url=base_url)
            response = client.fetch_qr_code()
            qrcode = response.get("qrcode", "")
            qrcode_url = response.get("qrcode_img_content", "")
            if not qrcode:
                return json.dumps({"status": "error", "message": "No QR code returned"})

            image = self._qr_to_data_uri(qrcode_url)
            # Remember the session so later polls know which code to check.
            WeixinQrHandler._qr_state = {
                "qrcode": qrcode,
                "qrcode_url": qrcode_url,
                "base_url": base_url,
            }
            return json.dumps({"status": "success", "qrcode_url": qrcode_url, "qr_image": image})
        except Exception as e:
            logger.error(f"[WebChannel] WeixinQr GET error: {e}")
            return json.dumps({"status": "error", "message": str(e)})

    def POST(self):
        """Handle ``poll`` (the default action) and ``refresh``."""
        _require_auth()
        web.header('Content-Type', 'application/json; charset=utf-8')
        try:
            payload = json.loads(web.data())
            action = payload.get("action", "poll")
            if action == "poll":
                return self._poll_status()
            if action == "refresh":
                return self.GET()
            return json.dumps({"status": "error", "message": f"unknown action: {action}"})
        except Exception as e:
            logger.error(f"[WebChannel] WeixinQr POST error: {e}")
            return json.dumps({"status": "error", "message": str(e)})

    def _poll_status(self):
        """Poll the pending QR session and react to ``confirmed`` / ``expired``."""
        session = WeixinQrHandler._qr_state
        qrcode = session.get("qrcode", "")
        base_url = session.get("base_url", "")
        if not qrcode:
            return json.dumps({"status": "error", "message": "No active QR session"})

        from channel.weixin.weixin_api import WeixinApi, DEFAULT_BASE_URL
        client = WeixinApi(base_url=base_url or DEFAULT_BASE_URL)
        try:
            status_resp = client.poll_qr_status(qrcode, timeout=10)
        except Exception as e:
            return json.dumps({"status": "error", "message": str(e)})

        qr_status = status_resp.get("status", "wait")

        if qr_status == "confirmed":
            bot_token = status_resp.get("bot_token", "")
            bot_id = status_resp.get("ilink_bot_id", "")
            result_base_url = status_resp.get("baseurl", base_url)
            user_id = status_resp.get("ilink_user_id", "")
            if not bot_token or not bot_id:
                return json.dumps({"status": "error", "message": "Login confirmed but missing token"})

            cred_path = os.path.expanduser(
                conf().get("weixin_credentials_path", "~/.weixin_cow_credentials.json")
            )
            from channel.weixin.weixin_channel import _save_credentials
            credentials = {
                "token": bot_token,
                "base_url": result_base_url,
                "bot_id": bot_id,
                "user_id": user_id,
            }
            _save_credentials(cred_path, credentials)
            conf()["weixin_token"] = bot_token
            conf()["weixin_base_url"] = result_base_url
            # The session is finished; forget it.
            WeixinQrHandler._qr_state = {}
            logger.info(f"[WebChannel] WeChat QR login confirmed: bot_id={bot_id}")
            return json.dumps({
                "status": "success",
                "qr_status": "confirmed",
                "bot_id": bot_id,
            })

        if qr_status == "expired":
            # Fetch a replacement code and keep polling with it.
            replacement = client.fetch_qr_code()
            new_qrcode = replacement.get("qrcode", "")
            new_qrcode_url = replacement.get("qrcode_img_content", "")
            new_qr_image = self._qr_to_data_uri(new_qrcode_url)
            WeixinQrHandler._qr_state["qrcode"] = new_qrcode
            WeixinQrHandler._qr_state["qrcode_url"] = new_qrcode_url
            return json.dumps({
                "status": "success",
                "qr_status": "expired",
                "qrcode_url": new_qrcode_url,
                "qr_image": new_qr_image,
            })

        return json.dumps({"status": "success", "qr_status": qr_status})
class FeishuRegisterHandler:
    """One-click creation of a Feishu agent app (OAuth device flow via ``lark.register_app``).

    GET  /api/feishu/register  → start a registration: the SDK produces a QR
                                 URL, which is returned immediately while a
                                 background thread keeps polling Feishu until
                                 the user scans and authorises.
    POST /api/feishu/register  → poll the current session
                                 (pending / done / error / expired / denied).
                                 A successful registration is not written to
                                 config here; the frontend follows up with
                                 /api/channels {action:'connect'} to go
                                 through the normal enable flow.
    """

    # Process-wide session state, guarded by ``_lock``. Keys used:
    # status, cancel_event, url, expire_in, qr_image, app_id, app_secret, error.
    # A simple local self-hosted deployment does not need per-session isolation.
    _state = {}
    _lock = threading.Lock()

    _TERMINAL_FAILURES = ("error", "expired", "denied")

    @staticmethod
    def _qr_to_data_uri(data: str) -> str:
        """Reuse ``WeixinQrHandler``'s QR rendering."""
        return WeixinQrHandler._qr_to_data_uri(data)

    @classmethod
    def _reset_state(cls):
        """Drop the current session state."""
        with cls._lock:
            cls._state = {}

    @classmethod
    def _update_if_current(cls, cancel_event, **fields) -> bool:
        """Merge ``fields`` into the state only if it still belongs to ``cancel_event``.

        Each registration session is identified by its ``cancel_event``
        object. A worker whose session has been replaced (or cleared) must
        not overwrite the newer state. Returns True when the write happened.
        """
        with cls._lock:
            if cls._state.get("cancel_event") is not cancel_event:
                return False
            cls._state.update(fields)
            return True

    @classmethod
    def _poll_payload(cls) -> dict:
        """Build the poll response for the current state.

        On ``done`` the credentials are returned once and the class-level
        state is cleared, so they do not linger in memory or get served to
        later polls.
        """
        with cls._lock:
            state = cls._state
            status = state.get("status", "idle")
            if status == "done":
                payload = {
                    "status": "success",
                    "register_status": "done",
                    "app_id": state.get("app_id", ""),
                    "app_secret": state.get("app_secret", ""),
                }
                # Rebind the CLASS attribute: assigning through an instance
                # would only shadow it and leave the credentials in place.
                cls._state = {}
                return payload
            if status in cls._TERMINAL_FAILURES:
                return {
                    "status": "success",
                    "register_status": status,
                    "message": state.get("error", ""),
                }
            # pending / starting / idle: still waiting for the user to scan.
            return {
                "status": "success",
                "register_status": "pending",
            }

    @classmethod
    def _start_register_thread(cls):
        """Start a new registration session, cancelling any one in progress."""
        # Cancel a previous session first so two SDK threads never poll the
        # same endpoint concurrently.
        with cls._lock:
            old_cancel = cls._state.get("cancel_event") if cls._state else None
            if old_cancel is not None:
                old_cancel.set()
            cancel_event = threading.Event()
            cls._state = {"status": "starting", "cancel_event": cancel_event}

        def _worker():
            try:
                import lark_oapi as lark
            except ImportError:
                cls._update_if_current(
                    cancel_event,
                    status="error",
                    error="lark-oapi SDK 未安装,请执行 pip install -U lark-oapi",
                )
                return

            def _on_qr(info):
                # The SDK calls back as soon as it has the QR URL; publish it
                # so the waiting GET can return right away.
                url = info.get("url", "")
                cls._update_if_current(
                    cancel_event,
                    url=url,
                    expire_in=info.get("expire_in", 600),
                    qr_image=cls._qr_to_data_uri(url),
                    status="pending",
                )
                logger.info(f"[FeishuRegister] QR ready, expire_in={info.get('expire_in')}s")

            def _on_status(info):
                # Drop the "polling" heartbeat (pure noise); keep real state
                # transitions such as slow_down / domain_switched.
                status = info.get("status")
                if status == "polling":
                    return
                logger.info(f"[FeishuRegister] SDK status: {info}")

            try:
                result = lark.register_app(
                    on_qr_code=_on_qr,
                    on_status_change=_on_status,
                    source="cowagent",
                    cancel_event=cancel_event,
                )
                cls._update_if_current(
                    cancel_event,
                    status="done",
                    app_id=result.get("client_id", ""),
                    app_secret=result.get("client_secret", ""),
                )
                logger.info(f"[FeishuRegister] App created: app_id={result.get('client_id')}")
            except Exception as e:
                err_msg = str(e)
                err_cls = e.__class__.__name__
                # The SDK raises AppExpiredError / AppAccessDeniedError /
                # RegisterAppError; classify by class name.
                if "Expired" in err_cls:
                    status = "expired"
                elif "Denied" in err_cls:
                    status = "denied"
                elif "abort" in err_msg.lower() or "cancel" in err_msg.lower():
                    # Pre-empted by a newer registration; stay quiet.
                    return
                else:
                    status = "error"
                cls._update_if_current(cancel_event, status=status, error=err_msg)
                logger.warning(f"[FeishuRegister] Register failed ({err_cls}): {err_msg}")

        threading.Thread(target=_worker, daemon=True, name="feishu-register").start()

    def GET(self):
        """Start a new registration session, replacing any pending/done one."""
        _require_auth()
        web.header('Content-Type', 'application/json; charset=utf-8')
        try:
            self._start_register_thread()
            # Wait up to ~10s (100 x 0.1s) for the QR URL or a failure.
            for _ in range(100):
                with self._lock:
                    state = type(self)._state
                    if state.get("url") or state.get("status") in self._TERMINAL_FAILURES:
                        break
                time.sleep(0.1)

            with self._lock:
                state = dict(type(self)._state)
            if state.get("status") in self._TERMINAL_FAILURES:
                return json.dumps({
                    "status": "error",
                    "message": state.get("error", "register failed"),
                })
            if not state.get("url"):
                return json.dumps({
                    "status": "error",
                    "message": "等待飞书二维码超时,请重试",
                })
            return json.dumps({
                "status": "success",
                "qrcode_url": state["url"],
                "qr_image": state.get("qr_image", ""),
                "expire_in": state.get("expire_in", 600),
            })
        except Exception as e:
            logger.error(f"[WebChannel] FeishuRegister GET error: {e}")
            return json.dumps({"status": "error", "message": str(e)})

    def POST(self):
        """Poll the registration result (``action`` defaults to ``poll``)."""
        _require_auth()
        web.header('Content-Type', 'application/json; charset=utf-8')
        try:
            body = json.loads(web.data() or b"{}")
            action = body.get("action", "poll")
            if action != "poll":
                return json.dumps({"status": "error", "message": f"unknown action: {action}"})
            return json.dumps(self._poll_payload())
        except Exception as e:
            logger.error(f"[WebChannel] FeishuRegister POST error: {e}")
            return json.dumps({"status": "error", "message": str(e)})
def _get_workspace_root():
    """Return the configured ``agent_workspace`` (default ``~/cow``) passed through ``expand_path``."""
    from common.utils import expand_path

    configured = conf().get("agent_workspace", "~/cow")
    return expand_path(configured)
class ToolsHandler:
    """Lists the tools known to ``ToolManager`` as name/description pairs."""

    @staticmethod
    def _describe(name, tool_cls) -> dict:
        """Instantiate ``tool_cls`` to read its description; fall back to ""."""
        try:
            description = tool_cls().description
        except Exception:
            description = ""
        return {"name": name, "description": description}

    def GET(self):
        """Return ``{"status": "success", "tools": [...]}`` as JSON."""
        _require_auth()
        web.header('Content-Type', 'application/json; charset=utf-8')
        try:
            from agent.tools.tool_manager import ToolManager

            manager = ToolManager()
            # Load lazily when nothing has been registered yet.
            if not manager.tool_classes:
                manager.load_tools()
            tools = [
                self._describe(name, tool_cls)
                for name, tool_cls in manager.tool_classes.items()
            ]
            return json.dumps({"status": "success", "tools": tools}, ensure_ascii=False)
        except Exception as e:
            logger.error(f"[WebChannel] Tools API error: {e}")
            return json.dumps({"status": "error", "message": str(e)})
class SkillsHandler:
    """Query skills (GET) and open/close a named skill (POST)."""

    def GET(self):
        """Return the result of ``SkillService.query()`` as JSON."""
        _require_auth()
        web.header('Content-Type', 'application/json; charset=utf-8')
        try:
            from agent.skills.service import SkillService
            from agent.skills.manager import SkillManager

            skills_dir = os.path.join(_get_workspace_root(), "skills")
            service = SkillService(SkillManager(custom_dir=skills_dir))
            return json.dumps({"status": "success", "skills": service.query()}, ensure_ascii=False)
        except Exception as e:
            logger.error(f"[WebChannel] Skills API error: {e}")
            return json.dumps({"status": "error", "message": str(e)})

    def POST(self):
        """Run ``open`` or ``close`` on the skill named in the request body."""
        _require_auth()
        web.header('Content-Type', 'application/json; charset=utf-8')
        try:
            from agent.skills.service import SkillService
            from agent.skills.manager import SkillManager

            payload = json.loads(web.data())
            action = payload.get("action")
            name = payload.get("name")
            if not (action and name):
                return json.dumps({"status": "error", "message": "action and name are required"})

            skills_dir = os.path.join(_get_workspace_root(), "skills")
            service = SkillService(SkillManager(custom_dir=skills_dir))

            operations = {"open": service.open, "close": service.close}
            operation = operations.get(action)
            if operation is None:
                return json.dumps({"status": "error", "message": f"unknown action: {action}"})
            operation({"name": name})
            return json.dumps({"status": "success"}, ensure_ascii=False)
        except Exception as e:
            logger.error(f"[WebChannel] Skills POST error: {e}")
            return json.dumps({"status": "error", "message": str(e)})
class MemoryHandler:
    """Paginated listing of memory files via ``MemoryService.list_files``."""

    def GET(self):
        """Read ``page`` / ``page_size`` / ``category`` from the query and list files."""
        _require_auth()
        web.header('Content-Type', 'application/json; charset=utf-8')
        try:
            from agent.memory.service import MemoryService

            query = web.input(page='1', page_size='20', category='memory')
            service = MemoryService(_get_workspace_root())
            listing = service.list_files(
                page=int(query.page),
                page_size=int(query.page_size),
                category=query.category,
            )
            return json.dumps({"status": "success", **listing}, ensure_ascii=False)
        except Exception as e:
            logger.error(f"[WebChannel] Memory API error: {e}")
            return json.dumps({"status": "error", "message": str(e)})
class MemoryContentHandler:
    """Returns the content of one memory file via ``MemoryService.get_content``."""

    def GET(self):
        """Look up ``filename`` (required) in ``category`` and return its content.

        ``ValueError`` is reported as "invalid filename" and
        ``FileNotFoundError`` as "file not found"; anything else is logged
        and echoed back as the error message.
        """
        _require_auth()
        web.header('Content-Type', 'application/json; charset=utf-8')
        try:
            from agent.memory.service import MemoryService

            query = web.input(filename='', category='memory')
            if not query.filename:
                return json.dumps({"status": "error", "message": "filename required"})
            service = MemoryService(_get_workspace_root())
            content = service.get_content(query.filename, category=query.category)
            return json.dumps({"status": "success", **content}, ensure_ascii=False)
        except ValueError:
            return json.dumps({"status": "error", "message": "invalid filename"})
        except FileNotFoundError:
            return json.dumps({"status": "error", "message": "file not found"})
        except Exception as e:
            logger.error(f"[WebChannel] Memory content API error: {e}")
            return json.dumps({"status": "error", "message": str(e)})
class SchedulerHandler:
    """Lists scheduler tasks stored under ``<workspace>/scheduler/tasks.json``."""

    def GET(self):
        """Return ``TaskStore.list_tasks()`` as JSON."""
        _require_auth()
        web.header('Content-Type', 'application/json; charset=utf-8')
        try:
            from agent.tools.scheduler.task_store import TaskStore

            tasks_file = os.path.join(_get_workspace_root(), "scheduler", "tasks.json")
            tasks = TaskStore(tasks_file).list_tasks()
            return json.dumps({"status": "success", "tasks": tasks}, ensure_ascii=False)
        except Exception as e:
            logger.error(f"[WebChannel] Scheduler API error: {e}")
            return json.dumps({"status": "error", "message": str(e)})
class SessionsHandler:
    """Paginated listing of ``web`` channel sessions from the conversation store."""

    def GET(self):
        """Read ``page`` / ``page_size`` from the query and list sessions."""
        _require_auth()
        web.header('Content-Type', 'application/json; charset=utf-8')
        try:
            query = web.input(page='1', page_size='50')
            from agent.memory import get_conversation_store

            listing = get_conversation_store().list_sessions(
                channel_type="web",
                page=int(query.page),
                page_size=int(query.page_size),
            )
            return json.dumps({"status": "success", **listing}, ensure_ascii=False)
        except Exception as e:
            logger.error(f"[WebChannel] Sessions API error: {e}")
            return json.dumps({"status": "error", "message": str(e)})
class SessionDetailHandler:
    """Delete (DELETE) or rename (PUT) a single conversation session."""

    def DELETE(self, session_id: str):
        """Clear the session's stored history and drop its in-process state.

        Besides the conversation store, the matching Agent instance (if any)
        and the web channel's per-session queue are removed. Returns a JSON
        status payload.
        """
        _require_auth()
        web.header('Content-Type', 'application/json; charset=utf-8')
        logger.info(f"[WebChannel] DELETE session request: {session_id}")
        try:
            if not session_id:
                return json.dumps({"status": "error", "message": "session_id required"})
            from agent.memory import get_conversation_store
            store = get_conversation_store()
            store.clear_session(session_id)
            # Also remove the Agent instance from AgentBridge if exists.
            # Best effort: a failure here must not fail the delete, but it is
            # logged so a lingering agent instance can be diagnosed.
            try:
                from bridge.bridge import Bridge
                ab = Bridge().get_agent_bridge()
                if session_id in ab.agents:
                    del ab.agents[session_id]
                    logger.info(f"[WebChannel] Removed agent instance for session {session_id}")
            except Exception as e:
                logger.warning(
                    f"[WebChannel] Could not remove agent instance for session {session_id}: {e}")
            channel = WebChannel()
            channel.session_queues.pop(session_id, None)
            logger.info(f"[WebChannel] Session deleted: {session_id}")
            return json.dumps({"status": "success"})
        except Exception as e:
            logger.error(f"[WebChannel] Session delete error: {e}")
            return json.dumps({"status": "error", "message": str(e)})

    def PUT(self, session_id: str):
        """Rename the session to the ``title`` given in the JSON body.

        A missing, blank or non-string title is rejected with
        "title required"; an unknown session with "session not found".
        """
        _require_auth()
        web.header('Content-Type', 'application/json; charset=utf-8')
        try:
            if not session_id:
                return json.dumps({"status": "error", "message": "session_id required"})
            body = json.loads(web.data())
            raw_title = body.get("title", "")
            # Guard against null / numeric titles instead of failing on .strip().
            title = raw_title.strip() if isinstance(raw_title, str) else ""
            if not title:
                return json.dumps({"status": "error", "message": "title required"})
            from agent.memory import get_conversation_store
            store = get_conversation_store()
            found = store.rename_session(session_id, title)
            if not found:
                return json.dumps({"status": "error", "message": "session not found"})
            return json.dumps({"status": "success"})
        except Exception as e:
            logger.error(f"[WebChannel] Session rename error: {e}")
            return json.dumps({"status": "error", "message": str(e)})
class SessionTitleHandler:
    """Generate a session title from a message exchange and store it."""

    @staticmethod
    def _error(message):
        """Serialize an error response carrying ``message``."""
        return json.dumps({"status": "error", "message": message})

    def POST(self, session_id: str):
        """Create a title for ``session_id`` and persist it via ``rename_session``.

        The JSON request body supplies ``user_message`` (required) and
        ``assistant_reply`` (optional, defaults to an empty string); both are
        handed to ``_generate_session_title``.

        Args:
            session_id: Identifier of the session that receives the title.

        Returns:
            A JSON string with ``status`` and, on success, the generated
            ``title``; on failure an error ``message``.
        """
        _require_auth()
        web.header('Content-Type', 'application/json; charset=utf-8')
        try:
            if not session_id:
                return self._error("session_id required")

            payload = json.loads(web.data())
            question = payload.get("user_message", "")
            answer = payload.get("assistant_reply", "")
            if not question:
                return self._error("user_message required")

            title = _generate_session_title(question, answer)

            from agent.memory import get_conversation_store
            conversation_store = get_conversation_store()
            db_updated = conversation_store.rename_session(session_id, title)
            logger.info(f"[WebChannel] Session title set: sid={session_id}, title='{title}', db_updated={db_updated}")

            success = {"status": "success", "title": title}
            return json.dumps(success, ensure_ascii=False)
        except Exception as err:
            logger.error(f"[WebChannel] Title generation error: {err}")
            return self._error(str(err))
class SessionClearContextHandler:
    """Reset the conversation context of a session without deleting it."""

    def POST(self, session_id: str):
        """Clear the stored context of ``session_id`` and drop its cached agent.

        Args:
            session_id: Identifier of the session whose context is cleared.

        Returns:
            A JSON string: ``{"status": "success", "context_start_seq": ...}``
            carrying the value returned by the store's ``clear_context``, or
            ``{"status": "error", "message": ...}`` on failure.
        """
        _require_auth()
        web.header('Content-Type', 'application/json; charset=utf-8')
        try:
            if not session_id:
                return json.dumps({"status": "error", "message": "session_id required"})

            from agent.memory import get_conversation_store
            store = get_conversation_store()
            new_seq = store.clear_context(session_id)

            # Delete the agent instance so a fresh one is created on the next
            # message. Best-effort: the stored context is already cleared, so
            # a failure here is logged rather than reported as a request error.
            try:
                from bridge.bridge import Bridge
                ab = Bridge().get_agent_bridge()
                if session_id in ab.agents:
                    del ab.agents[session_id]
                    logger.info(f"[WebChannel] Cleared agent instance for session {session_id}")
            except Exception as e:
                logger.warning(
                    f"[WebChannel] Failed to clear agent instance for session {session_id}: {e}"
                )

            return json.dumps({"status": "success", "context_start_seq": new_seq})
        except Exception as e:
            logger.error(f"[WebChannel] Clear context error: {e}")
            return json.dumps({"status": "error", "message": str(e)})
class HistoryHandler:
    """Serve paginated message history for one conversation session."""

    def GET(self):
        """Return one page of history for the session named in the query string.

        Query parameters: ``session_id`` (required, surrounding whitespace is
        stripped), ``page`` (default ``'1'``) and ``page_size`` (default
        ``'20'``); the latter two are converted to ``int``.

        Returns:
            A JSON string: the store's ``load_history_page`` result merged with
            ``"status": "success"``, or ``{"status": "error", "message": ...}``.
        """
        _require_auth()
        web.header('Content-Type', 'application/json; charset=utf-8')
        web.header('Access-Control-Allow-Origin', '*')
        try:
            query = web.input(session_id='', page='1', page_size='20')
            sid = query.session_id.strip()
            if sid:
                from agent.memory import get_conversation_store
                conversation_store = get_conversation_store()
                page_no, per_page = int(query.page), int(query.page_size)
                history = conversation_store.load_history_page(
                    session_id=sid,
                    page=page_no,
                    page_size=per_page,
                )
                return json.dumps({"status": "success", **history}, ensure_ascii=False)
            return json.dumps({"status": "error", "message": "session_id required"})
        except Exception as err:
            logger.error(f"[WebChannel] History API error: {err}")
            failure = {"status": "error", "message": str(err)}
            return json.dumps(failure)
class LogsHandler:
    """Stream the application's ``run.log`` to the client as Server-Sent Events."""

    def GET(self):
        """Start an SSE response that sends the log tail and then follows the file.

        Returns:
            A generator of ``bytes`` chunks, each one SSE frame (or a keepalive
            comment), produced by :meth:`_stream_log`.
        """
        _require_auth()
        web.header('Content-Type', 'text/event-stream; charset=utf-8')
        web.header('Cache-Control', 'no-cache')
        web.header('X-Accel-Buffering', 'no')
        from config import get_root
        log_path = os.path.join(get_root(), "run.log")
        return self._stream_log(log_path)

    @staticmethod
    def _sse_event(payload) -> bytes:
        """Encode ``payload`` as one ``data:`` SSE frame with a JSON body.

        Serializing through ``json.dumps`` keeps the frame valid even when a
        value (for example an exception message) contains quotes or newlines.
        """
        return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n".encode('utf-8')

    @staticmethod
    def _stream_log(log_path, tail_lines=200, max_seconds=600):
        """Yield SSE frames for ``log_path``: an initial tail, then new lines.

        Args:
            log_path: Path of the log file to stream.
            tail_lines: Number of trailing lines sent in the initial ``init``
                event.
            max_seconds: Upper bound, in seconds, for the follow phase.

        Yields:
            ``bytes``: an ``error`` event if the file is missing or cannot be
            read; otherwise one ``init`` event, then a ``line`` event per new
            line, with a ``: keepalive`` comment once per idle second.
        """
        from collections import deque

        if not os.path.isfile(log_path):
            yield LogsHandler._sse_event({"type": "error", "message": "run.log not found"})
            return

        try:
            log_file = open(log_path, 'r', encoding='utf-8', errors='replace')
        except Exception as e:
            yield LogsHandler._sse_event({"type": "error", "message": str(e)})
            return

        # One handle serves both phases: after the tail has been consumed the
        # read position is at end-of-file, so lines appended while the initial
        # event is being sent are picked up by the follow loop, not skipped.
        with log_file:
            try:
                # A bounded deque keeps only the last ``tail_lines`` lines in
                # memory instead of materializing the whole file.
                recent = deque(log_file, maxlen=tail_lines)
            except Exception as e:
                yield LogsHandler._sse_event({"type": "error", "message": str(e)})
                return
            yield LogsHandler._sse_event({"type": "init", "content": ''.join(recent)})

            try:
                deadline = time.time() + max_seconds
                while time.time() < deadline:
                    line = log_file.readline()
                    if line:
                        yield LogsHandler._sse_event({"type": "line", "content": line})
                    else:
                        # Nothing new: send an SSE comment, then poll again
                        # after one second.
                        yield b": keepalive\n\n"
                        time.sleep(1)
            except GeneratorExit:
                # The consumer closed the stream; stop quietly.
                return
            except Exception as e:
                logger.debug(f"[WebChannel] Log stream stopped: {e}")
                return
class AssetsHandler:
    """Serve files from the ``static`` directory located next to this module."""

    @staticmethod
    def _is_within_directory(directory, path):
        """Return True when ``path`` is ``directory`` itself or lies beneath it.

        The comparison is made on absolute paths and requires a path separator
        after the directory prefix, so a sibling such as ``static_backup`` is
        not mistaken for something inside ``static``.
        """
        root = os.path.abspath(directory)
        target = os.path.abspath(path)
        return target == root or target.startswith(root.rstrip(os.sep) + os.sep)

    def GET(self, file_path):
        """Return the bytes of the requested static file.

        Args:
            file_path: Path of the asset relative to the ``static`` directory.

        Returns:
            The raw file content; the ``Content-Type`` header is set from the
            file name, falling back to ``application/octet-stream``.

        Raises:
            web.notfound: If the path escapes the static directory, the file
                does not exist, or any other error occurs while serving it.
        """
        try:
            # Resolve the static directory relative to this source file.
            current_dir = os.path.dirname(os.path.abspath(__file__))
            static_dir = os.path.join(current_dir, 'static')

            full_path = os.path.normpath(os.path.join(static_dir, file_path))

            # Security check: the requested file must stay inside static_dir.
            if not self._is_within_directory(static_dir, full_path):
                logger.error(f"Security check failed for path: {full_path}")
                raise web.notfound()

            if not os.path.isfile(full_path):
                # Browsers routinely probe optional asset variants (e.g. a
                # .ttf fallback declared alongside .woff2 in @font-face);
                # logging these as errors floods the console with harmless
                # noise. Keep it at debug level — real misconfigurations
                # will still surface via the network panel.
                logger.debug(f"Static file not found: {full_path}")
                raise web.notfound()

            # Set the Content-Type, defaulting to a binary stream when the
            # type cannot be guessed from the file name.
            content_type = mimetypes.guess_type(full_path)[0]
            web.header('Content-Type', content_type or 'application/octet-stream')

            # Read and return the file content.
            with open(full_path, 'rb') as f:
                return f.read()

        except web.HTTPError:
            # The 404 path above already logged at debug; re-raise as-is so
            # web.py returns the original status to the client.
            raise
        except Exception as e:
            logger.error(f"Error serving static file: {e}", exc_info=True)
            raise web.notfound()
class KnowledgeListHandler:
    """Expose the knowledge tree of the current workspace."""

    def GET(self):
        """Return the result of ``KnowledgeService.list_tree`` as JSON.

        Returns:
            A JSON string: the tree result merged with ``"status": "success"``,
            or ``{"status": "error", "message": ...}`` (also logged) on failure.
        """
        _require_auth()
        web.header('Content-Type', 'application/json; charset=utf-8')
        try:
            from agent.knowledge.service import KnowledgeService
            service = KnowledgeService(_get_workspace_root())
            tree = service.list_tree()
            response = {"status": "success", **tree}
            return json.dumps(response, ensure_ascii=False)
        except Exception as err:
            logger.error(f"[WebChannel] Knowledge list error: {err}")
            failure = {"status": "error", "message": str(err)}
            return json.dumps(failure)
class KnowledgeReadHandler:
    """Read a single knowledge file from the current workspace."""

    @staticmethod
    def _error(exc):
        """Serialize an error response whose message is ``str(exc)``."""
        return json.dumps({"status": "error", "message": str(exc)})

    def GET(self):
        """Return the result of ``KnowledgeService.read_file`` for ``path``.

        The ``path`` query parameter (default ``''``) is passed to the service
        unchanged.

        Returns:
            A JSON string: the read result merged with ``"status": "success"``,
            or an error payload. ``ValueError`` and ``FileNotFoundError`` are
            reported without logging; any other exception is logged first.
        """
        _require_auth()
        web.header('Content-Type', 'application/json; charset=utf-8')
        try:
            from agent.knowledge.service import KnowledgeService
            query = web.input(path='')
            service = KnowledgeService(_get_workspace_root())
            document = service.read_file(query.path)
            return json.dumps({"status": "success", **document}, ensure_ascii=False)
        except (ValueError, FileNotFoundError) as err:
            return self._error(err)
        except Exception as err:
            logger.error(f"[WebChannel] Knowledge read error: {err}")
            return self._error(err)
class KnowledgeGraphHandler:
    """Expose the knowledge graph of the current workspace."""

    def GET(self):
        """Return ``KnowledgeService.build_graph()`` serialized as JSON.

        Returns:
            The serialized graph, or an empty graph
            (``{"nodes": [], "links": []}``) when building it fails; the
            failure is logged.
        """
        _require_auth()
        web.header('Content-Type', 'application/json; charset=utf-8')
        try:
            from agent.knowledge.service import KnowledgeService
            service = KnowledgeService(_get_workspace_root())
            graph = service.build_graph()
            return json.dumps(graph, ensure_ascii=False)
        except Exception as err:
            logger.error(f"[WebChannel] Knowledge graph error: {err}")
            empty_graph = {"nodes": [], "links": []}
            return json.dumps(empty_graph)
class VersionHandler:
    """Report the version string exported by the ``cli`` package."""

    def GET(self):
        """Return ``{"version": <cli.__version__>}`` as a JSON string."""
        web.header('Content-Type', 'application/json; charset=utf-8')
        from cli import __version__ as current_version
        payload = {"version": current_version}
        return json.dumps(payload)