Scala Akka Tutorial: Merge Sort with Actors

This tutorial shows how to set up a Scala project with sbt and use actors with Akka. As an example, I am going to implement merge sort using actors. This is not intended to be production code: the focus is on exploring Scala and actors rather than on providing an efficient implementation of merge sort.

When developing in Scala, I build my project with sbt (the Scala build tool) and write code in Scala IDE for Eclipse, and this is what I am going to use for this tutorial. So, as a first step, you need to download and install both of them.

Instead of creating a project from within the IDE, I create the few files needed manually. I first create the following folders

ActorMergeSort\
ActorMergeSort\project
ActorMergeSort\src\main\scala\actormergesort

Here src\main\scala is the standard folder where Scala code lives, and actormergesort is the name of our package, as in Java. From now on, I will omit the top folder ActorMergeSort and give the paths relative to this top folder.

Next, I add build.sbt in the top folder:

name := "Actor MergeSort"

version := "1.0"

scalaVersion := "2.11.4"

libraryDependencies ++= Seq(
        "com.typesafe.akka" %% "akka-actor" % "2.3.6"
    )

This file defines the name of the project, its version, the version of Scala, and a list of library dependencies. In this case I only declare the Akka library.
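As a side note on the dependency line, the %% operator asks sbt to append the Scala binary version to the artifact name. The fragment below is only a sketch of that equivalence for the versions used in this tutorial, not something to add to the build:

```scala
// build.sbt fragment (illustration only): with scalaVersion 2.11.x,
// the %% form used above ...
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.3.6"

// ... resolves the same artifact as spelling out the suffix by hand with %.
// Only one of the two forms should be declared in a real build.
// libraryDependencies += "com.typesafe.akka" % "akka-actor_2.11" % "2.3.6"
```

Using %% means the dependency line does not need to change when scalaVersion moves to another binary version for which the library is published.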

I will use the sbt eclipse plugin to generate the Eclipse project, and this needs to be declared in project\plugins.sbt which contains this single line:

addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.5.0")

Finally, I will add three empty scala files

src\main\scala\actormergesort\Sorter.scala
src\main\scala\actormergesort\Merger.scala
src\main\scala\actormergesort\Runner.scala

This is where my Scala code will live.

Once I’ve done all the above, I navigate to the top folder in the command line and run sbt eclipse. This creates all the necessary files for the Eclipse project. If I put the code under source control, I do not include the Eclipse files, as these can be regenerated from build.sbt. If new libraries are added, I need to add them to build.sbt and regenerate the Eclipse files. Note that if I regenerate the Eclipse files while the project is open in Eclipse, I need to right-click the project within Eclipse and select Refresh. The .hgignore or .gitignore file that ignores the Eclipse files is as follows:

target/

.cache
.classpath
.project

Note that target/ has no leading / so that it matches any folder (the trailing / means folder) at any depth that is named target.

Once you have run sbt eclipse, start Eclipse and choose File > Import > General > Existing Projects into Workspace, click Next, select the root of the project, and click Finish.

You can build the project from the command line by running sbt compile.

I will start by implementing the easier Merger actor first, since it has no dependencies on the other actors. The idea is simple: I need an actor that will receive a message with two sorted lists and will reply with a merged sorted list. So first I define the two messages in src\main\scala\actormergesort\Merger.scala

package actormergesort

object Merger {
  case class Merge(list1: List[Int], list2: List[Int])
  case class MergeResult(merged: List[Int])
}

I have defined the so-called companion object for the Merger class that will follow, using the object keyword. Think of companion objects as a placeholder for static stuff. Within the companion object I have defined the two messages as case classes: an incoming message carrying two integer lists and an outgoing message carrying the single merged list. Case classes are immutable classes that give us for free a constructor method so that I don’t need the new keyword, accessor methods for all constructor arguments, and pattern matching that I’ll cover soon.
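To make the case class features listed above concrete, here is a minimal sketch in plain Scala. The CaseClassDemo object and the describe method are names I made up for illustration; only Merge and MergeResult mirror the tutorial's messages.

```scala
object CaseClassDemo {
  // Same shape as the messages defined in the Merger companion object.
  case class Merge(list1: List[Int], list2: List[Int])
  case class MergeResult(merged: List[Int])

  // Constructed without `new`; fields are read through generated accessors.
  val msg = Merge(List(1, 3), List(2, 4))
  val firstList: List[Int] = msg.list1

  // Case classes compare by value, not by reference.
  val sameContent: Boolean = msg == Merge(List(1, 3), List(2, 4))

  // Pattern matching pulls the constructor arguments back out.
  def describe(message: Any): String = message match {
    case Merge(a, b)       => s"merge ${a.length} and ${b.length} elements"
    case MergeResult(list) => s"result with ${list.length} elements"
    case _                 => "unknown message"
  }
}
```

Value equality is also what makes case classes convenient as messages: two messages with the same content are interchangeable.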

Before making the Merger class an actor, let’s first create a class that can merge two lists. I am still in src\main\scala\actormergesort\Merger.scala

class Merger() {
  import scala.annotation.tailrec

  def merge(list1: List[Int], list2: List[Int]) = mergeRec(list1, list2, Nil)

  @tailrec
  def mergeRec(list1: List[Int], list2: List[Int], revAcc: List[Int]): List[Int] = {
    (list1, list2) match {
      case (hd1::tl1, hd2::tl2) =>
        if (hd1 <= hd2) mergeRec(tl1, list2, hd1::revAcc) else mergeRec(list1, tl2, hd2::revAcc)
      case (hd1::tl1, Nil) => mergeRec(tl1, list2, hd1::revAcc)
      case (Nil, hd2::tl2) => mergeRec(list1, tl2, hd2::revAcc)
      case _ => revAcc.reverse
    }
  }
}

Let’s focus on the second method first. This is a standard way of writing recursive code in functional programming, where the recursive method carries an accumulator as a parameter. I have annotated the method with @tailrec so that the compiler reports an error if the method is not tail-recursive.

I have used pattern matching on the tuple (list1, list2). When I pattern match a list to hd::tl, this means that it has at least one element hd, followed by the rest of the list tl, which could be the empty list Nil. In the example above the first case is where both lists are non-empty, in which case I take the smaller of the two heads and prepend it to the accumulator, and then I call the same function recursively with that head removed from its list and the new accumulator. If one of the two lists is empty (the 2nd and 3rd case), the decision of which head to take next is easy. Finally, if both lists are empty, it is time to use the accumulator and return the result. Since I am prepending every new element to the accumulator (because prepending to a list is much cheaper than appending), I reverse the list before returning the result. This is why our accumulator is called revAcc and not just acc.

Once I have a recursive method with the accumulating parameter, then it is just a matter of writing another method that will just start the recursive process with an empty accumulator.
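To see the prepend-then-reverse idea at work, here is a small sketch that runs the same algorithm but also records every intermediate accumulator. MergeTrace, its merge wrapper and the steps parameter are hypothetical names added for illustration; they are not part of the Merger class.

```scala
import scala.annotation.tailrec

object MergeTrace {
  // Returns the merged list together with every intermediate accumulator.
  def merge(list1: List[Int], list2: List[Int]): (List[Int], List[List[Int]]) =
    loop(list1, list2, Nil, Nil)

  @tailrec
  private def loop(
      l1: List[Int],
      l2: List[Int],
      revAcc: List[Int],
      steps: List[List[Int]]): (List[Int], List[List[Int]]) =
    (l1, l2) match {
      case (h1 :: t1, h2 :: t2) =>
        if (h1 <= h2) loop(t1, l2, h1 :: revAcc, (h1 :: revAcc) :: steps)
        else loop(l1, t2, h2 :: revAcc, (h2 :: revAcc) :: steps)
      case (h1 :: t1, Nil) => loop(t1, l2, h1 :: revAcc, (h1 :: revAcc) :: steps)
      case (Nil, h2 :: t2) => loop(l1, t2, h2 :: revAcc, (h2 :: revAcc) :: steps)
      // Both lists are empty: undo the reversal of both accumulators.
      case _ => (revAcc.reverse, steps.reverse)
    }
}
```

Calling MergeTrace.merge(List(1, 4), List(2, 3)) goes through the accumulators List(1), List(2, 1), List(3, 2, 1) and List(4, 3, 2, 1), and the final reverse yields List(1, 2, 3, 4).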

Next, I am going to convert the Merger into an actor by extending Akka's Actor trait.

import akka.actor._

class Merger() extends Actor {

An actor must define a receive method that defines how the actor behaves when receiving a message. This method pattern matches on the incoming message. In this case, the actor will only respond to Merge messages and will send back to the sender a MergeResult message with the merged result.

import Merger._

def receive = {
    case Merge(list1, list2) =>
        val merged = merge(list1, list2)
        sender ! MergeResult(merged)
}

Notice that I have imported the case classes from the companion object. The sender is part of the akka magic, and the ! means send the following message.
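The receive block is a partial function, which is why the actor only reacts to the messages it has a case for. The sketch below models this in plain Scala without Akka. ReceiveDemo and the replies field are stand-ins I invented, and the sorted concatenation stands in for the real merge.

```scala
object ReceiveDemo {
  case class Merge(list1: List[Int], list2: List[Int])

  // Stand-in for replying to the sender: records what would be sent back.
  var replies: List[List[Int]] = Nil

  // Same shape as Akka's Receive type, which is PartialFunction[Any, Unit].
  val receive: PartialFunction[Any, Unit] = {
    case Merge(list1, list2) =>
      // Placeholder for the real merge, good enough for this illustration.
      replies = (list1 ++ list2).sorted :: replies
  }
}
```

Here receive.isDefinedAt is true for a Merge message and false for anything else, such as a String. In a real actor, Akka treats messages that receive is not defined for as unhandled.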

Putting everything together

package actormergesort

import akka.actor._

object Merger {
  case class Merge(list1: List[Int], list2: List[Int])
  case class MergeResult(merged: List[Int])

  def props() = Props(classOf[Merger])
}

class Merger() extends Actor {
  import Merger._
  import scala.annotation.tailrec

  def receive = {
    case Merge(list1, list2) =>
      val merged = merge(list1, list2)
      sender ! MergeResult(merged)
  }

  private def merge(list1: List[Int], list2: List[Int]) = mergeRec(list1, list2, Nil)

  @tailrec
  private def mergeRec(list1: List[Int], list2: List[Int], revAcc: List[Int]): List[Int] = {
    (list1, list2) match {
      case (hd1::tl1, hd2::tl2) =>
        if (hd1 <= hd2) mergeRec(tl1, list2, hd1::revAcc) else mergeRec(list1, tl2, hd2::revAcc)
      case (hd1::tl1, Nil) => mergeRec(tl1, list2, hd1::revAcc)
      case (Nil, hd2::tl2) => mergeRec(list1, tl2, hd2::revAcc)
      case _ => revAcc.reverse
    }
  }
}

I have made the merge helper functions private and I have added a props() method that returns a configuration object for creating new actors.

Next, I move on to the Sorter actor in src\main\scala\actormergesort\Sorter.scala. I first define the companion object with the two messages

package actormergesort

object Sorter {
  case class Sort(array: Array[Int])
  case class SortResult(sorted: List[Int])
}

I have used Arrays as inputs rather than Lists, because it is much cheaper to get sub-arrays than sub-lists. There are two messages here: an instruction to sort that carries the array to be sorted, and a result message with the sorted list.
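The splitting that the Sorter relies on can be sketched with Array.slice. SplitDemo and halves are hypothetical names used only for this illustration.

```scala
object SplitDemo {
  // Splits an array into a left and a right half. For odd lengths the
  // right half gets the extra element because of integer division.
  def halves(array: Array[Int]): (Array[Int], Array[Int]) = {
    val mid = array.length / 2
    (array.slice(0, mid), array.slice(mid, array.length))
  }
}
```

For Array(5, 3, 8, 1, 9) this gives the halves [5, 3] and [8, 1, 9]. The standard splitAt method would produce the same pair in one call; I use slice here because that is what the Sorter code uses.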

The idea for the Sorter is the following: once it receives an array to sort, it will split the array into two and then create two new child Sorter actors that will each sort one sub-array. The recursion ends when the array has fewer than two elements, which means any Sorter will reply immediately with the same elements if the length is 0 or 1. When the parent Sorter gets the first of the two replies back, it will store the sorted sub-list. When it gets the second sorted sub-list, it will delegate the merging of the two sorted sub-lists to a newly created child Merger actor. When the Merger replies back to its parent Sorter, the parent Sorter will send the sorted result to its own parent.

class Sorter() extends Actor {
  import Sorter._
  import Merger.Merge
  import Merger.MergeResult

  var sorted1: Option[List[Int]] = None
  var parent: ActorRef = context.system.deadLetters

  def receive = idle

For this actor I will define several behaviours, and the Sorter will change behaviour depending on the stage of the sorting process it is at. I have added two mutable fields. sorted1 will store the result that arrives first. Its type is Option of an integer list, which means that it can have the value None or Some(x), where x is an integer list in this example. I also store the actor reference of the parent actor, since the Sorter will have to process several messages before it can reply to the sender of the original Sort message. The actor starts with the behaviour idle, which means it is doing nothing while waiting for a Sort message:

val idle: Receive = {
  case Sort(array) =>
    if (array.length < 2) {
      sender ! SortResult(array.toList)
    } else {
      parent = sender
      
      val subSorter1 = context.actorOf(props(), self.path.name + "-1")
      val subSorter2 = context.actorOf(props(), self.path.name + "-2")
  
      val subArray1 = array.slice(0, array.length / 2)
      val subArray2 = array.slice(array.length / 2, array.length)
  
      subSorter1 ! Sort(subArray1)
      subSorter2 ! Sort(subArray2)
  
      context.become(waitingSubSorts)
    }
}

When I receive a Sort message, I check the array size, and if it cannot be split I just return the same elements as the result to the sender. Otherwise, I store the sender, create two child Sorter actors, and send one half of the original array to each of them to sort. At the end, I change behaviour to waitingSubSorts

val waitingSubSorts: Receive = {
  case SortResult(sorted) =>
    sorted1 match {
      case None =>
        sorted1 = Some(sorted)
      case Some(s1) =>
        val merger = context.actorOf(Merger.props(), "merger-for-" + self.path.name)
        merger ! Merge(s1, sorted)
        context.become(waitingMerge)
    }
}

In this behaviour I only respond to SortResult messages, as I am expecting two of them, one from each child. Once I receive one, I check whether sorted1 contains a result. If not (this means it is None), then this is the first result and I store it there. Notice that I store it as Some(sorted) because sorted1 is an Option. If this is the second result I see, then I create a Merger actor and I send it the two sub-lists for merging. Next, the actor changes behaviour to waitingMerge

val waitingMerge: Receive = {
  case MergeResult(merged) =>
    parent ! SortResult(merged)
}

Finally, I simply send the merged result back to the parent, which is the original sender of the Sort message. Putting everything together

package actormergesort

import akka.actor._

object Sorter {
  case class Sort(array: Array[Int])
  case class SortResult(sorted: List[Int])

  def props() = Props(classOf[Sorter])
}

class Sorter() extends Actor {
  import Sorter._
  import Merger.Merge
  import Merger.MergeResult

  var sorted1: Option[List[Int]] = None
  var parent: ActorRef = context.system.deadLetters

  def receive = idle

  val idle: Receive = {
    case Sort(array) =>
      if (array.length < 2) {
        // Reply to the sender directly: parent has not been set at this point.
        sender ! SortResult(array.toList)
      } else {
        parent = sender
      
        val subSorter1 = context.actorOf(props(), self.path.name + "-1")
        val subSorter2 = context.actorOf(props(), self.path.name + "-2")
    
        val subArray1 = array.slice(0, array.length / 2)
        val subArray2 = array.slice(array.length / 2, array.length)
    
        subSorter1 ! Sort(subArray1)
        subSorter2 ! Sort(subArray2)
    
        context.become(waitingSubSorts)
      }
  }

  val waitingSubSorts: Receive = {
    case SortResult(sorted) =>
      sorted1 match {
        case None =>
          sorted1 = Some(sorted)
        case Some(s1) =>
          val merger = context.actorOf(Merger.props(), "merger-for-" + self.path.name)
          merger ! Merge(s1, sorted)
          context.become(waitingMerge)
      }
  }

  val waitingMerge: Receive = {
    case MergeResult(merged) =>
      parent ! SortResult(merged)
  }
}
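The behaviour switching used by the Sorter can be modelled in plain Scala, which may help to see what context.become does conceptually. This is only a rough model under my own assumptions, not Akka itself: BehaviourDemo, TwoResultCollector, send, become and done are hypothetical names, and the sorted concatenation stands in for the Merger.

```scala
object BehaviourDemo {
  // Same shape as Akka's Receive type.
  type Receive = PartialFunction[Any, Unit]

  case class SortResult(sorted: List[Int])

  class TwoResultCollector {
    var first: Option[List[Int]] = None
    var combined: Option[List[Int]] = None

    // Current behaviour; become swaps it, loosely like context.become.
    private var behaviour: Receive = waitingForResults

    private def become(next: Receive): Unit = {
      behaviour = next
    }

    private def waitingForResults: Receive = {
      case SortResult(sorted) =>
        first match {
          case None =>
            first = Some(sorted) // first reply: remember it
          case Some(s1) =>
            // Second reply: combine both (placeholder for the Merger).
            combined = Some((s1 ++ sorted).sorted)
            become(done)
        }
    }

    private def done: Receive = {
      case _ => () // ignore everything once both results have arrived
    }

    // Delivers one message; unmatched messages are silently dropped here.
    def send(message: Any): Unit =
      if (behaviour.isDefinedAt(message)) behaviour(message)
  }
}
```

Sending SortResult(List(2, 5)) followed by SortResult(List(1, 9)) leaves combined as Some(List(1, 2, 5, 9)), and any later message is ignored because the behaviour has changed.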

To test how the Sorter works, I create a small App in src\main\scala\actormergesort\Runner.scala

package actormergesort

import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
import scala.util.Random

object Runner extends App {
  import Sorter._

  val rnd = new Random()

  val array = (1 to 30).map {_ => rnd.nextInt(100) }.toArray
  println("Unsorted: " + array.mkString("[",",","]"))

  val system = ActorSystem()
  val sorter = system.actorOf(Sorter.props(), "sorter")
  
  implicit val timeout = Timeout(25.seconds)     
  // Use system's dispatcher ExecutionContext for futures.
  import system.dispatcher 
  
  val future = sorter ? Sort(array)
  
  future.map { 
    case SortResult(sorted) =>
      println("Sorted: " + sorted.mkString("[",",","]"))
      system.shutdown()
  }
}

I have created an array of random integers to sort. I create an ActorSystem and within that I create the top Sorter. I use the ask pattern with ? to send a Sort message and get back a future. The future holds a value that will become available at some later point. Using map on the future, I define what needs to be done once the future has successfully completed. In this particular case, I know the result will be a SortResult, so I pattern match on that to extract the sorted list. I print the sorted list the same way I printed the unsorted one earlier on, and then I shut down the actor system.

The ask pattern has an implicit parameter for the timeout, which means it takes its value from an implicit value in scope. This is the reason I have declared the implicit timeout a few lines earlier. Similarly, the futures need an ExecutionContext to run on, and I have provided one by importing the dispatcher of our actor system.
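The two implicit mechanisms described above can be tried out without Akka. In this sketch, describeTimeout is a hypothetical helper, a FiniteDuration stands in for Akka's Timeout, and the global ExecutionContext stands in for the actor system's dispatcher.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FutureDemo {
  // The timeout is an implicit parameter, so the caller supplies it
  // simply by having an implicit value of the right type in scope.
  def describeTimeout()(implicit timeout: FiniteDuration): String =
    s"will wait up to ${timeout.toSeconds} seconds"

  def run(): (String, String) = {
    implicit val timeout: FiniteDuration = 25.seconds
    // Futures need an ExecutionContext to run on.
    implicit val ec: ExecutionContext = ExecutionContext.global

    val sortedFuture: Future[List[Int]] = Future(List(3, 1, 2).sorted)
    // map registers what to do with the value once it is available.
    val rendered: Future[String] = sortedFuture.map(_.mkString("[", ",", "]"))

    // Blocking with Await is only for this demo; the Runner never blocks.
    (describeTimeout(), Await.result(rendered, 5.seconds))
  }
}
```

Calling FutureDemo.run() returns the pair ("will wait up to 25 seconds", "[1,2,3]"). Neither implicit value is passed explicitly at the call sites, which mirrors how the Runner hands the timeout to ? and the dispatcher to map.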

To run the application, I simply run sbt run on the command line from the top folder. You should see something like this

Unsorted: [41,9,2,96,73,42,43,33,27,31,26,35,65,53,44,43,41,66,9,34,9,18,25,84,21,49,69,0,24,83]
Sorted: [0,2,9,9,9,18,21,24,25,26,27,31,33,34,35,41,41,42,43,43,44,49,53,65,66,69,73,83,84,96]

One convenient thing about sbt run is that it can detect multiple entry points in your code, and if there is more than one it lists them all for you to choose from. Let’s add a second runner to see how this works. Make a copy of Runner.scala named Runner2.scala in the same folder. Run sbt eclipse and refresh your project in Eclipse. Make sure you also rename the object within the new file to Runner2

object Runner2 extends App {

Now if I run sbt run, I am presented with two options:

Multiple main classes detected, select one to run:

 [1] actormergesort.Runner2
 [2] actormergesort.Runner

Enter number: 1

[info] Running actormergesort.Runner2
Unsorted: [5,36,54,91,35,88,47,14,55,28,6,93,33,4,87,64,6,23,49,64,76,11,18,12,34,55,7,35,2,47]
Sorted: [2,4,5,6,6,7,11,12,14,18,23,28,33,34,35,35,36,47,47,49,54,55,55,64,64,76,87,88,91,93]
[success] Total time: 47 s, completed 09-Nov-2014 10:01:12

Finally, I can run a specific main class like this:

sbt "runMain actormergesort.Runner"

The source code for this tutorial can be found here. I hope this was helpful!

 
