-
Notifications
You must be signed in to change notification settings - Fork 25.6k
[ML] Job in Index: Convert job data remover to work with index configs #34532
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License; | ||
| * you may not use this file except in compliance with the Elastic License. | ||
| */ | ||
| package org.elasticsearch.xpack.ml.job.persistence; | ||
|
|
||
| import org.elasticsearch.ElasticsearchParseException; | ||
| import org.elasticsearch.client.Client; | ||
| import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; | ||
| import org.elasticsearch.common.xcontent.NamedXContentRegistry; | ||
| import org.elasticsearch.common.xcontent.XContentFactory; | ||
| import org.elasticsearch.common.xcontent.XContentParser; | ||
| import org.elasticsearch.common.xcontent.XContentType; | ||
| import org.elasticsearch.index.query.QueryBuilder; | ||
| import org.elasticsearch.index.query.TermQueryBuilder; | ||
| import org.elasticsearch.search.SearchHit; | ||
| import org.elasticsearch.xpack.core.ml.job.config.Job; | ||
|
|
||
| import java.io.IOException; | ||
| import java.io.InputStream; | ||
|
|
||
| public class BatchedJobsIterator extends BatchedDocumentsIterator<Job.Builder> { | ||
|
|
||
| public BatchedJobsIterator(Client client, String index) { | ||
| super(client, index); | ||
| } | ||
|
|
||
| @Override | ||
| protected QueryBuilder getQuery() { | ||
| return new TermQueryBuilder(Job.JOB_TYPE.getPreferredName(), Job.ANOMALY_DETECTOR_JOB_TYPE); | ||
| } | ||
|
|
||
| @Override | ||
| protected Job.Builder map(SearchHit hit) { | ||
| try (InputStream stream = hit.getSourceRef().streamInput(); | ||
| XContentParser parser = XContentFactory.xContent(XContentType.JSON) | ||
| .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { | ||
| return Job.LENIENT_PARSER.apply(parser, null); | ||
| } catch (IOException e) { | ||
| throw new ElasticsearchParseException("failed to parse job document [" + hit.getId() + "]", e); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,23 +6,23 @@ | |
| package org.elasticsearch.xpack.ml.job.retention; | ||
|
|
||
| import org.elasticsearch.action.ActionListener; | ||
| import org.elasticsearch.cluster.ClusterState; | ||
| import org.elasticsearch.cluster.service.ClusterService; | ||
| import org.elasticsearch.client.Client; | ||
| import org.elasticsearch.common.unit.TimeValue; | ||
| import org.elasticsearch.index.query.BoolQueryBuilder; | ||
| import org.elasticsearch.index.query.QueryBuilders; | ||
| import org.elasticsearch.xpack.core.ml.MlMetadata; | ||
| import org.elasticsearch.xpack.core.ml.job.config.Job; | ||
| import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; | ||
| import org.elasticsearch.xpack.core.ml.job.results.Result; | ||
| import org.elasticsearch.xpack.ml.job.persistence.BatchedJobsIterator; | ||
| import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator; | ||
| import org.joda.time.DateTime; | ||
| import org.joda.time.chrono.ISOChronology; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.Deque; | ||
| import java.util.Iterator; | ||
| import java.util.List; | ||
| import java.util.Objects; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| /** | ||
| * Removes job data that expired with respect to their retention period. | ||
|
|
@@ -33,23 +33,29 @@ | |
| */ | ||
| abstract class AbstractExpiredJobDataRemover implements MlDataRemover { | ||
|
|
||
| private final ClusterService clusterService; | ||
| private final Client client; | ||
|
|
||
| AbstractExpiredJobDataRemover(ClusterService clusterService) { | ||
| this.clusterService = Objects.requireNonNull(clusterService); | ||
| AbstractExpiredJobDataRemover(Client client) { | ||
| this.client = client; | ||
| } | ||
|
|
||
| @Override | ||
| public void remove(ActionListener<Boolean> listener) { | ||
| removeData(newJobIterator(), listener); | ||
| } | ||
|
|
||
| private void removeData(Iterator<Job> jobIterator, ActionListener<Boolean> listener) { | ||
| private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener<Boolean> listener) { | ||
| if (jobIterator.hasNext() == false) { | ||
| listener.onResponse(true); | ||
| return; | ||
| } | ||
| Job job = jobIterator.next(); | ||
| if (job == null) { | ||
| // maybe null if the batched iterator's search returned no results | ||
|
||
| listener.onResponse(true); | ||
| return; | ||
| } | ||
|
|
||
| Long retentionDays = getRetentionDays(job); | ||
| if (retentionDays == null) { | ||
| removeData(jobIterator, listener); | ||
|
|
@@ -59,14 +65,9 @@ private void removeData(Iterator<Job> jobIterator, ActionListener<Boolean> liste | |
| removeDataBefore(job, cutoffEpochMs, ActionListener.wrap(response -> removeData(jobIterator, listener), listener::onFailure)); | ||
| } | ||
|
|
||
| private Iterator<Job> newJobIterator() { | ||
| ClusterState clusterState = clusterService.state(); | ||
| List<Job> jobs = new ArrayList<>(MlMetadata.getMlMetadata(clusterState).getJobs().values()); | ||
| return createVolatileCursorIterator(jobs); | ||
| } | ||
|
|
||
| protected static <T> Iterator<T> createVolatileCursorIterator(List<T> items) { | ||
| return new VolatileCursorIterator<T>(items); | ||
| private WrappedBatchedJobsIterator newJobIterator() { | ||
| BatchedJobsIterator jobsIterator = new BatchedJobsIterator(client, AnomalyDetectorsIndex.configIndexName()); | ||
| return new WrappedBatchedJobsIterator(jobsIterator); | ||
| } | ||
|
|
||
| private long calcCutoffEpochMs(long retentionDays) { | ||
|
|
@@ -87,4 +88,49 @@ protected static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) | |
| .filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId)) | ||
| .filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).lt(cutoffEpochMs).format("epoch_millis")); | ||
| } | ||
|
|
||
| /** | ||
| * BatchedJobsIterator efficiently returns batches of jobs using a scroll | ||
| * search but AbstractExpiredJobDataRemover works with one job at a time. | ||
| * This class abstracts away the logic of pulling one job at a time from | ||
| * multiple batches. | ||
| */ | ||
| private class WrappedBatchedJobsIterator implements Iterator<Job> { | ||
| private final BatchedJobsIterator batchedIterator; | ||
| private VolatileCursorIterator<Job> batch; | ||
|
|
||
| WrappedBatchedJobsIterator(BatchedJobsIterator batchedIterator) { | ||
| this.batchedIterator = batchedIterator; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean hasNext() { | ||
| return (batch != null && batch.hasNext()) || batchedIterator.hasNext(); | ||
| } | ||
|
|
||
| /** | ||
| * Before BatchedJobsIterator has run a search it reports hasNext == true | ||
| * but the first search may return no results. In that case null is return | ||
| * and clients have to handle null. | ||
| */ | ||
| @Override | ||
| public Job next() { | ||
| if (batch != null && batch.hasNext()) { | ||
| return batch.next(); | ||
| } | ||
|
|
||
| // batch is either null or all its elements have been iterated. | ||
| // get the next batch | ||
| batch = createBatchIteratorFromBatch(batchedIterator.next()); | ||
|
||
|
|
||
| // BatchedJobsIterator.hasNext maybe true if searching the first time | ||
| // but no results are returned. | ||
| return batch.hasNext() ? batch.next() : null; | ||
| } | ||
|
|
||
| private VolatileCursorIterator<Job> createBatchIteratorFromBatch(Deque<Job.Builder> builders) { | ||
| List<Job> jobs = builders.stream().map(Job.Builder::build).collect(Collectors.toList()); | ||
| return new VolatileCursorIterator<>(jobs); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we don't need the builders, we could make this
BatchedDocumentsIterator<Job> and build in map(). There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At the beginning I made the decision that builders are returned when the config document is read. This is proving to be the wrong choice as I now have to build everywhere, the job should be returned and it should be re-validated before opening a job etc. I will open an issue for this. Whether build is called by the client or in
map is moot