magni.utils.multiprocessing._processing module
Module providing the process function.
Routine listings
- process(func, namespace={}, args_list=None, kwargs_list=None, maxtasks=None)
- Map multiple function calls to multiple processors.
See also
magni.utils.multiprocessing.config
- Configuration options.
magni.utils.multiprocessing._processing.process(func, namespace={}, args_list=None, kwargs_list=None, maxtasks=None)[source]
Map multiple function calls to multiple processes.
For each entry in args_list and kwargs_list, a task is formed which is used for a function call of the type func(*args, **kwargs). Each task is executed in a separate process using the concept of a processing pool.
Parameters: - func (function) – A function handle to the function which the calls should be mapped to.
- namespace (dict, optional) – A dict whose keys and values should be globally available in func (the default is an empty dict).
- args_list (list or tuple, optional) – A sequence of argument lists for the function calls (the default is None, which implies that no arguments are used in the calls).
- kwargs_list (list or tuple, optional) – A sequence of keyword argument dicts for the function calls (the default is None, which implies that no keyword arguments are used in the calls).
- maxtasks (int, optional) – The maximum number of tasks of a process before it is replaced by a new process (the default is None, which implies that processes are not replaced).
Returns: results (list) – A list with the results from the function calls.
Raises: BrokenPoolError – If using the concurrent.futures module and one or more workers terminate abruptly with the automatic broken pool restart functionality disabled.
See also
magni.utils.multiprocessing.config()
- Configuration options.
Notes
If the workers configuration option is equal to 0, map is used. Otherwise, the map functionality of a processing pool is used.
Reasons for using this function over map or standard multiprocessing:
- Simplicity of the code over standard multiprocessing.
- Simplicity in switching between single- and multiprocessing.
- The use of both arguments and keyword arguments in the function calls.
- The reporting of exceptions before termination.
- The possibility of terminating multiprocessing with a single interrupt.
- The option of automatically restarting a broken process pool.
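The single-/multi-processing switch mentioned in the notes can be illustrated with a minimal sketch of the dispatch rule (hypothetical helper names, not magni's actual implementation):

```python
from multiprocessing import Pool


def square(x):
    """Toy task used to illustrate the dispatch rule below."""
    return x * x


def dispatch(func, tasks, workers):
    """Hypothetical sketch: workers == 0 falls back to the built-in map,
    any other value uses the map of a multiprocessing pool."""
    if workers == 0:
        return list(map(func, tasks))
    with Pool(processes=workers) as pool:
        return pool.map(func, tasks)
```

With workers set to 0 the calls run serially in the current process, which makes debugging easy; switching to multiprocessing is then a single configuration change rather than a code change.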
As of Python 3.2, two different, though quite similar, modules exist in the standard library for managing processing pools: multiprocessing and concurrent.futures. According to Python core contributor Jesse Noller, the plan is to eventually make concurrent.futures the only interface to the high level processing pools (futures), whereas multiprocessing is supposed to serve more low level needs for individually handling processes, locks, queues, etc. (see https://bugs.python.org/issue9205#msg132661). As of Python 3.5, both the multiprocessing.Pool and concurrent.futures.ProcessPoolExecutor serve almost the same purpose and provide very similar interfaces. The main differences between the two are:
- The option of using a worker initialiser is only available in multiprocessing.
- The option of specifying a maximum number of tasks for a worker to execute before being replaced to free up resources (the maxtasksperchild option) is only available in multiprocessing.
- The option of specifying a context is only available in multiprocessing.
- "Reasonable" handling of abrupt worker termination and exceptions is only available in concurrent.futures.
Particularly, the "reasonable" handling of a broken process pool may be a strong argument to prefer concurrent.futures over multiprocessing. The matter of handling a broken process pool has been extensively discussed in https://bugs.python.org/issue9205 which led to the fix for concurrent.futures. A similar fix for multiprocessing has been proposed in https://bugs.python.org/issue22393.
Both the multiprocessing and concurrent.futures interfaces are available for use with this function. If the configuration parameter prefer_futures is set to True and the concurrent.futures module is available, this is used. Otherwise, the multiprocessing module is used. A Python 2 backport of concurrent.futures is available at https://pythonhosted.org/futures/.
When using concurrent.futures, the namespace and init_args are ignored since these are not supported by that module. Support for maxtasks is emulated in a way such that each process on average does maxtasks before it is restarted. The init_args functionality may be added later on if an initialiser is added to concurrent.futures - see http://bugs.python.org/issue21423. If the max_broken_pool_restarts configuration parameter is set to a value different from 0, the Pool is automatically restarted and the tasks are re-run should a broken pool be encountered. If max_broken_pool_restarts is set to 0, a BrokenPoolError is raised should a broken pool be encountered.
When using multiprocessing, the max_broken_pool_restarts is ignored since the BrokenPoolError handling has not yet been implemented for the multiprocessing.Pool - see https://bugs.python.org/issue22393 as well as https://bugs.python.org/issue9205.
Examples
An example of how to use args_list and kwargs_list:
>>> import magni
>>> from magni.utils.multiprocessing._processing import process
>>> def calculate(a, b, op='+'):
...     if op == '+':
...         return a + b
...     elif op == '-':
...         return a - b
...
>>> args_list = [[5, 7], [9, 3]]
>>> kwargs_list = [{'op': '+'}, {'op': '-'}]
>>> process(calculate, args_list=args_list, kwargs_list=kwargs_list)
[12, 6]
or the same example preferring concurrent.futures over multiprocessing:
>>> magni.utils.multiprocessing.config['prefer_futures'] = True
>>> process(calculate, args_list=args_list, kwargs_list=kwargs_list)
[12, 6]
magni.utils.multiprocessing._processing._get_tasks(func, args_list, kwargs_list)[source]
Prepare a list of tasks.
Parameters: - func (function) – A function handle to the function which the calls should be mapped to.
- args_list (list or tuple, optional) – A sequence of argument lists for the function calls (the default is None, which implies that no arguments are used in the calls).
- kwargs_list (list or tuple, optional) – A sequence of keyword argument dicts for the function calls (the default is None, which implies that no keyword arguments are used in the calls).
Returns: tasks (list) – The list of tasks.
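A plausible sketch of this task preparation, pairing up arguments and keyword arguments into (func, args, kwargs) tuples (a hypothetical reimplementation, not the library's source):

```python
def get_tasks(func, args_list, kwargs_list):
    """Hypothetical sketch: zip args_list and kwargs_list into
    (func, args, kwargs) task tuples, filling in empty defaults
    when one of the sequences is None."""
    if args_list is None and kwargs_list is None:
        return []
    if args_list is None:
        args_list = [[] for _ in kwargs_list]
    if kwargs_list is None:
        kwargs_list = [{} for _ in args_list]
    return [(func, args, kwargs)
            for args, kwargs in zip(args_list, kwargs_list)]
```

Each resulting tuple carries everything a worker needs to perform one func(*args, **kwargs) call.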
magni.utils.multiprocessing._processing._map_using_futures(func, tasks, init_args, maxtasks, max_broken_pool_restarts)[source]
Map a set of tasks to func and run them in parallel using a futures-based process pool.
If max_broken_pool_restarts is different from 0, the tasks must be an explicit collection, e.g. a list or tuple, for the restart to work. If an exception occurs in one of the function calls, the process pool terminates ASAP and re-raises the first exception that occurred. All exceptions that may have occurred in the workers are available as the last element in the exceptions args.
Parameters: - func (function) – A function handle to the function which the calls should be mapped to.
- tasks (iterable) – The list of tasks to use as arguments in the function calls.
- init_args (tuple) – The (func, namespace) tuple for the _process_init initialisation function.
- maxtasks (int) – The maximum number of tasks of a process before it is replaced by a new process. If set to None, the process is never replaced.
- max_broken_pool_restarts (int or None) – The maximum number of attempts at restarting the process pool upon a BrokenPoolError. If set to None, the process pool may restart indefinitely.
Returns: results (list) – The list of results from the map operation.
Notes
The namespace and init_args are ignored since these are not supported by concurrent.futures. Support for maxtasks is emulated in a way such that each process on average does maxtasks before it is restarted. The init_args functionality may be added if an initialiser is added to concurrent.futures - see http://bugs.python.org/issue21423.
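The maxtasks emulation described above can be sketched as a chunking rule (hypothetical helper, assuming the executor is simply recreated between chunks):

```python
def chunk_tasks(tasks, maxtasks, workers):
    """Hypothetical sketch: split the task list into chunks of
    maxtasks * workers tasks. Recreating the process pool after each
    chunk means each worker executes maxtasks tasks on average before
    it is replaced."""
    size = maxtasks * workers
    return [tasks[i:i + size] for i in range(0, len(tasks), size)]
```

This only guarantees the per-worker task count on average, unlike multiprocessing's exact maxtasksperchild accounting.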
magni.utils.multiprocessing._processing._map_using_mppool(func, tasks, init_args, maxtasks, max_broken_pool_restarts)[source]
Map a set of tasks to func and run them in parallel via multiprocessing.
Parameters: - func (function) – A function handle to the function which the calls should be mapped to.
- tasks (iterable) – The list of tasks to use as arguments in the function calls.
- init_args (tuple) – The (func, namespace) tuple for the _process_init initialisation function.
- maxtasks (int) – The maximum number of tasks of a process before it is replaced by a new process. If set to None, the process is never replaced.
- max_broken_pool_restarts (int or None) – The maximum number of attempts at restarting the process pool upon a BrokenPoolError. If set to None, the process pool may restart indefinitely.
Returns: results (list) – The list of results from the map operation.
Notes
The max_broken_pool_restarts is ignored since the BrokenPoolError handling has not yet been implemented in the multiprocessing.Pool - see https://bugs.python.org/issue22393 and https://bugs.python.org/issue9205.
magni.utils.multiprocessing._processing._process_init(func, namespace)[source]
Initialise the process by making global variables available to it.
Parameters: - func (function) – A function handle to the function which the calls should be mapped to.
- namespace (dict) – A dict whose keys and values should be globally available in func.
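One way such an initialiser can work is by injecting the namespace into the function's global scope (a sketch under that assumption, not necessarily how magni implements it):

```python
def process_init(func, namespace):
    """Hypothetical sketch: copy the namespace entries into the globals
    of the module that defines func, so func can refer to them as
    ordinary global names."""
    func.__globals__.update(namespace)


def area(radius):
    # 'pi' is not defined here; it is injected by process_init.
    return pi * radius ** 2
```

In the real pool this would run once per worker process via the pool's initialiser hook, making the namespace dict globally available to func in every worker.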
magni.utils.multiprocessing._processing._process_worker(fak_tuple)[source]
Unpack and map a task to the function.
Parameters: - fak_tuple (tuple) – A tuple (func, args, kwargs) containing the parameters listed below.
- func (function) – A function handle to the function which the calls should be mapped to.
- args (list or tuple) – The sequence of arguments that should be unpacked and passed.
- kwargs (dict) – The dict of keyword arguments that should be unpacked and passed.
Notes
If an exception is raised in func, the stack trace of that exception is printed since the exception is otherwise silenced until every task has been executed when using multiple workers.
Also, a workaround has been implemented to allow KeyboardInterrupts to interrupt the current tasks and all remaining tasks. This is done by setting a global variable when catching a KeyboardInterrupt, which is then checked for every subsequent call.
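The behaviour described in these notes can be sketched roughly as follows (hypothetical names; the flag lives per worker process, which is why every call must check it):

```python
import traceback

_interrupted = False  # per-process flag set on KeyboardInterrupt


def process_worker(fak_tuple):
    """Hypothetical sketch of the worker wrapper: unpack a
    (func, args, kwargs) task and call func(*args, **kwargs), printing
    any stack trace and skipping remaining tasks once an interrupt has
    been seen."""
    global _interrupted
    if _interrupted:
        return None  # short-circuit all remaining tasks
    func, args, kwargs = fak_tuple
    try:
        return func(*args, **kwargs)
    except KeyboardInterrupt:
        _interrupted = True  # checked by every subsequent call
    except BaseException:
        traceback.print_exc()  # report before the pool silences it
        raise
```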