diff --git a/.gitignore b/.gitignore index 44fc64fc..0612e1e3 100644 --- a/.gitignore +++ b/.gitignore @@ -36,6 +36,7 @@ plugins/banwords/lib/__pycache__ !plugins/cow_cli client_config.json ref/ +**/.dev.vars .cursor/ local/ node_modules/ diff --git a/agent/chat/service.py b/agent/chat/service.py index 1afe1691..58931b00 100644 --- a/agent/chat/service.py +++ b/agent/chat/service.py @@ -75,6 +75,26 @@ class ChatService: # a new segment; collect tool results until turn_end. state.pending_tool_results = [] + elif event_type == "file_to_send": + # Cloud CHAT stream: local paths are useless to the web UI; push a markdown link when we have a public URL. + url = data.get("url") or "" + if url: + msg = (data.get("message") or "").strip() + fname = data.get("file_name") or "file" + ft = data.get("file_type") or "file" + parts = [] + if msg: + parts.append(f"{msg}\n\n") + if ft == "image": + parts.append(f"![{fname}]({url})") + else: + parts.append(f"[{fname}]({url})") + send_chunk_fn({ + "chunk_type": "content", + "delta": "\n\n" + "".join(parts) + "\n\n", + "segment_id": state.segment_id, + }) + elif event_type == "tool_execution_start": # Notify the client that a tool is about to run (with its input args) tool_name = data.get("tool_name", "") diff --git a/agent/tools/send/send.py b/agent/tools/send/send.py index 4adcc743..2a130ab0 100644 --- a/agent/tools/send/send.py +++ b/agent/tools/send/send.py @@ -98,7 +98,18 @@ class Send(BaseTool): "size_formatted": self._format_size(file_size), "message": message or f"正在发送 {file_name}" } - + + try: + from common.cloud_client import get_website_base_url, copy_send_file + + # Do nothing when in local env + if get_website_base_url(): + url = copy_send_file(absolute_path, self.cwd) + if url: + result["url"] = url + except Exception: + pass + return ToolResult.success(result) def _resolve_path(self, path: str) -> str: diff --git a/channel/web/static/js/console.js b/channel/web/static/js/console.js index f2c3ad29..ae87c094 100644 --- a/channel/web/static/js/console.js +++ b/channel/web/static/js/console.js @@ -891,6 +891,15 @@ function startSSE(requestId, loadingEl, timestamp) { mediaEl.appendChild(fileEl); scrollChatToBottom(); + } else if (item.type === 'phase') { + // Coarse progress (e.g. cow install-browser); must not close SSE (unlike "done") + ensureBotEl(); + const wrap = document.createElement('div'); + wrap.className = 'text-xs sm:text-sm text-slate-600 dark:text-slate-400 border-l-2 border-primary-400 pl-2 py-1 my-0.5'; + wrap.textContent = String(item.content || ''); + stepsEl.appendChild(wrap); + scrollChatToBottom(); + } else if (item.type === 'done') { es.close(); delete activeStreams[requestId]; diff --git a/channel/web/web_channel.py b/channel/web/web_channel.py index 5dd57ab9..c19fdd63 100644 --- a/channel/web/web_channel.py +++ b/channel/web/web_channel.py @@ -96,10 +96,22 @@ class WebChannel(ChatChannel): logger.error(f"No session_id found for request {request_id}") return - # SSE mode: push done event to SSE queue + # SSE mode: push events to SSE queue if request_id in self.sse_queues: content = reply.content if reply.content is not None else "" + # Intermediate status lines (e.g. /install-browser phases) must NOT use "done", + # or the frontend closes EventSource and drops subsequent events. + if getattr(reply, "sse_phase", False): + self.sse_queues[request_id].put({ + "type": "phase", + "content": content, + "request_id": request_id, + "timestamp": time.time(), + }) + logger.debug(f"SSE phase for request {request_id}") + return + # Files are already pushed via on_event (file_to_send) during agent execution. # Skip duplicate file pushes here; just let the done event through. if reply.type in (ReplyType.IMAGE_URL, ReplyType.FILE) and content.startswith("file://"): diff --git a/cli/commands/install.py b/cli/commands/install.py index 8a6ab9ed..addec52c 100644 --- a/cli/commands/install.py +++ b/cli/commands/install.py @@ -3,6 +3,7 @@ import os import sys import subprocess +from typing import Callable, Optional import click @@ -11,6 +12,16 @@ PLAYWRIGHT_LEGACY_VERSION = "1.28.0" GLIBC_THRESHOLD = (2, 28) CHINA_MIRROR = "https://registry.npmmirror.com/-/binary/playwright" +# stream(msg, fg=None) — fg is "yellow" | "green" | "red" | None +StreamFn = Callable[[str, Optional[str]], None] +# on_phase(msg) — coarse-grained progress for chat channels (Chinese) +PhaseFn = Callable[[str], None] + + +def _phase(cb: Optional[PhaseFn], msg: str) -> None: + if cb: + cb(msg) + def _has_display() -> bool: """Check if a graphical display is available (Linux only).""" @@ -66,134 +77,183 @@ def _is_china_network() -> bool: return False -def _pip_install(package_spec: str) -> int: +def _pip_install(package_spec: str, stream: StreamFn) -> int: """Install a package, retrying with --user on permission failure.""" python = sys.executable ret = subprocess.call([python, "-m", "pip", "install", package_spec]) if ret != 0: - click.echo(" Retrying with --user flag...") + stream(" Retrying with --user flag...", "yellow") ret = subprocess.call([python, "-m", "pip", "install", "--user", package_spec]) return ret -@click.command("install-browser") -def install_browser(): - """Install browser tool dependencies (Playwright + Chromium).""" +def _default_stream(msg: str, fg: Optional[str] = None) -> None: + """CLI: colored click output.""" + if fg == "yellow": + click.echo(click.style(msg, fg="yellow")) + elif fg == "green": + click.echo(click.style(msg, fg="green")) + elif fg == "red": + click.echo(click.style(msg, fg="red")) + else: + click.echo(msg) + + +def run_install_browser( + stream: Optional[StreamFn] = None, + on_phase: Optional[PhaseFn] = None, +) -> int: + """ + Install Playwright Python package, optional Linux deps, and Chromium. + + Reused by ``cow install-browser`` CLI and chat ``/install-browser``. + + Args: + stream: Optional callback ``(message, fg)`` for each line. ``fg`` is + ``yellow`` / ``green`` / ``red`` or None. Defaults to colored click output. + on_phase: Optional callback for coarse progress (e.g. push to chat); + messages are short Chinese status lines. + + Returns: + 0 on success, 1 on fatal failure (pip or chromium install failed). + """ + stream = stream or _default_stream python = sys.executable legacy_mode = False - # Determine playwright version based on glibc + _phase(on_phase, "🔧 开始安装浏览器工具依赖(约几分钟,请耐心等待)…") + glibc = _get_glibc_version() if glibc and glibc < GLIBC_THRESHOLD: legacy_mode = True glibc_str = f"{glibc[0]}.{glibc[1]}" - click.echo(click.style( + stream( f"glibc {glibc_str} detected (< 2.28). " f"Will install playwright {PLAYWRIGHT_LEGACY_VERSION} for compatibility.", - fg="yellow", - )) - click.echo(click.style( - " Note: upgrade your OS for full browser tool support.", - fg="yellow", - )) - click.echo() + "yellow", + ) + stream(" Note: upgrade your OS for full browser tool support.", "yellow") + stream("") + _phase( + on_phase, + f"ℹ️ 检测到 glibc {glibc_str}(较旧),将安装兼容版 Playwright {PLAYWRIGHT_LEGACY_VERSION}。", + ) target_version = PLAYWRIGHT_LEGACY_VERSION if legacy_mode else PLAYWRIGHT_VERSION - # Step 1: Install playwright package - click.echo(click.style("[1/3] Installing playwright Python package...", fg="yellow")) - ret = _pip_install(f"playwright=={target_version}") + _phase(on_phase, "📦 [1/3] 正在安装 Playwright Python 包…") + stream("[1/3] Installing playwright Python package...", "yellow") + ret = _pip_install(f"playwright=={target_version}", stream) if ret != 0: - click.echo(click.style("Failed to install playwright package.", fg="red")) - raise SystemExit(1) + stream("Failed to install playwright package.", "red") + _phase(on_phase, "❌ [1/3] Playwright Python 包安装失败。") + return 1 installed = _get_installed_version() if installed: - click.echo(click.style(f" playwright {installed} installed.", fg="green")) - click.echo() + stream(f" playwright {installed} installed.", "green") + stream("") + _phase(on_phase, f"✅ [1/3] Playwright 包已安装({installed or target_version})。") - # Step 2: System dependencies (Linux only) if sys.platform == "linux": - click.echo(click.style("[2/3] Installing system dependencies (Linux)...", fg="yellow")) + _phase(on_phase, "🔧 [2/3] 正在安装 Linux 系统依赖与轻量中文字体(文泉驿正黑,部分步骤可能需要 sudo)…") + stream("[2/3] Installing system dependencies (Linux)...", "yellow") ret = subprocess.call([python, "-m", "playwright", "install-deps", "chromium"]) if ret != 0: - click.echo(click.style( + stream( " Could not auto-install system deps (may need sudo).\n" f" Run manually: sudo {python} -m playwright install-deps chromium", - fg="yellow", - )) - # Install CJK fonts for proper Chinese/Japanese/Korean rendering in screenshots - click.echo(" Installing CJK fonts...") + "yellow", + ) + # Prefer fonts-wqy-zenhei only (~few MB). fonts-noto-cjk is much larger (~150MB+). + stream(" Installing CJK font (fonts-wqy-zenhei, lightweight)...") font_ret = subprocess.call( - ["sudo", "apt-get", "install", "-y", "fonts-noto-cjk", "fonts-wqy-zenhei"], + ["sudo", "apt-get", "install", "-y", "--no-install-recommends", "fonts-wqy-zenhei"], stderr=subprocess.DEVNULL, ) if font_ret != 0: - click.echo(click.style( - " Could not auto-install CJK fonts.\n" - " Run manually: sudo apt-get install -y fonts-noto-cjk fonts-wqy-zenhei", - fg="yellow", - )) + stream( + " Could not auto-install CJK font.\n" + " Run manually: sudo apt-get install -y fonts-wqy-zenhei\n" + " (Optional, larger full coverage: sudo apt-get install -y fonts-noto-cjk)", + "yellow", + ) else: subprocess.call(["fc-cache", "-fv"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - click.echo(click.style(" CJK fonts installed.", fg="green")) + stream(" CJK font (wqy-zenhei) installed.", "green") + _phase( + on_phase, + "✅ [2/3] Linux 依赖与字体步骤已执行(若有权限问题请查看服务器日志或手动执行提示命令)。", + ) else: - click.echo(click.style(f"[2/3] Skipping system deps (not needed on {sys.platform}).", fg="yellow")) - click.echo() + stream(f"[2/3] Skipping system deps (not needed on {sys.platform}).", "yellow") + _phase(on_phase, f"ℹ️ [2/3] 当前系统({sys.platform})跳过 Linux 专用依赖。") + stream("") - # Step 3: Install Chromium - click.echo(click.style("[3/3] Installing Chromium browser...", fg="yellow")) + _phase(on_phase, "🌐 [3/3] 正在下载并安装 Chromium(体积较大,请耐心等待)…") + stream("[3/3] Installing Chromium browser...", "yellow") cmd = [python, "-m", "playwright", "install", "chromium"] - # --only-shell requires playwright >= 1.57 if _is_headless_linux() and not legacy_mode: ver = _version_tuple(installed or "") if ver >= (1, 57, 0): cmd.append("--only-shell") - click.echo(" (headless shell for Linux server)") + stream(" (headless shell for Linux server)", None) else: - click.echo(" (full Chromium)") + stream(" (full Chromium)", None) elif sys.platform == "linux" and _has_display(): - click.echo(" (full browser for Linux desktop)") + stream(" (full browser for Linux desktop)", None) - # Use China mirror if pip is configured with a domestic index env = os.environ.copy() use_mirror = _is_china_network() if use_mirror: env["PLAYWRIGHT_DOWNLOAD_HOST"] = CHINA_MIRROR - click.echo(f" (using China mirror: {CHINA_MIRROR})") + stream(f" (using China mirror: {CHINA_MIRROR})", None) + _phase(on_phase, "📡 检测到国内 pip 源配置,Chromium 将优先走国内镜像下载。") ret = subprocess.call(cmd, env=env) - # Fallback: if mirror download failed, retry with official CDN if ret != 0 and use_mirror: - click.echo(click.style( - " Mirror download failed, retrying with official CDN...", - fg="yellow", - )) + stream(" Mirror download failed, retrying with official CDN...", "yellow") + _phase(on_phase, "⚠️ 镜像下载失败,正在改用官方源重试…") env_no_mirror = os.environ.copy() env_no_mirror.pop("PLAYWRIGHT_DOWNLOAD_HOST", None) ret = subprocess.call(cmd, env=env_no_mirror) if ret != 0: - click.echo(click.style("Failed to install Chromium.", fg="red")) - raise SystemExit(1) + stream("Failed to install Chromium.", "red") + _phase(on_phase, "❌ [3/3] Chromium 安装失败。") + return 1 - # Quick smoke test - click.echo() - click.echo("Verifying browser installation...") + stream("") + _phase(on_phase, "✅ [3/3] Chromium 已安装。") + + stream("Verifying browser installation...", None) + _phase(on_phase, "🔍 正在验证 Playwright 能否正常加载…") ret = subprocess.call( [python, "-c", "from playwright.sync_api import sync_playwright; print('OK')"], stderr=subprocess.DEVNULL, ) if ret != 0: - click.echo(click.style( + stream( " Warning: playwright import failed. Browser tool may not work on this system.\n" " Consider upgrading your OS or using Docker.", - fg="yellow", - )) + "yellow", + ) + _phase(on_phase, "⚠️ 验证未完全通过:本机可能仍无法使用浏览器工具,请查看日志或升级系统。") else: - click.echo(click.style(" Verification passed.", fg="green")) + stream(" Verification passed.", "green") + _phase(on_phase, "✅ 验证通过。") - click.echo() - click.echo(click.style("Browser tool ready! Restart CowAgent to enable it.", fg="green")) + stream("") + stream("Browser tool ready! Restart CowAgent to enable it.", "green") + _phase(on_phase, "🎉 全部步骤结束。请重启 CowAgent 后使用 browser 工具。") + return 0 + + +@click.command("install-browser") +def install_browser(): + """Install browser tool dependencies (Playwright + Chromium).""" + code = run_install_browser() + if code != 0: + raise SystemExit(code) diff --git a/common/cloud_client.py b/common/cloud_client.py index 62b96536..073e734d 100644 --- a/common/cloud_client.py +++ b/common/cloud_client.py @@ -487,6 +487,19 @@ class CloudClient(LinkAIClient): session_id = f"session_{session_id}" logger.info(f"[CloudClient] on_chat: session={session_id}, channel={channel_type}, query={query[:80]}") + # Intercept cow/slash commands before the agent runs + try: + from plugins import PluginManager + mgr = PluginManager() + plugin = mgr.plugins.get("cow_cli") + if plugin and hasattr(plugin, "execute"): + result = plugin.execute(query, session_id=session_id) + if result is not None: + send_chunk_fn({"chunk_type": "content", "delta": result, "segment_id": 0}) + return + except Exception as e: + logger.warning(f"[CloudClient] cow_cli intercept failed: {e}") + svc = self.chat_service if svc is None: raise RuntimeError("ChatService not available") @@ -629,9 +642,9 @@ def get_deployment_id() -> str: def get_website_base_url() -> str: - """Return the public URL prefix that maps to the workspace websites/ dir. + """Return the URL prefix that maps to the workspace websites/ dir. - Returns empty string when cloud deployment is not configured. + Do nothing when in local env. """ deployment_id = get_deployment_id() if not deployment_id: @@ -648,6 +661,42 @@ def get_website_base_url() -> str: return f"https://app.{domain}/{deployment_id}" +# Subdir under websites/ used by the send tool +COW_SEND_WEB_SUBDIR = "cow-send" + + +def copy_send_file(src_path: str, workspace_root: str) -> str: + """Copy *src_path* into ``websites/cow-send/`` and return its URL. + + Returns empty string in local env. + """ + import shutil + import uuid + + from common.utils import expand_path + + base = get_website_base_url() + if not base or not src_path or not os.path.isfile(src_path): + return "" + ws = os.path.abspath(expand_path(workspace_root)) + send_dir = os.path.join(ws, "websites", COW_SEND_WEB_SUBDIR) + try: + os.makedirs(send_dir, exist_ok=True) + except OSError: + return "" + ext = os.path.splitext(src_path)[1].lower() + if len(ext) > 12 or not ext.replace(".", "").isalnum(): + ext = "" + dest_name = f"{uuid.uuid4().hex}{ext}" + dest_path = os.path.join(send_dir, dest_name) + try: + shutil.copy2(src_path, dest_path) + except OSError as e: + logger.warning(f"[cloud] copy_send_file: copy failed: {e}") + return "" + return f"{base}/{COW_SEND_WEB_SUBDIR}/{dest_name}" + + def build_website_prompt(workspace_dir: str) -> list: """Build system prompt lines for cloud website/file sharing rules. @@ -668,8 +717,8 @@ def build_website_prompt(workspace_dir: str) -> list: f" - 例如: `websites/my-app/index.html` → `{base_url}/my-app/index.html`", "", "2. **生成文件分享** (PPT、PDF、图片、音视频等): 当你为用户生成了需要下载或查看的文件时,**可以**将文件保存到 `websites/` 目录中", - f" - 例如: 生成的PPT保存到 `websites/files/report.pptx` → 下载链接为 `{base_url}/files/report.pptx`", - " - 你仍然可以同时使用 `send` 工具发送文件(在飞书、钉钉等IM渠道中有效),但**必须同时在回复文本中提供下载链接**作为兜底,因为部分渠道(如网页端)无法通过 send 接收本地文件", + f" - 例如: 生成的PPT保存到 `websites/files/report.pptx` → 下载链接为 `{base_url}/files/report.pptx`", + " - 你仍然可以同时使用 `send` 工具发送文件(在微信、飞书、钉钉、web等渠道中有效),但**必须同时在回复文本中提供下载链接**作为兜底,因为部分渠道无法通过 send 接收本地文件", "", "3. **必须发送链接**: 无论是网页还是文件,生成后**必须将完整的访问/下载链接直接写在回复文本中发送给用户**", "", diff --git a/plugins/cow_cli/cow_cli.py b/plugins/cow_cli/cow_cli.py index 97f722d8..39b578d9 100644 --- a/plugins/cow_cli/cow_cli.py +++ b/plugins/cow_cli/cow_cli.py @@ -3,10 +3,11 @@ CowCli plugin - Intercept cow/slash commands in chat messages. Matches messages like: cow skill list - cow context clear + cow install-browser /skill list /context clear /status + /install-browser Does NOT match: cow是什么 @@ -30,6 +31,7 @@ KNOWN_COMMANDS = { "help", "version", "status", "logs", "start", "stop", "restart", "skill", "context", "config", + "install-browser", } # Commands that can only run from the CLI (terminal), not in chat @@ -106,14 +108,27 @@ class CowCliPlugin(Plugin): # Command dispatch # ------------------------------------------------------------------ - def _dispatch(self, cmd: str, args: str, e_context: EventContext) -> str: + def execute(self, query: str, session_id: str = "") -> str: + """Execute a cow/slash command string without a channel context. + + Used by cloud on_chat to intercept commands before the agent runs. + Returns None when *query* is not a recognised command. + """ + parsed = self._parse_command(query.strip()) + if not parsed: + return None + cmd, args = parsed + return self._dispatch(cmd, args, e_context=None, session_id=session_id) + + def _dispatch(self, cmd: str, args: str, e_context: EventContext, session_id: str = "") -> str: if cmd in CLI_ONLY_COMMANDS: return f"⚠️ `cow {cmd}` 只能在命令行终端中执行。\n请在终端运行: cow {cmd}" - handler = getattr(self, f"_cmd_{cmd}", None) + handler_attr = "_cmd_" + cmd.replace("-", "_") + handler = getattr(self, handler_attr, None) if handler: try: - return handler(args, e_context) + return handler(args, e_context, session_id=session_id) except Exception as e: logger.error(f"[CowCli] command '{cmd}' failed: {e}") return f"命令执行失败: {e}" @@ -124,7 +139,7 @@ class CowCliPlugin(Plugin): # help / version # ------------------------------------------------------------------ - def _cmd_help(self, args: str, e_context: EventContext) -> str: + def _cmd_help(self, args: str, e_context, **_) -> str: lines = [ "📋 CowAgent 命令列表", "", @@ -142,19 +157,20 @@ class CowCliPlugin(Plugin): " /config 查看当前配置", " /config 查看某项配置", " /config 修改配置", + " /install-browser 安装浏览器工具依赖", "", "💡 也可以用 cow 代替 /", ] return "\n".join(lines) - def _cmd_version(self, args: str, e_context: EventContext) -> str: + def _cmd_version(self, args: str, e_context, **_) -> str: return f"CowAgent v{__version__}" # ------------------------------------------------------------------ # status # ------------------------------------------------------------------ - def _cmd_status(self, args: str, e_context: EventContext) -> str: + def _cmd_status(self, args: str, e_context: EventContext, session_id: str = "") -> str: from config import conf cfg = conf() @@ -174,7 +190,7 @@ class CowCliPlugin(Plugin): mode = "Agent" if cfg.get("agent") else "Chat" lines.append(f" 模式: {mode}") - session_id = self._get_session_id(e_context) + session_id = self._get_session_id(e_context, fallback=session_id) agent = self._get_agent(session_id) if agent: lines.append("") @@ -199,7 +215,7 @@ class CowCliPlugin(Plugin): # logs # ------------------------------------------------------------------ - def _cmd_logs(self, args: str, e_context: EventContext) -> str: + def _cmd_logs(self, args: str, e_context, **_) -> str: num_lines = 20 if args.strip().isdigit(): num_lines = min(int(args.strip()), 50) @@ -236,8 +252,8 @@ class CowCliPlugin(Plugin): # context # ------------------------------------------------------------------ - def _cmd_context(self, args: str, e_context: EventContext) -> str: - session_id = self._get_session_id(e_context) + def _cmd_context(self, args: str, e_context: EventContext, session_id: str = "") -> str: + session_id = self._get_session_id(e_context, fallback=session_id) agent = self._get_agent(session_id) sub = args.strip().lower() @@ -299,7 +315,7 @@ class CowCliPlugin(Plugin): _CONFIG_READABLE = _CONFIG_WRITABLE | {"channel_type"} - def _cmd_config(self, args: str, e_context: EventContext) -> str: + def _cmd_config(self, args: str, e_context, **_) -> str: from config import conf, load_config import json as _json @@ -418,11 +434,56 @@ class CowCliPlugin(Plugin): return btype return const.OPENAI + # ------------------------------------------------------------------ + # install-browser (shared logic with cow install-browser CLI) + # ------------------------------------------------------------------ + + @staticmethod + def _send_install_progress(e_context, text: str) -> None: + """Push a short status line to the chat channel (SSE: phase event, not done).""" + if e_context is None: + logger.info(f"[CowCli] install-browser: {text}") + return + try: + channel = e_context["channel"] + context = e_context["context"] + if channel and context: + r = Reply(ReplyType.TEXT, text) + r.sse_phase = True + channel.send(r, context) + except Exception as e: + logger.warning(f"[CowCli] install-browser progress send failed: {e}") + + def _cmd_install_browser(self, args: str, e_context, **_) -> str: + from cli.commands.install import run_install_browser + + if args.strip(): + return ( + "用法: /install-browser\n\n" + "无需参数,等同于终端执行 `cow install-browser`。\n" + "安装过程可能持续数分钟;进度会以多条消息推送,pip 详细输出见服务日志。" + ) + + # Suppress detailed stream in chat; phases go through channel.send + def _noop_stream(msg: str, fg=None): + pass + + code = run_install_browser( + stream=_noop_stream, + on_phase=lambda m: self._send_install_progress(e_context, m), + ) + if code != 0: + return ( + "❌ 安装未成功结束,请查看上方分段提示或服务器日志;" + "也可在终端执行 `cow install-browser`。" + ) + return "✅ 安装流程已结束。请重启 CowAgent 后使用 browser 工具(进度见上方消息)。" + # ------------------------------------------------------------------ # skill # ------------------------------------------------------------------ - def _cmd_skill(self, args: str, e_context: EventContext) -> str: + def _cmd_skill(self, args: str, e_context, **_) -> str: parts = args.strip().split(None, 1) sub = parts[0].lower() if parts else "" sub_args = parts[1].strip() if len(parts) > 1 else "" @@ -781,7 +842,9 @@ class CowCliPlugin(Plugin): # Helpers # ------------------------------------------------------------------ - def _get_session_id(self, e_context: EventContext) -> str: + def _get_session_id(self, e_context, fallback: str = "") -> str: + if e_context is None: + return fallback context = e_context["context"] return context.kwargs.get("session_id") or context.get("session_id", "")