From 7249d488efeeacd5a405de31a7d5017d5e06eab9 Mon Sep 17 00:00:00 2001 From: aaurizon <750048+aaurizon@users.noreply.github.com> Date: Wed, 27 Aug 2025 00:07:10 +0200 Subject: [PATCH 1/4] Add UDP preread socket peek() support and tests Introduces the peek(N) method for UDP request sockets in the preread phase, allowing inspection of N bytes from the preread buffer without consuming them. Updates the UDP socket implementation, metatable, and state tracking to support this feature. Adds documentation and comprehensive tests for UDP socket peek behavior, including error and edge cases. --- README.md | 2 + src/ngx_stream_lua_socket_udp.c | 184 +++++++++++++++++++++++++++++++- src/ngx_stream_lua_socket_udp.h | 2 + t/023-preread/req-socket-udp.t | 107 +++++++++++++++++++ 4 files changed, 294 insertions(+), 1 deletion(-) create mode 100644 t/023-preread/req-socket-udp.t diff --git a/README.md b/README.md index 3cdd5060..151d56fe 100644 --- a/README.md +++ b/README.md @@ -198,6 +198,8 @@ Acts as a `preread` phase handler and executes Lua code string specified in `lua (or packet in datagram mode). The Lua code may make [API calls](#nginx-api-for-lua) and is executed as a new spawned coroutine in an independent global environment (i.e. a sandbox). +Note: In preread, the raw request socket returned by `ngx.req.socket()` exposes `peek(N)` for both TCP and UDP servers. `peek(N)` returns exactly N bytes from the preread buffer without consuming them, yielding until enough data arrives or timing out according to preread settings. + It is possible to acquire the raw request socket using [ngx.req.socket](https://github.com/openresty/lua-nginx-module#ngxreqsocket) and receive data from or send data to the client. However, keep in mind that calling the `receive()` method of the request socket will consume the data from the buffer and such consumed data will not be seen by handlers diff --git a/src/ngx_stream_lua_socket_udp.c b/src/ngx_stream_lua_socket_udp.c index d2612f59..54386442 100644 --- a/src/ngx_stream_lua_socket_udp.c +++ b/src/ngx_stream_lua_socket_udp.c @@ -41,6 +41,9 @@ static int ngx_stream_lua_socket_udp_setpeername(lua_State *L); static int ngx_stream_lua_socket_udp_send(lua_State *L); static int ngx_stream_lua_socket_udp_receive(lua_State *L); static int ngx_stream_lua_socket_udp_settimeout(lua_State *L); +static int ngx_stream_lua_req_socket_udp_peek(lua_State *L); +static ngx_int_t ngx_stream_lua_req_socket_udp_peek_resume( + ngx_stream_lua_request_t *r); static void ngx_stream_lua_socket_udp_finalize(ngx_stream_lua_request_t *r, ngx_stream_lua_socket_udp_upstream_t *u); static int ngx_stream_lua_socket_udp_upstream_destroy(lua_State *L); @@ -140,7 +143,7 @@ ngx_stream_lua_inject_socket_udp_api(ngx_log_t *log, lua_State *L) /* udp downstream socket object metatable */ lua_pushlightuserdata(L, ngx_stream_lua_lightudata_mask( socket_udp_raw_req_socket_metatable_key)); - lua_createtable(L, 0 /* narr */, 4 /* nrec */); + lua_createtable(L, 0 /* narr */, 5 /* nrec */); lua_pushcfunction(L, ngx_stream_lua_socket_udp_send); lua_setfield(L, -2, "send"); @@ -151,6 +154,9 @@ ngx_stream_lua_inject_socket_udp_api(ngx_log_t *log, lua_State *L) lua_pushcfunction(L, ngx_stream_lua_socket_udp_settimeout); lua_setfield(L, -2, "settimeout"); /* ngx socket mt */ + lua_pushcfunction(L, ngx_stream_lua_req_socket_udp_peek); + lua_setfield(L, -2, "peek"); + lua_pushvalue(L, -1); lua_setfield(L, -2, "__index"); lua_rawset(L, LUA_REGISTRYINDEX); @@ -1122,6 +1128,8 @@ ngx_stream_lua_socket_udp_receive(lua_State *L) r->connection->buffer->pos, u->received); r->connection->buffer->pos += u->received; + u->read_consumed = 1; + ngx_stream_lua_socket_udp_handle_success(r, u); rc = NGX_OK; @@ -1269,6 +1277,180 @@ ngx_stream_lua_socket_udp_finalize(ngx_stream_lua_request_t *r, } +static int +ngx_stream_lua_req_socket_udp_peek(lua_State *L) +{ + ngx_stream_lua_request_t *r; + ngx_connection_t *c; + ngx_stream_lua_ctx_t *ctx; + ngx_stream_lua_loc_conf_t *llcf; + ngx_stream_lua_co_ctx_t *coctx; + int n; + lua_Integer bytes; + size_t size; + + ngx_stream_lua_socket_udp_upstream_t *u; + + r = ngx_stream_lua_get_req(L); + if (r == NULL) { + return luaL_error(L, "no request found"); + } + + ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module); + ngx_stream_lua_check_context(L, ctx, NGX_STREAM_LUA_CONTEXT_PREREAD); + + n = lua_gettop(L); + if (n != 2) { + return luaL_error(L, "expecting 2 arguments " + "(including the object), but got %d", n); + } + + luaL_checktype(L, 1, LUA_TTABLE); + + lua_rawgeti(L, 1, SOCKET_CTX_INDEX); + u = lua_touserdata(L, -1); + lua_pop(L, 1); + + if (u == NULL) { + llcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_lua_module); + + if (llcf->log_socket_errors) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "attempt to peek data on a closed socket: u:%p", u); + } + + lua_pushnil(L); + lua_pushliteral(L, "closed"); + return 2; + } + + if (u->request != r) { + return luaL_error(L, "bad request"); + } + + if (!u->raw_downstream) { + return luaL_error(L, "attempt to peek on a non-request socket"); + } + + if (u->read_consumed) { + return luaL_error(L, "attempt to peek on a consumed socket"); + } + + if (u->waiting) { + lua_pushnil(L); + lua_pushliteral(L, "socket busy"); + return 2; + } + + if (!lua_isnumber(L, 2)) { + return luaL_error(L, "argument must be a number"); + } + + bytes = lua_tointeger(L, 2); + if (bytes < 0) { + return luaL_argerror(L, 2, "bytes can not be negative"); + } + + if (bytes == 0) { + lua_pushliteral(L, ""); + return 1; + } + + u->length = (size_t) bytes; + + c = r->connection; + if (c->buffer != NULL) { + size = c->buffer->last - c->buffer->pos; + + if (size >= u->length) { + lua_pushlstring(L, (char *) c->buffer->pos, u->length); + return 1; + } + } + + /* not enough data in the preread buffer; yield */ + coctx = ctx->cur_co_ctx; + ngx_stream_lua_cleanup_pending_operation(coctx); + coctx->cleanup = ngx_stream_lua_coctx_cleanup; + coctx->data = u; + + ctx->downstream = u; + ctx->resume_handler = ngx_stream_lua_req_socket_udp_peek_resume; + ctx->peek_needs_more_data = 1; + u->co_ctx = coctx; + u->waiting = 1; + + return lua_yield(L, 0); +} + + +static ngx_int_t +ngx_stream_lua_req_socket_udp_peek_resume(ngx_stream_lua_request_t *r) +{ + lua_State *vm; + ngx_int_t rc; + ngx_uint_t nreqs; + ngx_connection_t *c; + ngx_stream_lua_ctx_t *ctx; + size_t size; + + ngx_stream_lua_socket_udp_upstream_t *u; + + ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0, + "stream lua udp req socket resuming peek"); + + ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module); + if (ctx == NULL) { + return NGX_ERROR; + } + + u = ctx->downstream; + c = r->connection; + vm = ngx_stream_lua_get_lua_vm(r, ctx); + nreqs = c->requests; + + size = c->buffer->last - c->buffer->pos; + + if (size < u->length) { + ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0, + "lua udp peek needs more data, returning NGX_AGAIN"); + + return ngx_stream_lua_run_posted_threads(c, vm, r, ctx, nreqs); + } + + ctx->resume_handler = ngx_stream_lua_wev_handler; + /* read handler might have been changed by core preread phase */ + r->connection->read->handler = ngx_stream_lua_request_handler; + + lua_pushlstring(u->co_ctx->co, (char *) c->buffer->pos, u->length); + + u->co_ctx->cleanup = NULL; + ctx->cur_co_ctx = u->co_ctx; + u->co_ctx = NULL; + ctx->peek_needs_more_data = 0; + u->waiting = 0; + + ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0, + "lua udp peek operation done, resuming lua thread"); + + rc = ngx_stream_lua_run_thread(vm, r, ctx, 1); + + ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0, + "lua run thread returned %d", rc); + + if (rc == NGX_AGAIN) { + return ngx_stream_lua_run_posted_threads(c, vm, r, ctx, nreqs); + } + + if (rc == NGX_DONE) { + ngx_stream_lua_finalize_request(r, NGX_DONE); + return ngx_stream_lua_run_posted_threads(c, vm, r, ctx, nreqs); + } + + return rc; +} + + static int ngx_stream_lua_socket_udp_upstream_destroy(lua_State *L) { diff --git a/src/ngx_stream_lua_socket_udp.h b/src/ngx_stream_lua_socket_udp.h index 8f5b9e95..89e7c4f7 100644 --- a/src/ngx_stream_lua_socket_udp.h +++ b/src/ngx_stream_lua_socket_udp.h @@ -62,12 +62,14 @@ struct ngx_stream_lua_socket_udp_upstream_s { ngx_err_t socket_errno; size_t received; /* for receive */ size_t recv_buf_size; + size_t length; /* for peek */ ngx_stream_lua_co_ctx_t *co_ctx; unsigned waiting:1; unsigned raw_downstream:1; + unsigned read_consumed:1; }; diff --git a/t/023-preread/req-socket-udp.t b/t/023-preread/req-socket-udp.t new file mode 100644 index 00000000..4b559cc9 --- /dev/null +++ b/t/023-preread/req-socket-udp.t @@ -0,0 +1,107 @@ +# vim:set ft= ts=4 sw=4 et fdm=marker: + +use Test::Nginx::Socket::Lua::Stream; + +repeat_each(2); + +plan tests => repeat_each() * (blocks() * 3); + +no_long_string(); +run_tests(); + +__DATA__ + +=== TEST 1: UDP reqsock:peek returns prefix without consuming +--- stream_server_config + preread_by_lua_block { + local sock, err = ngx.req.socket() + if not sock then + ngx.say("no sock: ", err) + return + end + + local p, perr = sock.peek and sock:peek(8) + ngx.say("peek:", p, ", err:", perr) + + local data, rerr = sock:receive() + ngx.say("recv:", data, ", err:", rerr) + } + content_by_lua return; +--- stream_request chop +hello world +--- stream_response +peek:hello wo, err:nil +recv:hello world, err:nil +--- no_error_log +[error] + + + +=== TEST 2: two consecutive peeks return same bytes +--- stream_server_config + preread_by_lua_block { + local sock = ngx.req.socket() + local p1 = sock:peek(8) + local p2 = sock:peek(8) + ngx.say(p1 .. "|" .. p2) + } + content_by_lua return; +--- stream_request chop +abcdefghijk +--- stream_response +abcdefgh|abcdefgh +--- no_error_log +[error] + + + +=== TEST 3: peek after receive() raises error +--- stream_server_config + preread_by_lua_block { + local sock = ngx.req.socket() + local d = sock:receive(3) + local ok, err = pcall(function() return sock:peek(1) end) + ngx.say("ok=", ok, ", err=", err) + } + content_by_lua return; +--- stream_request chop +xyz +--- stream_response_like +^ok=false, err=attempt to peek on a consumed socket +--- no_error_log +[error] + + + +=== TEST 4: timeout behavior (peek waits then times out) +--- SKIP: requires partial sends in harness +--- stream_server_config + preread_by_lua_block { + local sock = ngx.req.socket() + local ok, err = pcall(function() return sock:peek(8) end) + ngx.say("ok:", ok, ", err:", err) + } + content_by_lua return; +--- stream_request chop +hi +--- stream_response_like +^ok:false, err:.*timeout + + + +=== TEST 5: buffer full behavior (peek errors when exceeded) +--- SKIP: requires controlled preread_buffer_size in harness +--- stream_config + preread_buffer_size 8; +--- stream_server_config + preread_by_lua_block { + local sock = ngx.req.socket() + local ok, err = pcall(function() return sock:peek(16) end) + ngx.say("ok:", ok, ", err:", err) + } + content_by_lua return; +--- stream_request chop +abcdefghijklmnop +--- stream_response_like +^ok:false, err:.*buffer.* + From f355a0f36ca3b3d4c587f0292f837778ffa01c42 Mon Sep 17 00:00:00 2001 From: aaurizon <750048+aaurizon@users.noreply.github.com> Date: Wed, 27 Aug 2025 00:16:02 +0200 Subject: [PATCH 2/4] Fix UDP socket cleanup handler in peek function Replaces the coroutine cleanup handler from ngx_stream_lua_coctx_cleanup to ngx_stream_lua_udp_socket_cleanup in ngx_stream_lua_req_socket_udp_peek. This ensures proper cleanup specific to UDP socket operations when yielding due to insufficient preread buffer data. --- src/ngx_stream_lua_socket_udp.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ngx_stream_lua_socket_udp.c b/src/ngx_stream_lua_socket_udp.c index 54386442..b5c64c9c 100644 --- a/src/ngx_stream_lua_socket_udp.c +++ b/src/ngx_stream_lua_socket_udp.c @@ -1371,7 +1371,7 @@ ngx_stream_lua_req_socket_udp_peek(lua_State *L) /* not enough data in the preread buffer; yield */ coctx = ctx->cur_co_ctx; ngx_stream_lua_cleanup_pending_operation(coctx); - coctx->cleanup = ngx_stream_lua_coctx_cleanup; + coctx->cleanup = ngx_stream_lua_udp_socket_cleanup; coctx->data = u; ctx->downstream = u; From 38bdefc2a2e727b622ae4bd41ec169012540d160 Mon Sep 17 00:00:00 2001 From: aaurizon <750048+aaurizon@users.noreply.github.com> Date: Wed, 27 Aug 2025 00:39:23 +0200 Subject: [PATCH 3/4] Fix null pointer dereference in UDP socket peek Added a check for NULL buffer before calculating the size in ngx_stream_lua_req_socket_udp_peek_resume to prevent potential null pointer dereference. --- src/ngx_stream_lua_socket_udp.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/ngx_stream_lua_socket_udp.c b/src/ngx_stream_lua_socket_udp.c index b5c64c9c..ef541c87 100644 --- a/src/ngx_stream_lua_socket_udp.c +++ b/src/ngx_stream_lua_socket_udp.c @@ -1409,7 +1409,12 @@ ngx_stream_lua_req_socket_udp_peek_resume(ngx_stream_lua_request_t *r) vm = ngx_stream_lua_get_lua_vm(r, ctx); nreqs = c->requests; - size = c->buffer->last - c->buffer->pos; + if (c->buffer == NULL) { + size = 0; + + } else { + size = c->buffer->last - c->buffer->pos; + } if (size < u->length) { ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0, From f1f2b44ea402f5f95f39ad0c4c7f89fd02577bd7 Mon Sep 17 00:00:00 2001 From: aaurizon <750048+aaurizon@users.noreply.github.com> Date: Wed, 27 Aug 2025 00:42:25 +0200 Subject: [PATCH 4/4] Add debug logs for UDP socket peek in stream lua Introduces additional debug logging when the UDP socket peek() method is called and updates an existing log message for clarity. This helps with tracing and debugging UDP socket operations in the stream lua module. --- src/ngx_stream_lua_socket_udp.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/ngx_stream_lua_socket_udp.c b/src/ngx_stream_lua_socket_udp.c index ef541c87..dfa87414 100644 --- a/src/ngx_stream_lua_socket_udp.c +++ b/src/ngx_stream_lua_socket_udp.c @@ -1305,6 +1305,9 @@ ngx_stream_lua_req_socket_udp_peek(lua_State *L) "(including the object), but got %d", n); } + ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0, + "stream lua udp socket calling peek() method"); + luaL_checktype(L, 1, LUA_TTABLE); lua_rawgeti(L, 1, SOCKET_CTX_INDEX); @@ -1397,7 +1400,7 @@ ngx_stream_lua_req_socket_udp_peek_resume(ngx_stream_lua_request_t *r) ngx_stream_lua_socket_udp_upstream_t *u; ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0, - "stream lua udp req socket resuming peek"); + "stream lua udp socket resuming peek"); ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module); if (ctx == NULL) {