Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
288 changes: 275 additions & 13 deletions mbta-performance/chalicelib/lamp/tests/test_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ def setUp(self):
)
self.mock_gtfs_data["direction_id"] = self.mock_gtfs_data["direction_id"].astype("int16")

def _mock_s3_upload(self):
# mock upload of s3.upload_df_as_csv() to a fake bucket
pass

def test__process_arrival_departure_times(self):
pq_df_before = pd.read_parquet(
io.BytesIO(self.data),
Expand Down Expand Up @@ -348,28 +344,222 @@ def test_event_time_sorting(self):
sorted_event_times = sorted(event_times)
self.assertEqual(event_times, sorted_event_times, "Output should be sorted by event_time")

# --- _derive_lamp_branch_route_id ---

def test__derive_lamp_branch_route_id_ashmont(self):
    """A Red Line trip that touches an Ashmont-branch stop is labelled 'Red-A' and never 'Red-B'."""
    trip = "trip_a"
    # Fixture intent: trunk stops followed by one stop unique to the Ashmont branch.
    stops = ["70061", "70063", "70085"]
    events = pd.DataFrame(
        {
            "trip_id": [trip] * len(stops),
            "route_id": ["Red"] * len(stops),
            "stop_id": stops,
        }
    )

    derived = ingest._derive_lamp_branch_route_id(events)

    is_trip = derived["trip_id"] == trip
    branches = derived.loc[is_trip, "branch_route_id"].unique()
    self.assertIn("Red-A", branches)
    self.assertNotIn("Red-B", branches)

def test__derive_lamp_branch_route_id_braintree(self):
    """A Red Line trip that touches a Braintree-branch stop is labelled 'Red-B' and never 'Red-A'."""
    trip = "trip_b"
    # Fixture intent: trunk stops followed by one stop unique to the Braintree branch.
    stops = ["70061", "70063", "70095"]
    events = pd.DataFrame(
        {
            "trip_id": [trip] * len(stops),
            "route_id": ["Red"] * len(stops),
            "stop_id": stops,
        }
    )

    derived = ingest._derive_lamp_branch_route_id(events)

    is_trip = derived["trip_id"] == trip
    branches = derived.loc[is_trip, "branch_route_id"].unique()
    self.assertIn("Red-B", branches)
    self.assertNotIn("Red-A", branches)

def test__derive_lamp_branch_route_id_trunk_only(self):
    """A Red Line trip with no branch-specific stops keeps plain 'Red' as its branch_route_id."""
    trip = "trip_trunk"
    trunk_stops = ["70061", "70063"]  # trunk-only stops
    events = pd.DataFrame(
        [{"trip_id": trip, "route_id": "Red", "stop_id": stop} for stop in trunk_stops]
    )

    derived = ingest._derive_lamp_branch_route_id(events)

    is_trip = derived["trip_id"] == trip
    branches = derived.loc[is_trip, "branch_route_id"].unique()
    self.assertListEqual(list(branches), ["Red"])

def test__derive_lamp_branch_route_id_non_red_line(self):
    """For a route other than Red, branch_route_id is expected to equal the route_id."""
    trip, route = "trip_o", "Orange"
    events = pd.DataFrame(
        [{"trip_id": trip, "route_id": route, "stop_id": stop} for stop in ("70001", "70003")]
    )

    derived = ingest._derive_lamp_branch_route_id(events)

    is_trip = derived["trip_id"] == trip
    branches = derived.loc[is_trip, "branch_route_id"].unique()
    self.assertListEqual(list(branches), ["Orange"])

def test__derive_lamp_branch_route_id_green_line_unchanged(self):
    """A Green Line sub-route id such as 'Green-B' is expected to pass through as branch_route_id."""
    trip, route = "trip_gb", "Green-B"
    events = pd.DataFrame(
        [{"trip_id": trip, "route_id": route, "stop_id": stop} for stop in ("70107", "70110")]
    )

    derived = ingest._derive_lamp_branch_route_id(events)

    is_trip = derived["trip_id"] == trip
    branches = derived.loc[is_trip, "branch_route_id"].unique()
    self.assertListEqual(list(branches), ["Green-B"])

# --- Helpers for pipeline tests ---

def _make_minimal_lamp_df(self, service_date_int, trip_stop_pairs, base_ts=1707825600):
"""
Build a minimal LAMP-style DataFrame for use in ingest_pq_file tests.

trip_stop_pairs: list of (trip_id, stop_id, vehicle_id) tuples.
base_ts: base Unix timestamp (default: 2024-02-13 noon UTC).
"""
rows = []
for seq, (trip_id, stop_id, vehicle_id) in enumerate(trip_stop_pairs, start=1):
rows.append(
{
"service_date": service_date_int,
"route_id": "Orange",
"trip_id": trip_id,
"stop_id": stop_id,
"direction_id": 0,
"stop_sequence": seq,
"vehicle_id": vehicle_id,
"vehicle_label": "1201",
"move_timestamp": base_ts + seq * 100,
"stop_timestamp": base_ts + 50 + seq * 100,
"travel_time_seconds": 60,
"dwell_time_seconds": 30,
"headway_trunk_seconds": 300,
"headway_branch_seconds": 300,
"scheduled_travel_time": 55,
"scheduled_headway_trunk": 290,
"scheduled_headway_branch": 290,
"vehicle_consist": "",
}
)
return pd.DataFrame(rows).convert_dtypes(dtype_backend="numpy_nullable")

# --- NONREV trip filtering ---

def test_nonrev_trip_filtering_before_cutoff(self):
    """NONREV- trips with a service_date before Dec 2023 should be dropped from the output."""
    # Nov 1, 2023 noon UTC = 8 AM Eastern Daylight Time (well within service day)
    base_ts = 1698840000
    test_data = self._make_minimal_lamp_df(
        service_date_int=20231101,
        trip_stop_pairs=[
            ("NONREV-12345", "70001", "O-001"),
            ("regular-trip", "70003", "O-002"),
        ],
        base_ts=base_ts,
    )

    with mock.patch("chalicelib.lamp.ingest.fetch_stop_times_from_gtfs", return_value=self.mock_gtfs_data):
        result = ingest.ingest_pq_file(test_data, date(2023, 11, 1))

    # Use the vectorized reduction with na=False: a missing trip_id is not a NONREV- trip,
    # and the builtin any() would truth-test elements one by one and raise on pd.NA.
    has_nonrev = result["trip_id"].str.startswith("NONREV-", na=False).any()
    self.assertFalse(
        has_nonrev,
        "NONREV- trips before the cutoff date should not appear in the output",
    )

def test_nonrev_trip_not_filtered_after_cutoff(self):
    """NONREV- trips with a service_date on or after Dec 2023 should be retained."""
    # Feb 13, 2024 noon UTC = 7 AM Eastern Standard Time (UTC-5; DST is not in effect in February)
    base_ts = 1707825600
    test_data = self._make_minimal_lamp_df(
        service_date_int=20240213,
        trip_stop_pairs=[
            ("NONREV-99999", "70001", "O-001"),
            ("NONREV-88888", "70003", "O-002"),
        ],
        base_ts=base_ts,
    )

    with mock.patch("chalicelib.lamp.ingest.fetch_stop_times_from_gtfs", return_value=self.mock_gtfs_data):
        result = ingest.ingest_pq_file(test_data, date(2024, 2, 13))

    # na=False keeps the mask strictly boolean even if a trip_id is missing.
    is_nonrev = result["trip_id"].str.startswith("NONREV-", na=False)
    nonrev_events = result[is_nonrev]
    self.assertFalse(
        nonrev_events.empty,
        "NONREV- trips after the cutoff date should be retained in the output",
    )

# --- Stop ID alias mapping ---

def test_stop_id_alias_mapping(self):
    """An alias stop ID such as 'Alewife-01' should come out of ingest_pq_file as its numeric ID."""
    start = 1707825600  # Feb 13, 2024 noon UTC
    n_events = 2
    # One trip on one vehicle: the first event is at the alias 'Alewife-01', the second at '70063'.
    # A single vehicle_id is shared so the merge_asof in _process_arrival_departure_times works.
    columns = {
        "service_date": [20240213] * n_events,
        "route_id": ["Red"] * n_events,
        "trip_id": ["trip1"] * n_events,
        "stop_id": ["Alewife-01", "70063"],
        "direction_id": [0] * n_events,
        "stop_sequence": [1, 2],
        "vehicle_id": ["R-001"] * n_events,
        "vehicle_label": ["1801"] * n_events,
        "move_timestamp": [start, start + 100],
        "stop_timestamp": [start + 50, start + 150],
        "travel_time_seconds": [60] * n_events,
        "dwell_time_seconds": [30] * n_events,
        "headway_trunk_seconds": [300] * n_events,
        "headway_branch_seconds": [300] * n_events,
        "scheduled_travel_time": [55] * n_events,
        "scheduled_headway_trunk": [290] * n_events,
        "scheduled_headway_branch": [290] * n_events,
        "vehicle_consist": [""] * n_events,
    }
    test_data = pd.DataFrame(columns).convert_dtypes(dtype_backend="numpy_nullable")

    gtfs_patch = mock.patch(
        "chalicelib.lamp.ingest.fetch_stop_times_from_gtfs", return_value=self.mock_gtfs_data
    )
    with gtfs_patch:
        result = ingest.ingest_pq_file(test_data, date(2024, 2, 13))

    alias_still_present = any(result["stop_id"] == "Alewife-01")
    self.assertFalse(
        alias_still_present,
        "The aliased stop ID 'Alewife-01' should not appear in the output",
    )
    # The alias is expected to have been replaced by the numeric ID "70061".
    self.assertIn("70061", result["stop_id"].values)

def test_service_date_formatting(self):
"""Test that service_date is formatted correctly as string."""
# Use timestamps that fall within the 2024-02-07 service window (8–9 am ET)
# 1707310800 = 2024-02-07 13:00:00 UTC = 2024-02-07 08:00:00 ET
base_ts = 1707310800
test_data = pd.DataFrame(
{
"service_date": [20240207, 20240208],
"service_date": [20240207, 20240207],
"route_id": ["Red", "Red"],
"trip_id": ["trip1", "trip2"],
"stop_id": ["70061", "70061"],
"trip_id": ["trip1", "trip1"],
"stop_id": ["70061", "70063"],
"direction_id": [0, 0],
"stop_sequence": [1, 1],
"vehicle_id": ["R-001", "R-002"],
"vehicle_label": ["1801", "1802"],
"move_timestamp": [1707000000, 1707100000],
"stop_timestamp": [1707000050, 1707100050],
"stop_sequence": [1, 2],
"vehicle_id": ["R-001", "R-001"],
"vehicle_label": ["1801", "1801"],
"move_timestamp": [base_ts, base_ts + 100],
"stop_timestamp": [base_ts + 50, base_ts + 150],
"travel_time_seconds": [60] * 2,
"dwell_time_seconds": [30] * 2,
"headway_trunk_seconds": [300] * 2,
"headway_branch_seconds": [300] * 2,
"scheduled_travel_time": [55] * 2,
"scheduled_headway_trunk": [290] * 2,
"scheduled_headway_branch": [290] * 2,
"vehicle_consist": ["2-car"] * 2,
"vehicle_consist": ["", ""] * 1,
}
).convert_dtypes(dtype_backend="numpy_nullable")

Expand All @@ -378,6 +568,78 @@ def test_service_date_formatting(self):

# Check that service_date is formatted as YYYY-MM-DD strings
service_dates = result["service_date"].unique()
self.assertGreater(len(service_dates), 0, "Result should not be empty")
for sdate in service_dates:
self.assertIsInstance(sdate, str)
self.assertRegex(sdate, r"^\d{4}-\d{2}-\d{2}$", "service_date should be YYYY-MM-DD format")

# --- _derive_gtfs_branch_route_id ---

def test__derive_gtfs_branch_route_id_trunk_only(self):
    """A Red Line GTFS trip that only visits trunk stops falls back to its route_id."""
    trip = "gtfs_trunk"
    # (stop_id, arrival_time) pairs; both stops are trunk-only per the fixture's intent.
    visits = [("70061", 28800.0), ("70063", 28900.0)]
    gtfs_stops = pd.DataFrame(
        [
            {
                "trip_id": trip,
                "route_id": "Red",
                "stop_id": stop_id,
                "direction_id": 0,
                "arrival_time": arrival,
            }
            for stop_id, arrival in visits
        ]
    )

    derived = ingest._derive_gtfs_branch_route_id(gtfs_stops)

    is_trip = derived["trip_id"] == trip
    branches = derived.loc[is_trip, "branch_route_id"].unique()
    self.assertListEqual(list(branches), ["Red"])

# --- upload_to_s3 error handling ---

def test_upload_to_s3_failure(self):
    """An error raised by s3.upload_df_as_csv must propagate out of upload_to_s3."""
    service_date = date(2024, 2, 7)
    raw_events = pd.read_parquet(
        io.BytesIO(self.data),
        columns=constants.LAMP_COLUMNS,
        engine="pyarrow",
        dtype_backend="numpy_nullable",
    )

    gtfs_patch = mock.patch(
        "chalicelib.lamp.ingest.fetch_stop_times_from_gtfs", return_value=self.mock_gtfs_data
    )
    with gtfs_patch:
        processed = ingest.ingest_pq_file(raw_events, service_date)

    # Any single (stop_id, events) group is enough to drive the upload path.
    stop_id, stop_events = next(iter(processed.groupby("stop_id")))

    failing_upload = mock.patch(
        "chalicelib.lamp.ingest.s3.upload_df_as_csv", side_effect=RuntimeError("S3 error")
    )
    with failing_upload, self.assertRaises(RuntimeError):
        ingest.upload_to_s3((stop_id, stop_events), service_date)

# --- ingest_lamp_data error handling ---

def test_ingest_lamp_data_fetch_value_error(self):
    """ingest_lamp_data should return gracefully when fetch raises ValueError."""
    with mock.patch(
        "chalicelib.lamp.ingest.fetch_pq_file_from_remote", side_effect=ValueError("404")
    ) as mock_fetch:
        with mock.patch("chalicelib.lamp.ingest.ingest_pq_file") as mock_ingest:
            # Should not raise; just return early
            ingest.ingest_lamp_data(date(2024, 2, 7))

    # Without these checks the test passes even if the patched fetch is never reached.
    mock_fetch.assert_called()
    # "Return early" means the processing step must not run after the failed fetch.
    mock_ingest.assert_not_called()

def test_ingest_lamp_data_fetch_unexpected_error(self):
    """A non-ValueError raised while fetching must propagate out of ingest_lamp_data."""
    failing_fetch = mock.patch(
        "chalicelib.lamp.ingest.fetch_pq_file_from_remote", side_effect=RuntimeError("network failure")
    )
    with failing_fetch, self.assertRaises(RuntimeError):
        ingest.ingest_lamp_data(date(2024, 2, 7))

def test_ingest_lamp_data_process_error(self):
    """An error raised by ingest_pq_file must propagate out of ingest_lamp_data."""
    # The HTTP layer is stubbed to return the fixture bytes with status 200.
    ok_response = mock.Mock(status_code=200, content=self.data)
    http_patch = mock.patch("requests.get", return_value=ok_response)
    failing_ingest = mock.patch(
        "chalicelib.lamp.ingest.ingest_pq_file", side_effect=RuntimeError("process error")
    )
    with http_patch, failing_ingest, self.assertRaises(RuntimeError):
        ingest.ingest_lamp_data(date(2024, 2, 7))

def test_ingest_lamp_data_upload_error(self):
    """An error raised by _parallel_upload must propagate out of ingest_lamp_data."""
    # The HTTP layer is stubbed to return the fixture bytes with status 200.
    ok_response = mock.Mock(status_code=200, content=self.data)
    http_patch = mock.patch("requests.get", return_value=ok_response)
    gtfs_patch = mock.patch(
        "chalicelib.lamp.ingest.fetch_stop_times_from_gtfs", return_value=self.mock_gtfs_data
    )
    failing_upload = mock.patch(
        "chalicelib.lamp.ingest._parallel_upload", side_effect=RuntimeError("upload error")
    )
    with http_patch, gtfs_patch, failing_upload, self.assertRaises(RuntimeError):
        ingest.ingest_lamp_data(date(2024, 2, 7))