fix(memory): address PR review — numpy/UPSERT soft deps + BM25 floor + BLOB dim

- numpy soft dependency: try/except import + _HAS_NUMPY flag; _encode_embedding
  and _decode_embedding fall back to struct.pack/unpack; search_vector falls back
  to a pure-Python cosine loop, so startup no longer fails when numpy is not installed
- SQLite UPSERT guard: _HAS_UPSERT = sqlite3.sqlite_version_info >= (3, 24, 0); save_chunk
  and save_chunks_batch fall back to INSERT OR REPLACE on SQLite < 3.24 with a
  one-time startup warning about potential FTS rowid drift
- _bm25_rank_to_score floor: 0.3 + 0.69*(|rank|/(1+|rank|)) → always in [0.3, 0.99),
  prevents small-corpus matches scoring 0.0 and being filtered by min_score
- detect_index_dim BLOB-aware: check isinstance(raw, (bytes, bytearray)) first and return
  len(raw)//4 before json.loads, so /memory status still works after the embedding format switch
- Comment: "CJK single-char" → "CJK tokens shorter than 3 characters"

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
yangluxin613
2026-05-25 14:15:16 +08:00
parent 9b31f45481
commit fd571ac539
2 changed files with 148 additions and 73 deletions

View File

@@ -31,9 +31,13 @@ def detect_index_dim(storage) -> Optional[int]:
if not row or not row["embedding"]:
return None
try:
emb = json.loads(row["embedding"])
raw = row["embedding"]
if isinstance(raw, (bytes, bytearray)):
# New BLOB format: 4 bytes per float32
return len(raw) // 4
emb = json.loads(raw)
return len(emb) if isinstance(emb, list) else None
except (json.JSONDecodeError, TypeError):
except (json.JSONDecodeError, TypeError, Exception):
return None

View File

