Unit Test with Kotlin Flow

Florent Blot
Level Up Coding
Published in
6 min readApr 3, 2022

--

Photo by Michael Dziedzic on Unsplash

When we develop a feature, we need to write unit tests for each section of the functionality. It is a huge and necessary part of the development. This guarantees the correct functioning of the code over future changing / refactoring.

Kotlin Flow is no exception. In our Android app, we used Kotlin Flow and we had to test the functionality of these streams. While this topic summarizes its application, we honestly were exalted that writing unit tests of flows was straightforward.

A test case 🛠

Supposing we have a flow to fetch three strings with delays:

fun fetchSentences(): Flow<String> = flow { emit(“Hello Jade.”) delay(1000) emit(“It’s today.”) delay(1000) emit(“Happy Birthday!”) }

When collecting it, we can assume to use collect and catch operators:

fetchSentences() .catch { … } .collect { … }

So now, we need to test the correct functioning of this flow. By proper functioning, I mean testing these following cases:

  • We should collect three items.
  • Hello Jade.” should be the first item.
  • We should collect “Hello Jade.”, “It’s today.” and “Happy Birthday!”, precisely.
  • The flow should complete with no exception.
  • The flow should handle unexpected exceptions.

Of course, there are others cases but these ones are enough to demonstrate how to unit testing Kotlin Flow.

The tools 📚

We will provide examples with three dependencies which make tests effortless, concise and human readable:

  • In order to run our tests, we will use kotlinx-coroutines-test, specially for delays. runTest will allow us to run test, automatically skipping delays and handling uncaught exceptions.
  • The assertions will be done with Kluent. This is a “fluent assertions” library written specifically for Kotlin above the classic JUnit-Assertions. We will call shouldEqual, shouldNotBeNull and shouldBeNull which are pretty explicit by their names.
  • Mockk will be applied to mock classes and define behavior of functions. It will only be used here to mock exception on the flow collecting.

We will just see how to create unit tests for Kotlin Flow. We won’t look at mocking behavior or assertions details, to keep this article basic and provide a brief and clear explanation.

So let’s see how to unitarily test our flow fetchSentences().

Test the flow 🥊

According to the Google Android’s documentation:

If the subject under test is exposing a flow, the test needs to make assertions on the elements of the data stream.

To do so, we simply consume the Flow API with the already available operators such as drop, take, map… And we collect the flow to check received items with terminal operators like single, toList and so on. It makes the writing easy and intuitive because it’s just like the real execution.

Assuming fetchSentences() is under a class named SentenceService, the first test where it should return exactly three items is made with the count operator:

The next test to verify the first item is done with the first operator:

Checking the items ordering and equality is achieved thanks to a list comparaison with toList:

How to test the correct completion without any exception? By confirming the nullability of the cause into the onCompletion operator:

@Test fun `fetch the sentences with completion and succeed`() = runTest { sentenceService.fetchSentences() .onCompletion { cause -> cause.shouldBeNull() } .collect() }

Finally, the unexpected exceptions can be tested when we mock and modify the response of the flow. So we have to create a new flow and throw an exception into it. It is then checked with catch operator:

Simple, isn’t it?

As we first implemented these cases, we were surprised how easy is it to test with the Flow API. However, we were not really confortable to call “real" operators like onCompletion or catch directly inside our tests.

This is where Turbine comes into place.

Into the Turbine 👀

We decided to rewrite our tests and to play with Turbine — a small but powerful testing library for Kotlin Flow.

It allows us to write succinct tests with no need of operators like the previous chapter. We only have to call the flow’s extension test and look at the items or errors inside it. This light library lets us to deal nicely with flow reception, completion or cancellation. @Geev, we found this library better than the official way.

But using it with delays seemed painful. Since Turbine ignores the current dispatcher, delay() actually delays the flow, and the test‘s block actually runs in the test dispatcher which means timeout won’t work. (#42)

In order to run the tests properly, we can:

  • Use runBlocking instead of runTest. But, we will have to wait two real seconds to complete the full test! Imagine with higher delays…
  • Wrap the execution test in a specific dispatcher thanks to withContext.
  • Create a sibling extension of test to change the context of the flow with flowOn operator.

We used the last thanks to PaulWoitaschek:

suspend fun <T> Flow<T>.testWithScheduler(
timeoutMs: Long = 1000,
validate: suspend FlowTurbine<T>.() -> Unit
) {
val testScheduler = coroutineContext[TestCoroutineScheduler]
return if (testScheduler == null) {
test(timeoutMs, validate)
} else {
flowOn(UnconfinedTestDispatcher(testScheduler))
.test(timeoutMs, validate)

}
}

Start the Turbine 🦾

Actually, the tests with Turbine can be resumed in three methods.

In order to check the number of sentences received, we will catch the items in the stream with awaitItem and call awaitComplete at the end to verify that the flow is done after three items. This will also test the right completion of the stream, without any exception.

As you can see, we use runTest to trigger advanceTimeBy and control the test’s virtual clock, passing the flow delays. And we apply testWithScheduler in order to deal with it properly.

Next, instead of first operator, we wait for the first item with awaitItem and cancel the flow by ignoring the other events:

Finally, we can verify that the flow can deal with an unexpected exception when mocking the service, by using awaitError to compare the exception received as follows:

@Test fun `fetch the sentences with exception and succeed`() = runTest { val expected = RuntimeException(“an exception is raised”) // mock the service val sentenceService: SentenceService = mockk() every { sentenceService.fetchSentences() } returns flow { throw expected } // throw the exception sentenceService.fetchSentences().testWithScheduler { val error = awaitError() error.message shouldEqual expected.message } }

That’s it! No more tests are needed. We think Turbine really simplifies our test cases.

Unit testing with Kotlin Flow 🧞‍♂️

As we saw, writing flows unit tests is pretty straightforward. We consume the Flow API itself or depend of the lightweight library Turbine - both offer a simple and intuitive way to create unit tests. With few operators (first, toList) or few suspend functions (awaitItem, awaitComplete), we are able to create easily our tests.

If you found this post helpful, feel free to clap! 👏 Thanks for reading.

--

--