One Stop Shop for Kotlin Coroutines

Kotlin Coroutines

If you’re already familiar with the basic workings of Kotlin and now you feel the time has come to extend your knowledge a bit further with coroutines (but it looks a little overwhelming), you’ve come to the right place. In this post I will cover the basic concepts and inner workings of coroutines in a plain and simple manner. Before I jump into it, let’s have a high level overview of how threads work in general, what concurrency is and how coroutines come into the picture of all that.

Threads, concurrency, coroutines

We could say that threads are organized units of computation that can be executed by the CPU. In general, the operating system’s thread scheduler allocates a certain amount of CPU time to each thread; when that time slice is used up, the CPU switches to another thread. The switching is fast enough to make it look as if the threads were working simultaneously. But how many threads can run in a truly parallel manner? That is limited by the number of cores in the CPU. A single-core CPU can only process one thread at a time, a dual-core CPU can work on two threads at the very same time, and so on. That’s how true parallel work can be achieved with threads.

Concurrency, on the other hand, is about structuring and managing computations so that they can be executed independently of each other as efficiently as possible. A task can be chopped into several parts which can then be executed on different CPU cores, in different threads. And this is where coroutines come into play.

Conceptually, coroutines are somewhat similar to threads in how they manage units of work, but they run ON threads. So why use them instead of threads? Creating more threads to achieve a higher level of concurrency can become very costly because each thread needs its own memory allocation (its stack, among other things). Creating coroutines is much cheaper: you can create tens of thousands of them without any problem. Coroutines are not bound to any specific thread; execution can start on one thread and continue on another. They are non-blocking: they can suspend at suspension points (which are managed at code level) to give up execution time and free up the thread for other coroutines that have been dispatched to it.

So what does all that mean? Let’s say we start some work in coroutine A, which is executing on thread X. It’s running on its own, then at a certain point it needs the result of some heavy calculation or network request to continue. This other task is wrapped in another coroutine, coroutine B, which is dispatched to another thread, Y. Execution of coroutine B starts on thread Y, but instead of blocking thread X while it waits for B to finish, coroutine A suspends and says: hey, someone can take my place on thread X while I wait for coroutine B. When the result from B is ready, A continues execution in the same context.
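To make the A and B story a bit more tangible, here is a rough sketch of it, together with a quick check of the "tens of thousands" claim. It is only an illustration: heavyCalculation, coroutineA and launchMany are names I made up, and the snippet borrows a few functions (async, withContext, coroutineScope) that are only introduced later in the post.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.*

// Hypothetical "heavy" task: this is the work wrapped in coroutine B.
suspend fun heavyCalculation(): Int = withContext(Dispatchers.Default) {
    delay(100) // suspends instead of blocking the thread
    21 * 2
}

// Coroutine A starts B, suspends at await() and resumes once B's result is ready.
suspend fun coroutineA(): String = coroutineScope {
    val b = async { heavyCalculation() }
    "B returned ${b.await()}"
}

// Starts `count` coroutines at once and reports how many of them finished.
suspend fun launchMany(count: Int): Int {
    val finished = AtomicInteger()
    coroutineScope { // returns only after every child has completed
        repeat(count) {
            launch {
                delay(10)
                finished.incrementAndGet()
            }
        }
    }
    return finished.get()
}

fun main(): Unit = runBlocking {
    println(coroutineA())       // B returned 42
    println(launchMany(10_000)) // 10000
}
```

Trying the same launchMany experiment with 10,000 real threads is a good way to feel the difference in cost.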

This is just one simple example but you get the idea hopefully 🙂 Let’s move on now to how actual creation of Kotlin’s coroutines with the various building blocks can happen.

Declaring coroutines

The top of the coroutine structure hierarchy is occupied by CoroutineScopes. Every coroutine has to live in a CoroutineScope, there is no way around it. CoroutineScopes ensure that the lifespan of coroutines which belong under the same umbrella in our application can be managed properly, so that, for example, they don’t outlive the entity they are related to. A CoroutineScope is defined by its coroutineContext property, which can be constructed from a couple of types of elements and gives us the execution context for the scope. Once you have the scope with the context, you can start to declare coroutines with coroutine builders and nest suspending functions in them (I will get to these later in the post; for now just picture regular functions which can suspend execution the way coroutine A did in the intro). When I say that coroutines have to live in scopes, what I mean is that the builders are extension functions of the CoroutineScope interface, so trying to call them outside of a scope will make your IDE start to cry. The same goes for suspending functions when you try to call them from regular code.

CoroutineScope, CoroutineContext

There are two ways you can declare a CoroutineScope. You can either implement the CoroutineScope interface in a class (so the whole class becomes a scope) and override its coroutineContext property:

class SomeClass : CoroutineScope {
    override val coroutineContext: CoroutineContext
        get() = EmptyCoroutineContext
}

Or you can create it as a variable by calling the CoroutineScope() factory function with the context elements as its argument, then call the coroutine builders on this variable:

val someCoroutineScope = CoroutineScope(EmptyCoroutineContext)

If possible, I prefer the first way over the second; that way you don’t have to repeat the scope variable whenever you use a new builder. Note: creating a scope with EmptyCoroutineContext is not good practice. I did it only because we’ve yet to go through the types of context elements, so let’s take care of that now.

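There are four types of elements that can be used to construct the CoroutineContext: Job, ContinuationInterceptor, CoroutineExceptionHandler and CoroutineName. Let’s take a closer look at each.

Job: Jobs are basically entities for modelling the lifecycle of coroutines and for representing a hierarchical structure between them. They can be arranged in a parent-child relation (using the Job’s ‘parent’ parameter) with the following rules between them: cancelling a parent cancels all of its children; a parent completes only after all of its children have completed; and a child failing with an exception other than CancellationException cancels its parent, and with it all of its siblings. The exception to the last rule is SupervisorJob, whose children can fail without affecting the parent or each other.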

// the job passed as argument to the child job's constructor doesn't necessarily have to be SupervisorJob,
// just be conscious about the kind of behavior you would like to achieve with each different situation.
val parentJob = SupervisorJob() // or Job()
val someJob = Job(parentJob)
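The difference between the two kinds of parent can be observed without starting a single coroutine. The following is a small sketch of my own (jobRules is a made-up name) that builds three tiny hierarchies by hand and completes the children through the CompletableJob API:

```kotlin
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob

fun jobRules(): List<Boolean> {
    // Cancelling the parent cancels the child.
    val parent = Job()
    val child = Job(parent)
    parent.cancel()

    // A failing child takes a plain Job parent down with it...
    val plainParent = Job()
    Job(plainParent).completeExceptionally(IllegalStateException("child failed"))

    // ...but leaves a SupervisorJob parent untouched.
    val supervisor = SupervisorJob()
    Job(supervisor).completeExceptionally(IllegalStateException("child failed"))

    return listOf(child.isCancelled, plainParent.isCancelled, supervisor.isActive)
}

fun main() {
    println(jobRules()) // [true, true, true]
}
```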

ContinuationInterceptor: This element is responsible for managing the thread or thread pool the coroutine runs on, as well as for handling the suspension and resuming that happens during execution. In Kotlin’s coroutines these interceptors are implementations of the CoroutineDispatcher class. The out-of-the-box dispatchers are Dispatchers.Default, Dispatchers.IO, Dispatchers.Main and Dispatchers.Unconfined. You can also create a dispatcher of your own from an executor:

val customDispatcher = Executors.newFixedThreadPool(2).asCoroutineDispatcher()

CoroutineName: Adding it to the mix comes in handy when it comes to debugging.

val someCoroutineName = CoroutineName("someCoroutineName")

CoroutineExceptionHandler: Having an exception handler in the context is more or less a must if you don’t want your app to crash when some coroutine throws an exception. Since exceptions are propagated upwards in the coroutine hierarchy, it is good practice to have an exception handler in the scope’s context so that any uncaught error ends up there. Be careful! If a child coroutine has an exception handler in its context but its parent doesn’t, the child’s handler is not used: the exception is propagated to the parent, and when the parent rethrows it, the app will crash. Also worth noting: when several coroutines fail at the same time, the handler is called only once, with the first exception; the later ones are attached to it as suppressed exceptions.

val someExceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
     val coroutineName = coroutineContext[CoroutineName]?.name ?: "default coroutine name"
     println("Error in $coroutineName: ${throwable.localizedMessage}")
}

Concatenating the context elements is this easy, thanks to the plus operator function of the CoroutineContext interface:

val someCoroutineContext = someJob + Dispatchers.IO + someCoroutineName + someExceptionHandler
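Here is a small sketch of how such a combined context behaves once it is put to work. The names contextDemo and "demo" are mine, and I used Dispatchers.Default so that it runs outside of Android too; treat it as an illustration rather than a recipe.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.*

// Builds a scope from all four element types and records what happens in it.
fun contextDemo(): List<String> = runBlocking {
    val log = CopyOnWriteArrayList<String>()
    val handler = CoroutineExceptionHandler { context, throwable ->
        log += "Error in ${context[CoroutineName]?.name}: ${throwable.message}"
    }
    val scope = CoroutineScope(
        SupervisorJob() + Dispatchers.Default + CoroutineName("demo") + handler
    )

    // The uncaught exception ends up in the handler of the scope's context.
    scope.launch { throw IllegalStateException("boom") }.join()

    // Thanks to SupervisorJob the scope is still usable after that failure.
    scope.launch {
        log += "second coroutine runs in ${coroutineContext[CoroutineName]?.name}"
    }.join()

    scope.cancel()
    log.toList()
}

fun main() {
    contextDemo().forEach(::println)
    // Error in demo: boom
    // second coroutine runs in demo
}
```

Swapping SupervisorJob() for Job() in the sketch cancels the whole scope on the first failure, so the second launch never runs its body.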

Coroutine builders

Now that the basics of scope and context have been covered, we can finally start looking at builders and how they work and behave. In this section I’ll go through the torch-bearers, launch and async (+ runBlocking), and later in the post we’ll get to some other worthy builders. First up is launch, which starts a new coroutine without blocking the caller, so the second launch below prints its message before the first one does:

launch {
   delay(1000)
   println("Hello from first launch")
}

launch {
   println("Hello from second launch")
}

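launch doesn’t return any result, but it still has a return type, which is Job. So if you assign a launch call to a variable, you can control the lifecycle of the coroutine through it. More on this later, but first let’s check async and the somewhat odd-one-out builder, runBlocking. async works like launch, except that it returns a Deferred, a Job that carries a result which you retrieve with the suspending await() function: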

launch {
   val someAsyncTask: Deferred<String> = async {
       delay(1000) // just pretending to do something heavy...
       return@async "This string is the result"
   }
   println(someAsyncTask.await())
}

A question you might ask: why did I put all this in a launch block? Because await(), as mentioned before, is a suspending function, so it can be called only from a coroutine or from another suspending function (which in turn has to be called from some coroutine at the end of the day). It makes sense if you think about it. If regular code without coroutines just hit await(), what would there be to suspend? The scope itself is just a scope, not a coroutine, it cannot suspend :)
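Where async really pays off is when several independent tasks can run at the same time. The sketch below is my own illustration (loadPart and loadBoth are hypothetical names, with delay standing in for real work): both tasks are started first and awaited afterwards, so the total time is roughly that of the slower task, not the sum of the two.

```kotlin
import kotlin.system.measureTimeMillis
import kotlinx.coroutines.*

// Hypothetical task that needs `millis` milliseconds to produce its result.
suspend fun loadPart(name: String, millis: Long): String {
    delay(millis)
    return name
}

// Starts both tasks before awaiting either of them.
suspend fun loadBoth(): String = coroutineScope {
    val first = async { loadPart("first", 300) }
    val second = async { loadPart("second", 300) }
    "${first.await()} + ${second.await()}"
}

fun main(): Unit = runBlocking {
    val elapsed = measureTimeMillis { println(loadBoth()) } // first + second
    println("took about $elapsed ms") // around 300 ms rather than 600 ms
}
```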

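runBlocking is the odd one out because it blocks the thread it is called from until its block and all the coroutines started inside it have completed, which makes it a bridge between regular blocking code and coroutines:

runBlocking(Dispatchers.IO) {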
    launch {
        repeat(5) {
            println("counting my fingers ${it + 1}")
            delay(1000)
        }
    }
}
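println("geez, 5 seconds wasted")

Two more things are worth knowing about builders. First, a builder can take a context argument; its elements are added to the context inherited from the scope or the parent coroutine and override the inherited elements of the same type:

val context = Dispatchers.IO + CoroutineName("parent")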

launch(context) {
    println(coroutineContext[CoroutineName]?.name)

    val coroutineName = CoroutineName("inner launch")

    launch(coroutineName) {
        println(coroutineContext[CoroutineName]?.name)
    }
}

// output:
// parent
// inner launch

And second, launch and async both start their execution immediately by default; however, they both have a ‘start’ parameter which can be useful when you want to tie the start of the execution to some event, for example:

val someJob = launch(start = CoroutineStart.LAZY) {
    // some code //
}
// *** //
someButton.setOnClickListener {
    someJob.start()
}
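If you want to see the lazy start in action without an Android button at hand, here is a minimal sketch of mine (lazyStartDemo is a made-up name) that records what happens before and after start() is called:

```kotlin
import kotlinx.coroutines.*

fun lazyStartDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val job = launch(start = CoroutineStart.LAZY) { events += "body ran" }

    delay(50) // plenty of time, yet the body has not run
    events += "before start: isActive=${job.isActive}"

    job.start() // in the snippet above this call sits in the click listener
    job.join()
    events
}

fun main() {
    lazyStartDemo().forEach(::println)
    // before start: isActive=false
    // body ran
}
```

Calling join() on a lazy job starts it as well, so the explicit start() is there only to mirror the click listener.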

More on Jobs

What options do we have after execution has started? How can we cancel jobs? How can we make sure a particular launch has finished before starting a new one? When do jobs complete? Fortunately these things were made pretty straightforward by the engineers at JetBrains. As for cancellation, you simply have to call the cancel() function on the job. Cancelling a scope’s job is as simple as the following:

class SomeViewModel : ViewModel(), CoroutineScope {
    private val supervisorJob = SupervisorJob()
    override val coroutineContext: CoroutineContext
        get() = Dispatchers.IO + supervisorJob

    override fun onCleared() {
        supervisorJob.cancel()
        super.onCleared()
    }
}

To cancel just the coroutines inside a scope or children jobs of a parent job without cancelling the parent job itself you could simply call the cancelChildren() function on the parent job.

Cancelling a coroutine immediately moves it from the active to the cancelling state (isActive becomes false and isCancelled becomes true), but it is considered complete only when all code inside the coroutine has finished executing and all of its child coroutines have completed as well. Keep in mind that cancellation is cooperative: the coroutine’s code only notices it at a suspension point or by checking isActive. Checking the state of a job in code is as easy as reading the job instance’s isActive, isCancelled and isCompleted boolean properties.
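The three flags are easiest to understand when you watch them change. The following sketch is my own (describe and cancellationDemo are made-up names); it relies on runBlocking running everything on a single thread, so the cancelled coroutine cannot reach its finally block before the main coroutine suspends in join():

```kotlin
import kotlinx.coroutines.*

fun Job.describe() = "isActive=$isActive isCancelled=$isCancelled isCompleted=$isCompleted"

fun cancellationDemo(): List<String> = runBlocking {
    val states = mutableListOf<String>()
    val job = launch {
        try {
            delay(10_000)
        } finally {
            states += "cleanup ran"
        }
    }
    yield()                  // let the job start and reach delay()
    states += job.describe() // active
    job.cancel()
    states += job.describe() // cancelling: the finally block has not run yet
    job.join()
    states += job.describe() // cancelled and completed
    states
}

fun main() {
    cancellationDemo().forEach(::println)
    // isActive=true isCancelled=false isCompleted=false
    // isActive=false isCancelled=true isCompleted=false
    // cleanup ran
    // isActive=false isCancelled=true isCompleted=true
}
```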

Last thing: if you have to make sure a launch has completed before going forward with the execution of the coroutine, use the join() function. This is the sibling of the Deferred type’s await() function, by the way.

launch {
    val thisJobIsDelayed = launch {
        delay(2000)
        println("Thanks for waiting, I know it took 2 seconds")
    }

    thisJobIsDelayed.join()
    println("Finally we can move on..")
}
// output:
// Thanks for waiting, I know it took 2 seconds
// Finally we can move on..

The same code with cancelAndJoin() cancels the job, waits for it to complete and then continues execution with the rest of the coroutine. And if you group several jobs together by putting them in a list, you can use the joinAll() function to wait for all of them to complete before moving on.
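Both functions fit into one short sketch. The delays and the joinDemo name are arbitrary choices of mine, just to make the order of events visible:

```kotlin
import kotlinx.coroutines.*

fun joinDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()

    // Three jobs collected in a list, finishing in the order of their delays.
    val jobs = listOf(300L, 100L, 200L).map { millis ->
        launch {
            delay(millis)
            log += "job with ${millis}ms delay finished"
        }
    }
    jobs.joinAll()
    log += "all three jobs are done"

    // A job that would never finish on its own.
    val endless = launch {
        while (true) delay(50)
    }
    endless.cancelAndJoin()
    log += "endless job cancelled: ${endless.isCancelled}"
    log
}

fun main() {
    joinDemo().forEach(::println)
    // job with 100ms delay finished
    // job with 200ms delay finished
    // job with 300ms delay finished
    // all three jobs are done
    // endless job cancelled: true
}
```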

Suspending functions

The terms suspending function and suspension have been brought up a few times in the post, but I haven’t really gone into how these functions work. If we look under the hood, they are just like regular functions with one addition: the compiler gives each of them an extra parameter of type Continuation. This interface holds the context, which is the CoroutineContext of course, and a resumeWith function through which execution is resumed with either a value or an exception.

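Suspending functions can be called only from coroutines or from other suspending functions, and at some point in the function there should be a suspension point where the function can suspend (in the example below it is the await() call; the snippet assumes it lives in a class that is a CoroutineScope, which is where async comes from).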

suspend fun someSuspendingFunction() {
    val someComputation1 = async { /* doing some work */ }
    // some more work maybe //
    return someComputation1.await()
}

With that said, don’t be surprised if you declare a function with the suspend modifier that does not suspend at any point. It will be treated as a regular function that happens to start a coroutine; the suspend keyword is redundant and can be deleted, because launch itself is not a suspending function:

suspend fun someSuspendingFunction() = launch {  // no use of suspend
    // doing some non suspending work
}

Other coroutine goodies

At this point the essential basics of coroutines have been covered, but there is some other stuff worth mentioning that can be used to further tweak coroutines.

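coroutineScope(), supervisorScope()

coroutineScope and supervisorScope are suspending functions that run a block in a sub-scope of their own, isolated from the outer scope. Both inherit the outer scope’s context, and the isolation is due to the fact that they both override the context’s job with a new one: coroutineScope with a plain Job, supervisorScope with a SupervisorJob. Either way the function returns only when the block and all the coroutines started in it have completed.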
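The practical difference between the two shows up when a child fails. The sketch below is only an illustration of mine (both function names are made up): with coroutineScope the failure cancels the sibling and is rethrown to the caller, while with supervisorScope the sibling carries on and the failure stays inside the failed Deferred until you await it.

```kotlin
import kotlinx.coroutines.*

// A failing child cancels its sibling and the scope rethrows the exception.
suspend fun withCoroutineScope(): String = try {
    coroutineScope {
        launch { throw IllegalStateException("child failed") }
        launch { delay(100) }
    }
    "completed"
} catch (e: IllegalStateException) {
    "coroutineScope rethrew: ${e.message}"
}

// A failing child leaves its sibling alone.
suspend fun withSupervisorScope(): String = supervisorScope {
    val failing = async<String> { throw IllegalStateException("child failed") }
    val healthy = async {
        delay(100)
        "sibling survived"
    }
    val first = try {
        failing.await()
    } catch (e: IllegalStateException) {
        "caught: ${e.message}"
    }
    "$first, ${healthy.await()}"
}

fun main(): Unit = runBlocking {
    println(withCoroutineScope())  // coroutineScope rethrew: child failed
    println(withSupervisorScope()) // caught: child failed, sibling survived
}
```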

withContext()

withContext() is a suspending function that runs its block in the current context combined with the context passed as an argument, suspends the caller until the block has completed, then returns the block’s result.

Combining it with coroutineScope:

suspend fun updateTextView(): Boolean = coroutineScope {
    val resultAsync = async(Dispatchers.IO) { "result of async" }

    withContext(Dispatchers.Main) {
        someTextView.text = resultAsync.await()
        return@withContext true
    }
}

suspendCoroutine()

This function suspends the calling coroutine and hands its continuation to the block; the coroutine continues only when the continuation’s resume function (or resumeWithException) is called. (Actually I found suspendCancellableCoroutine to be better because it provides an invokeOnCancellation method that runs when the outer coroutine is cancelled.)

suspend fun getSomeList(): List<Int> = suspendCancellableCoroutine { continuation ->
    val result = ArrayList<Int>().apply {
        repeat(5) { number ->
            add(number * 2)
        }
    }

    continuation.invokeOnCancellation { println("getSomeList did not return due to cancellation") }
    continuation.resume(result)
}
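The typical use case for this function is turning a callback-based API into a suspending one. Here is a sketch of that idea; fetchAsync is a hypothetical callback API I made up as a stand-in for something like a network client, and fetch is the suspending wrapper around it.

```kotlin
import kotlin.concurrent.thread
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.*

// Hypothetical callback-based API that answers on a background thread.
fun fetchAsync(id: Int, onSuccess: (String) -> Unit, onError: (Throwable) -> Unit) {
    thread {
        Thread.sleep(50)
        if (id >= 0) onSuccess("item #$id") else onError(IllegalArgumentException("negative id"))
    }
}

// Suspending wrapper: the continuation is resumed from inside the callbacks.
suspend fun fetch(id: Int): String = suspendCancellableCoroutine { continuation ->
    continuation.invokeOnCancellation { println("fetch($id) was cancelled") }
    fetchAsync(
        id,
        onSuccess = { continuation.resume(it) },
        onError = { continuation.resumeWithException(it) }
    )
}

fun main(): Unit = runBlocking {
    println(fetch(7)) // item #7
    println(runCatching { fetch(-1) }.exceptionOrNull()?.message) // negative id
}
```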

withTimeout()

You can limit the computation time of a suspending block with either withTimeout() or withTimeoutOrNull(). Both are pretty self-explanatory. Their parameter is the time limit of execution in milliseconds. If execution has not finished within the given time frame, withTimeout() throws a TimeoutCancellationException, while withTimeoutOrNull() returns null.

This example function will happily return:

suspend fun someFunctionWithTimeout(): String = withTimeout(1000) {
    val result = async {
        delay(900)
        return@async "We did it!! :)"
    }
    return@withTimeout result.await()
}
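And for the less happy path, a small sketch of what running out of time looks like with both functions. slowAnswer is a hypothetical helper of mine, and the limits are shortened so the demo finishes quickly:

```kotlin
import kotlinx.coroutines.*

// Hypothetical task that needs `millis` milliseconds to come up with its answer.
suspend fun slowAnswer(millis: Long): String {
    delay(millis)
    return "We did it!! :)"
}

fun main(): Unit = runBlocking {
    println(withTimeoutOrNull(100) { slowAnswer(50) })  // We did it!! :)
    println(withTimeoutOrNull(100) { slowAnswer(200) }) // null

    try {
        withTimeout(100) { slowAnswer(200) }
    } catch (e: TimeoutCancellationException) {
        println("withTimeout gave up: ${e.message}")
    }
}
```

Since TimeoutCancellationException is a CancellationException, an uncaught timeout is treated as a normal cancellation of the coroutine rather than as a crash.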

Channels

(some parts are still experimental!)

Channels are conceptually similar to a pipeline. You can send elements of a given type into them and receive them at the other end in a first in, first out manner. They come with different buffer capacities: Rendezvous, Conflated and Unlimited, and you can define a custom capacity as well.

These capacities are Int constants of Channel: 0 for RENDEZVOUS (no buffer, send and receive have to meet), -1 for CONFLATED (only the most recent element is kept) and Int.MAX_VALUE for UNLIMITED. Besides these, you can specify a custom positive Int, which then becomes the buffer size of the channel.
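A quick way to see what each capacity means is to push a few elements into a channel that nobody is receiving from and check what the buffer kept. The sketch below is my own (bufferedElements is a made-up name) and uses trySend() and tryReceive(), the non-suspending send and receive functions of current kotlinx.coroutines versions, so that it doesn’t need a coroutine at all:

```kotlin
import kotlinx.coroutines.channels.Channel

// Offers 1..5 to a channel without a receiver and returns what the buffer kept.
fun bufferedElements(capacity: Int): List<Int> {
    val channel = Channel<Int>(capacity)
    for (i in 1..5) channel.trySend(i) // never suspends; fails if there is no room
    return generateSequence { channel.tryReceive().getOrNull() }.toList()
}

fun main() {
    println(bufferedElements(Channel.RENDEZVOUS)) // []
    println(bufferedElements(Channel.CONFLATED))  // [5]
    println(bufferedElements(Channel.UNLIMITED))  // [1, 2, 3, 4, 5]
    println(bufferedElements(3))                  // [1, 2, 3]
}
```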

The types of channels in coroutines are SendChannel and ReceiveChannel, and there is also BroadcastChannel. A SendChannel is the type of channel you can send elements to, while a ReceiveChannel is the one you receive elements from. BroadcastChannel is a special kind of SendChannel: you can subscribe to it and receive its elements in multiple places in your code (in recent versions of kotlinx.coroutines it is deprecated in favour of SharedFlow).

You might ask how these channels can be useful if each of them does just one part of the work on its own. To get around this, you could create a Channel, which implements both the SendChannel and ReceiveChannel interfaces, and that could be one solution. But coroutines also have special builders for the various types of channels. These builders have the capacity parameter and also the context parameter, just like other coroutine builders. The first one is produce: it returns a ReceiveChannel, while the receiver of its block is a ProducerScope, which implements the SendChannel interface:

val someProducer: ReceiveChannel<Int> = produce {
    repeat(3) {
        send(it)
    }
}

And then somewhere else in the code you can retrieve the elements from the variable:

launch {
    repeat(3) {
        println(someProducer.receive())
    }
}

actor

The other builder is actor. It is somewhat similar to produce, but the roles are reversed. actor returns a SendChannel, while the receiver of its block is an ActorScope, which implements the ReceiveChannel interface. So instead of sending the values in the block and extracting them elsewhere, you define in the block what kind of work you want to do with the elements that will be sent into the channel.

val someActor: SendChannel<String> = actor(Dispatchers.Main) {
    for (msg in channel) { // channel is the ActorScope's own variable
        someTextView.text = msg
    }
}

Then using the actor from somewhere else in the code:

launch {
    repeat(10) {
        delay(1000)
        someActor.send((it + 1).toString())
    }
}

And this will update the someTextView’s text each second with numbers from one to ten.

Again, this is not something you would probably use channels for, but imagine, for example, that you’ve defined a sealed class with different subclasses and made it the actor’s message type; you could then use a ‘when’ block to route the work to different parts of the code.

Actors are also a great way to deal with shared mutable state, aka variables that can be accessed from different parts of the code. The state lives inside the actor and is only ever touched by the actor’s single coroutine, which processes the messages coming through the channel sequentially, one after another.
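Both ideas, the sealed class with a ‘when’ block and the protected shared state, can be sketched with a simple counter. The message classes and function names below are made up for the illustration; the counter variable is the shared state, and a hundred coroutines on a thread pool hammer it through the channel without any locking.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.actor

// Hypothetical message types for a counter actor.
sealed class CounterMsg
object Increment : CounterMsg()
class GetCount(val reply: CompletableDeferred<Int>) : CounterMsg()

@OptIn(ObsoleteCoroutinesApi::class)
fun CoroutineScope.counterActor(): SendChannel<CounterMsg> = actor<CounterMsg> {
    var count = 0 // only ever touched by the actor's own coroutine
    for (msg in channel) {
        when (msg) {
            is Increment -> count++
            is GetCount -> msg.reply.complete(count)
        }
    }
}

fun countWithActor(): Int = runBlocking {
    val counter = counterActor()
    coroutineScope {
        repeat(100) { // 100 coroutines, each sending 100 increments
            launch(Dispatchers.Default) {
                repeat(100) { counter.send(Increment) }
            }
        }
    }
    val reply = CompletableDeferred<Int>()
    counter.send(GetCount(reply))
    val total = reply.await()
    counter.close() // without this the actor would keep runBlocking waiting
    total
}

fun main() {
    println(countWithActor()) // 10000
}
```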

Closing channels

In the case of produce, once its block has reached its end, the channel is closed for sending, so it takes care of itself automatically. But in the case of actor, you have to close the channel manually with the close() function once it’s not needed anymore.
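The automatic closing of produce is what makes a plain for loop over the channel terminate. A minimal sketch, with squares and collectSquares being names of my own:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce

@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.squares(count: Int): ReceiveChannel<Int> = produce {
    for (i in 1..count) send(i * i)
    // reaching the end of the block closes the channel
}

fun collectSquares(): List<Int> = runBlocking {
    val received = mutableListOf<Int>()
    for (value in squares(4)) received += value // the loop ends once the channel is closed
    received
}

fun main() {
    println(collectSquares()) // [1, 4, 9, 16]
}
```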

offer(), poll()

Also worth mentioning are the non-suspending ways of sending and receiving: the offer() and poll() functions. On a SendChannel you can use offer() to try sending an element, which succeeds only if the capacity of the channel allows sending more elements; it returns a Boolean to indicate that. On a ReceiveChannel, poll() retrieves an element if there is one or returns null if the channel is empty. Note that kotlinx.coroutines 1.5 deprecated both in favour of trySend() and tryReceive(), which report the outcome as a ChannelResult.

Select

(some parts are still experimental!)

select is also a suspending function, and it does great work when you have to pick whichever of several suspending operations becomes available first: it runs the block of the winning clause and returns its result, or fails if that clause fails for some reason.

In case of async tasks it selects whichever deferred is resolved first:

select<Unit> {
    someAsyncTask.onAwait { result ->
        println(result.toString())
    }
    someOtherAsyncTask.onAwait { result ->
        println(result.toString())
    }
}

In the case of ReceiveChannels it selects whichever channel has an element to receive first, then executes the block with the value:

select<Unit> {
    someReceiveChannel.onReceive { value ->
        println(value.toString())
    }
    someOtherReceiveChannel.onReceive { value ->
        println(value.toString())
    }
}

In case of SendChannels select will choose the channel where the given parameter can be sent first. The parameter’s type has to match with the type of the channels you send it in (so in this case both channels have to be of type String):

val param = "May the first available channel take me"

select<Unit> {
    someSendChannel.onSend(param) {
        println(param)
    }
    someOtherSendChannel.onSend(param) {
        println(param)
    }
}

For simplicity I used Unit as a return type in each case but select can return with other types as well, depending on what type of value will be returned from the blocks.

select<String> {
    someReceiveChannel.onReceive { value ->
        return@onReceive "${value.toString()} received in someReceiveChannel"
    }
    someOtherReceiveChannel.onReceive { value ->
        return@onReceive "${value.toString()} received in someOtherReceiveChannel"
    }
}
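To close the section, here is a self-contained sketch of select racing two async tasks. The names and delays are arbitrary choices of mine; the value of the winning clause’s block becomes the result of the whole select.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select

suspend fun fastest(): String = coroutineScope {
    val slow = async {
        delay(300)
        "slow"
    }
    val fast = async {
        delay(50)
        "fast"
    }
    val winner = select<String> {
        slow.onAwait { result -> "$result won" }
        fast.onAwait { result -> "$result won" }
    }
    slow.cancel() // the loser is not needed anymore
    winner
}

fun main(): Unit = runBlocking {
    println(fastest()) // fast won
}
```

Cancelling the loser matters here: coroutineScope waits for all of its children, so without it the function would still take the full 300 ms to return.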

Closing thoughts

Coroutines in real code with real world use cases can get a bit more complex, of course. But with some under-the-hood investigation of the various components and a couple of hours of playtime, you can get your head around each topic pretty easily, and from that point on it will all look much simpler and more understandable than it seems at first sight. Don’t be shy about checking out others’ solutions as well, you can always learn a lot from them, and since coroutines left their crib with the stable release in late October ’18, I think we can expect more and more solutions built with this technology to surface in the future. Also, don’t forget to give yourself some pats on the back for getting through this post, and if you feel like sharing a thought or asking something, please hit the comments section 🙂

Editor’s note: Wanari is a custom software development company, established in 2000. We are committed to technological excellence and building functional, robust and quality software. Check out our website to learn more about what we do.

He is an eager-to-learn junior developer!

Latest post by Csaba Szenczi

One Stop Shop for Kotlin Coroutines