Skip to content

Commit 817b434

Browse files
authored
Parallelize host evacuation even more (#6514)
With bab83d9, host evacuation was parallelized by grouping VMs into batches and starting a new batch once the previous one had finished. This means that a single slow VM can potentially slow down the whole evacuation. Add a new `Tasks.wait_for_all_with_callback` function that invokes a callback every time one of the tasks is deemed non-pending. This allows its users to: 1) track the progress of tasks within the submitted batch; 2) schedule new tasks to replace the completed ones. Use the new `Tasks.wait_for_all_with_callback` in `xapi_host` to schedule a new migration as soon as any of the previous ones has finished, thus maintaining a constant flow of `n` migrations. Additionally, expose the `evacuate-batch-size` parameter in the CLI (as `batch-size`); this was missed when the parameter was originally added, so the CLI always set it to `0` (pick the default). === Manually tested multiple times; confirmed not to break anything and to actually maintain a constant flow of migrations. This should greatly speed up host evacuations when there is a combination of bigger and smaller VMs (in terms of memory/disk, or VMs with some other reason for slow migration) on the host.
2 parents 6727c4e + 40e9fa1 commit 817b434

File tree

5 files changed

+128
-36
lines changed

5 files changed

+128
-36
lines changed

ocaml/xapi-cli-server/cli_frontend.ml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2881,7 +2881,7 @@ let rec cmdtable_data : (string * cmd_spec) list =
28812881
; ( "host-evacuate"
28822882
, {
28832883
reqd= []
2884-
; optn= ["network-uuid"]
2884+
; optn= ["network-uuid"; "batch-size"]
28852885
; help= "Migrate all VMs off a host."
28862886
; implementation= No_fd Cli_operations.host_evacuate
28872887
; flags= [Host_selectors]

ocaml/xapi-cli-server/cli_operations.ml

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5381,13 +5381,21 @@ let host_evacuate _printer rpc session_id params =
53815381
Client.Network.get_by_uuid ~rpc ~session_id ~uuid
53825382
)
53835383
in
5384+
let evacuate_batch_size =
5385+
match List.assoc_opt "batch-size" params with
5386+
| Some x ->
5387+
Scanf.sscanf x "%Lu%!" Fun.id
5388+
| None ->
5389+
0L
5390+
in
53845391
ignore
53855392
(do_host_op rpc session_id ~multiple:false
53865393
(fun _ host ->
53875394
Client.Host.evacuate ~rpc ~session_id ~host:(host.getref ()) ~network
5388-
~evacuate_batch_size:0L
5395+
~evacuate_batch_size
53895396
)
5390-
params ["network-uuid"]
5397+
params
5398+
["network-uuid"; "batch-size"]
53915399
)
53925400

53935401
let host_get_vms_which_prevent_evacuation printer rpc session_id params =

ocaml/xapi-client/tasks.ml

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ module TaskSet = Set.Make (struct
2323
end)
2424

2525
(* Return once none of the tasks have a `pending status. *)
26-
let wait_for_all_inner ~rpc ~session_id ~all_timeout ~tasks =
26+
let wait_for_all_inner ~rpc ~session_id ~all_timeout ~tasks ~callback =
2727
let classes =
2828
List.map (fun task -> Printf.sprintf "task/%s" (Ref.string_of task)) tasks
2929
in
@@ -36,7 +36,12 @@ let wait_for_all_inner ~rpc ~session_id ~all_timeout ~tasks =
3636
in
3737
let timer = Mtime_clock.counter () in
3838
let timeout = 5.0 in
39-
let rec wait ~token ~task_set =
39+
let get_new_classes task_set =
40+
TaskSet.fold
41+
(fun task l -> Printf.sprintf "task/%s" (Ref.string_of task) :: l)
42+
task_set []
43+
in
44+
let rec wait ~token ~task_set ~completed_task_count ~classes =
4045
if TaskSet.is_empty task_set then
4146
true
4247
else
@@ -58,42 +63,67 @@ let wait_for_all_inner ~rpc ~session_id ~all_timeout ~tasks =
5863
List.map Event_helper.record_of_event event_from.events
5964
in
6065
(* If any records indicate that a task is no longer pending, remove that task from the set. *)
61-
let pending_task_set =
66+
let pending_task_set, completed_task_count, classes =
6267
List.fold_left
63-
(fun task_set' record ->
68+
(fun (task_set', completed_task_count, _) record ->
6469
match record with
6570
| Event_helper.Task (t, Some t_rec) ->
6671
if
6772
TaskSet.mem t task_set'
6873
&& t_rec.API.task_status <> `pending
6974
then
70-
TaskSet.remove t task_set'
75+
let new_task_set = TaskSet.remove t task_set' in
76+
let completed_task_count = completed_task_count + 1 in
77+
78+
(* Call the callback function, wait for new tasks if any *)
79+
let tasks_to_add = callback completed_task_count t in
80+
let new_task_set =
81+
List.fold_left
82+
(fun task_set task -> TaskSet.add task task_set)
83+
new_task_set tasks_to_add
84+
in
85+
( new_task_set
86+
, completed_task_count
87+
, get_new_classes new_task_set
88+
)
7189
else
72-
task_set'
90+
(task_set', completed_task_count, classes)
7391
| _ ->
74-
task_set'
92+
(task_set', completed_task_count, classes)
7593
)
76-
task_set records
94+
(task_set, completed_task_count, classes)
95+
records
7796
in
7897
wait ~token:event_from.Event_types.token ~task_set:pending_task_set
98+
~completed_task_count ~classes
7999
in
80100
let token = "" in
81101
let task_set =
82102
List.fold_left
83103
(fun task_set' task -> TaskSet.add task task_set')
84104
TaskSet.empty tasks
85105
in
86-
wait ~token ~task_set
106+
wait ~token ~task_set ~completed_task_count:0 ~classes
87107

88108
let wait_for_all ~rpc ~session_id ~tasks =
89-
wait_for_all_inner ~rpc ~session_id ~all_timeout:None ~tasks |> ignore
109+
wait_for_all_inner ~rpc ~session_id ~all_timeout:None ~tasks
110+
~callback:(fun _ _ -> []
111+
)
112+
|> ignore
113+
114+
let wait_for_all_with_callback ~rpc ~session_id ~tasks ~callback =
115+
wait_for_all_inner ~rpc ~session_id ~all_timeout:None ~tasks ~callback
116+
|> ignore
90117

91118
let with_tasks_destroy ~rpc ~session_id ~timeout ~tasks =
92119
let wait_or_cancel () =
93120
D.info "Waiting for %d tasks, timeout: %.3fs" (List.length tasks) timeout ;
94121
if
95122
not
96-
(wait_for_all_inner ~rpc ~session_id ~all_timeout:(Some timeout) ~tasks)
123+
(wait_for_all_inner ~rpc ~session_id ~all_timeout:(Some timeout) ~tasks
124+
~callback:(fun _ _ -> []
125+
)
126+
)
97127
then (
98128
D.info "Canceling tasks" ;
99129
List.iter
@@ -104,6 +134,8 @@ let with_tasks_destroy ~rpc ~session_id ~timeout ~tasks =
104134
tasks ;
105135
(* cancel is not immediate, give it a reasonable chance to take effect *)
106136
wait_for_all_inner ~rpc ~session_id ~all_timeout:(Some 60.) ~tasks
137+
~callback:(fun _ _ -> []
138+
)
107139
|> ignore ;
108140
false
109141
) else

ocaml/xapi-client/tasks.mli

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
* GNU Lesser General Public License for more details.
1313
*)
1414

15+
module TaskSet : Set.S with type elt = API.ref_task
16+
1517
val wait_for_all :
1618
rpc:(Rpc.call -> Rpc.response)
1719
-> session_id:API.ref_session
@@ -20,6 +22,27 @@ val wait_for_all :
2022
(** [wait_for_all ~rpc ~session_id ~tasks] returns when all of [tasks]
2123
are in some non-pending state. *)
2224

25+
val wait_for_all_with_callback :
26+
rpc:(Rpc.call -> Rpc.response)
27+
-> session_id:API.ref_session
28+
-> tasks:API.ref_task list
29+
-> callback:(int -> API.ref_task -> API.ref_task list)
30+
-> unit
31+
(** [wait_for_all_with_callback ~rpc ~session_id ~tasks ~callback] returns when
32+
all of [tasks] are in some non-pending state. When one of the [tasks] is
33+
completed, [callback overall_completed_task_count completed_task] is invoked, which returns
34+
a list of tasks that need to be added to [tasks] and waited on as well.
35+
36+
This allows, for example, to implement a system where tasks are processed
37+
in batches of *constant* size X, with new tasks being started as soon as at
38+
least one slot in the batch is freed, instead of waiting for the whole batch
39+
to finish (and potentially being slowed down by a single worst performer).
40+
41+
The callback could instead just perform some side-effect (set the progress
42+
of the overall task representing progress of individual units, for example)
43+
and return an empty list.
44+
*)
45+
2346
val with_tasks_destroy :
2447
rpc:(Rpc.call -> Rpc.response)
2548
-> session_id:API.ref_session

ocaml/xapi/xapi_host.ml

Lines changed: 51 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -649,8 +649,9 @@ let evacuate ~__context ~host ~network ~evacuate_batch_size =
649649
raise (Api_errors.Server_error (code, params))
650650
in
651651

652-
(* execute [n] asynchronous API calls [api_fn] for [xs] and wait for them to
653-
finish before executing the next batch. *)
652+
(* execute [plans_length] asynchronous API calls [api_fn] for [xs] in batches
653+
of [n] at a time, scheduling a new call as soon as one of the tasks from
654+
the previous batch is completed *)
654655
let batch ~__context n api_fn xs =
655656
let finally = Xapi_stdext_pervasives.Pervasiveext.finally in
656657
let destroy = Client.Client.Task.destroy in
@@ -675,27 +676,55 @@ let evacuate ~__context ~host ~network ~evacuate_batch_size =
675676
fail task "unexpected status of migration task"
676677
in
677678

678-
let rec loop xs =
679-
match take n xs with
680-
| [], _ ->
681-
()
682-
| head, tail ->
683-
Helpers.call_api_functions ~__context @@ fun rpc session_id ->
684-
let tasks = List.map (api_fn ~rpc ~session_id) head in
685-
finally
686-
(fun () ->
687-
Tasks.wait_for_all ~rpc ~session_id ~tasks ;
688-
List.iter assert_success tasks ;
689-
let tail_length = List.length tail |> float in
690-
let progress = 1.0 -. (tail_length /. plans_length) in
691-
TaskHelper.set_progress ~__context progress
679+
Helpers.call_api_functions ~__context @@ fun rpc session_id ->
680+
( match take n xs with
681+
| [], _ ->
682+
()
683+
| head, tasks_left ->
684+
let tasks_left = ref tasks_left in
685+
let initial_task_batch = List.map (api_fn ~rpc ~session_id) head in
686+
let tasks_pending =
687+
ref
688+
(List.fold_left
689+
(fun task_set' task -> Tasks.TaskSet.add task task_set')
690+
Tasks.TaskSet.empty initial_task_batch
692691
)
693-
(fun () ->
694-
List.iter (fun self -> destroy ~rpc ~session_id ~self) tasks
695-
) ;
696-
loop tail
697-
in
698-
loop xs ;
692+
in
693+
694+
let single_task_progress = 1.0 /. plans_length in
695+
let on_each_task_completion completed_task_count completed_task =
696+
(* Clean up the completed task *)
697+
assert_success completed_task ;
698+
destroy ~rpc ~session_id ~self:completed_task ;
699+
tasks_pending := Tasks.TaskSet.remove completed_task !tasks_pending ;
700+
701+
(* Update progress *)
702+
let progress =
703+
Int.to_float completed_task_count *. single_task_progress
704+
in
705+
TaskHelper.set_progress ~__context progress ;
706+
707+
(* Schedule a new task, if there are any left *)
708+
match !tasks_left with
709+
| [] ->
710+
[]
711+
| task_to_schedule :: left ->
712+
tasks_left := left ;
713+
let new_task = api_fn ~rpc ~session_id task_to_schedule in
714+
tasks_pending := Tasks.TaskSet.add new_task !tasks_pending ;
715+
[new_task]
716+
in
717+
finally
718+
(fun () ->
719+
Tasks.wait_for_all_with_callback ~rpc ~session_id
720+
~tasks:initial_task_batch ~callback:on_each_task_completion
721+
)
722+
(fun () ->
723+
Tasks.TaskSet.iter
724+
(fun self -> destroy ~rpc ~session_id ~self)
725+
!tasks_pending
726+
)
727+
) ;
699728
TaskHelper.set_progress ~__context 1.0
700729
in
701730

0 commit comments

Comments
 (0)