Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion test/FunctionalTests/Client/StreamingTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1176,7 +1176,9 @@ await responseStream.WriteAsync(

Logger.LogInformation("Client reading canceled message from server.");
var clientEx = await ExceptionAssert.ThrowsAsync<RpcException>(() => call.ResponseStream.MoveNext()).DefaultTimeout();
Assert.AreEqual(StatusCode.Cancelled, clientEx.StatusCode);

// Race on the server can change which error is returned.
Assert.IsTrue(clientEx.StatusCode == StatusCode.Cancelled || clientEx.StatusCode == StatusCode.Internal);
}

[Test]
Expand Down
12 changes: 11 additions & 1 deletion test/Grpc.Net.Client.Tests/Balancer/ConnectionManagerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public async Task PickAsync_ChannelStateChangesWithWaitForReady_WaitsForCorrectE
services.AddNUnitLogger();
var serviceProvider = services.BuildServiceProvider();
var loggerFactory = serviceProvider.GetRequiredService<ILoggerFactory>();
var logger = loggerFactory.CreateLogger(GetType());

var resolver = new TestResolver(loggerFactory);
resolver.UpdateAddresses(new List<BalancerAddress>
Expand All @@ -104,23 +105,32 @@ public async Task PickAsync_ChannelStateChangesWithWaitForReady_WaitsForCorrectE
// Assert
Assert.AreEqual(new DnsEndPoint("localhost", 80), result1.Address!.EndPoint);

logger.LogInformation("Updating resolve to have 80 and 81 addresses.");
resolver.UpdateAddresses(new List<BalancerAddress>
{
new BalancerAddress("localhost", 80),
new BalancerAddress("localhost", 81)
});

logger.LogInformation("Wait for both subchannels to be ready.");
await BalancerWaitHelpers.WaitForSubchannelsToBeReadyAsync(logger, clientChannel, expectedCount: 2);

// This needs to happen after both subchannels are ready so the Transports collection has two items in it.
logger.LogInformation("Make subchannels not ready.");
for (var i = 0; i < transportFactory.Transports.Count; i++)
{
transportFactory.Transports[i].UpdateState(ConnectivityState.TransientFailure);
}

logger.LogInformation("Wait for both subchannels to not be ready.");
await BalancerWaitHelpers.WaitForSubchannelsToBeReadyAsync(logger, clientChannel, expectedCount: 0);

var pickTask2 = clientChannel.PickAsync(
new PickContext { Request = new HttpRequestMessage() },
waitForReady: true,
CancellationToken.None).AsTask().DefaultTimeout();

Assert.IsFalse(pickTask2.IsCompleted);
Assert.IsFalse(pickTask2.IsCompleted, "PickAsync should wait until an subchannel is ready.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get it. You wait for subchannels to be ready, assert that PickAsync doesn't complete, and then say PickAsync shouldn't complete until subchannels are ready. (Also, the grammar is wrong in the sentence, I assume you mean "any subchannels are ready" or "a subchannel is ready")

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah wait, now I understand, the expectedCount value is 0, maybe use named params.


resolver.UpdateAddresses(new List<BalancerAddress>
{
Expand Down
12 changes: 9 additions & 3 deletions test/Shared/BalancerWaitHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

namespace Grpc.Tests.Shared;

public static class BalancerWaitHelpers
internal static class BalancerWaitHelpers
{
public static Task WaitForChannelStateAsync(ILogger logger, GrpcChannel channel, ConnectivityState state, int channelId = 1)
{
Expand Down Expand Up @@ -57,7 +57,12 @@ public static async Task<Subchannel> WaitForSubchannelToBeReadyAsync(ILogger log
return subChannel;
}

public static async Task<Subchannel[]> WaitForSubchannelsToBeReadyAsync(ILogger logger, GrpcChannel channel, int expectedCount, Func<SubchannelPicker?, Subchannel[]>? getPickerSubchannels = null, Func<Subchannel, bool>? validateSubchannel = null)
public static Task<Subchannel[]> WaitForSubchannelsToBeReadyAsync(ILogger logger, GrpcChannel channel, int expectedCount, Func<SubchannelPicker?, Subchannel[]>? getPickerSubchannels = null, Func<Subchannel, bool>? validateSubchannel = null)
{
return WaitForSubchannelsToBeReadyAsync(logger, channel.ConnectionManager, expectedCount, getPickerSubchannels, validateSubchannel);
}

public static async Task<Subchannel[]> WaitForSubchannelsToBeReadyAsync(ILogger logger, ConnectionManager connectionManager, int expectedCount, Func<SubchannelPicker?, Subchannel[]>? getPickerSubchannels = null, Func<Subchannel, bool>? validateSubchannel = null)
{
if (getPickerSubchannels == null)
{
Expand All @@ -68,6 +73,7 @@ public static async Task<Subchannel[]> WaitForSubchannelsToBeReadyAsync(ILogger
RoundRobinPicker roundRobinPicker => roundRobinPicker._subchannels.ToArray(),
PickFirstPicker pickFirstPicker => new[] { pickFirstPicker.Subchannel },
EmptyPicker emptyPicker => Array.Empty<Subchannel>(),
ErrorPicker errorPicker => Array.Empty<Subchannel>(),
null => Array.Empty<Subchannel>(),
_ => throw new Exception("Unexpected picker type: " + picker.GetType().FullName)
};
Expand All @@ -79,7 +85,7 @@ public static async Task<Subchannel[]> WaitForSubchannelsToBeReadyAsync(ILogger
Subchannel[]? subChannelsCopy = null;
await TestHelpers.AssertIsTrueRetryAsync(() =>
{
var picker = channel.ConnectionManager._picker;
var picker = connectionManager._picker;
subChannelsCopy = getPickerSubchannels(picker);
logger.LogInformation($"Current subchannel ready count: {subChannelsCopy.Length}");
for (var i = 0; i < subChannelsCopy.Length; i++)
Expand Down