Implementing MapReduce with multiprocessing
While I was looking for example applications of Python's multiprocessing module to use in this week's PyMOTW, someone suggested implementing MapReduce. Below is the simple implementation I came up with (the source is included in the PyMOTW tarball as of version 1.89).
The Pool class can be used to create a simple single-server MapReduce implementation. Although it does not give the full benefits of distributed processing, it does illustrate how easy it is to break some problems down into distributable units of work.
SimpleMapReduce
In MapReduce, input data is broken down into chunks for processing by different worker instances. Each chunk of input data is mapped to an intermediate state using a simple transformation. The intermediate data is then collected together and partitioned based on a key value so that all of the related values are together. Finally, the partitioned data is reduced to a result set.
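The phases described above can be sketched sequentially, with no worker processes at all, to make the data flow visible. This is only an illustration under assumed inputs: the 'city,temp' records and the names map_chunk, partition, and reduce_item are hypothetical and are not part of the module shown below.

```python
import collections


def map_chunk(chunk):
    """Map phase: turn one chunk of 'city,temp' records into (key, value) pairs."""
    pairs = []
    for record in chunk:
        city, temp = record.split(',')
        pairs.append((city, int(temp)))
    return pairs


def partition(mapped_chunks):
    """Partition phase: gather all of the values that share a key."""
    grouped = collections.defaultdict(list)
    for pairs in mapped_chunks:
        for key, value in pairs:
            grouped[key].append(value)
    return grouped


def reduce_item(item):
    """Reduce phase: collapse the values for one key into a single result."""
    key, values = item
    return (key, max(values))


# Hypothetical input data, already broken into two chunks.
chunks = [
    ['oslo,3', 'cairo,31', 'oslo,7'],
    ['cairo,28', 'lima,19'],
]

mapped = [map_chunk(chunk) for chunk in chunks]
grouped = partition(mapped)
results = sorted(reduce_item(item) for item in grouped.items())
print(results)
```

Because each call to map_chunk and reduce_item depends only on its own argument, those two loops are the parts that could be handed to separate workers.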
import collections
import multiprocessing

class SimpleMapReduce(object):

    def __init__(self, map_func, reduce_func, num_workers=None):
        """
        map_func
          Function to map inputs to intermediate data. Takes as argument one input value and
          returns a sequence of (key, value) tuples to be reduced.
        reduce_func
          Function to reduce partitioned version of intermediate data to final output. Takes
          as argument a key as produced by map_func and a sequence of the values associated
          with that key.
        num_workers
          The number of workers to create in the pool. Defaults to the number of CPUs
          available on the current host.
        """
        self.map_func = map_func
        self.reduce_func = reduce_func
        self.pool = multiprocessing.Pool(num_workers)

    def partition(self, mapped_values):
        """Organize the mapped values by their key.
        Returns a dictionary mapping each key to a sequence of values.
        """
        partitioned_data = collections.defaultdict(list)
        # Each worker returns a list of (key, value) pairs.
        for sublist in mapped_values:
            for key, value in sublist:
                partitioned_data[key].append(value)
        return partitioned_data

    def __call__(self, inputs, chunksize=1):
        """Process the inputs through the map and reduce functions given.
        inputs
          An iterable containing the input data to be processed.
        chunksize=1
          The portion of the input data to hand to each worker. This
          can be used to tune performance during the mapping phase.
        """
        mapped_values = self.pool.map(self.map_func, inputs, chunksize=chunksize)
        partitioned_data = self.partition(mapped_values)
        reduced_values = self.pool.map(self.reduce_func, partitioned_data.items())
        return reduced_values
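To step through the same map, partition, and reduce calls in an interactive session, one option is to swap in multiprocessing.pool.ThreadPool, which offers the same map() interface as Pool but runs the work in threads of the current process. The sketch below makes that substitution, so it shows the control flow rather than any multi-process speedup. The functions square_with_parity, total, and run are hypothetical names chosen for the illustration.

```python
import collections
from multiprocessing.pool import ThreadPool


def square_with_parity(n):
    """Hypothetical map function: return a list holding one (key, value) pair."""
    key = 'even' if n % 2 == 0 else 'odd'
    return [(key, n * n)]


def total(item):
    """Hypothetical reduce function: sum the values collected for one key."""
    key, values = item
    return (key, sum(values))


def run(inputs, chunksize=1, num_workers=2):
    # ThreadPool stands in for multiprocessing.Pool here (an assumption of
    # this sketch); both expose map(func, iterable, chunksize).
    pool = ThreadPool(num_workers)
    try:
        mapped = pool.map(square_with_parity, inputs, chunksize=chunksize)
        partitioned = collections.defaultdict(list)
        for sublist in mapped:
            for key, value in sublist:
                partitioned[key].append(value)
        return pool.map(total, list(partitioned.items()))
    finally:
        # Release the workers once the results are in.
        pool.close()
        pool.join()


# With chunksize=3, the six inputs are handed out in two batches of three.
result = dict(run(range(1, 7), chunksize=3))
print(result['odd'], result['even'])
```

The chunksize argument changes only how the inputs are batched for the workers; the results come back in input order either way.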
Counting Words in Files
The following example script uses SimpleMapReduce to count the “words” in the reStructuredText source for this article, ignoring some of the markup.
import multiprocessing
import string

from multiprocessing_mapreduce import SimpleMapReduce

def file_to_words(filename):
    """Read a file and return a sequence of (word, occurrences) values.
    """
    STOP_WORDS = set([
            'a', 'an', 'and', 'are', 'as', 'be', 'for', 'if', 'in',
            'is', 'it', 'of', 'or', 'py', 'rst', 'the', 'to', 'with',
            ])
    # Translation table that maps every punctuation character to a space.
    TR = string.maketrans(string.punctuation, ' ' * len(string.punctuation))

    print multiprocessing.current_process().name, 'reading', filename
    output = []

    with open(filename, 'rt') as f:
        for line in f:
            if line.lstrip().startswith('..'): # Skip rst comment lines
                continue
            line = line.translate(TR) # Strip punctuation
            for word in line.split():
                word = word.lower()
                if word.isalpha() and word not in STOP_WORDS:
                    output.append( (word, 1) )
    return output


def count_words(item):
    """Convert the partitioned data for a word to a
    tuple containing the word and the number of occurrences.
    """
    word, occurrences = item
    return (word, sum(occurrences))


if __name__ == '__main__':
    import operator
    import glob

    input_files = glob.glob('*.rst')

    mapper = SimpleMapReduce(file_to_words, count_words)
    word_counts = mapper(input_files)
    # Sort by count, most frequent first.
    word_counts.sort(key=operator.itemgetter(1))
    word_counts.reverse()

    print '\nTOP 20 WORDS BY FREQUENCY\n'
    top20 = word_counts[:20]
    longest = max(len(word) for word, count in top20)
    for word, count in top20:
        print '%-*s: %5s' % (longest+1, word, count)
Each input filename is converted to a sequence of (word, 1) pairs by file_to_words. The data is partitioned by SimpleMapReduce.partition() using the word as the key, so the partitioned data consists of a key and a sequence of 1 values representing the number of occurrences of the word. The reduction phase converts that to a pair of (word, count) values by calling count_words for each element of the partitioned data set.
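The intermediate data described above is easier to picture on a tiny input. The sketch below runs the same three steps on two in-memory lines instead of files, with no worker pool. The sample lines, the shortened stop-word list, and the helper line_to_pairs are made up for the illustration, and the punctuation handling is a simplified stand-in for the translate() call in file_to_words.

```python
import collections
import string

# Shortened, hypothetical stop-word list for the example.
STOP_WORDS = set(['a', 'an', 'and', 'the', 'to', 'of'])


def line_to_pairs(line):
    """Map step: emit a (word, 1) pair for each word worth counting."""
    # Replace punctuation with spaces, mirroring the intent of file_to_words.
    cleaned = ''.join(' ' if c in string.punctuation else c for c in line)
    return [(word, 1) for word in cleaned.lower().split()
            if word.isalpha() and word not in STOP_WORDS]


lines = ['The pool starts a worker.', 'A worker exits; the pool waits.']

# Map: one list of (word, 1) pairs per input.
mapped = [line_to_pairs(line) for line in lines]

# Partition: each word becomes a key holding a list of 1 values.
partitioned = collections.defaultdict(list)
for pairs in mapped:
    for word, one in pairs:
        partitioned[word].append(one)

# Reduce: summing the 1 values gives the count for each word.
counts = [(word, sum(ones)) for word, ones in partitioned.items()]
counts.sort(key=lambda pair: (-pair[1], pair[0]))
print(counts)
```

Here the word "pool" appears once in each line, so its partitioned entry is the list [1, 1] and the reduce step turns that into ('pool', 2).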
$ python multiprocessing_wordcount.py
PoolWorker-1 reading communication.rst
PoolWorker-1 reading index.rst
PoolWorker-1 reading mapreduce.rst
PoolWorker-2 reading basics.rst

TOP 20 WORDS BY FREQUENCY

process         :    74
multiprocessing :    42
worker          :    35
after           :    31
running         :    30
start           :    29
python          :    28
processes       :    27
literal         :    26
header          :    26
pymotw          :    26
end             :    26
daemon          :    23
now             :    22
consumer        :    20
starting        :    18
exiting         :    17
event           :    16
that            :    16
by              :    15

See also
- MapReduce - Wikipedia: Overview of MapReduce on Wikipedia.
- MapReduce: Simplified Data Processing on Large Clusters: Google Labs presentation and paper on MapReduce.
Special thanks to Jesse Noller for helping review this information.
