Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.ml
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,8 @@ module CBuf = struct
in
let read = Unix.read fd x.buffer next len in
if read = 0 then x.r_closed <- true ;
x.len <- x.len + read
x.len <- x.len + read ;
(x.buffer, read, next)
end

exception Process_still_alive
Expand Down Expand Up @@ -381,11 +382,21 @@ let with_polly f =
let finally () = Polly.close polly in
Xapi_stdext_pervasives.Pervasiveext.finally (fun () -> f polly) finally

let proxy (a : Unix.file_descr) (b : Unix.file_descr) =
exception Close_proxy

let proxy ?should_close ?(poll_timeout = -1) (a : Unix.file_descr)
(b : Unix.file_descr) =
let size = 64 * 1024 in
(* [a'] is read from [a] and will be written to [b] *)
(* [b'] is read from [b] and will be written to [a] *)
let a' = CBuf.empty size and b' = CBuf.empty size in

let close_proxy () =
Unix.shutdown a Unix.SHUTDOWN_ALL ;
Unix.shutdown b Unix.SHUTDOWN_ALL ;
raise Close_proxy
in

Unix.set_nonblock a ;
Unix.set_nonblock b ;
with_polly @@ fun polly ->
Expand Down Expand Up @@ -413,13 +424,22 @@ let proxy (a : Unix.file_descr) (b : Unix.file_descr) =
Polly.upd polly a a_events ;
if Polly.Events.(b_events <> empty) then
Polly.upd polly b b_events ;
Polly.wait_fold polly 4 (-1) () (fun _polly fd events () ->
Polly.wait_fold polly 4 poll_timeout (Bytes.empty, 0, 0)
(fun _polly fd events acc ->
(* Do the writing before the reading *)
if Polly.Events.(test out events) then
if a = fd then CBuf.write b' a else CBuf.write a' b ;
if Polly.Events.(test inp events) then
if a = fd then CBuf.read a' a else CBuf.read b' b
) ;
if a = fd then (
ignore (CBuf.read a' a) ;
acc
) else
CBuf.read b' b
else
acc
)
|> fun data ->
Option.iter (fun cb -> if cb data then close_proxy ()) should_close ;
(* If there's nothing else to read or write then signal the other end *)
List.iter
(fun (buf, fd) ->
Expand Down
7 changes: 6 additions & 1 deletion ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.mli
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,12 @@ exception Process_still_alive

val kill_and_wait : ?signal:int -> ?timeout:float -> int -> unit

val proxy : Unix.file_descr -> Unix.file_descr -> unit
val proxy :
?should_close:(bytes * int * int -> bool)
-> ?poll_timeout:int
-> Unix.file_descr
-> Unix.file_descr
-> unit

val really_read : Unix.file_descr -> bytes -> int -> int -> unit

Expand Down
91 changes: 88 additions & 3 deletions ocaml/xapi/console.ml
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,87 @@ let address_of_console __context console : address option =
) ;
address_option

let real_proxy' vnc_port s =
module Console_idle_monitor = struct
let get_idle_timeout_config ~__context ~vm =
try
let idle_timeout =
if Db.VM.get_is_control_domain ~__context ~self:vm then
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this triggers several DB calls every 5 seconds for every console. If the VM is running on a pool member, then that means several calls to the coordinator. I understand that we want to be responsive to timeout changes, with we should reduce this to at most one DB call.
Several of these values are static for given VM and can be captured in the closure before the line with fun (buf, read_len, offset) -> below.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Good point.

let host = Helpers.get_localhost ~__context in
Db.Host.get_console_idle_timeout ~__context ~self:host
else
let pool = Helpers.get_pool ~__context in
Db.Pool.get_vm_console_idle_timeout ~__context ~self:pool
in
if idle_timeout > 0L then Some (Int64.to_float idle_timeout) else None
with _ -> None

let is_active messages =
List.exists
(function
| Rfb_client_msgtype_parser.KeyEvent
| Rfb_client_msgtype_parser.PointerEvent
| Rfb_client_msgtype_parser.QEMUClientMessage ->
true
| _ ->
false
)
messages

let timed_out ~idle_timeout_seconds ~last_activity =
let elapsed = Mtime_clock.count last_activity in
Mtime.Span.to_float_ns elapsed /. 1e9 > idle_timeout_seconds

(* Create an idle timeout callback for the console,
if idle timeout, then close the proxy *)
let create_idle_timeout_callback ~__context ~vm =
match get_idle_timeout_config ~__context ~vm with
| Some idle_timeout_seconds -> (
let module P = Rfb_client_msgtype_parser in
let state = ref (Some (P.create (), Mtime_clock.counter ())) in
(* Return true for idle timeout to close the proxy,
otherwise return false to keep the proxy open *)
fun (buf, read_len, offset) ->
match !state with
| None ->
false
| Some (rfb_parser, last_activity) ->
let ok msgs =
if is_active msgs then (
state := Some (rfb_parser, Mtime_clock.counter ()) ;
false
) else
let timeout_result =
timed_out ~idle_timeout_seconds ~last_activity
in
if timeout_result then
debug
"Console connection idle timeout exceeded for VM %s \
(timeout: %.1fs)"
(Ref.string_of vm) idle_timeout_seconds ;
timeout_result
in
let error msg =
debug "RFB parse error: %s" msg ;
state := None ;
false
in
Bytes.sub_string buf offset read_len
|> rfb_parser
|> Result.fold ~ok ~error
)
| None ->
Fun.const false
end

let get_poll_timeout =
let poll_period_timeout = !Xapi_globs.proxy_poll_period_timeout in
if poll_period_timeout < 0. then
-1
else
Float.to_int (poll_period_timeout *. 1000.)
(* convert to milliseconds *)

let real_proxy' ~__context ~vm vnc_port s =
try
Http_svr.headers s (Http.http_200_ok ()) ;
let vnc_sock =
Expand All @@ -141,7 +221,12 @@ let real_proxy' vnc_port s =
debug "Connected; running proxy (between fds: %d and %d)"
(Unixext.int_of_file_descr vnc_sock)
(Unixext.int_of_file_descr s') ;
Unixext.proxy vnc_sock s' ;

let poll_timeout = get_poll_timeout in
let should_close =
Console_idle_monitor.create_idle_timeout_callback ~__context ~vm
in
Unixext.proxy ~should_close ~poll_timeout vnc_sock s' ;
debug "Proxy exited"
with exn -> debug "error: %s" (ExnHelper.string_of_exn exn)

Expand All @@ -153,7 +238,7 @@ let real_proxy __context vm _ _ vnc_port s =
in
if Connection_limit.try_add vm_id is_limit_enabled then
finally (* Ensure we drop the vm connection count if exceptions occur *)
(fun () -> real_proxy' vnc_port s)
(fun () -> real_proxy' ~__context ~vm vnc_port s)
(fun () -> Connection_limit.drop vm_id)
else
Http_svr.headers s (Http.http_503_service_unavailable ())
Expand Down
13 changes: 13 additions & 0 deletions ocaml/xapi/xapi_globs.ml
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,11 @@ let xha_timeout = "timeout"

let message_limit = ref 10000

(* The timeout (in seconds) for event polling in the proxy loop.
If set to a positive value, the poll will wake up periodically,
which is useful for implementing features like idle timeout or periodic inspection of proxy buffers. *)
let proxy_poll_period_timeout = ref 5.0

let xapi_message_script = ref "mail-alarm"

(* Emit a warning if more than this amount of clock skew detected *)
Expand Down Expand Up @@ -1783,6 +1788,14 @@ let other_options =
, (fun () -> string_of_float !vm_sysprep_wait)
, "Time in seconds to wait for VM to recognise inserted CD"
)
; ( "proxy_poll_period_timeout"
, Arg.Set_float proxy_poll_period_timeout
, (fun () -> string_of_float !proxy_poll_period_timeout)
, "Timeout (in seconds) for event polling in network proxy loops. When \
positive, the proxy will wake up periodically to check tasks like vnc \
idle timeouts or perform other maintenance tasks. Set to -1 to wait \
indefinitely for network events without periodic wake-ups."
)
]

(* The options can be set with the variable xapiflags in /etc/sysconfig/xapi.
Expand Down
Loading