Multiprocessing - Advanced Python 17
In this tutorial we talk about how to use the `multiprocessing` module in Python.
In this article we talk about how to use the
multiprocessing module in Python.
- How to create and start multiple processes
- How to wait for processes to complete
- How to share data between processes
- How to use
Locksto prevent race conditions
- How to use a
Queuefor process-safe data/task processing.
- How to use a
Poolto manage multiple worker processes
Create and run processes¶
You create a process with
multiprocessing.Process(). It takes two important arguments:
target: a callable object (function) for this process to be invoked when the process starts
args: the (function) arguments for the target function. This must be a tuple
Start a process with
process.join() to tell the program that it should wait for this process to complete before it continues with the rest of the code.
from multiprocessing import Process import os def square_numbers(): for i in range(1000): result = i * i if __name__ == "__main__": processes =  num_processes = os.cpu_count() # number of CPUs on the machine. Usually a good choise for the number of processes # create processes and asign a function for each process for i in range(num_processes): process = Process(target=square_numbers) processes.append(process) # start all processes for process in processes: process.start() # wait for all processes to finish # block the main programm until these processes are finished for process in processes: process.join()
Share data between processes¶
Since processes don't live in the same memory space, they do not have access to the same (public) data. Thus, they need special shared memory objects to share data.
Data can be stored in a shared memory variable using
Value(type, value): Create a
ctypesobject of type
type. Access the value with
Array(type, value): Create a
ctypesarray with elements of type
type. Access the values with
Task: Create two processes, each process should have access to a shared variable and modify it (in this case only increase it repeatedly by 1 for 100 times). Create another two processes that share an array and modify (increase) all the elements in the array.
from multiprocessing import Process, Value, Array import time def add_100(number): for _ in range(100): time.sleep(0.01) number.value += 1 def add_100_array(numbers): for _ in range(100): time.sleep(0.01) for i in range(len(numbers)): numbers[i] += 1 if __name__ == "__main__": shared_number = Value('i', 0) print('Value at beginning:', shared_number.value) shared_array = Array('d', [0.0, 100.0, 200.0]) print('Array at beginning:', shared_array[:]) process1 = Process(target=add_100, args=(shared_number,)) process2 = Process(target=add_100, args=(shared_number,)) process3 = Process(target=add_100_array, args=(shared_array,)) process4 = Process(target=add_100_array, args=(shared_array,)) process1.start() process2.start() process3.start() process4.start() process1.join() process2.join() process3.join() process4.join() print('Value at end:', shared_number.value) print('Array at end:', shared_array[:]) print('end main')
Value at beginning: 0 Array at beginning: [0.0, 100.0, 200.0] Value at end: 144 Array at end: [134.0, 237.0, 339.0] end main
How to use
Notice that in the above example, the 2 processes should increment the shared value by 1 for 100 times. This results in 200 total operations. But why is the end value not 200?
A race condition happened here. A race condition occurs when two or more processes or threads can access shared data and they try to change it at the same time. In our example the two processes have to read the shared value, increase it by 1, and write it back into the shared variable. If this happens at the same time, the two processes read the same value, increase it and write it back. Thus, both processes write the same increased value back into the shared object, and the value was not increased by 2. See https://www.python-engineer.com/learn/advancedpython16_threading/ for a detailed explanation of race conditions.
Avoid race conditions with
A lock (also known as mutex) is a synchronization mechanism for enforcing limits on access to a resource in an environment where there are many processes/threads of execution. A Lock has two states: locked and unlocked. If the state is locked, it does not allow other concurrent processes/threads to enter this code section until the state is unlocked again.
Two functions are important: -
lock.acquire() : This will lock the state and block -
lock.release() : This will unlock the state again.
Important: You should always release the block again after it was acquired!
In our example the critical code section where the shared variable is read and increased is now locked. This prevents the second process from modyfing the shared object at the same time. Not much has changed in our code. All new changes are commented in the code below.
# import Lock from multiprocessing import Lock from multiprocessing import Process, Value, Array import time def add_100(number, lock): for _ in range(100): time.sleep(0.01) # lock the state lock.acquire() number.value += 1 # unlock the state lock.release() def add_100_array(numbers, lock): for _ in range(100): time.sleep(0.01) for i in range(len(numbers)): lock.acquire() numbers[i] += 1 lock.release() if __name__ == "__main__": # create a lock lock = Lock() shared_number = Value('i', 0) print('Value at beginning:', shared_number.value) shared_array = Array('d', [0.0, 100.0, 200.0]) print('Array at beginning:', shared_array[:]) # pass the lock to the target function process1 = Process(target=add_100, args=(shared_number, lock)) process2 = Process(target=add_100, args=(shared_number, lock)) process3 = Process(target=add_100_array, args=(shared_array, lock)) process4 = Process(target=add_100_array, args=(shared_array, lock)) process1.start() process2.start() process3.start() process4.start() process1.join() process2.join() process3.join() process4.join() print('Value at end:', shared_number.value) print('Array at end:', shared_array[:]) print('end main')
Value at beginning: 0 Array at beginning: [0.0, 100.0, 200.0] Value at end: 200 Array at end: [200.0, 300.0, 400.0] end main
Use the lock as a context manager¶
lock.acquire() you should never forget to call
lock.release() to unblock the code. You can also use a lock as a context manager, wich will safely lock and unlock your code. It is recommended to use a lock this way:
def add_100(number, lock): for _ in range(100): time.sleep(0.01) with lock: number.value += 1
Using Queues in Python¶
Data can also be shared between processes with a Queue. Queues can be used for thread-safe/process-safe data exchanges and data processing both in a multithreaded and a multiprocessing environment, which means you can avoid having to use any synchronization primitives like locks.
A queue is a linear data structure that follows the First In First Out (FIFO) principle. A good example is a queue of customers that are waiting in line, where the customer that came first is served first.
from multiprocessing import Queue # create queue q = Queue() # add elements q.put(1) # 1 q.put(2) # 2 1 q.put(3) # 3 2 1 # now q looks like this: # back --> 3 2 1 --> front # get and remove first element first = q.get() # --> 1 print(first) # q looks like this: # back --> 3 2 --> front
Using a queue in multiprocessing¶
Operations with a queue are process-safe. The multiprocessing Queue implements all the methods of queue.Queue except for
join(). Important methods are:
q.get(): Remove and return the first item. By default, it blocks until the item is available.
q.put(item): Puts element at the end of the queue. By default, it blocks until a free slot is available.
q.empty(): Return True if the queue is empty.
q.close(): Indicate that no more data will be put on this queue by the current process.
# communicate between processes with the multiprocessing Queue # Queues are thread and process safe from multiprocessing import Process, Queue def square(numbers, queue): for i in numbers: queue.put(i*i) def make_negative(numbers, queue): for i in numbers: queue.put(i*-1) if __name__ == "__main__": numbers = range(1, 6) q = Queue() p1 = Process(target=square, args=(numbers,q)) p2 = Process(target=make_negative, args=(numbers,q)) p1.start() p2.start() p1.join() p2.join() # order might not be sequential while not q.empty(): print(q.get()) print('end main')
1 4 9 16 25 -1 -2 -3 -4 -5 end main
A process pool object controls a pool of worker processes to which jobs can be submitted It supports asynchronous results with timeouts and callbacks and has a parallel map implementation. It can automatically manage the available processors and split data into smaller chunks which can then be processed in parallel by different processes. See https://docs.python.org/3.7/library/multiprocessing.html#multiprocessing.pool for all possible methods. Important methods are:
map(func, iterable[, chunksize]): This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer. It blocks until the result is ready.
close(): Prevents any more tasks from being submitted to the pool. Once all the tasks have been completed the worker processes will exit.
join(): Wait for the worker processes to exit. One must call
apply(func, args): Call func with arguments args. It blocks until the result is ready. func is only executed in ONE of the workers of the pool.
Note: There are also asynchronous variants
apply_async() that will not block. They can execute callbacks when the results are ready.
from multiprocessing import Pool def cube(number): return number * number * number if __name__ == "__main__": numbers = range(10) p = Pool() # by default this allocates the maximum number of available # processors for this task --> os.cpu_count() result = p.map(cube, numbers) # or # result = [p.apply(cube, args=(i,)) for i in numbers] p.close() p.join() print(result)
[0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
FREE VS Code / PyCharm Extensions I Use
✅ Write cleaner code with Sourcery, instant refactoring suggestions: Link*
PySaaS: The Pure Python SaaS Starter Kit
🚀 Build a software business faster with pure Python: Link*