目的

  1. 使用示例,函数说明
  2. 协程切换如何实现
  3. 支持哪些异步事件

一 协程相关接口

1. 相关数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// 协程结构体
struct stCoRoutine_t
{
stCoRoutineEnv_t *env;
pfn_co_routine_t pfn; // 协程入口函数
void *arg; // 协程参数
coctx_t ctx; // 协程上下文,在协程切换时保存寄存器值

// 协程状态标记
char cStart;
char cEnd;
char cIsMain;
char cEnableSysHook;
char cIsShareStack;

void *pvEnv;

//char sRunStack[ 1024 * 128 ];
stStackMem_t* stack_mem;

// save stack buffer while conflict on same stack_buffer;
char* stack_sp;
unsigned int save_size;
char* save_buffer;

// 协程局部变量
stCoSpec_t aSpec[1024];
};

// 协程运行环境,可以理解为虚拟机
// 每个协程必定在一个 stCoRoutineEnv_t 中运行
struct stCoRoutineEnv_t
{
// 协程调用栈,最多 128 层
// 这个调用栈的含义是 coroutine-1 创建 coroutine-2... 最大深度
stCoRoutine_t *pCallStack[ 128 ];
int iCallStackSize;

// 事件循环驱动
stCoEpoll_t *pEpoll;

//for copy stack log lastco and nextco
stCoRoutine_t* pending_co;
stCoRoutine_t* occupy_co;
};

struct stCoEpoll_t
{
int iEpollFd;
static const int _EPOLL_SIZE = 1024 * 10;

struct stTimeout_t *pTimeout;

struct stTimeoutItemLink_t *pstTimeoutList;

struct stTimeoutItemLink_t *pstActiveList;

co_epoll_res *result;
};

2. 执行环境

协程需要运行在“执行环境”或“虚拟机”中,在使用 libco 时可以不要显示创建协程执行环境,每个线程都会有个默认执行环境“gCoEnvPerThread”。使用 libco 可以使用多线程多协程模式,每个线程与 CPU 绑定,每个线程有独立的协程执行环境。

创建执行环境时会创建一个主协程,不做任何事情,只用来与其他协程进行切换。协程中的 coctx_t 会保存协程切换时的寄存器状态,当再次切换到协程时会执行 co_swap 之后代码。这样就实现了用同步的视角实现异步逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static __thread stCoRoutineEnv_t* gCoEnvPerThread = NULL;

void co_init_curr_thread_env()
{
gCoEnvPerThread = (stCoRoutineEnv_t*)calloc( 1, sizeof(stCoRoutineEnv_t) );
stCoRoutineEnv_t *env = gCoEnvPerThread;

env->iCallStackSize = 0;
// 创建主协程
struct stCoRoutine_t *self = co_create_env( env, NULL, NULL, NULL );
self->cIsMain = 1;

env->pending_co = NULL;
env->occupy_co = NULL;

coctx_init( &self->ctx );

env->pCallStack[ env->iCallStackSize++ ] = self;

// epoll fd
stCoEpoll_t *ev = AllocEpoll();
SetEpoll( env,ev );
}

3. 创建协程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/*******************************************************************************
* @param co stCoRoutine_t** 创建的协程对象,出参
* @param attr stCoRoutineAttr_t 协程属性,入参,定义栈大小、共享栈属性
* @param routine void*(*func)(void*) 协程执行函数
* @param arg void* 协程执行参数
* @return 返回值无意义
******************************************************************************/
int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr,pfn_co_routine_t pfn,void *arg )
{
if( !co_get_curr_thread_env() )
{
co_init_curr_thread_env();
}
stCoRoutine_t *co = co_create_env( co_get_curr_thread_env(), attr, pfn,arg );
*ppco = co;
return 0;
}

