Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 148 additions & 0 deletions java_codebase_rag/lance_optimize.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
"""Serialized post-flow LanceDB optimize with commit-conflict retry.

cocoindex 1.0.7 schedules ``table.optimize()`` (a LanceDB **Rewrite**/compaction
transaction) as a *background* ``asyncio`` task that races concurrent
``table.delete()`` (**Delete**) transactions emitted by later mutation batches.
LanceDB does not allow a Rewrite to commit concurrently with a Delete
(upstream lancedb#1504 — "We do not support concurrent deletes right now"),
which surfaces as a flood of::

    RuntimeError: lance error: Retryable commit conflict for version N: \
    This Rewrite transaction was preempted by concurrent transaction Delete ...

To eliminate the race, the flow (``java_index_flow_lancedb.py``) disables the
in-flight background optimize entirely by raising
``num_transactions_before_optimize`` to a value that is effectively never
reached. This module then performs a *single*, serialized optimize after the
flow returns (exit 0 → no concurrent writers), retrying the rare residual
commit conflict that two internal compaction passes can still produce.
"""
from __future__ import annotations

import asyncio
import sys
from pathlib import Path

# Names of the Lance tables the indexing flow creates; this tuple is the single
# source of truth. ``search_lancedb.TABLES`` mirrors these values, so update
# both together. Order is significant: ``java_index_flow_lancedb.py`` selects
# entries by position (0 = Java code, 1 = SQL schema, 2 = YAML config).
LANCE_TABLE_NAMES: tuple[str, ...] = (
    "javacodeindex_java_code",
    "sqlschemaindex_sql_schema",
    "yamlconfigindex_yaml_config",
)

# Retry budget for ``table.optimize()`` commit conflicts.
# Commit conflicts are transient; a handful of exponential-backoff retries is
# enough because, post-flow, there are no concurrent writers — only successive
# optimize/compaction passes within this single serialized call can still
# transiently preempt one another.
# ``_MAX_ATTEMPTS`` counts total tries per table (the first call included).
# ``_BASE_BACKOFF_S`` is the delay after the first failed try and doubles after
# each further one: 0.1, 0.2, 0.4, 0.8, 1.6 s — roughly 3.1 s of waiting at
# most before a table is reported as failed.
_MAX_ATTEMPTS = 6
_BASE_BACKOFF_S = 0.1

# LanceDB has no dedicated exception type for a retryable commit conflict: the
# underlying lance error text is embedded in the raised ``RuntimeError``
# message. The conflict is therefore recognised by these substrings.
_RETRYABLE_MARKERS = (
    "Retryable commit conflict",
    "preempted by concurrent transaction",
)


def _is_retryable(exc: BaseException) -> bool:
    """Tell whether *exc* is a transient Lance commit conflict worth retrying.

    The check is a case-sensitive substring search of ``str(exc)`` against
    :data:`_RETRYABLE_MARKERS`; the exception's type is not considered.
    """
    message = str(exc)
    for marker in _RETRYABLE_MARKERS:
        if marker in message:
            return True
    return False


async def _list_table_names(db: object) -> set[str]:
    """Return the names of all tables in *db* as a set.

    Supports both connection API variants: ``list_tables()`` is used whenever
    the connection has that method; otherwise the call falls back to
    ``table_names()``.
    """
    if not hasattr(db, "list_tables"):
        return set(await db.table_names())

    listing = await db.list_tables()
    # Use the response's ``tables`` attribute when it has one; otherwise the
    # response itself is treated as the iterable of names.
    names = getattr(listing, "tables", listing)
    return set(names)


async def _optimize_with_retry(table: object) -> Exception | None:
    """Run ``table.optimize()``, retrying retryable commit conflicts with backoff.

    Returns ``None`` on success, otherwise the exception that ended the
    attempts (a non-retryable error, or the last conflict once the
    ``_MAX_ATTEMPTS`` budget is spent).
    """
    for attempt in range(_MAX_ATTEMPTS):
        try:
            await table.optimize()
        except Exception as exc:
            budget_spent = attempt >= _MAX_ATTEMPTS - 1
            if budget_spent or not _is_retryable(exc):
                # Non-retryable, or retries exhausted: hand the error back to
                # the caller to report — never swallow it silently.
                return exc
            await asyncio.sleep(_BASE_BACKOFF_S * (2**attempt))
        else:
            return None
    return None


