Skip to content

Commit 124c1f1

Browse files
INGEST: Create Index Before Pipeline Execute (#32786)
* INGEST: Create Index Before Pipeline Execute * Ensures that indices are created before the default pipeline setting is read to correcly handle the case of an index template containing a default pipeline (without the fix the first document does not get the pipeline applied as explained in #32758) * closes #32758
1 parent a8bfa46 commit 124c1f1

File tree

2 files changed

+109
-41
lines changed

2 files changed

+109
-41
lines changed

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 44 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -127,37 +127,6 @@ public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportSe
127127

128128
@Override
129129
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
130-
boolean hasIndexRequestsWithPipelines = false;
131-
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = clusterService.state().getMetaData().indices();
132-
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
133-
if (actionRequest instanceof IndexRequest) {
134-
IndexRequest indexRequest = (IndexRequest) actionRequest;
135-
String pipeline = indexRequest.getPipeline();
136-
if (pipeline == null) {
137-
IndexMetaData indexMetaData = indicesMetaData.get(indexRequest.index());
138-
if (indexMetaData == null) {
139-
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
140-
} else {
141-
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
142-
indexRequest.setPipeline(defaultPipeline);
143-
if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
144-
hasIndexRequestsWithPipelines = true;
145-
}
146-
}
147-
} else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {
148-
hasIndexRequestsWithPipelines = true;
149-
}
150-
}
151-
}
152-
if (hasIndexRequestsWithPipelines) {
153-
if (clusterService.localNode().isIngestNode()) {
154-
processBulkIndexIngestRequest(task, bulkRequest, listener);
155-
} else {
156-
ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
157-
}
158-
return;
159-
}
160-
161130
final long startTime = relativeTime();
162131
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
163132

@@ -191,15 +160,15 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
191160
}
192161
// Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.
193162
if (autoCreateIndices.isEmpty()) {
194-
executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
163+
executeIngestAndBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
195164
} else {
196165
final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
197166
for (String index : autoCreateIndices) {
198167
createIndex(index, bulkRequest.timeout(), new ActionListener<CreateIndexResponse>() {
199168
@Override
200169
public void onResponse(CreateIndexResponse result) {
201170
if (counter.decrementAndGet() == 0) {
202-
executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
171+
executeIngestAndBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
203172
}
204173
}
205174

@@ -215,7 +184,7 @@ public void onFailure(Exception e) {
215184
}
216185
}
217186
if (counter.decrementAndGet() == 0) {
218-
executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> {
187+
executeIngestAndBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> {
219188
inner.addSuppressed(e);
220189
listener.onFailure(inner);
221190
}), responses, indicesThatCannotBeCreated);
@@ -225,7 +194,47 @@ public void onFailure(Exception e) {
225194
}
226195
}
227196
} else {
228-
executeBulk(task, bulkRequest, startTime, listener, responses, emptyMap());
197+
executeIngestAndBulk(task, bulkRequest, startTime, listener, responses, emptyMap());
198+
}
199+
}
200+
201+
private void executeIngestAndBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos,
202+
final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses,
203+
Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
204+
boolean hasIndexRequestsWithPipelines = false;
205+
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = clusterService.state().getMetaData().indices();
206+
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
207+
if (actionRequest instanceof IndexRequest) {
208+
IndexRequest indexRequest = (IndexRequest) actionRequest;
209+
String pipeline = indexRequest.getPipeline();
210+
if (pipeline == null) {
211+
IndexMetaData indexMetaData = indicesMetaData.get(indexRequest.index());
212+
if (indexMetaData == null) {
213+
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
214+
} else {
215+
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
216+
indexRequest.setPipeline(defaultPipeline);
217+
if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
218+
hasIndexRequestsWithPipelines = true;
219+
}
220+
}
221+
} else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {
222+
hasIndexRequestsWithPipelines = true;
223+
}
224+
}
225+
}
226+
if (hasIndexRequestsWithPipelines) {
227+
try {
228+
if (clusterService.localNode().isIngestNode()) {
229+
processBulkIndexIngestRequest(task, bulkRequest, listener);
230+
} else {
231+
ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
232+
}
233+
} catch (Exception e) {
234+
listener.onFailure(e);
235+
}
236+
} else {
237+
executeBulk(task, bulkRequest, startTimeNanos, listener, responses, indicesThatCannotBeCreated);
229238
}
230239
}
231240

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java

