Skip to content

Commit d633566

Browse files
Deprecate TTFB, RESP_TIMEOUT, introduce MAX_CONCURRENT_REQUESTS (Consensys#8839)
1 parent dbed1e9 commit d633566

File tree

13 files changed

+124
-205
lines changed

13 files changed

+124
-205
lines changed

ethereum/spec/src/main/java/tech/pegasys/teku/spec/constants/NetworkConstants.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,7 @@ public class NetworkConstants {
2020
public static final int DEFAULT_SAFE_SLOTS_TO_IMPORT_OPTIMISTICALLY = 128;
2121

2222
public static final int NODE_ID_BITS = 256;
23+
24+
// https://github.com/ethereum/consensus-specs/pull/3767
25+
public static final int MAX_CONCURRENT_REQUESTS = 2;
2326
}

infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/ThrottlingTaskQueue.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ public class ThrottlingTaskQueue {
2828

2929
private int inflightTaskCount = 0;
3030

31+
public static ThrottlingTaskQueue create(final int maximumConcurrentTasks) {
32+
return new ThrottlingTaskQueue(maximumConcurrentTasks);
33+
}
34+
3135
public static ThrottlingTaskQueue create(
3236
final int maximumConcurrentTasks,
3337
final MetricsSystem metricsSystem,

networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerManager.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import tech.pegasys.teku.networking.p2p.peer.Peer;
4242
import tech.pegasys.teku.networking.p2p.peer.PeerConnectedSubscriber;
4343
import tech.pegasys.teku.spec.Spec;
44-
import tech.pegasys.teku.spec.config.SpecConfig;
4544
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.metadata.MetadataMessage;
4645
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.metadata.MetadataMessageSchema;
4746
import tech.pegasys.teku.spec.datastructures.state.Checkpoint;
@@ -51,6 +50,8 @@
5150
public class Eth2PeerManager implements PeerLookup, PeerHandler {
5251
private static final Logger LOG = LogManager.getLogger();
5352

53+
private static final Duration STATUS_RECEIVED_TIMEOUT = Duration.ofSeconds(10);
54+
5455
private final AsyncRunner asyncRunner;
5556
private final RecentChainData recentChainData;
5657
private final Eth2PeerFactory eth2PeerFactory;
@@ -66,7 +67,6 @@ public class Eth2PeerManager implements PeerLookup, PeerHandler {
6667
private final int eth2RpcOutstandingPingThreshold;
6768

6869
private final Duration eth2StatusUpdateInterval;
69-
private final SpecConfig specConfig;
7070

7171
Eth2PeerManager(
7272
final Spec spec,
@@ -99,7 +99,6 @@ public class Eth2PeerManager implements PeerLookup, PeerHandler {
9999
this.eth2RpcPingInterval = eth2RpcPingInterval;
100100
this.eth2RpcOutstandingPingThreshold = eth2RpcOutstandingPingThreshold;
101101
this.eth2StatusUpdateInterval = eth2StatusUpdateInterval;
102-
this.specConfig = spec.getGenesisSpecConfig();
103102
}
104103

105104
public static Eth2PeerManager create(
@@ -237,7 +236,7 @@ private void ensureStatusReceived(final Eth2Peer peer) {
237236
.ifExceptionGetsHereRaiseABug();
238237
}
239238
},
240-
Duration.ofSeconds(specConfig.getRespTimeout()))
239+
STATUS_RECEIVED_TIMEOUT)
241240
.finish(
242241
() -> {},
243242
error -> {

networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethods.java

Lines changed: 12 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,8 @@ public static BeaconChainMethods create(
115115
final MetadataMessagesFactory metadataMessagesFactory,
116116
final RpcEncoding rpcEncoding) {
117117
return new BeaconChainMethods(
118-
createStatus(spec, asyncRunner, statusMessageFactory, peerLookup, rpcEncoding),
119-
createGoodBye(spec, asyncRunner, metricsSystem, peerLookup, rpcEncoding),
118+
createStatus(asyncRunner, statusMessageFactory, peerLookup, rpcEncoding),
119+
createGoodBye(asyncRunner, metricsSystem, peerLookup, rpcEncoding),
120120
createBeaconBlocksByRoot(
121121
spec, metricsSystem, asyncRunner, recentChainData, peerLookup, rpcEncoding),
122122
createBeaconBlocksByRange(
@@ -144,11 +144,10 @@ public static BeaconChainMethods create(
144144
rpcEncoding,
145145
recentChainData),
146146
createMetadata(spec, asyncRunner, metadataMessagesFactory, peerLookup, rpcEncoding),
147-
createPing(spec, asyncRunner, metadataMessagesFactory, peerLookup, rpcEncoding));
147+
createPing(asyncRunner, metadataMessagesFactory, peerLookup, rpcEncoding));
148148
}
149149

150150
private static Eth2RpcMethod<StatusMessage, StatusMessage> createStatus(
151-
final Spec spec,
152151
final AsyncRunner asyncRunner,
153152
final StatusMessageFactory statusMessageFactory,
154153
final PeerLookup peerLookup,
@@ -165,12 +164,10 @@ private static Eth2RpcMethod<StatusMessage, StatusMessage> createStatus(
165164
true,
166165
contextCodec,
167166
statusHandler,
168-
peerLookup,
169-
spec.getNetworkingConfig());
167+
peerLookup);
170168
}
171169

172170
private static Eth2RpcMethod<GoodbyeMessage, GoodbyeMessage> createGoodBye(
173-
final Spec spec,
174171
final AsyncRunner asyncRunner,
175172
final MetricsSystem metricsSystem,
176173
final PeerLookup peerLookup,
@@ -187,8 +184,7 @@ private static Eth2RpcMethod<GoodbyeMessage, GoodbyeMessage> createGoodBye(
187184
false,
188185
contextCodec,
189186
goodbyeHandler,
190-
peerLookup,
191-
spec.getNetworkingConfig());
187+
peerLookup);
192188
}
193189

194190
private static Eth2RpcMethod<BeaconBlocksByRootRequestMessage, SignedBeaconBlock>
@@ -221,8 +217,7 @@ private static Eth2RpcMethod<GoodbyeMessage, GoodbyeMessage> createGoodBye(
221217
expectResponseToRequest,
222218
forkDigestContextCodec,
223219
beaconBlocksByRootHandler,
224-
peerLookup,
225-
spec.getNetworkingConfig());
220+
peerLookup);
226221

227222
return VersionedEth2RpcMethod.create(
228223
rpcEncoding, requestType, expectResponseToRequest, List.of(v2Method));
@@ -259,8 +254,7 @@ private static Eth2RpcMethod<GoodbyeMessage, GoodbyeMessage> createGoodBye(
259254
expectResponseToRequest,
260255
forkDigestContextCodec,
261256
beaconBlocksByRangeHandler,
262-
peerLookup,
263-
spec.getNetworkingConfig());
257+
peerLookup);
264258

265259
return VersionedEth2RpcMethod.create(
266260
rpcEncoding, requestType, expectResponseToRequest, List.of(v2Method));
@@ -299,8 +293,7 @@ private static Eth2RpcMethod<GoodbyeMessage, GoodbyeMessage> createGoodBye(
299293
true,
300294
forkDigestContextCodec,
301295
blobSidecarsByRootHandler,
302-
peerLookup,
303-
spec.getNetworkingConfig()));
296+
peerLookup));
304297
}
305298

