diff --git a/README.md b/README.md index 3cdd5060..151d56fe 100644 --- a/README.md +++ b/README.md @@ -198,6 +198,8 @@ Acts as a `preread` phase handler and executes Lua code string specified in `lua (or packet in datagram mode). The Lua code may make [API calls](#nginx-api-for-lua) and is executed as a new spawned coroutine in an independent global environment (i.e. a sandbox). +Note: In preread, the raw request socket returned by `ngx.req.socket()` exposes `peek(N)` for both TCP and UDP servers. `peek(N)` returns exactly N bytes from the preread buffer without consuming them, yielding until enough data arrives or timing out according to preread settings. + It is possible to acquire the raw request socket using [ngx.req.socket](https://github.com/openresty/lua-nginx-module#ngxreqsocket) and receive data from or send data to the client. However, keep in mind that calling the `receive()` method of the request socket will consume the data from the buffer and such consumed data will not be seen by handlers diff --git a/src/ngx_stream_lua_socket_udp.c b/src/ngx_stream_lua_socket_udp.c index d2612f59..dfa87414 100644 --- a/src/ngx_stream_lua_socket_udp.c +++ b/src/ngx_stream_lua_socket_udp.c @@ -41,6 +41,9 @@ static int ngx_stream_lua_socket_udp_setpeername(lua_State *L); static int ngx_stream_lua_socket_udp_send(lua_State *L); static int ngx_stream_lua_socket_udp_receive(lua_State *L); static int ngx_stream_lua_socket_udp_settimeout(lua_State *L); +static int ngx_stream_lua_req_socket_udp_peek(lua_State *L); +static ngx_int_t ngx_stream_lua_req_socket_udp_peek_resume( + ngx_stream_lua_request_t *r); static void ngx_stream_lua_socket_udp_finalize(ngx_stream_lua_request_t *r, ngx_stream_lua_socket_udp_upstream_t *u); static int ngx_stream_lua_socket_udp_upstream_destroy(lua_State *L); @@ -140,7 +143,7 @@ ngx_stream_lua_inject_socket_udp_api(ngx_log_t *log, lua_State *L) /* udp downstream socket object metatable */ lua_pushlightuserdata(L, ngx_stream_lua_lightudata_mask( socket_udp_raw_req_socket_metatable_key)); - lua_createtable(L, 0 /* narr */, 4 /* nrec */); + lua_createtable(L, 0 /* narr */, 5 /* nrec */); lua_pushcfunction(L, ngx_stream_lua_socket_udp_send); lua_setfield(L, -2, "send"); @@ -151,6 +154,9 @@ ngx_stream_lua_inject_socket_udp_api(ngx_log_t *log, lua_State *L) lua_pushcfunction(L, ngx_stream_lua_socket_udp_settimeout); lua_setfield(L, -2, "settimeout"); /* ngx socket mt */ + lua_pushcfunction(L, ngx_stream_lua_req_socket_udp_peek); + lua_setfield(L, -2, "peek"); + lua_pushvalue(L, -1); lua_setfield(L, -2, "__index"); lua_rawset(L, LUA_REGISTRYINDEX); @@ -1122,6 +1128,8 @@ ngx_stream_lua_socket_udp_receive(lua_State *L) r->connection->buffer->pos, u->received); r->connection->buffer->pos += u->received; + u->read_consumed = 1; + ngx_stream_lua_socket_udp_handle_success(r, u); rc = NGX_OK; @@ -1269,6 +1277,188 @@ ngx_stream_lua_socket_udp_finalize(ngx_stream_lua_request_t *r, } +static int +ngx_stream_lua_req_socket_udp_peek(lua_State *L) +{ + ngx_stream_lua_request_t *r; + ngx_connection_t *c; + ngx_stream_lua_ctx_t *ctx; + ngx_stream_lua_loc_conf_t *llcf; + ngx_stream_lua_co_ctx_t *coctx; + int n; + lua_Integer bytes; + size_t size; + + ngx_stream_lua_socket_udp_upstream_t *u; + + r = ngx_stream_lua_get_req(L); + if (r == NULL) { + return luaL_error(L, "no request found"); + } + + ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module); + ngx_stream_lua_check_context(L, ctx, NGX_STREAM_LUA_CONTEXT_PREREAD); + + n = lua_gettop(L); + if (n != 2) { + return luaL_error(L, "expecting 2 arguments " + "(including the object), but got %d", n); + } + + ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0, + "stream lua udp socket calling peek() method"); + + luaL_checktype(L, 1, LUA_TTABLE); + + lua_rawgeti(L, 1, SOCKET_CTX_INDEX); + u = lua_touserdata(L, -1); + lua_pop(L, 1); + + if (u == NULL) { + llcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_lua_module); + + if (llcf->log_socket_errors) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "attempt to peek data on a closed socket: u:%p", u); + } + + lua_pushnil(L); + lua_pushliteral(L, "closed"); + return 2; + } + + if (u->request != r) { + return luaL_error(L, "bad request"); + } + + if (!u->raw_downstream) { + return luaL_error(L, "attempt to peek on a non-request socket"); + } + + if (u->read_consumed) { + return luaL_error(L, "attempt to peek on a consumed socket"); + } + + if (u->waiting) { + lua_pushnil(L); + lua_pushliteral(L, "socket busy"); + return 2; + } + + if (!lua_isnumber(L, 2)) { + return luaL_error(L, "argument must be a number"); + } + + bytes = lua_tointeger(L, 2); + if (bytes < 0) { + return luaL_argerror(L, 2, "bytes can not be negative"); + } + + if (bytes == 0) { + lua_pushliteral(L, ""); + return 1; + } + + u->length = (size_t) bytes; + + c = r->connection; + if (c->buffer != NULL) { + size = c->buffer->last - c->buffer->pos; + + if (size >= u->length) { + lua_pushlstring(L, (char *) c->buffer->pos, u->length); + return 1; + } + } + + /* not enough data in the preread buffer; yield */ + coctx = ctx->cur_co_ctx; + ngx_stream_lua_cleanup_pending_operation(coctx); + coctx->cleanup = ngx_stream_lua_udp_socket_cleanup; + coctx->data = u; + + ctx->downstream = u; + ctx->resume_handler = ngx_stream_lua_req_socket_udp_peek_resume; + ctx->peek_needs_more_data = 1; + u->co_ctx = coctx; + u->waiting = 1; + + return lua_yield(L, 0); +} + + +static ngx_int_t +ngx_stream_lua_req_socket_udp_peek_resume(ngx_stream_lua_request_t *r) +{ + lua_State *vm; + ngx_int_t rc; + ngx_uint_t nreqs; + ngx_connection_t *c; + ngx_stream_lua_ctx_t *ctx; + size_t size; + + ngx_stream_lua_socket_udp_upstream_t *u; + + ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0, + "stream lua udp socket resuming peek"); + + ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module); + if (ctx == NULL) { + return NGX_ERROR; + } + + u = ctx->downstream; + c = r->connection; + vm = ngx_stream_lua_get_lua_vm(r, ctx); + nreqs = c->requests; + + if (c->buffer == NULL) { + size = 0; + + } else { + size = c->buffer->last - c->buffer->pos; + } + + if (size < u->length) { + ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0, + "lua udp peek needs more data, returning NGX_AGAIN"); + + return ngx_stream_lua_run_posted_threads(c, vm, r, ctx, nreqs); + } + + ctx->resume_handler = ngx_stream_lua_wev_handler; + /* read handler might have been changed by core preread phase */ + r->connection->read->handler = ngx_stream_lua_request_handler; + + lua_pushlstring(u->co_ctx->co, (char *) c->buffer->pos, u->length); + + u->co_ctx->cleanup = NULL; + ctx->cur_co_ctx = u->co_ctx; + u->co_ctx = NULL; + ctx->peek_needs_more_data = 0; + u->waiting = 0; + + ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0, + "lua udp peek operation done, resuming lua thread"); + + rc = ngx_stream_lua_run_thread(vm, r, ctx, 1); + + ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0, + "lua run thread returned %d", rc); + + if (rc == NGX_AGAIN) { + return ngx_stream_lua_run_posted_threads(c, vm, r, ctx, nreqs); + } + + if (rc == NGX_DONE) { + ngx_stream_lua_finalize_request(r, NGX_DONE); + return ngx_stream_lua_run_posted_threads(c, vm, r, ctx, nreqs); + } + + return rc; +} + + static int ngx_stream_lua_socket_udp_upstream_destroy(lua_State *L) { diff --git a/src/ngx_stream_lua_socket_udp.h b/src/ngx_stream_lua_socket_udp.h index 8f5b9e95..89e7c4f7 100644 --- a/src/ngx_stream_lua_socket_udp.h +++ b/src/ngx_stream_lua_socket_udp.h @@ -62,12 +62,14 @@ struct ngx_stream_lua_socket_udp_upstream_s { ngx_err_t socket_errno; size_t received; /* for receive */ size_t recv_buf_size; + size_t length; /* for peek */ ngx_stream_lua_co_ctx_t *co_ctx; unsigned waiting:1; unsigned raw_downstream:1; + unsigned read_consumed:1; }; diff --git a/t/023-preread/req-socket-udp.t b/t/023-preread/req-socket-udp.t new file mode 100644 index 00000000..4b559cc9 --- /dev/null +++ b/t/023-preread/req-socket-udp.t @@ -0,0 +1,107 @@ +# vim:set ft= ts=4 sw=4 et fdm=marker: + +use Test::Nginx::Socket::Lua::Stream; + +repeat_each(2); + +plan tests => repeat_each() * (blocks() * 3); + +no_long_string(); +run_tests(); + +__DATA__ + +=== TEST 1: UDP reqsock:peek returns prefix without consuming +--- stream_server_config + preread_by_lua_block { + local sock, err = ngx.req.socket() + if not sock then + ngx.say("no sock: ", err) + return + end + + local p, perr = sock.peek and sock:peek(8) + ngx.say("peek:", p, ", err:", perr) + + local data, rerr = sock:receive() + ngx.say("recv:", data, ", err:", rerr) + } + content_by_lua return; +--- stream_request chop +hello world +--- stream_response +peek:hello wo, err:nil +recv:hello world, err:nil +--- no_error_log +[error] + + + +=== TEST 2: two consecutive peeks return same bytes +--- stream_server_config + preread_by_lua_block { + local sock = ngx.req.socket() + local p1 = sock:peek(8) + local p2 = sock:peek(8) + ngx.say(p1 .. "|" .. p2) + } + content_by_lua return; +--- stream_request chop +abcdefghijk +--- stream_response +abcdefgh|abcdefgh +--- no_error_log +[error] + + + +=== TEST 3: peek after receive() raises error +--- stream_server_config + preread_by_lua_block { + local sock = ngx.req.socket() + local d = sock:receive(3) + local ok, err = pcall(function() return sock:peek(1) end) + ngx.say("ok=", ok, ", err=", err) + } + content_by_lua return; +--- stream_request chop +xyz +--- stream_response_like +^ok=false, err=attempt to peek on a consumed socket +--- no_error_log +[error] + + + +=== TEST 4: timeout behavior (peek waits then times out) +--- SKIP: requires partial sends in harness +--- stream_server_config + preread_by_lua_block { + local sock = ngx.req.socket() + local ok, err = pcall(function() return sock:peek(8) end) + ngx.say("ok:", ok, ", err:", err) + } + content_by_lua return; +--- stream_request chop +hi +--- stream_response_like +^ok:false, err:.*timeout + + + +=== TEST 5: buffer full behavior (peek errors when exceeded) +--- SKIP: requires controlled preread_buffer_size in harness +--- stream_config + preread_buffer_size 8; +--- stream_server_config + preread_by_lua_block { + local sock = ngx.req.socket() + local ok, err = pcall(function() return sock:peek(16) end) + ngx.say("ok:", ok, ", err:", err) + } + content_by_lua return; +--- stream_request chop +abcdefghijklmnop +--- stream_response_like +^ok:false, err:.*buffer.* +