Skip to content
38 changes: 38 additions & 0 deletions core/horizon_selector_interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from dataclasses import dataclass
from typing import Protocol, Sequence

from core.planner_selector_interface import AggregateBenchmarkRow


@dataclass(frozen=True)
class HorizonSelectionRequest:
    """Immutable query describing which benchmark slice to pick a horizon for.

    A request names one ``(dataset, scenario)`` pair and carries the knobs
    a selector may use when choosing among the candidates of that pair.
    """

    # Dataset label identifying the benchmark slice of interest.
    dataset: str
    # Scenario name identifying the benchmark slice of interest.
    scenario: str
    # Success rate a candidate is compared against to count as sufficient.
    success_threshold: float = 0.999
    # Whether the smallest sufficient horizon should be preferred.
    prefer_minimal: bool = True
    # Name of the metric a selector may rank by when it falls back.
    fallback_metric: str = "final_distance"


@dataclass(frozen=True)
class HorizonRecommendation:
    """Immutable result returned by a horizon selector.

    Bundles the chosen planner and its gradient-update horizon together
    with the benchmark figures and a human-readable justification.
    """

    # Identifier of the selector variant that produced this result.
    variant: str
    # Dataset label the recommendation applies to.
    dataset: str
    # Scenario name the recommendation applies to.
    scenario: str
    # Name of the recommended planner.
    planner: str
    # Gradient-update horizon associated with the recommended planner.
    grad_update_horizon: int
    # Benchmark success rate of the recommended planner.
    success_rate: float
    # Benchmark final distance of the recommended planner.
    final_distance: float
    # Selector-specific ranking score of the recommended planner.
    score: float
    # Free-text explanation of why this planner was picked.
    rationale: str


class HorizonSelector(Protocol):
    """Structural interface every horizon-selection strategy satisfies.

    Any object exposing ``name``, ``paradigm`` and a compatible
    ``recommend`` method can be used wherever a ``HorizonSelector`` is
    expected.
    """

    # Short identifier of the strategy.
    name: str
    # Label describing the style of the strategy.
    paradigm: str

    def recommend(
        self,
        rows: Sequence[AggregateBenchmarkRow],
        request: HorizonSelectionRequest,
    ) -> HorizonRecommendation:
        """Return the horizon recommendation for ``request`` given ``rows``."""
        ...
Empty file.
192 changes: 192 additions & 0 deletions experiments/horizon_selection/difficulty_index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
"""Map an off-grid (scenario, speed, radius) probe to a recommendation
from the sweep-trained HorizonSelector.

Strategy: load a sweep summary CSV, group rows by their difficulty cell
(scenario, dyn_speed_scale, dyn_radius_scale), and at probe time pick
the nearest cell within the same scenario by Euclidean distance on
(speed_scale, radius_scale). The selector is then run on the rows of
that cell, so the returned recommendation reflects the regime the
sweep already characterised.

This is the simplest "deployment" interface: no model training, just
nearest-neighbour over an offline benchmark grid. It also surfaces how
big the extrapolation step is by returning the matched cell distance.
"""

from __future__ import annotations

import csv
from dataclasses import dataclass
from math import hypot
from pathlib import Path
from typing import Sequence

from core.horizon_selector_interface import (
HorizonRecommendation,
HorizonSelectionRequest,
HorizonSelector,
)
from core.planner_selector_interface import AggregateBenchmarkRow


@dataclass(frozen=True)
class IndexedRecommendation:
    """A selector recommendation tagged with the sweep cell it came from."""

    # Speed scale of the sweep cell matched to the probe.
    matched_speed: float
    # Radius scale of the sweep cell matched to the probe.
    matched_radius: float
    # Weighted distance between the probe and the matched cell.
    distance: float
    # Selector output computed on the rows of the matched cell.
    recommendation: HorizonRecommendation


