Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1900,26 +1900,26 @@ val a: Unit < Sync =

// Recurring task with a delay between
// executions
val b: Fiber[Unit, Any] < Sync =
val b: Fiber[Unit, Abort[Nothing]] < Sync =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think It'd be important to not track Abort[Nothing] here. Sorry, I think I indicated that it'd be fine to remove Reducible from Fiber.init but I thought this would still be Any since the underlying impl is IOPromiseBase[Any, A < (Async & S)] but I'm not sure it's possible.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it may still be possible to get rid of Reducible without tracking Abort[Nothing]. I'll give it a try

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can pull it off by adding a reduce method to Fiber and Fiber.Unsafe

Clock.repeatWithDelay(
startAfter = 1.minute,
delay = 1.minute
)(a)

// Without an initial delay
val c: Fiber[Unit, Any] < Sync =
val c: Fiber[Unit, Abort[Nothing]] < Sync =
Clock.repeatWithDelay(1.minute)(a)

// Schedule at a specific interval, regardless
// of the duration of each execution
val d: Fiber[Unit, Any] < Sync =
val d: Fiber[Unit, Abort[Nothing]] < Sync =
Clock.repeatAtInterval(
startAfter = 1.minute,
interval = 1.minute
)(a)

// Without an initial delay
val e: Fiber[Unit, Any] < Sync =
val e: Fiber[Unit, Abort[Nothing]] < Sync =
Clock.repeatAtInterval(1.minute)(a)
```

Expand All @@ -1929,15 +1929,15 @@ Use the returned `Fiber` to control scheduled tasks.
import kyo.*

// Example task
val a: Fiber[Unit, Any] < Sync =
val a: Fiber[Unit, Abort[Nothing]] < Sync =
Clock.repeatAtInterval(1.second)(())

// Try to cancel a task
def b(task: Fiber[Unit, Any]): Boolean < Sync =
def b(task: Fiber[Unit, Abort[Nothing]]): Boolean < Sync =
task.interrupt

// Check if a task is done
def c(task: Fiber[Unit, Any]): Boolean < Sync =
def c(task: Fiber[Unit, Abort[Nothing]]): Boolean < Sync =
task.done
```

Expand Down Expand Up @@ -2243,7 +2243,7 @@ import kyo.*
// Fork a computation. The parameter is
// taken by reference and automatically
// suspended with 'Sync'
val a: Fiber[Int, Any] < Sync =
val a: Fiber[Int, Abort[Nothing]] < Sync =
Fiber.initUnscoped(Math.cos(42).toInt)

// It's possible to "extract" the value of a
Expand Down Expand Up @@ -2327,7 +2327,7 @@ import scala.concurrent.Future
val a: Future[Int] = Future.successful(42)

// Transform a 'Future' into a 'Fiber'
val b: Fiber[Int, Any] < Sync =
val b: Fiber[Int, Abort[Nothing]] < Sync =
Fiber.fromFuture(a)
```

Expand All @@ -2340,7 +2340,7 @@ import kyo.*
import scala.concurrent.*

// An example fiber
val a: Fiber[Int, Any] = Fiber.succeed(42)
val a: Fiber[Int, Abort[Nothing]] = Fiber.succeed(42)

// Check if the fiber is done
val b: Boolean < Sync =
Expand Down Expand Up @@ -2370,8 +2370,8 @@ val h: Future[Int] < Sync =

// 'Fiber' provides a monadic API with both
// 'map' and 'flatMap'
val i: Fiber[Int, Any] < Sync =
a.flatMap(v => Fiber.succeed(v.eval + 1))
val i: Fiber[Int, Abort[Nothing]] < Sync =
a.flatMap(v => Fiber.fromResult(Abort.run(v).eval.map(_ + 1)))
```

Similarly to `Sync`, users should avoid handling the `Async` effect directly and rely on `KyoApp` instead. If strictly necessary, there are two methods to handle the `Async` effect:
Expand All @@ -2387,7 +2387,7 @@ val a: Int < Async =
Fiber.initUnscoped(Math.cos(42).toInt).map(_.get)

// Avoid handling 'Async' directly
val b: Fiber[Int, Any] < Sync =
val b: Fiber[Int, Abort[Nothing]] < Sync =
Fiber.initUnscoped(a)

// The 'runAndBlock' method accepts
Expand Down
1 change: 0 additions & 1 deletion kyo-actor/shared/src/main/scala/kyo/Actor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,6 @@ object Actor:
Scope.run, // Close used resources
Fiber.init // Start the actor's processing loop in an async context
)
_ <- Scope.ensure(mailbox.close) // Registers a finalizer in the outer scope to provide the actor hierarchy behavior
yield new Actor[E, A, B](_subject, _consumer):
def close(using Frame) = mailbox.close
end Actor
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ extension [A, E, S](effect: A < (Abort[E] & Async & S))
def fork[S2](
using
isolate: Isolate[S, Sync, S2],
reduce: Reducible[Abort[E]],
frame: Frame
): Fiber[A, reduce.SReduced & S2] < (Sync & S & Scope) =
): Fiber[A, Abort[E] & S2] < (Sync & S & Scope) =
Fiber.init(effect)

