Commit 0fd6338

feat(batch-evaluation): allow passing fields param for efficient trace fetching (#1502)
* feat(batch-evaluation): allow passing fields param for efficient trace fetching
* push
1 parent d11155e commit 0fd6338

File tree

2 files changed: +12 -1 lines changed

2 files changed

+12
-1
lines changed

langfuse/_client/client.py

Lines changed: 4 additions & 1 deletion
@@ -79,12 +79,12 @@
 from langfuse._utils.parse_error import handle_fern_exception
 from langfuse._utils.prompt_cache import PromptCache
 from langfuse.api.resources.commons.errors.error import Error
+from langfuse.api.resources.commons.errors.not_found_error import NotFoundError
 from langfuse.api.resources.commons.types import DatasetRunWithItems
 from langfuse.api.resources.datasets.types import (
     DeleteDatasetRunResponse,
     PaginatedDatasetRuns,
 )
-from langfuse.api.resources.commons.errors.not_found_error import NotFoundError
 from langfuse.api.resources.ingestion.types.score_body import ScoreBody
 from langfuse.api.resources.prompts.types import (
     CreatePromptRequest_Chat,
@@ -3096,6 +3096,7 @@ def run_batched_evaluation(
         mapper: MapperFunction,
         filter: Optional[str] = None,
         fetch_batch_size: int = 50,
+        fetch_trace_fields: Optional[str] = None,
         max_items: Optional[int] = None,
         max_retries: int = 3,
         evaluators: List[EvaluatorFunction],
@@ -3138,6 +3139,7 @@ def run_batched_evaluation(
                 Default: None (fetches all items).
             fetch_batch_size: Number of items to fetch per API call and hold in memory.
                 Larger values may be faster but use more memory. Default: 50.
+            fetch_trace_fields: Comma-separated list of fields to include when fetching traces. Available field groups: 'core' (always included), 'io' (input, output, metadata), 'scores', 'observations', 'metrics'. If not specified, all fields are returned. Example: 'core,scores,metrics'. Note: Excluded 'observations' or 'scores' fields return empty arrays; excluded 'metrics' returns -1 for 'totalCost' and 'latency'. Only relevant if scope is 'traces'.
             max_items: Maximum total number of items to process. If None, processes all
                 items matching the filter. Useful for testing or limiting evaluation runs.
                 Default: None (process all).
@@ -3306,6 +3308,7 @@ def composite_evaluator(*, item, evaluations):
             evaluators=evaluators,
             filter=filter,
             fetch_batch_size=fetch_batch_size,
+            fetch_trace_fields=fetch_trace_fields,
             max_items=max_items,
             max_concurrency=max_concurrency,
             composite_evaluator=composite_evaluator,
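
For context, a minimal usage sketch of the new parameter (not part of this commit): run_batched_evaluation and the keyword arguments shown come from the diff above, while get_client, the mapper, and the evaluator signatures are assumptions used only for illustration.

# Hypothetical sketch: mapper/evaluator names, signatures, and return shapes
# are assumptions; only fetch_trace_fields and the other keyword arguments
# mirror the diff above.
from langfuse import get_client  # assumed entry point for the v3 client

langfuse = get_client()

def my_mapper(*, item, **kwargs):
    # With fetch_trace_fields="core,io", scores/observations arrive as empty
    # arrays and metrics as -1, so only core and io fields are used here.
    return {"input": item.input, "output": item.output}

def my_evaluator(*, item, **kwargs):
    # Placeholder evaluator; the real return type is whatever the SDK expects.
    return {"name": "non_empty_output", "value": 1 if item.output else 0}

langfuse.run_batched_evaluation(
    scope="traces",
    mapper=my_mapper,
    evaluators=[my_evaluator],
    fetch_batch_size=50,
    fetch_trace_fields="core,io",  # skip scores/observations/metrics when not needed
    max_items=100,
)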

langfuse/batch_evaluation.py

Lines changed: 8 additions & 0 deletions
@@ -847,6 +847,7 @@ async def run_async(
         evaluators: List[EvaluatorFunction],
         filter: Optional[str] = None,
         fetch_batch_size: int = 50,
+        fetch_trace_fields: Optional[str] = None,
         max_items: Optional[int] = None,
         max_concurrency: int = 50,
         composite_evaluator: Optional[CompositeEvaluatorFunction] = None,
@@ -867,6 +868,7 @@ async def run_async(
             evaluators: List of evaluation functions to run on each item.
             filter: JSON filter string for querying items.
             fetch_batch_size: Number of items to fetch per API call.
+            fetch_trace_fields: Comma-separated list of fields to include when fetching traces. Available field groups: 'core' (always included), 'io' (input, output, metadata), 'scores', 'observations', 'metrics'. If not specified, all fields are returned. Example: 'core,scores,metrics'. Note: Excluded 'observations' or 'scores' fields return empty arrays; excluded 'metrics' returns -1 for 'totalCost' and 'latency'. Only relevant if scope is 'traces'.
             max_items: Maximum number of items to process (None = all).
             max_concurrency: Maximum number of concurrent evaluations.
             composite_evaluator: Optional function to create composite scores.
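
The exclusion semantics documented above can be made concrete with a small check. This is a hedged sketch: the snake_case attribute names (for example total_cost for totalCost) are assumed to match the SDK's generated trace model.

# Hedged illustration of the documented field-group exclusions on a fetched
# trace (attribute names assumed to mirror the API's totalCost/latency).
def fetched_without_details(trace) -> bool:
    # Excluded 'observations'/'scores' groups come back as empty arrays,
    # excluded 'metrics' as the -1 sentinel for cost and latency.
    return (
        trace.observations == []
        and trace.scores == []
        and trace.total_cost == -1
        and trace.latency == -1
    )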
@@ -913,6 +915,8 @@ async def run_async(

         if verbose:
             self._log.info(f"Starting batch evaluation on {scope}")
+            if scope == "traces" and fetch_trace_fields:
+                self._log.info(f"Fetching trace fields: {fetch_trace_fields}")
         if resume_from:
             self._log.info(
                 f"Resuming from {resume_from.last_processed_timestamp} "
@@ -936,6 +940,7 @@
                     page=page,
                     limit=fetch_batch_size,
                     max_retries=max_retries,
+                    fields=fetch_trace_fields,
                 )
             except Exception as e:
                 # Failed after max_retries - create resume token and return
@@ -1115,6 +1120,7 @@ async def _fetch_batch_with_retry(
         page: int,
         limit: int,
         max_retries: int,
+        fields: Optional[str],
     ) -> List[Union[TraceWithFullDetails, ObservationsView]]:
         """Fetch a batch of items with retry logic.
@@ -1125,6 +1131,7 @@ async def _fetch_batch_with_retry(
             limit: Number of items per page.
             max_retries: Maximum number of retry attempts.
             verbose: Whether to log retry attempts.
+            fields: Trace fields to fetch

         Returns:
             List of items from the API.
@@ -1138,6 +1145,7 @@
                     limit=limit,
                     filter=filter,
                     request_options={"max_retries": max_retries},
+                    fields=fields,
                 )  # type: ignore
                 return list(response.data)  # type: ignore
             elif scope == "observations":
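
For reference, the traces branch above roughly amounts to the following direct call against the public API. This is a sketch under assumptions: the low-level client is presumed reachable as langfuse.api.trace.list (the callee's name is not visible in this hunk), and its fields parameter is presumed to accept the same comma-separated groups that fetch_trace_fields forwards.

# Rough, hypothetical equivalent of the traces branch above.
from langfuse import get_client  # assumed entry point for the v3 client

langfuse = get_client()

page_one = langfuse.api.trace.list(      # assumed low-level call; not shown in this hunk
    page=1,
    limit=50,
    fields="core,scores,metrics",        # same value that fetch_trace_fields forwards
    request_options={"max_retries": 3},  # retry handling mirrored from the diff
)
traces = list(page_one.data)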
