1818 */
1919package org .elasticsearch .cluster .coordination ;
2020
21+ import org .apache .logging .log4j .CloseableThreadContext ;
2122import org .apache .logging .log4j .message .ParameterizedMessage ;
2223import org .elasticsearch .Version ;
2324import org .elasticsearch .cluster .ClusterState ;
3839import org .elasticsearch .discovery .zen .UnicastHostsProvider .HostsResolver ;
3940import org .elasticsearch .indices .cluster .FakeThreadPoolMasterService ;
4041import org .elasticsearch .test .ESTestCase ;
42+ import org .elasticsearch .test .disruption .DisruptableMockTransport ;
4143import org .elasticsearch .test .junit .annotations .TestLogging ;
42- import org .elasticsearch .test .transport .MockTransport ;
43- import org .elasticsearch .transport .RequestHandlerRegistry ;
44- import org .elasticsearch .transport .TransportChannel ;
45- import org .elasticsearch .transport .TransportRequest ;
46- import org .elasticsearch .transport .TransportResponse ;
47- import org .elasticsearch .transport .TransportResponseOptions ;
4844import org .elasticsearch .transport .TransportService ;
4945import org .hamcrest .Matcher ;
5046
5551import java .util .List ;
5652import java .util .Optional ;
5753import java .util .Set ;
58- import java .util .function .Consumer ;
5954import java .util .function .Predicate ;
6055import java .util .stream .Collectors ;
6156
@@ -184,15 +179,15 @@ class ClusterNode extends AbstractComponent {
184179 private final PersistedState persistedState ;
185180 private MasterService masterService ;
186181 private TransportService transportService ;
187- private MockTransport mockTransport ;
182+ private DisruptableMockTransport mockTransport ;
188183
189184 ClusterNode (int nodeIndex ) {
190185 super (Settings .builder ().put (NODE_NAME_SETTING .getKey (), nodeIdFromIndex (nodeIndex )).build ());
191186 this .nodeIndex = nodeIndex ;
192187 localNode = createDiscoveryNode ();
193188 persistedState = new InMemoryPersistedState (1L ,
194189 clusterState (1L , 1L , localNode , initialConfiguration , initialConfiguration , 0L ));
195- setUp ();
190+ onNode ( localNode , this :: setUp ). run ();
196191 }
197192
198193 private DiscoveryNode createDiscoveryNode () {
@@ -206,112 +201,44 @@ private DiscoveryNode createDiscoveryNode() {
206201 }
207202
208203 private void setUp () {
209- mockTransport = new MockTransport ( ) {
204+ mockTransport = new DisruptableMockTransport ( logger ) {
210205 @ Override
211- protected void onSendRequest (long requestId , String action , TransportRequest request , DiscoveryNode destination ) {
212- assert destination .equals (localNode ) == false : "non-local message from " + localNode + " to itself" ;
213- super .onSendRequest (requestId , action , request , destination );
206+ protected DiscoveryNode getLocalNode () {
207+ return localNode ;
208+ }
209+
210+ @ Override
211+ protected ConnectionStatus getConnectionStatus (DiscoveryNode sender , DiscoveryNode destination ) {
212+ return ConnectionStatus .CONNECTED ;
213+ }
214214
215- // connecting and handshaking with a new node happens synchronously, so we cannot enqueue these tasks for later
216- final Consumer < Runnable > scheduler ;
215+ @ Override
216+ protected Optional < DisruptableMockTransport > getDisruptedCapturingTransport ( DiscoveryNode node , String action ) {
217217 final Predicate <ClusterNode > matchesDestination ;
218218 if (action .equals (HANDSHAKE_ACTION_NAME )) {
219- scheduler = Runnable ::run ;
220- matchesDestination = n -> n .getLocalNode ().getAddress ().equals (destination .getAddress ());
219+ matchesDestination = n -> n .getLocalNode ().getAddress ().equals (node .getAddress ());
221220 } else {
222- scheduler = deterministicTaskQueue ::scheduleNow ;
223- matchesDestination = n -> n .getLocalNode ().equals (destination );
221+ matchesDestination = n -> n .getLocalNode ().equals (node );
224222 }
223+ return clusterNodes .stream ().filter (matchesDestination ).findAny ().map (cn -> cn .mockTransport );
224+ }
225225
226- scheduler .accept (new Runnable () {
227- @ Override
228- public String toString () {
229- return "delivery of [" + action + "][" + requestId + "]: " + request ;
230- }
231-
232- @ Override
233- public void run () {
234- clusterNodes .stream ().filter (matchesDestination ).findAny ().ifPresent (
235- destinationNode -> {
236-
237- final RequestHandlerRegistry requestHandler
238- = destinationNode .mockTransport .getRequestHandler (action );
239-
240- final TransportChannel transportChannel = new TransportChannel () {
241- @ Override
242- public String getProfileName () {
243- return "default" ;
244- }
245-
246- @ Override
247- public String getChannelType () {
248- return "coordinator-test-channel" ;
249- }
250-
251- @ Override
252- public void sendResponse (final TransportResponse response ) {
253- scheduler .accept (new Runnable () {
254- @ Override
255- public String toString () {
256- return "delivery of response " + response
257- + " to [" + action + "][" + requestId + "]: " + request ;
258- }
259-
260- @ Override
261- public void run () {
262- handleResponse (requestId , response );
263- }
264- });
265- }
266-
267- @ Override
268- public void sendResponse (TransportResponse response , TransportResponseOptions options ) {
269- sendResponse (response );
270- }
271-
272- @ Override
273- public void sendResponse (Exception exception ) {
274- scheduler .accept (new Runnable () {
275- @ Override
276- public String toString () {
277- return "delivery of error response " + exception .getMessage ()
278- + " to [" + action + "][" + requestId + "]: " + request ;
279- }
280-
281- @ Override
282- public void run () {
283- handleRemoteError (requestId , exception );
284- }
285- });
286- }
287- };
288-
289- try {
290- processMessageReceived (request , requestHandler , transportChannel );
291- } catch (Exception e ) {
292- scheduler .accept (new Runnable () {
293- @ Override
294- public String toString () {
295- return "delivery of processing error response " + e .getMessage ()
296- + " to [" + action + "][" + requestId + "]: " + request ;
297- }
298-
299- @ Override
300- public void run () {
301- handleRemoteError (requestId , e );
302- }
303- });
304- }
305- }
306- );
307- }
308- });
226+ @ Override
227+ protected void handle (DiscoveryNode sender , DiscoveryNode destination , String action , Runnable doDelivery ) {
228+ // handshake needs to run inline as the caller blockingly waits on the result
229+ if (action .equals (HANDSHAKE_ACTION_NAME )) {
230+ onNode (destination , doDelivery ).run ();
231+ } else {
232+ deterministicTaskQueue .scheduleNow (onNode (destination , doDelivery ));
233+ }
309234 }
310235 };
311236
312- masterService = new FakeThreadPoolMasterService ("test" , deterministicTaskQueue ::scheduleNow );
237+ masterService = new FakeThreadPoolMasterService ("test" ,
238+ runnable -> deterministicTaskQueue .scheduleNow (onNode (localNode , runnable )));
313239 transportService = mockTransport .createTransportService (
314- settings , deterministicTaskQueue .getThreadPool (), NOOP_TRANSPORT_INTERCEPTOR , a -> localNode , null , emptySet ());
240+ settings , deterministicTaskQueue .getThreadPool (runnable -> onNode (localNode , runnable )), NOOP_TRANSPORT_INTERCEPTOR ,
241+ a -> localNode , null , emptySet ());
315242 coordinator = new Coordinator (settings , transportService , ESAllocationTestCase .createAllocationService (Settings .EMPTY ),
316243 masterService , this ::getPersistedState , Cluster .this ::provideUnicastHosts , Randomness .get ());
317244 masterService .setClusterStatePublisher (coordinator );
@@ -359,9 +286,20 @@ private List<TransportAddress> provideUnicastHosts(HostsResolver ignored) {
359286 }
360287 }
361288
362- @ SuppressWarnings ("unchecked" )
363- private static void processMessageReceived (TransportRequest request , RequestHandlerRegistry requestHandler ,
364- TransportChannel transportChannel ) throws Exception {
365- requestHandler .processMessageReceived (request , transportChannel );
289+ private static Runnable onNode (DiscoveryNode node , Runnable runnable ) {
290+ final String nodeId = "{" + node .getId () + "}{" + node .getEphemeralId () + "}" ;
291+ return new Runnable () {
292+ @ Override
293+ public void run () {
294+ try (CloseableThreadContext .Instance ignored = CloseableThreadContext .put ("nodeId" , nodeId )) {
295+ runnable .run ();
296+ }
297+ }
298+
299+ @ Override
300+ public String toString () {
301+ return nodeId + ": " + runnable .toString ();
302+ }
303+ };
366304 }
367305}
0 commit comments