/*******************************************************************************
* 与 co_create 类似,附加参数 env 指定了创建协程的执行环境
* @param env stCoRoutineEnv_t* 协程执行环境
* @param co stCoRoutine_t** 创建的协程对象,出差
* @param attr stCoRoutineAttr_t 协程属性,入参,定义栈大小、共享栈属性
* @param routine void*(*func)(void*) 协程执行函数
* @param arg void* 协程执行参数
* @return 返回值无意义
******************************************************************************/
struct stCoRoutine_t *co_create_env( stCoRoutineEnv_t *env, const stCoRoutineAttr_t* attr, pfn_co_routine_t pfn, void *arg )
{

stCoRoutineAttr_t at;
if( attr )
{
memcpy( &at,attr,sizeof(at) );
}
if( at.stack_size <= 0 )
{
at.stack_size = 128 * 1024;
}
else if( at.stack_size > 1024 * 1024 * 8 )
{
at.stack_size = 1024 * 1024 * 8;
}

if( at.stack_size & 0xFFF )
{
at.stack_size &= ~0xFFF;
at.stack_size += 0x1000;
}

stCoRoutine_t *lp = (stCoRoutine_t*)malloc( sizeof(stCoRoutine_t) );

memset( lp,0,(long)(sizeof(stCoRoutine_t)));

lp->env = env; // 协程执行虚拟机
lp->pfn = pfn; // 执行函数
lp->arg = arg; // 执行参数

// 栈内存分配,使用共享栈内存或独立栈内存
// 如果使用共享栈内存会与多个协程共享栈内存空间
stStackMem_t* stack_mem = NULL;
if( at.share_stack )
{
stack_mem = co_get_stackmem( at.share_stack);
at.stack_size = at.share_stack->stack_size;
}
else
{
stack_mem = co_alloc_stackmem(at.stack_size);
}
lp->stack_mem = stack_mem;

lp->ctx.ss_sp = stack_mem->stack_buffer;
lp->ctx.ss_size = at.stack_size;

lp->cStart = 0;
lp->cEnd = 0;
lp->cIsMain = 0;
lp->cEnableSysHook = 0;
lp->cIsShareStack = at.share_stack != NULL;

lp->save_size = 0;
lp->save_buffer = NULL;

return lp;
}

4. 启动协程

使用 co_resume 可以启动协程,在 co_resume 会先创建协程执行上下文。当事件触发时,会再次切换到当前协程,此时调用 CoRoutineFunc 函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/*******************************************************************************
* 当前执行协程让出,切换到 co 协程进行运行
* @param co stCoRoutine_t 待运行协程
******************************************************************************/
void co_resume( stCoRoutine_t *co )
{
stCoRoutineEnv_t *env = co->env;
// lpCurrRoutine 是正在执行协程
stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ];

// 协程未启动
if( !co->cStart )
{
// 创建协程调用上下文,使用 CoRoutineFunc 函数地址作为执行栈,协程 co,0 作为 CoRoutineFunc 函数的参数。
// CoRoutineFunc 函数作为所有协程的 wrapper 函数。
// coctx_make 需要对 32 位、64 位做兼容,两种的函数调用方式是不同的。
coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc, co, 0 );
co->cStart = 1;
}

// 将当前协程放在执行虚拟机栈上,当前协程是执行协程
env->pCallStack[ env->iCallStackSize++ ] = co;

// 协程切换,执行当前协程 co
co_swap( lpCurrRoutine, co );
}

#if defined(__i386__)
int coctx_make(coctx_t* ctx, coctx_pfn_t pfn, const void* s, const void* s1) {
char* sp = ctx->ss_sp + ctx->ss_size - sizeof(coctx_param_t);
sp = (char*)((unsigned long)sp & -16L);

coctx_param_t* param = (coctx_param_t*)sp;
void** ret_addr = (void**)(sp - sizeof(void*) * 2);
*ret_addr = (void*)pfn;
// 参数在栈上
param->s1 = s;
param->s2 = s1;

memset(ctx->regs, 0, sizeof(ctx->regs));

// 在 32 位模式下,使用栈传递参数。pfn 函数有两个参数,栈顶需要移动两个指针长度。
ctx->regs[kESP] = (char*)(sp) - sizeof(void*) * 2;
return 0;
}
#elif defined(__x86_64__)
int coctx_make(coctx_t* ctx, coctx_pfn_t pfn, const void* s, const void* s1) {
// 栈顶, 64 位使用寄存器传递参数,寄存器不够时才会使用栈。
// pfn 函数只有两个参数,直接使用寄存器,不会在栈上存储参数。
char* sp = ctx->ss_sp + ctx->ss_size - sizeof(void*);
sp = (char*)((unsigned long)sp & -16LL);

// 寄存器置零
memset(ctx->regs, 0, sizeof(ctx->regs));
// 函数地址
void** ret_addr = (void**)(sp);
*ret_addr = (void*)pfn;

ctx->regs[kRSP] = sp;

// kRETAddr 用于存储 IP 寄存器地址。
// 在 coctx_swap 调用中会将 regs[kRETAddr] 入栈,在 coctx_swap 的最后有 ret 指令,触发 pop ip,跳转到 pfn 处执行。
ctx->regs[kRETAddr] = (char*)pfn;

// pfn 函数的第一个参数
ctx->regs[kRDI] = (char*)s;
// pfn 函数的第二个参数
ctx->regs[kRSI] = (char*)s1;
return 0;
}
#endif

