Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public enum QueueProcessingOrder
}
public abstract partial class RateLimiter : System.IAsyncDisposable, System.IDisposable
{
public static System.Threading.RateLimiting.RateLimiter CreateChained(params System.Threading.RateLimiting.RateLimiter[] limiters) { throw null; }
protected RateLimiter() { }
public abstract System.TimeSpan? IdleDuration { get; }
public System.Threading.Tasks.ValueTask<System.Threading.RateLimiting.RateLimitLease> AcquireAsync(int permitCount = 1, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ System.Threading.RateLimiting.RateLimitLease</PackageDescription>
</PropertyGroup>

<ItemGroup>
<Compile Include="System\Threading\RateLimiting\ChainedRateLimiter.cs" />
<Compile Include="System\Threading\RateLimiting\ChainedPartitionedRateLimiter.cs" />
<Compile Include="System\Threading\RateLimiting\CombinedRateLimitLease.cs" />
<Compile Include="System\Threading\RateLimiting\ConcurrencyLimiter.cs" />
<Compile Include="System\Threading\RateLimiting\ConcurrencyLimiterOptions.cs" />
<Compile Include="System\Threading\RateLimiting\DefaultPartitionedRateLimiter.cs" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,18 @@ internal sealed class ChainedPartitionedRateLimiter<TResource> : PartitionedRate

public ChainedPartitionedRateLimiter(PartitionedRateLimiter<TResource>[] limiters)
{
_limiters = limiters;
_limiters = (PartitionedRateLimiter<TResource>[])limiters.Clone();
}

public override RateLimiterStatistics? GetStatistics(TResource resource)
{
ThrowIfDisposed();

long lowestAvailablePermits = long.MaxValue;
long currentQueuedCount = 0;
long totalFailedLeases = 0;
long innerMostSuccessfulLeases = 0;

foreach (PartitionedRateLimiter<TResource> limiter in _limiters)
{
if (limiter.GetStatistics(resource) is { } statistics)
Expand All @@ -38,11 +40,13 @@ public ChainedPartitionedRateLimiter(PartitionedRateLimiter<TResource>[] limiter
{
lowestAvailablePermits = statistics.CurrentAvailablePermits;
}

currentQueuedCount += statistics.CurrentQueuedCount;
totalFailedLeases += statistics.TotalFailedLeases;
innerMostSuccessfulLeases = statistics.TotalSuccessfulLeases;
}
}

return new RateLimiterStatistics()
{
CurrentAvailablePermits = lowestAvailablePermits,
Expand All @@ -55,11 +59,14 @@ public ChainedPartitionedRateLimiter(PartitionedRateLimiter<TResource>[] limiter
protected override RateLimitLease AttemptAcquireCore(TResource resource, int permitCount)
{
ThrowIfDisposed();

RateLimitLease[]? leases = null;

for (int i = 0; i < _limiters.Length; i++)
{
RateLimitLease? lease = null;
Exception? exception = null;

try
{
lease = _limiters[i].AttemptAcquire(resource, permitCount);
Expand All @@ -68,7 +75,9 @@ protected override RateLimitLease AttemptAcquireCore(TResource resource, int per
{
exception = ex;
}
RateLimitLease? notAcquiredLease = CommonAcquireLogic(exception, lease, ref leases, i, _limiters.Length);

RateLimitLease? notAcquiredLease = ChainedRateLimiter.CommonAcquireLogic(exception, lease, ref leases, i, _limiters.Length);

if (notAcquiredLease is not null)
{
return notAcquiredLease;
Expand All @@ -81,11 +90,14 @@ protected override RateLimitLease AttemptAcquireCore(TResource resource, int per
protected override async ValueTask<RateLimitLease> AcquireAsyncCore(TResource resource, int permitCount, CancellationToken cancellationToken)
{
ThrowIfDisposed();

RateLimitLease[]? leases = null;

for (int i = 0; i < _limiters.Length; i++)
{
RateLimitLease? lease = null;
Exception? exception = null;

try
{
lease = await _limiters[i].AcquireAsync(resource, permitCount, cancellationToken).ConfigureAwait(false);
Expand All @@ -94,7 +106,9 @@ protected override async ValueTask<RateLimitLease> AcquireAsyncCore(TResource re
{
exception = ex;
}
RateLimitLease? notAcquiredLease = CommonAcquireLogic(exception, lease, ref leases, i, _limiters.Length);

RateLimitLease? notAcquiredLease = ChainedRateLimiter.CommonAcquireLogic(exception, lease, ref leases, i, _limiters.Length);

if (notAcquiredLease is not null)
{
return notAcquiredLease;
Expand All @@ -116,145 +130,5 @@ private void ThrowIfDisposed()
throw new ObjectDisposedException(nameof(ChainedPartitionedRateLimiter<TResource>));
}
}

private static RateLimitLease? CommonAcquireLogic(Exception? ex, RateLimitLease? lease, ref RateLimitLease[]? leases, int index, int length)
{
if (ex is not null)
{
AggregateException? innerEx = CommonDispose(leases, index);
if (innerEx is not null)
{
Exception[] exceptions = new Exception[innerEx.InnerExceptions.Count + 1];
innerEx.InnerExceptions.CopyTo(exceptions, 0);
exceptions[exceptions.Length - 1] = ex;
throw new AggregateException(exceptions);
}
throw ex;
}

if (!lease!.IsAcquired)
{
AggregateException? innerEx = CommonDispose(leases, index);
return innerEx is not null ? throw innerEx : lease;
}

leases ??= new RateLimitLease[length];
leases[index] = lease;
return null;
}

private static AggregateException? CommonDispose(RateLimitLease[]? leases, int i)
{
List<Exception>? exceptions = null;
while (i > 0)
{
i--;
try
{
leases![i].Dispose();
}
catch (Exception ex)
{
exceptions ??= new List<Exception>();
exceptions.Add(ex);
}
}

if (exceptions is not null)
{
return new AggregateException(exceptions);
}

return null;
}

private sealed class CombinedRateLimitLease : RateLimitLease
{
private RateLimitLease[]? _leases;
private HashSet<string>? _metadataNames;

public CombinedRateLimitLease(RateLimitLease[] leases)
{
_leases = leases;
}

public override bool IsAcquired => true;

public override IEnumerable<string> MetadataNames
{
get
{
if (_leases is null)
{
return Enumerable.Empty<string>();
}

if (_metadataNames is null)
{
_metadataNames = new HashSet<string>();
foreach (RateLimitLease lease in _leases)
{
foreach (string metadataName in lease.MetadataNames)
{
_metadataNames.Add(metadataName);
}
}
}
return _metadataNames;
}
}

public override bool TryGetMetadata(string metadataName, out object? metadata)
{
if (_leases is not null)
{
foreach (RateLimitLease lease in _leases)
{
// Use the first metadata item of a given name, ignore duplicates, we can't reliably merge arbitrary metadata
// Creating an object[] if there are multiple of the same metadataName could work, but makes consumption of metadata messy
// and makes MetadataName.Create<T>(...) uses no longer work
if (lease.TryGetMetadata(metadataName, out metadata))
{
return true;
}
}
}

metadata = null;
return false;
}

protected override void Dispose(bool disposing)
{
if (_leases is null)
{
return;
}

List<Exception>? exceptions = null;
// Dispose in reverse order
// Avoids issues where dispose might unblock a queued acquire and then the acquire fails when acquiring the next limiter.
// When disposing in reverse order there wont be any issues of unblocking an acquire that affects acquires on limiters in the chain after it
for (int i = _leases.Length - 1; i >= 0; i--)
{
try
{
_leases[i].Dispose();
}
catch (Exception ex)
{
exceptions ??= new List<Exception>();
exceptions.Add(ex);
}
}

_leases = null;

if (exceptions is not null)
{
throw new AggregateException(exceptions);
}
}
}
}
}
Loading
Loading