Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 123 additions & 6 deletions server/chalicelib/aggregation.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
"""Time-series aggregation of travel time, headway, and dwell data over date ranges."""

import datetime
from chalicelib import data_funcs
import pandas as pd
Expand All @@ -14,16 +16,15 @@ def add_holidays(
holiday_col_name: str = "holiday",
service_date_col_name: str = "service_date",
) -> pd.DataFrame:
"""
This function adds a boolean marker for whether or not the specified date is a holiday to a Pandas DataFrame in place.
"""Add a boolean column indicating whether each row's date is a US federal holiday.

Args:
df (DataFrame): The first parameter, an integer value.
holiday (str): The field name for the holiday column.
service_date_col_name (str): The field name for the service_date column, this is also the date value to check if it is a holiday.
df: DataFrame containing a date column to check against holidays.
holiday_col_name: Name of the boolean column to add. Defaults to "holiday".
service_date_col_name: Name of the existing date column to check. Defaults to "service_date".

Returns:
DataFrame: Returns the modified dataframe.
The input DataFrame with an added boolean holiday column.
"""
# Handle empty DataFrame or missing dates
if df.empty or df[service_date_col_name].isna().all():
Expand All @@ -39,6 +40,17 @@ def add_holidays(


def train_peak_status(df: pd.DataFrame):
"""Classify each trip as AM peak, PM peak, or off-peak based on departure time.

Peak hours are defined as non-holiday weekdays 6:30-9:00 AM (am_peak) and
3:30-6:30 PM (pm_peak). All other times are labeled off_peak.

Args:
df: DataFrame with "dep_time" and "weekday" columns.

Returns:
The input DataFrame with added "holiday", "is_peak_day", and "peak" columns.
"""
df = add_holidays(df)

# Peak Hours: non-holiday weekdays 6:30-9am; 3:30-6:30pm
Expand All @@ -54,6 +66,18 @@ def train_peak_status(df: pd.DataFrame):


def faster_describe(grouped: DataFrameGroupBy):
"""Compute descriptive statistics for a grouped DataFrame, optimized for speed.

Equivalent to pandas DataFrame.describe() but up to 25x faster. Computes count,
mean, min, median (as "50%"), max, sum, std (population), 25th, and 75th percentiles.
Filters out groups with 4 or fewer observations to reduce outlier noise.

Args:
grouped: A grouped single-column DataFrame to compute statistics on.

Returns:
DataFrame with descriptive statistics per group, excluding groups with count <= 4.
"""
# This does the same thing as pandas.DataFrame.describe(), but is up to 25x faster!
# also, we can specify population std instead of sample.
stats = grouped.aggregate(["count", "mean", "min", "median", "max", "sum"])
Expand Down Expand Up @@ -81,6 +105,20 @@ def faster_describe(grouped: DataFrameGroupBy):


def aggregate_traveltime_data(start_date: datetime.date, end_date: datetime.date, from_stops, to_stops):
"""Fetch and prepare travel time data for aggregation.

Retrieves raw travel time records, converts them to a DataFrame, and enriches
with service date, weekday, and peak status columns.

Args:
start_date: Start of the date range (inclusive).
end_date: End of the date range (inclusive).
from_stops: Origin stop ID(s) for the travel time query.
to_stops: Destination stop ID(s) for the travel time query.

Returns:
Enriched DataFrame with travel time data, or None if no data is available.
"""
all_data = data_funcs.travel_times(start_date, from_stops, to_stops, end_date)
if not all_data:
return None
Expand All @@ -100,6 +138,18 @@ def aggregate_traveltime_data(start_date: datetime.date, end_date: datetime.date


def calc_travel_times_by_time(df: pd.DataFrame):
"""Aggregate travel times into 30-minute time-of-day buckets, split by peak day status.

Converts departure times to epoch-relative datetimes for resampling, then computes
descriptive statistics for each 30-minute interval.

Args:
df: DataFrame from aggregate_traveltime_data with "dep_time", "is_peak_day",
and "travel_time_sec" columns.

Returns:
DataFrame with travel time statistics per 30-minute bucket and peak day status.
"""
# convert time of day to a consistent datetime relative to epoch
timedeltas = pd.to_timedelta(df["dep_time"].astype(str))
timedeltas.loc[timedeltas < SERVICE_HR_OFFSET] += datetime.timedelta(days=1)
Expand All @@ -112,6 +162,19 @@ def calc_travel_times_by_time(df: pd.DataFrame):


def calc_travel_times_by_date(df: pd.DataFrame):
"""Aggregate travel times by service date, with overall and per-peak-period breakdowns.