static int CoRoutineFunc( stCoRoutine_t *co,void * )
{
if( co->pfn )
{
co->pfn( co->arg );
}
co->cEnd = 1;

stCoRoutineEnv_t *env = co->env;

co_yield_env( env );

return 0;
}

函数 co_swap 是协程切换核心,使用汇编实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
.globl coctx_swap
#if !defined( __APPLE__ )
.type coctx_swap, @function
#endif
coctx_swap:

#if defined(__i386__)
# 4(%esp) 是调用 coctx_swap 函数的第一个参数
movl 4(%esp), %eax
# sp 是 ip 值,ret 指令使用
movl %esp, 28(%eax)
movl %ebp, 24(%eax)
movl %esi, 20(%eax)
movl %edi, 16(%eax)
movl %edx, 12(%eax)
movl %ecx, 8(%eax)
movl %ebx, 4(%eax)

# 8(%esp) 是调用 coctx_swap 函数的第二个参数
movl 8(%esp), %eax
movl 4(%eax), %ebx
movl 8(%eax), %ecx
movl 12(%eax), %edx
movl 16(%eax), %edi
movl 20(%eax), %esi
movl 24(%eax), %ebp
# 设置栈顶值,当 ret 指令调用时会触发 pop ip,恢复原先的执行
movl 28(%eax), %esp

ret

#elif defined(__x86_64__)
# 函数调用传参,优先将参数传递给寄存器,当寄存器不够用时,会丛右到左压栈,然后再传参给寄存器。
# %rdi,%rsi,%rdx,%rcx,%r8,%r9, 这6个不够用的时候才会借用栈。
# 所以 %rdi 是第一个参数,是保存寄存器组内存的首地址。

leaq (%rsp), %rax
movq %rax, 104(%rdi)
movq %rbx, 96(%rdi)
movq %rcx, 88(%rdi)
movq %rdx, 80(%rdi)
movq 0(%rax), %rax
movq %rax, 72(%rdi)
movq %rsi, 64(%rdi)
movq %rdi, 56(%rdi)
movq %rbp, 48(%rdi)
movq %r8, 40(%rdi)
movq %r9, 32(%rdi)
movq %r12, 24(%rdi)
movq %r13, 16(%rdi)
movq %r14, 8(%rdi)
movq %r15, (%rdi)
xorq %rax, %rax

movq 48(%rsi), %rbp
movq 104(%rsi), %rsp
movq (%rsi), %r15
movq 8(%rsi), %r14
movq 16(%rsi), %r13
movq 24(%rsi), %r12
movq 32(%rsi), %r9
movq 40(%rsi), %r8
movq 56(%rsi), %rdi
movq 80(%rsi), %rdx
movq 88(%rsi), %rcx
movq 96(%rsi), %rbx
leaq 8(%rsp), %rsp
pushq 72(%rsi) // ip 入栈

movq 64(%rsi), %rsi
ret
#endif

5. 让出执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/*******************************************************************************
* 当前协程让出,上层调用栈协程运行
* @param co stCoRoutine_t 待让出协程
******************************************************************************/
void co_yield( stCoRoutine_t *co )
{
co_yield_env( co->env );
}

void co_yield_env( stCoRoutineEnv_t *env )
{
stCoRoutine_t *last = env->pCallStack[ env->iCallStackSize - 2 ];
stCoRoutine_t *curr = env->pCallStack[ env->iCallStackSize - 1 ];

env->iCallStackSize--;

co_swap( curr, last);
}

6. 设置/获取协程局部变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void *co_getspecific(pthread_key_t key)
{
stCoRoutine_t *co = GetCurrThreadCo();
if( !co || co->cIsMain )
{
return pthread_getspecific( key );
}
return co->aSpec[ key ].value;
}

