C:实现一个迷你无栈协程框架——Minico
前言
让我们使用状态机编程实现无栈协程!只需要 50 行代码就可以实现有趣的功能。
状态设计
为了管理协程的生命周期,我们需要定义一些状态。状态转移图如下:
+-------------------+
| |
| v
+----+---+ +----------+
| CO_INIT| ----> | CO_RUN |
+-----+--+ +----------+
| |
| |
| |
| |
| v
| +---------+
| | CO_YIELD|
| +---------+
| |
| |
+-------------------+
|
v
+----+----+
| CO_FIN |
+----+----+
状态说明
我们设计了四个状态。
-
CO_INIT: 协程初始化状态,准备执行。
-
CO_RUN: 协程正在运行,可以生成下一个值。
-
CO_YIELD: 协程已生成一个值并可以暂停,准备恢复执行。
-
CO_FIN: 协程已完成所有操作,终止状态。
转移说明
-
从 CO_INIT 到 CO_RUN: 协程初始化后,第一次调用 next 会进入运行状态。
-
从 CO_RUN 到 CO_YIELD: 协程生成一个值后,进入暂停状态。这个状态下用户可以继续调用 next.
-
从 CO_YIELD 回到 CO_RUN: 用户调用 next,协程恢复执行,继续生成下一个值。
-
从 CO_RUN 到 CO_FIN: 协程完成所有生成操作,进入结束状态。这个状态下,调用 next 不会导致协程再执行。
实现
框架
首先实现协程的框架代码。
1#include <stdio.h>
2#include <stdlib.h>
3
4typedef enum {
5 CO_INIT,
6 CO_RUN,
7 CO_YIELD,
8 CO_FIN
9} CoState;
10
11struct Co;
12
13typedef int (*CoTask)(struct Co *co, void *data);
14
15typedef struct Co {
16 CoState state;
17 CoTask func;
18 void *data;
19} Co;
20
21void co_init(Co *co, CoTask func, void *data) {
22 co->state = CO_INIT;
23 co->func = func;
24 co->data = data;
25}
26
27int co_next(Co *co) {
28 if (co->state == CO_FIN) {
29 return 0;
30 }
31 return co->func(co, co->data);
32}
在这里,协程需要保存其状态标签、具体逻辑指向的函数和用户自定义的状态数据。因此定义一个结构体 Co
来存储这些信息。
是的,这就是最核心的部分,只有 30 行代码。
实例:斐波那契数列
然后我们写一个具体例子来用这个框架。下面写了一个斐波那契数列的例子。定义三个状态,n 表示剩余的生成次数。a, b 表示最近两次的历史数据。
然后我们还用一个 switch 来手动控制协程的状态转移。
1typedef struct {
2 int n;
3 int a, b;
4} FibData;
5
6int fib_func(Co *co, void *data) {
7 FibData *fib = (FibData *)data;
8
9 switch (co->state) {
10 case CO_INIT:
11 fib->a = 0;
12 fib->b = 1;
13 co->state = CO_RUN;
14 return fib->a;
15
16 case CO_RUN:
17 if (fib->n <= 0) {
18 co->state = CO_FIN;
19 return 0;
20 }
21 int curr = fib->a;
22 int next = fib->a + fib->b;
23 fib->a = fib->b;
24 fib->b = next;
25 fib->n--;
26 co->state = CO_YIELD;
27 return curr;
28
29 case CO_YIELD:
30 co->state = CO_RUN;
31 return fib_func(co, data);
32
33 case CO_FIN:
34 return 0;
35 }
36 return 0;
37}
最后在主程序中,完成协程函数绑定,并且反复调用 co_next 来执行它:
1
2int get_fib_count(int argc, char *argv[]) {
3 if (argc < 2) {
4 return 10;
5 }
6 char *endptr;
7 long n = strtol(argv[1], &endptr, 10);
8 if (*endptr != '\0' || n <= 0) {
9 fprintf(stderr, "Expected a positive integer argument.\n");
10 exit(EXIT_FAILURE);
11 }
12 return (int)n;
13}
14
15int main(int argc, char *argv[]) {
16 int n = get_fib_count(argc, argv);
17 FibData fib = {.n = n};
18 Co co;
19
20 co_init(&co, fib_func, &fib);
21
22 while (co.state != CO_FIN) {
23 int val = co_next(&co);
24 if (co.state != CO_FIN) {
25 printf("%d ", val);
26 }
27 }
28 printf("\n");
29
30 return 0;
31}
运行输出:
0 0 1 1 2 3 5 8 13 21 34
可用性改进
上面的协程虽然能跑,但是存在这几个问题:
-
协程程序只有一个恢复点,导致我们不能实现状态更复杂的程序。
-
协程程序编写的人要手动控制状态转移,很麻烦,容易出错。
简化状态控制
根据依赖倒置的原则,我们也应该把初始化逻辑抽离到上一级。
1 FibData fib = {.n = n, .a = 0, .b = 1};
同时,为了简化状态控制,我在 co_next
中设置状态为 CO_RUN
1int co_next(Co *co)
2{
3 if (co->state == CO_FIN)
4 {
5 return 0;
6 }
7 co->state = CO_RUN;
8 return co->func(co, co->data);
9}
这样 fib_run
中就只需要处理是否终止了:
1int fib_func(Co *co, void *data)
2{
3 FibData *fib = (FibData *)data;
4
5 if (fib->n > 0)
6 {
7 int curr = fib->a;
8 int next = fib->a + fib->b;
9 fib->a = fib->b;
10 fib->b = next;
11 fib->n--;
12 co->state = CO_YIELD;
13 return curr;
14 }
15
16 co->state = CO_FIN;
17 return 0;
18}
支持多个暂停点
为了实现多处暂停和恢复,需要为每个暂停点编号,在暂停时,记下下次要跳转到的编号。所以我们增加一个 label 字段来记录这个点。
1typedef struct Co
2{
3 _CoState state;
4 CoTask func;
5 void *data;
6 int label; // 记住上次离开的点
7} Co;
并且保证它的默认值是 0.
1void co_init(Co *co, CoTask func, void *data)
2{
3 co->state = CO_INIT;
4 co->func = func;
5 co->data = data;
6 co->label = 0; // 添加这行
7}
为了验证是否成功,我们可以稍微更改程序,检测到 5 的时候,打印一个特殊的消息:
1int fib_func(Co *co, void *data)
2{
3 FibData *fib = (FibData *)data;
4
5 switch (co->label)
6 {
7 case 0:
8 if (fib->n > 0)
9 {
10 int curr = fib->a;
11 int next = fib->a + fib->b;
12 fib->a = fib->b;
13 fib->b = next;
14 fib->n--;
15 co->state = CO_YIELD;
16 if (curr == 5)
17 {
18 co->label = 1;
19 }
20 return curr;
21 }
22 break;
23 case 1:
24 printf("\noh you got 5!\n");
25 co->label = 0;
26 return fib->a;
27 break;
28 }
29
30 co->state = CO_FIN;
31 return 0;
32}
程序输出:
0 1 1 2 3 5
oh you got 5!
8 8 13 21 34
和预期一致。
让它更加易用
我们发现其实开发者不需要关注协程处于什么状态。因此可以封装起来。
1typedef enum
2{
3 _CO_STATE_INIT,
4 _CO_STATE_RUN,
5 _CO_STATE_YIELD,
6 _CO_STATE_FIN
7} _CoState;
8
9// ...
10
11int co_stop(Co *co)
12{
13 co->state = _CO_STATE_FIN;
14 return 0;
15}
16
17int co_done(Co *co)
18{
19 return co->state == _CO_STATE_FIN;
20}
然后我们施展一点小魔法:
1#define CO_BEGIN(co) \
2 switch ((co)->label) \
3 { \
4 case 0:
5#define CO_YIELD(co, value) \
6 do \
7 { \
8 (co)->label = __LINE__; \
9 (co)->state = _CO_STATE_YIELD; \
10 return (value); \
11 case __LINE__:; \
12 } while (0)
13#define CO_END(co) \
14 (co)->state = _CO_STATE_FIN; \
15 }
C 内置的宏 __LINE__
表示当前行号,恰好可以利用一下,表示当前的执行状态。这样下次跳转可以直接从对应的 case
开始。实际上,这种技术被称为“Duff’s device”或“computed goto”,在实现协程或状态机时非常有用。关于这部分知识,可以阅读我的另一篇文章 理解 LuaJIT:一个简单程序的执行过程 中的相关章节。
具体的解释如下:
CO_BEGIN(co)
1#define CO_BEGIN(co) \
2 switch ((co)->label) \
3 { \
4 case 0:
这个宏用来标记协程状态机的开始。它使用 switch
语句来跳转到之前保存的 label
位置。初始状态,label
为 0,因此从 case 0:
开始执行。
CO_YIELD(co, value)
1#define CO_YIELD(co, value) \
2 do \
3 { \
4 (co)->label = __LINE__; \
5 (co)->state = CO_YIELD; \
6 return (value); \
7 case __LINE__:; \
8 } while (0)
这个宏让用户可以比较方便地交出控制权。执行以下操作:
-
将当前行号(
__LINE__
)保存到label
中。 -
将协程状态设置为
CO_YIELD
。 -
返回指定的值。
-
在下一次调用时,
switch
语句会跳转到case __LINE__:
,即上次暂停的位置继续执行。
CO_END(co)
1#define CO_END(co) \
2 (co)->state = CO_FIN; \
3 }
这个宏时协程状态机的结束部分,将协程状态设置为 CO_FIN
,并且闭合了花括号。
其实这里还用到一个 C 的一个冷门技巧:case 语句并不必 switch 的花括号的第一个直接层级内。所以即便是复杂的作用域情形,CO_YIELD
也可以正常工作。
实例:质数分解
我们定义一个 prime_factor_co
函数,它有两个 YIELD
,而且还位于不同的作用域层级:
1int prime_factor_co(Co *co, void *data)
2{
3 PrimeFactorData *pfData = (PrimeFactorData *)data;
4 CO_BEGIN(co);
5
6 pfData->current = 2;
7 while (pfData->number > 1)
8 {
9 while (pfData->number % pfData->current == 0)
10 {
11 pfData->number /= pfData->current;
12 pfData->factor = pfData->current;
13 CO_YIELD(co, pfData->factor);
14 }
15 pfData->current++;
16 CO_YIELD(co, 0);
17 }
18
19 CO_END(co);
20 return 0;
21}
在每次 CO_YIELD
调用时,当前行号被保存到 label
中。当协程再次运行时,switch
语句会跳转到保存的行号,从而继续执行。
为了更容易理解,可以看我手动展开后的版本:
1#include <stdio.h>
2#include <stdlib.h>
3#include "co.h"
4
5typedef struct
6{
7 int number;
8 int factor;
9 int current;
10} PrimeFactorData;
11
12int prime_factor_co(Co *co, void *data)
13{
14 PrimeFactorData *pfData = (PrimeFactorData *)data;
15 switch ((co)->label)
16 {
17 case 0:
18 pfData->current = 2;
19 while (pfData->number > 1)
20 {
21 while (pfData->number % pfData->current == 0)
22 {
23 pfData->number /= pfData->current;
24 pfData->factor = pfData->current;
25 (co)->label = 25;
26 (co)->state = _CO_STATE_YIELD;
27 return (pfData->factor);
28 case 25:;
29 }
30 pfData->current++;
31 (co)->label = 31;
32 (co)->state = _CO_STATE_YIELD;
33 return (0);
34 case 31:;
35 }
36
37 (co)->state = _CO_STATE_FIN;
38 }
39
40 return 0;
41}
通过这种方式,label
变量实现了保存和恢复协程执行位置的功能,使得协程可以在多个 YIELD
点之间切换。
保存和恢复局部变量
意想不到的功能
我们的协程依然具有局限性。理论上,它无法保存和恢复局部变量。不过,如果我们编写一个这样的程序:
1#include <stdio.h>
2#include "co.h"
3
4int task(Co *co, void *data)
5{
6 CO_BEGIN(co);
7
8 int i = 0;
9 int sum = 0;
10 printf("Task start: i = %d, sum = %d\n", i, sum);
11
12 for (i = 1; i <= 3; ++i)
13 {
14 sum += i;
15 printf("Task yield: i = %d, sum = %d\n", i, sum);
16 CO_YIELD(co, sum);
17 }
18
19 printf("Task end: i = %d, sum = %d\n", i, sum);
20
21 CO_END(co);
22 return 0;
23}
24int main()
25{
26 Co co;
27 co_init(&co, task, NULL);
28
29 while (!co_done(&co))
30 {
31 int result = co_next(&co);
32 printf("Main: result = %d\n", result);
33 }
34
35 return 0;
36}
可以看到它竟然正常输出:
1Task start: i = 0, sum = 0
2Task yield: i = 1, sum = 1
3Main: result = 1
4Task yield: i = 2, sum = 3
5Main: result = 3
6Task yield: i = 3, sum = 6
7Main: result = 6
8Task end: i = 4, sum = 6
9Main: result = 0
破案
这是因为 co_next
调用之后,它的栈帧并不会被清零,只是逻辑上不再使用而已。因此如果下一次调用依然是 co_next,则栈帧的数据依然时沿用上次的。所以产生了局部变量被“保存”的效果。
只需稍作修改,就破坏了栈帧,证明局部变量并不能被稳定保存。
1void foo()
2{
3 int arr[100] = {0};
4 int i = 0;
5 for (i = 0; i < 100; ++i)
6 {
7 arr[i] = i;
8 }
9}
10
11int main()
12{
13 Co co;
14 co_init(&co, task, NULL);
15
16 while (!co_done(&co))
17 {
18 int result = co_next(&co);
19 printf("Main: result = %d\n", result);
20 foo();
21 }
22
23 return 0;
24}
输出:
1Task start: i = 0, sum = 0
2Task yield: i = 1, sum = 1
3Main: result = 1
4Task end: i = 95, sum = 95
5Main: result = 0
6(base)
这证明了我们的观点。
在这样的情况下,我们最安全的做法是利用协程结构体存放要在不同状态共享的数据。
支持保存局部变量
我们有两种实现思路,一种是在编译阶段识别用到的变量,将其捕获到一个暂存区域,届时恢复。不过,我们暂时没有对编译器开洞的能力。
另一个思路。众所周知,局部变量保存在栈帧里,如果我们可以通过计算栈帧的大小和位置,将其暂存,等重新运行的时候再取出恢复,就可以获得局部变量了。
1#ifndef __TINY_CO_H__
2#define __TINY_CO_H__
3
4#include <stdlib.h>
5#include <string.h>
6#include <stdio.h>
7
8typedef enum
9{
10 _CO_STATE_INIT,
11 _CO_STATE_RUN,
12 _CO_STATE_YIELD,
13 _CO_STATE_FIN
14} _CoState;
15
16struct Co;
17
18typedef int (*CoTask)(struct Co *co, void *data);
19
20typedef struct Co
21{
22 _CoState state;
23 CoTask func;
24 void *data;
25 int label;
26 void *stack;
27 size_t stack_size;
28 void *stack_base;
29} Co;
30
31#define CO_BEGIN(co) \
32 switch ((co)->label) \
33 { \
34 case 0:
35
36#define CO_YIELD(co, value) \
37 do \
38 { \
39 (co)->label = __LINE__; \
40 asm volatile( \
41 "mov %%rsp, %0\n\t" \
42 "mov %%rbp, %1" \
43 : "=g"((co)->stack), "=g"((co)->stack_base)); \
44 (co)->stack_size = (char *)(co)->stack_base - (char *)(co)->stack; \
45 (co)->stack = malloc((co)->stack_size); \
46 memcpy((co)->stack, (void *)((char *)(co)->stack_base - (co)->stack_size), (co)->stack_size); \
47 (co)->state = _CO_STATE_YIELD; \
48 return (value); \
49 case __LINE__: \
50 memcpy((void *)((char *)(co)->stack_base - (co)->stack_size), (co)->stack, (co)->stack_size); \
51 free((co)->stack); \
52 (co)->stack = NULL; \
53 asm volatile( \
54 "mov %0, %%rsp\n\t" \
55 "mov %1, %%rbp" \
56 : \
57 : "g"((char *)(co)->stack_base - (co)->stack_size), "g"((co)->stack_base)); \
58 } while (0)
59
60#define CO_END(co) \
61 (co)->state = _CO_STATE_FIN; \
62 }
63
64// ...
65
66#endif // __TINY_CO_H__
这种方法也存在缺点:
-
如果开启编译优化,可能导致某些代码不能正常工作。比如优化后可能根本不用栈来存放局部变量,那么我们的这些代码就失去意义。
-
只支持了 x86_64 平台的汇编。
完整的代码
我已经把代码开源到 Github:pluveto/minico: a simple, header-only stack-less coroutine library implemented in C (github.com)
您可以留下一个 star 以资鼓励,后面我们也许会一起试一下写一个有栈协程。