@@ -26,6 +26,7 @@ use scheduling_logic_model::{IndexerOrd, SourceOrd};
2626use tracing:: { error, warn} ;
2727
2828use crate :: indexing_plan:: PhysicalIndexingPlan ;
29+ use crate :: indexing_scheduler:: MAX_LOAD_PER_PIPELINE ;
2930use crate :: indexing_scheduler:: scheduling:: scheduling_logic_model:: {
3031 IndexerAssignment , SchedulingProblem , SchedulingSolution ,
3132} ;
@@ -44,9 +45,6 @@ use crate::model::ShardLocations;
4445/// of 30%. Which translates into an overall load of 60%.
4546const CPU_PER_PIPELINE_LOAD_LOWER_THRESHOLD : CpuCapacity = CpuCapacity :: from_cpu_millis ( 1_200 ) ;
4647
47- /// That's 80% of a period
48- const MAX_LOAD_PER_PIPELINE : CpuCapacity = CpuCapacity :: from_cpu_millis ( 3_200 ) ;
49-
5048fn populate_problem (
5149 source : & SourceToSchedule ,
5250 problem : & mut SchedulingProblem ,
@@ -748,7 +746,7 @@ mod tests {
748746
749747 use fnv:: FnvHashMap ;
750748 use itertools:: Itertools ;
751- use quickwit_proto:: indexing:: { CpuCapacity , IndexingTask , mcpu} ;
749+ use quickwit_proto:: indexing:: { CpuCapacity , IndexingTask , PIPELINE_FULL_CAPACITY , mcpu} ;
752750 use quickwit_proto:: types:: { IndexUid , NodeId , PipelineUid , ShardId , SourceUid } ;
753751 use rand:: seq:: SliceRandom ;
754752
@@ -939,6 +937,71 @@ mod tests {
939937 }
940938 }
941939
940+ #[ test]
941+ fn test_build_physical_plan_with_pipeline_limit ( ) {
942+ let indexer1 = "indexer1" . to_string ( ) ;
943+ let indexer2 = "indexer2" . to_string ( ) ;
944+ let source_uid0 = source_id ( ) ;
945+ let source_uid1 = source_id ( ) ;
946+ let source_0 = SourceToSchedule {
947+ source_uid : source_uid0. clone ( ) ,
948+ source_type : SourceToScheduleType :: Sharded {
949+ shard_ids : ( 0 ..16 ) . map ( ShardId :: from) . collect ( ) ,
950+ load_per_shard : NonZeroU32 :: new ( 800 ) . unwrap ( ) ,
951+ } ,
952+ params_fingerprint : 0 ,
953+ } ;
954+ let source_1 = SourceToSchedule {
955+ source_uid : source_uid1. clone ( ) ,
956+ source_type : SourceToScheduleType :: NonSharded {
957+ num_pipelines : 4 ,
958+ load_per_pipeline : NonZeroU32 :: new ( PIPELINE_FULL_CAPACITY . cpu_millis ( ) ) . unwrap ( ) ,
959+ } ,
960+ params_fingerprint : 0 ,
961+ } ;
962+ let mut indexer_id_to_cpu_capacities = FnvHashMap :: default ( ) ;
963+ indexer_id_to_cpu_capacities. insert ( indexer1. clone ( ) , mcpu ( 16_000 ) ) ;
964+ indexer_id_to_cpu_capacities. insert ( indexer2. clone ( ) , mcpu ( 16_000 ) ) ;
965+ let shard_locations = ShardLocations :: default ( ) ;
966+ let indexing_plan = build_physical_indexing_plan (
967+ & [ source_0, source_1] ,
968+ & indexer_id_to_cpu_capacities,
969+ None ,
970+ & shard_locations,
971+ ) ;
972+ assert_eq ! ( indexing_plan. indexing_tasks_per_indexer( ) . len( ) , 2 ) ;
973+
974+ let node1_plan = indexing_plan. indexer ( & indexer1) . unwrap ( ) ;
975+ let node2_plan = indexing_plan. indexer ( & indexer2) . unwrap ( ) ;
976+
977+ println ! ( "node1_plan: {node1_plan:#?}" ) ;
978+ println ! ( "node2_plan: {node2_plan:#?}" ) ;
979+
980+ let source_0_on_node1 = node1_plan
981+ . iter ( )
982+ . filter ( |task| task. source_id == source_uid0. source_id )
983+ . count ( ) ;
984+ let source_0_on_node2 = node2_plan
985+ . iter ( )
986+ . filter ( |task| task. source_id == source_uid0. source_id )
987+ . count ( ) ;
988+ assert ! ( source_0_on_node1 <= 3 ) ;
989+ assert ! ( source_0_on_node2 <= 3 ) ;
990+ assert_eq ! ( source_0_on_node1 + source_0_on_node2, 4 ) ;
991+
992+ let source_1_on_node1 = node1_plan
993+ . iter ( )
994+ . filter ( |task| task. source_id == source_uid1. source_id )
995+ . count ( ) ;
996+ let source_1_on_node2 = node2_plan
997+ . iter ( )
998+ . filter ( |task| task. source_id == source_uid1. source_id )
999+ . count ( ) ;
1000+ assert ! ( source_1_on_node1 <= 3 ) ;
1001+ assert ! ( source_1_on_node2 <= 3 ) ;
1002+ assert_eq ! ( source_1_on_node1 + source_1_on_node2, 4 ) ;
1003+ }
1004+
9421005 fn make_indexing_tasks (
9431006 source_uid : & SourceUid ,
9441007 shards : & [ ( PipelineUid , & [ ShardId ] ) ] ,
0 commit comments