C:实现一个迷你无栈协程框架——Minico

前言

让我们使用状态机编程实现无栈协程!只需要 50 行代码就可以实现有趣的功能。

状态设计

为了管理协程的生命周期,我们需要定义一些状态。状态转移图如下:

           +-------------------+
           |                   |
           |                   v
        +----+---+        +----------+
        | CO_INIT|  ----> |  CO_RUN  |
        +-----+--+        +----------+
           |                   |
           |                   |
           |                   |
           |                   |
           |                   v
           |               +---------+
           |               | CO_YIELD|
           |               +---------+
           |                   |
           |                   |
           +-------------------+
                               |
                               v
                          +----+----+
                          | CO_FIN  |
                          +----+----+

状态说明

我们设计了四个状态。

  • CO_INIT: 协程初始化状态,准备执行。

  • CO_RUN: 协程正在运行,可以生成下一个值。

  • CO_YIELD: 协程已生成一个值并可以暂停,准备恢复执行。

  • CO_FIN: 协程已完成所有操作,终止状态。

转移说明

  1. 从 CO_INIT 到 CO_RUN: 协程初始化后,第一次调用 next 会进入运行状态。

  2. 从 CO_RUN 到 CO_YIELD: 协程生成一个值后,进入暂停状态。这个状态下用户可以继续调用 next.

  3. 从 CO_YIELD 回到 CO_RUN: 用户调用 next,协程恢复执行,继续生成下一个值。

  4. 从 CO_RUN 到 CO_FIN: 协程完成所有生成操作,进入结束状态。这个状态下,调用 next 不会导致协程再执行。

实现

框架

首先实现协程的框架代码。

 1#include <stdio.h>
 2#include <stdlib.h>
 3
 4typedef enum {
 5    CO_INIT,
 6    CO_RUN,
 7    CO_YIELD,
 8    CO_FIN
 9} CoState;
10
11struct Co;
12
13typedef int (*CoTask)(struct Co *co, void *data);
14
15typedef struct Co {
16    CoState state;
17    CoTask func;
18    void *data;
19} Co;
20
21void co_init(Co *co, CoTask func, void *data) {
22    co->state = CO_INIT;
23    co->func = func;
24    co->data = data;
25}
26
27int co_next(Co *co) {
28    if (co->state == CO_FIN) {
29        return 0;
30    }
31    return co->func(co, co->data);
32}

在这里,协程需要保存其状态标签、具体逻辑指向的函数和用户自定义的状态数据。因此定义一个结构体 Co 来存储这些信息。

是的,这就是最核心的部分,只有 30 行代码。

实例:斐波那契数列

然后我们写一个具体例子来用这个框架。下面写了一个斐波那契数列的例子。定义三个状态,n 表示剩余的生成次数。a, b 表示最近两次的历史数据。

然后我们还用一个 switch 来手动控制协程的状态转移。

 1typedef struct {
 2    int n; 
 3    int a, b; 
 4} FibData;
 5
 6int fib_func(Co *co, void *data) {
 7    FibData *fib = (FibData *)data;
 8
 9    switch (co->state) {
10        case CO_INIT:
11            fib->a = 0;
12            fib->b = 1;
13            co->state = CO_RUN;
14            return fib->a;
15
16        case CO_RUN:
17            if (fib->n <= 0) {
18                co->state = CO_FIN;
19                return 0;
20            }
21            int curr = fib->a;
22            int next = fib->a + fib->b;
23            fib->a = fib->b;
24            fib->b = next;
25            fib->n--;
26            co->state = CO_YIELD;
27            return curr;
28
29        case CO_YIELD:
30            co->state = CO_RUN;
31            return fib_func(co, data);
32
33        case CO_FIN:
34            return 0;
35    }
36    return 0; 
37}

最后在主程序中,完成协程函数绑定,并且反复调用 co_next 来执行它:

 1
 2int get_fib_count(int argc, char *argv[]) {
 3    if (argc < 2) {
 4        return 10; 
 5    }
 6    char *endptr;
 7    long n = strtol(argv[1], &endptr, 10);
 8    if (*endptr != '\0' || n <= 0) {
 9        fprintf(stderr, "Expected a positive integer argument.\n");
10        exit(EXIT_FAILURE);
11    }
12    return (int)n;
13}
14
15int main(int argc, char *argv[]) {
16    int n = get_fib_count(argc, argv);
17    FibData fib = {.n = n};
18    Co co;
19
20    co_init(&co, fib_func, &fib);
21
22    while (co.state != CO_FIN) {
23        int val = co_next(&co);
24        if (co.state != CO_FIN) {
25            printf("%d ", val);
26        }
27    }
28    printf("\n");
29
30    return 0;
31}

