Kotlin Coroutine X Functional Programming

Andy Lu
24 min readOct 5, 2022
Image by Randy Rodriguez from Pixabay

What’s Coroutine?

Coroutine 在 Kotlin 1.3 版的時候推出,它是一種非同步任務的解決方案,如果用一句話形容,可以看作是一個輕量型的執行緒(Thread)。雖然 Coroutine 的背後也是在執行緒上執行,但是與直接使用執行緒不同的是,一個執行緒上能夠同時執行多個 Coroutine,啟動相同數量的非同步任務,使用 Coroutine 比起直接使用執行緒能夠減少內容交換(Context switch)次數,以利減少資源浪費。另外,Coroutine 將 Callback 隱藏在其設計中,使用它寫出來的非同步的程式碼將會像是同步程式碼一般的簡單。而由於 Kotlin 是多範式的一種程式語言,除了大多數人所熟知的物件導向程式設計(Object oriented programming — OOP),還有我們本篇文章要討論的函數式程式設計( Functional Programming — FP),而 Coroutine 也支援 FP 的寫法,關於 FP 的內容,我們將在後面的篇幅繼續討論,我們先繼續討論 Coroutine。

在 Coroutine 內有許多種執行非同步任務的方式,我們可以依照非同步任務的數量以及是否有回傳值來分為三類。

  1. 沒有回傳值的單一非同步任務: launch
  2. 有回傳值的單一非同步任務:async
  3. 多個非同步任務:channel,flow

launch

一個 launch 能夠啟動一個非同步任務,在 launch 區塊中,如果沒有額外提供 Context,那麼將會使用父 Coroutine 的 Context,底下是一個簡單的 launch 範例。

fun main() = runBlocking {
launch {
println("launch block")
}
print("Start: ")
}
// Start: launch block

launch 建立了一個 Coroutine 區塊,因為外層已有一個 runBlocking 所產生出來的 Coroutine 區塊,所以這個 launch 區塊將是屬於 runBlocking 的子 Coroutine, 而 Coroutine 執行的順序會由上至下、由外至內,所以上例的結果會先列印出 launch 區塊外層的 Start: 接著才是 launch 區塊內的 launch block 。這種依照結構的方式啟動非同步任務稱為「結構化併發(Structure concurrency)」,而在 Coroutine 有一個最方便的地方,那就是我們可以輕鬆的取消任何任務,而不需要考慮子 Coroutine 的任務是否完成,因為在 Coroutine 中,當我們取消父 Coroutine 時,所有的子 Coroutine 都會一併被取消。

async

async 與 launch 都是屬於單一非同步任務的類別,差別在於一個有回傳值,另一個沒有。在使用 async 建立的區塊中,會得到一個型別為 Deferred<T> 的回傳值,如此一來,我們就不用等到任務完成取得回傳值後才能繼續,只要在真正需要使用該回傳值的地方呼叫 await() 取值,若呼叫 await() 的時候任務已經完成,那麼就會回傳真正的結果,否則將會等待到任務完成時才會回傳值。

一個簡單的 async 範例如下,第一個 async 區塊使用 delay() 讓該 Coroutine 暫停 100 毫秒,並在 100 毫秒之後回傳整數 100。同樣地,第二個 async 區塊則是暫停 200 毫秒,並回傳整數 200。 並在底下分別在這兩個 async 所產生的回傳值上使用 await() 取得這兩個 async 區塊的回傳值。

