"""
..
Copyright (c) 2014-2017, Magni developers.
All rights reserved.
See LICENSE.rst for further information.
Module providing the process function.
Routine listings
----------------
process(func, namespace={}, args_list=None, kwargs_list=None, maxtasks=None)
Map multiple function calls to multiple processors.
See Also
--------
magni.utils.multiprocessing.config : Configuration options.
"""
from __future__ import division
import multiprocessing as mp
import os
import traceback
import warnings
import numpy as np
try:
# The concurrent.futures was first added to Python in version 3.2
# A backport for Python 2 is available at https://pythonhosted.org/futures/
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool
_futures_available = True
except ImportError:
_futures_available = False
try:
# disable mkl multiprocessing to avoid conflicts with manual
# multiprocessing
import mkl
_get_num_threads = mkl.get_max_threads
_set_num_threads = mkl.set_num_threads
except ImportError:
[docs] def _get_num_threads():
return 0
[docs] def _set_num_threads(n):
pass
from magni.utils.multiprocessing import config as _config
from magni.utils.validation import decorate_validation as _decorate_validation
from magni.utils.validation import validate_generic as _generic
from magni.utils.validation import validate_levels as _levels
from magni.utils.validation import validate_numeric as _numeric
def process(func, namespace=None, args_list=None, kwargs_list=None,
            maxtasks=None):
    """
    Map multiple function calls to multiple processes.

    For each entry in args_list and kwargs_list, a task is formed which is used
    for a function call of the type `func(*args, **kwargs)`. Each task is
    executed in a separate process using the concept of a processing pool.

    Parameters
    ----------
    func : function
        A function handle to the function which the calls should be mapped to.
    namespace : dict, optional
        A dict whose keys and values should be globally available in func (the
        default is None, which implies that an empty dict is used).
    args_list : list or tuple, optional
        A sequence of argument lists for the function calls (the default is
        None, which implies that no arguments are used in the calls).
    kwargs_list : list or tuple, optional
        A sequence of keyword argument dicts for the function calls (the
        default is None, which implies that no keyword arguments are used in
        the calls).
    maxtasks : int, optional
        The maximum number of tasks of a process before it is replaced by a new
        process (the default is None, which implies that processes are not
        replaced).

    Returns
    -------
    results : list
        A list with the results from the function calls.

    Raises
    ------
    BrokenPoolError
        If using the `concurrent.futures` module and one or more workers
        terminate abruptly with the automatic broken pool restart functionality
        disabled.

    See Also
    --------
    magni.utils.multiprocessing.config : Configuration options.

    Notes
    -----
    If the `workers` configuration option is equal to 0, map is used.
    Otherwise, the map functionality of a processing pool is used.

    Reasons for using this function over map or standard multiprocessing:

    - Simplicity of the code over standard multiprocessing.
    - Simplicity in switching between single- and multiprocessing.
    - The use of both arguments and keyword arguments in the function calls.
    - The reporting of exceptions before termination.
    - The possibility of terminating multiprocessing with a single interrupt.
    - The option of automatically restarting a broken process pool.

    As of Python 3.2, two different, though quite similar, modules exist in the
    standard library for managing processing pools: `multiprocessing` and
    `concurrent.futures`. According to Python core contributor Jesse Noller,
    the plan is to eventually make concurrent.futures the only interface to the
    high level processing pools (futures), whereas multiprocessing is supposed
    to serve more low level needs for individually handling processes, locks,
    queues, etc. (see https://bugs.python.org/issue9205#msg132661). As of
    Python 3.5, both the `multiprocessing.Pool` and
    `concurrent.futures.ProcessPoolExecutor` serve almost the same purpose and
    provide very similar interfaces. The main differences between the two are:

    - The option of using a worker initialiser is only available in
      `multiprocessing`.
    - The option of specifying a maximum number of tasks for a worker to
      execute before being replaced to free up resources (the maxtasksperchild
      option) is only available in `multiprocessing`.
    - The option of specifying a context is only available in `multiprocessing`
    - "Reasonable" handling of abrupt worker termination and exceptions is only
      available in `concurrent.futures`.

    Particularly, the "reasonable" handling of a broken process pool may be a
    strong argument to prefer `concurrent.futures` over `multiprocessing`. The
    matter of handling a broken process pool has been extensively discussed in
    https://bugs.python.org/issue9205 which led to the fix for
    `concurrent.futures`. A similar fix for `multiprocessing` has been proposed
    in https://bugs.python.org/issue22393.

    Both the `multiprocessing` and `concurrent.futures` interfaces are
    available for use with this function. If the configuration parameter
    `prefer_futures` is set to True and the `concurrent.futures` module is
    available, this is used. Otherwise, the `multiprocessing` module is used. A
    Python 2 backport of `concurrent.futures` is available at
    https://pythonhosted.org/futures/.

    When using `concurrent.futures`, the `namespace` and `init_args` are
    ignored since these are not supported by that module. Support for
    `maxtasks` is emulated in a way such that each process on average does
    `maxtasks` before it is restarted. The `init_args` functionality may be
    added later on if an initialiser is added to `concurrent.futures` - see
    http://bugs.python.org/issue21423. If the `max_broken_pool_restarts`
    configuration parameter is set to a value different from 0, the Pool is
    automatically restarted and the tasks are re-run should a broken pool be
    encountered. If `max_broken_pool_restarts` is set to 0, a BrokenPoolError
    is raised should a broken pool be encountered.

    When using `multiprocessing`, the `max_broken_pool_restarts` is ignored
    since the BrokenPoolError handling has not yet been implemented for the
    `multiprocessing.Pool` - see https://bugs.python.org/issue22393 as well as
    https://bugs.python.org/issue9205.

    Examples
    --------
    An example of how to use args_list and kwargs_list:

    >>> import magni
    >>> from magni.utils.multiprocessing._processing import process
    >>> def calculate(a, b, op='+'):
    ...     if op == '+':
    ...         return a + b
    ...     elif op == '-':
    ...         return a - b
    ...
    >>> args_list = [[5, 7], [9, 3]]
    >>> kwargs_list = [{'op': '+'}, {'op': '-'}]
    >>> process(calculate, args_list=args_list, kwargs_list=kwargs_list)
    [12, 6]

    or the same example preferring `concurrent.futures` over `multiprocessing`:

    >>> magni.utils.multiprocessing.config['prefer_futures'] = True
    >>> process(calculate, args_list=args_list, kwargs_list=kwargs_list)
    [12, 6]

    """

    # Use a None sentinel instead of a mutable default argument; normalise
    # before validation so the validator still sees a mapping.
    if namespace is None:
        namespace = {}

    @_decorate_validation
    def validate_input():
        _generic('func', 'function')
        _generic('namespace', 'mapping')
        _levels('args_list', (_generic(None, 'collection', ignore_none=True),
                              _generic(None, 'explicit collection')))
        _levels('kwargs_list', (_generic(None, 'collection', ignore_none=True),
                                _generic(None, 'mapping')))

        if args_list is None and kwargs_list is None:
            msg = ('The value of >>args_list<<, {!r}, and/or the value of '
                   '>>kwargs_list<<, {!r}, must be different from {!r}.')
            raise ValueError(msg.format(args_list, kwargs_list, None))
        elif args_list is not None and kwargs_list is not None:
            if len(args_list) != len(kwargs_list):
                msg = ('The value of >>len(args_list)<<, {!r}, must be equal '
                       'to the value of >>len(kwargs_list)<<, {!r}.')
                raise ValueError(msg.format(len(args_list), len(kwargs_list)))

        _numeric('maxtasks', 'integer', range_='(0;inf)', ignore_none=True)

    validate_input()

    tasks = _get_tasks(func, args_list, kwargs_list)

    if _config['workers'] == 0:
        # Single-process path: run the tasks in this process via plain map.
        _process_init(func, namespace)
        results = list(map(_process_worker, tasks))
    else:
        if os.name == 'nt':
            raise NotImplementedError('This function is not available under '
                                      'Windows.')

        if _futures_available and _config['prefer_futures']:
            map_ = _map_using_futures
        else:
            map_ = _map_using_mppool

        # Read the current MKL thread count *before* entering the try block,
        # so the finally clause can never reference an unassigned name.
        num_threads = _get_num_threads()
        try:
            _set_num_threads(1)
            results = map_(_process_worker, tasks, (func, namespace), maxtasks,
                           _config['max_broken_pool_restarts'])
        finally:
            _set_num_threads(num_threads)

    return results
