1010
1111import org .apache .lucene .index .NoMergePolicy ;
1212import org .opensearch .action .admin .indices .streamingingestion .state .ShardIngestionState ;
13+ import org .opensearch .cluster .ClusterState ;
1314import org .opensearch .cluster .metadata .IndexMetadata ;
15+ import org .opensearch .cluster .service .ClusterApplierService ;
1416import org .opensearch .common .lucene .Lucene ;
1517import org .opensearch .common .settings .Settings ;
1618import org .opensearch .index .IndexSettings ;
4143import static org .awaitility .Awaitility .await ;
4244import static org .mockito .ArgumentMatchers .any ;
4345import static org .mockito .Mockito .doThrow ;
46+ import static org .mockito .Mockito .mock ;
4447import static org .mockito .Mockito .spy ;
48+ import static org .mockito .Mockito .when ;
4549
4650public class IngestionEngineTests extends EngineTestCase {
4751
@@ -51,6 +55,7 @@ public class IngestionEngineTests extends EngineTestCase {
5155 // the messages of the stream to ingest from
5256 private List <byte []> messages ;
5357 private EngineConfig engineConfig ;
58+ private ClusterApplierService clusterApplierService ;
5459
5560 @ Override
5661 @ Before
@@ -63,7 +68,9 @@ public void setUp() throws Exception {
6368 messages = new ArrayList <>();
6469 publishData ("{\" _id\" :\" 2\" ,\" _source\" :{\" name\" :\" bob\" , \" age\" : 24}}" );
6570 publishData ("{\" _id\" :\" 1\" ,\" _source\" :{\" name\" :\" alice\" , \" age\" : 20}}" );
66- ingestionEngine = buildIngestionEngine (globalCheckpoint , ingestionEngineStore , indexSettings );
71+ clusterApplierService = mock (ClusterApplierService .class );
72+ when (clusterApplierService .state ()).thenReturn (ClusterState .EMPTY_STATE );
73+ ingestionEngine = buildIngestionEngine (globalCheckpoint , ingestionEngineStore , indexSettings , clusterApplierService );
6774 }
6875
6976 private void publishData (String message ) {
@@ -134,7 +141,7 @@ public void testRecovery() throws IOException {
134141 publishData ("{\" _id\" :\" 3\" ,\" _source\" :{\" name\" :\" john\" , \" age\" : 30}}" );
135142 publishData ("{\" _id\" :\" 4\" ,\" _source\" :{\" name\" :\" jane\" , \" age\" : 25}}" );
136143 ingestionEngine .close ();
137- ingestionEngine = buildIngestionEngine (new AtomicLong (0 ), ingestionEngineStore , indexSettings );
144+ ingestionEngine = buildIngestionEngine (new AtomicLong (0 ), ingestionEngineStore , indexSettings , clusterApplierService );
138145 waitForResults (ingestionEngine , 4 );
139146 }
140147
@@ -163,7 +170,7 @@ public void testCreationFailure() throws IOException {
163170 // overwrite the config with ingestion engine settings
164171 String mapping = "{\" properties\" :{\" name\" :{\" type\" : \" text\" },\" age\" :{\" type\" : \" integer\" }}}}" ;
165172 MapperService mapperService = createMapperService (mapping );
166- engineConfig = config (engineConfig , () -> new DocumentMapperForType (mapperService .documentMapper (), null ));
173+ engineConfig = config (engineConfig , () -> new DocumentMapperForType (mapperService .documentMapper (), null ), clusterApplierService );
167174 try {
168175 new IngestionEngine (engineConfig , consumerFactory );
169176 fail ("Expected EngineException to be thrown" );
@@ -173,15 +180,15 @@ public void testCreationFailure() throws IOException {
173180 }
174181 }
175182
176- private IngestionEngine buildIngestionEngine (AtomicLong globalCheckpoint , Store store , IndexSettings settings ) throws IOException {
183+ private IngestionEngine buildIngestionEngine (AtomicLong globalCheckpoint , Store store , IndexSettings settings , ClusterApplierService clusterApplierService ) throws IOException {
177184 FakeIngestionSource .FakeIngestionConsumerFactory consumerFactory = new FakeIngestionSource .FakeIngestionConsumerFactory (messages );
178185 if (engineConfig == null ) {
179186 engineConfig = config (settings , store , createTempDir (), NoMergePolicy .INSTANCE , null , null , globalCheckpoint ::get );
180187 }
181188 // overwrite the config with ingestion engine settings
182189 String mapping = "{\" properties\" :{\" name\" :{\" type\" : \" text\" },\" age\" :{\" type\" : \" integer\" }}}}" ;
183190 MapperService mapperService = createMapperService (mapping );
184- engineConfig = config (engineConfig , () -> new DocumentMapperForType (mapperService .documentMapper (), null ));
191+ engineConfig = config (engineConfig , () -> new DocumentMapperForType (mapperService .documentMapper (), null ), clusterApplierService );
185192 if (!Lucene .indexExists (store .directory ())) {
186193 store .createEmpty (engineConfig .getIndexSettings ().getIndexVersionCreated ().luceneVersion );
187194 final String translogUuid = Translog .createEmptyTranslog (
0 commit comments