| |
| /* |
| * Copyright (C) Yichun Zhang (agentzh) |
| */ |
| |
| |
| #ifndef DDEBUG |
| #define DDEBUG 0 |
| #endif |
| #include "ddebug.h" |
| |
| |
| #include "ngx_http_lua_socket_tcp.h" |
| #include "ngx_http_lua_util.h" |
| #include "ngx_http_lua_output.h" |
| #include "ngx_http_lua_contentby.h" |
| #include "ngx_http_lua_probe.h" |
| |
| |
| #if 1 |
| #undef ngx_http_lua_probe_info |
| #define ngx_http_lua_probe_info(msg) |
| #endif |
| |
| |
| static int ngx_http_lua_socket_tcp(lua_State *L); |
| static int ngx_http_lua_socket_tcp_connect(lua_State *L); |
| static int ngx_http_lua_socket_tcp_receive(lua_State *L); |
| static int ngx_http_lua_socket_tcp_send(lua_State *L); |
| static int ngx_http_lua_socket_tcp_close(lua_State *L); |
| static int ngx_http_lua_socket_tcp_setoption(lua_State *L); |
| static int ngx_http_lua_socket_tcp_settimeout(lua_State *L); |
| static void ngx_http_lua_socket_tcp_handler(ngx_event_t *ev); |
| static ngx_int_t ngx_http_lua_socket_tcp_get_peer(ngx_peer_connection_t *pc, |
| void *data); |
| static void ngx_http_lua_socket_read_handler(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u); |
| static void ngx_http_lua_socket_send_handler(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u); |
| static void ngx_http_lua_socket_connected_handler(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u); |
| static void ngx_http_lua_socket_tcp_cleanup(void *data); |
| static void ngx_http_lua_socket_tcp_finalize(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u); |
| static ngx_int_t ngx_http_lua_socket_send(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u); |
| static ngx_int_t ngx_http_lua_socket_test_connect(ngx_http_request_t *r, |
| ngx_connection_t *c); |
| static void ngx_http_lua_socket_handle_error(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u, ngx_uint_t ft_type); |
| static void ngx_http_lua_socket_handle_success(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u); |
| static int ngx_http_lua_socket_tcp_send_retval_handler(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u, lua_State *L); |
| static int ngx_http_lua_socket_tcp_connect_retval_handler(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u, lua_State *L); |
| static void ngx_http_lua_socket_dummy_handler(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u); |
| static ngx_int_t ngx_http_lua_socket_tcp_read(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u); |
| static void ngx_http_lua_socket_read_handler(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u); |
| static int ngx_http_lua_socket_tcp_receive_retval_handler(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u, lua_State *L); |
| static ngx_int_t ngx_http_lua_socket_read_line(void *data, ssize_t bytes); |
| static void ngx_http_lua_socket_resolve_handler(ngx_resolver_ctx_t *ctx); |
| static int ngx_http_lua_socket_resolve_retval_handler(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u, lua_State *L); |
| static int ngx_http_lua_socket_error_retval_handler(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u, lua_State *L); |
| static ngx_int_t ngx_http_lua_socket_read_all(void *data, ssize_t bytes); |
| static ngx_int_t ngx_http_lua_socket_read_until(void *data, ssize_t bytes); |
| static ngx_int_t ngx_http_lua_socket_read_chunk(void *data, ssize_t bytes); |
| static int ngx_http_lua_socket_tcp_receiveuntil(lua_State *L); |
| static int ngx_http_lua_socket_receiveuntil_iterator(lua_State *L); |
| static ngx_int_t ngx_http_lua_socket_compile_pattern(u_char *data, size_t len, |
| ngx_http_lua_socket_compiled_pattern_t *cp, ngx_log_t *log); |
| static int ngx_http_lua_socket_cleanup_compiled_pattern(lua_State *L); |
| static int ngx_http_lua_req_socket(lua_State *L); |
| static void ngx_http_lua_req_socket_rev_handler(ngx_http_request_t *r); |
| static int ngx_http_lua_socket_tcp_getreusedtimes(lua_State *L); |
| static int ngx_http_lua_socket_tcp_setkeepalive(lua_State *L); |
| static ngx_int_t ngx_http_lua_get_keepalive_peer(ngx_http_request_t *r, |
| lua_State *L, int key_index, |
| ngx_http_lua_socket_tcp_upstream_t *u); |
| static void ngx_http_lua_socket_keepalive_dummy_handler(ngx_event_t *ev); |
| static ngx_int_t ngx_http_lua_socket_keepalive_close_handler(ngx_event_t *ev); |
| static void ngx_http_lua_socket_keepalive_rev_handler(ngx_event_t *ev); |
| static void ngx_http_lua_socket_free_pool(ngx_log_t *log, |
| ngx_http_lua_socket_pool_t *spool); |
| static int ngx_http_lua_socket_tcp_upstream_destroy(lua_State *L); |
| static int ngx_http_lua_socket_downstream_destroy(lua_State *L); |
| static ngx_int_t ngx_http_lua_socket_push_input_data(ngx_http_request_t *r, |
| ngx_http_lua_ctx_t *ctx, ngx_http_lua_socket_tcp_upstream_t *u, |
| lua_State *L); |
| static ngx_int_t ngx_http_lua_socket_add_pending_data(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u, u_char *pos, size_t len, u_char *pat, |
| int prefix, int old_state); |
| static ngx_int_t ngx_http_lua_socket_add_input_buffer(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u); |
| static ngx_int_t ngx_http_lua_socket_insert_buffer(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u, u_char *pat, size_t prefix); |
| static ngx_int_t ngx_http_lua_socket_tcp_resume(ngx_http_request_t *r); |
| static void ngx_http_lua_tcp_resolve_cleanup(void *data); |
| static void ngx_http_lua_coctx_cleanup(void *data); |
| |
| |
/*
 * Array slots of the Lua table that backs a socket object; they are used
 * with lua_rawgeti()/lua_rawseti() on that table.
 */
enum {
    SOCKET_CTX_INDEX = 1,   /* upstream state userdata */
    SOCKET_TIMEOUT_INDEX,   /* timeout, read back with lua_tointeger() */
    SOCKET_KEY_INDEX        /* key string: "pool" option or host[:port] */
};


/*
 * Only the addresses of these variables matter: each one is pushed as a
 * light userdata and used as the registry key of a socket metatable.
 */
