Files
chatgpt-on-wechat/agent/memory/summarizer.py
zhayujie 54e81aba11 feat(memory+knowledge): add knowledge wiki system and Light Dream memory extraction
- Add knowledge/ directory structure and knowledge-wiki skill for structured knowledge accumulation
- Auto-inject MEMORY.md into system prompt with truncation (last 200 lines)
- Light Dream: extend flush_memory to extract long-term memories into MEMORY.md with date stamps
- Add mandatory knowledge auto-write rules in system prompt (no user confirmation needed)
- Expand MemoryManager.sync() to index knowledge/ files for vector search
- Update RULE.md template with workspace conventions and knowledge guidelines
2026-04-09 21:22:43 +08:00

481 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Memory flush manager (with Light Dream)
Handles memory persistence when conversation context is trimmed or overflows:
- Uses LLM to summarize discarded messages into concise key-information entries
- Writes to daily memory files (lazy creation)
- Light Dream: extracts long-term memories to MEMORY.md in the same LLM call
- Deduplicates trim flushes to avoid repeated writes
- Runs summarization asynchronously to avoid blocking normal replies
- Provides daily summary interface for scheduler
"""
import threading
from typing import Optional, Callable, Any, List, Dict
from pathlib import Path
from datetime import datetime
from common.log import logger
SUMMARIZE_SYSTEM_PROMPT = """你是一个记忆提取助手。你的任务是从对话记录中提炼出两种记忆:
## 第一部分:日常记录([DAILY]
按「事件」维度归纳当天发生的事,不要按对话轮次逐条记录:
- 每条一行,用 "- " 开头
- 合并同一件事的多轮对话
- 只记录有意义的事件,忽略闲聊和问候
## 第二部分:长期记忆([MEMORY]
提取值得**永久记住**的关键信息,这些信息在未来的对话中仍然有价值:
- 用户的偏好、习惯、风格(如"用户偏好中文回复""用户喜欢简洁风格"
- 重要的决策或约定(如"项目决定使用 PostgreSQL"
- 关键人物信息(如"张总是用户的上级"
- 用户明确要求记住的内容
- 重要的教训或经验总结
**如果没有值得永久记住的信息,[MEMORY] 部分留空即可。**
## 输出格式(严格遵守)
```
[DAILY]
- 事件1的摘要
- 事件2的摘要
[MEMORY]
- 值得永久记住的信息1
- 值得永久记住的信息2
```
当对话没有任何记录价值(仅含问候或无意义内容),直接回复"""""
SUMMARIZE_USER_PROMPT = """请从以下对话记录中提取记忆(按 [DAILY] 和 [MEMORY] 两部分输出):
{conversation}"""
class MemoryFlushManager:
    """
    Manages memory flush operations.

    Flush is triggered by agent_stream in two scenarios:
    1. Context trim: _trim_messages discards old turns -> flush discarded content
    2. Context overflow: API rejects request -> emergency flush before clearing

    Additionally, create_daily_summary() can be called by a scheduler for
    end-of-day summaries.

    Each flush summarizes the messages (LLM first, rule-based fallback), appends
    the daily part to ``memory/YYYY-MM-DD.md`` and the long-term part
    ("Light Dream") to ``MEMORY.md``. Summarization and file writes run in a
    daemon thread started by flush_from_messages().
    """

    def __init__(
        self,
        workspace_dir: Path,
        llm_model: Optional[Any] = None,
    ):
        """
        Args:
            workspace_dir: Workspace root; ``memory/`` is created beneath it.
                A plain string path is accepted and converted to ``Path``.
            llm_model: Object exposing ``call(request)``; when falsy, the
                rule-based fallback summarizer is used instead.
        """
        self.workspace_dir = Path(workspace_dir)
        self.llm_model = llm_model
        self.memory_dir = self.workspace_dir / "memory"
        self.memory_dir.mkdir(parents=True, exist_ok=True)
        self.last_flush_timestamp: Optional[datetime] = None
        # MD5 hashes of message texts that were already handed to a flush worker
        self._trim_flushed_hashes: set = set()
        # Hash of the full conversation at the last daily summary, for dedup
        self._last_flushed_content_hash: str = ""

    def get_today_memory_file(self, user_id: Optional[str] = None, ensure_exists: bool = False) -> Path:
        """
        Get today's memory file path.

        Args:
            user_id: When given, the file lives under ``memory/users/<user_id>/``;
                otherwise directly under ``memory/``.
            ensure_exists: When True, create parent directories and, if the
                file is missing, create it with a "# Daily Memory" header.

        Returns:
            Path to ``YYYY-MM-DD.md`` for the current local date.
        """
        today = datetime.now().strftime("%Y-%m-%d")
        if user_id:
            today_file = self.memory_dir / "users" / user_id / f"{today}.md"
        else:
            today_file = self.memory_dir / f"{today}.md"

        if ensure_exists:
            today_file.parent.mkdir(parents=True, exist_ok=True)
            # Exclusive create instead of "exists() then write_text()": flushes
            # run in background threads, and a check-then-write could truncate
            # a file that another worker has just appended to.
            try:
                with open(today_file, "x", encoding="utf-8") as f:
                    f.write(f"# Daily Memory: {today}\n\n")
            except FileExistsError:
                pass
        return today_file

    def get_main_memory_file(self, user_id: Optional[str] = None) -> Path:
        """
        Get the long-term memory file path.

        Without ``user_id`` this is ``MEMORY.md`` in the workspace root. With
        ``user_id`` it is ``memory/users/<user_id>/MEMORY.md`` (the user
        directory is created if needed). The file itself is not created here.
        """
        if user_id:
            user_dir = self.memory_dir / "users" / user_id
            user_dir.mkdir(parents=True, exist_ok=True)
            return user_dir / "MEMORY.md"
        return self.workspace_dir / "MEMORY.md"

    def get_status(self) -> dict:
        """Return last flush time (ISO string or None) and the default file paths."""
        last = self.last_flush_timestamp
        return {
            'last_flush_time': last.isoformat() if last else None,
            'today_file': str(self.get_today_memory_file()),
            'main_file': str(self.get_main_memory_file())
        }

    # ---- Flush execution (called by agent_stream or scheduler) ----

    def flush_from_messages(
        self,
        messages: List[Dict],
        user_id: Optional[str] = None,
        reason: str = "trim",
        max_messages: int = 0,
    ) -> bool:
        """
        Asynchronously summarize and flush messages to daily memory.

        Deduplication runs synchronously, then LLM summarization + file write
        run in a background daemon thread so the caller is not blocked.

        Args:
            messages: Conversation message list (dicts with "role"/"content")
            user_id: Optional user ID for user-scoped memory
            reason: Why flush was triggered ("trim" | "overflow" | "daily_summary")
            max_messages: Max recent messages to summarize (0 = all)

        Returns:
            True if a flush worker was dispatched; False when every message was
            empty or already flushed, or when dispatching failed.
        """
        try:
            import copy
            import hashlib

            deduped = []
            new_hashes = set()
            for m in messages:
                text = self._extract_text_from_content(m.get("content", ""))
                if not text or not text.strip():
                    continue
                h = hashlib.md5(text.encode("utf-8")).hexdigest()
                if h in self._trim_flushed_hashes or h in new_hashes:
                    continue
                new_hashes.add(h)
                deduped.append(m)
            if not deduped:
                return False

            # Deep copy so later mutation of the caller's list cannot affect the worker
            snapshot = copy.deepcopy(deduped)
            thread = threading.Thread(
                target=self._flush_worker,
                args=(snapshot, user_id, reason, max_messages),
                daemon=True,
            )
            thread.start()
            # Mark content as flushed only once the worker is really running,
            # so a failed dispatch does not permanently suppress these messages.
            self._trim_flushed_hashes.update(new_hashes)
            logger.info(f"[MemoryFlush] Async flush dispatched (reason={reason}, msgs={len(snapshot)})")
            return True
        except Exception as e:
            logger.warning(f"[MemoryFlush] Failed to dispatch flush (reason={reason}): {e}")
            return False

    def _flush_worker(
        self,
        messages: List[Dict],
        user_id: Optional[str],
        reason: str,
        max_messages: int,
    ):
        """
        Background worker: summarize, then write daily file + MEMORY.md (Light Dream).

        All exceptions are caught and logged; nothing propagates out of the thread.
        """
        try:
            raw_summary = self._summarize_messages(messages, max_messages)
            if not raw_summary or not raw_summary.strip():
                logger.info(f"[MemoryFlush] No valuable content to flush (reason={reason})")
                return

            daily_part, memory_part = self._parse_dual_output(raw_summary)

            # --- Write daily memory ---
            if daily_part:
                daily_file = self.get_today_memory_file(user_id, ensure_exists=True)
                titles = {
                    "overflow": "Context Overflow Recovery",
                    "trim": "Trimmed Context",
                    "daily_summary": "Daily Summary",
                }
                title = titles.get(reason, "Session Notes")
                header = f"## {title} ({datetime.now().strftime('%H:%M')})"
                if reason == "overflow":
                    note = "The following conversation was trimmed due to context overflow:\n"
                else:
                    note = ""
                flush_entry = f"\n{header}\n\n{note}{daily_part}\n"
                with open(daily_file, "a", encoding="utf-8") as f:
                    f.write(flush_entry)
                logger.info(f"[MemoryFlush] Wrote daily memory to {daily_file.name} (reason={reason}, chars={len(daily_part)})")

            # --- Light Dream: write long-term memory to MEMORY.md ---
            if memory_part:
                self._append_to_main_memory(memory_part, user_id)

            self.last_flush_timestamp = datetime.now()
        except Exception as e:
            logger.warning(f"[MemoryFlush] Async flush failed (reason={reason}): {e}")

    @staticmethod
    def _parse_dual_output(raw: str) -> tuple:
        """
        Parse LLM output into ``(daily_part, memory_part)``.

        Handles the [DAILY]/[MEMORY] format in either section order as well as
        the legacy single-section format (whole text becomes the daily part).
        Markdown fence lines (```...) are dropped first: the system prompt
        shows the expected format inside a fenced block, so models may echo
        the fences, and they must not end up as memory entries. A section
        consisting only of blank lines or bare "-" markers is returned as "".
        """
        kept = [ln for ln in raw.strip().split("\n") if not ln.strip().startswith("```")]
        text = "\n".join(kept).strip()

        positions = {marker: text.find(marker) for marker in ("[DAILY]", "[MEMORY]")}
        if all(pos < 0 for pos in positions.values()):
            # Legacy format: treat entire output as daily, no memory extraction
            return text, ""

        def section(marker: str) -> str:
            start = positions[marker]
            if start < 0:
                return ""
            start += len(marker)
            # A section runs until the next marker that follows it, if any
            following = [pos for pos in positions.values() if pos >= start]
            end = min(following) if following else len(text)
            body = text[start:end].strip()
            if all(not ln.strip() or ln.strip() == "-" for ln in body.split("\n")):
                return ""
            return body

        return section("[DAILY]"), section("[MEMORY]")

    def _append_to_main_memory(self, memory_entries: str, user_id: Optional[str] = None):
        """
        Append extracted long-term memories to MEMORY.md with a date stamp.

        Every non-empty line becomes ``- (YYYY-MM-DD) <text>``; a leading "- "
        on the input line is removed first. Failures are logged, not raised.
        """
        try:
            main_file = self.get_main_memory_file(user_id)
            today = datetime.now().strftime("%Y-%m-%d")

            stamped_lines = []
            for line in memory_entries.strip().split("\n"):
                line = line.strip()
                if not line:
                    continue
                body = line[2:] if line.startswith("- ") else line
                stamped_lines.append(f"- ({today}) {body}")
            if not stamped_lines:
                return

            stamped_text = "\n".join(stamped_lines)
            with open(main_file, "a", encoding="utf-8") as f:
                f.write(f"\n{stamped_text}\n")
            logger.info(f"[LightDream] Appended {len(stamped_lines)} entries to MEMORY.md")
        except Exception as e:
            logger.warning(f"[LightDream] Failed to append to MEMORY.md: {e}")

    def create_daily_summary(
        self,
        messages: List[Dict],
        user_id: Optional[str] = None
    ) -> bool:
        """
        Generate an end-of-day summary.

        Skips (returns False) when the concatenated message text hashes to the
        same value as at the previous call. Otherwise delegates to
        flush_from_messages() with reason "daily_summary" and returns its result.
        """
        import hashlib

        content = "".join(
            self._extract_text_from_content(m.get("content", ""))
            for m in messages
        )
        content_hash = hashlib.md5(content.encode("utf-8")).hexdigest()
        if content_hash == self._last_flushed_content_hash:
            logger.debug("[MemoryFlush] Daily summary skipped: no new content since last flush")
            return False
        self._last_flushed_content_hash = content_hash
        return self.flush_from_messages(
            messages=messages,
            user_id=user_id,
            reason="daily_summary",
            max_messages=0,
        )

    # ---- Internal helpers ----

    def _summarize_messages(self, messages: List[Dict], max_messages: int = 0) -> str:
        """
        Summarize conversation messages using the LLM, with rule-based fallback.

        Returns "" when there is no user/assistant text to summarize. The
        fallback is used when no model is configured, when the LLM call
        raises, or when it returns empty text.
        """
        conversation_text = self._format_conversation_for_summary(messages, max_messages)
        if not conversation_text.strip():
            return ""

        if self.llm_model:
            try:
                summary = self._call_llm_for_summary(conversation_text)
                if summary and summary.strip():
                    return summary.strip()
                logger.info("[MemoryFlush] LLM returned empty or '', using fallback")
            except Exception as e:
                logger.warning(f"[MemoryFlush] LLM summarization failed, using fallback: {e}")
        else:
            logger.info("[MemoryFlush] No LLM model available, using rule-based fallback")
        return self._extract_summary_fallback(messages, max_messages)

    def _format_conversation_for_summary(self, messages: List[Dict], max_messages: int = 0) -> str:
        """
        Format messages into readable conversation text for LLM summarization.

        Only "user" and "assistant" roles are emitted, one line each, with the
        text truncated to 500 characters. When ``max_messages`` is non-zero,
        only the last ``max_messages * 2`` messages are considered.
        """
        msgs = messages if max_messages == 0 else messages[-max_messages * 2:]
        lines = []
        for msg in msgs:
            role = msg.get("role", "")
            text = self._extract_text_from_content(msg.get("content", ""))
            if not text or not text.strip():
                continue
            text = text.strip()
            if role == "user":
                lines.append(f"用户: {text[:500]}")
            elif role == "assistant":
                lines.append(f"助手: {text[:500]}")
        return "\n".join(lines)

    def _call_llm_for_summary(self, conversation_text: str) -> str:
        """
        Call the LLM to generate a summary of the conversation.

        Accepts either a dict response (``error``/``message`` keys or an
        OpenAI-style ``choices`` list) or an object with a ``choices``
        attribute. Returns "" when no content can be extracted.

        Raises:
            RuntimeError: If a dict response carries a truthy ``error``.
        """
        from agent.protocol.models import LLMRequest

        request = LLMRequest(
            messages=[{"role": "user", "content": SUMMARIZE_USER_PROMPT.format(conversation=conversation_text)}],
            temperature=0,
            max_tokens=500,
            stream=False,
            system=SUMMARIZE_SYSTEM_PROMPT,
        )
        response = self.llm_model.call(request)

        if isinstance(response, dict):
            if response.get("error"):
                raise RuntimeError(response.get("message", "LLM call failed"))
            # OpenAI format
            choices = response.get("choices", [])
            if choices:
                # `content` may be present but None; keep the str return contract
                return choices[0].get("message", {}).get("content", "") or ""

        # Handle response object with attribute access (e.g. OpenAI SDK response)
        if hasattr(response, "choices") and response.choices:
            return response.choices[0].message.content or ""
        return ""

    @staticmethod
    def _extract_summary_fallback(messages: List[Dict], max_messages: int = 0) -> str:
        """
        Rule-based fallback when the LLM is unavailable.

        Pairs each user message (longer than 5 characters, truncated to 150)
        with the first line of the next assistant reply, producing one "- "
        bullet per pair; at most 10 bullets are returned.
        """
        msgs = messages if max_messages == 0 else messages[-max_messages * 2:]
        events: List[str] = []
        current_user_text = ""
        for msg in msgs:
            role = msg.get("role", "")
            text = MemoryFlushManager._extract_text_from_content(msg.get("content", ""))
            if not text or not text.strip():
                continue
            text = text.strip()
            if role == "user":
                if len(text) <= 5:
                    continue
                current_user_text = text[:150]
            elif role == "assistant" and current_user_text:
                first_line = text.split("\n")[0].strip()
                if len(first_line) > 10:
                    # NOTE(review): user text and reply are joined with no
                    # separator — possibly a lost character; confirm.
                    events.append(f"- {current_user_text}{first_line[:150]}")
                else:
                    events.append(f"- {current_user_text}")
                current_user_text = ""
        # A trailing user message without a reply still counts as an event
        if current_user_text:
            events.append(f"- {current_user_text}")
        return "\n".join(events[:10])

    @staticmethod
    def _extract_text_from_content(content) -> str:
        """
        Extract plain text from message content.

        A string is returned as-is. For a list, dict blocks with
        ``type == "text"`` and bare strings are joined with newlines; other
        blocks are ignored. Any other type yields "".
        """
        if isinstance(content, str):
            return content
        if isinstance(content, list):
            parts = []
            for block in content:
                if isinstance(block, dict) and block.get("type") == "text":
                    parts.append(block.get("text", ""))
                elif isinstance(block, str):
                    parts.append(block)
            return "\n".join(parts)
        return ""
def create_memory_files_if_needed(workspace_dir: Path, user_id: Optional[str] = None):
    """
    Create essential memory files if they don't exist.

    Ensures the ``memory/`` directory exists and creates an empty MEMORY.md
    when it is missing. Daily files are not created here.

    Args:
        workspace_dir: Workspace directory (``Path`` or plain string path)
        user_id: Optional user ID. When given, MEMORY.md lives in
            ``memory/users/<user_id>/``; otherwise in the workspace root.
    """
    workspace_dir = Path(workspace_dir)
    memory_dir = workspace_dir / "memory"
    memory_dir.mkdir(parents=True, exist_ok=True)

    if user_id:
        user_dir = memory_dir / "users" / user_id
        user_dir.mkdir(parents=True, exist_ok=True)
        main_memory = user_dir / "MEMORY.md"
    else:
        main_memory = workspace_dir / "MEMORY.md"

    # Exclusive create rather than "exists() then write_text('')": the latter
    # can truncate a MEMORY.md that another thread created and appended to
    # between the check and the write.
    try:
        with open(main_memory, "x", encoding="utf-8"):
            pass
    except FileExistsError:
        pass
def ensure_daily_memory_file(workspace_dir: Path, user_id: Optional[str] = None) -> Path:
    """
    Ensure today's daily memory file exists, creating it only when needed.

    Creates ``memory/`` (and ``memory/users/<user_id>/`` when ``user_id`` is
    given) and, if today's file is missing, writes a "# Daily Memory" header
    to it. An existing file is left untouched.

    Args:
        workspace_dir: Workspace directory (``Path`` or plain string path)
        user_id: Optional user ID for user-specific files

    Returns:
        Path to today's memory file (``YYYY-MM-DD.md``, local date)
    """
    memory_dir = Path(workspace_dir) / "memory"
    today = datetime.now().strftime("%Y-%m-%d")

    target_dir = memory_dir / "users" / user_id if user_id else memory_dir
    target_dir.mkdir(parents=True, exist_ok=True)
    today_memory = target_dir / f"{today}.md"

    # Exclusive create: a separate exists() check followed by write_text()
    # could truncate entries appended by a concurrent writer in between.
    # UTF-8 is explicit so the header matches the encoding used for appends.
    try:
        with open(today_memory, "x", encoding="utf-8") as f:
            f.write(f"# Daily Memory: {today}\n\n")
    except FileExistsError:
        pass
    return today_memory