协程的简单实现(C/C++)#

本文介绍Linux下用C/C++实现简单的协程的方法

协程有数种不同的实现方式,这里尝试最简单的一种方法。


参考:

https://en.wikipedia.org/wiki/Coroutine 协程介绍

https://linux.die.net/man/3/getcontext 函数文档

https://linux.die.net/man/3/makecontext 函数文档

https://code.woboq.org/userspace/glibc/sysdeps/unix/sysv/linux/x86/sys/ucontext.h.html linux源码

https://zhuanlan.zhihu.com/p/52061644

https://github.com/tonbit/coroutine

https://github.com/Winnerhust/uthread

https://github.com/cloudwu/coroutine

搜索引擎能搜到不少资料。其他在此不一一列举。


  • 重要的概念:调用栈上下文

  • 两个重要数据:mcontext_t,ucontext_t

  • 四个系统级函数:getcontext,setcontext,makecontext,swapcontext

主要就是用这几个工具,在调用栈层面修改相关寄存器的内容,来实现代码的执行流程(上下文)控制(切换、跳转),从而实现简单的协程。函数为系统级库函数,多数用汇编实现。


看一下数据结构:#

    /* Userlevel context.  */
    typedef struct ucontext_t
      {
        unsigned long int __ctx(uc_flags);
        struct ucontext_t *uc_link;
        stack_t uc_stack;                     //signal stack
        mcontext_t uc_mcontext;
        sigset_t uc_sigmask;
        struct _libc_fpstate __fpregs_mem;
        __extension__ unsigned long long int __ssp[4];
      } ucontext_t;

mcontext_t uc_mcontext 描述cpu的状态。包含相关寄存器的数据。

uc_link 指针,指向下一个上下文(当前上下文结束后执行)。

stack_t uc_stack signal stack信号栈

sigset_t uc_sigmask 信号mask

再看两个函数:#

int getcontext(ucontext_t *ucp) 把当前上下文存入ucp,成功返回0。 当后续切换到这个ucp时仿佛从此处再次返回,并往下执行。

int setcontext(const ucontext_t *ucp) 将当前上下文替换为ucp,即“切换”协程,不返回。

看个小例子感受一下:#

    #include <iostream>
    #include <ucontext.h>
    #include <chrono>
    #include <thread>
    
    int main()
    {
        int count = 0;
        
        ucontext_t context;
        
        getcontext(&context);
        
        std::cout<<"gg "<<count<<std::endl; // c++ 打印
        
        ++ count;
        
        std::this_thread::sleep_for(std::chrono::seconds(1));
        
       setcontext(&context); // 跳转到上面的getcontext处继续执行
        
        return 0;
    }

输出为

gg 0

gg 1

gg 2

gg 3

gg 4

gg 5

....

无限循环

注意count的值不会归零,只是执行点瞬间跳转,不相关的值并不会发生回退。


再看另两个函数#

void makecontext(ucontext_t *ucp, void (*func)(), int argc, ...); getcontext获取了上下文后,可以对其进行人为修改。func为此ucp被激活时调用的函数,即一般所说的协程。 此前得给这个ucp的uc_stack干净的空间。uc_link也得指定func结束之后要切换的目的上下文,如果设为NULL,那就不再切换,流程随func直接返回。

int swapcontext(ucontext_t *oucp, ucontext_t *ucp); 把当前上下文存入oucp,然后切换到ucp。


那么现在已经可以实现一个简单的协程系统。#

  1. 先有一个Coroutine类。 包含每个协程自己的状态,上下文等。

  2. 再有一个调度者Hub类。 Hub管理所有协程的数据、状态,并进行调度。

