diff --git a/src/tck/Reactive.Streams.TCK.Tests/RangePublisherTest.cs b/src/tck/Reactive.Streams.TCK.Tests/RangePublisherTest.cs new file mode 100644 index 0000000..7703d94 --- /dev/null +++ b/src/tck/Reactive.Streams.TCK.Tests/RangePublisherTest.cs @@ -0,0 +1,192 @@ +using NUnit.Framework; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Reactive.Streams.TCK.Tests +{ + [TestFixture] + public class RangePublisherTest : PublisherVerification + { + static readonly ConcurrentDictionary stacks = new ConcurrentDictionary(); + + static readonly ConcurrentDictionary states = new ConcurrentDictionary(); + + static int id; + + [TearDown] + public void AfterTest() + { + bool fail = false; + StringBuilder b = new StringBuilder(); + foreach (var t in states) + { + if (!t.Value) + { + b.Append("\r\n-------------------------------"); + + b.Append("\r\nat ").Append(stacks[t.Key]); + + fail = true; + } + } + states.Clear(); + stacks.Clear(); + if (fail) + { + throw new InvalidOperationException("Cancellations were missing:" + b); + } + } + + public RangePublisherTest() : base(new TestEnvironment()) + { + } + + public override IPublisher CreatePublisher(long elements) + { + return new RangePublisher(1, elements); + } + + public override IPublisher CreateFailedPublisher() + { + return null; + } + + internal sealed class RangePublisher : IPublisher + { + + readonly string stacktrace; + + readonly long start; + + readonly long count; + + internal RangePublisher(long start, long count) + { + this.stacktrace = Environment.StackTrace; + this.start = start; + this.count = count; + } + + public void Subscribe(ISubscriber s) + { + if (s == null) + { + throw new ArgumentNullException(); + } + + int ids = Interlocked.Increment(ref id); + + RangeSubscription parent = new RangeSubscription(s, ids, start, start + count); + stacks.AddOrUpdate(ids, (a) => stacktrace, (a, b) => stacktrace); + states.AddOrUpdate(ids, (a) => false, (a, b) => false); + s.OnSubscribe(parent); + } + + sealed class RangeSubscription : ISubscription + { + + readonly ISubscriber actual; + + readonly int ids; + + readonly long end; + + long index; + + volatile bool cancelled; + + long requested; + + internal RangeSubscription(ISubscriber actual, int ids, long start, long end) + { + this.actual = actual; + this.ids = ids; + this.index = start; + this.end = end; + } + + + public void Request(long n) + { + if (!cancelled) + { + if (n <= 0L) + { + cancelled = true; + states[ids] = true; + actual.OnError(new ArgumentException("§3.9 violated")); + return; + } + + for (;;) + { + long r = Volatile.Read(ref requested); + long u = r + n; + if (u < 0L) + { + u = long.MaxValue; + } + if (Interlocked.CompareExchange(ref requested, u, r) == r) + { + if (r == 0) + { + break; + } + return; + } + } + + long idx = index; + long f = end; + + for (;;) + { + long e = 0; + while (e != n && idx != f) + { + if (cancelled) + { + return; + } + + actual.OnNext((int)idx); + + idx++; + e++; + } + + if (idx == f) + { + if (!cancelled) + { + states[ids] = true; + actual.OnComplete(); + } + return; + } + + index = idx; + n = Interlocked.Add(ref requested, -n); + if (n == 0) + { + break; + } + } + } + } + + public void Cancel() + { + cancelled = true; + states[ids] = true; + } + } + } + } +} diff --git a/src/tck/Reactive.Streams.TCK/PublisherVerification.cs b/src/tck/Reactive.Streams.TCK/PublisherVerification.cs index 301481d..4d3e1ca 100644 --- a/src/tck/Reactive.Streams.TCK/PublisherVerification.cs +++ b/src/tck/Reactive.Streams.TCK/PublisherVerification.cs @@ -52,7 +52,7 @@ public static long EnvironmentPublisherReferenceGcTimeoutMilliseconds() { var environmentMilliseconds = Environment.GetEnvironmentVariable(PublisherReferenceGcTimeoutMillisecondsEnvironment); - if(environmentMilliseconds == null) + if (environmentMilliseconds == null) return DefaultPublisherReferenceGcTimeoutMilliseconds; try { @@ -169,18 +169,24 @@ public void Required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfPr => ActivePublisherTest(5, false, publisher => { var subscriber = _environment.NewManualSubscriber(publisher); + try + { + subscriber.ExpectNone($"Publisher {publisher} produced value before the first `Request`: "); + subscriber.Request(1); + subscriber.NextElement($"Publisher {publisher} produced no element after first `Request`"); + subscriber.ExpectNone($"Publisher {publisher} produced unrequested: "); - subscriber.ExpectNone($"Publisher {publisher} produced value before the first `Request`: "); - subscriber.Request(1); - subscriber.NextElement($"Publisher {publisher} produced no element after first `Request`"); - subscriber.ExpectNone($"Publisher {publisher} produced unrequested: "); - - subscriber.Request(1); - subscriber.Request(2); - subscriber.NextElements(3, _environment.DefaultTimeoutMilliseconds, - $"Publisher {publisher} produced less than 3 elements after two respective `Request` calls"); + subscriber.Request(1); + subscriber.Request(2); + subscriber.NextElements(3, _environment.DefaultTimeoutMilliseconds, + $"Publisher {publisher} produced less than 3 elements after two respective `Request` calls"); - subscriber.ExpectNone($"Publisher {publisher} produced unrequested "); + subscriber.ExpectNone($"Publisher {publisher} produced unrequested "); + } + finally + { + subscriber.Cancel(); + } }); // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.2 @@ -430,7 +436,7 @@ public void Required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSigna // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.7 [Test] - public void Untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled() + public void Untested_spec107_mustNotEmitFurtherSignalsOnceOnErrorHasBeenSignalled() => NotVerified(); // can we meaningfully test this, without more control over the publisher? // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.8 @@ -468,25 +474,41 @@ public void Required_spec109_mustIssueOnSubscribeForNonNullSubscriber() => ActivePublisherTest(0, false, publisher => { var onSubscriberLatch = new Latch(_environment); - publisher.Subscribe(new Spec109Subscriber(onSubscriberLatch)); - onSubscriberLatch.ExpectClose("Should have received OnSubscribe"); - _environment.VerifyNoAsyncErrorsNoDelay(); + var subscriber = new Spec109Subscriber(onSubscriberLatch); + try + { + publisher.Subscribe(subscriber); + onSubscriberLatch.ExpectClose("Should have received OnSubscribe"); + _environment.VerifyNoAsyncErrorsNoDelay(); + } + finally + { + subscriber.Cancel(); + } }); private class Spec109Subscriber : ISubscriber { private readonly Latch _onSubscriberLatch; + ISubscription upstream; + public Spec109Subscriber(Latch onSubscriberLatch) { _onSubscriberLatch = onSubscriberLatch; } + public void Cancel() + { + Interlocked.Exchange(ref upstream, null)?.Cancel(); + } + public void OnNext(T element) => _onSubscriberLatch.AssertClosed("OnSubscribe should be called prior to OnNext always"); public void OnSubscribe(ISubscription subscription) { + Interlocked.Exchange(ref upstream, subscription); _onSubscriberLatch.AssertOpen("Only one OnSubscribe call expected"); _onSubscriberLatch.Close(); } @@ -500,7 +522,7 @@ public void OnComplete() // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.9 [Test] - public void Required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe () + public void Required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe() => WhenHasErrorPublisherTest(publisher => { var onErrorLatch = new Latch(_environment); @@ -542,7 +564,7 @@ public override void OnSubscribe(ISubscription subscription) // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.10 [Test] - public void Untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() + public void Untested_spec110_rejectASubscriptionRequestIfTheSameSubscriberSubscribesTwice() => NotVerified(); // can we meaningfully test this? // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.11 @@ -552,13 +574,26 @@ public void Optional_spec111_maySupportMultiSubscribe() { var sub1 = _environment.NewManualSubscriber(publisher); var sub2 = _environment.NewManualSubscriber(publisher); - - _environment.VerifyNoAsyncErrors(); + try + { + _environment.VerifyNoAsyncErrors(); + } + finally + { + try + { + sub1.Cancel(); + } + finally + { + sub2.Cancel(); + } + } }); // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.11 [Test] - public void Optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne () + public void Optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() => OptionalActivePublisherTest(5, true, publisher => { var sub1 = _environment.NewManualSubscriber(publisher); @@ -595,7 +630,7 @@ public void Optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequen var x4 = sub1.NextElement($"Publisher {publisher} did not produce the requested 1 element on 1st subscriber"); sub1.RequestEndOfStream($"Publisher {publisher} did not complete the stream as expected on 1st subscriber"); - var r = new List {x1, x2}; + var r = new List { x1, x2 }; r.AddRange(x3); r.Add(x4); @@ -614,7 +649,7 @@ public void Optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequen // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.11 [Test] - public void Optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront () + public void Optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() => OptionalActivePublisherTest(3, false, publisher => { var sub1 = _environment.NewManualSubscriber(publisher); @@ -633,7 +668,7 @@ public void Optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequen // NOTE: can't check completion, the Publisher may not be able to signal it // a similar test *with* completion checking is implemented - + Assert.AreEqual(received1, received2, "Expected elements to be signaled in the same sequence to 1st and 2nd subscribers"); Assert.AreEqual(received2, received3, "Expected elements to be signaled in the same sequence to 2st and 3nd subscribers"); @@ -667,7 +702,7 @@ public void Optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequen }); ///////////////////// SUBSCRIPTION TESTS ////////////////////////////////// - + // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.2 [Test] public void Required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe() @@ -800,7 +835,7 @@ public override void OnError(Exception cause) [Test] public void Untested_spec304_requestShouldNotPerformHeavyComputations() => NotVerified(); // cannot be meaningfully tested, or can it? - + // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.5 [Test] public void Untested_spec305_cancelMustNotSynchronouslyPerformHeavyCompuatation() @@ -834,13 +869,13 @@ public Spec306Subscriber(TestEnvironment environment) : base(environment) // In this test however it must keep the cancelled Subscription and keep issuing `request(long)` to it. public override void Cancel() { - if(Subscription.IsCompleted()) + if (Subscription.IsCompleted()) Subscription.Value.Cancel(); else Environment.Flop("Cannot cancel a subscription before having received it"); } } - + // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.7 [Test] public void Required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops() @@ -858,7 +893,7 @@ public void Required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsM subscriber.ExpectNone(); _environment.VerifyNoAsyncErrorsNoDelay(); }); - + // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.9 [Test] public void Required_spec309_requestZeroMustSignalIllegalArgumentException() @@ -868,7 +903,7 @@ public void Required_spec309_requestZeroMustSignalIllegalArgumentException() subscriber.Request(0); subscriber.ExpectErrorWithMessage("3.9"); // we do require implementations to mention the rule number at the very least }); - + // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.9 [Test] public void Required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() @@ -964,13 +999,13 @@ public void Required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferenc GC.Collect(); ManualSubscriber tmp; - if(reference.TryGetTarget(out tmp)) + if (reference.TryGetTarget(out tmp)) _environment.Flop($"Publisher {publisher} did not drop reference to test subscriber after subscription cancellation"); _environment.VerifyNoAsyncErrorsNoDelay(); }); } - + // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.17 [Test] public void Required_spec317_mustSupportAPendingElementCountUpToLongMaxValue() @@ -1068,7 +1103,7 @@ public override void OnNext(T element) } ///////////////////// ADDITIONAL "COROLLARY" TESTS //////////////////////// - + ///////////////////// TEST INFRASTRUCTURE ///////////////////////////////// /// @@ -1083,7 +1118,7 @@ public void ActivePublisherTest(long elements, bool completionSignalRequired, Ac { if (elements > MaxElementsFromPublisher) Assert.Ignore($"Uable to run this test as required elements nr : {elements} is higher than supported by given producer {MaxElementsFromPublisher}"); - if(completionSignalRequired && MaxElementsFromPublisher == long.MaxValue) + if (completionSignalRequired && MaxElementsFromPublisher == long.MaxValue) Assert.Ignore("Unable to run this test, as it requires an onComplete signal, which this Publisher is unable to provide (as signalled by returning long.MaxValue from `MaxElementsFromPublisher"); var publisher = CreatePublisher(elements); @@ -1101,14 +1136,14 @@ public void ActivePublisherTest(long elements, bool completionSignalRequired, Ac /// The actual test to run public void OptionalActivePublisherTest(long elements, bool completionSignalRequired, Action> run) { - if(elements > MaxElementsFromPublisher) + if (elements > MaxElementsFromPublisher) Assert.Ignore($"Uable to run this test as required elements nr : {elements} is higher than supported by given producer {MaxElementsFromPublisher}"); if (completionSignalRequired && MaxElementsFromPublisher == long.MaxValue) Assert.Ignore("Unable to run this test, as it requires an onComplete signal, which this Publisher is unable to provide (as signalled by returning long.MaxValue from `MaxElementsFromPublisher"); var publisher = CreatePublisher(elements); var skipMessage = "Skipped because tested publisher does NOT implement this OPTIONAL requirement."; - + try { PotentiallyPendingTest(publisher, run); @@ -1153,7 +1188,7 @@ public void PotentiallyPendingTest(IPublisher publisher, Action /// public void StochasticTest(int n, Action body) { - if(SkipStochasticTests) + if (SkipStochasticTests) Assert.Ignore("Skipping @Stochastic test because `SkipStochasticTests` returned `true`!"); for (var i = 0; i < n; i++)