Skip to content

Commit a98bfc5

Browse files
authored
CA-409431: Use an Observer forwarder for xapi-storage-script (#6488)
Currently, xapi-storage-script uses the presence/absence of a smapi observer config file to determine whether it should create traces. This only happens on startup which means smapiv3 traces will often not be created when they should be (e.g. if the smapi component is enabled after startup, tracing for smapiv3 will not be) This commit updates the Smapi Observer forwarder to use an RPC client to send messages to xapi-storage-script, updating it on any relevant changes to the Observer. This is done in a generic way so that any future components could use also listen to this queue to call its own Observer functions. Also move the Observer RPC declarations to a common file to reduce code duplication and make some debug logs more helpful.
2 parents a14bc90 + 4ab193a commit a98bfc5

File tree

9 files changed

+404
-197
lines changed

9 files changed

+404
-197
lines changed

ocaml/xapi-idl/cluster/cluster_interface.ml

Lines changed: 1 addition & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -384,80 +384,5 @@ module LocalAPI (R : RPC) = struct
384384
(debug_info_p @-> timeout_p @-> returning result_p err)
385385
end
386386

387-
module Observer = struct
388-
open TypeCombinators
389-
390-
let endpoints_p = Param.mk ~name:"endpoints" (list Types.string)
391-
392-
let bool_p = Param.mk ~name:"bool" Types.bool
393-
394-
let uuid_p = Param.mk ~name:"uuid" Types.string
395-
396-
let name_label_p = Param.mk ~name:"name_label" Types.string
397-
398-
let dict_p = Param.mk ~name:"dict" dict
399-
400-
let string_p = Param.mk ~name:"string" Types.string
401-
402-
let int_p = Param.mk ~name:"int" Types.int
403-
404-
let float_p = Param.mk ~name:"float" Types.float
405-
406-
let create =
407-
declare "Observer.create" []
408-
(debug_info_p
409-
@-> uuid_p
410-
@-> name_label_p
411-
@-> dict_p
412-
@-> endpoints_p
413-
@-> bool_p
414-
@-> returning unit_p err
415-
)
416-
417-
let destroy =
418-
declare "Observer.destroy" []
419-
(debug_info_p @-> uuid_p @-> returning unit_p err)
420-
421-
let set_enabled =
422-
declare "Observer.set_enabled" []
423-
(debug_info_p @-> uuid_p @-> bool_p @-> returning unit_p err)
424-
425-
let set_attributes =
426-
declare "Observer.set_attributes" []
427-
(debug_info_p @-> uuid_p @-> dict_p @-> returning unit_p err)
428-
429-
let set_endpoints =
430-
declare "Observer.set_endpoints" []
431-
(debug_info_p @-> uuid_p @-> endpoints_p @-> returning unit_p err)
432-
433-
let init = declare "Observer.init" [] (debug_info_p @-> returning unit_p err)
434-
435-
let set_trace_log_dir =
436-
declare "Observer.set_trace_log_dir" []
437-
(debug_info_p @-> string_p @-> returning unit_p err)
438-
439-
let set_export_interval =
440-
declare "Observer.set_export_interval" []
441-
(debug_info_p @-> float_p @-> returning unit_p err)
442-
443-
let set_host_id =
444-
declare "Observer.set_host_id" []
445-
(debug_info_p @-> string_p @-> returning unit_p err)
446-
447-
let set_max_traces =
448-
declare "Observer.set_max_traces" []
449-
(debug_info_p @-> int_p @-> returning unit_p err)
450-
451-
let set_max_spans =
452-
declare "Observer.set_max_spans" []
453-
(debug_info_p @-> int_p @-> returning unit_p err)
454-
455-
let set_max_file_size =
456-
declare "Observer.set_max_file_size" []
457-
(debug_info_p @-> int_p @-> returning unit_p err)
458-
459-
let set_compress_tracing_files =
460-
declare "Observer.set_compress_tracing_files" []
461-
(debug_info_p @-> bool_p @-> returning unit_p err)
462-
end
387+
module Observer = Observer_helpers.ObserverAPI (R)
463388
end

ocaml/xapi-idl/lib/dune

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
(wrapped false)
4141
(preprocess
4242
(per_module
43-
((pps ppx_deriving_rpc) Xcp_channel Xcp_channel_protocol TypeCombinators)
43+
((pps ppx_deriving_rpc) Xcp_channel Xcp_channel_protocol TypeCombinators Observer_helpers Observer_skeleton)
4444
((pps ppx_sexp_conv ppx_deriving_rpc) Xcp_pci))))
4545

