Skip to content

Commit f43fce6

Browse files
author
Irina
committed
project part 6
1 parent 06da330 commit f43fce6

3 files changed

Lines changed: 234 additions & 0 deletions

File tree

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# Weekly ETL workflow: refreshes EIA petroleum data in BigQuery.
name: Update Energy Data

on:
  schedule:
    # Every Monday at 12:00 UTC.
    - cron: "0 12 * * 1"
  # Allow manual runs from the Actions tab.
  workflow_dispatch:

jobs:
  update_data:
    runs-on: ubuntu-latest

    steps:
      - name: Check out repository
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"
          # Cache pip downloads between runs to speed up installs.
          cache: pip

      - name: Install dependencies
        run: pip install -r requirements.txt

      - name: Run weekly supply ETL
        env:
          # Service-account JSON key and EIA API key come from repository secrets.
          GCP_SERVICE_ACCOUNT: ${{ secrets.GCP_SERVICE_ACCOUNT }}
          EIA_API_KEY: ${{ secrets.EIA_API_KEY }}
        run: python scripts/load_weekly_supply_data.py

      - name: Run WTI ETL
        env:
          GCP_SERVICE_ACCOUNT: ${{ secrets.GCP_SERVICE_ACCOUNT }}
          EIA_API_KEY: ${{ secrets.EIA_API_KEY }}
        run: python scripts/load_wti_data.py

scripts/load_weekly_supply_data.py

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
import json
2+
import os
3+
4+
import pandas as pd
5+
import requests
6+
from google.cloud import bigquery
7+
from google.oauth2 import service_account
8+
9+
# GCP project / BigQuery dataset that receives the ETL output.
PROJECT_ID = "sipa-adv-c-giggling-wombat"
DATASET_ID = "petroleum_supply"

# Fully-qualified destination table ids (project.dataset.table).
WEEKLY_SUPPLY_TABLE = f"{PROJECT_ID}.{DATASET_ID}.weekly_supply"
WEEKLY_SUPPLY_BY_PRODUCT_TABLE = (
    f"{PROJECT_ID}.{DATASET_ID}.weekly_supply_by_product"
)

# Seconds before an EIA API request is abandoned.
REQUEST_TIMEOUT = 30
18+
19+
20+
def get_bq_client():
    """Build a BigQuery client from the GCP_SERVICE_ACCOUNT env var.

    The variable must hold a service-account key as a JSON string; the
    client is bound to that key's own project.
    """
    creds = service_account.Credentials.from_service_account_info(
        json.loads(os.environ["GCP_SERVICE_ACCOUNT"])
    )
    return bigquery.Client(credentials=creds, project=creds.project_id)
