diff --git a/src/Temporalio/Client/ITemporalClientPlugin.cs b/src/Temporalio/Client/ITemporalClientPlugin.cs new file mode 100644 index 00000000..5f4efe32 --- /dev/null +++ b/src/Temporalio/Client/ITemporalClientPlugin.cs @@ -0,0 +1,33 @@ +using System; +using System.Threading.Tasks; + +namespace Temporalio.Client +{ + /// + /// Interface for temporal client plugins. + /// + /// + /// WARNING: This API is experimental and may change in the future. + /// + public interface ITemporalClientPlugin + { + /// + /// Gets the plugin name. + /// + string Name { get; } + + /// + /// Configures the client options. + /// + /// The client options to configure. + void ConfigureClient(TemporalClientOptions options); + + /// + /// Handles temporal connection asynchronously. + /// + /// The connection options. + /// The continuation function. + /// A task representing the asynchronous operation. + Task ConnectAsync(TemporalClientConnectOptions options, Func> continuation); + } +} \ No newline at end of file diff --git a/src/Temporalio/Client/TemporalClient.cs b/src/Temporalio/Client/TemporalClient.cs index a0b9c59a..3cac3fd9 100644 --- a/src/Temporalio/Client/TemporalClient.cs +++ b/src/Temporalio/Client/TemporalClient.cs @@ -21,6 +21,11 @@ public partial class TemporalClient : ITemporalClient /// Options for this client. public TemporalClient(ITemporalConnection connection, TemporalClientOptions options) { + foreach (var plugin in options.Plugins ?? Enumerable.Empty()) + { + plugin.ConfigureClient(options); + } + Connection = connection; Options = options; OutboundInterceptor = new Impl(this); @@ -61,10 +66,21 @@ public TemporalClient(ITemporalConnection connection, TemporalClientOptions opti /// Options for connecting. /// The connected client. public static async Task ConnectAsync( - TemporalClientConnectOptions options) => - new( - await TemporalConnection.ConnectAsync(options).ConfigureAwait(false), + TemporalClientConnectOptions options) + { + Func> connect = TemporalConnection.ConnectAsync; + if (options.Plugins != null) + { + foreach (var plugin in options.Plugins.Reverse()) + { + var localConnect = connect; + connect = connectOptions => plugin.ConnectAsync(connectOptions, localConnect); + } + } + return new( + await connect(options).ConfigureAwait(false), options.ToClientOptions()); + } /// /// Create a client to a Temporal namespace that does not connect until first call. diff --git a/src/Temporalio/Client/TemporalClientConnectOptions.cs b/src/Temporalio/Client/TemporalClientConnectOptions.cs index e6ac6298..178f1e64 100644 --- a/src/Temporalio/Client/TemporalClientConnectOptions.cs +++ b/src/Temporalio/Client/TemporalClientConnectOptions.cs @@ -66,6 +66,14 @@ public TemporalClientConnectOptions(string targetHost) /// public QueryRejectCondition? QueryRejectCondition { get; set; } + /// + /// Gets or sets the plugins. + /// + /// + /// WARNING: This API is experimental and may change in the future. + /// + public IReadOnlyCollection? Plugins { get; set; } + /// /// Create client options from a subset of these options for use in /// . @@ -79,6 +87,7 @@ public TemporalClientOptions ToClientOptions() => Interceptors = Interceptors, LoggerFactory = LoggerFactory, QueryRejectCondition = QueryRejectCondition, + Plugins = Plugins, }; } } diff --git a/src/Temporalio/Client/TemporalClientOptions.cs b/src/Temporalio/Client/TemporalClientOptions.cs index 9b3bf42d..768a9cb8 100644 --- a/src/Temporalio/Client/TemporalClientOptions.cs +++ b/src/Temporalio/Client/TemporalClientOptions.cs @@ -43,6 +43,14 @@ public class TemporalClientOptions : ICloneable /// public QueryRejectCondition? QueryRejectCondition { get; set; } + /// + /// Gets or sets the plugins. + /// + /// + /// WARNING: This API is experimental and may change in the future. + /// + public IReadOnlyCollection? Plugins { get; set; } + /// /// Create a shallow copy of these options. /// diff --git a/src/Temporalio/Common/SimplePlugin.cs b/src/Temporalio/Common/SimplePlugin.cs new file mode 100644 index 00000000..8b9aeaad --- /dev/null +++ b/src/Temporalio/Common/SimplePlugin.cs @@ -0,0 +1,243 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Temporalio.Client; +using Temporalio.Worker; + +#if NETCOREAPP3_0_OR_GREATER +using System.Runtime.CompilerServices; +#endif + +namespace Temporalio.Common +{ + /// + /// A simple plugin that implements both client and worker plugin interfaces. + /// + /// + /// WARNING: This API is experimental and may change in the future. + /// + public class SimplePlugin : ITemporalClientPlugin, ITemporalWorkerPlugin + { + /// + /// Initializes a new instance of the class. + /// + /// The plugin name. + /// The plugin options. + public SimplePlugin(string name, SimplePluginOptions? options = null) + { + Name = name; + Options = options ?? new SimplePluginOptions(); + } + + /// + /// Gets the plugin options. + /// + public SimplePluginOptions Options { get; } + + /// + /// Gets the plugin name. + /// + public string Name { get; } + + /// + /// Configures the client options. + /// + /// The client options to configure. + public virtual void ConfigureClient(TemporalClientOptions options) + { + options.DataConverter = Resolve(options.DataConverter, Options.DataConverterOption); + options.Interceptors = ResolveAppend(options.Interceptors, Options.ClientInterceptorsOption); + } + + /// + /// Handles temporal connection asynchronously. + /// + /// The connection options. + /// The continuation function. + /// A task representing the asynchronous operation. + public virtual Task ConnectAsync( + TemporalClientConnectOptions options, + Func> continuation) + { + return continuation(options); + } + + /// + /// Configures the worker options. + /// + /// The worker options to configure. + public virtual void ConfigureWorker(TemporalWorkerOptions options) + { + DoAppend(options.Activities, Options.Activities); + DoAppend(options.Workflows, Options.Workflows); + DoAppend(options.NexusServices, Options.NexusServices); + options.Interceptors = ResolveAppend(options.Interceptors, Options.WorkerInterceptorsOption); + options.WorkflowFailureExceptionTypes = ResolveAppend( + options.WorkflowFailureExceptionTypes, Options.WorkflowFailureExceptionTypesOption); + } + + /// + /// Runs the worker asynchronously. + /// + /// Result type. For most worker run calls, this is + /// . + /// The worker to run. + /// The continuation function. + /// Cancellation token to stop the worker. + /// A task representing the asynchronous operation. + public virtual async Task RunWorkerAsync( + TemporalWorker worker, + Func> continuation, + CancellationToken stoppingToken) + { + if (Options.RunContextBefore is { } before) + { + await before().ConfigureAwait(false); + } + try + { + return await continuation(worker, stoppingToken).ConfigureAwait(false); + } + finally + { + if (Options.RunContextAfter is { } after) + { + await after().ConfigureAwait(false); + } + } + } + + /// + /// Configures the replayer options. + /// + /// The replayer options to configure. + public virtual void ConfigureReplayer(WorkflowReplayerOptions options) + { + options.DataConverter = Resolve(options.DataConverter, Options.DataConverterOption); + DoAppend(options.Workflows, Options.Workflows); + options.Interceptors = ResolveAppend(options.Interceptors, Options.WorkerInterceptorsOption); + options.WorkflowFailureExceptionTypes = ResolveAppend( + options.WorkflowFailureExceptionTypes, Options.WorkflowFailureExceptionTypesOption); + } + + /// + /// Runs the replayer asynchronously. + /// + /// The replayer to run. + /// The continuation function. + /// Cancellation token to stop the replay. + /// A task representing the asynchronous operation. + public virtual async Task> ReplayWorkflowsAsync( + WorkflowReplayer replayer, + Func>> continuation, + CancellationToken cancellationToken) + { + if (Options.RunContextBefore is { } before) + { + await before().ConfigureAwait(false); + } + try + { + return await continuation(replayer, cancellationToken).ConfigureAwait(false); + } + finally + { + if (Options.RunContextAfter is { } after) + { + await after().ConfigureAwait(false); + } + } + } + +#if NETCOREAPP3_0_OR_GREATER + /// + /// Runs the replayer asynchronously. + /// + /// The replayer to run. + /// The continuation function. + /// Cancellation token to stop the replay. + /// A task representing the asynchronous operation. + public virtual async IAsyncEnumerable ReplayWorkflowsAsync( + WorkflowReplayer replayer, + Func> continuation, + [EnumeratorCancellation] CancellationToken cancellationToken) + { + if (Options.RunContextBefore is { } before) + { + await before().ConfigureAwait(false); + } + try + { + var asyncEnum = continuation(replayer); + await foreach (var res in asyncEnum.ConfigureAwait(false).WithCancellation(cancellationToken)) + { + yield return res; + } + } + finally + { + if (Options.RunContextAfter is { } after) + { + await after().ConfigureAwait(false); + } + } + } +#endif + + private static T Resolve(T existing, SimplePluginOptions.SimplePluginOption? parameter) + { + if (parameter == null) + { + return existing; + } + var option = parameter.Constant ?? existing; + if (parameter.Configurable != null) + { + return parameter.Configurable(option); + } + return option; + } + + private static IReadOnlyCollection? ResolveAppend( + IReadOnlyCollection? existing, + SimplePluginOptions.SimplePluginOption?>? parameter) + { + if (parameter == null) + { + return existing; + } + + var option = existing; + if (existing != null && parameter.Constant != null) + { + option = existing.Concat(parameter.Constant).ToList(); + } + else if (parameter.Constant != null) + { + option = parameter.Constant; + } + if (parameter.Configurable != null) + { + return parameter.Configurable(option); + } + return option; + } + + private static void DoAppend( + IList existing, + IList? parameter) + { + if (parameter == null) + { + return; + } + + foreach (var item in parameter) + { + existing.Add(item); + } + } + } +} \ No newline at end of file diff --git a/src/Temporalio/Common/SimplePluginOptions.cs b/src/Temporalio/Common/SimplePluginOptions.cs new file mode 100644 index 00000000..e132e110 --- /dev/null +++ b/src/Temporalio/Common/SimplePluginOptions.cs @@ -0,0 +1,245 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using NexusRpc.Handlers; +using Temporalio.Activities; +using Temporalio.Client.Interceptors; +using Temporalio.Worker.Interceptors; +using Temporalio.Workflows; + +namespace Temporalio.Common +{ + /// + /// Configuration options for simple plugins. + /// + /// + /// WARNING: This API is experimental and may change in the future. + /// + public class SimplePluginOptions : ICloneable + { + /// + /// Gets or sets the data converter for the plugin. + /// + public Converters.DataConverter? DataConverter + { + get => DataConverterOption?.Constant; + set => DataConverterOption = new(value); + } + + /// + /// Gets or sets the data converter option with configuration support. + /// + public SimplePluginOption? DataConverterOption + { + get; + set; + } + + /// + /// Gets or sets the client interceptors for the plugin. + /// + public IReadOnlyCollection? ClientInterceptors + { + get => ClientInterceptorsOption?.Constant; + set => ClientInterceptorsOption = new(value); + } + + /// + /// Gets or sets the client interceptors option with configuration support. + /// + public SimplePluginOption?>? ClientInterceptorsOption + { + get; + set; + } + + /// + /// Gets the activity definitions. Most users will use AddActivity to add to this list. + /// + public IList Activities { get; } = new List(); + + /// + /// Gets the workflow definitions. Most users will use AddWorkflow to add to this list. + /// + public IList Workflows { get; } = new List(); + + /// + /// Gets the Nexus service instances. Most users will use AddNexusService to add to this + /// list. + /// + /// WARNING: Nexus support is experimental. + public IList NexusServices { get; } = new List(); + + /// + /// Gets or sets the worker interceptors for the plugin. + /// + public IReadOnlyCollection? WorkerInterceptors + { + get => WorkerInterceptorsOption?.Constant; + set => WorkerInterceptorsOption = new(value); + } + + /// + /// Gets or sets the worker interceptors option with configuration support. + /// + public SimplePluginOption?>? WorkerInterceptorsOption + { + get; + set; + } + + /// + /// Gets or sets the workflow failure exception types for the plugin. + /// + public IReadOnlyCollection? WorkflowFailureExceptionTypes + { + get => WorkflowFailureExceptionTypesOption?.Constant; + set => WorkflowFailureExceptionTypesOption = new(value); + } + + /// + /// Gets or sets the workflow failure exception types option with configuration support. + /// + public SimplePluginOption?>? WorkflowFailureExceptionTypesOption + { + get; + set; + } + + /// + /// Gets or sets a function to run before running worker/replayer. + /// + public Func? RunContextBefore { get; set; } + + /// + /// Gets or sets a function to run after running worker/replayer. + /// + public Func? RunContextAfter { get; set; } + + /// + /// Add the given delegate with as an activity. This is + /// usually a method reference. + /// + /// Delegate to add. + /// This options instance for chaining. + public SimplePluginOptions AddActivity(Delegate del) => + AddActivity(ActivityDefinition.Create(del)); + + /// + /// Add the given activity definition. Most users will use + /// instead. + /// + /// Definition to add. + /// This options instance for chaining. + public SimplePluginOptions AddActivity(ActivityDefinition definition) + { + Activities.Add(definition); + return this; + } + + /// + /// Add all methods on the given type with . + /// + /// Type to get activities from. + /// Instance to use when invoking. This must be non-null if any + /// activities are non-static. + /// This options instance for chaining. + public SimplePluginOptions AddAllActivities(T? instance) => + AddAllActivities(typeof(T), instance); + + /// + /// Add all methods on the given type with . + /// + /// Type to get activities from. + /// Instance to use when invoking. This must be non-null if any + /// activities are non-static. + /// This options instance for chaining. + public SimplePluginOptions AddAllActivities(Type type, object? instance) + { + foreach (var defn in ActivityDefinition.CreateAll(type, instance)) + { + AddActivity(defn); + } + return this; + } + + /// + /// Add the given type as a workflow. + /// + /// Type to add. + /// This options instance for chaining. + public SimplePluginOptions AddWorkflow() => AddWorkflow(typeof(T)); + + /// + /// Add the given type as a workflow. + /// + /// Type to add. + /// This options instance for chaining. + public SimplePluginOptions AddWorkflow(Type type) => + AddWorkflow(WorkflowDefinition.Create(type)); + + /// + /// Add the given workflow definition. Most users will use + /// instead. + /// + /// Definition to add. + /// This options instance for chaining. + public SimplePluginOptions AddWorkflow(WorkflowDefinition definition) + { + Workflows.Add(definition); + return this; + } + + /// + /// Add the given Nexus service handler. + /// + /// Service handler to add. It is expected to be an instance of + /// a class with a attribute. + /// This options instance for chaining. + /// WARNING: Nexus support is experimental. + public SimplePluginOptions AddNexusService(object serviceHandler) + { + NexusServices.Add(ServiceHandlerInstance.FromInstance(serviceHandler)); + return this; + } + + /// + /// Create a shallow copy of these options. + /// + /// A shallow copy of these options and any transitive options fields. + /// Also copies collections of activities and workflows. + public virtual object Clone() + { + return (SimplePluginOptions)MemberwiseClone(); + } + + /// + /// Represents a required configurable plugin option. + /// + /// The type of the option value. + public class SimplePluginOption + { + /// + /// Initializes a new instance of the class with a constant value. + /// + /// The constant value. + public SimplePluginOption(T? value) => Constant = value; + + /// + /// Initializes a new instance of the class with a configurable function. + /// + /// The configurable function. + public SimplePluginOption(Func value) => Configurable = value; + + /// + /// Gets the configurable function for the required option. + /// + public Func? Configurable { get; } + + /// + /// Gets the constant value for the required option. + /// + public T? Constant { get; } + } + } +} \ No newline at end of file diff --git a/src/Temporalio/Worker/ITemporalWorkerPlugin.cs b/src/Temporalio/Worker/ITemporalWorkerPlugin.cs new file mode 100644 index 00000000..58052189 --- /dev/null +++ b/src/Temporalio/Worker/ITemporalWorkerPlugin.cs @@ -0,0 +1,73 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace Temporalio.Worker +{ + /// + /// Interface for temporal worker plugins. + /// + /// + /// WARNING: This API is experimental and may change in the future. + /// + public interface ITemporalWorkerPlugin + { + /// + /// Gets the plugin name. + /// + string Name { get; } + + /// + /// Configures the worker options. + /// + /// The worker options to configure. + void ConfigureWorker(TemporalWorkerOptions options); + + /// + /// Runs the worker asynchronously. + /// + /// Result type. For most worker run calls, this is + /// . + /// The worker to run. + /// The continuation function. + /// Cancellation token to stop the worker. + /// A task representing the asynchronous operation. + Task RunWorkerAsync( + TemporalWorker worker, + Func> continuation, + CancellationToken stoppingToken); + + /// + /// Configures the replayer options. + /// + /// The replayer options to configure. + void ConfigureReplayer(WorkflowReplayerOptions options); + + /// + /// Runs the replayer asynchronously. + /// + /// The replayer to run. + /// The continuation function. + /// Cancellation token to stop the replay. + /// A task representing the asynchronous operation. + Task> ReplayWorkflowsAsync( + WorkflowReplayer replayer, + Func>> continuation, + CancellationToken cancellationToken); + +#if NETCOREAPP3_0_OR_GREATER + /// + /// Runs the replayer asynchronously. + /// + /// The replayer to run. + /// The continuation function. + /// Cancellation token to stop the replay. + /// A task representing the asynchronous operation. + IAsyncEnumerable ReplayWorkflowsAsync( + WorkflowReplayer replayer, + Func> continuation, + CancellationToken cancellationToken); +#endif + } +} \ No newline at end of file diff --git a/src/Temporalio/Worker/TemporalWorker.cs b/src/Temporalio/Worker/TemporalWorker.cs index 9bd1b5b1..b238b4cf 100644 --- a/src/Temporalio/Worker/TemporalWorker.cs +++ b/src/Temporalio/Worker/TemporalWorker.cs @@ -20,6 +20,7 @@ public class TemporalWorker : IDisposable private readonly ActivityWorker? activityWorker; private readonly WorkflowWorker? workflowWorker; private readonly NexusWorker? nexusWorker; + private readonly IReadOnlyCollection plugins; private IWorkerClient client; private int started; private Disposer? disposer; @@ -44,6 +45,22 @@ public TemporalWorker(IWorkerClient client, TemporalWorkerOptions options) // Clone the options to discourage mutation (but we aren't completely disabling mutation // on the Options field herein). Options = (TemporalWorkerOptions)options.Clone(); + + var localPlugins = client.Options.Plugins?.OfType() ?? Enumerable.Empty(); + if (Options.Plugins != null) + { + localPlugins = localPlugins.Concat(Options.Plugins); + } + plugins = localPlugins.ToList(); + + foreach (var plugin in plugins) + { + plugin.ConfigureWorker(Options); + } + + // Ensure later accesses use the modified version of options. + options = Options; + var bridgeClient = client.BridgeClientProvider.BridgeClient ?? throw new InvalidOperationException("Cannot use unconnected lazy client for worker"); BridgeWorker = new( @@ -218,7 +235,7 @@ public IWorkerClient Client /// Cancellation requested. /// Fatal worker failure. public Task ExecuteAsync(CancellationToken stoppingToken) => - ExecuteInternalAsync(null, stoppingToken); + ExecuteInternalAsync(null, stoppingToken); /// /// Run this worker until failure, cancelled, or task from given function completes. @@ -241,7 +258,13 @@ public Task ExecuteAsync(CancellationToken stoppingToken) => /// Fatal worker failure. public Task ExecuteAsync( Func untilComplete, CancellationToken stoppingToken = default) => - ExecuteInternalAsync(untilComplete, stoppingToken); + ExecuteInternalAsync( + () => untilComplete().ContinueWith( + _ => ValueTuple.Create(), + default, + TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion, + TaskScheduler.Current), + stoppingToken); /// /// Run this worker until failure, cancelled, or task from given function completes. @@ -263,15 +286,9 @@ public Task ExecuteAsync( /// Already started. /// Cancellation requested. /// Fatal worker failure. - public async Task ExecuteAsync( - Func> untilComplete, CancellationToken stoppingToken = default) - { - TResult? ret = default; - await ExecuteInternalAsync( - async () => ret = await untilComplete.Invoke().ConfigureAwait(false), - stoppingToken).ConfigureAwait(false); - return ret!; - } + public Task ExecuteAsync( + Func> untilComplete, CancellationToken stoppingToken = default) => + ExecuteInternalAsync(untilComplete, stoppingToken); /// public void Dispose() @@ -292,8 +309,22 @@ protected virtual void Dispose(bool disposing) } } - private async Task ExecuteInternalAsync( - Func? untilComplete, CancellationToken stoppingToken) + private Task ExecuteInternalAsync( + Func>? untilComplete, CancellationToken stoppingToken) + { + Func> execute = (worker, token) => + worker.ExecuteWithPluginsAsync(untilComplete, token); + foreach (var plugin in plugins.Reverse()) + { + var localExecute = execute; + execute = (worker, token) => plugin.RunWorkerAsync(worker, localExecute, token); + } + + return execute(this, stoppingToken); + } + + private async Task ExecuteWithPluginsAsync( + Func>? untilComplete, CancellationToken stoppingToken) { if (Interlocked.Exchange(ref started, 1) != 0) { @@ -443,6 +474,16 @@ private async Task ExecuteInternalAsync( } #pragma warning restore CA1031 } + // If there is a user task, return the value, otherwise return a default form of it. We + // can trust that the default value is always non-null in no-user-task scenarios based + // on the callers in this class using ValueTuple. + if (userTask != null) + { + return await userTask.ConfigureAwait(false); + } +#pragma warning disable CS8603 // We know this is never nullable in a no-user-task scenario + return default; +#pragma warning restore CS8603 } // This class encapsulates dispose work so we can decide to do it either on Dispose() or the end of ExecuteInternalAsync(). diff --git a/src/Temporalio/Worker/TemporalWorkerOptions.cs b/src/Temporalio/Worker/TemporalWorkerOptions.cs index 8c802720..06269008 100644 --- a/src/Temporalio/Worker/TemporalWorkerOptions.cs +++ b/src/Temporalio/Worker/TemporalWorkerOptions.cs @@ -359,6 +359,14 @@ public TemporalWorkerOptions() /// public bool DisableEagerActivityExecution { get; set; } + /// + /// Gets or sets the plugins. + /// + /// + /// WARNING: This API is experimental and may change in the future. + /// + public IReadOnlyCollection? Plugins { get; set; } + /// /// Gets the TEMPORAL_DEBUG environment variable. /// diff --git a/src/Temporalio/Worker/WorkflowReplayer.cs b/src/Temporalio/Worker/WorkflowReplayer.cs index 48704223..145c5958 100644 --- a/src/Temporalio/Worker/WorkflowReplayer.cs +++ b/src/Temporalio/Worker/WorkflowReplayer.cs @@ -20,6 +20,8 @@ namespace Temporalio.Worker /// public class WorkflowReplayer { + private readonly IReadOnlyCollection plugins; + /// /// Initializes a new instance of the class. Note, some /// options validation is deferred until replay is attempted. @@ -27,6 +29,12 @@ public class WorkflowReplayer /// Replayer options. public WorkflowReplayer(WorkflowReplayerOptions options) { + plugins = options.Plugins ?? Array.Empty(); + foreach (var plugin in plugins) + { + plugin.ConfigureReplayer(options); + } + if (options.Workflows.Count == 0) { throw new ArgumentException("Must have at least one workflow"); @@ -47,10 +55,14 @@ public WorkflowReplayer(WorkflowReplayerOptions options) /// on a workflow task failure /// (e.g. non-determinism). This has no effect on workflow failures which are not reported /// as part of replay. + /// Cancellation token to stop the replay. /// Result of the replay run. public async Task ReplayWorkflowAsync( - WorkflowHistory history, bool throwOnReplayFailure = true) => - (await ReplayWorkflowsAsync(new[] { history }, throwOnReplayFailure).ConfigureAwait(false)).Single(); + WorkflowHistory history, + bool throwOnReplayFailure = true, + CancellationToken cancellationToken = default) => + (await ReplayWorkflowsAsync(new[] { history }, throwOnReplayFailure, cancellationToken). + ConfigureAwait(false)).Single(); /// /// Replay multiple workflows from the given histories. @@ -60,32 +72,24 @@ public async Task ReplayWorkflowAsync( /// on a workflow task failure /// (e.g. non-determinism) as soon as it's encountered. This has no effect on workflow /// failures which are not reported as part of replay. + /// Cancellation token to stop the replay. /// Results of the replay runs. - public async Task> ReplayWorkflowsAsync( - IEnumerable histories, bool throwOnReplayFailure = false) + public Task> ReplayWorkflowsAsync( + IEnumerable histories, + bool throwOnReplayFailure = false, + CancellationToken cancellationToken = default) { - // We could stream the results, but since the method wants them all anyways, a list is - // ok - using (var runner = new WorkflowHistoryRunner(Options, throwOnReplayFailure)) - using (var cts = new CancellationTokenSource()) + Func>> execute = + (replayer, cancellationToken) => + replayer.ReplayWorkflowsInternalAsync(histories, throwOnReplayFailure, cancellationToken); + foreach (var plugin in plugins.Reverse()) { - var workerTask = Task.Run(() => runner.RunWorkerAsync(cts.Token)); - try - { - var results = new List(); - foreach (var history in histories) - { - results.Add(await runner.RunWorkflowAsync(history).ConfigureAwait(false)); - } - return results; - } - finally - { - // Cancel and wait on complete - cts.Cancel(); - await workerTask.ConfigureAwait(false); - } + var localExecute = execute; + execute = (replayer, cancellationToken) => + plugin.ReplayWorkflowsAsync(replayer, localExecute, cancellationToken); } + + return execute(this, cancellationToken); } #if NETCOREAPP3_0_OR_GREATER @@ -103,6 +107,28 @@ public async IAsyncEnumerable ReplayWorkflowsAsync( IAsyncEnumerable histories, bool throwOnReplayFailure = false, [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + Func> execute = replayer => + replayer.ReplayWorkflowsInternalAsync(histories, throwOnReplayFailure); + foreach (var plugin in plugins.Reverse()) + { + var localExecute = execute; + execute = replayer => + plugin.ReplayWorkflowsAsync(replayer, localExecute, cancellationToken); + } + + // We must have await foreach in here to make [EnumeratorCancellation] work + var asyncEnum = execute(this); + await foreach (var res in asyncEnum.WithCancellation(cancellationToken).ConfigureAwait(false)) + { + yield return res; + } + } + + private async IAsyncEnumerable ReplayWorkflowsInternalAsync( + IAsyncEnumerable histories, + bool throwOnReplayFailure, + [EnumeratorCancellation] CancellationToken cancellationToken = default) { using (var runner = new WorkflowHistoryRunner(Options, throwOnReplayFailure)) using (var shutdownTokenSource = new CancellationTokenSource()) @@ -133,6 +159,35 @@ await Task.WhenAny( } #endif + private async Task> ReplayWorkflowsInternalAsync( + IEnumerable histories, + bool throwOnReplayFailure, + CancellationToken cancellationToken) + { + // We could stream the results, but since the method wants them all anyways, a list is + // ok + using (var runner = new WorkflowHistoryRunner(Options, throwOnReplayFailure)) + using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)) + { + var workerTask = Task.Run(() => runner.RunWorkerAsync(cts.Token), CancellationToken.None); + try + { + var results = new List(); + foreach (var history in histories) + { + results.Add(await runner.RunWorkflowAsync(history).ConfigureAwait(false)); + } + return results; + } + finally + { + // Cancel and wait on complete + cts.Cancel(); + await workerTask.ConfigureAwait(false); + } + } + } + /// /// Runner for workflow history. /// diff --git a/src/Temporalio/Worker/WorkflowReplayerOptions.cs b/src/Temporalio/Worker/WorkflowReplayerOptions.cs index cea271a2..cbcbcbad 100644 --- a/src/Temporalio/Worker/WorkflowReplayerOptions.cs +++ b/src/Temporalio/Worker/WorkflowReplayerOptions.cs @@ -119,6 +119,11 @@ public class WorkflowReplayerOptions : ICloneable /// public IReadOnlyCollection? WorkflowFailureExceptionTypes { get; set; } + /// + /// Gets or sets the plugins. + /// + public IReadOnlyCollection? Plugins { get; set; } + /// /// Gets or sets a function to create workflow instances. /// diff --git a/tests/Temporalio.Tests/Common/PluginTests.cs b/tests/Temporalio.Tests/Common/PluginTests.cs new file mode 100644 index 00000000..0f70613e --- /dev/null +++ b/tests/Temporalio.Tests/Common/PluginTests.cs @@ -0,0 +1,239 @@ +using Temporalio.Api.Common.V1; +using Temporalio.Client; +using Temporalio.Common; +using Temporalio.Converters; +using Temporalio.Worker; +using Temporalio.Workflows; +using Xunit.Abstractions; + +namespace Temporalio.Tests.Common; + +using System.Threading; +using Xunit; + +public class PluginTests : WorkflowEnvironmentTestBase +{ + public PluginTests(ITestOutputHelper output, WorkflowEnvironment env) + : base(output, env) + { + } + + private class ClientPlugin : ITemporalClientPlugin + { + public string Name => "ClientPlugin"; + + public void ConfigureClient(TemporalClientOptions options) + { + options.Namespace = "NewNamespace"; + } + + public Task ConnectAsync( + TemporalClientConnectOptions options, + Func> continuation) => + throw new NotImplementedException(); + } + + private class WorkerPlugin : ITemporalWorkerPlugin + { + public string Name => "WorkerPlugin"; + + public void ConfigureWorker(TemporalWorkerOptions options) + { + options.TaskQueue = "NewTaskQueue"; + } + + public Task RunWorkerAsync( + TemporalWorker worker, + Func> continuation, + CancellationToken stoppingToken) => + throw new NotImplementedException(); + + public void ConfigureReplayer(WorkflowReplayerOptions options) => + throw new NotImplementedException(); + + public Task> ReplayWorkflowsAsync( + WorkflowReplayer replayer, + Func>> continuation, + CancellationToken cancellationToken) => + throw new NotImplementedException(); + + public IAsyncEnumerable ReplayWorkflowsAsync( + WorkflowReplayer replayer, + Func> continuation, + CancellationToken cancellationToken) => + throw new NotImplementedException(); + } + + private class CombinedPlugin : WorkerPlugin, ITemporalClientPlugin + { + public new string Name => "CombinedPlugin"; + + public void ConfigureClient(TemporalClientOptions options) + { + options.Namespace = "NewNamespace"; + } + + public Task ConnectAsync( + TemporalClientConnectOptions options, + Func> continuation) + { + options.TargetHost = "Invalid"; + return continuation(options); + } + } + + [Fact] + public void TestClientPlugin() + { + var newOptions = (TemporalClientOptions)Client.Options.Clone(); + newOptions.Plugins = new[] { new ClientPlugin() }; + + var client = new TemporalClient(Env.Client.Connection, newOptions); + Assert.Equal("NewNamespace", client.Options.Namespace); + } + + private class FailToConnectPlugin : ITemporalClientPlugin + { + public string Name => "FailToConnectPlugin"; + + public void ConfigureClient(TemporalClientOptions options) + { + } + + public Task ConnectAsync( + TemporalClientConnectOptions options, + Func> continuation) + { + Assert.Equal("Invalid", options.TargetHost); + throw new NotImplementedException(); + } + } + + [Fact] + public async Task TestClientPlugin_Connect_Interceptor() + { + var options = new TemporalClientConnectOptions() + { + Plugins = new ITemporalClientPlugin[] + { + new ClientPlugin(), + new FailToConnectPlugin(), + }, + }; + + // Test the interceptor is invoked, the second one is invoked second as the assert does not fail. + await Assert.ThrowsAsync(async () => await TemporalClient.ConnectAsync(options)); + } + +#pragma warning disable CA1812 + [Workflow] + private class SimpleWorkflow + { + [WorkflowRun] + public Task RunAsync(string name) => Task.FromResult($"Hello, {name}!"); + } + + [Fact] + public void TestWorkerPlugin() + { + using var worker = new TemporalWorker(Env.Client, new TemporalWorkerOptions() + { + Plugins = new[] { new CombinedPlugin() }, + }.AddWorkflow()); + Assert.Equal("NewTaskQueue", worker.Options.TaskQueue); + } + + [Fact] + public void TestWorkerPlugin_Inheritance() + { + var newOptions = (TemporalClientOptions)Client.Options.Clone(); + newOptions.Plugins = new[] { new CombinedPlugin() }; + + var client = new TemporalClient(Env.Client.Connection, newOptions); + using var worker = new TemporalWorker(client, new TemporalWorkerOptions() + { + Plugins = new[] { new CombinedPlugin() }, + }.AddWorkflow()); + Assert.Equal("NewTaskQueue", worker.Options.TaskQueue); + } + + private class Codec : IPayloadCodec + { + public Task> EncodeAsync(IReadOnlyCollection payloads) => throw new NotImplementedException(); + + public Task> DecodeAsync(IReadOnlyCollection payloads) => throw new NotImplementedException(); + } + + [Fact] + public void TestSimplePlugin_Basic() + { + var plugin = new SimplePlugin("SimplePlugin", new SimplePluginOptions() + { + DataConverter = new DataConverter(new DefaultPayloadConverter(), new DefaultFailureConverter(), new Codec()), + }.AddWorkflow()); + var newOptions = (TemporalClientOptions)Client.Options.Clone(); + newOptions.Plugins = new[] { plugin }; + + var client = new TemporalClient(Env.Client.Connection, newOptions); + Assert.NotNull(client.Options.DataConverter.PayloadCodec); + + using var worker = new TemporalWorker(client, new TemporalWorkerOptions() + { + TaskQueue = "TestSimplePlugin_Basic", + }); + Assert.NotNull(client.Options.DataConverter.PayloadCodec); + } + + [Fact] + public void TestSimplePlugin_Function() + { + var plugin = new SimplePlugin("SimplePlugin", new SimplePluginOptions() + { + DataConverterOption = new SimplePluginOptions.SimplePluginOption( + (converter) => converter with { PayloadCodec = new Codec() }), + }.AddWorkflow()); + var newOptions = (TemporalClientOptions)Client.Options.Clone(); + newOptions.Plugins = new[] { plugin }; + + var client = new TemporalClient(Env.Client.Connection, newOptions); + Assert.NotNull(client.Options.DataConverter.PayloadCodec); + + using var worker = new TemporalWorker(client, new TemporalWorkerOptions() + { + TaskQueue = "TestSimplePlugin_Function", + }); + Assert.NotNull(client.Options.DataConverter.PayloadCodec); + } + + [Fact] + public async Task TestSimplePlugin_RunContext() + { + List transitions = new(); + var plugin = new SimplePlugin("SimplePlugin", new SimplePluginOptions() + { + RunContextBefore = async () => transitions.Add("Beginning"), + RunContextAfter = async () => transitions.Add("Ending"), + }.AddWorkflow()); + var newOptions = (TemporalClientOptions)Client.Options.Clone(); + newOptions.Plugins = new[] { plugin }; + + var client = new TemporalClient(Env.Client.Connection, newOptions); + using var worker = new TemporalWorker(client, new TemporalWorkerOptions() + { + TaskQueue = "TestSimplePlugin_Function", + }); + using var cancelToken = new CancellationTokenSource(); + var execution = worker.ExecuteAsync(cancelToken.Token); + cancelToken.CancelAfter(500); + try + { + await execution; + } + catch (OperationCanceledException) + { + } + + Assert.Contains("Beginning", transitions); + Assert.Contains("Ending", transitions); + } +} \ No newline at end of file