一.线程的桥接

1.runBlocking方法

runBlocking方法用于在线程中去执行suspend方法,代码如下:

@Throws(InterruptedException::class)
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
    // 通知编译器,block只执行一次
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    // 获取当前线程
    val currentThread = Thread.currentThread()
    // 获取上下文中的拦截器
    val contextInterceptor = context[ContinuationInterceptor]
    val eventLoop: EventLoop?
    val newContext: CoroutineContext
    // 如果拦截器为空,代表无法进行调度
    if (contextInterceptor == null) {
        // 从线程中获取EventLoop,获取失败则创建一个新的
        eventLoop = ThreadLocalEventLoop.eventLoop
        // 添加到上下文中
        // newContext = EmptyCoroutineContext   context   eventLoop
        newContext = GlobalScope.newCoroutineContext(context   eventLoop)
    } else {// 如果有拦截器
        // 尝试将当前拦截器转换成EventLoop,
        // 如果转换成功,则判断是否允许可以在上下文中使用
        // 如果转换失败或不允许,则创建一个新的
        eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
            ?: ThreadLocalEventLoop.currentOrNull()
        // 计算新的上下文
        // 这里没有把EventLoop加到上下文,因为加入后会覆盖拦截器
        newContext = GlobalScope.newCoroutineContext(context)
    }
    // 创建一个协程
    val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
    // 启动协程
    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
    // 分发任务
    return coroutine.joinBlocking()
}

2.BlockingCoroutine类

在runBlocking方法中,最终创建了一个类型为BlockingCoroutine的对象。BlockingCoroutine类继承自AbstractCoroutine类,代码如下:

// 继承了AbstractCoroutine
private class BlockingCoroutine<T>(
    parentContext: CoroutineContext,
    private val blockedThread: Thread,
    private val eventLoop: EventLoop?
) : AbstractCoroutine<T>(parentContext, true) {
    // 该协程是一个作用域协程
    override val isScopedCoroutine: Boolean get() = true

    override fun afterCompletion(state: Any?) {
        // 如果当前线程不是阻塞线程
        if (Thread.currentThread() != blockedThread)
            // 唤醒阻塞线程
            LockSupport.unpark(blockedThread)
    }

    @Suppress("UNCHECKED_CAST")
    fun joinBlocking(): T {
        registerTimeLoopThread()
        try {
            // 注册使用EventLoop
            eventLoop?.incrementUseCount()
            try {
                // 死循环
                while (true) {
                    @Suppress("DEPRECATION")
                    // 如果线程当前中断,则抛出异常,同时取消当前协程
                    if (Thread.interrupted()) throw InterruptedException().also { cancelCoroutine(it) }
                    // 分发执行任务,同时获取等待时间
                    val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
                    // 如果任务执行结束,则退出循环
                    if (isCompleted) break
                    // 休眠指定的等待时间
                    parkNanos(this, parkNanos)
                }
            } finally { // paranoia
                // 注册不使用EventLoop
                eventLoop?.decrementUseCount()
            }
        } finally { // paranoia
            unregisterTimeLoopThread()
        }
        // 获取执行的结果
        val state = this.state.unboxState()
        // 如果执行过程中取消,则抛出异常
        (state as? CompletedExceptionally)?.let { throw it.cause }
        // 返回结果
        return state as T
    }
}

BlockingCoroutine类重写了变量isScopedCoroutine为true。

isScopedCoroutine表示当前协程是否为作用域协程,该变量用在cancelParent方法中。对于一个作用域协程,当它的子协程在运行过程中抛出异常时,子协程调用cancelParent方法不会导致作用域协程取消,而是直接返回true。当子协程执行完毕,作用域协程获取结果时,如果发现子协程返回的结果为异常,则会再次抛出。

相比于一般协程,作用域协程不相信子协程在执行过程中取消通知,而是在执行完毕后亲自检查结果是否为异常,达到一种“耳听为虚,眼见为实”的效果。

joinBlocking方法通过循环在当前线程上对EventLoop进行任务分发来实现线程的阻塞。当任务发生异常或执行完毕后,会回调重写的afterCompletion方法,唤起线程继续循环,当在循环中检测到isCompleted标志位为true时,会跳出循环,恢复线程执行。

二.线程的切换

1.withContext方法

withContext方法用于在协程中切换线程去执行其他任务,该方法被suspend关键字修饰,因此会引起协程的挂起,代码如下:

