Skip to content

Commit 9cffb43

Browse files
authored
Merge pull request #3387 from PolicyEngine/codex/budget-window-batch
Add budget-window batch economy endpoint
2 parents 57e0537 + 55e4f2f commit 9cffb43

23 files changed

Lines changed: 2842 additions & 132 deletions

.env.example

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,9 @@ OPENAI_API_KEY=policyengine_openai_api_key
1818

1919
# Token for Hugging Face models
2020
HUGGING_FACE_TOKEN=policyengine_huggingface_token
21+
22+
# Redis is required for budget-window economy requests and other API cache paths.
23+
# Local development and App Engine use an in-container/local Redis by default.
24+
CACHE_REDIS_HOST=127.0.0.1
25+
CACHE_REDIS_PORT=6379
26+
CACHE_REDIS_DB=0

.github/workflows/push.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ jobs:
226226
- name: Install staging test dependencies
227227
run: pip install pytest httpx
228228
- name: Run staging smoke test
229-
run: python -m pytest tests/integration/test_live_calculate.py tests/integration/test_live_economy.py -v
229+
run: python -m pytest tests/integration/test_live_calculate.py tests/integration/test_live_economy.py tests/integration/test_live_budget_window_cache.py -v
230230
env:
231231
API_BASE_URL: ${{ needs.deploy-staging.outputs.url }}
232232
STAGING_API_TEST_PROBE_ID: ${{ needs.deploy-staging.outputs.version }}

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ test-env-vars:
1111
pytest tests/env_variables
1212

1313
test:
14-
MAX_HOUSEHOLDS=1000 coverage run -a --branch -m pytest tests/to_refactor tests/unit --disable-pytest-warnings
14+
MAX_HOUSEHOLDS=1000 coverage run -a --branch -m pytest tests/to_refactor tests/unit tests/integration/test_budget_window_in_flight_dedupe.py --disable-pytest-warnings
1515
coverage xml -i
1616

1717
debug-test:

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,8 @@ NOTE: Any output that needs to be calculated will not work. Therefore, only hous
120120

121121
### 6. Testing calculations
122122

123+
Redis is required for API cache paths, including budget-window economy requests. The budget-window endpoint uses Redis for completed-result caching and in-flight batch deduplication; if Redis is unavailable, those requests fail instead of falling back to the database or an in-process cache.
124+
123125
To test anything that utilizes Redis or the API's service workers (e.g. anything that requires society-wide calculations with the policy calculator), you'll also need to complete the following steps:
124126

125127
1. Start Redis
@@ -136,6 +138,8 @@ brew install redis
136138
redis-server
137139
```
138140

141+
By default the API connects to Redis at `127.0.0.1:6379`, database `0`. Override this with `CACHE_REDIS_HOST`, `CACHE_REDIS_PORT`, and `CACHE_REDIS_DB` if your local Redis uses different connection settings.
142+
139143
2. Start the API
140144

141145
Run the below
@@ -144,6 +148,8 @@ Run the below
144148
FLASK_DEBUG=1 python -m flask --app policyengine_api.api run
145149
```
146150

151+
App Engine staging and production deployments install and start Redis in the API container before Gunicorn starts.
152+
147153
NOTE: Calculations are not possible in the uk app without access to a specific dataset. Expect an error: "ValueError: Invalid response code 404 for url https://api.github.com/repos/policyengine/non-public-microdata/releases/tags/uk-2024-march-efo."
148154

149155
## Testing, Formatting, Changelogging
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Added a budget-window economy endpoint that batches yearly impact calculations with bounded server-side concurrency and returns aggregated progress plus totals.

gcp/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
The deployment actions build Docker images and deploy them to Google App Engine. The docker images themselves are based off a starter image (to save each API docker image having to spend 5 minutes installing the same dependencies). The starter image is the `Dockerfile` in this directory.
44

