Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,19 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/*
* Holds all the configuration that is used to create an {@link Engine}.
Expand Down Expand Up @@ -77,6 +81,18 @@ public final class EngineConfig {
@Nullable
private final CircuitBreakerService circuitBreakerService;
private final LongSupplier globalCheckpointSupplier;
private final Supplier<Collection<RetentionLease>> retentionLeasesSupplier;

/**
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
* soft deleted should be retained.
*
* @return a supplier of outstanding retention leases
*/
public Supplier<Collection<RetentionLease>> retentionLeasesSupplier() {
return retentionLeasesSupplier;
}

private final LongSupplier primaryTermSupplier;
private final TombstoneDocSupplier tombstoneDocSupplier;

Expand Down Expand Up @@ -125,7 +141,9 @@ public EngineConfig(ShardId shardId, String allocationId, ThreadPool threadPool,
List<ReferenceManager.RefreshListener> externalRefreshListener,
List<ReferenceManager.RefreshListener> internalRefreshListener, Sort indexSort,
CircuitBreakerService circuitBreakerService, LongSupplier globalCheckpointSupplier,
LongSupplier primaryTermSupplier, TombstoneDocSupplier tombstoneDocSupplier) {
Supplier<Collection<RetentionLease>> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier) {
this.shardId = shardId;
this.allocationId = allocationId;
this.indexSettings = indexSettings;
Expand Down Expand Up @@ -161,6 +179,7 @@ public EngineConfig(ShardId shardId, String allocationId, ThreadPool threadPool,
this.indexSort = indexSort;
this.circuitBreakerService = circuitBreakerService;
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier);
this.primaryTermSupplier = primaryTermSupplier;
this.tombstoneDocSupplier = tombstoneDocSupplier;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,11 @@ private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException {
} else {
lastMinRetainedSeqNo = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO)) + 1;
}
return new SoftDeletesPolicy(translog::getLastSyncedGlobalCheckpoint, lastMinRetainedSeqNo,
engineConfig.getIndexSettings().getSoftDeleteRetentionOperations());
return new SoftDeletesPolicy(
translog::getLastSyncedGlobalCheckpoint,
lastMinRetainedSeqNo,
engineConfig.getIndexSettings().getSoftDeleteRetentionOperations(),
engineConfig.retentionLeasesSupplier());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@
import org.apache.lucene.search.Query;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.translog.Translog;

