Messages.java
@@ -43,8 +43,9 @@ public final class Messages {
             "Datafeed frequency [{0}] must be a multiple of the aggregation interval [{1}]";
     public static final String DATAFEED_ID_ALREADY_TAKEN = "A datafeed with id [{0}] already exists";
 
-    public static final String FILTER_NOT_FOUND = "No filter with id [{0}] exists";
+    public static final String FILTER_CANNOT_DELETE = "Cannot delete filter [{0}] currently used by jobs {1}";
     public static final String FILTER_CONTAINS_TOO_MANY_ITEMS = "Filter [{0}] contains too many items; up to [{1}] items are allowed";
+    public static final String FILTER_NOT_FOUND = "No filter with id [{0}] exists";
 
     public static final String INCONSISTENT_ID =
             "Inconsistent {0}; ''{1}'' specified in the body differs from ''{2}'' specified as a URL argument";
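The new FILTER_CANNOT_DELETE constant is a java.text.MessageFormat pattern like its neighbours: {0} and {1} are positional arguments, which is also why INCONSISTENT_ID doubles its single quotes (''…'') to keep them literal. A minimal sketch of how such a pattern expands; the filter id and job ids below are made-up values:

```java
import java.text.MessageFormat;
import java.util.Arrays;
import java.util.List;

public class FilterMessageDemo {
    // Same pattern text as the FILTER_CANNOT_DELETE constant above
    static final String FILTER_CANNOT_DELETE = "Cannot delete filter [{0}] currently used by jobs {1}";

    public static void main(String[] args) {
        List<String> jobsUsingFilter = Arrays.asList("job-1", "job-2"); // hypothetical job ids
        // {0} and {1} are substituted positionally; the List renders via toString()
        String msg = MessageFormat.format(FILTER_CANNOT_DELETE, "safe_domains", jobsUsingFilter);
        System.out.println(msg); // Cannot delete filter [safe_domains] currently used by jobs [job-1, job-2]
    }
}
```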
TransportDeleteDatafeedAction.java
@@ -11,10 +11,12 @@
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
@@ -23,6 +25,8 @@
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.XPackPlugin;
+import org.elasticsearch.xpack.core.ml.MlMetadata;
 import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction;
 import org.elasticsearch.xpack.core.ml.action.IsolateDatafeedAction;
@@ -67,18 +71,18 @@ protected AcknowledgedResponse newResponse() {
 
     @Override
     protected void masterOperation(DeleteDatafeedAction.Request request, ClusterState state,
-                                   ActionListener<AcknowledgedResponse> listener) throws Exception {
+                                   ActionListener<AcknowledgedResponse> listener) {
         if (request.isForce()) {
             forceDeleteDatafeed(request, state, listener);
         } else {
-            deleteDatafeedConfig(request, listener);
+            deleteDatafeedConfig(request, state, listener);
         }
     }
 
     private void forceDeleteDatafeed(DeleteDatafeedAction.Request request, ClusterState state,
                                      ActionListener<AcknowledgedResponse> listener) {
         ActionListener<Boolean> finalListener = ActionListener.wrap(
-                response -> deleteDatafeedConfig(request, listener),
+                response -> deleteDatafeedConfig(request, state, listener),
                 listener::onFailure
         );
 
@@ -117,7 +121,8 @@ public void onFailure(Exception e) {
         }
     }
 
-    private void deleteDatafeedConfig(DeleteDatafeedAction.Request request, ActionListener<AcknowledgedResponse> listener) {
+    private void deleteDatafeedConfig(DeleteDatafeedAction.Request request, ClusterState state,
+                                      ActionListener<AcknowledgedResponse> listener) {
         // Check datafeed is stopped
         PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
         if (MlTasks.getDatafeedTask(request.getDatafeedId(), tasks) != null) {
@@ -126,10 +131,39 @@ private void deleteDatafeedConfig(DeleteDatafeedAction.Request request, ActionLi
             return;
         }
 
-        datafeedConfigProvider.deleteDatafeedConfig(request.getDatafeedId(), ActionListener.wrap(
-                deleteResponse -> listener.onResponse(new AcknowledgedResponse(true)),
-                listener::onFailure
-        ));
+        MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
+        if (mlMetadata.getDatafeed(request.getDatafeedId()) != null) {
+            deleteDatafeedFromMetadata(request, listener);
+        } else {
+            datafeedConfigProvider.deleteDatafeedConfig(request.getDatafeedId(), ActionListener.wrap(
+                    deleteResponse -> listener.onResponse(new AcknowledgedResponse(true)),
+                    listener::onFailure
+            ));
+        }
     }
 
+    private void deleteDatafeedFromMetadata(DeleteDatafeedAction.Request request, ActionListener<AcknowledgedResponse> listener) {
+        clusterService.submitStateUpdateTask("delete-datafeed-" + request.getDatafeedId(),
+                new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
+
+                    @Override
+                    protected AcknowledgedResponse newResponse(boolean acknowledged) {
+                        return new AcknowledgedResponse(acknowledged);
+                    }
+
+                    @Override
+                    public ClusterState execute(ClusterState currentState) {
+                        XPackPlugin.checkReadyForXPackCustomMetadata(currentState);
+                        MlMetadata currentMetadata = MlMetadata.getMlMetadata(currentState);
+                        PersistentTasksCustomMetaData persistentTasks =
+                                currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
+                        MlMetadata newMetadata = new MlMetadata.Builder(currentMetadata)
+                                .removeDatafeed(request.getDatafeedId(), persistentTasks).build();
+                        return ClusterState.builder(currentState).metaData(
+                                MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, newMetadata).build())
+                                .build();
+                    }
+                });
+    }
+
     @Override
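The substantive change above is the routing in deleteDatafeedConfig: a datafeed id found in MlMetadata is a pre-migration, cluster-state config and is removed through a master-node state update, while anything else is deleted from the ML config index via datafeedConfigProvider. For readers unfamiliar with AckedClusterStateUpdateTask, the sketch below restates the added deleteDatafeedFromMetadata with comments on the task lifecycle; it introduces no new behaviour:

```java
// Annotated restatement of deleteDatafeedFromMetadata above (a sketch, assuming
// it sits in this action class where clusterService is a field).
private void deleteDatafeedFromMetadata(DeleteDatafeedAction.Request request,
                                        ActionListener<AcknowledgedResponse> listener) {
    // submitStateUpdateTask queues the task on the elected master's single
    // cluster-state update thread; the request supplies the ack/master timeouts.
    clusterService.submitStateUpdateTask("delete-datafeed-" + request.getDatafeedId(),
            new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {

                @Override
                public ClusterState execute(ClusterState currentState) {
                    // Refuse to write ML custom metadata unless every node can read it.
                    XPackPlugin.checkReadyForXPackCustomMetadata(currentState);
                    // Cluster state is immutable: build a copy of MlMetadata
                    // without this datafeed and publish the rewritten state.
                    MlMetadata current = MlMetadata.getMlMetadata(currentState);
                    PersistentTasksCustomMetaData tasks =
                            currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
                    // removeDatafeed takes the persistent tasks so it can also
                    // guard against deleting a datafeed that is still running.
                    MlMetadata updated = new MlMetadata.Builder(current)
                            .removeDatafeed(request.getDatafeedId(), tasks).build();
                    return ClusterState.builder(currentState).metaData(
                            MetaData.builder(currentState.getMetaData())
                                    .putCustom(MlMetadata.TYPE, updated).build())
                            .build();
                }

                @Override
                protected AcknowledgedResponse newResponse(boolean acknowledged) {
                    // Invoked after the new state is published; 'acknowledged' is
                    // false if other nodes did not ack within the timeout.
                    return new AcknowledgedResponse(acknowledged);
                }
            });
}
```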
TransportDeleteExpiredDataAction.java
@@ -56,9 +56,9 @@ protected void doExecute(DeleteExpiredDataAction.Request request, ActionListener
     private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response> listener) {
         Auditor auditor = new Auditor(client, clusterService.getNodeName());
         List<MlDataRemover> dataRemovers = Arrays.asList(
-                new ExpiredResultsRemover(client, auditor),
+                new ExpiredResultsRemover(client, clusterService, auditor),
                 new ExpiredForecastsRemover(client, threadPool),
-                new ExpiredModelSnapshotsRemover(client, threadPool),
+                new ExpiredModelSnapshotsRemover(client, clusterService, threadPool),
                 new UnusedStateRemover(client, clusterService)
         );
         Iterator<MlDataRemover> dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers);
TransportDeleteFilterAction.java
@@ -16,51 +16,68 @@
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.ml.MlMetaIndex;
+import org.elasticsearch.xpack.core.ml.MlMetadata;
 import org.elasticsearch.xpack.core.ml.action.DeleteFilterAction;
 import org.elasticsearch.xpack.core.ml.job.config.Detector;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
+import org.elasticsearch.xpack.core.ml.job.messages.Messages;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 
 import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
 import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
 
 public class TransportDeleteFilterAction extends HandledTransportAction<DeleteFilterAction.Request, AcknowledgedResponse> {
 
     private final Client client;
+    private final ClusterService clusterService;
     private final JobConfigProvider jobConfigProvider;
 
     @Inject
     public TransportDeleteFilterAction(Settings settings, ThreadPool threadPool,
                                        TransportService transportService, ActionFilters actionFilters,
                                        IndexNameExpressionResolver indexNameExpressionResolver,
-                                       Client client, JobConfigProvider jobConfigProvider) {
+                                       Client client, ClusterService clusterService,
+                                       JobConfigProvider jobConfigProvider) {
         super(settings, DeleteFilterAction.NAME, threadPool, transportService, actionFilters,
                 indexNameExpressionResolver, DeleteFilterAction.Request::new);
         this.client = client;
+        this.clusterService = clusterService;
         this.jobConfigProvider = jobConfigProvider;
     }
 
     @Override
     protected void doExecute(DeleteFilterAction.Request request, ActionListener<AcknowledgedResponse> listener) {
         final String filterId = request.getFilterId();
 
+        List<String> clusterStateJobsUsingFilter = clusterStateJobsUsingFilter(filterId, clusterService.state());
+        if (clusterStateJobsUsingFilter.isEmpty() == false) {
+            listener.onFailure(ExceptionsHelper.conflictStatusException(
+                    Messages.getMessage(Messages.FILTER_CANNOT_DELETE, filterId, clusterStateJobsUsingFilter)));
+            return;
+        }
+
         jobConfigProvider.findJobsWithCustomRules(ActionListener.wrap(
                 jobs-> {
                     List<String> currentlyUsedBy = findJobsUsingFilter(jobs, filterId);
                     if (!currentlyUsedBy.isEmpty()) {
                         listener.onFailure(ExceptionsHelper.conflictStatusException(
-                                "Cannot delete filter, currently used by jobs: " + currentlyUsedBy));
+                                Messages.getMessage(Messages.FILTER_CANNOT_DELETE, filterId, currentlyUsedBy)));
                     } else {
                         deleteFilter(filterId, listener);
                     }
@@ -70,7 +87,7 @@ protected void doExecute(DeleteFilterAction.Request request, ActionListener<Ackn
         );
     }
 
-    private static List<String> findJobsUsingFilter(List<Job> jobs, String filterId) {
+    private static List<String> findJobsUsingFilter(Collection<Job> jobs, String filterId) {
         List<String> currentlyUsedBy = new ArrayList<>();
         for (Job job : jobs) {
             List<Detector> detectors = job.getAnalysisConfig().getDetectors();
@@ -84,6 +101,11 @@ private static List<String> findJobsUsingFilter(List<Job> jobs, String filterId)
         return currentlyUsedBy;
     }
 
+    private static List<String> clusterStateJobsUsingFilter(String filterId, ClusterState state) {
+        Map<String, Job> jobs = MlMetadata.getMlMetadata(state).getJobs();
+        return findJobsUsingFilter(jobs.values(), filterId);
+    }
+
     private void deleteFilter(String filterId, ActionListener<AcknowledgedResponse> listener) {
         DeleteRequest deleteRequest = new DeleteRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE,
                 MlFilter.documentId(filterId));
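While configs live in two stores, the filter-usage gate runs twice: a synchronous scan of cluster-state jobs from MlMetadata, then the existing asynchronous scan of index-resident jobs via findJobsWithCustomRules. Widening findJobsUsingFilter to Collection<Job> lets one helper serve both sources (Map.values() from MlMetadata, a List from the index search). A condensed sketch of the combined flow; the method name is illustrative and error handling is trimmed:

```java
// Condensed sketch of the two-phase check in doExecute above (not the PR's exact code).
private void checkFilterUnusedThenDelete(String filterId, ActionListener<AcknowledgedResponse> listener) {
    // Phase 1: cluster-state jobs are available synchronously from MlMetadata.
    List<String> clusterStateJobs = clusterStateJobsUsingFilter(filterId, clusterService.state());
    if (clusterStateJobs.isEmpty() == false) {
        listener.onFailure(ExceptionsHelper.conflictStatusException(
                Messages.getMessage(Messages.FILTER_CANNOT_DELETE, filterId, clusterStateJobs)));
        return; // fail fast, no index search needed
    }
    // Phase 2: index-resident jobs require an async search over job documents.
    jobConfigProvider.findJobsWithCustomRules(ActionListener.wrap(
            jobs -> {
                List<String> indexJobs = findJobsUsingFilter(jobs, filterId);
                if (indexJobs.isEmpty()) {
                    deleteFilter(filterId, listener); // safe: no job references the filter
                } else {
                    listener.onFailure(ExceptionsHelper.conflictStatusException(
                            Messages.getMessage(Messages.FILTER_CANNOT_DELETE, filterId, indexJobs)));
                }
            },
            listener::onFailure));
}
```

Running the cheap in-memory check first means a conflicting request fails fast without paying for an index round trip.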
TransportDeleteJobAction.java
@@ -65,7 +65,7 @@
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
-import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
+import org.elasticsearch.xpack.ml.job.JobManager;
 import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 import org.elasticsearch.xpack.ml.notifications.Auditor;
@@ -93,7 +93,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
     private final PersistentTasksService persistentTasksService;
     private final Auditor auditor;
     private final JobResultsProvider jobResultsProvider;
-    private final JobConfigProvider jobConfigProvider;
+    private final JobManager jobManager;
     private final DatafeedConfigProvider datafeedConfigProvider;
     private final MlMemoryTracker memoryTracker;
 
@@ -110,15 +110,15 @@ public TransportDeleteJobAction(Settings settings, TransportService transportSer
                                     ThreadPool threadPool, ActionFilters actionFilters,
                                     IndexNameExpressionResolver indexNameExpressionResolver, PersistentTasksService persistentTasksService,
                                     Client client, Auditor auditor, JobResultsProvider jobResultsProvider,
-                                    JobConfigProvider jobConfigProvider, DatafeedConfigProvider datafeedConfigProvider,
+                                    JobManager jobManager, DatafeedConfigProvider datafeedConfigProvider,
                                     MlMemoryTracker memoryTracker) {
         super(settings, DeleteJobAction.NAME, transportService, clusterService, threadPool, actionFilters,
                 indexNameExpressionResolver, DeleteJobAction.Request::new);
         this.client = client;
         this.persistentTasksService = persistentTasksService;
         this.auditor = auditor;
         this.jobResultsProvider = jobResultsProvider;
-        this.jobConfigProvider = jobConfigProvider;
+        this.jobManager = jobManager;
         this.datafeedConfigProvider = datafeedConfigProvider;
         this.memoryTracker = memoryTracker;
         this.listenersByJobId = new HashMap<>();
@@ -189,7 +189,15 @@ protected void masterOperation(Task task, DeleteJobAction.Request request, Clust
             finalListener.onFailure(e);
         });
 
-        markJobAsDeletingIfNotUsed(request.getJobId(), markAsDeletingListener);
+        ActionListener<Boolean> checkForDatafeedsListener = ActionListener.wrap(
+                ok -> jobManager.markJobAsDeleting(request.getJobId(), request.isForce(), markAsDeletingListener),
+                finalListener::onFailure
+        );
+
+        // This check only applies to index configurations.
+        // ClusterState config makes the same check against the
+        // job being used by a datafeed in MlMetadata.markJobAsDeleting()
+        checkJobNotUsedByDatafeed(request.getJobId(), checkForDatafeedsListener);
     }
 
     private void notifyListeners(String jobId, @Nullable AcknowledgedResponse ack, @Nullable Exception error) {
@@ -231,7 +239,7 @@ private void normalDeleteJob(ParentTaskAssigningClient parentTaskClient, DeleteJ
         // Step 3. When the physical storage has been deleted, delete the job config document
         // -------
        // Don't report an error if the document has already been deleted
-        CheckedConsumer<Boolean, Exception> deleteJobStateHandler = response -> jobConfigProvider.deleteJob(jobId, false,
+        CheckedConsumer<Boolean, Exception> deleteJobStateHandler = response -> jobManager.deleteJob(request,
                 ActionListener.wrap(
                         deleteResponse -> apiResponseHandler.accept(Boolean.TRUE),
                         listener::onFailure
@@ -332,9 +340,8 @@ private void deleteJobDocuments(ParentTaskAssigningClient parentTaskClient, Stri
         );
 
         // Step 5. Determine if we are on a shared index by looking at `.ml-anomalies-shared` or the custom index's aliases
-        ActionListener<Job.Builder> getJobHandler = ActionListener.wrap(
-                builder -> {
-                    Job job = builder.build();
+        ActionListener<Job> getJobHandler = ActionListener.wrap(
+                job -> {
                     indexName.set(job.getResultsIndexName());
                     if (indexName.get().equals(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX +
                             AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT)) {
@@ -357,7 +364,7 @@ private void deleteJobDocuments(ParentTaskAssigningClient parentTaskClient, Stri
         // Step 4. Get the job as the result index name is required
         ActionListener<Boolean> deleteCategorizerStateHandler = ActionListener.wrap(
                 response -> {
-                    jobConfigProvider.getJob(jobId, getJobHandler);
+                    jobManager.getJob(jobId, getJobHandler);
                 },
                 failureHandler
         );
@@ -574,16 +581,15 @@ private void checkJobIsNotOpen(String jobId, ClusterState state) {
         }
     }
 
-    private void markJobAsDeletingIfNotUsed(String jobId, ActionListener<Boolean> listener) {
-
+    private void checkJobNotUsedByDatafeed(String jobId, ActionListener<Boolean> listener) {
         datafeedConfigProvider.findDatafeedsForJobIds(Collections.singletonList(jobId), ActionListener.wrap(
                 datafeedIds -> {
                     if (datafeedIds.isEmpty() == false) {
                         listener.onFailure(ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because datafeed ["
                                 + datafeedIds.iterator().next() + "] refers to it"));
                         return;
                     }
-                    jobConfigProvider.markJobAsDeleting(jobId, listener);
+                    listener.onResponse(Boolean.TRUE);
                 },
                 listener::onFailure
         ));
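checkJobNotUsedByDatafeed (formerly markJobAsDeletingIfNotUsed) now only vetoes deletion when a datafeed still references the job; marking the job as deleting moves to jobManager.markJobAsDeleting, which fronts both config stores, and for cluster-state configs MlMetadata.markJobAsDeleting applies the equivalent datafeed check, as the comment in masterOperation notes. The stages are sequenced with ActionListener.wrap, the continuation-passing idiom used throughout this file; a stripped-down sketch of the chain, with step bodies elided:

```java
// Stripped-down sketch of the chain in masterOperation above (not the PR's exact code).
// ActionListener.wrap(onSuccess, onFailure) turns each stage into a continuation;
// a failure at any stage short-circuits straight to finalListener.

// Last stage: once the job is marked as deleting, go on to delete its data.
ActionListener<Boolean> markAsDeletingListener = ActionListener.wrap(
        ok -> { /* normalDeleteJob(...) or force-delete path -- elided */ },
        finalListener::onFailure);

// Middle stage: mark the job as deleting in whichever store holds its config.
ActionListener<Boolean> checkForDatafeedsListener = ActionListener.wrap(
        ok -> jobManager.markJobAsDeleting(request.getJobId(), request.isForce(), markAsDeletingListener),
        finalListener::onFailure);

// First stage: veto if any datafeed refers to this job (index configs only;
// MlMetadata.markJobAsDeleting repeats the check for cluster-state configs).
checkJobNotUsedByDatafeed(request.getJobId(), checkForDatafeedsListener);
```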