@@ -13,7 +13,17 @@ import threading
from typing import List, Dict, Optional, Any
from pathlib import Path
from dataclasses import dataclass
import numpy as np
try:
import numpy as np
_HAS_NUMPY = True
except ImportError:
_HAS_NUMPY = False
np = None # type: ignore[assignment]
# UPSERT (INSERT … ON CONFLICT DO UPDATE) requires SQLite ≥ 3.24.0 (2018).
# Older systems (e.g. CentOS 7 ships SQLite 3.7) fall back to INSERT OR REPLACE,
# which risks FTS5 rowid drift on chunk updates (see save_chunk docstring).
_HAS_UPSERT = sqlite3.sqlite_version_info >= (3, 24, 0)
# ---------------------------------------------------------------------------
# CJK character ranges, compiled once at module load.
@@ -93,6 +103,14 @@ class MemoryStorage:
# Check FTS5 support
self.fts5_available = self._check_fts5_support()
if not _HAS_UPSERT:
from common.log import logger
logger.warning(
"[MemoryStorage] SQLite %s < 3.24 — UPSERT unavailable. "
"Falling back to INSERT OR REPLACE; FTS5 rowid may drift on "
"chunk updates (rebuild index periodically to recover).",
sqlite3.sqlite_version,
)
if not self.fts5_available:
from common.log import logger
logger.debug("[MemoryStorage] FTS5 not available, using LIKE-based keyword search")
@@ -403,6 +421,7 @@ class MemoryStorage:
ON CONFLICT DO UPDATE fires the AFTER UPDATE trigger (chunks_au /
chunks_trigram_au) and keeps the original rowid intact.
"""
if _HAS_UPSERT:
_SQL = """
INSERT INTO chunks
(id, user_id, scope, source, path, start_line, end_line,
@@ -421,6 +440,13 @@ class MemoryStorage:
metadata = excluded.metadata,
updated_at = strftime('%s', 'now')
"""
else:
_SQL = """
INSERT OR REPLACE INTO chunks
(id, user_id, scope, source, path, start_line, end_line,
text, embedding, hash, metadata, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, strftime('%s', 'now'))
"""
params = (
chunk.id, chunk.user_id, chunk.scope, chunk.source, chunk.path,
chunk.start_line, chunk.end_line, chunk.text,
@@ -437,6 +463,7 @@ class MemoryStorage:
See save_chunk for why UPSERT is used instead of INSERT OR REPLACE.
"""
if _HAS_UPSERT:
_SQL = """
INSERT INTO chunks
(id, user_id, scope, source, path, start_line, end_line,
@@ -455,6 +482,13 @@ class MemoryStorage:
metadata = excluded.metadata,
updated_at = strftime('%s', 'now')
"""
else:
_SQL = """
INSERT OR REPLACE INTO chunks
(id, user_id, scope, source, path, start_line, end_line,
text, embedding, hash, metadata, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, strftime('%s', 'now'))
"""
params_list = [
(
c.id, c.user_id, c.scope, c.source, c.path,
@@ -544,6 +578,7 @@ class MemoryStorage:
if not vectors:
return []
if _HAS_NUMPY:
matrix = np.array(vectors, dtype=np.float32) # (N, D)
q_vec = np.array(query_embedding, dtype=np.float32) # (D,)
@@ -560,7 +595,6 @@ class MemoryStorage:
top_idx = np.argpartition(sims, -k)[-k:]
top_idx = top_idx[np.argsort(sims[top_idx])[::-1]]
return [
SearchResult(
path=valid_rows[i]['path'],
@@ -574,6 +608,31 @@ class MemoryStorage:
for i in top_idx
if sims[i] > 0
]
else:
# Pure-Python cosine similarity fallback (numpy not installed)
import math
q = query_embedding
q_norm = math.sqrt(sum(x * x for x in q)) or 1e-10
scored = []
for i, vec in enumerate(vectors):
dot = sum(a * b for a, b in zip(vec, q))
v_norm = math.sqrt(sum(x * x for x in vec)) or 1e-10
sim = dot / (v_norm * q_norm)
if sim > 0:
scored.append((sim, valid_rows[i]))
scored.sort(key=lambda x: x[0], reverse=True)
return [
SearchResult(
path=row['path'],
start_line=row['start_line'],
end_line=row['end_line'],
score=sim,
snippet=self._truncate_text(row['text'], 500),
source=row['source'],
user_id=row['user_id']
)
for sim, row in scored[:limit]
]
def search_keyword(
self,
@@ -621,8 +680,8 @@ class MemoryStorage:
if trigram_results:
return trigram_results
# Step 3: LIKE fallback — last resort (FTS5 unavailable, or CJK single-char
# that trigram cannot match because it requires ≥3-char tokens).
# Step 3: LIKE fallback — last resort (FTS5 unavailable, or CJK tokens
# shorter than 3 characters that trigram cannot match, e.g. a single-char query).
if not self.fts5_available or MemoryStorage._contains_cjk(query):
return self._search_like(query, user_id, scopes, limit)
@@ -829,18 +888,27 @@ class MemoryStorage:
@staticmethod
def _encode_embedding(embedding: Optional[List[float]]) -> Optional[bytes]:
"""Encode embedding as float32 BLOB bytes (~6x smaller and faster than JSON)."""
"""Encode embedding as float32 BLOB bytes (~6x smaller and faster than JSON).
Falls back to struct.pack when numpy is unavailable."""
if embedding is None:
return None
if _HAS_NUMPY:
return np.array(embedding, dtype=np.float32).tobytes()
import struct
return struct.pack(f'{len(embedding)}f', *embedding)
@staticmethod
def _decode_embedding(raw) -> Optional[List[float]]:
"""Decode embedding from BLOB bytes or legacy JSON string."""
"""Decode embedding from BLOB bytes or legacy JSON string.
Handles both numpy and numpy-free environments."""
if raw is None:
return None
if isinstance(raw, (bytes, bytearray)):
if _HAS_NUMPY:
return np.frombuffer(raw, dtype=np.float32).tolist()
import struct
n = len(raw) // 4
return list(struct.unpack(f'{n}f', raw))
# Legacy JSON format written by older versions
return json.loads(raw)
@@ -970,7 +1038,10 @@ class MemoryStorage:
"""
if rank is None:
return 0.0
return abs(rank) / (1.0 + abs(rank))
# Add a floor of 0.3 so any FTS5 match always exceeds typical
# min_score thresholds (default 0.1). Small-corpus ranks close to
# 0 would otherwise produce score≈0 and be filtered out downstream.
return 0.3 + 0.69 * (abs(rank) / (1.0 + abs(rank)))
@staticmethod
def _truncate_text(text: str, max_chars: int) -> str: