20 commits
9799d78
Disk Cache: design an API to disable GC collection
glyh Jul 21, 2025
9dfdf9b
Snark Pool RC: add all_referenced_work
glyh Jul 22, 2025
c61e0ae
Disk Cache & Proof Cache Tag: Allow casting ID to and from binary repr
glyh Jul 22, 2025
604ddb2
Exit Handler: enforce lockfile cleanup handler to run last
glyh Jul 23, 2025
6f64c02
Network Pool > Snark Pool: allow pool to be loaded from disk
glyh Jul 23, 2025
788704c
Disk Cache: allow initializing persistent information from disk to co…
glyh Jul 23, 2025
83af34b
Exit Handler: add new level: snark pool
glyh Jul 23, 2025
327b44a
Disk Cache: add persistence info needed to reload disk cache across d…
glyh Jul 23, 2025
1dbc4af
Enable disk cache, store snark pool only once on shutdown
glyh Jul 23, 2025
0c7ceb9
Disk Cache & Snark Pool: log on success
glyh Jul 24, 2025
e71307c
Snark Pool Persistence: store periodically
glyh Jul 24, 2025
a7a5868
Verifier: log the reason for a worker termination
glyh Jul 24, 2025
2c25652
Add error message when unwrapping non-existent disk-cached proof
glyh Jul 24, 2025
141c0c0
Snark Pool: log total number of SNARKs saved on disk
glyh Jul 24, 2025
41f00db
Disk Cache: expose register_gc so deserialized ID can be garbage coll…
glyh Jul 24, 2025
83f22ec
Disk Cache: expose [try_get_deserialized] instead of [register_gc] fo…
glyh Jul 24, 2025
14457e0
Snark Pool: check all proofs read from disk actually exist
glyh Jul 24, 2025
15f1ced
Disk Cache > Lmdb: don't try to be smart and reuse disk cache keys
glyh Jul 24, 2025
60b5ceb
Disk Cache: add TODO on index growth
glyh Jul 24, 2025
06c3446
PR 17559: Add changelog
glyh Jul 24, 2025
1 change: 1 addition & 0 deletions changes/17559.md
@@ -0,0 +1 @@
Implement SNARK pool persistence. A restarting daemon now catches up on SNARK work from the previous run, so it is unlikely to be hit by unreasonably high-fee SNARKs or to produce a block without a coinbase transaction.
96 changes: 83 additions & 13 deletions src/lib/disk_cache/filesystem/disk_cache.ml
@@ -10,36 +10,106 @@ module Make (B : sig
include Binable.S
end) =
struct
@glyh (Member, Author) commented on Jul 24, 2025:

I may consider removing FS disk cache as a whole before merging:

  • We're not using it
  • It performs worse than LMDB
  • It's a burden to implement similar logic twice here

type t = string * int ref
type t =
{ root : string
; next_idx : int ref
; eviction_freezed : bool ref
; disk_meta_location : string option
}

type id = { idx : int }
type persistence = int [@@deriving bin_io_unversioned]

let initialize path ~logger =
type id = { idx : int } [@@deriving bin_io_unversioned]

let freeze_eviction_and_snapshot ~logger
{ eviction_freezed; next_idx; disk_meta_location; _ } =
eviction_freezed := true ;
match disk_meta_location with
| None ->
[%log info]
"No metadata location is set for FS disk cache, not saving disk \
cache persistence information" ;
Async.Deferred.unit
| Some disk_meta_location ->
Async_unix.Writer.save_bin_prot disk_meta_location
bin_writer_persistence !next_idx

let initialize path ~logger ?disk_meta_location () =
let open Async in
let open Deferred.Let_syntax in
let%bind next_idx =
match disk_meta_location with
| None ->
return 0
| Some disk_meta_location -> (
match%map
Async.Reader.load_bin_prot disk_meta_location bin_reader_persistence
with
| Error e ->
[%log warn]
"Failed to read FS disk cache persistence information from \
disk, initializing a fresh cache"
~metadata:
[ ("location", `String disk_meta_location)
; ("reason", `String (Error.to_string_hum e))
] ;
0
| Ok idx ->
idx )
in
Async.Deferred.Result.map (Disk_cache_utils.initialize_dir path ~logger)
~f:(fun path -> (path, ref 0))
~f:(fun root ->
let cache =
{ root
; next_idx = ref next_idx
; eviction_freezed = ref false
; disk_meta_location
}
in

Option.iter disk_meta_location ~f:(fun _ ->
Mina_stdlib_unix.Exit_handlers.register_async_shutdown_handler
~logger
~description:
"Shutting down LMDB Disk Cache GC Eviction and store \
persistence info needed to reload from disk" (fun () ->
freeze_eviction_and_snapshot ~logger cache ) ) ;

cache )

let path root i = root ^ Filename.dir_sep ^ Int.to_string i

let get ((root, _) : t) (id : id) : B.t =
let get ({ root; _ } : t) (id : id) : B.t =
(* Read from the file. *)
In_channel.with_file ~binary:true (path root id.idx) ~f:(fun chan ->
let str = In_channel.input_all chan in
Binable.of_string (module B) str )

let put ((root, next_idx) : t) x : id =
let register_gc ~(id : id) ({ root; eviction_freezed; _ } : t) =
let { idx } = id in
(* When this reference is GC'd, delete the file. *)
Core.Gc.Expert.add_finalizer_last_exn id (fun () ->
if not !eviction_freezed then
(* Ignore errors: if a directory is deleted, it's ok. *)
try Core.Unix.unlink (path root idx) with _ -> () )

let try_get_deserialized (t : t) (id : id) : B.t option =
try
let result = Some (get t id) in
register_gc ~id t ; result
with Sys_error _ -> None

let put ({ root; next_idx; _ } as t : t) x : id =
let idx = !next_idx in
incr next_idx ;
let res = { idx } in
(* When this reference is GC'd, delete the file. *)
Core.Gc.Expert.add_finalizer_last_exn res (fun () ->
(* Ignore errors: if a directory is deleted, it's ok. *)
try Core.Unix.unlink (path root idx) with _ -> () ) ;
let id = { idx } in
register_gc ~id t ;
(* Write the proof to the file. *)
Out_channel.with_file ~binary:true (path root idx) ~f:(fun chan ->
Out_channel.output_string chan @@ Binable.to_string (module B) x ) ;
res
id

let count ((path, _) : t) = Sys.ls_dir path |> List.length
let count ({ root; _ } : t) = Sys.ls_dir root |> List.length
end

let%test_module "disk_cache filesystem" =
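The restore-on-init / snapshot-on-shutdown flow above is easier to follow in isolation. Here is a minimal sketch of the same pattern, with illustrative names that are not part of this PR (assumes core, async, and ppx_bin_prot): an int counter saved with bin_prot on shutdown and restored at startup, falling back to a fresh value when the metadata file is unreadable.

open Core
open Async

type persistence = int [@@deriving bin_io]

(* Restore the counter, falling back to 0 when the metadata file is
   missing or unreadable, mirroring [initialize] above. *)
let restore_counter meta_file =
  match%map Reader.load_bin_prot meta_file bin_reader_persistence with
  | Ok n ->
      n
  | Error _ ->
      0

(* Snapshot the counter; in the real code this runs from a registered
   shutdown handler, after eviction has been frozen. *)
let snapshot_counter meta_file counter =
  Writer.save_bin_prot meta_file bin_writer_persistence !counter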
9 changes: 6 additions & 3 deletions src/lib/disk_cache/identity/disk_cache.ml
@@ -2,14 +2,17 @@ open Async_kernel
open Core_kernel

module Make (Data : sig
type t
type t [@@deriving bin_io]
end) =
struct
type t = unit

type id = Data.t
type id = Data.t [@@deriving bin_io_unversioned]

let initialize _path ~logger:_ = Deferred.Result.return ()
let initialize _path ~logger:_ ?disk_meta_location:_ () =
Deferred.Result.return ()

let try_get_deserialized _ v = Some v

let get () = ident

13 changes: 11 additions & 2 deletions src/lib/disk_cache/intf/disk_cache_intf.ml
@@ -6,19 +6,28 @@ module type S = sig

type t

(** Initialize the on-disk cache explicitly before interactions with it take place. *)
(* TODO: The LMDB and file-system disk caches currently grow their index without bound; we need to fix this. *)

(** Initialize the on-disk cache explicitly before interactions with it take
place. If [disk_meta_location] is set, try to read the metadata at that
location in order to reuse the disk cache from a previous run. *)
val initialize :
string
-> logger:Logger.t
-> ?disk_meta_location:string
-> unit
-> (t, [> `Initialization_error of Error.t ]) Deferred.Result.t

type id
type id [@@deriving bin_io]

(** Put the value to disk, return an identifier that is associated with a special handler in GC. *)
val put : t -> Data.t -> id

(** Read from the cache, crashing if the value cannot be found. *)
val get : t -> id -> Data.t

(** For an id created without invoking [put] (e.g. deserialized from disk), test whether the entry exists in the cache; if so, register it with the GC. *)
val try_get_deserialized : t -> id -> Data.t option
end
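
A hedged usage sketch of this interface (the module name Demo, the paths, and the string payload are made up for illustration; this is not code from the PR): initialize with a metadata location, put a value, and use try_get_deserialized for ids rehydrated from a previous run.

module Demo (Cache : S with type Data.t = string) = struct
  let run () =
    let open Async.Deferred.Let_syntax in
    match%bind
      Cache.initialize "/tmp/cache" ~logger:(Logger.null ())
        ~disk_meta_location:"/tmp/cache.meta" ()
    with
    | Error (`Initialization_error e) ->
        Error.raise e
    | Ok cache ->
        let id = Cache.put cache "proof bytes" in
        (* [get] crashes on a missing id; [try_get_deserialized] is the
           safe variant for ids read back from disk. *)
        ( match Cache.try_get_deserialized cache id with
        | Some _data ->
            ()
        | None ->
            failwith "entry missing from cache" ) ;
        Async.Deferred.unit
end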

module type S_with_count = sig
134 changes: 98 additions & 36 deletions src/lib/disk_cache/lmdb/disk_cache.ml
@@ -24,58 +24,120 @@ module Make (Data : Binable.S) = struct
(** A list of ids that are no longer reachable from OCaml runtime, but
haven't been cleared inside the LMDB disk cache *)
; queue_guard : Error_checking_mutex.t
; eviction_freezed : bool ref
; disk_meta_location : string option
}

type persistence = int [@@deriving bin_io_unversioned]

(** How big can the queue [reusable_keys] be before we do a cleanup *)
let reuse_size_limit = 512

let initialize path ~logger =
let freeze_eviction_and_snapshot ~logger
({ eviction_freezed; counter; disk_meta_location; _ } : t) =
eviction_freezed := true ;
match disk_meta_location with
| None ->
[%log info]
"No metadata location is set for LMDB disk cache, not saving disk \
cache persistence information" ;
Async.Deferred.unit
| Some disk_meta_location ->
Async_unix.Writer.save_bin_prot disk_meta_location
bin_writer_persistence !counter

let initialize path ~logger ?disk_meta_location () =
let open Async in
let open Deferred.Let_syntax in
let%bind counter =
match disk_meta_location with
| None ->
return 0
| Some disk_meta_location -> (
match%map
Async.Reader.load_bin_prot disk_meta_location bin_reader_persistence
with
| Error e ->
[%log warn]
"Failed to read LMDB disk cache persistence information from \
disk $location, initializing a fresh cache"
~metadata:
[ ("location", `String disk_meta_location)
; ("reason", `String (Error.to_string_hum e))
] ;
0
| Ok idx ->
[%log info]
"Successfully restored LMDB disk cacahe persistence from disk \
$location"
~metadata:[ ("location", `String disk_meta_location) ] ;
idx )
in
Async.Deferred.Result.map (Disk_cache_utils.initialize_dir path ~logger)
~f:(fun path ->
let env, db = Rw.create path in
{ env
; db
; counter = ref 0
; reusable_keys = Queue.create ()
; queue_guard = Error_checking_mutex.create ()
} )

type id = { idx : int }
let cache =
{ env
; db
; counter = ref counter
; reusable_keys = Queue.create ()
; queue_guard = Error_checking_mutex.create ()
; eviction_freezed = ref false
; disk_meta_location
}
in

Option.iter disk_meta_location ~f:(fun _ ->
Mina_stdlib_unix.Exit_handlers.register_async_shutdown_handler
~logger
~description:
"Shutting down LMDB Disk Cache GC Eviction and store \
persistence info needed to reload from disk" (fun () ->
freeze_eviction_and_snapshot ~logger cache ) ) ;
cache )

type id = { idx : int } [@@deriving bin_io_unversioned]

let get ({ env; db; _ } : t) ({ idx } : id) : Data.t =
Rw.get ~env db idx |> Option.value_exn

let put ({ env; db; counter; reusable_keys; queue_guard } : t) (x : Data.t) :
id =
let idx =
match
Error_checking_mutex.critical_section queue_guard ~f:(fun () ->
Queue.dequeue reusable_keys )
with
| None ->
(* We don't have reusable keys, assign a new one nobody ever used *)
incr counter ; !counter - 1
| Some reused_key ->
(* Any key inside [reusable_keys] is marked as garbage by GC, so we're
free to use them *)
reused_key
in
let res = { idx } in
Rw.get ~env db idx
|> Option.value_exn
~message:
(Printf.sprintf "Trying to access non-existent cache item %d" idx)

let register_gc ~(id : id)
({ reusable_keys; queue_guard; eviction_freezed; _ } : t) =
let { idx } = id in
(* When this reference is GC'd, delete the file. *)
Gc.Expert.add_finalizer_last_exn res (fun () ->
(* The actual deletion is delayed, as GC may be triggered inside LMDB's
critical section. The LMDB critical section would then be re-entered
if deletion ran directly in a GC hook, double-acquiring the mutex and
freezing the node. *)
Error_checking_mutex.critical_section queue_guard ~f:(fun () ->
Queue.enqueue reusable_keys idx ) ) ;

Gc.Expert.add_finalizer_last_exn id (fun () ->
if not !eviction_freezed then
(* The actual deletion is delayed, as GC may be triggered inside LMDB's
critical section. The LMDB critical section would then be re-entered
if deletion ran directly in a GC hook, double-acquiring the mutex and
freezing the node. *)
Error_checking_mutex.critical_section queue_guard ~f:(fun () ->
Queue.enqueue reusable_keys idx ) )

let try_get_deserialized ({ env; db; _ } as t : t) ({ idx } as id : id) :
Data.t option =
Rw.get ~env db idx |> Option.map ~f:(fun data -> register_gc ~id t ; data)

(* WARN: Don't try to be smart here and reuse LMDB keys: SNARK pool
persistence reads IDs back from disk trusting that they correspond to
proofs in the cache DB. If keys were reused, we would need a mechanism
to keep the two in sync, which is complex.
*)
let put ({ env; db; counter; reusable_keys; queue_guard; _ } as t : t)
(x : Data.t) : id =
let idx = !counter in
incr counter ;
let id = { idx } in
register_gc ~id t ;
Error_checking_mutex.critical_section queue_guard ~f:(fun () ->
if Queue.length reusable_keys >= reuse_size_limit then (
Rw.batch_remove ~env db reusable_keys ;
Queue.clear reusable_keys ) ) ;
Rw.set ~env db idx x ;
res
id

let iteri ({ env; db; _ } : t) ~f = Rw.iter ~env db ~f

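The re-entrancy hazard described in the comments above motivates the queue-and-batch eviction scheme: finalizers only record dead keys, and the writer path, which already holds the lock, drains them. A standalone sketch of the pattern, with a Hashtbl standing in for LMDB (names illustrative, not this PR's code):

open Core

let store : (int, string) Hashtbl.t = Hashtbl.create (module Int)

let dead_keys : int Queue.t = Queue.create ()

let guard = Error_checking_mutex.create ()

(* The finalizer never touches [store]; it only records the dead key. *)
let register_gc (id : int ref) =
  Gc.Expert.add_finalizer_last_exn id (fun () ->
      Error_checking_mutex.critical_section guard ~f:(fun () ->
          Queue.enqueue dead_keys !id ) )

(* Deletion happens on the writer path, outside any GC hook, so the
   mutex is never re-entered from a finalizer. *)
let put key data =
  Error_checking_mutex.critical_section guard ~f:(fun () ->
      Queue.iter dead_keys ~f:(Hashtbl.remove store) ;
      Queue.clear dead_keys ;
      Hashtbl.set store ~key ~data )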
2 changes: 1 addition & 1 deletion src/lib/disk_cache/test/test_cache_deadlock.ml
@@ -31,7 +31,7 @@ end
let run_test_with_cache (module Cache_impl : Cache_intf) ~timeout_seconds
~tmpdir =
let module Cache = Cache_impl.Make (Evil_data) in
let%bind cache_result = Cache.initialize tmpdir ~logger:(Logger.null ()) in
let%bind cache_result = Cache.initialize tmpdir ~logger:(Logger.null ()) () in
let cache =
match cache_result with
| Ok cache ->
6 changes: 3 additions & 3 deletions src/lib/disk_cache/test_lib/disk_cache_test_lib.ml
@@ -46,7 +46,7 @@ module Make_impl (Cache : Disk_cache_intf.S_with_count with module Data := Mock)

let initialize_cache_or_fail tmpd ~logger =
let open Deferred.Let_syntax in
let%bind cache_res = Cache.initialize tmpd ~logger in
let%bind cache_res = Cache.initialize tmpd ~logger () in
match cache_res with
| Ok cache ->
return cache
@@ -107,7 +107,7 @@ module Make_impl (Cache : Disk_cache_intf.S_with_count with module Data := Mock)
~f:(remove_data_on_gc_impl ~gc_strict)

let initialize_and_expect_failure path ~logger =
let%bind cache_res = Cache.initialize path ~logger in
let%bind cache_res = Cache.initialize path ~logger () in
match cache_res with
| Ok _ ->
failwith "unexpected initialization success"
@@ -130,7 +130,7 @@ module Make_impl (Cache : Disk_cache_intf.S_with_count with module Data := Mock)
Core.Unix.mkdir some_dir ;
let dir_symlink = tmp_dir ^/ "dir_link" in
Core.Unix.symlink ~target:some_dir_name ~link_name:dir_symlink ;
Cache.initialize dir_symlink ~logger
Cache.initialize dir_symlink ~logger ()
>>| function
| Ok _ ->
()
15 changes: 8 additions & 7 deletions src/lib/mina_lib/conf_dir.ml
@@ -32,13 +32,14 @@ let rec check_and_set_lockfile ~logger conf_dir =
| Ok () ->
[%log debug] "Created daemon lockfile $lockfile"
~metadata:[ ("lockfile", `String lockfile) ] ;
Mina_stdlib_unix.Exit_handlers.register_async_shutdown_handler ~logger
~description:"Remove daemon lockfile" (fun () ->
match%bind Sys.file_exists lockfile with
| `Yes ->
Unix.unlink lockfile
| _ ->
return () )
Mina_stdlib_unix.Exit_handlers.(
register_async_shutdown_handler ~priority:Priority.Exclusion ~logger
~description:"Remove daemon lockfile" (fun () ->
match%bind Sys.file_exists lockfile with
| `Yes ->
Unix.unlink lockfile
| _ ->
return () ))
| Error exn ->
Error.tag_arg (Error.of_exn exn)
"Could not create the daemon lockfile" ("lockfile", lockfile)
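The new ~priority:Priority.Exclusion argument is what enforces commit 604ddb2 ("enforce lockfile cleanup handler to run last"). A hedged sketch of how the two handler kinds interact, assuming only the Exit_handlers API visible in this diff and that Exclusion-priority handlers run after default-priority ones:

let register_shutdown_handlers ~logger =
  let open Mina_stdlib_unix.Exit_handlers in
  (* Registered first, but runs last because of its priority: no other
     handler should observe a deleted lockfile. *)
  register_async_shutdown_handler ~priority:Priority.Exclusion ~logger
    ~description:"Remove daemon lockfile" (fun () ->
      (* ... unlink the lockfile here ... *)
      Async.Deferred.unit ) ;
  (* Default priority: runs before any Exclusion handler, so e.g. the
     SNARK pool snapshot happens while the daemon still owns the
     lockfile. *)
  register_async_shutdown_handler ~logger
    ~description:"Flush SNARK pool to disk" (fun () ->
      (* ... snapshot the pool here ... *)
      Async.Deferred.unit )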