diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java index 5d6ec6f93b732..496f8efc60ccf 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java @@ -444,26 +444,22 @@ private void addEmptyBuckets(List list, ReduceContext reduceContext) { @Override public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { List reducedBuckets = reduceBuckets(aggregations, reduceContext); - - // adding empty buckets if needed - if (minDocCount == 0) { - addEmptyBuckets(reducedBuckets, reduceContext); - } - - if (InternalOrder.isKeyAsc(order) || reduceContext.isFinalReduce() == false) { - // nothing to do, data are already sorted since shards return - // sorted buckets and the merge-sort performed by reduceBuckets - // maintains order - } else if (InternalOrder.isKeyDesc(order)) { - // we just need to reverse here... - List reverse = new ArrayList<>(reducedBuckets); - Collections.reverse(reverse); - reducedBuckets = reverse; - } else { - // sorted by compound order or sub-aggregation, need to fall back to a costly n*log(n) sort - CollectionUtil.introSort(reducedBuckets, order.comparator(null)); + if (reduceContext.isFinalReduce()) { + if (minDocCount == 0) { + addEmptyBuckets(reducedBuckets, reduceContext); + } + if (InternalOrder.isKeyDesc(order)) { + // we just need to reverse here... + List reverse = new ArrayList<>(reducedBuckets); + Collections.reverse(reverse); + reducedBuckets = reverse; + } else if (InternalOrder.isKeyAsc(order) == false){ + // nothing to do when sorting by key ascending, as data is already sorted since shards return + // sorted buckets and the merge-sort performed by reduceBuckets maintains order. + // otherwise, sorted by compound order or sub-aggregation, we need to fall back to a costly n*log(n) sort + CollectionUtil.introSort(reducedBuckets, order.comparator(null)); + } } - return new InternalDateHistogram(getName(), reducedBuckets, order, minDocCount, offset, emptyBucketInfo, format, keyed, pipelineAggregators(), getMetaData()); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java index d26ac47c9ea25..9f93929c0a186 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java @@ -421,26 +421,22 @@ private void addEmptyBuckets(List list, ReduceContext reduceContext) { @Override public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { List reducedBuckets = reduceBuckets(aggregations, reduceContext); - - // adding empty buckets if needed - if (minDocCount == 0) { - addEmptyBuckets(reducedBuckets, reduceContext); - } - - if (InternalOrder.isKeyAsc(order) || reduceContext.isFinalReduce() == false) { - // nothing to do, data are already sorted since shards return - // sorted buckets and the merge-sort performed by reduceBuckets - // maintains order - } else if (InternalOrder.isKeyDesc(order)) { - // we just need to reverse here... - List reverse = new ArrayList<>(reducedBuckets); - Collections.reverse(reverse); - reducedBuckets = reverse; - } else { - // sorted by compound order or sub-aggregation, need to fall back to a costly n*log(n) sort - CollectionUtil.introSort(reducedBuckets, order.comparator(null)); + if (reduceContext.isFinalReduce()) { + if (minDocCount == 0) { + addEmptyBuckets(reducedBuckets, reduceContext); + } + if (InternalOrder.isKeyDesc(order)) { + // we just need to reverse here... + List reverse = new ArrayList<>(reducedBuckets); + Collections.reverse(reverse); + reducedBuckets = reverse; + } else if (InternalOrder.isKeyAsc(order) == false){ + // nothing to do when sorting by key ascending, as data is already sorted since shards return + // sorted buckets and the merge-sort performed by reduceBuckets maintains order. + // otherwise, sorted by compound order or sub-aggregation, we need to fall back to a costly n*log(n) sort + CollectionUtil.introSort(reducedBuckets, order.comparator(null)); + } } - return new InternalHistogram(getName(), reducedBuckets, order, minDocCount, emptyBucketInfo, format, keyed, pipelineAggregators(), getMetaData()); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java index 6e4dfc8fe254c..59327121c9038 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java @@ -119,19 +119,19 @@ import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; -import org.elasticsearch.search.aggregations.pipeline.ParsedSimpleValue; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.ExtendedStatsBucketPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.InternalBucketMetricValue; +import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; import org.elasticsearch.search.aggregations.pipeline.ParsedBucketMetricValue; +import org.elasticsearch.search.aggregations.pipeline.ParsedDerivative; +import org.elasticsearch.search.aggregations.pipeline.ParsedExtendedStatsBucket; import org.elasticsearch.search.aggregations.pipeline.ParsedPercentilesBucket; -import org.elasticsearch.search.aggregations.pipeline.PercentilesBucketPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.ParsedSimpleValue; import org.elasticsearch.search.aggregations.pipeline.ParsedStatsBucket; +import org.elasticsearch.search.aggregations.pipeline.PercentilesBucketPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.ExtendedStatsBucketPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.ParsedExtendedStatsBucket; -import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.ParsedDerivative; import java.io.IOException; import java.util.ArrayList; @@ -151,6 +151,7 @@ import static org.elasticsearch.test.XContentTestUtils.insertRandomFields; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; public abstract class InternalAggregationTestCase extends AbstractWireSerializingTestCase { public static final int DEFAULT_MAX_BUCKETS = 100000; @@ -267,7 +268,14 @@ public void testReduceRandom() { new InternalAggregation.ReduceContext(bigArrays, mockScriptService, bucketConsumer,false); @SuppressWarnings("unchecked") T reduced = (T) inputs.get(0).reduce(internalAggregations, context); - assertMultiBucketConsumer(reduced, bucketConsumer); + int initialBucketCount = 0; + for (InternalAggregation internalAggregation : internalAggregations) { + initialBucketCount += countInnerBucket(internalAggregation); + } + int reducedBucketCount = countInnerBucket(reduced); + //check that non final reduction never adds buckets + assertThat(reducedBucketCount, lessThanOrEqualTo(initialBucketCount)); + assertMultiBucketConsumer(reducedBucketCount, bucketConsumer); toReduce = new ArrayList<>(toReduce.subList(r, toReduceSize)); toReduce.add(reduced); } @@ -332,14 +340,14 @@ protected NamedXContentRegistry xContentRegistry() { public final void testFromXContent() throws IOException { final T aggregation = createTestInstance(); - final Aggregation parsedAggregation = parseAndAssert(aggregation, randomBoolean(), false); - assertFromXContent(aggregation, (ParsedAggregation) parsedAggregation); + final ParsedAggregation parsedAggregation = parseAndAssert(aggregation, randomBoolean(), false); + assertFromXContent(aggregation, parsedAggregation); } public final void testFromXContentWithRandomFields() throws IOException { final T aggregation = createTestInstance(); - final Aggregation parsedAggregation = parseAndAssert(aggregation, randomBoolean(), true); - assertFromXContent(aggregation, (ParsedAggregation) parsedAggregation); + final ParsedAggregation parsedAggregation = parseAndAssert(aggregation, randomBoolean(), true); + assertFromXContent(aggregation, parsedAggregation); } protected abstract void assertFromXContent(T aggregation, ParsedAggregation parsedAggregation) throws IOException; @@ -423,6 +431,10 @@ protected static DocValueFormat randomNumericDocValueFormat() { } public static void assertMultiBucketConsumer(Aggregation agg, MultiBucketConsumer bucketConsumer) { - assertThat(bucketConsumer.getCount(), equalTo(countInnerBucket(agg))); + assertMultiBucketConsumer(countInnerBucket(agg), bucketConsumer); + } + + private static void assertMultiBucketConsumer(int innerBucketCount, MultiBucketConsumer bucketConsumer) { + assertThat(bucketConsumer.getCount(), equalTo(innerBucketCount)); } }