浅谈C++20 协程那点事儿

771次阅读  |  发布于1年以前

阿里妹导读

本文是 C++20 的协程入门文章,作者围绕协程的概念到协程的实现思路全方位进行讲解,努力让本文成为全网最好理解的「C++20 协程」原理解析文章。

协程概念

先介绍一点协程的概念,如果你已经理解和掌握了相关的背景知识就可以跳过这个章节(或者快速浏览下,也许我们有些观念不一致可以讨论)。

这里我想稍微聊的深入一点,这涉及到入门后遇到复杂的协程问题时能不能正确的理解并解决问题。协程(Coroutines),也被称为微线程,纤程。一般指一个可以被暂停和恢复执行的逻辑体。一个普通的函数有 2 个常规的操作和行为:调用(Call)和返回(Return)。

当调用这个函数的时候,会暂停当前的执行,跳转到这个函数的起始位置去执行。

当函数执行完成后会返回结果(或者抛出异常)。这个调用过程通常是“一次性”的,再次调用这个函数又是一次独立的行为。但是协程不是,协程的操作和行为是:调用/创建(Create)、暂停/挂起(Suspend)、 恢复执行(Resume)和销毁(Destroy)。

协程可以允许这个被调用的函数执行到某个位置之后暂时保留自己的临时信息并挂起。在后面的某个时间点可以再回到当时执行的位置和状态继续执行。所以从某种意义上可以说协程是普通函数的泛化(Generalisation)。

当一个程序开始执行时,操作系统会创建出一个进程,操作系统内核里也会创建出一个调度实体(一个记录这个进程相关信息的数据结构),然后跳转到这个程序的代码入口去执行。

同一时间操作系统往往同时在执行很多个进程。这些进程在真实的多核 CPU 上并行或者轮流执行(当进程数大于 CPU 核数时排队)。操作系统负责对这些调度实体进行监控、统计、切换等一切调度工作。

一个进程通常只有一个常规的执行流(主线程),对应到 C/C++ 语言里就是一个串行执行的main()函数,main()函数里会调用其他函数。

其他函数可能继续调用另外的函数,最终main()函数执行完成后进程退出。当一个进程执行代码时,CPU 的寄存器保存着当前函数的执行状态,临时结果等信息。

如果要调用新的函数,那么需要把当前部分寄存器的值保存在内存里再去调用新的函数(因为新的函数也会使用这些寄存器进行运算)。

那么哪些寄存器需要保存?传递给新函数的参数放在哪里?调用完的返回值保存在哪里?

调用约定(Calling Conventions)规定了这些细节。本文不详细调用约定的细节,有兴趣的话可以阅读参考文献 [1]。在 AMD64/x86_64 架构下,函数调用使用栈(Stack)来实现,每个函数使用的栈区域称之为一个栈帧(Stack Frame)。下面展示一段代码和其执行时对应的栈状态:

当代码执行到main()->func1()->func2()的时候,内存栈是如图的形式。新的函数调用会使得栈空间向上增长为新函数创建栈帧,创建局部变量以及保存之前函数使用的寄存器。

函数调用返回后,栈帧就会被回收,同时从栈里恢复出之前的寄存器值并跳转到之前执行的代码地址(所以 C/C++代码里不能返回函数局部变量的指针,因为函数返回后再访问被回收的栈帧是很危险的,这些空间随后就被其他函数的临时数据覆盖了)。

随着函数的调用和返回,栈不断的增大和缩小。这个栈默认有多大?在 Linux 系统上执行ulimit -s就能看到默认的值是 8192 KB。如果程序里定义了过大的函数局部变量或者函数调用深度太长(尤其是递归),就有栈溢出的风险。与栈相对的内存区域是堆,一般在程序运行时使用brk(2)或者mmap(2)进行分配,C/C++程序里使用的malloc(3)/free(3)或者new/delete(只讨论内存分配的语义部分)分配一般是 CRT 的封装(或者jemalloc之类的内存分配库)。

堆内存区域不随着函数调用结束而自动回收,需要程序自行在使用完成后进行显式的释放动作。本质上堆、栈只是内存逻辑上的概念,只是创建时机和使用方式的不同而已,并无本质区别。如果逻辑上一个程序需要更多的执行流,可以创建线程来并发地执行另一个函数。

