Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions configs/other_software/batch_system/slurm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,30 @@ computer:
output_flags: "--output=${thisrun_logfile} --error=${thisrun_logfile}"
name_flag: "--job-name=${expid}"

taskset: false
hetjob_strategy: hetjob
mt_launcher_flag: ""

add_choose_heterogeneous_parallelization:
true:
cpu_bind: "none"
choose_taskset:
choose_hetjob_strategy:
# Support for old taskset heterogeneous parallelization approach
true:
taskset:
srun_execution_command: "${debugger_flags_prelauncher} ${launcher} ${launcher_flags} ${mt_launcher_flag}--multi-prog ${config_files.hostfile}"
# Support for new packjob heterogeneous parallelization approach
false:

# kh 24.06.22 --hint=nomultithread is needed (in many cases) on levante to avoid hyperthreading
# Support for hetjob parallelization approach (heterogeneity handled with heterogeneous slurm job + srun steps, allows for heterogeneous resources)
hetjob:
launcher_flags_per_component: "
${mt_launcher_flag}
--nodes=@nnodes@
--ntasks=@nproc@
--ntasks-per-node=@nproc_per_node@
--cpus-per-task=@cpus_per_proc@
--export=ALL,OMP_NUM_THREADS=@omp_num_threads@"
srun_execution_command: "${debugger_flags_prelauncher} ${launcher} ${launcher_flags} \\\n@components@"
launcher_comp_sep: " \\\n:"
# Support for srun steps approach (heterogeneity handled in the srun command)
srunsteps:
# kh 24.06.22 --hint=nomultithread is needed (in many cases) on levante to avoid hyperthreading
launcher_flags_per_component: "
${mt_launcher_flag}
--nodes=@nnodes@
Expand Down
93 changes: 74 additions & 19 deletions src/esm_runscripts/batch_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ def job_is_still_running(self, jobid):
def add_pre_launcher_lines(self, config, cluster, runfile):
    """
    Forwards the call to the ``add_pre_launcher_lines`` method of the batch system
    object stored in ``self.bs`` and returns whatever that method returns.

    Parameters
    ----------
    config : dict
        Configuration dictionary, passed through unchanged.
    cluster : str
        Type of job cluster, passed through unchanged.
    runfile
        Run file object, passed through unchanged.
    """
    backend = self.bs
    return backend.add_pre_launcher_lines(config, cluster, runfile)

# TODO: remove it once it's not needed anymore (substituted by packjob)
def write_het_par_wrappers(self, config):
    """
    Forwards the call to the ``write_het_par_wrappers`` method of the batch system
    object stored in ``self.bs`` and returns whatever that method returns.

    Parameters
    ----------
    config : dict
        Configuration dictionary, passed through unchanged.
    """
    backend = self.bs
    return backend.write_het_par_wrappers(config)

Expand Down Expand Up @@ -375,26 +374,35 @@ def get_run_commands(config, subjob, batch_or_shell): # here or in compute.py?

commands = []
if subjob.startswith("compute"):
if config["general"].get("submit_to_batch_system", True):
batch_system = config["computer"]
if "execution_command" in batch_system:
commands.append(
"time "
+ batch_system["execution_command"]
+ f" 2>&1{config['computer'].get('write_execution_log', '')} &"
)
if config["general"].get("multi_srun"):
return self.bs.get_run_commands_multisrun(config, commands)
submit_to_batch_system = config["general"].get(
"submit_to_batch_system", True
)
write_execution_log = config["computer"].get(
"write_execution_log", ""
)

