mirror of
https://github.com/zhayujie/chatgpt-on-wechat.git
synced 2026-06-02 00:57:41 +08:00
fix(weixin): cap thinking messages to avoid rate-limit drops
This commit is contained in:
@@ -2,44 +2,40 @@
|
|||||||
Agent Event Handler - Handles agent events and thinking process output
|
Agent Event Handler - Handles agent events and thinking process output
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
from common import const
|
||||||
from common.log import logger
|
from common.log import logger
|
||||||
|
|
||||||
|
# Cap intermediate thinking messages on weixin to stay within send quota.
|
||||||
|
WEIXIN_THINKING_INSTANT_MAX = 7
|
||||||
|
|
||||||
|
|
||||||
class AgentEventHandler:
|
class AgentEventHandler:
|
||||||
"""
|
"""
|
||||||
Handles agent events and optionally sends intermediate messages to channel
|
Handles agent events and optionally sends intermediate messages to channel
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, context=None, original_callback=None):
|
def __init__(self, context=None, original_callback=None):
|
||||||
"""
|
|
||||||
Initialize event handler
|
|
||||||
|
|
||||||
Args:
|
|
||||||
context: COW context (for accessing channel)
|
|
||||||
original_callback: Original event callback to chain
|
|
||||||
"""
|
|
||||||
self.context = context
|
self.context = context
|
||||||
self.original_callback = original_callback
|
self.original_callback = original_callback
|
||||||
|
|
||||||
# Get channel for sending intermediate messages
|
|
||||||
self.channel = None
|
self.channel = None
|
||||||
if context:
|
if context:
|
||||||
self.channel = context.kwargs.get("channel") if hasattr(context, "kwargs") else None
|
self.channel = context.kwargs.get("channel") if hasattr(context, "kwargs") else None
|
||||||
|
|
||||||
self.current_content = ""
|
self.current_content = ""
|
||||||
self.turn_number = 0
|
self.turn_number = 0
|
||||||
|
|
||||||
|
channel_type = ""
|
||||||
|
if context and hasattr(context, "kwargs"):
|
||||||
|
channel_type = context.kwargs.get("channel_type", "") or ""
|
||||||
|
self._is_weixin = channel_type == const.WEIXIN
|
||||||
|
self._thinking_sent_count = 0
|
||||||
|
self._merged_buf: list[str] = []
|
||||||
|
|
||||||
def handle_event(self, event):
|
def handle_event(self, event):
|
||||||
"""
|
|
||||||
Main event handler
|
|
||||||
|
|
||||||
Args:
|
|
||||||
event: Event dict with type and data
|
|
||||||
"""
|
|
||||||
event_type = event.get("type")
|
event_type = event.get("type")
|
||||||
data = event.get("data", {})
|
data = event.get("data", {})
|
||||||
|
|
||||||
# Dispatch to specific handlers
|
|
||||||
if event_type == "turn_start":
|
if event_type == "turn_start":
|
||||||
self._handle_turn_start(data)
|
self._handle_turn_start(data)
|
||||||
elif event_type == "message_update":
|
elif event_type == "message_update":
|
||||||
@@ -52,25 +48,23 @@ class AgentEventHandler:
|
|||||||
self._handle_tool_execution_start(data)
|
self._handle_tool_execution_start(data)
|
||||||
elif event_type == "tool_execution_end":
|
elif event_type == "tool_execution_end":
|
||||||
self._handle_tool_execution_end(data)
|
self._handle_tool_execution_end(data)
|
||||||
|
elif event_type == "agent_end":
|
||||||
# Call original callback if provided
|
self._handle_agent_end(data)
|
||||||
|
|
||||||
if self.original_callback:
|
if self.original_callback:
|
||||||
self.original_callback(event)
|
self.original_callback(event)
|
||||||
|
|
||||||
def _handle_turn_start(self, data):
|
def _handle_turn_start(self, data):
|
||||||
"""Handle turn start event"""
|
|
||||||
self.turn_number = data.get("turn", 0)
|
self.turn_number = data.get("turn", 0)
|
||||||
self.current_content = ""
|
self.current_content = ""
|
||||||
|
|
||||||
def _handle_message_update(self, data):
|
def _handle_message_update(self, data):
|
||||||
"""Handle message update event (streaming content text)"""
|
|
||||||
delta = data.get("delta", "")
|
delta = data.get("delta", "")
|
||||||
self.current_content += delta
|
self.current_content += delta
|
||||||
|
|
||||||
def _handle_message_end(self, data):
|
def _handle_message_end(self, data):
|
||||||
"""Handle message end event"""
|
|
||||||
tool_calls = data.get("tool_calls", [])
|
tool_calls = data.get("tool_calls", [])
|
||||||
|
|
||||||
if tool_calls:
|
if tool_calls:
|
||||||
if self.current_content.strip():
|
if self.current_content.strip():
|
||||||
logger.info(f"💭 {self.current_content.strip()[:200]}{'...' if len(self.current_content) > 200 else ''}")
|
logger.info(f"💭 {self.current_content.strip()[:200]}{'...' if len(self.current_content) > 200 else ''}")
|
||||||
@@ -78,35 +72,54 @@ class AgentEventHandler:
|
|||||||
else:
|
else:
|
||||||
if self.current_content.strip():
|
if self.current_content.strip():
|
||||||
logger.debug(f"💬 {self.current_content.strip()[:200]}{'...' if len(self.current_content) > 200 else ''}")
|
logger.debug(f"💬 {self.current_content.strip()[:200]}{'...' if len(self.current_content) > 200 else ''}")
|
||||||
|
# Drain weixin buffer before final reply leaves chat_channel
|
||||||
|
self._flush_merged_now()
|
||||||
|
|
||||||
self.current_content = ""
|
self.current_content = ""
|
||||||
|
|
||||||
|
def _handle_agent_end(self, data):
|
||||||
|
self._flush_merged_now()
|
||||||
|
|
||||||
def _handle_tool_execution_start(self, data):
|
def _handle_tool_execution_start(self, data):
|
||||||
"""Handle tool execution start event - logged by agent_stream.py"""
|
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def _handle_tool_execution_end(self, data):
|
def _handle_tool_execution_end(self, data):
|
||||||
"""Handle tool execution end event - logged by agent_stream.py"""
|
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def _send_to_channel(self, message):
|
def _send_to_channel(self, message):
|
||||||
"""
|
|
||||||
Try to send intermediate message to channel.
|
|
||||||
Skipped in SSE mode because thinking text is already streamed via on_event.
|
|
||||||
"""
|
|
||||||
if self.context and self.context.get("on_event"):
|
if self.context and self.context.get("on_event"):
|
||||||
return
|
return
|
||||||
|
if not self.channel:
|
||||||
|
return
|
||||||
|
|
||||||
|
if not self._is_weixin:
|
||||||
|
self._do_send(message)
|
||||||
|
return
|
||||||
|
|
||||||
|
if self._thinking_sent_count < WEIXIN_THINKING_INSTANT_MAX:
|
||||||
|
self._do_send(message)
|
||||||
|
self._thinking_sent_count += 1
|
||||||
|
return
|
||||||
|
|
||||||
|
self._merged_buf.append(message)
|
||||||
|
|
||||||
|
def _flush_merged_now(self):
|
||||||
|
if not self._merged_buf:
|
||||||
|
return
|
||||||
|
merged = "\n\n".join(self._merged_buf)
|
||||||
|
count = len(self._merged_buf)
|
||||||
|
self._merged_buf = []
|
||||||
|
logger.debug(f"[AgentEventHandler] Flushing {count} merged thinking msgs, len={len(merged)}")
|
||||||
|
self._do_send(merged)
|
||||||
|
self._thinking_sent_count += 1
|
||||||
|
|
||||||
|
def _do_send(self, message):
|
||||||
|
try:
|
||||||
|
from bridge.reply import Reply, ReplyType
|
||||||
|
reply = Reply(ReplyType.TEXT, message)
|
||||||
|
self.channel._send(reply, self.context)
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"[AgentEventHandler] Failed to send to channel: {e}")
|
||||||
|
|
||||||
if self.channel:
|
|
||||||
try:
|
|
||||||
from bridge.reply import Reply, ReplyType
|
|
||||||
reply = Reply(ReplyType.TEXT, message)
|
|
||||||
self.channel._send(reply, self.context)
|
|
||||||
except Exception as e:
|
|
||||||
logger.debug(f"[AgentEventHandler] Failed to send to channel: {e}")
|
|
||||||
|
|
||||||
def log_summary(self):
|
def log_summary(self):
|
||||||
"""Log execution summary - simplified"""
|
|
||||||
# Summary removed as per user request
|
|
||||||
# Real-time logging during execution is sufficient
|
|
||||||
pass
|
pass
|
||||||
|
|||||||
Reference in New Issue
Block a user