OpenResty Api - worker id
NGINX
C, Lua, NGINX, OpenResty
字数统计: 1.2k(字)
阅读时长: 5(分)
一 概述
在使用 OpenResty 实现业务逻辑时经常需要只在一个 worker 内执行某项动作, 例如读取配置文件到共享内存. 可以使用 ngx.worker.id
函数获取 worker 序号, 相对于 ngx.worker.pid
得到进程 pid 更有用.
获取 worker 序号, 进程 pid 功能是使用 FFI 技术, 由 resty.core.worker
包对外提供服务(实现 FFI 库时应该使用此种方式, 避免在业务代码中直接通过 FFI 调用 C 函数).
ngx.worker.pid
调用 lua-nginx
模块的 ngx_http_lua_ffi_worker_pid
函数(从全局变量 ngx_pid 中获得进程 pid); ngx.worker.id
调用 lua-nginx
模块的 ngx_http_lua_ffi_worker_id
函数.
对于 ngx.worker.id
进程序号需要考虑怎么给进程分配序号这件事情, 序号什么时候会发生变化.
首先, 给进程分配序号是在 worker 启动时确定的: master 根据配置的 worker 进程数循环计数, 并将此计数作为参数传递给 worker 进程, worker 进程获得的参数就是进程序号. 第二, 其实第一点确定了序号是怎么产生的, 那只需要什么时候 worker 会启动, 分析此时启动 worker 的参数即可确定.
在 nginx 进程生命周期中有三种情况会启动 worker 进程: 初始运行(new binary 等同初始运行), 热加载, master 拉起挂掉 worker. 初始运行与热加载相同逻辑, 都需要启动一组新的 worker 进程; worker 挂掉后 master 重新拉起 worker 时会使用进程数组中保存的信息启动 worker, 会与原槽位的 worker 持有相同的序号.
二 分配序号
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| static void ngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type) { ngx_int_t i; ngx_channel_t ch;
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start worker processes");
ngx_memzero(&ch, sizeof(ngx_channel_t));
ch.command = NGX_CMD_OPEN_CHANNEL;
for (i = 0; i < n; i++) {
ngx_spawn_process(cycle, ngx_worker_process_cycle, (void *) (intptr_t) i, "worker process", type);
ch.pid = ngx_processes[ngx_process_slot].pid; ch.slot = ngx_process_slot; ch.fd = ngx_processes[ngx_process_slot].channel[0];
ngx_pass_open_channel(cycle, &ch); } } static void ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data) { ngx_int_t worker = (intptr_t) data; ngx_worker = worker;
ngx_worker_process_init(cycle, worker);
ngx_setproctitle("worker process"); ... }
|
三 重新拉起 worker
在启动时 nginx 会注册 SIGCHLD
信号处理函数, 当 worker 挂掉时会调用信号处理函数, 设置 master 进程全局变量 ngx_reap=1
. 在 master 进程的主循环逻辑中根据 ngx_reap
变量进入 ngx_reap_children
处理.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
| static ngx_uint_t ngx_reap_children(ngx_cycle_t *cycle) { ngx_int_t i, n; ngx_uint_t live; ngx_channel_t ch; ngx_core_conf_t *ccf;
ngx_memzero(&ch, sizeof(ngx_channel_t));
ch.command = NGX_CMD_CLOSE_CHANNEL; ch.fd = -1;
live = 0; for (i = 0; i < ngx_last_process; i++) {
ngx_log_debug7(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "child: %i %P e:%d t:%d d:%d r:%d j:%d", i, ngx_processes[i].pid, ngx_processes[i].exiting, ngx_processes[i].exited, ngx_processes[i].detached, ngx_processes[i].respawn, ngx_processes[i].just_spawn);
if (ngx_processes[i].pid == -1) { continue; }
if (ngx_processes[i].exited) { ... if (ngx_processes[i].respawn && !ngx_processes[i].exiting && !ngx_terminate && !ngx_quit) { if (ngx_spawn_process(cycle, ngx_processes[i].proc, ngx_processes[i].data, ngx_processes[i].name, i) == NGX_INVALID_PID) { ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "could not respawn %s", ngx_processes[i].name); continue; }
ch.command = NGX_CMD_OPEN_CHANNEL; ch.pid = ngx_processes[ngx_process_slot].pid; ch.slot = ngx_process_slot; ch.fd = ngx_processes[ngx_process_slot].channel[0];
ngx_pass_open_channel(cycle, &ch);
live = 1;
continue; }
...
if (i == ngx_last_process - 1) { ngx_last_process--;
} else { ngx_processes[i].pid = -1; }
} else if (ngx_processes[i].exiting || !ngx_processes[i].detached) { live = 1; } }
return live; }
ngx_pid_t ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data, char *name, ngx_int_t respawn) { u_long on; ngx_pid_t pid; ngx_int_t s;
... ngx_process_slot = s;
pid = fork();
switch (pid) {
case -1: ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "fork() failed while spawning \"%s\"", name); ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID;
case 0: ngx_parent = ngx_pid; ngx_pid = ngx_getpid(); proc(cycle, data); break;
default: break; }
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start %s %P", name, pid);
ngx_processes[s].pid = pid; ngx_processes[s].exited = 0;
if (respawn >= 0) { return pid; }
ngx_processes[s].proc = proc; ngx_processes[s].data = data; ngx_processes[s].name = name; ngx_processes[s].exiting = 0;
switch (respawn) {
case NGX_PROCESS_NORESPAWN: ngx_processes[s].respawn = 0; ngx_processes[s].just_spawn = 0; ngx_processes[s].detached = 0; break;
case NGX_PROCESS_JUST_SPAWN: ngx_processes[s].respawn = 0; ngx_processes[s].just_spawn = 1; ngx_processes[s].detached = 0; break;
case NGX_PROCESS_RESPAWN: ngx_processes[s].respawn = 1; ngx_processes[s].just_spawn = 0; ngx_processes[s].detached = 0; break;
case NGX_PROCESS_JUST_RESPAWN: ngx_processes[s].respawn = 1; ngx_processes[s].just_spawn = 1; ngx_processes[s].detached = 0; break;
case NGX_PROCESS_DETACHED: ngx_processes[s].respawn = 0; ngx_processes[s].just_spawn = 0; ngx_processes[s].detached = 1; break; }
if (s == ngx_last_process) { ngx_last_process++; }
return pid; }
|
四 获取进程序号
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| int ngx_http_lua_ffi_worker_id(void) { #if (nginx_version >= 1009001) if (ngx_process != NGX_PROCESS_WORKER && ngx_process != NGX_PROCESS_SINGLE) { return -1; }
return (int) ngx_worker; #else return -1; #endif }
|