async def optimize_lance_tables(
    index_dir: Path | str, *, quiet: bool = False
) -> dict[str, str]:
    """Optimize all known Lance tables under *index_dir*, serially, with retry.

    Runs ``table.optimize()`` for each name in :data:`LANCE_TABLE_NAMES` that
    exists in the DB. Retryable commit conflicts are retried with exponential
    backoff; any other exception (or an exhausted retry budget) is captured
    per-table in the returned dict and logged to **stderr** — never stdout,
    since this is callable from stdio-MCP / JSON-stdout contexts.

    Args:
        index_dir: location of the Lance tables (the flow's LanceDB URI). A
            ``Path`` is fine for local directories; pass a plain ``str`` for
            URIs such as ``s3://...``, because ``pathlib`` collapses the
            ``//`` after the scheme and would corrupt the URI.
        quiet: when True, suppress the per-table success/skip info lines on
            stderr (errors are always logged).

    Returns:
        Mapping of table name → status. Values are ``"ok"``, ``"skipped"``
        (table absent — e.g. a repo with no SQL/YAML), or ``"error: <text>"``.
        If listing the tables fails, every known table maps to
        ``"error: list failed: <text>"``.

    Raises:
        Exception: importing ``lancedb`` or opening the connection happens
            outside the per-table capture, so those failures propagate to the
            caller.
    """
    # Lazy import: the flow imports this module for LANCE_TABLE_NAMES and must
    # not pay the lancedb import cost at flow-definition time.
    import lancedb

    results: dict[str, str] = {}
    db = await lancedb.connect_async(str(index_dir))
    try:
        try:
            existing = await _list_table_names(db)
        except Exception as exc:
            print(
                f"java-codebase-rag: optimize: failed to list tables in "
                f"{index_dir}: {exc}",
                file=sys.stderr,
            )
            return {name: f"error: list failed: {exc}" for name in LANCE_TABLE_NAMES}

        for name in LANCE_TABLE_NAMES:
            if name not in existing:
                results[name] = "skipped"
                if not quiet:
                    print(
                        f"java-codebase-rag: optimize: {name} absent, skipped",
                        file=sys.stderr,
                    )
                continue

            try:
                table = await db.open_table(name)
            except Exception as exc:
                results[name] = f"error: open failed: {exc}"
                print(
                    f"java-codebase-rag: optimize: {name} open failed: {exc}",
                    file=sys.stderr,
                )
                continue

            failure = await _optimize_with_retry(table)
            if failure is None:
                results[name] = "ok"
                if not quiet:
                    print(
                        f"java-codebase-rag: optimize: {name} ok",
                        file=sys.stderr,
                    )
            else:
                results[name] = f"error: {failure}"
                print(
                    f"java-codebase-rag: optimize: {name} failed: {failure}",
                    file=sys.stderr,
                )
    finally:
        # ``AsyncConnection.close`` is a *sync* method in lancedb 0.30.x.
        db.close()
    return results
