@@ -12,8 +12,11 @@
import org.junit.Before;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.equalTo;
@@ -224,6 +227,36 @@ public void testDateHistogramPivot() throws Exception {
assertOnePivotValue(dataFrameIndex + "/_search?q=by_day:2017-01-15", 3.82);
}

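// Exercises the new _preview endpoint: posts a pivot config and verifies that every document in
// the returned data_frame_preview contains exactly the configured group_by and aggregation fields.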
@SuppressWarnings("unchecked")
public void testPreviewTransform() throws Exception {
final Request createPreviewRequest = new Request("POST", DATAFRAME_ENDPOINT + "_preview");

String config = "{"
+ " \"source\": \"reviews\","
+ " \"id\": \"doesnot-matter\","
+ " \"dest\": \"doesnot-matter\",";

config += " \"pivot\": {"
+ " \"group_by\": {"
+ " \"reviewer\": {\"terms\": { \"field\": \"user_id\" }},"
+ " \"by_day\": {\"date_histogram\": {\"interval\": \"1d\",\"field\":\"timestamp\",\"format\":\"yyyy-MM-DD\"}}},"
+ " \"aggregations\": {"
+ " \"avg_rating\": {"
+ " \"avg\": {"
+ " \"field\": \"stars\""
+ " } } } }"
+ "}";
createPreviewRequest.setJsonEntity(config);
Map<String, Object> previewDataframeResponse = entityAsMap(client().performRequest(createPreviewRequest));
List<Map<String, Object>> preview = (List<Map<String, Object>>)previewDataframeResponse.get("data_frame_preview");
assertThat(preview.size(), equalTo(393));
Set<String> expectedFields = new HashSet<>(Arrays.asList("reviewer", "by_day", "avg_rating"));
preview.forEach(p -> {
Set<String> keys = p.keySet();
assertThat(keys, equalTo(expectedFields));
});
}

private void startAndWaitForTransform(String transformId, String dataFrameIndex) throws IOException, Exception {
// start the transform
final Request startTransformRequest = new Request("POST", DATAFRAME_ENDPOINT + transformId + "/_start");
@@ -50,12 +50,14 @@
import org.elasticsearch.xpack.dataframe.action.DeleteDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.action.GetDataFrameTransformsAction;
import org.elasticsearch.xpack.dataframe.action.GetDataFrameTransformsStatsAction;
import org.elasticsearch.xpack.dataframe.action.PreviewDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.action.PutDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.action.StartDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.action.StopDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.action.TransportDeleteDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.action.TransportGetDataFrameTransformsAction;
import org.elasticsearch.xpack.dataframe.action.TransportGetDataFrameTransformsStatsAction;
import org.elasticsearch.xpack.dataframe.action.TransportPreviewDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.action.TransportPutDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.action.TransportStartDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.action.TransportStopDataFrameTransformAction;
@@ -64,6 +66,7 @@
import org.elasticsearch.xpack.dataframe.rest.action.RestDeleteDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.rest.action.RestGetDataFrameTransformsAction;
import org.elasticsearch.xpack.dataframe.rest.action.RestGetDataFrameTransformsStatsAction;
import org.elasticsearch.xpack.dataframe.rest.action.RestPreviewDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.rest.action.RestPutDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.rest.action.RestStartDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.rest.action.RestStopDataFrameTransformAction;
@@ -137,7 +140,8 @@ public List<RestHandler> getRestHandlers(final Settings settings, final RestCont
new RestStopDataFrameTransformAction(settings, restController),
new RestDeleteDataFrameTransformAction(settings, restController),
new RestGetDataFrameTransformsAction(settings, restController),
new RestGetDataFrameTransformsStatsAction(settings, restController)
new RestGetDataFrameTransformsStatsAction(settings, restController),
new RestPreviewDataFrameTransformAction(settings, restController)
);
}

@@ -153,7 +157,8 @@ public List<RestHandler> getRestHandlers(final Settings settings, final RestCont
new ActionHandler<>(StopDataFrameTransformAction.INSTANCE, TransportStopDataFrameTransformAction.class),
new ActionHandler<>(DeleteDataFrameTransformAction.INSTANCE, TransportDeleteDataFrameTransformAction.class),
new ActionHandler<>(GetDataFrameTransformsAction.INSTANCE, TransportGetDataFrameTransformsAction.class),
new ActionHandler<>(GetDataFrameTransformsStatsAction.INSTANCE, TransportGetDataFrameTransformsStatsAction.class)
new ActionHandler<>(GetDataFrameTransformsStatsAction.INSTANCE, TransportGetDataFrameTransformsStatsAction.class),
new ActionHandler<>(PreviewDataFrameTransformAction.INSTANCE, TransportPreviewDataFrameTransformAction.class)
);
}

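The RestPreviewDataFrameTransformAction handler registered above is not included in this excerpt. Below is a minimal sketch of what it plausibly looks like, modelled on the sibling data frame REST handlers; the route string and handler name are assumptions rather than code from this PR.

package org.elasticsearch.xpack.dataframe.rest.action;

import java.io.IOException;

import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.dataframe.action.PreviewDataFrameTransformAction;

public class RestPreviewDataFrameTransformAction extends BaseRestHandler {

    public RestPreviewDataFrameTransformAction(Settings settings, RestController controller) {
        super(settings);
        // Assumed route; the integration test above posts to DATAFRAME_ENDPOINT + "_preview".
        controller.registerHandler(RestRequest.Method.POST, "/_data_frame/transforms/_preview", this);
    }

    @Override
    public String getName() {
        return "data_frame_preview_transform_action";
    }

    @Override
    protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
        // Parse the request body straight into the action request and stream the response back as XContent.
        XContentParser parser = restRequest.contentParser();
        PreviewDataFrameTransformAction.Request request = PreviewDataFrameTransformAction.Request.fromXContent(parser);
        return channel -> client.execute(PreviewDataFrameTransformAction.INSTANCE, request,
                new RestToXContentListener<>(channel));
    }
}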
@@ -0,0 +1,188 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.dataframe.action;

import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformConfig;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class PreviewDataFrameTransformAction extends Action<PreviewDataFrameTransformAction.Response> {

public static final PreviewDataFrameTransformAction INSTANCE = new PreviewDataFrameTransformAction();
public static final String NAME = "cluster:admin/data_frame/preview";

private PreviewDataFrameTransformAction() {
super(NAME);
}

@Override
public Response newResponse() {
return new Response();
}

public static class Request extends AcknowledgedRequest<Request> implements ToXContentObject {

private DataFrameTransformConfig config;

public Request(DataFrameTransformConfig config) {
this.setConfig(config);
}

public Request() { }

public static Request fromXContent(final XContentParser parser) throws IOException {
return new Request(DataFrameTransformConfig.fromXContent(parser, null, false));
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return this.config.toXContent(builder, params);
}

public DataFrameTransformConfig getConfig() {
return config;
}

public void setConfig(DataFrameTransformConfig config) {
this.config = config;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.config = new DataFrameTransformConfig(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
this.config.writeTo(out);
}

@Override
public int hashCode() {
return Objects.hash(config);
}

@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
Request other = (Request) obj;
return Objects.equals(config, other.config);
}
}

public static class RequestBuilder extends MasterNodeOperationRequestBuilder<Request, Response, RequestBuilder> {

protected RequestBuilder(ElasticsearchClient client, PreviewDataFrameTransformAction action) {
super(client, action, new Request());
}
}

public static class Response extends ActionResponse implements ToXContentObject {

private List<Map<String, Object>> docs;
public static final ParseField DATA_FRAME_PREVIEW = new ParseField("data_frame_preview");

static final ObjectParser<Response, Void> PARSER = new ObjectParser<>("data_frame_transform_preview", Response::new);
static {
PARSER.declareObjectArray(Response::setDocs, (p, c) -> p.mapOrdered(), DATA_FRAME_PREVIEW);
}
public Response() {}

public Response(StreamInput in) throws IOException {
int size = in.readInt();
this.docs = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
this.docs.add(in.readMap());
}
}

public Response(List<Map<String, Object>> docs) {
this.docs = new ArrayList<>(docs);
}

public void setDocs(List<Map<String, Object>> docs) {
this.docs = new ArrayList<>(docs);
}

@Override
public void readFrom(StreamInput in) throws IOException {
int size = in.readInt();
this.docs = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
this.docs.add(in.readMap());
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeInt(docs.size());
for (Map<String, Object> doc : docs) {
out.writeMapWithConsistentOrder(doc);
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(DATA_FRAME_PREVIEW.getPreferredName(), docs);
builder.endObject();
return builder;
}

@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}

if (obj == null || obj.getClass() != getClass()) {
return false;
}

Response other = (Response) obj;
return Objects.equals(other.docs, docs);
}

@Override
public int hashCode() {
return Objects.hashCode(docs);
}

public static Response fromXContent(final XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
}
}
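
A short illustration of how the Response round-trips through XContent via the ObjectParser above; the document values are invented and the usual org.elasticsearch.common and java.util imports are omitted for brevity.

// Illustration only: round-trip a Response through JSON.
Map<String, Object> doc = new LinkedHashMap<>();
doc.put("reviewer", "user_0");
doc.put("avg_rating", 3.8);
PreviewDataFrameTransformAction.Response response =
        new PreviewDataFrameTransformAction.Response(Collections.singletonList(doc));

XContentBuilder builder = XContentFactory.jsonBuilder();
response.toXContent(builder, ToXContent.EMPTY_PARAMS);
// Strings.toString(builder) now yields: {"data_frame_preview":[{"reviewer":"user_0","avg_rating":3.8}]}

try (XContentParser parser = XContentType.JSON.xContent().createParser(
        NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, Strings.toString(builder))) {
    PreviewDataFrameTransformAction.Response parsed = PreviewDataFrameTransformAction.Response.fromXContent(parser);
    assert response.equals(parsed);
}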
@@ -0,0 +1,84 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.dataframe.action;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.dataframe.transform.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;

import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.dataframe.transforms.DataFrameIndexer.COMPOSITE_AGGREGATION_NAME;

public class TransportPreviewDataFrameTransformAction extends
HandledTransportAction<PreviewDataFrameTransformAction.Request, PreviewDataFrameTransformAction.Response> {

private final XPackLicenseState licenseState;
private final Client client;
private final ThreadPool threadPool;

@Inject
public TransportPreviewDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters,
Client client, ThreadPool threadPool, XPackLicenseState licenseState) {
super(PreviewDataFrameTransformAction.NAME, transportService, actionFilters,
(Supplier<PreviewDataFrameTransformAction.Request>) PreviewDataFrameTransformAction.Request::new);
this.licenseState = licenseState;
this.client = client;
this.threadPool = threadPool;
}

@Override
protected void doExecute(Task task,
PreviewDataFrameTransformAction.Request request,
ActionListener<PreviewDataFrameTransformAction.Response> listener) {
if (!licenseState.isDataFrameAllowed()) {
listener.onFailure(LicenseUtils.newComplianceException(XPackField.DATA_FRAME));
return;
}

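// Build the pivot function directly from the supplied config; the preview neither creates nor persists a transform.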
Pivot pivot = new Pivot(request.getConfig().getSource(),
request.getConfig().getQueryConfig().getQuery(),
request.getConfig().getPivotConfig());

getPreview(pivot, ActionListener.wrap(
previewResponse -> listener.onResponse(new PreviewDataFrameTransformAction.Response(previewResponse)),
listener::onFailure
));
}

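// Executes the pivot's composite aggregation once against the source index and maps the resulting
// buckets to preview documents; the stats instance is throwaway, created only because extractResults requires one.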
private void getPreview(Pivot pivot, ActionListener<List<Map<String, Object>>> listener) {
ClientHelper.executeWithHeadersAsync(threadPool.getThreadContext().getHeaders(),
ClientHelper.DATA_FRAME_ORIGIN,
client,
SearchAction.INSTANCE,
pivot.buildSearchRequest(null),
ActionListener.wrap(
r -> {
final CompositeAggregation agg = r.getAggregations().get(COMPOSITE_AGGREGATION_NAME);
DataFrameIndexerTransformStats stats = new DataFrameIndexerTransformStats();
listener.onResponse(pivot.extractResults(agg, stats).collect(Collectors.toList()));
},
listener::onFailure
));
}
}