Skip to content

Commit bb28222

Browse files
authored
better support threads in precompile and jl_task_wait_empty (#52445)
Add a `nrunning` counter which identifies (when zero) when there is nothing running anymore. Allowing us to gate all tasks on all threads on reaching a quiescent state, not just thread 0. This should let us better support running precompile with threads (since we will be ensured that all of them are asleep in a consistent state before serialization tries to inspect the process state). We could additionally stop them afterwards to make sure there is no way for them to begin running, even if we forgot about some other event source, but that seems unnecessary paranoia for now. Note it is quite hard to encounter currently, as most places where precompile happens currently try to force the number of threads to 1. But this should become more relevant in the future as more threads are supported in more places. This also may help generally with being able to ensure the IO loop is running on at least one thread (as that is currently lacking in this PR and on master). And also help with being able to decide on a more advanced tree-wakeup strategy, as we start to track how many threads are in various states of running and sleeping, relative to the amount of work they find. Fixes #52435
1 parent 7b54ae7 commit bb28222

File tree

8 files changed

+125
-69
lines changed

8 files changed

+125
-69
lines changed

src/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ SRCS := \
4444
jltypes gf typemap smallintset ast builtins module interpreter symbol \
4545
dlload sys init task array genericmemory staticdata toplevel jl_uv datatype \
4646
simplevector runtime_intrinsics precompile jloptions mtarraylist \
47-
threading partr stackwalk gc gc-debug gc-pages gc-stacks gc-alloc-profiler method \
47+
threading scheduler stackwalk gc gc-debug gc-pages gc-stacks gc-alloc-profiler method \
4848
jlapi signal-handling safepoint timing subtype rtutils gc-heap-snapshot \
4949
crc32c APInt-C processor ircode opaque_closure codegen-stubs coverage runtime_ccall
5050

src/jl_uv.c

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -75,21 +75,26 @@ static void wait_empty_func(uv_timer_t *t)
7575
void jl_wait_empty_begin(void)
7676
{
7777
JL_UV_LOCK();
78-
if (wait_empty_worker.type != UV_TIMER && jl_io_loop) {
79-
// try to purge anything that is just waiting for cleanup
80-
jl_io_loop->stop_flag = 0;
81-
uv_run(jl_io_loop, UV_RUN_NOWAIT);
82-
uv_timer_init(jl_io_loop, &wait_empty_worker);
78+
if (jl_io_loop) {
79+
if (wait_empty_worker.type != UV_TIMER) {
80+
// try to purge anything that is just waiting for cleanup
81+
jl_io_loop->stop_flag = 0;
82+
uv_run(jl_io_loop, UV_RUN_NOWAIT);
83+
uv_timer_init(jl_io_loop, &wait_empty_worker);
84+
uv_unref((uv_handle_t*)&wait_empty_worker);
85+
}
86+
// make sure this is running
8387
uv_update_time(jl_io_loop);
8488
uv_timer_start(&wait_empty_worker, wait_empty_func, 10, 15000);
85-
uv_unref((uv_handle_t*)&wait_empty_worker);
8689
}
8790
JL_UV_UNLOCK();
8891
}
8992
void jl_wait_empty_end(void)
9093
{
9194
// n.b. caller must be holding jl_uv_mutex
92-
uv_close((uv_handle_t*)&wait_empty_worker, NULL);
95+
if (wait_empty_worker.type == UV_TIMER)
96+
// make sure this timer is stopped, but not destroyed in case the user calls jl_wait_empty_begin again
97+
uv_timer_stop(&wait_empty_worker);
9398
}
9499

95100

@@ -174,9 +179,12 @@ static void jl_uv_closeHandle(uv_handle_t *handle)
174179
ct->world_age = last_age;
175180
return;
176181
}
177-
if (handle == (uv_handle_t*)&signal_async || handle == (uv_handle_t*)&wait_empty_worker)
182+
if (handle == (uv_handle_t*)&wait_empty_worker)
183+
handle->type = UV_UNKNOWN_HANDLE;
184+
else if (handle == (uv_handle_t*)&signal_async)
178185
return;
179-
free(handle);
186+
else
187+
free(handle);
180188
}
181189

