Skip to content

Commit c0f8c89

Browse files
authored
Introduce shard history retention leases (#37167)
This commit is the first in a series which will culminate with fully-functional shard history retention leases. Shard history retention leases are aimed at preventing shard history consumers from having to fallback to expensive file copy operations if shard history is not available from a certain point. These consumers include following indices in cross-cluster replication, and local shard recoveries. A future consumer will be the changes API. Further, index lifecycle management requires coordinating with some of these consumers otherwise it could remove the source before all consumers have finished reading all operations. The notion of shard history retention leases that we are introducing here will also be used to address this problem. Shard history retention leases are a property of the replication group managed under the authority of the primary. A shard history retention lease is a combination of an identifier, a retaining sequence number, a timestamp indicating when the lease was acquired or renewed, and a string indicating the source of the lease. Being leases they have a limited lifespan that will expire if not renewed. The idea of these leases is that all operations above the minimum of all retaining sequence numbers will be retained during merges (which would otherwise clear away operations that are soft deleted). These leases will be periodically persisted to Lucene and restored during recovery, and broadcast to replicas under certain circumstances. This commit is merely putting the basics in place. This first commit only introduces the concept and integrates their use with the soft delete retention policy. We add some tests to demonstrate the basic management is correct, and that the soft delete policy is correctly influenced by the existence of any retention leases. We make no effort in this commit to implement any of the following: - timestamps - expiration - persistence to and recovery from Lucene - handoff during primary relocation - sharing retention leases with replicas - exposing leases in shard-level statistics - integration with cross-cluster replication These will occur individually in follow-up commits.
1 parent a7c3d58 commit c0f8c89

File tree

16 files changed

+520
-74
lines changed

16 files changed

+520
-74
lines changed

server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,19 @@
3535
import org.elasticsearch.index.IndexSettings;
3636
import org.elasticsearch.index.codec.CodecService;
3737
import org.elasticsearch.index.mapper.ParsedDocument;
38+
import org.elasticsearch.index.seqno.RetentionLease;
3839
import org.elasticsearch.index.shard.ShardId;
3940
import org.elasticsearch.index.store.Store;
4041
import org.elasticsearch.index.translog.TranslogConfig;
4142
import org.elasticsearch.indices.IndexingMemoryController;
4243
import org.elasticsearch.indices.breaker.CircuitBreakerService;
4344
import org.elasticsearch.threadpool.ThreadPool;
4445

46+
import java.util.Collection;
4547
import java.util.List;
48+
import java.util.Objects;
4649
import java.util.function.LongSupplier;
50+
import java.util.function.Supplier;
4751

4852
/*
4953
* Holds all the configuration that is used to create an {@link Engine}.
@@ -77,6 +81,18 @@ public final class EngineConfig {
7781
@Nullable
7882
private final CircuitBreakerService circuitBreakerService;
7983
private final LongSupplier globalCheckpointSupplier;
84+
private final Supplier<Collection<RetentionLease>> retentionLeasesSupplier;
85+
86+
/**
87+
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
88+
* soft deleted should be retained.
89+
*
90+
* @return a supplier of outstanding retention leases
91+
*/
92+
public Supplier<Collection<RetentionLease>> retentionLeasesSupplier() {
93+
return retentionLeasesSupplier;
94+
}
95+
8096
private final LongSupplier primaryTermSupplier;
8197
private final TombstoneDocSupplier tombstoneDocSupplier;
8298

@@ -125,7 +141,9 @@ public EngineConfig(ShardId shardId, String allocationId, ThreadPool threadPool,
125141
List<ReferenceManager.RefreshListener> externalRefreshListener,
126142
List<ReferenceManager.RefreshListener> internalRefreshListener, Sort indexSort,
127143
CircuitBreakerService circuitBreakerService, LongSupplier globalCheckpointSupplier,
128-
LongSupplier primaryTermSupplier, TombstoneDocSupplier tombstoneDocSupplier) {
144+
Supplier<Collection<RetentionLease>> retentionLeasesSupplier,
145+
LongSupplier primaryTermSupplier,
146+
TombstoneDocSupplier tombstoneDocSupplier) {
129147
this.shardId = shardId;
130148
this.allocationId = allocationId;
131149
this.indexSettings = indexSettings;
@@ -161,6 +179,7 @@ public EngineConfig(ShardId shardId, String allocationId, ThreadPool threadPool,
161179
this.indexSort = indexSort;
162180
this.circuitBreakerService = circuitBreakerService;
163181
this.globalCheckpointSupplier = globalCheckpointSupplier;
182+
this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier);
164183
this.primaryTermSupplier = primaryTermSupplier;
165184
this.tombstoneDocSupplier = tombstoneDocSupplier;
166185
}

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -276,8 +276,11 @@ private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException {
276276
} else {
277277
lastMinRetainedSeqNo = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO)) + 1;
278278
}
279-
return new SoftDeletesPolicy(translog::getLastSyncedGlobalCheckpoint, lastMinRetainedSeqNo,
280-
engineConfig.getIndexSettings().getSoftDeleteRetentionOperations());
279+
return new SoftDeletesPolicy(
280+
translog::getLastSyncedGlobalCheckpoint,
281+
lastMinRetainedSeqNo,
282+
engineConfig.getIndexSettings().getSoftDeleteRetentionOperations(),
283+
engineConfig.retentionLeasesSupplier());
281284
}
282285