/** Forks this computation and uses the resulting fiber within a scoped function [[f]]. Guarantees fiber interruption after usage.
Expand All @@ -32,9 +31,8 @@ extension [A, E, S](effect: A < (Abort[E] & Async & S))
def forkUsing[S2](
using
isolate: Isolate[S, Sync, S2],
reduce: Reducible[Abort[E]],
frame: Frame
)[B, S3](f: Fiber[A, reduce.SReduced & S2] => B < S3): B < (Sync & S & S3) =
)[B, S3](f: Fiber[A, Abort[E] & S2] => B < S3): B < (Sync & S & S3) =
Fiber.use(effect)(f)

/** Forks this computation, returning a fiber. Does not guarantee fiber interruption.
Expand All @@ -47,7 +45,7 @@ extension [A, E, S](effect: A < (Abort[E] & Async & S))
isolate: Isolate[S, Sync, S2],
reduce: Reducible[Abort[E]],
frame: Frame
): Fiber[A, reduce.SReduced & S2] < (Sync & S) =
): Fiber[A, Abort[E] & S2] < (Sync & S) =
Fiber.initUnscoped(effect)

/** Performs this computation and then the next one in parallel, discarding the result of this computation.
Expand Down
2 changes: 1 addition & 1 deletion kyo-core/shared/src/main/scala/kyo/Barrier.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ object Barrier:

/** WARNING: Low-level API meant for integrations, libraries, and performance-sensitive code. See AllowUnsafe for more details. */
sealed abstract class Unsafe:
def await()(using AllowUnsafe): Fiber.Unsafe[Unit, Any]
def await()(using AllowUnsafe): Fiber.Unsafe[Unit, Abort[Nothing]]
def pending()(using AllowUnsafe): Int
def safe: Barrier = Barrier(this)
end Unsafe
Expand Down
2 changes: 1 addition & 1 deletion kyo-core/shared/src/main/scala/kyo/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ object Channel:
def drain()(using AllowUnsafe, Frame): Result[Closed, Chunk[A]]
def drainUpTo(max: Int)(using AllowUnsafe, Frame): Result[Closed, Chunk[A]]
def close()(using Frame, AllowUnsafe): Maybe[Seq[A]]
def closeAwaitEmpty()(using Frame, AllowUnsafe): Fiber.Unsafe[Boolean, Any]
def closeAwaitEmpty()(using Frame, AllowUnsafe): Fiber.Unsafe[Boolean, Abort[Nothing]]

def empty()(using AllowUnsafe, Frame): Result[Closed, Boolean]
def full()(using AllowUnsafe, Frame): Result[Closed, Boolean]
Expand Down
64 changes: 22 additions & 42 deletions kyo-core/shared/src/main/scala/kyo/Clock.scala
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ final case class Clock(unsafe: Clock.Unsafe):
*/
def deadline(duration: Duration)(using Frame): Clock.Deadline < Sync = Sync.Unsafe(unsafe.deadline(duration).safe)

private[kyo] def sleep(duration: Duration)(using Frame): Fiber[Unit, Any] < Sync =
private[kyo] def sleep(duration: Duration)(using Frame): Fiber[Unit, Abort[Nothing]] < Sync =
if duration == Duration.Zero then Fiber.unit
else if !duration.isFinite then Fiber.never
else Sync.Unsafe(unsafe.sleep(duration).safe)
Expand Down Expand Up @@ -389,7 +389,7 @@ object Clock:
def nowMonotonic(using Frame): Duration < Sync =
Sync.Unsafe.withLocal(local)(_.unsafe.nowMonotonic())

private[kyo] def sleep(duration: Duration)(using Frame): Fiber[Unit, Any] < Sync =
private[kyo] def sleep(duration: Duration)(using Frame): Fiber[Unit, Abort[Nothing]] < Sync =
Sync.Unsafe.withLocal(local)(_.unsafe.sleep(duration).safe)

