Laufband enables parallel iteration over a dataset from multiple processes, utilizing file-based locking and communication to ensure each item is processed exactly once.
Install Laufband using pip:

```bash
pip install laufband
```

Using Laufband is similar to the familiar tqdm progress bar for sequential iteration.
```python
from laufband import Laufband

data = list(range(100))

for item in Laufband(data):
    # Process each item in the dataset
    pass
```

The true power of Laufband emerges when you run your script in parallel. Multiple processes will coordinate using file-based locking to ensure that each item in the dataset is processed by only one process.
Here's a typical example demonstrating parallel processing with Laufband and file-based locking for shared resource access:
```python
import json
import time
from pathlib import Path

from laufband import Laufband

output_file = Path("data.json")
output_file.write_text(json.dumps({"processed_data": []}))

data = list(range(100))
worker = Laufband(data, tqdm_kwargs={"desc": "using Laufband"})

for item in worker:
    # Simulate some computationally intensive task
    time.sleep(0.1)
    with worker.lock:
        # Access and modify a shared resource (e.g., a file) safely using the lock
        file_content = json.loads(output_file.read_text())
        file_content["processed_data"].append(item)
        output_file.write_text(json.dumps(file_content))
```

To execute this script (`main.py`) in parallel, you can use a command like the following in your terminal (this example launches 10 background processes):
```bash
for i in {1..10} ; do python main.py & done
```

Important
The different processes may finish at different times. Therefore, the order of items in file_content is not guaranteed.
If the order is important, you will need to implement sorting logic afterwards.
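As one option for such a post-processing step, you can sort the collected results once all workers have finished. A minimal sketch, assuming the `data.json` layout from the example above (the unsorted content here is simulated for illustration):

```python
import json
from pathlib import Path

output_file = Path("data.json")
# Simulate out-of-order results left behind by several workers.
output_file.write_text(json.dumps({"processed_data": [7, 2, 9, 0]}))

# Post-processing: restore a deterministic order once all workers are done.
content = json.loads(output_file.read_text())
content["processed_data"].sort()
output_file.write_text(json.dumps(content))
```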
In Laufband, a job will be automatically marked as failed if the iteration is interrupted by:
- an unhandled Exception
- or an explicit break.
```python
from laufband import Laufband

data = list(range(100))

# Example 1: break
for item in Laufband(data):
    if item == 50:
        break  # Job 50 will be marked as failed

# Example 2: Exception
for item in Laufband(data):
    if item == 70:
        raise ValueError("Something went wrong")  # Job 70 will be marked as failed
```

If you want to exit early but still mark the job as successfully completed,
you should use `Laufband.close()` instead of `break`:
```python
from laufband import Laufband

data = list(range(100))
worker = Laufband(data)

for item in worker:
    if item == 50:
        worker.close()  # Job 50 will be marked as completed, and iteration will stop cleanly
```

Laufband supports context manager usage for better worker lifecycle management.
This is recommended if you want to iterate over the Laufband or Graphband instance multiple times.
```python
from laufband import Laufband

data = list(range(100))

# Using context manager for multiple iterations (recommended)
with Laufband(data) as worker:
    while True:  # loop until your own stopping condition breaks out
        for item in worker:
            # do something
            pass
# Worker automatically goes offline when exiting the context
```

Context Manager Benefits:

- Proper cleanup: Workers are automatically set to `OFFLINE` status when exiting the context

Worker Status Lifecycle:

- Without context manager: Worker goes from `IDLE` → `BUSY` → `OFFLINE` (after completing all tasks)
- With context manager: Worker goes from `IDLE` → `BUSY` → `IDLE` (during processing) → `OFFLINE` (on context exit)
Note
Graphband uses two locks:

- `worker.lock`: a user-facing lock for protecting shared resources in your code (files, sockets, etc.).
- An internal `db_lock`: used by laufband for database coordination. It is managed by the library and typically should not be acquired directly by user code.
For atomistic data, the ASE package is widely used to calculate energies and forces of atomic configurations using either ab initio methods or machine-learned interatomic potentials (MLIPs).
You can use Laufband to parallelize these calculations without duplicated work or manual bookkeeping, and with automatic checkpointing.
The following example uses a MACE foundation model to compute energies and forces on the ASE S22 dataset.
Tip
You can safely run this script multiple times — even across multiple SLURM jobs — without any modifications.
Laufband will automatically coordinate which configurations are processed.
For local parallelization, you can use bash: `for i in {1..10} ; do python main.py & done`
```python
import ase.io
from ase.collections import s22

from laufband import Laufband
from mace.calculators import mace_mp

# Initialize calculator
calc = mace_mp(model="medium", dispersion=False, default_dtype="float32")

worker = Laufband(list(s22))

for atoms in worker:
    atoms.calc = calc
    energy = atoms.get_potential_energy()
    worker.iterator.set_description(f"{energy = }")
    with worker.lock:
        ase.io.write("frames.xyz", atoms, append=True)
```

You can use `laufband watch` to follow the progress across all active workers.
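Since the Tip above mentions running across multiple SLURM jobs, a job array is one convenient way to launch several workers. This is a hypothetical sketch only: the job name and array size are placeholders, and partition, resource, and module-setup lines are omitted because they depend on your cluster.

```shell
#!/bin/bash
#SBATCH --job-name=laufband-mace   # placeholder job name
#SBATCH --array=1-10               # ten independent Laufband workers
#SBATCH --ntasks=1

# Every array task runs the same unmodified script; Laufband
# coordinates which configurations each worker processes.
python main.py
```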
Laufband supports dependency-aware tasks through laufband.Graphband.
To use this, provide an iterator that yields tasks in a valid execution order.
Note
laufband.Laufband effectively uses a directed graph with no edges as the input to laufband.Graphband.
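The note above can be illustrated without the library itself. In this sketch, plain dicts stand in for `laufband.Task` (a hypothetical stand-in, not the real class) to show how a flat dataset maps onto a dependency-free task stream:

```python
data = list(range(5))


def edgeless_tasks(items):
    """Yield one dependency-free task per item, mirroring how
    Laufband's flat input corresponds to a graph with no edges."""
    for i, item in enumerate(items):
        # Plain dicts stand in for laufband.Task here.
        yield {"id": str(i), "data": item, "dependencies": set()}


tasks = list(edgeless_tasks(data))
# Every task has an empty dependency set, so any execution order is valid.
```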
```python
import networkx as nx

from laufband import Task


def graph_tasks():
    digraph = nx.DiGraph()
    edges = [
        ("a", "b"),
        ("a", "c"),
        ("b", "d"),
        ("b", "e"),
        ("c", "f"),
        ("c", "g"),
    ]
    digraph.add_edges_from(edges)
    for node in nx.topological_sort(digraph):
        yield Task(
            id=node,  # unique string representation of the task
            data=node,  # optional data associated with the task
            dependencies=set(digraph.predecessors(node)),  # dependencies of the task
        )
```

Given this generator, you can iterate the graph in parallel using laufband.Graphband.
Warning
Once laufband.Graphband has executed a task, it will not re-execute it, even if a dependency is added later on.
```python
from laufband import Graphband

worker = Graphband(graph_tasks())

for task in worker:
    print(task.id, task.data)
```

You can assign requirements to your tasks and labels to workers to control their execution.
```python
from laufband import Task, Graphband


def iterator():
    yield Task(id="task1")
    yield Task(id="task2", requirements={"gpu"})


w1 = Graphband(iterator(), identifier="w1")
w2 = Graphband(iterator(), identifier="w2", labels={"gpu"})

print([x.id for x in w1])
# ["task1"]
print([x.id for x in w2])
# ["task2"]
```

Sometimes a task supports internal parallel execution (e.g., nested use of laufband). In such a case, you can assign multiple workers to one task.
Note
Keep in mind that laufband does not actually schedule the execution. The number of available workers per task depends on how many workers are spawned.
```python
from laufband import Task, Graphband


def iterator():
    yield Task(id="task1", max_parallel_workers=2)
    # At most 2 workers will be assigned to this job until both successfully finish.


worker = Graphband(iterator())

for item in worker:
    # Code that can be executed multiple times, e.g., via laufband itself.
    ...