283286
/**

server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,15 @@
2323
import org.apache.lucene.search.Query;
2424
import org.elasticsearch.common.lease.Releasable;
2525
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
26+
import org.elasticsearch.index.seqno.RetentionLease;
2627
import org.elasticsearch.index.seqno.SequenceNumbers;
2728
import org.elasticsearch.index.translog.Translog;
2829

30+
import java.util.Collection;
31+
import java.util.Objects;
2932
import java.util.concurrent.atomic.AtomicBoolean;
3033
import java.util.function.LongSupplier;
34+
import java.util.function.Supplier;
3135

3236
/**
3337
* A policy that controls how many soft-deleted documents should be retained for peer-recovery and querying history changes purpose.
@@ -41,11 +45,18 @@ final class SoftDeletesPolicy {
4145
private long retentionOperations;
4246
// The min seq_no value that is retained - ops after this seq# should exist in the Lucene index.
4347
private long minRetainedSeqNo;
48+
// provides the retention leases used to calculate the minimum sequence number to retain
49+
private final Supplier<Collection<RetentionLease>> retentionLeasesSupplier;
4450

45-
SoftDeletesPolicy(LongSupplier globalCheckpointSupplier, long minRetainedSeqNo, long retentionOperations) {
51+
SoftDeletesPolicy(
52+
final LongSupplier globalCheckpointSupplier,
53+
final long minRetainedSeqNo,
54+
final long retentionOperations,
55+
final Supplier<Collection<RetentionLease>> retentionLeasesSupplier) {
4656
this.globalCheckpointSupplier = globalCheckpointSupplier;
4757
this.retentionOperations = retentionOperations;
4858
this.minRetainedSeqNo = minRetainedSeqNo;
59+
this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier);
4960
this.localCheckpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED;
5061
this.retentionLockCount = 0;
5162
}
@@ -97,14 +108,35 @@ private synchronized void releaseRetentionLock() {
97108
synchronized long getMinRetainedSeqNo() {
98109
// Do not advance if the retention lock is held
99110
if (retentionLockCount == 0) {
100-
// This policy retains operations for two purposes: peer-recovery and querying changes history.
101-
// - Peer-recovery is driven by the local checkpoint of the safe commit. In peer-recovery, the primary transfers a safe commit,
102-
// then sends ops after the local checkpoint of that commit. This requires keeping all ops after localCheckpointOfSafeCommit;
103-
// - Changes APIs are driven the combination of the global checkpoint and retention ops. Here we prefer using the global
104-
// checkpoint instead of max_seqno because only operations up to the global checkpoint are exposed in the the changes APIs.
105-
final long minSeqNoForQueryingChanges = globalCheckpointSupplier.getAsLong() - retentionOperations;
111+
/*
112+
* This policy retains operations for two purposes: peer-recovery and querying changes history.
113+
* - Peer-recovery is driven by the local checkpoint of the safe commit. In peer-recovery, the primary transfers a safe commit,
114+
* then sends operations after the local checkpoint of that commit. This requires keeping all ops after
115+
* localCheckpointOfSafeCommit.
116+
* - Changes APIs are driven by a combination of the global checkpoint, retention operations, and retention leases. Here we
117+
* prefer using the global checkpoint instead of the maximum sequence number because only operations up to the global
118+
* checkpoint are exposed in the the changes APIs.
119+
*/
120+
121+
// calculate the minimum sequence number to retain based on retention leases
122+
final long minimumRetainingSequenceNumber = retentionLeasesSupplier
123+
.get()
124+
.stream()
125+
.mapToLong(RetentionLease::retainingSequenceNumber)
126+
.min()
127+
.orElse(Long.MAX_VALUE);
128+
/*
129+
* The minimum sequence number to retain is the minimum of the minimum based on retention leases, and the number of operations
130+
* below the global checkpoint to retain (index.soft_deletes.retention.operations).
131+
*/
132+
final long minSeqNoForQueryingChanges =
133+
Math.min(globalCheckpointSupplier.getAsLong() - retentionOperations, minimumRetainingSequenceNumber);
106134
final long minSeqNoToRetain = Math.min(minSeqNoForQueryingChanges, localCheckpointOfSafeCommit) + 1;
107-
// This can go backward as the retentionOperations value can be changed in settings.
135+
136+
/*
137+
* We take the maximum as minSeqNoToRetain can go backward as the retention operations value can be changed in settings, or from
138+
* the addition of leases with a retaining sequence number lower than previous retaining sequence numbers.
139+
*/
108140
minRetainedSeqNo = Math.max(minRetainedSeqNo, minSeqNoToRetain);
109141
}
110142
return minRetainedSeqNo;
@@ -117,4 +149,5 @@ synchronized long getMinRetainedSeqNo() {
117149
Query getRetentionQuery() {
118150
return LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getMinRetainedSeqNo(), Long.MAX_VALUE);
119151
}
152+
120153
}