public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T {
    // 通知编译器,block只执行一次
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    // 直接挂起,获取续体
    return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
        // 从续体中获取上下文
        val oldContext = uCont.context
        // 计算新的上下文
        val newContext = oldContext   context
        // 检查任务是否执行完毕或取消
        newContext.checkCompletion()
        // 如果前后两次的上下文完全相同,说明不需要切换,只需要执行即可
        if (newContext === oldContext) {
            // 创建续体的协程
            val coroutine = ScopeCoroutine(newContext, uCont)
            // 执行block
            return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
        }
        // 拦截器相同,但是上下文中增加了其他的元素
        // 这里也是在同一个线程上执行,但是其中增加的元素只在执行当前的block中使用
        if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
            // 创建续体的协程
            val coroutine = UndispatchedCoroutine(newContext, uCont)
            // 将当前线程ThreadLocal中的对象更新成newContext上下文对应的对象
            withCoroutineContext(newContext, null) {
                // 执行block
                return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
            }
        }
        // 走到这里,说明要切换线程执行block任务
        val coroutine = DispatchedCoroutine(newContext, uCont)
        // 启动父协程
        coroutine.initParentJob()
        // 启动协程
        block.startCoroutineCancellable(coroutine, coroutine)
        // 获取结果
        coroutine.getResult()
    }
}

通过对上面代码的分析,可以发现withContext根据上下文的不同进行了三种分类,创建不同的协程并通过不同的方式去执行block。如下表所示:

协程上下文变化 协程类型 启动方式
完全相同 ScopeCoroutine startUndispatchedOrReturn
拦截器相同 UndispatchedCoroutine startUndispatchedOrReturn
拦截器不同 DispatchedCoroutine startCoroutineCancellable

接下来,将对不同情况下协程的启动与执行进行分析。

2.startUndispatchedOrReturn方法

startUndispatchedOrReturn方法用于在相同的上下文环境中启动协程,代码如下:

