Skip to content

Commit 3c5e054

Browse files
Implementing pagination for _cat/shards (#14641) (#16396)
Signed-off-by: Harsh Garg <[email protected]>
1 parent b817793 commit 3c5e054

34 files changed

+1691
-186
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2525
- Add changes to block calls in cat shards, indices and segments based on dynamic limit settings ([#15986](https://github.com/opensearch-project/OpenSearch/pull/15986))
2626
- Flat object field use IndexOrDocValuesQuery to optimize query ([#14383](https://github.com/opensearch-project/OpenSearch/issues/14383))
2727
- Add method to return dynamic SecureTransportParameters from SecureTransportSettingsProvider interface ([#16387](https://github.com/opensearch-project/OpenSearch/pull/16387)
28+
- Add _list/shards API as paginated alternate to _cat/shards ([#14641](https://github.com/opensearch-project/OpenSearch/pull/14641))
2829

2930
### Dependencies
3031
- Bump `org.apache.logging.log4j:log4j-core` from 2.23.1 to 2.24.0 ([#15858](https://github.com/opensearch-project/OpenSearch/pull/15858))

server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsActionIT.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
package org.opensearch.action.admin.cluster.shards;
1010

11-
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
1211
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
1312
import org.opensearch.cluster.metadata.IndexMetadata;
1413
import org.opensearch.cluster.routing.ShardRouting;
@@ -51,9 +50,9 @@ public void testCatShardsWithSuccessResponse() throws InterruptedException {
5150
client().execute(CatShardsAction.INSTANCE, shardsRequest, new ActionListener<CatShardsResponse>() {
5251
@Override
5352
public void onResponse(CatShardsResponse catShardsResponse) {
54-
ClusterStateResponse clusterStateResponse = catShardsResponse.getClusterStateResponse();
53+
List<ShardRouting> shardRoutings = catShardsResponse.getResponseShards();
5554
IndicesStatsResponse indicesStatsResponse = catShardsResponse.getIndicesStatsResponse();
56-
for (ShardRouting shard : clusterStateResponse.getState().routingTable().allShards()) {
55+
for (ShardRouting shard : shardRoutings) {
5756
assertEquals("test", shard.getIndexName());
5857
assertNotNull(indicesStatsResponse.asMap().get(shard));
5958
}

server/src/main/java/org/opensearch/action/ActionModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,7 @@
461461
import org.opensearch.rest.action.list.AbstractListAction;
462462
import org.opensearch.rest.action.list.RestIndicesListAction;
463463
import org.opensearch.rest.action.list.RestListAction;
464+
import org.opensearch.rest.action.list.RestShardsListAction;
464465
import org.opensearch.rest.action.search.RestClearScrollAction;
465466
import org.opensearch.rest.action.search.RestCountAction;
466467
import org.opensearch.rest.action.search.RestCreatePitAction;
@@ -979,6 +980,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
979980

980981
// LIST API
981982
registerHandler.accept(new RestIndicesListAction(responseLimitSettings));
983+
registerHandler.accept(new RestShardsListAction());
982984

983985
// Point in time API
984986
registerHandler.accept(new RestCreatePitAction());

server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsRequest.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,13 @@
88

99
package org.opensearch.action.admin.cluster.shards;
1010

11+
import org.opensearch.Version;
1112
import org.opensearch.action.ActionRequestValidationException;
13+
import org.opensearch.action.pagination.PageParams;
1214
import org.opensearch.action.support.clustermanager.ClusterManagerNodeReadRequest;
1315
import org.opensearch.common.unit.TimeValue;
1416
import org.opensearch.core.common.io.stream.StreamInput;
17+
import org.opensearch.core.common.io.stream.StreamOutput;
1518
import org.opensearch.core.tasks.TaskId;
1619
import org.opensearch.rest.action.admin.cluster.ClusterAdminTask;
1720

@@ -27,13 +30,39 @@ public class CatShardsRequest extends ClusterManagerNodeReadRequest<CatShardsReq
2730

2831
private String[] indices;
2932
private TimeValue cancelAfterTimeInterval;
33+
private PageParams pageParams = null;
3034
private boolean requestLimitCheckSupported;
3135

3236
public CatShardsRequest() {}
3337

3438
public CatShardsRequest(StreamInput in) throws IOException {
3539
super(in);
36-
this.requestLimitCheckSupported = false;
40+
if (in.getVersion().onOrAfter(Version.V_2_18_0)) {
41+
indices = in.readStringArray();
42+
cancelAfterTimeInterval = in.readOptionalTimeValue();
43+
if (in.readBoolean()) {
44+
pageParams = new PageParams(in);
45+
}
46+
requestLimitCheckSupported = in.readBoolean();
47+
}
48+
}
49+
50+
@Override
51+
public void writeTo(StreamOutput out) throws IOException {
52+
super.writeTo(out);
53+
if (out.getVersion().onOrAfter(Version.V_2_18_0)) {
54+
if (indices == null) {
55+
out.writeVInt(0);
56+
} else {
57+
out.writeStringArray(indices);
58+
}
59+
out.writeOptionalTimeValue(cancelAfterTimeInterval);
60+
out.writeBoolean(pageParams != null);
61+
if (pageParams != null) {
62+
pageParams.writeTo(out);
63+
}
64+
out.writeBoolean(requestLimitCheckSupported);
65+
}
3766
}
3867

3968
@Override
@@ -57,6 +86,14 @@ public TimeValue getCancelAfterTimeInterval() {
5786
return this.cancelAfterTimeInterval;
5887
}
5988

89+
public void setPageParams(PageParams pageParams) {
90+
this.pageParams = pageParams;
91+
}
92+
93+
public PageParams getPageParams() {
94+
return pageParams;
95+
}
96+
6097
public void setRequestLimitCheckSupported(final boolean requestLimitCheckSupported) {
6198
this.requestLimitCheckSupported = requestLimitCheckSupported;
6299
}

server/src/main/java/org/opensearch/action/admin/cluster/shards/CatShardsResponse.java

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,18 @@
88

99
package org.opensearch.action.admin.cluster.shards;
1010

11-
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
11+
import org.opensearch.Version;
1212
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
13+
import org.opensearch.action.pagination.PageToken;
14+
import org.opensearch.cluster.node.DiscoveryNodes;
15+
import org.opensearch.cluster.routing.ShardRouting;
1316
import org.opensearch.core.action.ActionResponse;
1417
import org.opensearch.core.common.io.stream.StreamInput;
1518
import org.opensearch.core.common.io.stream.StreamOutput;
1619

1720
import java.io.IOException;
21+
import java.util.ArrayList;
22+
import java.util.List;
1823

1924
/**
2025
* A response of a cat shards request.
@@ -23,28 +28,44 @@
2328
*/
2429
public class CatShardsResponse extends ActionResponse {
2530

26-
private ClusterStateResponse clusterStateResponse = null;
27-
28-
private IndicesStatsResponse indicesStatsResponse = null;
31+
private IndicesStatsResponse indicesStatsResponse;
32+
private DiscoveryNodes nodes = DiscoveryNodes.EMPTY_NODES;
33+
private List<ShardRouting> responseShards = new ArrayList<>();
34+
private PageToken pageToken;
2935

3036
public CatShardsResponse() {}
3137

3238
public CatShardsResponse(StreamInput in) throws IOException {
3339
super(in);
40+
indicesStatsResponse = new IndicesStatsResponse(in);
41+
if (in.getVersion().onOrAfter(Version.V_2_18_0)) {
42+
nodes = DiscoveryNodes.readFrom(in, null);
43+
responseShards = in.readList(ShardRouting::new);
44+
if (in.readBoolean()) {
45+
pageToken = new PageToken(in);
46+
}
47+
}
3448
}
3549

3650
@Override
3751
public void writeTo(StreamOutput out) throws IOException {
38-
clusterStateResponse.writeTo(out);
3952
indicesStatsResponse.writeTo(out);
53+
if (out.getVersion().onOrAfter(Version.V_2_18_0)) {
54+
nodes.writeToWithAttribute(out);
55+
out.writeList(responseShards);
56+
out.writeBoolean(pageToken != null);
57+
if (pageToken != null) {
58+
pageToken.writeTo(out);
59+
}
60+
}
4061
}
4162

42-
public void setClusterStateResponse(ClusterStateResponse clusterStateResponse) {
43-
this.clusterStateResponse = clusterStateResponse;
63+
public void setNodes(DiscoveryNodes nodes) {
64+
this.nodes = nodes;
4465
}
4566

46-
public ClusterStateResponse getClusterStateResponse() {
47-
return this.clusterStateResponse;
67+
public DiscoveryNodes getNodes() {
68+
return this.nodes;
4869
}
4970

5071
public void setIndicesStatsResponse(IndicesStatsResponse indicesStatsResponse) {
@@ -54,4 +75,20 @@ public void setIndicesStatsResponse(IndicesStatsResponse indicesStatsResponse) {
5475
public IndicesStatsResponse getIndicesStatsResponse() {
5576
return this.indicesStatsResponse;
5677
}
78+
79+
public void setResponseShards(List<ShardRouting> responseShards) {
80+
this.responseShards = responseShards;
81+
}
82+
83+
public List<ShardRouting> getResponseShards() {
84+
return this.responseShards;
85+
}
86+
87+
public void setPageToken(PageToken pageToken) {
88+
this.pageToken = pageToken;
89+
}
90+
91+
public PageToken getPageToken() {
92+
return this.pageToken;
93+
}
5794
}

server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportCatShardsAction.java

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
1313
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
1414
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
15+
import org.opensearch.action.pagination.PageParams;
16+
import org.opensearch.action.pagination.ShardPaginationStrategy;
1517
import org.opensearch.action.support.ActionFilters;
1618
import org.opensearch.action.support.HandledTransportAction;
1719
import org.opensearch.action.support.TimeoutTaskCancellationUtility;
@@ -57,7 +59,11 @@ public void doExecute(Task parentTask, CatShardsRequest shardsRequest, ActionLis
5759
clusterStateRequest.setShouldCancelOnTimeout(true);
5860
clusterStateRequest.local(shardsRequest.local());
5961
clusterStateRequest.clusterManagerNodeTimeout(shardsRequest.clusterManagerNodeTimeout());
60-
clusterStateRequest.clear().nodes(true).routingTable(true).indices(shardsRequest.getIndices());
62+
if (Objects.isNull(shardsRequest.getPageParams())) {
63+
clusterStateRequest.clear().nodes(true).routingTable(true).indices(shardsRequest.getIndices());
64+
} else {
65+
clusterStateRequest.clear().nodes(true).routingTable(true).indices(shardsRequest.getIndices()).metadata(true);
66+
}
6167
assert parentTask instanceof CancellableTask;
6268
clusterStateRequest.setParentTask(client.getLocalNodeId(), parentTask.getId());
6369

@@ -87,13 +93,26 @@ protected void innerOnFailure(Exception e) {
8793
@Override
8894
public void onResponse(ClusterStateResponse clusterStateResponse) {
8995
validateRequestLimit(shardsRequest, clusterStateResponse, cancellableListener);
90-
catShardsResponse.setClusterStateResponse(clusterStateResponse);
91-
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
92-
indicesStatsRequest.setShouldCancelOnTimeout(true);
93-
indicesStatsRequest.all();
94-
indicesStatsRequest.indices(shardsRequest.getIndices());
95-
indicesStatsRequest.setParentTask(client.getLocalNodeId(), parentTask.getId());
9696
try {
97+
ShardPaginationStrategy paginationStrategy = getPaginationStrategy(
98+
shardsRequest.getPageParams(),
99+
clusterStateResponse
100+
);
101+
String[] indices = Objects.isNull(paginationStrategy)
102+
? shardsRequest.getIndices()
103+
: paginationStrategy.getRequestedIndices().toArray(new String[0]);
104+
catShardsResponse.setNodes(clusterStateResponse.getState().getNodes());
105+
catShardsResponse.setResponseShards(
106+
Objects.isNull(paginationStrategy)
107+
? clusterStateResponse.getState().routingTable().allShards()
108+
: paginationStrategy.getRequestedEntities()
109+
);
110+
catShardsResponse.setPageToken(Objects.isNull(paginationStrategy) ? null : paginationStrategy.getResponseToken());
111+
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
112+
indicesStatsRequest.setShouldCancelOnTimeout(true);
113+
indicesStatsRequest.all();
114+
indicesStatsRequest.indices(indices);
115+
indicesStatsRequest.setParentTask(client.getLocalNodeId(), parentTask.getId());
97116
client.admin().indices().stats(indicesStatsRequest, new ActionListener<IndicesStatsResponse>() {
98117
@Override
99118
public void onResponse(IndicesStatsResponse indicesStatsResponse) {
@@ -122,6 +141,10 @@ public void onFailure(Exception e) {
122141

123142
}
124143

144+
private ShardPaginationStrategy getPaginationStrategy(PageParams pageParams, ClusterStateResponse clusterStateResponse) {
145+
return Objects.isNull(pageParams) ? null : new ShardPaginationStrategy(pageParams, clusterStateResponse.getState());
146+
}
147+
125148
private void validateRequestLimit(
126149
final CatShardsRequest shardsRequest,
127150
final ClusterStateResponse clusterStateResponse,

server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsResponse.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public class IndicesStatsResponse extends BroadcastResponse {
6464

6565
private Map<ShardRouting, ShardStats> shardStatsMap;
6666

67-
IndicesStatsResponse(StreamInput in) throws IOException {
67+
public IndicesStatsResponse(StreamInput in) throws IOException {
6868
super(in);
6969
shards = in.readArray(ShardStats::new, (size) -> new ShardStats[size]);
7070
}

0 commit comments

Comments
 (0)