Computes descriptive statistics grouped by service date for all trips combined
(peak="all") and separately by peak status (am_peak, pm_peak, off_peak).
Adds holiday and weekend flags to the results.

Args:
df: DataFrame from aggregate_traveltime_data with "service_date", "peak",
and "travel_time_sec" columns.

Returns:
DataFrame with travel time statistics per service date and peak period.
"""
# get summary stats
summary_stats = faster_describe(df.groupby("service_date")["travel_time_sec"])
summary_stats["peak"] = "all"
Expand All @@ -132,6 +195,18 @@ def calc_travel_times_by_date(df: pd.DataFrame):


def travel_times_all(start_date: datetime.date, end_date: datetime.date, from_stops, to_stops):
"""Return travel time aggregations both by date and by time of day.

Args:
start_date: Start of the date range (inclusive).
end_date: End of the date range (inclusive).
from_stops: Origin stop ID(s).
to_stops: Destination stop ID(s).

Returns:
Dict with "by_date" and "by_time" keys, each containing a list of records.
Returns empty lists for both if no data is available.
"""
df = aggregate_traveltime_data(start_date, end_date, from_stops, to_stops)
if df is None:
return {"by_date": [], "by_time": []}
Expand All @@ -145,6 +220,19 @@ def travel_times_all(start_date: datetime.date, end_date: datetime.date, from_st


def travel_times_over_time(start_date: datetime.date, end_date: datetime.date, from_stops, to_stops):
"""Return travel time statistics by date for all peak periods combined.

Legacy endpoint that returns only the by-date aggregation filtered to peak="all".

Args:
start_date: Start of the date range (inclusive).
end_date: End of the date range (inclusive).
from_stops: Origin stop ID(s).
to_stops: Destination stop ID(s).

Returns:
List of record dicts with daily travel time statistics, or empty list if no data.
"""
df = aggregate_traveltime_data(start_date, end_date, from_stops, to_stops)
if df is None:
return []
Expand All @@ -156,6 +244,21 @@ def travel_times_over_time(start_date: datetime.date, end_date: datetime.date, f
# HEADWAYS
####################
def headways_over_time(start_date: datetime.date, end_date: datetime.date, stops):
"""Compute daily headway statistics with bunching and on-time metrics.

Fetches headway data, computes descriptive statistics by service date, and
calculates bunched (headway ratio <= 0.5) and on-time (ratio between 0.75
and 1.25) trip counts relative to benchmark headways. Results are filtered
to the "all" peak category and include holiday/weekend flags.

Args:
start_date: Start of the date range (inclusive).
end_date: End of the date range (inclusive).
stops: Stop ID(s) to query headways for.

Returns:
List of record dicts with daily headway statistics, or empty list if no data.
"""
all_data = data_funcs.headways(start_date, stops, end_date)
if not all_data:
return []
Expand Down Expand Up @@ -209,6 +312,20 @@ def headways_over_time(start_date: datetime.date, end_date: datetime.date, stops


def dwells_over_time(start_date: str | datetime.date, end_date: str | datetime.date, stops):
"""Compute daily dwell time statistics over a date range.

Fetches dwell time data, computes descriptive statistics by service date for
all trips and by peak period, then returns results filtered to the "all"
peak category with holiday and weekend flags.

Args:
start_date: Start of the date range (inclusive).
end_date: End of the date range (inclusive).
stops: Stop ID(s) to query dwell times for.

Returns:
List of record dicts with daily dwell time statistics, or empty list if no data.
"""
all_data = data_funcs.dwells(start_date, stops, end_date)
if not all_data:
return []
Expand Down
98 changes: 97 additions & 1 deletion server/chalicelib/s3.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
"""S3 data access layer for downloading and uploading transit event data."""

from datetime import date
import boto3
import botocore
Expand All @@ -16,6 +18,16 @@

# General downloading/uploading
def download(key, encoding="utf8", compressed=True):
"""Download and decode a file from the S3 performance data bucket.

Args:
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file may have been committed across 2 branches?

key: S3 object key to download.
encoding: Character encoding for decoding. (Default value = "utf8")
compressed: Whether the file is gzip/zlib compressed. (Default value = True)