if submit_to_batch_system:
execution_command_list = config["computer"].get(
"execution_command_list",
[config["computer"].get("execution_command")]
)
else:
execution_command_list = []
for model in config:
if model == "computer":
continue
if "execution_command" in config[model]:
commands.append(
"time ./"
+ config[model]["execution_command"]
+ f" 2>&1{config['computer'].get('write_execution_log', '')} &"
execution_command_list.append(
config[model]["execution_command"]
)

for execution_command in execution_command_list:
commands.append(
f"time {execution_command} 2>&1{write_execution_log} &"
)

if submit_to_batch_system and config["general"].get("multi_srun"):
return self.bs.get_run_commands_multisrun(config, commands)
else:
subjob_tasks = dataprocess.subjob_tasks(config, subjob, batch_or_shell)
for task in subjob_tasks:
Expand Down Expand Up @@ -460,10 +468,9 @@ def write_simple_runscript(config, cluster, batch_or_shell="batch"):
if batch_or_shell == "batch":

config = batch_system.calculate_requirements(config, cluster)
# TODO: remove it once it's not needed anymore (substituted by packjob)
if cluster in reserved_jobtypes and config["computer"].get(
"taskset", False
):
"hetjob_strategy", "hetjob"
) == "taskset":
config = config["general"]["batch"].write_het_par_wrappers(config)
# Prepare launcher
config = config["general"]["batch"].prepare_launcher(config, cluster)
Expand Down Expand Up @@ -703,7 +710,7 @@ def find_openmp(config):
config[model]["nproc"] = 1
return config

def het_par_launcher_lines(self, config, cluster):
def hetjob_single_launcher_command(self, config, cluster):
"""
Loops through the components to generate job launcher flags and execution
commands, to be appended in substitution to the ``@components@`` tag, in
Expand Down Expand Up @@ -747,6 +754,54 @@ def het_par_launcher_lines(self, config, cluster):
.replace("@jobtype@", cluster)
)

def hetjob_concurrent_launcher_commands(self, config, cluster):
    """
    Builds one launcher command per component and stores the resulting list in
    ``config["computer"]["execution_command_list"]``.

    For every component in ``general.valid_model_names`` that defines an
    ``execution_command`` (or, failing that, an ``executable``), a copy of
    ``computer.execution_command`` is created in which the ``@components@`` tag is
    replaced by ``<launcher flags> ./<command> `` and the ``@jobtype@`` tag is
    replaced by ``cluster``. Components without a command, or whose command is the
    literal ``NONE``, are skipped.

    The commands are stored as-is: no trailing ``&`` is appended here.

    Parameters
    ----------
    config : dict
        Configuration dictionary containing information about the experiment and
        experiment directory. It is modified in place
        (``computer.execution_command_list`` is added or overwritten).
    cluster : str
        Type of job cluster.

    Returns
    -------
    None
    """
    component_lines = []
    # Loop through the components
    for model in config["general"]["valid_model_names"]:
        command = None
        # Read in the execution command, falling back to the executable name
        if "execution_command" in config[model]:
            command = config[model]["execution_command"]
        elif "executable" in config[model]:
            command = config[model]["executable"]

        # kh 24.06.22 workaround: filter hdmodel (its command is ``NONE``)
        if command and command != "NONE":
            launcher_flags = self.calc_launcher_flags(config, model, cluster)
            component_lines.append(f"{launcher_flags} ./{command} ")

    # One full launcher command per component, each derived from the same template
    execution_command = config["computer"]["execution_command"]
    config["computer"]["execution_command_list"] = [
        execution_command.replace("@components@", component).replace(
            "@jobtype@", cluster
        )
        for component in component_lines
    ]

@staticmethod
def calc_launcher_flags(config, model, cluster):
"""
Expand Down
29 changes: 18 additions & 11 deletions src/esm_runscripts/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from loguru import logger

import esm_parser
from esm_tools import user_note
from esm_tools import user_note, user_error


class Slurm:
Expand Down Expand Up @@ -69,19 +69,28 @@ def prepare_launcher(self, config, cluster):
current_hostfile = self.path + "_" + run_type
write_one_hostfile(current_hostfile, config)

if config["computer"].get(
heterogeneous_parallelization = config["computer"].get(
"heterogeneous_parallelization", False
) and not config["computer"].get("taskset", False):
# Prepare heterogeneous parallelization call
config["general"]["batch"].het_par_launcher_lines(config, cluster)
else:
)
hetjob_strategy = config["computer"].get("hetjob_strategy", "hetjob")
if not heterogeneous_parallelization or hetjob_strategy == "taskset":
# Standard/old way of running jobs with slurm
self.write_one_hostfile(self.path, config)