import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
* A policy that controls how many soft-deleted documents should be retained for peer-recovery and querying history changes purpose.
Expand All @@ -41,11 +45,19 @@ final class SoftDeletesPolicy {
private long retentionOperations;
// The min seq_no value that is retained - ops after this seq# should exist in the Lucene index.
private long minRetainedSeqNo;
private Collection<RetentionLease> retentionLeases;
// provides the retention leases used to calculate the minimum sequence number to retain
private final Supplier<Collection<RetentionLease>> retentionLeasesSupplier;

SoftDeletesPolicy(LongSupplier globalCheckpointSupplier, long minRetainedSeqNo, long retentionOperations) {
SoftDeletesPolicy(
final LongSupplier globalCheckpointSupplier,
final long minRetainedSeqNo,
final long retentionOperations,
final Supplier<Collection<RetentionLease>> retentionLeasesSupplier) {
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.retentionOperations = retentionOperations;
this.minRetainedSeqNo = minRetainedSeqNo;
this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier);
this.localCheckpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED;
this.retentionLockCount = 0;
}
Expand Down Expand Up @@ -97,14 +109,34 @@ private synchronized void releaseRetentionLock() {
synchronized long getMinRetainedSeqNo() {
// Do not advance if the retention lock is held
if (retentionLockCount == 0) {
// This policy retains operations for two purposes: peer-recovery and querying changes history.
// - Peer-recovery is driven by the local checkpoint of the safe commit. In peer-recovery, the primary transfers a safe commit,
// then sends ops after the local checkpoint of that commit. This requires keeping all ops after localCheckpointOfSafeCommit;
// - Changes APIs are driven the combination of the global checkpoint and retention ops. Here we prefer using the global
// checkpoint instead of max_seqno because only operations up to the global checkpoint are exposed in the the changes APIs.
final long minSeqNoForQueryingChanges = globalCheckpointSupplier.getAsLong() - retentionOperations;
/*
* This policy retains operations for two purposes: peer-recovery and querying changes history.
* - Peer-recovery is driven by the local checkpoint of the safe commit. In peer-recovery, the primary transfers a safe commit,
* then sends operations after the local checkpoint of that commit. This requires keeping all ops after
* localCheckpointOfSafeCommit.
* - Changes APIs are driven by a combination of the global checkpoint, retention operations, and retention leases. Here we
* prefer using the global checkpoint instead of the maximum sequence number because only operations up to the global
* checkpoint are exposed in the the changes APIs.
*/

// calculate the minimum sequence number to retain based on retention leases
retentionLeases = retentionLeasesSupplier.get();
final long minimumRetainingSequenceNumber = retentionLeases
.stream()
.mapToLong(RetentionLease::retainingSequenceNumber)
.min()
.orElse(Long.MAX_VALUE);
/*
* The minimum sequence number to retain is the minimum of the minimum based on retention leases, and the number of operations
* below the global checkpoint to retain (index.soft_deletes.retention.operations).
*/
final long minSeqNoForQueryingChanges =
Math.min(globalCheckpointSupplier.getAsLong() - retentionOperations, minimumRetainingSequenceNumber);
final long minSeqNoToRetain = Math.min(minSeqNoForQueryingChanges, localCheckpointOfSafeCommit) + 1;
// This can go backward as the retentionOperations value can be changed in settings.

/* We take the maximum as minSeqNoToRetain can go backward as the retention operations value can be changed in settings, or from
* the addition of leases with a retaining sequence number lower than previous retaining sequence numbers.
*/
minRetainedSeqNo = Math.max(minRetainedSeqNo, minSeqNoToRetain);
}
return minRetainedSeqNo;
Expand All @@ -117,4 +149,5 @@ synchronized long getMinRetainedSeqNo() {
Query getRetentionQuery() {
return LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getMinRetainedSeqNo(), Long.MAX_VALUE);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -146,6 +148,29 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
*/
volatile ReplicationGroup replicationGroup;

private final Map<String, RetentionLease> retentionLeases = new HashMap<>();

/**
* Get all retention leases tracker on this shard. An unmodifiable copy of the retention leases is returned.
*
* @return the retention leases
*/
public synchronized Collection<RetentionLease> getRetentionLeases() {
return Collections.unmodifiableCollection(new ArrayList<>(retentionLeases.values()));
}

/**
* Adds a new or updates an existing retention lease.
*
* @param id the identifier of the retention lease
* @param retainingSequenceNumber the retaining sequence number
* @param source the source of the retention lease
*/
public synchronized void addOrUpdateRetentionLease(final String id, final long retainingSequenceNumber, final String source) {
assert primaryMode;
retentionLeases.put(id, new RetentionLease(id, retainingSequenceNumber, source));
}

public static class CheckpointState implements Writeable {

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.seqno;

/**
* A "shard history retention lease" (or "retention lease" for short) is conceptually a marker containing a retaining sequence number such
* that all operations with sequence number at least that retaining sequence number will be retained during merge operations (which could
* otherwise merge away operations that have been soft deleted). Each retention lease contains a unique identifier, the retaining sequence
* number, and the source of the retention lease (e.g., "ccr").
*/
public class RetentionLease {

private final String id;

/**
* The identifier for this retention lease. This identifier should be unique per lease and is set during construction by the caller.
*
* @return the identifier
*/
public String id() {
return id;
}

private final long retainingSequenceNumber;

/**
* The retaining sequence number of this retention lease. The retaining sequence number is the minimum sequence number that this
* retention lease wants to retain during merge operations. The retaining sequence number is set during construction by the caller.
*
* @return the retaining sequence number
*/
public long retainingSequenceNumber() {
return retainingSequenceNumber;
}

private final String source;

/**
* The source of this retention lease. The source is set during construction by the caller.
*
* @return the source
*/
public String source() {
return source;
}

/**
* Constructs a new retention lease.
*
* @param id the identifier of the retention lease
* @param retainingSequenceNumber the retaining sequence number
* @param source the source of the retention lease
*/
public RetentionLease(final String id, final long retainingSequenceNumber, final String source) {
this.id = id;
this.retainingSequenceNumber = retainingSequenceNumber;
this.source = source;
}

@Override
public String toString() {
return "ShardHistoryRetentionLease{" +
"id='" + id + '\'' +
", retainingSequenceNumber=" + retainingSequenceNumber +
", source='" + source + '\'' +
'}';
}

}
29 changes: 22 additions & 7 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1864,6 +1864,20 @@ public void addGlobalCheckpointListener(
this.globalCheckpointListeners.add(waitingForGlobalCheckpoint, listener, timeout);
}


/**
* Adds a new or updates an existing retention lease.
*
* @param id the identifier of the retention lease
* @param retainingSequenceNumber the retaining sequence number
* @param source the source of the retention lease
*/
void addOrUpdateRetentionLease(final String id, final long retainingSequenceNumber, final String source) {
assert assertPrimaryMode();
verifyNotClosed();
replicationTracker.addOrUpdateRetentionLease(id, retainingSequenceNumber, source);
}

/**
* Waits for all operations up to the provided sequence number to complete.
*
Expand Down Expand Up @@ -2310,13 +2324,14 @@ private DocumentMapperForType docMapper(String type) {
private EngineConfig newEngineConfig() {
Sort indexSort = indexSortSupplier.get();
return new EngineConfig(shardId, shardRouting.allocationId().getId(),
threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(),
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener,
indexCache.query(), cachingPolicy, translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Collections.singletonList(refreshListeners),
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
indexSort, circuitBreakerService, replicationTracker, () -> operationPrimaryTerm, tombstoneDocSupplier());
threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(),
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener,
indexCache.query(), cachingPolicy, translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Collections.singletonList(refreshListeners),
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
indexSort, circuitBreakerService, replicationTracker, replicationTracker::getRetentionLeases,
() -> operationPrimaryTerm, tombstoneDocSupplier());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -52,7 +53,8 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
public void testKeepCommitsAfterGlobalCheckpoint() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong();
final int extraRetainedOps = between(0, 100);
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, extraRetainedOps);
final SoftDeletesPolicy softDeletesPolicy =
new SoftDeletesPolicy(globalCheckpoint::get, -1, extraRetainedOps, Collections::emptyList);
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get);

Expand Down Expand Up @@ -96,7 +98,8 @@ public void testKeepCommitsAfterGlobalCheckpoint() throws Exception {
public void testAcquireIndexCommit() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong();
final int extraRetainedOps = between(0, 100);
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, extraRetainedOps);
final SoftDeletesPolicy softDeletesPolicy =
new SoftDeletesPolicy(globalCheckpoint::get, -1, extraRetainedOps, Collections::emptyList);
final UUID translogUUID = UUID.randomUUID();
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get);
Expand Down Expand Up @@ -176,7 +179,7 @@ public void testAcquireIndexCommit() throws Exception {

public void testLegacyIndex() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong();
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0);
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, Collections::emptyList);
final UUID translogUUID = UUID.randomUUID();

TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
Expand Down Expand Up @@ -211,7 +214,7 @@ public void testLegacyIndex() throws Exception {

public void testDeleteInvalidCommits() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong());
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0);
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, Collections::emptyList);
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get);

Expand Down Expand Up @@ -245,7 +248,7 @@ public void testDeleteInvalidCommits() throws Exception {

public void testCheckUnreferencedCommits() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0);
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, Collections::emptyList);
final UUID translogUUID = UUID.randomUUID();
final TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get);
Expand Down
Loading