def _dataset_label(speed: float, radius: float) -> str:
    """Build the dataset label that identifies one (speed, radius) cell.

    The speed is rendered with an explicit sign and both values are
    rounded to two decimals, so the label can later be split back into
    its two numbers.
    """
    label = f"speed={speed:+.2f}_radius={radius:.2f}"
    return label


def load_indexed_rows(summary_csv: Path) -> list[AggregateBenchmarkRow]:
    """Load a sweep summary CSV into ``AggregateBenchmarkRow`` records.

    Each CSV row must provide the columns ``dyn_speed_scale``,
    ``dyn_radius_scale``, ``scenario``, ``planner``, ``success_rate``,
    ``final_distance``, ``cumulative_cost`` and ``avg_control_ms``. The
    two scale columns are folded into the row's ``dataset`` label so the
    difficulty cell can be recovered from it later.

    Args:
        summary_csv: Path of the sweep summary CSV file.

    Returns:
        One ``AggregateBenchmarkRow`` per CSV data row, in file order.

    Raises:
        KeyError: If a required column is missing.
        ValueError: If a numeric column cannot be parsed as a float.
    """
    # newline="" hands newline handling to the csv module, as its
    # documentation requires; otherwise newlines embedded in quoted
    # fields are not interpreted correctly.
    with open(summary_csv, newline="") as f:
        return [
            AggregateBenchmarkRow(
                dataset=_dataset_label(
                    float(r["dyn_speed_scale"]),
                    float(r["dyn_radius_scale"]),
                ),
                scenario=r["scenario"],
                planner=r["planner"],
                # k_samples and steps are not read from the CSV; they
                # are filled with fixed placeholder values.
                k_samples=4096,
                success=float(r["success_rate"]),
                steps=0.0,
                final_distance=float(r["final_distance"]),
                cumulative_cost=float(r["cumulative_cost"]),
                avg_control_ms=float(r["avg_control_ms"]),
            )
            for r in csv.DictReader(f)
        ]


def known_cells(rows: Sequence[AggregateBenchmarkRow], scenario: str,
                ) -> list[tuple[float, float]]:
    """List the distinct (speed, radius) cells present for ``scenario``.

    The two numbers are parsed back out of each matching row's dataset
    label. Duplicates are collapsed and the result is sorted ascending.
    """

    def parse_cell(label: str) -> tuple[float, float]:
        # The label has the shape "speed=<sp>_radius=<rad>".
        speed_value = float(label.split("speed=")[1].split("_radius=")[0])
        radius_value = float(label.split("_radius=")[1])
        return speed_value, radius_value

    unique_cells = {
        parse_cell(entry.dataset)
        for entry in rows
        if entry.scenario == scenario
    }
    return sorted(unique_cells)


def nearest_cell(rows: Sequence[AggregateBenchmarkRow], scenario: str,
                 speed: float, radius: float,
                 speed_weight: float = 1.0,
                 radius_weight: float = 1.0,
                 ) -> tuple[float, float, float]:
    """Find the known cell of ``scenario`` that lies closest to the probe.

    Closeness is the Euclidean norm of the per-axis differences after
    scaling them by ``speed_weight`` and ``radius_weight``.

    Returns:
        ``(matched_speed, matched_radius, distance)`` of the winning cell.

    Raises:
        ValueError: If ``rows`` holds no cell for ``scenario``.
    """
    candidates = known_cells(rows, scenario)
    if not candidates:
        raise ValueError(f"No sweep cells found for scenario {scenario!r}")

    def weighted_gap(cell: tuple[float, float]) -> float:
        cell_speed, cell_radius = cell
        return hypot(speed_weight * (cell_speed - speed),
                     radius_weight * (cell_radius - radius))

    closest = min(candidates, key=weighted_gap)
    return closest[0], closest[1], weighted_gap(closest)