static char  ngx_http_lua_req_socket_metatable_key;
static char  ngx_http_lua_raw_req_socket_metatable_key;
static char  ngx_http_lua_tcp_socket_metatable_key;
| |
| |
| void |
| ngx_http_lua_inject_socket_tcp_api(ngx_log_t *log, lua_State *L) |
| { |
| ngx_int_t rc; |
| |
| lua_createtable(L, 0, 3 /* nrec */); /* ngx.socket */ |
| |
| lua_pushcfunction(L, ngx_http_lua_socket_tcp); |
| lua_setfield(L, -2, "tcp"); |
| |
| { |
| const char buf[] = "local sock = ngx.socket.tcp()" |
| " local ok, err = sock:connect(...)" |
| " if ok then return sock else return nil, err end"; |
| |
| rc = luaL_loadbuffer(L, buf, sizeof(buf) - 1, "ngx.socket.connect"); |
| } |
| |
| if (rc != NGX_OK) { |
| ngx_log_error(NGX_LOG_CRIT, log, 0, |
| "failed to load Lua code for ngx.socket.connect(): %i", |
| rc); |
| |
| } else { |
| lua_setfield(L, -2, "connect"); |
| } |
| |
| lua_setfield(L, -2, "socket"); |
| |
| /* {{{req socket object metatable */ |
| lua_pushlightuserdata(L, &ngx_http_lua_req_socket_metatable_key); |
| lua_createtable(L, 0 /* narr */, 3 /* nrec */); |
| |
| lua_pushcfunction(L, ngx_http_lua_socket_tcp_receive); |
| lua_setfield(L, -2, "receive"); |
| |
| lua_pushcfunction(L, ngx_http_lua_socket_tcp_receiveuntil); |
| lua_setfield(L, -2, "receiveuntil"); |
| |
| lua_pushcfunction(L, ngx_http_lua_socket_tcp_settimeout); |
| lua_setfield(L, -2, "settimeout"); /* ngx socket mt */ |
| |
| lua_pushvalue(L, -1); |
| lua_setfield(L, -2, "__index"); |
| |
| lua_rawset(L, LUA_REGISTRYINDEX); |
| /* }}} */ |
| |
| /* {{{raw req socket object metatable */ |
| lua_pushlightuserdata(L, &ngx_http_lua_raw_req_socket_metatable_key); |
| lua_createtable(L, 0 /* narr */, 4 /* nrec */); |
| |
| lua_pushcfunction(L, ngx_http_lua_socket_tcp_receive); |
| lua_setfield(L, -2, "receive"); |
| |
| lua_pushcfunction(L, ngx_http_lua_socket_tcp_receiveuntil); |
| lua_setfield(L, -2, "receiveuntil"); |
| |
| lua_pushcfunction(L, ngx_http_lua_socket_tcp_send); |
| lua_setfield(L, -2, "send"); |
| |
| lua_pushcfunction(L, ngx_http_lua_socket_tcp_settimeout); |
| lua_setfield(L, -2, "settimeout"); /* ngx socket mt */ |
| |
| lua_pushvalue(L, -1); |
| lua_setfield(L, -2, "__index"); |
| |
| lua_rawset(L, LUA_REGISTRYINDEX); |
| /* }}} */ |
| |
| /* {{{tcp object metatable */ |
| lua_pushlightuserdata(L, &ngx_http_lua_tcp_socket_metatable_key); |
| lua_createtable(L, 0 /* narr */, 10 /* nrec */); |
| |
| lua_pushcfunction(L, ngx_http_lua_socket_tcp_connect); |
| lua_setfield(L, -2, "connect"); |
| |
| lua_pushcfunction(L, ngx_http_lua_socket_tcp_receive); |
| lua_setfield(L, -2, "receive"); |
| |
| lua_pushcfunction(L, ngx_http_lua_socket_tcp_receiveuntil); |
| lua_setfield(L, -2, "receiveuntil"); |
| |
| lua_pushcfunction(L, ngx_http_lua_socket_tcp_send); |
| lua_setfield(L, -2, "send"); |
| |
| lua_pushcfunction(L, ngx_http_lua_socket_tcp_close); |
| lua_setfield(L, -2, "close"); |
| |
| lua_pushcfunction(L, ngx_http_lua_socket_tcp_setoption); |
| lua_setfield(L, -2, "setoption"); |
| |
| lua_pushcfunction(L, ngx_http_lua_socket_tcp_settimeout); |
| lua_setfield(L, -2, "settimeout"); /* ngx socket mt */ |
| |
| lua_pushcfunction(L, ngx_http_lua_socket_tcp_getreusedtimes); |
| lua_setfield(L, -2, "getreusedtimes"); |
| |
| lua_pushcfunction(L, ngx_http_lua_socket_tcp_setkeepalive); |
| lua_setfield(L, -2, "setkeepalive"); |
| |
| lua_pushvalue(L, -1); |
| lua_setfield(L, -2, "__index"); |
| lua_rawset(L, LUA_REGISTRYINDEX); |
| /* }}} */ |
| } |
| |
| |
| void |
| ngx_http_lua_inject_req_socket_api(lua_State *L) |
| { |
| lua_pushcfunction(L, ngx_http_lua_req_socket); |
| lua_setfield(L, -2, "socket"); |
| } |
| |
| |
| static int |
| ngx_http_lua_socket_tcp(lua_State *L) |
| { |
| ngx_http_request_t *r; |
| ngx_http_lua_ctx_t *ctx; |
| |
| if (lua_gettop(L) != 0) { |
| return luaL_error(L, "expecting zero arguments, but got %d", |
| lua_gettop(L)); |
| } |
| |
| r = ngx_http_lua_get_req(L); |
| if (r == NULL) { |
| return luaL_error(L, "no request found"); |
| } |
| |
| ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); |
| if (ctx == NULL) { |
| return luaL_error(L, "no ctx found"); |
| } |
| |
| ngx_http_lua_check_context(L, ctx, NGX_HTTP_LUA_CONTEXT_REWRITE |
| | NGX_HTTP_LUA_CONTEXT_ACCESS |
| | NGX_HTTP_LUA_CONTEXT_CONTENT |
| | NGX_HTTP_LUA_CONTEXT_TIMER); |
| |
| lua_createtable(L, 3 /* narr */, 1 /* nrec */); |
| lua_pushlightuserdata(L, &ngx_http_lua_tcp_socket_metatable_key); |
| lua_rawget(L, LUA_REGISTRYINDEX); |
| lua_setmetatable(L, -2); |
| |
| dd("top: %d", lua_gettop(L)); |
| |
| return 1; |
| } |
| |
| |
| static int |
| ngx_http_lua_socket_tcp_connect(lua_State *L) |
| { |
| ngx_http_request_t *r; |
| ngx_http_lua_ctx_t *ctx; |
| ngx_str_t host; |
| int port; |
| ngx_resolver_ctx_t *rctx, temp; |
| ngx_http_core_loc_conf_t *clcf; |
| int saved_top; |
| int n; |
| u_char *p; |
| size_t len; |
| ngx_url_t url; |
| ngx_int_t rc; |
| ngx_http_lua_loc_conf_t *llcf; |
| ngx_peer_connection_t *pc; |
| int timeout; |
| unsigned custom_pool; |
| int key_index; |
| const char *msg; |
| ngx_http_lua_co_ctx_t *coctx; |
| |
| ngx_http_lua_socket_tcp_upstream_t *u; |
| |
| n = lua_gettop(L); |
| if (n != 2 && n != 3 && n != 4) { |
| return luaL_error(L, "ngx.socket connect: expecting 2, 3, or 4 " |
| "arguments (including the object), but seen %d", n); |
| } |
| |
| r = ngx_http_lua_get_req(L); |
| if (r == NULL) { |
| return luaL_error(L, "no request found"); |
| } |
| |
| ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); |
| if (ctx == NULL) { |
| return luaL_error(L, "no ctx found"); |
| } |
| |
| ngx_http_lua_check_context(L, ctx, NGX_HTTP_LUA_CONTEXT_REWRITE |
| | NGX_HTTP_LUA_CONTEXT_ACCESS |
| | NGX_HTTP_LUA_CONTEXT_CONTENT |
| | NGX_HTTP_LUA_CONTEXT_TIMER); |
| |
| luaL_checktype(L, 1, LUA_TTABLE); |
| |
| p = (u_char *) luaL_checklstring(L, 2, &len); |
| |
| host.data = ngx_palloc(r->pool, len + 1); |
| if (host.data == NULL) { |
| return luaL_error(L, "out of memory"); |
| } |
| |
| host.len = len; |
| |
| ngx_memcpy(host.data, p, len); |
| host.data[len] = '\0'; |
| |
| key_index = 2; |
| custom_pool = 0; |
| |
| if (lua_type(L, n) == LUA_TTABLE) { |
| |
| /* found the last optional option table */ |
| |
| lua_getfield(L, n, "pool"); |
| |
| switch (lua_type(L, -1)) { |
| case LUA_TNUMBER: |
| lua_tostring(L, -1); |
| |
| case LUA_TSTRING: |
| custom_pool = 1; |
| |
| lua_pushvalue(L, -1); |
| lua_rawseti(L, 1, SOCKET_KEY_INDEX); |
| |
| key_index = n + 1; |
| |
| break; |
| |
| case LUA_TNIL: |
| lua_pop(L, 2); |
| break; |
| |
| default: |
| msg = lua_pushfstring(L, "bad \"pool\" option type: %s", |
| luaL_typename(L, -1)); |
| luaL_argerror(L, n, msg); |
| break; |
| } |
| |
| n--; |
| } |
| |
| if (n == 3) { |
| port = luaL_checkinteger(L, 3); |
| |
| if (port < 0 || port > 65536) { |
| lua_pushnil(L); |
| lua_pushfstring(L, "bad port number: %d", port); |
| return 2; |
| } |
| |
| if (!custom_pool) { |
| lua_pushliteral(L, ":"); |
| lua_insert(L, 3); |
| lua_concat(L, 3); |
| } |
| |
| dd("socket key: %s", lua_tostring(L, -1)); |
| |
| } else { /* n == 2 */ |
| port = 0; |
| } |
| |
| if (!custom_pool) { |
| /* the key's index is 2 */ |
| |
| lua_pushvalue(L, 2); |
| lua_rawseti(L, 1, SOCKET_KEY_INDEX); |
| } |
| |
| lua_rawgeti(L, 1, SOCKET_CTX_INDEX); |
| u = lua_touserdata(L, -1); |
| lua_pop(L, 1); |
| |
| if (u) { |
| if (u->waiting) { |
| lua_pushnil(L); |
| lua_pushliteral(L, "socket busy"); |
| return 2; |
| } |
| |
| if (u->body_downstream || u->raw_downstream) { |
| return luaL_error(L, "attempt to re-connect a request socket"); |
| } |
| |
| if (u->peer.connection) { |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket reconnect without shutting down"); |
| |
| ngx_http_lua_socket_tcp_finalize(r, u); |
| } |
| |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua reuse socket upstream ctx"); |
| |
| } else { |
| u = lua_newuserdata(L, sizeof(ngx_http_lua_socket_tcp_upstream_t)); |
| if (u == NULL) { |
| return luaL_error(L, "out of memory"); |
| } |
| |
| #if 1 |
| lua_createtable(L, 0 /* narr */, 1 /* nrec */); /* metatable */ |
| lua_pushcfunction(L, ngx_http_lua_socket_tcp_upstream_destroy); |
| lua_setfield(L, -2, "__gc"); |
| lua_setmetatable(L, -2); |
| #endif |
| |
| lua_rawseti(L, 1, SOCKET_CTX_INDEX); |
| } |
| |
| ngx_memzero(u, sizeof(ngx_http_lua_socket_tcp_upstream_t)); |
| |
| coctx = ctx->cur_co_ctx; |
| |
| u->request = r; /* set the controlling request */ |
| |
| llcf = ngx_http_get_module_loc_conf(r, ngx_http_lua_module); |
| |
| u->conf = llcf; |
| |
| pc = &u->peer; |
| |
| pc->log = r->connection->log; |
| pc->log_error = NGX_ERROR_ERR; |
| |
| dd("lua peer connection log: %p", pc->log); |
| |
| lua_rawgeti(L, 1, SOCKET_TIMEOUT_INDEX); |
| timeout = (ngx_int_t) lua_tointeger(L, -1); |
| lua_pop(L, 1); |
| |
| if (timeout > 0) { |
| u->send_timeout = (ngx_msec_t) timeout; |
| u->read_timeout = (ngx_msec_t) timeout; |
| u->connect_timeout = (ngx_msec_t) timeout; |
| |
| } else { |
| u->read_timeout = u->conf->read_timeout; |
| u->send_timeout = u->conf->send_timeout; |
| u->connect_timeout = u->conf->connect_timeout; |
| } |
| |
| rc = ngx_http_lua_get_keepalive_peer(r, L, key_index, u); |
| |
| if (rc == NGX_OK) { |
| lua_pushinteger(L, 1); |
| return 1; |
| } |
| |
| if (rc == NGX_ERROR) { |
| lua_pushnil(L); |
| lua_pushliteral(L, "error in get keepalive peer"); |
| return 2; |
| } |
| |
| /* rc == NGX_DECLINED */ |
| |
| ngx_memzero(&url, sizeof(ngx_url_t)); |
| |
| url.url.len = host.len; |
| url.url.data = host.data; |
| url.default_port = (in_port_t) port; |
| url.no_resolve = 1; |
| |
| if (ngx_parse_url(r->pool, &url) != NGX_OK) { |
| lua_pushnil(L); |
| |
| if (url.err) { |
| lua_pushfstring(L, "failed to parse host name \"%s\": %s", |
| host.data, url.err); |
| |
| } else { |
| lua_pushfstring(L, "failed to parse host name \"%s\"", host.data); |
| } |
| |
| return 2; |
| } |
| |
| ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket connect timeout: %M", u->connect_timeout); |
| |
| u->resolved = ngx_pcalloc(r->pool, sizeof(ngx_http_upstream_resolved_t)); |
| if (u->resolved == NULL) { |
| return luaL_error(L, "out of memory"); |
| } |
| |
| if (url.addrs && url.addrs[0].sockaddr) { |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket network address given directly"); |
| |
| u->resolved->sockaddr = url.addrs[0].sockaddr; |
| u->resolved->socklen = url.addrs[0].socklen; |
| u->resolved->naddrs = 1; |
| u->resolved->host = url.addrs[0].name; |
| |
| } else { |
| u->resolved->host = host; |
| u->resolved->port = (in_port_t) port; |
| } |
| |
| if (u->resolved->sockaddr) { |
| rc = ngx_http_lua_socket_resolve_retval_handler(r, u, L); |
| if (rc == NGX_AGAIN) { |
| return lua_yield(L, 0); |
| } |
| |
| return rc; |
| } |
| |
| clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); |
| |
| temp.name = host; |
| rctx = ngx_resolve_start(clcf->resolver, &temp); |
| if (rctx == NULL) { |
| u->ft_type |= NGX_HTTP_LUA_SOCKET_FT_RESOLVER; |
| lua_pushnil(L); |
| lua_pushliteral(L, "failed to start the resolver"); |
| return 2; |
| } |
| |
| if (rctx == NGX_NO_RESOLVER) { |
| u->ft_type |= NGX_HTTP_LUA_SOCKET_FT_RESOLVER; |
| lua_pushnil(L); |
| lua_pushfstring(L, "no resolver defined to resolve \"%s\"", host.data); |
| return 2; |
| } |
| |
| rctx->name = host; |
| #if !defined(nginx_version) || nginx_version < 1005008 |
| rctx->type = NGX_RESOLVE_A; |
| #endif |
| rctx->handler = ngx_http_lua_socket_resolve_handler; |
| rctx->data = u; |
| rctx->timeout = clcf->resolver_timeout; |
| |
| u->resolved->ctx = rctx; |
| u->co_ctx = ctx->cur_co_ctx; |
| |
| coctx->data = u; |
| |
| saved_top = lua_gettop(L); |
| |
| coctx->cleanup = ngx_http_lua_tcp_resolve_cleanup; |
| |
| if (ngx_resolve_name(rctx) != NGX_OK) { |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket fail to run resolver immediately"); |
| |
| u->ft_type |= NGX_HTTP_LUA_SOCKET_FT_RESOLVER; |
| |
| u->resolved->ctx = NULL; |
| lua_pushnil(L); |
| lua_pushfstring(L, "%s could not be resolved", host.data); |
| |
| return 2; |
| } |
| |
| if (u->waiting == 1) { |
| /* resolved and already connecting */ |
| return lua_yield(L, 0); |
| } |
| |
| n = lua_gettop(L) - saved_top; |
| if (n) { |
| /* errors occurred during resolving or connecting |
| * or already connected */ |
| return n; |
| } |
| |
| /* still resolving */ |
| |
| u->waiting = 1; |
| u->prepare_retvals = ngx_http_lua_socket_resolve_retval_handler; |
| |
| dd("setting data to %p", u); |
| |
| if (ctx->entered_content_phase) { |
| r->write_event_handler = ngx_http_lua_content_wev_handler; |
| |
| } else { |
| r->write_event_handler = ngx_http_core_run_phases; |
| } |
| |
| return lua_yield(L, 0); |
| } |
| |
| |
| static void |
| ngx_http_lua_socket_resolve_handler(ngx_resolver_ctx_t *ctx) |
| { |
| ngx_http_request_t *r; |
| ngx_connection_t *c; |
| ngx_http_upstream_resolved_t *ur; |
| ngx_http_lua_ctx_t *lctx; |
| lua_State *L; |
| ngx_http_lua_socket_tcp_upstream_t *u; |
| u_char *p; |
| size_t len; |
| #if defined(nginx_version) && nginx_version >= 1005008 |
| socklen_t socklen; |
| struct sockaddr *sockaddr; |
| #else |
| struct sockaddr_in *sin; |
| #endif |
| ngx_uint_t i; |
| unsigned waiting; |
| |
| u = ctx->data; |
| r = u->request; |
| c = r->connection; |
| ur = u->resolved; |
| |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, |
| "lua tcp socket resolve handler"); |
| |
| lctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); |
| if (lctx == NULL) { |
| return; |
| } |
| |
| lctx->cur_co_ctx = u->co_ctx; |
| |
| u->co_ctx->cleanup = NULL; |
| |
| L = lctx->cur_co_ctx->co; |
| |
| waiting = u->waiting; |
| |
| if (ctx->state) { |
| ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0, |
| "lua tcp socket resolver error: %s (waiting: %d)", |
| ngx_resolver_strerror(ctx->state), (int) u->waiting); |
| |
| lua_pushnil(L); |
| lua_pushlstring(L, (char *) ctx->name.data, ctx->name.len); |
| lua_pushfstring(L, " could not be resolved (%d: %s)", |
| (int) ctx->state, |
| ngx_resolver_strerror(ctx->state)); |
| lua_concat(L, 2); |
| |
| u->prepare_retvals = ngx_http_lua_socket_error_retval_handler; |
| ngx_http_lua_socket_handle_error(r, u, |
| NGX_HTTP_LUA_SOCKET_FT_RESOLVER); |
| |
| if (waiting) { |
| ngx_http_run_posted_requests(c); |
| } |
| |
| return; |
| } |
| |
| ur->naddrs = ctx->naddrs; |
| ur->addrs = ctx->addrs; |
| |
| #if (NGX_DEBUG) |
| { |
| # if defined(nginx_version) && nginx_version >= 1005008 |
| u_char text[NGX_SOCKADDR_STRLEN]; |
| ngx_str_t addr; |
| # else |
| in_addr_t addr; |
| # endif |
| ngx_uint_t i; |
| |
| # if defined(nginx_version) && nginx_version >= 1005008 |
| addr.data = text; |
| |
| for (i = 0; i < ctx->naddrs; i++) { |
| addr.len = ngx_sock_ntop(ur->addrs[i].sockaddr, ur->addrs[i].socklen, |
| text, NGX_SOCKADDR_STRLEN, 0); |
| |
| ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "name was resolved to %V", &addr); |
| } |
| # else |
| for (i = 0; i < ctx->naddrs; i++) { |
| dd("addr i: %d %p", (int) i, &ctx->addrs[i]); |
| |
| addr = ntohl(ctx->addrs[i]); |
| |
| ngx_log_debug4(NGX_LOG_DEBUG_HTTP, c->log, 0, |
| "name was resolved to %ud.%ud.%ud.%ud", |
| (addr >> 24) & 0xff, (addr >> 16) & 0xff, |
| (addr >> 8) & 0xff, addr & 0xff); |
| } |
| # endif |
| } |
| #endif |
| |
| if (ur->naddrs == 0) { |
| ngx_resolve_name_done(ctx); |
| u->ft_type |= NGX_HTTP_LUA_SOCKET_FT_RESOLVER; |
| |
| lua_pushnil(L); |
| lua_pushliteral(L, "name cannot be resolved to a address"); |
| |
| if (waiting) { |
| ngx_http_run_posted_requests(c); |
| } |
| |
| return; |
| } |
| |
| if (ur->naddrs == 1) { |
| i = 0; |
| |
| } else { |
| i = ngx_random() % ur->naddrs; |
| } |
| |
| dd("selected addr index: %d", (int) i); |
| |
| #if defined(nginx_version) && nginx_version >= 1005008 |
| socklen = ur->addrs[i].socklen; |
| |
| sockaddr = ngx_palloc(r->pool, socklen); |
| if (sockaddr == NULL) { |
| goto nomem; |
| } |
| |
| ngx_memcpy(sockaddr, ur->addrs[i].sockaddr, socklen); |
| |
| switch (sockaddr->sa_family) { |
| #if (NGX_HAVE_INET6) |
| case AF_INET6: |
| ((struct sockaddr_in6 *) sockaddr)->sin6_port = htons(ur->port); |
| break; |
| #endif |
| default: /* AF_INET */ |
| ((struct sockaddr_in *) sockaddr)->sin_port = htons(ur->port); |
| } |
| |
| p = ngx_pnalloc(r->pool, NGX_SOCKADDR_STRLEN); |
| if (p == NULL) { |
| goto nomem; |
| } |
| |
| len = ngx_sock_ntop(sockaddr, socklen, p, NGX_SOCKADDR_STRLEN, 1); |
| ur->sockaddr = sockaddr; |
| ur->socklen = socklen; |
| |
| #else |
| /* for nginx older than 1.5.8 */ |
| |
| len = NGX_INET_ADDRSTRLEN + sizeof(":65536") - 1; |
| |
| p = ngx_pnalloc(r->pool, len + sizeof(struct sockaddr_in)); |
| if (p == NULL) { |
| goto nomem; |
| } |
| |
| sin = (struct sockaddr_in *) &p[len]; |
| ngx_memzero(sin, sizeof(struct sockaddr_in)); |
| |
| len = ngx_inet_ntop(AF_INET, &ur->addrs[i], p, NGX_INET_ADDRSTRLEN); |
| len = ngx_sprintf(&p[len], ":%d", ur->port) - p; |
| |
| sin->sin_family = AF_INET; |
| sin->sin_port = htons(ur->port); |
| sin->sin_addr.s_addr = ur->addrs[i]; |
| |
| ur->sockaddr = (struct sockaddr *) sin; |
| ur->socklen = sizeof(struct sockaddr_in); |
| #endif |
| |
| ur->host.data = p; |
| ur->host.len = len; |
| ur->naddrs = 1; |
| |
| ur->ctx = NULL; |
| |
| ngx_resolve_name_done(ctx); |
| |
| u->waiting = 0; |
| |
| if (waiting) { |
| lctx->resume_handler = ngx_http_lua_socket_tcp_resume; |
| r->write_event_handler(r); |
| ngx_http_run_posted_requests(c); |
| |
| } else { |
| (void) ngx_http_lua_socket_resolve_retval_handler(r, u, L); |
| } |
| |
| return; |
| |
| nomem: |
| |
| ngx_resolve_name_done(ctx); |
| u->ft_type |= NGX_HTTP_LUA_SOCKET_FT_NOMEM; |
| |
| lua_pushnil(L); |
| lua_pushliteral(L, "no memory"); |
| |
| if (waiting) { |
| ngx_http_run_posted_requests(c); |
| } |
| } |
| |
| |
| static int |
| ngx_http_lua_socket_resolve_retval_handler(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u, lua_State *L) |
| { |
| ngx_http_lua_ctx_t *ctx; |
| ngx_peer_connection_t *pc; |
| ngx_connection_t *c; |
| ngx_http_cleanup_t *cln; |
| ngx_http_upstream_resolved_t *ur; |
| ngx_int_t rc; |
| ngx_http_lua_co_ctx_t *coctx; |
| |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket resolve retval handler"); |
| |
| if (u->ft_type & NGX_HTTP_LUA_SOCKET_FT_RESOLVER) { |
| return 2; |
| } |
| |
| pc = &u->peer; |
| |
| ur = u->resolved; |
| |
| if (ur->sockaddr) { |
| pc->sockaddr = ur->sockaddr; |
| pc->socklen = ur->socklen; |
| pc->name = &ur->host; |
| |
| } else { |
| lua_pushnil(L); |
| lua_pushliteral(L, "resolver not working"); |
| return 2; |
| } |
| |
| pc->get = ngx_http_lua_socket_tcp_get_peer; |
| |
| rc = ngx_event_connect_peer(pc); |
| |
| if (rc == NGX_ERROR) { |
| u->socket_errno = ngx_socket_errno; |
| } |
| |
| if (u->cleanup == NULL) { |
| cln = ngx_http_cleanup_add(r, 0); |
| if (cln == NULL) { |
| u->ft_type |= NGX_HTTP_LUA_SOCKET_FT_ERROR; |
| lua_pushnil(L); |
| lua_pushliteral(L, "out of memory"); |
| return 2; |
| } |
| |
| cln->handler = ngx_http_lua_socket_tcp_cleanup; |
| cln->data = u; |
| u->cleanup = &cln->handler; |
| } |
| |
| ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket connect: %i", rc); |
| |
| if (rc == NGX_ERROR) { |
| return ngx_http_lua_socket_error_retval_handler(r, u, L); |
| } |
| |
| if (rc == NGX_BUSY) { |
| u->ft_type |= NGX_HTTP_LUA_SOCKET_FT_ERROR; |
| lua_pushnil(L); |
| lua_pushliteral(L, "no live connection"); |
| return 2; |
| } |
| |
| if (rc == NGX_DECLINED) { |
| dd("socket errno: %d", (int) ngx_socket_errno); |
| u->ft_type |= NGX_HTTP_LUA_SOCKET_FT_ERROR; |
| u->socket_errno = ngx_socket_errno; |
| return ngx_http_lua_socket_error_retval_handler(r, u, L); |
| } |
| |
| /* rc == NGX_OK || rc == NGX_AGAIN */ |
| |
| c = pc->connection; |
| |
| c->data = u; |
| |
| c->write->handler = ngx_http_lua_socket_tcp_handler; |
| c->read->handler = ngx_http_lua_socket_tcp_handler; |
| |
| u->write_event_handler = ngx_http_lua_socket_connected_handler; |
| u->read_event_handler = ngx_http_lua_socket_connected_handler; |
| |
| c->sendfile &= r->connection->sendfile; |
| |
| c->pool = r->pool; |
| c->log = r->connection->log; |
| c->read->log = c->log; |
| c->write->log = c->log; |
| |
| /* init or reinit the ngx_output_chain() and ngx_chain_writer() contexts */ |
| |
| #if 0 |
| u->writer.out = NULL; |
| u->writer.last = &u->writer.out; |
| #endif |
| |
| ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); |
| |
| coctx = ctx->cur_co_ctx; |
| |
| dd("setting data to %p", u); |
| |
| coctx->data = u; |
| |
| if (rc == NGX_OK) { |
| ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket connected: fd:%d", (int) c->fd); |
| |
| /* We should delete the current write/read event |
| * here because the socket object may not be used immediately |
| * on the Lua land, thus causing hot spin around level triggered |
| * event poll and wasting CPU cycles. */ |
| |
| if (ngx_handle_write_event(c->write, 0) != NGX_OK) { |
| ngx_http_lua_socket_handle_error(r, u, |
| NGX_HTTP_LUA_SOCKET_FT_ERROR); |
| lua_pushnil(L); |
| lua_pushliteral(L, "failed to handle write event"); |
| return 2; |
| } |
| |
| if (ngx_handle_read_event(c->read, 0) != NGX_OK) { |
| ngx_http_lua_socket_handle_error(r, u, |
| NGX_HTTP_LUA_SOCKET_FT_ERROR); |
| lua_pushnil(L); |
| lua_pushliteral(L, "failed to handle write event"); |
| return 2; |
| } |
| |
| u->read_event_handler = ngx_http_lua_socket_dummy_handler; |
| u->write_event_handler = ngx_http_lua_socket_dummy_handler; |
| |
| lua_pushinteger(L, 1); |
| return 1; |
| } |
| |
| /* rc == NGX_AGAIN */ |
| |
| coctx->cleanup = ngx_http_lua_coctx_cleanup; |
| |
| ngx_add_timer(c->write, u->connect_timeout); |
| |
| if (ctx->entered_content_phase) { |
| r->write_event_handler = ngx_http_lua_content_wev_handler; |
| |
| } else { |
| r->write_event_handler = ngx_http_core_run_phases; |
| } |
| |
| u->co_ctx = ctx->cur_co_ctx; |
| u->waiting = 1; |
| u->prepare_retvals = ngx_http_lua_socket_tcp_connect_retval_handler; |
| |
| dd("setting data to %p", u); |
| |
| if (ctx->entered_content_phase) { |
| r->write_event_handler = ngx_http_lua_content_wev_handler; |
| |
| } else { |
| r->write_event_handler = ngx_http_core_run_phases; |
| } |
| |
| return NGX_AGAIN; |
| } |
| |
| |
| static int |
| ngx_http_lua_socket_error_retval_handler(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u, lua_State *L) |
| { |
| ngx_uint_t ft_type; |
| u_char errstr[NGX_MAX_ERROR_STR]; |
| u_char *p; |
| |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket error retval handler"); |
| |
| if (u->co_ctx) { |
| u->co_ctx->cleanup = NULL; |
| } |
| |
| ft_type = u->ft_type; |
| |
| if (u->no_close) { |
| u->no_close = 0; |
| u->ft_type = 0; |
| |
| } else { |
| ngx_http_lua_socket_tcp_finalize(r, u); |
| } |
| |
| if (ft_type & NGX_HTTP_LUA_SOCKET_FT_RESOLVER) { |
| return 2; |
| } |
| |
| lua_pushnil(L); |
| |
| if (ft_type & NGX_HTTP_LUA_SOCKET_FT_TIMEOUT) { |
| lua_pushliteral(L, "timeout"); |
| |
| } else if (ft_type & NGX_HTTP_LUA_SOCKET_FT_CLOSED) { |
| lua_pushliteral(L, "closed"); |
| |
| } else if (ft_type & NGX_HTTP_LUA_SOCKET_FT_BUFTOOSMALL) { |
| lua_pushliteral(L, "buffer too small"); |
| |
| } else if (ft_type & NGX_HTTP_LUA_SOCKET_FT_NOMEM) { |
| lua_pushliteral(L, "out of memory"); |
| |
| } else if (ft_type & NGX_HTTP_LUA_SOCKET_FT_CLIENTABORT) { |
| lua_pushliteral(L, "client aborted"); |
| |
| } else { |
| |
| if (u->socket_errno) { |
| #if (nginx_version >= 1000000) |
| p = ngx_strerror(u->socket_errno, errstr, sizeof(errstr)); |
| #else |
| p = ngx_strerror_r(u->socket_errno, errstr, sizeof(errstr)); |
| #endif |
| /* for compatibility with LuaSocket */ |
| ngx_strlow(errstr, errstr, p - errstr); |
| lua_pushlstring(L, (char *) errstr, p - errstr); |
| |
| } else { |
| lua_pushliteral(L, "error"); |
| } |
| } |
| |
| return 2; |
| } |
| |
| |
| static int |
| ngx_http_lua_socket_tcp_connect_retval_handler(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u, lua_State *L) |
| { |
| if (u->ft_type) { |
| return ngx_http_lua_socket_error_retval_handler(r, u, L); |
| } |
| |
| lua_pushinteger(L, 1); |
| return 1; |
| } |
| |
| |
| static int |
| ngx_http_lua_socket_tcp_receive(lua_State *L) |
| { |
| ngx_http_request_t *r; |
| ngx_http_lua_socket_tcp_upstream_t *u; |
| ngx_int_t rc; |
| ngx_http_lua_ctx_t *ctx; |
| int n; |
| ngx_str_t pat; |
| lua_Integer bytes; |
| char *p; |
| int typ; |
| ngx_http_lua_loc_conf_t *llcf; |
| ngx_http_lua_co_ctx_t *coctx; |
| |
| n = lua_gettop(L); |
| if (n != 1 && n != 2) { |
| return luaL_error(L, "expecting 1 or 2 arguments " |
| "(including the object), but got %d", n); |
| } |
| |
| r = ngx_http_lua_get_req(L); |
| if (r == NULL) { |
| return luaL_error(L, "no request found"); |
| } |
| |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket calling receive() method"); |
| |
| luaL_checktype(L, 1, LUA_TTABLE); |
| |
| lua_rawgeti(L, 1, SOCKET_CTX_INDEX); |
| u = lua_touserdata(L, -1); |
| |
| if (u == NULL || u->peer.connection == NULL || u->ft_type || u->eof) { |
| |
| llcf = ngx_http_get_module_loc_conf(r, ngx_http_lua_module); |
| |
| if (llcf->log_socket_errors) { |
| ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, |
| "attempt to receive data on a closed socket: u:%p, " |
| "c:%p, ft:%ui eof:%ud", |
| u, u ? u->peer.connection : NULL, u ? u->ft_type : 0, |
| u ? u->eof : 0); |
| } |
| |
| lua_pushnil(L); |
| lua_pushliteral(L, "closed"); |
| return 2; |
| } |
| |
| if (u->waiting) { |
| lua_pushnil(L); |
| lua_pushliteral(L, "socket busy"); |
| return 2; |
| } |
| |
| ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket read timeout: %M", u->read_timeout); |
| |
| if (n > 1) { |
| if (lua_isnumber(L, 2)) { |
| typ = LUA_TNUMBER; |
| |
| } else { |
| typ = lua_type(L, 2); |
| } |
| |
| switch (typ) { |
| case LUA_TSTRING: |
| pat.data = (u_char *) luaL_checklstring(L, 2, &pat.len); |
| if (pat.len != 2 || pat.data[0] != '*') { |
| p = (char *) lua_pushfstring(L, "bad pattern argument: %s", |
| (char *) pat.data); |
| |
| return luaL_argerror(L, 2, p); |
| } |
| |
| switch (pat.data[1]) { |
| case 'l': |
| u->input_filter = ngx_http_lua_socket_read_line; |
| break; |
| |
| case 'a': |
| u->input_filter = ngx_http_lua_socket_read_all; |
| break; |
| |
| default: |
| return luaL_argerror(L, 2, "bad pattern argument"); |
| break; |
| } |
| |
| u->length = 0; |
| u->rest = 0; |
| |
| break; |
| |
| case LUA_TNUMBER: |
| bytes = lua_tointeger(L, 2); |
| if (bytes < 0) { |
| return luaL_argerror(L, 2, "bad pattern argument"); |
| } |
| |
| #if 1 |
| if (bytes == 0) { |
| lua_pushliteral(L, ""); |
| return 1; |
| } |
| #endif |
| |
| u->input_filter = ngx_http_lua_socket_read_chunk; |
| u->length = (size_t) bytes; |
| u->rest = u->length; |
| |
| break; |
| |
| default: |
| return luaL_argerror(L, 2, "bad pattern argument"); |
| break; |
| } |
| |
| } else { |
| u->input_filter = ngx_http_lua_socket_read_line; |
| u->length = 0; |
| u->rest = 0; |
| } |
| |
| u->input_filter_ctx = u; |
| |
| ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); |
| |
| if (u->bufs_in == NULL) { |
| u->bufs_in = |
| ngx_http_lua_chains_get_free_buf(r->connection->log, r->pool, |
| &ctx->free_recv_bufs, |
| u->conf->buffer_size, |
| (ngx_buf_tag_t) |
| &ngx_http_lua_module); |
| |
| if (u->bufs_in == NULL) { |
| return luaL_error(L, "out of memory"); |
| } |
| |
| u->buf_in = u->bufs_in; |
| u->buffer = *u->buf_in->buf; |
| } |
| |
| dd("tcp receive: buf_in: %p, bufs_in: %p", u->buf_in, u->bufs_in); |
| |
| if (u->raw_downstream || u->body_downstream) { |
| r->read_event_handler = ngx_http_lua_req_socket_rev_handler; |
| } |
| |
| u->waiting = 0; |
| |
| rc = ngx_http_lua_socket_tcp_read(r, u); |
| |
| if (rc == NGX_ERROR) { |
| dd("read failed: %d", (int) u->ft_type); |
| rc = ngx_http_lua_socket_tcp_receive_retval_handler(r, u, L); |
| dd("tcp receive retval returned: %d", (int) rc); |
| return rc; |
| } |
| |
| if (rc == NGX_OK) { |
| |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket receive done in a single run"); |
| |
| return ngx_http_lua_socket_tcp_receive_retval_handler(r, u, L); |
| } |
| |
| /* rc == NGX_AGAIN */ |
| |
| u->read_event_handler = ngx_http_lua_socket_read_handler; |
| u->write_event_handler = ngx_http_lua_socket_dummy_handler; |
| |
| ctx->cur_co_ctx->cleanup = ngx_http_lua_coctx_cleanup; |
| |
| if (ctx->entered_content_phase) { |
| r->write_event_handler = ngx_http_lua_content_wev_handler; |
| |
| } else { |
| r->write_event_handler = ngx_http_core_run_phases; |
| } |
| |
| u->co_ctx = ctx->cur_co_ctx; |
| u->waiting = 1; |
| u->prepare_retvals = ngx_http_lua_socket_tcp_receive_retval_handler; |
| |
| coctx = ctx->cur_co_ctx; |
| |
| dd("setting data to %p, coctx:%p", u, coctx); |
| |
| coctx->data = u; |
| |
| if (u->raw_downstream || u->body_downstream) { |
| ctx->downstream_co_ctx = coctx; |
| } |
| |
| return lua_yield(L, 0); |
| } |
| |
| |
| static ngx_int_t |
| ngx_http_lua_socket_read_chunk(void *data, ssize_t bytes) |
| { |
| ngx_http_lua_socket_tcp_upstream_t *u = data; |
| |
| ngx_buf_t *b; |
| #if (NGX_DEBUG) |
| ngx_http_request_t *r; |
| |
| r = u->request; |
| #endif |
| |
| ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket read chunk %z", bytes); |
| |
| if (bytes == 0) { |
| u->ft_type |= NGX_HTTP_LUA_SOCKET_FT_CLOSED; |
| return NGX_ERROR; |
| } |
| |
| b = &u->buffer; |
| |
| if (bytes >= (ssize_t) u->rest) { |
| |
| u->buf_in->buf->last += u->rest; |
| b->pos += u->rest; |
| u->rest = 0; |
| |
| return NGX_OK; |
| } |
| |
| /* bytes < u->rest */ |
| |
| u->buf_in->buf->last += bytes; |
| b->pos += bytes; |
| u->rest -= bytes; |
| |
| return NGX_AGAIN; |
| } |
| |
| |
| static ngx_int_t |
| ngx_http_lua_socket_read_all(void *data, ssize_t bytes) |
| { |
| ngx_http_lua_socket_tcp_upstream_t *u = data; |
| |
| ngx_buf_t *b; |
| #if (NGX_DEBUG) |
| ngx_http_request_t *r; |
| |
| r = u->request; |
| #endif |
| |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket read all"); |
| |
| if (bytes == 0) { |
| return NGX_OK; |
| } |
| |
| b = &u->buffer; |
| |
| u->buf_in->buf->last += bytes; |
| b->pos += bytes; |
| |
| return NGX_AGAIN; |
| } |
| |
| |
| static ngx_int_t |
| ngx_http_lua_socket_read_line(void *data, ssize_t bytes) |
| { |
| ngx_http_lua_socket_tcp_upstream_t *u = data; |
| |
| ngx_buf_t *b; |
| u_char *dst; |
| u_char c; |
| #if (NGX_DEBUG) |
| u_char *begin; |
| #endif |
| |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, u->request->connection->log, 0, |
| "lua tcp socket read line"); |
| |
| if (bytes == 0) { |
| u->ft_type |= NGX_HTTP_LUA_SOCKET_FT_CLOSED; |
| return NGX_ERROR; |
| } |
| |
| b = &u->buffer; |
| |
| #if (NGX_DEBUG) |
| begin = b->pos; |
| #endif |
| |
| dd("already read: %p: %.*s", u->buf_in, |
| (int) (u->buf_in->buf->last - u->buf_in->buf->pos), |
| u->buf_in->buf->pos); |
| |
| dd("data read: %.*s", (int) bytes, b->pos); |
| |
| dst = u->buf_in->buf->last; |
| |
| while (bytes--) { |
| |
| c = *b->pos++; |
| |
| switch (c) { |
| case '\n': |
| ngx_log_debug2(NGX_LOG_DEBUG_HTTP, u->request->connection->log, 0, |
| "lua tcp socket read the final line part: \"%*s\"", |
| b->pos - 1 - begin, begin); |
| |
| u->buf_in->buf->last = dst; |
| |
| dd("read a line: %p: %.*s", u->buf_in, |
| (int) (u->buf_in->buf->last - u->buf_in->buf->pos), |
| u->buf_in->buf->pos); |
| |
| return NGX_OK; |
| |
| case '\r': |
| /* ignore it */ |
| break; |
| |
| default: |
| *dst++ = c; |
| break; |
| } |
| } |
| |
| #if (NGX_DEBUG) |
| ngx_log_debug2(NGX_LOG_DEBUG_HTTP, u->request->connection->log, 0, |
| "lua tcp socket read partial line data: %*s", |
| dst - begin, begin); |
| #endif |
| |
| u->buf_in->buf->last = dst; |
| |
| return NGX_AGAIN; |
| } |
| |
| |
/*
 * Core read loop shared by the receive paths.
 *
 * Repeatedly hands buffered bytes in u->buffer to u->input_filter and, when
 * nothing is buffered, refills the buffer from (in this order, depending on
 * the socket kind) the bytes left in r->header_in or c->recv().
 *
 * Returns:
 *   NGX_OK     the input filter reported completion;
 *              ngx_http_lua_socket_handle_success() has been called.
 *   NGX_ERROR  a failure occurred; ngx_http_lua_socket_handle_error() has
 *              been called with the matching failure type.
 *   NGX_AGAIN  more data is needed; the read event has been (re)registered
 *              and the read timer adjusted before returning.
 */
