The Artima Developer Community

C++ Source
Gathering Scattered I/O
by Matthew Wilson
September 18, 2007

Summary
Have your cake and eat it, too, with STL extensions. In this chapter extract from his latest book, Matthew Wilson shows you how to take full advantage of the STL Iterator abstraction, without sacrificing block-transfer efficiency of Scatter/Gather I/O memory.

Introduction

Don't ever let anyone tell you that STL is nice and abstract and all that, but it just doesn't perform well. I poke holes in that myth in several places in this book (Extended STL, Volume 1: Collections and Iterators, Sections 17.1.1, 19.1.1, 27.9, 36.2.2, and 38.2.1), but this chapter blows it clean out of the water.

The subject matter of this chapter is Scatter/Gather I/O (also known as Scatter I/O), which means the exchange of data between application code and (usually kernel) I/O routines in the form of multiple blocks per action. Its primary intent is to allow client code to manipulate application-level packet headers separately from the payloads, but it can also be turned to other cunning ends, and can yield considerable performance benefits.

The cost is that it complicates the manipulation of data when logically contiguous data is spread over physically discontiguous blocks. To aid in handling such cases, we may use classes that (re)linearize the data back to an acceptably manipulable abstraction.

Since this is a book about extending STL, the abstraction we will seek is that of an STL collection (Section 2.2) and its associated iterator ranges. But as we will see, there is a cost in such abstractions, so we will go further and examine how we might optimize the transfer of information without diluting the power of the abstraction. This will lead us into looking into the rules regarding overriding functions in the std namespace and how we may accommodate one with the other.

Scatter/Gather I/O

Scatter/Gather I/O involves the exchange of information between an I/O API and client code in a physically discontiguous form. In all cases I've come across, this discontiguous form involves a number of separate memory blocks. For example, the UNIX readv() and writev() functions act like their read() and write() siblings, but, rather than a pointer to a single area of memory and its size, they are passed an array of iovec structures:

struct iovec
{
  void*   iov_base;
  size_t  iov_len;
};
ssize_t readv(int fd, const struct iovec* vector, int count);
ssize_t writev(int fd, const struct iovec* vector, int count);

The Windows Sockets API has an analogous structure and corresponding functions:

struct WSABUF
{
  u_long  len;
  char*   buf;
};
int WSARecv(SOCKET  s
          , WSABUF* lpBuffers
          , DWORD   dwBufferCount
          , . . . /* And 4 more parameters */);
int WSASend(SOCKET  s
          , WSABUF* lpBuffers
          , DWORD   dwBufferCount
          , . . . /* And 4 more parameters */);
int WSARecvFrom(SOCKET  s
              , WSABUF* lpBuffers
              , DWORD   dwBufferCount
              , . . . /* And 6 more parameters */);
int WSASendTo(  SOCKET  s
              , WSABUF* lpBuffers
              , DWORD   dwBufferCount
              , . . . /* And 6 more parameters */);

You might wonder why people would want to perform I/O in such a fashion, given the obvious complication to client code. Well, if your file or network data has a fixed format, you can read one or more records/packets in or out without any need to move, reformat, or coalesce them. This can be quite a convenience. Similarly, if your records/packets have variable format but a fixed-size header, you can read/write the header directly to/from a matching structure and treat the rest as an opaque variable-size blob. And there's a third reason: performance. I once created a network server architecture using Scatter/Gather I/O that used a multithreaded nonlocking memory allocation scheme. (Suffice to say, it was rather nippy.)
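
The fixed-size header case can be sketched directly with the POSIX calls. What follows is a minimal illustration, not code from the book: PacketHeader, send_packet(), and recv_packet() are hypothetical names, and the sketch assumes both peers share byte order and structure layout, which a real protocol would not take for granted.

```cpp
#include <sys/uio.h>   // readv(), writev(), struct iovec
#include <unistd.h>    // pipe(), close()
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>

// Hypothetical fixed-size packet header, for illustration only.
struct PacketHeader
{
  std::uint32_t type;
  std::uint32_t payloadLen;
};

// Gathers the header and the payload into a single writev() call.
// Returns true if every byte was written.
inline bool send_packet(int fd, PacketHeader const& hdr, char const* payload)
{
  assert(NULL != payload || 0 == hdr.payloadLen);
  iovec slices[2];
  slices[0].iov_base = const_cast<PacketHeader*>(&hdr);
  slices[0].iov_len  = sizeof(hdr);
  slices[1].iov_base = const_cast<char*>(payload);
  slices[1].iov_len  = hdr.payloadLen;
  ssize_t const n = ::writev(fd, slices, 2);
  return n == static_cast<ssize_t>(sizeof(hdr) + hdr.payloadLen);
}

// Scatters the incoming bytes: the header lands in hdr, the rest in payload.
// Returns the number of payload bytes received, or -1 on failure.
inline ssize_t recv_packet(int fd, PacketHeader& hdr, char* payload, size_t cap)
{
  iovec slices[2];
  slices[0].iov_base = &hdr;
  slices[0].iov_len  = sizeof(hdr);
  slices[1].iov_base = payload;
  slices[1].iov_len  = cap;
  ssize_t const n = ::readv(fd, slices, 2);
  if(n < static_cast<ssize_t>(sizeof(hdr)))
  {
    return -1;
  }
  return n - static_cast<ssize_t>(sizeof(hdr));
}
```

Neither function copies the header or the payload into an intermediate buffer; the kernel reads from, or writes to, the two areas in place.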

But however much Scatter/Gather I/O may help in terms of performance, when dealing with variable-length records/packets, or those whose payloads contain elements that are variable-length, the client code is complicated, usually low on transparency, and bug-prone. An efficient abstraction is needed.

Scatter/Gather I/O APIs

Linearizing with COM Streams

The challenge with Scatter/Gather I/O is that using memory scattered over multiple blocks is not a trivial matter. On projects (on Windows platforms) in the 1990s, I tended to use a custom COM stream implementation from my company's proprietary libraries, which was implemented for a different task some years previously. Permit me to talk about the COM stream architecture for a moment. (I know I promised in the last chapter there would be no more COM, but there is a point to this, even for UNIX diehards. Trust me, I'm a doctor!)

A COM stream is an abstraction over an underlying storage medium having much in common with the file abstractions we're used to. Essentially, it has access to the underlying medium and defines a current point within its logical extent. A stream object exhibits the IStream interface (shown in abbreviated form in Listing 31.1), which contains a number of methods, including Seek(), SetSize(), Stat(), and Clone().

There are also methods for acquiring exclusive access to regions of the underlying medium. The IStream interface derives from ISequentialStream (also shown in Listing 31.1), which defines the two methods Read() and Write(). You can implement a stream for a particular underlying medium directly by deriving from IStream and providing suitable definitions for its methods.

Listing 31.1 Definition of the ISequentialStream and IStream Interfaces

interface ISequentialStream
  : public IUnknown
{
  virtual HRESULT Read(void* p, ULONG n, ULONG* numRead) = 0;
  virtual HRESULT Write(void const* p, ULONG n, ULONG* numWritten) = 0;
};
interface IStream
  : public ISequentialStream
{
  virtual HRESULT Seek(. . .) = 0;
  virtual HRESULT SetSize(. . .) = 0;
  virtual HRESULT CopyTo(. . .) = 0;
  virtual HRESULT Commit(. . .) = 0;
  virtual HRESULT Revert(. . .) = 0;
  virtual HRESULT LockRegion(. . .) = 0;
  virtual HRESULT UnlockRegion(. . .) = 0;
  virtual HRESULT Stat(. . .) = 0;
  virtual HRESULT Clone(. . .) = 0;
};

COM defines another stream-related abstraction, in the form of the ILockBytes interface (shown in abbreviated form in Listing 31.2). It abstracts arbitrary underlying mediums as a logically contiguous array of bytes. It does not maintain any positional state. Hence, it has ReadAt() and WriteAt() methods rather than Read() and Write().

Listing 31.2 Definition of the ILockBytes Interface

interface ILockBytes
  : public IUnknown
{
  virtual HRESULT ReadAt( ULARGE_INTEGER pos, void* p
                        , ULONG n, ULONG* numRead) = 0;
  virtual HRESULT WriteAt(ULARGE_INTEGER pos, void const* p
                        , ULONG n, ULONG* numWritten) = 0;
  virtual HRESULT Flush() = 0;
  virtual HRESULT SetSize(. . .) = 0;
  virtual HRESULT LockRegion(. . .) = 0;
  virtual HRESULT UnlockRegion(. . .) = 0;
  virtual HRESULT Stat(. . .) = 0;
};

It is a relatively simple matter to implement a COM stream in terms of (an object that exhibits) the ILockBytes interface. All that's required is an ILockBytes* and a position. My company has just such an entity, accessible via the CreateStreamOnLockBytes() function:

HRESULT CreateStreamOnLockBytes(ILockBytes* plb, unsigned flags
                              , IStream** ppstm);
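
To show why a positionless store plus a position is all a stream needs, here is a portable sketch with no COM in it. ByteStore and StreamOnStore are hypothetical stand-ins for ILockBytes and IStream respectively; they are not the proprietary classes, and they model only reading.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Hypothetical stand-in for ILockBytes: byte storage with no current position.
class ByteStore
{
public:
  explicit ByteStore(std::vector<char> const& bytes)
    : m_bytes(bytes)
  {}
  // Copies up to n bytes starting at pos; returns the number copied.
  std::size_t read_at(std::size_t pos, void* p, std::size_t n) const
  {
    assert(NULL != p);
    if(pos >= m_bytes.size())
    {
      return 0;
    }
    std::size_t const cb = std::min(n, m_bytes.size() - pos);
    std::memcpy(p, &m_bytes[pos], cb);
    return cb;
  }
  std::size_t size() const
  {
    return m_bytes.size();
  }
private:
  std::vector<char> m_bytes;
};

// Hypothetical stand-in for IStream: a store plus a current position.
class StreamOnStore
{
public:
  explicit StreamOnStore(ByteStore const& store)
    : m_store(store)
    , m_pos(0)
  {}
  // Reads from the current position and moves it past the bytes read.
  std::size_t read(void* p, std::size_t n)
  {
    std::size_t const cb = m_store.read_at(m_pos, p, n);
    m_pos += cb;
    return cb;
  }
  void seek(std::size_t pos)
  {
    m_pos = std::min(pos, m_store.size());
  }
private:
  ByteStore const&  m_store;
  std::size_t       m_pos;
};
```

The stream adds nothing but the position, so any medium that can answer read_at() gets sequential access for free.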

Obviously, the next question is, "How do we get hold of an ILockBytes object?" Again, there's a function for that, CreateLockBytesOnMemory():

HRESULT CreateLockBytesOnMemory(void*         pv
                              , size_t        si
                              , unsigned      flags
                              , void*         arena
                              , ILockBytes**  pplb);

This supports a whole host of memory scenarios, including using a fixed buffer, using Windows "global" memory, using a COM allocator (IAllocator), and so on. One of the many flags is SYCLBOMF_FIXED_ARRAY, which indicates that pv points to an array of MemLockBytesBlock structures:

struct MemLockBytesBlock
{
  size_t  cb;
  void*   pv;
};

I'm not going to bang on about this much more, as hindsight is a harsh judge of things such as opaque pointers whose meanings are moderated by flags. The point I want to get across about this stuff is that I was able to take a set of memory blocks containing the scattered packet contents and get back an IStream pointer from which the packet information can be extracted in a logical and linear manner. Such code takes the following simple and reasonably transparent form. (Error handling is elided for brevity.) The ref_ptr instances are used to ensure that the reference counts are managed irrespective of any early returns and/or exceptions.

std::vector<WSABUF>   blocks      = . . .
size_t                payloadSize = . . .
ILockBytes*           plb;
IStream*              pstm;
   
SynesisCom::CreateLockBytesOnMemory(&blocks[1], payloadSize
                            , SYCLBOMF_FIXED_ARRAY | . . ., NULL, &plb);
stlsoft::ref_ptr<ILockBytes>  lb(plb, false); // false "eats" the ref

SynesisCom::CreateStreamOnLockBytes(plb, 0, &pstm);
stlsoft::ref_ptr<IStream>     stm(pstm, false); // false "eats" the ref

. . . // Pass off stm to higher-layer processing
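
The linearization that a lock-bytes object performs over an array of blocks can be sketched portably. This is an assumption about the general technique, not the proprietary implementation: Block mirrors the layout of MemLockBytesBlock, and read_at() is a hypothetical free function playing the role of ReadAt().

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>

// Hypothetical block descriptor, mirroring MemLockBytesBlock.
struct Block
{
  std::size_t cb;  // Number of bytes in the block
  void*       pv;  // Start of the block
};

// Copies up to n bytes, starting at logical offset pos, from the scattered
// blocks into the contiguous buffer dest. Returns the number of bytes copied.
inline std::size_t read_at(Block const* blocks, std::size_t numBlocks
                         , std::size_t pos, void* dest, std::size_t n)
{
  assert(NULL != dest);
  char*       out    = static_cast<char*>(dest);
  std::size_t copied = 0;
  for(std::size_t i = 0; i != numBlocks && 0 != n; ++i)
  {
    Block const& b = blocks[i];
    if(pos >= b.cb)
    {
      pos -= b.cb;  // The offset lies beyond this block, so skip it
      continue;
    }
    std::size_t const cb = std::min(n, b.cb - pos);
    std::memcpy(out + copied, static_cast<char const*>(b.pv) + pos, cb);
    copied += cb;
    n      -= cb;
    pos     = 0;    // Subsequent blocks are read from their start
  }
  return copied;
}
```

A caller sees one logical byte range; the block boundaries matter only inside the loop.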

The stream can then be wrapped by a byte-order-aware Instance Adaptor class that works in partnership with a message object Factory, to complete the mechanism for efficient translation from TCP packet stream segments to instances of higher-level protocol (C++) objects. The high efficiencies obtainable by such a scheme result from there being no allocations of, and no copying into, memory that does not constitute part of the final translated message object instances.

This is a powerful basis for a communications server model, one that I've used several times, albeit in different guises. In the case described earlier, a number of characteristics of the approach might incline you to search, as I have done, for better, less technology-specific solutions.

First, the major downside of the described mechanism is that, being COM, the server code is effectively Windows-specific. Second, many developers (incorrectly) consider COM, as they (equally incorrectly) do C++ and STL, to be intrinsically inefficient, and it can be hard to disabuse them of that notion even with hard facts. Finally, add in the type-unsafe opaque pointers and the fact that the stream and lock-bytes classes were hidden proprietary implementations, and it all leaves something to be desired.

platformstl::scatter_slice_sequence—A Teaser Trailer

An alternate representation is to be found in a new, and still evolving, component in the PlatformSTL subproject: scatter_slice_sequence. This Facade class template maintains an array of slice structures describing a set of I/O buffers and provides methods for invoking native read/write functions on the set of buffers, in addition to providing STL collection access (in the form of begin() and end() methods). The class works with both iovec and WSABUF by abstracting their features with attribute shims (Section 9.2.1) get_scatter_slice_size, get_scatter_slice_ptr, and get_scatter_slice_size_member_ptr, shown in Listing 31.3.

Listing 31.3 Attribute Shims for the iovec and WSABUF Structures

#if defined(PLATFORMSTL_OS_IS_UNIX)
inline void const* get_scatter_slice_ptr(struct iovec const& ss)
{
  return ss.iov_base;
}
inline void*& get_scatter_slice_ptr(struct iovec& ss);
   
inline size_t get_scatter_slice_size(struct iovec const& ss)
{
  return static_cast<size_t>(ss.iov_len);
}
inline size_t& get_scatter_slice_size(struct iovec& ss);
   
inline size_t iovec::*
 get_scatter_slice_size_member_ptr(struct iovec const*)
{
  return &iovec::iov_len;
}
#elif defined(PLATFORMSTL_OS_IS_WIN32)
inline void const* get_scatter_slice_ptr(WSABUF const& ss)
{
  return ss.buf;
}
inline void*& get_scatter_slice_ptr(WSABUF& ss);
   
inline size_t get_scatter_slice_size(WSABUF const& ss)
{
  return static_cast<size_t>(ss.len);
}
inline size_t& get_scatter_slice_size(WSABUF& ss);
   
inline u_long WSABUF::* get_scatter_slice_size_member_ptr(WSABUF const*)
{
  return &WSABUF::len;
}
#endif /* operating system */
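
To see what the shims buy, consider a platform-neutral sketch. The two structures below are hypothetical stand-ins for iovec and WSABUF (different member names, order, and size type), and get_slice_size(), get_slice_ptr(), and total_size() are illustrative names rather than PlatformSTL's.

```cpp
#include <cassert>
#include <cstddef>

// Two hypothetical slice structures, standing in for iovec and WSABUF.
struct unix_like_slice
{
  void*         base;
  std::size_t   len;
};
struct win_like_slice
{
  unsigned long len;
  char*         buf;
};

// Attribute shims: one overload per structure, all sharing a name.
inline std::size_t get_slice_size(unix_like_slice const& s)
{
  return s.len;
}
inline std::size_t get_slice_size(win_like_slice const& s)
{
  return static_cast<std::size_t>(s.len);
}
inline void const* get_slice_ptr(unix_like_slice const& s)
{
  return s.base;
}
inline void const* get_slice_ptr(win_like_slice const& s)
{
  return s.buf;
}

// Generic code written purely in terms of the shims; it never names a member.
template <typename S>
std::size_t total_size(S const* slices, std::size_t n)
{
  assert(NULL != slices || 0 == n);
  std::size_t total = 0;
  for(std::size_t i = 0; i != n; ++i)
  {
    total += get_slice_size(slices[i]);
  }
  return total;
}
```

Supporting a third slice structure would mean adding two more overloads; total_size() would not change.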

scatter_slice_sequence currently provides for readv()/writev() on UNIX and WSARecv()/WSASend() and WSARecvFrom()/WSASendTo() on Windows. Listing 31.4 shows an example that uses an iovec specialization of the class template to read the contents from one file descriptor into a number of buffers, processes the content in an STL kind of way, and then writes the converted contents to another file descriptor.

Listing 31.4 Example Use of scatter_slice_sequence with readv() and writev()

int fs  = . . . // Opened for read
int fd  = . . . // Opened for write
for(;;)
{
  const size_t  BUFF_SIZE = 100;
  const size_t  MAX_BUFFS = 10;
  char          buffers[MAX_BUFFS][BUFF_SIZE];
  const size_t  numBuffers  = rand() % MAX_BUFFS;
   
  // Declare an instance with arity of numBuffers
  platformstl::scatter_slice_sequence<iovec>  sss(numBuffers);
   
  // Set up each slice in the sequence, which may be of
  // different sizes in reality
  { for(size_t i = 0; i < numBuffers; ++i)
  {
    sss.set_slice(i, &buffers[i][0], sizeof(buffers[i]));
  }}
  if(0 != numBuffers) // In real scenario, might get 0 buffers
  {
    size_t  n = sss.read(::readv, fs); // Read from fs using ::readv()
    if(0 == n)
    {
      break;
    }
    // "Process" the contents
    std::transform( sss.payload().begin(), sss.payload().begin() + n
                , sss.payload().begin(), ::toupper);
    sss.write(::writev, fd, n); // Write n to fd using ::writev()
  }
}

Obviously this example is very stripped down, but I trust your abilities to imagine that fs and fd might represent sockets, that the buffers shown here would be obtained from a shared memory arena (which may not have any to spare at a given time), and that the "processing" would be something less trivial than setting the contents to uppercase before (re)transmission.
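
Writing only the first n bytes of a set of slices means the slice array must be trimmed before it reaches writev(). One way that might be done is sketched below; first_n_bytes() is a hypothetical helper, and nothing here is claimed to match how scatter_slice_sequence does it internally.

```cpp
#include <sys/uio.h>   // struct iovec
#include <cassert>
#include <cstddef>
#include <vector>

// Returns a copy of the slice descriptors that covers only the first n bytes
// of the memory they describe. The buffers themselves are not copied.
inline std::vector<iovec> first_n_bytes(iovec const* slices, std::size_t count
                                      , std::size_t n)
{
  assert(NULL != slices || 0 == count);
  std::vector<iovec> result;
  for(std::size_t i = 0; i != count && 0 != n; ++i)
  {
    iovec s = slices[i];
    if(s.iov_len > n)
    {
      s.iov_len = n;          // Truncate the final, partially used slice
    }
    if(0 != s.iov_len)
    {
      result.push_back(s);
      n -= s.iov_len;
    }
  }
  return result;
}
```

The resulting vector can be passed to writev() as &result[0] and result.size(), provided it is not empty.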

The sequence's payload (available via payload()) provides random access iterators over the contents of its memory blocks. Just as with std::deque, it's important to realize that these iterators are not contiguous (Section 2.3.6)! Pointer arithmetic on the iterators is a constant-time operation, but iterating the range is not a linear-time operation. The scatter_slice_sequence is still a work in progress, and its interface might evolve further before it's released into the PlatformSTL subproject proper. (It is on the CD.) But what it clearly provides is the ability to represent a given set of data blocks as an STL sequence (Section 2.2), along with adaptor methods read() and write() that take a file/socket handle and a Scatter/Gather I/O function and apply them to the blocks. This is the logical equivalent of the COM stream object created via CreateLockBytesOnMemory() + SYCLBOMF_FIXED_ARRAY and CreateStreamOnLockBytes(). The one apparent disadvantage is that its contents have to be traversed one element at a time, something that may have performance costs. (Hint: This is a clue about something interesting to follow. . . .)

Adapting ACE_Message_Queue

The main subject of this chapter covers my efforts to adapt the memory queues of the Adaptive Communications Environment (ACE) to the STL collection concept, to serve the requirements of one of my recent commercial networking projects, a middleware routing service. To use the ACE Reactor framework, you derive event handler classes from ACE_Event_Handler (overriding the requisite I/O event handler methods) and register instances of them with the program's reactor singleton. When the reactor encounters an I/O event of a type for which an instance is registered, it invokes the appropriate callback method on the handler. When used with TCP, the Internet's stream-oriented transport protocol, the common idiom is to read received data into instances of ACE_Message_Block and queue them in an instance of (a specialization of) the class template ACE_Message_Queue, as shown (with error handling omitted for brevity) in Listing 31.5.

Listing 31.5 A Simple Event Handler for the ACE Reactor Framework

class SimpleTCPReceiver
  : public ACE_Event_Handler
{
  . . .
  virtual int handle_input(ACE_HANDLE h)
  {
    const size_t        BLOCK_SIZE = 1024;
    ACE_Message_Block*  mb  = new ACE_Message_Block(BLOCK_SIZE);
    ssize_t             n   = m_peer.recv(mb->base(), mb->size());
    mb->wr_ptr(n);
    m_mq.enqueue_tail(mb);
    return 0;
  }
  . . .
private: // Member Variables
  ACE_SOCK_Stream                   m_peer; // Connection socket
  ACE_Message_Queue<ACE_SYNCH_USE>  m_mq;   // Message queue
};

The ACE_Message_Queue class acts as an ordered repository for all blocks, thereby faithfully representing the data stream. But ACE_Message_Queue is strictly a container of blocks; it does not attempt to provide any kind of abstracted access to the contents of the blocks. To access the contents of a message queue, you can use the associated class template, ACE_Message_Queue_Iterator, to iterate the blocks, as shown in Listing 31.6. The ACE_Message_Queue_Iterator::next() method returns a nonzero result and sets the given pointer reference to the block if a next block is available; otherwise, it returns 0. The advance() method moves the current enumeration point to the next block (if any).

Listing 31.6 Example Code That Uses ACE_Message_Queue_Iterator

void SimpleTCPReceiver::ProcessQueue()
{
  ACE_Message_Queue_Iterator<ACE_NULL_SYNCH>  mqi(m_mq);
  ACE_Message_Block*                          mb;
  for(; mqi.next(mb); mqi.advance())
  {
    { for(size_t i = 0; i < mb->length(); ++i)
    {
       printf("%c", i[mb->rd_ptr()]);
    }}
    mb->rd_ptr(mb->length()); // Advance read ptr to "exhaust" block
  }
}

Obviously, if you want to process a set of blocks as a logically contiguous single block, it's going to be a bit messy. We need a sequence to flatten the stream for STL manipulation.

acestl::message_queue_sequence, Version 1

The ACESTL subproject contains a number of components for adapting ACE to STL (and for making ACE components easier to use). acestl::message_queue_sequence is a class template that acts as an Instance Adaptor for the ACE_Message_Queue. Since this component's got quite a kick, I'm going to play my usual author's dirty trick of presenting you with a progression of implementations. Thankfully, unlike some material covered in other chapters, the changes between the versions are entirely additive, which should help keep me under 40 pages for this topic. Listing 31.7 shows the definition of the first version.

Listing 31.7 Definition of message_queue_sequence

// In namespace acestl
template <ACE_SYNCH_DECL>
class message_queue_sequence
{
public: // Member Types
  typedef char                                  value_type;
  typedef ACE_Message_Queue<ACE_SYNCH_USE>      sequence_type;
  typedef message_queue_sequence<ACE_SYNCH_USE> class_type;
  typedef size_t                                size_type;
  class                                         iterator;
public: // Construction
  explicit message_queue_sequence(sequence_type& mq);
public: // Iteration
  iterator begin();
  iterator end();
public: // Attributes
  size_type  size() const;
  bool       empty() const;
private: // Member Variables
  sequence_type&  m_mq;
private: // Not to be implemented
  message_queue_sequence(class_type const&);
  class_type& operator =(class_type const&);
};

Given what we've seen with previous sequences, there's little here that needs to be remarked on; the interesting stuff will be in the iterator class. Note that the value type is char, meaning that size() returns the number of bytes in the queue, and [begin(), end()) defines the range of bytes. No methods pertain to message blocks.

acestl::message_queue_sequence::iterator

Listing 31.8 shows the definition of the acestl::message_queue_sequence::iterator class. Again, a lot here should be familiar based on prior experience. (I hope by now you're building a familiarity with these techniques, recognizing their similarities and identifying the differences between the different cases of their application. Naturally, it's my hope that this stands you in great stead for writing your own STL extensions.)

The iterator category is input iterator (Section 1.3.1). The element reference category (Section 3.3) is transient or higher; in fact, it's fixed, with the caveat that no other code, within or without the defining thread, changes the contents of the underlying message queue or its blocks (in which case it would be invalidatable). The iterator is implemented in terms of a shared_handle, discussed shortly. I've not shown the canonical manipulation of the shared_handle in the construction methods since we've seen it before in other sequences (Sections 19.3 and 20.5).

Listing 31.8 Definition of message_queue_sequence::iterator

class message_queue_sequence<. . .>::iterator
  : public std::iterator<std::input_iterator_tag
                        , char, ptrdiff_t
                        , char*, char&
                        >
{
private: // Member Types
  friend class message_queue_sequence<ACE_SYNCH_USE>;
  typedef ACE_Message_Queue_Iterator<ACE_SYNCH_USE>   mq_iterator_type;
  struct                                              shared_handle;
public:
  typedef iterator                                    class_type;
  typedef char                                        value_type;
private: // Construction
  iterator(sequence_type& mq)
    : m_handle(new shared_handle(mq))
  {}
public:
  iterator()
    : m_handle(NULL)
  {}
  iterator(class_type const& rhs); // Share handle via AddRef() (+)
  ~iterator() throw(); // Call Release() (-) if non-NULL
  class_type& operator =(class_type const& rhs); // (+) new; (-) old
public: // Input Iteration
  class_type& operator ++()
  {
    ACESTL_ASSERT(NULL != m_handle);
    if(!m_handle->advance())
    {
      m_handle->Release();
      m_handle = NULL;
    }
    return *this;
  }
  class_type operator ++(int); // Canonical implementation
  value_type& operator *()
  {
    ACESTL_ASSERT(NULL != m_handle);
    return m_handle->current();
  }
  value_type operator *() const
  {
    ACESTL_ASSERT(NULL != m_handle);
    return m_handle->current();
  }
  bool equal(class_type const& rhs) const
  {
    return is_end_point() == rhs.is_end_point();
  }
private: // Implementation
  bool is_end_point() const
  {
    return NULL == m_handle || m_handle->is_end_point();
  }
private: // Member Variables
  shared_handle*  m_handle;
};

The iteration methods are implemented in terms of the methods of shared_handle. The endpoint state is identified by either a NULL handle or a handle that identifies itself as being at the endpoint. The preincrement operator advances by calling shared_handle::advance() and releases the handle when advance() returns false. The dereference operator overloads are implemented in terms of the current() overloads of shared_handle. Note that the mutating (non-const) overload returns a mutating reference, whereas the nonmutating (const) overload returns a char by value.
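
For readers who have not seen the other sequences, the handle sharing that Listing 31.8 leaves as "canonical" might look something like the following. This is a single-threaded sketch under my own assumptions: handle and iter are hypothetical, pared-down stand-ins for shared_handle and iterator, and the live counter exists only to make the lifetime observable.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical reference-counted state shared between iterator copies.
struct handle
{
  static int live;   // Number of handle instances alive (illustration only)
  handle()
    : m_refCount(1)
  {
    ++live;
  }
  void AddRef()
  {
    ++m_refCount;
  }
  void Release()
  {
    assert(m_refCount > 0);
    if(0 == --m_refCount)
    {
      delete this;   // The last reference destroys the handle
    }
  }
private:
  ~handle()
  {
    --live;
  }
  int m_refCount;
};
int handle::live = 0;

class iter
{
public:
  iter()
    : m_handle(NULL)
  {}
  explicit iter(handle* h)        // Takes over the handle's initial reference
    : m_handle(h)
  {}
  iter(iter const& rhs)           // Share handle via AddRef() (+)
    : m_handle(rhs.m_handle)
  {
    if(NULL != m_handle) m_handle->AddRef();
  }
  ~iter()                         // Call Release() (-) if non-NULL
  {
    if(NULL != m_handle) m_handle->Release();
  }
  iter& operator =(iter const& rhs)
  {
    handle* const old = m_handle;
    m_handle = rhs.m_handle;
    if(NULL != m_handle) m_handle->AddRef();  // (+) new
    if(NULL != old) old->Release();           // (-) old
    return *this;
  }
private:
  handle* m_handle;
};
```

Taking the new reference before releasing the old one keeps self-assignment safe without a special case.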

The main action lies in the shared_handle. Listing 31.9 shows its implementation. I'm going to invoke another low author tactic now and not explain the fine detail of the algorithm. I'll leave it as an exercise for you to figure out. To be fair, though, I will note that it skips empty ACE_Message_Block instances, which is how its endpoint condition can be so simple.

Listing 31.9 Definition of shared_handle

struct message_queue_sequence<. . .>::iterator::shared_handle
{
public: // Member Types
  typedef shared_handle   class_type;
public: // Member Variables
  mq_iterator_type    m_mqi;
  ACE_Message_Block*  m_entry;
  size_t              m_entryLength;
  size_t              m_entryIndex;
private:
  sint32_t            m_refCount;
public: // Construction
  explicit shared_handle(sequence_type& mq)
    : m_mqi(mq)
    , m_entry(NULL)
    , m_entryLength(0)
    , m_entryIndex(0)
    , m_refCount(1)
  {
    if(m_mqi.next(m_entry))
    {
      for(;;)
      {
        if(0 != (m_entryLength = m_entry->length()))
        {
          break;
        }
        else if(NULL == (m_entry = nextEntry()))
        {
          break;
        }
      }
    }
  }
private:
  ~shared_handle() throw()
  {
    ACESTL_MESSAGE_ASSERT("Shared handle destroyed with outstanding references!", 0 == m_refCount);
  }
public:
  sint32_t AddRef();  // Canonical implementation
  sint32_t Release(); // Canonical implementation
public: // Iteration Methods
  bool is_end_point() const
  {
    return m_entryIndex == m_entryLength;
  }
  char&   current()
  {
    ACESTL_ASSERT(NULL != m_entry);
    ACESTL_ASSERT(m_entryIndex != m_entryLength);
    return m_entryIndex[m_entry->rd_ptr()];
  }
  char    current() const
  {
    ACESTL_ASSERT(NULL != m_entry);
    ACESTL_ASSERT(m_entryIndex != m_entryLength);
    return m_entryIndex[m_entry->rd_ptr()];
  }
  bool   advance()
  {
    ACESTL_MESSAGE_ASSERT("Invalid index", m_entryIndex < m_entryLength);
    if(++m_entryIndex == m_entryLength)
    {
      m_entryIndex = 0;
      for(;;)
      {
        if(NULL == (m_entry = nextEntry()))
        {
          return false;
        }
        else if(0 != (m_entryLength = m_entry->length()))
        {
          break;
        }
      }
    }
    return true;
  }
private: // Implementation
  ACE_Message_Block* nextEntry()
  {
    ACE_Message_Block* entry = NULL;
    return m_mqi.advance() ? (m_mqi.next(entry), entry) : NULL;
  }
private: // Not to be implemented
  shared_handle(class_type const&);
  class_type& operator =(class_type const&);
};
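
If you would rather check your answer to that exercise against something small, the skipping logic can be isolated from ACE altogether. The sketch below is an analogue, not the library code: block_cursor and flatten() are hypothetical, std::string stands in for ACE_Message_Block, and the endpoint test is expressed as a block index rather than the index/length comparison used in Listing 31.9.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical cursor over the bytes of a list of blocks.
class block_cursor
{
public:
  explicit block_cursor(std::vector<std::string> const& blocks)
    : m_blocks(blocks)
    , m_block(0)
    , m_index(0)
  {
    skip_empty_blocks();  // Never rest on an empty block
  }
  bool is_end_point() const
  {
    return m_block == m_blocks.size();
  }
  char current() const
  {
    assert(!is_end_point());
    return m_blocks[m_block][m_index];
  }
  // Moves to the next byte; returns false on reaching the endpoint.
  bool advance()
  {
    assert(!is_end_point());
    if(++m_index == m_blocks[m_block].size())
    {
      m_index = 0;
      ++m_block;
      skip_empty_blocks();
    }
    return !is_end_point();
  }
private:
  void skip_empty_blocks()
  {
    while(m_block != m_blocks.size() && m_blocks[m_block].empty())
    {
      ++m_block;
    }
  }
  std::vector<std::string> const& m_blocks;
  std::size_t                     m_block;  // Index of the current block
  std::size_t                     m_index;  // Index within the current block
};

// Linearizes the blocks one byte at a time, as the input iterator does.
inline std::string flatten(std::vector<std::string> const& blocks)
{
  std::string   result;
  block_cursor  cur(blocks);
  if(!cur.is_end_point())
  {
    do
    {
      result += cur.current();
    } while(cur.advance());
  }
  return result;
}
```

Because the cursor only ever rests on a non-empty block or on the endpoint, current() needs no checks beyond its assertion.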

Time for Some Cake

I hope you'll agree that being able to treat a set of ACE_Message_Block communications stream fragments as a logically contiguous stream eases considerably the task of dealing with such streams. In our middleware project, this enabled us to unmarshal the high-level protocol messages as objects, by the combination of another Instance Adaptor and a message Factory.

Permit me a small digression about the protocol. Standards Australia defines a protocol for the exchange of electronic payment information, called AS2805. This is a very flexible protocol, with a significant drawback. The messages do not contain any message-size information in their fixed-format header, and each message can contain a variable number of fields, some of which are of variable size. This means that you can't know whether all of a message has been received from the peer until the message has been fully parsed. Consequently, being able to easily and efficiently deconstruct a message is critical.

This was achieved by applying another Instance Adaptor to the acestl::message_queue_sequence instance to make it be treated as a streamable object, similar to how the logically contiguous ILockBytes instance was turned into a streamable object with CreateStreamOnLockBytes(). The streamable object is used by the message Factory, which understands how to read the message type from the packet header and then uses that type to dispatch the appropriate unmarshaling function to read the remainder of the message contents and create a corresponding message instance. If insufficient data is available, the Factory invocation fails benignly, and the queue contents remain unchanged until the next I/O event. Only when a full message is retrieved is the requisite portion of the head of the queue removed and released back to the memory cache. If the message parsing fails for bad contents, the peer has sent bad data, and the connection is torn down.
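
The "fail benignly, consume only on success" behavior can be illustrated with a toy format. To be clear about the assumptions: the wire format below is invented for this sketch and is not AS2805, and Message and try_parse() are hypothetical names unrelated to the project's Factory.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Invented wire format: one type byte, one field-count byte, then for each
// field a length byte followed by that many bytes of data. As with the
// protocol described above, the total message size is not stated up front.
struct Message
{
  unsigned char             type;
  std::vector<std::string>  fields;
};

// Attempts to parse one message from the front of buf. On success, fills msg
// and returns the number of bytes consumed. Returns 0 if more data is
// needed, in which case msg is left untouched.
inline std::size_t try_parse(std::string const& buf, Message& msg)
{
  if(buf.size() < 2)
  {
    return 0;                             // Header not yet received
  }
  Message     tmp;
  std::size_t pos = 2;
  tmp.type = static_cast<unsigned char>(buf[0]);
  std::size_t const numFields = static_cast<unsigned char>(buf[1]);
  for(std::size_t i = 0; i != numFields; ++i)
  {
    if(pos == buf.size())
    {
      return 0;                           // Length byte not yet received
    }
    std::size_t const len = static_cast<unsigned char>(buf[pos++]);
    if(buf.size() - pos < len)
    {
      return 0;                           // Field data not yet received
    }
    tmp.fields.push_back(buf.substr(pos, len));
    pos += len;
  }
  assert(pos <= buf.size());
  msg = tmp;                              // Commit only a complete message
  return pos;
}
```

The caller removes the returned number of bytes from the head of its queue only when that number is nonzero; otherwise it waits for the next I/O event and tries again with more data.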

But Captain, I Canna' Mek Her Goo Any Fastah!

So, we've got some nice abstractions going—some of which have seen service in large-scale deployments, which is never to be sniffed at—but you may have been receiving portents from your skeptical subconsciousness. We're manipulating a number of blocks of contiguous memory, but the nature of the acestl::message_queue_sequence::iterator means that each byte in those blocks must be processed one at a time. This has to have an efficiency cost. And it does.

Before we proceed, I want to trot out the received wisdom on performance, namely, to avoid premature optimization. More precisely, although it's something of a simplification, you've usually got only one bottleneck in a system at a time. Inefficiencies usually make themselves felt only when a greater inefficiency has been resolved. In none of the applications in which I've used message_queue_sequence has it been associated with the bottleneck. However, I tend to be a little efficiency obsessed—What's that? You've noticed?—and since STLSoft is an open-source publicly available library, the message_queue_sequence component might find itself being a bottleneck in someone else's project, and that would never do. So, I want to show you how to have your cake and eat it too, that is, how to linearize data block contents in an STL sequence and yet have block-like efficiency.

acestl::message_queue_sequence, Version 2

First, we need to identify when a block transfer is valid. The ACE libraries define opaque memory in terms of char, that is, the pointers are either char const* or char*, presumably to make pointer arithmetic straightforward. I don't hold with this strategy, but that's irrelevant; it is what it is. When transferring contents between STL iterators of type char* or char const* and acestl::message_queue_sequence::iterator, we want the sequence's contents to be block transferred. In other words, the following code should result in 2 calls to memcpy(), rather than 120 calls to shared_handle::advance():


ACE_Message_Queue<ACE_NULL_SYNCH>   mq; // 2 message blocks, 120 bytes
char                                results[120];
acestl::message_queue_sequence<ACE_NULL_SYNCH>  mqs(mq);
   
std::copy(mqs.begin(), mqs.end(), &results[0]);

We want the same efficiency when transferring from contiguous memory into the message queue sequence, as in the following:


std::copy(&results[0], &results[0] + STLSOFT_NUM_ELEMENTS(results)
        , mqs.begin());

The first thing we need for this is to define block copy operations for message_queue_sequence. Listing 31.10 shows the definition of two new static methods for the sequence class, overloads named fast_copy().

Listing 31.10 Definition of the message_queue_sequence Algorithm Worker Methods

template <ACE_SYNCH_DECL>
class message_queue_sequence
{
  . . .
  static char* fast_copy(iterator from, iterator to, char* o)
  {
#if defined(ACESTL_MQS_NO_FAST_COPY_TO)
    for(; from != to; ++from, ++o)
    {
      *o = *from;
    }
#else /* ? ACESTL_MQS_NO_FAST_COPY_TO */
    from.fast_copy(to, o);
#endif /* ACESTL_MQS_NO_FAST_COPY_TO */
    return o;
  }
  static iterator fast_copy(char const* from, char const* to
                          , iterator o)
  {
#if defined(ACESTL_MQS_NO_FAST_COPY_FROM)
    for(;from != to; ++from, ++o)
    {
      *o = *from;
    }
#else /* ? ACESTL_MQS_NO_FAST_COPY_FROM */
    o.fast_copy(from, to);
#endif /* ACESTL_MQS_NO_FAST_COPY_FROM */
    return o;
  }
  . . .

I've deliberately left in the #defines that suppress the block operations, just to illustrate in code what the alternative, default, behavior is. These #defines also facilitate tests with and without block copying enabled. (Anyone sniff a performance test in the near future?) The block mode code uses new iterator::fast_copy() methods, shown in Listing 31.11.

Listing 31.11 Definition of the iterator Algorithm Worker Methods

class message_queue_sequence<. . .>::iterator
{
  . . .
  void fast_copy(char const* from, char const* to)
  {
    if(from != to)
    {
      ACESTL_ASSERT(NULL != m_handle);
      m_handle->fast_copy(from, to, static_cast<size_type>(to - from));
    }
  }
  void fast_copy(class_type const& to, char* o)
  {
    if(*this != to)
    {
      ACESTL_ASSERT(NULL != m_handle);
      m_handle->fast_copy(to.m_handle, o);
    }
  }

Tantalizingly, these do very little beyond invoking the same-named new methods of the shared_handle class, shown in Listing 31.12. For both in and out transfers, these methods calculate the appropriate portion of each block to be read/written and effect the transfer with memcpy().

Listing 31.12 Definition of the shared_handle Algorithm Worker Methods

struct message_queue_sequence<. . .>::iterator::shared_handle
{
  . . .
  void fast_copy(char const* from, char const* to, size_type n)
  {
    ACESTL_ASSERT(0 != n);
    ACESTL_ASSERT(from != to);
    if(0 != n)
    {
      size_type n1 = m_entryLength - m_entryIndex;
      if(n <= n1)
      {
        ::memcpy(&m_entryIndex[m_entry->rd_ptr()], from, n);
      }
      else
      {
        ::memcpy(&m_entryIndex[m_entry->rd_ptr()], from, n1);
        from += n1;
        m_entry = nextEntry();
        ACESTL_ASSERT(NULL != m_entry);
        fast_copy(from, to, n - n1);
      }
    }
  }
  void fast_copy(class_type const* to, char* o)
  {
    size_type n1 = m_entryLength - m_entryIndex;
    if( NULL != to &&
      m_entry == to ->m_entry)
    {
      ::memcpy(o, &m_entryIndex[m_entry->rd_ptr()], n1);
    }
    else
    {
      ::memcpy(o, &m_entryIndex[m_entry->rd_ptr()], n1);
      o += n1;
      m_entry = nextEntry();
      if(NULL != m_entry)
      {
        fast_copy(to, o);
      }
    }
  }
  . . .
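
To make the per-block arithmetic concrete, here is a minimal, self-contained sketch of the gather direction. It is not the acestl code: the block struct and the gather_copy() function are hypothetical stand-ins, assuming each block is nothing more than a pointer and a length, and no ACE types are involved. The point is simply that the number of memcpy() calls tracks the number of blocks touched, not the number of bytes.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Hypothetical stand-in for one message block: a pointer and a length.
struct block
{
  char const* data;
  std::size_t length;
};

// Copies the logical byte range [first, last) of the concatenated blocks
// into dest, issuing at most one memcpy() per block touched.
// Returns one past the last byte written.
inline char* gather_copy(std::vector<block> const& blocks
                       , std::size_t first, std::size_t last, char* dest)
{
  assert(first <= last);
  std::size_t offset = 0; // logical offset of the current block's first byte
  for(std::size_t i = 0; i != blocks.size() && offset < last; ++i)
  {
    std::size_t const end = offset + blocks[i].length;
    if(end > first)
    {
      // Portion of this block that falls inside [first, last)
      std::size_t const from = (first > offset) ? first - offset : 0;
      std::size_t const to   = (last < end) ? last - offset : blocks[i].length;
      if(to != from)
      {
        std::memcpy(dest, blocks[i].data + from, to - from);
        dest += to - from;
      }
    }
    offset = end;
  }
  return dest;
}
```

With two blocks holding "Hello, " and "world", gathering the whole range costs two memcpy() calls, and gathering a range that straddles the boundary copies the tail of the first block and the head of the second.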

31.5.3 Specializing the Standard Library

So far so good, but no one wants to write client code such as the following:

ACE_Message_Queue<ACE_NULL_SYNCH>   mq; // 2 blocks; total 120 bytes
char                                results[120];
acestl::message_queue_sequence<ACE_NULL_SYNCH>  mqs(mq);
   
acestl::message_queue_sequence<ACE_NULL_SYNCH>::fast_copy(mqs.begin()
                                           , mqs.end(), &results[0]);

We want an invocation of std::copy() to pick up our fast version automatically when the other iterator type is char (const)*. For this we need to specialize std::copy().

For a number of reasons, adding function templates such as these to the std namespace is prohibited: C++ has no partial specialization of function templates, so what looks like a partial specialization of std::copy() is really a new overload, and client code may not add overloads to std. This proves inconvenient in two ways. First, and most importantly, because message_queue_sequence is a template, we want to cater to all its specializations and so would want to do something like that shown in Listing 31.13. (For brevity I'm omitting the namespace qualification acestl from each message_queue_sequence<S>::iterator shown in this listing and the next.)

Listing 31.13 Illegal Specializations of std::copy()

// In namespace std
template <typename S>
char* copy( typename message_queue_sequence<S>::iterator from
          , typename message_queue_sequence<S>::iterator to, char* o)
{
  return message_queue_sequence<S>::fast_copy(from, to, o);
}
template <typename S>
typename message_queue_sequence<S>::iterator copy(char* from, char* to
                    , typename message_queue_sequence<S>::iterator  o)
{
  return message_queue_sequence<S>::fast_copy(from, to, o);
}

Since we may not do this, we are forced to anticipate the specializations of message_queue_sequence and (fully) specialize std::copy() accordingly, as in Listing 31.14. Note that separate char* and char const* specializations are required for the char-pointer-to-iterator block transfer, to ensure that copying from char* and char const* uses the optimization.

Listing 31.14 Legal Specializations of std::copy()

// In namespace std
template <> 
char*
 copy( typename message_queue_sequence<ACE_NULL_SYNCH>::iterator from
     , typename message_queue_sequence<ACE_NULL_SYNCH>::iterator to
     , char*                                                     o)
{
  return message_queue_sequence<ACE_NULL_SYNCH>::fast_copy(from, to, o);
}
 . . . // Same as above, but for ACE_MT_SYNCH
   
template <> 
typename message_queue_sequence<ACE_NULL_SYNCH>::iterator
 copy(char*                                                     from
    , char*                                                     to
    , typename message_queue_sequence<ACE_NULL_SYNCH>::iterator o)
{
  return message_queue_sequence<ACE_NULL_SYNCH>::fast_copy(from, to, o);
}
. . . // Same as above, but for ACE_MT_SYNCH
   
template <> 
typename message_queue_sequence<ACE_NULL_SYNCH>::iterator
 copy(char const*                                               from
    , char const*                                               to
    , typename message_queue_sequence<ACE_NULL_SYNCH>::iterator o)
{
  return message_queue_sequence<ACE_NULL_SYNCH>::fast_copy(from, to, o);
}
. . . // Same as above, but for ACE_MT_SYNCH

Fortunately, ACE offers only two specializations, in the form of ACE_NULL_SYNCH (a #define for ACE_Null_Mutex, ACE_Null_Condition) and ACE_MT_SYNCH (a #define for ACE_Thread_Mutex, ACE_Condition_Thread_Mutex), yielding only six specializations.
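
The mechanism can be tried out in isolation. The following is a minimal sketch, under the assumption that a private namespace (here called demo) stands in for std, so that nothing is actually added to the standard namespace. The block_iterator type, the demo::copy() template, and the block_copies counter are all hypothetical names invented for the illustration; none of them comes from acestl or ACE.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>

namespace demo // hypothetical namespace, standing in for std
{
  // Hypothetical iterator over bytes held in one contiguous block.
  struct block_iterator
  {
    char const* p;

    char operator *() const { return *p; }
    block_iterator& operator ++() { ++p; return *this; }
    bool operator !=(block_iterator const& rhs) const { return p != rhs.p; }
  };

  int block_copies = 0; // number of times the specialization has run

  // Primary template: element-by-element copy.
  template <typename I, typename O>
  O copy(I from, I to, O o)
  {
    for(; from != to; ++from, ++o)
    {
      *o = *from;
    }
    return o;
  }

  // Full specialization for block_iterator -> char*: a single memcpy().
  template <>
  inline char* copy<block_iterator, char*>(block_iterator from
                                         , block_iterator to, char* o)
  {
    assert(from.p <= to.p);
    std::size_t const n = static_cast<std::size_t>(to.p - from.p);
    std::memcpy(o, from.p, n);
    ++block_copies;
    return o + n;
  }
} // namespace demo
```

A call such as demo::copy(b, e, dst), where dst is an array of char, deduces the output type as char* and so selects the specialization; the client code does not have to name it.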

But there's more. If, like me, you avoid like the plague the use of char as a substitute for C++'s missing byte type, you probably instead use signed char or unsigned char, both of which are distinct types from char when it comes to overload resolution (and template resolution). Passing these to an invocation of std::copy() will not succeed in invoking the optimized transfer methods. So, with heads bowed low, we need to provide another six specializations for signed char and six for unsigned char, yielding a total of eighteen specializations, for what we'd like to have been two, or at most three, were we able to partially specialize in the std namespace.
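
The distinctness of the three character types is easy to check. In this small sketch, copy_path() is a hypothetical stand-in for a specialized algorithm that merely reports which path a given pointer type would take, and uses_block_path() is an equally hypothetical helper for asking the question; the static_assert lines assume a C++11 compiler.

```cpp
#include <cassert>
#include <cstring>
#include <type_traits>

// The three character types are distinct, whatever the signedness of char.
static_assert(!std::is_same<char, signed char>::value, "char is not signed char");
static_assert(!std::is_same<char, unsigned char>::value, "char is not unsigned char");

// Hypothetical stand-in for a specialized algorithm: reports the path taken.
template <typename T>
inline char const* copy_path(T*) { return "element-wise"; }

// Full specialization for char only, as in the char* case of Listing 31.14.
template <>
inline char const* copy_path<char>(char*) { return "block"; }

// True when a pointer to T would reach the block path.
template <typename T>
inline bool uses_block_path()
{
  T* p = 0;
  char const* path = copy_path(p);
  assert(0 != path);
  return 0 == std::strcmp(path, "block");
}
```

Only uses_block_path<char>() reports the block path. The signed char, unsigned char, and char const cases all fall back to the primary template, which is why each needs its own specialization.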

Thankfully, all this effort is worth the payoff. Before we look at that, I just want to answer one question you might be pondering: Why only std::copy()? In principle there is no reason not to specialize all possible standard algorithms. The reason I've not done so is twofold. First, the sheer effort in doing so would be onerous, to say the least; to avoid a lot of manual plugging around we'd be pragmatically bound to use macros, and who likes macros? The second reason is more, well, reasoned. The whole reason for this optimization is to facilitate high-speed interpretation of data in its original memory block and high-speed exchange of data into new storage. In my experience, both of these involve std::copy(). I should admit one exception to this in our middleware project that required copy_n(). The copy_n() algorithm was overlooked for incorporation into the C++98 standard (but will be included in the next version) and so appears in STLSoft. There are specializations of it, this time in the stlsoft namespace, in the same fashion as for std::copy(). Hence, there are a total of 36 function specializations in the <acestl/collections/message_queue_sequence.hpp> header file.

Performance

Now that we've examined the optimization mechanism, we'd better make sure it's worth the not inconsiderable effort. In order to demonstrate the differences in performance between the optimized block copying version and the original version, I used a test program that creates an ACE_Message_Queue instance to which it adds a number of blocks, copies the contents from a char array using std::copy(), copies them back to another char array (again with std::copy()), and verifies that the contents of the two arrays are identical. The number of blocks ranged from 1 to 10. The block size ranged from 10 to 10,000. The times for copying from the source char array to the sequence, and from the sequence to the destination char array, were taken separately, using the PlatformSTL component performance_counter. Each copying operation was repeated 20,000 times, in order to obtain measurement resolution in milliseconds. The code is shown in the extra material for this chapter on the CD.
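
For readers without the CD, the shape of such a measurement can be sketched in a few lines. This is not the test program described above: it assumes a C++11 compiler, uses std::chrono::steady_clock in place of performance_counter, and works on plain char arrays rather than an ACE_Message_Queue. The names copy_bytewise(), copy_block(), and time_ms() are hypothetical. Bear in mind that an optimizing compiler may turn the element-wise loop into a block copy of its own accord, so any numbers obtained this way say little about the iterator case.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstring>

// Element-by-element copy, standing in for the nonoptimized path.
inline void copy_bytewise(char const* from, std::size_t n, char* to)
{
  for(std::size_t i = 0; i != n; ++i)
  {
    to[i] = from[i];
  }
}

// Block copy, standing in for the optimized path.
inline void copy_block(char const* from, std::size_t n, char* to)
{
  std::memcpy(to, from, n);
}

// Runs f() the given number of times and returns the elapsed milliseconds.
template <typename F>
long long time_ms(F f, int repeats)
{
  assert(repeats > 0);
  typedef std::chrono::steady_clock clock_type;
  clock_type::time_point const start = clock_type::now();
  for(int i = 0; i != repeats; ++i)
  {
    f();
  }
  clock_type::time_point const stop = clock_type::now();
  return std::chrono::duration_cast<std::chrono::milliseconds>(stop - start).count();
}
```

Each path would be timed separately by passing a lambda that performs one copy, and the two destination arrays compared afterward to verify that both paths produced identical contents.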

Table 31.1 shows a representative sample of the results: the times (in milliseconds) for the nonoptimized and block copy versions, with the block copy time also expressed as a percentage of the nonoptimized time. As we might expect, with the very small block size of 10, the difference is small. For a block size of 100, there's an advantage with the optimized form, but it's not stunning. However, when we get to the more realistic block sizes of 1,000 and 10,000, there's no competition: the optimized form is somewhere between about 17 and 70 times faster.

Table 31.1 Relative Performance of Block Copy Operations

Number of  Block    Array to Iterator (ms)         Iterator to Array (ms)
Blocks     Size     Nonoptimized  Block  %         Nonoptimized  Block  %
1          10       7             6      85.7%     7             9      128.6%
2          10       9             8      88.9%     9             7      77.8%
5          10       16            10     62.5%     16            10     62.5%
10         10       27            14     51.9%     26            14     53.8%
1          100      25            7      28.0%     23            7      30.4%
2          100      46            9      19.6%     42            9      21.4%
5          100      108           14     13.0%     99            14     14.1%
10         100      211           23     10.9%     188           23     12.2%
1          1,000    207           10     4.8%      184           11     6.0%
2          1,000    416           16     3.8%      391           17     4.3%
5          1,000    1,025         32     3.1%      898           32     3.6%
10         1,000    2,042         60     2.9%      1,793         61     3.4%
1          10,000   2,038         29     1.4%      1,786         29     1.6%
2          10,000   4,100         101    2.5%      3,570         101    2.8%
5          10,000   4,143         109    2.6%      3,606         101    2.8%
10         10,000   4,103         102    2.5%      3,573         102    2.9%

Summary

This chapter has looked at the features of Scatter/Gather I/O, whose APIs present considerable challenges to STL adaptation. We've examined an adaptation, in the form of the scatter_slice_sequence component, and have seen that such sequences must have genuinely random access iterators (i.e., not contiguous iterators), for which the identity &*it + 2 == &*(it + 2) does not hold (see Section 2.3.6). Notwithstanding, we've seen how we can take advantage of their partial contiguity in order to effect significant performance improvements, something that is particularly important given their use in file and/or socket I/O. With minimal sacrifice of the Principle of Transparency, we've made big gains in the Principle of Composition (and also served the Principle of Diversity along the way).
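
The contiguity identity can be seen failing in a deliberately tiny model. The two_blocks type below is hypothetical and has nothing to do with scatter_slice_sequence itself: it assumes eight logical elements held in two four-element runs of a single array with a gap between them, so that all the pointer arithmetic stays inside one object.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical two-block store: logical elements 0-3 live at storage[0..4),
// logical elements 4-7 live at storage[8..12), with a gap in between.
struct two_blocks
{
  char storage[12];

  // Address of logical element i, that is, &*(begin + i).
  char* address_of(std::size_t i)
  {
    assert(i < 8);
    return (i < 4) ? &storage[i] : &storage[8 + (i - 4)];
  }
};

// Evaluates the contiguity identity &*it + 2 == &*(it + 2) at logical index i.
inline bool identity_holds(two_blocks& s, std::size_t i)
{
  assert(i + 2 < 8);
  return s.address_of(i) + 2 == s.address_of(i + 2);
}
```

The identity holds for indexes 0 and 1, and again for 4 and 5, where both elements sit in the same run. It fails for 2 and 3, where the step of two crosses the gap. That is precisely the partial contiguity the block copy optimization exploits.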

References

STLSoft
http://stlsoft.org

Pantheios
http://pantheios.org

Extended STL
http://extendedstl.com

Imperfect C++
http://imperfectcplusplus.com

ACE
http://www.cse.wustl.edu/~schmidt/ACE.html

About the Author

Matthew Wilson is a software development consultant for Synesis Software, and creator of the Pantheios and STLSoft libraries. He is author of Extended STL, volume 1 (Addison-Wesley, 2007) and Imperfect C++ (Addison-Wesley, 2004), and is currently working on his next one, Breaking Up The Monolith (Addison-Wesley, 2008), whenever he can squeeze in time between a demanding client and his busy sons. Matthew can be contacted via http://extendedstl.com.

Copyright © 1996-2014 Artima, Inc. All Rights Reserved.