这里的线程通常是操作系统支持的概念,对应内核的一个调度实体,可以在不同的 CPU 核上和主线程并行的执行。操作系统内核在什么时候进行任务调度呢?自然是执行流跳转到内核代码的时候,除了系统调用会陷入内核额外触发调度外,还需要硬件时钟中断强行中断当前执行的程序来切换到内核逻辑来兜底(否则一个在用户态死循环的程序岂不是永远不交出执行权了)。

这个创建的线程跟主线程一样,拥有一个完全独立的线程栈去进行自身函数调用的状态保存。

铺垫了这么多,那协程究竟是什么?前文提到协程函数的操作和行为有:调用/创建(Create)、暂停/挂起(Suspend)、恢复执行(Resume)和销毁(Destroy)。创建和销毁比较好理解,创建线程也是有这 2 个步骤。多出来的挂起和恢复操作,是因为这个执行逻辑完全是在用户态模拟出来的,不存在一个内核的调度实体,那么自然需要在用户态去管理这些执行流的挂起和恢复执行。如果你熟悉 Linux 线程发展史的话,你会知道在 Linux 内核支持线程概念前,用户态模拟出的多个可以切换的“线程”其实就是协程的一种实现。

在多线程编程里,一般不在线程里添加主动切换的逻辑,往往都是被动的被内核从执行状态转换为等待状态,比如 IO 操作未完成,尝试获取的资源暂时被强占的时候(最多就是调用某些系统调用告诉内核自己暂时让出 CPU,让内核看看要不要执行别的任务)。

但协程不是,协程需要用户逻辑在后续执行条件不满足的时候主动切换让出执行权。这里姑且将这种协程称之为第一代协程,其仅仅是线程等概念在理论上的推演。而广义的协程(Coroutines)是个很宽泛的概念。

尤其是async/await语义带来了新的浪潮之后。这几年新的编程语言里引入的协程模型几乎都是async/await语义的(除了 go 语言 )。本文不考古async/await概念在微软的发展历史,有兴趣的话可以参阅相关资料。

那么,如何在 C/C++ 语言里实现协程?

容易想到的方式就是类似线程的实现方式(第一代协程抽象),为每一个协程创建一个独立的内存栈进行上下文的保存和函数调用。这种方式在协程间进行切换的方法就比较简单,只需要用户代码自行保存上下文之后直接跳转到目标函数位置执行即可。这和操作系统内核切换多个执行流的方式也很类似。

这再也就是在 C++20 协程相关的资料里提到的所谓的有栈协程(Stackful Coroutine)。有栈协程的创建代价比较大。

因为协程函数执行前要预先创建好独立的协程栈,预分配内存这也就限制了同时并发的协程数量。而且预分配的栈过大会造成浪费,过小了又会导致函数调用深了以后导致栈溢出(其实从 Linux 内核内存分配的角度讲,实际用到了才会分配内存页,也就是一个协程栈最多也就比实际使用的内存浪费一个物理页)。另外现代 CPU 的分支预测里有返回栈缓冲区(Return Stack Buffer,RSB)的预测,不断手动的栈切换就在不断的破坏 RSB 的预测机制,且越复杂越高频的切换影响就越大。

但这种模型的好处是它对编译器几乎是透明的,对已有代码的协程化改造会非常简单,只需要修改创建协程的位置并在代码里添加主动的切换点即可(甚至可以用 hook 的方式自动加yield进去做到完全不需要修改已有代码)。

这里稍微提一下共享栈协程(Copying the Stack Coroutine),既然每个协程都创建一个额外的栈太浪费了,那就只创建一个。在协程切换的时候,拷贝当前已使用的栈到另外的内存里,腾出来栈给新的协程用即可。需要切换回来的时候,拷贝回来之前的栈即可。共享栈协程解决了预分配的内存浪费问题,但是引入了栈备份和还原的开销。要想性能好的话,还需要尽量减少函数调用深度以及尽量不在栈上分配太大的数据结构。所以共享栈协程只是一种优化手段,所以一般不单独拿出来对比和讨论。

与有栈协程相对的是无栈协程(Stackless Coroutine),即 C++20 所采用的模式。这个模式下创建的协程很轻量,一开始就会在堆上保存所有的协程函数的“临时变量”以及调用参数等上下文信息。从协程函数里切换出来的时候,因为大多数东西都是保存在堆上的,所以切换动作可以很短很快。恢复执行的时候也不需要太多的栈上结构还原就可以跳转回原来的代码位置执行。因为创建出来的协程不申请新的内存栈而是在调用/恢复位置原地还原上下文,所以被称之为无栈协程。

