Skip to content

Commit e4e6a46

Browse files
committed
API changes for stream transport
* extensibility for transport classes * StreamTransport and StreamTransportService implementation * streaming based search action
1 parent 57d1d17 commit e4e6a46

File tree

56 files changed

+1464
-69
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+1464
-69
lines changed

server/src/main/java/org/opensearch/action/ActionModule.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,8 @@
286286
import org.opensearch.action.search.PutSearchPipelineTransportAction;
287287
import org.opensearch.action.search.SearchAction;
288288
import org.opensearch.action.search.SearchScrollAction;
289+
import org.opensearch.action.search.StreamSearchAction;
290+
import org.opensearch.action.search.StreamTransportSearchAction;
289291
import org.opensearch.action.search.TransportClearScrollAction;
290292
import org.opensearch.action.search.TransportCreatePitAction;
291293
import org.opensearch.action.search.TransportDeletePitAction;
@@ -734,6 +736,9 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
734736
actions.register(MultiGetAction.INSTANCE, TransportMultiGetAction.class, TransportShardMultiGetAction.class);
735737
actions.register(BulkAction.INSTANCE, TransportBulkAction.class, TransportShardBulkAction.class);
736738
actions.register(SearchAction.INSTANCE, TransportSearchAction.class);
739+
if (FeatureFlags.isEnabled(FeatureFlags.STREAM_TRANSPORT)) {
740+
actions.register(StreamSearchAction.INSTANCE, StreamTransportSearchAction.class);
741+
}
737742
actions.register(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class);
738743
actions.register(MultiSearchAction.INSTANCE, TransportMultiSearchAction.class);
739744
actions.register(ExplainAction.INSTANCE, TransportExplainAction.class);

server/src/main/java/org/opensearch/action/search/SearchRequestBuilder.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ public SearchRequestBuilder(OpenSearchClient client, SearchAction action) {
6868
super(client, action, new SearchRequest());
6969
}
7070

71+
public SearchRequestBuilder(OpenSearchClient client, StreamSearchAction action) {
72+
super(client, action, new SearchRequest());
73+
}
74+
7175
/**
7276
* Sets the indices the search will be executed on.
7377
*/

