Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions ingestor/chalicelib/ridership/arcgis.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ def ride_update_cache():


def download_latest_ridership_files() -> Tuple[str | None, str | None, str | None, str | None, str | None]:
"""Download the latest ridership files for all transit modes.

Fetches subway and bus data from SharePoint, and commuter rail, ferry,
and The RIDE data from ArcGIS.

Returns:
Tuple of file paths (subway, bus, commuter rail, ferry, The RIDE),
where each element may be None if the download failed.
"""
sharepoint = SharepointConnection()

cr_tmp_path = NamedTemporaryFile().name
Expand Down
6 changes: 6 additions & 0 deletions ingestor/chalicelib/ridership/dynamo.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@


def ingest_ridership_to_dynamo(entries_by_line_id: Dict[str, List[Dict]]):
"""Batch write ridership entries to the DynamoDB Ridership table.

Args:
entries_by_line_id: Mapping of line IDs to lists of ridership entry dicts,
each containing 'date' (YYYY-MM-DD) and 'count' keys.
"""
dynamodb = boto3.resource("dynamodb")
Ridership = dynamodb.Table(DYNAMO_TABLE_NAME)
with Ridership.batch_writer() as batch:
Expand Down
5 changes: 5 additions & 0 deletions ingestor/chalicelib/ridership/gtfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@


