Skip to content

Commit 055c1e6

Browse files
authored
First management implementation (#5)
* First management implementation --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 835cddb commit 055c1e6

23 files changed

+1419
-65
lines changed

.github/workflows/test-supported-net-versions.yml renamed to .github/workflows/build-test.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ on:
66

77
jobs:
88
build:
9-
runs-on: ubuntu-22.04
9+
runs-on: ubuntu-latest
1010
steps:
1111
- uses: actions/checkout@v4
1212
- name: Checkout tls-gen
@@ -28,12 +28,12 @@ jobs:
2828
run: dotnet --version
2929
- name: Restore
3030
run: dotnet restore ./Build.csproj --verbosity=normal
31+
- name: Verify
32+
run: dotnet format ./Build.csproj --no-restore --verbosity=diagnostic --verify-no-changes
3133
- name: Build
3234
run: dotnet build ./Build.csproj --no-restore --verbosity=normal
33-
- name: Verify
34-
run: dotnet format ./rabbitmq-amqp-dotnet-client.sln --no-restore --verbosity=diagnostic --verify-no-changes
3535
- name: Test
36-
run: dotnet test ./Build.csproj --no-build --logger "console;verbosity=detailed" /p:AltCover=true
36+
run: dotnet test ./Build.csproj --logger "console;verbosity=detailed" /p:AltCover=true
3737
- name: Stop toxiproxy
3838
run: docker stop toxiproxy && docker rm toxiproxy
3939
- name: Stop broker

.github/workflows/main.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
name: rabbitmq-stream-dotnet-client
1+
name: rabbitmq-amqp-dotnet-client
22

33
on:
44
push:
@@ -8,4 +8,4 @@ on:
88

99
jobs:
1010
call-build-test:
11-
uses: ./.github/workflows/test-supported-net-versions.yml
11+
uses: ./.github/workflows/build-test.yaml

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,4 +124,5 @@ projects/Unit*/TestResult.xml
124124
Tests/coverage.*
125125
##
126126
docs/temp/
127-
127+
tls-gen/
128+
rabbitmq-configuration/

Directory.Packages.props

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@
44
</PropertyGroup>
55
<ItemGroup>
66
<!-- RabbitMQ.Amqp.Client -->
7+
<PackageVersion Include="AMQPNetLite.Core" Version="2.4.10" />
78
<PackageVersion Include="Microsoft.CodeAnalysis.PublicApiAnalyzers" Version="3.3.4" />
8-
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.6.0" />
9+
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.5.0" />
910
<PackageVersion Include="Microsoft.SourceLink.GitHub" Version="1.1.1" />
1011
<PackageVersion Include="MinVer" Version="4.3.0" />
1112
<!-- Tests -->
@@ -16,16 +17,4 @@
1617
<PackageVersion Include="coverlet.collector" Version="3.2.0" />
1718
<!-- docs/**/*.csproj -->
1819
</ItemGroup>
19-
<ItemGroup Label=".NET 6 Specific" Condition="'$(TargetFramework)' == 'net6.0'">
20-
<!-- RabbitMQ.Stream.Client -->
21-
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.0" />
22-
</ItemGroup>
23-
<ItemGroup Label=".NET 7 Specific" Condition="'$(TargetFramework)' == 'net7.0'">
24-
<!-- RabbitMQ.Stream.Client -->
25-
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="7.0.0" />
26-
</ItemGroup>
27-
<ItemGroup Label=".NET 8 Specific" Condition="'$(TargetFramework)' == 'net8.0'">
28-
<!-- RabbitMQ.Amqp.Client -->
29-
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.0" />
30-
</ItemGroup>
3120
</Project>

RabbitMQ.AMQP.Client/Address.cs

Lines changed: 0 additions & 29 deletions
This file was deleted.
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
using Amqp;
2+
3+
namespace RabbitMQ.AMQP.Client;
4+
5+
public class AmqpAddressBuilder
6+
{
7+
private string _host = "localhost";
8+
private int _port = 5672;
9+
private string _user = "guest";
10+
private string _password = "guest";
11+
private string _scheme = "AMQP";
12+
private string _connection = "AMQP.NET";
13+
private string _virtualHost = "/";
14+
15+
16+
public AmqpAddressBuilder Host(string host)
17+
{
18+
_host = host;
19+
return this;
20+
}
21+
22+
public AmqpAddressBuilder Port(int port)
23+
{
24+
_port = port;
25+
return this;
26+
}
27+
28+
public AmqpAddressBuilder User(string user)
29+
{
30+
_user = user;
31+
return this;
32+
}
33+
34+
public AmqpAddressBuilder Password(string password)
35+
{
36+
_password = password;
37+
return this;
38+
}
39+
40+
41+
public AmqpAddressBuilder Scheme(string scheme)
42+
{
43+
_scheme = scheme;
44+
return this;
45+
}
46+
47+
public AmqpAddressBuilder ConnectionName(string connection)
48+
{
49+
_connection = connection;
50+
return this;
51+
}
52+
53+
public AmqpAddressBuilder VirtualHost(string virtualHost)
54+
{
55+
_virtualHost = virtualHost;
56+
return this;
57+
}
58+
59+
public AmqpAddress Build()
60+
{
61+
return new AmqpAddress(_host, _port, _user,
62+
_password, _virtualHost,
63+
_scheme, _connection);
64+
}
65+
}
66+
67+
// <summary>
68+
// Represents a network address.
69+
// </summary>
70+
public class AmqpAddress : IAddress
71+
{
72+
internal Address Address { get; }
73+
74+
private readonly string _connectionName = "AMQP.NET";
75+
private readonly string _virtualHost = "/";
76+
77+
78+
public AmqpAddress(string address)
79+
{
80+
Address = new Address(address);
81+
}
82+
83+
public AmqpAddress(string host, int port,
84+
string user,
85+
string password,
86+
string virtualHost, string scheme, string connectionName)
87+
{
88+
Address = new Address(host, port, user, password, "/", scheme);
89+
_connectionName = connectionName;
90+
_virtualHost = virtualHost;
91+
}
92+
93+
public string Host()
94+
{
95+
return Address.Host;
96+
}
97+
98+
99+
public int Port()
100+
{
101+
return Address.Port;
102+
}
103+
104+
105+
106+
public string VirtualHost()
107+
{
108+
return _virtualHost;
109+
}
110+
111+
public string User()
112+
{
113+
return Address.User;
114+
}
115+
116+
117+
public string Password()
118+
{
119+
return Address.Password;
120+
}
121+
122+
123+
public string Scheme()
124+
{
125+
return Address.Scheme;
126+
}
127+
128+
public string ConnectionName()
129+
{
130+
return _connectionName;
131+
}
132+
133+
public override string ToString()
134+
{
135+
return
136+
$"Address{{host='{Address.Host}', port={Address.Port}, path='{Address.Path}', username='{Address.User}', password='{Address.Password}'}}";
137+
}
138+
139+
public override bool Equals(object? obj)
140+
{
141+
if (obj == null || GetType() != obj.GetType())
142+
{
143+
return false;
144+
}
145+
146+
var address = (AmqpAddress)obj;
147+
return Address.Host == address.Address.Host &&
148+
Address.Port == address.Address.Port &&
149+
Address.Path == address.Address.Path &&
150+
Address.User == address.Address.User &&
151+
Address.Password == address.Address.Password &&
152+
Address.Scheme == address.Address.Scheme;
153+
}
154+
155+
protected bool Equals(AmqpAddress other)
156+
{
157+
return Address.Equals(other.Address);
158+
}
159+
160+
public override int GetHashCode()
161+
{
162+
return Address.GetHashCode();
163+
}
164+
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
using Amqp;
2+
using Amqp.Framing;
3+
using Amqp.Types;
4+
5+
namespace RabbitMQ.AMQP.Client;
6+
7+
public class AmqpConnection : IConnection, IResource
8+
{
9+
private Connection? _nativeConnection;
10+
private AmqpAddress _address = null!;
11+
private readonly AmqpManagement _management = new();
12+
13+
public IManagement Management()
14+
{
15+
return _management;
16+
}
17+
18+
public async Task ConnectAsync(IAddress address)
19+
{
20+
var amqpAddress = new AmqpAddressBuilder()
21+
.Host(address.Host())
22+
.Port(address.Port())
23+
.User(address.User())
24+
.Password(address.Password())
25+
.VirtualHost(address.VirtualHost())
26+
.ConnectionName(address.ConnectionName())
27+
.Scheme(address.Scheme())
28+
.Build();
29+
_address = amqpAddress;
30+
await EnsureConnectionAsync();
31+
}
32+
33+
internal async Task EnsureConnectionAsync()
34+
{
35+
try
36+
{
37+
if (_nativeConnection == null || _nativeConnection.IsClosed)
38+
{
39+
var open = new Open
40+
{
41+
HostName = $"vhost:{_address.VirtualHost()}",
42+
Properties = new Fields()
43+
{
44+
[new Symbol("connection_name")] = _address.ConnectionName(),
45+
}
46+
};
47+
var connection = await Connection.Factory.CreateAsync(_address.Address, open);
48+
connection.Closed += (sender, error) =>
49+
{
50+
var unexpected = Status != Status.Closed;
51+
Status = Status.Closed;
52+
53+
Closed?.Invoke(this, unexpected);
54+
55+
Trace.WriteLine(TraceLevel.Warning, $"connection is closed " +
56+
$"{sender} {error} {Status} " +
57+
$"{connection.IsClosed}");
58+
};
59+
_nativeConnection = connection;
60+
_management.Init(connection);
61+
}
62+
63+
Status = Status.Open;
64+
}
65+
catch (Amqp.AmqpException e)
66+
{
67+
throw new ConnectionException("AmqpException: Connection failed", e);
68+
}
69+
catch (System.OperationCanceledException e)
70+
{
71+
// wrong virtual host
72+
throw new ConnectionException("OperationCanceledException: Connection failed", e);
73+
}
74+
75+
catch (NotSupportedException e)
76+
{
77+
// wrong schema
78+
throw new ConnectionException("NotSupportedException: Connection failed", e);
79+
}
80+
81+
82+
}
83+
84+
85+
public async Task CloseAsync()
86+
{
87+
Status = Status.Closed;
88+
if (_nativeConnection is { IsClosed: false }) await _nativeConnection.CloseAsync();
89+
await _management.CloseAsync();
90+
}
91+
92+
public event IResource.ClosedEventHandler? Closed;
93+
94+
95+
public Status Status { get; private set; } = Status.Closed;
96+
}

0 commit comments

Comments
 (0)