Skip to content

Commit e11cbed

Browse files
authored
Adding a refresh listener to a recovering shard should be a noop (#26055)
When `refresh=wait_for` is set on an indexing request, we register a listener on the shards that are call during the next refresh. During the recover translog phase, when the engine is open, we have a window of time when indexing operations succeed and they can add their listeners. Those listeners will only be called when the recovery finishes as we do not refresh during recoveries (unless the indexing buffer is full). Next to being a bad user experience, it can also cause deadlocks with an ongoing peer recovery that may wait for those operations to mark the replica in sync (details below). To fix this, this PR changes refresh listeners to be a noop when the shard is not yet serving reads (implicitly covering the recovery period). It doesn't matter anyway. Deadlock with recovery: When finalizing a peer recovery we mark the peer as "in sync". To do so we wait until the peer's local checkpoint is at least as high as the global checkpoint. If an operation with `refresh=wait_for` is added as a listener on that peer during recovery, it is not completed from the perspective of the primary. The primary than may wait for it to complete before advancing the local checkpoint for that peer. Since that peer is not considered in sync, the global checkpoint on the primary can be higher, causing a deadlock. Operation waits for recovery to finish and a refresh to happen. Recovery waits on the operation.
1 parent ad4dbbf commit e11cbed

File tree

5 files changed

+103
-57
lines changed

5 files changed

+103
-57
lines changed

core/src/main/java/org/elasticsearch/index/IndexService.java

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -675,24 +675,14 @@ private void maybeFSyncTranslogs() {
675675
private void maybeRefreshEngine() {
676676
if (indexSettings.getRefreshInterval().millis() > 0) {
677677
for (IndexShard shard : this.shards.values()) {
678-
switch (shard.state()) {
679-
case CREATED:
680-
case RECOVERING:
681-
case CLOSED:
682-
continue;
683-
case POST_RECOVERY:
684-
case STARTED:
685-
case RELOCATED:
686-
try {
687-
if (shard.isRefreshNeeded()) {
688-
shard.refresh("schedule");
689-
}
690-
} catch (IndexShardClosedException | AlreadyClosedException ex) {
691-
// fine - continue;
678+
if (shard.isReadAllowed()) {
679+
try {
680+
if (shard.isRefreshNeeded()) {
681+
shard.refresh("schedule");
692682
}
693-
continue;
694-
default:
695-
throw new IllegalStateException("unknown state: " + shard.state());
683+
} catch (IndexShardClosedException | AlreadyClosedException ex) {
684+
// fine - continue;
685+
}
696686
}
697687
}
698688
}

core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -847,8 +847,7 @@ public long getWritingBytes() {
847847
}
848848

849849
public RefreshStats refreshStats() {
850-
// Null refreshListeners means this shard doesn't support them so there can't be any.
851-
int listeners = refreshListeners == null ? 0 : refreshListeners.pendingCount();
850+
int listeners = refreshListeners.pendingCount();
852851
return new RefreshStats(refreshMetric.count(), TimeUnit.NANOSECONDS.toMillis(refreshMetric.sum()), listeners);
853852
}
854853

@@ -1155,6 +1154,10 @@ public IndexShard postRecovery(String reason) throws IndexShardStartedException,
11551154
if (state == IndexShardState.RELOCATED) {
11561155
throw new IndexShardRelocatedException(shardId);
11571156
}
1157+
// we need to refresh again to expose all operations that were index until now. Otherwise
1158+
// we may not expose operations that were indexed with a refresh listener that was immediately
1159+
// responded to in addRefreshListener.
1160+
getEngine().refresh("post_recovery");
11581161
recoveryState.setStage(RecoveryState.Stage.DONE);
11591162
changeState(IndexShardState.POST_RECOVERY, reason);
11601163
}
@@ -1324,6 +1327,7 @@ public void performRecoveryRestart() throws IOException {
13241327
if (state != IndexShardState.RECOVERING) {
13251328
throw new IndexShardNotRecoveringException(shardId, state);
13261329
}
1330+
assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners";
13271331
final Engine engine = this.currentEngineReference.getAndSet(null);
13281332
IOUtils.close(engine);
13291333
recoveryState().setStage(RecoveryState.Stage.INIT);
@@ -1372,6 +1376,11 @@ public void readAllowed() throws IllegalIndexShardStateException {
13721376
}
13731377
}
13741378

1379+
/** returns true if the {@link IndexShardState} allows reading */
1380+
public boolean isReadAllowed() {
1381+
return readAllowedStates.contains(state);
1382+
}
1383+
13751384
private void ensureWriteAllowed(Engine.Operation.Origin origin) throws IllegalIndexShardStateException {
13761385
IndexShardState state = this.state; // one time volatile read
13771386

@@ -2356,7 +2365,23 @@ public boolean isRefreshNeeded() {
23562365
* false otherwise.
23572366
*/
23582367
public void addRefreshListener(Translog.Location location, Consumer<Boolean> listener) {
2359-
refreshListeners.addOrNotify(location, listener);
2368+
final boolean readAllowed;
2369+
if (isReadAllowed()) {
2370+
readAllowed = true;
2371+
} else {
2372+
// check again under mutex. this is important to create a happens before relationship
2373+
// between the switch to POST_RECOVERY + associated refresh. Otherwise we may respond
2374+
// to a listener before a refresh actually happened that contained that operation.
2375+
synchronized (mutex) {
2376+
readAllowed = isReadAllowed();
2377+
}
2378+
}
2379+
if (readAllowed) {
2380+
refreshListeners.addOrNotify(location, listener);
2381+
} else {
2382+
// we're not yet ready fo ready for reads, just ignore refresh cycles
2383+
listener.accept(false);
2384+
}
23602385
}
23612386

23622387
private static class RefreshMetricUpdater implements ReferenceManager.RefreshListener {

core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 66 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -993,7 +993,7 @@ public void testMinimumCompatVersion() throws IOException {
993993
.settings(settings)
994994
.primaryTerm(0, 1).build();
995995
IndexShard test = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
996-
recoveryShardFromStore(test);
996+
recoverShardFromStore(test);
997997

998998
indexDoc(test, "test", "test");
999999
assertEquals(versionCreated.luceneVersion, test.minimumCompatibleVersion());
@@ -1040,14 +1040,14 @@ public void testShardStats() throws IOException {
10401040

10411041
public void testRefreshMetric() throws IOException {
10421042
IndexShard shard = newStartedShard();
1043-
assertThat(shard.refreshStats().getTotal(), equalTo(2L)); // one refresh on end of recovery, one on starting shard
1043+
assertThat(shard.refreshStats().getTotal(), equalTo(3L)); // refresh on: finalize, end of recovery and on starting shard
10441044
long initialTotalTime = shard.refreshStats().getTotalTimeInMillis();
10451045
// check time advances
10461046
for (int i = 1; shard.refreshStats().getTotalTimeInMillis() == initialTotalTime; i++) {
10471047
indexDoc(shard, "test", "test");
1048-
assertThat(shard.refreshStats().getTotal(), equalTo(2L + i - 1));
1048+
assertThat(shard.refreshStats().getTotal(), equalTo(3L + i - 1));
10491049
shard.refresh("test");
1050-
assertThat(shard.refreshStats().getTotal(), equalTo(2L + i));
1050+
assertThat(shard.refreshStats().getTotal(), equalTo(3L + i));
10511051
assertThat(shard.refreshStats().getTotalTimeInMillis(), greaterThanOrEqualTo(initialTotalTime));
10521052
}
10531053
long refreshCount = shard.refreshStats().getTotal();
@@ -1130,7 +1130,7 @@ public void postDelete(ShardId shardId, Engine.Delete delete, Exception ex) {
11301130

11311131
}
11321132
});
1133-
recoveryShardFromStore(shard);
1133+
recoverShardFromStore(shard);
11341134

11351135
indexDoc(shard, "test", "1");
11361136
assertEquals(1, preIndex.get());
@@ -1679,7 +1679,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException {
16791679
IndexShard newShard = newShard(ShardRoutingHelper.reinitPrimary(shard.routingEntry()),
16801680
shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null);
16811681

1682-
recoveryShardFromStore(newShard);
1682+
recoverShardFromStore(newShard);
16831683

16841684
try (Engine.Searcher searcher = newShard.acquireSearcher("test")) {
16851685
TopDocs search = searcher.searcher().search(new TermQuery(new Term("foo", "bar")), 10);
@@ -1718,7 +1718,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException {
17181718
.settings(settings)
17191719
.primaryTerm(0, 1).build();
17201720
IndexShard shard = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, wrapper);
1721-
recoveryShardFromStore(shard);
1721+
recoverShardFromStore(shard);
17221722
indexDoc(shard, "test", "0", "{\"foo\" : \"bar\"}");
17231723
shard.refresh("created segment 1");
17241724
indexDoc(shard, "test", "1", "{\"foobar\" : \"bar\"}");
@@ -1788,7 +1788,7 @@ public void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResul
17881788
}
17891789
};
17901790
final IndexShard newShard = reinitShard(shard, listener);
1791-
recoveryShardFromStore(newShard);
1791+
recoverShardFromStore(newShard);
17921792
IndexingStats indexingStats = newShard.indexingStats();
17931793
// ensure we are not influencing the indexing stats
17941794
assertEquals(0, indexingStats.getTotal().getDeleteCount());
@@ -1824,7 +1824,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException {
18241824
IndexShard newShard = newShard(ShardRoutingHelper.reinitPrimary(shard.routingEntry()),
18251825
shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null);
18261826

1827-
recoveryShardFromStore(newShard);
1827+
recoverShardFromStore(newShard);
18281828

18291829
try {
18301830
newShard.acquireSearcher("test");
@@ -1845,7 +1845,7 @@ public void testTranslogRecoverySyncsTranslog() throws IOException {
18451845
.settings(settings)
18461846
.primaryTerm(0, 1).build();
18471847
IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
1848-
recoveryShardFromStore(primary);
1848+
recoverShardFromStore(primary);
18491849

18501850
indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}");
18511851
IndexShard replica = newShard(primary.shardId(), false, "n2", metaData, null);
@@ -1947,7 +1947,7 @@ public void testShardActiveDuringPeerRecovery() throws IOException {
19471947
.settings(settings)
19481948
.primaryTerm(0, 1).build();
19491949
IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
1950-
recoveryShardFromStore(primary);
1950+
recoverShardFromStore(primary);
19511951

19521952
indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}");
19531953
IndexShard replica = newShard(primary.shardId(), false, "n2", metaData, null);
@@ -1978,6 +1978,58 @@ public long indexTranslogOperations(List<Translog.Operation> operations, int tot
19781978
closeShards(primary, replica);
19791979
}
19801980

1981+
public void testRefreshListenersDuringPeerRecovery() throws IOException {
1982+
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
1983+
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
1984+
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
1985+
.build();
1986+
IndexMetaData metaData = IndexMetaData.builder("test")
1987+
.putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}")
1988+
.settings(settings)
1989+
.primaryTerm(0, 1).build();
1990+
IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
1991+
recoverShardFromStore(primary);
1992+
1993+
indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}");
1994+
Consumer<IndexShard> assertListenerCalled = shard -> {
1995+
AtomicBoolean called = new AtomicBoolean();
1996+
shard.addRefreshListener(null, b -> {
1997+
assertFalse(b);
1998+
called.set(true);
1999+
});
2000+
assertTrue(called.get());
2001+
};
2002+
IndexShard replica = newShard(primary.shardId(), false, "n2", metaData, null);
2003+
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
2004+
replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode));
2005+
assertListenerCalled.accept(replica);
2006+
recoverReplica(replica, primary, (shard, discoveryNode) ->
2007+
new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> {
2008+
}) {
2009+
// we're only checking that listeners are called when the engine is open, before there is no point
2010+
@Override
2011+
public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
2012+
super.prepareForTranslogOperations(totalTranslogOps);
2013+
assertListenerCalled.accept(replica);
2014+
}
2015+
2016+
@Override
2017+
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws IOException {
2018+
final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps);
2019+
assertListenerCalled.accept(replica);
2020+
return localCheckpoint;
2021+
}
2022+
2023+
@Override
2024+
public void finalizeRecovery(long globalCheckpoint) {
2025+
super.finalizeRecovery(globalCheckpoint);
2026+
assertListenerCalled.accept(replica);
2027+
}
2028+
}, false);
2029+
2030+
closeShards(primary, replica);
2031+
}
2032+
19812033
public void testRecoverFromLocalShard() throws IOException {
19822034
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
19832035
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
@@ -1989,7 +2041,7 @@ public void testRecoverFromLocalShard() throws IOException {
19892041
.primaryTerm(0, 1).build();
19902042

19912043
IndexShard sourceShard = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
1992-
recoveryShardFromStore(sourceShard);
2044+
recoverShardFromStore(sourceShard);
19932045

19942046
indexDoc(sourceShard, "test", "0", "{\"foo\" : \"bar\"}");
19952047
indexDoc(sourceShard, "test", "1", "{\"foo\" : \"bar\"}");
@@ -2011,7 +2063,7 @@ public void testRecoverFromLocalShard() throws IOException {
20112063
};
20122064

20132065
final IndexShard differentIndex = newShard(new ShardId("index_2", "index_2", 0), true);
2014-
recoveryShardFromStore(differentIndex);
2066+
recoverShardFromStore(differentIndex);
20152067
expectThrows(IllegalArgumentException.class, () -> {
20162068
targetShard.recoverFromLocalShards(mappingConsumer, Arrays.asList(sourceShard, differentIndex));
20172069
});
@@ -2038,7 +2090,7 @@ public void testRecoverFromLocalShard() throws IOException {
20382090
// now check that it's persistent ie. that the added shards are committed
20392091
{
20402092
final IndexShard newShard = reinitShard(targetShard);
2041-
recoveryShardFromStore(newShard);
2093+
recoverShardFromStore(newShard);
20422094
assertDocCount(newShard, 2);
20432095
closeShards(newShard);
20442096
}

rest-api-spec/src/main/resources/rest-api-spec/test/bulk/50_refresh.yml

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,3 @@
1-
---
2-
setup:
3-
- do:
4-
cluster.put_settings:
5-
body:
6-
persistent:
7-
logger._root: debug
8-
9-
---
10-
teardown:
11-
- do:
12-
cluster.put_settings:
13-
body:
14-
persistent:
15-
# this is not exactly correct as tests could be running
16-
# under a different logging level; we sacrifice correctness
17-
# here for now in the hopes of quickly understanding what is
18-
# causing this test to fail and simply reverting the change
19-
# here
20-
logger._root: null
21-
221
---
232
"refresh=true immediately makes changes are visible in search":
243
- do:

test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,7 @@ protected IndexShard newStartedShard() throws IOException {
329329
protected IndexShard newStartedShard(boolean primary) throws IOException {
330330
IndexShard shard = newShard(primary);
331331
if (primary) {
332-
recoveryShardFromStore(shard);
332+
recoverShardFromStore(shard);
333333
} else {
334334
recoveryEmptyReplica(shard);
335335
}
@@ -352,7 +352,7 @@ protected void closeShards(Iterable<IndexShard> shards) throws IOException {
352352
}
353353
}
354354

355-
protected void recoveryShardFromStore(IndexShard primary) throws IOException {
355+
protected void recoverShardFromStore(IndexShard primary) throws IOException {
356356
primary.markAsRecovering("store", new RecoveryState(primary.routingEntry(),
357357
getFakeDiscoNode(primary.routingEntry().currentNodeId()),
358358
null));

0 commit comments

Comments
 (0)