Framework for calculating Energy Correlation Functions (ECFs) and other jet substructure variables using Dask and TaskVine for distributed computation in High Energy Physics (HEP).
```bash
unset PYTHONPATH

# Create conda environment 'dv5' with Python 3.10
conda create -y -n dv5 -c conda-forge --strict-channel-priority python=3.10
conda activate dv5

# Install build requirements for cctools
conda install -y gcc_linux-64 gxx_linux-64 gdb m4 perl swig make zlib libopenssl-static openssl conda-pack packaging cloudpickle flake8 clang-format threadpoolctl
```

Build and install the required version of cctools:
```bash
# Clone the repository
git clone https://github.com/JinZhou5042/cctools.git
cd cctools

# Configure cctools for the active conda environment
./configure --with-base-dir $CONDA_PREFIX --prefix $CONDA_PREFIX

# Build and install
make -j8 && make install

# Verify installation
vine_worker --version

cd ..  # Return to the main project directory
```

Install the pinned Python dependencies:

```bash
pip install dask==2024.7.1 dask-awkward==2024.7.0 awkward==2.6.6 coffea==2024.4.0 fastjet==3.4.2.1
```

These specific versions are recommended for compatibility.
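As a quick sanity check that the pinned versions resolved correctly, the following imports should succeed inside the `dv5` environment:

```python
# Verify the pinned analysis packages import cleanly in the 'dv5' environment.
import awkward
import coffea
import dask
import dask_awkward
import fastjet

print("dask", dask.__version__)
print("awkward", awkward.__version__)
print("coffea", coffea.__version__)
```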
Clone this repository to your shared filesystem:

```bash
git clone https://github.com/cooperative-computing-lab/cms-taskvine-example.git
cd sites/nd-crc/
```

Create a poncho package of the conda environment for TaskVine workers. Run this from a directory accessible by both the manager and workers (a shared filesystem).
```bash
# Ensure you are on a shared filesystem
# Package the 'dv5' conda environment
poncho_package_create $CONDA_PREFIX dv5.tar.gz
```

This may take some time.
The `factory.json` file defines resource requirements (cores, memory, disk) for TaskVine workers.

- Review and adjust `factory.json` for your computing environment.
- If you modify `cores` in `factory.json`, update `lib_resources` in `ecf_calculator.py` accordingly: `lib_resources={'cores': N}`, where `N` matches `factory.json`.
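For illustration (the value 16 here is an arbitrary example, not a recommendation), the two settings should agree like this:

```python
# If factory.json requests workers with "cores": 16, the matching
# setting in ecf_calculator.py should read:
lib_resources = {'cores': 16}
```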
In a new terminal with access to the shared filesystem (where the poncho package is), start the TaskVine factory:

```bash
bash run_factory.sh
```

Keep this factory process running for the duration of the analysis.
Input ROOT files must be organized into subdirectories. By default, the script expects this structure within a `samples/` directory:

```
samples/
├── dataset1/    # e.g., hgg
│   ├── file1.root
│   └── file2.root
├── dataset2/    # e.g., hgg_1
│   └── file3.root
└── ...
```
- Default Path: The script defaults to an example path like `/project01/ndcms/jzhou24/samples`. You will likely need to modify the `samples_path` variable inside `ecf_calculator.py` if your data resides elsewhere.
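For example, a minimal edit inside `ecf_calculator.py` (the exact location of the variable in the script may differ):

```python
# Point the input path at your own data directory.
samples_path = "/path/to/your/samples"
```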
From the main project directory (`nd-crc`), run the provided script to download the necessary datasets. The script places the data into the `samples/` subdirectory.

```bash
# Ensure you are in the 'nd-crc' project directory
# THIS SCRIPT IS ON THE WAY...
bash download_datasets.sh
```

This script handles downloading and extracting the required ROOT files.
- Preprocessing Time: Preprocessing (`--preprocess`) scans all files and can be slow with many datasets.
- Selective Preprocessing: To speed things up, temporarily move unused dataset directories out of the main data directory before running `--preprocess`.
- Adding New Datasets:
  - Create a new subdirectory in your main data directory (e.g., `samples/new_dataset`).
  - Place ROOT files inside it.
  - Re-run `--preprocess`.
Run the main script `ecf_calculator.py`:

```bash
python ecf_calculator.py [options]
```

- Preprocessing (required first, and after dataset changes): Scans the data directory and generates `samples_ready.json`.

  ```bash
  python ecf_calculator.py --preprocess
  ```
- Show Available Samples (optional): Lists datasets found in `samples_ready.json` (see the sketch after this list for inspecting the file directly).

  ```bash
  python ecf_calculator.py --show-samples
  ```
- Analysis Run (task generation and execution): Generates or loads the Dask task graph and executes it using TaskVine workers.

  ```bash
  # Process default 'hgg' dataset, ECF n<=3
  python ecf_calculator.py

  # Process a specific dataset
  python ecf_calculator.py --sub-dataset hgg_1

  # Process all datasets
  python ecf_calculator.py --all

  # Process all datasets, ECF n<=5
  python ecf_calculator.py --ecf-upper-bound 5 --all
  ```
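If you prefer to inspect the preprocessing output directly, here is a minimal sketch, assuming the top-level keys of `samples_ready.json` are the dataset names (the actual schema may differ):

```python
import json

# Peek at the datasets recorded by --preprocess; these names should
# be usable with --sub-dataset.
with open("samples_ready.json") as f:
    samples = json.load(f)

print(sorted(samples))
```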
Separate task graph generation from execution.

- Generate and Save Task Graph: Add `--checkpoint-to <filename.pkl>` to an analysis command. This saves the graph without executing it.

  ```bash
  # Generate tasks for all datasets (ECF n<=5) and save
  python ecf_calculator.py --all --ecf-upper-bound 5 --checkpoint-to tasks.pkl
  ```

- Load and Execute Task Graph: Use `--load-from <filename.pkl>` instead of dataset selection arguments.

  ```bash
  # Load the graph and execute
  python ecf_calculator.py --load-from tasks.pkl --temp-replica-count 2
  ```

`--checkpoint-to` and `--load-from` are mutually exclusive.
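The underlying pattern is plain graph serialization. Here is a minimal sketch, assuming `cloudpickle` (installed earlier) and that `tasks.pkl` simply holds the serialized graph object; the script's actual on-disk format may differ:

```python
import cloudpickle

def save_graph(graph, path="tasks.pkl"):
    # Serialize the generated task graph once...
    with open(path, "wb") as f:
        cloudpickle.dump(graph, f)

def load_graph(path="tasks.pkl"):
    # ...then reload it for repeated runs with different tuning parameters.
    with open(path, "rb") as f:
        return cloudpickle.load(f)
```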
Fine-tune TaskVine behavior during task execution.

Manager Connection & Run Logging:

- `--manager-name <name>`: Connect to a specific TaskVine manager (default: `{user}-hgg7`).
- `--run-info-path <path>`: Directory for TaskVine logs/reports. In some environments the script defaults to a pre-configured shared path (e.g., on AFS) if it is available and writable; otherwise, or if the given path is inaccessible, it falls back to `./vine-run-info`.
- `--template <template_name>`: Create a subdirectory `<template_name>` within the effective `run_info_path` for logs/reports.
- `--enforce-template`: Overwrite the `--template` directory if it already exists.

General Tuning:

- `--wait-for-workers <seconds>`: How long to wait for workers before starting.
- `--max-workers <count>`: Cap the number of workers (default: unlimited).

Fault Tolerance:

- `--temp-replica-count <count>`: Number of replicas for intermediate files (default: 1).
- `--checkpoint-threshold <seconds>`: Minimum time in seconds before intermediate results are considered for checkpointing (default: 30).
- `--enforce-worker-eviction-interval <seconds>`: Testing only; forcibly evict workers at the given interval.

Performance & Scheduling:

- `--load-balancing`: Enable dynamic load balancing.
- `--prune-depth <depth>`: Dask graph pruning depth (default: 0).
- `--scheduling-mode <mode>`: Task scheduling strategy (default: `LIFO`).
- Minimal Workflow:

  ```bash
  # 1. Preprocess
  python ecf_calculator.py --preprocess

  # 2. Run analysis
  python ecf_calculator.py
  ```

- Analyze a Specific Dataset with a Higher ECF Bound:

  ```bash
  python ecf_calculator.py --sub-dataset hgg_2 --ecf-upper-bound 6
  ```

- Workflow Using Checkpointing:

  ```bash
  # 1. Preprocess (if needed)
  python ecf_calculator.py --preprocess

  # 2. Generate and save task graph
  python ecf_calculator.py --all --ecf-upper-bound 5 --checkpoint-to tasks.pkl

  # 3. Load graph and run
  python ecf_calculator.py --load-from tasks.pkl --temp-replica-count 3 --template run_rep3 --manager-name my-manager
  ```

- Batch Experiments: See the `run_batch_*.sh` scripts for examples that use `--load-from` and iterate through tuning parameters, with `--template` separating the logs of each run.
Analysis results (Parquet files) are saved in `output/`, organized by dataset:

```
output/
├── dataset1/    # e.g., output/hgg/
│   ├── part-0.parquet
│   └── ...
├── dataset2/    # e.g., output/hgg_1/
│   └── ...
└── ...
```
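To inspect one dataset's results, here is a minimal sketch, assuming `pandas` with a `pyarrow` backend is available in your environment (the column set depends on the ECF configuration you ran):

```python
import pandas as pd

# Reads every part-*.parquet file under the dataset directory.
df = pd.read_parquet("output/hgg")

print(df.columns.tolist())
print(len(df), "rows")
```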
The following batch scripts correspond to the experiments presented in the accompanying paper. They are designed to test different aspects of TaskVine's performance and fault tolerance features.
Prerequisite: Before running any batch script, ensure you have generated the necessary task graph file (typically `tasks.pkl`). This is done with the `--checkpoint-to` argument of `ecf_calculator.py`, for example:

```bash
# Generate tasks for all datasets (ECF n<=5) and save to tasks.pkl
python ecf_calculator.py --all --ecf-upper-bound 5 --checkpoint-to tasks.pkl
```

These scripts evaluate TaskVine's checkpointing mechanism and data replication for fault tolerance:

```bash
# Run replication experiments (varying --temp-replica-count)
bash run_batch_replication.sh

# Run checkpointing experiments (varying --checkpoint-threshold)
bash run_batch_checkpoint.sh
```

This script tests the dynamic load balancing feature:

```bash
# Run load balancing experiment (enabling --load-balancing)
bash run_batch_load_balancing.sh
```

This script investigates storage usage during computation:

```bash
# Run the storage consumption experiment
bash run_batch_storage_consumption.sh
```

Each batch script typically runs `ecf_calculator.py` multiple times with different tuning parameters, loading the task graph from `tasks.pkl` and using the `--template` argument to organize logs for each run configuration within the TaskVine run info directory.
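In Python terms, each batch script amounts to a loop like the following sketch (the flag values and template names here are illustrative, not the scripts' actual settings):

```python
import subprocess

# Re-run the saved task graph under several replica counts,
# separating the logs of each configuration via --template.
for replicas in (1, 2, 3):
    subprocess.run(
        ["python", "ecf_calculator.py",
         "--load-from", "tasks.pkl",
         "--temp-replica-count", str(replicas),
         "--template", f"replicas_{replicas}"],
        check=True,
    )
```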
After running the batch experiments, use the provided Jupyter notebooks to visualize the results:
- `analyze_fault_tolerance.ipynb`: Plots and analyzes the results of the checkpointing and replication experiments (`run_batch_checkpoint.sh`, `run_batch_replication.sh`).
- `analyze_storage_consumption.ipynb`: Plots and analyzes the results of the storage consumption experiment (`run_batch_storage_consumption.sh`).
Ensure your Jupyter environment has access to the necessary Python packages (e.g., `matplotlib`, `pandas`) and can read the log files generated by the batch runs within the TaskVine run info directory (specified by `--run-info-path` and organized by `--template`).
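Here is a minimal sketch for locating those per-run log directories from a notebook, assuming the default `./vine-run-info` location and `--template` naming:

```python
from pathlib import Path

# List one log directory per batch-run configuration.
run_info = Path("vine-run-info")
for run_dir in sorted(p for p in run_info.iterdir() if p.is_dir()):
    print(run_dir)
```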
- Preprocessing: Run `--preprocess` whenever `samples_ready.json` is missing or the data directory changes. Preprocessing only creates the JSON file.
- TaskVine Setup: Requires a running TaskVine manager and connected workers.
- Paths: Adapt paths for your environment: input data (`samples_path` in `ecf_calculator.py`), TaskVine run info (`--run-info-path`; defaults may vary by environment), and the poncho package location (shared filesystem).
- Output/Log Directories: `output/` and the effective TaskVine run info directory (`--run-info-path`) are created automatically.
- `--show-samples`: Use it to confirm dataset names before passing `--sub-dataset`.
- `--sporadic-failure`: Not an argument of `ecf_calculator.py`; it is likely intended for external TaskVine worker testing.
- Resource Matching: Keep `cores` in `factory.json` and `lib_resources` in `ecf_calculator.py` synchronized.