diff --git a/src/core/Synapse.Core/WorkflowInstanceStatusPhase.cs b/src/core/Synapse.Core/WorkflowInstanceStatusPhase.cs index bee4172c6..7c9459c94 100644 --- a/src/core/Synapse.Core/WorkflowInstanceStatusPhase.cs +++ b/src/core/Synapse.Core/WorkflowInstanceStatusPhase.cs @@ -28,14 +28,14 @@ public static class WorkflowInstanceStatusPhase /// public const string Running = "running"; /// - /// Indicates that the workflow ran to completion - /// - public const string Completed = "completed"; - /// /// Indicates that the workflow's execution is waiting for user or event input /// public const string Waiting = "waiting"; /// + /// Indicates that the workflow ran to completion + /// + public const string Completed = "completed"; + /// /// Indicates that the workflow's execution has been cancelled /// public const string Cancelled = "cancelled"; diff --git a/src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs b/src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs index 8ef270d23..2a445a71a 100644 --- a/src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs +++ b/src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs @@ -367,7 +367,7 @@ protected virtual async Task CreateOrUpdateContextAsync(CorrelationContext conte Metadata = new() { Namespace = this.Correlation.Resource.Spec.Outcome.Start!.Workflow.Namespace, - Name = $"{this.Correlation.Resource.Spec.Outcome.Start!.Workflow.Namespace}-" + Name = $"{this.Correlation.Resource.Spec.Outcome.Start!.Workflow.Name}-" }, Spec = new() { diff --git a/src/runner/Synapse.Runner/Services/Executors/ContainerProcessExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/ContainerProcessExecutor.cs index f8ed71404..c25a00bb0 100644 --- a/src/runner/Synapse.Runner/Services/Executors/ContainerProcessExecutor.cs +++ b/src/runner/Synapse.Runner/Services/Executors/ContainerProcessExecutor.cs @@ -71,6 +71,7 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken } catch(Exception ex) { + this.Logger.LogError("An error occured while executing the task '{task}': {ex}", this.Task.Instance.Reference, ex); var message = ex.Message; try { if (this.Container?.StandardError != null) message = await this.Container.StandardError.ReadToEndAsync(cancellationToken).ConfigureAwait(false); } catch { } var error = ex.ToError(this.Task.Instance.Reference); diff --git a/src/runner/Synapse.Runner/Services/Executors/ExtensionTaskExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/ExtensionTaskExecutor.cs index 18b459ca1..98711b943 100644 --- a/src/runner/Synapse.Runner/Services/Executors/ExtensionTaskExecutor.cs +++ b/src/runner/Synapse.Runner/Services/Executors/ExtensionTaskExecutor.cs @@ -28,10 +28,6 @@ public class ExtensionTaskExecutor(IServiceProvider serviceProvider, ILogger - protected override Task DoExecuteAsync(CancellationToken cancellationToken) - { - this.GetType(); - throw new NotImplementedException(); - } + protected override Task DoExecuteAsync(CancellationToken cancellationToken) => throw new NotImplementedException(); } diff --git a/src/runner/Synapse.Runner/Services/Executors/GrpcCallExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/GrpcCallExecutor.cs index ff7ef9a90..30b533446 100644 --- a/src/runner/Synapse.Runner/Services/Executors/GrpcCallExecutor.cs +++ b/src/runner/Synapse.Runner/Services/Executors/GrpcCallExecutor.cs @@ -62,9 +62,14 @@ protected override async Task DoInitializeAsync(CancellationToken cancellationTo var callInvoker = new DefaultCallInvoker(channel); this.GrpcClient = DynamicGrpcClient.FromDescriptorProtos(callInvoker: callInvoker, [fileDescriptor]); } - catch (ErrorRaisedException ex) { await this.SetErrorAsync(ex.Error, cancellationToken).ConfigureAwait(false); } + catch (ErrorRaisedException ex) + { + this.Logger.LogError("An error occured while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex); + await this.SetErrorAsync(ex.Error, cancellationToken).ConfigureAwait(false); + } catch (Exception ex) { + this.Logger.LogError("An error occured while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex); await this.SetErrorAsync(new() { Status = ErrorStatus.Validation, diff --git a/src/runner/Synapse.Runner/Services/Executors/HttpCallExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/HttpCallExecutor.cs index 9c3426adb..524fc465f 100644 --- a/src/runner/Synapse.Runner/Services/Executors/HttpCallExecutor.cs +++ b/src/runner/Synapse.Runner/Services/Executors/HttpCallExecutor.cs @@ -60,6 +60,7 @@ protected override async Task DoInitializeAsync(CancellationToken cancellationTo } catch(Exception ex) { + this.Logger.LogError("An error occured while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex); await this.SetErrorAsync(new() { Status = ErrorStatus.Validation, diff --git a/src/runner/Synapse.Runner/Services/Executors/SwitchTaskExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/SwitchTaskExecutor.cs index b7ad07bc1..17f87fead 100644 --- a/src/runner/Synapse.Runner/Services/Executors/SwitchTaskExecutor.cs +++ b/src/runner/Synapse.Runner/Services/Executors/SwitchTaskExecutor.cs @@ -36,10 +36,7 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken var defaultCase = this.Task.Definition.Switch.FirstOrDefault(kvp => string.IsNullOrWhiteSpace(kvp.Value.When)); foreach (var @case in this.Task.Definition.Switch!.Where(c => !string.IsNullOrWhiteSpace(c.Value.When))) { - if (await this.Task.Workflow.Expressions.EvaluateConditionAsync(@case.Value.When!, this.Task.Input, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false)) - { - matches.Add(@case); - } + if (await this.Task.Workflow.Expressions.EvaluateConditionAsync(@case.Value.When!, this.Task.Input, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false)) matches.Add(@case); } if (matches.Count == 1) await this.SetResultAsync(this.Task.Input, matches.First().Value.Then, cancellationToken).ConfigureAwait(false); else if (matches.Count > 1) await this.SetErrorAsync(Error.Configuration(this.Task.Instance.Reference, $"At most one matching case is allowed, but cases {string.Join(", ", matches.Select(m => m.Key))} have been matched."), cancellationToken).ConfigureAwait(false); diff --git a/src/runner/Synapse.Runner/Services/Executors/WorkflowProcessExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/WorkflowProcessExecutor.cs index ac2cc8d15..2997d654e 100644 --- a/src/runner/Synapse.Runner/Services/Executors/WorkflowProcessExecutor.cs +++ b/src/runner/Synapse.Runner/Services/Executors/WorkflowProcessExecutor.cs @@ -11,6 +11,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +using Neuroglia; +using Neuroglia.Data.Expressions; +using Neuroglia.Data.Infrastructure.ResourceOriented; +using System.Security.Cryptography; +using System.Text; + namespace Synapse.Runner.Services.Executors; /// @@ -23,19 +29,102 @@ namespace Synapse.Runner.Services.Executors; /// The current /// The service used to provide implementations /// The service used to serialize/deserialize objects to/from JSON -public class WorkflowProcessExecutor(IServiceProvider serviceProvider, ILogger logger, ITaskExecutionContextFactory executionContextFactory, ITaskExecutorFactory executorFactory, ITaskExecutionContext context, ISchemaHandlerProvider schemaHandlerProvider, IJsonSerializer serializer) +/// The service used to interact with the Synapse API +public class WorkflowProcessExecutor(IServiceProvider serviceProvider, ILogger logger, ITaskExecutionContextFactory executionContextFactory, ITaskExecutorFactory executorFactory, + ITaskExecutionContext context, ISchemaHandlerProvider schemaHandlerProvider, IJsonSerializer serializer, ISynapseApiClient api) : TaskExecutor(serviceProvider, logger, executionContextFactory, executorFactory, context, schemaHandlerProvider, serializer) { + /// + /// Gets the service used to interact with the Synapse API + /// + protected ISynapseApiClient Api { get; } = api; + /// /// Gets the definition of the shell process to run /// protected WorkflowProcessDefinition ProcessDefinition => this.Task.Definition.Run.Workflow!; /// - protected override Task DoExecuteAsync(CancellationToken cancellationToken) + protected override async Task DoExecuteAsync(CancellationToken cancellationToken) { - throw new NotImplementedException(); + var hash = Convert.ToHexString(MD5.HashData(Encoding.UTF8.GetBytes($"{Environment.GetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.Name)}{this.Task.Instance.Reference}"))).ToLowerInvariant(); + var workflowInstanceName = $"{this.ProcessDefinition.Name}-{hash}"; + var workflowInstanceNamespace = Environment.GetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.Namespace)!; + WorkflowInstance workflowInstance; + try + { + workflowInstance = await this.Api.WorkflowInstances.GetAsync(workflowInstanceName, workflowInstanceNamespace, cancellationToken).ConfigureAwait(false); + switch (workflowInstance.Status?.Phase) + { + case WorkflowInstanceStatusPhase.Cancelled: + await this.SetErrorAsync(new() + { + Type = ErrorType.Runtime, + Status = ErrorStatus.Runtime, + Title = ErrorTitle.Runtime, + Detail = $"The execution of workflow instance '{workflowInstance.GetQualifiedName()}' has been cancelled" + }, cancellationToken).ConfigureAwait(false); + return; + case WorkflowInstanceStatusPhase.Faulted: + await this.SetErrorAsync(workflowInstance.Status.Error!, cancellationToken).ConfigureAwait(false); + return; + case WorkflowInstanceStatusPhase.Completed: + var output = string.IsNullOrWhiteSpace(workflowInstance.Status?.OutputReference) ? null : (await this.Api.Documents.GetAsync(workflowInstance.Status.OutputReference, cancellationToken).ConfigureAwait(false)).Content; + await this.SetResultAsync(output, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false); + return; + } + } + catch + { + var workflow = await this.Api.Workflows.GetAsync(this.ProcessDefinition.Name, this.ProcessDefinition.Namespace, cancellationToken).ConfigureAwait(false); + var workflowDefinition = this.ProcessDefinition.Version == "latest" + ? workflow.Spec.Versions.Last() + : workflow.Spec.Versions.Get(this.ProcessDefinition.Version) ?? throw new NullReferenceException($"Failed to find version '{this.ProcessDefinition.Version}' of workflow '{workflow.GetQualifiedName()}'"); + var input = await this.Task.Workflow.Expressions.EvaluateAsync>(this.ProcessDefinition.Input ?? new(), this.Task.Input, this.GetExpressionEvaluationArguments(), cancellationToken: cancellationToken).ConfigureAwait(false); + workflowInstance = new WorkflowInstance() + { + Metadata = new() + { + Namespace = workflowInstanceNamespace, + Name = workflowInstanceName + }, + Spec = new() + { + Definition = new() + { + Namespace = this.ProcessDefinition.Namespace, + Name = this.ProcessDefinition.Name, + Version = this.ProcessDefinition.Version + }, + Input = input + } + }; + workflowInstance = await this.Api.WorkflowInstances.CreateAsync(workflowInstance, cancellationToken).ConfigureAwait(false); + } + var watchEvents = await this.Api.WorkflowInstances.MonitorAsync(workflowInstance.GetName(), workflowInstance.GetNamespace()!, cancellationToken).ConfigureAwait(false); + await foreach(var watchEvent in watchEvents) + { + switch (watchEvent.Resource.Status?.Phase) + { + case WorkflowInstanceStatusPhase.Cancelled: + await this.SetErrorAsync(new() + { + Type = ErrorType.Runtime, + Status = ErrorStatus.Runtime, + Title = ErrorTitle.Runtime, + Detail = $"The execution of workflow instance '{workflowInstance.GetQualifiedName()}' has been cancelled" + }, cancellationToken).ConfigureAwait(false); + break; + case WorkflowInstanceStatusPhase.Faulted: + await this.SetErrorAsync(workflowInstance.Status!.Error!, cancellationToken).ConfigureAwait(false); + return; + case WorkflowInstanceStatusPhase.Completed: + var output = string.IsNullOrWhiteSpace(watchEvent.Resource.Status?.OutputReference) ? null : (await this.Api.Documents.GetAsync(watchEvent.Resource.Status.OutputReference, cancellationToken).ConfigureAwait(false)).Content; + await this.SetResultAsync(output, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false); + return; + } + } } } \ No newline at end of file diff --git a/src/runner/Synapse.Runner/Services/TaskExecutor.cs b/src/runner/Synapse.Runner/Services/TaskExecutor.cs index eb0491a92..b90091b26 100644 --- a/src/runner/Synapse.Runner/Services/TaskExecutor.cs +++ b/src/runner/Synapse.Runner/Services/TaskExecutor.cs @@ -123,6 +123,7 @@ public virtual async Task InitializeAsync(CancellationToken cancellationToken = } catch(HttpRequestException ex) { + this.Logger.LogError("An error occured while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex); await this.SetErrorAsync(new Error() { Type = ErrorType.Communication, @@ -134,6 +135,7 @@ await this.SetErrorAsync(new Error() } catch(Exception ex) { + this.Logger.LogError("An error occured while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex); await this.SetErrorAsync(new Error() { Type = ErrorType.Runtime, @@ -198,6 +200,7 @@ await this.SetErrorAsync(new() catch (OperationCanceledException) { } catch (HttpRequestException ex) { + this.Logger.LogError("An error occured while executing the task '{task}': {ex}", this.Task.Instance.Reference, ex); await this.SetErrorAsync(new Error() { Type = ErrorType.Communication, @@ -209,6 +212,7 @@ await this.SetErrorAsync(new Error() } catch (Exception ex) { + this.Logger.LogError("An error occured while executing the task '{task}': {ex}", this.Task.Instance.Reference, ex); await this.SetErrorAsync(new Error() { Type = ErrorType.Runtime, diff --git a/src/runner/Synapse.Runner/Services/WorkflowExecutionContext.cs b/src/runner/Synapse.Runner/Services/WorkflowExecutionContext.cs index 227512ead..105169221 100644 --- a/src/runner/Synapse.Runner/Services/WorkflowExecutionContext.cs +++ b/src/runner/Synapse.Runner/Services/WorkflowExecutionContext.cs @@ -29,11 +29,11 @@ namespace Synapse.Runner.Services; /// The current /// The service used to evaluate runtime expressions /// The service used to serialize/deserialize objects to/from JSON -/// The service used to interact with the Synapse API +/// The service used to interact with the Synapse API /// The service used to access the current /// The of the to execute /// The to execute -public class WorkflowExecutionContext(IServiceProvider services, IExpressionEvaluator expressionEvaluator, IJsonSerializer jsonSerializer, ISynapseApiClient cloudFlowsApi, IOptions options, WorkflowDefinition definition, WorkflowInstance instance) +public class WorkflowExecutionContext(IServiceProvider services, IExpressionEvaluator expressionEvaluator, IJsonSerializer jsonSerializer, ISynapseApiClient api, IOptions options, WorkflowDefinition definition, WorkflowInstance instance) : IWorkflowExecutionContext { @@ -51,7 +51,7 @@ public class WorkflowExecutionContext(IServiceProvider services, IExpressionEval /// /// Gets the service used to interact with the Synapse API /// - protected ISynapseApiClient Api { get; } = cloudFlowsApi; + protected ISynapseApiClient Api { get; } = api; /// /// Gets the current