# DatalakeDataload

A Prefect workflow orchestration project for managing data lake loading operations.
This project handles ETL operations for loading data from various sources (PeopleSoft, SnapLogic, VDS, SAP) into a PostgreSQL data lake. It consists of three main flows:
- Term Raw Flow - Loads term data from PeopleSoft (runs at 1:00 AM ET daily)
- Course Raw Flow - Loads course data from SnapLogic (runs at 2:00 AM ET daily)
- Person Raw Flow - Loads person data from multiple sources (runs at 3:00 AM ET daily)
```
DatalakeDataload/
├── flows/                        # Prefect flow definitions
│   ├── __init__.py
│   ├── term_raw_flow.py          # Term data loading flow
│   ├── course_raw_flow.py        # Course data loading flow
│   └── person_raw_flow.py        # Person data loading flow
├── tasks/                        # Reusable Prefect tasks (currently empty)
│   └── __init__.py
├── config/                       # Configuration modules
│   ├── __init__.py
│   ├── settings.py               # Environment variables and settings
│   └── resources.py              # Database and API connection resources
├── deployments/                  # Deployment configurations
│   ├── term_raw_deployment.py
│   ├── course_raw_deployment.py
│   └── person_raw_deployment.py
├── prefect.yaml                  # Prefect project configuration
├── requirements.txt              # Python dependencies
├── .env.example                  # Example environment variables
└── README.md                     # This file
```
- Prefect – Workflow orchestration and flow execution
- SQLAlchemy (async) – Database interactions
- httpx – Asynchronous HTTP-based integration with external APIs
- asyncio – Managing concurrent API calls
- DeepDiff – Object comparison for data change detection
- PostgreSQL – Data lake storage (target database)
- asyncpg – Async PostgreSQL driver for COPY operations and batch inserts
The project uses a three-layer data architecture for all pipelines (Person, Course, and Term):
- **Raw Layer** (`*_raw` schemas)
  - INSERT-only operations performed by Prefect flows
  - Retains a historical log of all changes
  - Examples: `person_raw.person_data`, `course_raw.course_data`, `term_raw.term_data`
- **Transform Layer** (`*_xform` schemas)
  - UPSERT operations managed by SQL triggers and functions
  - Always contains the current state of the data
  - Examples: `person_xform.current_person_data`, `course_xform.current_course_data`, `term_xform.current_term_data`
- **Curated Layer** (`*_curated` schemas)
  - UPSERT operations managed by SQL triggers and functions
  - Service-based filtering using JSON path definitions
  - Examples: `person_curated.person_data_by_service`, `course_curated.course_data_by_service`, `term_curated.term_data_by_service`
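The split between the append-only raw layer and the trigger-maintained transform layer can be illustrated with a minimal in-memory sketch. This is for illustration only: the names are placeholders, and in the real system the xform upsert is performed by SQL triggers, not Python.

```python
# In-memory sketch of the raw/xform layering (hypothetical names).
raw_rows = []   # raw layer: append-only history of every load
current = {}    # xform layer: one current row per key

def insert_raw(key: str, payload: dict) -> None:
    """INSERT-only into the raw layer; the dict update stands in for
    the SQL trigger that UPSERTs the current state."""
    raw_rows.append({"key": key, "payload": payload})
    current[key] = payload

# Two loads of the same term: raw keeps both versions,
# xform holds only the latest state.
insert_raw("term-2024-SPRG", {"descr": "Spring 2024", "active": True})
insert_raw("term-2024-SPRG", {"descr": "Spring 2024", "active": False})

assert len(raw_rows) == 2
assert current["term-2024-SPRG"]["active"] is False
```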
**PostgreSQL**

| Variable | Description |
|---|---|
| `POSTGRES_HOST` | Hostname for the PostgreSQL server |
| `POSTGRES_PORT` | Port number |
| `POSTGRES_DB` | Target database name |
| `POSTGRES_USER` | Username |
| `POSTGRES_PASS` | Password |
**SnapLogic**

| Variable | Description |
|---|---|
| `SNAPLOGIC_PERSON_URL` | API URL for SnapLogic person data |
| `SNAPLOGIC_PERSON_KEY` | API token for SnapLogic person data |
| `SNAPLOGIC_COURSE_URL` | API URL for SnapLogic course data |
| `SNAPLOGIC_COURSE_KEY` | API token for SnapLogic course data |
| `CS_ENV` | Campus Solutions environment for SnapLogic (test, prod, etc.) |
**PeopleSoft**

| Variable | Description |
|---|---|
| `PEOPLE_SOFT_USER` | Username for PeopleSoft |
| `PEOPLE_SOFT_PASS` | Password for PeopleSoft |
**VDS**

| Variable | Description |
|---|---|
| `VDS_URL` | URL endpoint for the VDS API |
| `VDS_USERNAME` | Username for VDS authentication |
| `VDS_PASSWORD` | Password for VDS authentication |
**SAP**

| Variable | Description |
|---|---|
| `SAP_URL` | The base URL of the SAP endpoint for employee data |
| `SAP_KEY` | The API key for authenticating SAP requests |
- Python >=3.9, <3.14
- Prefect 2.14.0 or higher
- PostgreSQL database
- Access to PeopleSoft, SnapLogic, VDS, and SAP APIs
```shell
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
pip install -r requirements.txt
```

Copy the example environment file and configure all required variables:

```shell
cp .env.example .env
# Edit .env with your actual configuration
```

Required environment variables:
- PostgreSQL: `POSTGRES_HOST`, `POSTGRES_PORT`, `POSTGRES_DB`, `POSTGRES_USER`, `POSTGRES_PASS`
- Campus Solutions: `CS_ENV`
- PeopleSoft: `PEOPLE_SOFT_USER`, `PEOPLE_SOFT_PASS`
- SnapLogic: `SNAPLOGIC_COURSE_URL`, `SNAPLOGIC_COURSE_KEY`, `SNAPLOGIC_PERSON_URL`, `SNAPLOGIC_PERSON_KEY`
- VDS: `VDS_URL`, `VDS_USERNAME`, `VDS_PASSWORD`
- SAP: `SAP_URL`, `SAP_KEY`
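A typical settings module reads these variables with `os.getenv`. The sketch below shows the idea; the actual structure of `config/settings.py` in this repo may differ, and the default values here are placeholders.

```python
import os

def get_postgres_dsn() -> str:
    """Build a PostgreSQL DSN from environment variables.

    Sketch only: defaults and the helper name are illustrative,
    not taken from config/settings.py.
    """
    host = os.getenv("POSTGRES_HOST", "localhost")
    port = os.getenv("POSTGRES_PORT", "5432")
    db = os.getenv("POSTGRES_DB", "datalake")
    user = os.getenv("POSTGRES_USER", "")
    password = os.getenv("POSTGRES_PASS", "")
    return f"postgresql://{user}:{password}@{host}:{port}/{db}"
```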
Run the SQL scripts to set up the required database schemas:
```shell
# Run term schema
psql -h $POSTGRES_HOST -U $POSTGRES_USER -d $POSTGRES_DB -f sql/term.sql

# Run course schema
psql -h $POSTGRES_HOST -U $POSTGRES_USER -d $POSTGRES_DB -f sql/course.sql

# Run person schema
psql -h $POSTGRES_HOST -U $POSTGRES_USER -d $POSTGRES_DB -f sql/person.sql
```

Start the local Prefect server:

```shell
prefect server start
```

Or connect to Prefect Cloud:

```shell
prefect cloud login
```

Run the flows directly:

```shell
# Run term flow
python flows/term_raw_flow.py

# Run course flow
python flows/course_raw_flow.py

# Run person flow
python flows/person_raw_flow.py
```

Using the deployment scripts:
```shell
# Create term deployment
python deployments/term_raw_deployment.py

# Create course deployment
python deployments/course_raw_deployment.py

# Create person deployment
python deployments/person_raw_deployment.py
```

Or deploy all at once using the prefect.yaml file:

```shell
prefect deploy --all
```

Start a worker to pick up scheduled runs:

```shell
# For deployments using work pools
prefect worker start --pool "default-agent-pool"

# Or for older agent-based deployments
prefect agent start --pool "default-agent-pool"
```

Trigger the deployments manually:

```shell
# Run term flow deployment
prefect deployment run 'term-raw-flow/term-raw-daily'

# Run course flow deployment
prefect deployment run 'course-raw-flow/course-raw-daily'

# Run person flow deployment
prefect deployment run 'person-raw-flow/person-raw-daily'
```

**Term Raw Flow**

- Schedule: Daily at 1:00 AM ET
- Source: PeopleSoft BU_TERM_QRY
- Target: `term_raw.term_data` table
- Description: Fetches term data from PeopleSoft, truncates the target table, and inserts new data in JSONB format
**Course Raw Flow**

- Schedule: Daily at 2:00 AM ET
- Source: SnapLogic Course API
- Target: `course_raw.course_data` table
- Description: Fetches course data for active terms from SnapLogic and inserts it into PostgreSQL using batch operations
**Person Raw Flow**

- Schedule: Daily at 3:00 AM ET
- Sources: PeopleSoft, SAP, VDS (commented out), SnapLogic Person API
- Target: `person_raw.person_data` table
- Description:
  - Fetches BUIDs from PeopleSoft and SAP
  - Queries PeopleSoft for uidCarTerm data for each BUID
  - Batches uidCarTerm data and sends it to the SnapLogic Person API
  - Inserts person data with sensitive fields removed
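The batching and sensitive-field-stripping steps can be sketched as below. `UIDCARTERM_GROUP_SIZE` matches the tunable mentioned under memory issues; the helper names and the sensitive field names are hypothetical, not taken from `flows/person_raw_flow.py`.

```python
from typing import Iterator

UIDCARTERM_GROUP_SIZE = 1000        # group size for SnapLogic batches
SENSITIVE_FIELDS = {"ssn", "dob"}   # hypothetical sensitive field names

def batch(items: list, size: int) -> Iterator[list]:
    """Yield fixed-size groups, e.g. uidCarTerm records per SnapLogic call."""
    for i in range(0, len(items), size):
        yield items[i:i + size]

def strip_sensitive(record: dict) -> dict:
    """Drop sensitive fields before the raw-layer insert."""
    return {k: v for k, v in record.items() if k not in SENSITIVE_FIELDS}
```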
All flows support async execution and can be tested locally:
```shell
# Run with asyncio
python -c "import asyncio; from flows.term_raw_flow import term_raw_flow; asyncio.run(term_raw_flow())"
```

Or simply execute the flow file:

```shell
python flows/term_raw_flow.py
```

After starting the Prefect server, visit:
- Local: http://localhost:4200
- Cloud: https://app.prefect.cloud
```shell
# List deployments
prefect deployment ls

# List flow runs
prefect flow-run ls

# Cancel a flow run
prefect flow-run cancel <flow-run-id>

# Delete a deployment
prefect deployment delete <deployment-name>
```

Schedules are configured in the deployment files. To change a schedule, edit the relevant deployment file:
```python
# In deployments/term_raw_deployment.py
schedule=CronSchedule(
    cron="0 1 * * *",  # Change this cron expression
    timezone="America/New_York"
)
```

Then redeploy:

```shell
python deployments/term_raw_deployment.py
```

To add a new flow:

- Create a new flow file in the `flows/` directory
- Define your flow using the `@flow` decorator
- Import resources from `config.resources`
- Add tasks using async/await patterns
- Update `flows/__init__.py` to export your flow
- Create a deployment configuration in `deployments/`
The asyncpg connection pool sizes are hardcoded in `config/resources.py`:

- Minimum pool size: 12 connections
- Maximum pool size: 24 connections

To adjust these values, edit the `AsyncpgPoolResource.get_pool_config()` method in `config/resources.py`.
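As a rough sketch, the method might take this shape; the real class in `config/resources.py` has more to it, and only the `min_size`/`max_size` keys (which match asyncpg's `create_pool` keywords) are taken from the values above.

```python
class AsyncpgPoolResource:
    """Sketch of the resource described above (illustrative only)."""

    def get_pool_config(self) -> dict:
        # Edit these values to resize the connection pool.
        return {"min_size": 12, "max_size": 24}

# The config would then be splatted into asyncpg, e.g.:
#   pool = await asyncpg.create_pool(dsn, **AsyncpgPoolResource().get_pool_config())
```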
The person flow uses semaphores to control concurrency:
```python
PSQUERY_SEMAPHORE_LIMIT = 10    # Concurrent PeopleSoft queries
SNAPLOGIC_SEMAPHORE_LIMIT = 8   # Concurrent SnapLogic requests
INSERT_SEMAPHORE_LIMIT = 100    # Concurrent database inserts
```

Adjust these values in `flows/person_raw_flow.py` based on your infrastructure.
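The semaphore pattern looks roughly like this; the coroutine is a dummy stand-in (the real tasks make PeopleSoft/SnapLogic HTTP calls), and only the semaphore limit is taken from the flow.

```python
import asyncio

PSQUERY_SEMAPHORE_LIMIT = 10  # Concurrent PeopleSoft queries

async def query_with_limit(sem: asyncio.Semaphore, buid: str) -> str:
    """Dummy query task; at most PSQUERY_SEMAPHORE_LIMIT run concurrently."""
    async with sem:
        await asyncio.sleep(0)  # placeholder for the real HTTP call
        return f"result-for-{buid}"

async def main() -> list:
    sem = asyncio.Semaphore(PSQUERY_SEMAPHORE_LIMIT)
    buids = [f"U{i:03d}" for i in range(25)]
    # gather preserves input order even though execution is throttled
    return await asyncio.gather(*(query_with_limit(sem, b) for b in buids))

results = asyncio.run(main())
```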
Course flow batch size can be adjusted:
```python
INSERT_BATCH_SIZE = 50  # Records per batch insert
```

**Notifications**

- Set up notification blocks in the Prefect UI or via code
- Add notification tasks to your flows
- Use Prefect automations for automatic alerts on flow failures
All flows and tasks use Prefect's logging system:
```python
from prefect.logging import get_run_logger

logger = get_run_logger()
logger.info("Your message here")
```

**Best Practices**

- Use task retries: Configure retries for tasks that might fail temporarily
- Add tags: Tag your flows and deployments for better organization
- Use blocks: Store credentials and configurations as Prefect blocks
- Version control: Keep deployments versioned
- Testing: Test flows locally before deploying
- Monitoring: Set up alerts for critical flows
- Check that an agent/worker is running for the correct work pool
- Verify the deployment is active: `prefect deployment ls`
- Check flow run logs in the Prefect UI
- Verify database credentials in `.env`
- Check network connectivity to the database
- Ensure asyncpg and psycopg are installed
- Test the connection: `psql -h $POSTGRES_HOST -U $POSTGRES_USER -d $POSTGRES_DB`
- Verify API credentials and endpoints in `.env`
- Check that API keys are not expired
- Test API connectivity with curl or httpx
- Verify all dependencies are installed: `pip install -r requirements.txt`
- Check that you're using the correct Python environment
- Ensure Python 3.9+ is being used
If the person flow runs out of memory:
- Reduce `UIDCARTERM_GROUP_SIZE` (default: 1000)
- Reduce the `insert_queue` max size (default: 20000)
- Reduce the semaphore limits
[Your License Here]