29+
30+
31+
def fetch_supply_data() -> pd.DataFrame:
    """Download weekly petroleum supply data from the EIA v2 API.

    Returns:
        DataFrame with a parsed ``week`` datetime column and a numeric
        ``value`` column; rows missing either are dropped and the frame
        is sorted ascending by week.

    Raises:
        KeyError: if EIA_API_KEY is not set in the environment.
        requests.HTTPError: on a non-2xx API response.
    """
    # Let requests build and URL-encode the query string instead of
    # hand-concatenating the API key into an f-string URL.
    params = {
        "api_key": os.environ["EIA_API_KEY"],
        "frequency": "weekly",
        "data[0]": "value",
        "sort[0][column]": "period",
        "sort[0][direction]": "desc",
        "offset": 0,
        # API page cap: only the most recent 5000 rows are fetched per run.
        "length": 5000,
    }
    response = requests.get(
        "https://api.eia.gov/v2/petroleum/cons/wpsup/data/",
        params=params,
        timeout=REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    records = response.json()["response"]["data"]

    df = pd.DataFrame(records)
    df["week"] = pd.to_datetime(df["period"])
    df["value"] = pd.to_numeric(df["value"], errors="coerce")

    # Force remaining object columns to plain strings so the downstream
    # BigQuery load sees uniform types. NOTE(review): this turns NaN in
    # those columns into the literal string "nan" — confirm downstream
    # consumers tolerate that.
    for col in df.columns:
        if df[col].dtype == "object":
            df[col] = df[col].astype(str)

    df = df.dropna(subset=["week", "value"]).copy()
    return df.sort_values("week").reset_index(drop=True)
58+
59+
60+
def build_weekly_supply(df: pd.DataFrame) -> pd.DataFrame:
    """Collapse the raw frame to one row per week.

    Sums ``value`` across all rows sharing a week and returns columns
    ``week`` / ``total_supply``, sorted by week with a fresh index.
    """
    totals = df.groupby("week")["value"].sum()
    out = totals.reset_index(name="total_supply")
    return out.sort_values("week", ignore_index=True)
69+
70+
71+
def find_product_column(df: pd.DataFrame) -> str:
    """Return the name of the column that identifies the product.

    The EIA payload has carried this field under several names; the
    first candidate present in *df* wins.

    Raises:
        KeyError: if none of the known candidate names is a column.
    """
    hits = (
        candidate
        for candidate in ("product", "product-name", "product_name", "process", "name")
        if candidate in df.columns
    )
    found = next(hits, None)
    if found is None:
        raise KeyError(
            "Could not find a product column in the EIA supply data."
        )
    return found
85+
86+
87+
def build_weekly_supply_by_product(df: pd.DataFrame) -> pd.DataFrame:
    """Aggregate supplied volume per (week, product) pair.

    Returns columns ``week`` / ``product`` / ``product_supplied``,
    sorted by week then product with a fresh index.
    """
    product_col = find_product_column(df)

    grouped = df.groupby(["week", product_col], as_index=False)["value"].sum()
    renamed = grouped.rename(
        columns={product_col: "product", "value": "product_supplied"}
    )
    return renamed.sort_values(["week", "product"], ignore_index=True)
103+
104+
105+
def load_table(df: pd.DataFrame, table_id: str):
    """Replace the contents of *table_id* with *df* and block until done.

    Uses WRITE_TRUNCATE, so each run fully overwrites the table.
    """
    job = get_bq_client().load_table_from_dataframe(
        df,
        table_id,
        job_config=bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE"),
    )
    # Wait for completion so load errors surface here, not later.
    job.result()
110+
111+
112+
def main():
    """Fetch EIA weekly supply data and refresh both BigQuery tables."""
    raw_df = fetch_supply_data()

    weekly_supply = build_weekly_supply(raw_df)
    load_table(weekly_supply, WEEKLY_SUPPLY_TABLE)
    print(f"Loaded {len(weekly_supply)} rows into {WEEKLY_SUPPLY_TABLE}")

    by_product = build_weekly_supply_by_product(raw_df)
    load_table(by_product, WEEKLY_SUPPLY_BY_PRODUCT_TABLE)
    print(
        f"Loaded {len(by_product)} rows into {WEEKLY_SUPPLY_BY_PRODUCT_TABLE}"
    )


if __name__ == "__main__":
    main()

scripts/load_wti_data.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import json
2+
import os
3+
4+
import pandas as pd
5+
import requests
6+
from google.cloud import bigquery
7+
from google.oauth2 import service_account
8+
9+
# GCP project / BigQuery dataset that receives the ETL output.
PROJECT_ID = "sipa-adv-c-giggling-wombat"
DATASET_ID = "petroleum_supply"
# Fully-qualified destination table (project.dataset.table).
TABLE_ID = f"{PROJECT_ID}.{DATASET_ID}.weekly_wti"

# EIA series id used as the facet filter. NOTE(review): presumably the
# WTI spot-price series — confirm against the EIA series catalog.
WTI_SERIES = "RWTC"
# Seconds before an EIA API request is abandoned.
REQUEST_TIMEOUT = 30
15+
16+
17+
def get_bq_client():
    """Create a BigQuery client from the GCP_SERVICE_ACCOUNT env var.

    The variable must contain a service-account key as a JSON string;
    the client targets that key's project.
    """
    key_info = json.loads(os.environ["GCP_SERVICE_ACCOUNT"])
    creds = service_account.Credentials.from_service_account_info(key_info)
    return bigquery.Client(credentials=creds, project=creds.project_id)
26+
27+
28+
def fetch_wti_data() -> pd.DataFrame:
    """Download the weekly WTI price series from the EIA v2 API.

    Returns:
        DataFrame with columns ``week`` / ``series`` / ``wti_price``,
        sorted ascending by week; rows missing any value are dropped.

    Raises:
        KeyError: if EIA_API_KEY is not set in the environment.
        requests.HTTPError: on a non-2xx API response.
    """
    # Let requests build and URL-encode the query string instead of
    # hand-concatenating the API key into an f-string URL.
    params = {
        "api_key": os.environ["EIA_API_KEY"],
        "frequency": "weekly",
        "data[0]": "value",
        "facets[series][]": WTI_SERIES,
        "sort[0][column]": "period",
        "sort[0][direction]": "desc",
        "offset": 0,
        # API page cap: only the most recent 5000 rows are fetched per run.
        "length": 5000,
    }
    response = requests.get(
        "https://api.eia.gov/v2/petroleum/pri/spt/data/",
        params=params,
        timeout=REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    records = response.json()["response"]["data"]

    df = pd.DataFrame(records)
    df["week"] = pd.to_datetime(df["period"])
    df["wti_price"] = pd.to_numeric(df["value"], errors="coerce")
    # NOTE(review): assumes the payload includes a "series" column; a
    # missing column raises KeyError here, same as the original.
    df["series"] = df["series"].astype(str)

    df = df[["week", "series", "wti_price"]].dropna()
    return df.sort_values("week").reset_index(drop=True)
53+
54+
55+
def load_to_bigquery(df: pd.DataFrame):
    """Replace the contents of TABLE_ID with *df* and block until done.

    Uses WRITE_TRUNCATE, so each run fully overwrites the table.
    """
    config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")
    load_job = get_bq_client().load_table_from_dataframe(
        df, TABLE_ID, job_config=config
    )
    # Wait for completion so load errors surface here, not later.
    load_job.result()
60+
61+
62+
def main():
    """Run the WTI ETL end-to-end: fetch prices, then load them."""
    prices = fetch_wti_data()
    load_to_bigquery(prices)
    row_count = len(prices)
    print(f"Loaded {row_count} rows into {TABLE_ID}")


if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)