[docs]def _get_tasks(func, args_list, kwargs_list):
"""
Prepare a list of tasks.
Parameters
----------
func : function
A function handle to the function which the calls should be mapped to.
args_list : list or tuple, optional
A sequence of argument lists for the function calls (the default is
None, which implies that no arguments are used in the calls).
kwargs_list : list or tuple, optional
A sequence of keyword argument dicts for the function calls (the
default is None, which implies that no keyword arguments are used in
the calls).
Returns
-------
tasks : list
The list of tasks.
"""
if args_list is None:
args_list = [() for dct in kwargs_list]
if kwargs_list is None:
kwargs_list = [{} for lst in args_list]
tasks = [func for lst in args_list]
tasks = list(zip(tasks, args_list, kwargs_list))
return tasks
def _map_using_futures(func, tasks, init_args, maxtasks,
                       max_broken_pool_restarts):
    """
    Map a set of `tasks` to `func` and run them in parallel using a futures.

    If `max_broken_pool_restarts` is different from 0, the tasks must be an
    explicit collection, e.g. a list or tuple, for the restart to work. If an
    exception occurs in one of the function calls, the process pool terminates
    ASAP and re-raises the first exception that occurred. All exceptions that
    may have occurred in the workers are available as the last element in the
    exceptions args.

    Parameters
    ----------
    func : function
        A function handle to the function which the calls should be mapped to.
    tasks : iterable
        The list of tasks to use as arguments in the function calls.
    init_args : tuple
        The (func, namespace) tuple for the _process_init initialisation
        function.
    maxtasks : int
        The maximum number of tasks of a process before it is replaced by a new
        process. If set to None, the process is never replaced.
    max_broken_pool_restarts : int or None
        The maximum number of attempts at restarting the process pool upon a
        BrokenPoolError. If set to None, the process pool may restart
        indefinitely.

    Returns
    -------
    results : list
        The list of results from the map operation.

    Notes
    -----
    The `namespace` and `init_args` are ignored since these are not supported
    by `concurrent.futures`. Support for `maxtasks` is emulated in a way such
    that each process on average does `maxtasks` before it is restarted. The
    `init_args` functionality may be added if an initialiser is added to
    `concurrent.futures` - see http://bugs.python.org/issue21423.

    """

    num_workers = _config['workers']

    def _fix_none_maxtasks(maxtasks):
        """Handle maxtasks==None correctly."""
        if maxtasks is None:
            # Clamp to at least 1: with fewer tasks than workers the integer
            # division would yield 0 and give a zero batch size below.
            return max(int(len(tasks) / num_workers), 1)
        else:
            return maxtasks

    maxtasks = _fix_none_maxtasks(maxtasks)

    # Pre-bind the names used in the exception handlers and the finally
    # clause so they are defined even if pool creation itself fails.
    workers = None
    futures = []

    try:
        batch_size = num_workers * maxtasks
        future_batches = max(int(np.ceil(len(tasks) / batch_size)), 1)
        future_results = []
        for l in range(future_batches):
            # Restart ProcessPool when all workers have done `maxtasks` on
            # average in order to free resources
            workers = ProcessPoolExecutor(max_workers=num_workers)
            futures = [workers.submit(func, task)
                       for task in tasks[l*batch_size:(l+1)*batch_size]]
            concurrent.futures.wait(
                futures, return_when=concurrent.futures.FIRST_EXCEPTION)
            future_results.extend([future.result() for future in futures])
            workers.shutdown()
        return future_results
    except BrokenProcessPool:
        if workers is not None:
            workers.shutdown()
        base_msg = 'A BrokenProcessPool was encountered. '
        if max_broken_pool_restarts is None:
            # Unlimited restarts requested: retry with the same parameters.
            msg = base_msg + 'Restarting the process pool.'
            warnings.warn(msg, RuntimeWarning, stacklevel=2)
            return _map_using_futures(func, tasks, init_args, maxtasks, None)
        elif max_broken_pool_restarts > 0:
            new_restart_count = max_broken_pool_restarts - 1
            msg = (base_msg + 'Restarting the process pool with ' +
                   'max_broken_pool_restarts={}.').format(new_restart_count)
            warnings.warn(msg, RuntimeWarning, stacklevel=2)
            return _map_using_futures(
                func, tasks, init_args, maxtasks, new_restart_count)
        else:
            # Restart budget exhausted: propagate the BrokenProcessPool.
            msg = (base_msg + 'Giving up on restarting the process pool ' +
                   'and re-raising.')
            warnings.warn(msg, RuntimeWarning, stacklevel=2)
            raise
    except BaseException as e:
        # Collect every worker exception and append them to the re-raised
        # exception's args so no failure is silently lost.
        worker_exceptions = [future.exception() for future in futures]
        msg = ('An exception occurred in one or more workers. ' +
               'Re-raising with all exceptions appended to the current ' +
               'exceptions arguments.')
        warnings.warn(msg, RuntimeWarning)
        e.args = e.args + tuple(worker_exceptions)
        raise
    finally:
        if workers is not None:
            workers.shutdown()