/** Creates a new stopwatch using the local Clock instance.
Expand Down Expand Up @@ -429,10 +429,8 @@ object Clock:
)(
f: => Any < (Async & Abort[E] & S)
)(
using
frame: Frame,
reduce: Reducible[Abort[E]]
): Fiber[Unit, reduce.SReduced] < (Sync & S) =
using frame: Frame
): Fiber[Unit, Abort[E]] < (Sync & S) =
repeatWithDelay(Duration.Zero, delay)(f)

/** Repeatedly executes a task with a fixed delay between completions, starting after an initial delay.
Expand All @@ -454,10 +452,8 @@ object Clock:
)(
f: => Any < (Async & Abort[E] & S)
)(
using
frame: Frame,
reduce: Reducible[Abort[E]]
): Fiber[Unit, reduce.SReduced] < (Sync & S) =
using frame: Frame
): Fiber[Unit, Abort[E]] < (Sync & S) =
repeatWithDelay(startAfter, delay, ())(_ => f.unit)

/** Repeatedly executes a task with a fixed delay between completions, maintaining state between executions.
Expand All @@ -484,10 +480,8 @@ object Clock:
)(
f: A => A < (Async & Abort[E] & S)
)(
using
frame: Frame,
reduce: Reducible[Abort[E]]
): Fiber[A, reduce.SReduced] < (Sync & S) =
using frame: Frame
): Fiber[A, Abort[E]] < (Sync & S) =
repeatWithDelay(Schedule.delay(startAfter).andThen(Schedule.fixed(delay)), state)(f)

/** Repeatedly executes a task with delays determined by a custom schedule.
Expand All @@ -506,10 +500,8 @@ object Clock:
)(
f: => Any < (Async & Abort[E] & S)
)(
using
frame: Frame,
reduce: Reducible[Abort[E]]
): Fiber[Unit, reduce.SReduced] < (Sync & S) =
using frame: Frame
): Fiber[Unit, Abort[E]] < (Sync & S) =
repeatWithDelay[E, Unit, S](delaySchedule, ())(_ => f.unit)

/** Repeatedly executes a task with delays determined by a custom schedule, maintaining state between executions.
Expand All @@ -533,10 +525,8 @@ object Clock:
)(
f: A => A < (Async & Abort[E] & S)
)(
using
frame: Frame,
reduce: Reducible[Abort[E]]
): Fiber[A, reduce.SReduced] < (Sync & S) =
using frame: Frame
): Fiber[A, Abort[E]] < (Sync & S) =
Fiber.initUnscoped {
Clock.use { clock =>
Loop(state, delaySchedule) { (state, schedule) =>
Expand Down Expand Up @@ -569,10 +559,8 @@ object Clock:
)(
f: => Any < (Async & Abort[E] & S)
)(
using
frame: Frame,
reduce: Reducible[Abort[E]]
): Fiber[Unit, reduce.SReduced] < (Sync & S) =
using frame: Frame
): Fiber[Unit, Abort[E]] < (Sync & S) =
repeatAtInterval(Duration.Zero, interval)(f)

/** Repeatedly executes a task at fixed time intervals, starting after an initial delay.
Expand All @@ -594,10 +582,8 @@ object Clock:
)(
f: => Any < (Async & Abort[E] & S)
)(
using
frame: Frame,
reduce: Reducible[Abort[E]]
): Fiber[Unit, reduce.SReduced] < (Sync & S) =
using frame: Frame
): Fiber[Unit, Abort[E]] < (Sync & S) =
repeatAtInterval(startAfter, interval, ())(_ => f.unit)

/** Repeatedly executes a task at fixed time intervals, maintaining state between executions.
Expand All @@ -624,10 +610,8 @@ object Clock:
)(
f: A => A < (Async & Abort[E] & S)
)(
using
frame: Frame,
reduce: Reducible[Abort[E]]
): Fiber[A, reduce.SReduced] < (Sync & S) =
using frame: Frame
): Fiber[A, Abort[E]] < (Sync & S) =
repeatAtInterval(Schedule.delay(startAfter).andThen(Schedule.fixed(interval)), state)(f)

/** Repeatedly executes a task with intervals determined by a custom schedule.
Expand All @@ -646,10 +630,8 @@ object Clock:
)(
f: => Any < (Async & Abort[E] & S)
)(
using
frame: Frame,
reduce: Reducible[Abort[E]]
): Fiber[Unit, reduce.SReduced] < (Sync & S) =
using frame: Frame
): Fiber[Unit, Abort[E]] < (Sync & S) =
repeatAtInterval(intervalSchedule, ())(_ => f.unit)

/** Repeatedly executes a task with intervals determined by a custom schedule, maintaining state between executions.
Expand All @@ -673,10 +655,8 @@ object Clock:
)(
f: A => A < (Async & Abort[E] & S)
)(
using
frame: Frame,
reduce: Reducible[Abort[E]]
): Fiber[A, reduce.SReduced] < (Sync & S) =
using frame: Frame
): Fiber[A, Abort[E]] < (Sync & S) =
Fiber.initUnscoped {
Clock.use { clock =>
clock.now.map { now =>
Expand Down
Loading