@@ -8,11 +8,14 @@ use super::DataStore;
88use crate :: context:: OpContext ;
99use crate :: db:: error:: public_error_from_diesel;
1010use crate :: db:: error:: ErrorHandler ;
11+ use crate :: db:: model:: SqlU8 ;
1112use crate :: db:: model:: WebhookDelivery ;
1213use crate :: db:: model:: WebhookDeliveryAttempt ;
1314use crate :: db:: schema:: webhook_delivery:: dsl;
15+ use crate :: db:: schema:: webhook_delivery_attempt:: dsl as attempt_dsl;
1416use crate :: db:: schema:: webhook_event:: dsl as event_dsl;
1517use crate :: db:: update_and_check:: UpdateAndCheck ;
18+ use crate :: db:: update_and_check:: UpdateAndQueryResult ;
1619use crate :: db:: update_and_check:: UpdateStatus ;
1720use async_bb8_diesel:: AsyncRunQueryDsl ;
1821use chrono:: TimeDelta ;
@@ -43,13 +46,13 @@ impl DataStore {
4346 . filter ( dsl:: time_completed. is_null ( ) )
4447 . filter ( dsl:: rx_id. eq ( rx_id. into_untyped_uuid ( ) ) )
4548 . filter (
46- ( dsl:: time_delivery_started
47- . is_null ( )
48- . and ( dsl :: deliverator_id . is_null ( ) ) )
49- . or ( dsl:: time_delivery_started. is_not_null ( ) . and (
50- dsl :: time_delivery_started
51- . le ( now . nullable ( ) - lease_timeout ) ,
52- ) ) ,
49+ ( dsl:: deliverator_id . is_null ( ) ) . or ( dsl :: time_delivery_started
50+ . is_not_null ( )
51+ . and (
52+ dsl:: time_delivery_started
53+ . le ( now . nullable ( ) - lease_timeout ) ,
54+ ) ) ,
55+ // TODO(eliza): retry backoffs...?
5356 )
5457 . order_by ( dsl:: time_created. asc ( ) )
5558 // Join with the `webhook_event` table to get the event class, which
@@ -78,13 +81,12 @@ impl DataStore {
7881 . filter ( dsl:: time_completed. is_null ( ) )
7982 . filter ( dsl:: id. eq ( id) )
8083 . filter (
81- ( dsl:: time_delivery_started
82- . is_null ( )
83- . and ( dsl:: deliverator_id. is_null ( ) ) )
84- . or ( dsl:: time_delivery_started. is_not_null ( ) . and (
85- dsl:: time_delivery_started
86- . le ( now. nullable ( ) - lease_timeout) ,
87- ) ) ,
84+ dsl:: deliverator_id. is_null ( ) . or ( dsl:: time_delivery_started
85+ . is_not_null ( )
86+ . and (
87+ dsl:: time_delivery_started
88+ . le ( now. nullable ( ) - lease_timeout) ,
89+ ) ) ,
8890 )
8991 . set ( (
9092 dsl:: time_delivery_started. eq ( now. nullable ( ) ) ,
@@ -128,8 +130,79 @@ impl DataStore {
128130 opctx : & OpContext ,
129131 delivery : & WebhookDelivery ,
130132 nexus_id : & OmicronZoneUuid ,
131- result : & WebhookDeliveryAttempt ,
133+ attempt : & WebhookDeliveryAttempt ,
132134 ) -> Result < ( ) , Error > {
133- Err ( Error :: internal_error ( "TODO ELIZA DO THIS PART" ) )
135+ const MAX_ATTEMPTS : u8 = 4 ;
136+ let conn = self . pool_connection_authorized ( opctx) . await ?;
137+ diesel:: insert_into ( attempt_dsl:: webhook_delivery_attempt)
138+ . values ( attempt. clone ( ) )
139+ . on_conflict ( ( attempt_dsl:: delivery_id, attempt_dsl:: attempt) )
140+ . do_nothing ( )
141+ . returning ( WebhookDeliveryAttempt :: as_returning ( ) )
142+ . execute_async ( & * conn)
143+ . await
144+ . map_err ( |e| public_error_from_diesel ( e, ErrorHandler :: Server ) ) ?;
145+
146+ // Has the delivery either completed successfully or exhausted all of
147+ // its retry attempts?
148+ let succeeded =
149+ attempt. result == nexus_db_model:: WebhookDeliveryResult :: Succeeded ;
150+ let failed_permanently = * attempt. attempt >= MAX_ATTEMPTS ;
151+ let ( completed, new_nexus_id) = if succeeded || failed_permanently {
152+ // If the delivery has succeeded or failed permanently, set the
153+ // "time_completed" timestamp to mark it as finished. Also, leave
154+ // the delivering Nexus ID in place to maintain a record of who
155+ // finished the delivery.
156+ ( Some ( Utc :: now ( ) ) , Some ( nexus_id. into_untyped_uuid ( ) ) )
157+ } else {
158+ // Otherwise, "unlock" the delivery for other nexii.
159+ ( None , None )
160+ } ;
161+ let prev_attempts = SqlU8 :: new ( ( * attempt. attempt ) - 1 ) ;
162+ let UpdateAndQueryResult { status, found } =
163+ diesel:: update ( dsl:: webhook_delivery)
164+ . filter ( dsl:: id. eq ( delivery. id . into_untyped_uuid ( ) ) )
165+ . filter ( dsl:: deliverator_id. eq ( nexus_id. into_untyped_uuid ( ) ) )
166+ . filter ( dsl:: attempts. eq ( prev_attempts) )
167+ // Don't mark a delivery as completed if it's already completed!
168+ . filter ( dsl:: time_completed. is_null ( ) )
169+ . set ( (
170+ dsl:: time_completed. eq ( completed) ,
171+ // XXX(eliza): hmm this might be racy; we should probably increment this
172+ // in place and use it to determine the attempt number?
173+ dsl:: attempts. eq ( attempt. attempt ) ,
174+ dsl:: deliverator_id. eq ( new_nexus_id) ,
175+ ) )
176+ . check_if_exists :: < WebhookDelivery > ( delivery. id )
177+ . execute_and_check ( & conn)
178+ . await
179+ . map_err ( |e| {
180+ public_error_from_diesel ( e, ErrorHandler :: Server )
181+ } ) ?;
182+
183+ if status == UpdateStatus :: Updated {
184+ return Ok ( ( ) ) ;
185+ }
186+
187+ if let Some ( other_nexus_id) = found. deliverator_id {
188+ return Err ( Error :: conflict ( format ! (
189+ "cannot mark delivery completed, as {other_nexus_id:?} was \
190+ attempting to deliver it",
191+ ) ) ) ;
192+ }
193+
194+ if found. time_completed . is_some ( ) {
195+ return Err ( Error :: conflict (
196+ "delivery was already marked as completed" ,
197+ ) ) ;
198+ }
199+
200+ if found. attempts != prev_attempts {
201+ return Err ( Error :: conflict ( "wrong number of delivery attempts" ) ) ;
202+ }
203+
204+ Err ( Error :: internal_error (
205+ "couldn't update delivery for some other reason i didn't think of here..."
206+ ) )
134207 }
135208}
0 commit comments