Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.admin.cluster.stats;

import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
Expand All @@ -36,6 +37,8 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeService;
Expand Down Expand Up @@ -96,13 +99,23 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
for (IndexShard indexShard : indexService) {
if (indexShard.routingEntry() != null && indexShard.routingEntry().active()) {
// only report on fully started shards
CommitStats commitStats;
SeqNoStats seqNoStats;
try {
commitStats = indexShard.commitStats();
seqNoStats = indexShard.seqNoStats();
} catch (AlreadyClosedException e) {
// shard is closed - no stats is fine
commitStats = null;
seqNoStats = null;
}
shardsStats.add(
new ShardStats(
indexShard.routingEntry(),
indexShard.shardPath(),
new CommonStats(indicesService.getIndicesQueryCache(), indexShard, SHARD_STATS_FLAGS),
indexShard.commitStats(),
indexShard.seqNoStats()));
commitStats,
seqNoStats));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.admin.indices.stats;

import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -167,57 +168,61 @@ public CommonStats(CommonStatsFlags flags) {
public CommonStats(IndicesQueryCache indicesQueryCache, IndexShard indexShard, CommonStatsFlags flags) {
CommonStatsFlags.Flag[] setFlags = flags.getFlags();
for (CommonStatsFlags.Flag flag : setFlags) {
switch (flag) {
case Docs:
docs = indexShard.docStats();
break;
case Store:
store = indexShard.storeStats();
break;
case Indexing:
indexing = indexShard.indexingStats(flags.types());
break;
case Get:
get = indexShard.getStats();
break;
case Search:
search = indexShard.searchStats(flags.groups());
break;
case Merge:
merge = indexShard.mergeStats();
break;
case Refresh:
refresh = indexShard.refreshStats();
break;
case Flush:
flush = indexShard.flushStats();
break;
case Warmer:
warmer = indexShard.warmerStats();
break;
case QueryCache:
queryCache = indicesQueryCache.getStats(indexShard.shardId());
break;
case FieldData:
fieldData = indexShard.fieldDataStats(flags.fieldDataFields());
break;
case Completion:
completion = indexShard.completionStats(flags.completionDataFields());
break;
case Segments:
segments = indexShard.segmentStats(flags.includeSegmentFileSizes());
break;
case Translog:
translog = indexShard.translogStats();
break;
case RequestCache:
requestCache = indexShard.requestCache().stats();
break;
case Recovery:
recoveryStats = indexShard.recoveryStats();
break;
default:
throw new IllegalStateException("Unknown Flag: " + flag);
try {
switch (flag) {
case Docs:
docs = indexShard.docStats();
break;
case Store:
store = indexShard.storeStats();
break;
case Indexing:
indexing = indexShard.indexingStats(flags.types());
break;
case Get:
get = indexShard.getStats();
break;
case Search:
search = indexShard.searchStats(flags.groups());
break;
case Merge:
merge = indexShard.mergeStats();
break;
case Refresh:
refresh = indexShard.refreshStats();
break;
case Flush:
flush = indexShard.flushStats();
break;
case Warmer:
warmer = indexShard.warmerStats();
break;
case QueryCache:
queryCache = indicesQueryCache.getStats(indexShard.shardId());
break;
case FieldData:
fieldData = indexShard.fieldDataStats(flags.fieldDataFields());
break;
case Completion:
completion = indexShard.completionStats(flags.completionDataFields());
break;
case Segments:
segments = indexShard.segmentStats(flags.includeSegmentFileSizes());
break;
case Translog:
translog = indexShard.translogStats();
break;
case RequestCache:
requestCache = indexShard.requestCache().stats();
break;
case Recovery:
recoveryStats = indexShard.recoveryStats();
break;
default:
throw new IllegalStateException("Unknown Flag: " + flag);
}
} catch (AlreadyClosedException e) {
// shard is closed - no stats is fine
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public CommonStats getStats() {
return this.commonStats;
}

@Nullable
public CommitStats getCommitStats() {
return this.commitStats;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.admin.indices.stats;

import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
Expand All @@ -33,6 +34,8 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.indices.IndicesService;
Expand Down Expand Up @@ -100,7 +103,17 @@ protected ShardStats shardOperation(IndicesStatsRequest request, ShardRouting sh
}

CommonStats commonStats = new CommonStats(indicesService.getIndicesQueryCache(), indexShard, request.flags());
CommitStats commitStats;
SeqNoStats seqNoStats;
try {
commitStats = indexShard.commitStats();
seqNoStats = indexShard.seqNoStats();
} catch (AlreadyClosedException e) {
// shard is closed - no stats is fine
commitStats = null;
seqNoStats = null;
}
return new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), commonStats,
indexShard.commitStats(), indexShard.seqNoStats());
commitStats, seqNoStats);
}
}
16 changes: 6 additions & 10 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -868,21 +868,19 @@ public DocsStats docStats() {
}

/**
* @return {@link CommitStats} if engine is open, otherwise null
* @return {@link CommitStats}
* @throws AlreadyClosedException if shard is closed
*/
@Nullable
public CommitStats commitStats() {
Engine engine = getEngineOrNull();
return engine == null ? null : engine.commitStats();
return getEngine().commitStats();
}

/**
* @return {@link SeqNoStats} if engine is open, otherwise null
* @return {@link SeqNoStats}
* @throws AlreadyClosedException if shard is closed
*/
@Nullable
public SeqNoStats seqNoStats() {
Engine engine = getEngineOrNull();
return engine == null ? null : engine.getSeqNoStats(replicationTracker.getGlobalCheckpoint());
return getEngine().getSeqNoStats(replicationTracker.getGlobalCheckpoint());
}

public IndexingStats indexingStats(String... types) {
Expand Down Expand Up @@ -912,8 +910,6 @@ public StoreStats storeStats() {
return store.stats();
} catch (IOException e) {
throw new ElasticsearchException("io exception while building 'store stats'", e);
} catch (AlreadyClosedException ex) {
return null; // already closed
}
}

Expand Down
17 changes: 15 additions & 2 deletions server/src/main/java/org/elasticsearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.cache.request.ShardRequestCache;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
Expand All @@ -91,6 +92,7 @@
import org.elasticsearch.index.recovery.RecoveryStats;
import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
Expand Down Expand Up @@ -333,13 +335,24 @@ IndexShardStats indexShardStats(final IndicesService indicesService, final Index
return null;
}

CommitStats commitStats;
SeqNoStats seqNoStats;
try {
commitStats = indexShard.commitStats();
seqNoStats = indexShard.seqNoStats();
} catch (AlreadyClosedException e) {
// shard is closed - no stats is fine
commitStats = null;
seqNoStats = null;
}

return new IndexShardStats(indexShard.shardId(),
new ShardStats[] {
new ShardStats(indexShard.routingEntry(),
indexShard.shardPath(),
new CommonStats(indicesService.getIndicesQueryCache(), indexShard, flags),
indexShard.commitStats(),
indexShard.seqNoStats())
commitStats,
seqNoStats)
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.EngineTestCase;
Expand All @@ -88,6 +89,7 @@
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
Expand Down Expand Up @@ -3082,4 +3084,36 @@ public void onShardInactive(IndexShard indexShard) {
closeShards(primary);
}

public void testOnCloseStats() throws IOException {
final IndexShard indexShard = newStartedShard(true);

for (int i = 0; i < 3; i++) {
indexDoc(indexShard, "_doc", "" + i, "{\"foo\" : \"" + randomAlphaOfLength(10) + "\"}");
indexShard.refresh("test"); // produce segments
}

// check stats on closed and on opened shard
if (randomBoolean()) {
closeShards(indexShard);

expectThrows(AlreadyClosedException.class, () -> indexShard.seqNoStats());
expectThrows(AlreadyClosedException.class, () -> indexShard.commitStats());
expectThrows(AlreadyClosedException.class, () -> indexShard.storeStats());

} else {
final SeqNoStats seqNoStats = indexShard.seqNoStats();
assertThat(seqNoStats.getLocalCheckpoint(), equalTo(2L));

final CommitStats commitStats = indexShard.commitStats();
assertThat(commitStats.getGeneration(), equalTo(2L));

final StoreStats storeStats = indexShard.storeStats();

assertThat(storeStats.sizeInBytes(), greaterThan(0L));

closeShards(indexShard);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1111,17 +1111,21 @@ private void assertSameSyncIdSameDocs() {
IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name);
for (IndexService indexService : indexServices) {
for (IndexShard indexShard : indexService) {
CommitStats commitStats = indexShard.commitStats();
if (commitStats != null) { // null if the engine is closed or if the shard is recovering
try {
CommitStats commitStats = indexShard.commitStats();
String syncId = commitStats.getUserData().get(Engine.SYNC_COMMIT_ID);
if (syncId != null) {
long liveDocsOnShard = commitStats.getNumDocs();
if (docsOnShards.get(syncId) != null) {
assertThat("sync id is equal but number of docs does not match on node " + nodeAndClient.name + ". expected " + docsOnShards.get(syncId) + " but got " + liveDocsOnShard, docsOnShards.get(syncId), equalTo(liveDocsOnShard));
assertThat("sync id is equal but number of docs does not match on node "
+ nodeAndClient.name + ". expected " + docsOnShards.get(syncId) + " but got "
+ liveDocsOnShard, docsOnShards.get(syncId), equalTo(liveDocsOnShard));
} else {
docsOnShards.put(syncId, liveDocsOnShard);
}
}
} catch (AlreadyClosedException e) {
// the engine is closed or if the shard is recovering
}
}
}
Expand Down