From 3b61fad895e590e3ee42520cba1bc664e788f2c5 Mon Sep 17 00:00:00 2001 From: Kunshan Wang Date: Thu, 28 Dec 2023 22:00:47 +0800 Subject: [PATCH 01/11] WIP: Add ScheduleCollection when requesting GC --- src/mmtk.rs | 2 +- src/plan/gc_requester.rs | 29 +++++++++++------------------ src/scheduler/controller.rs | 16 +--------------- src/scheduler/scheduler.rs | 29 ++--------------------------- 4 files changed, 15 insertions(+), 61 deletions(-) diff --git a/src/mmtk.rs b/src/mmtk.rs index fa90270637..3d91f23ae8 100644 --- a/src/mmtk.rs +++ b/src/mmtk.rs @@ -148,7 +148,7 @@ impl MMTK { let state = Arc::new(GlobalState::default()); - let gc_requester = Arc::new(GCRequester::new()); + let gc_requester = Arc::new(GCRequester::new(scheduler.clone())); let gc_trigger = Arc::new(GCTrigger::new( options.clone(), diff --git a/src/plan/gc_requester.rs b/src/plan/gc_requester.rs index 2e5f518cb2..0bae785202 100644 --- a/src/plan/gc_requester.rs +++ b/src/plan/gc_requester.rs @@ -1,7 +1,9 @@ +use crate::scheduler::gc_work::ScheduleCollection; +use crate::scheduler::{GCWorkScheduler, WorkBucketStage}; use crate::vm::VMBinding; use std::marker::PhantomData; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Condvar, Mutex}; +use std::sync::{Arc, Mutex}; struct RequestSync { request_count: isize, @@ -12,27 +14,20 @@ struct RequestSync { /// and the GC coordinator thread waits for GC requests using this object. pub struct GCRequester { request_sync: Mutex, - request_condvar: Condvar, request_flag: AtomicBool, + scheduler: Arc>, phantom: PhantomData, } -// Clippy says we need this... 
-impl Default for GCRequester { - fn default() -> Self { - Self::new() - } -} - impl GCRequester { - pub fn new() -> Self { + pub fn new(scheduler: Arc>) -> Self { GCRequester { request_sync: Mutex::new(RequestSync { request_count: 0, last_request_count: -1, }), - request_condvar: Condvar::new(), request_flag: AtomicBool::new(false), + scheduler, phantom: PhantomData, } } @@ -46,7 +41,8 @@ impl GCRequester { if !self.request_flag.load(Ordering::Relaxed) { self.request_flag.store(true, Ordering::Relaxed); guard.request_count += 1; - self.request_condvar.notify_all(); + + self.schedule_collection(); } } @@ -56,11 +52,8 @@ impl GCRequester { drop(guard); } - pub fn wait_for_request(&self) { - let mut guard = self.request_sync.lock().unwrap(); - guard.last_request_count += 1; - while guard.last_request_count == guard.request_count { - guard = self.request_condvar.wait(guard).unwrap(); - } + fn schedule_collection(&self) { + // Add a ScheduleCollection work packet. It is the seed of other work packets. + self.scheduler.work_buckets[WorkBucketStage::Unconstrained].add(ScheduleCollection); } } diff --git a/src/scheduler/controller.rs b/src/scheduler/controller.rs index 3608f1ebfb..acc5426264 100644 --- a/src/scheduler/controller.rs +++ b/src/scheduler/controller.rs @@ -27,20 +27,6 @@ pub struct GCController { } impl GCController { - pub(crate) fn new( - mmtk: &'static MMTK, - requester: Arc>, - scheduler: Arc>, - coordinator_worker: GCWorker, - ) -> Box> { - Box::new(Self { - mmtk, - requester, - scheduler, - coordinator_worker, - }) - } - /// The main loop for the GC controller. pub fn run(&mut self, tls: VMWorkerThread) -> ! 
{ probe!(mmtk, gccontroller_run); @@ -50,7 +36,7 @@ impl GCController { loop { debug!("[STWController: Waiting for request...]"); - self.requester.wait_for_request(); + //self.requester.wait_for_request(); debug!("[STWController: Request recieved.]"); self.do_gc_until_completion_traced(); diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index 5c71296eca..46357e4a46 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -6,9 +6,8 @@ use crate::mmtk::MMTK; use crate::util::opaque_pointer::*; use crate::util::options::AffinityKind; use crate::util::rust_util::array_from_fn; -use crate::vm::Collection; -use crate::vm::{GCThreadContext, VMBinding}; -use crossbeam::deque::{self, Steal}; +use crate::vm::VMBinding; +use crossbeam::deque::Steal; use enum_map::{Enum, EnumMap}; use std::collections::HashMap; use std::sync::Arc; @@ -18,8 +17,6 @@ pub struct GCWorkScheduler { pub work_buckets: EnumMap>, /// Workers pub(crate) worker_group: Arc>, - /// The shared part of the GC worker object of the controller thread - coordinator_worker_shared: Arc>, /// Condition Variable for worker synchronization pub(crate) worker_monitor: Arc, /// How to assign the affinity of each GC thread. Specified by the user. @@ -70,7 +67,6 @@ impl GCWorkScheduler { Arc::new(Self { work_buckets, worker_group, - coordinator_worker_shared, worker_monitor, affinity, }) @@ -82,23 +78,6 @@ impl GCWorkScheduler { /// Create GC threads, including the controller thread and all workers. pub fn spawn_gc_threads(self: &Arc, mmtk: &'static MMTK, tls: VMThread) { - // Spawn the controller thread. 
- let coordinator_worker = GCWorker::new( - mmtk, - usize::MAX, - self.clone(), - true, - self.coordinator_worker_shared.clone(), - deque::Worker::new_fifo(), - ); - let gc_controller = GCController::new( - mmtk, - mmtk.gc_requester.clone(), - self.clone(), - coordinator_worker, - ); - VM::VMCollection::spawn_gc_thread(tls, GCThreadContext::::Controller(gc_controller)); - self.worker_group.spawn(mmtk, tls) } @@ -379,8 +358,6 @@ impl GCWorkScheduler { let worker_stat = worker.borrow_stat(); worker_stat.enable(); } - let coordinator_worker_stat = self.coordinator_worker_shared.borrow_stat(); - coordinator_worker_stat.enable(); } pub fn statistics(&self) -> HashMap { @@ -389,8 +366,6 @@ impl GCWorkScheduler { let worker_stat = worker.borrow_stat(); summary.merge(&worker_stat); } - let coordinator_worker_stat = self.coordinator_worker_shared.borrow_stat(); - summary.merge(&coordinator_worker_stat); summary.harness_stat() } From 18f7bb610f9aac268518951f718c4b19122e12d4 Mon Sep 17 00:00:00 2001 From: Kunshan Wang Date: Thu, 28 Dec 2023 23:05:05 +0800 Subject: [PATCH 02/11] WIP: Rework GC triggering --- src/plan/gc_requester.rs | 39 ++++++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/src/plan/gc_requester.rs b/src/plan/gc_requester.rs index 0bae785202..43f7c5a5ac 100644 --- a/src/plan/gc_requester.rs +++ b/src/plan/gc_requester.rs @@ -6,12 +6,11 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; struct RequestSync { - request_count: isize, - last_request_count: isize, + /// Is GC scheduled (but not finished)? + gc_scheduled: bool, } -/// GC requester. This object allows other threads to request (trigger) GC, -/// and the GC coordinator thread waits for GC requests using this object. +/// This data structure lets mutators trigger GC, and may schedule collection when appropriate. 
pub struct GCRequester { request_sync: Mutex, request_flag: AtomicBool, @@ -23,8 +22,7 @@ impl GCRequester { pub fn new(scheduler: Arc>) -> Self { GCRequester { request_sync: Mutex::new(RequestSync { - request_count: 0, - last_request_count: -1, + gc_scheduled: true, }), request_flag: AtomicBool::new(false), scheduler, @@ -38,22 +36,37 @@ impl GCRequester { } let mut guard = self.request_sync.lock().unwrap(); + // Note: This is the double-checked locking algorithm. + // The load has the `Relaxed` order instead of `Acquire` because we only use the flag to + // remove successive requests, but we don't use it to synchronize other data fields. if !self.request_flag.load(Ordering::Relaxed) { self.request_flag.store(true, Ordering::Relaxed); - guard.request_count += 1; - self.schedule_collection(); + self.try_schedule_collection(&mut *guard); } } pub fn clear_request(&self) { - let guard = self.request_sync.lock().unwrap(); + let _guard = self.request_sync.lock().unwrap(); self.request_flag.store(false, Ordering::Relaxed); - drop(guard); } - fn schedule_collection(&self) { - // Add a ScheduleCollection work packet. It is the seed of other work packets. - self.scheduler.work_buckets[WorkBucketStage::Unconstrained].add(ScheduleCollection); + pub fn on_gc_finished(&self) { + let mut guard = self.request_sync.lock().unwrap(); + guard.gc_scheduled = false; + + self.try_schedule_collection(&mut *guard); + } + + fn try_schedule_collection(&self, sync: &mut RequestSync) { + if self.request_flag.load(Ordering::Relaxed) && !sync.gc_scheduled { + // Add a ScheduleCollection work packet. It is the seed of other work packets. + self.scheduler.work_buckets[WorkBucketStage::Unconstrained].add(ScheduleCollection); + + sync.gc_scheduled = true; + + // Note: We do not clear `request_flag` now. It will be cleared by `clear_request` + // after all mutators have stopped. 
+ } } } From ffe8f33342f167c1662094c22e76198903adac61 Mon Sep 17 00:00:00 2001 From: Kunshan Wang Date: Tue, 2 Jan 2024 19:36:30 +0800 Subject: [PATCH 03/11] Working --- src/global_state.rs | 7 ++ src/plan/gc_requester.rs | 30 ++++++-- src/scheduler/controller.rs | 14 ++-- src/scheduler/gc_work.rs | 10 +++ src/scheduler/scheduler.rs | 137 ++++++++++++++++++++++++++++++++++-- src/scheduler/worker.rs | 114 ++++++++---------------------- 6 files changed, 210 insertions(+), 102 deletions(-) diff --git a/src/global_state.rs b/src/global_state.rs index e066811557..2340957eae 100644 --- a/src/global_state.rs +++ b/src/global_state.rs @@ -1,5 +1,8 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Mutex; +use std::time::Instant; + +use atomic_refcell::AtomicRefCell; /// This stores some global states for an MMTK instance. /// Some MMTK components like plans and allocators may keep an reference to the struct, and can access it. @@ -40,6 +43,9 @@ pub struct GlobalState { pub(crate) stacks_prepared: AtomicBool, /// A counter that keeps tracks of the number of bytes allocated since last stress test pub(crate) allocation_bytes: AtomicUsize, + /// The time when the current GC started. Only accessible in `ScheduleCollection` and + /// `GCWorkScheduler::on_gc_finished` which never happen at the same time. 
+ pub(crate) gc_start_time: AtomicRefCell>, /// A counteer that keeps tracks of the number of bytes allocated by malloc #[cfg(feature = "malloc_counted_size")] pub(crate) malloc_bytes: AtomicUsize, @@ -214,6 +220,7 @@ impl Default for GlobalState { cur_collection_attempts: AtomicUsize::new(0), scanned_stacks: AtomicUsize::new(0), allocation_bytes: AtomicUsize::new(0), + gc_start_time: AtomicRefCell::new(None), #[cfg(feature = "malloc_counted_size")] malloc_bytes: AtomicUsize::new(0), #[cfg(feature = "count_live_bytes_in_gc")] diff --git a/src/plan/gc_requester.rs b/src/plan/gc_requester.rs index 43f7c5a5ac..53419aa486 100644 --- a/src/plan/gc_requester.rs +++ b/src/plan/gc_requester.rs @@ -1,5 +1,4 @@ -use crate::scheduler::gc_work::ScheduleCollection; -use crate::scheduler::{GCWorkScheduler, WorkBucketStage}; +use crate::scheduler::GCWorkScheduler; use crate::vm::VMBinding; use std::marker::PhantomData; use std::sync::atomic::{AtomicBool, Ordering}; @@ -22,7 +21,7 @@ impl GCRequester { pub fn new(scheduler: Arc>) -> Self { GCRequester { request_sync: Mutex::new(RequestSync { - gc_scheduled: true, + gc_scheduled: false, }), request_flag: AtomicBool::new(false), scheduler, @@ -30,6 +29,8 @@ impl GCRequester { } } + /// Request a GC. Called by mutators when polling (during allocation) and when handling user + /// GC requests (e.g. `System.gc();` in Java); pub fn request(&self) { if self.request_flag.load(Ordering::Relaxed) { return; @@ -46,11 +47,28 @@ impl GCRequester { } } + /// Returns true if GC has been scheduled. + pub fn is_gc_scheduled(&self) -> bool { + let guard = self.request_sync.lock().unwrap(); + guard.gc_scheduled + } + + /// Clear the "GC requested" flag so that mutators can trigger the next GC. + /// Called by a GC worker when all mutators have come to a stop. pub fn clear_request(&self) { let _guard = self.request_sync.lock().unwrap(); self.request_flag.store(false, Ordering::Relaxed); } + /// Called by a GC worker when a GC has finished. 
+ /// This will check the `request_flag` again and schedule the next GC. + /// + /// Note that this may schedule the next GC immediately if + /// 1. The plan is concurrent, and a mutator triggered another GC while the current GC was + /// still running (between `clear_request` and `on_gc_finished`), or + /// 2. After the invocation of `resume_mutators`, a mutator runs so fast that it + /// exhausted the heap, or called `handle_user_collection_request`, before this function + /// is called. pub fn on_gc_finished(&self) { let mut guard = self.request_sync.lock().unwrap(); guard.gc_scheduled = false; @@ -59,9 +77,11 @@ impl GCRequester { } fn try_schedule_collection(&self, sync: &mut RequestSync) { + // Do not schedule collection if a GC is still in progress. + // When the GC finishes, a GC worker will call `on_gc_finished` and check `request_flag` + // again. if self.request_flag.load(Ordering::Relaxed) && !sync.gc_scheduled { - // Add a ScheduleCollection work packet. It is the seed of other work packets. - self.scheduler.work_buckets[WorkBucketStage::Unconstrained].add(ScheduleCollection); + self.scheduler.schedule_collection(); sync.gc_scheduled = true; diff --git a/src/scheduler/controller.rs b/src/scheduler/controller.rs index acc5426264..125efbbe91 100644 --- a/src/scheduler/controller.rs +++ b/src/scheduler/controller.rs @@ -75,17 +75,17 @@ impl GCController { fn do_gc_until_completion(&mut self) { let gc_start = std::time::Instant::now(); - debug_assert!( - self.scheduler.worker_monitor.debug_is_sleeping(), - "Workers are still doing work when GC started." - ); + // debug_assert!( + // self.scheduler.worker_monitor.debug_is_sleeping(), + // "Workers are still doing work when GC started." + // ); // Add a ScheduleCollection work packet. It is the seed of other work packets. 
self.scheduler.work_buckets[WorkBucketStage::Unconstrained].add(ScheduleCollection); // Notify only one worker at this time because there is only one work packet, // namely `ScheduleCollection`. - self.scheduler.worker_monitor.resume_and_wait(false); + // self.scheduler.worker_monitor.resume_and_wait(false); // Gradually open more buckets as workers stop each time they drain all open bucket. loop { @@ -102,11 +102,11 @@ impl GCController { // Notify all workers because there should be many work packets available in the newly // opened bucket(s). - self.scheduler.worker_monitor.resume_and_wait(true); + // self.scheduler.worker_monitor.resume_and_wait(true); } // All GC workers must have parked by now. - debug_assert!(self.scheduler.worker_monitor.debug_is_sleeping()); + // debug_assert!(self.scheduler.worker_monitor.debug_is_sleeping()); debug_assert!(!self.scheduler.worker_group.has_designated_work()); debug_assert!(self.scheduler.all_buckets_empty()); diff --git a/src/scheduler/gc_work.rs b/src/scheduler/gc_work.rs index 692873eb84..a651437911 100644 --- a/src/scheduler/gc_work.rs +++ b/src/scheduler/gc_work.rs @@ -14,6 +14,16 @@ pub struct ScheduleCollection; impl GCWork for ScheduleCollection { fn do_work(&mut self, worker: &mut GCWorker, mmtk: &'static MMTK) { + // Record the time when GC starts. + { + let mut guard = mmtk.state.gc_start_time.borrow_mut(); + let old_time = guard.replace(std::time::Instant::now()); + debug_assert!( + old_time.is_none(), + "gc_start_time is still set when GC started: {old_time:?}", + ); + } + // Tell GC trigger that GC started. 
mmtk.gc_trigger.policy.on_gc_start(mmtk); diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index 46357e4a46..3372ff9770 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -1,12 +1,16 @@ +use super::gc_work::ScheduleCollection; use super::stat::SchedulerStat; use super::work_bucket::*; -use super::worker::{GCWorker, GCWorkerShared, ThreadId, WorkerGroup, WorkerMonitor}; +use super::worker::{GCWorker, ThreadId, WorkerGroup, WorkerMonitor}; use super::*; +use crate::global_state::GcStatus; use crate::mmtk::MMTK; use crate::util::opaque_pointer::*; use crate::util::options::AffinityKind; use crate::util::rust_util::array_from_fn; +use crate::vm::Collection; use crate::vm::VMBinding; +use crate::Plan; use crossbeam::deque::Steal; use enum_map::{Enum, EnumMap}; use std::collections::HashMap; @@ -62,8 +66,6 @@ impl GCWorkScheduler { } } - let coordinator_worker_shared = Arc::new(GCWorkerShared::::new(None)); - Arc::new(Self { work_buckets, worker_group, @@ -86,9 +88,17 @@ impl GCWorkScheduler { self.affinity.resolve_affinity(thread); } + /// Schedule collection. Called via `GCRequester`. + /// Because this function may be called by a mutator thread, we only add a `ScheduleCollection` + /// work packet here so that a GC worker can wake up later and actually schedule the work for a + /// collection. + pub(crate) fn schedule_collection(&self) { + // Add a ScheduleCollection work packet. It is the seed of other work packets. 
+ self.work_buckets[WorkBucketStage::Unconstrained].add(ScheduleCollection); + } + /// Schedule all the common work packets pub fn schedule_common_work>(&self, plan: &'static C::PlanType) { - use crate::plan::Plan; use crate::scheduler::gc_work::*; // Stop & scan mutators (mutator scanning can happen before STW) self.work_buckets[WorkBucketStage::Unconstrained].add(StopMutators::::new()); @@ -349,8 +359,125 @@ impl GCWorkScheduler { return work; } - self.worker_monitor.park_and_wait(worker); + self.worker_monitor.park_and_wait(worker, || { + // This is the last worker parked. + + // Test whether we are doing GC. + if worker.mmtk.gc_requester.is_gc_scheduled() { + trace!("GC is scheduled. Try to find more work to do..."); + // We are in the middle of GC, and the last GC worker parked. + // Find more work for workers to do. + let found_more_work = self.find_more_work_for_workers(); + + if !found_more_work { + // GC finished. + self.on_gc_finished(worker); + } + + found_more_work + } else { + trace!("GC is not scheduled. Wait for the first GC."); + // GC is not scheduled. Do nothing. + // Note that when GC worker threads has just been created, they will try to get + // work packets to execute. But since the first GC has not started, yet, there + // is not any work packets to execute, yet. Therefore, all workers will park, + // and the last parked worker will reach here. In that case, we should simply + // let workers wait until the first GC starts, instead of trying to open more + // buckets. + // If a GC worker spuriously wakes up when GC is not scheduled, it should not + // do anything, either. + false + } + }); + } + } + + /// Find more work for workers to do. Return true if more work is available. + fn find_more_work_for_workers(&self) -> bool { + if self.worker_group.has_designated_work() { + return true; } + + // See if any bucket has a sentinel. + if self.schedule_sentinels() { + return true; + } + + // Try to open new buckets. 
+ if self.update_buckets() { + return true; + } + + // If all of the above failed, it means GC has finished. + false + } + + /// Called when GC has finished, i.e. when all work packets have been executed. + fn on_gc_finished(&self, worker: &GCWorker) { + // All GC workers (except this one) must have parked by now. + debug_assert!(!self.worker_group.has_designated_work()); + debug_assert!(self.all_buckets_empty()); + + // Deactivate all work buckets to prepare for the next GC. + self.deactivate_all(); + self.debug_assert_all_buckets_deactivated(); + + let mmtk = worker.mmtk; + + // Tell GC trigger that GC ended - this happens before we resume mutators. + mmtk.gc_trigger.policy.on_gc_end(mmtk); + + // Compute the elapsed time of the GC. + let gc_start = { + let mut guard = mmtk.state.gc_start_time.borrow_mut(); + guard.take().expect("gc_start_time was not set") + }; + let elapsed = gc_start.elapsed(); + + info!( + "End of GC ({}/{} pages, took {} ms)", + mmtk.get_plan().get_reserved_pages(), + mmtk.get_plan().get_total_pages(), + elapsed.as_millis() + ); + + #[cfg(feature = "count_live_bytes_in_gc")] + { + let live_bytes = mmtk.state.get_live_bytes_in_last_gc(); + let used_bytes = + mmtk.get_plan().get_used_pages() << crate::util::constants::LOG_BYTES_IN_PAGE; + debug_assert!( + live_bytes <= used_bytes, + "Live bytes of all live objects ({} bytes) is larger than used pages ({} bytes), something is wrong.", + live_bytes, used_bytes + ); + info!( + "Live objects = {} bytes ({:04.1}% of {} used pages)", + live_bytes, + live_bytes as f64 * 100.0 / used_bytes as f64, + mmtk.get_plan().get_used_pages() + ); + } + + // All other workers are parked, so it is safe to access the Plan instance mutably. 
+ let plan_mut: &mut dyn Plan = unsafe { mmtk.get_plan_mut() }; + plan_mut.end_of_gc(worker.tls); + + #[cfg(feature = "extreme_assertions")] + if crate::util::edge_logger::should_check_duplicate_edges(mmtk.get_plan()) { + // reset the logging info at the end of each GC + mmtk.edge_logger.reset(); + } + + // Reset the triggering information. + mmtk.state.reset_collection_trigger(); + + // Set to NotInGC after everything, and right before resuming mutators. + mmtk.set_gc_status(GcStatus::NotInGC); + ::VMCollection::resume_mutators(worker.tls); + + // Notify the `GCRequester` that GC has finished. + mmtk.gc_requester.on_gc_finished(); } pub fn enable_stat(&self) { diff --git a/src/scheduler/worker.rs b/src/scheduler/worker.rs index e415a14e46..3f483f7200 100644 --- a/src/scheduler/worker.rs +++ b/src/scheduler/worker.rs @@ -12,7 +12,7 @@ use crossbeam::queue::ArrayQueue; #[cfg(feature = "count_live_bytes_in_gc")] use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; -use std::sync::{Arc, Condvar, Mutex}; +use std::sync::{Arc, Condvar, Mutex, MutexGuard}; /// Represents the ID of a GC worker thread. pub type ThreadId = usize; @@ -75,40 +75,14 @@ impl GCWorkerShared { } } -/// Used to synchronize mutually exclusive operations between workers and controller, -/// and also waking up workers when more work packets are available. +/// A data structure for parking and waking up workers. +/// It keeps track of the number of workers parked, and allow the last parked worker to perform +/// operations that require all other workers to be parked. pub(crate) struct WorkerMonitor { /// The synchronized part. sync: Mutex, /// This is notified when new work is made available for the workers. 
- /// Particularly, it is notified when - /// - `sync.worker_group_state` is transitioned to `Working` because - /// - some workers still have designated work, or - /// - some sentinel work packets are added to their drained buckets, or - /// - some work buckets are opened, or - /// - any work packet is added to any open bucket. - /// Workers wait on this condvar. work_available: Condvar, - /// This is notified when all workers parked. - /// The coordinator waits on this condvar. - all_workers_parked: Condvar, -} - -/// The state of the worker group. -/// -/// The worker group alternates between the `Sleeping` and the `Working` state. Workers are -/// allowed to execute work packets in the `Working` state. However, once workers entered the -/// `Sleeping` state, they will not be allowed to packets from buckets until the coordinator -/// explicitly transitions the state back to `Working` after it found more work for workers to do. -#[derive(Clone, Copy, PartialEq, Eq, Debug)] -enum WorkerGroupState { - /// In this state, the coordinator can open new buckets and close buckets, - /// but workers cannot execute any packets or get any work packets from any buckets. - /// Workers cannot unpark in this state. - Sleeping, - /// In this state, workers can get work packets from open buckets, - /// but no buckets can be opened or closed. - Working, } /// The synchronized part of `WorkerMonitor`. @@ -117,8 +91,6 @@ pub(crate) struct WorkerMonitorSync { worker_count: usize, /// Number of parked workers. parked_workers: usize, - /// The worker group state. - worker_group_state: WorkerGroupState, } impl WorkerMonitor { @@ -127,61 +99,37 @@ impl WorkerMonitor { sync: Mutex::new(WorkerMonitorSync { worker_count, parked_workers: 0, - worker_group_state: WorkerGroupState::Sleeping, }), work_available: Default::default(), - all_workers_parked: Default::default(), } } /// Wake up workers when more work packets are made available for workers. 
/// This function is called when adding work packets to buckets. - /// This function doesn't change the `work_group_state` variable. - /// If workers are in the `Sleeping` state, use `resume_and_wait` to resume workers. pub fn notify_work_available(&self, all: bool) { - let sync = self.sync.lock().unwrap(); - - // Don't notify workers if we are adding packets when workers are sleeping. - // This could happen when we add `ScheduleCollection` or schedule sentinels. - if sync.worker_group_state == WorkerGroupState::Sleeping { - return; - } - - if all { - self.work_available.notify_all(); - } else { - self.work_available.notify_one(); - } + let mut guard = self.sync.lock().unwrap(); + self.notify_work_available_inner(all, &mut guard); } - /// Wake up workers and wait until they transition to `Sleeping` state again. - /// This is called by the coordinator. - /// If `all` is true, notify all workers; otherwise only notify one worker. - pub fn resume_and_wait(&self, all: bool) { - let mut sync = self.sync.lock().unwrap(); - sync.worker_group_state = WorkerGroupState::Working; + fn notify_work_available_inner(&self, all: bool, _guard: &mut MutexGuard) { if all { self.work_available.notify_all(); } else { self.work_available.notify_one(); } - let _sync = self - .all_workers_parked - .wait_while(sync, |sync| { - sync.worker_group_state == WorkerGroupState::Working - }) - .unwrap(); } - /// Test if the worker group is in the `Sleeping` state. - pub fn debug_is_sleeping(&self) -> bool { - let sync = self.sync.lock().unwrap(); - sync.worker_group_state == WorkerGroupState::Sleeping - } - - /// Park until more work is available. - /// The argument `worker` indicates this function can only be called by workers. - pub fn park_and_wait(&self, worker: &GCWorker) { + /// Park a worker until more work is available. + /// If it is the last worker parked, `on_last_parked` will be called. 
+ /// If `on_last_parked` returns `true`, this function will notify other workers about available + /// work before unparking the current worker; + /// if `on_last_parked` returns `false`, the current worker will also wait for work packets to + /// be added. + pub fn park_and_wait(&self, worker: &GCWorker, on_last_parked: F) + where + VM: VMBinding, + F: FnOnce() -> bool, + { let mut sync = self.sync.lock().unwrap(); // Park this worker @@ -189,13 +137,17 @@ impl WorkerMonitor { trace!("Worker {} parked.", worker.ordinal); if all_parked { - // If all workers are parked, enter "Sleeping" state and notify controller. - sync.worker_group_state = WorkerGroupState::Sleeping; - debug!( - "Worker {} notifies the coordinator that all workerer parked.", - worker.ordinal - ); - self.all_workers_parked.notify_one(); + debug!("Worker {} is the last worker parked.", worker.ordinal); + let more_work_available = on_last_parked(); + if more_work_available { + self.notify_work_available_inner(true, &mut sync); + } else { + // It didn't make more work available. Keep waiting. + // This should happen when + // 1. Worker threads have just been created, but GC has not started, yet, or + // 2. A GC has just finished. + sync = self.work_available.wait(sync).unwrap(); + } } else { // Otherwise wait until notified. // Note: The condition for this `cond.wait` is "more work is available". @@ -205,14 +157,6 @@ impl WorkerMonitor { sync = self.work_available.wait(sync).unwrap(); } - // If we are in the `Sleeping` state, wait until leaving that state. - sync = self - .work_available - .wait_while(sync, |sync| { - sync.worker_group_state == WorkerGroupState::Sleeping - }) - .unwrap(); - // Unpark this worker. 
sync.dec_parked_workers(); trace!("Worker {} unparked.", worker.ordinal); From b64e2a78ecb42e1a60431f9e7473aa1a6f027f37 Mon Sep 17 00:00:00 2001 From: Kunshan Wang Date: Wed, 3 Jan 2024 12:17:20 +0800 Subject: [PATCH 04/11] WIP: Remove Controller --- src/memory_manager.rs | 17 +---- src/scheduler/controller.rs | 134 ------------------------------------ src/scheduler/mod.rs | 3 - src/scheduler/scheduler.rs | 21 +++++- src/scheduler/worker.rs | 8 ++- src/vm/collection.rs | 6 +- 6 files changed, 29 insertions(+), 160 deletions(-) delete mode 100644 src/scheduler/controller.rs diff --git a/src/memory_manager.rs b/src/memory_manager.rs index 67135e9678..e5a918233f 100644 --- a/src/memory_manager.rs +++ b/src/memory_manager.rs @@ -16,7 +16,7 @@ use crate::mmtk::MMTK; use crate::plan::AllocationSemantics; use crate::plan::{Mutator, MutatorContext}; use crate::scheduler::WorkBucketStage; -use crate::scheduler::{GCController, GCWork, GCWorker}; +use crate::scheduler::{GCWork, GCWorker}; use crate::util::alloc::allocators::AllocatorSelector; use crate::util::constants::{LOG_BYTES_IN_PAGE, MIN_OBJECT_SIZE}; use crate::util::heap::layout::vm_layout::vm_layout; @@ -461,21 +461,6 @@ pub fn gc_poll(mmtk: &MMTK, tls: VMMutatorThread) { } } -/// Run the main loop for the GC controller thread. This method does not return. -/// -/// Arguments: -/// * `tls`: The thread that will be used as the GC controller. -/// * `gc_controller`: The execution context of the GC controller threa. -/// It is the `GCController` passed to `Collection::spawn_gc_thread`. -/// * `mmtk`: A reference to an MMTk instance. -pub fn start_control_collector( - _mmtk: &'static MMTK, - tls: VMWorkerThread, - gc_controller: &mut GCController, -) { - gc_controller.run(tls); -} - /// Run the main loop of a GC worker. This method does not return. 
/// /// Arguments: diff --git a/src/scheduler/controller.rs b/src/scheduler/controller.rs deleted file mode 100644 index 125efbbe91..0000000000 --- a/src/scheduler/controller.rs +++ /dev/null @@ -1,134 +0,0 @@ -//! The GC controller thread. -//! -//! MMTk has many GC threads. There are many GC worker threads and one GC controller thread. -//! The GC controller thread responds to GC requests and coordinates the workers to perform GC. - -use std::sync::Arc; - -use crate::plan::gc_requester::GCRequester; -use crate::scheduler::gc_work::{EndOfGC, ScheduleCollection}; -use crate::scheduler::{GCWork, WorkBucketStage}; -use crate::util::VMWorkerThread; -use crate::vm::VMBinding; -use crate::MMTK; - -use super::{GCWorkScheduler, GCWorker}; - -/// The thread local struct for the GC controller, the counterpart of `GCWorker`. -pub struct GCController { - /// The reference to the MMTk instance. - mmtk: &'static MMTK, - /// The reference to the GC requester. - requester: Arc>, - /// The reference to the scheduler. - scheduler: Arc>, - /// The `GCWorker` is used to execute packets. The controller is also a `GCWorker`. - coordinator_worker: GCWorker, -} - -impl GCController { - /// The main loop for the GC controller. - pub fn run(&mut self, tls: VMWorkerThread) -> ! { - probe!(mmtk, gccontroller_run); - // Initialize the GC worker for coordinator. We are not using the run() method from - // GCWorker so we manually initialize the worker here. - self.coordinator_worker.tls = tls; - - loop { - debug!("[STWController: Waiting for request...]"); - //self.requester.wait_for_request(); - debug!("[STWController: Request recieved.]"); - - self.do_gc_until_completion_traced(); - debug!("[STWController: Worker threads complete!]"); - } - } - - /// Find more work for workers to do. Return true if more work is available. - fn find_more_work_for_workers(&mut self) -> bool { - if self.scheduler.worker_group.has_designated_work() { - return true; - } - - // See if any bucket has a sentinel. 
- if self.scheduler.schedule_sentinels() { - return true; - } - - // Try to open new buckets. - if self.scheduler.update_buckets() { - return true; - } - - // If all of the above failed, it means GC has finished. - false - } - - /// A wrapper method for [`do_gc_until_completion`](GCController::do_gc_until_completion) to insert USDT tracepoints. - fn do_gc_until_completion_traced(&mut self) { - probe!(mmtk, gc_start); - self.do_gc_until_completion(); - probe!(mmtk, gc_end); - } - - /// Coordinate workers to perform GC in response to a GC request. - fn do_gc_until_completion(&mut self) { - let gc_start = std::time::Instant::now(); - - // debug_assert!( - // self.scheduler.worker_monitor.debug_is_sleeping(), - // "Workers are still doing work when GC started." - // ); - - // Add a ScheduleCollection work packet. It is the seed of other work packets. - self.scheduler.work_buckets[WorkBucketStage::Unconstrained].add(ScheduleCollection); - - // Notify only one worker at this time because there is only one work packet, - // namely `ScheduleCollection`. - // self.scheduler.worker_monitor.resume_and_wait(false); - - // Gradually open more buckets as workers stop each time they drain all open bucket. - loop { - // Workers should only transition to the `Sleeping` state when all open buckets have - // been drained. - self.scheduler.assert_all_activated_buckets_are_empty(); - - let new_work_available = self.find_more_work_for_workers(); - - // GC finishes if there is no new work to do. - if !new_work_available { - break; - } - - // Notify all workers because there should be many work packets available in the newly - // opened bucket(s). - // self.scheduler.worker_monitor.resume_and_wait(true); - } - - // All GC workers must have parked by now. 
- // debug_assert!(self.scheduler.worker_monitor.debug_is_sleeping()); - debug_assert!(!self.scheduler.worker_group.has_designated_work()); - debug_assert!(self.scheduler.all_buckets_empty()); - - // Deactivate all work buckets to prepare for the next GC. - // NOTE: There is no need to hold any lock. - // Workers are in the `Sleeping` state. - // so they will not wake up while we deactivate buckets. - self.scheduler.deactivate_all(); - - // Tell GC trigger that GC ended - this happens before EndOfGC where we resume mutators. - self.mmtk.gc_trigger.policy.on_gc_end(self.mmtk); - - // Finalization: Resume mutators, reset gc states - // Note: Resume-mutators must happen after all work buckets are closed. - // Otherwise, for generational GCs, workers will receive and process - // newly generated remembered-sets from those open buckets. - // But these remsets should be preserved until next GC. - let mut end_of_gc = EndOfGC { - elapsed: gc_start.elapsed(), - }; - end_of_gc.do_work_with_stat(&mut self.coordinator_worker, self.mmtk); - - self.scheduler.debug_assert_all_buckets_deactivated(); - } -} diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 03b7d822bc..45501e9943 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -20,8 +20,5 @@ mod worker; pub(crate) use worker::current_worker_ordinal; pub use worker::GCWorker; -mod controller; -pub use controller::GCController; - pub(crate) mod gc_work; pub use gc_work::ProcessEdgesWork; diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index 3372ff9770..9fc8fc18fd 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -276,7 +276,7 @@ impl GCWorkScheduler { } /// Check if all the work buckets are empty - pub(crate) fn assert_all_activated_buckets_are_empty(&self) { + pub(crate) fn assert_all_activated_buckets_are_empty(&self, worker: &GCWorker) { let mut error_example = None; for (id, bucket) in self.work_buckets.iter() { if bucket.is_activated() && 
!bucket.is_empty() { @@ -286,6 +286,16 @@ impl GCWorkScheduler { // we should show at least one abnormal bucket in the panic message // so that we still have some information for debugging. error_example = Some(id); + + while !bucket.is_empty() { + match bucket.poll(&worker.local_work_buffer) { + Steal::Success(w) => { + error!(" Bucket {:?} has {:?}", id, w.get_type_name()); + }, + Steal::Retry => continue, + _ => {} + } + } } } if let Some(id) = error_example { @@ -364,8 +374,12 @@ impl GCWorkScheduler { // Test whether we are doing GC. if worker.mmtk.gc_requester.is_gc_scheduled() { - trace!("GC is scheduled. Try to find more work to do..."); // We are in the middle of GC, and the last GC worker parked. + trace!("GC is scheduled. Try to find more work to do..."); + + // During GC, if all workers parked, all open buckets must have been drained. + self.assert_all_activated_buckets_are_empty(worker); + // Find more work for workers to do. let found_more_work = self.find_more_work_for_workers(); @@ -395,16 +409,19 @@ impl GCWorkScheduler { /// Find more work for workers to do. Return true if more work is available. fn find_more_work_for_workers(&self) -> bool { if self.worker_group.has_designated_work() { + trace!("Some workers have designated work."); return true; } // See if any bucket has a sentinel. if self.schedule_sentinels() { + trace!("Some sentinels are scheduled."); return true; } // Try to open new buckets. if self.update_buckets() { + trace!("Some buckets are opened."); return true; } diff --git a/src/scheduler/worker.rs b/src/scheduler/worker.rs index 3f483f7200..dead43a048 100644 --- a/src/scheduler/worker.rs +++ b/src/scheduler/worker.rs @@ -134,7 +134,13 @@ impl WorkerMonitor { // Park this worker let all_parked = sync.inc_parked_workers(); - trace!("Worker {} parked.", worker.ordinal); + trace!( + "Worker {} parked. parked/total: {}/{}. 
All parked: {}", + worker.ordinal, + sync.parked_workers, + sync.worker_count, + all_parked + ); if all_parked { debug!("Worker {} is the last worker parked.", worker.ordinal); diff --git a/src/vm/collection.rs b/src/vm/collection.rs index 5420224ab0..7fe17ccdfd 100644 --- a/src/vm/collection.rs +++ b/src/vm/collection.rs @@ -4,9 +4,8 @@ use crate::vm::VMBinding; use crate::{scheduler::*, Mutator}; /// Thread context for the spawned GC thread. It is used by spawn_gc_thread. +/// Currently, `GCWorker` is the only kind of thread that mmtk-core will create. pub enum GCThreadContext { - /// The GC thread to spawn is a controller thread. There is only one controller thread. - Controller(Box>), /// The GC thread to spawn is a worker thread. There can be multiple worker threads. Worker(Box>), } @@ -53,10 +52,9 @@ pub trait Collection { /// * `tls`: The thread pointer for the parent thread that we spawn new threads from. This is the same `tls` when the VM /// calls `initialize_collection()` and passes as an argument. /// * `ctx`: The context for the GC thread. - /// * If `Controller` is passed, it means spawning a thread to run as the GC controller. - /// The spawned thread shall call `memory_manager::start_control_collector`. /// * If `Worker` is passed, it means spawning a thread to run as a GC worker. /// The spawned thread shall call `memory_manager::start_worker`. + /// Currently `Worker` is the only kind of thread which mmtk-core will create. /// In either case, the `Box` inside should be passed back to the called function. fn spawn_gc_thread(tls: VMThread, ctx: GCThreadContext); From 4fa8fac5bf7f6a73750157fe076bc0104d7b67d3 Mon Sep 17 00:00:00 2001 From: Kunshan Wang Date: Wed, 3 Jan 2024 17:24:55 +0800 Subject: [PATCH 05/11] Fix deadlock between GCRequester and GC workers. 
--- src/global_state.rs | 20 +++++++++--- src/mmtk.rs | 14 ++++----- src/plan/gc_requester.rs | 59 +++++++++++++++++++++--------------- src/scheduler/scheduler.rs | 45 +++++++++++++++++++++------ src/scheduler/work_bucket.rs | 22 +++++++++++--- src/scheduler/worker.rs | 46 +++++++++++++++++++++------- 6 files changed, 145 insertions(+), 61 deletions(-) diff --git a/src/global_state.rs b/src/global_state.rs index 2340957eae..55dbab7b3b 100644 --- a/src/global_state.rs +++ b/src/global_state.rs @@ -1,8 +1,9 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::Mutex; use std::time::Instant; +use atomic::Atomic; use atomic_refcell::AtomicRefCell; +use bytemuck::NoUninit; /// This stores some global states for an MMTK instance. /// Some MMTK components like plans and allocators may keep an reference to the struct, and can access it. @@ -20,7 +21,7 @@ pub struct GlobalState { /// bindings to temporarily disable GC, at which point, we do not trigger GC even if the heap is full. pub(crate) trigger_gc_when_heap_is_full: AtomicBool, /// The current GC status. - pub(crate) gc_status: Mutex, + pub(crate) gc_status: Atomic, /// Is the current GC an emergency collection? Emergency means we may run out of memory soon, and we should /// attempt to collect as much as we can. pub(crate) emergency_collection: AtomicBool, @@ -209,7 +210,7 @@ impl Default for GlobalState { Self { initialized: AtomicBool::new(false), trigger_gc_when_heap_is_full: AtomicBool::new(true), - gc_status: Mutex::new(GcStatus::NotInGC), + gc_status: Atomic::new(GcStatus::NotInGC), stacks_prepared: AtomicBool::new(false), emergency_collection: AtomicBool::new(false), user_triggered_collection: AtomicBool::new(false), @@ -229,9 +230,20 @@ impl Default for GlobalState { } } -#[derive(PartialEq)] +/// GC status, an indicator of whether MMTk is in the progress of a GC or not. +/// +/// This type was only used for assertions in JikesRVM. After we removed the coordinator (a.k.a. 
+/// controller) thread, we use the GC status to decide whether GC workers should open new work +/// buckets when the last worker parked. +// FIXME: `GcProper` is inherited from JikesRVM, but it is not used in MMTk core. Consider +// removing it. +#[derive(PartialEq, Clone, Copy, NoUninit, Debug)] +#[repr(u8)] pub enum GcStatus { + /// Not in GC NotInGC, + /// GC has started, but not all stacks have been scanned, yet. GcPrepare, + /// GC has started, and all stacks have been scanned. GcProper, } diff --git a/src/mmtk.rs b/src/mmtk.rs index 3d91f23ae8..9c433fa06d 100644 --- a/src/mmtk.rs +++ b/src/mmtk.rs @@ -255,15 +255,15 @@ impl MMTK { self.inside_sanity.load(Ordering::Relaxed) } - pub(crate) fn set_gc_status(&self, s: GcStatus) { - let mut gc_status = self.state.gc_status.lock().unwrap(); - if *gc_status == GcStatus::NotInGC { + pub(crate) fn set_gc_status(&self, new_status: GcStatus) { + let old_status = self.state.gc_status.swap(new_status, Ordering::SeqCst); + debug_assert_ne!(old_status, new_status); + if old_status == GcStatus::NotInGC { self.state.stacks_prepared.store(false, Ordering::SeqCst); // FIXME stats self.stats.start_gc(); } - *gc_status = s; - if *gc_status == GcStatus::NotInGC { + if new_status == GcStatus::NotInGC { // FIXME stats if self.stats.get_gathering_stats() { self.stats.end_gc(); @@ -273,12 +273,12 @@ impl MMTK { /// Return true if a collection is in progress. pub fn gc_in_progress(&self) -> bool { - *self.state.gc_status.lock().unwrap() != GcStatus::NotInGC + self.state.gc_status.load(Ordering::SeqCst) != GcStatus::NotInGC } /// Return true if a collection is in progress and past the preparatory stage. pub fn gc_in_progress_proper(&self) -> bool { - *self.state.gc_status.lock().unwrap() == GcStatus::GcProper + self.state.gc_status.load(Ordering::SeqCst) == GcStatus::GcProper } /// Return true if the current GC is an emergency GC. 
diff --git a/src/plan/gc_requester.rs b/src/plan/gc_requester.rs index 53419aa486..03d54889df 100644 --- a/src/plan/gc_requester.rs +++ b/src/plan/gc_requester.rs @@ -5,13 +5,17 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; struct RequestSync { - /// Is GC scheduled (but not finished)? + /// Has the GCRequester called `GCWorkScheduler::schedule_collection` for the current request? + /// This flag exists so that once `GCRequester` called `GCWorkScheduler::schedule_collection`, + /// it cannot call it again until the GC it initiated finished. gc_scheduled: bool, } /// This data structure lets mutators trigger GC, and may schedule collection when appropriate. pub struct GCRequester { request_sync: Mutex, + /// An atomic flag outside `RequestSync` so that mutators can check if GC has already been + /// requested in `poll` without acquiring the mutex. request_flag: AtomicBool, scheduler: Arc>, phantom: PhantomData, @@ -32,18 +36,23 @@ impl GCRequester { /// Request a GC. Called by mutators when polling (during allocation) and when handling user /// GC requests (e.g. `System.gc();` in Java); pub fn request(&self) { + // Note: This is the double-checked locking algorithm. + // The load has the `Relaxed` order instead of `Acquire` because we are not doing lazy + // initialization here. We are only using this flag to remove successive requests. if self.request_flag.load(Ordering::Relaxed) { return; } let mut guard = self.request_sync.lock().unwrap(); - // Note: This is the double-checked locking algorithm. - // The load has the `Relaxed` order instead of `Acquire` because we only use the flag to - // remove successive requests, but we don't use it to synchronize other data fields. 
if !self.request_flag.load(Ordering::Relaxed) { self.request_flag.store(true, Ordering::Relaxed); - self.try_schedule_collection(&mut *guard); + let should_schedule_gc = self.try_schedule_collection(&mut guard); + if should_schedule_gc { + self.scheduler.mutator_schedule_collection(); + // Note: We do not clear `request_flag` now. It will be cleared by `clear_request` + // after all mutators have stopped. + } } } @@ -61,32 +70,34 @@ impl GCRequester { } /// Called by a GC worker when a GC has finished. - /// This will check the `request_flag` again and schedule the next GC. - /// - /// Note that this may schedule the next GC immediately if - /// 1. The plan is concurrent, and a mutator triggered another GC while the current GC was - /// still running (between `clear_request` and `on_gc_finished`), or - /// 2. After the invocation of `resume_mutators`, a mutator runs so fast that it - /// exhausted the heap, or called `handle_user_collection_request`, before this function - /// is called. - pub fn on_gc_finished(&self) { + /// This will check the `request_flag` again and check if we should immediately schedule the + /// next GC. If we should, `gc_scheduled` will be set back to `true` and this function will + /// return `true`. + pub fn on_gc_finished(&self) -> bool { let mut guard = self.request_sync.lock().unwrap(); guard.gc_scheduled = false; - self.try_schedule_collection(&mut *guard); + self.try_schedule_collection(&mut guard) } - fn try_schedule_collection(&self, sync: &mut RequestSync) { - // Do not schedule collection if a GC is still in progress. - // When the GC finishes, a GC worker will call `on_gc_finished` and check `request_flag` - // again. + /// Decide whether we should schedule a new collection. Will transition the state of + /// `gc_scheduled` from `false` to `true` if we should schedule a new collection. + /// Return `true` if the state transition happens. 
+ fn try_schedule_collection(&self, sync: &mut RequestSync) -> bool { + // The time to schedule a collection is when `request_flag` is `true` but `gc_scheduled` + // is `false`. `gc_scheduled` is `true` if either + // + // 1. another mutator called `request()` concurrently and scheduled a collection, or + // 2. a new GC is requested while the current GC is still in progress. + // + // If `gc_scheduled` is `true` when GC is requested, we do nothing now. But when the + // current GC finishes, a GC worker will call `on_gc_finished` which clears the + // `gc_scheduled` flag, and checks the `request_flag` again to trigger the next GC. if self.request_flag.load(Ordering::Relaxed) && !sync.gc_scheduled { - self.scheduler.schedule_collection(); - sync.gc_scheduled = true; - - // Note: We do not clear `request_flag` now. It will be cleared by `clear_request` - // after all mutators have stopped. + true + } else { + false } } } diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index 9fc8fc18fd..0f035d7a93 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -5,6 +5,7 @@ use super::worker::{GCWorker, ThreadId, WorkerGroup, WorkerMonitor}; use super::*; use crate::global_state::GcStatus; use crate::mmtk::MMTK; +use crate::scheduler::worker::LastParkedResult; use crate::util::opaque_pointer::*; use crate::util::options::AffinityKind; use crate::util::rust_util::array_from_fn; @@ -89,10 +90,11 @@ impl GCWorkScheduler { } /// Schedule collection. Called via `GCRequester`. - /// Because this function may be called by a mutator thread, we only add a `ScheduleCollection` + /// Because this function is called by a mutator thread, we only add a `ScheduleCollection` /// work packet here so that a GC worker can wake up later and actually schedule the work for a /// collection.
- pub(crate) fn schedule_collection(&self) { + pub(crate) fn mutator_schedule_collection(&self) { + debug!("Adding ScheduleCollection work packet upon mutator request."); // Add a ScheduleCollection work packet. It is the seed of other work packets. self.work_buckets[WorkBucketStage::Unconstrained].add(ScheduleCollection); } @@ -373,7 +375,7 @@ impl GCWorkScheduler { // This is the last worker parked. // Test whether we are doing GC. - if worker.mmtk.gc_requester.is_gc_scheduled() { + if worker.mmtk.gc_in_progress() { // We are in the middle of GC, and the last GC worker parked. trace!("GC is scheduled. Try to find more work to do..."); @@ -383,12 +385,17 @@ impl GCWorkScheduler { // Find more work for workers to do. let found_more_work = self.find_more_work_for_workers(); - if !found_more_work { + if found_more_work { + LastParkedResult::WakeAll + } else { // GC finished. - self.on_gc_finished(worker); + let scheduled_next_gc = self.on_gc_finished(worker); + if scheduled_next_gc { + LastParkedResult::WakeSelf + } else { + LastParkedResult::ParkSelf + } } - - found_more_work } else { trace!("GC is not scheduled. Wait for the first GC."); // GC is not scheduled. Do nothing. @@ -400,7 +407,7 @@ impl GCWorkScheduler { // buckets. // If a GC worker spuriously wakes up when GC is not scheduled, it should not // do anything, either. - false + LastParkedResult::ParkSelf } }); } @@ -430,7 +437,8 @@ impl GCWorkScheduler { } /// Called when GC has finished, i.e. when all work packets have been executed. - fn on_gc_finished(&self, worker: &GCWorker) { + /// Return `true` if it scheduled the next GC immediately. + fn on_gc_finished(&self, worker: &GCWorker) -> bool { // All GC workers (except this one) must have parked by now. debug_assert!(!self.worker_group.has_designated_work()); debug_assert!(self.all_buckets_empty()); @@ -494,7 +502,24 @@ impl GCWorkScheduler { ::VMCollection::resume_mutators(worker.tls); // Notify the `GCRequester` that GC has finished. 
- mmtk.gc_requester.on_gc_finished(); + let should_schedule_gc_now = mmtk.gc_requester.on_gc_finished(); + if should_schedule_gc_now { + // We should schedule the next GC immediately. This means GC was triggered between + // `clear_request` (when stacks were scanned) and `on_gc_finished` (right above). This + // can happen if + // 1. It is concurrent GC, and a mutator triggered another GC while the current GC was + // still running, or + // 2. It is STW GC, but after the invocation of `resume_mutators` above, one mutator + // ran so fast that it triggered a GC before we called `on_gc_finished`. + // Note that we are holding the `WorkerMonitor` mutex, and cannot notify workers now. + // When this function returns, the current worker should continue to execute the newly + // added `ScheduleCollection` work packet. + debug!("GC already requested before `on_gc_finished`. Add ScheduleCollection now."); + self.work_buckets[WorkBucketStage::Unconstrained].add_no_notify(ScheduleCollection); + true + } else { + false + } } pub fn enable_stat(&self) { diff --git a/src/scheduler/work_bucket.rs b/src/scheduler/work_bucket.rs index d6c6cb6545..d7076354eb 100644 --- a/src/scheduler/work_bucket.rs +++ b/src/scheduler/work_bucket.rs @@ -139,6 +139,19 @@ impl WorkBucket { self.notify_one_worker(); } + /// Add a work packet to this bucket, but do not notify any workers. + /// This is useful when the current thread is holding the mutex of `WorkerMonitor` which is + /// used for notifying workers. This usually happens if the current thread is the last worker + /// parked. + pub(crate) fn add_no_notify>(&self, work: W) { + self.queue.push(Box::new(work)); + } + + /// Like `add_no_notify`, but the work is boxed. + pub(crate) fn add_boxed_no_notify(&self, work: Box>) { + self.queue.push(work); + } + /// Add multiple packets with a higher priority. /// Panic if this bucket cannot receive prioritized packets.
pub fn bulk_add_prioritized(&self, work_vec: Vec>>) { @@ -210,11 +223,10 @@ impl WorkBucket { sentinel.take() }; if let Some(work) = maybe_sentinel { - // We don't need to call `self.add` because this function is called by the coordinator - // when workers are stopped. We don't need to notify the workers because the - // coordinator will do that later. - // We can just "sneak" the sentinel work packet into the current bucket. - self.queue.push(work); + // We don't need to notify other workers because this function is called by the last + // parked worker. After this function returns, the caller will notify workers because + // more work packets become available. + self.add_boxed_no_notify(work); true } else { false diff --git a/src/scheduler/worker.rs b/src/scheduler/worker.rs index dead43a048..764f9f649d 100644 --- a/src/scheduler/worker.rs +++ b/src/scheduler/worker.rs @@ -75,6 +75,17 @@ impl GCWorkerShared { } } +/// The result type of the `on_last_parked` call-back in `WorkerMonitor::park_and_wait`. +/// It decides how many workers should wake up after `on_last_parked`. +pub(crate) enum LastParkedResult { + /// The last parked worker should wait, too, until more work packets are added. + ParkSelf, + /// The last parked worker should unpark and find work packets to do. + WakeSelf, + /// Wake up all parked GC workers. + WakeAll, +} + /// A data structure for parking and waking up workers. /// It keeps track of the number of workers parked, and allow the last parked worker to perform /// operations that require all other workers to be parked.
@@ -128,7 +139,7 @@ impl WorkerMonitor { pub fn park_and_wait(&self, worker: &GCWorker, on_last_parked: F) where VM: VMBinding, - F: FnOnce() -> bool, + F: FnOnce() -> LastParkedResult, { let mut sync = self.sync.lock().unwrap(); @@ -144,15 +155,23 @@ impl WorkerMonitor { if all_parked { debug!("Worker {} is the last worker parked.", worker.ordinal); - let more_work_available = on_last_parked(); - if more_work_available { - self.notify_work_available_inner(true, &mut sync); - } else { - // It didn't make more work available. Keep waiting. - // This should happen when - // 1. Worker threads have just been created, but GC has not started, yet, or - // 2. A GC has just finished. - sync = self.work_available.wait(sync).unwrap(); + let result = on_last_parked(); + match result { + LastParkedResult::ParkSelf => { + // It didn't make more work available. Keep waiting. + // This should happen when + // 1. Worker threads have just been created, but GC has not started, yet, or + // 2. A GC has just finished. + sync = self.work_available.wait(sync).unwrap(); + } + LastParkedResult::WakeSelf => { + // Continue without waiting. + // This should happen when only one work packet is made available. + } + LastParkedResult::WakeAll => { + // Notify all GC workers. + self.notify_work_available_inner(true, &mut sync); + } } } else { // Otherwise wait until notified. @@ -165,7 +184,12 @@ impl WorkerMonitor { // Unpark this worker. sync.dec_parked_workers(); - trace!("Worker {} unparked.", worker.ordinal); + trace!( + "Worker {} unparked. 
parked/total: {}/{}.", + worker.ordinal, + sync.parked_workers, + sync.worker_count, + ); } } From 8f8c3e008cb99781278d70e5354f7f52f5108c6c Mon Sep 17 00:00:00 2001 From: Kunshan Wang Date: Thu, 4 Jan 2024 12:11:25 +0800 Subject: [PATCH 06/11] Remove comments and add back trace points --- docs/header/mmtk.h | 3 --- src/scheduler/gc_work.rs | 3 +++ src/scheduler/scheduler.rs | 20 +++++++++++--------- src/scheduler/worker.rs | 21 ++------------------- src/util/options.rs | 2 +- src/vm/collection.rs | 3 +-- tools/tracing/README.md | 1 - 7 files changed, 18 insertions(+), 35 deletions(-) diff --git a/docs/header/mmtk.h b/docs/header/mmtk.h index 7fb6949d6b..3985bee06a 100644 --- a/docs/header/mmtk.h +++ b/docs/header/mmtk.h @@ -84,9 +84,6 @@ extern void mmtk_scan_region(); // Request MMTk to trigger a GC. Note that this may not actually trigger a GC extern void mmtk_handle_user_collection_request(void* tls); -// Run the main loop for the GC controller thread. Does not return -extern void mmtk_start_control_collector(void* tls, void* worker); - // Run the main loop for a GC worker. Does not return extern void mmtk_start_worker(void* tls, void* worker); diff --git a/src/scheduler/gc_work.rs b/src/scheduler/gc_work.rs index a651437911..2be49a06eb 100644 --- a/src/scheduler/gc_work.rs +++ b/src/scheduler/gc_work.rs @@ -14,6 +14,9 @@ pub struct ScheduleCollection; impl GCWork for ScheduleCollection { fn do_work(&mut self, worker: &mut GCWorker, mmtk: &'static MMTK) { + // This is officially when a GC starts. + probe!(mmtk, gc_start); + // Record the time when GC starts. { let mut guard = mmtk.state.gc_start_time.borrow_mut(); diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index 0f035d7a93..c81ea280ed 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -79,7 +79,7 @@ impl GCWorkScheduler { self.worker_group.as_ref().worker_count() } - /// Create GC threads, including the controller thread and all workers. 
+ /// Create GC threads, including all workers. pub fn spawn_gc_threads(self: &Arc, mmtk: &'static MMTK, tls: VMThread) { self.worker_group.spawn(mmtk, tls) } @@ -501,6 +501,9 @@ impl GCWorkScheduler { mmtk.set_gc_status(GcStatus::NotInGC); ::VMCollection::resume_mutators(worker.tls); + // GC officially ends here. + probe!(mmtk, gc_end); + // Notify the `GCRequester` that GC has finished. let should_schedule_gc_now = mmtk.gc_requester.on_gc_finished(); if should_schedule_gc_now { @@ -542,14 +545,13 @@ impl GCWorkScheduler { mmtk.gc_requester.clear_request(); let first_stw_bucket = &self.work_buckets[WorkBucketStage::first_stw_stage()]; debug_assert!(!first_stw_bucket.is_activated()); - // Note: This is the only place where a non-coordinator thread opens a bucket. - // If the `StopMutators` is executed by the coordinator thread, it will open - // the `Prepare` bucket and let workers start executing packets while the coordinator - // can still add more work packets to `Prepare`. However, since `Prepare` is the first STW - // bucket and only the coordinator can open any subsequent buckets, workers cannot execute - // work packets out of order. This is not generally true if we are not opening the first - // STW bucket. In the future, we should redesign the opening condition of work buckets to - // make the synchronization more robust, + // Note: This is the only place where a bucket is opened without having all workers parked. + // We usually require all workers to park before opening new buckets because otherwise + // packets will be executed out of order. However, since `Prepare` is the first STW + // bucket, and all subsequent buckets require all workers to park before opening, workers + // cannot execute work packets out of order. This is not generally true if we are not + // opening the first STW bucket.
In the future, we should redesign the opening condition + // of work buckets to make the synchronization more robust, first_stw_bucket.activate(); self.worker_monitor.notify_work_available(true); } diff --git a/src/scheduler/worker.rs b/src/scheduler/worker.rs index 764f9f649d..bd5367f987 100644 --- a/src/scheduler/worker.rs +++ b/src/scheduler/worker.rs @@ -44,10 +44,6 @@ pub struct GCWorkerShared { #[cfg(feature = "count_live_bytes_in_gc")] live_bytes: AtomicUsize, /// A queue of GCWork that can only be processed by the owned thread. - /// - /// Note: Currently, designated work cannot be added from the GC controller thread, or - /// there will be synchronization problems. If it is necessary to do so, we need to - /// update the code in `GCWorkScheduler::poll_slow` for proper synchornization. pub designated_work: ArrayQueue>>, /// Handle for stealing packets from the current worker pub stealer: Option>>>, @@ -218,12 +214,10 @@ impl WorkerMonitorSync { } /// A GC worker. This part is privately owned by a worker thread. -/// The GC controller also has an embedded `GCWorker` because it may also execute work packets. pub struct GCWorker { /// The VM-specific thread-local state of the GC thread. pub tls: VMWorkerThread, - /// The ordinal of the worker, numbered from 0 to the number of workers minus one. The ordinal - /// is usize::MAX if it is the embedded worker of the GC controller thread. + /// The ordinal of the worker, numbered from 0 to the number of workers minus one. pub ordinal: ThreadId, /// The reference to the scheduler. scheduler: Arc>, @@ -231,9 +225,6 @@ pub struct GCWorker { copy: GCWorkerCopyContext, /// The reference to the MMTk instance. pub mmtk: &'static MMTK, - /// True if this struct is the embedded GCWorker of the controller thread. - /// False if this struct belongs to a standalone GCWorker thread. - is_coordinator: bool, /// Reference to the shared part of the GC worker. It is used for synchronization. 
pub shared: Arc>, /// Local work packet queue. @@ -262,7 +253,6 @@ impl GCWorker { mmtk: &'static MMTK, ordinal: ThreadId, scheduler: Arc>, - is_coordinator: bool, shared: Arc>, local_work_buffer: deque::Worker>>, ) -> Self { @@ -273,7 +263,6 @@ impl GCWorker { copy: GCWorkerCopyContext::new_non_copy(), scheduler, mmtk, - is_coordinator, shared, local_work_buffer, } @@ -307,11 +296,6 @@ impl GCWorker { self.local_work_buffer.push(Box::new(work)); } - /// Is this worker a coordinator or a normal GC worker? - pub fn is_coordinator(&self) -> bool { - self.is_coordinator - } - /// Get the scheduler. There is only one scheduler per MMTk instance. pub fn scheduler(&self) -> &GCWorkScheduler { &self.scheduler @@ -364,7 +348,7 @@ impl GCWorker { } } -/// A worker group to manage all the GC workers (except the coordinator worker). +/// A worker group to manage all the GC workers. pub(crate) struct WorkerGroup { /// Shared worker data pub workers_shared: Vec>>, @@ -401,7 +385,6 @@ impl WorkerGroup { mmtk, ordinal, mmtk.scheduler.clone(), - false, shared.clone(), unspawned_local_work_queues.pop().unwrap(), )); diff --git a/src/util/options.rs b/src/util/options.rs index 11a1b329ae..eb6513e1c3 100644 --- a/src/util/options.rs +++ b/src/util/options.rs @@ -705,7 +705,7 @@ mod gc_trigger_tests { options! { /// The GC plan to use. plan: PlanSelector [env_var: true, command_line: true] [always_valid] = PlanSelector::GenImmix, - /// Number of GC worker threads. (There is always one GC controller thread besides the GC workers) + /// Number of GC worker threads. // FIXME: Currently we create GCWorkScheduler when MMTK is created, which is usually static. // To allow this as a command-line option, we need to refactor the creation fo the `MMTK` instance. 
// See: https://github.com/mmtk/mmtk-core/issues/532 diff --git a/src/vm/collection.rs b/src/vm/collection.rs index 7fe17ccdfd..b3b172ded0 100644 --- a/src/vm/collection.rs +++ b/src/vm/collection.rs @@ -30,8 +30,7 @@ pub trait Collection { /// This method may not be called by the same GC thread that called `stop_all_mutators`. /// /// Arguments: - /// * `tls`: The thread pointer for the GC worker. Currently it is the tls of the embedded `GCWorker` instance - /// of the coordinator thread, but it is subject to change, and should not be depended on. + /// * `tls`: The thread pointer for the GC worker. fn resume_mutators(tls: VMWorkerThread); /// Block the current thread for GC. This is called when an allocation request cannot be fulfilled and a GC diff --git a/tools/tracing/README.md b/tools/tracing/README.md index 23a633dd21..1c8598abe0 100644 --- a/tools/tracing/README.md +++ b/tools/tracing/README.md @@ -15,7 +15,6 @@ Currently, the core provides the following tracepoints. - `mmtk:collection_initialized()`: GC is enabled - `mmtk:harness_begin()`: the timing iteration of a benchmark begins - `mmtk:harness_end()`: the timing iteration of a benchmark ends -- `mmtk:gccontroller_run()`: the GC controller thread enters its work loop - `mmtk:gcworker_run()`: a GC worker thread enters its work loop - `mmtk:gc_start()`: a collection epoch starts - `mmtk:gc_end()`: a collection epoch ends From 845eea6f0d5ca1d1732071ab82fc13d6deb46b69 Mon Sep 17 00:00:00 2001 From: Kunshan Wang Date: Fri, 5 Jan 2024 00:05:20 +0800 Subject: [PATCH 07/11] WIP: Updating comments and minor code change. Found a case where workers may sleep forever. Need to be fixed later. 
--- Cargo.toml | 1 + src/global_state.rs | 7 +++-- src/scheduler/scheduler.rs | 54 ++++++++++++++------------------ src/scheduler/worker.rs | 63 ++++++++++++++++++++++++++------------ src/util/options.rs | 3 -- 5 files changed, 73 insertions(+), 55 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 558f820701..3461c25835 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,7 @@ static_assertions = "1.1.0" strum = "0.25" strum_macros = "0.25" sysinfo = "0.29" +rand = "0.8.5" [dev-dependencies] paste = "1.0.8" diff --git a/src/global_state.rs b/src/global_state.rs index 55dbab7b3b..4d57fd2e23 100644 --- a/src/global_state.rs +++ b/src/global_state.rs @@ -44,8 +44,11 @@ pub struct GlobalState { pub(crate) stacks_prepared: AtomicBool, /// A counter that keeps tracks of the number of bytes allocated since last stress test pub(crate) allocation_bytes: AtomicUsize, - /// The time when the current GC started. Only accessible in `ScheduleCollection` and - /// `GCWorkScheduler::on_gc_finished` which never happen at the same time. + /// The time when the current GC started. Currently only used for logging. + /// Note that some (but not all) `GCTriggerPolicy` implementations do their own time tracking + /// independently for their own need. + /// This field is only accessible in `ScheduleCollection` and `GCWorkScheduler::on_gc_finished` + /// which never happen at the same time, so `AtomicRefCell` is enough.
pub(crate) gc_start_time: AtomicRefCell>, /// A counteer that keeps tracks of the number of bytes allocated by malloc #[cfg(feature = "malloc_counted_size")] diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index c81ea280ed..1c44e8c736 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -22,7 +22,7 @@ pub struct GCWorkScheduler { pub work_buckets: EnumMap>, /// Workers pub(crate) worker_group: Arc>, - /// Condition Variable for worker synchronization + /// Monitor for worker synchronization, including a mutex and conditional variables. pub(crate) worker_monitor: Arc, /// How to assign the affinity of each GC thread. Specified by the user. affinity: AffinityKind, @@ -48,14 +48,16 @@ impl GCWorkScheduler { // Set the open condition of each bucket. { - // Unconstrained is always open. Prepare will be opened at the beginning of a GC. - // This vec will grow for each stage we call with open_next() let first_stw_stage = WorkBucketStage::first_stw_stage(); let mut open_stages: Vec = vec![first_stw_stage]; - // The rest will open after the previous stage is done. let stages = (0..WorkBucketStage::LENGTH).map(WorkBucketStage::from_usize); for stage in stages { + // Unconstrained is always open. + // The first STW stage (Prepare) will be opened when the world stopped + // (i.e. when all mutators are suspended). if stage != WorkBucketStage::Unconstrained && stage != first_stw_stage { + // Other work packets will be opened after previous stages are done + // (i.e their buckets are drained and all workers parked). 
let cur_stages = open_stages.clone(); work_buckets[stage].set_open_condition( move |scheduler: &GCWorkScheduler| { @@ -278,7 +280,7 @@ impl GCWorkScheduler { } /// Check if all the work buckets are empty - pub(crate) fn assert_all_activated_buckets_are_empty(&self, worker: &GCWorker) { + pub(crate) fn assert_all_activated_buckets_are_empty(&self) { let mut error_example = None; for (id, bucket) in self.work_buckets.iter() { if bucket.is_activated() && !bucket.is_empty() { @@ -288,16 +290,6 @@ impl GCWorkScheduler { // we should show at least one abnormal bucket in the panic message // so that we still have some information for debugging. error_example = Some(id); - - while !bucket.is_empty() { - match bucket.poll(&worker.local_work_buffer) { - Steal::Success(w) => { - error!(" Bucket {:?} has {:?}", id, w.get_type_name()); - }, - Steal::Retry => continue, - _ => {} - } - } } } if let Some(id) = error_example { @@ -377,10 +369,10 @@ impl GCWorkScheduler { // Test whether we are doing GC. if worker.mmtk.gc_in_progress() { // We are in the middle of GC, and the last GC worker parked. - trace!("GC is scheduled. Try to find more work to do..."); + trace!("The last worker parked during GC. Try to find more work to do..."); // During GC, if all workers parked, all open buckets must have been drained. - self.assert_all_activated_buckets_are_empty(worker); + self.assert_all_activated_buckets_are_empty(); // Find more work for workers to do. let found_more_work = self.find_more_work_for_workers(); @@ -397,16 +389,16 @@ impl GCWorkScheduler { } } } else { - trace!("GC is not scheduled. Wait for the first GC."); - // GC is not scheduled. Do nothing. - // Note that when GC worker threads has just been created, they will try to get - // work packets to execute. But since the first GC has not started, yet, there - // is not any work packets to execute, yet. Therefore, all workers will park, - // and the last parked worker will reach here. 
In that case, we should simply - // let workers wait until the first GC starts, instead of trying to open more - // buckets. - // If a GC worker spuriously wakes up when GC is not scheduled, it should not - // do anything, either. + trace!("The last worker parked while not in GC. Wait for GC to start."); + // GC has not started, yet. Do not try to open work buckets. + // + // This branch is usually reached when `initialize_colection` has just been + // called and GC worker threads have just been created. In that case, there is + // no work packets to execute, and workers should park and wait for the first + // GC. + // + // This branch can also be reached if a GC worker spuriously wakes up while not + // in GC. LastParkedResult::ParkSelf } }); @@ -439,7 +431,7 @@ impl GCWorkScheduler { /// Called when GC has finished, i.e. when all work packets have been executed. /// Return `true` if it scheduled the next GC immediately. fn on_gc_finished(&self, worker: &GCWorker) -> bool { - // All GC workers (except this one) must have parked by now. + // All GC workers must have parked by now. debug_assert!(!self.worker_group.has_designated_work()); debug_assert!(self.all_buckets_empty()); @@ -466,6 +458,9 @@ impl GCWorkScheduler { elapsed.as_millis() ); + // USDT tracepoint for the end of GC. + probe!(mmtk, gc_end); + #[cfg(feature = "count_live_bytes_in_gc")] { let live_bytes = mmtk.state.get_live_bytes_in_last_gc(); @@ -501,9 +496,6 @@ impl GCWorkScheduler { mmtk.set_gc_status(GcStatus::NotInGC); ::VMCollection::resume_mutators(worker.tls); - // GC offically ends here. - probe!(mmtk, gc_end); - // Notify the `GCRequester` that GC has finished. 
let should_schedule_gc_now = mmtk.gc_requester.on_gc_finished(); if should_schedule_gc_now { diff --git a/src/scheduler/worker.rs b/src/scheduler/worker.rs index bd5367f987..754d50b0a3 100644 --- a/src/scheduler/worker.rs +++ b/src/scheduler/worker.rs @@ -84,7 +84,7 @@ pub(crate) enum LastParkedResult { /// A data structure for parking and waking up workers. /// It keeps track of the number of workers parked, and allow the last parked worker to perform -/// operations that require all other workers to be parked. +/// operations that can only be performed when all workers have parked. pub(crate) struct WorkerMonitor { /// The synchronized part. sync: Mutex, @@ -118,6 +118,8 @@ impl WorkerMonitor { self.notify_work_available_inner(all, &mut guard); } + /// Like `notify_work_available` but the current thread must have already acquired the + /// mutex of `WorkerMonitorSync`. fn notify_work_available_inner(&self, all: bool, _guard: &mut MutexGuard) { if all { self.work_available.notify_all(); @@ -126,17 +128,19 @@ impl WorkerMonitor { } } - /// Park a worker until more work is available. + /// Park a worker until work packets are available. /// If it is the last worker parked, `on_last_parked` will be called. - /// If `on_last_parked` returns `true`, this function will notify other workers about available - /// work before unparking the current worker; - /// if `on_last_parked` returns `false`, the current worker will also wait for work packets to - /// be added. + /// The return value of `on_last_parked` will determine whether this worker will block wait, + /// too, and whether other worker will be waken up. pub fn park_and_wait(&self, worker: &GCWorker, on_last_parked: F) where VM: VMBinding, F: FnOnce() -> LastParkedResult, { + warn!("Sleep before acquiring self.sync..."); + std::thread::sleep(std::time::Duration::from_millis(30)); + warn!("Slept. 
Now acquire self.sync..."); + let mut sync = self.sync.lock().unwrap(); // Park this worker @@ -149,33 +153,54 @@ impl WorkerMonitor { all_parked ); + let mut should_wait = false; + if all_parked { debug!("Worker {} is the last worker parked.", worker.ordinal); let result = on_last_parked(); match result { LastParkedResult::ParkSelf => { - // It didn't make more work available. Keep waiting. - // This should happen when - // 1. Worker threads have just been created, but GC has not started, yet, or - // 2. A GC has just finished. - sync = self.work_available.wait(sync).unwrap(); + should_wait = true; } LastParkedResult::WakeSelf => { // Continue without waiting. - // This should happen when only one work packet is made available. } LastParkedResult::WakeAll => { - // Notify all GC workers. self.notify_work_available_inner(true, &mut sync); } } } else { - // Otherwise wait until notified. - // Note: The condition for this `cond.wait` is "more work is available". - // If this worker spuriously wakes up, then in the next loop iteration, the - // `poll_schedulable_work` invocation above will fail, and the worker will reach - // here and wait again. - sync = self.work_available.wait(sync).unwrap(); + should_wait = true; + } + + if should_wait { + // Note: + // 1. The condition variable `work_available` is guarded by `self.sync`. Because the + // last parked worker is holding the mutex `self.sync` when executing + // `on_last_parked`, no workers can unpark (even if they spuriously wake up) during + // `on_last_parked` because they cannot re-acquire the mutex `self.sync`. + // 2. Workers may spuriously wake up and unpark when `on_last_parked` is not being + // executed (including the case when the last parked worker is waiting here, too). + // Spurious wake-up is safe because the actual condition for this `cond.wait` is + // "more work is available". 
If a worker spuriously wakes up, then in the next + // loop iteration, it will call `poll_schedulable_work`, and find no work packets + // to execute. Then the worker will reach here again and wait. + // 3. Mutators may add a `ScheduleCollection` work packet via `GCRequester` to trigger + // GC. It is the only case where a work packet is added by a thread that is not a + // GC worker. Because the mutator must hold the mutex `self.sync` to notify GC + // workers when adding a work packet to a bucket, either of the two can happen: + // 1. The mutator called `work_available.notify_one` after the worker has called + // `wait`, in which case one worker (not necessarily the last parked worker) + // will be waken up. + // 2. The mutator notified when the last worker has just entered this function. + // In that case, ... Oh no! All workers sleep forever! + warn!("Now wait..."); + if rand::random::() { + sync = self.work_available.wait(sync).unwrap(); + warn!("Out from wait."); + } else { + warn!("Emulated spurious wakeup."); + } } // Unpark this worker. diff --git a/src/util/options.rs b/src/util/options.rs index eb6513e1c3..fb51d0fc65 100644 --- a/src/util/options.rs +++ b/src/util/options.rs @@ -706,9 +706,6 @@ options! { /// The GC plan to use. plan: PlanSelector [env_var: true, command_line: true] [always_valid] = PlanSelector::GenImmix, /// Number of GC worker threads. - // FIXME: Currently we create GCWorkScheduler when MMTK is created, which is usually static. - // To allow this as a command-line option, we need to refactor the creation fo the `MMTK` instance. 
- // See: https://github.com/mmtk/mmtk-core/issues/532 threads: usize [env_var: true, command_line: true] [|v: &usize| *v > 0] = num_cpus::get(), /// Enable an optimization that only scans the part of the stack that has changed since the last GC (not supported) use_short_stack_scans: bool [env_var: true, command_line: true] [always_valid] = false, From 2d52f00d5c30268ae6cfced27a1c5a214162e969 Mon Sep 17 00:00:00 2001 From: Kunshan Wang Date: Fri, 5 Jan 2024 03:18:22 +0800 Subject: [PATCH 08/11] Fix a case where workers wait forever Mutators no longer trigger GC by adding ScheduleCollection directly. --- Cargo.toml | 1 - src/plan/gc_requester.rs | 2 +- src/scheduler/scheduler.rs | 114 +++++++++++++++++++++---------------- src/scheduler/worker.rs | 103 +++++++++++++++++++++++---------- 4 files changed, 139 insertions(+), 81 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3461c25835..558f820701 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,7 +50,6 @@ static_assertions = "1.1.0" strum = "0.25" strum_macros = "0.25" sysinfo = "0.29" -rand = "0.8.5" [dev-dependencies] paste = "1.0.8" diff --git a/src/plan/gc_requester.rs b/src/plan/gc_requester.rs index 03d54889df..234f74b423 100644 --- a/src/plan/gc_requester.rs +++ b/src/plan/gc_requester.rs @@ -49,7 +49,7 @@ impl GCRequester { let should_schedule_gc = self.try_schedule_collection(&mut guard); if should_schedule_gc { - self.scheduler.mutator_schedule_collection(); + self.scheduler.request_schedule_collection(); // Note: We do not clear `request_flag` now. It will be cleared by `clear_request` // after all mutators have stopped. } diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index 1c44e8c736..151bdcc8c8 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -91,14 +91,16 @@ impl GCWorkScheduler { self.affinity.resolve_affinity(thread); } - /// Schedule collection. Called via `GCRequester`. 
- /// Because this function is called by a mutator thread, we only add a `ScheduleCollection` - /// work packet here so that a GC worker can wake up later and actually schedule the work for a - /// collection. - pub(crate) fn mutator_schedule_collection(&self) { - debug!("Adding ScheduleCollection work packet upon mutator request."); - // Add a ScheduleCollection work packet. It is the seed of other work packets. - self.work_buckets[WorkBucketStage::Unconstrained].add(ScheduleCollection); + /// Request a GC to be scheduled. Called by mutator via `GCRequester`. + pub(crate) fn request_schedule_collection(&self) { + debug!("A mutator is sending GC-scheduling request to workers..."); + self.worker_monitor.request_schedule_collection(); + } + + /// Add the `ScheduleCollection` packet. Called by the last parked worker. + fn add_schedule_collection_packet(&self) { + // We are still holding the mutex `WorkerMonitor::sync`. Do not notify now. + self.work_buckets[WorkBucketStage::Unconstrained].add_no_notify(ScheduleCollection); } /// Schedule all the common work packets @@ -363,45 +365,57 @@ impl GCWorkScheduler { return work; } - self.worker_monitor.park_and_wait(worker, || { - // This is the last worker parked. - - // Test whether we are doing GC. - if worker.mmtk.gc_in_progress() { - // We are in the middle of GC, and the last GC worker parked. - trace!("The last worker parked during GC. Try to find more work to do..."); - - // During GC, if all workers parked, all open buckets must have been drained. - self.assert_all_activated_buckets_are_empty(); - - // Find more work for workers to do. - let found_more_work = self.find_more_work_for_workers(); - - if found_more_work { - LastParkedResult::WakeAll - } else { - // GC finished. 
- let scheduled_next_gc = self.on_gc_finished(worker); - if scheduled_next_gc { - LastParkedResult::WakeSelf - } else { - LastParkedResult::ParkSelf - } - } + self.worker_monitor + .park_and_wait(worker, |should_schedule_gc| { + self.on_last_parked(worker, should_schedule_gc) + }); + } + } + + /// Called when the last worker parked. + /// `should_schedule_gc` is true if a mutator requested a GC. + fn on_last_parked(&self, worker: &GCWorker, should_schedule_gc: bool) -> LastParkedResult { + // Test whether this is happening during GC. + if worker.mmtk.gc_in_progress() { + assert!( + !should_schedule_gc, + "GC request sent to WorkerMonitor while GC is still in progress." + ); + + // We are in the middle of GC, and the last GC worker parked. + trace!("The last worker parked during GC. Try to find more work to do..."); + + // During GC, if all workers parked, all open buckets must have been drained. + self.assert_all_activated_buckets_are_empty(); + + // Find more work for workers to do. + let found_more_work = self.find_more_work_for_workers(); + + if found_more_work { + LastParkedResult::WakeAll + } else { + // GC finished. + let scheduled_next_gc = self.on_gc_finished(worker); + if scheduled_next_gc { + LastParkedResult::WakeSelf } else { - trace!("The last worker parked while not in GC. Wait for GC to start."); - // GC has not started, yet. Do not try to open work buckets. - // - // This branch is usually reached when `initialize_colection` has just been - // called and GC worker threads have just been created. In that case, there is - // no work packets to execute, and workers should park and wait for the first - // GC. - // - // This branch can also be reached if a GC worker spuriously wakes up while not - // in GC. LastParkedResult::ParkSelf } - }); + } + } else { + trace!( + "The last worker parked while not in GC. should_schedule_gc: {}", + should_schedule_gc + ); + + if should_schedule_gc { + // A mutator requested a GC to be scheduled. 
+ self.add_schedule_collection_packet(); + LastParkedResult::WakeSelf + } else { + // Wait until GC is requested. + LastParkedResult::ParkSelf + } } } @@ -506,13 +520,15 @@ impl GCWorkScheduler { // still running, or // 2. It is STW GC, but after the invocation of `resume_mutators` above, one mutator // ran so fast that it triggered a GC before we called `on_gc_finished`. - // Note that we are holding the `WorkerMonitor` mutex, and cannot notify workers now. - // When this function returns, the current worker should continue to execute the newly - // added `ScheduleCollection` work packet. - debug!("GC already requested before `on_gc_finished`. Add ScheduleCollection now."); - self.work_buckets[WorkBucketStage::Unconstrained].add_no_notify(ScheduleCollection); + debug!("GC already requested before `on_gc_finished`. Schedule GC now."); + self.add_schedule_collection_packet(); true } else { + // Note that if a mutator attempts to request GC after `on_gc_finished`, it will call + // `request_schedule_collection`, but will block on the mutex `WorkerMonitor::sync` + // because the current GC worker is holding it. After the current worker calls + // `work_available.wait()`, the mutator will continue and wake up a GC worker (not + // necessarily this one) to schedule the GC. false } } diff --git a/src/scheduler/worker.rs b/src/scheduler/worker.rs index 754d50b0a3..74351bedc1 100644 --- a/src/scheduler/worker.rs +++ b/src/scheduler/worker.rs @@ -98,6 +98,8 @@ pub(crate) struct WorkerMonitorSync { worker_count: usize, /// Number of parked workers. parked_workers: usize, + /// True if a mutator has requested the workers to schedule a GC. + should_schedule_gc: bool, } impl WorkerMonitor { @@ -106,12 +108,26 @@ impl WorkerMonitor { sync: Mutex::new(WorkerMonitorSync { worker_count, parked_workers: 0, + should_schedule_gc: false, }), work_available: Default::default(), } } - /// Wake up workers when more work packets are made available for workers. 
+ /// Request a GC worker to schedule the next GC. + /// Callable from mutator threads. + pub fn request_schedule_collection(&self) { + let mut guard = self.sync.lock().unwrap(); + assert!( + !guard.should_schedule_gc, + "should_schedule_gc is already set" + ); + guard.should_schedule_gc = true; + self.notify_work_available_inner(false, &mut guard); + } + + /// Wake up workers when more work packets are made available for workers, + /// or a mutator has requested the GC workers to schedule a GC. /// This function is called when adding work packets to buckets. pub fn notify_work_available(&self, all: bool) { let mut guard = self.sync.lock().unwrap(); @@ -130,17 +146,15 @@ impl WorkerMonitor { /// Park a worker until work packets are available. /// If it is the last worker parked, `on_last_parked` will be called. - /// The return value of `on_last_parked` will determine whether this worker will block wait, - /// too, and whether other worker will be waken up. + /// The argument is true if `sync.gc_requested` is `true`, + /// and `sync.gc_requested` will be cleared to `false` regardless of its value. + /// The return value of `on_last_parked` will determine whether this worker will block and + /// wait, too, and whether other worker will be waken up. pub fn park_and_wait(&self, worker: &GCWorker, on_last_parked: F) where VM: VMBinding, - F: FnOnce() -> LastParkedResult, + F: FnOnce(bool) -> LastParkedResult, { - warn!("Sleep before acquiring self.sync..."); - std::thread::sleep(std::time::Duration::from_millis(30)); - warn!("Slept. 
Now acquire self.sync..."); - let mut sync = self.sync.lock().unwrap(); // Park this worker @@ -157,7 +171,8 @@ impl WorkerMonitor { if all_parked { debug!("Worker {} is the last worker parked.", worker.ordinal); - let result = on_last_parked(); + let should_schedule_gc = std::mem::replace(&mut sync.should_schedule_gc, false); + let result = on_last_parked(should_schedule_gc); match result { LastParkedResult::ParkSelf => { should_wait = true; @@ -174,33 +189,61 @@ impl WorkerMonitor { } if should_wait { - // Note: + // Notes on CondVar usage: + // + // Condition variables may spuriously wake up. Therefore, they are usually tested in a + // loop while holding a mutex: + // + // lock(); + // while !condition() { + // condvar.wait(); + // } + // unlock(); + // + // The actual condition for this `self.work_available.wait(sync)` is: + // + // 1. any work packet is available, or + // 2. a request for scheduling GC is submitted. + // + // But it is not used like the typical use pattern shown above, mainly because work + // packets can be added without holding the mutex `self.sync`. This means one worker + // can add a new work packet (no mutex needed) right after another worker finds no work + // packets are available and then parks. In other words, condition (1) can suddenly + // become true after a worker sees it is false but before the worker blocks waiting on + // the CondVar. If this happens, the last parked worker will block forever and never + // get notified. This may happen if mutators or the previously existing "coordinator + // thread" can add work packets. + // + // However, after the "coordinator thread" was removed, only GC worker threads can add + // work packets during GC. Parked workers (except the last parked worker) cannot make + // more work packets available (by adding new packets or opening buckets).
For this + // reason, the **last** parked worker can be sure that after it finds no packets + // available, no other workers can add another work packet (because they all parked). + // So the **last** parked worker can open more buckets or declare GC finished. + // + // Condition (2), i.e. `sync.should_schedule_gc`, is guarded by the mutex `sync`. + // When set (by a mutator via `request_schedule_collection`), it will notify a + // worker; and the last parked worker always checks it before waiting. So this + // condition will not be set without any worker noticing. + // + // Note that generational barriers may add `ProcessModBuf` work packets when not in GC. + // This is benign because those work packets are not executed immediately, and are + // guaranteed to be executed in the next GC. + // + // Notes on spurious wake-up: + // // 1. The condition variable `work_available` is guarded by `self.sync`. Because the // last parked worker is holding the mutex `self.sync` when executing // `on_last_parked`, no workers can unpark (even if they spuriously wake up) during // `on_last_parked` because they cannot re-acquire the mutex `self.sync`. + // // 2. Workers may spuriously wake up and unpark when `on_last_parked` is not being // executed (including the case when the last parked worker is waiting here, too). - // Spurious wake-up is safe because the actual condition for this `cond.wait` is - // "more work is available". If a worker spuriously wakes up, then in the next - // loop iteration, it will call `poll_schedulable_work`, and find no work packets - // to execute. Then the worker will reach here again and wait. - // 3. Mutators may add a `ScheduleCollection` work packet via `GCRequester` to trigger - // GC. It is the only case where a work packet is added by a thread that is not a - // GC worker. Because the mutator must hold the mutex `self.sync` to notify GC - // workers when adding a work packet to a bucket, either of the two can happen: - // 1.
The mutator called `work_available.notify_one` after the worker has called - // `wait`, in which case one worker (not necessarily the last parked worker) - // will be waken up. - // 2. The mutator notified when the last worker has just entered this function. - // In that case, ... Oh no! All workers sleep forever! - warn!("Now wait..."); - if rand::random::() { - sync = self.work_available.wait(sync).unwrap(); - warn!("Out from wait."); - } else { - warn!("Emulated spurious wakeup."); - } + // If one or more GC workers spuriously wake up, they will check for work packets, + // and park again if not available. The last parked worker will ensure the two + // conditions listed above are both false before blocking. If either condition is + // true, the last parked worker will take action. + sync = self.work_available.wait(sync).unwrap(); } // Unpark this worker. From fa2aa5012281ae9744be77b89cd442d4407b8b87 Mon Sep 17 00:00:00 2001 From: Kunshan Wang Date: Fri, 5 Jan 2024 18:04:15 +0800 Subject: [PATCH 09/11] Added comments and removed redundant code. --- src/global_state.rs | 19 ++++++++++++------ src/mmtk.rs | 2 ++ src/plan/gc_requester.rs | 42 +++++++++++++++++++--------------------- src/scheduler/worker.rs | 31 ++++++++++++++++------------- 4 files changed, 53 insertions(+), 41 deletions(-) diff --git a/src/global_state.rs b/src/global_state.rs index 4d57fd2e23..468efb9f37 100644 --- a/src/global_state.rs +++ b/src/global_state.rs @@ -233,13 +233,20 @@ impl Default for GlobalState { } } -/// GC status, an indicator of whether MMTk is in the progress of a GC or not. +/// GC status. /// -/// This type was only used for assertions in JikesRVM. After we removed the coordinator (a.k.a. -/// controller) thread, we use the GC status to decide whether GC workers should open new work -/// buckets when the last worker parked. -// FIXME: `GcProper` is inherited from JikesRVM, but it is not used in MMTk core. Consider -// removing it. +/// - It starts in the `NotInGC` state. 
+/// - It enters `GcPrepare` when GC starts (i.e. when `ScheduleCollection` is executed). +/// - It enters `GcProper` when all mutators have stopped. +/// - It returns to `NotInGC` when GC finishes. +/// +/// All states except `NotInGC` are considered "in GC". +/// +/// The status is checked by GC workers. The last parked worker only tries to open new buckets +/// during GC. +/// +/// The distinction between `GcPrepare` and `GcProper` is inherited from JikesRVM. Several +/// assertions in JikesRVM involve those two states. #[derive(PartialEq, Clone, Copy, NoUninit, Debug)] #[repr(u8)] pub enum GcStatus { diff --git a/src/mmtk.rs b/src/mmtk.rs index 9c433fa06d..44ef399083 100644 --- a/src/mmtk.rs +++ b/src/mmtk.rs @@ -344,6 +344,8 @@ impl MMTK { self.state .internal_triggered_collection .store(true, Ordering::Relaxed); + // TODO: The current `GCRequester::request()` is probably incorrect for internally triggered GC. + // Consider removing functions related to "internal triggered collection". self.gc_requester.request(); } diff --git a/src/plan/gc_requester.rs b/src/plan/gc_requester.rs index 234f74b423..43ae8e2abd 100644 --- a/src/plan/gc_requester.rs +++ b/src/plan/gc_requester.rs @@ -1,24 +1,33 @@ use crate::scheduler::GCWorkScheduler; use crate::vm::VMBinding; -use std::marker::PhantomData; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; struct RequestSync { - /// Has the GCRequester called `GCWorkScheduler::schedule_collection` for the current request? - /// This flag exists so that once `GCRequester` called `GCWorkScheduler::schedule_collection`, - /// it cannot call it again until the GC it initiated finished. + /// Are the GC workers already aware that we requested a GC? + /// + /// Mutators call `GCRequester::request()` to trigger GC. It communicates with workers by + /// calling `GCWorkScheduler::request_schedule_collection`. Under the hood, it sets a + /// synchronized variable and notifies one worker. 
Conceptually, it is sending a message to GC + /// workers. + /// + /// The purpose of this variable is preventing the message above from being sent while GC + /// workers are still doing GC. This is mainly significant for concurrent GC. In the current + /// design (inherited from JikesRVM), `request_flag` is cleared when all mutators are + /// suspended, at which time GC is still in progress. If `GCRequester::request()` is called + /// after `request_flag` is cleared but before `on_gc_finished` is called, the mutator will not + /// send the message. But GC workers will check `request_flag` when GC finishes, and schedule + /// the next GC. gc_scheduled: bool, } -/// This data structure lets mutators trigger GC, and may schedule collection when appropriate. +/// This data structure lets mutators trigger GC. pub struct GCRequester { request_sync: Mutex, /// An atomic flag outside `RequestSync` so that mutators can check if GC has already been /// requested in `poll` without acquiring the mutex. request_flag: AtomicBool, scheduler: Arc>, - phantom: PhantomData, } impl GCRequester { @@ -29,12 +38,11 @@ impl GCRequester { }), request_flag: AtomicBool::new(false), scheduler, - phantom: PhantomData, } } /// Request a GC. Called by mutators when polling (during allocation) and when handling user - /// GC requests (e.g. `System.gc();` in Java); + /// GC requests (e.g. `System.gc();` in Java). pub fn request(&self) { // Note: This is the double-checked locking algorithm. // The load has the `Relaxed` order instead of `Acquire` because we are not doing lazy @@ -56,12 +64,6 @@ impl GCRequester { } } - /// Returns true if GC has been scheduled. - pub fn is_gc_scheduled(&self) -> bool { - let guard = self.request_sync.lock().unwrap(); - guard.gc_scheduled - } - /// Clear the "GC requested" flag so that mutators can trigger the next GC. /// Called by a GC worker when all mutators have come to a stop. 
pub fn clear_request(&self) { @@ -82,16 +84,12 @@ impl GCRequester { /// Decide whether we should schedule a new collection. Will transition the state of /// `gc_scheduled` from `false` to `true` if we should schedule a new collection. + /// /// Return `true` if the state transition happens. fn try_schedule_collection(&self, sync: &mut RequestSync) -> bool { - // The time to schedule a collection is when `request_flag` is `true` but `gc_scheduled` - // is `false`. `gc_scheduled` is `true` if either - // - // 1. another mutator called `request()` concurrently and scheduled a collection, or - // 2. a new GC is requested while the current GC is still in progress. - // - // If `gc_scheduled` is `true` when GC is requested, we do nothing now. But when the - // currrent GC finishes, a GC worker will call `on_gc_finished` which clears the + // The time to schedule a collection is when `request_flag` is `true` but `gc_scheduled` is + // `false`. If `gc_scheduled` is `true` when GC is requested, we do nothing now. But when + // the current GC finishes, a GC worker will call `on_gc_finished` which clears the // `gc_scheduled` flag, and checks the `request_flag` again to trigger the next GC. if self.request_flag.load(Ordering::Relaxed) && !sync.gc_scheduled { sync.gc_scheduled = true; diff --git a/src/scheduler/worker.rs b/src/scheduler/worker.rs index 74351bedc1..302bc9d239 100644 --- a/src/scheduler/worker.rs +++ b/src/scheduler/worker.rs @@ -82,13 +82,16 @@ pub(crate) enum LastParkedResult { WakeAll, } -/// A data structure for parking and waking up workers. -/// It keeps track of the number of workers parked, and allow the last parked worker to perform -/// operations that can only be performed when all workers have parked. +/// A data structure for synchronizing workers. +/// +/// - It allows workers to park and unpark.
It keeps track of the number of workers parked, and +/// allows the last parked worker to perform operations that can only be performed when all +/// workers have parked. +/// - It allows mutators to notify workers to schedule a GC. pub(crate) struct WorkerMonitor { /// The synchronized part. sync: Mutex, - /// This is notified when new work is made available for the workers. + /// This is notified when new work packets are available, or a mutator has requested GC. work_available: Condvar, } @@ -99,6 +102,10 @@ pub(crate) struct WorkerMonitorSync { /// Number of parked workers. parked_workers: usize, /// True if a mutator has requested the workers to schedule a GC. + /// + /// Consider this as a one-element message queue. The last parked worker will poll this + /// "queue" by reading this field and setting it to `false`. The last parked worker will + /// schedule a GC whenever seeing `true` in this field. should_schedule_gc: bool, } @@ -128,7 +135,6 @@ impl WorkerMonitor { /// Wake up workers when more work packets are made available for workers, /// or a mutator has requested the GC workers to schedule a GC. - /// This function is called when adding work packets to buckets. pub fn notify_work_available(&self, all: bool) { let mut guard = self.sync.lock().unwrap(); self.notify_work_available_inner(all, &mut guard); @@ -144,12 +150,12 @@ impl WorkerMonitor { } } - /// Park a worker until work packets are available. + /// Park a worker and wait on the CondVar `work_available`. + /// /// If it is the last worker parked, `on_last_parked` will be called. - /// The argument is true if `sync.gc_requested` is `true`, - /// and `sync.gc_requested` will be cleared to `false` regardless of its value. - /// The return value of `on_last_parked` will determine whether this worker will block and - /// wait, too, and whether other worker will be waken up. + /// The argument of `on_last_parked` is true if `sync.should_schedule_gc` is `true`.
+ /// The return value of `on_last_parked` will determine whether this worker and other workers + /// will wake up or block waiting. pub fn park_and_wait(&self, worker: &GCWorker, on_last_parked: F) where VM: VMBinding, @@ -191,8 +197,7 @@ impl WorkerMonitor { if should_wait { // Notes on CondVar usage: // - // Conditional variables may spurious wake up. Therefore, they are usually tested in a - // loop while holding a mutex + // Condition variables are usually tested in a loop while holding a mutex // // lock(); // while condition() { @@ -229,7 +234,7 @@ impl WorkerMonitor { // Note that generational barriers may add `ProcessModBuf` work packets when not in GC. // This is benign because those work packets are not executed immediately, and are // guaranteed to be executed in the next GC. - // + // Notes on spurious wake-up: // // 1. The condition variable `work_available` is guarded by `self.sync`. Because the From 5bda9ee3362887a2b0499eabb347191f5d887cb6 Mon Sep 17 00:00:00 2001 From: Kunshan Wang Date: Thu, 11 Jan 2024 16:22:20 +0800 Subject: [PATCH 10/11] Move the gc_start trace point. --- src/scheduler/gc_work.rs | 3 --- src/scheduler/scheduler.rs | 4 ++++ 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/scheduler/gc_work.rs b/src/scheduler/gc_work.rs index 2be49a06eb..a651437911 100644 --- a/src/scheduler/gc_work.rs +++ b/src/scheduler/gc_work.rs @@ -14,9 +14,6 @@ pub struct ScheduleCollection; impl GCWork for ScheduleCollection { fn do_work(&mut self, worker: &mut GCWorker, mmtk: &'static MMTK) { - // This is officially when a GC starts. - probe!(mmtk, gc_start); - // Record the time when GC starts. { let mut guard = mmtk.state.gc_start_time.borrow_mut(); diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs index 151bdcc8c8..927dd8fbd0 100644 --- a/src/scheduler/scheduler.rs +++ b/src/scheduler/scheduler.rs @@ -99,6 +99,10 @@ impl GCWorkScheduler { /// Add the `ScheduleCollection` packet. Called by the last parked worker.
fn add_schedule_collection_packet(&self) { + // We set the eBPF trace point here so that bpftrace scripts can start recording work + // packet events before the `ScheduleCollection` work packet starts. + probe!(mmtk, gc_start); + // We are still holding the mutex `WorkerMonitor::sync`. Do not notify now. self.work_buckets[WorkBucketStage::Unconstrained].add_no_notify(ScheduleCollection); } From 72e6eb93f0c119c7fe44dbf1b148921e32c3219c Mon Sep 17 00:00:00 2001 From: Kunshan Wang Date: Fri, 12 Jan 2024 14:27:57 +0800 Subject: [PATCH 11/11] Remove the EndOfGC work packet. --- src/scheduler/gc_work.rs | 51 ---------------------------------------- 1 file changed, 51 deletions(-) diff --git a/src/scheduler/gc_work.rs b/src/scheduler/gc_work.rs index a651437911..1084cffc57 100644 --- a/src/scheduler/gc_work.rs +++ b/src/scheduler/gc_work.rs @@ -222,57 +222,6 @@ impl GCWork for StopMutators { } } -#[derive(Default)] -pub struct EndOfGC { - pub elapsed: std::time::Duration, -} - -impl GCWork for EndOfGC { - fn do_work(&mut self, worker: &mut GCWorker, mmtk: &'static MMTK) { - info!( - "End of GC ({}/{} pages, took {} ms)", - mmtk.get_plan().get_reserved_pages(), - mmtk.get_plan().get_total_pages(), - self.elapsed.as_millis() - ); - - #[cfg(feature = "count_live_bytes_in_gc")] - { - let live_bytes = mmtk.state.get_live_bytes_in_last_gc(); - let used_bytes = - mmtk.get_plan().get_used_pages() << crate::util::constants::LOG_BYTES_IN_PAGE; - debug_assert!( - live_bytes <= used_bytes, - "Live bytes of all live objects ({} bytes) is larger than used pages ({} bytes), something is wrong.", - live_bytes, used_bytes - ); - info!( - "Live objects = {} bytes ({:04.1}% of {} used pages)", - live_bytes, - live_bytes as f64 * 100.0 / used_bytes as f64, - mmtk.get_plan().get_used_pages() - ); - } - - // We assume this is the only running work packet that accesses plan at the point of execution - let plan_mut: &mut dyn Plan = unsafe { mmtk.get_plan_mut() }; - 
plan_mut.end_of_gc(worker.tls); - - #[cfg(feature = "extreme_assertions")] - if crate::util::edge_logger::should_check_duplicate_edges(mmtk.get_plan()) { - // reset the logging info at the end of each GC - mmtk.edge_logger.reset(); - } - - // Reset the triggering information. - mmtk.state.reset_collection_trigger(); - - // Set to NotInGC after everything, and right before resuming mutators. - mmtk.set_gc_status(GcStatus::NotInGC); - ::VMCollection::resume_mutators(worker.tls); - } -} - /// This implements `ObjectTracer` by forwarding the `trace_object` calls to the wrapped /// `ProcessEdgesWork` instance. pub(crate) struct ProcessEdgesWorkTracer {