diff --git a/README.md b/README.md
index e67f4f8a..ecc2d7aa 100644
--- a/README.md
+++ b/README.md
@@ -12,7 +12,7 @@ If you're migrating from legacy `Akka.Persistence.Sql.Common` based plugins, you
- [Akka.Persistence.Sql](#akkapersistencesql)
- [Getting Started](#getting-started)
* [The Easy Way, Using `Akka.Hosting`](#the-easy-way-using-akkahosting)
- + [Health Checks (Akka.Hosting v1.5.51+)](#health-checks-akkahosting-v1551)
+ + [Health Checks](#health-checks)
* [The Classic Way, Using HOCON](#the-classic-way-using-hocon)
* [Supported Database Providers](#supported-database-providers)
+ [Tested Database Providers](#tested-database-providers)
@@ -76,27 +76,22 @@ This includes setting the connection string and provider name again, if necessar
Please consult the Linq2Db documentation for more details on configuring a valid DataOptions object.
Note that `MappingSchema` and `RetryPolicy` will always be overridden by Akka.Persistence.Sql.
-### Health Checks (Akka.Hosting v1.5.51+)
+### Health Checks
-Starting with Akka.Hosting v1.5.51, you can add health checks for your persistence plugins to verify that journals and snapshot stores are properly initialized and accessible. These health checks integrate with `Microsoft.Extensions.Diagnostics.HealthChecks` and can be used with ASP.NET Core health check endpoints.
+Starting with Akka.Persistence.Sql v1.5.51 or later, you can add health checks for your persistence plugins to verify that journals and snapshot stores are properly initialized and accessible. These health checks integrate with `Microsoft.Extensions.Diagnostics.HealthChecks` and can be used with ASP.NET Core health check endpoints.
-To configure health checks, use the `.WithHealthCheck()` method when setting up your journal and snapshot store:
+To configure health checks, use the `journalBuilder` and `snapshotBuilder` parameters with the `.WithHealthCheck()` method:
```csharp
var host = new HostBuilder()
.ConfigureServices((context, services) => {
services.AddAkka("my-system-name", (builder, provider) =>
{
- builder
- .WithSqlPersistence(
- connectionString: _myConnectionString,
- providerName: ProviderName.SqlServer2019,
- journal: j => j.WithHealthCheck(
- unHealthyStatus: HealthStatus.Degraded,
- name: "sql-journal"),
- snapshot: s => s.WithHealthCheck(
- unHealthyStatus: HealthStatus.Degraded,
- name: "sql-snapshot"));
+ builder.WithSqlPersistence(
+ connectionString: _myConnectionString,
+ providerName: ProviderName.SqlServer2019,
+ journalBuilder: journal => journal.WithHealthCheck(HealthStatus.Degraded),
+ snapshotBuilder: snapshot => snapshot.WithHealthCheck(HealthStatus.Degraded));
});
});
```
@@ -119,12 +114,11 @@ builder.Services.AddHealthChecks();
builder.Services.AddAkka("my-system-name", (configBuilder, provider) =>
{
- configBuilder
- .WithSqlPersistence(
- connectionString: _myConnectionString,
- providerName: ProviderName.SqlServer2019,
- journal: j => j.WithHealthCheck(),
- snapshot: s => s.WithHealthCheck());
+ configBuilder.WithSqlPersistence(
+ connectionString: _myConnectionString,
+ providerName: ProviderName.SqlServer2019,
+ journalBuilder: journal => journal.WithHealthCheck(),
+ snapshotBuilder: snapshot => snapshot.WithHealthCheck());
});
var app = builder.Build();
diff --git a/src/Akka.Persistence.Sql.Hosting.Tests/BaselineJournalBuilderSpec.cs b/src/Akka.Persistence.Sql.Hosting.Tests/BaselineJournalBuilderSpec.cs
new file mode 100644
index 00000000..bd44033b
--- /dev/null
+++ b/src/Akka.Persistence.Sql.Hosting.Tests/BaselineJournalBuilderSpec.cs
@@ -0,0 +1,94 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2013-2023 .NET Foundation
+//
+// -----------------------------------------------------------------------
+
+using Akka.Actor;
+using Akka.Event;
+using Akka.Hosting;
+using Akka.Persistence.Hosting;
+using Akka.Persistence.Query;
+using Akka.Persistence.Sql.Query;
+using Akka.Persistence.Sql.Tests.Common.Containers;
+using Akka.Persistence.TCK.Query;
+using Akka.Streams;
+using Akka.Streams.TestKit;
+using FluentAssertions;
+using FluentAssertions.Extensions;
+using LinqToDB;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Akka.Persistence.Sql.Hosting.Tests
+{
+ ///
+ /// Baseline test to validate current journalBuilder functionality before refactoring
+ ///
+ public class BaselineJournalBuilderSpec : Akka.Hosting.TestKit.TestKit, IClassFixture
+ {
+ private const string PId = "baseline-test";
+ private readonly SqliteContainer _fixture;
+
+ public BaselineJournalBuilderSpec(ITestOutputHelper output, SqliteContainer fixture)
+ : base(nameof(BaselineJournalBuilderSpec), output)
+ {
+ _fixture = fixture;
+
+ if (!_fixture.InitializeDbAsync().Wait(10.Seconds()))
+ throw new Exception("Failed to clean up database in 10 seconds");
+ }
+
+ protected override void ConfigureAkka(AkkaConfigurationBuilder builder, IServiceProvider provider)
+ {
+ // Test the refactored pattern to ensure basic persistence works
+ builder.WithSqlPersistence(
+ connectionString: _fixture.ConnectionString,
+ providerName: _fixture.ProviderName);
+
+ builder.StartActors((system, registry) =>
+ {
+ var actor = system.ActorOf(Props.Create(() => new TestPersistentActor(PId)));
+ registry.Register(actor);
+ });
+ }
+
+ [Fact]
+ public async Task Refactored_hosting_should_support_basic_persistence()
+ {
+ // Arrange
+ var actor = ActorRegistry.Get();
+
+ // Act - persist an event
+ actor.Tell("test-event");
+ await ExpectMsgAsync("ACK", 3.Seconds());
+
+ // Verify the event was persisted
+ var readJournal = Sys.ReadJournalFor("akka.persistence.query.journal.sql");
+ var source = readJournal.CurrentEventsByPersistenceId(PId, 0, long.MaxValue);
+ var probe = source.RunWith(this.SinkProbe(), Sys.Materializer());
+
+ probe.Request(1);
+ var envelope = await probe.ExpectNextAsync(3.Seconds());
+ envelope.PersistenceId.Should().Be(PId);
+ envelope.Event.Should().Be("test-event");
+ await probe.ExpectCompleteAsync();
+ }
+
+ private class TestPersistentActor : ReceivePersistentActor
+ {
+ public TestPersistentActor(string persistenceId)
+ {
+ PersistenceId = persistenceId;
+
+ Command(str =>
+ {
+ var sender = Sender;
+ Persist(str, _ => sender.Tell("ACK"));
+ });
+ }
+
+ public override string PersistenceId { get; }
+ }
+ }
+}
diff --git a/src/Akka.Persistence.Sql.Hosting.Tests/HealthCheckSpec.cs b/src/Akka.Persistence.Sql.Hosting.Tests/HealthCheckSpec.cs
new file mode 100644
index 00000000..ad7da2fc
--- /dev/null
+++ b/src/Akka.Persistence.Sql.Hosting.Tests/HealthCheckSpec.cs
@@ -0,0 +1,112 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2013-2023 .NET Foundation
+//
+// -----------------------------------------------------------------------
+
+using System;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Akka.Hosting;
+using Akka.Hosting.HealthChecks;
+using Akka.Persistence.Journal;
+using Akka.Persistence.Sql.Tests.Common.Containers;
+using FluentAssertions;
+using FluentAssertions.Extensions;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Diagnostics.HealthChecks;
+using Microsoft.Extensions.Hosting;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Akka.Persistence.Sql.Hosting.Tests
+{
+ ///
+ /// Validates that health checks are properly registered after the refactoring.
+ ///
+ public class HealthCheckSpec : Akka.Hosting.TestKit.TestKit, IClassFixture
+ {
+ private readonly SqliteContainer _fixture;
+
+ public HealthCheckSpec(ITestOutputHelper output, SqliteContainer fixture)
+ : base(nameof(HealthCheckSpec), output)
+ {
+ _fixture = fixture;
+
+ if (!_fixture.InitializeDbAsync().Wait(10.Seconds()))
+ throw new Exception("Failed to clean up database in 10 seconds");
+ }
+
+ protected override void ConfigureServices(HostBuilderContext context, IServiceCollection services)
+ {
+ base.ConfigureServices(context, services);
+ services.AddHealthChecks();
+ }
+
+ protected override void ConfigureAkka(AkkaConfigurationBuilder builder, IServiceProvider provider)
+ {
+ // Use the refactored WithSqlPersistence with health check registration
+ builder.WithSqlPersistence(
+ connectionString: _fixture.ConnectionString,
+ providerName: _fixture.ProviderName,
+ journalBuilder: journal =>
+ {
+ journal.WithHealthCheck(HealthStatus.Degraded);
+ },
+ snapshotBuilder: snapshot =>
+ {
+ snapshot.WithHealthCheck(HealthStatus.Degraded);
+ });
+ }
+
+ [Fact]
+ public async Task Health_checks_should_be_registered_and_healthy()
+ {
+ // Arrange
+ var healthCheckService = Host.Services.GetRequiredService();
+
+ // Act - run all health checks
+ var healthReport = await healthCheckService.CheckHealthAsync(CancellationToken.None);
+
+ // Assert - verify that health checks are registered and healthy
+ healthReport.Entries.Should().NotBeEmpty("health checks should be registered");
+
+ // Debug: print all registered health checks (ALL of them, not just SQL)
+ Output?.WriteLine($"Total health checks registered: {healthReport.Entries.Count}");
+ foreach (var entry in healthReport.Entries)
+ {
+ Output?.WriteLine($" - {entry.Key}: {entry.Value.Status}");
+ }
+
+ // We should have exactly 2 health checks: journal and snapshot
+ // Look for any Akka.Persistence-related health checks
+ var persistenceHealthChecks = healthReport.Entries
+ .Where(e => e.Key.Contains("Akka.Persistence", StringComparison.OrdinalIgnoreCase))
+ .ToList();
+
+ persistenceHealthChecks.Should().HaveCount(2,
+ "because we registered health checks for both journal and snapshot store");
+
+ // Verify journal health check exists and is healthy
+ var journalHealthCheck = persistenceHealthChecks
+ .FirstOrDefault(e => e.Key.Contains("journal", StringComparison.OrdinalIgnoreCase));
+
+ journalHealthCheck.Should().NotBeNull("journal health check should be registered");
+ journalHealthCheck.Value.Status.Should().Be(HealthStatus.Healthy,
+ "SQL journal should be properly initialized");
+
+ // Verify snapshot health check exists and is healthy
+ var snapshotHealthCheck = persistenceHealthChecks
+ .FirstOrDefault(e => e.Key.Contains("snapshot", StringComparison.OrdinalIgnoreCase));
+
+ snapshotHealthCheck.Should().NotBeNull("snapshot health check should be registered");
+ snapshotHealthCheck.Value.Status.Should().Be(HealthStatus.Healthy,
+ "SQL snapshot store should be properly initialized");
+
+ // Verify overall health status
+ healthReport.Status.Should().Be(HealthStatus.Healthy,
+ "because all health checks should pass");
+ }
+ }
+}
diff --git a/src/Akka.Persistence.Sql.Hosting/HostingExtensions.cs b/src/Akka.Persistence.Sql.Hosting/HostingExtensions.cs
index 95b0dc7e..d7d70311 100644
--- a/src/Akka.Persistence.Sql.Hosting/HostingExtensions.cs
+++ b/src/Akka.Persistence.Sql.Hosting/HostingExtensions.cs
@@ -17,7 +17,8 @@ namespace Akka.Persistence.Sql.Hosting
public static class HostingExtensions
{
///
- /// Adds Akka.Persistence.Sql support to this .
+ /// Adds Akka.Persistence.Sql support to this with optional support
+ /// for health checks on both journal and snapshot store.
///
///
/// The builder instance being configured.
@@ -25,12 +26,6 @@ public static class HostingExtensions
///
/// Connection string used for database access.
///
- ///
- ///
- /// Should the SQL store table be initialized automatically.
- ///
- /// Default: true
- ///
///
/// A string constant defining the database type to connect to, valid values are defined inside
/// static class.
@@ -54,6 +49,18 @@ public static class HostingExtensions
///
/// Default: null
///
+ ///
+ ///
+ /// An used to configure an instance.
+ ///
+ /// Default: null
+ ///
+ ///
+ ///
+ /// Should the SQL store table be initialized automatically.
+ ///
+ /// Default: true
+ ///
///
///
/// The configuration identifier for the plugins
@@ -132,6 +139,18 @@ public static class HostingExtensions
/// Thrown when or is null
/// or whitespace
///
+ ///
+ ///
+ /// builder.WithSqlPersistence(
+ /// connectionString: "...",
+ /// providerName: ProviderName.SQLite,
+ /// journalBuilder: journal => journal
+ /// .AddEventAdapter<MyAdapter>("adapter", new[] { typeof(MyEvent) })
+ /// .WithHealthCheck(HealthStatus.Degraded),
+ /// snapshotBuilder: snapshot => snapshot
+ /// .WithHealthCheck(HealthStatus.Degraded));
+ ///
+ ///
public static AkkaConfigurationBuilder WithSqlPersistence(
this AkkaConfigurationBuilder builder,
string connectionString,
@@ -139,6 +158,7 @@ public static AkkaConfigurationBuilder WithSqlPersistence(
PersistenceMode mode = PersistenceMode.Both,
string? schemaName = null,
Action? journalBuilder = null,
+ Action? snapshotBuilder = null,
bool autoInitialize = true,
string pluginIdentifier = "sql",
bool isDefaultPlugin = true,
@@ -151,6 +171,9 @@ public static AkkaConfigurationBuilder WithSqlPersistence(
if (mode == PersistenceMode.SnapshotStore && journalBuilder is not null)
throw new Exception($"{nameof(journalBuilder)} can only be set when {nameof(mode)} is set to either {PersistenceMode.Both} or {PersistenceMode.Journal}");
+ if (mode == PersistenceMode.Journal && snapshotBuilder is not null)
+ throw new Exception($"{nameof(snapshotBuilder)} can only be set when {nameof(mode)} is set to either {PersistenceMode.Both} or {PersistenceMode.SnapshotStore}");
+
if (string.IsNullOrWhiteSpace(connectionString))
throw new ArgumentNullException(nameof(connectionString), $"{nameof(connectionString)} can not be null");
@@ -183,10 +206,6 @@ public static AkkaConfigurationBuilder WithSqlPersistence(
journalOpt.DatabaseOptions.JournalTable.UseWriterUuidColumn = useWriterUuidColumn;
}
- var adapters = new AkkaPersistenceJournalBuilder(journalOpt.Identifier, builder);
- journalBuilder?.Invoke(adapters);
- journalOpt.Adapters = adapters;
-
var snapshotOpt = new SqlSnapshotOptions(isDefaultPlugin, pluginIdentifier)
{
ConnectionString = connectionString,
@@ -205,9 +224,9 @@ public static AkkaConfigurationBuilder WithSqlPersistence(
return mode switch
{
- PersistenceMode.Journal => builder.WithSqlPersistence(journalOpt, null),
- PersistenceMode.SnapshotStore => builder.WithSqlPersistence(null, snapshotOpt),
- PersistenceMode.Both => builder.WithSqlPersistence(journalOpt, snapshotOpt),
+ PersistenceMode.Journal => builder.WithSqlPersistence(journalOpt, null, journalBuilder, snapshotBuilder),
+ PersistenceMode.SnapshotStore => builder.WithSqlPersistence(null, snapshotOpt, journalBuilder, snapshotBuilder),
+ PersistenceMode.Both => builder.WithSqlPersistence(journalOpt, snapshotOpt, journalBuilder, snapshotBuilder),
_ => throw new ArgumentOutOfRangeException(nameof(mode), mode, "Invalid PersistenceMode defined."),
};
}
@@ -361,7 +380,7 @@ public static AkkaConfigurationBuilder WithSqlPersistence(
if (dataOptions is null)
throw new ArgumentNullException(nameof(dataOptions), $"{nameof(dataOptions)} can not be null");
-
+
var journalOpt = new SqlJournalOptions(isDefaultPlugin, pluginIdentifier)
{
AutoInitialize = autoInitialize,
@@ -387,10 +406,6 @@ public static AkkaConfigurationBuilder WithSqlPersistence(
journalOpt.DatabaseOptions.JournalTable.UseWriterUuidColumn = useWriterUuidColumn;
}
- var adapters = new AkkaPersistenceJournalBuilder(journalOpt.Identifier, builder);
- journalBuilder?.Invoke(adapters);
- journalOpt.Adapters = adapters;
-
var snapshotOpt = new SqlSnapshotOptions(isDefaultPlugin, pluginIdentifier)
{
DataOptions = dataOptions,
@@ -408,13 +423,13 @@ public static AkkaConfigurationBuilder WithSqlPersistence(
return mode switch
{
- PersistenceMode.Journal => builder.WithSqlPersistence(journalOpt, null),
- PersistenceMode.SnapshotStore => builder.WithSqlPersistence(null, snapshotOpt),
- PersistenceMode.Both => builder.WithSqlPersistence(journalOpt, snapshotOpt),
+ PersistenceMode.Journal => builder.WithSqlPersistence(journalOpt, null, journalBuilder, null),
+ PersistenceMode.SnapshotStore => builder.WithSqlPersistence(null, snapshotOpt, journalBuilder, null),
+ PersistenceMode.Both => builder.WithSqlPersistence(journalOpt, snapshotOpt, journalBuilder, null),
_ => throw new ArgumentOutOfRangeException(nameof(mode), mode, "Invalid PersistenceMode defined."),
};
}
-
+
///
/// Adds Akka.Persistence.Sql support to this . At least one of the
/// configurator delegate needs to be populated else this method will throw an exception.
@@ -494,6 +509,18 @@ public static AkkaConfigurationBuilder WithSqlPersistence(
///
/// Default: null
///
+ ///
+ ///
+ /// An used to configure an instance for event adapters and health checks.
+ ///
+ /// Default: null
+ ///
+ ///
+ ///
+ /// An used to configure an instance for health checks.
+ ///
+ /// Default: null
+ ///
///
/// The same instance originally passed in.
///
@@ -503,8 +530,11 @@ public static AkkaConfigurationBuilder WithSqlPersistence(
public static AkkaConfigurationBuilder WithSqlPersistence(
this AkkaConfigurationBuilder builder,
SqlJournalOptions? journalOptions = null,
- SqlSnapshotOptions? snapshotOptions = null)
+ SqlSnapshotOptions? snapshotOptions = null,
+ Action? journalBuilder = null,
+ Action? snapshotBuilder = null)
{
+ // Register DataOptions if needed (SQL-specific setup)
if (journalOptions?.DataOptions is not null)
{
if(builder.Setups.FirstOrDefault(s => s is MultiDataOptionsSetup) is not MultiDataOptionsSetup setup)
@@ -525,30 +555,22 @@ public static AkkaConfigurationBuilder WithSqlPersistence(
}
setup.AddDataOptions(snapshotOptions.PluginId, snapshotOptions.DataOptions);
}
-
+
+ // Use unified API from Akka.Persistence.Hosting
return (journalOptions, snapshotOptions) switch
{
(null, null) =>
throw new ArgumentException($"{nameof(journalOptions)} and {nameof(snapshotOptions)} could not both be null"),
- (_, null) =>
- builder
- .AddHocon(journalOptions.ToConfig(), HoconAddMode.Prepend)
- .AddHocon(journalOptions.DefaultConfig, HoconAddMode.Append)
- .AddHocon(journalOptions.DefaultQueryConfig, HoconAddMode.Append),
-
- (null, _) =>
- builder
- .AddHocon(snapshotOptions.ToConfig(), HoconAddMode.Prepend)
- .AddHocon(snapshotOptions.DefaultConfig, HoconAddMode.Append),
-
- (_, _) =>
- builder
- .AddHocon(journalOptions.ToConfig(), HoconAddMode.Prepend)
- .AddHocon(snapshotOptions.ToConfig(), HoconAddMode.Prepend)
- .AddHocon(journalOptions.DefaultConfig, HoconAddMode.Append)
- .AddHocon(snapshotOptions.DefaultConfig, HoconAddMode.Append)
- .AddHocon(journalOptions.DefaultQueryConfig, HoconAddMode.Append),
+ (_, null) => builder
+ .WithJournal(journalOptions, journalBuilder)
+ .AddHocon(journalOptions.DefaultQueryConfig, HoconAddMode.Append),
+
+ (null, _) => builder.WithSnapshot(snapshotOptions, snapshotBuilder),
+
+ (_, _) => builder
+ .WithJournalAndSnapshot(journalOptions, snapshotOptions, journalBuilder, snapshotBuilder)
+ .AddHocon(journalOptions.DefaultQueryConfig, HoconAddMode.Append),
};
}
}
diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props
index b6129de5..b3639631 100644
--- a/src/Directory.Packages.props
+++ b/src/Directory.Packages.props
@@ -2,7 +2,7 @@
true
1.5.51
- 1.5.51
+ 1.5.51.1
5.2.0
1.0.118
8.0.7