-
Couldn't load subscription status.
- Fork 1.1k
[Query] Add non-generic ReadJournalFor API
#7679
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,6 +16,8 @@ namespace Akka.Persistence.Query | |
| { | ||
| public sealed class PersistenceQuery : IExtension | ||
| { | ||
| private static readonly Type ReadJournalType = typeof(IReadJournal); | ||
|
|
||
| private readonly ExtendedActorSystem _system; | ||
| private readonly ConcurrentDictionary<string, IReadJournal> _readJournalPluginExtensionIds = new(); | ||
| private ILoggingAdapter _log; | ||
|
|
@@ -34,18 +36,24 @@ public PersistenceQuery(ExtendedActorSystem system) | |
| } | ||
|
|
||
| public TJournal ReadJournalFor<TJournal>(string readJournalPluginId) where TJournal : IReadJournal | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Old API is left intact |
||
| => (TJournal) ReadJournalFor(typeof(TJournal), readJournalPluginId); | ||
|
|
||
| public IReadJournal ReadJournalFor(Type readJournalType, string readJournalPluginId) | ||
| { | ||
| if(!ReadJournalType.IsAssignableFrom(readJournalType)) | ||
| throw new ArgumentException("Must implement IReadJournal interface", nameof(readJournalType)); | ||
|
Comment on lines
+43
to
+44
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added type checking code to make sure that |
||
|
|
||
| if(_readJournalPluginExtensionIds.TryGetValue(readJournalPluginId, out var plugin)) | ||
| return (TJournal)plugin; | ||
| return plugin; | ||
|
|
||
| lock (_lock) | ||
| { | ||
| if (_readJournalPluginExtensionIds.TryGetValue(readJournalPluginId, out plugin)) | ||
| return (TJournal)plugin; | ||
| return plugin; | ||
|
|
||
| plugin = CreatePlugin(readJournalPluginId, GetDefaultConfig<TJournal>()).GetReadJournal(); | ||
| plugin = CreatePlugin(readJournalPluginId, GetDefaultConfig(readJournalType)).GetReadJournal(); | ||
| _readJournalPluginExtensionIds[readJournalPluginId] = plugin; | ||
| return (TJournal)plugin; | ||
| return plugin; | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -79,8 +87,11 @@ private IReadJournalProvider CreateType(Type pluginType, object[] parameters) | |
| } | ||
|
|
||
| public static Config GetDefaultConfig<TJournal>() | ||
| => GetDefaultConfig(typeof(TJournal)); | ||
|
|
||
| public static Config GetDefaultConfig(Type journalType) | ||
| { | ||
| var defaultConfigMethod = typeof(TJournal).GetMethod("DefaultConfiguration", BindingFlags.Public | BindingFlags.Static); | ||
| var defaultConfigMethod = journalType.GetMethod("DefaultConfiguration", BindingFlags.Public | BindingFlags.Static); | ||
| return defaultConfigMethod?.Invoke(null, null) as Config; | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Type cache of
IReadJournal