def recommend_for_probe(
    selector: HorizonSelector,
    rows: Sequence[AggregateBenchmarkRow],
    scenario: str,
    speed: float,
    radius: float,
    success_threshold: float = 0.999,
    speed_weight: float = 1.0,
    radius_weight: float = 1.0,
) -> IndexedRecommendation:
    """Recommend a horizon for a probe via its nearest known sweep cell.

    The probe ``(speed, radius)`` is snapped to the closest cell of
    ``scenario``; ``selector`` is then asked for a recommendation on
    that cell's dataset label.

    Returns:
        The selector's recommendation together with the matched cell
        and its weighted distance from the probe.
    """
    cell_speed, cell_radius, gap = nearest_cell(
        rows, scenario, speed, radius, speed_weight, radius_weight)

    request = HorizonSelectionRequest(
        dataset=_dataset_label(cell_speed, cell_radius),
        scenario=scenario,
        success_threshold=success_threshold,
    )
    recommendation = selector.recommend(rows, request)

    return IndexedRecommendation(
        matched_speed=cell_speed,
        matched_radius=cell_radius,
        distance=gap,
        recommendation=recommendation,
    )


def _k_nearest(rows: Sequence[AggregateBenchmarkRow], scenario: str,
               speed: float, radius: float, k: int,
               speed_weight: float, radius_weight: float,
               ) -> list[tuple[float, float, float]]:
    """Return up to ``k`` known cells of ``scenario`` closest to the probe.

    Distance is the Euclidean norm of the speed and radius differences
    after scaling them by ``speed_weight`` and ``radius_weight``.

    Returns:
        ``(speed, radius, distance)`` tuples ordered by ascending
        distance; fewer than ``k`` when the scenario has fewer cells.

    Raises:
        ValueError: If ``k`` is smaller than 1, or if ``rows`` holds no
            cell for ``scenario``.
    """
    # A non-positive k must fail loudly: 0 would yield an empty list and
    # a negative value would silently slice off the farthest cells.
    if k < 1:
        raise ValueError(f"k must be a positive integer, got {k!r}")
    cells = known_cells(rows, scenario)
    if not cells:
        raise ValueError(f"No sweep cells found for scenario {scenario!r}")
    scored = [
        (sp, rad,
         hypot(speed_weight * (sp - speed),
               radius_weight * (rad - radius)))
        for sp, rad in cells
    ]
    # Stable sort: equidistant cells keep their sorted (speed, radius) order.
    scored.sort(key=lambda c: c[2])
    return scored[:k]


def recommend_for_probe_robust(
    selector: HorizonSelector,
    rows: Sequence[AggregateBenchmarkRow],
    scenario: str,
    speed: float,
    radius: float,
    k: int = 3,
    success_threshold: float = 0.999,
    speed_weight: float = 1.0,
    radius_weight: float = 1.0,
) -> IndexedRecommendation:
    """Conservative probe lookup: largest horizon among k nearest cells.

    The selector is queried once for each of the ``k`` cells nearest to
    the probe, and the answer with the largest gradient-update horizon
    is returned.

    Why: a minimal-sufficient selector picks the smallest horizon that
    meets the threshold, so a single cell where a tiny horizon happens
    to succeed could decide the answer for a probe lying between that
    cell and a harder one. Taking the maximum over several neighbours
    hedges against that.

    When several answers share the largest horizon, the one whose cell
    is closest to the probe wins, so ``matched_speed``,
    ``matched_radius`` and ``distance`` stay meaningful.
    """

    def poll(cell_speed: float, cell_radius: float,
             cell_distance: float) -> IndexedRecommendation:
        request = HorizonSelectionRequest(
            dataset=_dataset_label(cell_speed, cell_radius),
            scenario=scenario,
            success_threshold=success_threshold,
        )
        return IndexedRecommendation(
            matched_speed=cell_speed,
            matched_radius=cell_radius,
            distance=cell_distance,
            recommendation=selector.recommend(rows, request),
        )

    def preference(candidate: IndexedRecommendation) -> tuple[int, float]:
        # Larger horizon first; negated distance makes closer cells win ties.
        return (candidate.recommendation.grad_update_horizon,
                -candidate.distance)

    polled = [
        poll(*neighbour)
        for neighbour in _k_nearest(
            rows, scenario, speed, radius, k, speed_weight, radius_weight)
    ]
    return max(polled, key=preference)
