Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 43 additions & 11 deletions ocaml/xapi-client/tasks.ml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ module TaskSet = Set.Make (struct
end)

(* Return once none of the tasks have a `pending status. *)
let wait_for_all_inner ~rpc ~session_id ~all_timeout ~tasks =
let wait_for_all_inner ~rpc ~session_id ~all_timeout ~tasks ~callback =
let classes =
List.map (fun task -> Printf.sprintf "task/%s" (Ref.string_of task)) tasks
in
Expand All @@ -36,7 +36,12 @@ let wait_for_all_inner ~rpc ~session_id ~all_timeout ~tasks =
in
let timer = Mtime_clock.counter () in
let timeout = 5.0 in
let rec wait ~token ~task_set =
let get_new_classes task_set =
TaskSet.fold
(fun task l -> Printf.sprintf "task/%s" (Ref.string_of task) :: l)
task_set []
in
let rec wait ~token ~task_set ~completed_task_count ~classes =
if TaskSet.is_empty task_set then
true
else
Expand All @@ -58,42 +63,67 @@ let wait_for_all_inner ~rpc ~session_id ~all_timeout ~tasks =
List.map Event_helper.record_of_event event_from.events
in
(* If any records indicate that a task is no longer pending, remove that task from the set. *)
let pending_task_set =
let pending_task_set, completed_task_count, classes =
List.fold_left
(fun task_set' record ->
(fun (task_set', completed_task_count, _) record ->
match record with
| Event_helper.Task (t, Some t_rec) ->
if
TaskSet.mem t task_set'
&& t_rec.API.task_status <> `pending
then
TaskSet.remove t task_set'
let new_task_set = TaskSet.remove t task_set' in
let completed_task_count = completed_task_count + 1 in

(* Call the callback function, wait for new tasks if any *)
let tasks_to_add = callback completed_task_count t in
let new_task_set =
List.fold_left
(fun task_set task -> TaskSet.add task task_set)
new_task_set tasks_to_add
in
( new_task_set
, completed_task_count
, get_new_classes new_task_set
)
else
task_set'
(task_set', completed_task_count, classes)
| _ ->
task_set'
(task_set', completed_task_count, classes)
)
task_set records
(task_set, completed_task_count, classes)
records
in
wait ~token:event_from.Event_types.token ~task_set:pending_task_set
~completed_task_count ~classes
in
let token = "" in
let task_set =
List.fold_left
(fun task_set' task -> TaskSet.add task task_set')
TaskSet.empty tasks
in
wait ~token ~task_set
wait ~token ~task_set ~completed_task_count:0 ~classes

let wait_for_all ~rpc ~session_id ~tasks =
wait_for_all_inner ~rpc ~session_id ~all_timeout:None ~tasks |> ignore
wait_for_all_inner ~rpc ~session_id ~all_timeout:None ~tasks
~callback:(fun _ _ -> []
)
|> ignore

(* Waits on [tasks] through [wait_for_all_inner], passing [~all_timeout:None]
   and forwarding [callback] unchanged; the boolean outcome reported by
   [wait_for_all_inner] is deliberately discarded. *)
let wait_for_all_with_callback ~rpc ~session_id ~tasks ~callback =
  let (_all_done : bool) =
    wait_for_all_inner ~rpc ~session_id ~all_timeout:None ~tasks ~callback
  in
  ()

let with_tasks_destroy ~rpc ~session_id ~timeout ~tasks =
let wait_or_cancel () =
D.info "Waiting for %d tasks, timeout: %.3fs" (List.length tasks) timeout ;
if
not
(wait_for_all_inner ~rpc ~session_id ~all_timeout:(Some timeout) ~tasks)
(wait_for_all_inner ~rpc ~session_id ~all_timeout:(Some timeout) ~tasks
~callback:(fun _ _ -> []
)
)
then (
D.info "Canceling tasks" ;
List.iter
Expand All @@ -104,6 +134,8 @@ let with_tasks_destroy ~rpc ~session_id ~timeout ~tasks =
tasks ;
(* cancel is not immediate, give it a reasonable chance to take effect *)
wait_for_all_inner ~rpc ~session_id ~all_timeout:(Some 60.) ~tasks
~callback:(fun _ _ -> []
)
|> ignore ;
false
) else
Expand Down
23 changes: 23 additions & 0 deletions ocaml/xapi-client/tasks.mli
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
* GNU Lesser General Public License for more details.
*)

module TaskSet : Set.S with type elt = API.ref_task

val wait_for_all :
rpc:(Rpc.call -> Rpc.response)
-> session_id:API.ref_session
Expand All @@ -20,6 +22,27 @@ val wait_for_all :
(** [wait_for_all ~rpc ~session_id ~tasks] returns when all of [tasks]
are in some non-pending state. *)

val wait_for_all_with_callback :
rpc:(Rpc.call -> Rpc.response)
-> session_id:API.ref_session
-> tasks:API.ref_task list
-> callback:(int -> API.ref_task -> API.ref_task list)
-> unit
(** [wait_for_all_with_callback ~rpc ~session_id ~tasks ~callback] returns when
all of [tasks], together with every task subsequently added by [callback],
are in some non-pending state.

Each time one of the waited-on tasks leaves the pending state,
[callback completed_task_count completed_task] is invoked, where
[completed_task_count] is the overall number of tasks that have finished so
far (including [completed_task]) and [completed_task] is the task that has
just finished. The callback returns a list of tasks that need to be added to
the set of waited-on tasks; these are waited on (and reported to [callback]
on completion) just like the initial [tasks].

This makes it possible, for example, to implement a system where tasks are
processed in batches of *constant* size X, with a new task being started as
soon as at least one slot in the batch is freed, instead of waiting for the
whole batch to finish (and potentially being slowed down by a single worst
performer).

Alternatively, the callback can just perform some side-effect (for example,
updating the progress of an overall task that tracks the individual units)
and return an empty list.
*)

val with_tasks_destroy :
rpc:(Rpc.call -> Rpc.response)
-> session_id:API.ref_session
Expand Down
73 changes: 51 additions & 22 deletions ocaml/xapi/xapi_host.ml
Original file line number Diff line number Diff line change
Expand Up @@ -649,8 +649,9 @@ let evacuate ~__context ~host ~network ~evacuate_batch_size =
raise (Api_errors.Server_error (code, params))
in

(* execute [n] asynchronous API calls [api_fn] for [xs] and wait for them to
finish before executing the next batch. *)
(* Execute [plans_length] asynchronous API calls [api_fn] for [xs], keeping at
most [n] of them running at a time: a new call is scheduled as soon as any
running task completes, rather than waiting for a whole batch to finish. *)
let batch ~__context n api_fn xs =
let finally = Xapi_stdext_pervasives.Pervasiveext.finally in
let destroy = Client.Client.Task.destroy in
Expand All @@ -675,27 +676,55 @@ let evacuate ~__context ~host ~network ~evacuate_batch_size =
fail task "unexpected status of migration task"
in

let rec loop xs =
match take n xs with
| [], _ ->
()
| head, tail ->
Helpers.call_api_functions ~__context @@ fun rpc session_id ->
let tasks = List.map (api_fn ~rpc ~session_id) head in
finally
(fun () ->
Tasks.wait_for_all ~rpc ~session_id ~tasks ;
List.iter assert_success tasks ;
let tail_length = List.length tail |> float in
let progress = 1.0 -. (tail_length /. plans_length) in
TaskHelper.set_progress ~__context progress
Helpers.call_api_functions ~__context @@ fun rpc session_id ->
( match take n xs with
| [], _ ->
()
| head, tasks_left ->
let tasks_left = ref tasks_left in
let initial_task_batch = List.map (api_fn ~rpc ~session_id) head in
let tasks_pending =
ref
(List.fold_left
(fun task_set' task -> Tasks.TaskSet.add task task_set')
Tasks.TaskSet.empty initial_task_batch
)
(fun () ->
List.iter (fun self -> destroy ~rpc ~session_id ~self) tasks
) ;
loop tail
in
loop xs ;
in

let single_task_progress = 1.0 /. plans_length in
let on_each_task_completion completed_task_count completed_task =
(* Clean up the completed task *)
assert_success completed_task ;
destroy ~rpc ~session_id ~self:completed_task ;
tasks_pending := Tasks.TaskSet.remove completed_task !tasks_pending ;

(* Update progress *)
let progress =
Int.to_float completed_task_count *. single_task_progress
in
TaskHelper.set_progress ~__context progress ;

(* Schedule a new task, if there are any left *)
match !tasks_left with
| [] ->
[]
| task_to_schedule :: left ->
tasks_left := left ;
let new_task = api_fn ~rpc ~session_id task_to_schedule in
tasks_pending := Tasks.TaskSet.add new_task !tasks_pending ;
[new_task]
in
finally
(fun () ->
Tasks.wait_for_all_with_callback ~rpc ~session_id
~tasks:initial_task_batch ~callback:on_each_task_completion
)
(fun () ->
Tasks.TaskSet.iter
(fun self -> destroy ~rpc ~session_id ~self)
!tasks_pending
)
) ;
TaskHelper.set_progress ~__context 1.0
in

Expand Down