diff --git a/bridge/agent_event_handler.py b/bridge/agent_event_handler.py index 50826235..35173730 100644 --- a/bridge/agent_event_handler.py +++ b/bridge/agent_event_handler.py @@ -2,44 +2,40 @@ Agent Event Handler - Handles agent events and thinking process output """ +from common import const from common.log import logger +# Cap intermediate thinking messages on weixin to stay within send quota. +WEIXIN_THINKING_INSTANT_MAX = 7 + class AgentEventHandler: """ Handles agent events and optionally sends intermediate messages to channel """ - + def __init__(self, context=None, original_callback=None): - """ - Initialize event handler - - Args: - context: COW context (for accessing channel) - original_callback: Original event callback to chain - """ self.context = context self.original_callback = original_callback - - # Get channel for sending intermediate messages + self.channel = None if context: self.channel = context.kwargs.get("channel") if hasattr(context, "kwargs") else None - + self.current_content = "" self.turn_number = 0 - + + channel_type = "" + if context and hasattr(context, "kwargs"): + channel_type = context.kwargs.get("channel_type", "") or "" + self._is_weixin = channel_type == const.WEIXIN + self._thinking_sent_count = 0 + self._merged_buf: list[str] = [] + def handle_event(self, event): - """ - Main event handler - - Args: - event: Event dict with type and data - """ event_type = event.get("type") data = event.get("data", {}) - - # Dispatch to specific handlers + if event_type == "turn_start": self._handle_turn_start(data) elif event_type == "message_update": @@ -52,25 +48,23 @@ class AgentEventHandler: self._handle_tool_execution_start(data) elif event_type == "tool_execution_end": self._handle_tool_execution_end(data) - - # Call original callback if provided + elif event_type == "agent_end": + self._handle_agent_end(data) + if self.original_callback: self.original_callback(event) - + def _handle_turn_start(self, data): - """Handle turn start event""" self.turn_number = data.get("turn", 0) self.current_content = "" - + def _handle_message_update(self, data): - """Handle message update event (streaming content text)""" delta = data.get("delta", "") self.current_content += delta - + def _handle_message_end(self, data): - """Handle message end event""" tool_calls = data.get("tool_calls", []) - + if tool_calls: if self.current_content.strip(): logger.info(f"💭 {self.current_content.strip()[:200]}{'...' if len(self.current_content) > 200 else ''}") @@ -78,35 +72,54 @@ class AgentEventHandler: else: if self.current_content.strip(): logger.debug(f"💬 {self.current_content.strip()[:200]}{'...' if len(self.current_content) > 200 else ''}") - + # Drain weixin buffer before final reply leaves chat_channel + self._flush_merged_now() + self.current_content = "" - + + def _handle_agent_end(self, data): + self._flush_merged_now() + def _handle_tool_execution_start(self, data): - """Handle tool execution start event - logged by agent_stream.py""" pass - + def _handle_tool_execution_end(self, data): - """Handle tool execution end event - logged by agent_stream.py""" pass - + def _send_to_channel(self, message): - """ - Try to send intermediate message to channel. - Skipped in SSE mode because thinking text is already streamed via on_event. - """ if self.context and self.context.get("on_event"): return + if not self.channel: + return + + if not self._is_weixin: + self._do_send(message) + return + + if self._thinking_sent_count < WEIXIN_THINKING_INSTANT_MAX: + self._do_send(message) + self._thinking_sent_count += 1 + return + + self._merged_buf.append(message) + + def _flush_merged_now(self): + if not self._merged_buf: + return + merged = "\n\n".join(self._merged_buf) + count = len(self._merged_buf) + self._merged_buf = [] + logger.debug(f"[AgentEventHandler] Flushing {count} merged thinking msgs, len={len(merged)}") + self._do_send(merged) + self._thinking_sent_count += 1 + + def _do_send(self, message): + try: + from bridge.reply import Reply, ReplyType + reply = Reply(ReplyType.TEXT, message) + self.channel._send(reply, self.context) + except Exception as e: + logger.debug(f"[AgentEventHandler] Failed to send to channel: {e}") - if self.channel: - try: - from bridge.reply import Reply, ReplyType - reply = Reply(ReplyType.TEXT, message) - self.channel._send(reply, self.context) - except Exception as e: - logger.debug(f"[AgentEventHandler] Failed to send to channel: {e}") - def log_summary(self): - """Log execution summary - simplified""" - # Summary removed as per user request - # Real-time logging during execution is sufficient pass