0%

协程的原理

suspend 到底做了什么

一个简单的协程调用

1
2
3
4
5
6
7
8
9
10
suspend fun task() {
delay(100) // 延时来模拟耗时任务
}

suspend fun runTask(): String {
val time = System.currentTimeMillis()
val result = task()
println(result)
return "getTask case ${time - System.currentTimeMillis()}"
}

这里runTask表示一个在协程环境调用一个耗时suspend方法并获取结果。
看一下 runTask() 方法反编译之后的结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@Nullable
public final Object runTask(@NotNull Continuation var1) {
Object $continuation;
label20: { // 这边label可以理解为goto语句
if (var1 instanceof <undefinedtype>) {
$continuation = (<undefinedtype>)var1;
if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
break label20;
}
}

$continuation = new ContinuationImpl(var1) {
// $FF: synthetic field
Object result;
int label;
long J$0;

@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return ContinueTest.this.runTask(this);
}
};
}

Object $result = ((<undefinedtype>)$continuation).result;
Object var8 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
Object var10000;
long time;
switch(((<undefinedtype>)$continuation).label) { // 状态机的状态值
case 0:
ResultKt.throwOnFailure($result);
time = System.currentTimeMillis();
((<undefinedtype>)$continuation).J$0 = time;
((<undefinedtype>)$continuation).label = 1;
var10000 = this.task((Continuation)$continuation);
if (var10000 == var8) {
// 进入挂起状态,结束
return var8;
}
break;
case 1:
time = ((<undefinedtype>)$continuation).J$0;
ResultKt.throwOnFailure($result);
var10000 = $result;
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}

String result = (String)var10000;
boolean var5 = false;
System.out.println(result);
return "getTask cost " + (time - System.currentTimeMillis());
}

有几个关注点:

  • suspend 增加了一个 @NotNull Continuation var1 参数, Continuation 意思是 继续 再开始 , 可以理解为一个状态机实现的callback
  • switch(((<undefinedtype>)$continuation).label)
    switch语句里的 continuation.label 即是状态机的状态值。
    case 0 是执行suspend之前的代码,case 1 代表执行suspend之后的代码。 对应调用了几次suspend挂起方法,就会生成对应的case
    case 0有一个判断
    if (var10000 == var8
    可以转化为
    if (var10000 == COROUTINE_SUSPENDED)
    这个判断被调用的task()是不是挂起的,因为里面做了delay,所以肯定为true, 直接return,告诉调用自己的上层自己也是挂起状态。

现在,runTask() 和 task() 都是挂起的,该如何从挂起状态恢复呢?

挂起的恢复

在上面反编译的代码里面,有一个很明显的label20: 标记,这里我理解就是一个goto语句,里面首先有一个if判断,这个不知道是什么,不管它。。。然后后面有一个new实例化$continuation = new ContinuationImpl(var1) var1是suspend方法的Continuation入参,ContinuationImpl 继承自 BaseContinuationImpl , 我们来分析一下这个 BaseContinuationImpl 的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public interface Continuation<in T> {
public val context: kotlin.coroutines.CoroutineContext

public fun resumeWith(result: kotlin.Result<T>): kotlin.Unit
}

/**
completion 入参是 suspend 方法,在编译阶段增加的参数 `Continuation var1`
在 resumeWith 方法内部,会调用 $continuation 的 resumeWith 并递归向上调用 var1 的 resumeWith
**/
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) {
with(current) { // this ->
// ... 这个with语句,里面的变量有些是current的成员变量,注意区分...
// ... kotlin //的垃圾语法糖啊
val completion = completion!! // 这个 completion 是 current.completion ...
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param) // current.invokeCurrent(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted()
if (completion is BaseContinuationImpl) {
current = completion
param = outcome
} else {
completion.resumeWith(outcome)
return
}
}
}
}
}

BaseContinuationImpl 实现了 Continuation 接口,resumeWith 在 耗时挂起执行完了之后会被调用。
在前面反编译的代码中创建了匿名内部类

1
2
3
4
5
6
7
8
9
10
11
12
$continuation = new ContinuationImpl(var1) {
Object result;
int label;
long J$0;

@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return ContinueTest.this.runTask(this);
}
};

continuation 的初始 label 值是0,在switch语句里,执行完suspend之前的代码后,label会设为1, 当调用完suspend方法后, BaseContinuationImpl#resumeWith 会调用 invokeSuspend 方法递归调用suspend方法 runTask() 。 如果 suspend 方法还是 挂起的,那么直接return。否则递归不断向上resume