Merged
30 changes: 30 additions & 0 deletions docs/reference/aggregations/bucket/composite-aggregation.asciidoc
@@ -51,6 +51,8 @@ POST /sales/docs/_bulk?refresh
{"product": "mad max", "price": "27", "timestamp": "2017-05-10T07:07"}
{"index":{"_id":4}}
{"product": "apocalypse now", "price": "10", "timestamp": "2017-05-11T08:35"}
{"index":{"_id":5}}
{"product": "apocalypse now", "price": "10", "timestamp": "2017-05-11T08:35"}
-------------------------------------------------
// NOTCONSOLE
// TESTSETUP
@@ -348,6 +350,34 @@ GET /_search
\... will sort the composite bucket in descending order when comparing values from the `date_histogram` source
and in ascending order when comparing values from the `terms` source.

====== Missing bucket

Contributor: The option is missing_bucket below, is this a typo?
Author: Yes, this is a typo, thanks.

By default documents without a value for a given source are ignored.
It is possible to include them in the response by setting `missing_bucket` to
`true` (defaults to `false`):

[source,js]
--------------------------------------------------
GET /_search
{
"aggs" : {
"my_buckets": {
"composite" : {
"sources" : [
{ "product_name": { "terms" : { "field": "product", "missing_bucket": true } } }
]
}
}
}
}
--------------------------------------------------
// CONSOLE

In the example above the source `product_name` will emit an explicit `null` value
for documents without a value for the field `product`.
The `order` specified in the source dictates whether the `null` values should rank
first (ascending order, `asc`) or last (descending order, `desc`).
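
Under that ordering rule, the `null` bucket ranks first for an ascending `terms` source. A sketch of the shape such a response might take (the `doc_count` values are illustrative, not computed from the dataset above):

[source,js]
--------------------------------------------------
{
  "aggregations": {
    "my_buckets": {
      "buckets": [
        { "key": { "product_name": null },             "doc_count": 1 },
        { "key": { "product_name": "apocalypse now" }, "doc_count": 2 },
        { "key": { "product_name": "mad max" },        "doc_count": 1 }
      ]
    }
  }
}
--------------------------------------------------
// NOTCONSOLE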

Contributor: I think somewhere in the docs we need to say that the `missing` option is deprecated and will be removed in favour of this.
Author: Yes, I plan to add the deprecation to the docs during the backport to 6.x, since the deprecation is not for master. After the backport to 6.x I'll remove the `missing` option and add a note to the breaking changes.

==== Size

The `size` parameter can be set to define how many composite buckets should be returned.
@@ -323,3 +323,32 @@ setup:
- length: { aggregations.test.buckets: 2 }
- length: { aggregations.test.after_key: 1 }
- match: { aggregations.test.after_key.keyword: "foo" }

---
"Composite aggregation and array size":
- skip:
version: " - 6.99.99"
reason: starting in 7.0 the composite sources do not allocate arrays eagerly.

- do:
search:
index: test
body:
aggregations:
test:
composite:
size: 1000000000
sources: [
{
"keyword": {
"terms": {
"field": "keyword"
}
}
}
]

- match: {hits.total: 6}
- length: { aggregations.test.buckets: 2 }
- length: { aggregations.test.after_key: 1 }
- match: { aggregations.test.after_key.keyword: "foo" }
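
The behaviour this test exercises (a `size` of one billion no longer triggers an eager one-billion-slot allocation) comes from capping the initial array at `Math.min(size, 100)` and growing on demand, as in the `BinaryValuesSource` changes below. A toy sketch of that pattern, not the actual BigArrays-backed implementation:

```java
import java.util.ArrayList;

// Toy model of lazy slot allocation: start small regardless of the
// requested size, and grow only up to the highest slot actually used.
class LazySlots<T> {
    private final ArrayList<T> slots;

    LazySlots(int requestedSize) {
        // Cap the initial capacity even if the request asks for a billion buckets.
        slots = new ArrayList<>(Math.min(requestedSize, 100));
    }

    void set(int slot, T value) {
        while (slots.size() <= slot) {
            slots.add(null); // grow lazily on demand
        }
        slots.set(slot, value);
    }

    T get(int slot) {
        return slot < slots.size() ? slots.get(slot) : null;
    }
}
```

With this scheme, `new LazySlots<String>(1000000000)` allocates room for only 100 slots up front.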
@@ -24,49 +24,91 @@
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.StringFieldType;
import org.elasticsearch.index.mapper.TextFieldMapper;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.LeafBucketCollector;

import java.io.IOException;
import java.util.function.LongConsumer;

/**
* A {@link SingleDimensionValuesSource} for binary source ({@link BytesRef}).
*/
class BinaryValuesSource extends SingleDimensionValuesSource<BytesRef> {
private final CheckedFunction<LeafReaderContext, SortedBinaryDocValues, IOException> docValuesFunc;
private final BytesRef[] values;
private ObjectArray<BytesRef> values;
private ObjectArray<BytesRefBuilder> valueBuilders;
private BytesRef currentValue;

BinaryValuesSource(MappedFieldType fieldType, CheckedFunction<LeafReaderContext, SortedBinaryDocValues, IOException> docValuesFunc,
DocValueFormat format, Object missing, int size, int reverseMul) {
super(format, fieldType, missing, size, reverseMul);
BinaryValuesSource(BigArrays bigArrays, LongConsumer breakerConsumer,
Contributor: It looks like breakerConsumer isn't used here, but is used in DoubleValuesSource. Was that intentional?
Author: In fact this is the only place where the breaker consumer is needed (I removed it from the other values sources). It is used to take the BytesRef in the ObjectArray into account in the circuit breaker.
Author: Same here.
MappedFieldType fieldType, CheckedFunction<LeafReaderContext, SortedBinaryDocValues, IOException> docValuesFunc,
DocValueFormat format, boolean missingBucket, Object missing, int size, int reverseMul) {
super(bigArrays, breakerConsumer, format, fieldType, missingBucket, missing, size, reverseMul);
this.docValuesFunc = docValuesFunc;
this.values = new BytesRef[size];
this.values = bigArrays.newObjectArray(Math.min(size, 100));
this.valueBuilders = bigArrays.newObjectArray(Math.min(size, 100));
}

@Override
public void copyCurrent(int slot) {
Contributor: Do copyCurrent() and compare() need to be public? The other value sources seem to have them as package-private.
values[slot] = BytesRef.deepCopyOf(currentValue);
values = bigArrays.grow(values, slot + 1);
valueBuilders = bigArrays.grow(valueBuilders, slot + 1);
BytesRefBuilder builder = valueBuilders.get(slot);
int byteSize = builder == null ? 0 : builder.bytes().length;
if (builder == null) {
builder = new BytesRefBuilder();
valueBuilders.set(slot, builder);
}
if (missingBucket && currentValue == null) {
values.set(slot, null);
} else {
assert currentValue != null;
builder.copyBytes(currentValue);
breakerConsumer.accept(builder.bytes().length - byteSize);
values.set(slot, builder.get());
}
}
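
Note how `copyCurrent` reports only the delta in builder capacity to the breaker (new length minus the previous length), so reusing a slot never double-counts bytes. A standalone toy sketch of that delta-accounting pattern (names assumed, not the ES circuit-breaker API):

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

// Toy model of delta-based memory accounting: report only the change in a
// slot's capacity, so overwriting a slot never double-counts its bytes.
class DeltaAccountingDemo {
    public static void main(String[] args) {
        AtomicLong breakerBytes = new AtomicLong();
        LongConsumer breakerConsumer = breakerBytes::addAndGet;

        int oldCapacity = 0;

        // First value stored in the slot: charge its full size.
        byte[] first = new byte[40];
        breakerConsumer.accept(first.length - oldCapacity); // +40
        oldCapacity = first.length;

        // Slot reused with a larger value: charge only the growth.
        byte[] second = new byte[100];
        breakerConsumer.accept(second.length - oldCapacity); // +60

        System.out.println(breakerBytes.get()); // 100
    }
}
```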

@Override
public int compare(int from, int to) {
return compareValues(values[from], values[to]);
if (missingBucket) {
if (values.get(from) == null) {
return values.get(to) == null ? 0 : -1 * reverseMul;
} else if (values.get(to) == null) {
return reverseMul;
}
}
return compareValues(values.get(from), values.get(to));
}

@Override
int compareCurrent(int slot) {
return compareValues(currentValue, values[slot]);
if (missingBucket) {
if (currentValue == null) {
return values.get(slot) == null ? 0 : -1 * reverseMul;
} else if (values.get(slot) == null) {
return reverseMul;
}
}
return compareValues(currentValue, values.get(slot));
}

@Override
int compareCurrentWithAfter() {
if (missingBucket) {
if (currentValue == null) {
return afterValue == null ? 0 : -1 * reverseMul;
} else if (afterValue == null) {
return reverseMul;
}
}
return compareValues(currentValue, afterValue);
}
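
The three comparison methods above all implement the same rule: `null` ranks first on ascending order and last on descending, expressed through `reverseMul` (1 for `asc`, -1 for `desc`). A self-contained sketch of that rule, detached from the BytesRef machinery:

```java
import java.util.Arrays;

// Standalone sketch of the null-ordering rule used by the composite sources:
// reverseMul = 1 for ascending, -1 for descending.
public class NullOrderDemo {
    static int compare(String a, String b, int reverseMul) {
        if (a == null) {
            return b == null ? 0 : -1 * reverseMul; // null ranks first when ascending
        } else if (b == null) {
            return reverseMul;                      // ...and last when descending
        }
        return a.compareTo(b) * reverseMul;
    }

    public static void main(String[] args) {
        String[] asc = {"b", null, "a"};
        Arrays.sort(asc, (x, y) -> compare(x, y, 1));
        System.out.println(Arrays.toString(asc));  // [null, a, b]

        String[] desc = {"b", null, "a"};
        Arrays.sort(desc, (x, y) -> compare(x, y, -1));
        System.out.println(Arrays.toString(desc)); // [b, a, null]
    }
}
```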

@@ -76,7 +118,9 @@ int compareValues(BytesRef v1, BytesRef v2) {

@Override
void setAfter(Comparable<?> value) {
if (value.getClass() == String.class) {
if (missingBucket && value == null) {
afterValue = null;
} else if (value.getClass() == String.class) {
afterValue = format.parseBytesRef(value.toString());
} else {
throw new IllegalArgumentException("invalid value, expected string, got " + value.getClass().getSimpleName());
@@ -85,7 +129,7 @@ void setAfter(Comparable<?> value) {

@Override
BytesRef toComparable(int slot) {
return values[slot];
return values.get(slot);
}

@Override
@@ -100,6 +144,9 @@ public void collect(int doc, long bucket) throws IOException {
currentValue = dvs.nextValue();
next.collect(doc, bucket);
}
} else if (missingBucket) {
currentValue = null;
next.collect(doc, bucket);
}
}
};
@@ -130,5 +177,7 @@ SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query quer
}

@Override
public void close() {}
public void close() {
Releasables.close(values, valueBuilders);
}
}
@@ -0,0 +1,68 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.aggregations.bucket.composite;

import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;

/**
* A bit array that is implemented using a growing {@link LongArray}
* created from {@link BigArrays}.
* The underlying long array grows lazily based on the biggest index
* that needs to be set.
*/
final class BitArray implements Releasable {
Contributor: I wonder if RoaringDocIdSet could be reused here instead of a custom bit array class? I'm thinking it'd provide better compression in the case when missing keys are sparse, and similar compression when missing keys are dense? Although it seems to require that IDs are added in monotonically increasing order, and I'm not sure if composite follows that pattern.
Author: It is used to mark slots with missing values and not doc ids, so the size remains small (capped by the requested size of the composite agg), and values are mutable (we reuse slots if a new competitive composite bucket is found and the queue is full), so we need a fixed bit set. It also uses BigArrays to create the underlying LongArray so the memory it uses is accounted for in the circuit breaker.
Contributor: 👍

private final BigArrays bigArrays;
private LongArray bits;

BitArray(BigArrays bigArrays, int initialSize) {
this.bigArrays = bigArrays;
this.bits = bigArrays.newLongArray(initialSize, true);
}

public void set(int index) {
fill(index, true);
}

public void clear(int index) {
fill(index, false);
}

public boolean get(int index) {
int wordNum = index >> 6;
long bitmask = 1L << index;
return (bits.get(wordNum) & bitmask) != 0;
}

private void fill(int index, boolean bit) {
int wordNum = index >> 6;
bits = bigArrays.grow(bits, wordNum + 1);
long bitmask = 1L << index;
long value = bit ? bits.get(wordNum) | bitmask : bits.get(wordNum) & ~bitmask;
bits.set(wordNum, value);
}

@Override
public void close() {
Releasables.close(bits);
}
}
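
The word/bit arithmetic in `BitArray` leans on Java's shift semantics: for a `long`, the shift count is implicitly masked to six bits, so `1L << index` equals `1L << (index & 63)` and no explicit masking is needed. A minimal standalone sketch of the same indexing scheme over a plain `long[]` (no BigArrays, no growth, illustrative only):

```java
// Standalone sketch of BitArray's word/bit indexing over a plain long[].
final class SimpleBitArray {
    private final long[] words;

    SimpleBitArray(int capacity) {
        words = new long[(capacity + 63) >> 6]; // one long holds 64 bits
    }

    void set(int index)   { words[index >> 6] |=  (1L << index); }
    void clear(int index) { words[index >> 6] &= ~(1L << index); }

    boolean get(int index) {
        // Java masks the shift count to 6 bits for longs, so
        // (1L << index) == (1L << (index & 63)).
        return (words[index >> 6] & (1L << index)) != 0;
    }

    public static void main(String[] args) {
        SimpleBitArray bits = new SimpleBitArray(128);
        bits.set(3);
        bits.set(70);
        System.out.println(bits.get(3));  // true
        System.out.println(bits.get(4));  // false
        bits.clear(3);
        System.out.println(bits.get(3));  // false
    }
}
```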
@@ -19,7 +19,6 @@

package org.elasticsearch.search.aggregations.bucket.composite;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;

@@ -66,11 +65,7 @@ static XContentBuilder toXContentFragment(CompositeAggregation aggregation, XCon
static void buildCompositeMap(String fieldName, Map<String, Object> composite, XContentBuilder builder) throws IOException {
builder.startObject(fieldName);
for (Map.Entry<String, Object> entry : composite.entrySet()) {
if (entry.getValue().getClass() == BytesRef.class) {
builder.field(entry.getKey(), ((BytesRef) entry.getValue()).utf8ToString());
} else {
builder.field(entry.getKey(), entry.getValue());
}
builder.field(entry.getKey(), entry.getValue());
}
builder.endObject();
}
@@ -170,7 +170,9 @@ protected AggregatorFactory<?> doBuild(SearchContext context, AggregatorFactory<
throw new IllegalArgumentException("Missing value for [after." + sources.get(i).name() + "]");
}
Object obj = after.get(sourceName);
if (obj instanceof Comparable) {
if (configs[i].missingBucket() && obj == null) {
values[i] = null;
} else if (obj instanceof Comparable) {
values[i] = (Comparable<?>) obj;
} else {
throw new IllegalArgumentException("Invalid value for [after." + sources.get(i).name() +
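
Given the null-aware `after` parsing above, a paging request can resume from the missing bucket by passing an explicit `null` for a `missing_bucket` source. A hedged sketch reusing the field names from the docs change in this PR:

[source,js]
--------------------------------------------------
GET /_search
{
  "aggs" : {
    "my_buckets": {
      "composite" : {
        "sources" : [
          { "product_name": { "terms" : { "field": "product", "missing_bucket": true } } }
        ],
        "after": { "product_name": null }
      }
    }
  }
}
--------------------------------------------------
// NOTCONSOLE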