diff --git a/src/core/Synapse.Core.Infrastructure/Extensions/IServiceCollectionExtensions.cs b/src/core/Synapse.Core.Infrastructure/Extensions/IServiceCollectionExtensions.cs index db114345b..04b16cd75 100644 --- a/src/core/Synapse.Core.Infrastructure/Extensions/IServiceCollectionExtensions.cs +++ b/src/core/Synapse.Core.Infrastructure/Extensions/IServiceCollectionExtensions.cs @@ -61,10 +61,15 @@ public static IServiceCollection AddSynapse(this IServiceCollection services, IC services.AddSingleton, RedisTextDocumentRepository>(); services.AddSingleton(provider => provider.GetRequiredService>()); + services.AddSingleton(); + services.AddSingleton(); + services.AddScoped(); services.AddScoped(); services.AddScoped(); services.AddScoped(); + services.AddScoped(); + services.AddScoped(); services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); diff --git a/src/core/Synapse.Core.Infrastructure/Services/CorrelationValidator.cs b/src/core/Synapse.Core.Infrastructure/Services/CorrelationValidator.cs new file mode 100644 index 000000000..659c06340 --- /dev/null +++ b/src/core/Synapse.Core.Infrastructure/Services/CorrelationValidator.cs @@ -0,0 +1,82 @@ +// Copyright © 2024-Present The Synapse Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Neuroglia.Data.Infrastructure.ResourceOriented.Services; +using Neuroglia.Data.Infrastructure.ResourceOriented; +using Synapse.Resources; +using Neuroglia.Data.Infrastructure.ResourceOriented.Properties; +using ServerlessWorkflow.Sdk; +using Neuroglia; + +namespace Synapse.Core.Infrastructure.Services; + +/// +/// Represents the service used to validate s +/// +/// The service used to manage resources +/// The service used to provide implementations +public class CorrelationValidator(IResourceRepository resources, ISchemaHandlerProvider schemaHandlerProvider) + : IResourceValidator +{ + + /// + /// Gets the service used to manage resources + /// + protected IResourceRepository Resources { get; } = resources; + + /// + /// Gets the service used to provide implementations + /// + protected ISchemaHandlerProvider SchemaHandlerProvider { get; } = schemaHandlerProvider; + + /// + public virtual bool AppliesTo(Operation operation, string group, string version, string plural, string? @namespace = null) => operation == Operation.Create && group == Correlation.ResourceDefinition.Group && version == Correlation.ResourceDefinition.Version && plural == Correlation.ResourceDefinition.Plural; + + /// + public virtual async Task ValidateAsync(AdmissionReviewRequest context, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(context); + var correlation = context.UpdatedState.ConvertTo()!; + switch (correlation.Spec.Outcome.Type) + { + case CorrelationOutcomeType.Start: + if (correlation.Spec.Outcome.Start == null) return new(context.Uid, false, null, new(ProblemTypes.AdmissionFailed, ProblemTitles.ValidationFailed, ErrorStatus.Validation, $"The '/spec/outcome/start' property must be set when the correlation outcome type has been set to '{CorrelationOutcomeType.Start}'", new("/spec/outcome/start", UriKind.Relative))); + var workflow = await this.Resources.GetAsync(correlation.Spec.Outcome.Start.Workflow.Name, correlation.Spec.Outcome.Start.Workflow.Namespace, cancellationToken).ConfigureAwait(false); + if (workflow == null) return new(context.Uid, false, null, new(ProblemTypes.AdmissionFailed, ProblemTitles.ValidationFailed, ErrorStatus.Validation, $"Failed to find the specified workflow '{correlation.Spec.Outcome.Start.Workflow.Name}.{correlation.Spec.Outcome.Start.Workflow.Namespace}'", new("/spec/outcome/start/workflow", UriKind.Relative))); + var workflowDefinition = workflow.Spec.Versions.Get(correlation.Spec.Outcome.Start.Workflow.Version); + if (workflowDefinition == null) return new(context.Uid, false, null, new(ProblemTypes.AdmissionFailed, ProblemTitles.ValidationFailed, ErrorStatus.Validation, $"Failed to find version '{correlation.Spec.Outcome.Start.Workflow.Version}' of workflow '{correlation.Spec.Outcome.Start.Workflow.Name}.{correlation.Spec.Outcome.Start.Workflow.Namespace}'", new("/spec/outcome/start/workflow/version", UriKind.Relative))); + if (workflowDefinition.Input?.Schema != null) + { + var schemaHandler = this.SchemaHandlerProvider.GetHandler(workflowDefinition.Input.Schema.Format) ?? throw new ArgumentNullException($"Failed to find an handler that supports the specified schema format '{workflowDefinition.Input.Schema.Format}'"); + var validationResult = await schemaHandler.ValidateAsync(correlation.Spec.Outcome.Start.Input ?? new Dictionary(), workflowDefinition.Input.Schema, cancellationToken).ConfigureAwait(false); + if (!validationResult.IsSuccess()) return new(context.Uid, false, null, new(ErrorType.Validation, ErrorTitle.Validation, ErrorStatus.Validation, $"Failed to validate the correlation outcome workflow input:\n{string.Join('\n', validationResult.Errors?.FirstOrDefault()?.Errors?.Select(e => $"- {e.Key}: {e.Value.First()}") ?? [])}", new("/spec/outcome/start/input", UriKind.Relative))); + } + break; + case CorrelationOutcomeType.Correlate: + if (correlation.Spec.Outcome.Correlate == null) return new(context.Uid, false, null, new(ProblemTypes.AdmissionFailed, ProblemTitles.ValidationFailed, ErrorStatus.Validation, $"The '/spec/outcome/correlate' property must be set when the correlation outcome type has been set to '{CorrelationOutcomeType.Correlate}'", new("/spec/outcome/correlate", UriKind.Relative))); + var components = correlation.Spec.Outcome.Correlate.Instance.Split('.', StringSplitOptions.RemoveEmptyEntries); + if (components.Length != 2) return new(context.Uid, false, null, new(ProblemTypes.AdmissionFailed, ProblemTitles.ValidationFailed, ErrorStatus.Validation, $"The specified value '{correlation.Spec.Outcome.Correlate.Instance}' is not a valid workflow instance qualified name ({{name}}.{{namespace}})", new("/spec/outcome/correlate/instance", UriKind.Relative))); + var name = components[0]; + var @namespace = components[1]; + var workflowInstance = await this.Resources.GetAsync(name, @namespace, cancellationToken).ConfigureAwait(false); + if (workflowInstance == null) return new(context.Uid, false, null, new(ProblemTypes.AdmissionFailed, ProblemTitles.ValidationFailed, ErrorStatus.Validation, $"Failed to find the specified workflow instance '{correlation.Spec.Outcome.Correlate.Instance}'", new("/spec/outcome/correlate/instance", UriKind.Relative))); + var task = workflowInstance.Status?.Tasks?.FirstOrDefault(t => t.Reference.OriginalString == correlation.Spec.Outcome.Correlate.Task); + if (task == null) return new(context.Uid, false, null, new(ProblemTypes.AdmissionFailed, ProblemTitles.ValidationFailed, ErrorStatus.Validation, $"Failed to find the task '{correlation.Spec.Outcome.Correlate.Task}' in workflow instance '{correlation.Spec.Outcome.Correlate.Instance}'", new("/spec/outcome/correlate/task", UriKind.Relative))); + break; + default: + return new(context.Uid, false, null, new(ProblemTypes.AdmissionFailed, ProblemTitles.ValidationFailed, ErrorStatus.Validation, $"The specified correlation outcome type '{correlation.Spec.Outcome.Type}' is not supported", new("/spec/outcome/type", UriKind.Relative))); + } + return new(context.Uid, true); + } + +} \ No newline at end of file diff --git a/src/core/Synapse.Core.Infrastructure/Services/WorkflowInstanceMutator.cs b/src/core/Synapse.Core.Infrastructure/Services/WorkflowInstanceMutator.cs index 30c9b05f1..a190a624a 100644 --- a/src/core/Synapse.Core.Infrastructure/Services/WorkflowInstanceMutator.cs +++ b/src/core/Synapse.Core.Infrastructure/Services/WorkflowInstanceMutator.cs @@ -40,4 +40,4 @@ public virtual Task MutateAsync(AdmissionReviewRequest return Task.FromResult(new AdmissionReviewResponse(context.Uid, true, new(PatchType.JsonPatch, patch))); } -} \ No newline at end of file +} diff --git a/src/core/Synapse.Core.Infrastructure/Services/WorkflowInstanceValidator.cs b/src/core/Synapse.Core.Infrastructure/Services/WorkflowInstanceValidator.cs new file mode 100644 index 000000000..d809f813a --- /dev/null +++ b/src/core/Synapse.Core.Infrastructure/Services/WorkflowInstanceValidator.cs @@ -0,0 +1,63 @@ +// Copyright © 2024-Present The Synapse Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Neuroglia.Data.Infrastructure.ResourceOriented.Services; +using Neuroglia.Data.Infrastructure.ResourceOriented; +using Synapse.Resources; +using Neuroglia.Data.Infrastructure.ResourceOriented.Properties; +using ServerlessWorkflow.Sdk; +using Neuroglia; + +namespace Synapse.Core.Infrastructure.Services; + +/// +/// Represents the service used to validate s +/// +/// The service used to manage resources +/// The service used to provide implementations +public class WorkflowInstanceValidator(IResourceRepository resources, ISchemaHandlerProvider schemaHandlerProvider) + : IResourceValidator +{ + + /// + /// Gets the service used to manage resources + /// + protected IResourceRepository Resources { get; } = resources; + + /// + /// Gets the service used to provide implementations + /// + protected ISchemaHandlerProvider SchemaHandlerProvider { get; } = schemaHandlerProvider; + + /// + public virtual bool AppliesTo(Operation operation, string group, string version, string plural, string? @namespace = null) => operation == Operation.Create && group == WorkflowInstance.ResourceDefinition.Group && version == WorkflowInstance.ResourceDefinition.Version && plural == WorkflowInstance.ResourceDefinition.Plural; + + /// + public virtual async Task ValidateAsync(AdmissionReviewRequest context, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(context); + var workflowInstance = context.UpdatedState.ConvertTo()!; + var workflow = await this.Resources.GetAsync(workflowInstance.Spec.Definition.Name, workflowInstance.Spec.Definition.Namespace, cancellationToken).ConfigureAwait(false); + if (workflow == null) return new(context.Uid, false, null, new(ProblemTypes.AdmissionFailed, ProblemTitles.ValidationFailed, ErrorStatus.Validation, $"Failed to find the specified workflow '{workflowInstance.Spec.Definition.Name}.{workflowInstance.Spec.Definition.Namespace}'", new("/spec/definition", UriKind.Relative))); + var workflowDefinition = workflow.Spec.Versions.Get(workflowInstance.Spec.Definition.Version); + if (workflowDefinition == null) return new(context.Uid, false, null, new(ProblemTypes.AdmissionFailed, ProblemTitles.ValidationFailed, ErrorStatus.Validation, $"Failed to find version '{workflowInstance.Spec.Definition.Version}' of workflow '{workflowInstance.Spec.Definition.Name}.{workflowInstance.Spec.Definition.Namespace}'", new("/spec/definition/version", UriKind.Relative))); + if (workflowDefinition.Input?.Schema != null) + { + var schemaHandler = this.SchemaHandlerProvider.GetHandler(workflowDefinition.Input.Schema.Format) ?? throw new ArgumentNullException($"Failed to find an handler that supports the specified schema format '{workflowDefinition.Input.Schema.Format}'"); + var validationResult = await schemaHandler.ValidateAsync(workflowInstance.Spec.Input ?? [], workflowDefinition.Input.Schema, cancellationToken).ConfigureAwait(false); + if (!validationResult.IsSuccess()) return new(context.Uid, false, null, new(ErrorType.Validation, ErrorTitle.Validation, ErrorStatus.Validation, $"Failed to validate the workflow instance's input:\n{string.Join('\n', validationResult.Errors?.FirstOrDefault()?.Errors?.Select(e => $"- {e.Key}: {e.Value.First()}") ?? [])}", new("/spec/input", UriKind.Relative))); + } + return new(context.Uid, true); + } + +} diff --git a/src/runner/Synapse.Runner/Services/Executors/ContainerProcessExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/ContainerProcessExecutor.cs index c25a00bb0..908658ebd 100644 --- a/src/runner/Synapse.Runner/Services/Executors/ContainerProcessExecutor.cs +++ b/src/runner/Synapse.Runner/Services/Executors/ContainerProcessExecutor.cs @@ -71,7 +71,7 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken } catch(Exception ex) { - this.Logger.LogError("An error occured while executing the task '{task}': {ex}", this.Task.Instance.Reference, ex); + this.Logger.LogError("An error occurred while executing the task '{task}': {ex}", this.Task.Instance.Reference, ex); var message = ex.Message; try { if (this.Container?.StandardError != null) message = await this.Container.StandardError.ReadToEndAsync(cancellationToken).ConfigureAwait(false); } catch { } var error = ex.ToError(this.Task.Instance.Reference); diff --git a/src/runner/Synapse.Runner/Services/Executors/GrpcCallExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/GrpcCallExecutor.cs index 30b533446..cf3dc79b6 100644 --- a/src/runner/Synapse.Runner/Services/Executors/GrpcCallExecutor.cs +++ b/src/runner/Synapse.Runner/Services/Executors/GrpcCallExecutor.cs @@ -64,12 +64,12 @@ protected override async Task DoInitializeAsync(CancellationToken cancellationTo } catch (ErrorRaisedException ex) { - this.Logger.LogError("An error occured while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex); + this.Logger.LogError("An error occurred while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex); await this.SetErrorAsync(ex.Error, cancellationToken).ConfigureAwait(false); } catch (Exception ex) { - this.Logger.LogError("An error occured while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex); + this.Logger.LogError("An error occurred while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex); await this.SetErrorAsync(new() { Status = ErrorStatus.Validation, diff --git a/src/runner/Synapse.Runner/Services/Executors/HttpCallExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/HttpCallExecutor.cs index 524fc465f..9318f961b 100644 --- a/src/runner/Synapse.Runner/Services/Executors/HttpCallExecutor.cs +++ b/src/runner/Synapse.Runner/Services/Executors/HttpCallExecutor.cs @@ -60,7 +60,7 @@ protected override async Task DoInitializeAsync(CancellationToken cancellationTo } catch(Exception ex) { - this.Logger.LogError("An error occured while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex); + this.Logger.LogError("An error occurred while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex); await this.SetErrorAsync(new() { Status = ErrorStatus.Validation, diff --git a/src/runner/Synapse.Runner/Services/TaskExecutor.cs b/src/runner/Synapse.Runner/Services/TaskExecutor.cs index b90091b26..7189a1bbd 100644 --- a/src/runner/Synapse.Runner/Services/TaskExecutor.cs +++ b/src/runner/Synapse.Runner/Services/TaskExecutor.cs @@ -123,7 +123,7 @@ public virtual async Task InitializeAsync(CancellationToken cancellationToken = } catch(HttpRequestException ex) { - this.Logger.LogError("An error occured while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex); + this.Logger.LogError("An error occurred while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex); await this.SetErrorAsync(new Error() { Type = ErrorType.Communication, @@ -135,7 +135,7 @@ await this.SetErrorAsync(new Error() } catch(Exception ex) { - this.Logger.LogError("An error occured while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex); + this.Logger.LogError("An error occurred while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex); await this.SetErrorAsync(new Error() { Type = ErrorType.Runtime, @@ -200,7 +200,7 @@ await this.SetErrorAsync(new() catch (OperationCanceledException) { } catch (HttpRequestException ex) { - this.Logger.LogError("An error occured while executing the task '{task}': {ex}", this.Task.Instance.Reference, ex); + this.Logger.LogError("An error occurred while executing the task '{task}': {ex}", this.Task.Instance.Reference, ex); await this.SetErrorAsync(new Error() { Type = ErrorType.Communication, @@ -212,7 +212,7 @@ await this.SetErrorAsync(new Error() } catch (Exception ex) { - this.Logger.LogError("An error occured while executing the task '{task}': {ex}", this.Task.Instance.Reference, ex); + this.Logger.LogError("An error occurred while executing the task '{task}': {ex}", this.Task.Instance.Reference, ex); await this.SetErrorAsync(new Error() { Type = ErrorType.Runtime,