182190
static void jl_uv_flush_close_callback(uv_write_t *req, int status)

src/julia_threads.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -371,11 +371,11 @@ void jl_gc_safe_leave(jl_ptls_t ptls, int8_t state) JL_NOTSAFEPOINT_LEAVE; // th
371371
#endif
372372

373373
JL_DLLEXPORT void jl_gc_enable_finalizers(struct _jl_task_t *ct, int on);
374-
JL_DLLEXPORT void jl_gc_disable_finalizers_internal(void);
374+
JL_DLLEXPORT void jl_gc_disable_finalizers_internal(void) JL_NOTSAFEPOINT;
375375
JL_DLLEXPORT void jl_gc_enable_finalizers_internal(void);
376376
JL_DLLEXPORT void jl_gc_run_pending_finalizers(struct _jl_task_t *ct);
377377
extern JL_DLLEXPORT _Atomic(int) jl_gc_have_pending_finalizers;
378-
JL_DLLEXPORT int8_t jl_gc_is_in_finalizer(void);
378+
JL_DLLEXPORT int8_t jl_gc_is_in_finalizer(void) JL_NOTSAFEPOINT;
379379

380380
JL_DLLEXPORT void jl_wakeup_thread(int16_t tid);
381381

src/options.h

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -143,26 +143,6 @@
143143
#define MACHINE_EXCLUSIVE_NAME "JULIA_EXCLUSIVE"
144144
#define DEFAULT_MACHINE_EXCLUSIVE 0
145145

146-
// partr -- parallel tasks runtime options ------------------------------------
147-
148-
// multiq
149-
// number of heaps = MULTIQ_HEAP_C * nthreads
150-
#define MULTIQ_HEAP_C 4
151-
// how many in each heap
152-
#define MULTIQ_TASKS_PER_HEAP 129
153-
154-
// parfor
155-
// tasks = niters / (GRAIN_K * nthreads)
156-
#define GRAIN_K 4
157-
158-
// synchronization
159-
// narrivers = ((GRAIN_K * nthreads) ^ ARRIVERS_P) + 1
160-
// limit for number of recursive parfors
161-
#define ARRIVERS_P 2
162-
// nreducers = narrivers * REDUCERS_FRAC
163-
#define REDUCERS_FRAC 1
164-
165-
166146
// sanitizer defaults ---------------------------------------------------------
167147

168148
// Automatically enable MEMDEBUG and KEEP_BODIES for the sanitizers

src/precompile.c

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,12 @@ JL_DLLEXPORT void jl_write_compiler_output(void)
9393
return;
9494
}
9595

96-
jl_task_wait_empty();
96+
jl_task_wait_empty(); // wait for most work to finish (except possibly finalizers)
97+
jl_gc_collect(JL_GC_FULL);
98+
jl_gc_collect(JL_GC_INCREMENTAL); // sweep finalizers
99+
jl_task_t *ct = jl_current_task;
100+
jl_gc_enable_finalizers(ct, 0); // now disable finalizers, as they could schedule more work or make other unexpected changes to reachability
101+
jl_task_wait_empty(); // then make sure we are the only thread alive that could be running user code past here
97102

98103
if (!jl_module_init_order) {
99104
jl_printf(JL_STDERR, "WARNING: --output requested, but no modules defined during run\n");
@@ -184,6 +189,7 @@ JL_DLLEXPORT void jl_write_compiler_output(void)
184189
}
185190
}
186191
JL_GC_POP();
192+
jl_gc_enable_finalizers(ct, 1);
187193
}
188194

189195
#ifdef __cplusplus

src/partr.c renamed to src/scheduler.c

Lines changed: 95 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ static const int16_t sleeping = 1;
2929
// this thread is dead.
3030
static const int16_t sleeping_like_the_dead JL_UNUSED = 2;
3131