33 changes: 33 additions & 0 deletions experiments/horizon_selection/horizon_naming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""Parse the gradient-update horizon out of a Diff-MPPI planner name.

Conventions used by `benchmark_diff_mppi.cu`:
- ``diff_mppi_3_early1`` -> horizon 1 step
- ``diff_mppi_3_early2`` -> horizon 2
- ``diff_mppi_3_early4`` -> horizon 4
- ``diff_mppi_3_early8`` -> horizon 8
- ``diff_mppi_3_early16`` -> horizon 16
- ``diff_mppi_3`` -> full horizon (sentinel 0 in the C++ side; we
surface it as 0 here so callers can spot it)
- ``mppi``, ``feedback_*``, ``step_mppi`` -> no gradient horizon (None)

The convention is intentionally narrow: callers that need to recover the
actual full-horizon length (the C++ default is 30 steps) must keep that
constant themselves.
"""

from __future__ import annotations


# Value returned for planners that run gradient updates over the full
# horizon (a ``diff_mppi*`` name without an ``_early<N>`` suffix).
FULL_HORIZON_SENTINEL = 0


def parse_grad_update_horizon(planner: str) -> int | None:
    """Extract the gradient-update horizon encoded in a planner name.

    Args:
        planner: Planner name, e.g. ``"diff_mppi_3_early8"``.

    Returns:
        * ``None`` when the name does not start with ``diff_mppi``.
        * ``FULL_HORIZON_SENTINEL`` when the name has no ``_early`` part.
        * The positive integer ``N`` for an ``_early<N>`` suffix.
        * ``None`` when the text after ``_early`` is not a plain,
          strictly positive decimal number.
    """
    if not planner.startswith("diff_mppi"):
        return None
    _, marker, suffix = planner.partition("_early")
    if not marker:
        return FULL_HORIZON_SENTINEL
    # Accept ASCII digits only: a bare int() would also take forms such
    # as "1_6", "-4", "+8" or " 8", none of which is a valid horizon.
    if not (suffix.isascii() and suffix.isdigit()):
        return None
    horizon = int(suffix)
    # An explicit zero would be indistinguishable from the full-horizon
    # sentinel, so it is treated as malformed.
    if horizon == FULL_HORIZON_SENTINEL:
        return None
    return horizon
124 changes: 124 additions & 0 deletions experiments/horizon_selection/minimal_sufficient_selector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
"""Functional minimal-sufficient horizon selector.

Strategy:
1. Keep only candidates whose planner name encodes a gradient-update
horizon (so ``mppi`` and feedback baselines are filtered out).
2. Among those whose ``success_rate >= success_threshold``, return the
candidate with the smallest horizon (ties broken by final_distance).
3. If no candidate meets the threshold, fall back to ranking by the
requested ``fallback_metric`` (default ``final_distance``) and pick
the best one.

