Commit af5d68f

io_uring/sqpoll: manage task_work privately
Decouple from task_work running, and cap the number of entries we process at the time. If we exceed that number, push remaining entries to a retry list that we'll process first next time.

We cap the number of entries to process at 8, which is fairly random. We just want to get enough per-ctx batching here, while not processing endlessly.

Since we manually run PF_IO_WORKER related task_work anyway as the task never exits to userspace, with this we no longer need to add an actual task_work item to the per-process list.

Signed-off-by: Jens Axboe <[email protected]>
1 parent 2708af1 commit af5d68f
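To make the capped-run-plus-retry-list idea concrete, here is a minimal standalone C sketch of the same pattern, separate from the kernel code: the struct work type, handle_capped(), and the cap of 8 are illustrative stand-ins, not io_uring symbols.

#include <stdio.h>
#include <stddef.h>

struct work {
	struct work *next;
	int id;
};

/*
 * Walk a singly linked list, handling at most max_entries nodes, and
 * return the unprocessed tail so the caller can retry it first next time.
 */
static struct work *handle_capped(struct work *node, unsigned int *count,
				  unsigned int max_entries)
{
	while (node && *count < max_entries) {
		struct work *next = node->next;

		printf("processing work %d\n", node->id);
		(*count)++;
		node = next;
	}
	/* anything left over becomes the caller's retry list */
	return node;
}

int main(void)
{
	struct work items[12];
	struct work *head = NULL, *retry;
	unsigned int count = 0;

	/* build a small list of pending work items, ids 0..11 */
	for (int i = 11; i >= 0; i--) {
		items[i].id = i;
		items[i].next = head;
		head = &items[i];
	}

	/* first pass: cap processing at 8 entries */
	retry = handle_capped(head, &count, 8);
	printf("processed %u, retry list %s\n", count,
	       retry ? "non-empty" : "empty");

	/* next pass: drain the retry list before taking new work */
	count = 0;
	handle_capped(retry, &count, 8);
	return 0;
}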

3 files changed: 82 additions, 17 deletions

io_uring/io_uring.c

Lines changed: 36 additions & 10 deletions
@@ -1173,7 +1173,14 @@ static void ctx_flush_and_put(struct io_ring_ctx *ctx, struct io_tw_state *ts)
 	percpu_ref_put(&ctx->refs);
 }
 
-static void handle_tw_list(struct llist_node *node, unsigned int *count)
+/*
+ * Run queued task_work, returning the number of entries processed in *count.
+ * If more entries than max_entries are available, stop processing once this
+ * is reached and return the rest of the list.
+ */
+struct llist_node *io_handle_tw_list(struct llist_node *node,
+				     unsigned int *count,
+				     unsigned int max_entries)
 {
 	struct io_ring_ctx *ctx = NULL;
 	struct io_tw_state ts = { };
@@ -1200,9 +1207,10 @@ static void handle_tw_list(struct llist_node *node, unsigned int *count)
 			ctx = NULL;
 			cond_resched();
 		}
-	} while (node);
+	} while (node && *count < max_entries);
 
 	ctx_flush_and_put(ctx, &ts);
+	return node;
 }
 
 /**
@@ -1247,27 +1255,41 @@ static __cold void io_fallback_tw(struct io_uring_task *tctx, bool sync)
 	}
 }
 
-void tctx_task_work(struct callback_head *cb)
+struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
+				      unsigned int max_entries,
+				      unsigned int *count)
 {
-	struct io_uring_task *tctx = container_of(cb, struct io_uring_task,
-						  task_work);
 	struct llist_node *node;
-	unsigned int count = 0;
 
 	if (unlikely(current->flags & PF_EXITING)) {
 		io_fallback_tw(tctx, true);
-		return;
+		return NULL;
 	}
 
 	node = llist_del_all(&tctx->task_list);
-	if (node)
-		handle_tw_list(llist_reverse_order(node), &count);
+	if (node) {
+		node = llist_reverse_order(node);
+		node = io_handle_tw_list(node, count, max_entries);
+	}
 
 	/* relaxed read is enough as only the task itself sets ->in_cancel */
 	if (unlikely(atomic_read(&tctx->in_cancel)))
 		io_uring_drop_tctx_refs(current);
 
-	trace_io_uring_task_work_run(tctx, count);
+	trace_io_uring_task_work_run(tctx, *count);
+	return node;
+}
+
+void tctx_task_work(struct callback_head *cb)
+{
+	struct io_uring_task *tctx;
+	struct llist_node *ret;
+	unsigned int count = 0;
+
+	tctx = container_of(cb, struct io_uring_task, task_work);
+	ret = tctx_task_work_run(tctx, UINT_MAX, &count);
+	/* can't happen */
+	WARN_ON_ONCE(ret);
 }
 
 static inline void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
@@ -1350,6 +1372,10 @@ static void io_req_normal_work_add(struct io_kiocb *req)
 	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
 		atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
 
+	/* SQPOLL doesn't need the task_work added, it'll run it itself */
+	if (ctx->flags & IORING_SETUP_SQPOLL)
+		return;
+
 	if (likely(!task_work_add(req->task, &tctx->task_work, ctx->notify_method)))
 		return;
 
io_uring/io_uring.h

Lines changed: 18 additions & 6 deletions
@@ -57,6 +57,8 @@ void io_queue_iowq(struct io_kiocb *req, struct io_tw_state *ts_dont_use);
 void io_req_task_complete(struct io_kiocb *req, struct io_tw_state *ts);
 void io_req_task_queue_fail(struct io_kiocb *req, int ret);
 void io_req_task_submit(struct io_kiocb *req, struct io_tw_state *ts);
+struct llist_node *io_handle_tw_list(struct llist_node *node, unsigned int *count, unsigned int max_entries);
+struct llist_node *tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries, unsigned int *count);
 void tctx_task_work(struct callback_head *cb);
 __cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd);
 int io_uring_alloc_task_context(struct task_struct *task,
@@ -275,6 +277,8 @@ static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
 
 static inline int io_run_task_work(void)
 {
+	bool ret = false;
+
 	/*
 	 * Always check-and-clear the task_work notification signal. With how
 	 * signaling works for task_work, we can find it set with nothing to
@@ -286,18 +290,26 @@ static inline int io_run_task_work(void)
 	 * PF_IO_WORKER never returns to userspace, so check here if we have
 	 * notify work that needs processing.
 	 */
-	if (current->flags & PF_IO_WORKER &&
-	    test_thread_flag(TIF_NOTIFY_RESUME)) {
-		__set_current_state(TASK_RUNNING);
-		resume_user_mode_work(NULL);
+	if (current->flags & PF_IO_WORKER) {
+		if (test_thread_flag(TIF_NOTIFY_RESUME)) {
+			__set_current_state(TASK_RUNNING);
+			resume_user_mode_work(NULL);
+		}
+		if (current->io_uring) {
+			unsigned int count = 0;
+
+			tctx_task_work_run(current->io_uring, UINT_MAX, &count);
+			if (count)
+				ret = true;
+		}
 	}
 	if (task_work_pending(current)) {
 		__set_current_state(TASK_RUNNING);
 		task_work_run();
-		return 1;
+		ret = true;
 	}
 
-	return 0;
+	return ret;
 }
 
 static inline bool io_task_work_pending(struct io_ring_ctx *ctx)

io_uring/sqpoll.c

Lines changed: 28 additions & 1 deletion
@@ -18,6 +18,7 @@
 #include "sqpoll.h"
 
 #define IORING_SQPOLL_CAP_ENTRIES_VALUE	8
+#define IORING_TW_CAP_ENTRIES_VALUE	8
 
 enum {
 	IO_SQ_THREAD_SHOULD_STOP = 0,
@@ -219,8 +220,31 @@ static bool io_sqd_handle_event(struct io_sq_data *sqd)
 	return did_sig || test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
 }
 
+/*
+ * Run task_work, processing the retry_list first. The retry_list holds
+ * entries that we passed on in the previous run, if we had more task_work
+ * than we were asked to process. Newly queued task_work isn't run until the
+ * retry list has been fully processed.
+ */
+static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries)
+{
+	struct io_uring_task *tctx = current->io_uring;
+	unsigned int count = 0;
+
+	if (*retry_list) {
+		*retry_list = io_handle_tw_list(*retry_list, &count, max_entries);
+		if (count >= max_entries)
+			return count;
+		max_entries -= count;
+	}
+
+	*retry_list = tctx_task_work_run(tctx, max_entries, &count);
+	return count;
+}
+
 static int io_sq_thread(void *data)
 {
+	struct llist_node *retry_list = NULL;
 	struct io_sq_data *sqd = data;
 	struct io_ring_ctx *ctx;
 	unsigned long timeout = 0;
@@ -257,7 +281,7 @@ static int io_sq_thread(void *data)
 			if (!sqt_spin && (ret > 0 || !wq_list_empty(&ctx->iopoll_list)))
 				sqt_spin = true;
 		}
-		if (io_run_task_work())
+		if (io_sq_tw(&retry_list, IORING_TW_CAP_ENTRIES_VALUE))
 			sqt_spin = true;
 
 		if (sqt_spin || !time_after(jiffies, timeout)) {
@@ -312,6 +336,9 @@ static int io_sq_thread(void *data)
 		timeout = jiffies + sqd->sq_thread_idle;
 	}
 
+	if (retry_list)
+		io_sq_tw(&retry_list, UINT_MAX);
+
 	io_uring_cancel_generic(true, sqd);
 	sqd->thread = NULL;
 	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
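For context, the SQPOLL mode this patch changes task_work handling for is requested from userspace at ring setup time. A minimal liburing-based sketch, assuming liburing is available; the queue depth and idle timeout below are arbitrary example values, not values implied by this commit:

#include <liburing.h>
#include <stdio.h>
#include <string.h>

int main(void)
{
	struct io_uring_params p = { };
	struct io_uring ring;
	int ret;

	/* ask the kernel to create an SQPOLL thread for this ring */
	p.flags = IORING_SETUP_SQPOLL;
	p.sq_thread_idle = 2000;	/* ms before the SQPOLL thread idles */

	/* may require elevated privileges on older kernels */
	ret = io_uring_queue_init_params(8, &ring, &p);
	if (ret < 0) {
		fprintf(stderr, "queue_init: %s\n", strerror(-ret));
		return 1;
	}

	/* submissions are now picked up by the kernel SQPOLL thread */
	io_uring_queue_exit(&ring);
	return 0;
}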
