Skip to content

Commit 0bf3c15

Browse files
committed
ReindexAll() improvements (#2462)
* ReindexAll() improvements ReindexAll() is now composed over the IObservables from `ScrollAll()` and `BulkAll()` taking advantage of both's build in concurrency. Since the rate at which the scroll's will come in far exceeds the rate at which we can bulk the `ReindexAll()` also implements a producer consumer rate limitter. This is controlled by a `backPressureFactor` which controls the max amplification factor of running scrolls with the safe guard of: searchSize * maxConcurrency * backPressureFactor >= bulkSize Otherwise not enough scrolls would be spawned to feed a single bulk request. Obviously if the scroll observable calls on complete it will flush out any pending scrolls and feed it to `BulkAll()`. * implemented pr review feedback, namespace changes, document the default for backPressureFactor and small documentation touch ups
1 parent 987f186 commit 0bf3c15

30 files changed

+729
-411
lines changed

src/Nest/CommonAbstractions/Extensions/Extensions.cs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -211,26 +211,29 @@ internal static Task ForEachAsync<TSource, TResult>(
211211
Func<TSource, long, Task<TResult>> taskSelector,
212212
Action<TSource, TResult> resultProcessor,
213213
Action<Task> done,
214-
int maxDegreeOfParallelism
214+
int maxDegreeOfParallelism,
215+
SemaphoreSlim additionalRateLimitter = null
215216
)
216217
{
217218
var semaphore = new SemaphoreSlim(initialCount: maxDegreeOfParallelism, maxCount: maxDegreeOfParallelism);
218219
long page = 0;
219220

220221
return Task.WhenAll(
221222
from item in lazyList
222-
select ProcessAsync<TSource, TResult>(item, taskSelector, resultProcessor, semaphore, page++)
223+
select ProcessAsync<TSource, TResult>(item, taskSelector, resultProcessor, semaphore, additionalRateLimitter, page++)
223224
).ContinueWith(done);
224225
}
225226

226227
private static async Task ProcessAsync<TSource, TResult>(
227228
TSource item,
228229
Func<TSource, long, Task<TResult>> taskSelector,
229230
Action<TSource, TResult> resultProcessor,
230-
SemaphoreSlim semaphoreSlim,
231+
SemaphoreSlim localRateLimiter,
232+
SemaphoreSlim additionalRateLimiter,
231233
long page)
232234
{
233-
if (semaphoreSlim != null) await semaphoreSlim.WaitAsync().ConfigureAwait(false);
235+
if (localRateLimiter != null) await localRateLimiter.WaitAsync().ConfigureAwait(false);
236+
if (additionalRateLimiter != null) await additionalRateLimiter.WaitAsync().ConfigureAwait(false);
234237
try
235238
{
236239
var result = await taskSelector(item, page).ConfigureAwait(false);
@@ -242,7 +245,8 @@ private static async Task ProcessAsync<TSource, TResult>(
242245
}
243246
finally
244247
{
245-
semaphoreSlim?.Release();
248+
localRateLimiter?.Release();
249+
additionalRateLimiter?.Release();
246250
}
247251
}
248252
}

src/Nest/CommonAbstractions/Infer/IndexName/IndexNameExtensions.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
{
33
public static class IndexNameExtensions
44
{
5-
65
public static string Resolve(this IndexName marker, IConnectionSettingsValues connectionSettings)
76
{
87
if (marker == null)
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
namespace Nest
2+
{
3+
public static class IndicesExtensions
4+
{
5+
public static string Resolve(this Indices marker, IConnectionSettingsValues connectionSettings)
6+
{
7+
if (marker == null) return null;
8+
connectionSettings.ThrowIfNull(nameof(connectionSettings));
9+
return connectionSettings.Inferrer.Resolve(marker);
10+
}
11+
}
12+
}

src/Nest/CommonAbstractions/Infer/Inferrer.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@
33
using System.Collections.Generic;
44
using System.Globalization;
55
using System.Linq;
6+
using Elasticsearch.Net;
67
using Newtonsoft.Json;
78
using Newtonsoft.Json.Serialization;
89

910
namespace Nest
1011
{
1112
public class Inferrer
1213
{
14+
private readonly IConnectionSettingsValues _connectionSettings;
1315
private IdResolver IdResolver { get; }
1416
private IndexNameResolver IndexNameResolver { get; }
1517
private TypeNameResolver TypeNameResolver { get; }
@@ -23,6 +25,7 @@ public class Inferrer
2325
public Inferrer(IConnectionSettingsValues connectionSettings)
2426
{
2527
connectionSettings.ThrowIfNull(nameof(connectionSettings));
28+
this._connectionSettings = connectionSettings;
2629
this.IdResolver = new IdResolver(connectionSettings);
2730
this.IndexNameResolver = new IndexNameResolver(connectionSettings);
2831
this.TypeNameResolver = new TypeNameResolver(connectionSettings);
@@ -31,6 +34,7 @@ public Inferrer(IConnectionSettingsValues connectionSettings)
3134
this.CreateMultiHitDelegates = new ConcurrentDictionary<Type, Action<MultiGetHitJsonConverter.MultiHitTuple, JsonSerializer, ICollection<IMultiGetHit<object>>>>();
3235
this.CreateSearchResponseDelegates = new ConcurrentDictionary<Type, Action<MultiSearchResponseJsonConverter.SearchHitTuple, JsonSerializer, IDictionary<string, object>>>();
3336
}
37+
public string Resolve(IUrlParameter urlParameter) => urlParameter.GetString(this._connectionSettings);
3438

3539
public string Field(Field field) => this.FieldResolver.Resolve(field);
3640

src/Nest/CommonAbstractions/CoordinatedRequestObserverBase.cs renamed to src/Nest/CommonAbstractions/Reactive/CoordinatedRequestObserverBase.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,15 @@
11
using System;
22

3-
43
namespace Nest
54
{
5+
internal static class CoordinatedRequestDefaults
6+
{
7+
public static int BulkAllMaxDegreeOfParallelismDefault = 20;
8+
public static TimeSpan BulkAllBackOffTimeDefault = TimeSpan.FromMinutes(1);
9+
public static int BulkAllBackOffRetriesDefault = 0;
10+
public static int BulkAllSizeDefault = 1000;
11+
}
12+
613
public abstract class CoordinatedRequestObserverBase<T> : IObserver<T>
714
{
815
private readonly Action<T> _onNext;
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
using System;
2+
using System.Collections;
3+
using System.Collections.Concurrent;
4+
using System.Collections.Generic;
5+
using System.Threading;
6+
7+
namespace Nest
8+
{
9+
internal class GetEnumerator<TSource> : IEnumerator<TSource>, IObserver<TSource>
10+
{
11+
private readonly ConcurrentQueue<TSource> _queue;
12+
private TSource _current;
13+
private Exception _error;
14+
private bool _done;
15+
private bool _disposed;
16+
17+
private readonly SemaphoreSlim _gate;
18+
private IDisposable _subscription;
19+
20+
public GetEnumerator()
21+
{
22+
_queue = new ConcurrentQueue<TSource>();
23+
_gate = new SemaphoreSlim(0);
24+
}
25+
26+
private IEnumerator<TSource> Run(IObservable<TSource> source)
27+
{
28+
//
29+
// [OK] Use of unsafe Subscribe: non-pretentious exact mirror with the dual GetEnumerator method.
30+
//
31+
_subscription = source.Subscribe/*Unsafe*/(this);
32+
return this;
33+
}
34+
public IEnumerable<TSource> ToEnumerable(IObservable<TSource> source) =>
35+
new AnonymousEnumerable<TSource>(() => this.Run(source));
36+
37+
public virtual void OnNext(TSource value)
38+
{
39+
_queue.Enqueue(value);
40+
_gate.Release();
41+
}
42+
43+
public void OnError(Exception error)
44+
{
45+
_error = error;
46+
_subscription.Dispose();
47+
_gate.Release();
48+
}
49+
50+
public void OnCompleted()
51+
{
52+
_done = true;
53+
_subscription.Dispose();
54+
_gate.Release();
55+
}
56+
57+
public bool MoveNext()
58+
{
59+
_gate.Wait();
60+
61+
if (_disposed)
62+
throw new ObjectDisposedException("");
63+
64+
if (_queue.TryDequeue(out _current))
65+
return true;
66+
67+
if (_error != null) throw _error;
68+
69+
_gate.Release(); // In the (rare) case the user calls MoveNext again we shouldn't block!
70+
return false;
71+
}
72+
73+
public TSource Current => _current;
74+
75+
object IEnumerator.Current => _current;
76+
77+
public void Dispose()
78+
{
79+
_subscription.Dispose();
80+
81+
_disposed = true;
82+
_gate.Release();
83+
}
84+
85+
public void Reset()
86+
{
87+
throw new NotSupportedException();
88+
}
89+
90+
internal sealed class AnonymousEnumerable<T> : IEnumerable<T>
91+
{
92+
private readonly Func<IEnumerator<T>> _getEnumerator;
93+
94+
public AnonymousEnumerable(Func<IEnumerator<T>> getEnumerator)
95+
{
96+
this._getEnumerator = getEnumerator;
97+
}
98+
99+
public IEnumerator<T> GetEnumerator()
100+
{
101+
return _getEnumerator();
102+
}
103+
104+
System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
105+
{
106+
return this.GetEnumerator();
107+
}
108+
}
109+
}
110+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
using System.Collections;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
5+
namespace Nest
6+
{
7+
internal class PartitionHelper<TDocument> : IEnumerable<IList<TDocument>>
8+
{
9+
private readonly IEnumerable<TDocument> _items;
10+
private readonly int _partitionSize;
11+
private bool _hasMoreItems;
12+
13+
internal PartitionHelper(IEnumerable<TDocument> i, int ps)
14+
{
15+
_items = i;
16+
_partitionSize = ps;
17+
}
18+
19+
IEnumerator IEnumerable.GetEnumerator() => this.GetEnumerator();
20+
public IEnumerator<IList<TDocument>> GetEnumerator()
21+
{
22+
using (var enumerator = _items.GetEnumerator())
23+
{
24+
_hasMoreItems = enumerator.MoveNext();
25+
while (_hasMoreItems)
26+
yield return GetNextBatch(enumerator).ToList();
27+
}
28+
}
29+
30+
private IEnumerable<TDocument> GetNextBatch(IEnumerator<TDocument> enumerator)
31+
{
32+
for (var i = 0; i < _partitionSize; ++i)
33+
{
34+
yield return enumerator.Current;
35+
_hasMoreItems = enumerator.MoveNext();
36+
if (!_hasMoreItems) yield break;
37+
}
38+
}
39+
}
40+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
5+
namespace Nest
6+
{
7+
/// <summary>
8+
/// Simple back pressure implementation that makes sure the minimum max concurrency between producer and consumer
9+
/// is not amplified by the greedier of the two by more then the backPressureFactor which defaults to 4
10+
/// </summary>
11+
public class ProducerConsumerBackPressure
12+
{
13+
private readonly SemaphoreSlim _consumerLimiter;
14+
private readonly int _backPressureFactor;
15+
private readonly int _slots;
16+
17+
/// <summary>
18+
/// Simple back pressure implementation that makes sure the minimum max concurrency between producer and consumer
19+
/// is not amplified by the greedier of the two by more then the backPressureFactor
20+
/// </summary>
21+
/// <param name="backPressureFactor">The maximum amplification back pressure of the greedier part of the producer consumer pipeline, if null defaults to 4</param>
22+
/// <param name="maxConcurrency">The minimum maximum concurrency which would be the bottleneck of the producer consumer pipeline</param>
23+
internal ProducerConsumerBackPressure(int? backPressureFactor, int maxConcurrency)
24+
{
25+
this._backPressureFactor = backPressureFactor.GetValueOrDefault(4);
26+
this._slots = maxConcurrency * this._backPressureFactor;
27+
this._consumerLimiter = new SemaphoreSlim(_slots, _slots);
28+
}
29+
30+
public Task WaitAsync(CancellationToken token = default(CancellationToken)) =>
31+
this._consumerLimiter.WaitAsync(token);
32+
33+
public void Release()
34+
{
35+
var minimumRelease = _slots - this._consumerLimiter.CurrentCount;
36+
var release = Math.Min(minimumRelease, this._backPressureFactor);
37+
if (release > 0)
38+
this._consumerLimiter.Release(release);
39+
}
40+
}
41+
}

0 commit comments

Comments
 (0)