Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,15 @@ public static IServiceCollection AddSynapse(this IServiceCollection services, IC
services.AddSingleton<ITextDocumentRepository<string>, RedisTextDocumentRepository<string>>();
services.AddSingleton<ITextDocumentRepository>(provider => provider.GetRequiredService<ITextDocumentRepository<string>>());

services.AddSingleton<ISchemaHandlerProvider, SchemaHandlerProvider>();
services.AddSingleton<ISchemaHandler, JsonSchemaHandler>();

services.AddScoped<IResourceRepository, ResourceRepository>();
services.AddScoped<IAdmissionControl, AdmissionControl>();
services.AddScoped<IVersionControl, VersionControl>();
services.AddScoped<IResourceMutator, WorkflowInstanceMutator>();
services.AddScoped<IResourceValidator, WorkflowInstanceValidator>();
services.AddScoped<IResourceValidator, CorrelationValidator>();
services.AddSingleton<IPatchHandler, JsonMergePatchHandler>();
services.AddSingleton<IPatchHandler, JsonPatchHandler>();
services.AddSingleton<IPatchHandler, JsonStrategicMergePatchHandler>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright © 2024-Present The Synapse Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using Neuroglia.Data.Infrastructure.ResourceOriented.Services;
using Neuroglia.Data.Infrastructure.ResourceOriented;
using Synapse.Resources;
using Neuroglia.Data.Infrastructure.ResourceOriented.Properties;
using ServerlessWorkflow.Sdk;
using Neuroglia;

namespace Synapse.Core.Infrastructure.Services;

/// <summary>
/// Represents the service used to validate <see cref="Correlation"/>s
/// </summary>
/// <param name="resources">The service used to manage resources</param>
/// <param name="schemaHandlerProvider">The service used to provide <see cref="ISchemaHandler"/> implementations</param>
public class CorrelationValidator(IResourceRepository resources, ISchemaHandlerProvider schemaHandlerProvider)
: IResourceValidator
{

/// <summary>
/// Gets the service used to manage resources
/// </summary>
protected IResourceRepository Resources { get; } = resources;

/// <summary>
/// Gets the service used to provide <see cref="ISchemaHandler"/> implementations
/// </summary>
protected ISchemaHandlerProvider SchemaHandlerProvider { get; } = schemaHandlerProvider;

/// <inheritdoc/>
public virtual bool AppliesTo(Operation operation, string group, string version, string plural, string? @namespace = null) => operation == Operation.Create && group == Correlation.ResourceDefinition.Group && version == Correlation.ResourceDefinition.Version && plural == Correlation.ResourceDefinition.Plural;

/// <inheritdoc/>
public virtual async Task<AdmissionReviewResponse> ValidateAsync(AdmissionReviewRequest context, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(context);
var correlation = context.UpdatedState.ConvertTo<Correlation>()!;
switch (correlation.Spec.Outcome.Type)
{
case CorrelationOutcomeType.Start:
if (correlation.Spec.Outcome.Start == null) return new(context.Uid, false, null, new(ProblemTypes.AdmissionFailed, ProblemTitles.ValidationFailed, ErrorStatus.Validation, $"The '/spec/outcome/start' property must be set when the correlation outcome type has been set to '{CorrelationOutcomeType.Start}'", new("/spec/outcome/start", UriKind.Relative)));
var workflow = await this.Resources.GetAsync<Workflow>(correlation.Spec.Outcome.Start.Workflow.Name, correlation.Spec.Outcome.Start.Workflow.Namespace, cancellationToken).ConfigureAwait(false);
if (workflow == null) return new(context.Uid, false, null, new(ProblemTypes.AdmissionFailed, ProblemTitles.ValidationFailed, ErrorStatus.Validation, $"Failed to find the specified workflow '{correlation.Spec.Outcome.Start.Workflow.Name}.{correlation.Spec.Outcome.Start.Workflow.Namespace}'", new("/spec/outcome/start/workflow", UriKind.Relative)));
var workflowDefinition = workflow.Spec.Versions.Get(correlation.Spec.Outcome.Start.Workflow.Version);
if (workflowDefinition == null) return new(context.Uid, false, null, new(ProblemTypes.AdmissionFailed, ProblemTitles.ValidationFailed, ErrorStatus.Validation, $"Failed to find version '{correlation.Spec.Outcome.Start.Workflow.Version}' of workflow '{correlation.Spec.Outcome.Start.Workflow.Name}.{correlation.Spec.Outcome.Start.Workflow.Namespace}'", new("/spec/outcome/start/workflow/version", UriKind.Relative)));
if (workflowDefinition.Input?.Schema != null)
{
var schemaHandler = this.SchemaHandlerProvider.GetHandler(workflowDefinition.Input.Schema.Format) ?? throw new ArgumentNullException($"Failed to find an handler that supports the specified schema format '{workflowDefinition.Input.Schema.Format}'");
var validationResult = await schemaHandler.ValidateAsync(correlation.Spec.Outcome.Start.Input ?? new Dictionary<string, object>(), workflowDefinition.Input.Schema, cancellationToken).ConfigureAwait(false);
if (!validationResult.IsSuccess()) return new(context.Uid, false, null, new(ErrorType.Validation, ErrorTitle.Validation, ErrorStatus.Validation, $"Failed to validate the correlation outcome workflow input:\n{string.Join('\n', validationResult.Errors?.FirstOrDefault()?.Errors?.Select(e => $"- {e.Key}: {e.Value.First()}") ?? [])}", new("/spec/outcome/start/input", UriKind.Relative)));
}
break;
case CorrelationOutcomeType.Correlate:
if (correlation.Spec.Outcome.Correlate == null) return new(context.Uid, false, null, new(ProblemTypes.AdmissionFailed, ProblemTitles.ValidationFailed, ErrorStatus.Validation, $"The '/spec/outcome/correlate' property must be set when the correlation outcome type has been set to '{CorrelationOutcomeType.Correlate}'", new("/spec/outcome/correlate", UriKind.Relative)));
var components = correlation.Spec.Outcome.Correlate.Instance.Split('.', StringSplitOptions.RemoveEmptyEntries);
if (components.Length != 2) return new(context.Uid, false, null, new(ProblemTypes.AdmissionFailed, ProblemTitles.ValidationFailed, ErrorStatus.Validation, $"The specified value '{correlation.Spec.Outcome.Correlate.Instance}' is not a valid workflow instance qualified name ({{name}}.{{namespace}})", new("/spec/outcome/correlate/instance", UriKind.Relative)));
var name = components[0];
var @namespace = components[1];
var workflowInstance = await this.Resources.GetAsync<WorkflowInstance>(name, @namespace, cancellationToken).ConfigureAwait(false);
if (workflowInstance == null) return new(context.Uid, false, null, new(ProblemTypes.AdmissionFailed, ProblemTitles.ValidationFailed, ErrorStatus.Validation, $"Failed to find the specified workflow instance '{correlation.Spec.Outcome.Correlate.Instance}'", new("/spec/outcome/correlate/instance", UriKind.Relative)));
var task = workflowInstance.Status?.Tasks?.FirstOrDefault(t => t.Reference.OriginalString == correlation.Spec.Outcome.Correlate.Task);
if (task == null) return new(context.Uid, false, null, new(ProblemTypes.AdmissionFailed, ProblemTitles.ValidationFailed, ErrorStatus.Validation, $"Failed to find the task '{correlation.Spec.Outcome.Correlate.Task}' in workflow instance '{correlation.Spec.Outcome.Correlate.Instance}'", new("/spec/outcome/correlate/task", UriKind.Relative)));
break;
default:
return new(context.Uid, false, null, new(ProblemTypes.AdmissionFailed, ProblemTitles.ValidationFailed, ErrorStatus.Validation, $"The specified correlation outcome type '{correlation.Spec.Outcome.Type}' is not supported", new("/spec/outcome/type", UriKind.Relative)));
}
return new(context.Uid, true);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,4 @@ public virtual Task<AdmissionReviewResponse> MutateAsync(AdmissionReviewRequest
return Task.FromResult(new AdmissionReviewResponse(context.Uid, true, new(PatchType.JsonPatch, patch)));
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright © 2024-Present The Synapse Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using Neuroglia.Data.Infrastructure.ResourceOriented.Services;
using Neuroglia.Data.Infrastructure.ResourceOriented;
using Synapse.Resources;
using Neuroglia.Data.Infrastructure.ResourceOriented.Properties;
using ServerlessWorkflow.Sdk;
using Neuroglia;

namespace Synapse.Core.Infrastructure.Services;

/// <summary>
/// Represents the service used to validate <see cref="WorkflowInstance"/>s
/// </summary>
/// <param name="resources">The service used to manage resources</param>
/// <param name="schemaHandlerProvider">The service used to provide <see cref="ISchemaHandler"/> implementations</param>
public class WorkflowInstanceValidator(IResourceRepository resources, ISchemaHandlerProvider schemaHandlerProvider)
: IResourceValidator
{

/// <summary>
/// Gets the service used to manage resources
/// </summary>
protected IResourceRepository Resources { get; } = resources;

/// <summary>
/// Gets the service used to provide <see cref="ISchemaHandler"/> implementations
/// </summary>
protected ISchemaHandlerProvider SchemaHandlerProvider { get; } = schemaHandlerProvider;

/// <inheritdoc/>
public virtual bool AppliesTo(Operation operation, string group, string version, string plural, string? @namespace = null) => operation == Operation.Create && group == WorkflowInstance.ResourceDefinition.Group && version == WorkflowInstance.ResourceDefinition.Version && plural == WorkflowInstance.ResourceDefinition.Plural;

/// <inheritdoc/>
public virtual async Task<AdmissionReviewResponse> ValidateAsync(AdmissionReviewRequest context, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(context);
var workflowInstance = context.UpdatedState.ConvertTo<WorkflowInstance>()!;
var workflow = await this.Resources.GetAsync<Workflow>(workflowInstance.Spec.Definition.Name, workflowInstance.Spec.Definition.Namespace, cancellationToken).ConfigureAwait(false);
if (workflow == null) return new(context.Uid, false, null, new(ProblemTypes.AdmissionFailed, ProblemTitles.ValidationFailed, ErrorStatus.Validation, $"Failed to find the specified workflow '{workflowInstance.Spec.Definition.Name}.{workflowInstance.Spec.Definition.Namespace}'", new("/spec/definition", UriKind.Relative)));
var workflowDefinition = workflow.Spec.Versions.Get(workflowInstance.Spec.Definition.Version);
if (workflowDefinition == null) return new(context.Uid, false, null, new(ProblemTypes.AdmissionFailed, ProblemTitles.ValidationFailed, ErrorStatus.Validation, $"Failed to find version '{workflowInstance.Spec.Definition.Version}' of workflow '{workflowInstance.Spec.Definition.Name}.{workflowInstance.Spec.Definition.Namespace}'", new("/spec/definition/version", UriKind.Relative)));
if (workflowDefinition.Input?.Schema != null)
{
var schemaHandler = this.SchemaHandlerProvider.GetHandler(workflowDefinition.Input.Schema.Format) ?? throw new ArgumentNullException($"Failed to find an handler that supports the specified schema format '{workflowDefinition.Input.Schema.Format}'");
var validationResult = await schemaHandler.ValidateAsync(workflowInstance.Spec.Input ?? [], workflowDefinition.Input.Schema, cancellationToken).ConfigureAwait(false);
if (!validationResult.IsSuccess()) return new(context.Uid, false, null, new(ErrorType.Validation, ErrorTitle.Validation, ErrorStatus.Validation, $"Failed to validate the workflow instance's input:\n{string.Join('\n', validationResult.Errors?.FirstOrDefault()?.Errors?.Select(e => $"- {e.Key}: {e.Value.First()}") ?? [])}", new("/spec/input", UriKind.Relative)));
}
return new(context.Uid, true);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken
}
catch(Exception ex)
{
this.Logger.LogError("An error occured while executing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
this.Logger.LogError("An error occurred while executing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
var message = ex.Message;
try { if (this.Container?.StandardError != null) message = await this.Container.StandardError.ReadToEndAsync(cancellationToken).ConfigureAwait(false); } catch { }
var error = ex.ToError(this.Task.Instance.Reference);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ protected override async Task DoInitializeAsync(CancellationToken cancellationTo
}
catch (ErrorRaisedException ex)
{
this.Logger.LogError("An error occured while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
this.Logger.LogError("An error occurred while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
await this.SetErrorAsync(ex.Error, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
this.Logger.LogError("An error occured while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
this.Logger.LogError("An error occurred while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
await this.SetErrorAsync(new()
{
Status = ErrorStatus.Validation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ protected override async Task DoInitializeAsync(CancellationToken cancellationTo
}
catch(Exception ex)
{
this.Logger.LogError("An error occured while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
this.Logger.LogError("An error occurred while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
await this.SetErrorAsync(new()
{
Status = ErrorStatus.Validation,
Expand Down
8 changes: 4 additions & 4 deletions src/runner/Synapse.Runner/Services/TaskExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public virtual async Task InitializeAsync(CancellationToken cancellationToken =
}
catch(HttpRequestException ex)
{
this.Logger.LogError("An error occured while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
this.Logger.LogError("An error occurred while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
await this.SetErrorAsync(new Error()
{
Type = ErrorType.Communication,
Expand All @@ -135,7 +135,7 @@ await this.SetErrorAsync(new Error()
}
catch(Exception ex)
{
this.Logger.LogError("An error occured while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
this.Logger.LogError("An error occurred while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
await this.SetErrorAsync(new Error()
{
Type = ErrorType.Runtime,
Expand Down Expand Up @@ -200,7 +200,7 @@ await this.SetErrorAsync(new()
catch (OperationCanceledException) { }
catch (HttpRequestException ex)
{
this.Logger.LogError("An error occured while executing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
this.Logger.LogError("An error occurred while executing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
await this.SetErrorAsync(new Error()
{
Type = ErrorType.Communication,
Expand All @@ -212,7 +212,7 @@ await this.SetErrorAsync(new Error()
}
catch (Exception ex)
{
this.Logger.LogError("An error occured while executing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
this.Logger.LogError("An error occurred while executing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
await this.SetErrorAsync(new Error()
{
Type = ErrorType.Runtime,
Expand Down
Loading