|
16 | 16 | import io.netty.bootstrap.Bootstrap; |
17 | 17 | import io.netty.buffer.ByteBufAllocator; |
18 | 18 | import io.netty.channel.*; |
| 19 | +import io.netty.channel.epoll.EpollEventLoopGroup; |
19 | 20 | import io.netty.channel.group.ChannelGroup; |
20 | 21 | import io.netty.channel.group.DefaultChannelGroup; |
| 22 | +import io.netty.channel.kqueue.KQueueEventLoopGroup; |
21 | 23 | import io.netty.channel.nio.NioEventLoopGroup; |
22 | | -import io.netty.channel.oio.OioEventLoopGroup; |
23 | 24 | import io.netty.handler.codec.http.HttpClientCodec; |
24 | 25 | import io.netty.handler.codec.http.HttpContentDecompressor; |
25 | 26 | import io.netty.handler.codec.http.websocketx.WebSocket08FrameDecoder; |
@@ -119,31 +120,31 @@ public ChannelManager(final AsyncHttpClientConfig config, Timer nettyTimer) { |
119 | 120 | // check if external EventLoopGroup is defined |
120 | 121 | ThreadFactory threadFactory = config.getThreadFactory() != null ? config.getThreadFactory() : new DefaultThreadFactory(config.getThreadPoolName()); |
121 | 122 | allowReleaseEventLoopGroup = config.getEventLoopGroup() == null; |
122 | | - ChannelFactory<? extends Channel> channelFactory; |
| 123 | + TransportFactory<? extends Channel, ? extends EventLoopGroup> transportFactory; |
123 | 124 | if (allowReleaseEventLoopGroup) { |
124 | 125 | if (config.isUseNativeTransport()) { |
125 | | - eventLoopGroup = newEpollEventLoopGroup(config.getIoThreadsCount(), threadFactory); |
126 | | - channelFactory = getEpollSocketChannelFactory(); |
127 | | - |
| 126 | + transportFactory = getNativeTransportFactory(); |
128 | 127 | } else { |
129 | | - eventLoopGroup = new NioEventLoopGroup(config.getIoThreadsCount(), threadFactory); |
130 | | - channelFactory = NioSocketChannelFactory.INSTANCE; |
| 128 | + transportFactory = NioTransportFactory.INSTANCE; |
131 | 129 | } |
| 130 | + eventLoopGroup = transportFactory.newEventLoopGroup(config.getIoThreadsCount(), threadFactory); |
132 | 131 |
|
133 | 132 | } else { |
134 | 133 | eventLoopGroup = config.getEventLoopGroup(); |
135 | | - if (eventLoopGroup instanceof OioEventLoopGroup) |
136 | | - throw new IllegalArgumentException("Oio is not supported"); |
137 | 134 |
|
138 | 135 | if (eventLoopGroup instanceof NioEventLoopGroup) { |
139 | | - channelFactory = NioSocketChannelFactory.INSTANCE; |
| 136 | + transportFactory = NioTransportFactory.INSTANCE; |
| 137 | + } else if (eventLoopGroup instanceof EpollEventLoopGroup) { |
| 138 | + transportFactory = new EpollTransportFactory(); |
| 139 | + } else if (eventLoopGroup instanceof KQueueEventLoopGroup) { |
| 140 | + transportFactory = new KQueueTransportFactory(); |
140 | 141 | } else { |
141 | | - channelFactory = getEpollSocketChannelFactory(); |
| 142 | + throw new IllegalArgumentException("Unknown event loop group " + eventLoopGroup.getClass().getSimpleName()); |
142 | 143 | } |
143 | 144 | } |
144 | 145 |
|
145 | | - httpBootstrap = newBootstrap(channelFactory, eventLoopGroup, config); |
146 | | - wsBootstrap = newBootstrap(channelFactory, eventLoopGroup, config); |
| 146 | + httpBootstrap = newBootstrap(transportFactory, eventLoopGroup, config); |
| 147 | + wsBootstrap = newBootstrap(transportFactory, eventLoopGroup, config); |
147 | 148 |
|
148 | 149 | // for reactive streams |
149 | 150 | httpBootstrap.option(ChannelOption.AUTO_READ, false); |
@@ -184,21 +185,16 @@ private Bootstrap newBootstrap(ChannelFactory<? extends Channel> channelFactory, |
184 | 185 | return bootstrap; |
185 | 186 | } |
186 | 187 |
|
187 | | - private EventLoopGroup newEpollEventLoopGroup(int ioThreadsCount, ThreadFactory threadFactory) { |
188 | | - try { |
189 | | - Class<?> epollEventLoopGroupClass = Class.forName("io.netty.channel.epoll.EpollEventLoopGroup"); |
190 | | - return (EventLoopGroup) epollEventLoopGroupClass.getConstructor(int.class, ThreadFactory.class).newInstance(ioThreadsCount, threadFactory); |
191 | | - } catch (Exception e) { |
192 | | - throw new IllegalArgumentException(e); |
193 | | - } |
194 | | - } |
195 | | - |
196 | 188 | @SuppressWarnings("unchecked") |
197 | | - private ChannelFactory<? extends Channel> getEpollSocketChannelFactory() { |
| 189 | + private TransportFactory<? extends Channel, ? extends EventLoopGroup> getNativeTransportFactory() { |
198 | 190 | try { |
199 | | - return (ChannelFactory<? extends Channel>) Class.forName("org.asynchttpclient.netty.channel.EpollSocketChannelFactory").newInstance(); |
| 191 | + return (TransportFactory<? extends Channel, ? extends EventLoopGroup>) Class.forName("org.asynchttpclient.netty.channel.EpollTransportFactory").newInstance(); |
200 | 192 | } catch (Exception e) { |
201 | | - throw new IllegalArgumentException(e); |
| 193 | + try { |
| 194 | + return (TransportFactory<? extends Channel, ? extends EventLoopGroup>) Class.forName("org.asynchttpclient.netty.channel.KQueueTransportFactory").newInstance(); |
| 195 | + } catch (Exception e1) { |
| 196 | + throw new IllegalArgumentException("No suitable native transport (epoll or kqueue) available"); |
| 197 | + } |
202 | 198 | } |
203 | 199 | } |
204 | 200 |
|
|
0 commit comments