Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ocaml/xapi-client/tasks.mli
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
* GNU Lesser General Public License for more details.
*)

module TaskSet : Set.S with type elt = API.ref_task

val wait_for_all :
rpc:(Rpc.call -> Rpc.response)
-> session_id:API.ref_session
Expand Down
73 changes: 51 additions & 22 deletions ocaml/xapi/xapi_host.ml
Original file line number Diff line number Diff line change
Expand Up @@ -649,8 +649,9 @@ let evacuate ~__context ~host ~network ~evacuate_batch_size =
raise (Api_errors.Server_error (code, params))
in

(* execute [n] asynchronous API calls [api_fn] for [xs] and wait for them to
finish before executing the next batch. *)
(* execute [plans_length] asynchronous API calls [api_fn] for [xs] in batches
of [n] at a time, scheduling a new call as soon as one of the tasks from
the previous batch is completed *)
let batch ~__context n api_fn xs =
let finally = Xapi_stdext_pervasives.Pervasiveext.finally in
let destroy = Client.Client.Task.destroy in
Expand All @@ -675,27 +676,55 @@ let evacuate ~__context ~host ~network ~evacuate_batch_size =
fail task "unexpected status of migration task"
in

let rec loop xs =
match take n xs with
| [], _ ->
()
| head, tail ->
Helpers.call_api_functions ~__context @@ fun rpc session_id ->
let tasks = List.map (api_fn ~rpc ~session_id) head in
finally
(fun () ->
Tasks.wait_for_all ~rpc ~session_id ~tasks ;
List.iter assert_success tasks ;
let tail_length = List.length tail |> float in
let progress = 1.0 -. (tail_length /. plans_length) in
TaskHelper.set_progress ~__context progress
Helpers.call_api_functions ~__context @@ fun rpc session_id ->
( match take n xs with
| [], _ ->
()
| head, tasks_left ->
let tasks_left = ref tasks_left in
let initial_task_batch = List.map (api_fn ~rpc ~session_id) head in
let tasks_pending =
ref
(List.fold_left
(fun task_set' task -> Tasks.TaskSet.add task task_set')
Tasks.TaskSet.empty initial_task_batch
)
(fun () ->
List.iter (fun self -> destroy ~rpc ~session_id ~self) tasks
) ;
loop tail
in
loop xs ;
in

let single_task_progress = 1.0 /. plans_length in
let on_each_task_completion completed_task_count completed_task =
(* Clean up the completed task *)
assert_success completed_task ;
destroy ~rpc ~session_id ~self:completed_task ;
tasks_pending := Tasks.TaskSet.remove completed_task !tasks_pending ;

(* Update progress *)
let progress =
Int.to_float completed_task_count *. single_task_progress
in
TaskHelper.set_progress ~__context progress ;

(* Schedule a new task, if there are any left *)
match !tasks_left with
| [] ->
[]
| task_to_schedule :: left ->
tasks_left := left ;
let new_task = api_fn ~rpc ~session_id task_to_schedule in
tasks_pending := Tasks.TaskSet.add new_task !tasks_pending ;
[new_task]
in
finally
(fun () ->
Tasks.wait_for_all_with_callback ~rpc ~session_id
~tasks:initial_task_batch ~callback:on_each_task_completion
)
(fun () ->
Tasks.TaskSet.iter
(fun self -> destroy ~rpc ~session_id ~self)
!tasks_pending
)
) ;
TaskHelper.set_progress ~__context 1.0
in

Expand Down