| static ngx_int_t |
| ngx_http_lua_socket_tcp_read(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u) |
| { |
| ngx_int_t rc; |
| ngx_connection_t *c; |
| ngx_buf_t *b; |
| ngx_event_t *rev; |
| size_t size; |
| ssize_t n; |
| unsigned read; |
| off_t preread = 0; |
| ngx_http_lua_loc_conf_t *llcf; |
| |
| c = u->peer.connection; |
| rev = c->read; |
| |
| ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0, |
| "lua tcp socket read data: waiting: %d", (int) u->waiting); |
| |
| b = &u->buffer; |
/* set to 1 once c->recv() has been called during this invocation */
| read = 0; |
| |
| for ( ;; ) { |
| |
/* bytes sitting in the buffer that the input filter has not seen yet */
| size = b->last - b->pos; |
| |
/* on eof the filter is also invoked with size 0 so it can finish or fail */
| if (size || u->eof) { |
| |
| rc = u->input_filter(u->input_filter_ctx, size); |
| |
| if (rc == NGX_OK) { |
| |
| ngx_log_debug4(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket receive done: wait:%d, eof:%d, " |
| "uri:\"%V?%V\"", (int) u->waiting, (int) u->eof, |
| &r->uri, &r->args); |
| |
/* request body socket with nothing left to read: optionally check whether
 * the client connection is broken; on NGX_OK this skips the
 * ngx_handle_read_event() call below */
| if (u->body_downstream |
| && b->last == b->pos |
| && r->request_body->rest == 0) |
| { |
| |
| llcf = ngx_http_get_module_loc_conf(r, ngx_http_lua_module); |
| |
| if (llcf->check_client_abort) { |
| rc = ngx_http_lua_check_broken_connection(r, rev); |
| |
| if (rc == NGX_OK) { |
| goto success; |
| } |
| |
| if (rc == NGX_HTTP_CLIENT_CLOSED_REQUEST) { |
| ngx_http_lua_socket_handle_error(r, u, |
| NGX_HTTP_LUA_SOCKET_FT_CLIENTABORT); |
| |
| } else { |
| ngx_http_lua_socket_handle_error(r, u, |
| NGX_HTTP_LUA_SOCKET_FT_ERROR); |
| } |
| |
| return NGX_ERROR; |
| } |
| } |
| |
| #if 1 |
| if (ngx_handle_read_event(rev, 0) != NGX_OK) { |
| ngx_http_lua_socket_handle_error(r, u, |
| NGX_HTTP_LUA_SOCKET_FT_ERROR); |
| return NGX_ERROR; |
| } |
| #endif |
| |
| success: |
| ngx_http_lua_socket_handle_success(r, u); |
| return NGX_OK; |
| } |
| |
| if (rc == NGX_ERROR) { |
| dd("input filter error: ft_type:%d waiting:%d", |
| (int) u->ft_type, (int) u->waiting); |
| |
| ngx_http_lua_socket_handle_error(r, u, |
| NGX_HTTP_LUA_SOCKET_FT_ERROR); |
| return NGX_ERROR; |
| } |
| |
| /* rc == NGX_AGAIN */ |
| |
/* the filter wants more, but the request body has no bytes left: treat
 * that as eof so the next iteration calls the filter with size 0 */
| if (u->body_downstream && r->request_body->rest == 0) { |
| u->eof = 1; |
| } |
| |
| continue; |
| } |
| |
/* recv() was already tried in this call and the event is no longer ready:
 * stop looping and wait for the next read event */
| if (read && !rev->ready) { |
| rc = NGX_AGAIN; |
| break; |
| } |
| |
| size = b->end - b->last; |
| |
/* the current buffer is full: append a fresh input buffer and switch to it */
| if (size == 0) { |
| rc = ngx_http_lua_socket_add_input_buffer(r, u); |
| if (rc == NGX_ERROR) { |
| ngx_http_lua_socket_handle_error(r, u, |
| NGX_HTTP_LUA_SOCKET_FT_NOMEM); |
| |
| return NGX_ERROR; |
| } |
| |
| b = &u->buffer; |
| size = (size_t) (b->end - b->last); |
| } |
| |
/* raw request socket: first drain whatever is still in r->header_in */
| if (u->raw_downstream) { |
| preread = r->header_in->last - r->header_in->pos; |
| |
| if (preread) { |
| |
| if ((off_t) size > preread) { |
| size = (size_t) preread; |
| } |
| |
| ngx_http_lua_probe_req_socket_consume_preread(r, |
| r->header_in->pos, |
| size); |
| |
| b->last = ngx_copy(b->last, r->header_in->pos, size); |
| r->header_in->pos += size; |
| continue; |
| } |
| |
| } else if (u->body_downstream) { |
| |
| if (r->request_body->rest == 0) { |
| |
| dd("request body rest is zero"); |
| |
| u->eof = 1; |
| |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua request body exhausted"); |
| |
| continue; |
| } |
| |
| /* try to process the preread body */ |
| |
| preread = r->header_in->last - r->header_in->pos; |
| |
| if (preread) { |
| |
| /* there is the pre-read part of the request body */ |
| |
| ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "http client request body preread %O", preread); |
| |
/* never take more preread bytes than the body still has left */
| if (preread >= r->request_body->rest) { |
| preread = r->request_body->rest; |
| } |
| |
| if ((off_t) size > preread) { |
| size = (size_t) preread; |
| } |
| |
| ngx_http_lua_probe_req_socket_consume_preread(r, |
| r->header_in->pos, |
| size); |
| |
| b->last = ngx_copy(b->last, r->header_in->pos, size); |
| |
| r->header_in->pos += size; |
| r->request_length += size; |
| |
| if (r->request_body->rest) { |
| r->request_body->rest -= size; |
| } |
| |
| continue; |
| } |
| |
/* cap the recv() size at the number of body bytes still expected */
| if (size > (size_t) r->request_body->rest) { |
| size = (size_t) r->request_body->rest; |
| } |
| } |
| |
| #if 1 |
/* the read event is registered but not ready: skip recv() and wait */
| if (rev->active && !rev->ready) { |
| rc = NGX_AGAIN; |
| break; |
| } |
| #endif |
| |
| ngx_log_debug3(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket try to recv data %uz: \"%V?%V\"", |
| size, &r->uri, &r->args); |
| |
| n = c->recv(c, b->last, size); |
| |
| dd("read event ready: %d", (int) c->read->ready); |
| |
| read = 1; |
| |
| ngx_log_debug3(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket recv returned %d: \"%V?%V\"", |
| (int) n, &r->uri, &r->args); |
| |
| if (n == NGX_AGAIN) { |
| rc = NGX_AGAIN; |
| dd("socket recv busy"); |
| break; |
| } |
| |
/* recv() returned 0: handled as the connection having been closed */
| if (n == 0) { |
| |
| if (u->raw_downstream || u->body_downstream) { |
| |
| llcf = ngx_http_get_module_loc_conf(r, ngx_http_lua_module); |
| |
| if (llcf->check_client_abort) { |
| |
| ngx_http_lua_socket_handle_error(r, u, |
| NGX_HTTP_LUA_SOCKET_FT_CLIENTABORT); |
| return NGX_ERROR; |
| } |
| |
| /* llcf->check_client_abort == 0 */ |
| |
| if (u->body_downstream && r->request_body->rest) { |
| ngx_http_lua_socket_handle_error(r, u, |
| NGX_HTTP_LUA_SOCKET_FT_CLIENTABORT); |
| return NGX_ERROR; |
| } |
| } |
| |
| u->eof = 1; |
| |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket closed"); |
| |
| continue; |
| } |
| |
| if (n == NGX_ERROR) { |
| u->socket_errno = ngx_socket_errno; |
| ngx_http_lua_socket_handle_error(r, u, |
| NGX_HTTP_LUA_SOCKET_FT_ERROR); |
| return NGX_ERROR; |
| } |
| |
| b->last += n; |
| |
| if (u->body_downstream) { |
| r->request_length += n; |
| r->request_body->rest -= n; |
| } |
| } |
| |
/* the loop is only left via break, always with rc == NGX_AGAIN */
| #if 1 |
| if (ngx_handle_read_event(rev, 0) != NGX_OK) { |
| ngx_http_lua_socket_handle_error(r, u, |
| NGX_HTTP_LUA_SOCKET_FT_ERROR); |
| return NGX_ERROR; |
| } |
| #endif |
| |
/* arm the read timeout while the event is registered, otherwise clear it */
| if (rev->active) { |
| ngx_add_timer(rev, u->read_timeout); |
| |
| } else if (rev->timer_set) { |
| ngx_del_timer(rev); |
| } |
| |
| return rc; |
| } |
| |
| |
| static int |
| ngx_http_lua_socket_tcp_send(lua_State *L) |
| { |
| ngx_int_t rc; |
| ngx_http_request_t *r; |
| u_char *p; |
| size_t len; |
| ngx_chain_t *cl; |
| ngx_http_lua_ctx_t *ctx; |
| ngx_http_lua_socket_tcp_upstream_t *u; |
| int type; |
| const char *msg; |
| ngx_buf_t *b; |
| ngx_http_lua_loc_conf_t *llcf; |
| ngx_http_lua_co_ctx_t *coctx; |
| |
| /* TODO: add support for the optional "i" and "j" arguments */ |
| |
| if (lua_gettop(L) != 2) { |
| return luaL_error(L, "expecting 2 arguments (including the object), " |
| "but got %d", lua_gettop(L)); |
| } |
| |
| r = ngx_http_lua_get_req(L); |
| if (r == NULL) { |
| return luaL_error(L, "no request found"); |
| } |
| |
| luaL_checktype(L, 1, LUA_TTABLE); |
| |
| lua_rawgeti(L, 1, SOCKET_CTX_INDEX); |
| u = lua_touserdata(L, -1); |
| lua_pop(L, 1); |
| |
| if (u == NULL || u->peer.connection == NULL || u->ft_type || u->eof) { |
| llcf = ngx_http_get_module_loc_conf(r, ngx_http_lua_module); |
| |
| if (llcf->log_socket_errors) { |
| ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, |
| "attempt to send data on a closed socket: u:%p, " |
| "c:%p, ft:%ui eof:%ud", |
| u, u ? u->peer.connection : NULL, u ? u->ft_type : 0, |
| u ? u->eof : 0); |
| } |
| |
| lua_pushnil(L); |
| lua_pushliteral(L, "closed"); |
| return 2; |
| } |
| |
| if (u->waiting) { |
| lua_pushnil(L); |
| lua_pushliteral(L, "socket busy"); |
| return 2; |
| } |
| |
| if (u->raw_downstream && r->connection->buffered) { |
| lua_pushnil(L); |
| lua_pushliteral(L, "socket busy"); |
| return 2; |
| } |
| |
| if (u->body_downstream) { |
| return luaL_error(L, "attempt to write to request sockets"); |
| } |
| |
| ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket send timeout: %M", u->send_timeout); |
| |
| type = lua_type(L, 2); |
| switch (type) { |
| case LUA_TNUMBER: |
| case LUA_TSTRING: |
| lua_tolstring(L, 2, &len); |
| break; |
| |
| case LUA_TTABLE: |
| len = ngx_http_lua_calc_strlen_in_table(L, 2, 2, 1 /* strict */); |
| break; |
| |
| default: |
| msg = lua_pushfstring(L, "string, number, boolean, nil, " |
| "or array table expected, got %s", |
| lua_typename(L, type)); |
| |
| return luaL_argerror(L, 2, msg); |
| } |
| |
| if (len == 0) { |
| lua_pushinteger(L, 0); |
| return 1; |
| } |
| |
| ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); |
| |
| cl = ngx_http_lua_chains_get_free_buf(r->connection->log, r->pool, |
| &ctx->free_bufs, len, |
| (ngx_buf_tag_t) |
| &ngx_http_lua_module); |
| |
| if (cl == NULL) { |
| return luaL_error(L, "out of memory"); |
| } |
| |
| b = cl->buf; |
| |
| switch (type) { |
| case LUA_TNUMBER: |
| case LUA_TSTRING: |
| p = (u_char *) lua_tolstring(L, -1, &len); |
| b->last = ngx_copy(b->last, (u_char *) p, len); |
| break; |
| |
| case LUA_TTABLE: |
| b->last = ngx_http_lua_copy_str_in_table(L, -1, b->last); |
| break; |
| |
| default: |
| return luaL_error(L, "impossible to reach here"); |
| } |
| |
| u->request_bufs = cl; |
| |
| u->request_len = len; |
| u->ft_type = 0; |
| |
| /* mimic ngx_http_upstream_init_request here */ |
| |
| #if 1 |
| u->waiting = 0; |
| #endif |
| |
| ngx_http_lua_probe_socket_tcp_send_start(r, u, b->pos, len); |
| |
| rc = ngx_http_lua_socket_send(r, u); |
| |
| dd("socket send returned %d", (int) rc); |
| |
| if (rc == NGX_ERROR) { |
| return ngx_http_lua_socket_error_retval_handler(r, u, L); |
| } |
| |
| if (rc == NGX_OK) { |
| lua_pushinteger(L, len); |
| return 1; |
| } |
| |
| /* rc == NGX_AGAIN */ |
| |
| ctx->cur_co_ctx->cleanup = ngx_http_lua_coctx_cleanup; |
| if (u->raw_downstream) { |
| ctx->writing_raw_req_socket = 1; |
| } |
| |
| if (ctx->entered_content_phase) { |
| r->write_event_handler = ngx_http_lua_content_wev_handler; |
| |
| } else { |
| r->write_event_handler = ngx_http_core_run_phases; |
| } |
| |
| u->co_ctx = ctx->cur_co_ctx; |
| u->waiting = 1; |
| u->prepare_retvals = ngx_http_lua_socket_tcp_send_retval_handler; |
| |
| dd("setting data to %p", u); |
| |
| coctx = ctx->cur_co_ctx; |
| coctx->data = u; |
| |
| return lua_yield(L, 0); |
| } |
| |
| |
| static int |
| ngx_http_lua_socket_tcp_send_retval_handler(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u, lua_State *L) |
| { |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket send return value handler"); |
| |
| if (u->ft_type) { |
| return ngx_http_lua_socket_error_retval_handler(r, u, L); |
| } |
| |
| lua_pushinteger(L, u->request_len); |
| return 1; |
| } |
| |
| |
| static int |
| ngx_http_lua_socket_tcp_receive_retval_handler(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u, lua_State *L) |
| { |
| int n; |
| ngx_int_t rc; |
| ngx_http_lua_ctx_t *ctx; |
| ngx_event_t *ev; |
| |
| ngx_http_lua_loc_conf_t *llcf; |
| |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket receive return value handler"); |
| |
| ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); |
| |
| #if 1 |
| if (u->raw_downstream || u->body_downstream) { |
| llcf = ngx_http_get_module_loc_conf(r, ngx_http_lua_module); |
| |
| if (llcf->check_client_abort) { |
| |
| r->read_event_handler = ngx_http_lua_rd_check_broken_connection; |
| |
| ev = r->connection->read; |
| |
| dd("rev active: %d", ev->active); |
| |
| if ((ngx_event_flags & NGX_USE_LEVEL_EVENT) && !ev->active) { |
| if (ngx_add_event(ev, NGX_READ_EVENT, 0) != NGX_OK) { |
| lua_pushnil(L); |
| lua_pushliteral(L, "failed to add event"); |
| return 2; |
| } |
| } |
| |
| } else { |
| /* llcf->check_client_abort == 0 */ |
| r->read_event_handler = ngx_http_block_reading; |
| } |
| } |
| #endif |
| |
| if (u->ft_type) { |
| |
| if (u->ft_type & NGX_HTTP_LUA_SOCKET_FT_TIMEOUT) { |
| u->no_close = 1; |
| } |
| |
| dd("u->bufs_in: %p", u->bufs_in); |
| |
| if (u->bufs_in) { |
| rc = ngx_http_lua_socket_push_input_data(r, ctx, u, L); |
| if (rc == NGX_ERROR) { |
| lua_pushnil(L); |
| lua_pushliteral(L, "out of memory"); |
| return 2; |
| } |
| |
| (void) ngx_http_lua_socket_error_retval_handler(r, u, L); |
| |
| lua_pushvalue(L, -3); |
| lua_remove(L, -4); |
| return 3; |
| } |
| |
| n = ngx_http_lua_socket_error_retval_handler(r, u, L); |
| lua_pushliteral(L, ""); |
| return n + 1; |
| } |
| |
| rc = ngx_http_lua_socket_push_input_data(r, ctx, u, L); |
| if (rc == NGX_ERROR) { |
| lua_pushnil(L); |
| lua_pushliteral(L, "out of memory"); |
| return 2; |
| } |
| |
| return 1; |
| } |
| |
| |
| static int |
| ngx_http_lua_socket_tcp_close(lua_State *L) |
| { |
| ngx_http_request_t *r; |
| ngx_http_lua_socket_tcp_upstream_t *u; |
| |
| if (lua_gettop(L) != 1) { |
| return luaL_error(L, "expecting 1 argument " |
| "(including the object) but seen %d", lua_gettop(L)); |
| } |
| |
| r = ngx_http_lua_get_req(L); |
| if (r == NULL) { |
| return luaL_error(L, "no request found"); |
| } |
| |
| luaL_checktype(L, 1, LUA_TTABLE); |
| |
| lua_rawgeti(L, 1, SOCKET_CTX_INDEX); |
| u = lua_touserdata(L, -1); |
| lua_pop(L, 1); |
| |
| if (u == NULL || u->peer.connection == NULL || u->ft_type || u->eof) { |
| lua_pushnil(L); |
| lua_pushliteral(L, "closed"); |
| return 2; |
| } |
| |
| if (u->waiting) { |
| lua_pushnil(L); |
| lua_pushliteral(L, "socket busy"); |
| return 2; |
| } |
| |
| if (u->raw_downstream || u->body_downstream) { |
| lua_pushnil(L); |
| lua_pushliteral(L, "attempt to close a request socket"); |
| return 2; |
| } |
| |
| ngx_http_lua_socket_tcp_finalize(r, u); |
| |
| lua_pushinteger(L, 1); |
| return 1; |
| } |
| |
| |
| static int |
| ngx_http_lua_socket_tcp_setoption(lua_State *L) |
| { |
| /* TODO */ |
| return 0; |
| } |
| |
| |
| static int |
| ngx_http_lua_socket_tcp_settimeout(lua_State *L) |
| { |
| int n; |
| ngx_int_t timeout; |
| |
| ngx_http_lua_socket_tcp_upstream_t *u; |
| |
| n = lua_gettop(L); |
| |
| if (n != 2) { |
| return luaL_error(L, "ngx.socket settimout: expecting at least 2 " |
| "arguments (including the object) but seen %d", |
| lua_gettop(L)); |
| } |
| |
| timeout = (ngx_int_t) lua_tonumber(L, 2); |
| |
| lua_rawseti(L, 1, SOCKET_TIMEOUT_INDEX); |
| |
| lua_rawgeti(L, 1, SOCKET_CTX_INDEX); |
| u = lua_touserdata(L, -1); |
| |
| if (u) { |
| if (timeout > 0) { |
| u->read_timeout = (ngx_msec_t) timeout; |
| u->send_timeout = (ngx_msec_t) timeout; |
| u->connect_timeout = (ngx_msec_t) timeout; |
| |
| } else { |
| u->read_timeout = u->conf->read_timeout; |
| u->send_timeout = u->conf->send_timeout; |
| u->connect_timeout = u->conf->connect_timeout; |
| } |
| } |
| |
| return 0; |
| } |
| |
| |
| static void |
| ngx_http_lua_socket_tcp_handler(ngx_event_t *ev) |
| { |
| ngx_connection_t *c; |
| ngx_http_request_t *r; |
| ngx_http_log_ctx_t *ctx; |
| |
| ngx_http_lua_socket_tcp_upstream_t *u; |
| |
| c = ev->data; |
| u = c->data; |
| r = u->request; |
| c = r->connection; |
| |
| ctx = c->log->data; |
| ctx->current_request = r; |
| |
| ngx_log_debug3(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket handler for \"%V?%V\", wev %d", &r->uri, |
| &r->args, (int) ev->write); |
| |
| if (ev->write) { |
| u->write_event_handler(r, u); |
| |
| } else { |
| u->read_event_handler(r, u); |
| } |
| |
| ngx_http_run_posted_requests(c); |
| } |
| |
| |
| static ngx_int_t |
| ngx_http_lua_socket_tcp_get_peer(ngx_peer_connection_t *pc, void *data) |
| { |
| /* empty */ |
| return NGX_OK; |
| } |
| |
| |
| static void |
| ngx_http_lua_socket_read_handler(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u) |
| { |
| ngx_connection_t *c; |
| ngx_http_lua_loc_conf_t *llcf; |
| |
| c = u->peer.connection; |
| |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket read handler"); |
| |
| if (c->read->timedout) { |
| c->read->timedout = 0; |
| |
| llcf = ngx_http_get_module_loc_conf(r, ngx_http_lua_module); |
| |
| if (llcf->log_socket_errors) { |
| ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, |
| "lua tcp socket read timed out"); |
| } |
| |
| ngx_http_lua_socket_handle_error(r, u, NGX_HTTP_LUA_SOCKET_FT_TIMEOUT); |
| return; |
| } |
| |
| #if 1 |
| if (c->read->timer_set) { |
| ngx_del_timer(c->read); |
| } |
| #endif |
| |
| if (u->buffer.start != NULL) { |
| (void) ngx_http_lua_socket_tcp_read(r, u); |
| } |
| } |
| |
| |
| static void |
| ngx_http_lua_socket_send_handler(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u) |
| { |
| ngx_connection_t *c; |
| ngx_http_lua_loc_conf_t *llcf; |
| |
| c = u->peer.connection; |
| |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket send handler"); |
| |
| if (c->write->timedout) { |
| llcf = ngx_http_get_module_loc_conf(r, ngx_http_lua_module); |
| |
| if (llcf->log_socket_errors) { |
| ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, |
| "lua tcp socket write timed out"); |
| } |
| |
| ngx_http_lua_socket_handle_error(r, u, NGX_HTTP_LUA_SOCKET_FT_TIMEOUT); |
| return; |
| } |
| |
| if (u->request_bufs) { |
| (void) ngx_http_lua_socket_send(r, u); |
| } |
| } |
| |
| |
| static ngx_int_t |
| ngx_http_lua_socket_send(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u) |
| { |
| ngx_int_t n; |
| ngx_connection_t *c; |
| ngx_http_lua_ctx_t *ctx; |
| ngx_buf_t *b; |
| |
| c = u->peer.connection; |
| |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket send data"); |
| |
| dd("lua connection log: %p", c->log); |
| |
| ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); |
| if (ctx == NULL) { |
| ngx_http_lua_socket_handle_error(r, u, |
| NGX_HTTP_LUA_SOCKET_FT_ERROR); |
| return NGX_ERROR; |
| } |
| |
| b = u->request_bufs->buf; |
| |
| for (;;) { |
| n = c->send(c, b->pos, b->last - b->pos); |
| |
| if (n >= 0) { |
| b->pos += n; |
| |
| if (b->pos == b->last) { |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, |
| "lua tcp socket sent all the data"); |
| |
| if (c->write->timer_set) { |
| ngx_del_timer(c->write); |
| } |
| |
| |
| #if defined(nginx_version) && nginx_version >= 1001004 |
| ngx_chain_update_chains(r->pool, |
| #else |
| ngx_chain_update_chains( |
| #endif |
| &ctx->free_bufs, &ctx->busy_bufs, |
| &u->request_bufs, |
| (ngx_buf_tag_t) &ngx_http_lua_module); |
| |
| u->write_event_handler = ngx_http_lua_socket_dummy_handler; |
| |
| if (ngx_handle_write_event(c->write, 0) != NGX_OK) { |
| ngx_http_lua_socket_handle_error(r, u, |
| NGX_HTTP_LUA_SOCKET_FT_ERROR); |
| return NGX_ERROR; |
| } |
| |
| ngx_http_lua_socket_handle_success(r, u); |
| return NGX_OK; |
| } |
| |
| /* keep sending more data */ |
| continue; |
| } |
| |
| /* NGX_ERROR || NGX_AGAIN */ |
| break; |
| } |
| |
| if (n == NGX_ERROR) { |
| u->socket_errno = ngx_socket_errno; |
| ngx_http_lua_socket_handle_error(r, u, NGX_HTTP_LUA_SOCKET_FT_ERROR); |
| return NGX_ERROR; |
| } |
| |
| /* n == NGX_AGAIN */ |
| |
| if (u->raw_downstream) { |
| ctx->writing_raw_req_socket = 1; |
| } |
| |
| u->write_event_handler = ngx_http_lua_socket_send_handler; |
| u->read_event_handler = ngx_http_lua_socket_dummy_handler; |
| |
| ngx_add_timer(c->write, u->send_timeout); |
| |
| if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) { |
| ngx_http_lua_socket_handle_error(r, u, |
| NGX_HTTP_LUA_SOCKET_FT_ERROR); |
| return NGX_ERROR; |
| } |
| |
| return NGX_AGAIN; |
| } |
| |
| |
| static void |
| ngx_http_lua_socket_handle_success(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u) |
| { |
| ngx_http_lua_ctx_t *ctx; |
| |
| #if 1 |
| u->read_event_handler = ngx_http_lua_socket_dummy_handler; |
| u->write_event_handler = ngx_http_lua_socket_dummy_handler; |
| #endif |
| |
| if (u->co_ctx) { |
| u->co_ctx->cleanup = NULL; |
| } |
| |
| #if 0 |
| if (u->eof) { |
| ngx_http_lua_socket_tcp_finalize(r, u); |
| } |
| #endif |
| |
| if (u->waiting) { |
| u->waiting = 0; |
| |
| ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); |
| if (ctx == NULL) { |
| return; |
| } |
| |
| ctx->resume_handler = ngx_http_lua_socket_tcp_resume; |
| ctx->cur_co_ctx = u->co_ctx; |
| |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket waking up the current request"); |
| |
| r->write_event_handler(r); |
| } |
| } |
| |
| |
| static void |
| ngx_http_lua_socket_handle_error(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u, ngx_uint_t ft_type) |
| { |
| ngx_http_lua_ctx_t *ctx; |
| |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket handle error"); |
| |
| u->ft_type |= ft_type; |
| |
| #if 0 |
| ngx_http_lua_socket_tcp_finalize(r, u); |
| #endif |
| |
| if (u->co_ctx) { |
| u->co_ctx->cleanup = NULL; |
| } |
| |
| u->read_event_handler = ngx_http_lua_socket_dummy_handler; |
| u->write_event_handler = ngx_http_lua_socket_dummy_handler; |
| |
| if (u->waiting) { |
| u->waiting = 0; |
| |
| ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); |
| if (ctx == NULL) { |
| return; |
| } |
| |
| ctx->resume_handler = ngx_http_lua_socket_tcp_resume; |
| ctx->cur_co_ctx = u->co_ctx; |
| |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket waking up the current request"); |
| |
| r->write_event_handler(r); |
| } |
| } |
| |
| |
| static void |
| ngx_http_lua_socket_connected_handler(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u) |
| { |
| ngx_int_t rc; |
| ngx_connection_t *c; |
| ngx_http_lua_loc_conf_t *llcf; |
| |
| c = u->peer.connection; |
| |
| if (c->write->timedout) { |
| |
| llcf = ngx_http_get_module_loc_conf(r, ngx_http_lua_module); |
| |
| if (llcf->log_socket_errors) { |
| ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, |
| "lua tcp socket connect timed out"); |
| } |
| |
| ngx_http_lua_socket_handle_error(r, u, NGX_HTTP_LUA_SOCKET_FT_TIMEOUT); |
| return; |
| } |
| |
| if (c->write->timer_set) { |
| ngx_del_timer(c->write); |
| } |
| |
| rc = ngx_http_lua_socket_test_connect(r, c); |
| if (rc != NGX_OK) { |
| if (rc > 0) { |
| u->socket_errno = (ngx_err_t) rc; |
| } |
| |
| ngx_http_lua_socket_handle_error(r, u, NGX_HTTP_LUA_SOCKET_FT_ERROR); |
| return; |
| } |
| |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket connected"); |
| |
| /* We should delete the current write/read event |
| * here because the socket object may not be used immediately |
| * on the Lua land, thus causing hot spin around level triggered |
| * event poll and wasting CPU cycles. */ |
| |
| if (ngx_handle_write_event(c->write, 0) != NGX_OK) { |
| ngx_http_lua_socket_handle_error(r, u, NGX_HTTP_LUA_SOCKET_FT_ERROR); |
| return; |
| } |
| |
| if (ngx_handle_read_event(c->read, 0) != NGX_OK) { |
| ngx_http_lua_socket_handle_error(r, u, NGX_HTTP_LUA_SOCKET_FT_ERROR); |
| return; |
| } |
| |
| ngx_http_lua_socket_handle_success(r, u); |
| } |
| |
| |
| static void |
| ngx_http_lua_socket_tcp_cleanup(void *data) |
| { |
| ngx_http_lua_socket_tcp_upstream_t *u = data; |
| |
| ngx_http_request_t *r; |
| |
| r = u->request; |
| |
| ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "cleanup lua tcp socket request: \"%V\"", &r->uri); |
| |
| ngx_http_lua_socket_tcp_finalize(r, u); |
| } |
| |
| |
| static void |
| ngx_http_lua_socket_tcp_finalize(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u) |
| { |
| ngx_http_lua_socket_pool_t *spool; |
| ngx_chain_t *cl; |
| ngx_chain_t **ll; |
| ngx_http_lua_ctx_t *ctx; |
| |
| dd("request: %p, u: %p, u->cleanup: %p", r, u, u->cleanup); |
| |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua finalize socket"); |
| |
| if (u->cleanup) { |
| *u->cleanup = NULL; |
| u->cleanup = NULL; |
| } |
| |
| ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); |
| |
| if (ctx && u->bufs_in) { |
| |
| ll = &u->bufs_in; |
| for (cl = u->bufs_in; cl; cl = cl->next) { |
| dd("bufs_in chain: %p, next %p", cl, cl->next); |
| cl->buf->pos = cl->buf->last; |
| ll = &cl->next; |
| } |
| |
| dd("ctx: %p", ctx); |
| dd("free recv bufs: %p", ctx->free_recv_bufs); |
| *ll = ctx->free_recv_bufs; |
| ctx->free_recv_bufs = u->bufs_in; |
| u->bufs_in = NULL; |
| u->buf_in = NULL; |
| ngx_memzero(&u->buffer, sizeof(ngx_buf_t)); |
| } |
| |
| if (u->raw_downstream || u->body_downstream) { |
| if (ctx && ctx->writing_raw_req_socket) { |
| ctx->writing_raw_req_socket = 0; |
| if (r->connection->write->timer_set) { |
| ngx_del_timer(r->connection->write); |
| } |
| |
| r->connection->write->error = 1; |
| } |
| |
| if (r->connection->read->timer_set) { |
| ngx_del_timer(r->connection->read); |
| } |
| |
| u->peer.connection = NULL; |
| return; |
| } |
| |
| if (u->resolved && u->resolved->ctx) { |
| ngx_resolve_name_done(u->resolved->ctx); |
| u->resolved->ctx = NULL; |
| } |
| |
| if (u->peer.free) { |
| u->peer.free(&u->peer, u->peer.data, 0); |
| } |
| |
| if (u->peer.connection) { |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua close socket connection"); |
| |
| ngx_close_connection(u->peer.connection); |
| u->peer.connection = NULL; |
| |
| if (!u->reused) { |
| return; |
| } |
| |
| spool = u->socket_pool; |
| if (spool == NULL) { |
| return; |
| } |
| |
| spool->active_connections--; |
| |
| if (spool->active_connections == 0) { |
| ngx_http_lua_socket_free_pool(r->connection->log, spool); |
| } |
| } |
| } |
| |
| |
| static ngx_int_t |
| ngx_http_lua_socket_test_connect(ngx_http_request_t *r, ngx_connection_t *c) |
| { |
| int err; |
| socklen_t len; |
| |
| ngx_http_lua_loc_conf_t *llcf; |
| |
| #if (NGX_HAVE_KQUEUE) |
| |
| ngx_event_t *ev; |
| |
| if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) { |
| dd("pending eof: (%p)%d (%p)%d", c->write, c->write->pending_eof, |
| c->read, c->read->pending_eof); |
| |
| if (c->write->pending_eof) { |
| ev = c->write; |
| |
| } else if (c->read->pending_eof) { |
| ev = c->read; |
| |
| } else { |
| ev = NULL; |
| } |
| |
| if (ev) { |
| llcf = ngx_http_get_module_loc_conf(r, ngx_http_lua_module); |
| if (llcf->log_socket_errors) { |
| (void) ngx_connection_error(c, ev->kq_errno, |
| "kevent() reported that " |
| "connect() failed"); |
| } |
| return ev->kq_errno; |
| } |
| |
| } else |
| #endif |
| { |
| err = 0; |
| len = sizeof(int); |
| |
| /* |
| * BSDs and Linux return 0 and set a pending error in err |
| * Solaris returns -1 and sets errno |
| */ |
| |
| if (getsockopt(c->fd, SOL_SOCKET, SO_ERROR, (void *) &err, &len) |
| == -1) |
| { |
| err = ngx_errno; |
| } |
| |
| if (err) { |
| llcf = ngx_http_get_module_loc_conf(r, ngx_http_lua_module); |
| if (llcf->log_socket_errors) { |
| (void) ngx_connection_error(c, err, "connect() failed"); |
| } |
| return err; |
| } |
| } |
| |
| return NGX_OK; |
| } |
| |
| |
| static void |
| ngx_http_lua_socket_dummy_handler(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u) |
| { |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket dummy handler"); |
| } |
| |
| |
| static int |
| ngx_http_lua_socket_tcp_receiveuntil(lua_State *L) |
| { |
| ngx_http_request_t *r; |
| int n; |
| ngx_str_t pat; |
| ngx_int_t rc; |
| size_t size; |
| unsigned inclusive = 0; |
| |
| ngx_http_lua_socket_compiled_pattern_t *cp; |
| |
| n = lua_gettop(L); |
| if (n != 2 && n != 3) { |
| return luaL_error(L, "expecting 2 or 3 arguments " |
| "(including the object), but got %d", n); |
| } |
| |
| if (n == 3) { |
| /* check out the options table */ |
| |
| luaL_checktype(L, 3, LUA_TTABLE); |
| |
| lua_getfield(L, 3, "inclusive"); |
| |
| switch (lua_type(L, -1)) { |
| case LUA_TNIL: |
| /* do nothing */ |
| break; |
| |
| case LUA_TBOOLEAN: |
| if (lua_toboolean(L, -1)) { |
| inclusive = 1; |
| } |
| break; |
| |
| default: |
| return luaL_error(L, "bad \"inclusive\" option value type: %s", |
| luaL_typename(L, -1)); |
| |
| } |
| |
| lua_pop(L, 2); |
| } |
| |
| r = ngx_http_lua_get_req(L); |
| if (r == NULL) { |
| return luaL_error(L, "no request found"); |
| } |
| |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket calling receiveuntil() method"); |
| |
| luaL_checktype(L, 1, LUA_TTABLE); |
| |
| pat.data = (u_char *) luaL_checklstring(L, 2, &pat.len); |
| if (pat.len == 0) { |
| lua_pushnil(L); |
| lua_pushliteral(L, "pattern is empty"); |
| return 2; |
| } |
| |
| size = sizeof(ngx_http_lua_socket_compiled_pattern_t); |
| |
| cp = lua_newuserdata(L, size); |
| if (cp == NULL) { |
| return luaL_error(L, "out of memory"); |
| } |
| |
| lua_createtable(L, 0 /* narr */, 1 /* nrec */); /* metatable */ |
| lua_pushcfunction(L, ngx_http_lua_socket_cleanup_compiled_pattern); |
| lua_setfield(L, -2, "__gc"); |
| lua_setmetatable(L, -2); |
| |
| ngx_memzero(cp, size); |
| |
| cp->inclusive = inclusive; |
| |
| rc = ngx_http_lua_socket_compile_pattern(pat.data, pat.len, cp, |
| r->connection->log); |
| |
| if (rc != NGX_OK) { |
| lua_pushnil(L); |
| lua_pushliteral(L, "failed to compile pattern"); |
| return 2; |
| } |
| |
| lua_pushcclosure(L, ngx_http_lua_socket_receiveuntil_iterator, 3); |
| return 1; |
| } |
| |
| |
| static int |
| ngx_http_lua_socket_receiveuntil_iterator(lua_State *L) |
| { |
| ngx_http_request_t *r; |
| ngx_http_lua_socket_tcp_upstream_t *u; |
| ngx_int_t rc; |
| ngx_http_lua_ctx_t *ctx; |
| lua_Integer bytes; |
| int n; |
| ngx_http_lua_co_ctx_t *coctx; |
| |
| ngx_http_lua_socket_compiled_pattern_t *cp; |
| |
| n = lua_gettop(L); |
| if (n > 1) { |
| return luaL_error(L, "expecting 0 or 1 arguments, " |
| "but seen %d", n); |
| } |
| |
| if (n >= 1) { |
| bytes = luaL_checkinteger(L, 1); |
| if (bytes < 0) { |
| bytes = 0; |
| } |
| |
| } else { |
| bytes = 0; |
| } |
| |
| lua_rawgeti(L, lua_upvalueindex(1), SOCKET_CTX_INDEX); |
| u = lua_touserdata(L, -1); |
| lua_pop(L, 1); |
| |
| if (u == NULL || u->peer.connection == NULL || u->ft_type || u->eof) { |
| lua_pushnil(L); |
| lua_pushliteral(L, "closed"); |
| return 2; |
| } |
| |
| if (u->waiting) { |
| lua_pushnil(L); |
| lua_pushliteral(L, "socket busy"); |
| return 2; |
| } |
| |
| r = u->request; |
| |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket receiveuntil iterator"); |
| |
| ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket read timeout: %M", u->read_timeout); |
| |
| u->input_filter = ngx_http_lua_socket_read_until; |
| |
| cp = lua_touserdata(L, lua_upvalueindex(3)); |
| |
| dd("checking existing state: %d", cp->state); |
| |
| if (cp->state == -1) { |
| cp->state = 0; |
| |
| lua_pushnil(L); |
| lua_pushnil(L); |
| lua_pushnil(L); |
| return 3; |
| } |
| |
| cp->upstream = u; |
| |
| cp->pattern.data = |
| (u_char *) lua_tolstring(L, lua_upvalueindex(2), |
| &cp->pattern.len); |
| |
| u->input_filter_ctx = cp; |
| |
| ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); |
| |
| if (u->bufs_in == NULL) { |
| u->bufs_in = |
| ngx_http_lua_chains_get_free_buf(r->connection->log, r->pool, |
| &ctx->free_recv_bufs, |
| u->conf->buffer_size, |
| (ngx_buf_tag_t) |
| &ngx_http_lua_module); |
| |
| if (u->bufs_in == NULL) { |
| return luaL_error(L, "out of memory"); |
| } |
| |
| u->buf_in = u->bufs_in; |
| u->buffer = *u->buf_in->buf; |
| } |
| |
| u->length = (size_t) bytes; |
| u->rest = u->length; |
| |
| if (u->raw_downstream || u->body_downstream) { |
| r->read_event_handler = ngx_http_lua_req_socket_rev_handler; |
| } |
| |
| u->waiting = 0; |
| |
| rc = ngx_http_lua_socket_tcp_read(r, u); |
| |
| if (rc == NGX_ERROR) { |
| dd("read failed: %d", (int) u->ft_type); |
| rc = ngx_http_lua_socket_tcp_receive_retval_handler(r, u, L); |
| dd("tcp receive retval returned: %d", (int) rc); |
| return rc; |
| } |
| |
| if (rc == NGX_OK) { |
| |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket receive done in a single run"); |
| |
| return ngx_http_lua_socket_tcp_receive_retval_handler(r, u, L); |
| } |
| |
| /* rc == NGX_AGAIN */ |
| |
| u->read_event_handler = ngx_http_lua_socket_read_handler; |
| u->write_event_handler = ngx_http_lua_socket_dummy_handler; |
| |
| ctx->cur_co_ctx->cleanup = ngx_http_lua_coctx_cleanup; |
| |
| if (ctx->entered_content_phase) { |
| r->write_event_handler = ngx_http_lua_content_wev_handler; |
| |
| } else { |
| r->write_event_handler = ngx_http_core_run_phases; |
| } |
| |
| u->co_ctx = ctx->cur_co_ctx; |
| u->waiting = 1; |
| u->prepare_retvals = ngx_http_lua_socket_tcp_receive_retval_handler; |
| |
| coctx = ctx->cur_co_ctx; |
| |
| dd("setting data to %p", u); |
| |
| coctx->data = u; |
| |
| if (u->raw_downstream || u->body_downstream) { |
| ctx->downstream_co_ctx = coctx; |
| } |
| |
| return lua_yield(L, 0); |
| } |
| |
| |
| static ngx_int_t |
| ngx_http_lua_socket_compile_pattern(u_char *data, size_t len, |
| ngx_http_lua_socket_compiled_pattern_t *cp, ngx_log_t *log) |
| { |
| size_t i; |
| size_t prefix_len; |
| size_t size; |
| unsigned found; |
| int cur_state, new_state; |
| |
| ngx_http_lua_dfa_edge_t *edge; |
| ngx_http_lua_dfa_edge_t **last = NULL; |
| |
| cp->pattern.len = len; |
| |
| if (len <= 2) { |
| return NGX_OK; |
| } |
| |
| for (i = 1; i < len; i++) { |
| prefix_len = 1; |
| |
| while (prefix_len <= len - i - 1) { |
| |
| if (ngx_memcmp(data, &data[i], prefix_len) == 0) { |
| if (data[prefix_len] == data[i + prefix_len]) { |
| prefix_len++; |
| continue; |
| } |
| |
| cur_state = i + prefix_len; |
| new_state = prefix_len + 1; |
| |
| if (cp->recovering == NULL) { |
| size = sizeof(void *) * (len - 2); |
| cp->recovering = ngx_alloc(size, log); |
| if (cp->recovering == NULL) { |
| return NGX_ERROR; |
| } |
| |
| ngx_memzero(cp->recovering, size); |
| } |
| |
| edge = cp->recovering[cur_state - 2]; |
| |
| found = 0; |
| |
| if (edge == NULL) { |
| last = &cp->recovering[cur_state - 2]; |
| |
| } else { |
| |
| for (; edge; edge = edge->next) { |
| last = &edge->next; |
| |
| if (edge->chr == data[prefix_len]) { |
| found = 1; |
| |
| if (edge->new_state < new_state) { |
| edge->new_state = new_state; |
| } |
| |
| break; |
| } |
| } |
| } |
| |
| if (!found) { |
| ngx_log_debug7(NGX_LOG_DEBUG_HTTP, log, 0, |
| "lua tcp socket read until recovering point:" |
| " on state %d (%*s), if next is '%c', then " |
| "recover to state %d (%*s)", cur_state, |
| (size_t) cur_state, data, data[prefix_len], |
| new_state, (size_t) new_state, data); |
| |
| edge = ngx_alloc(sizeof(ngx_http_lua_dfa_edge_t), log); |
| edge->chr = data[prefix_len]; |
| edge->new_state = new_state; |
| edge->next = NULL; |
| |
| *last = edge; |
| } |
| |
| break; |
| } |
| |
| break; |
| } |
| } |
| |
| return NGX_OK; |
| } |
| |
| |
| static ngx_int_t |
| ngx_http_lua_socket_read_until(void *data, ssize_t bytes) |
| { |
| ngx_http_lua_socket_compiled_pattern_t *cp = data; |
| |
| ngx_http_lua_socket_tcp_upstream_t *u; |
| ngx_http_request_t *r; |
| ngx_buf_t *b; |
| u_char c; |
| u_char *pat; |
| size_t pat_len; |
| int i; |
| int state; |
| int old_state = 0; /* just to make old |
| gcc happy */ |
| ngx_http_lua_dfa_edge_t *edge; |
| unsigned matched; |
| ngx_int_t rc; |
| |
| u = cp->upstream; |
| r = u->request; |
| |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket read until"); |
| |
| if (bytes == 0) { |
| u->ft_type |= NGX_HTTP_LUA_SOCKET_FT_CLOSED; |
| return NGX_ERROR; |
| } |
| |
| b = &u->buffer; |
| |
| pat = cp->pattern.data; |
| pat_len = cp->pattern.len; |
| state = cp->state; |
| |
| i = 0; |
| while (i < bytes) { |
| c = b->pos[i]; |
| |
| dd("%d: read char %d, state: %d", i, c, state); |
| |
| if (c == pat[state]) { |
| i++; |
| state++; |
| |
| if (state == (int) pat_len) { |
| /* already matched the whole pattern */ |
| dd("pat len: %d", (int) pat_len); |
| |
| b->pos += i; |
| |
| if (u->length) { |
| cp->state = -1; |
| |
| } else { |
| cp->state = 0; |
| } |
| |
| if (cp->inclusive) { |
| rc = ngx_http_lua_socket_add_pending_data(r, u, b->pos, 0, |
| pat, state, |
| state); |
| |
| if (rc != NGX_OK) { |
| u->ft_type |= NGX_HTTP_LUA_SOCKET_FT_ERROR; |
| return NGX_ERROR; |
| } |
| } |
| |
| return NGX_OK; |
| } |
| |
| continue; |
| } |
| |
| if (state == 0) { |
| u->buf_in->buf->last++; |
| |
| i++; |
| |
| if (u->length && --u->rest == 0) { |
| cp->state = state; |
| b->pos += i; |
| return NGX_OK; |
| } |
| |
| continue; |
| } |
| |
| matched = 0; |
| |
| if (cp->recovering && state >= 2) { |
| dd("accessing state: %d, index: %d", state, state - 2); |
| for (edge = cp->recovering[state - 2]; edge; edge = edge->next) { |
| |
| if (edge->chr == c) { |
| dd("matched '%c' and jumping to state %d", c, |
| edge->new_state); |
| |
| old_state = state; |
| state = edge->new_state; |
| matched = 1; |
| break; |
| } |
| } |
| } |
| |
| if (!matched) { |
| #if 1 |
| dd("adding pending data: %.*s", state, pat); |
| rc = ngx_http_lua_socket_add_pending_data(r, u, b->pos, i, pat, |
| state, state); |
| |
| if (rc != NGX_OK) { |
| u->ft_type |= NGX_HTTP_LUA_SOCKET_FT_ERROR; |
| return NGX_ERROR; |
| } |
| |
| #endif |
| |
| if (u->length) { |
| if (u->rest <= (size_t) state) { |
| u->rest = 0; |
| cp->state = 0; |
| b->pos += i; |
| return NGX_OK; |
| |
| } else { |
| u->rest -= state; |
| } |
| } |
| |
| state = 0; |
| continue; |
| } |
| |
| /* matched */ |
| |
| dd("adding pending data: %.*s", (int) (old_state + 1 - state), |
| (char *) pat); |
| |
| rc = ngx_http_lua_socket_add_pending_data(r, u, b->pos, i, pat, |
| old_state + 1 - state, |
| old_state); |
| |
| if (rc != NGX_OK) { |
| u->ft_type |= NGX_HTTP_LUA_SOCKET_FT_ERROR; |
| return NGX_ERROR; |
| } |
| |
| i++; |
| |
| if (u->length) { |
| if (u->rest <= (size_t) state) { |
| u->rest = 0; |
| cp->state = state; |
| b->pos += i; |
| return NGX_OK; |
| |
| } else { |
| u->rest -= state; |
| } |
| } |
| |
| continue; |
| } |
| |
| b->pos += i; |
| cp->state = state; |
| |
| return NGX_AGAIN; |
| } |
| |
| |
| static int |
| ngx_http_lua_socket_cleanup_compiled_pattern(lua_State *L) |
| { |
| ngx_http_lua_socket_compiled_pattern_t *cp; |
| |
| ngx_http_lua_dfa_edge_t *edge, *p; |
| unsigned i; |
| |
| dd("cleanup compiled pattern"); |
| |
| cp = lua_touserdata(L, 1); |
| if (cp == NULL || cp->recovering == NULL) { |
| return 0; |
| } |
| |
| dd("pattern len: %d", (int) cp->pattern.len); |
| |
| for (i = 0; i < cp->pattern.len - 2; i++) { |
| edge = cp->recovering[i]; |
| |
| while (edge) { |
| p = edge; |
| edge = edge->next; |
| |
| dd("freeing edge %p", p); |
| |
| ngx_free(p); |
| |
| dd("edge: %p", edge); |
| } |
| } |
| |
| #if 1 |
| ngx_free(cp->recovering); |
| cp->recovering = NULL; |
| #endif |
| |
| return 0; |
| } |
| |
| |
| static int |
| ngx_http_lua_req_socket(lua_State *L) |
| { |
| int n, raw; |
| ngx_peer_connection_t *pc; |
| ngx_http_lua_loc_conf_t *llcf; |
| ngx_connection_t *c; |
| ngx_http_request_t *r; |
| ngx_http_lua_ctx_t *ctx; |
| ngx_http_request_body_t *rb; |
| ngx_http_cleanup_t *cln; |
| ngx_http_lua_co_ctx_t *coctx; |
| #if nginx_version >= 1003013 |
| int tcp_nodelay; |
| ngx_http_core_loc_conf_t *clcf; |
| #endif |
| |
| ngx_http_lua_socket_tcp_upstream_t *u; |
| |
| n = lua_gettop(L); |
| if (n == 0) { |
| raw = 0; |
| |
| } else if (n == 1) { |
| raw = lua_toboolean(L, 1); |
| lua_pop(L, 1); |
| |
| } else { |
| return luaL_error(L, "expecting zero arguments, but got %d", |
| lua_gettop(L)); |
| } |
| |
| r = ngx_http_lua_get_req(L); |
| |
| if (r != r->main) { |
| return luaL_error(L, "attempt to read the request body in a " |
| "subrequest"); |
| } |
| |
| #if (NGX_HTTP_SPDY) |
| if (r->spdy_stream) { |
| return luaL_error(L, "spdy not supported yet"); |
| } |
| #endif |
| |
| #if nginx_version >= 1003009 |
| if (r->headers_in.chunked) { |
| lua_pushnil(L); |
| lua_pushliteral(L, "chunked request bodies not supported yet"); |
| return 2; |
| } |
| #endif |
| |
| ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); |
| if (ctx == NULL) { |
| return luaL_error(L, "no ctx found"); |
| } |
| |
| ngx_http_lua_check_context(L, ctx, NGX_HTTP_LUA_CONTEXT_REWRITE |
| | NGX_HTTP_LUA_CONTEXT_ACCESS |
| | NGX_HTTP_LUA_CONTEXT_CONTENT); |
| |
| c = r->connection; |
| |
| if (raw) { |
| #if !defined(nginx_version) || nginx_version < 1003013 |
| lua_pushnil(L); |
| lua_pushliteral(L, "nginx version too old"); |
| return 2; |
| #else |
| if (r->request_body) { |
| if (r->request_body->rest > 0) { |
| lua_pushnil(L); |
| lua_pushliteral(L, "pending request body reading in some " |
| "other thread"); |
| return 2; |
| } |
| |
| } else { |
| rb = ngx_pcalloc(r->pool, sizeof(ngx_http_request_body_t)); |
| if (rb == NULL) { |
| return luaL_error(L, "out of memory"); |
| } |
| r->request_body = rb; |
| } |
| |
| if (c->buffered) { |
| lua_pushnil(L); |
| lua_pushliteral(L, "pending data to write"); |
| return 2; |
| } |
| |
| if (ctx->buffering) { |
| lua_pushnil(L); |
| lua_pushliteral(L, "http 1.0 buffering"); |
| return 2; |
| } |
| |
| if (!r->header_sent) { |
| /* prevent other parts of nginx from sending out |
| * the response header */ |
| r->header_sent = 1; |
| } |
| |
| dd("ctx acquired raw req socket: %d", ctx->acquired_raw_req_socket); |
| |
| if (ctx->acquired_raw_req_socket) { |
| lua_pushnil(L); |
| lua_pushliteral(L, "duplicate call"); |
| return 2; |
| } |
| |
| clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); |
| |
| if (clcf->tcp_nodelay) { |
| tcp_nodelay = 1; |
| |
| if (c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) { |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, |
| "lua raw req socket tcp_nodelay"); |
| |
| if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY, |
| (const void *) &tcp_nodelay, sizeof(int)) |
| == -1) |
| { |
| llcf = ngx_http_get_module_loc_conf(r, ngx_http_lua_module); |
| if (llcf->log_socket_errors) { |
| ngx_connection_error(c, ngx_socket_errno, |
| "setsockopt(TCP_NODELAY) " |
| "failed"); |
| } |
| |
| lua_pushnil(L); |
| lua_pushliteral(L, "setsocketopt tcp_nodelay failed"); |
| return 2; |
| } |
| |
| c->tcp_nodelay = NGX_TCP_NODELAY_SET; |
| } |
| } |
| |
| ctx->acquired_raw_req_socket = 1; |
| r->keepalive = 0; |
| r->lingering_close = 1; |
| #endif |
| |
| } else { |
| /* request body reader */ |
| |
| if (r->request_body) { |
| lua_pushnil(L); |
| lua_pushliteral(L, "request body already exists"); |
| return 2; |
| } |
| |
| if (r->discard_body) { |
| lua_pushnil(L); |
| lua_pushliteral(L, "request body discarded"); |
| return 2; |
| } |
| |
| dd("req content length: %d", (int) r->headers_in.content_length_n); |
| |
| if (r->headers_in.content_length_n <= 0) { |
| lua_pushnil(L); |
| lua_pushliteral(L, "no body"); |
| return 2; |
| } |
| |
| if (ngx_http_lua_test_expect(r) != NGX_OK) { |
| lua_pushnil(L); |
| lua_pushliteral(L, "test expect failed"); |
| return 2; |
| } |
| |
| /* prevent other request body reader from running */ |
| |
| rb = ngx_pcalloc(r->pool, sizeof(ngx_http_request_body_t)); |
| if (rb == NULL) { |
| return luaL_error(L, "out of memory"); |
| } |
| |
| rb->rest = r->headers_in.content_length_n; |
| |
| r->request_body = rb; |
| } |
| |
| lua_createtable(L, 3 /* narr */, 1 /* nrec */); /* the object */ |
| |
| if (raw) { |
| lua_pushlightuserdata(L, &ngx_http_lua_raw_req_socket_metatable_key); |
| |
| } else { |
| lua_pushlightuserdata(L, &ngx_http_lua_req_socket_metatable_key); |
| } |
| |
| lua_rawget(L, LUA_REGISTRYINDEX); |
| lua_setmetatable(L, -2); |
| |
| u = lua_newuserdata(L, sizeof(ngx_http_lua_socket_tcp_upstream_t)); |
| if (u == NULL) { |
| return luaL_error(L, "out of memory"); |
| } |
| |
| #if 1 |
| lua_createtable(L, 0 /* narr */, 1 /* nrec */); /* metatable */ |
| lua_pushcfunction(L, ngx_http_lua_socket_downstream_destroy); |
| lua_setfield(L, -2, "__gc"); |
| lua_setmetatable(L, -2); |
| #endif |
| |
| lua_rawseti(L, 1, SOCKET_CTX_INDEX); |
| |
| ngx_memzero(u, sizeof(ngx_http_lua_socket_tcp_upstream_t)); |
| |
| if (raw) { |
| u->raw_downstream = 1; |
| |
| } else { |
| u->body_downstream = 1; |
| } |
| |
| coctx = ctx->cur_co_ctx; |
| |
| u->request = r; |
| |
| llcf = ngx_http_get_module_loc_conf(r, ngx_http_lua_module); |
| |
| u->conf = llcf; |
| |
| u->read_timeout = u->conf->read_timeout; |
| u->connect_timeout = u->conf->connect_timeout; |
| u->send_timeout = u->conf->send_timeout; |
| |
| cln = ngx_http_cleanup_add(r, 0); |
| if (cln == NULL) { |
| u->ft_type |= NGX_HTTP_LUA_SOCKET_FT_ERROR; |
| lua_pushnil(L); |
| lua_pushliteral(L, "out of memory"); |
| return 2; |
| } |
| |
| cln->handler = ngx_http_lua_socket_tcp_cleanup; |
| cln->data = u; |
| u->cleanup = &cln->handler; |
| |
| pc = &u->peer; |
| |
| pc->log = c->log; |
| pc->log_error = NGX_ERROR_ERR; |
| |
| pc->connection = c; |
| |
| dd("setting data to %p", u); |
| |
| coctx->data = u; |
| ctx->downstream_co_ctx = coctx; |
| |
| if (c->read->timer_set) { |
| ngx_del_timer(c->read); |
| } |
| |
| if (raw) { |
| if (c->write->timer_set) { |
| ngx_del_timer(c->write); |
| } |
| } |
| |
| lua_settop(L, 1); |
| return 1; |
| } |
| |
| |
| static void |
| ngx_http_lua_req_socket_rev_handler(ngx_http_request_t *r) |
| { |
| ngx_http_lua_ctx_t *ctx; |
| ngx_http_lua_co_ctx_t *coctx; |
| ngx_http_lua_socket_tcp_upstream_t *u; |
| |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua request socket read event handler"); |
| |
| ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); |
| if (ctx == NULL) { |
| return; |
| } |
| |
| coctx = ctx->downstream_co_ctx; |
| u = coctx->data; |
| |
| if (u) { |
| u->read_event_handler(r, u); |
| } |
| } |
| |
| |
| static int |
| ngx_http_lua_socket_tcp_getreusedtimes(lua_State *L) |
| { |
| ngx_http_lua_socket_tcp_upstream_t *u; |
| |
| if (lua_gettop(L) != 1) { |
| return luaL_error(L, "expecting 1 argument " |
| "(including the object), but got %d", lua_gettop(L)); |
| } |
| |
| luaL_checktype(L, 1, LUA_TTABLE); |
| |
| lua_rawgeti(L, 1, SOCKET_CTX_INDEX); |
| u = lua_touserdata(L, -1); |
| |
| if (u == NULL || u->peer.connection == NULL || u->ft_type || u->eof) { |
| lua_pushnil(L); |
| lua_pushliteral(L, "closed"); |
| return 2; |
| } |
| |
| lua_pushinteger(L, u->reused); |
| return 1; |
| } |
| |
| |
| static int ngx_http_lua_socket_tcp_setkeepalive(lua_State *L) |
| { |
| ngx_http_lua_loc_conf_t *llcf; |
| ngx_http_lua_socket_tcp_upstream_t *u; |
| ngx_connection_t *c; |
| ngx_http_lua_socket_pool_t *spool; |
| size_t size; |
| ngx_str_t key; |
| ngx_uint_t i; |
| ngx_queue_t *q; |
| ngx_peer_connection_t *pc; |
| u_char *p; |
| ngx_http_request_t *r; |
| ngx_msec_t timeout; |
| ngx_uint_t pool_size; |
| int n; |
| ngx_int_t rc; |
| ngx_buf_t *b; |
| |
| ngx_http_lua_socket_pool_item_t *items, *item; |
| |
| n = lua_gettop(L); |
| |
| if (n < 1 || n > 3) { |
| return luaL_error(L, "expecting 1 to 3 arguments " |
| "(including the object), but got %d", n); |
| } |
| |
| luaL_checktype(L, 1, LUA_TTABLE); |
| |
| lua_pushlightuserdata(L, &ngx_http_lua_socket_pool_key); |
| lua_rawget(L, LUA_REGISTRYINDEX); |
| |
| lua_rawgeti(L, 1, SOCKET_KEY_INDEX); |
| key.data = (u_char *) lua_tolstring(L, -1, &key.len); |
| if (key.data == NULL) { |
| lua_pushnil(L); |
| lua_pushliteral(L, "key not found"); |
| return 2; |
| } |
| |
| lua_rawgeti(L, 1, SOCKET_CTX_INDEX); |
| u = lua_touserdata(L, -1); |
| lua_pop(L, 1); |
| |
| /* stack: obj cache key */ |
| |
| pc = &u->peer; |
| c = pc->connection; |
| |
| if (u == NULL || c == NULL || u->ft_type || u->eof) { |
| lua_pushnil(L); |
| lua_pushliteral(L, "closed"); |
| return 2; |
| } |
| |
| if (u->waiting) { |
| lua_pushnil(L); |
| lua_pushliteral(L, "socket busy"); |
| return 2; |
| } |
| |
| r = ngx_http_lua_get_req(L); |
| if (r == NULL) { |
| return luaL_error(L, "no request found"); |
| } |
| |
| b = &u->buffer; |
| |
| if (b->start && ngx_buf_size(b)) { |
| ngx_http_lua_probe_socket_tcp_setkeepalive_buf_unread(r, u, b->pos, |
| b->last - b->pos); |
| |
| lua_pushnil(L); |
| lua_pushliteral(L, "unread data in buffer"); |
| return 2; |
| } |
| |
| if (c->read->eof |
| || c->read->error |
| || c->read->timedout |
| || c->write->error |
| || c->write->timedout) |
| { |
| lua_pushnil(L); |
| lua_pushliteral(L, "invalid connection"); |
| return 2; |
| } |
| |
| if (ngx_handle_read_event(c->read, 0) != NGX_OK) { |
| lua_pushnil(L); |
| lua_pushliteral(L, "failed to handle read event"); |
| return 2; |
| } |
| |
| ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0, |
| "lua tcp socket set keepalive: saving connection %p", c); |
| |
| dd("saving connection to key %s", lua_tostring(L, -1)); |
| |
| lua_pushvalue(L, -1); |
| lua_rawget(L, -3); |
| spool = lua_touserdata(L, -1); |
| lua_pop(L, 1); |
| |
| /* stack: obj timeout? size? cache key */ |
| |
| llcf = ngx_http_get_module_loc_conf(r, ngx_http_lua_module); |
| |
| if (spool == NULL) { |
| /* create a new socket pool for the current peer key */ |
| |
| if (n == 3) { |
| pool_size = luaL_checkinteger(L, 3); |
| |
| } else { |
| pool_size = llcf->pool_size; |
| } |
| |
| if (pool_size == 0) { |
| lua_pushnil(L); |
| lua_pushliteral(L, "zero pool size"); |
| return 2; |
| } |
| |
| ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket connection pool size: %ui", pool_size); |
| |
| size = sizeof(ngx_http_lua_socket_pool_t) + key.len |
| + sizeof(ngx_http_lua_socket_pool_item_t) |
| * pool_size; |
| |
| spool = lua_newuserdata(L, size); |
| if (spool == NULL) { |
| return luaL_error(L, "out of memory"); |
| } |
| |
| ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0, |
| "lua tcp socket keepalive create connection pool for key" |
| " \"%s\"", lua_tostring(L, -2)); |
| |
| lua_rawset(L, -3); |
| |
| spool->active_connections = 0; |
| spool->lua_vm = ngx_http_lua_get_lua_vm(r, NULL); |
| |
| ngx_queue_init(&spool->cache); |
| ngx_queue_init(&spool->free); |
| |
| p = ngx_copy(spool->key, key.data, key.len); |
| *p++ = '\0'; |
| |
| items = (ngx_http_lua_socket_pool_item_t *) p; |
| |
| for (i = 0; i < pool_size; i++) { |
| ngx_queue_insert_head(&spool->free, &items[i].queue); |
| items[i].socket_pool = spool; |
| } |
| } |
| |
| if (ngx_queue_empty(&spool->free)) { |
| |
| q = ngx_queue_last(&spool->cache); |
| ngx_queue_remove(q); |
| spool->active_connections--; |
| |
| item = ngx_queue_data(q, ngx_http_lua_socket_pool_item_t, queue); |
| |
| ngx_close_connection(item->connection); |
| |
| } else { |
| q = ngx_queue_head(&spool->free); |
| ngx_queue_remove(q); |
| |
| item = ngx_queue_data(q, ngx_http_lua_socket_pool_item_t, queue); |
| } |
| |
| item->connection = c; |
| ngx_queue_insert_head(&spool->cache, q); |
| |
| if (!u->reused) { |
| spool->active_connections++; |
| } |
| |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0, |
| "lua tcp socket clear current socket connection"); |
| |
| pc->connection = NULL; |
| |
| #if 0 |
| if (u->cleanup) { |
| *u->cleanup = NULL; |
| u->cleanup = NULL; |
| } |
| #endif |
| |
| if (c->read->timer_set) { |
| ngx_del_timer(c->read); |
| } |
| |
| if (c->write->timer_set) { |
| ngx_del_timer(c->write); |
| } |
| |
| if (n >= 2) { |
| timeout = (ngx_msec_t) luaL_checkinteger(L, 2); |
| |
| } else { |
| timeout = llcf->keepalive_timeout; |
| } |
| |
| #if (NGX_DEBUG) |
| if (timeout == 0) { |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket keepalive timeout: unlimited"); |
| } |
| #endif |
| |
| if (timeout) { |
| ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket keepalive timeout: %M ms", timeout); |
| |
| ngx_add_timer(c->read, timeout); |
| } |
| |
| c->write->handler = ngx_http_lua_socket_keepalive_dummy_handler; |
| c->read->handler = ngx_http_lua_socket_keepalive_rev_handler; |
| |
| c->data = item; |
| c->idle = 1; |
| c->log = ngx_cycle->log; |
| c->read->log = ngx_cycle->log; |
| c->write->log = ngx_cycle->log; |
| |
| item->socklen = pc->socklen; |
| ngx_memcpy(&item->sockaddr, pc->sockaddr, pc->socklen); |
| item->reused = u->reused; |
| |
| if (c->read->ready) { |
| rc = ngx_http_lua_socket_keepalive_close_handler(c->read); |
| if (rc != NGX_OK) { |
| lua_pushnil(L); |
| lua_pushliteral(L, "connection in dubious state"); |
| return 2; |
| } |
| } |
| |
| #if 1 |
| ngx_http_lua_socket_tcp_finalize(r, u); |
| #endif |
| |
| lua_pushinteger(L, 1); |
| return 1; |
| } |
| |
| |
| static ngx_int_t |
| ngx_http_lua_get_keepalive_peer(ngx_http_request_t *r, lua_State *L, |
| int key_index, ngx_http_lua_socket_tcp_upstream_t *u) |
| { |
| ngx_http_lua_socket_pool_item_t *item; |
| ngx_http_lua_socket_pool_t *spool; |
| ngx_http_cleanup_t *cln; |
| ngx_queue_t *q; |
| int top; |
| ngx_peer_connection_t *pc; |
| ngx_connection_t *c; |
| |
| top = lua_gettop(L); |
| |
| if (key_index < 0) { |
| key_index = top + key_index + 1; |
| } |
| |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket pool get keepalive peer"); |
| |
| pc = &u->peer; |
| |
| lua_pushlightuserdata(L, &ngx_http_lua_socket_pool_key); |
| lua_rawget(L, LUA_REGISTRYINDEX); /* table */ |
| lua_pushvalue(L, key_index); /* key */ |
| lua_rawget(L, -2); |
| |
| spool = lua_touserdata(L, -1); |
| if (spool == NULL) { |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0, |
| "lua tcp socket keepalive connection pool not found"); |
| lua_settop(L, top); |
| return NGX_DECLINED; |
| } |
| |
| u->socket_pool = spool; |
| |
| if (!ngx_queue_empty(&spool->cache)) { |
| q = ngx_queue_head(&spool->cache); |
| |
| item = ngx_queue_data(q, ngx_http_lua_socket_pool_item_t, queue); |
| c = item->connection; |
| |
| ngx_queue_remove(q); |
| ngx_queue_insert_head(&spool->free, q); |
| |
| ngx_log_debug2(NGX_LOG_DEBUG_HTTP, pc->log, 0, |
| "lua tcp socket get keepalive peer: using connection %p," |
| " fd:%d", c, c->fd); |
| |
| c->idle = 0; |
| c->log = pc->log; |
| c->read->log = pc->log; |
| c->write->log = pc->log; |
| c->data = u; |
| |
| #if 1 |
| c->write->handler = ngx_http_lua_socket_tcp_handler; |
| c->read->handler = ngx_http_lua_socket_tcp_handler; |
| #endif |
| |
| if (c->read->timer_set) { |
| ngx_del_timer(c->read); |
| } |
| |
| pc->connection = c; |
| pc->cached = 1; |
| |
| u->reused = item->reused + 1; |
| |
| #if 1 |
| u->write_event_handler = ngx_http_lua_socket_dummy_handler; |
| u->read_event_handler = ngx_http_lua_socket_dummy_handler; |
| #endif |
| |
| if (u->cleanup == NULL) { |
| cln = ngx_http_cleanup_add(r, 0); |
| if (cln == NULL) { |
| u->ft_type |= NGX_HTTP_LUA_SOCKET_FT_ERROR; |
| lua_settop(L, top); |
| return NGX_ERROR; |
| } |
| |
| cln->handler = ngx_http_lua_socket_tcp_cleanup; |
| cln->data = u; |
| u->cleanup = &cln->handler; |
| } |
| |
| lua_settop(L, top); |
| |
| return NGX_OK; |
| } |
| |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0, |
| "lua tcp socket keepalive: connection pool empty"); |
| |
| lua_settop(L, top); |
| |
| return NGX_DECLINED; |
| } |
| |
| |
| static void |
| ngx_http_lua_socket_keepalive_dummy_handler(ngx_event_t *ev) |
| { |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, ev->log, 0, |
| "keepalive dummy handler"); |
| } |
| |
| |
| static void |
| ngx_http_lua_socket_keepalive_rev_handler(ngx_event_t *ev) |
| { |
| (void) ngx_http_lua_socket_keepalive_close_handler(ev); |
| } |
| |
| |
| static ngx_int_t |
| ngx_http_lua_socket_keepalive_close_handler(ngx_event_t *ev) |
| { |
| ngx_http_lua_socket_pool_item_t *item; |
| ngx_http_lua_socket_pool_t *spool; |
| |
| int n; |
| char buf[1]; |
| ngx_connection_t *c; |
| |
| c = ev->data; |
| |
| if (c->close) { |
| goto close; |
| } |
| |
| if (c->read->timedout) { |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, ev->log, 0, |
| "lua tcp socket keepalive max idle timeout"); |
| |
| goto close; |
| } |
| |
| dd("read event ready: %d", (int) c->read->ready); |
| |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, ev->log, 0, |
| "lua tcp socket keepalive close handler check stale events"); |
| |
| n = recv(c->fd, buf, 1, MSG_PEEK); |
| |
| if (n == -1 && ngx_socket_errno == NGX_EAGAIN) { |
| /* stale event */ |
| |
| if (ngx_handle_read_event(c->read, 0) != NGX_OK) { |
| goto close; |
| } |
| |
| return NGX_OK; |
| } |
| |
| close: |
| |
| ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ev->log, 0, |
| "lua tcp socket keepalive close handler: fd:%d", c->fd); |
| |
| item = c->data; |
| spool = item->socket_pool; |
| |
| ngx_close_connection(c); |
| |
| ngx_queue_remove(&item->queue); |
| ngx_queue_insert_head(&spool->free, &item->queue); |
| spool->active_connections--; |
| |
| dd("keepalive: active connections: %u", |
| (unsigned) spool->active_connections); |
| |
| if (spool->active_connections == 0) { |
| ngx_http_lua_socket_free_pool(ev->log, spool); |
| } |
| |
| return NGX_DECLINED; |
| } |
| |
| |
| static void |
| ngx_http_lua_socket_free_pool(ngx_log_t *log, ngx_http_lua_socket_pool_t *spool) |
| { |
| lua_State *L; |
| |
| ngx_log_debug1(NGX_LOG_DEBUG_HTTP, log, 0, |
| "lua tcp socket keepalive: free connection pool for \"%s\"", |
| spool->key); |
| |
| L = spool->lua_vm; |
| |
| lua_pushlightuserdata(L, &ngx_http_lua_socket_pool_key); |
| lua_rawget(L, LUA_REGISTRYINDEX); |
| lua_pushstring(L, (char *) spool->key); |
| lua_pushnil(L); |
| lua_rawset(L, -3); |
| lua_pop(L, 1); |
| } |
| |
| |
| static int |
| ngx_http_lua_socket_tcp_upstream_destroy(lua_State *L) |
| { |
| ngx_http_lua_socket_tcp_upstream_t *u; |
| |
| dd("upstream destroy triggered by Lua GC"); |
| |
| u = lua_touserdata(L, 1); |
| if (u == NULL) { |
| return 0; |
| } |
| |
| if (u->cleanup) { |
| ngx_http_lua_socket_tcp_cleanup(u); /* it will clear u->cleanup */ |
| } |
| |
| return 0; |
| } |
| |
| |
| static int |
| ngx_http_lua_socket_downstream_destroy(lua_State *L) |
| { |
| ngx_http_lua_socket_tcp_upstream_t *u; |
| |
| dd("downstream destory"); |
| |
| u = lua_touserdata(L, 1); |
| if (u == NULL) { |
| dd("u is NULL"); |
| return 0; |
| } |
| |
| if (u->cleanup) { |
| ngx_http_lua_socket_tcp_cleanup(u); /* it will clear u->cleanup */ |
| } |
| |
| return 0; |
| } |
| |
| |
| static ngx_int_t |
| ngx_http_lua_socket_push_input_data(ngx_http_request_t *r, |
| ngx_http_lua_ctx_t *ctx, ngx_http_lua_socket_tcp_upstream_t *u, |
| lua_State *L) |
| { |
| ngx_chain_t *cl; |
| ngx_chain_t **ll; |
| size_t size; |
| ngx_buf_t *b; |
| size_t nbufs; |
| u_char *p; |
| u_char *last; |
| |
| if (!u->bufs_in) { |
| lua_pushliteral(L, ""); |
| ngx_http_lua_probe_socket_tcp_receive_done(r, u, (u_char *) "", 0); |
| return NGX_OK; |
| } |
| |
| dd("bufs_in: %p, buf_in: %p", u->bufs_in, u->buf_in); |
| |
| size = 0; |
| nbufs = 0; |
| ll = NULL; |
| |
| for (cl = u->bufs_in; cl; cl = cl->next) { |
| b = cl->buf; |
| size += b->last - b->pos; |
| |
| if (cl->next) { |
| ll = &cl->next; |
| } |
| |
| nbufs++; |
| } |
| |
| dd("size: %d, nbufs: %d", (int) size, (int) nbufs); |
| |
| if (size == 0) { |
| lua_pushliteral(L, ""); |
| |
| ngx_http_lua_probe_socket_tcp_receive_done(r, u, (u_char *) "", 0); |
| |
| goto done; |
| } |
| |
| if (nbufs == 1) { |
| b = u->buf_in->buf; |
| |
| lua_pushlstring(L, (char *) b->pos, size); |
| |
| dd("copying input data chunk from %p: \"%.*s\"", u->buf_in, (int) size, |
| b->pos); |
| |
| ngx_http_lua_probe_socket_tcp_receive_done(r, u, b->pos, size); |
| |
| goto done; |
| } |
| |
| /* nbufs > 1 */ |
| |
| dd("WARN: allocate a big memory: %d", (int) size); |
| |
| p = ngx_palloc(r->pool, size); |
| if (p == NULL) { |
| return NGX_ERROR; |
| } |
| |
| last = p; |
| for (cl = u->bufs_in; cl; cl = cl->next) { |
| b = cl->buf; |
| last = ngx_copy(last, b->pos, b->last - b->pos); |
| |
| dd("copying input data chunk from %p: \"%.*s\"", cl, |
| (int) (b->last - b->pos), b->pos); |
| } |
| |
| lua_pushlstring(L, (char *) p, size); |
| |
| ngx_http_lua_probe_socket_tcp_receive_done(r, u, p, size); |
| |
| ngx_pfree(r->pool, p); |
| |
| done: |
| if (nbufs > 1 && ll) { |
| dd("recycle buffers: %d", (int) (nbufs - 1)); |
| |
| *ll = ctx->free_recv_bufs; |
| ctx->free_recv_bufs = u->bufs_in; |
| u->bufs_in = u->buf_in; |
| } |
| |
| if (u->buffer.pos == u->buffer.last) { |
| dd("resetting u->buffer pos & last"); |
| u->buffer.pos = u->buffer.start; |
| u->buffer.last = u->buffer.start; |
| } |
| |
| u->buf_in->buf->last = u->buffer.pos; |
| u->buf_in->buf->pos = u->buffer.pos; |
| |
| return NGX_OK; |
| } |
| |
| |
| static ngx_int_t |
| ngx_http_lua_socket_add_input_buffer(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u) |
| { |
| ngx_chain_t *cl; |
| ngx_http_lua_ctx_t *ctx; |
| |
| ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); |
| |
| cl = ngx_http_lua_chains_get_free_buf(r->connection->log, r->pool, |
| &ctx->free_recv_bufs, |
| u->conf->buffer_size, |
| (ngx_buf_tag_t) |
| &ngx_http_lua_module); |
| |
| if (cl == NULL) { |
| return NGX_ERROR; |
| } |
| |
| u->buf_in->next = cl; |
| u->buf_in = cl; |
| u->buffer = *cl->buf; |
| |
| return NGX_OK; |
| } |
| |
| |
| static ngx_int_t |
| ngx_http_lua_socket_add_pending_data(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u, u_char *pos, size_t len, u_char *pat, |
| int prefix, int old_state) |
| { |
| u_char *last; |
| ngx_buf_t *b; |
| |
| dd("resuming data: %d: [%.*s]", prefix, prefix, pat); |
| |
| last = &pos[len]; |
| |
| b = u->buf_in->buf; |
| |
| if (last - b->last == old_state) { |
| b->last += prefix; |
| return NGX_OK; |
| } |
| |
| dd("need more buffers because %d != %d", (int) (last - b->last), |
| (int) old_state); |
| |
| if (ngx_http_lua_socket_insert_buffer(r, u, pat, prefix) != NGX_OK) { |
| return NGX_ERROR; |
| } |
| |
| b->pos = last; |
| b->last = last; |
| |
| return NGX_OK; |
| } |
| |
| |
| static ngx_int_t ngx_http_lua_socket_insert_buffer(ngx_http_request_t *r, |
| ngx_http_lua_socket_tcp_upstream_t *u, u_char *pat, size_t prefix) |
| { |
| ngx_chain_t *cl, *new_cl, **ll; |
| ngx_http_lua_ctx_t *ctx; |
| size_t size; |
| ngx_buf_t *b; |
| |
| if (prefix <= u->conf->buffer_size) { |
| size = u->conf->buffer_size; |
| |
| } else { |
| size = prefix; |
| } |
| |
| ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); |
| |
| new_cl = ngx_http_lua_chains_get_free_buf(r->connection->log, r->pool, |
| &ctx->free_recv_bufs, |
| size, |
| (ngx_buf_tag_t) |
| &ngx_http_lua_module); |
| |
| if (new_cl == NULL) { |
| return NGX_ERROR; |
| } |
| |
| b = new_cl->buf; |
| |
| b->last = ngx_copy(b->last, pat, prefix); |
| |
| dd("copy resumed data to %p: %d: \"%.*s\"", |
| new_cl, (int) (b->last - b->pos), (int) (b->last - b->pos), b->pos); |
| |
| dd("before resuming data: bufs_in %p, buf_in %p, buf_in next %p", |
| u->bufs_in, u->buf_in, u->buf_in->next); |
| |
| ll = &u->bufs_in; |
| for (cl = u->bufs_in; cl->next; cl = cl->next) { |
| ll = &cl->next; |
| } |
| |
| *ll = new_cl; |
| new_cl->next = u->buf_in; |
| |
| dd("after resuming data: bufs_in %p, buf_in %p, buf_in next %p", |
| u->bufs_in, u->buf_in, u->buf_in->next); |
| |
| #if (DDEBUG) |
| for (cl = u->bufs_in; cl; cl = cl->next) { |
| b = cl->buf; |
| |
| dd("result buf after resuming data: %p: %.*s", cl, |
| (int) ngx_buf_size(b), b->pos); |
| } |
| #endif |
| |
| return NGX_OK; |
| } |
| |
| |
| static ngx_int_t |
| ngx_http_lua_socket_tcp_resume(ngx_http_request_t *r) |
| { |
| int nret; |
| lua_State *vm; |
| ngx_int_t rc; |
| ngx_connection_t *c; |
| ngx_http_lua_ctx_t *ctx; |
| ngx_http_lua_co_ctx_t *coctx; |
| |
| ngx_http_lua_socket_tcp_upstream_t *u; |
| |
| ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); |
| if (ctx == NULL) { |
| return NGX_ERROR; |
| } |
| |
| ctx->resume_handler = ngx_http_lua_wev_handler; |
| |
| ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp operation done, resuming lua thread"); |
| |
| coctx = ctx->cur_co_ctx; |
| |
| #if 0 |
| ngx_http_lua_probe_info("tcp resume"); |
| #endif |
| |
| dd("coctx: %p", coctx); |
| |
| u = coctx->data; |
| |
| ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua tcp socket calling prepare retvals handler %p, " |
| "u:%p", u->prepare_retvals, u); |
| |
| nret = u->prepare_retvals(r, u, ctx->cur_co_ctx->co); |
| if (nret == NGX_AGAIN) { |
| return NGX_DONE; |
| } |
| |
| c = r->connection; |
| vm = ngx_http_lua_get_lua_vm(r, ctx); |
| |
| rc = ngx_http_lua_run_thread(vm, r, ctx, nret); |
| |
| ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, |
| "lua run thread returned %d", rc); |
| |
| if (rc == NGX_AGAIN) { |
| return ngx_http_lua_run_posted_threads(c, vm, r, ctx); |
| } |
| |
| if (rc == NGX_DONE) { |
| ngx_http_lua_finalize_request(r, NGX_DONE); |
| return ngx_http_lua_run_posted_threads(c, vm, r, ctx); |
| } |
| |
| if (ctx->entered_content_phase) { |
| ngx_http_lua_finalize_request(r, rc); |
| return NGX_DONE; |
| } |
| |
| return rc; |
| } |
| |
| |
| static void |
| ngx_http_lua_tcp_resolve_cleanup(void *data) |
| { |
| ngx_resolver_ctx_t *rctx; |
| ngx_http_lua_socket_tcp_upstream_t *u; |
| ngx_http_lua_co_ctx_t *coctx = data; |
| |
| u = coctx->data; |
| if (u == NULL) { |
| return; |
| } |
| |
| rctx = u->resolved->ctx; |
| if (rctx == NULL) { |
| return; |
| } |
| |
| ngx_resolve_name_done(rctx); |
| } |
| |
| |
| static void |
| ngx_http_lua_coctx_cleanup(void *data) |
| { |
| ngx_http_lua_socket_tcp_upstream_t *u; |
| ngx_http_lua_co_ctx_t *coctx = data; |
| |
| dd("running coctx cleanup"); |
| |
| u = coctx->data; |
| if (u == NULL) { |
| return; |
| } |
| |
| if (u->request == NULL) { |
| return; |
| } |
| |
| ngx_http_lua_socket_tcp_finalize(u->request, u); |
| } |
| |
| /* vi:set ft=c ts=4 sw=4 et fdm=marker: */ |