
Where we left off

In the last two posts, we reached the capabilities of a simple ray tracer: we now have two types of shapes (sphere and plane), and we can calculate shadows, reflections, and refractions recursively. Before moving on to more computation-heavy shapes, I am curious how we could move our computations to separate nodes. So we will first build a cluster!


Cluster

Before this ‘project’ I had never tried Akka clusters, so I have learned a lot! After reading the docs (at least twice), I had a clear vision of what I wanted to achieve: a grid built from worker nodes, plus some ‘master’ nodes. When a master node joins the cluster, it tells the worker nodes about the actual scene, so the workers can build the scene locally. When a worker has finished building the scene and is ready to compute rays, it asks the corresponding master for work, solves the task, and sends the result back. I will split the app at the renderer-scene connection and divide it into renderer-masterNode–[cluster]–workerNode-scene.

The responsibilities:

Worker

  1. Can build a scene from some kind of message
  2. Can track master–scene pairs
  3. Can request tasks, compute them on the given scene, and send the results back
  4. Can tear down a scene when its master leaves the cluster (due to error or work completion)

Master

  1. Can tell the workers what scene it wants to ray-trace
  2. Can start a renderer
  3. Can track which trace messages have been requested and which have been answered by workers
  4. Can distribute work

In the first iteration I wanted to achieve W3 and M2, M3, M4. This is just splitting the renderer and the computation between two nodes. Before we start, I want to set expectations about performance. On my computer, rendering the previous scene took about 6.3-6.8 seconds. We will use a new transport layer and run two actor systems instead of one, so my expectation was that computing the same picture would take about 2-3x more time. My first tries with the cluster implementation needed 10-12x more time. It was disappointing. I asked some questions in the Akka Gitter channel, got some ideas, and started experimenting. The end result is about 4x slower than the original single-machine code. But with a more computation-hungry scene and with more nodes (which actually run on other computers), I think this slowdown can be reversed at some point. (I may tune the performance of my implementation in the next post and measure its speed in a bigger test environment, but for now I left it like this.)

We want a cluster! First, we need to modify the configs. Create a resources folder under src/main and add a new file (cluster.conf).

akka {
  actor {
    provider = cluster
    warn-about-java-serializer-usage = off
  }
  remote {
    log-remote-lifecycle-events = off
    artery {
      enabled = on
      canonical.hostname = "127.0.0.1"
      canonical.port = 0
    }
  }
 
  cluster {
    seed-nodes = [
      "akka://ClusterSystem@127.0.0.1:2551",
      "akka://ClusterSystem@127.0.0.1:2552"]
    metrics.enabled = off
    min-nr-of-members = 2
    jmx.multi-mbeans-in-same-jvm = on
  }
}

Some words about what this is and what I tried before settling on these values. First of all, you need to change the actor provider to cluster, because we want a cluster :).

If you read the docs, they describe options to replace the Java serializer with Protobuf, Kryo, or even something you wrote yourself. I tried out Chill from Twitter and Kryo. Both produced warnings on the output, and in my case they didn’t change the performance at all (the performance was poor because of other things, which you can see below). So because of the warnings and the lack of performance improvements, I simply switched back to the built-in Java serializer and turned the warning off.
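For reference, this is roughly how the Chill experiment was wired in (a sketch following the chill-akka docs, assuming the chill-akka dependency is on the classpath; I make no tuning claims about it):

akka {
  actor {
    serializers {
      # Chill's Akka serializer, from the chill-akka module
      kryo = "com.twitter.chill.akka.AkkaSerializer"
    }
    serialization-bindings {
      # route everything Serializable through Kryo instead of Java serialization
      "java.io.Serializable" = kryo
    }
  }
}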

I tried out both Netty and Artery. In my case they performed about the same (Artery was maybe a bit better), and it’s newer and cooler, so I chose that. (I left the Netty-specific config lines commented out in case I change my mind.)
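For comparison, the classic Netty transport variant (the counterpart of the lines I kept commented out) looks roughly like this; note that with Netty the seed-node addresses use the akka.tcp:// protocol instead of akka://:

akka.remote {
  enabled-transports = ["akka.remote.netty.tcp"]
  netty.tcp {
    hostname = "127.0.0.1"
    port = 0
  }
}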

I added two seed node addresses, turned off the old metrics, set the minimum number of members to two, and enabled the “multi mbeans in same jvm” (because I will run the master and the worker from one method while playing).

Back to naive implementation versus performance tuning. Our application is CPU- and message-heavy: the renderer sends out a lot of trace messages. The naive implementation would be

“W: give me one trace,
M: trace,
W: (after computing) done,
W: give me one trace”.

This distributes the work to whichever node is free, but the communication and waiting between the “give me one” and the “trace” is wasted time. We could, on the other hand, batch the messages like

“W: give me work,
M: trace,
M: trace,
M: trace,
M: trace…”

or

“W: give me work,
M: Seq[trace]”.

In my tests the second method was far faster (10x slower vs. 20x slower than the non-clustered version). So I will batch the master→worker direction. I tested batching in the other direction too, but it was slower. I have theories as to why batching helped in one direction but not the other; if you want to share your own, leave a comment below. 😀

package object cluster {
  case class JobPackage(traces: Seq[Trace])
  case object WorkerRegistration
  case object NeedWork
}

Move the cluster things into a subpackage, with their own package object holding the communication classes.

object WorkerNode {
  def main(args: Array[String]): Unit = {
 
    val port = if(args.isEmpty) "0" else args(0)
 
    val config = ConfigFactory.parseString("akka.cluster.roles = [worker]").
      //withFallback(ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")).
      withFallback(ConfigFactory.parseString(s"akka.remote.artery.canonical.port=$port")).
      withFallback(ConfigFactory.load("cluster"))
 
    val system = ActorSystem("ClusterSystem", config)
    system.actorOf(Props[WorkerNode], name = "workerNode")
  }
}

The WorkerNode object holds the main function: it reads the port from the arguments and starts the worker-node actor.

class WorkerNode extends Actor with ActorLogging {
 
  val cluster = Cluster(context.system)
 
  override def preStart(): Unit = {
    cluster.subscribe(self, classOf[MemberEvent], classOf[UnreachableMember])
  }
 
  override def postStop(): Unit = cluster.unsubscribe(self)
 
  var scene: ActorRef = _
  var master: ActorRef = _
  var counter = 0
 
  override def receive: Receive = {
 
    case MemberUp(member) =>
      if(member.hasRole("master")) {
        context.actorSelection(RootActorPath(member.address) / "user" / "masterNode") ! WorkerRegistration
      }
 
    case "createScene" =>
      master = sender
      scene = createScene()
      master ! NeedWork
 
    case JobPackage(traces) =>
      counter += traces.size
      traces.foreach(x => scene ! x)
 
    case msg: ColorMessageAns =>
      counter -= 1
      master ! msg
      if(counter == 0)
        master ! NeedWork
  }
 
  def createScene(): ActorRef = {
    implicit val epsilon = 0.00001
    val light = Light(Color(1, 1, 1), Point(200, 200, 0), Color(0.2, 0.2, 0.2))
    val scene = context.actorOf(Scene.props(light))
 
    val sphere = context.actorOf(Sphere.propsWithReflective(Point(-100, -200, 600), 100, Color(0.6, 0, 0), scene)(Color(0.17, 1.5, 0.17), Color(1.8, 3.1, 0.9)))
    val sphere2 = context.actorOf(Sphere.props(Point(150, -200, 500), 100, Color(1, 1, 1), scene))
    val sphere3 = context.actorOf(Sphere.propsWithRefractive(Point(25, -200, 325), 100, Color(0, 0, 0), scene)(Color(1.13, 1.13, 1.13), Color(1.0, 1.0, 1.0)))
    val plane = context.actorOf(Plane.propsWithReflective(Point(0, -300, 0), Vec(0, 1, 0), Color(1, 1, 1), scene)(Color(0.17, 0.35, 1.5), Color(3.1, 2.7, 1.9)))
 
    scene
  }
}

This is a dirty implementation, but the WorkerNode actor is not complex.

At start, it subscribes to the cluster’s member and reachability events; at stop, it unsubscribes.

When a master joins (a MemberUp event with the master role), the worker registers itself at the master’s masterNode actor. On a “createScene” message it saves the master, builds the scene locally, and asks for work. The traces of an incoming JobPackage are forwarded to the scene, and every ColorMessageAns is relayed back to the master; when the outstanding-trace counter reaches zero, the worker requests the next batch.

(We didn’t need to touch the scene code. This is a good sign.)

object MasterNode {
  def main(args: Array[String]): Unit = {
 
    val port = if(args.isEmpty) "0" else args(0)
 
    val config = ConfigFactory.parseString("akka.cluster.roles = [master]").
      //withFallback(ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")).
      withFallback(ConfigFactory.parseString(s"akka.remote.artery.canonical.port=$port")).
      withFallback(ConfigFactory.load("cluster"))
 
    val system = ActorSystem("ClusterSystem", config)
 
    val camera = Camera(Point(0, 0, -500))
 
    system.actorOf(Props(
      new MasterNode(600, 600, 5, camera, "test.png")
    ), name = "masterNode")
  }
}

The MasterNode object’s main function is nearly the same as the WorkerNode’s. (We only change the role string and the actor init.)

class MasterNode(width: Int, height: Int, iterations: Int, camera: Camera, filename: String) extends Actor with ActorLogging {
  val cluster = Cluster(context.system)
  val batchSize = 750
  var workers = IndexedSeq.empty[ActorRef]
  var renderer: ActorRef = _
  val tracemap = collection.mutable.Map.empty[String, (Trace, Long)]
 
  def now(): Long = System.currentTimeMillis
 
  override def preStart(): Unit = {
    renderer = context.actorOf(ImageRender.props(width, height, iterations, camera, filename, self))
    cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
      classOf[MemberEvent])
  }
 
  override def postStop(): Unit = cluster.unsubscribe(self)
 
  override def receive: Receive = {
 
    case WorkerRegistration if !workers.contains(sender()) =>
      if(workers.isEmpty) renderer ! "start"
      context watch sender()
      workers = workers :+ sender()
      sender() ! "createScene"
 
    case msg: ColorMessageAns =>
      if(tracemap.get(msg.id).isDefined) {
        tracemap -= msg.id
        renderer ! msg
      } else {
        log.debug(msg.toString)
      }
 
    case trace: Trace =>
      tracemap += trace.id -> (trace, 0L)
 
    case NeedWork =>
      if(tracemap.nonEmpty) {
        val start = now()
        val worklist = tracemap.values.toSeq
          .sortBy(_._2)
          .take(batchSize)
          .filter(_._2 < (start - 100))
          .map(_._1)
        if(worklist.nonEmpty) {
          worklist.foreach(x => tracemap += x.id -> (x, start))
          sender ! JobPackage(worklist)
        }
      }
  }
}

In the MasterNode implementation we wait for worker registrations. When a new worker registers, we add it to our inner list (and if it is the first one, we start the renderer) and send it a “createScene” message. We handle trace messages from the renderer by writing them into an inner map. We handle the ColorMessageAns-es from the workers: if the answer’s id is in the map, we remove it and relay the message to the renderer; if it is not in the map, we received this answer twice, so we log and drop it. And when a worker needs work, we build a JobPackage from the oldest not-yet-answered traces, skipping ones that were sent out within the last 100 ms. (This part of the code can deadlock right now, because of the filter and if some messages were dropped. I have never seen it happen; I just want to mention that I know it’s not safe yet.)
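Since I mentioned the possible deadlock: one way out (a sketch I did not ship, so treat ResendTick and the 5-second numbers as my own assumptions) would be a periodic tick on the master that re-offers stale traces to a random registered worker, so a lost ColorMessageAns cannot stall the protocol forever:

// Hypothetical deadlock guard for MasterNode (sketch, not in the real code)
case object ResendTick

// in preStart(), next to the cluster subscription:
import scala.concurrent.duration._
import context.dispatcher
context.system.scheduler.schedule(5.seconds, 5.seconds, self, ResendTick)

// in receive:
case ResendTick =>
  // traces sent out more than 5 seconds ago but never answered
  val stale = tracemap.values.collect {
    case (trace, sentAt) if sentAt != 0L && sentAt < now() - 5000 => trace
  }.toSeq
  if (stale.nonEmpty && workers.nonEmpty) {
    stale.foreach(t => tracemap += t.id -> (t, now()))
    workers(scala.util.Random.nextInt(workers.size)) ! JobPackage(stale)
  }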

object Main {
  def main(args: Array[String]): Unit = {
    WorkerNode.main(Seq("2551").toArray)
    //WorkerNode.main(Seq("2553").toArray)
    MasterNode.main(Seq("2552").toArray)
  }
}

If we modify the Main function, we can run our ‘cluster’, and it will produce a picture. A small win, given the performance drop, but it works :). An even bigger win is that we could wrap the functionality of the original code without any modification!

At this point, I did a small refactor and moved all the shape-related objects into a ‘shapes’ package (Shape, Sphere, Plane, Reflective, Refractive).

Cluster phase 2

We have a working cluster implementation, but we can handle only one master, and we need to redeploy the workers if we want a new scene. Let’s make our scene dynamically built from initial data. For this we will need to provide a list of shape-data, and build the concrete shape actors from this list. And we need to stop there for a moment: whose responsibility is it to know how to create the concrete actor from the data? We had a little discussion about this matter, came up with three insufficient solutions, and I chose the least bad.

Here is my implementation (the least bad of the three ideas), but I’m not so sure there isn’t a better (Scala) solution, so if you have an alternative method, please leave a comment below.

Let’s start to refactor all the things!

object Shape{
  trait ShapeData{
    def props(scene:ActorRef)(implicit epsilon: Double): Props
  }
    ...
}
abstract class Shape(val color: Color, scene: ActorRef)(implicit epsilon: Double) extends Actor {
    ...
  // we no longer need this
 /* override def preStart(): Unit = {
    scene ! AddShape(self)
  } */
    ...
}
object Sphere {
 
  case class SphereData(origo: Point, radius: Double, color: Color) extends ShapeData {
    def props(scene: ActorRef)(implicit epsilon: Double) = Sphere.props(origo, radius, color, scene)
  }
 
  case class ReflectiveSphereData(origo: Point, radius: Double, color: Color, fr: Color, kappa: Color) extends ShapeData {
    def props(scene: ActorRef)(implicit epsilon: Double) = Sphere.propsWithReflective(origo, radius, color, scene)(fr, kappa)
  }
 
  case class RefractiveSphereData(origo: Point, radius: Double, color: Color, fr: Color, kappa: Color) extends ShapeData {
    def props(scene: ActorRef)(implicit epsilon: Double) = Sphere.propsWithRefractive(origo, radius, color, scene)(fr, kappa)
  }
  ...
}
object Plane {
 
  case class PlaneData(point: Point, normalVector: Vec, color: Color) extends ShapeData {
    def props(scene: ActorRef)(implicit epsilon: Double): Props = Plane.props(point, normalVector, color, scene)
  }
 
  case class ReflectivePlaneData(point: Point, normalVector: Vec, color: Color, fr: Color, kappa: Color) extends ShapeData {
    def props(scene: ActorRef)(implicit epsilon: Double): Props = Plane.propsWithReflective(point, normalVector, color, scene)(fr, kappa)
  }
  ...
}

Create the shape-data objects with the proper “factory” call.

object Scene {
  def props(light: Light, objectsData: Seq[ShapeData])(implicit epsilon: Double) = Props(new Scene(light, objectsData))
  ...
}
class Scene(light: Light, objectsData: Seq[ShapeData])(implicit epsilon: Double) extends Actor {
  ...
  override def preStart(): Unit = {
    createObjects()
  }
  override def postStop(): Unit = {
    objects.foreach(x => x ! PoisonPill)
  }
  ...
    /* case AddShape(obj) =>
      objects :+= obj */ // we don't need this anymore
  ...
  private def createObject(obj: ShapeData) = {
    objects :+= context.actorOf(obj.props(self))
  }
 
  private def createObjects() = {
    objectsData.foreach(obj => createObject(obj))
  }
}

Modify the scene to create itself from shape-data.

class ImageRender(width: Int, height: Int, iterations: Int, camera: Camera, filename: String, scene: ActorRef) extends Actor {
  val idMap = collection.mutable.Map.empty[String, (Int, Int)]
  ...
  def uuid = java.util.UUID.randomUUID.toString
  ...
  def startTrace: BufferedImage = {
    ...
        val ray = Ray(startingPoint, (startingPoint - camera.place).normalize)
        val id = uuid
        idMap += id -> (x, y)
        scene ! Trace(id, ray, iterations)
    ...
  }
  def writeToImage(msg: ColorMessageAns) = {
    val coords = idMap(msg.id)
    image.setRGB(coords._1, height - coords._2 - 1, msg.color.getImageColor)
    idMap -= msg.id
  }
}

This may not be a must-have step, but you should make the ids more id-like (real UUIDs) in order to avoid collisions: with multiple masters, the old “x,y”-style id generation could clash.

We can delete the AddShape message from the package object.

package object cluster {
  ...
  case class CreateScene(light: Light, objectsData: Seq[ShapeData])
  ...
}

Create a CreateScene message in the package object.

object MasterNode {
  def main(args: Array[String]): Unit = {
    ...
    val camera = Camera(Point(0, 0, -500))
 
    val light = Light(Color(1, 1, 1), Point(200, 200, 0), Color(0.2, 0.2, 0.2))
 
    val sphere = ReflectiveSphereData(Point(-100, -200, 600), 100, Color(0.6, 0, 0), Color(0.17, 1.5, 0.17), Color(1.8, 3.1, 0.9))
    val sphere2 = SphereData(Point(150, -200, 500), 100, Color(1, 1, 1))
    val sphere3 = RefractiveSphereData(Point(25, -200, 325), 100, Color(0, 0, 0), Color(1.13, 1.13, 1.13), Color(1.0, 1.0, 1.0))
    val plane = ReflectivePlaneData(Point(0, -300, 0), Vec(0, 1, 0), Color(1, 1, 1), Color(0.17, 0.35, 1.5), Color(3.1, 2.7, 1.9))
 
    system.actorOf(Props(
      new MasterNode(600, 600, 5, camera, "test.png", light, Seq(sphere,sphere2,sphere3,plane))
    ), name = "masterNode")
  }
}
class MasterNode(width: Int, height: Int, iterations: Int, camera: Camera, filename: String, light: Light, objectsData: Seq[ShapeData]) extends Actor with ActorLogging {
  ...
    case WorkerRegistration if !workers.contains(sender()) =>
      if(workers.isEmpty) renderer ! "start"
      context watch sender()
      workers = workers :+ sender()
      sender() ! CreateScene(light,objectsData)
  ...
}

Move the scene definition into the main function as a Seq[ShapeData]. When a worker registers, send the scene-data to it.

class WorkerNode extends Actor with ActorLogging {
  implicit val epsilon = 0.00001
  val cluster = Cluster(context.system)
  val resolverMap = collection.mutable.Map.empty[ActorRef, ActorRef]
  val mastersMap = collection.mutable.Map.empty[Address, ActorRef]
  
  ...
  
  override def receive: Receive = {
    case MemberUp(member) =>
      if(member.hasRole("master")) {
        context.actorSelection(RootActorPath(member.address) / "user" / "masterNode") ! WorkerRegistration
      }
 
    case UnreachableMember(member) =>
      cleanupDownedMasters(member)
 
    case MemberRemoved(member, previousStatus) =>
      cleanupDownedMasters(member)
 
    case _: MemberEvent => // ignore
     
    case CreateScene(light, objectsData) =>
      val scene = context.actorOf(Scene.props(light, objectsData))
      mastersMap += sender.path.address -> sender
      resolverMap += scene -> sender
      resolverMap += sender -> scene
      sender ! NeedWork
 
    case JobPackage(traces) =>
      val scene = resolverMap(sender)
      counter += traces.size
      traces.foreach(x => scene ! x)
 
    case trace: Trace =>
      val scene = resolverMap(sender)
      counter += 1
      scene ! trace
 
    case msg: ColorMessageAns =>
      val master = resolverMap(sender)
      counter -= 1
      master ! msg
      if(counter == 0)
        master ! NeedWork
  }
 
  private def cleanupDownedMasters(member: Member) = {
    if(mastersMap.get(member.address).isDefined) {
      val master = mastersMap(member.address)
      val scene = resolverMap(master)
      resolverMap -= scene
      resolverMap -= master // not `sender`: cluster events don't come from the master
      mastersMap -= member.address
      scene ! PoisonPill
    }
  }
}

It seems like a huge difference at first, but it isn’t. When a master sends the worker a CreateScene message, the worker creates a scene with the given data. It then needs to relay the traces from the master to the scene, and the answers back from the scene to the master. We need a lookup in both directions, so I inserted the master↔scene ActorRef pairs into one map, both as key→value and value→key. From there, I can resolve the destination from the sender in either direction. When a master goes down or becomes unreachable, the worker needs to tear the scene down; that is our cleanup function.

At this point, we can start some workers on different machines (and scale up at any time); they will form a compute grid, and when we start masters, they will distribute and compute their work on this worker grid.
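A quick local smoke test of the scaled-out setup could look like this (GridMain is a hypothetical name, not part of the post’s code; the first two workers must sit on the seed-node ports from cluster.conf):

object GridMain {
  def main(args: Array[String]): Unit = {
    // the two seed-node ports from cluster.conf
    WorkerNode.main(Array("2551"))
    WorkerNode.main(Array("2552"))
    // any number of extra workers on random free ports
    WorkerNode.main(Array("0"))
    // masters can come and go; each brings its own scene
    MasterNode.main(Array("0"))
  }
}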

Some words about performance again. Both the worker and the master can be memory-hungry if you let them. Both actor systems eat up about 20-50 MB of memory without doing anything (which I think is relatively good for a JVM application). When a master starts to compute (and is not heap-limited), it can reach about 1.5 GB of memory (which is terrible if you ask me), but judging from the graphs, this is only because the GC does not run frequently. The worker node is a bit better; I never saw it use more than 350 MB. I tried both of them with -Xmx256m: they did not die with an out-of-memory error, the GC just ran a lot more (and the render time increased by 10-15%).
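If you want to reproduce the heap-capped runs from sbt, something like this in build.sbt should do it (a sketch; it assumes you launch the nodes with sbt run):

// build.sbt (sketch): run the nodes in a forked JVM with a capped heap
fork in run := true
javaOptions in run ++= Seq("-Xmx256m")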

Summary

At the end of the day, we managed to separate our tracer logic from our scene and image renderer. We can now start up a (static) worker grid and add master nodes to distribute and compute scenes on it. I left some “possibly problematic” decisions in the code, so some edge cases can deadlock our communication, but the happy cases work flawlessly. The performance is worse than the single-JVM version; the interesting question is how it will perform with more computation-heavy objects. (In the near future I want to write a cylinder and a metaball implementation too, to test how flexible my initial implementation is, and to explore performance tuning.)

This is a “learning” tutorial for both the readers and the author, so if you have any suggestions or questions, feel free to comment below!

PS: The fourth and final part of this series is coming soon! Stay up to date by liking us on Facebook or following us on Twitter!
