@@ -2866,17 +2866,17 @@ async def _process_experiment_item(
28662866 }
28672867 )
28682868
2869- with _propagate_attributes (
2870- experiment = PropagatedExperimentAttributes (
2871- experiment_id = experiment_id ,
2872- experiment_name = experiment_run_name ,
2873- experiment_metadata = _serialize ( experiment_metadata ) ,
2874- experiment_dataset_id = dataset_id ,
2875- experiment_item_id = experiment_item_id ,
2876- experiment_item_metadata = _serialize ( item_metadata ) ,
2877- experiment_item_root_observation_id = span . id ,
2878- )
2879- ):
2869+ propagated_experiment_attributes = PropagatedExperimentAttributes (
2870+ experiment_id = experiment_id ,
2871+ experiment_name = experiment_run_name ,
2872+ experiment_metadata = _serialize ( experiment_metadata ) ,
2873+ experiment_dataset_id = dataset_id ,
2874+ experiment_item_id = experiment_item_id ,
2875+ experiment_item_metadata = _serialize ( item_metadata ) ,
2876+ experiment_item_root_observation_id = span . id ,
2877+ )
2878+
2879+ with _propagate_attributes ( experiment = propagated_experiment_attributes ):
28802880 output = await _run_task (task , item )
28812881
28822882 span .update (
@@ -2891,95 +2891,101 @@ async def _process_experiment_item(
28912891 )
28922892 raise e
28932893
2894- # Run evaluators
2895- evaluations = []
2896-
2897- for evaluator in evaluators :
2898- try :
2899- eval_metadata : Optional [Dict [str , Any ]] = None
2894+ # Run evaluators
2895+ evaluations = []
29002896
2901- if isinstance (item , dict ):
2902- eval_metadata = item .get ("metadata" )
2903- elif hasattr (item , "metadata" ):
2904- eval_metadata = item .metadata
2897+ for evaluator in evaluators :
2898+ try :
2899+ eval_metadata : Optional [Dict [str , Any ]] = None
29052900
2906- eval_results = await _run_evaluator (
2907- evaluator ,
2908- input = input_data ,
2909- output = output ,
2910- expected_output = expected_output ,
2911- metadata = eval_metadata ,
2912- )
2913- evaluations .extend (eval_results )
2914-
2915- # Store evaluations as scores
2916- for evaluation in eval_results :
2917- self .create_score (
2918- trace_id = trace_id ,
2919- observation_id = span .id ,
2920- name = evaluation .name ,
2921- value = evaluation .value , # type: ignore
2922- comment = evaluation .comment ,
2923- metadata = evaluation .metadata ,
2924- config_id = evaluation .config_id ,
2925- data_type = evaluation .data_type , # type: ignore
2926- )
2927-
2928- except Exception as e :
2929- langfuse_logger .error (f"Evaluator failed: { e } " )
2930-
2931- # Run composite evaluator if provided and we have evaluations
2932- if composite_evaluator and evaluations :
2933- try :
2934- composite_eval_metadata : Optional [Dict [str , Any ]] = None
2935- if isinstance (item , dict ):
2936- composite_eval_metadata = item .get ("metadata" )
2937- elif hasattr (item , "metadata" ):
2938- composite_eval_metadata = item .metadata
2901+ if isinstance (item , dict ):
2902+ eval_metadata = item .get ("metadata" )
2903+ elif hasattr (item , "metadata" ):
2904+ eval_metadata = item .metadata
29392905
2940- result = composite_evaluator (
2941- input = input_data ,
2942- output = output ,
2943- expected_output = expected_output ,
2944- metadata = composite_eval_metadata ,
2945- evaluations = evaluations ,
2946- )
2947-
2948- # Handle async composite evaluators
2949- if asyncio .iscoroutine (result ):
2950- result = await result
2951-
2952- # Normalize to list
2953- composite_evals : List [Evaluation ] = []
2954- if isinstance (result , (dict , Evaluation )):
2955- composite_evals = [result ] # type: ignore
2956- elif isinstance (result , list ):
2957- composite_evals = result # type: ignore
2958-
2959- # Store composite evaluations as scores and add to evaluations list
2960- for composite_evaluation in composite_evals :
2961- self .create_score (
2962- trace_id = trace_id ,
2963- observation_id = span .id ,
2964- name = composite_evaluation .name ,
2965- value = composite_evaluation .value , # type: ignore
2966- comment = composite_evaluation .comment ,
2967- metadata = composite_evaluation .metadata ,
2968- config_id = composite_evaluation .config_id ,
2969- data_type = composite_evaluation .data_type , # type: ignore
2970- )
2971- evaluations .append (composite_evaluation )
2972-
2973- except Exception as e :
2974- langfuse_logger .error (f"Composite evaluator failed: { e } " )
2906+ with _propagate_attributes (
2907+ experiment = propagated_experiment_attributes
2908+ ):
2909+ eval_results = await _run_evaluator (
2910+ evaluator ,
2911+ input = input_data ,
2912+ output = output ,
2913+ expected_output = expected_output ,
2914+ metadata = eval_metadata ,
2915+ )
2916+ evaluations .extend (eval_results )
2917+
2918+ # Store evaluations as scores
2919+ for evaluation in eval_results :
2920+ self .create_score (
2921+ trace_id = trace_id ,
2922+ observation_id = span .id ,
2923+ name = evaluation .name ,
2924+ value = evaluation .value , # type: ignore
2925+ comment = evaluation .comment ,
2926+ metadata = evaluation .metadata ,
2927+ config_id = evaluation .config_id ,
2928+ data_type = evaluation .data_type , # type: ignore
2929+ )
2930+
2931+ except Exception as e :
2932+ langfuse_logger .error (f"Evaluator failed: { e } " )
2933+
2934+ # Run composite evaluator if provided and we have evaluations
2935+ if composite_evaluator and evaluations :
2936+ try :
2937+ composite_eval_metadata : Optional [Dict [str , Any ]] = None
2938+ if isinstance (item , dict ):
2939+ composite_eval_metadata = item .get ("metadata" )
2940+ elif hasattr (item , "metadata" ):
2941+ composite_eval_metadata = item .metadata
2942+
2943+ with _propagate_attributes (
2944+ experiment = propagated_experiment_attributes
2945+ ):
2946+ result = composite_evaluator (
2947+ input = input_data ,
2948+ output = output ,
2949+ expected_output = expected_output ,
2950+ metadata = composite_eval_metadata ,
2951+ evaluations = evaluations ,
2952+ )
29752953
2976- return ExperimentItemResult (
2977- item = item ,
2978- output = output ,
2979- evaluations = evaluations ,
2980- trace_id = trace_id ,
2981- dataset_run_id = dataset_run_id ,
2982- )
2954+ # Handle async composite evaluators
2955+ if asyncio .iscoroutine (result ):
2956+ result = await result
2957+
2958+ # Normalize to list
2959+ composite_evals : List [Evaluation ] = []
2960+ if isinstance (result , (dict , Evaluation )):
2961+ composite_evals = [result ] # type: ignore
2962+ elif isinstance (result , list ):
2963+ composite_evals = result # type: ignore
2964+
2965+ # Store composite evaluations as scores and add to evaluations list
2966+ for composite_evaluation in composite_evals :
2967+ self .create_score (
2968+ trace_id = trace_id ,
2969+ observation_id = span .id ,
2970+ name = composite_evaluation .name ,
2971+ value = composite_evaluation .value , # type: ignore
2972+ comment = composite_evaluation .comment ,
2973+ metadata = composite_evaluation .metadata ,
2974+ config_id = composite_evaluation .config_id ,
2975+ data_type = composite_evaluation .data_type , # type: ignore
2976+ )
2977+ evaluations .append (composite_evaluation )
2978+
2979+ except Exception as e :
2980+ langfuse_logger .error (f"Composite evaluator failed: { e } " )
2981+
2982+ return ExperimentItemResult (
2983+ item = item ,
2984+ output = output ,
2985+ evaluations = evaluations ,
2986+ trace_id = trace_id ,
2987+ dataset_run_id = dataset_run_id ,
2988+ )
29832989
29842990 def _create_experiment_run_name (
29852991 self , * , name : Optional [str ] = None , run_name : Optional [str ] = None
0 commit comments