Skip to content

Commit efcdd95

Browse files
committed
Remove unused async/threading machinery from NLP noun-phrase extraction (drop `num_threads`/`async_mode` parameters and process rows sequentially)
1 parent 6abd9b6 commit efcdd95

File tree

2 files changed

+24
-68
lines changed

2 files changed

+24
-68
lines changed

packages/graphrag/graphrag/index/operations/build_noun_graph/build_noun_graph.py

Lines changed: 24 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44
"""Graph extraction using NLP."""
55

6-
import asyncio
76
import logging
87
from collections import defaultdict
98
from itertools import combinations
@@ -12,7 +11,6 @@
1211
from graphrag_cache import Cache
1312
from graphrag_storage.tables.table import Table
1413

15-
from graphrag.config.enums import AsyncType
1614
from graphrag.graphs.edge_weights import calculate_pmi_edge_weights
1715
from graphrag.index.operations.build_noun_graph.np_extractors.base import (
1816
BaseNounPhraseExtractor,
@@ -26,16 +24,12 @@ async def build_noun_graph(
2624
text_unit_table: Table,
2725
text_analyzer: BaseNounPhraseExtractor,
2826
normalize_edge_weights: bool,
29-
num_threads: int,
30-
async_mode: AsyncType,
3127
cache: Cache,
3228
) -> tuple[pd.DataFrame, pd.DataFrame]:
3329
"""Build a noun graph from text units."""
3430
title_to_ids = await _extract_nodes(
3531
text_unit_table,
3632
text_analyzer,
37-
num_threads=num_threads,
38-
async_mode=async_mode,
3933
cache=cache,
4034
)
4135

@@ -62,73 +56,42 @@ async def build_noun_graph(
6256
async def _extract_nodes(
6357
text_unit_table: Table,
6458
text_analyzer: BaseNounPhraseExtractor,
65-
num_threads: int,
66-
async_mode: AsyncType,
6759
cache: Cache,
6860
) -> dict[str, list[str]]:
6961
"""Extract noun-phrase nodes from text units.
7062
63+
NLP extraction is CPU-bound (spaCy/TextBlob), so threading
64+
provides no benefit under the GIL. We process rows
65+
sequentially, relying on the cache to skip repeated work.
66+
7167
Returns a mapping of noun-phrase title to text-unit ids.
7268
"""
7369
extraction_cache = cache.child("extract_noun_phrases")
74-
semaphore = asyncio.Semaphore(num_threads)
75-
use_threads = async_mode == AsyncType.Threaded
76-
77-
async def _extract_one(
78-
text_unit_id: str,
79-
text: str,
80-
) -> tuple[str, list[str]]:
81-
"""Return ``(text_unit_id, noun_phrases)`` for one row."""
82-
async with semaphore:
83-
attrs = {"text": text, "analyzer": str(text_analyzer)}
84-
key = gen_sha512_hash(attrs, attrs.keys())
85-
result = await extraction_cache.get(key)
86-
if not result:
87-
if use_threads:
88-
result = await asyncio.to_thread(
89-
text_analyzer.extract,
90-
text,
91-
)
92-
else:
93-
result = text_analyzer.extract(text)
94-
await extraction_cache.set(key, result)
95-
return (text_unit_id, result)
96-
9770
total = await text_unit_table.length()
9871
title_to_ids: dict[str, list[str]] = defaultdict(list)
9972
completed = 0
100-
chunk_size = num_threads * 4
101-
chunk: list[asyncio.Task[tuple[str, list[str]]]] = []
102-
103-
async def _drain(
104-
tasks: list[asyncio.Task[tuple[str, list[str]]]],
105-
) -> None:
106-
"""Await every task in the chunk and accumulate results."""
107-
nonlocal completed
108-
for coro in asyncio.as_completed(tasks):
109-
text_unit_id, noun_phrases = await coro
110-
completed += 1
111-
if completed % 100 == 0 or completed == total:
112-
logger.info(
113-
"extract noun phrases progress: %d/%d",
114-
completed,
115-
total,
116-
)
117-
for phrase in noun_phrases:
118-
title_to_ids[phrase].append(text_unit_id)
11973

12074
async for row in text_unit_table:
121-
chunk.append(
122-
asyncio.create_task(
123-
_extract_one(row["id"], row["text"]),
124-
),
125-
)
126-
if len(chunk) >= chunk_size:
127-
await _drain(chunk)
128-
chunk.clear()
129-
130-
if chunk:
131-
await _drain(chunk)
75+
text_unit_id = row["id"]
76+
text = row["text"]
77+
78+
attrs = {"text": text, "analyzer": str(text_analyzer)}
79+
key = gen_sha512_hash(attrs, attrs.keys())
80+
result = await extraction_cache.get(key)
81+
if not result:
82+
result = text_analyzer.extract(text)
83+
await extraction_cache.set(key, result)
84+
85+
for phrase in result:
86+
title_to_ids[phrase].append(text_unit_id)
87+
88+
completed += 1
89+
if completed % 100 == 0 or completed == total:
90+
logger.info(
91+
"extract noun phrases progress: %d/%d",
92+
completed,
93+
total,
94+
)
13295

13396
return dict(title_to_ids)
13497

packages/graphrag/graphrag/index/workflows/extract_graph_nlp.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
from graphrag_cache import Cache
1111
from graphrag_storage.tables.table import Table
1212

13-
from graphrag.config.enums import AsyncType
1413
from graphrag.config.models.graph_rag_config import GraphRagConfig
1514
from graphrag.index.operations.build_noun_graph.build_noun_graph import (
1615
build_noun_graph,
@@ -53,8 +52,6 @@ async def run_workflow(
5352
relationships_table=relationships_table,
5453
text_analyzer=text_analyzer,
5554
normalize_edge_weights=(config.extract_graph_nlp.normalize_edge_weights),
56-
num_threads=config.extract_graph_nlp.concurrent_requests,
57-
async_type=config.extract_graph_nlp.async_mode,
5855
)
5956

6057
logger.info("Workflow completed: extract_graph_nlp")
@@ -68,16 +65,12 @@ async def extract_graph_nlp(
6865
relationships_table: Table,
6966
text_analyzer: BaseNounPhraseExtractor,
7067
normalize_edge_weights: bool,
71-
num_threads: int,
72-
async_type: AsyncType,
7368
) -> dict[str, list[dict[str, Any]]]:
7469
"""Extract noun-phrase graph and stream results to output tables."""
7570
extracted_nodes, extracted_edges = await build_noun_graph(
7671
text_units_table,
7772
text_analyzer=text_analyzer,
7873
normalize_edge_weights=normalize_edge_weights,
79-
num_threads=num_threads,
80-
async_mode=async_type,
8174
cache=cache,
8275
)
8376

0 commit comments

Comments (0)