但是这种模式无法简单适配已有代码来实现协程化改造,需要对旧代码重构甚至重写才能完成改造。强调一下,这个有栈无栈是站在 C/C++ 语言的角度去讨论的。有些语言(比如 Python)并不在栈空间为执行流分配保存上下文的「栈」,所以不存在 Stackful 和 Stackless 讨论。甚至在纯理论的讨论协程概念时,提到栈(Stack)都显得有些「业余」。

因为栈不是实现函数调用的唯一方式,只是现代主流编程语言大都这么实现的而已。尽管这不影响这篇文档想表达的实际意思,但这里还是阐明一下,本文只是在 C/C++ 语言现状下偏向于工程化的讨论,不拔高到纯粹的理论高度。另外我也只是个普通的写代码的程序员而已,无法从科学(数学)角度讨论async/await的语义,还请见谅 。

说多了容易让读者糊涂,但是不说的话又担心让读者一叶障目。本文介绍具体的实现原理是为了快速的理解抽象理论,但也容易造成对协程理论的狭隘理解,这一点还请读者注意。

协程的发展方向是什么?目前看都是在着力解决异步代码的「形式化同步编写」的问题,即帮助程序员以近似同步的方式来编写异步逻辑,在形式上使得有逻辑关系的代码不要割裂。但是这种抽象是有运行时的成本的。这里不妨来一句暴论:在纯粹理论上协程一般不如线程 + callbacks 的性能来得好(至少当前的自动优化水平下)。

追求极限性能的话还是算了。追求维护成本和性能的平衡或者为了降低开发成本,这才是协程正确的打开方式。有人说「程序就是一个状态机,线程/协程是给那些写不好状态机的人准备的」[2]。这句话有道理吗?其实有一点道理。从最终的执行上看,程序就是一个在 CPU 上执行的状态机。但是从程序的设计上讲,更高级别的抽象方式和方法有助于写出来更容易被「人类」理解和改进的逻辑。高级抽象是构建大型软件系统所必须的基础设施。哪怕付出一点运行时成本也是值得的。随着技术的发展,运行时成本会逐渐优化降低的。而我们必须时刻提醒自己,代码是写给「人」看的,而不是「机器」。

C++20 的协程实现

前面讨论这么多背景和原理做什么?说来惭愧,C++20 的协程机制我看了好几遍 cppreference 的文档才理解。目前 C++20 实现的协程机制不适合给最终用户去使用,而是给协程库的作者提供的一些编译器的协程支持和语法糖。我认为想理解目前实现最好的方式就是从设计者的角度去理解为什么要这么做。前文提到,C++20 的协程实现是 Stackless 的实现。

其创建出来的协程将其执行所需要的必要数据保存在堆上。在协程切换出去保存上下文信息到堆后,依旧在原来的栈位置恢复之前的函数执行。协程切换回来的时候也是在当前调用恢复操作的栈上还原之前的状态然后执行。所以编译器在这个过程中支持了什么?编译器支持了自动的协程上下文保存/恢复,以及自动的变量捕获和堆上保存机制(有 go 语言那味了,go 语言支持返回函数局部变量的,编译器会自动将其保存在堆上)。

所以 C++20 目前的协程仅仅就是个带了编译器辅助机制的基础实现(当然这符合 C++ 的一贯风格,都是编译器来支持标准库无法实现的最小化特性,其他的交给标准库去做)。C++20 的协程代码像语法糖一样会被编译器展开成更复杂的代码,这就需要很多「约定」来写用户侧的代码,然后依赖编译器去采用近似 codegen 的方式来生成后续部分(如果你懂 js 并且研究过 React 生命周期的话,你可能脑海里已经有画面了)。

这也就是为什么看了很多 C++20 协程代码的 demo 后仍旧一头雾水的原因。所以本质上是理解生成后代码的流程,然后就能理解为什么要预先定义这些类型和回调来帮助编译器展开代码。

从一个“简单”的 demo 开始

下面是一个 C++20 协程的简单 demo 代码,如果没有高版本编译器的话可以用 Complier Explorer [3] 执行:

#include <iostream>
#include <coroutine>

