diff --git a/build_ast_graph.py b/build_ast_graph.py index 8760a904..040ca25b 100644 --- a/build_ast_graph.py +++ b/build_ast_graph.py @@ -2395,7 +2395,8 @@ def _micro_factor(member: MemberEntry | None) -> float: "pass3_unresolved_phantom_receiver INT64, " "pass3_unresolved_chained INT64, " "pass4_exposes_suppressed_feign INT64, " - "cross_service_resolution STRING" + "cross_service_resolution STRING, " + "last_rebuild_mode STRING" ")" ) @@ -2732,6 +2733,1368 @@ def delete_all_for_file( } +# --------------------------------------------------------------------------- +# Incremental rebuild (PR-T3) +# --------------------------------------------------------------------------- + + +def expand_to_closure( + changed_paths: set[str], + deps_index: DepsIndex, +) -> set[str]: + """Expand changed_paths to include all transitively affected files. + + Implements the 8 closure rules from proposal §2.3 using inverted .deps.json + maps. Rule 5 (brownfield-override) forces full rebuild at the caller level, + not here. + """ + if not changed_paths: + return set() + + files = deps_index.files + + # --- build inverse maps --- + # type FQN → file that declares it + declared_by: dict[str, str] = {} + for fp, deps in files.items(): + for fqn in deps.declares: + declared_by[fqn] = fp + + # annotation simple name → set of files using it + anno_users: dict[str, set[str]] = defaultdict(set) + for fp, deps in files.items(): + for anno in deps.uses_anno: + anno_users[anno].add(fp) + + # method/type FQN → set of files calling it (strip method part for type lookup) + callers_of: dict[str, set[str]] = defaultdict(set) + for fp, deps in files.items(): + for callee in deps.calls: + callers_of[callee].add(fp) + # also index the declaring type so type-level changes reach callers + type_fqn = callee.split("#")[0] + if type_fqn: + callers_of[type_fqn].add(fp) + + # type FQN → set of files extending it + extenders_of: dict[str, set[str]] = defaultdict(set) + for fp, deps in files.items(): + for ext in deps.extends: + extenders_of[ext].add(fp) + + # type FQN → set of files injecting it + injectors_of: dict[str, set[str]] = defaultdict(set) + for fp, deps in files.items(): + for inj in deps.injects: + injectors_of[inj].add(fp) + + # method FQN → set of files overriding it + overriders_of: dict[str, set[str]] = defaultdict(set) + for fp, deps in files.items(): + for ov in deps.overrides: + overriders_of[ov].add(fp) + type_fqn = ov.split("#")[0] + if type_fqn: + overriders_of[type_fqn].add(fp) + + # member FQN → file declaring client/producer (inverse of declares_clients/producers) + client_producer_declaring_files: dict[str, str] = {} + for fp, deps in files.items(): + for mfqn in deps.declares_clients + deps.declares_producers: + client_producer_declaring_files[mfqn] = fp + + # --- fixed-point expansion --- + dirty: set[str] = {p for p in changed_paths if p in files} + frontier = set(dirty) + while frontier: + new_frontier: set[str] = set() + for fp in frontier: + deps = files.get(fp) + if deps is None: + continue + + # Rule 1: Inverse-INJECTS — files that inject symbols declared in fp + for fqn in deps.declares: + for other in injectors_of.get(fqn, ()): + if other not in dirty: + new_frontier.add(other) + + # Rule 2: Inverse-EXTENDS / Inverse-IMPLEMENTS — files that extend/implement from fp + for fqn in deps.declares: + for other in extenders_of.get(fqn, ()): + if other not in dirty: + new_frontier.add(other) + + # Rule 3: Inverse-CALLS — files that call symbols declared in fp + for fqn in deps.declares: + for other in callers_of.get(fqn, ()): + if other not in dirty: + new_frontier.add(other) + + # Rule 4: Meta-annotation closure — if fp declares an @interface, dirty its users + for fqn in deps.declares: + simple = fqn.rsplit(".", 1)[-1] + for other in anno_users.get(simple, ()): + if other not in dirty: + new_frontier.add(other) + + # Rule 6: Route resolution — files that extend types from fp (already covered + # by rule 2) plus files whose calls reference methods from fp (already covered + # by rule 3). No additional expansion needed. + + # Rule 7: Inverse-OVERRIDES — files that override methods declared in fp + for fqn in deps.declares: + for other in overriders_of.get(fqn, ()): + if other not in dirty: + new_frontier.add(other) + + # Rule 8: Inverse-DECLARES_CLIENT/PRODUCER — already covered by rule 3 + # (callers of methods in fp). The client/producer nodes are re-emitted + # when their declaring method's file is re-processed. + + # Also: forward deps — if fp injects/extends/calls something, and that + # thing is in another file, that file may need re-processing for resolution. + for fqn in deps.injects + deps.extends + deps.calls: + type_fqn = fqn.split("#")[0] + declaring_file = declared_by.get(type_fqn) + if declaring_file and declaring_file not in dirty: + new_frontier.add(declaring_file) + + # Forward overrides: if fp overrides something in another file + for ov in deps.overrides: + type_fqn = ov.split("#")[0] + declaring_file = declared_by.get(type_fqn) + if declaring_file and declaring_file not in dirty: + new_frontier.add(declaring_file) + + dirty |= new_frontier + frontier = new_frontier + + return dirty + + +def pass1_parse_subset( + root: Path, + dirty: set[str], + *, + verbose: bool, +) -> dict[str, JavaFileAst]: + """Re-parse only dirty files. Returns path -> AST.""" + asts: dict[str, JavaFileAst] = {} + t0 = time.time() + n_files = 0 + if verbose: + _verbose_stderr_line(f"[graph] incremental pass 1 · parsing {len(dirty)} dirty files") + for rel_path in sorted(dirty): + p = root / rel_path + if not p.is_file(): + continue + n_files += 1 + try: + content = p.read_bytes() + except OSError: + continue + if not content.strip(): + continue + try: + ast = parse_java(content, filename=rel_path, verbose=verbose) + except Exception: + continue + asts[rel_path] = ast + if verbose: + elapsed = time.time() - t0 + _verbose_stderr_line( + f"[graph] incremental pass 1 · parsed {n_files} files in {elapsed:.2f}s", + ) + return asts + + +def pass2_edges_subset( + tables: GraphTables, + asts: dict[str, JavaFileAst], + dirty: set[str], + *, + verbose: bool, +) -> None: + """Re-emit EXTENDS/IMPLEMENTS/INJECTS edges for types in dirty files.""" + if verbose: + _verbose_stderr_line("[graph] incremental pass 2 · emitting edges for dirty files") + seen_ext: set[tuple[str, str]] = set() + seen_impl: set[tuple[str, str]] = set() + seen_inj: set[tuple[str, str, str, str]] = set() + for fqn, entry in tables.types.items(): + if entry.file_path not in dirty: + continue + ast = asts.get(entry.file_path) + if ast is None: + continue + _emit_extends_implements(entry, ast, tables, seen_ext=seen_ext, seen_impl=seen_impl) + _emit_injects(entry, ast, tables, seen=seen_inj) + + +def pass3_calls_subset( + tables: GraphTables, + asts: dict[str, JavaFileAst], + dirty: set[str], + *, + verbose: bool, +) -> None: + """Re-emit CALLS + UnresolvedCallSite for dirty caller files.""" + if verbose: + _verbose_stderr_line("[graph] incremental pass 3 · resolving calls for dirty files") + _build_member_indexes(tables) + stats = CallResolutionStats() + for rel_path, file_ast in asts.items(): + if rel_path not in dirty: + continue + try: + _process_file_calls(file_ast, rel_path, tables, stats) + except Exception as e: + log.error("Call extraction failed for %s: %s", rel_path, e) + + +def pass4_routes_subset( + tables: GraphTables, + asts: dict[str, JavaFileAst], + dirty: set[str], + *, + source_root: Path, + verbose: bool, +) -> None: + """Re-emit Route/EXPOSES for methods in dirty files.""" + if verbose: + _verbose_stderr_line("[graph] incremental pass 4 · extracting routes for dirty files") + stats = tables.route_stats + overrides = load_brownfield_overrides(source_root) + try: + prs = str(source_root.resolve()) + except OSError: + prs = str(source_root) + tables.cross_service_resolution = _load_config_cross_service_resolution(prs) + meta_chain = collect_annotation_meta_chain(prs) + + routes_by_id: dict[str, RouteRow] = {} + exposes_seen: set[tuple[str, str]] = set() + http_kinds = frozenset({"http_endpoint", "http_consumer"}) + + for member in sorted(tables.members, key=lambda m: m.node_id): + if member.file_path not in dirty: + continue + if member.decl.is_constructor: + continue + ast = asts.get(member.file_path) + if ast is None: + continue + type_decl = tables.types[member.parent_fqn].decl + final_routes = resolve_routes_for_method( + method_decl=member.decl, + enclosing_type=type_decl, + overrides=overrides, + meta_chain=meta_chain, + builtin_routes=member.decl.routes, + ) + if not final_routes: + continue + for decl in final_routes: + path_template, path_regex = ("", "") + if decl.kind in http_kinds: + if decl.resolved and decl.resolution_strategy in ( + "annotation", + "codebase_route", + ): + path_template, path_regex = _normalize_path(decl.path) + rid = _route_id( + decl.framework, decl.kind, decl.http_method, + path_template, decl.path, decl.topic, decl.broker, + member.microservice, + ) + layer = decl.route_source_layer + if rid not in routes_by_id: + routes_by_id[rid] = RouteRow( + id=rid, kind=decl.kind, framework=decl.framework, + method=decl.http_method, path=decl.path, + path_template=path_template, path_regex=path_regex, + topic=decl.topic, broker=decl.broker, + feign_name=decl.feign_name, feign_url=decl.feign_url, + microservice=member.microservice, module=member.module, + filename=decl.filename, + start_line=decl.start_line, end_line=decl.end_line, + resolved=decl.resolved, source_layer=layer, + ) + else: + prev = routes_by_id[rid] + if _ROUTE_LAYER_RANK.get(layer, 0) > _ROUTE_LAYER_RANK.get( + prev.source_layer, 0, + ): + routes_by_id[rid] = replace(prev, source_layer=layer) + ek = (member.node_id, rid) + if ek not in exposes_seen: + route_kind = routes_by_id[rid].kind + if route_kind == "http_consumer": + stats.exposes_suppressed_feign += 1 + continue + exposes_seen.add(ek) + tables.exposes_rows.append( + ExposesRow( + symbol_id=member.node_id, route_id=rid, + confidence=decl.confidence, + strategy=decl.resolution_strategy, + ), + ) + + tables.routes_rows = sorted(routes_by_id.values(), key=lambda r: r.id) + for row in tables.routes_rows: + stats.by_framework[row.framework] += 1 + stats.by_kind[row.kind] += 1 + n_routes = len(tables.routes_rows) + if n_routes: + stats.routes_resolved_pct = 100.0 * sum( + 1 for r in tables.routes_rows if r.resolved + ) / n_routes + else: + stats.routes_resolved_pct = 100.0 + stats.routes_from_brownfield_pct = 0.0 + by_layer: dict[str, int] = defaultdict(int) + for row in tables.routes_rows: + by_layer[row.source_layer] += 1 + stats.routes_by_layer = dict(sorted(by_layer.items())) + + +def pass5_imperative_edges_subset( + tables: GraphTables, + dirty: set[str], + *, + source_root: Path, + verbose: bool, +) -> None: + """Re-emit Client/Producer/HTTP_CALLS/ASYNC_CALLS for members in dirty files.""" + if verbose: + _verbose_stderr_line("[graph] incremental pass 5 · extracting callers for dirty files") + overrides = load_brownfield_overrides(source_root) + try: + prs = str(source_root.resolve()) + except OSError: + prs = str(source_root) + tables.cross_service_resolution = _load_config_cross_service_resolution(prs) + meta_chain = collect_annotation_meta_chain(prs) + routes_by_id = {r.id: r for r in tables.routes_rows} + existing_route_ids = set(routes_by_id) + http_seen: set[tuple[str, str]] = set() + async_seen: set[tuple[str, str]] = set() + client_seen: set[str] = set() + producer_seen: set[str] = set() + declares_client_seen: set[tuple[str, str]] = set() + declares_producer_seen: set[tuple[str, str]] = set() + route_rows = list(tables.routes_rows) + + def _micro_factor(member: MemberEntry) -> float: + ms = microservice_for_path(member.file_path, source_root) + return 1.0 if ms else 0.85 + + def _append_route(row: RouteRow) -> None: + if row.id in existing_route_ids: + return + existing_route_ids.add(row.id) + routes_by_id[row.id] = row + route_rows.append(row) + + def _phantom_http_route_id(call: OutgoingCallDecl) -> str: + if call.path_template_call and call.method_call: + return _route_id("", "http_endpoint", call.method_call, call.path_template_call, call.path_template_call, "", "", "") + uniq = hashlib.sha1(f"{call.filename}:{call.start_line}:{call.raw_uri}".encode()).hexdigest()[:12] + return f"r:phantom:{uniq}" + + def _phantom_async_route_id(call: OutgoingCallDecl) -> str: + if call.topic_call: + return _route_id("", "kafka_topic", "", "", "", call.topic_call, call.broker_call, "") + uniq = hashlib.sha1(f"{call.filename}:{call.start_line}:{call.raw_topic}".encode()).hexdigest()[:12] + return f"r:phantom:{uniq}" + + for member in sorted(tables.members, key=lambda x: x.node_id): + if member.file_path not in dirty: + continue + if member.decl.is_constructor: + continue + type_decl = tables.types[member.parent_fqn].decl + final_http_calls = resolve_http_client_for_method( + method_decl=member.decl, + enclosing_type=type_decl, + overrides=overrides, + meta_chain=meta_chain, + builtin_calls=member.decl.outgoing_calls, + ) + final_async_calls = resolve_async_producer_for_method( + method_decl=member.decl, + enclosing_type=type_decl, + overrides=overrides, + meta_chain=meta_chain, + builtin_calls=member.decl.outgoing_calls, + ) + micro_factor = _micro_factor(member) + for call in final_http_calls + final_async_calls: + if call.channel == "http": + client_path = (call.path_template_call or "").strip() + client_method = (call.method_call or "").strip().upper() + client_path_template = "" + client_path_regex = "" + if client_path: + client_path_template, client_path_regex = _normalize_path(client_path) + cid = _client_id( + microservice=member.microservice, + member_fqn=call.method_fqn, + client_kind=call.client_kind, + path=client_path, + method=client_method, + ) + if cid not in client_seen: + client_seen.add(cid) + tables.client_rows.append( + ClientRow( + id=cid, client_kind=call.client_kind, + target_service=call.feign_target_name, + path=client_path, + path_template=client_path_template, + path_regex=client_path_regex, + method=client_method, + member_fqn=call.method_fqn, + member_id=member.node_id, + microservice=member.microservice, + module=member.module, + filename=call.filename, + start_line=call.start_line, + end_line=call.end_line, + resolved=call.resolved, + source_layer=_client_source_layer(call.resolution_strategy), + ), + ) + dkey = (member.node_id, cid) + if dkey not in declares_client_seen: + declares_client_seen.add(dkey) + tables.declares_client_rows.append( + DeclaresClientRow( + symbol_id=member.node_id, + client_id=cid, + confidence=call.confidence_base, + strategy=call.resolution_strategy, + ), + ) + rid = "" + strategy = call.resolution_strategy + if call.client_kind == "feign_method": + exposing = next( + (e for e in tables.exposes_rows if e.symbol_id == member.node_id), + None, + ) + if exposing is not None: + rid = exposing.route_id + if not rid: + rid = _phantom_http_route_id(call) + _append_route( + RouteRow( + id=rid, kind="http_endpoint", framework="", + method=call.method_call, path=call.path_template_call, + path_template=call.path_template_call, path_regex="", + topic="", broker="", + feign_name=call.feign_target_name, + feign_url=call.feign_target_url, + microservice="", module="", + filename=call.filename, + start_line=call.start_line, end_line=call.end_line, + resolved=False, source_layer="builtin", + ), + ) + key = (cid, rid) + if key in http_seen: + continue + http_seen.add(key) + conf = call.confidence_base * 0.3 * micro_factor + tables.http_call_rows.append( + HttpCallRow( + client_id=cid, route_id=rid, + confidence=conf, strategy=strategy, + method_call=call.method_call, raw_uri=call.raw_uri, + match="unresolved", + ), + ) + tables.call_edge_stats.http_calls_total += 1 + tables.call_edge_stats.http_calls_by_client_kind[call.client_kind] += 1 + tables.call_edge_stats.http_calls_by_strategy[strategy] += 1 + elif call.channel == "async": + topic_atom = (call.topic_call or "").strip() + pid = _producer_id( + microservice=member.microservice, + member_fqn=call.method_fqn, + producer_kind=call.client_kind, + topic=topic_atom, + ) + if pid not in producer_seen: + producer_seen.add(pid) + tables.producer_rows.append( + ProducerRow( + id=pid, producer_kind=call.client_kind, + topic=topic_atom, broker=call.broker_call, + direction="producer", + member_fqn=call.method_fqn, + member_id=member.node_id, + microservice=member.microservice, + module=member.module, + filename=call.filename, + start_line=call.start_line, + end_line=call.end_line, + resolved=call.resolved, + source_layer=_producer_source_layer(call.resolution_strategy), + ), + ) + dpkey = (member.node_id, pid) + if dpkey not in declares_producer_seen: + declares_producer_seen.add(dpkey) + tables.declares_producer_rows.append( + DeclaresProducerRow( + symbol_id=member.node_id, + producer_id=pid, + confidence=call.confidence_base, + strategy=call.resolution_strategy, + ), + ) + rid = _phantom_async_route_id(call) + _append_route( + RouteRow( + id=rid, kind="kafka_topic", framework="", + method="", path="", path_template="", path_regex="", + topic=call.topic_call, broker=call.broker_call, + feign_name="", feign_url="", + microservice="", module="", + filename=call.filename, + start_line=call.start_line, end_line=call.end_line, + resolved=False, source_layer="builtin", + ), + ) + key = (pid, rid) + if key in async_seen: + continue + async_seen.add(key) + conf = call.confidence_base * 0.3 * micro_factor + strategy = call.resolution_strategy + tables.async_call_rows.append( + AsyncCallRow( + producer_id=pid, route_id=rid, + confidence=conf, strategy=strategy, + direction="producer", raw_topic=call.raw_topic, + match="unresolved", + ), + ) + tables.call_edge_stats.async_calls_total += 1 + tables.call_edge_stats.async_calls_by_client_kind[call.client_kind] += 1 + tables.call_edge_stats.async_calls_by_strategy[strategy] += 1 + + tables.routes_rows = sorted(route_rows, key=lambda r: r.id) + tables.client_rows = sorted(tables.client_rows, key=lambda c: c.id) + tables.declares_client_rows = sorted( + tables.declares_client_rows, + key=lambda e: (e.symbol_id, e.client_id), + ) + tables.client_stats.clients_total = len(tables.client_rows) + tables.client_stats.declares_client_total = len(tables.declares_client_rows) + tables.client_stats.clients_by_kind = defaultdict(int) + for row in tables.client_rows: + tables.client_stats.clients_by_kind[row.client_kind] += 1 + tables.producer_rows = sorted(tables.producer_rows, key=lambda p: p.id) + tables.declares_producer_rows = sorted( + tables.declares_producer_rows, + key=lambda e: (e.symbol_id, e.producer_id), + ) + tables.producer_stats.producers_total = len(tables.producer_rows) + tables.producer_stats.declares_producer_total = len(tables.declares_producer_rows) + tables.producer_stats.producers_by_kind = defaultdict(int) + for row in tables.producer_rows: + tables.producer_stats.producers_by_kind[row.producer_kind] += 1 + + +def _load_remaining_from_db( + conn: kuzu.Connection, + dirty: set[str], +) -> GraphTables: + """Load all remaining (non-dirty) data from DB into a fresh GraphTables.""" + tables = GraphTables() + + # Routes + try: + r = conn.execute( + "MATCH (r:Route) RETURN r.id, r.kind, r.framework, r.method, " + "r.path, r.path_template, r.path_regex, r.topic, r.broker, " + "r.feign_name, r.feign_url, r.microservice, r.module, " + "r.filename, r.start_line, r.end_line, r.resolved" + ) + while r.has_next(): + row = r.get_next() + tables.routes_rows.append(RouteRow( + id=row[0], kind=row[1], framework=row[2], method=row[3], + path=row[4], path_template=row[5], path_regex=row[6], + topic=row[7], broker=row[8], feign_name=row[9], feign_url=row[10], + microservice=row[11], module=row[12], filename=row[13], + start_line=row[14], end_line=row[15], resolved=row[16], + )) + except Exception: + pass + + # Clients + try: + r = conn.execute( + "MATCH (c:Client) RETURN c.id, c.client_kind, c.target_service, " + "c.path, c.path_template, c.path_regex, c.method, " + "c.member_fqn, c.member_id, c.microservice, c.module, " + "c.filename, c.start_line, c.end_line, c.resolved, c.source_layer" + ) + while r.has_next(): + row = r.get_next() + tables.client_rows.append(ClientRow( + id=row[0], client_kind=row[1], target_service=row[2], + path=row[3], path_template=row[4], path_regex=row[5], + method=row[6], member_fqn=row[7], member_id=row[8], + microservice=row[9], module=row[10], filename=row[11], + start_line=row[12], end_line=row[13], resolved=row[14], + source_layer=row[15], + )) + except Exception: + pass + + # Producers + try: + r = conn.execute( + "MATCH (p:Producer) RETURN p.id, p.producer_kind, p.topic, p.broker, " + "p.direction, p.member_fqn, p.member_id, p.microservice, p.module, " + "p.filename, p.start_line, p.end_line, p.resolved, p.source_layer" + ) + while r.has_next(): + row = r.get_next() + tables.producer_rows.append(ProducerRow( + id=row[0], producer_kind=row[1], topic=row[2], broker=row[3], + direction=row[4], member_fqn=row[5], member_id=row[6], + microservice=row[7], module=row[8], filename=row[9], + start_line=row[10], end_line=row[11], resolved=row[12], + source_layer=row[13], + )) + except Exception: + pass + + # HTTP_CALLS + try: + r = conn.execute( + "MATCH (c:Client)-[e:HTTP_CALLS]->(r:Route) " + "RETURN c.id, r.id, e.confidence, e.strategy, " + "e.method_call, e.raw_uri, e.match" + ) + while r.has_next(): + row = r.get_next() + tables.http_call_rows.append(HttpCallRow( + client_id=row[0], route_id=row[1], + confidence=row[2], strategy=row[3], + method_call=row[4], raw_uri=row[5], match=row[6], + )) + except Exception: + pass + + # ASYNC_CALLS + try: + r = conn.execute( + "MATCH (p:Producer)-[e:ASYNC_CALLS]->(r:Route) " + "RETURN p.id, r.id, e.confidence, e.strategy, " + "e.direction, e.raw_topic, e.match" + ) + while r.has_next(): + row = r.get_next() + tables.async_call_rows.append(AsyncCallRow( + producer_id=row[0], route_id=row[1], + confidence=row[2], strategy=row[3], + direction=row[4], raw_topic=row[5], match=row[6], + )) + except Exception: + pass + + # Members (needed for pass6 member_by_id) + try: + r = conn.execute( + "MATCH (s:Symbol) WHERE s.kind = 'method' OR s.kind = 'constructor' " + "RETURN s.id, s.kind, s.name, s.fqn, s.package, " + "s.module, s.microservice, s.filename, " + "s.start_line, s.end_line, s.start_byte, s.end_byte, " + "s.modifiers, s.annotations, s.signature, s.parent_id, s.resolved" + ) + while r.has_next(): + row = r.get_next() + member = MemberEntry( + kind=row[1], + decl=MethodDecl( + name=row[2], + signature=row[14], + start_line=row[8], + end_line=row[9], + start_byte=row[10], + end_byte=row[11], + is_constructor=(row[1] == "constructor"), + modifiers=list(row[12]) if row[12] else [], + annotations=[], + parameters=[], + call_sites=[], + routes=[], + outgoing_calls=[], + local_vars=[], + ), + parent_id=row[15], + parent_fqn=row[3].split("#")[0] if "#" in row[3] else "", + file_path=row[7], + module=row[5], + microservice=row[6], + node_id=row[0], + ) + tables.members.append(member) + except Exception: + pass + + # EXPOSES + try: + r = conn.execute( + "MATCH (s:Symbol)-[e:EXPOSES]->(r:Route) " + "RETURN s.id, r.id, e.confidence, e.strategy" + ) + while r.has_next(): + row = r.get_next() + tables.exposes_rows.append(ExposesRow( + symbol_id=row[0], route_id=row[1], + confidence=row[2], strategy=row[3], + )) + except Exception: + pass + + # DECLARES_CLIENT (for client_hints_by_member in pass6) + try: + r = conn.execute( + "MATCH (s:Symbol)-[e:DECLARES_CLIENT]->(c:Client) " + "RETURN s.id, c.id, e.confidence, e.strategy" + ) + while r.has_next(): + row = r.get_next() + tables.declares_client_rows.append(DeclaresClientRow( + symbol_id=row[0], client_id=row[1], + confidence=row[2], strategy=row[3], + )) + except Exception: + pass + + # DECLARES_PRODUCER (for producer_hints_by_member in pass6) + try: + r = conn.execute( + "MATCH (s:Symbol)-[e:DECLARES_PRODUCER]->(p:Producer) " + "RETURN s.id, p.id, e.confidence, e.strategy" + ) + while r.has_next(): + row = r.get_next() + tables.declares_producer_rows.append(DeclaresProducerRow( + symbol_id=row[0], producer_id=row[1], + confidence=row[2], strategy=row[3], + )) + except Exception: + pass + + return tables + + +def _merge_tables(base: GraphTables, partial: GraphTables) -> GraphTables: + """Merge partial (dirty-file) data into base (remaining from DB). + + Returns a new GraphTables with combined data. Pass6 requires the full set. + """ + merged = GraphTables() + merged.routes_rows = sorted( + base.routes_rows + partial.routes_rows, key=lambda r: r.id, + ) + merged.client_rows = sorted( + base.client_rows + partial.client_rows, key=lambda c: c.id, + ) + merged.producer_rows = sorted( + base.producer_rows + partial.producer_rows, key=lambda p: p.id, + ) + merged.http_call_rows = base.http_call_rows + partial.http_call_rows + merged.async_call_rows = base.async_call_rows + partial.async_call_rows + merged.members = base.members + partial.members + merged.exposes_rows = base.exposes_rows + partial.exposes_rows + merged.declares_client_rows = base.declares_client_rows + partial.declares_client_rows + merged.declares_producer_rows = base.declares_producer_rows + partial.declares_producer_rows + merged.cross_service_resolution = partial.cross_service_resolution or base.cross_service_resolution + return merged + + +def _delete_all_http_async_calls(conn: kuzu.Connection) -> None: + """Delete ALL HTTP_CALLS and ASYNC_CALLS from DB (pre-pass6 rewrite).""" + try: + conn.execute("MATCH (c:Client)-[e:HTTP_CALLS]->(r:Route) DELETE e") + except Exception: + pass + try: + conn.execute("MATCH (p:Producer)-[e:ASYNC_CALLS]->(r:Route) DELETE e") + except Exception: + pass + + +def _write_call_edges_fresh(conn: kuzu.Connection, tables: GraphTables) -> None: + """Write ALL HTTP_CALLS and ASYNC_CALLS from tables (after pass6).""" + for row in tables.http_call_rows: + conn.execute(_CREATE_HTTP_CALL, { + "cid": row.client_id, "rid": row.route_id, + "confidence": row.confidence, "strategy": row.strategy, + "method_call": row.method_call, "raw_uri": row.raw_uri, + "match": row.match, + }) + for row in tables.async_call_rows: + conn.execute(_CREATE_ASYNC_CALL, { + "pid": row.producer_id, "rid": row.route_id, + "confidence": row.confidence, "strategy": row.strategy, + "direction": row.direction, "raw_topic": row.raw_topic, + "match": row.match, + }) + + +def _prune_phantom_routes(conn: kuzu.Connection, tables: GraphTables) -> None: + """Delete phantom routes that pass6 removed from tables.routes_rows.""" + inbound_ids = {r.route_id for r in tables.http_call_rows} | {r.route_id for r in tables.async_call_rows} + surviving = { + r.id for r in tables.routes_rows + if not ( + r.microservice == "" + and r.framework == "" + and not r.resolved + and r.id not in inbound_ids + ) + } + try: + r = conn.execute("MATCH (r:Route) RETURN r.id") + db_ids: set[str] = set() + while r.has_next(): + db_ids.add(r.get_next()[0]) + except Exception: + return + for rid in db_ids - surviving: + # Delete edges first + try: + conn.execute( + "MATCH (s:Symbol)-[e:EXPOSES]->(r:Route {id: $rid}) DELETE e", + {"rid": rid}, + ) + except Exception: + pass + try: + conn.execute( + "MATCH (c:Client)-[e:HTTP_CALLS]->(r:Route {id: $rid}) DELETE e", + {"rid": rid}, + ) + except Exception: + pass + try: + conn.execute( + "MATCH (p:Producer)-[e:ASYNC_CALLS]->(r:Route {id: $rid}) DELETE e", + {"rid": rid}, + ) + except Exception: + pass + try: + conn.execute("MATCH (r:Route {id: $rid}) DELETE r", {"rid": rid}) + except Exception: + pass + + +def _write_meta_incremental( + conn: kuzu.Connection, + source_root: Path, + *, + verbose: bool, +) -> None: + """Write GraphMeta by querying live DB for global stats (incremental mode).""" + + def _count(q: str) -> int: + try: + r = conn.execute(q) + return int(r.get_next()[0]) if r.has_next() else 0 + except Exception: + return 0 + + def _count_json(q: str) -> str: + try: + r = conn.execute(q) + d: dict[str, int] = {} + while r.has_next(): + k, v = r.get_next() + if k: + d[str(k)] = int(v) + return json.dumps(dict(sorted(d.items()))) + except Exception: + return "{}" + + routes_total = _count("MATCH (r:Route) RETURN count(r)") + exposes_total = _count("MATCH ()-[e:EXPOSES]->() RETURN count(e)") + clients_total = _count("MATCH (c:Client) RETURN count(c)") + declares_client_total = _count("MATCH ()-[e:DECLARES_CLIENT]->() RETURN count(e)") + producers_total = _count("MATCH (p:Producer) RETURN count(p)") + declares_producer_total = _count("MATCH ()-[e:DECLARES_PRODUCER]->() RETURN count(e)") + http_calls_total = _count("MATCH ()-[e:HTTP_CALLS]->() RETURN count(e)") + async_calls_total = _count("MATCH ()-[e:ASYNC_CALLS]->() RETURN count(e)") + packages_total = _count("MATCH (s:Symbol) WHERE s.kind = 'package' RETURN count(s)") + files_total = _count("MATCH (s:Symbol) WHERE s.kind = 'file' RETURN count(s)") + types_total = _count( + "MATCH (s:Symbol) WHERE s.kind IN ['class','interface','enum','record','annotation'] RETURN count(s)" + ) + members_total = _count( + "MATCH (s:Symbol) WHERE s.kind IN ['method','constructor'] RETURN count(s)" + ) + phantoms_total = _count("MATCH (s:Symbol) WHERE s.resolved = false RETURN count(s)") + extends_total = _count("MATCH ()-[e:EXTENDS]->() RETURN count(e)") + implements_total = _count("MATCH ()-[e:IMPLEMENTS]->() RETURN count(e)") + injects_total = _count("MATCH ()-[e:INJECTS]->() RETURN count(e)") + declares_total = _count("MATCH ()-[e:DECLARES]->() RETURN count(e)") + overrides_total = _count("MATCH ()-[e:OVERRIDES]->() RETURN count(e)") + calls_total = _count("MATCH ()-[e:CALLS]->() RETURN count(e)") + + routes_fw = _count_json( + "MATCH (r:Route) RETURN r.framework AS k, count(r) AS v" + ) + routes_by_layer = _count_json( + "MATCH (r:Route) WHERE r.source_layer IS NOT NULL RETURN r.source_layer AS k, count(r) AS v" + ) + clients_by_kind = _count_json( + "MATCH (c:Client) RETURN c.client_kind AS k, count(c) AS v" + ) + producers_by_kind = _count_json( + "MATCH (p:Producer) RETURN p.producer_kind AS k, count(p) AS v" + ) + http_by_strategy = _count_json( + "MATCH ()-[e:HTTP_CALLS]->() RETURN e.strategy AS k, count(e) AS v" + ) + async_by_strategy = _count_json( + "MATCH ()-[e:ASYNC_CALLS]->() RETURN e.strategy AS k, count(e) AS v" + ) + http_match_breakdown = _count_json( + "MATCH ()-[e:HTTP_CALLS]->() RETURN e.match AS k, count(e) AS v" + ) + async_match_breakdown = _count_json( + "MATCH ()-[e:ASYNC_CALLS]->() RETURN e.match AS k, count(e) AS v" + ) + + routes_resolved = _count("MATCH (r:Route) WHERE r.resolved = true RETURN count(r)") + routes_resolved_pct = (100.0 * routes_resolved / routes_total) if routes_total else 100.0 + brownfield_routes = _count( + "MATCH (r:Route) WHERE r.source_layer IS NOT NULL AND r.source_layer != 'builtin' RETURN count(r)" + ) + routes_from_brownfield_pct = (100.0 * brownfield_routes / routes_total) if routes_total else 0.0 + + cross_service_total = _count( + "MATCH ()-[e:HTTP_CALLS]->() WHERE e.match = 'cross_service' RETURN count(e)" + ) + _count( + "MATCH ()-[e:ASYNC_CALLS]->() WHERE e.match = 'cross_service' RETURN count(e)" + ) + + http_resolved = _count( + "MATCH ()-[e:HTTP_CALLS]->() WHERE e.strategy != 'unresolved' RETURN count(e)" + ) + http_resolved_pct = float(http_resolved) / float(http_calls_total) if http_calls_total else 0.0 + async_resolved = _count( + "MATCH ()-[e:ASYNC_CALLS]->() WHERE e.strategy != 'unresolved' RETURN count(e)" + ) + async_resolved_pct = float(async_resolved) / float(async_calls_total) if async_calls_total else 0.0 + + http_brownfield = _count( + "MATCH ()-[e:HTTP_CALLS]->() WHERE e.strategy IN " + "['layer_b_ann','layer_a_meta','layer_c_source','layer_b_fqn','codebase_client'] " + "RETURN count(e)" + ) + http_brownfield_pct = (100.0 * http_brownfield / http_calls_total) if http_calls_total else 0.0 + async_brownfield = _count( + "MATCH ()-[e:ASYNC_CALLS]->() WHERE e.strategy IN " + "['layer_b_ann','layer_a_meta','layer_c_source','layer_b_fqn','codebase_producer'] " + "RETURN count(e)" + ) + async_brownfield_pct = (100.0 * async_brownfield / async_calls_total) if async_calls_total else 0.0 + + counts = { + "packages": packages_total, + "files": files_total, + "types": types_total, + "members": members_total, + "phantoms": phantoms_total, + "extends": extends_total, + "implements": implements_total, + "injects": injects_total, + "declares": declares_total, + "overrides": overrides_total, + "calls": calls_total, + "routes": routes_total, + "exposes": exposes_total, + "clients": clients_total, + "declares_client": declares_client_total, + "producers": producers_total, + "declares_producer": declares_producer_total, + "http_calls": http_calls_total, + "async_calls": async_calls_total, + } + + cross_service_resolution = "auto" + try: + r = conn.execute( + "MATCH (m:GraphMeta {key: 'graph'}) RETURN m.cross_service_resolution" + ) + if r.has_next(): + cross_service_resolution = r.get_next()[0] or "auto" + except Exception: + pass + + # Delete old meta before writing new + conn.execute("MATCH (m:GraphMeta {key: 'graph'}) DELETE m") + + conn.execute( + "CREATE (:GraphMeta {key: $k, ontology_version: $ov, built_at: $t, " + "source_root: $sr, counts_json: $cj, parse_errors: $pe, " + "routes_total: $routes_total, exposes_total: $exposes_total, " + "routes_by_framework: $routes_by_framework, routes_resolved_pct: $routes_resolved_pct, " + "routes_from_brownfield_pct: $routes_from_brownfield_pct, routes_by_layer: $routes_by_layer, " + "clients_total: $clients_total, declares_client_total: $declares_client_total, " + "clients_by_kind: $clients_by_kind, " + "producers_total: $producers_total, declares_producer_total: $declares_producer_total, " + "producers_by_kind: $producers_by_kind, " + "http_calls_total: $http_calls_total, async_calls_total: $async_calls_total, " + "http_calls_by_strategy: $http_calls_by_strategy, async_calls_by_strategy: $async_calls_by_strategy, " + "http_calls_resolved_pct: $http_calls_resolved_pct, async_calls_resolved_pct: $async_calls_resolved_pct, " + "http_clients_from_brownfield_pct: $http_clients_from_brownfield_pct, " + "async_producers_from_brownfield_pct: $async_producers_from_brownfield_pct, " + "http_calls_match_breakdown: $http_calls_match_breakdown, " + "async_calls_match_breakdown: $async_calls_match_breakdown, " + "cross_service_calls_total: $cross_service_calls_total, " + "pass3_skipped_cross_service: $pass3_skipped_cross_service, " + "pass3_unresolved_phantom_receiver: $pass3_unresolved_phantom_receiver, " + "pass3_unresolved_chained: $pass3_unresolved_chained, " + "pass4_exposes_suppressed_feign: $pass4_exposes_suppressed_feign, " + "cross_service_resolution: $cross_service_resolution, " + "last_rebuild_mode: $last_rebuild_mode})", + { + "k": "graph", + "ov": ONTOLOGY_VERSION, + "t": int(time.time()), + "sr": str(source_root.resolve()), + "cj": json.dumps(counts), + "pe": 0, + "routes_total": routes_total, + "exposes_total": exposes_total, + "routes_by_framework": routes_fw, + "routes_resolved_pct": routes_resolved_pct, + "routes_from_brownfield_pct": routes_from_brownfield_pct, + "routes_by_layer": routes_by_layer, + "clients_total": clients_total, + "declares_client_total": declares_client_total, + "clients_by_kind": clients_by_kind, + "producers_total": producers_total, + "declares_producer_total": declares_producer_total, + "producers_by_kind": producers_by_kind, + "http_calls_total": http_calls_total, + "async_calls_total": async_calls_total, + "http_calls_by_strategy": http_by_strategy, + "async_calls_by_strategy": async_by_strategy, + "http_calls_resolved_pct": http_resolved_pct, + "async_calls_resolved_pct": async_resolved_pct, + "http_clients_from_brownfield_pct": http_brownfield_pct, + "async_producers_from_brownfield_pct": async_brownfield_pct, + "http_calls_match_breakdown": http_match_breakdown, + "async_calls_match_breakdown": async_match_breakdown, + "cross_service_calls_total": cross_service_total, + "pass3_skipped_cross_service": 0, + "pass3_unresolved_phantom_receiver": 0, + "pass3_unresolved_chained": 0, + "pass4_exposes_suppressed_feign": 0, + "cross_service_resolution": cross_service_resolution, + "last_rebuild_mode": "incremental", + }, + ) + + +def _node_exists(conn: kuzu.Connection, kind: str, node_id: str) -> bool: + try: + r = conn.execute( + f"MATCH (n:{kind} {{id: $id}}) RETURN count(n)", {"id": node_id} + ) + return r.has_next() and int(r.get_next()[0]) > 0 + except Exception: + return False + + +def _write_nodes_incremental( + conn: kuzu.Connection, + tables: GraphTables, + *, + project_root: Path, + meta_chain: dict[str, frozenset[str]] | None, +) -> None: + """Like _write_nodes but skips nodes whose primary key already exists in DB.""" + overrides = load_brownfield_overrides(project_root) + try: + prs = str(project_root.resolve()) + except OSError: + prs = str(project_root) + tables.cross_service_resolution = _load_config_cross_service_resolution(prs) + + # Pre-collect existing IDs to avoid per-query overhead + existing_ids: set[str] = set() + for kind in ("Symbol", "Route", "Client", "Producer", "UnresolvedCallSite"): + try: + r = conn.execute(f"MATCH (n:{kind}) RETURN n.id") + while r.has_next(): + existing_ids.add(r.get_next()[0]) + except Exception: + pass + + mch = meta_chain + for pkg, pid in tables.packages.items(): + if pid not in existing_ids: + conn.execute(_CREATE_SYMBOL, _node_row( + id=pid, kind="package", name=pkg.rsplit(".", 1)[-1], fqn=pkg, package=pkg, + )) + existing_ids.add(pid) + for path, fid in tables.files.items(): + if fid not in existing_ids: + conn.execute(_CREATE_SYMBOL, _node_row( + id=fid, kind="file", name=Path(path).name, fqn=path, filename=path, + )) + existing_ids.add(fid) + for entry in tables.types.values(): + if entry.node_id in existing_ids: + continue + d = entry.decl + role, capabilities = resolve_role_and_capabilities( + d, overrides=overrides, meta_chain=mch, + ) + tables.type_role_by_node_id[entry.node_id] = role + conn.execute(_CREATE_SYMBOL, _node_row( + id=entry.node_id, kind=d.kind, name=d.name, fqn=d.fqn, + package=entry.package, module=entry.module, microservice=entry.microservice, + filename=entry.file_path, + start_line=d.start_line, end_line=d.end_line, + start_byte=d.start_byte, end_byte=d.end_byte, + modifiers=list(d.modifiers), + annotations=[a.name for a in d.annotations], + capabilities=capabilities, + role=role, + signature="", + parent_id=tables.types[entry.outer_fqn].node_id if entry.outer_fqn and entry.outer_fqn in tables.types else "", + )) + existing_ids.add(entry.node_id) + for m in tables.members: + if m.node_id in existing_ids: + continue + conn.execute(_CREATE_SYMBOL, _node_row( + id=m.node_id, kind=m.kind, name=m.decl.name, + fqn=f"{m.parent_fqn}#{m.decl.signature}", + package=tables.types[m.parent_fqn].package if m.parent_fqn in tables.types else "", + module=m.module, microservice=m.microservice, + filename=m.file_path, + start_line=m.decl.start_line, end_line=m.decl.end_line, + start_byte=m.decl.start_byte, end_byte=m.decl.end_byte, + modifiers=list(m.decl.modifiers), + annotations=[a.name for a in m.decl.annotations], + signature=m.decl.signature, parent_id=m.parent_id, + )) + existing_ids.add(m.node_id) + for pid, row in tables.phantoms.items(): + if pid not in existing_ids: + conn.execute(_CREATE_SYMBOL, row) + existing_ids.add(pid) + + +def _write_routes_and_exposes_incremental( + conn: kuzu.Connection, tables: GraphTables, +) -> None: + """Like _write_routes_and_exposes but skips nodes whose PK already exists.""" + existing_route_ids: set[str] = set() + try: + r = conn.execute("MATCH (r:Route) RETURN r.id") + while r.has_next(): + existing_route_ids.add(r.get_next()[0]) + except Exception: + pass + existing_client_ids: set[str] = set() + try: + r = conn.execute("MATCH (c:Client) RETURN c.id") + while r.has_next(): + existing_client_ids.add(r.get_next()[0]) + except Exception: + pass + existing_producer_ids: set[str] = set() + try: + r = conn.execute("MATCH (p:Producer) RETURN p.id") + while r.has_next(): + existing_producer_ids.add(r.get_next()[0]) + except Exception: + pass + + for row in tables.routes_rows: + if row.id not in existing_route_ids: + conn.execute(_CREATE_ROUTE, { + "id": row.id, "kind": row.kind, "framework": row.framework, + "method": row.method, "path": row.path, + "path_template": row.path_template, "path_regex": row.path_regex, + "topic": row.topic, "broker": row.broker, + "feign_name": row.feign_name, "feign_url": row.feign_url, + "microservice": row.microservice, "module": row.module, + "filename": row.filename, + "start_line": row.start_line, "end_line": row.end_line, + "resolved": row.resolved, + }) + for row in tables.exposes_rows: + conn.execute(_CREATE_EXPOSES, { + "sid": row.symbol_id, "rid": row.route_id, + "confidence": row.confidence, "strategy": row.strategy, + }) + for row in tables.client_rows: + if row.id not in existing_client_ids: + conn.execute(_CREATE_CLIENT, asdict(row)) + for row in tables.declares_client_rows: + conn.execute(_CREATE_DECLARES_CLIENT, { + "sid": row.symbol_id, "cid": row.client_id, + "confidence": row.confidence, "strategy": row.strategy, + }) + for row in tables.producer_rows: + if row.id not in existing_producer_ids: + conn.execute(_CREATE_PRODUCER, asdict(row)) + for row in tables.declares_producer_rows: + conn.execute(_CREATE_DECLARES_PRODUCER, { + "sid": row.symbol_id, "pid": row.producer_id, + "confidence": row.confidence, "strategy": row.strategy, + }) + + +def build_ast_graph_incremental( + source_root: Path, + kuzu_path: Path, + changed_paths: set[str], + *, + verbose: bool = False, +) -> str | None: + """Incremental Kuzu rebuild. Returns None on fallback-needed, "incremental" on success.""" + deps_path = kuzu_path.parent / ".deps.json" + deps_index = _read_dependency_index(deps_path) + if deps_index is None: + if verbose: + _verbose_stderr_line("[graph] incremental · .deps.json missing or stale, falling back to full") + return None + + # Heuristic: skip incremental if >50% files are dirty + dirty = expand_to_closure(changed_paths, deps_index) + total = len(deps_index.files) + if total and len(dirty) > 0.5 * total: + if verbose: + _verbose_stderr_line( + f"[graph] incremental · dirty set {len(dirty)}/{total} > 50%, falling back to full" + ) + return None + + if verbose: + _verbose_stderr_line( + f"[graph] incremental · {len(changed_paths)} changed, " + f"{len(dirty)} after closure expansion" + ) + + db = kuzu.Database(str(kuzu_path)) + conn = kuzu.Connection(db) + try: + conn.execute("BEGIN TRANSACTION") + except Exception: + # Kuzu may not support explicit transactions; continue without + pass + + try: + # Delete dirty-file data from DB + if verbose: + _verbose_stderr_line("[graph] incremental · deleting dirty-file data from DB") + for fp in sorted(dirty): + counts = delete_all_for_file(conn, fp) + if verbose: + dirty_counts = {k: v for k, v in counts.items() if v > 0} + if dirty_counts: + _verbose_stderr_line(f" {fp}: {dirty_counts}") + + # Run pass1-5 subset + partial = GraphTables() + asts = pass1_parse_subset(source_root, dirty, verbose=verbose) + + # Register types (pass1 equivalent) + for rel_path, ast in asts.items(): + module = module_for_path(str(source_root / rel_path), source_root) + microservice = microservice_for_path(str(source_root / rel_path), source_root) + file_id = symbol_id("file", rel_path, rel_path, 0) + partial.files[rel_path] = file_id + if ast.package and ast.package not in partial.packages: + partial.packages[ast.package] = symbol_id("package", ast.package, "", 0) + for t in ast.top_level_types: + _register_type( + partial, t, file_path=rel_path, + module=module, microservice=microservice, outer_fqn=None, + ) + + pass2_edges_subset(partial, asts, dirty, verbose=verbose) + pass3_calls_subset(partial, asts, dirty, verbose=verbose) + pass4_routes_subset(partial, asts, dirty, source_root=source_root, verbose=verbose) + pass5_imperative_edges_subset(partial, dirty, source_root=source_root, verbose=verbose) + + # Load remaining non-dirty data from DB for pass6 + remaining = _load_remaining_from_db(conn, dirty) + full = _merge_tables(remaining, partial) + + # Run pass6 globally on full data + pass6_match_edges(full, verbose=verbose) + + # Write partial (dirty-file) data to DB + meta_chain = collect_annotation_meta_chain(str(source_root.resolve())) + _write_nodes_incremental(conn, partial, project_root=source_root, meta_chain=meta_chain) + _populate_declares_rows(partial) + _populate_overrides_rows(partial) + _write_edges(conn, partial) + _write_routes_and_exposes_incremental(conn, partial) + + # Rewrite ALL HTTP_CALLS/ASYNC_CALLS with pass6 outcomes + _delete_all_http_async_calls(conn) + _write_call_edges_fresh(conn, full) + _prune_phantom_routes(conn, full) + + # Write meta (queries live DB for global stats) + _write_meta_incremental(conn, source_root, verbose=verbose) + + # Merge deps: update dirty entries, preserve unchanged + new_deps = _build_file_deps(partial, source_root) + merged_files = dict(deps_index.files) + for fp, deps in new_deps.items(): + merged_files[fp] = deps + merged_index = DepsIndex( + version=_DEPS_VERSION, + ontology_version=ONTOLOGY_VERSION, + files=merged_files, + ) + _write_dependency_index_data(kuzu_path, merged_index) + + try: + conn.execute("COMMIT") + except Exception: + pass + conn.close() + + if verbose: + _verbose_stderr_line("[graph] incremental · done") + return "incremental" + + except Exception: + try: + conn.execute("ROLLBACK") + except Exception: + pass + conn.close() + if verbose: + _verbose_stderr_line("[graph] incremental · failed, falling back to full rebuild") + raise + + def _create_schema(conn: kuzu.Connection) -> None: for stmt in ( _SCHEMA_NODE, @@ -3191,7 +4554,8 @@ def _write_meta(conn: kuzu.Connection, tables: GraphTables, source_root: Path) - "pass3_unresolved_phantom_receiver: $pass3_unresolved_phantom_receiver, " "pass3_unresolved_chained: $pass3_unresolved_chained, " "pass4_exposes_suppressed_feign: $pass4_exposes_suppressed_feign, " - "cross_service_resolution: $cross_service_resolution})", + "cross_service_resolution: $cross_service_resolution, " + "last_rebuild_mode: $last_rebuild_mode})", { "k": "graph", "ov": ONTOLOGY_VERSION, @@ -3227,6 +4591,7 @@ def _write_meta(conn: kuzu.Connection, tables: GraphTables, source_root: Path) - "pass3_unresolved_chained": int(tables.pass3_unresolved_chained), "pass4_exposes_suppressed_feign": int(st.exposes_suppressed_feign), "cross_service_resolution": str(tables.cross_service_resolution), + "last_rebuild_mode": "full", }, ) @@ -3336,10 +4701,20 @@ def _write_dependency_index( ) -> None: """Write sidecar .deps.json alongside the Kuzu database.""" deps = _build_file_deps(tables, source_root) + idx = DepsIndex( + version=_DEPS_VERSION, + ontology_version=ONTOLOGY_VERSION, + files=deps, + ) + _write_dependency_index_data(db_path, idx) + + +def _write_dependency_index_data(db_path: Path, idx: DepsIndex) -> None: + """Write a pre-built DepsIndex to the sidecar .deps.json.""" payload = { - "version": _DEPS_VERSION, - "ontology_version": ONTOLOGY_VERSION, - "files": {fp: asdict(d) for fp, d in sorted(deps.items())}, + "version": idx.version, + "ontology_version": idx.ontology_version, + "files": {fp: asdict(d) for fp, d in sorted(idx.files.items())}, } deps_path = db_path.parent / ".deps.json" tmp = deps_path.with_suffix(".json.tmp") @@ -3455,6 +4830,14 @@ def main() -> int: ), ) parser.add_argument("--verbose", action="store_true") + parser.add_argument( + "--changed-paths", + default=None, + help=( + "Path to a file containing newline-separated changed file paths " + "(internal flag for incremental rebuild)" + ), + ) args = parser.parse_args() root = Path(args.source_root).expanduser().resolve() if args.source_root else Path.cwd().resolve() @@ -3464,6 +4847,35 @@ def main() -> int: kuzu_path = Path(args.kuzu_path).expanduser() if args.kuzu_path else _default_kuzu_path() + if args.changed_paths: + # Incremental rebuild mode + cp_file = Path(args.changed_paths) + if not cp_file.is_file(): + print(f"changed-paths file not found: {cp_file}", file=sys.stderr) + return 2 + changed = set( + line.strip() for line in cp_file.read_text().splitlines() if line.strip() + ) + if not changed: + if args.verbose: + _verbose_stderr_line("[graph] · empty changed-paths, falling back to full rebuild") + # Fall through to full rebuild + else: + try: + result = build_ast_graph_incremental( + root, kuzu_path, changed, verbose=args.verbose, + ) + if result is not None: + if args.verbose: + _verbose_stderr_line(f"[graph] done · incremental · kuzu at {kuzu_path}") + return 0 + if args.verbose: + _verbose_stderr_line("[graph] · incremental declined, falling back to full rebuild") + except Exception as exc: + if args.verbose: + _verbose_stderr_line(f"[graph] · incremental failed ({exc}), falling back to full rebuild") + + # Full rebuild tables = GraphTables() asts = pass1_parse(root, tables, verbose=args.verbose) pass2_edges(tables, asts, verbose=args.verbose) diff --git a/tests/test_incremental_equivalence.py b/tests/test_incremental_equivalence.py index 493b9813..b764b0a9 100644 --- a/tests/test_incremental_equivalence.py +++ b/tests/test_incremental_equivalence.py @@ -1,14 +1,21 @@ -"""PR-T1: Foundation tests — determinism, .deps.json read/write/validate.""" +"""PR-T3: Incremental rebuild equivalence, closure, and fallback tests.""" from __future__ import annotations import json +import shutil from pathlib import Path +import kuzu + from build_ast_graph import ( ONTOLOGY_VERSION, + DepsIndex, FileDeps, + GraphTables, _read_dependency_index, + build_ast_graph_incremental, + expand_to_closure, pass1_parse, pass2_edges, pass3_calls, @@ -18,13 +25,11 @@ write_kuzu, ) -CORPUS = Path(__file__).resolve().parent / "bank-chat-system" +TESTS_DIR = Path(__file__).resolve().parent +CORPUS = TESTS_DIR / "bank-chat-system" def _full_rebuild_into(corpus: Path, db_path: Path) -> Path: - """Run pass1–6 + write_kuzu into db_path; return db_path.""" - from build_ast_graph import GraphTables - tables = GraphTables() asts = pass1_parse(corpus, tables, verbose=False) pass2_edges(tables, asts, verbose=False) @@ -37,23 +42,21 @@ def _full_rebuild_into(corpus: Path, db_path: Path) -> Path: def _dump_node_ids(db_path: Path) -> set[str]: - """Return all Symbol node IDs from a Kuzu database.""" - import kuzu - db = kuzu.Database(str(db_path)) conn = kuzu.Connection(db) - result = conn.execute("MATCH (s:Symbol) RETURN s.id AS id") - ids = set() - while result.has_next(): - ids.add(result.get_next()[0]) + ids: set[str] = set() + for kind in ("Symbol", "Route", "Client", "Producer", "UnresolvedCallSite"): + try: + r = conn.execute(f"MATCH (n:{kind}) RETURN n.id AS id") + while r.has_next(): + ids.add(r.get_next()[0]) + except Exception: + pass conn.close() return ids def _dump_edge_rows(db_path: Path) -> set[tuple[str, ...]]: - """Return edge tuples (src, dst, label) from all relationship tables.""" - import kuzu - db = kuzu.Database(str(db_path)) conn = kuzu.Connection(db) labels = [ @@ -77,80 +80,418 @@ def _dump_edge_rows(db_path: Path) -> set[tuple[str, ...]]: return rows -# ---- PR-T1 tests ---- - - -def test_full_rebuild_is_deterministic(tmp_path: Path) -> None: - """Two full rebuilds on the same corpus produce identical graph state.""" - db_a = tmp_path / "a" / "code_graph.kuzu" - db_b = tmp_path / "b" / "code_graph.kuzu" - _full_rebuild_into(CORPUS, db_a) - _full_rebuild_into(CORPUS, db_b) - - nodes_a = _dump_node_ids(db_a) - nodes_b = _dump_node_ids(db_b) - assert nodes_a == nodes_b, ( - f"Node ID sets differ: {len(nodes_a)} vs {len(nodes_b)}" - ) +def _get_graph_meta(conn: kuzu.Connection) -> dict: + r = conn.execute("MATCH (m:GraphMeta) WHERE m.key = 'graph' RETURN m.*") + if not r.has_next(): + return {} + row = r.get_next() + cols = [d["name"] for d in r.get_column_names()] + # Kuzu returns column names via get_column_names on result + return dict(zip(cols, row)) - edges_a = _dump_edge_rows(db_a) - edges_b = _dump_edge_rows(db_b) - assert edges_a == edges_b, ( - f"Edge sets differ: {len(edges_a)} vs {len(edges_b)}" - ) +def _get_any_java_file(corpus: Path) -> str: + """Return a relative path to any .java file in corpus.""" + for p in sorted(corpus.rglob("*.java")): + try: + return str(p.resolve().relative_to(corpus.resolve())) + except ValueError: + continue + raise FileNotFoundError(f"No .java files in {corpus}") + + +def _copy_corpus(src: Path, dst: Path) -> Path: + """Copy corpus to dst for mutation.""" + shutil.copytree(src, dst, dirs_exist_ok=True) + return dst + + +# ---- Equivalence tests ---- + + +class TestIncrementalEquivalence: + """Incremental rebuild must produce identical graph state to full rebuild.""" + + def _assert_equivalence(self, corpus: Path, tmp_path: Path) -> None: + """Run full rebuild, then incremental with one file 'changed', compare.""" + # Full rebuild baseline + full_db = tmp_path / "full" / "code_graph.kuzu" + _full_rebuild_into(corpus, full_db) + full_nodes = _dump_node_ids(full_db) + full_edges = _dump_edge_rows(full_db) + + # Incremental: pick one file as "changed" + changed_file = _get_any_java_file(corpus) + incr_db = tmp_path / "incr" / "code_graph.kuzu" + _full_rebuild_into(corpus, incr_db) + + result = build_ast_graph_incremental( + corpus, incr_db, {changed_file}, verbose=False, + ) + assert result == "incremental" + + incr_nodes = _dump_node_ids(incr_db) + incr_edges = _dump_edge_rows(incr_db) + assert incr_nodes == full_nodes, ( + f"Node sets differ: {len(incr_nodes)} vs {len(full_nodes)}.\n" + f"Missing: {full_nodes - incr_nodes}\n" + f"Extra: {incr_nodes - full_nodes}" + ) + assert incr_edges == full_edges, ( + f"Edge sets differ: {len(incr_edges)} vs {len(full_edges)}.\n" + f"Missing: {full_edges - incr_edges}\n" + f"Extra: {incr_edges - full_edges}" + ) + + def test_incremental_matches_full_bank_chat_system(self, tmp_path: Path) -> None: + self._assert_equivalence(CORPUS, tmp_path / "bank_chat") + + def test_incremental_matches_full_cross_service_smoke(self, tmp_path: Path) -> None: + self._assert_equivalence( + TESTS_DIR / "fixtures" / "cross_service_smoke", + tmp_path / "cross_service", + ) + + def test_incremental_matches_full_call_graph_smoke(self, tmp_path: Path) -> None: + self._assert_equivalence( + TESTS_DIR / "fixtures" / "call_graph_smoke", + tmp_path / "call_graph", + ) + + def test_incremental_matches_full_http_caller_smoke(self, tmp_path: Path) -> None: + self._assert_equivalence( + TESTS_DIR / "fixtures" / "http_caller_smoke", + tmp_path / "http_caller", + ) + + def test_incremental_matches_full_route_extraction_smoke(self, tmp_path: Path) -> None: + self._assert_equivalence( + TESTS_DIR / "fixtures" / "route_extraction_smoke", + tmp_path / "route_extraction", + ) + + def test_incremental_multiple_files_changed(self, tmp_path: Path) -> None: + # Use cross_service_smoke (smaller corpus) to avoid >50% dirty-set heuristic + corpus = TESTS_DIR / "fixtures" / "cross_service_smoke" + full_db = tmp_path / "full" / "code_graph.kuzu" + _full_rebuild_into(corpus, full_db) + + incr_db = tmp_path / "incr" / "code_graph.kuzu" + _full_rebuild_into(corpus, incr_db) + + # Pick all files from one service (small enough to stay under 50%) + java_files = sorted( + str(p.resolve().relative_to(corpus.resolve())) + for p in corpus.rglob("*.java") + if "svc-a" in str(p) + ) + assert len(java_files) >= 2, f"Expected >= 2 svc-a files, got {java_files}" + + result = build_ast_graph_incremental( + corpus, incr_db, set(java_files), verbose=False, + ) + assert result == "incremental" + + assert _dump_node_ids(incr_db) == _dump_node_ids(full_db) + assert _dump_edge_rows(incr_db) == _dump_edge_rows(full_db) + + +# ---- Fallback tests ---- + + +class TestIncrementalFallback: + def test_incremental_fallback_on_missing_deps_json(self, tmp_path: Path) -> None: + corpus = CORPUS + incr_db = tmp_path / "code_graph.kuzu" + _full_rebuild_into(corpus, incr_db) + # Remove .deps.json + deps = incr_db.parent / ".deps.json" + deps.unlink(missing_ok=True) + result = build_ast_graph_incremental( + corpus, incr_db, {"some/File.java"}, verbose=False, + ) + assert result is None + + def test_incremental_fallback_on_stale_ontology(self, tmp_path: Path) -> None: + corpus = CORPUS + incr_db = tmp_path / "code_graph.kuzu" + _full_rebuild_into(corpus, incr_db) + deps = incr_db.parent / ".deps.json" + data = json.loads(deps.read_text()) + data["ontology_version"] = 0 + deps.write_text(json.dumps(data)) + result = build_ast_graph_incremental( + corpus, incr_db, {"some/File.java"}, verbose=False, + ) + assert result is None + + def test_incremental_fallback_on_large_dirty_set(self, tmp_path: Path) -> None: + corpus = CORPUS + incr_db = tmp_path / "code_graph.kuzu" + _full_rebuild_into(corpus, incr_db) + # Mark >50% of files dirty + deps = incr_db.parent / ".deps.json" + idx = _read_dependency_index(deps) + assert idx is not None + all_files = list(idx.files.keys()) + most_files = set(all_files[: int(len(all_files) * 0.6) + 1]) + result = build_ast_graph_incremental( + corpus, incr_db, most_files, verbose=False, + ) + assert result is None + + +# ---- Closure tests ---- + + +class TestClosureExpansion: + def _make_deps_index(self, files: dict[str, FileDeps]) -> DepsIndex: + return DepsIndex(version=1, ontology_version=ONTOLOGY_VERSION, files=files) + + def test_closure_includes_inverse_injects(self) -> None: + idx = self._make_deps_index({ + "a/Foo.java": FileDeps(declares=["com.example.Foo"]), + "a/Bar.java": FileDeps(injects=["com.example.Foo"]), + }) + dirty = expand_to_closure({"a/Foo.java"}, idx) + assert "a/Bar.java" in dirty + + def test_closure_includes_inverse_extends(self) -> None: + idx = self._make_deps_index({ + "a/Base.java": FileDeps(declares=["com.example.Base"]), + "a/Child.java": FileDeps(extends=["com.example.Base"]), + }) + dirty = expand_to_closure({"a/Base.java"}, idx) + assert "a/Child.java" in dirty + + def test_closure_includes_inverse_calls(self) -> None: + idx = self._make_deps_index({ + "a/Service.java": FileDeps(declares=["com.example.Service"]), + "a/Client.java": FileDeps(calls=["com.example.Service#run()"]), + }) + dirty = expand_to_closure({"a/Service.java"}, idx) + assert "a/Client.java" in dirty + + def test_closure_includes_inverse_overrides(self) -> None: + idx = self._make_deps_index({ + "a/Base.java": FileDeps(declares=["com.example.Base"]), + "a/Impl.java": FileDeps(overrides=["com.example.Base#method()"]), + }) + dirty = expand_to_closure({"a/Base.java"}, idx) + assert "a/Impl.java" in dirty + + def test_closure_includes_meta_annotation(self) -> None: + idx = self._make_deps_index({ + "a/CustomAnno.java": FileDeps(declares=["com.example.CustomAnno"]), + "a/User.java": FileDeps(uses_anno=["CustomAnno"]), + }) + dirty = expand_to_closure({"a/CustomAnno.java"}, idx) + assert "a/User.java" in dirty + + def test_closure_includes_forward_deps(self) -> None: + idx = self._make_deps_index({ + "a/Foo.java": FileDeps( + injects=["com.example.Service"], + declares=["com.example.Foo"], + ), + "a/Service.java": FileDeps(declares=["com.example.Service"]), + }) + dirty = expand_to_closure({"a/Foo.java"}, idx) + assert "a/Service.java" in dirty + + def test_closure_empty_changed_returns_empty(self) -> None: + idx = self._make_deps_index({"a/Foo.java": FileDeps()}) + dirty = expand_to_closure(set(), idx) + assert dirty == set() + + def test_closure_unknown_path_ignored(self) -> None: + idx = self._make_deps_index({"a/Foo.java": FileDeps()}) + dirty = expand_to_closure({"nonexistent/File.java"}, idx) + assert dirty == set() + + +# ---- Incremental deps merge ---- + + +class TestIncrementalDepsMerge: + def test_incremental_deps_json_merge(self, tmp_path: Path) -> None: + corpus = CORPUS + incr_db = tmp_path / "code_graph.kuzu" + _full_rebuild_into(corpus, incr_db) + deps = incr_db.parent / ".deps.json" + + idx_before = _read_dependency_index(deps) + assert idx_before is not None + unchanged_file = next(iter(idx_before.files)) + + changed_file = _get_any_java_file(corpus) + result = build_ast_graph_incremental( + corpus, incr_db, {changed_file}, verbose=False, + ) + assert result == "incremental" + + idx_after = _read_dependency_index(deps) + assert idx_after is not None + # Unchanged file entries preserved + assert unchanged_file in idx_after.files + # Changed file entries updated + assert changed_file in idx_after.files + + +# ---- Pass6 global invariant ---- + + +class TestPass6GlobalInvariant: + def test_incremental_pass6_global_invariant(self, tmp_path: Path) -> None: + corpus = TESTS_DIR / "fixtures" / "cross_service_smoke" + full_db = tmp_path / "full" / "code_graph.kuzu" + _full_rebuild_into(corpus, full_db) + + incr_db = tmp_path / "incr" / "code_graph.kuzu" + _full_rebuild_into(corpus, incr_db) + + changed_file = _get_any_java_file(corpus) + result = build_ast_graph_incremental( + corpus, incr_db, {changed_file}, verbose=False, + ) + assert result == "incremental" + + # Compare HTTP_CALLS and ASYNC_CALLS match outcomes + db_full = kuzu.Database(str(full_db)) + conn_full = kuzu.Connection(db_full) + db_incr = kuzu.Database(str(incr_db)) + conn_incr = kuzu.Connection(db_incr) + + for label in ("HTTP_CALLS", "ASYNC_CALLS"): + full_matches: dict[str, str] = {} + r = conn_full.execute( + f"MATCH (a)-[e:{label}]->(b) RETURN a.id, e.match" + ) + while r.has_next(): + row = r.get_next() + full_matches[row[0]] = row[1] -def test_deps_json_written_on_full_rebuild(tmp_path: Path) -> None: - """After a full rebuild, .deps.json exists and is well-formed.""" - db_path = tmp_path / "code_graph.kuzu" - _full_rebuild_into(CORPUS, db_path) - - deps_path = db_path.parent / ".deps.json" - assert deps_path.is_file(), ".deps.json missing after full rebuild" + incr_matches: dict[str, str] = {} + r = conn_incr.execute( + f"MATCH (a)-[e:{label}]->(b) RETURN a.id, e.match" + ) + while r.has_next(): + row = r.get_next() + incr_matches[row[0]] = row[1] - data = json.loads(deps_path.read_text()) - assert data["version"] == 1 - assert data["ontology_version"] == ONTOLOGY_VERSION - assert isinstance(data["files"], dict) - assert len(data["files"]) > 0 + assert full_matches == incr_matches, ( + f"{label} match outcomes differ: " + f"full={full_matches} vs incr={incr_matches}" + ) + conn_full.close() + conn_incr.close() -def test_deps_json_fields_coverage(tmp_path: Path) -> None: - """Spot-check a known file has expected dependency entries.""" - db_path = tmp_path / "code_graph.kuzu" - _full_rebuild_into(CORPUS, db_path) - deps_path = db_path.parent / ".deps.json" - idx = _read_dependency_index(deps_path) - assert idx is not None +# ---- Meta global stats ---- - # Find ChatIngressController.java — it should have declares, uses_anno, etc. - ctrl_files = [fp for fp in idx.files if "ChatIngressController" in fp] - assert len(ctrl_files) >= 1, "ChatIngressController.java not found in deps index" - fp = ctrl_files[0] - deps = idx.files[fp] - assert isinstance(deps, FileDeps) - assert deps.ext_hash, "ext_hash should be non-empty" - assert len(deps.declares) >= 1, "ChatIngressController should declare at least one type" - assert any( - "Controller" in a or "Mapping" in a - for a in deps.uses_anno - ), f"Expected controller/mapping annotations, got {deps.uses_anno}" +class TestIncrementalMetaStats: + def test_incremental_meta_global_stats(self, tmp_path: Path) -> None: + corpus = CORPUS + full_db = tmp_path / "full" / "code_graph.kuzu" + _full_rebuild_into(corpus, full_db) + incr_db = tmp_path / "incr" / "code_graph.kuzu" + _full_rebuild_into(corpus, incr_db) -def test_deps_json_stale_detection(tmp_path: Path) -> None: - """_read_dependency_index returns None for stale ontology version.""" - deps_path = tmp_path / ".deps.json" - stale = { - "version": 1, - "ontology_version": 0, # intentionally wrong - "files": {}, - } - deps_path.write_text(json.dumps(stale)) - assert _read_dependency_index(deps_path) is None + changed_file = _get_any_java_file(corpus) + result = build_ast_graph_incremental( + corpus, incr_db, {changed_file}, verbose=False, + ) + assert result == "incremental" + # Compare key meta fields + db_full = kuzu.Database(str(full_db)) + conn_full = kuzu.Connection(db_full) + db_incr = kuzu.Database(str(incr_db)) + conn_incr = kuzu.Connection(db_incr) -def test_deps_json_missing_returns_none(tmp_path: Path) -> None: - """_read_dependency_index returns None when file doesn't exist.""" - assert _read_dependency_index(tmp_path / "nonexistent.json") is None + for field in ( + "routes_total", "clients_total", "producers_total", + "http_calls_total", "async_calls_total", + ): + r_full = conn_full.execute( + f"MATCH (m:GraphMeta) WHERE m.key = 'graph' RETURN m.{field}" + ) + val_full = r_full.get_next()[0] if r_full.has_next() else None + r_incr = conn_incr.execute( + f"MATCH (m:GraphMeta) WHERE m.key = 'graph' RETURN m.{field}" + ) + val_incr = r_incr.get_next()[0] if r_incr.has_next() else None + assert val_full == val_incr, f"{field}: full={val_full} vs incr={val_incr}" + + # Check last_rebuild_mode + r = conn_incr.execute( + "MATCH (m:GraphMeta) WHERE m.key = 'graph' RETURN m.last_rebuild_mode" + ) + mode = r.get_next()[0] if r.has_next() else None + assert mode == "incremental" + + r = conn_full.execute( + "MATCH (m:GraphMeta) WHERE m.key = 'graph' RETURN m.last_rebuild_mode" + ) + mode = r.get_next()[0] if r.has_next() else None + assert mode == "full" + + conn_full.close() + conn_incr.close() + + +# ---- CLI flag tests ---- + + +class TestChangedPathsCLI: + def test_changed_paths_cli_flag_valid(self, tmp_path: Path) -> None: + import subprocess + import sys + + corpus = CORPUS + kuzu_path = tmp_path / "code_graph.kuzu" + _full_rebuild_into(corpus, kuzu_path) + + changed_file = _get_any_java_file(corpus) + paths_file = tmp_path / "changed.txt" + paths_file.write_text(changed_file + "\n") + + result = subprocess.run( + [ + sys.executable, "build_ast_graph.py", + "--source-root", str(corpus), + "--kuzu-path", str(kuzu_path), + "--changed-paths", str(paths_file), + ], + capture_output=True, + text=True, + timeout=60, + ) + assert result.returncode == 0 + + def test_changed_paths_cli_flag_empty(self, tmp_path: Path) -> None: + import subprocess + import sys + + corpus = CORPUS + kuzu_path = tmp_path / "code_graph.kuzu" + _full_rebuild_into(corpus, kuzu_path) + + paths_file = tmp_path / "changed.txt" + paths_file.write_text("") + + result = subprocess.run( + [ + sys.executable, "build_ast_graph.py", + "--source-root", str(corpus), + "--kuzu-path", str(kuzu_path), + "--changed-paths", str(paths_file), + ], + capture_output=True, + text=True, + timeout=60, + ) + assert result.returncode == 0