import itertools
import math
import numbers
import os
import re
import subprocess
from typing import Dict, Tuple, Union, List
class Job:
    """A SLURM job that is configured in Python and submitted through ``sbatch``."""

    # Class-level counter handing every Job a unique internal id (NOT the SLURM
    # job id) that Pipeline uses for dependency tracking before submission.
    _newid = itertools.count()

    def __init__(
        self,
        script: str,
        script_is_file: bool = True,
        array: Union[str, list, None] = None,
        cpus_per_task: int = -1,
        error_file: str = "",
        gpus: Union[dict, None] = None,
        gres: Union[dict, None] = None,
        logfile_directory: str = "",
        mail_address: str = "",
        mail_type: str = "",
        memory_size: Union[str, int] = "",
        name: str = "",
        nodes: int = -1,
        nodelist: str = "",
        output_file: str = "",
        partition: str = "",
        tasks: int = -1,
        time: str = "",
        workdir: str = "",
    ):
        """
        A SLURM job which is submitted using sbatch.

        Args:
            script (str): The script file or command which the job should execute.
            script_is_file (bool): If the script string is a command to execute directly
                instead of a bash script, set this to False. Defaults to True.
            array (list or str, optional): Optional array parameters to launch multiple jobs.
                When a list is provided a job will be executed with each parameter in the list.
                A string can be provided to allow array construction in the SLURM format.
            cpus_per_task (int, optional): Number of cpus for each task.
            error_file (str, optional): File path for the slurm error file.
            gpus (dict, optional): Specify the gpu requirements for the job. See also gres.
            gres (dict, optional): Specify the gres requirements for the jobs.
                See :func:`slurmpie.slurmpie.Job.gres` for the full specification.
            logfile_directory (str, optional): Set a base directory for the output and error files.
                If this is set, the full paths don't have to be specified for error_file
                and output_file.
            mail_address (str, optional): Mail address to send notifications to.
            mail_type (str, optional): Specify for which events a notification should be sent.
                One of: NONE, BEGIN, END, FAIL, REQUEUE, ALL.
            memory_size (str or int): Specify memory requirement for job.
                See :func:`slurmpie.slurmpie.Job.memory_size` for the specification.
            name (str, optional): The name of the job.
            nodes (int, optional): Number of nodes to use for the job.
            nodelist (str, optional): Request specific host nodes for job.
            output_file (str, optional): File path for the slurm output file.
            partition (str, optional): Name of the partition to which to submit the job.
            tasks (int, optional): Number of tasks.
            time (str, optional): The expected/maximum wall time for the job.
                Needs to be specified in the SLURM format, one of:
                "minutes", "minutes:seconds", "hours:minutes:seconds",
                "days-hours", "days-hours:minutes" and "days-hours:minutes:seconds".
            workdir (str, optional): The directory to change to at the start of job execution.

        Raises:
            RuntimeError: If the job could not be successfully executed.
        """
        # Backing fields for the property-managed attributes; they must exist
        # before the property setters below are invoked.
        self._array = ""
        self._gpus = ""
        self._gres = ""
        self._memory_size = ""
        self._memory_units = None
        self._nodelist = ""
        self._id = next(Job._newid)

        self.script = script
        self.script_is_file = script_is_file
        # None stands in for the empty default to avoid the mutable-default-argument
        # pitfall; behavior is identical to passing an empty list/dict.
        self.array = array if array is not None else ""
        self.cpus_per_task = cpus_per_task
        self.dependencies = ""
        if logfile_directory == "":
            self.error_file = error_file
        else:
            self.error_file = os.path.join(logfile_directory, error_file)
        self.gpus = gpus if gpus is not None else {}
        self.gres = gres if gres is not None else {}
        self.mail_address = mail_address
        self.name = name
        self.nodes = nodes
        # BUGFIX: previously the nodelist argument was silently dropped because
        # it was never assigned to the instance.
        self.nodelist = nodelist
        if logfile_directory == "":
            self.output_file = output_file
        else:
            self.output_file = os.path.join(logfile_directory, output_file)
        self.partition = partition
        self.tasks = tasks
        self.time = time
        self.workdir = workdir
        # Default to ALL notifications when an address is given without a type.
        if self.mail_address != "" and mail_type == "":
            self.mail_type = "ALL"
        else:
            self.mail_type = mail_type
        if memory_size != "":
            self.memory_size = memory_size

    @staticmethod
    def _format_argument_list(argument_value: str, to_add_argument_values: str) -> str:
        """
        Formats a multi-argument list for SLURM.

        Args:
            argument_value (str): The current value of the argument
            to_add_argument_values (str): The values to add

        Returns:
            str: Concatenated arguments, comma-separated when both are non-empty
        """
        if argument_value == "":
            return to_add_argument_values
        if to_add_argument_values == "":
            return argument_value
        return argument_value + "," + to_add_argument_values

    @property
    def memory_size(self) -> str:
        """
        The memory size to request for the job.

        Memory size can be set either as a float, then the default memory units
        for the SLURM configuration is used.
        Otherwise, the memory size can be specified as a string, including the units.
        For example `"15GB"` will set the requested memory to 15 GB.
        Supported memory units are K/KB for kilobyte, M/MB for megabyte,
        G/GB for gigabyte and T/TB for terabyte.
        """
        memory_size = self._memory_size
        if self._memory_units is not None:
            memory_size += self._memory_units
        return memory_size

    @property
    def memory_units(self) -> Union[str, None]:
        """
        The current memory units (single-letter SLURM suffix), or None when
        no units were specified.
        """
        return self._memory_units

    @staticmethod
    def _format_memory_size(memory_size: Union[int, str]) -> Tuple[str, Union[str, None]]:
        """Formats the memory size to the correct format.

        Memory size can be either given as a number (without units),
        or a string which includes the units.
        This function then correctly formats the memory size and units.

        Args:
            memory_size (int or str): The memory size to format

        Returns:
            Tuple[str, str or None]: The memory size and memory units
        """
        MEMORY_UNITS_MAP = {
            "K": "K",
            "M": "M",
            "G": "G",
            "T": "T",
            "KB": "K",
            "MB": "M",
            "GB": "G",
            "TB": "T",
        }
        memory_units = None
        if isinstance(memory_size, str):
            # Regex splits the leading number from the trailing unit characters.
            splitted_memory = list(filter(None, re.split(r"(\d*\.?\d*)", memory_size)))
            if len(splitted_memory) > 1:
                # The split yields one entry per non-digit character
                # (e.g. "GB" -> "G", "B"), so rejoin them before the lookup.
                memory_units = MEMORY_UNITS_MAP["".join(splitted_memory[1:]).strip()]
            # Parse as float so fractional sizes can be rounded up below
            memory_size = float(splitted_memory[0])
        # sbatch requires an integer size; round up rather than truncating
        # so the user never gets less memory than requested.
        memory_size = str(int(math.ceil(memory_size)))
        return memory_size, memory_units

    @memory_size.setter
    def memory_size(self, memory_size: Union[int, str]):
        memory_size, memory_units = self._format_memory_size(memory_size)
        self._memory_size = memory_size
        self._memory_units = memory_units

    @property
    def gres(self) -> str:
        """The gres resources to request for the job.

        The gres resources should be formatted as a (possibly nested) dict.
        For example: `job.gres = {"gpu":1}` requests one gpu from gres.
        `jobs.gres = {"gpu": {"k40": 1, "k80": 1}}` requests one k40 gpu
        and one k80 gpu.
        """
        return self._gres

    def _format_gres(self, gres_spec: dict) -> str:
        """
        Formats the gres specifications from a dict to a SLURM string.

        Args:
            gres_spec (dict): Requested gres resources

        Returns:
            str: SLURM formatted gres string
        """
        out_gres = ""
        # Sort keys case-insensitively so the output is reproducible.
        for key, value in sorted(gres_spec.items(), key=lambda x: x[0].lower()):
            this_gres = str(key) + ":"
            if isinstance(value, dict):
                # Nested dicts are recursively expanded and prefixed with the key,
                # e.g. {"gpu": {"k40": 1}} -> "gpu:k40:1".
                gres_types = self._format_gres(value).split(",")
                this_gres = [this_gres + i_type for i_type in gres_types]
                this_gres = ",".join(this_gres)
            else:
                this_gres += str(value)
            out_gres = self._format_argument_list(out_gres, this_gres)
        return out_gres

    @gres.setter
    def gres(self, gres_spec: dict):
        self._gres = self._format_argument_list(
            self._gres, self._format_gres(gres_spec)
        )

    def depends_on(self, job_id: Union[list, str], dependency_type="afterany"):
        """
        Sets the dependencies of this job based on the SLURM job number.

        When submitting a job that depends on another job this can be set
        using the job id of the job.

        Example:
            >>> from slurmpie import slurmpie
            >>> job = slurmpie.Job("slurm_script.sh")
            >>> dependent_job = slurmpie.Job("slurm_script_2.sh")
            >>> job_id = job.submit()
            >>> dependent_job.depends_on(job_id)
            >>> dependent_job.submit()

        The `dependent_job` will now only start running when `job` has finished.

        Args:
            job_id (list or str): The job id (or multiple ids as a list) on which the job depends.
            dependency_type (str, optional): The dependency type of the job (see
                sbatch documentation). Defaults to "afterany".
        """
        if isinstance(job_id, list):
            # str() each element so integer job ids are accepted as well.
            job_id = ":".join(str(i_id) for i_id in job_id)
        dependency = "{}:{}".format(dependency_type, job_id)
        self.dependencies = self._format_argument_list(self.dependencies, dependency)

    @property
    def array(self) -> str:
        """
        The values of the job array.

        SLURM supports submitting the same script with different parameters.
        Set the array either as a list of values or as a string.
        When specified as string it is directly parsed to SLURM, thus the string
        should already be in the SLURM format.
        When a list is specified it will parse all these values to the array
        setting.
        """
        return self._array

    @array.setter
    def array(self, array_spec: Union[list, str]):
        if isinstance(array_spec, list):
            array_spec = ",".join(str(i_spec) for i_spec in array_spec)
        self._array = self._format_argument_list(self._array, array_spec)

    @property
    def nodelist(self) -> str:
        """
        List of nodes to run the job on.

        When you want to run a job on a specific node (that is not specified by a queue),
        you can use this argument to specify the exact nodes you want to run
        the job on.
        """
        return self._nodelist

    @nodelist.setter
    def nodelist(self, nodelist_spec: str):
        self._nodelist = nodelist_spec

    @property
    def gpus(self) -> str:
        """
        The gpus to request from the SLURM jobs.

        Just like the gres resources, one can request the gpu resources for the job.
        This is configuration dependent, so make sure your cluster supports this.
        The configuration has to be applied in the same way as for gres.
        """
        return self._gpus

    @gpus.setter
    def gpus(self, gpu_spec: dict):
        self._gpus = self._format_argument_list(self._gpus, self._format_gres(gpu_spec))

    @staticmethod
    def attribute_is_empty(attribute_value: Union[str, numbers.Number, dict, list]) -> bool:
        """
        Checks whether an attribute is empty.

        An attribute counts as empty when it holds this module's "unset"
        sentinel for its type: "" for strings, -1 for numbers, {} for dicts
        and [] for lists.

        Args:
            attribute_value (str, numbers.Number, dict, or list): The value to check

        Returns:
            bool: True if attribute is empty, False otherwise.
        """
        if isinstance(attribute_value, str) and attribute_value == "":
            return True
        elif isinstance(attribute_value, numbers.Number) and attribute_value == -1:
            return True
        elif isinstance(attribute_value, dict) and attribute_value == {}:
            return True
        elif isinstance(attribute_value, list) and attribute_value == []:
            return True
        else:
            return False

    def _format_sbatch_command(self) -> list:
        """
        Formats the command for sbatch from the job settings.

        Returns:
            list: Formatted command with one argument per item.
        """
        # Maps instance attribute names to the corresponding sbatch long options.
        command_mapping = {
            "array": "array",
            "cpus_per_task": "cpus-per-task",
            "dependencies": "dependency",
            "error_file": "error",
            "gpus": "gpus",
            "gres": "gres",
            "mail_address": "mail-user",
            "mail_type": "mail-type",
            "memory_size": "mem",
            "name": "job-name",
            "nodes": "nodes",
            "output_file": "output",
            "partition": "partition",
            "tasks": "ntasks",
            "time": "time",
            "workdir": "chdir",
            "nodelist": "nodelist"
        }
        # We set parsable to easily get the job id from stdout.
        command = ["sbatch", "--parsable"]
        # Sort for a reproducible argument order.
        for attribute_name, bash_argument in sorted(
            command_mapping.items(), key=lambda x: x[0].lower()
        ):
            attribute_value = getattr(self, attribute_name)
            if not self.attribute_is_empty(attribute_value):
                command.append("--{}={}".format(bash_argument, attribute_value))
        if self.script_is_file:
            command.append(self.script)
        else:
            # Direct commands are passed through sbatch --wrap.
            command.append("--wrap={}".format(self.script))
        return command

    def submit(self) -> str:
        """
        Submit the job using `sbatch`.

        Raises:
            RuntimeError: If `sbatch` returns an error

        Returns:
            str: The job number of the submitted job if successful
        """
        # The submission is not currently tested, since it requires a slurm install
        # Perhaps a docker with slurm pre-installed is a good idea in this case
        sbatch_command = self._format_sbatch_command()
        sbatch_process = subprocess.run(
            sbatch_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        if sbatch_process.returncode != 0:
            # Decode stderr so the error message is readable text, not b'...'.
            err_msg = "Sbatch job submission failed with the following error:\n{}"
            raise RuntimeError(err_msg.format(sbatch_process.stderr.decode("utf-8")))
        # --parsable prints "jobid[;cluster]"; keep only the job id.
        return sbatch_process.stdout.decode("utf-8").strip().split(":")[0]
[docs]class Pipeline:
"""
Examples:
Simple pipeline in which jobs are added consecutively.
.. literalinclude:: /examples/example_simple_pipeline.py
"""
[docs] def __init__(self, common_job_header=None, **kwargs):
"""
Pipeline to be constructed with multiple jobs depending on each other.
This pipeline makes it easier to create multiple job that depend on each
other and submit them all at ones. Jobs can be added to the pipeline
with different dependencies and the pipeline can then be submitted
as a whole, which will take care off the dependencies between the
different jobs.
Args:
common_job_header (str): In case the command to execute is
direct command (not a file), this will be prepended to every job.
kwargs: Arguments that will be specified for each job,
if that argument has not been set for the job already.
"""
# kwargs will be forwared to individual jobs
self.job_args = kwargs
self.pipeline_jobs = list()
self._job_graph = {-1: []}
self.common_job_header = common_job_header
def _update_job_graph(self, parent_id: Union[str, list], jobs: dict):
"""
Add new jobs to the graph to execute.
Args:
parent_id (str): The ID of the parent job (not submission ID!).
jobs (dict): Jobs that depend on the parent job, with specified dependency type.
"""
if parent_id == -1:
self._job_graph[parent_id].extend(list(itertools.chain(*jobs.values())))
else:
if isinstance(parent_id, str) or isinstance(parent_id, int):
parent_id = [parent_id]
for i_key, i_value in jobs.items():
for i_job in i_value:
if i_job not in self._job_graph:
self._job_graph[i_job] = {i_key: parent_id}
elif i_key not in self._job_graph[i_job]:
self._job_graph[i_job][i_key] = parent_id
else:
self._job_graph[i_job][i_key].extend(parent_id)
[docs] def add(self, jobs: Union[Job, Dict[str, list]], parent_job: Union[Job, list] = None):
"""
Add dependency jobs to the pipeline.
Jobs can keep being added, in which case they are execute consecutively.
Otherwise, a dict can be specified with the job dependency type and list
of the jobs with that dependency type.
A parent job can be set if the jobs depend on a certain parent job.
Otherwise the jobs will just be added to the end of the list and are
executed consecutively.
Args:
jobs (Job or dict): The jobs to add to the pipeline. Either a single job
which will be added to the end of the pipeline, or a dict specifying the
dependency type and a list with the dependent jobs
parent_job (Job or list, optional): If not None, will use this as the job on which the
`jobs` are dependent, a list in case of multiple parent jobs. Defaults to None.
"""
# We accept a single job as ease-of-use for the user
# Convert it here to make the operation consistent
if isinstance(jobs, Job):
jobs = {"afterany": [jobs]}
if parent_job is not None:
if isinstance(parent_job, Job):
parent_id = parent_job._id
elif isinstance(parent_job, list):
parent_id = [i_parent_job._id for i_parent_job in parent_job]
elif len(self.pipeline_jobs) > 0:
parent_id = self.pipeline_jobs[-1]._id
else:
parent_id = -1
for i_job_dependency_type, i_job_list in jobs.items():
for i_job in i_job_list:
# Set arguments from the pipeline if the job does not have them
# If the job does have them, they take precedence over what is defined
# in the pipeline.
for attribute_name, attribute_value in self.job_args.items():
if i_job.attribute_is_empty(getattr(i_job, attribute_name)):
setattr(i_job, attribute_name, attribute_value)
if self.common_job_header is not None and not i_job.script_is_file:
i_job.script = self.common_job_header + " " + i_job.script
self.pipeline_jobs.append(i_job)
self._update_job_graph(parent_id, jobs)
[docs] def add_start_job(self, jobs: Union[Job, list]):
"""
Add a job to the start of the pipeline
Args:
jobs (Job or list): A single job or list of jobs that should be
executed at the start of the pipeline.
"""
if isinstance(jobs, Job):
jobs = [jobs]
self._update_job_graph(-1, {"begin": jobs})
[docs] def submit(self):
"""
Submit all the jobs in the pipeline.
Raises:
RecursionError: If the pipeline cannot be properly executed because of
misformatted dependencies.
"""
# First submit all the start jobs
submission_info = dict()
prev_job_graph_len = len(self._job_graph)
start_jobs = self._job_graph.pop(-1)
for i_start_job in start_jobs:
i_submitted_id = i_start_job.submit()
submission_info[i_start_job._id] = i_submitted_id
cur_job_graph_len = len(self._job_graph)
while cur_job_graph_len > 0 and cur_job_graph_len < prev_job_graph_len:
submitted_jobs = list()
for job, job_dependencies in self._job_graph.items():
flat_dependencies = set(itertools.chain(*job_dependencies.values()))
if flat_dependencies.issubset(set(submission_info.keys())):
for dependency_type, parent_id in job_dependencies.items():
parent_submitted_job_ids = [
submission_info[i_id] for i_id in parent_id
]
job.depends_on(parent_submitted_job_ids, dependency_type)
job_submission_id = job.submit()
submission_info[job._id] = job_submission_id
submitted_jobs.append(job)
for i_submitted_job in submitted_jobs:
self._job_graph.pop(i_submitted_job)
prev_job_graph_len = cur_job_graph_len
cur_job_graph_len = len(self._job_graph)
if cur_job_graph_len > 0:
raise RecursionError(
"You have set impossible to execute job dependencies!\nPlease check your pipeline."
)
class System:
    """Query information about the SLURM environment of the current process."""

    def __init__(self):
        """
        Get system information of the SLURM cluster
        """

    def get_job_memory(self) -> Union[int, None]:
        """
        Get the memory available to the job.

        Returns:
            int or None: The memory size in kilobytes, None if the SLURM
                memory amount cannot be determined
        """
        # SLURM exports the per-node memory allocation in this variable;
        # outside a SLURM allocation it is absent.
        raw_value = os.environ.get("SLURM_MEM_PER_NODE")
        return int(raw_value) if raw_value is not None else None