server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@
3535
import org.elasticsearch.index.shard.ShardId;
3636

3737
import java.io.IOException;
38+
import java.util.ArrayList;
3839
import java.util.Collection;
40+
import java.util.Collections;
3941
import java.util.HashMap;
4042
import java.util.HashSet;
4143
import java.util.Map;
@@ -146,6 +148,29 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
146148
*/
147149
volatile ReplicationGroup replicationGroup;
148150

151+
private final Map<String, RetentionLease> retentionLeases = new HashMap<>();
152+
153+
/**
154+
* Get all retention leases tracker on this shard. An unmodifiable copy of the retention leases is returned.
155+
*
156+
* @return the retention leases
157+
*/
158+
public synchronized Collection<RetentionLease> getRetentionLeases() {
159+
return Collections.unmodifiableCollection(new ArrayList<>(retentionLeases.values()));
160+
}
161+
162+
/**
163+
* Adds a new or updates an existing retention lease.
164+
*
165+
* @param id the identifier of the retention lease
166+
* @param retainingSequenceNumber the retaining sequence number
167+
* @param source the source of the retention lease
168+
*/
169+
public synchronized void addOrUpdateRetentionLease(final String id, final long retainingSequenceNumber, final String source) {
170+
assert primaryMode;
171+
retentionLeases.put(id, new RetentionLease(id, retainingSequenceNumber, source));
172+
}
173+
149174
public static class CheckpointState implements Writeable {
150175

151176
/**
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.index.seqno;
21+
22+
/**
23+
* A "shard history retention lease" (or "retention lease" for short) is conceptually a marker containing a retaining sequence number such
24+
* that all operations with sequence number at least that retaining sequence number will be retained during merge operations (which could
25+
* otherwise merge away operations that have been soft deleted). Each retention lease contains a unique identifier, the retaining sequence
26+
* number, and the source of the retention lease (e.g., "ccr").
27+
*/
28+
public class RetentionLease {
29+
30+
private final String id;
31+
32+
/**
33+
* The identifier for this retention lease. This identifier should be unique per lease and is set during construction by the caller.
34+
*
35+
* @return the identifier
36+
*/
37+
public String id() {
38+
return id;
39+
}
40+
41+
private final long retainingSequenceNumber;
42+
43+
/**
44+
* The retaining sequence number of this retention lease. The retaining sequence number is the minimum sequence number that this
45+
* retention lease wants to retain during merge operations. The retaining sequence number is set during construction by the caller.
46+
*
47+
* @return the retaining sequence number
48+
*/
49+
public long retainingSequenceNumber() {
50+
return retainingSequenceNumber;
51+
}
52+
53+
private final String source;
54+
55+
/**
56+
* The source of this retention lease. The source is set during construction by the caller.
57+
*
58+
* @return the source
59+
*/
60+
public String source() {
61+
return source;
62+
}
63+
64+
/**
65+
* Constructs a new retention lease.
66+
*
67+
* @param id the identifier of the retention lease
68+
* @param retainingSequenceNumber the retaining sequence number
69+
* @param source the source of the retention lease
70+
*/
71+
public RetentionLease(final String id, final long retainingSequenceNumber, final String source) {
72+
this.id = id;
73+
this.retainingSequenceNumber = retainingSequenceNumber;
74+
this.source = source;
75+
}
76+
77+
@Override
78+
public String toString() {
79+
return "ShardHistoryRetentionLease{" +
80+
"id='" + id + '\'' +
81+
", retainingSequenceNumber=" + retainingSequenceNumber +
82+
", source='" + source + '\'' +
83+
'}';
84+
}
85+
86+
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1864,6 +1864,20 @@ public void addGlobalCheckpointListener(
18641864
this.globalCheckpointListeners.add(waitingForGlobalCheckpoint, listener, timeout);
18651865
}
18661866

1867+
1868+
/**
1869+
* Adds a new or updates an existing retention lease.
1870+
*
1871+
* @param id the identifier of the retention lease
1872+
* @param retainingSequenceNumber the retaining sequence number
1873+
* @param source the source of the retention lease
1874+
*/
1875+
void addOrUpdateRetentionLease(final String id, final long retainingSequenceNumber, final String source) {
1876+
assert assertPrimaryMode();
1877+
verifyNotClosed();
1878+
replicationTracker.addOrUpdateRetentionLease(id, retainingSequenceNumber, source);
1879+
}
1880+
18671881
/**
18681882
* Waits for all operations up to the provided sequence number to complete.
18691883
*
@@ -2310,13 +2324,14 @@ private DocumentMapperForType docMapper(String type) {
23102324
private EngineConfig newEngineConfig() {
23112325
Sort indexSort = indexSortSupplier.get();
23122326
return new EngineConfig(shardId, shardRouting.allocationId().getId(),
2313-
threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(),
2314-
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener,
2315-
indexCache.query(), cachingPolicy, translogConfig,
2316-
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
2317-
Collections.singletonList(refreshListeners),
2318-
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
2319-
indexSort, circuitBreakerService, replicationTracker, () -> operationPrimaryTerm, tombstoneDocSupplier());
2327+
threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(),
2328+
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener,
2329+
indexCache.query(), cachingPolicy, translogConfig,
2330+
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
2331+
Collections.singletonList(refreshListeners),
2332+
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
2333+
indexSort, circuitBreakerService, replicationTracker, replicationTracker::getRetentionLeases,
2334+
() -> operationPrimaryTerm, tombstoneDocSupplier());
23202335
}
23212336

23222337
/**

server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.io.IOException;
3131
import java.util.ArrayList;
3232
import java.util.Arrays;
33+
import java.util.Collections;
3334
import java.util.HashMap;
3435
import java.util.List;
3536
import java.util.Map;
@@ -52,7 +53,8 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
5253
public void testKeepCommitsAfterGlobalCheckpoint() throws Exception {
5354
final AtomicLong globalCheckpoint = new AtomicLong();
5455
final int extraRetainedOps = between(0, 100);
55-
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, extraRetainedOps);
56+
final SoftDeletesPolicy softDeletesPolicy =
57+
new SoftDeletesPolicy(globalCheckpoint::get, -1, extraRetainedOps, Collections::emptyList);
5658
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
5759
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get);
5860

@@ -96,7 +98,8 @@ public void testKeepCommitsAfterGlobalCheckpoint() throws Exception {
9698
public void testAcquireIndexCommit() throws Exception {
9799
final AtomicLong globalCheckpoint = new AtomicLong();
98100
final int extraRetainedOps = between(0, 100);
99-
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, extraRetainedOps);
101+
final SoftDeletesPolicy softDeletesPolicy =
102+
new SoftDeletesPolicy(globalCheckpoint::get, -1, extraRetainedOps, Collections::emptyList);
100103
final UUID translogUUID = UUID.randomUUID();
101104
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
102105
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get);
@@ -176,7 +179,7 @@ public void testAcquireIndexCommit() throws Exception {
176179

177180
public void testLegacyIndex() throws Exception {
178181
final AtomicLong globalCheckpoint = new AtomicLong();
179-
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0);
182+
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, Collections::emptyList);
180183
final UUID translogUUID = UUID.randomUUID();
181184

182185
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
@@ -211,7 +214,7 @@ public void testLegacyIndex() throws Exception {
211214

212215
public void testDeleteInvalidCommits() throws Exception {
213216
final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong());
214-
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0);
217+
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, Collections::emptyList);
215218
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
216219
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get);
217220

@@ -245,7 +248,7 @@ public void testDeleteInvalidCommits() throws Exception {
245248

246249
public void testCheckUnreferencedCommits() throws Exception {
247250
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
248-
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0);
251+
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, Collections::emptyList);
249252
final UUID translogUUID = UUID.randomUUID();
250253
final TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
251254
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get);

0 commit comments

Comments
 (0)