Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ Acts as a `preread` phase handler and executes Lua code string specified in `lua
(or packet in datagram mode).
The Lua code may make [API calls](#nginx-api-for-lua) and is executed as a new spawned coroutine in an independent global environment (i.e. a sandbox).

Note: In preread, the raw request socket returned by `ngx.req.socket()` exposes `peek(N)` for both TCP and UDP servers. `peek(N)` returns exactly N bytes from the preread buffer without consuming them, yielding until enough data arrives or timing out according to preread settings.

It is possible to acquire the raw request socket using [ngx.req.socket](https://github.com/openresty/lua-nginx-module#ngxreqsocket)
and receive data from or send data to the client. However, keep in mind that calling the `receive()` method
of the request socket will consume the data from the buffer and such consumed data will not be seen by handlers
Expand Down
192 changes: 191 additions & 1 deletion src/ngx_stream_lua_socket_udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ static int ngx_stream_lua_socket_udp_setpeername(lua_State *L);
static int ngx_stream_lua_socket_udp_send(lua_State *L);
static int ngx_stream_lua_socket_udp_receive(lua_State *L);
static int ngx_stream_lua_socket_udp_settimeout(lua_State *L);
static int ngx_stream_lua_req_socket_udp_peek(lua_State *L);
static ngx_int_t ngx_stream_lua_req_socket_udp_peek_resume(
ngx_stream_lua_request_t *r);
static void ngx_stream_lua_socket_udp_finalize(ngx_stream_lua_request_t *r,
ngx_stream_lua_socket_udp_upstream_t *u);
static int ngx_stream_lua_socket_udp_upstream_destroy(lua_State *L);
Expand Down Expand Up @@ -140,7 +143,7 @@ ngx_stream_lua_inject_socket_udp_api(ngx_log_t *log, lua_State *L)
/* udp downstream socket object metatable */
lua_pushlightuserdata(L, ngx_stream_lua_lightudata_mask(
socket_udp_raw_req_socket_metatable_key));
lua_createtable(L, 0 /* narr */, 4 /* nrec */);
lua_createtable(L, 0 /* narr */, 5 /* nrec */);

lua_pushcfunction(L, ngx_stream_lua_socket_udp_send);
lua_setfield(L, -2, "send");
Expand All @@ -151,6 +154,9 @@ ngx_stream_lua_inject_socket_udp_api(ngx_log_t *log, lua_State *L)
lua_pushcfunction(L, ngx_stream_lua_socket_udp_settimeout);
lua_setfield(L, -2, "settimeout"); /* ngx socket mt */

lua_pushcfunction(L, ngx_stream_lua_req_socket_udp_peek);
lua_setfield(L, -2, "peek");

lua_pushvalue(L, -1);
lua_setfield(L, -2, "__index");
lua_rawset(L, LUA_REGISTRYINDEX);
Expand Down Expand Up @@ -1122,6 +1128,8 @@ ngx_stream_lua_socket_udp_receive(lua_State *L)
r->connection->buffer->pos, u->received);
r->connection->buffer->pos += u->received;

u->read_consumed = 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there are multiple udp packets in the same stream, will peak available in every packet?


ngx_stream_lua_socket_udp_handle_success(r, u);

rc = NGX_OK;
Expand Down Expand Up @@ -1269,6 +1277,188 @@ ngx_stream_lua_socket_udp_finalize(ngx_stream_lua_request_t *r,
}


static int
ngx_stream_lua_req_socket_udp_peek(lua_State *L)
{
ngx_stream_lua_request_t *r;
ngx_connection_t *c;
ngx_stream_lua_ctx_t *ctx;
ngx_stream_lua_loc_conf_t *llcf;
ngx_stream_lua_co_ctx_t *coctx;
int n;
lua_Integer bytes;
size_t size;

ngx_stream_lua_socket_udp_upstream_t *u;

r = ngx_stream_lua_get_req(L);
if (r == NULL) {
return luaL_error(L, "no request found");
}

ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
ngx_stream_lua_check_context(L, ctx, NGX_STREAM_LUA_CONTEXT_PREREAD);

n = lua_gettop(L);
if (n != 2) {
return luaL_error(L, "expecting 2 arguments "
"(including the object), but got %d", n);
}

ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
"stream lua udp socket calling peek() method");

luaL_checktype(L, 1, LUA_TTABLE);

lua_rawgeti(L, 1, SOCKET_CTX_INDEX);
u = lua_touserdata(L, -1);
lua_pop(L, 1);

if (u == NULL) {
llcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_lua_module);

if (llcf->log_socket_errors) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"attempt to peek data on a closed socket: u:%p", u);
}

lua_pushnil(L);
lua_pushliteral(L, "closed");
return 2;
}

if (u->request != r) {
return luaL_error(L, "bad request");
}

if (!u->raw_downstream) {
return luaL_error(L, "attempt to peek on a non-request socket");
}

if (u->read_consumed) {
return luaL_error(L, "attempt to peek on a consumed socket");
}

if (u->waiting) {
lua_pushnil(L);
lua_pushliteral(L, "socket busy");
return 2;
}

if (!lua_isnumber(L, 2)) {
return luaL_error(L, "argument must be a number");
}

bytes = lua_tointeger(L, 2);
if (bytes < 0) {
return luaL_argerror(L, 2, "bytes can not be negative");
}

if (bytes == 0) {
lua_pushliteral(L, "");
return 1;
}

u->length = (size_t) bytes;

c = r->connection;
if (c->buffer != NULL) {
size = c->buffer->last - c->buffer->pos;

if (size >= u->length) {
lua_pushlstring(L, (char *) c->buffer->pos, u->length);
return 1;
}
}

/* not enough data in the preread buffer; yield */
coctx = ctx->cur_co_ctx;
ngx_stream_lua_cleanup_pending_operation(coctx);
coctx->cleanup = ngx_stream_lua_udp_socket_cleanup;
coctx->data = u;

ctx->downstream = u;
ctx->resume_handler = ngx_stream_lua_req_socket_udp_peek_resume;
ctx->peek_needs_more_data = 1;
u->co_ctx = coctx;
u->waiting = 1;

return lua_yield(L, 0);
}


static ngx_int_t
ngx_stream_lua_req_socket_udp_peek_resume(ngx_stream_lua_request_t *r)
{
lua_State *vm;
ngx_int_t rc;
ngx_uint_t nreqs;
ngx_connection_t *c;
ngx_stream_lua_ctx_t *ctx;
size_t size;

ngx_stream_lua_socket_udp_upstream_t *u;

ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
"stream lua udp socket resuming peek");

ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
if (ctx == NULL) {
return NGX_ERROR;
}

u = ctx->downstream;
c = r->connection;
vm = ngx_stream_lua_get_lua_vm(r, ctx);
nreqs = c->requests;

if (c->buffer == NULL) {
size = 0;

} else {
size = c->buffer->last - c->buffer->pos;
}

if (size < u->length) {
ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
"lua udp peek needs more data, returning NGX_AGAIN");

return ngx_stream_lua_run_posted_threads(c, vm, r, ctx, nreqs);
}

ctx->resume_handler = ngx_stream_lua_wev_handler;
/* read handler might have been changed by core preread phase */
r->connection->read->handler = ngx_stream_lua_request_handler;

lua_pushlstring(u->co_ctx->co, (char *) c->buffer->pos, u->length);

u->co_ctx->cleanup = NULL;
ctx->cur_co_ctx = u->co_ctx;
u->co_ctx = NULL;
ctx->peek_needs_more_data = 0;
u->waiting = 0;

ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
"lua udp peek operation done, resuming lua thread");

rc = ngx_stream_lua_run_thread(vm, r, ctx, 1);

ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
"lua run thread returned %d", rc);

if (rc == NGX_AGAIN) {
return ngx_stream_lua_run_posted_threads(c, vm, r, ctx, nreqs);
}

if (rc == NGX_DONE) {
ngx_stream_lua_finalize_request(r, NGX_DONE);
return ngx_stream_lua_run_posted_threads(c, vm, r, ctx, nreqs);
}

return rc;
}


static int
ngx_stream_lua_socket_udp_upstream_destroy(lua_State *L)
{
Expand Down
2 changes: 2 additions & 0 deletions src/ngx_stream_lua_socket_udp.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,14 @@ struct ngx_stream_lua_socket_udp_upstream_s {
ngx_err_t socket_errno;
size_t received; /* for receive */
size_t recv_buf_size;
size_t length; /* for peek */

ngx_stream_lua_co_ctx_t *co_ctx;

unsigned waiting:1;

unsigned raw_downstream:1;
unsigned read_consumed:1;
};


Expand Down
107 changes: 107 additions & 0 deletions t/023-preread/req-socket-udp.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# vim:set ft= ts=4 sw=4 et fdm=marker:

use Test::Nginx::Socket::Lua::Stream;

repeat_each(2);

plan tests => repeat_each() * (blocks() * 3);

no_long_string();
run_tests();

__DATA__

=== TEST 1: UDP reqsock:peek returns prefix without consuming
--- stream_server_config
preread_by_lua_block {
local sock, err = ngx.req.socket()
if not sock then
ngx.say("no sock: ", err)
return
end

local p, perr = sock.peek and sock:peek(8)
ngx.say("peek:", p, ", err:", perr)

local data, rerr = sock:receive()
ngx.say("recv:", data, ", err:", rerr)
}
content_by_lua return;
--- stream_request chop
hello world
--- stream_response
peek:hello wo, err:nil
recv:hello world, err:nil
--- no_error_log
[error]



=== TEST 2: two consecutive peeks return same bytes
--- stream_server_config
preread_by_lua_block {
local sock = ngx.req.socket()
local p1 = sock:peek(8)
local p2 = sock:peek(8)
ngx.say(p1 .. "|" .. p2)
}
content_by_lua return;
--- stream_request chop
abcdefghijk
--- stream_response
abcdefgh|abcdefgh
--- no_error_log
[error]



=== TEST 3: peek after receive() raises error
--- stream_server_config
preread_by_lua_block {
local sock = ngx.req.socket()
local d = sock:receive(3)
local ok, err = pcall(function() return sock:peek(1) end)
ngx.say("ok=", ok, ", err=", err)
}
content_by_lua return;
--- stream_request chop
xyz
--- stream_response_like
^ok=false, err=attempt to peek on a consumed socket
--- no_error_log
[error]



=== TEST 4: timeout behavior (peek waits then times out)
--- SKIP: requires partial sends in harness
--- stream_server_config
preread_by_lua_block {
local sock = ngx.req.socket()
local ok, err = pcall(function() return sock:peek(8) end)
ngx.say("ok:", ok, ", err:", err)
}
content_by_lua return;
--- stream_request chop
hi
--- stream_response_like
^ok:false, err:.*timeout



=== TEST 5: buffer full behavior (peek errors when exceeded)
--- SKIP: requires controlled preread_buffer_size in harness
--- stream_config
preread_buffer_size 8;
--- stream_server_config
preread_by_lua_block {
local sock = ngx.req.socket()
local ok, err = pcall(function() return sock:peek(16) end)
ngx.say("ok:", ok, ", err:", err)
}
content_by_lua return;
--- stream_request chop
abcdefghijklmnop
--- stream_response_like
^ok:false, err:.*buffer.*

Loading