Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,12 @@ private BucketUtils() {}
*
* @param finalSize
* The number of terms required in the final reduce phase.
* @param singleShard
* whether a single shard is being queried, or multiple shards
* @return A suggested default for the size of any shard-side PriorityQueues
*/
public static int suggestShardSideQueueSize(int finalSize, boolean singleShard) {
public static int suggestShardSideQueueSize(int finalSize) {
if (finalSize < 1) {
throw new IllegalArgumentException("size must be positive, got " + finalSize);
}
if (singleShard) {
// In the case of a single shard, we do not need to over-request
return finalSize;
}
// Request 50% more buckets on the shards in order to improve accuracy
// as well as a small constant that should help with small values of 'size'
final long shardSampleSize = (long) (finalSize * 1.5 + 10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public int shardSize() {
if (shardSize < 0) {
// Use default heuristic to avoid any wrong-ranking caused by
// distributed counting
shardSize = BucketUtils.suggestShardSideQueueSize(requiredSize, context.numberOfShards() == 1);
shardSize = BucketUtils.suggestShardSideQueueSize(requiredSize);
}

if (requiredSize <= 0 || shardSize <= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,7 @@ protected Aggregator doCreateInternal(ValuesSource valuesSource, Aggregator pare
// each shard and as
// such are impossible to differentiate from non-significant terms
// at that early stage.
bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(),
context.numberOfShards() == 1));
bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize()));
}

if (valuesSource instanceof ValuesSource.Bytes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,7 @@ protected Aggregator createInternal(Aggregator parent, boolean collectsFromSingl
// we want to find have only one occurrence on each shard and as
// such are impossible to differentiate from non-significant terms
// at that early stage.
bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(),
context.numberOfShards() == 1));
bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize()));
}

// TODO - need to check with mapping that this is indeed a text field....
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,7 @@ protected Aggregator doCreateInternal(ValuesSource valuesSource, Aggregator pare
// The user has not made a shardSize selection. Use default
// heuristic to avoid any wrong-ranking caused by distributed
// counting
bucketCountThresholds.setShardSize(BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(),
context.numberOfShards() == 1));
bucketCountThresholds.setShardSize(BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize()));
}
bucketCountThresholds.ensureValidity();
if (valuesSource instanceof ValuesSource.Bytes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,31 +27,22 @@ public class BucketUtilsTests extends ESTestCase {

public void testBadInput() {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> BucketUtils.suggestShardSideQueueSize(0, randomBoolean()));
() -> BucketUtils.suggestShardSideQueueSize(0));
assertEquals(e.getMessage(), "size must be positive, got 0");
}

public void testOptimizesSingleShard() {
for (int iter = 0; iter < 10; ++iter) {
final int size = randomIntBetween(1, Integer.MAX_VALUE);
assertEquals(size, BucketUtils.suggestShardSideQueueSize( size, true));
}
}

public void testOverFlow() {
for (int iter = 0; iter < 10; ++iter) {
final int size = Integer.MAX_VALUE - randomInt(10);
final int numberOfShards = randomIntBetween(1, 10);
final int shardSize = BucketUtils.suggestShardSideQueueSize( size, numberOfShards == 1);
final int shardSize = BucketUtils.suggestShardSideQueueSize( size);
assertThat(shardSize, greaterThanOrEqualTo(shardSize));
}
}

public void testShardSizeIsGreaterThanGlobalSize() {
for (int iter = 0; iter < 10; ++iter) {
final int size = randomIntBetween(1, Integer.MAX_VALUE);
final int numberOfShards = randomIntBetween(1, 10);
final int shardSize = BucketUtils.suggestShardSideQueueSize( size, numberOfShards == 1);
final int shardSize = BucketUtils.suggestShardSideQueueSize( size);
assertThat(shardSize, greaterThanOrEqualTo(size));
}
}
Expand Down