Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,16 @@

namespace Microsoft.Agents.Builder
{
public enum StreamingResponseResult
{
Success,
NotStarted,
AlreadyEnded,
UserCancelled,
Timeout,
Error
};

public interface IStreamingResponse
{
/// <summary>
Expand Down Expand Up @@ -94,7 +104,7 @@ public interface IStreamingResponse
/// </remarks>
/// <returns>A Task representing the async operation</returns>
/// <exception cref="System.InvalidOperationException">Throws if the stream has already ended.</exception>
Task EndStreamAsync(CancellationToken cancellationToken = default);
Task<StreamingResponseResult> EndStreamAsync(CancellationToken cancellationToken = default);

bool IsStreamStarted();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ internal class StreamingResponse : IStreamingResponse
private bool _messageUpdated = false;
private bool _isTeamsChannel;
private bool _canceled;
private bool _userCanceled;

// Queue for outgoing activities
private readonly List<Func<IActivity>> _queue = [];
Expand Down Expand Up @@ -289,17 +290,16 @@ public void QueueTextChunk(string text)
/// Since the messages are sent on an interval, this call will block until all have been sent
/// before sending the final Message.
/// </remarks>
/// <returns>A Task representing the async operation</returns>
/// <exception cref="System.InvalidOperationException">Throws if the stream has already ended.</exception>
public async Task EndStreamAsync(CancellationToken cancellationToken = default)
/// <returns>StreamingResponseResult with the result of the streaming response.</returns>
public async Task<StreamingResponseResult> EndStreamAsync(CancellationToken cancellationToken = default)
{
if (!IsStreamingChannel)
{
lock (this)
{
if (_ended)
{
throw Core.Errors.ExceptionHelper.GenerateException<InvalidOperationException>(ErrorHelper.StreamingResponseEnded, null);
return StreamingResponseResult.AlreadyEnded;
}

_ended = true;
Expand All @@ -310,41 +310,57 @@ public async Task EndStreamAsync(CancellationToken cancellationToken = default)
{
await _context.SendActivityAsync(CreateFinalMessage(), cancellationToken).ConfigureAwait(false);
}

return StreamingResponseResult.Success;
}
else
{
lock (this)
{
if (_ended)
{
return;
return StreamingResponseResult.AlreadyEnded;
}

_ended = true;

if (!IsStreamStarted() || _canceled)
if (_canceled)
{
return _userCanceled ? StreamingResponseResult.UserCancelled : StreamingResponseResult.Error;
}

if (!IsStreamStarted())
{
return;
return StreamingResponseResult.NotStarted;
}
}

if (IsStreamStarted())
StreamingResponseResult result = StreamingResponseResult.Success;

// Wait for queue items to be sent per Interval
try
{
// Wait for queue items to be sent per Interval
try
{
_queueEmpty.WaitOne(EndStreamTimeout);
if (!_queueEmpty.WaitOne(EndStreamTimeout))
{
result = StreamingResponseResult.Timeout;
}
catch (AbandonedMutexException)
{
StopStream();

if (_canceled)
{
return _userCanceled ? StreamingResponseResult.UserCancelled : StreamingResponseResult.Error;
}
}
catch (AbandonedMutexException)
{
StopStream();
}

if (UpdatesSent() > 0 || FinalMessage != null)
{
await SendActivityAsync(CreateFinalMessage(), cancellationToken).ConfigureAwait(false);
}

return result;
}
}

Expand Down Expand Up @@ -417,6 +433,7 @@ public async Task ResetAsync(CancellationToken cancellationToken = default)
_nextSequence = 1;
StreamId = null;
_canceled = false;
_userCanceled = false;
}
}

Expand Down Expand Up @@ -612,26 +629,38 @@ private async Task SendActivityAsync(IActivity activity, CancellationToken cance
bool CanceledStream = true;
if (ex is ErrorResponseException errorResponse)
{
if (errorResponse.Body != null && !TeamsStreamCancelled.Equals(errorResponse.Body.Error.Code, StringComparison.OrdinalIgnoreCase))
{
_context?.Adapter?.Logger?.LogWarning(
"Exception during StreamingResponse: {ExceptionMessage} - {ErrorMessage}",
ex.Message,
errorResponse.Body.Error.Message);

System.Diagnostics.Trace.WriteLine($"Exception during StreamingResponse: {ex.Message} - {errorResponse.Body.Error.Message}");
// User canceled?
if (TeamsStreamCancelled.Equals(errorResponse?.Body?.Error?.Code, StringComparison.OrdinalIgnoreCase))
{
_context?.Adapter?.Logger?.LogWarning("User canceled stream on the client side.");
System.Diagnostics.Trace.WriteLine("User canceled stream on the client side.");

_userCanceled = true;
}
// Stream not allowed?
#pragma warning disable CA1862 // Use the 'StringComparison' method overloads to perform case-insensitive string comparisons - this is to support older .NET versions
if (errorResponse.Body != null &&
BadArgument.Equals(errorResponse.Body.Error.Code, StringComparison.OrdinalIgnoreCase) &&
errorResponse.Body.Error.Message.ToLower().Contains(TeamsStreamNotAllowed))
else if (BadArgument.Equals(errorResponse?.Body?.Error?.Code, StringComparison.OrdinalIgnoreCase) &&
(bool)errorResponse?.Body?.Error?.Message.ToLower().Contains(TeamsStreamNotAllowed))
{
_context?.Adapter?.Logger?.LogWarning("Interaction Context does not support StreamingResponse, StreamingResponse has been disabled for this turn");
System.Diagnostics.Trace.WriteLine("Interaction Context does not support StreamingResponse, StreamingResponse has been disabled for this turn");

IsStreamingChannel = false; // Disabled Streaming for this channel / interaction as teams will not accept it at this time.
CanceledStream = false;
CanceledStream = false;
}
#pragma warning restore CA1862 // Use the 'StringComparison' method overloads to perform case-insensitive string comparisons

else
{
var errorMessage = errorResponse?.Body?.Error?.Message ?? "None";

_context?.Adapter?.Logger?.LogWarning(
"Exception during StreamingResponse: {ExceptionMessage} - {ErrorMessage}",
ex.Message,
errorMessage);

System.Diagnostics.Trace.WriteLine($"Exception during StreamingResponse: {ex.Message} - {errorMessage}");
}

if (CanceledStream)
{
_context?.Adapter?.Logger?.LogWarning("User canceled stream on the client side.");
Expand All @@ -643,6 +672,7 @@ private async Task SendActivityAsync(IActivity activity, CancellationToken cance
{
StopStream();
_canceled = CanceledStream;
_queueEmpty.Set();
}
}
}
Expand Down
Loading
Loading