template <bool READY>
struct Awaiter {
    bool await_ready() noexcept {
        std::cout << "await_ready: " << READY << std::endl;
        return READY;
    }
    void await_resume() noexcept {
        std::cout << "await_resume" << std::endl;
    }
    void await_suspend(std::coroutine_handle<>) noexcept {
        std::cout << "await_suspend" << std::endl;
    }
};

struct TaskPromise {
    struct promise_type {
        TaskPromise get_return_object() {
            std::cout << "get_return_object" << std::endl;
            return TaskPromise{std::coroutine_handle<promise_type>::from_promise(*this)};
        }
        Awaiter<true> initial_suspend() noexcept {
            std::cout << "initial_suspend" << std::endl;
            return {};
        }
        Awaiter<true> final_suspend() noexcept {
            std::cout << "final_suspend" << std::endl;
            return {};
        }
        void unhandled_exception() {
            std::cout << "unhandled_exception" << std::endl;
        }
        void return_void() noexcept {
            std::cout << "return_void" << std::endl;
        }
    };

    void resume() {
        std::cout << "resume" << std::endl;
        handle.resume();
    }

    std::coroutine_handle<promise_type> handle;
};

TaskPromise task_func() {
    std::cout << "task first run" << std::endl;
    co_await Awaiter<false>{};
    std::cout << "task resume" << std::endl;
}

int main() {
    auto promise = task_func();
    promise.resume();

    return 0;
}

这段代码[4]运行后输出:

get_return_object
initial_suspend
await_ready: 1
await_resume
task first run
await_ready: 0
await_suspend
resume
await_resume
task resume
return_void
final_suspend
await_ready: 1
await_resume

尽管我已经尽力把第一个 demo 写的足够小了,但是依旧比其他语言的协程 demo 长很多。原因也很简单,C++ 想让程序员可以定制协程创建和执行的任意一个阶段的任意步骤的行为,那么就必须定义足够多的回调函数来定义每个阶段的行为。不参考任何文献想看懂上面的代码的话还是有些难度的。但是回想一下上文所说的“编译器展开”代码的实现原理,结合这些函数的执行顺序就可以大致推测出来编译器展开代码后函数调用的顺序。其实 cppreference 给出来了具体的执行过程 [5],但是有些语焉不详,这里我们更详细的讲述流程并解释一些细节:

  1. 调用operator new申请空间并初始化协程状态(Coroutine State),协程状态是编译器根据协程函数自动生成的类,每个不同的协程都得生成单独的。
  2. 复制/移动调用协程函数的参数到协程状态对象里(参数要保存到堆里才能在切换时保留)[6],另外这个函数体内部如果定义了其他栈上变量,也需要放到堆上(demo 里没有定义)。这是靠编译器自动分析完成的,有些不依赖编译器实现的“Stackless C/C++”协程库会给个函数创建临时变量(分配在堆上)。当然还是编译器原生支持会简单自然很多。这里引申一个问题,如果某些临时对象在协程返回时已经离开了作用域而理论上不需要捕获,是不是不用额外保存呢?是不是可以直接析构呢?答案是不行,因为 C++ 有个规则是对象的析构顺序是构造的反顺序。所以一旦中间的某个对象被保存在堆上,就可能被迫保存更多的对象以保证析构顺序(希望后续标准能修订协程函数在这里的行为规范,可以提升性能)。
  3. 构造协程的Promise::promise_type对象(也保存在协程状态里)。Promise对象是 C++ 规定的协程的返回值对象,对应上面 demo 代码里的TaskPromise 类 [7]。如果用户定义的Promise::promise_type有接受所有协程参数的构造函数,则调用该构造函数构造。否则调用默认构造函数构造。随后调用promise_type.get_return_object()函数创建协程函数的返回值对象TaskPromise。该对象会在协程首次挂起时返回到调用协程函数的位置。coroutine_handle和promise_type是可以使用handle::from_promise()和handle.promise()接口相互转换的(暂时不用深究原理,这涉及到一些编译器 builtin 的实现,和具体对象的内存布局有关系)。协程句柄(std::coroutine_handle<>)是一个可以操作协程的对象(类似可以操作线程的std::thread对象一样)。为了协程函数可以像普通函数调用一样去调用,所以这个 handle 的获取方式就有些别扭。
  4. 调用Promise::promise_type.initial_suspend(),后者会返回一个awaitable的对象,这个对象有三个定义好的成员函数,这里只需要知道await_ready()成员返回 true 时不进行默认挂起并立即调用await_resume()函数,否则立即挂起协程并调用await_suspend()函数即可。因为很多时候的co_await只需要知道要不要立即执行就行,所以标准库提供了默认的Awaiter实现:std::suspend_never和std::suspend_always类。前者永远不挂起,后者永远挂起。这里为什么搞这么复杂?答案是为了可扩展性,可以通过这个返回的对象来控制一个协程创建后是立即执行还是立即挂起,以及执行前是否要额外做一些操作,可以让用户方便的支持协程的调度机制。顺便说一句,demo 里的await_resume()函数返回值是 void,但其实可以返回任意类型,该返回值作为co_await表达式的返回值(demo 里没返回)。另外 demo 里的await_suspend()函数返回的是 void 类型,但实际上可以返回 bool 类型(返回 false 又会变成不挂起)甚至其他协程的coroutine_handle对象,此时会切换到该协程去执行。这就有了切换和调度的基础支持了,后面会详细聊这个机制。
  5. 根据 4 的选择是直接执行还是挂起,直接挂起的话就立即返回Promise对象给调用者,否则要等到协程显式挂起或者执行完成才会返回调用者。demo 里是默认不挂起,等第一次挂起时才返回给调用者。调用者立即resume()了这个协程函数,然后协程函数执行完成退出。上面的代码没有显式的写返回语句,编译器会在最后补上co_return,co_return会调用 promise.return_void()函数。这里没返回值,如果co_return返回了一个值T t的话,就需要定义一个叫void return_value(T t)的函数,return_value()和return_void()不能共存。
  6. 最后编译器调用co_await Promise::promise_type.final_suspend()函数结束协程(无论是抛异常结束还是正常退出都会调用)。注意final_suspend()返回的又是一个awaitable的对象,但是这里用std::suspend_always() 返回“挂起”时协程不会立即销毁内部的状态信息(否则直接销毁),因为有部分信息还保存在promise里呢。最佳实践是自己在Promise的析构函数里写coroutine_handle<>.destroy()。让handle的生命周期和Promise对象一致比较好,否则协程退出后再操作Promise对象就是 UAF 了(这个 demo 目前不需要,所以简单起见没这么处理)。
  7. 至于promise.unhandled_exception(),是在协程里出现未捕获的异常时候调用的。但是注意在promise.get_return_object()之前就抛出的异常不会来到这里,比如new导致的std::bad_alloc异常就不会调用到这里。异常处理流程不详细解释了,cppreference 描述的很清楚。