306299
private static Optional<Eth2RpcMethod<BlobSidecarsByRangeRequestMessage, BlobSidecar>>
@@ -336,8 +329,7 @@ private static Eth2RpcMethod<GoodbyeMessage, GoodbyeMessage> createGoodBye(
336329
true,
337330
forkDigestContextCodec,
338331
blobSidecarsByRangeHandler,
339-
peerLookup,
340-
spec.getNetworkingConfig()));
332+
peerLookup));
341333
}
342334

343335
private static Eth2RpcMethod<EmptyMessage, MetadataMessage> createMetadata(
@@ -369,8 +361,7 @@ private static Eth2RpcMethod<EmptyMessage, MetadataMessage> createMetadata(
369361
expectResponse,
370362
phase0ContextCodec,
371363
messageHandler,
372-
peerLookup,
373-
spec.getNetworkingConfig());
364+
peerLookup);
374365

375366
if (spec.isMilestoneSupported(SpecMilestone.ALTAIR)) {
376367
final SszSchema<MetadataMessage> altairMetadataSchema =
@@ -392,8 +383,7 @@ private static Eth2RpcMethod<EmptyMessage, MetadataMessage> createMetadata(
392383
expectResponse,
393384
altairContextCodec,
394385
messageHandler,
395-
peerLookup,
396-
spec.getNetworkingConfig());
386+
peerLookup);
397387
return VersionedEth2RpcMethod.create(
398388
rpcEncoding, requestType, expectResponse, List.of(v2Method, v1Method));
399389
} else {
@@ -402,7 +392,6 @@ private static Eth2RpcMethod<EmptyMessage, MetadataMessage> createMetadata(
402392
}
403393

404394
private static Eth2RpcMethod<PingMessage, PingMessage> createPing(
405-
final Spec spec,
406395
final AsyncRunner asyncRunner,
407396
final MetadataMessagesFactory metadataMessagesFactory,
408397
final PeerLookup peerLookup,
@@ -419,8 +408,7 @@ private static Eth2RpcMethod<PingMessage, PingMessage> createPing(
419408
true,
420409
contextCodec,
421410
statusHandler,
422-
peerLookup,
423-
spec.getNetworkingConfig());
411+
peerLookup);
424412
}
425413