52 changes: 52 additions & 0 deletions java_codebase_rag/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Subprocess helpers for cocoindex + graph builder (no heavy ML imports at import time)."""
from __future__ import annotations

import asyncio
import os
import shutil
import subprocess
Expand Down Expand Up @@ -111,6 +112,57 @@ def run_cocoindex_update(
quiet: bool,
verbose: bool = True,
lance_project_root: Path | None = None,
) -> subprocess.CompletedProcess[str]:
result = _run_cocoindex_update_impl(
env,
full_reprocess=full_reprocess,
quiet=quiet,
verbose=verbose,
lance_project_root=lance_project_root,
)
# After cocoindex returns exit 0 there are no concurrent writers, so this
# is the safe window to compact the Lance tables. The flow disabled its
# in-flight background optimize (see java_index_flow_lancedb.py), making
# this serialized pass the sole optimizer. Optimize failure does not flip
# the cocoindex CompletedProcess (a successful index is still usable, just
# not compacted); the outcome is logged to stderr only.
if result.returncode == 0:
_maybe_run_serialized_optimize(env, quiet=quiet)
return result


def _maybe_run_serialized_optimize(env: dict[str, str], *, quiet: bool) -> None:
    """Resolve the index location from *env* and run the serialized Lance optimize.

    The flow's lifespan reads ``JAVA_CODEBASE_RAG_INDEX_DIR`` (set by the CLI /
    config.subprocess_env), so it is guaranteed present when cocoindex ran.
    If it is somehow absent we skip optimize with a stderr warning rather than
    crash — a successful index is still searchable un-compacted.

    Args:
        env: environment mapping that was handed to the cocoindex subprocess.
        quiet: forwarded to ``optimize_lance_tables`` to suppress its
            per-table info lines.

    Never raises for optimize problems: any exception (including a failed
    import of the optimize module) is reported on stderr and swallowed.
    """
    idx_raw = env.get("JAVA_CODEBASE_RAG_INDEX_DIR", "").strip()
    if not idx_raw:
        print(
            "java-codebase-rag: optimize skipped — JAVA_CODEBASE_RAG_INDEX_DIR "
            "not set in subprocess env",
            file=sys.stderr,
        )
        return

    # pathlib collapses the "//" after a URI scheme ("s3://b/k" -> "s3:/b/k"),
    # so object-store URIs are handed over verbatim and only local locations
    # are wrapped in Path. ``optimize_lance_tables`` stringifies its argument
    # before connecting, so both forms are accepted.
    is_remote = idx_raw.startswith(("s3://", "gs://", "az://"))
    index_target = idx_raw if is_remote else Path(idx_raw)
    try:
        from java_codebase_rag.lance_optimize import optimize_lance_tables

        asyncio.run(optimize_lance_tables(index_target, quiet=quiet))
    except Exception as exc:
        # Never crash the CLI on an optimize failure — surface on stderr only.
        print(f"java-codebase-rag: optimize failed: {exc}", file=sys.stderr)


def _run_cocoindex_update_impl(
env: dict[str, str],
*,
full_reprocess: bool,
quiet: bool,
verbose: bool = True,
lance_project_root: Path | None = None,
) -> subprocess.CompletedProcess[str]:
exe = cocoindex_bin()
if not exe.is_file():
Expand Down
24 changes: 21 additions & 3 deletions java_index_flow_lancedb.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from cocoindex.resources.file import PatternFilePathMatcher

from java_codebase_rag.config import resolved_sbert_model_for_process_env
from java_codebase_rag.lance_optimize import LANCE_TABLE_NAMES
from java_index_v1_common import (
JAVA_CHUNK,
SBERT_MODEL,
Expand Down Expand Up @@ -68,6 +69,20 @@

splitter = RecursiveSplitter()

# cocoindex 1.0.7 schedules ``table.optimize()`` (a LanceDB Rewrite/compaction
# transaction) as a *background* asyncio task after every
# ``num_transactions_before_optimize`` mutation batches (default 50). That
# background Rewrite races the concurrent ``table.delete()`` (Delete)
# transactions emitted by later batches, and LanceDB does not allow a Rewrite
# to commit concurrently with a Delete (upstream lancedb#1504), which floods
# stderr with "Retryable commit conflict ... preempted by concurrent
# transaction Delete". Setting this effectively to infinity disables the
# in-flight background optimize; the serialized post-flow optimize in
# ``lance_optimize.optimize_lance_tables`` then compacts the table with no
# concurrent writers. ``optimize()`` is pure maintenance (compact/prune/index);
# upsert/delete correctness via merge_insert does not depend on it.
# The value is passed as ``num_transactions_before_optimize`` to each
# ``mount_table_target`` call in ``app_main`` (Java, SQL and YAML tables).
_NUM_TXN_BEFORE_OPTIMIZE = 10**12


@dataclass
class JavaLanceChunk:
Expand Down Expand Up @@ -317,8 +332,9 @@ async def app_main() -> None:
)
java_table = await lancedb.mount_table_target(
LANCE_DB,
"javacodeindex_java_code",
LANCE_TABLE_NAMES[0],
java_schema,
num_transactions_before_optimize=_NUM_TXN_BEFORE_OPTIMIZE,
)

sql_schema = await lancedb.TableSchema.from_class(
Expand All @@ -327,8 +343,9 @@ async def app_main() -> None:
)
sql_table = await lancedb.mount_table_target(
LANCE_DB,
"sqlschemaindex_sql_schema",
LANCE_TABLE_NAMES[1],
sql_schema,
num_transactions_before_optimize=_NUM_TXN_BEFORE_OPTIMIZE,
)

yaml_schema = await lancedb.TableSchema.from_class(
Expand All @@ -337,8 +354,9 @@ async def app_main() -> None:
)
yaml_table = await lancedb.mount_table_target(
LANCE_DB,
"yamlconfigindex_yaml_config",
LANCE_TABLE_NAMES[2],
yaml_schema,
num_transactions_before_optimize=_NUM_TXN_BEFORE_OPTIMIZE,
)

project_root = coco.use_context(PROJECT_ROOT)
Expand Down
26 changes: 26 additions & 0 deletions server.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class RefreshIndexOutput(BaseModel):
graph_stdout: str = ""
graph_stderr: str = ""
phases_run: list[Literal["vectors", "graph"]] = Field(default_factory=list)
optimize_error: str | None = None


class IndexInfoOutput(BaseModel):
Expand Down Expand Up @@ -329,9 +330,29 @@ async def run_refresh_pipeline(*, quiet: bool = False, verbose: bool = True) ->
graph_code: int | None = None
graph_out = ""
graph_err = ""
optimize_error: str | None = None
if ok:
if not quiet:
print(file=sys.stderr, flush=True)
# Serialized post-flow Lance optimize: the flow disabled its background
# optimize, so with cocoindex returned exit 0 there are no concurrent
# writers — this is the safe window to compact. An optimize failure is
# surfaced via optimize_error / stderr and must NOT flip the success of
# a vectors phase that succeeded; the index is still searchable.
try:
from java_codebase_rag.lance_optimize import optimize_lance_tables

idx_raw = os.environ.get("JAVA_CODEBASE_RAG_INDEX_DIR", "").strip()
if idx_raw and not idx_raw.startswith(("s3://", "gs://", "az://")):
idx_dir = Path(idx_raw).expanduser().resolve()
elif idx_raw:
idx_dir = Path(idx_raw)
else:
idx_dir = (root / ".java-codebase-rag").resolve()
await optimize_lance_tables(idx_dir, quiet=quiet)
except Exception as exc:
optimize_error = f"lance optimize failed: {exc}"
print(f"java-codebase-rag: {optimize_error}", file=sys.stderr)
builder = Path(__file__).resolve().parent / "build_ast_graph.py"
if builder.is_file():
try:
Expand Down Expand Up @@ -368,6 +389,10 @@ async def run_refresh_pipeline(*, quiet: bool = False, verbose: bool = True) ->
message = f"cocoindex exit {proc.returncode}"
elif graph_code is not None and graph_code != 0:
message = f"graph builder exit {graph_code}"
# Surface a post-flow optimize failure in the message too (success is not
# flipped — the vectors phase succeeded and the index is still usable).
if optimize_error is not None:
message = optimize_error if message is None else f"{message}; {optimize_error}"
return RefreshIndexOutput(
success=ok and (graph_code is None or graph_code == 0),
exit_code=proc.returncode,
Expand All @@ -378,6 +403,7 @@ async def run_refresh_pipeline(*, quiet: bool = False, verbose: bool = True) ->
graph_stdout=graph_out[-4000:] if len(graph_out) > 4000 else graph_out,
graph_stderr=graph_err[-4000:] if len(graph_err) > 4000 else graph_err,
phases_run=phases_run,
optimize_error=optimize_error,
)


Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"exit_code": 0, "graph_exit_code": 0, "graph_stderr": "", "graph_stdout": "", "message": null, "phases_run": ["vectors", "graph"], "stderr": "", "stdout": "", "success": true}
{"exit_code": 0, "graph_exit_code": 0, "graph_stderr": "", "graph_stdout": "", "message": null, "optimize_error": null, "phases_run": ["vectors", "graph"], "stderr": "", "stdout": "", "success": true}
Loading
Loading