5+
The App Engine API image installs `redis-server` and starts it through `gcp/policyengine_api/start.sh`. Redis is required at runtime for budget-window economy request caching and in-flight batch deduplication. The API reads `CACHE_REDIS_HOST`, `CACHE_REDIS_PORT`, and `CACHE_REDIS_DB`, defaulting to `127.0.0.1`, `6379`, and `0`.
6+
57
To update the starter image:
68
* `python setup.py sdist` to build the python package
79
* `twine upload dist/*` to upload the package to pypi as `policyengine-api`

gcp/policyengine_api/start.sh

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,25 @@
11
#!/bin/sh
22
# Environment variables
33
PORT="${PORT:-8080}"
4-
REDIS_PORT="${REDIS_PORT:-6379}"
4+
CACHE_REDIS_HOST="${CACHE_REDIS_HOST:-127.0.0.1}"
5+
CACHE_REDIS_PORT="${CACHE_REDIS_PORT:-6379}"
6+
CACHE_REDIS_DB="${CACHE_REDIS_DB:-0}"
7+
export CACHE_REDIS_HOST CACHE_REDIS_PORT CACHE_REDIS_DB
58

6-
# Start the API
7-
gunicorn -b :"$PORT" policyengine_api.api --timeout 300 --workers 5 --preload &
8-
9-
# Start Redis with configuration for multiple clients
10-
redis-server --protected-mode no \
9+
# Start Redis with configuration for multiple clients.
10+
redis-server --bind "$CACHE_REDIS_HOST" \
11+
--port "$CACHE_REDIS_PORT" \
12+
--protected-mode yes \
1113
--maxclients 10000 \
1214
--timeout 0 &
1315

1416
# Wait for Redis to be ready
15-
sleep 2
17+
until redis-cli -h "$CACHE_REDIS_HOST" -p "$CACHE_REDIS_PORT" ping >/dev/null 2>&1; do
18+
sleep 1
19+
done
20+
21+
# Start the API
22+
gunicorn -b :"$PORT" policyengine_api.api --timeout 300 --workers 5 --preload &
1623

1724
# Keep the script running and handle shutdown gracefully
1825
trap "pkill -P $$; exit 1" INT TERM

policyengine_api/api.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import time
66
import sys
7+
import os
78

89
start_time = time.time()
910

@@ -89,8 +90,9 @@ def log_timing(message):
8990
{
9091
"CACHE_TYPE": "RedisCache",
9192
"CACHE_KEY_PREFIX": "policyengine",
92-
"CACHE_REDIS_HOST": "127.0.0.1",
93-
"CACHE_REDIS_PORT": 6379,
93+
"CACHE_REDIS_HOST": os.environ.get("CACHE_REDIS_HOST", "127.0.0.1"),
94+
"CACHE_REDIS_PORT": int(os.environ.get("CACHE_REDIS_PORT", "6379")),
95+
"CACHE_REDIS_DB": int(os.environ.get("CACHE_REDIS_DB", "0")),
9496
"CACHE_DEFAULT_TIMEOUT": 300,
9597
}
9698
)

policyengine_api/libs/simulation_api_modal.py

Lines changed: 119 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
import os
99
import sys
10-
from dataclasses import dataclass
10+
from dataclasses import dataclass, field
1111
from typing import Optional
1212

1313
import httpx
@@ -42,6 +42,28 @@ def name(self) -> str:
4242
return self.job_id
4343

4444

