From 185bcaac675e27027278bbf10cad255b51a09a47 Mon Sep 17 00:00:00 2001 From: Charles d'Avernas Date: Wed, 11 Sep 2024 16:58:22 +0200 Subject: [PATCH] fix(Runner): Fixed the WorkflowExecutor and DoTaskExecutor to ignore the flow directive of skipped tasks Signed-off-by: Charles d'Avernas --- .../Extensions/WorkflowDefinitionExtensions.cs | 2 +- .../Services/Executors/DoTaskExecutor.cs | 5 ++--- .../Services/Executors/OpenApiCallExecutor.cs | 14 ++++++++------ .../Synapse.Runner/Services/WorkflowExecutor.cs | 2 +- 4 files changed, 12 insertions(+), 11 deletions(-) diff --git a/src/runner/Synapse.Runner/Extensions/WorkflowDefinitionExtensions.cs b/src/runner/Synapse.Runner/Extensions/WorkflowDefinitionExtensions.cs index 965315628..e296ba593 100644 --- a/src/runner/Synapse.Runner/Extensions/WorkflowDefinitionExtensions.cs +++ b/src/runner/Synapse.Runner/Extensions/WorkflowDefinitionExtensions.cs @@ -28,7 +28,7 @@ public static class WorkflowDefinitionExtensions public static MapEntry? GetTaskAfter(this WorkflowDefinition workflow, TaskInstance after) { ArgumentNullException.ThrowIfNull(after); - switch (after.Next) + switch (after.Status == TaskInstanceStatus.Skipped ? FlowDirective.Continue : after.Next) { case FlowDirective.Continue: var afterTask = workflow.Do[after.Name!]; diff --git a/src/runner/Synapse.Runner/Services/Executors/DoTaskExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/DoTaskExecutor.cs index 4cb0cf26f..c241514da 100644 --- a/src/runner/Synapse.Runner/Services/Executors/DoTaskExecutor.cs +++ b/src/runner/Synapse.Runner/Services/Executors/DoTaskExecutor.cs @@ -98,7 +98,6 @@ protected virtual async Task OnSubTaskFaultAsync(ITaskExecutor executor, Cancell /// A new awaitable protected virtual async Task OnSubtaskCompletedAsync(ITaskExecutor executor, CancellationToken cancellationToken) { - ArgumentNullException.ThrowIfNull(executor); var last = executor.Task.Instance; var output = executor.Task.Output!; var nextDefinition = this.Task.Definition.Do.GetTaskAfter(last); @@ -106,12 +105,12 @@ protected virtual async Task OnSubtaskCompletedAsync(ITaskExecutor executor, Can if (this.Task.ContextData != executor.Task.ContextData) await this.Task.SetContextDataAsync(executor.Task.ContextData, cancellationToken).ConfigureAwait(false); if (nextDefinition == null || nextDefinition.Value == null) { - await this.SetResultAsync(output, last.Next == FlowDirective.End ? FlowDirective.End : this.Task.Definition.Then, cancellationToken).ConfigureAwait(false); + await this.SetResultAsync(output, last.Status != TaskInstanceStatus.Skipped && last.Next == FlowDirective.End ? FlowDirective.End : this.Task.Definition.Then, cancellationToken).ConfigureAwait(false); return; } var nextDefinitionIndex = this.Task.Definition.Do.Keys.ToList().IndexOf(nextDefinition.Key); TaskInstance next; - switch (executor.Task.Instance.Next) + switch (executor.Task.Instance.Status == TaskInstanceStatus.Skipped ? FlowDirective.Continue : executor.Task.Instance.Next) { case FlowDirective.End: await this.SetResultAsync(output, FlowDirective.End, cancellationToken).ConfigureAwait(false); diff --git a/src/runner/Synapse.Runner/Services/Executors/OpenApiCallExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/OpenApiCallExecutor.cs index e734f5d06..ccc82c4ae 100644 --- a/src/runner/Synapse.Runner/Services/Executors/OpenApiCallExecutor.cs +++ b/src/runner/Synapse.Runner/Services/Executors/OpenApiCallExecutor.cs @@ -109,14 +109,16 @@ protected override async Task DoInitializeAsync(CancellationToken cancellationTo this.OpenApi = (OpenApiCallDefinition)this.JsonSerializer.Convert(this.Task.Definition.With, typeof(OpenApiCallDefinition))!; using var httpClient = this.HttpClientFactory.CreateClient(); await httpClient.ConfigureAuthenticationAsync(this.Task.Workflow.Definition, this.OpenApi.Document.Endpoint.Authentication, this.ServiceProvider, cancellationToken).ConfigureAwait(false); - var uri = StringFormatter.NamedFormat(this.OpenApi.Document.EndpointUri.OriginalString, this.Task.Input.ToDictionary()); - if (uri.IsRuntimeExpression()) uri = await this.Task.Workflow.Expressions.EvaluateAsync(uri, this.Task.Input, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false); - using var request = new HttpRequestMessage(HttpMethod.Get, uri); + var uriString = StringFormatter.NamedFormat(this.OpenApi.Document.EndpointUri.OriginalString, this.Task.Input.ToDictionary()); + if (uriString.IsRuntimeExpression()) uriString = await this.Task.Workflow.Expressions.EvaluateAsync(uriString, this.Task.Input, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false); + if (string.IsNullOrWhiteSpace(uriString)) throw new NullReferenceException("The OpenAPI endpoint URI cannot be null or whitespace"); + if (!Uri.TryCreate(uriString, UriKind.RelativeOrAbsolute, out var uri) || uri == null) throw new Exception($"Failed to parse the specified string '{uriString}' into a new URI"); + using var request = new HttpRequestMessage(HttpMethod.Get, uriString); using var response = await httpClient.SendAsync(request, cancellationToken).ConfigureAwait(false); if (!response.IsSuccessStatusCode) { var responseContent = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false); - this.Logger.LogInformation("Failed to retrieve the OpenAPI document at location '{uri}'. The remote server responded with a non-success status code '{statusCode}'.", this.OpenApi.Document.EndpointUri, response.StatusCode); + this.Logger.LogInformation("Failed to retrieve the OpenAPI document at location '{uri}'. The remote server responded with a non-success status code '{statusCode}'.", uri, response.StatusCode); this.Logger.LogDebug("Response content:\r\n{responseContent}", responseContent ?? "None"); response.EnsureSuccessStatusCode(); } @@ -125,11 +127,11 @@ protected override async Task DoInitializeAsync(CancellationToken cancellationTo var operation = this.Document.Paths .SelectMany(p => p.Value.Operations) .FirstOrDefault(o => o.Value.OperationId == this.OpenApi.OperationId); - if (operation.Value == null) throw new NullReferenceException($"Failed to find an operation with id '{this.OpenApi.OperationId}' in OpenAPI document at '{this.OpenApi.Document.EndpointUri}'"); + if (operation.Value == null) throw new NullReferenceException($"Failed to find an operation with id '{this.OpenApi.OperationId}' in OpenAPI document at '{uri}'"); this.HttpMethod = operation.Key.ToHttpMethod(); this.Operation = operation.Value; this.Servers = this.Document.Servers.Select(s => s.Url).ToList(); - if (this.Servers.Count == 0) this.Servers.Add(this.OpenApi.Document.EndpointUri.OriginalString.Replace(this.OpenApi.Document.EndpointUri.PathAndQuery, string.Empty)); + if (this.Servers.Count == 0) this.Servers.Add(uri.OriginalString.Replace(uri.PathAndQuery, string.Empty)); var path = this.Document.Paths.Single(p => p.Value.Operations.Any(o => o.Value.OperationId == operation.Value.OperationId)); this.Path = path.Key; await this.BuildParametersAsync(cancellationToken).ConfigureAwait(false); diff --git a/src/runner/Synapse.Runner/Services/WorkflowExecutor.cs b/src/runner/Synapse.Runner/Services/WorkflowExecutor.cs index 68fc36dc9..7e0ead1d6 100644 --- a/src/runner/Synapse.Runner/Services/WorkflowExecutor.cs +++ b/src/runner/Synapse.Runner/Services/WorkflowExecutor.cs @@ -265,7 +265,7 @@ protected virtual async Task OnTaskFaultedAsync(ITaskExecutor executor, Cancella /// A new awaitable protected virtual async Task OnTaskCompletedAsync(ITaskExecutor executor, CancellationToken cancellationToken) { - var nextDefinition = executor.Task.Instance.Next switch + var nextDefinition = (executor.Task.Instance.Status == TaskInstanceStatus.Skipped ? FlowDirective.Continue : executor.Task.Instance.Next) switch { FlowDirective.End or FlowDirective.Exit => null, _ => this.Workflow.Definition.GetTaskAfter(executor.Task.Instance)