feat(memory): add Deep Dream module for daily memory distillation

- Add Deep Dream: nightly distill daily memories → refined MEMORY.md + dream diary
- Simplify flush prompt to daily-only, defer MEMORY.md maintenance to Deep Dream
- Remove dead code (_append_to_main_memory) and fix fallback summary logic
- Add shrinkage protection and input dedup for dream process
- Ensure flush threads complete before dream starts
- Update docs (zh/en/ja) with dream diary and distillation mechanism
This commit is contained in:
zhayujie
2026-04-13 21:32:52 +08:00
parent 7bc88c17e4
commit ea1a0c8b3d
5 changed files with 320 additions and 113 deletions

View File

@@ -1,13 +1,12 @@
"""
Memory flush manager (with Light Dream)
Memory flush manager with Deep Dream distillation
Handles memory persistence when conversation context is trimmed or overflows:
- Uses LLM to summarize discarded messages into concise key-information entries
- Uses LLM to summarize discarded messages into concise daily records
- Writes to daily memory files (lazy creation)
- Light Dream: extracts long-term memories to MEMORY.md in the same LLM call
- Deduplicates trim flushes to avoid repeated writes
- Runs summarization asynchronously to avoid blocking normal replies
- Provides daily summary interface for scheduler
- Deep Dream: periodically distills daily memories → refined MEMORY.md + dream diary
"""
import threading
@@ -17,43 +16,76 @@ from datetime import datetime
from common.log import logger
SUMMARIZE_SYSTEM_PROMPT = """你是一个记忆提取助手。你的任务是从对话记录中提炼出两种记忆:
SUMMARIZE_SYSTEM_PROMPT = """你是一个对话记录助手。请将对话内容归纳为当天的日常记录。
## 第一部分:日常记录([DAILY]
## 要求
按「事件」维度归纳当天发生的事,不要按对话轮次逐条记录:
按「事件」维度归纳发生的事,不要按对话轮次逐条记录:
- 每条一行,用 "- " 开头
- 合并同一件事的多轮对话
- 只记录有意义的事件,忽略闲聊和问候
- 保留关键的决策、结论和待办事项
## 第二部分:长期记忆([MEMORY]
当对话没有任何记录价值(仅含问候或无意义内容),直接回复"""""
提取值得**永久记住**的关键信息,这些信息在未来的对话中仍然有价值
- 用户的偏好、习惯、风格
- 重要的决策或约定
- 关键人物关系
- 用户明确要求记住的内容
- 重要的教训或经验总结
SUMMARIZE_USER_PROMPT = """请归纳以下对话的日常记录
**如果没有值得永久记住的信息,[MEMORY] 部分留空即可。**
{conversation}"""
# ---------------------------------------------------------------------------
# Deep Dream prompts — distill daily memories → MEMORY.md + dream diary
# ---------------------------------------------------------------------------
DREAM_SYSTEM_PROMPT = """你是一个记忆整理助手,负责定期整理用户的长期记忆。
你将收到两份材料:
1. **当前长期记忆** — MEMORY.md 的全部现有内容
2. **今日日记** — 当天的日常记录
MEMORY.md 会注入每次对话的系统提示词中,因此必须保持精炼。
**重要:只能基于提供的材料进行整理,严禁编造、推测或添加材料中不存在的信息。**
## 任务
### Part 1: 更新后的长期记忆([MEMORY]
在现有记忆基础上进行整理和提炼,输出完整的更新后内容:
- **合并提炼**:将含义相近的多条合并为一条高密度表述,而非简单罗列
- **新增萃取**:从今日日记中提取值得永久记住的新信息(偏好、决策、人物、经验)
- **冲突更新**:当新信息与旧条目矛盾时,以新信息为准,替换旧条目
- **清理无效**:删除临时性记录、空白条目、格式残留等
- **删除冗余**:已被更精炼表述涵盖的旧条目应删除,避免信息重复
- 每条一行,用 "- " 开头,不带日期前缀
- 目标:控制在 30 条以内,每条尽量一句话概括
### Part 2: 梦境日记([DREAM]
用简洁的叙事风格写一篇短日记,记录这次整理的发现,保持格式美观易读:
- 发现了哪些重复或矛盾
- 从日记中提取了什么新洞察
- 做了哪些清理和优化
- 整体感受和观察
## 输出格式(严格遵守)
```
[DAILY]
- 事件1的摘要
- 事件2的摘要
[MEMORY]
- 值得永久记住的信息1
- 值得永久记住的信息2
```
- 记忆条目1
- 记忆条目2
...
当对话没有任何记录价值(仅含问候或无意义内容),直接回复"""""
[DREAM]
梦境日记内容...
```"""
SUMMARIZE_USER_PROMPT = """请从以下对话记录中提取记忆(按 [DAILY] 和 [MEMORY] 两部分输出):
DREAM_USER_PROMPT = """## 当前长期记忆MEMORY.md
{conversation}"""
{memory_content}
## 近期日记(最近 {days} 天)
{daily_content}"""
@@ -82,6 +114,8 @@ class MemoryFlushManager:
self.last_flush_timestamp: Optional[datetime] = None
self._trim_flushed_hashes: set = set() # Content hashes of already-flushed messages
self._last_flushed_content_hash: str = "" # Content hash at last flush, for daily dedup
self._last_dream_input_hash: str = "" # Hash of dream input, for dedup
self._last_flush_thread: Optional[threading.Thread] = None
def get_today_memory_file(self, user_id: Optional[str] = None, ensure_exists: bool = False) -> Path:
"""Get today's memory file path: memory/YYYY-MM-DD.md"""
@@ -162,6 +196,7 @@ class MemoryFlushManager:
)
thread.start()
logger.info(f"[MemoryFlush] Async flush dispatched (reason={reason}, msgs={len(snapshot)})")
self._last_flush_thread = thread
return True
except Exception as e:
@@ -176,45 +211,35 @@ class MemoryFlushManager:
max_messages: int,
context_summary_callback: Optional[Callable[[str], None]] = None,
):
"""Background worker: summarize with LLM, write daily file + MEMORY.md (Light Dream)."""
"""Background worker: summarize with LLM, write daily memory file."""
try:
raw_summary = self._summarize_messages(messages, max_messages)
if not raw_summary or not raw_summary.strip() or raw_summary.strip() == "":
logger.info(f"[MemoryFlush] No valuable content to flush (reason={reason})")
return
daily_part, memory_part = self._parse_dual_output(raw_summary)
# Strip legacy [DAILY]/[MEMORY] markers if model still outputs them
daily_part = self._clean_summary_output(raw_summary)
if not daily_part:
return
# --- Write daily memory ---
if daily_part:
daily_file = ensure_daily_memory_file(self.workspace_dir, user_id)
daily_file = ensure_daily_memory_file(self.workspace_dir, user_id)
if reason == "overflow":
header = f"## Context Overflow Recovery ({datetime.now().strftime('%H:%M')})"
note = "The following conversation was trimmed due to context overflow:\n"
elif reason == "trim":
header = f"## Trimmed Context ({datetime.now().strftime('%H:%M')})"
note = ""
elif reason == "daily_summary":
header = f"## Daily Summary ({datetime.now().strftime('%H:%M')})"
note = ""
else:
header = f"## Session Notes ({datetime.now().strftime('%H:%M')})"
note = ""
headers = {
"overflow": f"## Context Overflow Recovery ({datetime.now().strftime('%H:%M')})",
"trim": f"## Trimmed Context ({datetime.now().strftime('%H:%M')})",
"daily_summary": f"## Daily Summary ({datetime.now().strftime('%H:%M')})",
}
header = headers.get(reason, f"## Session Notes ({datetime.now().strftime('%H:%M')})")
flush_entry = f"\n{header}\n\n{note}{daily_part}\n"
with open(daily_file, "a", encoding="utf-8") as f:
f.write(f"\n{header}\n\n{daily_part}\n")
with open(daily_file, "a", encoding="utf-8") as f:
f.write(flush_entry)
logger.info(f"[MemoryFlush] Wrote daily memory to {daily_file.name} (reason={reason}, chars={len(daily_part)})")
# --- Light Dream: write long-term memory to MEMORY.md ---
if memory_part:
self._append_to_main_memory(memory_part, user_id)
logger.info(f"[MemoryFlush] Wrote daily memory to {daily_file.name} (reason={reason}, chars={len(daily_part)})")
# --- Inject context summary into live messages (if callback provided) ---
if context_summary_callback and daily_part:
if context_summary_callback:
try:
context_summary_callback(daily_part)
except Exception as e:
@@ -226,67 +251,26 @@ class MemoryFlushManager:
logger.warning(f"[MemoryFlush] Async flush failed (reason={reason}): {e}")
@staticmethod
def _parse_dual_output(raw: str) -> tuple:
"""
Parse LLM output into (daily_part, memory_part).
Handles both new [DAILY]/[MEMORY] format and legacy single-section format.
"""
def _clean_summary_output(raw: str) -> str:
"""Strip legacy [DAILY]/[MEMORY] markers if present, return clean daily text."""
raw = raw.strip()
if not raw or raw == "":
return ""
if "[DAILY]" in raw or "[MEMORY]" in raw:
daily_part = ""
memory_part = ""
# Strip [DAILY] marker
if "[DAILY]" in raw:
start = raw.index("[DAILY]") + len("[DAILY]")
end = raw.index("[MEMORY]") if "[MEMORY]" in raw else len(raw)
raw = raw[start:end].strip()
# Extract [DAILY] section
if "[DAILY]" in raw:
start = raw.index("[DAILY]") + len("[DAILY]")
end = raw.index("[MEMORY]") if "[MEMORY]" in raw else len(raw)
daily_part = raw[start:end].strip()
# Remove stray [MEMORY] section entirely
if "[MEMORY]" in raw:
raw = raw[:raw.index("[MEMORY]")].strip()
# Extract [MEMORY] section
if "[MEMORY]" in raw:
start = raw.index("[MEMORY]") + len("[MEMORY]")
memory_part = raw[start:].strip()
# Remove markdown code fences
raw = raw.replace("```", "").strip()
# Filter out empty markers
if memory_part and all(
not line.strip() or line.strip() == "-"
for line in memory_part.split("\n")
):
memory_part = ""
return daily_part, memory_part
# Legacy format: treat entire output as daily, no memory extraction
return raw, ""
def _append_to_main_memory(self, memory_entries: str, user_id: Optional[str] = None):
"""Append extracted long-term memories to MEMORY.md with date stamp."""
try:
main_file = self.get_main_memory_file(user_id)
today = datetime.now().strftime("%Y-%m-%d")
# Add date prefix to each entry line
stamped_lines = []
for line in memory_entries.strip().split("\n"):
line = line.strip()
if line.startswith("- "):
stamped_lines.append(f"- ({today}) {line[2:]}")
elif line:
stamped_lines.append(f"- ({today}) {line}")
if not stamped_lines:
return
stamped_text = "\n".join(stamped_lines)
with open(main_file, "a", encoding="utf-8") as f:
f.write(f"\n{stamped_text}\n")
logger.info(f"[LightDream] Appended {len(stamped_lines)} entries to MEMORY.md")
except Exception as e:
logger.warning(f"[LightDream] Failed to append to MEMORY.md: {e}")
return raw
def create_daily_summary(
self,
@@ -313,12 +297,195 @@ class MemoryFlushManager:
reason="daily_summary",
max_messages=0,
)
# ---- Deep Dream (memory distillation) ----
def deep_dream(self, user_id: Optional[str] = None, lookback_days: int = 1) -> bool:
"""
Distill recent daily memories into MEMORY.md and generate a dream diary.
Process:
1. Read current MEMORY.md + recent N days of daily files
2. LLM produces updated MEMORY.md (deduped/pruned) + dream diary narrative
3. Overwrite MEMORY.md with the distilled version
4. Write dream diary to memory/dreams/YYYY-MM-DD.md
"""
if not self.llm_model:
logger.warning("[DeepDream] No LLM model available, skipping")
return False
logger.info(f"[DeepDream] Starting memory distillation (lookback={lookback_days} days)")
# Collect materials
memory_content = self._read_main_memory(user_id)
daily_content, has_content = self._read_recent_dailies(user_id, lookback_days)
if not has_content:
logger.info("[DeepDream] No recent daily records, skipping to preserve existing MEMORY.md")
return False
# Dedup: skip if input materials haven't changed since last dream
import hashlib
input_hash = hashlib.md5((memory_content + daily_content).encode("utf-8")).hexdigest()
if input_hash == self._last_dream_input_hash:
logger.debug("[DeepDream] Input unchanged since last dream, skipping")
return False
self._last_dream_input_hash = input_hash
logger.info(
f"[DeepDream] Materials collected: "
f"MEMORY.md={len(memory_content)} chars, "
f"daily={len(daily_content)} chars"
)
# Call LLM for distillation
import time as _time
t0 = _time.monotonic()
try:
user_msg = DREAM_USER_PROMPT.format(
memory_content=memory_content or "(empty)",
days=lookback_days,
daily_content=daily_content or "(no recent daily records)",
)
from agent.protocol.models import LLMRequest
# Scale max_tokens based on input size to avoid truncating large MEMORY.md
input_chars = len(memory_content) + len(daily_content)
dream_max_tokens = max(2000, min(input_chars, 8000))
request = LLMRequest(
messages=[{"role": "user", "content": user_msg}],
temperature=0.3,
max_tokens=dream_max_tokens,
stream=False,
system=DREAM_SYSTEM_PROMPT,
)
response = self.llm_model.call(request)
raw = self._extract_response_text(response)
elapsed = _time.monotonic() - t0
if not raw or not raw.strip():
logger.warning(f"[DeepDream] LLM returned empty response ({elapsed:.1f}s)")
return False
logger.info(f"[DeepDream] LLM distillation completed ({elapsed:.1f}s, {len(raw)} chars)")
except Exception as e:
elapsed = _time.monotonic() - t0
logger.warning(f"[DeepDream] LLM call failed ({elapsed:.1f}s): {e}")
return False
# Parse [MEMORY] and [DREAM] sections
new_memory, dream_diary = self._parse_dream_output(raw)
if not new_memory:
logger.warning("[DeepDream] No [MEMORY] section in LLM output, skipping overwrite")
return False
# Overwrite MEMORY.md (with shrinkage protection)
try:
main_file = self.get_main_memory_file(user_id)
old_size = len(memory_content)
if old_size > 200 and len(new_memory) < old_size * 0.3:
logger.warning(
f"[DeepDream] Distilled MEMORY.md is too small "
f"({old_size}{len(new_memory)} chars, <30%), aborting to prevent data loss"
)
return False
main_file.write_text(new_memory + "\n", encoding="utf-8")
logger.info(
f"[DeepDream] Updated MEMORY.md "
f"({old_size}{len(new_memory)} chars)"
)
except Exception as e:
logger.warning(f"[DeepDream] Failed to write MEMORY.md: {e}")
return False
# Write dream diary
if dream_diary:
try:
self._write_dream_diary(dream_diary, user_id)
except Exception as e:
logger.warning(f"[DeepDream] Failed to write dream diary: {e}")
logger.info("[DeepDream] ✅ Deep Dream completed successfully")
return True
def _read_main_memory(self, user_id: Optional[str] = None) -> str:
"""Read current MEMORY.md content."""
main_file = self.get_main_memory_file(user_id)
if main_file.exists():
return main_file.read_text(encoding="utf-8").strip()
return ""
def _read_recent_dailies(
self, user_id: Optional[str] = None, lookback_days: int = 1
) -> tuple:
"""
Read recent daily memory files.
Returns:
(combined_text, has_content) tuple
"""
from datetime import timedelta
parts = []
has_content = False
today = datetime.now().date()
for offset in range(lookback_days):
day = today - timedelta(days=offset)
date_str = day.strftime("%Y-%m-%d")
if user_id:
daily_file = self.memory_dir / "users" / user_id / f"{date_str}.md"
else:
daily_file = self.memory_dir / f"{date_str}.md"
if daily_file.exists():
content = daily_file.read_text(encoding="utf-8").strip()
if content:
parts.append(f"### {date_str}\n\n{content}")
has_content = True
else:
parts.append(f"### {date_str}\n\n(no records)")
return "\n\n".join(parts), has_content
@staticmethod
def _parse_dream_output(raw: str) -> tuple:
"""Parse LLM output into (new_memory, dream_diary)."""
raw = raw.strip().replace("```", "")
new_memory = ""
dream_diary = ""
if "[MEMORY]" in raw:
start = raw.index("[MEMORY]") + len("[MEMORY]")
end = raw.index("[DREAM]") if "[DREAM]" in raw else len(raw)
new_memory = raw[start:end].strip()
if "[DREAM]" in raw:
start = raw.index("[DREAM]") + len("[DREAM]")
dream_diary = raw[start:].strip()
return new_memory, dream_diary
def _write_dream_diary(self, content: str, user_id: Optional[str] = None):
"""Write dream diary to memory/dreams/YYYY-MM-DD.md."""
dreams_dir = self.memory_dir / "dreams"
if user_id:
dreams_dir = self.memory_dir / "users" / user_id / "dreams"
dreams_dir.mkdir(parents=True, exist_ok=True)
today = datetime.now().strftime("%Y-%m-%d")
diary_file = dreams_dir / f"{today}.md"
diary_file.write_text(
f"# Dream Diary: {today}\n\n{content}\n",
encoding="utf-8",
)
logger.info(f"[DeepDream] Wrote dream diary to {diary_file}")
# ---- Internal helpers ----
def _summarize_messages(self, messages: List[Dict], max_messages: int = 0) -> str:
"""
Summarize conversation messages using LLM, with rule-based fallback.
Summarize conversation messages using LLM.
Returns empty string if LLM deems content not worth recording.
Rule-based fallback only used when LLM call raises an exception.
"""
conversation_text = self._format_conversation_for_summary(messages, max_messages)
if not conversation_text.strip():
@@ -329,13 +496,14 @@ class MemoryFlushManager:
summary = self._call_llm_for_summary(conversation_text)
if summary and summary.strip() and summary.strip() != "":
return summary.strip()
logger.info(f"[MemoryFlush] LLM returned empty or '', using fallback")
logger.info("[MemoryFlush] LLM returned empty or '', skipping write")
return ""
except Exception as e:
logger.warning(f"[MemoryFlush] LLM summarization failed, using fallback: {e}")
return self._extract_summary_fallback(messages, max_messages)
else:
logger.info("[MemoryFlush] No LLM model available, using rule-based fallback")
return self._extract_summary_fallback(messages, max_messages)
return self._extract_summary_fallback(messages, max_messages)
def _format_conversation_for_summary(self, messages: List[Dict], max_messages: int = 0) -> str:
"""Format messages into readable conversation text for LLM summarization."""