Hub会保存自己的上下文,然后每次协程切换都会先回到Hub的上下文,Hub再进行调度。

  1. Hub::make() make创建或者说是注册一个协程,用户端把自己的函数传给make,等待执行即可。

  2. Hub::yield() yield切换(交出控制权),返回hub继续调度执行其他协程。 这里说明一下。单纯的协程切换并没有什么用,需要配合事件系统(也可以说异步机制)来达到不浪费cpu的目的。

    举例:现在要从socket读数据,数据由远程机器的数据库产生,可能需要10秒,那么这10秒阻塞就浪费了自己的cpu。

    如果借助某个事件系统比如libev来起一个事件,先切换出去干别的事,数据到了以后libev会发通知,那时再切换回读socket这个点继续执行代码。这样就不会浪费cpu。 可参考gevent等。这里不作深入,只实现基本的协程。

  3. 调度。这里没有任何算法,没有优化,就是从当前位置无脑往后轮询,保证每个协程都能轮到、不出现卡死即可。 实用情况应参考或发明各种调度方法。


最终代码和测试结果:#

cr.h#

    // xc 20200129 coroutine cr.h
    #include <iostream>
    #include <ucontext.h>
    #include <vector>
    #include <chrono>
    #include <cassert>
    
    // EMPTY-可创建新协程 NEW-新的协程 RUNNING-运行中 SUSPEND-挂起 DONE-完成
    enum CoroutineStatus {EMPTY, NEW, RUNNING, SUSPEND, DONE};
    
    const int STACK_SIZE = 1024 * 32; // 每个协程的栈大小
    const int MAX_COROUTINE_COUNT = 5; // 协程总数限制
    
    class Coroutine // 协程类。包含状态、上下文、函数、各种标志位。
    {
    public:
        int id;
        CoroutineStatus status;
        ucontext_t context;     // 上下文
        char stack[STACK_SIZE]; // 栈空间。直接用数组,不折腾动态申请。
        void (*func)(void); // 要执行的函数。为了简便,不处理参数了。
        int yielded; // 区分协程是1.函数自然执行完毕,还是2.切换出来。如果是执行完毕,状态改为EMPTY腾出位置。
        
        Coroutine()
        {
            status = EMPTY; // 默认EMPTY,该位置可创建新协程。
        }
    };
    
    class Hub // hub类。即调度者,每次协程切换都会回到hub的context进行调度。
    {
    private:
        ucontext_t hub_context; // hub的上下文。每次协程切换都会回这里进行调度。
    
    public:
        int max_size; // 协程总数限制
        int current_index; // 当前位置
        int coroutine_count; // 当前协程数
        
        std::vector<Coroutine> coroutine_vector; // 协程vector。简化为vector,一次创建,不动态申请。
    
    public:
        Hub()
        {
            coroutine_count = 0;
            current_index = -1;
            max_size = MAX_COROUTINE_COUNT;
            coroutine_vector = std::vector<Coroutine>(MAX_COROUTINE_COUNT); // 直接创建所有。不折腾动态申请内存。
        }
        
        int print_status() // 打印系统当前状态
        {
            std::cout<<"--    coroutine_count = "<<this->coroutine_count<<"/"<<this->max_size<<std::endl;
            std::cout<<"--    current coroutine id = "<<this->current_index<<std::endl;
        }
        
        int make(void (*func)(void)) // 创建协程
        {
            if(coroutine_count >= max_size) // 超出限定数量
                return 1;
            
            // 找一个EMPTY的位置
            int pos = -1;
            auto coroutine_vector_size = coroutine_vector.size();
            for(int i = current_index + 1; i < coroutine_vector_size; ++ i) // 往后找
            {
                if(EMPTY == coroutine_vector[i].status)
                {
                    pos = i;
                    break;
                }
            }
            
            if(pos == -1) // 没找到。再从头找。
            {
                for(int i = 0; i < current_index; ++ i)
                {
                    if(EMPTY == coroutine_vector[i].status)
                    {
                        pos = i;
                        break;
                    }
                }
            }
            
            assert(pos != -1);
            
            coroutine_vector[pos].id = pos;
            coroutine_vector[pos].status = NEW; // 标志为新协程
            coroutine_vector[pos].func = func;
            
            ++ coroutine_count; // 计数+1
            
            return 0;
        }
    
        int schedule() // 调度
        {
            // 选出要执行的协程。NEW或SUSPEND状态。
            int pos = -1;
            auto coroutine_vector_size = coroutine_vector.size();
            for(int i = current_index + 1; i < coroutine_vector_size; ++ i) // 往后找
            {
                if(NEW == coroutine_vector[i].status || SUSPEND == coroutine_vector[i].status)
                {
                    pos = i;
                    break;
                }
            }
            
            if(pos == -1) // 从头找
            {
                for(int i = 0; i < current_index; ++ i)
                {
                    if(NEW == coroutine_vector[i].status || SUSPEND == coroutine_vector[i].status)
                    {
                        pos = i;
                        break;
                    }
                }
            }
            
            if(pos == -1) // 没找到可执行的
            {
                if(coroutine_count == 1 && coroutine_vector[current_index].yielded == 0)
                {
                    return 1; // 如果只存在一个协程,那么就是当前的协程,并且yielded == 0,代表函数正常结束,那么代表所有协程都执行完毕。
                }
                
                std::cout<<"no runnable coroutine"<<std::endl;
                std::this_thread::sleep_for(std::chrono::seconds(1));
                return 0;
            }
            
            std::cout<<"schedule pick "<<pos<<std::endl; // 选择了pos这个位置
            
            // 更新状态
            if(current_index != -1) // 初始状态的话,不用更新。否则更新当前协程的状态。
            {
                if(coroutine_vector[current_index].yielded == 0)
                {
                    // 如果是函数自己执行完毕,设为EMPTY,计数-1。即销毁。
                    coroutine_vector[current_index].status = EMPTY;
                    -- coroutine_count;
                }
                else // 否则是切换出来的,设为挂起。
                    coroutine_vector[current_index].status = SUSPEND;
            }
            
            current_index = pos; // 当前位置标记设为pos
            
            print_status();
            
            // 开始切换
            auto coroutine = &coroutine_vector[pos]; // 取新选定的协程
            
            if(coroutine->status == NEW) // 新的协程。需要配置一下。
            {
                getcontext(&(coroutine->context)); // 获取当前contenxt
                
                // 配置
                std::fill_n(coroutine->stack, STACK_SIZE, 0); // 清理stack
                coroutine->context.uc_stack.ss_sp = coroutine->stack;
                coroutine->context.uc_stack.ss_size = STACK_SIZE;
                coroutine->context.uc_stack.ss_flags = 0;
                coroutine->context.uc_link = &hub_context; // 此协程执行完跳回hub_context
                
                makecontext(&(coroutine->context), coroutine->func, 0); // 设置要执行的函数
                
            }
            
            coroutine->status = RUNNING;
            coroutine->yielded = 0;  // 默认标记为“非切换”
            
            // 切换。当前上下文存hub_context
            swapcontext(&hub_context, &(coroutine->context));
            
            // 因为uc_link设置为了&hub_context,所以下次切换回hub时会走到这里。
            // 这样就实现了每次协程切换都会回到hub_context进行下一次调度。
            
            return 0;
        }
        
        int yield() // 切换出去,交出控制权。
        {
            // 切换回hub
            coroutine_vector[current_index].yielded = 1; // 标记为切换
            swapcontext(&(coroutine_vector[current_index].context), &hub_context);
            
            // 在用户端的函数内调用yield()
            // 当前上下文会存到对应的协程内。下次切换回这个函数时,就会从这里继续执行。
        }
        
        int run() // 启动
        {
            std::cout<<"run"<<std::endl;
            print_status();
            for(;;) // 不停地调度,直到所有协程都执行完毕。
            {
                std::cout<<"schedule ..."<<std::endl;
                auto result = schedule();
                
                if(result == 1)
                {
                    std::cout<<"all over"<<std::endl;
                    break;
                }
            }
            
            return 0;
        }
    };
    
    Hub hub;

