diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index d4fe2e8f9b7d2..784bfd273c227 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -213,8 +213,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl protected volatile ShardRouting shardRouting; protected volatile IndexShardState state; + // ensure happens-before relation between addRefreshListener() and postRecovery() + private final Object postRecoveryMutex = new Object(); private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm - protected final AtomicReference currentEngineReference = new AtomicReference<>(); + private final Object engineMutex = new Object(); // lock ordering: engineMutex -> mutex + private final AtomicReference currentEngineReference = new AtomicReference<>(); final EngineFactory engineFactory; private final IndexingOperationListener indexingOperationListeners; @@ -1192,20 +1195,23 @@ public Engine.IndexCommitRef acquireSafeIndexCommit() throws EngineException { * @throws java.nio.file.NoSuchFileException if one or more files referenced by a commit are not present. */ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException { + assert Thread.holdsLock(mutex) == false : "snapshotting store metadata under mutex"; Engine.IndexCommitRef indexCommit = null; store.incRef(); try { - Engine engine; - synchronized (mutex) { + synchronized (engineMutex) { // if the engine is not running, we can access the store directly, but we need to make sure no one starts - // the engine on us. If the engine is running, we can get a snapshot via the deletion policy which is initialized. - // That can be done out of mutex, since the engine can be closed half way. - engine = getEngineOrNull(); - if (engine == null) { + // the engine on us. 
If the engine is running, we can get a snapshot via the deletion policy of the engine. + synchronized (mutex) { + final Engine engine = getEngineOrNull(); + if (engine != null) { + indexCommit = engine.acquireLastIndexCommit(false); + } + } + if (indexCommit == null) { return store.getMetadata(null, true); } } - indexCommit = engine.acquireLastIndexCommit(false); return store.getMetadata(indexCommit.getIndexCommit()); } finally { store.decRef(); @@ -1334,23 +1340,24 @@ public void close(String reason, boolean flushEngine) throws IOException { } } - public IndexShard postRecovery(String reason) - throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException { - synchronized (mutex) { - if (state == IndexShardState.CLOSED) { - throw new IndexShardClosedException(shardId); - } - if (state == IndexShardState.STARTED) { - throw new IndexShardStartedException(shardId); - } + public void postRecovery(String reason) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException { + synchronized (postRecoveryMutex) { // we need to refresh again to expose all operations that were index until now. Otherwise // we may not expose operations that were indexed with a refresh listener that was immediately - // responded to in addRefreshListener. + // responded to in addRefreshListener. The refresh must happen under the same mutex used in addRefreshListener + // and before moving this shard to POST_RECOVERY state (i.e., allow to read from this shard). 
getEngine().refresh("post_recovery"); - recoveryState.setStage(RecoveryState.Stage.DONE); - changeState(IndexShardState.POST_RECOVERY, reason); + synchronized (mutex) { + if (state == IndexShardState.CLOSED) { + throw new IndexShardClosedException(shardId); + } + if (state == IndexShardState.STARTED) { + throw new IndexShardStartedException(shardId); + } + recoveryState.setStage(RecoveryState.Stage.DONE); + changeState(IndexShardState.POST_RECOVERY, reason); + } } - return this; } /** @@ -1583,6 +1590,7 @@ public void openEngineAndSkipTranslogRecovery() throws IOException { } private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) throws IOException { + assert Thread.holdsLock(mutex) == false : "opening engine under mutex"; if (state != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, state); } @@ -1595,16 +1603,24 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t assert recoveryState.getRecoverySource().expectEmptyRetentionLeases() == false || getRetentionLeases().leases().isEmpty() : "expected empty set of retention leases with recovery source [" + recoveryState.getRecoverySource() + "] but got " + getRetentionLeases(); - synchronized (mutex) { - verifyNotClosed(); - assert currentEngineReference.get() == null : "engine is running"; + synchronized (engineMutex) { // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata). final Engine newEngine = engineFactory.newReadWriteEngine(config); - onNewEngine(newEngine); - currentEngineReference.set(newEngine); - // We set active because we are now writing operations to the engine; this way, - // if we go idle after some time and become inactive, we still give sync'd flush a chance to run. 
- active.set(true); + synchronized (mutex) { + try { + verifyNotClosed(); + assert currentEngineReference.get() == null : "engine is running"; + onNewEngine(newEngine); + currentEngineReference.set(newEngine); + // We set active because we are now writing operations to the engine; this way, + // if we go idle after some time and become inactive, we still give sync'd flush a chance to run. + active.set(true); + } finally { + if (currentEngineReference.get() != newEngine) { + newEngine.close(); + } + } + } } // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine. @@ -1627,6 +1643,7 @@ private boolean assertSequenceNumbersInCommit() throws IOException { } private void onNewEngine(Engine newEngine) { + assert Thread.holdsLock(engineMutex); refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation); } @@ -2675,7 +2692,13 @@ private DocumentMapperForType docMapper(String type) { } private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { - Sort indexSort = indexSortSupplier.get(); + final Sort indexSort = indexSortSupplier.get(); + final Engine.Warmer warmer = reader -> { + assert Thread.holdsLock(mutex) == false : "warming engine under mutex"; + if (this.warmer != null) { + this.warmer.warm(reader); + } + }; return new EngineConfig(shardId, shardRouting.allocationId().getId(), threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(), mapperService != null ? mapperService.indexAnalyzer() : null, @@ -3237,10 +3260,10 @@ public void addRefreshListener(Translog.Location location, Consumer lis if (isReadAllowed()) { readAllowed = true; } else { - // check again under mutex. this is important to create a happens before relationship + // check again under postRecoveryMutex. 
this is important to create a happens before relationship // between the switch to POST_RECOVERY + associated refresh. Otherwise we may respond // to a listener before a refresh actually happened that contained that operation. - synchronized (mutex) { + synchronized (postRecoveryMutex) { readAllowed = isReadAllowed(); } } @@ -3305,6 +3328,7 @@ public ParsedDocument newNoopTombstoneDoc(String reason) { * Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint. */ void resetEngineToGlobalCheckpoint() throws IOException { + assert Thread.holdsLock(engineMutex) == false : "resetting engine under mutex"; assert getActiveOperationsCount() == OPERATIONS_BLOCKED : "resetting engine without blocking operations; active operations are [" + getActiveOperations() + ']'; sync(); // persist the global checkpoint to disk @@ -3316,15 +3340,17 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED SetOnce newEngineReference = new SetOnce<>(); final long globalCheckpoint = getLastKnownGlobalCheckpoint(); assert globalCheckpoint == getLastSyncedGlobalCheckpoint(); - synchronized (mutex) { - verifyNotClosed(); - // we must create both new read-only engine and new read-write engine under mutex to ensure snapshotStoreMetadata, + synchronized (engineMutex) { + // we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata, // acquireXXXCommit and close works. 
final Engine readOnlyEngine = new ReadOnlyEngine(newEngineConfig(replicationTracker), seqNoStats, translogStats, false, Function.identity()) { @Override public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) { - synchronized (mutex) { + synchronized (engineMutex) { + if (newEngineReference.get() == null) { + throw new AlreadyClosedException("engine was closed"); + } // ignore flushFirst since we flushed above and we do not want to interfere with ongoing translog replay return newEngineReference.get().acquireLastIndexCommit(false); } @@ -3332,7 +3358,10 @@ public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) { @Override public IndexCommitRef acquireSafeIndexCommit() { - synchronized (mutex) { + synchronized (engineMutex) { + if (newEngineReference.get() == null) { + throw new AlreadyClosedException("engine was closed"); + } return newEngineReference.get().acquireSafeIndexCommit(); } } @@ -3349,9 +3378,28 @@ public void close() throws IOException { IOUtils.close(super::close, newEngine); } }; - IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine)); - newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker))); - onNewEngine(newEngineReference.get()); + synchronized (mutex) { + try { + verifyNotClosed(); + IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine)); + } finally { + if (currentEngineReference.get() != readOnlyEngine) { + readOnlyEngine.close(); + } + } + } + final Engine newReadWriteEngine = engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)); + synchronized (mutex) { + try { + verifyNotClosed(); + newEngineReference.set(newReadWriteEngine); + onNewEngine(newReadWriteEngine); + } finally { + if (newEngineReference.get() != newReadWriteEngine) { + newReadWriteEngine.close(); // shard was closed + } + } + } } final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery( engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> { 
diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 8947f4e9905e4..0d92bc3802f63 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -79,10 +79,12 @@ import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.engine.DocIdSeqNoAndSource; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine.DeleteResult; +import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.engine.InternalEngineFactory; @@ -4131,4 +4133,39 @@ protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(SeqNoStats seqNoStats) { assertThat(readonlyShard.docStats().getCount(), equalTo(numDocs)); closeShards(readonlyShard); } + + public void testCloseShardWhileEngineIsWarming() throws Exception { + CountDownLatch warmerStarted = new CountDownLatch(1); + CountDownLatch warmerBlocking = new CountDownLatch(1); + IndexShard shard = newShard(true, Settings.EMPTY, config -> { + Engine.Warmer warmer = reader -> { + try { + warmerStarted.countDown(); + warmerBlocking.await(); + config.getWarmer().warm(reader); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + }; + EngineConfig configWithWarmer = new EngineConfig(config.getShardId(), config.getAllocationId(), config.getThreadPool(), + config.getIndexSettings(), warmer, config.getStore(), config.getMergePolicy(), config.getAnalyzer(), + config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), + 
config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), + config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), + config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), + config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier()); + return new InternalEngine(configWithWarmer); + }); + Thread recoveryThread = new Thread(() -> expectThrows(AlreadyClosedException.class, () -> recoverShardFromStore(shard))); + recoveryThread.start(); + try { + warmerStarted.await(); + shard.close("testing", false); + assertThat(shard.state, equalTo(IndexShardState.CLOSED)); + } finally { + warmerBlocking.countDown(); + } + recoveryThread.join(); + shard.store().close(); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 95ff43e8b3c7d..b67108a16c19b 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -1130,34 +1130,38 @@ public static void assertMaxSeqNoInCommitUserData(Engine engine) throws Exceptio } public static void assertAtMostOneLuceneDocumentPerSequenceNumber(Engine engine) throws IOException { - if (engine.config().getIndexSettings().isSoftDeleteEnabled() == false || engine instanceof InternalEngine == false) { - return; + if (engine instanceof InternalEngine) { + try { + engine.refresh("test"); + try (Engine.Searcher searcher = engine.acquireSearcher("test")) { + assertAtMostOneLuceneDocumentPerSequenceNumber(engine.config().getIndexSettings(), searcher.getDirectoryReader()); + } + } catch (AlreadyClosedException ignored) { + // engine was closed + } } - try { - engine.refresh("test"); - try (Engine.Searcher searcher = engine.acquireSearcher("test")) { - DirectoryReader reader = 
Lucene.wrapAllDocsLive(searcher.getDirectoryReader()); - Set seqNos = new HashSet<>(); - for (LeafReaderContext leaf : reader.leaves()) { - NumericDocValues primaryTermDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); - NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME); - int docId; - while ((docId = seqNoDocValues.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { - assertTrue(seqNoDocValues.advanceExact(docId)); - long seqNo = seqNoDocValues.longValue(); - assertThat(seqNo, greaterThanOrEqualTo(0L)); - if (primaryTermDocValues.advanceExact(docId)) { - if (seqNos.add(seqNo) == false) { - final IdOnlyFieldVisitor idFieldVisitor = new IdOnlyFieldVisitor(); - leaf.reader().document(docId, idFieldVisitor); - throw new AssertionError("found multiple documents for seq=" + seqNo + " id=" + idFieldVisitor.getId()); - } - } + } + + public static void assertAtMostOneLuceneDocumentPerSequenceNumber(IndexSettings indexSettings, + DirectoryReader reader) throws IOException { + Set seqNos = new HashSet<>(); + final DirectoryReader wrappedReader = indexSettings.isSoftDeleteEnabled() ? 
Lucene.wrapAllDocsLive(reader) : reader; + for (LeafReaderContext leaf : wrappedReader.leaves()) { + NumericDocValues primaryTermDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); + NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME); + int docId; + while ((docId = seqNoDocValues.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { + assertTrue(seqNoDocValues.advanceExact(docId)); + long seqNo = seqNoDocValues.longValue(); + assertThat(seqNo, greaterThanOrEqualTo(0L)); + if (primaryTermDocValues.advanceExact(docId)) { + if (seqNos.add(seqNo) == false) { + final IdOnlyFieldVisitor idFieldVisitor = new IdOnlyFieldVisitor(); + leaf.reader().document(docId, idFieldVisitor); + throw new AssertionError("found multiple documents for seq=" + seqNo + " id=" + idFieldVisitor.getId()); } } } - } catch (AlreadyClosedException ignored) { - } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 4b621e5fe5153..bc5a368c47daa 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -375,7 +375,7 @@ protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMe indexSettings.getSettings(), "index"); mapperService.merge(indexMetaData, MapperService.MergeReason.MAPPING_RECOVERY); SimilarityService similarityService = new SimilarityService(indexSettings, null, Collections.emptyMap()); - final Engine.Warmer warmer = reader -> {}; + final Engine.Warmer warmer = createTestWarmer(indexSettings); ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); CircuitBreakerService breakerService = new HierarchyCircuitBreakerService(nodeSettings, clusterSettings); indexShard = new IndexShard( @@ 
-860,4 +860,17 @@ public static Translog getTranslog(IndexShard shard) { public static ReplicationTracker getReplicationTracker(IndexShard indexShard) { return indexShard.getReplicationTracker(); } + + public static Engine.Warmer createTestWarmer(IndexSettings indexSettings) { + return reader -> { + // This isn't a real warmer; it just sometimes verifies the content of the reader + if (randomBoolean()) { + try { + EngineTestCase.assertAtMostOneLuceneDocumentPerSequenceNumber(indexSettings, reader); + } catch (IOException e) { + throw new AssertionError(e); + } + } + }; + } }