hostfile_in_work = (
config["general"]["work_dir"] + "/" + os.path.basename(self.path)
)
shutil.copyfile(self.path, hostfile_in_work)
elif hetjob_strategy == "hetjob" or hetjob_strategy == "srunsteps":
# Prepare heterogeneous parallelization call for hetjob (one srun command
# per binary)
config["general"]["batch"].hetjob_single_launcher_command(config, cluster)
else:
user_error(
"hetjob strategy",
f"``{hetjob_strategy}`` is not a valid one. Choose one among "
f"``hetjob`` (default), ``srunsteps`` or ``taskset``.",
)

return config

Expand Down Expand Up @@ -157,9 +166,8 @@ def add_pre_launcher_lines(self, config, cluster, runfile):
(``runfile.write("<your_line_here>")``).
"""

# TODO: remove it once it's not needed anymore (substituted by packjob)
if config["computer"].get("heterogeneous_parallelization", False):
if config["computer"].get("taskset", False):
if config["computer"].get("hetjob_strategy") == "taskset":
self.add_hostlist_file_gen_lines(config, runfile)

@staticmethod
Expand All @@ -185,9 +193,10 @@ def het_par_headers(config, cluster, headers):
for heterogeneous parallelization in SLURM.
"""
# Only modify the headers if ``heterogeneous_parallelization`` is ``True``
hetjob_strategy = config["computer"].get("hetjob_strategy", "hetjob")
if config["computer"].get(
"heterogeneous_parallelization", False
) and not config["computer"].get("taskset", False):
) and hetjob_strategy not in ["taskset", "srunsteps"]:
this_batch_system = config["computer"]
# Get the variables to be modified for the headers
nodes_flag = this_batch_system["nodes_flag"].split("=")[0]
Expand Down Expand Up @@ -223,7 +232,6 @@ def het_par_headers(config, cluster, headers):

return headers

# TODO: remove it once it's not needed anymore (substituted by packjob)
@staticmethod
def write_het_par_wrappers(config):
cores_per_node = config["computer"]["partitions"]["compute"]["cores_per_node"]
Expand Down Expand Up @@ -300,7 +308,6 @@ def write_het_par_wrappers(config):
config[model]["execution_command_het_par"] = execution_command_het_par
return config

# TODO: remove it once it's not needed anymore (substituted by packjob)
@staticmethod
def add_hostlist_file_gen_lines(config, runfile):
cores_per_node = config["computer"]["partitions"]["compute"]["cores_per_node"]
Expand Down
21 changes: 21 additions & 0 deletions utils/model_time.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/bin/bash
#
# Prints, for every compute log file of the simulations matching a pattern, one
# line with: <simulation name> <date range> <real time>.
#
# Log files are searched as
#   <simulation_pattern>/log/<simulation name>_*_compute_????????-????????_*.log
# and the real time is taken from the first ``real<TAB>...`` line of each file
# (presumably the output of the ``time`` prefix of the run command -- confirm).

usage() {
    echo "Usage: $0 <simulation_pattern>"
    echo "Example: $0 'rundir/test_*'"
}

# Help flag
if [[ "$1" == "--help" || "$1" == "-h" ]]; then
    usage
    exit 0
fi

# Without a pattern the glob below would be rooted at ``/log``
if [[ -z "$1" ]]; then
    usage >&2
    exit 1
fi

# Drop a trailing slash so that the last path component is never empty
SIM_PATTERN=${1%/}
# Only the last path component can appear in the log file name; leading
# directories (e.g. ``rundir/``) are part of the path only
SIM_NAME_PATTERN=${SIM_PATTERN##*/}
FILE_PATTERN="${SIM_PATTERN}/log/${SIM_NAME_PATTERN}_*_compute_????????-????????_*.log"

# A pattern without matches expands to nothing instead of to itself, so grep is
# never called on a non-existing file
shopt -s nullglob

# FILE_PATTERN is left unquoted on purpose: the shell has to expand its globs
for file in $FILE_PATTERN; do
    # Find real time in the file
    REAL_TIME=$(grep -oP 'real\t.*' "$file" | head -n 1 | awk '{print $2}')
    # Extract SIM: name of the directory that contains ``log/``
    SIM=$(basename "$(dirname "$(dirname "$file")")")
    # Extract date from the file name
    DATE=$(echo "$file" | grep -oP 'compute_\K[0-9]{8}-[0-9]{8}')
    echo "$SIM $DATE $REAL_TIME"
done
Loading