Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions ocaml/database/db_backend.ml
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ let db_FLUSH_TIMER = 2.0

(* --------------------- Util functions on db datastructures *)

let master_database = ref (Db_cache_types.Database.make Schema.empty)
let master_database = Atomic.make (Db_cache_types.Database.make Schema.empty)

let __test_set_master_database db = master_database := db
let __test_set_master_database db = Atomic.set master_database db

let make () = Db_ref.in_memory (ref master_database)
let make () = Db_ref.in_memory master_database

(* !!! Right now this is called at cache population time. It would probably be preferable to call it on flush time instead, so we
don't waste writes storing non-persistent field values on disk.. At the moment there's not much to worry about, since there are
Expand Down
16 changes: 3 additions & 13 deletions ocaml/database/db_connections.ml
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,12 @@ let preferred_write_db () = List.hd (Db_conn_store.read_db_connections ())
let exit_on_next_flush = ref false

(* db flushing thread refcount: the last thread out of the door does the exit(0) when flush_on_exit is true *)
let with_lock = Xapi_stdext_threads.Threadext.Mutex.execute
let db_flush_thread_refcount = Atomic.make 0

let db_flush_thread_refcount_m = Mutex.create ()

let db_flush_thread_refcount = ref 0

let inc_db_flush_thread_refcount () =
with_lock db_flush_thread_refcount_m (fun () ->
db_flush_thread_refcount := !db_flush_thread_refcount + 1
)
let inc_db_flush_thread_refcount () = Atomic.incr db_flush_thread_refcount

let dec_and_read_db_flush_thread_refcount () =
with_lock db_flush_thread_refcount_m (fun () ->
db_flush_thread_refcount := !db_flush_thread_refcount - 1 ;
!db_flush_thread_refcount
)
Atomic.fetch_and_add db_flush_thread_refcount (-1)

let pre_exit_hook () =
(* We're about to exit. Close the active redo logs. *)
Expand Down
17 changes: 5 additions & 12 deletions ocaml/database/db_lock.ml
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,7 @@ module ReentrantLock : REENTRANT_LOCK = struct
type t = {
holder: tid option Atomic.t (* The holder of the lock *)
; mutable holds: int (* How many holds the holder has on the lock *)
; lock: Mutex.t (* Barrier to signal waiting threads *)
; condition: Condition.t
(* Waiting threads are signalled via this condition to reattempt to acquire the lock *)
; lock: Mutex.t (* Mutex held by the holder thread *)
; statistics: statistics (* Bookkeeping of time taken to acquire lock *)
}

Expand All @@ -73,7 +71,6 @@ module ReentrantLock : REENTRANT_LOCK = struct
holder= Atomic.make None
; holds= 0
; lock= Mutex.create ()
; condition= Condition.create ()
; statistics= create_statistics ()
}

Expand All @@ -94,17 +91,15 @@ module ReentrantLock : REENTRANT_LOCK = struct
let intended = Some me in
let counter = Mtime_clock.counter () in
Mutex.lock l.lock ;
while not (Atomic.compare_and_set l.holder None intended) do
Condition.wait l.condition l.lock
done ;
Atomic.set l.holder intended ;
lock_acquired () ;
let stats = l.statistics in
let delta = Clock.Timer.span_to_s (Mtime_clock.count counter) in
stats.total_time <- stats.total_time +. delta ;
stats.min_time <- Float.min delta stats.min_time ;
stats.max_time <- Float.max delta stats.max_time ;
stats.acquires <- stats.acquires + 1 ;
Mutex.unlock l.lock ;
(* do not unlock, it will be done when holds reaches 0 instead *)
l.holds <- 1

let unlock l =
Expand All @@ -114,10 +109,8 @@ module ReentrantLock : REENTRANT_LOCK = struct
l.holds <- l.holds - 1 ;
if l.holds = 0 then (
let () = Atomic.set l.holder None in
Mutex.lock l.lock ;
Condition.signal l.condition ;
Mutex.unlock l.lock ;
lock_released ()
(* the lock is held (acquired in [lock]), we only need to unlock *)
Mutex.unlock l.lock ; lock_released ()
)
| _ ->
failwith
Expand Down
8 changes: 4 additions & 4 deletions ocaml/database/db_ref.ml
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,22 @@
* GNU Lesser General Public License for more details.
*)

