Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using Akka.Event;
using Akka.Persistence.Journal;
using Akka.Streams.Actors;
using static Akka.Persistence.Query.InMemory.MemoryQueryJournalHelpers;

namespace Akka.Persistence.Query.InMemory
{
Expand Down Expand Up @@ -108,14 +109,9 @@ protected bool Replaying( object message)
if (replayed.Offset > ToOffset)
return true;

// NOTES: tags is empty because tags are not retrieved from the database query (as of this writing)
Buffer.Add(new EventEnvelope(
offset: new Sequence(replayed.Offset),
persistenceId: replayed.Persistent.PersistenceId,
sequenceNr: replayed.Persistent.SequenceNr,
@event: replayed.Persistent.Payload,
timestamp: replayed.Persistent.Timestamp,
tags: Array.Empty<string>()));
var e = PrepareEnventEnvelope(replayed.Persistent, new Sequence(replayed.Offset));

Buffer.Add(e);

CurrentOffset = replayed.Offset + 1;
Buffer.DeliverBuffer(TotalDemand);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@
//-----------------------------------------------------------------------

using System;
using System.Collections.Immutable;
using System.Linq;
using Akka.Actor;
using Akka.Event;
using Akka.Persistence.Journal;
using Akka.Streams.Actors;
using static Akka.Persistence.Query.InMemory.MemoryQueryJournalHelpers;

namespace Akka.Persistence.Query.InMemory
{
Expand Down Expand Up @@ -36,7 +40,7 @@ internal abstract class AbstractEventsByPersistenceIdPublisher : ActorPublisher<
{
private ILoggingAdapter _log;

protected DeliveryBuffer<EventEnvelope> Buffer;
protected readonly DeliveryBuffer<EventEnvelope> Buffer;
protected readonly IActorRef JournalRef;
protected long CurrentSequenceNr;

Expand Down Expand Up @@ -120,16 +124,11 @@ protected Receive Replaying(int limit)
switch (message)
{
case ReplayedMessage replayed:
var seqNr = replayed.Persistent.SequenceNr;
// NOTES: tags is empty because tags are not retrieved from the database query (as of this writing)
Buffer.Add(new EventEnvelope(
offset: new Sequence(seqNr),
persistenceId: PersistenceId,
sequenceNr: seqNr,
@event: replayed.Persistent.Payload,
timestamp: replayed.Persistent.Timestamp,
tags: Array.Empty<string>()));
CurrentSequenceNr = seqNr + 1;
var e = PrepareEnventEnvelope(replayed.Persistent);

Buffer.Add(e);

CurrentSequenceNr = e.SequenceNr + 1;
Buffer.DeliverBuffer(TotalDemand);
return true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using Akka.Event;
using Akka.Persistence.Journal;
using Akka.Streams.Actors;
using static Akka.Persistence.Query.InMemory.MemoryQueryJournalHelpers;

namespace Akka.Persistence.Query.InMemory
{
Expand Down Expand Up @@ -115,14 +116,9 @@ protected Receive Replaying(int limit)
switch (message)
{
case MemoryJournal.ReplayedTaggedMessage replayed:
Buffer.Add(new EventEnvelope(
offset: new Sequence(replayed.Offset),
persistenceId: replayed.Persistent.PersistenceId,
sequenceNr: replayed.Persistent.SequenceNr,
@event: replayed.Persistent.Payload,
timestamp: replayed.Persistent.Timestamp,
tags: new [] { replayed.Tag }));

var e = PrepareEnventEnvelope(replayed.Persistent, new Sequence(replayed.Offset));

Buffer.Add(e);
CurrentOffset = replayed.Offset + 1;
Buffer.DeliverBuffer(TotalDemand);
return true;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// -----------------------------------------------------------------------
// <copyright file="MemoryQueryJournalHelpers.cs" company="Akka.NET Project">
// Copyright (C) 2009-2025 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System.Linq;
using Akka.Persistence.Journal;

namespace Akka.Persistence.Query.InMemory;

/// <summary>
/// INTERNAL API
/// </summary>
internal static class MemoryQueryJournalHelpers
{
public static EventEnvelope PrepareEnventEnvelope(IPersistentRepresentation message, Offset? offsetHint = null)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a unified handler for inspecting IPersistentRepresentations and then doing "the right thing" on the other end

{
// Bugfix for https://github.com/akkadotnet/akka.net/issues/7528
var payload = message.Payload is Tagged t ? t.Payload : message.Payload;
var tags = message.Payload is Tagged tagged ? tagged.Tags.ToArray() : [];

return new EventEnvelope(
offset: offsetHint ?? new Sequence(message.SequenceNr),
persistenceId: message.PersistenceId,
sequenceNr: message.SequenceNr,
@event: payload,
timestamp: message.Timestamp,
tags: tags);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1052,6 +1052,7 @@ namespace Akka.Persistence.Journal
{
public readonly int Offset;
public readonly Akka.Persistence.IPersistentRepresentation Persistent;
[System.ObsoleteAttribute("If there are tags, they will be stored in the PersistentRepresentation")]
public readonly string Tag;
public ReplayedTaggedMessage(Akka.Persistence.IPersistentRepresentation persistent, string tag, int offset) { }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1052,6 +1052,7 @@ namespace Akka.Persistence.Journal
{
public readonly int Offset;
public readonly Akka.Persistence.IPersistentRepresentation Persistent;
[System.ObsoleteAttribute("If there are tags, they will be stored in the PersistentRepresentation")]
public readonly string Tag;
public ReplayedTaggedMessage(Akka.Persistence.IPersistentRepresentation persistent, string tag, int offset) { }
}
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Persistence.Query/EventEnvelope.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public EventEnvelope(Offset offset, string persistenceId, long sequenceNr, objec
SequenceNr = sequenceNr;
Event = @event;
Timestamp = timestamp;
Tags = tags ?? Array.Empty<string>();
Tags = tags ?? [];
}

public Offset Offset { get; }
Expand Down
61 changes: 16 additions & 45 deletions src/core/Akka.Persistence/Journal/MemoryJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public override Task ReplayMessagesAsync(IActorContext context, string persisten
var highest = HighestSequenceNr(persistenceId);
if (highest != 0L && max != 0L)
Read(persistenceId, fromSequenceNr, Math.Min(toSequenceNr, highest), max).ForEach(recoveryCallback);
return Task.FromResult(new object());
return Task.CompletedTask;
}

protected override Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr)
Expand All @@ -81,7 +81,7 @@ protected override Task DeleteMessagesToAsync(string persistenceId, long toSeque
_meta.AddOrUpdate(persistenceId, highestSeqNr, (_, _) => highestSeqNr);
for (var snr = 1L; snr <= toSeqNr; snr++)
Delete(persistenceId, snr);
return Task.FromResult(new object());
return Task.CompletedTask;
}

protected override bool ReceivePluginInternal(object message)
Expand Down Expand Up @@ -127,8 +127,7 @@ private Task<int> ReplayTaggedMessagesAsync(ReplayTaggedMessages replay)
.Skip(replay.FromOffset)
.Take(replay.ToOffset))
{
var payload = (Tagged)persistence.Payload;
replay.ReplyTo.Tell(new ReplayedTaggedMessage(persistence.WithPayload(payload.Payload), replay.Tag, replay.FromOffset + index), ActorRefs.NoSender);
replay.ReplyTo.Tell(new ReplayedTaggedMessage(persistence, replay.Tag, replay.FromOffset + index), ActorRefs.NoSender);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason why tagged message queries worked is we unpacked Tagged messages for those specifically here - we've moved that logic instead to all of the abstract persistence query implementations for the InMemory journal.

We preserve the Tagged bits in the messages here because this design also caused us to lose data about all of the tags applicable to this event, not just the ones we asked for.

index++;
}

Expand Down Expand Up @@ -243,14 +242,17 @@ public sealed class ReplayedTaggedMessage : INoSerializationVerificationNeeded,

public readonly IPersistentRepresentation Persistent;

[Obsolete("If there are tags, they will be stored in the PersistentRepresentation")]
public readonly string Tag;

public readonly int Offset;

public ReplayedTaggedMessage(IPersistentRepresentation persistent, string tag, int offset)
{
Persistent = persistent;
#pragma warning disable CS0618 // Type or member is obsolete
Tag = tag;
#pragma warning restore CS0618 // Type or member is obsolete
Offset = offset;
}
}
Expand Down Expand Up @@ -388,29 +390,17 @@ public override bool Equals(object obj)
#endregion

#region IMemoryMessages implementation

/// <summary>
/// TBD
/// </summary>
/// <param name="persistent">TBD</param>
/// <returns>TBD</returns>

public Messages Add(IPersistentRepresentation persistent)
{
var list = Messages.GetOrAdd(persistent.PersistenceId, _ => new LinkedList<IPersistentRepresentation>());
list.AddLast(persistent);
return Messages;
}

/// <summary>
/// TBD
/// </summary>
/// <param name="pid">TBD</param>
/// <param name="seqNr">TBD</param>
/// <param name="updater">TBD</param>
/// <returns>TBD</returns>

public Messages Update(string pid, long seqNr, Func<IPersistentRepresentation, IPersistentRepresentation> updater)
{
if (Messages.TryGetValue(pid, out LinkedList<IPersistentRepresentation> persistents))
if (Messages.TryGetValue(pid, out var persistents))
{
var node = persistents.First;
while (node != null)
Expand All @@ -424,16 +414,10 @@ public Messages Update(string pid, long seqNr, Func<IPersistentRepresentation, I

return Messages;
}

/// <summary>
/// TBD
/// </summary>
/// <param name="pid">TBD</param>
/// <param name="seqNr">TBD</param>
/// <returns>TBD</returns>

public Messages Delete(string pid, long seqNr)
{
if (Messages.TryGetValue(pid, out LinkedList<IPersistentRepresentation> persistents))
if (Messages.TryGetValue(pid, out var persistents))
{
var node = persistents.First;
while (node != null)
Expand All @@ -447,35 +431,22 @@ public Messages Delete(string pid, long seqNr)

return Messages;
}

/// <summary>
/// TBD
/// </summary>
/// <param name="pid">TBD</param>
/// <param name="fromSeqNr">TBD</param>
/// <param name="toSeqNr">TBD</param>
/// <param name="max">TBD</param>
/// <returns>TBD</returns>

public IEnumerable<IPersistentRepresentation> Read(string pid, long fromSeqNr, long toSeqNr, long max)
{
if (Messages.TryGetValue(pid, out LinkedList<IPersistentRepresentation> persistents))
if (Messages.TryGetValue(pid, out var persistents))
{
return persistents
.Where(x => x.SequenceNr >= fromSeqNr && x.SequenceNr <= toSeqNr)
.Take(max > int.MaxValue ? int.MaxValue : (int)max);
}

return Enumerable.Empty<IPersistentRepresentation>();
return [];
}

/// <summary>
/// TBD
/// </summary>
/// <param name="pid">TBD</param>
/// <returns>TBD</returns>

public long HighestSequenceNr(string pid)
{
if (Messages.TryGetValue(pid, out LinkedList<IPersistentRepresentation> persistents))
if (Messages.TryGetValue(pid, out var persistents))
{
var last = persistents.LastOrDefault();
return last?.SequenceNr ?? 0L;
Expand Down
Loading