@Arkatufus Arkatufus commented Feb 5, 2025

Changes

  • Implement nullability in Akka.Streams.Implementation.Buffer
  • Propagate and implement nullability to stream classes that use Buffer:
    • ActorRefSourceActor
    • Buffer
    • SelectAsync
    • SelectAsyncUnordered
    • Delay
    • FlattenMerge
    • SourceRefStageImpl
    • QueueSink
    • QueueSource

@Arkatufus Arkatufus left a comment

Self-review

}
}

if (_terminating && _buffer.IsEmpty)
Contributor Author

This is an actual bug: _buffer can be null if _stage._maxBuffer is less than 1. I moved the check inside the previous if block.
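
A minimal sketch of the kind of fix described here, not the actual stage code: the class `BufferedLogicSketch`, its members, and the use of `Queue<int>` in place of the stream buffer are all assumptions made for illustration. It shows the termination check living inside the null guard, so a stage created with a buffer size below 1 never dereferences a null buffer.

```csharp
#nullable enable
using System.Collections.Generic;

// Hypothetical stand-in for a buffered stage's logic; the class and member
// names are illustrative and are not Akka.NET's actual implementation.
public sealed class BufferedLogicSketch
{
    private readonly Queue<int>? _buffer; // null when maxBuffer < 1
    private bool _terminating;

    public bool Completed { get; private set; }

    public BufferedLogicSketch(int maxBuffer)
    {
        _buffer = maxBuffer > 0 ? new Queue<int>(maxBuffer) : null;
    }

    // Does nothing when there is no buffer.
    public void Enqueue(int element) => _buffer?.Enqueue(element);

    public void Terminate() => _terminating = true;

    public void OnPull()
    {
        if (_buffer is not null)
        {
            if (_buffer.Count > 0)
                _buffer.Dequeue();

            // The check lives inside the null guard, so it can no longer
            // dereference a null buffer.
            if (_terminating && _buffer.Count == 0)
                Completed = true;
        }
    }
}
```

How completion is handled on the unbuffered path is outside the scope of this sketch.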

/// </summary>
/// <returns>TBD</returns>
- T Peek();
+ T? Peek();
Contributor Author

The original behavior is that Peek() can return null.

@Aaronontheweb Aaronontheweb added this to the 1.5.38 milestone Feb 5, 2025
@Aaronontheweb Aaronontheweb left a comment

Not done reviewing yet, but I left a comment.

else if (message is Status.Success)
{
- if (BufferSize == 0 || Buffer.IsEmpty)
+ if (Buffer is null || Buffer.IsEmpty)
Member

LGTM

BufferSize = bufferSize;
OverflowStrategy = overflowStrategy;
- Buffer = bufferSize != 0 ? Implementation.Buffer.Create<T>(bufferSize, maxFixedBufferSize) : null;
+ Buffer = bufferSize > 0 ? Implementation.Buffer.Create<T>(bufferSize, maxFixedBufferSize) : null;
Member

LGTM

Member

Although, should we throw an exception here if bufferSize is less than 0?

Contributor Author

I think we should; I was just trying to preserve the old behavior.
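
A rough sketch of what such a guard could look like, assuming a negative size should be rejected while zero still means "no buffer". The helper `BufferFactorySketch.CreateOrNull` is hypothetical, and `Queue<T>` stands in for the stream buffer type; this is not the code that was merged.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

public static class BufferFactorySketch
{
    // Hypothetical helper; Queue<T> stands in for the stream buffer type.
    public static Queue<T>? CreateOrNull<T>(int bufferSize)
    {
        // Reject negative sizes up front instead of silently treating them
        // as "no buffer".
        if (bufferSize < 0)
            throw new ArgumentOutOfRangeException(
                nameof(bufferSize), bufferSize, "Buffer size must not be negative.");

        // A size of zero keeps the existing "null buffer" behavior.
        return bufferSize > 0 ? new Queue<T>(bufferSize) : null;
    }
}
```

With this shape, callers keep the `Buffer is null` checks for the zero-size case, and a negative size fails at construction time.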

@Aaronontheweb
Member

Another thing I'm wondering - is there a compelling reason to have a null buffer at all?

@Arkatufus
Contributor Author

In the QueueSource case, maybe, though I don't know what the use case is or what the thinking behind it was; it's a 1-to-1 behavior port from Scala Akka.

@Arkatufus
Contributor Author

Though for QueueSource, there is a difference in Scala Akka: in their version, _pendingOffer is another buffer, so you have 2 buffers before the stage backpressures.

@Aaronontheweb Aaronontheweb modified the milestones: 1.5.38, 1.5.39 Feb 17, 2025
@Aaronontheweb Aaronontheweb modified the milestones: 1.5.39, 1.5.40 Mar 14, 2025
@Aaronontheweb
Member

After thinking on this, yep, there are definitely scenarios where a user might want a null result inside the buffer. You were right @Arkatufus

@Aaronontheweb Aaronontheweb enabled auto-merge (squash) March 18, 2025 13:45
This was referenced Oct 7, 2025