@@ -15,6 +15,7 @@ use futures::FutureExt;
1515use futures:: StreamExt ;
1616use futures:: future:: BoxFuture ;
1717use futures:: stream:: FuturesUnordered ;
18+ use nexus_background_task_interface:: Activator ;
1819use nexus_db_queries:: context:: OpContext ;
1920use nexus_types:: internal_api:: views:: ActivationReason ;
2021use nexus_types:: internal_api:: views:: CurrentStatus ;
@@ -23,12 +24,8 @@ use nexus_types::internal_api::views::LastResult;
2324use nexus_types:: internal_api:: views:: LastResultCompleted ;
2425use nexus_types:: internal_api:: views:: TaskStatus ;
2526use std:: collections:: BTreeMap ;
26- use std:: sync:: Arc ;
27- use std:: sync:: atomic:: AtomicBool ;
28- use std:: sync:: atomic:: Ordering ;
2927use std:: time:: Duration ;
3028use std:: time:: Instant ;
31- use tokio:: sync:: Notify ;
3229use tokio:: sync:: watch;
3330use tokio:: time:: MissedTickBehavior ;
3431
@@ -118,17 +115,10 @@ impl Driver {
118115 // requested. The caller provides their own Activator, which just
119116 // provides a specific Notify for us to use here.
120117 let activator = taskdef. activator ;
121- if let Err ( previous) = activator. 0 . wired_up . compare_exchange (
122- false ,
123- true ,
124- Ordering :: SeqCst ,
125- Ordering :: SeqCst ,
126- ) {
118+ if let Err ( error) = activator. mark_wired_up ( ) {
127119 panic ! (
128- "attempted to wire up the same background task handle \
129- twice (previous \" wired_up\" = {}): currently attempting \
130- to wire it up to task {:?}",
131- previous, name
120+ "{error}: currently attempting to wire it up to task {:?}" ,
121+ name
132122 ) ;
133123 }
134124
@@ -141,7 +131,7 @@ impl Driver {
141131 let task_exec = TaskExec :: new (
142132 taskdef. period ,
143133 taskdef. task_impl ,
144- Arc :: clone ( & activator. 0 ) ,
134+ activator. clone ( ) ,
145135 opctx,
146136 status_tx,
147137 ) ;
@@ -241,59 +231,6 @@ pub struct TaskDefinition<'a, N: ToString, D: ToString> {
241231 pub activator : & ' a Activator ,
242232}
243233
244- /// Activates a background task
245- ///
246- /// See [`crate::app::background`] module-level documentation for more on what
247- /// that means.
248- ///
249- /// Activators are created with [`Activator::new()`] and then wired up to
250- /// specific background tasks using [`Driver::register()`]. If you call
251- /// `Activator::activate()` before the activator is wired up to a background
252- /// task, then once the Activator _is_ wired up to a task, that task will
253- /// immediately be activated.
254- ///
255- /// Activators are designed specifically so they can be created before the
256- /// corresponding task has been created and then wired up with just an
257- /// `&Activator` (not a `&mut Activator`). See the [`super::init`] module-level
258- /// documentation for more on why.
259- #[ derive( Clone ) ]
260- pub struct Activator ( Arc < ActivatorInner > ) ;
261-
262- /// Shared state for an `Activator`.
263- struct ActivatorInner {
264- pub ( super ) notify : Notify ,
265- pub ( super ) wired_up : AtomicBool ,
266- }
267-
268- impl Activator {
269- /// Create an activator that is not yet wired up to any background task
270- pub fn new ( ) -> Activator {
271- Self ( Arc :: new ( ActivatorInner {
272- notify : Notify :: new ( ) ,
273- wired_up : AtomicBool :: new ( false ) ,
274- } ) )
275- }
276-
277- /// Activate the background task that this Activator has been wired up to
278- ///
279- /// If this Activator has not yet been wired up with [`Driver::register()`],
280- /// then whenever it _is_ wired up, that task will be immediately activated.
281- pub fn activate ( & self ) {
282- self . 0 . notify . notify_one ( ) ;
283- }
284- }
285-
286- impl ActivatorInner {
287- async fn activated ( & self ) {
288- debug_assert ! (
289- self . wired_up. load( Ordering :: SeqCst ) ,
290- "nothing should await activation from an activator that hasn't \
291- been wired up"
292- ) ;
293- self . notify . notified ( ) . await
294- }
295- }
296-
297234/// Encapsulates state needed by the background tokio task to manage activation
298235/// of the background task
299236struct TaskExec {
@@ -303,7 +240,7 @@ struct TaskExec {
303240 imp : Box < dyn BackgroundTask > ,
304241 /// used to receive notifications from the Driver that someone has requested
305242 /// explicit activation
306- activation : Arc < ActivatorInner > ,
243+ activation : Activator ,
307244 /// passed through to the background task impl when activated
308245 opctx : OpContext ,
309246 /// used to send current status back to the Driver
@@ -316,7 +253,7 @@ impl TaskExec {
316253 fn new (
317254 period : Duration ,
318255 imp : Box < dyn BackgroundTask > ,
319- activation : Arc < ActivatorInner > ,
256+ activation : Activator ,
320257 opctx : OpContext ,
321258 status_tx : watch:: Sender < TaskStatus > ,
322259 ) -> TaskExec {
0 commit comments