diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java index 81762def4cc35..e885b4cc58a1c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java @@ -86,8 +86,13 @@ public boolean isGroupOrJob(String id) { return groupOrJobLookup.isGroupOrJob(id); } - public Set expandJobIds(String expression, boolean allowNoJobs) { - return groupOrJobLookup.expandJobIds(expression, allowNoJobs); + public Set expandJobIds(String expression) { + return groupOrJobLookup.expandJobIds(expression); + } + + // Matches only groups + public Set expandGroupIds(String expression) { + return groupOrJobLookup.expandGroupIds(expression); } public boolean isJobDeleting(String jobId) { @@ -107,9 +112,9 @@ public Optional getDatafeedByJobId(String jobId) { return datafeeds.values().stream().filter(s -> s.getJobId().equals(jobId)).findFirst(); } - public Set expandDatafeedIds(String expression, boolean allowNoDatafeeds) { - return NameResolver.newUnaliased(datafeeds.keySet(), ExceptionsHelper::missingDatafeedException) - .expand(expression, allowNoDatafeeds); + public Set expandDatafeedIds(String expression) { + return NameResolver.newUnaliased(datafeeds.keySet()) + .expand(expression); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/groups/GroupOrJobLookup.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/groups/GroupOrJobLookup.java index fde28a84f8d2e..56654e45fe2d0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/groups/GroupOrJobLookup.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/groups/GroupOrJobLookup.java @@ -8,7 +8,6 @@ import org.elasticsearch.ResourceAlreadyExistsException; import 
org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.messages.Messages; -import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.NameResolver; import java.util.ArrayList; @@ -55,8 +54,12 @@ private void put(Job job) { } } - public Set expandJobIds(String expression, boolean allowNoJobs) { - return new GroupOrJobResolver().expand(expression, allowNoJobs); + public Set expandJobIds(String expression) { + return new GroupOrJobResolver().expand(expression); + } + + public Set expandGroupIds(String expression) { + return new GroupResolver().expand(expression); } public boolean isGroupOrJob(String id) { @@ -66,7 +69,6 @@ public boolean isGroupOrJob(String id) { private class GroupOrJobResolver extends NameResolver { private GroupOrJobResolver() { - super(ExceptionsHelper::missingJobException); } @Override @@ -88,4 +90,33 @@ protected List lookup(String key) { return groupOrJob == null ? Collections.emptyList() : groupOrJob.jobs().stream().map(Job::getId).collect(Collectors.toList()); } } + + private class GroupResolver extends NameResolver { + + private GroupResolver() { + } + + @Override + protected Set keys() { + return nameSet(); + } + + @Override + protected Set nameSet() { + return groupOrJobLookup.entrySet().stream() + .filter(entry -> entry.getValue().isGroup()) + .map(entry -> entry.getKey()) + .collect(Collectors.toSet()); + } + + @Override + protected List lookup(String key) { + GroupOrJob groupOrJob = groupOrJobLookup.get(key); + if (groupOrJob == null || groupOrJob.isGroup() == false) { + return Collections.emptyList(); + } else { + return Collections.singletonList(key); + } + } + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/NameResolver.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/NameResolver.java index f737a3d9ad7d0..6ea80c8e8689c 100644 --- 
a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/NameResolver.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/NameResolver.java @@ -5,18 +5,15 @@ */ package org.elasticsearch.xpack.core.ml.utils; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Strings; import org.elasticsearch.common.regex.Regex; import java.util.Collections; import java.util.List; -import java.util.Objects; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; -import java.util.function.Function; import java.util.stream.Collectors; /** @@ -25,12 +22,6 @@ */ public abstract class NameResolver { - private final Function notFoundExceptionSupplier; - - protected NameResolver(Function notFoundExceptionSupplier) { - this.notFoundExceptionSupplier = Objects.requireNonNull(notFoundExceptionSupplier); - } - /** * Expands an expression into the set of matching names. * For example, given a set of names ["foo-1", "foo-2", "bar-1", bar-2"], @@ -46,12 +37,9 @@ protected NameResolver(Function notFoundExcep * * * @param expression the expression to resolve - * @param allowNoMatch if {@code false}, an error is thrown when no name matches the {@code expression}. 
- * This only applies to wild card expressions, if {@code expression} is not a - * wildcard then setting this true will not suppress the exception * @return the sorted set of matching names */ - public SortedSet expand(String expression, boolean allowNoMatch) { + public SortedSet expand(String expression) { SortedSet result = new TreeSet<>(); if (MetaData.ALL.equals(expression) || Regex.isMatchAllPattern(expression)) { result.addAll(nameSet()); @@ -64,24 +52,13 @@ public SortedSet expand(String expression, boolean allowNoMatch) { .map(this::lookup) .flatMap(List::stream) .collect(Collectors.toList()); - if (expanded.isEmpty() && allowNoMatch == false) { - throw notFoundExceptionSupplier.apply(token); - } result.addAll(expanded); } else { List matchingNames = lookup(token); - // allowNoMatch only applies to wildcard expressions, - // this isn't so don't check the allowNoMatch here - if (matchingNames.isEmpty()) { - throw notFoundExceptionSupplier.apply(token); - } result.addAll(matchingNames); } } } - if (result.isEmpty() && allowNoMatch == false) { - throw notFoundExceptionSupplier.apply(expression); - } return result; } @@ -105,11 +82,10 @@ public SortedSet expand(String expression, boolean allowNoMatch) { /** * Creates a {@code NameResolver} that has no aliases * @param nameSet the set of all names - * @param notFoundExceptionSupplier a supplier of {@link ResourceNotFoundException} to be used when an expression matches no name * @return the unaliased {@code NameResolver} */ - public static NameResolver newUnaliased(Set nameSet, Function notFoundExceptionSupplier) { - return new NameResolver(notFoundExceptionSupplier) { + public static NameResolver newUnaliased(Set nameSet) { + return new NameResolver() { @Override protected Set keys() { return nameSet; diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/groups/GroupOrJobLookupTests.java 
b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/groups/GroupOrJobLookupTests.java index 8543f02cec56c..37472584c5aab 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/groups/GroupOrJobLookupTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/groups/GroupOrJobLookupTests.java @@ -6,46 +6,34 @@ package org.elasticsearch.xpack.core.ml.job.groups; import org.elasticsearch.ResourceAlreadyExistsException; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.job.config.Job; -import org.elasticsearch.xpack.core.ml.job.groups.GroupOrJobLookup; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.contains; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class GroupOrJobLookupTests extends ESTestCase { - public void testEmptyLookup_GivenAllowNoJobs() { - GroupOrJobLookup lookup = new GroupOrJobLookup(Collections.emptyList()); - - assertThat(lookup.expandJobIds("_all", true).isEmpty(), is(true)); - assertThat(lookup.expandJobIds("*", true).isEmpty(), is(true)); - assertThat(lookup.expandJobIds("foo*", true).isEmpty(), is(true)); - expectThrows(ResourceNotFoundException.class, () -> lookup.expandJobIds("foo", true)); - } - - public void testEmptyLookup_GivenNotAllowNoJobs() { + public void testEmptyLookup() { GroupOrJobLookup lookup = new GroupOrJobLookup(Collections.emptyList()); - expectThrows(ResourceNotFoundException.class, () -> lookup.expandJobIds("_all", false)); - expectThrows(ResourceNotFoundException.class, () -> lookup.expandJobIds("*", false)); - expectThrows(ResourceNotFoundException.class, () -> lookup.expandJobIds("foo*", false)); - 
expectThrows(ResourceNotFoundException.class, () -> lookup.expandJobIds("foo", true)); + assertThat(lookup.expandJobIds("_all").isEmpty(), is(true)); + assertThat(lookup.expandJobIds("*").isEmpty(), is(true)); + assertThat(lookup.expandJobIds("foo*").isEmpty(), is(true)); + assertThat(lookup.expandJobIds("foo").isEmpty(), is(true)); } public void testAllIsNotExpandedInCommaSeparatedExpression() { GroupOrJobLookup lookup = new GroupOrJobLookup(Collections.emptyList()); - ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class, () -> lookup.expandJobIds("foo-*,_all", true)); - assertThat(e.getMessage(), equalTo("No known job with id '_all'")); + assertThat(lookup.expandJobIds("foo*,_all").isEmpty(), is(true)); } public void testConstructor_GivenJobWithSameIdAsPreviousGroupName() { @@ -75,19 +63,19 @@ public void testLookup() { jobs.add(mockJob("nogroup", Collections.emptyList())); GroupOrJobLookup groupOrJobLookup = new GroupOrJobLookup(jobs); - assertThat(groupOrJobLookup.expandJobIds("_all", false), contains("bar-1", "bar-2", "foo-1", "foo-2", "nogroup")); - assertThat(groupOrJobLookup.expandJobIds("*", false), contains("bar-1", "bar-2", "foo-1", "foo-2", "nogroup")); - assertThat(groupOrJobLookup.expandJobIds("bar-1", false), contains("bar-1")); - assertThat(groupOrJobLookup.expandJobIds("foo-1", false), contains("foo-1")); - assertThat(groupOrJobLookup.expandJobIds("foo-2, bar-1", false), contains("bar-1", "foo-2")); - assertThat(groupOrJobLookup.expandJobIds("foo-group", false), contains("foo-1", "foo-2")); - assertThat(groupOrJobLookup.expandJobIds("bar-group", false), contains("bar-1", "bar-2")); - assertThat(groupOrJobLookup.expandJobIds("ones", false), contains("bar-1", "foo-1")); - assertThat(groupOrJobLookup.expandJobIds("twos", false), contains("bar-2", "foo-2")); - assertThat(groupOrJobLookup.expandJobIds("foo-group, nogroup", false), contains("foo-1", "foo-2", "nogroup")); - assertThat(groupOrJobLookup.expandJobIds("*-group", false), 
contains("bar-1", "bar-2", "foo-1", "foo-2")); - assertThat(groupOrJobLookup.expandJobIds("foo-group,foo-1,foo-2", false), contains("foo-1", "foo-2")); - assertThat(groupOrJobLookup.expandJobIds("foo-group,*-2", false), contains("bar-2", "foo-1", "foo-2")); + assertThat(groupOrJobLookup.expandJobIds("_all"), contains("bar-1", "bar-2", "foo-1", "foo-2", "nogroup")); + assertThat(groupOrJobLookup.expandJobIds("*"), contains("bar-1", "bar-2", "foo-1", "foo-2", "nogroup")); + assertThat(groupOrJobLookup.expandJobIds("bar-1"), contains("bar-1")); + assertThat(groupOrJobLookup.expandJobIds("foo-1"), contains("foo-1")); + assertThat(groupOrJobLookup.expandJobIds("foo-2, bar-1"), contains("bar-1", "foo-2")); + assertThat(groupOrJobLookup.expandJobIds("foo-group"), contains("foo-1", "foo-2")); + assertThat(groupOrJobLookup.expandJobIds("bar-group"), contains("bar-1", "bar-2")); + assertThat(groupOrJobLookup.expandJobIds("ones"), contains("bar-1", "foo-1")); + assertThat(groupOrJobLookup.expandJobIds("twos"), contains("bar-2", "foo-2")); + assertThat(groupOrJobLookup.expandJobIds("foo-group, nogroup"), contains("foo-1", "foo-2", "nogroup")); + assertThat(groupOrJobLookup.expandJobIds("*-group"), contains("bar-1", "bar-2", "foo-1", "foo-2")); + assertThat(groupOrJobLookup.expandJobIds("foo-group,foo-1,foo-2"), contains("foo-1", "foo-2")); + assertThat(groupOrJobLookup.expandJobIds("foo-group,*-2"), contains("bar-2", "foo-1", "foo-2")); } public void testIsGroupOrJob() { @@ -104,6 +92,19 @@ public void testIsGroupOrJob() { assertFalse(groupOrJobLookup.isGroupOrJob("missing")); } + public void testExpandGroupIds() { + List jobs = new ArrayList<>(); + jobs.add(mockJob("foo-1", Arrays.asList("foo-group"))); + jobs.add(mockJob("foo-2", Arrays.asList("foo-group"))); + jobs.add(mockJob("bar-1", Arrays.asList("bar-group"))); + jobs.add(mockJob("nogroup", Collections.emptyList())); + + GroupOrJobLookup groupOrJobLookup = new GroupOrJobLookup(jobs); + 
assertThat(groupOrJobLookup.expandGroupIds("foo*"), contains("foo-group")); + assertThat(groupOrJobLookup.expandGroupIds("bar-group,nogroup"), contains("bar-group")); + assertThat(groupOrJobLookup.expandGroupIds("*"), contains("bar-group", "foo-group")); + } + private static Job mockJob(String jobId, List groups) { Job job = mock(Job.class); when(job.getId()).thenReturn(jobId); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java index 7c809543d1198..4850ad71dc2ef 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java @@ -12,7 +12,6 @@ import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.tasks.TransportTasksAction; -import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -37,7 +36,7 @@ import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; -import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; +import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.notifications.Auditor; import java.io.IOException; @@ -52,28 +51,25 @@ public class TransportCloseJobAction extends TransportTasksAction { - private final Client client; private final ClusterService clusterService; private final Auditor auditor; private final PersistentTasksService persistentTasksService; - private final JobConfigProvider jobConfigProvider; private final DatafeedConfigProvider 
datafeedConfigProvider; + private final JobManager jobManager; @Inject public TransportCloseJobAction(Settings settings, TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - ClusterService clusterService, Client client, Auditor auditor, - PersistentTasksService persistentTasksService, JobConfigProvider jobConfigProvider, - DatafeedConfigProvider datafeedConfigProvider) { + ClusterService clusterService, Auditor auditor, PersistentTasksService persistentTasksService, + DatafeedConfigProvider datafeedConfigProvider, JobManager jobManager) { // We fork in innerTaskOperation(...), so we can use ThreadPool.Names.SAME here: super(settings, CloseJobAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, CloseJobAction.Request::new, CloseJobAction.Response::new, ThreadPool.Names.SAME); - this.client = client; this.clusterService = clusterService; this.auditor = auditor; this.persistentTasksService = persistentTasksService; - this.jobConfigProvider = jobConfigProvider; this.datafeedConfigProvider = datafeedConfigProvider; + this.jobManager = jobManager; } @Override @@ -107,7 +103,7 @@ protected void doExecute(Task task, CloseJobAction.Request request, ActionListen */ PersistentTasksCustomMetaData tasksMetaData = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - jobConfigProvider.expandJobsIds(request.getJobId(), request.allowNoJobs(), true, ActionListener.wrap( + jobManager.expandJobIds(request.getJobId(), request.allowNoJobs(), ActionListener.wrap( expandedJobIds -> { validate(expandedJobIds, request.isForce(), tasksMetaData, ActionListener.wrap( response -> { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsAction.java index a848634b30e90..b5743701fb3d9 100644 --- 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsAction.java @@ -20,19 +20,11 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsAction; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigReader; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - public class TransportGetDatafeedsAction extends TransportMasterNodeReadAction { @@ -65,28 +57,10 @@ protected void masterOperation(GetDatafeedsAction.Request request, ClusterState ActionListener listener) { logger.debug("Get datafeed '{}'", request.getDatafeedId()); - Map clusterStateConfigs = - expandClusterStateDatafeeds(request.getDatafeedId(), request.allowNoDatafeeds(), state); - - datafeedConfigProvider.expandDatafeedConfigs(request.getDatafeedId(), request.allowNoDatafeeds(), ActionListener.wrap( - datafeedBuilders -> { - // Check for duplicate datafeeds - for (DatafeedConfig.Builder datafeed : datafeedBuilders) { - if (clusterStateConfigs.containsKey(datafeed.getId())) { - listener.onFailure(new IllegalStateException("Datafeed [" + datafeed.getId() + "] configuration " + - "exists in both clusterstate and index")); - return; - } - } + DatafeedConfigReader datafeedConfigReader = new DatafeedConfigReader(datafeedConfigProvider); - // Merge cluster state and index configs - List datafeeds = new ArrayList<>(datafeedBuilders.size() + clusterStateConfigs.values().size()); - 
for (DatafeedConfig.Builder builder: datafeedBuilders) { - datafeeds.add(builder.build()); - } - - datafeeds.addAll(clusterStateConfigs.values()); - Collections.sort(datafeeds, Comparator.comparing(DatafeedConfig::getId)); + datafeedConfigReader.expandDatafeedConfigs(request.getDatafeedId(), request.allowNoDatafeeds(), state, ActionListener.wrap( + datafeeds -> { listener.onResponse(new GetDatafeedsAction.Response(new QueryPage<>(datafeeds, datafeeds.size(), DatafeedConfig.RESULTS_FIELD))); }, @@ -94,24 +68,6 @@ protected void masterOperation(GetDatafeedsAction.Request request, ClusterState )); } - Map expandClusterStateDatafeeds(String datafeedExpression, boolean allowNoDatafeeds, - ClusterState clusterState) { - - Map configById = new HashMap<>(); - try { - MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); - Set expandedDatafeedIds = mlMetadata.expandDatafeedIds(datafeedExpression, allowNoDatafeeds); - - for (String expandedDatafeedId : expandedDatafeedIds) { - configById.put(expandedDatafeedId, mlMetadata.getDatafeed(expandedDatafeedId)); - } - } catch (Exception e){ - // ignore - } - - return configById; - } - @Override protected ClusterBlockException checkBlock(GetDatafeedsAction.Request request, ClusterState state) { return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java index ae1b42d2e3083..55006afa3241e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java @@ -8,6 +8,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import 
org.elasticsearch.action.support.master.TransportMasterNodeReadAction; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -16,6 +17,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -24,7 +26,7 @@ import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; -import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; +import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigReader; import java.util.List; import java.util.stream.Collectors; @@ -32,17 +34,17 @@ public class TransportGetDatafeedsStatsAction extends TransportMasterNodeReadAction { - private final DatafeedConfigProvider datafeedConfigProvider; + private final DatafeedConfigReader datafeedConfigReader; @Inject public TransportGetDatafeedsStatsAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - DatafeedConfigProvider datafeedConfigProvider) { + Client client, NamedXContentRegistry xContentRegistry) { super(settings, GetDatafeedsStatsAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, GetDatafeedsStatsAction.Request::new); - this.datafeedConfigProvider = datafeedConfigProvider; + this.datafeedConfigReader = new DatafeedConfigReader(client, xContentRegistry); } 
@Override @@ -57,10 +59,10 @@ protected GetDatafeedsStatsAction.Response newResponse() { @Override protected void masterOperation(GetDatafeedsStatsAction.Request request, ClusterState state, - ActionListener listener) throws Exception { + ActionListener listener) { logger.debug("Get stats for datafeed '{}'", request.getDatafeedId()); - datafeedConfigProvider.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds(), ActionListener.wrap( + datafeedConfigReader.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds(), state, ActionListener.wrap( expandedDatafeedIds -> { PersistentTasksCustomMetaData tasksInProgress = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); List results = expandedDatafeedIds.stream() diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java index 57b5dbd8b7f23..e0ae481d61b53 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java @@ -32,7 +32,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.stats.ForecastStats; -import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; +import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; @@ -54,27 +54,27 @@ public class TransportGetJobsStatsAction extends TransportTasksAction finalListener) { - jobConfigProvider.expandJobsIds(request.getJobId(), request.allowNoJobs(), true, ActionListener.wrap( + jobManager.expandJobIds(request.getJobId(), 
request.allowNoJobs(), ActionListener.wrap( expandedIds -> { request.setExpandedJobsIds(new ArrayList<>(expandedIds)); ActionListener jobStatsListener = ActionListener.wrap( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java index 4c5d6940bf45a..77910f21f67d1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java @@ -33,6 +33,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigReader; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import java.io.IOException; @@ -115,7 +116,8 @@ protected void doExecute(Task task, StopDatafeedAction.Request request, ActionLi new ActionListenerResponseHandler<>(listener, StopDatafeedAction.Response::new)); } } else { - datafeedConfigProvider.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds(), ActionListener.wrap( + DatafeedConfigReader datafeedConfigReader = new DatafeedConfigReader(datafeedConfigProvider); + datafeedConfigReader.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds(), state, ActionListener.wrap( expandedIds -> { PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java new file mode 100644 index 0000000000000..52b07ee3b929f --- /dev/null +++ 
b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReader.java @@ -0,0 +1,126 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.datafeed; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; +import org.elasticsearch.xpack.ml.job.persistence.ExpandedIdsMatcher; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * This class abstracts away reading datafeed configuration from either + * the cluster state or index documents. 
+ */ +public class DatafeedConfigReader { + + private final DatafeedConfigProvider datafeedConfigProvider; + + public DatafeedConfigReader(Client client, NamedXContentRegistry xContentRegistry) { + this.datafeedConfigProvider = new DatafeedConfigProvider(client, xContentRegistry); + } + + public DatafeedConfigReader(DatafeedConfigProvider datafeedConfigProvider) { + this.datafeedConfigProvider = datafeedConfigProvider; + } + + /** + * Merges the results of {@link MlMetadata#expandDatafeedIds} + * and {@link DatafeedConfigProvider#expandDatafeedIds(String, boolean, ActionListener)} + */ + public void expandDatafeedIds(String expression, boolean allowNoDatafeeds, ClusterState clusterState, + ActionListener> listener) { + + Set clusterStateDatafeedIds = MlMetadata.getMlMetadata(clusterState).expandDatafeedIds(expression); + ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(expression, allowNoDatafeeds); + requiredMatches.filterMatchedIds(clusterStateDatafeedIds); + + datafeedConfigProvider.expandDatafeedIdsWithoutMissingCheck(expression, ActionListener.wrap( + expandedDatafeedIds -> { + // Check for duplicate Ids + expandedDatafeedIds.forEach(id -> { + if (clusterStateDatafeedIds.contains(id)) { + listener.onFailure(new IllegalStateException("Datafeed [" + id + "] configuration " + + "exists in both clusterstate and index")); + return; + } + }); + + requiredMatches.filterMatchedIds(expandedDatafeedIds); + + if (requiredMatches.hasUnmatchedIds()) { + listener.onFailure(ExceptionsHelper.missingDatafeedException(requiredMatches.unmatchedIdsString())); + } else { + expandedDatafeedIds.addAll(clusterStateDatafeedIds); + listener.onResponse(expandedDatafeedIds); + } + }, + listener::onFailure + )); + } + + /** + * Merges the results of {@link MlMetadata#expandDatafeedIds} + * and {@link DatafeedConfigProvider#expandDatafeedConfigs(String, boolean, ActionListener)} + */ + public void expandDatafeedConfigs(String expression, boolean allowNoDatafeeds, ClusterState 
clusterState, + ActionListener> listener) { + + Map clusterStateConfigs = expandClusterStateDatafeeds(expression, clusterState); + + ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(expression, allowNoDatafeeds); + requiredMatches.filterMatchedIds(clusterStateConfigs.keySet()); + + datafeedConfigProvider.expandDatafeedConfigsWithoutMissingCheck(expression, ActionListener.wrap( + datafeedBuilders -> { + // Check for duplicate Ids + datafeedBuilders.forEach(datafeedBuilder -> { + if (clusterStateConfigs.containsKey(datafeedBuilder.getId())) { + listener.onFailure(new IllegalStateException("Datafeed [" + datafeedBuilder.getId() + "] configuration " + + "exists in both clusterstate and index")); + return; + } + }); + + List datafeedConfigs = new ArrayList<>(); + for (DatafeedConfig.Builder builder : datafeedBuilders) { + datafeedConfigs.add(builder.build()); + } + + requiredMatches.filterMatchedIds(datafeedConfigs.stream().map(DatafeedConfig::getId).collect(Collectors.toList())); + + if (requiredMatches.hasUnmatchedIds()) { + listener.onFailure(ExceptionsHelper.missingDatafeedException(requiredMatches.unmatchedIdsString())); + } else { + datafeedConfigs.addAll(clusterStateConfigs.values()); + Collections.sort(datafeedConfigs, Comparator.comparing(DatafeedConfig::getId)); + listener.onResponse(datafeedConfigs); + } + }, + listener::onFailure + )); + } + + private Map expandClusterStateDatafeeds(String datafeedExpression, ClusterState clusterState) { + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); + Set expandedDatafeedIds = mlMetadata.expandDatafeedIds(datafeedExpression); + return expandedDatafeedIds.stream().collect(Collectors.toMap(Function.identity(), mlMetadata::getDatafeed)); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java index 
cc4131e7c65d9..2972b27dda789 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java @@ -351,17 +351,9 @@ private void indexUpdatedConfig(DatafeedConfig updatedConfig, long version, Acti * @param listener The expanded datafeed IDs listener */ public void expandDatafeedIds(String expression, boolean allowNoDatafeeds, ActionListener> listener) { - String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression); - SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildDatafeedIdQuery(tokens)); - sourceBuilder.sort(DatafeedConfig.ID.getPreferredName()); - sourceBuilder.fetchSource(false); - sourceBuilder.docValueField(DatafeedConfig.ID.getPreferredName(), DocValueFieldsContext.USE_DEFAULT_FORMAT); + SearchRequest searchRequest = buildExpandDatafeedIdsSearch(expression); - SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) - .setIndicesOptions(IndicesOptions.lenientExpandOpen()) - .setSource(sourceBuilder).request(); - - ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoDatafeeds); + ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(expression, allowNoDatafeeds); executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, ActionListener.wrap( @@ -386,6 +378,45 @@ public void expandDatafeedIds(String expression, boolean allowNoDatafeeds, Actio } + /** + * Similar to {@link #expandDatafeedIds(String, boolean, ActionListener)} but no error + * is generated if there are missing Ids. Whatever Ids match will be returned. + * + * This method is only for use when combining datafeed Ids from multiple sources, its usage + * should be limited. 
+ * + * @param expression the expression to resolve + * @param listener The expanded datafeed IDs listener + */ + public void expandDatafeedIdsWithoutMissingCheck(String expression, ActionListener> listener) { + SearchRequest searchRequest = buildExpandDatafeedIdsSearch(expression); + + executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, + ActionListener.wrap( + response -> { + SortedSet datafeedIds = new TreeSet<>(); + SearchHit[] hits = response.getHits().getHits(); + for (SearchHit hit : hits) { + datafeedIds.add(hit.field(DatafeedConfig.ID.getPreferredName()).getValue()); + } + listener.onResponse(datafeedIds); + }, + listener::onFailure) + , client::search); + } + + private SearchRequest buildExpandDatafeedIdsSearch(String expression) { + String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression); + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildDatafeedIdQuery(tokens)); + sourceBuilder.sort(DatafeedConfig.ID.getPreferredName()); + sourceBuilder.fetchSource(false); + sourceBuilder.docValueField(DatafeedConfig.ID.getPreferredName(), DocValueFieldsContext.USE_DEFAULT_FORMAT); + + return client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()) + .setSource(sourceBuilder).request(); + } + /** * The same logic as {@link #expandDatafeedIds(String, boolean, ActionListener)} but * the full datafeed configuration is returned. 
@@ -398,7 +429,7 @@ public void expandDatafeedIds(String expression, boolean allowNoDatafeeds, Actio * wildcard then setting this true will not suppress the exception * @param listener The expanded datafeed config listener */ - // NORELEASE datafeed configs should be paged or have a mechanism to return all jobs if there are many of them + // NORELEASE datafeed configs should be paged or have a mechanism to return all configs if there are many of them public void expandDatafeedConfigs(String expression, boolean allowNoDatafeeds, ActionListener> listener) { String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildDatafeedIdQuery(tokens)); @@ -442,6 +473,51 @@ public void expandDatafeedConfigs(String expression, boolean allowNoDatafeeds, A } + /** + * The same logic as {@link #expandDatafeedIdsWithoutMissingCheck(String, ActionListener)} + * but the full datafeed configuration is returned. + * + * This method is only for use when combining datafeeds from multiple sources, its usage + * should be limited. 
+ * + * @param expression the expression to resolve + * @param listener The expanded datafeed config listener + */ + // NORELEASE datafeed configs should be paged or have a mechanism to return all configs if there are many of them + public void expandDatafeedConfigsWithoutMissingCheck(String expression, ActionListener> listener) { + String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression); + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildDatafeedIdQuery(tokens)); + sourceBuilder.sort(DatafeedConfig.ID.getPreferredName()); + + SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()) + .setSource(sourceBuilder).request(); + + executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, + ActionListener.wrap( + response -> { + List datafeeds = new ArrayList<>(); + Set datafeedIds = new HashSet<>(); + SearchHit[] hits = response.getHits().getHits(); + for (SearchHit hit : hits) { + try { + BytesReference source = hit.getSourceRef(); + DatafeedConfig.Builder datafeed = parseLenientlyFromSource(source); + datafeeds.add(datafeed); + datafeedIds.add(datafeed.getId()); + } catch (IOException e) { + // TODO A better way to handle this rather than just ignoring the error? 
+ logger.error("Error parsing datafeed configuration [" + hit.getId() + "]", e); + } + } + listener.onResponse(datafeeds); + }, + listener::onFailure) + , client::search); + + } + + private QueryBuilder buildDatafeedIdQuery(String [] tokens) { QueryBuilder datafeedQuery = new TermQueryBuilder(DatafeedConfig.CONFIG_TYPE.getPreferredName(), DatafeedConfig.TYPE); if (Strings.isAllOrWildcard(tokens)) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 1097a547d2f57..47880f840effe 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.ml.job; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; @@ -16,7 +18,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; @@ -49,6 +50,7 @@ import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer; +import org.elasticsearch.xpack.ml.job.persistence.ExpandedIdsMatcher; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; @@ -62,13 +64,17 @@ import 
java.util.Comparator; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; import java.util.function.Consumer; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; /** * Allows interactions with jobs. The managed interactions include: @@ -79,10 +85,11 @@ *
  • updating
  • * */ -public class JobManager extends AbstractComponent { +public class JobManager { private static final DeprecationLogger DEPRECATION_LOGGER = new DeprecationLogger(Loggers.getLogger(JobManager.class)); + private static final Logger logger = LogManager.getLogger(JobManager.class); private final Settings settings; private final Environment environment; @@ -102,6 +109,13 @@ public class JobManager extends AbstractComponent { public JobManager(Environment environment, Settings settings, JobResultsProvider jobResultsProvider, ClusterService clusterService, Auditor auditor, ThreadPool threadPool, Client client, UpdateJobProcessNotifier updateJobProcessNotifier) { + this(environment, settings, jobResultsProvider, clusterService, auditor, threadPool, client, + updateJobProcessNotifier, new JobConfigProvider(client)); + } + + JobManager(Environment environment, Settings settings, JobResultsProvider jobResultsProvider, + ClusterService clusterService, Auditor auditor, ThreadPool threadPool, + Client client, UpdateJobProcessNotifier updateJobProcessNotifier, JobConfigProvider jobConfigProvider) { this.settings = settings; this.environment = environment; this.jobResultsProvider = Objects.requireNonNull(jobResultsProvider); @@ -110,7 +124,7 @@ public JobManager(Environment environment, Settings settings, JobResultsProvider this.client = Objects.requireNonNull(client); this.threadPool = Objects.requireNonNull(threadPool); this.updateJobProcessNotifier = updateJobProcessNotifier; - this.jobConfigProvider = new JobConfigProvider(client); + this.jobConfigProvider = jobConfigProvider; maxModelMemoryLimit = MachineLearningField.MAX_MODEL_MEMORY_LIMIT.get(settings); clusterService.getClusterSettings() @@ -122,7 +136,21 @@ private void setMaxModelMemoryLimit(ByteSizeValue maxModelMemoryLimit) { } public void jobExists(String jobId, ActionListener listener) { - jobConfigProvider.jobExists(jobId, true, listener); + jobConfigProvider.jobExists(jobId, true, ActionListener.wrap( + 
jobFound -> { + if (jobFound) { + listener.onResponse(Boolean.TRUE); + } else { + // Look in the clusterstate for the job config + if (MlMetadata.getMlMetadata(clusterService.state()).getJobs().containsKey(jobId)) { + listener.onResponse(Boolean.TRUE); + } else { + listener.onFailure(ExceptionsHelper.missingJobException(jobId)); + } + } + }, + listener::onFailure + )); } /** @@ -172,9 +200,18 @@ private void getJobFromClusterState(String jobId, ActionListener jobListene * @param jobsListener The jobs listener */ public void expandJobs(String expression, boolean allowNoJobs, ActionListener> jobsListener) { - Map clusterStateJobs = expandJobsFromClusterState(expression, allowNoJobs, clusterService.state()); - - jobConfigProvider.expandJobs(expression, allowNoJobs, false, ActionListener.wrap( + Map clusterStateJobs = expandJobsFromClusterState(expression, clusterService.state()); + ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(expression, allowNoJobs); + requiredMatches.filterMatchedIds(clusterStateJobs.keySet()); + + // If expression contains a group Id it has been expanded to its + // constituent job Ids but Ids matcher needs to know the group + // has been matched + Set groupIds = clusterStateJobs.values().stream() + .filter(job -> job.getGroups() != null).flatMap(j -> j.getGroups().stream()).collect(Collectors.toSet()); + requiredMatches.filterMatchedIds(groupIds); + + jobConfigProvider.expandJobsWithoutMissingcheck(expression, false, ActionListener.wrap( jobBuilders -> { // Check for duplicate jobs for (Job.Builder jb : jobBuilders) { @@ -185,34 +222,82 @@ public void expandJobs(String expression, boolean allowNoJobs, ActionListener jobAndGroupIds = new HashSet<>(); + // Merge cluster state and index jobs List jobs = new ArrayList<>(); for (Job.Builder jb : jobBuilders) { - jobs.add(jb.build()); + Job job = jb.build(); + jobAndGroupIds.add(job.getId()); + jobAndGroupIds.addAll(job.getGroups()); + jobs.add(job); } - 
jobs.addAll(clusterStateJobs.values()); - Collections.sort(jobs, Comparator.comparing(Job::getId)); - jobsListener.onResponse(new QueryPage<>(jobs, jobs.size(), Job.RESULTS_FIELD)); + requiredMatches.filterMatchedIds(jobAndGroupIds); + + if (requiredMatches.hasUnmatchedIds()) { + jobsListener.onFailure(ExceptionsHelper.missingJobException(requiredMatches.unmatchedIdsString())); + } else { + jobs.addAll(clusterStateJobs.values()); + Collections.sort(jobs, Comparator.comparing(Job::getId)); + jobsListener.onResponse(new QueryPage<>(jobs, jobs.size(), Job.RESULTS_FIELD)); + } }, jobsListener::onFailure )); } - private Map expandJobsFromClusterState(String expression, boolean allowNoJobs, ClusterState clusterState) { + private Map expandJobsFromClusterState(String expression, ClusterState clusterState) { Map jobIdToJob = new HashMap<>(); - try { - Set expandedJobIds = MlMetadata.getMlMetadata(clusterState).expandJobIds(expression, allowNoJobs); - MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); - for (String expandedJobId : expandedJobIds) { - jobIdToJob.put(expandedJobId, mlMetadata.getJobs().get(expandedJobId)); - } - } catch (Exception e) { - // ignore + Set expandedJobIds = MlMetadata.getMlMetadata(clusterState).expandJobIds(expression); + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); + for (String expandedJobId : expandedJobIds) { + jobIdToJob.put(expandedJobId, mlMetadata.getJobs().get(expandedJobId)); } return jobIdToJob; } + /** + * Get the job Ids that match the given {@code expression}. 
 + * + * @param expression the jobId or an expression matching jobIds + * @param allowNoJobs if {@code false}, an error is thrown when no job matches the {@code expression} + * @param jobsListener The jobs listener + */ + public void expandJobIds(String expression, boolean allowNoJobs, ActionListener> jobsListener) { + Set clusterStateJobIds = MlMetadata.getMlMetadata(clusterService.state()).expandJobIds(expression); + ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(expression, allowNoJobs); + requiredMatches.filterMatchedIds(clusterStateJobIds); + // If expression contains a group Id it has been expanded to its + // constituent job Ids but Ids matcher needs to know the group + // has been matched + requiredMatches.filterMatchedIds(MlMetadata.getMlMetadata(clusterService.state()).expandGroupIds(expression)); + + jobConfigProvider.expandJobsIdsWithoutMissingCheck(expression, false, ActionListener.wrap( + jobIdsAndGroups -> { + // Check for duplicate job Ids + for (String id : jobIdsAndGroups.getJobs()) { + if (clusterStateJobIds.contains(id)) { + jobsListener.onFailure(new IllegalStateException("Job [" + id + "] configuration " + + "exists in both clusterstate and index")); + return; + } + } + + requiredMatches.filterMatchedIds(jobIdsAndGroups.getJobs()); + requiredMatches.filterMatchedIds(jobIdsAndGroups.getGroups()); + if (requiredMatches.hasUnmatchedIds()) { + jobsListener.onFailure(ExceptionsHelper.missingJobException(requiredMatches.unmatchedIdsString())); + } else { + SortedSet allJobIds = new TreeSet<>(clusterStateJobIds); + allJobIds.addAll(jobIdsAndGroups.getJobs()); + jobsListener.onResponse(allJobIds); + } + }, + jobsListener::onFailure + )); + } + /** * Validate the char filter/tokenizer/token filter names used in the categorization analyzer config (if any). 
 * This validation has to be done server-side; it cannot be done in a client as that won't have loaded the @@ -524,8 +609,8 @@ public void updateProcessOnCalendarChanged(List calendarJobIds, ActionLi jobConfigProvider.expandGroupIds(calendarJobIds, ActionListener.wrap( expandedIds -> { threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> { - // Merge the expended group members with the request Ids. - // Ids that aren't jobs will be filtered by isJobOpen() + // Merge the expanded group members with the request Ids + // which are job Ids rather than group Ids. expandedIds.addAll(calendarJobIds); for (String jobId : expandedIds) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ExpandedIdsMatcher.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ExpandedIdsMatcher.java index 4f4968a9d5629..41a4d53df6b8f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ExpandedIdsMatcher.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/ExpandedIdsMatcher.java @@ -44,6 +44,18 @@ public final class ExpandedIdsMatcher { private final LinkedList requiredMatches; + /** + * Generate the list of required matches from {@code tokenExpression} and initialize. + * + * @param tokenExpression Token expression string that will be split by {@link #tokenizeExpression(String)} + * @param allowNoMatchForWildcards If true then it is not required for wildcard + * expressions to match an Id, meaning they are + * not returned in the list of required matches + */ + public ExpandedIdsMatcher(String tokenExpression, boolean allowNoMatchForWildcards) { + this(ExpandedIdsMatcher.tokenizeExpression(tokenExpression), allowNoMatchForWildcards); + } + /** * Generate the list of required matches from the expressions in {@code tokens} * and initialize. 
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java index e68c3a689c4bc..12cb5b5b00db3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java @@ -537,17 +537,9 @@ public void markJobAsDeleting(String jobId, ActionListener listener) { * @param listener The expanded job Ids listener */ public void expandJobsIds(String expression, boolean allowNoJobs, boolean excludeDeleting, ActionListener> listener) { - String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression); - SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens, excludeDeleting)); - sourceBuilder.sort(Job.ID.getPreferredName()); - sourceBuilder.fetchSource(false); - sourceBuilder.docValueField(Job.ID.getPreferredName(), DocValueFieldsContext.USE_DEFAULT_FORMAT); - sourceBuilder.docValueField(Job.GROUPS.getPreferredName(), DocValueFieldsContext.USE_DEFAULT_FORMAT); - - SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) - .setIndicesOptions(IndicesOptions.lenientExpandOpen()) - .setSource(sourceBuilder).request(); + SearchRequest searchRequest = makeExpandIdsSearchRequest(expression, excludeDeleting); + String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression); ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoJobs); executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, @@ -579,6 +571,72 @@ public void expandJobsIds(String expression, boolean allowNoJobs, boolean exclud } + public static class JobIdsAndGroups { + private SortedSet jobs; + private SortedSet groups; + + public JobIdsAndGroups(SortedSet jobs, SortedSet groups) { + this.jobs = jobs; + 
this.groups = groups; + } + + public SortedSet getJobs() { + return jobs; + } + + public SortedSet getGroups() { + return groups; + } + } + + /** + * Similar to {@link #expandJobsIds(String, boolean, boolean, ActionListener)} but no error + * is generated if there are missing Ids. Whatever Ids match will be returned. + * + * This method is only for use when combining jobs Ids from multiple sources, its usage + * should be limited. + * + * @param expression the expression to resolve + * @param excludeDeleting If true exclude jobs marked as deleting + * @param listener The expanded job Ids listener + */ + public void expandJobsIdsWithoutMissingCheck(String expression, boolean excludeDeleting, ActionListener listener) { + + SearchRequest searchRequest = makeExpandIdsSearchRequest(expression, excludeDeleting); + executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, + ActionListener.wrap( + response -> { + SortedSet jobIds = new TreeSet<>(); + SortedSet groupsIds = new TreeSet<>(); + SearchHit[] hits = response.getHits().getHits(); + for (SearchHit hit : hits) { + jobIds.add(hit.field(Job.ID.getPreferredName()).getValue()); + List groups = hit.field(Job.GROUPS.getPreferredName()).getValues(); + if (groups != null) { + groupsIds.addAll(groups.stream().map(Object::toString).collect(Collectors.toList())); + } + } + + listener.onResponse(new JobIdsAndGroups(jobIds, groupsIds)); + }, + listener::onFailure) + , client::search); + + } + + private SearchRequest makeExpandIdsSearchRequest(String expression, boolean excludeDeleting) { + String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression); + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens, excludeDeleting)); + sourceBuilder.sort(Job.ID.getPreferredName()); + sourceBuilder.fetchSource(false); + sourceBuilder.docValueField(Job.ID.getPreferredName(), DocValueFieldsContext.USE_DEFAULT_FORMAT); + 
sourceBuilder.docValueField(Job.GROUPS.getPreferredName(), DocValueFieldsContext.USE_DEFAULT_FORMAT); + + return client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()) + .setSource(sourceBuilder).request(); + } + /** * The same logic as {@link #expandJobsIds(String, boolean, boolean, ActionListener)} but * the full anomaly detector job configuration is returned. @@ -638,6 +696,51 @@ public void expandJobs(String expression, boolean allowNoJobs, boolean excludeDe } + /** + * The same logic as {@link #expandJobsIdsWithoutMissingCheck(String, boolean, ActionListener)} + * but the full anomaly detector job configuration is returned. + * + * This method is only for use when combining jobs from multiple sources, its usage + * should be limited. + * + * @param expression the expression to resolve + * @param excludeDeleting If true exclude jobs marked as deleting + * @param listener The expanded jobs listener + */ + // NORELEASE jobs should be paged or have a mechanism to return all jobs if there are many of them + public void expandJobsWithoutMissingcheck(String expression, boolean excludeDeleting, ActionListener> listener) { + String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression); + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens, excludeDeleting)); + sourceBuilder.sort(Job.ID.getPreferredName()); + + SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()) + .setSource(sourceBuilder).request(); + + executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, + ActionListener.wrap( + response -> { + List jobs = new ArrayList<>(); + + SearchHit[] hits = response.getHits().getHits(); + for (SearchHit hit : hits) { + try { + BytesReference source = hit.getSourceRef(); + Job.Builder job = parseJobLenientlyFromSource(source); + jobs.add(job); 
+ } catch (IOException e) { + // TODO A better way to handle this rather than just ignoring the error? + logger.error("Error parsing anomaly detector job configuration [" + hit.getId() + "]", e); + } + } + + listener.onResponse(jobs); + }, + listener::onFailure) + , client::search); + + } + /** * Expands the list of job group Ids to the set of jobs which are members of the groups. * Unlike {@link #expandJobsIds(String, boolean, boolean, ActionListener)} it is not an error diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java index c7ca2ff805eba..cb43afed94280 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchModule; import org.elasticsearch.test.AbstractSerializingTestCase; @@ -27,7 +28,6 @@ import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobTests; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField; import java.util.Collections; @@ -35,8 +35,8 @@ import java.util.HashMap; import java.util.Map; -import static org.elasticsearch.xpack.core.ml.job.config.JobTests.buildJobBuilder; import static org.elasticsearch.persistent.PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT; +import static org.elasticsearch.xpack.core.ml.job.config.JobTests.buildJobBuilder; import static 
org.elasticsearch.xpack.ml.action.TransportOpenJobActionTests.addJobTask; import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedConfig; import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedJob; @@ -397,10 +397,10 @@ public void testRemoveDatafeed_failBecauseDatafeedStarted() { public void testExpandJobIds() { MlMetadata mlMetadata = newMlMetadataWithJobs("bar-1", "foo-1", "foo-2").build(); - assertThat(mlMetadata.expandJobIds("_all", false), contains("bar-1", "foo-1", "foo-2")); - assertThat(mlMetadata.expandJobIds("*", false), contains("bar-1", "foo-1", "foo-2")); - assertThat(mlMetadata.expandJobIds("foo-*", false), contains("foo-1", "foo-2")); - assertThat(mlMetadata.expandJobIds("foo-1,bar-*", false), contains("bar-1", "foo-1")); + assertThat(mlMetadata.expandJobIds("_all"), contains("bar-1", "foo-1", "foo-2")); + assertThat(mlMetadata.expandJobIds("*"), contains("bar-1", "foo-1", "foo-2")); + assertThat(mlMetadata.expandJobIds("foo-*"), contains("foo-1", "foo-2")); + assertThat(mlMetadata.expandJobIds("foo-1,bar-*"), contains("bar-1", "foo-1")); } public void testExpandDatafeedIds() { @@ -411,10 +411,10 @@ public void testExpandDatafeedIds() { MlMetadata mlMetadata = mlMetadataBuilder.build(); - assertThat(mlMetadata.expandDatafeedIds("_all", false), contains("bar-1-feed", "foo-1-feed", "foo-2-feed")); - assertThat(mlMetadata.expandDatafeedIds("*", false), contains("bar-1-feed", "foo-1-feed", "foo-2-feed")); - assertThat(mlMetadata.expandDatafeedIds("foo-*", false), contains("foo-1-feed", "foo-2-feed")); - assertThat(mlMetadata.expandDatafeedIds("foo-1-feed,bar-1*", false), contains("bar-1-feed", "foo-1-feed")); + assertThat(mlMetadata.expandDatafeedIds("_all"), contains("bar-1-feed", "foo-1-feed", "foo-2-feed")); + assertThat(mlMetadata.expandDatafeedIds("*"), contains("bar-1-feed", "foo-1-feed", "foo-2-feed")); + assertThat(mlMetadata.expandDatafeedIds("foo-*"), contains("foo-1-feed", 
"foo-2-feed")); + assertThat(mlMetadata.expandDatafeedIds("foo-1-feed,bar-1*"), contains("bar-1-feed", "foo-1-feed")); } private static MlMetadata.Builder newMlMetadataWithJobs(String... jobIds) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java index c250770710ab8..572a10e44d20c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java @@ -8,7 +8,6 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -31,7 +30,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; -import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; +import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.junit.Before; @@ -61,13 +60,13 @@ public class TransportCloseJobActionTests extends ESTestCase { private ClusterService clusterService; - private JobConfigProvider jobConfigProvider; + private JobManager jobManager; private DatafeedConfigProvider datafeedConfigProvider; @Before private void setupMocks() { clusterService = mock(ClusterService.class); - jobConfigProvider = mock(JobConfigProvider.class); + jobManager = mock(JobManager.class); datafeedConfigProvider = 
mock(DatafeedConfigProvider.class); } @@ -222,7 +221,7 @@ public void testDoExecute_whenNothingToClose() { when(clusterService.state()).thenReturn(clusterState); SortedSet expandedIds = new TreeSet<>(); expandedIds.add("foo"); - mockJobConfigProviderExpandIds(expandedIds); + mockJobManagerExpandIds(expandedIds); mockDatafeedConfigFindDatafeeds(Collections.emptySortedSet()); AtomicBoolean gotResponse = new AtomicBoolean(false); @@ -278,8 +277,7 @@ public static void addTask(String datafeedId, long startTime, String nodeId, Dat private TransportCloseJobAction createAction() { return new TransportCloseJobAction(Settings.EMPTY, mock(TransportService.class), mock(ThreadPool.class), mock(ActionFilters.class), mock(IndexNameExpressionResolver.class), - clusterService, mock(Client.class), mock(Auditor.class), mock(PersistentTasksService.class), - jobConfigProvider, datafeedConfigProvider); + clusterService, mock(Auditor.class), mock(PersistentTasksService.class), datafeedConfigProvider, jobManager); } private void mockDatafeedConfigFindDatafeeds(Set datafeedIds) { @@ -291,13 +289,13 @@ private void mockDatafeedConfigFindDatafeeds(Set datafeedIds) { }).when(datafeedConfigProvider).findDatafeedsForJobIds(any(), any(ActionListener.class)); } - private void mockJobConfigProviderExpandIds(Set expandedIds) { + private void mockJobManagerExpandIds(Set expandedIds) { doAnswer(invocation -> { - ActionListener> listener = (ActionListener>) invocation.getArguments()[3]; + ActionListener> listener = (ActionListener>) invocation.getArguments()[2]; listener.onResponse(expandedIds); return null; - }).when(jobConfigProvider).expandJobsIds(any(), anyBoolean(), anyBoolean(), any(ActionListener.class)); + }).when(jobManager).expandJobIds(any(), anyBoolean(), any(ActionListener.class)); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReaderTests.java 
b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReaderTests.java new file mode 100644 index 0000000000000..986b0d0cf46ae --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigReaderTests.java @@ -0,0 +1,160 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.datafeed; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; + +import java.util.Collections; +import java.util.List; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicReference; + +import static org.elasticsearch.xpack.core.ml.job.config.JobTests.buildJobBuilder; +import static org.hamcrest.Matchers.contains; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; + +public class DatafeedConfigReaderTests extends ESTestCase { + + private final String JOB_ID_FOO = "foo"; + + @SuppressWarnings("unchecked") + private void mockProviderWithExpectedIds(DatafeedConfigProvider mockedProvider, String expression, SortedSet datafeedIds) { + doAnswer(invocationOnMock -> { + ActionListener> listener = (ActionListener>) invocationOnMock.getArguments()[1]; + listener.onResponse(datafeedIds); + return null; + 
}).when(mockedProvider).expandDatafeedIdsWithoutMissingCheck(eq(expression), any()); + } + + @SuppressWarnings("unchecked") + private void mockProviderWithExpectedConfig(DatafeedConfigProvider mockedProvider, String expression, + List datafeedConfigs) { + doAnswer(invocationOnMock -> { + ActionListener> listener = + (ActionListener>) invocationOnMock.getArguments()[1]; + listener.onResponse(datafeedConfigs); + return null; + }).when(mockedProvider).expandDatafeedConfigsWithoutMissingCheck(eq(expression), any()); + } + + public void testExpandDatafeedIds_SplitBetweenClusterStateAndIndex() { + SortedSet idsInIndex = new TreeSet<>(); + idsInIndex.add("index-df"); + DatafeedConfigProvider provider = mock(DatafeedConfigProvider.class); + mockProviderWithExpectedIds(provider, "cs-df,index-df", idsInIndex); + + ClusterState clusterState = buildClusterStateWithJob(createDatafeedConfig("cs-df", JOB_ID_FOO)); + + DatafeedConfigReader reader = new DatafeedConfigReader(provider); + + AtomicReference> idsHolder = new AtomicReference<>(); + reader.expandDatafeedIds("cs-df,index-df", true, clusterState, ActionListener.wrap( + idsHolder::set, + e -> fail(e.getMessage()) + )); + assertNotNull(idsHolder.get()); + assertThat(idsHolder.get(), contains("cs-df", "index-df")); + + mockProviderWithExpectedIds(provider, "cs-df", new TreeSet<>()); + reader.expandDatafeedIds("cs-df", true, clusterState, ActionListener.wrap( + idsHolder::set, + e -> assertNull(e) + )); + assertThat(idsHolder.get(), contains("cs-df")); + + idsInIndex.clear(); + idsInIndex.add("index-df"); + mockProviderWithExpectedIds(provider, "index-df", idsInIndex); + reader.expandDatafeedIds("index-df", true, clusterState, ActionListener.wrap( + idsHolder::set, + e -> assertNull(e) + )); + assertThat(idsHolder.get(), contains("index-df")); + } + + public void testExpandDatafeedIds_GivenAll() { + SortedSet idsInIndex = new TreeSet<>(); + idsInIndex.add("df1"); + idsInIndex.add("df2"); + DatafeedConfigProvider provider = 
mock(DatafeedConfigProvider.class); + mockProviderWithExpectedIds(provider, "_all", idsInIndex); + + ClusterState clusterState = buildClusterStateWithJob(createDatafeedConfig("df3", JOB_ID_FOO)); + + DatafeedConfigReader reader = new DatafeedConfigReader(provider); + + AtomicReference> idsHolder = new AtomicReference<>(); + reader.expandDatafeedIds("_all", true, clusterState, ActionListener.wrap( + idsHolder::set, + e -> fail(e.getMessage()) + )); + + assertNotNull(idsHolder.get()); + assertThat(idsHolder.get(), contains("df1", "df2", "df3")); + } + + public void testExpandDatafeedConfigs_SplitBetweenClusterStateAndIndex() { + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(buildJobBuilder("job-a").build(), false); + mlMetadata.putDatafeed(createDatafeedConfig("cs-df", "job-a"), Collections.emptyMap()); + mlMetadata.putJob(buildJobBuilder("job-b").build(), false); + mlMetadata.putDatafeed(createDatafeedConfig("ll-df", "job-b"), Collections.emptyMap()); + + ClusterState clusterState = ClusterState.builder(new ClusterName("datafeedconfigreadertests")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + + + DatafeedConfig.Builder indexConfig = createDatafeedConfigBuilder("index-df", "job-c"); + DatafeedConfigProvider provider = mock(DatafeedConfigProvider.class); + mockProviderWithExpectedConfig(provider, "_all", Collections.singletonList(indexConfig)); + + DatafeedConfigReader reader = new DatafeedConfigReader(provider); + + AtomicReference> configHolder = new AtomicReference<>(); + reader.expandDatafeedConfigs("_all", true, clusterState, ActionListener.wrap( + configHolder::set, + e -> fail(e.getMessage()) + )); + + assertEquals("cs-df", configHolder.get().get(0).getId()); + assertEquals("index-df", configHolder.get().get(1).getId()); + assertEquals("ll-df", configHolder.get().get(2).getId()); + } + + private ClusterState buildClusterStateWithJob(DatafeedConfig datafeed) { + MlMetadata.Builder 
mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(buildJobBuilder(JOB_ID_FOO).build(), false); + mlMetadata.putDatafeed(datafeed, Collections.emptyMap()); + + return ClusterState.builder(new ClusterName("datafeedconfigreadertests")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + } + + private DatafeedConfig.Builder createDatafeedConfigBuilder(String id, String jobId) { + DatafeedConfig.Builder builder = new DatafeedConfig.Builder(id, jobId); + builder.setIndices(Collections.singletonList("beats*")); + return builder; + } + + private DatafeedConfig createDatafeedConfig(String id, String jobId) { + return createDatafeedConfigBuilder(id, jobId).build(); + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java index 9496f4ca0d8f2..19f772709ead8 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java @@ -245,7 +245,7 @@ public void testExpandDatafeeds() throws Exception { client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.configIndexName()).get(); - // Test job IDs only + // Test IDs only SortedSet expandedIds = blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("foo*", true, actionListener)); assertEquals(new TreeSet<>(Arrays.asList("foo-1", "foo-2")), expandedIds); @@ -262,7 +262,7 @@ public void testExpandDatafeeds() throws Exception { expandedIds = blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIds("bar-1,foo*", true, actionListener)); assertEquals(new TreeSet<>(Arrays.asList("bar-1", "foo-1", "foo-2")), expandedIds); - // Test full job config + // Test full config List expandedDatafeedBuilders = blockingCall(actionListener -> 
datafeedConfigProvider.expandDatafeedConfigs("foo*", true, actionListener)); List expandedDatafeeds = @@ -290,6 +290,33 @@ public void testExpandDatafeeds() throws Exception { assertThat(expandedDatafeeds, containsInAnyOrder(bar1, foo1, foo2)); } + public void testExpandDatafeedsWithoutMissingCheck() throws Exception { + DatafeedConfig foo1 = putDatafeedConfig(createDatafeedConfig("foo-1", "j1"), Collections.emptyMap()); + putDatafeedConfig(createDatafeedConfig("bar-1", "j3"), Collections.emptyMap()); + + client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.configIndexName()).get(); + + // Test IDs only + SortedSet expandedIds = + blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedIdsWithoutMissingCheck("tim", actionListener)); + assertThat(expandedIds, empty()); + + expandedIds = blockingCall(actionListener -> + datafeedConfigProvider.expandDatafeedIdsWithoutMissingCheck("foo-1,dave", actionListener)); + assertThat(expandedIds, contains("foo-1")); + + // Test full config + List expandedDatafeedBuilders = + blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedConfigsWithoutMissingCheck("tim", actionListener)); + assertThat(expandedDatafeedBuilders, empty()); + + expandedDatafeedBuilders = blockingCall(actionListener -> + datafeedConfigProvider.expandDatafeedConfigsWithoutMissingCheck("foo*,dave", actionListener)); + List expandedDatafeeds = + expandedDatafeedBuilders.stream().map(DatafeedConfig.Builder::build).collect(Collectors.toList()); + assertThat(expandedDatafeeds, contains(foo1)); + } + public void testFindDatafeedsForJobIds() throws Exception { putDatafeedConfig(createDatafeedConfig("foo-1", "j1"), Collections.emptyMap()); putDatafeedConfig(createDatafeedConfig("foo-2", "j2"), Collections.emptyMap()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java index 
bd6e2df8ad0a6..45a93cc9c9dd0 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java @@ -437,6 +437,53 @@ public void testExpandJobIds_excludeDeleting() throws Exception { assertThat(expandedJobsBuilders, hasSize(3)); } + public void testExpandJobsIdsWithoutMissingCheck() throws Exception { + putJob(createJob("tom", null)); + putJob(createJob("dick", null)); + putJob(createJob("harry", Collections.singletonList("harry-group"))); + putJob(createJob("harry-jnr", Collections.singletonList("harry-group"))); + + client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.configIndexName()).get(); + + JobConfigProvider.JobIdsAndGroups expandedIds = blockingCall(actionListener -> + jobConfigProvider.expandJobsIdsWithoutMissingCheck("dick,john", false, actionListener)); + assertEquals(new TreeSet<>(Collections.singletonList("dick")), expandedIds.getJobs()); + assertThat(expandedIds.getGroups(), empty()); + + expandedIds = blockingCall(actionListener -> jobConfigProvider.expandJobsIdsWithoutMissingCheck("foo*", true, actionListener)); + assertThat(expandedIds.getJobs(), empty()); + assertThat(expandedIds.getGroups(), empty()); + + expandedIds = blockingCall(actionListener -> + jobConfigProvider.expandJobsIdsWithoutMissingCheck("harry-group,dave", false, actionListener)); + assertEquals(new TreeSet<>(Arrays.asList("harry", "harry-jnr")), expandedIds.getJobs()); + assertEquals(new TreeSet<>(Arrays.asList("harry-group")), expandedIds.getGroups()); + } + + public void testExpandJobsWithoutMissingCheck() throws Exception { + putJob(createJob("tom", null)); + putJob(createJob("dick", null)); + putJob(createJob("harry", Collections.singletonList("harry-group"))); + putJob(createJob("harry-jnr", Collections.singletonList("harry-group"))); + + 
client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.configIndexName()).get(); + + List expandedJobsBuilders = + blockingCall(actionListener -> jobConfigProvider.expandJobsWithoutMissingcheck("dick,john", true, actionListener)); + List expandedJobIds = expandedJobsBuilders.stream().map(Job.Builder::build).map(Job::getId).collect(Collectors.toList()); + assertThat(expandedJobIds, contains("dick")); + + expandedJobsBuilders = blockingCall(actionListener -> + jobConfigProvider.expandJobsWithoutMissingcheck("foo*", true, actionListener)); + expandedJobIds = expandedJobsBuilders.stream().map(Job.Builder::build).map(Job::getId).collect(Collectors.toList()); + assertThat(expandedJobIds, empty()); + + expandedJobsBuilders = blockingCall(actionListener -> + jobConfigProvider.expandJobsWithoutMissingcheck("harry-group,dave", true, actionListener)); + expandedJobIds = expandedJobsBuilders.stream().map(Job.Builder::build).map(Job::getId).collect(Collectors.toList()); + assertThat(expandedJobIds, contains("harry", "harry-jnr")); + } + public void testExpandGroups() throws Exception { putJob(createJob("apples", Collections.singletonList("fruit"))); putJob(createJob("pears", Collections.singletonList("fruit"))); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index e7ec8c789855f..41907d52dd048 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -43,6 +43,7 @@ import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzerTests; +import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; 
import org.elasticsearch.xpack.ml.job.persistence.MockClientBuilder; import org.elasticsearch.xpack.ml.job.process.autodetect.UpdateParams; @@ -57,10 +58,13 @@ import java.util.Arrays; import java.util.Collections; import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -74,6 +78,8 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -168,7 +174,7 @@ public void testGetJobFromClusterWhenNotInIndex() { assertEquals(clusterJob, jobHolder.get()); } - public void testExpandJobsFromClusterStateAndIndex() throws IOException { + public void testExpandJobsFromClusterStateAndIndex_GivenAll() throws IOException { Job csJobFoo1 = buildJobBuilder("foo-cs-1").build(); Job csJobFoo2 = buildJobBuilder("foo-cs-2").build(); Job csJobBar = buildJobBuilder("bar-cs").build(); @@ -190,7 +196,7 @@ public void testExpandJobsFromClusterStateAndIndex() throws IOException { Job.Builder indexJobFoo = buildJobBuilder("foo-index"); docsAsBytes.add(toBytesReference(indexJobFoo.build())); - MockClientBuilder mockClientBuilder = new MockClientBuilder("cluster-test"); + MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); mockClientBuilder.prepareSearch(AnomalyDetectorsIndex.configIndexName(), docsAsBytes); JobManager jobManager = createJobManager(mockClientBuilder.build()); @@ -218,9 +224,269 @@ public void testExpandJobsFromClusterStateAndIndex() throws IOException { assertThat(jobIds, 
contains("foo-cs-1", "foo-cs-2", "foo-index")); } + public void testExpandJob_GivenDuplicateConfig() throws IOException { + Job csJob = buildJobBuilder("dupe").build(); + + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(csJob, false); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + when(clusterService.state()).thenReturn(clusterState); + + List docsAsBytes = new ArrayList<>(); + Job.Builder indexJob = buildJobBuilder("dupe"); + docsAsBytes.add(toBytesReference(indexJob.build())); + + MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); + mockClientBuilder.prepareSearch(AnomalyDetectorsIndex.configIndexName(), docsAsBytes); + + JobManager jobManager = createJobManager(mockClientBuilder.build()); + AtomicReference> jobsHolder = new AtomicReference<>(); + AtomicReference exceptionHolder = new AtomicReference<>(); + jobManager.expandJobs("_all", true, ActionListener.wrap( + jobsHolder::set, + exceptionHolder::set + )); + + assertNull(jobsHolder.get()); + assertThat(exceptionHolder.get(), instanceOf(IllegalStateException.class)); + assertEquals("Job [dupe] configuration exists in both clusterstate and index", exceptionHolder.get().getMessage()); + } + + public void testExpandJobs_SplitBetweenClusterStateAndIndex() throws IOException { + Job csJob = buildJobBuilder("cs-job").build(); + + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(csJob, false); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + when(clusterService.state()).thenReturn(clusterState); + + + List docsAsBytes = new ArrayList<>(); + + Job.Builder indexJob = buildJobBuilder("index-job"); + docsAsBytes.add(toBytesReference(indexJob.build())); + + MockClientBuilder 
mockClientBuilder = new MockClientBuilder("jobmanager-test"); + mockClientBuilder.prepareSearch(AnomalyDetectorsIndex.configIndexName(), docsAsBytes); + JobManager jobManager = createJobManager(mockClientBuilder.build()); + + AtomicReference> jobsHolder = new AtomicReference<>(); + jobManager.expandJobs("cs-job,index-job", true, ActionListener.wrap( + jobs -> jobsHolder.set(jobs), + e -> fail(e.getMessage()) + )); + + assertNotNull(jobsHolder.get()); + assertThat(jobsHolder.get().results(), hasSize(2)); + List jobIds = jobsHolder.get().results().stream().map(Job::getId).collect(Collectors.toList()); + assertThat(jobIds, contains("cs-job", "index-job")); + } + + public void testExpandJobs_GivenJobInClusterStateNotIndex() { + Job.Builder csJobFoo1 = buildJobBuilder("foo-cs-1"); + csJobFoo1.setGroups(Collections.singletonList("foo-group")); + Job.Builder csJobFoo2 = buildJobBuilder("foo-cs-2"); + csJobFoo2.setGroups(Collections.singletonList("foo-group")); + + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(csJobFoo1.build(), false); + mlMetadata.putJob(csJobFoo2.build(), false); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + when(clusterService.state()).thenReturn(clusterState); + + List docsAsBytes = new ArrayList<>(); + MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); + mockClientBuilder.prepareSearch(AnomalyDetectorsIndex.configIndexName(), docsAsBytes); + JobManager jobManager = createJobManager(mockClientBuilder.build()); + + + AtomicReference> jobsHolder = new AtomicReference<>(); + jobManager.expandJobs("foo*", true, ActionListener.wrap( + jobs -> jobsHolder.set(jobs), + e -> fail(e.getMessage()) + )); + + assertNotNull(jobsHolder.get()); + assertThat(jobsHolder.get().results(), hasSize(2)); + List jobIds = 
jobsHolder.get().results().stream().map(Job::getId).collect(Collectors.toList()); + assertThat(jobIds, contains("foo-cs-1", "foo-cs-2")); + + jobManager.expandJobs("foo-group", true, ActionListener.wrap( + jobs -> jobsHolder.set(jobs), + e -> fail(e.getMessage()) + )); + + assertNotNull(jobsHolder.get()); + assertThat(jobsHolder.get().results(), hasSize(2)); + jobIds = jobsHolder.get().results().stream().map(Job::getId).collect(Collectors.toList()); + assertThat(jobIds, contains("foo-cs-1", "foo-cs-2")); + } + + public void testExpandJobIds_GivenDuplicateConfig() { + Job csJob = buildJobBuilder("dupe").build(); + + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(csJob, false); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + when(clusterService.state()).thenReturn(clusterState); + + Map fieldMap = new HashMap<>(); + fieldMap.put(Job.ID.getPreferredName(), + new DocumentField(Job.ID.getPreferredName(), Collections.singletonList("dupe"))); + fieldMap.put(Job.GROUPS.getPreferredName(), + new DocumentField(Job.ID.getPreferredName(), Collections.emptyList())); + + List> fieldHits = new ArrayList<>(); + fieldHits.add(fieldMap); + + MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); + mockClientBuilder.prepareSearchFields(AnomalyDetectorsIndex.configIndexName(), fieldHits); + + JobManager jobManager = createJobManager(mockClientBuilder.build()); + AtomicReference> jobIdsHolder = new AtomicReference<>(); + AtomicReference exceptionHolder = new AtomicReference<>(); + jobManager.expandJobIds("_all", true, ActionListener.wrap( + jobIdsHolder::set, + exceptionHolder::set + )); + + assertNull(jobIdsHolder.get()); + assertThat(exceptionHolder.get(), instanceOf(IllegalStateException.class)); + assertEquals("Job [dupe] configuration exists in both clusterstate and index", 
exceptionHolder.get().getMessage()); + } + + public void testExpandJobIdsFromClusterStateAndIndex_GivenAll() { + Job.Builder csJobFoo1 = buildJobBuilder("foo-cs-1"); + csJobFoo1.setGroups(Collections.singletonList("foo-group")); + Job.Builder csJobFoo2 = buildJobBuilder("foo-cs-2"); + csJobFoo2.setGroups(Collections.singletonList("foo-group")); + Job csJobBar = buildJobBuilder("bar-cs").build(); + + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(csJobFoo1.build(), false); + mlMetadata.putJob(csJobFoo2.build(), false); + mlMetadata.putJob(csJobBar, false); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + when(clusterService.state()).thenReturn(clusterState); + + Map fieldMap = new HashMap<>(); + fieldMap.put(Job.ID.getPreferredName(), + new DocumentField(Job.ID.getPreferredName(), Collections.singletonList("index-job"))); + fieldMap.put(Job.GROUPS.getPreferredName(), + new DocumentField(Job.ID.getPreferredName(), Collections.singletonList("index-group"))); + + List> fieldHits = new ArrayList<>(); + fieldHits.add(fieldMap); + + MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); + mockClientBuilder.prepareSearchFields(AnomalyDetectorsIndex.configIndexName(), fieldHits); + + JobManager jobManager = createJobManager(mockClientBuilder.build()); + AtomicReference> jobIdsHolder = new AtomicReference<>(); + jobManager.expandJobIds("_all", true, ActionListener.wrap( + jobs -> jobIdsHolder.set(jobs), + e -> fail(e.getMessage()) + )); + + assertNotNull(jobIdsHolder.get()); + assertThat(jobIdsHolder.get(), contains("bar-cs", "foo-cs-1", "foo-cs-2", "index-job")); + + jobManager.expandJobIds("index-group", true, ActionListener.wrap( + jobs -> jobIdsHolder.set(jobs), + e -> fail(e.getMessage()) + )); + + assertNotNull(jobIdsHolder.get()); + assertThat(jobIdsHolder.get(), contains("index-job")); + 
} + + public void testExpandJobIds_GivenJobInClusterStateNotIndex() { + Job csJobFoo1 = buildJobBuilder("foo-cs-1").build(); + + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(csJobFoo1, false); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + when(clusterService.state()).thenReturn(clusterState); + + MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); + mockClientBuilder.prepareSearchFields(AnomalyDetectorsIndex.configIndexName(), Collections.emptyList()); + + JobManager jobManager = createJobManager(mockClientBuilder.build()); + AtomicReference> jobIdsHolder = new AtomicReference<>(); + jobManager.expandJobIds("foo*", true, ActionListener.wrap( + jobs -> jobIdsHolder.set(jobs), + e -> fail(e.getMessage()) + )); + + assertNotNull(jobIdsHolder.get()); + assertThat(jobIdsHolder.get(), hasSize(1)); + assertThat(jobIdsHolder.get(), contains("foo-cs-1")); + } + + public void testExpandJobIds_GivenConfigInIndexAndClusterState() { + Job csJobFoo1 = buildJobBuilder("cs-job").build(); + + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(csJobFoo1, false); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + when(clusterService.state()).thenReturn(clusterState); + + Map fieldMap = new HashMap<>(); + fieldMap.put(Job.ID.getPreferredName(), + new DocumentField(Job.ID.getPreferredName(), Collections.singletonList("index-job"))); + fieldMap.put(Job.GROUPS.getPreferredName(), + new DocumentField(Job.ID.getPreferredName(), Collections.emptyList())); + + List> fieldHits = new ArrayList<>(); + fieldHits.add(fieldMap); + + MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); + 
mockClientBuilder.prepareSearchFields(AnomalyDetectorsIndex.configIndexName(), fieldHits); + + + JobManager jobManager = createJobManager(mockClientBuilder.build()); + AtomicReference> jobIdsHolder = new AtomicReference<>(); + jobManager.expandJobIds("index-job,cs-job", true, ActionListener.wrap( + jobs -> jobIdsHolder.set(jobs), + e -> fail(e.getMessage()) + )); + + assertNotNull(jobIdsHolder.get()); + assertThat(jobIdsHolder.get(), hasSize(2)); + assertThat(jobIdsHolder.get(), contains("cs-job" ,"index-job")); + } + @SuppressWarnings("unchecked") public void testPutJob_AddsCreateTime() throws IOException { - MockClientBuilder mockClientBuilder = new MockClientBuilder("cluster-test"); + MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); JobManager jobManager = createJobManager(mockClientBuilder.build()); PutJobAction.Request putJobRequest = new PutJobAction.Request(createJob()); @@ -258,8 +524,75 @@ public void onFailure(Exception e) { }); } + public void testJobExists_GivenMissingJob() { + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + when(clusterService.state()).thenReturn(clusterState); + + JobConfigProvider jobConfigProvider = mock(JobConfigProvider.class); + + ClusterSettings clusterSettings = new ClusterSettings(environment.settings(), + Collections.singleton(MachineLearningField.MAX_MODEL_MEMORY_LIMIT)); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + + doAnswer(invocationOnMock -> { + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse(false); + return null; + }).when(jobConfigProvider).jobExists(anyString(), anyBoolean(), any()); + + JobManager jobManager = new JobManager(environment, environment.settings(), jobResultsProvider, clusterService, + auditor, threadPool, mock(Client.class), updateJobProcessNotifier, jobConfigProvider); + + AtomicBoolean jobExistsHolder = new AtomicBoolean(); + AtomicReference 
exceptionHolder = new AtomicReference<>(); + jobManager.jobExists("non-job", ActionListener.wrap( + jobExistsHolder::set, + exceptionHolder::set + )); + + assertFalse(jobExistsHolder.get()); + assertThat(exceptionHolder.get(), instanceOf(ResourceNotFoundException.class)); + } + + public void testJobExists_GivenJobIsInClusterState() { + Job csJobFoo1 = buildJobBuilder("cs-job").build(); + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(csJobFoo1, false); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + when(clusterService.state()).thenReturn(clusterState); + + JobConfigProvider jobConfigProvider = mock(JobConfigProvider.class); + + ClusterSettings clusterSettings = new ClusterSettings(environment.settings(), + Collections.singleton(MachineLearningField.MAX_MODEL_MEMORY_LIMIT)); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + + doAnswer(invocationOnMock -> { + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse(false); + return null; + }).when(jobConfigProvider).jobExists(anyString(), anyBoolean(), any()); + + JobManager jobManager = new JobManager(environment, environment.settings(), jobResultsProvider, clusterService, + auditor, threadPool, mock(Client.class), updateJobProcessNotifier, jobConfigProvider); + + AtomicBoolean jobExistsHolder = new AtomicBoolean(); + AtomicReference exceptionHolder = new AtomicReference<>(); + jobManager.jobExists("cs-job", ActionListener.wrap( + jobExistsHolder::set, + exceptionHolder::set + )); + + assertTrue(jobExistsHolder.get()); + assertNull(exceptionHolder.get()); + } + public void testPutJob_ThrowsIfJobExistsInClusterState() throws IOException { - MockClientBuilder mockClientBuilder = new MockClientBuilder("cluster-test"); + MockClientBuilder mockClientBuilder = new 
MockClientBuilder("jobmanager-test"); JobManager jobManager = createJobManager(mockClientBuilder.build()); PutJobAction.Request putJobRequest = new PutJobAction.Request(createJob()); @@ -284,7 +617,7 @@ public void onFailure(Exception e) { public void testNotifyFilterChangedGivenNoop() { MlFilter filter = MlFilter.builder("my_filter").build(); - MockClientBuilder mockClientBuilder = new MockClientBuilder("cluster-test"); + MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); JobManager jobManager = createJobManager(mockClientBuilder.build()); jobManager.notifyFilterChanged(filter, Collections.emptySet(), Collections.emptySet(), ActionListener.wrap( @@ -336,7 +669,7 @@ public void testNotifyFilterChanged() throws IOException { return null; }).when(updateJobProcessNotifier).submitJobUpdate(any(), any()); - MockClientBuilder mockClientBuilder = new MockClientBuilder("cluster-test"); + MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); mockClientBuilder.prepareSearch(AnomalyDetectorsIndex.configIndexName(), docsAsBytes); JobManager jobManager = createJobManager(mockClientBuilder.build()); @@ -387,7 +720,7 @@ public void testNotifyFilterChangedGivenOnlyAddedItems() throws IOException { .build(); when(clusterService.state()).thenReturn(clusterState); - MockClientBuilder mockClientBuilder = new MockClientBuilder("cluster-test"); + MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); mockClientBuilder.prepareSearch(AnomalyDetectorsIndex.configIndexName(), docsAsBytes); JobManager jobManager = createJobManager(mockClientBuilder.build()); @@ -423,7 +756,7 @@ public void testNotifyFilterChangedGivenOnlyRemovedItems() throws IOException { when(clusterService.state()).thenReturn(clusterState); when(clusterService.state()).thenReturn(clusterState); - MockClientBuilder mockClientBuilder = new MockClientBuilder("cluster-test"); + MockClientBuilder mockClientBuilder = new 
MockClientBuilder("jobmanager-test"); mockClientBuilder.prepareSearch(AnomalyDetectorsIndex.configIndexName(), docsAsBytes); JobManager jobManager = createJobManager(mockClientBuilder.build()); @@ -439,7 +772,7 @@ public void testNotifyFilterChangedGivenOnlyRemovedItems() throws IOException { Mockito.verifyNoMoreInteractions(auditor, updateJobProcessNotifier); } - public void testUpdateProcessOnCalendarChanged() throws IOException { + public void testUpdateProcessOnCalendarChanged() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job-1", "node_id", JobState.OPENED, tasksBuilder); addJobTask("job-2", "node_id", JobState.OPENED, tasksBuilder); @@ -451,7 +784,7 @@ public void testUpdateProcessOnCalendarChanged() throws IOException { .build(); when(clusterService.state()).thenReturn(clusterState); - MockClientBuilder mockClientBuilder = new MockClientBuilder("cluster-test"); + MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); // For the JobConfigProvider expand groups search. // The search will not return any results mockClientBuilder.prepareSearchFields(AnomalyDetectorsIndex.configIndexName(), Collections.emptyList()); @@ -487,7 +820,7 @@ public void testUpdateProcessOnCalendarChanged_GivenGroups() throws IOException .build(); when(clusterService.state()).thenReturn(clusterState); - MockClientBuilder mockClientBuilder = new MockClientBuilder("cluster-test"); + MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test"); // For the JobConfigProvider expand groups search. 
// group-1 will expand to job-1 and job-2 List> fieldHits = new ArrayList<>(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/NameResolverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/NameResolverTests.java index 9f4bcc13cbd04..93507d3583e2c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/NameResolverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/NameResolverTests.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.ml.utils; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.utils.NameResolver; @@ -18,7 +17,6 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; -import java.util.function.Function; import java.util.stream.Collectors; import static org.hamcrest.Matchers.equalTo; @@ -27,45 +25,36 @@ public class NameResolverTests extends ESTestCase { public void testNoMatchingNames() { - ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class, - () -> newUnaliasedResolver().expand("foo", false)); - assertThat(e.getMessage(), equalTo("foo")); + assertThat(newUnaliasedResolver().expand("foo").isEmpty(), is(true)); } - public void testNoMatchingNames_GivenPatternAndAllowNoMatch() { - assertThat(newUnaliasedResolver().expand("foo*", true).isEmpty(), is(true)); + public void testNoMatchingNames_GivenPattern() { + assertThat(newUnaliasedResolver().expand("foo*").isEmpty(), is(true)); } - public void testNoMatchingNames_GivenPatternAndNotAllowNoMatch() { - ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class, - () -> newUnaliasedResolver().expand("foo*", false)); - assertThat(e.getMessage(), equalTo("foo*")); - } - - public void testNoMatchingNames_GivenMatchingNameAndNonMatchingPatternAndNotAllowNoMatch() { - ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class, - () -> 
newUnaliasedResolver("foo").expand("foo, bar*", false)); - assertThat(e.getMessage(), equalTo("bar*")); + public void testNoMatchingNames_GivenMatchingNameAndNonMatchingPattern() { + NameResolver nameResolver = newUnaliasedResolver("foo"); + assertThat(nameResolver.expand("foo,bar*"), equalTo(newSortedSet("foo"))); } public void testUnaliased() { NameResolver nameResolver = newUnaliasedResolver("foo-1", "foo-2", "bar-1", "bar-2"); - assertThat(nameResolver.expand("foo-1", false), equalTo(newSortedSet("foo-1"))); - assertThat(nameResolver.expand("foo-2", false), equalTo(newSortedSet("foo-2"))); - assertThat(nameResolver.expand("bar-1", false), equalTo(newSortedSet("bar-1"))); - assertThat(nameResolver.expand("bar-2", false), equalTo(newSortedSet("bar-2"))); - assertThat(nameResolver.expand("foo-1,foo-2", false), equalTo(newSortedSet("foo-1", "foo-2"))); - assertThat(nameResolver.expand("foo-*", false), equalTo(newSortedSet("foo-1", "foo-2"))); - assertThat(nameResolver.expand("bar-*", false), equalTo(newSortedSet("bar-1", "bar-2"))); - assertThat(nameResolver.expand("*oo-*", false), equalTo(newSortedSet("foo-1", "foo-2"))); - assertThat(nameResolver.expand("*-1", false), equalTo(newSortedSet("foo-1", "bar-1"))); - assertThat(nameResolver.expand("*-2", false), equalTo(newSortedSet("foo-2", "bar-2"))); - assertThat(nameResolver.expand("*", false), equalTo(newSortedSet("foo-1", "foo-2", "bar-1", "bar-2"))); - assertThat(nameResolver.expand("_all", false), equalTo(newSortedSet("foo-1", "foo-2", "bar-1", "bar-2"))); - assertThat(nameResolver.expand("foo-1,foo-2", false), equalTo(newSortedSet("foo-1", "foo-2"))); - assertThat(nameResolver.expand("foo-1,bar-1", false), equalTo(newSortedSet("bar-1", "foo-1"))); - assertThat(nameResolver.expand("foo-*,bar-1", false), equalTo(newSortedSet("bar-1", "foo-1", "foo-2"))); + assertThat(nameResolver.expand("foo-1"), equalTo(newSortedSet("foo-1"))); + assertThat(nameResolver.expand("foo-2"), equalTo(newSortedSet("foo-2"))); + 
assertThat(nameResolver.expand("bar-1"), equalTo(newSortedSet("bar-1"))); + assertThat(nameResolver.expand("bar-2"), equalTo(newSortedSet("bar-2"))); + assertThat(nameResolver.expand("foo-1,foo-2"), equalTo(newSortedSet("foo-1", "foo-2"))); + assertThat(nameResolver.expand("foo-*"), equalTo(newSortedSet("foo-1", "foo-2"))); + assertThat(nameResolver.expand("bar-*"), equalTo(newSortedSet("bar-1", "bar-2"))); + assertThat(nameResolver.expand("*oo-*"), equalTo(newSortedSet("foo-1", "foo-2"))); + assertThat(nameResolver.expand("*-1"), equalTo(newSortedSet("foo-1", "bar-1"))); + assertThat(nameResolver.expand("*-2"), equalTo(newSortedSet("foo-2", "bar-2"))); + assertThat(nameResolver.expand("*"), equalTo(newSortedSet("foo-1", "foo-2", "bar-1", "bar-2"))); + assertThat(nameResolver.expand("_all"), equalTo(newSortedSet("foo-1", "foo-2", "bar-1", "bar-2"))); + assertThat(nameResolver.expand("foo-1,foo-2"), equalTo(newSortedSet("foo-1", "foo-2"))); + assertThat(nameResolver.expand("foo-1,bar-1"), equalTo(newSortedSet("bar-1", "foo-1"))); + assertThat(nameResolver.expand("foo-*,bar-1"), equalTo(newSortedSet("bar-1", "foo-1", "foo-2"))); } public void testAliased() { @@ -79,33 +68,33 @@ public void testAliased() { NameResolver nameResolver = new TestAliasNameResolver(namesAndAliasesMap); // First try same set of assertions as unaliases - assertThat(nameResolver.expand("foo-1", false), equalTo(newSortedSet("foo-1"))); - assertThat(nameResolver.expand("foo-2", false), equalTo(newSortedSet("foo-2"))); - assertThat(nameResolver.expand("bar-1", false), equalTo(newSortedSet("bar-1"))); - assertThat(nameResolver.expand("bar-2", false), equalTo(newSortedSet("bar-2"))); - assertThat(nameResolver.expand("foo-1,foo-2", false), equalTo(newSortedSet("foo-1", "foo-2"))); - assertThat(nameResolver.expand("foo-*", false), equalTo(newSortedSet("foo-1", "foo-2"))); - assertThat(nameResolver.expand("bar-*", false), equalTo(newSortedSet("bar-1", "bar-2"))); - 
assertThat(nameResolver.expand("*oo-*", false), equalTo(newSortedSet("foo-1", "foo-2"))); - assertThat(nameResolver.expand("*-1", false), equalTo(newSortedSet("foo-1", "bar-1"))); - assertThat(nameResolver.expand("*-2", false), equalTo(newSortedSet("foo-2", "bar-2"))); - assertThat(nameResolver.expand("*", false), equalTo(newSortedSet("foo-1", "foo-2", "bar-1", "bar-2"))); - assertThat(nameResolver.expand("_all", false), equalTo(newSortedSet("foo-1", "foo-2", "bar-1", "bar-2"))); - assertThat(nameResolver.expand("foo-1,foo-2", false), equalTo(newSortedSet("foo-1", "foo-2"))); - assertThat(nameResolver.expand("foo-1,bar-1", false), equalTo(newSortedSet("bar-1", "foo-1"))); - assertThat(nameResolver.expand("foo-*,bar-1", false), equalTo(newSortedSet("bar-1", "foo-1", "foo-2"))); + assertThat(nameResolver.expand("foo-1"), equalTo(newSortedSet("foo-1"))); + assertThat(nameResolver.expand("foo-2"), equalTo(newSortedSet("foo-2"))); + assertThat(nameResolver.expand("bar-1"), equalTo(newSortedSet("bar-1"))); + assertThat(nameResolver.expand("bar-2"), equalTo(newSortedSet("bar-2"))); + assertThat(nameResolver.expand("foo-1,foo-2"), equalTo(newSortedSet("foo-1", "foo-2"))); + assertThat(nameResolver.expand("foo-*"), equalTo(newSortedSet("foo-1", "foo-2"))); + assertThat(nameResolver.expand("bar-*"), equalTo(newSortedSet("bar-1", "bar-2"))); + assertThat(nameResolver.expand("*oo-*"), equalTo(newSortedSet("foo-1", "foo-2"))); + assertThat(nameResolver.expand("*-1"), equalTo(newSortedSet("foo-1", "bar-1"))); + assertThat(nameResolver.expand("*-2"), equalTo(newSortedSet("foo-2", "bar-2"))); + assertThat(nameResolver.expand("*"), equalTo(newSortedSet("foo-1", "foo-2", "bar-1", "bar-2"))); + assertThat(nameResolver.expand("_all"), equalTo(newSortedSet("foo-1", "foo-2", "bar-1", "bar-2"))); + assertThat(nameResolver.expand("foo-1,foo-2"), equalTo(newSortedSet("foo-1", "foo-2"))); + assertThat(nameResolver.expand("foo-1,bar-1"), equalTo(newSortedSet("bar-1", "foo-1"))); + 
assertThat(nameResolver.expand("foo-*,bar-1"), equalTo(newSortedSet("bar-1", "foo-1", "foo-2"))); // No let's test the aliases - assertThat(nameResolver.expand("foo-group", false), equalTo(newSortedSet("foo-1", "foo-2"))); - assertThat(nameResolver.expand("bar-group", false), equalTo(newSortedSet("bar-1", "bar-2"))); - assertThat(nameResolver.expand("foo-group,bar-group", false), equalTo(newSortedSet("bar-1", "bar-2", "foo-1", "foo-2"))); - assertThat(nameResolver.expand("foo-group,foo-1", false), equalTo(newSortedSet("foo-1", "foo-2"))); - assertThat(nameResolver.expand("foo-group,bar-1", false), equalTo(newSortedSet("bar-1", "foo-1", "foo-2"))); - assertThat(nameResolver.expand("foo-group,bar-*", false), equalTo(newSortedSet("bar-1", "bar-2", "foo-1", "foo-2"))); + assertThat(nameResolver.expand("foo-group"), equalTo(newSortedSet("foo-1", "foo-2"))); + assertThat(nameResolver.expand("bar-group"), equalTo(newSortedSet("bar-1", "bar-2"))); + assertThat(nameResolver.expand("foo-group,bar-group"), equalTo(newSortedSet("bar-1", "bar-2", "foo-1", "foo-2"))); + assertThat(nameResolver.expand("foo-group,foo-1"), equalTo(newSortedSet("foo-1", "foo-2"))); + assertThat(nameResolver.expand("foo-group,bar-1"), equalTo(newSortedSet("bar-1", "foo-1", "foo-2"))); + assertThat(nameResolver.expand("foo-group,bar-*"), equalTo(newSortedSet("bar-1", "bar-2", "foo-1", "foo-2"))); } private static NameResolver newUnaliasedResolver(String... names) { - return NameResolver.newUnaliased(new HashSet<>(Arrays.asList(names)), notFoundExceptionSupplier()); + return NameResolver.newUnaliased(new HashSet<>(Arrays.asList(names))); } private static SortedSet newSortedSet(String... names) { @@ -115,17 +104,12 @@ private static SortedSet newSortedSet(String... 
names) { } return result; } - - private static Function notFoundExceptionSupplier() { - return s -> new ResourceNotFoundException(s); - } - + private static class TestAliasNameResolver extends NameResolver { private final Map> lookup; TestAliasNameResolver(Map> lookup) { - super(notFoundExceptionSupplier()); this.lookup = lookup; }