From 57dc081ba8b971a109034c813cc7f0b33f2b3a4b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Andrew=20Kour=C3=A9?=
Date: Sun, 15 Feb 2026 03:06:16 +0000
Subject: [PATCH 1/2] Added docstrings to S3

---
 server/chalicelib/s3.py | 98 ++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 97 insertions(+), 1 deletion(-)

diff --git a/server/chalicelib/s3.py b/server/chalicelib/s3.py
index 18ac3feb..0c472341 100644
--- a/server/chalicelib/s3.py
+++ b/server/chalicelib/s3.py
@@ -1,3 +1,5 @@
+"""S3 data access layer for downloading and uploading transit event data."""
+
 from datetime import date
 import boto3
 import botocore
@@ -16,6 +18,16 @@
 
 # General downloading/uploading
 def download(key, encoding="utf8", compressed=True):
+    """Download and decode a file from the S3 performance data bucket.
+
+    Args:
+        key: S3 object key to download.
+        encoding: Character encoding for decoding. (Default value = "utf8")
+        compressed: Whether the file is gzip/zlib compressed. (Default value = True)
+
+    Returns:
+        str: The decoded file contents as a string.
+    """
     obj = s3.get_object(Bucket=BUCKET, Key=key)
     s3_data = obj["Body"].read()
     if not compressed:
@@ -26,24 +38,63 @@
 
 
 def upload(key, bytes, compress=True):
+    """Upload data to the S3 performance data bucket.
+
+    Args:
+        key: S3 object key to write to.
+        bytes: Raw bytes to upload.
+        compress: Whether to zlib-compress before uploading. (Default value = True)
+    """
     if compress:
         bytes = zlib.compress(bytes)
     s3.put_object(Bucket=BUCKET, Key=key, Body=bytes)
 
 
 def is_bus(stop_id: str):
+    """Check if a stop ID belongs to a bus route based on its naming convention.
+
+    Args:
+        stop_id: The stop identifier.
+
+    Returns:
+        bool: True if the stop ID matches bus stop patterns ("-0-" or "-1-").
+    """
     return ("-0-" in stop_id) or ("-1-" in stop_id)
 
 
 def is_cr(stop_id: str):
+    """Check if a stop ID belongs to a commuter rail route.
+
+    Args:
+        stop_id: The stop identifier.
+
+    Returns:
+        bool: True if the stop ID starts with "CR-".
+    """
     return stop_id.startswith("CR-")
 
 
 def is_ferry(stop_id: str):
+    """Check if a stop ID belongs to a ferry route.
+
+    Args:
+        stop_id: The stop identifier.
+
+    Returns:
+        bool: True if the stop ID starts with "Boat-".
+    """
     return stop_id.startswith("Boat-")
 
 
 def get_gobble_folder(stop_id: str):
+    """Return the S3 folder name for Gobble-ingested data based on the stop's mode.
+
+    Args:
+        stop_id: The stop identifier.
+
+    Returns:
+        str: One of "daily-bus-data", "daily-cr-data", or "daily-rapid-data".
+    """
     if is_bus(stop_id):
         return "daily-bus-data"
     elif is_cr(stop_id):
@@ -53,11 +104,26 @@
 
 
 def get_lamp_folder():
+    """Return the S3 folder name for LAMP-ingested data."""
     return "daily-data"
 
 
 def download_one_event_file(date: pd.Timestamp, stop_id: str, use_gobble=False, route_context=None):
-    """As advertised: single event file from s3"""
+    """Download and parse a single day's event CSV for a stop from S3.
+
+    Selects the appropriate S3 path based on the stop's mode (bus, CR, ferry, rapid
+    transit) and data recency (monthly archives vs. daily LAMP/Gobble feeds). Falls
+    back to Gobble data if the LAMP key is not found.
+
+    Args:
+        date: The date to fetch events for.
+        stop_id: The stop identifier.
+        use_gobble: Force using Gobble data instead of LAMP. (Default value = False)
+        route_context: Unused, reserved for future use. (Default value = None)
+
+    Returns:
+        list[dict]: Rows of event data sorted by event_time, or empty list if unavailable.
+    """
     year, month, day = date.year, date.month, date.day
 
     if is_cr(stop_id):
@@ -105,11 +171,32 @@
 
 @parallel.make_parallel
 def parallel_download_events(datestop: itertools.product):
+    """Download event data for a single (date, stop) pair. Parallelized via @make_parallel.
+
+    Args:
+        datestop: A (date, stop_id) tuple from an itertools.product iterator.
+
+    Returns:
+        list[dict]: Event rows for the given date and stop.
+    """
     (date, stop) = datestop
     return download_one_event_file(date, stop)
 
 
 def download_events(start_date: date, end_date: date, stops: list):
