Skip to content

Commit 5f88ec4

Browse files
committed
Remove Cluster Manager Routing
1 parent 7788eb6 commit 5f88ec4

File tree

7 files changed

+65
-107
lines changed

7 files changed

+65
-107
lines changed

server/src/main/java/org/opensearch/action/admin/cluster/cache/PruneCacheAction.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,16 @@
1111
import org.opensearch.action.ActionType;
1212

1313
/**
14-
* Transport action to prune remote file cache
14+
* Transport action to prune file cache
1515
*
1616
* @opensearch.internal
1717
*/
1818
public class PruneCacheAction extends ActionType<PruneCacheResponse> {
1919

2020
public static final PruneCacheAction INSTANCE = new PruneCacheAction();
21-
public static final String NAME = "cluster:admin/cache/remote/prune";
21+
public static final String NAME = "cluster:admin/filecache/prune";
2222

23-
public PruneCacheAction() {
23+
private PruneCacheAction() {
2424
super(NAME, PruneCacheResponse::new);
2525
}
2626
}

server/src/main/java/org/opensearch/action/admin/cluster/cache/PruneCacheRequest.java

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,24 @@
88

99
package org.opensearch.action.admin.cluster.cache;
1010

11+
import org.opensearch.action.ActionRequest;
1112
import org.opensearch.action.ActionRequestValidationException;
12-
import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest;
13+
import org.opensearch.common.unit.TimeValue;
1314
import org.opensearch.core.common.io.stream.StreamInput;
1415
import org.opensearch.core.common.io.stream.StreamOutput;
1516

1617
import java.io.IOException;
1718

1819
/**
19-
* Request for pruning remote file cache
20+
* Request for pruning file cache.
21+
* Extends ActionRequest for direct node execution.
2022
*
2123
* @opensearch.internal
2224
*/
23-
public class PruneCacheRequest extends ClusterManagerNodeRequest<PruneCacheRequest> {
25+
public class PruneCacheRequest extends ActionRequest {
26+
27+
private static final TimeValue DEFAULT_TIMEOUT = TimeValue.timeValueSeconds(30);
28+
private TimeValue timeout = DEFAULT_TIMEOUT;
2429

2530
/**
2631
* Default constructor
@@ -35,11 +40,37 @@ public PruneCacheRequest() {}
3540
*/
3641
public PruneCacheRequest(StreamInput in) throws IOException {
3742
super(in);
43+
timeout = in.readTimeValue();
3844
}
3945

4046
@Override
4147
public void writeTo(StreamOutput out) throws IOException {
4248
super.writeTo(out);
49+
out.writeTimeValue(timeout);
50+
}
51+
52+
/**
53+
* Sets the timeout for this request.
54+
* Note: Currently preserved for API compatibility but not actively enforced
55+
* in HandledTransportAction implementation.
56+
*
57+
* @param timeout operation timeout (passed through but not enforced by transport layer)
58+
* @return this request
59+
*/
60+
public PruneCacheRequest timeout(TimeValue timeout) {
61+
this.timeout = timeout;
62+
return this;
63+
}
64+
65+
/**
66+
* Gets the timeout for this request.
67+
* Note: Currently preserved for API compatibility but not actively enforced
68+
* in HandledTransportAction implementation.
69+
*
70+
* @return operation timeout (passed through but not enforced by transport layer)
71+
*/
72+
public TimeValue timeout() {
73+
return timeout;
4374
}
4475

4576
@Override

server/src/main/java/org/opensearch/action/admin/cluster/cache/TransportPruneCacheAction.java

Lines changed: 13 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -9,80 +9,47 @@
99
package org.opensearch.action.admin.cluster.cache;
1010

1111
import org.opensearch.action.support.ActionFilters;
12-
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction;
13-
import org.opensearch.cluster.ClusterState;
14-
import org.opensearch.cluster.block.ClusterBlockException;
15-
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
16-
import org.opensearch.cluster.service.ClusterService;
12+
import org.opensearch.action.support.HandledTransportAction;
1713
import org.opensearch.common.Nullable;
1814
import org.opensearch.common.inject.Inject;
1915
import org.opensearch.core.action.ActionListener;
20-
import org.opensearch.core.common.io.stream.StreamInput;
2116
import org.opensearch.index.store.remote.filecache.FileCache;
17+
import org.opensearch.tasks.Task;
2218
import org.opensearch.threadpool.ThreadPool;
2319
import org.opensearch.transport.TransportService;
2420

25-
import java.io.IOException;
26-
2721
/**
28-
* Transport action for pruning remote file cache
22+
* Transport action for pruning file cache.
23+
* Uses HandledTransportAction for direct node execution instead of cluster manager routing.
2924
*
3025
* @opensearch.internal
3126
*/
32-
public class TransportPruneCacheAction extends TransportClusterManagerNodeAction<PruneCacheRequest, PruneCacheResponse> {
27+
public class TransportPruneCacheAction extends HandledTransportAction<PruneCacheRequest, PruneCacheResponse> {
3328

3429
private final FileCache fileCache;
3530

3631
@Inject
37-
public TransportPruneCacheAction(
38-
ThreadPool threadPool,
39-
TransportService transportService,
40-
ClusterService clusterService,
41-
ActionFilters actionFilters,
42-
IndexNameExpressionResolver indexNameExpressionResolver,
43-
@Nullable FileCache fileCache
44-
) {
45-
super(
46-
PruneCacheAction.NAME,
47-
transportService,
48-
clusterService,
49-
threadPool,
50-
actionFilters,
51-
PruneCacheRequest::new,
52-
indexNameExpressionResolver
53-
);
32+
public TransportPruneCacheAction(TransportService transportService, ActionFilters actionFilters, @Nullable FileCache fileCache) {
33+
super(PruneCacheAction.NAME, transportService, actionFilters, PruneCacheRequest::new, ThreadPool.Names.GENERIC);
5434
this.fileCache = fileCache;
5535
}
5636

5737
@Override
58-
protected String executor() {
59-
return ThreadPool.Names.SAME;
60-
}
61-
62-
@Override
63-
protected PruneCacheResponse read(StreamInput in) throws IOException {
64-
return new PruneCacheResponse(in);
65-
}
66-
67-
@Override
68-
protected void clusterManagerOperation(PruneCacheRequest request, ClusterState state, ActionListener<PruneCacheResponse> listener)
69-
throws Exception {
38+
protected void doExecute(Task task, PruneCacheRequest request, ActionListener<PruneCacheResponse> listener) {
7039
try {
7140
if (fileCache == null) {
72-
listener.onResponse(new PruneCacheResponse(true, 0));
41+
listener.onResponse(new PruneCacheResponse(true, 0L));
7342
return;
7443
}
7544

7645
long prunedBytes = fileCache.prune();
7746
listener.onResponse(new PruneCacheResponse(true, prunedBytes));
47+
} catch (RuntimeException e) {
48+
// Catch runtime exceptions from FileCache operations for better error diagnostics
49+
listener.onFailure(e);
7850
} catch (Exception e) {
51+
// Fallback for any other unexpected exceptions
7952
listener.onFailure(e);
8053
}
8154
}
82-
83-
@Override
84-
protected ClusterBlockException checkBlock(PruneCacheRequest request, ClusterState state) {
85-
// Prune cache operation doesn't require any cluster blocks
86-
return null;
87-
}
8855
}

server/src/main/java/org/opensearch/rest/action/admin/cluster/RestPruneCacheAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
4949
PruneCacheRequest pruneCacheRequest = new PruneCacheRequest();
5050

5151
// Handle timeout parameter
52-
pruneCacheRequest.clusterManagerNodeTimeout(request.paramAsTime("timeout", pruneCacheRequest.clusterManagerNodeTimeout()));
52+
pruneCacheRequest.timeout(request.paramAsTime("timeout", pruneCacheRequest.timeout()));
5353

5454
// Delegate to Transport Action with standard response handling
5555
return channel -> client.execute(PruneCacheAction.INSTANCE, pruneCacheRequest, new RestToXContentListener<>(channel));

server/src/test/java/org/opensearch/action/admin/cluster/cache/PruneCacheRequestResponseTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public class PruneCacheRequestResponseTests extends OpenSearchTestCase {
2828
*/
2929
public void testPruneCacheRequestSerialization() throws IOException {
3030
PruneCacheRequest originalRequest = new PruneCacheRequest();
31-
originalRequest.clusterManagerNodeTimeout(TimeValue.timeValueSeconds(30));
31+
originalRequest.timeout(TimeValue.timeValueSeconds(30));
3232

3333
// Serialize the request
3434
BytesStreamOutput out = new BytesStreamOutput();
@@ -39,7 +39,7 @@ public void testPruneCacheRequestSerialization() throws IOException {
3939
PruneCacheRequest deserializedRequest = new PruneCacheRequest(in);
4040

4141
// Assert that timeout is properly serialized/deserialized
42-
assertEquals(originalRequest.clusterManagerNodeTimeout(), deserializedRequest.clusterManagerNodeTimeout());
42+
assertEquals(originalRequest.timeout(), deserializedRequest.timeout());
4343
}
4444

4545
/**
@@ -58,7 +58,7 @@ public void testPruneCacheRequestDefaultTimeout() throws IOException {
5858
PruneCacheRequest deserializedRequest = new PruneCacheRequest(in);
5959

6060
// Assert that default timeout is maintained
61-
assertEquals(originalRequest.clusterManagerNodeTimeout(), deserializedRequest.clusterManagerNodeTimeout());
61+
assertEquals(originalRequest.timeout(), deserializedRequest.timeout());
6262
}
6363

6464
/**

server/src/test/java/org/opensearch/action/admin/cluster/cache/TransportPruneCacheActionTests.java

Lines changed: 10 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,6 @@
99
package org.opensearch.action.admin.cluster.cache;
1010

1111
import org.opensearch.action.support.ActionFilters;
12-
import org.opensearch.cluster.ClusterState;
13-
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
14-
import org.opensearch.cluster.service.ClusterService;
1512
import org.opensearch.core.action.ActionListener;
1613
import org.opensearch.index.store.remote.filecache.FileCache;
1714
import org.opensearch.test.OpenSearchTestCase;
@@ -27,15 +24,13 @@
2724
import static org.mockito.Mockito.when;
2825

2926
/**
30-
* Tests for {@link TransportPruneCacheAction}.
27+
* Tests for {@link TransportPruneCacheAction} using HandledTransportAction pattern.
3128
*/
3229
public class TransportPruneCacheActionTests extends OpenSearchTestCase {
3330

3431
private ThreadPool threadPool;
3532
private TransportService transportService;
36-
private ClusterService clusterService;
3733
private ActionFilters actionFilters;
38-
private IndexNameExpressionResolver indexNameExpressionResolver;
3934
private FileCache fileCache;
4035
private TransportPruneCacheAction action;
4136

@@ -45,19 +40,10 @@ public void setUp() throws Exception {
4540

4641
threadPool = new TestThreadPool("test");
4742
transportService = mock(TransportService.class);
48-
clusterService = mock(ClusterService.class);
4943
actionFilters = mock(ActionFilters.class);
50-
indexNameExpressionResolver = mock(IndexNameExpressionResolver.class);
5144
fileCache = mock(FileCache.class);
5245

53-
action = new TransportPruneCacheAction(
54-
threadPool,
55-
transportService,
56-
clusterService,
57-
actionFilters,
58-
indexNameExpressionResolver,
59-
fileCache
60-
);
46+
action = new TransportPruneCacheAction(transportService, actionFilters, fileCache);
6147
}
6248

6349
@After
@@ -75,7 +61,6 @@ public void testSuccessfulPruneOperation() throws Exception {
7561
when(fileCache.prune()).thenReturn(expectedPrunedBytes);
7662

7763
PruneCacheRequest request = new PruneCacheRequest();
78-
ClusterState clusterState = mock(ClusterState.class);
7964

8065
ActionListener<PruneCacheResponse> listener = new ActionListener<PruneCacheResponse>() {
8166
@Override
@@ -90,7 +75,7 @@ public void onFailure(Exception e) {
9075
}
9176
};
9277

93-
action.clusterManagerOperation(request, clusterState, listener);
78+
action.doExecute(null, request, listener);
9479

9580
verify(fileCache).prune();
9681
}
@@ -101,16 +86,12 @@ public void onFailure(Exception e) {
10186
public void testNullFileCache() throws Exception {
10287
// Create action with null FileCache
10388
TransportPruneCacheAction nullCacheAction = new TransportPruneCacheAction(
104-
threadPool,
10589
transportService,
106-
clusterService,
10790
actionFilters,
108-
indexNameExpressionResolver,
10991
null // null FileCache
11092
);
11193

11294
PruneCacheRequest request = new PruneCacheRequest();
113-
ClusterState clusterState = mock(ClusterState.class);
11495

11596
ActionListener<PruneCacheResponse> listener = new ActionListener<PruneCacheResponse>() {
11697
@Override
@@ -125,7 +106,7 @@ public void onFailure(Exception e) {
125106
}
126107
};
127108

128-
nullCacheAction.clusterManagerOperation(request, clusterState, listener);
109+
nullCacheAction.doExecute(null, request, listener);
129110
}
130111

131112
/**
@@ -137,7 +118,6 @@ public void testPruneException() throws Exception {
137118
when(fileCache.prune()).thenThrow(expectedException);
138119

139120
PruneCacheRequest request = new PruneCacheRequest();
140-
ClusterState clusterState = mock(ClusterState.class);
141121

142122
ActionListener<PruneCacheResponse> listener = new ActionListener<PruneCacheResponse>() {
143123
@Override
@@ -151,36 +131,18 @@ public void onFailure(Exception e) {
151131
}
152132
};
153133

154-
action.clusterManagerOperation(request, clusterState, listener);
134+
action.doExecute(null, request, listener);
155135

156136
verify(fileCache).prune();
157137
}
158138

159-
/**
160-
* Tests that the executor is correctly set to SAME.
161-
*/
162-
public void testExecutor() {
163-
assertEquals("Executor should be SAME for non-blocking operations", ThreadPool.Names.SAME, action.executor());
164-
}
165-
166-
/**
167-
* Tests that checkBlock returns null (no blocking).
168-
*/
169-
public void testCheckBlock() {
170-
PruneCacheRequest request = new PruneCacheRequest();
171-
ClusterState clusterState = mock(ClusterState.class);
172-
173-
assertNull("Prune cache operation should not be blocked", action.checkBlock(request, clusterState));
174-
}
175-
176139
/**
177140
* Tests response deserialization.
178141
*/
179142
public void testResponseDeserialization() throws Exception {
180-
// This tests the read method implementation
181-
// In a real test, you'd create a StreamInput with serialized response data
182-
// For this test, we'll just verify the method exists and can be called
183-
assertNotNull("Action should have read method", action);
143+
// This tests the basic action functionality
144+
// For HandledTransportAction, we verify the action instance is created correctly
145+
assertNotNull("Action should be created successfully", action);
184146
}
185147

186148
/**
@@ -193,7 +155,6 @@ public void testDifferentPruneReturnValues() throws Exception {
193155
when(fileCache.prune()).thenReturn(expectedBytes);
194156

195157
PruneCacheRequest request = new PruneCacheRequest();
196-
ClusterState clusterState = mock(ClusterState.class);
197158

198159
ActionListener<PruneCacheResponse> listener = new ActionListener<PruneCacheResponse>() {
199160
@Override
@@ -208,7 +169,7 @@ public void onFailure(Exception e) {
208169
}
209170
};
210171

211-
action.clusterManagerOperation(request, clusterState, listener);
172+
action.doExecute(null, request, listener);
212173
}
213174
}
214175

@@ -225,7 +186,6 @@ public void testAsyncBehavior() throws Exception {
225186
}).when(fileCache).prune();
226187

227188
PruneCacheRequest request = new PruneCacheRequest();
228-
ClusterState clusterState = mock(ClusterState.class);
229189

230190
final boolean[] callbackInvoked = { false };
231191

@@ -243,7 +203,7 @@ public void onFailure(Exception e) {
243203
}
244204
};
245205

246-
action.clusterManagerOperation(request, clusterState, listener);
206+
action.doExecute(null, request, listener);
247207

248208
assertTrue("Callback should have been invoked", callbackInvoked[0]);
249209
verify(fileCache).prune();

server/src/test/java/org/opensearch/rest/action/admin/cluster/RestPruneCacheActionTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public void testTimeoutParameterHandling() throws Exception {
8282
assertEquals(PruneCacheAction.INSTANCE, actionType);
8383
assertThat(request, instanceOf(PruneCacheRequest.class));
8484
PruneCacheRequest pruneCacheRequest = (PruneCacheRequest) request;
85-
assertEquals(30000, pruneCacheRequest.clusterManagerNodeTimeout().getMillis());
85+
assertEquals(30000, pruneCacheRequest.timeout().getMillis());
8686
return new PruneCacheResponse(true, 1024);
8787
});
8888

0 commit comments

Comments
 (0)