Skip to content

Commit a30bf27

Browse files
authored
[ML] add auditor to data frame plugin (#40012) (#40394)
* [Data Frame] add auditor * Adjusting Level, Auditor, and message to address pr comments * Addressing PR comments
1 parent 2dd879a commit a30bf27

File tree

16 files changed

+833
-17
lines changed

16 files changed

+833
-17
lines changed
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.core.common.notifications;
7+
8+
import org.elasticsearch.common.ParseField;
9+
import org.elasticsearch.common.xcontent.ToXContent;
10+
import org.elasticsearch.common.xcontent.ToXContentObject;
11+
import org.elasticsearch.common.xcontent.XContentBuilder;
12+
13+
import java.io.IOException;
14+
import java.util.Date;
15+
import java.util.Objects;
16+
17+
public abstract class AbstractAuditMessage implements ToXContentObject {
18+
public static final ParseField TYPE = new ParseField("audit_message");
19+
20+
public static final ParseField MESSAGE = new ParseField("message");
21+
public static final ParseField LEVEL = new ParseField("level");
22+
public static final ParseField TIMESTAMP = new ParseField("timestamp");
23+
public static final ParseField NODE_NAME = new ParseField("node_name");
24+
25+
private final String resourceId;
26+
private final String message;
27+
private final Level level;
28+
private final Date timestamp;
29+
private final String nodeName;
30+
31+
public AbstractAuditMessage(String resourceId, String message, Level level, String nodeName) {
32+
this.resourceId = resourceId;
33+
this.message = Objects.requireNonNull(message);
34+
this.level = Objects.requireNonNull(level);
35+
this.timestamp = new Date();
36+
this.nodeName = nodeName;
37+
}
38+
39+
protected AbstractAuditMessage(String resourceId, String message, Level level, Date timestamp, String nodeName) {
40+
this.resourceId = resourceId;
41+
this.message = Objects.requireNonNull(message);
42+
this.level = Objects.requireNonNull(level);
43+
this.timestamp = Objects.requireNonNull(timestamp);
44+
this.nodeName = nodeName;
45+
}
46+
47+
public final String getResourceId() {
48+
return resourceId;
49+
}
50+
51+
public final String getMessage() {
52+
return message;
53+
}
54+
55+
public final Level getLevel() {
56+
return level;
57+
}
58+
59+
public final Date getTimestamp() {
60+
return timestamp;
61+
}
62+
63+
public final String getNodeName() {
64+
return nodeName;
65+
}
66+
67+
@Override
68+
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
69+
builder.startObject();
70+
if (resourceId != null) {
71+
builder.field(getResourceField(), resourceId);
72+
}
73+
builder.field(MESSAGE.getPreferredName(), message);
74+
builder.field(LEVEL.getPreferredName(), level);
75+
builder.field(TIMESTAMP.getPreferredName(), timestamp.getTime());
76+
if (nodeName != null) {
77+
builder.field(NODE_NAME.getPreferredName(), nodeName);
78+
}
79+
builder.endObject();
80+
return builder;
81+
}
82+
83+
@Override
84+
public int hashCode() {
85+
return Objects.hash(resourceId, message, level, timestamp);
86+
}
87+
88+
@Override
89+
public boolean equals(Object obj) {
90+
if (obj == null) {
91+
return false;
92+
}
93+
if (obj instanceof AbstractAuditMessage == false) {
94+
return false;
95+
}
96+
97+
AbstractAuditMessage other = (AbstractAuditMessage) obj;
98+
return Objects.equals(resourceId, other.resourceId) &&
99+
Objects.equals(message, other.message) &&
100+
Objects.equals(level, other.level) &&
101+
Objects.equals(timestamp, other.timestamp);
102+
}
103+
104+
protected abstract String getResourceField();
105+
106+
public abstract static class AbstractBuilder<T extends AbstractAuditMessage> {
107+
108+
public T info(String resourceId, String message, String nodeName) {
109+
return newMessage(Level.INFO, resourceId, message, nodeName);
110+
}
111+
112+
public T warning(String resourceId, String message, String nodeName) {
113+
return newMessage(Level.WARNING, resourceId, message, nodeName);
114+
}
115+
116+
public T error(String resourceId, String message, String nodeName) {
117+
return newMessage(Level.ERROR, resourceId, message, nodeName);
118+
}
119+
120+
protected abstract T newMessage(Level level, String resourceId, String message, String nodeName);
121+
}
122+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.core.common.notifications;
7+
8+
import org.apache.logging.log4j.LogManager;
9+
import org.apache.logging.log4j.Logger;
10+
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.action.index.IndexRequest;
12+
import org.elasticsearch.action.index.IndexResponse;
13+
import org.elasticsearch.client.Client;
14+
import org.elasticsearch.common.unit.TimeValue;
15+
import org.elasticsearch.common.xcontent.ToXContent;
16+
import org.elasticsearch.common.xcontent.XContentBuilder;
17+
18+
import java.io.IOException;
19+
import java.util.Objects;
20+
21+
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
22+
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
23+
24+
public class Auditor<T extends AbstractAuditMessage> {
25+
26+
private static final Logger logger = LogManager.getLogger(Auditor.class);
27+
private final Client client;
28+
private final String nodeName;
29+
private final String auditIndex;
30+
private final String executionOrigin;
31+
private final AbstractAuditMessage.AbstractBuilder<T> messageBuilder;
32+
33+
public Auditor(Client client,
34+
String nodeName,
35+
String auditIndex,
36+
String executionOrigin,
37+
AbstractAuditMessage.AbstractBuilder<T> messageBuilder) {
38+
this.client = Objects.requireNonNull(client);
39+
this.nodeName = Objects.requireNonNull(nodeName);
40+
this.auditIndex = auditIndex;
41+
this.executionOrigin = executionOrigin;
42+
this.messageBuilder = Objects.requireNonNull(messageBuilder);
43+
}
44+
45+
public final void info(String resourceId, String message) {
46+
indexDoc(messageBuilder.info(resourceId, message, nodeName));
47+
}
48+
49+
public final void warning(String resourceId, String message) {
50+
indexDoc(messageBuilder.warning(resourceId, message, nodeName));
51+
}
52+
53+
public final void error(String resourceId, String message) {
54+
indexDoc(messageBuilder.error(resourceId, message, nodeName));
55+
}
56+
57+
protected void onIndexResponse(IndexResponse response) {
58+
logger.trace("Successfully wrote audit message");
59+
}
60+
61+
protected void onIndexFailure(Exception exception) {
62+
logger.debug("Failed to write audit message", exception);
63+
}
64+
65+
private void indexDoc(ToXContent toXContent) {
66+
IndexRequest indexRequest = new IndexRequest(auditIndex);
67+
indexRequest.source(toXContentBuilder(toXContent));
68+
indexRequest.timeout(TimeValue.timeValueSeconds(5));
69+
executeAsyncWithOrigin(client.threadPool().getThreadContext(),
70+
executionOrigin,
71+
indexRequest,
72+
ActionListener.wrap(
73+
this::onIndexResponse,
74+
this::onIndexFailure
75+
), client::index);
76+
}
77+
78+
private XContentBuilder toXContentBuilder(ToXContent toXContent) {
79+
try (XContentBuilder jsonBuilder = jsonBuilder()) {
80+
return toXContent.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
81+
} catch (IOException e) {
82+
throw new RuntimeException(e);
83+
}
84+
}
85+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.core.common.notifications;
7+
8+
import java.util.Locale;
9+
10+
public enum Level {
11+
INFO, WARNING, ERROR;
12+
13+
/**
14+
* Case-insensitive from string method.
15+
*
16+
* @param value
17+
* String representation
18+
* @return The condition type
19+
*/
20+
public static Level fromString(String value) {
21+
return Level.valueOf(value.toUpperCase(Locale.ROOT));
22+
}
23+
24+
@Override
25+
public String toString() {
26+
return name().toLowerCase(Locale.ROOT);
27+
}
28+
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public final class DataFrameField {
3232
public static final String REST_BASE_PATH = "/_data_frame/";
3333
public static final String REST_BASE_PATH_TRANSFORMS = REST_BASE_PATH + "transforms/";
3434
public static final String REST_BASE_PATH_TRANSFORMS_BY_ID = REST_BASE_PATH_TRANSFORMS + "{id}/";
35+
public static final String DATA_FRAME_TRANSFORM_AUDIT_ID_FIELD = "transform_id";
3536

3637
// note: this is used to match tasks
3738
public static final String PERSISTENT_TASK_DESCRIPTION_PREFIX = "data_frame_";
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.core.dataframe.notifications;
7+
8+
import org.elasticsearch.common.ParseField;
9+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
10+
import org.elasticsearch.common.xcontent.ObjectParser;
11+
import org.elasticsearch.common.xcontent.XContentParser;
12+
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage;
13+
import org.elasticsearch.xpack.core.common.notifications.Level;
14+
import org.elasticsearch.xpack.core.ml.utils.time.TimeUtils;
15+
16+
import java.util.Date;
17+
18+
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
19+
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
20+
import static org.elasticsearch.xpack.core.dataframe.DataFrameField.DATA_FRAME_TRANSFORM_AUDIT_ID_FIELD;
21+
22+
public class DataFrameAuditMessage extends AbstractAuditMessage {
23+
24+
private static final ParseField TRANSFORM_ID = new ParseField(DATA_FRAME_TRANSFORM_AUDIT_ID_FIELD);
25+
public static final ConstructingObjectParser<DataFrameAuditMessage, Void> PARSER = new ConstructingObjectParser<>(
26+
"data_frame_audit_message",
27+
true,
28+
a -> new DataFrameAuditMessage((String)a[0], (String)a[1], (Level)a[2], (Date)a[3], (String)a[4]));
29+
30+
static {
31+
PARSER.declareString(optionalConstructorArg(), TRANSFORM_ID);
32+
PARSER.declareString(constructorArg(), MESSAGE);
33+
PARSER.declareField(constructorArg(), p -> {
34+
if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
35+
return Level.fromString(p.text());
36+
}
37+
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
38+
}, LEVEL, ObjectParser.ValueType.STRING);
39+
PARSER.declareField(constructorArg(), parser -> {
40+
if (parser.currentToken() == XContentParser.Token.VALUE_NUMBER) {
41+
return new Date(parser.longValue());
42+
} else if (parser.currentToken() == XContentParser.Token.VALUE_STRING) {
43+
return new Date(TimeUtils.dateStringToEpoch(parser.text()));
44+
}
45+
throw new IllegalArgumentException(
46+
"unexpected token [" + parser.currentToken() + "] for [" + TIMESTAMP.getPreferredName() + "]");
47+
}, TIMESTAMP, ObjectParser.ValueType.VALUE);
48+
PARSER.declareString(optionalConstructorArg(), NODE_NAME);
49+
}
50+
51+
public DataFrameAuditMessage(String resourceId, String message, Level level, String nodeName) {
52+
super(resourceId, message, level, nodeName);
53+
}
54+
55+
protected DataFrameAuditMessage(String resourceId, String message, Level level, Date timestamp, String nodeName) {
56+
super(resourceId, message, level, timestamp, nodeName);
57+
}
58+
59+
@Override
60+
protected String getResourceField() {
61+
return TRANSFORM_ID.getPreferredName();
62+
}
63+
64+
public static AbstractAuditMessage.AbstractBuilder<DataFrameAuditMessage> builder() {
65+
return new AbstractBuilder<DataFrameAuditMessage>() {
66+
@Override
67+
protected DataFrameAuditMessage newMessage(Level level, String resourceId, String message, String nodeName) {
68+
return new DataFrameAuditMessage(resourceId, message, level, nodeName);
69+
}
70+
};
71+
}
72+
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStore.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,10 +158,18 @@ private static Map<String, RoleDescriptor> initializeReservedRoles() {
158158
null, MetadataUtils.DEFAULT_RESERVED_METADATA))
159159
.put("data_frame_transforms_admin", new RoleDescriptor("data_frame_transforms_admin",
160160
new String[] { "manage_data_frame_transforms" },
161-
null, null, null, null, MetadataUtils.DEFAULT_RESERVED_METADATA, null))
161+
new RoleDescriptor.IndicesPrivileges[]{
162+
RoleDescriptor.IndicesPrivileges.builder()
163+
.indices(".data-frame-notifications*")
164+
.privileges("view_index_metadata", "read").build()
165+
}, null, null, null, MetadataUtils.DEFAULT_RESERVED_METADATA, null))
162166
.put("data_frame_transforms_user", new RoleDescriptor("data_frame_transforms_user",
163167
new String[] { "monitor_data_frame_transforms" },
164-
null, null, null, null, MetadataUtils.DEFAULT_RESERVED_METADATA, null))
168+
new RoleDescriptor.IndicesPrivileges[]{
169+
RoleDescriptor.IndicesPrivileges.builder()
170+
.indices(".data-frame-notifications*")
171+
.privileges("view_index_metadata", "read").build()
172+
}, null, null, null, MetadataUtils.DEFAULT_RESERVED_METADATA, null))
165173
.put("watcher_admin", new RoleDescriptor("watcher_admin", new String[] { "manage_watcher" },
166174
new RoleDescriptor.IndicesPrivileges[] {
167175
RoleDescriptor.IndicesPrivileges.builder().indices(Watch.INDEX, TriggeredWatchStoreField.INDEX_NAME,

0 commit comments

Comments
 (0)