Skip to content

Commit 6546ed6

Browse files
committed
Limit the number of TCP connections on .NET core
Remove the async action in the Parallel.For Add ConnectionLimit to ConnectionConfiguration Add comments to all options with clear descriptions.
1 parent 54d50e6 commit 6546ed6

File tree

6 files changed

+87
-30
lines changed

6 files changed

+87
-30
lines changed

build/scripts/Testing.fsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,4 @@ module Tests =
4747
setProcessEnvironVar "NEST_INTEGRATION_CLUSTER" clusterFilter
4848
setProcessEnvironVar "NEST_INTEGRATION_VERSION" esVersion
4949
setProcessEnvironVar "NEST_TEST_FILTER" testFilter
50-
testDesktopClr "all"
50+
testDesktopClr "all"

src/Elasticsearch.Net/Configuration/ConnectionConfiguration.cs

Lines changed: 62 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public class ConnectionConfiguration : ConnectionConfiguration<ConnectionConfigu
2121
public static readonly TimeSpan DefaultTimeout = TimeSpan.FromMinutes(1);
2222
public static readonly TimeSpan DefaultPingTimeout = TimeSpan.FromSeconds(2);
2323
public static readonly TimeSpan DefaultPingTimeoutOnSSL = TimeSpan.FromSeconds(5);
24+
public static readonly int DefaultConnectionLimit = 80;
2425

2526
/// <summary>
2627
/// ConnectionConfiguration allows you to control how ElasticLowLevelClient behaves and where/how it connects
@@ -53,10 +54,10 @@ public ConnectionConfiguration(IConnectionPool connectionPool, Func<ConnectionCo
5354

5455
// ReSharper disable once MemberCanBePrivate.Global
5556
// eventhough we use don't use this we very much would like to expose this constructor
57+
5658
public ConnectionConfiguration(IConnectionPool connectionPool, IConnection connection, Func<ConnectionConfiguration, IElasticsearchSerializer> serializerFactory)
5759
: base(connectionPool, connection, serializerFactory)
5860
{ }
59-
6061
}
6162

6263
[Browsable(false)]
@@ -112,6 +113,9 @@ public abstract class ConnectionConfiguration<T> : IConnectionConfigurationValue
112113
private int? _maxRetries;
113114
int? IConnectionConfigurationValues.MaxRetries => _maxRetries;
114115

116+
private int _connectionLimit;
117+
int IConnectionConfigurationValues.ConnectionLimit => _connectionLimit;
118+
115119
private bool _sniffOnStartup;
116120
bool IConnectionConfigurationValues.SniffsOnStartup => _sniffOnStartup;
117121

@@ -166,46 +170,75 @@ protected ConnectionConfiguration(IConnectionPool connectionPool, IConnection co
166170
// ReSharper disable once VirtualMemberCallInContructor
167171
this._serializer = serializerFactory?.Invoke((T)this) ?? this.DefaultSerializer((T)this);
168172

173+
this._connectionLimit = ConnectionConfiguration.DefaultConnectionLimit;
169174
this._requestTimeout = ConnectionConfiguration.DefaultTimeout;
170175
this._sniffOnConnectionFault = true;
171176
this._sniffOnStartup = true;
172177
this._sniffLifeSpan = TimeSpan.FromHours(1);
173178
}
174179

175-
T Assign(Action<ConnectionConfiguration<T>> assigner) => Fluent.Assign((T)this, assigner);
180+
private T Assign(Action<ConnectionConfiguration<T>> assigner) => Fluent.Assign((T)this, assigner);
176181

182+
/// <summary>
183+
/// The default serializer used to serialize documents to and from JSON
184+
/// </summary>
177185
protected virtual IElasticsearchSerializer DefaultSerializer(T settings) => new ElasticsearchDefaultSerializer();
178186

187+
/// <summary>
188+
/// Sets the keep-alive option on a TCP connection.
189+
/// <para>For Desktop CLR, sets ServicePointManager.SetTcpKeepAlive</para>
190+
/// </summary>
191+
/// <param name="keepAliveTime">Specifies the timeout with no activity until the first keep-alive packet is sent.</param>
192+
/// <param name="keepAliveInterval">Specifies the interval between when successive keep-alive packets are sent if no acknowledgement is received.</param>
179193
public T EnableTcpKeepAlive(TimeSpan keepAliveTime, TimeSpan keepAliveInterval) =>
180194
Assign(a => { this._keepAliveTime = keepAliveTime; this._keepAliveInterval = keepAliveInterval; });
181195

196+
/// <summary>
197+
/// The maximum number of retries for a given request,
198+
/// </summary>
182199
public T MaximumRetries(int maxRetries) => Assign(a => a._maxRetries = maxRetries);
183200

201+
/// <summary>
202+
/// Limits the number of concurrent connections that can be opened to an endpoint. Defaults to 80.
203+
/// <para>For Desktop CLR, this setting applies to the DefaultConnectionLimit property on the ServicePointManager object when creating ServicePoint objects, affecting the default <see cref="IConnection"/> implementation.</para>
204+
/// <para>For Core CLR, this setting applies to the MaxConnectionsPerServer property on the HttpClientHandler instances used by the HttpClient inside the default <see cref="IConnection"/> implementation</para>
205+
/// </summary>
206+
/// <param name="connectionLimit">The connection limit</param>
207+
public T ConnectionLimit(int connectionLimit)
208+
{
209+
if (connectionLimit <= 0) throw new ArgumentException("must be greater than 0", nameof(connectionLimit));
210+
return Assign(a => a._connectionLimit = connectionLimit);
211+
}
212+
184213
/// <summary>
185-
/// On connection pools that support reseeding setting this to true (default) will resniff the cluster when a call fails
214+
/// Enables resniffing of the cluster when a call fails, if the connection pool supports reseeding. Defaults to true
186215
/// </summary>
187216
public T SniffOnConnectionFault(bool sniffsOnConnectionFault = true) => Assign(a => a._sniffOnConnectionFault = sniffsOnConnectionFault);
188217

189218
/// <summary>
190-
/// Enables sniffing on first usage of a connection pool if that pool supports reseeding, defaults to true
219+
/// Enables sniffing on first usage of a connection pool if that pool supports reseeding. Defaults to true
191220
/// </summary>
192221
public T SniffOnStartup(bool sniffsOnStartup = true) => Assign(a => a._sniffOnStartup = sniffsOnStartup);
193222

194223
/// <summary>
195224
/// Set the duration after which a cluster state is considered stale and a sniff should be performed again.
196-
/// An IConnectionPool has to signal it supports reseeding otherwise sniffing will never happen.
225+
/// An <see cref="IConnectionPool"/> has to signal it supports reseeding, otherwise sniffing will never happen.
197226
/// Defaults to 1 hour.
198227
/// Set to null to disable completely. Sniffing will only ever happen on ConnectionPools that return true for SupportsReseeding
199228
/// </summary>
200229
/// <param name="sniffLifeSpan">The duration a clusterstate is considered fresh, set to null to disable periodic sniffing</param>
201230
public T SniffLifeSpan(TimeSpan? sniffLifeSpan) => Assign(a => a._sniffLifeSpan = sniffLifeSpan);
202231

203232
/// <summary>
204-
/// Enable gzip compressed requests and responses, do note that you need to configure elasticsearch to set this
233+
/// Enables gzip compressed requests and responses.
234+
/// <para>IMPORTANT: You need to configure http compression on Elasticsearch to be able to use this</para>
205235
/// <para>http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/modules-http.html"</para>
206236
/// </summary>
207237
public T EnableHttpCompression(bool enabled = true) => Assign(a => a._enableHttpCompression = enabled);
208238

239+
/// <summary>
240+
/// Disables the automatic detection of a proxy
241+
/// </summary>
209242
public T DisableAutomaticProxyDetection(bool disable = true) => Assign(a => a._disableAutomaticProxyDetection = disable);
210243

211244
/// <summary>
@@ -223,18 +256,18 @@ public T EnableTcpKeepAlive(TimeSpan keepAliveTime, TimeSpan keepAliveInterval)
223256
public T DisablePing(bool disable = true) => Assign(a => a._disablePings = disable);
224257

225258
/// <summary>
226-
/// This NameValueCollection will be appended to every url NEST calls, great if you need to pass i.e an API key.
259+
/// A collection of query string parameters that will be sent with every request. Useful in situations where you always need to pass a parameter e.g. an API key.
227260
/// </summary>
228261
public T GlobalQueryStringParameters(NameValueCollection queryStringParameters) => Assign(a => a._queryString.Add(queryStringParameters));
229262

230263
/// <summary>
231-
/// a NameValueCollection that will be send as headers for each request
264+
/// A collection of headers that will be sent with every request. Useful in situations where you always need to pass a header e.g. a custom auth header
232265
/// </summary>
233266
public T GlobalHeaders(NameValueCollection headers) => Assign(a => a._headers.Add(headers));
234267

235268
/// <summary>
236269
/// Sets the default timeout in milliseconds for each request to Elasticsearch.
237-
/// NOTE: You can set this to a high value here, and specify the timeout on Elasticsearch's side.
270+
/// NOTE: You can set this to a high value here, and specify a timeout on Elasticsearch's side.
238271
/// </summary>
239272
/// <param name="timeout">time out in milliseconds</param>
240273
public T RequestTimeout(TimeSpan timeout) => Assign(a => a._requestTimeout = timeout);
@@ -262,14 +295,14 @@ public T EnableTcpKeepAlive(TimeSpan keepAliveTime, TimeSpan keepAliveInterval)
262295

263296
/// <summary>
264297
/// Limits the total runtime including retries separately from <see cref="RequestTimeout"/>
265-
/// <pre>
266-
/// When not specified defaults to <see cref="RequestTimeout"/> which itself defaults to 60seconds
267-
/// </pre>
298+
/// <para>
299+
/// When not specified defaults to <see cref="RequestTimeout"/>, which itself defaults to 60 seconds
300+
/// </para>
268301
/// </summary>
269302
public T MaxRetryTimeout(TimeSpan maxRetryTimeout) => Assign(a => a._maxRetryTimeout = maxRetryTimeout);
270303

271304
/// <summary>
272-
/// If your connection has to go through proxy use this method to specify the proxy url
305+
/// If your connection has to go through proxy, use this method to specify the proxy url
273306
/// </summary>
274307
public T Proxy(Uri proxyAdress, string username, string password)
275308
{
@@ -281,8 +314,8 @@ public T Proxy(Uri proxyAdress, string username, string password)
281314
}
282315

283316
/// <summary>
284-
/// Forces all requests to have ?pretty=true, causing elasticsearch to return formatted json.
285-
/// Also forces the client to send out formatted json. Defaults to false
317+
/// Forces all requests to have ?pretty=true, causing Elasticsearch to return formatted json.
318+
/// Also forces the client to send out formatted json. Defaults to <c>false</c>
286319
/// </summary>
287320
public T PrettyJson(bool b = true) => Assign(a =>
288321
{
@@ -293,33 +326,36 @@ public T PrettyJson(bool b = true) => Assign(a =>
293326
});
294327

295328
/// <summary>
296-
/// Make sure the reponse bytes are always available on the ElasticsearchResponse object
297-
/// <para>Note: that depending on the registered serializer this may cause the respond to be read in memory first</para>
329+
/// Ensures the response bytes are always available on the <see cref="ElasticsearchResponse{T}"/>
330+
/// <para>IMPORTANT: Depending on the registered serializer,
331+
/// this may cause the respose to be buffered in memory first, potentially affecting performance.</para>
298332
/// </summary>
299333
public T DisableDirectStreaming(bool b = true) => Assign(a => a._disableDirectStreaming = b);
300334

301335
/// <summary>
302-
/// Global callback for every response that NEST receives, useful for custom logging.
303-
/// Calling this multiple times will register multiple listeners.
336+
/// Registers an <see cref="Action{IApiCallDetails}"/> that is called when a response is received from Elasticsearch.
337+
/// This can be useful for implementing custom logging.
338+
/// Multiple callbacks can be registered by calling this multiple times
304339
/// </summary>
305340
public T OnRequestCompleted(Action<IApiCallDetails> handler) =>
306341
Assign(a => a._completedRequestHandler += handler ?? DefaultCompletedRequestHandler);
307342

343+
/// <summary>
344+
/// Registers an <see cref="Action{RequestData}"/> that is called when <see cref="RequestData"/> is created.
345+
/// Multiple callbacks can be registered by calling this multiple times
346+
/// </summary>
308347
public T OnRequestDataCreated(Action<RequestData> handler) =>
309348
Assign(a => a._onRequestDataCreated += handler ?? DefaultRequestDataCreated);
310349

311350
/// <summary>
312-
/// Basic access authentication credentials to specify with all requests.
351+
/// Basic Authentication credentials to specify with all requests.
313352
/// </summary>
314-
public T BasicAuthentication(string userName, string password)
315-
{
316-
this._basicAuthCredentials = new BasicAuthenticationCredentials
353+
public T BasicAuthentication(string userName, string password) =>
354+
Assign(a => a._basicAuthCredentials = new BasicAuthenticationCredentials
317355
{
318356
Username = userName,
319357
Password = password
320-
};
321-
return (T)this;
322-
}
358+
});
323359

