multiprocess
- caf.toolkit.concurrency.multiprocessing_wrapper.multiprocess(fn, *, arg_list=None, kwarg_list=None, process_count=None, pool_maxtasksperchild=4, in_order=False, result_timeout=86400, pbar_kwargs=None)
Run a function and arguments across multiple cores of a CPU.
Calls fn once for each set of arguments given, inside a multiprocessing.Pool, and returns the outputs of those calls as a list.
- Deals with various process_count values:
If negative, os.cpu_count() - abs(process_count) processes will be used.
If 0, no multiprocessing will be used. The code will be run in a for loop, using only one process (and therefore CPU).
If positive, process_count processes will be used. If process_count is greater than os.cpu_count() - 1, a warning will be raised.
- Parameters:
fn (Callable[[...], _T]) – The function to call.
arg_list (Collection[Iterable[Any]] | None) – A list of iterables, e.g. tuples or lists. len(arg_list) equals the number of times fn will be called. If kwarg_list is also provided, the items of arg_list should correspond one-to-one with it. Each iterable should contain a full set of non-keyword arguments to be passed to a single call of fn.
kwarg_list (Collection[Mapping[str, Any]] | None) – A list of dictionaries. The keys should be the keyword argument names, and the values the keyword argument values. len(kwarg_list) equals the number of times fn will be called. If arg_list is also provided, the items of kwarg_list should correspond one-to-one with it. Each dictionary should contain a full set of keyword arguments to be passed to a single call of fn.
process_count (int | None) – The number of processes to create in the Pool. Typically, this should not exceed the number of cores available. Defaults to os.cpu_count() - 1.
pool_maxtasksperchild (int) – Passed into the created Pool as maxtasksperchild=pool_maxtasksperchild. It is the number of tasks a worker process can complete before it exits and is replaced with a fresh worker process, so that unused resources can be freed.
in_order (bool) – Whether the indexes of the return values need to directly correspond to the input values (arg_list and kwarg_list) given. Setting this to True is slightly slower because the results have to be sorted.
result_timeout (int) – How long to wait, in seconds, for each process to finish before throwing an exception. Defaults to 86400 seconds (24 hours).
pbar_kwargs (dict[str, Any] | None) – A dictionary of keyword arguments to pass into a progress bar. This dictionary is passed into tqdm.tqdm(**pbar_kwargs) when building the progress bar.
- Return type:
list[_T]
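The process_count rules described above can be sketched as a small helper. This is an illustration only, not the library's implementation: resolve_process_count and its cpu_count parameter are hypothetical names, and the sketch assumes that a negative value is subtracted in magnitude from os.cpu_count() and that the result never drops below zero.

```python
import os
import warnings


def resolve_process_count(process_count=None, cpu_count=None):
    """Hypothetical helper mirroring the documented process_count rules."""
    # cpu_count is injectable so the behaviour can be checked on any machine.
    if cpu_count is None:
        cpu_count = os.cpu_count() or 1

    # Documented default: os.cpu_count() - 1.
    if process_count is None:
        return max(cpu_count - 1, 0)

    # Assumption: a negative value leaves that many cores unused,
    # clamped at zero (zero means "run serially in a for loop").
    if process_count < 0:
        return max(cpu_count + process_count, 0)

    # Documented: a warning is raised above os.cpu_count() - 1.
    if process_count > cpu_count - 1:
        warnings.warn(
            f"process_count={process_count} exceeds cpu_count - 1 "
            f"({cpu_count - 1})."
        )
    return process_count
```

On a machine reporting 8 cores, this sketch would resolve None to 7, -2 to 6, and leave 0 and 4 unchanged.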
See also
tqdm.tqdm()
Examples
The following three function calls:

>>> a = sorted(range(10))
>>> b = sorted(range(100))
>>> c = sorted(range(20), reverse=True)

Would be called, using this function, like this:

>>> # Note the use of a tuple to make sure a single argument is still
>>> # iterable
>>> a_args = (range(10), )
>>> b_args = (range(100), )
>>> c_args = (range(20), )
>>>
>>> # Need to use an empty dict where arguments are not given
>>> a_kwargs = dict()
>>> b_kwargs = dict()
>>> c_kwargs = {'reverse': True}

>>> args_list = [a_args, b_args, c_args]
>>> kwargs_list = [a_kwargs, b_kwargs, c_kwargs]
>>> a, b, c = multiprocess(sorted, arg_list=args_list, kwarg_list=kwargs_list)
# TODO(BT): Add example of how to convert a for-loop into one of these calls.
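A for-loop can be converted into a multiprocess call roughly as follows. This is a minimal sketch rather than an official example: scale and inputs are hypothetical names made up for illustration, and the fallback multiprocess defined in the except branch is a serial stand-in so the snippet still runs where caf.toolkit is not installed. It passes process_count=0, which is documented to run everything in a single process, and in_order=True so the results line up with the inputs.

```python
try:
    from caf.toolkit.concurrency.multiprocessing_wrapper import multiprocess
except ImportError:
    # Serial stand-in (an assumption for illustration, not the library code).
    def multiprocess(fn, *, arg_list=None, kwarg_list=None, **_unused):
        n = len(arg_list) if arg_list is not None else len(kwarg_list)
        arg_list = arg_list if arg_list is not None else [()] * n
        kwarg_list = kwarg_list if kwarg_list is not None else [{}] * n
        return [fn(*a, **k) for a, k in zip(arg_list, kwarg_list)]


def scale(values, factor=1):
    """Hypothetical worker: multiply every value by factor."""
    return [v * factor for v in values]


# Hypothetical inputs: one (values, factor) pair per call.
inputs = [([1, 2, 3], 2), ([4, 5], 3), ([6], 10)]

# The original for-loop.
loop_results = []
for values, factor in inputs:
    loop_results.append(scale(values, factor=factor))

# The same work expressed as one tuple and one dict per call.
arg_list = [(values,) for values, _ in inputs]
kwarg_list = [{"factor": factor} for _, factor in inputs]

parallel_results = multiprocess(
    scale,
    arg_list=arg_list,
    kwarg_list=kwarg_list,
    process_count=0,  # serial here; use a positive count for real parallelism
    in_order=True,    # keep results aligned with the inputs
)
```

When switching to a non-zero process_count, the worker function should live at module level and the call should sit under an `if __name__ == "__main__":` guard, as is usual for multiprocessing on platforms that spawn new interpreters.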