Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/core/Akka.Streams/Attributes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Text;
using Akka.Event;
Expand Down Expand Up @@ -325,8 +326,11 @@ public IEnumerable<TAttr> GetAttributeList<TAttr>() where TAttr : IAttribute
/// <typeparam name="TAttr">TBD</typeparam>
/// <param name="defaultIfNotFound">TBD</param>
/// <returns>TBD</returns>
public TAttr GetAttribute<TAttr>(TAttr defaultIfNotFound) where TAttr : class, IAttribute
#nullable enable
[return: NotNullIfNotNull("defaultIfNotFound")]
public TAttr? GetAttribute<TAttr>(TAttr? defaultIfNotFound) where TAttr : class, IAttribute
=> GetAttribute<TAttr>() ?? defaultIfNotFound;
#nullable restore

/// <summary>
/// Get the first (least specific) attribute of a given type or subtype thereof.
Expand Down
18 changes: 9 additions & 9 deletions src/core/Akka.Streams/Implementation/ActorRefSourceActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using Akka.Event;
using Akka.Streams.Actors;

#nullable enable
namespace Akka.Streams.Implementation
{
/// <summary>
Expand Down Expand Up @@ -40,7 +41,7 @@ public static Props Props(int bufferSize, OverflowStrategy overflowStrategy, Act
/// <summary>
/// TBD
/// </summary>
protected readonly IBuffer<T> Buffer;
protected readonly IBuffer<T>? Buffer;

/// <summary>
/// TBD
Expand All @@ -50,7 +51,6 @@ public static Props Props(int bufferSize, OverflowStrategy overflowStrategy, Act
/// TBD
/// </summary>
public readonly OverflowStrategy OverflowStrategy;
private ILoggingAdapter _log;

/// <summary>
/// TBD
Expand All @@ -63,13 +63,13 @@ public ActorRefSourceActor(int bufferSize, OverflowStrategy overflowStrategy, in
{
BufferSize = bufferSize;
OverflowStrategy = overflowStrategy;
Buffer = bufferSize != 0 ? Implementation.Buffer.Create<T>(bufferSize, maxFixedBufferSize) : null;
Buffer = bufferSize > 0 ? Implementation.Buffer.Create<T>(bufferSize, maxFixedBufferSize) : null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although, should we throw an exception here if the bufferSize is less than 0 ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should, I just tried to preserve the old behavior

}

/// <summary>
/// TBD
/// </summary>
protected ILoggingAdapter Log => _log ??= Context.GetLogger();
protected ILoggingAdapter Log { get; } = Context.GetLogger();

/// <summary>
/// TBD
Expand All @@ -90,7 +90,7 @@ protected bool DefaultReceive(object message)
Context.Stop(Self);
else if (message is Status.Success)
{
if (BufferSize == 0 || Buffer.IsEmpty)
if (Buffer is null || Buffer.IsEmpty)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

OnCompleteThenStop(); // will complete the stream successfully
else
Context.Become(DrainBufferThenComplete);
Expand All @@ -112,7 +112,7 @@ protected virtual bool RequestElement(object message)
if (message is Request)
{
// totalDemand is tracked by base
if (BufferSize != 0)
if (Buffer is not null)
while (TotalDemand > 0L && !Buffer.IsEmpty)
OnNext(Buffer.Dequeue());

Expand All @@ -133,7 +133,7 @@ protected virtual bool ReceiveElement(T message)
{
if (TotalDemand > 0L)
OnNext(message);
else if (BufferSize == 0)
else if (Buffer is null)
Log.Debug("Dropping element because there is no downstream demand: [{0}]", message);
else if (!Buffer.IsFull)
Buffer.Enqueue(message);
Expand Down Expand Up @@ -189,7 +189,7 @@ private bool DrainBufferThenComplete(object message)
// even if previously valid completion was requested via Status.Success
OnErrorThenStop(failure.Cause);
}
else if (message is Request)
else if (message is Request && Buffer is not null)
{
// totalDemand is tracked by base
while (TotalDemand > 0L && !Buffer.IsEmpty)
Expand All @@ -201,7 +201,7 @@ private bool DrainBufferThenComplete(object message)
else if (IsActive)
Log.Debug(
"Dropping element because Status.Success received already, only draining already buffered elements: [{0}] (pending: [{1}])",
message, Buffer.Used);
message, Buffer?.Used ?? 0);
else
return false;

Expand Down
32 changes: 24 additions & 8 deletions src/core/Akka.Streams/Implementation/Buffers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Runtime.CompilerServices;
using Akka.Annotations;

#nullable enable
namespace Akka.Streams.Implementation
{
/// <summary>
Expand Down Expand Up @@ -54,7 +55,7 @@ internal interface IBuffer<T>
/// TBD
/// </summary>
/// <returns>TBD</returns>
T Peek();
T? Peek();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Original behavior is that Peek() can return null

/// <summary>
/// TBD
/// </summary>
Expand Down Expand Up @@ -161,7 +162,7 @@ internal abstract class FixedSizeBuffer<T> : IBuffer<T>
/// </summary>
protected long WriteIndex;

private readonly T[] _buffer;
private readonly T?[] _buffer;

/// <summary>
/// TBD
Expand Down Expand Up @@ -222,14 +223,20 @@ public void Enqueue(T element)
/// <param name="element">TBD</param>
/// <param name="maintenance">TBD</param>
[MethodImpl(MethodImplOptions.NoInlining)]
public void Put(long index, T element, bool maintenance) => _buffer[ToOffset(index, maintenance)] = element;
public void Put(long index, T? element, bool maintenance) => _buffer[ToOffset(index, maintenance)] = element;

/// <summary>
/// TBD
/// </summary>
/// <param name="index">TBD</param>
/// <returns>TBD</returns>
public T Get(long index) => _buffer[ToOffset(index, false)];
public T Get(long index)
{
var elem = _buffer[ToOffset(index, false)];
if(elem == null)
throw new IndexOutOfRangeException($"Invalid buffer element at index {index}, element is null");
return elem;
}

/// <summary>
/// TBD
Expand Down Expand Up @@ -354,7 +361,7 @@ private sealed class FixedQueue : IBuffer<T>
private const int Size = 16;
private const int Mask = 15;

private readonly T[] _queue = new T[Size];
private readonly T?[] _queue = new T[Size];
private readonly BoundedBuffer<T> _boundedBuffer;
private int _head;
private int _tail;
Expand All @@ -373,6 +380,9 @@ public FixedQueue(BoundedBuffer<T> boundedBuffer)

public void Enqueue(T element)
{
if(element is null)
throw new ArgumentNullException(nameof(element));

if (_tail - _head == Size)
{
var queue = new DynamicQueue(Capacity);
Expand All @@ -392,12 +402,15 @@ public T Dequeue()
{
var pos = _head & Mask;
var ret = _queue[pos];
if(ret is null)
throw new IndexOutOfRangeException();

_queue[pos] = default(T);
_head += 1;
return ret;
}

public T Peek() => _tail == _head ? default(T) : _queue[_head & Mask];
public T? Peek() => _tail == _head ? default(T) : _queue[_head & Mask];

public void Clear()
{
Expand Down Expand Up @@ -431,12 +444,15 @@ public DynamicQueue(int capacity)

public T Dequeue()
{
if(First is null)
throw new IndexOutOfRangeException();

var result = First.Value;
RemoveFirst();
return result;
}

public T Peek() => First.Value;
public T? Peek() => First is null ? default : First.Value;

public void DropHead() => RemoveFirst();

Expand Down Expand Up @@ -498,7 +514,7 @@ public BoundedBuffer(int capacity)
/// TBD
/// </summary>
/// <returns>TBD</returns>
public T Peek() => _q.Peek();
public T? Peek() => _q.Peek();

/// <summary>
/// TBD
Expand Down
Loading
Loading