Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
<Project>
<PropertyGroup>
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
<AkkaVersion>1.5.53</AkkaVersion>
<AkkaHostingVersion>1.5.53</AkkaHostingVersion>
<AkkaVersion>1.5.55</AkkaVersion>
<AkkaHostingVersion>1.5.55-beta1</AkkaHostingVersion>
</PropertyGroup>
<!-- Library dependencies -->
<ItemGroup>
Expand Down
59 changes: 57 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,11 @@ await host.RunAsync();

### Health Checks

The Hosting package includes built-in health check support for monitoring the health of your Redis persistence plugins:
The Hosting package includes built-in connectivity health check support for verifying Redis availability and accessibility. These liveness checks proactively verify that your Redis instance is accessible and responsive by performing PING commands against the configured Redis instance.

#### Enabling Connectivity Health Checks

Enable connectivity health checks by calling `WithHealthCheck()` on the journal and/or snapshot builder:

```csharp
builder
Expand All @@ -196,9 +200,60 @@ builder
ConfigurationString = "your-redis-connection-string",
},
journalBuilder: journal => journal.WithHealthCheck(HealthStatus.Degraded),
snapshotBuilder: snapshot.WithHealthCheck(HealthStatus.Degraded));
snapshotBuilder: snapshot => snapshot.WithHealthCheck(HealthStatus.Degraded));
```

When enabled, the connectivity health checks will:
- Verify connectivity to the Redis instance
- Test the Redis PING command to ensure responsiveness
- Report `Healthy` when the Redis instance is accessible
- Report `Degraded` or `Unhealthy` (configurable) when the instance is unreachable or unresponsive

Health checks are tagged with `akka`, `persistence`, and `redis` for easy filtering and organization in your health check endpoints.

For ASP.NET Core applications, you can expose these health checks via an endpoint:

```csharp
var builder = WebApplication.CreateBuilder(args);

// Add health checks service
builder.Services.AddHealthChecks();

builder.Services.AddAkka("redisDemo", (configBuilder, provider) =>
{
configBuilder
.WithRedisPersistence(
journalOptions: new RedisJournalOptions { ConfigurationString = "your-redis-connection-string" },
snapshotOptions: new RedisSnapshotOptions { ConfigurationString = "your-redis-connection-string" },
journalBuilder: journal => journal.WithHealthCheck(),
snapshotBuilder: snapshot => snapshot.WithHealthCheck());
});

var app = builder.Build();

// Map health check endpoint
app.MapHealthChecks("/healthz");

app.Run();
```

#### Customizing Health Check Tags

You can customize the tags applied to health checks by providing an `IEnumerable<string>` to the `WithHealthCheck()` method:

```csharp
journalBuilder: journal => journal.WithHealthCheck(
unHealthyStatus: HealthStatus.Degraded,
name: "redis-journal",
tags: new[] { "backend", "database", "redis" }),
snapshotBuilder: snapshot => snapshot.WithHealthCheck(
unHealthyStatus: HealthStatus.Degraded,
name: "redis-snapshot",
tags: new[] { "backend", "database", "redis" })
```

When tags are not specified, the default tags are used: `["akka", "persistence", "redis"]` for both journals and snapshot stores.

## Serialization
Akka Persistence provided serializers wrap the user payload in an envelope containing all persistence-relevant information. Redis Journal uses provided Protobuf serializers for the wrapper types (e.g. `IPersistentRepresentation`), then the payload will be serialized using the user configured serializer.

Expand Down
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was the README file packaging removed from this file?

Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>$(NetStandardVersion);$(NetVersion)</TargetFrameworks>
<PackageReadmeFile>README.md</PackageReadmeFile>
</PropertyGroup>

