Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/core/Synapse.Core/WorkflowInstanceStatusPhase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ public static class WorkflowInstanceStatusPhase
/// </summary>
public const string Running = "running";
/// <summary>
/// Indicates that the workflow ran to completion
/// </summary>
public const string Completed = "completed";
/// <summary>
/// Indicates that the workflow's execution is waiting for user or event input
/// </summary>
public const string Waiting = "waiting";
/// <summary>
/// Indicates that the workflow ran to completion
/// </summary>
public const string Completed = "completed";
/// <summary>
/// Indicates that the workflow's execution has been cancelled
/// </summary>
public const string Cancelled = "cancelled";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ protected virtual async Task CreateOrUpdateContextAsync(CorrelationContext conte
Metadata = new()
{
Namespace = this.Correlation.Resource.Spec.Outcome.Start!.Workflow.Namespace,
Name = $"{this.Correlation.Resource.Spec.Outcome.Start!.Workflow.Namespace}-"
Name = $"{this.Correlation.Resource.Spec.Outcome.Start!.Workflow.Name}-"
},
Spec = new()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken
}
catch(Exception ex)
{
this.Logger.LogError("An error occured while executing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
var message = ex.Message;
try { if (this.Container?.StandardError != null) message = await this.Container.StandardError.ReadToEndAsync(cancellationToken).ConfigureAwait(false); } catch { }
var error = ex.ToError(this.Task.Instance.Reference);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ public class ExtensionTaskExecutor(IServiceProvider serviceProvider, ILogger<Ext
{

/// <inheritdoc/>
protected override Task DoExecuteAsync(CancellationToken cancellationToken)
{
this.GetType();
throw new NotImplementedException();
}
protected override Task DoExecuteAsync(CancellationToken cancellationToken) => throw new NotImplementedException();

}
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,14 @@ protected override async Task DoInitializeAsync(CancellationToken cancellationTo
var callInvoker = new DefaultCallInvoker(channel);
this.GrpcClient = DynamicGrpcClient.FromDescriptorProtos(callInvoker: callInvoker, [fileDescriptor]);
}
catch (ErrorRaisedException ex) { await this.SetErrorAsync(ex.Error, cancellationToken).ConfigureAwait(false); }
catch (ErrorRaisedException ex)
{
this.Logger.LogError("An error occured while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
await this.SetErrorAsync(ex.Error, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
this.Logger.LogError("An error occured while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
await this.SetErrorAsync(new()
{
Status = ErrorStatus.Validation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ protected override async Task DoInitializeAsync(CancellationToken cancellationTo
}
catch(Exception ex)
{
this.Logger.LogError("An error occured while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
await this.SetErrorAsync(new()
{
Status = ErrorStatus.Validation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,7 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken
var defaultCase = this.Task.Definition.Switch.FirstOrDefault(kvp => string.IsNullOrWhiteSpace(kvp.Value.When));
foreach (var @case in this.Task.Definition.Switch!.Where(c => !string.IsNullOrWhiteSpace(c.Value.When)))
{
if (await this.Task.Workflow.Expressions.EvaluateConditionAsync(@case.Value.When!, this.Task.Input, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false))
{
matches.Add(@case);
}
if (await this.Task.Workflow.Expressions.EvaluateConditionAsync(@case.Value.When!, this.Task.Input, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false)) matches.Add(@case);
}
if (matches.Count == 1) await this.SetResultAsync(this.Task.Input, matches.First().Value.Then, cancellationToken).ConfigureAwait(false);
else if (matches.Count > 1) await this.SetErrorAsync(Error.Configuration(this.Task.Instance.Reference, $"At most one matching case is allowed, but cases {string.Join(", ", matches.Select(m => m.Key))} have been matched."), cancellationToken).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using Neuroglia;
using Neuroglia.Data.Expressions;
using Neuroglia.Data.Infrastructure.ResourceOriented;
using System.Security.Cryptography;
using System.Text;

namespace Synapse.Runner.Services.Executors;

/// <summary>
Expand All @@ -23,19 +29,102 @@ namespace Synapse.Runner.Services.Executors;
/// <param name="context">The current <see cref="ITaskExecutionContext"/></param>
/// <param name="schemaHandlerProvider">The service used to provide <see cref="ISchemaHandler"/> implementations</param>
/// <param name="serializer">The service used to serialize/deserialize objects to/from JSON</param>
public class WorkflowProcessExecutor(IServiceProvider serviceProvider, ILogger<WorkflowProcessExecutor> logger, ITaskExecutionContextFactory executionContextFactory, ITaskExecutorFactory executorFactory, ITaskExecutionContext<RunTaskDefinition> context, ISchemaHandlerProvider schemaHandlerProvider, IJsonSerializer serializer)
/// <param name="api">The service used to interact with the Synapse API</param>
public class WorkflowProcessExecutor(IServiceProvider serviceProvider, ILogger<WorkflowProcessExecutor> logger, ITaskExecutionContextFactory executionContextFactory, ITaskExecutorFactory executorFactory,
ITaskExecutionContext<RunTaskDefinition> context, ISchemaHandlerProvider schemaHandlerProvider, IJsonSerializer serializer, ISynapseApiClient api)
: TaskExecutor<RunTaskDefinition>(serviceProvider, logger, executionContextFactory, executorFactory, context, schemaHandlerProvider, serializer)
{

/// <summary>
/// Gets the service used to interact with the Synapse API
/// </summary>
protected ISynapseApiClient Api { get; } = api;

/// <summary>
/// Gets the definition of the shell process to run
/// </summary>
protected WorkflowProcessDefinition ProcessDefinition => this.Task.Definition.Run.Workflow!;

/// <inheritdoc/>
protected override Task DoExecuteAsync(CancellationToken cancellationToken)
protected override async Task DoExecuteAsync(CancellationToken cancellationToken)
{
throw new NotImplementedException();
var hash = Convert.ToHexString(MD5.HashData(Encoding.UTF8.GetBytes($"{Environment.GetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.Name)}{this.Task.Instance.Reference}"))).ToLowerInvariant();
var workflowInstanceName = $"{this.ProcessDefinition.Name}-{hash}";
var workflowInstanceNamespace = Environment.GetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.Namespace)!;
WorkflowInstance workflowInstance;
try
{
workflowInstance = await this.Api.WorkflowInstances.GetAsync(workflowInstanceName, workflowInstanceNamespace, cancellationToken).ConfigureAwait(false);
switch (workflowInstance.Status?.Phase)
{
case WorkflowInstanceStatusPhase.Cancelled:
await this.SetErrorAsync(new()
{
Type = ErrorType.Runtime,
Status = ErrorStatus.Runtime,
Title = ErrorTitle.Runtime,
Detail = $"The execution of workflow instance '{workflowInstance.GetQualifiedName()}' has been cancelled"
}, cancellationToken).ConfigureAwait(false);
return;
case WorkflowInstanceStatusPhase.Faulted:
await this.SetErrorAsync(workflowInstance.Status.Error!, cancellationToken).ConfigureAwait(false);
return;
case WorkflowInstanceStatusPhase.Completed:
var output = string.IsNullOrWhiteSpace(workflowInstance.Status?.OutputReference) ? null : (await this.Api.Documents.GetAsync(workflowInstance.Status.OutputReference, cancellationToken).ConfigureAwait(false)).Content;
await this.SetResultAsync(output, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
return;
}
}
catch
{
var workflow = await this.Api.Workflows.GetAsync(this.ProcessDefinition.Name, this.ProcessDefinition.Namespace, cancellationToken).ConfigureAwait(false);
var workflowDefinition = this.ProcessDefinition.Version == "latest"
? workflow.Spec.Versions.Last()
: workflow.Spec.Versions.Get(this.ProcessDefinition.Version) ?? throw new NullReferenceException($"Failed to find version '{this.ProcessDefinition.Version}' of workflow '{workflow.GetQualifiedName()}'");
var input = await this.Task.Workflow.Expressions.EvaluateAsync<EquatableDictionary<string, object>>(this.ProcessDefinition.Input ?? new(), this.Task.Input, this.GetExpressionEvaluationArguments(), cancellationToken: cancellationToken).ConfigureAwait(false);
workflowInstance = new WorkflowInstance()
{
Metadata = new()
{
Namespace = workflowInstanceNamespace,
Name = workflowInstanceName
},
Spec = new()
{
Definition = new()
{
Namespace = this.ProcessDefinition.Namespace,
Name = this.ProcessDefinition.Name,
Version = this.ProcessDefinition.Version
},
Input = input
}
};
workflowInstance = await this.Api.WorkflowInstances.CreateAsync(workflowInstance, cancellationToken).ConfigureAwait(false);
}
var watchEvents = await this.Api.WorkflowInstances.MonitorAsync(workflowInstance.GetName(), workflowInstance.GetNamespace()!, cancellationToken).ConfigureAwait(false);
await foreach(var watchEvent in watchEvents)
{
switch (watchEvent.Resource.Status?.Phase)
{
case WorkflowInstanceStatusPhase.Cancelled:
await this.SetErrorAsync(new()
{
Type = ErrorType.Runtime,
Status = ErrorStatus.Runtime,
Title = ErrorTitle.Runtime,
Detail = $"The execution of workflow instance '{workflowInstance.GetQualifiedName()}' has been cancelled"
}, cancellationToken).ConfigureAwait(false);
break;
case WorkflowInstanceStatusPhase.Faulted:
await this.SetErrorAsync(workflowInstance.Status!.Error!, cancellationToken).ConfigureAwait(false);
return;
case WorkflowInstanceStatusPhase.Completed:
var output = string.IsNullOrWhiteSpace(watchEvent.Resource.Status?.OutputReference) ? null : (await this.Api.Documents.GetAsync(watchEvent.Resource.Status.OutputReference, cancellationToken).ConfigureAwait(false)).Content;
await this.SetResultAsync(output, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
return;
}
}
}

}
4 changes: 4 additions & 0 deletions src/runner/Synapse.Runner/Services/TaskExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public virtual async Task InitializeAsync(CancellationToken cancellationToken =
}
catch(HttpRequestException ex)
{
this.Logger.LogError("An error occured while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
await this.SetErrorAsync(new Error()
{
Type = ErrorType.Communication,
Expand All @@ -134,6 +135,7 @@ await this.SetErrorAsync(new Error()
}
catch(Exception ex)
{
this.Logger.LogError("An error occured while initializing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
await this.SetErrorAsync(new Error()
{
Type = ErrorType.Runtime,
Expand Down Expand Up @@ -198,6 +200,7 @@ await this.SetErrorAsync(new()
catch (OperationCanceledException) { }
catch (HttpRequestException ex)
{
this.Logger.LogError("An error occured while executing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
await this.SetErrorAsync(new Error()
{
Type = ErrorType.Communication,
Expand All @@ -209,6 +212,7 @@ await this.SetErrorAsync(new Error()
}
catch (Exception ex)
{
this.Logger.LogError("An error occured while executing the task '{task}': {ex}", this.Task.Instance.Reference, ex);
await this.SetErrorAsync(new Error()
{
Type = ErrorType.Runtime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ namespace Synapse.Runner.Services;
/// <param name="services">The current <see cref="IServiceProvider"/></param>
/// <param name="expressionEvaluator">The service used to evaluate runtime expressions</param>
/// <param name="jsonSerializer">The service used to serialize/deserialize objects to/from JSON</param>
/// <param name="cloudFlowsApi">The service used to interact with the Synapse API</param>
/// <param name="api">The service used to interact with the Synapse API</param>
/// <param name="options">The service used to access the current <see cref="RunnerOptions"/></param>
/// <param name="definition">The <see cref="WorkflowDefinition"/> of the <see cref="WorkflowInstance"/> to execute</param>
/// <param name="instance">The <see cref="WorkflowInstance"/> to execute</param>
public class WorkflowExecutionContext(IServiceProvider services, IExpressionEvaluator expressionEvaluator, IJsonSerializer jsonSerializer, ISynapseApiClient cloudFlowsApi, IOptions<RunnerOptions> options, WorkflowDefinition definition, WorkflowInstance instance)
public class WorkflowExecutionContext(IServiceProvider services, IExpressionEvaluator expressionEvaluator, IJsonSerializer jsonSerializer, ISynapseApiClient api, IOptions<RunnerOptions> options, WorkflowDefinition definition, WorkflowInstance instance)
: IWorkflowExecutionContext
{

Expand All @@ -51,7 +51,7 @@ public class WorkflowExecutionContext(IServiceProvider services, IExpressionEval
/// <summary>
/// Gets the service used to interact with the Synapse API
/// </summary>
protected ISynapseApiClient Api { get; } = cloudFlowsApi;
protected ISynapseApiClient Api { get; } = api;

/// <summary>
/// Gets the current <see cref="RunnerOptions"/>
Expand Down