internal fun <T, R> ScopeCoroutine<T>.startUndispatchedOrReturn(receiver: R, block: suspend R.() -> T): Any? {
    // 初始化并绑定父协程
    initParentJob()
    // 获取并处理执行结果
    return undispatchedResult({ true }) {
        // 启动协程
        block.startCoroutineUninterceptedOrReturn(receiver, this)
    }
}
private inline fun <T> ScopeCoroutine<T>.undispatchedResult(
    shouldThrow: (Throwable) -> Boolean,
    startBlock: () -> Any?
): Any? {
    // 启动协程,获取结果,
    val result = try {
        startBlock()
    } catch (e: Throwable) {
        // 产生异常,则按照取消处理
        CompletedExceptionally(e)
    }
    // 如果结果为挂起,则通知外部挂起    
    if (result === COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED
    // 结束任务执行,获取最终状态
    val state = makeCompletingOnce(result)
    // 如果需要等待子协程的结束,则通知外部挂起
    if (state === COMPLETING_WAITING_CHILDREN) return COROUTINE_SUSPENDED
    // 如果执最终为异常状态
    return if (state is CompletedExceptionally) {
        when {
            // 通过参数判断是否抛出
            shouldThrow(state.cause) -> throw recoverStackTrace(state.cause, uCont)
            // 执行结果为异常
            result is CompletedExceptionally -> throw recoverStackTrace(result.cause, uCont)
            // 结果不为异常,则返回
            else -> result
        }
    } else {
        // 对最终状态进行拆箱,返回最终结果
        state.unboxState()
    }
}
// JobSupport中提供了下面的类和方法,当协程进入完成状态时,会对状态进行装箱。
// 包装类
private class IncompleteStateBox(@JvmField val state: Incomplete)
// 装箱
internal fun Any?.boxIncomplete(): Any? = if (this is Incomplete) IncompleteStateBox(this) else this
// 拆箱
internal fun Any?.unboxState(): Any? = (this as? IncompleteStateBox)?.state ?: this

在startUndispatchedOrReturn方法中,通过调用block的startCoroutineUninterceptedOrReturn方法启动协程,获取最终结果,并对结果进行异常处理。

接下来,将分析startCoroutineUninterceptedOrReturn方法如何启动协程,代码如下:

@SinceKotlin("1.3")
@InlineOnly
public actual inline fun <R, T> (suspend R.() -> T).startCoroutineUninterceptedOrReturn(
    receiver: R,
    completion: Continuation<T>
): Any? = (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, completion)

这里,直接找到最终的actual方法,可以发现该方法没有创建状态机,而是直接执行了block。这个方法被设计用在suspendCoroutineUninterceptedOrReturn方法中,来恢复挂起协程的执行。

至此,可以知道startUndispatchedOrReturn方法实际上就是在同一个协程中执行了block。

3.ScopeCoroutine类

在withContext方法中,当上下文相同时,会创建一个类型为ScopeCoroutine的对象。ScopeCoroutine类代表一个标准的作用域协程,代码如下:

internal open class ScopeCoroutine<in T>(
    context: CoroutineContext,
    @JvmField val uCont: Continuation<T>
) : AbstractCoroutine<T>(context, true), CoroutineStackFrame {
    final override val callerFrame: CoroutineStackFrame? get() = uCont as CoroutineStackFrame?
    final override fun getStackTraceElement(): StackTraceElement? = null
    // 作用域协程
    final override val isScopedCoroutine: Boolean get() = true
    internal val parent: Job? get() = parentContext[Job]
    // 该方法会在协程异常或取消时调用
    override fun afterCompletion(state: Any?) {
        // 进行拦截,切换线程,恢复执行
        uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
    }
    // 该方法会在将其挂起的方法执行完毕后回调
    override fun afterResume(state: Any?) {
        // 直接恢复续体的执行
        uCont.resumeWith(recoverResult(state, uCont))
    }
}

ScopeCoroutine类重写了afterCompletion和afterResume两个方法,afterCompletion方法用于在协程取消时被回调。afterResume方法用于在挂起恢复时被回调。

根据上面的分析,当发生异常时,afterCompletion方法可能在其他的协程上下文中被调用,因此会调用拦截器切换回原本的线程中。而afterResume方法由于已经在正确的上下文环境中,因此可以直接恢复执行。

4.UndispatchedCoroutine类

在withContext方法中,当上下文不同,但调度器相同时,会创建一个类型为UndispatchedCoroutine的对象。UndispatchedCoroutine类继承自ScopeCoroutine类,重写了afterResume方法,代码如下:

private class UndispatchedCoroutine<in T>(
    context: CoroutineContext,
    uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
    override fun afterResume(state: Any?) {
        val result = recoverResult(state, uCont)
        // 将当前线程ThreadLocal中的对象更新成uCont.context上下文对应的对象
        withCoroutineContext(uCont.context, null) {
            // 恢复执行
            uCont.resumeWith(result)
        }
    }
}

与父类ScopeCoroutine的afterResume方法相比,UndispatchedCoroutine类在afterResume方法中对协程上下文进行了更新,然后再恢复执行。

  • withCoroutineContext

withCoroutineContext方法用于当一个线程中执行多个协程时,保存和恢复ThreadLocal类中的对象。

通过withContext方法的代码可以知道,当上下文不同但调度器相同时,在执行之前会通过withCoroutineContext方法将ThreadLocal中的对象更新成newContext对应的对象。在执行结束后,又将ThradLocal中的对象更新成原本续体的上下文context对应的对象。代码如下:

internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T {
    // 将线程上下文更新新的上下文,并返回老的上下文
    val oldValue = updateThreadContext(context, countOrElement)
    try {
        // 在新的上下文环境中执行
        return block()
    } finally {
        // 执行结束恢复老的上下文
        restoreThreadContext(context, oldValue)
    }
}

协程中有一类上下文元素是ThreadContextElement,ThreadContextElement是一个接口,具体的实现类有CoroutineId类和ThreadLocalElement类。其中,CoroutineId类用来修改线程的名字。ThreadLocalElement类用来保存和恢复ThreadLocal类中的对象,withCoroutineContext方法内部的updateThreadContext方法与restoreThreadContext方法正是通过ThreadLocalElement类实现的。ThreadContextElement接口的代码如下:

public interface ThreadContextElement<S> : CoroutineContext.Element {
    // 用于更新新的上下文,并且返回老的上下文
    public fun updateThreadContext(context: CoroutineContext): S
    // 重新恢复当前线程的上下文,
    // 其中oldStart来自updateThreadContext方法的返回值
    public fun restoreThreadContext(context: CoroutineContext, oldState: S)
}

当调用updateThreadContext方法时,会返回一个代表当前状态的对象。当调用restoreThreadContext方法时,又需要传入一个代表状态的对象作为参数,来恢复之前的状态。因此,这就需要对updateThreadContext方法的返回值进行保存。

当协程上下文中只有一个ThreadContextElement接口指向的对象时,保存在变量中即可。而如果协程上下文中有多个ThreadContextElement接口指向的对象,这时就需要一个专门的类来对这些对象进行管理,这个类就是ThreadState类,他们之间的对应关系如下图所示:

withCoroutineContext方法执行图:

5.DispatchedCoroutine类

在withContext方法中,当需要切换线程时,会创建一个类型为DispatchedCoroutine的对象。DispatchedCoroutine类继承自ScopeCoroutine类,代码如下:

// 状态机状态
private const val UNDECIDED = 0
private const val SUSPENDED = 1
private const val RESUMED = 2
private class DispatchedCoroutine<in T>(
    context: CoroutineContext,
    uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
    // 初始状态
    private val _decision = atomic(UNDECIDED)
    // 尝试挂起
    private fun trySuspend(): Boolean {
        _decision.loop { decision ->
            when (decision) {
                UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
                RESUMED -> return false
                else -> error("Already suspended")
            }
        }
    }
    // 尝试恢复
    private fun tryResume(): Boolean {
        _decision.loop { decision ->
            when (decision) {
                UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
                SUSPENDED -> return false
                else -> error("Already resumed")
            }
        }
    }
    override fun afterCompletion(state: Any?) {
        // 通过afterResume方法实现
        afterResume(state)
    }
    override fun afterResume(state: Any?) {
        // 如果没有挂起,则返回
        if (tryResume()) return
        // 进行拦截,切换线程,恢复执行
        uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
    }
    // 获取最终结果
    fun getResult(): Any? {
        if (trySuspend()) return COROUTINE_SUSPENDED
        val state = this.state.unboxState()
        if (state is CompletedExceptionally) throw state.cause
        @Suppress("UNCHECKED_CAST")
        return state as T
    }
}

DispatchedCoroutine类中使用了一个状态机模型,这个状态机与在Kotlin协程:生命周期原理中分析CancellableContinuationImpl类中的状态机相同,获取结果的逻辑也与CancellableContinuationImpl类相同。

这里最重要的是DispatchedCoroutine类重写了afterCompletion和afterResume方法,并且回调这两个方法都会进行线程的切换。

6.总结

  ScopeCoroutine类 UndispatchedCoroutine类 DispatchedCoroutine类
afterCompletion方法 切线程 切线程 切线程
afterResume方法 不切线程 不切线程。更新ThreadLocal 切线程

以上就是Kotlin线程的桥接与切换使用介绍的详细内容,更多关于Kotlin线程的资料请关注Devmax其它相关文章!

Kotlin线程的桥接与切换使用介绍的更多相关文章

  1. iOS:核心图像和多线程应用程序

    我试图以最有效的方式运行一些核心图像过滤器.试图避免内存警告和崩溃,这是我在渲染大图像时得到的.我正在看Apple的核心图像编程指南.关于多线程,它说:“每个线程必须创建自己的CIFilter对象.否则,你的应用程序可能会出现意外行为.”这是什么意思?我实际上是试图在后台线程上运行我的过滤器,所以我可以在主线程上运行HUD(见下文).这在coreImage的上下文中是否有意义?

  2. ios – 多个NSPersistentStoreCoordinator实例可以连接到同一个底层SQLite持久性存储吗?

    我读过的关于在多个线程上使用CoreData的所有内容都讨论了使用共享单个NSPersistentStoreCoordinator的多个NSManagedobjectContext实例.这是理解的,我已经使它在一个应用程序中工作,该应用程序在主线程上使用CoreData来支持UI,并且具有可能需要一段时间才能运行的后台获取操作.问题是NSPersistentStoreCoordinator会对基础

  3. ios – XCode断点应该只挂起当前线程

    我需要调试多线程错误.因此,为了获得生成崩溃的条件,我需要在代码中的特定点停止一个线程,并等待另一个线程到达第二个断点.我现在遇到的问题是,如果一个线程遇到断点,则所有其他线程都被挂起.有没有办法只停止一个线程,让其他线程运行,直到它们到达第二个断点?)其他更有趣的选择:当你点击第一个断点时,你可以进入控制台并写入这应该在该断点处暂停当前上下文中的线程一小时.然后在Xcode中恢复执行.

  4. ios – 在后台线程中写入Realm后,主线程看不到更新的数据

    >清除数据库.>进行API调用以获取新数据.>将从API检索到的数据写入后台线程中的数据库中.>从主线程上的数据库中读取数据并渲染UI.在步骤4中,数据应该是最新数据,但我们没有看到任何数据.解决方法具有runloops的线程上的Realm实例,例如主线程,updatetothelatestversionofthedataintheRealmfile,因为通知被发布到其线程的runloop.在后台

  5. ios – NSURLConnectionLoader线程中的奇怪崩溃

    我们开始看到我们的应用启动时发生的崩溃.我无法重现它,它只发生在少数用户身上.例外情况是:异常类型:EXC_BAD_ACCESS代码:KERN_INVALID_ADDRESS位于0x3250974659崩溃发生在名为com.apple.NSURLConnectionLoader的线程中在调用时–[NSBlockOperationmain]这是该线程的堆栈跟踪:非常感谢任何帮助,以了解可能导致这种崩

  6. ios – 合并子上下文时的NSObjectInaccessbileExceptions

    我尝试手动重现,但失败了.是否有其他可能发生这种情况的情况,是否有处理此类问题的提示?解决方法在创建子上下文时,您可以尝试使用以下行:

  7. ios – 从后台线程调用UIKit时发出警告

    你如何处理项目中的这个问题?

  8. ios – 在SpriteKit中,touchesBegan在与SKScene更新方法相同的线程中运行吗?

    在这里的Apple文档AdvancedSceneProcessing中,它描述了更新方法以及场景的呈现方式,但没有提到何时处理输入.目前尚不清楚它是否与渲染循环位于同一个线程中,或者它是否与它并发.如果我有一个对象,我从SKScene更新方法和touchesBegan方法(在这种情况下是SKSpriteNode)更新,我是否要担心同步对我的对象的两次访问?解决方法所以几天后没有回答我设置了一些实验

  9. ios – 在后台获取中加载UIWebView

    )那么,有一种方法可以在后台加载UIWebView吗?解决方法如果要从用户界面更新元素,则必须在应用程序的主队列(或线程)中访问它们.我建议您在后台继续获取所需的数据,但是当需要更新UIWebView时,请在主线程中进行.你可以这样做:或者您可以创建一个方法来更新UIWebView上的数据,并使用以下方法从后台线程调用它:这将确保您从正确的线程访问UIWebView.希望这可以帮助.

  10. ios – 何时使用Semaphore而不是Dispatch Group?

    我会假设我知道如何使用DispatchGroup,为了解问题,我尝试过:结果–预期–是:为了使用信号量,我实现了:并在viewDidLoad方法中调用它.结果是:从概念上讲,dispachGroup和Semaphore都有同样的目的.老实说,我不熟悉:什么时候使用信号量,尤其是在与dispachGroup合作时–可能–处理问题.我错过了什么部分?