上面的task_func 代码运行结果结合这个流程介绍,编译器展开后的代码都能大致想象出来了:

TaskPromise task_func() {
    // No parameters and local variables.
    auto state = new __TaskPromise_state_(); // has TaskPromise::promise_type promise; 
    TaskPromise coro = state.promise.get_return_object();
    try {
        co_await p.inital_suspend();
        std::cout << "task first run" << std::endl;
        co_await Awaiter<false>{};
        std::cout << "task resume" << std::endl;
    } catch (...) {
        state.promise.unhandled_exception();
    }
    co_await state.promise.final_suspend();
}

co_await 展开会麻烦点,尤其是await_suspend()同步的返回值含义是不同的。这里先不深究编译器代码展开的更具体的细节了,基本的原理解释清楚了就先把关注点放在如何应用上[8]。上文提到了协程函数返回的必须是一个符合规范的Promise对象,这个对象内部必须有一个Promise::promise_type类型(名字不能变,但是可以用别的名字定义在别处,在类里面 using 声明为这个名字)。

这个Promise::promise_type类型必须实现上面流程里说的这些必备的函数,否则编译不通过。上面提到了Awaiter对象三个接口函数await_ready()、await_suspend()和await_resume()接口的行为,这里写一个例子来演示:

#include <iostream>
#include <coroutine>
#include <future>
#include <thread>

struct TaskPromise {
    struct promise_type {
        TaskPromise get_return_object() {
            std::cout << "get_return_object(), thread_id: " << std::this_thread::get_id() << std::endl;
            return TaskPromise{std::coroutine_handle<promise_type>::from_promise(*this)};
        }
        std::suspend_always initial_suspend() noexcept { return {}; }
        std::suspend_always final_suspend() noexcept { return {}; }
        void unhandled_exception() {}
        void return_void() noexcept {}
        size_t data = 0;
    };
    std::coroutine_handle<promise_type> handle;
};

