From 34b62c716eb8c9d2ddd543b14d959383f7c3a46f Mon Sep 17 00:00:00 2001 From: Abhinav Tripathi Date: Fri, 8 Aug 2025 01:53:01 +0530 Subject: [PATCH] Added code changes to support field level stat on segment Signed-off-by: Abhinav Tripathi --- CHANGELOG.md | 1 + .../rest-api-spec/api/indices.stats.json | 5 + .../rest-api-spec/api/nodes.stats.json | 5 + server/build.gradle | 5 +- .../AutoForceMergeManagerIT.java | 44 +- .../engine/FieldLevelSegmentStatsIT.java | 136 ++++ .../admin/indices/stats/CommonStats.java | 6 +- .../admin/indices/stats/CommonStatsFlags.java | 20 + .../indices/stats/IndicesStatsRequest.java | 9 + .../stats/IndicesStatsRequestBuilder.java | 5 + .../org/opensearch/index/engine/Engine.java | 42 +- .../FieldLevelSegmentStatsCalculator.java | 625 ++++++++++++++++++ .../index/engine/FieldLevelStatsCache.java | 77 +++ .../index/engine/InternalEngine.java | 14 + .../index/engine/NRTReplicationEngine.java | 14 + .../opensearch/index/engine/NoOpEngine.java | 13 +- .../index/engine/ReadOnlyEngine.java | 14 + .../index/engine/SegmentsStats.java | 52 ++ .../opensearch/index/shard/IndexShard.java | 20 +- .../admin/cluster/RestNodesStatsAction.java | 2 + .../admin/indices/RestIndicesStatsAction.java | 1 + .../AutoForceMergeManagerTests.java | 2 +- ...FieldLevelSegmentStatsCalculatorTests.java | 260 ++++++++ .../engine/FieldLevelStatsCacheTests.java | 248 +++++++ .../index/engine/InternalEngineTests.java | 47 +- .../engine/NRTReplicationEngineTests.java | 4 +- .../index/engine/NoOpEngineTests.java | 8 +- .../IndexLevelReplicationTests.java | 4 +- .../index/shard/IndexShardTests.java | 42 ++ ...alStorePeerRecoverySourceHandlerTests.java | 6 +- ...teStorePeerRecoverySourceHandlerTests.java | 2 +- .../indices/RestIndicesStatsActionTests.java | 34 + 32 files changed, 1711 insertions(+), 56 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/index/engine/FieldLevelSegmentStatsIT.java create mode 100644 server/src/main/java/org/opensearch/index/engine/FieldLevelSegmentStatsCalculator.java create mode 100644 server/src/main/java/org/opensearch/index/engine/FieldLevelStatsCache.java create mode 100644 server/src/test/java/org/opensearch/index/engine/FieldLevelSegmentStatsCalculatorTests.java create mode 100644 server/src/test/java/org/opensearch/index/engine/FieldLevelStatsCacheTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 79420dff89fd2..5e34e5cab3029 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 3.x] ### Added - Add temporal routing processors for time-based document routing ([#18920](https://github.com/opensearch-project/OpenSearch/issues/18920)) +* Added code changes to support field level stat on segment [18973](https://github.com/opensearch-project/OpenSearch/pull/18973) ### Changed - Add CompletionStage variants to methods in the Client Interface and default to ActionListener impl ([#18998](https://github.com/opensearch-project/OpenSearch/pull/18998)) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.stats.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.stats.json index 71ce6dbd443f0..7fa672be7754c 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.stats.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.stats.json @@ -123,6 +123,11 @@ "description":"Whether to report the aggregated disk usage of each one of the Lucene index files (only applies if segment 
stats are requested)", "default":false }, + "include_field_level_segment_file_sizes":{ + "type":"boolean", + "description":"Whether to report the disk usage of Lucene index files at the field level (only applies if segment stats are requested)", + "default":false + }, "include_unloaded_segments":{ "type":"boolean", "description":"If set to true segment stats will include stats for segments that are not currently loaded into memory", diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/nodes.stats.json b/rest-api-spec/src/main/resources/rest-api-spec/api/nodes.stats.json index a23b2e5428fb6..30d306e730596 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/nodes.stats.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/nodes.stats.json @@ -222,6 +222,11 @@ "type":"boolean", "description":"Whether to report the aggregated disk usage of each one of the Lucene index files (only applies if segment stats are requested)", "default":false + }, + "include_field_level_segment_file_sizes":{ + "type":"boolean", + "description":"Whether to report the disk usage of Lucene index files at the field level (only applies if segment stats are requested)", + "default":false } } } diff --git a/server/build.gradle b/server/build.gradle index 803d791295e71..23712b02e4a2f 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -395,10 +395,12 @@ tasks.named("sourcesJar").configure { tasks.register("japicmp", me.champeau.gradle.japicmp.JapicmpTask) { logger.info("Comparing public APIs from ${version} to ${japicmpCompareTarget}") // See please https://github.com/siom79/japicmp/issues/201 - compatibilityChangeExcludes = [ "METHOD_ABSTRACT_NOW_DEFAULT", "METHOD_ADDED_TO_INTERFACE" ] + compatibilityChangeExcludes = [ "METHOD_ABSTRACT_NOW_DEFAULT", "METHOD_ADDED_TO_INTERFACE", "METHOD_REMOVED_IN_SUPERCLASS" ] oldClasspath.from(files("${buildDir}/japicmp-target/opensearch-${japicmpCompareTarget}.jar")) newClasspath.from(tasks.named('jar')) + // Restrict to modified elements only and fail only on binary-incompatible changes onlyModified = true + onlyBinaryIncompatibleModified = true failOnModification = true ignoreMissingClasses = true failOnSourceIncompatibility = true @@ -406,6 +408,7 @@ tasks.register("japicmp", me.champeau.gradle.japicmp.JapicmpTask) { annotationExcludes = ['@org.opensearch.common.annotation.InternalApi', '@org.opensearch.common.annotation.ExperimentalApi'] txtOutputFile = layout.buildDirectory.file("reports/java-compatibility/report.txt") htmlOutputFile = layout.buildDirectory.file("reports/java-compatibility/report.html") + xmlOutputFile = layout.buildDirectory.file("reports/java-compatibility/report.xml") dependsOn downloadJapicmpCompareTarget } diff --git a/server/src/internalClusterTest/java/org/opensearch/index/autoforcemerge/AutoForceMergeManagerIT.java b/server/src/internalClusterTest/java/org/opensearch/index/autoforcemerge/AutoForceMergeManagerIT.java index 142e2da95653e..c01ad0348c236 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/autoforcemerge/AutoForceMergeManagerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/autoforcemerge/AutoForceMergeManagerIT.java @@ -86,7 +86,7 @@ public void testAutoForceMergeFeatureFlagDisabled() throws InterruptedException, assertNotNull(shard); // Before stats - SegmentsStats segmentsStatsBefore = shard.segmentStats(false, false); + SegmentsStats segmentsStatsBefore = shard.segmentStats(false, false, false); // This is to make sure auto force merge action gets triggered 
multiple times ang gets successful at least once. Thread.sleep(TimeValue.parseTimeValue(SCHEDULER_INTERVAL, "test").getMillis() * 3); @@ -94,7 +94,7 @@ public void testAutoForceMergeFeatureFlagDisabled() throws InterruptedException, flushAndRefresh(INDEX_NAME_1); // After stats - SegmentsStats segmentsStatsAfter = shard.segmentStats(false, false); + SegmentsStats segmentsStatsAfter = shard.segmentStats(false, false, false); assertEquals(segmentsStatsBefore.getCount(), segmentsStatsAfter.getCount()); // Deleting the index (so that ref count drops to zero for all the files) and then pruning the cache to clear it to avoid any file @@ -123,9 +123,9 @@ public void testAutoForceMergeTriggeringWithOneShardOfNonWarmCandidate() throws } IndexShard shard = getIndexShard(dataNode, INDEX_NAME_1); assertNotNull(shard); - SegmentsStats segmentsStatsBefore = shard.segmentStats(false, false); + SegmentsStats segmentsStatsBefore = shard.segmentStats(false, false, false); Thread.sleep(TimeValue.parseTimeValue(SCHEDULER_INTERVAL, "test").getMillis() * 3); - SegmentsStats segmentsStatsAfter = shard.segmentStats(false, false); + SegmentsStats segmentsStatsAfter = shard.segmentStats(false, false, false); assertEquals(segmentsStatsBefore.getCount(), segmentsStatsAfter.getCount()); assertAcked(client().admin().indices().prepareDelete(INDEX_NAME_1).get()); } @@ -150,9 +150,9 @@ public void testAutoForceMergeTriggeringBasicWithOneShard() throws Exception { } IndexShard shard = getIndexShard(dataNode, INDEX_NAME_1); assertNotNull(shard); - SegmentsStats segmentsStatsBefore = shard.segmentStats(false, false); - waitUntil(() -> shard.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES); - SegmentsStats segmentsStatsAfter = shard.segmentStats(false, false); + SegmentsStats segmentsStatsBefore = shard.segmentStats(false, false, false); + waitUntil(() -> shard.segmentStats(false, false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES); + SegmentsStats segmentsStatsAfter = shard.segmentStats(false, false, false); assertTrue((int) segmentsStatsBefore.getCount() > segmentsStatsAfter.getCount()); assertEquals((int) SEGMENT_COUNT, segmentsStatsAfter.getCount()); assertAcked(client().admin().indices().prepareDelete(INDEX_NAME_1).get()); @@ -211,26 +211,26 @@ public void testAutoForceMergeTriggeringBasicWithFiveShardsOfTwoIndex() throws E assertNotNull(shard4); assertNotNull(shard5); - SegmentsStats segmentsStatsForShard1Before = shard1.segmentStats(false, false); - SegmentsStats segmentsStatsForShard2Before = shard2.segmentStats(false, false); - SegmentsStats segmentsStatsForShard3Before = shard3.segmentStats(false, false); - SegmentsStats segmentsStatsForShard4Before = shard4.segmentStats(false, false); - SegmentsStats segmentsStatsForShard5Before = shard5.segmentStats(false, false); + SegmentsStats segmentsStatsForShard1Before = shard1.segmentStats(false, false, false); + SegmentsStats segmentsStatsForShard2Before = shard2.segmentStats(false, false, false); + SegmentsStats segmentsStatsForShard3Before = shard3.segmentStats(false, false, false); + SegmentsStats segmentsStatsForShard4Before = shard4.segmentStats(false, false, false); + SegmentsStats segmentsStatsForShard5Before = shard5.segmentStats(false, false, false); AtomicLong totalSegmentsBefore = new AtomicLong( segmentsStatsForShard1Before.getCount() + segmentsStatsForShard2Before.getCount() + segmentsStatsForShard3Before.getCount() + segmentsStatsForShard4Before.getCount() + segmentsStatsForShard5Before.getCount() ); 
assertTrue(totalSegmentsBefore.get() > 5); - waitUntil(() -> shard1.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES); - waitUntil(() -> shard2.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES); - waitUntil(() -> shard3.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES); - waitUntil(() -> shard4.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES); - waitUntil(() -> shard5.segmentStats(false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES); - SegmentsStats segmentsStatsForShard1After = shard1.segmentStats(false, false); - SegmentsStats segmentsStatsForShard2After = shard2.segmentStats(false, false); - SegmentsStats segmentsStatsForShard3After = shard3.segmentStats(false, false); - SegmentsStats segmentsStatsForShard4After = shard4.segmentStats(false, false); - SegmentsStats segmentsStatsForShard5After = shard5.segmentStats(false, false); + waitUntil(() -> shard1.segmentStats(false, false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES); + waitUntil(() -> shard2.segmentStats(false, false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES); + waitUntil(() -> shard3.segmentStats(false, false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES); + waitUntil(() -> shard4.segmentStats(false, false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES); + waitUntil(() -> shard5.segmentStats(false, false, false).getCount() == SEGMENT_COUNT, 1, TimeUnit.MINUTES); + SegmentsStats segmentsStatsForShard1After = shard1.segmentStats(false, false, false); + SegmentsStats segmentsStatsForShard2After = shard2.segmentStats(false, false, false); + SegmentsStats segmentsStatsForShard3After = shard3.segmentStats(false, false, false); + SegmentsStats segmentsStatsForShard4After = shard4.segmentStats(false, false, false); + SegmentsStats segmentsStatsForShard5After = shard5.segmentStats(false, false, false); AtomicLong totalSegmentsAfter = new AtomicLong( segmentsStatsForShard1After.getCount() + segmentsStatsForShard2After.getCount() + segmentsStatsForShard3After.getCount() + segmentsStatsForShard4After.getCount() + segmentsStatsForShard5After.getCount() diff --git a/server/src/internalClusterTest/java/org/opensearch/index/engine/FieldLevelSegmentStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/index/engine/FieldLevelSegmentStatsIT.java new file mode 100644 index 0000000000000..bdab0bf24132e --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/index/engine/FieldLevelSegmentStatsIT.java @@ -0,0 +1,136 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */
+
+package org.opensearch.index.engine;
+
+import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
+import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
+import org.opensearch.test.OpenSearchIntegTestCase;
+
+import java.util.Map;
+
+import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.greaterThan;
+
+/**
+ * Integration test for field-level segment statistics
+ */
+@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 2)
+public class FieldLevelSegmentStatsIT extends OpenSearchIntegTestCase {
+
+    /**
+     * Comprehensive test for field-level segment stats API covering single and multiple indices
+     */
+    public void testFieldLevelSegmentStatsEndToEnd() {
+        String index1 = "test-field-stats";
+        String index2 = "test-index-2";
+
+        // Create first index with explicit mapping
+        assertAcked(
+            prepareCreate(index1).setMapping(
+                "title",
+                "type=text",
+                "keyword_field",
+                "type=keyword",
+                "numeric_field",
+                "type=long",
+                "date_field",
+                "type=date"
+            )
+        );
+
+        // Create second index with different mapping
+        assertAcked(prepareCreate(index2).setMapping("field1", "type=text", "field2", "type=keyword"));
+
+        // Index documents to first index
+        int numDocs = 100;
+        for (int i = 0; i < numDocs; i++) {
+            client().prepareIndex(index1)
+                .setId(String.valueOf(i))
+                .setSource(
+                    "title",
+                    "Document title " + i,
+                    "keyword_field",
+                    "keyword_" + (i % 10),
+                    "numeric_field",
+                    i,
+                    "date_field",
+                    "2025-01-" + ((i % 28) + 1 < 10 ? "0" + ((i % 28) + 1) : String.valueOf((i % 28) + 1))
+                )
+                .get();
+        }
+
+        // Index documents to second index
+        for (int i = 0; i < 50; i++) {
+            client().prepareIndex(index2).setId(String.valueOf(i)).setSource("field1", "text " + i, "field2", "keyword" + i).get();
+        }
+
+        refresh(index1, index2);
+
+        // Test single index with field-level stats
+        IndicesStatsRequest singleIndexRequest = new IndicesStatsRequest();
+        singleIndexRequest.indices(index1);
+        singleIndexRequest.segments(true);
+        singleIndexRequest.includeFieldLevelSegmentFileSizes(true);
+
+        IndicesStatsResponse singleIndexResponse = client().admin().indices().stats(singleIndexRequest).actionGet();
+
+        SegmentsStats segmentStats = singleIndexResponse.getIndex(index1).getTotal().getSegments();
+        assertNotNull("Segment stats should not be null", segmentStats);
+
+        Map<String, Map<String, Long>> fieldLevelStats = segmentStats.getFieldLevelFileSizes();
+        assertNotNull("Field-level stats should not be null", fieldLevelStats);
+        assertFalse("Field-level stats should not be empty", fieldLevelStats.isEmpty());
+
+        // Verify expected fields are present
+        assertTrue("Should have stats for title", fieldLevelStats.containsKey("title"));
+        assertTrue("Should have stats for keyword_field", fieldLevelStats.containsKey("keyword_field"));
+        assertTrue("Should have stats for numeric_field", fieldLevelStats.containsKey("numeric_field"));
+        assertTrue("Should have stats for date_field", fieldLevelStats.containsKey("date_field"));
+
+        // Verify stats have positive values
+        for (Map.Entry<String, Map<String, Long>> fieldEntry : fieldLevelStats.entrySet()) {
+            assertFalse("Field " + fieldEntry.getKey() + " should have file stats", fieldEntry.getValue().isEmpty());
+            for (Map.Entry<String, Long> fileEntry : fieldEntry.getValue().entrySet()) {
+                assertThat("File sizes should be positive", fileEntry.getValue(), greaterThan(0L));
+            }
+        }
+
+        // Test multiple indices
+        IndicesStatsRequest multiIndexRequest = new IndicesStatsRequest();
+        multiIndexRequest.indices(index1,
index2); + multiIndexRequest.segments(true); + multiIndexRequest.includeFieldLevelSegmentFileSizes(true); + + IndicesStatsResponse multiIndexResponse = client().admin().indices().stats(multiIndexRequest).actionGet(); + + // Verify both indices have field-level stats + SegmentsStats stats1 = multiIndexResponse.getIndex(index1).getTotal().getSegments(); + assert stats1 != null; + assertFalse("Index1 field-level stats should not be empty", stats1.getFieldLevelFileSizes().isEmpty()); + assertTrue("Index1 should have stats for title", stats1.getFieldLevelFileSizes().containsKey("title")); + + SegmentsStats stats2 = multiIndexResponse.getIndex(index2).getTotal().getSegments(); + assert stats2 != null; + assertFalse("Index2 field-level stats should not be empty", stats2.getFieldLevelFileSizes().isEmpty()); + assertTrue("Index2 should have stats for field1", stats2.getFieldLevelFileSizes().containsKey("field1")); + assertTrue("Index2 should have stats for field2", stats2.getFieldLevelFileSizes().containsKey("field2")); + + // Test backward compatibility (without field-level stats) + IndicesStatsRequest normalRequest = new IndicesStatsRequest(); + normalRequest.indices(index1); + normalRequest.segments(true); + normalRequest.includeFieldLevelSegmentFileSizes(false); + + IndicesStatsResponse normalResponse = client().admin().indices().stats(normalRequest).actionGet(); + SegmentsStats normalSegmentStats = normalResponse.getIndex(index1).getTotal().getSegments(); + + assertNotNull("Normal segment stats should not be null", normalSegmentStats); + assertTrue("Normal request should not include field-level stats", normalSegmentStats.getFieldLevelFileSizes().isEmpty()); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStats.java b/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStats.java index 8bfeb13b253c3..6ab737f3f9be2 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStats.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStats.java @@ -227,7 +227,11 @@ public CommonStats(IndicesQueryCache indicesQueryCache, IndexShard indexShard, C completion = indexShard.completionStats(flags.completionDataFields()); break; case Segments: - segments = indexShard.segmentStats(flags.includeSegmentFileSizes(), flags.includeUnloadedSegments()); + segments = indexShard.segmentStats( + flags.includeSegmentFileSizes(), + flags.includeFieldLevelSegmentFileSizes(), + flags.includeUnloadedSegments() + ); break; case Translog: translog = indexShard.translogStats(); diff --git a/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStatsFlags.java b/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStatsFlags.java index 03fb55323feec..4cf2d1a54c9bd 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStatsFlags.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStatsFlags.java @@ -61,6 +61,7 @@ public class CommonStatsFlags implements Writeable, Cloneable { private String[] fieldDataFields = null; private String[] completionDataFields = null; private boolean includeSegmentFileSizes = false; + private boolean includeFieldLevelSegmentFileSizes = false; private boolean includeUnloadedSegments = false; private boolean includeAllShardIndexingPressureTrackers = false; private boolean includeOnlyTopIndexingPressureMetrics = false; @@ -94,6 +95,11 @@ public CommonStatsFlags(StreamInput in) throws IOException { 
fieldDataFields = in.readStringArray(); completionDataFields = in.readStringArray(); includeSegmentFileSizes = in.readBoolean(); + if (in.getVersion().onOrAfter(Version.V_3_2_0)) { + includeFieldLevelSegmentFileSizes = in.readBoolean(); + } else { + includeFieldLevelSegmentFileSizes = false; + } includeUnloadedSegments = in.readBoolean(); includeAllShardIndexingPressureTrackers = in.readBoolean(); includeOnlyTopIndexingPressureMetrics = in.readBoolean(); @@ -121,6 +127,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeStringArrayNullable(fieldDataFields); out.writeStringArrayNullable(completionDataFields); out.writeBoolean(includeSegmentFileSizes); + if (out.getVersion().onOrAfter(Version.V_3_2_0)) { + out.writeBoolean(includeFieldLevelSegmentFileSizes); + } out.writeBoolean(includeUnloadedSegments); out.writeBoolean(includeAllShardIndexingPressureTrackers); out.writeBoolean(includeOnlyTopIndexingPressureMetrics); @@ -142,6 +151,7 @@ public CommonStatsFlags all() { fieldDataFields = null; completionDataFields = null; includeSegmentFileSizes = false; + includeFieldLevelSegmentFileSizes = false; includeUnloadedSegments = false; includeAllShardIndexingPressureTrackers = false; includeOnlyTopIndexingPressureMetrics = false; @@ -159,6 +169,7 @@ public CommonStatsFlags clear() { fieldDataFields = null; completionDataFields = null; includeSegmentFileSizes = false; + includeFieldLevelSegmentFileSizes = false; includeUnloadedSegments = false; includeAllShardIndexingPressureTrackers = false; includeOnlyTopIndexingPressureMetrics = false; @@ -223,6 +234,11 @@ public CommonStatsFlags includeSegmentFileSizes(boolean includeSegmentFileSizes) return this; } + public CommonStatsFlags includeFieldLevelSegmentFileSizes(boolean includeFieldLevelSegmentFileSizes) { + this.includeFieldLevelSegmentFileSizes = includeFieldLevelSegmentFileSizes; + return this; + } + public CommonStatsFlags includeUnloadedSegments(boolean includeUnloadedSegments) { this.includeUnloadedSegments = includeUnloadedSegments; return this; @@ -269,6 +285,10 @@ public boolean includeSegmentFileSizes() { return this.includeSegmentFileSizes; } + public boolean includeFieldLevelSegmentFileSizes() { + return this.includeFieldLevelSegmentFileSizes; + } + public void setIncludeIndicesStatsByLevel(boolean includeIndicesStatsByLevel) { this.includeIndicesStatsByLevel = includeIndicesStatsByLevel; } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequest.java index c36e53098d166..091301aa2e200 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequest.java @@ -292,6 +292,15 @@ public IndicesStatsRequest includeSegmentFileSizes(boolean includeSegmentFileSiz return this; } + public boolean includeFieldLevelSegmentFileSizes() { + return flags.includeFieldLevelSegmentFileSizes(); + } + + public IndicesStatsRequest includeFieldLevelSegmentFileSizes(boolean includeFieldLevelSegmentFileSizes) { + flags.includeFieldLevelSegmentFileSizes(includeFieldLevelSegmentFileSizes); + return this; + } + public IndicesStatsRequest includeUnloadedSegments(boolean includeUnloadedSegments) { flags.includeUnloadedSegments(includeUnloadedSegments); return this; diff --git a/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java 
b/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java index 4b86016134c75..dbea53ac0738a 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java @@ -181,4 +181,9 @@ public IndicesStatsRequestBuilder setIncludeSegmentFileSizes(boolean includeSegm request.includeSegmentFileSizes(includeSegmentFileSizes); return this; } + + public IndicesStatsRequestBuilder setIncludeFieldLevelSegmentFileSizes(boolean includeFieldLevelSegmentFileSizes) { + request.includeFieldLevelSegmentFileSizes(includeFieldLevelSegmentFileSizes); + return this; + } } diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 92858ffc26902..bcd04eadd67ba 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -923,14 +923,18 @@ public final CommitStats commitStats() { /** * Global stats on segments. */ - public SegmentsStats segmentsStats(boolean includeSegmentFileSizes, boolean includeUnloadedSegments) { + public SegmentsStats segmentsStats( + boolean includeSegmentFileSizes, + boolean includeFieldLevelSegmentFileSizes, + boolean includeUnloadedSegments + ) { ensureOpen(); Set segmentName = new HashSet<>(); SegmentsStats stats = new SegmentsStats(); try (Searcher searcher = acquireSearcher("segments_stats", SearcherScope.INTERNAL)) { for (LeafReaderContext ctx : searcher.getIndexReader().getContext().leaves()) { SegmentReader segmentReader = Lucene.segmentReader(ctx.reader()); - fillSegmentStats(segmentReader, includeSegmentFileSizes, stats); + fillSegmentStats(segmentReader, includeSegmentFileSizes, includeFieldLevelSegmentFileSizes, stats); segmentName.add(segmentReader.getSegmentName()); } } @@ -939,7 +943,7 @@ public SegmentsStats segmentsStats(boolean includeSegmentFileSizes, boolean incl for (LeafReaderContext ctx : searcher.getIndexReader().getContext().leaves()) { SegmentReader segmentReader = Lucene.segmentReader(ctx.reader()); if (segmentName.contains(segmentReader.getSegmentName()) == false) { - fillSegmentStats(segmentReader, includeSegmentFileSizes, stats); + fillSegmentStats(segmentReader, includeSegmentFileSizes, includeFieldLevelSegmentFileSizes, stats); } } } @@ -947,6 +951,14 @@ public SegmentsStats segmentsStats(boolean includeSegmentFileSizes, boolean incl return stats; } + /** + * Backwards-compatible overload retained for binary compatibility. + * Delegates to the new method with field-level stats disabled by default. + */ + public SegmentsStats segmentsStats(boolean includeSegmentFileSizes, boolean includeUnloadedSegments) { + return segmentsStats(includeSegmentFileSizes, false, includeUnloadedSegments); + } + /** * @return Stats for pull-based ingestion. 
*/ @@ -970,12 +982,32 @@ protected TranslogDeletionPolicy getTranslogDeletionPolicy(EngineConfig engineCo ); } - protected void fillSegmentStats(SegmentReader segmentReader, boolean includeSegmentFileSizes, SegmentsStats stats) { + protected void fillSegmentStats( + SegmentReader segmentReader, + boolean includeSegmentFileSizes, + boolean includeFieldLevelSegmentFileSizes, + SegmentsStats stats + ) { stats.add(1); if (includeSegmentFileSizes) { // TODO: consider moving this to StoreStats stats.addFileSizes(getSegmentFileSizes(segmentReader)); } + if (includeFieldLevelSegmentFileSizes) { + try { + FieldLevelSegmentStatsCalculator calculator = new FieldLevelSegmentStatsCalculator(); + Map> fieldLevelStats = calculator.calculateFieldLevelStats(segmentReader); + stats.addFieldLevelFileSizes(fieldLevelStats); + } catch (Exception e) { + logger.warn( + () -> new ParameterizedMessage( + "Failed to calculate field-level segment statistics for segment [{}]", + segmentReader.getSegmentName() + ), + e + ); + } + } } boolean shouldCleanupUnreferencedFiles() { @@ -1162,7 +1194,7 @@ public boolean refreshNeeded() { return searcher.getDirectoryReader().isCurrent() == false; } } catch (IOException e) { - logger.error("failed to access searcher manager", e); + logger.error(() -> new ParameterizedMessage("failed to access searcher manager"), e); failEngine("failed to access searcher manager", e); throw new EngineException(shardId, "failed to access searcher manager", e); } finally { diff --git a/server/src/main/java/org/opensearch/index/engine/FieldLevelSegmentStatsCalculator.java b/server/src/main/java/org/opensearch/index/engine/FieldLevelSegmentStatsCalculator.java new file mode 100644 index 0000000000000..9b1e7e34c06ec --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/FieldLevelSegmentStatsCalculator.java @@ -0,0 +1,625 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.index.DocValuesType; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.IndexOptions; +import org.apache.lucene.index.PointValues; +import org.apache.lucene.index.SegmentCommitInfo; +import org.apache.lucene.index.SegmentReader; +import org.apache.lucene.index.Terms; +import org.apache.lucene.store.Directory; +import org.opensearch.common.Randomness; +import org.opensearch.core.common.unit.ByteSizeValue; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.NoSuchFileException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Calculates field-level segment statistics for Lucene index files with maximum accuracy. + * Uses Lucene's built-in statistics APIs and proportional attribution for precise calculations. + * Supports sampling for large segments to prevent OOM while maintaining statistical accuracy. 
+ * + * @opensearch.internal + */ +public class FieldLevelSegmentStatsCalculator { + private static final Logger logger = LogManager.getLogger(FieldLevelSegmentStatsCalculator.class); + + // Configuration constants + private static final long DEFAULT_LARGE_SEGMENT_THRESHOLD = 1024L * 1024L * 1024L; // 1GB default + private static final double DEFAULT_SAMPLING_RATE = 0.1; // 10% sampling for large segments + private static final int MIN_SAMPLE_FIELDS = 10; // Minimum fields to sample for accuracy + + private final long largeSegmentThreshold; + private final double samplingRate; + private final FieldLevelStatsCache cache = new FieldLevelStatsCache(); + + /** + * Default constructor with default sampling threshold + */ + public FieldLevelSegmentStatsCalculator() { + this(DEFAULT_LARGE_SEGMENT_THRESHOLD); + } + + /** + * Constructor with configurable sampling threshold + * @param largeSegmentThreshold Threshold in bytes for triggering sampling + */ + public FieldLevelSegmentStatsCalculator(long largeSegmentThreshold) { + this.largeSegmentThreshold = largeSegmentThreshold; + this.samplingRate = DEFAULT_SAMPLING_RATE; + } + + /** + * Calculate field-level statistics for a segment using actual file sizes and Lucene statistics + * @param segmentReader The segment reader to analyze + * @return Map of field names to their file type statistics (dvd, dvm, tim, tip, pos, dim, dii) + */ + public Map> calculateFieldLevelStats(SegmentReader segmentReader) { + return calculateFieldLevelStats(segmentReader, null, false); + } + + /** + * Calculate field-level statistics with options for field filtering and sampling + * @param segmentReader The segment reader to analyze + * @param fieldFilter Optional set of fields to include (null = all fields) + * @param forceSampling Force sampling even for small segments + * @return Map of field names to their file type statistics + */ + public Map> calculateFieldLevelStats( + SegmentReader segmentReader, + Set fieldFilter, + boolean forceSampling + ) { + long startTime = System.currentTimeMillis(); + + // Check cache first + Map> cachedStats = cache.get(segmentReader); + if (cachedStats != null && fieldFilter == null) { + return cachedStats; + } + + Map> fieldLevelStats = new ConcurrentHashMap<>(); + if (segmentReader.getFieldInfos().size() == 0) { + return fieldLevelStats; + } + + long segmentSize = estimateSegmentSize(segmentReader); + boolean shouldSample = forceSampling || segmentSize > largeSegmentThreshold; + + if (shouldSample) { + logger.info( + "Using sampling for large segment {} (size: {}, sampling rate: {}%)", + segmentReader.getSegmentName(), + new ByteSizeValue(segmentSize), + (int) (samplingRate * 100) + ); + fieldLevelStats = calculateWithSampling(segmentReader, fieldFilter); + } else { + Map segmentFileSizes = getSegmentFileSizes(segmentReader); + processFields(segmentReader, fieldFilter, segmentFileSizes, fieldLevelStats, 1.0); + } + + // Cache the results if no filter was applied + if (fieldFilter == null && !fieldLevelStats.isEmpty()) { + cache.put(segmentReader, fieldLevelStats); + } + + long elapsedTime = System.currentTimeMillis() - startTime; + if (elapsedTime > 5000) { + logger.warn( + "Field-level stats calculation took {} ms for segment {} (sampled: {})", + elapsedTime, + segmentReader.getSegmentName(), + shouldSample + ); + } + return fieldLevelStats; + } + + /** + * Calculate statistics using sampling for large segments + * @param segmentReader The segment reader + * @param fieldFilter Optional field filter + * @return Sampled and 
extrapolated field statistics + */ + private Map> calculateWithSampling(SegmentReader segmentReader, Set fieldFilter) { + Map> sampledStats = new ConcurrentHashMap<>(); + Map segmentFileSizes = getSegmentFileSizes(segmentReader); + + // Determine fields to sample + int totalFields = segmentReader.getFieldInfos().size(); + int fieldsToSample = Math.max(MIN_SAMPLE_FIELDS, (int) (totalFields * samplingRate)); + + // Ensure we don't sample more fields than exist + fieldsToSample = Math.min(fieldsToSample, totalFields); + // Use statistical sampling for field selection + Set sampledFields = selectFieldsForSampling(segmentReader, fieldsToSample, fieldFilter); + // Process only sampled fields + processFields(segmentReader, sampledFields, segmentFileSizes, sampledStats, 1.0 / samplingRate); + + // Add sampling metadata to indicate these are estimates + for (Map.Entry> entry : sampledStats.entrySet()) { + entry.getValue().put("_sampled", 1L); + entry.getValue().put("_sampling_rate", (long) (samplingRate * 100)); + } + + return sampledStats; + } + + /** + * Select fields for sampling using stratified sampling to ensure representation + * @param segmentReader The segment reader + * @param numFields Number of fields to sample + * @param fieldFilter Optional field filter + * @return Set of field names to sample + */ + private Set selectFieldsForSampling(SegmentReader segmentReader, int numFields, Set fieldFilter) { + Set sampledFields = ConcurrentHashMap.newKeySet(); + + // Group fields by type for stratified sampling + Map> fieldsByType = new HashMap<>(); + fieldsByType.put("docvalues", ConcurrentHashMap.newKeySet()); + fieldsByType.put("terms", ConcurrentHashMap.newKeySet()); + fieldsByType.put("points", ConcurrentHashMap.newKeySet()); + + for (FieldInfo fieldInfo : segmentReader.getFieldInfos()) { + if (fieldFilter != null && !fieldFilter.contains(fieldInfo.name)) { + continue; + } + + if (fieldInfo.getDocValuesType() != DocValuesType.NONE) { + fieldsByType.get("docvalues").add(fieldInfo.name); + } + if (fieldInfo.getIndexOptions() != IndexOptions.NONE) { + fieldsByType.get("terms").add(fieldInfo.name); + } + if (fieldInfo.getPointDimensionCount() > 0) { + fieldsByType.get("points").add(fieldInfo.name); + } + } + + // Stratified sampling: sample proportionally from each type + for (Map.Entry> typeEntry : fieldsByType.entrySet()) { + Set fieldsOfType = typeEntry.getValue(); + if (fieldsOfType.isEmpty()) continue; + + int samplesToTake = Math.max(1, (int) (numFields * ((double) fieldsOfType.size() / segmentReader.getFieldInfos().size()))); + + // Random sampling within stratum + fieldsOfType.stream() + .sorted((a, b) -> Randomness.get().nextBoolean() ? 
1 : -1) + .limit(samplesToTake) + .forEach(sampledFields::add); + } + return sampledFields; + } + + /** + * Get actual segment file sizes by extension + */ + private Map getSegmentFileSizes(SegmentReader segmentReader) { + Map fileSizes = new HashMap<>(); + try { + Directory directory = segmentReader.directory(); + if (directory != null) { + String[] files = directory.listAll(); + for (String file : files) { + try { + long size = directory.fileLength(file); + int idx = file.lastIndexOf('.'); + if (idx != -1) { + String ext = file.substring(idx + 1); + fileSizes.merge(ext, size, Long::sum); + } + } catch (NoSuchFileException | FileNotFoundException e) { + // File was deleted during processing, skip + } + } + } + } catch (IOException e) { + logger.debug(() -> new ParameterizedMessage("Failed to get segment file sizes for {}", segmentReader.getSegmentName()), e); + } + return fileSizes; + } + + /** + * Calculate DocValues statistics with proportional attribution + */ + private void calculateDocValuesStats( + SegmentReader reader, + FieldInfo fieldInfo, + Map stats, + Map segmentFileSizes, + double extrapolationFactor + ) { + Long actualDvdSize = segmentFileSizes.get("dvd"); + Long actualDvmSize = segmentFileSizes.get("dvm"); + + if (actualDvdSize != null && actualDvdSize > 0) { + long totalDocValuesSize = calculateTotalDocValuesSize(reader); + long fieldDocValuesSize = estimateFieldDocValuesSize(reader, fieldInfo); + if (totalDocValuesSize > 0 && fieldDocValuesSize > 0) { + long baseSize = (actualDvdSize * fieldDocValuesSize) / totalDocValuesSize; + stats.put("dvd", (long) (baseSize * extrapolationFactor)); + if (actualDvmSize != null) { + long metaSize = (actualDvmSize * fieldDocValuesSize) / totalDocValuesSize; + stats.put("dvm", (long) (metaSize * extrapolationFactor)); + } else { + stats.put("dvm", Math.max(1L, (long) (baseSize * extrapolationFactor / 10))); + } + } else { + stats.put("dvd", (long) (fieldDocValuesSize * extrapolationFactor)); + stats.put("dvm", Math.max(1L, (long) (fieldDocValuesSize * extrapolationFactor / 10))); + } + } else { + long estimatedSize = estimateFieldDocValuesSize(reader, fieldInfo); + if (estimatedSize > 0) { + stats.put("dvd", (long) (estimatedSize * extrapolationFactor)); + stats.put("dvm", Math.max(1L, (long) (estimatedSize * extrapolationFactor / 10))); + } + } + } + + /** + * Calculate PointValues statistics using Lucene's built-in APIs + */ + private void calculatePointValuesStats( + SegmentReader reader, + FieldInfo fieldInfo, + Map stats, + Map segmentFileSizes, + double extrapolationFactor + ) { + try { + PointValues pointValues = reader.getPointValues(fieldInfo.name); + if (pointValues == null) return; + + long numPoints = pointValues.size(); + long numDocs = pointValues.getDocCount(); + int bytesPerDim = pointValues.getBytesPerDimension(); + int numDims = pointValues.getNumIndexDimensions(); + + long pointsDataSize = numPoints * bytesPerDim * numDims; + long pointsIndexSize = Math.max(numDocs * 8, pointsDataSize / 4); + + if (pointsDataSize > 0) { + Long actualDimSize = segmentFileSizes.get("dim"); + Long actualDiiSize = segmentFileSizes.get("dii"); + + if (actualDimSize != null && actualDimSize > 0) { + long totalPointsSize = calculateTotalPointValuesSize(reader); + if (totalPointsSize > 0) { + long baseSize = (actualDimSize * pointsDataSize) / totalPointsSize; + stats.put("dim", (long) (baseSize * extrapolationFactor)); + if (actualDiiSize != null) { + long indexSize = (actualDiiSize * pointsDataSize) / totalPointsSize; + stats.put("dii", 
(long) (indexSize * extrapolationFactor)); + } else { + stats.put("dii", Math.max(1L, (long) (baseSize * extrapolationFactor / 4))); + } + } else { + stats.put("dim", (long) (pointsDataSize * extrapolationFactor)); + stats.put("dii", (long) (pointsIndexSize * extrapolationFactor)); + } + } else { + stats.put("dim", (long) (pointsDataSize * extrapolationFactor)); + stats.put("dii", (long) (pointsIndexSize * extrapolationFactor)); + } + } + } catch (IllegalArgumentException e) { + // Field doesn't have point values, skip silently + } catch (IOException e) { + logger.debug(() -> new ParameterizedMessage("Failed to calculate PointValues stats for field {}", fieldInfo.name), e); + } + } + + /** + * Calculate Terms statistics using Lucene's aggregate statistics APIs + */ + private void calculateTermStats( + SegmentReader reader, + FieldInfo fieldInfo, + Map stats, + Map segmentFileSizes, + double extrapolationFactor + ) throws IOException { + Terms terms = reader.terms(fieldInfo.name); + if (terms == null) return; + + long termCount = terms.size(); + long docCount = terms.getDocCount(); + long sumDocFreq = terms.getSumDocFreq(); + long sumTotalTermFreq = terms.getSumTotalTermFreq(); + long termDictSize; + long termIndexSize; + long positionsSize = 0; + + if (termCount > 0) { + // 15 bytes: average term length + overhead (based on Lucene's BytesRef) + // 8 bytes: doc ID (4 bytes) + frequency (4 bytes) per posting + termDictSize = termCount * 15; + if (sumDocFreq > 0) { + termDictSize += sumDocFreq * 8; + } else if (docCount > 0) { + termDictSize += docCount * 8; + } + termIndexSize = Math.max(1L, termDictSize / 10); + } else { + termDictSize = reader.maxDoc() * 20L; + termIndexSize = termDictSize / 10; + } + + if (fieldInfo.getIndexOptions().compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS) >= 0) { + if (sumTotalTermFreq > 0) { + positionsSize = sumTotalTermFreq * 4; + } else { + positionsSize = reader.maxDoc() * 8L; + } + } + + applyTermStatsWithAttribution(stats, termDictSize, termIndexSize, positionsSize, segmentFileSizes, reader, extrapolationFactor); + } + + /** + * Apply term statistics with proportional attribution to actual file sizes + */ + private void applyTermStatsWithAttribution( + Map stats, + long termDictSize, + long termIndexSize, + long positionsSize, + Map segmentFileSizes, + SegmentReader reader, + double extrapolationFactor + ) { + Long actualTimSize = segmentFileSizes.get("tim"); + Long actualTipSize = segmentFileSizes.get("tip"); + Long actualPosSize = segmentFileSizes.get("pos"); + + if (actualTimSize != null && actualTimSize > 0) { + long totalTermDictSize = calculateTotalTermDictSize(reader); + if (totalTermDictSize > 0) { + long baseSize = (actualTimSize * termDictSize) / totalTermDictSize; + stats.put("tim", (long) (baseSize * extrapolationFactor)); + } else { + stats.put("tim", (long) (termDictSize * extrapolationFactor)); + } + } else if (termDictSize > 0) { + stats.put("tim", (long) (termDictSize * extrapolationFactor)); + } + + if (actualTipSize != null && actualTipSize > 0) { + long timSize = stats.getOrDefault("tim", (long) (termDictSize * extrapolationFactor)); + // Index is typically 10% of dictionary size based on Lucene's FST compression + stats.put("tip", Math.max(1L, (long) (timSize / 10))); + } else if (termIndexSize > 0) { + stats.put("tip", (long) (termIndexSize * extrapolationFactor)); + } + + if (positionsSize > 0) { + if (actualPosSize != null && actualPosSize > 0) { + long totalPosSize = calculateTotalPositionsSize(reader); + if (totalPosSize > 0) 
{ + long baseSize = (actualPosSize * positionsSize) / totalPosSize; + stats.put("pos", (long) (baseSize * extrapolationFactor)); + } else { + stats.put("pos", (long) (positionsSize * extrapolationFactor)); + } + } else { + stats.put("pos", (long) (positionsSize * extrapolationFactor)); + } + } + } + + /** + * Estimate DocValues size for a single field + */ + private long estimateFieldDocValuesSize(SegmentReader reader, FieldInfo fieldInfo) { + int maxDoc = reader.maxDoc(); + return switch (fieldInfo.getDocValuesType()) { + case BINARY -> maxDoc * 32L; + case SORTED, SORTED_SET -> maxDoc * 16L; + default -> maxDoc * 8L; + }; + } + + /** + * Calculate total DocValues size across all fields for proportional attribution + */ + private long calculateTotalDocValuesSize(SegmentReader reader) { + long total = 0; + for (FieldInfo fieldInfo : reader.getFieldInfos()) { + if (fieldInfo.getDocValuesType() != DocValuesType.NONE) { + total += estimateFieldDocValuesSize(reader, fieldInfo); + } + } + return total; + } + + /** + * Calculate total PointValues size across all fields for proportional attribution + */ + private long calculateTotalPointValuesSize(SegmentReader reader) { + long total = 0; + for (FieldInfo fieldInfo : reader.getFieldInfos()) { + if (fieldInfo.getPointDimensionCount() > 0) { + try { + PointValues pv = reader.getPointValues(fieldInfo.name); + if (pv != null) { + total += pv.size() * pv.getBytesPerDimension() * pv.getNumIndexDimensions(); + } + } catch (Exception e) { + // Skip field on error + } + } + } + return total; + } + + /** + * Calculate total term dictionary size across all fields for proportional attribution + */ + private long calculateTotalTermDictSize(SegmentReader reader) { + long total = 0; + for (FieldInfo fieldInfo : reader.getFieldInfos()) { + if (fieldInfo.getIndexOptions() != IndexOptions.NONE) { + try { + Terms terms = reader.terms(fieldInfo.name); + if (terms != null) { + long termCount = terms.size(); + long sumDocFreq = terms.getSumDocFreq(); + if (termCount > 0) { + total += termCount * 15; + if (sumDocFreq > 0) { + total += sumDocFreq * 8; + } + } + } + } catch (IOException e) { + // Skip field on error + } + } + } + return total; + } + + /** + * Calculate total positions size across all fields for proportional attribution + */ + private long calculateTotalPositionsSize(SegmentReader reader) { + long total = 0; + for (FieldInfo fieldInfo : reader.getFieldInfos()) { + if (fieldInfo.getIndexOptions().compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS) >= 0) { + try { + Terms terms = reader.terms(fieldInfo.name); + if (terms != null) { + long sumTotalTermFreq = terms.getSumTotalTermFreq(); + if (sumTotalTermFreq > 0) { + total += sumTotalTermFreq * 4; + } + } + } catch (IOException e) { + // Skip field on error + } + } + } + return total; + } + + /** + * Process fields (with optional sampling) + */ + private void processFields( + SegmentReader segmentReader, + Set fieldFilter, + Map segmentFileSizes, + Map> fieldLevelStats, + double extrapolationFactor + ) { + for (FieldInfo fieldInfo : segmentReader.getFieldInfos()) { + if (fieldFilter != null && !fieldFilter.contains(fieldInfo.name)) { + continue; + } + + Map fieldStats = new HashMap<>(); + + try { + if (fieldInfo.getDocValuesType() != DocValuesType.NONE) { + calculateDocValuesStats(segmentReader, fieldInfo, fieldStats, segmentFileSizes, extrapolationFactor); + } + if (fieldInfo.getPointDimensionCount() > 0) { + calculatePointValuesStats(segmentReader, fieldInfo, fieldStats, segmentFileSizes, 
extrapolationFactor); + } + if (fieldInfo.getIndexOptions() != IndexOptions.NONE) { + calculateTermStats(segmentReader, fieldInfo, fieldStats, segmentFileSizes, extrapolationFactor); + } + if (!fieldStats.isEmpty()) { + fieldLevelStats.put(fieldInfo.name, fieldStats); + } + + } catch (OutOfMemoryError e) { + handleOutOfMemoryError(fieldInfo, segmentReader, e); + continue; + } catch (Exception e) { + handleCalculationError(fieldInfo, segmentReader, e); + } + } + } + + /** + * Get actual segment size using SegmentCommitInfo + */ + private long estimateSegmentSize(SegmentReader reader) { + try { + SegmentCommitInfo segmentInfo = reader.getSegmentInfo(); + if (segmentInfo != null) { + return segmentInfo.sizeInBytes(); + } + } catch (IOException e) { + logger.debug("Failed to get segment size for {}: {}", reader.getSegmentName(), e.getMessage()); + } catch (Exception e) { + logger.trace("Error getting segment size, using estimation", e); + } + + // Fallback to estimation based on document count + return reader.maxDoc() * 10240L; // Assume 10KB per doc average + } + + /** + * Handle out of memory errors + */ + private void handleOutOfMemoryError(FieldInfo fieldInfo, SegmentReader reader, OutOfMemoryError e) { + logger.warn( + () -> new ParameterizedMessage( + "Out of memory calculating stats for field [{}] in segment [{}], clearing cache", + fieldInfo.name, + reader.getSegmentName() + ), + e + ); + cache.clear(); + } + + /** + * Handle general calculation errors + */ + private void handleCalculationError(FieldInfo fieldInfo, SegmentReader reader, Exception e) { + logger.debug( + () -> new ParameterizedMessage( + "Failed to calculate stats for field [{}] in segment [{}]", + fieldInfo.name, + reader.getSegmentName() + ), + e + ); + } + + /** + * Get the current sampling configuration + * @return Map containing threshold and sampling rate + */ + public Map getSamplingConfig() { + Map config = new HashMap<>(); + config.put("threshold_bytes", largeSegmentThreshold); + config.put("threshold_gb", largeSegmentThreshold / (1024.0 * 1024.0 * 1024.0)); + config.put("sampling_rate", samplingRate); + config.put("min_sample_fields", MIN_SAMPLE_FIELDS); + return config; + } +} diff --git a/server/src/main/java/org/opensearch/index/engine/FieldLevelStatsCache.java b/server/src/main/java/org/opensearch/index/engine/FieldLevelStatsCache.java new file mode 100644 index 0000000000000..7192c5332a3d2 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/FieldLevelStatsCache.java @@ -0,0 +1,77 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */
+
+package org.opensearch.index.engine;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.lucene.index.SegmentReader;
+import org.opensearch.common.cache.Cache;
+import org.opensearch.common.cache.CacheBuilder;
+import org.opensearch.common.unit.TimeValue;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Cache for field-level segment statistics to avoid recalculation
+ * Uses OpenSearch's Cache with TTL-based expiration
+ *
+ * @opensearch.internal
+ */
+public class FieldLevelStatsCache {
+    private static final Logger logger = LogManager.getLogger(FieldLevelStatsCache.class);
+
+    // Cache configuration
+    private static final long DEFAULT_CACHE_SIZE = 100; // Max entries
+    private static final TimeValue DEFAULT_CACHE_EXPIRE = TimeValue.timeValueMinutes(30);
+
+    // Cache implementation using OpenSearch's Cache builder
+    private final Cache<String, Map<String, Map<String, Long>>> cache;
+
+    public FieldLevelStatsCache() {
+        this.cache = CacheBuilder.<String, Map<String, Map<String, Long>>>builder()
+            .setMaximumWeight(DEFAULT_CACHE_SIZE)
+            .setExpireAfterAccess(DEFAULT_CACHE_EXPIRE)
+            .build();
+    }
+
+    /**
+     * Get cached stats for a segment
+     * @param reader The segment reader
+     * @return Cached stats or null if not found
+     */
+    public Map<String, Map<String, Long>> get(SegmentReader reader) {
+        return cache.get(reader.getSegmentName());
+    }
+
+    /**
+     * Put stats into cache
+     * @param reader The segment reader
+     * @param fieldStats The calculated field statistics
+     */
+    public void put(SegmentReader reader, Map<String, Map<String, Long>> fieldStats) {
+        cache.put(reader.getSegmentName(), new ConcurrentHashMap<>(fieldStats));
+    }
+
+    /**
+     * Invalidate cache entry for a segment
+     * @param reader The segment reader
+     */
+    public void invalidate(SegmentReader reader) {
+        cache.invalidate(reader.getSegmentName());
+    }
+
+    /**
+     * Clear all cache entries
+     */
+    public void clear() {
+        cache.invalidateAll();
+        logger.info("Cleared field-level stats cache");
+    }
+}
diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
index fcc81335d4363..dd0820889d904 100644
--- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
@@ -563,6 +563,20 @@ public String getHistoryUUID() {
         return historyUUID;
     }
 
+    @Override
+    public SegmentsStats segmentsStats(boolean includeSegmentFileSizes, boolean includeUnloadedSegments) {
+        return super.segmentsStats(includeSegmentFileSizes, false, includeUnloadedSegments);
+    }
+
+    @Override
+    public SegmentsStats segmentsStats(
+        boolean includeSegmentFileSizes,
+        boolean includeFieldLevelSegmentFileSizes,
+        boolean includeUnloadedSegments
+    ) {
+        return super.segmentsStats(includeSegmentFileSizes, includeFieldLevelSegmentFileSizes, includeUnloadedSegments);
+    }
+
     /** returns the force merge uuid for the engine */
     @Nullable
     public String getForceMergeUUID() {
diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
index 1beac84b3d516..be4f09f32db2b 100644
--- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
@@ -144,6 +144,20 @@ public void onAfterTranslogSync() {
         }
     }
 
+    @Override
+    public SegmentsStats segmentsStats(boolean includeSegmentFileSizes, boolean includeUnloadedSegments) {
+        return
super.segmentsStats(includeSegmentFileSizes, false, includeUnloadedSegments); + } + + @Override + public SegmentsStats segmentsStats( + boolean includeSegmentFileSizes, + boolean includeFieldLevelSegmentFileSizes, + boolean includeUnloadedSegments + ) { + return super.segmentsStats(includeSegmentFileSizes, includeFieldLevelSegmentFileSizes, includeUnloadedSegments); + } + public void cleanUnreferencedFiles() throws IOException { replicaFileTracker.deleteUnreferencedFiles(store.directory().listAll()); } diff --git a/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java b/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java index 0af2c8d72b1fd..af8df0cb48ebb 100644 --- a/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java @@ -82,7 +82,7 @@ public NoOpEngine(EngineConfig config) { try (DirectoryReader reader = openDirectory(directory, config.getIndexSettings().isSoftDeleteEnabled(), config.getLeafSorter())) { for (LeafReaderContext ctx : reader.getContext().leaves()) { SegmentReader segmentReader = Lucene.segmentReader(ctx.reader()); - fillSegmentStats(segmentReader, true, segmentsStats); + fillSegmentStats(segmentReader, true, false, segmentsStats); } this.docsStats = docsStats(reader); } catch (IOException e) { @@ -137,16 +137,23 @@ public CacheHelper getReaderCacheHelper() { } @Override - public SegmentsStats segmentsStats(boolean includeSegmentFileSizes, boolean includeUnloadedSegments) { + public SegmentsStats segmentsStats( + boolean includeSegmentFileSizes, + boolean includeFieldLevelSegmentFileSizes, + boolean includeUnloadedSegments + ) { if (includeUnloadedSegments) { final SegmentsStats stats = new SegmentsStats(); stats.add(this.segmentsStats); if (includeSegmentFileSizes == false) { stats.clearFileSizes(); } + if (includeFieldLevelSegmentFileSizes == false) { + stats.clearFieldLevelFileSizes(); + } return stats; } else { - return super.segmentsStats(includeSegmentFileSizes, includeUnloadedSegments); + return super.segmentsStats(includeSegmentFileSizes, includeFieldLevelSegmentFileSizes, includeUnloadedSegments); } } diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index eba074e27f764..88f8a2e025809 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -196,6 +196,20 @@ protected boolean assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, fi return true; } + @Override + public SegmentsStats segmentsStats(boolean includeSegmentFileSizes, boolean includeUnloadedSegments) { + return super.segmentsStats(includeSegmentFileSizes, false, includeUnloadedSegments); + } + + @Override + public SegmentsStats segmentsStats( + boolean includeSegmentFileSizes, + boolean includeFieldLevelSegmentFileSizes, + boolean includeUnloadedSegments + ) { + return super.segmentsStats(includeSegmentFileSizes, includeFieldLevelSegmentFileSizes, includeUnloadedSegments); + } + @Override public void verifyEngineBeforeIndexClosing() throws IllegalStateException { // the value of the global checkpoint is verified when the read-only engine is opened, diff --git a/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java b/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java index 653907862b18c..77ac798385f77 100644 --- 
a/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java +++ b/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java @@ -64,6 +64,7 @@ public class SegmentsStats implements Writeable, ToXContentFragment { private long maxUnsafeAutoIdTimestamp = Long.MIN_VALUE; private long bitsetMemoryInBytes; private final Map<String, Long> fileSizes; + private final Map<String, Map<String, Long>> fieldLevelFileSizes; private final RemoteSegmentStats remoteSegmentStats; private static final ByteSizeValue ZERO_BYTE_SIZE_VALUE = new ByteSizeValue(0L); @@ -107,6 +108,7 @@ public class SegmentsStats implements Writeable, ToXContentFragment { public SegmentsStats() { fileSizes = new HashMap<>(); + fieldLevelFileSizes = new HashMap<>(); remoteSegmentStats = new RemoteSegmentStats(); replicationStats = new ReplicationStats(); } @@ -129,6 +131,15 @@ public SegmentsStats(StreamInput in) throws IOException { bitsetMemoryInBytes = in.readLong(); maxUnsafeAutoIdTimestamp = in.readLong(); fileSizes = in.readMap(StreamInput::readString, StreamInput::readLong); + // Field-level file sizes (added in 3.2.0) + if (in.getVersion().onOrAfter(Version.V_3_2_0)) { + fieldLevelFileSizes = in.readMap( + StreamInput::readString, + input -> input.readMap(StreamInput::readString, StreamInput::readLong) + ); + } else { + fieldLevelFileSizes = new HashMap<>(); + } if (in.getVersion().onOrAfter(Version.V_2_10_0)) { remoteSegmentStats = in.readOptionalWriteable(RemoteSegmentStats::new); replicationStats = in.readOptionalWriteable(ReplicationStats::new); @@ -184,6 +195,7 @@ public void add(SegmentsStats mergeStats) { addVersionMapMemoryInBytes(mergeStats.versionMapMemoryInBytes); addBitsetMemoryInBytes(mergeStats.bitsetMemoryInBytes); addFileSizes(mergeStats.fileSizes); + addFieldLevelFileSizes(mergeStats.fieldLevelFileSizes); addRemoteSegmentStats(mergeStats.remoteSegmentStats); addReplicationStats(mergeStats.replicationStats); } @@ -275,6 +287,23 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.endObject(); } builder.endObject(); + + // Add field-level file sizes if available + if (!fieldLevelFileSizes.isEmpty()) { + builder.startObject(Fields.FIELD_LEVEL_FILE_SIZES); + for (Map.Entry<String, Map<String, Long>> fieldEntry : fieldLevelFileSizes.entrySet()) { + builder.startObject(fieldEntry.getKey()); + for (Map.Entry<String, Long> fileEntry : fieldEntry.getValue().entrySet()) { + builder.startObject(fileEntry.getKey()); + builder.humanReadableField(Fields.SIZE_IN_BYTES, Fields.SIZE, new ByteSizeValue(fileEntry.getValue())); + builder.field(Fields.DESCRIPTION, FILE_DESCRIPTIONS.getOrDefault(fileEntry.getKey(), "Others")); + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); return builder; } @@ -309,6 +338,7 @@ static final class Fields { static final String FIXED_BIT_SET = "fixed_bit_set"; static final String FIXED_BIT_SET_MEMORY_IN_BYTES = "fixed_bit_set_memory_in_bytes"; static final String FILE_SIZES = "file_sizes"; + static final String FIELD_LEVEL_FILE_SIZES = "field_level_file_sizes"; static final String SIZE = "size"; static final String SIZE_IN_BYTES = "size_in_bytes"; static final String DESCRIPTION = "description"; @@ -333,6 +363,14 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(bitsetMemoryInBytes); out.writeLong(maxUnsafeAutoIdTimestamp); out.writeMap(this.fileSizes, StreamOutput::writeString, StreamOutput::writeLong); + // Field-level file sizes (added in 3.2.0) + if (out.getVersion().onOrAfter(Version.V_3_2_0)) { + out.writeMap( + 
this.fieldLevelFileSizes, + StreamOutput::writeString, + (output, map) -> output.writeMap(map, StreamOutput::writeString, StreamOutput::writeLong) + ); + } if (out.getVersion().onOrAfter(Version.V_2_10_0)) { out.writeOptionalWriteable(remoteSegmentStats); out.writeOptionalWriteable(replicationStats); @@ -343,6 +381,20 @@ public void clearFileSizes() { fileSizes.clear(); } + public Map<String, Map<String, Long>> getFieldLevelFileSizes() { + return fieldLevelFileSizes; + } + + public void addFieldLevelFileSizes(Map<String, Map<String, Long>> fieldLevelStats) { + if (fieldLevelStats != null) { + this.fieldLevelFileSizes.putAll(fieldLevelStats); + } + } + + public void clearFieldLevelFileSizes() { + fieldLevelFileSizes.clear(); + } + /** * Used only for deprecating memory tracking in REST interface * todo remove in OpenSearch 3.0 diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 9edbddbbc3684..a3e299fde7bde 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1564,8 +1564,16 @@ public MergeStats mergeStats() { return mergeStats; } - public SegmentsStats segmentStats(boolean includeSegmentFileSizes, boolean includeUnloadedSegments) { - SegmentsStats segmentsStats = getEngine().segmentsStats(includeSegmentFileSizes, includeUnloadedSegments); + public SegmentsStats segmentStats( + boolean includeSegmentFileSizes, + boolean includeFieldLevelSegmentFileSizes, + boolean includeUnloadedSegments + ) { + SegmentsStats segmentsStats = getEngine().segmentsStats( + includeSegmentFileSizes, + includeFieldLevelSegmentFileSizes, + includeUnloadedSegments + ); segmentsStats.addBitsetMemoryInBytes(shardBitsetFilterCache.getMemorySizeInBytes()); // Populate remote_store stats only if the index is remote store backed if (indexSettings().isAssignedOnRemoteNode()) { @@ -1579,6 +1587,14 @@ public SegmentsStats segmentStats(boolean includeSegmentFileSizes, boolean inclu return segmentsStats; } + /** + * Backwards-compatible overload retained for binary compatibility. + * Delegates to the new method with field-level stats disabled by default. 
+ */ + public SegmentsStats segmentStats(boolean includeSegmentFileSizes, boolean includeUnloadedSegments) { + return segmentStats(includeSegmentFileSizes, false, includeUnloadedSegments); + } + public WarmerStats warmerStats() { return shardWarmerService.stats(); } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java index 883c2827118e0..8cc925ad89f76 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java @@ -220,6 +220,8 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC } if (nodesStatsRequest.indices().isSet(Flag.Segments)) { nodesStatsRequest.indices().includeSegmentFileSizes(request.paramAsBoolean("include_segment_file_sizes", false)); + nodesStatsRequest.indices() + .includeFieldLevelSegmentFileSizes(request.paramAsBoolean("include_field_level_segment_file_sizes", false)); } if (request.hasParam("include_all")) { nodesStatsRequest.indices().includeAllShardIndexingPressureTrackers(request.paramAsBoolean("include_all", false)); diff --git a/server/src/main/java/org/opensearch/rest/action/admin/indices/RestIndicesStatsAction.java b/server/src/main/java/org/opensearch/rest/action/admin/indices/RestIndicesStatsAction.java index 233b40f29384d..fb6f3477b4663 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/indices/RestIndicesStatsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/indices/RestIndicesStatsAction.java @@ -157,6 +157,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC if (indicesStatsRequest.segments()) { indicesStatsRequest.includeSegmentFileSizes(request.paramAsBoolean("include_segment_file_sizes", false)); + indicesStatsRequest.includeFieldLevelSegmentFileSizes(request.paramAsBoolean("include_field_level_segment_file_sizes", false)); indicesStatsRequest.includeUnloadedSegments(request.paramAsBoolean("include_unloaded_segments", false)); } diff --git a/server/src/test/java/org/opensearch/index/autoforcemerge/AutoForceMergeManagerTests.java b/server/src/test/java/org/opensearch/index/autoforcemerge/AutoForceMergeManagerTests.java index 8f33c3534423a..04933264408f2 100644 --- a/server/src/test/java/org/opensearch/index/autoforcemerge/AutoForceMergeManagerTests.java +++ b/server/src/test/java/org/opensearch/index/autoforcemerge/AutoForceMergeManagerTests.java @@ -569,7 +569,7 @@ private IndexShard getShard(String indexName, TranslogStats translogStats, Integ ShardRouting shardRouting = mock(ShardRouting.class); when(shard.shardId()).thenReturn(shardId1); when(shard.translogStats()).thenReturn(translogStats); - when(shard.segmentStats(false, false)).thenReturn(segmentsStats); + when(shard.segmentStats(false, false, false)).thenReturn(segmentsStats); when(shard.routingEntry()).thenReturn(shardRouting); when(shardRouting.primary()).thenReturn(true); when(shard.state()).thenReturn(IndexShardState.STARTED); diff --git a/server/src/test/java/org/opensearch/index/engine/FieldLevelSegmentStatsCalculatorTests.java b/server/src/test/java/org/opensearch/index/engine/FieldLevelSegmentStatsCalculatorTests.java new file mode 100644 index 0000000000000..56aed863cf578 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/engine/FieldLevelSegmentStatsCalculatorTests.java @@ -0,0 +1,260 @@ +/* + * 
SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.IntPoint; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.SortedDocValuesField; +import org.apache.lucene.document.StringField; +import org.apache.lucene.document.TextField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.SegmentReader; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.BytesRef; +import org.opensearch.action.admin.indices.stats.CommonStatsFlags; +import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.lucene.Lucene; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; + +/** + * Unit tests for FieldLevelSegmentStatsCalculator + */ +public class FieldLevelSegmentStatsCalculatorTests extends OpenSearchTestCase { + + /** + * Test core calculation functionality including basic stats, field filtering, + * proportional attribution, and edge cases + */ + public void testCoreCalculationAndFiltering() throws IOException { + try (Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER))) { + // Add documents with various field types + for (int i = 0; i < 100; i++) { + Document doc = new Document(); + doc.add(new TextField("text_field", "content " + i + " with text", Field.Store.YES)); + doc.add(new StringField("keyword_field", "keyword" + i, Field.Store.NO)); + doc.add(new NumericDocValuesField("numeric_docvalues", i)); + doc.add(new IntPoint("int_point", i)); + doc.add(new SortedDocValuesField("sorted_docvalues", new BytesRef("sorted" + i))); + // Sparse field - only in some docs + if (i % 10 == 0) { + doc.add(new TextField("sparse_field", "sparse " + i, Field.Store.NO)); + } + writer.addDocument(doc); + } + writer.commit(); + + try (DirectoryReader reader = DirectoryReader.open(writer)) { + SegmentReader segmentReader = (SegmentReader) reader.leaves().get(0).reader(); + FieldLevelSegmentStatsCalculator calculator = new FieldLevelSegmentStatsCalculator(); + + // Test basic calculation + Map<String, Map<String, Long>> stats = calculator.calculateFieldLevelStats(segmentReader); + assertNotNull("Stats should not be null", stats); + assertFalse("Stats should not be empty", stats.isEmpty()); + + // Verify all field types have appropriate stats + assertTrue("Should have text field", stats.containsKey("text_field")); + assertTrue("Should have keyword field", stats.containsKey("keyword_field")); + assertTrue("Should have numeric docvalues", stats.containsKey("numeric_docvalues")); + assertTrue("Should have sorted docvalues", stats.containsKey("sorted_docvalues")); + assertTrue("Should have point values", stats.containsKey("int_point")); + assertTrue("Should have sparse field", stats.containsKey("sparse_field")); 
+ + // Verify proportional attribution (sparse field should be smaller) + Map<String, Long> sparseStats = stats.get("sparse_field"); + Map<String, Long> textStats = stats.get("text_field"); + assertTrue( + "Sparse field should have smaller stats", + sparseStats.getOrDefault("tim", 0L) < textStats.getOrDefault("tim", 0L) + ); + + // Test field filtering + Set<String> filter = Set.of("text_field", "numeric_docvalues"); + Map<String, Map<String, Long>> filteredStats = calculator.calculateFieldLevelStats(segmentReader, filter, false); + assertEquals("Should only have filtered fields", 2, filteredStats.size()); + assertTrue("Should have text_field", filteredStats.containsKey("text_field")); + assertTrue("Should have numeric_docvalues", filteredStats.containsKey("numeric_docvalues")); + assertFalse("Should not have keyword_field", filteredStats.containsKey("keyword_field")); + + // Test empty segment edge case + writer.deleteAll(); + writer.commit(); + } + } + } + + /** + * Test sampling for large segments with reduced complexity + */ + public void testSamplingAndLargeSegments() throws IOException { + try (Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER))) { + // Create a large segment + for (int i = 0; i < 10000; i++) { + Document doc = new Document(); + doc.add(new TextField("text", "Document " + i, Field.Store.NO)); + doc.add(new NumericDocValuesField("numeric", i)); + for (int j = 0; j < 10; j++) { + doc.add(new TextField("field_" + j, "content_" + i, Field.Store.NO)); + } + writer.addDocument(doc); + } + writer.commit(); + + try (DirectoryReader reader = DirectoryReader.open(writer)) { + SegmentReader segmentReader = (SegmentReader) reader.leaves().get(0).reader(); + // Use low threshold to trigger sampling + FieldLevelSegmentStatsCalculator calculator = new FieldLevelSegmentStatsCalculator(1024 * 1024); + + Map<String, Map<String, Long>> sampledStats = calculator.calculateFieldLevelStats(segmentReader, null, true); + assertNotNull("Sampled stats should not be null", sampledStats); + assertFalse("Sampled stats should not be empty", sampledStats.isEmpty()); + + // Verify sampling metadata + for (Map<String, Long> fieldStats : sampledStats.values()) { + assertEquals("Should be marked as sampled", 1L, (long) fieldStats.get("_sampled")); + assertEquals("Should show 10% sampling rate", 10L, (long) fieldStats.get("_sampling_rate")); + } + + // Verify reasonable performance + long start = System.currentTimeMillis(); + calculator.calculateFieldLevelStats(segmentReader, null, true); + long elapsed = System.currentTimeMillis() - start; + assertTrue("Sampling should be fast", elapsed < 5000); + } + } + } + + /** + * Test comprehensive error handling including memory constraints and edge cases + */ + public void testComprehensiveErrorHandling() throws IOException { + // Test with actual segments that trigger error handling paths + try (Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER))) { + // Create a document + Document doc = new Document(); + doc.add(new TextField("test_field", "test content", Field.Store.NO)); + doc.add(new NumericDocValuesField("numeric", 42)); + writer.addDocument(doc); + writer.commit(); + + try (DirectoryReader reader = DirectoryReader.open(writer)) { + SegmentReader segmentReader = (SegmentReader) reader.leaves().get(0).reader(); + + // Test with very low memory threshold to trigger sampling + FieldLevelSegmentStatsCalculator lowMemCalculator = new FieldLevelSegmentStatsCalculator(1); + Map<String, Map<String, Long>> sampledStats = 
lowMemCalculator.calculateFieldLevelStats(segmentReader, null, true); + + // Should handle memory constraints with sampling + assertNotNull("Should handle low memory with sampling", sampledStats); + for (Map<String, Long> fieldStats : sampledStats.values()) { + assertEquals("Should be marked as sampled", 1L, (long) fieldStats.get("_sampled")); + } + + // Test with standard calculator + FieldLevelSegmentStatsCalculator calculator = new FieldLevelSegmentStatsCalculator(); + Map<String, Map<String, Long>> stats = calculator.calculateFieldLevelStats(segmentReader); + assertNotNull("Should calculate stats normally", stats); + + // Test with field filter on non-existent field + Set<String> nonExistentFilter = Set.of("non_existent_field"); + Map<String, Map<String, Long>> filteredStats = calculator.calculateFieldLevelStats(segmentReader, nonExistentFilter, false); + assertTrue("Should return empty for non-existent fields", filteredStats.isEmpty()); + + // Test with empty field filter + Set<String> emptyFilter = Set.of(); + Map<String, Map<String, Long>> emptyFilterStats = calculator.calculateFieldLevelStats(segmentReader, emptyFilter, false); + assertTrue("Should return empty for empty filter", emptyFilterStats.isEmpty()); + } + } + } + + /** + * Test caching behavior + */ + public void testCachingBehavior() throws IOException { + try (Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER))) { + for (int i = 0; i < 50; i++) { + Document doc = new Document(); + doc.add(new TextField("field", "text " + i, Field.Store.YES)); + writer.addDocument(doc); + } + writer.commit(); + + try (DirectoryReader reader = DirectoryReader.open(writer)) { + SegmentReader segmentReader = (SegmentReader) reader.leaves().get(0).reader(); + FieldLevelSegmentStatsCalculator calculator = new FieldLevelSegmentStatsCalculator(); + + Map<String, Map<String, Long>> stats1 = calculator.calculateFieldLevelStats(segmentReader); + Map<String, Map<String, Long>> stats2 = calculator.calculateFieldLevelStats(segmentReader); + assertEquals("Cached results should match", stats1, stats2); + + // Field filtering should bypass cache + Set<String> filter = Set.of("field"); + Map<String, Map<String, Long>> filtered = calculator.calculateFieldLevelStats(segmentReader, filter, false); + assertEquals("Filtered stats should have one field", 1, filtered.size()); + } + } + } + + /** + * Test integration with stats classes including serialization and XContent + */ + public void testIntegrationWithStatsClasses() throws IOException { + // Test SegmentsStats integration and serialization + SegmentsStats stats = new SegmentsStats(); + Map<String, Map<String, Long>> fieldStats = Map.of( + "text_field", + Map.of("tim", 1000L, "tip", 100L, "_sampled", 1L), + "numeric_field", + Map.of("dvd", 800L, "dvm", 80L) + ); + stats.addFieldLevelFileSizes(fieldStats); + + // Test binary serialization + BytesStreamOutput output = new BytesStreamOutput(); + stats.writeTo(output); + StreamInput input = output.bytes().streamInput(); + SegmentsStats deserialized = new SegmentsStats(input); + assertEquals("Deserialized stats should match", fieldStats, deserialized.getFieldLevelFileSizes()); + + // Test XContent serialization + XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.startObject(); + stats.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + String json = builder.toString(); + assertTrue("JSON should contain field_level_file_sizes", json.contains("field_level_file_sizes")); + assertTrue("JSON should contain sampling metadata", json.contains("_sampled")); + + // Test CommonStatsFlags + CommonStatsFlags flags = new CommonStatsFlags(); + assertFalse("Should be false by default", 
flags.includeFieldLevelSegmentFileSizes()); + flags.includeFieldLevelSegmentFileSizes(true); + assertTrue("Should be true after setting", flags.includeFieldLevelSegmentFileSizes()); + + // Test IndicesStatsRequest + IndicesStatsRequest request = new IndicesStatsRequest(); + assertFalse("Should be false by default", request.includeFieldLevelSegmentFileSizes()); + request.includeFieldLevelSegmentFileSizes(true); + assertTrue("Should be true after setting", request.includeFieldLevelSegmentFileSizes()); + } +} diff --git a/server/src/test/java/org/opensearch/index/engine/FieldLevelStatsCacheTests.java b/server/src/test/java/org/opensearch/index/engine/FieldLevelStatsCacheTests.java new file mode 100644 index 0000000000000..9e9dfa36ce1d4 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/engine/FieldLevelStatsCacheTests.java @@ -0,0 +1,248 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.TextField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.SegmentReader; +import org.apache.lucene.store.Directory; +import org.opensearch.common.lucene.Lucene; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Unit tests for FieldLevelStatsCache + */ +public class FieldLevelStatsCacheTests extends OpenSearchTestCase { + + /** + * Test basic cache put and get operations + */ + public void testBasicCacheOperations() throws IOException { + FieldLevelStatsCache cache = new FieldLevelStatsCache(); + + try (Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER))) { + // Create a segment + Document doc = new Document(); + doc.add(new TextField("field", "content", Field.Store.YES)); + writer.addDocument(doc); + writer.commit(); + + try (DirectoryReader reader = DirectoryReader.open(writer)) { + SegmentReader segmentReader = (SegmentReader) reader.leaves().get(0).reader(); + + // Initially cache should return null + assertNull("Cache should be empty initially", cache.get(segmentReader)); + + // Put stats in cache + Map<String, Map<String, Long>> stats = Map.of("field", Map.of("tim", 100L, "tip", 10L)); + cache.put(segmentReader, stats); + + // Get should return the stats + Map<String, Map<String, Long>> cachedStats = cache.get(segmentReader); + assertNotNull("Should get cached stats", cachedStats); + assertEquals("Cached stats should match", stats, cachedStats); + } + } + } + + /** + * Test cache invalidation + */ + public void testCacheInvalidation() throws IOException { + FieldLevelStatsCache cache = new FieldLevelStatsCache(); + + try (Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER))) { + + Document doc = new Document(); + doc.add(new TextField("field", "content", Field.Store.YES)); + writer.addDocument(doc); + writer.commit(); + + try (DirectoryReader reader = DirectoryReader.open(writer)) { + SegmentReader segmentReader = (SegmentReader) reader.leaves().get(0).reader(); + 
+ // Put stats in cache + Map<String, Map<String, Long>> stats = Map.of("field", Map.of("tim", 100L, "tip", 10L)); + cache.put(segmentReader, stats); + + // Verify stats are cached + assertNotNull("Should have cached stats", cache.get(segmentReader)); + + // Invalidate the cache entry + cache.invalidate(segmentReader); + + // Should return null after invalidation + assertNull("Should return null after invalidation", cache.get(segmentReader)); + } + } + } + + /** + * Test cache clear operation + */ + public void testCacheClear() throws IOException { + FieldLevelStatsCache cache = new FieldLevelStatsCache(); + + try (Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER))) { + + // Create multiple segments + for (int i = 0; i < 3; i++) { + Document doc = new Document(); + doc.add(new TextField("field" + i, "content" + i, Field.Store.YES)); + writer.addDocument(doc); + writer.commit(); + } + + try (DirectoryReader reader = DirectoryReader.open(writer)) { + // Cache stats for all segments + for (int i = 0; i < reader.leaves().size(); i++) { + SegmentReader segmentReader = (SegmentReader) reader.leaves().get(i).reader(); + Map<String, Map<String, Long>> stats = Map.of("field" + i, Map.of("tim", 100L * (i + 1), "tip", 10L * (i + 1))); + cache.put(segmentReader, stats); + } + + // Verify all are cached + for (int i = 0; i < reader.leaves().size(); i++) { + SegmentReader segmentReader = (SegmentReader) reader.leaves().get(i).reader(); + assertNotNull("Should have cached stats for segment " + i, cache.get(segmentReader)); + } + + // Clear cache + cache.clear(); + + // Verify all are cleared + for (int i = 0; i < reader.leaves().size(); i++) { + SegmentReader segmentReader = (SegmentReader) reader.leaves().get(i).reader(); + assertNull("Should have no cached stats after clear", cache.get(segmentReader)); + } + } + } + } + + /** + * Test cache with multiple segments from same index + */ + public void testMultipleSegments() throws IOException { + FieldLevelStatsCache cache = new FieldLevelStatsCache(); + + try (Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER))) { + + // Create multiple segments + for (int i = 0; i < 5; i++) { + Document doc = new Document(); + doc.add(new TextField("field", "content " + i, Field.Store.YES)); + writer.addDocument(doc); + if (i % 2 == 0) { + writer.commit(); // Create new segment every 2 docs + } + } + writer.commit(); + + try (DirectoryReader reader = DirectoryReader.open(writer)) { + assertTrue("Should have multiple segments", reader.leaves().size() > 1); + + // Cache different stats for each segment + for (int i = 0; i < reader.leaves().size(); i++) { + SegmentReader segmentReader = (SegmentReader) reader.leaves().get(i).reader(); + Map<String, Map<String, Long>> stats = Map.of("field", Map.of("tim", 100L * (i + 1), "tip", 10L * (i + 1))); + cache.put(segmentReader, stats); + } + + // Verify each segment has its own cached stats + for (int i = 0; i < reader.leaves().size(); i++) { + SegmentReader segmentReader = (SegmentReader) reader.leaves().get(i).reader(); + Map<String, Map<String, Long>> cachedStats = cache.get(segmentReader); + assertNotNull("Should have cached stats for segment " + i, cachedStats); + + // Verify the stats are correct for this segment + long expectedTim = 100L * (i + 1); + assertEquals("Stats should match for segment " + i, expectedTim, (long) cachedStats.get("field").get("tim")); + } + } + } + } + + /** + * Test concurrent cache access + */ + public void testConcurrentAccess() throws Exception { + final 
FieldLevelStatsCache cache = new FieldLevelStatsCache(); + final int numThreads = 10; + final CountDownLatch startLatch = new CountDownLatch(1); + final CountDownLatch completeLatch = new CountDownLatch(numThreads); + final AtomicInteger errors = new AtomicInteger(0); + + try (Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER))) { + + Document doc = new Document(); + doc.add(new TextField("field", "content", Field.Store.YES)); + writer.addDocument(doc); + writer.commit(); + + try (DirectoryReader reader = DirectoryReader.open(writer)) { + final SegmentReader segmentReader = (SegmentReader) reader.leaves().get(0).reader(); + final Map<String, Map<String, Long>> stats = Map.of("field", Map.of("tim", 100L, "tip", 10L)); + + // Create threads that will access cache concurrently + for (int i = 0; i < numThreads; i++) { + final int threadId = i; + new Thread(() -> { + try { + startLatch.await(); + + // Each thread performs multiple operations + for (int j = 0; j < 100; j++) { + if (threadId % 3 == 0) { + // Some threads just read + cache.get(segmentReader); + } else if (threadId % 3 == 1) { + // Some threads write + cache.put(segmentReader, stats); + } else { + // Some threads invalidate + if (j % 10 == 0) { + cache.invalidate(segmentReader); + } else { + cache.get(segmentReader); + } + } + } + } catch (Exception e) { + errors.incrementAndGet(); + logger.error("Error in concurrent test", e); + } finally { + completeLatch.countDown(); + } + }).start(); + } + + // Start all threads at once + startLatch.countDown(); + + // Wait for completion + assertTrue("Threads should complete", completeLatch.await(10, TimeUnit.SECONDS)); + assertEquals("Should have no errors", 0, errors.get()); + + // Cache should still be functional + cache.put(segmentReader, stats); + assertEquals("Cache should still work", stats, cache.get(segmentReader)); + } + } + } +} diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index d004d5aa90eac..d3d6d41aaca18 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -552,13 +552,13 @@ public void testSegmentsWithNestedFieldIndexSortWithMerge() throws Exception { public void testSegmentsStatsIncludingFileSizes() throws Exception { try (Store store = createStore(); Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) { - assertThat(engine.segmentsStats(true, false).getFileSizes().size(), equalTo(0)); + assertThat(engine.segmentsStats(true, false, false).getFileSizes().size(), equalTo(0)); ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null); engine.index(indexForDoc(doc)); engine.refresh("test"); - SegmentsStats stats = engine.segmentsStats(true, false); + SegmentsStats stats = engine.segmentsStats(true, false, false); assertThat(stats.getFileSizes().size(), greaterThan(0)); assertThat(() -> stats.getFileSizes().values().iterator(), everyItem(greaterThan(0L))); @@ -568,7 +568,30 @@ public void testSegmentsStatsIncludingFileSizes() throws Exception { engine.index(indexForDoc(doc2)); engine.refresh("test"); - assertThat(engine.segmentsStats(true, false).getFileSizes().get(firstEntry.getKey()), greaterThan(firstEntry.getValue())); + assertThat( + engine.segmentsStats(true, false, false).getFileSizes().get(firstEntry.getKey()), + 
greaterThan(firstEntry.getValue()) + ); + } + } + + public void testFieldLevelStatsBackwardCompatibility() throws Exception { + // Test that the old method signature still works + try (Store store = createStore(); Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) { + ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null); + engine.index(indexForDoc(doc)); + engine.refresh("test"); + + // Old method signature should work and not include field-level stats + SegmentsStats stats = engine.segmentsStats(true, false); + assertNotNull("Stats should not be null", stats); + assertTrue("Should have file sizes", stats.getFileSizes().size() > 0); + assertTrue("Should not have field-level stats with old method", stats.getFieldLevelFileSizes().isEmpty()); + + // New method signature with field-level stats disabled + SegmentsStats newStats = engine.segmentsStats(true, false, false); + assertEquals("Should have same file sizes", stats.getFileSizes(), newStats.getFileSizes()); + assertTrue("Should not have field-level stats when disabled", newStats.getFieldLevelFileSizes().isEmpty()); } } @@ -5090,7 +5113,10 @@ public void testEngineMaxTimestampIsInitialized() throws IOException { globalCheckpoint::get ); try (Store store = createStore(newFSDirectory(storeDir)); Engine engine = createEngine(configSupplier.apply(store))) { - assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false, false).getMaxUnsafeAutoIdTimestamp()); + assertEquals( + IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, + engine.segmentsStats(false, false, false).getMaxUnsafeAutoIdTimestamp() + ); final ParsedDocument doc = testParsedDocument( "1", null, @@ -5099,13 +5125,16 @@ public void testEngineMaxTimestampIsInitialized() throws IOException { null ); engine.index(appendOnlyPrimary(doc, true, timestamp1)); - assertEquals(timestamp1, engine.segmentsStats(false, false).getMaxUnsafeAutoIdTimestamp()); + assertEquals(timestamp1, engine.segmentsStats(false, false, false).getMaxUnsafeAutoIdTimestamp()); } try (Store store = createStore(newFSDirectory(storeDir)); InternalEngine engine = new InternalEngine(configSupplier.apply(store))) { - assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false, false).getMaxUnsafeAutoIdTimestamp()); + assertEquals( + IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, + engine.segmentsStats(false, false, false).getMaxUnsafeAutoIdTimestamp() + ); TranslogHandler translogHandler = createTranslogHandler(configSupplier.apply(store).getIndexSettings(), engine); engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE); - assertEquals(timestamp1, engine.segmentsStats(false, false).getMaxUnsafeAutoIdTimestamp()); + assertEquals(timestamp1, engine.segmentsStats(false, false, false).getMaxUnsafeAutoIdTimestamp()); final ParsedDocument doc = testParsedDocument( "1", null, @@ -5114,7 +5143,7 @@ public void testEngineMaxTimestampIsInitialized() throws IOException { null ); engine.index(appendOnlyPrimary(doc, true, timestamp2, false)); - assertEquals(maxTimestamp12, engine.segmentsStats(false, false).getMaxUnsafeAutoIdTimestamp()); + assertEquals(maxTimestamp12, engine.segmentsStats(false, false, false).getMaxUnsafeAutoIdTimestamp()); globalCheckpoint.set(1); // make sure flush cleans up commits for later. 
engine.flush(); } @@ -5129,7 +5158,7 @@ public void testEngineMaxTimestampIsInitialized() throws IOException { store.associateIndexWithNewTranslog(translogUUID); } try (Engine engine = new InternalEngine(configSupplier.apply(store))) { - assertEquals(maxTimestamp12, engine.segmentsStats(false, false).getMaxUnsafeAutoIdTimestamp()); + assertEquals(maxTimestamp12, engine.segmentsStats(false, false, false).getMaxUnsafeAutoIdTimestamp()); } } } diff --git a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java index 57509c5daa2b1..1c5800e7108b3 100644 --- a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java @@ -390,7 +390,7 @@ public void testGetSegmentInfosSnapshotPreservesFilesUntilRelease() throws Excep // refresh to create a lot of segments. engine.refresh("test"); } - assertEquals(2, engine.segmentsStats(false, false).getCount()); + assertEquals(2, engine.segmentsStats(false, false, false).getCount()); // wipe the nrt directory initially so we can sync with primary. Lucene.cleanLuceneIndex(nrtEngineStore.directory()); assertFalse( @@ -409,7 +409,7 @@ public void testGetSegmentInfosSnapshotPreservesFilesUntilRelease() throws Excep // merge primary down to 1 segment engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID()); // we expect a 3rd segment to be created after merge. - assertEquals(3, engine.segmentsStats(false, false).getCount()); + assertEquals(3, engine.segmentsStats(false, false, false).getCount()); final Collection<String> latestPrimaryFiles = engine.getLatestSegmentInfos().files(false); // copy new segments in and load reader. 
diff --git a/server/src/test/java/org/opensearch/index/engine/NoOpEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NoOpEngineTests.java index 423d246115a9a..63f5b95d0e012 100644 --- a/server/src/test/java/org/opensearch/index/engine/NoOpEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NoOpEngineTests.java @@ -168,7 +168,7 @@ public void testNoOpEngineStats() throws Exception { final SegmentsStats expectedSegmentStats; try (InternalEngine engine = createEngine(config)) { expectedDocStats = engine.docStats(); - expectedSegmentStats = engine.segmentsStats(includeFileSize, true); + expectedSegmentStats = engine.segmentsStats(includeFileSize, false, true); } try (NoOpEngine noOpEngine = new NoOpEngine(config)) { @@ -176,14 +176,14 @@ public void testNoOpEngineStats() throws Exception { assertEquals(expectedDocStats.getDeleted(), noOpEngine.docStats().getDeleted()); assertEquals(expectedDocStats.getTotalSizeInBytes(), noOpEngine.docStats().getTotalSizeInBytes()); assertEquals(expectedDocStats.getAverageSizeInBytes(), noOpEngine.docStats().getAverageSizeInBytes()); - assertEquals(expectedSegmentStats.getCount(), noOpEngine.segmentsStats(includeFileSize, true).getCount()); + assertEquals(expectedSegmentStats.getCount(), noOpEngine.segmentsStats(includeFileSize, false, true).getCount()); // don't compare memory in bytes since we load the index with term-dict off-heap assertEquals( expectedSegmentStats.getFileSizes().size(), - noOpEngine.segmentsStats(includeFileSize, true).getFileSizes().size() + noOpEngine.segmentsStats(includeFileSize, false, true).getFileSizes().size() ); - assertEquals(0, noOpEngine.segmentsStats(includeFileSize, false).getFileSizes().size()); + assertEquals(0, noOpEngine.segmentsStats(includeFileSize, false, false).getFileSizes().size()); } catch (AssertionError e) { logger.error(config.getMergePolicy()); throw e; diff --git a/server/src/test/java/org/opensearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/opensearch/index/replication/IndexLevelReplicationTests.java index afe306625b6bc..482a1a21ba872 100644 --- a/server/src/test/java/org/opensearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/opensearch/index/replication/IndexLevelReplicationTests.java @@ -257,8 +257,8 @@ public void testInheritMaxValidAutoIDTimestampOnRecovery() throws Exception { IndexShard replica = shards.addReplica(); shards.recoverReplica(replica); - SegmentsStats segmentsStats = replica.segmentStats(false, false); - SegmentsStats primarySegmentStats = shards.getPrimary().segmentStats(false, false); + SegmentsStats segmentsStats = replica.segmentStats(false, false, false); + SegmentsStats primarySegmentStats = shards.getPrimary().segmentStats(false, false, false); assertNotEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, primarySegmentStats.getMaxUnsafeAutoIdTimestamp()); assertEquals(primarySegmentStats.getMaxUnsafeAutoIdTimestamp(), segmentsStats.getMaxUnsafeAutoIdTimestamp()); assertNotEquals(Long.MAX_VALUE, segmentsStats.getMaxUnsafeAutoIdTimestamp()); diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 1beb6413075d0..61245ff1cd8ae 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -108,6 +108,7 @@ import org.opensearch.index.engine.NRTReplicationEngine; import 
org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.engine.ReadOnlyEngine; +import org.opensearch.index.engine.SegmentsStats; import org.opensearch.index.fielddata.FieldDataStats; import org.opensearch.index.fielddata.IndexFieldData; import org.opensearch.index.fielddata.IndexFieldDataCache; @@ -1727,6 +1728,47 @@ public void testShardStats() throws IOException { closeShards(shard); } + public void testSegmentStatsWithFieldLevelFileSizes() throws Exception { + // Test the new segmentStats overload with field-level statistics + IndexShard shard = newStartedShard(true); + try { + // Index some documents with various fields + for (int i = 0; i < 10; i++) { + indexDoc(shard, "_doc", String.valueOf(i), "{\"field1\":\"value" + i + "\",\"field2\":" + i + "}"); + } + shard.refresh("test"); + + // Test new method with field-level stats enabled + SegmentsStats fieldLevelStats = shard.segmentStats(false, true, false); + assertNotNull("Segment stats should not be null", fieldLevelStats); + Map<String, Map<String, Long>> fieldStats = fieldLevelStats.getFieldLevelFileSizes(); + assertNotNull("Field-level stats should not be null", fieldStats); + assertFalse("Field-level stats should not be empty after indexing", fieldStats.isEmpty()); + + // Verify we have stats for the indexed fields + assertTrue("Should have stats for field1 or field2", fieldStats.containsKey("field1") || fieldStats.containsKey("field2")); + + // Test backward compatibility with old method signature + SegmentsStats oldMethodStats = shard.segmentStats(true, false); + assertNotNull("Old method should still work", oldMethodStats); + assertTrue("Old method should not include field-level stats", oldMethodStats.getFieldLevelFileSizes().isEmpty()); + + // Test with both flags enabled + SegmentsStats combinedStats = shard.segmentStats(true, true, false); + assertNotNull("Combined stats should not be null", combinedStats); + assertFalse("Should have file sizes", combinedStats.getFileSizes().isEmpty()); + assertFalse("Should have field-level file sizes", combinedStats.getFieldLevelFileSizes().isEmpty()); + + // Force merge to create a single segment and test again + shard.forceMerge(new ForceMergeRequest().maxNumSegments(1)); + SegmentsStats afterMergeStats = shard.segmentStats(false, true, false); + assertNotNull("Stats after merge should not be null", afterMergeStats); + assertFalse("Field stats should exist after merge", afterMergeStats.getFieldLevelFileSizes().isEmpty()); + } finally { + closeShards(shard); + } + } + public void testShardStatsWithFailures() throws IOException { allowShardFailures(); final ShardId shardId = new ShardId("index", "_na_", 0); diff --git a/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java index a4598c4294a33..d0f32c06c4fbd 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java @@ -663,7 +663,7 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE final StartRecoveryRequest request = getStartRecoveryRequest(); final IndexShard shard = mock(IndexShard.class); when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class)); - when(shard.segmentStats(anyBoolean(), anyBoolean())).thenReturn(mock(SegmentsStats.class)); + when(shard.segmentStats(anyBoolean(), anyBoolean(), 
anyBoolean())).thenReturn(mock(SegmentsStats.class)); when(shard.isRelocatedPrimary()).thenReturn(true); when(shard.acquireSafeIndexCommit()).thenReturn(mock(GatedCloseable.class)); doAnswer(invocation -> { @@ -759,7 +759,7 @@ public void testThrowExceptionOnNoTargetInRouting() throws IOException { final StartRecoveryRequest request = getStartRecoveryRequest(); final IndexShard shard = mock(IndexShard.class); when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class)); - when(shard.segmentStats(anyBoolean(), anyBoolean())).thenReturn(mock(SegmentsStats.class)); + when(shard.segmentStats(anyBoolean(), anyBoolean(), anyBoolean())).thenReturn(mock(SegmentsStats.class)); when(shard.isRelocatedPrimary()).thenReturn(false); final org.opensearch.index.shard.ReplicationGroup replicationGroup = mock(org.opensearch.index.shard.ReplicationGroup.class); final IndexShardRoutingTable routingTable = mock(IndexShardRoutingTable.class); @@ -860,7 +860,7 @@ public void testTargetInRoutingInSecondAttempt() throws IOException { final StartRecoveryRequest request = getStartRecoveryRequest(); final IndexShard shard = mock(IndexShard.class); when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class)); - when(shard.segmentStats(anyBoolean(), anyBoolean())).thenReturn(mock(SegmentsStats.class)); + when(shard.segmentStats(anyBoolean(), anyBoolean(), anyBoolean())).thenReturn(mock(SegmentsStats.class)); when(shard.isRelocatedPrimary()).thenReturn(false); when(shard.getRetentionLeases()).thenReturn(mock(RetentionLeases.class)); final org.opensearch.index.shard.ReplicationGroup replicationGroup = mock(org.opensearch.index.shard.ReplicationGroup.class); diff --git a/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java index 7f913f3c8596a..bb7803274b4e5 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java @@ -123,7 +123,7 @@ public void testThrowExceptionOnNoTargetInRouting() throws IOException { final StartRecoveryRequest request = getStartRecoveryRequest(); final IndexShard shard = mock(IndexShard.class); when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class)); - when(shard.segmentStats(anyBoolean(), anyBoolean())).thenReturn(mock(SegmentsStats.class)); + when(shard.segmentStats(anyBoolean(), anyBoolean(), anyBoolean())).thenReturn(mock(SegmentsStats.class)); when(shard.isRelocatedPrimary()).thenReturn(false); final org.opensearch.index.shard.ReplicationGroup replicationGroup = mock(org.opensearch.index.shard.ReplicationGroup.class); final IndexShardRoutingTable routingTable = mock(IndexShardRoutingTable.class); diff --git a/server/src/test/java/org/opensearch/rest/action/admin/indices/RestIndicesStatsActionTests.java b/server/src/test/java/org/opensearch/rest/action/admin/indices/RestIndicesStatsActionTests.java index 8b2aa4867d404..bdb0a54427224 100644 --- a/server/src/test/java/org/opensearch/rest/action/admin/indices/RestIndicesStatsActionTests.java +++ b/server/src/test/java/org/opensearch/rest/action/admin/indices/RestIndicesStatsActionTests.java @@ -94,4 +94,38 @@ public void testAllRequestWithOtherMetrics() throws IOException { assertThat(e, hasToString(containsString("request [/_stats] contains _all and individual metrics [_all," + metric + "]"))); } + /** + * Test parsing of 
include_field_level_segment_file_sizes parameter + */ + public void testFieldLevelSegmentFileSizesParameter() throws IOException { + // Test with parameter set to true + HashMap<String, String> paramsTrue = new HashMap<>(); + paramsTrue.put("include_field_level_segment_file_sizes", "true"); + RestRequest requestTrue = new FakeRestRequest.Builder(xContentRegistry()).withPath("/_stats/segments") + .withParams(paramsTrue) + .build(); + + action.prepareRequest(requestTrue, mock(NodeClient.class)); + + // Test with parameter set to false + HashMap<String, String> paramsFalse = new HashMap<>(); + paramsFalse.put("include_field_level_segment_file_sizes", "false"); + RestRequest requestFalse = new FakeRestRequest.Builder(xContentRegistry()).withPath("/_stats/segments") + .withParams(paramsFalse) + .build(); + + action.prepareRequest(requestFalse, mock(NodeClient.class)); + + // Test with multiple segment parameters together + HashMap<String, String> paramsMultiple = new HashMap<>(); + paramsMultiple.put("include_segment_file_sizes", "true"); + paramsMultiple.put("include_field_level_segment_file_sizes", "true"); + paramsMultiple.put("include_unloaded_segments", "true"); + RestRequest requestMultiple = new FakeRestRequest.Builder(xContentRegistry()).withPath("/_stats/segments") + .withParams(paramsMultiple) + .build(); + + action.prepareRequest(requestMultiple, mock(NodeClient.class)); + } + }
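
Usage sketch (reviewer note, not part of the patch): the snippet below illustrates how the new flag and accessor introduced in this change might be exercised from Java, assuming only the additions made here, includeFieldLevelSegmentFileSizes(boolean) on IndicesStatsRequest and getFieldLevelFileSizes() on SegmentsStats. The helper class and index name are hypothetical; the roughly equivalent REST call is GET /my-index/_stats/segments?include_field_level_segment_file_sizes=true, and segments(true) is the pre-existing setter for requesting segment stats.

import java.util.Map;

import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.index.engine.SegmentsStats;

// Hypothetical helper class, for illustration only.
public final class FieldLevelSegmentStatsUsageSketch {

    // Builds a stats request roughly equivalent to:
    // GET /my-index/_stats/segments?include_field_level_segment_file_sizes=true
    public static IndicesStatsRequest buildRequest() {
        IndicesStatsRequest request = new IndicesStatsRequest().indices("my-index");
        request.segments(true);                           // field-level sizes only apply when segment stats are requested
        request.includeFieldLevelSegmentFileSizes(true);  // flag introduced by this change
        return request;
    }

    // Walks the new per-field map: field name -> (Lucene file extension -> size in bytes).
    public static void printFieldLevelSizes(SegmentsStats stats) {
        for (Map.Entry<String, Map<String, Long>> field : stats.getFieldLevelFileSizes().entrySet()) {
            field.getValue()
                .forEach((extension, bytes) -> System.out.println(field.getKey() + " / " + extension + " = " + bytes + " bytes"));
        }
    }
}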