KMM(Kotlin Multiplatform Mobile)入门(五)处理多线程

为什么需要在 KMM 中处理多线程?

我们使用 KMM,通常是处理和 UI 无关的业务逻辑,所以多数情况是网络请求、数据缓存、文件读写等操作,为了不影响 UI 绘制,这些操作往往都会在异步线程中执行,而 KMM 模块的线程切换,调用方肯定是不应该去管理的,所以需要探索一种在 KMM 中比较靠谱的多线程处理方式

可行的多线程切换方式

  1. 协程(kotlinx-coroutines

    Kotlin 协程不依赖于 JVM 实现,可以应用在 Kotlin Native 项目当中,不需要再实现平台差异化代码,且协程开销与线程相比较小,可以满足异步任务的需求

    但协程也有一些问题,比如,执行顺序不好控制,如果需要异步且串行地执行一系列任务,使用协程并不有效地、方便地控制执行顺序

    另外,协程对 Kotlin Native 支持的并不像 JVM 上那么完美(尤其是多线程的实现),有待进一步完善

  2. expect + Block

    将需要异步执行的任务包成一个 Kotlin 闭包,实际将其扔进工作线程的方法,由 expect + actual 组合的形式实现,Android 端可以利用线程池,iOS 端可以使用 GCD

    这种方法归根结底还是使用了现有比较成熟的多线程方案,执行顺序比较容易控制,但需要一定的基础能力建设,需要编写一些平台差异化代码

  3. 第三方库

    KMM 官方推荐 CoroutineWorkerReaktive

    CoroutineWorker 是对 Kotlin 协程的封装,迭代比较少,不算比较稳定的方案

    Reaktive 采用 RxJava 的实现思想,Native 底层采用 Kotlin Native 实现,目前看功能还算比较方便,但框架相对较重

KMM 多线程需要注意的问题

针对 Android,KMM 多线程并没有太大的问题,其主要问题是针对 Native(包括 macOS、iOS 等)

可变性

Kotlin Native 仅允许不可变状态的变量在多线程之间共享,但它所要求的不可变,并不是简简单单地使用 val 修饰,而是必须为 frozen 状态!

所以 Kotlin 的设计是,Native 代码中进行跨线程对象传递,必须调用 freeze() 方法,使其变成 frozen 状态,否则就会抛出异常,在 KMM 的表现就是,iOS App Crash

同时,会在控制台中打出以下异常 Log

1
Uncaught Kotlin exception: kotlin.native.IncorrectDereferenceException: illegal attempt to access non-shared [某个对象] from other thread

此时只需要 iosMain 的代码中,将需要多线程共享的变量最外层调用 freeze() 方法即可将某个对象置为 frozen 状态

注意:freeze() 方法只能在 iosMain 中调用,即 Kotlin Native 模块,所有涉及线程切换的变量,都需要调用这个方法

一般将需要在异步线程中执行的代码放在一个 Block 当中,对这个 Block 变量进行 freeze 即可,可参考如下代码:

1
2
3
4
5
6
// Common 代码
fun testMethod() {
doInWorkThread {
// ...
}
}
1
2
3
4
5
// iosMain 代码
actual fun doInWorkThread(task: () -> Unit) {
task.freeze()
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT.toLong(), 0), task)
}

单例及全局变量的使用问题

  • 单例

    在 Kotlin 语法中,可以使用 object 来创建一个单例,在 KMM 中,单例默认都是 frozen 状态的,所以单例及内部的变量,都是不可变的(仅允许初始化时赋值)所以如果需要在单例内部使用 var 变量,需要为单例添加 @ThreadLocal 注解,使其可以在多线程中共享(多线程访问时,会对其进行 copy),且变为可变状态

    如果使用 companion object (伴生对象),以上规则同样适用

    1
    2
    3
    4
    5
    object MySingleton {
    var state: Int = 0
    }

    MySingleton.state = 10 // 此段代码会抛出异常
    1
    2
    3
    4
    5
    6
    @ThreadLocal
    object MySingleton {
    var state: Int = 0
    }

    MySingleton.state = 10 // 正常运行
  • 全局变量

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    // Test.kt
    package com.coderyuan.test

    // 仅主线程可见,多线程时候时报错
    val myField = "test"

    // 多线程可共享
    @SharedImmutable
    val myField = "test"

    // 多线程环境可赋值
    @ThreadLocal
    var mutableField = "xxx"

    以上代码中,myField 字段只在主线程可见,如需多线程共享,需要添加 @SharedImmutable 注解,但仍是不可变属性,如需在多线程中对其赋值,仍需要添加 @ThreadLocal 注解

    1
    2
    @ThreadLocal
    var mutableField = "xxx"

    多线程变量共享

上述方案中,使用 @ThreadLocal 的原理和 Java 中的 ThreadLocal 类似,都是对某个对象的复制,从而实现在多线程中的赋值,虽然可以避免在 iOS 的多线程环境中出现 Crash,但也造成无法真正实现共享的问题,如:一些全局的状态、注入的方法实现等,此时就需要用到原子性(Atomics)相关的工具类,比较推荐的是由 Touchlab 开发的 Stately

Stately 专注解决 KMM 中的多线程问题,分为:stately-common、stately-concurrency、stately-isolate、stately-iso-collections、stately-collections(已弃用,推荐使用 stately-iso-collections)几个子组件