struct Awaiter {
    bool await_ready() noexcept {
        std::cout << "await_ready(), thread_id: " << std::this_thread::get_id() << std::endl;
        return false;
    }
    void await_suspend(std::coroutine_handle<TaskPromise::promise_type> handle) noexcept {
        std::cout << "await_suspend(), thread_id: " << std::this_thread::get_id() << std::endl;
        auto thread = std::thread([=]() {
            std::this_thread::sleep_for(std::chrono::seconds(1));
            handle.promise().data = 1;
            handle.resume();
        });
        thread.join();
    }
    void await_resume() noexcept {
        std::cout << "await_resume(), thread_id: " << std::this_thread::get_id() << std::endl;
    }
};

TaskPromise task_func() {
    std::cout << "task_func() step 1, thread_id: " << std::this_thread::get_id() << std::endl;
    co_await Awaiter{};
    std::cout << "task_func() step 2, thread_id: " << std::this_thread::get_id() << std::endl;
}

int main() {
    std::cout << "main(), thread_id: " << std::this_thread::get_id() << std::endl;
    auto promise = task_func();
    std::cout << "main(), data: " << promise.handle.promise().data << ", thread_id: " << std::this_thread::get_id() << std::endl;
    promise.handle.resume();
    std::cout << "main(), data: " << promise.handle.promise().data << ", thread_id: " << std::this_thread::get_id() << std::endl;

    return 0;
}

执行结果如下:

main(), thread_id: 0x1d9d91ec0
get_return_object(), thread_id: 0x1d9d91ec0
main(), data: 0, thread_id: 0x1d9d91ec0
task_func() step 1, thread_id: 0x1d9d91ec0
await_ready(), thread_id: 0x1d9d91ec0
await_suspend(), thread_id: 0x1d9d91ec0
await_resume(), thread_id: 0x16dce7000
task_func() step 2, thread_id: 0x16dce7000
main(), data: 1, thread_id: 0x1d9d91ec0

结合日志和上面的流程说明很好理解这段代码。代码里 26 行调用await_suspend()的handle参数是编译器帮着传入的(这个 handle 就一个void *指针,值传递的成本很低)。而 31 行代码是在另一个线程上调用的,而后续的协程代码也是在另一个线程运行的。这也揭示了协程的跨线程传递的能力,只要传递协程句柄,就可以实现在任意线程上恢复协程的执行。那么当协程跨线程传递时,线程安全的问题依旧要注意,而且因为执行流可以任意移动,因此带来的其他同步问题需要额外注意。

实现一个简单的 generator

generator(生成器)在 Python/Js 等语言里已经应用的很常见的,在 C++20 之前无法简单的实现,利用上面的协程机制可以方便的写出来简单的实现。现在有这样一个获取斐波那契数列的需求,每次拿到数列后续的一个数字。如果用 C++ 朴素实现的话要怎么做?需要定义一个斐波那契数列生成器的类。这个类有一个形如size_t next()的函数,每次调用就会返回下一个个数字,那么相关的变量就要保存在类成员变量里,以便于下一次调用next()函数时根据之前的值计算本次需要返回的值。这个类的代码很简单,这里就不占用篇幅了。这里只贴协程形式的实现:

#include <iostream>
#include <coroutine>

template <typename T>
struct Generator {
    struct promise_type {
        Generator get_return_object() {
            return Generator{std::coroutine_handle<promise_type>::from_promise(*this)};
        }
        std::suspend_always initial_suspend() noexcept { return {}; }
        std::suspend_always final_suspend() noexcept { return {}; }
        void unhandled_exception() {}
        void return_value(T t) noexcept {
            v = t;
        }
        std::suspend_always yield_value(T t) {
            v = t;
            return {};
        }
        T v{};
    };
    bool has_next() {
        return !handle.done();
    }
    size_t next() {
        handle.resume();
        return handle.promise().v;
    }
    std::coroutine_handle<promise_type> handle;
};

Generator<size_t> fib(size_t max_count) {
    co_yield 1;
    size_t a = 0, b = 1, count = 0;
    while (++count < max_count - 1) {
        co_yield a + b;
        b = a + b;
        a = b - a;
    }
    co_return a + b;
}