The point of step 2 is to encode the "early8 sweet spot" finding as a
deployable policy: do not pay for a longer horizon than you need.
"""

from __future__ import annotations

from typing import Sequence

from core.horizon_selector_interface import (
HorizonRecommendation,
HorizonSelectionRequest,
HorizonSelector,
)
from core.planner_selector_interface import AggregateBenchmarkRow
from experiments.horizon_selection.horizon_naming import (
FULL_HORIZON_SENTINEL,
parse_grad_update_horizon,
)


def _effective_horizon(planner: str, full_horizon_steps: int) -> int:
    """Translate a planner name into a horizon length usable for ranking.

    Returns ``-1`` when the name encodes no gradient horizon,
    ``full_horizon_steps`` when it encodes the full-horizon sentinel,
    and the parsed horizon otherwise.
    """
    parsed = parse_grad_update_horizon(planner)
    if parsed is None:
        return -1
    return full_horizon_steps if parsed == FULL_HORIZON_SENTINEL else parsed


class MinimalSufficientHorizonSelector(HorizonSelector):
    """Pick the smallest gradient-update horizon that is good enough.

    Only rows whose planner name encodes a gradient-update horizon are
    considered. Among the rows reaching the requested success threshold
    the smallest horizon wins; when that path is not taken the rows are
    ranked by the request's ``fallback_metric`` instead.
    """

    name = "minimal_sufficient"
    paradigm = "functional"

    def __init__(self, full_horizon_steps: int = 30) -> None:
        """Store the horizon length that stands in for the full horizon.

        Args:
            full_horizon_steps: Number of steps used to rank the
                full-horizon planner (``diff_mppi_3``) against the
                ``early*`` variants.
        """
        # ``full_horizon_steps`` is the C++-side DEFAULT_T_HORIZON used to
        # rank ``diff_mppi_3`` against the ``early*`` variants.
        self.full_horizon_steps = full_horizon_steps

    def recommend(
        self,
        rows: Sequence[AggregateBenchmarkRow],
        request: HorizonSelectionRequest,
    ) -> HorizonRecommendation:
        """Recommend a planner/horizon for the request's dataset and scenario.

        Selection rules:

        * If some candidates reach ``request.success_threshold`` and
          ``request.prefer_minimal`` is true, the candidate with the
          smallest horizon wins (ties: final distance, then name).
        * Otherwise candidates are ranked by ``request.fallback_metric``
          (``"final_distance"`` or ``"cumulative_cost"``, lower is
          better). The ranking is restricted to the candidates that
          reach the threshold when there are any, and covers all
          candidates only when none does.

        Raises:
            ValueError: If no row matches the request with a gradient
                horizon, or if the fallback metric is needed but
                unsupported.
        """
        candidates = [
            row
            for row in rows
            if row.dataset == request.dataset
            and row.scenario == request.scenario
            and parse_grad_update_horizon(row.planner) is not None
        ]
        if not candidates:
            raise ValueError(
                f"No gradient-horizon candidates for "
                f"{request.dataset}/{request.scenario}"
            )

        def horizon_of(row: AggregateBenchmarkRow) -> int:
            return _effective_horizon(row.planner, self.full_horizon_steps)

        sufficient = [
            row for row in candidates
            if row.success >= request.success_threshold
        ]

        if sufficient and request.prefer_minimal:
            best = min(
                sufficient,
                key=lambda r: (horizon_of(r), r.final_distance, r.planner),
            )
            rationale = (
                f"smallest horizon with success >= "
                f"{request.success_threshold:.3f} "
                f"({len(sufficient)} of {len(candidates)} candidates met)"
            )
            score = best.success
        else:
            metric = request.fallback_metric
            if metric not in ("final_distance", "cumulative_cost"):
                raise ValueError(
                    f"Unsupported fallback_metric: {request.fallback_metric}"
                )
            # With prefer_minimal disabled, candidates that meet the
            # threshold must still beat those that do not, so the
            # ranking is restricted to them whenever any exist.
            pool = sufficient if sufficient else candidates
            best = min(
                pool,
                key=lambda r: (getattr(r, metric), horizon_of(r), r.planner),
            )
            score = -getattr(best, metric)
            if sufficient:
                rationale = (
                    f"prefer_minimal disabled; ranked "
                    f"{len(sufficient)} of {len(candidates)} candidates "
                    f"with success >= {request.success_threshold:.3f} "
                    f"by {metric}"
                )
            else:
                rationale = (
                    "no candidate met success threshold; ranked by "
                    f"{metric}"
                )

        recommended_horizon = horizon_of(best)
        return HorizonRecommendation(
            variant=self.name,
            dataset=request.dataset,
            scenario=request.scenario,
            planner=best.planner,
            grad_update_horizon=recommended_horizon,
            success_rate=best.success,
            final_distance=best.final_distance,
            score=score,
            rationale=rationale,
        )
Loading
Loading