1616
1717package io .grpc .netty ;
1818
19+ import static com .google .common .base .Preconditions .checkNotNull ;
1920import static io .netty .handler .codec .http2 .Http2CodecUtil .getEmbeddedHttp2Exception ;
2021
2122import com .google .common .annotations .VisibleForTesting ;
2223import com .google .common .base .Preconditions ;
24+ import com .google .common .base .Ticker ;
2325import io .grpc .ChannelLogger ;
2426import io .netty .channel .ChannelHandlerContext ;
2527import io .netty .channel .ChannelPromise ;
@@ -44,6 +46,7 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler {
4446 private boolean autoTuneFlowControlOn ;
4547 private ChannelHandlerContext ctx ;
4648 private boolean initialWindowSent = false ;
49+ private final Ticker ticker ;
4750
4851 private static final long BDP_MEASUREMENT_PING = 1234 ;
4952
@@ -54,20 +57,22 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler {
5457 Http2Settings initialSettings ,
5558 ChannelLogger negotiationLogger ,
5659 boolean autoFlowControl ,
57- PingLimiter pingLimiter ) {
60+ PingLimiter pingLimiter ,
61+ Ticker ticker ) {
5862 super (channelUnused , decoder , encoder , initialSettings , negotiationLogger );
5963
6064 // During a graceful shutdown, wait until all streams are closed.
6165 gracefulShutdownTimeoutMillis (GRACEFUL_SHUTDOWN_NO_TIMEOUT );
6266
6367 // Extract the connection window from the settings if it was set.
6468 this .initialConnectionWindow = initialSettings .initialWindowSize () == null ? -1 :
65- initialSettings .initialWindowSize ();
69+ initialSettings .initialWindowSize ();
6670 this .autoTuneFlowControlOn = autoFlowControl ;
6771 if (pingLimiter == null ) {
6872 pingLimiter = new AllowPingLimiter ();
6973 }
7074 this .flowControlPing = new FlowControlPinger (pingLimiter );
75+ this .ticker = checkNotNull (ticker , "ticker" );
7176 }
7277
7378 @ Override
@@ -131,14 +136,17 @@ void setAutoTuneFlowControl(boolean isOn) {
131136 final class FlowControlPinger {
132137
133138 private static final int MAX_WINDOW_SIZE = 8 * 1024 * 1024 ;
139+ public static final int MAX_BACKOFF = 10 ;
134140
135141 private final PingLimiter pingLimiter ;
136142 private int pingCount ;
137143 private int pingReturn ;
138144 private boolean pinging ;
139145 private int dataSizeSincePing ;
140- private float lastBandwidth ; // bytes per second
146+ private long lastBandwidth ; // bytes per nanosecond
141147 private long lastPingTime ;
148+ private int lastTargetWindow ;
149+ private int pingFrequencyMultiplier ;
142150
143151 public FlowControlPinger (PingLimiter pingLimiter ) {
144152 Preconditions .checkNotNull (pingLimiter , "pingLimiter" );
@@ -157,10 +165,24 @@ public void onDataRead(int dataLength, int paddingLength) {
157165 if (!autoTuneFlowControlOn ) {
158166 return ;
159167 }
160- if (!isPinging () && pingLimiter .isPingAllowed ()) {
168+
169+ // Note that we are double counting around the ping initiation as the current data will be
170+ // added at the end of this method, so will be available in the next check. This at worst
171+ // causes us to send a ping one data packet earlier, but makes startup faster if there are
172+ // small packets before big ones.
173+ int dataForCheck = getDataSincePing () + dataLength + paddingLength ;
174+ // Need to double the data here to account for targetWindow being set to twice the data below
175+ if (!isPinging () && pingLimiter .isPingAllowed ()
176+ && dataForCheck * 2 >= lastTargetWindow * pingFrequencyMultiplier ) {
161177 setPinging (true );
162178 sendPing (ctx ());
163179 }
180+
181+ if (lastTargetWindow == 0 ) {
182+ lastTargetWindow =
183+ decoder ().flowController ().initialWindowSize (connection ().connectionStream ());
184+ }
185+
164186 incrementDataSincePing (dataLength + paddingLength );
165187 }
166188
@@ -169,25 +191,32 @@ public void updateWindow() throws Http2Exception {
169191 return ;
170192 }
171193 pingReturn ++;
172- long elapsedTime = (System .nanoTime () - lastPingTime );
194+ setPinging (false );
195+
196+ long elapsedTime = (ticker .read () - lastPingTime );
173197 if (elapsedTime == 0 ) {
174198 elapsedTime = 1 ;
175199 }
200+
176201 long bandwidth = (getDataSincePing () * TimeUnit .SECONDS .toNanos (1 )) / elapsedTime ;
177- Http2LocalFlowController fc = decoder ().flowController ();
178202 // Calculate new window size by doubling the observed BDP, but cap at max window
179203 int targetWindow = Math .min (getDataSincePing () * 2 , MAX_WINDOW_SIZE );
180- setPinging ( false );
204+ Http2LocalFlowController fc = decoder (). flowController ( );
181205 int currentWindow = fc .initialWindowSize (connection ().connectionStream ());
182- if (targetWindow > currentWindow && bandwidth > lastBandwidth ) {
183- lastBandwidth = bandwidth ;
184- int increase = targetWindow - currentWindow ;
185- fc .incrementWindowSize (connection ().connectionStream (), increase );
186- fc .initialWindowSize (targetWindow );
187- Http2Settings settings = new Http2Settings ();
188- settings .initialWindowSize (targetWindow );
189- frameWriter ().writeSettings (ctx (), settings , ctx ().newPromise ());
206+ if (bandwidth <= lastBandwidth || targetWindow <= currentWindow ) {
207+ pingFrequencyMultiplier = Math .min (pingFrequencyMultiplier + 1 , MAX_BACKOFF );
208+ return ;
190209 }
210+
211+ pingFrequencyMultiplier = 0 ; // react quickly when size is changing
212+ lastBandwidth = bandwidth ;
213+ lastTargetWindow = targetWindow ;
214+ int increase = targetWindow - currentWindow ;
215+ fc .incrementWindowSize (connection ().connectionStream (), increase );
216+ fc .initialWindowSize (targetWindow );
217+ Http2Settings settings = new Http2Settings ();
218+ settings .initialWindowSize (targetWindow );
219+ frameWriter ().writeSettings (ctx (), settings , ctx ().newPromise ());
191220 }
192221
193222 private boolean isPinging () {
@@ -200,7 +229,7 @@ private void setPinging(boolean pingOut) {
200229
201230 private void sendPing (ChannelHandlerContext ctx ) {
202231 setDataSizeSincePing (0 );
203- lastPingTime = System . nanoTime ();
232+ lastPingTime = ticker . read ();
204233 encoder ().writePing (ctx , false , BDP_MEASUREMENT_PING , ctx .newPromise ());
205234 pingCount ++;
206235 }
@@ -229,10 +258,12 @@ private void setDataSizeSincePing(int dataSize) {
229258 dataSizeSincePing = dataSize ;
230259 }
231260
261+ // Only used in testing
232262 @ VisibleForTesting
233263 void setDataSizeAndSincePing (int dataSize ) {
234264 setDataSizeSincePing (dataSize );
235- lastPingTime = System .nanoTime () - TimeUnit .SECONDS .toNanos (1 );
265+ pingFrequencyMultiplier = 1 ;
266+ lastPingTime = ticker .read () ;
236267 }
237268 }
238269
0 commit comments