From b6571e5cadfe2e95287700dcf6f7d110a3999646 Mon Sep 17 00:00:00 2001 From: zhayujie Date: Mon, 30 Mar 2026 21:39:38 +0800 Subject: [PATCH] fix: browser resource optimization --- agent/tools/browser/browser_service.py | 379 +++++++++++++++++-------- 1 file changed, 253 insertions(+), 126 deletions(-) diff --git a/agent/tools/browser/browser_service.py b/agent/tools/browser/browser_service.py index cc6203a0..cc1b94e9 100644 --- a/agent/tools/browser/browser_service.py +++ b/agent/tools/browser/browser_service.py @@ -1,22 +1,26 @@ """ Browser service - Playwright wrapper managing browser lifecycle and page operations. -Lazily launches a Chromium instance on first use, reuses it across tool calls, -and cleans up on close(). Headless mode is auto-detected based on platform and -display availability. +All Playwright calls run on a dedicated background thread so that callers from +any worker thread can safely use the service. An idle-timeout mechanism +automatically shuts down the browser (and its thread) after a configurable +period of inactivity to free resources. """ -import asyncio import os import sys -import re import uuid +import queue import threading -from typing import Optional, Dict, Any, List +from typing import Optional, Dict, Any, List, Callable from common.log import logger -from playwright.sync_api import sync_playwright, Browser, BrowserContext, Page, Playwright +try: + from playwright.sync_api import sync_playwright, Browser, BrowserContext, Page, Playwright + _HAS_PLAYWRIGHT = True +except ImportError: + _HAS_PLAYWRIGHT = False # --------------------------------------------------------------------------- @@ -198,34 +202,105 @@ def _flatten_tree(node, indent=0) -> List[str]: class BrowserService: - """Manages a single Playwright browser instance with page operations.""" + """Manages a Playwright browser on a dedicated background thread. + + All Playwright operations are dispatched to a single long-lived thread via + a task queue. Callers from *any* worker thread can use the public API + safely. An idle timer automatically shuts the browser down after + ``idle_timeout`` seconds of inactivity (default 300 = 5 min). + """ + + _IDLE_TIMEOUT_DEFAULT = 300 # seconds def __init__(self, config: Optional[Dict[str, Any]] = None): self._config = config or {} - self._playwright: Optional[Playwright] = None - self._browser: Optional[Browser] = None - self._context: Optional[BrowserContext] = None - self._page: Optional[Page] = None self._headless: Optional[bool] = None self._screenshot_dir: Optional[str] = None - self._owner_thread: Optional[int] = None + + # Background thread state + self._thread: Optional[threading.Thread] = None + self._task_queue: queue.Queue = queue.Queue() + self._lock = threading.Lock() + self._alive = False + self._ready = threading.Event() + + # Playwright objects (only accessed on the background thread) + self._playwright = None + self._browser = None + self._context = None + self._page = None + + # Idle auto-release + idle_cfg = self._config.get("idle_timeout") + self._idle_timeout: float = float(idle_cfg) if idle_cfg is not None else self._IDLE_TIMEOUT_DEFAULT + self._idle_timer: Optional[threading.Timer] = None # ------------------------------------------------------------------ - # Lifecycle + # Background-thread lifecycle # ------------------------------------------------------------------ - def _ensure_browser(self): - """Lazily launch browser on first use. Re-launch if called from a different thread.""" - current_thread = threading.current_thread().ident + def _start_thread(self): + """Start the dedicated Playwright thread if not already running.""" + with self._lock: + if self._alive and self._thread and self._thread.is_alive(): + return + self._alive = True + # Reuse existing queue so tasks queued during restart are not lost + if self._task_queue is None: + self._task_queue = queue.Queue() + self._ready = threading.Event() + self._thread = threading.Thread(target=self._run_loop, daemon=True, name="BrowserThread") + self._thread.start() + # Block until browser is ready (or failed) + self._ready.wait(timeout=30) - # Playwright sync API is single-threaded; if called from a different thread, restart - if self._owner_thread and self._owner_thread != current_thread and self._playwright: - logger.info("[Browser] Thread changed, restarting browser instance") - self._force_cleanup() - - if self._page and not self._page.is_closed(): + def _run_loop(self): + """Event loop running on the dedicated thread. Processes tasks until stopped.""" + logger.info("[Browser] Background thread started") + try: + self._launch_browser() + except Exception as e: + logger.error(f"[Browser] Failed to launch browser: {e}") + self._alive = False + self._ready.set() + self._drain_queue(RuntimeError(f"Browser launch failed: {e}")) return + self._ready.set() + while self._alive: + try: + task = self._task_queue.get(timeout=1.0) + except queue.Empty: + continue + if task is None: + break + fn, args, kwargs, result_slot = task + try: + result_slot["value"] = fn(*args, **kwargs) + except Exception as e: + result_slot["error"] = e + finally: + result_slot["event"].set() + + self._shutdown_browser() + self._drain_queue(RuntimeError("Browser thread stopped")) + logger.info("[Browser] Background thread exited") + + def _drain_queue(self, error: Exception): + """Unblock all callers waiting on the queue with an error.""" + while True: + try: + task = self._task_queue.get_nowait() + except queue.Empty: + break + if task is None: + continue + _, _, _, result_slot = task + result_slot["error"] = error + result_slot["event"].set() + + def _launch_browser(self): + """Launch Chromium on the background thread.""" if self._headless is None: headless_cfg = self._config.get("headless") self._headless = headless_cfg if headless_cfg is not None else _should_use_headless() @@ -241,9 +316,7 @@ class BrowserService: viewport_w = self._config.get("viewport_width", 1280) viewport_h = self._config.get("viewport_height", 720) - if not self._playwright: - self._playwright = sync_playwright().start() - + self._playwright = sync_playwright().start() logger.info(f"[Browser] Launching Chromium (headless={self._headless})") self._browser = self._playwright.chromium.launch( headless=self._headless, @@ -258,63 +331,20 @@ class BrowserService: ), ) self._page = self._context.new_page() - self._owner_thread = current_thread logger.info("[Browser] Browser ready") - @property - def page(self) -> Page: - self._ensure_browser() - return self._page - - def _force_cleanup(self): - """Force-release resources when the owner thread has already exited. - - Normal close() calls Playwright's sync API which requires the original - thread to still be alive. When the thread is gone we can only kill the - underlying browser process and discard all references. - """ - logger.info("[Browser] Force-cleaning stale browser (owner thread exited)") - try: - if self._browser and self._browser.is_connected(): - pid = self._browser.process and self._browser.process.pid - if pid: - import signal, os as _os - try: - _os.kill(pid, signal.SIGKILL) - logger.debug(f"[Browser] Killed browser process {pid}") - except OSError: - pass - except Exception as e: - logger.debug(f"[Browser] force cleanup browser kill: {e}") - self._page = None - self._context = None - self._browser = None - try: - if self._playwright: - self._playwright.stop() - except Exception: - pass - self._playwright = None - self._owner_thread = None - - def close(self): - """Release all browser resources.""" - try: - loop = asyncio.get_event_loop() - old_handler = loop.get_exception_handler() - loop.set_exception_handler(lambda l, c: None) - except Exception: - loop, old_handler = None, None - try: - if self._context: - self._context.close() - except Exception as e: - logger.debug(f"[Browser] context close error: {e}") - try: - if self._browser: - self._browser.close() - except Exception as e: - logger.debug(f"[Browser] browser close error: {e}") + def _shutdown_browser(self): + """Shut down all Playwright resources on the background thread.""" + self._cancel_idle_timer() + for obj, label in [ + (self._context, "context"), + (self._browser, "browser"), + ]: + try: + if obj: + obj.close() + except Exception as e: + logger.debug(f"[Browser] {label} close error: {e}") try: if self._playwright: self._playwright.stop() @@ -324,21 +354,77 @@ class BrowserService: self._context = None self._browser = None self._playwright = None - self._owner_thread = None - if loop and old_handler: - try: - loop.set_exception_handler(old_handler) - except Exception: - pass logger.info("[Browser] Browser closed") + def _submit(self, fn: Callable, *args, **kwargs): + """Submit *fn* to the background thread and block until it completes.""" + self._start_thread() + + if not self._alive: + raise RuntimeError("Browser is not available") + + self._reset_idle_timer() + + result_slot: Dict[str, Any] = {"event": threading.Event()} + self._task_queue.put((fn, args, kwargs, result_slot)) + + # Timeout prevents permanent hang if the background thread crashes + completed = result_slot["event"].wait(timeout=120) + if not completed: + raise TimeoutError("Browser operation timed out (120s)") + + if "error" in result_slot: + raise result_slot["error"] + return result_slot.get("value") + # ------------------------------------------------------------------ - # Actions + # Idle auto-release + # ------------------------------------------------------------------ + + def _reset_idle_timer(self): + self._cancel_idle_timer() + if self._idle_timeout > 0: + self._idle_timer = threading.Timer(self._idle_timeout, self._on_idle_timeout) + self._idle_timer.daemon = True + self._idle_timer.start() + + def _cancel_idle_timer(self): + if self._idle_timer: + self._idle_timer.cancel() + self._idle_timer = None + + def _on_idle_timeout(self): + logger.info(f"[Browser] Idle for {self._idle_timeout}s, auto-releasing browser") + self.close() + + # ------------------------------------------------------------------ + # Public lifecycle + # ------------------------------------------------------------------ + + def close(self): + """Shut down browser and background thread (safe from any thread).""" + self._cancel_idle_timer() + with self._lock: + if not self._alive: + return + self._alive = False + t = self._thread + if self._task_queue is not None: + self._task_queue.put(None) + if t is not None and t.is_alive(): + t.join(timeout=10) + with self._lock: + self._thread = None + + # ------------------------------------------------------------------ + # Actions (each method is dispatched to the background thread) # ------------------------------------------------------------------ def navigate(self, url: str, timeout: int = 30000) -> Dict[str, Any]: - """Navigate to a URL and wait for the page to be ready.""" - page = self.page + return self._submit(self._do_navigate, url, timeout) + + def _do_navigate(self, url: str, timeout: int) -> Dict[str, Any]: + page = self._page try: resp = page.goto(url, wait_until="domcontentloaded", timeout=timeout) status = resp.status if resp else None @@ -360,20 +446,14 @@ class BrowserService: except Exception: current_url = url - return { - "url": current_url, - "title": title, - "status": status, - } + return {"url": current_url, "title": title, "status": status} def snapshot(self, selector: Optional[str] = None) -> str: - """ - Return a compact text representation of the page DOM for LLM consumption. - Interactive elements get numeric refs usable in click/fill actions. - """ - page = self.page + return self._submit(self._do_snapshot, selector) + + def _do_snapshot(self, selector: Optional[str] = None) -> str: + page = self._page try: - target = selector or "body" result = page.evaluate(_SNAPSHOT_JS) except Exception as e: return f"[Snapshot error: {e}]" @@ -382,10 +462,18 @@ class BrowserService: ref_count = result.get("refCount", 0) lines = _flatten_tree(tree) - header = f"Page: {page.title()} ({page.url})\nInteractive elements: {ref_count}\n---" + try: + title = page.title() + except Exception: + title = "" + try: + url = page.url + except Exception: + url = "" + + header = f"Page: {title} ({url})\nInteractive elements: {ref_count}\n---" body = "\n".join(lines) - # Limit output size max_chars = self._config.get("snapshot_max_chars", 30000) if len(body) > max_chars: body = body[:max_chars] + "\n... [snapshot truncated]" @@ -393,20 +481,23 @@ class BrowserService: return f"{header}\n{body}" def screenshot(self, full_page: bool = False, cwd: str = "") -> str: - """Take a screenshot and save to workspace/tmp. Returns file path.""" - page = self.page + return self._submit(self._do_screenshot, full_page, cwd) + + def _do_screenshot(self, full_page: bool = False, cwd: str = "") -> str: + page = self._page save_dir = self._get_screenshot_dir(cwd) filename = f"screenshot_{uuid.uuid4().hex[:8]}.png" filepath = os.path.join(save_dir, filename) - page.screenshot(path=filepath, full_page=full_page) logger.info(f"[Browser] Screenshot saved: {filepath}") return filepath def click(self, ref: Optional[int] = None, selector: Optional[str] = None, timeout: int = 5000) -> Dict[str, Any]: - """Click an element by snapshot ref or CSS selector.""" - page = self.page + return self._submit(self._do_click, ref, selector, timeout) + + def _do_click(self, ref, selector, timeout) -> Dict[str, Any]: + page = self._page try: if ref is not None: result = page.evaluate(f""" @@ -431,8 +522,10 @@ class BrowserService: def fill(self, text: str, ref: Optional[int] = None, selector: Optional[str] = None, timeout: int = 5000) -> Dict[str, Any]: - """Fill text into an input/textarea by snapshot ref or CSS selector.""" - page = self.page + return self._submit(self._do_fill, text, ref, selector, timeout) + + def _do_fill(self, text, ref, selector, timeout) -> Dict[str, Any]: + page = self._page try: if ref is not None: result = page.evaluate(f""" @@ -458,8 +551,10 @@ class BrowserService: def select(self, value: str, ref: Optional[int] = None, selector: Optional[str] = None, timeout: int = 5000) -> Dict[str, Any]: - """Select an option in a