Copy-paste recipes for common pypiper patterns. Each recipe is a complete, runnable script. For API details, see the full documentation. For bioinformatics-specific examples, see example_pipelines/.
- Run a single command with restart support
- Run a multi-step pipeline with checkpoints
- Capture command output into a Python variable
- Multiple PipelineManagers in one script
- Report results to a structured output file
- Skip expensive steps when output already exists
- Clean up intermediate files
- Handle a command that might fail (nofail)
- Use follow functions for post-processing
- Parallelism patterns
- Interactive / notebook usage
Run a shell command, skip it on re-run if the output file already exists, and handle stale lock files from crashed runs.
#!/usr/bin/env python
"""Recipe 1: Run a single command with automatic restart support."""
import pypiper
pm = pypiper.PipelineManager(name="file_processor", outfolder="output/")
pm.run("wc -l /etc/passwd > output/line_count.txt", target="output/line_count.txt")
pm.stop_pipeline()
# Run again: the command is skipped because output/line_count.txt exists.
# Run with --recover: ignores stale lock files from a crashed run.
# Run with --new-start: forces re-run even if target exists.Alternative: context manager style (calls stop_pipeline() automatically on exit):
#!/usr/bin/env python
"""Recipe 1 (context manager variant)."""
import pypiper
with pypiper.PipelineManager(name="file_processor", outfolder="output/") as pm:
pm.run("wc -l /etc/passwd > output/line_count.txt", target="output/line_count.txt")
# stop_pipeline() is called automatically when the `with` block exits.What happens: The command runs and creates output/line_count.txt. On re-run, the command is skipped because the target file already exists. Pass --new-start to force re-run.
Use checkpoints to enable restarting a pipeline from any named step.
#!/usr/bin/env python
"""Recipe 2: Multi-step pipeline with checkpoints for restart."""
import pypiper
pm = pypiper.PipelineManager(name="sorter", outfolder="output/")
pm.timestamp("### Step 1: Generate data", checkpoint="generate")
pm.run("shuf -i 1-1000 -n 100 > output/numbers.txt", target="output/numbers.txt")
pm.timestamp("### Step 2: Sort data", checkpoint="sort")
pm.run("sort -n output/numbers.txt > output/sorted.txt", target="output/sorted.txt")
pm.timestamp("### Step 3: Extract top 10", checkpoint="top10")
pm.run("head -n 10 output/sorted.txt > output/top10.txt", target="output/top10.txt")
pm.stop_pipeline()
# Run with: python recipe2.py --start-point sort
# This skips Step 1 and starts from Step 2.What happens: Three steps run in sequence, each with a named checkpoint. Passing --start-point sort on re-run skips Step 1 and begins from Step 2.
Use checkprint() to capture stdout from a shell command and report_result() to log key-value metrics.
#!/usr/bin/env python
"""Recipe 3: Capture command output and report structured results."""
import pypiper
pm = pypiper.PipelineManager(name="system_info", outfolder="output/")
hostname = pm.checkprint("hostname")
pm.report_result("hostname", hostname)
timestamp = pm.checkprint("date +%s")
pm.report_result("unix_timestamp", timestamp)
line_count = pm.checkprint("wc -l < /etc/passwd")
pm.report_result("passwd_lines", int(line_count))
pm.stop_pipeline()
# Results are written to output/stats.yaml via pipestat.What happens: Each checkprint() runs a command and returns its stdout as a string. report_result() persists the key-value pairs to the stats file in the output folder.
Use multi=True to run multiple independent PipelineManagers without log file conflicts.
#!/usr/bin/env python
"""Recipe 4: Multiple PipelineManagers in one script."""
import pypiper
# multi=True disables output tee-ing so managers don't conflict on log files.
pm_a = pypiper.PipelineManager(name="task_a", outfolder="output/task_a/", multi=True)
pm_b = pypiper.PipelineManager(name="task_b", outfolder="output/task_b/", multi=True)
pm_a.run("echo 'A done' > output/task_a/result.txt", target="output/task_a/result.txt")
pm_b.run("echo 'B done' > output/task_b/result.txt", target="output/task_b/result.txt")
pm_a.stop_pipeline()
pm_b.stop_pipeline()
# Each manager has its own output folder, locks, and status tracking.
# multi=True is essential when running multiple PMs in one script.
# Commands here run sequentially; for parallelism patterns see Recipe 10.What happens: Two independent pipeline managers run in the same script, each with its own output folder and state. The multi=True flag prevents log file conflicts.
Use report_result() to persist metrics extracted from command output.
#!/usr/bin/env python
"""Recipe 5: Report structured results to a YAML output file."""
import pypiper
pm = pypiper.PipelineManager(name="analyzer", outfolder="output/")
# Generate a file and measure it
pm.run("seq 1 10000 > output/data.txt", target="output/data.txt")
line_count = pm.checkprint("wc -l < output/data.txt")
pm.report_result("line_count", int(line_count))
file_size = pm.checkprint("stat --format=%s output/data.txt")
pm.report_result("file_size_bytes", int(file_size))
word_count = pm.checkprint("wc -w < output/data.txt")
pm.report_result("word_count", int(word_count))
pm.stop_pipeline()
# Results are persisted via pipestat. Check output/ for the results file.
# report_result() also logs each result to the pipeline log.What happens: The pipeline generates data, extracts metrics with checkprint(), and stores them via report_result(). Results are persisted to the pipestat-backed output file (YAML by default).
The target parameter is the key mechanism -- pypiper checks for file existence before running.
#!/usr/bin/env python
"""Recipe 6: Skip expensive steps when output files already exist."""
import pypiper
pm = pypiper.PipelineManager(name="expensive", outfolder="output/")
# This simulates an expensive computation (3 seconds).
# On re-run, it's skipped instantly because the target file exists.
pm.run(
"sleep 3 && seq 1 1000000 > output/big_data.txt",
target="output/big_data.txt",
)
# This step also skips if its target exists.
pm.run(
"wc -l output/big_data.txt > output/summary.txt",
target="output/summary.txt",
)
pm.stop_pipeline()
# First run: takes ~3 seconds. Second run: completes instantly.
# Use --new-start to force re-computation of all steps.What happens: The first run takes about 3 seconds. The second run completes instantly because both target files already exist. Pass --new-start to force re-computation.
Use clean=True or clean_add() to register intermediate files for automatic deletion on success.
#!/usr/bin/env python
"""Recipe 7: Clean up intermediate files after pipeline success."""
import pypiper
pm = pypiper.PipelineManager(name="cleaner", outfolder="output/")
# Method 1: clean=True in run() auto-registers target for cleanup
pm.run(
"seq 1 100 > output/intermediate.txt",
target="output/intermediate.txt",
clean=True, # This file will be deleted on success
)
# Method 2: Explicit clean_add() for manual registration
pm.run("cp output/intermediate.txt output/temp_copy.txt", target="output/temp_copy.txt")
pm.clean_add("output/temp_copy.txt")
# Final output (not cleaned up)
pm.run(
"cat output/intermediate.txt output/temp_copy.txt > output/final.txt",
target="output/final.txt",
)
pm.stop_pipeline()
# On success: intermediate.txt and temp_copy.txt are deleted; final.txt remains.
# Run with --dirty to keep all intermediate files (useful for debugging).What happens: On successful completion, intermediate files are deleted automatically. Only output/final.txt remains. Pass --dirty to preserve all intermediate files for debugging.
Use nofail=True to let the pipeline continue past a command that returns a non-zero exit code.
#!/usr/bin/env python
"""Recipe 8: Continue past a command that might fail."""
import pypiper
pm = pypiper.PipelineManager(name="resilient", outfolder="output/")
# This command will fail (nonexistent path), but nofail=True lets the pipeline continue.
pm.run("ls /nonexistent_path_12345 > output/maybe.txt", target="output/maybe.txt", nofail=True)
# This command runs regardless of the previous failure.
pm.run("echo 'Pipeline continued' > output/success.txt", target="output/success.txt")
pm.stop_pipeline()
# Without nofail=True, the failed command would halt the entire pipeline.
# With nofail=True, the failure is logged and execution continues.What happens: The first command fails but the pipeline continues because of nofail=True. Without it, the pipeline would halt on the first failure.
The follow parameter runs a Python function after the command, only when the command actually executes.
#!/usr/bin/env python
"""Recipe 9: Post-process command output with follow functions."""
import pypiper
pm = pypiper.PipelineManager(name="follower", outfolder="output/")
def report_line_count():
"""This runs after the command, only when the command actually executes."""
count = pm.checkprint("wc -l < output/data.txt")
pm.report_result("line_count", int(count))
pm.run(
"seq 1 500 > output/data.txt",
target="output/data.txt",
follow=report_line_count,
)
pm.stop_pipeline()
# The follow function runs only when the command runs (target doesn't exist).
# On re-run, both the command and follow are skipped.
# Use --force-follow to run follow functions even when commands are skipped.What happens: The follow function runs after the command completes. On re-run, both the command and follow function are skipped because the target exists. Pass --force-follow to run follow functions even when commands are skipped.
Pypiper runs commands sequentially by design. Here are the two ways to get parallelism.
#!/usr/bin/env python
"""Recipe 10: Parallelism patterns with pypiper.
Pypiper runs commands sequentially. Passing a list to pm.run() does NOT
parallelize -- it runs commands one after another under a single lock.
To get parallelism, use one of these approaches:
"""
import pypiper
# APPROACH 1: Parallel pipelines via separate PipelineManager instances.
# Each PM has its own output folder, locks, and status tracking.
# Use multi=True to prevent log file conflicts.
# For large-scale parallelism, use looper to submit each as a separate job.
pm_a = pypiper.PipelineManager(name="task_a", outfolder="output/task_a/", multi=True)
pm_b = pypiper.PipelineManager(name="task_b", outfolder="output/task_b/", multi=True)
pm_a.run("echo 'A done' > output/task_a/result.txt", target="output/task_a/result.txt")
pm_b.run("echo 'B done' > output/task_b/result.txt", target="output/task_b/result.txt")
pm_a.stop_pipeline()
pm_b.stop_pipeline()
# APPROACH 2: Tool-level parallelism -- let the tool use multiple threads.
# Pypiper manages the lifecycle; the tool manages internal parallelism.
# pm.run("samtools sort -@ 8 input.bam", target="sorted.bam")
# pm.run("pigz -p 4 big_file.txt", target="big_file.txt.gz")
# ANTI-PATTERN: Don't call pm.run() from multiple threads on the same PM.
# The shared state (locks, running_procs) is not thread-safe, and the
# check-target-then-run logic would race. Use separate PMs instead.What happens: Each PipelineManager runs independently with its own locks and output folder. For true parallelism across samples, use looper to submit pipeline scripts as separate processes/jobs.
Use multi=True in Jupyter notebooks or interactive sessions to avoid log tee-ing conflicts.
"""Recipe 11: Interactive / notebook usage."""
import pypiper
# multi=True is essential for notebooks -- it disables log tee-ing
# which would interfere with notebook output.
pm = pypiper.PipelineManager(name="notebook", outfolder="output/", multi=True)
# Run commands interactively
pm.run("echo 'hello' > output/greeting.txt", target="output/greeting.txt")
# Capture output directly into Python variables
result = pm.checkprint("cat output/greeting.txt")
print(f"Got: {result}")
# Report results as you go
pm.report_result("greeting", result)
# When done exploring, finalize:
pm.stop_pipeline()
# In a notebook, you can also just let the PM go out of scope.
# multi=True is the key -- without it, pypiper tries to tee stdout
# to a log file, which conflicts with notebook output handling.What happens: The pipeline manager works in interactive mode without interfering with notebook output. multi=True is the key setting that makes this work.
| Method | Purpose |
|---|---|
PipelineManager(name, outfolder) |
Create a pipeline manager |
pm.run(cmd, target=) |
Run a shell command, skip if target exists |
pm.checkprint(cmd) |
Run a command, return its stdout as a string |
pm.report_result(key, value) |
Log a key-value result to the stats file |
pm.timestamp(msg, checkpoint=) |
Log a timestamp with optional checkpoint name |
pm.clean_add(path) |
Register a file for cleanup on success |
pm.stop_pipeline() |
Finalize the pipeline (write stats, clean up) |
| CLI Flag | Purpose |
|---|---|
--recover |
Ignore stale lock files from crashed runs |
--new-start |
Force re-run even if target files exist |
--start-point NAME |
Restart from a named checkpoint |
--dirty |
Keep intermediate files (skip cleanup) |
--force-follow |
Run follow functions even when commands are skipped |