cr.cpp#

    // xc 20200129 coroutine cr.cpp
    #include <iostream>
    #include <chrono>
    #include <thread>
    #include "cr.h"
    
    void func_1_1()
    {
        std::cout<<"func_1_1"<<std::endl;
        hub.yield(); // 切换
        std::cout<<"func_1_1 over"<<std::endl;
    }
    
    void func_1()
    {
        std::cout<<"func_1"<<std::endl;
        hub.yield(); // 切换
        
        for(int i = 0; i < 6; ++ i)
        {
            auto result = hub.make(func_1_1); // 创建
            if(result == 0)
                std::cout<<"hub.make ok"<<std::endl;
            else
                std::cout<<"hub.make failed"<<std::endl;
            
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
        std::cout<<"func_1 over"<<std::endl;
    }
    
    void func_2()
    {
        std::cout<<"func_2"<<std::endl;
        hub.yield(); // 切换
        hub.yield(); // 切换
        std::cout<<"func_2 over"<<std::endl;
    }
    
    void func_3()
    {
        std::cout<<"func_3"<<std::endl;
        hub.yield(); // 切换
        std::cout<<"func_3 over"<<std::endl;
    }
    
    
    int main()
    {
        std::cout<<"main"<<std::endl;
        auto result = hub.make(func_1); // 创建
        result = hub.make(func_2); // 创建
        result = hub.make(func_3); // 创建
        
        hub.run(); // 启动
        
        return 0;
    }

输出:

    main
    run                                  // hub 启动
    --    coroutine_count = 3/5          // 新建了3个协程
    --    current coroutine id = -1      // 初始状态。当前id为-1
    schedule ...
    schedule pick 0                      // 执行0号位
    --    coroutine_count = 3/5
    --    current coroutine id = 0
    func_1                               // 0号位func_1
    schedule ...
    schedule pick 1                      // 切换。执行1号位
    --    coroutine_count = 3/5
    --    current coroutine id = 1
    func_2                               // 1号位func_2
    schedule ...
    schedule pick 2                      // 切换。执行2号位
    --    coroutine_count = 3/5
    --    current coroutine id = 2
    func_3                               // 2号位func_3
    schedule ...
    schedule pick 0                      // 切换。执行0号位
    --    coroutine_count = 3/5
    --    current coroutine id = 0
    hub.make ok                          // 连续建6个协程。但是当前已经有3个,而且总数限定为5。所以只成功2个。
    hub.make ok
    hub.make failed
    hub.make failed
    hub.make failed
    hub.make failed
    func_1 over                          // 0号位func_1结束。
    schedule ...
    schedule pick 1                      // 切换。执行1号位func_2
    --    coroutine_count = 4/5          // 数量变为4
    --    current coroutine id = 1
    schedule ...
    schedule pick 2                      // 切换。执行2号位
    --    coroutine_count = 4/5
    --    current coroutine id = 2
    func_3 over                          // 2号位func_3结束。
    schedule ...
    schedule pick 3                      // 切换。执行3号位
    --    coroutine_count = 3/5          // 数量变为3
    --    current coroutine id = 3
    func_1_1                             // 3号位func_1_1
    schedule ...
    schedule pick 4                      // 切换。执行4号位
    --    coroutine_count = 3/5
    --    current coroutine id = 4
    func_1_1                             // 4号位func_1_1
    schedule ...
    schedule pick 1                      // 切换。执行1号位
    --    coroutine_count = 3/5
    --    current coroutine id = 1
    func_2 over                          // 1号位func_2结束。
    schedule ...
    schedule pick 3                      // 切换。执行3号位
    --    coroutine_count = 2/5          // 数量变为2
    --    current coroutine id = 3
    func_1_1 over                        // 3号位func_1_1结束。
    schedule ...
    schedule pick 4                      // 切换。执行4号位
    --    coroutine_count = 1/5          // 数量变为1
    --    current coroutine id = 4
    func_1_1 over                        // 4号位func_1_1结束。
    schedule ...
    all over                             // 所有协程执行完毕