Skip to content

Commit 1c58b78

Browse files
Update poller from cluster state applier to listener
Signed-off-by: Varun Bharadwaj <[email protected]>
1 parent f7a6892 commit 1c58b78

File tree

10 files changed

+32
-28
lines changed

10 files changed

+32
-28
lines changed

server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -729,7 +729,7 @@ public static final IndexShard newIndexShard(
729729
OpenSearchTestCase::randomBoolean,
730730
() -> indexService.getIndexSettings().getRefreshInterval(),
731731
indexService.getRefreshMutex(),
732-
clusterService
732+
clusterService.getClusterApplierService()
733733
);
734734
}
735735

server/src/main/java/org/opensearch/index/IndexService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -728,7 +728,7 @@ protected void closeInternal() {
728728
fixedRefreshIntervalSchedulingEnabled,
729729
this::getRefreshInterval,
730730
refreshMutex,
731-
clusterService
731+
clusterService.getClusterApplierService()
732732
);
733733
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
734734
eventListener.afterIndexShardCreated(indexShard);

server/src/main/java/org/opensearch/index/engine/EngineConfig.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
import org.apache.lucene.search.ReferenceManager;
4242
import org.apache.lucene.search.Sort;
4343
import org.apache.lucene.search.similarities.Similarity;
44-
import org.opensearch.cluster.service.ClusterService;
44+
import org.opensearch.cluster.service.ClusterApplierService;
4545
import org.opensearch.common.Nullable;
4646
import org.opensearch.common.annotation.PublicApi;
4747
import org.opensearch.common.settings.Setting;
@@ -114,7 +114,7 @@ public final class EngineConfig {
114114
private final BooleanSupplier startedPrimarySupplier;
115115
private final Comparator<LeafReader> leafSorter;
116116
private final Supplier<DocumentMapperForType> documentMapperForTypeSupplier;
117-
private final ClusterService clusterService;
117+
private final ClusterApplierService clusterApplierService;
118118

119119
/**
120120
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
@@ -305,7 +305,7 @@ private EngineConfig(Builder builder) {
305305
this.leafSorter = builder.leafSorter;
306306
this.documentMapperForTypeSupplier = builder.documentMapperForTypeSupplier;
307307
this.indexReaderWarmer = builder.indexReaderWarmer;
308-
this.clusterService = builder.clusterService;
308+
this.clusterApplierService = builder.clusterApplierService;
309309
}
310310

311311
/**
@@ -580,10 +580,10 @@ public Comparator<LeafReader> getLeafSorter() {
580580
}
581581

582582
/**
583-
* Returns the ClusterService instance.
583+
* Returns the ClusterApplierService instance.
584584
*/
585-
public ClusterService getClusterService() {
586-
return this.clusterService;
585+
public ClusterApplierService getClusterApplierService() {
586+
return this.clusterApplierService;
587587
}
588588

589589
/**
@@ -621,7 +621,7 @@ public static class Builder {
621621
private Supplier<DocumentMapperForType> documentMapperForTypeSupplier;
622622
Comparator<LeafReader> leafSorter;
623623
private IndexWriter.IndexReaderWarmer indexReaderWarmer;
624-
private ClusterService clusterService;
624+
private ClusterApplierService clusterApplierService;
625625

626626
public Builder shardId(ShardId shardId) {
627627
this.shardId = shardId;
@@ -768,8 +768,8 @@ public Builder indexReaderWarmer(IndexWriter.IndexReaderWarmer indexReaderWarmer
768768
return this;
769769
}
770770

771-
public Builder clusterService(ClusterService clusterService) {
772-
this.clusterService = clusterService;
771+
public Builder clusterApplierService(ClusterApplierService clusterApplierService) {
772+
this.clusterApplierService = clusterApplierService;
773773
return this;
774774
}
775775

server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import org.apache.lucene.search.ReferenceManager;
1919
import org.apache.lucene.search.Sort;
2020
import org.apache.lucene.search.similarities.Similarity;
21-
import org.opensearch.cluster.service.ClusterService;
21+
import org.opensearch.cluster.service.ClusterApplierService;
2222
import org.opensearch.common.Nullable;
2323
import org.opensearch.common.unit.TimeValue;
2424
import org.opensearch.core.index.shard.ShardId;
@@ -160,7 +160,7 @@ public EngineConfig newEngineConfig(
160160
Comparator<LeafReader> leafSorter,
161161
Supplier<DocumentMapperForType> documentMapperForTypeSupplier,
162162
IndexWriter.IndexReaderWarmer indexReaderWarmer,
163-
ClusterService clusterService
163+
ClusterApplierService clusterApplierService
164164
) {
165165
CodecService codecServiceToUse = codecService;
166166
if (codecService == null && this.codecServiceFactory != null) {
@@ -196,7 +196,7 @@ public EngineConfig newEngineConfig(
196196
.leafSorter(leafSorter)
197197
.documentMapperForTypeSupplier(documentMapperForTypeSupplier)
198198
.indexReaderWarmer(indexReaderWarmer)
199-
.clusterService(clusterService)
199+
.clusterApplierService(clusterApplierService)
200200
.build();
201201
}
202202

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,9 +136,9 @@ public void start() {
136136

137137
// Register the poller with the ClusterService for receiving cluster state updates.
138138
// Also initialize cluster write block state in the poller.
139-
if (engineConfig.getClusterService() != null) {
140-
engineConfig.getClusterService().addStateApplier(streamPoller);
141-
boolean isWriteBlockEnabled = engineConfig.getClusterService()
139+
if (engineConfig.getClusterApplierService() != null) {
140+
engineConfig.getClusterApplierService().addListener(streamPoller);
141+
boolean isWriteBlockEnabled = engineConfig.getClusterApplierService()
142142
.state()
143143
.blocks()
144144
.indexBlocked(ClusterBlockLevel.WRITE, engineConfig.getIndexSettings().getIndex().getName());

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@
7676
import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
7777
import org.opensearch.cluster.routing.ShardRouting;
7878
import org.opensearch.cluster.routing.ShardRoutingState;
79-
import org.opensearch.cluster.service.ClusterService;
79+
import org.opensearch.cluster.service.ClusterApplierService;
8080
import org.opensearch.common.Booleans;
8181
import org.opensearch.common.CheckedConsumer;
8282
import org.opensearch.common.CheckedFunction;
@@ -376,7 +376,7 @@ Runnable getGlobalCheckpointSyncer() {
376376
private final Supplier<TimeValue> refreshInterval;
377377
private final Object refreshMutex;
378378
private volatile AsyncShardRefreshTask refreshTask;
379-
private final ClusterService clusterService;
379+
private final ClusterApplierService clusterApplierService;
380380

381381
public IndexShard(
382382
final ShardRouting shardRouting,
@@ -414,7 +414,7 @@ public IndexShard(
414414
final Supplier<Boolean> fixedRefreshIntervalSchedulingEnabled,
415415
final Supplier<TimeValue> refreshInterval,
416416
final Object refreshMutex,
417-
final ClusterService clusterService
417+
final ClusterApplierService clusterApplierService
418418
) throws IOException {
419419
super(shardRouting.shardId(), indexSettings);
420420
assert shardRouting.initializing();
@@ -521,7 +521,7 @@ public boolean shouldCache(Query query) {
521521
this.fixedRefreshIntervalSchedulingEnabled = fixedRefreshIntervalSchedulingEnabled;
522522
this.refreshInterval = refreshInterval;
523523
this.refreshMutex = Objects.requireNonNull(refreshMutex);
524-
this.clusterService = clusterService;
524+
this.clusterApplierService = clusterApplierService;
525525
synchronized (this.refreshMutex) {
526526
if (shardLevelRefreshEnabled) {
527527
startRefreshTask();
@@ -4132,7 +4132,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
41324132
// timeseries
41334133
() -> docMapper(),
41344134
mergedSegmentWarmerFactory.get(this),
4135-
clusterService
4135+
clusterApplierService
41364136
);
41374137
}
41384138

server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -441,8 +441,12 @@ public void setWriteBlockEnabled(boolean isWriteBlockEnabled) {
441441
}
442442

443443
@Override
444-
public void applyClusterState(final ClusterChangedEvent event) {
444+
public void clusterChanged(ClusterChangedEvent event) {
445445
try {
446+
if (event.blocksChanged() == false) {
447+
return;
448+
}
449+
446450
final ClusterState state = event.state();
447451
isWriteBlockEnabled = state.blocks().indexBlocked(ClusterBlockLevel.WRITE, indexName);
448452

server/src/main/java/org/opensearch/indices/pollingingest/StreamPoller.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
package org.opensearch.indices.pollingingest;
1010

11-
import org.opensearch.cluster.ClusterStateApplier;
11+
import org.opensearch.cluster.ClusterStateListener;
1212
import org.opensearch.common.annotation.ExperimentalApi;
1313
import org.opensearch.index.IngestionShardPointer;
1414

@@ -17,7 +17,7 @@
1717
/**
1818
* A poller for reading messages from an ingestion shard. This is used in the ingestion engine.
1919
*/
20-
public interface StreamPoller extends Closeable, ClusterStateApplier {
20+
public interface StreamPoller extends Closeable, ClusterStateListener {
2121

2222
String BATCH_START = "batch_start";
2323

server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -504,13 +504,13 @@ public void testClusterStateChange() {
504504
.build();
505505

506506
ClusterChangedEvent event1 = new ClusterChangedEvent("test", state2, state1);
507-
poller.applyClusterState(event1);
507+
poller.clusterChanged(event1);
508508
assertTrue(poller.isWriteBlockEnabled());
509509

510510
// remove write block
511511
ClusterState state3 = ClusterState.builder(ClusterName.DEFAULT).build();
512512
ClusterChangedEvent event2 = new ClusterChangedEvent("test", state3, state2);
513-
poller.applyClusterState(event2);
513+
poller.clusterChanged(event2);
514514
assertFalse(poller.isWriteBlockEnabled());
515515

516516
}

test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -729,7 +729,7 @@ protected IndexShard newShard(
729729
() -> Boolean.FALSE,
730730
indexSettings::getRefreshInterval,
731731
new Object(),
732-
clusterService
732+
clusterService.getClusterApplierService()
733733
);
734734
indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER);
735735
if (remoteStoreStatsTrackerFactory != null) {

0 commit comments

Comments
 (0)