From 4e1b5070ac7d9ea0bd74e8a05928e6ac95a8026d Mon Sep 17 00:00:00 2001 From: PnPie Date: Tue, 28 Nov 2017 08:58:22 +0100 Subject: [PATCH 1/9] support document version set in ingest pipelines Add support for setting document version and version type in set processor of an ingest pipeline. --- .../ingest/SimulatePipelineRequest.java | 11 +++- .../ingest/WriteableIngestDocument.java | 6 +-- .../index/mapper/VersionFieldMapper.java | 4 +- .../elasticsearch/ingest/IngestDocument.java | 49 +++++++++-------- .../ingest/PipelineExecutionService.java | 19 ++++--- .../SimulatePipelineRequestParsingTests.java | 36 ++++++++----- .../ingest/WriteableIngestDocumentTests.java | 6 +-- .../ingest/PipelineExecutionServiceTests.java | 52 +++++++++++-------- .../ingest/common/AppendProcessorTests.java | 38 ++++++++++---- .../ingest/common/SetProcessorTests.java | 22 +++++++- 10 files changed, 160 insertions(+), 83 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java b/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java index 39d7f96e8e6f6..4ac82f63e1b6c 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.VersionType; import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; @@ -195,8 +196,16 @@ private static List parseDocs(Map config) { dataMap, MetaData.ROUTING.getFieldName()); String parent = ConfigurationUtils.readOptionalStringOrIntProperty(null, null, dataMap, MetaData.PARENT.getFieldName()); + Long version = null; + if (dataMap.containsKey(MetaData.VERSION.getFieldName())) { + version = (Long) ConfigurationUtils.readObject(null, null, dataMap, MetaData.VERSION.getFieldName()); + } + VersionType versionType = null; + if (dataMap.containsKey(MetaData.VERSION_TYPE.getFieldName())) { + versionType = (VersionType) ConfigurationUtils.readObject(null, null, dataMap, MetaData.VERSION_TYPE.getFieldName()); + } IngestDocument ingestDocument = - new IngestDocument(index, type, id, routing, parent, document); + new IngestDocument(index, type, id, routing, parent, version, versionType, document); ingestDocumentList.add(ingestDocument); } return ingestDocumentList; diff --git a/core/src/main/java/org/elasticsearch/action/ingest/WriteableIngestDocument.java b/core/src/main/java/org/elasticsearch/action/ingest/WriteableIngestDocument.java index 64365407f2379..87168cb7a9bba 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/WriteableIngestDocument.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/WriteableIngestDocument.java @@ -68,10 +68,10 @@ IngestDocument getIngestDocument() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject("doc"); - Map metadataMap = ingestDocument.extractMetadata(); - for (Map.Entry metadata : metadataMap.entrySet()) { + Map metadataMap = ingestDocument.extractMetadata(); + for (Map.Entry metadata : metadataMap.entrySet()) { if (metadata.getValue() != null) { - builder.field(metadata.getKey().getFieldName(), metadata.getValue()); + builder.field(metadata.getKey().getFieldName(), metadata.getValue().toString()); } } builder.field("_source", ingestDocument.getSourceAndMetadata()); diff --git a/core/src/main/java/org/elasticsearch/index/mapper/VersionFieldMapper.java b/core/src/main/java/org/elasticsearch/index/mapper/VersionFieldMapper.java index c5ead1327cc9b..6a7969b31ac36 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/VersionFieldMapper.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/VersionFieldMapper.java @@ -69,7 +69,9 @@ public MetadataFieldMapper getDefault(MappedFieldType fieldType, ParserContext c } } - static final class VersionFieldType extends MappedFieldType { + public static final class VersionFieldType extends MappedFieldType { + + public static final String NAME = "_version_type"; VersionFieldType() { } diff --git a/core/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/core/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 7edbccef5aad7..c310241cba89a 100644 --- a/core/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/core/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -20,27 +20,13 @@ package org.elasticsearch.ingest; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.index.mapper.IdFieldMapper; -import org.elasticsearch.index.mapper.IndexFieldMapper; -import org.elasticsearch.index.mapper.ParentFieldMapper; -import org.elasticsearch.index.mapper.RoutingFieldMapper; -import org.elasticsearch.index.mapper.SourceFieldMapper; -import org.elasticsearch.index.mapper.TypeFieldMapper; -import org.elasticsearch.script.ScriptService; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.mapper.*; import org.elasticsearch.script.TemplateScript; import java.time.ZoneOffset; import java.time.ZonedDateTime; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Base64; -import java.util.Date; -import java.util.EnumMap; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.*; /** * Represents a single document being captured before indexing and holds the source and metadata (like id, type and index). @@ -73,6 +59,16 @@ public IngestDocument(String index, String type, String id, String routing, Stri this.ingestMetadata.put(TIMESTAMP, ZonedDateTime.now(ZoneOffset.UTC)); } + public IngestDocument(String index, String type, String id, String routing, String parent, Long version, VersionType versionType, Map source) { + this(index, type, id, routing, parent, source); + if (version != null) { + sourceAndMetadata.put(MetaData.VERSION.getFieldName(), version); + } + if (versionType != null) { + sourceAndMetadata.put(MetaData.VERSION_TYPE.getFieldName(), versionType); + } + } + /** * Copy constructor that creates a new {@link IngestDocument} which has exactly the same properties as the one provided as argument */ @@ -457,6 +453,9 @@ private void setFieldValue(String path, Object value, boolean append) { } String leafKey = fieldPath.pathElements[fieldPath.pathElements.length - 1]; + if (leafKey == MetaData.VERSION_TYPE.getFieldName()) { + value = VersionType.fromString(value.toString()); + } if (context == null) { throw new IllegalArgumentException("cannot set [" + leafKey + "] with null parent as part of path [" + path + "]"); } @@ -559,10 +558,16 @@ private Map createTemplateModel() { * one time operation that extracts the metadata fields from the ingest document and returns them. * Metadata fields that used to be accessible as ordinary top level fields will be removed as part of this call. */ - public Map extractMetadata() { - Map metadataMap = new EnumMap<>(MetaData.class); + public Map extractMetadata() { + Map metadataMap = new EnumMap<>(MetaData.class); for (MetaData metaData : MetaData.values()) { - metadataMap.put(metaData, cast(metaData.getFieldName(), sourceAndMetadata.remove(metaData.getFieldName()), String.class)); + if (metaData == MetaData.VERSION) { + metadataMap.put(metaData, cast(metaData.getFieldName(), sourceAndMetadata.remove(metaData.getFieldName()), Long.class)); + } else if (metaData == MetaData.VERSION_TYPE) { + metadataMap.put(metaData, cast(metaData.getFieldName(), sourceAndMetadata.remove(metaData.getFieldName()), VersionType.class)); + } else { + metadataMap.put(metaData, cast(metaData.getFieldName(), sourceAndMetadata.remove(metaData.getFieldName()), String.class)); + } } return metadataMap; } @@ -651,7 +656,9 @@ public enum MetaData { TYPE(TypeFieldMapper.NAME), ID(IdFieldMapper.NAME), ROUTING(RoutingFieldMapper.NAME), - PARENT(ParentFieldMapper.NAME); + PARENT(ParentFieldMapper.NAME), + VERSION(VersionFieldMapper.NAME), + VERSION_TYPE(VersionFieldMapper.VersionFieldType.NAME); private final String fieldName; diff --git a/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java b/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java index cec622f4a2587..3d4e108a275b1 100644 --- a/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java +++ b/core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.index.VersionType; import org.elasticsearch.threadpool.ThreadPool; import java.util.Collections; @@ -164,18 +165,22 @@ private void innerExecute(IndexRequest indexRequest, Pipeline pipeline) throws E String id = indexRequest.id(); String routing = indexRequest.routing(); String parent = indexRequest.parent(); + Long version = indexRequest.version(); + VersionType versionType = indexRequest.versionType(); Map sourceAsMap = indexRequest.sourceAsMap(); - IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, parent, sourceAsMap); + IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, parent, version, versionType, sourceAsMap); pipeline.execute(ingestDocument); - Map metadataMap = ingestDocument.extractMetadata(); + Map metadataMap = ingestDocument.extractMetadata(); //it's fine to set all metadata fields all the time, as ingest document holds their starting values //before ingestion, which might also get modified during ingestion. - indexRequest.index(metadataMap.get(IngestDocument.MetaData.INDEX)); - indexRequest.type(metadataMap.get(IngestDocument.MetaData.TYPE)); - indexRequest.id(metadataMap.get(IngestDocument.MetaData.ID)); - indexRequest.routing(metadataMap.get(IngestDocument.MetaData.ROUTING)); - indexRequest.parent(metadataMap.get(IngestDocument.MetaData.PARENT)); + indexRequest.index((String) metadataMap.get(IngestDocument.MetaData.INDEX)); + indexRequest.type((String) metadataMap.get(IngestDocument.MetaData.TYPE)); + indexRequest.id((String) metadataMap.get(IngestDocument.MetaData.ID)); + indexRequest.routing((String) metadataMap.get(IngestDocument.MetaData.ROUTING)); + indexRequest.parent((String) metadataMap.get(IngestDocument.MetaData.PARENT)); + indexRequest.version((Long) metadataMap.get(IngestDocument.MetaData.VERSION)); + indexRequest.versionType((VersionType) metadataMap.get(IngestDocument.MetaData.VERSION_TYPE)); indexRequest.source(ingestDocument.getSourceAndMetadata()); } catch (Exception e) { totalStats.ingestFailed(); diff --git a/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java b/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java index 8b4ce79f3ecc4..337b1d126bf15 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; +import org.elasticsearch.index.VersionType; import org.elasticsearch.ingest.CompoundProcessor; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; @@ -39,11 +40,7 @@ import static org.elasticsearch.action.ingest.SimulatePipelineRequest.Fields; import static org.elasticsearch.action.ingest.SimulatePipelineRequest.SIMULATED_PIPELINE_ID; -import static org.elasticsearch.ingest.IngestDocument.MetaData.ID; -import static org.elasticsearch.ingest.IngestDocument.MetaData.INDEX; -import static org.elasticsearch.ingest.IngestDocument.MetaData.PARENT; -import static org.elasticsearch.ingest.IngestDocument.MetaData.ROUTING; -import static org.elasticsearch.ingest.IngestDocument.MetaData.TYPE; +import static org.elasticsearch.ingest.IngestDocument.MetaData.*; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Mockito.mock; @@ -98,7 +95,7 @@ public void testParseUsingPipelineStore() throws Exception { Iterator> expectedDocsIterator = expectedDocs.iterator(); for (IngestDocument ingestDocument : actualRequest.getDocuments()) { Map expectedDocument = expectedDocsIterator.next(); - Map metadataMap = ingestDocument.extractMetadata(); + Map metadataMap = ingestDocument.extractMetadata(); assertThat(metadataMap.get(INDEX), equalTo(expectedDocument.get(INDEX.getFieldName()))); assertThat(metadataMap.get(TYPE), equalTo(expectedDocument.get(TYPE.getFieldName()))); assertThat(metadataMap.get(ID), equalTo(expectedDocument.get(ID.getFieldName()))); @@ -120,17 +117,26 @@ public void testParseWithProvidedPipeline() throws Exception { for (int i = 0; i < numDocs; i++) { Map doc = new HashMap<>(); Map expectedDoc = new HashMap<>(); - List fields = Arrays.asList(INDEX, TYPE, ID, ROUTING, PARENT); + List fields = Arrays.asList(INDEX, TYPE, ID, ROUTING, PARENT, VERSION, VERSION_TYPE); for(IngestDocument.MetaData field : fields) { - if(randomBoolean()) { - String value = randomAlphaOfLengthBetween(1, 10); + if (field == IngestDocument.MetaData.VERSION) { + Long value = randomLong(); doc.put(field.getFieldName(), value); expectedDoc.put(field.getFieldName(), value); - } - else { - Integer value = randomIntBetween(1, 1000000); + } else if (field == IngestDocument.MetaData.VERSION_TYPE) { + VersionType value = randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE); doc.put(field.getFieldName(), value); - expectedDoc.put(field.getFieldName(), String.valueOf(value)); + expectedDoc.put(field.getFieldName(), value); + } else { + if (randomBoolean()) { + String value = randomAlphaOfLengthBetween(1, 10); + doc.put(field.getFieldName(), value); + expectedDoc.put(field.getFieldName(), value); + } else { + Integer value = randomIntBetween(1, 1000000); + doc.put(field.getFieldName(), value); + expectedDoc.put(field.getFieldName(), String.valueOf(value)); + } } } String fieldName = randomAlphaOfLengthBetween(1, 10); @@ -175,12 +181,14 @@ public void testParseWithProvidedPipeline() throws Exception { Iterator> expectedDocsIterator = expectedDocs.iterator(); for (IngestDocument ingestDocument : actualRequest.getDocuments()) { Map expectedDocument = expectedDocsIterator.next(); - Map metadataMap = ingestDocument.extractMetadata(); + Map metadataMap = ingestDocument.extractMetadata(); assertThat(metadataMap.get(INDEX), equalTo(expectedDocument.get(INDEX.getFieldName()))); assertThat(metadataMap.get(TYPE), equalTo(expectedDocument.get(TYPE.getFieldName()))); assertThat(metadataMap.get(ID), equalTo(expectedDocument.get(ID.getFieldName()))); assertThat(metadataMap.get(ROUTING), equalTo(expectedDocument.get(ROUTING.getFieldName()))); assertThat(metadataMap.get(PARENT), equalTo(expectedDocument.get(PARENT.getFieldName()))); + assertThat(metadataMap.get(VERSION), equalTo(expectedDocument.get(VERSION.getFieldName()))); + assertThat(metadataMap.get(VERSION_TYPE), equalTo(expectedDocument.get(VERSION_TYPE.getFieldName()))); assertThat(ingestDocument.getSourceAndMetadata(), equalTo(expectedDocument.get(Fields.SOURCE))); } diff --git a/core/src/test/java/org/elasticsearch/action/ingest/WriteableIngestDocumentTests.java b/core/src/test/java/org/elasticsearch/action/ingest/WriteableIngestDocumentTests.java index c5f67e235061e..b04c7dfcd84f8 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/WriteableIngestDocumentTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/WriteableIngestDocumentTests.java @@ -133,13 +133,13 @@ public void testToXContent() throws IOException { Map toXContentSource = (Map) toXContentDoc.get("_source"); Map toXContentIngestMetadata = (Map) toXContentDoc.get("_ingest"); - Map metadataMap = ingestDocument.extractMetadata(); - for (Map.Entry metadata : metadataMap.entrySet()) { + Map metadataMap = ingestDocument.extractMetadata(); + for (Map.Entry metadata : metadataMap.entrySet()) { String fieldName = metadata.getKey().getFieldName(); if (metadata.getValue() == null) { assertThat(toXContentDoc.containsKey(fieldName), is(false)); } else { - assertThat(toXContentDoc.get(fieldName), equalTo(metadata.getValue())); + assertThat(toXContentDoc.get(fieldName), equalTo(metadata.getValue().toString())); } } diff --git a/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java b/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java index 44c8e78bef703..ec2d1d7e4780e 100644 --- a/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.VersionType; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.CustomTypeSafeMatcher; @@ -44,21 +45,13 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasKey; -import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.*; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; public class PipelineExecutionServiceTests extends ESTestCase { @@ -157,10 +150,18 @@ public void testExecuteEmptyPipeline() throws Exception { public void testExecutePropagateAllMetaDataUpdates() throws Exception { CompoundProcessor processor = mock(CompoundProcessor.class); when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class))); + long newVersion = randomLong(); + String versionType = randomFrom("internal", "external", "external_gt", "external_gte"); doAnswer((InvocationOnMock invocationOnMock) -> { IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0]; for (IngestDocument.MetaData metaData : IngestDocument.MetaData.values()) { - ingestDocument.setFieldValue(metaData.getFieldName(), "update" + metaData.getFieldName()); + if (metaData == IngestDocument.MetaData.VERSION) { + ingestDocument.setFieldValue(metaData.getFieldName(), newVersion); + } else if (metaData == IngestDocument.MetaData.VERSION_TYPE) { + ingestDocument.setFieldValue(metaData.getFieldName(), versionType); + } else { + ingestDocument.setFieldValue(metaData.getFieldName(), "update" + metaData.getFieldName()); + } } return null; }).when(processor).execute(any()); @@ -175,12 +176,13 @@ public void testExecutePropagateAllMetaDataUpdates() throws Exception { verify(processor).execute(any()); verify(failureHandler, never()).accept(any()); verify(completionHandler, times(1)).accept(true); - assertThat(indexRequest.index(), equalTo("update_index")); assertThat(indexRequest.type(), equalTo("update_type")); assertThat(indexRequest.id(), equalTo("update_id")); assertThat(indexRequest.routing(), equalTo("update_routing")); assertThat(indexRequest.parent(), equalTo("update_parent")); + assertThat(indexRequest.version(), equalTo(newVersion)); + assertThat(indexRequest.versionType(), equalTo(VersionType.fromString(versionType))); } public void testExecuteFailure() throws Exception { @@ -188,13 +190,13 @@ public void testExecuteFailure() throws Exception { when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class))); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor)); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); + doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); @SuppressWarnings("unchecked") Consumer failureHandler = mock(Consumer.class); @SuppressWarnings("unchecked") Consumer completionHandler = mock(Consumer.class); executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); - verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); + verify(processor).execute(eqID("_index", "_type", "_id", indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); verify(failureHandler, times(1)).accept(any(RuntimeException.class)); verify(completionHandler, never()).accept(anyBoolean()); } @@ -225,14 +227,14 @@ public void testExecuteFailureWithOnFailure() throws Exception { Collections.singletonList(new CompoundProcessor(onFailureProcessor))); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, compoundProcessor)); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); - doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); + doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); + doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id", indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); @SuppressWarnings("unchecked") Consumer failureHandler = mock(Consumer.class); @SuppressWarnings("unchecked") Consumer completionHandler = mock(Consumer.class); executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); - verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); + verify(processor).execute(eqID("_index", "_type", "_id", indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); verify(failureHandler, times(1)).accept(any(RuntimeException.class)); verify(completionHandler, never()).accept(anyBoolean()); } @@ -246,15 +248,15 @@ public void testExecuteFailureWithNestedOnFailure() throws Exception { Collections.singletonList(onFailureOnFailureProcessor)))); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, compoundProcessor)); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - doThrow(new RuntimeException()).when(onFailureOnFailureProcessor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); - doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); - doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); + doThrow(new RuntimeException()).when(onFailureOnFailureProcessor).execute(eqID("_index", "_type", "_id", indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); + doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id", indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); + doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); @SuppressWarnings("unchecked") Consumer failureHandler = mock(Consumer.class); @SuppressWarnings("unchecked") Consumer completionHandler = mock(Consumer.class); executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); - verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); + verify(processor).execute(eqID("_index", "_type", "_id", indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); verify(failureHandler, times(1)).accept(any(RuntimeException.class)); verify(completionHandler, never()).accept(anyBoolean()); } @@ -380,6 +382,10 @@ private IngestDocument eqID(String index, String type, String id, Map source) { + return argThat(new IngestDocumentMatcher(index, type, id, version, versionType, source)); + } + private class IngestDocumentMatcher extends ArgumentMatcher { private final IngestDocument ingestDocument; @@ -388,6 +394,10 @@ private class IngestDocumentMatcher extends ArgumentMatcher { this.ingestDocument = new IngestDocument(index, type, id, null, null, source); } + IngestDocumentMatcher(String index, String type, String id, Long version, VersionType versionType, Map source) { + this.ingestDocument = new IngestDocument(index, type, id, null, null, version, versionType, source); + } + @Override public boolean matches(Object o) { if (o.getClass() == IngestDocument.class) { diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java index cda2b36a3ecff..50e20dc566376 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.ingest.common; +import org.elasticsearch.index.VersionType; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.RandomDocumentPicks; @@ -123,21 +124,38 @@ public void testConvertScalarToList() throws Exception { } public void testAppendMetadata() throws Exception { - //here any metadata field value becomes a list, which won't make sense in most of the cases, + // here any metadata field value becomes a list, which won't make sense in most of the cases, // but support for append is streamlined like for set so we test it IngestDocument.MetaData randomMetaData = randomFrom(IngestDocument.MetaData.values()); - List values = new ArrayList<>(); + List values = new ArrayList<>(); Processor appendProcessor; - if (randomBoolean()) { - String value = randomAlphaOfLengthBetween(1, 10); - values.add(value); - appendProcessor = createAppendProcessor(randomMetaData.getFieldName(), value); + int valuesSize = randomIntBetween(0, 10); + if (randomMetaData == IngestDocument.MetaData.VERSION) { + if (randomBoolean()) { + Long version = randomLong(); + values.add(version); + appendProcessor = createAppendProcessor(randomMetaData.getFieldName(), version); + } else { + for (int i = 0; i < valuesSize; i++) { + values.add(randomLong()); + } + appendProcessor = createAppendProcessor(randomMetaData.getFieldName(), values); + } + } else if (randomMetaData == IngestDocument.MetaData.VERSION_TYPE) { + String versionType = randomFrom("internal", "external", "external_gt", "external_gte"); + values.add(VersionType.fromString(versionType)); + appendProcessor = createAppendProcessor(randomMetaData.getFieldName(), versionType); } else { - int valuesSize = randomIntBetween(0, 10); - for (int i = 0; i < valuesSize; i++) { - values.add(randomAlphaOfLengthBetween(1, 10)); + if (randomBoolean()) { + String value = randomAlphaOfLengthBetween(1, 10); + values.add(value); + appendProcessor = createAppendProcessor(randomMetaData.getFieldName(), value); + } else { + for (int i = 0; i < valuesSize; i++) { + values.add(randomAlphaOfLengthBetween(1, 10)); + } + appendProcessor = createAppendProcessor(randomMetaData.getFieldName(), values); } - appendProcessor = createAppendProcessor(randomMetaData.getFieldName(), values); } IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SetProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SetProcessorTests.java index 5f0759b5b2157..d92b25eb023fe 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SetProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SetProcessorTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.ingest.common; +import org.elasticsearch.index.VersionType; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.RandomDocumentPicks; @@ -101,10 +102,27 @@ public void testSetExistingNullFieldWithOverrideDisabled() throws Exception { public void testSetMetadata() throws Exception { IngestDocument.MetaData randomMetaData = randomFrom(IngestDocument.MetaData.values()); - Processor processor = createSetProcessor(randomMetaData.getFieldName(), "_value", true); + Processor processor; + long version = 0L; + String versionType = new String(); + if (randomMetaData == IngestDocument.MetaData.VERSION) { + version = randomLong(); + processor = createSetProcessor(randomMetaData.getFieldName(), version, true); + } else if (randomMetaData == IngestDocument.MetaData.VERSION_TYPE) { + versionType = randomFrom("internal", "external", "external_gt", "external_gte"); + processor = createSetProcessor(randomMetaData.getFieldName(), versionType, true); + } else { + processor = createSetProcessor(randomMetaData.getFieldName(), "_value", true); + } IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); processor.execute(ingestDocument); - assertThat(ingestDocument.getFieldValue(randomMetaData.getFieldName(), String.class), Matchers.equalTo("_value")); + if (randomMetaData == IngestDocument.MetaData.VERSION) { + assertThat(ingestDocument.getFieldValue(randomMetaData.getFieldName(), Long.class), Matchers.equalTo(version)); + } else if (randomMetaData == IngestDocument.MetaData.VERSION_TYPE) { + assertThat(ingestDocument.getFieldValue(randomMetaData.getFieldName(), VersionType.class), Matchers.equalTo(VersionType.fromString(versionType))); + } else { + assertThat(ingestDocument.getFieldValue(randomMetaData.getFieldName(), String.class), Matchers.equalTo("_value")); + } } private static Processor createSetProcessor(String fieldName, Object fieldValue, boolean overrideEnabled) { From 180d8c1a558e924313e707c0a341ebbc42d3f6b5 Mon Sep 17 00:00:00 2001 From: PnPie Date: Tue, 28 Nov 2017 21:04:56 +0100 Subject: [PATCH 2/9] remove wildcard imports --- .../elasticsearch/ingest/IngestDocument.java | 18 ++++++++++++++++-- .../ingest/PipelineExecutionServiceTests.java | 12 ++++++++++-- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/core/src/main/java/org/elasticsearch/ingest/IngestDocument.java index c310241cba89a..198154309c8b8 100644 --- a/core/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/core/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -21,12 +21,26 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.index.VersionType; -import org.elasticsearch.index.mapper.*; +import org.elasticsearch.index.mapper.IdFieldMapper; +import org.elasticsearch.index.mapper.IndexFieldMapper; +import org.elasticsearch.index.mapper.ParentFieldMapper; +import org.elasticsearch.index.mapper.RoutingFieldMapper; +import org.elasticsearch.index.mapper.SourceFieldMapper; +import org.elasticsearch.index.mapper.TypeFieldMapper; +import org.elasticsearch.index.mapper.VersionFieldMapper; import org.elasticsearch.script.TemplateScript; import java.time.ZoneOffset; import java.time.ZonedDateTime; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Base64; +import java.util.Date; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; /** * Represents a single document being captured before indexing and holds the source and metadata (like id, type and index). diff --git a/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java b/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java index ec2d1d7e4780e..6522544c5fa57 100644 --- a/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java @@ -45,13 +45,21 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.not; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class PipelineExecutionServiceTests extends ESTestCase { From 590bd7283b6077f5f4d4790a5c27330e717d31eb Mon Sep 17 00:00:00 2001 From: PnPie Date: Wed, 29 Nov 2017 00:08:28 +0100 Subject: [PATCH 3/9] change line width --- .../main/java/org/elasticsearch/ingest/IngestDocument.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/core/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 198154309c8b8..e0eb172600bba 100644 --- a/core/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/core/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -73,7 +73,8 @@ public IngestDocument(String index, String type, String id, String routing, Stri this.ingestMetadata.put(TIMESTAMP, ZonedDateTime.now(ZoneOffset.UTC)); } - public IngestDocument(String index, String type, String id, String routing, String parent, Long version, VersionType versionType, Map source) { + public IngestDocument(String index, String type, String id, String routing,String parent, + Long version, VersionType versionType,Map source) { this(index, type, id, routing, parent, source); if (version != null) { sourceAndMetadata.put(MetaData.VERSION.getFieldName(), version); @@ -578,7 +579,8 @@ public Map extractMetadata() { if (metaData == MetaData.VERSION) { metadataMap.put(metaData, cast(metaData.getFieldName(), sourceAndMetadata.remove(metaData.getFieldName()), Long.class)); } else if (metaData == MetaData.VERSION_TYPE) { - metadataMap.put(metaData, cast(metaData.getFieldName(), sourceAndMetadata.remove(metaData.getFieldName()), VersionType.class)); + metadataMap.put(metaData, + cast(metaData.getFieldName(), sourceAndMetadata.remove(metaData.getFieldName()), VersionType.class)); } else { metadataMap.put(metaData, cast(metaData.getFieldName(), sourceAndMetadata.remove(metaData.getFieldName()), String.class)); } From f84d796291e2c14d0b051990decaa4a2f2b16ba8 Mon Sep 17 00:00:00 2001 From: Ryan Ernst Date: Fri, 22 Dec 2017 11:14:05 -0800 Subject: [PATCH 4/9] fix precommit --- .../SimulatePipelineRequestParsingTests.java | 12 ++++++-- .../ingest/PipelineExecutionServiceTests.java | 30 ++++++++++++------- 2 files changed, 29 insertions(+), 13 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java b/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java index 337b1d126bf15..e42f4350b32e9 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java @@ -40,7 +40,13 @@ import static org.elasticsearch.action.ingest.SimulatePipelineRequest.Fields; import static org.elasticsearch.action.ingest.SimulatePipelineRequest.SIMULATED_PIPELINE_ID; -import static org.elasticsearch.ingest.IngestDocument.MetaData.*; +import static org.elasticsearch.ingest.IngestDocument.MetaData.ID; +import static org.elasticsearch.ingest.IngestDocument.MetaData.INDEX; +import static org.elasticsearch.ingest.IngestDocument.MetaData.PARENT; +import static org.elasticsearch.ingest.IngestDocument.MetaData.ROUTING; +import static org.elasticsearch.ingest.IngestDocument.MetaData.TYPE; +import static org.elasticsearch.ingest.IngestDocument.MetaData.VERSION; +import static org.elasticsearch.ingest.IngestDocument.MetaData.VERSION_TYPE; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Mockito.mock; @@ -119,11 +125,11 @@ public void testParseWithProvidedPipeline() throws Exception { Map expectedDoc = new HashMap<>(); List fields = Arrays.asList(INDEX, TYPE, ID, ROUTING, PARENT, VERSION, VERSION_TYPE); for(IngestDocument.MetaData field : fields) { - if (field == IngestDocument.MetaData.VERSION) { + if (field == VERSION) { Long value = randomLong(); doc.put(field.getFieldName(), value); expectedDoc.put(field.getFieldName(), value); - } else if (field == IngestDocument.MetaData.VERSION_TYPE) { + } else if (field == VERSION_TYPE) { VersionType value = randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE); doc.put(field.getFieldName(), value); expectedDoc.put(field.getFieldName(), value); diff --git a/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java b/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java index 6522544c5fa57..ab9e5b33141cc 100644 --- a/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java +++ b/core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java @@ -198,13 +198,15 @@ public void testExecuteFailure() throws Exception { when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class))); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor)); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); + doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", + indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); @SuppressWarnings("unchecked") Consumer failureHandler = mock(Consumer.class); @SuppressWarnings("unchecked") Consumer completionHandler = mock(Consumer.class); executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); - verify(processor).execute(eqID("_index", "_type", "_id", indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); + verify(processor).execute(eqID("_index", "_type", "_id", + indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); verify(failureHandler, times(1)).accept(any(RuntimeException.class)); verify(completionHandler, never()).accept(anyBoolean()); } @@ -217,7 +219,8 @@ public void testExecuteSuccessWithOnFailure() throws Exception { CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor))); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, compoundProcessor)); - IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); + IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id") + .source(Collections.emptyMap()).setPipeline("_id"); doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); @SuppressWarnings("unchecked") Consumer failureHandler = mock(Consumer.class); @@ -235,14 +238,17 @@ public void testExecuteFailureWithOnFailure() throws Exception { Collections.singletonList(new CompoundProcessor(onFailureProcessor))); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, compoundProcessor)); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); - doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id", indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); + doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", + indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); + doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id", indexRequest.version(), + indexRequest.versionType(), Collections.emptyMap())); @SuppressWarnings("unchecked") Consumer failureHandler = mock(Consumer.class); @SuppressWarnings("unchecked") Consumer completionHandler = mock(Consumer.class); executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); - verify(processor).execute(eqID("_index", "_type", "_id", indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); + verify(processor).execute(eqID("_index", "_type", "_id", + indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); verify(failureHandler, times(1)).accept(any(RuntimeException.class)); verify(completionHandler, never()).accept(anyBoolean()); } @@ -256,15 +262,19 @@ public void testExecuteFailureWithNestedOnFailure() throws Exception { Collections.singletonList(onFailureOnFailureProcessor)))); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, compoundProcessor)); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - doThrow(new RuntimeException()).when(onFailureOnFailureProcessor).execute(eqID("_index", "_type", "_id", indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); - doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id", indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); - doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); + doThrow(new RuntimeException()).when(onFailureOnFailureProcessor).execute(eqID("_index", "_type", "_id", + indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); + doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id", + indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); + doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", + indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); @SuppressWarnings("unchecked") Consumer failureHandler = mock(Consumer.class); @SuppressWarnings("unchecked") Consumer completionHandler = mock(Consumer.class); executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); - verify(processor).execute(eqID("_index", "_type", "_id", indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); + verify(processor).execute(eqID("_index", "_type", "_id", + indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); verify(failureHandler, times(1)).accept(any(RuntimeException.class)); verify(completionHandler, never()).accept(anyBoolean()); } From e42a8deb004eefccb78000720b9bd9c60faa7a1b Mon Sep 17 00:00:00 2001 From: Ryan Ernst Date: Fri, 22 Dec 2017 13:09:33 -0800 Subject: [PATCH 5/9] fix more precommit --- .../org/elasticsearch/ingest/common/SetProcessorTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SetProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SetProcessorTests.java index d92b25eb023fe..e7f7f6a9e05c9 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SetProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SetProcessorTests.java @@ -119,7 +119,8 @@ public void testSetMetadata() throws Exception { if (randomMetaData == IngestDocument.MetaData.VERSION) { assertThat(ingestDocument.getFieldValue(randomMetaData.getFieldName(), Long.class), Matchers.equalTo(version)); } else if (randomMetaData == IngestDocument.MetaData.VERSION_TYPE) { - assertThat(ingestDocument.getFieldValue(randomMetaData.getFieldName(), VersionType.class), Matchers.equalTo(VersionType.fromString(versionType))); + assertThat(ingestDocument.getFieldValue(randomMetaData.getFieldName(), VersionType.class), + Matchers.equalTo(VersionType.fromString(versionType))); } else { assertThat(ingestDocument.getFieldValue(randomMetaData.getFieldName(), String.class), Matchers.equalTo("_value")); } From ee9a6dfae28b26b0bce9b7e9fbf09d1140a3ccb1 Mon Sep 17 00:00:00 2001 From: PnPie Date: Sat, 10 Feb 2018 22:57:26 +0100 Subject: [PATCH 6/9] update --- .../ingest/common/AppendProcessorTests.java | 61 ++++++++---- .../common/DateIndexNameProcessorTests.java | 10 +- .../ingest/common/ForEachProcessorTests.java | 16 ++-- .../ingest/common/SetProcessorTests.java | 25 +++-- .../rest-api-spec/test/ingest/170_version.yml | 94 +++++++++++++++++++ .../ingest/IngestDocumentMustacheIT.java | 8 +- .../ingest/ValueSourceMustacheIT.java | 2 +- .../elasticsearch/ingest/IngestDocument.java | 16 ++-- .../elasticsearch/ingest/IngestClientIT.java | 2 +- .../ingest/IngestDocumentTests.java | 2 +- .../ingest/PipelineExecutionServiceTests.java | 2 +- .../ingest/RandomDocumentPicks.java | 11 ++- 12 files changed, 188 insertions(+), 61 deletions(-) create mode 100644 modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/170_version.yml diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java index 50e20dc566376..9a10c3d830dad 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.index.VersionType; import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.IngestDocument.MetaData; import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.RandomDocumentPicks; import org.elasticsearch.ingest.TestTemplateService; @@ -123,39 +124,61 @@ public void testConvertScalarToList() throws Exception { } } - public void testAppendMetadata() throws Exception { + public void testAppendMetadataExceptVersion() throws Exception { // here any metadata field value becomes a list, which won't make sense in most of the cases, // but support for append is streamlined like for set so we test it - IngestDocument.MetaData randomMetaData = randomFrom(IngestDocument.MetaData.values()); + MetaData randomMetaData = randomFrom(MetaData.INDEX, MetaData.TYPE, MetaData.ID, MetaData.ROUTING, MetaData.PARENT); + List values = new ArrayList<>(); + Processor appendProcessor; + if (randomBoolean()) { + String value = randomAlphaOfLengthBetween(1, 10); + values.add(value); + appendProcessor = createAppendProcessor(randomMetaData.getFieldName(), value); + } else { + int valuesSize = randomIntBetween(0, 10); + for (int i = 0; i < valuesSize; i++) { + values.add(randomAlphaOfLengthBetween(1, 10)); + } + appendProcessor = createAppendProcessor(randomMetaData.getFieldName(), values); + } + + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + Object initialValue = ingestDocument.getSourceAndMetadata().get(randomMetaData.getFieldName()); + appendProcessor.execute(ingestDocument); + List list = ingestDocument.getFieldValue(randomMetaData.getFieldName(), List.class); + if (initialValue == null) { + assertThat(list, equalTo(values)); + } else { + assertThat(list.size(), equalTo(values.size() + 1)); + assertThat(list.get(0), equalTo(initialValue)); + for (int i = 1; i < list.size(); i++) { + assertThat(list.get(i), equalTo(values.get(i - 1))); + } + } + } + + public void testAppendMetadataVersion() throws Exception { + // append version or version type may not make sense in most of the cases, + // but support for append is streamlined like for set so we test it + MetaData randomMetaData = randomFrom(MetaData.VERSION, MetaData.VERSION_TYPE); List values = new ArrayList<>(); Processor appendProcessor; - int valuesSize = randomIntBetween(0, 10); - if (randomMetaData == IngestDocument.MetaData.VERSION) { + if (randomMetaData == MetaData.VERSION) { + int valuesSize = randomIntBetween(0, 10); if (randomBoolean()) { - Long version = randomLong(); + Long version = randomNonNegativeLong(); values.add(version); appendProcessor = createAppendProcessor(randomMetaData.getFieldName(), version); } else { for (int i = 0; i < valuesSize; i++) { - values.add(randomLong()); + values.add(randomNonNegativeLong()); } appendProcessor = createAppendProcessor(randomMetaData.getFieldName(), values); } - } else if (randomMetaData == IngestDocument.MetaData.VERSION_TYPE) { - String versionType = randomFrom("internal", "external", "external_gt", "external_gte"); + } else { + String versionType = randomFrom("internal", "external", "external_gte"); values.add(VersionType.fromString(versionType)); appendProcessor = createAppendProcessor(randomMetaData.getFieldName(), versionType); - } else { - if (randomBoolean()) { - String value = randomAlphaOfLengthBetween(1, 10); - values.add(value); - appendProcessor = createAppendProcessor(randomMetaData.getFieldName(), value); - } else { - for (int i = 0; i < valuesSize; i++) { - values.add(randomAlphaOfLengthBetween(1, 10)); - } - appendProcessor = createAppendProcessor(randomMetaData.getFieldName(), values); - } } IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/DateIndexNameProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/DateIndexNameProcessorTests.java index 6736594613954..d052ce0cd44c3 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/DateIndexNameProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/DateIndexNameProcessorTests.java @@ -38,7 +38,7 @@ public void testJodaPattern() throws Exception { "events-", "y", "yyyyMMdd" ); - IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null, + IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null, null, null, Collections.singletonMap("_field", "2016-04-25T12:24:20.101Z")); processor.execute(document); assertThat(document.getSourceAndMetadata().get("_index"), equalTo("")); @@ -48,7 +48,7 @@ public void testTAI64N()throws Exception { Function function = DateFormat.Tai64n.getFunction(null, DateTimeZone.UTC, null); DateIndexNameProcessor dateProcessor = new DateIndexNameProcessor("_tag", "_field", Collections.singletonList(function), DateTimeZone.UTC, "events-", "m", "yyyyMMdd"); - IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null, + IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null, null, null, Collections.singletonMap("_field", (randomBoolean() ? "@" : "") + "4000000050d506482dbdf024")); dateProcessor.execute(document); assertThat(document.getSourceAndMetadata().get("_index"), equalTo("")); @@ -58,12 +58,12 @@ public void testUnixMs()throws Exception { Function function = DateFormat.UnixMs.getFunction(null, DateTimeZone.UTC, null); DateIndexNameProcessor dateProcessor = new DateIndexNameProcessor("_tag", "_field", Collections.singletonList(function), DateTimeZone.UTC, "events-", "m", "yyyyMMdd"); - IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null, + IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null, null, null, Collections.singletonMap("_field", "1000500")); dateProcessor.execute(document); assertThat(document.getSourceAndMetadata().get("_index"), equalTo("")); - document = new IngestDocument("_index", "_type", "_id", null, null, + document = new IngestDocument("_index", "_type", "_id", null, null, null, null, Collections.singletonMap("_field", 1000500L)); dateProcessor.execute(document); assertThat(document.getSourceAndMetadata().get("_index"), equalTo("")); @@ -73,7 +73,7 @@ public void testUnix()throws Exception { Function function = DateFormat.Unix.getFunction(null, DateTimeZone.UTC, null); DateIndexNameProcessor dateProcessor = new DateIndexNameProcessor("_tag", "_field", Collections.singletonList(function), DateTimeZone.UTC, "events-", "m", "yyyyMMdd"); - IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null, + IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null, null, null, Collections.singletonMap("_field", "1000.5")); dateProcessor.execute(document); assertThat(document.getSourceAndMetadata().get("_index"), equalTo("")); diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java index c043102ef5d3a..95c25bedb6280 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java @@ -45,7 +45,7 @@ public void testExecute() throws Exception { values.add("bar"); values.add("baz"); IngestDocument ingestDocument = new IngestDocument( - "_index", "_type", "_id", null, null, Collections.singletonMap("values", values) + "_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values", values) ); ForEachProcessor processor = new ForEachProcessor( @@ -61,7 +61,7 @@ public void testExecute() throws Exception { public void testExecuteWithFailure() throws Exception { IngestDocument ingestDocument = new IngestDocument( - "_index", "_type", "_id", null, null, Collections.singletonMap("values", Arrays.asList("a", "b", "c")) + "_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values", Arrays.asList("a", "b", "c")) ); TestProcessor testProcessor = new TestProcessor(id -> { @@ -101,7 +101,7 @@ public void testMetaDataAvailable() throws Exception { values.add(new HashMap<>()); values.add(new HashMap<>()); IngestDocument ingestDocument = new IngestDocument( - "_index", "_type", "_id", null, null, Collections.singletonMap("values", values) + "_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values", values) ); TestProcessor innerProcessor = new TestProcessor(id -> { @@ -132,7 +132,7 @@ public void testRestOfTheDocumentIsAvailable() throws Exception { document.put("values", values); document.put("flat_values", new ArrayList<>()); document.put("other", "value"); - IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", null, null, document); + IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", null, null, null, null, document); ForEachProcessor processor = new ForEachProcessor( "_tag", "values", new SetProcessor("_tag", @@ -171,7 +171,7 @@ public String getTag() { values.add(""); } IngestDocument ingestDocument = new IngestDocument( - "_index", "_type", "_id", null, null, Collections.singletonMap("values", values) + "_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values", values) ); ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor); @@ -190,7 +190,7 @@ public void testModifyFieldsOutsideArray() throws Exception { values.add(1); values.add(null); IngestDocument ingestDocument = new IngestDocument( - "_index", "_type", "_id", null, null, Collections.singletonMap("values", values) + "_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values", values) ); TemplateScript.Factory template = new TestTemplateService.MockTemplateScript.Factory("errors"); @@ -220,7 +220,7 @@ public void testScalarValueAllowsUnderscoreValueFieldToRemainAccessible() throws source.put("_value", "new_value"); source.put("values", values); IngestDocument ingestDocument = new IngestDocument( - "_index", "_type", "_id", null, null, source + "_index", "_type", "_id", null, null, null, null, source ); TestProcessor processor = new TestProcessor(doc -> doc.setFieldValue("_ingest._value", @@ -251,7 +251,7 @@ public void testNestedForEach() throws Exception { values.add(value); IngestDocument ingestDocument = new IngestDocument( - "_index", "_type", "_id", null, null, Collections.singletonMap("values1", values) + "_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values1", values) ); TestProcessor testProcessor = new TestProcessor( diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SetProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SetProcessorTests.java index e7f7f6a9e05c9..42488ee1a2d4d 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SetProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SetProcessorTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.index.VersionType; import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.IngestDocument.MetaData; import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.RandomDocumentPicks; import org.elasticsearch.ingest.TestTemplateService; @@ -100,29 +101,33 @@ public void testSetExistingNullFieldWithOverrideDisabled() throws Exception { assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(newValue)); } - public void testSetMetadata() throws Exception { - IngestDocument.MetaData randomMetaData = randomFrom(IngestDocument.MetaData.values()); + public void testSetMetadataExceptVersion() throws Exception { + MetaData randomMetaData = randomFrom(MetaData.INDEX, MetaData.TYPE, MetaData.ID, MetaData.ROUTING, MetaData.PARENT); + Processor processor = createSetProcessor(randomMetaData.getFieldName(), "_value", true); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + processor.execute(ingestDocument); + assertThat(ingestDocument.getFieldValue(randomMetaData.getFieldName(), String.class), Matchers.equalTo("_value")); + } + + public void testSetMetadataVersion() throws Exception { + MetaData randomMetaData = randomFrom(MetaData.VERSION, MetaData.VERSION_TYPE); Processor processor; long version = 0L; String versionType = new String(); if (randomMetaData == IngestDocument.MetaData.VERSION) { - version = randomLong(); + version = randomNonNegativeLong(); processor = createSetProcessor(randomMetaData.getFieldName(), version, true); - } else if (randomMetaData == IngestDocument.MetaData.VERSION_TYPE) { - versionType = randomFrom("internal", "external", "external_gt", "external_gte"); - processor = createSetProcessor(randomMetaData.getFieldName(), versionType, true); } else { - processor = createSetProcessor(randomMetaData.getFieldName(), "_value", true); + versionType = randomFrom("internal", "external", "external_gte"); + processor = createSetProcessor(randomMetaData.getFieldName(), versionType, true); } IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); processor.execute(ingestDocument); if (randomMetaData == IngestDocument.MetaData.VERSION) { assertThat(ingestDocument.getFieldValue(randomMetaData.getFieldName(), Long.class), Matchers.equalTo(version)); - } else if (randomMetaData == IngestDocument.MetaData.VERSION_TYPE) { + } else { assertThat(ingestDocument.getFieldValue(randomMetaData.getFieldName(), VersionType.class), Matchers.equalTo(VersionType.fromString(versionType))); - } else { - assertThat(ingestDocument.getFieldValue(randomMetaData.getFieldName(), String.class), Matchers.equalTo("_value")); } } diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/170_version.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/170_version.yml new file mode 100644 index 0000000000000..c0795bf14d7fa --- /dev/null +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/170_version.yml @@ -0,0 +1,94 @@ +--- +teardown: + - do: + ingest.delete_pipeline: + id: "my_pipeline" + ignore: 404 + +--- +"Test set document version & version type": + - do: + cluster.health: + wait_for_status: green + + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "set" : { + "field" : "_version", + "value": 3 + } + }, + { + "set" : { + "field" : "_version_type", + "value": "external_gte" + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + type: test + id: 1 + pipeline: "my_pipeline" + body: {} + + - do: + get: + index: test + type: test + id: 1 + - match: { _version: 3 } + +--- +"Test append document version & version type": + - do: + cluster.health: + wait_for_status: green + + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "append" : { + "field" : "_version", + "value": [3, 6, 9] + } + }, + { + "append" : { + "field" : "_version_type", + "value":["external", "external_gte"] + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + type: test + id: 1 + pipeline: "my_pipeline" + body: {} + + - do: + get: + index: test + type: test + id: 1 + - match: { _version: [1, 3, 6, 9] } diff --git a/qa/smoke-test-ingest-with-all-dependencies/src/test/java/org/elasticsearch/ingest/IngestDocumentMustacheIT.java b/qa/smoke-test-ingest-with-all-dependencies/src/test/java/org/elasticsearch/ingest/IngestDocumentMustacheIT.java index cbe798666129a..16dbbc6f8cbab 100644 --- a/qa/smoke-test-ingest-with-all-dependencies/src/test/java/org/elasticsearch/ingest/IngestDocumentMustacheIT.java +++ b/qa/smoke-test-ingest-with-all-dependencies/src/test/java/org/elasticsearch/ingest/IngestDocumentMustacheIT.java @@ -33,7 +33,7 @@ public class IngestDocumentMustacheIT extends AbstractScriptTestCase { public void testAccessMetaDataViaTemplate() { Map document = new HashMap<>(); document.put("foo", "bar"); - IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, document); + IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, document); ingestDocument.setFieldValue(compile("field1"), ValueSource.wrap("1 {{foo}}", scriptService)); assertThat(ingestDocument.getFieldValue("field1", String.class), equalTo("1 bar")); @@ -48,7 +48,7 @@ public void testAccessMapMetaDataViaTemplate() { innerObject.put("baz", "hello baz"); innerObject.put("qux", Collections.singletonMap("fubar", "hello qux and fubar")); document.put("foo", innerObject); - IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, document); + IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, document); ingestDocument.setFieldValue(compile("field1"), ValueSource.wrap("1 {{foo.bar}} {{foo.baz}} {{foo.qux.fubar}}", scriptService)); assertThat(ingestDocument.getFieldValue("field1", String.class), equalTo("1 hello bar hello baz hello qux and fubar")); @@ -67,7 +67,7 @@ public void testAccessListMetaDataViaTemplate() { list.add(value); list.add(null); document.put("list2", list); - IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, document); + IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, document); ingestDocument.setFieldValue(compile("field1"), ValueSource.wrap("1 {{list1.0}} {{list2.0}}", scriptService)); assertThat(ingestDocument.getFieldValue("field1", String.class), equalTo("1 foo {field=value}")); } @@ -77,7 +77,7 @@ public void testAccessIngestMetadataViaTemplate() { Map ingestMap = new HashMap<>(); ingestMap.put("timestamp", "bogus_timestamp"); document.put("_ingest", ingestMap); - IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, document); + IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, document); ingestDocument.setFieldValue(compile("ingest_timestamp"), ValueSource.wrap("{{_ingest.timestamp}} and {{_source._ingest.timestamp}}", scriptService)); assertThat(ingestDocument.getFieldValue("ingest_timestamp", String.class), diff --git a/qa/smoke-test-ingest-with-all-dependencies/src/test/java/org/elasticsearch/ingest/ValueSourceMustacheIT.java b/qa/smoke-test-ingest-with-all-dependencies/src/test/java/org/elasticsearch/ingest/ValueSourceMustacheIT.java index a8c7afcec6fee..a80b693851fc1 100644 --- a/qa/smoke-test-ingest-with-all-dependencies/src/test/java/org/elasticsearch/ingest/ValueSourceMustacheIT.java +++ b/qa/smoke-test-ingest-with-all-dependencies/src/test/java/org/elasticsearch/ingest/ValueSourceMustacheIT.java @@ -64,7 +64,7 @@ public void testValueSourceWithTemplates() { } public void testAccessSourceViaTemplate() { - IngestDocument ingestDocument = new IngestDocument("marvel", "type", "id", null, null, new HashMap<>()); + IngestDocument ingestDocument = new IngestDocument("marvel", "type", "id", null, null, null, null, new HashMap<>()); assertThat(ingestDocument.hasField("marvel"), is(false)); ingestDocument.setFieldValue(compile("{{_index}}"), ValueSource.wrap("{{_index}}", scriptService)); assertThat(ingestDocument.getFieldValue("marvel", String.class), equalTo("marvel")); diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 2b2497277d764..2f56645878ef1 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -56,7 +56,8 @@ public final class IngestDocument { private final Map sourceAndMetadata; private final Map ingestMetadata; - public IngestDocument(String index, String type, String id, String routing, String parent, Map source) { + public IngestDocument(String index, String type, String id, String routing, String parent, + Long version, VersionType versionType, Map source) { this.sourceAndMetadata = new HashMap<>(); this.sourceAndMetadata.putAll(source); this.sourceAndMetadata.put(MetaData.INDEX.getFieldName(), index); @@ -68,20 +69,15 @@ public IngestDocument(String index, String type, String id, String routing, Stri if (parent != null) { this.sourceAndMetadata.put(MetaData.PARENT.getFieldName(), parent); } - - this.ingestMetadata = new HashMap<>(); - this.ingestMetadata.put(TIMESTAMP, ZonedDateTime.now(ZoneOffset.UTC)); - } - - public IngestDocument(String index, String type, String id, String routing,String parent, - Long version, VersionType versionType,Map source) { - this(index, type, id, routing, parent, source); if (version != null) { sourceAndMetadata.put(MetaData.VERSION.getFieldName(), version); } if (versionType != null) { sourceAndMetadata.put(MetaData.VERSION_TYPE.getFieldName(), versionType); } + + this.ingestMetadata = new HashMap<>(); + this.ingestMetadata.put(TIMESTAMP, ZonedDateTime.now(ZoneOffset.UTC)); } /** @@ -631,7 +627,7 @@ private static Object deepCopy(Object value) { } else if (value == null || value instanceof String || value instanceof Integer || value instanceof Long || value instanceof Float || value instanceof Double || value instanceof Boolean || - value instanceof ZonedDateTime) { + value instanceof ZonedDateTime || value instanceof VersionType) { return value; } else if (value instanceof Date) { return ((Date) value).clone(); diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java b/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java index 654927b19f2fb..dbbc8e443c076 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java @@ -127,7 +127,7 @@ public void testSimulate() throws Exception { source.put("foo", "bar"); source.put("fail", false); source.put("processed", true); - IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, source); + IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, source); assertThat(simulateDocumentBaseResult.getIngestDocument().getSourceAndMetadata(), equalTo(ingestDocument.getSourceAndMetadata())); assertThat(simulateDocumentBaseResult.getFailure(), nullValue()); } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java index 9df2a38c6f14b..04285b3432e12 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java @@ -76,7 +76,7 @@ public void setTestIngestDocument() { list.add(null); document.put("list", list); - ingestDocument = new IngestDocument("index", "type", "id", null, null, document); + ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, document); } public void testSimpleGetFieldValue() { diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java index ab9e5b33141cc..3247761a548f0 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java @@ -409,7 +409,7 @@ private class IngestDocumentMatcher extends ArgumentMatcher { private final IngestDocument ingestDocument; IngestDocumentMatcher(String index, String type, String id, Map source) { - this.ingestDocument = new IngestDocument(index, type, id, null, null, source); + this.ingestDocument = new IngestDocument(index, type, id, null, null, null, null, source); } IngestDocumentMatcher(String index, String type, String id, Long version, VersionType versionType, Map source) { diff --git a/test/framework/src/main/java/org/elasticsearch/ingest/RandomDocumentPicks.java b/test/framework/src/main/java/org/elasticsearch/ingest/RandomDocumentPicks.java index 0def04a79eaf1..cad7b388430bb 100644 --- a/test/framework/src/main/java/org/elasticsearch/ingest/RandomDocumentPicks.java +++ b/test/framework/src/main/java/org/elasticsearch/ingest/RandomDocumentPicks.java @@ -22,6 +22,7 @@ import com.carrotsearch.randomizedtesting.generators.RandomNumbers; import com.carrotsearch.randomizedtesting.generators.RandomPicks; import com.carrotsearch.randomizedtesting.generators.RandomStrings; +import org.elasticsearch.index.VersionType; import java.util.ArrayList; import java.util.HashMap; @@ -138,6 +139,9 @@ public static IngestDocument randomIngestDocument(Random random, Map randomSource(Random random) { @@ -219,6 +223,11 @@ public static String randomString(Random random) { return RandomStrings.randomUnicodeOfCodepointLengthBetween(random, 1, 10); } + private static Long randomNonNegtiveLong(Random random) { + long randomLong = random.nextLong(); + return randomLong == Long.MIN_VALUE ? 0 : Math.abs(randomLong); + } + private static void addRandomFields(Random random, Map parentNode, int currentDepth) { if (currentDepth > 5) { return; From 4b1ad020a18cc9e492c6d110138f8b8efe6e74cc Mon Sep 17 00:00:00 2001 From: PnPie Date: Tue, 13 Feb 2018 21:03:49 +0100 Subject: [PATCH 7/9] Update remove version/version type tests for append processor complete yml tests --- .../ingest/common/AppendProcessorTests.java | 39 ---------------- .../rest-api-spec/test/ingest/170_version.yml | 46 ++++++------------- 2 files changed, 15 insertions(+), 70 deletions(-) diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java index 9a10c3d830dad..66cddd43e6583 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java @@ -157,45 +157,6 @@ public void testAppendMetadataExceptVersion() throws Exception { } } - public void testAppendMetadataVersion() throws Exception { - // append version or version type may not make sense in most of the cases, - // but support for append is streamlined like for set so we test it - MetaData randomMetaData = randomFrom(MetaData.VERSION, MetaData.VERSION_TYPE); - List values = new ArrayList<>(); - Processor appendProcessor; - if (randomMetaData == MetaData.VERSION) { - int valuesSize = randomIntBetween(0, 10); - if (randomBoolean()) { - Long version = randomNonNegativeLong(); - values.add(version); - appendProcessor = createAppendProcessor(randomMetaData.getFieldName(), version); - } else { - for (int i = 0; i < valuesSize; i++) { - values.add(randomNonNegativeLong()); - } - appendProcessor = createAppendProcessor(randomMetaData.getFieldName(), values); - } - } else { - String versionType = randomFrom("internal", "external", "external_gte"); - values.add(VersionType.fromString(versionType)); - appendProcessor = createAppendProcessor(randomMetaData.getFieldName(), versionType); - } - - IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); - Object initialValue = ingestDocument.getSourceAndMetadata().get(randomMetaData.getFieldName()); - appendProcessor.execute(ingestDocument); - List list = ingestDocument.getFieldValue(randomMetaData.getFieldName(), List.class); - if (initialValue == null) { - assertThat(list, equalTo(values)); - } else { - assertThat(list.size(), equalTo(values.size() + 1)); - assertThat(list.get(0), equalTo(initialValue)); - for (int i = 1; i < list.size(); i++) { - assertThat(list.get(i), equalTo(values.get(i - 1))); - } - } - } - private static Processor createAppendProcessor(String fieldName, Object fieldValue) { return new AppendProcessor(randomAlphaOfLength(10), new TestTemplateService.MockTemplateScript.Factory(fieldName), diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/170_version.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/170_version.yml index c0795bf14d7fa..7abf07baa9b3a 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/170_version.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/170_version.yml @@ -27,7 +27,7 @@ teardown: { "set" : { "field" : "_version_type", - "value": "external_gte" + "value": "external" } } ] @@ -41,54 +41,38 @@ teardown: id: 1 pipeline: "my_pipeline" body: {} + - match: { _version: 3 } - do: get: index: test type: test id: 1 - - match: { _version: 3 } - ---- -"Test append document version & version type": - - do: - cluster.health: - wait_for_status: green + version: 3 + - match: { _id: "1" } - do: - ingest.put_pipeline: - id: "my_pipeline" - body: > - { - "description": "_description", - "processors": [ - { - "append" : { - "field" : "_version", - "value": [3, 6, 9] - } - }, - { - "append" : { - "field" : "_version_type", - "value":["external", "external_gte"] - } - } - ] - } - - match: { acknowledged: true } + catch: conflict + index: + index: test + type: test + id: 1 + version: 3 + body: {} - do: index: index: test type: test id: 1 - pipeline: "my_pipeline" + version: 5 body: {} + - match: { _version: 5 } - do: get: index: test type: test id: 1 - - match: { _version: [1, 3, 6, 9] } + version: 5 + - match: { _id: "1" } From 10e41bd4b88994fffa8cc9f1113c3aae58804f3b Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 16 Feb 2018 16:52:25 +0100 Subject: [PATCH 8/9] Threat version_type as a string, so that we can (de)serialize it without hassle. When applying the version to the IndexRequest just cast to Number and fetch its long value. Simplify yaml versions to just tests to see that both version and version_type can be used from an ingest pipeline. --- .../ingest/common/SetProcessorTests.java | 28 ++++------ .../rest-api-spec/test/ingest/170_version.yml | 56 +++++++++---------- .../ingest/SimulatePipelineRequest.java | 3 +- .../elasticsearch/ingest/IngestDocument.java | 14 +---- .../ingest/PipelineExecutionService.java | 6 +- .../SimulatePipelineRequestParsingTests.java | 4 +- 6 files changed, 49 insertions(+), 62 deletions(-) diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SetProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SetProcessorTests.java index 42488ee1a2d4d..6fec977e6c268 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SetProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SetProcessorTests.java @@ -110,25 +110,19 @@ public void testSetMetadataExceptVersion() throws Exception { } public void testSetMetadataVersion() throws Exception { - MetaData randomMetaData = randomFrom(MetaData.VERSION, MetaData.VERSION_TYPE); - Processor processor; - long version = 0L; - String versionType = new String(); - if (randomMetaData == IngestDocument.MetaData.VERSION) { - version = randomNonNegativeLong(); - processor = createSetProcessor(randomMetaData.getFieldName(), version, true); - } else { - versionType = randomFrom("internal", "external", "external_gte"); - processor = createSetProcessor(randomMetaData.getFieldName(), versionType, true); - } + long version = randomNonNegativeLong(); + Processor processor = createSetProcessor(MetaData.VERSION.getFieldName(), version, true); IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); processor.execute(ingestDocument); - if (randomMetaData == IngestDocument.MetaData.VERSION) { - assertThat(ingestDocument.getFieldValue(randomMetaData.getFieldName(), Long.class), Matchers.equalTo(version)); - } else { - assertThat(ingestDocument.getFieldValue(randomMetaData.getFieldName(), VersionType.class), - Matchers.equalTo(VersionType.fromString(versionType))); - } + assertThat(ingestDocument.getFieldValue(MetaData.VERSION.getFieldName(), Long.class), Matchers.equalTo(version)); + } + + public void testSetMetadataVersionType() throws Exception { + String versionType = randomFrom("internal", "external", "external_gte"); + Processor processor = createSetProcessor(MetaData.VERSION_TYPE.getFieldName(), versionType, true); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + processor.execute(ingestDocument); + assertThat(ingestDocument.getFieldValue(MetaData.VERSION_TYPE.getFieldName(), String.class), Matchers.equalTo(versionType)); } private static Processor createSetProcessor(String fieldName, Object fieldValue, boolean overrideEnabled) { diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/170_version.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/170_version.yml index 7abf07baa9b3a..10c80c8e30525 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/170_version.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/170_version.yml @@ -13,7 +13,7 @@ teardown: - do: ingest.put_pipeline: - id: "my_pipeline" + id: "my_pipeline1" body: > { "description": "_description", @@ -21,13 +21,13 @@ teardown: { "set" : { "field" : "_version", - "value": 3 + "value": 1 } }, { "set" : { "field" : "_version_type", - "value": "external" + "value": "internal" } } ] @@ -35,21 +35,27 @@ teardown: - match: { acknowledged: true } - do: - index: - index: test - type: test - id: 1 - pipeline: "my_pipeline" - body: {} - - match: { _version: 3 } - - - do: - get: - index: test - type: test - id: 1 - version: 3 - - match: { _id: "1" } + ingest.put_pipeline: + id: "my_pipeline2" + body: > + { + "description": "_description", + "processors": [ + { + "set" : { + "field" : "_version", + "value": 1 + } + }, + { + "set" : { + "field" : "_version_type", + "value": "external" + } + } + ] + } + - match: { acknowledged: true } - do: catch: conflict @@ -57,7 +63,7 @@ teardown: index: test type: test id: 1 - version: 3 + pipeline: "my_pipeline1" body: {} - do: @@ -65,14 +71,6 @@ teardown: index: test type: test id: 1 - version: 5 + pipeline: "my_pipeline2" body: {} - - match: { _version: 5 } - - - do: - get: - index: test - type: test - id: 1 - version: 5 - - match: { _id: "1" } + - match: { _version: 1 } diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java index 4ac82f63e1b6c..170f0bc8518cf 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java @@ -202,7 +202,8 @@ private static List parseDocs(Map config) { } VersionType versionType = null; if (dataMap.containsKey(MetaData.VERSION_TYPE.getFieldName())) { - versionType = (VersionType) ConfigurationUtils.readObject(null, null, dataMap, MetaData.VERSION_TYPE.getFieldName()); + versionType = VersionType.fromString(ConfigurationUtils.readStringProperty(null, null, dataMap, + MetaData.VERSION_TYPE.getFieldName())); } IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, parent, version, versionType, document); diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 2f56645878ef1..45d0e8930d750 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -73,7 +73,7 @@ public IngestDocument(String index, String type, String id, String routing, Stri sourceAndMetadata.put(MetaData.VERSION.getFieldName(), version); } if (versionType != null) { - sourceAndMetadata.put(MetaData.VERSION_TYPE.getFieldName(), versionType); + sourceAndMetadata.put(MetaData.VERSION_TYPE.getFieldName(), VersionType.toString(versionType)); } this.ingestMetadata = new HashMap<>(); @@ -464,9 +464,6 @@ private void setFieldValue(String path, Object value, boolean append) { } String leafKey = fieldPath.pathElements[fieldPath.pathElements.length - 1]; - if (leafKey == MetaData.VERSION_TYPE.getFieldName()) { - value = VersionType.fromString(value.toString()); - } if (context == null) { throw new IllegalArgumentException("cannot set [" + leafKey + "] with null parent as part of path [" + path + "]"); } @@ -572,14 +569,7 @@ private Map createTemplateModel() { public Map extractMetadata() { Map metadataMap = new EnumMap<>(MetaData.class); for (MetaData metaData : MetaData.values()) { - if (metaData == MetaData.VERSION) { - metadataMap.put(metaData, cast(metaData.getFieldName(), sourceAndMetadata.remove(metaData.getFieldName()), Long.class)); - } else if (metaData == MetaData.VERSION_TYPE) { - metadataMap.put(metaData, - cast(metaData.getFieldName(), sourceAndMetadata.remove(metaData.getFieldName()), VersionType.class)); - } else { - metadataMap.put(metaData, cast(metaData.getFieldName(), sourceAndMetadata.remove(metaData.getFieldName()), String.class)); - } + metadataMap.put(metaData, sourceAndMetadata.remove(metaData.getFieldName())); } return metadataMap; } diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java b/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java index 3d4e108a275b1..31bedd4ee1777 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java @@ -179,8 +179,10 @@ private void innerExecute(IndexRequest indexRequest, Pipeline pipeline) throws E indexRequest.id((String) metadataMap.get(IngestDocument.MetaData.ID)); indexRequest.routing((String) metadataMap.get(IngestDocument.MetaData.ROUTING)); indexRequest.parent((String) metadataMap.get(IngestDocument.MetaData.PARENT)); - indexRequest.version((Long) metadataMap.get(IngestDocument.MetaData.VERSION)); - indexRequest.versionType((VersionType) metadataMap.get(IngestDocument.MetaData.VERSION_TYPE)); + indexRequest.version(((Number) metadataMap.get(IngestDocument.MetaData.VERSION)).longValue()); + if (metadataMap.get(IngestDocument.MetaData.VERSION_TYPE) != null) { + indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.MetaData.VERSION_TYPE))); + } indexRequest.source(ingestDocument.getSourceAndMetadata()); } catch (Exception e) { totalStats.ingestFailed(); diff --git a/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java b/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java index e42f4350b32e9..00815807eee8a 100644 --- a/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java +++ b/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java @@ -130,7 +130,9 @@ public void testParseWithProvidedPipeline() throws Exception { doc.put(field.getFieldName(), value); expectedDoc.put(field.getFieldName(), value); } else if (field == VERSION_TYPE) { - VersionType value = randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE); + String value = VersionType.toString( + randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE) + ); doc.put(field.getFieldName(), value); expectedDoc.put(field.getFieldName(), value); } else { From 8799107d9e38a54dd5a7c96d3643fddfe1950025 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 20 Feb 2018 10:16:34 +0100 Subject: [PATCH 9/9] removed unneeded changes --- .../org/elasticsearch/index/mapper/VersionFieldMapper.java | 4 +--- .../main/java/org/elasticsearch/ingest/IngestDocument.java | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/mapper/VersionFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/VersionFieldMapper.java index 5f30d8dc7dee8..bedb98e2126ac 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/VersionFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/VersionFieldMapper.java @@ -69,9 +69,7 @@ public MetadataFieldMapper getDefault(MappedFieldType fieldType, ParserContext c } } - public static final class VersionFieldType extends MappedFieldType { - - public static final String NAME = "_version_type"; + static final class VersionFieldType extends MappedFieldType { VersionFieldType() { } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 45d0e8930d750..89e945780c8f5 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -617,7 +617,7 @@ private static Object deepCopy(Object value) { } else if (value == null || value instanceof String || value instanceof Integer || value instanceof Long || value instanceof Float || value instanceof Double || value instanceof Boolean || - value instanceof ZonedDateTime || value instanceof VersionType) { + value instanceof ZonedDateTime) { return value; } else if (value instanceof Date) { return ((Date) value).clone(); @@ -658,7 +658,7 @@ public enum MetaData { ROUTING(RoutingFieldMapper.NAME), PARENT(ParentFieldMapper.NAME), VERSION(VersionFieldMapper.NAME), - VERSION_TYPE(VersionFieldMapper.VersionFieldType.NAME); + VERSION_TYPE("_version_type"); private final String fieldName;