Skip to content

Commit 848cd4c

Browse files
authored
Add per-plugin recovery permitter actors (#7448)
* Add per-plugin recovery permitter actors * Add specs
1 parent ccae284 commit 848cd4c

File tree

7 files changed

+125
-37
lines changed

7 files changed

+125
-37
lines changed
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// -----------------------------------------------------------------------
2+
// <copyright file="MultipleRecoveryPermitterSpec.cs" company="Akka.NET Project">
3+
// Copyright (C) 2009-2025 Lightbend Inc. <http://www.lightbend.com>
4+
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net>
5+
// </copyright>
6+
// -----------------------------------------------------------------------
7+
8+
using System.Threading.Tasks;
9+
using Akka.Actor;
10+
using Akka.Configuration;
11+
using FluentAssertions;
12+
using Xunit;
13+
14+
namespace Akka.Persistence.Tests;
15+
16+
public class MultipleRecoveryPermitterSpec : PersistenceSpec
17+
{
18+
private readonly IActorRef _permitter1;
19+
private readonly IActorRef _permitter2;
20+
21+
public MultipleRecoveryPermitterSpec() : base(ConfigurationFactory.ParseString($$"""
22+
akka.persistence
23+
{
24+
# default global max recovery value
25+
max-concurrent-recoveries = 3
26+
27+
journal
28+
{
29+
plugin = "akka.persistence.journal.inmem"
30+
inmem2 {
31+
# max recovery value override
32+
max-concurrent-recoveries = 20
33+
class = "Akka.Persistence.Journal.MemoryJournal, Akka.Persistence"
34+
plugin-dispatcher = "akka.actor.default-dispatcher"
35+
}
36+
}
37+
38+
# snapshot store plugin is NOT defined, things should still work
39+
snapshot-store.plugin = "akka.persistence.no-snapshot-store"
40+
snapshot-store.local.dir = "target/snapshots-"{{typeof(RecoveryPermitterSpec).FullName}}"}
41+
"""))
42+
{
43+
var extension = Persistence.Instance.Apply(Sys);
44+
_permitter1 = extension.RecoveryPermitterFor(null);
45+
_permitter2 = extension.RecoveryPermitterFor("akka.persistence.journal.inmem2");
46+
}
47+
48+
[Fact(DisplayName = "Plugin max-concurrent-recoveries HOCON setting should override akka.persistence setting")]
49+
public async Task HoconOverrideTest()
50+
{
51+
_permitter1.Tell(GetMaxPermits.Instance);
52+
await ExpectMsgAsync(3);
53+
54+
_permitter2.Tell(GetMaxPermits.Instance);
55+
await ExpectMsgAsync(20);
56+
}
57+
58+
[Fact(DisplayName = "Each plugin should have their own recovery permitter")]
59+
public void MultiRecoveryPermitterActorTest()
60+
{
61+
_permitter1.Equals(_permitter2).Should().BeFalse();
62+
}
63+
}

src/core/Akka.Persistence.Tests/RecoveryPermitterSpec.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public RecoveryPermitterSpec() : base(ConfigurationFactory.ParseString(@"
7575
akka.persistence.snapshot-store.local.dir = ""target/snapshots-" + typeof(RecoveryPermitterSpec).FullName +
7676
@"/"""))
7777
{
78-
permitter = Persistence.Instance.Apply(Sys).RecoveryPermitter();
78+
permitter = Persistence.Instance.Apply(Sys).RecoveryPermitterFor(null);
7979
}
8080

8181
private void RequestPermit(TestProbe probe)

src/core/Akka.Persistence/Eventsourced.Lifecycle.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ private void StartRecovery(Recovery recovery)
2727

2828
private void RequestRecoveryPermit()
2929
{
30-
Extension.RecoveryPermitter().Tell(Akka.Persistence.RequestRecoveryPermit.Instance, Self);
30+
RecoveryPermitter.Tell(Akka.Persistence.RequestRecoveryPermit.Instance, Self);
3131
ChangeState(WaitingRecoveryPermit(Recovery));
3232
}
3333

@@ -46,6 +46,7 @@ public override void AroundPreStart()
4646
// Fail fast on missing plugins.
4747
var j = Journal;
4848
var s = SnapshotStore;
49+
var r = RecoveryPermitter;
4950
RequestRecoveryPermit();
5051
base.AroundPreStart();
5152
}

