How to Write a Custom Protocol for Gatling?

I like to try out new tools and new things, so I decided to load test my newly written “simple chat” application to see how Akka Streams handles load. When you start with load testing, there are several tools to choose from. The top three are JMeter, Gatling and Tsung. Tsung is written in Erlang, and my half-year-long Erlang course was nothing to show off, so I skipped it immediately. I had used JMeter before, but I didn’t want to write a new plugin for it. That left Gatling. It’s a nice load testing tool, written in Scala, and it uses Netty and Akka. Its DSL is not too hard, and I read somewhere that it’s easily extendable with self-written custom protocols. That last point turned out to be questionable, because there is really minimal documentation. (I still have some open questions, but I reverse engineered the bare minimum for a new and simple protocol.)

The docs and the sources

As I said, minimal documentation:

So I think this mini tutorial will be filling a gap 🙂

The preparations

I’m using IntelliJ and sbt, so let’s start a new sbt project with Scala 2.11 (I tried 2.12 first, but I think Gatling is not compatible with it yet).
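
For orientation, a build.sbt for such a project could look roughly like the sketch below. The project name and all version numbers are assumptions, not taken from the original project; pick the versions you actually use and keep the Akka version in line with whatever your Gatling version pulls in.

```scala
// build.sbt -- a sketch only; the name and every version number below are assumptions
name := "gatling-custom-protocol"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  // Gatling together with its report generator (brings in the core and http modules)
  "io.gatling.highcharts" % "gatling-charts-highcharts" % "2.2.5",
  // Akka Streams, used by the test server and the client later in this post
  "com.typesafe.akka" %% "akka-stream" % "2.4.17"
)
```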

We now init a simple testrunner class like this:

package testrunner
 
import io.gatling.app.Gatling
import io.gatling.core.config.GatlingPropertiesBuilder
 
object GatlingRunner {
  def main(args: Array[String]): Unit = {
    val simClass = classOf[ExampleSimulation].getName
     
    val props = new GatlingPropertiesBuilder
    props.simulationClass(simClass)
    Gatling.fromMap(props.build)
  }
}

And a simple simulation like this (this is the test from the official documentation):

package testrunner
 
import io.gatling.core.Predef._
import io.gatling.http.Predef._
 
class ExampleSimulation extends Simulation {
    val httpConf = http
      .baseURL("http://computer-database.gatling.io")
      .acceptHeader("text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8")
      .doNotTrackHeader("1")
      .acceptLanguageHeader("en-US,en;q=0.5")
      .acceptEncodingHeader("gzip, deflate")
      .userAgentHeader("Mozilla/5.0 (Windows NT 5.1; rv:31.0) Gecko/20100101 Firefox/31.0")
 
    val scn = scenario("BasicSimulation")
      .exec(http("request_1")
      .get("/"))
      .pause(5)
 
    setUp(
      scn.inject(atOnceUsers(1))
    ).protocols(httpConf)
}

At this point, you can run the GatlingRunner’s main method from the IDE and this will run a simple test for you.

We will do one more class. A simple tcp server for testing our protocol.

package server
 
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.Tcp.{IncomingConnection, ServerBinding}
import akka.stream.scaladsl._
import akka.util.ByteString
 
import scala.concurrent._
 
object UpperCaseTcp {
  implicit val system = ActorSystem("ServerSys")
  implicit val materializer = ActorMaterializer()
 
  def main(args: Array[String]): Unit = {
    val connections: Source[IncomingConnection, Future[ServerBinding]] = Tcp().bind("127.0.0.1", 8888)
    connections runForeach {
      connection =>
        println(s"New connection from: ${connection.remoteAddress}")
        val echo = Flow[ByteString].map(_.utf8String.toUpperCase).map(ByteString(_))
        connection.handleWith(echo)
    }
  }
}

It’s a really basic TCP server. For every incoming connection, it writes some info to the output and converts the incoming ByteStrings to upper case before sending them back. (If you don’t understand the above code, you can learn more from the Akka Streams docs or the Activator template.)
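
If you want to poke the server by hand before wiring up Gatling, a plain blocking socket is enough. The sketch below is not part of the original project: ManualCheck and roundTrip are hypothetical names, and it deliberately uses java.net.Socket instead of Akka Streams to keep it dependency-free. It assumes the server answers and then closes the connection once the client has finished sending.

```scala
import java.io.ByteArrayOutputStream
import java.net.Socket
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical helper for a manual smoke test of a TCP server
object ManualCheck {
  // Sends `message`, closes the write side, then reads until the server closes its side
  def roundTrip(host: String, port: Int, message: String): String = {
    val socket = new Socket(host, port)
    try {
      val out = socket.getOutputStream
      out.write(message.getBytes(UTF_8))
      out.flush()
      socket.shutdownOutput() // tells the server that no more input will follow

      val in = socket.getInputStream
      val buffer = new Array[Byte](1024)
      val response = new ByteArrayOutputStream()
      var read = in.read(buffer)
      while (read != -1) {
        response.write(buffer, 0, read)
        read = in.read(buffer)
      }
      new String(response.toByteArray, UTF_8)
    } finally socket.close()
  }
}
```

With UpperCaseTcp running, ManualCheck.roundTrip("127.0.0.1", 8888, "hello") should return the upper-cased text if the server behaves as described above.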

So we have a project where we have a testserver now, with basic implementation, and a GatlingRunner which can run (and debug) tests from our IDE.

Our protocol

This protocol will be the bare minimum. We will define our server’s endpoint and one action called connect. It will connect, send, and receive some data.

Our simulation will look like this:

package testrunner
 
import io.gatling.core.Predef._
import io.gatling.http.Predef._
import protocol.Predef._
 
class ExampleSimulation extends Simulation {
  val upperProtocol = upper.endpoint("127.0.0.1",8888)
  val scn = scenario("test").exec(upper("user").connect)
  setUp(
    scn.inject(atOnceUsers(5))
  ).protocols(upperProtocol)
}

(If you copy & paste this code it will be all red, but it’s fine :))

So let’s start with the Predef.

package protocol
 
object Predef {
  val upper = UpperProtocolBuilder
 
  implicit def upperBuilderToProtocol(builder: UpperProtocolBuilder): UpperProtocol = builder.build()
 
  def upper(name: String) = new Upper(name)
}

If you read the code carefully, you will notice that this is the file we import in our new simulation. This file defines our protocol DSL. The val and the function lead to the protocol builder and the action builder, while the implicit upperBuilderToProtocol conversion is needed by the framework.

Our next stop is the Protocol:

package protocol
 
import akka.actor.ActorSystem
import io.gatling.core
import io.gatling.core.CoreComponents
import io.gatling.core.config.GatlingConfiguration
import io.gatling.core.protocol.{Protocol, ProtocolComponents, ProtocolKey}
import io.gatling.core.session.Session
 
class UpperProtocol(val address: String, val port: Int) extends Protocol {
 type Components = UpperComponents
}
 
object UpperProtocol {
 def apply(address: String, port: Int) = new UpperProtocol(address, port)
 
 val UpperProtocolKey = new ProtocolKey {
 
  type Protocol = UpperProtocol
  type Components = UpperComponents
 
  override def protocolClass: Class[core.protocol.Protocol] = classOf[UpperProtocol].asInstanceOf[Class[io.gatling.core.protocol.Protocol]]
 
  override def defaultProtocolValue(configuration: GatlingConfiguration): UpperProtocol = throw new IllegalStateException("Can't provide a default value for UpperProtocol")
 
  override def newComponents(system: ActorSystem, coreComponents: CoreComponents): UpperProtocol => UpperComponents = {
   upperProtocol => UpperComponents(upperProtocol)
  }
 }
}
 
case class UpperComponents(upperProtocol: UpperProtocol) extends ProtocolComponents {
 def onStart: Option[Session => Session] = None
 
 def onExit: Option[Session => Unit] = None
}
 
case class UpperProtocolBuilder(address: String, port: Int) {
 def build() = UpperProtocol(address, port)
}
 
object UpperProtocolBuilder {
 def endpoint(address: String, port: Int) = UpperProtocolBuilder(address, port)
}

I left these classes and objects in one file because each is only a couple of lines long and they belong together. However, it’s reasonable to split them into 2-3 files and move them into their own package.

When you say upper.endpoint(…) in the simulation, the call goes through Predef.upper (which is the ProtocolBuilder companion object). Its endpoint method creates an instance of the builder class, whose build method is invoked by the implicit conversion defined in the Predef. That’s why the framework knows how to build and get the initialized Protocol.
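
The builder-plus-implicit-conversion trick is easier to see in isolation. Here is a minimal sketch with stand-in types only: Protocol, DemoProtocol, DemoProtocolBuilder, DemoPredef and FakeFramework are all hypothetical names, and FakeFramework.protocols merely plays the role of setUp(…).protocols(…). None of this is Gatling’s real API.

```scala
import scala.language.implicitConversions

// Stand-ins for the framework side; every name in this block is hypothetical
trait Protocol
final case class DemoProtocol(address: String, port: Int) extends Protocol

final case class DemoProtocolBuilder(address: String, port: Int) {
  def build(): DemoProtocol = DemoProtocol(address, port)
}

object DemoProtocolBuilder {
  // Entry point of the mini DSL, mirrors upper.endpoint(...)
  def endpoint(address: String, port: Int): DemoProtocolBuilder =
    DemoProtocolBuilder(address, port)
}

object DemoPredef {
  // The DSL keyword: simply an alias for the builder's companion object
  val demo = DemoProtocolBuilder

  // Lets a builder be used wherever a finished protocol is expected
  implicit def builderToProtocol(builder: DemoProtocolBuilder): DemoProtocol =
    builder.build()
}

object FakeFramework {
  // Only accepts finished Protocol values, never builders
  def protocols(ps: Protocol*): Seq[Protocol] = ps
}

object ImplicitBuilderDemo {
  import DemoPredef._

  // A builder is passed in; the compiler inserts builderToProtocol(...) for us
  def registered: Seq[Protocol] =
    FakeFramework.protocols(demo.endpoint("127.0.0.1", 8888))
}
```

Because the conversion lives in the Predef object, importing Predef._ in the simulation is what brings it into scope; without that import the builder would not type check as a protocol.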

The Upper is a small class, it collects the reachable actions:

package protocol
 
class Upper(requestName: String) {
  def connect() = new UpperConnectActionBuilder(requestName)
}

It has one action, connect, which creates a new UpperConnectActionBuilder.

Our next (and almost last) stop is the Action:

package protocol
 
import io.gatling.commons.stats.OK
import io.gatling.core.action.builder.ActionBuilder
import io.gatling.core.action.{Action, ExitableAction}
import io.gatling.core.protocol.ProtocolComponentsRegistry
import io.gatling.core.session.Session
import io.gatling.core.stats.StatsEngine
import io.gatling.core.stats.message.ResponseTimings
import io.gatling.core.structure.ScenarioContext
import io.gatling.core.util.NameGen
 
class UpperConnectActionBuilder(requestName: String) extends ActionBuilder {
 private def components(protocolComponentsRegistry: ProtocolComponentsRegistry) =
  protocolComponentsRegistry.components(UpperProtocol.UpperProtocolKey)
 
 override def build(ctx: ScenarioContext, next: Action): Action = {
  import ctx._
  val statsEngine = coreComponents.statsEngine
  val upperComponents = components(protocolComponentsRegistry)
  new UpperConnect(upperComponents.upperProtocol, statsEngine, next)
 }
}
 
class UpperConnect(protocol: UpperProtocol, val statsEngine: StatsEngine, val next: Action) extends ExitableAction with NameGen {
 override def name: String = genName("upperConnect")
 
 override def execute(session: Session) = {
  val k = new UpperServiceClient(protocol.address, protocol.port)
  val start = System.currentTimeMillis
  k.run
  val end = System.currentTimeMillis
  val timings = ResponseTimings(start, end)
  statsEngine.logResponse(session, name, timings, OK, None, None)
  next ! session
 }
}

The UpperConnectActionBuilder extends ActionBuilder, so it needs to implement the build method, and that method should create the action itself. It looks a bit complicated at first, but because the protocol holds the connection data, the connect action needs to be able to read the actual protocol. In the builder we can look up our components in the registry and pass the protocol down to the action. The statsEngine is basically the data holder and aggregator; if you want to log responses, you must pass it down too. The action itself could be anything. You can create actors, send messages, read values from or write values to the session, log responses, and so on. Our action creates a client, runs it, measures the runtime, and logs it. The last line of the execute function is again a bit of black magic :) I think it reports back to the runner actor that the execution has ended, and passes on our new session object.
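
To make the “measure, log, pass the session on” shape more tangible, here is a toy model of an action chain. It is only a sketch: Session, Action, TimedAction and CollectingAction are simplified stand-ins written for this post, not Gatling’s types, and the “!” method here is assumed to do nothing more than run the next action with the given session.

```scala
// Toy model of an action chain; these are simplified stand-ins, not Gatling's classes
final case class Session(userId: Int, attributes: Map[String, Any] = Map.empty)

trait Action {
  def execute(session: Session): Unit
  // In this model, "!" simply runs the action with the given session
  def !(session: Session): Unit = execute(session)
}

// Runs `work`, reports how long it took, then hands the session to the next action
class TimedAction(label: String,
                  work: () => Unit,
                  log: (String, Long) => Unit,
                  next: Action) extends Action {
  override def execute(session: Session): Unit = {
    val start = System.nanoTime()
    work()
    val elapsedMillis = (System.nanoTime() - start) / 1000000L
    log(label, elapsedMillis)
    // Store the measurement in the session and continue with the chain
    next ! session.copy(attributes = session.attributes + (label -> elapsedMillis))
  }
}

// Last link of the chain: just remembers the sessions it has seen
class CollectingAction extends Action {
  val seen = scala.collection.mutable.Buffer.empty[Session]
  override def execute(session: Session): Unit = seen += session
}
```

In this model, forgetting the final next ! … call would stop the virtual user right there, which is a useful way to think about why the real action ends with that line.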

The Client is a minimal implementation for feeding the server:

package protocol
 
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Source, Tcp}
import akka.util.ByteString
 
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
 
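class UpperServiceClient(ip: String, port: Int) {
 def run = {
  // A fresh ActorSystem per call keeps the example short; it is shut down in the finally block
  implicit val system = ActorSystem("ClientSys")
  implicit val materializer = ActorMaterializer()
 
  try {
   // One single-byte ByteString for each letter from 'a' to 'z'
   val testInput = ('a' to 'z').map(ByteString(_))
 
   // Send the letters over TCP and concatenate everything the server sends back
   val result: Future[ByteString] = Source(testInput).via(Tcp().outgoingConnection(ip, port)).
    runFold(ByteString.empty) { (acc, in) ⇒ acc ++ in }
 
   // Blocks until the whole response has arrived (or fails after 10 seconds)
   val res: ByteString = Await.result(result, 10.seconds)
  } finally {
   system.terminate()
  }
 }
}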

(It’s from the Activator template, a basic TCP connector stream.) I think Await is an antipattern in this context, but if I wrote it with onComplete, I would need to pass down the full parameter list of statsEngine.logResponse plus the StatsEngine itself, and force the simulation to wait for the responses: too much noise for a simple piece of code. This is a blocking request for now, only for illustration :).
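
For comparison, the non-blocking shape could look something like the sketch below. It uses only plain scala.concurrent Futures: NonBlockingTiming, Outcome and timed are hypothetical names, request stands in for the TCP round trip, and report stands in for “log the response, then continue with the next action”. How exactly to wire this into Gatling’s StatsEngine is left open here.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper showing the callback style instead of Await
object NonBlockingTiming {
  // What the callback receives: request name, elapsed milliseconds, success flag
  final case class Outcome(name: String, elapsedMillis: Long, ok: Boolean)

  // Starts the request and returns immediately; `report` runs when the Future completes
  def timed[A](name: String)(request: () => Future[A])(report: Outcome => Unit)
              (implicit ec: ExecutionContext): Unit = {
    val start = System.currentTimeMillis
    request().onComplete { result =>
      val elapsed = System.currentTimeMillis - start
      // Both successful and failed requests are reported, only the flag differs
      report(Outcome(name, elapsed, result.isSuccess))
    }
  }
}
```

The calling thread is free as soon as timed returns, which is the whole point; the price is that everything needed for reporting has to be captured in the report callback.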

If you copypasted all the code from above to a project, you can run the server, then the simulation, and see some result on the output. Congrats! You just wrote your first custom protocol for Gatling!

The whole source is available on GitHub.

Some things I am still playing with:

Conclusion

If you want to write a new custom protocol for Gatling, you need to dig deep and do some guessing, but I think this short tutorial sheds light on some of the questions. I still have some questions about the inner workings, but I think this post can save some hours for other people… like yourself. :)

Closing words

If you have any advice or proper knowledge in this field, please write a comment, and I will edit this post to make it a better source. The code above is just an example, and most of it is reverse engineered… or is well-educated guessing on my side. Any help or correction is appreciated.

His precision is matched only by his attention to detail. (Really.) He is passionate about becoming more and more effective in software development and loves experimenting with new technologies.

Latest post by Gergő Törcsvári

Learning by Doing – BlockChain & Akka Tutorial