Tuesday, April 28, 2009

Implementing MapReduce with multiprocessing

While I was looking for example applications for Python's multiprocessing module to use in this week's PyMOTW, someone suggested implementing MapReduce. Below is the simple implementation I came up with (the source is included in the PyMOTW tarball as of version 1.89).

The Pool class can be used to create a simple single-server MapReduce implementation. Although it does not give the full benefits of distributed processing, it does illustrate how easy it is to break some problems down into distributable units of work.

SimpleMapReduce

In MapReduce, input data is broken down into chunks for processing by different worker instances. Each chunk of input data is mapped to an intermediate state using a simple transformation. The intermediate data is then collected together and partitioned based on a key value so that all of the related values are together. Finally, the partitioned data is reduced to a result set.

import collections
import multiprocessing

class SimpleMapReduce(object):

    def __init__(self, map_func, reduce_func, num_workers=None):
        """
        map_func

          Function to map inputs to intermediate data. Takes as argument
          one input value and returns a sequence of (key, value) tuples
          to be reduced.

        reduce_func

          Function to reduce partitioned version of intermediate data to
          final output. Takes as argument a single (key, values) tuple:
          a key as produced by map_func and a sequence of the values
          associated with that key.

        num_workers

          The number of workers to create in the pool. Defaults to the
          number of CPUs available on the current host.
        """
        self.map_func = map_func
        self.reduce_func = reduce_func
        self.pool = multiprocessing.Pool(num_workers)

    def partition(self, mapped_values):
        """Organize the mapped values by their key.
        Returns a dictionary mapping each key to a sequence of values.
        """
        partitioned_data = collections.defaultdict(list)
        for sublist in mapped_values:
            for key, value in sublist:
                partitioned_data[key].append(value)
        return partitioned_data

    def __call__(self, inputs, chunksize=1):
        """Process the inputs through the map and reduce functions given.

        inputs
          An iterable containing the input data to be processed.

        chunksize=1
          The portion of the input data to hand to each worker. This
          can be used to tune performance during the mapping phase.
        """
        mapped_values = self.pool.map(self.map_func, inputs, chunksize=chunksize)
        partitioned_data = self.partition(mapped_values)
        reduced_values = self.pool.map(self.reduce_func, partitioned_data.items())
        return reduced_values
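To see the three phases without any worker processes, the sketch below runs the same map, partition, and reduce steps sequentially in a single process. The map_reduce() helper and the parity mapper and reducer are hypothetical names chosen for illustration. Unlike SimpleMapReduce, the sketch sorts the partitioned keys so that its output order is predictable.

```python
import collections

def map_reduce(inputs, map_func, reduce_func):
    """Sequential sketch of map -> partition -> reduce (no Pool involved)."""
    # Map: every input value becomes a list of (key, value) pairs.
    mapped_values = [map_func(value) for value in inputs]
    # Partition: collect all values emitted for the same key.
    partitioned_data = collections.defaultdict(list)
    for sublist in mapped_values:
        for key, value in sublist:
            partitioned_data[key].append(value)
    # Reduce: collapse each (key, values) item; keys sorted for stable output.
    return [reduce_func(item) for item in sorted(partitioned_data.items())]

def parity_mapper(n):
    # Hypothetical mapper: tag each number as 'even' or 'odd'.
    key = 'even' if n % 2 == 0 else 'odd'
    return [(key, n)]

def sum_reducer(item):
    # Hypothetical reducer: total the values collected for one key.
    key, values = item
    return (key, sum(values))

print(map_reduce(range(1, 7), parity_mapper, sum_reducer))  # [('even', 12), ('odd', 9)]
```

Swapping the two list comprehensions for Pool.map() calls is essentially what SimpleMapReduce does.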

Counting Words in Files

The following example script uses SimpleMapReduce to count the “words” in the reStructuredText source for this article, ignoring some of the markup.

import multiprocessing
import string

from multiprocessing_mapreduce import SimpleMapReduce

def file_to_words(filename):
    """Read a file and return a sequence of (word, occurrences) values.
    """
    STOP_WORDS = set([
            'a', 'an', 'and', 'are', 'as', 'be', 'for', 'if', 'in',
            'is', 'it', 'of', 'or', 'py', 'rst', 'the', 'to', 'with',
            ])
    TR = string.maketrans(string.punctuation, ' ' * len(string.punctuation))

    print multiprocessing.current_process().name, 'reading', filename
    output = []

    with open(filename, 'rt') as f:
        for line in f:
            if line.lstrip().startswith('..'): # Skip rst comment lines
                continue
            line = line.translate(TR) # Strip punctuation
            for word in line.split():
                word = word.lower()
                if word.isalpha() and word not in STOP_WORDS:
                    output.append( (word, 1) )
    return output


def count_words(item):
    """Convert the partitioned data for a word to a
    tuple containing the word and the number of occurrences.
    """
    word, occurrences = item
    return (word, sum(occurrences))


if __name__ == '__main__':
    import operator
    import glob

    input_files = glob.glob('*.rst')

    mapper = SimpleMapReduce(file_to_words, count_words)
    word_counts = mapper(input_files)
    word_counts.sort(key=operator.itemgetter(1))
    word_counts.reverse()

    print '\nTOP 20 WORDS BY FREQUENCY\n'
    top20 = word_counts[:20]
    longest = max(len(word) for word, count in top20)
    for word, count in top20:
        print '%-*s: %5s' % (longest+1, word, count)

Each input filename is converted to a sequence of (word, 1) pairs by file_to_words. The data is partitioned by SimpleMapReduce.partition() using the word as the key, so the partitioned data consists of a key and a sequence of 1 values representing the number of occurrences of the word. The reduction phase converts that to a pair of (word, count) values by calling count_words for each element of the partitioned data set.

$ python multiprocessing_wordcount.py
PoolWorker-1 reading communication.rst
PoolWorker-1 reading index.rst
PoolWorker-1 reading mapreduce.rst
PoolWorker-2 reading basics.rst

TOP 20 WORDS BY FREQUENCY

process : 74
multiprocessing : 42
worker : 35
after : 31
running : 30
start : 29
python : 28
processes : 27
literal : 26
header : 26
pymotw : 26
end : 26
daemon : 23
now : 22
consumer : 20
starting : 18
exiting : 17
event : 16
that : 16
by : 15
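The partition and reduce steps described above can be traced on a small, hand-written sample. The mapped_values list below stands in for what file_to_words() might return for two short files (the words are made up for illustration), and the loop and list comprehension mirror SimpleMapReduce.partition() and count_words(). A collections.Counter (Python 2.7 and later) gives an independent cross-check of the totals.

```python
import collections

# Hypothetical mapper output for two inputs: lists of (word, 1) pairs.
mapped_values = [
    [('process', 1), ('worker', 1)],
    [('process', 1), ('queue', 1), ('process', 1)],
]

# Partition, as in SimpleMapReduce.partition(): word -> list of 1 values.
partitioned_data = collections.defaultdict(list)
for sublist in mapped_values:
    for key, value in sublist:
        partitioned_data[key].append(value)

# Reduce, as in count_words(): (word, [1, 1, ...]) -> (word, count).
word_counts = [(word, sum(ones)) for word, ones in partitioned_data.items()]
# Highest count first; ties broken alphabetically so the order is repeatable.
word_counts.sort(key=lambda item: (-item[1], item[0]))

# Cross-check: count the words directly, skipping the partition step.
direct = collections.Counter(word for sublist in mapped_values
                             for word, _ in sublist)

print(word_counts)  # [('process', 3), ('queue', 1), ('worker', 1)]
```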

See also

MapReduce - Wikipedia
Overview of MapReduce on Wikipedia.
MapReduce: Simplified Data Processing on Large Clusters
Google Labs presentation and paper on MapReduce.

Special thanks to Jesse Noller for helping review this information.

PyMOTW: multiprocessing, part 2

Communication between processes with multiprocessing

This is part 2 of coverage of the multiprocessing module. If you missed part one, you may want to start there.


Passing Messages to Processes

As with threads, a common use pattern for multiple processes is to divide a job up among several workers to run in parallel. A simple way to do that with multiprocessing is to use Queues to pass messages back and forth. Any picklable object can pass through a multiprocessing Queue.

import multiprocessing

class MyFancyClass(object):

    def __init__(self, name):
        self.name = name

    def do_something(self):
        proc_name = multiprocessing.current_process().name
        print 'Doing something fancy in %s for %s!' % (proc_name, self.name)


def worker(q):
    obj = q.get()
    obj.do_something()


if __name__ == '__main__':
    queue = multiprocessing.Queue()

    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()

    queue.put(MyFancyClass('Fancy Dan'))

    # Wait for the worker to finish
    queue.close()
    queue.join_thread()
    p.join()

This short example only passes a single message to a single worker, then the main process waits for the worker to finish.

$ python multiprocessing_queue.py
Doing something fancy in Process-1 for Fancy Dan!
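Because everything placed on a multiprocessing Queue is pickled, a quick way to check whether an object can travel through one is to round-trip it through pickle in the current process. The can_pickle() helper below is a hypothetical convenience, not part of multiprocessing. Plain containers pass, while an object wrapping operating-system state, such as a threading lock, does not. Instances of classes like MyFancyClass also pass, provided the class is defined at module level so the receiving process can import it.

```python
import pickle
import threading

def can_pickle(obj):
    """Return True if obj survives a pickle round trip, as Queue.put() needs."""
    try:
        pickle.loads(pickle.dumps(obj))
    except Exception:
        # Unpicklable objects raise TypeError, pickle.PicklingError, etc.
        return False
    return True

message = {'name': 'Fancy Dan', 'values': [1, 2, 3]}
print(can_pickle(message))           # True
print(can_pickle(threading.Lock()))  # False
```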

A more complex example shows how to manage several workers consuming data from the queue and passing results back to the parent process. The poison pill technique is used to stop the workers. After setting up the real tasks, the main program adds one “stop” value per worker to the job queue. When a worker encounters the special value, it breaks out of its processing loop.

import multiprocessing
import time

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means we should exit
                print '%s: Exiting' % proc_name
                break
            print '%s: %s' % (proc_name, next_task)
            answer = next_task()
            self.result_queue.put(answer)
        return


class Task(object):
    def __init__(self, a, b):
        self.a = a
        self.b = b
    def __call__(self):
        time.sleep(0.1) # pretend to take some time to do our work
        return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
    def __str__(self):
        return '%s * %s' % (self.a, self.b)


if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.Queue()
    results = multiprocessing.Queue()

    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    print 'Creating %d consumers' % num_consumers
    consumers = [ Consumer(tasks, results)
                  for i in xrange(num_consumers) ]
    for w in consumers:
        w.start()

    # Enqueue jobs
    num_jobs = 10
    for i in xrange(num_jobs):
        tasks.put(Task(i, i))

    # Add a poison pill for each consumer
    for i in xrange(num_consumers):
        tasks.put(None)

    # Start printing results
    while num_jobs:
        result = results.get()
        print 'Result:', result
        num_jobs -= 1

Although the jobs enter the queue in order, their execution is parallelized, so there is no guarantee about the order in which they will be completed.

$ python multiprocessing_producer_consumer.py
Creating 4 consumers
Consumer-4: 3 * 3
Consumer-4: 7 * 7
Consumer-4: Exiting
Consumer-2: 1 * 1
Consumer-2: 6 * 6
Consumer-2: Exiting
Consumer-3: 0 * 0
Consumer-3: 5 * 5
Consumer-3: 8 * 8
Consumer-3: Exiting
Consumer-1: 2 * 2
Consumer-1: 4 * 4
Consumer-1: 9 * 9
Consumer-1: Exiting
Result: 0 * 0 = 0
Result: 1 * 1 = 1
Result: 2 * 2 = 4
Result: 3 * 3 = 9
Result: 5 * 5 = 25
Result: 4 * 4 = 16
Result: 6 * 6 = 36
Result: 7 * 7 = 49
Result: 8 * 8 = 64
Result: 9 * 9 = 81
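The poison pill pattern itself does not depend on processes. The sketch below reproduces it with multiprocessing.dummy, which offers the same Process and Queue interface backed by threads, so it runs without pickling or a __main__ guard. The consumer() and run_with_poison_pills() names are illustrative; using multiprocessing's own Process and Queue instead (inside an if __name__ == '__main__': block) should give the multi-process version. Results are sorted before they are returned because, as the output above shows, completion order is not guaranteed.

```python
from multiprocessing.dummy import Process, Queue  # thread-backed stand-ins

def consumer(task_queue, result_queue):
    while True:
        item = task_queue.get()
        if item is None:
            # Poison pill: stop this worker's loop.
            break
        result_queue.put(item * item)

def run_with_poison_pills(values, num_workers=3):
    values = list(values)
    tasks = Queue()
    results = Queue()
    workers = [Process(target=consumer, args=(tasks, results))
               for _ in range(num_workers)]
    for w in workers:
        w.start()
    for v in values:
        tasks.put(v)
    # One pill per worker, queued after all of the real tasks.
    for _ in workers:
        tasks.put(None)
    # Drain results before joining; with real processes, joining a worker
    # that still has queued data can block.
    answers = [results.get() for _ in values]
    for w in workers:
        w.join()
    return sorted(answers)

print(run_with_poison_pills(range(5)))  # [0, 1, 4, 9, 16]
```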

Signaling between Processes with Event objects

Events provide a simple way to communicate state information between processes. An event can be toggled between set and unset states. Users of the event object can wait for it to change from unset to set, using an optional timeout value.

import multiprocessing
import time

def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    print 'wait_for_event: starting'
    e.wait()
    print 'wait_for_event: e.is_set()->', e.is_set()

def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    print 'wait_for_event_timeout: starting'
    e.wait(t)
    print 'wait_for_event_timeout: e.is_set()->', e.is_set()


if __name__ == '__main__':
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name='block',
                                 target=wait_for_event,
                                 args=(e,))
    w1.start()

    w2 = multiprocessing.Process(name='non-block',
                                 target=wait_for_event_timeout,
                                 args=(e, 2))
    w2.start()

    print 'main: waiting before calling Event.set()'
    time.sleep(3)
    e.set()
    print 'main: event is set'

When wait() times out it returns without an error. The caller is responsible for checking the state of the event using is_set().

$ python multiprocessing_event.py
wait_for_event_timeout: starting
wait_for_event_timeout: e.is_set()-> False
wait_for_event: starting
wait_for_event: e.is_set()-> True
main: waiting before calling Event.set()
main: event is set
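The timeout behavior can be checked without starting any children, since an Event created in the parent works in that process too. In this minimal sketch, wait() with a short timeout returns quietly while the event is unset, so is_set() is what reveals the outcome; after set(), the same wait() returns immediately.

```python
import multiprocessing

e = multiprocessing.Event()

e.wait(0.1)               # times out quietly: no exception while unset
before = e.is_set()       # the caller checks the state explicitly

e.set()
e.wait(0.1)               # returns at once now that the event is set
after = e.is_set()

print('before=%s after=%s' % (before, after))  # before=False after=True
```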

Controlling access to resources with Lock

In situations when a single resource needs to be shared between multiple processes, a Lock can be used to avoid conflicting accesses.

import multiprocessing
import sys

def worker_with(lock, stream):
    with lock:
        stream.write('Lock acquired via with\n')

def worker_no_with(lock, stream):
    lock.acquire()
    try:
        stream.write('Lock acquired directly\n')
    finally:
        lock.release()

if __name__ == '__main__':
    lock = multiprocessing.Lock()
    w = multiprocessing.Process(target=worker_with, args=(lock, sys.stdout))
    nw = multiprocessing.Process(target=worker_no_with, args=(lock, sys.stdout))

    w.start()
    nw.start()

    w.join()
    nw.join()

In this example, the messages printed to stdout may be jumbled together if the two processes do not synchronize their access to the output stream with the lock.

$ python multiprocessing_lock.py
Lock acquired via with
Lock acquired directly
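Besides the blocking forms shown above, Lock.acquire() accepts a block flag and a timeout, which lets a process give up instead of waiting forever on a busy resource. This single-process sketch shows that a second, non-blocking acquire of a held Lock fails (a Lock is not re-entrant; RLock is the re-entrant variant) and that a timed acquire succeeds once the lock is free.

```python
import multiprocessing

lock = multiprocessing.Lock()

first = lock.acquire()              # blocking acquire of a free lock: True
second = lock.acquire(False)        # non-blocking attempt while held: False
lock.release()

third = lock.acquire(True, 0.1)     # wait at most 0.1 seconds: True here
lock.release()

print('%s %s %s' % (first, second, third))  # True False True
```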

Synchronizing processes with a Condition object

Condition objects let you synchronize parts of a workflow so that some run in parallel but others run sequentially, even if they are in separate processes.

import multiprocessing
import time

def stage_1(cond):
    """perform first stage of work, then notify stage_2 to continue"""
    name = multiprocessing.current_process().name
    print 'Starting', name
    with cond:
        print '%s done and ready for stage 2' % name
        cond.notify_all()

def stage_2(cond):
    """wait for the condition telling us stage_1 is done"""
    name = multiprocessing.current_process().name
    print 'Starting', name
    with cond:
        cond.wait()
        print '%s running' % name

if __name__ == '__main__':
    condition = multiprocessing.Condition()
    s1 = multiprocessing.Process(name='s1', target=stage_1, args=(condition,))
    s2_clients = [
        multiprocessing.Process(name='stage_2[%d]' % i, target=stage_2, args=(condition,))
        for i in range(1, 3)
        ]

    for c in s2_clients:
        c.start()
        time.sleep(1)
    s1.start()

    s1.join()
    for c in s2_clients:
        c.join()

In this example, two processes run stage two of a job in parallel once the first stage is done.

$ python multiprocessing_condition.py
Starting s1
s1 done and ready for stage 2
Starting stage_2[1]
stage_2[1] running
Starting stage_2[2]
stage_2[2] running
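One caveat with the example: if stage_1 happens to call notify_all() before a stage_2 process reaches wait(), that notification is lost and the waiter blocks forever; the time.sleep() call only makes this unlikely. A common remedy is to pair the condition with a flag and wait in a loop that checks it. The sketch below shows the pattern using multiprocessing.dummy, whose Condition and Process are thread-backed versions of the same API. The shared dictionary flag is an assumption that only works with threads; across real processes the flag would need to live in shared memory (for example a multiprocessing.Value).

```python
from multiprocessing.dummy import Condition, Process  # thread-backed stand-ins

cond = Condition()
state = {'stage_1_done': False}   # shared flag; threads share memory
order = []

def stage_1():
    with cond:
        state['stage_1_done'] = True
        order.append('stage_1')
        cond.notify_all()

def stage_2(name):
    with cond:
        # Check the flag first: if stage_1 already finished, skip the wait.
        while not state['stage_1_done']:
            cond.wait()
        order.append(name)

# Run stage_1 to completion first on purpose: without the flag check,
# the waiters would miss its notification and block forever.
s1 = Process(target=stage_1)
s1.start()
s1.join()

clients = [Process(target=stage_2, args=('stage_2[%d]' % i,)) for i in (1, 2)]
for c in clients:
    c.start()
for c in clients:
    c.join()

print(order)
```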

Controlling concurrent access to resources with a Semaphore

Sometimes it is useful to allow more than one worker access to a resource at a time,
while still limiting the overall number. For example, a connection pool might
support a fixed number of simultaneous connections, or a network application
might support a fixed number of concurrent downloads. A Semaphore is one way
to manage those connections.

import random
import multiprocessing
import time

class ActivePool(object):
    def __init__(self):
        super(ActivePool, self).__init__()
        self.mgr = multiprocessing.Manager()
        self.active = self.mgr.list()
        self.lock = multiprocessing.Lock()
    def makeActive(self, name):
        with self.lock:
            self.active.append(name)
    def makeInactive(self, name):
        with self.lock:
            self.active.remove(name)
    def __str__(self):
        with self.lock:
            return str(self.active)

def worker(s, pool):
    name = multiprocessing.current_process().name
    with s:
        pool.makeActive(name)
        print 'Now running: %s' % str(pool)
        time.sleep(random.random())
        pool.makeInactive(name)

if __name__ == '__main__':
    pool = ActivePool()
    s = multiprocessing.Semaphore(3)
    jobs = [
        multiprocessing.Process(target=worker, name=str(i), args=(s, pool))
        for i in range(10)
        ]

    for j in jobs:
        j.start()

    for j in jobs:
        j.join()
        print 'Now running: %s' % str(pool)

In this example, the ActivePool class simply serves as a convenient way to
track which processes are running at a given moment. A real resource pool
would probably allocate a connection or some other value to the newly active
process, and reclaim the value when the task is done. Here, it is just used to
hold the names of the active processes to show that only 3 are running
concurrently.

$ python multiprocessing_semaphore.py
Now running: ['3', '2', '0']
Now running: ['3', '2', '0']
Now running: ['0', '1', '5']
Now running: ['2', '0', '1']
Now running: ['0', '1', '4']
Now running: ['3', '2', '0']
Now running: ['0', '7', '6']
Now running: ['0', '4', '7']
Now running: ['7', '6', '8']
Now running: ['7', '8', '9']
Now running: ['7', '6', '8']
Now running: ['7', '6', '8']
Now running: ['7', '6', '8']
Now running: ['7', '6', '8']
Now running: ['7', '6', '8']
Now running: ['7', '6', '8']
Now running: ['7', '8', '9']
Now running: ['8', '9']
Now running: ['9']
Now running: []
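The limit itself can be seen without starting any workers: a Semaphore(3) grants three acquisitions and refuses a fourth non-blocking attempt until one holder releases. This is a single-process sketch of the counting behavior only, not of the ActivePool bookkeeping.

```python
import multiprocessing

s = multiprocessing.Semaphore(3)

# Try to take four slots without blocking; only three are available.
granted = [s.acquire(False) for _ in range(4)]
print(granted)  # [True, True, True, False]

# Give back one slot, after which a new attempt succeeds again.
s.release()
retry = s.acquire(False)
```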

Managers

In the previous example, the list of active processes is maintained centrally in the ActivePool instance via a special type of list object created by a Manager. The Manager is responsible for coordinating shared state between all of its users. By creating the list through the manager, the list is updated in all processes when anyone modifies it. In addition to lists, dictionaries are also supported.

import multiprocessing

def worker(d, key, value):
    d[key] = value

if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    d = mgr.dict()
    jobs = [ multiprocessing.Process(target=worker, args=(d, i, i*2))
             for i in range(10)
             ]
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()
    print 'Results:', d

$ python multiprocessing_manager_dict.py
Results: {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18}

Namespaces

In addition to dictionaries and lists, a Manager can create a shared Namespace. Any named value added to the Namespace is visible across all of the clients.

import multiprocessing

def producer(ns, event):
    ns.value = 'This is the value'
    event.set()

def consumer(ns, event):
    try:
        value = ns.value
    except Exception, err:
        print 'Before event, consumer got:', str(err)
    event.wait()
    print 'After event, consumer got:', ns.value

if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    namespace = mgr.Namespace()
    event = multiprocessing.Event()
    p = multiprocessing.Process(target=producer, args=(namespace, event))
    c = multiprocessing.Process(target=consumer, args=(namespace, event))

    c.start()
    p.start()

    c.join()
    p.join()

$ python multiprocessing_namespaces.py
Before event, consumer got: 'Namespace' object has no attribute 'value'
After event, consumer got: This is the value

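It is important to know that updates to the contents of mutable values in the namespace are not propagated. Modifying a list stored in the namespace changes only a local copy; to share the change, assign the modified value back to the namespace attribute.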

import multiprocessing

def producer(ns, event):
    ns.my_list.append('This is the value') # DOES NOT UPDATE GLOBAL VALUE!
    event.set()

def consumer(ns, event):
    print 'Before event, consumer got:', ns.my_list
    event.wait()
    print 'After event, consumer got:', ns.my_list

if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    namespace = mgr.Namespace()
    namespace.my_list = []

    event = multiprocessing.Event()
    p = multiprocessing.Process(target=producer, args=(namespace, event))
    c = multiprocessing.Process(target=consumer, args=(namespace, event))

    c.start()
    p.start()

    c.join()
    p.join()

$ python multiprocessing_namespaces_mutable.py
Before event, consumer got: []
After event, consumer got: []

Pool.map

For simple cases where the work to be done can be broken up and distributed between workers, you do not have to manage the queue and worker processes yourself. The Pool class maintains a fixed number of workers and passes them jobs. The return values are collected and returned as a list. The result is functionally equivalent to the built-in map(), except that individual tasks run in parallel.

import multiprocessing

def do_calculation(data):
    return data * 2

if __name__ == '__main__':
    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(processes=pool_size)

    inputs = list(range(10))
    print 'Input :', inputs

    builtin_outputs = map(do_calculation, inputs)
    print 'Built-in:', builtin_outputs

    pool_outputs = pool.map(do_calculation, inputs)
    print 'Pool :', pool_outputs

$ python multiprocessing_pool.py
Input : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Built-in: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Pool : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
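Pool.map() also accepts a chunksize argument, and a finished pool should be shut down with close() followed by join(). The sketch below shows both using multiprocessing.dummy.Pool, a thread-backed pool with the same interface, so it needs no __main__ guard or picklable function; multiprocessing.Pool used inside an if __name__ == '__main__': block should behave the same way with processes. The pool size of 4 and chunksize of 3 are arbitrary choices.

```python
from multiprocessing.dummy import Pool  # thread-backed pool, same API

def do_calculation(data):
    return data * 2

inputs = list(range(10))
pool = Pool(4)
try:
    # chunksize=3 hands inputs to the workers in batches of three.
    pool_outputs = pool.map(do_calculation, inputs, chunksize=3)
finally:
    pool.close()   # no more tasks will be submitted
    pool.join()    # wait for the worker threads to exit

print(pool_outputs == list(map(do_calculation, inputs)))  # True
```

As with the built-in map(), the results come back in input order regardless of which worker handled each chunk.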

Sunday, April 19, 2009

PyMOTW: multiprocessing, part 1

multiprocessing Basics

Purpose: Provides an API for managing processes.
Python Version: 2.6

The multiprocessing module includes a relatively simple API for dividing work up between multiple processes. It is based on the API for threading, and in some cases is a drop-in replacement. Due to the similarity, the first few examples here are modified from the threading examples. Features provided by multiprocessing but not available in threading are covered later.

Process objects

The simplest way to use a sub-process is to instantiate it with a target function
and call start() to let it begin working.

import multiprocessing

def worker():
    """worker function"""
    print 'Worker'
    return

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker)
        jobs.append(p)
        p.start()

The output includes the word “Worker” printed five times, although it may not be entirely clean depending on the order of execution. A later example illustrates using a lock to ensure that only one worker can print to stdout at a time.

$ python multiprocessing_simple.py
Worker
Worker
Worker
Worker
Worker

It is usually more useful to be able to spawn a process with arguments to tell it what
work to do. Unlike with threading, to pass arguments to a multiprocessing Process the arguments must be picklable. As a simple example we could pass each
worker a number so the output is a little more interesting in the second
example.

import multiprocessing

def worker(num):
    """worker function"""
    print 'Worker:', num
    return

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

The integer argument is now included in the message printed by each worker:

$ python multiprocessing_simpleargs.py
Worker: 0
Worker: 1
Worker: 2
Worker: 3
Worker: 4

Importable Target Functions

One difference you will notice between the threading and multiprocessing examples is the extra protection for __main__ used here. Due to the way the new processes are started, the child process needs to be able to import the script containing the target function. In these examples I accomplish that by wrapping the main part of the application so it is not run recursively in each child. You could also import the target function from a separate script.

For example, this main program:

import multiprocessing
import multiprocessing_import_worker

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=multiprocessing_import_worker.worker)
        jobs.append(p)
        p.start()

uses this worker function, defined in a separate module:

def worker():
    """worker function"""
    print 'Worker'
    return

and produces output like the first example above:

$ python multiprocessing_import_main.py
Worker
Worker
Worker
Worker
Worker

Determining the Current Process

Passing arguments to identify or name the process is cumbersome and unnecessary.
Each Process instance has a name with a default value that you can change as
the process is created. Naming processes is useful if you have a server
with multiple service children handling different operations.

import multiprocessing
import time

def worker():
    name = multiprocessing.current_process().name
    print name, 'Starting'
    time.sleep(2)
    print name, 'Exiting'

def my_service():
    name = multiprocessing.current_process().name
    print name, 'Starting'
    time.sleep(3)
    print name, 'Exiting'

if __name__ == '__main__':
    service = multiprocessing.Process(name='my_service', target=my_service)
    worker_1 = multiprocessing.Process(name='worker 1', target=worker)
    worker_2 = multiprocessing.Process(target=worker) # use default name

    worker_1.start()
    worker_2.start()
    service.start()

The debug output includes the name of the current process on each line. The
lines with “Process-3” in the name column correspond to the unnamed
process worker_2.

$ python multiprocessing_names.py
worker 1 Starting
worker 1 Exiting
Process-3 Starting
Process-3 Exiting
my_service Starting
my_service Exiting
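The naming rules can be inspected without starting anything, because constructing a Process object does not launch it. In this sketch the parent reports itself as MainProcess, an explicit name is kept, an unnamed Process gets a default name beginning with "Process-" (the number depends on how many have been created so far), and the name attribute can be reassigned. The noop() target is a placeholder.

```python
import multiprocessing

def noop():
    pass  # placeholder target; none of these processes is started

main_name = multiprocessing.current_process().name   # 'MainProcess' in the parent
named = multiprocessing.Process(name='worker 1', target=noop)
unnamed = multiprocessing.Process(target=noop)
default_name = unnamed.name                           # e.g. 'Process-1'
unnamed.name = 'renamed worker'                       # name is writable

print('%s | %s | %s' % (main_name, named.name, unnamed.name))
```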


Daemon Processes

By default the main program will not exit until all of the children have exited. There are
times when you want to start a background process and let it run without blocking the main
program from exiting. Using daemon processes like this is useful for services where there may
not be an easy way to interrupt the worker or where letting it die in the middle of its work
does not lose or corrupt data (for example, a task that generates “heart beats” for a service
monitoring tool). To mark a process as a daemon, set its daemon attribute with a boolean
value. The default is for processes to not be daemons, so passing True turns the daemon mode
on.

import multiprocessing
import time
import sys

def daemon():
    print 'Starting:', multiprocessing.current_process().name
    time.sleep(2)
    print 'Exiting :', multiprocessing.current_process().name

def non_daemon():
    print 'Starting:', multiprocessing.current_process().name
    print 'Exiting :', multiprocessing.current_process().name

if __name__ == '__main__':
    d = multiprocessing.Process(name='daemon', target=daemon)
    d.daemon = True

    n = multiprocessing.Process(name='non-daemon', target=non_daemon)
    n.daemon = False

    d.start()
    time.sleep(1)
    n.start()

Notice that the output does not include the “Exiting” message from the daemon
process, since all of the non-daemon processes (including the main program) exit
before the daemon process wakes up from its 2 second sleep.

$ python multiprocessing_daemon.py
Starting: non-daemon
Exiting : non-daemon

The daemon process is terminated before the main program exits, to avoid leaving orphaned processes running.
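The daemon flag is an ordinary attribute whose initial value is inherited from the creating process, which is why a Process made by the (non-daemon) main program starts out with daemon set to False. It has to be changed before start() is called. A minimal sketch that only builds the object, without running it:

```python
import multiprocessing

def background():
    pass  # placeholder target; the process is never started here

d = multiprocessing.Process(name='daemon', target=background)
default_flag = d.daemon   # False: inherited from the non-daemon parent
d.daemon = True           # must be set before d.start()

print('%s -> %s' % (default_flag, d.daemon))  # False -> True
```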


Waiting for Processes

To wait until a process has completed its work and exited, use the join() method.

import multiprocessing
import time
import sys

def daemon():
    print 'Starting:', multiprocessing.current_process().name
    time.sleep(2)
    print 'Exiting :', multiprocessing.current_process().name

def non_daemon():
    print 'Starting:', multiprocessing.current_process().name
    print 'Exiting :', multiprocessing.current_process().name

if __name__ == '__main__':
    d = multiprocessing.Process(name='daemon', target=daemon)
    d.daemon = True

    n = multiprocessing.Process(name='non-daemon', target=non_daemon)
    n.daemon = False

    d.start()
    time.sleep(1)
    n.start()

    d.join()
    n.join()

Since we wait for the daemon to exit using join(), we do see its
“Exiting” message.

$ python multiprocessing_daemon_join.py
Starting: non-daemon
Exiting : non-daemon
Starting: daemon
Exiting : daemon

By default, join() blocks indefinitely. It is also possible to pass a timeout
argument (a float representing the number of seconds to wait for the process to
become inactive). If the process does not complete within the timeout period,
join() returns anyway.

import multiprocessing
import time
import sys

def daemon():
    print 'Starting:', multiprocessing.current_process().name
    time.sleep(2)
    print 'Exiting :', multiprocessing.current_process().name

def non_daemon():
    print 'Starting:', multiprocessing.current_process().name
    print 'Exiting :', multiprocessing.current_process().name

if __name__ == '__main__':
    d = multiprocessing.Process(name='daemon', target=daemon)
    d.daemon = True

    n = multiprocessing.Process(name='non-daemon', target=non_daemon)
    n.daemon = False

    d.start()
    n.start()

    d.join(1)
    print 'd.is_alive()', d.is_alive()
    n.join()

Since the timeout passed is less than the amount of time the daemon
sleeps, the process is still “alive” after join() returns.

$ python multiprocessing_daemon_join_timeout.py
Starting: non-daemon
Exiting : non-daemon
d.is_alive() True
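Because join(timeout) returns silently whether or not the process finished, the caller has to follow it with is_alive(), as the example does. The same pattern is shown below with multiprocessing.dummy.Process, a thread-backed class with the same join() and is_alive() methods, so the timing can be checked without starting a child process. The sleep lengths are arbitrary.

```python
import time
from multiprocessing.dummy import Process  # thread-backed stand-in

def slow():
    time.sleep(0.5)

p = Process(target=slow)
p.start()

p.join(0.05)                  # gives up after 0.05 s; slow() is still sleeping
timed_out = p.is_alive()      # True: join() returned because of the timeout

p.join()                      # no timeout: block until slow() returns
finished = not p.is_alive()   # True

print('%s %s' % (timed_out, finished))  # True True
```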


Terminating Processes

Although it is better to use the poison pill method of signaling to a process that it should exit, if a process appears hung or deadlocked it can be useful to be able to kill it forcibly. Calling terminate() on a process object kills the child process.

import multiprocessing
import time

def slow_worker():
    print 'Starting worker'
    time.sleep(0.1)
    print 'Finished worker'

if __name__ == '__main__':
    p = multiprocessing.Process(target=slow_worker)
    print 'BEFORE:', p, p.is_alive()

    p.start()
    print 'DURING:', p, p.is_alive()

    p.terminate()
    print 'TERMINATED:', p, p.is_alive()

    p.join()
    print 'JOINED:', p, p.is_alive()

Note

It is important to join() the process after terminating it in order to give the background machinery time to update the status of the object to reflect the termination.

$ python multiprocessing_terminate.py
BEFORE: <Process(Process-1, initial)> False
DURING: <Process(Process-1, started)> True
TERMINATED: <Process(Process-1, started)> True
JOINED: <Process(Process-1, stopped[SIGTERM])> False

Process Exit Status

The status code produced when the process exits can be accessed via the exitcode attribute.

For exitcode values:

  • == 0 – no error was produced
  • > 0 – the process had an error, and exited with that code
  • < 0 – the process was killed with a signal of -1 * exitcode

import multiprocessing
import sys
import time

def exit_error():
    sys.exit(1)

def exit_ok():
    return

def return_value():
    return 1

def raises():
    raise RuntimeError('There was an error!')

def terminated():
    time.sleep(3)

if __name__ == '__main__':
    jobs = []
    for f in [exit_error, exit_ok, return_value, raises, terminated]:
        print 'Starting process for', f.func_name
        j = multiprocessing.Process(target=f, name=f.func_name)
        jobs.append(j)
        j.start()

    jobs[-1].terminate()

    for j in jobs:
        j.join()
        print '%s.exitcode = %s' % (j.name, j.exitcode)

Processes that raise an exception automatically get an exitcode of 1.

$ python multiprocessing_exitcode.py
Starting process for exit_error
Starting process for exit_ok
Starting process for return_value
Starting process for raises
Process raises:
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/multiprocessing/process.py", line 231, in _bootstrap
self.run()
File "/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/multiprocessing/process.py", line 88, in run
self._target(*self._args, **self._kwargs)
File "multiprocessing_exitcode.py", line 24, in raises
raise RuntimeError('There was an error!')
RuntimeError: There was an error!
Starting process for terminated
exit_error.exitcode = 1
exit_ok.exitcode = 0
return_value.exitcode = 0
raises.exitcode = 1
terminated.exitcode = -15
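The exitcode rules listed above translate directly into a small helper. describe_exitcode() is a hypothetical name; it also handles None, which is the value exitcode holds while the process has not yet terminated.

```python
def describe_exitcode(exitcode):
    """Summarize a Process.exitcode value using the conventions above."""
    if exitcode is None:
        return 'still running'          # the process has not terminated yet
    if exitcode == 0:
        return 'no error'
    if exitcode > 0:
        return 'exited with code %d' % exitcode
    return 'killed by signal %d' % -exitcode   # e.g. -15 -> SIGTERM (15)

for code in (0, 1, -15, None):
    print('%s: %s' % (code, describe_exitcode(code)))
```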

Logging

When debugging concurrency issues, it can be useful to have access to the internals of the objects provided by multiprocessing. There is a convenient module-level function to enable logging called log_to_stderr(). It sets up a logger object using logging and adds a handler so that log messages are sent to the standard error channel.

import multiprocessing
import logging
import sys

def worker():
    print 'Doing some work'
    sys.stdout.flush()

if __name__ == '__main__':
    multiprocessing.log_to_stderr(logging.DEBUG)
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()

By default the logging level is set to NOTSET so no messages are produced. Pass a different level to initialize the logger to the level of detail you want.

$ python multiprocessing_log_to_stderr.py
[INFO/Process-1] child process calling self.run()
Doing some work
[INFO/Process-1] process shutting down
[DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers

To manipulate the logger directly (change its level setting or add handlers), use get_logger().

import multiprocessing
import logging
import sys

def worker():
    print 'Doing some work'
    sys.stdout.flush()

if __name__ == '__main__':
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()

The logger can also be configured through the logging configuration file API, using the name multiprocessing.

$ python multiprocessing_get_logger.py
[INFO/Process-1] child process calling self.run()
Doing some work
[INFO/Process-1] process shutting down
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down

Subclassing Process

Although the simplest way to start a job in a separate process is to use Process and pass a target function, it is also possible to use a custom subclass. The derived class should override run() to do its work.

import multiprocessing

class Worker(multiprocessing.Process):

    def run(self):
        print 'In %s' % self.name
        return

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = Worker()
        jobs.append(p)
        p.start()
    for j in jobs:
        j.join()

$ python multiprocessing_subclass.py
In Worker-1
In Worker-2
In Worker-3
In Worker-4
In Worker-5

See also

multiprocessing
The standard library documentation for this module.
threading
High-level API for working with threads.
