|
25 | 25 | import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration; |
26 | 26 | import org.elasticsearch.cluster.node.DiscoveryNode; |
27 | 27 | import org.elasticsearch.cluster.node.DiscoveryNodes; |
| 28 | +import org.elasticsearch.common.Nullable; |
28 | 29 | import org.elasticsearch.common.collect.Tuple; |
29 | 30 | import org.elasticsearch.common.settings.Settings; |
| 31 | +import org.elasticsearch.common.unit.TimeValue; |
30 | 32 | import org.elasticsearch.common.util.set.Sets; |
31 | 33 | import org.elasticsearch.discovery.Discovery; |
32 | | -import org.elasticsearch.discovery.zen.PublishClusterStateActionTests.AssertingAckListener; |
33 | 34 | import org.elasticsearch.test.ESTestCase; |
34 | 35 | import org.elasticsearch.transport.TransportException; |
35 | 36 | import org.elasticsearch.transport.TransportResponse; |
|
45 | 46 | import java.util.Map; |
46 | 47 | import java.util.Optional; |
47 | 48 | import java.util.Set; |
| 49 | +import java.util.concurrent.CopyOnWriteArrayList; |
| 50 | +import java.util.concurrent.CountDownLatch; |
48 | 51 | import java.util.concurrent.TimeUnit; |
49 | 52 | import java.util.concurrent.atomic.AtomicBoolean; |
50 | 53 | import java.util.concurrent.atomic.AtomicInteger; |
|
57 | 60 | import static org.hamcrest.Matchers.containsInAnyOrder; |
58 | 61 | import static org.hamcrest.Matchers.containsString; |
59 | 62 | import static org.hamcrest.Matchers.empty; |
| 63 | +import static org.hamcrest.Matchers.emptyIterable; |
60 | 64 | import static org.hamcrest.Matchers.equalTo; |
61 | 65 |
|
62 | 66 | public class PublicationTests extends ESTestCase { |
@@ -478,4 +482,43 @@ private static DiscoveryNode newNode(int nodeId, Map<String, String> attributes, |
478 | 482 | return ts.stream(); |
479 | 483 | }); |
480 | 484 | } |
| 485 | + |
| 486 | + public static class AssertingAckListener implements Discovery.AckListener { |
| 487 | + private final List<Tuple<DiscoveryNode, Throwable>> errors = new CopyOnWriteArrayList<>(); |
| 488 | + private final Set<DiscoveryNode> successfulAcks = Collections.synchronizedSet(new HashSet<>()); |
| 489 | + private final CountDownLatch countDown; |
| 490 | + private final CountDownLatch commitCountDown; |
| 491 | + |
| 492 | + public AssertingAckListener(int nodeCount) { |
| 493 | + countDown = new CountDownLatch(nodeCount); |
| 494 | + commitCountDown = new CountDownLatch(1); |
| 495 | + } |
| 496 | + |
| 497 | + @Override |
| 498 | + public void onCommit(TimeValue commitTime) { |
| 499 | + commitCountDown.countDown(); |
| 500 | + } |
| 501 | + |
| 502 | + @Override |
| 503 | + public void onNodeAck(DiscoveryNode node, @Nullable Exception e) { |
| 504 | + if (e != null) { |
| 505 | + errors.add(new Tuple<>(node, e)); |
| 506 | + } else { |
| 507 | + successfulAcks.add(node); |
| 508 | + } |
| 509 | + countDown.countDown(); |
| 510 | + } |
| 511 | + |
| 512 | + public Set<DiscoveryNode> await(long timeout, TimeUnit unit) throws InterruptedException { |
| 513 | + assertThat(awaitErrors(timeout, unit), emptyIterable()); |
| 514 | + assertTrue(commitCountDown.await(timeout, unit)); |
| 515 | + return new HashSet<>(successfulAcks); |
| 516 | + } |
| 517 | + |
| 518 | + public List<Tuple<DiscoveryNode, Throwable>> awaitErrors(long timeout, TimeUnit unit) throws InterruptedException { |
| 519 | + countDown.await(timeout, unit); |
| 520 | + return errors; |
| 521 | + } |
| 522 | + |
| 523 | + } |
481 | 524 | } |
0 commit comments