<PropertyGroup>
<TargetFrameworks>$(NetStandardVersion);$(NetVersion)</TargetFrameworks>
<PackageReadmeFile>README.md</PackageReadmeFile>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Akka.Persistence.Hosting" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Akka.Persistence.Hosting" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Akka.Persistence.Redis\Akka.Persistence.Redis.csproj" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Akka.Persistence.Redis\Akka.Persistence.Redis.csproj" />
</ItemGroup>

<ItemGroup>
<None Include="$(MSBuildThisFileDirectory)\README.md" Pack="true" PackagePath="\" />
</ItemGroup>
<ItemGroup>
<None Include="README.md" Pack="true" PackagePath="\" />
</ItemGroup>
</Project>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tags need to be customizable for all health checks

Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
using System;
using Akka.Hosting;
using Akka.Persistence.Hosting;
using Microsoft.Extensions.Diagnostics.HealthChecks;

#nullable enable
namespace Akka.Persistence.Redis.Hosting;

/// <summary>
/// Extension methods for Redis persistence connectivity checks
/// </summary>
public static class RedisConnectivityCheckExtensions
{
/// <summary>
/// Adds a connectivity check for the Redis journal.
/// This is a liveness check that proactively verifies database connectivity.
/// </summary>
/// <param name="builder">The journal builder</param>
/// <param name="journalOptions">The journal options containing connection details</param>
/// <param name="unHealthyStatus">The status to return when check fails. Defaults to Unhealthy.</param>
/// <param name="name">Optional name for the health check. Defaults to "Akka.Persistence.Redis.Journal.{id}.Connectivity"</param>
/// <param name="tags">Optional tags for the health check. Defaults to ["akka", "persistence", "redis", "journal", "connectivity"]</param>
/// <returns>The journal builder for chaining</returns>
public static AkkaPersistenceJournalBuilder WithConnectivityCheck(
this AkkaPersistenceJournalBuilder builder,
RedisJournalOptions journalOptions,
HealthStatus unHealthyStatus = HealthStatus.Unhealthy,
string? name = null,
string[]? tags = null)
{
if (journalOptions is null)
throw new ArgumentNullException(nameof(journalOptions));

if (string.IsNullOrWhiteSpace(journalOptions.ConfigurationString))
throw new ArgumentException("ConfigurationString must be set on RedisJournalOptions", nameof(journalOptions));

var registration = new AkkaHealthCheckRegistration(
name ?? $"Akka.Persistence.Redis.Journal.{journalOptions.Identifier}.Connectivity",
new RedisJournalConnectivityCheck(journalOptions.ConfigurationString, journalOptions.Identifier),
unHealthyStatus,
tags ?? new[] { "akka", "persistence", "redis", "journal", "connectivity" });

// Use the new WithCustomHealthCheck method from Akka.Hosting 1.5.55-beta1
return builder.WithCustomHealthCheck(registration);
}

/// <summary>
/// Adds a connectivity check for the Redis snapshot store.
/// This is a liveness check that proactively verifies database connectivity.
/// </summary>
/// <param name="builder">The snapshot builder</param>
/// <param name="snapshotOptions">The snapshot options containing connection details</param>
/// <param name="unHealthyStatus">The status to return when check fails. Defaults to Unhealthy.</param>
/// <param name="name">Optional name for the health check. Defaults to "Akka.Persistence.Redis.SnapshotStore.{id}.Connectivity"</param>
/// <param name="tags">Optional tags for the health check. Defaults to ["akka", "persistence", "redis", "snapshot-store", "connectivity"]</param>
/// <returns>The snapshot builder for chaining</returns>
public static AkkaPersistenceSnapshotBuilder WithConnectivityCheck(
this AkkaPersistenceSnapshotBuilder builder,
RedisSnapshotOptions snapshotOptions,
HealthStatus unHealthyStatus = HealthStatus.Unhealthy,
string? name = null,
string[]? tags = null)
{
if (snapshotOptions is null)
throw new ArgumentNullException(nameof(snapshotOptions));

if (string.IsNullOrWhiteSpace(snapshotOptions.ConfigurationString))
throw new ArgumentException("ConfigurationString must be set on RedisSnapshotOptions", nameof(snapshotOptions));

var registration = new AkkaHealthCheckRegistration(
name ?? $"Akka.Persistence.Redis.SnapshotStore.{snapshotOptions.Identifier}.Connectivity",
new RedisSnapshotStoreConnectivityCheck(snapshotOptions.ConfigurationString, snapshotOptions.Identifier),
unHealthyStatus,
tags ?? new[] { "akka", "persistence", "redis", "snapshot-store", "connectivity" });

// Use the new WithCustomHealthCheck method from Akka.Hosting 1.5.55-beta1
return builder.WithCustomHealthCheck(registration);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// -----------------------------------------------------------------------
// <copyright file="RedisJournalConnectivityCheck.cs" company="Akka.NET Project">
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Hosting;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using StackExchange.Redis;

#nullable enable
namespace Akka.Persistence.Redis.Hosting;

/// <summary>
/// Health check that verifies connectivity to the Redis instance used by the journal.
/// This is a liveness check that proactively verifies backend connectivity.
/// </summary>
public sealed class RedisJournalConnectivityCheck : IAkkaHealthCheck
{
private readonly string _connectionString;
private readonly string _journalId;

public RedisJournalConnectivityCheck(string connectionString, string journalId)
{
_connectionString = connectionString ?? throw new ArgumentNullException(nameof(connectionString));
_journalId = journalId ?? throw new ArgumentNullException(nameof(journalId));
}

public async Task<HealthCheckResult> CheckHealthAsync(AkkaHealthCheckContext context, CancellationToken cancellationToken = default)
{
try
{
using var connection = await ConnectionMultiplexer.ConnectAsync(_connectionString);
var server = connection.GetServer(connection.GetEndPoints().First());
await server.PingAsync();
return HealthCheckResult.Healthy($"Redis journal '{_journalId}' connection successful");
}
catch (OperationCanceledException)
{
return HealthCheckResult.Unhealthy($"Redis journal '{_journalId}' connectivity check timed out");
}
catch (Exception ex)
{
return HealthCheckResult.Unhealthy($"Redis journal '{_journalId}' connection failed", ex);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// -----------------------------------------------------------------------
// <copyright file="RedisSnapshotStoreConnectivityCheck.cs" company="Akka.NET Project">
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Hosting;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using StackExchange.Redis;

#nullable enable
namespace Akka.Persistence.Redis.Hosting;

/// <summary>
/// Health check that verifies connectivity to the Redis instance used by the snapshot store.
/// This is a liveness check that proactively verifies backend connectivity.
/// </summary>
public sealed class RedisSnapshotStoreConnectivityCheck : IAkkaHealthCheck
{
private readonly string _connectionString;
private readonly string _snapshotStoreId;

public RedisSnapshotStoreConnectivityCheck(string connectionString, string snapshotStoreId)
{
_connectionString = connectionString ?? throw new ArgumentNullException(nameof(connectionString));
_snapshotStoreId = snapshotStoreId ?? throw new ArgumentNullException(nameof(snapshotStoreId));
}

public async Task<HealthCheckResult> CheckHealthAsync(AkkaHealthCheckContext context, CancellationToken cancellationToken = default)
{
try
{
using var connection = await ConnectionMultiplexer.ConnectAsync(_connectionString);
var server = connection.GetServer(connection.GetEndPoints().First());
await server.PingAsync();
return HealthCheckResult.Healthy($"Redis snapshot store '{_snapshotStoreId}' connection successful");
}
catch (OperationCanceledException)
{
return HealthCheckResult.Unhealthy($"Redis snapshot store '{_snapshotStoreId}' connectivity check timed out");
}
catch (Exception ex)
{
return HealthCheckResult.Unhealthy($"Redis snapshot store '{_snapshotStoreId}' connection failed", ex);
}
}
}
Loading