运行输出:

0 0 1 1 2 3 5 8 13 21 34 

可用性改进

上面的协程虽然能跑,但是存在这几个问题:

  1. 协程程序只有一个恢复点,导致我们不能实现状态更复杂的程序。

  2. 协程程序编写的人要手动控制状态转移,很麻烦,容易出错。

简化状态控制

根据依赖倒置的原则,我们也应该把初始化逻辑抽离到上一级。

1    FibData fib = {.n = n, .a = 0, .b = 1};

同时,为了简化状态控制,我在 co_next 中设置状态为 CO_RUN

1int co_next(Co *co)
2{
3    if (co->state == CO_FIN)
4    {
5        return 0;
6    }
7    co->state = CO_RUN;
8    return co->func(co, co->data);
9}

这样 fib_run 中就只需要处理是否终止了:

 1int fib_func(Co *co, void *data)
 2{
 3    FibData *fib = (FibData *)data;
 4
 5    if (fib->n > 0)
 6    {
 7        int curr = fib->a;
 8        int next = fib->a + fib->b;
 9        fib->a = fib->b;
10        fib->b = next;
11        fib->n--;
12        co->state = CO_YIELD;
13        return curr;
14    }
15
16    co->state = CO_FIN;
17    return 0;
18}

支持多个暂停点

为了实现多处暂停和恢复,需要为每个暂停点编号,在暂停时,记下下次要跳转到的编号。所以我们增加一个 label 字段来记录这个点。

1typedef struct Co
2{
3    _CoState state;
4    CoTask func;
5    void *data;
6    int label; // 记住上次离开的点
7} Co;

并且保证它的默认值是 0.

1void co_init(Co *co, CoTask func, void *data)
2{
3    co->state = CO_INIT;
4    co->func = func;
5    co->data = data;
6    co->label = 0; // 添加这行
7}

为了验证是否成功,我们可以稍微更改程序,检测到 5 的时候,打印一个特殊的消息:

 1int fib_func(Co *co, void *data)
 2{
 3    FibData *fib = (FibData *)data;
 4
 5    switch (co->label)
 6    {
 7    case 0:
 8        if (fib->n > 0)
 9        {
10            int curr = fib->a;
11            int next = fib->a + fib->b;
12            fib->a = fib->b;
13            fib->b = next;
14            fib->n--;
15            co->state = CO_YIELD;
16            if (curr == 5)
17            {
18                co->label = 1;
19            }
20            return curr;
21        }
22        break;
23    case 1:
24        printf("\noh you got 5!\n");
25        co->label = 0;
26        return fib->a;
27        break;
28    }
29
30    co->state = CO_FIN;
31    return 0;
32}

程序输出:

0 1 1 2 3 5 
oh you got 5!
8 8 13 21 34

和预期一致。

让它更加易用

我们发现其实开发者不需要关注协程处于什么状态。因此可以封装起来。

 1typedef enum
 2{
 3    _CO_STATE_INIT,
 4    _CO_STATE_RUN,
 5    _CO_STATE_YIELD,
 6    _CO_STATE_FIN
 7} _CoState;
 8
 9// ...
10
11int co_stop(Co *co)
12{
13    co->state = _CO_STATE_FIN;
14    return 0;
15}
16
17int co_done(Co *co)
18{
19    return co->state == _CO_STATE_FIN;
20}

然后我们施展一点小魔法:

 1#define CO_BEGIN(co)     \
 2    switch ((co)->label) \
 3    {                    \
 4    case 0:
 5#define CO_YIELD(co, value)     \
 6    do                          \
 7    {                           \
 8        (co)->label = __LINE__; \
 9        (co)->state = _CO_STATE_YIELD; \
10        return (value);         \
11    case __LINE__:;             \
12    } while (0)
13#define CO_END(co)        \
14    (co)->state = _CO_STATE_FIN; \
15    }

C 内置的宏 __LINE__ 表示当前行号,恰好可以利用一下,表示当前的执行状态。这样下次跳转可以直接从对应的 case 开始。实际上,这种技术被称为“Duff’s device”或“computed goto”,在实现协程或状态机时非常有用。关于这部分知识,可以阅读我的另一篇文章 理解 LuaJIT:一个简单程序的执行过程 中的相关章节。

具体的解释如下:

CO_BEGIN(co)

