The Artima Developer Community
Sponsored Link

Python Buzz Forum
Merging sorted streams in Python

0 replies on 1 page.

Welcome Guest
  Sign In

Go back to the topic listing  Back to Topic List Click to reply to this topic  Reply to this Topic Click to search messages in this forum  Search Forum Click for a threaded view of the topic  Threaded View   
Previous Topic   Next Topic
Flat View: This topic has 0 replies on 1 page
Thomas Guest

Posts: 236
Nickname: tag
Registered: Nov, 2004

Thomas Guest likes dynamic languages.
Merging sorted streams in Python Posted: Oct 1, 2008 10:57 PM
Reply to this message Reply

This post originated from an RSS feed registered with Python Buzz by Thomas Guest.
Original Post: Merging sorted streams in Python
Feed Title: Word Aligned: Category Python
Feed URL: http://feeds.feedburner.com/WordAlignedCategoryPython
Feed Description: Dynamic languages in general. Python in particular. The adventures of a space sensitive programmer.
Latest Python Buzz Posts
Latest Python Buzz Posts by Thomas Guest
Latest Posts From Word Aligned: Category Python

Advertisement

Problem

Python Cookbook cover

In a post on his Code Monk blog David Jones offers up some Python tidbits he’s only recently discovered and invites readers to share similar discoveries. I’d like to respond here by talking about a recipe from the Python Cookbook.

Problem

You have several sorted sequences (iterables) and need to iterate on the overall sorted sequence that results from “merging” these sequences.

For example, we could merge multiples of 2, 3 and 5.

>>> from itertools import count, imap, islice
>>> m2, m3, m5 = [imap(n.__mul__, count(1)) for n in (2, 3, 5)]
>>> m235 = merge(m2, m3, m5)
>>> list(islice(m235, 10))
[2, 3, 4, 5, 6, 6, 8, 9, 10, 10]

Here, 6 appears twice in the merged stream since it’s a multiple of 2 and also of 3, and similarly 10 makes a double appearance.

This example merges three infinite streams. If we were sure all our inputs were finite, we might well simply chain them together and sort the whole thing.

>>> from itertools import chain
>>> def merge(*seqs):
>>>     return sorted(chain(*seqs))

An algorithm which deals with a (potential) mix of finite and infinite sequences is a little more interesting. We might consider an approach which repeatedly peeks at the head element of each sequence, finds the smallest of these, then pops and yields it. The recipe in the Cookbook improves on this idea.

Solution

The ingenious merge implementation shown below is credited to Sébastien Keim, Raymond Hettinger and Danny Yoo (the same Raymond Hettinger who appears on Code Monk to point out that zip can unzip).

The algorithm uses a priority queue as a staging area. This priority queue is initialised to hold a collection of pairs comprising the head value from each input stream and the tail of that stream. We then pop the first element from this queue, returning its value, and queue the next (head, tail) pair from the stream it came from. At all times the priority queue contains a (head, tail) pair from each input stream.

This all works sweetly when the inputs are all infinite. The complication occurs when a finite sequence reaches its end.

import heapq

def merge(*subsequences):
    # prepare a priority queue whose items are pairs of the form
    # (current-value, iterator), one each per (non-empty) subsequence
    heap = [  ]
    for subseq in subsequences:
        iterator = iter(subseq)
        for current_value in iterator:
            # subseq is not empty, therefore add this subseq's pair
            # (current-value, iterator) to the list
            heap.append((current_value, iterator))
            break
    # make the priority queue into a heap
    heapq.heapify(heap)
    while heap:
        # get and yield lowest current value (and corresponding iterator)
        current_value, iterator = heap[0]
        yield current_value
        for current_value in iterator:
            # subseq is not finished, therefore add this subseq's pair
            # (current-value, iterator) back into the priority queue
            heapq.heapreplace(heap, (current_value, iterator))
            break
        else:
            # subseq has been exhausted, therefore remove it from the queue
            heapq.heappop(heap)

I had to look twice at this code despite the copious comments. There’s nothing unusual about the two outer loops, a for loop and a while loop. The first sets up the priority queue we use to stage inputs; the second actually generates the output results while items remain in this queue.