def _map_using_mppool(func, tasks, init_args, maxtasks,
                      max_broken_pool_restarts):
    """
    Map a set of `tasks` to `func` and run them in parallel via multiprocessing

    Parameters
    ----------
    func : function
        A function handle to the function which the calls should be mapped to.
    tasks : iterable
        The list of tasks to use as arguments in the function calls.
    init_args : tuple
        The (func, namespace) tuple for the _process_init initialisation
        function.
    maxtasks : int
        The maximum number of tasks of a process before it is replaced by a new
        process. If set to None, the process is never replaced.
    max_broken_pool_restarts : int or None
        The maximum number of attempts at restarting the process pool upon a
        BrokenPoolError. If set to None, the process pool may restart
        indefinitely.

    Returns
    -------
    results : list
        The list of results from the map operation.

    Notes
    -----
    The `max_broken_pool_restarts` is ignored since the BrokenPoolError
    handling has not yet been implemented in the multiprocessing.Pool - see
    https://bugs.python.org/issue22393 and https://bugs.python.org/issue9205.

    """

    # Create the pool *before* entering the try block; otherwise a failure in
    # mp.Pool(...) would make the finally clause raise a NameError on
    # `workers`, masking the original error.
    workers = mp.Pool(
        _config['workers'], _process_init, init_args, maxtasks)
    try:
        # chunksize=1 keeps the maxtasksperchild accounting exact.
        return workers.map(func, tasks, chunksize=1)
    finally:
        workers.close()
        workers.join()
