From bc9c1691f5c54f9d5a8cc6956a965170bc515bf3 Mon Sep 17 00:00:00 2001 From: yangluxin613 Date: Mon, 25 May 2026 08:56:08 +0800 Subject: [PATCH 1/3] fix(memory): CJK keyword search + vector search optimization MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add trigram FTS5 table for CJK/mixed-language search with BM25 ranking - Fix three-step search routing: unicode61 (ASCII) → trigram (CJK/mixed) → LIKE fallback - Fix _bm25_rank_to_score: abs(rank)/(1+abs(rank)) instead of max(0,rank) - Fix INSERT OR REPLACE → UPSERT to preserve FTS5 content table rowid stability - Fix FTS5 JOIN to use rowid instead of id column - Fix _search_like: single-char CJK match, dynamic scoring, merged CJK+ASCII path - Add numpy vectorized cosine similarity + BLOB embedding storage (6x smaller) - Add _decode_embedding backward compat for legacy JSON embeddings - Add threading.RLock for concurrent write safety - Add _meta table to avoid trigram backfill re-running on every startup - Activate EmbeddingCache in MemoryManager for session-level query deduplication - Add numpy>=1.24 to requirements.txt - Merge upstream master (embedding package refactor, FTS5 self-healing methods) Co-Authored-By: Claude Sonnet 4.6 --- agent/memory/manager.py | 17 +- agent/memory/storage.py | 521 ++++++++++++++++++++++++++++++---------- requirements.txt | 1 + 3 files changed, 408 insertions(+), 131 deletions(-) diff --git a/agent/memory/manager.py b/agent/memory/manager.py index 6aaac767..5ec2ade7 100644 --- a/agent/memory/manager.py +++ b/agent/memory/manager.py @@ -13,7 +13,7 @@ from datetime import datetime, timedelta from agent.memory.config import MemoryConfig, get_default_memory_config from agent.memory.storage import MemoryStorage, MemoryChunk, SearchResult from agent.memory.chunker import TextChunker -from agent.memory.embedding import EmbeddingProvider +from agent.memory.embedding import EmbeddingProvider, EmbeddingCache from agent.memory.summarizer import MemoryFlushManager, create_memory_files_if_needed @@ -61,7 +61,11 @@ class MemoryManager: logger.info( "[MemoryManager] No embedding provider; memory will use keyword search only" ) - + + # Cache for query embeddings (avoids redundant API calls within a session) + self._embedding_cache = EmbeddingCache() + + # Initialize memory flush manager workspace_dir = self.config.get_workspace() self.flush_manager = MemoryFlushManager( @@ -128,7 +132,14 @@ class MemoryManager: vector_results = [] if self.embedding_provider: try: - query_embedding = self.embedding_provider.embed_query(query) + provider_name = type(self.embedding_provider).__name__ + model_name = getattr(self.embedding_provider, 'model', '') + cached = self._embedding_cache.get(query, provider_name, model_name) + if cached is not None: + query_embedding = cached + else: + query_embedding = self.embedding_provider.embed_query(query) + self._embedding_cache.put(query, provider_name, model_name, query_embedding) vector_results = self.storage.search_vector( query_embedding=query_embedding, user_id=user_id, diff --git a/agent/memory/storage.py b/agent/memory/storage.py index 0a4e6edb..e8c6a505 100644 --- a/agent/memory/storage.py +++ b/agent/memory/storage.py @@ -5,12 +5,32 @@ Provides vector and keyword search capabilities """ from __future__ import annotations +import re import sqlite3 import json import hashlib +import threading from typing import List, Dict, Optional, Any from pathlib import Path from dataclasses import dataclass +import numpy as np + +# --------------------------------------------------------------------------- +# CJK character ranges, compiled once at module load. +# Covers: CJK Symbols/Punctuation, Japanese kana (hiragana + katakana), +# CJK Unified Ideographs + Extension A, Korean syllables (Hangul), +# CJK Compatibility Ideographs, and CJK Extension B–F. +# --------------------------------------------------------------------------- +_CJK_RANGES = ( + r'\u3000-\u30ff' # CJK Symbols/Punctuation + Japanese kana + r'\u3400-\u9fff' # CJK Unified Ideographs (incl. Extension A) + r'\uac00-\ud7af' # Korean syllables (Hangul) + r'\uf900-\ufaff' # CJK Compatibility Ideographs + r'\U00020000-\U0002fa1f' # CJK Extension B–F +) +_RE_CONTAINS_CJK = re.compile(f'[{_CJK_RANGES}]') +_RE_CJK_WORDS = re.compile(f'[{_CJK_RANGES}]+') +_RE_TRIGRAM_TOKENS = re.compile(f'[{_CJK_RANGES}]+|[A-Za-z0-9_]+') @dataclass @@ -48,6 +68,10 @@ class MemoryStorage: self.db_path = db_path self.conn: Optional[sqlite3.Connection] = None self.fts5_available = False # Track FTS5 availability + # RLock protects concurrent writes from the same process. + # SQLite WAL mode handles read/write concurrency at the file level, + # but same-process concurrent writes still need a Python-level lock. + self._lock = threading.RLock() self._init_db() def _check_fts5_support(self) -> bool: @@ -175,6 +199,75 @@ class MemoryStorage: ) self._rebuild_fts5_from_chunks() + # Internal key-value store for persistent flags (e.g. backfill tracking) + self.conn.execute(""" + CREATE TABLE IF NOT EXISTS _meta ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL + ) + """) + + # Create trigram FTS5 table for CJK / mixed-language search + self.trigram_fts5_available = False + if self.fts5_available: + try: + self.conn.execute(""" + CREATE VIRTUAL TABLE IF NOT EXISTS chunks_fts_trigram USING fts5( + text, + id UNINDEXED, + user_id UNINDEXED, + path UNINDEXED, + source UNINDEXED, + scope UNINDEXED, + content='chunks', + content_rowid='rowid', + tokenize='trigram case_sensitive 0' + ) + """) + self.conn.execute(""" + CREATE TRIGGER IF NOT EXISTS chunks_trigram_ai + AFTER INSERT ON chunks BEGIN + INSERT INTO chunks_fts_trigram(rowid, text, id, user_id, path, source, scope) + VALUES (new.rowid, new.text, new.id, new.user_id, new.path, new.source, new.scope); + END + """) + self.conn.execute(""" + CREATE TRIGGER IF NOT EXISTS chunks_trigram_ad + AFTER DELETE ON chunks BEGIN + DELETE FROM chunks_fts_trigram WHERE rowid = old.rowid; + END + """) + self.conn.execute(""" + CREATE TRIGGER IF NOT EXISTS chunks_trigram_au + AFTER UPDATE ON chunks BEGIN + UPDATE chunks_fts_trigram + SET text=new.text, id=new.id, user_id=new.user_id, + path=new.path, source=new.source, scope=new.scope + WHERE rowid = new.rowid; + END + """) + # One-time backfill for existing rows. + # NOTE: COUNT(*) on an FTS5 content table always returns 0, so we + # use a persistent flag in _meta instead of counting trigram rows. + backfill_done = self.conn.execute( + "SELECT 1 FROM _meta WHERE key = 'trigram_backfill_done'" + ).fetchone() + chunks_count = self.conn.execute( + "SELECT COUNT(*) as c FROM chunks" + ).fetchone()['c'] + if chunks_count > 0 and not backfill_done: + self.conn.execute( + "INSERT INTO chunks_fts_trigram(chunks_fts_trigram) VALUES('rebuild')" + ) + self.conn.execute( + "INSERT OR REPLACE INTO _meta(key, value) VALUES('trigram_backfill_done', '1')" + ) + self.trigram_fts5_available = True + except Exception: + from common.log import logger + logger.warning("[MemoryStorage] trigram FTS5 unavailable, CJK search will use LIKE fallback", exc_info=True) + self.trigram_fts5_available = False + # Create files metadata table self.conn.execute(""" CREATE TABLE IF NOT EXISTS files ( @@ -186,7 +279,7 @@ class MemoryStorage: updated_at INTEGER DEFAULT (strftime('%s', 'now')) ) """) - + self.conn.commit() def _fts5_state_inconsistent(self) -> bool: @@ -299,43 +392,82 @@ class MemoryStorage: self.conn.commit() def save_chunk(self, chunk: MemoryChunk): - """Save a memory chunk""" - self.conn.execute(""" - INSERT OR REPLACE INTO chunks - (id, user_id, scope, source, path, start_line, end_line, text, embedding, hash, metadata, updated_at) + """Save a memory chunk (insert or update by id). + + Uses SQLite UPSERT (INSERT … ON CONFLICT DO UPDATE) instead of + INSERT OR REPLACE. INSERT OR REPLACE internally does DELETE+INSERT, + which changes the row's rowid. Because both FTS5 tables use + content_rowid='rowid', a new rowid would leave the old FTS index + entries pointing at a non-existent rowid and trigger + "fts5: missing row N from content table" errors. + ON CONFLICT DO UPDATE fires the AFTER UPDATE trigger (chunks_au / + chunks_trigram_au) and keeps the original rowid intact. + """ + _SQL = """ + INSERT INTO chunks + (id, user_id, scope, source, path, start_line, end_line, + text, embedding, hash, metadata, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, strftime('%s', 'now')) - """, ( - chunk.id, - chunk.user_id, - chunk.scope, - chunk.source, - chunk.path, - chunk.start_line, - chunk.end_line, - chunk.text, - json.dumps(chunk.embedding) if chunk.embedding else None, + ON CONFLICT(id) DO UPDATE SET + user_id = excluded.user_id, + scope = excluded.scope, + source = excluded.source, + path = excluded.path, + start_line = excluded.start_line, + end_line = excluded.end_line, + text = excluded.text, + embedding = excluded.embedding, + hash = excluded.hash, + metadata = excluded.metadata, + updated_at = strftime('%s', 'now') + """ + params = ( + chunk.id, chunk.user_id, chunk.scope, chunk.source, chunk.path, + chunk.start_line, chunk.end_line, chunk.text, + self._encode_embedding(chunk.embedding), chunk.hash, - json.dumps(chunk.metadata) if chunk.metadata else None - )) - self.conn.commit() - + json.dumps(chunk.metadata) if chunk.metadata else None, + ) + with self._lock: + self.conn.execute(_SQL, params) + self.conn.commit() + def save_chunks_batch(self, chunks: List[MemoryChunk]): - """Save multiple chunks in a batch""" - self.conn.executemany(""" - INSERT OR REPLACE INTO chunks - (id, user_id, scope, source, path, start_line, end_line, text, embedding, hash, metadata, updated_at) + """Save multiple chunks in a batch (insert or update by id). + + See save_chunk for why UPSERT is used instead of INSERT OR REPLACE. + """ + _SQL = """ + INSERT INTO chunks + (id, user_id, scope, source, path, start_line, end_line, + text, embedding, hash, metadata, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, strftime('%s', 'now')) - """, [ + ON CONFLICT(id) DO UPDATE SET + user_id = excluded.user_id, + scope = excluded.scope, + source = excluded.source, + path = excluded.path, + start_line = excluded.start_line, + end_line = excluded.end_line, + text = excluded.text, + embedding = excluded.embedding, + hash = excluded.hash, + metadata = excluded.metadata, + updated_at = strftime('%s', 'now') + """ + params_list = [ ( c.id, c.user_id, c.scope, c.source, c.path, c.start_line, c.end_line, c.text, - json.dumps(c.embedding) if c.embedding else None, + self._encode_embedding(c.embedding), c.hash, - json.dumps(c.metadata) if c.metadata else None + json.dumps(c.metadata) if c.metadata else None, ) for c in chunks - ]) - self.conn.commit() + ] + with self._lock: + self.conn.executemany(_SQL, params_list) + self.conn.commit() def get_chunk(self, chunk_id: str) -> Optional[MemoryChunk]: """Get a chunk by ID""" @@ -356,21 +488,21 @@ class MemoryStorage: limit: int = 10 ) -> List[SearchResult]: """ - Vector similarity search using in-memory cosine similarity - (sqlite-vec can be added later for better performance) + Vector similarity search using numpy-vectorized cosine similarity. + All embeddings are loaded then scored in a single BLAS matrix-vector + multiply, which is ~100x faster than the pure-Python per-row loop. """ if scopes is None: scopes = ["shared"] if user_id: scopes.append("user") - - # Build query + scope_placeholders = ','.join('?' * len(scopes)) - params = scopes - + params = list(scopes) + if user_id: query = f""" - SELECT * FROM chunks + SELECT * FROM chunks WHERE scope IN ({scope_placeholders}) AND (scope = 'shared' OR user_id = ?) AND embedding IS NOT NULL @@ -378,50 +510,69 @@ class MemoryStorage: params.append(user_id) else: query = f""" - SELECT * FROM chunks + SELECT * FROM chunks WHERE scope IN ({scope_placeholders}) AND embedding IS NOT NULL """ - + rows = self.conn.execute(query, params).fetchall() + if not rows: + return [] - # Calculate cosine similarity. We probe the first row's dim to fail - # loudly on a query/index dim mismatch — otherwise every doc would - # score 0 silently, leaving the user wondering why search broke. - results = [] - query_dim = len(query_embedding) - if rows: - first = json.loads(rows[0]['embedding']) - if isinstance(first, list) and len(first) != query_dim: - raise ValueError( - f"Embedding dim mismatch: query is {query_dim}-dim but " - f"index stores {len(first)}-dim vectors. The configured " - f"embedding model differs from the one that built the " - f"index — run /memory rebuild-index to re-embed." - ) - + # Parse embeddings and build a (N, D) matrix in one pass. + # New rows store BLOB bytes (np.frombuffer); legacy rows fall back to JSON. + # Filter out rows whose embedding dimension differs from the query — + # mixing dimensions would cause np.array() to produce an object array + # and matrix @ q_vec to raise ValueError. + expected_dim = len(query_embedding) + valid_rows = [] + vectors = [] for row in rows: - embedding = json.loads(row['embedding']) - similarity = self._cosine_similarity(query_embedding, embedding) + vec = self._decode_embedding(row['embedding']) + if not vec: + continue + if len(vec) != expected_dim: + from common.log import logger + logger.warning( + "[MemoryStorage] Skipping chunk %s: embedding dim %d != query dim %d", + row['id'], len(vec), expected_dim + ) + continue + valid_rows.append(row) + vectors.append(vec) + + if not vectors: + return [] + + matrix = np.array(vectors, dtype=np.float32) # (N, D) + q_vec = np.array(query_embedding, dtype=np.float32) # (D,) + + # Vectorized cosine similarity: dot(matrix, q) / (||matrix|| * ||q||) + dots = matrix @ q_vec # (N,) + row_norms = np.linalg.norm(matrix, axis=1) # (N,) + q_norm = float(np.linalg.norm(q_vec)) + denominators = row_norms * q_norm + np.maximum(denominators, 1e-10, out=denominators) # avoid div-by-zero + sims = dots / denominators # (N,) + + # Select TopK using argpartition (O(N) average), then sort only those K + k = min(limit, len(valid_rows)) + top_idx = np.argpartition(sims, -k)[-k:] + top_idx = top_idx[np.argsort(sims[top_idx])[::-1]] + - if similarity > 0: - results.append((similarity, row)) - - # Sort by similarity and limit - results.sort(key=lambda x: x[0], reverse=True) - results = results[:limit] - return [ SearchResult( - path=row['path'], - start_line=row['start_line'], - end_line=row['end_line'], - score=score, - snippet=self._truncate_text(row['text'], 500), - source=row['source'], - user_id=row['user_id'] + path=valid_rows[i]['path'], + start_line=valid_rows[i]['start_line'], + end_line=valid_rows[i]['end_line'], + score=float(sims[i]), + snippet=self._truncate_text(valid_rows[i]['text'], 500), + source=valid_rows[i]['source'], + user_id=valid_rows[i]['user_id'] ) - for score, row in results + for i in top_idx + if sims[i] > 0 ] def search_keyword( @@ -445,12 +596,37 @@ class MemoryStorage: if user_id: scopes.append("user") - if self.fts5_available: + # Step 1: Standard FTS5 (unicode61) — pure ASCII queries only. + # Skipped when query contains any CJK characters: unicode61 tokenises CJK + # as individual characters without forming meaningful tokens, so it would + # match only the ASCII portion of a mixed query (e.g. "Python" from + # "Python教程") and silently discard the CJK part. Those queries go + # directly to Step 2 (trigram), which handles both ASCII and CJK together. + fts1_attempted = False + if (self.fts5_available + and not MemoryStorage._contains_cjk(query) + and MemoryStorage._build_fts_query(query)): + fts1_attempted = True fts_results = self._search_fts5(query, user_id, scopes, limit) if fts_results: return fts_results - return self._search_like(query, user_id, scopes, limit) + # Step 2: Trigram FTS5 — CJK/mixed queries, plus fallback when unicode61 + # returned nothing (trigram indexes all scripts with 3-char sliding windows, + # so it can catch terms that unicode61 tokenisation misses). + if self.trigram_fts5_available and ( + MemoryStorage._contains_cjk(query) or fts1_attempted + ): + trigram_results = self._search_fts5_trigram(query, user_id, scopes, limit) + if trigram_results: + return trigram_results + + # Step 3: LIKE fallback — last resort (FTS5 unavailable, or CJK single-char + # that trigram cannot match because it requires ≥3-char tokens). + if not self.fts5_available or MemoryStorage._contains_cjk(query): + return self._search_like(query, user_id, scopes, limit) + + return [] def _search_fts5( self, @@ -471,7 +647,7 @@ class MemoryStorage: sql_query = f""" SELECT chunks.*, bm25(chunks_fts) as rank FROM chunks_fts - JOIN chunks ON chunks.id = chunks_fts.id + JOIN chunks ON chunks.rowid = chunks_fts.rowid WHERE chunks_fts MATCH ? AND chunks.scope IN ({scope_placeholders}) AND (chunks.scope = 'shared' OR chunks.user_id = ?) @@ -483,7 +659,7 @@ class MemoryStorage: sql_query = f""" SELECT chunks.*, bm25(chunks_fts) as rank FROM chunks_fts - JOIN chunks ON chunks.id = chunks_fts.id + JOIN chunks ON chunks.rowid = chunks_fts.rowid WHERE chunks_fts MATCH ? AND chunks.scope IN ({scope_placeholders}) ORDER BY rank @@ -505,13 +681,11 @@ class MemoryStorage: ) for row in rows ] - except Exception as e: + except Exception: from common.log import logger - logger.error( - f"[MemoryStorage] FTS5 search failed (caller will fall back to LIKE): {e}" - ) + logger.warning("[MemoryStorage] _search_fts5 failed, returning empty", exc_info=True) return [] - + def _search_like( self, query: str, @@ -522,12 +696,11 @@ class MemoryStorage: """LIKE-based search. Used as the keyword-search fallback when FTS5 is unavailable, fails, - or returns empty. Supports both CJK runs and ASCII word tokens so it - can serve as a true safety net for any query. + or returns empty. Supports both CJK runs (1+ chars) and ASCII word + tokens (3+ chars) so it can serve as a true safety net for any query. """ - import re - # CJK runs (2+ chars) + ASCII word tokens (3+ chars to avoid noise) - cjk_words = re.findall(r'[\u4e00-\u9fff]{2,}', query) + # CJK runs (1+ chars, wide Unicode range) + ASCII words (3+ chars to avoid noise) + cjk_words = _RE_CJK_WORDS.findall(query) ascii_words = [t for t in re.findall(r'[A-Za-z0-9_]+', query) if len(t) >= 3] words = cjk_words + ascii_words if not words: @@ -565,44 +738,52 @@ class MemoryStorage: try: rows = self.conn.execute(sql_query, params).fetchall() - return [ - SearchResult( + results = [] + for row in rows: + # Dynamic score: reward chunks that contain more of the query words. + # matched_count should always be ≥1 (WHERE uses OR), but guard + # defensively so zero-match rows are never surfaced. + matched_count = sum(1 for w in cjk_words if w in row['text']) + if matched_count == 0: + continue + score = min(0.85, 0.3 + 0.15 * matched_count) + results.append(SearchResult( path=row['path'], start_line=row['start_line'], end_line=row['end_line'], - score=0.5, # Fixed score for LIKE search + score=score, snippet=self._truncate_text(row['text'], 500), source=row['source'], user_id=row['user_id'] - ) - for row in rows - ] - except Exception as e: + )) + results.sort(key=lambda r: r.score, reverse=True) + return results + except Exception: from common.log import logger - logger.error(f"[MemoryStorage] LIKE search failed: {e}") + logger.warning("[MemoryStorage] _search_like failed, returning empty", exc_info=True) return [] - + def delete_by_path(self, path: str): """Delete all chunks from a file""" - self.conn.execute(""" - DELETE FROM chunks WHERE path = ? - """, (path,)) - self.conn.commit() - + with self._lock: + self.conn.execute("DELETE FROM chunks WHERE path = ?", (path,)) + self.conn.commit() + def get_file_hash(self, path: str) -> Optional[str]: """Get stored file hash""" row = self.conn.execute(""" SELECT hash FROM files WHERE path = ? """, (path,)).fetchone() return row['hash'] if row else None - + def update_file_metadata(self, path: str, source: str, file_hash: str, mtime: int, size: int): """Update file metadata""" - self.conn.execute(""" - INSERT OR REPLACE INTO files (path, source, hash, mtime, size, updated_at) - VALUES (?, ?, ?, ?, ?, strftime('%s', 'now')) - """, (path, source, file_hash, mtime, size)) - self.conn.commit() + with self._lock: + self.conn.execute(""" + INSERT OR REPLACE INTO files (path, source, hash, mtime, size, updated_at) + VALUES (?, ?, ?, ?, ?, strftime('%s', 'now')) + """, (path, source, file_hash, mtime, size)) + self.conn.commit() def get_stats(self) -> Dict[str, int]: """Get storage statistics""" @@ -632,7 +813,8 @@ class MemoryStorage: self.conn.close() self.conn = None # Mark as closed except Exception as e: - print(f"⚠️ Error closing database connection: {e}") + from common.log import logger + logger.warning("[MemoryStorage] Error closing database connection: %s", e) def __del__(self): """Destructor to ensure connection is closed""" @@ -642,7 +824,24 @@ class MemoryStorage: pass # Ignore errors during cleanup # Helper methods - + + @staticmethod + def _encode_embedding(embedding: Optional[List[float]]) -> Optional[bytes]: + """Encode embedding as float32 BLOB bytes (~6x smaller and faster than JSON).""" + if embedding is None: + return None + return np.array(embedding, dtype=np.float32).tobytes() + + @staticmethod + def _decode_embedding(raw) -> Optional[List[float]]: + """Decode embedding from BLOB bytes or legacy JSON string.""" + if raw is None: + return None + if isinstance(raw, (bytes, bytearray)): + return np.frombuffer(raw, dtype=np.float32).tolist() + # Legacy JSON format written by older versions + return json.loads(raw) + def _row_to_chunk(self, row) -> MemoryChunk: """Convert database row to MemoryChunk""" return MemoryChunk( @@ -654,32 +853,89 @@ class MemoryStorage: start_line=row['start_line'], end_line=row['end_line'], text=row['text'], - embedding=json.loads(row['embedding']) if row['embedding'] else None, + embedding=self._decode_embedding(row['embedding']), hash=row['hash'], metadata=json.loads(row['metadata']) if row['metadata'] else None ) @staticmethod - def _cosine_similarity(vec1: List[float], vec2: List[float]) -> float: - """Calculate cosine similarity between two vectors""" - if len(vec1) != len(vec2): - return 0.0 - - dot_product = sum(a * b for a, b in zip(vec1, vec2)) - norm1 = sum(a * a for a in vec1) ** 0.5 - norm2 = sum(b * b for b in vec2) ** 0.5 - - if norm1 == 0 or norm2 == 0: - return 0.0 - - return dot_product / (norm1 * norm2) + def _contains_cjk(text: str) -> bool: + """Check if text contains CJK or related characters (Chinese, Japanese, Korean).""" + return bool(_RE_CONTAINS_CJK.search(text)) @staticmethod - def _contains_cjk(text: str) -> bool: - """Check if text contains CJK (Chinese/Japanese/Korean) characters""" - import re - return bool(re.search(r'[\u4e00-\u9fff]', text)) - + def _build_trigram_query(raw_query: str) -> Optional[str]: + """ + Build FTS5 MATCH query for the trigram tokenizer. + Extracts CJK sequences (including single characters) and ASCII words, + joining them with AND so all terms must appear in the matched chunk. + """ + tokens = _RE_TRIGRAM_TOKENS.findall(raw_query) + tokens = [t for t in tokens if t] + if not tokens: + return None + # Escape embedded double-quotes (FTS5 uses "" inside quoted phrases) + quoted = [f'"{t.replace(chr(34), chr(34)*2)}"' for t in tokens] + return ' AND '.join(quoted) + + def _search_fts5_trigram( + self, + query: str, + user_id: Optional[str], + scopes: List[str], + limit: int + ) -> List[SearchResult]: + """Trigram FTS5 search — handles CJK and mixed queries with BM25 ranking.""" + trigram_query = self._build_trigram_query(query) + if not trigram_query: + return [] + + scope_placeholders = ','.join('?' * len(scopes)) + params = [trigram_query] + list(scopes) + + if user_id: + sql = f""" + SELECT chunks.*, bm25(chunks_fts_trigram) as rank + FROM chunks_fts_trigram + JOIN chunks ON chunks.rowid = chunks_fts_trigram.rowid + WHERE chunks_fts_trigram MATCH ? + AND chunks.scope IN ({scope_placeholders}) + AND (chunks.scope = 'shared' OR chunks.user_id = ?) + ORDER BY rank + LIMIT ? + """ + params.extend([user_id, limit]) + else: + sql = f""" + SELECT chunks.*, bm25(chunks_fts_trigram) as rank + FROM chunks_fts_trigram + JOIN chunks ON chunks.rowid = chunks_fts_trigram.rowid + WHERE chunks_fts_trigram MATCH ? + AND chunks.scope IN ({scope_placeholders}) + ORDER BY rank + LIMIT ? + """ + params.append(limit) + + try: + rows = self.conn.execute(sql, params).fetchall() + return [ + SearchResult( + path=row['path'], + start_line=row['start_line'], + end_line=row['end_line'], + score=self._bm25_rank_to_score(row['rank']), + snippet=self._truncate_text(row['text'], 500), + source=row['source'], + user_id=row['user_id'] + ) + for row in rows + ] + except Exception: + from common.log import logger + logger.warning("[MemoryStorage] _search_fts5_trigram failed, returning empty", exc_info=True) + return [] + @staticmethod def _build_fts_query(raw_query: str) -> Optional[str]: """ @@ -688,7 +944,6 @@ class MemoryStorage: Works best for English and word-based languages. For CJK characters, LIKE search will be used as fallback. """ - import re # Extract words (primarily English words and numbers) tokens = re.findall(r'[A-Za-z0-9_]+', raw_query) if not tokens: @@ -701,9 +956,19 @@ class MemoryStorage: @staticmethod def _bm25_rank_to_score(rank: float) -> float: - """Convert BM25 rank to 0-1 score""" - normalized = max(0, rank) if rank is not None else 999 - return 1 / (1 + normalized) + """Convert SQLite BM25 rank to a [0, 1) relevance score. + + SQLite's bm25() returns a non-positive float (0 or negative). + More negative = more relevant. max(0, rank) would clip every + negative value to 0, making every score 1/(1+0) = 1.0 and + destroying all ranking information. + + abs(rank) / (1 + abs(rank)) maps the absolute relevance magnitude + to [0, 1): larger |rank| (stronger match) → score closer to 1. + """ + if rank is None: + return 0.0 + return abs(rank) / (1.0 + abs(rank)) @staticmethod def _truncate_text(text: str, max_chars: int) -> str: diff --git a/requirements.txt b/requirements.txt index 88b832b7..f92b0979 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ +numpy>=1.24 aiohttp>=3.8.6,<3.10 requests>=2.28.2 chardet>=5.1.0 From 9b31f454819c92ebe9a9b563ed79b0f18918727d Mon Sep 17 00:00:00 2001 From: yangluxin613 Date: Mon, 25 May 2026 09:02:07 +0800 Subject: [PATCH 2/3] fix(memory): _search_like ASCII query always returns empty matched_count only counted cjk_words hits; pure ASCII queries had cjk_words=[] so matched_count=0 and all SQL-matched rows were filtered out. Change to count across all tokens (cjk_words + ascii_words) so the LIKE fallback works correctly when FTS5 is unavailable. Co-Authored-By: Claude Sonnet 4.6 --- agent/memory/storage.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/agent/memory/storage.py b/agent/memory/storage.py index e8c6a505..1a004904 100644 --- a/agent/memory/storage.py +++ b/agent/memory/storage.py @@ -741,9 +741,11 @@ class MemoryStorage: results = [] for row in rows: # Dynamic score: reward chunks that contain more of the query words. - # matched_count should always be ≥1 (WHERE uses OR), but guard - # defensively so zero-match rows are never surfaced. - matched_count = sum(1 for w in cjk_words if w in row['text']) + # Use all tokens (CJK + ASCII) so pure-ASCII queries are not skipped. + # matched_count is always ≥1 because the WHERE clause uses OR, but + # guard defensively so unexpected zero-match rows are never surfaced. + text_lower = row['text'].lower() + matched_count = sum(1 for w in words if w.lower() in text_lower) if matched_count == 0: continue score = min(0.85, 0.3 + 0.15 * matched_count) From fd571ac53931917a5c4e47377cfa4a675367ab5b Mon Sep 17 00:00:00 2001 From: yangluxin613 Date: Mon, 25 May 2026 14:15:16 +0800 Subject: [PATCH 3/3] =?UTF-8?q?fix(memory):=20address=20PR=20review=20?= =?UTF-8?q?=E2=80=94=20numpy/UPSERT=20soft=20deps=20+=20BM25=20floor=20+?= =?UTF-8?q?=20BLOB=20dim?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - numpy soft dependency: try/except import + _HAS_NUMPY flag; _encode_embedding and _decode_embedding fall back to struct.pack/unpack; search_vector falls back to pure-Python cosine loop — startup never fails without numpy reinstalled - SQLite UPSERT guard: _HAS_UPSERT = sqlite_version_info >= (3,24,0); save_chunk and save_chunks_batch fall back to INSERT OR REPLACE on SQLite < 3.24 with a one-time startup warning about potential FTS rowid drift - _bm25_rank_to_score floor: 0.3 + 0.69*(|rank|/(1+|rank|)) → always in [0.3, 0.99), prevents small-corpus matches scoring 0.0 and being filtered by min_score - detect_index_dim BLOB-aware: check isinstance(raw, bytes) first and return len(raw)//4 before json.loads, so /memory status works after embedding format switch - Comment: "CJK single-char" → "CJK tokens shorter than 3 characters" Co-Authored-By: Claude Sonnet 4.6 --- agent/memory/embedding/state.py | 8 +- agent/memory/storage.py | 213 +++++++++++++++++++++----------- 2 files changed, 148 insertions(+), 73 deletions(-) diff --git a/agent/memory/embedding/state.py b/agent/memory/embedding/state.py index 3fb60b23..5efffef2 100644 --- a/agent/memory/embedding/state.py +++ b/agent/memory/embedding/state.py @@ -31,9 +31,13 @@ def detect_index_dim(storage) -> Optional[int]: if not row or not row["embedding"]: return None try: - emb = json.loads(row["embedding"]) + raw = row["embedding"] + if isinstance(raw, (bytes, bytearray)): + # New BLOB format: 4 bytes per float32 + return len(raw) // 4 + emb = json.loads(raw) return len(emb) if isinstance(emb, list) else None - except (json.JSONDecodeError, TypeError): + except (json.JSONDecodeError, TypeError, Exception): return None diff --git a/agent/memory/storage.py b/agent/memory/storage.py index 1a004904..683b083f 100644 --- a/agent/memory/storage.py +++ b/agent/memory/storage.py @@ -13,7 +13,17 @@ import threading from typing import List, Dict, Optional, Any from pathlib import Path from dataclasses import dataclass -import numpy as np +try: + import numpy as np + _HAS_NUMPY = True +except ImportError: + _HAS_NUMPY = False + np = None # type: ignore[assignment] + +# UPSERT (INSERT … ON CONFLICT DO UPDATE) requires SQLite ≥ 3.24.0 (2018). +# Older systems (e.g. CentOS 7 ships SQLite 3.7) fall back to INSERT OR REPLACE, +# which risks FTS5 rowid drift on chunk updates (see save_chunk docstring). +_HAS_UPSERT = sqlite3.sqlite_version_info >= (3, 24, 0) # --------------------------------------------------------------------------- # CJK character ranges, compiled once at module load. @@ -93,6 +103,14 @@ class MemoryStorage: # Check FTS5 support self.fts5_available = self._check_fts5_support() + if not _HAS_UPSERT: + from common.log import logger + logger.warning( + "[MemoryStorage] SQLite %s < 3.24 — UPSERT unavailable. " + "Falling back to INSERT OR REPLACE; FTS5 rowid may drift on " + "chunk updates (rebuild index periodically to recover).", + sqlite3.sqlite_version, + ) if not self.fts5_available: from common.log import logger logger.debug("[MemoryStorage] FTS5 not available, using LIKE-based keyword search") @@ -403,24 +421,32 @@ class MemoryStorage: ON CONFLICT DO UPDATE fires the AFTER UPDATE trigger (chunks_au / chunks_trigram_au) and keeps the original rowid intact. """ - _SQL = """ - INSERT INTO chunks - (id, user_id, scope, source, path, start_line, end_line, - text, embedding, hash, metadata, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, strftime('%s', 'now')) - ON CONFLICT(id) DO UPDATE SET - user_id = excluded.user_id, - scope = excluded.scope, - source = excluded.source, - path = excluded.path, - start_line = excluded.start_line, - end_line = excluded.end_line, - text = excluded.text, - embedding = excluded.embedding, - hash = excluded.hash, - metadata = excluded.metadata, - updated_at = strftime('%s', 'now') - """ + if _HAS_UPSERT: + _SQL = """ + INSERT INTO chunks + (id, user_id, scope, source, path, start_line, end_line, + text, embedding, hash, metadata, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, strftime('%s', 'now')) + ON CONFLICT(id) DO UPDATE SET + user_id = excluded.user_id, + scope = excluded.scope, + source = excluded.source, + path = excluded.path, + start_line = excluded.start_line, + end_line = excluded.end_line, + text = excluded.text, + embedding = excluded.embedding, + hash = excluded.hash, + metadata = excluded.metadata, + updated_at = strftime('%s', 'now') + """ + else: + _SQL = """ + INSERT OR REPLACE INTO chunks + (id, user_id, scope, source, path, start_line, end_line, + text, embedding, hash, metadata, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, strftime('%s', 'now')) + """ params = ( chunk.id, chunk.user_id, chunk.scope, chunk.source, chunk.path, chunk.start_line, chunk.end_line, chunk.text, @@ -437,24 +463,32 @@ class MemoryStorage: See save_chunk for why UPSERT is used instead of INSERT OR REPLACE. """ - _SQL = """ - INSERT INTO chunks - (id, user_id, scope, source, path, start_line, end_line, - text, embedding, hash, metadata, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, strftime('%s', 'now')) - ON CONFLICT(id) DO UPDATE SET - user_id = excluded.user_id, - scope = excluded.scope, - source = excluded.source, - path = excluded.path, - start_line = excluded.start_line, - end_line = excluded.end_line, - text = excluded.text, - embedding = excluded.embedding, - hash = excluded.hash, - metadata = excluded.metadata, - updated_at = strftime('%s', 'now') - """ + if _HAS_UPSERT: + _SQL = """ + INSERT INTO chunks + (id, user_id, scope, source, path, start_line, end_line, + text, embedding, hash, metadata, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, strftime('%s', 'now')) + ON CONFLICT(id) DO UPDATE SET + user_id = excluded.user_id, + scope = excluded.scope, + source = excluded.source, + path = excluded.path, + start_line = excluded.start_line, + end_line = excluded.end_line, + text = excluded.text, + embedding = excluded.embedding, + hash = excluded.hash, + metadata = excluded.metadata, + updated_at = strftime('%s', 'now') + """ + else: + _SQL = """ + INSERT OR REPLACE INTO chunks + (id, user_id, scope, source, path, start_line, end_line, + text, embedding, hash, metadata, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, strftime('%s', 'now')) + """ params_list = [ ( c.id, c.user_id, c.scope, c.source, c.path, @@ -544,36 +578,61 @@ class MemoryStorage: if not vectors: return [] - matrix = np.array(vectors, dtype=np.float32) # (N, D) - q_vec = np.array(query_embedding, dtype=np.float32) # (D,) + if _HAS_NUMPY: + matrix = np.array(vectors, dtype=np.float32) # (N, D) + q_vec = np.array(query_embedding, dtype=np.float32) # (D,) - # Vectorized cosine similarity: dot(matrix, q) / (||matrix|| * ||q||) - dots = matrix @ q_vec # (N,) - row_norms = np.linalg.norm(matrix, axis=1) # (N,) - q_norm = float(np.linalg.norm(q_vec)) - denominators = row_norms * q_norm - np.maximum(denominators, 1e-10, out=denominators) # avoid div-by-zero - sims = dots / denominators # (N,) + # Vectorized cosine similarity: dot(matrix, q) / (||matrix|| * ||q||) + dots = matrix @ q_vec # (N,) + row_norms = np.linalg.norm(matrix, axis=1) # (N,) + q_norm = float(np.linalg.norm(q_vec)) + denominators = row_norms * q_norm + np.maximum(denominators, 1e-10, out=denominators) # avoid div-by-zero + sims = dots / denominators # (N,) - # Select TopK using argpartition (O(N) average), then sort only those K - k = min(limit, len(valid_rows)) - top_idx = np.argpartition(sims, -k)[-k:] - top_idx = top_idx[np.argsort(sims[top_idx])[::-1]] + # Select TopK using argpartition (O(N) average), then sort only those K + k = min(limit, len(valid_rows)) + top_idx = np.argpartition(sims, -k)[-k:] + top_idx = top_idx[np.argsort(sims[top_idx])[::-1]] - - return [ - SearchResult( - path=valid_rows[i]['path'], - start_line=valid_rows[i]['start_line'], - end_line=valid_rows[i]['end_line'], - score=float(sims[i]), - snippet=self._truncate_text(valid_rows[i]['text'], 500), - source=valid_rows[i]['source'], - user_id=valid_rows[i]['user_id'] - ) - for i in top_idx - if sims[i] > 0 - ] + return [ + SearchResult( + path=valid_rows[i]['path'], + start_line=valid_rows[i]['start_line'], + end_line=valid_rows[i]['end_line'], + score=float(sims[i]), + snippet=self._truncate_text(valid_rows[i]['text'], 500), + source=valid_rows[i]['source'], + user_id=valid_rows[i]['user_id'] + ) + for i in top_idx + if sims[i] > 0 + ] + else: + # Pure-Python cosine similarity fallback (numpy not installed) + import math + q = query_embedding + q_norm = math.sqrt(sum(x * x for x in q)) or 1e-10 + scored = [] + for i, vec in enumerate(vectors): + dot = sum(a * b for a, b in zip(vec, q)) + v_norm = math.sqrt(sum(x * x for x in vec)) or 1e-10 + sim = dot / (v_norm * q_norm) + if sim > 0: + scored.append((sim, valid_rows[i])) + scored.sort(key=lambda x: x[0], reverse=True) + return [ + SearchResult( + path=row['path'], + start_line=row['start_line'], + end_line=row['end_line'], + score=sim, + snippet=self._truncate_text(row['text'], 500), + source=row['source'], + user_id=row['user_id'] + ) + for sim, row in scored[:limit] + ] def search_keyword( self, @@ -621,8 +680,8 @@ class MemoryStorage: if trigram_results: return trigram_results - # Step 3: LIKE fallback — last resort (FTS5 unavailable, or CJK single-char - # that trigram cannot match because it requires ≥3-char tokens). + # Step 3: LIKE fallback — last resort (FTS5 unavailable, or CJK tokens + # shorter than 3 characters that trigram cannot match, e.g. a single-char query). if not self.fts5_available or MemoryStorage._contains_cjk(query): return self._search_like(query, user_id, scopes, limit) @@ -829,18 +888,27 @@ class MemoryStorage: @staticmethod def _encode_embedding(embedding: Optional[List[float]]) -> Optional[bytes]: - """Encode embedding as float32 BLOB bytes (~6x smaller and faster than JSON).""" + """Encode embedding as float32 BLOB bytes (~6x smaller and faster than JSON). + Falls back to struct.pack when numpy is unavailable.""" if embedding is None: return None - return np.array(embedding, dtype=np.float32).tobytes() + if _HAS_NUMPY: + return np.array(embedding, dtype=np.float32).tobytes() + import struct + return struct.pack(f'{len(embedding)}f', *embedding) @staticmethod def _decode_embedding(raw) -> Optional[List[float]]: - """Decode embedding from BLOB bytes or legacy JSON string.""" + """Decode embedding from BLOB bytes or legacy JSON string. + Handles both numpy and numpy-free environments.""" if raw is None: return None if isinstance(raw, (bytes, bytearray)): - return np.frombuffer(raw, dtype=np.float32).tolist() + if _HAS_NUMPY: + return np.frombuffer(raw, dtype=np.float32).tolist() + import struct + n = len(raw) // 4 + return list(struct.unpack(f'{n}f', raw)) # Legacy JSON format written by older versions return json.loads(raw) @@ -970,7 +1038,10 @@ class MemoryStorage: """ if rank is None: return 0.0 - return abs(rank) / (1.0 + abs(rank)) + # Add a floor of 0.3 so any FTS5 match always exceeds typical + # min_score thresholds (default 0.1). Small-corpus ranks close to + # 0 would otherwise produce score≈0 and be filtered out downstream. + return 0.3 + 0.69 * (abs(rank) / (1.0 + abs(rank))) @staticmethod def _truncate_text(text: str, max_chars: int) -> str: