Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 60 additions & 60 deletions src/EventStore.Core/ClusterVNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,63 @@ public ClusterVNode(ClusterVNodeOptions options,

NodeInfo = new VNodeInfo(instanceId.Value, debugIndex, intTcp, intSecIp, extTcp, extSecIp,
httpEndPoint, options.Cluster.ReadOnlyReplica);
GossipAdvertiseInfo = GetGossipAdvertiseInfo();
GossipAdvertiseInfo GetGossipAdvertiseInfo() {
IPAddress intIpAddress = options.Interface.ReplicationIp;

IPAddress extIpAddress = options.Interface.NodeIp;

var intHostToAdvertise = options.Interface.ReplicationHostAdvertiseAs ?? intIpAddress.ToString();
var extHostToAdvertise = options.Interface.NodeHostAdvertiseAs ?? extIpAddress.ToString();

if (intIpAddress.Equals(IPAddress.Any) || extIpAddress.Equals(IPAddress.Any)) {
IPAddress nonLoopbackAddress = IPFinder.GetNonLoopbackAddress();
IPAddress addressToAdvertise = options.Cluster.ClusterSize > 1 ? nonLoopbackAddress : IPAddress.Loopback;

if (intIpAddress.Equals(IPAddress.Any) && options.Interface.ReplicationHostAdvertiseAs == null) {
intHostToAdvertise = addressToAdvertise.ToString();
}

if (extIpAddress.Equals(IPAddress.Any) && options.Interface.NodeHostAdvertiseAs == null) {
extHostToAdvertise = addressToAdvertise.ToString();
}
}

var intTcpEndPoint = NodeInfo.InternalTcp == null
? null
: new DnsEndPoint(intHostToAdvertise, intTcpPortAdvertiseAs > 0
? (options.Interface.ReplicationTcpPortAdvertiseAs)
: NodeInfo.InternalTcp.Port);

var intSecureTcpEndPoint = NodeInfo.InternalSecureTcp == null
? null
: new DnsEndPoint(intHostToAdvertise, intSecTcpPortAdvertiseAs > 0
? intSecTcpPortAdvertiseAs
: NodeInfo.InternalSecureTcp.Port);

var extTcpEndPoint = NodeInfo.ExternalTcp == null
? null
: new DnsEndPoint(extHostToAdvertise, extTcpPortAdvertiseAs > 0
? extTcpPortAdvertiseAs
: NodeInfo.ExternalTcp.Port);

var extSecureTcpEndPoint = NodeInfo.ExternalSecureTcp == null
? null
: new DnsEndPoint(extHostToAdvertise, extSecTcpPortAdvertiseAs > 0
? extSecTcpPortAdvertiseAs
: NodeInfo.ExternalSecureTcp.Port);

var httpEndPoint = new DnsEndPoint(extHostToAdvertise,
options.Interface.NodePortAdvertiseAs > 0
? options.Interface.NodePortAdvertiseAs
: NodeInfo.HttpEndPoint.GetPort());

return new GossipAdvertiseInfo(intTcpEndPoint, intSecureTcpEndPoint, extTcpEndPoint,
extSecureTcpEndPoint, httpEndPoint, options.Interface.ReplicationHostAdvertiseAs,
options.Interface.NodeHostAdvertiseAs, options.Interface.NodePortAdvertiseAs,
options.Interface.AdvertiseHostToClientAs, options.Interface.AdvertiseNodePortToClientAs,
extTcpOptions?.AdvertisedPort ?? 0);
}

var dbConfig = CreateDbConfig(
out var statsHelper,
Expand All @@ -320,7 +377,8 @@ public ClusterVNode(ClusterVNodeOptions options,
.GetSection(SectionNames.EventStore)
.GetSection(SectionNames.Metrics)
.Get<MetricsConfiguration>() ?? new();
MetricsBootstrapper.Bootstrap(metricsConfiguration, dbConfig, trackers);
MetricsBootstrapper.Bootstrap(metricsConfiguration, dbConfig, trackers, NodeInfo.InstanceId,
GossipAdvertiseInfo.HttpEndPoint);

Db = new TFChunkDb(dbConfig, tracker: trackers.TransactionFileTracker);

Expand Down Expand Up @@ -853,64 +911,6 @@ void StartSubsystems() {
bus.Subscribe<GrpcMessage.SendOverGrpc>(grpcSendService);
});

GossipAdvertiseInfo = GetGossipAdvertiseInfo();
GossipAdvertiseInfo GetGossipAdvertiseInfo() {
IPAddress intIpAddress = options.Interface.ReplicationIp;

IPAddress extIpAddress = options.Interface.NodeIp;

var intHostToAdvertise = options.Interface.ReplicationHostAdvertiseAs ?? intIpAddress.ToString();
var extHostToAdvertise = options.Interface.NodeHostAdvertiseAs ?? extIpAddress.ToString();

if (intIpAddress.Equals(IPAddress.Any) || extIpAddress.Equals(IPAddress.Any)) {
IPAddress nonLoopbackAddress = IPFinder.GetNonLoopbackAddress();
IPAddress addressToAdvertise = options.Cluster.ClusterSize > 1 ? nonLoopbackAddress : IPAddress.Loopback;

if (intIpAddress.Equals(IPAddress.Any) && options.Interface.ReplicationHostAdvertiseAs == null) {
intHostToAdvertise = addressToAdvertise.ToString();
}

if (extIpAddress.Equals(IPAddress.Any) && options.Interface.NodeHostAdvertiseAs == null) {
extHostToAdvertise = addressToAdvertise.ToString();
}
}

var intTcpEndPoint = NodeInfo.InternalTcp == null
? null
: new DnsEndPoint(intHostToAdvertise, intTcpPortAdvertiseAs > 0
? (options.Interface.ReplicationTcpPortAdvertiseAs)
: NodeInfo.InternalTcp.Port);

var intSecureTcpEndPoint = NodeInfo.InternalSecureTcp == null
? null
: new DnsEndPoint(intHostToAdvertise, intSecTcpPortAdvertiseAs > 0
? intSecTcpPortAdvertiseAs
: NodeInfo.InternalSecureTcp.Port);

var extTcpEndPoint = NodeInfo.ExternalTcp == null
? null
: new DnsEndPoint(extHostToAdvertise, extTcpPortAdvertiseAs > 0
? extTcpPortAdvertiseAs
: NodeInfo.ExternalTcp.Port);

var extSecureTcpEndPoint = NodeInfo.ExternalSecureTcp == null
? null
: new DnsEndPoint(extHostToAdvertise, extSecTcpPortAdvertiseAs > 0
? extSecTcpPortAdvertiseAs
: NodeInfo.ExternalSecureTcp.Port);

var httpEndPoint = new DnsEndPoint(extHostToAdvertise,
options.Interface.NodePortAdvertiseAs > 0
? options.Interface.NodePortAdvertiseAs
: NodeInfo.HttpEndPoint.GetPort());

return new GossipAdvertiseInfo(intTcpEndPoint, intSecureTcpEndPoint, extTcpEndPoint,
extSecureTcpEndPoint, httpEndPoint, options.Interface.ReplicationHostAdvertiseAs,
options.Interface.NodeHostAdvertiseAs, options.Interface.NodePortAdvertiseAs,
options.Interface.AdvertiseHostToClientAs, options.Interface.AdvertiseNodePortToClientAs,
extTcpOptions?.AdvertisedPort ?? 0);
}

_httpService = new KestrelHttpService(ServiceAccessibility.Public, _mainQueue, new TrieUriRouter(),
_workersHandler, options.Application.LogHttpRequests,
string.IsNullOrEmpty(GossipAdvertiseInfo.AdvertiseHostToClientAs) ? GossipAdvertiseInfo.AdvertiseExternalHostAs : GossipAdvertiseInfo.AdvertiseHostToClientAs,
Expand Down Expand Up @@ -1606,7 +1606,7 @@ IServiceCollection ConfigureAdditionalServices(IServiceCollection services) => s
(() => (_certificateSelector(), _intermediateCertsSelector(), _trustedRootCertsSelector()));

_startup = new ClusterVNodeStartup<TStreamId>(_subsystems, _mainQueue, monitoringQueue, _mainBus, _workersHandler,
_authenticationProvider, _authorizationProvider,
_authenticationProvider, _authorizationProvider, instanceId.Value, GossipAdvertiseInfo.HttpEndPoint,
options.Application.MaxAppendSize, TimeSpan.FromMilliseconds(options.Database.WriteTimeoutMs),
expiryStrategy ?? new DefaultExpiryStrategy(),
_httpService,
Expand Down
12 changes: 11 additions & 1 deletion src/EventStore.Core/ClusterVNodeStartup.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using EventStore.Common.Configuration;
using EventStore.Common.Utils;
using EventStore.Core.Bus;
using EventStore.Core.Data;
using EventStore.Core.Messages;
using EventStore.Core.Services.Storage.ReaderIndex;
using EventStore.Core.Services.Transport.Grpc;
Expand Down Expand Up @@ -55,6 +57,8 @@ public class ClusterVNodeStartup<TStreamId> : IStartup, IHandle<SystemMessage.Sy

private bool _ready;
private readonly IAuthorizationProvider _authorizationProvider;
private readonly Guid _instanceId;
private readonly DnsEndPoint _httpEndPoint;
private readonly MultiQueuedHandler _httpMessageHandler;
private readonly string _clusterDns;

Expand All @@ -66,6 +70,8 @@ public ClusterVNodeStartup(
MultiQueuedHandler httpMessageHandler,
IAuthenticationProvider authenticationProvider,
IAuthorizationProvider authorizationProvider,
Guid instanceId,
DnsEndPoint httpEndPoint,
int maxAppendSize,
TimeSpan writeTimeout,
IExpiryStrategy expiryStrategy,
Expand Down Expand Up @@ -97,6 +103,8 @@ public ClusterVNodeStartup(
_httpMessageHandler = httpMessageHandler;
_authenticationProvider = authenticationProvider;
_authorizationProvider = authorizationProvider ?? throw new ArgumentNullException(nameof(authorizationProvider));
_instanceId = instanceId;
_httpEndPoint = httpEndPoint;
_maxAppendSize = maxAppendSize;
_writeTimeout = writeTimeout;
_expiryStrategy = expiryStrategy;
Expand Down Expand Up @@ -209,7 +217,9 @@ public IServiceProvider ConfigureServices(IServiceCollection services) {
// OpenTelemetry
.AddOpenTelemetry()
.WithMetrics(meterOptions => meterOptions
.SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("eventstore"))
.SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("eventstore")
.AddAttributes(new KeyValuePair<string, object>[]
{ new("nodeId", _instanceId.ToString()), new("nodeAddress", _httpEndPoint.ToString()) }))
.AddMeter(metricsConfiguration.Meters)
.AddView(i => {
if (i.Name.StartsWith("eventstore-") &&
Expand Down
4 changes: 2 additions & 2 deletions src/EventStore.Core/EventStore.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
</PackageReference>
<PackageReference Include="HdrHistogram" Version="2.5.0" />
<PackageReference Include="Microsoft.FASTER.Core" Version="1.9.16" />
<PackageReference Include="OpenTelemetry.Exporter.Prometheus.AspNetCore" Version="1.4.0-rc.1" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.4.0-rc.1" />
<PackageReference Include="OpenTelemetry.Exporter.Prometheus.AspNetCore" Version="1.7.0-rc.1" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.7.0-rc.1" />
<PackageReference Include="Quickenshtein" Version="1.5.1" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="8.0.0" />
<PackageReference Include="System.Diagnostics.PerformanceCounter" Version="8.0.0" />
Expand Down
8 changes: 5 additions & 3 deletions src/EventStore.Core/MetricsBootstrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Diagnostics.Metrics;
using System.Linq;
using System.Net;
using EventStore.Core.TransactionLog.Chunks;
using EventStore.Core.TransactionLog.Checkpoint;
using EventStore.Core.Index;
Expand Down Expand Up @@ -61,10 +62,11 @@ public class GossipTrackers {
public static class MetricsBootstrapper {
private static readonly ILogger Log = Serilog.Log.ForContext(typeof(MetricsBootstrapper));

public static void Bootstrap(
Conf conf,
public static void Bootstrap(Conf conf,
TFChunkDbConfig dbConfig,
Trackers trackers) {
Trackers trackers,
Guid instanceId,
DnsEndPoint httpEndPoint) {

LogConfig(conf);

Expand Down