4646
(library
Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
(*
2+
* Copyright (c) Cloud Software Group
3+
*
4+
* This program is free software; you can redistribute it and/or modify
5+
* it under the terms of the GNU Lesser General Public License as published
6+
* by the Free Software Foundation; version 2.1 only. with the special
7+
* exception on linking described in file LICENSE.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Lesser General Public License for more details.
13+
*)
14+
15+
open Rpc
16+
open Idl
17+
18+
module D = Debug.Make (struct let name = "observer_interface" end)
19+
20+
open D
21+
22+
let service_name = "observer"
23+
24+
let queue_name = Xcp_service.common_prefix ^ service_name
25+
26+
let default_sockets_dir = "/var/lib/xcp"
27+
28+
let default_path = Filename.concat default_sockets_dir service_name
29+
30+
let uri () = "file:" ^ default_path
31+
32+
module Errors = struct
33+
type error =
34+
| Internal_error of string
35+
| Unimplemented of string
36+
| Unknown_error
37+
[@@default Unknown_error] [@@deriving rpcty]
38+
end
39+
40+
exception Observer_error of Errors.error
41+
42+
let err =
43+
let open Error in
44+
{
45+
def= Errors.error
46+
; raiser=
47+
(fun e ->
48+
let exn = Observer_error e in
49+
error "%s (%s)" (Printexc.to_string exn) __LOC__ ;
50+
raise exn
51+
)
52+
; matcher=
53+
(function
54+
| Observer_error e as exn ->
55+
error "%s (%s)" (Printexc.to_string exn) __LOC__ ;
56+
Some e
57+
| exn ->
58+
error "%s (%s)" (Printexc.to_string exn) __LOC__ ;
59+
Some (Internal_error (Printexc.to_string exn))
60+
)
61+
}
62+
63+
(** An uninterpreted string associated with the operation. *)
64+
type debug_info = string [@@deriving rpcty]
65+
66+
module ObserverAPI (R : RPC) = struct
67+
open R
68+
open TypeCombinators
69+
70+
let description =
71+
let open Interface in
72+
{
73+
name= "Observer"
74+
; namespace= None
75+
; description=
76+
[
77+
"This interface is used to create, update and destroy Observers to \
78+
control the use of tracing in different xapi components"
79+
]
80+
; version= (1, 0, 0)
81+
}
82+
83+
let implementation = implement description
84+
85+
let dbg_p = Param.mk ~name:"dbg" Types.string
86+
87+
let unit_p = Param.mk ~name:"unit" Types.unit
88+
89+
let endpoints_p = Param.mk ~name:"endpoints" (list Types.string)
90+
91+
let bool_p = Param.mk ~name:"bool" Types.bool
92+
93+
let uuid_p = Param.mk ~name:"uuid" Types.string
94+
95+
let name_label_p = Param.mk ~name:"name_label" Types.string
96+
97+
let dict_p = Param.mk ~name:"dict" dict
98+
99+
let string_p = Param.mk ~name:"string" Types.string
100+
101+
let int_p = Param.mk ~name:"int" Types.int
102+
103+
let float_p = Param.mk ~name:"float" Types.float
104+
105+
let create =
106+
declare "Observer.create" []
107+
(dbg_p
108+
@-> uuid_p
109+
@-> name_label_p
110+
@-> dict_p
111+
@-> endpoints_p
112+
@-> bool_p
113+
@-> returning unit_p err
114+
)
115+
116+
let destroy =
117+
declare "Observer.destroy" [] (dbg_p @-> uuid_p @-> returning unit_p err)
118+
119+
let set_enabled =
120+
declare "Observer.set_enabled" []
121+
(dbg_p @-> uuid_p @-> bool_p @-> returning unit_p err)
122+
123+
let set_attributes =
124+
declare "Observer.set_attributes" []
125+
(dbg_p @-> uuid_p @-> dict_p @-> returning unit_p err)
126+
127+
let set_endpoints =
128+
declare "Observer.set_endpoints" []
129+
(dbg_p @-> uuid_p @-> endpoints_p @-> returning unit_p err)
130+
131+
let init = declare "Observer.init" [] (dbg_p @-> returning unit_p err)
132+
133+
let set_trace_log_dir =
134+
declare "Observer.set_trace_log_dir" []
135+
(dbg_p @-> string_p @-> returning unit_p err)
136+
137+
let set_export_interval =
138+
declare "Observer.set_export_interval" []
139+
(dbg_p @-> float_p @-> returning unit_p err)
140+
141+
let set_max_spans =
142+
declare "Observer.set_max_spans" []
143+
(dbg_p @-> int_p @-> returning unit_p err)
144+
145+
let set_max_traces =
146+
declare "Observer.set_max_traces" []
147+
(dbg_p @-> int_p @-> returning unit_p err)
148+
149+
let set_max_file_size =
150+
declare "Observer.set_max_file_size" []
151+
(dbg_p @-> int_p @-> returning unit_p err)
152+
153+
let set_host_id =
154+
declare "Observer.set_host_id" []
155+
(dbg_p @-> string_p @-> returning unit_p err)
156+
157+
let set_compress_tracing_files =
158+
declare "Observer.set_compress_tracing_files" []
159+
(dbg_p @-> bool_p @-> returning unit_p err)
160+
end
161+
162+
module type Server_impl = sig
163+
type context = unit
164+
165+
val create :
166+
context
167+
-> dbg:debug_info
168+
-> uuid:string
169+
-> name_label:string
170+
-> attributes:(string * string) list
171+
-> endpoints:string list
172+
-> enabled:bool
173+
-> unit
174+
175+
val destroy : context -> dbg:debug_info -> uuid:string -> unit
176+
177+
val set_enabled :
178+
context -> dbg:debug_info -> uuid:string -> enabled:bool -> unit
179+
180+
val set_attributes :
181+
context
182+
-> dbg:debug_info
183+
-> uuid:string
184+
-> attributes:(string * string) list
185+
-> unit
186+
187+
val set_endpoints :
188+
context -> dbg:debug_info -> uuid:string -> endpoints:string list -> unit
189+
190+
val init : context -> dbg:debug_info -> unit
191+
192+
val set_trace_log_dir : context -> dbg:debug_info -> dir:string -> unit
193+
194+
val set_export_interval : context -> dbg:debug_info -> interval:float -> unit
195+
196+
val set_max_spans : context -> dbg:debug_info -> spans:int -> unit
197+
198+
val set_max_traces : context -> dbg:debug_info -> traces:int -> unit
199+
200+
val set_max_file_size : context -> dbg:debug_info -> file_size:int -> unit
201+
202+
val set_host_id : context -> dbg:debug_info -> host_id:string -> unit
203+
204+
val set_compress_tracing_files :
205+
context -> dbg:debug_info -> enabled:bool -> unit
206+
end
207+
208+
module Server (Impl : Server_impl) () = struct
209+
module S = ObserverAPI (Idl.Exn.GenServer ())
210+
211+
let _ =
212+
S.create (fun dbg uuid name_label attributes endpoints enabled ->
213+
Impl.create () ~dbg ~uuid ~name_label ~attributes ~endpoints ~enabled
214+
) ;
215+
S.destroy (fun dbg uuid -> Impl.destroy () ~dbg ~uuid) ;
216+
S.set_enabled (fun dbg uuid enabled ->
217+
Impl.set_enabled () ~dbg ~uuid ~enabled
218+
) ;
219+
S.set_attributes (fun dbg uuid attributes ->
220+
Impl.set_attributes () ~dbg ~uuid ~attributes
221+
) ;
222+
S.set_endpoints (fun dbg uuid endpoints ->
223+
Impl.set_endpoints () ~dbg ~uuid ~endpoints
224+
) ;
225+
S.init (fun dbg -> Impl.init () ~dbg) ;
226+
S.set_trace_log_dir (fun dbg dir -> Impl.set_trace_log_dir () ~dbg ~dir) ;
227+
S.set_export_interval (fun dbg interval ->
228+
Impl.set_export_interval () ~dbg ~interval
229+
) ;
230+
S.set_max_spans (fun dbg spans -> Impl.set_max_spans () ~dbg ~spans) ;
231+
S.set_max_traces (fun dbg traces -> Impl.set_max_traces () ~dbg ~traces) ;
232+
S.set_max_file_size (fun dbg file_size ->
233+
Impl.set_max_file_size () ~dbg ~file_size
234+
) ;
235+
S.set_host_id (fun dbg host_id -> Impl.set_host_id () ~dbg ~host_id) ;
236+
S.set_compress_tracing_files (fun dbg enabled ->
237+
Impl.set_compress_tracing_files () ~dbg ~enabled
238+
)
239+
240+
(* Bind all *)
241+
let process call = Idl.Exn.server S.implementation call
242+
end
243+
244+
let rec retry_econnrefused f =
245+
try f () with
246+
| Unix.Unix_error (Unix.ECONNREFUSED, "connect", _) ->
247+
(* debug "Caught ECONNREFUSED; retrying in 5s"; *)
248+
Thread.delay 5. ; retry_econnrefused f
249+
| e ->
250+
(* error "Caught %s: does the observer service need restarting?"
251+
(Printexc.to_string e); *)
252+
raise e
253+
254+
module Client = ObserverAPI (Idl.Exn.GenClient (struct
255+
open Xcp_client
256+
257+
let rpc call =
258+
retry_econnrefused (fun () ->
259+
if !use_switch then
260+
json_switch_rpc queue_name call
261+
else
262+
xml_http_rpc ~srcstr:(get_user_agent ()) ~dststr:queue_name uri call
263+
)
264+
end))

0 commit comments

Comments
 (0)