I Overview

How do you build a load-balancing module with OpenResty? The balancer_by_lua_file|block directives let you implement the balancing logic in Lua; inside that Lua code you must call the ngx.balancer.set_current_peer function to set the address and port of the server to use.
OpenResty's balancer_by_lua_file|block directives replace the ngx_http_upstream_srv_conf_t::peer.init_upstream callback pointer of ngx_http_upstream_module. When the upstream is initialized, that callback in turn replaces the ngx_http_upstream_srv_conf_t::peer.init callback of the current upstream configuration, which is how the server get/free hooks are injected.
II Directives

1. balancer_by_lua_block

```
syntax:  balancer_by_lua_block { lua-script }
context: upstream
phase:   content
```

Injects a Lua code block that implements the load-balancing logic. If the lua-script fails, load balancing falls back to the round-robin module.
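A minimal configuration sketch of the block form (the backend address 127.0.0.1:8080 is only an example; `server 0.0.0.1` is the conventional invalid placeholder, since the Lua code selects the real peer):

```nginx
upstream backend {
    # invalid placeholder address; the balancer code below picks the peer
    server 0.0.0.1;

    balancer_by_lua_block {
        local balancer = require "ngx.balancer"

        local ok, err = balancer.set_current_peer("127.0.0.1", 8080)
        if not ok then
            ngx.log(ngx.ERR, "failed to set the current peer: ", err)
            return ngx.exit(500)
        end
    }
}

server {
    listen 80;

    location / {
        proxy_pass http://backend;
    }
}
```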
2. balancer_by_lua_file

```
syntax:  balancer_by_lua_file <path-to-lua-file>
context: upstream
phase:   content
```

Injects a Lua code file that implements the load-balancing logic.
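The file form is equivalent; a sketch with an illustrative path:

```nginx
upstream backend {
    server 0.0.0.1;
    balancer_by_lua_file /etc/nginx/lua/balancer.lua;
}
```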
III Functions

1. set_current_peer

```
syntax:  ok, err = balancer.set_current_peer(host, port)
context: balancer_by_lua*
```

Sets the address and port of the server to use. Unix domain socket addresses, IPv4, and IPv6 are supported; domain names are not.
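A sketch of the accepted address forms (addresses are examples; no name resolution is performed, so a hostname would be rejected):

```lua
local balancer = require "ngx.balancer"

-- IPv4 address and port
local ok, err = balancer.set_current_peer("10.0.0.2", 8080)
if not ok then
    ngx.log(ngx.ERR, "set_current_peer failed: ", err)
end

-- IPv6 addresses are written in brackets:
--   balancer.set_current_peer("[::1]", 8080)
-- unix domain sockets use the "unix:" prefix:
--   balancer.set_current_peer("unix:/tmp/backend.sock")
```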
2. set_more_tries

```
syntax:  ok, err = balancer.set_more_tries(count)
context: balancer_by_lua*
```

Sets the number of additional retries for the current request (not counting the current attempt). count cannot exceed the value of proxy_next_upstream_tries.
3. get_last_failure

```
syntax:  state_name, status_code = balancer.get_last_failure()
context: balancer_by_lua*
```

Queries the details of the previous failed attempt once the retry mechanism has been triggered: on failure it returns the state name and the status code. If the current attempt is the first one, both return values are nil.
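set_more_tries and get_last_failure are typically combined to implement a fallback peer. A sketch (peer addresses and the retry budget are examples):

```lua
local balancer = require "ngx.balancer"

local state, code = balancer.get_last_failure()

if state == nil then
    -- first attempt: grant one extra try and pick the primary peer
    local ok, err = balancer.set_more_tries(1)
    if not ok then
        ngx.log(ngx.ERR, "set_more_tries failed: ", err)
    end

    ok, err = balancer.set_current_peer("10.0.0.2", 8080)
    if not ok then
        ngx.log(ngx.ERR, "set_current_peer failed: ", err)
        return ngx.exit(500)
    end
else
    -- a previous attempt failed ("failed" or "next"); use the fallback peer
    ngx.log(ngx.WARN, "retrying after ", state, ", status: ", tostring(code))

    local ok, err = balancer.set_current_peer("10.0.0.3", 8080)
    if not ok then
        ngx.log(ngx.ERR, "set_current_peer failed: ", err)
        return ngx.exit(500)
    end
end
```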
4. set_timeouts

```
syntax:  ok, err = balancer.set_timeouts(connect_timeout, send_timeout, read_timeout)
context: balancer_by_lua*
```

Sets the connect, send, and read timeouts for the current and any subsequent upstream connections. Values are in seconds with millisecond precision and must be greater than zero.
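A usage sketch; per the module's documentation, passing nil for an argument leaves that timeout unchanged:

```lua
local balancer = require "ngx.balancer"

-- 100 ms connect timeout, 2 s send and read timeouts
local ok, err = balancer.set_timeouts(0.1, 2, 2)
if not ok then
    ngx.log(ngx.ERR, "set_timeouts failed: ", err)
end
```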
IV Implementation

1. The balancer_by_lua_block|file directives
```c
static ngx_command_t ngx_http_lua_cmds[] = {
    ...
    { ngx_string("balancer_by_lua_file"),
      NGX_HTTP_UPS_CONF|NGX_CONF_TAKE1,
      ngx_http_lua_balancer_by_lua,
      NGX_HTTP_SRV_CONF_OFFSET,
      0,
      (void *) ngx_http_lua_balancer_handler_file },
    ...
};

char *
ngx_http_lua_balancer_by_lua(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    ...
    value = cf->args->elts;

    lscf->balancer.handler = (ngx_http_lua_srv_conf_handler_pt) cmd->post;

    if (cmd->post == ngx_http_lua_balancer_handler_file) {
        ...
    } else {
        ...
    }

    uscf = ngx_http_conf_get_module_srv_conf(cf, ngx_http_upstream_module);

    if (uscf->peer.init_upstream) {
        ngx_conf_log_error(NGX_LOG_WARN, cf, 0,
                           "load balancing method redefined");
    }

    uscf->peer.init_upstream = ngx_http_lua_balancer_init;

    uscf->flags = NGX_HTTP_UPSTREAM_CREATE
                  |NGX_HTTP_UPSTREAM_WEIGHT
                  |NGX_HTTP_UPSTREAM_MAX_FAILS
                  |NGX_HTTP_UPSTREAM_FAIL_TIMEOUT
                  |NGX_HTTP_UPSTREAM_DOWN;

    return NGX_CONF_OK;
}

static ngx_int_t
ngx_http_lua_balancer_init(ngx_conf_t *cf, ngx_http_upstream_srv_conf_t *us)
{
    if (ngx_http_upstream_init_round_robin(cf, us) != NGX_OK) {
        return NGX_ERROR;
    }

    us->peer.init = ngx_http_lua_balancer_init_peer;

    return NGX_OK;
}

static ngx_int_t
ngx_http_lua_balancer_init_peer(ngx_http_request_t *r,
    ngx_http_upstream_srv_conf_t *us)
{
    ngx_http_lua_srv_conf_t            *bcf;
    ngx_http_lua_balancer_peer_data_t  *bp;

    bp = ngx_pcalloc(r->pool, sizeof(ngx_http_lua_balancer_peer_data_t));
    if (bp == NULL) {
        return NGX_ERROR;
    }

    r->upstream->peer.data = &bp->rrp;

    if (ngx_http_upstream_init_round_robin_peer(r, us) != NGX_OK) {
        return NGX_ERROR;
    }

    r->upstream->peer.get = ngx_http_lua_balancer_get_peer;
    r->upstream->peer.free = ngx_http_lua_balancer_free_peer;
    ...
    bcf = ngx_http_conf_upstream_srv_conf(us, ngx_http_lua_module);

    bp->conf = bcf;
    bp->request = r;

    return NGX_OK;
}
```
2. ngx.balancer.set_current_peer

```
syntax:  ok, err = balancer.set_current_peer(host, port)
context: balancer_by_lua*
```

Sets the backend address and port; domain names cannot be used. In lua-nginx-module, set_current_peer works by writing to the balancer_peer_data field of the module's configuration. Note in the code that set_current_peer never touches r->upstream->peer.data directly: that field has to stay consistent with the other upstream modules and must not be overwritten.
```c
int
ngx_http_lua_ffi_balancer_set_current_peer(ngx_http_request_t *r,
    const u_char *addr, size_t addr_len, int port, char **err)
{
    ...
    ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module);
    ...
    lmcf = ngx_http_get_module_main_conf(r, ngx_http_lua_module);

    bp = lmcf->balancer_peer_data;
    if (bp == NULL) {
        *err = "no upstream peer data found";
        return NGX_ERROR;
    }

    ngx_memzero(&url, sizeof(ngx_url_t));

    url.url.data = ngx_palloc(r->pool, addr_len);
    if (url.url.data == NULL) {
        *err = "no memory";
        return NGX_ERROR;
    }

    ngx_memcpy(url.url.data, addr, addr_len);

    url.url.len = addr_len;
    url.default_port = (in_port_t) port;
    url.uri_part = 0;
    url.no_resolve = 1;

    if (ngx_parse_url(r->pool, &url) != NGX_OK) {
        if (url.err) {
            *err = url.err;
        }

        return NGX_ERROR;
    }

    if (url.addrs && url.addrs[0].sockaddr) {
        bp->sockaddr = url.addrs[0].sockaddr;
        bp->socklen = url.addrs[0].socklen;
        bp->host = &url.addrs[0].name;

    } else {
        *err = "no host allowed";
        return NGX_ERROR;
    }

    return NGX_OK;
}
```
The backend information is staged in the balancer_peer_data field of the ngx_http_lua_module main configuration. As the comments in the source note, balancer_peer_data is only modified by one request at a time during the balancer phase, so there is no race condition.
```c
typedef struct ngx_http_lua_main_conf_s  ngx_http_lua_main_conf_t;

struct ngx_http_lua_main_conf_s {
    ...
    ngx_http_lua_balancer_peer_data_t      *balancer_peer_data;
    ...
};
```
3. ngx.balancer.set_current_peer: the native NGINX side

In the NGINX upstream framework, ngx_http_upstream_connect calls ngx_event_connect_peer to establish the connection. That call invokes the peer.get callback installed by lua-nginx-module, which reads the contents of bp and sets the backend address. The connection is established in non-blocking mode; with the epoll module, read and write events are registered for the connection at the same time.

With a non-blocking connect, a successfully established connection becomes writable, whereas a failed one becomes both readable and writable. getsockopt must therefore be called to determine whether the socket has a pending error.
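A minimal standalone sketch of that check (an illustrative helper, not NGINX source; NGINX performs the equivalent test when handling the upstream connection events):

```c
#include <sys/socket.h>
#include <errno.h>

/* After a non-blocking connect() and an event notification, SO_ERROR is
 * the reliable way to tell success from failure: a failed connect looks
 * readable and writable, just like a successful one that has data. */
static int
test_connect(int fd)
{
    int        err;
    socklen_t  len = sizeof(err);

    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (void *) &err, &len) == -1) {
        err = errno;
    }

    return err;  /* 0 on success, otherwise the pending socket error */
}
```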
```c
static void
ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
    r->connection->log->action = "connecting to upstream";
    ...
    u->state->response_time = ngx_current_msec;
    u->state->connect_time = (ngx_msec_t) -1;
    u->state->header_time = (ngx_msec_t) -1;

    rc = ngx_event_connect_peer(&u->peer);

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "http upstream connect: %i", rc);
    ...
    c = u->peer.connection;

    c->data = r;

    c->write->handler = ngx_http_upstream_handler;
    c->read->handler = ngx_http_upstream_handler;

    u->write_event_handler = ngx_http_upstream_send_request_handler;
    u->read_event_handler = ngx_http_upstream_process_header;

    c->sendfile &= r->connection->sendfile;
    u->output.sendfile = c->sendfile;
    ...
    if (rc == NGX_AGAIN) {
        ngx_add_timer(c->write, u->conf->connect_timeout);
        return;
    }
    ...
    ngx_http_upstream_send_request(r, u, 1);
}
```
The connect operation:
```c
ngx_int_t
ngx_event_connect_peer(ngx_peer_connection_t *pc)
{
    int                rc, type;
#if (NGX_HAVE_IP_BIND_ADDRESS_NO_PORT || NGX_LINUX)
    in_port_t          port;
#endif
    ngx_int_t          event;
    ngx_err_t          err;
    ngx_uint_t         level;
    ngx_socket_t       s;
    ngx_event_t       *rev, *wev;
    ngx_connection_t  *c;

    rc = pc->get(pc, pc->data);
    if (rc != NGX_OK) {
        return rc;
    }
    ...
    c = ngx_get_connection(s, pc->log);
    ...
    if (ngx_nonblocking(s) == -1) {
        ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                      ngx_nonblocking_n " failed");
        goto failed;
    }
    ...
    if (ngx_add_conn) {
        if (ngx_add_conn(c) == NGX_ERROR) {
            goto failed;
        }
    }

    rc = connect(s, pc->sockaddr, pc->socklen);
    ...
    if (ngx_add_conn) {
        if (rc == -1) {
            return NGX_AGAIN;
        }

        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, pc->log, 0, "connected");

        wev->ready = 1;

        return NGX_OK;
    }
    ...
}

static ngx_int_t
ngx_epoll_add_connection(ngx_connection_t *c)
{
    struct epoll_event  ee;

    ee.events = EPOLLIN|EPOLLOUT|EPOLLET|EPOLLRDHUP;
    ee.data.ptr = (void *) ((uintptr_t) c | c->read->instance);

    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
                   "epoll add connection: fd:%d ev:%08XD", c->fd, ee.events);

    if (epoll_ctl(ep, EPOLL_CTL_ADD, c->fd, &ee) == -1) {
        ngx_log_error(NGX_LOG_ALERT, c->log, ngx_errno,
                      "epoll_ctl(EPOLL_CTL_ADD, %d) failed", c->fd);
        return NGX_ERROR;
    }

    c->read->active = 1;
    c->write->active = 1;

    return NGX_OK;
}
```
4. ngx.balancer.set_timeouts

```
syntax:  ok, err = balancer.set_timeouts(connect_timeout, send_timeout, read_timeout)
context: balancer_by_lua*
```

Setting timeouts works differently from setting the backend address: each request clones its own copy of the upstream conf structure, so the timeouts of different requests stay independent of one another.
```c
int
ngx_http_lua_ffi_balancer_set_timeouts(ngx_http_request_t *r,
    long connect_timeout, long send_timeout, long read_timeout,
    char **err)
{
    ngx_http_lua_ctx_t    *ctx;
    ngx_http_upstream_t   *u;

#if !(HAVE_NGX_UPSTREAM_TIMEOUT_FIELDS)
    ngx_http_upstream_conf_t           *ucf;
#endif
    ngx_http_lua_main_conf_t           *lmcf;
    ngx_http_lua_balancer_peer_data_t  *bp;

    ...
    u = r->upstream;

    ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module);
    ...
    lmcf = ngx_http_get_module_main_conf(r, ngx_http_lua_module);

    bp = lmcf->balancer_peer_data;
    ...
#if !(HAVE_NGX_UPSTREAM_TIMEOUT_FIELDS)
    if (!bp->cloned_upstream_conf) {
        ucf = ngx_palloc(r->pool, sizeof(ngx_http_upstream_conf_t));
        if (ucf == NULL) {
            *err = "no memory";
            return NGX_ERROR;
        }

        ngx_memcpy(ucf, u->conf, sizeof(ngx_http_upstream_conf_t));
        u->conf = ucf;
        bp->cloned_upstream_conf = 1;

    } else {
        ucf = u->conf;
    }
#endif

    if (connect_timeout > 0) {
#if (HAVE_NGX_UPSTREAM_TIMEOUT_FIELDS)
        u->connect_timeout = (ngx_msec_t) connect_timeout;
#else
        ucf->connect_timeout = (ngx_msec_t) connect_timeout;
#endif
    }
    ...
    return NGX_OK;
}
```
5. ngx.balancer.set_more_tries

```
syntax:  ok, err = balancer.set_more_tries(count)
context: balancer_by_lua*
```

Sets the number of additional retries allowed; the effective maximum is capped by the proxy_next_upstream_tries directive.
Retries in NGINX are implemented by the ngx_http_upstream_next function. Whether another attempt is allowed depends on the retryable failure types, the overall retry timeout, and the remaining try count, all at once; when the tries are exhausted, the response is finalized to the client directly. The budget is enforced by decrementing the allowed-tries counter and refusing further retries once it reaches zero. ngx_peer_connection_t::tries holds the number of tries still allowed.
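The directives feeding those checks live on the proxy side. A sketch of a typical retry policy (values are examples):

```nginx
location / {
    proxy_pass http://backend;

    # which failure types are retryable (feeds u->conf->next_upstream)
    proxy_next_upstream error timeout http_502;

    # hard limits checked in ngx_http_upstream_next()
    proxy_next_upstream_tries   3;    # upper bound on u->peer.tries
    proxy_next_upstream_timeout 5s;   # overall time budget for retries
}
```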
The upstream framework's peer.get and peer.free callbacks acquire and release the backend address, respectively. On acquisition the allowed-tries counter is topped up according to the user's Lua code (set_more_tries); on release the remaining tries are decremented by one.
```c
static ngx_int_t
ngx_http_lua_balancer_init_peer(ngx_http_request_t *r,
    ngx_http_upstream_srv_conf_t *us)
{
    ...
    r->upstream->peer.get = ngx_http_lua_balancer_get_peer;
    r->upstream->peer.free = ngx_http_lua_balancer_free_peer;
    ...
    return NGX_OK;
}
```
Get:
```c
static ngx_int_t
ngx_http_lua_balancer_get_peer(ngx_peer_connection_t *pc, void *data)
{
    ...
    rc = lscf->balancer.handler(r, lscf, L);
    ...
    if (bp->sockaddr && bp->socklen) {
        pc->sockaddr = bp->sockaddr;
        pc->socklen = bp->socklen;
        pc->cached = 0;
        pc->connection = NULL;
        pc->name = bp->host;

        bp->rrp.peers->single = 0;

        if (bp->more_tries) {
            r->upstream->peer.tries += bp->more_tries;
        }

        dd("tries: %d", (int) r->upstream->peer.tries);

        return NGX_OK;
    }

    return ngx_http_upstream_get_round_robin_peer(pc, &bp->rrp);
}
```
Free:
```c
void
ngx_http_lua_balancer_free_peer(ngx_peer_connection_t *pc, void *data,
    ngx_uint_t state)
{
    ngx_http_lua_balancer_peer_data_t  *bp = data;

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
                   "lua balancer free peer, tries: %ui", pc->tries);

    if (bp->sockaddr && bp->socklen) {
        bp->last_peer_state = (int) state;

        if (pc->tries) {
            pc->tries--;
        }

        return;
    }

    ngx_http_upstream_free_round_robin_peer(pc, data, state);
}
```
When connect, send_request, or process_header against the backend fails, the retry decision function ngx_http_upstream_next is called:
```c
static void
ngx_http_upstream_next(ngx_http_request_t *r, ngx_http_upstream_t *u,
    ngx_uint_t ft_type)
{
    ...
    if (u->peer.sockaddr) {

        if (ft_type == NGX_HTTP_UPSTREAM_FT_HTTP_403
            || ft_type == NGX_HTTP_UPSTREAM_FT_HTTP_404)
        {
            state = NGX_PEER_NEXT;

        } else {
            state = NGX_PEER_FAILED;
        }

        u->peer.free(&u->peer, u->peer.data, state);
        u->peer.sockaddr = NULL;
    }
    ...
    if (u->peer.tries == 0
        || ((u->conf->next_upstream & ft_type) != ft_type)
        || (u->request_sent && r->request_body_no_buffering)
        || (timeout && ngx_current_msec - u->peer.start_time >= timeout))
    {
        ...
        ngx_http_upstream_finalize_request(r, u, status);
        return;
    }
    ...
    ngx_http_upstream_connect(r, u);
}
```