Skip to content

Commit 2dd879a

Browse files
authored
[ML] adds support for non-numeric mapped types (#40220) (#40380)
* [ML] adds support for non-numeric mapped types and mapping overrides * correcting hlrc compilation issues after merge * removing mapping_override option * clearing up unnecessary changes
1 parent 88f510f commit 2dd879a

File tree

11 files changed

+371
-57
lines changed

11 files changed

+371
-57
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public class DataFrameMessages {
4545
"Failed to create composite aggregation from pivot function";
4646
public static final String DATA_FRAME_TRANSFORM_CONFIGURATION_INVALID =
4747
"Data frame transform configuration [{0}] has invalid elements";
48-
48+
public static final String DATA_FRAME_UNABLE_TO_GATHER_FIELD_MAPPINGS = "Failed to gather field mappings for index [{0}]";
4949
public static final String LOG_DATA_FRAME_TRANSFORM_CONFIGURATION_BAD_QUERY =
5050
"Failed to parse query for data frame transform";
5151
public static final String LOG_DATA_FRAME_TRANSFORM_CONFIGURATION_BAD_GROUP_BY =

x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.util.Set;
1919

2020
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
21+
import static org.hamcrest.Matchers.containsString;
2122
import static org.hamcrest.Matchers.equalTo;
2223

2324
public class DataFramePivotRestIT extends DataFrameRestTestCase {
@@ -267,6 +268,52 @@ public void testPreviewTransform() throws Exception {
267268
});
268269
}
269270

271+
public void testPivotWithMaxOnDateField() throws Exception {
272+
String transformId = "simpleDateHistogramPivotWithMaxTime";
273+
String dataFrameIndex = "pivot_reviews_via_date_histogram_with_max_time";
274+
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, dataFrameIndex);
275+
276+
final Request createDataframeTransformRequest = createRequestWithAuth("PUT", DATAFRAME_ENDPOINT + transformId,
277+
BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
278+
279+
String config = "{"
280+
+ " \"source\": \"" + REVIEWS_INDEX_NAME + "\","
281+
+ " \"dest\": \"" + dataFrameIndex + "\",";
282+
283+
config +=" \"pivot\": { \n" +
284+
" \"group_by\": {\n" +
285+
" \"by_day\": {\"date_histogram\": {\n" +
286+
" \"interval\": \"1d\",\"field\":\"timestamp\",\"format\":\"yyyy-MM-DD\"\n" +
287+
" }}\n" +
288+
" },\n" +
289+
" \n" +
290+
" \"aggs\" :{\n" +
291+
" \"avg_rating\": {\n" +
292+
" \"avg\": {\"field\": \"stars\"}\n" +
293+
" },\n" +
294+
" \"timestamp\": {\n" +
295+
" \"max\": {\"field\": \"timestamp\"}\n" +
296+
" }\n" +
297+
" }}"
298+
+ "}";
299+
300+
createDataframeTransformRequest.setJsonEntity(config);
301+
Map<String, Object> createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest));
302+
assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
303+
assertTrue(indexExists(dataFrameIndex));
304+
305+
startAndWaitForTransform(transformId, dataFrameIndex, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
306+
307+
// we expect 21 documents as there shall be 21 days worth of docs
308+
Map<String, Object> indexStats = getAsMap(dataFrameIndex + "/_stats");
309+
assertEquals(21, XContentMapValues.extractValue("_all.total.docs.count", indexStats));
310+
assertOnePivotValue(dataFrameIndex + "/_search?q=by_day:2017-01-15", 3.82);
311+
Map<String, Object> searchResult = getAsMap(dataFrameIndex + "/_search?q=by_day:2017-01-15");
312+
String actual = (String) ((List<?>) XContentMapValues.extractValue("hits.hits._source.timestamp", searchResult)).get(0);
313+
// Do `containsString` as actual ending timestamp is indeterminate due to how data is generated
314+
assertThat(actual, containsString("2017-01-15T20:"));
315+
}
316+
270317
private void assertOnePivotValue(String query, double expected) throws IOException {
271318
Map<String, Object> searchResult = getAsMap(query);
272319

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.xpack.core.XPackField;
2323
import org.elasticsearch.xpack.core.dataframe.action.PreviewDataFrameTransformAction;
2424
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
25+
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
2526
import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
2627

2728
import java.util.List;
@@ -57,9 +58,11 @@ protected void doExecute(Task task,
5758
return;
5859
}
5960

60-
Pivot pivot = new Pivot(request.getConfig().getSource(),
61-
request.getConfig().getQueryConfig().getQuery(),
62-
request.getConfig().getPivotConfig());
61+
final DataFrameTransformConfig config = request.getConfig();
62+
63+
Pivot pivot = new Pivot(config.getSource(),
64+
config.getQueryConfig().getQuery(),
65+
config.getPivotConfig());
6366

6467
getPreview(pivot, ActionListener.wrap(
6568
previewResponse -> listener.onResponse(new PreviewDataFrameTransformAction.Response(previewResponse)),
@@ -68,18 +71,24 @@ protected void doExecute(Task task,
6871
}
6972

7073
private void getPreview(Pivot pivot, ActionListener<List<Map<String, Object>>> listener) {
71-
ClientHelper.executeWithHeadersAsync(threadPool.getThreadContext().getHeaders(),
72-
ClientHelper.DATA_FRAME_ORIGIN,
73-
client,
74-
SearchAction.INSTANCE,
75-
pivot.buildSearchRequest(null),
76-
ActionListener.wrap(
77-
r -> {
78-
final CompositeAggregation agg = r.getAggregations().get(COMPOSITE_AGGREGATION_NAME);
79-
DataFrameIndexerTransformStats stats = new DataFrameIndexerTransformStats();
80-
listener.onResponse(pivot.extractResults(agg, stats).collect(Collectors.toList()));
81-
},
82-
listener::onFailure
83-
));
74+
pivot.deduceMappings(client, ActionListener.wrap(
75+
deducedMappings -> {
76+
ClientHelper.executeWithHeadersAsync(threadPool.getThreadContext().getHeaders(),
77+
ClientHelper.DATA_FRAME_ORIGIN,
78+
client,
79+
SearchAction.INSTANCE,
80+
pivot.buildSearchRequest(null),
81+
ActionListener.wrap(
82+
r -> {
83+
final CompositeAggregation agg = r.getAggregations().get(COMPOSITE_AGGREGATION_NAME);
84+
DataFrameIndexerTransformStats stats = new DataFrameIndexerTransformStats();
85+
listener.onResponse(pivot.extractResults(agg, deducedMappings, stats).collect(Collectors.toList()));
86+
},
87+
listener::onFailure
88+
));
89+
},
90+
listener::onFailure
91+
));
92+
8493
}
8594
}

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ public DataFrameIndexer(Executor executor, AtomicReference<IndexerState> initial
4444

4545
protected abstract DataFrameTransformConfig getConfig();
4646

47+
protected abstract Map<String, String> getFieldMappings();
48+
4749
@Override
4850
protected void onStartJob(long now) {
4951
QueryBuilder queryBuilder = getConfig().getQueryConfig().getQuery();
@@ -70,7 +72,7 @@ private Stream<IndexRequest> processBucketsToIndexRequests(CompositeAggregation
7072
final DataFrameTransformConfig transformConfig = getConfig();
7173
String indexName = transformConfig.getDestination();
7274

73-
return pivot.extractResults(agg, getStats()).map(document -> {
75+
return pivot.extractResults(agg, getFieldMappings(), getStats()).map(document -> {
7476
XContentBuilder builder;
7577
try {
7678
builder = jsonBuilder();

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Event;
3737
import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService;
3838
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
39+
import org.elasticsearch.xpack.dataframe.transforms.pivot.SchemaUtil;
3940

4041
import java.util.Map;
4142
import java.util.concurrent.CountDownLatch;
@@ -230,6 +231,7 @@ protected class ClientDataFrameIndexer extends DataFrameIndexer {
230231
private final DataFrameTransformsConfigManager transformsConfigManager;
231232
private final DataFrameTransformsCheckpointService transformsCheckpointService;
232233
private final String transformId;
234+
private Map<String, String> fieldMappings = null;
233235

234236
private DataFrameTransformConfig transformConfig = null;
235237

@@ -248,6 +250,11 @@ protected DataFrameTransformConfig getConfig() {
248250
return transformConfig;
249251
}
250252

253+
@Override
254+
protected Map<String, String> getFieldMappings() {
255+
return fieldMappings;
256+
}
257+
251258
@Override
252259
protected String getJobId() {
253260
return transformId;
@@ -279,6 +286,27 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
279286
DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_TRANSFORM_CONFIGURATION_INVALID, transformId));
280287
}
281288

289+
if (fieldMappings == null) {
290+
CountDownLatch latch = new CountDownLatch(1);
291+
SchemaUtil.getDestinationFieldMappings(client, transformConfig.getDestination(), new LatchedActionListener<>(
292+
ActionListener.wrap(
293+
destinationMappings -> fieldMappings = destinationMappings,
294+
e -> {
295+
throw new RuntimeException(
296+
DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_UNABLE_TO_GATHER_FIELD_MAPPINGS,
297+
transformConfig.getDestination()),
298+
e);
299+
}), latch));
300+
try {
301+
latch.await(LOAD_TRANSFORM_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
302+
} catch (InterruptedException e) {
303+
throw new RuntimeException(
304+
DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_UNABLE_TO_GATHER_FIELD_MAPPINGS,
305+
transformConfig.getDestination()),
306+
e);
307+
}
308+
}
309+
282310
return super.maybeTriggerAsyncJob(now);
283311
}
284312

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtils.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import java.util.Map;
2222
import java.util.stream.Stream;
2323

24+
import static org.elasticsearch.xpack.dataframe.transforms.pivot.SchemaUtil.isNumericType;
25+
2426
final class AggregationResultUtils {
2527
private static final Logger logger = LogManager.getLogger(AggregationResultUtils.class);
2628

@@ -30,30 +32,38 @@ final class AggregationResultUtils {
3032
* @param agg The aggregation result
3133
* @param groups The original groupings used for querying
3234
* @param aggregationBuilders the aggregation used for querying
33-
* @param dataFrameIndexerTransformStats stats collector
35+
* @param fieldTypeMap A Map containing "field-name": "type" entries to determine the appropriate type for the aggregation results.
36+
* @param stats stats collector
3437
* @return a map containing the results of the aggregation in a consumable way
3538
*/
3639
public static Stream<Map<String, Object>> extractCompositeAggregationResults(CompositeAggregation agg,
37-
GroupConfig groups,
38-
Collection<AggregationBuilder> aggregationBuilders,
39-
DataFrameIndexerTransformStats dataFrameIndexerTransformStats) {
40+
GroupConfig groups,
41+
Collection<AggregationBuilder> aggregationBuilders,
42+
Map<String, String> fieldTypeMap,
43+
DataFrameIndexerTransformStats stats) {
4044
return agg.getBuckets().stream().map(bucket -> {
41-
dataFrameIndexerTransformStats.incrementNumDocuments(bucket.getDocCount());
45+
stats.incrementNumDocuments(bucket.getDocCount());
4246

4347
Map<String, Object> document = new HashMap<>();
44-
groups.getGroups().keySet().forEach(destinationFieldName -> {
45-
document.put(destinationFieldName, bucket.getKey().get(destinationFieldName));
46-
});
48+
groups.getGroups().keySet().forEach(destinationFieldName ->
49+
document.put(destinationFieldName, bucket.getKey().get(destinationFieldName)));
4750

4851
for (AggregationBuilder aggregationBuilder : aggregationBuilders) {
4952
String aggName = aggregationBuilder.getName();
53+
final String fieldType = fieldTypeMap.get(aggName);
5054

5155
// TODO: support other aggregation types
5256
Aggregation aggResult = bucket.getAggregations().get(aggName);
5357

5458
if (aggResult instanceof NumericMetricsAggregation.SingleValue) {
5559
NumericMetricsAggregation.SingleValue aggResultSingleValue = (SingleValue) aggResult;
56-
document.put(aggName, aggResultSingleValue.value());
60+
// If the type is numeric, simply gather the `value` type, otherwise utilize `getValueAsString` so we don't lose
61+
// formatted outputs.
62+
if (isNumericType(fieldType)) {
63+
document.put(aggName, aggResultSingleValue.value());
64+
} else {
65+
document.put(aggName, aggResultSingleValue.getValueAsString());
66+
}
5767
} else {
5868
// Execution should never reach this point!
5969
// Creating transforms with unsupported aggregations shall not be possible

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,12 +77,17 @@ public SearchRequest buildSearchRequest(Map<String, Object> position) {
7777
}
7878

7979
public Stream<Map<String, Object>> extractResults(CompositeAggregation agg,
80-
DataFrameIndexerTransformStats dataFrameIndexerTransformStats) {
80+
Map<String, String> fieldTypeMap,
81+
DataFrameIndexerTransformStats dataFrameIndexerTransformStats) {
8182

8283
GroupConfig groups = config.getGroupConfig();
8384
Collection<AggregationBuilder> aggregationBuilders = config.getAggregationConfig().getAggregatorFactories();
8485

85-
return AggregationResultUtils.extractCompositeAggregationResults(agg, groups, aggregationBuilders, dataFrameIndexerTransformStats);
86+
return AggregationResultUtils.extractCompositeAggregationResults(agg,
87+
groups,
88+
aggregationBuilders,
89+
fieldTypeMap,
90+
dataFrameIndexerTransformStats);
8691
}
8792

8893
private void runTestQuery(Client client, final ActionListener<Boolean> listener) {
@@ -99,7 +104,7 @@ private void runTestQuery(Client client, final ActionListener<Boolean> listener)
99104
}
100105
listener.onResponse(true);
101106
}, e->{
102-
listener.onFailure(new RuntimeException("Failed to test query",e));
107+
listener.onFailure(new RuntimeException("Failed to test query", e));
103108
}));
104109
}
105110

0 commit comments

Comments
 (0)