一 概述

如何使用 OpenResty 实现负载均衡模块?使用 balancer_by_lua_file|block 指令可以用 Lua 实现负载均衡模块,在 Lua 代码中通过 ngx.balancer.set_current_peer 函数设置当前请求使用的 server 地址和端口。

OpenResty 通过 balancer_by_lua_file|block 指令修改 ngx_http_upstream_module 模块中 ngx_http_upstream_srv_conf_t::peer.init_upstream 回调函数指针;在 upstream 初始化时,又会把当前 upstream 配置的 ngx_http_upstream_srv_conf_t::peer.init 回调函数指针替换掉,从而实现 server 获取/释放(get/free)逻辑的注入。

二 指令

1. balancer_by_lua_block

syntax: balancer_by_lua_block { lua-script }
context: upstream
phase: content

注入 Lua 代码块,实现负载均衡功能。
如果 Lua 代码没有通过 set_current_peer 设置后端地址,会回退到 round robin 模块进行负载均衡处理。

2. balancer_by_lua_file

syntax: balancer_by_lua_file <path-to-lua-file>
context: upstream
phase: content

注入 Lua 代码文件,实现负载均衡功能。

三 函数

1. set_current_peer

syntax: ok, err = balancer.set_current_peer(host, port)
context: balancer_by_lua*

设置当前使用的 server 地址和端口。
支持 unix 域地址,IPv4 和 IPv6,但是不支持域名

2. set_more_tries

syntax: ok, err = balancer.set_more_tries(count)
context: balancer_by_lua*

设置当前请求的额外重试次数(不包含当前这次尝试)。单个请求的总尝试次数不能超过 proxy_next_upstream_tries 等指令设置的上限。

3. get_last_failure

syntax: state_name, status_code = balancer.get_last_failure()
context: balancer_by_lua*

在触发重试机制时,查询上一次失败的详细信息。如果有失败,则返回状态名称以及状态码。如果当前尝试是首次尝试,返回值为 nil。

4. set_timeouts

syntax: ok, err = balancer.set_timeouts(connect_timeout, send_timeout, read_timeout)
context: balancer_by_lua*

设置当前和后续 upstream 连接的超时时间(连接超时,发送超时,读超时),以秒为单位精确到毫秒。
超时时间必须大于零,不能等于零或小于零

四 实现

1. balancer_by_lua_block|file

模块指令

/*
 * Excerpt of lua-nginx-module's directive table: only the
 * balancer_by_lua_file entry is shown, other entries are elided ("...").
 * The last field (cmd->post) carries the directive-specific Lua handler,
 * which ngx_http_lua_balancer_by_lua() copies into lscf->balancer.handler.
 */
static ngx_command_t ngx_http_lua_cmds[] = {
...
{ ngx_string("balancer_by_lua_file"),
NGX_HTTP_UPS_CONF|NGX_CONF_TAKE1, /* valid inside upstream {}; exactly one argument (the file path) */
ngx_http_lua_balancer_by_lua, /* config handler shared with balancer_by_lua_block */
NGX_HTTP_SRV_CONF_OFFSET,
0,
(void *) ngx_http_lua_balancer_handler_file }, /* cmd->post: handler that runs the Lua file */
...
};


/*
 * Configuration handler for balancer_by_lua_block and balancer_by_lua_file
 * (declarations and the code-loading details are elided: "...").
 *
 * Stores the directive's Lua handler in the srv conf. It then replaces the
 * upstream's peer.init_upstream hook with ngx_http_lua_balancer_init, so
 * the Lua balancer is installed when this upstream {} block is
 * initialized. Returns NGX_CONF_OK on the path shown here.
 */
char *
ngx_http_lua_balancer_by_lua(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf)
{
...

value = cf->args->elts;
/*
 * cmd->post is the handler taken from the directive table.
 * For balancer_by_lua_file it is ngx_http_lua_balancer_handler_file.
 * For balancer_by_lua_block it is the inline handler
 * (ngx_http_lua_balancer_handler_inline).
 */
lscf->balancer.handler = (ngx_http_lua_srv_conf_handler_pt) cmd->post;

if (cmd->post == ngx_http_lua_balancer_handler_file) {
/* Lua code in an external file */
...

} else {
/* inlined Lua code */
...
}
/*
 * Take over the upstream module's peer.init_upstream hook so that
 * lua-nginx-module gets control. Following the code further shows that
 * it only intervenes in the per-request peer.get / peer.free callbacks.
 */
uscf = ngx_http_conf_get_module_srv_conf(cf, ngx_http_upstream_module);

/* a balancing method was already configured for this upstream; it is overridden */
if (uscf->peer.init_upstream) {
ngx_conf_log_error(NGX_LOG_WARN, cf, 0,
"load balancing method redefined");
}

uscf->peer.init_upstream = ngx_http_lua_balancer_init;

/*
 * NOTE(review): presumably declares which `server` parameters
 * (weight=, max_fails=, fail_timeout=, down) remain accepted in this
 * upstream block. Confirm against ngx_http_upstream_server().
 */
uscf->flags = NGX_HTTP_UPSTREAM_CREATE
|NGX_HTTP_UPSTREAM_WEIGHT
|NGX_HTTP_UPSTREAM_MAX_FAILS
|NGX_HTTP_UPSTREAM_FAIL_TIMEOUT
|NGX_HTTP_UPSTREAM_DOWN;

return NGX_CONF_OK;
}


/*
 * peer.init_upstream hook installed by balancer_by_lua*.
 *
 * Runs once when the upstream {} block is initialized. The stock
 * round-robin data is still built, because it is what the Lua balancer
 * falls back to when the Lua code selects no peer. After that, the
 * per-request init callback is swapped for the Lua balancer's.
 *
 * Returns NGX_OK on success, NGX_ERROR if round-robin init fails.
 */
static ngx_int_t
ngx_http_lua_balancer_init(ngx_conf_t *cf,
    ngx_http_upstream_srv_conf_t *us)
{
    if (ngx_http_upstream_init_round_robin(cf, us) == NGX_OK) {
        /* invoked for every individual request using this upstream */
        us->peer.init = ngx_http_lua_balancer_init_peer;
        return NGX_OK;
    }

    return NGX_ERROR;
}


/*
 * Per-request peer initialization, installed as us->peer.init by
 * ngx_http_lua_balancer_init. It does three things:
 *   - allocates the Lua balancer's per-request peer data (bp) from the
 *     request pool;
 *   - lets round robin initialize its state;
 *   - replaces peer.get/peer.free with the Lua balancer's callbacks.
 * Returns NGX_ERROR on allocation or round-robin init failure, NGX_OK
 * otherwise (some statements are elided: "...").
 */
static ngx_int_t
ngx_http_lua_balancer_init_peer(ngx_http_request_t *r,
ngx_http_upstream_srv_conf_t *us)
{
ngx_http_lua_srv_conf_t *bcf;
ngx_http_lua_balancer_peer_data_t *bp;

bp = ngx_pcalloc(r->pool, sizeof(ngx_http_lua_balancer_peer_data_t));
if (bp == NULL) {
return NGX_ERROR;
}

/*
 * Point peer.data at the round-robin state embedded in bp before running
 * the round-robin init.
 * NOTE(review): presumably this makes round robin reuse bp->rrp instead
 * of allocating its own. The peer callbacks later receive peer.data and
 * treat it as a bp pointer, so rrp is expected to be bp's first member.
 * Confirm against the struct definition.
 */
r->upstream->peer.data = &bp->rrp;

if (ngx_http_upstream_init_round_robin_peer(r, us) != NGX_OK) {
return NGX_ERROR;
}

/* route peer selection and release through the Lua balancer */
r->upstream->peer.get = ngx_http_lua_balancer_get_peer;
r->upstream->peer.free = ngx_http_lua_balancer_free_peer;

...

/* this upstream's lua srv conf (it holds the balancer_by_lua* handler) */
bcf = ngx_http_conf_upstream_srv_conf(us, ngx_http_lua_module);

bp->conf = bcf;
bp->request = r;

return NGX_OK;
}

2. ngx.balancer.set_current_peer

syntax: ok, err = balancer.set_current_peer(host, port)

context: balancer_by_lua*

用于设置后端地址、端口, 不能使用域名.

在 lua-nginx-module 模块中, set_current_peer 通过修改 ngx_http_lua_module 模块 main 配置中 balancer_peer_data 字段所指向的数据达到目的. 从代码看, set_current_peer 并没有直接读取 r->upstream->peer.data 字段, 原因是该字段可能被 ngx_http_upstream_keepalive_module 等其他 UPSTREAM 模块覆盖.

/*
 * FFI backend of ngx.balancer.set_current_peer(host, port).
 *
 * Parses addr (addr_len bytes, not NUL-terminated). port is used as the
 * default port, and url.no_resolve = 1 means no DNS resolution is done.
 * The resulting sockaddr/socklen/name is stored in the Lua balancer's
 * peer data, from which ngx_http_lua_balancer_get_peer later copies it
 * into the ngx_peer_connection_t.
 *
 * Returns NGX_OK on success. On failure it returns NGX_ERROR and, where a
 * message is available, sets *err.
 * (Declarations and context checks are elided in this excerpt: "...".)
 */
int
ngx_http_lua_ffi_balancer_set_current_peer(ngx_http_request_t *r,
const u_char *addr, size_t addr_len, int port, char **err)
{
...

ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module);
...
lmcf = ngx_http_get_module_main_conf(r, ngx_http_lua_module);

/* Why the peer data comes from the main conf rather than peer.data: */
/* we cannot read r->upstream->peer.data here directly because
* it could be overridden by other modules like
* ngx_http_upstream_keepalive_module.
*/
bp = lmcf->balancer_peer_data;
if (bp == NULL) {
*err = "no upstream peer data found";
return NGX_ERROR;
}

ngx_memzero(&url, sizeof(ngx_url_t));

/* copy the caller's address bytes into request-pool memory before parsing */
url.url.data = ngx_palloc(r->pool, addr_len);
if (url.url.data == NULL) {
*err = "no memory";
return NGX_ERROR;
}

ngx_memcpy(url.url.data, addr, addr_len);

url.url.len = addr_len;
url.default_port = (in_port_t) port;
url.uri_part = 0;
url.no_resolve = 1;

if (ngx_parse_url(r->pool, &url) != NGX_OK) {
/* NOTE(review): if url.err is NULL, *err is left unchanged here */
if (url.err) {
*err = url.err;
}

return NGX_ERROR;
}

/*
 * Record the backend address for get_peer. Only the first parsed address
 * is used. With no_resolve set, a host name yields no address, so domain
 * names end up in the "no host allowed" branch.
 */
if (url.addrs && url.addrs[0].sockaddr) {
bp->sockaddr = url.addrs[0].sockaddr;
bp->socklen = url.addrs[0].socklen;
bp->host = &url.addrs[0].name;

} else {
*err = "no host allowed";
return NGX_ERROR;
}

return NGX_OK;
}

后端信息通过 ngx_http_lua_module 模块 main 配置中的 balancer_peer_data 字段暂存. 根据代码中的注释, balancer_by_lua* 中既不能 yield 也不会递归执行, 因此同一时刻只会有一个请求使用该字段, 请求之间不存在竞争, 把它保存在 main 配置中是安全的.

/* Main (http-level) configuration of ngx_http_lua_module (excerpt; other fields elided). */
typedef struct ngx_http_lua_main_conf_s  ngx_http_lua_main_conf_t;

struct ngx_http_lua_main_conf_s {
...

/*
 * Peer data of the request whose balancer_by_lua* code is currently
 * running. It is read by balancer FFI functions such as
 * ngx_http_lua_ffi_balancer_set_current_peer and
 * ngx_http_lua_ffi_balancer_set_timeouts.
 * NOTE(review): presumably assigned by the Lua balancer's get_peer before
 * the Lua handler runs; confirm in ngx_http_lua_balancer.c.
 */
ngx_http_lua_balancer_peer_data_t *balancer_peer_data;
/* neither yielding nor recursion is possible in
* balancer_by_lua*, so there cannot be any races among
* concurrent requests and it is safe to store the peer
* data pointer in the main conf.
*/

...
};

3. set_current_peer 在 NGINX 原生 UPSTREAM 流程中的生效过程

在 NGINX UPSTREAM 处理框架中, ngx_http_upstream_connect 函数调用 ngx_event_connect_peer 建立连接. ngx_event_connect_peer 首先调用 lua-nginx 模块设置的 peer.get 回调(ngx_http_lua_balancer_get_peer), 由它读取 bp 中保存的后端地址并写入 ngx_peer_connection_t. 随后以非阻塞方式发起连接; 使用 epoll 模块时, 会在 connect 之前为连接同时添加读/写事件监听.

非阻塞连接时, 连接建立成功后 socket 变为可写; 连接失败时 socket 同时可读、可写. 因此需要调用 getsockopt(SO_ERROR) 判断 socket 上是否有错误.

/*
 * nginx core (excerpt): start connecting to the next upstream peer.
 * ngx_event_connect_peer() first invokes u->peer.get, which for a
 * balancer_by_lua* upstream is the Lua balancer's get_peer.
 * Error handling and other details are elided ("...").
 */
static void
ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
r->connection->log->action = "connecting to upstream";

...

/*
 * Per-attempt timing state. response_time records the start timestamp.
 * connect_time and header_time are set to (ngx_msec_t) -1, presumably
 * meaning "not measured yet".
 */
u->state->response_time = ngx_current_msec;
u->state->connect_time = (ngx_msec_t) -1;
u->state->header_time = (ngx_msec_t) -1;

/* connect to the upstream as a client (calls u->peer.get first) */
rc = ngx_event_connect_peer(&u->peer);

ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http upstream connect: %i", rc);

/* error handling (elided) */
...

/* rc == NGX_OK || rc == NGX_AGAIN || rc == NGX_DONE */

c = u->peer.connection;

c->data = r;

/*
 * Both connection events go to ngx_http_upstream_handler. The
 * upstream-level handlers send the request and process the response
 * header.
 */
c->write->handler = ngx_http_upstream_handler;
c->read->handler = ngx_http_upstream_handler;

u->write_event_handler = ngx_http_upstream_send_request_handler;
u->read_event_handler = ngx_http_upstream_process_header;

c->sendfile &= r->connection->sendfile;
u->output.sendfile = c->sendfile;

...

/*
 * Connect still in progress (non-blocking connect): arm the connect
 * timeout on the write event and wait.
 * If balancer.set_timeouts() cloned the upstream conf for this request,
 * u->conf points at that per-request copy, so a connect timeout set from
 * Lua applies here. Otherwise u->conf is the regular upstream conf.
 */
if (rc == NGX_AGAIN) {
ngx_add_timer(c->write, u->conf->connect_timeout);
return;
}

...

ngx_http_upstream_send_request(r, u, 1);
}

连接操作:

/*
 * nginx core (excerpt): obtain a peer via pc->get(), create a non-blocking
 * socket and start connecting to it. Socket creation and most error
 * handling are elided ("...").
 *
 * If pc->get() does not return NGX_OK, that result is returned as-is.
 * When the event module provides ngx_add_conn (epoll), it returns
 * NGX_AGAIN while the connect is still in progress, and NGX_OK if the
 * connect completed immediately.
 */
ngx_int_t
ngx_event_connect_peer(ngx_peer_connection_t *pc)
{
int rc, type;
#if (NGX_HAVE_IP_BIND_ADDRESS_NO_PORT || NGX_LINUX)
in_port_t port;
#endif
ngx_int_t event;
ngx_err_t err;
ngx_uint_t level;
ngx_socket_t s;
ngx_event_t *rev, *wev;
ngx_connection_t *c;

/*
 * Ask the balancer for a peer. For a balancer_by_lua* upstream, pc->get
 * is ngx_http_lua_balancer_get_peer, which fills in pc->sockaddr.
 */
rc = pc->get(pc, pc->data);
if (rc != NGX_OK) {
return rc;
}

...

c = ngx_get_connection(s, pc->log);

...

/* put the socket into non-blocking mode */
if (ngx_nonblocking(s) == -1) {
ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
ngx_nonblocking_n " failed");

goto failed;
}
...

/*
 * If the event module provides ngx_add_conn (for epoll this is
 * ngx_epoll_add_connection), register read and write interest for the
 * connection before connect() is called.
 */
if (ngx_add_conn) {
if (ngx_add_conn(c) == NGX_ERROR) {
goto failed;
}
}

/* start the connect; on a non-blocking socket it may not finish immediately */
rc = connect(s, pc->sockaddr, pc->socklen);

... /* error handling (elided) */

/* ngx_add_conn path: events are already registered, just report progress */
if (ngx_add_conn) {
if (rc == -1) {

/* NGX_EINPROGRESS */

return NGX_AGAIN;
}

ngx_log_debug0(NGX_LOG_DEBUG_EVENT, pc->log, 0, "connected");

/* connected immediately: mark the write side ready */
wev->ready = 1;

return NGX_OK;
}

/* path for event modules that do not provide ngx_add_conn (elided) */
...
}

/*
 * epoll event module: register a new connection with the epoll instance.
 *
 * A single EPOLL_CTL_ADD subscribes to edge-triggered read, write and
 * peer-hangup (EPOLLRDHUP) events at once. The read event's "instance"
 * bit is folded into the low bit of the stored connection pointer.
 * On success both events are marked active and NGX_OK is returned.
 * On failure the error is logged and NGX_ERROR is returned.
 */
static ngx_int_t
ngx_epoll_add_connection(ngx_connection_t *c)
{
    struct epoll_event  epev;

    epev.events = EPOLLIN|EPOLLOUT|EPOLLET|EPOLLRDHUP;
    epev.data.ptr = (void *) ((uintptr_t) c | c->read->instance);

    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
                   "epoll add connection: fd:%d ev:%08XD", c->fd, epev.events);

    if (epoll_ctl(ep, EPOLL_CTL_ADD, c->fd, &epev) != -1) {
        c->read->active = 1;
        c->write->active = 1;

        return NGX_OK;
    }

    ngx_log_error(NGX_LOG_ALERT, c->log, ngx_errno,
                  "epoll_ctl(EPOLL_CTL_ADD, %d) failed", c->fd);

    return NGX_ERROR;
}

4. ngx.balancer.set_timeouts

syntax: ok, err = balancer.set_timeouts(connect_timeout, send_timeout, read_timeout)

context: balancer_by_lua*

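超时时间设置与后端地址设置有所不同. 如果 NGINX 的 ngx_http_upstream_t 没有提供独立的超时字段(未定义 HAVE_NGX_UPSTREAM_TIMEOUT_FIELDS), set_timeouts 会在首次调用时为当前请求复制一份 UPSTREAM CONF 结构, 再修改其中的超时值; 否则直接写入 r->upstream 上的超时字段. 两种方式都确保请求之间的超时时间互不影响.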

/*
 * FFI backend of ngx.balancer.set_timeouts(connect_timeout, send_timeout,
 * read_timeout). Context checks and the send/read branches are elided
 * ("...").
 *
 * Each timeout is applied only when it is > 0, and is cast to ngx_msec_t.
 * NOTE(review): presumably the Lua wrapper has already converted it to
 * milliseconds; confirm.
 * Where the value goes depends on HAVE_NGX_UPSTREAM_TIMEOUT_FIELDS: either
 * into per-request fields of r->upstream, or into a per-request copy of
 * the upstream conf.
 * On the visible paths it returns NGX_OK, or NGX_ERROR with *err set
 * ("no memory").
 */
int
ngx_http_lua_ffi_balancer_set_timeouts(ngx_http_request_t *r,
long connect_timeout, long send_timeout, long read_timeout,
char **err)
{
ngx_http_lua_ctx_t *ctx;
ngx_http_upstream_t *u;

#if !(HAVE_NGX_UPSTREAM_TIMEOUT_FIELDS)
ngx_http_upstream_conf_t *ucf;
#endif
ngx_http_lua_main_conf_t *lmcf;
ngx_http_lua_balancer_peer_data_t *bp;
...
u = r->upstream;

ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module);
...
lmcf = ngx_http_get_module_main_conf(r, ngx_http_lua_module);

bp = lmcf->balancer_peer_data;
...

/*
 * Without per-request timeout fields on ngx_http_upstream_t, give this
 * request its own copy of the upstream conf, so its timeouts can differ
 * from other requests'. The copy is made only once per request, tracked
 * by bp->cloned_upstream_conf; later calls reuse u->conf.
 */
#if !(HAVE_NGX_UPSTREAM_TIMEOUT_FIELDS)
if (!bp->cloned_upstream_conf) {
/* we clone the upstream conf for the current request so that
* we do not affect other requests at all. */

ucf = ngx_palloc(r->pool, sizeof(ngx_http_upstream_conf_t));

if (ucf == NULL) {
*err = "no memory";
return NGX_ERROR;
}

ngx_memcpy(ucf, u->conf, sizeof(ngx_http_upstream_conf_t));

u->conf = ucf;
bp->cloned_upstream_conf = 1;

} else {
ucf = u->conf;
}
#endif

/* non-positive values leave the current timeout unchanged */
if (connect_timeout > 0) {
#if (HAVE_NGX_UPSTREAM_TIMEOUT_FIELDS)
u->connect_timeout = (ngx_msec_t) connect_timeout;
#else
ucf->connect_timeout = (ngx_msec_t) connect_timeout;
#endif
}

...

return NGX_OK;
}

5. ngx.balancer.set_more_tries

syntax: ok, err = balancer.set_more_tries(count)

context: balancer_by_lua*

设置允许重试次数, 最大重试次数受限于 proxy_next_upstream_tries 配置项.

NGINX 中重试是通过 ngx_http_upstream_next 函数实现, 是否允许重试需要同时考虑允许重试状态码、超时时间以及重试次数, 当超过重试次数时直接给客户端响应. 允许重试是通过递减允许重试次数, 当允许重试次数为零时, 不允许进行重试. ngx_peer_connection_t::tries 为允许重试次数.

UPSTREAM 处理框架中有 peer.get 和 peer.free, 分别用于获取和释放后端地址. 在 peer.get 中, 如果 Lua 代码通过 set_more_tries 设置了额外重试次数, 会把它累加到允许重试次数上; 在 peer.free 中, 允许重试次数减一.

/*
 * Second excerpt of ngx_http_lua_balancer_init_peer (other statements are
 * elided: "..."). It shows where the per-request peer.get and peer.free
 * callbacks are replaced by the Lua balancer's own.
 */
static ngx_int_t
ngx_http_lua_balancer_init_peer(ngx_http_request_t *r,
ngx_http_upstream_srv_conf_t *us)
{
...

/* peer.get: choose the backend (runs the balancer_by_lua* code) */
r->upstream->peer.get = ngx_http_lua_balancer_get_peer;
/* peer.free: release the backend; for a Lua-chosen peer it decrements the remaining tries */
r->upstream->peer.free = ngx_http_lua_balancer_free_peer;

...
return NGX_OK;
}

获取:

/*
 * peer.get callback of the Lua balancer (excerpt; setup and error handling
 * are elided: "...").
 *
 * Runs the balancer_by_lua* handler. If the Lua code chose a peer via
 * balancer.set_current_peer (bp->sockaddr and bp->socklen are set), that
 * peer's address is handed to the connection and NGX_OK is returned.
 * Otherwise it falls back to the stock round-robin selection and returns
 * that result.
 */
static ngx_int_t
ngx_http_lua_balancer_get_peer(ngx_peer_connection_t *pc, void *data)
{
...
/* run the balancer_by_lua* Lua code */
rc = lscf->balancer.handler(r, lscf, L);
...

/* a peer was set from Lua: use its address for this connection attempt */
if (bp->sockaddr && bp->socklen) {
pc->sockaddr = bp->sockaddr;
pc->socklen = bp->socklen;
/* no cached connection is attached to this attempt yet */
pc->cached = 0;
pc->connection = NULL;
pc->name = bp->host;

/*
 * NOTE(review): presumably clears round robin's single-server flag so
 * the retry logic is not short-circuited. Confirm in
 * ngx_http_upstream_round_robin.c.
 */
bp->rrp.peers->single = 0;

/*
 * Add the extra attempts requested via balancer.set_more_tries().
 * NOTE(review): bp->more_tries is presumably reset before the handler
 * runs (elided above); otherwise it would be added again on every attempt.
 */
if (bp->more_tries) {
r->upstream->peer.tries += bp->more_tries;
}

dd("tries: %d", (int) r->upstream->peer.tries);

return NGX_OK;
}

/* no peer set from Lua: fall back to round robin over the upstream's servers */
return ngx_http_upstream_get_round_robin_peer(pc, &bp->rrp);
}

释放:

/*
 * peer.free callback of the Lua balancer.
 *
 * For a peer chosen from Lua (bp->sockaddr and bp->socklen set), it
 * remembers the reported state in bp->last_peer_state and consumes one of
 * the remaining tries (never going below zero). For any other peer it
 * hands the release over to the stock round-robin implementation.
 */
void
ngx_http_lua_balancer_free_peer(ngx_peer_connection_t *pc, void *data,
    ngx_uint_t state)
{
    ngx_http_lua_balancer_peer_data_t  *peer_data;

    peer_data = data;

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
                   "lua balancer free peer, tries: %ui", pc->tries);

    if (!peer_data->sockaddr || !peer_data->socklen) {
        /* fallback */
        ngx_http_upstream_free_round_robin_peer(pc, data, state);
        return;
    }

    peer_data->last_peer_state = (int) state;

    /* one attempt used up */
    if (pc->tries) {
        pc->tries--;
    }
}

在与后端 connect、send_request、process_header 出错时会调用失败重试判断函数 ngx_http_upstream_next:

/*
 * nginx core (excerpt; details are elided: "...").
 * Called when working with the current upstream peer failed, e.g. during
 * connect, send_request or process_header; ft_type tells which kind of
 * failure. It releases the current peer and then either finalizes the
 * request or retries another peer via ngx_http_upstream_connect().
 */
static void
ngx_http_upstream_next(ngx_http_request_t *r, ngx_http_upstream_t *u,
ngx_uint_t ft_type)
{
...

if (u->peer.sockaddr) {

/* 403/404 are reported as NGX_PEER_NEXT; everything else as NGX_PEER_FAILED */
if (ft_type == NGX_HTTP_UPSTREAM_FT_HTTP_403
|| ft_type == NGX_HTTP_UPSTREAM_FT_HTTP_404)
{
state = NGX_PEER_NEXT;

} else {
state = NGX_PEER_FAILED;
}

/*
 * Release the current peer. For the Lua balancer this is
 * ngx_http_lua_balancer_free_peer, which decrements u->peer.tries
 * when the peer was chosen from Lua.
 */
u->peer.free(&u->peer, u->peer.data, state);
u->peer.sockaddr = NULL;
}

...

/*
 * Give up and finalize the request in any of these cases:
 *   - no tries are left;
 *   - this failure type is not enabled in next_upstream;
 *   - a non-buffered request body was already sent;
 *   - the retry time budget has run out. (timeout is computed in elided
 *     code, presumably from the *_next_upstream_timeout setting.)
 */
if (u->peer.tries == 0
|| ((u->conf->next_upstream & ft_type) != ft_type)
|| (u->request_sent && r->request_body_no_buffering)
|| (timeout && ngx_current_msec - u->peer.start_time >= timeout))
{
...
ngx_http_upstream_finalize_request(r, u, status);
return;
}

...

/* retry: go back to the connect phase, which calls peer.get again */
ngx_http_upstream_connect(r, u);
}