32+
// a running count of how many threads are currently not_sleeping
33+
// plus a running count of the number of in-flight wake-ups
34+
// n.b. this may temporarily exceed jl_n_threads
35+
static _Atomic(int) nrunning = 1;
36+
3237
// invariant: No thread is ever asleep unless sleep_check_state is sleeping (or we have a wakeup signal pending).
3338
// invariant: Any particular thread is not asleep unless that thread's sleep_check_state is sleeping.
3439
// invariant: The transition of a thread state to sleeping must be followed by a check that there wasn't work pending for it.
@@ -64,7 +69,7 @@ JL_DLLEXPORT int jl_set_task_tid(jl_task_t *task, int16_t tid) JL_NOTSAFEPOINT
6469
if (was == tid)
6570
return 1;
6671
if (was == -1)
67-
return jl_atomic_cmpswap(&task->tid, &was, tid);
72+
return jl_atomic_cmpswap(&task->tid, &was, tid) || was == tid;
6873
return 0;
6974
}
7075

@@ -180,6 +185,8 @@ void jl_threadfun(void *arg)
180185
jl_init_stack_limits(0, &stack_lo, &stack_hi);
181186
// warning: this changes `jl_current_task`, so be careful not to call that from this function
182187
jl_task_t *ct = jl_init_root_task(ptls, stack_lo, stack_hi);
188+
int wasrunning = jl_atomic_fetch_add_relaxed(&nrunning, 1);
189+
assert(wasrunning); (void)wasrunning;
183190
JL_GC_PROMISE_ROOTED(ct);
184191

185192
// wait for all threads
@@ -220,7 +227,7 @@ int jl_running_under_rr(int recheck)
220227

221228