int main() {
    size_t max_count = 10;
    auto generator = fib(max_count);
    size_t i = 0;
    while (generator.has_next()) {
        std::cout << "No." << ++i << ": " << generator.next() << std::endl;
    }
    return 0;
}

代码运行结果为:


No.1: 1
No.2: 1
No.3: 2
No.4: 3
No.5: 5
No.6: 8
No.7: 13
No.8: 21
No.9: 34
No.10: 55

前文介绍过co_await和co_return,新出现的标识符co_yield算是co_await的语法糖,可以比co_await更方便的传递出来一个值[9]。

这里的Generator定义为一个模板类,可以用于各种类型的Generator的快速定义,如果有其他需求的话就可以快速实现了。当然这里的Generator考虑的不是很全面。一般还需要考虑 handle 对应的 coroutine state 的生命周期要和返回的 promise 对象一致,所以需要final_suspend()返回std::suspend_always后,promise 需要自己去释放 handle 对应的协程的相关内存。析构函数里加上释放逻辑后就需要完善Promise类的构造,拷贝等函数(三五法则)。稍微完善点的Generator代码如下:


template <typename T>
class Generator {
public:
    struct promise_type;
    using promise_handle_t = std::coroutine_handle<promise_type>;
    explicit Generator(promise_handle_t h) : handle(h) {}
    explicit Generator(Generator &&generator) : handle(std::exchange(generator.handle, {})) {}
    ~Generator() {
        if (handle) {
            handle.destroy();
        }
    }
    Generator(Generator &) = delete;
    Generator &operator=(Generator &) = delete;

    struct promise_type {
        Generator get_return_object() {
            return Generator{std::coroutine_handle<promise_type>::from_promise(*this)};
        }
        std::suspend_always initial_suspend() noexcept { return {}; }
        std::suspend_always final_suspend() noexcept { return {}; }
        void unhandled_exception() {}
        void return_value(T t) noexcept {
            v = t;
        }
        std::suspend_always yield_value(T t) {
            v = t;
            return {};
        }
        T v{};
    };
    bool has_next() {
        return !handle.done();
    }
    size_t next() {
        handle.resume();
        return handle.promise().v;
    }
private:
    std::coroutine_handle<promise_type> handle;
};

实现这个generator是一次性工作,一劳永逸。当然如果标准库有std::generator模板的话就更省事了,但是这个愿望要等 C++23 才会实现。可能你会觉得写个简单的类去实现相关的逻辑并不困难。但是设想一下,如果这个算法很复杂,所需要保存的变量值有很多的话。将这些变量抽到成员变量里并且在next()函数里实现阶段性返回就不那么容易了,而且写出来的代码绝对没有这个协程版本好维护。generator都有了,python 的其他函数式编程的filter、map之类的函数能实现吗?当然能,但是就又扯远了。本文不继续讨论函数式了,还是回到协程上来。

通用的协程返回类 Task

截止目前,已经有一些基于 C++20 协程基础支持的开源协程库开源了,比如 cppcoro [10] ,follyl 里的协程库 coro [11]、以及我司(阿里巴巴)开源的 async_simplep [12]。这里悄悄说一句,async_simple 的名字起的不够“高大上”,但是代码质量还是不错的 。

这些协程库都会提供一些通用的Coroutine Types、Awaitable Types等辅助类以及一些基于 C++20 协程的锁等同步机制,可以很方便的实现协程逻辑。而这些基础设施里,最最基本的就是一个通用的协程返回类型Task的实现。一个功能完备且极具可扩展性的Task实在是太复杂了,但是简单的一个 demo 还是比较容易的。

在参(抄)考(袭)了上述协程库的大致思路以及其他参考文献后,这里给出来一个简单的实现。为了简单起见,忽略了所有 C++ 异常的保存和处理(我们平时 C++ 项目一般也不让使用异常机制,有非预期异常都是直接let it crash的):


#include <iostream>
#include <functional>
#include <deque>
#include <optional>
#include <coroutine>
#include <thread>

template <typename T>
class Task {
public:
    struct promise_type;
    using promise_handle_t = std::coroutine_handle<promise_type>;

    explicit Task(promise_handle_t h) : handle(h) {}
    Task(Task &&task) noexcept : handle(std::exchange(task.handle, {})) {}
    ~Task() { if (handle) { handle.destroy(); } }

