Skip to content

Commit 31f81b1

Browse files
authored
Make composite bucket size configurable (opensearch-project#4544)
* Make composite bucket size configurable Signed-off-by: Lantao Jin <[email protected]> * update doc Signed-off-by: Lantao Jin <[email protected]> --------- Signed-off-by: Lantao Jin <[email protected]>
1 parent 0499e95 commit 31f81b1

File tree

15 files changed

+147
-18
lines changed

15 files changed

+147
-18
lines changed

common/src/main/java/org/opensearch/sql/common/setting/Settings.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ public enum Key {
5050
/** Common Settings for SQL and PPL. */
5151
QUERY_MEMORY_LIMIT("plugins.query.memory_limit"),
5252
QUERY_SIZE_LIMIT("plugins.query.size_limit"),
53+
QUERY_BUCKET_SIZE("plugins.query.buckets"),
54+
SEARCH_MAX_BUCKETS("search.max_buckets"),
5355
ENCYRPTION_MASTER_KEY("plugins.query.datasources.encryption.masterkey"),
5456
DATASOURCES_URI_HOSTS_DENY_LIST("plugins.query.datasources.uri.hosts.denylist"),
5557
DATASOURCES_LIMIT("plugins.query.datasources.limit"),

core/src/test/java/org/opensearch/sql/executor/QueryServiceTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ public Helper() {
103103
lenient().when(analyzer.analyze(any(), any())).thenReturn(logicalPlan);
104104
lenient().when(planner.plan(any())).thenReturn(plan);
105105
lenient().when(settings.getSettingValue(Key.QUERY_SIZE_LIMIT)).thenReturn(200);
106+
lenient().when(settings.getSettingValue(Key.QUERY_BUCKET_SIZE)).thenReturn(1000);
106107
lenient().when(settings.getSettingValue(Key.CALCITE_ENGINE_ENABLED)).thenReturn(false);
107108

108109
queryService = new QueryService(analyzer, executionEngine, planner, null, settings);

docs/user/admin/settings.rst

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ Result set::
161161
}
162162

163163
plugins.query.size_limit
164-
===========================
164+
========================
165165

166166
Description
167167
-----------
@@ -188,6 +188,43 @@ Result set::
188188
}
189189
}
190190

191+
plugins.query.buckets
192+
=====================
193+
194+
Version
195+
-------
196+
3.4.0
197+
198+
Description
199+
-----------
200+
201+
This configuration indicates how many aggregation buckets will return in a single response. The default value equals to ``plugins.query.size_limit``.
202+
You can change the value to any value not greater than the maximum number of aggregation buckets allowed in a single response (`search.max_buckets`), here is an example::
203+
204+
>> curl -H 'Content-Type: application/json' -X PUT localhost:9200/_plugins/_query/settings -d '{
205+
"transient" : {
206+
"plugins.query.buckets" : 1000
207+
}
208+
}'
209+
210+
Result set::
211+
212+
{
213+
"acknowledged" : true,
214+
"persistent" : { },
215+
"transient" : {
216+
"plugins" : {
217+
"query" : {
218+
"buckets" : "1000"
219+
}
220+
}
221+
}
222+
}
223+
224+
Limitations
225+
-----------
226+
The number of aggregation buckets is fixed to ``1000`` in v2. ``plugins.query.buckets`` can only effect the number of aggregation buckets when calcite enabled.
227+
191228
plugins.query.memory_limit
192229
==========================
193230

docs/user/ppl/admin/settings.rst

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,43 @@ Change the size_limit to 1000::
133133

134134
Note: the legacy settings of ``opendistro.query.size_limit`` is deprecated, it will fallback to the new settings if you request an update with the legacy name.
135135

