Skip to content

Commit f26f266

Browse files
authored
Extract TimingStats-related functionality into TimingStatsReporter (#43371)
1 parent 7054a42 commit f26f266

File tree

11 files changed

+448
-364
lines changed

11 files changed

+448
-364
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -864,7 +864,7 @@ private static void addTimingStatsExceptBucketCountMapping(XContentBuilder build
864864
.startObject(TimingStats.AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName())
865865
.field(TYPE, DOUBLE)
866866
.endObject()
867-
.startObject(TimingStats.EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS.getPreferredName())
867+
.startObject(TimingStats.EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName())
868868
.field(TYPE, DOUBLE)
869869
.endObject();
870870
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/TimingStats.java

Lines changed: 3 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public class TimingStats implements ToXContentObject, Writeable {
3131
public static final ParseField MIN_BUCKET_PROCESSING_TIME_MS = new ParseField("minimum_bucket_processing_time_ms");
3232
public static final ParseField MAX_BUCKET_PROCESSING_TIME_MS = new ParseField("maximum_bucket_processing_time_ms");
3333
public static final ParseField AVG_BUCKET_PROCESSING_TIME_MS = new ParseField("average_bucket_processing_time_ms");
34-
public static final ParseField EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS =
34+
public static final ParseField EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS =
3535
new ParseField("exponential_average_bucket_processing_time_ms");
3636

3737
public static final ParseField TYPE = new ParseField("timing_stats");
@@ -49,7 +49,7 @@ public class TimingStats implements ToXContentObject, Writeable {
4949
PARSER.declareDouble(optionalConstructorArg(), MIN_BUCKET_PROCESSING_TIME_MS);
5050
PARSER.declareDouble(optionalConstructorArg(), MAX_BUCKET_PROCESSING_TIME_MS);
5151
PARSER.declareDouble(optionalConstructorArg(), AVG_BUCKET_PROCESSING_TIME_MS);
52-
PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS);
52+
PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS);
5353
}
5454

5555
public static String documentId(String jobId) {
@@ -185,7 +185,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
185185
builder.field(AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), avgBucketProcessingTimeMs);
186186
}
187187
if (exponentialAvgBucketProcessingTimeMs != null) {
188-
builder.field(EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS.getPreferredName(), exponentialAvgBucketProcessingTimeMs);
188+
builder.field(EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), exponentialAvgBucketProcessingTimeMs);
189189
}
190190
builder.endObject();
191191
return builder;
@@ -219,34 +219,4 @@ public int hashCode() {
219219
public String toString() {
220220
return Strings.toString(this);
221221
}
222-
223-
/**
224-
* Returns true if given stats objects differ from each other by more than 10% for at least one of the statistics.
225-
*/
226-
public static boolean differSignificantly(TimingStats stats1, TimingStats stats2) {
227-
return differSignificantly(stats1.minBucketProcessingTimeMs, stats2.minBucketProcessingTimeMs)
228-
|| differSignificantly(stats1.maxBucketProcessingTimeMs, stats2.maxBucketProcessingTimeMs)
229-
|| differSignificantly(stats1.avgBucketProcessingTimeMs, stats2.avgBucketProcessingTimeMs)
230-
|| differSignificantly(stats1.exponentialAvgBucketProcessingTimeMs, stats2.exponentialAvgBucketProcessingTimeMs);
231-
}
232-
233-
/**
234-
* Returns {@code true} if one of the ratios { value1 / value2, value2 / value1 } is smaller than MIN_VALID_RATIO.
235-
* This can be interpreted as values { value1, value2 } differing significantly from each other.
236-
* This method also returns:
237-
* - {@code true} in case one value is {@code null} while the other is not.
238-
* - {@code false} in case both values are {@code null}.
239-
*/
240-
static boolean differSignificantly(Double value1, Double value2) {
241-
if (value1 != null && value2 != null) {
242-
return (value2 / value1 < MIN_VALID_RATIO) || (value1 / value2 < MIN_VALID_RATIO);
243-
}
244-
return (value1 != null) || (value2 != null);
245-
}
246-
247-
/**
248-
* Minimum ratio of values that is interpreted as values being similar.
249-
* If the values ratio is less than MIN_VALID_RATIO, the values are interpreted as significantly different.
250-
*/
251-
private static final double MIN_VALID_RATIO = 0.9;
252222
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ public final class ReservedFieldNames {
179179
TimingStats.MIN_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
180180
TimingStats.MAX_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
181181
TimingStats.AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
182-
TimingStats.EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
182+
TimingStats.EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
183183

184184
GetResult._ID,
185185
GetResult._INDEX,

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/TimingStatsTests.java

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313

1414
import static org.hamcrest.Matchers.closeTo;
1515
import static org.hamcrest.Matchers.equalTo;
16-
import static org.hamcrest.Matchers.is;
1716
import static org.hamcrest.Matchers.nullValue;
1817

1918
public class TimingStatsTests extends AbstractSerializingTestCase<TimingStats> {
@@ -124,33 +123,6 @@ public void testDocumentId() {
124123
assertThat(TimingStats.documentId("my-job-id"), equalTo("my-job-id_timing_stats"));
125124
}
126125

127-
public void testTimingStatsDifferSignificantly() {
128-
assertThat(
129-
TimingStats.differSignificantly(
130-
new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0), new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0)),
131-
is(false));
132-
assertThat(
133-
TimingStats.differSignificantly(
134-
new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0), new TimingStats(JOB_ID, 10, 10.0, 11.0, 1.0, 10.0)),
135-
is(false));
136-
assertThat(
137-
TimingStats.differSignificantly(
138-
new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0), new TimingStats(JOB_ID, 10, 10.0, 12.0, 1.0, 10.0)),
139-
is(true));
140-
}
141-
142-
public void testValuesDifferSignificantly() {
143-
assertThat(TimingStats.differSignificantly((Double) null, (Double) null), is(false));
144-
assertThat(TimingStats.differSignificantly(1.0, null), is(true));
145-
assertThat(TimingStats.differSignificantly(null, 1.0), is(true));
146-
assertThat(TimingStats.differSignificantly(0.9, 1.0), is(false));
147-
assertThat(TimingStats.differSignificantly(1.0, 0.9), is(false));
148-
assertThat(TimingStats.differSignificantly(0.9, 1.000001), is(true));
149-
assertThat(TimingStats.differSignificantly(1.0, 0.899999), is(true));
150-
assertThat(TimingStats.differSignificantly(0.0, 1.0), is(true));
151-
assertThat(TimingStats.differSignificantly(1.0, 0.0), is(true));
152-
}
153-
154126
/**
155127
* Creates a matcher of {@link TimingStats}s that matches when an examined stats are equal
156128
* to the specified <code>operand</code>, within a range of +/- <code>error</code>.
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.ml.job.persistence;
7+
8+
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;
9+
10+
import java.util.Objects;
11+
12+
/**
13+
* {@link TimingStatsReporter} class handles the logic of persisting {@link TimingStats} if they changed significantly since the last time
14+
* they were persisted.
15+
*
16+
* This class is not thread-safe.
17+
*/
18+
public class TimingStatsReporter {
19+
20+
/** Persisted timing stats. May be stale. */
21+
private TimingStats persistedTimingStats;
22+
/** Current timing stats. */
23+
private TimingStats currentTimingStats;
24+
/** Object used to persist current timing stats. */
25+
private JobResultsPersister.Builder bulkResultsPersister;
26+
27+
public TimingStatsReporter(TimingStats timingStats, JobResultsPersister.Builder jobResultsPersister) {
28+
Objects.requireNonNull(timingStats);
29+
this.persistedTimingStats = new TimingStats(timingStats);
30+
this.currentTimingStats = new TimingStats(timingStats);
31+
this.bulkResultsPersister = Objects.requireNonNull(jobResultsPersister);
32+
}
33+
34+
public TimingStats getCurrentTimingStats() {
35+
return new TimingStats(currentTimingStats);
36+
}
37+
38+
public void reportBucketProcessingTime(long bucketProcessingTimeMs) {
39+
currentTimingStats.updateStats(bucketProcessingTimeMs);
40+
if (differSignificantly(currentTimingStats, persistedTimingStats)) {
41+
flush();
42+
}
43+
}
44+
45+
public void flush() {
46+
persistedTimingStats = new TimingStats(currentTimingStats);
47+
bulkResultsPersister.persistTimingStats(persistedTimingStats);
48+
}
49+
50+
/**
51+
* Returns true if given stats objects differ from each other by more than 10% for at least one of the statistics.
52+
*/
53+
public static boolean differSignificantly(TimingStats stats1, TimingStats stats2) {
54+
return differSignificantly(stats1.getMinBucketProcessingTimeMs(), stats2.getMinBucketProcessingTimeMs())
55+
|| differSignificantly(stats1.getMaxBucketProcessingTimeMs(), stats2.getMaxBucketProcessingTimeMs())
56+
|| differSignificantly(stats1.getAvgBucketProcessingTimeMs(), stats2.getAvgBucketProcessingTimeMs())
57+
|| differSignificantly(stats1.getExponentialAvgBucketProcessingTimeMs(), stats2.getExponentialAvgBucketProcessingTimeMs());
58+
}
59+
60+
/**
61+
* Returns {@code true} if one of the ratios { value1 / value2, value2 / value1 } is smaller than MIN_VALID_RATIO.
62+
* This can be interpreted as values { value1, value2 } differing significantly from each other.
63+
* This method also returns:
64+
* - {@code true} in case one value is {@code null} while the other is not.
65+
* - {@code false} in case both values are {@code null}.
66+
*/
67+
static boolean differSignificantly(Double value1, Double value2) {
68+
if (value1 != null && value2 != null) {
69+
return (value2 / value1 < MIN_VALID_RATIO) || (value1 / value2 < MIN_VALID_RATIO);
70+
}
71+
return (value1 != null) || (value2 != null);
72+
}
73+
74+
/**
75+
* Minimum ratio of values that is interpreted as values being similar.
76+
* If the values ratio is less than MIN_VALID_RATIO, the values are interpreted as significantly different.
77+
*/
78+
private static final double MIN_VALID_RATIO = 0.9;
79+
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -517,12 +517,13 @@ AutodetectCommunicator create(JobTask jobTask, Job job, AutodetectParams autodet
517517
jobId,
518518
renormalizer,
519519
jobResultsPersister,
520+
process,
520521
autodetectParams.modelSizeStats(),
521522
autodetectParams.timingStats());
522523
ExecutorService autodetectWorkerExecutor;
523524
try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) {
524525
autodetectWorkerExecutor = createAutodetectExecutorService(autodetectExecutorService);
525-
autodetectExecutorService.submit(() -> processor.process(process));
526+
autodetectExecutorService.submit(processor::process);
526527
} catch (EsRejectedExecutionException e) {
527528
// If submitting the operation to read the results from the process fails we need to close
528529
// the process too, so that other submitted operations to threadpool are stopped.

0 commit comments

Comments
 (0)