Commit 69ab62f (parent 732a57d)

Debugged for all combinations of (_LIMIT_WORKERS_RAM, _CHAINED_CONSTRAINTS), documentation extended

File tree: 2 files changed, +102 −77 lines

README.md

Lines changed: 66 additions & 61 deletions
@@ -1,12 +1,12 @@
 # PyExPool
 
-A Lightweight Multi-Process Execution Pool to schedule Jobs execution with *per-job timeout*, optionally grouping them into Tasks and specifying optional execution parameters considering NUMA architecture:
+A Lightweight Multi-Process Execution Pool to schedule Jobs execution with *per-job timeout*, optionally grouping them into Tasks and specifying optional execution parameters considering NUMA architecture peculiarities:
 
 - automatic CPU affinity management and maximization of the dedicated CPU cache for a worker process
-- automatic rescheduling and balancing (reduction) of the worker processes and on low memory condition for the in-RAM computations (requires [psutil](https://pypi.python.org/pypi/psutil), can be disabled)
-- chained termination of related worker processes and jobs rescheduling to satisfy timeout and memory limit constraints
+- automatic rescheduling and *load balancing* (reduction) of the worker processes on low memory condition for the *in-RAM computations* (requires [psutil](https://pypi.python.org/pypi/psutil), can be disabled)
+- *chained termination* of related worker processes and jobs rescheduling to satisfy *timeout* and *memory limit* constraints
 - timeout per each Job (it was the main initial motivation to implement this module, because this feature is not provided by any Python implementation out of the box)
-- onstart/ondone callbacks, ondone is called only on successful completion (not termination) for both Jobs and Tasks (group of jobs)
+- onstart/ondone *callbacks*, ondone is called only on successful completion (not termination) for both Jobs and Tasks (groups of jobs)
 - stdout/err output, which can be redirected to any custom file or PIPE
 - custom parameters for each Job and respective owner Task besides the name/id
 
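For contrast with the per-job timeout feature highlighted in this list, the closest stdlib analogue is the `timeout` argument of `subprocess` calls, which covers a single external command but provides none of the pooling, affinity management, or balancing above. A minimal sketch (not part of PyExPool):

```python
import subprocess
import sys

# Run a trivial external command with a 10-second deadline; subprocess.call
# raises TimeoutExpired if the command does not finish in time.
try:
    rc = subprocess.call([sys.executable, "-c", "pass"], timeout=10)
except subprocess.TimeoutExpired:
    rc = -1  # treat a timeout like a failed job
print(rc)  # 0: the trivial command completes well within the timeout
```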

@@ -16,16 +16,19 @@ Implemented as a *single-file module* to be *easily included into your project a
 The main purpose of this single-file module is the **asynchronous execution of modules and external executables with cache / parallelization tuning and automatic balancing of the worker processes for the in-RAM computations**.
 In case asynchronous execution of the *Python functions* is required and usage of external dependencies is not a problem, or automatic jobs scheduling for in-RAM computations is not required, then a more handy and straightforward approach is to use the [Pebble](https://pypi.python.org/pypi/Pebble) library.
 
+The load balancing is enabled when the global variables `_LIMIT_WORKERS_RAM` and `_CHAINED_CONSTRAINTS` are set and the jobs' categories and relative sizes (if known) are specified. The balancing uses as much RAM and CPU resources as possible for the in-RAM computations while meeting the timeout, memory limit, and CPU cache (process affinity) constraints. Large executing jobs are rescheduled for later execution with fewer worker processes after the completion of smaller jobs; the number of workers is reduced (balanced) automatically while the jobs queue is processed. If possible, add jobs in order of increasing memory/time complexity to reduce the number of worker process terminations caused by job postponing on rescheduling.
+
 \author: (c) Artem Lutov <artem@exascale.info>
 \organizations: [eXascale Infolab](http://exascale.info/), [Lumais](http://www.lumais.com/), [ScienceWise](http://sciencewise.info/)
-\date: 2016-01
+\date: 2017-06
 
 ## Content
 - [Dependencies](#dependencies)
 - [API](#api)
 - [Job](#job)
 - [Task](#task)
 - [ExecPool](#execpool)
+- [Accessory Routines](#accessory-routines)
 - [Usage](#usage)
 - [Usage Example](#usage-example)
 - [Failsafe Termination](#failsafe-termination)
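The scheduling recommendation in the paragraph added above (submit jobs in order of increasing memory/time complexity) amounts to a simple sort on the estimated job size before submission. A sketch with illustrative job tuples, not part of the PyExPool API:

```python
# Hypothetical (name, estimated relative size) pairs for jobs to be scheduled.
pending = [("big", 100), ("small", 1), ("medium", 10)]

# Submitting smaller jobs first reduces the number of worker terminations
# caused by postponing large jobs on rescheduling.
order = [name for name, size in sorted(pending, key=lambda js: js[1])]
print(order)  # ['small', 'medium', 'big']
```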
@@ -47,7 +50,7 @@ $ sudo pip install mock
 
 ## API
 
-Flexible API provides *automatic CPU affinity management, maximization of the dedicated CPU cache, limitation of the minimal dedicated RAM per worker process, balancing of the worker processes and rescheduling of chains of related jobs on low memory condition for the in-RAM computations*, optional automatic restart of jobs on timeout, access to job's process, parent task, start and stop execution time and more...
+Flexible API provides *automatic CPU affinity management, maximization of the dedicated CPU cache, limitation of the minimal dedicated RAM per worker process, balancing of the worker processes and rescheduling of chains of the related jobs on low memory condition for the in-RAM computations*, optional automatic restart of jobs on timeout, access to the job's process, parent task, start and stop execution time, and more...
 `ExecPool` represents a pool of worker processes to execute `Job`s that can be grouped into `Task`s for more flexible management.
 
 ### Job
@@ -143,6 +146,63 @@ Task(name, timeout=0, onstart=None, ondone=None, params=None, stdout=sys.stdout,
 ```
 ### ExecPool
 ```python
+ExecPool(wksnum=cpu_count(), afnstep=None, vmlimit=0., latency=0.)
+    """Multi-process execution pool of jobs
+
+    wksnum  - number of resident worker processes, >= 1. The reasonable value is
+        <= NUMA nodes * node CPUs, which is typically returned by cpu_count(),
+        where node CPUs = CPU cores * HW threads per core.
+        To guarantee a minimal average RAM per process, for example 2.5 GB:
+        wksnum = min(cpu_count(), max(ramfracs(2.5), 1))
+    afnstep  - affinity step, integer if applied. Used to bind a worker to the
+        processing units to have a warm cache for single-threaded workers.
+        Typical values:
+        None  - do not use affinity at all (recommended for multi-threaded workers),
+        1  - maximize parallelization (the number of worker processes = CPU units),
+        cpucorethreads()  - maximize the dedicated CPU cache (the number of
+            worker processes = CPU cores = CPU units / HW threads per CPU core).
+        NOTE: specification of afnstep might cause a reduction of the number of workers.
+    vmlimit  - limit of the total amount of VM in GB (automatically reduced to the
+        amount of physical RAM if a larger value is specified) that can be used by
+        the worker processes to provide in-RAM computations; >= 0.
+        Dynamically reduces the number of workers to keep the total virtual memory
+        consumption not more than specified. The workers are rescheduled starting
+        from the most memory-heavy processes.
+        NOTE:
+        - applicable only if _LIMIT_WORKERS_RAM
+        - 0 means unlimited (some jobs might be [partially] swapped)
+        - a value > 0 is automatically limited by the total physical RAM to process
+            the jobs in RAM almost without swapping
+    latency  - approximate minimal latency of the workers monitoring in sec, float >= 0.
+        0 means an automatically defined value (recommended, typically 2-3 sec).
+    """
+
+execute(job, async=True):
+    """Schedule the job for execution
+
+    job  - the job to be executed, an instance of Job
+    async  - asynchronous execution, or wait until the execution is completed
+        NOTE: sync jobs are started at once
+    return  - 0 on successful execution, the process return code otherwise
+    """
+
+join(timeout=0):
+    """Execution cycle
+
+    timeout  - execution timeout in seconds before the workers termination, >= 0.
+        0 means absence of the timeout. The time is measured SINCE the first job
+        was scheduled UNTIL the completion of all scheduled jobs.
+    return  - True on graceful completion, False on termination by the specified timeout
+    """
+
+__del__():
+    """Force termination of the pool"""
+
+__finalize__():
+    """Force termination of the pool"""
+```
+### Accessory Routines
+```
 def ramfracs(fracsize):
     """Evaluate the minimal number of RAM fractions of the specified size in GB
 
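The `wksnum` guideline in the docstring above can be computed directly. `ramfracs` belongs to mpepool itself, so this sketch substitutes a stand-in that assumes 8 GB of RAM purely for illustration:

```python
from multiprocessing import cpu_count

def ramfracs_stub(fracsize, ram_gb=8.0):
    """Stand-in for mpepool's ramfracs(): how many fractions of fracsize GB fit
    into the (assumed) amount of physical RAM."""
    return int(ram_gb / fracsize)

# Guarantee ~2.5 GB of RAM per worker while never exceeding the CPU count.
wksnum = min(cpu_count(), max(ramfracs_stub(2.5), 1))
assert 1 <= wksnum <= cpu_count()
print(wksnum)
```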
@@ -186,63 +246,8 @@ def afnicpu(iafn, corethreads=1, nodes=1, crossnodes=True):
     return CPU index respective to the specified index in the affinity table
     """
 
-ExecPool(workers=cpu_count(), afnstep=None, vmlimit=0., latency=0.)
-    """Multi-process execution pool of jobs
-    ... (the remaining removed lines duplicate the ExecPool / execute / join /
-    __del__ / __finalize__ descriptions added above in the dedicated "ExecPool"
-    section, with the `workers` parameter renamed to `wksnum`) ...
-    """
 ```
 
-
 ## Usage
 
 The target Python version is 2.7+ including 3.x; it also works fine on PyPy.

mpepool.py

Lines changed: 36 additions & 16 deletions
@@ -2,25 +2,45 @@
 # -*- coding: utf-8 -*-
 
 """
-\descr: Multi-Process Execution Pool to schedule Jobs execution with per-Job timeout,
-    optionally grouping them into Tasks and specifying execution paremeters:
-    - timeout per each Job (it was the main motivation to implemtent this module)
-    - onstart/ondone callbacks, ondone is called only on successful completion (not termination)
-    - stdout/err output, which can be redireted to any custom file or PIPE
-    - custom parameters for each job and task besides the name/id
+\descr: Multi-Process Execution Pool to schedule Jobs execution with per-job timeout,
+    optionally grouping them into Tasks and specifying optional execution parameters
+    considering the NUMA architecture:
+    - automatic CPU affinity management and maximization of the dedicated CPU cache
+        for a worker process
+    - automatic rescheduling and balancing (reduction) of the worker processes on
+        low memory condition for the in-RAM computations (requires psutil, can be disabled)
+    - chained termination of related worker processes and jobs rescheduling to satisfy
+        timeout and memory limit constraints
+    - timeout per each Job (it was the main initial motivation to implement this module,
+        because this feature is not provided by any Python implementation out of the box)
+    - onstart/ondone callbacks, ondone is called only on successful completion
+        (not termination) for both Jobs and Tasks (groups of jobs)
+    - stdout/err output, which can be redirected to any custom file or PIPE
+    - custom parameters for each Job and respective owner Task besides the name/id
 
     Flexible API provides optional automatic restart of jobs on timeout, access to the job's process,
     parent task, start and stop execution time and much more...
 
-    Global functionality parameters:
+
+    Core parameters specified as global variables:
     _LIMIT_WORKERS_RAM  - limit the amount of virtual memory (<= RAM) used by worker processes,
         requires psutil import
     _CHAINED_CONSTRAINTS  - terminate related jobs on terminating any job by the execution
        constraints (timeout or RAM limit)
 
+    The load balancing is enabled when the global variables _LIMIT_WORKERS_RAM and
+    _CHAINED_CONSTRAINTS are set and the jobs' categories and relative sizes (if known)
+    are specified. The balancing uses as much RAM and CPU resources as possible for the
+    in-RAM computations while meeting the timeout, memory limit, and CPU cache (process
+    affinity) constraints.
+    Large executing jobs are rescheduled for later execution with fewer worker processes
+    after the completion of smaller jobs. The number of workers is reduced (balanced)
+    automatically while the jobs queue is processed. It is recommended to add jobs in
+    order of increasing memory/time complexity if possible, to reduce the number of
+    worker process terminations caused by job postponing on rescheduling.
+
 \author: (c) Artem Lutov <artem@exascale.info>
 \organizations: eXascale Infolab <http://exascale.info/>, Lumais <http://www.lumais.com/>, ScienceWise <http://sciencewise.info/>
-\date: 2015-07
+\date: 2015-07 (v1), 2017-06 (v2)
 """
 
 from __future__ import print_function, division  # Required for stderr output, must be the first import
@@ -279,13 +299,13 @@ def __init__(self, name, workdir=None, args=(), timeout=0, ontimeout=False, task
         self._omitafn = omitafn
         if _LIMIT_WORKERS_RAM or _CHAINED_CONSTRAINTS:
             self.size = size  # Size of the processing data
+            # Consumed VM on execution in gigabytes or the least expected (inherited from the
+            # related jobs having the same category and non-smaller size)
+            self.vmem = 0.
         if _CHAINED_CONSTRAINTS:
             self.category = category  # Job category
             self.slowdown = slowdown  # Execution slowdown ratio, ~ 1 / exec_speed
         if _LIMIT_WORKERS_RAM:
-            # Consumed VM on execution in gigabytes or the least expected (inherited from the
-            # related jobs having the same category and non-smaller size)
-            self.vmem = 0.
             self.wkslim = None  # Worker processes limit (max number) on the job postponing if any
 
 
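A minimal sketch of what this hunk changes: `vmem` moves from the `_LIMIT_WORKERS_RAM`-only branch into the branch taken when either flag is set, so the attribute exists for every flag combination. The class below is an illustrative reduction, not mpepool's actual `Job`:

```python
class JobSketch:
    """Illustrative reduction of the Job.__init__ attribute setup."""
    def __init__(self, size=0, category=None, limit_ram=False, chained=False):
        if limit_ram or chained:
            self.size = size
            self.vmem = 0.  # now defined whenever either constraint flag is set
        if chained:
            self.category = category
        if limit_ram:
            self.wkslim = None

# Before the fix, this combination (chained constraints only, no RAM limit)
# left vmem undefined on the job.
job = JobSketch(size=5, chained=True)
print(job.vmem)  # 0.0
```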
@@ -516,9 +536,8 @@ def __init__(self, wksnum=cpu_count(), afnstep=None, vmlimit=0., latency=0.):
         # Virtual memory tracing attributes
         # Dedicate at least 256 MB for the OS, using not more than 99% of RAM
         self._vmlimit = 0. if not _LIMIT_WORKERS_RAM else max(0, min(vmlimit, _RAM_SIZE * 0.99 - 0.25))  # in GB
-        #self.vmtotal = 0.  # Virtual memory used by all workers in gigabytes
         # Execution rescheduling attributes
-        self._latency = latency if latency else 2 + (not not self._vmlimit)  # Seconds of sleep on polling
+        self._latency = latency if latency else 1 + (not not self._vmlimit)  # Seconds of sleep on polling
         # Predefined private attributes
         self._killCount = 3  # 3 cycles of self._latency, termination wait time
         self.__termlock = Lock()  # Lock for __terminate() to avoid a simultaneous call from the signal handler and the normal execution flow
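The changed default above evaluates as follows: an explicit `latency` wins, otherwise the base is 1 second plus 1 more when a VM limit is active (`not not x` coerces the limit to a bool, which Python treats as 0 or 1 in arithmetic). A standalone sketch of the expression:

```python
def default_latency(latency, vmlimit):
    """Mirror of the changed expression: explicit latency wins, otherwise
    1 sec base plus 1 more when a VM limit is active."""
    return latency if latency else 1 + (not not vmlimit)

print(default_latency(0, 0.))    # 1 (no VM limit)
print(default_latency(0, 4.))    # 2 (VM limit active)
print(default_latency(0.5, 4.))  # 0.5 (explicit value wins)
```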
@@ -653,7 +672,7 @@ def __postpone(self, job, reduced, priority=False):
         # Update the worker processes limit of the other larger nonstarted jobs of the same category as this job,
         # and move the jobs with the lowest wkslim to the end.
         # Note: the update should be made for all nonstarted jobs, not only for the one that caused the reduction.
-        if job.category is not None:
+        if _CHAINED_CONSTRAINTS and job.category is not None:
             jobsnum = len(self._jobs)  # The number of jobs
             ij = 1 if priority else 0  # Job index
             while ij < jobsnum:
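The added `_CHAINED_CONSTRAINTS` guard matters because `Job.category` is only assigned when that flag is set (see the `Job.__init__` hunk above), so the unguarded check would raise `AttributeError` when only `_LIMIT_WORKERS_RAM` is enabled. An illustrative reduction, not mpepool's actual classes:

```python
class JobSketch:
    """Illustrative: the category attribute exists only under chained constraints."""
    def __init__(self, chained, category=None):
        if chained:
            self.category = category

def should_update_peers(job, chained):
    # Fixed form: short-circuit on the flag before touching job.category.
    return chained and job.category is not None

job = JobSketch(chained=False)
print(should_update_peers(job, chained=False))  # False, and no AttributeError

try:
    job.category  # the unguarded access from before the fix
    crashed = False
except AttributeError:
    crashed = True
print(crashed)  # True
```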
@@ -1379,10 +1398,11 @@ def test_jobMemlimSimple(self):
         self.assertGreaterEqual(jmsDvs.tstop - jmsDvs.tstart, worktime)  # A smaller size of the related chained job than the violated origin should not cause termination
         self.assertGreaterEqual(jms1.tstop - jms1.tstart, worktime)  # An independent job should have graceful completion
         self.assertFalse(jms1.proc.returncode)  # The return code is 0 on graceful completion
-        self.assertIsNone(jmsDvl1.tstart)  # A postponed job should be terminated before being started by the chained relation on the memory-violating origin
+        if _CHAINED_CONSTRAINTS:
+            self.assertIsNone(jmsDvl1.tstart)  # A postponed job should be terminated before being started by the chained relation on the memory-violating origin
+            self.assertIsNone(jmsDvl2.tstart)  # A postponed job should be terminated before being started by the chained relation on the memory-violating origin
         #self.assertLess(jmsDvl1.tstop - jmsDvl1.tstart, worktime)  # Early termination by the chained relation to the memory-violated origin
         self.assertGreaterEqual(jms2.tstop - jms2.tstart, worktime)  # An independent job should have graceful completion
-        self.assertIsNone(jmsDvl2.tstart)  # A postponed job should be terminated before being started by the chained relation on the memory-violating origin
 
 
 @unittest.skipUnless(_LIMIT_WORKERS_RAM, 'Requires _LIMIT_WORKERS_RAM')