324360
/// <summary>
325361
/// Allows for requests to be pipelined. http://en.wikipedia.org/wiki/HTTP_pipelining

src/Elasticsearch.Net/Configuration/IConnectionConfigurationValues.cs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,13 @@ public interface IConnectionConfigurationValues : IDisposable
5252
/// </summary>
5353
int? MaxRetries { get; }
5454

55+
/// <summary>
56+
/// Limits the number of concurrent connections that can be opened to an endpoint. Defaults to 80 (see <see cref="ConnectionConfiguration.DefaultConnectionLimit"/>).
57+
/// <para>For Desktop CLR, this setting applies to the DefaultConnectionLimit property on the ServicePointManager object when creating ServicePoint objects, affecting the default <see cref="IConnection"/> implementation.</para>
58+
/// <para>For Core CLR, this setting applies to the MaxConnectionsPerServer property on the HttpClientHandler instances used by the HttpClient inside the default <see cref="IConnection"/> implementation</para>
59+
/// </summary>
60+
int ConnectionLimit { get; }
61+
5562
/// <summary>
5663
/// This signals that we do not want to send initial pings to unknown/previously dead nodes
5764
/// and just send the call straightaway
@@ -68,7 +75,15 @@ public interface IConnectionConfigurationValues : IDisposable
6875
/// When set will force all connections through this proxy
6976
/// </summary>
7077
string ProxyAddress { get; }
78+
79+
/// <summary>
80+
/// The username for the proxy, when configured
81+
/// </summary>
7182
string ProxyUsername { get; }
83+
84+
/// <summary>
85+
/// The password for the proxy, when configured
86+
/// </summary>
7287
string ProxyPassword { get; }
7388

7489
/// <summary>
@@ -138,6 +153,10 @@ public interface IConnectionConfigurationValues : IDisposable
138153
/// </summary>
139154
BasicAuthenticationCredentials BasicAuthenticationCredentials { get; }
140155

156+
/// <summary>
157+
/// An action to run when the <see cref="RequestData"/> for a request has been
158+
/// created.
159+
/// </summary>
141160
Action<RequestData> OnRequestDataCreated { get; }
142161

143162
/// <summary>

src/Elasticsearch.Net/Connection/HttpConnection-CoreFx.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,9 @@ protected virtual HttpClientHandler CreateHttpClientHandler(RequestData requestD
107107
{
108108
var handler = new HttpClientHandler
109109
{
110-
AutomaticDecompression = requestData.HttpCompression ? GZip | Deflate : None
110+
AutomaticDecompression = requestData.HttpCompression ? GZip | Deflate : None,
111+
// same limit as desktop clr
112+
MaxConnectionsPerServer = requestData.ConnectionSettings.ConnectionLimit
111113
};
112114

113115
if (!requestData.ProxyAddress.IsNullOrEmpty())

src/Elasticsearch.Net/Connection/HttpConnection.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ protected virtual void AlterServicePoint(ServicePoint requestServicePoint, Reque
7676
{
7777
requestServicePoint.UseNagleAlgorithm = false;
7878
requestServicePoint.Expect100Continue = false;
79-
requestServicePoint.ConnectionLimit = 80;
79+
requestServicePoint.ConnectionLimit = requestData.ConnectionSettings.ConnectionLimit;
8080
//looking at http://referencesource.microsoft.com/#System/net/System/Net/ServicePoint.cs
8181
//this method only sets internal values and wont actually cause timers and such to be reset
8282
//So it should be idempotent if called with the same parameters

src/Tests/Reproduce/ConnectionReuseAndBalancing.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ [I] public async Task IndexAndSearchABunch()
5252
this.AssertHttpStats(nodeStats);
5353
for (var i = 0; i < 10; i++)
5454
{
55-
Parallel.For(0, 1000, async (c) => await client.SearchAsync<Project>(s => s));
55+
Parallel.For(0, 1000, c => client.Search<Project>(s => s));
5656

5757
nodeStats = await client.NodesStatsAsync(statsRequest);
5858
this.AssertHttpStats(nodeStats);

0 commit comments

Comments
 (0)