Skip to content

Commit 13e19e7

Browse files
authored
Allow _update and upsert to read from the transaction log (#29264)
We historically removed reading from the transaction log to get consistent results from _GET calls. There was also the motivation that the read-modify-update principle we apply should not be hidden from the user. We still agree on the fact that we should not hide these aspects but the impact on updates is quite significant especially if the same documents is updated before it's written to disk and made serachable. This change adds back the ability to read from the transaction log but only for update calls. Calls to the _GET API will always do a refresh if necessary to return consistent results ie. if stored fields or DocValues Fields are requested. Closes #26802
1 parent c3fdf8f commit 13e19e7

File tree

21 files changed

+602
-45
lines changed

21 files changed

+602
-45
lines changed

server/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,13 +112,13 @@ protected ExplainResponse shardOperation(ExplainRequest request, ShardId shardId
112112
if (uidTerm == null) {
113113
return new ExplainResponse(shardId.getIndexName(), request.type(), request.id(), false);
114114
}
115-
result = context.indexShard().get(new Engine.Get(false, request.type(), request.id(), uidTerm));
115+
result = context.indexShard().get(new Engine.Get(false, false, request.type(), request.id(), uidTerm));
116116
if (!result.exists()) {
117117
return new ExplainResponse(shardId.getIndexName(), request.type(), request.id(), false);
118118
}
119119
context.parsedQuery(context.getQueryShardContext().toQuery(request.query()));
120120
context.preProcess(true);
121-
int topLevelDocId = result.docIdAndVersion().docId + result.docIdAndVersion().context.docBase;
121+
int topLevelDocId = result.docIdAndVersion().docId + result.docIdAndVersion().docBase;
122122
Explanation explanation = context.searcher().explain(context.query(), topLevelDocId);
123123
for (RescoreContext ctx : context.rescore()) {
124124
Rescorer rescorer = ctx.rescorer();

server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import org.elasticsearch.script.ExecutableScript;
4848
import org.elasticsearch.script.Script;
4949
import org.elasticsearch.script.ScriptService;
50-
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
5150
import org.elasticsearch.search.lookup.SourceLookup;
5251

5352
import java.io.IOException;
@@ -71,9 +70,8 @@ public UpdateHelper(Settings settings, ScriptService scriptService) {
7170
* Prepares an update request by converting it into an index or delete request or an update response (no action).
7271
*/
7372
public Result prepare(UpdateRequest request, IndexShard indexShard, LongSupplier nowInMillis) {
74-
final GetResult getResult = indexShard.getService().get(request.type(), request.id(),
75-
new String[]{RoutingFieldMapper.NAME, ParentFieldMapper.NAME},
76-
true, request.version(), request.versionType(), FetchSourceContext.FETCH_SOURCE);
73+
final GetResult getResult = indexShard.getService().getForUpdate(request.type(), request.id(), request.version(),
74+
request.versionType());
7775
return prepare(indexShard.shardId(), request, getResult, nowInMillis);
7876
}
7977

server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public DocIdAndVersion lookupVersion(BytesRef id, LeafReaderContext context)
100100
if (versions.advanceExact(docID) == false) {
101101
throw new IllegalArgumentException("Document [" + docID + "] misses the [" + VersionFieldMapper.NAME + "] field");
102102
}
103-
return new DocIdAndVersion(docID, versions.longValue(), context);
103+
return new DocIdAndVersion(docID, versions.longValue(), context.reader(), context.docBase);
104104
} else {
105105
return null;
106106
}

server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.common.lucene.uid;
2121

2222
import org.apache.lucene.index.IndexReader;
23+
import org.apache.lucene.index.LeafReader;
2324
import org.apache.lucene.index.LeafReaderContext;
2425
import org.apache.lucene.index.NumericDocValues;
2526
import org.apache.lucene.index.Term;
@@ -97,12 +98,14 @@ private VersionsAndSeqNoResolver() {
9798
public static class DocIdAndVersion {
9899
public final int docId;
99100
public final long version;
100-
public final LeafReaderContext context;
101+
public final LeafReader reader;
102+
public final int docBase;
101103

102-
DocIdAndVersion(int docId, long version, LeafReaderContext context) {
104+
public DocIdAndVersion(int docId, long version, LeafReader reader, int docBase) {
103105
this.docId = docId;
104106
this.version = version;
105-
this.context = context;
107+
this.reader = reader;
108+
this.docBase = docBase;
106109
}
107110
}
108111

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1232,14 +1232,16 @@ public static class Get {
12321232
private final boolean realtime;
12331233
private final Term uid;
12341234
private final String type, id;
1235+
private final boolean readFromTranslog;
12351236
private long version = Versions.MATCH_ANY;
12361237
private VersionType versionType = VersionType.INTERNAL;
12371238

1238-
public Get(boolean realtime, String type, String id, Term uid) {
1239+
public Get(boolean realtime, boolean readFromTranslog, String type, String id, Term uid) {
12391240
this.realtime = realtime;
12401241
this.type = type;
12411242
this.id = id;
12421243
this.uid = uid;
1244+
this.readFromTranslog = readFromTranslog;
12431245
}
12441246

12451247
public boolean realtime() {
@@ -1275,6 +1277,10 @@ public Get versionType(VersionType versionType) {
12751277
this.versionType = versionType;
12761278
return this;
12771279
}
1280+
1281+
public boolean isReadFromTranslog() {
1282+
return readFromTranslog;
1283+
}
12781284
}
12791285

12801286
public static class GetResult implements Releasable {

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
import org.elasticsearch.threadpool.ThreadPool;
7979

8080
import java.io.IOException;
81+
import java.io.UncheckedIOException;
8182
import java.util.ArrayList;
8283
import java.util.Arrays;
8384
import java.util.Collection;
@@ -145,6 +146,7 @@ public class InternalEngine extends Engine {
145146
* being indexed/deleted.
146147
*/
147148
private final AtomicLong writingBytes = new AtomicLong();
149+
private final AtomicBoolean trackTranslogLocation = new AtomicBoolean(false);
148150

149151
@Nullable
150152
private final String historyUUID;
@@ -558,6 +560,27 @@ public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> search
558560
throw new VersionConflictEngineException(shardId, get.type(), get.id(),
559561
get.versionType().explainConflictForReads(versionValue.version, get.version()));
560562
}
563+
if (get.isReadFromTranslog()) {
564+
// this is only used for updates - API _GET calls will always read form a reader for consistency
565+
// the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0
566+
if (versionValue.getLocation() != null) {
567+
try {
568+
Translog.Operation operation = translog.readOperation(versionValue.getLocation());
569+
if (operation != null) {
570+
// in the case of a already pruned translog generation we might get null here - yet very unlikely
571+
TranslogLeafReader reader = new TranslogLeafReader((Translog.Index) operation, engineConfig
572+
.getIndexSettings().getIndexVersionCreated());
573+
return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader)),
574+
new VersionsAndSeqNoResolver.DocIdAndVersion(0, ((Translog.Index) operation).version(), reader, 0));
575+
}
576+
} catch (IOException e) {
577+
maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event
578+
throw new EngineException(shardId, "failed to read operation from translog", e);
579+
}
580+
} else {
581+
trackTranslogLocation.set(true);
582+
}
583+
}
561584
refresh("realtime_get", SearcherScope.INTERNAL);
562585
}
563586
scope = SearcherScope.INTERNAL;
@@ -790,6 +813,10 @@ public IndexResult index(Index index) throws IOException {
790813
}
791814
indexResult.setTranslogLocation(location);
792815
}
816+
if (plan.indexIntoLucene && indexResult.hasFailure() == false) {
817+
versionMap.maybePutUnderLock(index.uid().bytes(),
818+
getVersionValue(plan.versionForIndexing, plan.seqNoForIndexing, index.primaryTerm(), indexResult.getTranslogLocation()));
819+
}
793820
if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
794821
localCheckpointTracker.markSeqNoAsCompleted(indexResult.getSeqNo());
795822
}
@@ -916,8 +943,6 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
916943
assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false);
917944
index(index.docs(), indexWriter);
918945
}
919-
versionMap.maybePutUnderLock(index.uid().bytes(),
920-
new VersionValue(plan.versionForIndexing, plan.seqNoForIndexing, index.primaryTerm()));
921946
return new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
922947
} catch (Exception ex) {
923948
if (indexWriter.getTragicException() == null) {
@@ -941,6 +966,13 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
941966
}
942967
}
943968

969+
private VersionValue getVersionValue(long version, long seqNo, long term, Translog.Location location) {
970+
if (location != null && trackTranslogLocation.get()) {
971+
return new TranslogVersionValue(location, version, seqNo, term);
972+
}
973+
return new VersionValue(version, seqNo, term);
974+
}
975+
944976
/**
945977
* returns true if the indexing operation may have already be processed by this engine.
946978
* Note that it is OK to rarely return true even if this is not the case. However a `false`

0 commit comments

Comments
 (0)