1#define CO_BEGIN(co)     \
2    switch ((co)->label) \
3    {                    \
4    case 0:

这个宏用来标记协程状态机的开始。它使用 switch 语句来跳转到之前保存的 label 位置。初始状态,label 为 0,因此从 case 0: 开始执行。

CO_YIELD(co, value)

1#define CO_YIELD(co, value)     \
2    do                          \
3    {                           \
4        (co)->label = __LINE__; \
5        (co)->state = CO_YIELD; \
6        return (value);         \
7    case __LINE__:;             \
8    } while (0)

这个宏让用户可以比较方便地交出控制权。执行以下操作:

  1. 将当前行号(__LINE__)保存到 label 中。

  2. 将协程状态设置为 CO_YIELD

  3. 返回指定的值。

  4. 在下一次调用时,switch 语句会跳转到 case __LINE__:,即上次暂停的位置继续执行。

CO_END(co)

1#define CO_END(co)        \
2    (co)->state = CO_FIN; \
3    }

这个宏时协程状态机的结束部分,将协程状态设置为 CO_FIN,并且闭合了花括号。

其实这里还用到一个 C 的一个冷门技巧:case 语句并不必 switch 的花括号的第一个直接层级内。所以即便是复杂的作用域情形,CO_YIELD 也可以正常工作。

实例:质数分解

我们定义一个 prime_factor_co 函数,它有两个 YIELD,而且还位于不同的作用域层级:

 1int prime_factor_co(Co *co, void *data)
 2{
 3    PrimeFactorData *pfData = (PrimeFactorData *)data;
 4    CO_BEGIN(co);
 5
 6    pfData->current = 2;
 7    while (pfData->number > 1)
 8    {
 9        while (pfData->number % pfData->current == 0)
10        {
11            pfData->number /= pfData->current;
12            pfData->factor = pfData->current;
13            CO_YIELD(co, pfData->factor);
14        }
15        pfData->current++;
16        CO_YIELD(co, 0);
17    }
18
19    CO_END(co);
20    return 0;
21}

在每次 CO_YIELD 调用时,当前行号被保存到 label 中。当协程再次运行时,switch 语句会跳转到保存的行号,从而继续执行。

为了更容易理解,可以看我手动展开后的版本:

 1#include <stdio.h>
 2#include <stdlib.h>
 3#include "co.h"
 4
 5typedef struct
 6{
 7    int number;
 8    int factor;
 9    int current;
10} PrimeFactorData;
11
12int prime_factor_co(Co *co, void *data)
13{
14    PrimeFactorData *pfData = (PrimeFactorData *)data;
15    switch ((co)->label)
16    {
17    case 0:
18        pfData->current = 2;
19        while (pfData->number > 1)
20        {
21            while (pfData->number % pfData->current == 0)
22            {
23                pfData->number /= pfData->current;
24                pfData->factor = pfData->current;
25                (co)->label = 25;
26                (co)->state = _CO_STATE_YIELD;
27                return (pfData->factor);
28            case 25:;
29            }
30            pfData->current++;
31            (co)->label = 31;
32            (co)->state = _CO_STATE_YIELD;
33            return (0);
34        case 31:;
35        }
36
37        (co)->state = _CO_STATE_FIN;
38    }
39
40    return 0;
41}

通过这种方式,label 变量实现了保存和恢复协程执行位置的功能,使得协程可以在多个 YIELD 点之间切换。

保存和恢复局部变量

意想不到的功能

我们的协程依然具有局限性。理论上,它无法保存和恢复局部变量。不过,如果我们编写一个这样的程序:

 1#include <stdio.h>
 2#include "co.h"
 3
 4int task(Co *co, void *data)
 5{
 6    CO_BEGIN(co);
 7
 8    int i = 0;
 9    int sum = 0;
10    printf("Task start: i = %d, sum = %d\n", i, sum);
11
12    for (i = 1; i <= 3; ++i)
13    {
14        sum += i;
15        printf("Task yield: i = %d, sum = %d\n", i, sum);
16        CO_YIELD(co, sum);
17    }
18
19    printf("Task end: i = %d, sum = %d\n", i, sum);
20
21    CO_END(co);
22    return 0;
23}
24int main()
25{
26    Co co;
27    co_init(&co, task, NULL);
28
29    while (!co_done(&co))
30    {
31        int result = co_next(&co);
32        printf("Main: result = %d\n", result);
33    }
34
35    return 0;
36}

可以看到它竟然正常输出:

