Skip to content

Commit 349a68e

Browse files
Add support for field level segment stats in opensearch stats api
Signed-off-by: Abhinav Tripathi <[email protected]>
1 parent 29b113a commit 349a68e

File tree

19 files changed

+756
-55
lines changed

19 files changed

+756
-55
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1414
- Add pluggable gRPC interceptors with explicit ordering([#19005](https://github.com/opensearch-project/OpenSearch/pull/19005))
1515
- Add metrics for the merged segment warmer feature ([#18929](https://github.com/opensearch-project/OpenSearch/pull/18929))
1616
- Add pointer based lag metric in pull-based ingestion ([#19635](https://github.com/opensearch-project/OpenSearch/pull/19635))
17+
- Add field-level segment statistics to _stats API ([#19747](https://github.com/opensearch-project/OpenSearch/pull/19747))
1718

1819
### Changed
1920
- Faster `terms` query creation for `keyword` field with index and docValues enabled ([#19350](https://github.com/opensearch-project/OpenSearch/pull/19350))

server/src/internalClusterTest/java/org/opensearch/index/autoforcemerge/AutoForceMergeManagerIT.java

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -86,15 +86,15 @@ public void testAutoForceMergeFeatureFlagDisabled() throws InterruptedException,
8686
assertNotNull(shard);
8787

8888
// Before stats
89-
SegmentsStats segmentsStatsBefore = shard.segmentStats(false, false);
89+
SegmentsStats segmentsStatsBefore = shard.segmentStats(false, false, false);
9090

9191
// This is to make sure auto force merge action gets triggered multiple times ang gets successful at least once.
9292
Thread.sleep(TimeValue.parseTimeValue(SCHEDULER_INTERVAL, "test").getMillis() * 3);
9393
// refresh to clear old segments
9494
flushAndRefresh(INDEX_NAME_1);
9595

9696
// After stats
97-
SegmentsStats segmentsStatsAfter = shard.segmentStats(false, false);
97+
SegmentsStats segmentsStatsAfter = shard.segmentStats(false, false, false);
9898
assertEquals(segmentsStatsBefore.getCount(), segmentsStatsAfter.getCount());
9999

100100
// Deleting the index (so that ref count drops to zero for all the files) and then pruning the cache to clear it to avoid any file
@@ -123,9 +123,9 @@ public void testAutoForceMergeTriggeringWithOneShardOfNonWarmCandidate() throws
123123
}
124124
IndexShard shard = getIndexShard(dataNode, INDEX_NAME_1);
125125
assertNotNull(shard);
126-
SegmentsStats segmentsStatsBefore = shard.segmentStats(false, false);
126+
SegmentsStats segmentsStatsBefore = shard.segmentStats(false, false, false);
127127
Thread.sleep(TimeValue.parseTimeValue(SCHEDULER_INTERVAL, "test").getMillis() * 3);
128-
SegmentsStats segmentsStatsAfter = shard.segmentStats(false, false);
128+
SegmentsStats segmentsStatsAfter = shard.segmentStats(false, false, false);
129129
assertEquals(segmentsStatsBefore.getCount(), segmentsStatsAfter.getCount());
130130
assertAcked(client().admin().indices().prepareDelete(INDEX_NAME_1).get());
131131
}
@@ -150,9 +150,9 @@ public void testAutoForceMergeTriggeringBasicWithOneShard() throws Exception {
150150
}
151151
IndexShard shard = getIndexShard(dataNode, INDEX_NAME_1);
152152
assertNotNull(shard);
153-
SegmentsStats segmentsStatsBefore = shard.segmentStats(false, false);
154-
waitUntil(() -> shard.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES);
155-
SegmentsStats segmentsStatsAfter = shard.segmentStats(false, false);
153+
SegmentsStats segmentsStatsBefore = shard.segmentStats(false, false, false);
154+
waitUntil(() -> shard.segmentStats(false, false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES);
155+
SegmentsStats segmentsStatsAfter = shard.segmentStats(false, false, false);
156156
assertTrue((int) segmentsStatsBefore.getCount() > segmentsStatsAfter.getCount());
157157
assertEquals((int) SEGMENT_COUNT, segmentsStatsAfter.getCount());
158158
assertAcked(client().admin().indices().prepareDelete(INDEX_NAME_1).get());
@@ -211,26 +211,26 @@ public void testAutoForceMergeTriggeringBasicWithFiveShardsOfTwoIndex() throws E
211211
assertNotNull(shard4);
212212
assertNotNull(shard5);
213213

214-
SegmentsStats segmentsStatsForShard1Before = shard1.segmentStats(false, false);
215-
SegmentsStats segmentsStatsForShard2Before = shard2.segmentStats(false, false);
216-
SegmentsStats segmentsStatsForShard3Before = shard3.segmentStats(false, false);
217-
SegmentsStats segmentsStatsForShard4Before = shard4.segmentStats(false, false);
218-
SegmentsStats segmentsStatsForShard5Before = shard5.segmentStats(false, false);
214+
SegmentsStats segmentsStatsForShard1Before = shard1.segmentStats(false, false, false);
215+
SegmentsStats segmentsStatsForShard2Before = shard2.segmentStats(false, false, false);
216+
SegmentsStats segmentsStatsForShard3Before = shard3.segmentStats(false, false, false);
217+
SegmentsStats segmentsStatsForShard4Before = shard4.segmentStats(false, false, false);
218+
SegmentsStats segmentsStatsForShard5Before = shard5.segmentStats(false, false, false);
219219
AtomicLong totalSegmentsBefore = new AtomicLong(
220220
segmentsStatsForShard1Before.getCount() + segmentsStatsForShard2Before.getCount() + segmentsStatsForShard3Before.getCount()
221221
+ segmentsStatsForShard4Before.getCount() + segmentsStatsForShard5Before.getCount()
222222
);
223223
assertTrue(totalSegmentsBefore.get() > 5);
224-
waitUntil(() -> shard1.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES);
225-
waitUntil(() -> shard2.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES);
226-
waitUntil(() -> shard3.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES);
227-
waitUntil(() -> shard4.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES);
228-
waitUntil(() -> shard5.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES);
229-
SegmentsStats segmentsStatsForShard1After = shard1.segmentStats(false, false);
230-
SegmentsStats segmentsStatsForShard2After = shard2.segmentStats(false, false);
231-
SegmentsStats segmentsStatsForShard3After = shard3.segmentStats(false, false);
232-
SegmentsStats segmentsStatsForShard4After = shard4.segmentStats(false, false);
233-
SegmentsStats segmentsStatsForShard5After = shard5.segmentStats(false, false);
224+
waitUntil(() -> shard1.segmentStats(false, false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES);
225+
waitUntil(() -> shard2.segmentStats(false, false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES);
226+
waitUntil(() -> shard3.segmentStats(false, false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES);
227+
waitUntil(() -> shard4.segmentStats(false, false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES);
228+
waitUntil(() -> shard5.segmentStats(false, false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES);
229+
SegmentsStats segmentsStatsForShard1After = shard1.segmentStats(false, false, false);
230+
SegmentsStats segmentsStatsForShard2After = shard2.segmentStats(false, false, false);
231+
SegmentsStats segmentsStatsForShard3After = shard3.segmentStats(false, false, false);
232+
SegmentsStats segmentsStatsForShard4After = shard4.segmentStats(false, false, false);
233+
SegmentsStats segmentsStatsForShard5After = shard5.segmentStats(false, false, false);
234234
AtomicLong totalSegmentsAfter = new AtomicLong(
235235
segmentsStatsForShard1After.getCount() + segmentsStatsForShard2After.getCount() + segmentsStatsForShard3After.getCount()
236236
+ segmentsStatsForShard4After.getCount() + segmentsStatsForShard5After.getCount()

server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStats.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,11 @@ public CommonStats(IndicesQueryCache indicesQueryCache, IndexShard indexShard, C
227227
completion = indexShard.completionStats(flags.completionDataFields());
228228
break;
229229
case Segments:
230-
segments = indexShard.segmentStats(flags.includeSegmentFileSizes(), flags.includeUnloadedSegments());
230+
segments = indexShard.segmentStats(
231+
flags.includeSegmentFileSizes(),
232+
flags.includeUnloadedSegments(),
233+
flags.includeFieldLevelSegmentFileSizes()
234+
);
231235
break;
232236
case Translog:
233237
translog = indexShard.translogStats();

server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStatsFlags.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public class CommonStatsFlags implements Writeable, Cloneable {
6262
private String[] completionDataFields = null;
6363
private boolean includeSegmentFileSizes = false;
6464
private boolean includeUnloadedSegments = false;
65+
private boolean includeFieldLevelSegmentFileSizes = false;
6566
private boolean includeAllShardIndexingPressureTrackers = false;
6667
private boolean includeOnlyTopIndexingPressureMetrics = false;
6768
// Used for metric CACHE_STATS, to determine which caches to report stats for
@@ -104,6 +105,9 @@ public CommonStatsFlags(StreamInput in) throws IOException {
104105
if (in.getVersion().onOrAfter(Version.V_2_17_0)) {
105106
includeIndicesStatsByLevel = in.readBoolean();
106107
}
108+
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
109+
includeFieldLevelSegmentFileSizes = in.readBoolean();
110+
}
107111
}
108112

109113
@Override
@@ -131,6 +135,9 @@ public void writeTo(StreamOutput out) throws IOException {
131135
if (out.getVersion().onOrAfter(Version.V_2_17_0)) {
132136
out.writeBoolean(includeIndicesStatsByLevel);
133137
}
138+
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
139+
out.writeBoolean(includeFieldLevelSegmentFileSizes);
140+
}
134141
}
135142

136143
/**
@@ -143,6 +150,7 @@ public CommonStatsFlags all() {
143150
completionDataFields = null;
144151
includeSegmentFileSizes = false;
145152
includeUnloadedSegments = false;
153+
includeFieldLevelSegmentFileSizes = false;
146154
includeAllShardIndexingPressureTrackers = false;
147155
includeOnlyTopIndexingPressureMetrics = false;
148156
includeCaches = EnumSet.allOf(CacheType.class);
@@ -160,6 +168,7 @@ public CommonStatsFlags clear() {
160168
completionDataFields = null;
161169
includeSegmentFileSizes = false;
162170
includeUnloadedSegments = false;
171+
includeFieldLevelSegmentFileSizes = false;
163172
includeAllShardIndexingPressureTrackers = false;
164173
includeOnlyTopIndexingPressureMetrics = false;
165174
includeCaches = EnumSet.noneOf(CacheType.class);
@@ -223,6 +232,11 @@ public CommonStatsFlags includeSegmentFileSizes(boolean includeSegmentFileSizes)
223232
return this;
224233
}
225234

235+
public CommonStatsFlags includeFieldLevelSegmentFileSizes(boolean includeFieldLevelSegmentFileSizes) {
236+
this.includeFieldLevelSegmentFileSizes = includeFieldLevelSegmentFileSizes;
237+
return this;
238+
}
239+
226240
public CommonStatsFlags includeUnloadedSegments(boolean includeUnloadedSegments) {
227241
this.includeUnloadedSegments = includeUnloadedSegments;
228242
return this;
@@ -269,6 +283,10 @@ public boolean includeSegmentFileSizes() {
269283
return this.includeSegmentFileSizes;
270284
}
271285

286+
public boolean includeFieldLevelSegmentFileSizes() {
287+
return this.includeFieldLevelSegmentFileSizes;
288+
}
289+
272290
public void setIncludeIndicesStatsByLevel(boolean includeIndicesStatsByLevel) {
273291
this.includeIndicesStatsByLevel = includeIndicesStatsByLevel;
274292
}

server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,11 @@ public IndicesStatsRequest includeSegmentFileSizes(boolean includeSegmentFileSiz
292292
return this;
293293
}
294294

295+
public IndicesStatsRequest includeFieldLevelSegmentFileSizes(boolean includeFieldLevelSegmentFileSizes) {
296+
flags.includeFieldLevelSegmentFileSizes(includeFieldLevelSegmentFileSizes);
297+
return this;
298+
}
299+
295300
public IndicesStatsRequest includeUnloadedSegments(boolean includeUnloadedSegments) {
296301
flags.includeUnloadedSegments(includeUnloadedSegments);
297302
return this;
Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common;
10+
11+
import org.opensearch.common.annotation.PublicApi;
12+
import org.opensearch.core.common.io.stream.StreamInput;
13+
import org.opensearch.core.common.io.stream.StreamOutput;
14+
import org.opensearch.core.common.io.stream.Writeable;
15+
import org.opensearch.core.common.unit.ByteSizeValue;
16+
import org.opensearch.core.xcontent.XContentBuilder;
17+
18+
import java.io.IOException;
19+
import java.util.HashMap;
20+
import java.util.Iterator;
21+
import java.util.Map;
22+
import java.util.Objects;
23+
24+
/**
25+
* A reusable class to encode field → {extension → file size} mappings for segment statistics.
26+
* This class provides per-field disk consumption breakdown by file extension.
27+
*
28+
* @opensearch.api
29+
*/
30+
@PublicApi(since = "3.0.0")
31+
public final class FieldFileStats implements Writeable, Iterable<Map.Entry<String, Map<String, Long>>> {
32+
33+
private final Map<String, Map<String, Long>> stats;
34+
35+
/**
36+
* Creates a new FieldFileStats instance
37+
*
38+
* @param stats the field to extension to size mapping
39+
*/
40+
public FieldFileStats(Map<String, Map<String, Long>> stats) {
41+
this.stats = Objects.requireNonNull(stats, "stats must be non-null");
42+
assert stats.containsKey(null) == false : "stats cannot contain null field names";
43+
}
44+
45+
/**
46+
* Creates a new FieldFileStats instance from a stream
47+
*
48+
* @param input the stream input
49+
* @throws IOException if an I/O error occurs
50+
*/
51+
public FieldFileStats(StreamInput input) throws IOException {
52+
int fieldCount = input.readVInt();
53+
this.stats = new HashMap<>(fieldCount);
54+
55+
for (int i = 0; i < fieldCount; i++) {
56+
String fieldName = input.readString();
57+
int extensionCount = input.readVInt();
58+
Map<String, Long> extensionSizes = new HashMap<>(extensionCount);
59+
60+
for (int j = 0; j < extensionCount; j++) {
61+
String extension = input.readString();
62+
long size = input.readVLong();
63+
extensionSizes.put(extension, size);
64+
}
65+
66+
stats.put(fieldName, extensionSizes);
67+
}
68+
}
69+
70+
/**
71+
* Adds / merges the given field file stats into this stats instance
72+
*
73+
* @param fieldFileStats the stats to merge
74+
*/
75+
public void add(FieldFileStats fieldFileStats) {
76+
for (final var fieldEntry : fieldFileStats.stats.entrySet()) {
77+
String fieldName = fieldEntry.getKey();
78+
Map<String, Long> otherExtensions = fieldEntry.getValue();
79+
80+
stats.compute(fieldName, (k, existingExtensions) -> {
81+
if (existingExtensions == null) {
82+
return new HashMap<>(otherExtensions);
83+
} else {
84+
for (Map.Entry<String, Long> extEntry : otherExtensions.entrySet()) {
85+
existingExtensions.merge(extEntry.getKey(), extEntry.getValue(), Long::sum);
86+
}
87+
return existingExtensions;
88+
}
89+
});
90+
}
91+
}
92+
93+
@Override
94+
public void writeTo(StreamOutput out) throws IOException {
95+
out.writeVInt(stats.size());
96+
97+
for (Map.Entry<String, Map<String, Long>> fieldEntry : stats.entrySet()) {
98+
out.writeString(fieldEntry.getKey());
99+
Map<String, Long> extensionSizes = fieldEntry.getValue();
100+
out.writeVInt(extensionSizes.size());
101+
102+
for (Map.Entry<String, Long> extEntry : extensionSizes.entrySet()) {
103+
out.writeString(extEntry.getKey());
104+
out.writeVLong(extEntry.getValue());
105+
}
106+
}
107+
}
108+
109+
/**
110+
* Generates x-content into the given builder for field-level file statistics.
111+
* Output format:
112+
* <pre>
113+
* "field_level_file_sizes": {
114+
* "fieldName": {
115+
* "extension": {
116+
* "size_in_bytes": 12345,
117+
* "size": "12kb"
118+
* }
119+
* }
120+
* }
121+
* </pre>
122+
*
123+
* @param builder the builder to generate on
124+
* @param key the top level key for this stats object
125+
* @throws IOException if an I/O error occurs
126+
*/
127+
public void toXContent(XContentBuilder builder, String key) throws IOException {
128+
builder.startObject(key);
129+
for (final var fieldEntry : stats.entrySet()) {
130+
builder.startObject(fieldEntry.getKey());
131+
for (final var extEntry : fieldEntry.getValue().entrySet()) {
132+
builder.startObject(extEntry.getKey());
133+
builder.humanReadableField("size_in_bytes", "size", new ByteSizeValue(extEntry.getValue()));
134+
builder.endObject();
135+
}
136+
builder.endObject();
137+
}
138+
builder.endObject();
139+
}
140+
141+
/**
142+
* Creates a deep copy of this stats instance
143+
*
144+
* @return a new copy of this FieldFileStats
145+
*/
146+
public FieldFileStats copy() {
147+
Map<String, Map<String, Long>> copiedStats = new HashMap<>(stats.size());
148+
for (Map.Entry<String, Map<String, Long>> entry : stats.entrySet()) {
149+
copiedStats.put(entry.getKey(), new HashMap<>(entry.getValue()));
150+
}
151+
return new FieldFileStats(copiedStats);
152+
}
153+
154+
@Override
155+
public boolean equals(Object o) {
156+
if (this == o) return true;
157+
if (o == null || getClass() != o.getClass()) return false;
158+
FieldFileStats that = (FieldFileStats) o;
159+
return Objects.equals(stats, that.stats);
160+
}
161+
162+
@Override
163+
public int hashCode() {
164+
return Objects.hash(stats);
165+
}
166+
167+
@Override
168+
public Iterator<Map.Entry<String, Map<String, Long>>> iterator() {
169+
return stats.entrySet().iterator();
170+
}
171+
172+
/**
173+
* Returns the file size in bytes for a specific field and extension,
174+
* or 0 if the field/extension combination is not present
175+
*
176+
* @param field the field name
177+
* @param extension the file extension
178+
* @return the size in bytes
179+
*/
180+
public long get(String field, String extension) {
181+
Map<String, Long> extensions = stats.get(field);
182+
return extensions != null ? extensions.getOrDefault(extension, 0L) : 0L;
183+
}
184+
185+
/**
186+
* Returns all extension sizes for a given field
187+
*
188+
* @param field the field name
189+
* @return map of extension to size, or null if field not present
190+
*/
191+
public Map<String, Long> getField(String field) {
192+
return stats.get(field);
193+
}
194+
195+
/**
196+
* Returns true if the given field is in the stats
197+
*
198+
* @param field the field name
199+
* @return true if field exists
200+
*/
201+
public boolean containsField(String field) {
202+
return stats.containsKey(field);
203+
}
204+
205+
/**
206+
* Returns the total number of fields tracked
207+
*
208+
* @return the number of fields
209+
*/
210+
public int getFieldCount() {
211+
return stats.size();
212+
}
213+
}

0 commit comments

Comments
 (0)