From 23dfa7318e478efb94f45cb53bedceff10901f35 Mon Sep 17 00:00:00 2001
From: aby42
Date: Wed, 25 Mar 2026 12:51:33 +0800
Subject: [PATCH] feat(training): add OnboardEvaluate with reasoning and performance metrics

Signed-off-by: aby42
---
 .gitignore                                     |   1 +
 src/training/model_eval/onboard/README.md      |  54 ++
 src/training/model_eval/onboard/__init__.py    |   4 +
 src/training/model_eval/onboard/__main__.py    |   5 +
 src/training/model_eval/onboard/arc_eval.py    | 120 +++++
 src/training/model_eval/onboard/cli.py         | 202 ++++++++
 src/training/model_eval/onboard/constants.py   |  40 ++
 src/training/model_eval/onboard/evaluator.py   |  86 ++++
 src/training/model_eval/onboard/mmlu_eval.py   | 151 ++++++
 src/training/model_eval/onboard/report.py      |  85 +++
 .../model_eval/onboard/system_eval.py          | 484 ++++++++++++++++++
 src/training/model_eval/onboard/thresholds.py  |  44 ++
 src/training/model_eval/onboard/types.py       |  29 ++
 src/training/model_eval/onboard_eval.py        |  21 +
 14 files changed, 1326 insertions(+)
 create mode 100644 src/training/model_eval/onboard/README.md
 create mode 100644 src/training/model_eval/onboard/__init__.py
 create mode 100644 src/training/model_eval/onboard/__main__.py
 create mode 100644 src/training/model_eval/onboard/arc_eval.py
 create mode 100644 src/training/model_eval/onboard/cli.py
 create mode 100644 src/training/model_eval/onboard/constants.py
 create mode 100644 src/training/model_eval/onboard/evaluator.py
 create mode 100644 src/training/model_eval/onboard/mmlu_eval.py
 create mode 100644 src/training/model_eval/onboard/report.py
 create mode 100644 src/training/model_eval/onboard/system_eval.py
 create mode 100644 src/training/model_eval/onboard/thresholds.py
 create mode 100644 src/training/model_eval/onboard/types.py
 create mode 100644 src/training/model_eval/onboard_eval.py

diff --git a/.gitignore b/.gitignore
index 42f1104f68..ea51323903 100644
--- a/.gitignore
+++ b/.gitignore
@@ -203,3 +203,4 @@ src/training/cache_embeddings/aws/vllm-inventory-*.ini
 grafana/
 grafana-data/
 prometheus-data/
+src/semantic-router/.cache/
diff --git a/src/training/model_eval/onboard/README.md b/src/training/model_eval/onboard/README.md
new file mode 100644
index 0000000000..076cbc86ae
--- /dev/null
+++ b/src/training/model_eval/onboard/README.md
@@ -0,0 +1,54 @@
+# Onboarding Evaluation
+
+This module provides a unified onboarding workflow for model evaluation,
+threshold policy reporting, and optional routing model updates.
+
+## CLI
+
+The CLI is exposed via:
+
+```bash
+python src/training/model_eval/onboard_eval.py --help
+```
+
+## Run System Eval + Threshold Report
+
+```bash
+python src/training/model_eval/onboard_eval.py \
+    --config onboarding_config.json \
+    --test-name system_eval \
+    --datasets mmlu-pro-en mmlu-prox-zh fact-check-en feedback-en \
+    --max-samples 50 \
+    --report-out system_eval_report.json \
+    --thresholds-out onboarding_thresholds.json \
+    --min-accuracy 0.7 \
+    --max-latency-ms 2000
+```
+
+## Write Thresholds Back Into Config
+
+```bash
+python src/training/model_eval/onboard_eval.py \
+    --config onboarding_config.json \
+    --test-name system_eval \
+    --datasets mmlu-pro-en fact-check-en \
+    --thresholds-out onboarding_thresholds.json \
+    --update-config
+```
+
+This writes an `onboarding_thresholds` object into the JSON config file (or into
+`--config-out` if provided) so the evaluation policy is stored alongside the
+model onboarding config.
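+
+## Example Onboarding Config
+
+A minimal `onboarding_config.json` sketch. The field names follow `ModelConfig` in
+`onboard/types.py`; the endpoint and model values below are placeholders, and any
+omitted field falls back to its default:
+
+```json
+{
+  "model_name": "example-model",
+  "endpoint": "http://localhost:8080",
+  "api_key": "",
+  "max_tokens": 512,
+  "temperature": 0.0,
+  "use_cot": false,
+  "seed": 42
+}
+```
+
+For `system_eval` the endpoint is the router's API base URL (`/api/v1/eval` is
+appended automatically); for `arc_challenge` and `mmlu_pro` it is an
+OpenAI-compatible base URL.
+
+With `--update-config`, the object written back has the shape produced by
+`build_threshold_report` (the values below are illustrative and the per-dataset
+summary entries are abridged):
+
+```json
+{
+  "onboarding_thresholds": {
+    "generated_at": "2026-03-25T12:00:00Z",
+    "policy": {"min_accuracy": 0.7, "max_latency_ms": 2000.0},
+    "summary": [{"dataset": "mmlu-pro-en", "dimension": "domain", "accuracy": 0.74}],
+    "violations": [],
+    "passed": true
+  }
+}
+```
+
+## Python API
+
+The evaluator can also be driven programmatically, assuming the `onboard` package is
+importable (for example with `src/training/model_eval` on `PYTHONPATH`); the config
+dict below reuses the placeholder values from the example above:
+
+```python
+from onboard import OnboardEvaluate
+
+evaluator = OnboardEvaluate()
+# Load the model/endpoint settings (same keys as onboarding_config.json).
+evaluator.parse({"model_name": "example-model", "endpoint": "http://localhost:8080"})
+# Run the dataset-driven system evaluation against the router endpoint.
+result = evaluator.run_performance_test(
+    "system_eval", datasets=["mmlu-pro-en"], max_samples=10
+)
+# Write the JSON report and print the aggregate score and metrics.
+evaluator.generate_report("system_eval_report.json")
+print(result.score, result.metrics)
+```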
+ +## Optional: Update Routing Models + +```bash +python src/training/model_eval/onboard_eval.py \ + --config onboarding_config.json \ + --test-name system_eval \ + --datasets mmlu-pro-en \ + --ml-benchmark-queries queries.jsonl \ + --ml-benchmark-model-config models.yaml \ + --ml-benchmark-output benchmark_output.jsonl \ + --ml-train-output ./ml-models +``` diff --git a/src/training/model_eval/onboard/__init__.py b/src/training/model_eval/onboard/__init__.py new file mode 100644 index 0000000000..3243d9762c --- /dev/null +++ b/src/training/model_eval/onboard/__init__.py @@ -0,0 +1,4 @@ +from .evaluator import OnboardEvaluate +from .types import ModelConfig, TestResult + +__all__ = ["OnboardEvaluate", "ModelConfig", "TestResult"] diff --git a/src/training/model_eval/onboard/__main__.py b/src/training/model_eval/onboard/__main__.py new file mode 100644 index 0000000000..a049ad7aea --- /dev/null +++ b/src/training/model_eval/onboard/__main__.py @@ -0,0 +1,5 @@ +from .cli import main + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/training/model_eval/onboard/arc_eval.py b/src/training/model_eval/onboard/arc_eval.py new file mode 100644 index 0000000000..f3e5598584 --- /dev/null +++ b/src/training/model_eval/onboard/arc_eval.py @@ -0,0 +1,120 @@ +from typing import Any, Dict, Optional + +import pandas as pd +from datasets import load_dataset +from openai import OpenAI +from tqdm import tqdm + +from .constants import ANSWER_PATTERN_ARC +from .types import TestResult + + +class ArcEvalMixin: + def _run_arc_challenge(self, samples: Optional[int] = 20) -> TestResult: + """Run ARC Challenge test""" + print(f"Starting ARC Challenge test, samples: {samples}") + + dataset = load_dataset("allenai/ai2_arc", "ARC-Challenge", split="train") + df = pd.DataFrame(dataset) + + if samples and len(df) > samples: + df = df.sample(samples, random_state=self.model_config.seed) + + results_df = self._evaluate_arc(df) + self.arc_results = results_df + + valid_results = results_df[results_df["success"]] + overall_accuracy = ( + valid_results["is_correct"].mean() if not valid_results.empty else 0.0 + ) + + test_result = TestResult( + model_name=self.model_config.model_name, + test_name="arc_challenge", + score=overall_accuracy, + metrics={ + "overall": overall_accuracy, + "total_questions": len(results_df), + "successful_queries": len(valid_results), + "failed_queries": len(results_df) - len(valid_results), + }, + details={"split": "train", "samples_evaluated": len(df)}, + ) + + self.test_results.append(test_result) + return test_result + + def _evaluate_arc(self, df: pd.DataFrame) -> pd.DataFrame: + """Evaluate ARC""" + client = OpenAI( + base_url=self.model_config.endpoint, + api_key=self.model_config.api_key or "dummy", + ) + + results = [] + for _, row in tqdm( + df.iterrows(), total=len(df), desc="Evaluating ARC Challenge" + ): + result = self._process_arc_question(client, row.to_dict()) + results.append(result) + + return pd.DataFrame(results) + + def _process_arc_question( + self, client: OpenAI, question_data: Dict[str, Any] + ) -> Dict[str, Any]: + """Process a single ARC question""" + question = question_data["question"] + choices = question_data["choices"] + correct_answer = question_data["answerKey"] + + formatted_options = "" + for label, text in zip(choices["label"], choices["text"]): + formatted_options += f"{label}) {text}\n" + + if self.model_config.use_cot: + prompt = ( + "Question: " + f"{question}\n\nOptions:\n{formatted_options}\n\n" + "Please solve this step-by-step, 
then provide your final answer in the format " + "'Answer: [letter]'." + ) + else: + prompt = ( + "Question: " + f"{question}\n\nOptions:\n{formatted_options}\n\n" + "Please choose the correct answer from the options above. Provide your answer " + "in the format 'Answer: [letter]'." + ) + + response = client.chat.completions.create( + model=self.model_config.model_name, + messages=[{"role": "user", "content": prompt}], + max_tokens=self.model_config.max_tokens, + temperature=self.model_config.temperature, + ) + response_text = response.choices[0].message.content + + predicted_answer = self._extract_answer_arc(response_text) + is_correct = predicted_answer == correct_answer + + return { + "id": question_data["id"], + "question": question, + "correct_answer": correct_answer, + "predicted_answer": predicted_answer, + "is_correct": is_correct, + "success": True, + } + + def _extract_answer_arc(self, response: str) -> Optional[str]: + """Extract ARC answer from response""" + match = ANSWER_PATTERN_ARC.search(response) + if match: + return match.group(1).upper() + + for char in reversed(response): + if char.upper() in "ABCD": + return char.upper() + + return None diff --git a/src/training/model_eval/onboard/cli.py b/src/training/model_eval/onboard/cli.py new file mode 100644 index 0000000000..f97f768f79 --- /dev/null +++ b/src/training/model_eval/onboard/cli.py @@ -0,0 +1,202 @@ +import argparse +import json +import subprocess +from pathlib import Path +from typing import Any, Dict, List, Optional + +from .evaluator import OnboardEvaluate +from .thresholds import build_threshold_report + + +def _load_config(config_path: Path) -> Dict[str, Any]: + with config_path.open("r", encoding="utf-8") as handle: + return json.load(handle) + + +def _write_config(config_path: Path, config: Dict[str, Any]) -> None: + with config_path.open("w", encoding="utf-8") as handle: + json.dump(config, handle, indent=2) + handle.write("\n") + + +def _run_benchmark(args: argparse.Namespace) -> Optional[Path]: + if not args.ml_benchmark_queries: + return None + + queries = Path(args.ml_benchmark_queries) + output = Path(args.ml_benchmark_output) + bench_dir = Path(args.ml_selection_dir) + cmd = [ + "python", + str(bench_dir / "benchmark.py"), + "--queries", + str(queries), + "--output", + str(output), + ] + + if args.ml_benchmark_model_config: + cmd.extend(["--model-config", args.ml_benchmark_model_config]) + elif args.ml_benchmark_models: + cmd.extend(["--models", args.ml_benchmark_models]) + if args.ml_benchmark_endpoint: + cmd.extend(["--endpoint", args.ml_benchmark_endpoint]) + else: + raise ValueError("Provide --ml-benchmark-models or --ml-benchmark-model-config") + + if args.ml_benchmark_concurrency: + cmd.extend(["--concurrency", str(args.ml_benchmark_concurrency)]) + if args.ml_benchmark_max_tokens: + cmd.extend(["--max-tokens", str(args.ml_benchmark_max_tokens)]) + if args.ml_benchmark_temperature is not None: + cmd.extend(["--temperature", str(args.ml_benchmark_temperature)]) + if args.ml_benchmark_concise: + cmd.append("--concise") + if args.ml_benchmark_limit: + cmd.extend(["--limit", str(args.ml_benchmark_limit)]) + + subprocess.run(cmd, check=True) + return output + + +def _run_training(args: argparse.Namespace, benchmark_output: Optional[Path]) -> None: + if not args.ml_train_output: + return + + train_dir = Path(args.ml_selection_dir) + if not args.ml_train_data and benchmark_output is None: + raise ValueError("Provide --ml-train-data or enable ML benchmark first") + + data_file = Path(args.ml_train_data 
or benchmark_output) + + cmd = [ + "python", + str(train_dir / "train.py"), + "--data-file", + str(data_file), + "--output-dir", + str(args.ml_train_output), + ] + + if args.ml_train_algorithm: + cmd.extend(["--algorithm", args.ml_train_algorithm]) + if args.ml_train_device: + cmd.extend(["--device", args.ml_train_device]) + if args.ml_train_embedding_model: + cmd.extend(["--embedding-model", args.ml_train_embedding_model]) + if args.ml_train_quality_weight: + cmd.extend(["--quality-weight", str(args.ml_train_quality_weight)]) + if args.ml_train_batch_size: + cmd.extend(["--batch-size", str(args.ml_train_batch_size)]) + + subprocess.run(cmd, check=True) + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description="Model onboarding pipeline (eval, thresholds, ML routing models)" + ) + parser.add_argument( + "--config", required=True, help="Path to onboarding config JSON" + ) + parser.add_argument( + "--test-name", + default="system_eval", + choices=["system_eval", "arc_challenge", "mmlu_pro"], + help="Evaluation to run", + ) + parser.add_argument("--datasets", nargs="*", help="System eval dataset IDs") + parser.add_argument("--max-samples", type=int, default=50) + parser.add_argument("--input-path", help="Text/jsonl input for system_eval") + parser.add_argument("--report-out", help="Write JSON report to this path") + + parser.add_argument("--thresholds-out", help="Write threshold report JSON") + parser.add_argument("--min-accuracy", type=float, default=0.7) + parser.add_argument("--max-latency-ms", type=float, default=2000.0) + parser.add_argument( + "--update-config", + action="store_true", + help="Write onboarding thresholds back into config JSON", + ) + parser.add_argument("--config-out", help="Write updated config to a new path") + + parser.add_argument( + "--ml-selection-dir", + default="src/training/model_selection/ml_model_selection", + help="Path to ML model selection directory", + ) + parser.add_argument("--ml-benchmark-queries", help="Queries JSONL for ML benchmark") + parser.add_argument("--ml-benchmark-models", help="Comma-separated model list") + parser.add_argument("--ml-benchmark-model-config", help="models.yaml for benchmark") + parser.add_argument("--ml-benchmark-endpoint", help="Endpoint for --models mode") + parser.add_argument("--ml-benchmark-output", default="benchmark_output.jsonl") + parser.add_argument("--ml-benchmark-concurrency", type=int) + parser.add_argument("--ml-benchmark-max-tokens", type=int) + parser.add_argument("--ml-benchmark-temperature", type=float) + parser.add_argument("--ml-benchmark-concise", action="store_true") + parser.add_argument("--ml-benchmark-limit", type=int) + + parser.add_argument("--ml-train-data", help="Benchmark output JSONL for training") + parser.add_argument("--ml-train-output", help="Output dir for trained models") + parser.add_argument("--ml-train-algorithm", help="all|knn|kmeans|svm|mlp") + parser.add_argument("--ml-train-device", help="cpu|cuda|mps") + parser.add_argument("--ml-train-embedding-model", help="Embedding model name") + parser.add_argument("--ml-train-quality-weight", type=float) + parser.add_argument("--ml-train-batch-size", type=int) + + return parser + + +def main() -> int: + parser = build_parser() + args = parser.parse_args() + + config_path = Path(args.config) + config = _load_config(config_path) + + evaluator = OnboardEvaluate(config_path=str(config_path)) + evaluator.parse(config) + + if args.test_name == "system_eval": + datasets = args.datasets + result = 
evaluator.run_performance_test( + "system_eval", + datasets=datasets, + max_samples=args.max_samples, + input_path=args.input_path, + ) + else: + result = evaluator.run_performance_test(args.test_name) + + if args.report_out: + evaluator.generate_report(args.report_out) + + threshold_report = None + if args.test_name == "system_eval" and evaluator.system_eval_summary: + threshold_report = build_threshold_report( + evaluator.system_eval_summary, + args.min_accuracy, + args.max_latency_ms, + ) + if args.thresholds_out: + threshold_path = Path(args.thresholds_out) + with threshold_path.open("w", encoding="utf-8") as handle: + json.dump(threshold_report, handle, indent=2) + handle.write("\n") + + if args.update_config and threshold_report is not None: + config["onboarding_thresholds"] = threshold_report + output_path = Path(args.config_out) if args.config_out else config_path + _write_config(output_path, config) + + benchmark_output = _run_benchmark(args) + _run_training(args, benchmark_output) + + print( + f"Completed {result.test_name} with score {result.score:.4f} (samples={result.metrics.get('total_samples')})" + ) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/training/model_eval/onboard/constants.py b/src/training/model_eval/onboard/constants.py new file mode 100644 index 0000000000..16c0d3da4d --- /dev/null +++ b/src/training/model_eval/onboard/constants.py @@ -0,0 +1,40 @@ +import re + +ANSWER_PATTERN_ARC = re.compile(r"(?:answer(?:\sis)?:?\s*)(A|B|C|D)", re.IGNORECASE) +ANSWER_PATTERN_MMLU = re.compile( + r"(?:answer(?:\sis)?:?\s*)(A|B|C|D|E|F|G|H|I|J)", re.IGNORECASE +) + +# Full system signal keys exposed by semantic-router classification pipeline. +SYSTEM_SIGNAL_KEYS = [ + "keyword", + "embedding", + "domain", + "fact_check", + "user_feedback", + "preference", + "language", + "context", + "complexity", + "modality", + "authz", + "jailbreak", + "pii", +] + +# matched_signals keys are plural for some signals. 
+SYSTEM_MATCHED_SIGNAL_KEYS = { + "keyword": "keywords", + "embedding": "embeddings", + "domain": "domains", + "fact_check": "fact_check", + "user_feedback": "user_feedback", + "preference": "preferences", + "language": "language", + "context": "context", + "complexity": "complexity", + "modality": "modality", + "authz": "authz", + "jailbreak": "jailbreak", + "pii": "pii", +} diff --git a/src/training/model_eval/onboard/evaluator.py b/src/training/model_eval/onboard/evaluator.py new file mode 100644 index 0000000000..dd8d0c55e9 --- /dev/null +++ b/src/training/model_eval/onboard/evaluator.py @@ -0,0 +1,86 @@ +import random +from typing import Any, Dict, List, Optional + +import numpy as np +import pandas as pd + +from .arc_eval import ArcEvalMixin +from .mmlu_eval import MmluEvalMixin +from .report import ReportMixin +from .system_eval import SystemEvalMixin +from .types import ModelConfig, TestResult + + +class OnboardEvaluate(SystemEvalMixin, ArcEvalMixin, MmluEvalMixin, ReportMixin): + """Model onboarding evaluation class (system-level eval via /api/v1/eval)""" + + def __init__(self, config_path: Optional[str] = None): + """ + Initialize OnboardEvaluate + + Args: + config_path: Path to config file + """ + self.config_path = config_path + self.model_config: Optional[ModelConfig] = None + self.test_results: List[TestResult] = [] + self.arc_results: Optional[pd.DataFrame] = None + self.mmlu_results: Optional[pd.DataFrame] = None + self.system_eval_responses: List[Dict[str, Any]] = [] + self.system_eval_summary: List[Dict[str, Any]] = [] + + def parse(self, config: Dict[str, Any]) -> ModelConfig: + """ + Parse model configuration + + Args: + config: Config dict including model name, endpoint, etc. + + Returns: + ModelConfig: Parsed model config + """ + self.model_config = ModelConfig( + model_name=config.get("model_name", ""), + endpoint=config.get("endpoint", ""), + api_key=config.get("api_key", ""), + max_tokens=config.get("max_tokens", 512), + temperature=config.get("temperature", 0.0), + use_cot=config.get("use_cot", False), + seed=config.get("seed", 42), + extra_params=config.get("extra_params", {}), + ) + + random.seed(self.model_config.seed) + np.random.seed(self.model_config.seed) + + return self.model_config + + def run_performance_test(self, test_name: str, **kwargs) -> TestResult: + """ + Run model performance test + + Args: + test_name: Test name ("arc_challenge", "mmlu_pro", or "system_eval") + **kwargs: Test parameters + - samples: number of samples (arc_challenge) + - samples_per_category: samples per category (mmlu_pro) + - categories: MMLU-Pro categories + - texts: list of texts to evaluate (system_eval) + - options: intent options dict (system_eval) + - input_path: optional text/jsonl file with one prompt per line (system_eval) + - datasets: list of dataset IDs for system eval (system_eval) + - max_samples: max samples per dataset (system_eval) + + Returns: + TestResult: Test result + """ + if not self.model_config: + raise ValueError("Please call parse() first to load model config") + + if test_name == "arc_challenge": + return self._run_arc_challenge(**kwargs) + if test_name == "mmlu_pro": + return self._run_mmlu_pro(**kwargs) + if test_name == "system_eval": + return self._run_system_eval(**kwargs) + raise ValueError(f"Unsupported test type: {test_name}") diff --git a/src/training/model_eval/onboard/mmlu_eval.py b/src/training/model_eval/onboard/mmlu_eval.py new file mode 100644 index 0000000000..e3371b687e --- /dev/null +++ 
b/src/training/model_eval/onboard/mmlu_eval.py @@ -0,0 +1,151 @@ +from typing import Any, Dict, List, Optional + +import pandas as pd +from datasets import load_dataset +from openai import OpenAI +from tqdm import tqdm + +from .constants import ANSWER_PATTERN_MMLU +from .types import TestResult + + +class MmluEvalMixin: + def _run_mmlu_pro( + self, samples_per_category: int = 5, categories: Optional[List[str]] = None + ) -> TestResult: + """Run MMLU-Pro test""" + print(f"Starting MMLU-Pro test, samples per category: {samples_per_category}") + + dataset = load_dataset("TIGER-Lab/MMLU-Pro", split="test") + df = pd.DataFrame(dataset) + + if categories: + df = df[df["category"].isin(categories)] + + sampled_dfs = [] + for category in df["category"].unique(): + category_df = df[df["category"] == category] + if len(category_df) > samples_per_category: + sampled_df = category_df.sample( + samples_per_category, random_state=self.model_config.seed + ) + sampled_dfs.append(sampled_df) + else: + sampled_dfs.append(category_df) + df = pd.concat(sampled_dfs) + + results_df = self._evaluate_mmlu(df) + self.mmlu_results = results_df + + valid_results = results_df[results_df["success"]] + overall_accuracy = ( + valid_results["is_correct"].mean() if not valid_results.empty else 0.0 + ) + + category_accuracy = {} + for category in valid_results["category"].unique(): + category_df = valid_results[valid_results["category"] == category] + category_accuracy[category] = category_df["is_correct"].mean() + + test_result = TestResult( + model_name=self.model_config.model_name, + test_name="mmlu_pro", + score=overall_accuracy, + metrics=category_accuracy, + details={ + "split": "test", + "samples_evaluated": len(df), + "overall_accuracy": overall_accuracy, + }, + ) + + self.test_results.append(test_result) + return test_result + + def _evaluate_mmlu(self, df: pd.DataFrame) -> pd.DataFrame: + """Evaluate MMLU""" + client = OpenAI( + base_url=self.model_config.endpoint, + api_key=self.model_config.api_key or "dummy", + ) + + results = [] + for _, row in tqdm(df.iterrows(), total=len(df), desc="Evaluating MMLU-Pro"): + result = self._process_mmlu_question(client, row.to_dict()) + results.append(result) + + return pd.DataFrame(results) + + def _process_mmlu_question( + self, client: OpenAI, question_data: Dict[str, Any] + ) -> Dict[str, Any]: + """Process a single MMLU question""" + question = question_data["question"] + options = question_data["options"] + correct_answer = question_data["answer"] + category = question_data["category"] + + letter_mapping = { + 0: "A", + 1: "B", + 2: "C", + 3: "D", + 4: "E", + 5: "F", + 6: "G", + 7: "H", + 8: "I", + 9: "J", + } + formatted_options = "" + for i, option in enumerate(options): + if option.lower() != "n/a": + formatted_options += f"{letter_mapping[i]}) {option}\n" + + if self.model_config.use_cot: + prompt = ( + "Question: " + f"{question}\n\nOptions:\n{formatted_options}\n\n" + "Please solve this step-by-step, then provide your final answer in the format " + "'Answer: [letter]'." + ) + else: + prompt = ( + "Question: " + f"{question}\n\nOptions:\n{formatted_options}\n\n" + "Please choose the correct answer from the options above. Provide your answer " + "in the format 'Answer: [letter]'." 
+ ) + + response = client.chat.completions.create( + model=self.model_config.model_name, + messages=[{"role": "user", "content": prompt}], + max_tokens=self.model_config.max_tokens, + temperature=self.model_config.temperature, + ) + response_text = response.choices[0].message.content + + predicted_answer = self._extract_answer_mmlu(response_text) + is_correct = predicted_answer == correct_answer + + return { + "question_id": question_data["question_id"], + "question": question, + "correct_answer": correct_answer, + "predicted_answer": predicted_answer, + "is_correct": is_correct, + "category": category, + "success": True, + } + + def _extract_answer_mmlu(self, response: str) -> Optional[str]: + """Extract MMLU answer from response""" + match = ANSWER_PATTERN_MMLU.search(response) + if match: + return match.group(1).upper() + + for char in reversed(response): + if char.upper() in "ABCDEFGHIJ": + return char.upper() + + return None diff --git a/src/training/model_eval/onboard/report.py b/src/training/model_eval/onboard/report.py new file mode 100644 index 0000000000..583d771267 --- /dev/null +++ b/src/training/model_eval/onboard/report.py @@ -0,0 +1,85 @@ +import json +from datetime import datetime +from typing import Optional + + +class ReportMixin: + def generate_report(self, output_path: Optional[str] = None) -> str: + """ + Generate test report + + Args: + output_path: output path; if None, auto-generate filename + + Returns: + str: report file path + """ + if not self.test_results: + raise ValueError("No test results available") + + report = self._generate_report() + + if output_path is None: + approach = "CoT" if self.model_config.use_cot else "Direct" + output_path = ( + f"{self.model_config.model_name.replace('/', '_')}_{approach}.json" + ) + + with open(output_path, "w", encoding="utf-8") as handle: + handle.write(report) + + return output_path + + def _generate_report(self) -> str: + """Generate JSON report""" + if self.system_eval_responses: + overall_score = next( + (r.score for r in self.test_results if r.test_name == "system_eval"), + 0.0, + ) + report_data = { + "endpoint": self._normalize_eval_endpoint(), + "generated_at": datetime.now().isoformat() + "Z", + "overall_score": overall_score, + "summary": self.system_eval_summary, + "results": self.system_eval_responses, + } + return json.dumps(report_data, indent=4, ensure_ascii=False) + + arc_result = next( + (r for r in self.test_results if r.test_name == "arc_challenge"), None + ) + mmlu_result = next( + (r for r in self.test_results if r.test_name == "mmlu_pro"), None + ) + + report_data = { + "model_name": self.model_config.model_name, + "approach": "Chain-of-Thought" if self.model_config.use_cot else "Direct", + } + + if arc_result: + report_data["global_metrics"] = { + "reasoning": { + "benchmark": "ARC-Challenge", + "results": {"overall": round(arc_result.score, 3)}, + } + } + + if mmlu_result: + report_data["domain_scores"] = { + "benchmark": "MMLU-Pro", + "results": {k: round(v, 2) for k, v in mmlu_result.metrics.items()}, + } + + metadata = { + "test_time": datetime.now().isoformat() + "Z", + "seed": self.model_config.seed, + } + if arc_result and arc_result.details: + metadata.update(arc_result.details) + elif mmlu_result and mmlu_result.details: + metadata.update(mmlu_result.details) + + report_data["metadata"] = metadata + return json.dumps(report_data, indent=4, ensure_ascii=False) diff --git a/src/training/model_eval/onboard/system_eval.py b/src/training/model_eval/onboard/system_eval.py new file mode 100644 
index 0000000000..af0958b207 --- /dev/null +++ b/src/training/model_eval/onboard/system_eval.py @@ -0,0 +1,484 @@ +import json +import urllib.error +import urllib.request +from typing import Any, Dict, List, Optional, Tuple + +from datasets import load_dataset +from tqdm import tqdm + +from .constants import SYSTEM_MATCHED_SIGNAL_KEYS, SYSTEM_SIGNAL_KEYS +from .types import TestResult + + +class SystemEvalMixin: + def _run_system_eval( + self, + texts: Optional[List[str]] = None, + options: Optional[Dict[str, Any]] = None, + input_path: Optional[str] = None, + datasets: Optional[List[str]] = None, + max_samples: int = 50, + ) -> TestResult: + """Run system-level evaluation via /api/v1/eval""" + if texts is None: + texts = [] + + if datasets: + return self._run_system_eval_datasets(datasets, max_samples=max_samples) + + if input_path: + texts.extend(self._load_texts_from_file(input_path)) + + if not texts: + raise ValueError("system_eval requires a non-empty 'texts' list") + + if options is None: + options = {"return_probabilities": False, "include_explanation": True} + + responses: List[Dict[str, Any]] = [] + for text in tqdm(texts, total=len(texts), desc="Evaluating System (API)"): + payload: Dict[str, Any] = {"text": text} + payload["options"] = options + response = self._post_eval_request(payload) + responses.append({"input": text, "response": response}) + + self.system_eval_responses = responses + + test_result = TestResult( + model_name=self.model_config.model_name, + test_name="system_eval", + score=0.0, + metrics={"total_samples": len(texts)}, + details={"endpoint": self._normalize_eval_endpoint()}, + ) + + self.test_results.append(test_result) + return test_result + + def _normalize_eval_endpoint(self) -> str: + endpoint = self.model_config.endpoint.rstrip("/") + if endpoint.endswith("/api/v1/eval"): + return endpoint + return endpoint + "/api/v1/eval" + + def _post_eval_request(self, payload: Dict[str, Any]) -> Dict[str, Any]: + """POST request to /api/v1/eval and return raw JSON response""" + if not self.model_config: + raise ValueError("Please call parse() first to load model config") + + endpoint = self._normalize_eval_endpoint() + data = json.dumps(payload).encode("utf-8") + request = urllib.request.Request( + endpoint, + data=data, + headers={"Content-Type": "application/json"}, + method="POST", + ) + + try: + with urllib.request.urlopen(request) as response: + body = response.read().decode("utf-8") + except urllib.error.HTTPError as exc: + error_body = exc.read().decode("utf-8") if exc.fp else "" + raise RuntimeError( + f"Eval request failed ({exc.code}): {error_body}" + ) from exc + + return json.loads(body) if body else {} + + def _run_system_eval_datasets( + self, datasets: List[str], max_samples: int = 50 + ) -> TestResult: + """Run system-level evaluation over curated datasets.""" + if not datasets: + raise ValueError("datasets must be a non-empty list") + + summary: List[Dict[str, Any]] = [] + all_results: List[Dict[str, Any]] = [] + + for dataset_id in datasets: + dimension, dataset, config = self._load_dataset_rows( + dataset_id, max_samples + ) + dataset_results, metrics = self._evaluate_dataset_rows( + dataset_id, dimension, dataset, config + ) + summary.append(metrics) + all_results.extend(dataset_results) + + self.system_eval_responses = all_results + + overall_score = self._compute_overall_score(summary) + total_samples = sum(item.get("total_samples", 0) for item in summary) + test_result = TestResult( + model_name=self.model_config.model_name, + 
test_name="system_eval", + score=overall_score, + metrics={"overall_score": overall_score, "total_samples": total_samples}, + details={"endpoint": self._normalize_eval_endpoint()}, + ) + + self.test_results.append(test_result) + self.system_eval_summary = summary + return test_result + + def _compute_overall_score(self, summary: List[Dict[str, Any]]) -> float: + accuracies = [item.get("accuracy") for item in summary if item.get("accuracy")] + if not accuracies: + return 0.0 + return float(sum(accuracies) / len(accuracies)) + + def _load_dataset_rows( + self, dataset_id: str, max_samples: int + ) -> Tuple[str, Any, Dict[str, Any]]: + dataset_registry = { + "mmlu-pro-en": { + "repo": "TIGER-Lab/MMLU-Pro", + "split": "test", + "text_field": "question", + "label_field": "category", + "dimension": "domain", + }, + "mmlu-prox-zh": { + "repo": "li-lab/MMLU-ProX", + "config": "zh", + "split": "test", + "text_field": "question", + "label_field": "category", + "dimension": "domain", + }, + "fact-check-en": { + "repo": "llm-semantic-router/fact-check-classification-dataset", + "config": "en", + "split": "test", + "text_field": "text", + "label_field": "label", + "dimension": "fact_check", + }, + "feedback-en": { + "repo": "llm-semantic-router/feedback-detector-dataset", + "config": "en", + "split": "test", + "text_field": "text", + "label_field": "label_name", + "dimension": "user_feedback", + }, + } + + if dataset_id not in dataset_registry: + raise ValueError(f"Unsupported dataset: {dataset_id}") + + config = dataset_registry[dataset_id] + repo = config["repo"] + split = config["split"] + config_name = config.get("config") + + dataset = self._load_dataset_with_fallback(repo, config_name, split) + + if max_samples > 0 and len(dataset) > max_samples: + dataset = dataset.shuffle(seed=self.model_config.seed).select( + range(max_samples) + ) + + return config["dimension"], dataset, config + + def _load_dataset_with_fallback( + self, repo: str, config_name: Optional[str], split: str + ) -> Any: + """Load dataset with config and split fallbacks.""" + split_candidates = [split, "validation", "train"] + + def load_with_split(cfg: Optional[str], split_name: str) -> Any: + if cfg: + return load_dataset(repo, cfg, split=split_name) + return load_dataset(repo, split=split_name) + + last_error: Optional[Exception] = None + + for split_name in split_candidates: + try: + return load_with_split(config_name, split_name) + except Exception as exc: + last_error = exc + try: + if config_name: + return load_with_split(None, split_name) + except Exception as fallback_exc: + last_error = fallback_exc + + if last_error: + raise last_error + raise ValueError("Failed to load dataset") + + def _evaluate_dataset_rows( + self, dataset_id: str, dimension: str, dataset: Any, config: Dict[str, Any] + ) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]: + total = 0 + correct = 0 + incorrect = 0 + skipped = 0 + latency_ms: List[float] = [] + signal_stats = self._init_signal_stats() + + results: List[Dict[str, Any]] = [] + for row in tqdm(dataset, desc=f"Evaluating {dataset_id}"): + text = self._extract_text(row, config) + expected = self._extract_label(row, config, dataset, dimension) + + response = self._post_eval_request({"text": text}) + self._collect_signal_stats(signal_stats, response) + predicted = self._extract_prediction(response, dimension) + + is_correct = None + if expected is None or predicted is None: + skipped += 1 + else: + is_correct = expected == predicted + if is_correct: + correct += 1 + else: + incorrect += 1 + + 
total += 1 + latency = self._extract_latency(response, dimension) + if latency is not None: + latency_ms.append(latency) + + results.append( + { + "dataset": dataset_id, + "dimension": dimension, + "input": text, + "expected": expected, + "predicted": predicted, + "correct": is_correct, + "response": response, + } + ) + + accuracy = float(correct / total) if total else 0.0 + avg_latency = float(sum(latency_ms) / len(latency_ms)) if latency_ms else 0.0 + metrics = { + "dataset": dataset_id, + "dimension": dimension, + "total_samples": total, + "correct": correct, + "incorrect": incorrect, + "skipped": skipped, + "accuracy": accuracy, + "avg_latency_ms": avg_latency, + "signal_coverage": self._finalize_signal_stats(signal_stats, total), + } + return results, metrics + + def _init_signal_stats(self) -> Dict[str, Dict[str, float]]: + stats: Dict[str, Dict[str, float]] = {} + for signal in SYSTEM_SIGNAL_KEYS: + stats[signal] = { + "matched_samples": 0.0, + "metric_available_samples": 0.0, + "confidence_samples": 0.0, + "confidence_nonzero_samples": 0.0, + "latency_sum_ms": 0.0, + "confidence_sum": 0.0, + } + return stats + + def _collect_signal_stats( + self, signal_stats: Dict[str, Dict[str, float]], response: Dict[str, Any] + ) -> None: + decision = response.get("decision_result") or {} + matched = decision.get("matched_signals") or {} + metrics = response.get("metrics") or {} + + for signal in SYSTEM_SIGNAL_KEYS: + matched_key = SYSTEM_MATCHED_SIGNAL_KEYS.get(signal, signal) + matched_values = matched.get(matched_key) or [] + if matched_values: + signal_stats[signal]["matched_samples"] += 1.0 + + metric_entry = metrics.get(signal) or {} + if not metric_entry: + continue + + latency = metric_entry.get("execution_time_ms") + if latency is not None: + signal_stats[signal]["metric_available_samples"] += 1.0 + signal_stats[signal]["latency_sum_ms"] += float(latency) + + confidence = metric_entry.get("confidence") + if confidence is not None: + signal_stats[signal]["confidence_samples"] += 1.0 + signal_stats[signal]["confidence_sum"] += float(confidence) + if float(confidence) > 0.0: + signal_stats[signal]["confidence_nonzero_samples"] += 1.0 + + def _finalize_signal_stats( + self, signal_stats: Dict[str, Dict[str, float]], total_samples: int + ) -> Dict[str, Dict[str, float]]: + total = float(total_samples) if total_samples > 0 else 1.0 + result: Dict[str, Dict[str, float]] = {} + for signal, stats in signal_stats.items(): + metric_count = stats["metric_available_samples"] + confidence_count = stats["confidence_samples"] + + avg_latency = 0.0 + if metric_count > 0: + avg_latency = stats["latency_sum_ms"] / metric_count + + avg_confidence = 0.0 + if confidence_count > 0: + avg_confidence = stats["confidence_sum"] / confidence_count + + result[signal] = { + "matched_samples": int(stats["matched_samples"]), + "match_rate": stats["matched_samples"] / total, + "metric_available_samples": int(metric_count), + "metric_coverage_rate": metric_count / total, + "avg_latency_ms": avg_latency, + "confidence_nonzero_samples": int(stats["confidence_nonzero_samples"]), + "confidence_nonzero_rate": stats["confidence_nonzero_samples"] / total, + "avg_confidence": avg_confidence, + } + return result + + def _extract_text(self, row: Dict[str, Any], config: Dict[str, Any]) -> str: + for field in [ + config.get("text_field"), + "text", + "prompt", + "question", + "input", + ]: + if field and field in row and row[field]: + return str(row[field]) + raise ValueError("No text field found in dataset row") + + def 
_extract_label( + self, + row: Dict[str, Any], + config: Dict[str, Any], + dataset: Any, + dimension: str, + ) -> Optional[str]: + label_field = config.get("label_field") + label_value = None + if label_field and label_field in row: + label_value = row[label_field] + else: + for field in ["label", "category", "domain", "class"]: + if field in row: + label_field = field + label_value = row[field] + break + + if label_value is None: + return None + + label_name = self._label_value_to_name(dataset, label_field, label_value) + + if dimension == "fact_check": + return self._normalize_fact_check_label(label_name) + if dimension == "user_feedback": + return self._normalize_feedback_label(label_name) + + return self._normalize_label(label_name) + + def _label_value_to_name( + self, dataset: Any, field: Optional[str], value: Any + ) -> str: + if field and hasattr(dataset, "features") and field in dataset.features: + feature = dataset.features[field] + if hasattr(feature, "names") and isinstance(value, int): + names = feature.names + if 0 <= value < len(names): + return str(names[value]) + + if isinstance(value, (int, float)): + return str(int(value)) + if isinstance(value, bool): + return "1" if value else "0" + return str(value) + + def _normalize_label(self, label: str) -> str: + return str(label).strip().lower().replace(" ", "_") + + def _normalize_fact_check_label(self, label: str) -> str: + normalized = self._normalize_label(label) + if normalized in {"1", "true", "needs_fact_check", "fact_check_needed"}: + return "needs_fact_check" + if normalized in {"0", "false", "no_fact_check_needed", "no_fact_check"}: + return "no_fact_check_needed" + if "need" in normalized or "fact" in normalized: + return "needs_fact_check" + return "no_fact_check_needed" + + def _normalize_feedback_label(self, label: str) -> str: + normalized = self._normalize_label(label) + mapping = { + "sat": "satisfied", + "need_clarification": "need_clarification", + "clarification": "need_clarification", + "satisfied": "satisfied", + "want_different": "want_different", + "different": "want_different", + "wrong_answer": "wrong_answer", + "wrong": "wrong_answer", + } + return mapping.get(normalized, normalized) + + def _extract_prediction( + self, response: Dict[str, Any], dimension: str + ) -> Optional[str]: + decision = response.get("decision_result") or {} + matched = decision.get("matched_signals") or {} + key = { + "domain": "domains", + "fact_check": "fact_check", + "user_feedback": "user_feedback", + }.get(dimension) + + if not key: + return None + + values = matched.get(key) or [] + if not values: + return None + return self._normalize_label(values[0]) + + def _extract_latency( + self, response: Dict[str, Any], dimension: str + ) -> Optional[float]: + metrics = response.get("metrics") or {} + metric_key = { + "domain": "domain", + "fact_check": "fact_check", + "user_feedback": "user_feedback", + }.get(dimension) + if not metric_key: + return None + entry = metrics.get(metric_key) or {} + value = entry.get("execution_time_ms") + if value is None: + return None + return float(value) + + def _load_texts_from_file(self, input_path: str) -> List[str]: + """Load texts from a txt/jsonl file (one prompt per line).""" + texts: List[str] = [] + with open(input_path, "r", encoding="utf-8") as handle: + for line in handle: + stripped = line.strip() + if not stripped: + continue + if stripped.startswith("{"): + try: + record = json.loads(stripped) + text = record.get("text") or record.get("prompt") + if text: + texts.append(str(text)) 
+                except json.JSONDecodeError:
+                    texts.append(stripped)
+            else:
+                texts.append(stripped)
+        return texts
diff --git a/src/training/model_eval/onboard/thresholds.py b/src/training/model_eval/onboard/thresholds.py
new file mode 100644
index 0000000000..33c96b9342
--- /dev/null
+++ b/src/training/model_eval/onboard/thresholds.py
@@ -0,0 +1,44 @@
+from datetime import datetime
+from typing import Any, Dict, List
+
+
+def build_threshold_report(
+    summary: List[Dict[str, Any]],
+    min_accuracy: float,
+    max_latency_ms: float,
+) -> Dict[str, Any]:
+    violations: List[Dict[str, Any]] = []
+    for item in summary:
+        accuracy = item.get("accuracy")
+        latency = item.get("avg_latency_ms")
+        if accuracy is not None and accuracy < min_accuracy:
+            violations.append(
+                {
+                    "dataset": item.get("dataset"),
+                    "dimension": item.get("dimension"),
+                    "metric": "accuracy",
+                    "value": accuracy,
+                    "threshold": min_accuracy,
+                }
+            )
+        if latency is not None and latency > max_latency_ms:
+            violations.append(
+                {
+                    "dataset": item.get("dataset"),
+                    "dimension": item.get("dimension"),
+                    "metric": "avg_latency_ms",
+                    "value": latency,
+                    "threshold": max_latency_ms,
+                }
+            )
+
+    return {
+        "generated_at": datetime.now().isoformat() + "Z",
+        "policy": {
+            "min_accuracy": min_accuracy,
+            "max_latency_ms": max_latency_ms,
+        },
+        "summary": summary,
+        "violations": violations,
+        "passed": len(violations) == 0,
+    }
diff --git a/src/training/model_eval/onboard/types.py b/src/training/model_eval/onboard/types.py
new file mode 100644
index 0000000000..9c8af3b976
--- /dev/null
+++ b/src/training/model_eval/onboard/types.py
@@ -0,0 +1,29 @@
+from dataclasses import dataclass, field
+from datetime import datetime
+from typing import Any, Dict, Optional
+
+
+@dataclass
+class ModelConfig:
+    """Model configuration"""
+
+    model_name: str
+    endpoint: str
+    api_key: str = ""
+    max_tokens: int = 512
+    temperature: float = 0.0
+    use_cot: bool = False
+    seed: int = 42
+    extra_params: Dict[str, Any] = field(default_factory=dict)
+
+
+@dataclass
+class TestResult:
+    """Test result"""
+
+    model_name: str
+    test_name: str
+    score: float
+    metrics: Dict[str, Any]
+    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
+    details: Optional[Dict[str, Any]] = None
diff --git a/src/training/model_eval/onboard_eval.py b/src/training/model_eval/onboard_eval.py
new file mode 100644
index 0000000000..46a9c9ac3d
--- /dev/null
+++ b/src/training/model_eval/onboard_eval.py
@@ -0,0 +1,21 @@
+"""Thin entrypoint for onboarding evaluation (imports modular implementation)."""
+
+try:  # imported as part of the model_eval package
+    from .onboard import ModelConfig, OnboardEvaluate, TestResult
+except ImportError:  # run directly as a script; onboard/ sits next to this file
+    from onboard import ModelConfig, OnboardEvaluate, TestResult
+
+__all__ = ["ModelConfig", "OnboardEvaluate", "TestResult"]
+
+
+def _run_cli() -> int:
+    try:
+        from .onboard.cli import main
+    except ImportError:  # direct script execution
+        from onboard.cli import main
+
+    return main()
+
+
+if __name__ == "__main__":
+    raise SystemExit(_run_cli())