int co_setspecific(pthread_key_t key, const void *value)
{
stCoRoutine_t *co = GetCurrThreadCo();
if( !co || co->cIsMain )
{
return pthread_setspecific( key,value );
}
co->aSpec[ key ].value = (void*)value;
return 0;
}

7. 协程释放

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/*******************************************************************************
* 释放协程对象 co
* @param co stCoRoutine_t 待释放协程对象
******************************************************************************/
void co_release( stCoRoutine_t *co )
{
co_free( co );
}

void co_free( stCoRoutine_t *co )
{
if (!co->cIsShareStack)
{
free(co->stack_mem->stack_buffer);
free(co->stack_mem);
}
else
{
if(co->save_buffer)
free(co->save_buffer);

if(co->stack_mem->occupy_co == co)
co->stack_mem->occupy_co = NULL;
}

free( co );
}

/*******************************************************************************
* @param co stCoRoutine_t 协程对象
******************************************************************************/
void co_reset(stCoRoutine_t * co)
{
if(!co->cStart || co->cIsMain)
return;

co->cStart = 0;
co->cEnd = 0;

if(co->save_buffer)
{
free(co->save_buffer);
co->save_buffer = NULL;
co->save_size = 0;
}

// 如果共享栈被当前协程占用,要释放占用标志,否则被切换,会执行save_stack_buffer()
if(co->stack_mem->occupy_co == co)
co->stack_mem->occupy_co = NULL;
}

二 事件循环

1. 等待事件触发

函数 co_poll 用来同步或异步等待事件触发。当前协程希望监听某一事件时,向协程执行环境的事件循环实例注册事件监听、超时监听,同时进行协程切换。当事件触发时会切换到当前协程的协程切换点执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
/*******************************************************************************
* 向 event_fd 添加监控事件,之所以使用 fds 数组是为了与 poll 函数兼容。
* 虽然参数格式是数组,但是每次调用都添加一个文件描述符,如果使用多个文件描述符会共享同一个超时时间。
* @param ctx stCoEpoll_t 协程对象
* @param fds struct pollfd 待添加文件描述符
* @param nfds nfds_t 待添加事件数量
* @param timeout_ms int 事件超时时间
******************************************************************************/
int co_poll( stCoEpoll_t *ctx,struct pollfd fds[], nfds_t nfds, int timeout_ms )
{
return co_poll_inner(ctx, fds, nfds, timeout_ms, NULL);
}

typedef int (*poll_pfn_t)(struct pollfd fds[], nfds_t nfds, int timeout);
int co_poll_inner( stCoEpoll_t *ctx,struct pollfd fds[], nfds_t nfds, int timeout, poll_pfn_t pollfunc)
{
if (timeout == 0)
{
return pollfunc(fds, nfds, timeout);
}
if (timeout < 0)
{
timeout = INT_MAX;
}
int epfd = ctx->iEpollFd;

// co_self 获取当前正在执行协程
stCoRoutine_t* self = co_self();

// 1.struct change
stPoll_t& arg = *((stPoll_t*)malloc(sizeof(stPoll_t)));
memset( &arg, 0, sizeof(arg) );

arg.iEpollFd = epfd;
arg.fds = (pollfd*)calloc(nfds, sizeof(pollfd));
arg.nfds = nfds;

stPollItem_t arr[2];
if( nfds < sizeof(arr) / sizeof(arr[0]) && !self->cIsShareStack)
{
arg.pPollItems = arr;
}
else
{
arg.pPollItems = (stPollItem_t*)malloc( nfds * sizeof( stPollItem_t ) );
}
memset( arg.pPollItems, 0, nfds * sizeof(stPollItem_t) );

arg.pfnProcess = OnPollProcessEvent;
arg.pArg = GetCurrCo( co_get_curr_thread_env() );

// 2. 增加监听事件
for(nfds_t i=0;i<nfds;i++)
{
arg.pPollItems[i].pSelf = arg.fds + i;
arg.pPollItems[i].pPoll = &arg;

arg.pPollItems[i].pfnPrepare = OnPollPreparePfn;
struct epoll_event &ev = arg.pPollItems[i].stEvent;

if( fds[i].fd > -1 )
{
ev.data.ptr = arg.pPollItems + i;
ev.events = PollEvent2Epoll( fds[i].events );

int ret = co_epoll_ctl( epfd,EPOLL_CTL_ADD, fds[i].fd, &ev );
if (ret < 0 && errno == EPERM && nfds == 1 && pollfunc != NULL)
{
if( arg.pPollItems != arr )
{
free( arg.pPollItems );
arg.pPollItems = NULL;
}
free(arg.fds);
free(&arg);
return pollfunc(fds, nfds, timeout);
}
}
}

// 3. 增加超时事件
unsigned long long now = GetTickMS();
arg.ullExpireTime = now + timeout;
int ret = AddTimeout( ctx->pTimeout, &arg,now );
int iRaiseCnt = 0;
if( ret != 0 )
{
co_log_err("CO_ERR: AddTimeout ret %d now %lld timeout %d arg.ullExpireTime %lld",
ret,now,timeout,arg.ullExpireTime);
errno = EINVAL;
iRaiseCnt = -1;
}
else
{
// 协程切换
co_yield_env( co_get_curr_thread_env() );
iRaiseCnt = arg.iRaiseCnt;
}

{
// 移除超时事件
RemoveFromLink<stTimeoutItem_t,stTimeoutItemLink_t>( &arg );

// 移除监听事件
for(nfds_t i = 0;i < nfds;i++)
{
int fd = fds[i].fd;
if( fd > -1 )
{
co_epoll_ctl( epfd,EPOLL_CTL_DEL,fd,&arg.pPollItems[i].stEvent );
}
fds[i].revents = arg.fds[i].revents;
}

if( arg.pPollItems != arr )
{
free( arg.pPollItems );
arg.pPollItems = NULL;
}

free(arg.fds);
free(&arg);
}

return iRaiseCnt;
}