src/core/Akka.Persistence/Eventsourced.Recovery.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ private EventsourcedState Recovering(Receive recoveryBehavior, TimeSpan timeout)
270270
}
271271

272272
private void ReturnRecoveryPermit() =>
273-
Extension.RecoveryPermitter().Tell(Akka.Persistence.ReturnRecoveryPermit.Instance, Self);
273+
RecoveryPermitter.Tell(Akka.Persistence.ReturnRecoveryPermit.Instance, Self);
274274

275275
private void TransitToProcessingState()
276276
{

src/core/Akka.Persistence/Eventsourced.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ public abstract partial class Eventsourced : ActorBase, IPersistentIdentity, IPe
8181
private readonly IStash _internalStash;
8282
private IActorRef _snapshotStore;
8383
private IActorRef _journal;
84+
private IActorRef _recoveryPermitter;
8485
private List<IPersistentEnvelope> _journalBatch = new();
8586
private bool _isWriteInProgress;
8687
private long _sequenceNr;
@@ -166,6 +167,8 @@ public IStash Stash
166167
/// </summary>
167168
public IActorRef Journal => _journal ??= Extension.JournalFor(JournalPluginId);
168169

170+
internal IActorRef RecoveryPermitter => _recoveryPermitter ??= Extension.RecoveryPermitterFor(JournalPluginId);
171+
169172
/// <summary>
170173
/// TBD
171174
/// </summary>

src/core/Akka.Persistence/Persistence.cs

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
using System;
99
using System.Collections.Concurrent;
1010
using System.Linq;
11-
using System.Reflection;
1211
using System.Threading;
1312
using Akka.Actor;
1413
using Akka.Annotations;
@@ -21,18 +20,21 @@ namespace Akka.Persistence
2120
{
2221
internal struct PluginHolder
2322
{
24-
public PluginHolder(IActorRef @ref, EventAdapters adapters, Config config)
23+
public PluginHolder(IActorRef @ref, EventAdapters adapters, Config config, IActorRef recoveryPermitter)
2524
{
2625
Ref = @ref;
2726
Adapters = adapters;
2827
Config = config;
28+
RecoveryPermitter = recoveryPermitter;
2929
}
3030

3131
public IActorRef Ref { get; }
3232

3333
public EventAdapters Adapters { get; }
3434

3535
public Config Config { get; }
36+
37+
public IActorRef RecoveryPermitter { get; }
3638
}
3739

3840
/// <summary>
@@ -50,7 +52,6 @@ public class PersistenceExtension : IExtension
5052
private readonly Lazy<string> _defaultJournalPluginId;
5153
private readonly Lazy<string> _defaultSnapshotPluginId;
5254
private readonly Lazy<IStashOverflowStrategy> _defaultInternalStashOverflowStrategy;
53-
private readonly Lazy<IActorRef> _recoveryPermitter;
5455

5556
private readonly ConcurrentDictionary<string, Lazy<PluginHolder>> _pluginExtensionIds = new();
5657

@@ -120,12 +121,6 @@ public PersistenceExtension(ExtendedActorSystem system)
120121
_log.Info("Auto-starting snapshot store `{0}`", id);
121122
SnapshotStoreFor(id);
122123
});
123-
124-
_recoveryPermitter = new Lazy<IActorRef>(() =>
125-
{
126-
var maxPermits = _config.GetInt("max-concurrent-recoveries", 0);
127-
return _system.SystemActorOf(Akka.Persistence.RecoveryPermitter.Props(maxPermits), "recoveryPermitter");
128-
});
129124
}
130125

131126
/// <summary>
@@ -152,9 +147,10 @@ public string PersistenceId(IActorRef actor)
152147
/// INTERNAL API: When starting many persistent actors at the same time the journal its data store is protected
153148
/// from being overloaded by limiting number of recoveries that can be in progress at the same time.
154149
/// </summary>
155-
internal IActorRef RecoveryPermitter()
150+
internal IActorRef RecoveryPermitterFor(string journalPluginId)
156151
{
157-
return _recoveryPermitter.Value;
152+
var configPath = string.IsNullOrEmpty(journalPluginId) ? _defaultJournalPluginId.Value : journalPluginId;
153+
return PluginHolderFor(configPath, JournalFallbackConfigPath).RecoveryPermitter;
158154
}
159155

