diff --git a/AGENTS.md b/AGENTS.md index 89f6d824..d85ad923 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -43,7 +43,7 @@ when needed. variables, full `.java-codebase-rag.yml` reference, **graph layer** (node kinds, edges, capabilities, ranking, "Re-index required" callouts), brownfield overrides, ignore patterns. The current - `ontology_version` is **15** (`EDGE_SCHEMA` in `java_ontology.py`; + `ontology_version` is **17** (`EDGE_SCHEMA` in `java_ontology.py`; material `OVERRIDES` Symbol→Symbol edges: subtype instance method → supertype declaration with matching `signature`, one `IMPLEMENTS`/`EXTENDS` hop; valid `neighbors` `EdgeType`). @@ -190,7 +190,7 @@ template): `VALID_RESOLVE_REASONS`, `VALID_UNRESOLVED_CALL_REASONS`. - Schema changes that affect the Lance index or Kuzu graph need a matching update to the README "Re-index required" callout. Bump - `ontology_version` when enrichment semantics change (currently **15**). + `ontology_version` when enrichment semantics change (currently **17**). - Brownfield is a first-class surface: any new auto-detection (route, role, capability, http client, async producer) must compose with the matching `BrownfieldOverrides` layer. Last writer wins (outermost layer diff --git a/README.md b/README.md index 81dea4ef..f880d634 100644 --- a/README.md +++ b/README.md @@ -168,7 +168,7 @@ Run `java-codebase-rag --help` to list grouped subcommands. Operator playbook wi | Group | Subcommand | What it does | |---|---|---| | Lifecycle | `init` | First-time index. Refuses if artifacts already exist. | -| Lifecycle | `increment` | CocoIndex catch-up (Lance only); Kuzu stays stale until `reprocess`. | +| Lifecycle | `increment` | CocoIndex catch-up + incremental Kuzu update. `--vectors-only` for Lance only. | | Lifecycle | `reprocess` | Full Lance + Kuzu rebuild. `--vectors-only` / `--graph-only` for a single phase. | | Lifecycle | `erase` | Delete index artifacts. Requires `--yes` or TTY confirm. | | Introspection | `meta`, `tables`, `diagnose-ignore`, `unresolved-calls` | Health, table listing, ignore-layer diagnostics, receiver-failure call sites. | @@ -212,5 +212,4 @@ The default embedding model is `sentence-transformers/all-MiniLM-L6-v2` (downloa - `get_service_topology` — microservice-level summary aggregating `HTTP_CALLS` / `ASYNC_CALLS`. - Agentic routing layer (query classifier → vector / graph / both). -- Incremental Kuzu updates (per-changed-file). - Optional `codegraph_nodes` LanceDB table embedding symbol summaries so the graph itself is vector-searchable. diff --git a/ast_java.py b/ast_java.py index 0626125e..7bab8320 100644 --- a/ast_java.py +++ b/ast_java.py @@ -83,7 +83,7 @@ # Phase 11: `EDGE_SCHEMA` in `java_ontology.py` (canonical edge navigation schema; v14 re-index). # Phase 12: CALLS `callee_declaring_role`, supertype-walk dedup, pass3 unresolved counters (v15 re-index). # Bumps whenever extraction / enrichment semantics change. -ONTOLOGY_VERSION = 16 +ONTOLOGY_VERSION = 17 ROLE_ANNOTATIONS: dict[str, str] = { # Spring Web diff --git a/build_ast_graph.py b/build_ast_graph.py index 0d11db80..f83787f6 100644 --- a/build_ast_graph.py +++ b/build_ast_graph.py @@ -401,6 +401,330 @@ class GraphTables: type_role_by_node_id: dict[str, str] = field(default_factory=dict) +@dataclass +class IncrementalResult: + """Result of an incremental graph rebuild.""" + mode: str # "incremental" | "full_fallback" + files_changed: int + files_added: int + files_removed: int + dependents_reprocessed: int + elapsed_sec: float + + +class FileHashTracker: + """Track content hashes for incremental graph rebuild.""" + def __init__(self, index_dir: Path): + self._path = index_dir / ".graph_hashes.json" + self._hashes: dict[str, str] = {} # rel_path -> sha256_hex + + def load(self) -> None: + """Load hashes from disk. No-op if file missing (first run).""" + if not self._path.exists(): + return + try: + with open(self._path, "r", encoding="utf-8") as f: + self._hashes = json.load(f) + except (json.JSONDecodeError, OSError): + # Corrupt or unreadable hash file; start fresh. + self._hashes = {} + + def save(self) -> None: + """Persist hashes to disk atomically (write .tmp, rename).""" + tmp_path = self._path.with_suffix(".json.tmp") + try: + with open(tmp_path, "w", encoding="utf-8") as f: + json.dump(self._hashes, f, sort_keys=True) + os.replace(tmp_path, self._path) + except OSError as e: + # Fail gracefully; next run will treat as missing and rebuild. + log.warning("Failed to save hash file %s: %s; next run will rebuild from scratch", self._path, e) + + def detect_changes(self, source_root: Path, ignore: LayeredIgnore) -> tuple[set[str], set[str], set[str]]: + """Return (added, changed, removed) sets of relative POSIX paths.""" + current_files: set[str] = set() + # Resolve source_root to handle symlinks + source_root_resolved = source_root.resolve() + for abs_path in iter_java_source_files(source_root, ignore=ignore): + # Resolve the absolute path and compute relative path + abs_path_resolved = abs_path.resolve() + try: + rel_path = abs_path_resolved.relative_to(source_root_resolved).as_posix() + except ValueError: + # Fallback to using the path as-is if it's not under source_root + rel_path = abs_path.as_posix() + current_files.add(rel_path) + + added: set[str] = set() + changed: set[str] = set() + removed: set[str] = set() + + # Detect added and changed files. + for rel_path in current_files: + abs_path = source_root / rel_path + try: + file_hash = _hash_file(abs_path) + except FileNotFoundError: + continue + stored_hash = self._hashes.get(rel_path) + if stored_hash is None: + added.add(rel_path) + elif stored_hash != file_hash: + changed.add(rel_path) + + # Detect removed files. + for rel_path in self._hashes: + if rel_path not in current_files: + removed.add(rel_path) + + return added, changed, removed + + def update(self, rel_paths: set[str], source_root: Path) -> None: + """Compute and store hashes for the given paths.""" + for rel_path in rel_paths: + abs_path = source_root / rel_path + if abs_path.exists(): + self._hashes[rel_path] = _hash_file(abs_path) + + +def _hash_file(abs_path: Path) -> str: + """Compute SHA-256 hash of a file's raw bytes.""" + hasher = hashlib.sha256() + with open(abs_path, "rb") as f: + for chunk in iter(lambda: f.read(65536), b""): + hasher.update(chunk) + return hasher.hexdigest() + + +# ---------- incremental rebuild helpers ---------- + + +def _load_existing_types(conn: kuzu.Connection, tables: GraphTables, exclude_files: set[str] | None = None) -> None: + """Load type entries from existing Kuzu graph into tables for cross-file resolution. + + When exclude_files is provided, only load types from files NOT in the set. + """ + if exclude_files is not None and not exclude_files: + return + + where = "WHERE s.kind IN ['class', 'interface', 'enum', 'annotation', 'record']" + params: dict = {} + if exclude_files: + where += "\n AND NOT (s.filename IN $exclude_files)" + params["exclude_files"] = list(exclude_files) + + query = f""" + MATCH (s:Symbol) + {where} + RETURN s.kind, s.fqn, s.name, s.filename, s.module, s.microservice, s.id + """ + result = conn.execute(query, params) + while result.has_next(): + row = result.get_next() + kind, fqn, name, filename = row[0], row[1], row[2], row[3] + module = row[4] if len(row) > 4 else "" + microservice = row[5] if len(row) > 5 else "" + node_id = row[6] if len(row) > 6 else "" + + decl = TypeDecl(name, kind, fqn) + package = fqn[: -(len(name) + 1)] if fqn.endswith("." + name) else "" + + entry = TypeIndexEntry( + decl=decl, + file_path=filename, + module=module, + microservice=microservice, + package=package, + outer_fqn=None, + node_id=node_id, + ) + tables.types[fqn] = entry + tables.by_simple_name.setdefault(name, []).append(entry) + tables.by_package.setdefault(package, []).append(entry) + + +def _load_existing_members(conn: kuzu.Connection, tables: GraphTables, exclude_files: set[str] | None = None) -> None: + """Load member entries from existing Kuzu graph into tables.members. + + When exclude_files is provided, only load members from files NOT in the set. + """ + if exclude_files is not None and not exclude_files: + return + + where = "WHERE s.kind IN ['method', 'constructor']" + params: dict = {} + if exclude_files: + where += "\n AND NOT (s.filename IN $exclude_files)" + params["exclude_files"] = list(exclude_files) + + query = f""" + MATCH (s:Symbol) + {where} + RETURN s.kind, s.name, s.filename, s.signature, s.parent_id, s.fqn, s.id + """ + result = conn.execute(query, params) + while result.has_next(): + row = result.get_next() + kind, name, filename = row[0], row[1], row[2] + signature = row[3] if len(row) > 3 else "" + parent_id = row[4] if len(row) > 4 else "" + fqn = row[5] if len(row) > 5 else "" + node_id = row[6] if len(row) > 6 else "" + + parent_fqn = fqn.split("#")[0] if "#" in fqn else "" + + decl = MethodDecl(name, "", kind == "constructor") + decl.signature = signature + + tables.members.append(MemberEntry( + kind=kind, + decl=decl, + parent_id=parent_id, + parent_fqn=parent_fqn, + file_path=filename, + module="", + microservice="", + node_id=node_id, + )) + + +def _find_dependents(conn: kuzu.Connection, changed_node_ids: set[str]) -> set[str]: + """Find files whose nodes have edges pointing into changed nodes. Returns set of filenames.""" + dependent_files: set[str] = set() + + # Query each Symbol-to-Symbol edge table for incoming edges + edge_types = ["EXTENDS", "IMPLEMENTS", "INJECTS", "CALLS", "DECLARES", "OVERRIDES"] + params = {"changed_ids": list(changed_node_ids)} + + for edge_type in edge_types: + query = f""" + MATCH (src:Symbol)-[e:{edge_type}]->(dst:Symbol) + WHERE dst.id IN $changed_ids + RETURN DISTINCT src.filename + """ + result = conn.execute(query, params) + while result.has_next(): + row = result.get_next() + filename = row[0] + if filename: # Skip phantom nodes (filename = "") + dependent_files.add(filename) + + return dependent_files + + +def _delete_file_scope(conn: kuzu.Connection, filenames: set[str]) -> None: + """Delete all nodes and edges originating from the given files. + + Skip phantom nodes (filename=""). Deletes ALL edge types in Phase 1, + then nodes in subsequent phases. Route/Client/Producer nodes use + DETACH DELETE as a safety net for any edges missed in Phase 1. + + Edges are deleted in batch across all filenames first to avoid Kuzu + "has connected edges" errors when edges from one file point to nodes + in another file within the same scope. + """ + filename_list = list(filenames) + + # Phase 1: Delete ALL edges from ALL scope files at once. + # This avoids ordering issues where file A has an edge from file B + # pointing into it; if we delete A's nodes before B's edges, Kuzu + # raises "has connected edges" errors. + edge_tables = [ + "EXTENDS", "IMPLEMENTS", "INJECTS", "CALLS", "DECLARES", "OVERRIDES", + "UNRESOLVED_AT", "EXPOSES", "DECLARES_CLIENT", "DECLARES_PRODUCER", + "HTTP_CALLS", "ASYNC_CALLS", + ] + for edge_type in edge_tables: + query = f""" + MATCH (src)-[e:{edge_type}]->(dst) + WHERE e.source_file IN $filenames + DELETE e + """ + conn.execute(query, {"filenames": filename_list}) + + # Phase 2: Collect all Symbol node IDs for UnresolvedCallSite cleanup. + symbol_ids: list[str] = [] + symbol_ids_query = """ + MATCH (s:Symbol) + WHERE s.filename IN $filenames + RETURN s.id + """ + result = conn.execute(symbol_ids_query, {"filenames": filename_list}) + while result.has_next(): + row = result.get_next() + symbol_ids.append(row[0]) + + # Delete UnresolvedCallSite nodes whose caller_id is in the collected set + if symbol_ids: + unresolved_query = """ + MATCH (u:UnresolvedCallSite) + WHERE u.caller_id IN $symbol_ids + DELETE u + """ + conn.execute(unresolved_query, {"symbol_ids": symbol_ids}) + + # Phase 3: Delete Symbol nodes. + delete_symbols_query = """ + MATCH (s:Symbol) + WHERE s.filename IN $filenames + DELETE s + """ + conn.execute(delete_symbols_query, {"filenames": filename_list}) + + # Phase 4: Delete Route, Client, Producer nodes. + # Use DETACH DELETE as a safety net in case any edges were missed in Phase 1. + for label in ["Route", "Client", "Producer"]: + conn.execute( + f"MATCH (n:{label}) WHERE n.filename IN $filenames DETACH DELETE n", + {"filenames": filename_list}, + ) + + +def _scoped_write(conn: kuzu.Connection, tables: GraphTables, *, project_root: Path, meta_chain: dict[str, frozenset[str]] | None) -> None: + """Write nodes and edges to existing Kuzu database without drop/create schema. + + Like write_kuzu() but without _drop_all()/_create_schema(). The caller is + responsible for calling _populate_declares_rows() and _populate_overrides_rows() + before invoking this function. + + Uses MERGE instead of CREATE to handle cases where nodes already exist. + """ + t0 = time.time() + _write_nodes_merge( + conn, + tables, + project_root=project_root, + meta_chain=meta_chain, + ) + elapsed = time.time() - t0 + if elapsed > 0.1: # Only log if significant + _verbose_stderr_line(f"[graph] scoped write · nodes written in {elapsed:.2f}s") + + t1 = time.time() + _fbyid = _build_file_by_node_id(tables) + _write_edges(conn, tables, _fbyid) + elapsed = time.time() - t1 + if elapsed > 0.1: + _verbose_stderr_line(f"[graph] scoped write · edges written in {elapsed:.2f}s") + + t2 = time.time() + _write_routes_and_exposes(conn, tables, _fbyid) + elapsed = time.time() - t2 + if elapsed > 0.1: + _verbose_stderr_line(f"[graph] scoped write · routes/exposes written in {elapsed:.2f}s") + + +def _write_nodes_merge( + conn: kuzu.Connection, + tables: GraphTables, + *, + project_root: Path, + meta_chain: dict[str, frozenset[str]] | None, +) -> None: + """Write nodes to existing Kuzu database using MERGE to handle existing nodes.""" + _write_nodes_impl(conn, tables, project_root=project_root, meta_chain=meta_chain, symbol_query=_MERGE_SYMBOL) + + # ---------- file walk (see `path_filtering.iter_java_source_files`) ---------- @@ -461,8 +785,15 @@ def _register_type( return entry -def pass1_parse(root: Path, tables: GraphTables, *, verbose: bool) -> dict[str, JavaFileAst]: - """Walk files, parse them, populate node indexes. Returns path -> AST.""" +def pass1_parse(root: Path, tables: GraphTables, *, verbose: bool, scope_files: set[str] | None = None) -> dict[str, JavaFileAst]: + """Walk files, parse them, populate node indexes. Returns path -> AST. + + Args: + root: Source root directory. + tables: GraphTables to populate. + verbose: Whether to emit progress output. + scope_files: Optional set of relative POSIX paths to parse. If None, parse all files. + """ asts: dict[str, JavaFileAst] = {} ignore = LayeredIgnore(root) t0 = time.time() @@ -480,6 +811,13 @@ def pass1_parse(root: Path, tables: GraphTables, *, verbose: bool) -> dict[str, if verbose and slow_sec > 0: time.sleep(slow_sec) for p in iter_java_source_files(root, ignore=ignore): + # Skip files not in scope (if scope is provided) + try: + rel = p.resolve().relative_to(root.resolve()).as_posix() + except ValueError: + rel = p.as_posix() + if scope_files is not None and rel not in scope_files: + continue n_files += 1 try: content = p.read_bytes() @@ -488,10 +826,6 @@ def pass1_parse(root: Path, tables: GraphTables, *, verbose: bool) -> dict[str, continue if not content.strip(): continue - try: - rel = p.resolve().relative_to(root.resolve()).as_posix() - except ValueError: - rel = p.as_posix() try: ast = parse_java(content, filename=rel, verbose=verbose) except Exception: @@ -2414,22 +2748,22 @@ def _micro_factor(member: MemberEntry | None) -> float: _SCHEMA_EXTENDS = ( "CREATE REL TABLE EXTENDS(FROM Symbol TO Symbol, " - "dst_name STRING, dst_fqn STRING, resolved BOOLEAN)" + "source_file STRING, dst_name STRING, dst_fqn STRING, resolved BOOLEAN)" ) _SCHEMA_IMPLEMENTS = ( "CREATE REL TABLE IMPLEMENTS(FROM Symbol TO Symbol, " - "dst_name STRING, dst_fqn STRING, resolved BOOLEAN)" + "source_file STRING, dst_name STRING, dst_fqn STRING, resolved BOOLEAN)" ) _SCHEMA_INJECTS = ( "CREATE REL TABLE INJECTS(FROM Symbol TO Symbol, " - "dst_name STRING, dst_fqn STRING, resolved BOOLEAN, " + "source_file STRING, dst_name STRING, dst_fqn STRING, resolved BOOLEAN, " "mechanism STRING, annotation STRING, field_or_param STRING)" ) -_SCHEMA_DECLARES = "CREATE REL TABLE DECLARES(FROM Symbol TO Symbol)" -_SCHEMA_OVERRIDES = "CREATE REL TABLE OVERRIDES(FROM Symbol TO Symbol)" +_SCHEMA_DECLARES = "CREATE REL TABLE DECLARES(FROM Symbol TO Symbol, source_file STRING)" +_SCHEMA_OVERRIDES = "CREATE REL TABLE OVERRIDES(FROM Symbol TO Symbol, source_file STRING)" _SCHEMA_CALLS = ( "CREATE REL TABLE CALLS(FROM Symbol TO Symbol, " - "call_site_line INT64, call_site_byte INT64, arg_count INT64, " + "source_file STRING, call_site_line INT64, call_site_byte INT64, arg_count INT64, " "confidence DOUBLE, strategy STRING, source STRING, resolved BOOLEAN, " "callee_declaring_role STRING)" ) @@ -2439,27 +2773,27 @@ def _micro_factor(member: MemberEntry | None) -> float: "arg_count INT64, callee_simple STRING, receiver_expr STRING, reason STRING, " "PRIMARY KEY(id))" ) -_SCHEMA_UNRESOLVED_AT = "CREATE REL TABLE UNRESOLVED_AT(FROM Symbol TO UnresolvedCallSite)" +_SCHEMA_UNRESOLVED_AT = "CREATE REL TABLE UNRESOLVED_AT(FROM Symbol TO UnresolvedCallSite, source_file STRING)" _SCHEMA_EXPOSES = ( "CREATE REL TABLE EXPOSES(FROM Symbol TO Route, " - "confidence DOUBLE, strategy STRING)" + "source_file STRING, confidence DOUBLE, strategy STRING)" ) _SCHEMA_DECLARES_CLIENT = ( "CREATE REL TABLE DECLARES_CLIENT(FROM Symbol TO Client, " - "confidence DOUBLE, strategy STRING)" + "source_file STRING, confidence DOUBLE, strategy STRING)" ) _SCHEMA_DECLARES_PRODUCER = ( "CREATE REL TABLE DECLARES_PRODUCER(FROM Symbol TO Producer, " - "confidence DOUBLE, strategy STRING)" + "source_file STRING, confidence DOUBLE, strategy STRING)" ) _SCHEMA_HTTP_CALLS = ( "CREATE REL TABLE HTTP_CALLS(FROM Client TO Route, " - "confidence DOUBLE, strategy STRING, " + "source_file STRING, confidence DOUBLE, strategy STRING, " "method_call STRING, raw_uri STRING, match STRING)" ) _SCHEMA_ASYNC_CALLS = ( "CREATE REL TABLE ASYNC_CALLS(FROM Producer TO Route, " - "confidence DOUBLE, strategy STRING, " + "source_file STRING, confidence DOUBLE, strategy STRING, " "direction STRING, raw_topic STRING, match STRING)" ) @@ -2538,13 +2872,25 @@ def _node_row(**kwargs) -> dict: "role: $role, signature: $signature, parent_id: $parent_id, resolved: $resolved})" ) +_MERGE_SYMBOL = ( + "MERGE (n:Symbol {id: $id}) " + "SET n.kind = $kind, n.name = $name, n.fqn = $fqn, " + "n.package = $package, n.module = $module, n.microservice = $microservice, " + "n.filename = $filename, " + "n.start_line = $start_line, n.end_line = $end_line, " + "n.start_byte = $start_byte, n.end_byte = $end_byte, " + "n.modifiers = $modifiers, n.annotations = $annotations, n.capabilities = $capabilities, " + "n.role = $role, n.signature = $signature, n.parent_id = $parent_id, n.resolved = $resolved" +) -def _write_nodes( + +def _write_nodes_impl( conn: kuzu.Connection, tables: GraphTables, *, project_root: Path, meta_chain: dict[str, frozenset[str]] | None, + symbol_query: str, ) -> None: overrides = load_brownfield_overrides(project_root) try: @@ -2555,12 +2901,12 @@ def _write_nodes( mch = meta_chain # packages for pkg, pid in tables.packages.items(): - conn.execute(_CREATE_SYMBOL, _node_row( + conn.execute(symbol_query, _node_row( id=pid, kind="package", name=pkg.rsplit(".", 1)[-1], fqn=pkg, package=pkg, )) # files for path, fid in tables.files.items(): - conn.execute(_CREATE_SYMBOL, _node_row( + conn.execute(symbol_query, _node_row( id=fid, kind="file", name=Path(path).name, fqn=path, filename=path, )) # types @@ -2572,7 +2918,7 @@ def _write_nodes( meta_chain=mch, ) tables.type_role_by_node_id[entry.node_id] = role - conn.execute(_CREATE_SYMBOL, _node_row( + conn.execute(symbol_query, _node_row( id=entry.node_id, kind=d.kind, name=d.name, fqn=d.fqn, package=entry.package, module=entry.module, microservice=entry.microservice, @@ -2588,7 +2934,7 @@ def _write_nodes( )) # members (methods / constructors) for m in tables.members: - conn.execute(_CREATE_SYMBOL, _node_row( + conn.execute(symbol_query, _node_row( id=m.node_id, kind=m.kind, name=m.decl.name, fqn=f"{m.parent_fqn}#{m.decl.signature}", package=tables.types[m.parent_fqn].package if m.parent_fqn in tables.types else "", @@ -2602,33 +2948,44 @@ def _write_nodes( )) # phantoms for pid, row in tables.phantoms.items(): - conn.execute(_CREATE_SYMBOL, row) + conn.execute(symbol_query, row) + + +def _write_nodes( + conn: kuzu.Connection, + tables: GraphTables, + *, + project_root: Path, + meta_chain: dict[str, frozenset[str]] | None, +) -> None: + _write_nodes_impl(conn, tables, project_root=project_root, meta_chain=meta_chain, symbol_query=_CREATE_SYMBOL) _CREATE_EXT = ( "MATCH (a:Symbol {id: $src}), (b:Symbol {id: $dst}) " - "CREATE (a)-[:EXTENDS {dst_name: $dst_name, dst_fqn: $dst_fqn, resolved: $resolved}]->(b)" + "CREATE (a)-[:EXTENDS {source_file: $source_file, dst_name: $dst_name, dst_fqn: $dst_fqn, resolved: $resolved}]->(b)" ) _CREATE_IMPL = ( "MATCH (a:Symbol {id: $src}), (b:Symbol {id: $dst}) " - "CREATE (a)-[:IMPLEMENTS {dst_name: $dst_name, dst_fqn: $dst_fqn, resolved: $resolved}]->(b)" + "CREATE (a)-[:IMPLEMENTS {source_file: $source_file, dst_name: $dst_name, dst_fqn: $dst_fqn, resolved: $resolved}]->(b)" ) _CREATE_INJ = ( "MATCH (a:Symbol {id: $src}), (b:Symbol {id: $dst}) " - "CREATE (a)-[:INJECTS {dst_name: $dst_name, dst_fqn: $dst_fqn, resolved: $resolved, " + "CREATE (a)-[:INJECTS {source_file: $source_file, dst_name: $dst_name, dst_fqn: $dst_fqn, resolved: $resolved, " "mechanism: $mechanism, annotation: $annotation, field_or_param: $field_or_param}]->(b)" ) _CREATE_DECL = ( "MATCH (a:Symbol {id: $src}), (b:Symbol {id: $dst}) " - "CREATE (a)-[:DECLARES]->(b)" + "CREATE (a)-[:DECLARES {source_file: $source_file}]->(b)" ) _CREATE_OVERRIDES = ( "MATCH (a:Symbol {id: $src}), (b:Symbol {id: $dst}) " - "CREATE (a)-[:OVERRIDES]->(b)" + "CREATE (a)-[:OVERRIDES {source_file: $source_file}]->(b)" ) _CREATE_CALL = ( "MATCH (a:Symbol {id: $src}), (b:Symbol {id: $dst}) " "CREATE (a)-[:CALLS {" + "source_file: $source_file, " "call_site_line: $line, call_site_byte: $byte, arg_count: $argc, " "confidence: $conf, strategy: $strat, source: $src_kind, resolved: $resolved, " "callee_declaring_role: $callee_declaring_role" @@ -2656,11 +3013,11 @@ def _write_nodes( _CREATE_EXPOSES = ( "MATCH (s:Symbol {id: $sid}), (r:Route {id: $rid}) " - "CREATE (s)-[:EXPOSES {confidence: $confidence, strategy: $strategy}]->(r)" + "CREATE (s)-[:EXPOSES {source_file: $source_file, confidence: $confidence, strategy: $strategy}]->(r)" ) _CREATE_DECLARES_CLIENT = ( "MATCH (s:Symbol {id: $sid}), (c:Client {id: $cid}) " - "CREATE (s)-[:DECLARES_CLIENT {confidence: $confidence, strategy: $strategy}]->(c)" + "CREATE (s)-[:DECLARES_CLIENT {source_file: $source_file, confidence: $confidence, strategy: $strategy}]->(c)" ) _CREATE_PRODUCER = ( "CREATE (:Producer {" @@ -2673,16 +3030,16 @@ def _write_nodes( ) _CREATE_DECLARES_PRODUCER = ( "MATCH (s:Symbol {id: $sid}), (p:Producer {id: $pid}) " - "CREATE (s)-[:DECLARES_PRODUCER {confidence: $confidence, strategy: $strategy}]->(p)" + "CREATE (s)-[:DECLARES_PRODUCER {source_file: $source_file, confidence: $confidence, strategy: $strategy}]->(p)" ) _CREATE_HTTP_CALL = ( "MATCH (c:Client {id: $cid}), (r:Route {id: $rid}) " - "CREATE (c)-[:HTTP_CALLS {confidence: $confidence, strategy: $strategy, " + "CREATE (c)-[:HTTP_CALLS {source_file: $source_file, confidence: $confidence, strategy: $strategy, " "method_call: $method_call, raw_uri: $raw_uri, match: $match}]->(r)" ) _CREATE_ASYNC_CALL = ( "MATCH (p:Producer {id: $pid}), (r:Route {id: $rid}) " - "CREATE (p)-[:ASYNC_CALLS {confidence: $confidence, strategy: $strategy, " + "CREATE (p)-[:ASYNC_CALLS {source_file: $source_file, confidence: $confidence, strategy: $strategy, " "direction: $direction, raw_topic: $raw_topic, match: $match}]->(r)" ) @@ -2732,30 +3089,53 @@ def _populate_overrides_rows(tables: GraphTables) -> None: ] -def _write_edges(conn: kuzu.Connection, tables: GraphTables) -> None: +def _build_file_by_node_id(tables: GraphTables) -> dict[str, str]: + """Build node_id -> file_path lookup for source_file resolution.""" + lookup: dict[str, str] = {} + for entry in tables.types.values(): + lookup[entry.node_id] = entry.file_path + for m in tables.members: + lookup[m.node_id] = m.file_path + return lookup + + +def _write_edges(conn: kuzu.Connection, tables: GraphTables, _file_by_node_id: dict[str, str] | None = None) -> None: + # Build node_id -> file_path lookup for source_file resolution. + if _file_by_node_id is None: + _file_by_node_id = _build_file_by_node_id(tables) + for r in tables.extends_rows: conn.execute(_CREATE_EXT, { "src": r.src_id, "dst": r.dst_id, + "source_file": _file_by_node_id.get(r.src_id, ""), "dst_name": r.dst_name, "dst_fqn": r.dst_fqn, "resolved": r.resolved, }) for r in tables.implements_rows: conn.execute(_CREATE_IMPL, { "src": r.src_id, "dst": r.dst_id, + "source_file": _file_by_node_id.get(r.src_id, ""), "dst_name": r.dst_name, "dst_fqn": r.dst_fqn, "resolved": r.resolved, }) for r in tables.injects_rows: conn.execute(_CREATE_INJ, { "src": r.src_id, "dst": r.dst_id, + "source_file": _file_by_node_id.get(r.src_id, ""), "dst_name": r.dst_name, "dst_fqn": r.dst_fqn, "resolved": r.resolved, "mechanism": r.mechanism, "annotation": r.annotation, "field_or_param": r.field_or_param, }) for row in tables.declares_rows: - conn.execute(_CREATE_DECL, {"src": row.src_id, "dst": row.dst_id}) + conn.execute(_CREATE_DECL, { + "src": row.src_id, "dst": row.dst_id, + "source_file": _file_by_node_id.get(row.src_id, ""), + }) for row in tables.overrides_rows: - conn.execute(_CREATE_OVERRIDES, {"src": row.src_id, "dst": row.dst_id}) + conn.execute(_CREATE_OVERRIDES, { + "src": row.src_id, "dst": row.dst_id, + "source_file": _file_by_node_id.get(row.src_id, ""), + }) seen_calls: set[tuple[str, str, int, int]] = set() unique_calls: list[CallsRow] = [] @@ -2769,6 +3149,7 @@ def _write_edges(conn: kuzu.Connection, tables: GraphTables) -> None: for row in unique_calls: conn.execute(_CREATE_CALL, { "src": row.src_id, "dst": row.dst_id, + "source_file": _file_by_node_id.get(row.src_id, ""), "line": row.call_site_line, "byte": row.call_site_byte, "argc": row.arg_count, @@ -2789,7 +3170,7 @@ def _write_edges(conn: kuzu.Connection, tables: GraphTables) -> None: ) _CREATE_UNRESOLVED_AT = ( "MATCH (a:Symbol {id: $caller}), (u:UnresolvedCallSite {id: $ucs}) " - "CREATE (a)-[:UNRESOLVED_AT]->(u)" + "CREATE (a)-[:UNRESOLVED_AT {source_file: $source_file}]->(u)" ) seen_ucs: set[str] = set() for row in tables.unresolved_call_site_rows: @@ -2806,10 +3187,23 @@ def _write_edges(conn: kuzu.Connection, tables: GraphTables) -> None: "recv": row.receiver_expr, "reason": row.reason, }) - conn.execute(_CREATE_UNRESOLVED_AT, {"caller": row.caller_id, "ucs": row.id}) + conn.execute(_CREATE_UNRESOLVED_AT, { + "caller": row.caller_id, "ucs": row.id, + "source_file": _file_by_node_id.get(row.caller_id, ""), + }) -def _write_routes_and_exposes(conn: kuzu.Connection, tables: GraphTables) -> None: +def _write_routes_and_exposes(conn: kuzu.Connection, tables: GraphTables, _file_by_node_id: dict[str, str] | None = None) -> None: + # Build node_id -> file_path lookup for source_file resolution (for Symbol sources). + if _file_by_node_id is None: + _file_by_node_id = _build_file_by_node_id(tables) + + # Build client_id -> filename lookup for HTTP_CALLS source_file. + _file_by_client_id: dict[str, str] = {row.id: row.filename for row in tables.client_rows} + + # Build producer_id -> filename lookup for ASYNC_CALLS source_file. + _file_by_producer_id: dict[str, str] = {row.id: row.filename for row in tables.producer_rows} + for row in tables.routes_rows: conn.execute(_CREATE_ROUTE, { "id": row.id, @@ -2834,6 +3228,7 @@ def _write_routes_and_exposes(conn: kuzu.Connection, tables: GraphTables) -> Non conn.execute(_CREATE_EXPOSES, { "sid": row.symbol_id, "rid": row.route_id, + "source_file": _file_by_node_id.get(row.symbol_id, ""), "confidence": row.confidence, "strategy": row.strategy, }) @@ -2843,6 +3238,7 @@ def _write_routes_and_exposes(conn: kuzu.Connection, tables: GraphTables) -> Non conn.execute(_CREATE_DECLARES_CLIENT, { "sid": row.symbol_id, "cid": row.client_id, + "source_file": _file_by_node_id.get(row.symbol_id, ""), "confidence": row.confidence, "strategy": row.strategy, }) @@ -2852,6 +3248,7 @@ def _write_routes_and_exposes(conn: kuzu.Connection, tables: GraphTables) -> Non conn.execute(_CREATE_DECLARES_PRODUCER, { "sid": row.symbol_id, "pid": row.producer_id, + "source_file": _file_by_node_id.get(row.symbol_id, ""), "confidence": row.confidence, "strategy": row.strategy, }) @@ -2859,6 +3256,7 @@ def _write_routes_and_exposes(conn: kuzu.Connection, tables: GraphTables) -> Non conn.execute(_CREATE_HTTP_CALL, { "cid": row.client_id, "rid": row.route_id, + "source_file": _file_by_client_id.get(row.client_id, ""), "confidence": row.confidence, "strategy": row.strategy, "method_call": row.method_call, @@ -2869,6 +3267,7 @@ def _write_routes_and_exposes(conn: kuzu.Connection, tables: GraphTables) -> Non conn.execute(_CREATE_ASYNC_CALL, { "pid": row.producer_id, "rid": row.route_id, + "source_file": _file_by_producer_id.get(row.producer_id, ""), "confidence": row.confidence, "strategy": row.strategy, "direction": row.direction, @@ -2929,28 +3328,29 @@ def _write_meta(conn: kuzu.Connection, tables: GraphTables, source_root: Path) - clients_by_kind = dict(sorted(client_stats.clients_by_kind.items())) producers_by_kind = dict(sorted(producer_stats.producers_by_kind.items())) conn.execute( - "CREATE (:GraphMeta {key: $k, ontology_version: $ov, built_at: $t, " - "source_root: $sr, counts_json: $cj, parse_errors: $pe, " - "routes_total: $routes_total, exposes_total: $exposes_total, " - "routes_by_framework: $routes_by_framework, routes_resolved_pct: $routes_resolved_pct, " - "routes_from_brownfield_pct: $routes_from_brownfield_pct, routes_by_layer: $routes_by_layer, " - "clients_total: $clients_total, declares_client_total: $declares_client_total, " - "clients_by_kind: $clients_by_kind, " - "producers_total: $producers_total, declares_producer_total: $declares_producer_total, " - "producers_by_kind: $producers_by_kind, " - "http_calls_total: $http_calls_total, async_calls_total: $async_calls_total, " - "http_calls_by_strategy: $http_calls_by_strategy, async_calls_by_strategy: $async_calls_by_strategy, " - "http_calls_resolved_pct: $http_calls_resolved_pct, async_calls_resolved_pct: $async_calls_resolved_pct, " - "http_clients_from_brownfield_pct: $http_clients_from_brownfield_pct, " - "async_producers_from_brownfield_pct: $async_producers_from_brownfield_pct, " - "http_calls_match_breakdown: $http_calls_match_breakdown, " - "async_calls_match_breakdown: $async_calls_match_breakdown, " - "cross_service_calls_total: $cross_service_calls_total, " - "pass3_skipped_cross_service: $pass3_skipped_cross_service, " - "pass3_unresolved_phantom_receiver: $pass3_unresolved_phantom_receiver, " - "pass3_unresolved_chained: $pass3_unresolved_chained, " - "pass4_exposes_suppressed_feign: $pass4_exposes_suppressed_feign, " - "cross_service_resolution: $cross_service_resolution})", + "MERGE (m:GraphMeta {key: $k}) " + "SET m.ontology_version = $ov, m.built_at = $t, " + "m.source_root = $sr, m.counts_json = $cj, m.parse_errors = $pe, " + "m.routes_total = $routes_total, m.exposes_total = $exposes_total, " + "m.routes_by_framework = $routes_by_framework, m.routes_resolved_pct = $routes_resolved_pct, " + "m.routes_from_brownfield_pct = $routes_from_brownfield_pct, m.routes_by_layer = $routes_by_layer, " + "m.clients_total = $clients_total, m.declares_client_total = $declares_client_total, " + "m.clients_by_kind = $clients_by_kind, " + "m.producers_total = $producers_total, m.declares_producer_total = $declares_producer_total, " + "m.producers_by_kind = $producers_by_kind, " + "m.http_calls_total = $http_calls_total, m.async_calls_total = $async_calls_total, " + "m.http_calls_by_strategy = $http_calls_by_strategy, m.async_calls_by_strategy = $async_calls_by_strategy, " + "m.http_calls_resolved_pct = $http_calls_resolved_pct, m.async_calls_resolved_pct = $async_calls_resolved_pct, " + "m.http_clients_from_brownfield_pct = $http_clients_from_brownfield_pct, " + "m.async_producers_from_brownfield_pct = $async_producers_from_brownfield_pct, " + "m.http_calls_match_breakdown = $http_calls_match_breakdown, " + "m.async_calls_match_breakdown = $async_calls_match_breakdown, " + "m.cross_service_calls_total = $cross_service_calls_total, " + "m.pass3_skipped_cross_service = $pass3_skipped_cross_service, " + "m.pass3_unresolved_phantom_receiver = $pass3_unresolved_phantom_receiver, " + "m.pass3_unresolved_chained = $pass3_unresolved_chained, " + "m.pass4_exposes_suppressed_feign = $pass4_exposes_suppressed_feign, " + "m.cross_service_resolution = $cross_service_resolution", { "k": "graph", "ov": ONTOLOGY_VERSION, @@ -2990,6 +3390,359 @@ def _write_meta(conn: kuzu.Connection, tables: GraphTables, source_root: Path) - ) +def incremental_rebuild( + source_root: Path, + kuzu_path: Path, + *, + verbose: bool, + expansion_cap: int = 50, +) -> IncrementalResult: + """Incrementally rebuild the Kuzu graph, processing only changed files and their dependents. + + Returns IncrementalResult with statistics about the rebuild. + Falls back to full rebuild if: + - No previous graph exists + - Ontology version < 17 (missing source_file on edges) + - Crash marker exists (previous incremental run failed) + - Dependent expansion exceeds expansion_cap + """ + t_start = time.time() + + # Step 1: Load existing graph and detect changes + if not kuzu_path.exists(): + if verbose: + _verbose_stderr_line("[increment] no existing graph; falling back to full rebuild") + # Fall back to full rebuild + tables = GraphTables() + asts = pass1_parse(source_root, tables, verbose=verbose) + pass2_edges(tables, asts, verbose=verbose) + pass3_calls(tables, asts, verbose=verbose) + pass4_routes(tables, asts, source_root=source_root, verbose=verbose) + pass5_imperative_edges(tables, asts, source_root=source_root, verbose=verbose) + pass6_match_edges(tables, verbose=verbose) + write_kuzu(kuzu_path, tables, source_root=source_root, verbose=verbose) + + n_files = _init_hash_tracker(source_root, kuzu_path) + + return IncrementalResult( + mode="full_fallback", + files_changed=0, + files_added=n_files, + files_removed=0, + dependents_reprocessed=0, + elapsed_sec=time.time() - t_start, + ) + + db = kuzu.Database(str(kuzu_path)) + conn = kuzu.Connection(db) + + # Check ontology version + try: + meta_result = conn.execute("MATCH (m:GraphMeta) RETURN m.ontology_version AS version") + if meta_result.has_next(): + row = meta_result.get_next() + version = row[0] if row else 0 + if version < 17: + if verbose: + _verbose_stderr_line(f"[increment] ontology version {version} < 17; falling back to full rebuild") + conn.close() + del conn, db + return _fallback_to_full(source_root, kuzu_path, verbose, t_start) + except Exception as e: + if verbose: + _verbose_stderr_line(f"[increment] failed to read ontology version: {e}; falling back to full rebuild") + try: + conn.close() + except Exception: + pass + del conn, db + return _fallback_to_full(source_root, kuzu_path, verbose, t_start) + + index_dir = kuzu_path.parent + tracker = FileHashTracker(index_dir) + tracker.load() + + ignore = LayeredIgnore(source_root) + added, changed, removed = tracker.detect_changes(source_root, ignore=ignore) + + changed_files = added | changed | removed + + if not changed_files: + if verbose: + _verbose_stderr_line("[increment] no changes detected; no-op") + conn.close() + return IncrementalResult( + mode="incremental", + files_changed=0, + files_added=0, + files_removed=0, + dependents_reprocessed=0, + elapsed_sec=time.time() - t_start, + ) + + if verbose: + _verbose_stderr_line(f"[increment] detected {len(added)} added, {len(changed)} changed, {len(removed)} removed files") + + # Step 2: Crash marker check + crash_marker_path = index_dir / ".graph_increment_in_progress" + if crash_marker_path.exists(): + if verbose: + _verbose_stderr_line("[increment] crash marker exists; falling back to full rebuild") + conn.close() + crash_marker_path.unlink(missing_ok=True) + return _fallback_to_full(source_root, kuzu_path, verbose, t_start) + + # Write crash marker + crash_marker_path.write_text("", encoding="utf-8") + + try: + # Step 3: Dependent expansion + # Collect node IDs for changed files (single query instead of N+1) + changed_node_ids: set[str] = set() + result = conn.execute( + "MATCH (s:Symbol) WHERE s.filename IN $filenames RETURN s.id", + {"filenames": list(changed_files)}, + ) + while result.has_next(): + row = result.get_next() + changed_node_ids.add(row[0]) + + # Find dependents + dependent_files = _find_dependents(conn, changed_node_ids) + + # Union changed files with dependents + scope_files = changed_files | dependent_files + + if len(scope_files) > expansion_cap: + if verbose: + _verbose_stderr_line(f"[increment] dependent expansion cap ({expansion_cap}) exceeded ({len(scope_files)} files); falling back to full rebuild") + conn.close() + crash_marker_path.unlink(missing_ok=True) + return _fallback_to_full(source_root, kuzu_path, verbose, t_start) + + if verbose: + _verbose_stderr_line(f"[increment] processing {len(scope_files)} files ({len(changed_files)} changed + {len(dependent_files)} dependents)") + + # Step 4: Scoped deletion + if verbose: + _verbose_stderr_line("[increment] deleting outdated nodes and edges") + _delete_file_scope(conn, scope_files) + + # Force deletion to be applied by running a dummy query + conn.execute("MATCH (s:Symbol) RETURN count(*)") + + # Step 5: Scoped pass 1-4 + if verbose: + _verbose_stderr_line("[increment] rebuilding scoped files (passes 1-4)") + + tables = GraphTables() + asts = pass1_parse(source_root, tables, verbose=verbose, scope_files=scope_files) + + # Load existing types and members for cross-file resolution (only from unchanged files) + _load_existing_types(conn, tables, exclude_files=scope_files) + _load_existing_members(conn, tables, exclude_files=scope_files) + + pass2_edges(tables, asts, verbose=verbose) + pass3_calls(tables, asts, verbose=verbose) + pass4_routes(tables, asts, source_root=source_root, verbose=verbose) + + # Populate declares and overrides rows + _populate_declares_rows(tables) + _populate_overrides_rows(tables) + + # Write scoped nodes and edges + meta_chain = collect_annotation_meta_chain(str(source_root.resolve())) + _scoped_write(conn, tables, project_root=source_root, meta_chain=meta_chain) + + # Step 6: Global pass 5-6 + if verbose: + _verbose_stderr_line("[increment] running global passes 5-6") + + # Rebuild full tables for global pass 5-6 (pass1 populates members from scratch) + tables_for_global = GraphTables() + global_asts = pass1_parse(source_root, tables_for_global, verbose=verbose) + + pass5_imperative_edges(tables_for_global, global_asts, source_root=source_root, verbose=verbose) + + # Delete existing Client, Producer, and their edges + conn.execute("MATCH (c:Client) DETACH DELETE c") + conn.execute("MATCH (p:Producer) DETACH DELETE p") + + pass6_match_edges(tables_for_global, verbose=verbose) + + # Write Client, Producer, and cross-service edges + _write_clients_producers_and_calls(conn, tables_for_global) + + # Step 7: Update hash store and metadata + if verbose: + _verbose_stderr_line("[increment] updating hash store and metadata") + + # Update hashes for processed files + tracker.update(scope_files, source_root) + + # Remove hashes for deleted files + for filename in removed: + if filename in tracker._hashes: + del tracker._hashes[filename] + + tracker.save() + + # Update GraphMeta + _write_meta(conn, tables_for_global, source_root) + + # Remove crash marker + crash_marker_path.unlink(missing_ok=True) + + conn.close() + + elapsed = time.time() - t_start + if verbose: + _verbose_stderr_line(f"[increment] completed in {elapsed:.2f}s") + + return IncrementalResult( + mode="incremental", + files_changed=len(changed), + files_added=len(added), + files_removed=len(removed), + dependents_reprocessed=len(dependent_files), + elapsed_sec=elapsed, + ) + + except Exception as e: + # On error, remove crash marker and fall back to full rebuild + if verbose: + _verbose_stderr_line(f"[increment] error during incremental rebuild: {e}; falling back to full rebuild") + conn.close() + crash_marker_path.unlink(missing_ok=True) + return _fallback_to_full(source_root, kuzu_path, verbose, t_start) + + +def _init_hash_tracker(source_root: Path, kuzu_path: Path) -> int: + """Initialize hash tracker for all Java files. Returns number of files hashed.""" + index_dir = kuzu_path.parent + tracker = FileHashTracker(index_dir) + tracker.load() + ignore = LayeredIgnore(source_root) + all_files: set[str] = set() + source_root_resolved = source_root.resolve() + for p in iter_java_source_files(source_root, ignore=ignore): + p_resolved = p.resolve() + try: + rel_path = p_resolved.relative_to(source_root_resolved).as_posix() + except ValueError: + rel_path = p.as_posix() + all_files.add(rel_path) + tracker.update(all_files, source_root) + tracker.save() + return len(all_files) + + +def _fallback_to_full(source_root: Path, kuzu_path: Path, verbose: bool, t_start: float) -> IncrementalResult: + """Fallback to full rebuild.""" + tables = GraphTables() + asts = pass1_parse(source_root, tables, verbose=verbose) + pass2_edges(tables, asts, verbose=verbose) + pass3_calls(tables, asts, verbose=verbose) + pass4_routes(tables, asts, source_root=source_root, verbose=verbose) + pass5_imperative_edges(tables, asts, source_root=source_root, verbose=verbose) + pass6_match_edges(tables, verbose=verbose) + write_kuzu(kuzu_path, tables, source_root=source_root, verbose=verbose) + + n_files = _init_hash_tracker(source_root, kuzu_path) + + return IncrementalResult( + mode="full_fallback", + files_changed=0, + files_added=n_files, + files_removed=0, + dependents_reprocessed=0, + elapsed_sec=time.time() - t_start, + ) + + +def _write_clients_producers_and_calls(conn: kuzu.Connection, tables: GraphTables) -> None: + """Write Route, Client, Producer, and cross-service edges to Kuzu. + + Used by the incremental rebuild's global pass 5-6 step. Writes phantom + Route nodes (created by pass5 for cross-service calls) that wouldn't + otherwise exist in Kuzu. + """ + # Write phantom routes that don't already exist (pass5 creates these for cross-service calls) + for row in tables.routes_rows: + # MERGE to avoid duplicates with routes written during scoped step + conn.execute( + "MERGE (r:Route {id: $id}) " + "SET r.kind = $kind, r.framework = $framework, r.method = $method, " + "r.path = $path, r.path_template = $path_template, r.path_regex = $path_regex, " + "r.topic = $topic, r.broker = $broker, r.feign_name = $feign_name, r.feign_url = $feign_url, " + "r.microservice = $microservice, r.module = $module, r.filename = $filename, " + "r.start_line = $start_line, r.end_line = $end_line, r.resolved = $resolved", + asdict(row), + ) + + # Build node_id lookup for members and types + member_by_id = {m.node_id: m for m in tables.members} + + # Write clients and producers using asdict (same pattern as _write_routes_and_exposes) + for row in tables.client_rows: + conn.execute(_CREATE_CLIENT, asdict(row)) + for row in tables.producer_rows: + conn.execute(_CREATE_PRODUCER, asdict(row)) + + client_by_id = {c.id: c for c in tables.client_rows} + producer_by_id = {p.id: p for p in tables.producer_rows} + + # Write declares_client edges + for row in tables.declares_client_rows: + source_file = member_by_id.get(row.symbol_id, MemberEntry(kind="", decl=None, parent_id="", parent_fqn="", file_path="", module="", microservice="")).file_path + conn.execute(_CREATE_DECLARES_CLIENT, { + "sid": row.symbol_id, + "cid": row.client_id, + "source_file": source_file, + "confidence": row.confidence, + "strategy": row.strategy, + }) + + # Write declares_producer edges + for row in tables.declares_producer_rows: + source_file = member_by_id.get(row.symbol_id, MemberEntry(kind="", decl=None, parent_id="", parent_fqn="", file_path="", module="", microservice="")).file_path + conn.execute(_CREATE_DECLARES_PRODUCER, { + "sid": row.symbol_id, + "pid": row.producer_id, + "source_file": source_file, + "confidence": row.confidence, + "strategy": row.strategy, + }) + + # Write HTTP_CALLS edges + for row in tables.http_call_rows: + client = client_by_id.get(row.client_id) + conn.execute(_CREATE_HTTP_CALL, { + "cid": row.client_id, + "rid": row.route_id, + "source_file": client.filename if client else "", + "confidence": row.confidence, + "strategy": row.strategy, + "method_call": row.method_call, + "raw_uri": row.raw_uri, + "match": row.match, + }) + + # Write ASYNC_CALLS edges + for row in tables.async_call_rows: + producer = producer_by_id.get(row.producer_id) + conn.execute(_CREATE_ASYNC_CALL, { + "pid": row.producer_id, + "rid": row.route_id, + "source_file": producer.filename if producer else "", + "confidence": row.confidence, + "strategy": row.strategy, + "direction": row.direction, + "raw_topic": row.raw_topic, + "match": row.match, + }) + + def write_kuzu( db_path: Path, tables: GraphTables, @@ -3022,11 +3775,12 @@ def write_kuzu( _populate_declares_rows(tables) _populate_overrides_rows(tables) t1 = time.time() - _write_edges(conn, tables) + _fbyid = _build_file_by_node_id(tables) + _write_edges(conn, tables, _fbyid) if verbose: _verbose_stderr_line(f"[graph] writing · edges written in {time.time() - t1:.2f}s") t2 = time.time() - _write_routes_and_exposes(conn, tables) + _write_routes_and_exposes(conn, tables, _fbyid) if verbose: _verbose_stderr_line(f"[graph] writing · routes/exposes written in {time.time() - t2:.2f}s") _write_meta(conn, tables, source_root) @@ -3055,6 +3809,7 @@ def main() -> int: ), ) parser.add_argument("--verbose", action="store_true") + parser.add_argument("--incremental", action="store_true", help="Run incremental rebuild instead of full rebuild") args = parser.parse_args() root = Path(args.source_root).expanduser().resolve() if args.source_root else Path.cwd().resolve() @@ -3064,6 +3819,20 @@ def main() -> int: kuzu_path = Path(args.kuzu_path).expanduser() if args.kuzu_path else _default_kuzu_path() + if args.incremental: + result = incremental_rebuild(root, kuzu_path, verbose=args.verbose) + print(json.dumps({ + "mode": result.mode, + "files_changed": result.files_changed, + "files_added": result.files_added, + "files_removed": result.files_removed, + "dependents_reprocessed": result.dependents_reprocessed, + "elapsed_sec": result.elapsed_sec, + })) + if args.verbose: + _verbose_stderr_line(f"[graph] done · mode={result.mode} files_changed={result.files_changed} files_added={result.files_added} files_removed={result.files_removed} dependents={result.dependents_reprocessed} elapsed={result.elapsed_sec:.2f}s") + return 0 + tables = GraphTables() asts = pass1_parse(root, tables, verbose=args.verbose) pass2_edges(tables, asts, verbose=args.verbose) diff --git a/docs/JAVA-CODEBASE-RAG-CLI.md b/docs/JAVA-CODEBASE-RAG-CLI.md index 80a971be..d95a62af 100644 --- a/docs/JAVA-CODEBASE-RAG-CLI.md +++ b/docs/JAVA-CODEBASE-RAG-CLI.md @@ -76,12 +76,21 @@ java-codebase-rag init --source-root /path/to/java/repo --index-dir /path/to/.ja ### `increment` -Runs cocoindex **catch-up** without a full Lance reprocess. **Does not** rebuild Kuzu. Every run prints a **multi-line stderr warning** that graph navigation may be stale until you run `reprocess` (see [`propose/completed/CLI-SCENARIOS-PROPOSE.md`](../propose/completed/CLI-SCENARIOS-PROPOSE.md) Appendix A for the contract). +Runs cocoindex **catch-up** and **incremental Kuzu graph update**. Only changed files and their single-hop dependents are re-parsed and re-written to the graph. Passes 5–6 (client/producer extraction and cross-service matching) run globally. Falls back to full `reprocess` if: +- No previous graph exists (first run) +- Graph schema is outdated (missing `source_file` on edges) +- Previous incremental run crashed (crash marker detected) +- Dependent expansion exceeds 50 files ```bash java-codebase-rag increment --source-root /path/to/java/repo --index-dir /path/to/.java-codebase-rag --quiet ``` +**Flags:** +- `--vectors-only` — runs only cocoindex catch-up; skips graph update and emits stale-graph warning. Use this when you want the old Lance-only behavior. + +**Migration note:** After upgrading, run `reprocess` once to ensure edge tables have `source_file` columns (ontology version 17+). + ### `reprocess` **Default (no extra flags):** full **Lance** reprocess (cocoindex `--full-reprocess`) then full **Kuzu** rebuild via `build_ast_graph.py`, in that order. This remains the recommended **coherence** operation when both stores might be out of date. diff --git a/java_codebase_rag/cli.py b/java_codebase_rag/cli.py index a3281e71..4d86c2d7 100644 --- a/java_codebase_rag/cli.py +++ b/java_codebase_rag/cli.py @@ -21,7 +21,7 @@ index_dir_has_existing_artifacts, resolve_operator_config, ) -from java_codebase_rag.pipeline import clip, run_build_ast_graph, run_cocoindex_drop, run_cocoindex_update +from java_codebase_rag.pipeline import clip, run_build_ast_graph, run_cocoindex_drop, run_cocoindex_update, run_incremental_graph from java_ontology import VALID_UNRESOLVED_CALL_REASONS KUZU_INCREMENTAL_TRACKING_ISSUE_URL = "https://github.com/HumanBean17/java-codebase-rag/issues/73" @@ -310,7 +310,11 @@ def _cmd_increment(args: argparse.Namespace) -> int: cfg = _resolved_from_ns(args) _startup_hints(cfg) cfg.apply_to_os_environ() - _emit_increment_kuzu_warning() + + # Check for --vectors-only flag + vectors_only = bool(getattr(args, "vectors_only", False)) + if vectors_only: + _emit_increment_kuzu_warning() def work() -> int: env = cfg.subprocess_env() @@ -332,7 +336,50 @@ def work() -> int: } ) return 1 - _emit({"success": True, "message": "increment completed (Lance only; graph may be stale — see stderr)"}) + + # If --vectors-only is set, skip graph update + if vectors_only: + _emit({"success": True, "message": "increment completed (Lance only; graph may be stale — see stderr)"}) + return 0 + + # Run incremental graph update + g = run_incremental_graph( + source_root=cfg.source_root, + kuzu_path=cfg.kuzu_path, + verbose=bool(args.verbose), + quiet=bool(args.quiet), + env=env, + ) + + # Check if incremental fell back to full rebuild + if g.returncode == 0 and g.stdout: + # Parse stdout to check for full_fallback mode + # The incremental_rebuild function returns a JSON payload with mode field + try: + result = json.loads(g.stdout.strip()) + if result.get("mode") == "full_fallback": + print( + "[increment] fell back to full graph rebuild — this is normal after schema changes or first run", + file=sys.stderr, + flush=True, + ) + except (json.JSONDecodeError, ValueError): + # If parsing fails, continue silently + pass + + if g.returncode != 0: + _emit( + { + "success": False, + "exit_code": g.returncode, + "stdout": clip(g.stdout, 4000), + "stderr": clip(g.stderr, 4000), + "message": f"graph builder exit {g.returncode}", + } + ) + return 1 + + _emit({"success": True, "message": "increment completed (Lance + graph updated)"}) return 0 return _run_with_pipeline_progress("increment", cfg, quiet=bool(args.quiet), work=work) @@ -627,7 +674,7 @@ def build_parser() -> argparse.ArgumentParser: "--quiet suppresses that stream; stdout remains the machine-readable payload.\n\n" "Lifecycle (manage the index):\n" " init Create a fresh index from a Java repository.\n" - " increment Pick up changes since the last index update (Lance only).\n" + " increment Pick up changes since the last index update (Lance + graph).\n" " reprocess Full vector + graph rebuild (default); optional --vectors-only / --graph-only.\n" " erase Delete the index from disk.\n\n" "Introspection (inspect the index):\n" @@ -662,10 +709,15 @@ def build_parser() -> argparse.ArgumentParser: increment = subparsers.add_parser( "increment", help="Pick up changes since the last index update.", - description="Runs cocoindex catch-up (no full reprocess). Does not rebuild Kuzu; see stderr warning.", + description="Runs cocoindex catch-up and incremental Kuzu graph update. Use --vectors-only to skip graph update.", ) _add_index_embedding_flags(increment) _add_verbosity_flags(increment) + increment.add_argument( + "--vectors-only", + action="store_true", + help="Run only cocoindex catch-up (Lance); skip graph update.", + ) increment.set_defaults(handler=_cmd_increment) reprocess = subparsers.add_parser( diff --git a/java_codebase_rag/pipeline.py b/java_codebase_rag/pipeline.py index f1d34270..83262c69 100644 --- a/java_codebase_rag/pipeline.py +++ b/java_codebase_rag/pipeline.py @@ -247,5 +247,60 @@ def run_build_ast_graph( return subprocess.CompletedProcess(args=cmd, returncode=code, stdout=out_s, stderr=err_s) +def run_incremental_graph( + *, + source_root: Path, + kuzu_path: Path, + verbose: bool, + quiet: bool = False, + env: dict[str, str] | None = None, +) -> subprocess.CompletedProcess[str]: + """Run incremental graph rebuild by passing --incremental flag to build_ast_graph.py.""" + builder = bundle_dir() / "build_ast_graph.py" + if not builder.is_file(): + return subprocess.CompletedProcess( + args=[], + returncode=126, + stdout="", + stderr=f"build_ast_graph.py not found under {builder.parent}", + ) + cmd: list[str] = [ + sys.executable, + str(builder), + "--source-root", + str(source_root), + "--kuzu-path", + str(kuzu_path), + "--incremental", + ] + # Three-tier: --quiet (silent) / default (filtered progress) / --verbose (raw). + # Default passes --verbose so the builder emits per-pass progress lines, + # which the parent filters via _LineFilter. --verbose bypasses the filter. + if verbose or not quiet: + cmd.append("--verbose") + if quiet: + return subprocess.run( + cmd, + cwd=str(source_root), + env=env or os.environ.copy(), + capture_output=True, + text=True, + ) + proc = subprocess.Popen( + cmd, + cwd=str(source_root), + env=env or os.environ.copy(), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + bufsize=0, + ) + out_s, err_s, code = _popen_capturing_stderr(proc, verbose=verbose) + if not verbose: + from java_codebase_rag.cli_format import bold_cyan, styled_check, styled_cross + marker = styled_check() if code == 0 else styled_cross() + print(f"{marker} {bold_cyan('[increment]')} done", file=sys.stderr, flush=True) + return subprocess.CompletedProcess(args=cmd, returncode=code, stdout=out_s, stderr=err_s) + + def clip(s: str, n: int) -> str: return s[-n:] if len(s) > n else s diff --git a/tests/test_incremental_graph.py b/tests/test_incremental_graph.py new file mode 100644 index 00000000..b529a678 --- /dev/null +++ b/tests/test_incremental_graph.py @@ -0,0 +1,780 @@ +"""Tests for incremental graph rebuild functionality (PR-G1 and PR-G2). + +Tests cover FileHashTracker behavior, edge schema source_file column, and incremental orchestrator. +""" +from __future__ import annotations + +from pathlib import Path + +import kuzu + +from ast_java import ONTOLOGY_VERSION +from build_ast_graph import FileHashTracker, GraphTables, pass1_parse, pass2_edges +from path_filtering import LayeredIgnore + + +class TestFileHashTracker: + """Test FileHashTracker change detection and persistence.""" + + def test_file_hash_tracker_detects_added_file(self, tmp_path: Path) -> None: + """Empty hash store, one file in source → added populated.""" + index_dir = tmp_path / "index" + index_dir.mkdir() + source_root = tmp_path / "src" + source_root.mkdir() + test_file = source_root / "Test.java" + test_file.write_text("class Test {}", encoding="utf-8") + + tracker = FileHashTracker(index_dir) + tracker.load() + ignore = LayeredIgnore(source_root, use_gitignore=False, builtin_patterns=[]) + added, changed, removed = tracker.detect_changes(source_root, ignore=ignore) + + assert len(added) == 1 + assert "Test.java" in added + assert len(changed) == 0 + assert len(removed) == 0 + + def test_file_hash_tracker_detects_changed_file(self, tmp_path: Path) -> None: + """Stored hash differs from current → changed populated.""" + index_dir = tmp_path / "index" + index_dir.mkdir() + source_root = tmp_path / "src" + source_root.mkdir() + test_file = source_root / "Test.java" + test_file.write_text("class Test {}", encoding="utf-8") + + tracker = FileHashTracker(index_dir) + tracker.load() + tracker.update({"Test.java"}, source_root) + tracker.save() + + # Modify the file + test_file.write_text("class Test { String x; }", encoding="utf-8") + + tracker2 = FileHashTracker(index_dir) + tracker2.load() + ignore = LayeredIgnore(source_root, use_gitignore=False, builtin_patterns=[]) + added, changed, removed = tracker2.detect_changes(source_root, ignore=ignore) + + assert len(added) == 0 + assert len(changed) == 1 + assert "Test.java" in changed + assert len(removed) == 0 + + def test_file_hash_tracker_detects_removed_file(self, tmp_path: Path) -> None: + """Hash store has entry but file gone → removed populated.""" + index_dir = tmp_path / "index" + index_dir.mkdir() + source_root = tmp_path / "src" + source_root.mkdir() + test_file = source_root / "Test.java" + test_file.write_text("class Test {}", encoding="utf-8") + + tracker = FileHashTracker(index_dir) + tracker.load() + tracker.update({"Test.java"}, source_root) + tracker.save() + + # Remove the file + test_file.unlink() + + tracker2 = FileHashTracker(index_dir) + tracker2.load() + ignore = LayeredIgnore(source_root, use_gitignore=False, builtin_patterns=[]) + added, changed, removed = tracker2.detect_changes(source_root, ignore=ignore) + + assert len(added) == 0 + assert len(changed) == 0 + assert len(removed) == 1 + assert "Test.java" in removed + + def test_file_hash_tracker_no_changes(self, tmp_path: Path) -> None: + """Identical hashes → all three sets empty.""" + index_dir = tmp_path / "index" + index_dir.mkdir() + source_root = tmp_path / "src" + source_root.mkdir() + test_file = source_root / "Test.java" + test_file.write_text("class Test {}", encoding="utf-8") + + tracker = FileHashTracker(index_dir) + tracker.load() + tracker.update({"Test.java"}, source_root) + tracker.save() + + tracker2 = FileHashTracker(index_dir) + tracker2.load() + ignore = LayeredIgnore(source_root, use_gitignore=False, builtin_patterns=[]) + added, changed, removed = tracker2.detect_changes(source_root, ignore=ignore) + + assert len(added) == 0 + assert len(changed) == 0 + assert len(removed) == 0 + + def test_file_hash_tracker_save_and_load_roundtrip(self, tmp_path: Path) -> None: + """Save hashes, new tracker instance loads same data.""" + index_dir = tmp_path / "index" + index_dir.mkdir() + source_root = tmp_path / "src" + source_root.mkdir() + test_file1 = source_root / "A.java" + test_file1.write_text("class A {}", encoding="utf-8") + test_file2 = source_root / "B.java" + test_file2.write_text("class B {}", encoding="utf-8") + + tracker = FileHashTracker(index_dir) + tracker.load() + tracker.update({"A.java", "B.java"}, source_root) + tracker.save() + + tracker2 = FileHashTracker(index_dir) + tracker2.load() + + assert len(tracker2._hashes) == 2 + assert "A.java" in tracker2._hashes + assert "B.java" in tracker2._hashes + + def test_file_hash_tracker_atomic_save(self, tmp_path: Path) -> None: + """.graph_hashes.json.tmp not left behind on successful save.""" + index_dir = tmp_path / "index" + index_dir.mkdir() + source_root = tmp_path / "src" + source_root.mkdir() + test_file = source_root / "Test.java" + test_file.write_text("class Test {}", encoding="utf-8") + + tracker = FileHashTracker(index_dir) + tracker.load() + tracker.update({"Test.java"}, source_root) + tracker.save() + + # Verify the tmp file is not left behind + tmp_file = index_dir / ".graph_hashes.json.tmp" + assert not tmp_file.exists() + + # Verify the actual file exists + actual_file = index_dir / ".graph_hashes.json" + assert actual_file.exists() + + +class TestEdgeSchema: + """Test edge schema has source_file column and correct values.""" + + def test_edge_schema_has_source_file(self, tmp_path: Path) -> None: + """Build a full graph, query each edge table for source_file column existence and non-empty values.""" + from _builders import build_kuzu_full_into + + corpus_root = Path(__file__).parent / "bank-chat-system" + db_path = tmp_path / "test_graph.kuzu" + build_kuzu_full_into(corpus_root, db_path) + + conn = kuzu.Connection(kuzu.Database(str(db_path), read_only=True)) + + # All 12 edge tables should have source_file column + edge_tables = [ + "EXTENDS", "IMPLEMENTS", "INJECTS", "DECLARES", "OVERRIDES", + "CALLS", "UNRESOLVED_AT", "EXPOSES", "DECLARES_CLIENT", + "DECLARES_PRODUCER", "HTTP_CALLS", "ASYNC_CALLS" + ] + + for table in edge_tables: + # Check column exists by querying a sample and accessing source_file + query = f"MATCH ()-[e:{table}]->() RETURN e.source_file LIMIT 1" + result = conn.execute(query) + has_data = result.has_next() + if has_data: + row = result.get_next() + # source_file should be a string + assert row is not None + + def test_source_file_value_matches_symbol_filename(self, tmp_path: Path) -> None: + """For edges originating from Symbol nodes, edge's source_file equals source Symbol's filename.""" + from _builders import build_kuzu_full_into + + corpus_root = Path(__file__).parent / "bank-chat-system" + db_path = tmp_path / "test_graph.kuzu" + build_kuzu_full_into(corpus_root, db_path) + + conn = kuzu.Connection(kuzu.Database(str(db_path), read_only=True)) + + # Test CALLS edge: source_file should match caller Symbol's filename + query = """ + MATCH (caller:Symbol)-[e:CALLS]->(callee:Symbol) + RETURN caller.filename, e.source_file + LIMIT 1 + """ + result = conn.execute(query) + if result.has_next(): + caller_filename, edge_source_file = result.get_next() + assert caller_filename == edge_source_file + + # Test EXTENDS edge + query = """ + MATCH (sub:Symbol)-[e:EXTENDS]->(super:Symbol) + RETURN sub.filename, e.source_file + LIMIT 1 + """ + result = conn.execute(query) + if result.has_next(): + sub_filename, edge_source_file = result.get_next() + assert sub_filename == edge_source_file + + def test_ontology_version_bumped_to_17(self) -> None: + """ONTOLOGY_VERSION == 17.""" + assert ONTOLOGY_VERSION == 17 + + +class TestIncrementalOrchestrator: + """Test incremental rebuild orchestrator (PR-G2).""" + + def test_incremental_single_file_change(self, tmp_path: Path) -> None: + """Change one .java file, run incremental, verify only that file's nodes changed.""" + from build_ast_graph import incremental_rebuild + + source_root = tmp_path / "src" + source_root.mkdir() + index_dir = tmp_path / "index" + index_dir.mkdir() + kuzu_path = index_dir / "code_graph.kuzu" + + # Create initial files + (source_root / "A.java").write_text("package pkg; class A {}", encoding="utf-8") + (source_root / "B.java").write_text("package pkg; class B extends A {}", encoding="utf-8") + + # Initial build + tables = GraphTables() + asts = pass1_parse(source_root, tables, verbose=False) + assert len(asts) == 2 + + # Build full graph (pass2 needed for EXTENDS edges) + from build_ast_graph import write_kuzu + pass2_edges(tables, asts, verbose=False) + write_kuzu(kuzu_path, tables, source_root=source_root, verbose=False) + + # Initialize hash tracker + tracker = FileHashTracker(index_dir) + ignore = LayeredIgnore(source_root, use_gitignore=False, builtin_patterns=[]) + tracker.detect_changes(source_root, ignore) + for rel_path in ["A.java", "B.java"]: + tracker.update({rel_path}, source_root) + tracker.save() + + # Modify A.java + (source_root / "A.java").write_text("package pkg; class A { void foo() {} }", encoding="utf-8") + + # Run incremental + result = incremental_rebuild(source_root, kuzu_path, verbose=False) + + assert result.mode == "incremental" + assert result.files_changed == 1 + assert result.files_added == 0 + assert result.files_removed == 0 + assert result.dependents_reprocessed >= 1 # B depends on A + + def test_incremental_new_file(self, tmp_path: Path) -> None: + """Add a new .java file, run incremental, verify all new nodes/edges appear.""" + source_root = tmp_path / "src" + source_root.mkdir() + index_dir = tmp_path / "index" + index_dir.mkdir() + kuzu_path = index_dir / "code_graph.kuzu" + + # Create initial file + (source_root / "A.java").write_text("package pkg; class A {}", encoding="utf-8") + + # Initial build + from build_ast_graph import write_kuzu + tables = GraphTables() + pass1_parse(source_root, tables, verbose=False) + write_kuzu(kuzu_path, tables, source_root=source_root, verbose=False) + + # Initialize hash tracker + tracker = FileHashTracker(index_dir) + ignore = LayeredIgnore(source_root, use_gitignore=False, builtin_patterns=[]) + tracker.detect_changes(source_root, ignore) + tracker.update({"A.java"}, source_root) + tracker.save() + + # Add new file + (source_root / "B.java").write_text("package pkg; class B {}", encoding="utf-8") + + # Run incremental + from build_ast_graph import incremental_rebuild + result = incremental_rebuild(source_root, kuzu_path, verbose=False) + + assert result.mode == "incremental" + assert result.files_changed == 0 + assert result.files_added == 1 + + def test_incremental_deleted_file(self, tmp_path: Path) -> None: + """Remove a .java file from fixture, run incremental, verify orphaned nodes/edges cleaned up.""" + source_root = tmp_path / "src" + source_root.mkdir() + index_dir = tmp_path / "index" + index_dir.mkdir() + kuzu_path = index_dir / "code_graph.kuzu" + + # Create initial files + (source_root / "A.java").write_text("package pkg; class A {}", encoding="utf-8") + (source_root / "B.java").write_text("package pkg; class B {}", encoding="utf-8") + + # Initial build + from build_ast_graph import write_kuzu + tables = GraphTables() + pass1_parse(source_root, tables, verbose=False) + write_kuzu(kuzu_path, tables, source_root=source_root, verbose=False) + + # Initialize hash tracker + tracker = FileHashTracker(index_dir) + ignore = LayeredIgnore(source_root, use_gitignore=False, builtin_patterns=[]) + tracker.detect_changes(source_root, ignore) + tracker.update({"A.java", "B.java"}, source_root) + tracker.save() + + # Delete B.java + (source_root / "B.java").unlink() + + # Run incremental + from build_ast_graph import incremental_rebuild + result = incremental_rebuild(source_root, kuzu_path, verbose=False) + + assert result.mode == "incremental" + assert result.files_changed == 0 + assert result.files_added == 0 + assert result.files_removed == 1 + + # Verify B's nodes are deleted + db = kuzu.Database(str(kuzu_path)) + conn = kuzu.Connection(db) + check_result = conn.execute("MATCH (s:Symbol) WHERE s.fqn = 'pkg.B' RETURN count(*)") + if check_result.has_next(): + count = check_result.get_next()[0] + assert count == 0 + + def test_incremental_phantom_nodes_preserved(self, tmp_path: Path) -> None: + """Run incremental after a change, verify phantom nodes (those with filename = "") are untouched.""" + source_root = tmp_path / "src" + source_root.mkdir() + index_dir = tmp_path / "index" + index_dir.mkdir() + kuzu_path = index_dir / "code_graph.kuzu" + + # Create file with external reference + (source_root / "A.java").write_text( + "package pkg; import java.util.List; class A { List list; }", + encoding="utf-8", + ) + + # Initial build + from build_ast_graph import write_kuzu + tables = GraphTables() + pass1_parse(source_root, tables, verbose=False) + write_kuzu(kuzu_path, tables, source_root=source_root, verbose=False) + + # Count phantom nodes before + db = kuzu.Database(str(kuzu_path)) + conn = kuzu.Connection(db) + phantom_count_before = 0 + phantom_result = conn.execute("MATCH (s:Symbol) WHERE s.filename = '' RETURN count(*)") + if phantom_result.has_next(): + phantom_count_before = phantom_result.get_next()[0] + + conn.close() + + # Initialize hash tracker + tracker = FileHashTracker(index_dir) + ignore = LayeredIgnore(source_root, use_gitignore=False, builtin_patterns=[]) + tracker.detect_changes(source_root, ignore) + tracker.update({"A.java"}, source_root) + tracker.save() + + # Modify A.java + (source_root / "A.java").write_text( + "package pkg; import java.util.List; class A { List list; }", + encoding="utf-8", + ) + + # Run incremental + from build_ast_graph import incremental_rebuild + incremental_rebuild(source_root, kuzu_path, verbose=False) + + # Verify phantom nodes still exist + db = kuzu.Database(str(kuzu_path)) + conn = kuzu.Connection(db) + phantom_count_after = 0 + phantom_result = conn.execute("MATCH (s:Symbol) WHERE s.filename = '' RETURN count(*)") + if phantom_result.has_next(): + phantom_count_after = phantom_result.get_next()[0] + + assert phantom_count_after >= phantom_count_before + + def test_incremental_dependent_expansion(self, tmp_path: Path) -> None: + """Change a base class, verify that files with EXTENDS/IMPLEMENTS edges into it are also reprocessed.""" + source_root = tmp_path / "src" + source_root.mkdir() + index_dir = tmp_path / "index" + index_dir.mkdir() + kuzu_path = index_dir / "code_graph.kuzu" + + # Create files with inheritance + (source_root / "Base.java").write_text("package pkg; class Base {}", encoding="utf-8") + (source_root / "Derived.java").write_text( + "package pkg; class Derived extends Base {}", encoding="utf-8" + ) + + # Initial build (pass2 needed for EXTENDS edges) + from build_ast_graph import write_kuzu + tables = GraphTables() + asts = pass1_parse(source_root, tables, verbose=False) + pass2_edges(tables, asts, verbose=False) + write_kuzu(kuzu_path, tables, source_root=source_root, verbose=False) + + # Initialize hash tracker + tracker = FileHashTracker(index_dir) + ignore = LayeredIgnore(source_root, use_gitignore=False, builtin_patterns=[]) + tracker.detect_changes(source_root, ignore) + tracker.update({"Base.java", "Derived.java"}, source_root) + tracker.save() + + # Modify Base.java + (source_root / "Base.java").write_text( + "package pkg; class Base { void foo() {} }", encoding="utf-8" + ) + + # Run incremental + from build_ast_graph import incremental_rebuild + result = incremental_rebuild(source_root, kuzu_path, verbose=False) + + # Derived.java should be reprocessed due to EXTENDS edge + assert result.dependents_reprocessed >= 1 + + def test_incremental_expansion_cap_fallback(self, tmp_path: Path) -> None: + """Mock expansion_cap=2, change a widely-used file that has >2 dependents, verify fallback to full rebuild.""" + source_root = tmp_path / "src" + source_root.mkdir() + index_dir = tmp_path / "index" + index_dir.mkdir() + kuzu_path = index_dir / "code_graph.kuzu" + + # Create base class and many derived classes + (source_root / "Base.java").write_text("package pkg; class Base {}", encoding="utf-8") + for i in range(5): + (source_root / f"Derived{i}.java").write_text( + f"package pkg; class Derived{i} extends Base {{}}", encoding="utf-8" + ) + + # Initial build + from build_ast_graph import write_kuzu + tables = GraphTables() + asts = pass1_parse(source_root, tables, verbose=False) + pass2_edges(tables, asts, verbose=False) + write_kuzu(kuzu_path, tables, source_root=source_root, verbose=False) + + # Initialize hash tracker + tracker = FileHashTracker(index_dir) + ignore = LayeredIgnore(source_root, use_gitignore=False, builtin_patterns=[]) + tracker.detect_changes(source_root, ignore) + all_files = {"Base.java"} | {f"Derived{i}.java" for i in range(5)} + tracker.update(all_files, source_root) + tracker.save() + + # Modify Base.java + (source_root / "Base.java").write_text( + "package pkg; class Base { void foo() {} }", encoding="utf-8" + ) + + # Run incremental with low expansion cap + from build_ast_graph import incremental_rebuild + result = incremental_rebuild(source_root, kuzu_path, verbose=False, expansion_cap=2) + + # Should fall back to full rebuild due to cap exceeded + assert result.mode == "full_fallback" + + def test_incremental_crash_marker_triggers_fallback(self, tmp_path: Path) -> None: + """Leave .graph_increment_in_progress marker, run incremental, verify full rebuild happens.""" + source_root = tmp_path / "src" + source_root.mkdir() + index_dir = tmp_path / "index" + index_dir.mkdir() + kuzu_path = index_dir / "code_graph.kuzu" + + # Create file + (source_root / "A.java").write_text("package pkg; class A {}", encoding="utf-8") + + # Initial build + from build_ast_graph import write_kuzu + tables = GraphTables() + pass1_parse(source_root, tables, verbose=False) + write_kuzu(kuzu_path, tables, source_root=source_root, verbose=False) + + # Initialize hash tracker + tracker = FileHashTracker(index_dir) + ignore = LayeredIgnore(source_root, use_gitignore=False, builtin_patterns=[]) + tracker.detect_changes(source_root, ignore) + tracker.update({"A.java"}, source_root) + tracker.save() + + # Create crash marker + crash_marker = index_dir / ".graph_increment_in_progress" + crash_marker.write_text("", encoding="utf-8") + + # Modify A.java + (source_root / "A.java").write_text( + "package pkg; class A { void foo() {} }", encoding="utf-8" + ) + + # Run incremental - should fall back to full rebuild + from build_ast_graph import incremental_rebuild + result = incremental_rebuild(source_root, kuzu_path, verbose=False) + + assert result.mode == "full_fallback" + # Crash marker should be removed + assert not crash_marker.exists() + + def test_incremental_crash_marker_removed_on_success(self, tmp_path: Path) -> None: + """Run successful incremental, verify marker file is removed.""" + source_root = tmp_path / "src" + source_root.mkdir() + index_dir = tmp_path / "index" + index_dir.mkdir() + kuzu_path = index_dir / "code_graph.kuzu" + + # Create file + (source_root / "A.java").write_text("package pkg; class A {}", encoding="utf-8") + + # Initial build + from build_ast_graph import write_kuzu + tables = GraphTables() + pass1_parse(source_root, tables, verbose=False) + write_kuzu(kuzu_path, tables, source_root=source_root, verbose=False) + + # Initialize hash tracker + tracker = FileHashTracker(index_dir) + ignore = LayeredIgnore(source_root, use_gitignore=False, builtin_patterns=[]) + tracker.detect_changes(source_root, ignore) + tracker.update({"A.java"}, source_root) + tracker.save() + + # Modify A.java + (source_root / "A.java").write_text( + "package pkg; class A { void foo() {} }", encoding="utf-8" + ) + + # Run incremental + from build_ast_graph import incremental_rebuild + result = incremental_rebuild(source_root, kuzu_path, verbose=False) + + assert result.mode == "incremental" + + # Crash marker should not exist + crash_marker = index_dir / ".graph_increment_in_progress" + assert not crash_marker.exists() + + def test_incremental_no_changes_is_noop(self, tmp_path: Path) -> None: + """Run incremental with no file changes, verify graph is unchanged (same node/edge counts).""" + source_root = tmp_path / "src" + source_root.mkdir() + index_dir = tmp_path / "index" + index_dir.mkdir() + kuzu_path = index_dir / "code_graph.kuzu" + + # Create file + (source_root / "A.java").write_text("package pkg; class A {}", encoding="utf-8") + + # Initial build + from build_ast_graph import write_kuzu + tables = GraphTables() + pass1_parse(source_root, tables, verbose=False) + write_kuzu(kuzu_path, tables, source_root=source_root, verbose=False) + + # Get node count before + db = kuzu.Database(str(kuzu_path)) + conn = kuzu.Connection(db) + count_before_result = conn.execute("MATCH (s:Symbol) RETURN count(*)") + count_before = 0 + if count_before_result.has_next(): + count_before = count_before_result.get_next()[0] + conn.close() + + # Initialize hash tracker + tracker = FileHashTracker(index_dir) + ignore = LayeredIgnore(source_root, use_gitignore=False, builtin_patterns=[]) + tracker.detect_changes(source_root, ignore) + tracker.update({"A.java"}, source_root) + tracker.save() + + # Run incremental with no changes + from build_ast_graph import incremental_rebuild + result = incremental_rebuild(source_root, kuzu_path, verbose=False) + + assert result.mode == "incremental" + assert result.files_changed == 0 + + # Verify node count unchanged + db = kuzu.Database(str(kuzu_path)) + conn = kuzu.Connection(db) + count_after_result = conn.execute("MATCH (s:Symbol) RETURN count(*)") + count_after = 0 + if count_after_result.has_next(): + count_after = count_after_result.get_next()[0] + conn.close() + + assert count_after == count_before + + def test_incremental_pass5_6_always_global(self, tmp_path: Path) -> None: + """Change a file unrelated to routes, verify Client/Producer/HTTP_CALLS/ASYNC_CALLS are still fully rebuilt.""" + source_root = tmp_path / "src" + source_root.mkdir() + index_dir = tmp_path / "index" + index_dir.mkdir() + kuzu_path = index_dir / "code_graph.kuzu" + + # Create files + (source_root / "A.java").write_text("package pkg; class A {}", encoding="utf-8") + + # Initial build + from build_ast_graph import write_kuzu + tables = GraphTables() + pass1_parse(source_root, tables, verbose=False) + write_kuzu(kuzu_path, tables, source_root=source_root, verbose=False) + + # Initialize hash tracker + tracker = FileHashTracker(index_dir) + ignore = LayeredIgnore(source_root, use_gitignore=False, builtin_patterns=[]) + tracker.detect_changes(source_root, ignore) + tracker.update({"A.java"}, source_root) + tracker.save() + + # Modify A.java + (source_root / "A.java").write_text( + "package pkg; class A { void foo() {} }", encoding="utf-8" + ) + + # Run incremental + from build_ast_graph import incremental_rebuild + result = incremental_rebuild(source_root, kuzu_path, verbose=False) + + assert result.mode == "incremental" + + # Verify graph is still valid (Client/Producer tables exist even if empty) + db = kuzu.Database(str(kuzu_path)) + conn = kuzu.Connection(db) + + # Check that Client and Producer node tables exist by querying them + client_result = conn.execute("MATCH (c:Client) RETURN count(*)") + producer_result = conn.execute("MATCH (p:Producer) RETURN count(*)") + assert client_result.has_next() + assert producer_result.has_next() + + conn.close() + + def test_load_existing_types_populates_indexes(self, tmp_path: Path) -> None: + """Build full graph, then load existing types into empty GraphTables, verify types/by_simple_name/by_package populated.""" + from build_ast_graph import _load_existing_types + + source_root = tmp_path / "src" + source_root.mkdir() + kuzu_path = tmp_path / "code_graph.kuzu" + + # Create file + (source_root / "A.java").write_text("package pkg; class A {}", encoding="utf-8") + + # Build full graph + from build_ast_graph import write_kuzu + tables = GraphTables() + pass1_parse(source_root, tables, verbose=False) + write_kuzu(kuzu_path, tables, source_root=source_root, verbose=False) + + # Load existing types into empty tables + new_tables = GraphTables() + db = kuzu.Database(str(kuzu_path)) + conn = kuzu.Connection(db) + _load_existing_types(conn, new_tables) + conn.close() + + # Verify types loaded + assert "pkg.A" in new_tables.types + assert len(new_tables.by_simple_name.get("A", [])) == 1 + assert len(new_tables.by_package.get("pkg", [])) == 1 + + def test_find_dependents_returns_incoming_edge_sources(self, tmp_path: Path) -> None: + """Seed graph with EXTENDS edge from file B to file A, change file A, verify _find_dependents returns file B's filename.""" + from build_ast_graph import _find_dependents + + source_root = tmp_path / "src" + source_root.mkdir() + kuzu_path = tmp_path / "code_graph.kuzu" + + # Create files + (source_root / "Base.java").write_text("package pkg; class Base {}", encoding="utf-8") + (source_root / "Derived.java").write_text( + "package pkg; class Derived extends Base {}", encoding="utf-8" + ) + + # Build full graph + from build_ast_graph import write_kuzu + tables = GraphTables() + asts = pass1_parse(source_root, tables, verbose=False) + pass2_edges(tables, asts, verbose=False) + write_kuzu(kuzu_path, tables, source_root=source_root, verbose=False) + + # Get Base node ID + db = kuzu.Database(str(kuzu_path)) + conn = kuzu.Connection(db) + base_result = conn.execute("MATCH (s:Symbol) WHERE s.fqn = 'pkg.Base' RETURN s.id") + base_id = None + if base_result.has_next(): + base_id = base_result.get_next()[0] + + assert base_id is not None + + # Find dependents of Base + dependent_files = _find_dependents(conn, {base_id}) + + # Should include Derived.java + assert "Derived.java" in dependent_files + + conn.close() + + def test_delete_file_scope_removes_only_matching(self, tmp_path: Path) -> None: + """Delete scope for one file, verify other files' nodes/edges untouched.""" + from build_ast_graph import _delete_file_scope + + source_root = tmp_path / "src" + source_root.mkdir() + kuzu_path = tmp_path / "code_graph.kuzu" + + # Create files + (source_root / "A.java").write_text("package pkg; class A {}", encoding="utf-8") + (source_root / "B.java").write_text("package pkg; class B {}", encoding="utf-8") + + # Build full graph + from build_ast_graph import write_kuzu + tables = GraphTables() + pass1_parse(source_root, tables, verbose=False) + write_kuzu(kuzu_path, tables, source_root=source_root, verbose=False) + + # Get node count before + db = kuzu.Database(str(kuzu_path)) + conn = kuzu.Connection(db) + conn.execute("MATCH (s:Symbol) RETURN count(*)") + + # Delete only A.java's scope + _delete_file_scope(conn, {"A.java"}) + + # Verify A's nodes are gone but B's remain + a_result = conn.execute("MATCH (s:Symbol) WHERE s.fqn = 'pkg.A' RETURN count(*)") + b_result = conn.execute("MATCH (s:Symbol) WHERE s.fqn = 'pkg.B' RETURN count(*)") + + a_count = 0 + b_count = 0 + if a_result.has_next(): + a_count = a_result.get_next()[0] + if b_result.has_next(): + b_count = b_result.get_next()[0] + + assert a_count == 0 + assert b_count > 0 + + conn.close() diff --git a/tests/test_java_codebase_rag_cli.py b/tests/test_java_codebase_rag_cli.py index 1d67cb77..7e8c5920 100644 --- a/tests/test_java_codebase_rag_cli.py +++ b/tests/test_java_codebase_rag_cli.py @@ -324,6 +324,11 @@ def test_refresh_hidden_alias_deprecates_on_stderr(tmp_path: Path) -> None: def test_increment_emits_kuzu_stale_warning_block( corpus_root: Path, tmp_path: Path, monkeypatch: pytest.MonkeyPatch, ) -> None: + """Test that increment does NOT emit stale warning by default (new behavior). + + The stale warning is now only emitted with --vectors-only flag. + This test verifies the new default behavior where graph IS updated. + """ idx = tmp_path / "idx_inc" idx.mkdir() monkeypatch.setenv("JAVA_CODEBASE_RAG_INDEX_DIR", str(idx)) @@ -339,9 +344,10 @@ def test_increment_emits_kuzu_stale_warning_block( ) assert rc == 0 err = buf.getvalue() - assert "WARNING: AST graph (Kuzu) incremental rebuild is not yet implemented." in err - assert "java-codebase-rag reprocess" in err - assert cli_mod.KUZU_INCREMENTAL_TRACKING_ISSUE_URL in err + # Should NOT contain old stale warning + assert "WARNING: AST graph (Kuzu) incremental rebuild is not yet implemented." not in err + assert "java-codebase-rag reprocess" not in err + assert cli_mod.KUZU_INCREMENTAL_TRACKING_ISSUE_URL not in err def test_meta_reports_embedding_setting_source(corpus_root: Path, kuzu_db_path: Path) -> None: @@ -392,6 +398,10 @@ def test_init_after_erase_succeeds(corpus_root: Path, tmp_path: Path) -> None: def test_cli_lifecycle_round_trip_init_increment_meta_erase( corpus_root: Path, tmp_path: Path, ) -> None: + """Test lifecycle round-trip: init -> increment -> meta -> erase. + + This test verifies that increment updates both Lance and graph (new behavior). + """ idx = tmp_path / "rt_idx" env = os.environ.copy() env["JAVA_CODEBASE_RAG_INDEX_DIR"] = str(idx) @@ -411,7 +421,10 @@ def test_cli_lifecycle_round_trip_init_increment_meta_erase( env=env, ) assert inc.returncode == 0, inc.stdout + inc.stderr - assert "WARNING: AST graph" in inc.stderr + # Should NOT contain old stale warning (new behavior) + assert "WARNING: AST graph" not in inc.stderr + # Should contain new success message + assert "Lance + graph updated" in inc.stdout meta = _run_cli(["meta", "--source-root", str(corpus_root), "--index-dir", str(idx)], env=env) assert meta.returncode == 0, meta.stderr er = _run_cli( @@ -421,6 +434,116 @@ def test_cli_lifecycle_round_trip_init_increment_meta_erase( assert er.returncode == 0, er.stderr +@pytest.mark.skipif(not _cocoindex_available(), reason="cocoindex not installed in venv") +def test_increment_runs_graph_update( + corpus_root: Path, tmp_path: Path, monkeypatch: pytest.MonkeyPatch, +) -> None: + """Test that increment updates graph by default (no --vectors-only).""" + idx = tmp_path / "idx_graph_update" + idx.mkdir() + monkeypatch.setenv("JAVA_CODEBASE_RAG_INDEX_DIR", str(idx)) + monkeypatch.setenv("JAVA_CODEBASE_RAG_SOURCE_ROOT", str(corpus_root)) + init_rc = cli_mod.main( + ["init", "--source-root", str(corpus_root), "--index-dir", str(idx), "--quiet"], + ) + assert init_rc == 0 + buf = io.StringIO() + with contextlib.redirect_stderr(buf): + rc = cli_mod.main( + ["increment", "--source-root", str(corpus_root), "--index-dir", str(idx), "--quiet"], + ) + assert rc == 0 + # Should NOT contain stale warning + err = buf.getvalue() + assert "WARNING: AST graph" not in err + + +def test_increment_vectors_only_skips_graph( + corpus_root: Path, tmp_path: Path, monkeypatch: pytest.MonkeyPatch, +) -> None: + """Test that increment --vectors-only emits stale warning and skips graph update.""" + idx = tmp_path / "idx_vectors_only" + idx.mkdir() + monkeypatch.setenv("JAVA_CODEBASE_RAG_INDEX_DIR", str(idx)) + monkeypatch.setenv("JAVA_CODEBASE_RAG_SOURCE_ROOT", str(corpus_root)) + init_rc = cli_mod.main( + ["init", "--source-root", str(corpus_root), "--index-dir", str(idx), "--quiet"], + ) + assert init_rc == 0 + buf = io.StringIO() + with contextlib.redirect_stderr(buf): + rc = cli_mod.main( + ["increment", "--vectors-only", "--source-root", str(corpus_root), "--index-dir", str(idx), "--quiet"], + ) + assert rc == 0 + err = buf.getvalue() + # Should contain stale warning + assert "WARNING: AST graph (Kuzu) incremental rebuild is not yet implemented." in err + assert "java-codebase-rag reprocess" in err + assert cli_mod.KUZU_INCREMENTAL_TRACKING_ISSUE_URL in err + + +def test_increment_cli_help_mentions_vectors_only( + corpus_root: Path, tmp_path: Path, monkeypatch: pytest.MonkeyPatch, +) -> None: + """Test that increment --help mentions --vectors-only flag.""" + buf = io.StringIO() + with contextlib.redirect_stdout(buf): + rc = cli_mod.main(["increment", "--help"]) + assert rc == 0 + help_text = buf.getvalue() + assert "--vectors-only" in help_text + assert "Run only cocoindex catch-up" in help_text + + +def test_increment_cli_help_no_longer_says_lance_only( + corpus_root: Path, tmp_path: Path, monkeypatch: pytest.MonkeyPatch, +) -> None: + """Test that increment --help no longer says 'Lance only'.""" + buf = io.StringIO() + with contextlib.redirect_stdout(buf): + rc = cli_mod.main(["increment", "--help"]) + assert rc == 0 + help_text = buf.getvalue() + # Should NOT say "Lance only" in help + assert "Lance only" not in help_text + # Should say it updates graph + assert "graph" in help_text.lower() + + +def test_increment_first_run_falls_back_to_full( + corpus_root: Path, tmp_path: Path, monkeypatch: pytest.MonkeyPatch, +) -> None: + """Test that increment on fresh index (no graph hashes) falls back to full rebuild.""" + idx = tmp_path / "idx_first_run" + idx.mkdir() + monkeypatch.setenv("JAVA_CODEBASE_RAG_INDEX_DIR", str(idx)) + monkeypatch.setenv("JAVA_CODEBASE_RAG_SOURCE_ROOT", str(corpus_root)) + # Run init first + init_rc = cli_mod.main( + ["init", "--source-root", str(corpus_root), "--index-dir", str(idx), "--quiet"], + ) + assert init_rc == 0 + # Remove hash file to simulate first run after upgrade + hash_file = idx / ".graph_hashes.json" + if hash_file.exists(): + hash_file.unlink() + buf = io.StringIO() + buf_err = io.StringIO() + with contextlib.redirect_stdout(buf): + with contextlib.redirect_stderr(buf_err): + rc = cli_mod.main( + ["increment", "--source-root", str(corpus_root), "--index-dir", str(idx), "--quiet"], + ) + assert rc == 0 + err = buf_err.getvalue() + # Should fall back to full rebuild gracefully + assert "fell back to full graph rebuild" in err + # Should still succeed + assert "increment completed (Lance + graph updated)" in buf.getvalue() + + + @pytest.mark.skipif(not _cocoindex_available(), reason="cocoindex not installed in venv") def test_increment_updates_lance_after_touch_java_file(corpus_root: Path, tmp_path: Path) -> None: import lancedb # noqa: PLC0415