咸鱼白的窝
一些奇奇怪怪的东西
一个C++协程库的实现与优化
站在巨人的肩膀上可以看得更远,但你得先爬到巨人的肩膀上。

预备知识

在阅读本文之前,你应当:

  • 理解阻塞I/O与非阻塞I/O同步调用与异步调用的概念。

  • 熟悉操作系统任务切换的过程(上下文的切换与保存)

  • 了解X86-64-ABI

  • 了解Linux下的I/O多路复用

本文所述的协程库源代码可以在我的github仓库获得。

协程是什么

(∩_∩)这种概念我就不慢慢解释了(偷个懒),自己查。

API

Schedule

Schedule::Schedule();

Schedule::Schedule(int size);

第一种方法会创建一个拥有1MB共享栈的schedule,该schedule创建的所有的共享栈协程都会共享这个1MB的运行栈。

第二种方法会创建一个schedule,其拥有的共享栈大小由stack_size参数决定。

即使一个schedule管理的所有协程都具有独立运行栈,使用者也必须为schedule创建至少100字节的共享栈。在我的设计中,所有协程在运行到末尾时都会销毁自有的栈,所有独立运行栈协程都必须在最后将运行栈切换到共享栈上以销毁其自有的独立运行栈(具体分析请见优化方案具有独立运行栈的协程部分)。

coroutine_create

int Schedule::coroutine_create(co_func func, void *args);

int Schedule::coroutine_create(co_func func, void *args, int stack_type, int stack_size);

第一个方法会创建一个共享运行栈协程,该方法等价于: Schedule::coroutine_create(func, args, SAVED_STACK, 0);

第二个方法在 stack_typeSAVED_STACK时会创建一个共享运行栈协程,stack_size决定了用来保存运行栈内容的内存区域的初始大小,该内存区域大小会随着每次的协程挂起而动态调整,所以建议将stack_size设为0

第二个方法在 stack_typeINDEPENDENT_STACK时会创建一个独立运行栈协程,stack_size决定了独立运行栈的大小。

coroutine_destroy

void Schedule::coroutine_destroy(int co_id);

这个方法会销毁由co_id指定的协程。

coroutine_resume

void Schedule::coroutine_resume(int co_id);

这个方法会将主协程切换到非主协程(由参数co_id指定)。

coroutine_yield

void Schedule::coroutine_yield();

这个方法会将正在运行的非主协程挂起并切换到主协程。

coroutine_status

int Schedule::coroutine_status(int co_id);

这个方法会返回一个协程(由参数co_id指定)的状态,状态(在coroutine.h中定义)可分为COROUTINE_DEAD, COROUTINE_READY, COROUTINE_RUNNING, COROUTINE_SUSPEND

coroutine_running

int Schedule::coroutine_running() const;

这个方法会返回正在运行的协程的id。

基本实现

基础的版本是去年读了云风前辈C coroutine源码后实现的C++ coroutine,这一版仅有共享运行栈协程。

后续的各种改进与优化都是基于这一版本。

该库主要由Schedule类和Coroutine类组成,创建schedule的程序被我们称为主协程,主协程未与任何一个Coroutine实例绑定,schedule创建的协程则称为非主协程,每一个非主协程都和一个Coroutine实例绑定。

协程分为四种状态:COROUTINE_DEAD, COROUTINE_READY, COROUTINE_RUNNING, COROUTINE_SUSPEND,如图:

协程状态图

schedule调用coroutine_create方法创建一个非主协程,此时该协程处于COROUTINE_READY状态。

当主协程的schedule第一次调用coroutine_resume方法切入一个处于COROUTINE_READY状态的非主协程时,会将schedule拥有的共享栈设为非主协程的运行栈,将非主协程上下文中的uc_link设置为主协程上下文(非主协程运行完会自动切换回主协程)并使用makecontext函数为非主协程构建上下文,最终调用swapcontext函数将上下文切至非主协程,该非主协程由此进入COROUTINE_RUNNING状态,代码如下:

getcontext(&co->ucontext);

co->ucontext.uc_stack.ss_sp = this->stack;
co->ucontext.uc_stack.ss_size = this->stack_size;

/*ucontext will change to main after non-main co return*/
co->ucontext.uc_link = &this->main;
co->status = COROUTINE_RUNNING;
this->running_id = co_id;

auto S_ptr = (uintptr_t) this;
makecontext(&co->ucontext, (void (*)()) start_func, 2, (uint32_t) S_ptr, (uint32_t) (S_ptr >> 32));

