From ea2a2bdb56873dba13d393f7273ee8ed2e4e8e0f Mon Sep 17 00:00:00 2001 From: David Kyle Date: Thu, 1 Nov 2018 10:44:23 +0000 Subject: [PATCH 01/10] Change get jobs and get jobs stats to read from cs and index. This enables calls to the get job/stats APIs in a mixed cluster state before jobs have been migrated --- .../action/TransportGetJobsStatsAction.java | 10 +- .../xpack/ml/job/JobManager.java | 68 +++++++++++- .../job/persistence/ExpandedIdsMatcher.java | 12 ++ .../xpack/ml/job/JobManagerTests.java | 105 ++++++++++++++++++ 4 files changed, 188 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java index 57b5dbd8b7f23..e0ae481d61b53 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java @@ -32,7 +32,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.stats.ForecastStats; -import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; +import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; @@ -54,27 +54,27 @@ public class TransportGetJobsStatsAction extends TransportTasksAction finalListener) { - jobConfigProvider.expandJobsIds(request.getJobId(), request.allowNoJobs(), true, ActionListener.wrap( + jobManager.expandJobIds(request.getJobId(), request.allowNoJobs(), ActionListener.wrap( expandedIds -> { request.setExpandedJobsIds(new ArrayList<>(expandedIds)); ActionListener jobStatsListener = ActionListener.wrap( diff 
--git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 1097a547d2f57..ee05016fad28e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -49,6 +49,7 @@ import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer; +import org.elasticsearch.xpack.ml.job.persistence.ExpandedIdsMatcher; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; @@ -66,6 +67,8 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; import java.util.function.Consumer; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -173,6 +176,8 @@ private void getJobFromClusterState(String jobId, ActionListener jobListene */ public void expandJobs(String expression, boolean allowNoJobs, ActionListener> jobsListener) { Map clusterStateJobs = expandJobsFromClusterState(expression, allowNoJobs, clusterService.state()); + ExpandedIdsMatcher idsMatcher = new ExpandedIdsMatcher(expression, allowNoJobs); + idsMatcher.filterMatchedIds(clusterStateJobs.keySet()); jobConfigProvider.expandJobs(expression, allowNoJobs, false, ActionListener.wrap( jobBuilders -> { @@ -195,7 +200,18 @@ public void expandJobs(String expression, boolean allowNoJobs, ActionListener(jobs, jobs.size(), Job.RESULTS_FIELD)); }, - jobsListener::onFailure + e -> { + if (e instanceof ResourceNotFoundException) { + // Check if we found the jobs that satisfy expression in the clusterstate + if (idsMatcher.hasUnmatchedIds() == false) { + List 
jobs = new ArrayList<>(clusterStateJobs.values()); + Collections.sort(jobs, Comparator.comparing(Job::getId)); + jobsListener.onResponse(new QueryPage<>(jobs, jobs.size(), Job.RESULTS_FIELD)); + return; + } + } + jobsListener.onFailure(e); + } )); } @@ -208,11 +224,59 @@ private Map expandJobsFromClusterState(String expression, boolean a jobIdToJob.put(expandedJobId, mlMetadata.getJobs().get(expandedJobId)); } } catch (Exception e) { - // ignore + // ignore exception thrown if no job Id matches expression } return jobIdToJob; } + /** + * Get the job Ids that match the given {@code expression}. + * + * @param expression the jobId or an expression matching jobIds + * @param allowNoJobs if {@code false}, an error is thrown when no job matches the {@code jobId} + * @param jobsListener The jobs listener + */ + public void expandJobIds(String expression, boolean allowNoJobs, ActionListener> jobsListener) { + Set clusterStateJobIds = expandJobIdsFromClusterState(expression, allowNoJobs, clusterService.state()); + ExpandedIdsMatcher idsMatcher = new ExpandedIdsMatcher(expression, allowNoJobs); + idsMatcher.filterMatchedIds(clusterStateJobIds); + + jobConfigProvider.expandJobsIds(expression, allowNoJobs, false, ActionListener.wrap( + jobIds -> { + // Check for duplicate job Ids + jobIds.forEach(id -> { + if (clusterStateJobIds.contains(id)) { + jobsListener.onFailure(new IllegalStateException("Job [" + id + "] configuration " + + "exists in both clusterstate and index")); + return; + } + }); + + jobIds.addAll(clusterStateJobIds); + jobsListener.onResponse(jobIds); + }, + e -> { + if (e instanceof ResourceNotFoundException) { + // Check if we found the job Ids that satisfy expression in the clusterstate + if (idsMatcher.hasUnmatchedIds() == false) { + jobsListener.onResponse(new TreeSet<>(clusterStateJobIds)); + return; + } + } + jobsListener.onFailure(e); + } + )); + } + + private Set expandJobIdsFromClusterState(String expression, boolean allowNoJobs, ClusterState 
clusterState) { + try { + return MlMetadata.getMlMetadata(clusterState).expandJobIds(expression, allowNoJobs); + } catch (Exception e) { + // ignore exception thrown if no job Id matches expression + } + return Collections.emptySet(); + } + /** * Validate the char filter/tokenizer/token filter names used in the categorization analyzer config (if any). * This validation has to be done server-side; it cannot be done in a client as that won't have loaded the diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ExpandedIdsMatcher.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ExpandedIdsMatcher.java index 4f4968a9d5629..41a4d53df6b8f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ExpandedIdsMatcher.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ExpandedIdsMatcher.java @@ -44,6 +44,18 @@ public final class ExpandedIdsMatcher { private final LinkedList requiredMatches; + /** + * Generate the list of required matches from {@code tokenExpression} and initialize. + * + * @param tokenExpression Token expression string will be split by {@link #tokenizeExpression(String)} + * @param allowNoMatchForWildcards If true then it is not required for wildcard + * expressions to match an Id meaning they are + * not returned in the list of required matches + */ + public ExpandedIdsMatcher(String tokenExpression, boolean allowNoMatchForWildcards) { + this(ExpandedIdsMatcher.tokenizeExpression(tokenExpression), allowNoMatchForWildcards); + } + /** * Generate the list of required matches from the expressions in {@code tokens} * and initialize. 
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index e7ec8c789855f..10e0d1b4830fc 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -57,8 +57,10 @@ import java.util.Arrays; import java.util.Collections; import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicReference; @@ -218,6 +220,109 @@ public void testExpandJobsFromClusterStateAndIndex() throws IOException { assertThat(jobIds, contains("foo-cs-1", "foo-cs-2", "foo-index")); } + public void testExpandJobs_GivenJobInClusterStateNotIndex() throws IOException { + Job csJobFoo1 = buildJobBuilder("foo-cs-1").build(); + Job csJobFoo2 = buildJobBuilder("foo-cs-2").build(); + + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(csJobFoo1, false); + mlMetadata.putJob(csJobFoo2, false); + + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + when(clusterService.state()).thenReturn(clusterState); + + + List docsAsBytes = new ArrayList<>(); + + MockClientBuilder mockClientBuilder = new MockClientBuilder("cluster-test"); + mockClientBuilder.prepareSearch(AnomalyDetectorsIndex.configIndexName(), docsAsBytes); + JobManager jobManager = createJobManager(mockClientBuilder.build()); + + + AtomicReference> jobsHolder = new AtomicReference<>(); + jobManager.expandJobs("foo*", true, ActionListener.wrap( + jobs -> jobsHolder.set(jobs), + e -> fail(e.getMessage()) + )); + + assertNotNull(jobsHolder.get()); + 
assertThat(jobsHolder.get().results(), hasSize(2)); + List jobIds = jobsHolder.get().results().stream().map(Job::getId).collect(Collectors.toList()); + assertThat(jobIds, contains("foo-cs-1", "foo-cs-2")); + } + + public void testExpandJobIdsFromClusterStateAndIndex() throws IOException { + Job csJobFoo1 = buildJobBuilder("foo-cs-1").build(); + Job csJobFoo2 = buildJobBuilder("foo-cs-2").build(); + Job csJobBar = buildJobBuilder("bar-cs").build(); + + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(csJobFoo1, false); + mlMetadata.putJob(csJobFoo2, false); + mlMetadata.putJob(csJobBar, false); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + when(clusterService.state()).thenReturn(clusterState); + + + List> fieldHits = new ArrayList<>(); + Map fieldMap = new HashMap<>(); + fieldMap.put(Job.ID.getPreferredName(), + new DocumentField(Job.ID.getPreferredName(), Collections.singletonList("index-job"))); + fieldMap.put(Job.GROUPS.getPreferredName(), + new DocumentField(Job.ID.getPreferredName(), Collections.emptyList())); + + fieldHits.add(fieldMap); + + MockClientBuilder mockClientBuilder = new MockClientBuilder("cluster-test"); + mockClientBuilder.prepareSearchFields(AnomalyDetectorsIndex.configIndexName(), fieldHits); + + JobManager jobManager = createJobManager(mockClientBuilder.build()); + AtomicReference> jobIdsHolder = new AtomicReference<>(); + jobManager.expandJobIds("_all", true, ActionListener.wrap( + jobs -> jobIdsHolder.set(jobs), + e -> fail(e.getMessage()) + )); + + assertNotNull(jobIdsHolder.get()); + assertThat(jobIdsHolder.get(), hasSize(4)); + assertThat(jobIdsHolder.get(), contains("bar-cs", "foo-cs-1", "foo-cs-2", "index-job")); + } + + public void testExpandJobIds_GivenJobInClusterStateNotInded() { + Job csJobFoo1 = buildJobBuilder("foo-cs-1").build(); + + MlMetadata.Builder mlMetadata = new 
MlMetadata.Builder(); + mlMetadata.putJob(csJobFoo1, false); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + when(clusterService.state()).thenReturn(clusterState); + + MockClientBuilder mockClientBuilder = new MockClientBuilder("cluster-test"); + mockClientBuilder.prepareSearchFields(AnomalyDetectorsIndex.configIndexName(), Collections.emptyList()); + + JobManager jobManager = createJobManager(mockClientBuilder.build()); + AtomicReference> jobIdsHolder = new AtomicReference<>(); + jobManager.expandJobIds("foo*", true, ActionListener.wrap( + jobs -> jobIdsHolder.set(jobs), + e -> fail(e.getMessage()) + )); + + assertNotNull(jobIdsHolder.get()); + assertThat(jobIdsHolder.get(), hasSize(1)); + assertThat(jobIdsHolder.get(), contains("foo-cs-1")); + } + @SuppressWarnings("unchecked") public void testPutJob_AddsCreateTime() throws IOException { MockClientBuilder mockClientBuilder = new MockClientBuilder("cluster-test"); From 81ec15ba828aa7261f414b87556601c0f97b5490 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Thu, 1 Nov 2018 17:17:54 +0000 Subject: [PATCH 02/10] Change group or job lookup to not throw on missing Id --- .../xpack/core/ml/MlMetadata.java | 10 +- .../core/ml/job/groups/GroupOrJobLookup.java | 6 +- .../xpack/core/ml/utils/NameResolver.java | 30 +---- .../ml/job/groups/GroupOrJobLookupTests.java | 52 ++++----- .../xpack/ml/MlMetadataTests.java | 20 ++-- .../xpack/ml/utils/NameResolverTests.java | 104 ++++++++---------- 6 files changed, 84 insertions(+), 138 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java index 81762def4cc35..2f891542365a6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java +++ 
b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java @@ -86,8 +86,8 @@ public boolean isGroupOrJob(String id) { return groupOrJobLookup.isGroupOrJob(id); } - public Set expandJobIds(String expression, boolean allowNoJobs) { - return groupOrJobLookup.expandJobIds(expression, allowNoJobs); + public Set expandJobIds(String expression) { + return groupOrJobLookup.expandJobIds(expression); } public boolean isJobDeleting(String jobId) { @@ -107,9 +107,9 @@ public Optional getDatafeedByJobId(String jobId) { return datafeeds.values().stream().filter(s -> s.getJobId().equals(jobId)).findFirst(); } - public Set expandDatafeedIds(String expression, boolean allowNoDatafeeds) { - return NameResolver.newUnaliased(datafeeds.keySet(), ExceptionsHelper::missingDatafeedException) - .expand(expression, allowNoDatafeeds); + public Set expandDatafeedIds(String expression) { + return NameResolver.newUnaliased(datafeeds.keySet()) + .expand(expression); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/groups/GroupOrJobLookup.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/groups/GroupOrJobLookup.java index fde28a84f8d2e..58c5e755091a4 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/groups/GroupOrJobLookup.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/groups/GroupOrJobLookup.java @@ -8,7 +8,6 @@ import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.messages.Messages; -import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.NameResolver; import java.util.ArrayList; @@ -55,8 +54,8 @@ private void put(Job job) { } } - public Set expandJobIds(String expression, boolean allowNoJobs) { - return new GroupOrJobResolver().expand(expression, allowNoJobs); + public Set 
expandJobIds(String expression) { + return new GroupOrJobResolver().expand(expression); } public boolean isGroupOrJob(String id) { @@ -66,7 +65,6 @@ public boolean isGroupOrJob(String id) { private class GroupOrJobResolver extends NameResolver { private GroupOrJobResolver() { - super(ExceptionsHelper::missingJobException); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/NameResolver.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/NameResolver.java index f737a3d9ad7d0..6ea80c8e8689c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/NameResolver.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/NameResolver.java @@ -5,18 +5,15 @@ */ package org.elasticsearch.xpack.core.ml.utils; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Strings; import org.elasticsearch.common.regex.Regex; import java.util.Collections; import java.util.List; -import java.util.Objects; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; -import java.util.function.Function; import java.util.stream.Collectors; /** @@ -25,12 +22,6 @@ */ public abstract class NameResolver { - private final Function notFoundExceptionSupplier; - - protected NameResolver(Function notFoundExceptionSupplier) { - this.notFoundExceptionSupplier = Objects.requireNonNull(notFoundExceptionSupplier); - } - /** * Expands an expression into the set of matching names. * For example, given a set of names ["foo-1", "foo-2", "bar-1", bar-2"], @@ -46,12 +37,9 @@ protected NameResolver(Function notFoundExcep * * * @param expression the expression to resolve - * @param allowNoMatch if {@code false}, an error is thrown when no name matches the {@code expression}. 
- * This only applies to wild card expressions, if {@code expression} is not a - * wildcard then setting this true will not suppress the exception * @return the sorted set of matching names */ - public SortedSet expand(String expression, boolean allowNoMatch) { + public SortedSet expand(String expression) { SortedSet result = new TreeSet<>(); if (MetaData.ALL.equals(expression) || Regex.isMatchAllPattern(expression)) { result.addAll(nameSet()); @@ -64,24 +52,13 @@ public SortedSet expand(String expression, boolean allowNoMatch) { .map(this::lookup) .flatMap(List::stream) .collect(Collectors.toList()); - if (expanded.isEmpty() && allowNoMatch == false) { - throw notFoundExceptionSupplier.apply(token); - } result.addAll(expanded); } else { List matchingNames = lookup(token); - // allowNoMatch only applies to wildcard expressions, - // this isn't so don't check the allowNoMatch here - if (matchingNames.isEmpty()) { - throw notFoundExceptionSupplier.apply(token); - } result.addAll(matchingNames); } } } - if (result.isEmpty() && allowNoMatch == false) { - throw notFoundExceptionSupplier.apply(expression); - } return result; } @@ -105,11 +82,10 @@ public SortedSet expand(String expression, boolean allowNoMatch) { /** * Creates a {@code NameResolver} that has no aliases * @param nameSet the set of all names - * @param notFoundExceptionSupplier a supplier of {@link ResourceNotFoundException} to be used when an expression matches no name * @return the unaliased {@code NameResolver} */ - public static NameResolver newUnaliased(Set nameSet, Function notFoundExceptionSupplier) { - return new NameResolver(notFoundExceptionSupplier) { + public static NameResolver newUnaliased(Set nameSet) { + return new NameResolver() { @Override protected Set keys() { return nameSet; diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/groups/GroupOrJobLookupTests.java 
b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/groups/GroupOrJobLookupTests.java index 8543f02cec56c..eb1e83f6d6ae1 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/groups/GroupOrJobLookupTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/groups/GroupOrJobLookupTests.java @@ -6,46 +6,34 @@ package org.elasticsearch.xpack.core.ml.job.groups; import org.elasticsearch.ResourceAlreadyExistsException; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.job.config.Job; -import org.elasticsearch.xpack.core.ml.job.groups.GroupOrJobLookup; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.contains; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class GroupOrJobLookupTests extends ESTestCase { - public void testEmptyLookup_GivenAllowNoJobs() { - GroupOrJobLookup lookup = new GroupOrJobLookup(Collections.emptyList()); - - assertThat(lookup.expandJobIds("_all", true).isEmpty(), is(true)); - assertThat(lookup.expandJobIds("*", true).isEmpty(), is(true)); - assertThat(lookup.expandJobIds("foo*", true).isEmpty(), is(true)); - expectThrows(ResourceNotFoundException.class, () -> lookup.expandJobIds("foo", true)); - } - - public void testEmptyLookup_GivenNotAllowNoJobs() { + public void testEmptyLookup() { GroupOrJobLookup lookup = new GroupOrJobLookup(Collections.emptyList()); - expectThrows(ResourceNotFoundException.class, () -> lookup.expandJobIds("_all", false)); - expectThrows(ResourceNotFoundException.class, () -> lookup.expandJobIds("*", false)); - expectThrows(ResourceNotFoundException.class, () -> lookup.expandJobIds("foo*", false)); - 
expectThrows(ResourceNotFoundException.class, () -> lookup.expandJobIds("foo", true)); + assertThat(lookup.expandJobIds("_all").isEmpty(), is(true)); + assertThat(lookup.expandJobIds("*").isEmpty(), is(true)); + assertThat(lookup.expandJobIds("foo*").isEmpty(), is(true)); + assertThat(lookup.expandJobIds("foo").isEmpty(), is(true)); } public void testAllIsNotExpandedInCommaSeparatedExpression() { GroupOrJobLookup lookup = new GroupOrJobLookup(Collections.emptyList()); - ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class, () -> lookup.expandJobIds("foo-*,_all", true)); - assertThat(e.getMessage(), equalTo("No known job with id '_all'")); + assertThat(lookup.expandJobIds("foo*,_all").isEmpty(), is(true)); } public void testConstructor_GivenJobWithSameIdAsPreviousGroupName() { @@ -75,19 +63,19 @@ public void testLookup() { jobs.add(mockJob("nogroup", Collections.emptyList())); GroupOrJobLookup groupOrJobLookup = new GroupOrJobLookup(jobs); - assertThat(groupOrJobLookup.expandJobIds("_all", false), contains("bar-1", "bar-2", "foo-1", "foo-2", "nogroup")); - assertThat(groupOrJobLookup.expandJobIds("*", false), contains("bar-1", "bar-2", "foo-1", "foo-2", "nogroup")); - assertThat(groupOrJobLookup.expandJobIds("bar-1", false), contains("bar-1")); - assertThat(groupOrJobLookup.expandJobIds("foo-1", false), contains("foo-1")); - assertThat(groupOrJobLookup.expandJobIds("foo-2, bar-1", false), contains("bar-1", "foo-2")); - assertThat(groupOrJobLookup.expandJobIds("foo-group", false), contains("foo-1", "foo-2")); - assertThat(groupOrJobLookup.expandJobIds("bar-group", false), contains("bar-1", "bar-2")); - assertThat(groupOrJobLookup.expandJobIds("ones", false), contains("bar-1", "foo-1")); - assertThat(groupOrJobLookup.expandJobIds("twos", false), contains("bar-2", "foo-2")); - assertThat(groupOrJobLookup.expandJobIds("foo-group, nogroup", false), contains("foo-1", "foo-2", "nogroup")); - assertThat(groupOrJobLookup.expandJobIds("*-group", false), 
contains("bar-1", "bar-2", "foo-1", "foo-2")); - assertThat(groupOrJobLookup.expandJobIds("foo-group,foo-1,foo-2", false), contains("foo-1", "foo-2")); - assertThat(groupOrJobLookup.expandJobIds("foo-group,*-2", false), contains("bar-2", "foo-1", "foo-2")); + assertThat(groupOrJobLookup.expandJobIds("_all"), contains("bar-1", "bar-2", "foo-1", "foo-2", "nogroup")); + assertThat(groupOrJobLookup.expandJobIds("*"), contains("bar-1", "bar-2", "foo-1", "foo-2", "nogroup")); + assertThat(groupOrJobLookup.expandJobIds("bar-1"), contains("bar-1")); + assertThat(groupOrJobLookup.expandJobIds("foo-1"), contains("foo-1")); + assertThat(groupOrJobLookup.expandJobIds("foo-2, bar-1"), contains("bar-1", "foo-2")); + assertThat(groupOrJobLookup.expandJobIds("foo-group"), contains("foo-1", "foo-2")); + assertThat(groupOrJobLookup.expandJobIds("bar-group"), contains("bar-1", "bar-2")); + assertThat(groupOrJobLookup.expandJobIds("ones"), contains("bar-1", "foo-1")); + assertThat(groupOrJobLookup.expandJobIds("twos"), contains("bar-2", "foo-2")); + assertThat(groupOrJobLookup.expandJobIds("foo-group, nogroup"), contains("foo-1", "foo-2", "nogroup")); + assertThat(groupOrJobLookup.expandJobIds("*-group"), contains("bar-1", "bar-2", "foo-1", "foo-2")); + assertThat(groupOrJobLookup.expandJobIds("foo-group,foo-1,foo-2"), contains("foo-1", "foo-2")); + assertThat(groupOrJobLookup.expandJobIds("foo-group,*-2"), contains("bar-2", "foo-1", "foo-2")); } public void testIsGroupOrJob() { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java index c7ca2ff805eba..cb43afed94280 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; 
import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchModule; import org.elasticsearch.test.AbstractSerializingTestCase; @@ -27,7 +28,6 @@ import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobTests; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField; import java.util.Collections; @@ -35,8 +35,8 @@ import java.util.HashMap; import java.util.Map; -import static org.elasticsearch.xpack.core.ml.job.config.JobTests.buildJobBuilder; import static org.elasticsearch.persistent.PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT; +import static org.elasticsearch.xpack.core.ml.job.config.JobTests.buildJobBuilder; import static org.elasticsearch.xpack.ml.action.TransportOpenJobActionTests.addJobTask; import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedConfig; import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedJob; @@ -397,10 +397,10 @@ public void testRemoveDatafeed_failBecauseDatafeedStarted() { public void testExpandJobIds() { MlMetadata mlMetadata = newMlMetadataWithJobs("bar-1", "foo-1", "foo-2").build(); - assertThat(mlMetadata.expandJobIds("_all", false), contains("bar-1", "foo-1", "foo-2")); - assertThat(mlMetadata.expandJobIds("*", false), contains("bar-1", "foo-1", "foo-2")); - assertThat(mlMetadata.expandJobIds("foo-*", false), contains("foo-1", "foo-2")); - assertThat(mlMetadata.expandJobIds("foo-1,bar-*", false), contains("bar-1", "foo-1")); + assertThat(mlMetadata.expandJobIds("_all"), contains("bar-1", "foo-1", "foo-2")); + assertThat(mlMetadata.expandJobIds("*"), contains("bar-1", "foo-1", "foo-2")); + 
assertThat(mlMetadata.expandJobIds("foo-*"), contains("foo-1", "foo-2")); + assertThat(mlMetadata.expandJobIds("foo-1,bar-*"), contains("bar-1", "foo-1")); } public void testExpandDatafeedIds() { @@ -411,10 +411,10 @@ public void testExpandDatafeedIds() { MlMetadata mlMetadata = mlMetadataBuilder.build(); - assertThat(mlMetadata.expandDatafeedIds("_all", false), contains("bar-1-feed", "foo-1-feed", "foo-2-feed")); - assertThat(mlMetadata.expandDatafeedIds("*", false), contains("bar-1-feed", "foo-1-feed", "foo-2-feed")); - assertThat(mlMetadata.expandDatafeedIds("foo-*", false), contains("foo-1-feed", "foo-2-feed")); - assertThat(mlMetadata.expandDatafeedIds("foo-1-feed,bar-1*", false), contains("bar-1-feed", "foo-1-feed")); + assertThat(mlMetadata.expandDatafeedIds("_all"), contains("bar-1-feed", "foo-1-feed", "foo-2-feed")); + assertThat(mlMetadata.expandDatafeedIds("*"), contains("bar-1-feed", "foo-1-feed", "foo-2-feed")); + assertThat(mlMetadata.expandDatafeedIds("foo-*"), contains("foo-1-feed", "foo-2-feed")); + assertThat(mlMetadata.expandDatafeedIds("foo-1-feed,bar-1*"), contains("bar-1-feed", "foo-1-feed")); } private static MlMetadata.Builder newMlMetadataWithJobs(String... 
jobIds) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/NameResolverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/NameResolverTests.java index 9f4bcc13cbd04..93507d3583e2c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/NameResolverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/NameResolverTests.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.ml.utils; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.utils.NameResolver; @@ -18,7 +17,6 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; -import java.util.function.Function; import java.util.stream.Collectors; import static org.hamcrest.Matchers.equalTo; @@ -27,45 +25,36 @@ public class NameResolverTests extends ESTestCase { public void testNoMatchingNames() { - ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class, - () -> newUnaliasedResolver().expand("foo", false)); - assertThat(e.getMessage(), equalTo("foo")); + assertThat(newUnaliasedResolver().expand("foo").isEmpty(), is(true)); } - public void testNoMatchingNames_GivenPatternAndAllowNoMatch() { - assertThat(newUnaliasedResolver().expand("foo*", true).isEmpty(), is(true)); + public void testNoMatchingNames_GivenPattern() { + assertThat(newUnaliasedResolver().expand("foo*").isEmpty(), is(true)); } - public void testNoMatchingNames_GivenPatternAndNotAllowNoMatch() { - ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class, - () -> newUnaliasedResolver().expand("foo*", false)); - assertThat(e.getMessage(), equalTo("foo*")); - } - - public void testNoMatchingNames_GivenMatchingNameAndNonMatchingPatternAndNotAllowNoMatch() { - ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class, - () -> newUnaliasedResolver("foo").expand("foo, bar*", false)); - 
assertThat(e.getMessage(), equalTo("bar*")); + public void testNoMatchingNames_GivenMatchingNameAndNonMatchingPattern() { + NameResolver nameResolver = newUnaliasedResolver("foo"); + assertThat(nameResolver.expand("foo,bar*"), equalTo(newSortedSet("foo"))); } public void testUnaliased() { NameResolver nameResolver = newUnaliasedResolver("foo-1", "foo-2", "bar-1", "bar-2"); - assertThat(nameResolver.expand("foo-1", false), equalTo(newSortedSet("foo-1"))); - assertThat(nameResolver.expand("foo-2", false), equalTo(newSortedSet("foo-2"))); - assertThat(nameResolver.expand("bar-1", false), equalTo(newSortedSet("bar-1"))); - assertThat(nameResolver.expand("bar-2", false), equalTo(newSortedSet("bar-2"))); - assertThat(nameResolver.expand("foo-1,foo-2", false), equalTo(newSortedSet("foo-1", "foo-2"))); - assertThat(nameResolver.expand("foo-*", false), equalTo(newSortedSet("foo-1", "foo-2"))); - assertThat(nameResolver.expand("bar-*", false), equalTo(newSortedSet("bar-1", "bar-2"))); - assertThat(nameResolver.expand("*oo-*", false), equalTo(newSortedSet("foo-1", "foo-2"))); - assertThat(nameResolver.expand("*-1", false), equalTo(newSortedSet("foo-1", "bar-1"))); - assertThat(nameResolver.expand("*-2", false), equalTo(newSortedSet("foo-2", "bar-2"))); - assertThat(nameResolver.expand("*", false), equalTo(newSortedSet("foo-1", "foo-2", "bar-1", "bar-2"))); - assertThat(nameResolver.expand("_all", false), equalTo(newSortedSet("foo-1", "foo-2", "bar-1", "bar-2"))); - assertThat(nameResolver.expand("foo-1,foo-2", false), equalTo(newSortedSet("foo-1", "foo-2"))); - assertThat(nameResolver.expand("foo-1,bar-1", false), equalTo(newSortedSet("bar-1", "foo-1"))); - assertThat(nameResolver.expand("foo-*,bar-1", false), equalTo(newSortedSet("bar-1", "foo-1", "foo-2"))); + assertThat(nameResolver.expand("foo-1"), equalTo(newSortedSet("foo-1"))); + assertThat(nameResolver.expand("foo-2"), equalTo(newSortedSet("foo-2"))); + assertThat(nameResolver.expand("bar-1"), 
equalTo(newSortedSet("bar-1"))); + assertThat(nameResolver.expand("bar-2"), equalTo(newSortedSet("bar-2"))); + assertThat(nameResolver.expand("foo-1,foo-2"), equalTo(newSortedSet("foo-1", "foo-2"))); + assertThat(nameResolver.expand("foo-*"), equalTo(newSortedSet("foo-1", "foo-2"))); + assertThat(nameResolver.expand("bar-*"), equalTo(newSortedSet("bar-1", "bar-2"))); + assertThat(nameResolver.expand("*oo-*"), equalTo(newSortedSet("foo-1", "foo-2"))); + assertThat(nameResolver.expand("*-1"), equalTo(newSortedSet("foo-1", "bar-1"))); + assertThat(nameResolver.expand("*-2"), equalTo(newSortedSet("foo-2", "bar-2"))); + assertThat(nameResolver.expand("*"), equalTo(newSortedSet("foo-1", "foo-2", "bar-1", "bar-2"))); + assertThat(nameResolver.expand("_all"), equalTo(newSortedSet("foo-1", "foo-2", "bar-1", "bar-2"))); + assertThat(nameResolver.expand("foo-1,foo-2"), equalTo(newSortedSet("foo-1", "foo-2"))); + assertThat(nameResolver.expand("foo-1,bar-1"), equalTo(newSortedSet("bar-1", "foo-1"))); + assertThat(nameResolver.expand("foo-*,bar-1"), equalTo(newSortedSet("bar-1", "foo-1", "foo-2"))); } public void testAliased() { @@ -79,33 +68,33 @@ public void testAliased() { NameResolver nameResolver = new TestAliasNameResolver(namesAndAliasesMap); // First try same set of assertions as unaliases - assertThat(nameResolver.expand("foo-1", false), equalTo(newSortedSet("foo-1"))); - assertThat(nameResolver.expand("foo-2", false), equalTo(newSortedSet("foo-2"))); - assertThat(nameResolver.expand("bar-1", false), equalTo(newSortedSet("bar-1"))); - assertThat(nameResolver.expand("bar-2", false), equalTo(newSortedSet("bar-2"))); - assertThat(nameResolver.expand("foo-1,foo-2", false), equalTo(newSortedSet("foo-1", "foo-2"))); - assertThat(nameResolver.expand("foo-*", false), equalTo(newSortedSet("foo-1", "foo-2"))); - assertThat(nameResolver.expand("bar-*", false), equalTo(newSortedSet("bar-1", "bar-2"))); - assertThat(nameResolver.expand("*oo-*", false), equalTo(newSortedSet("foo-1", 
"foo-2"))); - assertThat(nameResolver.expand("*-1", false), equalTo(newSortedSet("foo-1", "bar-1"))); - assertThat(nameResolver.expand("*-2", false), equalTo(newSortedSet("foo-2", "bar-2"))); - assertThat(nameResolver.expand("*", false), equalTo(newSortedSet("foo-1", "foo-2", "bar-1", "bar-2"))); - assertThat(nameResolver.expand("_all", false), equalTo(newSortedSet("foo-1", "foo-2", "bar-1", "bar-2"))); - assertThat(nameResolver.expand("foo-1,foo-2", false), equalTo(newSortedSet("foo-1", "foo-2"))); - assertThat(nameResolver.expand("foo-1,bar-1", false), equalTo(newSortedSet("bar-1", "foo-1"))); - assertThat(nameResolver.expand("foo-*,bar-1", false), equalTo(newSortedSet("bar-1", "foo-1", "foo-2"))); + assertThat(nameResolver.expand("foo-1"), equalTo(newSortedSet("foo-1"))); + assertThat(nameResolver.expand("foo-2"), equalTo(newSortedSet("foo-2"))); + assertThat(nameResolver.expand("bar-1"), equalTo(newSortedSet("bar-1"))); + assertThat(nameResolver.expand("bar-2"), equalTo(newSortedSet("bar-2"))); + assertThat(nameResolver.expand("foo-1,foo-2"), equalTo(newSortedSet("foo-1", "foo-2"))); + assertThat(nameResolver.expand("foo-*"), equalTo(newSortedSet("foo-1", "foo-2"))); + assertThat(nameResolver.expand("bar-*"), equalTo(newSortedSet("bar-1", "bar-2"))); + assertThat(nameResolver.expand("*oo-*"), equalTo(newSortedSet("foo-1", "foo-2"))); + assertThat(nameResolver.expand("*-1"), equalTo(newSortedSet("foo-1", "bar-1"))); + assertThat(nameResolver.expand("*-2"), equalTo(newSortedSet("foo-2", "bar-2"))); + assertThat(nameResolver.expand("*"), equalTo(newSortedSet("foo-1", "foo-2", "bar-1", "bar-2"))); + assertThat(nameResolver.expand("_all"), equalTo(newSortedSet("foo-1", "foo-2", "bar-1", "bar-2"))); + assertThat(nameResolver.expand("foo-1,foo-2"), equalTo(newSortedSet("foo-1", "foo-2"))); + assertThat(nameResolver.expand("foo-1,bar-1"), equalTo(newSortedSet("bar-1", "foo-1"))); + assertThat(nameResolver.expand("foo-*,bar-1"), equalTo(newSortedSet("bar-1", "foo-1", 
"foo-2"))); // No let's test the aliases - assertThat(nameResolver.expand("foo-group", false), equalTo(newSortedSet("foo-1", "foo-2"))); - assertThat(nameResolver.expand("bar-group", false), equalTo(newSortedSet("bar-1", "bar-2"))); - assertThat(nameResolver.expand("foo-group,bar-group", false), equalTo(newSortedSet("bar-1", "bar-2", "foo-1", "foo-2"))); - assertThat(nameResolver.expand("foo-group,foo-1", false), equalTo(newSortedSet("foo-1", "foo-2"))); - assertThat(nameResolver.expand("foo-group,bar-1", false), equalTo(newSortedSet("bar-1", "foo-1", "foo-2"))); - assertThat(nameResolver.expand("foo-group,bar-*", false), equalTo(newSortedSet("bar-1", "bar-2", "foo-1", "foo-2"))); + assertThat(nameResolver.expand("foo-group"), equalTo(newSortedSet("foo-1", "foo-2"))); + assertThat(nameResolver.expand("bar-group"), equalTo(newSortedSet("bar-1", "bar-2"))); + assertThat(nameResolver.expand("foo-group,bar-group"), equalTo(newSortedSet("bar-1", "bar-2", "foo-1", "foo-2"))); + assertThat(nameResolver.expand("foo-group,foo-1"), equalTo(newSortedSet("foo-1", "foo-2"))); + assertThat(nameResolver.expand("foo-group,bar-1"), equalTo(newSortedSet("bar-1", "foo-1", "foo-2"))); + assertThat(nameResolver.expand("foo-group,bar-*"), equalTo(newSortedSet("bar-1", "bar-2", "foo-1", "foo-2"))); } private static NameResolver newUnaliasedResolver(String... names) { - return NameResolver.newUnaliased(new HashSet<>(Arrays.asList(names)), notFoundExceptionSupplier()); + return NameResolver.newUnaliased(new HashSet<>(Arrays.asList(names))); } private static SortedSet newSortedSet(String... names) { @@ -115,17 +104,12 @@ private static SortedSet newSortedSet(String... 
names) { } return result; } - - private static Function notFoundExceptionSupplier() { - return s -> new ResourceNotFoundException(s); - } - + private static class TestAliasNameResolver extends NameResolver { private final Map> lookup; TestAliasNameResolver(Map> lookup) { - super(notFoundExceptionSupplier()); this.lookup = lookup; } From 32760962dda630a2c4a2fc9aa8ae5758dedc0d42 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Fri, 2 Nov 2018 09:29:21 +0000 Subject: [PATCH 03/10] Create DatafeedConfigReader for reading index and CS config Plus tests and fixes to expandJobs --- .../action/TransportGetDatafeedsAction.java | 15 +- .../TransportGetDatafeedsStatsAction.java | 14 +- .../ml/datafeed/DatafeedConfigReader.java | 130 ++++++++++++++++++ .../persistence/DatafeedConfigProvider.java | 98 +++++++++++-- .../xpack/ml/job/JobManager.java | 89 +++++------- .../ml/job/persistence/JobConfigProvider.java | 109 +++++++++++++-- .../datafeed/DatafeedConfigReaderTests.java | 117 ++++++++++++++++ .../integration/DatafeedConfigProviderIT.java | 31 ++++- .../ml/integration/JobConfigProviderIT.java | 44 ++++++ .../xpack/ml/job/JobManagerTests.java | 108 ++++++++++++--- 10 files changed, 642 insertions(+), 113 deletions(-) create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReaderTests.java diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsAction.java index a848634b30e90..fa66dc5711f53 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsAction.java @@ -98,17 +98,12 @@ Map expandClusterStateDatafeeds(String datafeedExpressio 
ClusterState clusterState) { Map configById = new HashMap<>(); - try { - MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); - Set expandedDatafeedIds = mlMetadata.expandDatafeedIds(datafeedExpression, allowNoDatafeeds); - - for (String expandedDatafeedId : expandedDatafeedIds) { - configById.put(expandedDatafeedId, mlMetadata.getDatafeed(expandedDatafeedId)); - } - } catch (Exception e){ - // ignore - } + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); + Set expandedDatafeedIds = mlMetadata.expandDatafeedIds(datafeedExpression); + for (String expandedDatafeedId : expandedDatafeedIds) { + configById.put(expandedDatafeedId, mlMetadata.getDatafeed(expandedDatafeedId)); + } return configById; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java index ae1b42d2e3083..4123767da951e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java @@ -8,6 +8,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -16,6 +17,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.threadpool.ThreadPool; import 
org.elasticsearch.transport.TransportService; @@ -24,7 +26,7 @@ import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; -import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; +import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigReader; import java.util.List; import java.util.stream.Collectors; @@ -32,17 +34,17 @@ public class TransportGetDatafeedsStatsAction extends TransportMasterNodeReadAction { - private final DatafeedConfigProvider datafeedConfigProvider; + private final DatafeedConfigReader datafeedConfigReader; @Inject public TransportGetDatafeedsStatsAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - DatafeedConfigProvider datafeedConfigProvider) { + Client client,NamedXContentRegistry xContentRegistry) { super(settings, GetDatafeedsStatsAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, GetDatafeedsStatsAction.Request::new); - this.datafeedConfigProvider = datafeedConfigProvider; + this.datafeedConfigReader = new DatafeedConfigReader(client, settings, xContentRegistry); } @Override @@ -57,10 +59,10 @@ protected GetDatafeedsStatsAction.Response newResponse() { @Override protected void masterOperation(GetDatafeedsStatsAction.Request request, ClusterState state, - ActionListener listener) throws Exception { + ActionListener listener) { logger.debug("Get stats for datafeed '{}'", request.getDatafeedId()); - datafeedConfigProvider.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds(), ActionListener.wrap( + datafeedConfigReader.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds(), state, ActionListener.wrap( expandedDatafeedIds -> { PersistentTasksCustomMetaData 
tasksInProgress = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); List results = expandedDatafeedIds.stream() diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java new file mode 100644 index 0000000000000..c4efd4dec1c7a --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java @@ -0,0 +1,130 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.datafeed; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; +import org.elasticsearch.xpack.ml.job.persistence.ExpandedIdsMatcher; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.stream.Collectors; + +/** + * This class abstracts away reading datafeed configuration from either + * the cluster state or index documents. 
+ */ +public class DatafeedConfigReader { + + private final DatafeedConfigProvider datafeedConfigProvider; + + public DatafeedConfigReader(Client client, Settings settings, NamedXContentRegistry xContentRegistry) { + this.datafeedConfigProvider = new DatafeedConfigProvider(client, settings, xContentRegistry); + } + + public DatafeedConfigReader(DatafeedConfigProvider datafeedConfigProvider) { + this.datafeedConfigProvider = datafeedConfigProvider; + } + + /** + * Merges the results of {@link MlMetadata#expandDatafeedIds} + * and {@link DatafeedConfigProvider#expandDatafeedIds(String, boolean, ActionListener)} + */ + public void expandDatafeedIds(String expression, boolean allowNoDatafeeds, ClusterState clusterState, + ActionListener> listener) { + + Set clusterStateDatafeedIds = MlMetadata.getMlMetadata(clusterState).expandDatafeedIds(expression); + ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(expression, allowNoDatafeeds); + requiredMatches.filterMatchedIds(clusterStateDatafeedIds); + + datafeedConfigProvider.expandDatafeedIdsWithoutMissingCheck(expression, ActionListener.wrap( + expandedDatafeedIds -> { + // Check for duplicate Ids + expandedDatafeedIds.forEach(id -> { + if (clusterStateDatafeedIds.contains(id)) { + listener.onFailure(new IllegalStateException("Datafeed [" + id + "] configuration " + + "exists in both clusterstate and index")); + return; + } + }); + + requiredMatches.filterMatchedIds(expandedDatafeedIds); + + if (requiredMatches.hasUnmatchedIds()) { + listener.onFailure(ExceptionsHelper.missingDatafeedException(requiredMatches.unmatchedIdsString())); + } else { + expandedDatafeedIds.addAll(clusterStateDatafeedIds); + listener.onResponse(expandedDatafeedIds); + } + }, + listener::onFailure + )); + } + + /** + * Merges the results of {@link MlMetadata#expandDatafeedIds} + * and {@link DatafeedConfigProvider#expandDatafeedConfigs(String, boolean, ActionListener)} + */ + public void expandDatafeedConfigs(String expression, boolean
allowNoDatafeeds, ClusterState clusterState, + ActionListener> listener) { + + Map clusterStateConfigs = expandClusterStateDatafeeds(expression, clusterState); + + ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(expression, allowNoDatafeeds); + requiredMatches.filterMatchedIds(clusterStateConfigs.keySet()); + + datafeedConfigProvider.expandDatafeedConfigsWithoutMissingCheck(expression, ActionListener.wrap( + datafeedBuilders -> { + // Check for duplicate Ids + datafeedBuilders.forEach(datafeedBuilder -> { + if (clusterStateConfigs.containsKey(datafeedBuilder.getId())) { + listener.onFailure(new IllegalStateException("Datafeed [" + datafeedBuilder.getId() + "] configuration " + + "exists in both clusterstate and index")); + return; + } + }); + + List datafeedConfigs = new ArrayList<>(); + for (DatafeedConfig.Builder builder : datafeedBuilders) { + datafeedConfigs.add(builder.build()); + } + + requiredMatches.filterMatchedIds(datafeedConfigs.stream().map(DatafeedConfig::getId).collect(Collectors.toList())); + + if (requiredMatches.hasUnmatchedIds()) { + listener.onFailure(ExceptionsHelper.missingDatafeedException(requiredMatches.unmatchedIdsString())); + } else { + datafeedConfigs.addAll(clusterStateConfigs.values()); + listener.onResponse(datafeedConfigs); + } + }, + listener::onFailure + )); + } + + private Map expandClusterStateDatafeeds(String datafeedExpression, ClusterState clusterState) { + + Map configById = new HashMap<>(); + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); + Set expandedDatafeedIds = mlMetadata.expandDatafeedIds(datafeedExpression); + + for (String expandedDatafeedId : expandedDatafeedIds) { + configById.put(expandedDatafeedId, mlMetadata.getDatafeed(expandedDatafeedId)); + } + return configById; + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java 
b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java index cc4131e7c65d9..2972b27dda789 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java @@ -351,17 +351,9 @@ private void indexUpdatedConfig(DatafeedConfig updatedConfig, long version, Acti * @param listener The expanded datafeed IDs listener */ public void expandDatafeedIds(String expression, boolean allowNoDatafeeds, ActionListener> listener) { - String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression); - SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildDatafeedIdQuery(tokens)); - sourceBuilder.sort(DatafeedConfig.ID.getPreferredName()); - sourceBuilder.fetchSource(false); - sourceBuilder.docValueField(DatafeedConfig.ID.getPreferredName(), DocValueFieldsContext.USE_DEFAULT_FORMAT); + SearchRequest searchRequest = buildExpandDatafeedIdsSearch(expression); - SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) - .setIndicesOptions(IndicesOptions.lenientExpandOpen()) - .setSource(sourceBuilder).request(); - - ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoDatafeeds); + ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(expression, allowNoDatafeeds); executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, ActionListener.wrap( @@ -386,6 +378,45 @@ public void expandDatafeedIds(String expression, boolean allowNoDatafeeds, Actio } + /** + * Similar to {@link #expandDatafeedIds(String, boolean, ActionListener)} but no error + * is generated if there are missing Ids. Whatever Ids match will be returned. + * + * This method is only for use when combining datafeed Ids from multiple sources, its usage + * should be limited. 
+ * + * @param expression the expression to resolve + * @param listener The expanded datafeed IDs listener + */ + public void expandDatafeedIdsWithoutMissingCheck(String expression, ActionListener> listener) { + SearchRequest searchRequest = buildExpandDatafeedIdsSearch(expression); + + executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, + ActionListener.wrap( + response -> { + SortedSet datafeedIds = new TreeSet<>(); + SearchHit[] hits = response.getHits().getHits(); + for (SearchHit hit : hits) { + datafeedIds.add(hit.field(DatafeedConfig.ID.getPreferredName()).getValue()); + } + listener.onResponse(datafeedIds); + }, + listener::onFailure) + , client::search); + } + + private SearchRequest buildExpandDatafeedIdsSearch(String expression) { + String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression); + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildDatafeedIdQuery(tokens)); + sourceBuilder.sort(DatafeedConfig.ID.getPreferredName()); + sourceBuilder.fetchSource(false); + sourceBuilder.docValueField(DatafeedConfig.ID.getPreferredName(), DocValueFieldsContext.USE_DEFAULT_FORMAT); + + return client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()) + .setSource(sourceBuilder).request(); + } + /** * The same logic as {@link #expandDatafeedIds(String, boolean, ActionListener)} but * the full datafeed configuration is returned. 
@@ -398,7 +429,7 @@ public void expandDatafeedIds(String expression, boolean allowNoDatafeeds, Actio * wildcard then setting this true will not suppress the exception * @param listener The expanded datafeed config listener */ - // NORELEASE datafeed configs should be paged or have a mechanism to return all jobs if there are many of them + // NORELEASE datafeed configs should be paged or have a mechanism to return all configs if there are many of them public void expandDatafeedConfigs(String expression, boolean allowNoDatafeeds, ActionListener> listener) { String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildDatafeedIdQuery(tokens)); @@ -442,6 +473,51 @@ public void expandDatafeedConfigs(String expression, boolean allowNoDatafeeds, A } + /** + * The same logic as {@link #expandDatafeedIdsWithoutMissingCheck(String, ActionListener)} + * but the full datafeed configuration is returned. + * + * This method is only for use when combining datafeeds from multiple sources, its usage + * should be limited. 
+ * + * @param expression the expression to resolve + * @param listener The expanded datafeed config listener + */ + // NORELEASE datafeed configs should be paged or have a mechanism to return all configs if there are many of them + public void expandDatafeedConfigsWithoutMissingCheck(String expression, ActionListener> listener) { + String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression); + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildDatafeedIdQuery(tokens)); + sourceBuilder.sort(DatafeedConfig.ID.getPreferredName()); + + SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()) + .setSource(sourceBuilder).request(); + + executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, + ActionListener.wrap( + response -> { + List datafeeds = new ArrayList<>(); + Set datafeedIds = new HashSet<>(); + SearchHit[] hits = response.getHits().getHits(); + for (SearchHit hit : hits) { + try { + BytesReference source = hit.getSourceRef(); + DatafeedConfig.Builder datafeed = parseLenientlyFromSource(source); + datafeeds.add(datafeed); + datafeedIds.add(datafeed.getId()); + } catch (IOException e) { + // TODO A better way to handle this rather than just ignoring the error? 
+ logger.error("Error parsing datafeed configuration [" + hit.getId() + "]", e); + } + } + listener.onResponse(datafeeds); + }, + listener::onFailure) + , client::search); + + } + + private QueryBuilder buildDatafeedIdQuery(String [] tokens) { QueryBuilder datafeedQuery = new TermQueryBuilder(DatafeedConfig.CONFIG_TYPE.getPreferredName(), DatafeedConfig.TYPE); if (Strings.isAllOrWildcard(tokens)) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index ee05016fad28e..78a788b138a19 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -72,6 +72,7 @@ import java.util.function.Consumer; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; /** * Allows interactions with jobs. The managed interactions include: @@ -175,11 +176,11 @@ private void getJobFromClusterState(String jobId, ActionListener jobListene * @param jobsListener The jobs listener */ public void expandJobs(String expression, boolean allowNoJobs, ActionListener> jobsListener) { - Map clusterStateJobs = expandJobsFromClusterState(expression, allowNoJobs, clusterService.state()); - ExpandedIdsMatcher idsMatcher = new ExpandedIdsMatcher(expression, allowNoJobs); - idsMatcher.filterMatchedIds(clusterStateJobs.keySet()); + Map clusterStateJobs = expandJobsFromClusterState(expression, clusterService.state()); + ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(expression, allowNoJobs); + requiredMatches.filterMatchedIds(clusterStateJobs.keySet()); - jobConfigProvider.expandJobs(expression, allowNoJobs, false, ActionListener.wrap( + jobConfigProvider.expandJobsWithoutMissingcheck(expression, false, ActionListener.wrap( jobBuilders -> { // Check for duplicate jobs for (Job.Builder jb : jobBuilders) { @@ -196,35 
+197,26 @@ public void expandJobs(String expression, boolean allowNoJobs, ActionListener(jobs, jobs.size(), Job.RESULTS_FIELD)); - }, - e -> { - if (e instanceof ResourceNotFoundException) { - // Check if we found the jobs that satisfy expression in the clusterstate - if (idsMatcher.hasUnmatchedIds() == false) { - List jobs = new ArrayList<>(clusterStateJobs.values()); - Collections.sort(jobs, Comparator.comparing(Job::getId)); - jobsListener.onResponse(new QueryPage<>(jobs, jobs.size(), Job.RESULTS_FIELD)); - return; - } + requiredMatches.filterMatchedIds(jobs.stream().map(Job::getId).collect(Collectors.toList())); + + if (requiredMatches.hasUnmatchedIds()) { + jobsListener.onFailure(ExceptionsHelper.missingJobException(requiredMatches.unmatchedIdsString())); + } else { + jobs.addAll(clusterStateJobs.values()); + Collections.sort(jobs, Comparator.comparing(Job::getId)); + jobsListener.onResponse(new QueryPage<>(jobs, jobs.size(), Job.RESULTS_FIELD)); } - jobsListener.onFailure(e); - } + }, + jobsListener::onFailure )); } - private Map expandJobsFromClusterState(String expression, boolean allowNoJobs, ClusterState clusterState) { + private Map expandJobsFromClusterState(String expression, ClusterState clusterState) { Map jobIdToJob = new HashMap<>(); - try { - Set expandedJobIds = MlMetadata.getMlMetadata(clusterState).expandJobIds(expression, allowNoJobs); - MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); - for (String expandedJobId : expandedJobIds) { - jobIdToJob.put(expandedJobId, mlMetadata.getJobs().get(expandedJobId)); - } - } catch (Exception e) { - // ignore exception thrown if no job Id matches expression + Set expandedJobIds = MlMetadata.getMlMetadata(clusterState).expandJobIds(expression); + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); + for (String expandedJobId : expandedJobIds) { + jobIdToJob.put(expandedJobId, mlMetadata.getJobs().get(expandedJobId)); } return jobIdToJob; } @@ -237,11 +229,11 @@ private Map 
expandJobsFromClusterState(String expression, boolean a * @param jobsListener The jobs listener */ public void expandJobIds(String expression, boolean allowNoJobs, ActionListener> jobsListener) { - Set clusterStateJobIds = expandJobIdsFromClusterState(expression, allowNoJobs, clusterService.state()); - ExpandedIdsMatcher idsMatcher = new ExpandedIdsMatcher(expression, allowNoJobs); - idsMatcher.filterMatchedIds(clusterStateJobIds); + Set clusterStateJobIds = MlMetadata.getMlMetadata(clusterService.state()).expandJobIds(expression); + ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(expression, allowNoJobs); + requiredMatches.filterMatchedIds(clusterStateJobIds); - jobConfigProvider.expandJobsIds(expression, allowNoJobs, false, ActionListener.wrap( + jobConfigProvider.expandJobsIdsWithoutMissingCheck(expression, false, ActionListener.wrap( jobIds -> { // Check for duplicate job Ids jobIds.forEach(id -> { @@ -252,31 +244,18 @@ public void expandJobIds(String expression, boolean allowNoJobs, ActionListener< } }); - jobIds.addAll(clusterStateJobIds); - jobsListener.onResponse(jobIds); - }, - e -> { - if (e instanceof ResourceNotFoundException) { - // Check if we found the job Ids that satisfy expression in the clusterstate - if (idsMatcher.hasUnmatchedIds() == false) { - jobsListener.onResponse(new TreeSet<>(clusterStateJobIds)); - return; - } + requiredMatches.filterMatchedIds(jobIds); + if (requiredMatches.hasUnmatchedIds()) { + jobsListener.onFailure(ExceptionsHelper.missingJobException(requiredMatches.unmatchedIdsString())); + } else { + jobIds.addAll(clusterStateJobIds); + jobsListener.onResponse(new TreeSet<>(jobIds)); } - jobsListener.onFailure(e); - } + }, + jobsListener::onFailure )); } - private Set expandJobIdsFromClusterState(String expression, boolean allowNoJobs, ClusterState clusterState) { - try { - return MlMetadata.getMlMetadata(clusterState).expandJobIds(expression, allowNoJobs); - } catch (Exception e) { - // ignore exception thrown if no 
job Id matches expression - } - return Collections.emptySet(); - } - /** * Validate the char filter/tokenizer/token filter names used in the categorization analyzer config (if any). * This validation has to be done server-side; it cannot be done in a client as that won't have loaded the @@ -588,8 +567,8 @@ public void updateProcessOnCalendarChanged(List calendarJobIds, ActionLi jobConfigProvider.expandGroupIds(calendarJobIds, ActionListener.wrap( expandedIds -> { threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> { - // Merge the expended group members with the request Ids. - // Ids that aren't jobs will be filtered by isJobOpen() + // Merge the expanded group members with the request Ids which + // are job ids rather than group Ids. expandedIds.addAll(calendarJobIds); for (String jobId : expandedIds) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java index e68c3a689c4bc..e48d322478220 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java @@ -537,17 +537,9 @@ public void markJobAsDeleting(String jobId, ActionListener listener) { * @param listener The expanded job Ids listener */ public void expandJobsIds(String expression, boolean allowNoJobs, boolean excludeDeleting, ActionListener> listener) { - String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression); - SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens, excludeDeleting)); - sourceBuilder.sort(Job.ID.getPreferredName()); - sourceBuilder.fetchSource(false); - sourceBuilder.docValueField(Job.ID.getPreferredName(), DocValueFieldsContext.USE_DEFAULT_FORMAT); - sourceBuilder.docValueField(Job.GROUPS.getPreferredName(),
DocValueFieldsContext.USE_DEFAULT_FORMAT); - - SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) - .setIndicesOptions(IndicesOptions.lenientExpandOpen()) - .setSource(sourceBuilder).request(); + SearchRequest searchRequest = makeExpandIdsSearchRequest(expression, excludeDeleting); + String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression); ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoJobs); executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, @@ -579,6 +571,55 @@ public void expandJobsIds(String expression, boolean allowNoJobs, boolean exclud } + /** + * Similar to {@link #expandJobsIds(String, boolean, boolean, ActionListener)} but no error + * is generated if there are missing Ids. Whatever Ids match will be returned. + * + * This method is only for use when combining jobs Ids from multiple sources, its usage + * should be limited. + * + * @param expression the expression to resolve + * @param excludeDeleting If true exclude jobs marked as deleting + * @param listener The expanded job Ids listener + */ + public void expandJobsIdsWithoutMissingCheck(String expression, boolean excludeDeleting, ActionListener> listener) { + + SearchRequest searchRequest = makeExpandIdsSearchRequest(expression, excludeDeleting); + executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, + ActionListener.wrap( + response -> { + SortedSet jobIds = new TreeSet<>(); + SortedSet groupsIds = new TreeSet<>(); + SearchHit[] hits = response.getHits().getHits(); + for (SearchHit hit : hits) { + jobIds.add(hit.field(Job.ID.getPreferredName()).getValue()); + List groups = hit.field(Job.GROUPS.getPreferredName()).getValues(); + if (groups != null) { + groupsIds.addAll(groups.stream().map(Object::toString).collect(Collectors.toList())); + } + } + + groupsIds.addAll(jobIds); + listener.onResponse(jobIds); + }, + listener::onFailure) + , 
client::search); + + } + + private SearchRequest makeExpandIdsSearchRequest(String expression, boolean excludeDeleting) { + String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression); + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens, excludeDeleting)); + sourceBuilder.sort(Job.ID.getPreferredName()); + sourceBuilder.fetchSource(false); + sourceBuilder.docValueField(Job.ID.getPreferredName(), DocValueFieldsContext.USE_DEFAULT_FORMAT); + sourceBuilder.docValueField(Job.GROUPS.getPreferredName(), DocValueFieldsContext.USE_DEFAULT_FORMAT); + + return client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()) + .setSource(sourceBuilder).request(); + } + /** * The same logic as {@link #expandJobsIds(String, boolean, boolean, ActionListener)} but * the full anomaly detector job configuration is returned. @@ -638,6 +679,54 @@ public void expandJobs(String expression, boolean allowNoJobs, boolean excludeDe } + /** + * The same logic as {@link #expandJobsIdsWithoutMissingCheck(String, boolean, ActionListener)} + * but the full anomaly detector job configuration is returned. + * + * This method is only for use when combining jobs from multiple sources, its usage + * should be limited. 
+ * + * @param expression the expression to resolve + * @param excludeDeleting If true exclude jobs marked as deleting + * @param listener The expanded jobs listener + */ + // NORELEASE jobs should be paged or have a mechanism to return all jobs if there are many of them + public void expandJobsWithoutMissingcheck(String expression, boolean excludeDeleting, ActionListener> listener) { + String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression); + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens, excludeDeleting)); + sourceBuilder.sort(Job.ID.getPreferredName()); + + SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()) + .setSource(sourceBuilder).request(); + + executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, + ActionListener.wrap( + response -> { + List jobs = new ArrayList<>(); + Set jobAndGroupIds = new HashSet<>(); + + SearchHit[] hits = response.getHits().getHits(); + for (SearchHit hit : hits) { + try { + BytesReference source = hit.getSourceRef(); + Job.Builder job = parseJobLenientlyFromSource(source); + jobs.add(job); + jobAndGroupIds.add(job.getId()); + jobAndGroupIds.addAll(job.getGroups()); + } catch (IOException e) { + // TODO A better way to handle this rather than just ignoring the error? + logger.error("Error parsing anomaly detector job configuration [" + hit.getId() + "]", e); + } + } + + listener.onResponse(jobs); + }, + listener::onFailure) + , client::search); + + } + /** * Expands the list of job group Ids to the set of jobs which are members of the groups. 
* Unlike {@link #expandJobsIds(String, boolean, boolean, ActionListener)} it is not an error diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReaderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReaderTests.java new file mode 100644 index 0000000000000..cac6bc12579ca --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReaderTests.java @@ -0,0 +1,117 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.datafeed; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; + +import java.util.Collections; +import java.util.List; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicReference; + +import static org.elasticsearch.xpack.core.ml.job.config.JobTests.buildJobBuilder; +import static org.hamcrest.Matchers.contains; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; + +public class DatafeedConfigReaderTests extends ESTestCase { + + private final String JOB_ID = "foo"; + + private void mockProviderWithExpectedIds(DatafeedConfigProvider mockedProvider, String expression, SortedSet datafeedIds) { + doAnswer(invocationOnMock -> { + ActionListener> listener = 
(ActionListener>) invocationOnMock.getArguments()[1]; + listener.onResponse(datafeedIds); + return null; + }).when(mockedProvider).expandDatafeedIdsWithoutMissingCheck(eq(expression), any()); + } + + public void testExpandDatafeedIds_SplitBetweenClusterStateAndIndex() { + SortedSet idsInIndex = new TreeSet<>(); + idsInIndex.add("index-df"); + DatafeedConfigProvider provider = mock(DatafeedConfigProvider.class); + mockProviderWithExpectedIds(provider, "cs-df,index-df", idsInIndex); + + ClusterState clusterState = buildClusterStateWithJob(Collections.singletonList(createDatafeedConfig("cs-df", JOB_ID))); + + DatafeedConfigReader reader = new DatafeedConfigReader(provider); + + AtomicReference> idsHolder = new AtomicReference<>(); + reader.expandDatafeedIds("cs-df,index-df", true, clusterState, ActionListener.wrap( + idsHolder::set, + e -> fail(e.getMessage()) + )); + assertNotNull(idsHolder.get()); + assertThat(idsHolder.get(), contains("cs-df", "index-df")); + + mockProviderWithExpectedIds(provider, "cs-df", new TreeSet<>()); + reader.expandDatafeedIds("cs-df", true, clusterState, ActionListener.wrap( + idsHolder::set, + e -> assertNull(e) + )); + assertThat(idsHolder.get(), contains("cs-df")); + + idsInIndex.clear(); + idsInIndex.add("index-df"); + mockProviderWithExpectedIds(provider, "index-df", idsInIndex); + reader.expandDatafeedIds("index-df", true, clusterState, ActionListener.wrap( + idsHolder::set, + e -> assertNull(e) + )); + assertThat(idsHolder.get(), contains("index-df")); + + } + + public void testExpandDatafeedIds_GivenAll() { + SortedSet idsInIndex = new TreeSet<>(); + idsInIndex.add("df1"); + idsInIndex.add("df2"); + DatafeedConfigProvider provider = mock(DatafeedConfigProvider.class); + mockProviderWithExpectedIds(provider, "_all", idsInIndex); + + ClusterState clusterState = buildClusterStateWithJob(Collections.singletonList(createDatafeedConfig("df3", JOB_ID))); + + DatafeedConfigReader reader = new DatafeedConfigReader(provider); + + 
AtomicReference> idsHolder = new AtomicReference<>(); + reader.expandDatafeedIds("_all", true, clusterState, ActionListener.wrap( + idsHolder::set, + e -> fail(e.getMessage()) + )); + + assertNotNull(idsHolder.get()); + assertThat(idsHolder.get(), contains("df1", "df2", "df3")); + } + + private ClusterState buildClusterStateWithJob(List datafeeds) { + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(buildJobBuilder(JOB_ID).build(), false); + for (DatafeedConfig df : datafeeds) { + mlMetadata.putDatafeed(df, Collections.emptyMap()); + } + + return ClusterState.builder(new ClusterName("datafeedconfigreadertests")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + } + + private DatafeedConfig createDatafeedConfig(String id, String jobId) { + DatafeedConfig.Builder builder = new DatafeedConfig.Builder(id, jobId); + builder.setIndices(Collections.singletonList("beats*")); + return builder.build(); + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java index 9496f4ca0d8f2..19f772709ead8 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java @@ -245,7 +245,7 @@ public void testExpandDatafeeds() throws Exception { client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.configIndexName()).get(); - // Test job IDs only + // Test IDs only SortedSet expandedIds = blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("foo*", true, actionListener)); assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2")), expandedIds); @@ -262,7 +262,7 @@ public void testExpandDatafeeds() throws Exception { expandedIds = blockingCall(actionListener -> 
datafeedConfigProvider.expandDatafeedIds("bar-1,foo*", true, actionListener)); assertEquals(new TreeSet<>(Arrays.asList("bar-1", "foo-1", "foo-2")), expandedIds); - // Test full job config + // Test full config List expandedDatafeedBuilders = blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedConfigs("foo*", true, actionListener)); List expandedDatafeeds = @@ -290,6 +290,33 @@ public void testExpandDatafeeds() throws Exception { assertThat(expandedDatafeeds, containsInAnyOrder(bar1, foo1, foo2)); } + public void testExpandDatafeedsWithoutMissingCheck() throws Exception { + DatafeedConfig foo1 = putDatafeedConfig(createDatafeedConfig("foo-1", "j1"), Collections.emptyMap()); + putDatafeedConfig(createDatafeedConfig("bar-1", "j3"), Collections.emptyMap()); + + client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.configIndexName()).get(); + + // Test IDs only + SortedSet expandedIds = + blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIdsWithoutMissingCheck("tim", actionListener)); + assertThat(expandedIds, empty()); + + expandedIds = blockingCall(actionListener -> + datafeedConfigProvider.expandDatafeedIdsWithoutMissingCheck("foo-1,dave", actionListener)); + assertThat(expandedIds, contains("foo-1")); + + // Test full config + List expandedDatafeedBuilders = + blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedConfigsWithoutMissingCheck("tim", actionListener)); + assertThat(expandedDatafeedBuilders, empty()); + + expandedDatafeedBuilders = blockingCall(actionListener -> + datafeedConfigProvider.expandDatafeedConfigsWithoutMissingCheck("foo*,dave", actionListener)); + List expandedDatafeeds = + expandedDatafeedBuilders.stream().map(DatafeedConfig.Builder::build).collect(Collectors.toList()); + assertThat(expandedDatafeeds, contains(foo1)); + } + public void testFindDatafeedsForJobIds() throws Exception { putDatafeedConfig(createDatafeedConfig("foo-1", "j1"), Collections.emptyMap()); 
putDatafeedConfig(createDatafeedConfig("foo-2", "j2"), Collections.emptyMap()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java index bd6e2df8ad0a6..a27c4a06b24c1 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java @@ -437,6 +437,50 @@ public void testExpandJobIds_excludeDeleting() throws Exception { assertThat(expandedJobsBuilders, hasSize(3)); } + public void testExpandJobsIdsWithoutMissingCheck() throws Exception { + putJob(createJob("tom", null)); + putJob(createJob("dick", null)); + putJob(createJob("harry", Collections.singletonList("harry-group"))); + putJob(createJob("harry-jnr", Collections.singletonList("harry-group"))); + + client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.configIndexName()).get(); + + SortedSet expandedIds = blockingCall(actionListener -> + jobConfigProvider.expandJobsIdsWithoutMissingCheck("dick,john", false, actionListener)); + assertEquals(new TreeSet<>(Collections.singletonList("dick")), expandedIds); + + expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIdsWithoutMissingCheck("foo*", true, actionListener)); + assertThat(expandedIds, empty()); + + expandedIds = blockingCall(actionListener -> + jobConfigProvider.expandJobsIdsWithoutMissingCheck("harry-group,dave", false, actionListener)); + assertEquals(new TreeSet<>(Arrays.asList("harry", "harry-jnr")), expandedIds); + } + + public void testExpandJobsWithoutMissingCheck() throws Exception { + putJob(createJob("tom", null)); + putJob(createJob("dick", null)); + putJob(createJob("harry", Collections.singletonList("harry-group"))); + putJob(createJob("harry-jnr", Collections.singletonList("harry-group"))); + + 
client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.configIndexName()).get(); + + List expandedJobsBuilders = + blockingCall(actionListener -> jobConfigProvider.expandJobsWithoutMissingcheck("dick,john", true, actionListener)); + List expandedJobIds = expandedJobsBuilders.stream().map(Job.Builder::build).map(Job::getId).collect(Collectors.toList()); + assertThat(expandedJobIds, contains("dick")); + + expandedJobsBuilders = blockingCall(actionListener -> + jobConfigProvider.expandJobsWithoutMissingcheck("foo*", true, actionListener)); + expandedJobIds = expandedJobsBuilders.stream().map(Job.Builder::build).map(Job::getId).collect(Collectors.toList()); + assertThat(expandedJobIds, empty()); + + expandedJobsBuilders = blockingCall(actionListener -> + jobConfigProvider.expandJobsWithoutMissingcheck("harry-group,dave", true, actionListener)); + expandedJobIds = expandedJobsBuilders.stream().map(Job.Builder::build).map(Job::getId).collect(Collectors.toList()); + assertThat(expandedJobIds, contains("harry", "harry-jnr")); + } + public void testExpandGroups() throws Exception { putJob(createJob("apples", Collections.singletonList("fruit"))); putJob(createJob("pears", Collections.singletonList("fruit"))); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index 10e0d1b4830fc..2494970ab782c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -170,7 +170,7 @@ public void testGetJobFromClusterWhenNotInIndex() { assertEquals(clusterJob, jobHolder.get()); } - public void testExpandJobsFromClusterStateAndIndex() throws IOException { + public void testExpandJobsFromClusterStateAndIndex_GivenAll() throws IOException { Job csJobFoo1 = buildJobBuilder("foo-cs-1").build(); Job csJobFoo2 = 
buildJobBuilder("foo-cs-2").build(); Job csJobBar = buildJobBuilder("bar-cs").build(); @@ -192,7 +192,7 @@ public void testExpandJobsFromClusterStateAndIndex() throws IOException { Job.Builder indexJobFoo = buildJobBuilder("foo-index"); docsAsBytes.add(toBytesReference(indexJobFoo.build())); - MockClientBuilder mockClientBuilder = new MockClientBuilder("cluster-test"); + MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); mockClientBuilder.prepareSearch(AnomalyDetectorsIndex.configIndexName(), docsAsBytes); JobManager jobManager = createJobManager(mockClientBuilder.build()); @@ -220,7 +220,41 @@ public void testExpandJobsFromClusterStateAndIndex() throws IOException { assertThat(jobIds, contains("foo-cs-1", "foo-cs-2", "foo-index")); } - public void testExpandJobs_GivenJobInClusterStateNotIndex() throws IOException { + public void testExpandJobs_SplitBetweenClusterStateAndIndex() throws IOException { + Job csJob = buildJobBuilder("cs-job").build(); + + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(csJob, false); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + when(clusterService.state()).thenReturn(clusterState); + + + List docsAsBytes = new ArrayList<>(); + + Job.Builder indexJob = buildJobBuilder("index-job"); + docsAsBytes.add(toBytesReference(indexJob.build())); + + MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); + mockClientBuilder.prepareSearch(AnomalyDetectorsIndex.configIndexName(), docsAsBytes); + JobManager jobManager = createJobManager(mockClientBuilder.build()); + + AtomicReference> jobsHolder = new AtomicReference<>(); + jobManager.expandJobs("cs-job,index-job", true, ActionListener.wrap( + jobs -> jobsHolder.set(jobs), + e -> fail(e.getMessage()) + )); + + assertNotNull(jobsHolder.get()); + assertThat(jobsHolder.get().results(), 
hasSize(2)); + List jobIds = jobsHolder.get().results().stream().map(Job::getId).collect(Collectors.toList()); + assertThat(jobIds, contains("cs-job", "index-job")); + } + + public void testExpandJobs_GivenJobInClusterStateNotIndex() { Job csJobFoo1 = buildJobBuilder("foo-cs-1").build(); Job csJobFoo2 = buildJobBuilder("foo-cs-2").build(); @@ -238,7 +272,7 @@ public void testExpandJobs_GivenJobInClusterStateNotIndex() throws IOException { List docsAsBytes = new ArrayList<>(); - MockClientBuilder mockClientBuilder = new MockClientBuilder("cluster-test"); + MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); mockClientBuilder.prepareSearch(AnomalyDetectorsIndex.configIndexName(), docsAsBytes); JobManager jobManager = createJobManager(mockClientBuilder.build()); @@ -255,7 +289,7 @@ public void testExpandJobs_GivenJobInClusterStateNotIndex() throws IOException { assertThat(jobIds, contains("foo-cs-1", "foo-cs-2")); } - public void testExpandJobIdsFromClusterStateAndIndex() throws IOException { + public void testExpandJobIdsFromClusterStateAndIndex_GivenAll() { Job csJobFoo1 = buildJobBuilder("foo-cs-1").build(); Job csJobFoo2 = buildJobBuilder("foo-cs-2").build(); Job csJobBar = buildJobBuilder("bar-cs").build(); @@ -271,17 +305,16 @@ public void testExpandJobIdsFromClusterStateAndIndex() throws IOException { .build(); when(clusterService.state()).thenReturn(clusterState); - - List> fieldHits = new ArrayList<>(); Map fieldMap = new HashMap<>(); fieldMap.put(Job.ID.getPreferredName(), new DocumentField(Job.ID.getPreferredName(), Collections.singletonList("index-job"))); fieldMap.put(Job.GROUPS.getPreferredName(), new DocumentField(Job.ID.getPreferredName(), Collections.emptyList())); + List> fieldHits = new ArrayList<>(); fieldHits.add(fieldMap); - MockClientBuilder mockClientBuilder = new MockClientBuilder("cluster-test"); + MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); 
mockClientBuilder.prepareSearchFields(AnomalyDetectorsIndex.configIndexName(), fieldHits); JobManager jobManager = createJobManager(mockClientBuilder.build()); @@ -296,7 +329,7 @@ public void testExpandJobIdsFromClusterStateAndIndex() throws IOException { assertThat(jobIdsHolder.get(), contains("bar-cs", "foo-cs-1", "foo-cs-2", "index-job")); } - public void testExpandJobIds_GivenJobInClusterStateNotInded() { + public void testExpandJobIds_GivenJobInClusterStateNotIndex() { Job csJobFoo1 = buildJobBuilder("foo-cs-1").build(); MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); @@ -308,7 +341,7 @@ public void testExpandJobIds_GivenJobInClusterStateNotInded() { .build(); when(clusterService.state()).thenReturn(clusterState); - MockClientBuilder mockClientBuilder = new MockClientBuilder("cluster-test"); + MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); mockClientBuilder.prepareSearchFields(AnomalyDetectorsIndex.configIndexName(), Collections.emptyList()); JobManager jobManager = createJobManager(mockClientBuilder.build()); @@ -323,9 +356,46 @@ public void testExpandJobIds_GivenJobInClusterStateNotInded() { assertThat(jobIdsHolder.get(), contains("foo-cs-1")); } + public void testExpandJobIds_GivenConfigInIndexAndClusterState() { + Job csJobFoo1 = buildJobBuilder("cs-job").build(); + + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(csJobFoo1, false); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + when(clusterService.state()).thenReturn(clusterState); + + Map fieldMap = new HashMap<>(); + fieldMap.put(Job.ID.getPreferredName(), + new DocumentField(Job.ID.getPreferredName(), Collections.singletonList("index-job"))); + fieldMap.put(Job.GROUPS.getPreferredName(), + new DocumentField(Job.ID.getPreferredName(), Collections.emptyList())); + + List> fieldHits = new ArrayList<>(); + 
fieldHits.add(fieldMap); + + MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); + mockClientBuilder.prepareSearchFields(AnomalyDetectorsIndex.configIndexName(), fieldHits); + + + JobManager jobManager = createJobManager(mockClientBuilder.build()); + AtomicReference> jobIdsHolder = new AtomicReference<>(); + jobManager.expandJobIds("index-job,cs-job", true, ActionListener.wrap( + jobs -> jobIdsHolder.set(jobs), + e -> fail(e.getMessage()) + )); + + assertNotNull(jobIdsHolder.get()); + assertThat(jobIdsHolder.get(), hasSize(2)); + assertThat(jobIdsHolder.get(), contains("cs-job" ,"index-job")); + } + @SuppressWarnings("unchecked") public void testPutJob_AddsCreateTime() throws IOException { - MockClientBuilder mockClientBuilder = new MockClientBuilder("cluster-test"); + MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); JobManager jobManager = createJobManager(mockClientBuilder.build()); PutJobAction.Request putJobRequest = new PutJobAction.Request(createJob()); @@ -364,7 +434,7 @@ public void onFailure(Exception e) { } public void testPutJob_ThrowsIfJobExistsInClusterState() throws IOException { - MockClientBuilder mockClientBuilder = new MockClientBuilder("cluster-test"); + MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); JobManager jobManager = createJobManager(mockClientBuilder.build()); PutJobAction.Request putJobRequest = new PutJobAction.Request(createJob()); @@ -389,7 +459,7 @@ public void onFailure(Exception e) { public void testNotifyFilterChangedGivenNoop() { MlFilter filter = MlFilter.builder("my_filter").build(); - MockClientBuilder mockClientBuilder = new MockClientBuilder("cluster-test"); + MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); JobManager jobManager = createJobManager(mockClientBuilder.build()); jobManager.notifyFilterChanged(filter, Collections.emptySet(), Collections.emptySet(), ActionListener.wrap( @@ -441,7 +511,7 @@ 
public void testNotifyFilterChanged() throws IOException { return null; }).when(updateJobProcessNotifier).submitJobUpdate(any(), any()); - MockClientBuilder mockClientBuilder = new MockClientBuilder("cluster-test"); + MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); mockClientBuilder.prepareSearch(AnomalyDetectorsIndex.configIndexName(), docsAsBytes); JobManager jobManager = createJobManager(mockClientBuilder.build()); @@ -492,7 +562,7 @@ public void testNotifyFilterChangedGivenOnlyAddedItems() throws IOException { .build(); when(clusterService.state()).thenReturn(clusterState); - MockClientBuilder mockClientBuilder = new MockClientBuilder("cluster-test"); + MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); mockClientBuilder.prepareSearch(AnomalyDetectorsIndex.configIndexName(), docsAsBytes); JobManager jobManager = createJobManager(mockClientBuilder.build()); @@ -528,7 +598,7 @@ public void testNotifyFilterChangedGivenOnlyRemovedItems() throws IOException { when(clusterService.state()).thenReturn(clusterState); when(clusterService.state()).thenReturn(clusterState); - MockClientBuilder mockClientBuilder = new MockClientBuilder("cluster-test"); + MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); mockClientBuilder.prepareSearch(AnomalyDetectorsIndex.configIndexName(), docsAsBytes); JobManager jobManager = createJobManager(mockClientBuilder.build()); @@ -544,7 +614,7 @@ public void testNotifyFilterChangedGivenOnlyRemovedItems() throws IOException { Mockito.verifyNoMoreInteractions(auditor, updateJobProcessNotifier); } - public void testUpdateProcessOnCalendarChanged() throws IOException { + public void testUpdateProcessOnCalendarChanged() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job-1", "node_id", JobState.OPENED, tasksBuilder); addJobTask("job-2", "node_id", JobState.OPENED, tasksBuilder); @@ -556,7 +626,7 @@ 
public void testUpdateProcessOnCalendarChanged() throws IOException { .build(); when(clusterService.state()).thenReturn(clusterState); - MockClientBuilder mockClientBuilder = new MockClientBuilder("cluster-test"); + MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); // For the JobConfigProvider expand groups search. // The search will not return any results mockClientBuilder.prepareSearchFields(AnomalyDetectorsIndex.configIndexName(), Collections.emptyList()); @@ -592,7 +662,7 @@ public void testUpdateProcessOnCalendarChanged_GivenGroups() throws IOException .build(); when(clusterService.state()).thenReturn(clusterState); - MockClientBuilder mockClientBuilder = new MockClientBuilder("cluster-test"); + MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); // For the JobConfigProvider expand groups search. // group-1 will expand to job-1 and job-2 List> fieldHits = new ArrayList<>(); From 95f9f4f8dc2b9631532b346dff3516d2fd97114c Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 5 Nov 2018 18:30:39 +0000 Subject: [PATCH 04/10] Refactor calls to expand datafeed and job Ids to use single function. 
All calls should use the calls that also check the cluster state --- .../ml/action/TransportCloseJobAction.java | 16 ++--- .../action/TransportGetDatafeedsAction.java | 47 ++---------- .../action/TransportStopDatafeedAction.java | 4 +- .../xpack/ml/job/JobManager.java | 25 ++++++- .../action/TransportCloseJobActionTests.java | 18 +++-- .../xpack/ml/job/JobManagerTests.java | 71 +++++++++++++++++++ 6 files changed, 115 insertions(+), 66 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java index 7c809543d1198..4850ad71dc2ef 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java @@ -12,7 +12,6 @@ import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.tasks.TransportTasksAction; -import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -37,7 +36,7 @@ import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; -import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; +import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.notifications.Auditor; import java.io.IOException; @@ -52,28 +51,25 @@ public class TransportCloseJobAction extends TransportTasksAction { - private final Client client; private final ClusterService clusterService; private final Auditor auditor; private final PersistentTasksService persistentTasksService; - private final 
JobConfigProvider jobConfigProvider; private final DatafeedConfigProvider datafeedConfigProvider; + private final JobManager jobManager; @Inject public TransportCloseJobAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - ClusterService clusterService, Client client, Auditor auditor, - PersistentTasksService persistentTasksService, JobConfigProvider jobConfigProvider, - DatafeedConfigProvider datafeedConfigProvider) { + ClusterService clusterService, Auditor auditor, PersistentTasksService persistentTasksService, + DatafeedConfigProvider datafeedConfigProvider, JobManager jobManager) { // We fork in innerTaskOperation(...), so we can use ThreadPool.Names.SAME here: super(settings, CloseJobAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, CloseJobAction.Request::new, CloseJobAction.Response::new, ThreadPool.Names.SAME); - this.client = client; this.clusterService = clusterService; this.auditor = auditor; this.persistentTasksService = persistentTasksService; - this.jobConfigProvider = jobConfigProvider; this.datafeedConfigProvider = datafeedConfigProvider; + this.jobManager = jobManager; } @Override @@ -107,7 +103,7 @@ protected void doExecute(Task task, CloseJobAction.Request request, ActionListen */ PersistentTasksCustomMetaData tasksMetaData = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - jobConfigProvider.expandJobsIds(request.getJobId(), request.allowNoJobs(), true, ActionListener.wrap( + jobManager.expandJobIds(request.getJobId(), request.allowNoJobs(), ActionListener.wrap( expandedJobIds -> { validate(expandedJobIds, request.isForce(), tasksMetaData, ActionListener.wrap( response -> { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsAction.java 
b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsAction.java index fa66dc5711f53..b5743701fb3d9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsAction.java @@ -20,19 +20,11 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsAction; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigReader; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - public class TransportGetDatafeedsAction extends TransportMasterNodeReadAction { @@ -65,28 +57,10 @@ protected void masterOperation(GetDatafeedsAction.Request request, ClusterState ActionListener listener) { logger.debug("Get datafeed '{}'", request.getDatafeedId()); - Map clusterStateConfigs = - expandClusterStateDatafeeds(request.getDatafeedId(), request.allowNoDatafeeds(), state); - - datafeedConfigProvider.expandDatafeedConfigs(request.getDatafeedId(), request.allowNoDatafeeds(), ActionListener.wrap( - datafeedBuilders -> { - // Check for duplicate datafeeds - for (DatafeedConfig.Builder datafeed : datafeedBuilders) { - if (clusterStateConfigs.containsKey(datafeed.getId())) { - listener.onFailure(new IllegalStateException("Datafeed [" + datafeed.getId() + "] configuration " + - "exists in both clusterstate and index")); - return; - } - } + DatafeedConfigReader datafeedConfigReader = new DatafeedConfigReader(datafeedConfigProvider); 
- // Merge cluster state and index configs - List datafeeds = new ArrayList<>(datafeedBuilders.size() + clusterStateConfigs.values().size()); - for (DatafeedConfig.Builder builder: datafeedBuilders) { - datafeeds.add(builder.build()); - } - - datafeeds.addAll(clusterStateConfigs.values()); - Collections.sort(datafeeds, Comparator.comparing(DatafeedConfig::getId)); + datafeedConfigReader.expandDatafeedConfigs(request.getDatafeedId(), request.allowNoDatafeeds(), state, ActionListener.wrap( + datafeeds -> { listener.onResponse(new GetDatafeedsAction.Response(new QueryPage<>(datafeeds, datafeeds.size(), DatafeedConfig.RESULTS_FIELD))); }, @@ -94,19 +68,6 @@ protected void masterOperation(GetDatafeedsAction.Request request, ClusterState )); } - Map expandClusterStateDatafeeds(String datafeedExpression, boolean allowNoDatafeeds, - ClusterState clusterState) { - - Map configById = new HashMap<>(); - MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); - Set expandedDatafeedIds = mlMetadata.expandDatafeedIds(datafeedExpression); - - for (String expandedDatafeedId : expandedDatafeedIds) { - configById.put(expandedDatafeedId, mlMetadata.getDatafeed(expandedDatafeedId)); - } - return configById; - } - @Override protected ClusterBlockException checkBlock(GetDatafeedsAction.Request request, ClusterState state) { return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java index 4c5d6940bf45a..77910f21f67d1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java @@ -33,6 +33,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import 
org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigReader; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import java.io.IOException; @@ -115,7 +116,8 @@ protected void doExecute(Task task, StopDatafeedAction.Request request, ActionLi new ActionListenerResponseHandler<>(listener, StopDatafeedAction.Response::new)); } } else { - datafeedConfigProvider.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds(), ActionListener.wrap( + DatafeedConfigReader datafeedConfigReader = new DatafeedConfigReader(datafeedConfigProvider); + datafeedConfigReader.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds(), state, ActionListener.wrap( expandedIds -> { PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 78a788b138a19..33b1dbcdd1e17 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -106,6 +106,13 @@ public class JobManager extends AbstractComponent { public JobManager(Environment environment, Settings settings, JobResultsProvider jobResultsProvider, ClusterService clusterService, Auditor auditor, ThreadPool threadPool, Client client, UpdateJobProcessNotifier updateJobProcessNotifier) { + this(environment, settings, jobResultsProvider, clusterService, auditor, threadPool, client, + updateJobProcessNotifier, new JobConfigProvider(client)); + } + + JobManager(Environment environment, Settings settings, JobResultsProvider jobResultsProvider, + ClusterService clusterService, Auditor auditor, ThreadPool threadPool, + Client client, UpdateJobProcessNotifier 
updateJobProcessNotifier, JobConfigProvider jobConfigProvider) { this.settings = settings; this.environment = environment; this.jobResultsProvider = Objects.requireNonNull(jobResultsProvider); @@ -114,7 +121,7 @@ public JobManager(Environment environment, Settings settings, JobResultsProvider this.client = Objects.requireNonNull(client); this.threadPool = Objects.requireNonNull(threadPool); this.updateJobProcessNotifier = updateJobProcessNotifier; - this.jobConfigProvider = new JobConfigProvider(client); + this.jobConfigProvider = jobConfigProvider; maxModelMemoryLimit = MachineLearningField.MAX_MODEL_MEMORY_LIMIT.get(settings); clusterService.getClusterSettings() @@ -126,7 +133,21 @@ private void setMaxModelMemoryLimit(ByteSizeValue maxModelMemoryLimit) { } public void jobExists(String jobId, ActionListener listener) { - jobConfigProvider.jobExists(jobId, true, listener); + jobConfigProvider.jobExists(jobId, true, ActionListener.wrap( + jobFound -> { + if (jobFound) { + listener.onResponse(Boolean.TRUE); + } else { + // Look in the clusterstate for the job config + if (MlMetadata.getMlMetadata(clusterService.state()).getJobs().containsKey(jobId)) { + listener.onResponse(Boolean.TRUE); + } else { + listener.onFailure(ExceptionsHelper.missingJobException(jobId)); + } + } + }, + listener::onFailure + )); } /** diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java index c250770710ab8..572a10e44d20c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java @@ -8,7 +8,6 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; -import 
org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -31,7 +30,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; -import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; +import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.junit.Before; @@ -61,13 +60,13 @@ public class TransportCloseJobActionTests extends ESTestCase { private ClusterService clusterService; - private JobConfigProvider jobConfigProvider; + private JobManager jobManager; private DatafeedConfigProvider datafeedConfigProvider; @Before private void setupMocks() { clusterService = mock(ClusterService.class); - jobConfigProvider = mock(JobConfigProvider.class); + jobManager = mock(JobManager.class); datafeedConfigProvider = mock(DatafeedConfigProvider.class); } @@ -222,7 +221,7 @@ public void testDoExecute_whenNothingToClose() { when(clusterService.state()).thenReturn(clusterState); SortedSet expandedIds = new TreeSet<>(); expandedIds.add("foo"); - mockJobConfigProviderExpandIds(expandedIds); + mockJobManagerExpandIds(expandedIds); mockDatafeedConfigFindDatafeeds(Collections.emptySortedSet()); AtomicBoolean gotResponse = new AtomicBoolean(false); @@ -278,8 +277,7 @@ public static void addTask(String datafeedId, long startTime, String nodeId, Dat private TransportCloseJobAction createAction() { return new TransportCloseJobAction(Settings.EMPTY, mock(TransportService.class), mock(ThreadPool.class), mock(ActionFilters.class), mock(IndexNameExpressionResolver.class), - clusterService, mock(Client.class), mock(Auditor.class), mock(PersistentTasksService.class), - 
jobConfigProvider, datafeedConfigProvider); + clusterService, mock(Auditor.class), mock(PersistentTasksService.class), datafeedConfigProvider, jobManager); } private void mockDatafeedConfigFindDatafeeds(Set datafeedIds) { @@ -291,13 +289,13 @@ private void mockDatafeedConfigFindDatafeeds(Set datafeedIds) { }).when(datafeedConfigProvider).findDatafeedsForJobIds(any(), any(ActionListener.class)); } - private void mockJobConfigProviderExpandIds(Set expandedIds) { + private void mockJobManagerExpandIds(Set expandedIds) { doAnswer(invocation -> { - ActionListener> listener = (ActionListener>) invocation.getArguments()[3]; + ActionListener> listener = (ActionListener>) invocation.getArguments()[2]; listener.onResponse(expandedIds); return null; - }).when(jobConfigProvider).expandJobsIds(any(), anyBoolean(), anyBoolean(), any(ActionListener.class)); + }).when(jobManager).expandJobIds(any(), anyBoolean(), any(ActionListener.class)); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index 2494970ab782c..7f7c06af6d040 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -43,6 +43,7 @@ import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzerTests; +import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.persistence.MockClientBuilder; import org.elasticsearch.xpack.ml.job.process.autodetect.UpdateParams; @@ -63,6 +64,7 @@ import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.ExecutorService; +import 
java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -76,6 +78,8 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -433,6 +437,73 @@ public void onFailure(Exception e) { }); } + public void testJobExists_GivenMissingJob() { + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + when(clusterService.state()).thenReturn(clusterState); + + JobConfigProvider jobConfigProvider = mock(JobConfigProvider.class); + + ClusterSettings clusterSettings = new ClusterSettings(environment.settings(), + Collections.singleton(MachineLearningField.MAX_MODEL_MEMORY_LIMIT)); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + + doAnswer(invocationOnMock -> { + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse(false); + return null; + }).when(jobConfigProvider).jobExists(anyString(), anyBoolean(), any()); + + JobManager jobManager = new JobManager(environment, environment.settings(), jobResultsProvider, clusterService, + auditor, threadPool, mock(Client.class), updateJobProcessNotifier, jobConfigProvider); + + AtomicBoolean jobExistsHolder = new AtomicBoolean(); + AtomicReference exceptionHolder = new AtomicReference<>(); + jobManager.jobExists("non-job", ActionListener.wrap( + jobExistsHolder::set, + exceptionHolder::set + )); + + assertFalse(jobExistsHolder.get()); + assertThat(exceptionHolder.get(), instanceOf(ResourceNotFoundException.class)); + } + + public void testJobExists_GivenJobIsInClusterState() { + Job csJobFoo1 = buildJobBuilder("cs-job").build(); + MlMetadata.Builder mlMetadata = new 
MlMetadata.Builder(); + mlMetadata.putJob(csJobFoo1, false); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + when(clusterService.state()).thenReturn(clusterState); + + JobConfigProvider jobConfigProvider = mock(JobConfigProvider.class); + + ClusterSettings clusterSettings = new ClusterSettings(environment.settings(), + Collections.singleton(MachineLearningField.MAX_MODEL_MEMORY_LIMIT)); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + + doAnswer(invocationOnMock -> { + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse(false); + return null; + }).when(jobConfigProvider).jobExists(anyString(), anyBoolean(), any()); + + JobManager jobManager = new JobManager(environment, environment.settings(), jobResultsProvider, clusterService, + auditor, threadPool, mock(Client.class), updateJobProcessNotifier, jobConfigProvider); + + AtomicBoolean jobExistsHolder = new AtomicBoolean(); + AtomicReference exceptionHolder = new AtomicReference<>(); + jobManager.jobExists("cs-job", ActionListener.wrap( + jobExistsHolder::set, + exceptionHolder::set + )); + + assertTrue(jobExistsHolder.get()); + assertNull(exceptionHolder.get()); + } + public void testPutJob_ThrowsIfJobExistsInClusterState() throws IOException { MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); JobManager jobManager = createJobManager(mockClientBuilder.build()); From 683a95b4979b701af7421a3212b8b34e48184f1d Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 7 Nov 2018 12:22:07 +0000 Subject: [PATCH 05/10] Match job groups --- .../xpack/core/ml/MlMetadata.java | 5 +++ .../core/ml/job/groups/GroupOrJobLookup.java | 33 ++++++++++++++ .../ml/job/groups/GroupOrJobLookupTests.java | 13 ++++++ .../xpack/ml/job/JobManager.java | 39 ++++++++++++---- .../ml/job/persistence/JobConfigProvider.java | 
26 ++++++++--- .../ml/integration/JobConfigProviderIT.java | 11 +++-- .../xpack/ml/job/JobManagerTests.java | 44 +++++++++++++------ 7 files changed, 139 insertions(+), 32 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java index 2f891542365a6..e885b4cc58a1c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java @@ -90,6 +90,11 @@ public Set expandJobIds(String expression) { return groupOrJobLookup.expandJobIds(expression); } + // Matches only groups + public Set expandGroupIds(String expression) { + return groupOrJobLookup.expandGroupIds(expression); + } + public boolean isJobDeleting(String jobId) { Job job = jobs.get(jobId); return job == null || job.isDeleting(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/groups/GroupOrJobLookup.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/groups/GroupOrJobLookup.java index 58c5e755091a4..56654e45fe2d0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/groups/GroupOrJobLookup.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/groups/GroupOrJobLookup.java @@ -58,6 +58,10 @@ public Set expandJobIds(String expression) { return new GroupOrJobResolver().expand(expression); } + public Set expandGroupIds(String expression) { + return new GroupResolver().expand(expression); + } + public boolean isGroupOrJob(String id) { return groupOrJobLookup.containsKey(id); } @@ -86,4 +90,33 @@ protected List lookup(String key) { return groupOrJob == null ? 
Collections.emptyList() : groupOrJob.jobs().stream().map(Job::getId).collect(Collectors.toList()); } } + + private class GroupResolver extends NameResolver { + + private GroupResolver() { + } + + @Override + protected Set keys() { + return nameSet(); + } + + @Override + protected Set nameSet() { + return groupOrJobLookup.entrySet().stream() + .filter(entry -> entry.getValue().isGroup()) + .map(entry -> entry.getKey()) + .collect(Collectors.toSet()); + } + + @Override + protected List lookup(String key) { + GroupOrJob groupOrJob = groupOrJobLookup.get(key); + if (groupOrJob == null || groupOrJob.isGroup() == false) { + return Collections.emptyList(); + } else { + return Collections.singletonList(key); + } + } + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/groups/GroupOrJobLookupTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/groups/GroupOrJobLookupTests.java index eb1e83f6d6ae1..37472584c5aab 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/groups/GroupOrJobLookupTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/groups/GroupOrJobLookupTests.java @@ -92,6 +92,19 @@ public void testIsGroupOrJob() { assertFalse(groupOrJobLookup.isGroupOrJob("missing")); } + public void testExpandGroupIds() { + List jobs = new ArrayList<>(); + jobs.add(mockJob("foo-1", Arrays.asList("foo-group"))); + jobs.add(mockJob("foo-2", Arrays.asList("foo-group"))); + jobs.add(mockJob("bar-1", Arrays.asList("bar-group"))); + jobs.add(mockJob("nogroup", Collections.emptyList())); + + GroupOrJobLookup groupOrJobLookup = new GroupOrJobLookup(jobs); + assertThat(groupOrJobLookup.expandGroupIds("foo*"), contains("foo-group")); + assertThat(groupOrJobLookup.expandGroupIds("bar-group,nogroup"), contains("bar-group")); + assertThat(groupOrJobLookup.expandGroupIds("*"), contains("bar-group", "foo-group")); + } + private static Job mockJob(String jobId, List groups) 
{ Job job = mock(Job.class); when(job.getId()).thenReturn(jobId); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 33b1dbcdd1e17..1c985c2d9ac3e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.ml.job; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; @@ -16,7 +18,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; @@ -63,6 +64,7 @@ import java.util.Comparator; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -83,10 +85,11 @@ *
  • updating
  • * */ -public class JobManager extends AbstractComponent { +public class JobManager { private static final DeprecationLogger DEPRECATION_LOGGER = new DeprecationLogger(Loggers.getLogger(JobManager.class)); + private static final Logger logger = LogManager.getLogger(JobManager.class); private final Settings settings; private final Environment environment; @@ -201,6 +204,13 @@ public void expandJobs(String expression, boolean allowNoJobs, ActionListener groupIds = clusterStateJobs.values().stream() + .filter(job -> job.getGroups() != null).flatMap(j -> j.getGroups().stream()).collect(Collectors.toSet()); + requiredMatches.filterMatchedIds(groupIds); + jobConfigProvider.expandJobsWithoutMissingcheck(expression, false, ActionListener.wrap( jobBuilders -> { // Check for duplicate jobs @@ -212,13 +222,18 @@ public void expandJobs(String expression, boolean allowNoJobs, ActionListener jobAndGroupIds = new HashSet<>(); + // Merge cluster state and index jobs List jobs = new ArrayList<>(); for (Job.Builder jb : jobBuilders) { - jobs.add(jb.build()); + Job job = jb.build(); + jobAndGroupIds.add(job.getId()); + jobAndGroupIds.addAll(job.getGroups()); + jobs.add(job); } - requiredMatches.filterMatchedIds(jobs.stream().map(Job::getId).collect(Collectors.toList())); + requiredMatches.filterMatchedIds(jobAndGroupIds); if (requiredMatches.hasUnmatchedIds()) { jobsListener.onFailure(ExceptionsHelper.missingJobException(requiredMatches.unmatchedIdsString())); @@ -253,11 +268,15 @@ public void expandJobIds(String expression, boolean allowNoJobs, ActionListener< Set clusterStateJobIds = MlMetadata.getMlMetadata(clusterService.state()).expandJobIds(expression); ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(expression, allowNoJobs); requiredMatches.filterMatchedIds(clusterStateJobIds); + // If expression contains a group Id it has been expanded to its + // constituent job Ids but Ids matcher needs to know the group + // has been matched + 
requiredMatches.filterMatchedIds(MlMetadata.getMlMetadata(clusterService.state()).expandGroupIds(expression)); jobConfigProvider.expandJobsIdsWithoutMissingCheck(expression, false, ActionListener.wrap( - jobIds -> { + jobIdsAndGroups -> { // Check for duplicate job Ids - jobIds.forEach(id -> { + jobIdsAndGroups.getJobs().forEach(id -> { if (clusterStateJobIds.contains(id)) { jobsListener.onFailure(new IllegalStateException("Job [" + id + "] configuration " + "exists in both clusterstate and index")); @@ -265,12 +284,14 @@ public void expandJobIds(String expression, boolean allowNoJobs, ActionListener< } }); - requiredMatches.filterMatchedIds(jobIds); + requiredMatches.filterMatchedIds(jobIdsAndGroups.getJobs()); + requiredMatches.filterMatchedIds(jobIdsAndGroups.getGroups()); if (requiredMatches.hasUnmatchedIds()) { jobsListener.onFailure(ExceptionsHelper.missingJobException(requiredMatches.unmatchedIdsString())); } else { - jobIds.addAll(clusterStateJobIds); - jobsListener.onResponse(new TreeSet<>(jobIds)); + SortedSet allJobIds = new TreeSet<>(clusterStateJobIds); + allJobIds.addAll(jobIdsAndGroups.getJobs()); + jobsListener.onResponse(allJobIds); } }, jobsListener::onFailure diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java index e48d322478220..12cb5b5b00db3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java @@ -571,6 +571,24 @@ public void expandJobsIds(String expression, boolean allowNoJobs, boolean exclud } + public static class JobIdsAndGroups { + private SortedSet jobs; + private SortedSet groups; + + public JobIdsAndGroups(SortedSet jobs, SortedSet groups) { + this.jobs = jobs; + this.groups = groups; + } + + public SortedSet getJobs() { + 
return jobs; + } + + public SortedSet getGroups() { + return groups; + } + } + /** * Similar to {@link #expandJobsIds(String, boolean, boolean, ActionListener)} but no error * is generated if there are missing Ids. Whatever Ids match will be returned. @@ -582,7 +600,7 @@ public void expandJobsIds(String expression, boolean allowNoJobs, boolean exclud * @param excludeDeleting If true exclude jobs marked as deleting * @param listener The expanded job Ids listener */ - public void expandJobsIdsWithoutMissingCheck(String expression, boolean excludeDeleting, ActionListener> listener) { + public void expandJobsIdsWithoutMissingCheck(String expression, boolean excludeDeleting, ActionListener listener) { SearchRequest searchRequest = makeExpandIdsSearchRequest(expression, excludeDeleting); executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, @@ -599,8 +617,7 @@ public void expandJobsIdsWithoutMissingCheck(String expression, boolean excludeD } } - groupsIds.addAll(jobIds); - listener.onResponse(jobIds); + listener.onResponse(new JobIdsAndGroups(jobIds, groupsIds)); }, listener::onFailure) , client::search); @@ -704,7 +721,6 @@ public void expandJobsWithoutMissingcheck(String expression, boolean excludeDele ActionListener.wrap( response -> { List jobs = new ArrayList<>(); - Set jobAndGroupIds = new HashSet<>(); SearchHit[] hits = response.getHits().getHits(); for (SearchHit hit : hits) { @@ -712,8 +728,6 @@ public void expandJobsWithoutMissingcheck(String expression, boolean excludeDele BytesReference source = hit.getSourceRef(); Job.Builder job = parseJobLenientlyFromSource(source); jobs.add(job); - jobAndGroupIds.add(job.getId()); - jobAndGroupIds.addAll(job.getGroups()); } catch (IOException e) { // TODO A better way to handle this rather than just ignoring the error? 
logger.error("Error parsing anomaly detector job configuration [" + hit.getId() + "]", e); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java index a27c4a06b24c1..45a93cc9c9dd0 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java @@ -445,16 +445,19 @@ public void testExpandJobsIdsWithoutMissingCheck() throws Exception { client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.configIndexName()).get(); - SortedSet expandedIds = blockingCall(actionListener -> + JobConfigProvider.JobIdsAndGroups expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIdsWithoutMissingCheck("dick,john", false, actionListener)); - assertEquals(new TreeSet<>(Collections.singletonList("dick")), expandedIds); + assertEquals(new TreeSet<>(Collections.singletonList("dick")), expandedIds.getJobs()); + assertThat(expandedIds.getGroups(), empty()); expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIdsWithoutMissingCheck("foo*", true, actionListener)); - assertThat(expandedIds, empty()); + assertThat(expandedIds.getJobs(), empty()); + assertThat(expandedIds.getGroups(), empty()); expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIdsWithoutMissingCheck("harry-group,dave", false, actionListener)); - assertEquals(new TreeSet<>(Arrays.asList("harry", "harry-jnr")), expandedIds); + assertEquals(new TreeSet<>(Arrays.asList("harry", "harry-jnr")), expandedIds.getJobs()); + assertEquals(new TreeSet<>(Arrays.asList("harry-group")), expandedIds.getGroups()); } public void testExpandJobsWithoutMissingCheck() throws Exception { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java 
b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index 7f7c06af6d040..56fd06a5942f7 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -259,13 +259,14 @@ public void testExpandJobs_SplitBetweenClusterStateAndIndex() throws IOException } public void testExpandJobs_GivenJobInClusterStateNotIndex() { - Job csJobFoo1 = buildJobBuilder("foo-cs-1").build(); - Job csJobFoo2 = buildJobBuilder("foo-cs-2").build(); + Job.Builder csJobFoo1 = buildJobBuilder("foo-cs-1"); + csJobFoo1.setGroups(Collections.singletonList("foo-group")); + Job.Builder csJobFoo2 = buildJobBuilder("foo-cs-2"); + csJobFoo2.setGroups(Collections.singletonList("foo-group")); MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); - mlMetadata.putJob(csJobFoo1, false); - mlMetadata.putJob(csJobFoo2, false); - + mlMetadata.putJob(csJobFoo1.build(), false); + mlMetadata.putJob(csJobFoo2.build(), false); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) .metaData(MetaData.builder() @@ -273,9 +274,7 @@ public void testExpandJobs_GivenJobInClusterStateNotIndex() { .build(); when(clusterService.state()).thenReturn(clusterState); - List docsAsBytes = new ArrayList<>(); - MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); mockClientBuilder.prepareSearch(AnomalyDetectorsIndex.configIndexName(), docsAsBytes); JobManager jobManager = createJobManager(mockClientBuilder.build()); @@ -291,16 +290,28 @@ public void testExpandJobs_GivenJobInClusterStateNotIndex() { assertThat(jobsHolder.get().results(), hasSize(2)); List jobIds = jobsHolder.get().results().stream().map(Job::getId).collect(Collectors.toList()); assertThat(jobIds, contains("foo-cs-1", "foo-cs-2")); + + jobManager.expandJobs("foo-group", true, ActionListener.wrap( + jobs -> jobsHolder.set(jobs), + e -> fail(e.getMessage()) + )); + 
+ assertNotNull(jobsHolder.get()); + assertThat(jobsHolder.get().results(), hasSize(2)); + jobIds = jobsHolder.get().results().stream().map(Job::getId).collect(Collectors.toList()); + assertThat(jobIds, contains("foo-cs-1", "foo-cs-2")); } public void testExpandJobIdsFromClusterStateAndIndex_GivenAll() { - Job csJobFoo1 = buildJobBuilder("foo-cs-1").build(); - Job csJobFoo2 = buildJobBuilder("foo-cs-2").build(); + Job.Builder csJobFoo1 = buildJobBuilder("foo-cs-1"); + csJobFoo1.setGroups(Collections.singletonList("foo-group")); + Job.Builder csJobFoo2 = buildJobBuilder("foo-cs-2"); + csJobFoo2.setGroups(Collections.singletonList("foo-group")); Job csJobBar = buildJobBuilder("bar-cs").build(); MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); - mlMetadata.putJob(csJobFoo1, false); - mlMetadata.putJob(csJobFoo2, false); + mlMetadata.putJob(csJobFoo1.build(), false); + mlMetadata.putJob(csJobFoo2.build(), false); mlMetadata.putJob(csJobBar, false); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) @@ -313,7 +324,7 @@ public void testExpandJobIdsFromClusterStateAndIndex_GivenAll() { fieldMap.put(Job.ID.getPreferredName(), new DocumentField(Job.ID.getPreferredName(), Collections.singletonList("index-job"))); fieldMap.put(Job.GROUPS.getPreferredName(), - new DocumentField(Job.ID.getPreferredName(), Collections.emptyList())); + new DocumentField(Job.ID.getPreferredName(), Collections.singletonList("index-group"))); List> fieldHits = new ArrayList<>(); fieldHits.add(fieldMap); @@ -329,8 +340,15 @@ public void testExpandJobIdsFromClusterStateAndIndex_GivenAll() { )); assertNotNull(jobIdsHolder.get()); - assertThat(jobIdsHolder.get(), hasSize(4)); assertThat(jobIdsHolder.get(), contains("bar-cs", "foo-cs-1", "foo-cs-2", "index-job")); + + jobManager.expandJobIds("index-group", true, ActionListener.wrap( + jobs -> jobIdsHolder.set(jobs), + e -> fail(e.getMessage()) + )); + + assertNotNull(jobIdsHolder.get()); + assertThat(jobIdsHolder.get(), 
contains("index-job")); } public void testExpandJobIds_GivenJobInClusterStateNotIndex() { From a8814e7ad2b3904e330d454ec78958c36c920127 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 7 Nov 2018 16:29:11 +0000 Subject: [PATCH 06/10] Fix compilation after rebase --- .../xpack/ml/action/TransportGetDatafeedsStatsAction.java | 2 +- .../xpack/ml/datafeed/DatafeedConfigReader.java | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java index 4123767da951e..a117e8d591353 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java @@ -44,7 +44,7 @@ public TransportGetDatafeedsStatsAction(Settings settings, TransportService tran Client client,NamedXContentRegistry xContentRegistry) { super(settings, GetDatafeedsStatsAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, GetDatafeedsStatsAction.Request::new); - this.datafeedConfigReader = new DatafeedConfigReader(client, settings, xContentRegistry); + this.datafeedConfigReader = new DatafeedConfigReader(client, xContentRegistry); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java index c4efd4dec1c7a..ba05e050db4be 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java @@ -8,7 +8,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import 
org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; @@ -32,8 +31,8 @@ public class DatafeedConfigReader { private final DatafeedConfigProvider datafeedConfigProvider; - public DatafeedConfigReader(Client client, Settings settings, NamedXContentRegistry xContentRegistry) { - this.datafeedConfigProvider = new DatafeedConfigProvider(client, settings, xContentRegistry); + public DatafeedConfigReader(Client client, NamedXContentRegistry xContentRegistry) { + this.datafeedConfigProvider = new DatafeedConfigProvider(client, xContentRegistry); } public DatafeedConfigReader(DatafeedConfigProvider datafeedConfigProvider) { From f6e96fab94f6e0e6653e6926dede39117511ad67 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Thu, 8 Nov 2018 11:19:14 +0000 Subject: [PATCH 07/10] fix javadoc --- .../elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java index ba05e050db4be..9de12fa0ef419 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java @@ -40,7 +40,7 @@ public DatafeedConfigReader(DatafeedConfigProvider datafeedConfigProvider) { } /** - * Merges the results of {@Link MlMetadata#expandDatafeedIds} + * Merges the results of {@link MlMetadata#expandDatafeedIds} * and {@link DatafeedConfigProvider#expandDatafeedIds(String, boolean, ActionListener)} */ public void expandDatafeedIds(String expression, boolean allowNoDatafeeds, ClusterState clusterState, @@ -75,7 +75,7 
@@ public void expandDatafeedIds(String expression, boolean allowNoDatafeeds, Clust } /** - * Merges the results of {@Link MlMetadata#expandDatafeedIds} + * Merges the results of {@link MlMetadata#expandDatafeedIds} * and {@link DatafeedConfigProvider#expandDatafeedConfigs(String, boolean, ActionListener)} */ public void expandDatafeedConfigs(String expression, boolean allowNoDatafeeds, ClusterState clusterState, From aad7e88848cae14de2913a20a2a4f11e753f37e8 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Thu, 8 Nov 2018 14:53:04 +0000 Subject: [PATCH 08/10] Address review comments --- .../TransportGetDatafeedsStatsAction.java | 2 +- .../ml/datafeed/DatafeedConfigReader.java | 10 +-- .../xpack/ml/job/JobManager.java | 4 +- .../xpack/ml/job/JobManagerTests.java | 69 +++++++++++++++++++ 4 files changed, 74 insertions(+), 11 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java index a117e8d591353..55006afa3241e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java @@ -41,7 +41,7 @@ public TransportGetDatafeedsStatsAction(Settings settings, TransportService tran ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - Client client,NamedXContentRegistry xContentRegistry) { + Client client, NamedXContentRegistry xContentRegistry) { super(settings, GetDatafeedsStatsAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, GetDatafeedsStatsAction.Request::new); this.datafeedConfigReader = new DatafeedConfigReader(client, xContentRegistry); diff --git 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java index 9de12fa0ef419..e93f449f570d2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java @@ -16,11 +16,11 @@ import org.elasticsearch.xpack.ml.job.persistence.ExpandedIdsMatcher; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedSet; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -116,14 +116,8 @@ public void expandDatafeedConfigs(String expression, boolean allowNoDatafeeds, C } private Map expandClusterStateDatafeeds(String datafeedExpression, ClusterState clusterState) { - - Map configById = new HashMap<>(); MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); Set expandedDatafeedIds = mlMetadata.expandDatafeedIds(datafeedExpression); - - for (String expandedDatafeedId : expandedDatafeedIds) { - configById.put(expandedDatafeedId, mlMetadata.getDatafeed(expandedDatafeedId)); - } - return configById; + return expandedDatafeedIds.stream().collect(Collectors.toMap(Function.identity(), mlMetadata::getDatafeed)); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 1c985c2d9ac3e..47880f840effe 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -276,13 +276,13 @@ public void expandJobIds(String expression, boolean allowNoJobs, ActionListener< jobConfigProvider.expandJobsIdsWithoutMissingCheck(expression, false, ActionListener.wrap( jobIdsAndGroups 
-> { // Check for duplicate job Ids - jobIdsAndGroups.getJobs().forEach(id -> { + for (String id : jobIdsAndGroups.getJobs()) { if (clusterStateJobIds.contains(id)) { jobsListener.onFailure(new IllegalStateException("Job [" + id + "] configuration " + "exists in both clusterstate and index")); return; } - }); + } requiredMatches.filterMatchedIds(jobIdsAndGroups.getJobs()); requiredMatches.filterMatchedIds(jobIdsAndGroups.getGroups()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index 56fd06a5942f7..41907d52dd048 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -224,6 +224,38 @@ public void testExpandJobsFromClusterStateAndIndex_GivenAll() throws IOException assertThat(jobIds, contains("foo-cs-1", "foo-cs-2", "foo-index")); } + public void testExpandJob_GivenDuplicateConfig() throws IOException { + Job csJob = buildJobBuilder("dupe").build(); + + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(csJob, false); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + when(clusterService.state()).thenReturn(clusterState); + + List docsAsBytes = new ArrayList<>(); + Job.Builder indexJob = buildJobBuilder("dupe"); + docsAsBytes.add(toBytesReference(indexJob.build())); + + MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); + mockClientBuilder.prepareSearch(AnomalyDetectorsIndex.configIndexName(), docsAsBytes); + + JobManager jobManager = createJobManager(mockClientBuilder.build()); + AtomicReference> jobsHolder = new AtomicReference<>(); + AtomicReference exceptionHolder = new AtomicReference<>(); + jobManager.expandJobs("_all", 
true, ActionListener.wrap( + jobsHolder::set, + exceptionHolder::set + )); + + assertNull(jobsHolder.get()); + assertThat(exceptionHolder.get(), instanceOf(IllegalStateException.class)); + assertEquals("Job [dupe] configuration exists in both clusterstate and index", exceptionHolder.get().getMessage()); + } + public void testExpandJobs_SplitBetweenClusterStateAndIndex() throws IOException { Job csJob = buildJobBuilder("cs-job").build(); @@ -302,6 +334,43 @@ public void testExpandJobs_GivenJobInClusterStateNotIndex() { assertThat(jobIds, contains("foo-cs-1", "foo-cs-2")); } + public void testExpandJobIds_GivenDuplicateConfig() { + Job csJob = buildJobBuilder("dupe").build(); + + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(csJob, false); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + when(clusterService.state()).thenReturn(clusterState); + + Map fieldMap = new HashMap<>(); + fieldMap.put(Job.ID.getPreferredName(), + new DocumentField(Job.ID.getPreferredName(), Collections.singletonList("dupe"))); + fieldMap.put(Job.GROUPS.getPreferredName(), + new DocumentField(Job.ID.getPreferredName(), Collections.emptyList())); + + List> fieldHits = new ArrayList<>(); + fieldHits.add(fieldMap); + + MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); + mockClientBuilder.prepareSearchFields(AnomalyDetectorsIndex.configIndexName(), fieldHits); + + JobManager jobManager = createJobManager(mockClientBuilder.build()); + AtomicReference> jobIdsHolder = new AtomicReference<>(); + AtomicReference exceptionHolder = new AtomicReference<>(); + jobManager.expandJobIds("_all", true, ActionListener.wrap( + jobIdsHolder::set, + exceptionHolder::set + )); + + assertNull(jobIdsHolder.get()); + assertThat(exceptionHolder.get(), instanceOf(IllegalStateException.class)); + assertEquals("Job [dupe] configuration 
exists in both clusterstate and index", exceptionHolder.get().getMessage()); + } + public void testExpandJobIdsFromClusterStateAndIndex_GivenAll() { Job.Builder csJobFoo1 = buildJobBuilder("foo-cs-1"); csJobFoo1.setGroups(Collections.singletonList("foo-group")); From a9870f1ee4767001868931399b44bf1cc1ed20e4 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Fri, 9 Nov 2018 16:39:00 +0000 Subject: [PATCH 09/10] Ensure order of datafeed configs --- .../ml/datafeed/DatafeedConfigReader.java | 3 + .../datafeed/DatafeedConfigReaderTests.java | 66 +++++++++++++++---- 2 files changed, 58 insertions(+), 11 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java index e93f449f570d2..52b07ee3b929f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java @@ -16,6 +16,8 @@ import org.elasticsearch.xpack.ml.job.persistence.ExpandedIdsMatcher; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Set; @@ -108,6 +110,7 @@ public void expandDatafeedConfigs(String expression, boolean allowNoDatafeeds, C listener.onFailure(ExceptionsHelper.missingDatafeedException(requiredMatches.unmatchedIdsString())); } else { datafeedConfigs.addAll(clusterStateConfigs.values()); + Collections.sort(datafeedConfigs, Comparator.comparing(DatafeedConfig::getId)); listener.onResponse(datafeedConfigs); } }, diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReaderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReaderTests.java index cac6bc12579ca..636cb2d811347 100644 --- 
a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReaderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReaderTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.SortedSet; @@ -29,8 +30,9 @@ public class DatafeedConfigReaderTests extends ESTestCase { - private final String JOB_ID = "foo"; + private final String JOB_ID_FOO = "foo"; + @SuppressWarnings("unchecked") private void mockProviderWithExpectedIds(DatafeedConfigProvider mockedProvider, String expression, SortedSet datafeedIds) { doAnswer(invocationOnMock -> { ActionListener> listener = (ActionListener>) invocationOnMock.getArguments()[1]; @@ -39,13 +41,24 @@ private void mockProviderWithExpectedIds(DatafeedConfigProvider mockedProvider, }).when(mockedProvider).expandDatafeedIdsWithoutMissingCheck(eq(expression), any()); } + @SuppressWarnings("unchecked") + private void mockProviderWithExpectedConfig(DatafeedConfigProvider mockedProvider, String expression, + List datafeedConfigs) { + doAnswer(invocationOnMock -> { + ActionListener> listener = + (ActionListener>) invocationOnMock.getArguments()[1]; + listener.onResponse(datafeedConfigs); + return null; + }).when(mockedProvider).expandDatafeedConfigsWithoutMissingCheck(eq(expression), any()); + } + public void testExpandDatafeedIds_SplitBetweenClusterStateAndIndex() { SortedSet idsInIndex = new TreeSet<>(); idsInIndex.add("index-df"); DatafeedConfigProvider provider = mock(DatafeedConfigProvider.class); mockProviderWithExpectedIds(provider, "cs-df,index-df", idsInIndex); - ClusterState clusterState = buildClusterStateWithJob(Collections.singletonList(createDatafeedConfig("cs-df", JOB_ID))); + ClusterState clusterState = 
buildClusterStateWithJob(createDatafeedConfig("cs-df", JOB_ID_FOO)); DatafeedConfigReader reader = new DatafeedConfigReader(provider); @@ -72,7 +85,6 @@ public void testExpandDatafeedIds_SplitBetweenClusterStateAndIndex() { e -> assertNull(e) )); assertThat(idsHolder.get(), contains("index-df")); - } public void testExpandDatafeedIds_GivenAll() { @@ -82,7 +94,7 @@ public void testExpandDatafeedIds_GivenAll() { DatafeedConfigProvider provider = mock(DatafeedConfigProvider.class); mockProviderWithExpectedIds(provider, "_all", idsInIndex); - ClusterState clusterState = buildClusterStateWithJob(Collections.singletonList(createDatafeedConfig("df3", JOB_ID))); + ClusterState clusterState = buildClusterStateWithJob(createDatafeedConfig("df3", JOB_ID_FOO)); DatafeedConfigReader reader = new DatafeedConfigReader(provider); @@ -96,12 +108,40 @@ public void testExpandDatafeedIds_GivenAll() { assertThat(idsHolder.get(), contains("df1", "df2", "df3")); } - private ClusterState buildClusterStateWithJob(List datafeeds) { + public void testExpandDatafeedConfigs_SplitBetweenClusterStateAndIndex() { MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); - mlMetadata.putJob(buildJobBuilder(JOB_ID).build(), false); - for (DatafeedConfig df : datafeeds) { - mlMetadata.putDatafeed(df, Collections.emptyMap()); - } + mlMetadata.putJob(buildJobBuilder("job-a").build(), false); + mlMetadata.putDatafeed(createDatafeedConfig("cs-df", "job-a"), Collections.emptyMap()); + mlMetadata.putJob(buildJobBuilder("job-b").build(), false); + mlMetadata.putDatafeed(createDatafeedConfig("ll-df", "job-b"), Collections.emptyMap()); + + ClusterState clusterState = ClusterState.builder(new ClusterName("datafeedconfigreadertests")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + + + DatafeedConfig.Builder indexConfig = createDatafeedConfigBuilder("index-df", "job-c"); + DatafeedConfigProvider provider = mock(DatafeedConfigProvider.class); + 
mockProviderWithExpectedConfig(provider, "_all", Collections.singletonList(indexConfig)); + + DatafeedConfigReader reader = new DatafeedConfigReader(provider); + + AtomicReference> configHolder = new AtomicReference<>(); + reader.expandDatafeedConfigs("_all", true, clusterState, ActionListener.wrap( + configHolder::set, + e -> fail(e.getMessage()) + )); + + assertEquals("cs-df", configHolder.get().get(0).getId()); + assertEquals("index-df", configHolder.get().get(1).getId()); + assertEquals("ll-df", configHolder.get().get(2).getId()); + } + + private ClusterState buildClusterStateWithJob(DatafeedConfig datafeed) { + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(buildJobBuilder(JOB_ID_FOO).build(), false); + mlMetadata.putDatafeed(datafeed, Collections.emptyMap()); return ClusterState.builder(new ClusterName("datafeedconfigreadertests")) .metaData(MetaData.builder() @@ -109,9 +149,13 @@ private ClusterState buildClusterStateWithJob(List datafeeds) { .build(); } - private DatafeedConfig createDatafeedConfig(String id, String jobId) { + private DatafeedConfig.Builder createDatafeedConfigBuilder(String id, String jobId) { DatafeedConfig.Builder builder = new DatafeedConfig.Builder(id, jobId); builder.setIndices(Collections.singletonList("beats*")); - return builder.build(); + return builder; + } + + private DatafeedConfig createDatafeedConfig(String id, String jobId) { + return createDatafeedConfigBuilder(id, jobId).build(); } } From a326be10cf5ecaddeffcb223f39321f20d968fef Mon Sep 17 00:00:00 2001 From: David Kyle Date: Tue, 13 Nov 2018 12:53:59 +0000 Subject: [PATCH 10/10] Checkstyle fixes --- .../xpack/ml/datafeed/DatafeedConfigReaderTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReaderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReaderTests.java index 636cb2d811347..986b0d0cf46ae 100644 --- 
a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReaderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReaderTests.java @@ -14,7 +14,6 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.SortedSet;