likes
comments
collection
share

Kotlin 协程的并发安全之道

作者站长头像
站长
· 阅读数 26

前言

Kotlin 协程作为异步编程的强大工具,带来了便捷和高效。然而,随着多个协程共同操作共享数据,我们面临竞态条件和数据竞争的挑战。本文将深入探讨 Kotlin 协程中的并发安全性问题,提供解决方案和最佳实践。

协程并发安全实战

1. 单线程调度(Main Thread)

var countVar = 0

fun main() = runBlocking{

    val jobs = mutableListOf<Job>()
    val timeCost = measureTimeMillis {
        repeat(1000){
            val job = launch {
                delay(100)
                countVar++
            }
            jobs.add(job)
        }
        jobs.forEach{
            it.join()
        }
    }
    log("timeCost =$timeCost")
    log("count =$countVar")
}

第一个例子:count结果是1000,因为共享数据没有发生线程切换,并不会出现并发安全,所以答案是1000。

2. 多线程调度

fun main() = runBlocking{

    val jobs = mutableListOf<Job>()
    val mutex = Mutex()
    val timeCost = measureTimeMillis {
        repeat(1000){
            val job = launch(Dispatchers.Default) {
                delay(100)
                countVar++
            }
            jobs.add(job)
        }
        jobs.forEach{
            it.join()
        }
    }
    log("timeCost =$timeCost")
    log("count =$countVar")
}

第二个例子:count的结果肯定小于等于1000,因为多线程访问,会出现并发安全问题。需要同步。

3. 单线程调度串行执行

fun main() = runBlocking {

    val jobs = mutableListOf<Job>()
    val timeCost = measureTimeMillis {
        val job = launch(Dispatchers.Default) {
            repeat(1000) {
                delay(100)
                countVar++
            }
        }
        jobs.add(job)
        jobs.forEach {
            it.join()
        }
    }
    log("timeCost =$timeCost")
    log("count =$countVar")
}

第三个例子:count结果是1000,和第一个例子一样,因为共享数据没有发生线程切换,并不会出现并发安全,但是串行执行的,所以答案是1000。

所以说 协程作用域是否安全取决于共享数据有没有发生线程切换。若发生线程切换,则需要额外的同步,否则数据不安全。

协程并发安全几种同步方式

CAS 乐观锁

/**
 * CAS 乐观锁
 */
fun main() = runBlocking {
    val atomicInteger = AtomicInteger()
    val jobs = mutableListOf<Job>()
    val timeCost = measureTimeMillis {
        repeat(1000) {
            val job = launch(Dispatchers.Default) {
                delay(100)
                atomicInteger.incrementAndGet()
            }
            jobs.add(job)
        }
        jobs.forEach {
            it.join()
        }
    }
    log("timeCost =$timeCost")
    log("count =${atomicInteger.get()}")
}

sychronized 高阶函数

/**
 * synchronized
 */
fun main() = runBlocking {

    val jobs = mutableListOf<Job>()
    val lock = Any()
    val timeCost = measureTimeMillis {
        repeat(100) {
            val job = launch(Dispatchers.Default) {
                delay(100)
                synchronized(lock) {
                    countVar++
                }
            }
            jobs.add(job)
        }

        jobs.forEach {
            it.join()
        }
    }
    log("timeCost =$timeCost")
    log("count =$countVar")
}

mutex

/**
 * 多线程调度器
 * 需要配合mutex
 */
fun main() = runBlocking{

    val jobs = mutableListOf<Job>()
    val mutex = Mutex()
    val timeCost = measureTimeMillis {
        repeat(1000){
            val job = launch(Dispatchers.Default) {
                delay(100)
                mutex.withLock{
                    countVar++
                }

            }
            jobs.add(job)
        }
        jobs.forEach{
            it.join()
        }
    }
    log("timeCost =$timeCost")
    log("count =$countVar")
}

无锁实现

协程内部访问外部count实现自增改为返回增量结果

fun main() = runBlocking {

    val count = 0
    val timeCost = measureTimeMillis {
        val result = count + List(1000) {
            GlobalScope.async {
                delay(100)
                1
            }
        }.sumOf {
            it.await()
        }
        log("result -->$result")
    }

    log("timeCost -->$timeCost")
    
}

协程并发另外一个例子

并发获取User,使用count 记录还剩多少user没有获取

/**
 * 并发获取User
 */
fun main() = runBlocking {

     val startTime = System.currentTimeMillis()
     val userIds: MutableList<Int> = ArrayList()
     for (i in 1..1000) {
          userIds.add(i)
     }
     var count = userIds.size
     val map: MutableMap<Int, User> = HashMap()
     val deferredResults = userIds.map { userId ->
          async {
               val user = getUserAsync(userId)
               //log("userId-->$userId :::: user --->  $user")
               map[userId] = user
               map
          }
     }


     // 获取每个 async 任务的结果
     val results = deferredResults.map { deferred ->
          count--
          log("count  $count")
          deferred.await()
     }
     val costTime = (System.currentTimeMillis() - startTime) / 1000
     log("count -> $count")
     log("costTime-->$costTime")
     log("user size -> ${results.size}")

}

/**
 * 异步同步化
 */
suspend fun getUserAsync(userId: Int): User = suspendCoroutine { continuation ->
     ClientManager.getUser(userId) {
          continuation.resume(it)
     }
}

其中 ClientManager:

/**
 * 模拟客户端请求
 */
object ClientManager {

    var executor: Executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2)
    val customDispatchers = executor.asCoroutineDispatcher()

    /**
     * getUser
     */
    fun getUser(userId: Int, callback: (User) -> Unit) {
        executor.execute {
            val sleepTime = Random().nextInt(100)
            Thread.sleep(sleepTime.toLong())
            callback(User(userId, sleepTime.toString(), "avatar", ""))

        }
    }
    /**
     * getAvatar
     */
    fun getUserAvatar(user: User, callback: (User) -> Unit) {
        executor.execute {
            val sleepTime = Random().nextInt(1000)
            try {
                Thread.sleep(sleepTime.toLong())
            } catch (e: InterruptedException) {
                e.printStackTrace()
            }
            user.file = "$sleepTime.png"
            callback(user)
        }
    }
    
}

思考,这里面的count 为何不需要同步就能正确获取数据呢?(因为count写操作发生在单线程调度器上)

协程并发总结

避免多线程访问外部可变状态

出现并发安全问题,无非是多线程访问公共变量,如果我们能在单线程调度器的情况下去访问公共变量,就不会出现并发安全问题。

总而言之,如非必须,则避免访问外部可变状态; 如无必要,则避免使用可变状态。这样可以有效降低并发问题的出现概率,使代码更加稳定可靠。

在协程并发中,几种同步方式(CAS 乐观锁、synchronized 高阶函数、mutex)都是为了保护共享的可变状态,确保在任意时刻只有一个协程能够修改它,从而避免数据竞争和不一致的结果。

结尾

通过本文,我们深入了解了 Kotlin 协程中的并发安全性问题,并提供了一些解决方案和最佳实践。在实际应用中,根据具体情况选择适当的同步机制,保证在并发环境中代码的正确性和稳定性。协程作为异步编程的强大工具,能够更方便地处理并发问题,但也需要谨慎使用,特别是在多线程环境下。

源码:github.com/ThirdPrince…