PyMOTW: multiprocessing, part 2

Communication between processes with multiprocessing

By Doug Hellmann
April 28, 2009

This is part 2 of coverage of the multiprocessing module. If you missed part one, you may want to start there.

Passing Messages to Processes

As with threads, a common use pattern for multiple processes is to divide a job up among several workers to run in parallel. A simple way to do that with multiprocessing is to use Queues to pass messages back and forth. Any pickle-able object can pass through a multiprocessing Queue.

import multiprocessing

class MyFancyClass(object):
    
    def __init__(self, name):
        self.name = name
    
    def do_something(self):
        proc_name = multiprocessing.current_process().name
        print 'Doing something fancy in %s for %s!' % (proc_name, self.name)


def worker(q):
    obj = q.get()
    obj.do_something()


if __name__ == '__main__':
    queue = multiprocessing.Queue()

    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    
    queue.put(MyFancyClass('Fancy Dan'))
    
    # Close the queue and let its feeder thread flush any buffered data
    queue.close()
    queue.join_thread()
    p.join()
    

This short example only passes a single message to a single worker, then the main process waits for the worker to finish.

$ python multiprocessing_queue.py
Doing something fancy in Process-1 for Fancy Dan!
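As a variation (my sketch, not part of the original example), a JoinableQueue lets the producer block until every item it enqueued has actually been processed, via task_done() and join(). The run_jobs() helper below is hypothetical:

```python
import multiprocessing

def worker(tasks, results):
    """Double each number received until the None sentinel arrives."""
    while True:
        item = tasks.get()
        if item is None:
            tasks.task_done()
            break
        results.put(item * 2)
        tasks.task_done()

def run_jobs(values):
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(tasks, results))
    p.start()
    for v in values:
        tasks.put(v)
    tasks.put(None)   # sentinel tells the worker to exit
    tasks.join()      # blocks until task_done() was called for every item
    p.join()
    return [results.get() for _ in values]

if __name__ == '__main__':
    print(run_jobs([1, 2, 3]))
```

Because there is a single worker reading a FIFO queue, the results come back in submission order here.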

A more complex example shows how to manage several workers consuming data from the queue and passing results back to the parent process. The poison pill technique is used to stop the workers. After setting up the real tasks, the main program adds one “stop” value per worker to the job queue. When a worker encounters the special value, it breaks out of its processing loop.

import multiprocessing
import time

class Consumer(multiprocessing.Process):
    
    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means we should exit
                print '%s: Exiting' % proc_name
                break
            print '%s: %s' % (proc_name, next_task)
            answer = next_task()
            self.result_queue.put(answer)
        return


class Task(object):
    def __init__(self, a, b):
        self.a = a
        self.b = b
    def __call__(self):
        time.sleep(0.1) # pretend to take some time to do our work
        return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
    def __str__(self):
        return '%s * %s' % (self.a, self.b)


if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.Queue()
    results = multiprocessing.Queue()
    
    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    print 'Creating %d consumers' % num_consumers
    consumers = [ Consumer(tasks, results)
                  for i in xrange(num_consumers) ]
    for w in consumers:
        w.start()
    
    # Enqueue jobs
    num_jobs = 10
    for i in xrange(num_jobs):
        tasks.put(Task(i, i))
    
    # Add a poison pill for each consumer
    for i in xrange(num_consumers):
        tasks.put(None)
    
    # Start printing results
    while num_jobs:
        result = results.get()
        print 'Result:', result
        num_jobs -= 1
        

Although the jobs enter the queue in order, their execution is parallelized, so there is no guarantee about the order in which they will be completed.

$ python multiprocessing_producer_consumer.py
Creating 4 consumers
Consumer-4: 3 * 3
Consumer-4: 7 * 7
Consumer-4: Exiting
Consumer-2: 1 * 1
Consumer-2: 6 * 6
Consumer-2: Exiting
Consumer-3: 0 * 0
Consumer-3: 5 * 5
Consumer-3: 8 * 8
Consumer-3: Exiting
Consumer-1: 2 * 2
Consumer-1: 4 * 4
Consumer-1: 9 * 9
Consumer-1: Exiting
Result: 0 * 0 = 0
Result: 1 * 1 = 1
Result: 2 * 2 = 4
Result: 3 * 3 = 9
Result: 5 * 5 = 25
Result: 4 * 4 = 16
Result: 6 * 6 = 36
Result: 7 * 7 = 49
Result: 8 * 8 = 64
Result: 9 * 9 = 81
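If the original submission order matters, one approach (sketched here, not taken from the article) is to tag each job with an index and reorder the results after they all come back. The multiply_in_order() helper is hypothetical:

```python
import multiprocessing

def worker(tasks, results):
    while True:
        job = tasks.get()
        if job is None:
            break  # poison pill
        index, a, b = job
        results.put((index, a * b))

def multiply_in_order(pairs, num_workers=2):
    """Run multiplications in parallel, then restore submission order."""
    tasks = multiprocessing.Queue()
    results = multiprocessing.Queue()
    workers = [multiprocessing.Process(target=worker, args=(tasks, results))
               for _ in range(num_workers)]
    for w in workers:
        w.start()
    for i, (a, b) in enumerate(pairs):
        tasks.put((i, a, b))
    for _ in workers:
        tasks.put(None)   # one poison pill per worker
    # Drain the results before joining, then reassemble by index
    answers = dict(results.get() for _ in pairs)
    for w in workers:
        w.join()
    return [answers[i] for i in range(len(pairs))]

if __name__ == '__main__':
    print(multiply_in_order([(2, 3), (4, 5), (6, 7)]))
```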

Signaling between Processes with Event objects

Events provide a simple way to communicate state information between processes. An event can be toggled between set and unset states. Users of the event object can wait for it to change from unset to set, using an optional timeout value.

import multiprocessing
import time

def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    print 'wait_for_event: starting'
    e.wait()
    print 'wait_for_event: e.is_set()->', e.is_set()

def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    print 'wait_for_event_timeout: starting'
    e.wait(t)
    print 'wait_for_event_timeout: e.is_set()->', e.is_set()


if __name__ == '__main__':
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name='block', 
                                 target=wait_for_event,
                                 args=(e,))
    w1.start()

    w2 = multiprocessing.Process(name='non-block', 
                                 target=wait_for_event_timeout, 
                                 args=(e, 2))
    w2.start()

    print 'main: waiting before calling Event.set()'
    time.sleep(3)
    e.set()
    print 'main: event is set'

When wait() times out it returns without an error. The caller is responsible for checking the state of the event using is_set().

$ python multiprocessing_event.py
wait_for_event_timeout: starting
wait_for_event_timeout: e.is_set()-> False
wait_for_event: starting
wait_for_event: e.is_set()-> True
main: waiting before calling Event.set()
main: event is set

Controlling access to resources with Lock

In situations when a single resource needs to be shared between multiple processes, a Lock can be used to avoid conflicting accesses.

import multiprocessing
import sys

def worker_with(lock, stream):
    with lock:
        stream.write('Lock acquired via with\n')
        
def worker_no_with(lock, stream):
    lock.acquire()
    try:
        stream.write('Lock acquired directly\n')
    finally:
        lock.release()

if __name__ == '__main__':
    lock = multiprocessing.Lock()
    w = multiprocessing.Process(target=worker_with, args=(lock, sys.stdout))
    nw = multiprocessing.Process(target=worker_no_with, args=(lock, sys.stdout))

    w.start()
    nw.start()

    w.join()
    nw.join()

In this example, the messages printed to stdout may be jumbled together if the two processes do not synchronize their access of the output stream with the lock.

$ python multiprocessing_lock.py
Lock acquired via with
Lock acquired directly
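A Lock can also be polled without blocking by passing False to acquire(). Note that multiprocessing locks are not recursive, so a second acquire fails even in the process that already holds it. The try_acquire() helper here is hypothetical:

```python
import multiprocessing

def try_acquire(lock):
    """Attempt a non-blocking acquire; report whether it succeeded."""
    got_it = lock.acquire(False)
    if got_it:
        lock.release()
    return got_it

if __name__ == '__main__':
    lock = multiprocessing.Lock()
    print(try_acquire(lock))   # True: the lock is free
    lock.acquire()
    print(try_acquire(lock))   # False: already held, and not recursive
    lock.release()
```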

Synchronizing processes with a Condition object

Condition objects let you synchronize parts of a workflow so that some run in parallel but others run sequentially, even if they are in separate processes.

import multiprocessing
import time

def stage_1(cond):
    """perform first stage of work, then notify stage_2 to continue"""
    name = multiprocessing.current_process().name
    print 'Starting', name
    with cond:
        print '%s done and ready for stage 2' % name
        cond.notify_all()

def stage_2(cond):
    """wait for the condition telling us stage_1 is done"""
    name = multiprocessing.current_process().name
    print 'Starting', name
    with cond:
        cond.wait()
        print '%s running' % name

if __name__ == '__main__':
    condition = multiprocessing.Condition()
    s1 = multiprocessing.Process(name='s1', target=stage_1, args=(condition,))
    s2_clients = [
        multiprocessing.Process(name='stage_2[%d]' % i, target=stage_2, args=(condition,))
        for i in range(1, 3)
        ]

    for c in s2_clients:
        c.start()
        time.sleep(1)
    s1.start()

    s1.join()
    for c in s2_clients:
        c.join()

In this example, two processes run the second stage of a job in parallel once the first stage is done.

$ python multiprocessing_condition.py
Starting s1
s1 done and ready for stage 2
Starting stage_2[1]
stage_2[1] running
Starting stage_2[2]
stage_2[2] running

Controlling concurrent access to resources with a Semaphore

Sometimes it is useful to allow more than one worker access to a resource at a time, while still limiting the overall number. For example, a connection pool might support a fixed number of simultaneous connections, or a network application might support a fixed number of concurrent downloads. A Semaphore is one way to manage those connections.

import random
import multiprocessing
import time

class ActivePool(object):
    def __init__(self):
        super(ActivePool, self).__init__()
        self.mgr = multiprocessing.Manager()
        self.active = self.mgr.list()
        self.lock = multiprocessing.Lock()
    def makeActive(self, name):
        with self.lock:
            self.active.append(name)
    def makeInactive(self, name):
        with self.lock:
            self.active.remove(name)
    def __str__(self):
        with self.lock:
            return str(self.active)

def worker(s, pool):
    name = multiprocessing.current_process().name
    with s:
        pool.makeActive(name)
        print 'Now running: %s' % str(pool)
        time.sleep(random.random())
        pool.makeInactive(name)

if __name__ == '__main__':
    pool = ActivePool()
    s = multiprocessing.Semaphore(3)
    jobs = [
        multiprocessing.Process(target=worker, name=str(i), args=(s, pool))
        for i in range(10)
        ]

    for j in jobs:
        j.start()

    for j in jobs:
        j.join()
        print 'Now running: %s' % str(pool)

In this example, the ActivePool class simply serves as a convenient way to track which processes are running at a given moment. A real resource pool would probably allocate a connection or some other value to the newly active process, and reclaim the value when the task is done. Here, it is just used to hold the names of the active processes to show that only three are running concurrently.

$ python multiprocessing_semaphore.py
Now running: ['3', '2', '0']
Now running: ['3', '2', '0']
Now running: ['0', '1', '5']
Now running: ['2', '0', '1']
Now running: ['0', '1', '4']
Now running: ['3', '2', '0']
Now running: ['0', '7', '6']
Now running: ['0', '4', '7']
Now running: ['7', '6', '8']
Now running: ['7', '8', '9']
Now running: ['7', '6', '8']
Now running: ['7', '6', '8']
Now running: ['7', '6', '8']
Now running: ['7', '6', '8']
Now running: ['7', '6', '8']
Now running: ['7', '6', '8']
Now running: ['7', '8', '9']
Now running: ['8', '9']
Now running: ['9']
Now running: []
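Like a Lock, a Semaphore can be polled without blocking by passing False to acquire(), which is handy when a worker should fall back to other work rather than wait for a slot. The checkout() helper here is hypothetical:

```python
import multiprocessing

def checkout(sem):
    """Try to claim a slot without blocking; keep it if we succeed."""
    return sem.acquire(False)

if __name__ == '__main__':
    sem = multiprocessing.Semaphore(2)   # pool of two slots
    print(checkout(sem))   # True: first slot claimed
    print(checkout(sem))   # True: second slot claimed
    print(checkout(sem))   # False: pool exhausted
    sem.release()          # give one slot back
    print(checkout(sem))   # True: a slot is free again
```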

Managers

In the previous example, the list of active processes is maintained centrally in the ActivePool instance via a special type of list object created by a Manager. The Manager is responsible for coordinating shared state among all of its users. By creating the list through the manager, the list is updated in all processes when anyone modifies it. In addition to lists, dictionaries are also supported.

import multiprocessing

def worker(d, key, value):
    d[key] = value

if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    d = mgr.dict()
    jobs = [ multiprocessing.Process(target=worker, args=(d, i, i*2))
             for i in range(10) 
             ]
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()
    print 'Results:', d

$ python multiprocessing_manager_dict.py
Results: {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18}

Namespaces

In addition to dictionaries and lists, a Manager can create a shared Namespace. Any named value added to the Namespace is visible across all of the clients.

import multiprocessing

def producer(ns, event):
    ns.value = 'This is the value'
    event.set()

def consumer(ns, event):
    try:
        value = ns.value
    except Exception, err:
        print 'Before event, consumer got:', str(err)
    event.wait()
    print 'After event, consumer got:', ns.value

if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    namespace = mgr.Namespace()
    event = multiprocessing.Event()
    p = multiprocessing.Process(target=producer, args=(namespace, event))
    c = multiprocessing.Process(target=consumer, args=(namespace, event))
    
    c.start()
    p.start()
    
    c.join()
    p.join()

$ python multiprocessing_namespaces.py
Before event, consumer got: 'Namespace' object has no attribute 'value'
After event, consumer got: This is the value

It is important to know that updates to mutable values in the namespace are not propagated.

import multiprocessing

def producer(ns, event):
    ns.my_list.append('This is the value') # DOES NOT UPDATE GLOBAL VALUE!
    event.set()

def consumer(ns, event):
    print 'Before event, consumer got:', ns.my_list
    event.wait()
    print 'After event, consumer got:', ns.my_list

if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    namespace = mgr.Namespace()
    namespace.my_list = []
    
    event = multiprocessing.Event()
    p = multiprocessing.Process(target=producer, args=(namespace, event))
    c = multiprocessing.Process(target=consumer, args=(namespace, event))
    
    c.start()
    p.start()
    
    c.join()
    p.join()

$ python multiprocessing_namespaces_mutable.py
Before event, consumer got: []
After event, consumer got: []

Pool.map

For simple cases where the work to be done can be broken up and distributed between workers, you do not have to manage the queue and worker processes yourself. The Pool class maintains a fixed number of workers and passes them jobs. The return values are collected and returned as a list. The result is functionally equivalent to the built-in map(), except that individual tasks run in parallel.

import multiprocessing

def do_calculation(data):
    return data * 2

if __name__ == '__main__':
    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(processes=pool_size)
    
    inputs = list(range(10))
    print 'Input   :', inputs
    
    builtin_outputs = map(do_calculation, inputs)
    print 'Built-in:', builtin_outputs
    
    pool_outputs = pool.map(do_calculation, inputs)
    print 'Pool    :', pool_outputs

$ python multiprocessing_pool.py
Input   : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Built-in: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Pool    : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
