langfuse/_client/client.py: 198 changes (102 additions, 96 deletions)
@@ -2866,17 +2866,17 @@ async def _process_experiment_item(
}
)

with _propagate_attributes(
experiment=PropagatedExperimentAttributes(
experiment_id=experiment_id,
experiment_name=experiment_run_name,
experiment_metadata=_serialize(experiment_metadata),
experiment_dataset_id=dataset_id,
experiment_item_id=experiment_item_id,
experiment_item_metadata=_serialize(item_metadata),
experiment_item_root_observation_id=span.id,
)
):
propagated_experiment_attributes = PropagatedExperimentAttributes(
experiment_id=experiment_id,
experiment_name=experiment_run_name,
experiment_metadata=_serialize(experiment_metadata),
experiment_dataset_id=dataset_id,
experiment_item_id=experiment_item_id,
experiment_item_metadata=_serialize(item_metadata),
experiment_item_root_observation_id=span.id,
)

with _propagate_attributes(experiment=propagated_experiment_attributes):
output = await _run_task(task, item)

span.update(
@@ -2891,95 +2891,101 @@ async def _process_experiment_item(
)
raise e

# Run evaluators
evaluations = []

for evaluator in evaluators:
try:
eval_metadata: Optional[Dict[str, Any]] = None
# Run evaluators
evaluations = []

if isinstance(item, dict):
eval_metadata = item.get("metadata")
elif hasattr(item, "metadata"):
eval_metadata = item.metadata
for evaluator in evaluators:
try:
eval_metadata: Optional[Dict[str, Any]] = None

eval_results = await _run_evaluator(
evaluator,
input=input_data,
output=output,
expected_output=expected_output,
metadata=eval_metadata,
)
evaluations.extend(eval_results)

# Store evaluations as scores
for evaluation in eval_results:
self.create_score(
trace_id=trace_id,
observation_id=span.id,
name=evaluation.name,
value=evaluation.value, # type: ignore
comment=evaluation.comment,
metadata=evaluation.metadata,
config_id=evaluation.config_id,
data_type=evaluation.data_type, # type: ignore
)

except Exception as e:
langfuse_logger.error(f"Evaluator failed: {e}")

# Run composite evaluator if provided and we have evaluations
if composite_evaluator and evaluations:
try:
composite_eval_metadata: Optional[Dict[str, Any]] = None
if isinstance(item, dict):
composite_eval_metadata = item.get("metadata")
elif hasattr(item, "metadata"):
composite_eval_metadata = item.metadata
if isinstance(item, dict):
eval_metadata = item.get("metadata")
elif hasattr(item, "metadata"):
eval_metadata = item.metadata

result = composite_evaluator(
input=input_data,
output=output,
expected_output=expected_output,
metadata=composite_eval_metadata,
evaluations=evaluations,
)

# Handle async composite evaluators
if asyncio.iscoroutine(result):
result = await result

# Normalize to list
composite_evals: List[Evaluation] = []
if isinstance(result, (dict, Evaluation)):
composite_evals = [result] # type: ignore
elif isinstance(result, list):
composite_evals = result # type: ignore

# Store composite evaluations as scores and add to evaluations list
for composite_evaluation in composite_evals:
self.create_score(
trace_id=trace_id,
observation_id=span.id,
name=composite_evaluation.name,
value=composite_evaluation.value, # type: ignore
comment=composite_evaluation.comment,
metadata=composite_evaluation.metadata,
config_id=composite_evaluation.config_id,
data_type=composite_evaluation.data_type, # type: ignore
)
evaluations.append(composite_evaluation)

except Exception as e:
langfuse_logger.error(f"Composite evaluator failed: {e}")
with _propagate_attributes(
experiment=propagated_experiment_attributes
):
eval_results = await _run_evaluator(
evaluator,
input=input_data,
output=output,
expected_output=expected_output,
metadata=eval_metadata,
)
evaluations.extend(eval_results)

# Store evaluations as scores
for evaluation in eval_results:
self.create_score(
trace_id=trace_id,
observation_id=span.id,
name=evaluation.name,
value=evaluation.value, # type: ignore
comment=evaluation.comment,
metadata=evaluation.metadata,
config_id=evaluation.config_id,
data_type=evaluation.data_type, # type: ignore
)

except Exception as e:
langfuse_logger.error(f"Evaluator failed: {e}")

# Run composite evaluator if provided and we have evaluations
if composite_evaluator and evaluations:
try:
composite_eval_metadata: Optional[Dict[str, Any]] = None
if isinstance(item, dict):
composite_eval_metadata = item.get("metadata")
elif hasattr(item, "metadata"):
composite_eval_metadata = item.metadata

with _propagate_attributes(
experiment=propagated_experiment_attributes
):
result = composite_evaluator(
input=input_data,
output=output,
expected_output=expected_output,
metadata=composite_eval_metadata,
evaluations=evaluations,
)

return ExperimentItemResult(
item=item,
output=output,
evaluations=evaluations,
trace_id=trace_id,
dataset_run_id=dataset_run_id,
)
# Handle async composite evaluators
if asyncio.iscoroutine(result):
result = await result

# Normalize to list
composite_evals: List[Evaluation] = []
if isinstance(result, (dict, Evaluation)):
composite_evals = [result] # type: ignore
elif isinstance(result, list):
composite_evals = result # type: ignore

# Store composite evaluations as scores and add to evaluations list
for composite_evaluation in composite_evals:
self.create_score(
trace_id=trace_id,
observation_id=span.id,
name=composite_evaluation.name,
value=composite_evaluation.value, # type: ignore
comment=composite_evaluation.comment,
metadata=composite_evaluation.metadata,
config_id=composite_evaluation.config_id,
data_type=composite_evaluation.data_type, # type: ignore
)
evaluations.append(composite_evaluation)

except Exception as e:
langfuse_logger.error(f"Composite evaluator failed: {e}")

return ExperimentItemResult(
item=item,
output=output,
evaluations=evaluations,
trace_id=trace_id,
dataset_run_id=dataset_run_id,
)

def _create_experiment_run_name(
self, *, name: Optional[str] = None, run_name: Optional[str] = None
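Note on the change above: the refactor builds the experiment attributes once as propagated_experiment_attributes and re-enters _propagate_attributes not only around _run_task but also around each _run_evaluator call and the composite evaluator call, so the work done inside evaluators runs under the same propagated experiment context. The following is a minimal, self-contained sketch of that propagation pattern, assuming simplified stand-ins (a contextvars-based propagate_attributes, a plain ExperimentAttributes dataclass, dict-based evaluations, and a process_item driver); it is illustrative only and not the SDK's actual implementation, whose helpers live in langfuse/_client.

# Illustrative sketch only. The real helpers in the diff above
# (_propagate_attributes, PropagatedExperimentAttributes, _run_task,
# _run_evaluator, self.create_score) live in langfuse/_client; the stand-ins
# below are simplified mocks used to show the propagation pattern.
import asyncio
import contextvars
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional

_EXPERIMENT_CTX: contextvars.ContextVar[Optional["ExperimentAttributes"]] = (
    contextvars.ContextVar("experiment_ctx", default=None)
)


@dataclass
class ExperimentAttributes:
    # Stand-in for PropagatedExperimentAttributes.
    experiment_id: str
    experiment_name: str
    experiment_item_id: str


@contextmanager
def propagate_attributes(experiment: ExperimentAttributes):
    # Stand-in for _propagate_attributes: anything executed inside the block
    # (task or evaluator) can read the experiment attributes from the context.
    token = _EXPERIMENT_CTX.set(experiment)
    try:
        yield
    finally:
        _EXPERIMENT_CTX.reset(token)


async def _maybe_await(value: Any) -> Any:
    # Tasks and evaluators may be sync or async, mirroring the diff's
    # asyncio.iscoroutine handling.
    return await value if asyncio.iscoroutine(value) else value


async def process_item(
    item: Dict[str, Any],
    task: Callable[[Dict[str, Any]], Any],
    evaluators: List[Callable[..., Dict[str, Any]]],
    attrs: ExperimentAttributes,
) -> Dict[str, Any]:
    # The pattern introduced by the diff: build the attributes once, then
    # re-enter the propagation context for the task AND each evaluator run.
    with propagate_attributes(experiment=attrs):
        output = await _maybe_await(task(item))

    evaluations: List[Dict[str, Any]] = []
    for evaluator in evaluators:
        try:
            with propagate_attributes(experiment=attrs):
                result = await _maybe_await(
                    evaluator(input=item.get("input"), output=output)
                )
            evaluations.append(result)
        except Exception as exc:  # evaluator failures do not fail the item
            print(f"Evaluator failed: {exc}")

    return {"item": item, "output": output, "evaluations": evaluations}


if __name__ == "__main__":
    def echo_task(item: Dict[str, Any]) -> str:
        return f"answer for {item['input']}"

    def length_evaluator(*, input: Any, output: str) -> Dict[str, Any]:
        # Evaluator code sees the propagated experiment context.
        ctx = _EXPERIMENT_CTX.get()
        return {
            "name": "length",
            "value": len(output),
            "experiment_id": ctx.experiment_id if ctx else None,
        }

    attrs = ExperimentAttributes("exp-1", "demo-run", "item-1")
    result = asyncio.run(
        process_item({"input": "q1"}, echo_task, [length_evaluator], attrs)
    )
    print(result)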
tests/test_prompt.py: 2 changes (1 addition, 1 deletion)
@@ -682,7 +682,7 @@ def test_prompt_end_to_end():
@pytest.fixture
def langfuse():
from langfuse._client.resource_manager import LangfuseResourceManager

langfuse_instance = Langfuse()
langfuse_instance.api = Mock()