void OnPollProcessEvent( stTimeoutItem_t * ap )
{
stCoRoutine_t *co = (stCoRoutine_t*)ap->pArg;
co_resume( co );
}

void OnPollPreparePfn( stTimeoutItem_t * ap,struct epoll_event &e,stTimeoutItemLink_t *active )
{
stPollItem_t *lp = (stPollItem_t *)ap;
// 触发的事件
lp->pSelf->revents = EpollEvent2Poll( e.events );

stPoll_t *pPoll = lp->pPoll;
pPoll->iRaiseCnt++;

// 有一个事件触发后就会加入活跃事件队列,iAllEventDetach 成员用于避免加入多次
// 此处业务使用 co_poll 时应该添加对一个事件的监听,而非增加多个事件原因,导致了代码逻辑混乱
if( !pPoll->iAllEventDetach )
{
pPoll->iAllEventDetach = 1;
RemoveFromLink<stTimeoutItem_t,stTimeoutItemLink_t>( pPoll );
AddTail( active,pPoll );
}
}

2. 主协程事件循环

主协程事件循环处理用于协程调度,其核心是使用 epoll_wait 等待事件(异步事件、超时事件)触发,执行事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
/*******************************************************************************
* 主协程事件处理循环
* @param ctx stCoEpoll_t 协程对象
* @param pfn pfn_co_eventloop_t 退出检查函数
* @param arg void* 退出检查函数参数
******************************************************************************/
void co_eventloop( stCoEpoll_t *ctx, pfn_co_eventloop_t pfn, void *arg )
{
if( !ctx->result )
{
ctx->result = co_epoll_res_alloc( stCoEpoll_t::_EPOLL_SIZE );
}
co_epoll_res *result = ctx->result;

for(;;)
{
int ret = co_epoll_wait( ctx->iEpollFd, result,stCoEpoll_t::_EPOLL_SIZE, 1 );

stTimeoutItemLink_t *active = (ctx->pstActiveList);
stTimeoutItemLink_t *timeout = (ctx->pstTimeoutList);

memset( timeout,0,sizeof(stTimeoutItemLink_t) );

// 触发事件队列
for(int i=0; i<ret; i++)
{
stTimeoutItem_t *item = (stTimeoutItem_t*)result->events[i].data.ptr;
if( item->pfnPrepare )
{
item->pfnPrepare( item, result->events[i], active );
}
else
{
AddTail( active, item );
}
}

// 超时事件队列
unsigned long long now = GetTickMS();
TakeAllTimeout( ctx->pTimeout, now, timeout );

stTimeoutItem_t *lp = timeout->head;
while( lp )
{
//printf("raise timeout %p\n",lp);
lp->bTimeout = true;
lp = lp->pNext;
}

// 将超时事件队列添加到触发事件队列之后
Join<stTimeoutItem_t,stTimeoutItemLink_t>( active,timeout );

// 处理所有事件
lp = active->head;
while( lp )
{
// 从队列移除事件
PopHead<stTimeoutItem_t,stTimeoutItemLink_t>( active );
if (lp->bTimeout && now < lp->ullExpireTime)
{
int ret = AddTimeout(ctx->pTimeout, lp, now);
if (!ret)
{
lp->bTimeout = false;
lp = active->head;
continue;
}
}

// 执行事件处理函数
if( lp->pfnProcess )
{
lp->pfnProcess( lp );
}

lp = active->head;
}

// 事件循环退出检查函数
if( pfn )
{
if( -1 == pfn( arg ) )
{
break;
}
}

}
}

