预备知识
在阅读本文之前,你应当:
-
理解
阻塞I/O与非阻塞I/O
,同步调用与异步调用
的概念。 -
熟悉
操作系统任务切换的过程(上下文的切换与保存)
。 -
了解
X86-64-ABI
。 -
了解
Linux下的I/O多路复用
。
本文所述的协程库源代码可以在我的github仓库获得。
协程是什么
(∩_∩)这种概念我就不慢慢解释了(偷个懒),自己查。
API
Schedule
Schedule::Schedule();
Schedule::Schedule(int size);
第一种方法会创建一个拥有1MB共享栈的schedule,该schedule创建的所有的共享栈协程都会共享这个1MB的运行栈。
第二种方法会创建一个schedule,其拥有的共享栈大小由stack_size
参数决定。
即使一个schedule管理的所有协程都具有独立运行栈,使用者也必须为schedule创建至少100字节的共享栈。在我的设计中,所有协程在运行到末尾时都会销毁自有的栈,所有独立运行栈协程都必须在最后将运行栈切换到共享栈上以销毁其自有的独立运行栈(具体分析请见优化方案
中具有独立运行栈的协程
部分)。
coroutine_create
int Schedule::coroutine_create(co_func func, void *args);
int Schedule::coroutine_create(co_func func, void *args, int stack_type, int stack_size);
第一个方法会创建一个共享运行栈协程,该方法等价于: Schedule::coroutine_create(func, args, SAVED_STACK, 0);
。
第二个方法在 stack_type
为SAVED_STACK
时会创建一个共享运行栈协程,stack_size
决定了用来保存运行栈内容的内存区域的初始大小,该内存区域大小会随着每次的协程挂起而动态调整,所以建议将stack_size
设为0
。
第二个方法在 stack_type
为INDEPENDENT_STACK
时会创建一个独立运行栈协程,stack_size
决定了独立运行栈的大小。
coroutine_destroy
void Schedule::coroutine_destroy(int co_id);
这个方法会销毁由co_id
指定的协程。
coroutine_resume
void Schedule::coroutine_resume(int co_id);
这个方法会将主协程切换到非主协程(由参数co_id
指定)。
coroutine_yield
void Schedule::coroutine_yield();
这个方法会将正在运行的非主协程挂起并切换到主协程。
coroutine_status
int Schedule::coroutine_status(int co_id);
这个方法会返回一个协程(由参数co_id
指定)的状态,状态(在coroutine.h
中定义)可分为COROUTINE_DEAD
, COROUTINE_READY
, COROUTINE_RUNNING
, COROUTINE_SUSPEND
。
coroutine_running
int Schedule::coroutine_running() const;
这个方法会返回正在运行的协程的id。
基本实现
基础的版本是去年读了云风前辈的C coroutine源码后实现的C++ coroutine,这一版仅有共享运行栈协程。
后续的各种改进与优化都是基于这一版本。
该库主要由Schedule类和Coroutine类组成,创建schedule的程序被我们称为主协程,主协程未与任何一个Coroutine实例绑定,schedule创建的协程则称为非主协程,每一个非主协程都和一个Coroutine实例绑定。
协程分为四种状态:COROUTINE_DEAD
, COROUTINE_READY
, COROUTINE_RUNNING
, COROUTINE_SUSPEND
,如图:
schedule调用coroutine_create
方法创建一个非主协程,此时该协程处于COROUTINE_READY
状态。
当主协程的schedule第一次调用coroutine_resume
方法切入一个处于COROUTINE_READY
状态的非主协程时,会将schedule拥有的共享栈设为非主协程的运行栈,将非主协程上下文中的uc_link
设置为主协程上下文(非主协程运行完会自动切换回主协程)并使用makecontext
函数为非主协程构建上下文,最终调用swapcontext
函数将上下文切至非主协程,该非主协程由此进入COROUTINE_RUNNING
状态,代码如下:
getcontext(&co->ucontext);
co->ucontext.uc_stack.ss_sp = this->stack;
co->ucontext.uc_stack.ss_size = this->stack_size;
/*ucontext will change to main after non-main co return*/
co->ucontext.uc_link = &this->main;
co->status = COROUTINE_RUNNING;
this->running_id = co_id;
auto S_ptr = (uintptr_t) this;
makecontext(&co->ucontext, (void (*)()) start_func, 2, (uint32_t) S_ptr, (uint32_t) (S_ptr >> 32));
/*main co to non-main co*/
swapcontext(&this->main, &co->ucontext);
当运行中的非主协程调用coroutine_yield
方法时,schedule会调用save_stack
函数将共享运行栈中的内容保存到当前运行的非主协程的保存栈中,调用swapcontext
函数将上下文切至主协程,非主协程变为COROUTINE_SUSPEND
状态,代码如下:
int co_id = this->running_id;
Coroutine *co = this->co_list[co_id];
co->status = COROUTINE_SUSPEND;
save_stack(co, this->stack + STACK_SIZE);
this->running_id = -1;
/*non-main co to main co*/
swapcontext(&co->ucontext, &this->main);
当schedule调用coroutine_resume
函数切入一个处于COROUTINE_SUSPEND
的非主协程时,将非主协程保存栈中的内容恢复到schedule的共享运行栈中,调用swapcontext
函数将上下文切至非主协程,该非主协程由此变回COROUTINE_RUNNING
状态,代码如下:
memcpy(this->stack + STACK_SIZE - co->size, co->stack, co->size);
co->status = COROUTINE_RUNNING;
this->running_id = co_id;
swapcontext(&this->main, &co->ucontext);
无论一个协程处于什么状态(除了COROUTINE_RUNNING
状态),只要对其调用coroutine_destroy
函数即可销毁该协程。
优化方案
具有独立运行栈的协程
为了区分共享运行栈协程和独立运行栈协程, 在Coroutine类中增加了type
成员, 构造Coroutine时会为type == INDEPENDENT_STACK
分配独立运行栈,在Schedule类中增加了成员函数int coroutine_create(co_func func, void *args, int stack_type, int stack_size);
, stack_type
指定了协程类型, stack_size
指定了独立运行栈的大小(字节)。
在coroutine_resume
函数中,对两种协程的处理做了判断区分,对于处于COROUTINE_READY
状态的独立运行栈协程,上下文的栈区将初始化为独立运行栈,对于处于COROUTINE_SUSPEND
状态的独立运行栈协程,取消了恢复栈区的操作,直接切换上下文即可。
在coroutine_yield
函数中,对于独立运行栈协程,取消了保存栈区的操作,直接进行上下文切换。
一点小插曲,在我初版的共享栈协程实现中,协程结束前会释放对应的Coroutine对象的内存(包括其保存栈),免去了库使用者手动释放已结束协程的麻烦,但增加了独立运行栈协程后,协程结束前释放对应的Coroutine对象的内存这一操作将引发对内存的越界访问(单线程运行时看似不会出错,但在多线程情况下存在隐患),我在撰写本文时意识到了这一bug,并采取了复制栈内容至共享栈,将rsp寄存器内容指向共享栈对应的新“栈顶”的解决方案,代码实现如下:
if (co->type == INDEPENDENT_STACK){
register long rsp asm ("rsp");
co->size = co->stack + co->cap - (char *)rsp;
memcpy(S->stack + S->stack_size - co->size, (char *)rsp, co->size);
rsp = (long)S->stack + S->stack_size - co->size;
}
基于这一实现,即使一个schedule管理的所有协程都具有独立运行栈,使用者也必须为schedule创建至少100字节的共享栈。
上下文切换优化
在初版实现中,上下文切换使用的是ucontext.h
中提供的ucontext_t
结构及相关库函数,然而该实现中涉及的FPU(浮点运算器),MXCSR(控制状态寄存器)在服务器端编程中极少用到,因此对这些寄存器的保存与恢复造成了不必要的性能消耗,这里参考了腾讯的libco的上下文切换并进行了精简:
.globl swap_ctx
.globl change_stack
.type swap_ctx, @function
swap_ctx:
leaq (%rsp),%rax
movq %rax, 64(%rdi)
movq %rbx, 56(%rdi)
movq 0(%rax), %rax
movq %rax, 48(%rdi)
#movq %rdi, 40(%rdi)
movq %rbp, 32(%rdi)
movq %r12, 24(%rdi)
movq %r13, 16(%rdi)
movq %r14, 8(%rdi)
movq %r15, (%rdi)
xorq %rax, %rax
movq 32(%rsi), %rbp
movq 64(%rsi), %rsp
movq (%rsi), %r15
movq 8(%rsi), %r14
movq 16(%rsi), %r13
movq 24(%rsi), %r12
movq 40(%rsi), %rdi
movq 56(%rsi), %rbx
leaq 8(%rsp), %rsp
pushq 48(%rsi)
ret
保存与恢复了参数寄存器rdi(rsi不用保存与恢复,rdi这个寄存器对应start_func
的参数S
,其实只需要保证第一次切入时能从上下文中取出来而已,也不需要保存操作,rcx,rdx,r8,r9理论上在本设计中无须保存),返回地址(每次切换上下文时保证能回到之前调用切换函数的地方继续运行),7个callee-saved寄存器:rbx,rsp,rbp,r12,r13,r14,r15。
用来取代的ucontext_t
的coctx
定义在coctx.h
中:
typedef struct coctx{
void *regs[9];
struct coctx *uc_link;
stack_t uc_stack;
}coctx;
stack_t
类型(见 bits/types/stack_t.h
)的uc_stack
和ucontext_t
中的作用一样,用于初始化上下文时确定栈空间,regs[9]
中即是上面提到的返回地址与8个寄存器。uc_link
用来指定当前协程运行结束后的上下文,在本库中,所有非主协程上下文中的uc_link
都指向了主协程的上下文。
值得一提的是,腾讯的libco并没有提供类似uc_link的实现,libco似乎默认所有的协程函数都是处于永不终止的循环中,并且简单粗暴的把所有协程的返回地址都指向了协程函数的入口。。直觉告诉我在这种实现下,如果库使用者的协程会运行到结尾,极有可能引发错误,但因为libco缺少详细文档,加上我并没有完全细究其代码,所以对这种情况我也无法断言必然出错,也许有什么容错机制吧。
因为对uc_link
的需求,所以我去看了一下glibc中makecontext
函数的源码,结合上面的coctx
结构体,实现了make_ctx
函数:
void make_ctx(coctx *ctx, co_start func, void *s){
auto *sp = (long long *)((uintptr_t)ctx->uc_stack.ss_sp + ctx->uc_stack.ss_size);
/* 16 Bytes room reserved for uc_link and start_ctx addr*/
sp -= 2;
/* Align stack, 8 Bytes room reserved for swap_ctx ret addr */
sp = (long long *)(((uintptr_t)sp & -16L) - 8);
ctx->regs[RSP] = (char *)sp;
ctx->regs[RBX] = (char *)&sp[2];
ctx->regs[RDI] = (char *)s;
ctx->regs[RET] = (char *)func;
sp[1] = (uintptr_t)start_ctx;
if (ctx->uc_link)
sp[2] = (uintptr_t)ctx->uc_link;
else
sp[2] = (uintptr_t)func;
}
除去一些对基本上下文的初始化,make_ctx
确保在栈底预留了16字节给uc_link
与start_ctx
函数地址,如下:
/*
high memory
uc_link <--stack base
start_ctx
start_func <--rsp
...
...
low memory
*/
make_ctx
将rsp
指向start_ctx
上面一个栈帧,在第一次调用swap_ctx
进入协程时,会在rsp
指向的栈帧中存入协程入口start_func
地址,然后ret
以进入协程,此时rsp
重新指向start_ctx
,栈由此向下扩展,当协程运行结束,rsp
指回start_ctx
,协程退出时ret
直接进入start_ctx
,其实现如下:
.globl start_ctx
.globl __set_ctx
.type start_ctx, @function
start_ctx:
movq (%rsp), %rdi
call __set_ctx
__set_ctx:
movq 32(%rdi), %rbp
movq 64(%rdi), %rsp
movq (%rdi), %r15
movq 8(%rdi), %r14
movq 16(%rdi), %r13
movq 24(%rdi), %r12
movq 40(%rdi), %rdi
movq 56(%rdi), %rbx
leaq 8(%rsp), %rsp
pushq 48(%rdi)
ret
将uc_link
取出至rdi
寄存器,然后调用__set_ctx
函数将上下文切换回主协程。
在实现uc_link
的过程中,我没有将rsp
寄存器设置到位,导致set_start
的地址被start_func
的地址覆盖了,debug时整出了uc_link
的另一种用法,即在协程中直接获取uc_link
的值并将其作为参数在协程结束前调用__set_ctx
从而完成跳转,两种实现性能基本无区别,默认使用第二种实现,如果想使用第一种实现,可以 #define USE_UC_LINK
。
协程库默认使用coctx这一实现,如果想要使用ucontext_t的实现,可以#define USE_SYS_UCONTEXT
。
性能比较
100,000,000次协程切换总时长:
类型 | USE_SYS_UCONTEXT | USE_UC_LINK | 时间(s) |
---|---|---|---|
共享运行栈 | 否 | 是 | 7.125 |
共享运行栈 | 否 | 否 | 7.230 |
共享运行栈 | 是 | 无作用 | 91.81 |
独立运行栈 | 否 | 是 | 5.749 |
独立运行栈 | 否 | 否 | 5.697 |
独立运行栈 | 是 | 无作用 | 89.52 |
通过对比我们可以看出使用没有FPU信息和MXCSR信息的coctx
代替glibc提供的ucontext_t
进行上下文切换后,整体速度提升超过了10倍。且独立运行栈的上下文切换速度也明显优于共享运行栈的上下文切换速度(在实际应用中,共享运行栈中含有大量数据时,这种切换速度的差距会更明显)。
简略谈谈libco
首先就是上面提到的libco的协程运行到结尾应该是没有任何回收措施的,很想找个鹅厂的前辈证实一下这个问题的答案。。
本来很想尝试像libco那样hook几个阻塞IO调用,但最近事情太多,暂时没那个精力去实现,这里简略地聊聊libco hook的大致思路吧。
libco通过LD_PRELOAD
(详见ld.so(8))优先于系统的库函数加载进内存,运行时调用的将是libco hook后的IO调用,而真实的系统IO调用的函数指针则通过dlsym(3)取得,比如真实的read调用的函数指针通过static read_pfn_t g_sys_read_func = (read_pfn_t)dlsym(RTLD_NEXT,"read");
获得,现在g_sys_read_func
指向真正的read
调用。
我们来看看hook后的read
的实现:
ssize_t read( int fd, void *buf, size_t nbyte )
{
HOOK_SYS_FUNC( read );
if( !co_is_enable_sys_hook() )
{
return g_sys_read_func( fd,buf,nbyte );
}
rpchook_t *lp = get_by_fd( fd );
if( !lp || ( O_NONBLOCK & lp->user_flag ) )
{
ssize_t ret = g_sys_read_func( fd,buf,nbyte );
return ret;
}
int timeout = ( lp->read_timeout.tv_sec * 1000 )
+ ( lp->read_timeout.tv_usec / 1000 );
struct pollfd pf = { 0 };
pf.fd = fd;
pf.events = ( POLLIN | POLLERR | POLLHUP );
int pollret = poll( &pf,1,timeout );
ssize_t readret = g_sys_read_func( fd,(char*)buf ,nbyte );
if( readret < 0 )
{
co_log_err("CO_ERR: read fd %d ret %ld errno %d poll ret %d timeout %d",
fd,readret,errno,pollret,timeout);
}
return readret;
}
对于设置了O_NONBLOCK
标志位的文件描述符,将直接通过g_sys_read_func
调用真实的read
,而对于阻塞的文件描述符,会加入poll轮询列表,等到poll返回后才通过g_sys_read_func
调用真实的read
,这里的poll
也是经过了hook改造的,所有的hook后的函数的核心都在于这个poll
,hook后的poll
内部实现是co_poll_inner
,co_poll_inne
会为文件描述符注册epoll事件,且如果设定了timeout
参数,co_poll_inner
还会将文件描述符相关的结构加入定时器队列,随后让出当前协程。当主协程中的co_eventloop
触发监听事件时抑或是定时器队列触发超时,都会将对应协程切回来,此时会发起第二次poll
,如果判断出是IO事件唤醒了协程,则会通过g_sys_read_func
调用真实的read
,如果判断出超时,则会重复之前的流程。
poll
背后的具体实现比较复杂,我也只理清了个大概,若是展开细讲,大概又是好几万字,只能像这样粗略地描述一下以作记录了。
Tips
- 注意共享栈协程和独立栈协程的选择,共享栈协程因为栈数据的保存与恢复操作,会使得性能下降,独立栈协程则更容易产生栈溢出的风险以及大量内存的占用。
- 在共享栈协程中使用异步IO调用时,必须使用堆中分配的内存作为缓冲区,不可以将栈上的空间作为缓冲区,因为一旦协程挂起,运行栈就不再归属于该协程。在独立栈协程中则无此限制。
- 在本库提供的协程中最好使用非阻塞调用而非阻塞调用,阻塞调用会使协程失去存在的必要性。(因为没有像libco一样实现对常用网络编程函数的hook,所以程序使用的第三方库中如果使用了阻塞调用,同样会使协程失去存在必要性)。
Last modified on 2021-01-31