diff --git a/ocaml/libs/tracing/tracing_export.ml b/ocaml/libs/tracing/tracing_export.ml index 5844d389e1c..6b4371350cc 100644 --- a/ocaml/libs/tracing/tracing_export.ml +++ b/ocaml/libs/tracing/tracing_export.ml @@ -306,6 +306,8 @@ module Destination = struct (* Note this signal will flush the spans and terminate the exporter thread *) let signal () = Delay.signal delay + let wait_exit = Delay.make () + let create_exporter () = enable_span_garbage_collector () ; Thread.create @@ -319,7 +321,8 @@ module Destination = struct signaled := true ) ; flush_spans () - done + done ; + Delay.signal wait_exit ) () @@ -339,6 +342,12 @@ module Destination = struct ) end -let flush_and_exit = Destination.signal +let flush_and_exit ~max_wait () = + D.debug "flush_and_exit: signaling thread to export now" ; + Destination.signal () ; + if Delay.wait Destination.wait_exit max_wait then + D.info "flush_and_exit: timeout on span export" + else + D.debug "flush_and_exit: span export finished" let main = Destination.main diff --git a/ocaml/libs/tracing/tracing_export.mli b/ocaml/libs/tracing/tracing_export.mli index 3f8ca750026..f322bd2404c 100644 --- a/ocaml/libs/tracing/tracing_export.mli +++ b/ocaml/libs/tracing/tracing_export.mli @@ -85,9 +85,9 @@ module Destination : sig end end -val flush_and_exit : unit -> unit -(** [flush_and_exit ()] sends a signal to flush the finish spans and terminate - the exporter thread. +val flush_and_exit : max_wait:float -> unit -> unit +(** [flush_and_exit ~max_wait ()] sends a signal to flush the finish spans and terminate + the exporter thread. It waits at most [max_wait] seconds. *) val main : unit -> Thread.t diff --git a/ocaml/tests/bench/bench_tracing.ml b/ocaml/tests/bench/bench_tracing.ml index ff8d872ee64..8db30cfc220 100644 --- a/ocaml/tests/bench/bench_tracing.ml +++ b/ocaml/tests/bench/bench_tracing.ml @@ -25,7 +25,7 @@ let export_thread = (* need to ensure this isn't running outside the benchmarked section, or bechamel might fail with 'Failed to stabilize GC' *) - let after _ = Tracing_export.flush_and_exit () in + let after _ = Tracing_export.flush_and_exit ~max_wait:0. () in Bechamel_simple_cli.thread_workload ~before:Tracing_export.main ~after ~run:ignore @@ -52,7 +52,7 @@ let allocate () = let free t = Tracing.TracerProvider.destroy ~uuid ; - Tracing_export.flush_and_exit () ; + Tracing_export.flush_and_exit ~max_wait:0. () ; Thread.join t let test_tracing_on ?(overflow = false) ~name f = diff --git a/ocaml/xapi/xapi_fuse.ml b/ocaml/xapi/xapi_fuse.ml index 48d0737a613..8c2b5b56d3d 100644 --- a/ocaml/xapi/xapi_fuse.ml +++ b/ocaml/xapi/xapi_fuse.ml @@ -52,6 +52,8 @@ let light_fuse_and_run ?(fuse_length = !Constants.fuse_time) () = in let new_fuse_length = max 5. (fuse_length -. delay_so_far) in debug "light_fuse_and_run: current RRDs have been saved" ; + ignore + (Thread.create Tracing_export.(flush_and_exit ~max_wait:new_fuse_length) ()) ; ignore (Thread.create (fun () -> diff --git a/ocaml/xs-trace/dune b/ocaml/xs-trace/dune index e34fc7e5575..4a19b8c888a 100644 --- a/ocaml/xs-trace/dune +++ b/ocaml/xs-trace/dune @@ -1,23 +1,18 @@ (executable - (modes exe) - (name xs_trace) - (public_name xs-trace) - (package xapi-tools) - (libraries - uri - tracing - cmdliner - tracing_export - xapi-stdext-unix - zstd - ) -) + (modes exe) + (name xs_trace) + (public_name xs-trace) + (package xapi-tools) + (libraries uri tracing cmdliner tracing_export yojson xapi-stdext-unix zstd)) (rule - (targets xs-trace.1) - (deps (:exe xs_trace.exe)) - (action (with-stdout-to %{targets} (run %{exe} --help=groff))) -) + (targets xs-trace.1) + (deps + (:exe xs_trace.exe)) + (action + (with-stdout-to + %{targets} + (run %{exe} --help=groff)))) ; not expected by the specfile ;(install diff --git a/ocaml/xs-trace/xs_trace.ml b/ocaml/xs-trace/xs_trace.ml index 6360649fb20..a5f0c8becef 100644 --- a/ocaml/xs-trace/xs_trace.ml +++ b/ocaml/xs-trace/xs_trace.ml @@ -25,10 +25,7 @@ module Exporter = struct | _ -> () - (** Export traces from file system to a remote endpoint. *) - let export erase src dst = - let dst = Uri.of_string dst in - let submit_json = submit_json dst in + let iter_src src f = let rec export_file = function | path when Sys.is_directory path -> (* Recursively export trace files. *) @@ -38,7 +35,7 @@ module Exporter = struct (* Decompress compressed trace file and submit each line iteratively *) let args = [|"zstdcat"; path|] in let ic = Unix.open_process_args_in args.(0) args in - Unixext.lines_iter submit_json ic ; + Unixext.lines_iter f ic ; match Unix.close_process_in ic with | Unix.WEXITED 0 -> () @@ -47,15 +44,27 @@ module Exporter = struct ) | path when Filename.check_suffix path ".ndjson" -> (* Submit traces line by line. *) - Unixext.readfile_line submit_json path + Unixext.readfile_line f path | path -> (* Assume any other extension is a valid JSON file. *) let json = Unixext.string_of_file path in - submit_json json + f json in - export_file src ; + export_file src + + (** Export traces from file system to a remote endpoint. *) + let export erase src dst = + let dst = Uri.of_string dst in + let submit_json = submit_json dst in + iter_src src submit_json ; if erase then Unixext.rm_rec ~rm_top:true src + + let pretty_print src = + iter_src src @@ fun line -> + line + |> Yojson.Safe.from_string + |> Yojson.Safe.pretty_to_channel ~std:true stdout end module Cli = struct @@ -83,6 +92,11 @@ module Cli = struct let doc = "copy a trace to an endpoint and erase it afterwards" in Cmd.(v (info "mv" ~doc) term) + let pp_cmd = + let term = Term.(const Exporter.pretty_print $ src) in + let doc = "Pretty print NDJSON traces" in + Cmd.(v (info "pp" ~doc) term) + let xs_trace_cmd = let man = [ @@ -94,7 +108,7 @@ module Cli = struct let doc = "utility for working with local trace files" in Cmd.info "xs-trace" ~doc ~version:"0.1" ~man in - Cmd.group desc [cp_cmd; mv_cmd] + Cmd.group desc [cp_cmd; mv_cmd; pp_cmd] let main () = Cmd.eval xs_trace_cmd end