@@ -240,8 +240,11 @@ end = struct
240240 let * outdated_contents = files_in_existing key_dir in
241241
242242 let filename = key_dir // (Mtime. to_uint64_ns now |> Int64. to_string) in
243- (* 2. Write new timestamped content to cache, atomically, if needed; and
244- notify the other side, if needed *)
243+
244+ (* 2. Try to push the changes, if possible. If it's not possible because of
245+ the mode or a failure, write new timestamped content to cache,
246+ atomically; and finally notify the other side if needed *)
247+ (* Note that all queue operations must use while holding its mutex *)
245248 let persist () = persist_to ~filename ~contents in
246249 let persist_and_push () =
247250 let push () =
@@ -263,27 +266,42 @@ end = struct
263266 m " %s: Error on push. Reason: %s" __FUN (Printexc. to_string exn )
264267 )
265268 in
266- persist_and_push ()
269+ let * () = persist_and_push () in
270+ Lwt_result. return ()
271+ in
272+ let fail exn =
273+ Debug. log_backtrace exn (Backtrace. get exn ) ;
274+ Lwt_result. fail exn
275+ in
276+ let read_state_and_push on_exception () =
277+ match queue.state with
278+ | Direct ->
279+ let * result =
280+ Lwt. try_bind
281+ (fun () -> direct (uuid, now, key) contents)
282+ (function
283+ | Ok () -> Lwt_result. return () | Error exn -> on_exception exn
284+ )
285+ on_exception
286+ in
287+ Lwt. return result
288+ | Engaged ->
289+ let * () = persist_and_push () in
290+ Lwt_result. return ()
291+ | Disengaged ->
292+ let * () = persist () in
293+ Lwt_result. return ()
267294 in
295+ let on_exception = engage_and_persist in
296+ let lock_and_push () =
297+ with_lock queue.lock (read_state_and_push on_exception)
298+ in
299+
300+ let * result = lock_and_push () in
268301 let * () =
269- with_lock queue.lock (fun () ->
270- match queue.state with
271- | Direct ->
272- Lwt. try_bind
273- (fun () -> direct (uuid, now, key) contents)
274- (function
275- | Ok () ->
276- Lwt. return_unit
277- | Error exn ->
278- engage_and_persist exn
279- )
280- (function exn -> engage_and_persist exn )
281- | Engaged ->
282- persist_and_push ()
283- | Disengaged ->
284- persist ()
285- )
302+ match result with Ok () -> Lwt. return_unit | Error exn -> raise exn
286303 in
304+
287305 (* 4. Delete previous requests from filesystem *)
288306 let * _ = Lwt_list. map_p unlink_safe outdated_contents in
289307 Lwt. return_unit
0 commit comments