This post originated from an RSS feed registered with Scala Buzz
by Lalit Pant.
Original Post: Clustered Scala Actors
Feed Title: All Things Runnable / Scala
Feed URL: http://feeds.feedburner.com/lalitpant/scala
Feed Description: Scala related posts on Lalit Pant's Blog
I have recently been looking at the Terracotta Integration Module (TIM) for Scala actors that was announced earlier this year by Jonas Boner.
As I started to play with the TIM, I decided to do the following:
Write a simple but hopefully interesting actor-based program
Try to cluster it using the Scala TIM
Identify the lessons learned on the way
The first thing to do was to come up with a sample problem. After some thought, I settled on writing a parallel Fold implementation. As a quick recap: a Fold operation combines the elements of a sequence and a seed using a binary operator:
scala> val list = List(1,2,3)
list: List[Int] = List(1, 2, 3)

scala> list.foldLeft(0)((x, y) => x + y)
res5: Int = 6
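The key observation behind a parallel fold is that, for an associative fold function, the sequence can be split into chunks, each chunk reduced independently, and the partial results folded together with the seed. Here's a quick sequential sketch of that scheme (the name `parallelFoldSketch` is mine, not from the post's code, and the associativity assumption is baked in):

```scala
// Split the data into one chunk per worker, reduce each chunk
// (a Worker's job), then fold the partial results with the seed
// (the Master's aggregation step).
def parallelFoldSketch[A](data: Seq[A], folder: (A, A) => A, seed: A, numWorkers: Int): A = {
  val chunkSize = math.max(1, math.ceil(data.size.toDouble / numWorkers).toInt)
  val partials  = data.grouped(chunkSize).toSeq.map(_.reduceLeft(folder))
  partials.foldLeft(seed)(folder)
}
```

For `List(1,2,3,4,5,6)` with two workers this reduces `[1,2,3]` to 6 and `[4,5,6]` to 15, then folds 0, 6, and 15 into 21.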
So I went ahead and coded a ParallelFolder that uses a Master actor and some Worker actors to do a left-Fold. Here's how the ParallelFolder is implemented:
The Master receives a fold request containing a data sequence and a fold function
The Master splits the provided data and farms the work out to the Workers
The Workers apply the fold function to the provided data, and send the results back to the Master
The Master aggregates the results from the Workers by applying the provided fold function, and then sends the final result back to the Requester
Enough talk. Let's look at some code.
Here's the definition of the messages that flow through the system:
case class ParallelFoldReq[A](data: Seq[A], folder: (A, A) => A, seed: A, requester: AnyRef)
case class ParallelFoldWorkReq[A](taskId: Int, data: Iterable[A], folder: (A, A) => A, fromMaster: Actor)
case class ParallelFoldWorkResult[A](taskId: Int, result: A, folder: (A, A) => A, fromWorker: Actor)
case class Tick(s: String) // cluster-support
case class Maim(s: String)
case object Quit
Here's the Master:
class Master extends Actor {
  val (worker1, worker2, numWorkers) = (new Worker, new Worker, 2)
  val taskStatus = new HashMap[Int, (Any, Int, AnyRef)]
  var nextTaskId = 0

  def go = {
    worker1.go; worker2.go
    this.start
  }

  def act = {
    init // cluster-support
    log.debug("Scheduling ticks for Master")
    ActorPing.scheduleAtFixedRate(this, Tick("master"), 0L, 5000L) // cluster-support
    loop {
      react {
        case ParallelFoldReq(data, folder, seed, requester) =>
          log.info("Master Received Request. Current Task Id: {}", nextTaskId)
          // [snip] rest of the Master's handlers
      }
    }
  }
}

And here are the corresponding handlers from the Worker's react loop:

        case ParallelFoldWorkReq(taskId, data, folder, master) =>
          log.info("Worker {} received request. Current Task Id: {}", id, taskId)
          Thread.sleep(1000 * 1)
          val result = data.reduceLeft(folder)
          master ! ParallelFoldWorkResult(taskId, result, folder, self)

        case Quit =>
          log.info("Worker asked to Quit: {}", id)
          throw new RuntimeException("Bye from: " + this)

        case Tick(x) =>
          log.debug("Worker {} got a Tick", id)
And finally, here's the Application object that creates and runs the Actors:
object ParallelFolder extends util.Logged {
  log.info("Parallel Folder Starting...")
  val master = new Master
  master.go
  log.info("Parallel Folder ready to go.")

  def main(args: Array[String]): Unit = {
    if (args.size == 0 || !args(0).trim.equals("-c")) {
      // not running in cluster. Initiate some work right here
      for (i <- 1 to 1) {
        val work = List(1,2,3,4,5,6)
        log.info("Sending sequence to master for Parallel Fold: {}", work)
        master ! ParallelFoldReq(work, (x: Int, y: Int) => x + y, 0, "host://protocol/requesterLocation")
      }
    }
  }

  def fold[A](data: Seq[A], folder: (A, A) => A, x: A): A = {
    master ! ParallelFoldReq(data, folder, x, self)
    val ret = self.receive({ case x => x })
    ret.asInstanceOf[A]
  }
}
$ ../src/run-plain.sh
12:05:38.127 [main] INFO p1.ParallelFolder$ - Parallel Folder Starting...
12:05:38.221 [main] INFO p1.ParallelFolder$ - Parallel Folder ready to go.
12:05:38.252 [main] INFO p1.ParallelFolder$ - Sending sequence to master for Parallel Fold: List(1, 2, 3, 4, 5, 6)
12:05:38.252 [Thread-4] INFO p1.Master - Master Received Request. Current Task Id: 0
12:05:38.252 [Thread-3] INFO p1.Worker - Worker 1 received request. Current Task Id: 0
12:05:38.252 [Thread-1] INFO p1.Worker - Worker 2 received request. Current Task Id: 0
12:05:39.252 [Thread-3] INFO p1.Master - Master received result: 6 - from worker p1.Worker@8b819f. Task Id: 0
12:05:39.252 [Thread-3] INFO p1.Master - Results received: 1. Current Taskk Id: 0
12:05:39.252 [Thread-3] INFO p1.Master - Waiting for more results from worker. Current Taskk Id: 0
12:05:39.252 [Thread-3] INFO p1.Master - Master received result: 15 - from worker p1.Worker@120a47e. Task Id: 0
12:05:39.252 [Thread-3] INFO p1.Master - Results received: 2. Current Taskk Id: 0
12:05:39.252 [Thread-3] INFO p1.Master - Result available for Requester: host://protocol/requesterLocation. Current Taskk Id: 0. The Result is: 21
As you can see, ParallelFolder gets the Master going when it starts up. Then, the main method of ParallelFolder sends in a List to the Master for Folding. The Master does its thing with the help of the Workers, and reports the result.
Everything looks hunky-dory, so let me ramp it up and try to run the same program within a Terracotta cluster. Here's the plan of action for this scenario:
Start the Terracotta Server
Start an instance of ParallelFolder in Console#1
Start another instance of ParallelFolder in Console#2. These two instances are clustered via Terracotta
Start a client that feeds in work into the cluster
See what happens
Before going any further, let me show you the Cluster Client:
object ClusterClient extends util.Logged {
  def main(args: Array[String]): Unit = {
    log.info("Attaching to WorkManager")
    val master = new Master
    master.init

    for (i <- 1 to 10) {
      val work = List(1,2,3,4,5,6)
      log.info("Sending sequence to master for Parallel Fold: {}", work)
      master ! ParallelFoldReq(work, (x: Int, y: Int) => x + y, 0, "host://protocol/requesterLocation")
    }

    // Kill a worker
    // log.info("Crippling the Master")
    // master ! Maim("cripple")
  }
}
And here's the Scala TIM actor configuration file:
$ cat clustered-scala-actors.conf
p1.Master: class
p1.Worker: custom
The tc-config.xml file is pretty standard, so I will not show it here.
Everything looks set, so let me go ahead and run the Terracotta Server:
$ start-tc-server.bat
[snip]
2008-07-01 13:36:49,141 INFO - Terracotta Server has started up as ACTIVE node on 0.0.0.0:9510 successfully, and is now ready for work.
Console#1

$ ../src/run.sh -c

[snip]
Parsing scala actors config file: clustered-scala-actors.conf
Configuring clustering for Scala Actor [p1.Master] with scope [class]
Configuring clustering for Scala Actor [p1.Worker] with scope [custom]
13:50:40.493 [main] INFO p1.ParallelFolder$ - Parallel Folder Starting...
13:50:41.258 [main] INFO p1.ParallelFolder$ - Parallel Folder ready to go.
Console#2
$ ../src/run.sh -c
[snip]
Parsing scala actors config file: clustered-scala-actors.conf
Configuring clustering for Scala Actor [p1.Master] with scope [class]
Configuring clustering for Scala Actor [p1.Worker] with scope [custom]
13:52:33.988 [main] INFO p1.ParallelFolder$ - Parallel Folder Starting...
13:52:34.800 [main] INFO p1.ParallelFolder$ - Parallel Folder ready to go.
And finally, the moment of truth! Let me run the Cluster Client:
[snip]
Parsing scala actors config file: clustered-scala-actors.conf
Configuring clustering for Scala Actor [p1.Master] with scope [class]
Configuring clustering for Scala Actor [p1.Worker] with scope [custom]
14:03:29.159 [main] INFO p1.ClusterClient$ - Sending sequence to master for Parallel Fold: List(1, 2, 3, 4, 5, 6)
14:03:29.706 [main] INFO p1.ClusterClient$ - Sending sequence to master for Parallel Fold: List(1, 2, 3, 4, 5, 6)
14:03:29.706 [main] INFO p1.ClusterClient$ - Sending sequence to master for Parallel Fold: List(1, 2, 3, 4, 5, 6)
Here's the output that shows up on Console#1 and Console#2.
Console#1:
14:03:35.687 [Thread-14] INFO p1.Master - Master received result: 6 - from worker p1.Worker@86554. Task Id: 0
14:03:35.718 [Thread-14] INFO p1.Master - Results received: 1. Current Taskk Id: 0
14:03:35.718 [Thread-14] INFO p1.Master - Waiting for more results from worker. Current Taskk Id: 0
14:03:37.811 [Thread-14] INFO p1.Master - Master received result: 15 - from worker p1.Worker@1e06fc2. Task Id: 0
14:03:37.811 [Thread-14] INFO p1.Master - Results received: 2. Current Taskk Id: 0
14:03:37.811 [Thread-14] INFO p1.Master - Result available for Requester: host://protocol/requesterLocation. Current Taskk Id: 0. The Result is: 21
14:03:38.920 [Thread-15] INFO p1.Master - Master received result: 6 - from worker p1.Worker@86554. Task Id: 1
14:03:38.920 [Thread-15] INFO p1.Master - Results received: 2. Current Taskk Id: 1
14:03:38.920 [Thread-15] INFO p1.Master - Result available for Requester: host://protocol/requesterLocation. Current Taskk Id: 1. The Result is: 21
14:03:39.045 [Thread-14] INFO p1.Master - Master received result: 6 - from worker p1.Worker@86554. Task Id: 2
14:03:39.045 [Thread-14] INFO p1.Master - Results received: 2. Current Taskk Id: 2
14:03:39.045 [Thread-14] INFO p1.Master - Result available for Requester: host://protocol/requesterLocation. Current Taskk Id: 2. The Result is: 21
Console#2:
14:03:32.361 [Thread-16] INFO p1.Master - Master Received Request. Current Task Id: 0
14:03:32.454 [Thread-12] INFO p1.Worker - Worker 1 received request. Current Task Id: 0
14:03:33.485 [Thread-13] INFO p1.Worker - Worker 2 received request. Current Task Id: 0
14:03:34.500 [Thread-16] INFO p1.Master - Master Received Request. Current Task Id: 1
14:03:34.610 [Thread-13] INFO p1.Worker - Worker 2 received request. Current Task Id: 1
14:03:35.734 [Thread-16] INFO p1.Master - Master Received Request. Current Task Id: 2
14:03:35.750 [Thread-12] INFO p1.Worker - Worker 1 received request. Current Task Id: 1
14:03:36.796 [Thread-13] INFO p1.Worker - Worker 2 received request. Current Task Id: 2
14:03:37.827 [Thread-12] INFO p1.Worker - Worker 1 received request. Current Task Id: 2
14:03:38.873 [Thread-16] INFO p1.Master - Master received result: 15 - from worker p1.Worker@a86dfb. Task Id: 1
14:03:38.873 [Thread-16] INFO p1.Master - Results received: 1. Current Taskk Id: 1
14:03:38.873 [Thread-16] INFO p1.Master - Waiting for more results from worker. Current Taskk Id: 1
14:03:39.045 [Thread-13] INFO p1.Master - Master received result: 15 - from worker p1.Worker@a86dfb. Task Id: 2
14:03:39.045 [Thread-13] INFO p1.Master - Results received: 1. Current Taskk Id: 2
14:03:39.045 [Thread-13] INFO p1.Master - Waiting for more results from worker. Current Taskk Id: 2
It seems to work!
Follow the log trace for task#1 in the console output above; it shows how the Master and the Workers handle a single task across the two nodes.
Let me take a moment to talk about exactly what's going on here. In the above scenario, Terracotta has clustered the Master and the two Workers. What exactly does that mean? For any actor, clustering via Terracotta means the following:
The actor's mailbox is clustered and lives within the Terracotta NAM (Network Attached Memory)
Multiple copies of the actor run within the cluster, one per JVM/node
All the different copies of the actor are waiting to pull messages out of the clustered mailbox
An entity within any node in the cluster can send a message to the actor
This message shows up in the clustered mailbox of the actor
Only one copy of the actor gets the message; at that point, the message disappears from the mailbox. This provides for load balancing and fault tolerance:
load balancing: if a copy of the actor is busy in one VM, a different copy in another VM picks up the message
fault tolerance: if a copy of the actor dies in one VM, the remaining copies are still around, and a different copy in another VM picks up the message
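The pull-from-a-shared-mailbox behavior described above can be illustrated without Terracotta. In this sketch (my own code, not part of the Scala TIM), a plain `LinkedBlockingQueue` stands in for the clustered mailbox and a handful of threads stand in for the actor copies on different JVMs; `take()` guarantees each message is delivered to exactly one copy:

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
import scala.jdk.CollectionConverters._

object ClusteredMailboxSketch {
  // Deliver `messages` to `copies` competing consumers sharing one mailbox.
  // Returns a message -> consumer-id map, showing each message reached exactly one copy.
  def run(messages: Seq[Int], copies: Int): Map[Int, Int] = {
    val mailbox = new LinkedBlockingQueue[Int]()   // stands in for the clustered mailbox
    val handled = new ConcurrentLinkedQueue[(Int, Int)]()
    val consumers = (1 to copies).map { id =>
      val t = new Thread(() => {
        // take() removes the message for everyone, so no other copy sees it
        try while (true) handled.add((mailbox.take(), id))
        catch { case _: InterruptedException => () }
      })
      t.setDaemon(true); t.start(); t
    }
    messages.foreach(mailbox.put)                  // any "node" can post to the mailbox
    while (handled.size < messages.size) Thread.sleep(5)
    consumers.foreach(_.interrupt())
    handled.asScala.toMap
  }
}
```

If one consumer thread is busy (or gone), the others keep draining the queue, which is the load-balancing and fault-tolerance story above in miniature.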
So, can we conclude that the Scala TIM auto-magically clusters an actor-based program and gives us all the great-sounding features described above? Well, not quite. The following concessions have to be made to make standalone code cluster-friendly (in the listings above, such sections of code are marked with '// cluster-support'):
The waitingFor field within the Actor trait has to be marked transient for Terracotta. As a result, this field has to be initialized outside the constructor for copies that attach to Terracotta
Every clustered actor needs to schedule a heartbeat to pump messages from the clustered mailbox (but this is not as messed-up as it might first sound)
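The heartbeat in the listings above is scheduled with ActorPing.scheduleAtFixedRate. A utility like that can be built on a plain JDK scheduler; here's a rough sketch (my own naming, not the TIM's code), where the `tick` thunk stands in for sending `Tick(...)` to the actor:

```scala
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}

// Minimal fixed-rate heartbeat in the spirit of ActorPing.scheduleAtFixedRate.
// The daemon scheduler thread won't keep the JVM alive on its own.
object Heartbeat {
  private val scheduler = Executors.newSingleThreadScheduledExecutor { r =>
    val t = new Thread(r); t.setDaemon(true); t
  }

  // `tick` is evaluated on every beat; the returned handle can cancel the heartbeat.
  def scheduleAtFixedRate(initialDelayMs: Long, periodMs: Long)(tick: => Unit): ScheduledFuture[_] =
    scheduler.scheduleAtFixedRate(() => tick, initialDelayMs, periodMs, TimeUnit.MILLISECONDS)
}
```

A clustered actor would schedule something like `Heartbeat.scheduleAtFixedRate(0L, 5000L) { this ! Tick("master") }` so that a copy periodically wakes up and pulls pending messages from the clustered mailbox.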
That's not too bad, is it?
I should mention that the current Scala TIM implementation uses named locks to protect the internal state of actors. This is a potential bottleneck, because it introduces lock contention on mailbox access even between unrelated actors. But, based on my currently rather limited knowledge of Terracotta, I think this can be improved.
In a future post, I'll talk more about the features and limitations discussed above. Till then, have fun.