随机推荐

  1. Flutter 网络请求框架封装详解

    这篇文章主要介绍了Flutter 网络请求框架封装详解,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

  2. Android单选按钮RadioButton的使用详解

    今天小编就为大家分享一篇关于Android单选按钮RadioButton的使用详解,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧

  3. 解决android studio 打包发现generate signed apk 消失不见问题

    这篇文章主要介绍了解决android studio 打包发现generate signed apk 消失不见问题,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧

  4. Android 实现自定义圆形listview功能的实例代码

    这篇文章主要介绍了Android 实现自定义圆形listview功能的实例代码,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

  5. 详解Android studio 动态fragment的用法

    这篇文章主要介绍了Android studio 动态fragment的用法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

  6. Android用RecyclerView实现图标拖拽排序以及增删管理

    这篇文章主要介绍了Android用RecyclerView实现图标拖拽排序以及增删管理的方法,帮助大家更好的理解和学习使用Android,感兴趣的朋友可以了解下

  7. Android notifyDataSetChanged() 动态更新ListView案例详解

    这篇文章主要介绍了Android notifyDataSetChanged() 动态更新ListView案例详解,本篇文章通过简要的案例,讲解了该项技术的了解与使用,以下就是详细内容,需要的朋友可以参考下

  8. Android自定义View实现弹幕效果

    这篇文章主要为大家详细介绍了Android自定义View实现弹幕效果,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  9. Android自定义View实现跟随手指移动

    这篇文章主要为大家详细介绍了Android自定义View实现跟随手指移动,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  10. Android实现多点触摸操作

    这篇文章主要介绍了Android实现多点触摸操作,实现图片的放大、缩小和旋转等处理,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

返回
顶部