4848import org .opensearch .http .AbstractHttpServerTransport ;
4949import org .opensearch .http .HttpChannel ;
5050import org .opensearch .http .HttpServerChannel ;
51+ import org .opensearch .http .HttpServerTransport ;
5152import org .opensearch .http .nio .ssl .SslUtils ;
5253import org .opensearch .nio .BytesChannelContext ;
5354import org .opensearch .nio .ChannelFactory ;
6162import org .opensearch .plugins .SecureHttpTransportSettingsProvider ;
6263import org .opensearch .telemetry .tracing .Tracer ;
6364import org .opensearch .threadpool .ThreadPool ;
65+ import org .opensearch .transport .TransportAdapterProvider ;
6466import org .opensearch .transport .nio .NioGroupFactory ;
6567import org .opensearch .transport .nio .PageAllocator ;
6668
7173import java .net .InetSocketAddress ;
7274import java .nio .channels .ServerSocketChannel ;
7375import java .nio .channels .SocketChannel ;
76+ import java .util .Collections ;
77+ import java .util .List ;
78+ import java .util .Optional ;
7479import java .util .function .Consumer ;
80+ import java .util .stream .Collectors ;
81+
82+ import io .netty .channel .ChannelInboundHandlerAdapter ;
83+ import io .netty .handler .codec .http .HttpContentDecompressor ;
7584
7685import static org .opensearch .http .HttpTransportSettings .SETTING_HTTP_MAX_CHUNK_SIZE ;
7786import static org .opensearch .http .HttpTransportSettings .SETTING_HTTP_MAX_HEADER_SIZE ;
8998public class NioHttpServerTransport extends AbstractHttpServerTransport {
9099 private static final Logger logger = LogManager .getLogger (NioHttpServerTransport .class );
91100
101+ public static final String REQUEST_HEADER_VERIFIER = SecureHttpTransportSettingsProvider .REQUEST_HEADER_VERIFIER ;
102+ public static final String REQUEST_DECOMPRESSOR = SecureHttpTransportSettingsProvider .REQUEST_DECOMPRESSOR ;
103+
92104 protected final PageAllocator pageAllocator ;
93105 private final NioGroupFactory nioGroupFactory ;
94106
@@ -224,6 +236,8 @@ protected void acceptChannel(NioSocketChannel socketChannel) {
224236
225237 private class HttpChannelFactory extends ChannelFactory <NioHttpServerChannel , NioHttpChannel > {
226238 private final SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider ;
239+ private final ChannelInboundHandlerAdapter headerVerifier ;
240+ private final TransportAdapterProvider <HttpServerTransport > decompressorProvider ;
227241
228242 private HttpChannelFactory (@ Nullable SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider ) {
229243 super (
@@ -237,6 +251,63 @@ private HttpChannelFactory(@Nullable SecureHttpTransportSettingsProvider secureH
237251 tcpReceiveBufferSize
238252 );
239253 this .secureHttpTransportSettingsProvider = secureHttpTransportSettingsProvider ;
254+
255+ final List <ChannelInboundHandlerAdapter > headerVerifiers = getHeaderVerifiers (secureHttpTransportSettingsProvider );
256+ final Optional <TransportAdapterProvider <HttpServerTransport >> decompressorProviderOpt = getDecompressorProvider (
257+ secureHttpTransportSettingsProvider
258+ );
259+
260+ // There could be multiple request decompressor providers configured, using the first one
261+ decompressorProviderOpt .ifPresent (p -> logger .debug ("Using request decompressor provider: {}" , p ));
262+
263+ if (headerVerifiers .size () > 1 ) {
264+ throw new IllegalArgumentException (
265+ "Cannot have more than one header verifier configured, supplied " + headerVerifiers .size ()
266+ );
267+ }
268+
269+ this .headerVerifier = headerVerifiers .isEmpty () ? null : headerVerifiers .get (0 );
270+ this .decompressorProvider = decompressorProviderOpt .orElseGet (() -> new TransportAdapterProvider <HttpServerTransport >() {
271+ @ Override
272+ public String name () {
273+ return REQUEST_DECOMPRESSOR ;
274+ }
275+
276+ @ Override
277+ public <C > Optional <C > create (Settings settings , HttpServerTransport transport , Class <C > adapterClass ) {
278+ return Optional .empty ();
279+ }
280+ });
281+
282+ }
283+
284+ private List <ChannelInboundHandlerAdapter > getHeaderVerifiers (
285+ @ Nullable SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider
286+ ) {
287+ if (secureHttpTransportSettingsProvider == null ) {
288+ return Collections .emptyList ();
289+ }
290+
291+ return secureHttpTransportSettingsProvider .getHttpTransportAdapterProviders (settings )
292+ .stream ()
293+ .filter (p -> REQUEST_HEADER_VERIFIER .equalsIgnoreCase (p .name ()))
294+ .map (p -> p .create (settings , NioHttpServerTransport .this , ChannelInboundHandlerAdapter .class ))
295+ .filter (Optional ::isPresent )
296+ .map (Optional ::get )
297+ .collect (Collectors .toList ());
298+ }
299+
300+ private Optional <TransportAdapterProvider <HttpServerTransport >> getDecompressorProvider (
301+ @ Nullable SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider
302+ ) {
303+ if (secureHttpTransportSettingsProvider == null ) {
304+ return Optional .empty ();
305+ }
306+
307+ return secureHttpTransportSettingsProvider .getHttpTransportAdapterProviders (settings )
308+ .stream ()
309+ .filter (p -> REQUEST_DECOMPRESSOR .equalsIgnoreCase (p .name ()))
310+ .findFirst ();
240311 }
241312
242313 @ Override
@@ -254,6 +325,9 @@ public NioHttpChannel createChannel(NioSelector selector, SocketChannel channel,
254325 handlingSettings ,
255326 selector .getTaskScheduler (),
256327 threadPool ::relativeTimeInMillis ,
328+ headerVerifier ,
329+ decompressorProvider .create (settings , NioHttpServerTransport .this , ChannelInboundHandlerAdapter .class )
330+ .orElseGet (HttpContentDecompressor ::new ),
257331 engine
258332 );
259333 Consumer <Exception > exceptionHandler = (e ) -> onException (httpChannel , e );
0 commit comments