45+
@dataclass
46+
class ModalBudgetWindowBatchExecution:
47+
"""
48+
Represents a budget-window batch execution in the Modal simulation API.
49+
"""
50+
51+
batch_job_id: str
52+
status: str
53+
progress: Optional[int] = None
54+
completed_years: list[str] = field(default_factory=list)
55+
running_years: list[str] = field(default_factory=list)
56+
queued_years: list[str] = field(default_factory=list)
57+
failed_years: list[str] = field(default_factory=list)
58+
result: Optional[dict] = None
59+
error: Optional[str] = None
60+
61+
@property
62+
def name(self) -> str:
63+
"""Alias for batch_job_id."""
64+
return self.batch_job_id
65+
66+
4567
class SimulationAPIModal:
4668
"""
4769
HTTP client for the Modal Simulation API.
@@ -154,6 +176,57 @@ def run(self, payload: dict) -> ModalSimulationExecution:
154176
)
155177
raise
156178

179+
def run_budget_window_batch(self, payload: dict) -> ModalBudgetWindowBatchExecution:
180+
"""
181+
Submit a budget-window batch job to the Modal API.
182+
"""
183+
try:
184+
modal_payload = dict(payload)
185+
if "model_version" in modal_payload:
186+
modal_payload["version"] = modal_payload.pop("model_version")
187+
modal_payload.pop("data_version", None)
188+
189+
response = self.client.post(
190+
f"{self.base_url}/simulate/economy/budget-window",
191+
json=modal_payload,
192+
)
193+
response.raise_for_status()
194+
data = response.json()
195+
196+
logger.log_struct(
197+
{
198+
"message": "Modal budget-window batch submitted",
199+
"batch_job_id": data.get("batch_job_id"),
200+
"status": data.get("status"),
201+
},
202+
severity="INFO",
203+
)
204+
205+
return ModalBudgetWindowBatchExecution(
206+
batch_job_id=data["batch_job_id"],
207+
status=data["status"],
208+
)
209+
210+
except httpx.HTTPStatusError as e:
211+
logger.log_struct(
212+
{
213+
"message": f"Modal batch API HTTP error: {e.response.status_code}",
214+
"response_text": e.response.text[:500],
215+
},
216+
severity="ERROR",
217+
)
218+
raise
219+
220+
except httpx.RequestError as e:
221+
logger.log_struct(
222+
{
223+
"message": f"Modal batch API request error: {str(e)}",
224+
"run_id": (payload.get("_telemetry") or {}).get("run_id"),
225+
},
226+
severity="ERROR",
227+
)
228+
raise
229+
157230
def resolve_app_name(
158231
self, country: str, version: Optional[str] = None
159232
) -> tuple[str, str]:
@@ -235,6 +308,51 @@ def get_execution_by_id(self, job_id: str) -> ModalSimulationExecution:
235308
)
236309
raise
237310

311+
def get_budget_window_batch_by_id(
312+
self, batch_job_id: str
313+
) -> ModalBudgetWindowBatchExecution:
314+
"""
315+
Poll the Modal API for the current status of a budget-window batch.
316+
"""
317+
try:
318+
response = self.client.get(
319+
f"{self.base_url}/budget-window-jobs/{batch_job_id}"
320+
)
321+
if response.status_code not in (200, 202, 500):
322+
response.raise_for_status()
323+
data = response.json()
324+
325+
return ModalBudgetWindowBatchExecution(
326+
batch_job_id=batch_job_id,
327+
status=data["status"],
328+
progress=data.get("progress"),
329+
completed_years=data.get("completed_years", []),
330+
running_years=data.get("running_years", []),
331+
queued_years=data.get("queued_years", []),
332+
failed_years=data.get("failed_years", []),
333+
result=data.get("result"),
334+
error=data.get("error"),
335+
)
336+
337+
except httpx.HTTPStatusError as e:
338+
logger.log_struct(
339+
{
340+
"message": f"Modal batch API HTTP error polling job {batch_job_id}: {e.response.status_code}",
341+
"response_text": e.response.text[:500],
342+
},
343+
severity="ERROR",
344+
)
345+
raise
346+
347+
except httpx.RequestError as e:
348+
logger.log_struct(
349+
{
350+
"message": f"Modal batch API request error polling job {batch_job_id}: {str(e)}",
351+
},
352+
severity="ERROR",
353+
)
354+
raise
355+
238356
def get_execution_status(self, execution: ModalSimulationExecution) -> str:
239357
"""
240358
Get the status string from an execution.

0 commit comments

Comments
 (0)