3. 系统调用 hook

co_enable_hook_sys 函数用于在阻塞的系统调用时进行协程切换。当开启时,在 readsend 等系统调用点会调用 co_poll 触发协程切换。

1
2
3
4
5
6
7
/*******************************************************************************
* 开启、关闭系统函数 hook
* 当开启系统函数 hook 时会调用 poll 函数触发协程切换。当事件触发时,协程切换回来,继续执行。
******************************************************************************/
void co_enable_hook_sys();
void co_disable_hook_sys();
bool co_is_enable_sys_hook();

co_hook_sys_call.cpp 中实现了系统 hook 功能,libco 中定义了 readsend 等函数。以 read 为例说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ssize_t read( int fd, void *buf, size_t nbyte )
{
HOOK_SYS_FUNC( read );

if( !co_is_enable_sys_hook() )
{
return g_sys_read_func( fd,buf,nbyte );
}
... // 忽略无关代码
int pollret = poll( &pf,1,timeout );
ssize_t readret = g_sys_read_func( fd,(char*)buf ,nbyte );
if( readret < 0 )
{
co_log_err("CO_ERR: read fd %d ret %ld errno %d poll ret %d timeout %d",
fd,readret,errno,pollret,timeout);
}

return readret;
}

HOOK_SYS_FUNC 是宏定义,用生成 g_sys_xxx_func 变量,其值指向系统调用的地址。

1
#define HOOK_SYS_FUNC(name) if( !g_sys_##name##_func ) { g_sys_##name##_func = (name##_pfn_t)dlsym(RTLD_NEXT,#name); }

4. libco 协程使用

co_pollco_eventloop 代码会思考该怎样使用 libco 进行事件处理。如果没有协程,异步事件处理应该是:一进程或线程添加事件监听,二进行 epoll_wait 等待事件触发,三处理事件。以 CS 模式为例,在 libco 中应该对 listenfd 创建一个协程、每个 accept 的连接创建一个协程,并将其添加到主协程事件循环中。

三 协程同步

libco 提供了条件变量原语,因为在 libco 中是主动调度,非抢占式调度,因此不必担心竞争状态,实现起来也比较简单。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
typedef void (*OnPreparePfn_t)( stTimeoutItem_t *,struct epoll_event &ev, stTimeoutItemLink_t *active );
typedef void (*OnProcessPfn_t)( stTimeoutItem_t *);
struct stTimeoutItem_t
{

enum
{
eMaxTimeout = 40 * 1000 //40s
};
stTimeoutItem_t *pPrev;
stTimeoutItem_t *pNext;
stTimeoutItemLink_t *pLink;

unsigned long long ullExpireTime;

OnPreparePfn_t pfnPrepare;
OnProcessPfn_t pfnProcess;

void *pArg; // routine
bool bTimeout;
};

struct stCoCondItem_t
{
stCoCondItem_t *pPrev;
stCoCondItem_t *pNext;
stCoCond_t *pLink;

stTimeoutItem_t timeout;
};
struct stCoCond_t
{
stCoCondItem_t *head;
stCoCondItem_t *tail;
};

stCoCond_t *co_cond_alloc();
int co_cond_free( stCoCond_t * cc );

/*******************************************************************************
* 协程条件变量,用于协程之间的同步
******************************************************************************/
int co_cond_signal( stCoCond_t * );
int co_cond_broadcast( stCoCond_t * );
int co_cond_timedwait( stCoCond_t *,int timeout_ms );

参考资料