diff --git a/CHANGELOG.md b/CHANGELOG.md index fb17fd826..e6f4269fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,7 @@ The `Unreleased` section name is replaced by the expected version of next releas ### Removed +- Removed `netstandard2.1`; minimum target now `net6.0` [#407](https://github.com/jet/equinox/pull/407) - Remove explicit `net461` handling; minimum target now `net6.0` / `FSharp.Core` v `6.0.0` [#310](https://github.com/jet/equinox/pull/310) [#323](https://github.com/jet/equinox/pull/323) [#354](https://github.com/jet/equinox/pull/354) - Remove `Equinox.Core.ICache` (there is/was only one impl, and the interface has changed as part of [#386](https://github.com/jet/equinox/pull/386)) [#389](https://github.com/jet/equinox/pull/389) diff --git a/samples/Store/Domain/Domain.fsproj b/samples/Store/Domain/Domain.fsproj index d413b2001..5d71b0c18 100644 --- a/samples/Store/Domain/Domain.fsproj +++ b/samples/Store/Domain/Domain.fsproj @@ -1,7 +1,7 @@  - netstandard2.1 + net6.0 diff --git a/samples/Store/Integration/AutoDataAttribute.fs b/samples/Store/Integration/AutoDataAttribute.fs index b9d4da8ae..d0a32adf4 100644 --- a/samples/Store/Integration/AutoDataAttribute.fs +++ b/samples/Store/Integration/AutoDataAttribute.fs @@ -2,6 +2,22 @@ open System +open Domain +open FsCheck.FSharp +open FSharp.UMX + +module ArbMap = + let defaultGen<'t> = ArbMap.defaults |> ArbMap.generate<'t> + +type FsCheckGenerators = + static member SkuId = ArbMap.defaultGen |> Gen.map SkuId |> Arb.fromGen + static member ContactPreferencesId = + ArbMap.defaultGen + |> Gen.map (fun x -> sprintf "%s@test.com" (x.ToString("N"))) + |> Gen.map ContactPreferences.ClientId + |> Arb.fromGen + static member RequestId = ArbMap.defaultGen |> Gen.map (fun x -> RequestId.parse %x) |> Arb.fromGen + type AutoDataAttribute() = inherit FsCheck.Xunit.PropertyAttribute(Arbitrary = [|typeof|], MaxTest = 1, QuietOnSuccess = true) diff --git a/samples/Store/Integration/Infrastructure.fs b/samples/Store/Integration/Infrastructure.fs deleted file mode 100644 index a3aecccfd..000000000 --- a/samples/Store/Integration/Infrastructure.fs +++ /dev/null @@ -1,16 +0,0 @@ -namespace global - -open Domain -open FsCheck.FSharp -open System - -module Arb = - let generate<'t> = ArbMap.defaults |> ArbMap.generate<'t> - -type FsCheckGenerators = - static member SkuId = Arb.generate |> Gen.map SkuId |> Arb.fromGen - static member ContactPreferencesId = - Arb.generate - |> Gen.map (fun x -> sprintf "%s@test.com" (x.ToString("N"))) - |> Gen.map ContactPreferences.ClientId - |> Arb.fromGen diff --git a/samples/Store/Integration/Integration.fsproj b/samples/Store/Integration/Integration.fsproj index 501d81d4e..cb5a6825e 100644 --- a/samples/Store/Integration/Integration.fsproj +++ b/samples/Store/Integration/Integration.fsproj @@ -6,7 +6,6 @@ - diff --git a/samples/TodoBackend/TodoBackend.fsproj b/samples/TodoBackend/TodoBackend.fsproj index cb9056ba3..617e2e9c7 100644 --- a/samples/TodoBackend/TodoBackend.fsproj +++ b/samples/TodoBackend/TodoBackend.fsproj @@ -1,7 +1,7 @@  - netstandard2.1 + net6.0 diff --git a/src/Equinox.Core/Cache.fs b/src/Equinox.Core/Cache.fs index 748c2e705..a8f7f8f2a 100755 --- a/src/Equinox.Core/Cache.fs +++ b/src/Equinox.Core/Cache.fs @@ -34,19 +34,22 @@ type private CacheEntry<'state>(initialToken: StreamToken, initialState: 'state, /// Coordinates having a max of one in-flight request across all staleness-tolerant loads at all times // Follows high level flow of AsyncCacheCell.Await - read the comments there, and 
the AsyncCacheCell tests first! member x.ReadThrough(maxAge: TimeSpan, isStale, load: Func<_, _>) = task { + let act = System.Diagnostics.Activity.Current + let setCacheHit hit = if act <> null then act.SetTag(Tags.cache_hit, hit) |> ignore let cacheEntryValidityCheckTimestamp = System.Diagnostics.Stopwatch.GetTimestamp() - let isWithinMaxAge cachedValueTimestamp = Stopwatch.TicksToSeconds(cacheEntryValidityCheckTimestamp - cachedValueTimestamp) <= maxAge.TotalSeconds + let age timestamp = Stopwatch.TicksToSeconds(cacheEntryValidityCheckTimestamp - timestamp) + if act <> null then act.SetTag(Tags.cache_age, 1000. * age verifiedTimestamp) |> ignore + let isWithinMaxAge cachedValueTimestamp = age cachedValueTimestamp <= maxAge.TotalSeconds let fetchStateConsistently () = struct (cell, tryGet (), isWithinMaxAge verifiedTimestamp) match lock x fetchStateConsistently with - | _, ValueSome cachedValue, true -> - return cachedValue + | _, ValueSome cachedValue, true -> setCacheHit true; return cachedValue | ourInitialCellState, maybeBaseState, _ -> // If it's not good enough for us, trigger a request (though someone may have beaten us to that) + setCacheHit false // Inspect/await any concurrent attempt to see if it is sufficient for our needs match! ourInitialCellState.TryAwaitValid() with | ValueSome (fetchCommencedTimestamp, res) when isWithinMaxAge fetchCommencedTimestamp -> return res | _ -> - // .. it wasn't; join the race to dispatch a request (others following us will share our fate via the TryAwaitValid) let newInstance = AsyncLazy(fun () -> load.Invoke maybeBaseState) let _ = Interlocked.CompareExchange(&cell, newInstance, ourInitialCellState) @@ -81,13 +84,12 @@ type Cache private (inner: System.Runtime.Caching.MemoryCache) = // if there's a non-zero maxAge, concurrent read attempts share the roundtrip (and its fate, if it throws) member internal _.Load(key, maxAge, isStale, policy, loadOrReload, ct) = task { let loadOrReload maybeBaseState = task { - let act = System.Diagnostics.Activity.Current - if act <> null then act.AddCacheHit(ValueOption.isSome maybeBaseState) |> ignore let ts = System.Diagnostics.Stopwatch.GetTimestamp() let! res = loadOrReload ct maybeBaseState return struct (ts, res) } if maxAge = TimeSpan.Zero then // Boring algorithm that has each caller independently load/reload the data and then cache it let maybeBaseState = tryLoad key + let act = System.Diagnostics.Activity.Current in act.SetTag(Tags.cache_hit, ValueOption.isSome maybeBaseState) |> ignore let! timestamp, res = loadOrReload maybeBaseState addOrMergeCacheEntry isStale key policy timestamp res return res diff --git a/src/Equinox.Core/Category.fs b/src/Equinox.Core/Category.fs index fa7c2dc4c..ef6f47e1c 100755 --- a/src/Equinox.Core/Category.fs +++ b/src/Equinox.Core/Category.fs @@ -24,8 +24,6 @@ type ICategory<'event, 'state, 'context> = // Low level stream impl, used by Store-specific Category types that layer policies such as Caching in namespace Equinox -open Equinox.Core.Tracing -open System.Diagnostics open System.Threading open System.Threading.Tasks @@ -42,12 +40,10 @@ type Category<'event, 'state, 'context> member _.LoadEmpty() = empty member _.Load(maxAge, requireLeader, ct) = task { - use act = source.StartActivity("Load", ActivityKind.Client) - if act <> null then act.AddStream(categoryName, streamId, streamName).AddLeader(requireLeader).AddStale(maxAge) |> ignore + Equinox.Core.Tracing.Load.setTags(categoryName, streamId, streamName, requireLeader, maxAge) return! 
inner.Load(log, categoryName, streamId, streamName, maxAge, requireLeader, ct) } member _.TrySync(attempt, (token, originState), events, ct) = task { - use act = source.StartActivity("TrySync", ActivityKind.Client) - if act <> null then act.AddStream(categoryName, streamId, streamName).AddSyncAttempt(attempt) |> ignore + Equinox.Core.Tracing.TrySync.setTags(attempt, events) let log = if attempt = 1 then log else log.ForContext("attempts", attempt) return! inner.TrySync(log, categoryName, streamId, streamName, context, init, token, originState, events, ct) } } diff --git a/src/Equinox.Core/Equinox.Core.fsproj b/src/Equinox.Core/Equinox.Core.fsproj index 43965761a..1a5fd44b0 100644 --- a/src/Equinox.Core/Equinox.Core.fsproj +++ b/src/Equinox.Core/Equinox.Core.fsproj @@ -5,7 +5,6 @@ - diff --git a/src/Equinox.Core/Tracing.fs b/src/Equinox.Core/Tracing.fs deleted file mode 100644 index 1c8916d41..000000000 --- a/src/Equinox.Core/Tracing.fs +++ /dev/null @@ -1,40 +0,0 @@ -module Equinox.Core.Tracing - -open System.Diagnostics - -let source = new ActivitySource("Equinox") - -[] -type ActivityExtensions private () = - - [] - static member AddLeader(act: Activity, requiresLeader) = - if requiresLeader then act.AddTag("eqx.requires_leader", true) else act - - [] - static member AddRetryAttempt(act: Activity, attempt: int) = - if attempt > 1 then act.AddTag("eqx.retry_count", attempt - 1) else act - - [] - static member AddSyncAttempt(act: Activity, attempt: int) = - if attempt > 1 then act.AddTag("eqx.resync_count", attempt - 1) else act - - [] - static member AddStale(act: Activity, maxAge: System.TimeSpan) = - if maxAge.Ticks <> 0L then act.AddTag("eqx.allow_stale", true) else act - - [] - static member AddStream(act: Activity, category: string, streamId: string, streamName: string) = - act.AddTag("eqx.stream_name", streamName).AddTag("eqx.stream_id", streamId).AddTag("eqx.category", category) - - [] - static member AddCacheHit(act: Activity, hit: bool) = - act.AddTag("eqx.cache_hit", hit) - - [] - static member IncMetric(act: Activity, count: int, bytes: int) = - let currentCount = act.GetTagItem("eqx.count") |> ValueOption.ofObj |> ValueOption.map unbox |> ValueOption.defaultValue 0 - let currentBytes = act.GetTagItem("eqx.bytes") |> ValueOption.ofObj |> ValueOption.map unbox |> ValueOption.defaultValue 0 - let count = count + currentCount - let bytes = bytes + currentBytes - act.SetTag("eqx.count", count).SetTag("eqx.bytes", bytes) diff --git a/src/Equinox.MessageDb/Equinox.MessageDb.fsproj b/src/Equinox.MessageDb/Equinox.MessageDb.fsproj index 4b193a802..58923da17 100644 --- a/src/Equinox.MessageDb/Equinox.MessageDb.fsproj +++ b/src/Equinox.MessageDb/Equinox.MessageDb.fsproj @@ -8,11 +8,7 @@ Infrastructure.fs - - Internal.fs - - diff --git a/src/Equinox.MessageDb/MessageDb.fs b/src/Equinox.MessageDb/MessageDb.fs index 7c93cf018..9a17a814b 100644 --- a/src/Equinox.MessageDb/MessageDb.fs +++ b/src/Equinox.MessageDb/MessageDb.fs @@ -4,260 +4,112 @@ open Equinox.Core open Equinox.Core.Tracing open Equinox.MessageDb.Core open FsCodec -open Serilog open System -open System.Diagnostics open System.Text.Json open System.Threading open System.Threading.Tasks type EventBody = ReadOnlyMemory -module Log = - - /// Name of Property used for Metric in LogEvents. 
- let [] PropertyTag = "mdbEvt" - - [] - type Measurement = { stream: string; interval: StopwatchInterval; bytes: int; count: int } - [] - type Metric = - | Slice of Measurement - | Batch of slices: int * Measurement - | ReadLast of Measurement - | WriteSuccess of Measurement - | WriteConflict of Measurement - let [] (|MetricEvent|_|) (logEvent: Serilog.Events.LogEvent): Metric voption = - let mutable p = Unchecked.defaultof<_> - logEvent.Properties.TryGetValue(PropertyTag, &p) |> ignore - match p with Log.ScalarValue (:? Metric as e) -> ValueSome e | _ -> ValueNone - - /// Attach a property to the captured event record to hold the metric information - let internal event (value: Metric) = Internal.Log.withScalarProperty PropertyTag value - let prop name value (log: ILogger) = log.ForContext(name, value) - let bytesToString (bytes: EventBody) = System.Text.Encoding.UTF8.GetString bytes.Span - let propEvents name (kvps: System.Collections.Generic.KeyValuePair seq) (log: ILogger) = - let items = seq { for kv in kvps do yield sprintf "{\"%s\": %s}" kv.Key kv.Value } - log.ForContext(name, sprintf "[%s]" (String.concat ",\n\r" items)) - let propEventData name (events: IEventData[]) (log: ILogger) = - log |> propEvents name (seq { - for x in events do - yield System.Collections.Generic.KeyValuePair<_, _>(x.EventType, bytesToString x.Data) }) - let propResolvedEvents name (events: ITimelineEvent[]) (log: ILogger) = - log |> propEvents name (seq { - for x in events do - yield System.Collections.Generic.KeyValuePair<_, _>(x.EventType, bytesToString x.Data) }) - let withLoggedRetries<'t> retryPolicy (contextLabel: string) (f: ILogger -> CancellationToken -> Task<'t>) log ct: Task<'t> = +module Activity = + let setTags (tags: (string * obj)[]) = + let act = System.Diagnostics.Activity.Current + if act <> null then for k,v in tags do act.SetTag(k, v) |> ignore + +module Retry = + let withSpanTag<'t> retryPolicy (f: CancellationToken -> Task<'t>) ct: Task<'t> = match retryPolicy with - | None -> f log ct + | None -> f ct | Some retryPolicy -> - let withLoggingContextWrapping count = - let log = if count = 1 then log else log |> prop contextLabel count - let act = Activity.Current in if act <> null then act.AddRetryAttempt(count) |> ignore - f log - retryPolicy withLoggingContextWrapping - - /// NB Caveat emptor; this is subject to unlimited change without the major version changing - while the `dotnet-templates` repo will be kept in step, and - /// the ChangeLog will mention changes, it's critical to not assume that the presence or nature of these helpers be considered stable - module InternalMetrics = - - module Stats = - let inline (|Stats|) ({ interval = i }: Measurement) = int64 i.ElapsedMilliseconds - - let (|Read|ReadL|Write|Resync|Rollup|) = function - | Slice (Stats s) -> Read s - // slices are rolled up into batches so be sure not to double-count - | Batch (_, Stats s) -> Rollup s - | ReadLast (Stats s) -> ReadL s - | WriteSuccess (Stats s) -> Write s - | WriteConflict (Stats s) -> Resync s - type Counter = - { mutable count: int64; mutable ms: int64 } - static member Create() = { count = 0L; ms = 0L } - member x.Ingest(ms) = - Interlocked.Increment(&x.count) |> ignore - Interlocked.Add(&x.ms, ms) |> ignore - type LogSink() = - static let epoch = Stopwatch.StartNew() - static member val Read = Counter.Create() with get, set - static member val ReadL = Counter.Create() with get, set - static member val Write = Counter.Create() with get, set - static member val Resync = Counter.Create() with 
get, set - static member Restart() = - LogSink.Read <- Counter.Create() - LogSink.ReadL <- Counter.Create() - LogSink.Write <- Counter.Create() - LogSink.Resync <- Counter.Create() - let span = epoch.Elapsed - epoch.Restart() - span - interface Serilog.Core.ILogEventSink with - member _.Emit logEvent = logEvent |> function - | MetricEvent (Read stats) -> LogSink.Read.Ingest stats - | MetricEvent (ReadL stats) -> LogSink.ReadL.Ingest stats - | MetricEvent (Write stats) -> LogSink.Write.Ingest stats - | MetricEvent (Resync stats) -> LogSink.Resync.Ingest stats - | MetricEvent (Rollup _) -> () - | _ -> () - - /// Relies on feeding of metrics from Log through to Stats.LogSink - /// Use Stats.LogSink.Restart() to reset the start point (and stats) where relevant - let dump (log: ILogger) = - let stats = - [ "Read", Stats.LogSink.Read - "ReadL", Stats.LogSink.ReadL - "Write", Stats.LogSink.Write - "Resync", Stats.LogSink.Resync ] - let logActivity name count lat = - log.Information("{name}: {count:n0} requests; Average latency: {lat:n0}ms", - name, count, (if count = 0L then Double.NaN else float lat/float count)) - let mutable rows, totalCount, totalMs = 0, 0L, 0L - for name, stat in stats do - if stat.count <> 0L then - totalCount <- totalCount + stat.count - totalMs <- totalMs + stat.ms - logActivity name stat.count stat.ms - rows <- rows + 1 - // Yes, there's a minor race here between the use of the values and the reset - let duration = Stats.LogSink.Restart() - if rows > 1 then logActivity "TOTAL" totalCount totalMs - let measures: (string * (TimeSpan -> float)) list = [ "s", fun x -> x.TotalSeconds(*; "m", fun x -> x.TotalMinutes; "h", fun x -> x.TotalHours*) ] - let logPeriodicRate name count = log.Information("rp{name} {count:n0}", name, count) - for uom, f in measures do let d = f duration in if d <> 0. then logPeriodicRate uom (float totalCount/d |> int64) + let withRetryTag count = + Activity.setTags [|Tags.append_retries, count|] + f + retryPolicy withRetryTag module private Write = - let private writeEventsAsync (writer: MessageDbWriter) streamName version events ct: Task = - writer.WriteMessages(streamName, events, version, ct) let inline len (bytes: EventBody) = bytes.Length let private eventDataLen (x: IEventData) = len x.Data + len x.Meta let private eventDataBytes events = events |> Array.sumBy eventDataLen - let private writeEventsLogged writer streamName version events (log: ILogger) ct: Task = task { - let act = Activity.Current - let log = if not (log.IsEnabled Events.LogEventLevel.Debug) then log else log |> Log.propEventData "Json" events - let bytes, count = eventDataBytes events, events.Length - let log = log |> Log.prop "bytes" bytes - if act <> null then act.AddExpectedVersion(version).IncMetric(count, bytes) |> ignore - let! 
t, result = writeEventsAsync writer streamName version events |> Stopwatch.time ct - let reqMetric: Log.Measurement = { stream = streamName; interval = t; bytes = bytes; count = count} - let resultLog, evt = - match result with - | MdbSyncResult.Written x -> - if act <> null then - act.SetStatus(ActivityStatusCode.Ok).AddTag("eqx.new_version", x) |> ignore - log |> Log.prop "currentPosition" x, Log.WriteSuccess reqMetric - | MdbSyncResult.ConflictUnknown -> - let eventTypes = [| for x in events -> x.EventType |] - if act <> null then act.RecordConflict().AddTag("eqx.event_types", eventTypes) |> ignore - let writeLog = log |> Log.prop "stream" streamName |> Log.prop "count" count - writeLog.Information("MdbTrySync WrongExpectedVersionException writing {eventTypes}, expected {expectedVersion}", eventTypes, version) - log, Log.WriteConflict reqMetric - (resultLog |> Log.event evt).Information("Mdb{action:l} count={count} conflict={conflict}", - "Write", count, match evt with Log.WriteConflict _ -> true | _ -> false) + let private writeEventsLogged (writer: MessageDbWriter) streamName version events ct: Task = task { + Activity.setTags [|Tags.append_bytes, eventDataBytes events|] + let! result = writer.WriteMessages(streamName, events, version, ct) + match result with + | MdbSyncResult.Written x -> Activity.setTags [|Tags.append_version, x|] + | MdbSyncResult.ConflictUnknown -> + let eventTypes = + if events.Length <= 3 + then [| for x in events -> x.EventType |] + else [| for x in Seq.take 3 events -> x.EventType |] + Activity.setTags [|Tags.conflict, true; Tags.append_types, eventTypes|] return result } - let writeEvents log retryPolicy writer (category, streamId, streamName) version events ct: Task = task { + let writeEvents retryPolicy writer streamName version events ct: Task = task { let call = writeEventsLogged writer streamName version events - return! Log.withLoggedRetries retryPolicy "writeAttempt" call log ct } + return! Retry.withSpanTag retryPolicy call ct } module Read = + module LoadMethod = + [] + let last = "Last" + [] + let batchForward = "BatchForward" + [] type StreamEventsSlice = { Messages: ITimelineEvent[]; IsEnd: bool; LastVersion: int64 } - let private toSlice (events: ITimelineEvent[]) isLast: StreamEventsSlice= + let private toSlice (events: ITimelineEvent[]) isLast: StreamEventsSlice = let lastVersion = match Array.tryLast events with Some ev -> ev.Index | None -> -1L { Messages = events; IsEnd = isLast; LastVersion = lastVersion } - let private readSliceAsync (reader: MessageDbReader) (streamName: string) (batchSize: int64) (startPos: int64) (requiresLeader: bool) ct = task { - let! page = reader.ReadStream(streamName, startPos, batchSize, requiresLeader, ct) + let private readSliceAsync (reader: MessageDbReader) (streamName: string) (batchSize: int64) (requiresLeader: bool) (fromVersion: int64) ct = task { + let! page = reader.ReadStream(streamName, fromVersion, batchSize, requiresLeader, ct) let isLast = int64 page.Length < batchSize return toSlice page isLast } let private readLastEventAsync (reader: MessageDbReader) (streamName: string) (requiresLeader: bool) (eventType: string option) ct = task { let! 
events = reader.ReadLastEvent(streamName, requiresLeader, ct, ?eventType = eventType) - return toSlice events false } + return toSlice events true } let inline len (bytes: EventBody) = bytes.Length let private resolvedEventLen (x: ITimelineEvent) = len x.Data + len x.Meta let private resolvedEventBytes events = events |> Array.sumBy resolvedEventLen - let private loggedReadSlice reader streamName batchSize requiresLeader startPos batchIndex (log: ILogger) ct: Task<_> = task { - let act = Activity.Current - let! t, slice = readSliceAsync reader streamName batchSize startPos requiresLeader |> Stopwatch.time ct - let bytes, count = slice.Messages |> resolvedEventBytes, slice.Messages.Length - let reqMetric: Log.Measurement = { stream = streamName; interval = t; bytes = bytes; count = count} - let evt = Log.Slice reqMetric - if act <> null then act.IncMetric(count, bytes).AddLastVersion(slice.LastVersion) |> ignore - let log = if not (log.IsEnabled Events.LogEventLevel.Debug) then log else log |> Log.propResolvedEvents "Json" slice.Messages - (log |> Log.prop "startPos" startPos |> Log.prop "bytes" bytes |> Log.event evt).Information("Mdb{action:l} count={count} version={version}", - "Read", count, slice.LastVersion) - return slice } - - let private readBatches (log: ILogger) fold originState tryDecode (readSlice: int64 -> int -> ILogger -> CancellationToken -> Task) - (maxPermittedBatchReads: int option) (startPosition: int64) ct - : Task = - let mutable batchCount, eventCount, pos = 0, 0, startPosition + let private readBatches fold originState tryDecode (readSlice: int64 -> CancellationToken -> Task) + (maxPermittedBatchReads: int option) (fromVersion: int64) ct + : Task = + let mutable batchCount, eventCount, pos = 0, 0, fromVersion let mutable version = -1L let mutable state = originState let rec loop () : Task = task { - match maxPermittedBatchReads with - | Some mpbr when batchCount >= mpbr -> log.Information "batch Limit exceeded"; invalidOp "batch Limit exceeded" - | _ -> () - - let batchLog = log |> Log.prop "batchIndex" batchCount - let! slice = readSlice pos batchCount batchLog ct + let! slice = readSlice pos ct version <- max version slice.LastVersion state <- slice.Messages |> Seq.chooseV tryDecode |> fold state batchCount <- batchCount + 1 eventCount <- eventCount + slice.Messages.Length pos <- slice.LastVersion + 1L if not slice.IsEnd then + match maxPermittedBatchReads with + | Some mpbr when batchCount >= mpbr -> invalidOp "batch Limit exceeded" + | _ -> () return! loop () } task { do! 
loop () - let act = Activity.Current - if act <> null then act.AddBatches(batchCount).AddLastVersion(version) |> ignore - return version, state, batchCount, eventCount } - - let private logBatchRead streamName batches events t version (log: ILogger) = - let reqMetric: Log.Measurement = { stream = streamName; interval = t; bytes = 0; count = 0} - let action = "Load" - let evt = Log.Metric.Batch (1, reqMetric) - (log |> Log.event evt).Information( - "Mdb{action:l} stream={stream} count={count}/{batches} version={version}", - action, streamName, events, batches, version) - - let private logLastEventRead streamName t events (version: int64) (log: ILogger) = - let bytes = resolvedEventBytes events - let count = events.Length - let reqMetric: Log.Measurement = { stream = streamName; interval = t; bytes = bytes; count = count} - let evt = Log.Metric.ReadLast reqMetric - let act = Activity.Current - if act <> null then act.IncMetric(count, bytes).AddLastVersion(version) |> ignore - (log |> Log.prop "bytes" bytes |> Log.event evt).Information( - "Mdb{action:l} stream={stream} count={count} version={version}", - "ReadL", streamName, count, version) - - let internal loadLastEvent (log: ILogger) retryPolicy (reader: MessageDbReader) requiresLeader streamName eventType ct - : Task[]> = task { - let act = Activity.Current - if act <> null then act.AddLoadMethod("Last") |> ignore - let read _ = readLastEventAsync reader streamName requiresLeader eventType - - let! t, page = Log.withLoggedRetries retryPolicy "readAttempt" read log |> Stopwatch.time ct + Activity.setTags [|Tags.batches, batchCount; Tags.loaded_count, eventCount; Tags.read_version, version|] + return version, state } - log |> logLastEventRead streamName t page.Messages page.LastVersion + let internal loadLastEvent retryPolicy (reader: MessageDbReader) requiresLeader streamName eventType ct + : Task[]> = task { + let read = readLastEventAsync reader streamName requiresLeader eventType + let! page = Retry.withSpanTag retryPolicy read ct + Activity.setTags [|Tags.load_method, LoadMethod.last; Tags.loaded_count, page.Messages.Length; Tags.loaded_bytes, resolvedEventBytes page.Messages|] return page.LastVersion, page.Messages } - let internal loadForwardsFrom (log: ILogger) fold initial tryDecode retryPolicy reader batchSize maxPermittedBatchReads streamName startPosition requiresLeader ct + let internal loadForwardsFrom fold initial tryDecode retryPolicy reader batchSize maxPermittedBatchReads streamName (fromVersion: int64) requiresLeader ct : Task = task { - let act = Activity.Current - if act <> null then act.AddBatchSize(batchSize).AddStartPosition(startPosition).AddLoadMethod("BatchForward") |> ignore - let call = loggedReadSlice reader streamName batchSize requiresLeader - let retryingLoggingReadSlice pos batchIndex = Log.withLoggedRetries retryPolicy "readAttempt" (call pos batchIndex) - let log = log |> Log.prop "batchSize" batchSize |> Log.prop "stream" streamName - let! t, (version, state, batchCount, eventCount) = readBatches log fold initial tryDecode retryingLoggingReadSlice maxPermittedBatchReads startPosition |> Stopwatch.time ct - log |> logBatchRead streamName batchCount eventCount t version - return version, state } + let call = readSliceAsync reader streamName batchSize requiresLeader + let retryingReadSlice version = Retry.withSpanTag retryPolicy (call version) + Activity.setTags [|Tags.batch_size, batchSize; Tags.loaded_from_version, fromVersion; Tags.load_method, LoadMethod.batchForward|] + return! 
readBatches fold initial tryDecode retryingReadSlice maxPermittedBatchReads fromVersion ct } module private Token = @@ -323,38 +175,39 @@ type MessageDbContext(client: MessageDbClient, batchOptions: BatchOptions) = member val BatchOptions = batchOptions member _.TokenEmpty = Token.create -1L - member _.LoadBatched(log, streamName, requireLeader, tryDecode, fold, initial, ct): Task = task { - let! version, state = Read.loadForwardsFrom log fold initial tryDecode client.ReadRetryPolicy client.Reader batchOptions.BatchSize batchOptions.MaxBatches streamName 0L requireLeader ct + member _.LoadBatched(streamName, requireLeader, tryDecode, fold, initial, ct): Task = task { + let! version, state = Read.loadForwardsFrom fold initial tryDecode client.ReadRetryPolicy client.Reader batchOptions.BatchSize batchOptions.MaxBatches streamName 0L requireLeader ct return struct(Token.create version, state) } - member _.LoadLast(log, streamName, requireLeader, tryDecode, fold, initial, ct): Task = task { - let! version, events = Read.loadLastEvent log client.ReadRetryPolicy client.Reader requireLeader streamName None ct + member _.LoadLast(streamName, requireLeader, tryDecode, fold, initial, ct): Task = task { + let! version, events = Read.loadLastEvent client.ReadRetryPolicy client.Reader requireLeader streamName None ct return struct(Token.create version, events |> Seq.chooseV tryDecode |> fold initial) } - member _.LoadSnapshot(log, category, streamId, requireLeader, tryDecode, eventType, ct) = task { + member _.LoadSnapshot(category, streamId, requireLeader, tryDecode, eventType, ct) = task { let snapshotStream = Snapshot.streamName category streamId - let! _, events = Read.loadLastEvent log client.ReadRetryPolicy client.Reader requireLeader snapshotStream (Some eventType) ct - return Snapshot.decode tryDecode events } + let! _, events = Read.loadLastEvent client.ReadRetryPolicy client.Reader requireLeader snapshotStream (Some eventType) ct + let decoded = Snapshot.decode tryDecode events + let version = match decoded with ValueNone -> -1L | ValueSome (t, _) -> t.version + Activity.setTags [|Tags.snapshot_version, version|] + return decoded } - member _.Reload(log, streamName, requireLeader, token, tryDecode, fold, initial, ct): Task = task { + member _.Reload(streamName, requireLeader, token, tryDecode, fold, initial, ct): Task = task { let streamVersion = Token.streamVersion token - let startPos = streamVersion + 1L // Reading a stream uses {inclusive} positions, but the streamVersion is `-1`-based - let! version, state = Read.loadForwardsFrom log fold initial tryDecode client.ReadRetryPolicy client.Reader batchOptions.BatchSize batchOptions.MaxBatches streamName startPos requireLeader ct + let fromVersion = streamVersion + 1L // Reading a stream uses {inclusive} positions, but the streamVersion is `-1`-based + let! version, state = Read.loadForwardsFrom fold initial tryDecode client.ReadRetryPolicy client.Reader batchOptions.BatchSize batchOptions.MaxBatches streamName fromVersion requireLeader ct return struct(Token.create (max streamVersion version), state) } - member internal _.TrySync(log, category, streamId, streamName, token, encodedEvents: IEventData[], ct): Task = task { + member internal _.TrySync(_category, _streamId, streamName, token, encodedEvents: IEventData[], ct): Task = task { let streamVersion = Token.streamVersion token - match! Write.writeEvents log client.WriteRetryPolicy client.Writer (category, streamId, streamName) (StreamVersion streamVersion) encodedEvents ct with + match! 
Write.writeEvents client.WriteRetryPolicy client.Writer streamName (StreamVersion streamVersion) encodedEvents ct with | MdbSyncResult.Written version' -> let token = Token.create version' return GatewaySyncResult.Written token | MdbSyncResult.ConflictUnknown -> return GatewaySyncResult.ConflictUnknown } - member _.StoreSnapshot(log, category, streamId, event, ct) = task { + member _.StoreSnapshot(category, streamId, event, ct) = task { let snapshotStream = Snapshot.streamName category streamId - let category = Snapshot.snapshotCategory category - let act = Activity.Current - if act <> null then act.SetTag("eqx.snapshot_written", true) |> ignore - do! Write.writeEvents log None client.Writer (category, streamId, snapshotStream) Any [| event |] ct :> Task } + Activity.setTags [|Tags.snapshot_written, true|] + do! Write.writeEvents None client.Writer snapshotStream Any [| event |] ct :> Task } [] type AccessStrategy<'event, 'state> = @@ -374,42 +227,42 @@ type AccessStrategy<'event, 'state> = | AdjacentSnapshots of snapshotEventCaseName: string * toSnapshot: ('state -> 'event) type private Category<'event, 'state, 'context>(context: MessageDbContext, codec: IEventCodec<_, _, 'context>, fold, initial, access) = - let loadAlgorithm log category streamId streamName requireLeader ct = + let loadAlgorithm category streamId streamName requireLeader ct = match access with - | None -> context.LoadBatched(log, streamName, requireLeader, codec.TryDecode, fold, initial, ct) - | Some AccessStrategy.LatestKnownEvent -> context.LoadLast(log, streamName, requireLeader, codec.TryDecode, fold, initial, ct) + | None -> context.LoadBatched(streamName, requireLeader, codec.TryDecode, fold, initial, ct) + | Some AccessStrategy.LatestKnownEvent -> context.LoadLast(streamName, requireLeader, codec.TryDecode, fold, initial, ct) | Some (AccessStrategy.AdjacentSnapshots (snapshotType, _)) -> task { - match! context.LoadSnapshot(log, category, streamId, requireLeader, codec.TryDecode, snapshotType, ct) with + match! context.LoadSnapshot(category, streamId, requireLeader, codec.TryDecode, snapshotType, ct) with | ValueSome (pos, snapshotEvent) -> let state = fold initial [| snapshotEvent |] - let! token, state = context.Reload(log, streamName, requireLeader, pos, codec.TryDecode, fold, state, ct) + let! token, state = context.Reload(streamName, requireLeader, pos, codec.TryDecode, fold, state, ct) return struct(token, state) - | ValueNone -> return! context.LoadBatched(log, streamName, requireLeader, codec.TryDecode, fold, initial, ct) } - let reload (log, sn, leader, token, state) ct = context.Reload(log, sn, leader, token, codec.TryDecode, fold, state, ct) + | ValueNone -> return! 
context.LoadBatched(streamName, requireLeader, codec.TryDecode, fold, initial, ct) } + let reload (sn, leader, token, state) ct = context.Reload(sn, leader, token, codec.TryDecode, fold, state, ct) interface ICategory<'event, 'state, 'context> with - member _.Load(log, categoryName, streamId, streamName, _maxAge, requireLeader, ct) = - loadAlgorithm log categoryName streamId streamName requireLeader ct - member x.TrySync(log, categoryName, streamId, streamName, ctx, _maybeInit, token, state, events, ct) = task { + member _.Load(_log, categoryName, streamId, streamName, _maxAge, requireLeader, ct) = + loadAlgorithm categoryName streamId streamName requireLeader ct + member x.TrySync(_log, categoryName, streamId, streamName, ctx, _maybeInit, token, state, events, ct) = task { let encode e = codec.Encode(ctx, e) let encodedEvents: IEventData[] = events |> Array.map encode - match! context.TrySync(log, categoryName, streamId, streamName, token, encodedEvents, ct) with + match! context.TrySync(categoryName, streamId, streamName, token, encodedEvents, ct) with | GatewaySyncResult.Written token' -> let state' = fold state (Seq.ofArray events) match access with | None | Some AccessStrategy.LatestKnownEvent -> () | Some (AccessStrategy.AdjacentSnapshots(_, toSnap)) -> if Token.shouldSnapshot context.BatchOptions.BatchSize token token' then - do! x.StoreSnapshot(log, categoryName, streamId, ctx, token', toSnap state', ct) + do! x.StoreSnapshot(categoryName, streamId, ctx, token', toSnap state', ct) return SyncResult.Written (token', state') | GatewaySyncResult.ConflictUnknown -> - return SyncResult.Conflict (reload (log, streamName, (*requireLeader*)true, token, state)) } - interface Caching.IReloadable<'state> with member _.Reload(log, sn, leader, token, state, ct) = reload (log, sn, leader, token, state) ct + return SyncResult.Conflict (reload (streamName, (*requireLeader*)true, token, state)) } + interface Caching.IReloadable<'state> with member _.Reload(_log, sn, leader, token, state, ct) = reload (sn, leader, token, state) ct - member _.StoreSnapshot(log, category, streamId, ctx, token, snapshotEvent, ct) = + member _.StoreSnapshot(category, streamId, ctx, token, snapshotEvent, ct) = let encodedWithMeta = let rawEvent = codec.Encode(ctx, snapshotEvent) FsCodec.Core.EventData.Create(rawEvent.EventType, rawEvent.Data, meta = Snapshot.meta token) - context.StoreSnapshot(log, category, streamId, encodedWithMeta, ct) + context.StoreSnapshot(category, streamId, encodedWithMeta, ct) type MessageDbCategory<'event, 'state, 'context> internal (resolveInner, empty) = inherit Equinox.Category<'event, 'state, 'context>(resolveInner, empty) diff --git a/src/Equinox.MessageDb/Tracing.fs b/src/Equinox.MessageDb/Tracing.fs deleted file mode 100644 index 0d5e74aff..000000000 --- a/src/Equinox.MessageDb/Tracing.fs +++ /dev/null @@ -1,36 +0,0 @@ -[] -module internal Equinox.MessageDb.Tracing - -open Equinox.MessageDb.Core -open System.Diagnostics - -[] -type ActivityExtensions private () = - - [] - static member AddExpectedVersion(act: Activity, version) = - match version with StreamVersion v -> act.AddTag("eqx.expected_version", v) | Any -> act - - [] - static member AddLastVersion(act: Activity, version: int64) = - act.AddTag("eqx.last_version", version) - - [] - static member AddBatchSize(act: Activity, size: int64) = - act.AddTag("eqx.batch_size", size) - - [] - static member AddBatches(act: Activity, batches: int) = - act.AddTag("eqx.batches", batches) - - [] - static member AddStartPosition(act: Activity, 
pos: int64) = - act.AddTag("eqx.start_position", pos) - - [] - static member AddLoadMethod(act: Activity, method: string) = - act.AddTag("eqx.load_method", method) - - [] - static member RecordConflict(act: Activity) = - act.AddTag("eqx.conflict", true).SetStatus(ActivityStatusCode.Error, "WrongExpectedVersion") diff --git a/src/Equinox/Core.fs b/src/Equinox/Core.fs index 5c03b5965..c3d58ecdf 100755 --- a/src/Equinox/Core.fs +++ b/src/Equinox/Core.fs @@ -54,10 +54,12 @@ type internal Impl() = static member TransactAsync(stream, fetch: IStream<'e, 's> -> CancellationToken -> Task, decide, reload, mapResult, ct): Task<'v> = task { + use _ = Tracing.source.StartActivity("Transact") let! originTokenAndState = fetch stream ct return! run stream decide reload mapResult originTokenAndState ct } static member QueryAsync(stream, fetch: IStream<'e, 's> -> CancellationToken -> Task, projection: Func, ct): Task<'v> = task { + use _ = Tracing.source.StartActivity("Query") let! tokenAndState = fetch stream ct return projection.Invoke tokenAndState } diff --git a/src/Equinox/Equinox.fsproj b/src/Equinox/Equinox.fsproj index 879a77fb8..fff7fe876 100644 --- a/src/Equinox/Equinox.fsproj +++ b/src/Equinox/Equinox.fsproj @@ -1,11 +1,12 @@  - netstandard2.1 + net6.0 + diff --git a/src/Equinox/Tracing.fs b/src/Equinox/Tracing.fs new file mode 100644 index 000000000..77171e42c --- /dev/null +++ b/src/Equinox/Tracing.fs @@ -0,0 +1,80 @@ +module Equinox.Core.Tracing + +let [] SourceName = "Equinox" +let internal source = new System.Diagnostics.ActivitySource(SourceName) + +module Tags = + (* General tags *) + + /// The full stream name + let [] stream_name = "eqx.stream_name" + /// The id of the stream + let [] stream_id = "eqx.stream_id" + /// The category of the stream + let [] category = "eqx.category" + /// The store being used + let [] store = "eqx.store" + + (* Information about loading events *) + + /// The version of the stream at read time (empty stream = 0) + let [] read_version = "eqx.load.version" + /// The configured batch size + let [] batch_size = "eqx.load.batch_size" + /// The total number of batches loaded from the store + let [] batches = "eqx.load.batches" + /// The total number of events loaded from the store + let [] loaded_count = "eqx.load.count" + /// The total size of events loaded + let [] loaded_bytes = "eqx.load.bytes" + /// The version we load forwards from + let [] loaded_from_version = "eqx.load.from_version" + /// The load method (BatchForward, LatestEvent, BatchBackward, etc) + let [] load_method = "eqx.load.method" + /// Whether the load specified leader node / consistent read etc + let [] requires_leader = "eqx.load.requires_leader" + /// Whether we used a cached state (independent of whether we reloaded) + let [] cache_hit = "eqx.cache.hit" + /// Elapsed ms since cached state was stored or revalidated at time of inspection + let [] cache_age = "eqx.cache.age_ms" + /// Staleness tolerance specified for this request, in ms + let [] max_staleness = "eqx.cache.max_stale_ms" + /// If a snapshot was read, what version of the stream was it based on + let [] snapshot_version = "eqx.snapshot.version" + + (* Information about appending of events *) + + /// Whether there was at least one conflict during transact + let [] conflict = "eqx.conflict" + /// (if conflict) Sync attempts - 1 + let [] sync_retries = "eqx.conflict.retries" + + /// Store level retry + let [] append_retries = "eqx.append.retries" + /// The number of events we appended + let [] append_count = "eqx.append.count" + /// 
The total bytes we appended + let [] append_bytes = "eqx.append.bytes" + /// Whether a snapshot was written during this transaction + let [] snapshot_written = "eqx.snapshot.written" + /// The new version of the stream after appending events + let [] append_version = "eqx.append.version" + /// In case of conflict, which event types did we try to append + let [] append_types = "eqx.append.types" + +module Load = + let setTags (category, streamId, streamName, requiresLeader, maxAge: System.TimeSpan) = + let act = System.Diagnostics.Activity.Current + if act <> null then + act.SetTag(Tags.category, category) + .SetTag(Tags.stream_id, streamId) + .SetTag(Tags.stream_name, streamName) + .SetTag(Tags.requires_leader, requiresLeader) + .SetTag(Tags.max_staleness, maxAge.TotalMilliseconds) |> ignore + +module TrySync = + let setTags (attempt, events: 'a[]) = + let act = System.Diagnostics.Activity.Current + if act <> null then + act.SetTag(Tags.sync_retries, attempt - 1) + .SetTag(Tags.append_count, events.Length) |> ignore diff --git a/tests/Equinox.CosmosStore.Integration/CosmosFixturesInfrastructure.fs b/tests/Equinox.CosmosStore.Integration/CosmosFixturesInfrastructure.fs index 99ae05749..85b81c530 100644 --- a/tests/Equinox.CosmosStore.Integration/CosmosFixturesInfrastructure.fs +++ b/tests/Equinox.CosmosStore.Integration/CosmosFixturesInfrastructure.fs @@ -1,20 +1,5 @@ namespace global -open Domain -open FsCheck.FSharp -open System - -module Arb = - let generate<'t> = ArbMap.defaults |> ArbMap.generate<'t> - -type FsCheckGenerators = - static member SkuId = Arb.generate |> Gen.map SkuId |> Arb.fromGen - static member ContactPreferencesId = - Arb.generate - |> Gen.map (fun x -> sprintf "%s@test.com" (x.ToString("N"))) - |> Gen.map ContactPreferences.ClientId - |> Arb.fromGen - [] module SerilogHelpers = open Serilog.Events diff --git a/tests/Equinox.EventStoreDb.Integration/Infrastructure.fs b/tests/Equinox.EventStoreDb.Integration/Infrastructure.fs index 1837d2e11..47afbb11b 100644 --- a/tests/Equinox.EventStoreDb.Integration/Infrastructure.fs +++ b/tests/Equinox.EventStoreDb.Integration/Infrastructure.fs @@ -1,20 +1,9 @@ namespace global open Domain -open FsCheck.FSharp open System open FSharp.UMX -module Arb = - let generate<'t> = ArbMap.defaults |> ArbMap.generate<'t> -type FsCheckGenerators = - static member SkuId = Arb.generate |> Gen.map SkuId |> Arb.fromGen - static member ContactPreferencesId = - Arb.generate - |> Gen.map (fun x -> sprintf "%s@test.com" (x.ToString("N"))) - |> Gen.map ContactPreferences.ClientId - |> Arb.fromGen - static member RequestId = Arb.generate |> Gen.map (fun x -> RequestId.parse %x) |> Arb.fromGen #if STORE_POSTGRES || STORE_MSSQL || STORE_MYSQL open Equinox.SqlStreamStore diff --git a/tests/Equinox.EventStoreDb.Integration/StoreIntegration.fs b/tests/Equinox.EventStoreDb.Integration/StoreIntegration.fs index 4ed04e091..9fdfe9806 100644 --- a/tests/Equinox.EventStoreDb.Integration/StoreIntegration.fs +++ b/tests/Equinox.EventStoreDb.Integration/StoreIntegration.fs @@ -4,7 +4,6 @@ open Domain open FSharp.UMX open Serilog open Swensen.Unquote -open System.Diagnostics open System.Threading open System @@ -43,19 +42,6 @@ let connectToLocalStore (_: ILogger) = type Context = SqlStreamStoreContext type Category<'event, 'state, 'context> = SqlStreamStoreCategory<'event, 'state, 'context> #endif -#if STORE_MESSAGEDB -open Equinox.MessageDb -open OpenTelemetry -open OpenTelemetry.Trace -open OpenTelemetry.Resources - -let connectToLocalStore _ = async { - let 
connectionString = "Host=localhost; Username=message_store; Password=; Database=message_store; Port=5432; Maximum Pool Size=10" - return MessageDbClient(connectionString) -} -type Context = MessageDbContext -type Category<'event, 'state, 'context> = MessageDbCategory<'event, 'state, 'context> -#endif #if STORE_EVENTSTOREDB open Equinox.EventStoreDb @@ -88,10 +74,6 @@ type Category<'event, 'state, 'context> = EventStoreCategory<'event, 'state, 'co let createContext connection batchSize = Context(connection, batchSize = batchSize) -#if NET -let source = new ActivitySource("StoreIntegration") -#endif - module SimplestThing = type Event = | StuffHappened @@ -114,19 +96,11 @@ module Cart = |> Equinox.Decider.resolve log |> Cart.create - #if STORE_MESSAGEDB - let snapshot = Cart.Fold.snapshotEventCaseName, Cart.Fold.snapshot - let createServiceWithAdjacentSnapshotting log context = - Category(context, codec, fold, initial, access = AccessStrategy.AdjacentSnapshots snapshot) - |> Equinox.Decider.resolve log - |> Cart.create - #else let snapshot = Cart.Fold.isOrigin, Cart.Fold.snapshot let createServiceWithCompaction log context = Category(context, codec, fold, initial, access = AccessStrategy.RollingSnapshots snapshot) |> Equinox.Decider.resolve log |> Cart.create - #endif let createServiceWithCaching log context cache = let sliding20m = Equinox.CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) @@ -134,19 +108,11 @@ module Cart = |> Equinox.Decider.resolve log |> Cart.create - #if STORE_MESSAGEDB - let createServiceWithSnapshottingAndCaching log context cache = - let sliding20m = Equinox.CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) - Category(context, codec, fold, initial, sliding20m, AccessStrategy.AdjacentSnapshots snapshot) - |> Equinox.Decider.resolve log - |> Cart.create - #else let createServiceWithCompactionAndCaching log context cache = let sliding20m = Equinox.CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) Category(context, codec, fold, initial, sliding20m, AccessStrategy.RollingSnapshots snapshot) |> Equinox.Decider.resolve log |> Cart.create - #endif module ContactPreferences = let fold, initial = ContactPreferences.Fold.fold, ContactPreferences.Fold.initial @@ -176,18 +142,6 @@ let addAndThenRemoveItemsOptimisticManyTimesExceptTheLastOne context cartId skuI addAndThenRemoveItems true true context cartId skuId service count type GeneralTests(testOutputHelper) = - #if STORE_MESSAGEDB - let sdk = - Sdk.CreateTracerProviderBuilder() - .SetResourceBuilder(ResourceBuilder.CreateDefault().AddService(serviceName = "mdbi")) - .AddSource("Equinox") - .AddSource("Equinox.MessageDb") - .AddSource("StoreIntegration") - .AddSource("Npqsl") - .AddOtlpExporter(fun opts -> opts.Endpoint <- Uri("http://localhost:4317")) - .Build() - #endif - let output = TestContext(testOutputHelper) #if STORE_EVENTSTOREDB // gRPC does not expose slice metrics @@ -200,9 +154,6 @@ type GeneralTests(testOutputHelper) = [] let ``Can roundtrip against Store, correctly batching the reads [without any optimizations]`` (ctx, skuId) = async { - #if NET - use _ = source.StartActivity("Can roundtrip against Store, correctly batching the reads [without any optimizations]") - #endif let log, capture = output.CreateLoggerWithCapture() let! 
connection = connectToLocalStore log @@ -232,9 +183,6 @@ type GeneralTests(testOutputHelper) = [] let ``Can roundtrip against Store, managing sync conflicts by retrying [without any optimizations]`` (ctx, initialState) = async { - #if NET - use _ = source.StartActivity("Can roundtrip against Store, managing sync conflicts by retrying [without any optimizations]") - #endif let log1, capture1 = output.CreateLoggerWithCapture() let! connection = connectToLocalStore log1 @@ -266,9 +214,6 @@ type GeneralTests(testOutputHelper) = let w3, s3 = eventWaitSet () let w4, s4 = eventWaitSet () let t1 = async { - #if NET - use _ = source.StartActivity("Trx1") - #endif // Wait for other to have state, signal we have it, await conflict and handle let prepare = async { do! w0 @@ -284,9 +229,6 @@ type GeneralTests(testOutputHelper) = let context = createContext connection batchSize let service2 = Cart.createServiceWithoutOptimization log2 context let t2 = async { - #if NET - use _ = source.StartActivity("Trx2") - #endif // Signal we have state, wait for other to do same, engineer conflict let prepare = async { do! s0 @@ -320,19 +262,11 @@ type GeneralTests(testOutputHelper) = #else let sliceBackward = [EsAct.SliceBackward] #endif -#if STORE_MESSAGEDB // MessageDB doesn't report Batches for "Read Last Event" scenarios - let singleBatchBackwards = [EsAct.ReadLast] - let batchBackwardsAndAppend = [EsAct.ReadLast; EsAct.Append] -#else let singleBatchBackwards = sliceBackward @ [EsAct.BatchBackward] let batchBackwardsAndAppend = singleBatchBackwards @ [EsAct.Append] -#endif [] let ``Can correctly read and update against Store, with LatestKnownEvent Access Strategy`` id value = async { - #if NET - use _ = source.StartActivity("Can correctly read and update against Store, with LatestKnownEvent Access Strategy") - #endif let log, capture = output.CreateLoggerWithCapture() let! client = connectToLocalStore log let service = ContactPreferences.createService log client @@ -354,9 +288,6 @@ type GeneralTests(testOutputHelper) = [] let ``Can roundtrip against Store, correctly caching to avoid redundant reads`` (ctx, skuId) = async { - #if NET - use _ = source.StartActivity("Can roundtrip against Store, correctly caching to avoid redundant reads") - #endif let log, capture = output.CreateLoggerWithCapture() let! client = connectToLocalStore log let batchSize = 10 @@ -420,9 +351,6 @@ type GeneralTests(testOutputHelper) = [] let ``Version is 0-based`` () = async { - #if NET - use _ = source.StartActivity("Version is 0-based") - #endif let log, _ = output.CreateLoggerWithCapture() let! connection = connectToLocalStore log @@ -438,13 +366,6 @@ type GeneralTests(testOutputHelper) = test <@ [before; after] = [0L; 1L] @> } -#if STORE_MESSAGEDB - interface IDisposable with - member _.Dispose() = sdk.Shutdown() |> ignore -#endif - - -#if !STORE_MESSAGEDB type RollingSnapshotTests(testOutputHelper) = let output = TestContext(testOutputHelper) #if STORE_EVENTSTOREDB // gRPC does not expose slice metrics @@ -464,9 +385,6 @@ type RollingSnapshotTests(testOutputHelper) = [] let ``Can roundtrip against Store, correctly compacting to avoid redundant reads`` (ctx, skuId) = async { - #if NET - use _ = source.StartActivity("Can roundtrip against Store, correctly compacting to avoid redundant reads") - #endif let log, capture = output.CreateLoggerWithCapture() let! 
client = connectToLocalStore log let batchSize = 10 @@ -550,119 +468,3 @@ type RollingSnapshotTests(testOutputHelper) = let suboptimalExtraSlice: EsAct list = sliceForward test <@ singleBatchBackwards @ batchBackwardsAndAppend @ suboptimalExtraSlice @ singleBatchForward = capture.ExternalCalls @> } -#endif - - -#if STORE_MESSAGEDB -type AdjacentSnapshotTests(testOutputHelper) = - let sdk = - Sdk.CreateTracerProviderBuilder() - .SetResourceBuilder(ResourceBuilder.CreateDefault().AddService(serviceName = "mdbi")) - .AddSource("Equinox") - .AddSource("Equinox.MessageDb") - .AddSource("StoreIntegration") - .AddSource("Npqsl") - .AddOtlpExporter(fun opts -> opts.Endpoint <- Uri("http://localhost:4317")) - .Build() - - let output = TestContext(testOutputHelper) - - let sliceForward = [EsAct.SliceForward] - let singleBatchForward = [EsAct.SliceForward; EsAct.BatchForward] - let readSnapshotted = [EsAct.ReadLast; EsAct.SliceForward; EsAct.BatchForward] - - [] - let ``Can roundtrip against Store, correctly snapshotting to avoid redundant reads`` (ctx, skuId) = async { - #if NET - use _ = source.StartActivity("Can roundtrip against Store, correctly snapshotting to avoid redundant reads") - #endif - let log, capture = output.CreateLoggerWithCapture() - let! client = connectToLocalStore log - let batchSize = 10 - let context = createContext client batchSize - let service = Cart.createServiceWithAdjacentSnapshotting log context - - // Trigger 8 events, then reload - let cartId = % Guid.NewGuid() - do! addAndThenRemoveItemsManyTimes ctx cartId skuId service 4 - let! _ = service.Read cartId - - test <@ readSnapshotted @ [EsAct.Append] @ readSnapshotted = capture.ExternalCalls @> - - // Add two more, which should push it over the threshold and hence trigger an append of a snapshot event - capture.Clear() - do! addAndThenRemoveItemsManyTimes ctx cartId skuId service 1 - test <@ readSnapshotted @ [EsAct.Append; EsAct.Append] = capture.ExternalCalls @> - - // We now have 10 events and should be able to read them with a single call - capture.Clear() - let! _ = service.Read cartId - test <@ readSnapshotted = capture.ExternalCalls @> - - // Add 8 more; total of 18 should not trigger snapshotting as we snapshotted at Event Number 10 - capture.Clear() - do! addAndThenRemoveItemsManyTimes ctx cartId skuId service 4 - test <@ readSnapshotted @ [EsAct.Append] = capture.ExternalCalls @> - - // While we now have 18 events, we should be able to read them with a single call - capture.Clear() - let! _ = service.Read cartId - test <@ readSnapshotted = capture.ExternalCalls @> - - // add two more events, triggering a snapshot, then read it in a single snapshotted read - capture.Clear() - do! addAndThenRemoveItemsManyTimes ctx cartId skuId service 1 - // and reload the 20 events with a single read - let! _ = service.Read cartId - test <@ readSnapshotted @ [EsAct.Append; EsAct.Append] @ readSnapshotted = capture.ExternalCalls @> - } - - [] - let ``Can combine snapshotting with caching against Store`` (ctx, skuId) = async { - let log, capture = output.CreateLoggerWithCapture() - let! client = connectToLocalStore log - let batchSize = 10 - let context = createContext client batchSize - let service1 = Cart.createServiceWithAdjacentSnapshotting log context - let cache = Equinox.Cache("cart", sizeMb = 50) - let context = createContext client batchSize - let service2 = Cart.createServiceWithSnapshottingAndCaching log context cache - - // Trigger 8 events, then reload - let cartId = % Guid.NewGuid() - do! 
addAndThenRemoveItemsManyTimes ctx cartId skuId service1 4 - let! _ = service2.Read cartId - - // ... should see a single append as we are inside the batch threshold - test <@ readSnapshotted @ [EsAct.Append] @ readSnapshotted = capture.ExternalCalls @> - - // Add two more, which should push it over the threshold and hence trigger generation of a snapshot event - capture.Clear() - do! addAndThenRemoveItemsManyTimes ctx cartId skuId service1 1 - test <@ readSnapshotted @ [EsAct.Append; EsAct.Append] = capture.ExternalCalls @> - - // We now have 10 events, we should be able to read them with a single snapshotted read - capture.Clear() - let! _ = service1.Read cartId - test <@ readSnapshotted = capture.ExternalCalls @> - - // Add 8 more; total of 18 should not trigger snapshotting as the snapshot is at version 10 - capture.Clear() - do! addAndThenRemoveItemsManyTimes ctx cartId skuId service1 4 - test <@ readSnapshotted @ [EsAct.Append] = capture.ExternalCalls @> - - // While we now have 18 events, we should be able to read them with a single snapshotted read - capture.Clear() - let! _ = service1.Read cartId - test <@ readSnapshotted = capture.ExternalCalls @> - // ... trigger a second snapshotting - capture.Clear() - do! addAndThenRemoveItemsManyTimes ctx cartId skuId service1 1 - // and we _could_ reload the 20 events with a single read. However we are using the cache, which last saw it with 10 events, which necessitates two reads - let! _ = service2.Read cartId - test <@ readSnapshotted @ [EsAct.Append; EsAct.Append] @ sliceForward @ singleBatchForward = capture.ExternalCalls @> - } - - interface IDisposable with - member _.Dispose() = sdk.Shutdown() |> ignore -#endif diff --git a/tests/Equinox.MessageDb.Integration/Equinox.MessageDb.Integration.fsproj b/tests/Equinox.MessageDb.Integration/Equinox.MessageDb.Integration.fsproj index 5bc805578..30f5bbfd8 100644 --- a/tests/Equinox.MessageDb.Integration/Equinox.MessageDb.Integration.fsproj +++ b/tests/Equinox.MessageDb.Integration/Equinox.MessageDb.Integration.fsproj @@ -2,14 +2,11 @@ net6.0 - $(DefineConstants);STORE_MESSAGEDB - - - + diff --git a/tests/Equinox.MessageDb.Integration/MessageDbIntegration.fs b/tests/Equinox.MessageDb.Integration/MessageDbIntegration.fs new file mode 100644 index 000000000..1333324fe --- /dev/null +++ b/tests/Equinox.MessageDb.Integration/MessageDbIntegration.fs @@ -0,0 +1,479 @@ +module Equinox.MessageDb.Integration.MessageDbIntegration + +open System.Threading +open Domain +open Equinox.Core.Tracing +open FSharp.UMX +open Swensen.Unquote +open System.Diagnostics +open System + +let defaultBatchSize = 500 + +open Equinox.MessageDb + +let connectToLocalStore () = async { + let connectionString = "Host=localhost; Username=message_store; Password=; Database=message_store; Port=5432; Maximum Pool Size=10" + return MessageDbClient(connectionString) +} +type Context = MessageDbContext +type Category<'event, 'state, 'context> = MessageDbCategory<'event, 'state, 'context> + +let createContext connection batchSize = Context(connection, batchSize = batchSize) + +module SimplestThing = + type Event = + | StuffHappened + interface TypeShape.UnionContract.IUnionContract + let codec = EventCodec.gen + + let evolve (_state: Event) (event: Event) = event + let fold = Seq.fold evolve + let initial = StuffHappened + let resolve log context = + Category(context, codec, fold, initial) + |> Equinox.Decider.resolve log + let [] Category = "SimplestThing" + +module Cart = + let fold, initial = Cart.Fold.fold, Cart.Fold.initial + let 
codec = Cart.Events.codec + let createServiceWithoutOptimization log context = + Category(context, codec, fold, initial) + |> Equinox.Decider.resolve log + |> Cart.create + + let snapshot = Cart.Fold.snapshotEventCaseName, Cart.Fold.snapshot + let createServiceWithAdjacentSnapshotting log context = + Category(context, codec, fold, initial, access = AccessStrategy.AdjacentSnapshots snapshot) + |> Equinox.Decider.resolve log + |> Cart.create + + let createServiceWithCaching log context cache = + let sliding20m = Equinox.CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) + Category(context, codec, fold, initial, sliding20m) + |> Equinox.Decider.resolve log + |> Cart.create + + let createServiceWithSnapshottingAndCaching log context cache = + let sliding20m = Equinox.CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) + Category(context, codec, fold, initial, sliding20m, AccessStrategy.AdjacentSnapshots snapshot) + |> Equinox.Decider.resolve log + |> Cart.create + +module ContactPreferences = + let fold, initial = ContactPreferences.Fold.fold, ContactPreferences.Fold.initial + let codec = ContactPreferences.Events.codec + let createServiceWithoutOptimization log connection = + let context = createContext connection defaultBatchSize + Category(context, codec, fold, initial) + |> Equinox.Decider.resolve log + |> ContactPreferences.create + + let createService log connection = + Category(createContext connection 1, codec, fold, initial, access = AccessStrategy.LatestKnownEvent) + |> Equinox.Decider.resolve log + |> ContactPreferences.create + +let addAndThenRemoveItems optimistic exceptTheLastOne context cartId skuId (service: Cart.Service) count = + service.ExecuteManyAsync(cartId, optimistic, seq { + for i in 1..count do + yield Cart.SyncItem (context, skuId, Some i, None) + if not exceptTheLastOne || i <> count then + yield Cart.SyncItem (context, skuId, Some 0, None) }) +let addAndThenRemoveItemsManyTimes context cartId skuId service count = + addAndThenRemoveItems false false context cartId skuId service count +let addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId service count = + addAndThenRemoveItems false true context cartId skuId service count +let addAndThenRemoveItemsOptimisticManyTimesExceptTheLastOne context cartId skuId service count = + addAndThenRemoveItems true true context cartId skuId service count + +type Listener() = + let scope = Guid.NewGuid() + let v = AsyncLocal() + do v.Value <- scope + let spans = ResizeArray() + let listener = new ActivityListener( + ActivityStarted = (fun s -> if v.Value = scope then spans.Add(s)), + ShouldListenTo = (fun s -> s.Name = Equinox.Core.Tracing.SourceName), + Sample = (fun _ -> ActivitySamplingResult.AllDataAndRecorded)) + do ActivitySource.AddActivityListener(listener) + + member _.Spans() = spans + member _.Clear() = spans.Clear() + member _.TestSpans(tests: _[]) = + let len = max tests.Length spans.Count + // this is funky because we want to ensure the same number of tests and spans + for i in 0..len - 1 do + tests[i] spans[i] + spans.Clear() + + interface IDisposable with + member _.Dispose() = listener.Dispose() + +let span (m: (string * obj) list) (span: Activity) = + let spanDict = Map.ofList [ + for key, _ in m do + key, span.GetTagItem(key) + "name", span.DisplayName :> obj ] + test <@ spanDict = Map.ofList m @> + +type GeneralTests() = + + [] + let ``Can roundtrip against Store, correctly batching the reads [without any optimizations]`` (ctx, skuId) = async { + let log = 
Serilog.Log.Logger
+        use listener = new Listener()
+        let! connection = connectToLocalStore ()
+
+        let batchSize = 3
+        let context = createContext connection batchSize
+        let service = Cart.createServiceWithoutOptimization log context
+
+        // The command processing should trigger only a single read and a single write call
+        let addRemoveCount = 6
+        let cartId = % Guid.NewGuid()
+
+        do! addAndThenRemoveItemsManyTimesExceptTheLastOne ctx cartId skuId service addRemoveCount
+        listener.TestSpans([|
+            span([
+                "name", "Transact"
+                Tags.batches, 1
+                Tags.loaded_count, 0
+                Tags.append_count, 11
+            ])
+        |])
+
+        // Validate basic operation; key side effect: the read's Query span is recorded by `listener`
+        let! state = service.Read cartId
+        let expectedEventCount = 2 * addRemoveCount - 1
+        test <@ addRemoveCount = match state with { items = [{ quantity = quantity }] } -> quantity | _ -> failwith "nope" @>
+
+        // Need to read 4 batches to read 11 events in batches of 3
+        let expectedBatches = ceil(float expectedEventCount/float batchSize) |> int
+        listener.TestSpans([|
+            span(["name", "Query"; Tags.batches, expectedBatches; Tags.loaded_count, expectedEventCount])
+        |])
+    }
+
+    []
+    let ``Can roundtrip against Store, managing sync conflicts by retrying [without any optimizations]`` (ctx, initialState) = async {
+        let log = Serilog.Log.Logger
+        use listener = new Listener()
+
+        let! connection = connectToLocalStore()
+        // Ensure batching is included at some point in the proceedings
+        let batchSize = 3
+
+        let ctx, (sku11, sku12, sku21, sku22) = ctx
+        let cartId = % Guid.NewGuid()
+
+        // establish base stream state
+        let context = createContext connection batchSize
+        let service1 = Cart.createServiceWithoutOptimization log context
+        let! maybeInitialSku =
+            let streamEmpty, skuId = initialState
+            async {
+                if streamEmpty then return None
+                else
+                    let addRemoveCount = 2
+                    do! addAndThenRemoveItemsManyTimesExceptTheLastOne ctx cartId skuId service1 addRemoveCount
+                    return Some (skuId, addRemoveCount) }
+
+        let act prepare (service: Cart.Service) skuId count =
+            service.ExecuteManyAsync(cartId, false, prepare = prepare, commands = [Cart.SyncItem (ctx, skuId, Some count, None)])
+
+        let eventWaitSet () = let e = new ManualResetEvent(false) in (Async.AwaitWaitHandle e |> Async.Ignore), async { e.Set() |> ignore }
+        let w0, s0 = eventWaitSet ()
+        let w1, s1 = eventWaitSet ()
+        let w2, s2 = eventWaitSet ()
+        let w3, s3 = eventWaitSet ()
+        let w4, s4 = eventWaitSet ()
+        let t1 = async {
+            // Wait for other to have state, signal we have it, await conflict and handle
+            let prepare = async {
+                do! w0
+                do! s1
+                do! w2 }
+            do! act prepare service1 sku11 11
+            // Wait for other side to load; generate conflict
+            let prepare = async { do! w3 }
+            do! act prepare service1 sku12 12
+            // Signal conflict generated
+            do! s4 }
+        let context = createContext connection batchSize
+        let service2 = Cart.createServiceWithoutOptimization log context
+        let t2 = async {
+            // Signal we have state, wait for other to do same, engineer conflict
+            let prepare = async {
+                do! s0
+                do! w1 }
+            do! act prepare service2 sku21 21
+            // Signal conflict is in place
+            do! s2
+            // Await our conflict
+            let prepare = async {
+                do! s3
+                do! w4 }
+            do! act prepare service2 sku22 22 }
+        // Act: Engineer the conflicts and applications
+        do! Async.Parallel [t1; t2] |> Async.Ignore
+
+        let syncs = listener.Spans() |> List.ofSeq
+        // Load state
+        let! result = service1.Read cartId
+
+        // Ensure correct values got persisted
+        let has sku qty = result.items |> List.exists (fun { skuId = s; quantity = q } -> (sku, qty) = (s, q))
+        test <@ maybeInitialSku |> Option.forall (fun (skuId, quantity) -> has skuId quantity)
+                && has sku11 11 && has sku12 12
+                && has sku21 21 && has sku22 22 @>
+        // Confirm the intended conflicts did occur
+        let conflicts = syncs |> List.filter(fun s -> s.DisplayName = "Transact" && s.GetTagItem(Tags.conflict) = box true)
+        test <@ List.length conflicts = 2 @> }
+
+    []
+    let ``Can correctly read and update against Store, with LatestKnownEvent Access Strategy`` id value = async {
+        use listener = new Listener()
+        let log = Serilog.Log.Logger
+        let! client = connectToLocalStore()
+        let service = ContactPreferences.createService log client
+
+        // Feed some junk into the stream
+        for i in 0..11 do
+            let quickSurveysValue = i % 2 = 0
+            do! service.Update(id, { value with quickSurveys = quickSurveysValue })
+        // Ensure there will be something to be changed by the Update below
+        do! service.Update(id, { value with quickSurveys = not value.quickSurveys })
+
+        listener.Clear()
+        do! service.Update(id, value)
+
+        let! result = service.Read id
+        listener.TestSpans([|
+            span([ "name", "Transact"; Tags.loaded_count, 1; Tags.append_count, 1 ])
+            span([ "name", "Query"; Tags.loaded_count, 1 ])
+        |])
+        test <@ value = result @>
+    }
+
+    let loadCached hit batches count (span: Activity) =
+        test <@ span.DisplayName = "Load"
+                && span.GetTagItem(Tags.cache_hit) = hit
+                && span.GetTagItem(Tags.batches) = batches
+                && span.GetTagItem(Tags.loaded_count) = count @>
+
+    []
+    let ``Can roundtrip against Store, correctly caching to avoid redundant reads`` (ctx, skuId) = async {
+        use listener = new Listener()
+        let log = Serilog.Log.Logger
+        let! client = connectToLocalStore ()
+        let batchSize = 10
+        let cache = Equinox.Cache("cart", sizeMb = 50)
+        let context = createContext client batchSize
+        let createServiceCached () = Cart.createServiceWithCaching log context cache
+        let service1, service2, service3 = createServiceCached (), createServiceCached (), Cart.createServiceWithoutOptimization log context
+        let cartId = % Guid.NewGuid()
+
+        // Trigger 9 events, then reload
+        do! addAndThenRemoveItemsManyTimesExceptTheLastOne ctx cartId skuId service1 5
+        listener.TestSpans([|
+            span ["name", "Transact"; Tags.cache_hit, false; Tags.batches, 1; Tags.loaded_count, 0; Tags.append_count, 9]
+        |])
+
+        let! resStale = service2.ReadStale cartId
+        listener.TestSpans([|
+            span ["name", "Query"; Tags.cache_hit, true; Tags.batches, null; Tags.loaded_count, null ]
+        |])
+        let! resFresh = service2.Read cartId
+        // Because we're caching writes, stale vs fresh reads are equivalent
+        test <@ resStale = resFresh @>
+        // ... the fresh read only needs a single, empty forward batch, as the position is cached
+        listener.TestSpans([|
+            span ["name", "Query"; Tags.cache_hit, true; Tags.batches, 1; Tags.loaded_count, 0]
+        |])
+
+        let skuId2 = SkuId <| Guid.NewGuid()
+        do! addAndThenRemoveItemsManyTimesExceptTheLastOne ctx cartId skuId2 service1 1
+        listener.TestSpans([|
+            span ["name", "Transact"; Tags.cache_hit, true; Tags.batches, 1; Tags.loaded_count, 0; Tags.append_count, 1]
+        |])
+
+        // While we now have 10 events, we should be able to read them with a single call
+        // Do a stale read - we will see the updated state that the cache already holds
+        let! res = service2.ReadStale cartId
+        // the result after 9 events should differ from the result after 10
+        test <@ res <> resFresh @>
+        // but we don't do a roundtrip to get it
+        listener.TestSpans([|
+            span ["name", "Query"; Tags.cache_hit, true; Tags.batches, null; Tags.loaded_count, null ]
+        |])
+        let! _ = service2.Read cartId
+        listener.TestSpans([|
+            span ["name", "Query"; Tags.cache_hit, true; Tags.batches, 1; Tags.loaded_count, 0 ]
+        |])
+        // As the cache is up to date, we can transact against the cached value and do a null transaction without a roundtrip
+        do! addAndThenRemoveItemsOptimisticManyTimesExceptTheLastOne ctx cartId skuId2 service1 1
+        listener.TestSpans([|
+            span ["name", "Transact"; Tags.cache_hit, true; Tags.batches, null; Tags.loaded_count, null ]
+        |])
+        // As the cache is up to date, we can do an optimistic append, saving a Read roundtrip
+        let skuId3 = SkuId <| Guid.NewGuid()
+        do! addAndThenRemoveItemsOptimisticManyTimesExceptTheLastOne ctx cartId skuId3 service1 1
+        listener.TestSpans([|
+            span ["name", "Transact"; Tags.cache_hit, true; Tags.batches, null; Tags.loaded_count, null; Tags.append_count, 1]
+        |])
+        // If we don't have a cache attached, we neither benefit from nor pay the price for any optimism
+        let skuId4 = SkuId <| Guid.NewGuid()
+        do! addAndThenRemoveItemsOptimisticManyTimesExceptTheLastOne ctx cartId skuId4 service3 1
+        // Need 2 batches to do the reading
+        listener.TestSpans([|
+            // this time, we did something, so we see the append call
+            span ["name", "Transact"; Tags.cache_hit, null; Tags.batches, 2; Tags.loaded_count, 11; Tags.append_count, 1]
+        |])
+        // we've engineered a clash with the cache state (service3 doesn't participate in caching)
+        // Conflict with cached state leads to a read forward to resync; then we'll idempotently decide not to do any append
+        do! addAndThenRemoveItemsOptimisticManyTimesExceptTheLastOne ctx cartId skuId4 service2 1
+        listener.TestSpans([|
+            span ["name", "Transact"; Tags.cache_hit, true; Tags.conflict, true ]
+        |])
+    }
+
+    []
+    let ``Version is 0-based`` () = async {
+        let! connection = connectToLocalStore ()
+
+        let batchSize = 3
+        let context = createContext connection batchSize
+        let id = Guid.NewGuid()
+        let toStreamId (x: Guid) = x.ToString "N"
+        let decider = SimplestThing.resolve Serilog.Log.Logger context SimplestThing.Category (Equinox.StreamId.gen toStreamId id)
+
+        let! before, after = decider.TransactEx(
+            (fun state -> state.Version, [SimplestThing.StuffHappened]),
+            mapResult = (fun result ctx -> result, ctx.Version))
+        test <@ [before; after] = [0L; 1L] @>
+    }
+
+type AdjacentSnapshotTests() =
+
+    []
+    let ``Can roundtrip against Store, correctly snapshotting to avoid redundant reads`` (ctx, skuId) = async {
+        use listener = new Listener()
+        let log = Serilog.Log.Logger
+        let! client = connectToLocalStore ()
+        let batchSize = 10
+        let context = createContext client batchSize
+        let service = Cart.createServiceWithAdjacentSnapshotting log context
+
+        // Trigger 8 events, then reload
+        let cartId = % Guid.NewGuid()
+        do! addAndThenRemoveItemsManyTimes ctx cartId skuId service 4
+        let! _ = service.Read cartId
+        listener.TestSpans([|
+            span ["name", "Transact"; Tags.batches, 1; Tags.loaded_count, 0; Tags.append_count, 8; Tags.snapshot_version, -1L]
+            span ["name", "Query"; Tags.batches, 1; Tags.loaded_count, 8; Tags.snapshot_version, -1L]
+        |])
+
+        // Add two more, which should push it over the threshold and hence trigger an append of a snapshot event
+        do! 
addAndThenRemoveItemsManyTimes ctx cartId skuId service 1 + listener.TestSpans([| + span ["name", "Transact"; Tags.batches, 1; Tags.loaded_count, 8; Tags.append_count, 2; Tags.snapshot_version, -1L + Tags.snapshot_written, true] + |]) + // We now have 10 events and should be able to read them with a single call + let! _ = service.Read cartId + listener.TestSpans([| + span ["name", "Query"; Tags.batches, 1; Tags.loaded_count, 0; Tags.snapshot_version, 10L] + |]) + + // Add 8 more; total of 18 should not trigger snapshotting as we snapshotted at Event Number 10 + do! addAndThenRemoveItemsManyTimes ctx cartId skuId service 4 + listener.TestSpans([| + span ["name", "Transact"; Tags.batches, 1; Tags.loaded_count, 0 + Tags.snapshot_version, 10L; Tags.append_count, 8] + |]) + + // While we now have 18 events, we should be able to read them with a single call + let! _ = service.Read cartId + listener.TestSpans([| + span ["name", "Query"; Tags.batches, 1; Tags.loaded_count, 8 + Tags.snapshot_version, 10L] + |]) + + // add two more events, triggering a snapshot, then read it in a single snapshotted read + do! addAndThenRemoveItemsManyTimes ctx cartId skuId service 1 + // and reload the 20 events with a single read + let! _ = service.Read cartId + listener.TestSpans([| + span ["name", "Transact"; Tags.batches, 1; Tags.loaded_count, 8; Tags.snapshot_version, 10L + Tags.append_count, 2; Tags.snapshot_written, true] + span ["name", "Query"; Tags.batches, 1; Tags.loaded_count, 0 + Tags.snapshot_version, 20L] + |]) + } + + [] + let ``Can combine snapshotting with caching against Store`` (ctx, skuId) = async { + let log = Serilog.Log.Logger + use listener = new Listener() + let! client = connectToLocalStore() + let batchSize = 10 + let context = createContext client batchSize + let service1 = Cart.createServiceWithAdjacentSnapshotting log context + let cache = Equinox.Cache("cart", sizeMb = 50) + let context = createContext client batchSize + let service2 = Cart.createServiceWithSnapshottingAndCaching log context cache + + // Trigger 8 events, then reload + let cartId = % Guid.NewGuid() + do! addAndThenRemoveItemsManyTimes ctx cartId skuId service1 4 + let! _ = service2.Read cartId + + // ... should not see a snapshot write as we are inside the batch threshold + listener.TestSpans([| + span ["name", "Transact"; Tags.batches, 1; Tags.loaded_count, 0; Tags.snapshot_version, -1L + Tags.cache_hit, null; Tags.append_count, 8; Tags.snapshot_written, null] + span ["name", "Query"; Tags.batches, 1; Tags.loaded_count, 8; Tags.snapshot_version, -1L + Tags.cache_hit, false] + + |]) + + // Add two more, which should push it over the threshold and hence trigger generation of a snapshot event + do! addAndThenRemoveItemsManyTimes ctx cartId skuId service1 1 + listener.TestSpans([| + span ["name", "Transact"; Tags.batches, 1; Tags.loaded_count, 8; Tags.snapshot_version, -1L + Tags.cache_hit, null; Tags.append_count, 2; Tags.snapshot_written, true] + |]) + // We now have 10 events, we should be able to read them with a single snapshotted read + let! _ = service1.Read cartId + listener.TestSpans([| + span ["name", "Query"; Tags.batches, 1; Tags.loaded_count, 0; Tags.snapshot_version, 10L + Tags.cache_hit, null] + |]) + + // Add 8 more; total of 18 should not trigger snapshotting as the snapshot is at version 10 + do! 
addAndThenRemoveItemsManyTimes ctx cartId skuId service1 4 + listener.TestSpans([| + span ["name", "Transact"; Tags.batches, 1; Tags.loaded_count, 0; Tags.snapshot_version, 10L + Tags.cache_hit, null; Tags.append_count, 8; Tags.snapshot_written, null] + |]) + + // While we now have 18 events, we should be able to read them with a single snapshotted read + let! _ = service1.Read cartId + listener.TestSpans([| + span ["name", "Query"; Tags.batches, 1; Tags.loaded_count, 8; Tags.snapshot_version, 10L + Tags.cache_hit, null] + |]) + + // ... trigger a second snapshotting + do! addAndThenRemoveItemsManyTimes ctx cartId skuId service1 1 + // and we _could_ reload the 20 events with a single read. However we are using the cache, which last saw it with 10 events, which necessitates two reads + let! _ = service2.Read cartId + listener.TestSpans([| + span ["name", "Transact"; Tags.batches, 1; Tags.loaded_count, 8; Tags.snapshot_version, 10L + Tags.cache_hit, null; Tags.append_count, 2; Tags.snapshot_written, true] + span ["name", "Query"; Tags.batches, 2; Tags.loaded_count, 12; Tags.snapshot_version, null + Tags.cache_hit, true] + |]) + } diff --git a/tools/Equinox.Tool/Program.fs b/tools/Equinox.Tool/Program.fs index 172401e55..9b5a25782 100644 --- a/tools/Equinox.Tool/Program.fs +++ b/tools/Equinox.Tool/Program.fs @@ -293,7 +293,7 @@ let dumpStats log = function | Store.Context.Dynamo _ -> Equinox.DynamoStore.Core.Log.InternalMetrics.dump log | Store.Context.Es _ -> Equinox.EventStoreDb.Log.InternalMetrics.dump log | Store.Context.Sql _ -> Equinox.SqlStreamStore.Log.InternalMetrics.dump log - | Store.Context.Mdb _ -> Equinox.MessageDb.Log.InternalMetrics.dump log + | Store.Context.Mdb _ -> () // MessageDB does not expose InternalMetrics. Use an ActivityListener instead | Store.Context.Memory _ -> () module LoadTest =
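Note: with the tool's `dumpStats` no longer having MessageDb InternalMetrics to dump, equivalent visibility can be obtained by subscribing to the Equinox `ActivitySource` directly, exactly as the integration tests above do with their `Listener`. The following is a minimal illustrative sketch only; the `startEquinoxSpanLogging` helper name, the Serilog message template, and the choice to log on span completion are assumptions for illustration, not part of the Equinox or tool API.

open System.Diagnostics

/// Illustrative sketch: logs every completed Equinox span (Transact/Query/Load etc.) with its duration and tags.
/// Dispose the returned listener to stop observing.
let startEquinoxSpanLogging (log: Serilog.ILogger) =
    let listener = new ActivityListener(
        // Only observe the Equinox activity source (same predicate as the test Listener above)
        ShouldListenTo = (fun s -> s.Name = Equinox.Core.Tracing.SourceName),
        Sample = (fun _ -> ActivitySamplingResult.AllDataAndRecorded),
        ActivityStopped = (fun act ->
            // Render the span's tags as "key=value" pairs for the log line
            let tags = act.TagObjects |> Seq.map (fun kv -> $"{kv.Key}={kv.Value}") |> String.concat " "
            log.Information("Equinox {activity} {elapsedMs:f1}ms {tags}", act.DisplayName, act.Duration.TotalMilliseconds, tags)))
    ActivitySource.AddActivityListener(listener)
    listener

A caller would typically invoke such a helper once at startup, alongside whatever Serilog sinks are already configured.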