/*main co to non-main co*/
swapcontext(&this->main, &co->ucontext);

当运行中的非主协程调用coroutine_yield方法时,schedule会调用save_stack函数将共享运行栈中的内容保存到当前运行的非主协程的保存栈中,调用swapcontext函数将上下文切至主协程,非主协程变为COROUTINE_SUSPEND状态,代码如下:

int co_id = this->running_id;
Coroutine *co = this->co_list[co_id];
co->status = COROUTINE_SUSPEND;

save_stack(co, this->stack + STACK_SIZE);

this->running_id = -1;

/*non-main co to main co*/
swapcontext(&co->ucontext, &this->main);

当schedule调用coroutine_resume函数切入一个处于COROUTINE_SUSPEND的非主协程时,将非主协程保存栈中的内容恢复到schedule的共享运行栈中,调用swapcontext函数将上下文切至非主协程,该非主协程由此变回COROUTINE_RUNNING状态,代码如下:

memcpy(this->stack + STACK_SIZE - co->size, co->stack, co->size);

co->status = COROUTINE_RUNNING;
this->running_id = co_id;

swapcontext(&this->main, &co->ucontext);

无论一个协程处于什么状态(除了COROUTINE_RUNNING状态),只要对其调用coroutine_destroy函数即可销毁该协程。

优化方案

具有独立运行栈的协程

为了区分共享运行栈协程和独立运行栈协程, 在Coroutine类中增加了type成员, 构造Coroutine时会为type == INDEPENDENT_STACK分配独立运行栈,在Schedule类中增加了成员函数int coroutine_create(co_func func, void *args, int stack_type, int stack_size);, stack_type指定了协程类型, stack_size指定了独立运行栈的大小(字节)。

coroutine_resume函数中,对两种协程的处理做了判断区分,对于处于COROUTINE_READY状态的独立运行栈协程,上下文的栈区将初始化为独立运行栈,对于处于COROUTINE_SUSPEND状态的独立运行栈协程,取消了恢复栈区的操作,直接切换上下文即可。

coroutine_yield函数中,对于独立运行栈协程,取消了保存栈区的操作,直接进行上下文切换。

一点小插曲,在我初版的共享栈协程实现中,协程结束前会释放对应的Coroutine对象的内存(包括其保存栈),免去了库使用者手动释放已结束协程的麻烦,但增加了独立运行栈协程后,协程结束前释放对应的Coroutine对象的内存这一操作将引发对内存的越界访问(单线程运行时看似不会出错,但在多线程情况下存在隐患),我在撰写本文时意识到了这一bug,并采取了复制栈内容至共享栈,将rsp寄存器内容指向共享栈对应的新“栈顶”的解决方案,代码实现如下:

if (co->type == INDEPENDENT_STACK){
	register long rsp asm ("rsp");

	co->size = co->stack + co->cap - (char *)rsp;
	memcpy(S->stack + S->stack_size - co->size, (char *)rsp, co->size);
	rsp = (long)S->stack + S->stack_size - co->size;
}

基于这一实现,即使一个schedule管理的所有协程都具有独立运行栈,使用者也必须为schedule创建至少100字节的共享栈。

上下文切换优化

在初版实现中,上下文切换使用的是ucontext.h中提供的ucontext_t结构及相关库函数,然而该实现中涉及的FPU(浮点运算器),MXCSR(控制状态寄存器)在服务器端编程中极少用到,因此对这些寄存器的保存与恢复造成了不必要的性能消耗,这里参考了腾讯的libco的上下文切换并进行了精简:

.globl swap_ctx
.globl change_stack
.type  swap_ctx, @function

swap_ctx:
    leaq (%rsp),%rax
    movq %rax, 64(%rdi)

    movq %rbx, 56(%rdi)

    movq 0(%rax), %rax
    movq %rax, 48(%rdi)

    #movq %rdi, 40(%rdi)
    movq %rbp, 32(%rdi)
    movq %r12, 24(%rdi)
    movq %r13, 16(%rdi)
    movq %r14, 8(%rdi)
    movq %r15, (%rdi)
    xorq %rax, %rax

    movq 32(%rsi), %rbp
    movq 64(%rsi), %rsp
    movq (%rsi), %r15
    movq 8(%rsi), %r14
    movq 16(%rsi), %r13
    movq 24(%rsi), %r12
    movq 40(%rsi), %rdi
    movq 56(%rsi), %rbx

    leaq 8(%rsp), %rsp
    pushq 48(%rsi)

    ret