Returns:
str: The decoded file contents as a string.
"""
obj = s3.get_object(Bucket=BUCKET, Key=key)
s3_data = obj["Body"].read()
if not compressed:
Expand All @@ -26,24 +38,63 @@ def download(key, encoding="utf8", compressed=True):


def upload(key, bytes, compress=True):
    """Store a payload in the S3 performance data bucket.

    Args:
        key: Destination S3 object key.
        bytes: Raw payload bytes to store.
        compress: If True, zlib-compress the payload before writing. (Default value = True)
    """
    body = zlib.compress(bytes) if compress else bytes
    s3.put_object(Bucket=BUCKET, Key=key, Body=body)


def is_bus(stop_id: str):
    """Determine whether a stop ID follows the bus-stop naming pattern.

    Args:
        stop_id: The stop identifier.

    Returns:
        bool: True if the identifier contains a "-0-" or "-1-" segment.
    """
    return any(marker in stop_id for marker in ("-0-", "-1-"))


def is_cr(stop_id: str):
    """Determine whether a stop ID belongs to a commuter rail route.

    Args:
        stop_id: The stop identifier.

    Returns:
        bool: True if the identifier begins with the commuter rail prefix.
    """
    prefix = "CR-"
    return stop_id[: len(prefix)] == prefix


def is_ferry(stop_id: str):
    """Determine whether a stop ID belongs to a ferry route.

    Args:
        stop_id: The stop identifier.

    Returns:
        bool: True if the identifier begins with the ferry prefix.
    """
    prefix = "Boat-"
    return stop_id[: len(prefix)] == prefix


def get_gobble_folder(stop_id: str):
"""Return the S3 folder name for Gobble-ingested data based on the stop's mode.

Args:
stop_id: str: The stop identifier.

Returns:
str: One of "daily-bus-data", "daily-cr-data", or "daily-rapid-data".
"""
if is_bus(stop_id):
return "daily-bus-data"
elif is_cr(stop_id):
Expand All @@ -53,11 +104,26 @@ def get_gobble_folder(stop_id: str):


def get_lamp_folder():
    """Return the name of the S3 folder holding LAMP-ingested daily data."""
    folder = "daily-data"
    return folder


def download_one_event_file(date: pd.Timestamp, stop_id: str, use_gobble=False, route_context=None):
"""As advertised: single event file from s3"""
"""Download and parse a single day's event CSV for a stop from S3.

Selects the appropriate S3 path based on the stop's mode (bus, CR, ferry, rapid
transit) and data recency (monthly archives vs. daily LAMP/Gobble feeds). Falls
back to Gobble data if the LAMP key is not found.

Args:
date: pd.Timestamp: The date to fetch events for.
stop_id: str: The stop identifier.
use_gobble: Force using Gobble data instead of LAMP. (Default value = False)
route_context: Unused, reserved for future use. (Default value = None)

Returns:
list[dict]: Rows of event data sorted by event_time, or empty list if unavailable.
"""
year, month, day = date.year, date.month, date.day

if is_cr(stop_id):
Expand Down Expand Up @@ -105,11 +171,32 @@ def download_one_event_file(date: pd.Timestamp, stop_id: str, use_gobble=False,

@parallel.make_parallel
def parallel_download_events(datestop: itertools.product):
    """Fetch event data for one (date, stop) pair; fanned out via @make_parallel.

    Args:
        datestop: A (date, stop_id) tuple produced by an itertools.product iterator.

    Returns:
        list[dict]: Event rows for that date and stop.
    """
    return download_one_event_file(*datestop)


def download_events(start_date: date, end_date: date, stops: list):
"""Download event data for multiple stops over a date range, in parallel.

Fetches all combinations of dates and stops, then filters to the exact date range
and sorts by event time.

Args:
start_date: date: Start of the date range (inclusive).
end_date: date: End of the date range (inclusive).
stops: list: List of stop IDs to fetch data for.

Returns:
list[dict]: All event rows sorted by event_time.
"""
datestops = itertools.product(parallel.s3_date_range(start_date, end_date, stops), stops)
result = parallel_download_events(datestops)
result = filter(
Expand All @@ -119,6 +206,15 @@ def download_events(start_date: date, end_date: date, stops: list):


def get_all_s3_objects(s3, **base_kwargs):
"""Paginate through all objects in an S3 bucket/prefix, yielding each object metadata.

Args:
s3: A boto3 S3 client.
**base_kwargs: Arguments passed to s3.list_objects_v2 (e.g., Bucket, Prefix).

Yields:
dict: Individual object metadata dicts from the S3 listing.
"""
continuation_token = None
while True:
list_kwargs = dict(MaxKeys=1000, **base_kwargs)
Expand Down
Loading