diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.ml index 893a7e4d9bc..8f29e8f1082 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.ml @@ -338,7 +338,8 @@ module CBuf = struct in let read = Unix.read fd x.buffer next len in if read = 0 then x.r_closed <- true ; - x.len <- x.len + read + x.len <- x.len + read ; + (x.buffer, read, next) end exception Process_still_alive @@ -381,11 +382,21 @@ let with_polly f = let finally () = Polly.close polly in Xapi_stdext_pervasives.Pervasiveext.finally (fun () -> f polly) finally -let proxy (a : Unix.file_descr) (b : Unix.file_descr) = +exception Close_proxy + +let proxy ?should_close ?(poll_timeout = -1) (a : Unix.file_descr) + (b : Unix.file_descr) = let size = 64 * 1024 in (* [a'] is read from [a] and will be written to [b] *) (* [b'] is read from [b] and will be written to [a] *) let a' = CBuf.empty size and b' = CBuf.empty size in + + let close_proxy () = + Unix.shutdown a Unix.SHUTDOWN_ALL ; + Unix.shutdown b Unix.SHUTDOWN_ALL ; + raise Close_proxy + in + Unix.set_nonblock a ; Unix.set_nonblock b ; with_polly @@ fun polly -> @@ -413,13 +424,22 @@ let proxy (a : Unix.file_descr) (b : Unix.file_descr) = Polly.upd polly a a_events ; if Polly.Events.(b_events <> empty) then Polly.upd polly b b_events ; - Polly.wait_fold polly 4 (-1) () (fun _polly fd events () -> + Polly.wait_fold polly 4 poll_timeout (Bytes.empty, 0, 0) + (fun _polly fd events acc -> (* Do the writing before the reading *) if Polly.Events.(test out events) then if a = fd then CBuf.write b' a else CBuf.write a' b ; if Polly.Events.(test inp events) then - if a = fd then CBuf.read a' a else CBuf.read b' b - ) ; + if a = fd then ( + ignore (CBuf.read a' a) ; + acc + ) else + CBuf.read b' b + else + acc + ) + |> fun data -> + Option.iter (fun cb -> if cb data then close_proxy ()) should_close ; (* If there's nothing else to read or write then signal the other end *) List.iter (fun (buf, fd) -> diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.mli b/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.mli index 3db652bd2a3..19a5a32bdfb 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.mli +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-unix/unixext.mli @@ -126,7 +126,12 @@ exception Process_still_alive val kill_and_wait : ?signal:int -> ?timeout:float -> int -> unit -val proxy : Unix.file_descr -> Unix.file_descr -> unit +val proxy : + ?should_close:(bytes * int * int -> bool) + -> ?poll_timeout:int + -> Unix.file_descr + -> Unix.file_descr + -> unit val really_read : Unix.file_descr -> bytes -> int -> int -> unit diff --git a/ocaml/xapi/console.ml b/ocaml/xapi/console.ml index dcf95ef6106..08f4c863e87 100644 --- a/ocaml/xapi/console.ml +++ b/ocaml/xapi/console.ml @@ -126,7 +126,87 @@ let address_of_console __context console : address option = ) ; address_option -let real_proxy' vnc_port s = +module Console_idle_monitor = struct + let get_idle_timeout_config ~__context ~vm = + try + let idle_timeout = + if Db.VM.get_is_control_domain ~__context ~self:vm then + let host = Helpers.get_localhost ~__context in + Db.Host.get_console_idle_timeout ~__context ~self:host + else + let pool = Helpers.get_pool ~__context in + Db.Pool.get_vm_console_idle_timeout ~__context ~self:pool + in + if idle_timeout > 0L then Some (Int64.to_float idle_timeout) else None + with _ -> None + + let is_active messages = + List.exists + (function + | Rfb_client_msgtype_parser.KeyEvent + | Rfb_client_msgtype_parser.PointerEvent + | Rfb_client_msgtype_parser.QEMUClientMessage -> + true + | _ -> + false + ) + messages + + let timed_out ~idle_timeout_seconds ~last_activity = + let elapsed = Mtime_clock.count last_activity in + Mtime.Span.to_float_ns elapsed /. 1e9 > idle_timeout_seconds + + (* Create an idle timeout callback for the console, + if idle timeout, then close the proxy *) + let create_idle_timeout_callback ~__context ~vm = + match get_idle_timeout_config ~__context ~vm with + | Some idle_timeout_seconds -> ( + let module P = Rfb_client_msgtype_parser in + let state = ref (Some (P.create (), Mtime_clock.counter ())) in + (* Return true for idle timeout to close the proxy, + otherwise return false to keep the proxy open *) + fun (buf, read_len, offset) -> + match !state with + | None -> + false + | Some (rfb_parser, last_activity) -> + let ok msgs = + if is_active msgs then ( + state := Some (rfb_parser, Mtime_clock.counter ()) ; + false + ) else + let timeout_result = + timed_out ~idle_timeout_seconds ~last_activity + in + if timeout_result then + debug + "Console connection idle timeout exceeded for VM %s \ + (timeout: %.1fs)" + (Ref.string_of vm) idle_timeout_seconds ; + timeout_result + in + let error msg = + debug "RFB parse error: %s" msg ; + state := None ; + false + in + Bytes.sub_string buf offset read_len + |> rfb_parser + |> Result.fold ~ok ~error + ) + | None -> + Fun.const false +end + +let get_poll_timeout = + let poll_period_timeout = !Xapi_globs.proxy_poll_period_timeout in + if poll_period_timeout < 0. then + -1 + else + Float.to_int (poll_period_timeout *. 1000.) +(* convert to milliseconds *) + +let real_proxy' ~__context ~vm vnc_port s = try Http_svr.headers s (Http.http_200_ok ()) ; let vnc_sock = @@ -141,7 +221,12 @@ let real_proxy' vnc_port s = debug "Connected; running proxy (between fds: %d and %d)" (Unixext.int_of_file_descr vnc_sock) (Unixext.int_of_file_descr s') ; - Unixext.proxy vnc_sock s' ; + + let poll_timeout = get_poll_timeout in + let should_close = + Console_idle_monitor.create_idle_timeout_callback ~__context ~vm + in + Unixext.proxy ~should_close ~poll_timeout vnc_sock s' ; debug "Proxy exited" with exn -> debug "error: %s" (ExnHelper.string_of_exn exn) @@ -153,7 +238,7 @@ let real_proxy __context vm _ _ vnc_port s = in if Connection_limit.try_add vm_id is_limit_enabled then finally (* Ensure we drop the vm connection count if exceptions occur *) - (fun () -> real_proxy' vnc_port s) + (fun () -> real_proxy' ~__context ~vm vnc_port s) (fun () -> Connection_limit.drop vm_id) else Http_svr.headers s (Http.http_503_service_unavailable ()) diff --git a/ocaml/xapi/xapi_globs.ml b/ocaml/xapi/xapi_globs.ml index 3688478dce8..f84d72149da 100644 --- a/ocaml/xapi/xapi_globs.ml +++ b/ocaml/xapi/xapi_globs.ml @@ -450,6 +450,11 @@ let xha_timeout = "timeout" let message_limit = ref 10000 +(* The timeout (in seconds) for event polling in the proxy loop. + If set to a positive value, the poll will wake up periodically, + which is useful for implementing features like idle timeout or periodic inspection of proxy buffers. *) +let proxy_poll_period_timeout = ref 5.0 + let xapi_message_script = ref "mail-alarm" (* Emit a warning if more than this amount of clock skew detected *) @@ -1783,6 +1788,14 @@ let other_options = , (fun () -> string_of_float !vm_sysprep_wait) , "Time in seconds to wait for VM to recognise inserted CD" ) + ; ( "proxy_poll_period_timeout" + , Arg.Set_float proxy_poll_period_timeout + , (fun () -> string_of_float !proxy_poll_period_timeout) + , "Timeout (in seconds) for event polling in network proxy loops. When \ + positive, the proxy will wake up periodically to check tasks like vnc \ + idle timeouts or perform other maintenance tasks. Set to -1 to wait \ + indefinitely for network events without periodic wake-ups." + ) ] (* The options can be set with the variable xapiflags in /etc/sysconfig/xapi.