Skip to content

Commit 54c7a85

Browse files
deshsiddansjcyjed326
committed
Create listener to refresh search thread resource usage (opensearch-project#14832)
* [bug fix] fix incorrect coordinator node search resource usages Signed-off-by: Chenyang Ji <[email protected]> * fix bug on serialization when passing task resource usage to coordinator Signed-off-by: Chenyang Ji <[email protected]> * add more unit tests Signed-off-by: Chenyang Ji <[email protected]> * remove query insights plugin related code Signed-off-by: Chenyang Ji <[email protected]> * create per request listener to refresh task resource usage Signed-off-by: Chenyang Ji <[email protected]> * Make new listener API public Signed-off-by: Siddhant Deshmukh <[email protected]> * Add changelog Signed-off-by: Siddhant Deshmukh <[email protected]> * Remove wrong files added Signed-off-by: Siddhant Deshmukh <[email protected]> * Address review comments Signed-off-by: Siddhant Deshmukh <[email protected]> * Build fix Signed-off-by: Siddhant Deshmukh <[email protected]> * Make singleton Signed-off-by: Siddhant Deshmukh <[email protected]> * Address review comments Signed-off-by: Siddhant Deshmukh <[email protected]> * Make sure listener runs before plugin listeners Signed-off-by: Siddhant Deshmukh <[email protected]> * Spotless Signed-off-by: Siddhant Deshmukh <[email protected]> * Minor fix Signed-off-by: Siddhant Deshmukh <[email protected]> --------- Signed-off-by: Chenyang Ji <[email protected]> Signed-off-by: Siddhant Deshmukh <[email protected]> Signed-off-by: Jay Deng <[email protected]> Co-authored-by: Chenyang Ji <[email protected]> Co-authored-by: Jay Deng <[email protected]> (cherry picked from commit 8ff3bcc)
1 parent e87bb18 commit 54c7a85

File tree

3 files changed

+42
-7
lines changed

3 files changed

+42
-7
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2424
- Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959))
2525
- [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174))
2626
- Add cluster setting to dynamically configure the buckets for filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179))
27+
- Create listener to refresh search thread resource usage ([#14832](https://github.com/opensearch-project/OpenSearch/pull/14832))
2728

2829
### Dependencies
2930
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.search;
10+
11+
import org.opensearch.tasks.TaskResourceTrackingService;
12+
13+
/**
14+
* SearchTaskRequestOperationsListener subscriber for operations on search tasks resource usages.
15+
* Listener ensures to refreshResourceStats on request end capturing the search task resource usage
16+
* upon request completion.
17+
*
18+
*/
19+
public final class SearchTaskRequestOperationsListener extends SearchRequestOperationsListener {
20+
private final TaskResourceTrackingService taskResourceTrackingService;
21+
22+
public SearchTaskRequestOperationsListener(TaskResourceTrackingService taskResourceTrackingService) {
23+
this.taskResourceTrackingService = taskResourceTrackingService;
24+
}
25+
26+
@Override
27+
public void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
28+
taskResourceTrackingService.refreshResourceStats(context.getTask());
29+
}
30+
}

server/src/main/java/org/opensearch/node/Node.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.opensearch.action.search.SearchRequestOperationsListener;
5151
import org.opensearch.action.search.SearchRequestSlowLog;
5252
import org.opensearch.action.search.SearchRequestStats;
53+
import org.opensearch.action.search.SearchTaskRequestOperationsListener;
5354
import org.opensearch.action.search.SearchTransportService;
5455
import org.opensearch.action.support.TransportAction;
5556
import org.opensearch.action.update.UpdateHelper;
@@ -795,8 +796,17 @@ protected Node(
795796
threadPool
796797
);
797798

799+
final TaskResourceTrackingService taskResourceTrackingService = new TaskResourceTrackingService(
800+
settings,
801+
clusterService.getClusterSettings(),
802+
threadPool
803+
);
804+
798805
final SearchRequestStats searchRequestStats = new SearchRequestStats(clusterService.getClusterSettings());
799806
final SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService);
807+
final SearchTaskRequestOperationsListener searchTaskRequestOperationsListener = new SearchTaskRequestOperationsListener(
808+
taskResourceTrackingService
809+
);
800810

801811
remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, settings);
802812
CacheModule cacheModule = new CacheModule(pluginsService.filterPlugins(CachePlugin.class), settings);
@@ -898,7 +908,7 @@ protected Node(
898908
final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory =
899909
new SearchRequestOperationsCompositeListenerFactory(
900910
Stream.concat(
901-
Stream.of(searchRequestStats, searchRequestSlowLog),
911+
Stream.of(searchRequestStats, searchRequestSlowLog, searchTaskRequestOperationsListener),
902912
pluginComponents.stream()
903913
.filter(p -> p instanceof SearchRequestOperationsListener)
904914
.map(p -> (SearchRequestOperationsListener) p)
@@ -1026,12 +1036,6 @@ protected Node(
10261036
// development. Then we can deprecate Getter and Setter for IndexingPressureService in ClusterService (#478).
10271037
clusterService.setIndexingPressureService(indexingPressureService);
10281038

1029-
final TaskResourceTrackingService taskResourceTrackingService = new TaskResourceTrackingService(
1030-
settings,
1031-
clusterService.getClusterSettings(),
1032-
threadPool
1033-
);
1034-
10351039
final SearchBackpressureSettings searchBackpressureSettings = new SearchBackpressureSettings(
10361040
settings,
10371041
clusterService.getClusterSettings()

0 commit comments

Comments
 (0)