426414
public Collection<RpcMethod<?, ?, ?>> all() {

networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2IncomingRequestHandler.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,13 @@
2828
import tech.pegasys.teku.networking.p2p.rpc.RpcRequestHandler;
2929
import tech.pegasys.teku.networking.p2p.rpc.RpcStream;
3030
import tech.pegasys.teku.networking.p2p.rpc.StreamClosedException;
31-
import tech.pegasys.teku.spec.config.NetworkingSpecConfig;
3231
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.RpcRequest;
3332

3433
public class Eth2IncomingRequestHandler<
3534
TRequest extends RpcRequest & SszData, TResponse extends SszData>
3635
implements RpcRequestHandler {
3736
private static final Logger LOG = LogManager.getLogger();
37+
private static final Duration RECEIVE_INCOMING_REQUEST_TIMEOUT = Duration.ofSeconds(10);
3838

3939
private final PeerLookup peerLookup;
4040
private final LocalMessageHandler<TRequest, TResponse> localMessageHandler;
@@ -45,23 +45,20 @@ public class Eth2IncomingRequestHandler<
4545
private final String protocolId;
4646
private final AsyncRunner asyncRunner;
4747
private final AtomicBoolean requestHandled = new AtomicBoolean(false);
48-
private final Duration respTimeout;
4948

5049
public Eth2IncomingRequestHandler(
5150
final String protocolId,
5251
final RpcResponseEncoder<TResponse, ?> responseEncoder,
5352
final RpcRequestDecoder<TRequest> requestDecoder,
5453
final AsyncRunner asyncRunner,
5554
final PeerLookup peerLookup,
56-
final LocalMessageHandler<TRequest, TResponse> localMessageHandler,
57-
final NetworkingSpecConfig networkingConfig) {
55+
final LocalMessageHandler<TRequest, TResponse> localMessageHandler) {
5856
this.protocolId = protocolId;
5957
this.asyncRunner = asyncRunner;
6058
this.peerLookup = peerLookup;
6159
this.localMessageHandler = localMessageHandler;
6260
this.responseEncoder = responseEncoder;
6361
this.requestDecoder = requestDecoder;
64-
this.respTimeout = Duration.ofSeconds(networkingConfig.getRespTimeout());
6562
}
6663

6764
@Override
@@ -121,15 +118,14 @@ private void handleRequest(
121118
}
122119

123120
private void ensureRequestReceivedWithinTimeLimit(final RpcStream stream) {
124-
final Duration timeout = respTimeout;
125121
asyncRunner
126-
.getDelayedFuture(timeout)
122+
.getDelayedFuture(RECEIVE_INCOMING_REQUEST_TIMEOUT)
127123
.thenAccept(
128124
(__) -> {
129125
if (!requestHandled.get()) {
130126
LOG.debug(
131127
"Failed to receive incoming request data within {} sec for protocol {}. Close stream.",
132-
timeout.toSeconds(),
128+
RECEIVE_INCOMING_REQUEST_TIMEOUT.toSeconds(),
133129
protocolId);
134130
stream.closeAbruptly().ifExceptionGetsHereRaiseABug();
135131
}

0 commit comments

Comments
 (0)