Skip to content

Commit ec8921e

Browse files
Mpdreamzawelburn
authored andcommitted
ScrollAll() (elastic#2443)
* ScrollAll() Exposes a sliced scroll behind an observable with an optional max degree of parallelism * implemented review feedback from @russcam, unneccessary cast + only set scroll to _doc order if the user did not supply a sort of their own
1 parent e9a66ed commit ec8921e

File tree

16 files changed

+777
-402
lines changed

16 files changed

+777
-402
lines changed
Lines changed: 118 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -1,126 +1,124 @@
1-
using System;
2-
using System.Collections.Concurrent;
3-
using System.Collections.Generic;
4-
using System.Globalization;
5-
using System.Linq;
6-
using System.Reflection;
7-
using System.Runtime.Serialization;
8-
using System.Text;
9-
10-
namespace Elasticsearch.Net
11-
{
12-
internal static class Extensions
13-
{
14-
#if !DOTNETCORE
15-
internal static string Utf8String(this byte[] bytes) => bytes == null ? null : Encoding.UTF8.GetString(bytes);
16-
#else
17-
internal static string Utf8String(this byte[] bytes) => bytes == null ? null : Encoding.UTF8.GetString(bytes, 0, bytes.Length);
18-
#endif
19-
20-
internal static byte[] Utf8Bytes(this string s)
21-
{
22-
return s.IsNullOrEmpty() ? null : Encoding.UTF8.GetBytes(s);
23-
}
24-
25-
internal static string ToCamelCase(this string s)
26-
{
27-
if (string.IsNullOrEmpty(s))
28-
return s;
29-
30-
if (!char.IsUpper(s[0]))
31-
return s;
32-
33-
string camelCase = char.ToLowerInvariant(s[0]).ToString();
34-
if (s.Length > 1)
35-
camelCase += s.Substring(1);
36-
37-
return camelCase;
38-
}
39-
40-
internal static string NotNull(this string @object, string parameterName)
41-
{
42-
@object.ThrowIfNull(parameterName);
43-
if (string.IsNullOrWhiteSpace(@object))
44-
throw new ArgumentException("String argument is empty", parameterName);
45-
return @object;
46-
}
47-
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Globalization;
4+
using System.Linq;
5+
using System.Text;
6+
7+
namespace Elasticsearch.Net
8+
{
9+
internal static class Extensions
10+
{
11+
#if !DOTNETCORE
12+
internal static string Utf8String(this byte[] bytes) => bytes == null ? null : Encoding.UTF8.GetString(bytes);
13+
#else
14+
internal static string Utf8String(this byte[] bytes) => bytes == null ? null : Encoding.UTF8.GetString(bytes, 0, bytes.Length);
15+
#endif
16+
17+
internal static byte[] Utf8Bytes(this string s)
18+
{
19+
return s.IsNullOrEmpty() ? null : Encoding.UTF8.GetBytes(s);
20+
}
21+
22+
internal static string ToCamelCase(this string s)
23+
{
24+
if (string.IsNullOrEmpty(s))
25+
return s;
26+
27+
if (!char.IsUpper(s[0]))
28+
return s;
29+
30+
string camelCase = char.ToLowerInvariant(s[0]).ToString();
31+
if (s.Length > 1)
32+
camelCase += s.Substring(1);
33+
34+
return camelCase;
35+
}
36+
37+
internal static string NotNull(this string @object, string parameterName)
38+
{
39+
@object.ThrowIfNull(parameterName);
40+
if (string.IsNullOrWhiteSpace(@object))
41+
throw new ArgumentException("String argument is empty", parameterName);
42+
return @object;
43+
}
44+
4845
internal static string NotNull(this Enum @object, string parameterName)
4946
{
5047
@object.ThrowIfNull(parameterName);
5148
return @object.GetStringValue();
5249
}
5350

54-
internal static void ThrowIfEmpty<T>(this IEnumerable<T> @object, string parameterName)
55-
{
56-
@object.ThrowIfNull(parameterName);
57-
if (!@object.Any())
58-
throw new ArgumentException("Argument can not be an empty collection", parameterName);
59-
}
60-
internal static bool HasAny<T>(this IEnumerable<T> list)
61-
{
62-
return list != null && list.Any();
63-
}
64-
65-
internal static void ThrowIfNull<T>(this T value, string name)
66-
{
67-
if (value == null)
68-
throw new ArgumentNullException(name);
69-
}
70-
internal static bool IsNullOrEmpty(this string value)
71-
{
72-
return string.IsNullOrEmpty(value);
73-
}
74-
75-
internal static IEnumerable<T> DistinctBy<T, TKey>(this IEnumerable<T> items, Func<T, TKey> property)
76-
{
77-
return items.GroupBy(property).Select(x => x.First());
78-
}
79-
80-
private static readonly long _week = (long)TimeSpan.FromDays(7).TotalMilliseconds;
81-
private static readonly long _day = (long)TimeSpan.FromDays(1).TotalMilliseconds;
82-
private static readonly long _hour = (long)TimeSpan.FromHours(1).TotalMilliseconds;
83-
private static readonly long _minute = (long)TimeSpan.FromMinutes(1).TotalMilliseconds;
84-
private static readonly long _second = (long)TimeSpan.FromSeconds(1).TotalMilliseconds;
85-
86-
internal static string ToTimeUnit(this TimeSpan timeSpan)
87-
{
88-
var ms = timeSpan.TotalMilliseconds;
89-
string interval;
90-
double factor = 0;
91-
92-
if (ms >= _week)
93-
{
94-
factor = ms / _week;
95-
interval = "w";
96-
}
97-
else if (ms >= _day)
98-
{
99-
factor = ms / _day;
100-
interval = "d";
101-
}
102-
else if (ms >= _hour)
103-
{
104-
factor = ms / _hour;
105-
interval = "h";
106-
}
107-
else if (ms >= _minute)
108-
{
109-
factor = ms / _minute;
110-
interval = "m";
111-
}
112-
else if (ms >= _second)
113-
{
114-
factor = ms / _second;
115-
interval = "s";
116-
}
117-
else
118-
{
119-
factor = ms;
120-
interval = "ms";
121-
}
122-
123-
return factor.ToString("0.##", CultureInfo.InvariantCulture) + interval;
124-
}
125-
}
126-
}
51+
internal static void ThrowIfEmpty<T>(this IEnumerable<T> @object, string parameterName)
52+
{
53+
@object.ThrowIfNull(parameterName);
54+
if (!@object.Any())
55+
throw new ArgumentException("Argument can not be an empty collection", parameterName);
56+
}
57+
internal static bool HasAny<T>(this IEnumerable<T> list)
58+
{
59+
return list != null && list.Any();
60+
}
61+
62+
internal static void ThrowIfNull<T>(this T value, string name)
63+
{
64+
if (value == null)
65+
throw new ArgumentNullException(name);
66+
}
67+
internal static bool IsNullOrEmpty(this string value)
68+
{
69+
return string.IsNullOrEmpty(value);
70+
}
71+
72+
internal static IEnumerable<T> DistinctBy<T, TKey>(this IEnumerable<T> items, Func<T, TKey> property)
73+
{
74+
return items.GroupBy(property).Select(x => x.First());
75+
}
76+
77+
private static readonly long _week = (long)TimeSpan.FromDays(7).TotalMilliseconds;
78+
private static readonly long _day = (long)TimeSpan.FromDays(1).TotalMilliseconds;
79+
private static readonly long _hour = (long)TimeSpan.FromHours(1).TotalMilliseconds;
80+
private static readonly long _minute = (long)TimeSpan.FromMinutes(1).TotalMilliseconds;
81+
private static readonly long _second = (long)TimeSpan.FromSeconds(1).TotalMilliseconds;
82+
83+
internal static string ToTimeUnit(this TimeSpan timeSpan)
84+
{
85+
var ms = timeSpan.TotalMilliseconds;
86+
string interval;
87+
double factor = 0;
88+
89+
if (ms >= _week)
90+
{
91+
factor = ms / _week;
92+
interval = "w";
93+
}
94+
else if (ms >= _day)
95+
{
96+
factor = ms / _day;
97+
interval = "d";
98+
}
99+
else if (ms >= _hour)
100+
{
101+
factor = ms / _hour;
102+
interval = "h";
103+
}
104+
else if (ms >= _minute)
105+
{
106+
factor = ms / _minute;
107+
interval = "m";
108+
}
109+
else if (ms >= _second)
110+
{
111+
factor = ms / _second;
112+
interval = "s";
113+
}
114+
else
115+
{
116+
factor = ms;
117+
interval = "ms";
118+
}
119+
120+
return factor.ToString("0.##", CultureInfo.InvariantCulture) + interval;
121+
}
122+
123+
}
124+
}

src/Nest/CommonAbstractions/Extensions/Extensions.cs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
using Newtonsoft.Json;
1212
using Newtonsoft.Json.Converters;
1313
using Newtonsoft.Json.Linq;
14+
using System.Threading.Tasks;
15+
using System.Threading;
1416

1517
namespace Nest
1618
{
@@ -203,5 +205,45 @@ internal static Dictionary<TKey, TValue> NullIfNoKeys<TKey, TValue>(this Diction
203205
}
204206

205207
internal static IEnumerable<T> EmptyIfNull<T>(this IEnumerable<T> xs) => xs ?? new T[0];
208+
209+
internal static Task ForEachAsync<TSource, TResult>(
210+
this IEnumerable<TSource> lazyList,
211+
Func<TSource, long, Task<TResult>> taskSelector,
212+
Action<TSource, TResult> resultProcessor,
213+
Action<Task> done,
214+
int maxDegreeOfParallelism
215+
)
216+
{
217+
var semaphore = new SemaphoreSlim(initialCount: maxDegreeOfParallelism, maxCount: maxDegreeOfParallelism);
218+
long page = 0;
219+
220+
return Task.WhenAll(
221+
from item in lazyList
222+
select ProcessAsync<TSource, TResult>(item, taskSelector, resultProcessor, semaphore, page++)
223+
).ContinueWith(done);
224+
}
225+
226+
private static async Task ProcessAsync<TSource, TResult>(
227+
TSource item,
228+
Func<TSource, long, Task<TResult>> taskSelector,
229+
Action<TSource, TResult> resultProcessor,
230+
SemaphoreSlim semaphoreSlim,
231+
long page)
232+
{
233+
if (semaphoreSlim != null) await semaphoreSlim.WaitAsync().ConfigureAwait(false);
234+
try
235+
{
236+
var result = await taskSelector(item, page).ConfigureAwait(false);
237+
resultProcessor(item, result);
238+
}
239+
catch
240+
{
241+
throw;
242+
}
243+
finally
244+
{
245+
semaphoreSlim?.Release();
246+
}
247+
}
206248
}
207249
}

0 commit comments

Comments
 (0)