Sunday, September 30, 2007

PyMOTW: copy

The copy module provides functions for duplicating objects using shallow or deep copy semantics.

Module: copy
Purpose: Duplicate objects.
Python Version: 1.4

Description:

The copy module includes 2 functions, copy() and deepcopy(), for duplicating existing objects.

Shallow Copies:

The shallow copy created by copy() is a new container populated with references to the contents of the original object. For example, a new list is constructed and the elements of the original list are appended to it.

import copy

class MyClass:
    def __init__(self, name):
        self.name = name
    def __cmp__(self, other):
        return cmp(self.name, other.name)

a = MyClass('a')
l = [ a ]
dup = copy.copy(l)

print 'l :', l
print 'dup:', dup
print 'dup is l:', (dup is l)
print 'dup == l:', (dup == l)
print 'dup[0] is l[0]:', (dup[0] is l[0])
print 'dup[0] == l[0]:', (dup[0] == l[0])


For a shallow copy, the MyClass instance is not duplicated so the reference in the dup list is to the same object that is in the l list.

$ python copy_shallow.py
l : [<__main__.MyClass instance at 0x78d00>]
dup: [<__main__.MyClass instance at 0x78d00>]
dup is l: False
dup == l: True
dup[0] is l[0]: True
dup[0] == l[0]: True


Deep Copies:

The deep copy created by deepcopy() is a new container populated with copies of the contents of the original object. For example, a new list is constructed and the elements of the original list are copied, then the copies are appended to the new list.

By replacing the call to copy() with deepcopy(), the difference becomes apparent.

dup = copy.deepcopy(l)


Notice that the first element of the list is no longer the same object reference, but the two objects still evaluate as being equal.

$ python copy_deep.py
l : [<__main__.MyClass instance at 0x78d28>]
dup: [<__main__.MyClass instance at 0x78cd8>]
dup is l: False
dup == l: True
dup[0] is l[0]: False
dup[0] == l[0]: True
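The practical difference between the two kinds of copy shows up when a contained object is modified after copying. The following is a minimal sketch written for Python 3 (the listings above use Python 2 syntax); the Item class is a hypothetical stand-in for MyClass.

```python
import copy

class Item:
    """Hypothetical stand-in for MyClass, using Python 3 comparison hooks."""
    def __init__(self, name):
        self.name = name
    def __eq__(self, other):
        return self.name == other.name

original = [Item('a')]
shallow = copy.copy(original)
deep = copy.deepcopy(original)

# Rename the object held by the original list.
original[0].name = 'changed'

# The shallow copy shares the element, so it sees the new name.
print('shallow:', shallow[0].name)
# The deep copy owns an independent element, so it keeps the old name.
print('deep   :', deep[0].name)
```

If the contained objects are never modified, a shallow copy is usually enough. A deep copy matters when the duplicate must be isolated from later changes.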


Controlling Copy Behavior:

It is possible to control how copies are made using the __copy__ and __deepcopy__ hooks.

__copy__() is called without any arguments and should return a shallow copy of the object.

__deepcopy__() is called with a memo dictionary, and should return a deep copy of the object. Any member attributes which need to be deep-copied should be passed to copy.deepcopy(), along with the memo dictionary, to control for recursion (see below).

This example illustrates how the methods are called:

import copy

class MyClass:
    def __init__(self, name):
        self.name = name
    def __cmp__(self, other):
        return cmp(self.name, other.name)
    def __copy__(self):
        print '__copy__()'
        return MyClass(self.name)
    def __deepcopy__(self, memo):
        print '__deepcopy__(%s)' % str(memo)
        return MyClass(copy.deepcopy(self.name, memo))

a = MyClass('a')

sc = copy.copy(a)
dc = copy.deepcopy(a)


$ python copy_hooks.py
__copy__()
__deepcopy__({})
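To see what the two hooks are expected to return, here is a small Python 3 sketch. The Node class and its tags attribute are hypothetical. Registering the copy under id(self) mirrors what the standard implementation does internally, but the memo dictionary is documented as opaque, so treat that line as an assumption rather than a guaranteed interface.

```python
import copy

class Node:
    """Hypothetical class with a name and a mutable list of tags."""
    def __init__(self, name, tags):
        self.name = name
        self.tags = tags

    def __copy__(self):
        # Shallow copy: a new Node that shares the same tags list.
        return Node(self.name, self.tags)

    def __deepcopy__(self, memo):
        dup = Node(self.name, [])
        # Register the copy before recursing so cycles resolve to it.
        memo[id(self)] = dup
        # Pass memo along so nested objects are tracked as well.
        dup.tags = copy.deepcopy(self.tags, memo)
        return dup

n = Node('a', ['x', 'y'])
sc = copy.copy(n)
dc = copy.deepcopy(n)

print('shallow shares tags:', sc.tags is n.tags)
print('deep shares tags   :', dc.tags is n.tags)
```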


Recursion in Deep Copy:

To avoid problems with duplicating recursive data structures, deepcopy() uses a dictionary to track objects which have already been copied. This dictionary is passed to the __deepcopy__() method so it can be used there as well.

This example shows how an interconnected data structure such as a Digraph might assist with protecting against recursion by implementing a __deepcopy__() method. This particular example is just for illustration purposes, since the default implementation of deepcopy() already handles the recursion cases correctly.

First, some basic directed graph methods. A graph node is initialized with a name and a list of existing nodes to which it is connected. The addConnection() method adds a directed edge from one node to another; calling it from both ends sets up a bi-directional connection. It is also used by the deepcopy operator.

import copy
import pprint

class Graph:
    def __init__(self, name, connections):
        self.name = name
        self.connections = connections
    def addConnection(self, other):
        self.connections.append(other)
    def __repr__(self):
        return '<Graph(%s) id=%s>' % (self.name, id(self))


The __deepcopy__() method prints messages to show how it is called, and manages the memo dictionary contents as needed. Instead of copying the connection list wholesale, it creates a new list and appends copies of the individual connections to it. That ensures that the memo dictionary is updated as each new node is duplicated, and avoids recursion issues or extra copies of nodes. As before, it returns the copied object when it is done.


    def __deepcopy__(self, memo):
        print
        print repr(self)
        not_there = []
        existing = memo.get(self, not_there)
        if existing is not not_there:
            print ' ALREADY COPIED TO', repr(existing)
            return existing
        pprint.pprint(memo, indent=4, width=40)
        dup = Graph(copy.deepcopy(self.name, memo), [])
        print ' COPYING TO', repr(dup)
        memo[self] = dup
        for c in self.connections:
            dup.addConnection(copy.deepcopy(c, memo))
        return dup


Next we can set up a graph with the nodes root, a, and b. The edges are a->root, b->a, b->root, root->a, root->b.

root = Graph('root', [])
a = Graph('a', [root])
b = Graph('b', [a, root])
root.addConnection(a)
root.addConnection(b)

dup = copy.deepcopy(root)


When the root node is copied, we see:

$ python copy_recursion.py

<Graph(root) id=508592>
{}
COPYING TO <Graph(root) id=510792>

<Graph(a) id=510512>
{ <Graph(root) id=508592>: <Graph(root) id=510792>,
488896: 'root',
497392: ['root']}
COPYING TO <Graph(a) id=510752>

<Graph(root) id=508592>
ALREADY COPIED TO <Graph(root) id=510792>

<Graph(b) id=510552>
{ <Graph(root) id=508592>: <Graph(root) id=510792>,
<Graph(a) id=510512>: <Graph(a) id=510752>,
237120: 'a',
488896: 'root',
497392: [ 'root',
'a',
<Graph(root) id=508592>,
<Graph(a) id=510512>],
508592: <Graph(root) id=510792>,
510512: <Graph(a) id=510752>}
COPYING TO <Graph(b) id=530392>


Notice that the second time the root node is encountered, while the a node is being copied, the recursion is detected and the existing copy is used instead of a new one.
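The default recursion handling mentioned earlier can be checked without any custom hook. This Python 3 sketch uses a list that contains itself; the variable names are only illustrative.

```python
import copy

# A list that contains itself as its last element.
loop = [1, 2]
loop.append(loop)

dup = copy.deepcopy(loop)

# The copy is a new list...
print('dup is loop  :', dup is loop)
# ...and its self-reference points at the copy, not at the original.
print('dup[2] is dup:', dup[2] is dup)
```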

References:

Python Module of the Week Home
Download Sample Code




Tuesday, September 25, 2007

PyMOTW: sched

The sched module implements a generic event scheduler for running tasks at specific times.

Module: sched
Purpose: Generic event scheduler.
Python Version: 1.4

Description:

The scheduler class uses a generic interface to schedule events. It uses a time function to learn the current time, and a delay function to wait for a specific period of time. The actual units of time are not important, which makes the interface flexible enough to be used for many purposes.

The time function is called without any arguments, and should return a number representing the current time. The delay function is called with a single numeric argument, using the same scale as the time function, and should wait that many time units before returning. For example, the time.time() and time.sleep() functions meet these requirements.

To support multi-threaded applications, the delay function is called with argument 0 after each event is generated, to ensure that other threads also have a chance to run.

Running Events With a Delay:

Events can be scheduled to run after a delay, or at a specific time. To schedule them with a delay, use the enter() method, which takes 4 arguments:

  • A number representing the delay
  • A priority value
  • The function to call
  • A tuple of arguments for the function


This example schedules 2 different events to run after 2 and 3 seconds respectively. When the event's time comes up, print_event() is called and prints the current time and the name argument passed to the event.

import sched
import time

scheduler = sched.scheduler(time.time, time.sleep)

def print_event(name):
    print 'EVENT:', time.time(), name

print 'START:', time.time()
scheduler.enter(2, 1, print_event, ('first',))
scheduler.enter(3, 1, print_event, ('second',))

scheduler.run()


The output will look something like this:

$ python sched_basic.py
START: 1190727943.36
EVENT: 1190727945.36 first
EVENT: 1190727946.36 second


The time printed for the first event is 2 seconds after start, and the time for the second event is 3 seconds after start.
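Because the scheduler only needs a time function and a delay function, the units do not have to be seconds at all. The Python 3 sketch below drives a scheduler with a hypothetical FakeClock class that counts abstract ticks, so it finishes instantly and is convenient for testing.

```python
import sched

class FakeClock:
    """Hypothetical clock that counts abstract ticks instead of seconds."""
    def __init__(self):
        self.now = 0
    def time(self):
        return self.now
    def sleep(self, ticks):
        # "Waiting" just moves the clock forward.
        self.now += ticks

clock = FakeClock()
scheduler = sched.scheduler(clock.time, clock.sleep)
fired = []

def record(name):
    # Remember the tick at which each event ran.
    fired.append((clock.time(), name))

scheduler.enter(2, 1, record, ('first',))
scheduler.enter(5, 1, record, ('second',))
scheduler.run()

print(fired)
```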

Overlapping Events:

The call to run() blocks until all of the events have been processed. Each event is run in the same thread, so if an event takes longer to run than the delay between events, there will be overlap. The overlap is resolved by postponing the later event. No events are lost, but some events may be called later than they were scheduled. In the next example, long_event() sleeps but it could just as easily delay by performing a long calculation or by blocking on I/O.

import sched
import time

scheduler = sched.scheduler(time.time, time.sleep)

def long_event(name):
    print 'BEGIN EVENT :', time.time(), name
    time.sleep(2)
    print 'FINISH EVENT:', time.time(), name

print 'START:', time.time()
scheduler.enter(2, 1, long_event, ('first',))
scheduler.enter(3, 1, long_event, ('second',))

scheduler.run()


As a result, the second event runs immediately after the first finishes, since the first event took long enough to push the clock past the desired start time of the second event.

$ python sched_overlap.py 
START: 1190728573.16
BEGIN EVENT : 1190728575.16 first
FINISH EVENT: 1190728577.16 first
BEGIN EVENT : 1190728577.16 second
FINISH EVENT: 1190728579.16 second


Event Priorities:

If more than one event is scheduled for the same time, their priority values determine the order in which they run. Lower numbers run first.

now = time.time()
print 'START:', now
scheduler.enterabs(now+2, 2, print_event, ('first',))
scheduler.enterabs(now+2, 1, print_event, ('second',))
scheduler.run()


In order to ensure that they are scheduled for the exact same time, the enterabs() method is used instead of enter(). The first argument to enterabs() is the time to run the event, instead of the amount of time to delay.

$ python sched_priority.py 
START: 1190728789.4
EVENT: 1190728791.4 second
EVENT: 1190728791.4 first


Canceling Events:

Both enter() and enterabs() return a reference to the event which can be used to cancel it later. Since run() blocks, the event has to be canceled in a different thread. For this example, a thread is started to run the scheduler and the main processing thread is used to cancel the event.

import sched
import threading
import time

scheduler = sched.scheduler(time.time, time.sleep)

# Set up a global to be modified by the threads
counter = 0

def increment_counter(name):
    global counter
    print 'EVENT:', time.time(), name
    counter += 1
    print 'NOW:', counter

print 'START:', time.time()
e1 = scheduler.enter(2, 1, increment_counter, ('E1',))
e2 = scheduler.enter(3, 1, increment_counter, ('E2',))

# Start a thread to run the events
t = threading.Thread(target=scheduler.run)
t.start()

# Back in the main thread, cancel the first scheduled event.
scheduler.cancel(e1)

# Wait for the scheduler to finish running in the thread
t.join()

print 'FINAL:', counter


Two events were scheduled, but the first was later canceled. Only the second event runs, so the counter variable is only incremented one time.

$ python sched_cancel.py
START: 1190729094.13
EVENT: 1190729097.13 E2
NOW: 1
FINAL: 1
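When the event can be canceled before run() is called, no extra thread is needed. This Python 3 sketch never starts the scheduler; it only inspects the queue after cancel(). The noop() function and the delays are placeholders.

```python
import sched
import time

scheduler = sched.scheduler(time.monotonic, time.sleep)

def noop(name):
    pass

# Schedule two events far enough ahead that neither is due yet.
e1 = scheduler.enter(60, 1, noop, ('E1',))
e2 = scheduler.enter(120, 1, noop, ('E2',))

scheduler.cancel(e1)

# Only the second event is still waiting in the queue.
remaining = [event.argument for event in scheduler.queue]
print(remaining)

# Canceling an event that is no longer queued raises ValueError.
try:
    scheduler.cancel(e1)
    second_cancel_failed = False
except ValueError:
    second_cancel_failed = True
print(second_cancel_failed)
```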


References:

Python Module of the Week Home
Download Sample Code




Thursday, September 20, 2007

PyMOTW: timeit

The timeit module provides a simple interface for determining the execution time of small bits of Python code. It uses a platform-specific time function to provide the most accurate time calculation possible.

Module: timeit
Purpose: Time the execution of small bits of Python code.
Python Version: 2.3

The timeit module offers programmatic and command line interfaces for testing the time it takes to run small bits of code. It reduces the impact of startup or shutdown costs on the time calculation by executing the code repeatedly.

Module Contents:

timeit defines a single public class, Timer. The constructor for Timer takes a statement to be timed, and a setup statement (to initialize variables, for example). The Python statements should be strings and can include embedded newlines.

The timeit() method runs the setup statement one time, then executes the primary statement repeatedly and returns the amount of time which passes. The argument to timeit() controls how many times to run the statement; the default is 1,000,000.

Example:

To illustrate how the various arguments to Timer are used, here is a simple example which prints an identifying value when each statement is executed:

import timeit

t = timeit.Timer("print 'main statement'", "print 'setup'")

print 'TIMEIT:'
print t.timeit(2)

print 'REPEAT:'
print t.repeat(3, 2)


When run, the output is:

$ python timeit_example.py
TIMEIT:
setup
main statement
main statement
0.000208854675293
REPEAT:
setup
main statement
main statement
setup
main statement
main statement
setup
main statement
main statement
[0.00019812583923339844, 0.00019383430480957031, 0.00019383430480957031]


When called, timeit() runs the setup statement one time, then calls the main statement count times. It returns a single floating point value representing the amount of time it took to run the main statement count times.

When repeat() is used, it calls timeit() several times (3 in this case) and all of the responses are returned in a list.
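The same two calls look like this in Python 3. The statement being timed, summing a small list, is an arbitrary choice for illustration. Taking the minimum of the repeat() results is a common way to summarize them, since the higher values usually reflect interference from other processes.

```python
import timeit

# The setup statement runs once per timing run; the main statement runs
# "number" times.
t = timeit.Timer("total = sum(values)", "values = list(range(100))")

single = t.timeit(number=1000)
runs = t.repeat(repeat=3, number=1000)

print('timeit:', single)
print('repeat:', runs)
print('best  :', min(runs))
```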

Storing Values in a Dictionary:

For a more complex example, let's compare the amount of time it takes to populate a dictionary with a large number of values using a variety of methods. First, a few constants are needed to configure the Timer. We'll be using a list of tuples containing strings and integers. The Timer will be storing the integers in a dictionary using the strings as keys.

import timeit
import sys

# A few constants
range_size=1000
count=1000
setup_statement="l = [ (str(x), x) for x in range(%d) ]; d = {}" % range_size


Next, we can define a short utility function to print the results in a useful format. The timeit() method returns the amount of time it takes to execute the statement repeatedly. The output of show_results() converts that into the amount of time it takes per iteration, and then further reduces the value to the amount of time it takes to store one item in the dictionary (as averages, of course).

def show_results(result):
    "Print results in terms of microseconds per pass and per item."
    global count, range_size
    per_pass = 1000000 * (result / count)
    print '%.2f usec/pass' % per_pass,
    per_item = per_pass / range_size
    print '%.2f usec/item' % per_item

print "%d items" % range_size
print "%d iterations" % count
print
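As a worked example of that arithmetic, the Python 3 sketch below converts a hypothetical total of 0.84835 seconds for 1000 passes over 1000 items. The helper name per_pass_and_item() and the input value are assumptions chosen for illustration.

```python
def per_pass_and_item(result, count, range_size):
    """Convert total seconds into microseconds per pass and per item."""
    per_pass = 1000000 * (result / count)
    per_item = per_pass / range_size
    return per_pass, per_item

# 0.84835 seconds in total for 1000 passes over 1000 items.
per_pass, per_item = per_pass_and_item(0.84835, count=1000, range_size=1000)
print('%.2f usec/pass %.2f usec/item' % (per_pass, per_item))
```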


To establish a baseline, the first configuration tested will use __setitem__. All of the other variations avoid overwriting values already in the dictionary, so this simple version should be the fastest.

Notice that the first argument to Timer is a multi-line string, with indentation preserved to ensure that it parses correctly when run. The second argument is a constant established above to initialize the list of values and the dictionary.

# Using __setitem__ without checking for existing values first
print '__setitem__:\t',
sys.stdout.flush()
# using setitem
t = timeit.Timer("""
for s, i in l:
    d[s] = i
""",
setup_statement)
show_results(t.timeit(number=count))


The next variation uses setdefault() to ensure that values already in the dictionary are not overwritten.

# Using setdefault
print 'setdefault:\t',
sys.stdout.flush()
t = timeit.Timer("""
for s, i in l:
    d.setdefault(s, i)
""",
setup_statement)
show_results(t.timeit(number=count))


Another way to avoid overwriting existing values is to use has_key() to check the contents of the dictionary explicitly.

# Using has_key
print 'has_key:\t',
sys.stdout.flush()
t = timeit.Timer("""
for s, i in l:
    if not d.has_key(s):
        d[s] = i
""",
setup_statement)
show_results(t.timeit(number=count))


Or by adding the value only if we receive a KeyError exception when looking for the existing value.

# Using exceptions
print 'KeyError:\t',
sys.stdout.flush()
t = timeit.Timer("""
for s, i in l:
    try:
        existing = d[s]
    except KeyError:
        d[s] = i
""",
setup_statement)
show_results(t.timeit(number=count))


And the last method we will test is the (relatively) new form using "in" to determine if a dictionary has a particular key.

# Using "not in"
print '"not in":\t',
sys.stdout.flush()
t = timeit.Timer("""
for s, i in l:
    if s not in d:
        d[s] = i
""",
setup_statement)
show_results(t.timeit(number=count))
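For readers on Python 3, where has_key() no longer exists, the remaining variations can be compared with a sketch like the one below. The count is reduced to 100 to keep the run short, and the statements dictionary is just one way to organize the test. No expected timings are given because they depend entirely on the machine.

```python
import timeit

range_size = 1000
count = 100
setup_statement = "l = [(str(x), x) for x in range(%d)]; d = {}" % range_size

# Each statement is a multi-line string with its indentation preserved.
statements = {
    'setdefault': """
for s, i in l:
    d.setdefault(s, i)
""",
    'not in': """
for s, i in l:
    if s not in d:
        d[s] = i
""",
    'KeyError': """
for s, i in l:
    try:
        existing = d[s]
    except KeyError:
        d[s] = i
""",
}

results = {}
for label, statement in statements.items():
    total = timeit.Timer(statement, setup_statement).timeit(number=count)
    # Average microseconds spent per item stored.
    results[label] = 1000000 * total / count / range_size
    print('%-12s %.3f usec/item' % (label, results[label]))
```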


When run, the script produces output similar to this:

$ python timeit_dictionary.py 
1000 items
1000 iterations

__setitem__: 848.35 usec/pass 0.85 usec/item
setdefault: 2050.21 usec/pass 2.05 usec/item
has_key: 1554.51 usec/pass 1.55 usec/item
KeyError: 1040.11 usec/pass 1.04 usec/item
"not in": 707.38 usec/pass 0.71 usec/item


Those times are for a G4 PowerBook running Python 2.5. Your times will be different. Experiment with the range_size and count variables, since different combinations will produce different results. For example:

$ python timeit_dictionary.py 
10000 items
1000 iterations

__setitem__: 11227.27 usec/pass 1.12 usec/item
setdefault: 24125.01 usec/pass 2.41 usec/item
has_key: 19145.40 usec/pass 1.91 usec/item
KeyError: 24127.39 usec/pass 2.41 usec/item
"not in": 16064.97 usec/pass 1.61 usec/item


From the Command Line:

In addition to the programmatic interface, timeit provides a command line interface for testing modules without instrumentation.

To run the module, use the new -m option to find the module and treat it as main:

$ python -m timeit


For example, to get help:

$ python -m timeit -h
Tool for measuring execution time of small code snippets.

This module avoids a number of common traps for measuring execution
times. See also Tim Peters' introduction to the Algorithms chapter in
the Python Cookbook, published by O'Reilly.

Library usage: see the Timer class.

Command line usage:
python timeit.py [-n N] [-r N] [-s S] [-t] [-c] [-h] [statement]

Options:
-n/--number N: how many times to execute 'statement' (default: see below)
-r/--repeat N: how many times to repeat the timer (default 3)
-s/--setup S: statement to be executed once initially (default 'pass')
-t/--time: use time.time() (default on Unix)
-c/--clock: use time.clock() (default on Windows)
-v/--verbose: print raw timing results; repeat for more digits precision
-h/--help: print this usage message and exit
statement: statement to be timed (default 'pass')


The statement argument works a little differently than the argument to Timer. Instead of one long string, you pass each line of the instructions as a separate command line argument. To indent lines (such as inside a loop), embed spaces in the string by enclosing the whole thing in quotes. For example:

$ python -m timeit -s "d={}" "for i in range(1000):" "  d[str(i)] = i"
10 loops, best of 3: 16.7 msec per loop


It is also possible to define a function with more complex code, then import the module and call the function from the command line:

def test_setitem(range_size=1000):
    l = [ (str(x), x) for x in range(range_size) ]
    d = {}
    for s, i in l:
        d[s] = i


Then to run the test:

$ python -m timeit "import timeit_setitem; timeit_setitem.test_setitem()"
100 loops, best of 3: 3.56 msec per loop
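In later versions of Python (2.6 and up), Timer also accepts a callable in place of a statement string, which avoids the import step when the function lives in the same program. A Python 3 sketch, reusing the test_setitem() function from above with a return value added so the result can be inspected:

```python
import timeit

def test_setitem(range_size=1000):
    l = [(str(x), x) for x in range(range_size)]
    d = {}
    for s, i in l:
        d[s] = i
    return d

# Pass the function object itself instead of a string of source code.
loops = 100
elapsed = timeit.Timer(test_setitem).timeit(number=loops)
print('%.3f msec per loop' % (1000 * elapsed / loops))
```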


References:

Python Module of the Week Home
Download Sample Code




Tuesday, September 18, 2007

PyATL Book Club on O'Reilly

Marsee Henon from O'Reilly recently interviewed a couple of us about the Atlanta-area Python Users' Group Book Club. She asked some good questions, and although I'm not entirely comfortable with the group being characterized as an "O'Reilly" book club, O'Reilly does offer us a lot of support (esp. free books) so I guess I shouldn't complain.

If you're interested in participating in a technical book club, you don't have to live in, or even near, Atlanta to join ours. Head over to http://pyatl.org/bookclub and join the Google Group we have set up for our discussions.





Wednesday, September 12, 2007

PyMOTW: hmac

The hmac module implements keyed-hashing for message authentication, as described in RFC-2104.

Module: hmac
Purpose: Cryptographic signature and verification of messages.
Python Version: 2.2

Description:

The HMAC algorithm can be used to verify the integrity of information passed between applications or stored in a potentially vulnerable location. The basic idea is to generate a cryptographic hash of the actual data combined with a shared secret key. The resulting hash can then be used to check the transmitted or stored message to determine a level of trust, without transmitting the secret key.

Disclaimer: I'm not a security expert. For the full details on HMAC, check out RFC-2104.

Example:

Creating the hash is not complex. Here's a simple example which uses the default MD5 hash algorithm:

import hmac

digest_maker = hmac.new('secret-shared-key-goes-here')

f = open('hmac_simple.py', 'rb')
try:
    while True:
        block = f.read(1024)
        if not block:
            break
        digest_maker.update(block)
finally:
    f.close()

digest = digest_maker.hexdigest()
print digest


When run, the code reads its source file and computes an HMAC signature for it:

$ python hmac_simple.py 
7ce1ea6b179f5053b285410600dc3327


If I haven't changed the file by the time I release the example source for this week, the copy you download should produce the same hash.

SHA vs. MD5

Although the default cryptographic algorithm for hmac is MD5, that is not the most secure method to use. MD5 hashes have some weaknesses, such as collisions (where two different messages produce the same hash). The SHA-1 algorithm is considered to be stronger, and should be used instead.

import hmac
import hashlib

digest_maker = hmac.new('secret-shared-key-goes-here', '', hashlib.sha1)

f = open('hmac_sha.py', 'rb')
try:
    while True:
        block = f.read(1024)
        if not block:
            break
        digest_maker.update(block)
finally:
    f.close()

digest = digest_maker.hexdigest()
print digest


hmac.new() takes 3 arguments. The first is the secret key, which should be shared between the two endpoints which are communicating so both ends can use the same value. The second value is an initial message. If the message content that needs to be authenticated is small, such as a timestamp or HTTP POST, the entire body of the message can be passed to new() instead of using the update() method. The last argument is the digest module to be used. The default is hashlib.md5. The previous example substitutes hashlib.sha1.

$ python hmac_sha.py
ab667429a893eb1eaa0554dcff2511f130969044
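Passing the message to new() and feeding it in through update() produce the same digest. This Python 3 sketch checks that, using a made-up message body. In Python 3 the key and message must be bytes, and recent versions require the digest argument to be given explicitly.

```python
import hashlib
import hmac

key = b'secret-shared-key-goes-here'
body = b'timestamp=1190727943'  # hypothetical small message

# Small message: hand the whole body to new().
one_shot = hmac.new(key, body, hashlib.sha1).hexdigest()

# Larger message: start empty and feed it in pieces.
incremental = hmac.new(key, b'', hashlib.sha1)
incremental.update(body[:10])
incremental.update(body[10:])

print(one_shot == incremental.hexdigest())
```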


Binary Digests:

The first few examples used the hexdigest() method to produce printable digests. The hexdigest is a different representation of the value calculated by the digest() method, which is a binary value that may include unprintable or non-ASCII characters, including NULs. Some web services (Google Checkout, Amazon S3) use the base64 encoded version of the binary digest instead of the hexdigest.

import base64
import hmac
import hashlib

f = open('hmac_base64.py', 'rb')
try:
    body = f.read()
finally:
    f.close()

digest = hmac.new('secret-shared-key-goes-here', body, hashlib.sha1).digest()
print base64.encodestring(digest)


The base64 encoded string ends in a newline, which frequently needs to be stripped off when embedding the string in HTTP headers or other formatting-sensitive contexts.

$ python hmac_base64.py
IVSRpXWIt2k15UUH3gKkU5ogfV4=
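The trailing newline is easy to see in Python 3, where the equivalent of encodestring() is base64.encodebytes(). The sketch below also shows base64.b64encode(), which returns the same encoding without the newline. The message body is a placeholder.

```python
import base64
import hashlib
import hmac

digest = hmac.new(b'secret-shared-key-goes-here', b'message body',
                  hashlib.sha1).digest()

# encodebytes() appends a newline; b64encode() does not.
with_newline = base64.encodebytes(digest)
without_newline = base64.b64encode(digest)

print(repr(with_newline))
print(repr(without_newline))
```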


Applications:

HMAC authentication should be used for any public network service, and any time data is stored where security is important. For example, when sending data through a pipe or socket, that data should be signed and then the signature should be tested before the data is used. The extended example below is available in the hmac_pickle.py file as part of the PyMOTW source package.

First, let's establish a function to calculate a digest for a string, and a simple class to be instantiated and passed through a communication channel.

import hashlib
import hmac
try:
    import cPickle as pickle
except ImportError:
    import pickle
import pprint
from StringIO import StringIO


def make_digest(message):
    "Return a digest for the message."
    return hmac.new('secret-shared-key-goes-here', message, hashlib.sha1).hexdigest()


class SimpleObject(object):
    "A very simple class to demonstrate checking digests before unpickling."
    def __init__(self, name):
        self.name = name
    def __str__(self):
        return self.name


Next, create a StringIO buffer to represent the socket or pipe. We will use a naive, but easy to parse, format for the data stream. The digest and length of the pickled data are written, followed by a newline. The pickled representation of the object follows. In a real system, we would not want to depend on a length value, since if the digest is wrong the length is probably wrong as well. Some sort of terminator sequence not likely to appear in the real data would be more appropriate.

For this example, we will write two objects to the stream. The first is written using the correct digest value.

# Simulate a writable socket or pipe with StringIO
out_s = StringIO()

# Write a valid object to the stream:
#  digest length\npickle
o = SimpleObject('digest matches')
pickled_data = pickle.dumps(o)
digest = make_digest(pickled_data)
header = '%s %s' % (digest, len(pickled_data))
print '\nWRITING:', header
out_s.write(header + '\n')
out_s.write(pickled_data)


The second object is written to the stream with an invalid digest, produced by calculating the digest for some other data instead of the pickle.

# Write an invalid object to the stream
o = SimpleObject('digest does not match')
pickled_data = pickle.dumps(o)
digest = make_digest('not the pickled data at all')
header = '%s %s' % (digest, len(pickled_data))
print '\nWRITING:', header
out_s.write(header + '\n')
out_s.write(pickled_data)

out_s.flush()


Now that the data is in the StringIO buffer, we can read it back out again. The first step is to read the line of data with the digest and data length. Then the remaining data is read (using the length value). We could use pickle.load() to read directly from the stream, but that assumes a trusted data stream and we do not yet trust the data enough to unpickle it. Reading the pickle as a string collects the data from the stream, without actually unpickling the object.

# Simulate a readable socket or pipe with StringIO
in_s = StringIO(out_s.getvalue())

# Read the data
while True:
    first_line = in_s.readline()
    if not first_line:
        break
    incoming_digest, incoming_length = first_line.split(' ')
    incoming_length = int(incoming_length)
    print '\nREAD:', incoming_digest, incoming_length
    incoming_pickled_data = in_s.read(incoming_length)


Once we have the pickled data, we can recalculate the digest value and compare it against what we read. If the digests match, we assume it is safe to trust the data and unpickle it.

    actual_digest = make_digest(incoming_pickled_data)
    print 'ACTUAL:', actual_digest

    if incoming_digest != actual_digest:
        print 'WARNING: Data corruption'
    else:
        obj = pickle.loads(incoming_pickled_data)
        print 'OK:', obj


The output shows that the first object is verified and the second is deemed "corrupted", as expected:

$ python hmac_pickle.py

WRITING: 387632cfa3d18cd19bdfe72b61ac395dfcdc87c9 124

WRITING: b01b209e28d7e053408ebe23b90fe5c33bc6a0ec 131

READ: 387632cfa3d18cd19bdfe72b61ac395dfcdc87c9 124
ACTUAL: 387632cfa3d18cd19bdfe72b61ac395dfcdc87c9
OK: digest matches

READ: b01b209e28d7e053408ebe23b90fe5c33bc6a0ec 131
ACTUAL: dec53ca1ad3f4b657dd81d514f17f735628b6828
WARNING: Data corruption
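For comparison, here is a condensed Python 3 sketch of the same flow, using io.BytesIO in place of StringIO. The helper names write_signed() and read_signed() are hypothetical, and plain strings are pickled instead of SimpleObject to keep it short. It also swaps the != comparison for hmac.compare_digest(), which is available in newer Python versions and is intended for comparing digests.

```python
import hashlib
import hmac
import io
import pickle

SECRET_KEY = b'secret-shared-key-goes-here'

def make_digest(message):
    "Return a hex digest for the message bytes."
    return hmac.new(SECRET_KEY, message, hashlib.sha1).hexdigest()

def write_signed(stream, payload, digest=None):
    "Write a 'digest length' header line followed by the payload bytes."
    if digest is None:
        digest = make_digest(payload)
    header = '%s %d\n' % (digest, len(payload))
    stream.write(header.encode('ascii'))
    stream.write(payload)

def read_signed(stream):
    "Yield (verified, payload) pairs without unpickling anything."
    while True:
        first_line = stream.readline()
        if not first_line:
            break
        incoming_digest, incoming_length = first_line.decode('ascii').split(' ')
        payload = stream.read(int(incoming_length))
        verified = hmac.compare_digest(incoming_digest, make_digest(payload))
        yield verified, payload

out_s = io.BytesIO()
# First record: correct digest.
write_signed(out_s, pickle.dumps('digest matches'))
# Second record: digest computed over some other data.
write_signed(out_s, pickle.dumps('digest does not match'),
             digest=make_digest(b'not the pickled data at all'))

results = []
for verified, payload in read_signed(io.BytesIO(out_s.getvalue())):
    # Only unpickle data whose digest was verified.
    results.append(pickle.loads(payload) if verified else None)
print(results)
```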


References:

Python Module of the Week Home
Download Sample Code
Wikipedia: MD5
Authenticating to Amazon S3




Sunday, September 9, 2007

Book Review: RESTful Web Services

As part of the Atlanta Python Users' Group Book Club, I received a copy of RESTful Web Services written by Leonard Richardson and Sam Ruby, and published by O'Reilly Media. When we started the book club, this was the first book I suggested we read. I had previously studied some material on various blogs discussing REST, but I wanted a clear description and more specific examples. The book provided both, and I highly recommend reading it before planning your next web development project.

Overview

Unlike many such books, RESTful does not depend on a single programming language for examples. Much of the code is Ruby, but Python and Java make up a respectable proportion of the material as well. Since I was primarily interested in the design principles and "theory", I did not try to run any of the sample code myself. Others in the book club have, so check the forum for more details if you are interested in that aspect of the book.

The outline of the book follows a well thought-out progression of topics from basic "programmable web" concepts to in-depth discussion of Roy Fielding's Representational State Transfer (REST) ideas and then Resource Oriented Architecture (ROA), a natural extension of REST. Intermediate chapters include discussions of best-of-breed tools for web development and copious example code.

Outline

Chapter 1 is a foundation chapter for the remainder of the book. It describes how the HTTP protocol works and breaks down the different architectural styles discussed in the remaining chapters (REST, RPC, and REST-RPC hybrid). The theme of this chapter, and perhaps the entire book, is that "the human web is on the programmable web". If something is on the web, it is a service.

Chapter 2 introduces the concepts necessary to implement clients using web services. The easily digestible example code (in several languages) implements a client for the del.icio.us bookmarking service. Bookmarks are an excellent choice for an example program, since the information being managed is straightforward and everyone understands the concept, even if they have never used del.icio.us directly. Chapter 2 also includes recommendations for client tools and libraries for common languages. Basic HTTP access, JSON parsers, XML parsers (including details about DOM, SAX, and pull-style parsers and when each is appropriate), and WADL libraries are discussed, with best-of-breed options presented for each language.

In chapter 3, the authors use Amazon's S3 service design to point out features of the REST architecture which make it different from RPC-style APIs. The complexity of the examples increases to match the requirements of the service, including advanced authentication techniques.

Resource Oriented Architecture, introduced in Chapter 4 and discussed in an extended design example used through chapters 5 and 6, is perhaps the most interesting part of the book. ROA is a set of design principles which encourage you to think about your service in a specific way to enable REST APIs. The principles are:

  • Descriptive URIs: URIs should convey meaning.
  • Addressability: Expose all information via URLs.
  • Statelessness: The client maintains the application state so the server does not have to.
  • Representations: Resources can have multiple representations, based on format, level of detail, language, and other criteria.
  • Connectedness: Link between related resources explicitly within the representations, so the client does not have to know how to build URLs.
  • Uniform Interface: Use the HTTP methods (GET, PUT, POST, DELETE) as designed.


To illustrate these principles, in chapters 5 and 6 the authors build a web mapping service, similar to Google Maps. This detailed example also serves as a way to introduce their ROA development process.


1. Identify the data set to be managed by the service.
2. Split up that data into resources.
3. Name each resource with a URI.
4. Expose a subset of the uniform interface for each resource, depending on what makes sense and what features are to be supported.
5. Design representations to be passed from client to server.
6. Design representations to be passed from server to client.
7. Include links to other resources.
8. Consider a typical course of events, to ensure completeness.
9. Consider error conditions, to identify the HTTP response codes to be used.


Chapter 7 includes the implementation of a bookmarking service similar to del.icio.us. The sample code uses Ruby extensively, and it was a little more advanced than what I was prepared to absorb without a Ruby primer. One important point made in the prose of the chapter is that code frameworks may constrain your design by making certain choices for implementation easier or harder.

Chapter 8 is a summary of the REST and ROA principles discussed in the earlier chapters, and is an excellent reference once you've finished reading the whole book. It is also suitable as a "Cliff's Notes" version of the material, if you don't have time to read everything. If you want to review the book before reading it, go to the book store and take a look at this chapter.

While chapter 2 covered client implementation techniques, chapter 9 is a survey of tools and aspects of web service implementations in different languages. It covers topics such as XHTML, HTML5, microformats, Atom, JSON, RDF, control flow patterns, and WSDL.

In chapter 10, the authors give an extensive comparison of ROA and "Big Web Services" to argue that ROA is simpler, requires fewer tools, and can even be more interoperable.

Chapter 11 is the requisite "How to use this with AJAX" chapter.

And the book wraps up in chapter 12 with a discussion of frameworks for doing RESTful development in multiple languages. The coverage of Django includes a dispatcher that decides how to handle the request based on the HTTP method invoked.
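
As a rough illustration of dispatching on the HTTP method (this is not the book's code and not Django's API), the idea can be sketched in plain Python 3. The BookmarkResource class, the dispatch() function, and the status codes chosen are assumptions made up for this sketch.

```python
class BookmarkResource:
    """Hypothetical in-memory resource; each URI path maps to one bookmark."""

    # Only the part of the uniform interface this resource chooses to expose.
    allowed_methods = ('GET', 'PUT', 'DELETE')

    def __init__(self):
        self.store = {}

    def GET(self, path, body=None):
        if path in self.store:
            return 200, self.store[path]
        return 404, ''

    def PUT(self, path, body=None):
        created = path not in self.store
        self.store[path] = body
        return (201 if created else 200), body

    def DELETE(self, path, body=None):
        if path not in self.store:
            return 404, ''
        del self.store[path]
        return 204, ''


def dispatch(resource, method, path, body=None):
    """Pick the handler from the HTTP method name instead of the URI."""
    name = method.upper()
    if name not in resource.allowed_methods:
        return 405, ''  # method not allowed for this resource
    return getattr(resource, name)(path, body)


resource = BookmarkResource()
print(dispatch(resource, 'PUT', '/bookmarks/1', 'http://example.com/'))
print(dispatch(resource, 'GET', '/bookmarks/1'))
print(dispatch(resource, 'POST', '/bookmarks/1', 'ignored'))
```

The URI names the resource and the method names the operation, so adding an operation means adding a method to the class rather than inventing a new URL.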

Conclusion

Before reading "RESTful Web Services", I had a somewhat cloudy notion of REST and how to apply it. The book clarified both, and it offered an invaluable concrete process to follow when implementing a web service using REST and ROA principles. I expect my copy to see a lot of use and become dog-eared as I refer back to it frequently.

PyATL Book Club

The Atlanta Python Users' Group runs an online book club. We encourage all Atlanta area Python developers to check the schedule on PyATL.org and come down to a meeting. Anyone is free to join and participate. For more reviews by members of the book club, check out the forums or our Reviews List.




Monday, September 3, 2007

PyMOTW: unittest

Module: unittest
Purpose: Automated testing framework
Python Version: 2.1

Description:

Python's unittest module, sometimes referred to as PyUnit, is based on the xUnit framework design by Kent Beck and Erich Gamma. The same pattern is repeated in many other languages, including C, Perl, Java, and Smalltalk. The framework implemented by unittest supports fixtures, test suites, and a test runner to enable automated testing for your code.

Basic Test Structure:

Tests, as defined by the unittest module, have two parts: code to manage test "fixtures", and the test itself. Individual tests are created by subclassing unittest.TestCase and overriding or adding appropriate methods. For example,

import unittest

class SimplisticTest(unittest.TestCase):
    def test(self):
        self.failUnless(True)

if __name__ == '__main__':
    unittest.main()


In this case, the SimplisticTest has a single test() method, which would fail if True is ever False.

Running Tests:

The simplest way to run unittest tests is to include:

if __name__ == '__main__':
    unittest.main()


at the bottom of each test file, then simply run the script directly from the command line:

$ python unittest_simple.py
.
----------------------------------------------------------------------
Ran 1 test in 0.000s

OK


This abbreviated output includes the amount of time the tests took, along with a status indicator for each test (the "." on the first line of output means that a test passed). For more detailed test results, include the -v option:

$ python unittest_simple.py -v
test (__main__.SimplisticTest) ... ok

----------------------------------------------------------------------
Ran 1 test in 0.001s

OK
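
The same run can also be started from code instead of the command line. This is a minimal sketch for Python 3, assuming you want the verbose report captured in a string; the run_verbose() helper is a name invented here, and the test uses assertTrue() because the fail* spellings shown in this post were later deprecated.

```python
import io
import unittest


class SimplisticTest(unittest.TestCase):
    def test(self):
        self.assertTrue(True)


def run_verbose(test_case_class):
    """Run one TestCase class and return (result, report text)."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(test_case_class)
    stream = io.StringIO()
    # verbosity=2 gives the per-test lines that -v prints on the command line.
    runner = unittest.TextTestRunner(stream=stream, verbosity=2)
    result = runner.run(suite)
    return result, stream.getvalue()


result, report = run_verbose(SimplisticTest)
print(report)
print('successful:', result.wasSuccessful())
```

Capturing the report this way is handy when the tests are driven by another script that needs to inspect the result object.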


Test Outcomes:

Tests have 3 possible outcomes:

ok
The test passes.
FAIL
The test does not pass, and raises an AssertionError exception.
ERROR
The test raises an exception other than AssertionError.

There is no explicit way to cause a test to "pass", so a test's status depends on the presence (or absence) of an exception.

class OutcomesTest(unittest.TestCase):

    def testPass(self):
        return

    def testFail(self):
        self.failIf(True)

    def testError(self):
        raise RuntimeError('Test error!')


When a test fails or generates an error, the traceback is included in the output.

$ python unittest_outcomes.py   
EF.
======================================================================
ERROR: testError (__main__.OutcomesTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "unittest_outcomes.py", line 43, in testError
    raise RuntimeError('Test error!')
RuntimeError: Test error!

======================================================================
FAIL: testFail (__main__.OutcomesTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "unittest_outcomes.py", line 40, in testFail
    self.failIf(True)
AssertionError

----------------------------------------------------------------------
Ran 3 tests in 0.002s

FAILED (failures=1, errors=1)
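
To see that the outcome depends only on which exception (if any) escapes the test, the three cases can be wrapped as plain functions and the result object inspected afterwards. This is a sketch for Python 3; the function names passes(), fails(), and errors() are invented for the example, and unittest.FunctionTestCase is used only to keep it short.

```python
import io
import unittest


def passes():
    return  # no exception: counted as a pass


def fails():
    raise AssertionError('expected failure')  # AssertionError: counted as FAIL


def errors():
    raise RuntimeError('Test error!')  # any other exception: counted as ERROR


suite = unittest.TestSuite(
    [unittest.FunctionTestCase(func) for func in (passes, fails, errors)]
)
# Send the text report to a throwaway buffer; only the result object is needed.
result = unittest.TextTestRunner(stream=io.StringIO()).run(suite)

counts = (result.testsRun, len(result.failures), len(result.errors))
print(counts)
```

The failures and errors attributes each hold (test, traceback text) pairs, which is the same information the text runner prints.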


In the example above, testFail() fails and the traceback shows the line with the failure code. It is up to the person reading the test output to look at the code to figure out the semantic meaning of the failed test, though. To make it easier to understand the nature of a test failure, the fail*() and assert*() methods all accept an argument msg, which can be used to produce a more detailed error message.

class FailureMessageTest(unittest.TestCase):
    def testFail(self):
        self.failIf(True, 'failure message goes here')


$ python unittest_failwithmessage.py -v
testFail (__main__.FailureMessageTest) ... FAIL

======================================================================
FAIL: testFail (__main__.FailureMessageTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "unittest_failwithmessage.py", line 37, in testFail
    self.failIf(True, 'failure message goes here')
AssertionError: failure message goes here

----------------------------------------------------------------------
Ran 1 test in 0.002s

FAILED (failures=1)


Asserting Truth:

Most tests assert the truth of some condition. There are a few different ways to write truth-checking tests, depending on the perspective of the test author and the desired outcome of the code being tested. If the code produces a value which can be evaluated as true, the methods failUnless() and assertTrue() should be used. If the code produces a false value, the methods failIf() and assertFalse() make more sense.

class TruthTest(unittest.TestCase):

    def testFailUnless(self):
        self.failUnless(True)

    def testAssertTrue(self):
        self.assertTrue(True)

    def testFailIf(self):
        self.failIf(False)

    def testAssertFalse(self):
        self.assertFalse(False)


Testing Equality:

As a special case, unittest includes methods for testing the equality of two values.

class EqualityTest(unittest.TestCase):

    def testEqual(self):
        self.failUnlessEqual(1, 3-2)

    def testNotEqual(self):
        self.failIfEqual(2, 3-2)


These special tests are handy, since the values being compared appear in the failure message when a test fails.

class InequalityTest(unittest.TestCase):

    def testEqual(self):
        self.failIfEqual(1, 3-2)

    def testNotEqual(self):
        self.failUnlessEqual(2, 3-2)


And when these tests are run:

$ python unittest_notequal.py -v
testEqual (__main__.EqualityTest) ... FAIL
testNotEqual (__main__.EqualityTest) ... FAIL

======================================================================
FAIL: testEqual (__main__.EqualityTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "unittest_notequal.py", line 37, in testEqual
    self.failIfEqual(1, 3-2)
AssertionError: 1 == 1

======================================================================
FAIL: testNotEqual (__main__.EqualityTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "unittest_notequal.py", line 40, in testNotEqual
    self.failUnlessEqual(2, 3-2)
AssertionError: 2 != 1

----------------------------------------------------------------------
Ran 2 tests in 0.002s

FAILED (failures=2)
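
The messages in that output can also be captured directly, which shows where the compared values come from. This is a sketch for Python 3 using the assertEqual() and assertNotEqual() spellings; the probe object is just a bare TestCase instance created for the demonstration, not part of any test run.

```python
import unittest

# A bare TestCase instance is enough to call the assertion methods directly.
probe = unittest.TestCase()

try:
    probe.assertEqual(2, 3 - 2)
except AssertionError as exc:
    unequal_message = str(exc)

try:
    probe.assertNotEqual(1, 3 - 2)
except AssertionError as exc:
    equal_message = str(exc)

print(unequal_message)
print(equal_message)
```

Because the assertion method builds the message from its two arguments, a plain truth test on the expression 2 == 3 - 2 would report a failure without showing either value.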


Almost Equal?

In addition to strict equality, it is possible to test for near equality of floating point numbers using failIfAlmostEqual() and failUnlessAlmostEqual().

class AlmostEqualTest(unittest.TestCase):

    def testNotAlmostEqual(self):
        self.failIfAlmostEqual(1.1, 3.3-2.0, places=1)

    def testAlmostEqual(self):
        self.failUnlessAlmostEqual(1.1, 3.3-2.0, places=0)


The arguments are the 2 values to be compared, and the number of decimal places to use for the test.

$ python unittest_almostequal.py
..
----------------------------------------------------------------------
Ran 2 tests in 0.001s

OK
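
Both tests pass because the comparison rounds the difference between the two values to the requested number of places and checks the result against zero. The almost_equal() helper below is a hypothetical restatement of that rule written for this sketch (Python 3, using the assertAlmostEqual() spellings), not the library's own code.

```python
import unittest


def almost_equal(first, second, places=7):
    """Hypothetical restatement: round the difference, then compare to zero."""
    return round(abs(first - second), places) == 0


difference = abs(1.1 - (3.3 - 2.0))  # roughly 0.2

# One decimal place keeps the 0.2 difference, so the values are not "almost equal".
print(almost_equal(1.1, 3.3 - 2.0, places=1))
# Zero decimal places rounds the difference away, so they are.
print(almost_equal(1.1, 3.3 - 2.0, places=0))

# The library methods agree; neither call raises.
probe = unittest.TestCase()
probe.assertNotAlmostEqual(1.1, 3.3 - 2.0, places=1)
probe.assertAlmostEqual(1.1, 3.3 - 2.0, places=0)
```

Note that places counts decimal places of the difference, not significant digits of the values being compared.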


Testing for Exceptions:

As previously mentioned, if a test raises an exception other than AssertionError, it is treated as an error. This is very useful for uncovering mistakes while you are modifying code which has existing test coverage. There are circumstances, however, in which you want the test to verify that some code does produce an exception, for example when an invalid value is given to an attribute of an object. In such cases, TestCase.failUnlessRaises() makes the code clearer than trapping the exception yourself. Compare these two tests:

def raises_error(*args, **kwds):
    print args, kwds
    raise ValueError('Invalid value: ' + str(args) + str(kwds))

class ExceptionTest(unittest.TestCase):

    def testTrapLocally(self):
        try:
            raises_error('a', b='c')
        except ValueError:
            pass
        else:
            self.fail('Did not see ValueError')

    def testFailUnlessRaises(self):
        self.failUnlessRaises(ValueError, raises_error, 'a', b='c')


The results for both are the same, but the second test using failUnlessRaises() is more succinct.

$ python unittest_exception.py -v
testFailUnlessRaises (__main__.ExceptionTest) ... ('a',) {'b': 'c'}
ok
testTrapLocally (__main__.ExceptionTest) ... ('a',) {'b': 'c'}
ok

----------------------------------------------------------------------
Ran 2 tests in 0.001s

OK
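
Later versions of unittest also let the same check be written as a with block, which keeps the exception available for further inspection. This sketch assumes Python 3 and the assertRaises() spelling; the probe object is a bare TestCase instance used only for the demonstration.

```python
import unittest


def raises_error(*args, **kwds):
    raise ValueError('Invalid value: ' + str(args) + str(kwds))


probe = unittest.TestCase()

# Callable form: the exception type first, then the callable and its arguments.
probe.assertRaises(ValueError, raises_error, 'a', b='c')

# Context manager form: the caught exception can be examined afterwards.
with probe.assertRaises(ValueError) as caught:
    raises_error('a', b='c')
message = str(caught.exception)
print(message)

# If the expected exception never appears, the assertion itself fails.
try:
    probe.assertRaises(ValueError, lambda: None)
except AssertionError:
    missing_exception_detected = True
else:
    missing_exception_detected = False
print(missing_exception_detected)
```

The callable form is the closest match to failUnlessRaises() as used above; the with form is mainly useful when the test also needs to check the exception's message or attributes.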


Test Fixtures:

Fixtures are resources needed by a test. For example, if you are writing several tests for the same class, those tests all need an instance of that class to use for testing. Other test fixtures include database connections and temporary files (many people would argue that using external resources makes such tests not "unit" tests, but they are still tests and still useful). TestCase includes special hooks to configure and clean up any fixtures needed by your tests. To configure the fixtures, override setUp(). To clean up, override tearDown().

class FixturesTest(unittest.TestCase):

    def setUp(self):
        print 'In setUp()'
        self.fixture = range(1, 10)

    def tearDown(self