diff --git a/README.md b/README.md index 671a0f331..793d3154a 100644 --- a/README.md +++ b/README.md @@ -1909,11 +1909,11 @@ val b: Fiber[Unit, Any] < Sync = Clock.repeatWithDelay( startAfter = 1.minute, delay = 1.minute - )(a) + )(a).map(_.reduced) // Note: Fiber#reduced simplifies Fiber's second type parameter // Without an initial delay val c: Fiber[Unit, Any] < Sync = - Clock.repeatWithDelay(1.minute)(a) + Clock.repeatWithDelay(1.minute)(a).map(_.reduced) // Schedule at a specific interval, regardless // of the duration of each execution @@ -1921,11 +1921,11 @@ val d: Fiber[Unit, Any] < Sync = Clock.repeatAtInterval( startAfter = 1.minute, interval = 1.minute - )(a) + )(a).map(_.reduced) // Without an initial delay val e: Fiber[Unit, Any] < Sync = - Clock.repeatAtInterval(1.minute)(a) + Clock.repeatAtInterval(1.minute)(a).map(_.reduced) ``` Use the returned `Fiber` to control scheduled tasks. @@ -1935,7 +1935,7 @@ import kyo.* // Example task val a: Fiber[Unit, Any] < Sync = - Clock.repeatAtInterval(1.second)(()) + Clock.repeatAtInterval(1.second)(()).map(_.reduced) // Try to cancel a task def b(task: Fiber[Unit, Any]): Boolean < Sync = @@ -2249,7 +2249,7 @@ import kyo.* // taken by reference and automatically // suspended with 'Sync' val a: Fiber[Int, Any] < Sync = - Fiber.initUnscoped(Math.cos(42).toInt) + Fiber.initUnscoped(Math.cos(42).toInt).map(_.reduced) // It's possible to "extract" the value of a // 'Fiber' via the 'get' method. This is also @@ -2376,7 +2376,7 @@ val h: Future[Int] < Sync = // 'Fiber' provides a monadic API with both // 'map' and 'flatMap' val i: Fiber[Int, Any] < Sync = - a.flatMap(v => Fiber.succeed(v.eval + 1)) + a.flatMap(v => Fiber.fromResult(Abort.run(v).eval.map(_ + 1))).map(_.reduced) ``` Similarly to `Sync`, users should avoid handling the `Async` effect directly and rely on `KyoApp` instead. If strictly necessary, there are two methods to handle the `Async` effect: @@ -2393,7 +2393,7 @@ val a: Int < Async = // Avoid handling 'Async' directly val b: Fiber[Int, Any] < Sync = - Fiber.initUnscoped(a) + Fiber.initUnscoped(a).map(_.reduced) // The 'runAndBlock' method accepts // arbitrary pending effects but relies @@ -2420,7 +2420,7 @@ val b: Boolean < Sync = // Fullfil the promise with // another fiber val c: Boolean < Sync = - a.map(fiber => Fiber.initUnscoped(1).map(fiber.become(_))) + a.map(fiber => Fiber.initUnscoped(1).map(v => fiber.become(v.reduced))) ``` > A `Promise` is basically a `Fiber` with all the regular functionality plus the `complete` and `become` methods to manually fulfill the promise. diff --git a/kyo-actor/shared/src/main/scala/kyo/Actor.scala b/kyo-actor/shared/src/main/scala/kyo/Actor.scala index 8eb3f50f8..a446cc669 100644 --- a/kyo-actor/shared/src/main/scala/kyo/Actor.scala +++ b/kyo-actor/shared/src/main/scala/kyo/Actor.scala @@ -436,7 +436,7 @@ object Actor: for mailbox <- // Create a bounded channel to serve as the actor's mailbox - Channel.init[A](capacity, Access.MultiProducerSingleConsumer) + Channel.initUnscoped[A](capacity, Access.MultiProducerSingleConsumer) _subject = // Create the actor's message interface (Subject) // Messages sent through this subject are queued in the mailbox @@ -452,7 +452,7 @@ object Actor: }.handle( Sync.ensure(mailbox.close), // Ensure mailbox cleanup by closing it when the actor completes or fails Env.run(_subject), // Provide the actor's Subject to the environment so it can be accessed via Actor.self - Scope.run, // Close used resources + Scope.run, // Clean up resources Fiber.init // Start the actor's processing loop in an async context ) yield new Actor[E, A, B](_subject, _consumer): diff --git a/kyo-combinators/shared/src/main/scala/kyo/AsyncCombinators.scala b/kyo-combinators/shared/src/main/scala/kyo/AsyncCombinators.scala index d7b8de6e6..e5ef47e82 100644 --- a/kyo-combinators/shared/src/main/scala/kyo/AsyncCombinators.scala +++ b/kyo-combinators/shared/src/main/scala/kyo/AsyncCombinators.scala @@ -17,9 +17,8 @@ extension [A, E, S](effect: A < (Abort[E] & Async & S)) def fork[S2]( using isolate: Isolate[S, Sync, S2], - reduce: Reducible[Abort[E]], frame: Frame - ): Fiber[A, reduce.SReduced & S2] < (Sync & S & Scope) = + ): Fiber[A, Abort[E] & S2] < (Sync & S & Scope) = Fiber.init(effect) /** Forks this computation and uses the resulting fiber within a scoped function [[f]]. Guarantees fiber interruption after usage. @@ -32,9 +31,8 @@ extension [A, E, S](effect: A < (Abort[E] & Async & S)) def forkUsing[S2]( using isolate: Isolate[S, Sync, S2], - reduce: Reducible[Abort[E]], frame: Frame - )[B, S3](f: Fiber[A, reduce.SReduced & S2] => B < S3): B < (Sync & S & S3) = + )[B, S3](f: Fiber[A, Abort[E] & S2] => B < S3): B < (Sync & S & S3) = Fiber.use(effect)(f) /** Forks this computation, returning a fiber. Does not guarantee fiber interruption. @@ -47,7 +45,7 @@ extension [A, E, S](effect: A < (Abort[E] & Async & S)) isolate: Isolate[S, Sync, S2], reduce: Reducible[Abort[E]], frame: Frame - ): Fiber[A, reduce.SReduced & S2] < (Sync & S) = + ): Fiber[A, Abort[E] & S2] < (Sync & S) = Fiber.initUnscoped(effect) /** Performs this computation and then the next one in parallel, discarding the result of this computation. diff --git a/kyo-core/shared/src/main/scala/kyo/Channel.scala b/kyo-core/shared/src/main/scala/kyo/Channel.scala index 074df78a9..9f4c1eb8b 100644 --- a/kyo-core/shared/src/main/scala/kyo/Channel.scala +++ b/kyo-core/shared/src/main/scala/kyo/Channel.scala @@ -598,7 +598,7 @@ object Channel: end close def closeAwaitEmpty()(using Frame, AllowUnsafe) = - Fiber.Unsafe.init(Result.succeed(close().isDefined)) + Fiber.Unsafe.init(Result.succeed(close().isDefined)).reduced def empty()(using AllowUnsafe, Frame) = succeedIfOpen(true) def full()(using AllowUnsafe, Frame) = succeedIfOpen(true) diff --git a/kyo-core/shared/src/main/scala/kyo/Clock.scala b/kyo-core/shared/src/main/scala/kyo/Clock.scala index 939fc431c..1213d25fd 100644 --- a/kyo-core/shared/src/main/scala/kyo/Clock.scala +++ b/kyo-core/shared/src/main/scala/kyo/Clock.scala @@ -429,10 +429,8 @@ object Clock: )( f: => Any < (Async & Abort[E] & S) )( - using - frame: Frame, - reduce: Reducible[Abort[E]] - ): Fiber[Unit, reduce.SReduced] < (Sync & S) = + using frame: Frame + ): Fiber[Unit, Abort[E]] < (Sync & S) = repeatWithDelay(Duration.Zero, delay)(f) /** Repeatedly executes a task with a fixed delay between completions, starting after an initial delay. @@ -454,10 +452,8 @@ object Clock: )( f: => Any < (Async & Abort[E] & S) )( - using - frame: Frame, - reduce: Reducible[Abort[E]] - ): Fiber[Unit, reduce.SReduced] < (Sync & S) = + using frame: Frame + ): Fiber[Unit, Abort[E]] < (Sync & S) = repeatWithDelay(startAfter, delay, ())(_ => f.unit) /** Repeatedly executes a task with a fixed delay between completions, maintaining state between executions. @@ -484,10 +480,8 @@ object Clock: )( f: A => A < (Async & Abort[E] & S) )( - using - frame: Frame, - reduce: Reducible[Abort[E]] - ): Fiber[A, reduce.SReduced] < (Sync & S) = + using frame: Frame + ): Fiber[A, Abort[E]] < (Sync & S) = repeatWithDelay(Schedule.delay(startAfter).andThen(Schedule.fixed(delay)), state)(f) /** Repeatedly executes a task with delays determined by a custom schedule. @@ -506,10 +500,8 @@ object Clock: )( f: => Any < (Async & Abort[E] & S) )( - using - frame: Frame, - reduce: Reducible[Abort[E]] - ): Fiber[Unit, reduce.SReduced] < (Sync & S) = + using frame: Frame + ): Fiber[Unit, Abort[E]] < (Sync & S) = repeatWithDelay[E, Unit, S](delaySchedule, ())(_ => f.unit) /** Repeatedly executes a task with delays determined by a custom schedule, maintaining state between executions. @@ -533,10 +525,8 @@ object Clock: )( f: A => A < (Async & Abort[E] & S) )( - using - frame: Frame, - reduce: Reducible[Abort[E]] - ): Fiber[A, reduce.SReduced] < (Sync & S) = + using frame: Frame + ): Fiber[A, Abort[E]] < (Sync & S) = Fiber.initUnscoped { Clock.use { clock => Loop(state, delaySchedule) { (state, schedule) => @@ -569,10 +559,8 @@ object Clock: )( f: => Any < (Async & Abort[E] & S) )( - using - frame: Frame, - reduce: Reducible[Abort[E]] - ): Fiber[Unit, reduce.SReduced] < (Sync & S) = + using frame: Frame + ): Fiber[Unit, Abort[E]] < (Sync & S) = repeatAtInterval(Duration.Zero, interval)(f) /** Repeatedly executes a task at fixed time intervals, starting after an initial delay. @@ -594,10 +582,8 @@ object Clock: )( f: => Any < (Async & Abort[E] & S) )( - using - frame: Frame, - reduce: Reducible[Abort[E]] - ): Fiber[Unit, reduce.SReduced] < (Sync & S) = + using frame: Frame + ): Fiber[Unit, Abort[E]] < (Sync & S) = repeatAtInterval(startAfter, interval, ())(_ => f.unit) /** Repeatedly executes a task at fixed time intervals, maintaining state between executions. @@ -624,10 +610,8 @@ object Clock: )( f: A => A < (Async & Abort[E] & S) )( - using - frame: Frame, - reduce: Reducible[Abort[E]] - ): Fiber[A, reduce.SReduced] < (Sync & S) = + using frame: Frame + ): Fiber[A, Abort[E]] < (Sync & S) = repeatAtInterval(Schedule.delay(startAfter).andThen(Schedule.fixed(interval)), state)(f) /** Repeatedly executes a task with intervals determined by a custom schedule. @@ -646,10 +630,8 @@ object Clock: )( f: => Any < (Async & Abort[E] & S) )( - using - frame: Frame, - reduce: Reducible[Abort[E]] - ): Fiber[Unit, reduce.SReduced] < (Sync & S) = + using frame: Frame + ): Fiber[Unit, Abort[E]] < (Sync & S) = repeatAtInterval(intervalSchedule, ())(_ => f.unit) /** Repeatedly executes a task with intervals determined by a custom schedule, maintaining state between executions. @@ -673,10 +655,8 @@ object Clock: )( f: A => A < (Async & Abort[E] & S) )( - using - frame: Frame, - reduce: Reducible[Abort[E]] - ): Fiber[A, reduce.SReduced] < (Sync & S) = + using frame: Frame + ): Fiber[A, Abort[E]] < (Sync & S) = Fiber.initUnscoped { Clock.use { clock => clock.now.map { now => diff --git a/kyo-core/shared/src/main/scala/kyo/Fiber.scala b/kyo-core/shared/src/main/scala/kyo/Fiber.scala index f8540ced1..88f31f174 100644 --- a/kyo-core/shared/src/main/scala/kyo/Fiber.scala +++ b/kyo-core/shared/src/main/scala/kyo/Fiber.scala @@ -43,7 +43,6 @@ opaque type Fiber[+A, -S] = IOPromiseBase[Any, A < (Async & S)] export Fiber.Promise object Fiber: - private val _unit = IOPromise(Result.succeed((): Unit < Any)) /** Creates a unit Fiber. @@ -91,14 +90,14 @@ object Fiber: /** Creates a Fiber from a Result. * * This method creates a Fiber that is immediately completed with the provided Result. The Fiber will have the same success and error - * types as the Result, with the error type reduced according to the Reducible instance. + * types as the Result. * * @param result * The Result to create the Fiber from * @return * A Fiber that is immediately completed with the provided Result */ - def fromResult[E, A, S](result: Result[E, A < S])(using reduce: Reducible[Abort[E]]): Fiber[A, reduce.SReduced & S] = + def fromResult[E, A, S](result: Result[E, A < S]): Fiber[A, Abort[E] & S] = IOPromise(result) /** Creates a Fiber from a Future. @@ -128,10 +127,8 @@ object Fiber: )( v: => A < (Abort[E] & Async & S) )( - using - reduce: Reducible[Abort[E]], - frame: Frame - ): Fiber[A, reduce.SReduced & S2] < (Sync & S & Scope) = + using frame: Frame + ): Fiber[A, Abort[E] & S2] < (Sync & S & Scope) = Scope.acquireRelease(initUnscoped[E, A, S, S2](v))(_.interrupt) /** Use an asynchronous computation running in a new Fiber, interrupting the fiber after usage. @@ -144,11 +141,10 @@ object Fiber: def use[E, A, S, S2]( using isolate: Isolate[S, Sync, S2], - reduce: Reducible[Abort[E]], frame: Frame )( v: => A < (Abort[E] & Async & S) - )[B, S3](f: Fiber[A, reduce.SReduced & S2] => B < S3): B < (Sync & S & S3) = + )[B, S3](f: Fiber[A, Abort[E] & S2] => B < S3): B < (Sync & S & S3) = initUnscoped[E, A, S, S2](v).map: fiber => Sync.ensure(fiber.interrupt)(f(fiber)) @@ -164,14 +160,12 @@ object Fiber: )( v: => A < (Abort[E] & Async & S) )( - using - reduce: Reducible[Abort[E]], - frame: Frame - ): Fiber[A, reduce.SReduced & S2] < (Sync & S) = + using frame: Frame + ): Fiber[A, Abort[E] & S2] < (Sync & S) = Isolate.internal.runDetached((trace, context) => isolate.capture { state => val io = isolate.isolate(state, v).map(r => Kyo.lift(isolate.restore(r))) - IOTask(io, trace, context).asInstanceOf[Fiber[A, reduce.SReduced & S2]] + IOTask(io, trace, context) } ) @@ -227,6 +221,11 @@ object Fiber: def waiters(using Frame): Int < Sync = Sync.Unsafe(Unsafe.waiters(self)()) + /** Reduce the Fiber's effect type. Useful especially to convert `Fiber[A, Abort[Nothing]]` to `Fiber[A, Any]` + */ + def reduced(using reduce: Reducible[S]): Fiber[A, reduce.SReduced] = + self.asInstanceOf[Fiber[A, reduce.SReduced]] + def unsafe: Fiber.Unsafe[A, S] = self end extension @@ -361,7 +360,7 @@ object Fiber: /** WARNING: Low-level API meant for integrations, libraries, and performance-sensitive code. See AllowUnsafe for more details. */ object Unsafe: - def init[E, A, S](result: Result[E, A < S])(using allow: AllowUnsafe, reduce: Reducible[Abort[E]]): Unsafe[A, reduce.SReduced & S] = + def init[E, A, S](result: Result[E, A < S])(using allow: AllowUnsafe): Unsafe[A, Abort[E] & S] = IOPromise(result) def fromFuture[A](f: => Future[A])(using AllowUnsafe): Unsafe[A, Any] = @@ -399,6 +398,9 @@ object Fiber: def safe: Fiber[A, S] = self def waiters()(using AllowUnsafe): Int = self.lower.waiters() + + def reduced(using reduce: Reducible[S]): Unsafe[A, reduce.SReduced] = + self.asInstanceOf[Unsafe[A, reduce.SReduced]] end extension extension [E, A, S](self: Unsafe[A, Abort[E] & S]) diff --git a/kyo-core/shared/src/main/scala/kyo/Queue.scala b/kyo-core/shared/src/main/scala/kyo/Queue.scala index c1b2281d8..38b063e51 100644 --- a/kyo-core/shared/src/main/scala/kyo/Queue.scala +++ b/kyo-core/shared/src/main/scala/kyo/Queue.scala @@ -557,13 +557,15 @@ object Queue: end close final def closeAwaitEmpty()(using frame: Frame, allow: AllowUnsafe): Fiber.Unsafe[Boolean, Any] = + import scala.language.implicitConversions + val fail = Result.Failure(Closed("Queue", initFrame)) val p = Promise.Unsafe.init[Boolean, Any]() if state.compareAndSet(State.Open, State.HalfOpen(p, fail)) then handleHalfOpen() p else - Fiber.Unsafe.init(Result.succeed(false)) + Fiber.Unsafe.init(Result.succeed(false)).reduced end if end closeAwaitEmpty diff --git a/kyo-core/shared/src/main/scala/kyo/Scope.scala b/kyo-core/shared/src/main/scala/kyo/Scope.scala index 8e8cfddfe..7b2d31b63 100644 --- a/kyo-core/shared/src/main/scala/kyo/Scope.scala +++ b/kyo-core/shared/src/main/scala/kyo/Scope.scala @@ -190,8 +190,9 @@ object Scope: .map(_.foldError(_ => (), ex => Log.error("Scope finalizer failed", ex.exception))) } .handle(Fiber.initUnscoped[Nothing, Unit, Any, Any]) - .map(promise.becomeDiscard) + .map(v => promise.becomeDiscard(v.reduced)) } + end close def await(using Frame): Unit < Async = promise.get end init diff --git a/kyo-core/shared/src/main/scala/kyo/StreamCoreExtensions.scala b/kyo-core/shared/src/main/scala/kyo/StreamCoreExtensions.scala index 2f23a4d17..52f83689d 100644 --- a/kyo-core/shared/src/main/scala/kyo/StreamCoreExtensions.scala +++ b/kyo-core/shared/src/main/scala/kyo/StreamCoreExtensions.scala @@ -371,61 +371,63 @@ object StreamCoreExtensions: t2: Tag[Emit[Chunk[V2]]], t3: Tag[V2], i: Isolate[S & S2, Sync, S & S2], - ev: ConcreteTag[E | Closed], + ev: ConcreteTag[E], frame: Frame ): Stream[V2, Abort[E] & Async & S & S2] = given CanEqual[Boolean | Chunk[V2], Boolean | Chunk[V2]] = CanEqual.derived Stream[V2, S & S2 & Abort[E] & Async]: - // Emit from channel of fibers to allow parallel transformations while preserving order - Channel.use[Fiber[Chunk[V2], Async & Abort[E | Closed] & S & S2]](bufferSize): channelOut => - // Concurrency limiter - Meter.useSemaphore(parallel): semaphore => - // Ensure lingering fibers are interrupted - val cleanup = Abort.run[Closed]: - Sync.ensure(channelOut.close): - Loop.foreach: - channelOut.drain.map: chunk => - if chunk.isEmpty then Loop.done - else Kyo.foreach(chunk)(_.interrupt).andThen(Loop.continue) - - // Handle original stream by running transformations in parallel, limiting concurrency - // via semaphore - val handleEmit = ArrowEffect.handleLoop(t1, stream.emit)( - handle = [C] => - (input, cont) => - // Fork async generation of chunks (with each transformation limited by semaphore) - // and publish fiber to output channel. Wait for concurrency using semaphore first to - // backpressure handler loop - semaphore.run(Fiber.initUnscoped(Async.foreach(input)(v => semaphore.run(f(v))))).map: chunkFiber => - channelOut.put(chunkFiber).andThen(Loop.continue(cont(()))) - ) + Abort.withMask[E]: mask => + // Emit from channel of fibers to allow parallel transformations while preserving order + Channel.use[Fiber[Chunk[V2], Async & Abort[mask.Masked | Closed] & S & S2]](bufferSize): channelOut => + // Concurrency limiter + Meter.useSemaphore(parallel): semaphore => + // Ensure lingering fibers are interrupted + val cleanup = Abort.run[Closed]: + Sync.ensure(channelOut.close): + Loop.foreach: + channelOut.drain.map: chunk => + if chunk.isEmpty then Loop.done + else Kyo.foreach(chunk)(_.interrupt).andThen(Loop.continue) + + // Handle original stream by running transformations in parallel, limiting concurrency + // via semaphore + val handleEmit = ArrowEffect.handleLoop(t1, mask(stream.emit))( + handle = [C] => + (input, cont) => + // Fork async generation of chunks (with each transformation limited by semaphore) + // and publish fiber to output channel. Wait for concurrency using semaphore first to + // backpressure handler loop + semaphore.run(Fiber.initUnscoped(Async.foreach(input)(v => semaphore.run(mask(f(v)))))).map: + chunkFiber => + channelOut.put(chunkFiber).andThen(Loop.continue(cont(()))) + ) - // Run stream handler in background, propagating errors to foreground - val background = - Abort.fold[E | Closed]( - // When finished, set output channel to close once it's drained - onSuccess = _ => channelOut.closeAwaitEmpty.unit, - onFail = { - case _: Closed => bug("buffer closed unexpectedly") - case e: E @unchecked => cleanup.andThen(Abort.fail(e)) - }, - onPanic = e => cleanup.andThen(Abort.panic(e)) - )(handleEmit) - - // Emit chunks from fibers published to channelOut - val emitResults = - val emit = Loop.forever: - channelOut.take.map: chunkFiber => - chunkFiber.get.map: chunk => - if chunk.nonEmpty then Emit.value(chunk) else Kyo.unit - Abort.run[Closed](emit).unit - end emitResults - - // Stream from output channel, running handlers in background - Fiber.use[E, Unit, S & S2, S & S2](background): backgroundFiber => - emitResults.andThen: - // Join background to propagate errors to foreground - backgroundFiber.get.unit + // Run stream handler in background, propagating errors to foreground + val background = + Abort.fold[mask.Masked | Closed]( + // When finished, set output channel to close once it's drained + onSuccess = _ => channelOut.closeAwaitEmpty.unit, + onFail = { + case _: Closed => bug("buffer closed unexpectedly") + case e: mask.Masked @unchecked => cleanup.andThen(Abort.fail(e)) + }, + onPanic = e => cleanup.andThen(Abort.panic(e)) + )(handleEmit) + + // Emit chunks from fibers published to channelOut + val emitResults = + val emit = Loop.forever: + channelOut.take.map: chunkFiber => + chunkFiber.get.map: chunk => + if chunk.nonEmpty then Emit.value(chunk) else Kyo.unit + Abort.run[Closed](emit).unit + end emitResults + + // Stream from output channel, running handlers in background + Fiber.use[mask.Masked, Unit, S & S2, S & S2](background): backgroundFiber => + emitResults.andThen: + // Join background to propagate errors to foreground + backgroundFiber.get.unit end mapPar /** Applies effectful transformation of stream elements asynchronously, mapping them in parallel. Preserves chunk boundaries. @@ -439,7 +441,7 @@ object StreamCoreExtensions: t2: Tag[Emit[Chunk[V2]]], t3: Tag[V2], i: Isolate[S & S2, Sync, S & S2], - ev: ConcreteTag[E | Closed], + ev: ConcreteTag[E], frame: Frame ): Stream[V2, Abort[E] & Async & S & S2] = mapPar(Async.defaultConcurrency, defaultAsyncStreamBufferSize)(f)(using t1, t2, t3, i, ev, frame) @@ -460,64 +462,65 @@ object StreamCoreExtensions: t2: Tag[Emit[Chunk[V2]]], t3: Tag[V2], i: Isolate[S & S2, Sync, S & S2], - ev: ConcreteTag[E | Closed], + ev: ConcreteTag[E], frame: Frame ): Stream[V2, Abort[E] & Async & S & S2] = Stream[V2, S & S2 & Abort[E] & Async]: - // Output channel containing transformed values - Channel.use[V2](bufferSize): channelOut => - // Channel containing transformation fibers. This is needed to ensure - // all transformations get published prior to completion - Channel.use[Fiber[Unit, Async & Abort[E | Closed] & S & S2]](bufferSize): channelPar => - // Concurrency limiter - Meter.useSemaphore(parallel): semaphore => - // Ensure lingering fibers are interrupted - val cleanup = Abort.run[Closed]: - Sync.ensure(channelPar.close.andThen(channelOut.close)): - Loop.foreach: - channelPar.drain.map: chunk => - if chunk.isEmpty then Loop.done - else Kyo.foreach(chunk)(_.interrupt).andThen(Loop.continue) - - // Handle original stream, asynchronously transforming input and publishing output - // using semaphore as rate limiter - val handleEmit = ArrowEffect.handleLoop(t1, stream.emit)( - handle = [C] => - (input, cont) => - // For each element in input chunk, transform and publish each to channelOut - // concurrently, limited by semaphore. Fork this collective process and publish - // fiber to channelPar in order to ensure completion/interruption. Wait for - // concurrency first using semaphore to backpressure handler loop - semaphore.run(Fiber.initUnscoped( - Async.foreachDiscard(input)(v => semaphore.run(f(v).map(channelOut.put(_)))) - )).map: fiber => - channelPar.put(fiber).andThen(Loop.continue(cont(()))) - ).andThen(channelPar.closeAwaitEmpty.unit) - - // Drain channelPar, waiting for each fiber to complete before finishing. This - // ensures background fiber does not complete until all transformations are published - val handlePar = - Abort.run[Closed]( - Loop.forever(channelPar.take.map(_.get)) - ).unit - - // Run stream handler in background, closing the output channel when finished - // and propagating failures - val background = - Abort.fold[E | Closed]( - onSuccess = _ => channelOut.closeAwaitEmpty.unit, - onFail = { - case _: Closed => bug("buffer closed unexpectedly") - case e: E @unchecked => cleanup.andThen(Abort.fail(e)) - }, - onPanic = e => cleanup.andThen(Abort.panic(e)) - )(Async.foreachDiscard(Seq(handleEmit, handlePar))(identity).unit) - - // Emit from channel while running handler in background, then joining handler - // to capture any failures from background - Fiber.use[E, Unit, S & S2, S & S2](background): backgroundFiber => - emitElementsFromChannel(channelOut).andThen: - backgroundFiber.get.unit + Abort.withMask[E]: mask => + // Output channel containing transformed values + Channel.use[V2](bufferSize): channelOut => + // Channel containing transformation fibers. This is needed to ensure + // all transformations get published prior to completion + Channel.use[Fiber[Unit, Async & Abort[Closed | mask.Masked] & S & S2]](bufferSize): channelPar => + // Concurrency limiter + Meter.useSemaphore(parallel): semaphore => + // Ensure lingering fibers are interrupted + val cleanup = Abort.run[Closed]: + Sync.ensure(channelPar.close.andThen(channelOut.close)): + Loop.foreach: + channelPar.drain.map: chunk => + if chunk.isEmpty then Loop.done + else Kyo.foreach(chunk)(_.interrupt).andThen(Loop.continue) + + // Handle original stream, asynchronously transforming input and publishing output + // using semaphore as rate limiter + val handleEmit = ArrowEffect.handleLoop(t1, mask(stream.emit))( + handle = [C] => + (input, cont) => + // For each element in input chunk, transform and publish each to channelOut + // concurrently, limited by semaphore. Fork this collective process and publish + // fiber to channelPar in order to ensure completion/interruption. Wait for + // concurrency first using semaphore to backpressure handler loop + semaphore.run(Fiber.initUnscoped( + Async.foreachDiscard(input)(v => semaphore.run(mask(f(v)).map(channelOut.put(_)))) + )).map: fiber => + channelPar.put(fiber).andThen(Loop.continue(cont(()))) + ).andThen(channelPar.closeAwaitEmpty.unit) + + // Drain channelPar, waiting for each fiber to complete before finishing. This + // ensures background fiber does not complete until all transformations are published + val handlePar = + Abort.run[Closed]( + Loop.forever(channelPar.take.map(_.get)) + ).unit + + // Run stream handler in background, closing the output channel when finished + // and propagating failures + val background = + Abort.fold[Closed | mask.Masked]( + onSuccess = _ => channelOut.closeAwaitEmpty.unit, + onFail = { + case _: Closed => bug("buffer closed unexpectedly") + case e: mask.Masked @unchecked => cleanup.andThen(Abort.fail(e)) + }, + onPanic = e => cleanup.andThen(Abort.panic(e)) + )(Async.foreachDiscard(Seq(handleEmit, handlePar))(identity).unit) + + // Emit from channel while running handler in background, then joining handler + // to capture any failures from background + Fiber.use[mask.Masked, Unit, S & S2, S & S2](background): backgroundFiber => + emitElementsFromChannel(channelOut).andThen: + backgroundFiber.get.unit end mapParUnordered /** Applies effectful transformation of stream elements asynchronously, mapping them in parallel. Does not preserve chunk @@ -532,7 +535,7 @@ object StreamCoreExtensions: t2: Tag[Emit[Chunk[V2]]], t3: Tag[V2], i: Isolate[S & S2, Sync, S & S2], - ev: ConcreteTag[E | Closed], + ev: ConcreteTag[E], frame: Frame ): Stream[V2, Abort[E] & Async & S & S2] = mapParUnordered(Async.defaultConcurrency, defaultAsyncStreamBufferSize)(f)(using t1, t2, t3, i, ev, frame) @@ -555,58 +558,59 @@ object StreamCoreExtensions: t2: Tag[Emit[Chunk[V2]]], t3: Tag[V2], i: Isolate[S & S2, Sync, S & S2], - ev: ConcreteTag[E | Closed], + ev: ConcreteTag[E], frame: Frame ): Stream[V2, Abort[E] & Async & S & S2] = Stream[V2, S & S2 & Abort[E] & Async]: - // Emit from channel of fibers to allow parallel transformations while preserving order - Channel.use[Fiber[Chunk[V2], Async & Abort[E | Closed] & S & S2]](bufferSize): channelOut => - // Concurrency limiter - Meter.useSemaphore(parallel): semaphore => - // Ensure lingering fibers are interrupted - val cleanup = Abort.run[Closed]: - Sync.ensure(channelOut.close): - Loop.foreach: - channelOut.drain.map: chunk => - if chunk.isEmpty then Loop.done - else Kyo.foreach(chunk)(_.interrupt).andThen(Loop.continue) - - // Handle original stream by running transformations in parallel, limiting concurrency - // via semaphore - val handleEmit = ArrowEffect.handleLoop(t1, stream.emit)( - handle = [C] => - (input, cont) => - // Transform chunk in background, publishing fiber to channelOut - semaphore.run(Fiber.initUnscoped(f(input))).map: chunkFiber => - channelOut.put(chunkFiber).andThen(Loop.continue(cont(()))) - ) + Abort.withMask[E]: mask => + // Emit from channel of fibers to allow parallel transformations while preserving order + Channel.use[Fiber[Chunk[V2], Async & Abort[Closed | mask.Masked] & S & S2]](bufferSize): channelOut => + // Concurrency limiter + Meter.useSemaphore(parallel): semaphore => + // Ensure lingering fibers are interrupted + val cleanup = Abort.run[Closed]: + Sync.ensure(channelOut.close): + Loop.foreach: + channelOut.drain.map: chunk => + if chunk.isEmpty then Loop.done + else Kyo.foreach(chunk)(_.interrupt).andThen(Loop.continue) + + // Handle original stream by running transformations in parallel, limiting concurrency + // via semaphore + val handleEmit = ArrowEffect.handleLoop(t1, mask(stream.emit))( + handle = [C] => + (input, cont) => + // Transform chunk in background, publishing fiber to channelOut + semaphore.run(Fiber.initUnscoped(mask(f(input)))).map: chunkFiber => + channelOut.put(chunkFiber).andThen(Loop.continue(cont(()))) + ) + + // Run stream handler in background, propagating errors to foreground + val background = + Abort.fold[Closed | mask.Masked]( + // When finished, set output channel to close once it's drained + onSuccess = _ => channelOut.closeAwaitEmpty.unit, + onFail = { + case _: Closed => bug("buffer closed unexpectedly") + case e: mask.Masked @unchecked => cleanup.andThen(Abort.fail(e)) + }, + onPanic = e => cleanup.andThen(Abort.panic(e)) + )(handleEmit) - // Run stream handler in background, propagating errors to foreground - val background = - Abort.fold[E | Closed]( - // When finished, set output channel to close once it's drained - onSuccess = _ => channelOut.closeAwaitEmpty.unit, - onFail = { - case _: Closed => bug("buffer closed unexpectedly") - case e: E @unchecked => cleanup.andThen(Abort.fail(e)) - }, - onPanic = e => cleanup.andThen(Abort.panic(e)) - )(handleEmit) - - // Emit chunks from fibers published to channelOut - val emitResults = - val emit = Loop.forever: - channelOut.take.map: chunkFiber => - chunkFiber.use: chunk => - if chunk.nonEmpty then Emit.value(chunk) else Kyo.unit - Abort.run[Closed](emit).unit - end emitResults - - // Stream from output channel, running handlers in background - Fiber.use[E, Unit, S & S2, S & S2](background): backgroundFiber => - emitResults.andThen: - // Join background to propagate errors to foreground - backgroundFiber.get.unit + // Emit chunks from fibers published to channelOut + val emitResults = + val emit = Loop.forever: + channelOut.take.map: chunkFiber => + chunkFiber.use: chunk => + if chunk.nonEmpty then Emit.value(chunk) else Kyo.unit + Abort.run[Closed](emit).unit + end emitResults + + // Stream from output channel, running handlers in background + Fiber.use[mask.Masked, Unit, S & S2, S & S2](background): backgroundFiber => + emitResults.andThen: + // Join background to propagate errors to foreground + backgroundFiber.get.unit end mapChunkPar /** Applies effectful transformation of stream elements asynchronously, mapping them in parallel. Preserves chunk boundaries. @@ -620,7 +624,7 @@ object StreamCoreExtensions: t2: Tag[Emit[Chunk[V2]]], t3: Tag[V2], i: Isolate[S & S2, Sync, S & S2], - ev: ConcreteTag[E | Closed], + ev: ConcreteTag[E], frame: Frame ): Stream[V2, Abort[E] & Async & S & S2] = mapChunkPar(Async.defaultConcurrency, defaultAsyncStreamBufferSize)(f)(using t1, t2, t3, i, ev, frame) @@ -647,72 +651,73 @@ object StreamCoreExtensions: t2: Tag[Emit[Chunk[V2]]], t3: Tag[V2], i: Isolate[S & S2, Sync, S & S2], - ev: ConcreteTag[E | Closed], + ev: ConcreteTag[E], frame: Frame ): Stream[V2, Abort[E] & Async & S & S2] = Stream[V2, S & S2 & Abort[E] & Async]: - // Output channel containing transformed values - Channel.use[Chunk[V2]](bufferSize): channelOut => - // Channel containing transformation fibers. This is needed to ensure - // all transformations get published prior to completion - Channel.use[Fiber[Unit, Async & Abort[E | Closed] & S & S2]](bufferSize): channelPar => - // Concurrency limiter - Meter.useSemaphore(parallel): semaphore => - // Ensure lingering fibers are interrupted - val cleanup = Abort.run[Closed]: - Sync.ensure(channelPar.close.andThen(channelOut.close)): - Loop.foreach: - channelPar.drain.map: chunk => - if chunk.isEmpty then Loop.done - else Kyo.foreach(chunk)(_.interrupt).andThen(Loop.continue) - - // Handle original stream, asynchronously transforming input and publishing output - // using semaphore as rate limiter - val handleEmit = ArrowEffect.handleLoop(t1, stream.emit)( - handle = [C] => - (input, cont) => - // Transform chunks and publish to channelOut in background fiber, placing - // fiber in channelPar to ensure completion/interruption - semaphore.run(Fiber.initUnscoped( - f(input).map: chunk => - channelOut.put(chunk).unit - )).map: fiber => - channelPar.put(fiber).andThen(Loop.continue(cont(()))) - ).andThen(channelPar.closeAwaitEmpty.unit) - - // Drain channelPar, waiting for each fiber to complete before finishing. This - // ensures background fiber does not complete until all transformations are published - val handlePar = - Abort.run[Closed]( - Loop.forever: - channelPar.take.map(_.get) - ).unit - - // Run stream handler in background, closing the output channel when finished - // and propagating failures - val background = - Abort.fold[E | Closed]( - onSuccess = _ => channelOut.closeAwaitEmpty.unit, - onFail = { - case _: Closed => bug("buffer closed unexpectedly") - case e: E @unchecked => cleanup.andThen(Abort.fail(e)) - }, - onPanic = e => cleanup.andThen(Abort.panic(e)) - )(Async.foreachDiscard(Seq(handleEmit, handlePar))(identity)) - - // Emit chunks from channelOut - val emitResults = - val emit = Loop.forever: - channelOut.take.map: chunk => - if chunk.nonEmpty then Emit.value(chunk) else Kyo.unit - Abort.run(emit).unit - end emitResults - - // Emit from channel while running handler in background, then joining handler - // to capture any failures from background - Fiber.use[E, Unit, S & S2, S & S2](background): backgroundFiber => - emitResults.andThen: - backgroundFiber.get.unit + Abort.withMask[E]: mask => + // Output channel containing transformed values + Channel.use[Chunk[V2]](bufferSize): channelOut => + // Channel containing transformation fibers. This is needed to ensure + // all transformations get published prior to completion + Channel.use[Fiber[Unit, Async & Abort[Closed | mask.Masked] & S & S2]](bufferSize): channelPar => + // Concurrency limiter + Meter.useSemaphore(parallel): semaphore => + // Ensure lingering fibers are interrupted + val cleanup = Abort.run[Closed]: + Sync.ensure(channelPar.close.andThen(channelOut.close)): + Loop.foreach: + channelPar.drain.map: chunk => + if chunk.isEmpty then Loop.done + else Kyo.foreach(chunk)(_.interrupt).andThen(Loop.continue) + + // Handle original stream, asynchronously transforming input and publishing output + // using semaphore as rate limiter + val handleEmit = ArrowEffect.handleLoop(t1, mask(stream.emit))( + handle = [C] => + (input, cont) => + // Transform chunks and publish to channelOut in background fiber, placing + // fiber in channelPar to ensure completion/interruption + semaphore.run(Fiber.initUnscoped( + mask(f(input)).map: chunk => + channelOut.put(chunk).unit + )).map: fiber => + channelPar.put(fiber).andThen(Loop.continue(cont(()))) + ).andThen(channelPar.closeAwaitEmpty.unit) + + // Drain channelPar, waiting for each fiber to complete before finishing. This + // ensures background fiber does not complete until all transformations are published + val handlePar = + Abort.run[Closed]( + Loop.forever: + channelPar.take.map(_.get) + ).unit + + // Run stream handler in background, closing the output channel when finished + // and propagating failures + val background = + Abort.fold[Closed | mask.Masked]( + onSuccess = _ => channelOut.closeAwaitEmpty.unit, + onFail = { + case _: Closed => bug("buffer closed unexpectedly") + case e: mask.Masked @unchecked => cleanup.andThen(Abort.fail(e)) + }, + onPanic = e => cleanup.andThen(Abort.panic(e)) + )(Async.foreachDiscard(Seq(handleEmit, handlePar))(identity)) + + // Emit chunks from channelOut + val emitResults = + val emit = Loop.forever: + channelOut.take.map: chunk => + if chunk.nonEmpty then Emit.value(chunk) else Kyo.unit + Abort.run(emit).unit + end emitResults + + // Emit from channel while running handler in background, then joining handler + // to capture any failures from background + Fiber.use[mask.Masked, Unit, S & S2, S & S2](background): backgroundFiber => + emitResults.andThen: + backgroundFiber.get.unit /** Applies effectful transformation of stream chunks asynchronously, mapping chunk in parallel. Does not preserve chunk boundaries. * @@ -728,7 +733,7 @@ object StreamCoreExtensions: t2: Tag[Emit[Chunk[V2]]], t3: Tag[V2], i: Isolate[S & S2, Sync, S & S2], - ev: ConcreteTag[E | Closed], + ev: ConcreteTag[E], frame: Frame ): Stream[V2, Abort[E] & Async & S & S2] = mapChunkParUnordered(Async.defaultConcurrency, defaultAsyncStreamBufferSize)(f)(using t1, t2, t3, i, ev, frame) diff --git a/kyo-core/shared/src/test/scala/kyo/AsyncTest.scala b/kyo-core/shared/src/test/scala/kyo/AsyncTest.scala index 658392521..f726f65a3 100644 --- a/kyo-core/shared/src/test/scala/kyo/AsyncTest.scala +++ b/kyo-core/shared/src/test/scala/kyo/AsyncTest.scala @@ -443,7 +443,7 @@ class AsyncTest extends Test: "run" in { val v: Int < Abort[Int] = 1 - val _: Fiber[Fiber[Int, Abort[Int]], Any] < Sync = Fiber.initUnscoped(Fiber.initUnscoped(v)) + val _: Fiber[Fiber[Int, Abort[Int]], Any] < Sync = Fiber.initUnscoped(Fiber.initUnscoped(v)).map(_.reduced) val _: Fiber[Int, Abort[Int | Timeout]] < Sync = Fiber.initUnscoped(KyoApp.runAndBlock(1.second)(v)) val _: Fiber[Int, Abort[Int]] < Sync = Fiber.initUnscoped(Async.mask(v)) val _: Fiber[Int, Abort[Int | Timeout]] < Sync = Fiber.initUnscoped(Async.timeout(1.second)(v)) diff --git a/kyo-core/shared/src/test/scala/kyo/StreamCoreExtensionsTest.scala b/kyo-core/shared/src/test/scala/kyo/StreamCoreExtensionsTest.scala index 11282d346..0ed53847d 100644 --- a/kyo-core/shared/src/test/scala/kyo/StreamCoreExtensionsTest.scala +++ b/kyo-core/shared/src/test/scala/kyo/StreamCoreExtensionsTest.scala @@ -177,6 +177,24 @@ class StreamCoreExtensionsTest extends Test: Choice.run(test).andThen(succeed) } + + "should propagate Abort[Closed]" in run { + val failure = Closed("failure", summon[Frame]) + val stream = Stream.init(1 to 4).concat(Stream.init(5 to 8)).concat(Stream.init(9 to 12)) + val test = + for + par <- Choice.eval(2, 4, Async.defaultConcurrency, 1024) + buf <- Choice.eval(1, par, 4, 5, 8, 12, Int.MaxValue) + s2 = stream.mapPar(par, buf)(i => if i == 5 then Abort.fail(failure) else i + 1) + res <- Abort.run(s2.run) + yield assert( + res == Result.Failure(failure) + ) + end for + end test + + Choice.run(test).andThen(succeed) + } } "mapParUnordered" - { @@ -231,6 +249,24 @@ class StreamCoreExtensionsTest extends Test: Choice.run(test).andThen(succeed) } + + "should propagate Abort[Closed]" in run { + val failure = Closed("failure", summon[Frame]) + val stream = Stream.init(1 to 4).concat(Stream.init(5 to 8)).concat(Stream.init(9 to 12)) + val test = + for + par <- Choice.eval(2, 4, Async.defaultConcurrency, 1024) + buf <- Choice.eval(1, par, 4, 5, 8, 12, Int.MaxValue) + s2 = stream.mapParUnordered(par, buf)(i => if i == 5 then Abort.fail(failure) else i + 1) + res <- Abort.run(s2.run) + yield assert( + res == Result.Failure(failure) + ) + end for + end test + + Choice.run(test).andThen(succeed) + } } "mapChunkPar" - { @@ -285,6 +321,24 @@ class StreamCoreExtensionsTest extends Test: Choice.run(test).andThen(succeed) } + + "should propagate Abort[Closed]" in run { + val failure = Closed("failure", summon[Frame]) + val stream = Stream.init(1 to 4).concat(Stream.init(5 to 8)).concat(Stream.init(9 to 12)) + val test = + for + par <- Choice.eval(2, 4, Async.defaultConcurrency, 1024) + buf <- Choice.eval(1, par, 4, 5, 8, 12, Int.MaxValue) + s2 = stream.mapChunkPar(par, buf)(chunk => if chunk.contains(5) then Abort.fail(failure) else chunk.map(_ + 1)) + res <- Abort.run(s2.run) + yield assert( + res == Result.Failure(failure) + ) + end for + end test + + Choice.run(test).andThen(succeed) + } } "mapChunkParUnordered" - { @@ -343,6 +397,26 @@ class StreamCoreExtensionsTest extends Test: Choice.run(test).andThen(succeed) } + + "should propagate Abort[Closed]" in run { + val failure = Closed("failure", summon[Frame]) + val stream = Stream.init(1 to 4).concat(Stream.init(5 to 8)).concat(Stream.init(9 to 12)) + val test = + for + par <- Choice.eval(2, 4, Async.defaultConcurrency, 1024) + buf <- Choice.eval(1, par, 4, 5, 8, 12, Int.MaxValue) + s2 = stream.mapChunkParUnordered(par, buf)(chunk => + if chunk.contains(5) then Abort.fail(failure) else chunk.map(_ + 1) + ) + res <- Abort.run(s2.run) + yield assert( + res == Result.Failure(failure) + ) + end for + end test + + Choice.run(test).andThen(succeed) + } } def fromIteratorTests(chunkSize: Int): Unit = diff --git a/kyo-data/shared/src/main/scala/kyo/internal/ConcreteTagMacro.scala b/kyo-data/shared/src/main/scala/kyo/internal/ConcreteTagMacro.scala index 9964dccde..6f22e00a8 100644 --- a/kyo-data/shared/src/main/scala/kyo/internal/ConcreteTagMacro.scala +++ b/kyo-data/shared/src/main/scala/kyo/internal/ConcreteTagMacro.scala @@ -50,7 +50,7 @@ private[kyo] object ConcreteTagMacro: case _ => Chunk(tpe) val exprs = flatten(tpe).map(create) '{ Intersection(Set(${ Varargs(exprs) }*)) } - case _ => createSingle(tpe) + case _ => createSingle(tpe).asInstanceOf[Expr[ConcreteTag[Any]]] end match end create diff --git a/kyo-prelude/shared/src/main/scala/kyo/Abort.scala b/kyo-prelude/shared/src/main/scala/kyo/Abort.scala index 752e1fd5e..74032ae91 100644 --- a/kyo-prelude/shared/src/main/scala/kyo/Abort.scala +++ b/kyo-prelude/shared/src/main/scala/kyo/Abort.scala @@ -603,4 +603,59 @@ object Abort: end literal + sealed class CanMaskAbort + + /** A function to apply a mask to any failures of type [[E]] + */ + sealed abstract class MaskAbort[E]: + /* Masked error type */ + type Masked + + /** @param effect + * Effect including [[Abort[E]]] + * @return + * An effect with any [[Abort[E]]] failures masked as [[Masked]] + */ + def apply[A, S](effect: A < (Abort[E] & S))(using CanMaskAbort): A < (Abort[Masked] & S) + + given tag: ConcreteTag[Masked] + + end MaskAbort + + /** Use an effect with a generic error type [[EG]], ensuring that you will never accidentally handle a specific error type [[E]] that + * may or may not be a subtype of [[EG]]. Accepts a function from a [[MaskAbort]] utility that allows you to apply the mask to any + * effect using the effect [[Abort[EG]]], masking [[E]]. Unmasks the masked abort effect after the function returns. + * + * @tparam E + * Specific error type to be masked + * @tparam EG + * Generic error type that may or may not include [[EG]] as a subtype + * @param f + * Function using a [[MaskAbort]] input to mask [[E]] errors + */ + def withMask[E](using + ConcreteTag[E], + Frame + )[A, S](f: CanMaskAbort ?=> (mask: MaskAbort[E]) => A < (Abort[mask.Masked] & S)): A < (Abort[E] & S) = + given CanMaskAbort {} + case class Wr(e: E) + val wrTag = summon[ConcreteTag[Wr]] + + val mask = new MaskAbort[E]: + type Masked = Wr + def apply[A, S](effect: A < (Abort[E] & S))(using CanMaskAbort): A < (Abort[Masked] & S) = + Abort.runPartial[E](effect).map: + case Result.Success(a) => a + case Result.Failure(e) => Abort.fail(Wr(e)) + given tag: ConcreteTag[Wr] = wrTag + + val masked = f(mask) + + val unmasked: A < (Abort[E] & S) = Abort.runPartial[Wr](masked).map: + case Result.Success(a) => a + case Result.Failure(wr) => Abort.fail(wr.e) + + unmasked + end withMask + end Abort diff --git a/kyo-prelude/shared/src/test/scala/kyo/AbortTest.scala b/kyo-prelude/shared/src/test/scala/kyo/AbortTest.scala index b7a00e468..5b1fe30c0 100644 --- a/kyo-prelude/shared/src/test/scala/kyo/AbortTest.scala +++ b/kyo-prelude/shared/src/test/scala/kyo/AbortTest.scala @@ -1423,4 +1423,34 @@ class AbortTest extends Test: } } + "withMask" - { + def genericFunction[E: ConcreteTag, S](effect: Int < (Abort[E] & S)): Int < (Abort[E] & S) = + Abort.withMask[E]: mask => + Abort.runPartial[String] { + mask(effect).map: i => + if i < 0 then Abort.fail("failure") + else + mask(effect).map: j => + i + j + }.map: + case Result.Success(v) => v + case Result.Failure(e) => 0 + + "works with no masked effect" in { + val eff = 4 + assert(Abort.run[Nothing](genericFunction(eff)).eval == Result.Success(8)) + } + + "is able to handle masked effect type within scope" in { + val eff = -5 + assert(Abort.run[Nothing](genericFunction(eff)).eval == Result.Success(0)) + } + + "protects masked effect" in { + val eff = Abort.fail("string") + val result: Int < Abort[String] = genericFunction(eff) + assert(Abort.run[String](result).eval == Result.Failure("string")) + } + } + end AbortTest diff --git a/kyo-sttp/shared/src/main/scala/kyo/internal/KyoSttpMonad.scala b/kyo-sttp/shared/src/main/scala/kyo/internal/KyoSttpMonad.scala index 43a8b4b2d..6dc3590a9 100644 --- a/kyo-sttp/shared/src/main/scala/kyo/internal/KyoSttpMonad.scala +++ b/kyo-sttp/shared/src/main/scala/kyo/internal/KyoSttpMonad.scala @@ -26,9 +26,10 @@ sealed class KyoSttpMonad(using Frame) extends MonadAsyncError[M]: def ensure[A](f: M[A], e: => M[Unit]) = Promise.initWith[Unit, Any] { p => def run = - Fiber.initUnscoped(e).map(p.becomeDiscard) + Fiber.initUnscoped(e).map(v => p.becomeDiscard(v.reduced)) Sync.ensure(run)(f).map(r => p.get.andThen(r)) } + end ensure def error[A](t: Throwable) = Sync.defer(throw t)