Commit d3e7b45

vtjnash authored and KristofferC committed

add try/catch around scheduler to reset sleep state (#54721)

Fixes #54700. Mostly just an indentation change, so recommend viewing with whitespace hidden (or if backporting).

(cherry picked from commit b1e5a86)

1 parent 0211c83 · commit d3e7b45

File tree

2 files changed: +133, -122 lines

src/julia.h
src/scheduler.c


src/julia.h

Lines changed: 2 additions & 2 deletions

@@ -2339,8 +2339,8 @@ extern int had_exception;
     __eh_ct = jl_current_task; \
     size_t __excstack_state = jl_excstack_state(__eh_ct); \
     jl_enter_handler(__eh_ct, &__eh); \
-    if (1)
-        /* TRY BLOCK; */
+    for (i__try=1; i__try; i__try=0)
+        /* TRY BLOCK; */
 #define JL_CATCH \
     if (!had_exception) \
         jl_eh_restore_state_noexcept(__eh_ct, &__eh); \
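Why this macro change matters: this hunk sits in the static-analyzer definition of JL_TRY (note the `had_exception` extern), which models the macro without setjmp. With the old `if (1)` form, a `continue` inside the try body would appear to restart the enclosing scheduler loop directly; rewriting the header as a degenerate for-loop makes `continue` mean "fall out of the try block through the loop's update expression", matching the `continue; // jump to JL_CATCH` lines in the scheduler diff below. A minimal standalone sketch of the trick, using hypothetical MY_TRY/MY_CATCH macros rather than Julia's actual definitions:

#include <setjmp.h>
#include <stdio.h>

/* Hypothetical macros shaped like JL_TRY/JL_CATCH: because the try body
 * is the body of a for-loop, a `continue` inside it binds to this loop
 * and exits the block in a controlled way, rather than restarting the
 * caller's own surrounding loop. */
#define MY_TRY                                  \
    jmp_buf eh;                                 \
    int i__try;                                 \
    if (!setjmp(eh))                            \
        for (i__try = 1; i__try; i__try = 0)
#define MY_CATCH else

int main(void)
{
    for (int attempt = 0; attempt < 2; attempt++) {
        MY_TRY {
            printf("attempt %d: in try\n", attempt);
            if (attempt == 0)
                continue; // leaves MY_TRY, not the outer for-loop
            printf("attempt %d: end of try\n", attempt);
        }
        MY_CATCH {
            printf("attempt %d: caught\n", attempt); // reached only via longjmp(eh, 1)
        }
        printf("attempt %d: after try/catch\n", attempt);
    }
    return 0;
}

Both iterations print the trailing "after try/catch" line: the `continue` on attempt 0 falls out of MY_TRY and execution resumes after MY_CATCH, which is exactly the control flow the scheduler relies on below.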

src/scheduler.c

Lines changed: 131 additions & 120 deletions

@@ -459,140 +459,151 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
                 }
                 continue;
             }
-            task = get_next_task(trypoptask, q); // note: this should not yield
-            if (ptls != ct->ptls) {
-                // sigh, a yield was detected, so let's go ahead and handle it anyway by starting over
-                ptls = ct->ptls;
-                if (set_not_sleeping(ptls)) {
-                    JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
-                }
-                if (task)
-                    return task;
-                continue;
-            }
-            if (task) {
-                if (set_not_sleeping(ptls)) {
-                    JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
-                }
-                return task;
-            }
+            volatile int isrunning = 1;
+            JL_TRY {
+                task = get_next_task(trypoptask, q); // note: this should not yield
+                if (ptls != ct->ptls) {
+                    // sigh, a yield was detected, so let's go ahead and handle it anyway by starting over
+                    ptls = ct->ptls;
+                    if (set_not_sleeping(ptls)) {
+                        JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
+                    }
+                    continue; // jump to JL_CATCH
+                }
+                if (task) {
+                    if (set_not_sleeping(ptls)) {
+                        JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
+                    }
+                    continue; // jump to JL_CATCH
+                }

-            // IO is always permitted, but outside a threaded region, only
-            // thread 0 will process messages.
-            // Inside a threaded region, any thread can listen for IO messages,
-            // and one thread should win this race and watch the event loop,
-            // but we bias away from idle threads getting parked here.
-            //
-            // The reason this works is somewhat convoluted, and closely tied to [^store_buffering_1]:
-            // - After decrementing _threadedregion, the thread is required to
-            //   call jl_wakeup_thread(0), that will kick out any thread who is
-            //   already there, and then eventually thread 0 will get here.
-            // - Inside a _threadedregion, there must exist at least one
-            //   thread that has a happens-before relationship on the libuv lock
-            //   before reaching this decision point in the code who will see
-            //   the lock as unlocked and thus must win this race here.
-            int uvlock = 0;
-            if (jl_atomic_load_relaxed(&_threadedregion)) {
-                uvlock = jl_mutex_trylock(&jl_uv_mutex);
-            }
-            else if (ptls->tid == 0) {
-                uvlock = 1;
-                JL_UV_LOCK();
-            }
-            else {
-                // Since we might have started some IO work, we might need
-                // to ensure tid = 0 will go watch that new event source.
-                // If trylock would have succeeded, that may have been our
-                // responsibility, so need to make sure thread 0 will take care
-                // of us.
-                if (jl_atomic_load_relaxed(&jl_uv_mutex.owner) == NULL) // aka trylock
-                    jl_wakeup_thread(0);
-            }
-            if (uvlock) {
-                int enter_eventloop = may_sleep(ptls);
-                int active = 0;
-                if (jl_atomic_load_relaxed(&jl_uv_n_waiters) != 0)
-                    // if we won the race against someone who actually needs
-                    // the lock to do real work, we need to let them have it instead
-                    enter_eventloop = 0;
-                if (enter_eventloop) {
-                    uv_loop_t *loop = jl_global_event_loop();
-                    loop->stop_flag = 0;
-                    JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_enter = cycleclock() );
-                    active = uv_run(loop, UV_RUN_ONCE);
-                    JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_leave = cycleclock() );
-                    jl_gc_safepoint();
-                }
-                JL_UV_UNLOCK();
-                // optimization: check again first if we may have work to do.
-                // Otherwise we got a spurious wakeup since some other thread
-                // that just wanted to steal libuv from us. We will just go
-                // right back to sleep on the individual wake signal to let
-                // them take it from us without conflict.
-                if (active || !may_sleep(ptls)) {
-                    if (set_not_sleeping(ptls)) {
-                        JL_PROBE_RT_SLEEP_CHECK_UV_WAKE(ptls);
-                    }
-                    start_cycles = 0;
-                    continue;
-                }
-                if (!enter_eventloop && !jl_atomic_load_relaxed(&_threadedregion) && ptls->tid == 0) {
-                    // thread 0 is the only thread permitted to run the event loop
-                    // so it needs to stay alive, just spin-looping if necessary
-                    if (set_not_sleeping(ptls)) {
-                        JL_PROBE_RT_SLEEP_CHECK_UV_WAKE(ptls);
-                    }
-                    start_cycles = 0;
-                    continue;
-                }
-            }
+                // IO is always permitted, but outside a threaded region, only
+                // thread 0 will process messages.
+                // Inside a threaded region, any thread can listen for IO messages,
+                // and one thread should win this race and watch the event loop,
+                // but we bias away from idle threads getting parked here.
+                //
+                // The reason this works is somewhat convoluted, and closely tied to [^store_buffering_1]:
+                // - After decrementing _threadedregion, the thread is required to
+                //   call jl_wakeup_thread(0), that will kick out any thread who is
+                //   already there, and then eventually thread 0 will get here.
+                // - Inside a _threadedregion, there must exist at least one
+                //   thread that has a happens-before relationship on the libuv lock
+                //   before reaching this decision point in the code who will see
+                //   the lock as unlocked and thus must win this race here.
+                int uvlock = 0;
+                if (jl_atomic_load_relaxed(&_threadedregion)) {
+                    uvlock = jl_mutex_trylock(&jl_uv_mutex);
+                }
+                else if (ptls->tid == 0) {
+                    uvlock = 1;
+                    JL_UV_LOCK();
+                }
+                else {
+                    // Since we might have started some IO work, we might need
+                    // to ensure tid = 0 will go watch that new event source.
+                    // If trylock would have succeeded, that may have been our
+                    // responsibility, so need to make sure thread 0 will take care
+                    // of us.
+                    if (jl_atomic_load_relaxed(&jl_uv_mutex.owner) == NULL) // aka trylock
+                        jl_wakeup_thread(0);
+                }
+                if (uvlock) {
+                    int enter_eventloop = may_sleep(ptls);
+                    int active = 0;
+                    if (jl_atomic_load_relaxed(&jl_uv_n_waiters) != 0)
+                        // if we won the race against someone who actually needs
+                        // the lock to do real work, we need to let them have it instead
+                        enter_eventloop = 0;
+                    if (enter_eventloop) {
+                        uv_loop_t *loop = jl_global_event_loop();
+                        loop->stop_flag = 0;
+                        JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_enter = cycleclock() );
+                        active = uv_run(loop, UV_RUN_ONCE);
+                        JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_leave = cycleclock() );
+                        jl_gc_safepoint();
+                    }
+                    JL_UV_UNLOCK();
+                    // optimization: check again first if we may have work to do.
+                    // Otherwise we got a spurious wakeup since some other thread
+                    // that just wanted to steal libuv from us. We will just go
+                    // right back to sleep on the individual wake signal to let
+                    // them take it from us without conflict.
+                    if (active || !may_sleep(ptls)) {
+                        if (set_not_sleeping(ptls)) {
+                            JL_PROBE_RT_SLEEP_CHECK_UV_WAKE(ptls);
+                        }
+                        start_cycles = 0;
+                        continue; // jump to JL_CATCH
+                    }
+                    if (!enter_eventloop && !jl_atomic_load_relaxed(&_threadedregion) && ptls->tid == 0) {
+                        // thread 0 is the only thread permitted to run the event loop
+                        // so it needs to stay alive, just spin-looping if necessary
+                        if (set_not_sleeping(ptls)) {
+                            JL_PROBE_RT_SLEEP_CHECK_UV_WAKE(ptls);
+                        }
+                        start_cycles = 0;
+                        continue; // jump to JL_CATCH
+                    }
+                }

-            // any thread which wants us running again will have to observe
-            // sleep_check_state==sleeping and increment nrunning for us
-            int wasrunning = jl_atomic_fetch_add_relaxed(&nrunning, -1);
-            assert(wasrunning);
-            if (wasrunning == 1) {
-                // This was the last running thread, and there is no thread with !may_sleep
-                // so make sure tid 0 is notified to check wait_empty
-                // TODO: this also might be a good time to check again that
-                // libuv's queue is truly empty, instead of during delete_thread
-                if (ptls->tid != 0) {
-                    uv_mutex_lock(&ptls->sleep_lock);
-                    uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock);
-                    uv_mutex_unlock(&ptls->sleep_lock);
-                }
-            }
+                // any thread which wants us running again will have to observe
+                // sleep_check_state==sleeping and increment nrunning for us
+                int wasrunning = jl_atomic_fetch_add_relaxed(&nrunning, -1);
+                assert(wasrunning);
+                isrunning = 0;
+                if (wasrunning == 1) {
+                    // This was the last running thread, and there is no thread with !may_sleep
+                    // so make sure tid 0 is notified to check wait_empty
+                    // TODO: this also might be a good time to check again that
+                    // libuv's queue is truly empty, instead of during delete_thread
+                    if (ptls->tid != 0) {
+                        uv_mutex_lock(&ptls->sleep_lock);
+                        uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock);
+                        uv_mutex_unlock(&ptls->sleep_lock);
+                    }
+                }

-            // the other threads will just wait for an individual wake signal to resume
-            JULIA_DEBUG_SLEEPWAKE( ptls->sleep_enter = cycleclock() );
-            int8_t gc_state = jl_gc_safe_enter(ptls);
-            uv_mutex_lock(&ptls->sleep_lock);
-            while (may_sleep(ptls)) {
-                task = wait_empty;
-                if (ptls->tid == 0 && task && jl_atomic_load_relaxed(&nrunning) == 0) {
-                    wasrunning = jl_atomic_fetch_add_relaxed(&nrunning, 1);
-                    assert(!wasrunning);
-                    wasrunning = !set_not_sleeping(ptls);
-                    assert(!wasrunning);
-                    JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
-                    if (!ptls->finalizers_inhibited)
-                        ptls->finalizers_inhibited++; // this annoyingly is rather sticky (we should like to reset it at the end of jl_task_wait_empty)
-                    break;
-                }
-                // else should we warn the user of certain deadlock here if tid == 0 && nrunning == 0?
-                uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock);
-            }
-            assert(jl_atomic_load_relaxed(&ptls->sleep_check_state) == not_sleeping);
-            assert(jl_atomic_load_relaxed(&nrunning));
-            start_cycles = 0;
-            uv_mutex_unlock(&ptls->sleep_lock);
-            JULIA_DEBUG_SLEEPWAKE( ptls->sleep_leave = cycleclock() );
-            jl_gc_safe_leave(ptls, gc_state); // contains jl_gc_safepoint
-            if (task) {
-                assert(task == wait_empty);
-                wait_empty = NULL;
-                return task;
-            }
+                // the other threads will just wait for an individual wake signal to resume
+                JULIA_DEBUG_SLEEPWAKE( ptls->sleep_enter = cycleclock() );
+                int8_t gc_state = jl_gc_safe_enter(ptls);
+                uv_mutex_lock(&ptls->sleep_lock);
+                while (may_sleep(ptls)) {
+                    task = wait_empty;
+                    if (ptls->tid == 0 && task && jl_atomic_load_relaxed(&nrunning) == 0) {
+                        wasrunning = jl_atomic_fetch_add_relaxed(&nrunning, 1);
+                        assert(!wasrunning);
+                        wasrunning = !set_not_sleeping(ptls);
+                        assert(!wasrunning);
+                        JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
+                        if (!ptls->finalizers_inhibited)
+                            ptls->finalizers_inhibited++; // this annoyingly is rather sticky (we should like to reset it at the end of jl_task_wait_empty)
+                        break;
+                    }
+                    // else should we warn the user of certain deadlock here if tid == 0 && nrunning == 0?
+                    uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock);
+                }
+                assert(jl_atomic_load_relaxed(&ptls->sleep_check_state) == not_sleeping);
+                assert(jl_atomic_load_relaxed(&nrunning));
+                start_cycles = 0;
+                uv_mutex_unlock(&ptls->sleep_lock);
+                JULIA_DEBUG_SLEEPWAKE( ptls->sleep_leave = cycleclock() );
+                jl_gc_safe_leave(ptls, gc_state); // contains jl_gc_safepoint
+                if (task) {
+                    assert(task == wait_empty);
+                    wait_empty = NULL;
+                    continue;
+                }
+            }
+            JL_CATCH {
+                // probably SIGINT, but possibly a user mistake in trypoptask
+                if (!isrunning)
+                    jl_atomic_fetch_add_relaxed(&nrunning, 1);
+                set_not_sleeping(ptls);
+                jl_rethrow();
+            }
+            if (task)
+                return task;
         }
         else {
             // maybe check the kernel for new messages too
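The JL_CATCH bookkeeping above follows a standard setjmp/longjmp discipline: any counter or flag mutated inside the try block is mirrored in a `volatile` local (here `isrunning`), because a non-volatile local modified between setjmp() and longjmp() has an indeterminate value after the jump. The handler then uses that flag to undo exactly the state the exception interrupted before rethrowing. A self-contained sketch of the same pattern with hypothetical names (`might_throw`, a stand-in counter), not the runtime's actual code:

#include <setjmp.h>
#include <stdatomic.h>
#include <stdio.h>

static atomic_int nrunning = 1; // stand-in for the scheduler's running-thread count
static jmp_buf eh;

static void might_throw(int fail)
{
    if (fail)
        longjmp(eh, 1); // stand-in for an exception (e.g. SIGINT, or a fault in trypoptask)
}

int main(void)
{
    // volatile is required: isrunning is written between setjmp() and
    // longjmp(), so a plain local would be indeterminate in the handler.
    volatile int isrunning = 1;
    if (!setjmp(eh)) {
        atomic_fetch_add(&nrunning, -1); // "go to sleep": leave the running count
        isrunning = 0;
        might_throw(1);                  // exception escapes while "asleep"
        atomic_fetch_add(&nrunning, 1);  // normal wakeup path (not reached here)
        isrunning = 1;
    }
    else {
        // catch path: restore the count, as the JL_CATCH block above does
        // (it also calls set_not_sleeping(ptls) before jl_rethrow())
        if (!isrunning)
            atomic_fetch_add(&nrunning, 1);
    }
    printf("nrunning = %d\n", atomic_load(&nrunning)); // prints 1: balanced again
    return 0;
}

Without the catch-side restore, the decrement would leak on every exception thrown while the thread was between "asleep" and "awake", leaving the sleep state stuck, which is the failure mode this commit guards against (#54700).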