server/src/main/java/org/opensearch/action/search/SearchTransportService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public class SearchTransportService {
102102
public static final String UPDATE_READER_CONTEXT_ACTION_NAME = "indices:data/read/search[update_context]";
103103

104104
private final TransportService transportService;
105-
private final BiFunction<Transport.Connection, SearchActionListener, ActionListener> responseWrapper;
105+
protected final BiFunction<Transport.Connection, SearchActionListener, ActionListener> responseWrapper;
106106
private final Map<String, Long> clientConnections = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
107107

108108
public SearchTransportService(
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.search;
10+
11+
import org.opensearch.action.ActionType;
12+
13+
/**
14+
* Transport action for executing a search
15+
*
16+
* @opensearch.internal
17+
*/
18+
public class StreamSearchAction extends ActionType<SearchResponse> {
19+
20+
public static final StreamSearchAction INSTANCE = new StreamSearchAction();
21+
public static final String NAME = "indices:data/read/search/stream";
22+
23+
private StreamSearchAction() {
24+
super(NAME, SearchResponse::new);
25+
}
26+
27+
}
Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.search;
10+
11+
import org.opensearch.action.support.StreamChannelActionListener;
12+
import org.opensearch.core.action.ActionListener;
13+
import org.opensearch.core.common.io.stream.StreamInput;
14+
import org.opensearch.core.common.io.stream.Writeable;
15+
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
16+
import org.opensearch.search.SearchPhaseResult;
17+
import org.opensearch.search.SearchService;
18+
import org.opensearch.search.fetch.FetchSearchResult;
19+
import org.opensearch.search.fetch.QueryFetchSearchResult;
20+
import org.opensearch.search.fetch.ShardFetchSearchRequest;
21+
import org.opensearch.search.internal.ShardSearchRequest;
22+
import org.opensearch.search.query.QuerySearchResult;
23+
import org.opensearch.threadpool.ThreadPool;
24+
import org.opensearch.transport.StreamTransportResponseHandler;
25+
import org.opensearch.transport.StreamTransportService;
26+
import org.opensearch.transport.Transport;
27+
import org.opensearch.transport.TransportException;
28+
import org.opensearch.transport.TransportRequestOptions;
29+
import org.opensearch.transport.stream.StreamTransportResponse;
30+
31+
import java.io.IOException;
32+
import java.util.function.BiFunction;
33+
34+
/**
35+
* Search transport service for streaming search
36+
*/
37+
public class StreamSearchTransportService extends SearchTransportService {
38+
private final StreamTransportService transportService;
39+
40+
public StreamSearchTransportService(
41+
StreamTransportService transportService,
42+
BiFunction<Transport.Connection, SearchActionListener, ActionListener> responseWrapper
43+
) {
44+
super(transportService, responseWrapper);
45+
this.transportService = transportService;
46+
}
47+
48+
public static void registerStreamRequestHandler(StreamTransportService transportService, SearchService searchService) {
49+
transportService.registerRequestHandler(
50+
QUERY_ACTION_NAME,
51+
ThreadPool.Names.SAME,
52+
false,
53+
true,
54+
AdmissionControlActionType.SEARCH,
55+
ShardSearchRequest::new,
56+
(request, channel, task) -> {
57+
searchService.executeQueryPhase(
58+
request,
59+
false,
60+
(SearchShardTask) task,
61+
new StreamChannelActionListener<>(channel, QUERY_ACTION_NAME, request)
62+
);
63+
}
64+
);
65+
transportService.registerRequestHandler(
66+
FETCH_ID_ACTION_NAME,
67+
ThreadPool.Names.SAME,
68+
true,
69+
true,
70+
AdmissionControlActionType.SEARCH,
71+
ShardFetchSearchRequest::new,
72+
(request, channel, task) -> {
73+
searchService.executeFetchPhase(
74+
request,
75+
(SearchShardTask) task,
76+
new StreamChannelActionListener<>(channel, FETCH_ID_ACTION_NAME, request)
77+
);
78+
}
79+
);
80+
transportService.registerRequestHandler(
81+
QUERY_CAN_MATCH_NAME,
82+
ThreadPool.Names.SAME,
83+
ShardSearchRequest::new,
84+
(request, channel, task) -> {
85+
searchService.canMatch(request, new StreamChannelActionListener<>(channel, QUERY_CAN_MATCH_NAME, request));
86+
}
87+
);
88+
}
89+
90+
@Override
91+
public void sendExecuteQuery(
92+
Transport.Connection connection,
93+
final ShardSearchRequest request,
94+
SearchTask task,
95+
final SearchActionListener<SearchPhaseResult> listener
96+
) {
97+
final boolean fetchDocuments = request.numberOfShards() == 1;
98+
Writeable.Reader<SearchPhaseResult> reader = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new;
99+
100+
StreamTransportResponseHandler<SearchPhaseResult> transportHandler = new StreamTransportResponseHandler<SearchPhaseResult>() {
101+
@Override
102+
public void handleStreamResponse(StreamTransportResponse<SearchPhaseResult> response) {
103+
try {
104+
SearchPhaseResult result = response.nextResponse();
105+
listener.onResponse(result);
106+
} catch (Exception e) {
107+
response.cancel("Client error during search phase", e);
108+
listener.onFailure(e);
109+
}
110+
}
111+
112+
@Override
113+
public void handleException(TransportException e) {
114+
listener.onFailure(e);
115+
}
116+
117+
@Override
118+
public String executor() {
119+
return ThreadPool.Names.STREAM_SEARCH;
120+
}
121+
122+
@Override
123+
public SearchPhaseResult read(StreamInput in) throws IOException {
124+
return reader.read(in);
125+
}
126+
};
127+
128+
transportService.sendChildRequest(
129+
connection,
130+
QUERY_ACTION_NAME,
131+
request,
132+
task,
133+
transportHandler // TODO: wrap with ConnectionCountingHandler
134+
);
135+
}
136+
137+
@Override
138+
public void sendExecuteFetch(
139+
Transport.Connection connection,
140+
final ShardFetchSearchRequest request,
141+
SearchTask task,
142+
final SearchActionListener<FetchSearchResult> listener
143+
) {
144+
StreamTransportResponseHandler<FetchSearchResult> transportHandler = new StreamTransportResponseHandler<FetchSearchResult>() {
145+
@Override
146+
public void handleStreamResponse(StreamTransportResponse<FetchSearchResult> response) {
147+
try {
148+
FetchSearchResult result = response.nextResponse();
149+
listener.onResponse(result);
150+
} catch (Exception e) {
151+
response.cancel("Client error during fetch phase", e);
152+
listener.onFailure(e);
153+
}
154+
}
155+
156+
@Override
157+
public void handleException(TransportException exp) {
158+
listener.onFailure(exp);
159+
}
160+
161+
@Override
162+
public String executor() {
163+
return ThreadPool.Names.STREAM_SEARCH;
164+
}
165+
166+
@Override
167+
public FetchSearchResult read(StreamInput in) throws IOException {
168+
return new FetchSearchResult(in);
169+
}
170+
};
171+
transportService.sendChildRequest(connection, FETCH_ID_ACTION_NAME, request, task, transportHandler);
172+
}
173+
174+
@Override
175+
public void sendCanMatch(
176+
Transport.Connection connection,
177+
final ShardSearchRequest request,
178+
SearchTask task,
179+
final ActionListener<SearchService.CanMatchResponse> listener
180+
) {
181+
StreamTransportResponseHandler<SearchService.CanMatchResponse> transportHandler = new StreamTransportResponseHandler<
182+
SearchService.CanMatchResponse>() {
183+
@Override
184+
public void handleStreamResponse(StreamTransportResponse<SearchService.CanMatchResponse> response) {
185+
try {
186+
SearchService.CanMatchResponse result = response.nextResponse();
187+
if (response.nextResponse() != null) {
188+
throw new IllegalStateException("Only one response expected from SearchService.CanMatchResponse");
189+
}
190+
listener.onResponse(result);
191+
} catch (Exception e) {
192+
response.cancel("Client error during can match", e);
193+
listener.onFailure(e);
194+
}
195+
}
196+
197+
@Override
198+
public void handleException(TransportException exp) {
199+
listener.onFailure(exp);
200+
}
201+
202+
@Override
203+
public String executor() {
204+
return ThreadPool.Names.SAME;
205+
}
206+
207+
@Override
208+
public SearchService.CanMatchResponse read(StreamInput in) throws IOException {
209+
return new SearchService.CanMatchResponse(in);
210+
}
211+
};
212+
213+
transportService.sendChildRequest(
214+
connection,
215+
QUERY_CAN_MATCH_NAME,
216+
request,
217+
task,
218+
TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STREAM).build(),
219+
transportHandler
220+
);
221+
}
222+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.search;
10+
11+
import org.opensearch.action.support.ActionFilters;
12+
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
13+
import org.opensearch.cluster.service.ClusterService;
14+
import org.opensearch.common.Nullable;
15+
import org.opensearch.common.inject.Inject;
16+
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
17+
import org.opensearch.core.indices.breaker.CircuitBreakerService;
18+
import org.opensearch.search.SearchService;
19+
import org.opensearch.search.pipeline.SearchPipelineService;
20+
import org.opensearch.tasks.TaskResourceTrackingService;
21+
import org.opensearch.telemetry.metrics.MetricsRegistry;
22+
import org.opensearch.telemetry.tracing.Tracer;
23+
import org.opensearch.threadpool.ThreadPool;
24+
import org.opensearch.transport.StreamTransportService;
25+
import org.opensearch.transport.client.node.NodeClient;
26+
27+
/**
28+
* Transport search action for streaming search
29+
* @opensearch.internal
30+
*/
31+
public class StreamTransportSearchAction extends TransportSearchAction {
32+
@Inject
33+
public StreamTransportSearchAction(
34+
NodeClient client,
35+
ThreadPool threadPool,
36+
CircuitBreakerService circuitBreakerService,
37+
@Nullable StreamTransportService transportService,
38+
SearchService searchService,
39+
@Nullable StreamSearchTransportService searchTransportService,
40+
SearchPhaseController searchPhaseController,
41+
ClusterService clusterService,
42+
ActionFilters actionFilters,
43+
IndexNameExpressionResolver indexNameExpressionResolver,
44+
NamedWriteableRegistry namedWriteableRegistry,
45+
SearchPipelineService searchPipelineService,
46+
MetricsRegistry metricsRegistry,
47+
SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory,
48+
Tracer tracer,
49+
TaskResourceTrackingService taskResourceTrackingService
50+
) {
51+
super(
52+
client,
53+
threadPool,
54+
circuitBreakerService,
55+
transportService,
56+
searchService,
57+
searchTransportService,
58+
searchPhaseController,
59+
clusterService,
60+
actionFilters,
61+
indexNameExpressionResolver,
62+
namedWriteableRegistry,
63+
searchPipelineService,
64+
metricsRegistry,
65+
searchRequestOperationsCompositeListenerFactory,
66+
tracer,
67+
taskResourceTrackingService
68+
);
69+
}
70+
}

server/src/main/java/org/opensearch/action/search/TransportSearchAction.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@
9797
import org.opensearch.transport.RemoteClusterAware;
9898
import org.opensearch.transport.RemoteClusterService;
9999
import org.opensearch.transport.RemoteTransportException;
100+
import org.opensearch.transport.StreamTransportService;
100101
import org.opensearch.transport.Transport;
101102
import org.opensearch.transport.TransportService;
102103
import org.opensearch.transport.client.Client;
@@ -207,7 +208,11 @@ public TransportSearchAction(
207208
this.searchPhaseController = searchPhaseController;
208209
this.searchTransportService = searchTransportService;
209210
this.remoteClusterService = searchTransportService.getRemoteClusterService();
210-
SearchTransportService.registerRequestHandler(transportService, searchService);
211+
if (transportService instanceof StreamTransportService) {
212+
StreamSearchTransportService.registerStreamRequestHandler((StreamTransportService) transportService, searchService);
213+
} else {
214+
SearchTransportService.registerRequestHandler(transportService, searchService);
215+
}
211216
this.clusterService = clusterService;
212217
this.searchService = searchService;
213218
this.indexNameExpressionResolver = indexNameExpressionResolver;

0 commit comments

Comments
 (0)