Linux Clustering with Ruby Queue: Small is Beautiful
by Ara Howard
October 10, 2005

Summary
The Ruby Queue software package lowers the barriers scientists must overcome to realize the power of Linux clusters. It provides an extremely simple, economical, and easy-to-understand tool that harnesses the power of many CPUs while simultaneously allowing researchers to shift their focus away from the mundane details of complicated distributed computing systems and back to the task of actually doing science. The tool set is designed with a K.I.S.S., research-focused philosophy that enables any ordinary (non-root) user to set up a zero-admin Linux cluster in 10 minutes or less. It is currently being used successfully in such diverse fields as biochemical research at the University of Toronto, geo-mechanical modeling at IGEOSS, and the study of the nighttime lights of the world at the National Geophysical Data Center.

My friend Dave Clements is always game for a brainstorming session—especially if I'm buying the coffee. We met at the usual place and I explained my problem to him over the first cup: We had a bunch of Linux nodes sitting idle and we had a stack of work lined up for them, but we had no way to distribute the work to them. And the deadline for completion loomed over us. Over the second cup, I related how I had evaluated several packages such as Open Mosix [0] and Sun's Grid Engine [1], but had ultimately decided against them. It all came down to this: I wanted something leaner than everything I'd seen, something fast and easy, not a giant software system that would take weeks of work to install and configure.

After the third cup of coffee, we had it: Why not simply create an NFS [2] mounted priority queue and let nodes pull jobs from it as fast as they could? No scheduler. No process migration. No central controller. No kernel mods. Just a collection of compute nodes working as quickly as possible to complete a list of tasks. But there was one big question: could an NFS-mounted queue be accessed concurrently in a safe way? Armed with my favorite development tools (a brilliant IDE named vim [3] and the Ruby programming language [4]), I aimed to find out.

History

I work for the National Geophysical Data Center's (NGDC) [5] Solar-Terrestrial Physics Division (STP) [6] in the Defense Meteorological Satellite Program (DMSP) group [7]. My boss, Chris Elvidge, and the other scientists of our group study the nighttime lights of earth from space. The data we receive helps researchers understand changes in human population and the movement of forest fires, among other things. The infrastructure required to do this sort of work is astounding. This image:

Figure 1. Average intensity of nighttime lights over part of North America.

showing the average intensity of nighttime lights over part of North America, required over 100 gigabytes of input data and 142 terabytes of intermediate files to produce. More than 50,000 separate processes spread across 18 compute nodes, and a week of wall-clock time, went into its production.

Linux clusters have become the new super computers. The economics of teraflop performance built on commodity hardware is impossible to ignore in the current climate of dwindling research funding. However, one critical aspect of cluster-building, namely orchestration, is frequently overlooked by the people doing the buying. The problem facing a developer with clustered systems is analogous to the one facing a home buyer who can only afford a lot and some bricks: He's got a lot of building to do.

Building a small brick house on a shoestring

Yukihiro Matsumoto, a.k.a. Matz, has said that "The purpose of Ruby is to maximize programming pleasure," and experience has taught me that enjoying the creative process leads to faster development and higher quality code. Ruby features powerful object-oriented abstraction techniques, extreme dynamism, ease of extensibility, and an armada of useful libraries. It is a veritable "Swiss Army machete," precisely the sort of tool one should bring into uncharted territory like this.

Laying the foundation

The first task was to work out the issues with concurrent access to NFS-shared storage, and the first bridge I had to cross was how to accomplish NFS-safe locking from within Ruby. Ruby has an fcntl(2) interface similar to Perl's and, just like Perl's, the interface requires you to pack a buffer with the struct arguments. This is perfectly safe but, unfortunately, non-portable. I've wondered about this oversight before and decided to address it by writing a little C extension, "posixlock", which extends Ruby's built-in File class with a method to apply fcntl(2), or POSIX-style, advisory locks to a File object. Here is the majority of the code from posixlock.c:

static int
posixlock (fd, operation)
     int fd;
     int operation;
{
  struct flock lock;
  switch (operation & ~LOCK_NB)
    {
    case LOCK_SH:
      lock.l_type = F_RDLCK;
      break;
    case LOCK_EX:
      lock.l_type = F_WRLCK;
      break;
    case LOCK_UN:
      lock.l_type = F_UNLCK;
      break;
    default:
      errno = EINVAL;
      return -1;
    }
  lock.l_whence = SEEK_SET;
  lock.l_start = lock.l_len = 0L;
  return fcntl (fd,
    (operation & LOCK_NB) ? F_SETLK :
    F_SETLKW, &lock);
}

static VALUE
rb_file_posixlock (obj, operation)
     VALUE obj;
     VALUE operation;
{
  OpenFile *fptr;
  int ret;
  rb_secure (2);
  GetOpenFile (obj, fptr);
  if (fptr->mode & FMODE_WRITABLE)
    {
      fflush (GetWriteFile (fptr));
    }
retry:
  TRAP_BEG;
  ret =
    posixlock (fileno (fptr->f),
         NUM2INT (operation));
  TRAP_END;
  if (ret < 0)
    {
      switch (errno)
  {
  case EAGAIN:
  case EACCES:
#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
  case EWOULDBLOCK:
#endif
    return Qfalse;
  case EINTR:
#if defined(ERESTART)
  case ERESTART:
#endif
    goto retry;
  }
      rb_sys_fail (fptr->path);
    }
  return INT2FIX (0);
}

void
Init_posixlock ()
{
  rb_define_method (rb_cFile, "posixlock",
        rb_file_posixlock, 1);
}

Granted, it's a bit ugly, but C code always is. One of the things that's really impressive about Ruby is that the code for the interpreter itself is very readable. The source includes array.c, hash.c, and object.c—files that even I can make some sense of. In fact, I was able to steal about ninety-eight percent of the above code from Ruby's File.flock implementation defined in file.c.

Along with posixlock.c I needed to write an extconf.rb (extension configure) file which Ruby auto-magically turns into a Makefile. Here is the complete extconf.rb file used for the posixlock extension:

require 'mkmf' and create_makefile 'posixlock'

Usage of the extension mirrors Ruby's own File.flock call, but is safe for NFS mounted files. The example below can be run simultaneously from several NFS clients:

require 'socket'
require 'posixlock'

host = Socket::gethostname
puts "test running on host #{ host }"

File::open('nfs/fcntl_locking.test','a+') do |file|
  file.sync = true
  loop do
    file.posixlock File::LOCK_EX
    file.puts "host : #{ host }"
    file.puts "locked : #{ Time::now }"
    file.posixlock File::LOCK_UN
    sleep 0.42
  end
end

A "tail -f" of the NFS mounted file "fcntl_locking.test" will show it being concurrently accessed in a safe fashion. Notice the lack of error checking—Ruby is an exception-based language, any method which does not succeed will raise an error and a detailed stack trace will be printed on standard error.

One of the things to note about this extension is that I was able to actually add a method to Ruby's built-in File class. Ruby's classes are open: you can extend any class at any time, even the built-in ones. Obviously, extending the built-in classes should be done with great care, but there is a time and a place for it, and Ruby does not prevent you from doing so where it makes sense. The point here is not that you have to extend Ruby but that you can. And it is not difficult.
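In pure Ruby the same open-class technique is even more direct; here is a purely illustrative example, unrelated to posixlock:

class String
  # add a method to the built-in String class; only this one method is added,
  # everything else about String is untouched
  def shout
    upcase + '!'
  end
end

puts 'feeding on queue'.shout    # => FEEDING ON QUEUE!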

Having resolved my locking dilemma, the next design choice I had to make was what format to store the queue in. Ruby has the ability to serialize any object to disk, and it also includes a transaction-based, file-backed object storage class, PStore, which I have used successfully as a 'mini database' for many CGI programs. I began by implementing a wrapper on this class that used the posixlock module to ensure NFS-safe transactions and that supported methods like "insert_job", "delete_job", and "find_job". Right away I started feeling like I was writing a little database.
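For readers who have not used it, working with PStore looks roughly like this. This is a minimal sketch: the file name and keys are made up, and the posixlock-based NFS-safe wrapper is not shown:

require 'pstore'

db = PStore::new('queue.pstore')

# each transaction is atomic: either every change in the block is committed or none is
db.transaction do
  db['jobs'] ||= []
  db['jobs'] << { 'jid' => 1, 'command' => 'my_job.sh', 'state' => 'pending' }
end

# a read-only transaction
db.transaction(true) do
  pending = db['jobs'].select{ |job| job['state'] == 'pending' }
  puts "#{ pending.size } pending job(s)"
end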

Not being one to reinvent the wheel (or at least not too often!) I decided to utilize the SQLite [8] embedded database and the excellent Ruby bindings for it written by Jamis Buck [9] as a storage back end. This really helped get the project moving as I was freed from writing a lot of database-like functionality.

Many database APIs have made the choice of returning either a hash or an array to represent a database tuple (row). With tuples represented as hashes you can write very readable code like:

ssn = tuple['ssn']

and yet are unable to write natural code like

sql =
  "insert into jobs values ( #{ tuple.join ',' } )"

or

primary_key, rest = tuple

While with an array representation you end up with undecipherable code like:

field = tuple[7]

Now, what was field "7" again?

When I first started using the SQLite binding for Ruby, all its tuples were returned as hashes, and I had a lot of slightly verbose code converting tuples from hash to array and back again. Anyone who's spent much time working with Ruby will tell you that its elegance inspires you to make your own code more elegant. All this converting was not only inelegant, but inefficient. What I wanted was a tuple class that was an array, but one that allowed keyword field access for readability and elegance.

For Ruby this was no problem. I wrote a pure Ruby module, ArrayFields, that allowed any array to do exactly this. In Ruby a module is not only a namespace but can be "mixed in" to other classes to impart functionality. The effect is similar to, but less confusing than, multiple inheritance. In fact, not only can Ruby classes be extended in this way, but instances of Ruby objects themselves can be dynamically extended with the functionality of a module, leaving other instances of that same class untouched. Here's an example using Ruby's Observable module, which implements the Publish/Subscribe design pattern:

require 'observer'
class Publisher; end   # a stand-in class for this example
publisher = Publisher::new
publisher.extend Observable

In this example only this instance of the Publisher class is extended with Observable's methods.
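This same per-instance extension trick is what makes keyword access on a plain array possible. Here is a stripped-down illustration of the mechanism; it is only a sketch of the idea, not the actual arrayfields implementation:

module FieldedArray
  # maps a field name to its position, falling back to normal integer indexing
  attr_accessor :fields
  def [](idx)
    idx = fields.index(idx) if idx.is_a?(String)
    super(idx)
  end
  def []=(idx, value)
    idx = fields.index(idx) if idx.is_a?(String)
    super(idx, value)
  end
end

tuple = [42, 'my_job.sh', 'pending']
tuple.extend FieldedArray
tuple.fields = %w( jid command state )

puts tuple['command']        # => my_job.sh   (keyword access)
puts tuple[0]                # => 42          (positional access still works)
tuple['state'] = 'finished'  # other arrays in memory are unaffected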

Jamis was more than happy to work with me to add ArrayFields support to his SQLite package. The way it works is simple: if the ArrayFields module is detected at runtime the tuples returned by a query will be dynamically extended to support named field access. No other Array objects in memory are touched—only those Arrays returned as tuples are extended with ArrayFields.

Finally I was able to write readable code like:

require 'arrayfields'
require 'sqlite'

...

query = 'select * from jobs order by submitted asc'
tuples = db.execute query

tuples.each do |job|
  jid, command = job['jid'], job['command']

  run command

  job['state'] = 'finished'

  # quoted list of the job's fields
  values = job.map{ |val| "'#{ val }'" }.join ','
  sql = "insert into done values( #{ values } )"
  db.execute sql
end

and elegant code like:

tuples.sort_by{ |tuple| tuple['submitted'] }.reverse

This is no mere convenience; using arrays over hashes is faster, requires about 30% less memory, and makes many operations on tuples more natural to code. Allowing keyword access to the arrays makes the code more readable and frees the developer from remembering field positions or, worse, having to update code if a change to the database schema should change the order of fields. Finally, a reduction in lines of code almost always aids both development and maintenance.

Putting up walls

Using posixlock and SQLite made coding a persistent NFS-safe priority queue class relatively straightforward. Of course there were performance issues to address and a lease-based locking system was added to detect the possible lockd starvation issues I'd heard rumors about on the SQLite mailing list. I posted many questions to the NFS mailing lists during this development stage, and developers such as Trond Myklebust were invaluable resources to me.

I'm not too smart, especially when it comes to guessing the state of programs I myself wrote. Wise programmers know that there is no substitute for good logging. Ruby ships with a built-in Logger class that offers features like automatic log rolling. Using this class as a foundation, I was able to abstract a small module that's used by all the classes in rq to give consistent, configurable, and pervasive logging to all its objects in only a few lines of code. Being able to leverage built-in libraries to abstract important building blocks like logging is a time- and mind-saver.
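The module in rq is particular to its own classes, but the general shape of such a logging mix-in is easy to sketch. The names below are illustrative, not rq's actual API:

require 'logger'

# an illustrative logging mix-in, not the module shipped with rq
module Logging
  attr_writer :logger

  def logger
    @logger ||= Logger::new(STDERR)   # default: log to stderr
  end

  def info msg
    logger.info msg
  end

  def error msg
    logger.error msg
  end
end

class Worker
  include Logging
end

worker = Worker::new
worker.info 'feeding on queue <~/nfs/queue>'

# or hand any object a rolling file logger: roll daily, weekly, or by size
worker.logger = Logger::new('/tmp/rq.log', 'daily')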

If you are still using XML as a data serialization format and yearn for something easier and more readable, I urge you to check out YAML [10]. Ruby Queue uses YAML extensively, both as an input and an output format. For instance, the rq command line tool shows jobs marked "important" as:

-
  jid: 1
  priority: 0
  state: pending
  submitted: 2004-11-12 15:06:49.514387
  started:
  finished:
  elapsed:
  submitter: redfish
  runner:
  pid:
  exit_status:
  tag: important
  restartable:
  command: my_job.sh
-
  jid: 2
  priority: 42
  state: finished
  submitted: 2004-11-12 17:37:10.312094
  started: 2004-11-12 17:37:13.132700
  finished: 2004-11-12 17:37:13.739824
  elapsed: 0.015724
  submitter: redfish
  runner: bluefish
  pid: 5477
  exit_status: 0
  tag: important
  restartable:
  command: my_high_priority_job.sh

This format is easy for humans to read and friendly to Linux commands like egrep(1). But best of all, the document above, when used as the input to a command, can be loaded into Ruby as an array of hashes with a single command:

require 'yaml'
jobs = YAML::load STDIN

It can then be used as a native Ruby object with no complex API required:

jobs.each do |job|
  priority = job['priority']
  ...
end
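Going the other direction is just as terse: any Ruby object can be dumped back out in the same human-readable format. The hash below is just an example:

require 'yaml'

job = {
  'jid'      => 3,
  'priority' => 9,
  'state'    => 'pending',
  'command'  => 'another_job.sh',
}

puts job.to_yaml    # emits YAML like the listings shown above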

Perhaps the best summary of YAML for Ruby [11] is by its author, "_why":

"Really, it's quite fantastic. Spreads right on your Rubyware like butter on bread!"

The roof

I actually had a prototype in production (which we do a lot in the DMSP group) when a subtle bug cropped up.

There is a feature of NFS known as "silly renaming." This happens when two clients have an NFS file open and one of them removes it, causing the NFS server to rename the file to something like ".nfs123456789" until the second client is done with it and the file can truly be removed.

The general mode of operation for rq, when feeding on a queue (running jobs from it), is to start a transaction on the SQLite database, find a job to run, fork a child process to run the job, update the database with information such as the pid of the job, and end the transaction. As it turns out, transactions in SQLite involve some temporary files which are removed at the end of the transaction. The problem was that I was forking in the middle of a transaction, causing the file handle of the temporary file to be open in both the child and the parent. When the parent then removed the temporary file at the end of the transaction, a "silly rename" occurred because the child's file handle was still valid. I started seeing dozens of these "silly" files cluttering my queue directories; they would eventually disappear, but they were ugly and unnerving to users.

I initially looked into closing the file handle somehow after forking but received some bad news from Dr. Richard Hipp, the creator of SQLite, on the mailing list. He said forking in the middle of a transaction results in "undefined" behavior and was not recommended.

This was bad news, as my design depended heavily on forking in a transaction in order to preserve the atomicity of starting a job and updating its state. What I needed to be able to do was "fork without forking." More specifically, I needed another process to fork, run the job, and wait for it on my behalf. Now, the idea of setting up a co-process and using IPC to achieve this made me break out in hives. Fortunately Ruby had a hiveless solution.

DRb, or Distributed Ruby, is a built-in library for working with remote objects. It's like Java RMI or SOAP, only about a million times easier to get going. What do remote objects have to do with forking in another process? I coded a tiny class that does the forking, job running, and waiting for me, and an instance of this class can then set up a local DRb server in a child process. Communication is done transparently via Unix domain sockets. In other words, the DRb server is the co-process that does all the forking and waiting for me. Interacting with this object is like interacting with any other Ruby object. The entire JobRunnerDaemon class is 101 lines of code, including setting up the child process. Following are some excerpts from the Feeder class which show the key points of its usage.

An instance of a JobRunnerDaemon is started in a child process, and a handle on that remote (but local-host) object is returned:

jrd = JobRunnerDaemon::daemon

A JobRunner object is then created for a job. The runner is created by pre-forking a child in the JobRunnerDaemon's process, and that child is used later to run the job. Note that the actual fork takes place in the daemon's process, so it does not affect the parent's transaction:

runner = jrd.runner job
pid = runner.pid
runner.run

Later the DRb handle on the JobRunnerDaemon can be used to wait on the child. This blocks just as a normal wait would, even though we are waiting on the child of a totally different process!

cid, status = jrd.waitpid2 -1, Process::WUNTRACED
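To make the idea concrete, here is a self-contained sketch of the same co-process technique. The class, socket path, and method names are illustrative inventions, not the actual JobRunnerDaemon code:

require 'drb'
require 'drb/unix'

# a minimal co-process sketch, not rq's JobRunnerDaemon
class ForkingRunner
  def run command
    fork{ exec command }                    # the fork happens in *this* process
  end
  def wait pid
    Process::waitpid2(pid).last.exitstatus  # reap our own child, return its exit status
  end
end

uri = "drbunix:///tmp/runner.#{ Process::pid }"  # an assumed socket path

server = fork do
  DRb::start_service(uri, ForkingRunner::new)    # child: serve the runner object
  DRb::thread.join
end

sleep 1                                          # crude wait for the server to come up
runner = DRbObject::new_with_uri(uri)            # parent: remote handle on the runner

pid = runner.run "echo 'hello from the co-process'"
puts "exit status: #{ runner.wait pid }"         # waiting on another process's child

Process::kill 'TERM', server
Process::wait server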

We go through "Run it. Break it. Fix it." cycles like this a lot in my group, the philosophy being that there is no test like production. The scientists I work most closely with, Kim Baugh and Jeff Safran, are more than happy to have programs explode in their faces if the end result is better, more reliable code. Programs written in a dynamic language like Ruby enable me to fix bugs fast, which keeps their enthusiasm for testing high. The combined effect is a rapid evolutionary development cycle.

Moving in

I'll walk through the actual sequence of rq commands used to set up an instant Linux cluster made up of four nodes. The nodes we'll use are onefish, twofish, redfish, and bluefish. Each host is identified in its prompt, below. In my home directory on each of the hosts I have the symbolic link "~/nfs" pointing at a common NFS directory.

The first thing we have to do is initialize the queue:

redfish:~/nfs > rq queue create
created <~/nfs/queue>

Next we start feeder daemons on all four hosts:

onefish:~/nfs > rq queue feed --daemon --log=~/rq.log
twofish:~/nfs > rq queue feed --daemon --log=~/rq.log
redfish:~/nfs > rq queue feed --daemon --log=~/rq.log
bluefish:~/nfs > rq queue feed --daemon --log=~/rq.log

In practice you would not want to start feeders by hand on each node, so rq supports being "kept alive" via a crontab entry. When rq runs in daemon mode it acquires a lockfile that effectively limits it to one feeding process per host, per queue. Starting a feeder daemon will simply fail if another one is already feeding on the same queue. Thus a crontab entry like:

*/15 * * * * rq queue feed --daemon --log=log

will check every fifteen minutes to see if a daemon is running and start one if, and only if, one is not already running. In this way an ordinary user can set up a process that will be running at all times, even after a machine reboot.
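The lockfile mechanism itself is nothing exotic; the posixlock extension from earlier is enough to sketch the idea. The path and messages below are illustrative, not rq's actual code:

require 'posixlock'

# per-host guard: this lock file lives on a local path, so it limits
# feeders per host rather than per cluster
lockfile = File::open("/tmp/rq.#{ ENV['USER'] }.lock", File::CREAT | File::WRONLY, 0644)

unless lockfile.posixlock(File::LOCK_EX | File::LOCK_NB)
  STDERR.puts 'another feeder is already running on this host'
  exit 1
end

# ... daemonize and feed on the queue; the lock evaporates when the process exits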

Jobs can be submitted from the command line, from an input file, or, in Linux tradition, from standard input as part of a process pipeline. When submitting via an input file or stdin, the format is either YAML (such as that produced as the output of other rq commands) or a simple list of jobs, one job per line; the format is auto-detected. Any host that sees the queue can run commands on it:

onefish:~/nfs > cat joblist
echo 'job 0' && sleep 0
echo 'job 1' && sleep 1
echo 'job 2' && sleep 2
echo 'job 3' && sleep 3

onefish:~/nfs > cat joblist | rq queue submit
-
  jid: 1
  priority: 0
  state: pending
  submitted: 2005-05-12 13:35:31.757662
  started:
  finished:
  elapsed:
  submitter: onefish
  runner:
  pid:
  exit_status:
  tag:
  restartable:
  command: echo 'job 0' && sleep 0
-
  jid: 2
  priority: 0
  state: pending
  submitted: 2005-05-12 13:35:31.757662
  started:
  finished:
  elapsed:
  submitter: onefish
  runner:
  pid:
  exit_status:
  tag:
  restartable:
  command: echo 'job 1' && sleep 1
-
  jid: 3
  priority: 0
  state: pending
  submitted: 2005-05-12 13:35:31.757662
  started:
  finished:
  elapsed:
  submitter: onefish
  runner:
  pid:
  exit_status:
  tag:
  restartable:
  command: echo 'job 2' && sleep 2
-
  jid: 4
  priority: 0
  state: pending
  submitted: 2005-05-12 13:35:31.757662
  started:
  finished:
  elapsed:
  submitter: onefish
  runner:
  pid:
  exit_status:
  tag:
  restartable:
  command: echo 'job 3' && sleep 3

We see, in the output of submitting to the queue, all of the information about each of the jobs in YAML format. At this point we check the status of the queue:

redfish:~/nfs > rq queue status
---
jobs:
  pending: 2
  holding: 0
  running: 2
  finished: 0
  dead: 0
  total: 4
temporal:
  pending:
    earliest: { jid: 3, metric: submitted, time: 2005-05-12 13:35:31.757662 }
    latest: { jid: 4, metric: submitted, time: 2005-05-12 13:35:31.757662 }
    shortest:
    longest:
  holding:
    earliest:
    latest:
    shortest:
    longest:
  running:
    earliest: { jid: 1, metric: started, time: 2005-05-12 13:35:37.155667 }
    latest: { jid: 2, metric: started, time: 2005-05-12 13:35:40.111865 }
    shortest:
    longest:
  finished:
    earliest:
    latest:
    shortest:
    longest:
  dead:
    earliest:
    latest:
    shortest:
    longest:
performance:
  avg_time_per_job: 0
  n_jobs_in_last_1_hrs: 0
  n_jobs_in_last_2_hrs: 0
  n_jobs_in_last_4_hrs: 0
  n_jobs_in_last_8_hrs: 0
  n_jobs_in_last_16_hrs: 0
  n_jobs_in_last_32_hrs: 0
exit_status:
  successes: 0
  failures: 0

As you can see, many statistics about the queue are tracked, but right now we see only that two of the jobs have been picked up by a node and are being run, while two others are yet to be started. When many jobs have been submitted to a queue and run by the nodes, the status command gives valuable information about the health of the cluster in an instant.

We can find out which nodes are running our jobs using:

onefish:~/nfs > rq queue list running | egrep 'jid|runner'
 jid: 1
 runner: redfish
 jid: 2
 runner: bluefish

The record for a finished job remains in the queue until it is deleted, since a user will generally want to collect this information. At this point we expect all jobs to be complete, so we check their exit status:

bluefish:~/nfs > rq queue list finished | egrep 'jid|command|exit_status'
 jid: 1
 exit_status: 0
 command: echo 'job 0' && sleep 0
 jid: 2
 exit_status: 0
 command: echo 'job 1' && sleep 1
 jid: 3
 exit_status: 0
 command: echo 'job 2' && sleep 2
 jid: 4
 exit_status: 0
 command: echo 'job 3' && sleep 3

All commands have finished successfully. We can now delete any successful job from the queue:

twofish:~/nfs > rq queue query exit_status=0 | rq queue delete

There are many other useful operations rq can perform. For a description, type "rq help."

Looking backward and forward

Making the choice to "roll your own" is always a tough one because it breaks Programmer's Rule Number 42, which clearly states:

Every problem has been solved. It is Open Source. And it is the first link on Google.

Having a tool like Ruby is critical when you decide to break this rule, and the fact that a project like Ruby Queue can be written in 3,292 lines of code is a testament to that. With few major enhancements planned, it is likely this small number will not grow much as the code base is refined and improved. The goals of rq remain simplicity and ease of use.

Ruby Queue [12] set out to lower the barrier scientists had to overcome in order to realize the power of Linux clusters. Providing a simple, easy-to-understand tool that harnesses the power of many CPUs allows them to shift their focus away from the mundane details of complicated distributed computing systems and back to the task of actually doing science. Sometimes small is beautiful.

Note

At the time this article was first written, rq was a new piece of software that was promising but had barely been tested. After nearly nine months of 24/7 use, it has proved to be a viable solution: our group has now run millions of jobs using the software, with zero bugs filed and zero admin time required.

Resources

[0] openMosix is a Linux kernel extension for single-system image clustering which turns a network of ordinary computers into a supercomputer.
http://openmosix.sourceforge.net/

[1] The Grid Engine project is an open source community effort to facilitate the adoption of distributed computing solutions. Sponsored by Sun Microsystems.
http://gridengine.sunsource.net/

[2] The Linux Network File System is the backbone of many laboratories and Linux clusters.
http://nfs.sourceforge.net/

[3] The tommy gun of text editors.
http://www.vim.org/

[4] The main Ruby language site.
http://www.ruby-lang.org/

[5] The National Geophysical Data Center.
http://www.ngdc.noaa.gov/

[6] The Solar-Terrestrial Physics group pays Ara to write Ruby and do other fun things.
http://www.ngdc.noaa.gov/stp/

[7] The Defense Meteorological Satellite Program is responsible for the slew of data and associated processing that necessitated the development of Ruby Queue.
http://dmsp.ngdc.noaa.gov/

[8] SQLite is a small C library that implements a self-contained, embeddable, zero-configuration SQL database engine.
http://www.sqlite.org/

[9] Ruby bindings for the SQLite library.
http://rubyforge.org/projects/sqlite-ruby/

[10] YAML (rhymes with "camel") is a straightforward machine parsable data serialization format designed for human readability and interaction with scripting languages such as Perl, Python and Ruby.
http://www.yaml.org/

[11] For Ruby developers, YAML is a natural fit for object serialization and general data storage. Really, it's quite fantastic. Spreads right on your Rubyware like butter on bread!
http://yaml4r.sourceforge.net/

[12] Get the latest release of Ruby Queue here today and rev up the CPU cycles you're throwing at your projects!
http://raa.ruby-lang.org/project/rq/

This article was first published in Linux Journal, in December 2004.