其中 stately-common 中提供了 freeze、ensureNeverFrozen 等方法来方便开发者处理对象的 freeze 状态

而 stately-concurrency 封装了一系列常用的 Atomic 类,如 AtomicBoolean、AtomicInt、AtomicReference 等,以及对 ThreadLocal、Lock 的封装和多平台实现

stately-iso-collections 则利用上述部分组件实现了一些线程安全的集合组件,如:IsoMutableMap 就类似于 Java 中的 ConcurrentHashMap

使用 Stately 能够满足日常业务开发中常用的多线程处理,非常方便,如果需要实现多线程之间的数据共享,可以利用 AtomicReference

比如,我需要一个用来存储全局状态的单例,可以按以下的方式定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// MySingleton.kt

object MySingleton {
// 计数器
val globalCount = AtomicInt(0)
// 开关
val globalSwitch = AtomicBoolean(false)
// 全局数据对象
val globalUserInfo = AtomicReference(UserInfo())
}

data class UserInfo(
var name: String? = null,
var number: String? = null
)

那么在多线程环境中,如果需要对以上全局对象进行修改或取值,就可以用如下方式实现:

1
2
3
4
5
6
7
8
9
10
11
fun demo() {
// 修改
MySingleton.globalCount.set(10)
MySingleton.globalSwitch.value = false
MySingleton.globalUserInfo.set(UserInfo("xxx", "123"))

// 取值
val currentCount = MySingleton.globalCount.value
val currentSwitch = MySingleton.globalSwitch.value
val currentUser = MySingleton.globalUserInfo.value
}

以上 demo 方法在多线程环境中,不会出现异常,且可以在多线程环境中同步每次修改的结果

个人比较推荐的异步工作实现方案

由于常用的操作对 Rx 或 协程的依赖并不那么强,所以个人认为应当做的比较轻量才是,所以更推荐上述的第二种自行实现的方案,可以根据自己的需要定制异步任务管理器

首先在 Common 中设计好需要的 Top-Level 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// BackgroundTaskUtils.kt (Common)

package com.coderyuan.kmm.async

/**
* 异步工作任务(支持delay)
*
* @param task Kotlin闭包
* @param delayedSec 延迟时间,默认0s
*/
expect fun doInBgThread(task: (() -> Unit), delayedSec: Int = 0)

/**
* 异步工作任务
*
* @param task Kotlin闭包
*/
expect fun doInBgThread(task: (() -> Unit))

/**
* 主线程任务(支持delay)
*
* @param task Kotlin闭包
* @param delayedSec 延迟时间,默认0s
*/
expect fun doInMainThread(task: (() -> Unit), delayedSec: Int = 0)

/**
* 主线程任务
*
* @param task Kotlin闭包
*/
expect fun doInMainThread(task: (() -> Unit))

在 Android 中,利用 SingleThreadExecutor 实现异步串行执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// BackgroundTaskUtils.kt (Android)

package com.coderyuan.kmm.async

import android.os.Handler
import android.os.Looper
import java.util.concurrent.Executors

/**
* 默认串行执行队列
*/
private val backgroundSerialExecutor by lazy { Executors.newSingleThreadExecutor() }
/**
* 主线程Handler
*/
private val mainHandler by lazy { Handler(Looper.getMainLooper()) }

actual fun doInBgThread(task: (() -> Unit), delayedSec: Int) {
doInMainThread({
backgroundSerialExecutor.submit(task)
}, delayedSec)
}

actual fun doInBgThread(task: (() -> Unit)) {
backgroundSerialExecutor.submit(task)
}

actual fun doInMainThread(task: (() -> Unit), delayedSec: Int) {
mainHandler.postDelayed(task, delayedSec * 1000L)
}

actual fun doInMainThread(task: (() -> Unit)) {
doInMainThread(task, 0)
}

在 iOS 中,利用 GCD 实现异步执行,要注意对跨线程的 task 变量,都进行 freeze()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// BackgroundTaskUtils.kt (iOS)

package com.coderyuan.kmm.async

import platform.darwin.*
import kotlin.native.concurrent.freeze

actual fun doInBgThread(task: (() -> Unit), delayedSec: Int) {
task.freeze()
val delayTime = dispatch_time(DISPATCH_TIME_NOW, (NSEC_PER_SEC * delayedSec.toUInt()).toLong())
dispatch_after(delayTime, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT.toLong(), 0), task)
}

actual fun doInBgThread(task: (() -> Unit)) {
task.freeze()
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT.toLong(), 0), task)
}

actual fun doInMainThread(task: (() -> Unit), delayedSec: Int) {
task.freeze()
val delayTime = dispatch_time(DISPATCH_TIME_NOW, (NSEC_PER_SEC * delayedSec.toUInt()).toLong())
dispatch_after(delayTime, dispatch_get_main_queue(), task)
}

actual fun doInMainThread(task: (() -> Unit)) {
task.freeze()
dispatch_async(dispatch_get_main_queue(), task)
}

以上代码,可以比较简单地满足异步执行任务的需要,比较轻量化,有一点不足就是没有实现取消的操作,需要的话,可以根据 GCD、Executor、Handler 的特点,自行实现,难度不大