def get_routes_by_line_id() -> Dict[str, Route]:
"""Fetch GTFS route data from S3 and group routes by their line ID.

    Returns:
        Mapping of line IDs to lists of Route objects from the latest GTFS feed.
        (NOTE(review): the ``-> Dict[str, Route]`` annotation says each value is a
        single ``Route`` — confirm whether it should be ``Dict[str, List[Route]]``.)
"""
s3 = boto3.resource("s3")
archive = MbtaGtfsArchive(
local_archive_path=TemporaryDirectory().name,
Expand Down
20 changes: 20 additions & 0 deletions ingestor/chalicelib/ridership/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,21 @@ def get_ridership_by_line_id(
ridership_by_route_id: Dict[str, Dict],
routes_by_line_id: Dict[str, Route],
):
"""Aggregate ridership data from route-level to line-level.

Sums ridership counts across all routes belonging to each line, grouping
by date. Handles Green Line branch aggregation and adds The RIDE as a
separate line entry.

Args:
ridership_by_route_id: Mapping of route IDs to lists of ridership
entry dicts with 'date' and 'count' keys.
        routes_by_line_id: Mapping of line IDs to lists of Route objects.
            (NOTE(review): the ``Dict[str, Route]`` annotation disagrees —
            confirm whether values are ``Route`` or ``List[Route]``.)

Returns:
Mapping of line IDs to sorted lists of ridership entries with
summed counts per date.
"""
by_line_id = {}
# Track route_ids that are accounted for by subway/CR/ferry/RIDE
# so that "line-bus" captures everything else.
Expand Down Expand Up @@ -74,6 +89,11 @@ def get_ridership_by_line_id(


def ingest_ridership_data():
"""Run the full ridership ingestion pipeline.

Downloads the latest ridership files for all transit modes, processes
and aggregates them by line ID, and writes the results to DynamoDB.
"""
routes = get_routes_by_line_id()
cr_update_cache()
ferry_update_cache()
Expand Down
99 changes: 99 additions & 0 deletions ingestor/chalicelib/ridership/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,22 @@ def pre_process_csv(
count_key: str,
route_name: str | None = None,
):
"""Pre-process a CSV file by aggregating daily ridership into weekly totals.

Reads the CSV, groups records by ISO year, week, and route, sums the count
values, and writes the result to a temporary CSV file.

Args:
path_to_csv_file: Path to the input CSV file.
date_key: Column name containing date values.
route_key: Column name containing route identifiers. If None and
route_name is provided, a 'Route' column is added with route_name.
count_key: Column name containing ridership counts.
route_name: Constant route name to assign when route_key is None.

Returns:
Path to a temporary CSV file containing the weekly aggregated data.
"""
if route_key is None and route_name is not None:
route_key = "Route"
df = pd.read_csv(path_to_csv_file, usecols=[date_key, count_key])
Expand Down Expand Up @@ -93,6 +109,21 @@ def format_ridership_csv(
count_key: str,
route_ids_map: Union[None, Dict[str, str]] = None,
):
"""Format a ridership CSV into a dict of weekly average peak-day counts by route.

Reads ridership data, filters to weekday non-holiday (peak) days, computes
weekly averages per route, and returns the results grouped by route ID.

Args:
path_to_csv_file: Path to the input CSV file.
date_key: Column name containing date values.
route_key: Column name containing route identifiers.
count_key: Column name containing ridership counts.
route_ids_map: Optional mapping from raw route names to canonical route IDs.

Returns:
Dict mapping route IDs to lists of dicts with 'date' and 'count' keys.
"""
# read data, convert to datetime
df = pd.read_csv(path_to_csv_file)
df[date_key] = pd.to_datetime(df[date_key])
Expand Down Expand Up @@ -141,6 +172,17 @@ def format_ridership_csv(


def format_subway_data(path_to_csv_file: str):
"""Format subway gated station validation data into weekly peak-day averages.

Reads the subway CSV, filters to peak weekdays, computes weekly average
validations per route/line, and maps route names to canonical IDs.

Args:
path_to_csv_file: Path to the subway ridership CSV file.

Returns:
Dict mapping route IDs to lists of dicts with 'date' and 'count' keys.
"""
# read data, convert to datetime
df = pd.read_csv(path_to_csv_file)
df["servicedate"] = pd.to_datetime(df["servicedate"])
Expand Down Expand Up @@ -192,6 +234,17 @@ def format_subway_data(path_to_csv_file: str):


def format_bus_data(path_to_excel_file: str):
"""Format bus ridership data from an Excel file into weekly counts by route.

Reads the 'Weekly by Route' sheet, normalizes column names, and groups
ridership data by route with mapped canonical route IDs.

Args:
path_to_excel_file: Path to the bus ridership Excel file.

Returns:
Dict mapping route IDs to lists of dicts with 'date' and 'count' keys.
"""
# read data - new format doesn't need skiprows
df = pd.read_excel(
path_to_excel_file,
Expand Down Expand Up @@ -231,6 +284,14 @@ def format_bus_data(path_to_excel_file: str):


def format_cr_data(path_to_ridershp_file: str):
"""Format commuter rail ridership data into weekly peak-day averages by line.

    Args:
        path_to_ridershp_file: Path to the commuter rail ridership CSV file.
            (NOTE(review): parameter name contains a typo — "ridershp" should be
            "ridership" — but renaming it would change the keyword-argument
            interface for callers; fix code and docstring together.)

Returns:
Dict mapping CR route IDs to lists of dicts with 'date' and 'count' keys.
"""
ridership_by_route = format_ridership_csv(
path_to_csv_file=path_to_ridershp_file,
date_key="servicedate",
Expand All @@ -242,6 +303,17 @@ def format_cr_data(path_to_ridershp_file: str):


def format_ferry_data(path_to_ridership_file: str):
"""Format ferry ridership data into weekly peak-day averages by route.

Pre-processes daily departure data into weekly aggregates, then formats
into peak-day averages with canonical ferry route IDs.

Args:
path_to_ridership_file: Path to the ferry ridership CSV file.

Returns:
Dict mapping ferry route IDs to lists of dicts with 'date' and 'count' keys.
"""
preprocess = pre_process_csv(
path_to_csv_file=path_to_ridership_file,
date_key="actual_departure",
Expand All @@ -259,6 +331,17 @@ def format_ferry_data(path_to_ridership_file: str):


def format_the_ride_data(path_to_ridership_file: str):
"""Format The RIDE paratransit ridership data into weekly peak-day averages.

Pre-processes daily completed trip data into weekly aggregates, then
formats into peak-day averages under a single 'RIDE' route.

Args:
path_to_ridership_file: Path to The RIDE ridership CSV file.

Returns:
Dict mapping 'RIDE' to a list of dicts with 'date' and 'count' keys.
"""
preprocess = pre_process_csv(
path_to_csv_file=path_to_ridership_file,
date_key="Date",
Expand All @@ -282,6 +365,22 @@ def get_ridership_by_route_id(
path_to_ferry_file: str | None,
path_to_ride_file: str | None,
):
"""Process all ridership files and merge into a single dict keyed by route ID.

Formats each transit mode's data file (if provided) and combines the results
into a unified mapping of route IDs to ridership entries.

Args:
path_to_subway_file: Path to the subway ridership CSV, or None to skip.
path_to_bus_file: Path to the bus ridership Excel file, or None to skip.
path_to_cr_file: Path to the commuter rail ridership CSV, or None to skip.
path_to_ferry_file: Path to the ferry ridership CSV, or None to skip.
path_to_ride_file: Path to The RIDE ridership CSV, or None to skip.

Returns:
Dict mapping route IDs to lists of dicts with 'date' and 'count' keys,
spanning all provided transit modes.
"""
subway = format_subway_data(path_to_subway_file) if path_to_subway_file else {}
bus = format_bus_data(path_to_bus_file) if path_to_bus_file else {}
cr = format_cr_data(path_to_cr_file) if path_to_cr_file else {}
Expand Down
85 changes: 55 additions & 30 deletions ingestor/chalicelib/ridership/sharepoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,39 @@

class SharepointConnection:
def __init__(self, user_agent: str = DEFAULT_USER_AGENT, base_url=BASE_URL, prefix="mbta") -> None:
"""Initialize a SharePoint connection with session and configuration.

Args:
user_agent: User-Agent string for HTTP requests.
base_url: Base SharePoint URL for folder browsing.
prefix: SharePoint tenant prefix used in download URLs.
"""
self.session = self.setup_session(user_agent)
self.base_url = base_url
self.all_files = []
self.prefix = prefix

def setup_session(self, user_agent: str) -> requests.Session:
"""Create and configure an HTTP session with the given User-Agent.

Args:
user_agent: User-Agent header value for the session.

Returns:
Configured requests.Session instance.
"""
session = requests.Session()
session.headers.update({"User-Agent": user_agent})
return session

def get_sharepoint_folder_contents_anonymous(self, share_url):
"""
Get contents of a SharePoint folder using anonymous access via sharing link.
"""Get contents of a SharePoint folder using anonymous access via sharing link.

Args:
share_url: The SharePoint 'anyone with the link' URL
share_url: The SharePoint 'anyone with the link' URL.

Returns:
List of dictionaries containing file information, or None on error
List of dictionaries containing file information, or None on error.
"""
# Follow the sharing link
response = self.session.get(share_url, allow_redirects=True)
Expand All @@ -51,6 +65,17 @@ def get_sharepoint_folder_contents_anonymous(self, share_url):
return files

def parse_g_data(self, html: str):
"""Parse the g_listData JavaScript variable from a SharePoint HTML page.

Extracts the embedded JSON file listing from the page source by locating
and parsing the g_listData variable assignment.

Args:
html: Raw HTML content from a SharePoint folder page.

Returns:
List of file info dictionaries, or None if parsing fails.
"""
# Extract g_listData which contains the file list
# Find the start of g_listData
start_marker = "g_listData = "
Expand Down Expand Up @@ -118,15 +143,13 @@ def parse_g_data(self, html: str):
return None

def get_folder_by_path(self, folder_path):
"""
Get contents of a specific folder by its server-relative path.
"""Get contents of a specific folder by its server-relative path.

Args:
session: requests.Session with cookies
folder_path: Server-relative path like '/sites/PublicData/Shared Documents/...'
folder_path: Server-relative path like '/sites/PublicData/Shared Documents/...'.

Returns:
List of file info dictionaries
List of file info dictionaries, or None on error.
"""
# Construct the URL to view that specific folder

Expand All @@ -142,14 +165,13 @@ def get_folder_by_path(self, folder_path):
return files

def list_all_files_recursive(self, folder_path):
"""
Recursively list all files in a folder and its subfolders.
"""Recursively list all files in a folder and its subfolders.

Args:
folder_path: Server-relative path to start from
folder_path: Server-relative path to start from.

Returns:
List of all files (not folders) found
List of all file info dictionaries (not folders) found.
"""
files = self.get_folder_by_path(folder_path)
if not files:
Expand Down Expand Up @@ -178,15 +200,14 @@ def list_all_files_recursive(self, folder_path):
return all_files

def download_sharepoint_file_anonymous(self, file_ref, output_path):
"""
Download a file from SharePoint using an existing session.
"""Download a file from SharePoint using an existing session.

Args:
file_ref: The FileRef path from the file list
output_path: Local path to save the file
file_ref: The FileRef path from the file list.
output_path: Local path to save the file.

Returns:
True if successful, False otherwise
True if successful, False otherwise.
"""
# Construct download URL
download_url = f"https://{self.prefix}.sharepoint.com{file_ref}?download=1"
Expand All @@ -203,17 +224,22 @@ def download_sharepoint_file_anonymous(self, file_ref, output_path):
return False

def fetch_sharepoint_file(self, file_regex=None, share_url=None, target_date=None, bus_data=True):
"""
Downloads files from Sharepoint matching a regex pattern.
"""Download files from SharePoint matching a regex pattern.

Args:
file_regex (str or Pattern): Regular expression pattern to match against filenames. If None, uses default patterns based on bus_data.
share_url (str): SharePoint sharing URL to download from. If None, uses default URLs based on bus_data.
target_date (date): Date object specifying which file to download. Used for default subway data pattern matching. Optional for Bus data, required for Subway Data when file_regex is None.
bus_data (bool): Whether to download Bus Data (True) or Subway Data (False). Only used when file_regex is None.
file_regex: Regular expression pattern (str or compiled Pattern) to match
against filenames. If None, uses default patterns based on bus_data.
share_url: SharePoint sharing URL to download from. If None, uses default
URLs based on bus_data.
target_date: Date object specifying which file to download. Used for default
subway data pattern matching. Optional for bus data, required for subway
data when file_regex is None.
bus_data: Whether to download bus data (True) or subway data (False). Only
used when file_regex is None.

Returns:
str: Path to named Temporary File with data.
Path to a named temporary file containing the downloaded data, or None
if no matching file is found.
"""
# Determine share URL
if share_url is None:
Expand Down Expand Up @@ -289,16 +315,15 @@ def fetch_sharepoint_file(self, file_regex=None, share_url=None, target_date=Non


def get_file_matching_date_pattern(files: List[dict], pattern: Pattern, target_date: Optional[date] = None):
"""
Find a file matching a date pattern and extract the date from its name.
"""Find a file matching a date pattern and extract the date from its name.

Args:
files: List of file dictionaries with 'name' key
pattern: Compiled regex pattern with three capture groups for year, month, day
files: List of file dictionaries with 'name' key.
pattern: Compiled regex pattern with three capture groups for year, month, day.
target_date: Specific date to match. If None, returns the newest matching file.

Returns:
Tuple of (file_dict, date) if match found, None otherwise
Tuple of (file_dict, date) if a match is found, None otherwise.
"""
newest_file = None
newest_date = None
Expand Down
Loading