222229
// sleep_check_after_threshold() -- if sleep_threshold ns have passed, return 1
223-
static int sleep_check_after_threshold(uint64_t *start_cycles)
230+
static int sleep_check_after_threshold(uint64_t *start_cycles) JL_NOTSAFEPOINT
224231
{
225232
JULIA_DEBUG_SLEEPWAKE( return 1 ); // hammer on the sleep/wake logic much harder
226233
/**
@@ -243,18 +250,31 @@ static int sleep_check_after_threshold(uint64_t *start_cycles)
243250
return 0;
244251
}
245252

253+
static int set_not_sleeping(jl_ptls_t ptls) JL_NOTSAFEPOINT
254+
{
255+
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) {
256+
if (jl_atomic_exchange_relaxed(&ptls->sleep_check_state, not_sleeping) != not_sleeping) {
257+
return 1;
258+
}
259+
}
260+
int wasrunning = jl_atomic_fetch_add_relaxed(&nrunning, -1); // consume in-flight wakeup
261+
assert(wasrunning > 1); (void)wasrunning;
262+
return 0;
263+
}
246264

247265
static int wake_thread(int16_t tid) JL_NOTSAFEPOINT
248266
{
249-
jl_ptls_t other = jl_atomic_load_relaxed(&jl_all_tls_states)[tid];
250-
int8_t state = sleeping;
251-
252-
if (jl_atomic_load_relaxed(&other->sleep_check_state) == sleeping) {
253-
if (jl_atomic_cmpswap_relaxed(&other->sleep_check_state, &state, not_sleeping)) {
254-
JL_PROBE_RT_SLEEP_CHECK_WAKE(other, state);
255-
uv_mutex_lock(&other->sleep_lock);
256-
uv_cond_signal(&other->wake_signal);
257-
uv_mutex_unlock(&other->sleep_lock);
267+
jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[tid];
268+
269+
if (jl_atomic_load_relaxed(&ptls2->sleep_check_state) != not_sleeping) {
270+
int8_t state = sleeping;
271+
if (jl_atomic_cmpswap_relaxed(&ptls2->sleep_check_state, &state, not_sleeping)) {
272+
int wasrunning = jl_atomic_fetch_add_relaxed(&nrunning, 1); // increment in-flight wakeup count
273+
assert(wasrunning); (void)wasrunning;
274+
JL_PROBE_RT_SLEEP_CHECK_WAKE(ptls2, state);
275+
uv_mutex_lock(&ptls2->sleep_lock);
276+
uv_cond_signal(&ptls2->wake_signal);
277+
uv_mutex_unlock(&ptls2->sleep_lock);
258278
return 1;
259279
}
260280
}
@@ -280,10 +300,14 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid) JL_NOTSAFEPOINT
280300
JULIA_DEBUG_SLEEPWAKE( wakeup_enter = cycleclock() );
281301
if (tid == self || tid == -1) {
282302
// we're already awake, but make sure we'll exit uv_run
303+
// and that nrunning is updated if this is now considered in-flight
283304
jl_ptls_t ptls = ct->ptls;
284-
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) == sleeping) {
285-
jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping);
286-
JL_PROBE_RT_SLEEP_CHECK_WAKEUP(ptls);
305+
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) {
306+
if (jl_atomic_exchange_relaxed(&ptls->sleep_check_state, not_sleeping) != not_sleeping) {
307+
int wasrunning = jl_atomic_fetch_add_relaxed(&nrunning, 1);
308+
assert(wasrunning); (void)wasrunning;
309+
JL_PROBE_RT_SLEEP_CHECK_WAKEUP(ptls);
310+
}
287311
}
288312
if (uvlock == ct)
289313
uv_stop(jl_global_event_loop());
@@ -360,8 +384,10 @@ void jl_task_wait_empty(void)
360384
// we are back from jl_task_get_next now
361385
ct->world_age = lastage;
362386
wait_empty = NULL;
363-
// TODO: move this lock acquire-release pair to the caller, so that we ensure new work
364-
// (from uv_unref objects) didn't unexpectedly get scheduled and start running behind our back
387+
// TODO: move this lock acquire to before the wait_empty return and the
388+
// unlock to the caller, so that we ensure new work (from uv_unref
389+
// objects) didn't unexpectedly get scheduled and start running behind
390+
// our back during the function return
365391
JL_UV_LOCK();
366392
jl_wait_empty_end();
367393
JL_UV_UNLOCK();
@@ -378,6 +404,7 @@ static int may_sleep(jl_ptls_t ptls) JL_NOTSAFEPOINT
378404
return jl_atomic_load_relaxed(&ptls->sleep_check_state) == sleeping;
379405
}
380406

407+
381408
extern _Atomic(unsigned) _threadedregion;
382409

383410
JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, jl_value_t *checkempty)
@@ -405,8 +432,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
405432
jl_fence(); // [^store_buffering_1]
406433
JL_PROBE_RT_SLEEP_CHECK_SLEEP(ptls);
407434
if (!check_empty(checkempty)) { // uses relaxed loads
408-
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) {
409-
jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
435+
if (set_not_sleeping(ptls)) {
410436
JL_PROBE_RT_SLEEP_CHECK_TASKQ_WAKE(ptls);
411437
}
412438
continue;
@@ -415,23 +441,20 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
415441
if (ptls != ct->ptls) {
416442
// sigh, a yield was detected, so let's go ahead and handle it anyway by starting over
417443
ptls = ct->ptls;
418-
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) {
419-
jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
444+
if (set_not_sleeping(ptls)) {
420445
JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
421446
}
422447
if (task)
423448
return task;
424449
continue;
425450
}
426451
if (task) {
427-
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) {
428-
jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
452+
if (set_not_sleeping(ptls)) {
429453
JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
430454
}
431455
return task;
432456
}
433457

434-
435458
// IO is always permitted, but outside a threaded region, only
436459
// thread 0 will process messages.
437460
// Inside a threaded region, any thread can listen for IO messages,
@@ -485,41 +508,64 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
485508
// right back to sleep on the individual wake signal to let
486509
// them take it from us without conflict.
487510
if (active || !may_sleep(ptls)) {
511+
if (set_not_sleeping(ptls)) {
512+
JL_PROBE_RT_SLEEP_CHECK_UV_WAKE(ptls);
513+
}
488514
start_cycles = 0;
489515
continue;
490516
}
491517
if (!enter_eventloop && !jl_atomic_load_relaxed(&_threadedregion) && ptls->tid == 0) {
492518
// thread 0 is the only thread permitted to run the event loop
493519
// so it needs to stay alive, just spin-looping if necessary
494-
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) {
495-
jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
520+
if (set_not_sleeping(ptls)) {
496521
JL_PROBE_RT_SLEEP_CHECK_UV_WAKE(ptls);
497522
}
498523
start_cycles = 0;
499524
continue;
500525
}
501526
}
502527

528+
// any thread which wants us running again will have to observe
529+
// sleep_check_state==sleeping and increment nrunning for us
530+
int wasrunning = jl_atomic_fetch_add_relaxed(&nrunning, -1);
531+
assert(wasrunning);
532+
if (wasrunning == 1) {
533+
// This was the last running thread, and there is no thread with !may_sleep
534+
// so make sure tid 0 is notified to check wait_empty
535+
// TODO: this also might be a good time to check again that
536+
// libuv's queue is truly empty, instead of during delete_thread
537+
if (ptls->tid != 0) {
538+
uv_mutex_lock(&ptls->sleep_lock);
539+
uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock);
540+
uv_mutex_unlock(&ptls->sleep_lock);
541+
}
542+
}
543+
503544
// the other threads will just wait for an individual wake signal to resume
504545
JULIA_DEBUG_SLEEPWAKE( ptls->sleep_enter = cycleclock() );
505546
int8_t gc_state = jl_gc_safe_enter(ptls);
506547
uv_mutex_lock(&ptls->sleep_lock);
507548
while (may_sleep(ptls)) {
508-
if (ptls->tid == 0 && wait_empty) {
509-
task = wait_empty;
510-
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) {
511-
jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
512-
JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
513-
}
549+
task = wait_empty;
550+
if (ptls->tid == 0 && task && jl_atomic_load_relaxed(&nrunning) == 0) {
551+
wasrunning = jl_atomic_fetch_add_relaxed(&nrunning, 1);
552+
assert(!wasrunning);
553+
wasrunning = !set_not_sleeping(ptls);
554+
assert(!wasrunning);
555+
JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
556+
if (!ptls->finalizers_inhibited)
557+
ptls->finalizers_inhibited++; // this annoyingly is rather sticky (we should like to reset it at the end of jl_task_wait_empty)
514558
break;
515559
}
560+
// else should we warn the user of certain deadlock here if tid == 0 && nrunning == 0?
516561
uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock);
517562
}
518563
assert(jl_atomic_load_relaxed(&ptls->sleep_check_state) == not_sleeping);
564+
assert(jl_atomic_load_relaxed(&nrunning));
565+
start_cycles = 0;
519566
uv_mutex_unlock(&ptls->sleep_lock);
520567
JULIA_DEBUG_SLEEPWAKE( ptls->sleep_leave = cycleclock() );
521568
jl_gc_safe_leave(ptls, gc_state); // contains jl_gc_safepoint
522-
start_cycles = 0;
523569
if (task) {
524570
assert(task == wait_empty);
525571
wait_empty = NULL;
@@ -533,6 +579,23 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
533579
}
534580
}
535581

582+
void scheduler_delete_thread(jl_ptls_t ptls) JL_NOTSAFEPOINT
583+
{
584+
if (jl_atomic_exchange_relaxed(&ptls->sleep_check_state, sleeping_like_the_dead) != sleeping) {
585+
int wasrunning = jl_atomic_fetch_add_relaxed(&nrunning, -1);
586+
if (wasrunning == 1) {
587+
jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[0];
588+
// This was the last running thread, and there is no thread with !may_sleep
589+
// so make sure tid 0 is notified to check wait_empty
590+
uv_mutex_lock(&ptls2->sleep_lock);
591+
uv_cond_signal(&ptls2->wake_signal);
592+
uv_mutex_unlock(&ptls2->sleep_lock);
593+
}
594+
}
595+
jl_fence();
596+
jl_wakeup_thread(0); // force thread 0 to see that we do not have the IO lock (and am dead)
597+
}
598+
536599
#ifdef __cplusplus
537600
}
538601
#endif

src/staticdata.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -643,7 +643,7 @@ static int jl_needs_serialization(jl_serializer_state *s, jl_value_t *v) JL_NOTS
643643
else if (jl_typetagis(v, jl_uint8_tag << 4)) {
644644
return 0;
645645
}
646-
else if (jl_typetagis(v, jl_task_tag << 4)) {
646+
else if (v == (jl_value_t*)s->ptls->root_task) {
647647
return 0;
648648
}
649649

0 commit comments

Comments
 (0)