[docs]def _process_init(func, namespace):
"""
Initialise the process by making global variables available to it.
Parameters
----------
func : function
A function handle to the function which the calls should be mapped to.
namespace : dict
A dict whose keys and values should be globally available in func.
"""
func.__globals__.update(namespace)
[docs]def _process_worker(fak_tuple):
"""
Unpack and map a task to the function.
Parameters
----------
fak_tuple : tuple
A tuple (func, args, kwargs) containing the parameters listed below.
func : function
A function handle to the function which the calls should be mapped to.
args : list or tuple
The sequence of arguments that should be unpacked and passed.
kwargs : list or tuple
The sequence of keyword arguments that should be unpacked and passed.
Notes
-----
If an exception is raised in `func`, the stacktrace of that exception is
printed since the exception is otherwise silenced until every task has been
executed when using multiple workers.
Also, a workaround has been implemented to allow KeyboardInterrupts to
interrupt the current tasks and all remaining tasks. This is done by
setting a global variable, when catching a KeyboardInterrupt, which is
checked for every call.
"""
func, args, kwargs = fak_tuple
if 'interrupted' not in _process_worker.__globals__:
try:
return func(*args, **kwargs)
except KeyboardInterrupt:
_process_worker.__globals__['interrupted'] = True
except BaseException as e:
print(traceback.format_exc())
if _config['silence_exceptions']:
return e
elif _config['re_raise_exceptions']:
raise
else:
raise RuntimeError('An exception has occured.')