4 changes: 4 additions & 0 deletions .semversioner/next-release/patch-20260220214632816094.json
@@ -0,0 +1,4 @@
+{
+    "type": "patch",
+    "description": "generate_text_embeddings streaming"
+}
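Reviewer note: semversioner treats each JSON file under .semversioner/next-release/ as one pending change note and folds them into the changelog at release time. A minimal sketch of that aggregation step, assuming only the file layout shown above (collect_next_release_changes is an illustrative helper, not semversioner's actual API):

import json
from pathlib import Path

def collect_next_release_changes(repo_root: Path) -> list[dict[str, str]]:
    """Read every pending change note under .semversioner/next-release/."""
    notes_dir = repo_root / ".semversioner" / "next-release"
    changes: list[dict[str, str]] = []
    for path in sorted(notes_dir.glob("*.json")):
        # Each file holds one note, e.g. {"type": "patch", "description": "..."}
        changes.append(json.loads(path.read_text()))
    return changes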
39 changes: 9 additions & 30 deletions docs/examples_notebooks/index_migration_to_v1.ipynb
@@ -205,44 +205,23 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "from graphrag.cache.factory import CacheFactory\n",
     "from graphrag.callbacks.noop_workflow_callbacks import NoopWorkflowCallbacks\n",
     "from graphrag.index.workflows.generate_text_embeddings import generate_text_embeddings\n",
-    "from graphrag.language_model.manager import ModelManager\n",
-    "from graphrag.tokenizer.get_tokenizer import get_tokenizer\n",
+    "from graphrag_cache import create_cache\n",
     "\n",
-    "# We only need to re-run the embeddings workflow, to ensure that embeddings for all required search fields are in place\n",
-    "# We'll construct the context and run this function flow directly to avoid everything else\n",
+    "# We only need to re-run the embeddings workflow, to ensure that embeddings\n",
+    "# for all required search fields are in place.\n",
+    "# We pass in the table_provider created earlier so that generate_text_embeddings\n",
+    "# reads the migrated tables we just wrote.\n",
     "\n",
-    "model_config = config.get_language_model_config(config.embed_text.model_id)\n",
     "callbacks = NoopWorkflowCallbacks()\n",
-    "cache_config = config.cache.model_dump() # type: ignore\n",
-    "cache = CacheFactory().create_cache(\n",
-    "    cache_type=cache_config[\"type\"], # type: ignore\n",
-    "    **cache_config,\n",
-    ")\n",
-    "model = ModelManager().get_or_create_embedding_model(\n",
-    "    name=\"text_embedding\",\n",
-    "    model_type=model_config.type,\n",
-    "    config=model_config,\n",
-    "    callbacks=callbacks,\n",
-    "    cache=cache,\n",
-    ")\n",
-    "\n",
-    "tokenizer = get_tokenizer(model_config)\n",
+    "cache = create_cache(config.cache)\n",
     "\n",
     "await generate_text_embeddings(\n",
-    "    text_units=final_text_units,\n",
-    "    entities=final_entities,\n",
-    "    community_reports=final_community_reports,\n",
     "    config=config,\n",
-    "    model=model,\n",
-    "    tokenizer=tokenizer,\n",
-    "    batch_size=config.embed_text.batch_size,\n",
-    "    batch_max_tokens=config.embed_text.batch_max_tokens,\n",
-    "    num_threads=model_config.concurrent_requests,\n",
-    "    vector_store_config=config.vector_store,\n",
-    "    embedded_fields=config.embed_text.names,\n",
+    "    table_provider=table_provider,\n",
+    "    cache=cache,\n",
+    "    callbacks=callbacks,\n",
     ")"
    ]
   }
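Reviewer note: stripped of the JSON escaping, the migrated cell now reads as below. It assumes config and table_provider are defined earlier in the notebook (as the new comments say), and it no longer builds the embedding model or tokenizer by hand; presumably generate_text_embeddings now constructs those internally from config:

from graphrag.callbacks.noop_workflow_callbacks import NoopWorkflowCallbacks
from graphrag.index.workflows.generate_text_embeddings import generate_text_embeddings
from graphrag_cache import create_cache

# `config` and `table_provider` come from earlier migration cells.
callbacks = NoopWorkflowCallbacks()
cache = create_cache(config.cache)

await generate_text_embeddings(
    config=config,
    table_provider=table_provider,
    cache=cache,
    callbacks=callbacks,
)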
10 changes: 10 additions & 0 deletions packages/graphrag/graphrag/data_model/row_transformers.py
@@ -89,6 +89,16 @@ def transform_entity_row(row: dict[str, Any]) -> dict[str, Any]:
     return row
 
 
+def transform_entity_row_for_embedding(
+    row: dict[str, Any],
+) -> dict[str, Any]:
+    """Add a title_description column for embedding generation."""
+    title = row.get("title") or ""
+    description = row.get("description") or ""
+    row["title_description"] = f"{title}:{description}"
+    return row
+
+
 # -- relationships (mirrors relationships_typed) --------------------------
 
 
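Reviewer note: a quick illustration of the new transformer's behavior, using hypothetical row values (the "title:description" concatenation and the or-"" fallback are straight from the function above):

row = {"title": "MICROSOFT", "description": "A technology company"}
transform_entity_row_for_embedding(row)
print(row["title_description"])  # -> "MICROSOFT:A technology company"

# A missing or None description falls back to an empty string, so the
# embedded text becomes "MICROSOFT:" rather than "MICROSOFT:None".
row = {"title": "MICROSOFT", "description": None}
transform_entity_row_for_embedding(row)
print(row["title_description"])  # -> "MICROSOFT:"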
147 changes: 102 additions & 45 deletions packages/graphrag/graphrag/index/operations/embed_text/embed_text.py
@@ -1,14 +1,14 @@
-# Copyright (c) 2024 Microsoft Corporation.
+# Copyright (C) 2026 Microsoft
 # Licensed under the MIT License
 
-"""A module containing embed_text method definition."""
+"""Streaming text embedding operation."""
 
 import logging
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
 
 import numpy as np
-import pandas as pd
 from graphrag_llm.tokenizer import Tokenizer
+from graphrag_storage.tables.table import Table
 from graphrag_vectors import VectorStore, VectorStoreDocument
 
 from graphrag.callbacks.workflow_callbacks import WorkflowCallbacks
@@ -21,7 +21,7 @@
 
 
 async def embed_text(
-    input: pd.DataFrame,
+    input_table: Table,
     callbacks: WorkflowCallbacks,
     model: "LLMEmbedding",
     tokenizer: Tokenizer,
Expand All @@ -31,59 +31,116 @@ async def embed_text(
num_threads: int,
vector_store: VectorStore,
id_column: str = "id",
):
"""Embed a piece of text into a vector space. The operation outputs a new column containing a mapping between doc_id and vector."""
if embed_column not in input.columns:
msg = f"Column {embed_column} not found in input dataframe with columns {input.columns}"
raise ValueError(msg)
if id_column not in input.columns:
msg = f"Column {id_column} not found in input dataframe with columns {input.columns}"
raise ValueError(msg)

output_table: Table | None = None,
) -> int:
"""Embed text from a streaming Table into a vector store."""
vector_store.create_index()

index = 0
buffer: list[dict[str, Any]] = []
total_rows = 0

all_results = []
async for row in input_table:
text = row.get(embed_column)
if text is None:
text = ""

num_total_batches = (input.shape[0] + batch_size - 1) // batch_size
while batch_size * index < input.shape[0]:
logger.info(
"uploading text embeddings batch %d/%d of size %d to vector store",
index + 1,
num_total_batches,
batch_size,
)
batch = input.iloc[batch_size * index : batch_size * (index + 1)]
texts: list[str] = batch[embed_column].tolist()
ids: list[str] = batch[id_column].tolist()
result = await run_embed_text(
texts,
buffer.append({
id_column: row[id_column],
embed_column: text,
})

if len(buffer) >= batch_size:
total_rows += await _flush_embedding_buffer(
buffer,
embed_column,
id_column,
callbacks,
model,
tokenizer,
batch_size,
batch_max_tokens,
num_threads,
vector_store,
output_table,
)
buffer.clear()

if buffer:
total_rows += await _flush_embedding_buffer(
buffer,
embed_column,
id_column,
callbacks,
model,
tokenizer,
batch_size,
batch_max_tokens,
num_threads,
vector_store,
output_table,
)
if result.embeddings:
embeddings = [
embedding for embedding in result.embeddings if embedding is not None
]
all_results.extend(embeddings)

vectors = result.embeddings or []
documents: list[VectorStoreDocument] = []
for doc_id, doc_vector in zip(ids, vectors, strict=True):
if type(doc_vector) is np.ndarray:
doc_vector = doc_vector.tolist()
document = VectorStoreDocument(

return total_rows


async def _flush_embedding_buffer(
buffer: list[dict[str, Any]],
embed_column: str,
id_column: str,
callbacks: WorkflowCallbacks,
model: "LLMEmbedding",
tokenizer: Tokenizer,
batch_size: int,
batch_max_tokens: int,
num_threads: int,
vector_store: VectorStore,
output_table: Table | None,
) -> int:
"""Embed a buffer of rows and load results into the vector store."""
texts: list[str] = [row[embed_column] for row in buffer]
ids: list[str] = [row[id_column] for row in buffer]

result = await run_embed_text(
texts,
callbacks,
model,
tokenizer,
batch_size,
batch_max_tokens,
num_threads,
)

vectors = result.embeddings or []
skipped = 0
documents: list[VectorStoreDocument] = []
for doc_id, doc_vector in zip(ids, vectors, strict=True):
if doc_vector is None:
skipped += 1
continue
if type(doc_vector) is np.ndarray:
doc_vector = doc_vector.tolist()
documents.append(
VectorStoreDocument(
id=doc_id,
vector=doc_vector,
)
documents.append(document)
)

vector_store.load_documents(documents)

if skipped > 0:
logger.warning(
"Skipped %d rows with None embeddings out of %d",
skipped,
len(buffer),
)

vector_store.load_documents(documents)
index += 1
if output_table is not None:
for doc_id, doc_vector in zip(ids, vectors, strict=True):
if doc_vector is None:
continue
if type(doc_vector) is np.ndarray:
doc_vector = doc_vector.tolist()
await output_table.write({"id": doc_id, "embedding": doc_vector})

return all_results
return len(buffer)
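Reviewer note: the heart of this change is the accumulate-and-flush pattern. Rows stream in one at a time, fill a buffer of batch_size, and each full buffer becomes exactly one run_embed_text call plus one vector_store.load_documents call, so peak memory is bounded by the batch rather than the whole table. A minimal, self-contained sketch of that control flow with stand-in names (fake_rows, flush, and stream_in_batches are illustrative, not the graphrag or graphrag_storage APIs):

import asyncio
from typing import Any

async def fake_rows(n: int):
    # Stand-in for a streaming Table: yields one row dict at a time.
    for i in range(n):
        yield {"id": str(i), "text": f"document {i}"}

async def flush(buffer: list[dict[str, Any]]) -> int:
    # Stands in for _flush_embedding_buffer: one embedding call and one
    # vector-store load per full (or final partial) buffer.
    return len(buffer)

async def stream_in_batches(rows, batch_size: int) -> int:
    # Mirrors embed_text's loop: buffer rows, flush at batch_size, then
    # flush the remainder once the stream is exhausted.
    buffer: list[dict[str, Any]] = []
    total = 0
    async for row in rows:
        buffer.append(row)
        if len(buffer) >= batch_size:
            total += await flush(buffer)
            buffer.clear()
    if buffer:
        total += await flush(buffer)
    return total

# 10 rows with batch_size=4 produce flushes of 4, 4, and 2.
assert asyncio.run(stream_in_batches(fake_rows(10), batch_size=4)) == 10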