From 143a95408f1547f799a142cba81ece34f62094da Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Fri, 14 Feb 2020 11:53:00 +0000 Subject: [PATCH 1/2] Add Stream.TransactAsyncEx; change QueryEx to align --- CHANGELOG.md | 6 ++ samples/Tutorial/FulfilmentCenter.fsx | 4 +- src/Equinox.Cosmos/Cosmos.fs | 2 +- src/Equinox.EventStore/EventStore.fs | 2 +- src/Equinox.MemoryStore/MemoryStore.fs | 2 +- src/Equinox.SqlStreamStore/SqlStreamStore.fs | 2 +- src/Equinox/Equinox.fs | 73 +++++++++++--------- src/Equinox/Flow.fs | 50 +++++++++----- 8 files changed, 85 insertions(+), 56 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f6e4b4d9b..70f969b59 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,13 @@ The `Unreleased` section name is replaced by the expected version of next releas ## [Unreleased] ### Added + +- `Stream.TransactAsyncEx` to expose `Core.ISyncContext` at conclusion of processing, enabling one to extract the post-state `Version` etc. (This paves the way for exposing [`SessionToken`](https://github.com/jet/equinox/issues/192) at a later point without a breaking change) + ### Changed + +- `Stream.QueryEx` to supply `Core.ISyncContext` in lieu of only exposing `Version` (to align with `TransactAsyncEx`) + ### Removed ### Fixed diff --git a/samples/Tutorial/FulfilmentCenter.fsx b/samples/Tutorial/FulfilmentCenter.fsx index 2d093dc3e..aa213749b 100644 --- a/samples/Tutorial/FulfilmentCenter.fsx +++ b/samples/Tutorial/FulfilmentCenter.fsx @@ -96,7 +96,7 @@ module FulfilmentCenter = stream.Query id let queryEx fc (projection : Fold.State -> 't) : Async = let stream = resolve fc - stream.QueryEx(fun v s -> v, projection s) + stream.QueryEx(fun c -> c.Version, projection c.State) member __.UpdateName(id, value) = execute id (Register value) member __.UpdateAddress(id, value) = execute id (UpdateAddress value) @@ -181,4 +181,4 @@ module FulfilmentCenterSummary = stream.Query(Option.map (fun s -> s.state)) member __.Update(id, version, value) = execute id (Update (version,value)) - member __.TryRead id : Async = read id \ No newline at end of file + member __.TryRead id : Async = read id diff --git a/src/Equinox.Cosmos/Cosmos.fs b/src/Equinox.Cosmos/Cosmos.fs index 1b2edf0f5..b64737e4f 100644 --- a/src/Equinox.Cosmos/Cosmos.fs +++ b/src/Equinox.Cosmos/Cosmos.fs @@ -1350,4 +1350,4 @@ module Events = /// Obtains the `index` from the current write Position let getNextIndex (ctx: Context) (streamName: string) : Async = - ctx.Sync(ctx.CreateStream streamName) |> stripPosition \ No newline at end of file + ctx.Sync(ctx.CreateStream streamName) |> stripPosition diff --git a/src/Equinox.EventStore/EventStore.fs b/src/Equinox.EventStore/EventStore.fs index 53825d59d..6ac9dfe04 100755 --- a/src/Equinox.EventStore/EventStore.fs +++ b/src/Equinox.EventStore/EventStore.fs @@ -332,7 +332,7 @@ module Token = let (*private*) ofCompactionEventNumber compactedEventNumberOption unstoredEventsPending batchSize streamName streamVersion : StreamToken = let batchCapacityLimit = batchCapacityLimit compactedEventNumberOption unstoredEventsPending batchSize streamVersion create compactedEventNumberOption (Some batchCapacityLimit) streamName streamVersion - + /// Assume we have not seen any compaction events; use the batchSize and version to infer headroom let ofUncompactedVersion batchSize streamName streamVersion : StreamToken = ofCompactionEventNumber None 0 batchSize streamName streamVersion diff --git a/src/Equinox.MemoryStore/MemoryStore.fs b/src/Equinox.MemoryStore/MemoryStore.fs index 2245b8643..794096e43 100644 --- a/src/Equinox.MemoryStore/MemoryStore.fs +++ b/src/Equinox.MemoryStore/MemoryStore.fs @@ -90,4 +90,4 @@ type Resolver<'event, 'state, 'Format, 'context>(store : VolatileStore<'Format>, /// Resolve from a Memento being used in a Continuation [based on position and state typically from Stream.CreateMemento] member __.FromMemento(Token.Unpack stream as streamToken, state, ?context) = - Stream.ofMemento (streamToken,state) (resolveStream stream.streamName context) \ No newline at end of file + Stream.ofMemento (streamToken,state) (resolveStream stream.streamName context) diff --git a/src/Equinox.SqlStreamStore/SqlStreamStore.fs b/src/Equinox.SqlStreamStore/SqlStreamStore.fs index 2149c8d6d..edad1e0fe 100644 --- a/src/Equinox.SqlStreamStore/SqlStreamStore.fs +++ b/src/Equinox.SqlStreamStore/SqlStreamStore.fs @@ -562,4 +562,4 @@ type ConnectorBase([]?readRetryPolicy, []?writeRetryPoli member __.Establish(appName) : Async = async { let! store = __.Connect() return Connection(readConnection=store, writeConnection=store, ?readRetryPolicy=readRetryPolicy, ?writeRetryPolicy=writeRetryPolicy) - } \ No newline at end of file + } diff --git a/src/Equinox/Equinox.fs b/src/Equinox/Equinox.fs index fe0511191..400e2a1a2 100755 --- a/src/Equinox/Equinox.fs +++ b/src/Equinox/Equinox.fs @@ -10,41 +10,50 @@ type MaxResyncsExhaustedException(count) = /// Central Application-facing API. Wraps the handling of decision or query flows in a manner that is store agnostic type Stream<'event, 'state> ( log, stream : IStream<'event, 'state>, maxAttempts : int, - [] ?mkAttemptsExhaustedException, + [] ?createAttemptsExhaustedException, [] ?resyncPolicy) = - let transact f = - let resyncPolicy = defaultArg resyncPolicy (fun _log _attemptNumber f -> async { return! f }) + let transact decide mapResult = + let resyncPolicy = defaultArg resyncPolicy (fun _log _attemptNumber resyncF -> async { return! resyncF }) let throwMaxResyncsExhaustedException attempts = MaxResyncsExhaustedException attempts - let handleResyncsExceeded = defaultArg mkAttemptsExhaustedException throwMaxResyncsExhaustedException - Flow.transact (maxAttempts, resyncPolicy, handleResyncsExceeded) (stream, log) f - - /// 0. Invoke the supplied `interpret` function with the present state - /// 1. Attempt to sync the accumulated events to the stream - /// Tries up to `maxAttempts` times in the case of a conflict, throwing `MaxResyncsExhaustedException` to signal failure. - member __.Transact(interpret : 'state -> 'event list) : Async = transact (fun state -> async { return (), interpret state }) - - /// 0. Invoke the supplied `decide` function with the present state - /// 1. Attempt to sync the accumulated events to the stream - /// 2. Yield result - /// Tries up to `maxAttempts` times in the case of a conflict, throwing `MaxResyncsExhaustedException` to signal failure. - member __.Transact(decide : 'state -> 'result * 'event list) : Async<'result> = transact (fun state -> async { return decide state }) - - /// 0. Invoke the supplied _Async_ `decide` function with the present state - /// 1. Attempt to sync the accumulated events to the stream - /// 2. Yield result - /// Tries up to `maxAttempts` times in the case of a conflict, throwing `MaxResyncsExhaustedException` to signal failure. - member __.TransactAsync(decide : 'state -> Async<'result * 'event list>) : Async<'result> = transact decide - - /// Project from the folded `State` without executing a decision flow as `Decide` does - member __.Query(projection : 'state -> 'view) : Async<'view> = Flow.query(stream, log, fun syncState -> projection syncState.State) - - /// Project from the folded `State` (with the current version of the stream supplied for context) without executing a decision flow as `Decide` does - member __.QueryEx(projection : int64 -> 'state -> 'view) : Async<'view> = Flow.query(stream, log, fun syncState -> projection syncState.Version syncState.State) - - /// Low-level helper to allow one to obtain a reference to a stream and state pair (including the position) in order to pass it as a continuation within the application - /// Such a memento is then held within the application and passed in lieu of a StreamId to the StreamResolver in order to avoid having to reload state - member __.CreateMemento() : Async = Flow.query(stream, log, fun syncState -> syncState.Memento) + let handleResyncsExceeded = defaultArg createAttemptsExhaustedException throwMaxResyncsExhaustedException + Flow.transact (maxAttempts, resyncPolicy, handleResyncsExceeded) (stream, log) decide mapResult + + /// 0. Invoke the supplied interpret function with the present state + /// 1a. (if events yielded) Attempt to sync the yielded events events to the stream + /// 1b. Tries up to maxAttempts times in the case of a conflict, throwing MaxResyncsExhaustedException to signal failure. + member __.Transact(interpret : 'state -> 'event list) : Async = + transact (fun state -> async { return (), interpret state }) (fun () _context -> ()) + + /// 0. Invoke the supplied decide function with the present state, holding the 'result + /// 1a. (if events yielded) Attempt to sync the yielded events events to the stream + /// 1b. Tries up to maxAttempts times in the case of a conflict, throwing MaxResyncsExhaustedException to signal failure. + /// 2. Yield result + member __.Transact(decide : 'state -> 'result * 'event list) : Async<'result> = + transact (fun state -> async { return decide state }) (fun result _context -> result) + + /// 0. Invoke the supplied _Async_ decide function with the present state, holding the 'result + /// 1a. (if events yielded) Attempt to sync the yielded events events to the stream + /// 1b. Tries up to maxAttempts times in the case of a conflict, throwing MaxResyncsExhaustedException to signal failure. + /// 2. Yield result + member __.TransactAsync(decide : 'state -> Async<'result * 'event list>) : Async<'result> = + transact decide (fun result _context -> result) + + /// 0. Invoke the supplied _Async_ decide function with the present state, holding the 'result + /// 1a. (if events yielded) Attempt to sync the yielded events events to the stream + /// 1b. Tries up to maxAttempts times in the case of a conflict, throwing MaxResyncsExhaustedException to signal failure. + /// 2. Uses mapResult to render the final outcome from the 'result and/or the final ISyncContext + /// 3. Yields the outcome + member __.TransactAsyncEx(decide : 'state -> Async<'result * 'event list>, mapResult : 'result -> ISyncContext<'state> -> 'resultEx) : Async<'resultEx> = + transact decide mapResult + + /// Project from the folded 'state, without executing a decision flow as Transact does + member __.Query(projection : 'state -> 'view) : Async<'view> = + Flow.query (stream, log, fun syncState -> projection (syncState :> ISyncContext<'state>).State) + + /// Project from the stream's 'state (including extended context), without executing a decision flow as Transact does + member __.QueryEx(projection : ISyncContext<'state> -> 'view) : Async<'view> = + Flow.query (stream, log, projection) /// Store-agnostic Context.Resolve Options type ResolveOption = diff --git a/src/Equinox/Flow.fs b/src/Equinox/Flow.fs index 3141bc453..93bc1af0c 100755 --- a/src/Equinox/Flow.fs +++ b/src/Equinox/Flow.fs @@ -20,14 +20,25 @@ type SyncResult<'state> = /// Store-agnostic interface representing interactions a Flow can have with the state of a given event stream. Not intended for direct use by consumer code. type IStream<'event, 'state> = /// Obtain the state from the target stream - abstract Load : log: ILogger - -> Async + abstract Load : log: ILogger -> Async /// Given the supplied `token` [and related `originState`], attempt to move to state `state'` by appending the supplied `events` to the underlying stream /// SyncResult.Written: implies the state is now the value represented by the Result's value /// SyncResult.Conflict: implies the `events` were not synced; if desired the consumer can use the included resync workflow in order to retry abstract TrySync : log: ILogger * token: StreamToken * originState: 'state * events: 'event list -> Async> +/// Exposed by TransactEx / QueryEx, providing access to extended state information for cases where that's required +type ISyncContext<'state> = + + /// Represents a Checkpoint position on a Stream's timeline; Can be used to manage continuations via a Resolver's FromMemento method + abstract member CreateMemento : unit -> StreamToken * 'state + + /// Exposes the underlying Store's internal Version/Index (which, depending on the Codec, may or may not be reflected in the last event presented) + abstract member Version : int64 + + /// The present State of the stream within the context of this Flow + abstract member State : 'state + /// Internal implementation of the Store agnostic load + run/render. See Equinox.fs for App-facing APIs. module internal Flow = @@ -37,11 +48,7 @@ module internal Flow = trySync : ILogger * StreamToken * 'state * 'event list -> Async>) = let mutable tokenAndState = originState - member __.Memento = tokenAndState - member __.State = snd __.Memento - member __.Version = (fst __.Memento).version - - member __.TryOr(log, events, handleFailureResync : (Async -> Async)) : Async = async { + let trySyncOr log events (handleFailureResync : Async -> Async) : Async = async { let! res = let token, state = tokenAndState in trySync (log,token,state,events) match res with | SyncResult.Conflict resync -> @@ -50,12 +57,19 @@ module internal Flow = tokenAndState <- token', streamState' return true } + interface ISyncContext<'state> with + member __.CreateMemento() = tokenAndState + member __.State = snd tokenAndState + member __.Version = (fst tokenAndState).version + + member __.TryWithoutResync(log : ILogger, events) : Async = + trySyncOr log events (fun _resync -> async { return false }) member __.TryOrResync(runResync, attemptNumber: int, log : ILogger, events) : Async = let resyncInPreparationForRetry resync = async { let! streamState' = runResync log attemptNumber resync tokenAndState <- streamState' return false } - __.TryOr(log, events, resyncInPreparationForRetry) + trySyncOr log events resyncInPreparationForRetry /// Process a command, ensuring a consistent final state is established on the stream. /// 1. make a decision predicated on the known state @@ -65,41 +79,41 @@ module internal Flow = let run (log : ILogger) (maxSyncAttempts : int, resyncRetryPolicy, createMaxAttemptsExhaustedException) (syncState : SyncState<'event, 'state>) (decide : 'state -> Async<'result * 'event list>) - : Async<'result> = + (mapResult : 'result -> SyncState<'event, 'state> -> 'resultEx) + : Async<'resultEx> = if maxSyncAttempts < 1 then raise <| System.ArgumentOutOfRangeException("maxSyncAttempts", maxSyncAttempts, "should be >= 1") /// Run a decision cycle - decide what events should be appended given the presented state - let rec loop attempt : Async<'result> = async { + let rec loop attempt : Async<'resultEx> = async { let log = if attempt = 1 then log else log.ForContext("syncAttempt", attempt) - let! result, events = decide syncState.State + let! result, events = decide (syncState :> ISyncContext<'state>).State if List.isEmpty events then log.Debug "No events generated" - return result + return mapResult result syncState elif attempt = maxSyncAttempts then // Special case: on final attempt, we won't be `resync`ing; we're giving up - let! committed = syncState.TryOr(log, events, fun _resync -> async { return false }) - + let! committed = syncState.TryWithoutResync(log, events) if not committed then log.Debug "Max Sync Attempts exceeded" return raise (createMaxAttemptsExhaustedException attempt) else - return result + return mapResult result syncState else let! committed = syncState.TryOrResync(resyncRetryPolicy, attempt, log, events) if not committed then log.Debug "Resyncing and retrying" return! loop (attempt + 1) else - return result } + return mapResult result syncState } /// Commence, processing based on the incoming state loop 1 - let transact (maxAttempts,resyncRetryPolicy,createMaxAttemptsExhaustedException) (stream : IStream<_, _>, log) decide : Async<'result> = async { + let transact (maxAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException) (stream : IStream<_, _>, log) decide mapResult : Async<'result> = async { let! streamState = stream.Load log let syncState = SyncState(streamState, stream.TrySync) - return! run log (maxAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException) syncState decide } + return! run log (maxAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException) syncState decide mapResult } let query (stream : IStream<'event, 'state>, log : ILogger, project: SyncState<'event, 'state> -> 'result) : Async<'result> = async { let! streamState = stream.Load log From f2368dc8c9d75e85c4bbd4179883103bbfd75de0 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Fri, 14 Feb 2020 11:58:07 +0000 Subject: [PATCH 2/2] Add PR id --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 70f969b59..dabab767a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,11 +10,11 @@ The `Unreleased` section name is replaced by the expected version of next releas ### Added -- `Stream.TransactAsyncEx` to expose `Core.ISyncContext` at conclusion of processing, enabling one to extract the post-state `Version` etc. (This paves the way for exposing [`SessionToken`](https://github.com/jet/equinox/issues/192) at a later point without a breaking change) +- `Stream.TransactAsyncEx`, exposing the `Core.ISyncContext` at conclusion of the sync operation, affording the ability to examine the post-state `Version` etc. (This paves the way for exposing [`SessionToken`](https://github.com/jet/equinox/issues/192) at a later point without a breaking change) [#194](https://github.com/jet/equinox/pull/194) ### Changed -- `Stream.QueryEx` to supply `Core.ISyncContext` in lieu of only exposing `Version` (to align with `TransactAsyncEx`) +- `Stream.QueryEx` to supply `Core.ISyncContext` in lieu of only exposing `Version` (to align with `TransactAsyncEx`) [#194](https://github.com/jet/equinox/pull/194) ### Removed ### Fixed