1212// See the License for the specific language governing permissions and 
1313// limitations under the License. 
1414
15- use  std:: cmp:: Reverse ; 
15+ use  std:: cmp:: { Ordering ,   Reverse } ; 
1616use  std:: collections:: BTreeMap ; 
1717use  std:: collections:: btree_map:: Entry ; 
1818
19- use  itertools:: Itertools ; 
2019use  quickwit_proto:: indexing:: CpuCapacity ; 
2120
2221use  super :: scheduling_logic_model:: * ; 
@@ -229,21 +228,61 @@ fn assert_enforce_nodes_cpu_capacity_post_condition(
229228// If this algorithm fails to place all remaining shards, we inflate 
230229// the node capacities by 20% in the scheduling problem and start from the beginning. 
231230
231+ #[ derive( Debug ,  PartialEq ,  Eq ,  Ord ) ]  
232+ struct  PlacementCandidate  { 
233+     indexer_ord :  IndexerOrd , 
234+     current_num_shards :  u32 , 
235+     available_capacity :  CpuCapacity , 
236+     affinity :  u32 , 
237+ } 
238+ 
239+ impl  PartialOrd  for  PlacementCandidate  { 
240+     fn  partial_cmp ( & self ,  other :  & Self )  -> Option < Ordering >  { 
241+         // Higher affinity is better 
242+         match  self . affinity . cmp ( & other. affinity )  { 
243+             Ordering :: Equal  => { } 
244+             ordering => return  Some ( ordering. reverse ( ) ) , 
245+         } 
246+         // If tie, pick the node with shards already assigned first 
247+         match  self . current_num_shards . cmp ( & other. current_num_shards )  { 
248+             Ordering :: Equal  => { } 
249+             ordering => return  Some ( ordering. reverse ( ) ) , 
250+         } 
251+         // If tie, pick the node with the highest available capacity 
252+         match  self . available_capacity . cmp ( & other. available_capacity )  { 
253+             Ordering :: Equal  => { } 
254+             ordering => return  Some ( ordering. reverse ( ) ) , 
255+         } 
256+         // Final tie-breaker: indexer ID for deterministic ordering 
257+         Some ( self . indexer_ord . cmp ( & other. indexer_ord ) . reverse ( ) ) 
258+     } 
259+ } 
260+ 
232261fn  attempt_place_unassigned_shards ( 
233262    unassigned_shards :  & [ Source ] , 
234263    problem :  & SchedulingProblem , 
235264    partial_solution :  & SchedulingSolution , 
236265)  -> Result < SchedulingSolution ,  NotEnoughCapacity >  { 
237266    let  mut  solution = partial_solution. clone ( ) ; 
238267    for  source in  unassigned_shards { 
239-         let  indexers_with_most_available_capacity =
240-             compute_indexer_available_capacity ( problem,  & solution) 
241-                 . sorted_by_key ( |( indexer_ord,  capacity) | Reverse ( ( * capacity,  * indexer_ord) ) ) ; 
242-         place_unassigned_shards_single_source ( 
243-             source, 
244-             indexers_with_most_available_capacity, 
245-             & mut  solution, 
246-         ) ?; 
268+         let  mut  placements:  Vec < PlacementCandidate >  = solution
269+             . indexer_assignments 
270+             . iter ( ) 
271+             . map ( |indexer_assignment :  & IndexerAssignment | { 
272+                 let  available_capacity = indexer_assignment. indexer_available_capacity ( problem) ; 
273+                 assert ! ( available_capacity >= 0i32 ) ; 
274+                 let  available_capacity = CpuCapacity :: from_cpu_millis ( available_capacity as  u32 ) ; 
275+                 let  current_num_shards = indexer_assignment. num_shards ( source. source_ord ) ; 
276+                 PlacementCandidate  { 
277+                     affinity :  0 , 
278+                     current_num_shards, 
279+                     available_capacity, 
280+                     indexer_ord :  indexer_assignment. indexer_ord , 
281+                 } 
282+             } ) 
283+             . collect ( ) ; 
284+         placements. sort ( ) ; 
285+         place_unassigned_shards_single_source ( source,  & placements,  & mut  solution) ?; 
247286    } 
248287    assert_place_unassigned_shards_post_condition ( problem,  & solution) ; 
249288    Ok ( solution) 
@@ -259,27 +298,26 @@ fn place_unassigned_shards_with_affinity(
259298        Reverse ( load) 
260299    } ) ; 
261300    for  source in  & unassigned_shards { 
262-         // List of indexer with a non-null affinity and some available capacity, sorted by 
263-         // (affinity, available capacity) in that order. 
264-         let  indexers_with_affinity_and_available_capacity = source
301+         let  mut  placements:  Vec < PlacementCandidate >  = source
265302            . affinities 
266303            . iter ( ) 
267304            . filter ( |& ( _,  & affinity) | affinity != 0u32 ) 
268-             . map ( |( & indexer_ord,  affinity) | { 
305+             . map ( |( & indexer_ord,  & affinity) | { 
269306                let  available_capacity =
270307                    solution. indexer_assignments [ indexer_ord] . indexer_available_capacity ( problem) ; 
271-                 let  capacity = CpuCapacity :: from_cpu_millis ( available_capacity as  u32 ) ; 
272-                 ( indexer_ord,  affinity,  capacity) 
273-             } ) 
274-             . sorted_by_key ( |( indexer_ord,  affinity,  capacity) | { 
275-                 Reverse ( ( * affinity,  * capacity,  * indexer_ord) ) 
308+                 let  available_capacity = CpuCapacity :: from_cpu_millis ( available_capacity as  u32 ) ; 
309+                 let  current_num_shards =
310+                     solution. indexer_assignments [ indexer_ord] . num_shards ( source. source_ord ) ; 
311+                 PlacementCandidate  { 
312+                     affinity, 
313+                     current_num_shards, 
314+                     available_capacity, 
315+                     indexer_ord, 
316+                 } 
276317            } ) 
277-             . map ( |( indexer_ord,  _,  capacity) | ( indexer_ord,  capacity) ) ; 
278-         let  _ = place_unassigned_shards_single_source ( 
279-             source, 
280-             indexers_with_affinity_and_available_capacity, 
281-             solution, 
282-         ) ; 
318+             . collect ( ) ; 
319+         placements. sort ( ) ; 
320+         let  _ = place_unassigned_shards_single_source ( source,  & placements,  solution) ; 
283321    } 
284322} 
285323
@@ -348,22 +386,27 @@ struct NotEnoughCapacity;
348386/// amongst the node with their given node capacity. 
349387fn  place_unassigned_shards_single_source ( 
350388    source :  & Source , 
351-     mut   indexer_with_capacities :   impl   Iterator < Item  =  ( IndexerOrd ,   CpuCapacity ) > , 
389+     sorted_candidates :   & [ PlacementCandidate ] , 
352390    solution :  & mut  SchedulingSolution , 
353391)  -> Result < ( ) ,  NotEnoughCapacity >  { 
354392    let  mut  num_shards = source. num_shards ; 
355-     while  num_shards > 0  { 
356-         let  Some ( ( indexer_ord,  available_capacity) )  = indexer_with_capacities. next ( )  else  { 
357-             return  Err ( NotEnoughCapacity ) ; 
358-         } ; 
393+     for  PlacementCandidate  { 
394+         indexer_ord, 
395+         available_capacity, 
396+         ..
397+     }  in  sorted_candidates
398+     { 
359399        let  num_placable_shards = available_capacity. cpu_millis ( )  / source. load_per_shard ; 
360400        let  num_shards_to_place = num_placable_shards. min ( num_shards) ; 
361401        // Update the solution, the shard load, and the number of shards to place. 
362-         solution. indexer_assignments [ indexer_ord] 
402+         solution. indexer_assignments [ * indexer_ord] 
363403            . add_shards ( source. source_ord ,  num_shards_to_place) ; 
364404        num_shards -= num_shards_to_place; 
405+         if  num_shards == 0  { 
406+             return  Ok ( ( ) ) ; 
407+         } 
365408    } 
366-     Ok ( ( ) ) 
409+     Err ( NotEnoughCapacity ) 
367410} 
368411
369412/// Compute the sources/shards that have not been assigned to any indexer yet. 
@@ -392,30 +435,11 @@ fn compute_unassigned_sources(
392435    unassigned_sources. into_values ( ) . collect ( ) 
393436} 
394437
395- /// Builds a BinaryHeap with the different indexer capacities. 
396- /// 
397- /// Panics if one of the indexer is over-assigned. 
398- fn  compute_indexer_available_capacity < ' a > ( 
399-     problem :  & ' a  SchedulingProblem , 
400-     solution :  & ' a  SchedulingSolution , 
401- )  -> impl  Iterator < Item  = ( IndexerOrd ,  CpuCapacity ) >  + ' a  { 
402-     solution
403-         . indexer_assignments 
404-         . iter ( ) 
405-         . map ( |indexer_assignment| { 
406-             let  available_capacity:  i32  = indexer_assignment. indexer_available_capacity ( problem) ; 
407-             assert ! ( available_capacity >= 0i32 ) ; 
408-             ( 
409-                 indexer_assignment. indexer_ord , 
410-                 CpuCapacity :: from_cpu_millis ( available_capacity as  u32 ) , 
411-             ) 
412-         } ) 
413- } 
414- 
415438#[ cfg( test) ]  
416439mod  tests { 
417440    use  std:: num:: NonZeroU32 ; 
418441
442+     use  itertools:: Itertools ; 
419443    use  proptest:: prelude:: * ; 
420444    use  quickwit_proto:: indexing:: mcpu; 
421445
0 commit comments