保存与恢复了参数寄存器rdi(rsi不用保存与恢复,rdi这个寄存器对应start_func的参数S,其实只需要保证第一次切入时能从上下文中取出来而已,也不需要保存操作,rcx,rdx,r8,r9理论上在本设计中无须保存),返回地址(每次切换上下文时保证能回到之前调用切换函数的地方继续运行),7个callee-saved寄存器:rbx,rsp,rbp,r12,r13,r14,r15。

用来取代的ucontext_tcoctx定义在coctx.h中:

typedef struct coctx{
    void *regs[9];
    struct coctx *uc_link;
    stack_t uc_stack;
}coctx;

stack_t类型(见 bits/types/stack_t.h)的uc_stackucontext_t中的作用一样,用于初始化上下文时确定栈空间,regs[9]中即是上面提到的返回地址与8个寄存器。uc_link用来指定当前协程运行结束后的上下文,在本库中,所有非主协程上下文中的uc_link都指向了主协程的上下文。

值得一提的是,腾讯的libco并没有提供类似uc_link的实现,libco似乎默认所有的协程函数都是处于永不终止的循环中,并且简单粗暴的把所有协程的返回地址都指向了协程函数的入口。。直觉告诉我在这种实现下,如果库使用者的协程会运行到结尾,极有可能引发错误,但因为libco缺少详细文档,加上我并没有完全细究其代码,所以对这种情况我也无法断言必然出错,也许有什么容错机制吧。

因为对uc_link的需求,所以我去看了一下glibc中makecontext函数的源码,结合上面的coctx结构体,实现了make_ctx函数:

void make_ctx(coctx *ctx, co_start func, void *s){
    auto *sp = (long long *)((uintptr_t)ctx->uc_stack.ss_sp + ctx->uc_stack.ss_size);

    /* 16 Bytes room reserved for uc_link and start_ctx addr*/
    sp -= 2;

    /* Align stack, 8 Bytes room reserved for swap_ctx ret addr */
    sp = (long long *)(((uintptr_t)sp & -16L) - 8);

    ctx->regs[RSP] = (char *)sp;
    ctx->regs[RBX] = (char *)&sp[2];
    ctx->regs[RDI] = (char *)s;
    ctx->regs[RET] = (char *)func;

    sp[1] = (uintptr_t)start_ctx;
    if (ctx->uc_link)
        sp[2] = (uintptr_t)ctx->uc_link;
    else
        sp[2] = (uintptr_t)func;
}

除去一些对基本上下文的初始化,make_ctx确保在栈底预留了16字节给uc_linkstart_ctx函数地址,如下:

/*
high memory
uc_link		<--stack base
start_ctx
start_func	<--rsp
...
...
low memory
*/

make_ctxrsp指向start_ctx上面一个栈帧,在第一次调用swap_ctx进入协程时,会在rsp指向的栈帧中存入协程入口start_func地址,然后ret以进入协程,此时rsp重新指向start_ctx,栈由此向下扩展,当协程运行结束,rsp指回start_ctx,协程退出时ret直接进入start_ctx,其实现如下:

.globl start_ctx
.globl __set_ctx
.type  start_ctx, @function
start_ctx:
    movq (%rsp), %rdi

    call __set_ctx

__set_ctx:
    movq 32(%rdi), %rbp
    movq 64(%rdi), %rsp
    movq (%rdi), %r15
    movq 8(%rdi), %r14
    movq 16(%rdi), %r13
    movq 24(%rdi), %r12
    movq 40(%rdi), %rdi
    movq 56(%rdi), %rbx

    leaq 8(%rsp), %rsp
    pushq 48(%rdi)

    ret

uc_link取出至rdi寄存器,然后调用__set_ctx函数将上下文切换回主协程。

在实现uc_link的过程中,我没有将rsp寄存器设置到位,导致set_start的地址被start_func的地址覆盖了,debug时整出了uc_link的另一种用法,即在协程中直接获取uc_link的值并将其作为参数在协程结束前调用__set_ctx从而完成跳转,两种实现性能基本无区别,默认使用第二种实现,如果想使用第一种实现,可以 #define USE_UC_LINK

协程库默认使用coctx这一实现,如果想要使用ucontext_t的实现,可以#define USE_SYS_UCONTEXT