    template <typename R>
    struct task_awaiter {
        explicit task_awaiter(Task<R> &&task) noexcept : task(std::move(task)) {}

        task_awaiter(task_awaiter &) = delete;
        task_awaiter &operator=(task_awaiter &) = delete;

        bool await_ready() noexcept { return false; }
        void await_suspend(std::coroutine_handle<> handle) noexcept {
            task.finally([handle]() { handle.resume(); });
        }
        R await_resume() noexcept { return task.get_result(); }

    private:
        Task<R> task;
    };

    struct promise_type {
        Task get_return_object() {
            return Task(promise_handle_t::from_promise(*this));
        }

        std::suspend_never initial_suspend() { return {}; }
        std::suspend_always final_suspend() noexcept { return {}; }

        template <typename U>
        task_awaiter<U> await_transform(Task<U> &&task) {
            return task_awaiter<U>(std::move(task));
        }

        void unhandled_exception() {}

        void return_value(T t) {
            data_ = t;
            notify_callbacks();
        }

        void on_completed(std::function<void(T)> &&callback) {
            if (data_.has_value()) {
                callback(data_.value());
            } else {
                callbacks_.push_back(callback);
            }
        }

        T get() {
            return data_.value();
        }

    private:
        void notify_callbacks() {
            for (auto &callback : callbacks_) {
                callback(data_.value());
            }
            callbacks_.clear();
        }

        std::optional<T> data_;
        std::deque<std::function<void(T)>> callbacks_;
    };

    T get_result() {
        return handle.promise().get();
    }

    void then(std::function<void(T)> &&callback) {
        handle.promise().on_completed([callback](auto data) {
            callback(data);
        });
    }

    void finally(std::function<void()> &&callback) {
        handle.promise().on_completed([callback](auto result) {
            callback();
        });
    }

private:
    promise_handle_t handle;
};

Task<int> task1() {
    std::cout << "task1 run" << std::endl;
    co_return 1;
}

Task<int> task2() {
    std::cout << "task2 run" << std::endl;
    co_return 2;
}

Task<int> call_task() {
    std::cout << "call_task" << std::endl;
    int data1 = co_await task1();
    std::cout << "call_task task1 data: " << data1 << std::endl;
    int data2 = co_await task2();
    std::cout << "call_task task2 data: " << data2 << std::endl;
    co_return data1 + data2;
}

int main() {
    Task<int> task = call_task();
    task.then([](int data) {
        std::cout << "call_task data: " << data << std::endl;
    });
    return 0;
}

代码执行结果如下:

call_task
task1 run
call_task task1 data: 1
task2 run
call_task task2 data: 2
call_task data: 3

可以在源码的函数里像之前的 demo 里加点日志去理解流程(或者直接使用调试器单步跟踪)。实际上这个 demo 没有实现真正意义上的等待和唤醒(甚至有些唤醒的逻辑都没运行到)。特别地,如果这些协程任务需要被调度到其他线程执行的话,还要考虑这些对象内部数据结构的并发安全性(直接使用std::mutex写不好就比普通的函数更容易出现死锁了 )。另外这个 demo 也揭示了一旦使用了C++20 的协程异步手段,从入口开始要一直改造下去,最终「传染」到整个项目的所有异步函数。

如果你有耐心读到这里的话,应该算是对 C++20 的协程勉强「入门」了。就可以能以这个文章为跳板去阅读其他起点更高的文章了。以目前 C++ 标准库提供的支持要手搓一个生产环境的协程框架还是比较困难的。这里推荐你阅读 async_simple 的文档 [13] 以及这两篇源码分析文章 [14] [15]。也可以回过头去阅读最经典的这个协程系列文章 [0],读完了这篇文章后,这个系列文章已经不是那么难啃了。写在后面

文章在这里结束很不过瘾,像是刚推开门又扭头快速离开了。但如果以C++ 20/23 标准协程落地的情况看,这篇文章其实也该co_yield了。毕竟再写下去就是介绍其他二方库三方库的实现思路了。还是等 C++26 完全落地吧,到时候看心情可能会继续handle.resume() 完成后续的内容。到那个时候,使用 C++ 标准库的协程实现一些东西的难度应该会下降一个数量级(但愿)。

参考文献

Copyright© 2013-2019

京ICP备2023019179号-2