+    """Download event data for multiple stops over a date range, in parallel.
+
+    Fetches all combinations of dates and stops, then filters to the exact date range
+    and sorts by event time.
+
+    Args:
+        start_date: Start of the date range (inclusive).
+        end_date: End of the date range (inclusive).
+        stops: List of stop IDs to fetch data for.
+
+    Returns:
+        list[dict]: All event rows sorted by event_time.
+    """
     datestops = itertools.product(parallel.s3_date_range(start_date, end_date, stops), stops)
     result = parallel_download_events(datestops)
     result = filter(
@@ -119,6 +206,15 @@
 
 
 def get_all_s3_objects(s3, **base_kwargs):
+    """Paginate through all objects in an S3 bucket/prefix, yielding each object's metadata.
+
+    Args:
+        s3: A boto3 S3 client.
+        **base_kwargs: Arguments passed to s3.list_objects_v2 (e.g., Bucket, Prefix).
+
+    Yields:
+        dict: Individual object metadata dicts from the S3 listing.
+    """
     continuation_token = None
     while True:
         list_kwargs = dict(MaxKeys=1000, **base_kwargs)

From a1c76bef32c832678c5ddf9cc49f5f8a7b9701c2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Andrew=20Kour=C3=A9?=
Date: Sun, 15 Feb 2026 03:07:07 +0000
Subject: [PATCH 2/2] added docstrings to aggregation

---
 server/chalicelib/aggregation.py | 129 +++++++++++++++++++++++++++++--
 1 file changed, 123 insertions(+), 6 deletions(-)

diff --git a/server/chalicelib/aggregation.py b/server/chalicelib/aggregation.py
index 274b7dbb..5c3d2d2a 100644
--- a/server/chalicelib/aggregation.py
+++ b/server/chalicelib/aggregation.py
@@ -1,3 +1,5 @@
+"""Time-series aggregation of travel time, headway, and dwell data over date ranges."""
+
 import datetime
 from chalicelib import data_funcs
 import pandas as pd
@@ -14,16 +16,15 @@ def add_holidays(
     holiday_col_name: str = "holiday",
     service_date_col_name: str = "service_date",
 ) -> pd.DataFrame:
-    """
-    This function adds a boolean marker for whether or not the specified date is a holiday to a Pandas DataFrame in place.
+    """Add a boolean column indicating whether each row's date is a US federal holiday.
 
     Args:
-        df (DataFrame): The first parameter, an integer value.
-        holiday (str): The field name for the holiday column.
-        service_date_col_name (str): The field name for the service_date column, this is also the date value to check if it is a holiday.
+        df: DataFrame containing a date column to check against holidays.
+        holiday_col_name: Name of the boolean column to add. Defaults to "holiday".
+        service_date_col_name: Name of the existing date column to check. Defaults to "service_date".
 
     Returns:
-        DataFrame: Returns the modified dataframe.
+        The input DataFrame with an added boolean holiday column.
     """
     # Handle empty DataFrame or missing dates
     if df.empty or df[service_date_col_name].isna().all():
@@ -39,6 +40,17 @@
 
 
 def train_peak_status(df: pd.DataFrame):
+    """Classify each trip as AM peak, PM peak, or off-peak based on departure time.
+
+    Peak hours are defined as non-holiday weekdays 6:30-9:00 AM (am_peak) and
+    3:30-6:30 PM (pm_peak). All other times are labeled off_peak.
+
+    Args:
+        df: DataFrame with "dep_time" and "weekday" columns.
+
+    Returns:
+        The input DataFrame with added "holiday", "is_peak_day", and "peak" columns.
+    """
     df = add_holidays(df)
 
     # Peak Hours: non-holiday weekdays 6:30-9am; 3:30-6:30pm
@@ -54,6 +66,18 @@
 
 
 def faster_describe(grouped: DataFrameGroupBy):
+    """Compute descriptive statistics for a grouped DataFrame, optimized for speed.
+
+    Equivalent to pandas DataFrame.describe() but up to 25x faster. Computes count,
+    mean, min, median (as "50%"), max, sum, std (population), 25th, and 75th percentiles.
+    Filters out groups with 4 or fewer observations to reduce outlier noise.
+
+    Args:
+        grouped: A grouped single-column DataFrame to compute statistics on.
+
+    Returns:
+        DataFrame with descriptive statistics per group, excluding groups with count <= 4.
+    """
     # This does the same thing as pandas.DataFrame.describe(), but is up to 25x faster!
     # also, we can specify population std instead of sample.
     stats = grouped.aggregate(["count", "mean", "min", "median", "max", "sum"])
@@ -81,6 +105,20 @@
 
 
 def aggregate_traveltime_data(start_date: datetime.date, end_date: datetime.date, from_stops, to_stops):
+    """Fetch and prepare travel time data for aggregation.
+
+    Retrieves raw travel time records, converts them to a DataFrame, and enriches
+    with service date, weekday, and peak status columns.
+
+    Args:
+        start_date: Start of the date range (inclusive).
+        end_date: End of the date range (inclusive).
+        from_stops: Origin stop ID(s) for the travel time query.
+        to_stops: Destination stop ID(s) for the travel time query.
+
+    Returns:
+        Enriched DataFrame with travel time data, or None if no data is available.
+    """
     all_data = data_funcs.travel_times(start_date, from_stops, to_stops, end_date)
     if not all_data:
         return None
@@ -100,6 +138,18 @@
 
 
 def calc_travel_times_by_time(df: pd.DataFrame):
+    """Aggregate travel times into 30-minute time-of-day buckets, split by peak day status.
+
+    Converts departure times to epoch-relative datetimes for resampling, then computes
+    descriptive statistics for each 30-minute interval.
+
+    Args:
+        df: DataFrame from aggregate_traveltime_data with "dep_time", "is_peak_day",
+            and "travel_time_sec" columns.
+
+    Returns:
+        DataFrame with travel time statistics per 30-minute bucket and peak day status.
+    """
     # convert time of day to a consistent datetime relative to epoch
     timedeltas = pd.to_timedelta(df["dep_time"].astype(str))
     timedeltas.loc[timedeltas < SERVICE_HR_OFFSET] += datetime.timedelta(days=1)
@@ -112,6 +162,19 @@
 
 
 def calc_travel_times_by_date(df: pd.DataFrame):
+    """Aggregate travel times by service date, with overall and per-peak-period breakdowns.
+
+    Computes descriptive statistics grouped by service date for all trips combined
+    (peak="all") and separately by peak status (am_peak, pm_peak, off_peak).
+    Adds holiday and weekend flags to the results.
+
+    Args:
+        df: DataFrame from aggregate_traveltime_data with "service_date", "peak",
+            and "travel_time_sec" columns.
+
+    Returns:
+        DataFrame with travel time statistics per service date and peak period.
+    """
     # get summary stats
     summary_stats = faster_describe(df.groupby("service_date")["travel_time_sec"])
     summary_stats["peak"] = "all"
@@ -132,6 +195,18 @@
 
 
 def travel_times_all(start_date: datetime.date, end_date: datetime.date, from_stops, to_stops):
+    """Return travel time aggregations both by date and by time of day.
+
+    Args:
+        start_date: Start of the date range (inclusive).
+        end_date: End of the date range (inclusive).
+        from_stops: Origin stop ID(s).
+        to_stops: Destination stop ID(s).
+
+    Returns:
+        Dict with "by_date" and "by_time" keys, each containing a list of records.
+        Returns empty lists for both if no data is available.
+    """
     df = aggregate_traveltime_data(start_date, end_date, from_stops, to_stops)
     if df is None:
         return {"by_date": [], "by_time": []}
@@ -145,6 +220,19 @@
 
 
 def travel_times_over_time(start_date: datetime.date, end_date: datetime.date, from_stops, to_stops):
+    """Return travel time statistics by date for all peak periods combined.
+
+    Legacy endpoint that returns only the by-date aggregation filtered to peak="all".
+
+    Args:
+        start_date: Start of the date range (inclusive).
+        end_date: End of the date range (inclusive).
+        from_stops: Origin stop ID(s).
+        to_stops: Destination stop ID(s).
+
+    Returns:
+        List of record dicts with daily travel time statistics, or empty list if no data.
+    """
     df = aggregate_traveltime_data(start_date, end_date, from_stops, to_stops)
     if df is None:
         return []
@@ -156,6 +244,21 @@
 
 ####################
 # HEADWAYS
 ####################
 def headways_over_time(start_date: datetime.date, end_date: datetime.date, stops):
+    """Compute daily headway statistics with bunching and on-time metrics.
+
+    Fetches headway data, computes descriptive statistics by service date, and
+    calculates bunched (headway ratio <= 0.5) and on-time (ratio between 0.75
+    and 1.25) trip counts relative to benchmark headways. Results are filtered
+    to the "all" peak category and include holiday/weekend flags.
+
+    Args:
+        start_date: Start of the date range (inclusive).
+        end_date: End of the date range (inclusive).
+        stops: Stop ID(s) to query headways for.
+
+    Returns:
+        List of record dicts with daily headway statistics, or empty list if no data.
+    """
     all_data = data_funcs.headways(start_date, stops, end_date)
     if not all_data:
         return []
@@ -209,6 +312,20 @@
 
 
 def dwells_over_time(start_date: str | datetime.date, end_date: str | datetime.date, stops):
+    """Compute daily dwell time statistics over a date range.
+
+    Fetches dwell time data, computes descriptive statistics by service date for
+    all trips and by peak period, then returns results filtered to the "all"
+    peak category with holiday and weekend flags.
+
+    Args:
+        start_date: Start of the date range (inclusive).
+        end_date: End of the date range (inclusive).
+        stops: Stop ID(s) to query dwell times for.
+
+    Returns:
+        List of record dicts with daily dwell time statistics, or empty list if no data.
+    """
     all_data = data_funcs.dwells(start_date, stops, end_date)
     if not all_data:
         return []