一 功能 ngx_http_dyups_module
模块能够在不进行 reload
的情况下动态更新 upstream
配置。
二 指令 1. dyups_interface 1 2 3 Syntax: dyups_interface Default: `none` Context: `loc`
使用当前 location
处理 upstream{}
的查看、添加、删除操作。
2. dyups_read_msg_timeout 1 2 3 Syntax: dyups_read_msg_timeout `time` Default: `1s` Context: `main`
worker
进程读取共享内存中操作指令的间隔时间。dyups
模块中每个 worker
收到外部指令后需要将指令同步到共享内存,其他 worker
将指令处理一遍,同步各个 worker
间的状态。
3. dyups_shm_zone_size 1 2 3 Syntax: dyups_shm_zone_size `size` Default: `2MB` Context: `main`
dyups
模块使用共享内存保存指令,dyups_shm_zone_size
指令设置共享内存大小。
4. dyups_trylock 1 2 3 Syntax: dyups_trylock `on | off` Default: `off` Context: `main`
5. dyups_read_msg_log 1 2 3 Syntax: dyups_read_msg_log `on | off` Default: `off` Context: `main`
控制 dyups
模块执行指令是否输出日志。
三 调用接口 当使用 dyups_interface
配置 location
处理指令后,当前 location
可以处理 restful
类型的指令。
GET 请求 1. 查看主机详细信息 路径:/detail
输出所有的 upstream{}
以及其中的 server
。
2. 查看所有 upstrem{}
路径: /list
输出所有的 upstream{}
。
3. 查看单个 upstream
信息 路径:/upstream/name
根据 upstream{}
的名字查找其配置,输出其中的所有 server
。
POST 请求 1. 更新 upstream
中 server
列表 路径:/upstream/name
更新 name
指定的 upstream{}
中的 server
列表。会创建一个新的 upstream{}
配置,使用 body
中指定的 server
。body
内数据格式为:
DELETE 1. 删除某个 upstream
路径:/upstream/name
删除由 name
指定的 upstream{}
。会将 upstream{}
的 dyups
配置标记为 deleting
,待引用计数为零时标记为 deleted
,之后就可以重用共享内存。
四 安装测试 1. 安装 1 2 3 4 5 6 7 cd nginx-1.17.2./configure \ --add-module=/opt/src/tengine-2.3.1/modules/ngx_http_upstream_dyups_module make make install
2. 配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 # vi $NGINX_PATH/conf/dyups/dyups.conf #user nobody; worker_processes 1; #error_log logs/error.log; #error_log logs/error.log notice; #error_log logs/error.log info; #pid logs/nginx.pid; events { worker_connections 1024; } http { log_format main '$remote_addr - $remote_user [$time_local] "$request" ' '$status $body_bytes_sent "$http_referer" ' '"$http_user_agent" "$http_x_forwarded_for"'; access_log logs/access.log main; dyups_upstream_conf dyups_upstream.conf; include dyups_upstream.conf; server { listen 8080; # location 会根据 host 变量做代理,默认 host 变量会从请求域名、从请求头 host 中取值 location / { proxy_pass http://$host; } } server { listen 8088; location / { return 200 "8088"; } } server { listen 8089; location / { return 200 "8089"; } } server { listen 8081; # dyups 操作接口 location / { dyups_interface; } } }
1 2 3 4 5 6 7 8 # vi $NGINX_PATH/conf/dyup/dyup_upstream.conf upstream host1 { server 127.0.0.1:8088; } upstream host2 { server 127.0.0.1:8089; }
3. 测试命令 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 sudo /usr/local/nginx/sbin/nginx -c /usr/local/nginx/conf/dyups/dyups.conf $curl -H "host: host1" 127.0.0.1:80808088 $curl -H "host: host2" 127.0.0.1:80808089 $curl 127.0.0.1:8081/detailhost1 server 127.0.0.1:8088 weight=1 max_conns=0 max_fails=1 fail_timeout=10 backup=0 down=0 host2 server 127.0.0.1:8089 weight=1 max_conns=0 max_fails=1 fail_timeout=10 backup=0 down=0 curl -d "server 127.0.0.1:8089;server 127.0.0.1:8088;" 127.0.0.1:8081/upstream/dyhost $curl 127.0.0.1:8081/detailhost1 server 127.0.0.1:8088 weight=1 max_conns=0 max_fails=1 fail_timeout=10 backup=0 down=0 host2 server 127.0.0.1:8089 weight=1 max_conns=0 max_fails=1 fail_timeout=10 backup=0 down=0 dyhost server 127.0.0.1:8089 weight=1 max_conns=0 max_fails=1 fail_timeout=10 backup=0 down=0 server 127.0.0.1:8088 weight=1 max_conns=0 max_fails=1 fail_timeout=10 backup=0 down=0 $curl -H "host: dyhost" 127.0.0.1:80808089 $curl -H "host: dyhost" 127.0.0.1:80808088 $curl -d "server 127.0.0.1:8089;" 127.0.0.1:8081/upstream/dyhost$curl 127.0.0.1:8081/detailhost1 server 127.0.0.1:8088 weight=1 max_conns=0 max_fails=1 fail_timeout=10 backup=0 down=0 host2 server 127.0.0.1:8089 weight=1 max_conns=0 max_fails=1 fail_timeout=10 backup=0 down=0 dyhost server 127.0.0.1:8089 weight=1 max_conns=0 max_fails=1 fail_timeout=10 backup=0 down=0
五 实现 dyups
模块提供 dyups_interface
指令将 location
做为 dyups
对外接口,提供对 upstream
的查看、新增、删除管理功能。在 dyups
内部,使用定时器、基于共享内存的消息队列实现 upstream
的同步。
全局变量 ngx_http_dyups_api_enable
用来标识当前进程是否启用 dyups
功能。ngx_http_dyups_deleted_upstream
是 postconfig
阶段创建的冗余配置,在执行删除指令时会将 upstream
的配置更新为 ngx_http_dyups_deleted_upstream
,非共享内存。ngx_http_dyups_shm_generation
用来作为 dyups
的代,在共享初始化时使用。每次进行 reload
操作,ngx_http_dyups_shm_generation
会进行累加,reload
创建的进程会使用当前“代”的共享内存进行处理。ngx_dyups_global_ctx
作为全局配置用来存储 slab
、共享内存配置、消息处理定时器,在共享内存初始化、进程初始化时进行初始化。
1. 共享内存 在 dyups
代中并没有对某个”代”共享内存的清理工作(reload
时会释放内存),其实观察共享内存的初始化回调可以发现 dyups
并没有在共享内存上创建附加的管理信息, ngx_dyups_global_ctx
是进程中的变量,通过 shpool
使用共享内存。在共享内存中除了 sh
之外,只有消息队列中的消息。在消息被销毁是会将内存归还共享内存。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 static ngx_int_t ngx_http_dyups_init_shm_zone (ngx_shm_zone_t *shm_zone, void *data) { ngx_slab_pool_t *shpool; ngx_dyups_shctx_t *sh; shpool = (ngx_slab_pool_t *) shm_zone->shm.addr; sh = ngx_slab_alloc(shpool, sizeof (ngx_dyups_shctx_t )); if (sh == NULL ) { return NGX_ERROR; } ngx_dyups_global_ctx.sh = sh; ngx_dyups_global_ctx.shpool = shpool; ngx_queue_init(&sh->msg_queue); sh->version = 0 ; sh->status = NULL ; return NGX_OK; } static void ngx_dyups_destroy_msg (ngx_slab_pool_t *shpool, ngx_dyups_msg_t *msg) { if (msg->pid) { ngx_slab_free_locked(shpool, msg->pid); } if (msg->name.data) { ngx_slab_free_locked(shpool, msg->name.data); } if (msg->content.data) { ngx_slab_free_locked(shpool, msg->content.data); } ngx_slab_free_locked(shpool, msg); }
2. list/detail/upstream
请求处理 list/detail/upstram
用来查看 dyups
中 upstream
、server
的信息。在调用指令时先进行消息处理,待状态同步后才进行请求处理。
3. 删除 upstream
请求处理 删除操作通过 ngx_dyups_delete_upstream
函数实现,函数流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 --------------- | shpool 加锁 | --------------- | V ------------------- | 处理消息队列中消息 | ------------------- | V ------------------- | 执行 delete 操作 | ------------------- | V --------------------------- | 发送 delete 指令到消息队列 | --------------------------- | V ------------------ | 处理结束 | ------------------
删除操作只是将当前 upstream
的配置(ngx_http_upstream_srv_conf_t
)设置为一个无存活 server
的配置。如果开启健康检查,会调用健康检查模块的函数,将原先配置中的 server
从健康检查中删除。 在发送消息时,消息发送方会将消息的进程数组中第一个槽标记为当前进程 pid
,可以通过此值寻找消息发送方。
在执行删除 upstream
指令时并未立即删除 upstream
的内存配置,只是将其标记为 NGX_DYUPS_DELETING
状态。这是因为当前内存可能被请求使用,只有在当前内存的引用计数为零时才会删除。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 static ngx_http_dyups_srv_conf_t *ngx_dyups_find_upstream (ngx_str_t *name, ngx_int_t *idx) { ngx_uint_t i; ngx_http_dyups_srv_conf_t *duscfs, *duscf, *duscf_del; ngx_http_dyups_main_conf_t *dumcf; ngx_http_upstream_srv_conf_t *uscf; dumcf = ngx_http_cycle_get_module_main_conf(ngx_cycle, ngx_http_dyups_module); *idx = -1 ; duscf_del = NULL ; ngx_log_debug0(NGX_LOG_DEBUG_HTTP, ngx_cycle->log , 0 , "[dyups] find dynamic upstream" ); duscfs = dumcf->dy_upstreams.elts; for (i = 0 ; i < dumcf->dy_upstreams.nelts; i++) { duscf = &duscfs[i]; if (!duscf->dynamic) { continue ; } if (duscf->deleted == NGX_DYUPS_DELETING) { ngx_log_error(NGX_LOG_DEBUG, ngx_cycle->log , 0 , "[dyups] find upstream idx: %ui ref: %ui " "on %V deleting" , i, *(duscf->ref), &duscf->upstream->host); if (*(duscf->ref) == 0 ) { ngx_log_error(NGX_LOG_INFO, ngx_cycle->log , 0 , "[dyups] free dynamic upstream in find upstream" " %ui" , duscf->idx); duscf->deleted = NGX_DYUPS_DELETED; if (duscf->pool) { ngx_destroy_pool(duscf->pool); duscf->pool = NULL ; } } } if (duscf->deleted == NGX_DYUPS_DELETING) { continue ; } if (duscf->deleted == NGX_DYUPS_DELETED) { *idx = i; duscf_del = duscf; continue ; } uscf = duscf->upstream; if (uscf->host.len != name->len || ngx_strncasecmp(uscf->host.data, name->data, uscf->host.len) != 0 ) { continue ; } *idx = i; return duscf; } return duscf_del; }
4. 更新 upstream
请求处理 更新操作通过 ngx_http_dyups_body_handler
函数实现,因为更新操作需要根据包体中的信息进行更新,因此首先要将请求包体信息读出。函数流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 --------------- | 读取请求包体 | --------------- | V --------------- | 执行更新操作 | --------------- | V --------------- | shpool 加锁 | --------------- | V ------------------- | 处理消息队列中消息 | ------------------- | V --------------------------------------------------- | 使用假的 upstream 名进行更新操作,验证请求数据是否正确 | --------------------------------------------------- | V -------------- | 执行更新操作 | -------------- | V --------------------------- | 删除旧的 upstream 配置 | --------------------------- | V --------------------------- | 创建新的 upstream 配置 | --------------------------- | V --------------------------- | 包体中 server 配置创建 | --------------------------- | V ---------------------------------------------- | 对新创建的 upstream 进行初始化 | | 遍历所有的 HTTP 模块调用 create_srv_conf 函数 | ---------------------------------------------- | V --------------------------- | 发送 ADD 指令到消息队列 | --------------------------- | V ------------------ | 处理结束 | ------------------
5. 消息队列 基于共享内存的消息队列在共享内存初始化回调中创建,此时消息队列为空,在处理删除、新增指令时会向消息队列新增消息,消息被 worker
处理过之后会增加处理次数,如果次数达到 worker_process
数,则消息会被删除。消息处理锁:shpool->mutex
。
在 worker
中,当消息处理定时器触发时会遍历消息队列,对其中的消息进行处理。每个 worker
会处理当前”代”中的所有消息,如果消息已经被所有 worker
处理过,会删除消息。
在进程退出时销毁所有动态配置的 pool
(基于内存,非共享内存),共享内存并未清理。这是因为创建的 timer
是不可取消事件,在 worker
退出时会阻止进程退出。在消息处理定时器触发,消息处理函数在将所有消息处理完毕后会添加定时器时,此时会判断当前进程的状态,如果进程为退出状态则不添加定时器,此时进程才会退出,保证了正常状况下共享内存能够被释放。
6. upstream
引用计数 为了保证 upstream
内存能够正确回收同时又不会影响现有请求,dyups
模块使用引用计数机制在使用 upstream
时增加 upstream
的引用计数,在连接上注册清理函数,当连接关闭时引用计数减少。
dyups
模块在向 upstream
添加 server
时会对 peer.init
函数进行封装,每次成功建立连接后增加 upstream
的引用计数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 static ngx_int_t ngx_dyups_add_server (ngx_http_dyups_srv_conf_t *duscf, ngx_buf_t *buf) { ngx_conf_t cf; ngx_http_upstream_init_pt init; ngx_http_upstream_srv_conf_t *uscf; ngx_http_dyups_upstream_srv_conf_t *dscf; uscf = duscf->upstream; if (uscf->servers == NULL ) { uscf->servers = ngx_array_create(duscf->pool, 4 , sizeof (ngx_http_upstream_server_t )); if (uscf->servers == NULL ) { return NGX_ERROR; } } ngx_memzero(&cf, sizeof (ngx_conf_t )); cf.name = "dyups_init_module_conf" ; cf.pool = duscf->pool; cf.cycle = (ngx_cycle_t *) ngx_cycle; cf.module_type = NGX_HTTP_MODULE; cf.cmd_type = NGX_HTTP_UPS_CONF; cf.log = ngx_cycle->log ; cf.ctx = duscf->ctx; cf.args = ngx_array_create(duscf->pool, 10 , sizeof (ngx_str_t )); if (cf.args == NULL ) { return NGX_ERROR; } if (ngx_dyups_parse_upstream(&cf, buf) != NGX_CONF_OK) { return NGX_ERROR; } ngx_memzero(&cf, sizeof (ngx_conf_t )); cf.name = "dyups_init_upstream" ; cf.cycle = (ngx_cycle_t *) ngx_cycle; cf.pool = duscf->pool; cf.module_type = NGX_HTTP_MODULE; cf.cmd_type = NGX_HTTP_MAIN_CONF; cf.log = ngx_cycle->log ; cf.ctx = duscf->ctx; init = uscf->peer.init_upstream ? uscf->peer.init_upstream: ngx_http_upstream_init_round_robin; if (init(&cf, uscf) != NGX_OK) { return NGX_ERROR; } #if (T_NGX_HTTP_UPSTREAM_RANDOM) { ngx_http_upstream_rr_peers_t *peers, *backup; peers = uscf->peer.data; peers->init_number = ngx_random() % peers->number; backup = peers->next; if (backup) { backup->init_number = ngx_random() % backup->number; } } #endif dscf = uscf->srv_conf[ngx_http_dyups_module.ctx_index]; dscf->init = uscf->peer.init; uscf->peer.init = ngx_http_dyups_init_peer; return NGX_OK; }
引用计数增加操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 static ngx_int_t ngx_http_dyups_init_peer (ngx_http_request_t *r, ngx_http_upstream_srv_conf_t *us) { ngx_int_t rc; ngx_pool_cleanup_t *cln; ngx_http_dyups_ctx_t *ctx; ngx_http_dyups_upstream_srv_conf_t *dscf; dscf = us->srv_conf[ngx_http_dyups_module.ctx_index]; rc = dscf->init(r, us); ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ngx_cycle->log , 0 , "[dyups] dynamic upstream init peer: %i" , rc); if (rc != NGX_OK) { return rc; } ctx = ngx_pcalloc(r->pool, sizeof (ngx_http_dyups_ctx_t )); if (ctx == NULL ) { return NGX_ERROR; } ctx->scf = dscf; ctx->data = r->upstream->peer.data; ctx->get = r->upstream->peer.get; ctx->free = r->upstream->peer.free ; r->upstream->peer.data = ctx; r->upstream->peer.get = ngx_http_dyups_get_peer; r->upstream->peer.free = ngx_http_dyups_free_peer; #if (NGX_HTTP_SSL) r->upstream->peer.set_session = ngx_http_dyups_set_peer_session; r->upstream->peer.save_session = ngx_http_dyups_save_peer_session; #endif cln = ngx_pool_cleanup_add(r->pool, 0 ); if (cln == NULL ) { return NGX_ERROR; } dscf->ref++; cln->handler = ngx_http_dyups_clean_request; cln->data = &dscf->ref; return NGX_OK; }