-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Akka.Peristence.Query.InMemory: properly unwrap Tagged messages in all queries
#7548
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f2f0e7a
ed6786b
4148368
632fc9e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,32 @@ | ||
| // ----------------------------------------------------------------------- | ||
| // <copyright file="MemoryQueryJournalHelpers.cs" company="Akka.NET Project"> | ||
| // Copyright (C) 2009-2025 Lightbend Inc. <http://www.lightbend.com> | ||
| // Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net> | ||
| // </copyright> | ||
| // ----------------------------------------------------------------------- | ||
|
|
||
| using System.Linq; | ||
| using Akka.Persistence.Journal; | ||
|
|
||
| namespace Akka.Persistence.Query.InMemory; | ||
|
|
||
| /// <summary> | ||
| /// INTERNAL API | ||
| /// </summary> | ||
| internal static class MemoryQueryJournalHelpers | ||
| { | ||
| public static EventEnvelope PrepareEnventEnvelope(IPersistentRepresentation message, Offset? offsetHint = null) | ||
| { | ||
| // Bugfix for https://github.com/akkadotnet/akka.net/issues/7528 | ||
| var payload = message.Payload is Tagged t ? t.Payload : message.Payload; | ||
| var tags = message.Payload is Tagged tagged ? tagged.Tags.ToArray() : []; | ||
|
|
||
| return new EventEnvelope( | ||
| offset: offsetHint ?? new Sequence(message.SequenceNr), | ||
| persistenceId: message.PersistenceId, | ||
| sequenceNr: message.SequenceNr, | ||
| @event: payload, | ||
| timestamp: message.Timestamp, | ||
| tags: tags); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -70,7 +70,7 @@ public override Task ReplayMessagesAsync(IActorContext context, string persisten | |
| var highest = HighestSequenceNr(persistenceId); | ||
| if (highest != 0L && max != 0L) | ||
| Read(persistenceId, fromSequenceNr, Math.Min(toSequenceNr, highest), max).ForEach(recoveryCallback); | ||
| return Task.FromResult(new object()); | ||
| return Task.CompletedTask; | ||
| } | ||
|
|
||
| protected override Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr) | ||
|
|
@@ -81,7 +81,7 @@ protected override Task DeleteMessagesToAsync(string persistenceId, long toSeque | |
| _meta.AddOrUpdate(persistenceId, highestSeqNr, (_, _) => highestSeqNr); | ||
| for (var snr = 1L; snr <= toSeqNr; snr++) | ||
| Delete(persistenceId, snr); | ||
| return Task.FromResult(new object()); | ||
| return Task.CompletedTask; | ||
| } | ||
|
|
||
| protected override bool ReceivePluginInternal(object message) | ||
|
|
@@ -127,8 +127,7 @@ private Task<int> ReplayTaggedMessagesAsync(ReplayTaggedMessages replay) | |
| .Skip(replay.FromOffset) | ||
| .Take(replay.ToOffset)) | ||
| { | ||
| var payload = (Tagged)persistence.Payload; | ||
| replay.ReplyTo.Tell(new ReplayedTaggedMessage(persistence.WithPayload(payload.Payload), replay.Tag, replay.FromOffset + index), ActorRefs.NoSender); | ||
| replay.ReplyTo.Tell(new ReplayedTaggedMessage(persistence, replay.Tag, replay.FromOffset + index), ActorRefs.NoSender); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The reason why tagged message queries worked is we unpacked We preserve the |
||
| index++; | ||
| } | ||
|
|
||
|
|
@@ -243,14 +242,17 @@ public sealed class ReplayedTaggedMessage : INoSerializationVerificationNeeded, | |
|
|
||
| public readonly IPersistentRepresentation Persistent; | ||
|
|
||
| [Obsolete("If there are tags, they will be stored in the PersistentRepresentation")] | ||
| public readonly string Tag; | ||
|
|
||
| public readonly int Offset; | ||
|
|
||
| public ReplayedTaggedMessage(IPersistentRepresentation persistent, string tag, int offset) | ||
| { | ||
| Persistent = persistent; | ||
| #pragma warning disable CS0618 // Type or member is obsolete | ||
| Tag = tag; | ||
| #pragma warning restore CS0618 // Type or member is obsolete | ||
| Offset = offset; | ||
| } | ||
| } | ||
|
|
@@ -388,29 +390,17 @@ public override bool Equals(object obj) | |
| #endregion | ||
|
|
||
| #region IMemoryMessages implementation | ||
|
|
||
| /// <summary> | ||
| /// TBD | ||
| /// </summary> | ||
| /// <param name="persistent">TBD</param> | ||
| /// <returns>TBD</returns> | ||
|
|
||
| public Messages Add(IPersistentRepresentation persistent) | ||
| { | ||
| var list = Messages.GetOrAdd(persistent.PersistenceId, _ => new LinkedList<IPersistentRepresentation>()); | ||
| list.AddLast(persistent); | ||
| return Messages; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// TBD | ||
| /// </summary> | ||
| /// <param name="pid">TBD</param> | ||
| /// <param name="seqNr">TBD</param> | ||
| /// <param name="updater">TBD</param> | ||
| /// <returns>TBD</returns> | ||
|
|
||
| public Messages Update(string pid, long seqNr, Func<IPersistentRepresentation, IPersistentRepresentation> updater) | ||
| { | ||
| if (Messages.TryGetValue(pid, out LinkedList<IPersistentRepresentation> persistents)) | ||
| if (Messages.TryGetValue(pid, out var persistents)) | ||
| { | ||
| var node = persistents.First; | ||
| while (node != null) | ||
|
|
@@ -424,16 +414,10 @@ public Messages Update(string pid, long seqNr, Func<IPersistentRepresentation, I | |
|
|
||
| return Messages; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// TBD | ||
| /// </summary> | ||
| /// <param name="pid">TBD</param> | ||
| /// <param name="seqNr">TBD</param> | ||
| /// <returns>TBD</returns> | ||
|
|
||
| public Messages Delete(string pid, long seqNr) | ||
| { | ||
| if (Messages.TryGetValue(pid, out LinkedList<IPersistentRepresentation> persistents)) | ||
| if (Messages.TryGetValue(pid, out var persistents)) | ||
| { | ||
| var node = persistents.First; | ||
| while (node != null) | ||
|
|
@@ -447,35 +431,22 @@ public Messages Delete(string pid, long seqNr) | |
|
|
||
| return Messages; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// TBD | ||
| /// </summary> | ||
| /// <param name="pid">TBD</param> | ||
| /// <param name="fromSeqNr">TBD</param> | ||
| /// <param name="toSeqNr">TBD</param> | ||
| /// <param name="max">TBD</param> | ||
| /// <returns>TBD</returns> | ||
|
|
||
| public IEnumerable<IPersistentRepresentation> Read(string pid, long fromSeqNr, long toSeqNr, long max) | ||
| { | ||
| if (Messages.TryGetValue(pid, out LinkedList<IPersistentRepresentation> persistents)) | ||
| if (Messages.TryGetValue(pid, out var persistents)) | ||
| { | ||
| return persistents | ||
| .Where(x => x.SequenceNr >= fromSeqNr && x.SequenceNr <= toSeqNr) | ||
| .Take(max > int.MaxValue ? int.MaxValue : (int)max); | ||
| } | ||
|
|
||
| return Enumerable.Empty<IPersistentRepresentation>(); | ||
| return []; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// TBD | ||
| /// </summary> | ||
| /// <param name="pid">TBD</param> | ||
| /// <returns>TBD</returns> | ||
|
|
||
| public long HighestSequenceNr(string pid) | ||
| { | ||
| if (Messages.TryGetValue(pid, out LinkedList<IPersistentRepresentation> persistents)) | ||
| if (Messages.TryGetValue(pid, out var persistents)) | ||
| { | ||
| var last = persistents.LastOrDefault(); | ||
| return last?.SequenceNr ?? 0L; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a unified handler for inspecting
IPersistentRepresentations and then doing "the right thing" on the other end