160156
/// <summary>
@@ -270,6 +266,17 @@ private PluginHolder PluginHolderFor(string configPath, string fallbackPath)
270266
return pluginContainer.Value;
271267
}
272268

269+
private static IActorRef CreateRecoveryPermitter(ExtendedActorSystem system, string configPath, Config pluginConfig)
270+
{
271+
// backward compatibility
272+
// get the setting from the plugin path, if not found, default to the one defined in "akka.persistence"
273+
var maxPermits = pluginConfig.HasPath("max-concurrent-recoveries")
274+
? pluginConfig.GetInt("max-concurrent-recoveries")
275+
: system.Settings.Config.GetInt("akka.persistence.max-concurrent-recoveries");
276+
277+
return system.SystemActorOf(RecoveryPermitter.Props(maxPermits), $"recoveryPermitter-{configPath}");
278+
}
279+
273280
private static IActorRef CreatePlugin(ExtendedActorSystem system, string configPath, Config pluginConfig)
274281
{
275282
var pluginActorName = configPath;
@@ -303,8 +310,9 @@ private static PluginHolder NewPluginHolder(ExtendedActorSystem system, string c
303310
var config = system.Settings.Config.GetConfig(configPath).WithFallback(system.Settings.Config.GetConfig(fallbackPath));
304311
var plugin = CreatePlugin(system, configPath, config);
305312
var adapters = CreateAdapters(system, configPath);
313+
var recoveryPermitter = CreateRecoveryPermitter(system, configPath, config);
306314

307-
return new PluginHolder(plugin, adapters, config);
315+
return new PluginHolder(plugin, adapters, config, recoveryPermitter);
308316
}
309317
}
310318

src/core/Akka.Persistence/RecoveryPermitter.cs

Lines changed: 35 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@ internal sealed class ReturnRecoveryPermit
3232
private ReturnRecoveryPermit() { }
3333
}
3434

35+
internal sealed class GetMaxPermits
36+
{
37+
public static GetMaxPermits Instance { get; } = new();
38+
private GetMaxPermits() { }
39+
}
40+
3541
/// <summary>
3642
/// When starting many persistent actors at the same time the journal its data store is protected
3743
/// from being overloaded by limiting number of recoveries that can be in progress at the same time.
@@ -55,29 +61,36 @@ public RecoveryPermitter(int maxPermits)
5561

5662
protected override void OnReceive(object message)
5763
{
58-
if (message is RequestRecoveryPermit)
59-
{
60-
Context.Watch(Sender);
61-
if (_usedPermits >= MaxPermits)
62-
{
63-
if (pending.Count == 0)
64-
Log.Debug("Exceeded max-concurrent-recoveries [{0}]. First pending {1}", MaxPermits, Sender);
65-
pending.AddLast(Sender);
66-
_maxPendingStats = Math.Max(_maxPendingStats, pending.Count);
67-
}
68-
else
69-
{
70-
RecoveryPermitGranted(Sender);
71-
}
72-
}
73-
else if (message is ReturnRecoveryPermit)
74-
{
75-
ReturnRecoveryPermit(Sender);
76-
}
77-
else if (message is Terminated terminated && !pending.Remove(terminated.ActorRef))
64+
switch (message)
7865
{
79-
// pre-mature termination should be rare
80-
ReturnRecoveryPermit(terminated.ActorRef);
66+
case RequestRecoveryPermit:
67+
Context.Watch(Sender);
68+
if (_usedPermits >= MaxPermits)
69+
{
70+
if (pending.Count == 0)
71+
Log.Debug("Exceeded max-concurrent-recoveries [{0}]. First pending {1}", MaxPermits, Sender);
72+
pending.AddLast(Sender);
73+
_maxPendingStats = Math.Max(_maxPendingStats, pending.Count);
74+
}
75+
else
76+
{
77+
RecoveryPermitGranted(Sender);
78+
}
79+
80+
break;
81+
82+
case Akka.Persistence.ReturnRecoveryPermit:
83+
ReturnRecoveryPermit(Sender);
84+
break;
85+
86+
case Terminated terminated when !pending.Remove(terminated.ActorRef):
87+
// pre-mature termination should be rare
88+
ReturnRecoveryPermit(terminated.ActorRef);
89+
break;
90+
91+
case GetMaxPermits:
92+
Sender.Tell(MaxPermits);
93+
break;
8194
}
8295
}
8396

0 commit comments

Comments
 (0)