From bb868b83baf3e08d1fc60acab8a9a88beeef297c Mon Sep 17 00:00:00 2001 From: zhayujie Date: Sun, 8 Mar 2026 13:03:27 +0800 Subject: [PATCH] feat: add chat history query --- agent/chat/service.py | 28 ++++++++++++++++++- common/cloud_client.py | 63 ++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 88 insertions(+), 3 deletions(-) diff --git a/agent/chat/service.py b/agent/chat/service.py index ebb1a6d4..b6a9ae67 100644 --- a/agent/chat/service.py +++ b/agent/chat/service.py @@ -27,7 +27,8 @@ class ChatService: """ self.agent_bridge = agent_bridge - def run(self, query: str, session_id: str, send_chunk_fn: Callable[[dict], None]): + def run(self, query: str, session_id: str, send_chunk_fn: Callable[[dict], None], + channel_type: str = ""): """ Run the agent for *query* and stream results back via *send_chunk_fn*. @@ -37,6 +38,7 @@ class ChatService: :param query: user query text :param session_id: session identifier for agent isolation :param send_chunk_fn: callable(chunk_data: dict) to send a streaming chunk + :param channel_type: source channel (e.g. "web", "feishu") for persistence """ agent = self.agent_bridge.get_agent(session_id=session_id) if agent is None: @@ -164,6 +166,11 @@ class ChatService: new_messages = executor.messages[original_length:] agent.messages.extend(new_messages) + # Persist new messages to SQLite so they survive restarts and + # can be queried via the HISTORY interface. + if new_messages: + self._persist_messages(session_id, list(new_messages), channel_type) + # Store executor reference for files_to_send access agent.stream_executor = executor @@ -174,6 +181,25 @@ class ChatService: + @staticmethod + def _persist_messages(session_id: str, new_messages: list, channel_type: str = ""): + try: + from config import conf + if not conf().get("conversation_persistence", True): + return + except Exception: + pass + try: + from agent.memory import get_conversation_store + get_conversation_store().append_messages( + session_id, new_messages, channel_type=channel_type + ) + except Exception as e: + logger.warning( + f"[ChatService] Failed to persist messages for session={session_id}: {e}" + ) + + class _StreamState: """Mutable state shared between the event callback and the run method.""" diff --git a/common/cloud_client.py b/common/cloud_client.py index 75bb5888..1b6cd96a 100644 --- a/common/cloud_client.py +++ b/common/cloud_client.py @@ -394,13 +394,72 @@ class CloudClient(LinkAIClient): payload = data.get("payload", {}) query = payload.get("query", "") session_id = payload.get("session_id", "cloud_console") - logger.info(f"[CloudClient] on_chat: session={session_id}, query={query[:80]}") + channel_type = payload.get("channel_type", "") + if not session_id.startswith("session_"): + session_id = f"session_{session_id}" + logger.info(f"[CloudClient] on_chat: session={session_id}, channel={channel_type}, query={query[:80]}") svc = self.chat_service if svc is None: raise RuntimeError("ChatService not available") - svc.run(query=query, session_id=session_id, send_chunk_fn=send_chunk_fn) + svc.run(query=query, session_id=session_id, channel_type=channel_type, send_chunk_fn=send_chunk_fn) + + # ------------------------------------------------------------------ + # history callback + # ------------------------------------------------------------------ + def on_history(self, data: dict) -> dict: + """ + Handle HISTORY messages from the cloud console. + Returns paginated conversation history for a session. + + :param data: message data with 'action' and 'payload' (session_id, page, page_size) + :return: response dict + """ + action = data.get("action", "query") + payload = data.get("payload", {}) + logger.info(f"[CloudClient] on_history: action={action}") + + if action == "query": + return self._query_history(payload) + + return {"action": action, "code": 404, "message": f"unknown action: {action}", "payload": None} + + def _query_history(self, payload: dict) -> dict: + """Query paginated conversation history using ConversationStore.""" + session_id = payload.get("session_id", "") + page = int(payload.get("page", 1)) + page_size = int(payload.get("page_size", 20)) + + if not session_id: + return { + "action": "query", + "payload": {"status": "error", "message": "session_id required"}, + } + + # Web channel stores sessions with a "session_" prefix + if not session_id.startswith("session_"): + session_id = f"session_{session_id}" + logger.info(f"[CloudClient] history query: session={session_id}, page={page}, page_size={page_size}") + + try: + from agent.memory.conversation_store import get_conversation_store + store = get_conversation_store() + result = store.load_history_page( + session_id=session_id, + page=page, + page_size=page_size, + ) + return { + "action": "query", + "payload": {"status": "success", **result}, + } + except Exception as e: + logger.error(f"[CloudClient] History query error: {e}") + return { + "action": "query", + "payload": {"status": "error", "message": str(e)}, + } # ------------------------------------------------------------------ # channel restart helpers