diff --git a/java_codebase_rag/lance_optimize.py b/java_codebase_rag/lance_optimize.py new file mode 100644 index 0000000..cb435dd --- /dev/null +++ b/java_codebase_rag/lance_optimize.py @@ -0,0 +1,148 @@ +"""Serialized post-flow LanceDB optimize with commit-conflict retry. + +cocoindex 1.0.7 schedules ``table.optimize()`` (a LanceDB **Rewrite**/compaction +transaction) as a *background* ``asyncio`` task that races concurrent +``table.delete()`` (**Delete**) transactions emitted by later mutation batches. +LanceDB does not allow a Rewrite to commit concurrently with a Delete +(upstream lancedb#1504 — "We do not support concurrent deletes right now"), +which surfaces as a flood of:: + + RuntimeError: lance error: Retryable commit conflict for version N: \ +This Rewrite transaction was preempted by concurrent transaction Delete ... + +To eliminate the race, the flow (``java_index_flow_lancedb.py``) disables the +in-flight background optimize entirely by raising +``num_transactions_before_optimize`` to a value that is effectively never +reached. This module then performs a *single*, serialized optimize after the +flow returns (exit 0 → no concurrent writers), retrying the rare residual +commit conflict that two internal compaction passes can still produce. +""" +from __future__ import annotations + +import asyncio +import sys +from pathlib import Path + +# Single source of truth for the three Lance table names created by the flow. +# Keep in sync with ``search_lancedb.TABLES`` (the values there mirror these). +LANCE_TABLE_NAMES: tuple[str, ...] = ( + "javacodeindex_java_code", + "sqlschemaindex_sql_schema", + "yamlconfigindex_yaml_config", +) + +# Commit conflicts are transient; a handful of exponential-backoff retries is +# enough because, post-flow, there are no concurrent writers — only successive +# optimize/compaction passes within this single serialized call can still +# transiently preempt one another. 
+_MAX_ATTEMPTS = 6 +_BASE_BACKOFF_S = 0.1 + +# Substrings identifying the retryable Lance commit-conflict error. LanceDB +# wraps the underlying lance error text into the raised ``RuntimeError`` str, +# so a substring match is the robust detector (no dedicated exception type). +_RETRYABLE_MARKERS = ( + "Retryable commit conflict", + "preempted by concurrent transaction", +) + + +def _is_retryable(exc: BaseException) -> bool: + text = str(exc) + return any(marker in text for marker in _RETRYABLE_MARKERS) + + +async def _list_table_names(db: object) -> set[str]: + """Existing table names across LanceDB API variants (prefers ``list_tables``, falls back to ``table_names``).""" + if hasattr(db, "list_tables"): + response = await db.list_tables() + return set(getattr(response, "tables", response)) + return set(await db.table_names()) + + +async def optimize_lance_tables(index_dir: Path, *, quiet: bool = False) -> dict[str, str]: + """Optimize all known Lance tables under *index_dir*, serially, with retry. + + Runs ``table.optimize()`` for each name in :data:`LANCE_TABLE_NAMES` that + exists in the DB. Retryable commit conflicts are retried with exponential + backoff; any other exception (or an exhausted retry budget) is captured + per-table in the returned dict and logged to **stderr** — never stdout, + since this is callable from stdio-MCP / JSON-stdout contexts. + + Args: + index_dir: directory holding the Lance tables (the flow's LanceDB URI). + quiet: when True, suppress the per-table success/skip info lines on + stderr (errors are always logged). + + Returns: + Mapping of table name → status. Values are ``"ok"``, ``"skipped"`` + (table absent — e.g. a repo with no SQL/YAML), or a string starting with ``"error: "`` followed by the failure detail. + """ + # Lazy import: the flow imports this module for LANCE_TABLE_NAMES and must + # not pay the lancedb import cost at flow-definition time. 
+ import lancedb + + results: dict[str, str] = {} + db = await lancedb.connect_async(str(index_dir)) + try: + try: + existing = await _list_table_names(db) + except Exception as exc: + print( + f"java-codebase-rag: optimize: failed to list tables in " + f"{index_dir}: {exc}", + file=sys.stderr, + ) + return {name: f"error: list failed: {exc}" for name in LANCE_TABLE_NAMES} + + for name in LANCE_TABLE_NAMES: + if name not in existing: + results[name] = "skipped" + if not quiet: + print( + f"java-codebase-rag: optimize: {name} absent, skipped", + file=sys.stderr, + ) + continue + try: + table = await db.open_table(name) + except Exception as exc: + results[name] = f"error: open failed: {exc}" + print( + f"java-codebase-rag: optimize: {name} open failed: {exc}", + file=sys.stderr, + ) + continue + + last_exc: BaseException | None = None + for attempt in range(_MAX_ATTEMPTS): + try: + await table.optimize() + last_exc = None + break + except Exception as exc: + last_exc = exc + if _is_retryable(exc) and attempt < _MAX_ATTEMPTS - 1: + await asyncio.sleep(_BASE_BACKOFF_S * (2**attempt)) + continue + # Non-retryable, or retries exhausted: stop the loop and + # surface below — do not swallow silently. + break + + if last_exc is None: + results[name] = "ok" + if not quiet: + print( + f"java-codebase-rag: optimize: {name} ok", + file=sys.stderr, + ) + else: + results[name] = f"error: {last_exc}" + print( + f"java-codebase-rag: optimize: {name} failed: {last_exc}", + file=sys.stderr, + ) + finally: + # ``AsyncConnection.close`` is a *sync* method in lancedb 0.30.x. 
+ db.close() + return results diff --git a/java_codebase_rag/pipeline.py b/java_codebase_rag/pipeline.py index d75e04d..1caa7e5 100644 --- a/java_codebase_rag/pipeline.py +++ b/java_codebase_rag/pipeline.py @@ -1,6 +1,7 @@ """Subprocess helpers for cocoindex + graph builder (no heavy ML imports at import time).""" from __future__ import annotations +import asyncio import os import shutil import subprocess @@ -111,6 +112,57 @@ def run_cocoindex_update( quiet: bool, verbose: bool = True, lance_project_root: Path | None = None, +) -> subprocess.CompletedProcess[str]: + result = _run_cocoindex_update_impl( + env, + full_reprocess=full_reprocess, + quiet=quiet, + verbose=verbose, + lance_project_root=lance_project_root, + ) + # After cocoindex returns exit 0 there are no concurrent writers, so this + # is the safe window to compact the Lance tables. The flow disabled its + # in-flight background optimize (see java_index_flow_lancedb.py), making + # this serialized pass the sole optimizer. Optimize failure does not flip + # the cocoindex CompletedProcess (a successful index is still usable, just + # not compacted); the outcome is logged to stderr only. + if result.returncode == 0: + _maybe_run_serialized_optimize(env, quiet=quiet) + return result + + +def _maybe_run_serialized_optimize(env: dict[str, str], *, quiet: bool) -> None: + """Resolve the index dir from *env* and run the serialized Lance optimize. + + The flow's lifespan reads ``JAVA_CODEBASE_RAG_INDEX_DIR`` (set by the CLI / + config.subprocess_env), so it is guaranteed present when cocoindex ran. + If it is somehow absent we skip optimize with a stderr warning rather than + crash — a successful index is still searchable un-compacted. 
+ """ + idx_raw = env.get("JAVA_CODEBASE_RAG_INDEX_DIR", "").strip() + if not idx_raw: + print( + "java-codebase-rag: optimize skipped — JAVA_CODEBASE_RAG_INDEX_DIR " + "not set in subprocess env", + file=sys.stderr, + ) + return + try: + from java_codebase_rag.lance_optimize import optimize_lance_tables + + asyncio.run(optimize_lance_tables(Path(idx_raw), quiet=quiet)) + except Exception as exc: + # Never crash the CLI on an optimize failure — surface on stderr only. + print(f"java-codebase-rag: optimize failed: {exc}", file=sys.stderr) + + +def _run_cocoindex_update_impl( + env: dict[str, str], + *, + full_reprocess: bool, + quiet: bool, + verbose: bool = True, + lance_project_root: Path | None = None, ) -> subprocess.CompletedProcess[str]: exe = cocoindex_bin() if not exe.is_file(): diff --git a/java_index_flow_lancedb.py b/java_index_flow_lancedb.py index 91af5b9..6213616 100644 --- a/java_index_flow_lancedb.py +++ b/java_index_flow_lancedb.py @@ -36,6 +36,7 @@ from cocoindex.resources.file import PatternFilePathMatcher from java_codebase_rag.config import resolved_sbert_model_for_process_env +from java_codebase_rag.lance_optimize import LANCE_TABLE_NAMES from java_index_v1_common import ( JAVA_CHUNK, SBERT_MODEL, @@ -68,6 +69,20 @@ splitter = RecursiveSplitter() +# cocoindex 1.0.7 schedules ``table.optimize()`` (a LanceDB Rewrite/compaction +# transaction) as a *background* asyncio task after every +# ``num_transactions_before_optimize`` mutation batches (default 50). That +# background Rewrite races the concurrent ``table.delete()`` (Delete) +# transactions emitted by later batches, and LanceDB does not allow a Rewrite +# to commit concurrently with a Delete (upstream lancedb#1504), which floods +# stderr with "Retryable commit conflict ... preempted by concurrent +# transaction Delete". 
Setting this effectively to infinity disables the +# in-flight background optimize; the serialized post-flow optimize in +# ``lance_optimize.optimize_lance_tables`` then compacts the table with no +# concurrent writers. ``optimize()`` is pure maintenance (compact/prune/index); +# upsert/delete correctness via merge_insert does not depend on it. +_NUM_TXN_BEFORE_OPTIMIZE = 10**12 + @dataclass class JavaLanceChunk: @@ -317,8 +332,9 @@ async def app_main() -> None: ) java_table = await lancedb.mount_table_target( LANCE_DB, - "javacodeindex_java_code", + LANCE_TABLE_NAMES[0], java_schema, + num_transactions_before_optimize=_NUM_TXN_BEFORE_OPTIMIZE, ) sql_schema = await lancedb.TableSchema.from_class( @@ -327,8 +343,9 @@ async def app_main() -> None: ) sql_table = await lancedb.mount_table_target( LANCE_DB, - "sqlschemaindex_sql_schema", + LANCE_TABLE_NAMES[1], sql_schema, + num_transactions_before_optimize=_NUM_TXN_BEFORE_OPTIMIZE, ) yaml_schema = await lancedb.TableSchema.from_class( @@ -337,8 +354,9 @@ async def app_main() -> None: ) yaml_table = await lancedb.mount_table_target( LANCE_DB, - "yamlconfigindex_yaml_config", + LANCE_TABLE_NAMES[2], yaml_schema, + num_transactions_before_optimize=_NUM_TXN_BEFORE_OPTIMIZE, ) project_root = coco.use_context(PROJECT_ROOT) diff --git a/server.py b/server.py index bbfbea1..8495b5d 100644 --- a/server.py +++ b/server.py @@ -87,6 +87,7 @@ class RefreshIndexOutput(BaseModel): graph_stdout: str = "" graph_stderr: str = "" phases_run: list[Literal["vectors", "graph"]] = Field(default_factory=list) + optimize_error: str | None = None class IndexInfoOutput(BaseModel): @@ -329,9 +330,29 @@ async def run_refresh_pipeline(*, quiet: bool = False, verbose: bool = True) -> graph_code: int | None = None graph_out = "" graph_err = "" + optimize_error: str | None = None if ok: if not quiet: print(file=sys.stderr, flush=True) + # Serialized post-flow Lance optimize: the flow disabled its background + # optimize, so with cocoindex returned 
exit 0 there are no concurrent + # writers — this is the safe window to compact. Only an exception raised by + # the helper sets optimize_error (per-table failures are captured by the helper and logged to stderr only); neither may flip the success of + # a vectors phase that succeeded; the index is still searchable. + try: + from java_codebase_rag.lance_optimize import optimize_lance_tables + + idx_raw = os.environ.get("JAVA_CODEBASE_RAG_INDEX_DIR", "").strip() + if idx_raw and not idx_raw.startswith(("s3://", "gs://", "az://")): + idx_dir = Path(idx_raw).expanduser().resolve() + elif idx_raw: + idx_dir = Path(idx_raw) + else: + idx_dir = (root / ".java-codebase-rag").resolve() + await optimize_lance_tables(idx_dir, quiet=quiet) + except Exception as exc: + optimize_error = f"lance optimize failed: {exc}" + print(f"java-codebase-rag: {optimize_error}", file=sys.stderr) builder = Path(__file__).resolve().parent / "build_ast_graph.py" if builder.is_file(): try: @@ -368,6 +389,10 @@ async def run_refresh_pipeline(*, quiet: bool = False, verbose: bool = True) -> message = f"cocoindex exit {proc.returncode}" elif graph_code is not None and graph_code != 0: message = f"graph builder exit {graph_code}" + # Surface a post-flow optimize failure in the message too (success is not + # flipped — the vectors phase succeeded and the index is still usable). 
+ if optimize_error is not None: + message = optimize_error if message is None else f"{message}; {optimize_error}" return RefreshIndexOutput( success=ok and (graph_code is None or graph_code == 0), exit_code=proc.returncode, @@ -378,6 +403,7 @@ async def run_refresh_pipeline(*, quiet: bool = False, verbose: bool = True) -> graph_stdout=graph_out[-4000:] if len(graph_out) > 4000 else graph_out, graph_stderr=graph_err[-4000:] if len(graph_err) > 4000 else graph_err, phases_run=phases_run, + optimize_error=optimize_error, ) diff --git a/tests/fixtures/cli_progress_stdout/reprocess_quiet_success.stdout.txt b/tests/fixtures/cli_progress_stdout/reprocess_quiet_success.stdout.txt index 0bb74d2..58fc326 100644 --- a/tests/fixtures/cli_progress_stdout/reprocess_quiet_success.stdout.txt +++ b/tests/fixtures/cli_progress_stdout/reprocess_quiet_success.stdout.txt @@ -1 +1 @@ -{"exit_code": 0, "graph_exit_code": 0, "graph_stderr": "", "graph_stdout": "", "message": null, "phases_run": ["vectors", "graph"], "stderr": "", "stdout": "", "success": true} +{"exit_code": 0, "graph_exit_code": 0, "graph_stderr": "", "graph_stdout": "", "message": null, "optimize_error": null, "phases_run": ["vectors", "graph"], "stderr": "", "stdout": "", "success": true} diff --git a/tests/test_lance_optimize.py b/tests/test_lance_optimize.py new file mode 100644 index 0000000..c3a0e6c --- /dev/null +++ b/tests/test_lance_optimize.py @@ -0,0 +1,165 @@ +"""Tests for the serialized Lance optimize helper (``java_codebase_rag.lance_optimize``). + +These tests fake the lancedb async connection/table so the retry logic is +exercised without a real LanceDB on disk. They assert invariants (retry on +commit-conflict, no retry on other errors, missing tables skipped) rather than +overspecifying the lancedb API surface — see ``tests/README.md``. 
+""" +from __future__ import annotations + +import sys +from pathlib import Path + + +def _conflict_error(version: int = 4424) -> RuntimeError: + return RuntimeError( + "lance error: Retryable commit conflict for version " + f"{version}: This Rewrite transaction was preempted by concurrent " + "transaction Delete at version 4424. Please retry." + ) + + +class _FakeTable: + """Fake async table whose ``optimize`` follows a scripted call sequence.""" + + def __init__(self, name: str, outcomes: list[BaseException | None]) -> None: + self.name = name + self._outcomes = list(outcomes) + self.optimize_calls = 0 + + async def optimize(self, *args, **kwargs): # noqa: ANN002, ANN003 — fake + self.optimize_calls += 1 + if not self._outcomes: + return None + outcome = self._outcomes.pop(0) + if isinstance(outcome, BaseException): + raise outcome + return None + + +class _FakeListResponse: + def __init__(self, names: set[str]) -> None: + self.tables = list(names) + + +class _FakeConnection: + """Fake async connection: ``list_tables``/``table_names`` + ``open_table`` + sync ``close``.""" + + def __init__(self, *, table_names: set[str], tables: dict[str, _FakeTable]) -> None: + self._names = table_names + self._tables = tables + self.closed = False + + async def list_tables(self): # noqa: ANN201 — fake + return _FakeListResponse(self._names) + + async def open_table(self, name: str) -> _FakeTable: + return self._tables[name] + + def close(self) -> None: + self.closed = True + + +class _FakeLanceDB: + """Module stand-in for ``lancedb`` exposing ``connect_async``.""" + + def __init__(self, connection: _FakeConnection) -> None: + self._connection = connection + + async def connect_async(self, uri: str): # noqa: ANN201 — fake + return self._connection + + +def _install_fake_lancedb(monkeypatch, connection: _FakeConnection) -> _FakeLanceDB: + fake_module = _FakeLanceDB(connection) + monkeypatch.setitem(sys.modules, "lancedb", fake_module) + return fake_module + + +async def 
test_optimize_retries_commit_conflict_then_succeeds(monkeypatch, tmp_path) -> None: + """A Retryable commit conflict is retried until ``optimize`` succeeds.""" + from java_codebase_rag import lance_optimize + + table = _FakeTable( + lance_optimize.LANCE_TABLE_NAMES[0], + [_conflict_error(), _conflict_error(), None], # 2 conflicts, then ok + ) + conn = _FakeConnection( + table_names={lance_optimize.LANCE_TABLE_NAMES[0]}, + tables={lance_optimize.LANCE_TABLE_NAMES[0]: table}, + ) + _install_fake_lancedb(monkeypatch, conn) + + results = await lance_optimize.optimize_lance_tables(tmp_path, quiet=True) + assert results[lance_optimize.LANCE_TABLE_NAMES[0]] == "ok" + assert table.optimize_calls == 3 # 2 retries + 1 success + assert conn.closed is True + + +async def test_optimize_does_not_retry_non_conflict_error(monkeypatch, tmp_path) -> None: + """A non-conflict exception is captured per-table as an error and never retried.""" + from java_codebase_rag import lance_optimize + + boom = ValueError("totally unrelated disk error") + table = _FakeTable(lance_optimize.LANCE_TABLE_NAMES[0], [boom]) + conn = _FakeConnection( + table_names={lance_optimize.LANCE_TABLE_NAMES[0]}, + tables={lance_optimize.LANCE_TABLE_NAMES[0]: table}, + ) + _install_fake_lancedb(monkeypatch, conn) + + results = await lance_optimize.optimize_lance_tables(tmp_path, quiet=True) + # The error is captured in the result (not re-raised out of the helper) so + # the caller can report it; but it must not have been retried. + assert results[lance_optimize.LANCE_TABLE_NAMES[0]].startswith("error:") + assert "totally unrelated disk error" in results[lance_optimize.LANCE_TABLE_NAMES[0]] + assert table.optimize_calls == 1 + + +async def test_optimize_reports_missing_table_as_skipped(monkeypatch, tmp_path) -> None: + """A table name absent from the DB is reported skipped, with no exception.""" + from java_codebase_rag import lance_optimize + + # DB contains only the java table; sql + yaml are absent (e.g. 
a repo with + # no SQL/YAML) and must come back as skipped. + java_name = lance_optimize.LANCE_TABLE_NAMES[0] + java_table = _FakeTable(java_name, [None]) + conn = _FakeConnection(table_names={java_name}, tables={java_name: java_table}) + _install_fake_lancedb(monkeypatch, conn) + + results = await lance_optimize.optimize_lance_tables(tmp_path, quiet=True) + assert results[java_name] == "ok" + for missing in lance_optimize.LANCE_TABLE_NAMES[1:]: + assert results[missing] == "skipped" + + +async def test_optimize_closes_connection_even_on_open_failure(monkeypatch, tmp_path) -> None: + """``db.close()`` runs in finally even if a table fails to open.""" + from java_codebase_rag import lance_optimize + + name = lance_optimize.LANCE_TABLE_NAMES[0] + + class _ConnOpenFails(_FakeConnection): + async def open_table(self, name: str) -> _FakeTable: + raise OSError("cannot open") + + conn = _ConnOpenFails(table_names={name}, tables={}) + _install_fake_lancedb(monkeypatch, conn) + + results = await lance_optimize.optimize_lance_tables(tmp_path, quiet=True) + assert results[name].startswith("error:") + assert conn.closed is True + + +def test_lance_table_names_constant_matches_search_lancedb_tables() -> None: + """The single source of truth agrees with the search-side TABLES mapping.""" + from java_codebase_rag.lance_optimize import LANCE_TABLE_NAMES + + # Imported lazily to avoid pulling sentence-transformers at collection time. 
+ sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) + try: + from search_lancedb import TABLES + finally: + sys.path.pop(0) + assert set(LANCE_TABLE_NAMES) == set(TABLES.values()) + assert len(LANCE_TABLE_NAMES) == 3 diff --git a/tests/test_lancedb_e2e.py b/tests/test_lancedb_e2e.py index 4b70e4a..5ba0d9b 100644 --- a/tests/test_lancedb_e2e.py +++ b/tests/test_lancedb_e2e.py @@ -115,6 +115,16 @@ def lance_index(tmp_path_factory, corpus_root: Path) -> Path: assert proc.returncode == 0, ( f"cocoindex failed: stdout={proc.stdout}\nstderr={proc.stderr}" ) + # The flow disables cocoindex's concurrent background optimize (which raced + # table.delete() and flooded stderr with commit conflicts — issue #308). + # After the fix, no commit-conflict markers should appear in the flow's + # stderr. We assert this here because this fixture runs a real full + # reprocess; if the race regressed, this is where it would surface. + for marker in ("Retryable commit conflict", "preempted by concurrent transaction"): + assert marker not in proc.stderr, ( + f"commit-conflict marker '{marker}' present in cocoindex stderr; " + f"the in-flow background optimize race may have regressed:\n{proc.stderr}" + ) builder = bundle_dir / "build_ast_graph.py" proc = subprocess.run(