Skip to content

Commit ea17600

Browse files
committed
optimize _cat/nodes api
Signed-off-by: kkewwei <[email protected]>
1 parent b980b12 commit ea17600

File tree

4 files changed

+230
-34
lines changed

4 files changed

+230
-34
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
7979
- Fix NPE in ReplicaShardAllocator ([#14385](https://github.com/opensearch-project/OpenSearch/pull/14385))
8080
- Fix constant_keyword field type used when creating index ([#14807](https://github.com/opensearch-project/OpenSearch/pull/14807))
8181
- Use circuit breaker in InternalHistogram when adding empty buckets ([#14754](https://github.com/opensearch-project/OpenSearch/pull/14754))
82+
- Optimize _cat/nodes api ([#14853](https://github.com/opensearch-project/OpenSearch/pull/14853))
8283

8384
### Security
8485

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.http;
10+
11+
import org.apache.hc.core5.http.ParseException;
12+
import org.apache.hc.core5.http.io.entity.EntityUtils;
13+
import org.junit.BeforeClass;
14+
import org.opensearch.client.Request;
15+
import org.opensearch.client.Response;
16+
import org.opensearch.client.ResponseException;
17+
import org.opensearch.client.RestClient;
18+
import org.opensearch.test.OpenSearchIntegTestCase;
19+
20+
import java.io.IOException;
21+
22+
import static org.apache.hc.core5.http.HttpStatus.SC_OK;
23+
import static org.hamcrest.Matchers.containsString;
24+
25+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 5, numClientNodes = 0)
26+
public class HttpCatIT extends HttpSmokeTestCase {
27+
28+
@BeforeClass
29+
public static void doNotSetAvailableProcessors() {
30+
System.setProperty("opensearch.set.netty.runtime.available.processors", "false");
31+
}
32+
33+
public void testdoCatRequest() throws IOException, ParseException {
34+
try (RestClient restClient = getRestClient()) {
35+
int nodesCount = restClient.getNodes().size();
36+
for (int i = 0; i < 20; i++) {
37+
Request nodesRequest = new Request("GET", "/_cat/nodes?timeout=" + randomInt(300) + "ms");
38+
try {
39+
Response response = restClient.performRequest(nodesRequest);
40+
assertEquals(SC_OK, response.getStatusLine().getStatusCode());
41+
String result = EntityUtils.toString(response.getEntity());
42+
String[] NodeInfos = result.split("\n");
43+
assertEquals(nodesCount, NodeInfos.length);
44+
} catch (ResponseException e) {
45+
// it means that it costs too long to get ClusterState from the master.
46+
assertThat(e.getMessage(), containsString("costs too long to get ClusterState from the master"));
47+
}
48+
}
49+
}
50+
}
51+
52+
}
53+

server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java

Lines changed: 174 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232

3333
package org.opensearch.rest.action.cat;
3434

35+
import org.opensearch.OpenSearchTimeoutException;
36+
import org.opensearch.action.FailedNodeException;
3537
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
3638
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest;
3739
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
@@ -47,6 +49,8 @@
4749
import org.opensearch.common.Table;
4850
import org.opensearch.common.logging.DeprecationLogger;
4951
import org.opensearch.common.network.NetworkAddress;
52+
import org.opensearch.common.unit.TimeValue;
53+
import org.opensearch.core.action.ActionListener;
5054
import org.opensearch.core.common.Strings;
5155
import org.opensearch.core.common.transport.TransportAddress;
5256
import org.opensearch.core.common.unit.ByteSizeValue;
@@ -68,15 +72,22 @@
6872
import org.opensearch.monitor.os.OsStats;
6973
import org.opensearch.monitor.process.ProcessInfo;
7074
import org.opensearch.monitor.process.ProcessStats;
75+
import org.opensearch.rest.RestChannel;
7176
import org.opensearch.rest.RestRequest;
72-
import org.opensearch.rest.RestResponse;
7377
import org.opensearch.rest.action.RestActionListener;
74-
import org.opensearch.rest.action.RestResponseListener;
7578
import org.opensearch.script.ScriptStats;
7679
import org.opensearch.search.suggest.completion.CompletionStats;
80+
import org.opensearch.threadpool.ThreadPool;
7781

82+
import java.util.ArrayList;
83+
import java.util.Collection;
7884
import java.util.List;
7985
import java.util.Locale;
86+
import java.util.concurrent.ConcurrentHashMap;
87+
import java.util.concurrent.ConcurrentMap;
88+
import java.util.concurrent.CountDownLatch;
89+
import java.util.function.Consumer;
90+
import java.util.function.Supplier;
8091
import java.util.stream.Collectors;
8192

8293
import static java.util.Collections.singletonList;
@@ -88,6 +99,7 @@
8899
* @opensearch.api
89100
*/
90101
public class RestNodesAction extends AbstractCatAction {
102+
public static final long TIMEOUT_THRESHOLD_MILLIS = 5;
91103
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(RestNodesAction.class);
92104
static final String LOCAL_DEPRECATED_MESSAGE = "Deprecated parameter [local] used. This parameter does not cause this API to act "
93105
+ "locally, and should not be used. It will be unsupported in version 8.0.";
@@ -120,47 +132,175 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli
120132
);
121133
parseDeprecatedMasterTimeoutParameter(clusterStateRequest, request, deprecationLogger, getName());
122134
final boolean fullId = request.paramAsBoolean("full_id", false);
123-
return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener<ClusterStateResponse>(channel) {
135+
ThreadPool threadPool = client.admin().cluster().threadPool();
136+
long beginTime = threadPool.relativeTimeInMillis();
137+
final long timeout = request.hasParam("timeout")
138+
? TimeValue.parseTimeValue(request.param("timeout"), "timeout").millis()
139+
: Long.MAX_VALUE;
140+
return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener<>(channel) {
124141
@Override
125142
public void processResponse(final ClusterStateResponse clusterStateResponse) {
126-
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
127-
nodesInfoRequest.timeout(request.param("timeout"));
128-
nodesInfoRequest.clear()
129-
.addMetrics(
130-
NodesInfoRequest.Metric.JVM.metricName(),
131-
NodesInfoRequest.Metric.OS.metricName(),
132-
NodesInfoRequest.Metric.PROCESS.metricName(),
133-
NodesInfoRequest.Metric.HTTP.metricName()
143+
long leftTime = timeout - threadPool.relativeTimeInMillis() + beginTime;
144+
if (leftTime < TIMEOUT_THRESHOLD_MILLIS) {
145+
onFailure(
146+
new OpenSearchTimeoutException(
147+
"costs too long to get ClusterState from the master:"
148+
+ clusterStateResponse.getState().nodes().getMasterNode().getName()
149+
)
134150
);
135-
client.admin().cluster().nodesInfo(nodesInfoRequest, new RestActionListener<NodesInfoResponse>(channel) {
136-
@Override
137-
public void processResponse(final NodesInfoResponse nodesInfoResponse) {
138-
NodesStatsRequest nodesStatsRequest = new NodesStatsRequest();
139-
nodesStatsRequest.timeout(request.param("timeout"));
140-
nodesStatsRequest.clear()
141-
.indices(true)
142-
.addMetrics(
143-
NodesStatsRequest.Metric.JVM.metricName(),
144-
NodesStatsRequest.Metric.OS.metricName(),
145-
NodesStatsRequest.Metric.FS.metricName(),
146-
NodesStatsRequest.Metric.PROCESS.metricName(),
147-
NodesStatsRequest.Metric.SCRIPT.metricName()
151+
return;
152+
}
153+
String[] nodeIds = clusterStateResponse.getState().nodes().resolveNodes(null);
154+
CountDownLatch nodesCount = new CountDownLatch(nodeIds.length);
155+
ConcurrentMap<String, NodeInfo> successNodeInfos = new ConcurrentHashMap<>(nodeIds.length);
156+
ConcurrentMap<String, FailedNodeException> failNodeInfos = new ConcurrentHashMap<>(nodeIds.length);
157+
ConcurrentMap<String, NodeStats> successNodeStats = new ConcurrentHashMap<>(nodeIds.length);
158+
ConcurrentMap<String, FailedNodeException> failNodeStats = new ConcurrentHashMap<>(nodeIds.length);
159+
for (String nodeId : nodeIds) {
160+
NodesInfoRequest nodesInfoRequest = createNodesInfoRequest(timeout, leftTime, nodeId);
161+
client.admin().cluster().nodesInfo(nodesInfoRequest, new ActionListener<>() {
162+
@Override
163+
public void onResponse(NodesInfoResponse nodesInfoResponse) {
164+
assert nodesInfoResponse.getNodes().size() + nodesInfoResponse.failures().size() == 1;
165+
NodesStatsRequest nodesStatsRequest = checkAndCreateNodesStatsRequest(
166+
nodesInfoResponse.failures(),
167+
timeout,
168+
beginTime,
169+
nodeId,
170+
this::onFailure,
171+
threadPool::relativeTimeInMillis,
172+
clusterStateResponse.getState().nodes().get(nodeId).getName()
148173
);
149-
client.admin().cluster().nodesStats(nodesStatsRequest, new RestResponseListener<NodesStatsResponse>(channel) {
150-
@Override
151-
public RestResponse buildResponse(NodesStatsResponse nodesStatsResponse) throws Exception {
152-
return RestTable.buildResponse(
153-
buildTable(fullId, request, clusterStateResponse, nodesInfoResponse, nodesStatsResponse),
154-
channel
155-
);
174+
if (nodesStatsRequest == null) {
175+
return;
156176
}
157-
});
158-
}
159-
});
177+
successNodeInfos.put(nodeId, nodesInfoResponse.getNodes().get(0));
178+
client.admin().cluster().nodesStats(nodesStatsRequest, ActionListener.runAfter(new ActionListener<>() {
179+
@Override
180+
public void onResponse(NodesStatsResponse nodesStatsResponse) {
181+
assert nodesStatsResponse.getNodes().size() + nodesStatsResponse.failures().size() == 1;
182+
if (nodesStatsResponse.getNodes().size() == 1) {
183+
successNodeStats.put(nodeId, nodesStatsResponse.getNodes().get(0));
184+
} else {
185+
failNodeStats.put(nodeId, nodesStatsResponse.failures().get(0));
186+
}
187+
}
188+
189+
@Override
190+
public void onFailure(Exception e) {
191+
assert e instanceof FailedNodeException;
192+
failNodeStats.put(nodeId, (FailedNodeException) e);
193+
}
194+
}, nodesCount::countDown));
195+
}
196+
197+
@Override
198+
public void onFailure(Exception e) {
199+
assert e instanceof FailedNodeException;
200+
failNodeInfos.put(nodeId, (FailedNodeException) e);
201+
nodesCount.countDown();
202+
}
203+
});
204+
}
205+
206+
try {
207+
nodesCount.await();
208+
sendResponse(
209+
channel,
210+
clusterStateResponse,
211+
request,
212+
fullId,
213+
successNodeInfos.values(),
214+
failNodeInfos.values(),
215+
successNodeStats.values(),
216+
failNodeStats.values()
217+
);
218+
} catch (Exception e) {
219+
e.addSuppressed(e);
220+
logger.error("failed to send failure response", e);
221+
}
160222
}
161223
});
162224
}
163225

226+
private NodesInfoRequest createNodesInfoRequest(long timeout, long leftTime, String nodeId) {
227+
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
228+
if (timeout != Long.MAX_VALUE) {
229+
nodesInfoRequest.timeout(TimeValue.timeValueMillis(leftTime));
230+
}
231+
nodesInfoRequest.clear()
232+
.nodesIds(nodeId)
233+
.addMetrics(
234+
NodesInfoRequest.Metric.JVM.metricName(),
235+
NodesInfoRequest.Metric.OS.metricName(),
236+
NodesInfoRequest.Metric.PROCESS.metricName(),
237+
NodesInfoRequest.Metric.HTTP.metricName()
238+
);
239+
return nodesInfoRequest;
240+
}
241+
242+
private NodesStatsRequest checkAndCreateNodesStatsRequest(
243+
List<FailedNodeException> failedNodeExceptions,
244+
long timeout,
245+
long beginTime,
246+
String nodeId,
247+
Consumer<FailedNodeException> failedConsumer,
248+
Supplier<Long> currentTimeSupplier,
249+
String nodeName
250+
) {
251+
if (failedNodeExceptions.isEmpty() == false) {
252+
failedConsumer.accept(failedNodeExceptions.get(0));
253+
return null;
254+
}
255+
long leftTime = timeout - currentTimeSupplier.get() + beginTime;
256+
if (leftTime < TIMEOUT_THRESHOLD_MILLIS) {
257+
failedConsumer.accept(
258+
new FailedNodeException(nodeId, "There is not enough time to obtain nodesInfo metric from " + nodeName, null)
259+
);
260+
return null;
261+
}
262+
NodesStatsRequest nodesStatsRequest = new NodesStatsRequest();
263+
if (timeout != Long.MAX_VALUE) {
264+
nodesStatsRequest.timeout(TimeValue.timeValueMillis(leftTime));
265+
}
266+
nodesStatsRequest.clear()
267+
.nodesIds(nodeId)
268+
.indices(true)
269+
.addMetrics(
270+
NodesStatsRequest.Metric.JVM.metricName(),
271+
NodesStatsRequest.Metric.OS.metricName(),
272+
NodesStatsRequest.Metric.FS.metricName(),
273+
NodesStatsRequest.Metric.PROCESS.metricName(),
274+
NodesStatsRequest.Metric.SCRIPT.metricName()
275+
);
276+
return nodesStatsRequest;
277+
}
278+
279+
private void sendResponse(
280+
RestChannel channel,
281+
ClusterStateResponse clusterStateResponse,
282+
RestRequest request,
283+
boolean fullId,
284+
Collection<NodeInfo> successNodeInfos,
285+
Collection<FailedNodeException> failNodeInfos,
286+
Collection<NodeStats> successNodeStats,
287+
Collection<FailedNodeException> failNodeStats
288+
) throws Exception {
289+
NodesInfoResponse nodesInfoResponse = new NodesInfoResponse(
290+
clusterStateResponse.getClusterName(),
291+
new ArrayList<>(successNodeInfos),
292+
new ArrayList<>(failNodeInfos)
293+
);
294+
NodesStatsResponse nodesStatsResponse = new NodesStatsResponse(
295+
clusterStateResponse.getClusterName(),
296+
new ArrayList<>(successNodeStats),
297+
new ArrayList<>(failNodeStats)
298+
);
299+
channel.sendResponse(
300+
RestTable.buildResponse(buildTable(fullId, request, clusterStateResponse, nodesInfoResponse, nodesStatsResponse), channel)
301+
);
302+
}
303+
164304
@Override
165305
protected Table getTableWithHeader(final RestRequest request) {
166306
Table table = new Table();

server/src/main/resources/org/opensearch/bootstrap/test-framework.policy

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,4 +157,6 @@ grant {
157157
permission java.lang.RuntimePermission "reflectionFactoryAccess";
158158
permission java.lang.RuntimePermission "accessClassInPackage.sun.reflect";
159159
permission java.lang.reflect.ReflectPermission "suppressAccessChecks";
160+
permission java.util.PropertyPermission "opensearch.set.netty.runtime.available.processors", "write";
161+
permission java.net.SocketPermission "*", "accept,connect";
160162
};

0 commit comments

Comments
 (0)