From 15d8c6ba13eb7e0b70ace480e9aaf5cb7b31b75f Mon Sep 17 00:00:00 2001 From: Dmitry Teryaev Date: Sat, 20 Jun 2026 23:42:56 +0300 Subject: [PATCH 1/3] perf(vectors): batch per-file embeddings + offload parse/enrich MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit process_*_file embedded each chunk serially (one `await embedder.embed()` per chunk in a for-loop), so the batched SentenceTransformerEmbedder (max_batch_size=64) only ever saw batches of 1. This wasted its batching within a file and hurt `increment` most: few changed files means little cross-file concurrency, so embeddings ran one-at-a-time. #1 — embed all chunks of a file concurrently via asyncio.gather so the embedder groups them into one model.encode(...). Applied to java/sql/yaml. #2 — for java, move parse_java + per-chunk enrich_chunk off the event loop (asyncio.to_thread) so the loop keeps the embedder's batching queue fed while a file is being parsed. parse_java is serialized by _PARSE_LOCK because it reuses the lru-cached singleton tree-sitter Parser, whose parse() mutates state and is not thread-safe. splitter.split stays inline (shared Rust RecursiveSplitter singleton); enrich_chunk is pure-Python over the immutable AST (lru_cache reads are GIL-safe) and runs without the lock. Output is equivalent: same chunk texts → same normalized vectors; rows are still declared in chunk order; uuids stay random. No schema/ontology/env change. Because process_java_file is @coco.fn(memo=True), the changed body invalidates memo entries, so the first run after upgrade reprocesses (equivalent to a full reindex) — subsequent runs are incremental again. Verified: ruff clean; 838 light tests pass; heavy test_lancedb_e2e (real cocoindex update --full-reprocess + graph + search) passes. One heavy test (test_lancedb_ignore_file_reduces_indexed_java_files) fails, but it fails identically on clean master (FileExistsError in the test's own mkdir, unrelated to this change). Co-Authored-By: Claude --- java_index_flow_lancedb.py | 82 +++++++++++++++++++++++++++++++------- 1 file changed, 68 insertions(+), 14 deletions(-) diff --git a/java_index_flow_lancedb.py b/java_index_flow_lancedb.py index 0f6edac..78fdeed 100644 --- a/java_index_flow_lancedb.py +++ b/java_index_flow_lancedb.py @@ -16,6 +16,7 @@ """ from __future__ import annotations +import asyncio import inspect import os import sys @@ -72,6 +73,16 @@ splitter = RecursiveSplitter() +# ``parse_java`` (ast_java.parse_java) reuses a process-wide, lru-cached +# tree-sitter ``Parser`` (ast_java._parser) whose ``parse()`` mutates internal +# parser state and is NOT safe to call concurrently from multiple threads. +# ``process_java_file`` runs parsing + per-chunk enrichment off the event loop +# (vectors perf lever #2) so the loop stays free to feed the embedder's batching +# queue while a file is being parsed; this lock serializes the non-reentrant +# Parser across those worker threads. Parsing is cheap (ms-scale) so the cost +# of serializing it is negligible — the win is event-loop responsiveness. +_PARSE_LOCK = threading.Lock() + # cocoindex 1.0.7 schedules ``table.optimize()`` (a LanceDB Rewrite/compaction # transaction) as a *background* asyncio task after every # ``num_transactions_before_optimize`` mutation batches (default 50). That @@ -306,6 +317,39 @@ async def _lance_cm() -> AsyncIterator[Any]: yield +def _parse_and_enrich_java( + content_bytes: bytes, + chunks: list[Any], + rel: str, + project_root: Path, +) -> list[Any]: + """Parse one Java file and enrich every chunk, off the event loop. + + Returns a list of :class:`graph_enrich.ChunkEnrichment` aligned 1:1 with + ``chunks``. Intended to run via ``asyncio.to_thread`` from + ``process_java_file`` (vectors perf lever #2): while the worker thread + parses + enriches, the event loop is free to drive other files and keep the + embedder's batching queue fed. + + ``parse_java`` is serialized by ``_PARSE_LOCK`` (shared non-thread-safe + tree-sitter ``Parser``). ``enrich_chunk`` is pure-Python over the now + immutable AST — its ``lru_cache`` reads are thread-safe under the GIL — so + it runs outside the lock and can overlap across files. + """ + with _PARSE_LOCK: + ast = parse_java(content_bytes) + return [ + enrich_chunk( + ast, + chunk_start_byte=ch.start.byte_offset, + chunk_end_byte=ch.end.byte_offset, + file_path=rel, + project_root=project_root, + ) + for ch in chunks + ] + + @coco.fn(memo=True) async def process_java_file( file: localfs.File, @@ -326,6 +370,9 @@ async def process_java_file( language = detect_code_language(filename=file.file_path.path.name) or "text" cs, mn, ov = JAVA_CHUNK + # ``splitter.split`` stays inline: the module-level ``RecursiveSplitter`` + # shares one Rust object, so keeping split on the event loop preserves its + # existing single-threaded access (no new cross-file concurrency hazard). chunks = splitter.split( content, cs, @@ -335,18 +382,21 @@ async def process_java_file( ) rel = file.file_path.path.as_posix() content_bytes = content.encode("utf-8", errors="replace") - ast = parse_java(content_bytes) - for ch in chunks: + # (vectors perf lever #2) parse + enrich off the event loop so the loop can + # keep the embedder's batching queue fed while this file is being parsed. + # ``parse_java`` is lock-serialized internally (shared tree-sitter Parser). + enrichments = await asyncio.to_thread( + _parse_and_enrich_java, content_bytes, chunks, rel, project_root + ) + # (vectors perf lever #1) embed all chunks concurrently so the batched + # embedder groups them into one ``model.encode(...)`` (max_batch_size=64) + # instead of N serial batch-of-1 calls. Dominant win for ``increment`` + # (few changed files → little cross-file concurrency → otherwise no batching). + embeddings = await asyncio.gather(*(embedder.embed(ch.text) for ch in chunks)) + + for ch, enrich, emb in zip(chunks, enrichments, embeddings): rs, re = chunk_key_range(ch) - enrich = enrich_chunk( - ast, - chunk_start_byte=ch.start.byte_offset, - chunk_end_byte=ch.end.byte_offset, - file_path=rel, - project_root=project_root, - ) - emb = await embedder.embed(ch.text) table.declare_row( row=JavaLanceChunk( id=str(uuid.uuid4()), @@ -401,9 +451,11 @@ async def process_sql_file( ) rel = file.file_path.path.as_posix() - for ch in chunks: + # (vectors perf lever #1) embed chunks concurrently → batched encode. + embeddings = await asyncio.gather(*(embedder.embed(ch.text) for ch in chunks)) + + for ch, emb in zip(chunks, embeddings): rs, re = chunk_key_range(ch) - emb = await embedder.embed(ch.text) table.declare_row( row=SqlLanceChunk( id=str(uuid.uuid4()), @@ -448,9 +500,11 @@ async def process_yaml_file( ) rel = file.file_path.path.as_posix() - for ch in chunks: + # (vectors perf lever #1) embed chunks concurrently → batched encode. + embeddings = await asyncio.gather(*(embedder.embed(ch.text) for ch in chunks)) + + for ch, emb in zip(chunks, embeddings): rs, re = chunk_key_range(ch) - emb = await embedder.embed(ch.text) table.declare_row( row=YamlLanceChunk( id=str(uuid.uuid4()), From b810446db8c470d1a35a1c500d2a64de9314113a Mon Sep 17 00:00:00 2001 From: Dmitry Teryaev Date: Sun, 21 Jun 2026 00:10:40 +0300 Subject: [PATCH 2/3] fix(ast): make parse_java thread-safe (per-thread Parser) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Code review of the vectors perf change found a race it introduced: _parse_and_enrich_java runs enrich_chunk outside _PARSE_LOCK on concurrent worker threads, and enrich_chunk -> collect_annotation_meta_chain (lru_cached, non-serializing) -> _collect_annotation_decl_index -> parse_java. On a cold cache, multiple threads hit parse_java concurrently on the shared @lru_cache tree-sitter Parser, whose parse() mutates state and is not thread-safe -> corrupt AST or native segfault. (Before the perf change, enrich ran on the single event-loop thread, so this was serialized.) Root-cause fix at the source: _parser() now returns a per-thread Parser (threading.local) instead of a process-wide lru_cached singleton. Each OS thread gets its own Parser; the Language is immutable and shared. This protects EVERY parse_java caller — the flow's direct parse, enrich's transitive parse, and build_ast_graph — and preserves parse parallelism rather than serializing it. Removes the now-redundant _PARSE_LOCK from the flow file. Added tests/test_ast_java_thread_safety.py: 16 threads × 60 parses must each match the single-threaded reference (locks the invariant in; a revert to a shared Parser would corrupt/crash here). Verified: ruff clean; 839 light tests pass (+1 new); heavy test_lancedb_e2e main path (full reprocess + graph + search) passes. The pre-existing test_lancedb_ignore_file_reduces_indexed_java_files failure is unchanged (its own mkdir bug, fails on master). Co-Authored-By: Claude --- ast_java.py | 21 +++++++-- java_index_flow_lancedb.py | 25 ++++------- tests/test_ast_java_thread_safety.py | 66 ++++++++++++++++++++++++++++ 3 files changed, 91 insertions(+), 21 deletions(-) create mode 100644 tests/test_ast_java_thread_safety.py diff --git a/ast_java.py b/ast_java.py index f6bf063..c3dc239 100644 --- a/ast_java.py +++ b/ast_java.py @@ -14,8 +14,8 @@ import posixpath import sys +import threading from dataclasses import dataclass, field -from functools import lru_cache from typing import Iterable import tree_sitter_java as _ts_java @@ -215,10 +215,23 @@ }) -@lru_cache(maxsize=1) +# tree-sitter's ``Parser`` mutates internal state during ``parse()`` and is NOT +# thread-safe, so each OS thread gets its own instance. ``parse_java`` is called +# concurrently from worker threads when indexing runs with cocoindex's inflight +# parallelism — both directly (java_index_flow_lancedb.py: process_java_file +# offloads parse+enrich to asyncio.to_thread) and transitively +# (graph_enrich.collect_annotation_meta_chain -> _collect_annotation_decl_index +# -> parse_java, reached from enrich_chunk). The ``Language`` is immutable and +# shared; per-thread ``Parser`` construction is lazy and cheap (once per thread), +# which also preserves parse parallelism instead of serializing it. +_parser_tls = threading.local() + + def _parser() -> Parser: - lang = Language(_ts_java.language()) - return Parser(lang) + p = getattr(_parser_tls, "parser", None) + if p is None: + _parser_tls.parser = p = Parser(Language(_ts_java.language())) + return p # ---------- dataclasses ---------- diff --git a/java_index_flow_lancedb.py b/java_index_flow_lancedb.py index 78fdeed..bffd8bb 100644 --- a/java_index_flow_lancedb.py +++ b/java_index_flow_lancedb.py @@ -73,16 +73,6 @@ splitter = RecursiveSplitter() -# ``parse_java`` (ast_java.parse_java) reuses a process-wide, lru-cached -# tree-sitter ``Parser`` (ast_java._parser) whose ``parse()`` mutates internal -# parser state and is NOT safe to call concurrently from multiple threads. -# ``process_java_file`` runs parsing + per-chunk enrichment off the event loop -# (vectors perf lever #2) so the loop stays free to feed the embedder's batching -# queue while a file is being parsed; this lock serializes the non-reentrant -# Parser across those worker threads. Parsing is cheap (ms-scale) so the cost -# of serializing it is negligible — the win is event-loop responsiveness. -_PARSE_LOCK = threading.Lock() - # cocoindex 1.0.7 schedules ``table.optimize()`` (a LanceDB Rewrite/compaction # transaction) as a *background* asyncio task after every # ``num_transactions_before_optimize`` mutation batches (default 50). That @@ -331,13 +321,14 @@ def _parse_and_enrich_java( parses + enriches, the event loop is free to drive other files and keep the embedder's batching queue fed. - ``parse_java`` is serialized by ``_PARSE_LOCK`` (shared non-thread-safe - tree-sitter ``Parser``). ``enrich_chunk`` is pure-Python over the now - immutable AST — its ``lru_cache`` reads are thread-safe under the GIL — so - it runs outside the lock and can overlap across files. + Thread-safety: ``parse_java`` uses a per-thread tree-sitter ``Parser`` + (see ``ast_java._parser``), so it is safe to call concurrently from these + worker threads — including the transitive ``parse_java`` that ``enrich_chunk`` + triggers via ``collect_annotation_meta_chain`` → ``_collect_annotation_decl_index``. + ``enrich_chunk`` is otherwise pure-Python over the now-immutable AST; its + ``lru_cache`` reads are thread-safe under the GIL. """ - with _PARSE_LOCK: - ast = parse_java(content_bytes) + ast = parse_java(content_bytes) return [ enrich_chunk( ast, @@ -385,7 +376,7 @@ async def process_java_file( # (vectors perf lever #2) parse + enrich off the event loop so the loop can # keep the embedder's batching queue fed while this file is being parsed. - # ``parse_java`` is lock-serialized internally (shared tree-sitter Parser). + # parse_java is thread-safe (per-thread tree-sitter Parser in ast_java). enrichments = await asyncio.to_thread( _parse_and_enrich_java, content_bytes, chunks, rel, project_root ) diff --git a/tests/test_ast_java_thread_safety.py b/tests/test_ast_java_thread_safety.py new file mode 100644 index 0000000..dc5e3f2 --- /dev/null +++ b/tests/test_ast_java_thread_safety.py @@ -0,0 +1,66 @@ +"""Regression test: ``parse_java`` must be safe to call from multiple threads. + +``ast_java._parser()`` returns a **per-thread** tree-sitter ``Parser`` because +``Parser.parse()`` mutates internal parser state and is not thread-safe on a +shared instance. ``parse_java`` is now reached concurrently from worker threads +when indexing runs with cocoindex's inflight parallelism (both directly from +``process_java_file`` and transitively from ``enrich_chunk`` → +``collect_annotation_meta_chain``). This test locks that invariant in: a future +change that reverts to a single shared ``Parser`` would corrupt parses here +(wrong counts / ``parse_error`` / native crash). +""" +from __future__ import annotations + +from concurrent.futures import ThreadPoolExecutor + +from ast_java import parse_java + +_SRC_A = b""" +package com.example.alpha; + +import java.util.List; + +public class Alpha { + private final Beta beta; + public Alpha(Beta beta) { this.beta = beta; } + public void run(int n) { + for (int i = 0; i < n; i++) { beta.handle(i); } + } +} +""" + +_SRC_B = b""" +package com.example.beta; + +public class Beta { + public void handle(int x) { System.out.println(x); } + protected int compute(long a, long b) { return (int)(a + b); } +} +""" + + +def _facts(src: bytes) -> tuple[str, int, int]: + """Stable structural fingerprint: (package, #types, #methods).""" + ast = parse_java(src) + methods = sum(len(t.methods) for t in ast.all_types) + return (ast.package, len(ast.all_types), methods) + + +def test_parse_java_concurrent_matches_single_threaded() -> None: + ref_a = _facts(_SRC_A) + ref_b = _facts(_SRC_B) + # Loose sanity: the single-threaded references must be non-trivial and + # distinct, so the equality check below is actually exercising something. + assert ref_a[1] >= 1 and ref_a[2] >= 1 + assert ref_b[1] >= 1 and ref_b[2] >= 1 + assert ref_a != ref_b + + # 16 threads each parse both sources 60×; every result must match the + # single-threaded reference. A shared Parser would corrupt some parses. + def worker() -> bool: + return all(_facts(_SRC_A) == ref_a and _facts(_SRC_B) == ref_b for _ in range(60)) + + with ThreadPoolExecutor(max_workers=16) as ex: + results = list(ex.map(lambda _: worker(), range(16))) + + assert all(results) From e7b63ec62d6d8b01a525294ffc3a3b2496e51739 Mon Sep 17 00:00:00 2001 From: Dmitry Teryaev Date: Sun, 21 Jun 2026 00:19:27 +0300 Subject: [PATCH 3/3] perf(vectors): warm per-project enrich caches before fanning out to threads MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Second code-review round found a perf regression introduced by lever #2: making enrich_chunk concurrent meant the first wave of worker threads each cold-miss collect_annotation_meta_chain (lru_cached, non-serializing) and each redundantly walk+parse the ENTIRE project (~min(32,cpu+4)x redundant full-project parses at cold start, GIL-serialized into the critical path) — directly offsetting the embedding-batching win on large repos. Fix: warm collect_annotation_meta_chain + load_brownfield_overrides ONCE on the event-loop thread in app_main, before coco.mount_each fans files into worker threads. Key derivation mirrors enrich_chunk exactly so the warmed entries are the ones workers hit; with warming every worker gets a cache hit (lru_cache reads are thread-safe). Wrapped so a warm-up failure never breaks indexing (falls back to lazy per-thread misses = prior behavior). The same review round confirmed ZERO correctness races remain: the per-thread Parser covers every parse_java call site (incl. enrich's transitive parse), and an exhaustive walk of enrich_chunk's callees found no other non-thread-safe shared state. Verified: ruff clean; heavy test_lancedb_e2e main path passes. Co-Authored-By: Claude --- java_index_flow_lancedb.py | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/java_index_flow_lancedb.py b/java_index_flow_lancedb.py index bffd8bb..3308118 100644 --- a/java_index_flow_lancedb.py +++ b/java_index_flow_lancedb.py @@ -51,7 +51,7 @@ ) from path_filtering import LayeredIgnore from ast_java import ONTOLOGY_VERSION, parse_java -from graph_enrich import enrich_chunk +from graph_enrich import collect_annotation_meta_chain, enrich_chunk, load_brownfield_overrides # Older cocoindex (e.g. 1.0.0a43) uses ``tracked=False``; newer releases renamed # the flag to ``detect_change`` (default False) and reject ``tracked``. @@ -546,6 +546,26 @@ async def app_main() -> None: ) project_root = coco.use_context(PROJECT_ROOT) + # Warm per-project enrichment caches ONCE on the event-loop thread, BEFORE + # coco.mount_each fans files into worker threads. collect_annotation_meta_chain + # and load_brownfield_overrides are lru_cached per (resolved) project root; + # without warming, the first wave of concurrent process_java_file worker + # threads each cold-miss and redundantly walk+parse the ENTIRE project (a + # thundering herd that would offset the embedding-batching win on large + # repos — perf lever #2 made enrich concurrent). With warming, every worker + # hits a populated cache (lru_cache reads are thread-safe). Key derivation + # mirrors enrich_chunk exactly so the warmed entries are the ones workers hit. + try: + load_brownfield_overrides(project_root) + try: + prs = str(Path(project_root).resolve()) + except OSError: + prs = str(project_root) + collect_annotation_meta_chain(prs) + except Exception: + # Warm-up must never break indexing — a failure just means workers + # cold-miss lazily (the pre-warming behavior). Swallow and continue. + pass _ignore = LayeredIgnore(project_root) _walk_excludes = _ignore.cocoindex_excluded_patterns() # Emit ONE approximate total so the parent's renderer can show a determinate