fun main() = runBlocking {
val result1 = async(Dispatchers.Default) {
println("task 1, ${Thread.currentThread().name} ")
delay(100)
100
}
val result2 = async(Dispatchers.Default) {
println("task 2, ${Thread.currentThread().name}")
delay(200)
200
}
println("Result is ${result1.await() + result2.await()},
Thread: {Thread.currentThread().name}")
}
// task 1, DefaultDispatcher-worker-1
// task 2, DefaultDispatcher-worker-2
// Result is 300, Thread: main

channel

channel 如同它的名稱,它建立了一個提供給非同步任務使用的通道,通道的一端為發射端,呼叫 channel 物件的 send() 函式將非同步任務傳出去,而在通道的另一端使用 receive() 來取得該任務的回傳值。 channel 是屬於熱流(hot flow),意思是當我們呼叫 send 的時候,該任務就啟動,所以假如在通道中有一個任務需要較多的時間來處理,那麼通道將會阻塞。下方的範例中,使用通道將 task1()task2() 的任務傳出去,接收端使用 for loop 來接收取得的值。使用 channel 要小心,當傳完所有的值時,必須要呼叫 close() 把通道關閉,這樣在接收端才不會永遠無法處理完該通道的值。

fun main() = runBlocking {
val channel = Channel<Int>()
launch(Dispatchers.Default) {
channel.send(task1())
channel.send(task2())
channel.close()
}

var result = 0
for (i in channel) result += i
print("Result: $result,
Thread: ${Thread.currentThread().name}")
}
// task 1, DefaultDispatcher-worker-1
// task 2, DefaultDispatcher-worker-1
// Result: 300, Thread: main

flow

flow 類似 channel 都是負責處理多個非同步任務的方法之一。不過與 channel 不同的是,它是屬於冷流(cold flow),在 flow 內只有在最後終端流運算子(Terminal flow operators)的階段才會執行 flow 裡的任務,如 collect, reduce… 。另外,flow 是採用 FP 的寫法來編寫,所以能夠用多個運算子來組合成所需的動作。

下面為一個簡單的 flow 範例,使用 emit() 將任務發射出去,並在呼叫鍊的最後使用終端流運算子 reduce 來處理所有 flow 的值。

fun main() = runBlocking {
val result = flow() {
emit(task1())
emit(task2())
}
.flowOn(Dispatchers.Default)
.reduce { accumulator, value -> accumulator + value }
print("Result: $result,
Thread: ${Thread.currentThread().name}")
}
// task 1, DefaultDispatcher-worker-1
// task 2, DefaultDispatcher-worker-1
// Result: 300, Thread: main

What’s Functional Programming?

前面介紹了 Coroutine 內四種的執行非同步任務的方法,launch、async、channel 以及 flow 。這四種方法中, 只有 flow 是採用 FP 的方式,其餘都是使用循序程式設計的方式。

那麼 FP 與平常我們開發的方式有什麼不同呢?

維基百科提到 FP 是一種宣告式程式設計,將值映射到其他值的表達式樹。與命令式程式設計不同,不是更新程式執行的狀態。

Functional programming is a programming paradigm where programs are constructed by applying and composing functions.

It is a declarative programming paradigm in which function definitions are trees of expressions that map values to other values, rather than a sequence of imperative statements which update the running state of the program. — Wikipedia

聽起來有點霧煞煞,是吧。

關於宣告式程式設計與命令式程式設計的差異,可以用一段話來分辨。命令式程式設計是 How you do something,而宣告式程式設計則是 What you do something.

宣告式程式設計使用已經設計好的高階函式,依照我們的需求組合而成,一個步驟用多個高階函式組合而成,由於每個函式的結果的值都可預測,所以將所有函式組合起來其結果也是可預測的,表示相同的函式能用更高的抽象來實作。

純函式

在 FP 中,有一個很重要的觀念稱作「純函式(pure function)」,前面所說的高階函式都是屬於純函式。

什麼是純函式呢?有兩個要素。

  1. 相同的輸入永遠得到相同的值。
  2. 沒有副作用(Side effect)。

第一點應該很好理解,我們無論輸入什麼值,都能夠得到相同的值,無論呼叫多少次,其結果都不會變。

那麼副作用是什麼呢?有時候我們在寫函式的時候,可能會在該函式內去修改其他變數的值,例如,當下載檔案的時候,可能在呼叫這個函式的裡面,同時更新狀態為”下載中”,但是由於這個狀態是存在函式外面,所以就必須要呼叫該函式或是直接設值。這就是副作用。

純函式可以用一個一元二次方程式來比喻,下方這個函式無論我們輸入什麼值,它都能對應到一個唯一的結果,而且不會因為呼叫這個函式而讓某個值改變其內容,這就是純函式。

將上面這個一元二次方程式畫成 x -> f(x) 映射圖,可以發現左側的輸入值對皆對應到右邊的一個值,也就是說這個一元二次方程式是一個純函式。

想想看,若純函式的輸入與輸出值是一對一的映射,那麼多對一是純函式嗎?答案是「沒錯」。

只要相同輸入能夠取得相同的輸出,且沒有副作用,那麼就可以稱這個函式為「純函式」;反之則不是,也就是說,一個函式若其輸入與輸出的映射關係是一對多,就不是純函式。

高階函式

高階函式可以分為三類:過濾(Filter)、轉換(Transform)以及合併(Combine)。

  1. 過濾(Filter):輸入與輸出的型別是相同的,且都為 List。如:filter()
  2. 轉換(Transform):輸入與輸出可能是不同型別,不過經過轉換函式後,還是 List。如:map()
  3. 合併(Combine):將輸入的值經由條件合併成一個值。如:reduce()。

細節可參考我之前的文章

Why Coroutine x FP?

Kotlin 內的高階函式大多都是「純函式」,它讓程式碼能用更高階的抽象來實作,處理需求時,程式的焦點由如何作(How to do something?)改為怎麼作(What to do something?)每個高階函式因為負責的功能只有單一功能,所以我們更能夠在不同的地方複用相同的函式,如此就能減少在不同類別開發類似功能的情況。

除了純函式, FP 還有一個值得討論的部分 — 錯誤處理。一般情況之下,當我們呼叫一個函式,且正常執行,如果該函式有回傳值,使用 return 將結果傳出;假如沒有回傳值,當函式由上至下執行完成後,就會離開函式。萬一執行的過程發生異常(Exception)呢?為了避免因異常導致系統非預期的運作,需要在呼叫端使用 try-catch 將異常捕捉起來,在 catch 區塊針對異常處理,當異常發生時,因正常的流程不會進到 catch 區塊裡,所以執行的流程與正常的不太相同,有可能會造成預期以外的情況,所以在使用 try-catch 處理異常需要特別小心。

第二種處理錯誤的方式是使用一個不存在的回傳值,譬如當回傳值的型別是整數時,可能會用 -1 來代表錯誤。雖然不需使用 try-catch,函式內的執行流程也會與原本的相同,但這種方式有個問題,這個現在沒有在使用的值,若在未來被使用,在接收端就有可能將正確的值誤認為錯誤的值。

面對這兩種情況,可以使用一個包裝型別(Wrapper Type) 解決,接下來將介紹兩種常用的包裝型別。

Option

第一個包裝型別是 Option,在 Option 型別內,包含了兩個子型別 — Some 以及 None。若呼叫函式並正確取得結果時,這時候可以將結果塞進 Some 裡面;反之,若沒有取得任何結果,就回傳 None。

sealed class Option<out A> {
abstract fun isEmpty(): Boolean
internal object None : Option<Nothing>() {
override fun isEmpty(): Boolean = true
override fun toString(): String = "Option.None"
}
internal data class Some<out A>(internal val value: A) :
Option<A>() {
override fun isEmpty(): Boolean = false
override fun toString(): String = "Option.Some($value)"
}
}

在 Kotlin 中,使用 sealed class 實作 Option 類,在 Option 內部定義兩個 internal 的物件,一個是 internal object None,另一個則是 internal data class Some<out A> 。使用 sealed class 最大的好處就是,使用 when 時,IDE 會提醒你列出所有的可能性。而使用 internal 是讓相同模組都能夠看到此物件。

下方 getUser 函式,由於使用者的 Id 皆是大於零,所以當呼叫端呼叫這個函式時,如果輸入小於零的值,該函式將會直接拋出 IllegalArgumentException ,讓呼叫端知道無法正確的取得使用者,如前文所述,這時就需要在呼叫端使用一個 try-catch 來處理。

我們將 getUser 函式透過 fetchUser 來呼叫,因為已知 Id 的值不可小於零,所以將 id >= 0 的情況給予正常的值,也就是 Option.Some(getUser(id);若 id < 0 則回傳 Option.None

suspend fun fetchUser(id: Int): Option<String> =
if (id >= 0) Option.Some(getUser(id)) else Option.None
private suspend fun getUser(id: Int): String {
delay(100)
if (id < 0) {
throw IllegalArgumentException("Id incorrect")
}
return "User: $id"
}

對於呼叫端來說,它得到的是 Option 的內部物件 Some 或是 None,雖然已經將異常消除,但是呼叫端最需要的結果,目前卻還沒有辦法得到,因為縱使是正確的流程,其結果還是存放在 Option.Some 的內部。

對於這種情況,需要在 Option 內部加上 getOrElse 函式,當傳回的值是 None 時,就回傳給予的預設值,否則直接回傳存放在 Some 內部的 value。

fun getOrElse(default: @UnsafeVariance A): A = when (this) {
is None -> default
is Some -> value
}
val user1 = fetchUser(-1).getOrElse("Not found")
val user2 = fetchUser(0).getOrElse("None")
println(user1) //Not found
println(user2) //User: 0

使用 Option 能避免異常的發生,也能夠讓異常與正常的流程的呼叫流程都相同,如此就能減少錯誤發生。

Either

把結果用 Option 包起來,可以讓程式碼的語意更清晰,呼叫的流程更明確,不過有一個問題,假設呼叫端需要知道異常的內容,才能夠加以處理,那我們該如何作呢?這時候,Either 類就能夠解決這個問題。

與 Option 類似,一樣都有兩個內部類別,差別在於 Either 的兩個內部類別都包含了一個型別,而這正是與 Option 不同的地方。

public sealed class Either<out A, out B> {    public data class Left<out A> constructor(val value: A) :
Either<A, Nothing>() {
override fun toString(): String = "Either.Left($value)"
}
public data class Right<out B> constructor(val value: B) :
Either<Nothing, B>() {
override fun toString(): String = "Either.Right($value)"
}
}

從 Either 的內部構造來看,不難發現 Left 與 Right 的內容大同小異,其目的就是用來儲存兩個不同的類別。在英文中, Right 除了有右邊的意思,還有正確的意思,所以在 Either 類就是以這個雙重的意思來儲存數值,當結果正確的時候,就將數值存放在 Right 中,反之就存在 Left 內。

將前面的範例改用 Either :

suspend fun getUser(id: Int): Either<IllegalArgumentException, String> {
delay(100)
if (id < 0) {
return Either.Left(IllegalArgumentException("Id incorrect"))
}
return Either.Right("User: $id")
}

可以發現在 id < 0 的情況之下,改用 Either 就能夠將異常的類別一並存起來,呼叫端就能夠在不使用 try-catch 的情況之下處理這個異常了。

val user0 = getUser(0)
val user1 = getUser(-1)
println(user0) // Either.Right(User: 0)
println(user1) // Either.Left(java.lang.IllegalArgumentException: Id incorrect)

如同 Option 的情況,目前我們取得的值還是被 Either 類包起來,我們可以在 Either 實作類似 Option 的 getOrElse,如此就能夠當異常發生時使用預設值。

fun getOrElse(default: () -> @UnsafeVariance B): B = when (this) {
is Left -> default()
is Right -> value
}
val user0 = getUser(0).getOrElse { "none" } // User: 0
val user1 = getUser(-1).getOrElse { "none" } // none

Introduce 𝝠rrow library

使用 Option 以及 Either 在異常處理上雖然有著很顯著的好處,但是如果這兩個類都要自己寫,不免太費力,這時候我們就可以使用 𝝠rrow 函式庫,它針對 Kotlin 的標準函式庫提供了許多類別、函式使用,包含 Option 以及 Either 都包含在內。

https://arrow-kt.io/

進入𝝠rrow 官網後,正中間有分四個類別,在前一節介紹的 Option 以及 Either 也包含在 Core 裡。

將 𝝠rrow core 加入至專案中,就不必那麼辛苦寫 Option 以及 Either 了XD

除了 Core,還有一個 Fx 類別,而這個類別包含提供給 Kotlin Coroutine 使用的 Functional Effects Framework,這個函式庫提供了一些工具讓我們在平行處理上有更好的使用。

接著將介紹兩個 FX 內方便的函式、類別。

guaranteeCase

有時候我們需要執行多個非同步任務,如下方的 action1() 以及 action2() 分別執行一個時間不同的任務。

suspend fun action1() {
println("action 1: start")
delay(100)
println("action 1: done")
}
suspend fun action2() {
println("action 2: start")
delay(200)
println("action 2: done")
}
launch {
action1()
action2()
}
// action 1: start
// action 1: done
// action 2: start
// action 2: done

假如 action1() 出現異常,那 action2() 就不會執行。

launch {
action1WithException()
action2()
}
// Start launch block
// action 1: start
// Exception in thread "main" java.lang.RuntimeException: Something wrong

如果我們希望無論如何都要執行 action2(),guaranteeCase 就能夠派上用場了,將 action1() 放在第一個變數,action2() 放在第二個變數,如此一來,就算第一個任務發生異常,也能夠執行 action2()。

launch {
guaranteeCase(
{ action1WithException() })
{ action2() }
}
// action 1: start
// action 2: start
// action 2: done
// Exception in thread "main" java.lang.RuntimeException: Something wrong

攤開 guaranteeCase 的程式碼一看,其實它也是使用 try-catch 來實作的,不過用 guaranteeCase 的好處就是我們不需要自行處理這個 try-catch,另外,無論是異常發生或是任務被取消,ExitCase 都能夠得知,我們也就可以根據不同的情況來執行後續的任務。

public suspend inline fun <A> guaranteeCase(
fa: suspend () -> A,
crossinline finalizer: suspend (ExitCase) -> Unit
): A {
val res = try {
fa()
} catch (e: CancellationException) {
runReleaseAndRethrow(e) { finalizer(ExitCase.Cancelled(e)) }
} catch (t: Throwable) {
runReleaseAndRethrow(t.nonFatalOrThrow()) { finalizer(ExitCase.Failure(t.nonFatalOrThrow())) }
}
withContext(NonCancellable) { finalizer(ExitCase.Completed) }
return res
}
guaranteeCase({
action1WithException()
}) {
when(it){
ExitCase.Completed -> TODO()
is ExitCase.Cancelled -> TODO()
is ExitCase.Failure -> TODO()
}
}

Atomic

多執行緒的使用情境下,如果需要在不同的執行緒上讀寫同一個變數,這時有可能因為數值還沒寫進記憶體,執行緒就已經切換,導致最後的結果與我們預期的不同。

由於 Coroutine 背後也是執行在執行緒上,雖然單一執行緒可以執行多個 Coroutine,但是當啟動的 Coroutine 太多時,還是必須要在多個執行緒上切換。如下面範例,有一個整數 count 存放在 runBlocking區塊中,並且啟動了 20000 個 Coroutine 來作相加,理想狀態之下,這個結果應該會是 20000,不過最後的結果卻是 17704,與預期不符:

fun main() = runBlocking {
var count = 0
repeat(20000) {
launch(Dispatchers.Default) {
count = count.inc()
}
}
delay(10)
println(count)
}
// 17704

如果將 count 改成使用 Atomic ,修改後的範例如下,先使用 Atomic(0) 給予初始值,並在每一個 Coroutine 內使用 Atomic 的 update 更新內容,執行完成後的結果就如我們預期的 20000 :

fun main() = runBlocking {
val atomicCount = Atomic(0)

repeat(20000) {
launch(Dispatchers.Default) {
atomicCount.update(Int::inc)
}
}
delay(10)
println(atomicCount.get())
}
// 20000

使用 Atomic 將這個變數變成原子的變數,它會確保完整寫入記憶體之後才會切換執行緒,所以不會遇到數值還沒有更新至記憶體中,執行緒的內容就被切換,導致計算的結果與實際不符。

Conclusion

Functional Programming 讓開發者用更安全的方式開發,純函式讓程式的結果變得可預期,而 Kotlin 標準函式庫內的高階函式讓開發提升了一個層次,我們能夠用更高的抽象來實作,減少了低階的實作,我們從 How you to do something? 變成 What you to do something?

在 Kotlin Coroutine 中,flow 的用法就是使用 FP,相比同樣都是用來處理多個非同步任務的工具 — channel ,它的寫法是不是又更簡便、易懂呢?

而 FP 除了純函式之外,它所帶來的幾個型別(Option, Either… )也能夠妥善的處理異常及錯誤的情況,將結果使用 Option 包起來,程式除了能夠正確的處理正常的情況,不正常的情況也能夠使用 None 來表示,這種情況適用於當我們不介意為什麼拿不到值的情況。若我們需要知道取不到值的原因,或許可以改用 Either 將結果包起來,在 Either 的內部有包含了 Left 以及 Right ,在這兩個物件中,都能存放一個數值,Right 存放的是正確的值,而 Left 存放的則是不正確的值。我們便可以依照需求判斷 Left 裡面的值來作後續處理。

無論是 Option 或是 Either,都可以在 Arrow core 函式庫裡面找到,因為這兩個型別是FP 領域中經常使用的型別之二。

而 Arrow 函式庫的分類,除了 core 類別以外,還有 FX 類別。

FX 是提供給 Kotlin Coroutine 的 Functional Effects Framework ,在裡面包含了一些在進行平行處理時可能會需要用到的項目,在本篇文章介紹了 guaranteeCase 以及 Atomic ,在 FX 裡面不只這兩的東西,其他的部分之後有機會在討論。

你的鼓勵是我繼續寫作的動力

--

--

Andy Lu

Android/Flutter developer, Kotlin Expert, like to learn and share.