性能比较

100,000,000次协程切换总时长:

类型 USE_SYS_UCONTEXT USE_UC_LINK 时间(s)
共享运行栈 7.125
共享运行栈 7.230
共享运行栈 无作用 91.81
独立运行栈 5.749
独立运行栈 5.697
独立运行栈 无作用 89.52

通过对比我们可以看出使用没有FPU信息和MXCSR信息的coctx代替glibc提供的ucontext_t进行上下文切换后,整体速度提升超过了10倍。且独立运行栈的上下文切换速度也明显优于共享运行栈的上下文切换速度(在实际应用中,共享运行栈中含有大量数据时,这种切换速度的差距会更明显)。

简略谈谈libco

首先就是上面提到的libco的协程运行到结尾应该是没有任何回收措施的,很想找个鹅厂的前辈证实一下这个问题的答案。。

本来很想尝试像libco那样hook几个阻塞IO调用,但最近事情太多,暂时没那个精力去实现,这里简略地聊聊libco hook的大致思路吧。

libco通过LD_PRELOAD(详见ld.so(8))优先于系统的库函数加载进内存,运行时调用的将是libco hook后的IO调用,而真实的系统IO调用的函数指针则通过dlsym(3)取得,比如真实的read调用的函数指针通过static read_pfn_t g_sys_read_func = (read_pfn_t)dlsym(RTLD_NEXT,"read");获得,现在g_sys_read_func指向真正的read调用。

我们来看看hook后的read的实现:

ssize_t read( int fd, void *buf, size_t nbyte )
{
	HOOK_SYS_FUNC( read );
	

	if( !co_is_enable_sys_hook() )
	{
		return g_sys_read_func( fd,buf,nbyte );
	}
	rpchook_t *lp = get_by_fd( fd );
	
	if( !lp || ( O_NONBLOCK & lp->user_flag ) ) 
	{
		ssize_t ret = g_sys_read_func( fd,buf,nbyte );
		return ret;
	}
	int timeout = ( lp->read_timeout.tv_sec * 1000 ) 
				+ ( lp->read_timeout.tv_usec / 1000 );
	
	struct pollfd pf = { 0 };
	pf.fd = fd;
	pf.events = ( POLLIN | POLLERR | POLLHUP );
	
	int pollret = poll( &pf,1,timeout );
	
	ssize_t readret = g_sys_read_func( fd,(char*)buf ,nbyte );
	
	if( readret < 0 )
	{
		co_log_err("CO_ERR: read fd %d ret %ld errno %d poll ret %d timeout %d",
					fd,readret,errno,pollret,timeout);
	}
	
	return readret;

}

对于设置了O_NONBLOCK标志位的文件描述符,将直接通过g_sys_read_func调用真实的read,而对于阻塞的文件描述符,会加入poll轮询列表,等到poll返回后才通过g_sys_read_func调用真实的read,这里的poll也是经过了hook改造的,所有的hook后的函数的核心都在于这个poll,hook后的poll内部实现是co_poll_innerco_poll_inne会为文件描述符注册epoll事件,且如果设定了timeout参数,co_poll_inner还会将文件描述符相关的结构加入定时器队列,随后让出当前协程。当主协程中的co_eventloop触发监听事件时抑或是定时器队列触发超时,都会将对应协程切回来,此时会发起第二次poll,如果判断出是IO事件唤醒了协程,则会通过g_sys_read_func调用真实的read,如果判断出超时,则会重复之前的流程。

poll背后的具体实现比较复杂,我也只理清了个大概,若是展开细讲,大概又是好几万字,只能像这样粗略地描述一下以作记录了。

Tips

  • 注意共享栈协程和独立栈协程的选择,共享栈协程因为栈数据的保存与恢复操作,会使得性能下降,独立栈协程则更容易产生栈溢出的风险以及大量内存的占用。
  • 在共享栈协程中使用异步IO调用时,必须使用堆中分配的内存作为缓冲区,不可以将栈上的空间作为缓冲区,因为一旦协程挂起,运行栈就不再归属于该协程。在独立栈协程中则无此限制。
  • 在本库提供的协程中最好使用非阻塞调用而非阻塞调用,阻塞调用会使协程失去存在的必要性。(因为没有像libco一样实现对常用网络编程函数的hook,所以程序使用的第三方库中如果使用了阻塞调用,同样会使协程失去存在必要性)。

Last modified on 2021-01-31