type t = In_memory of Db_cache_types.Database.t ref ref | Remote
type t = In_memory of Db_cache_types.Database.t Atomic.t | Remote

exception Database_not_in_memory

let in_memory (rf : Db_cache_types.Database.t ref ref) = In_memory rf
let in_memory (rf : Db_cache_types.Database.t Atomic.t) = In_memory rf

let get_database = function
| In_memory x ->
!(!x)
Atomic.get x
| Remote ->
raise Database_not_in_memory

let update_database t f =
match t with
| In_memory x ->
let d : Db_cache_types.Database.t = f (get_database t) in
!x := d
Atomic.set x d
| Remote ->
raise Database_not_in_memory
4 changes: 2 additions & 2 deletions ocaml/database/db_ref.mli
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
* GNU Lesser General Public License for more details.
*)

type t = In_memory of Db_cache_types.Database.t ref ref | Remote
type t = In_memory of Db_cache_types.Database.t Atomic.t | Remote

exception Database_not_in_memory

val in_memory : Db_cache_types.Database.t ref ref -> t
val in_memory : Db_cache_types.Database.t Atomic.t -> t

val get_database : t -> Db_cache_types.Database.t

Expand Down
7 changes: 2 additions & 5 deletions ocaml/database/db_remote_cache_access_v1.ml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ module DBCacheRemoteListener = struct

exception DBCacheListenerUnknownMessageName of string

let ctr_mutex = Mutex.create ()

let calls_processed = ref 0
let calls_processed = Atomic.make 0

let success xml =
let resp = XMLRPC.To.array [XMLRPC.To.string "success"; xml] in
Expand All @@ -34,8 +32,7 @@ module DBCacheRemoteListener = struct
Note that, although the messages still contain the pool_secret for historical reasons,
access has already been applied by the RBAC code in Xapi_http.add_handler. *)
let process_xmlrpc xml =
let with_lock = Xapi_stdext_threads.Threadext.Mutex.execute in
with_lock ctr_mutex (fun () -> calls_processed := !calls_processed + 1) ;
Atomic.incr calls_processed ;
let fn_name, args =
match XMLRPC.From.array (fun x -> x) xml with
| [fn_name; _; args] ->
Expand Down
26 changes: 9 additions & 17 deletions ocaml/database/redo_log.ml
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ type redo_log_conf = {
; backoff_delay: int ref
; sock: Unix.file_descr option ref
; pid: (Forkhelpers.pidty * string * string) option ref
; dying_processes_mutex: Mutex.t
; num_dying_processes: int ref
; num_dying_processes: int Atomic.t
; mutex: Mutex.t (** exclusive access to this configuration *)
}