1Task start: i = 0, sum = 0
2Task yield: i = 1, sum = 1
3Main: result = 1
4Task yield: i = 2, sum = 3
5Main: result = 3
6Task yield: i = 3, sum = 6
7Main: result = 6
8Task end: i = 4, sum = 6
9Main: result = 0

破案

这是因为 co_next 调用之后,它的栈帧并不会被清零,只是逻辑上不再使用而已。因此如果下一次调用依然是 co_next,则栈帧的数据依然时沿用上次的。所以产生了局部变量被“保存”的效果。

只需稍作修改,就破坏了栈帧,证明局部变量并不能被稳定保存。

 1void foo()
 2{
 3    int arr[100] = {0};
 4    int i = 0;
 5    for (i = 0; i < 100; ++i)
 6    {
 7        arr[i] = i;
 8    }
 9}
10
11int main()
12{
13    Co co;
14    co_init(&co, task, NULL);
15
16    while (!co_done(&co))
17    {
18        int result = co_next(&co);
19        printf("Main: result = %d\n", result);
20        foo();
21    }
22
23    return 0;
24}

输出:

1Task start: i = 0, sum = 0
2Task yield: i = 1, sum = 1
3Main: result = 1
4Task end: i = 95, sum = 95
5Main: result = 0
6(base)

这证明了我们的观点。

在这样的情况下,我们最安全的做法是利用协程结构体存放要在不同状态共享的数据。

支持保存局部变量

我们有两种实现思路,一种是在编译阶段识别用到的变量,将其捕获到一个暂存区域,届时恢复。不过,我们暂时没有对编译器开洞的能力。

另一个思路。众所周知,局部变量保存在栈帧里,如果我们可以通过计算栈帧的大小和位置,将其暂存,等重新运行的时候再取出恢复,就可以获得局部变量了。

 1#ifndef __TINY_CO_H__
 2#define __TINY_CO_H__
 3
 4#include <stdlib.h>
 5#include <string.h>
 6#include <stdio.h>
 7
 8typedef enum
 9{
10    _CO_STATE_INIT,
11    _CO_STATE_RUN,
12    _CO_STATE_YIELD,
13    _CO_STATE_FIN
14} _CoState;
15
16struct Co;
17
18typedef int (*CoTask)(struct Co *co, void *data);
19
20typedef struct Co
21{
22    _CoState state;
23    CoTask func;
24    void *data;
25    int label;
26    void *stack;
27    size_t stack_size;
28    void *stack_base;
29} Co;
30
31#define CO_BEGIN(co)     \
32    switch ((co)->label) \
33    {                    \
34    case 0:
35
36#define CO_YIELD(co, value)                                                                           \
37    do                                                                                                \
38    {                                                                                                 \
39        (co)->label = __LINE__;                                                                       \
40        asm volatile(                                                                                 \
41            "mov %%rsp, %0\n\t"                                                                       \
42            "mov %%rbp, %1"                                                                           \
43            : "=g"((co)->stack), "=g"((co)->stack_base));                                             \
44        (co)->stack_size = (char *)(co)->stack_base - (char *)(co)->stack;                            \
45        (co)->stack = malloc((co)->stack_size);                                                       \
46        memcpy((co)->stack, (void *)((char *)(co)->stack_base - (co)->stack_size), (co)->stack_size); \
47        (co)->state = _CO_STATE_YIELD;                                                                \
48        return (value);                                                                               \
49    case __LINE__:                                                                                    \
50        memcpy((void *)((char *)(co)->stack_base - (co)->stack_size), (co)->stack, (co)->stack_size); \
51        free((co)->stack);                                                                            \
52        (co)->stack = NULL;                                                                           \
53        asm volatile(                                                                                 \
54            "mov %0, %%rsp\n\t"                                                                       \
55            "mov %1, %%rbp"                                                                           \
56            :                                                                                         \
57            : "g"((char *)(co)->stack_base - (co)->stack_size), "g"((co)->stack_base));               \
58    } while (0)
59
60#define CO_END(co)               \
61    (co)->state = _CO_STATE_FIN; \
62    }
63
64// ...
65
66#endif // __TINY_CO_H__

这种方法也存在缺点:

  • 如果开启编译优化,可能导致某些代码不能正常工作。比如优化后可能根本不用栈来存放局部变量,那么我们的这些代码就失去意义。

  • 只支持了 x86_64 平台的汇编。

完整的代码

我已经把代码开源到 Github:pluveto/minico: a simple, header-only stack-less coroutine library implemented in C (github.com)

您可以留下一个 star 以资鼓励,后面我们也许会一起试一下写一个有栈协程。