From 2531fcf1f6e52bb9d7706e8f369627e413e53133 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 14 Nov 2018 17:49:50 +0000 Subject: [PATCH 1/5] Delete expired data for cluster state jobs --- .../TransportDeleteExpiredDataAction.java | 4 +- .../AbstractExpiredJobDataRemover.java | 22 +++++++-- .../ExpiredModelSnapshotsRemover.java | 11 ++--- .../job/retention/ExpiredResultsRemover.java | 10 ++-- .../AbstractExpiredJobDataRemoverTests.java | 49 +++++++++++++++++-- .../ExpiredModelSnapshotsRemoverTests.java | 21 ++++++-- .../retention/ExpiredResultsRemoverTests.java | 18 +++++-- 7 files changed, 107 insertions(+), 28 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java index b9668c4de3433..dcf5960a2f0bc 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java @@ -56,9 +56,9 @@ protected void doExecute(DeleteExpiredDataAction.Request request, ActionListener private void deleteExpiredData(ActionListener listener) { Auditor auditor = new Auditor(client, clusterService.getNodeName()); List dataRemovers = Arrays.asList( - new ExpiredResultsRemover(client, auditor), + new ExpiredResultsRemover(client, clusterService, auditor), new ExpiredForecastsRemover(client, threadPool), - new ExpiredModelSnapshotsRemover(client, threadPool), + new ExpiredModelSnapshotsRemover(client, clusterService, threadPool), new UnusedStateRemover(client, clusterService) ); Iterator dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java index b595c564ab9aa..eda8f9a6a95a5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java @@ -7,9 +7,12 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.results.Result; @@ -18,6 +21,7 @@ import org.joda.time.DateTime; import org.joda.time.chrono.ISOChronology; +import java.util.ArrayList; import java.util.Deque; import java.util.Iterator; import java.util.List; @@ -34,9 +38,15 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover { private final Client client; + private final ClusterService clusterService; - AbstractExpiredJobDataRemover(Client client) { + AbstractExpiredJobDataRemover(Client client, ClusterService clusterService) { this.client = client; + this.clusterService = clusterService; + } + + protected Client getClient() { + return client; } @Override @@ -66,8 +76,13 @@ private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener jobs = new ArrayList<>(MlMetadata.getMlMetadata(clusterState).getJobs().values()); + VolatileCursorIterator clusterStateJobs = new VolatileCursorIterator<>(jobs); + BatchedJobsIterator jobsIterator = new BatchedJobsIterator(client, AnomalyDetectorsIndex.configIndexName()); - return new WrappedBatchedJobsIterator(jobsIterator); + return new WrappedBatchedJobsIterator(jobsIterator, clusterStateJobs); } private long calcCutoffEpochMs(long retentionDays) { @@ -99,8 +114,9 @@ private class WrappedBatchedJobsIterator implements Iterator { private final BatchedJobsIterator batchedIterator; private VolatileCursorIterator currentBatch; - WrappedBatchedJobsIterator(BatchedJobsIterator batchedIterator) { + WrappedBatchedJobsIterator(BatchedJobsIterator batchedIterator, VolatileCursorIterator currentBatch) { this.batchedIterator = batchedIterator; + this.currentBatch = currentBatch; } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index 74ff0c8dc8776..0114bd322f95c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; @@ -54,12 +55,10 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover */ private static final int MODEL_SNAPSHOT_SEARCH_SIZE = 10000; - private final Client client; private final ThreadPool threadPool; - public ExpiredModelSnapshotsRemover(Client client, ThreadPool threadPool) { - super(client); - this.client = Objects.requireNonNull(client); + public ExpiredModelSnapshotsRemover(Client client, ClusterService clusterService, ThreadPool threadPool) { + super(client, clusterService); this.threadPool = Objects.requireNonNull(threadPool); } @@ -90,7 +89,7 @@ protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener(LOGGER, threadPool, + getClient().execute(SearchAction.INSTANCE, searchRequest, new ThreadedActionListener<>(LOGGER, threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, expiredSnapshotsListener(job.getId(), listener), false)); } @@ -124,7 +123,7 @@ private void deleteModelSnapshots(Iterator modelSnapshotIterator, ModelSnapshot modelSnapshot = modelSnapshotIterator.next(); DeleteModelSnapshotAction.Request deleteSnapshotRequest = new DeleteModelSnapshotAction.Request( modelSnapshot.getJobId(), modelSnapshot.getSnapshotId()); - client.execute(DeleteModelSnapshotAction.INSTANCE, deleteSnapshotRequest, new ActionListener() { + getClient().execute(DeleteModelSnapshotAction.INSTANCE, deleteSnapshotRequest, new ActionListener() { @Override public void onResponse(AcknowledgedResponse response) { try { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java index d7b16ef8c468e..64cf7550ee362 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java @@ -10,6 +10,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.BulkByScrollResponse; @@ -44,12 +45,11 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover { private static final Logger LOGGER = LogManager.getLogger(ExpiredResultsRemover.class); - private final Client client; + private final Auditor auditor; - public ExpiredResultsRemover(Client client, Auditor auditor) { - super(client); - this.client = Objects.requireNonNull(client); + public ExpiredResultsRemover(Client client, ClusterService clusterService, Auditor auditor) { + super(client, clusterService); this.auditor = Objects.requireNonNull(auditor); } @@ -63,7 +63,7 @@ protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener() { + getClient().execute(DeleteByQueryAction.INSTANCE, request, new ActionListener() { @Override public void onResponse(BulkByScrollResponse bulkByScrollResponse) { try { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java index c2318d1cb4664..7eb05916b07c3 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java @@ -9,6 +9,10 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -16,6 +20,7 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobTests; import org.junit.Before; @@ -43,8 +48,8 @@ private class ConcreteExpiredJobDataRemover extends AbstractExpiredJobDataRemove private int getRetentionDaysCallCount = 0; - ConcreteExpiredJobDataRemover(Client client) { - super(client); + ConcreteExpiredJobDataRemover(Client client, ClusterService clusterService) { + super(client, clusterService); } @Override @@ -61,10 +66,12 @@ protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener toXContents) throws IOException { @@ -92,8 +99,11 @@ public void testRemoveGivenNoJobs() throws IOException { when(future.actionGet()).thenReturn(response); when(client.search(any())).thenReturn(future); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + when(clusterService.state()).thenReturn(clusterState); + TestListener listener = new TestListener(); - ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(client); + ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(client, clusterService); remover.remove(listener); listener.waitToCompletion(); @@ -103,6 +113,10 @@ public void testRemoveGivenNoJobs() throws IOException { public void testRemoveGivenMulipleBatches() throws IOException { + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + when(clusterService.state()).thenReturn(clusterState); + // This is testing AbstractExpiredJobDataRemover.WrappedBatchedJobsIterator int totalHits = 7; List responses = new ArrayList<>(); @@ -130,7 +144,7 @@ public void testRemoveGivenMulipleBatches() throws IOException { when(client.search(any())).thenReturn(future); TestListener listener = new TestListener(); - ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(client); + ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(client, clusterService); remover.remove(listener); listener.waitToCompletion(); @@ -139,6 +153,33 @@ public void testRemoveGivenMulipleBatches() throws IOException { assertEquals(remover.getRetentionDaysCallCount, 7); } + public void testIterateOverClusterStateJobs() throws IOException { + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(JobTests.buildJobBuilder("csjob1").build(), false); + mlMetadata.putJob(JobTests.buildJobBuilder("csjob2").build(), false); + mlMetadata.putJob(JobTests.buildJobBuilder("csjob3").build(), false); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + when(clusterService.state()).thenReturn(clusterState); + + SearchResponse response = createSearchResponse(Collections.emptyList()); + + ActionFuture future = mock(ActionFuture.class); + when(future.actionGet()).thenReturn(response); + when(client.search(any())).thenReturn(future); + + TestListener listener = new TestListener(); + ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(client, clusterService); + remover.remove(listener); + + listener.waitToCompletion(); + assertThat(listener.success, is(true)); + assertEquals(remover.getRetentionDaysCallCount, 3); + } + static class TestListener implements ActionListener { boolean success; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java index 02d747fb80a50..e5a4b1d14da69 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java @@ -12,6 +12,9 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.mock.orig.Mockito; import org.elasticsearch.test.ESTestCase; @@ -31,6 +34,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import static org.elasticsearch.xpack.ml.job.retention.AbstractExpiredJobDataRemoverTests.TestListener; @@ -46,6 +50,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { private Client client; + private ClusterService clusterService; private ThreadPool threadPool; private List capturedSearchRequests; private List capturedDeleteModelSnapshotRequests; @@ -60,6 +65,10 @@ public void setUpTests() { client = mock(Client.class); listener = new TestListener(); + clusterService = mock(ClusterService.class); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + when(clusterService.state()).thenReturn(clusterState); + // Init thread pool Settings settings = Settings.builder() .put("node.name", "expired_model_snapshots_remover_test") @@ -90,7 +99,7 @@ public void testRemove_GivenJobsWithoutRetentionPolicy() throws IOException { public void testRemove_GivenJobWithoutActiveSnapshot() throws IOException { givenClientRequestsSucceed(); - givenJobs(Arrays.asList(JobTests.buildJobBuilder("foo").setModelSnapshotRetentionDays(7L).build())); + givenJobs(Collections.singletonList(JobTests.buildJobBuilder("foo").setModelSnapshotRetentionDays(7L).build())); createExpiredModelSnapshotsRemover().remove(listener); @@ -110,7 +119,7 @@ public void testRemove_GivenJobsWithMixedRetentionPolicies() throws IOException List snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"), createModelSnapshot("snapshots-1", "snapshots-1_2")); - List snapshots2JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-2", "snapshots-2_1")); + List snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1")); searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); @@ -147,7 +156,7 @@ public void testRemove_GivenClientSearchRequestsFail() throws IOException { List snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"), createModelSnapshot("snapshots-1", "snapshots-1_2")); - List snapshots2JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-2", "snapshots-2_1")); + List snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1")); searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); @@ -173,7 +182,7 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio List snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"), createModelSnapshot("snapshots-1", "snapshots-1_2")); - List snapshots2JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-2", "snapshots-2_1")); + List snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1")); searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); @@ -192,6 +201,7 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_1")); } + @SuppressWarnings("unchecked") private void givenJobs(List jobs) throws IOException { SearchResponse response = AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs); @@ -201,7 +211,7 @@ private void givenJobs(List jobs) throws IOException { } private ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover() { - return new ExpiredModelSnapshotsRemover(client, threadPool); + return new ExpiredModelSnapshotsRemover(client, clusterService, threadPool); } private static ModelSnapshot createModelSnapshot(String jobId, String snapshotId) { @@ -220,6 +230,7 @@ private void givenClientDeleteModelSnapshotRequestsFail() { givenClientRequests(true, false); } + @SuppressWarnings("unchecked") private void givenClientRequests(boolean shouldSearchRequestsSucceed, boolean shouldDeleteSnapshotRequestsSucceed) { doAnswer(new Answer() { int callCount = 0; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java index 7dc258a322ac3..5caac27368712 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java @@ -9,6 +9,9 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.reindex.BulkByScrollResponse; @@ -42,19 +45,26 @@ public class ExpiredResultsRemoverTests extends ESTestCase { private Client client; + private ClusterService clusterService; private List capturedDeleteByQueryRequests; private ActionListener listener; @Before + @SuppressWarnings("unchecked") public void setUpTests() { capturedDeleteByQueryRequests = new ArrayList<>(); client = mock(Client.class); + + clusterService = mock(ClusterService.class); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + when(clusterService.state()).thenReturn(clusterState); + ThreadPool threadPool = mock(ThreadPool.class); when(client.threadPool()).thenReturn(threadPool); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); doAnswer(new Answer() { @Override - public Void answer(InvocationOnMock invocationOnMock) throws Throwable { + public Void answer(InvocationOnMock invocationOnMock) { capturedDeleteByQueryRequests.add((DeleteByQueryRequest) invocationOnMock.getArguments()[1]); ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; @@ -132,10 +142,11 @@ private void givenClientRequestsFailed() { givenClientRequests(false); } + @SuppressWarnings("unchecked") private void givenClientRequests(boolean shouldSucceed) { doAnswer(new Answer() { @Override - public Void answer(InvocationOnMock invocationOnMock) throws Throwable { + public Void answer(InvocationOnMock invocationOnMock) { capturedDeleteByQueryRequests.add((DeleteByQueryRequest) invocationOnMock.getArguments()[1]); ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; @@ -151,6 +162,7 @@ public Void answer(InvocationOnMock invocationOnMock) throws Throwable { }).when(client).execute(same(DeleteByQueryAction.INSTANCE), any(), any()); } + @SuppressWarnings("unchecked") private void givenJobs(List jobs) throws IOException { SearchResponse response = AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs); @@ -160,6 +172,6 @@ private void givenJobs(List jobs) throws IOException { } private ExpiredResultsRemover createExpiredResultsRemover() { - return new ExpiredResultsRemover(client, mock(Auditor.class)); + return new ExpiredResultsRemover(client, clusterService, mock(Auditor.class)); } } \ No newline at end of file From f53adeebc140d8ab75dc00da2fa6779a518a4193 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 14 Nov 2018 17:50:09 +0000 Subject: [PATCH 2/5] Check cluster state jobs when deleting a filter --- .../xpack/core/ml/job/messages/Messages.java | 3 +- .../action/TransportDeleteFilterAction.java | 28 +++++++++++++++++-- .../rest-api-spec/test/ml/filter_crud.yml | 2 +- 3 files changed, 28 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index fff65ca2b4fae..271ac461c2844 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -43,8 +43,9 @@ public final class Messages { "Datafeed frequency [{0}] must be a multiple of the aggregation interval [{1}]"; public static final String DATAFEED_ID_ALREADY_TAKEN = "A datafeed with id [{0}] already exists"; - public static final String FILTER_NOT_FOUND = "No filter with id [{0}] exists"; + public static final String FILTER_CANNOT_DELETE = "Cannot delete filter [{0}] currently used by jobs [{1}]"; public static final String FILTER_CONTAINS_TOO_MANY_ITEMS = "Filter [{0}] contains too many items; up to [{1}] items are allowed"; + public static final String FILTER_NOT_FOUND = "No filter with id [{0}] exists"; public static final String INCONSISTENT_ID = "Inconsistent {0}; ''{1}'' specified in the body differs from ''{2}'' specified as a URL argument"; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteFilterAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteFilterAction.java index eddc79a7bb4c4..0c490f803345e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteFilterAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteFilterAction.java @@ -16,22 +16,28 @@ import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.MlMetaIndex; +import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.DeleteFilterAction; import org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.MlFilter; +import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.Map; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; @@ -39,28 +45,39 @@ public class TransportDeleteFilterAction extends HandledTransportAction { private final Client client; + private final ClusterService clusterService; private final JobConfigProvider jobConfigProvider; @Inject public TransportDeleteFilterAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - Client client, JobConfigProvider jobConfigProvider) { + Client client, ClusterService clusterService, + JobConfigProvider jobConfigProvider) { super(settings, DeleteFilterAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, DeleteFilterAction.Request::new); this.client = client; + this.clusterService = clusterService; this.jobConfigProvider = jobConfigProvider; } @Override protected void doExecute(DeleteFilterAction.Request request, ActionListener listener) { final String filterId = request.getFilterId(); + + List clusterStateJobsUsingFilter = clusterStateJobsUsingFilter(filterId, clusterService.state()); + if (clusterStateJobsUsingFilter.isEmpty() == false) { + listener.onFailure(ExceptionsHelper.conflictStatusException( + Messages.getMessage(Messages.FILTER_CANNOT_DELETE, filterId, clusterStateJobsUsingFilter))); + return; + } + jobConfigProvider.findJobsWithCustomRules(ActionListener.wrap( jobs-> { List currentlyUsedBy = findJobsUsingFilter(jobs, filterId); if (!currentlyUsedBy.isEmpty()) { listener.onFailure(ExceptionsHelper.conflictStatusException( - "Cannot delete filter, currently used by jobs: " + currentlyUsedBy)); + Messages.getMessage(Messages.FILTER_CANNOT_DELETE, filterId, currentlyUsedBy))); } else { deleteFilter(filterId, listener); } @@ -70,7 +87,7 @@ protected void doExecute(DeleteFilterAction.Request request, ActionListener findJobsUsingFilter(List jobs, String filterId) { + private static List findJobsUsingFilter(Collection jobs, String filterId) { List currentlyUsedBy = new ArrayList<>(); for (Job job : jobs) { List detectors = job.getAnalysisConfig().getDetectors(); @@ -84,6 +101,11 @@ private static List findJobsUsingFilter(List jobs, String filterId) return currentlyUsedBy; } + private static List clusterStateJobsUsingFilter(String filterId, ClusterState state) { + Map jobs = MlMetadata.getMlMetadata(state).getJobs(); + return findJobsUsingFilter(jobs.values(), filterId); + } + private void deleteFilter(String filterId, ActionListener listener) { DeleteRequest deleteRequest = new DeleteRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, MlFilter.documentId(filterId)); diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/filter_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/filter_crud.yml index 28b5d5c9315e8..fb4b3e764816c 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/filter_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/filter_crud.yml @@ -246,7 +246,7 @@ setup: } } - do: - catch: conflict + catch: /Cannot delete filter \[filter-foo\] currently used by jobs \[filter-crud\]/ xpack.ml.delete_filter: filter_id: "filter-foo" From 07be76eae616e6e81d57a894275fc40d5b895293 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 14 Nov 2018 17:50:40 +0000 Subject: [PATCH 3/5] Delete datafeed from cs --- .../action/TransportDeleteDatafeedAction.java | 50 ++++++++++++++++--- 1 file changed, 42 insertions(+), 8 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java index 3178c9980b65e..a7dbb9d4f93b6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java @@ -11,10 +11,12 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -23,6 +25,8 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.XPackPlugin; +import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction; import org.elasticsearch.xpack.core.ml.action.IsolateDatafeedAction; @@ -67,18 +71,18 @@ protected AcknowledgedResponse newResponse() { @Override protected void masterOperation(DeleteDatafeedAction.Request request, ClusterState state, - ActionListener listener) throws Exception { + ActionListener listener) { if (request.isForce()) { forceDeleteDatafeed(request, state, listener); } else { - deleteDatafeedConfig(request, listener); + deleteDatafeedConfig(request, state, listener); } } private void forceDeleteDatafeed(DeleteDatafeedAction.Request request, ClusterState state, ActionListener listener) { ActionListener finalListener = ActionListener.wrap( - response -> deleteDatafeedConfig(request, listener), + response -> deleteDatafeedConfig(request, state, listener), listener::onFailure ); @@ -117,7 +121,8 @@ public void onFailure(Exception e) { } } - private void deleteDatafeedConfig(DeleteDatafeedAction.Request request, ActionListener listener) { + private void deleteDatafeedConfig(DeleteDatafeedAction.Request request, ClusterState state, + ActionListener listener) { // Check datafeed is stopped PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); if (MlTasks.getDatafeedTask(request.getDatafeedId(), tasks) != null) { @@ -126,10 +131,39 @@ private void deleteDatafeedConfig(DeleteDatafeedAction.Request request, ActionLi return; } - datafeedConfigProvider.deleteDatafeedConfig(request.getDatafeedId(), ActionListener.wrap( - deleteResponse -> listener.onResponse(new AcknowledgedResponse(true)), - listener::onFailure - )); + MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); + if (mlMetadata.getDatafeed(request.getDatafeedId()) != null) { + deleteDatafeedFromMetadata(request, listener); + } else { + datafeedConfigProvider.deleteDatafeedConfig(request.getDatafeedId(), ActionListener.wrap( + deleteResponse -> listener.onResponse(new AcknowledgedResponse(true)), + listener::onFailure + )); + } + } + + private void deleteDatafeedFromMetadata(DeleteDatafeedAction.Request request, ActionListener listener) { + clusterService.submitStateUpdateTask("delete-datafeed-" + request.getDatafeedId(), + new AckedClusterStateUpdateTask(request, listener) { + + @Override + protected AcknowledgedResponse newResponse(boolean acknowledged) { + return new AcknowledgedResponse(acknowledged); + } + + @Override + public ClusterState execute(ClusterState currentState) { + XPackPlugin.checkReadyForXPackCustomMetadata(currentState); + MlMetadata currentMetadata = MlMetadata.getMlMetadata(currentState); + PersistentTasksCustomMetaData persistentTasks = + currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + MlMetadata newMetadata = new MlMetadata.Builder(currentMetadata) + .removeDatafeed(request.getDatafeedId(), persistentTasks).build(); + return ClusterState.builder(currentState).metaData( + MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, newMetadata).build()) + .build(); + } + }); } @Override From 0079c8f1e65965682ac08a5c20d991216a0265fc Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 14 Nov 2018 18:42:14 +0000 Subject: [PATCH 4/5] Delete filter unit tests --- .../xpack/core/ml/job/messages/Messages.java | 2 +- .../TransportDeleteFilterActionTests.java | 84 +++++++++++++++++++ 2 files changed, 85 insertions(+), 1 deletion(-) create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteFilterActionTests.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index 271ac461c2844..b083977bb4b7e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -43,7 +43,7 @@ public final class Messages { "Datafeed frequency [{0}] must be a multiple of the aggregation interval [{1}]"; public static final String DATAFEED_ID_ALREADY_TAKEN = "A datafeed with id [{0}] already exists"; - public static final String FILTER_CANNOT_DELETE = "Cannot delete filter [{0}] currently used by jobs [{1}]"; + public static final String FILTER_CANNOT_DELETE = "Cannot delete filter [{0}] currently used by jobs {1}"; public static final String FILTER_CONTAINS_TOO_MANY_ITEMS = "Filter [{0}] contains too many items; up to [{1}] items are allowed"; public static final String FILTER_NOT_FOUND = "No filter with id [{0}] exists"; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteFilterActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteFilterActionTests.java new file mode 100644 index 0000000000000..f18e1e1010eeb --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteFilterActionTests.java @@ -0,0 +1,84 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.action.DeleteFilterAction; +import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.core.ml.job.config.DataDescription; +import org.elasticsearch.xpack.core.ml.job.config.DetectionRule; +import org.elasticsearch.xpack.core.ml.job.config.Detector; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.config.RuleScope; +import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; + +import java.util.Collections; +import java.util.Date; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.instanceOf; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TransportDeleteFilterActionTests extends ESTestCase { + + public void testDoExecute_ClusterStateJobUsesFilter() { + + Job.Builder builder = creatJobUsingFilter("job-using-filter", "filter-foo"); + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(builder.build(), false); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.state()).thenReturn(clusterState); + + TransportDeleteFilterAction action = new TransportDeleteFilterAction(Settings.EMPTY, mock(ThreadPool.class), + mock(TransportService.class), mock(ActionFilters.class), mock(IndexNameExpressionResolver.class), + mock(Client.class), clusterService, mock(JobConfigProvider.class)); + + DeleteFilterAction.Request request = new DeleteFilterAction.Request("filter-foo"); + AtomicReference requestException = new AtomicReference<>(); + action.doExecute(request, ActionListener.wrap( + response -> fail("response was not expected"), + requestException::set + )); + + assertThat(requestException.get(), instanceOf(ElasticsearchStatusException.class)); + assertEquals("Cannot delete filter [filter-foo] currently used by jobs [job-using-filter]", requestException.get().getMessage()); + } + + private Job.Builder creatJobUsingFilter(String jobId, String filterId) { + Detector.Builder detectorReferencingFilter = new Detector.Builder("count", null); + detectorReferencingFilter.setByFieldName("foo"); + DetectionRule filterRule = new DetectionRule.Builder(RuleScope.builder().exclude("foo", filterId)).build(); + detectorReferencingFilter.setRules(Collections.singletonList(filterRule)); + AnalysisConfig.Builder filterAnalysisConfig = new AnalysisConfig.Builder(Collections.singletonList( + detectorReferencingFilter.build())); + + Job.Builder builder = new Job.Builder(jobId); + builder.setAnalysisConfig(filterAnalysisConfig); + builder.setDataDescription(new DataDescription.Builder()); + builder.setCreateTime(new Date()); + return builder; + } +} + + From 76bec797cb9d84fa7cd420090feb5f4fe33e98ee Mon Sep 17 00:00:00 2001 From: David Kyle Date: Thu, 15 Nov 2018 13:40:20 +0000 Subject: [PATCH 5/5] Delete job from cluster state or index --- .../ml/action/TransportDeleteJobAction.java | 32 ++++++----- .../xpack/ml/job/ClusterStateJobUpdate.java | 57 +++++++++++++++++++ .../xpack/ml/job/JobManager.java | 36 ++++++++++++ 3 files changed, 112 insertions(+), 13 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java index 0b531cad86492..9eb526e529a37 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java @@ -65,7 +65,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; -import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; +import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.notifications.Auditor; @@ -93,7 +93,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction(); @@ -189,7 +189,15 @@ protected void masterOperation(Task task, DeleteJobAction.Request request, Clust finalListener.onFailure(e); }); - markJobAsDeletingIfNotUsed(request.getJobId(), markAsDeletingListener); + ActionListener checkForDatafeedsListener = ActionListener.wrap( + ok -> jobManager.markJobAsDeleting(request.getJobId(), request.isForce(), markAsDeletingListener), + finalListener::onFailure + ); + + // This check only applies to index configurations. + // ClusterState config makes the same check against the + // job being used by a datafeed in MlMetadata.markJobAsDeleting() + checkJobNotUsedByDatafeed(request.getJobId(), checkForDatafeedsListener); } private void notifyListeners(String jobId, @Nullable AcknowledgedResponse ack, @Nullable Exception error) { @@ -231,7 +239,7 @@ private void normalDeleteJob(ParentTaskAssigningClient parentTaskClient, DeleteJ // Step 3. When the physical storage has been deleted, delete the job config document // ------- // Don't report an error if the document has already been deleted - CheckedConsumer deleteJobStateHandler = response -> jobConfigProvider.deleteJob(jobId, false, + CheckedConsumer deleteJobStateHandler = response -> jobManager.deleteJob(request, ActionListener.wrap( deleteResponse -> apiResponseHandler.accept(Boolean.TRUE), listener::onFailure @@ -332,9 +340,8 @@ private void deleteJobDocuments(ParentTaskAssigningClient parentTaskClient, Stri ); // Step 5. Determine if we are on a shared index by looking at `.ml-anomalies-shared` or the custom index's aliases - ActionListener getJobHandler = ActionListener.wrap( - builder -> { - Job job = builder.build(); + ActionListener getJobHandler = ActionListener.wrap( + job -> { indexName.set(job.getResultsIndexName()); if (indexName.get().equals(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT)) { @@ -357,7 +364,7 @@ private void deleteJobDocuments(ParentTaskAssigningClient parentTaskClient, Stri // Step 4. Get the job as the result index name is required ActionListener deleteCategorizerStateHandler = ActionListener.wrap( response -> { - jobConfigProvider.getJob(jobId, getJobHandler); + jobManager.getJob(jobId, getJobHandler); }, failureHandler ); @@ -574,8 +581,7 @@ private void checkJobIsNotOpen(String jobId, ClusterState state) { } } - private void markJobAsDeletingIfNotUsed(String jobId, ActionListener listener) { - + private void checkJobNotUsedByDatafeed(String jobId, ActionListener listener) { datafeedConfigProvider.findDatafeedsForJobIds(Collections.singletonList(jobId), ActionListener.wrap( datafeedIds -> { if (datafeedIds.isEmpty() == false) { @@ -583,7 +589,7 @@ private void markJobAsDeletingIfNotUsed(String jobId, ActionListener li + datafeedIds.iterator().next() + "] refers to it")); return; } - jobConfigProvider.markJobAsDeleting(jobId, listener); + listener.onResponse(Boolean.TRUE); }, listener::onFailure )); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/ClusterStateJobUpdate.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/ClusterStateJobUpdate.java index 19f32730bc31d..2a5bac012898e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/ClusterStateJobUpdate.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/ClusterStateJobUpdate.java @@ -5,10 +5,16 @@ */ package org.elasticsearch.xpack.ml.job; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; import org.elasticsearch.xpack.core.ml.job.config.Job; /** @@ -44,4 +50,55 @@ private static ClusterState buildNewClusterState(ClusterState currentState, MlMe newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, builder.build()).build()); return newState.build(); } + + public static void markJobAsDeleting(String jobId, boolean force, ClusterService clusterService, ActionListener listener) { + clusterService.submitStateUpdateTask("mark-job-as-deleted", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + PersistentTasksCustomMetaData tasks = currentState.metaData().custom(PersistentTasksCustomMetaData.TYPE); + MlMetadata.Builder builder = new MlMetadata.Builder(MlMetadata.getMlMetadata(currentState)); + builder.markJobAsDeleting(jobId, tasks, force); + return buildNewClusterState(currentState, builder); + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(true); + } + }); + } + + public static void deleteJob(DeleteJobAction.Request request, ClusterService clusterService, ActionListener listener) { + String jobId = request.getJobId(); + + clusterService.submitStateUpdateTask( + "delete-job-" + jobId, + new AckedClusterStateUpdateTask(request, listener) { + + @Override + protected Boolean newResponse(boolean acknowledged) { + return acknowledged; + } + + @Override + public ClusterState execute(ClusterState currentState) { + MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(currentState); + if (currentMlMetadata.getJobs().containsKey(jobId) == false) { + // We wouldn't have got here if the job never existed so + // the Job must have been deleted by another action. + // Don't error in this case + return currentState; + } + + MlMetadata.Builder builder = new MlMetadata.Builder(currentMlMetadata); + builder.deleteJob(jobId, currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE)); + return buildNewClusterState(currentState, builder); + } + }); + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 707bb0801188d..8287e7b72944b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -34,6 +34,7 @@ import org.elasticsearch.xpack.core.ml.MachineLearningField; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; @@ -299,6 +300,41 @@ public void expandJobIds(String expression, boolean allowNoJobs, ActionListener< )); } + /** + * Mark the job as being deleted. First looks in the cluster state for the + * job configuration then the index + * + * @param jobId To to mark + * @param force Allows an open job to be marked + * @param listener listener + */ + public void markJobAsDeleting(String jobId, boolean force, ActionListener listener) { + if (ClusterStateJobUpdate.jobIsInClusterState(clusterService.state(), jobId)) { + ClusterStateJobUpdate.markJobAsDeleting(jobId, force, clusterService, listener); + } else { + jobConfigProvider.markJobAsDeleting(jobId, listener); + } + } + + /** + * First try to delete the job from the cluster state, if it does not exist + * there try to delete the index job. + * + * @param request The delete job request + * @param listener Delete listener + */ + public void deleteJob(DeleteJobAction.Request request, ActionListener listener) { + + if (ClusterStateJobUpdate.jobIsInClusterState(clusterService.state(), request.getJobId())) { + ClusterStateJobUpdate.deleteJob(request, clusterService, listener); + } else { + jobConfigProvider.deleteJob(request.getJobId(), false, ActionListener.wrap( + deleteResponse -> listener.onResponse(Boolean.TRUE), + listener::onFailure + )); + } + } + /** * Validate the char filter/tokenizer/token filter names used in the categorization analyzer config (if any). * This validation has to be done server-side; it cannot be done in a client as that won't have loaded the