It’s the inner for loops which are less standard: execution of the body of each will break as soon as a single item has been processed, except when the body is never executed because iterable is empty, which, in the first case means that iterable needn’t be queued, and in the second case means the for’s else clause executes, dequeuing the iterable. These are for loops which actually do something at most once.

You did remember that for loops, in Python, have an else clause, right?

I confess that if I ever knew that, I’d certainly forgotten it! The else clause executes if the loop is not broken out of, whether or not the iterable is empty[1]. There’s nothing very else-y about it, in this context! I’d be interested to learn of any other C-family languages with a similar construct?

I’m a fan of the Python Cookbook because it teaches Python idioms by example. Here’s how it explains this one.

Note the idiom that we use to advance an iterator by one step, dealing with the possibility that the iterator is exhausted:

for current_value in iterator:
    # if we get here the iterator was not empty, current_value was
    # its first value, and the iterator has been advanced one step
    ...use pair (current_value, iterator)...
    # we break at once as we only wanted the first item of iterator
    break
else:
    # if we get here the break did not execute, so the iterator
    # was empty (exhausted)
    # deal with the case of iterator being exhausted...

I have to admit the code still looks odd to me but it’s just about the perfect construct for this particular use case, eliminating any mention of iterator.next() and StopIteration.

Batteries Included

Batteries included

Python 2.6 includes merge as standard in the heapq module. Here’s the implementation as at 2.6rc2. You’ll notice it doesn’t use the for-break-else idiom, hence the explicit exception catching. It also packs triples rather than pairs into the staging queue, presumably to guarantee a stable merge of the inputs.

def merge(*iterables):
    '''Merge multiple sorted inputs into a single sorted output.
    
    Similar to sorted(itertools.chain(*iterables)) but returns a generator,
    does not pull the data into memory all at once, and assumes that each of
    the input streams is already sorted (smallest to largest).
    
    >>> list(merge([1,3,5,7], [0,2,4,8], [5,10,15,20], [], [25]))
    [0, 1, 2, 3, 4, 5, 5, 7, 8, 10, 15, 20, 25]
    
    '''
    _heappop, _heapreplace, _StopIteration = heappop, heapreplace, StopIteration
    
    h = []
    h_append = h.append
    for itnum, it in enumerate(map(iter, iterables)):
        try:
            next = it.next
            h_append([next(), itnum, next])
        except _StopIteration:
            pass
    heapify(h)
    
    while 1:
        try:
            while 1:
                v, itnum, next = s = h[0]   # raises IndexError when h is empty
                yield v
                s[0] = next()               # raises StopIteration when exhausted
                _heapreplace(h, s)          # restore heap condition
        except _StopIteration:
            _heappop(h)                     # remove empty iterator
        except IndexError:
            return

What else?

Personally I’ve only really used else alongside if[2]. As shown here, it also pairs up with for and while. It also also appears (optionally) towards the end of a try statement. From the documentation:

The try … except statement has an optional else clause, which, when present, must follow all except clauses. It is useful for code that must be executed if the try clause does not raise an exception. For example:

    for arg in sys.argv[1:]:
        try:
            f = open(arg, 'r')
        except IOError:
            print 'cannot open', arg
        else:
            print arg, 'has', len(f.readlines()), 'lines'
            f.close()

The use of the else clause is better than adding additional code to the try clause because it avoids accidentally catching an exception that wasn’t raised by the code being protected by the try … except statement.


[1] Coincidentally, the for ... else construct gets a mention today in Christopher Leary’s VaporWarning blog.

The for-else statement looks a little strange when you first encounter it, but I’ve come to love it.

[2] If and else appear together in traditional if statements and the newer conditional expressions. The latter render one of the Cookbook recipes obsolete.

Read: Merging sorted streams in Python

Topic: A Mistake to Use Open Source Version Control Tools? Previous Topic   Next Topic Topic: Ignore the Heart Association and Lose Weight

Sponsored Links



Google
  Web Artima.com   

Copyright © 1996-2019 Artima, Inc. All Rights Reserved. - Privacy Policy - Terms of Use