blob: d6c29796ea918e9d2e9e9c105072adaf176c29c1 [file] [log] [blame] [raw]
/*
* Copyright (C) Yichun Zhang (agentzh)
*/
#ifndef DDEBUG
#define DDEBUG 0
#endif
#include "ddebug.h"
#include "ngx_http_lua_socket_udp.h"
#include "ngx_http_lua_socket_tcp.h"
#include "ngx_http_lua_util.h"
#include "ngx_http_lua_contentby.h"
#include "ngx_http_lua_output.h"
#include "ngx_http_lua_probe.h"
#if 1
#undef ngx_http_lua_probe_info
#define ngx_http_lua_probe_info(msg)
#endif
#define UDP_MAX_DATAGRAM_SIZE 8192
static int ngx_http_lua_socket_udp(lua_State *L);
static int ngx_http_lua_socket_udp_setpeername(lua_State *L);
static int ngx_http_lua_socket_udp_send(lua_State *L);
static int ngx_http_lua_socket_udp_receive(lua_State *L);
static int ngx_http_lua_socket_udp_settimeout(lua_State *L);
static void ngx_http_lua_socket_udp_finalize(ngx_http_request_t *r,
ngx_http_lua_socket_udp_upstream_t *u);
static int ngx_http_lua_socket_udp_upstream_destroy(lua_State *L);
static int ngx_http_lua_socket_resolve_retval_handler(ngx_http_request_t *r,
ngx_http_lua_socket_udp_upstream_t *u, lua_State *L);
static void ngx_http_lua_socket_resolve_handler(ngx_resolver_ctx_t *ctx);
static int ngx_http_lua_socket_error_retval_handler(ngx_http_request_t *r,
ngx_http_lua_socket_udp_upstream_t *u, lua_State *L);
static void ngx_http_lua_socket_udp_handle_error(ngx_http_request_t *r,
ngx_http_lua_socket_udp_upstream_t *u, ngx_uint_t ft_type);
static void ngx_http_lua_socket_udp_cleanup(void *data);
static void ngx_http_lua_socket_udp_handler(ngx_event_t *ev);
static void ngx_http_lua_socket_dummy_handler(ngx_http_request_t *r,
ngx_http_lua_socket_udp_upstream_t *u);
static int ngx_http_lua_socket_udp_receive_retval_handler(ngx_http_request_t *r,
ngx_http_lua_socket_udp_upstream_t *u, lua_State *L);
static ngx_int_t ngx_http_lua_socket_udp_read(ngx_http_request_t *r,
ngx_http_lua_socket_udp_upstream_t *u);
static void ngx_http_lua_socket_udp_read_handler(ngx_http_request_t *r,
ngx_http_lua_socket_udp_upstream_t *u);
static void ngx_http_lua_socket_udp_handle_success(ngx_http_request_t *r,
ngx_http_lua_socket_udp_upstream_t *u);
static ngx_int_t ngx_http_lua_udp_connect(ngx_udp_connection_t *uc);
static int ngx_http_lua_socket_udp_close(lua_State *L);
static ngx_int_t ngx_http_lua_socket_udp_resume(ngx_http_request_t *r);
static void ngx_http_lua_udp_resolve_cleanup(void *data);
static void ngx_http_lua_udp_socket_cleanup(void *data);
enum {
SOCKET_CTX_INDEX = 1,
SOCKET_TIMEOUT_INDEX = 2
};
static char ngx_http_lua_socket_udp_metatable_key;
static u_char ngx_http_lua_socket_udp_buffer[UDP_MAX_DATAGRAM_SIZE];
void
ngx_http_lua_inject_socket_udp_api(ngx_log_t *log, lua_State *L)
{
lua_getfield(L, -1, "socket"); /* ngx socket */
lua_pushcfunction(L, ngx_http_lua_socket_udp);
lua_setfield(L, -2, "udp"); /* ngx socket */
/* udp socket object metatable */
lua_pushlightuserdata(L, &ngx_http_lua_socket_udp_metatable_key);
lua_createtable(L, 0 /* narr */, 4 /* nrec */);
lua_pushcfunction(L, ngx_http_lua_socket_udp_setpeername);
lua_setfield(L, -2, "setpeername"); /* ngx socket mt */
lua_pushcfunction(L, ngx_http_lua_socket_udp_send);
lua_setfield(L, -2, "send");
lua_pushcfunction(L, ngx_http_lua_socket_udp_receive);
lua_setfield(L, -2, "receive");
lua_pushcfunction(L, ngx_http_lua_socket_udp_settimeout);
lua_setfield(L, -2, "settimeout"); /* ngx socket mt */
lua_pushcfunction(L, ngx_http_lua_socket_udp_close);
lua_setfield(L, -2, "close"); /* ngx socket mt */
lua_pushvalue(L, -1);
lua_setfield(L, -2, "__index");
lua_rawset(L, LUA_REGISTRYINDEX);
/* }}} */
lua_pop(L, 1);
}
static int
ngx_http_lua_socket_udp(lua_State *L)
{
ngx_http_request_t *r;
ngx_http_lua_ctx_t *ctx;
if (lua_gettop(L) != 0) {
return luaL_error(L, "expecting zero arguments, but got %d",
lua_gettop(L));
}
r = ngx_http_lua_get_req(L);
if (r == NULL) {
return luaL_error(L, "no request found");
}
ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module);
if (ctx == NULL) {
return luaL_error(L, "no ctx found");
}
ngx_http_lua_check_context(L, ctx, NGX_HTTP_LUA_CONTEXT_REWRITE
| NGX_HTTP_LUA_CONTEXT_ACCESS
| NGX_HTTP_LUA_CONTEXT_CONTENT
| NGX_HTTP_LUA_CONTEXT_TIMER);
lua_createtable(L, 3 /* narr */, 1 /* nrec */);
lua_pushlightuserdata(L, &ngx_http_lua_socket_udp_metatable_key);
lua_rawget(L, LUA_REGISTRYINDEX);
lua_setmetatable(L, -2);
dd("top: %d", lua_gettop(L));
return 1;
}
static int
ngx_http_lua_socket_udp_setpeername(lua_State *L)
{
ngx_http_request_t *r;
ngx_http_lua_ctx_t *ctx;
ngx_str_t host;
int port;
ngx_resolver_ctx_t *rctx, temp;
ngx_http_core_loc_conf_t *clcf;
int saved_top;
int n;
u_char *p;
size_t len;
ngx_url_t url;
ngx_int_t rc;
ngx_http_lua_loc_conf_t *llcf;
ngx_udp_connection_t *uc;
int timeout;
ngx_http_lua_co_ctx_t *coctx;
ngx_http_lua_socket_udp_upstream_t *u;
/*
* TODO: we should probably accept an extra argument to setpeername()
* to allow the user bind the datagram unix domain socket himself,
* which is necessary for systems without autobind support.
*/
n = lua_gettop(L);
if (n != 2 && n != 3) {
return luaL_error(L, "ngx.socket.udp setpeername: expecting 2 or 3 "
"arguments (including the object), but seen %d", n);
}
r = ngx_http_lua_get_req(L);
if (r == NULL) {
return luaL_error(L, "no request found");
}
ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module);
if (ctx == NULL) {
return luaL_error(L, "no ctx found");
}
ngx_http_lua_check_context(L, ctx, NGX_HTTP_LUA_CONTEXT_REWRITE
| NGX_HTTP_LUA_CONTEXT_ACCESS
| NGX_HTTP_LUA_CONTEXT_CONTENT
| NGX_HTTP_LUA_CONTEXT_TIMER);
luaL_checktype(L, 1, LUA_TTABLE);
p = (u_char *) luaL_checklstring(L, 2, &len);
host.data = ngx_palloc(r->pool, len + 1);
if (host.data == NULL) {
return luaL_error(L, "out of memory");
}
host.len = len;
ngx_memcpy(host.data, p, len);
host.data[len] = '\0';
if (n == 3) {
port = luaL_checkinteger(L, 3);
if (port < 0 || port > 65536) {
lua_pushnil(L);
lua_pushfstring(L, "bad port number: %d", port);
return 2;
}
} else { /* n == 2 */
port = 0;
}
lua_rawgeti(L, 1, SOCKET_CTX_INDEX);
u = lua_touserdata(L, -1);
lua_pop(L, 1);
if (u) {
if (u->waiting) {
lua_pushnil(L);
lua_pushliteral(L, "socket busy");
return 2;
}
if (u->udp_connection.connection) {
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"lua udp socket reconnect without shutting down");
ngx_http_lua_socket_udp_finalize(r, u);
}
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"lua reuse socket upstream ctx");
} else {
u = lua_newuserdata(L, sizeof(ngx_http_lua_socket_udp_upstream_t));
if (u == NULL) {
return luaL_error(L, "out of memory");
}
#if 1
lua_createtable(L, 0 /* narr */, 1 /* nrec */); /* metatable */
lua_pushcfunction(L, ngx_http_lua_socket_udp_upstream_destroy);
lua_setfield(L, -2, "__gc");
lua_setmetatable(L, -2);
#endif
lua_rawseti(L, 1, SOCKET_CTX_INDEX);
}
ngx_memzero(u, sizeof(ngx_http_lua_socket_udp_upstream_t));
u->request = r; /* set the controlling request */
llcf = ngx_http_get_module_loc_conf(r, ngx_http_lua_module);
u->conf = llcf;
uc = &u->udp_connection;
uc->log = *r->connection->log;
dd("lua peer connection log: %p", &uc->log);
lua_rawgeti(L, 1, SOCKET_TIMEOUT_INDEX);
timeout = (ngx_int_t) lua_tointeger(L, -1);
lua_pop(L, 1);
if (timeout > 0) {
u->read_timeout = (ngx_msec_t) timeout;
} else {
u->read_timeout = u->conf->read_timeout;
}
ngx_memzero(&url, sizeof(ngx_url_t));
url.url.len = host.len;
url.url.data = host.data;
url.default_port = (in_port_t) port;
url.no_resolve = 1;
if (ngx_parse_url(r->pool, &url) != NGX_OK) {
lua_pushnil(L);
if (url.err) {
lua_pushfstring(L, "failed to parse host name \"%s\": %s",
host.data, url.err);
} else {
lua_pushfstring(L, "failed to parse host name \"%s\"", host.data);
}
return 2;
}
u->resolved = ngx_pcalloc(r->pool, sizeof(ngx_http_upstream_resolved_t));
if (u->resolved == NULL) {
return luaL_error(L, "out of memory");
}
if (url.addrs && url.addrs[0].sockaddr) {
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"lua udp socket network address given directly");
u->resolved->sockaddr = url.addrs[0].sockaddr;
u->resolved->socklen = url.addrs[0].socklen;
u->resolved->naddrs = 1;
u->resolved->host = url.addrs[0].name;
} else {
u->resolved->host = host;
u->resolved->port = (in_port_t) port;
}
if (u->resolved->sockaddr) {
rc = ngx_http_lua_socket_resolve_retval_handler(r, u, L);
if (rc == NGX_AGAIN) {
return lua_yield(L, 0);
}
return rc;
}
clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
temp.name = host;
rctx = ngx_resolve_start(clcf->resolver, &temp);
if (rctx == NULL) {
u->ft_type |= NGX_HTTP_LUA_SOCKET_FT_RESOLVER;
lua_pushnil(L);
lua_pushliteral(L, "failed to start the resolver");
return 2;
}
if (rctx == NGX_NO_RESOLVER) {
u->ft_type |= NGX_HTTP_LUA_SOCKET_FT_RESOLVER;
lua_pushnil(L);
lua_pushfstring(L, "no resolver defined to resolve \"%s\"", host.data);
return 2;
}
rctx->name = host;
#if !defined(nginx_version) || nginx_version < 1005008
rctx->type = NGX_RESOLVE_A;
#endif
rctx->handler = ngx_http_lua_socket_resolve_handler;
rctx->data = u;
rctx->timeout = clcf->resolver_timeout;
u->co_ctx = ctx->cur_co_ctx;
u->resolved->ctx = rctx;
saved_top = lua_gettop(L);
coctx = ctx->cur_co_ctx;
coctx->cleanup = ngx_http_lua_udp_resolve_cleanup;
if (ngx_resolve_name(rctx) != NGX_OK) {
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"lua udp socket fail to run resolver immediately");
u->ft_type |= NGX_HTTP_LUA_SOCKET_FT_RESOLVER;
u->resolved->ctx = NULL;
lua_pushnil(L);
lua_pushfstring(L, "%s could not be resolved", host.data);
return 2;
}
if (u->waiting == 1) {
/* resolved and already connecting */
return lua_yield(L, 0);
}
n = lua_gettop(L) - saved_top;
if (n) {
/* errors occurred during resolving or connecting
* or already connected */
return n;
}
/* still resolving */
u->waiting = 1;
u->prepare_retvals = ngx_http_lua_socket_resolve_retval_handler;
coctx->data = u;
if (ctx->entered_content_phase) {
r->write_event_handler = ngx_http_lua_content_wev_handler;
} else {
r->write_event_handler = ngx_http_core_run_phases;
}
return lua_yield(L, 0);
}
static void
ngx_http_lua_socket_resolve_handler(ngx_resolver_ctx_t *ctx)
{
ngx_http_request_t *r;
ngx_connection_t *c;
ngx_http_upstream_resolved_t *ur;
ngx_http_lua_ctx_t *lctx;
lua_State *L;
ngx_http_lua_socket_udp_upstream_t *u;
u_char *p;
size_t len;
#if defined(nginx_version) && nginx_version >= 1005008
socklen_t socklen;
struct sockaddr *sockaddr;
#else
struct sockaddr_in *sin;
#endif
ngx_uint_t i;
unsigned waiting;
u = ctx->data;
r = u->request;
c = r->connection;
ur = u->resolved;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"lua udp socket resolve handler");
lctx = ngx_http_get_module_ctx(r, ngx_http_lua_module);
if (lctx == NULL) {
return;
}
lctx->cur_co_ctx = u->co_ctx;
u->co_ctx->cleanup = NULL;
L = lctx->cur_co_ctx->co;
dd("setting socket_ready to 1");
waiting = u->waiting;
if (ctx->state) {
ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0,
"lua udp socket resolver error: %s (waiting: %d)",
ngx_resolver_strerror(ctx->state), (int) u->waiting);
lua_pushnil(L);
lua_pushlstring(L, (char *) ctx->name.data, ctx->name.len);
lua_pushfstring(L, " could not be resolved (%d: %s)",
(int) ctx->state,
ngx_resolver_strerror(ctx->state));
lua_concat(L, 2);
#if 1
ur->ctx = NULL;
ngx_resolve_name_done(ctx);
#endif
u->prepare_retvals = ngx_http_lua_socket_error_retval_handler;
ngx_http_lua_socket_udp_handle_error(r, u,
NGX_HTTP_LUA_SOCKET_FT_RESOLVER);
if (waiting) {
ngx_http_run_posted_requests(c);
}
return;
}
ur->naddrs = ctx->naddrs;
ur->addrs = ctx->addrs;
#if (NGX_DEBUG)
{
# if defined(nginx_version) && nginx_version >= 1005008
u_char text[NGX_SOCKADDR_STRLEN];
ngx_str_t addr;
# else
in_addr_t addr;
# endif
ngx_uint_t i;
# if defined(nginx_version) && nginx_version >= 1005008
addr.data = text;
for (i = 0; i < ctx->naddrs; i++) {
addr.len = ngx_sock_ntop(ur->addrs[i].sockaddr, ur->addrs[i].socklen,
text, NGX_SOCKADDR_STRLEN, 0);
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"name was resolved to %V", &addr);
}
# else
for (i = 0; i < ctx->naddrs; i++) {
dd("addr i: %d %p", (int) i, &ctx->addrs[i]);
addr = ntohl(ctx->addrs[i]);
ngx_log_debug4(NGX_LOG_DEBUG_HTTP, c->log, 0,
"name was resolved to %ud.%ud.%ud.%ud",
(addr >> 24) & 0xff, (addr >> 16) & 0xff,
(addr >> 8) & 0xff, addr & 0xff);
}
# endif
}
#endif
if (ur->naddrs == 0) {
ngx_resolve_name_done(ctx);
u->ft_type |= NGX_HTTP_LUA_SOCKET_FT_RESOLVER;
lua_pushnil(L);
lua_pushliteral(L, "name cannot be resolved to a address");
if (waiting) {
ngx_http_run_posted_requests(c);
}
return;
}
if (ur->naddrs == 1) {
i = 0;
} else {
i = ngx_random() % ur->naddrs;
}
dd("selected addr index: %d", (int) i);
#if defined(nginx_version) && nginx_version >= 1005008
socklen = ur->addrs[i].socklen;
sockaddr = ngx_palloc(r->pool, socklen);
if (sockaddr == NULL) {
goto nomem;
}
ngx_memcpy(sockaddr, ur->addrs[i].sockaddr, socklen);
switch (sockaddr->sa_family) {
#if (NGX_HAVE_INET6)
case AF_INET6:
((struct sockaddr_in6 *) sockaddr)->sin6_port = htons(ur->port);
break;
#endif
default: /* AF_INET */
((struct sockaddr_in *) sockaddr)->sin_port = htons(ur->port);
}
p = ngx_pnalloc(r->pool, NGX_SOCKADDR_STRLEN);
if (p == NULL) {
goto nomem;
}
len = ngx_sock_ntop(sockaddr, socklen, p, NGX_SOCKADDR_STRLEN, 1);
ur->sockaddr = sockaddr;
ur->socklen = socklen;
#else
/* for nginx older than 1.5.8 */
len = NGX_INET_ADDRSTRLEN + sizeof(":65536") - 1;
p = ngx_pnalloc(r->pool, len + sizeof(struct sockaddr_in));
if (p == NULL) {
goto nomem;
}
sin = (struct sockaddr_in *) &p[len];
ngx_memzero(sin, sizeof(struct sockaddr_in));
len = ngx_inet_ntop(AF_INET, &ur->addrs[i], p, NGX_INET_ADDRSTRLEN);
len = ngx_sprintf(&p[len], ":%d", ur->port) - p;
sin->sin_family = AF_INET;
sin->sin_port = htons(ur->port);
sin->sin_addr.s_addr = ur->addrs[i];
ur->sockaddr = (struct sockaddr *) sin;
ur->socklen = sizeof(struct sockaddr_in);
#endif
ur->host.data = p;
ur->host.len = len;
ur->naddrs = 1;
ur->ctx = NULL;
ngx_resolve_name_done(ctx);
u->waiting = 0;
if (waiting) {
lctx->resume_handler = ngx_http_lua_socket_udp_resume;
r->write_event_handler(r);
ngx_http_run_posted_requests(c);
} else {
(void) ngx_http_lua_socket_resolve_retval_handler(r, u, L);
}
return;
nomem:
ngx_resolve_name_done(ctx);
u->ft_type |= NGX_HTTP_LUA_SOCKET_FT_NOMEM;
lua_pushnil(L);
lua_pushliteral(L, "no memory");
if (waiting) {
ngx_http_run_posted_requests(c);
}
}
static int
ngx_http_lua_socket_resolve_retval_handler(ngx_http_request_t *r,
ngx_http_lua_socket_udp_upstream_t *u, lua_State *L)
{
ngx_http_lua_ctx_t *ctx;
ngx_http_lua_co_ctx_t *coctx;
ngx_udp_connection_t *uc;
ngx_connection_t *c;
ngx_http_cleanup_t *cln;
ngx_http_upstream_resolved_t *ur;
ngx_int_t rc;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"lua udp socket resolve retval handler");
if (u->ft_type & NGX_HTTP_LUA_SOCKET_FT_RESOLVER) {
return 2;
}
uc = &u->udp_connection;
ur = u->resolved;
if (ur->sockaddr) {
uc->sockaddr = ur->sockaddr;
uc->socklen = ur->socklen;
uc->server = ur->host;
} else {
lua_pushnil(L);
lua_pushliteral(L, "resolver not working");
return 2;
}
rc = ngx_http_lua_udp_connect(uc);
if (rc != NGX_OK) {
u->socket_errno = ngx_socket_errno;
}
if (u->cleanup == NULL) {
cln = ngx_http_cleanup_add(r, 0);
if (cln == NULL) {
u->ft_type |= NGX_HTTP_LUA_SOCKET_FT_ERROR;
lua_pushnil(L);
lua_pushliteral(L, "out of memory");
return 2;
}
cln->handler = ngx_http_lua_socket_udp_cleanup;
cln->data = u;
u->cleanup = &cln->handler;
}
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"lua udp socket connect: %i", rc);
if (rc != NGX_OK) {
return ngx_http_lua_socket_error_retval_handler(r, u, L);
}
/* rc == NGX_OK */
c = uc->connection;
c->data = u;
c->write->handler = NULL;
c->read->handler = ngx_http_lua_socket_udp_handler;
c->read->resolver = 0;
c->pool = r->pool;
c->log = r->connection->log;
c->read->log = c->log;
c->write->log = c->log;
ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module);
coctx = ctx->cur_co_ctx;
coctx->data = u;
u->read_event_handler = ngx_http_lua_socket_dummy_handler;
lua_pushinteger(L, 1);
return 1;
}
static int
ngx_http_lua_socket_error_retval_handler(ngx_http_request_t *r,
ngx_http_lua_socket_udp_upstream_t *u, lua_State *L)
{
u_char errstr[NGX_MAX_ERROR_STR];
u_char *p;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"lua udp socket error retval handler");
if (u->ft_type & NGX_HTTP_LUA_SOCKET_FT_RESOLVER) {
return 2;
}
lua_pushnil(L);
if (u->ft_type & NGX_HTTP_LUA_SOCKET_FT_PARTIALWRITE) {
lua_pushliteral(L, "partial write");
} else if (u->ft_type & NGX_HTTP_LUA_SOCKET_FT_TIMEOUT) {
lua_pushliteral(L, "timeout");
} else if (u->ft_type & NGX_HTTP_LUA_SOCKET_FT_CLOSED) {
lua_pushliteral(L, "closed");
} else if (u->ft_type & NGX_HTTP_LUA_SOCKET_FT_BUFTOOSMALL) {
lua_pushliteral(L, "buffer too small");
} else if (u->ft_type & NGX_HTTP_LUA_SOCKET_FT_NOMEM) {
lua_pushliteral(L, "out of memory");
} else {
if (u->socket_errno) {
#if (nginx_version >= 1000000)
p = ngx_strerror(u->socket_errno, errstr, sizeof(errstr));
#else
p = ngx_strerror_r(u->socket_errno, errstr, sizeof(errstr));
#endif
/* for compatibility with LuaSocket */
ngx_strlow(errstr, errstr, p - errstr);
lua_pushlstring(L, (char *) errstr, p - errstr);
} else {
lua_pushliteral(L, "error");
}
}
return 2;
}
static int
ngx_http_lua_socket_udp_send(lua_State *L)
{
ssize_t n;
ngx_http_request_t *r;
u_char *p;
size_t len;
ngx_http_lua_socket_udp_upstream_t *u;
int type;
const char *msg;
ngx_str_t query;
ngx_http_lua_loc_conf_t *llcf;
if (lua_gettop(L) != 2) {
return luaL_error(L, "expecting 2 arguments (including the object), "
"but got %d", lua_gettop(L));
}
r = ngx_http_lua_get_req(L);
if (r == NULL) {
return luaL_error(L, "request object not found");
}
luaL_checktype(L, 1, LUA_TTABLE);
lua_rawgeti(L, 1, SOCKET_CTX_INDEX);
u = lua_touserdata(L, -1);
lua_pop(L, 1);
if (u == NULL || u->udp_connection.connection == NULL) {
llcf = ngx_http_get_module_loc_conf(r, ngx_http_lua_module);
if (llcf->log_socket_errors) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"attempt to send data on a closed socket: u:%p, c:%p",
u, u ? u->udp_connection.connection : NULL);
}
lua_pushnil(L);
lua_pushliteral(L, "closed");
return 2;
}
if (u->ft_type) {
u->ft_type = 0;
}
if (u->waiting) {
lua_pushnil(L);
lua_pushliteral(L, "socket busy");
return 2;
}
type = lua_type(L, 2);
switch (type) {
case LUA_TNUMBER:
case LUA_TSTRING:
lua_tolstring(L, 2, &len);
break;
case LUA_TTABLE:
len = ngx_http_lua_calc_strlen_in_table(L, 2, 2, 1 /* strict */);
break;
default:
msg = lua_pushfstring(L, "string, number, boolean, nil, "
"or array table expected, got %s",
lua_typename(L, type));
return luaL_argerror(L, 2, msg);
}
query.data = lua_newuserdata(L, len);
query.len = len;
switch (type) {
case LUA_TNUMBER:
case LUA_TSTRING:
p = (u_char *) lua_tolstring(L, 2, &len);
ngx_memcpy(query.data, (u_char *) p, len);
break;
case LUA_TTABLE:
(void) ngx_http_lua_copy_str_in_table(L, 2, query.data);
break;
default:
return luaL_error(L, "impossible to reach here");
}
u->ft_type = 0;
/* mimic ngx_http_upstream_init_request here */
#if 1
u->waiting = 0;
#endif
dd("sending query %.*s", (int) query.len, query.data);
n = ngx_send(u->udp_connection.connection, query.data, query.len);
dd("ngx_send returns %d (query len %d)", (int) n, (int) query.len);
if (n == NGX_ERROR || n == NGX_AGAIN) {
u->socket_errno = ngx_socket_errno;
return ngx_http_lua_socket_error_retval_handler(r, u, L);
}
if (n != (ssize_t) query.len) {
dd("not the while query was sent");
u->ft_type |= NGX_HTTP_LUA_SOCKET_FT_PARTIALWRITE;
return ngx_http_lua_socket_error_retval_handler(r, u, L);
}
dd("n == len");
lua_pushinteger(L, 1);
return 1;
}
static int
ngx_http_lua_socket_udp_receive(lua_State *L)
{
ngx_http_request_t *r;
ngx_http_lua_socket_udp_upstream_t *u;
ngx_int_t rc;
ngx_http_lua_ctx_t *ctx;
ngx_http_lua_co_ctx_t *coctx;
size_t size;
int nargs;
ngx_http_lua_loc_conf_t *llcf;
nargs = lua_gettop(L);
if (nargs != 1 && nargs != 2) {
return luaL_error(L, "expecting 1 or 2 arguments "
"(including the object), but got %d", nargs);
}
r = ngx_http_lua_get_req(L);
if (r == NULL) {
return luaL_error(L, "no request found");
}
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"lua udp socket calling receive() method");
luaL_checktype(L, 1, LUA_TTABLE);
lua_rawgeti(L, 1, SOCKET_CTX_INDEX);
u = lua_touserdata(L, -1);
lua_pop(L, 1);
if (u == NULL || u->udp_connection.connection == NULL) {
llcf = ngx_http_get_module_loc_conf(r, ngx_http_lua_module);
if (llcf->log_socket_errors) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"attempt to receive data on a closed socket: u:%p, "
"c:%p", u, u ? u->udp_connection.connection : NULL);
}
lua_pushnil(L);
lua_pushliteral(L, "closed");
return 2;
}
if (u->ft_type) {
u->ft_type = 0;
}
#if 1
if (u->waiting) {
lua_pushnil(L);
lua_pushliteral(L, "socket busy");
return 2;
}
#endif
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"lua udp socket read timeout: %M", u->read_timeout);
size = (size_t) luaL_optnumber(L, 2, UDP_MAX_DATAGRAM_SIZE);
size = ngx_min(size, UDP_MAX_DATAGRAM_SIZE);
u->recv_buf_size = size;
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"lua udp socket receive buffer size: %uz", u->recv_buf_size);
rc = ngx_http_lua_socket_udp_read(r, u);
if (rc == NGX_ERROR) {
dd("read failed: %d", (int) u->ft_type);
rc = ngx_http_lua_socket_udp_receive_retval_handler(r, u, L);
dd("udp receive retval returned: %d", (int) rc);
return rc;
}
if (rc == NGX_OK) {
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"lua udp socket receive done in a single run");
return ngx_http_lua_socket_udp_receive_retval_handler(r, u, L);
}
/* n == NGX_AGAIN */
u->read_event_handler = ngx_http_lua_socket_udp_read_handler;
ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module);
if (ctx == NULL) {
return luaL_error(L, "no request ctx found");
}
ctx->cur_co_ctx->cleanup = ngx_http_lua_udp_socket_cleanup;
if (ctx->entered_content_phase) {
r->write_event_handler = ngx_http_lua_content_wev_handler;
} else {
r->write_event_handler = ngx_http_core_run_phases;
}
u->co_ctx = ctx->cur_co_ctx;
u->waiting = 1;
u->prepare_retvals = ngx_http_lua_socket_udp_receive_retval_handler;
coctx = ctx->cur_co_ctx;
coctx->data = u;
return lua_yield(L, 0);
}
static int
ngx_http_lua_socket_udp_receive_retval_handler(ngx_http_request_t *r,
ngx_http_lua_socket_udp_upstream_t *u, lua_State *L)
{
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"lua udp socket receive return value handler");
if (u->ft_type) {
return ngx_http_lua_socket_error_retval_handler(r, u, L);
}
lua_pushlstring(L, (char *) ngx_http_lua_socket_udp_buffer, u->received);
return 1;
}
static int
ngx_http_lua_socket_udp_settimeout(lua_State *L)
{
int n;
ngx_int_t timeout;
ngx_http_lua_socket_udp_upstream_t *u;
n = lua_gettop(L);
if (n != 2) {
return luaL_error(L, "ngx.socket settimout: expecting at least 2 "
"arguments (including the object) but seen %d",
lua_gettop(L));
}
timeout = (ngx_int_t) lua_tonumber(L, 2);
lua_rawseti(L, 1, SOCKET_TIMEOUT_INDEX);
lua_rawgeti(L, 1, SOCKET_CTX_INDEX);
u = lua_touserdata(L, -1);
if (u) {
if (timeout > 0) {
u->read_timeout = (ngx_msec_t) timeout;
} else {
u->read_timeout = u->conf->read_timeout;
}
}
return 0;
}
static void
ngx_http_lua_socket_udp_finalize(ngx_http_request_t *r,
ngx_http_lua_socket_udp_upstream_t *u)
{
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"lua finalize socket");
if (u->cleanup) {
*u->cleanup = NULL;
u->cleanup = NULL;
}
if (u->resolved && u->resolved->ctx) {
ngx_resolve_name_done(u->resolved->ctx);
u->resolved->ctx = NULL;
}
if (u->udp_connection.connection) {
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"lua close socket connection");
ngx_close_connection(u->udp_connection.connection);
u->udp_connection.connection = NULL;
}
if (u->waiting) {
u->waiting = 0;
}
}
static int
ngx_http_lua_socket_udp_upstream_destroy(lua_State *L)
{
ngx_http_lua_socket_udp_upstream_t *u;
dd("upstream destroy triggered by Lua GC");
u = lua_touserdata(L, 1);
if (u == NULL) {
return 0;
}
if (u->cleanup) {
ngx_http_lua_socket_udp_cleanup(u); /* it will clear u->cleanup */
}
return 0;
}
static void
ngx_http_lua_socket_dummy_handler(ngx_http_request_t *r,
ngx_http_lua_socket_udp_upstream_t *u)
{
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"lua udp socket dummy handler");
}
static ngx_int_t
ngx_http_lua_socket_udp_read(ngx_http_request_t *r,
ngx_http_lua_socket_udp_upstream_t *u)
{
ngx_connection_t *c;
ngx_event_t *rev;
ssize_t n;
c = u->udp_connection.connection;
rev = c->read;
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
"lua udp socket read data: waiting: %d", (int) u->waiting);
n = ngx_udp_recv(u->udp_connection.connection,
ngx_http_lua_socket_udp_buffer, u->recv_buf_size);
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
"lua udp recv returned %z", n);
if (n >= 0) {
u->received = n;
ngx_http_lua_socket_udp_handle_success(r, u);
return NGX_OK;
}
if (n == NGX_ERROR) {
u->socket_errno = ngx_socket_errno;
ngx_http_lua_socket_udp_handle_error(r, u,
NGX_HTTP_LUA_SOCKET_FT_ERROR);
return NGX_ERROR;
}
/* n == NGX_AGAIN */
#if 1
if (ngx_handle_read_event(rev, 0) != NGX_OK) {
ngx_http_lua_socket_udp_handle_error(r, u,
NGX_HTTP_LUA_SOCKET_FT_ERROR);
return NGX_ERROR;
}
#endif
if (rev->active) {
ngx_add_timer(rev, u->read_timeout);
} else if (rev->timer_set) {
ngx_del_timer(rev);
}
return NGX_AGAIN;
}
static void
ngx_http_lua_socket_udp_read_handler(ngx_http_request_t *r,
ngx_http_lua_socket_udp_upstream_t *u)
{
ngx_connection_t *c;
ngx_http_lua_loc_conf_t *llcf;
c = u->udp_connection.connection;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"lua udp socket read handler");
if (c->read->timedout) {
c->read->timedout = 0;
llcf = ngx_http_get_module_loc_conf(r, ngx_http_lua_module);
if (llcf->log_socket_errors) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"lua udp socket read timed out");
}
ngx_http_lua_socket_udp_handle_error(r, u,
NGX_HTTP_LUA_SOCKET_FT_TIMEOUT);
return;
}
#if 1
if (c->read->timer_set) {
ngx_del_timer(c->read);
}
#endif
(void) ngx_http_lua_socket_udp_read(r, u);
}
static void
ngx_http_lua_socket_udp_handle_error(ngx_http_request_t *r,
ngx_http_lua_socket_udp_upstream_t *u, ngx_uint_t ft_type)
{
ngx_http_lua_ctx_t *ctx;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"lua udp socket handle error");
u->ft_type |= ft_type;
#if 0
ngx_http_lua_socket_udp_finalize(r, u);
#endif
u->read_event_handler = ngx_http_lua_socket_dummy_handler;
if (u->co_ctx) {
u->co_ctx->cleanup = NULL;
}
if (u->waiting) {
u->waiting = 0;
ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module);
if (ctx == NULL) {
return;
}
ctx->resume_handler = ngx_http_lua_socket_udp_resume;
ctx->cur_co_ctx = u->co_ctx;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"lua udp socket waking up the current request");
r->write_event_handler(r);
}
}
static void
ngx_http_lua_socket_udp_cleanup(void *data)
{
ngx_http_lua_socket_udp_upstream_t *u = data;
ngx_http_request_t *r;
r = u->request;
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"cleanup lua udp socket upstream request: \"%V\"", &r->uri);
ngx_http_lua_socket_udp_finalize(r, u);
}
static void
ngx_http_lua_socket_udp_handler(ngx_event_t *ev)
{
ngx_connection_t *c;
ngx_http_request_t *r;
ngx_http_log_ctx_t *ctx;
ngx_http_lua_socket_udp_upstream_t *u;
c = ev->data;
u = c->data;
r = u->request;
c = r->connection;
ctx = c->log->data;
ctx->current_request = r;
ngx_log_debug3(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"lua udp socket handler for \"%V?%V\", wev %d", &r->uri,
&r->args, (int) ev->write);
u->read_event_handler(r, u);
ngx_http_run_posted_requests(c);
}
static void
ngx_http_lua_socket_udp_handle_success(ngx_http_request_t *r,
ngx_http_lua_socket_udp_upstream_t *u)
{
ngx_http_lua_ctx_t *ctx;
u->read_event_handler = ngx_http_lua_socket_dummy_handler;
if (u->co_ctx) {
u->co_ctx->cleanup = NULL;
}
if (u->waiting) {
u->waiting = 0;
ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module);
if (ctx == NULL) {
return;
}
ctx->resume_handler = ngx_http_lua_socket_udp_resume;
ctx->cur_co_ctx = u->co_ctx;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"lua udp socket waking up the current request");
r->write_event_handler(r);
}
}
static ngx_int_t
ngx_http_lua_udp_connect(ngx_udp_connection_t *uc)
{
int rc;
ngx_int_t event;
ngx_event_t *rev, *wev;
ngx_socket_t s;
ngx_connection_t *c;
s = ngx_socket(uc->sockaddr->sa_family, SOCK_DGRAM, 0);
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, &uc->log, 0, "UDP socket %d", s);
if (s == -1) {
ngx_log_error(NGX_LOG_ALERT, &uc->log, ngx_socket_errno,
ngx_socket_n " failed");
return NGX_ERROR;
}
c = ngx_get_connection(s, &uc->log);
if (c == NULL) {
if (ngx_close_socket(s) == -1) {
ngx_log_error(NGX_LOG_ALERT, &uc->log, ngx_socket_errno,
ngx_close_socket_n "failed");
}
return NGX_ERROR;
}
if (ngx_nonblocking(s) == -1) {
ngx_log_error(NGX_LOG_ALERT, &uc->log, ngx_socket_errno,
ngx_nonblocking_n " failed");
ngx_free_connection(c);
if (ngx_close_socket(s) == -1) {
ngx_log_error(NGX_LOG_ALERT, &uc->log, ngx_socket_errno,
ngx_close_socket_n " failed");
}
return NGX_ERROR;
}
rev = c->read;
wev = c->write;
rev->log = &uc->log;
wev->log = &uc->log;
uc->connection = c;
c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
#if (NGX_THREADS)
/* TODO: lock event when call completion handler */
rev->lock = &c->lock;
wev->lock = &c->lock;
rev->own_lock = &c->lock;
wev->own_lock = &c->lock;
#endif
#if (NGX_HTTP_LUA_HAVE_SO_PASSCRED)
if (uc->sockaddr->sa_family == AF_UNIX) {
struct sockaddr addr;
addr.sa_family = AF_UNIX;
/* just to make valgrind happy */
ngx_memzero(addr.sa_data, sizeof(addr.sa_data));
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, &uc->log, 0, "datagram unix "
"domain socket autobind");
if (bind(uc->connection->fd, &addr, sizeof(sa_family_t)) != 0) {
ngx_log_error(NGX_LOG_CRIT, &uc->log, ngx_socket_errno,
"bind() failed");
return NGX_ERROR;
}
}
#endif
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, &uc->log, 0,
"connect to %V, fd:%d #%d", &uc->server, s, c->number);
rc = connect(s, uc->sockaddr, uc->socklen);
/* TODO: aio, iocp */
if (rc == -1) {
ngx_log_error(NGX_LOG_CRIT, &uc->log, ngx_socket_errno,
"connect() failed");
return NGX_ERROR;
}
/* UDP sockets are always ready to write */
wev->ready = 1;
if (ngx_add_event) {
event = (ngx_event_flags & NGX_USE_CLEAR_EVENT) ?
/* kqueue, epoll */ NGX_CLEAR_EVENT:
/* select, poll, /dev/poll */ NGX_LEVEL_EVENT;
/* eventport event type has no meaning: oneshot only */
if (ngx_add_event(rev, NGX_READ_EVENT, event) != NGX_OK) {
return NGX_ERROR;
}
} else {
/* rtsig */
if (ngx_add_conn(c) == NGX_ERROR) {
return NGX_ERROR;
}
}
return NGX_OK;
}
static int
ngx_http_lua_socket_udp_close(lua_State *L)
{
ngx_http_request_t *r;
ngx_http_lua_socket_udp_upstream_t *u;
if (lua_gettop(L) != 1) {
return luaL_error(L, "expecting 1 argument "
"(including the object) but seen %d", lua_gettop(L));
}
r = ngx_http_lua_get_req(L);
if (r == NULL) {
return luaL_error(L, "no request found");
}
luaL_checktype(L, 1, LUA_TTABLE);
lua_rawgeti(L, 1, SOCKET_CTX_INDEX);
u = lua_touserdata(L, -1);
lua_pop(L, 1);
if (u == NULL || u->udp_connection.connection == NULL) {
lua_pushnil(L);
lua_pushliteral(L, "closed");
return 2;
}
if (u->waiting) {
lua_pushnil(L);
lua_pushliteral(L, "socket busy");
return 2;
}
ngx_http_lua_socket_udp_finalize(r, u);
lua_pushinteger(L, 1);
return 1;
}
static ngx_int_t
ngx_http_lua_socket_udp_resume(ngx_http_request_t *r)
{
int nret;
lua_State *vm;
ngx_int_t rc;
ngx_connection_t *c;
ngx_http_lua_ctx_t *ctx;
ngx_http_lua_co_ctx_t *coctx;
ngx_http_lua_socket_udp_upstream_t *u;
ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module);
if (ctx == NULL) {
return NGX_ERROR;
}
ctx->resume_handler = ngx_http_lua_wev_handler;
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"lua udp operation done, resuming lua thread");
coctx = ctx->cur_co_ctx;
#if 0
ngx_http_lua_probe_info("udp resume");
#endif
u = coctx->data;
ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"lua udp socket calling prepare retvals handler %p, "
"u:%p", u->prepare_retvals, u);
nret = u->prepare_retvals(r, u, ctx->cur_co_ctx->co);
if (nret == NGX_AGAIN) {
return NGX_DONE;
}
c = r->connection;
vm = ngx_http_lua_get_lua_vm(r, ctx);
rc = ngx_http_lua_run_thread(vm, r, ctx, nret);
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"lua run thread returned %d", rc);
if (rc == NGX_AGAIN) {
return ngx_http_lua_run_posted_threads(c, vm, r, ctx);
}
if (rc == NGX_DONE) {
ngx_http_lua_finalize_request(r, NGX_DONE);
return ngx_http_lua_run_posted_threads(c, vm, r, ctx);
}
if (ctx->entered_content_phase) {
ngx_http_lua_finalize_request(r, rc);
return NGX_DONE;
}
return rc;
}
static void
ngx_http_lua_udp_resolve_cleanup(void *data)
{
ngx_resolver_ctx_t *rctx;
ngx_http_lua_socket_udp_upstream_t *u;
ngx_http_lua_co_ctx_t *coctx = data;
u = coctx->data;
if (u == NULL) {
return;
}
rctx = u->resolved->ctx;
if (rctx == NULL) {
return;
}
ngx_resolve_name_done(rctx);
}
static void
ngx_http_lua_udp_socket_cleanup(void *data)
{
ngx_http_lua_socket_udp_upstream_t *u;
ngx_http_lua_co_ctx_t *coctx = data;
u = coctx->data;
if (u == NULL) {
return;
}
if (u->request == NULL) {
return;
}
ngx_http_lua_socket_udp_finalize(u->request, u);
}
/* vi:set ft=c ts=4 sw=4 et fdm=marker: */