diff --git a/src/Plugins/BotSharp.Plugin.ChatHub/ChatStreamMiddleware.cs b/src/Plugins/BotSharp.Plugin.ChatHub/ChatStreamMiddleware.cs index 138d50844..0d8aa0b3e 100644 --- a/src/Plugins/BotSharp.Plugin.ChatHub/ChatStreamMiddleware.cs +++ b/src/Plugins/BotSharp.Plugin.ChatHub/ChatStreamMiddleware.cs @@ -56,8 +56,9 @@ private async Task HandleWebSocket(IServiceProvider services, string agentId, st _session = new BotSharpRealtimeSession(services, webSocket, new ChatSessionOptions { Provider = "BotSharp Chat Stream", - BufferSize = 1024 * 16, - JsonOptions = BotSharpOptions.defaultJsonOptions + BufferSize = 1024 * 32, + JsonOptions = BotSharpOptions.defaultJsonOptions, + Logger = _logger }); var hub = services.GetRequiredService(); @@ -79,10 +80,13 @@ private async Task HandleWebSocket(IServiceProvider services, string agentId, st continue; } - var (eventType, data) = MapEvents(conn, receivedText); + var (eventType, data) = MapEvents(conn, receivedText, conversationId); if (eventType == "start") { - var request = InitRequest(data); +#if DEBUG + _logger.LogCritical($"Start chat stream connection for conversation ({conversationId})"); +#endif + var request = InitRequest(data, conversationId); await ConnectToModel(hub, webSocket, request?.States); } else if (eventType == "media") @@ -94,6 +98,9 @@ private async Task HandleWebSocket(IServiceProvider services, string agentId, st } else if (eventType == "disconnect") { +#if DEBUG + _logger.LogCritical($"Disconnecting chat stream connection for conversation ({conversationId})"); +#endif await hub.Completer.Disconnect(); break; } @@ -115,11 +122,20 @@ await hub.ConnectToModel(responseToUser: async data => }, initStates: states); } - private (string, string) MapEvents(RealtimeHubConnection conn, string receivedText) + private (string, string) MapEvents(RealtimeHubConnection conn, string receivedText, string conversationId) { - var response = JsonSerializer.Deserialize(receivedText); - var data = response?.Body?.Payload ?? string.Empty; + ChatStreamEventResponse? response = new(); + try + { + response = JsonSerializer.Deserialize(receivedText); + } + catch (Exception ex) + { + _logger.LogError(ex, $"Error when deserializing chat stream event response for conversation ({conversationId}) (response: {receivedText?.SubstringMax(30)})"); + } + + var data = response?.Body?.Payload ?? string.Empty; switch (response.Event) { case "start": @@ -157,14 +173,15 @@ private void InitEvents(RealtimeHubConnection conn) }); } - private ChatStreamRequest? InitRequest(string data) + private ChatStreamRequest? InitRequest(string data, string conversationId) { try { return JsonSerializer.Deserialize(data, BotSharpOptions.defaultJsonOptions); } - catch + catch (Exception ex) { + _logger.LogError(ex, $"Error when deserializing initial request data for conversation ({conversationId})."); return null; } } diff --git a/src/Plugins/BotSharp.Plugin.Twilio/TwilioStreamMiddleware.cs b/src/Plugins/BotSharp.Plugin.Twilio/TwilioStreamMiddleware.cs index 3bcaf18b8..686233806 100644 --- a/src/Plugins/BotSharp.Plugin.Twilio/TwilioStreamMiddleware.cs +++ b/src/Plugins/BotSharp.Plugin.Twilio/TwilioStreamMiddleware.cs @@ -1,9 +1,12 @@ using BotSharp.Abstraction.Hooks; using BotSharp.Abstraction.MLTasks; +using BotSharp.Abstraction.Options; using BotSharp.Abstraction.Realtime; using BotSharp.Abstraction.Realtime.Models; +using BotSharp.Abstraction.Realtime.Models.Session; using BotSharp.Abstraction.Routing; using BotSharp.Abstraction.Utilities; +using BotSharp.Core.Session; using BotSharp.Plugin.Twilio.Interfaces; using BotSharp.Plugin.Twilio.Models.Stream; using Microsoft.AspNetCore.Http; @@ -20,8 +23,11 @@ public class TwilioStreamMiddleware { private readonly RequestDelegate _next; private readonly ILogger _logger; + private BotSharpRealtimeSession _session; - public TwilioStreamMiddleware(RequestDelegate next, ILogger logger) + public TwilioStreamMiddleware( + RequestDelegate next, + ILogger logger) { _next = next; _logger = logger; @@ -46,6 +52,7 @@ public async Task Invoke(HttpContext httpContext) } catch (Exception ex) { + _session?.Dispose(); _logger.LogError(ex, $"Error in WebSocket communication: {ex.Message} for conversation {conversationId}"); } return; @@ -57,7 +64,15 @@ public async Task Invoke(HttpContext httpContext) private async Task HandleWebSocket(IServiceProvider services, string agentId, string conversationId, WebSocket webSocket) { - var settings = services.GetRequiredService(); + _session?.Dispose(); + _session = new BotSharpRealtimeSession(services, webSocket, new ChatSessionOptions + { + Provider = "BotSharp Twilio Stream", + BufferSize = 1024 * 32, + JsonOptions = BotSharpOptions.defaultJsonOptions, + Logger = _logger + }); + var hub = services.GetRequiredService(); var conn = hub.SetHubConnection(conversationId); conn.CurrentAgentId = agentId; @@ -65,25 +80,21 @@ private async Task HandleWebSocket(IServiceProvider services, string agentId, st // load conversation and state var convService = services.GetRequiredService(); convService.SetConversationId(conversationId, []); + var hooks = services.GetHooks(agentId); foreach (var hook in hooks) { await hook.OnStreamingStarted(conn); } + convService.States.Save(); var routing = services.GetRequiredService(); routing.Context.Push(agentId); - var buffer = new byte[1024 * 32]; - WebSocketReceiveResult result; - - do + await foreach (ChatSessionUpdate update in _session.ReceiveUpdatesAsync(CancellationToken.None)) { - Array.Clear(buffer, 0, buffer.Length); - result = await webSocket.ReceiveAsync(new ArraySegment(buffer), CancellationToken.None); - string receivedText = Encoding.UTF8.GetString(buffer, 0, result.Count); - + var receivedText = update?.RawResponse; if (string.IsNullOrEmpty(receivedText)) { continue; @@ -128,10 +139,13 @@ private async Task HandleWebSocket(IServiceProvider services, string agentId, st #endif await hub.Completer.Disconnect(); await HandleUserDisconnected(); + break; } - } while (!result.CloseStatus.HasValue); + } - await webSocket.CloseAsync(result.CloseStatus.Value, result.CloseStatusDescription, CancellationToken.None); + convService.SaveStates(); + await _session.DisconnectAsync(); + _session.Dispose(); } private async Task ConnectToModel(IRealtimeHub hub, WebSocket webSocket)