magni.utils.multiprocessing._processing module

Module providing the process function.

Routine listings

process(func, namespace={}, args_list=None, kwargs_list=None, maxtasks=None)
Map multiple function calls to multiple processes.

See also

magni.utils.multiprocessing.config
Configuration options.
magni.utils.multiprocessing._processing._get_num_threads()
magni.utils.multiprocessing._processing._set_num_threads(n)
magni.utils.multiprocessing._processing.process(func, namespace={}, args_list=None, kwargs_list=None, maxtasks=None)

Map multiple function calls to multiple processes.

For each entry in args_list and kwargs_list, a task is formed which is used for a function call of the type func(*args, **kwargs). Each task is executed in a separate process using the concept of a processing pool.

Parameters:
  • func (function) – A function handle to the function which the calls should be mapped to.
  • namespace (dict, optional) – A dict whose keys and values should be globally available in func (the default is an empty dict).
  • args_list (list or tuple, optional) – A sequence of argument lists for the function calls (the default is None, which implies that no arguments are used in the calls).
  • kwargs_list (list or tuple, optional) – A sequence of keyword argument dicts for the function calls (the default is None, which implies that no keyword arguments are used in the calls).
  • maxtasks (int, optional) – The maximum number of tasks of a process before it is replaced by a new process (the default is None, which implies that processes are not replaced).
Returns:

results (list) – A list with the results from the function calls.

Raises:

BrokenPoolError – If using the concurrent.futures module and one or more workers terminate abruptly with the automatic broken pool restart functionality disabled.

See also

magni.utils.multiprocessing.config
Configuration options.

Notes

If the workers configuration option is equal to 0, the built-in map is used, i.e. the tasks are executed sequentially in the current process. Otherwise, the map functionality of a processing pool is used.
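
A minimal sketch of how this dispatch could look (illustrative only; _run_task and _map_tasks are hypothetical names standing in for the module's internals):

    import multiprocessing

    def _run_task(task):
        # Stand-in for the module's _process_worker: unpack (func, args,
        # kwargs) and perform the actual function call.
        func, args, kwargs = task
        return func(*args, **kwargs)

    def _map_tasks(workers, tasks):
        # workers == 0: built-in map, i.e. sequential execution in this process.
        if workers == 0:
            return list(map(_run_task, tasks))
        # Otherwise, distribute the tasks across a pool of worker processes.
        with multiprocessing.Pool(processes=workers) as pool:
            return pool.map(_run_task, tasks)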

Reasons for using this function over map or standard multiprocessing:

  • Simplicity of the code over standard multiprocessing.
  • Simplicity in switching between single- and multiprocessing.
  • The use of both arguments and keyword arguments in the function calls.
  • The reporting of exceptions before termination.
  • The possibility of terminating multiprocessing with a single interrupt.
  • The option of automatically restarting a broken process pool.

As of Python 3.2, two different, though quite similar, modules exist in the standard library for managing processing pools: multiprocessing and concurrent.futures. According to Python core contributor Jesse Noller, the plan is to eventually make concurrent.futures the only interface to the high level processing pools (futures), whereas multiprocessing is supposed to serve more low level needs for individually handling processes, locks, queues, etc. (see https://bugs.python.org/issue9205#msg132661). As of Python 3.5, both the multiprocessing.Pool and concurrent.futures.ProcessPoolExecutor serve almost the same purpose and provide very similar interfaces. The main differences between the two are:

  • The option of using a worker initialiser is only available in multiprocessing.
  • The option of specifying a maximum number of tasks for a worker to execute before being replaced to free up resources (the maxtasksperchild option) is only available in multiprocessing.
  • The option of specifying a context is only available in multiprocessing.
  • “Reasonable” handling of abrupt worker termination and exceptions is only available in concurrent.futures.

Particularly, the “reasonable” handling of a broken process pool may be a strong argument to prefer concurrent.futures over multiprocessing. The matter of handling a broken process pool has been extensively discussed in https://bugs.python.org/issue9205 which led to the fix for concurrent.futures. A similar fix for multiprocessing has been proposed in https://bugs.python.org/issue22393.

Both the multiprocessing and concurrent.futures interfaces are available for use with this function. If the configuration parameter prefer_futures is set to True and the concurrent.futures module is available, the concurrent.futures interface is used. Otherwise, the multiprocessing module is used. A Python 2 backport of concurrent.futures is available at https://pythonhosted.org/futures/.

When using concurrent.futures, the namespace and init_args are ignored since these are not supported by that module. Support for maxtasks is emulated in a way such that each process on average does maxtasks before it is restarted. The init_args functionality may be added later on if an initialiser is added to concurrent.futures - see http://bugs.python.org/issue21423. If the max_broken_pool_restarts configuration parameter is set to a value different from 0, the Pool is automatically restarted and the tasks are re-run should a broken pool be encountered. If max_broken_pool_restarts is set to 0, a BrokenPoolError is raised should a broken pool be encountered.

When using multiprocessing, the max_broken_pool_restarts is ignored since the BrokenPoolError handling has not yet been implemented for the multiprocessing.Pool - see https://bugs.python.org/issue22393 as well as https://bugs.python.org/issue9205.

Examples

An example of how to use args_list and kwargs_list:

>>> import magni
>>> from magni.utils.multiprocessing._processing import process
>>> def calculate(a, b, op='+'):
...     if op == '+':
...         return a + b
...     elif op == '-':
...         return a - b
...
>>> args_list = [[5, 7], [9, 3]]
>>> kwargs_list = [{'op': '+'}, {'op': '-'}]
>>> process(calculate, args_list=args_list, kwargs_list=kwargs_list)
[12, 6]

or the same example preferring concurrent.futures over multiprocessing:

>>> magni.utils.multiprocessing.config['prefer_futures'] = True
>>> process(calculate, args_list=args_list, kwargs_list=kwargs_list)
[12, 6]
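
An illustrative sketch of the namespace option (hedged: the shown result assumes the multiprocessing backend or the serial fallback is in effect, since namespace is ignored when concurrent.futures is used):

>>> magni.utils.multiprocessing.config['prefer_futures'] = False
>>> def scale(value):
...     return factor * value  # 'factor' is made available via namespace
...
>>> process(scale, namespace={'factor': 10}, args_list=[[1], [2]])
[10, 20]
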
magni.utils.multiprocessing._processing._get_tasks(func, args_list, kwargs_list)

Prepare a list of tasks.

Parameters:
  • func (function) – A function handle to the function which the calls should be mapped to.
  • args_list (list or tuple, optional) – A sequence of argument lists for the function calls (the default is None, which implies that no arguments are used in the calls).
  • kwargs_list (list or tuple, optional) – A sequence of keyword argument dicts for the function calls (the default is None, which implies that no keyword arguments are used in the calls).
Returns:

tasks (list) – The list of tasks.
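
One plausible way such a task list could be assembled (a sketch using a hypothetical name, not the actual implementation):

    def build_tasks(func, args_list, kwargs_list):
        # Pad whichever sequence is missing so both have the same length.
        if args_list is None and kwargs_list is None:
            return []
        if args_list is None:
            args_list = [[] for _ in kwargs_list]
        if kwargs_list is None:
            kwargs_list = [{} for _ in args_list]
        # Each task bundles everything needed for one func(*args, **kwargs) call.
        return [(func, args, kwargs)
                for args, kwargs in zip(args_list, kwargs_list)]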

magni.utils.multiprocessing._processing._map_using_futures(func, tasks, init_args, maxtasks, max_broken_pool_restarts)

Map a set of tasks to func and run them in parallel using concurrent.futures.

If max_broken_pool_restarts is different from 0, the tasks must be an explicit collection, e.g. a list or tuple, for the restart to work. If an exception occurs in one of the function calls, the process pool terminates as soon as possible and re-raises the first exception that occurred. All exceptions that may have occurred in the workers are available as the last element in the exception's args.

Parameters:
  • func (function) – A function handle to the function which the calls should be mapped to.
  • tasks (iterable) – The list of tasks to use as arguments in the function calls.
  • init_args (tuple) – The (func, namespace) tuple for the _process_init initialisation function.
  • maxtasks (int) – The maximum number of tasks of a process before it is replaced by a new process. If set to None, the process is never replaced.
  • max_broken_pool_restarts (int or None) – The maximum number of attempts at restarting the process pool upon a BrokenPoolError. If set to None, the process pool may restart indefinitely.
Returns:

results (list) – The list of results from the map operation.

Notes

The namespace and init_args are ignored since these are not supported by concurrent.futures. Support for maxtasks is emulated in a way such that each process on average does maxtasks before it is restarted. The init_args functionality may be added if an initialiser is added to concurrent.futures - see http://bugs.python.org/issue21423.
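
A sketch of how the maxtasks emulation could be structured (illustrative, with hypothetical names and a fixed number of workers assumed; not the actual implementation):

    from concurrent.futures import ProcessPoolExecutor

    def _run_task(task):
        # Unpack (func, args, kwargs) and perform the actual function call.
        func, args, kwargs = task
        return func(*args, **kwargs)

    def map_with_maxtasks(tasks, workers, maxtasks):
        # Recreate the executor after every workers * maxtasks tasks so that
        # each process on average handles maxtasks tasks before being replaced.
        chunk_size = workers * maxtasks if maxtasks else max(len(tasks), 1)
        results = []
        for start in range(0, len(tasks), chunk_size):
            chunk = tasks[start:start + chunk_size]
            with ProcessPoolExecutor(max_workers=workers) as executor:
                results.extend(executor.map(_run_task, chunk))
        return results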

magni.utils.multiprocessing._processing._map_using_mppool(func, tasks, init_args, maxtasks, max_broken_pool_restarts)

Map a set of tasks to func and run them in parallel using multiprocessing.

Parameters:
  • func (function) – A function handle to the function which the calls should be mapped to.
  • tasks (iterable) – The list of tasks to use as arguments in the function calls.
  • init_args (tuple) – The (func, namespace) tuple for the _process_init initialisation function.
  • maxtasks (int) – The maximum number of tasks of a process before it is replaced by a new process. If set to None, the process is never replaced.
  • max_broken_pool_restarts (int or None) – The maximum number of attempts at restarting the process pool upon a BrokenPoolError. If set to None, the process pool may restart indefinitely.
Returns:

results (list) – The list of results from the map operation.

Notes

The max_broken_pool_restarts is ignored since the BrokenPoolError handling has not yet been implemented in the multiprocessing.Pool - see https://bugs.python.org/issue22393 and https://bugs.python.org/issue9205.

magni.utils.multiprocessing._processing._process_init(func, namespace)

Initialise the process by making global variables available to it.

Parameters:
  • func (function) – A function handle to the function which the calls should be mapped to.
  • namespace (dict) – A dict whose keys and values should be globally available in func.
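
One way such an initialisation could be realised (a sketch; init_worker is a hypothetical name and the use of the function's globals dict is an assumption):

    def init_worker(func, namespace):
        # Make every (key, value) pair in namespace available as a global
        # variable inside func by updating the function's globals dict.
        func.__globals__.update(namespace)
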
magni.utils.multiprocessing._processing._process_worker(fak_tuple)

Unpack and map a task to the function.

Parameters:
  • fak_tuple (tuple) – A tuple (func, args, kwargs) containing the parameters listed below.
  • func (function) – A function handle to the function which the calls should be mapped to.
  • args (list or tuple) – The sequence of arguments that should be unpacked and passed.
  • kwargs (dict) – The dict of keyword arguments that should be unpacked and passed.

Notes

If an exception is raised in func, the stack trace of that exception is printed, since the exception would otherwise be silenced until every task has been executed when using multiple workers.

Also, a workaround has been implemented to allow KeyboardInterrupts to interrupt the current task and all remaining tasks. This is done by setting a global variable when a KeyboardInterrupt is caught; the variable is then checked at the beginning of every subsequent call.
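
A sketch of how such a worker wrapper could look (illustrative; the flag name _interrupted and the helper name run_task are assumptions, not the actual identifiers):

    import traceback

    _interrupted = False  # assumed module-level flag set on KeyboardInterrupt

    def run_task(fak_tuple):
        global _interrupted
        if _interrupted:
            # A previous call was interrupted; skip this and the remaining tasks.
            return None
        func, args, kwargs = fak_tuple
        try:
            return func(*args, **kwargs)
        except KeyboardInterrupt:
            # Remember the interrupt so that subsequent calls are skipped.
            _interrupted = True
        except Exception:
            # Print the stack trace here; with multiple workers the exception
            # would otherwise stay silent until every task has been executed.
            traceback.print_exc()
            raise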