Lines changed: 65 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,25 @@
2222
import org.elasticsearch.Version;
2323
import org.elasticsearch.action.ActionListener;
2424
import org.elasticsearch.action.DocWriteRequest;
25+
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
2526
import org.elasticsearch.action.index.IndexAction;
2627
import org.elasticsearch.action.index.IndexRequest;
2728
import org.elasticsearch.action.index.IndexResponse;
2829
import org.elasticsearch.action.support.ActionFilters;
30+
import org.elasticsearch.action.support.AutoCreateIndex;
2931
import org.elasticsearch.cluster.ClusterChangedEvent;
3032
import org.elasticsearch.cluster.ClusterState;
3133
import org.elasticsearch.cluster.ClusterStateApplier;
3234
import org.elasticsearch.cluster.metadata.IndexMetaData;
35+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
3336
import org.elasticsearch.cluster.metadata.MetaData;
3437
import org.elasticsearch.cluster.node.DiscoveryNode;
3538
import org.elasticsearch.cluster.node.DiscoveryNodes;
3639
import org.elasticsearch.cluster.service.ClusterService;
3740
import org.elasticsearch.common.collect.ImmutableOpenMap;
41+
import org.elasticsearch.common.settings.ClusterSettings;
3842
import org.elasticsearch.common.settings.Settings;
43+
import org.elasticsearch.common.unit.TimeValue;
3944
import org.elasticsearch.common.util.concurrent.AtomicArray;
4045
import org.elasticsearch.index.IndexNotFoundException;
4146
import org.elasticsearch.index.IndexSettings;
@@ -77,6 +82,9 @@ public class TransportBulkActionIngestTests extends ESTestCase {
7782
*/
7883
private static final String WITH_DEFAULT_PIPELINE = "index_with_default_pipeline";
7984

85+
private static final Settings SETTINGS =
86+
Settings.builder().put(AutoCreateIndex.AUTO_CREATE_INDEX_SETTING.getKey(), true).build();
87+
8088
/** Services needed by bulk action */
8189
TransportService transportService;
8290
ClusterService clusterService;
@@ -112,25 +120,42 @@ public class TransportBulkActionIngestTests extends ESTestCase {
112120
/** A subclass of the real bulk action to allow skipping real bulk indexing, and marking when it would have happened. */
113121
class TestTransportBulkAction extends TransportBulkAction {
114122
boolean isExecuted = false; // set when the "real" bulk execution happens
123+
124+
boolean needToCheck; // pluggable return value for `needToCheck`
125+
126+
boolean indexCreated = true; // If set to false, will be set to true by call to createIndex
127+
115128
TestTransportBulkAction() {
116-
super(Settings.EMPTY, null, transportService, clusterService, ingestService,
117-
null, null, new ActionFilters(Collections.emptySet()), null, null);
129+
super(SETTINGS, null, transportService, clusterService, ingestService,
130+
null, null, new ActionFilters(Collections.emptySet()), null,
131+
new AutoCreateIndex(
132+
SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
133+
new IndexNameExpressionResolver(SETTINGS)
134+
)
135+
);
118136
}
119137
@Override
120138
protected boolean needToCheck() {
121-
return false;
139+
return needToCheck;
122140
}
123141
@Override
124142
void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener<BulkResponse> listener,
125143
final AtomicArray<BulkItemResponse> responses, Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
144+
assertTrue(indexCreated);
126145
isExecuted = true;
127146
}
147+
148+
@Override
149+
void createIndex(String index, TimeValue timeout, ActionListener<CreateIndexResponse> listener) {
150+
indexCreated = true;
151+
listener.onResponse(null);
152+
}
128153
}
129154

130155
class TestSingleItemBulkWriteAction extends TransportSingleItemBulkWriteAction<IndexRequest, IndexResponse> {
131156

132157
TestSingleItemBulkWriteAction(TestTransportBulkAction bulkAction) {
133-
super(Settings.EMPTY, IndexAction.NAME, TransportBulkActionIngestTests.this.transportService,
158+
super(SETTINGS, IndexAction.NAME, TransportBulkActionIngestTests.this.transportService,
134159
TransportBulkActionIngestTests.this.clusterService,
135160
null, null, null, new ActionFilters(Collections.emptySet()), null,
136161
IndexRequest::new, IndexRequest::new, ThreadPool.Names.WRITE, bulkAction, null);
@@ -162,15 +187,17 @@ public void setupAction() {
162187
when(nodes.getIngestNodes()).thenReturn(ingestNodes);
163188
ClusterState state = mock(ClusterState.class);
164189
when(state.getNodes()).thenReturn(nodes);
165-
when(state.getMetaData()).thenReturn(MetaData.builder().indices(ImmutableOpenMap.<String, IndexMetaData>builder()
190+
MetaData metaData = MetaData.builder().indices(ImmutableOpenMap.<String, IndexMetaData>builder()
166191
.putAll(
167192
Collections.singletonMap(
168193
WITH_DEFAULT_PIPELINE,
169194
IndexMetaData.builder(WITH_DEFAULT_PIPELINE).settings(
170195
settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline")
171196
.build()
172197
).numberOfShards(1).numberOfReplicas(1).build()))
173-
.build()).build());
198+
.build()).build();
199+
when(state.getMetaData()).thenReturn(metaData);
200+
when(state.metaData()).thenReturn(metaData);
174201
when(clusterService.state()).thenReturn(state);
175202
doAnswer(invocation -> {
176203
ClusterChangedEvent event = mock(ClusterChangedEvent.class);
@@ -408,4 +435,36 @@ public void testUseDefaultPipeline() throws Exception {
408435
verifyZeroInteractions(transportService);
409436
}
410437

438+
public void testCreateIndexBeforeRunPipeline() throws Exception {
439+
Exception exception = new Exception("fake exception");
440+
IndexRequest indexRequest = new IndexRequest("missing_index", "type", "id");
441+
indexRequest.setPipeline("testpipeline");
442+
indexRequest.source(Collections.emptyMap());
443+
AtomicBoolean responseCalled = new AtomicBoolean(false);
444+
AtomicBoolean failureCalled = new AtomicBoolean(false);
445+
action.needToCheck = true;
446+
action.indexCreated = false;
447+
singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap(
448+
response -> responseCalled.set(true),
449+
e -> {
450+
assertThat(e, sameInstance(exception));
451+
failureCalled.set(true);
452+
}));
453+
454+
// check failure works, and passes through to the listener
455+
assertFalse(action.isExecuted); // haven't executed yet
456+
assertFalse(responseCalled.get());
457+
assertFalse(failureCalled.get());
458+
verify(executionService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture());
459+
completionHandler.getValue().accept(exception);
460+
assertTrue(failureCalled.get());
461+
462+
// now check success
463+
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
464+
completionHandler.getValue().accept(null);
465+
assertTrue(action.isExecuted);
466+
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
467+
verifyZeroInteractions(transportService);
468+
}
469+
411470
}

0 commit comments

Comments
 (0)