Skip to content

Conversation

@johnhungerford
Copy link
Collaborator

Fixes #1387

Problem

For concurrent stream transformations using Channel and/or Meter, there is a bug (#1387) where if the transformation effect f fails with Abort[Closed], that failure is interpreted as end-of-stream.

Solution

Added Abort.withMask which allows you to mask an Abort failure type that may or may not be within a generic type intersection to protect it from being handled accidentally.

Notes

A couple significant changes to make this work:

  1. ConcreteTag derivation now does an implicit search for createSingle so that the tag for a union can be derived from tags of single types
  2. Fiber.init and other Async methods do not use Reducible to eliminate Abort[Nothing]. Instead, the S type parameter in Fiber is constrained to being a subtype of Abort[Nothing] to reflect the coupling of Fiber to failures.

README.md Outdated
// Recurring task with a delay between
// executions
val b: Fiber[Unit, Any] < Sync =
val b: Fiber[Unit, Abort[Nothing]] < Sync =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think It'd be important to not track Abort[Nothing] here. Sorry, I think I indicated that it'd be fine to remove Reducible from Fiber.init but I thought this would still be Any since the underlying impl is IOPromiseBase[Any, A < (Async & S)] but I'm not sure it's possible.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it may still be possible to get rid of Reducible without tracking Abort[Nothing]. I'll give it a try

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can pull it off by adding a reduce method to Fiber and Fiber.Unsafe

*/
def apply[A, S](effect: A < (Abort[EG] & S)): A < (Abort[EG | Masked] & S)

given tag: ConcreteTag[Masked]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems this doesn't need to be in the abstract class?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll see

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't make it work without it here. If you need to call Abort.run[mask.Masked] within a withMask effect, this is the only way I can figure out to make the ConcreteTag available to the compiler.

def apply[A, S](effect: A < (Abort[EG] & S)): A < (Abort[EG | Masked] & S) =
Abort.runPartial[E](effect).map:
case Result.Success(a) => a
case Result.Failure(e) => Abort.fail(Wr(e))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice insight :) I'm wondering if we could have a more generic version of this mechanism since it's essentially temporarily swapping the tag. I couldn't test it and there's some type safety challenges but I wonder if this could be a good path:

sealed trait Mask[E] extends ArrowEffect[Const[Any], Const[Any]]

object Mask:

    def apply[E <: ArrowEffect[?, ?]](using Frame)[A, S](v: A < (E & S))(using eTag: Tag[E], maskTag: Tag[Mask[E]]): A < (Mask[E] & S) =
        ArrowEffect.handle(eTag, v) {
            [C] => (input, cont) => ArrowEffect.suspendWith(maskTag, input)(cont)
        }

    def run[E <: ArrowEffect[?, ?]](using Frame)[A, S](v: A < (Mask[E] & S))(using eTag: Tag[E], maskTag: Tag[Mask[E]]): A < (E & S) =
        ArrowEffect.handle(maskTag, v) {
            [C] => (input: Any, cont) => ArrowEffect.suspendWith(eTag, input)(cont)
        }

It seems it should work to prevent E suspensions from being handled until Mask.run. Ah, but there's also the challenge that Abort still uses erased tags, though.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried something like this too, but I actually need to preserve the Abort so that I can do error handling. So I can't mask Abort[E] as Mask[Abort[E]], but it would have to be Abort[E] -> Abort[Mask[E]]. That's why we need a specialized approach for abort.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given that Abort[E1] & Abort[E2] is equivalent to Abort[E1 | E2], it seems Mask[Abort[E1]] would work to mask the appropriate failures? That's considering a scenario where Abort doesn't use erased tags

Copy link
Collaborator Author

@johnhungerford johnhungerford Jul 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mask[Abort[E1]] would mask the failure, but I would not be able to then call Abort.run and handle the masked case separately from the unmasked case. The use case is that I'm given some effect A < (Abort[E] & S), and I don't know whether Closed is a subtype of E. I need to compose this with an effect of type Unit < Abort[Closed] effect and then handle all errors on it. I need to do something different depending on whether the failure comes from the Closed abort on the second effect or whether it's a failure E from the first effect, which may very well include Closed from some other resource.

johnhungerford added a commit that referenced this pull request Jul 25, 2025
)

<!--
PRs require an approval from any of the core contributors, other than
the PR author.

Include this header if applicable:
Fixes #issue1, #issue2, ...
-->

Prerequisite for #1393. See
[discussion](#1393 (comment)).

### Problem
<!--
Explain here the context, and why you're making this change. What is the
problem you're trying to solve?
-->

The derivation macro for `ConcreteTag` currently does not search for
given `ConcreteTag` instances for components `A` and `B` in composite
types like `A | B` or `A & B`. This forces users to require implicit
evidence for the full composite type in cases where only one of the
types is generic (e.g., `def someMethod[E](A < Abort[E | Closed])(using
ConcreteTag[E | Closed]) ...`). This creates a more severe problem when
one of the types in the intersection isn't known in the type signature,
like when one of the types is a dependent type.

### Solution
<!--
Describe your solution. Focus on helping reviewers understand your
technical approach and implementation decisions.
-->

Add an implicit search in the `createSingle` method of the `ConcreteTag`
derivation macro to look for a given instance prior to trying to
construct it from the `TypeRepr`.

### Notes
<!--
Add any important additional information as bullet points, such as:
- Implementation details reviewers should know about
- Open questions and concerns
- Limitations
-->

Also updated some names which hadn't been updated from the previous name
`SafeClassTag`.
@johnhungerford
Copy link
Collaborator Author

@hearnadam @ahoy-jon since Flavio has stepped out, could one of you take a look at this? I think I have addressed all the issues. Regardless of whether we adopt a more general Mask or Guard effect, we will need a specialized version for Abort.

* [[Fiber.Unsafe]] for low-level operations requiring [[AllowUnsafe]]
*/
opaque type Fiber[+A, -S] = IOPromiseBase[Any, A < (Async & S)]
opaque type Fiber[+A, -S <: Abort[Nothing]] = IOPromiseBase[Any, A < (Async & S)]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should make this change separately? We probably need Scalafix rule? @ahoy-jon

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is quite old 🙂

* A Fiber that is immediately completed with the provided Result
*/
def fromResult[E, A, S](result: Result[E, A < S])(using reduce: Reducible[Abort[E]]): Fiber[A, reduce.SReduced & S] =
def fromResult[E, A, S](result: Result[E, A < S]): Fiber[A, Abort[E] & S] =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need a frame?

reduce: Reducible[Abort[E]],
frame: Frame
): Fiber[A, reduce.SReduced & S2] < (Sync & S & Scope) =
using frame: Frame
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these don't need to be named anymore, since you are not using reduce anymore.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[BUG]: Closed error propagation in Stream parallel map functions

3 participants