Expand Down Expand Up @@ -585,14 +584,10 @@ let shutdown log =
(Thread.create
(fun () ->
D.debug "Waiting for I/O process with pid %d to die..." ipid ;
with_lock log.dying_processes_mutex (fun () ->
log.num_dying_processes := !(log.num_dying_processes) + 1
) ;
Atomic.incr log.num_dying_processes ;
ignore (Forkhelpers.waitpid p) ;
D.debug "Finished waiting for process with pid %d" ipid ;
with_lock log.dying_processes_mutex (fun () ->
log.num_dying_processes := !(log.num_dying_processes) - 1
)
Atomic.decr log.num_dying_processes
)
()
) ;
Expand Down Expand Up @@ -633,13 +628,11 @@ let startup log =
() (* We're already started *)
| None -> (
(* Don't start if there are already some processes hanging around *)
with_lock log.dying_processes_mutex (fun () ->
if
!(log.num_dying_processes)
>= Db_globs.redo_log_max_dying_processes
then
raise TooManyProcesses
) ;
if
Atomic.get log.num_dying_processes
>= Db_globs.redo_log_max_dying_processes
then
raise TooManyProcesses ;
match !(log.device) with
| None ->
D.info "Could not find block device" ;
Expand Down Expand Up @@ -793,8 +786,7 @@ let create ~name ~state_change_callback ~read_only =
; backoff_delay= ref Db_globs.redo_log_initial_backoff_delay
; sock= ref None
; pid= ref None
; dying_processes_mutex= Mutex.create ()
; num_dying_processes= ref 0
; num_dying_processes= Atomic.make 0
; mutex= Mutex.create ()
}
in
Expand Down
72 changes: 68 additions & 4 deletions ocaml/tests/bench/bench_pool_field.ml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ open Bechamel
let () =
Suite_init.harness_init () ;
Printexc.record_backtrace true ;
Debug.set_level Syslog.Emerg
Debug.set_level Syslog.Emerg ;
Xapi_event.register_hooks ()

let date = "20250102T03:04:05Z"

Expand All @@ -36,11 +37,21 @@ let json_str =

let __context = Test_common.make_test_database ()

let host = Test_common.make_host ~__context ()

let pool = Test_common.make_pool ~__context ~master:host ()

let () =
let host = Test_common.make_host ~__context () in
let pool = Test_common.make_pool ~__context ~master:host () in
Db.Pool.set_license_server ~__context ~self:pool
~value:[("jsontest", json_str)]
~value:[("jsontest", json_str)] ;
let open Xapi_database in
Db_ref.update_database
(Context.database_of __context)
(Db_cache_types.Database.register_callback "redo_log"
Redo_log.database_callback
)

let vm = Test_common.make_vm ~__context ~name_label:"test" ()

let get_all () : API.pool_t list =
Db.Pool.get_all_records ~__context |> List.map snd
Expand All @@ -64,6 +75,51 @@ let date_of_iso8601 () = Clock.Date.of_iso8601 date
let local_session_hook () =
Xapi_local_session.local_session_hook ~__context ~session_id:Ref.null

let atomic = Atomic.make 0

let atomic_inc () = Atomic.incr atomic

let mutex = Mutex.create ()

let locked_ref = ref 0

let with_lock = Xapi_stdext_threads.Threadext.Mutex.execute

let inc_locked () = incr locked_ref

let inc_with_mutex () = with_lock mutex inc_locked

let noop () = Sys.opaque_identity ()

let db_lock_uncontended () : unit = Xapi_database.Db_lock.with_lock noop

let event =
let open Event_types in
{
id= "id"
; ts= "1000"
; ty= "test"
; op= `add
; reference= "test"
; snapshot= Some (Rpc.Dict [])
}

let test_rpc_of_event () = Event_types.rpc_of_event event

let counter = Atomic.make 0

let test_set_vm_nvram () : unit =
let c = Atomic.fetch_and_add counter 1 mod 0x7F in
(* use different value each iteration, otherwise it becomes a noop *)
Db.VM.set_NVRAM ~__context ~self:vm
~value:[("test", String.make 32768 (Char.chr @@ c))]

let test_db_pool_write () =
let c = Atomic.fetch_and_add counter 1 mod 0x7F in
Db.Pool.set_tags ~__context ~self:pool ~value:[String.make 16 (Char.chr @@ c)]

let test_db_pool_read () = Db.Pool.get_tags ~__context ~self:pool

let benchmarks =
[
Test.make ~name:"local_session_hook" (Staged.stage local_session_hook)
Expand All @@ -73,6 +129,14 @@ let benchmarks =
; Test.make ~name:"Db.Pool.get_all_records" (Staged.stage get_all)
; Test.make ~name:"pool_t -> Rpc.t" (Staged.stage serialize)
; Test.make ~name:"Rpc.t -> pool_t" (Staged.stage deserialize)
; Test.make ~name:"Atomic.incr" (Staged.stage atomic_inc)
; Test.make ~name:"Mutex+incr" (Staged.stage inc_with_mutex)
; Test.make ~name:"Db_lock.with_lock uncontended"
(Staged.stage db_lock_uncontended)
; Test.make ~name:"rpc_of_event" (Staged.stage test_rpc_of_event)
; Test.make ~name:"Db.Pool.set_tags" (Staged.stage test_db_pool_write)
; Test.make ~name:"Db.Pool.get_tags" (Staged.stage test_db_pool_read)
; Test.make ~name:"Db.VM.set_NVRAM" (Staged.stage test_set_vm_nvram)
]

let () = Bechamel_simple_cli.cli benchmarks
3 changes: 2 additions & 1 deletion ocaml/tests/bench/dune
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@
log
xapi_database
xapi_datamodel
xapi_internal))
xapi_internal
xapi-stdext-threads))
34 changes: 6 additions & 28 deletions ocaml/xapi-types/event_types.ml
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,15 @@ let rpc_of_op = API.rpc_of_event_operation
let op_of_rpc = API.event_operation_of_rpc

type event = {
id: string
; ts: string
; ty: string
; op: op
; reference: string
; snapshot: Rpc.t option
id: string [@key "id"]
; ts: string [@key "timestamp"]
; ty: string [@key "class"]
; op: op [@key "operation"]
; reference: string [@key "ref"]
; snapshot: Rpc.t option [@key "snapshot"]
}
[@@deriving rpc]

let ev_struct_remap =
[
("id", "id")
; ("ts", "timestamp")
; ("ty", "class")
; ("op", "operation")
; ("reference", "ref")
; ("snapshot", "snapshot")
]

let remap map str =
match str with
| Rpc.Dict d ->
Rpc.Dict (List.map (fun (k, v) -> (List.assoc k map, v)) d)
| _ ->
str

let rpc_of_event ev = remap ev_struct_remap (rpc_of_event ev)

let event_of_rpc rpc =
event_of_rpc (remap (List.map (fun (k, v) -> (v, k)) ev_struct_remap) rpc)

type events = event list [@@deriving rpc]

type token = string [@@deriving rpc]
Expand Down
2 changes: 1 addition & 1 deletion ocaml/xapi/pool_db_backup.ml
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ let restore_from_xml __context dry_run (xml_filename : string) =
(Db_xml.From.file (Datamodel_schema.of_datamodel ()) xml_filename)
in
version_check db ;
let db_ref = Db_ref.in_memory (ref (ref db)) in
let db_ref = Db_ref.in_memory (Atomic.make db) in
let new_context = Context.make ~database:db_ref "restore_db" in
prepare_database_for_restore ~old_context:__context ~new_context ;
(* write manifest and unmarshalled db directly to db_temporary_restore_path, so its ready for us on restart *)
Expand Down
2 changes: 1 addition & 1 deletion ocaml/xapi/xapi_session.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1569,5 +1569,5 @@ let create_from_db_file ~__context ~filename =
Xapi_database.Db_xml.From.file (Datamodel_schema.of_datamodel ()) filename
|> Xapi_database.Db_upgrade.generic_database_upgrade
in
let db_ref = Some (Xapi_database.Db_ref.in_memory (ref (ref db))) in
let db_ref = Some (Xapi_database.Db_ref.in_memory (Atomic.make db)) in
create_readonly_session ~__context ~uname:"db-from-file" ~db_ref
2 changes: 1 addition & 1 deletion ocaml/xapi/xapi_vdi_helpers.ml
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ let database_ref_of_vdi ~__context ~vdi =
debug "Enabling redo_log with device reason [%s]" device ;
Redo_log.enable_block_existing log device ;
let db = Database.make (Datamodel_schema.of_datamodel ()) in
let db_ref = Xapi_database.Db_ref.in_memory (ref (ref db)) in
let db_ref = Xapi_database.Db_ref.in_memory (Atomic.make db) in
Redo_log_usage.read_from_redo_log log Xapi_globs.foreign_metadata_db db_ref ;
Redo_log.delete log ;
(* Upgrade database to the local schema. *)
Expand Down
Loading