136+
plugins.query.buckets
137+
=====================
138+
139+
Version
140+
-------
141+
3.4.0
142+
143+
Description
144+
-----------
145+
146+
This configuration indicates how many aggregation buckets will return in a single response. The default value equals to ``plugins.query.size_limit``.
147+
You can change the value to any value not greater than the maximum number of aggregation buckets allowed in a single response (`search.max_buckets`), here is an example::
148+
149+
>> curl -H 'Content-Type: application/json' -X PUT localhost:9200/_plugins/_query/settings -d '{
150+
"transient" : {
151+
"plugins.query.buckets" : 1000
152+
}
153+
}'
154+
155+
Result set::
156+
157+
{
158+
"acknowledged" : true,
159+
"persistent" : { },
160+
"transient" : {
161+
"plugins" : {
162+
"query" : {
163+
"buckets" : "1000"
164+
}
165+
}
166+
}
167+
}
168+
169+
Limitations
170+
-----------
171+
The number of aggregation buckets is fixed to ``1000`` in v2. ``plugins.query.buckets`` can only effect the number of aggregation buckets when calcite enabled.
172+
136173
plugins.calcite.all_join_types.allowed
137174
======================================
138175

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public class CalciteExplainIT extends ExplainIT {
2727
public void init() throws Exception {
2828
super.init();
2929
enableCalcite();
30+
setQueryBucketSize(1000);
3031
loadIndex(Index.BANK_WITH_STRING_VALUES);
3132
loadIndex(Index.NESTED_SIMPLE);
3233
loadIndex(Index.TIME_TEST_DATA);

integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLIntegTestCase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ private Settings defaultSettings() {
112112
private final Map<Key, Object> defaultSettings =
113113
new ImmutableMap.Builder<Key, Object>()
114114
.put(Key.QUERY_SIZE_LIMIT, 200)
115+
.put(Key.QUERY_BUCKET_SIZE, 1000)
115116
.put(Key.SQL_CURSOR_KEEP_ALIVE, TimeValue.timeValueMinutes(1))
116117
.put(Key.FIELD_TYPE_TOLERANCE, true)
117118
.put(Key.CALCITE_ENGINE_ENABLED, true)

integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ public abstract class SQLIntegTestCase extends OpenSearchSQLRestTestCase {
5151
public static final String TRANSIENT = "transient";
5252
public static final Integer DEFAULT_QUERY_SIZE_LIMIT =
5353
Integer.parseInt(System.getProperty("defaultQuerySizeLimit", "200"));
54+
public static final Integer DEFAULT_QUERY_BUCKET_SIZE =
55+
Integer.parseInt(System.getProperty("defaultQueryBucketSize", "1000"));
5456
public static final Integer DEFAULT_MAX_RESULT_WINDOW =
5557
Integer.parseInt(System.getProperty("defaultMaxResultWindow", "10000"));
5658

@@ -148,6 +150,20 @@ protected void resetQuerySizeLimit() throws IOException {
148150
DEFAULT_QUERY_SIZE_LIMIT.toString()));
149151
}
150152

153+
protected void setQueryBucketSize(Integer limit) throws IOException {
154+
updateClusterSettings(
155+
new ClusterSetting(
156+
"transient", Settings.Key.QUERY_BUCKET_SIZE.getKeyValue(), limit.toString()));
157+
}
158+
159+
protected void resetQueryBucketSize() throws IOException {
160+
updateClusterSettings(
161+
new ClusterSetting(
162+
"transient",
163+
Settings.Key.QUERY_BUCKET_SIZE.getKeyValue(),
164+
DEFAULT_QUERY_BUCKET_SIZE.toString()));
165+
}
166+
151167
@SneakyThrows
152168
protected void setDataSourcesEnabled(String clusterSettingType, boolean value) {
153169
updateClusterSettings(

integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ private Settings defaultSettings() {
166166
private final Map<Key, Object> defaultSettings =
167167
new ImmutableMap.Builder<Key, Object>()
168168
.put(Key.QUERY_SIZE_LIMIT, 200)
169+
.put(Key.QUERY_BUCKET_SIZE, 1000)
169170
.put(Key.SQL_CURSOR_KEEP_ALIVE, TimeValue.timeValueMinutes(1))
170171
.put(Key.FIELD_TYPE_TOLERANCE, true)
171172
.build();

integ-test/src/test/java/org/opensearch/sql/sql/StandalonePaginationIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ private Settings defaultSettings() {
167167
private final Map<Key, Object> defaultSettings =
168168
new ImmutableMap.Builder<Key, Object>()
169169
.put(Key.QUERY_SIZE_LIMIT, 200)
170+
.put(Key.QUERY_BUCKET_SIZE, 1000)
170171
.put(Key.SQL_CURSOR_KEEP_ALIVE, TimeValue.timeValueMinutes(1))
171172
.put(Key.FIELD_TYPE_TOLERANCE, true)
172173
.build();

opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,6 @@
9898
*/
9999
public class AggregateAnalyzer {
100100

101-
/** How many composite buckets should be returned. */
102-
public static final int AGGREGATION_BUCKET_SIZE = 1000;
103-
104101
/** metadata field used when there is no argument. Only apply to COUNT. */
105102
private static final String METADATA_FIELD = "_index";
106103

@@ -141,6 +138,7 @@ static class AggregateBuilderHelper {
141138
final Map<String, ExprType> fieldTypes;
142139
final RelOptCluster cluster;
143140
final boolean bucketNullable;
141+
final int bucketSize;
144142

145143
<T extends ValuesSourceAggregationBuilder<T>> T build(RexNode node, T aggBuilder) {
146144
return build(node, aggBuilder::field, aggBuilder::script);
@@ -188,7 +186,8 @@ public static Pair<List<AggregationBuilder>, OpenSearchAggregationResponseParser
188186
RelDataType rowType,
189187
Map<String, ExprType> fieldTypes,
190188
List<String> outputFields,
191-
RelOptCluster cluster)
189+
RelOptCluster cluster,
190+
int bucketSize)
192191
throws ExpressionNotAnalyzableException {
193192
requireNonNull(aggregate, "aggregate");
194193
try {
@@ -201,7 +200,7 @@ public static Pair<List<AggregationBuilder>, OpenSearchAggregationResponseParser
201200
.orElseGet(() -> "true"));
202201
List<Integer> groupList = aggregate.getGroupSet().asList();
203202
AggregateBuilderHelper helper =
204-
new AggregateBuilderHelper(rowType, fieldTypes, cluster, bucketNullable);
203+
new AggregateBuilderHelper(rowType, fieldTypes, cluster, bucketNullable, bucketSize);
205204
List<String> aggFieldNames = outputFields.subList(groupList.size(), outputFields.size());
206205
// Process all aggregate calls
207206
Pair<Builder, List<MetricParser>> builderAndParser =
@@ -242,8 +241,7 @@ && isAutoDateSpan(project.getProjects().get(groupList.getFirst()))) {
242241
List<CompositeValuesSourceBuilder<?>> buckets =
243242
createCompositeBuckets(groupList, project, helper);
244243
aggregationBuilder =
245-
AggregationBuilders.composite("composite_buckets", buckets)
246-
.size(AGGREGATION_BUCKET_SIZE);
244+
AggregationBuilders.composite("composite_buckets", buckets).size(bucketSize);
247245
if (newMetricBuilder != null) {
248246
aggregationBuilder.subAggregations(metricBuilder);
249247
}
@@ -629,7 +627,7 @@ private static ValuesSourceAggregationBuilder<?> createTermsAggregationBuilder(
629627
helper.build(
630628
group,
631629
new TermsAggregationBuilder(bucketName)
632-
.size(AGGREGATION_BUCKET_SIZE)
630+
.size(helper.bucketSize)
633631
.order(BucketOrder.key(true)));
634632
return withValueTypeHint(
635633
sourceBuilder,

0 commit comments

Comments
 (0)