This post originated from an RSS feed registered with Python Buzz
by Thomas Guest.
Original Post: Merging sorted streams in Python
Feed Title: Word Aligned: Category Python
Feed URL: http://feeds.feedburner.com/WordAlignedCategoryPython
Feed Description: Dynamic languages in general. Python in particular. The adventures of a space sensitive programmer.
In a post on his Code Monk blog David Jones offers up some Python tidbits he’s only recently discovered and invites readers to share similar discoveries. I’d like to respond here by talking about a recipe from the Python Cookbook.
Problem
You have several sorted sequences (iterables) and need to iterate on the overall sorted sequence that results from “merging” these sequences.
For example, we could merge multiples of 2, 3 and 5.
>>> from itertools import count, imap, islice
>>> m2, m3, m5 = [imap(n.__mul__, count(1)) for n in (2, 3, 5)]
>>> m235 = merge(m2, m3, m5)
>>> list(islice(m235, 10))
[2, 3, 4, 5, 6, 6, 8, 9, 10, 10]
Here, 6 appears twice in the merged stream since it’s a multiple of 2 and also of 3, and similarly 10 makes a double appearance.
This example merges three infinite streams. If we were sure all our inputs were finite, we might well simply chain them together and sort the whole thing.
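As a minimal sketch of that chain-and-sort approach (written for Python 3, with the hypothetical name merge_finite), something like this would do for finite inputs only:

```python
from itertools import chain

def merge_finite(*iterables):
    """Merge by brute force: concatenate everything, then sort.

    Only usable when every input is finite, because sorted() has to
    consume all of the data before it can return anything.
    """
    return sorted(chain(*iterables))

print(merge_finite([2, 4, 6, 8], [3, 6, 9], [5, 10]))
# prints [2, 3, 4, 5, 6, 6, 8, 9, 10]
```

Handing it an infinite stream such as itertools.count(1) would never return.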
An algorithm which deals with a (potential) mix of finite and infinite sequences is a little more interesting. We might consider an approach which repeatedly peeks at the head element of each sequence, finds the smallest of these, then pops and yields it. The recipe in the Cookbook improves on this idea.
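Here is a rough sketch of that peek-and-pop approach, written for Python 3. The names naive_merge and _DONE are my own, not the Cookbook's, and the linear scan over the heads is exactly what the recipe improves on.

```python
from itertools import count, islice

_DONE = object()  # sentinel marking an exhausted iterator

def naive_merge(*iterables):
    """Merge sorted inputs by scanning every head value on each step."""
    # heads holds [current_value, iterator] for each input that still has items
    heads = []
    for iterable in iterables:
        iterator = iter(iterable)
        first = next(iterator, _DONE)
        if first is not _DONE:
            heads.append([first, iterator])
    while heads:
        # linear scan for the position of the smallest head value
        i = min(range(len(heads)), key=lambda j: heads[j][0])
        value, iterator = heads[i]
        yield value
        following = next(iterator, _DONE)
        if following is _DONE:
            del heads[i]              # this input is finished
        else:
            heads[i][0] = following   # peek at its next head

m2, m3, m5 = [map(n.__mul__, count(1)) for n in (2, 3, 5)]
print(list(islice(naive_merge(m2, m3, m5), 10)))
# prints [2, 3, 4, 5, 6, 6, 8, 9, 10, 10]
```

Each output item costs a scan over all the remaining inputs, which is fine for three streams but wasteful for many.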
Solution
The ingenious merge implementation shown below is credited to Sébastien Keim, Raymond Hettinger and Danny Yoo (the same Raymond Hettinger who appears on Code Monk to point out that zip can unzip).
The algorithm uses a priority queue as a staging area. This priority queue is initialised to hold a collection of pairs comprising the head value from each input stream and the tail of that stream. We then pop the first element from this queue, returning its value, and queue the next (head, tail) pair from the stream it came from. At all times the priority queue contains a (head, tail) pair from each input stream.
This all works sweetly when the inputs are all infinite. The complication occurs when a finite sequence reaches its end.
import heapq

def merge(*subsequences):
    # prepare a priority queue whose items are pairs of the form
    # (current-value, iterator), one each per (non-empty) subsequence
    heap = [ ]
    for subseq in subsequences:
        iterator = iter(subseq)
        for current_value in iterator:
            # subseq is not empty, therefore add this subseq's pair
            # (current-value, iterator) to the list
            heap.append((current_value, iterator))
            break
    # make the priority queue into a heap
    heapq.heapify(heap)
    while heap:
        # get and yield lowest current value (and corresponding iterator)
        current_value, iterator = heap[0]
        yield current_value
        for current_value in iterator:
            # subseq is not finished, therefore add this subseq's pair
            # (current-value, iterator) back into the priority queue
            heapq.heapreplace(heap, (current_value, iterator))
            break
        else:
            # subseq has been exhausted, therefore remove it from the queue
            heapq.heappop(heap)
I had to look twice at this code despite the copious comments. There’s nothing unusual about the two outer loops, a for loop and a while loop. The first sets up the priority queue we use to stage inputs; the second actually generates the output results while items remain in this queue.
It’s the inner for loops which are less standard. The body of each breaks as soon as a single item has been processed, unless the body never executes at all because the iterator is already exhausted. In the first loop an exhausted iterator simply means there is nothing to queue; in the second it means the for’s else clause executes, dequeuing the iterator. These are for loops which actually do something at most once.
You did remember that for loops, in Python, have an else clause, right?
I confess that if I ever knew that, I’d certainly forgotten it! The else clause executes whenever the loop finishes without a break, which includes the case where the iterable is empty[1]. There’s nothing very else-y about it, in this context! I’d be interested to learn of any other C-family languages with a similar construct.
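A small sketch of that rule, written for Python 3 with a hypothetical helper called first_even, covers all three cases: a loop which breaks, a loop which runs to completion, and a loop over nothing.

```python
def first_even(numbers):
    """Return the first even number, or None if the loop never breaks."""
    for n in numbers:
        if n % 2 == 0:
            found = n
            break
    else:
        # reached only when the loop finishes without hitting break,
        # which includes the case where numbers is empty
        found = None
    return found

print(first_even([1, 3, 4, 5]))  # prints 4
print(first_even([1, 3, 5]))     # prints None
print(first_even([]))            # prints None
```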
I’m a fan of the Python Cookbook because it teaches Python idioms by example. Here’s how it explains this one.
Note the idiom that we use to advance an iterator by one step, dealing with the possibility that the iterator is exhausted:
for current_value in iterator:
    # if we get here the iterator was not empty, current_value was
    # its first value, and the iterator has been advanced one step
    ...use pair (current_value, iterator)...
    # we break at once as we only wanted the first item of iterator
    break
else:
    # if we get here the break did not execute, so the iterator
    # was empty (exhausted)
    # deal with the case of iterator being exhausted...
I have to admit the code still looks odd to me but it’s just about the perfect construct for this particular use case, eliminating any mention of iterator.next() and StopIteration.
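For comparison, here is a sketch of the same single step written both ways. It targets Python 3 and uses the built-in next() where the post mentions iterator.next(). The function names and the ("value", x) / ("exhausted", None) result pairs are invented for illustration.

```python
def step_for_else(iterator):
    """Advance one step using the for-break-else idiom."""
    for value in iterator:
        result = ("value", value)
        break
    else:
        result = ("exhausted", None)
    return result

def step_try_except(iterator):
    """Advance one step with an explicit next() call and StopIteration."""
    try:
        value = next(iterator)
    except StopIteration:
        result = ("exhausted", None)
    else:
        result = ("value", value)
    return result

it = iter([7])
print(step_for_else(it))    # prints ('value', 7)
print(step_for_else(it))    # prints ('exhausted', None)
print(step_try_except(iter([])))  # prints ('exhausted', None)
```

Both functions behave the same; the first simply never names the exception.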
Batteries Included
Python 2.6 includes merge as standard in the heapq module. Here’s the implementation as at 2.6rc2. You’ll notice it doesn’t use the for-break-else idiom, hence the explicit exception catching. It also packs triples rather than pairs into the staging queue, presumably to guarantee a stable merge of the inputs.
def merge(*iterables):
    '''Merge multiple sorted inputs into a single sorted output.

    Similar to sorted(itertools.chain(*iterables)) but returns a generator,
    does not pull the data into memory all at once, and assumes that each of
    the input streams is already sorted (smallest to largest).

    >>> list(merge([1,3,5,7], [0,2,4,8], [5,10,15,20], [], [25]))
    [0, 1, 2, 3, 4, 5, 5, 7, 8, 10, 15, 20, 25]

    '''
    _heappop, _heapreplace, _StopIteration = heappop, heapreplace, StopIteration

    h = []
    h_append = h.append
    for itnum, it in enumerate(map(iter, iterables)):
        try:
            next = it.next
            h_append([next(), itnum, next])
        except _StopIteration:
            pass
    heapify(h)

    while 1:
        try:
            while 1:
                v, itnum, next = s = h[0]   # raises IndexError when h is empty
                yield v
                s[0] = next()               # raises StopIteration when exhausted
                _heapreplace(h, s)          # restore heap condition
        except _StopIteration:
            _heappop(h)                     # remove empty iterator
        except IndexError:
            return
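To illustrate the guess about stability, here is a small sketch, assuming Python 3 and its heapq.merge. Lists compare element by element, so when two head values tie the comparison is settled by the stream number and never reaches the third element. The entries a and b are mock-ups shaped like the ones above, not anything taken from the library.

```python
import heapq

# Mock entries shaped like [value, itnum, next]; the third slot is a
# stand-in object which is never compared because itnum breaks the tie.
a = [6, 0, iter([])]
b = [6, 1, iter([])]
print(a < b)  # prints True

# 1 and 1.0 compare equal, so the output order shows which input won each tie.
merged = list(heapq.merge([1, 2], [1.0, 2.0]))
print(merged)  # prints [1, 1.0, 2, 2.0]
```

On each tie the value from the earlier input comes out first, which is what a stable merge should do.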
What else?
Personally I’ve only really used else alongside if[2]. As shown here, it also pairs up with for and while. It also appears (optionally) towards the end of a try statement. From the documentation:
The try … except statement has an optional else clause, which, when present, must follow all except clauses. It is useful for code that must be executed if the try clause does not raise an exception. For example:
for arg in sys.argv[1:]:
    try:
        f = open(arg, 'r')
    except IOError:
        print 'cannot open', arg
    else:
        print arg, 'has', len(f.readlines()), 'lines'
        f.close()
The use of the else clause is better than adding additional code to the try clause because it avoids accidentally catching an exception that wasn’t raised by the code being protected by the try … except statement.
[1] Coincidentally, the for ... else construct gets a mention today in Christopher Leary’s VaporWarning blog.
The for-else statement looks a little strange when you first encounter it, but I’ve come to love it.