From ad6ef904a5c9a22cc867d729679ee7d8a313599e Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Wed, 19 Jun 2024 18:58:14 -0500 Subject: [PATCH 1/6] Implemented asynchronous cancelation in `PureConc` --- .../cats/effect/kernel/testkit/pure.scala | 141 +++++++++++------- .../scala/cats/effect/laws/PureConcSpec.scala | 87 +++++++++++ 2 files changed, 176 insertions(+), 52 deletions(-) diff --git a/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/pure.scala b/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/pure.scala index c784639d3f..066bcbedbd 100644 --- a/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/pure.scala +++ b/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/pure.scala @@ -17,7 +17,7 @@ package cats.effect.kernel package testkit -import cats.{~>, Defer, Eq, Functor, Id, Monad, MonadError, Order, Show} +import cats.{~>, Applicative, Defer, Eq, Functor, Id, Monad, MonadError, Order, Show} import cats.data.{Kleisli, State, WriterT} import cats.effect.kernel._ import cats.free.FreeT @@ -97,45 +97,48 @@ object pure { type Main[X] = MVarR[ResolvedPC[E, *], X] MVar.empty[Main, Outcome[PureConc[E, *], E, A]].flatMap { state0 => - val state = state0[Main] - - val fiber = new PureFiber[E, A](state0) + MVar.empty[Main, Unit] flatMap { canceled0 => + val state = state0[Main] + val fiber = new PureFiber[E, A](state0, canceled0) + + val identified = canceled mapF { ta => + val fk = new (FiberR[E, *] ~> IdOC[E, *]) { + def apply[a](ke: FiberR[E, a]) = + ke.run(FiberCtx(fiber)) + } - val identified = canceled mapF { ta => - val fk = new (FiberR[E, *] ~> IdOC[E, *]) { - def apply[a](ke: FiberR[E, a]) = - ke.run(FiberCtx(fiber)) + ta.mapK(fk) } - ta.mapK(fk) - } + import Outcome._ - import Outcome._ + val body = identified flatMap { a => + state.tryPut(Succeeded(a.pure[PureConc[E, *]])) + } handleErrorWith { e => state.tryPut(Errored(e)) } - val body = identified flatMap { a => - state.tryPut(Succeeded(a.pure[PureConc[E, *]])) - } handleErrorWith { e => state.tryPut(Errored(e)) } + val results = state.read.flatMap { + case Canceled() => (Outcome.Canceled(): IdOC[E, A]).pure[Main] + case Errored(e) => (Outcome.Errored(e): IdOC[E, A]).pure[Main] - val results = state.read.flatMap { - case Canceled() => (Outcome.Canceled(): IdOC[E, A]).pure[Main] - case Errored(e) => (Outcome.Errored(e): IdOC[E, A]).pure[Main] + case Succeeded(fa) => + val identifiedCompletion = fa.mapF { ta => + val fk = new (FiberR[E, *] ~> IdOC[E, *]) { + def apply[a](ke: FiberR[E, a]) = + ke.run(FiberCtx(fiber)) + } - case Succeeded(fa) => - val identifiedCompletion = fa.mapF { ta => - val fk = new (FiberR[E, *] ~> IdOC[E, *]) { - def apply[a](ke: FiberR[E, a]) = - ke.run(FiberCtx(fiber)) + ta.mapK(fk) } - ta.mapK(fk) - } + identifiedCompletion.map(a => Succeeded[Id, E, A](a): IdOC[E, A]) handleError { + e => Errored(e) + } + } - identifiedCompletion.map(a => Succeeded[Id, E, A](a): IdOC[E, A]) handleError { e => - Errored(e) - } + Kleisli.ask[ResolvedPC[E, *], MVar.Universe].map { u => + ApplicativeThread[ResolvedPC[E, *]].start(body.run(u)) >> results.run(u) + } } - - Kleisli.ask[ResolvedPC[E, *], MVar.Universe].map { u => body.run(u) >> results.run(u) } } } @@ -164,8 +167,9 @@ object pure { case (List(results), _) => results.mapK(optLift) case (_, false) => Outcome.Succeeded(None) - // we could make a writer that only receives one object, but that seems meh. just pretend we deadlocked - case _ => Outcome.Succeeded(None) + // in the case of never and such, we are awaiting the async cancel monitor + // this scenario only arises if the main fiber self cancels + case _ => Outcome.Canceled() } } @@ -197,20 +201,16 @@ object pure { } def canceled: PureConc[E, Unit] = - Thread.annotate("canceled") { - withCtx { ctx => - if (ctx.masks.isEmpty) - uncancelable(_ => ctx.self.cancel >> ctx.finalizers.sequence_ >> Thread.done) - else - ctx.self.cancel - } - } + Thread.annotate("canceled")(withCtx(_.self.cancelAndRealize.ifM(Thread.done, unit))) def cede: PureConc[E, Unit] = Thread.cede def never[A]: PureConc[E, A] = - Thread.annotate("never")(Thread.done[A]) + withCtx[E, A] { ctx => + // we monitor for asynchronous cancelation. if we're masked, this won't cancel and we hang + Thread.annotate("never")(ctx.self.awaitCancelation *> Thread.done) + } def ref[A](a: A): PureConc[E, Ref[PureConc[E, *], A]] = MVar[PureConc[E, *], A](a).flatMap(mVar => Kleisli.pure(unsafeRef(mVar))) @@ -273,7 +273,19 @@ object pure { private def unsafeDeferred[A](mVar: MVar[A]): Deferred[PureConc[E, *], A] = new Deferred[PureConc[E, *], A] { - override def get: PureConc[E, A] = mVar.read[PureConc[E, *]] + override def get: PureConc[E, A] = + withCtx { ctx => + // we need to race cancelation against reading the mvar + // if cancelation wins, we shut down the thread + MVar.empty[PureConc[E, *], Option[A]] flatMap { signal => + val left = Thread.start(ctx.self.awaitCancelation.ifM(signal.tryPut[PureConc[E, *]](None).void, unit)) + val right = Thread.start(mVar.read[PureConc[E, *]].flatMap(a => signal.tryPut[PureConc[E, *]](Some(a)))) + + left *> + right *> + signal.read[PureConc[E, *]].flatMap(_.map(_.pure[PureConc[E, *]]).getOrElse(Thread.done)) + } + } override def complete(a: A): PureConc[E, Boolean] = mVar.tryPut[PureConc[E, *]](a) @@ -283,12 +295,14 @@ object pure { def start[A](fa: PureConc[E, A]): PureConc[E, Fiber[PureConc[E, *], E, A]] = Thread.annotate("start", true) { MVar.empty[PureConc[E, *], Outcome[PureConc[E, *], E, A]].flatMap { state => - val fiber = new PureFiber[E, A](state) + MVar.empty[PureConc[E, *], Unit] flatMap { canceled => + val fiber = new PureFiber[E, A](state, canceled) - // the tryPut here is interesting: it encodes first-wins semantics on cancelation/completion - val body = guaranteeCase(fa)(state.tryPut[PureConc[E, *]](_).void) - val identified = localCtx(FiberCtx(fiber), body) - Thread.start(identified.attempt.void).as(fiber) + // the tryPut here is interesting: it encodes first-wins semantics on cancelation/completion + val body = guaranteeCase(fa)(state.tryPut[PureConc[E, *]](_).void) + val identified = localCtx(FiberCtx(fiber), body) + Thread.start(identified.attempt.void).as(fiber) + } } } @@ -363,14 +377,16 @@ object pure { } // todo: MVar is not Serializable, release then update here - final class PureFiber[E, A](val state0: MVar[Outcome[PureConc[E, *], E, A]]) + final class PureFiber[E, A]( + val state0: MVar[Outcome[PureConc[E, *], E, A]], + val canceled0: MVar[Unit]) extends Fiber[PureConc[E, *], E, A] with Serializable { private[this] val state = state0[PureConc[E, *]] private[pure] val canceled: PureConc[E, Boolean] = - state.tryRead.map(_.map(_.fold(true, _ => false, _ => false)).getOrElse(false)) + canceled0.tryRead[PureConc[E, *]].map(_.as(true).getOrElse(false)) private[pure] val realizeCancelation: PureConc[E, Boolean] = withCtx { ctx => @@ -379,7 +395,10 @@ object pure { checkM.ifM( canceled.ifM( // if unmasked and canceled, finalize - allocateForPureConc[E].uncancelable(_ => ctx.finalizers.sequence_.as(true)), + allocateForPureConc[E] uncancelable { _ => + ctx.finalizers.sequence_.as(true) <* state0.tryPut[PureConc[E, *]]( + Outcome.Canceled()) + }, // if unmasked but not canceled, ignore false.pure[PureConc[E, *]] ), @@ -388,9 +407,27 @@ object pure { ) } - val cancel: PureConc[E, Unit] = state.tryPut(Outcome.Canceled()).void + private[pure] val awaitCancelation: PureConc[E, Boolean] = + canceled0.read[PureConc[E, *]] *> realizeCancelation + + private[pure] val cancelAndRealize: PureConc[E, Boolean] = + canceled0.tryPut[PureConc[E, *]](()) *> realizeCancelation + + val join: PureConc[E, Outcome[PureConc[E, *], E, A]] = { + val Thread = ApplicativeThread[PureConc[E, *]] + + withCtx { ctx => + // this is exactly like Deferred#get + MVar.empty[PureConc[E, *], Option[Outcome[PureConc[E, *], E, A]]] flatMap { signal => + // note we must read our *own* canceled, not the target fiber's + val left = Thread.start(ctx.self.awaitCancelation.ifM(signal.tryPut[PureConc[E, *]](None).void, Applicative[PureConc[E, *]].unit)) + val right = Thread.start(state.read.flatMap(oc => signal.tryPut[PureConc[E, *]](Some(oc)))) + + left *> right *> signal.read[PureConc[E, *]].flatMap(_.map(_.pure[PureConc[E, *]]).getOrElse(Thread.done)) + } + } + } - val join: PureConc[E, Outcome[PureConc[E, *], E, A]] = - state.read + val cancel: PureConc[E, Unit] = canceled0.tryPut[PureConc[E, *]](()) *> join.void } } diff --git a/laws/shared/src/test/scala/cats/effect/laws/PureConcSpec.scala b/laws/shared/src/test/scala/cats/effect/laws/PureConcSpec.scala index 5b367f4037..911704ba8a 100644 --- a/laws/shared/src/test/scala/cats/effect/laws/PureConcSpec.scala +++ b/laws/shared/src/test/scala/cats/effect/laws/PureConcSpec.scala @@ -49,6 +49,8 @@ class PureConcSpec extends Specification with Discipline with BaseSpec { } "short-circuit on canceled" in { + pure.run(F.canceled) mustEqual Outcome.Canceled() + pure.run((F.never[Unit], F.canceled).parTupled) mustEqual Outcome.Canceled() pure.run((F.never[Unit], F.canceled).parTupled.start.flatMap(_.join)) mustEqual Outcome .Succeeded(Some(Outcome.canceled[F, Nothing, Unit])) pure.run((F.canceled, F.never[Unit]).parTupled.start.flatMap(_.join)) mustEqual Outcome @@ -75,6 +77,91 @@ class PureConcSpec extends Specification with Discipline with BaseSpec { } } + "core PC state machine" should { + import cats.effect.kernel.{GenConcurrent, Outcome} + import cats.effect.kernel.implicits._ + import cats.syntax.all._ + + type F[A] = PureConc[Int, A] + val F = GenConcurrent[F] + + "run finalizers when canceling never" in { + val t = for { + c <- F.ref(0) + latch <- F.deferred[Unit] + fib <- F.start((latch.complete(()) *> F.never[Unit]).onCancel(c.update(_ + 1))) + _ <- latch.get + _ <- fib.cancel + v <- c.get + } yield v + + pure.run(t) mustEqual Outcome.Succeeded(Some(1)) + } + + "run finalizers when canceling Deferred#get" in { + val t = for { + c <- F.ref(0) + latch <- F.deferred[Unit] + hang <- F.deferred[Unit] + fib <- F.start((latch.complete(()) *> hang.get).onCancel(c.update(_ + 1))) + _ <- latch.get + _ <- fib.cancel + v <- c.get + } yield v + + pure.run(t) mustEqual Outcome.Succeeded(Some(1)) + } + + "run finalizers when canceling Fiber#join" in { + val t = for { + c <- F.ref(0) + latch <- F.deferred[Unit] + hang <- F.start(F.never[Unit]) + fib <- F.start((latch.complete(()) *> hang.join).onCancel(c.update(_ + 1))) + _ <- latch.get + _ <- fib.cancel + v <- c.get + } yield v + + pure.run(t) mustEqual Outcome.Succeeded(Some(1)) + } + + "hang when canceling uncancelable never" in { + val t = for { + latch <- F.deferred[Unit] + f <- F.start((latch.complete(()) *> F.never[Unit]).uncancelable) + _ <- latch.get + _ <- f.cancel + } yield () + + pure.run(t) mustEqual Outcome.Succeeded(None) + } + + "hang when canceling uncancelable Deferred#get" in { + val t = for { + latch <- F.deferred[Unit] + hang <- F.deferred[Unit] + f <- F.start((latch.complete(()) *> hang.get).uncancelable) + _ <- latch.get + _ <- f.cancel + } yield () + + pure.run(t) mustEqual Outcome.Succeeded(None) + } + + "hang when canceling uncancelable Fiber#join" in { + val t = for { + latch <- F.deferred[Unit] + hang <- F.start(F.never[Unit]) + f <- F.start((latch.complete(()) *> hang.join).uncancelable) + _ <- latch.get + _ <- f.cancel + } yield () + + pure.run(t) mustEqual Outcome.Succeeded(None) + } + } + checkAll( "TimeT[PureConc]", GenTemporalTests[TimeT[PureConc[Int, *], *], Int].temporal[Int, Int, Int](10.millis) From 2f1ad92d0ffca2041cd4f34ce8a278b88133e606 Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Fri, 25 Oct 2024 12:09:55 -0500 Subject: [PATCH 2/6] Beefed up PC tests --- .../scala/cats/effect/laws/PureConcSpec.scala | 62 ++++++++++++++++++- 1 file changed, 61 insertions(+), 1 deletion(-) diff --git a/laws/shared/src/test/scala/cats/effect/laws/PureConcSpec.scala b/laws/shared/src/test/scala/cats/effect/laws/PureConcSpec.scala index 911704ba8a..ebb8f8a304 100644 --- a/laws/shared/src/test/scala/cats/effect/laws/PureConcSpec.scala +++ b/laws/shared/src/test/scala/cats/effect/laws/PureConcSpec.scala @@ -23,6 +23,8 @@ import cats.effect.kernel.testkit.pure._ import cats.laws.discipline.arbitrary._ import org.scalacheck.Prop +import org.scalacheck.rng.Seed +import org.specs2.scalacheck.Parameters import org.specs2.mutable._ import org.typelevel.discipline.specs2.mutable.Discipline @@ -160,10 +162,68 @@ class PureConcSpec extends Specification with Discipline with BaseSpec { pure.run(t) mustEqual Outcome.Succeeded(None) } + + "run finalizers in order" in { + val t = for { + results <- F.ref[String]("") + f <- F start { + F.canceled.onCancel(results.update(_ + "A")).onCancel(results.update(_ + "B")) + } + _ <- f.join + back <- results.get + } yield back + + pure.run(t) mustEqual Outcome.Succeeded(Some("AB")) + } + + "correctly interpret uncancelable cancelation followed by suspension" in { + val t = F.uncancelable(_ => F.canceled *> F.never[Unit]) + pure.run(t) mustEqual Outcome.Succeeded(None) + + val forked = pure.run(F.start(t).flatMap(_.joinWith(F.canceled *> F.never[Unit]))) + forked mustEqual Outcome.Succeeded(None) + } + + "implement locals via Kleisli and FreeT" in { + import cats.{~>, Eval, Id} + import cats.data.Kleisli + import cats.free.FreeT + import cats.syntax.all._ + + type F[A] = FreeT[Id, Kleisli[Eval, Int, *], A] + + def read[A](f: Int => F[A]): F[A] = + FreeT.liftT(Kleisli.ask[Eval, Int]).flatMap(f) + + def withLocal[A](i: Int)(fa: F[A]): F[A] = + fa.mapK(new (Kleisli[Eval, Int, *] ~> Kleisli[Eval, Int, *]) { + def apply[a](kea: Kleisli[Eval, Int, a]) = + Kleisli((_: Int) => kea(i)) + }) + + def run[A](i: Int)(fa: F[A]): A = + fa.runM(fta => Kleisli.liftF(Eval.now(fta))).apply(i).value + + val _ = run(1) { + withLocal(42) { + read { i => + FreeT.liftT[Id, Kleisli[Eval, Int, *], Unit](Kleisli.liftF[Eval, Int, Unit](Eval.later({i mustEqual 42; ()}))).flatMap(_ => + read { i2 => + FreeT.liftT(Kleisli.liftF(Eval.later(i2 mustEqual 42))) + } + ) + } + } *> read { i => + FreeT.liftT(Kleisli.liftF(Eval.later(i mustEqual 1))) + } + } + + ok + } } checkAll( "TimeT[PureConc]", GenTemporalTests[TimeT[PureConc[Int, *], *], Int].temporal[Int, Int, Int](10.millis) - ) + )(Parameters(seed = Seed.fromBase64("ogn64yom4GXCEX0mXdqSfsqSeJxI2RbPUFC5YkvDtzD=").toOption)) } From feffe661da3ec4e428e0e29b6c57c7b68a7ecece Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Sun, 21 Jun 2026 18:40:54 -0500 Subject: [PATCH 3/6] Fixed PureConc poll cancelation --- .../cats/effect/kernel/testkit/pure.scala | 573 +++++++++++++++--- .../scala/cats/effect/laws/PureConcSpec.scala | 70 ++- 2 files changed, 533 insertions(+), 110 deletions(-) diff --git a/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/pure.scala b/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/pure.scala index 066bcbedbd..b51164d97a 100644 --- a/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/pure.scala +++ b/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/pure.scala @@ -17,7 +17,7 @@ package cats.effect.kernel package testkit -import cats.{~>, Applicative, Defer, Eq, Functor, Id, Monad, MonadError, Order, Show} +import cats.{~>, Defer, Eq, Functor, Id, Monad, MonadError, Order, Show} import cats.data.{Kleisli, State, WriterT} import cats.effect.kernel._ import cats.free.FreeT @@ -41,10 +41,40 @@ object pure { implicit val eq: Eq[MaskId] = Eq.fromUniversalEquals[MaskId] } + private[pure] final case class MaskFrame(id: MaskId) + + private[pure] sealed trait CancelationSignal[E] + + private[pure] object CancelationSignal { + final case class External[E]() extends CancelationSignal[E] + final case class Self[E](finalizers: List[PureConc[E, Unit]]) extends CancelationSignal[E] + } + + private[pure] final class CancelationListenerId + + private[pure] object CancelationListenerId { + implicit val eq: Eq[CancelationListenerId] = + Eq.fromUniversalEquals[CancelationListenerId] + } + + private[pure] final case class CancelationListener[E]( + id: CancelationListenerId, + action: PureConc[E, Unit]) + + private sealed trait MaskUpdate + + private object MaskUpdate { + case object Removed extends MaskUpdate + case object Shadowed extends MaskUpdate + case object Absent extends MaskUpdate + } + final case class FiberCtx[E]( self: PureFiber[E, _], masks: List[MaskId] = Nil, - finalizers: List[PureConc[E, Unit]] = Nil) + finalizers: List[PureConc[E, Unit]] = Nil, + selfCancelationBoundary: Option[Int] = None, + finalizing: Boolean = false) type ResolvedPC[E, A] = ThreadT[IdOC[E, *], A] @@ -69,8 +99,20 @@ object pure { val back = Kleisli.ask[IdOC[E, *], FiberCtx[E]] map { ctx => val checker = ctx .self - .realizeCancelation - .ifM(ApplicativeThread[PureConc[E, *]].done, ().pure[PureConc[E, *]]) + .isFinalizing + .ifM( + ().pure[PureConc[E, *]], + ctx + .self + .hasActivePoll + .ifM( + ().pure[PureConc[E, *]], + ctx + .self + .realizeCancelationWith(ctx) + .ifM( + ApplicativeThread[PureConc[E, *]].done[Unit], + ().pure[PureConc[E, *]]))) checker >> mvarLiftF(ThreadT.liftF(ka)) } @@ -97,46 +139,61 @@ object pure { type Main[X] = MVarR[ResolvedPC[E, *], X] MVar.empty[Main, Outcome[PureConc[E, *], E, A]].flatMap { state0 => - MVar.empty[Main, Unit] flatMap { canceled0 => - val state = state0[Main] - val fiber = new PureFiber[E, A](state0, canceled0) - - val identified = canceled mapF { ta => - val fk = new (FiberR[E, *] ~> IdOC[E, *]) { - def apply[a](ke: FiberR[E, a]) = - ke.run(FiberCtx(fiber)) - } - - ta.mapK(fk) - } - - import Outcome._ - - val body = identified flatMap { a => - state.tryPut(Succeeded(a.pure[PureConc[E, *]])) - } handleErrorWith { e => state.tryPut(Errored(e)) } - - val results = state.read.flatMap { - case Canceled() => (Outcome.Canceled(): IdOC[E, A]).pure[Main] - case Errored(e) => (Outcome.Errored(e): IdOC[E, A]).pure[Main] - - case Succeeded(fa) => - val identifiedCompletion = fa.mapF { ta => - val fk = new (FiberR[E, *] ~> IdOC[E, *]) { - def apply[a](ke: FiberR[E, a]) = - ke.run(FiberCtx(fiber)) + MVar.empty[Main, CancelationSignal[E]] flatMap { canceled0 => + MVar[Main, List[MaskFrame]](Nil) flatMap { masks => + MVar[Main, List[CancelationListener[E]]](Nil) flatMap { cancelationListeners => + MVar[Main, Boolean](false) flatMap { finalizing => + MVar[Main, Int](0) flatMap { activePolls => + val state = state0[Main] + val fiber = + new PureFiber[E, A]( + state0, + canceled0, + masks, + cancelationListeners, + finalizing, + activePolls) + + val identified = canceled mapF { ta => + val fk = new (FiberR[E, *] ~> IdOC[E, *]) { + def apply[a](ke: FiberR[E, a]) = + ke.run(FiberCtx(fiber)) + } + + ta.mapK(fk) + } + + import Outcome._ + + val body = identified flatMap { a => + state.tryPut(Succeeded(a.pure[PureConc[E, *]])) + } handleErrorWith { e => state.tryPut(Errored(e)) } + + val results = state.read.flatMap { + case Canceled() => (Outcome.Canceled(): IdOC[E, A]).pure[Main] + case Errored(e) => (Outcome.Errored(e): IdOC[E, A]).pure[Main] + + case Succeeded(fa) => + val identifiedCompletion = fa.mapF { ta => + val fk = new (FiberR[E, *] ~> IdOC[E, *]) { + def apply[a](ke: FiberR[E, a]) = + ke.run(FiberCtx(fiber)) + } + + ta.mapK(fk) + } + + identifiedCompletion.map(a => Succeeded[Id, E, A](a): IdOC[E, A]) handleError { + e => Errored(e) + } + } + + Kleisli.ask[ResolvedPC[E, *], MVar.Universe].map { u => + ApplicativeThread[ResolvedPC[E, *]].start(body.run(u)) >> results.run(u) + } } - - ta.mapK(fk) - } - - identifiedCompletion.map(a => Succeeded[Id, E, A](a): IdOC[E, A]) handleError { - e => Errored(e) } - } - - Kleisli.ask[ResolvedPC[E, *], MVar.Universe].map { u => - ApplicativeThread[ResolvedPC[E, *]].start(body.run(u)) >> results.run(u) + } } } } @@ -201,7 +258,9 @@ object pure { } def canceled: PureConc[E, Unit] = - Thread.annotate("canceled")(withCtx(_.self.cancelAndRealize.ifM(Thread.done, unit))) + Thread.annotate("canceled")(withCtx { ctx => + ctx.self.cancelAndRealizeWith(ctx).ifM(Thread.done, unit) + }) def cede: PureConc[E, Unit] = Thread.cede @@ -209,7 +268,7 @@ object pure { def never[A]: PureConc[E, A] = withCtx[E, A] { ctx => // we monitor for asynchronous cancelation. if we're masked, this won't cancel and we hang - Thread.annotate("never")(ctx.self.awaitCancelation *> Thread.done) + Thread.annotate("never")(ctx.self.awaitCancelationWith(ctx) *> Thread.done) } def ref[A](a: A): PureConc[E, Ref[PureConc[E, *], A]] = @@ -218,6 +277,11 @@ object pure { def deferred[A]: PureConc[E, Deferred[PureConc[E, *], A]] = MVar.empty[PureConc[E, *], A].flatMap(mVar => Kleisli.pure(unsafeDeferred(mVar))) + private[this] def interruptible[A]( + ctx: FiberCtx[E], + fa: PureConc[E, A]): PureConc[E, A] = + ctx.self.interruptible(ctx)(fa) + private def unsafeRef[A](mVar: MVar[A]): Ref[PureConc[E, *], A] = new Ref[PureConc[E, *], A] { override def get: PureConc[E, A] = mVar.read[PureConc[E, *]] @@ -275,16 +339,7 @@ object pure { new Deferred[PureConc[E, *], A] { override def get: PureConc[E, A] = withCtx { ctx => - // we need to race cancelation against reading the mvar - // if cancelation wins, we shut down the thread - MVar.empty[PureConc[E, *], Option[A]] flatMap { signal => - val left = Thread.start(ctx.self.awaitCancelation.ifM(signal.tryPut[PureConc[E, *]](None).void, unit)) - val right = Thread.start(mVar.read[PureConc[E, *]].flatMap(a => signal.tryPut[PureConc[E, *]](Some(a)))) - - left *> - right *> - signal.read[PureConc[E, *]].flatMap(_.map(_.pure[PureConc[E, *]]).getOrElse(Thread.done)) - } + interruptible(ctx, mVar.read[PureConc[E, *]]) } override def complete(a: A): PureConc[E, Boolean] = mVar.tryPut[PureConc[E, *]](a) @@ -295,32 +350,157 @@ object pure { def start[A](fa: PureConc[E, A]): PureConc[E, Fiber[PureConc[E, *], E, A]] = Thread.annotate("start", true) { MVar.empty[PureConc[E, *], Outcome[PureConc[E, *], E, A]].flatMap { state => - MVar.empty[PureConc[E, *], Unit] flatMap { canceled => - val fiber = new PureFiber[E, A](state, canceled) - - // the tryPut here is interesting: it encodes first-wins semantics on cancelation/completion - val body = guaranteeCase(fa)(state.tryPut[PureConc[E, *]](_).void) - val identified = localCtx(FiberCtx(fiber), body) - Thread.start(identified.attempt.void).as(fiber) + MVar.empty[PureConc[E, *], CancelationSignal[E]] flatMap { canceled => + MVar[PureConc[E, *], List[MaskFrame]](Nil) flatMap { masks => + MVar[PureConc[E, *], List[CancelationListener[E]]](Nil) flatMap { cancelationListeners => + MVar[PureConc[E, *], Boolean](false) flatMap { finalizing => + MVar[PureConc[E, *], Int](0) flatMap { activePolls => + val fiber = + new PureFiber[E, A]( + state, + canceled, + masks, + cancelationListeners, + finalizing, + activePolls) + + // the tryPut here is interesting: it encodes first-wins semantics on cancelation/completion + val body = guaranteeCase(fa)(state.tryPut[PureConc[E, *]](_).void) + val identified = localCtx(FiberCtx(fiber), body) + Thread.start(identified.attempt.void).as(fiber) + } + } + } + } } } } + override def racePair[A, B](fa: PureConc[E, A], fb: PureConc[E, B]): PureConc[ + E, + Either[ + (Outcome[PureConc[E, *], E, A], Fiber[PureConc[E, *], E, B]), + (Fiber[PureConc[E, *], E, A], Outcome[PureConc[E, *], E, B])]] = + uncancelable { poll => + for { + result <- deferred[Either[Outcome[PureConc[E, *], E, A], Outcome[PureConc[E, *], E, B]]] + + fibA <- start(fa) + fibB <- start(fb) + + _ <- Thread.start( + fibA.join.flatMap(oc => + result.complete( + Left(oc): Either[ + Outcome[PureConc[E, *], E, A], + Outcome[PureConc[E, *], E, B]]).void)) + _ <- Thread.start( + fibB.join.flatMap(oc => + result.complete( + Right(oc): Either[ + Outcome[PureConc[E, *], E, A], + Outcome[PureConc[E, *], E, B]]).void)) + + back <- onCancel( + poll(result.get), + for { + canA <- start(fibA.cancel) + canB <- start(fibB.cancel) + + _ <- canA.join + _ <- canB.join + } yield ()) + } yield back match { + case Left(oc) => Left((oc, fibB)) + case Right(oc) => Right((fibA, oc)) + } + } + def uncancelable[A](body: Poll[PureConc[E, *]] => PureConc[E, A]): PureConc[E, A] = Thread.annotate("uncancelable", true) { - val mask = new MaskId + withCtx { ctx => + val mask = new MaskId + val selfCancelationBoundary = + ctx.selfCancelationBoundary.getOrElse(ctx.finalizers.length) + + val self = ctx.self + def updateMasks[B](f: List[MaskFrame] => (List[MaskFrame], B)): PureConc[E, B] = + self.masks.read[PureConc[E, *]].flatMap { ms => + val (updated, b) = f(ms) + self.masks.swap[PureConc[E, *]](updated).as(b) + } - val poll = new Poll[PureConc[E, *]] { - def apply[a](fa: PureConc[E, a]) = - withCtx { ctx => - val ctx2 = ctx.copy(masks = ctx.masks.dropWhile(mask === _)) - localCtx(ctx2, fa.attempt <* ctx.self.realizeCancelation).rethrow + val addF = updateMasks(ms => (MaskFrame(mask) :: ms, ())) + val removeF = updateMasks { + case MaskFrame(`mask`) :: ms => (ms, MaskUpdate.Removed) + case ms if ms.exists(_.id === mask) => (ms, MaskUpdate.Shadowed) + case ms => (ms, MaskUpdate.Absent) + } + + def restore(update: MaskUpdate) = + update match { + case MaskUpdate.Removed => self.exitPoll *> addF + case MaskUpdate.Shadowed | MaskUpdate.Absent => unit } - } - withCtx { ctx => - val ctx2 = ctx.copy(masks = mask :: ctx.masks) - localCtx(ctx2, body(poll)) + val poll = new Poll[PureConc[E, *]] { + def apply[a](fa: PureConc[E, a]) = + withCtx { callCtx => + if (callCtx.self eq self) + removeF.flatMap { update => + val restoreF = restore(update) + val pollCtx = update match { + case MaskUpdate.Removed => + callCtx.copy( + selfCancelationBoundary = + callCtx.selfCancelationBoundary.orElse(Some(selfCancelationBoundary))) + + case MaskUpdate.Shadowed | MaskUpdate.Absent => + callCtx + } + + val enterF = update match { + case MaskUpdate.Removed => self.enterPoll + case MaskUpdate.Shadowed | MaskUpdate.Absent => unit + } + + enterF *> + localCtx( + pollCtx, + onCancel( + self + .realizeCancelationWith(pollCtx) + .ifM( + Thread.done, + fa.attempt.flatMap { result => + self + .realizeCancelationWith(pollCtx) + .ifM( + Thread.done, + restoreF *> result.pure[PureConc[E, *]].rethrow) + }), + restoreF)) + } + else fa + } + } + + val runBody = + addF *> body(poll).attempt.flatMap { result => + removeF.flatMap { + case MaskUpdate.Removed => + val back = result.pure[PureConc[E, *]].rethrow + + self.hasActivePoll.ifM( + back, + self.realizeCancelationWith(ctx).ifM(Thread.done, back)) + + case MaskUpdate.Shadowed | MaskUpdate.Absent => + result.pure[PureConc[E, *]].rethrow + } + } + + onCancel(runBody, removeF.void) } } @@ -329,7 +509,7 @@ object pure { Defer[PureConc[E, *]].defer(pure(new Unique.Token())) def forceR[A, B](fa: PureConc[E, A])(fb: PureConc[E, B]): PureConc[E, B] = - Thread.annotate("forceR")(productR(attempt(fa))(fb)) + Thread.annotate("forceR")(productR(handleError(fa.void)(_ => ()))(fb)) def flatMap[A, B](fa: PureConc[E, A])(f: A => PureConc[E, B]): PureConc[E, B] = M.flatMap(fa)(f) @@ -379,55 +559,246 @@ object pure { // todo: MVar is not Serializable, release then update here final class PureFiber[E, A]( val state0: MVar[Outcome[PureConc[E, *], E, A]], - val canceled0: MVar[Unit]) + val canceled0: MVar[CancelationSignal[E]], + val masks: MVar[List[MaskFrame]], + val cancelationListeners: MVar[List[CancelationListener[E]]], + val finalizing: MVar[Boolean], + val activePolls: MVar[Int]) extends Fiber[PureConc[E, *], E, A] with Serializable { + def this( + state0: MVar[Outcome[PureConc[E, *], E, A]], + canceled0: MVar[CancelationSignal[E]], + masks: MVar[List[MaskFrame]]) = + this(state0, canceled0, masks, null, null, null) + + def this( + state0: MVar[Outcome[PureConc[E, *], E, A]], + canceled0: MVar[CancelationSignal[E]]) = + this(state0, canceled0, null, null, null, null) + private[this] val state = state0[PureConc[E, *]] + private[pure] val hasActivePoll: PureConc[E, Boolean] = + if (activePolls eq null) false.pure[PureConc[E, *]] + else activePolls.read[PureConc[E, *]].map(_ > 0) + + private[pure] val enterPoll: PureConc[E, Unit] = + if (activePolls eq null) ().pure[PureConc[E, *]] + else + activePolls.read[PureConc[E, *]].flatMap { n => + activePolls.swap[PureConc[E, *]](n + 1).void + } + + private[pure] val exitPoll: PureConc[E, Unit] = + if (activePolls eq null) ().pure[PureConc[E, *]] + else + activePolls.read[PureConc[E, *]].flatMap { n => + activePolls.swap[PureConc[E, *]]((n - 1) max 0).void + } + private[pure] val canceled: PureConc[E, Boolean] = canceled0.tryRead[PureConc[E, *]].map(_.as(true).getOrElse(false)) - private[pure] val realizeCancelation: PureConc[E, Boolean] = - withCtx { ctx => - val checkM = ctx.masks.isEmpty.pure[PureConc[E, *]] - - checkM.ifM( - canceled.ifM( - // if unmasked and canceled, finalize - allocateForPureConc[E] uncancelable { _ => - ctx.finalizers.sequence_.as(true) <* state0.tryPut[PureConc[E, *]]( - Outcome.Canceled()) - }, - // if unmasked but not canceled, ignore - false.pure[PureConc[E, *]] - ), + private[pure] def registerCancelationListener( + notify: PureConc[E, Unit]): PureConc[E, CancelationListenerId] = { + val id = new CancelationListenerId + + if (cancelationListeners eq null) id.pure[PureConc[E, *]] + else + cancelationListeners.read[PureConc[E, *]].flatMap { listeners => + cancelationListeners + .swap[PureConc[E, *]](CancelationListener(id, notify) :: listeners) + .as(id) + } + } + + private[pure] def removeCancelationListener(id: CancelationListenerId): PureConc[E, Unit] = + if (cancelationListeners eq null) ().pure[PureConc[E, *]] + else + cancelationListeners.read[PureConc[E, *]].flatMap { listeners => + cancelationListeners + .swap[PureConc[E, *]](listeners.filterNot(_.id === id)) + .void + } + + private[this] def notifyCancelationListeners: PureConc[E, Unit] = + if (cancelationListeners eq null) ().pure[PureConc[E, *]] + else cancelationListeners.swap[PureConc[E, *]](Nil).flatMap(_.traverse_(_.action)) + + private[pure] def interruptible[B](ctx: FiberCtx[E])( + fb: PureConc[E, B]): PureConc[E, B] = { + val Thread = ApplicativeThread[PureConc[E, *]] + + ctx.self.masks.read[PureConc[E, *]].flatMap { + case Nil => + MVar.empty[PureConc[E, *], Option[B]].flatMap { signal => + val notifyCancelation = signal.tryPut[PureConc[E, *]](None).void + + ctx.self.registerCancelationListener(notifyCancelation).flatMap { listener => + val awaitCompletion = + Thread.start(fb.flatMap(b => signal.tryPut[PureConc[E, *]](Some(b)).void)) + + val checkCancelation = + signal.tryRead[PureConc[E, *]].flatMap { + case Some(_) => ().pure[PureConc[E, *]] + case None => + ctx.self + .realizeCancelationWith(ctx) + .ifM(notifyCancelation, ().pure[PureConc[E, *]]) + } + + awaitCompletion *> + checkCancelation *> + signal.read[PureConc[E, *]].flatMap { + case Some(b) => + ctx.self.removeCancelationListener(listener).as(b) + + case None => + ctx.self.removeCancelationListener(listener) *> + ctx.self.realizeCancelationWith(ctx) *> + Thread.done + } + } + } + + case _ => + fb + } + } + + private[pure] val isFinalizing: PureConc[E, Boolean] = + if (finalizing eq null) false.pure[PureConc[E, *]] + else finalizing.read[PureConc[E, *]] + + private[this] def setFinalizing(value: Boolean): PureConc[E, Unit] = + if (finalizing eq null) ().pure[PureConc[E, *]] + else finalizing.swap[PureConc[E, *]](value).void + + private[this] def finalizeWith( + ctx: FiberCtx[E], + finalizers: List[PureConc[E, Unit]]): PureConc[E, Boolean] = + localCtx( + ctx.copy(finalizers = Nil, finalizing = true), + allocateForPureConc[E].uncancelable(_ => finalizers.sequence_) *> + (state0.tryPut[PureConc[E, *]](Outcome.Canceled()).flatMap { + case true => true.pure[PureConc[E, *]] + case false => + state.read.map { + case Outcome.Canceled() => true + case _ => false + } + } <* setFinalizing(false))) + + private[this] def cancelationFinalizers( + signal: CancelationSignal[E], + ctx: FiberCtx[E]): List[PureConc[E, Unit]] = { + signal match { + case CancelationSignal.External() => ctx.finalizers + case CancelationSignal.Self(Nil) if ctx.selfCancelationBoundary.nonEmpty => + selfCancelationFinalizers(ctx) + case CancelationSignal.Self(finalizers) => finalizers + } + } + + private[this] def selfCancelationFinalizers(ctx: FiberCtx[E]): List[PureConc[E, Unit]] = + ctx.selfCancelationBoundary match { + case Some(boundary) => ctx.finalizers.take((ctx.finalizers.length - boundary) max 0) + case None => ctx.finalizers + } + + private[this] def whileFinalizing[B](ctx: FiberCtx[E])(fb: PureConc[E, B]): PureConc[E, B] = + localCtx(ctx.copy(finalizers = Nil, finalizing = true), setFinalizing(true) *> fb) + + private[this] def finalizationOutcome: PureConc[E, Boolean] = + state.read.map { + case Outcome.Canceled() => true + case _ => false + } + + private[this] def realizeCancelationWithSignal( + ctx: FiberCtx[E], + signal: CancelationSignal[E]): PureConc[E, Boolean] = + whileFinalizing(ctx)(finalizeWith(ctx, cancelationFinalizers(signal, ctx))) + + private[pure] def realizeCancelationWith(ctx: FiberCtx[E]): PureConc[E, Boolean] = + if (ctx.finalizing) false.pure[PureConc[E, *]] + else isFinalizing.ifM( + finalizationOutcome, + ctx.self.masks.read[PureConc[E, *]].map(_.isEmpty).ifM( + canceled0.tryRead[PureConc[E, *]].flatMap { + case Some(signal) => + // if unmasked and canceled, finalize + realizeCancelationWithSignal(ctx, signal) + + case None => + // if unmasked but not canceled, ignore + false.pure[PureConc[E, *]] + }, // if masked, ignore cancelation state but retain until unmasked false.pure[PureConc[E, *]] - ) + )) + + private[pure] val realizeCancelation: PureConc[E, Boolean] = + withCtx(realizeCancelationWith) + + private[pure] def awaitCancelationWith(ctx: FiberCtx[E]): PureConc[E, Boolean] = { + def blocked = + MVar.empty[PureConc[E, *], Unit].flatMap(_.read[PureConc[E, *]]).as(false) + + if (ctx.finalizing) blocked + else ctx.self.masks.read[PureConc[E, *]].flatMap { + case Nil => + isFinalizing.ifM( + canceled0.tryRead[PureConc[E, *]].map(_.isEmpty), + canceled0.read[PureConc[E, *]].flatMap(realizeCancelationWithSignal(ctx, _))) + + case _ => + isFinalizing.ifM( + canceled0.tryRead[PureConc[E, *]].map(_.isEmpty), + blocked) } + } private[pure] val awaitCancelation: PureConc[E, Boolean] = - canceled0.read[PureConc[E, *]] *> realizeCancelation + withCtx(awaitCancelationWith) + + private[pure] def cancelAndRealizeWith(ctx: FiberCtx[E]): PureConc[E, Boolean] = + if (ctx.finalizing) false.pure[PureConc[E, *]] + else isFinalizing.ifM( + ctx.self.masks.read[PureConc[E, *]].map(_.isEmpty), + ctx.self.masks.read[PureConc[E, *]].map(_.isEmpty).ifM( + whileFinalizing(ctx) { + val finalizers = selfCancelationFinalizers(ctx) + + canceled0.tryPut[PureConc[E, *]](CancelationSignal.Self(finalizers)).flatMap { + case true => notifyCancelationListeners *> finalizeWith(ctx, finalizers) + case false => + canceled0 + .tryRead[PureConc[E, *]] + .flatMap( + _.fold(finalizeWith(ctx, finalizers))(signal => + finalizeWith(ctx, cancelationFinalizers(signal, ctx)))) + } + }, + canceled0 + .tryPut[PureConc[E, *]](CancelationSignal.Self(Nil)) + .flatMap(inserted => if (inserted) notifyCancelationListeners.as(false) else false.pure[PureConc[E, *]]))) private[pure] val cancelAndRealize: PureConc[E, Boolean] = - canceled0.tryPut[PureConc[E, *]](()) *> realizeCancelation + withCtx(cancelAndRealizeWith) val join: PureConc[E, Outcome[PureConc[E, *], E, A]] = { - val Thread = ApplicativeThread[PureConc[E, *]] - withCtx { ctx => - // this is exactly like Deferred#get - MVar.empty[PureConc[E, *], Option[Outcome[PureConc[E, *], E, A]]] flatMap { signal => - // note we must read our *own* canceled, not the target fiber's - val left = Thread.start(ctx.self.awaitCancelation.ifM(signal.tryPut[PureConc[E, *]](None).void, Applicative[PureConc[E, *]].unit)) - val right = Thread.start(state.read.flatMap(oc => signal.tryPut[PureConc[E, *]](Some(oc)))) - - left *> right *> signal.read[PureConc[E, *]].flatMap(_.map(_.pure[PureConc[E, *]]).getOrElse(Thread.done)) - } + ctx.self.interruptible(ctx)(state.read) } } - val cancel: PureConc[E, Unit] = canceled0.tryPut[PureConc[E, *]](()) *> join.void + val cancel: PureConc[E, Unit] = + canceled0.tryPut[PureConc[E, *]](CancelationSignal.External()).flatMap { + case true => notifyCancelationListeners + case false => ().pure[PureConc[E, *]] + } *> join.void } } diff --git a/laws/shared/src/test/scala/cats/effect/laws/PureConcSpec.scala b/laws/shared/src/test/scala/cats/effect/laws/PureConcSpec.scala index ebb8f8a304..410254de1b 100644 --- a/laws/shared/src/test/scala/cats/effect/laws/PureConcSpec.scala +++ b/laws/shared/src/test/scala/cats/effect/laws/PureConcSpec.scala @@ -24,8 +24,8 @@ import cats.laws.discipline.arbitrary._ import org.scalacheck.Prop import org.scalacheck.rng.Seed -import org.specs2.scalacheck.Parameters import org.specs2.mutable._ +import org.specs2.scalacheck.Parameters import org.typelevel.discipline.specs2.mutable.Discipline import scala.concurrent.duration._ @@ -184,6 +184,60 @@ class PureConcSpec extends Specification with Discipline with BaseSpec { forked mustEqual Outcome.Succeeded(None) } + "ignore poll from another fiber" in { + val t = for { + started <- F.deferred[Unit] + polled <- F.deferred[Unit] + + parent <- F.start { + F.uncancelable { poll => + started.complete(()) *> + F.start(poll(polled.complete(()) *> F.never[Unit])).void *> + polled.get *> + F.never[Unit] + } + } + + _ <- started.get + _ <- polled.get + _ <- parent.cancel + } yield () + + pure.run(t) mustEqual Outcome.Succeeded(None) + } + + "run finalizers around a self-canceling polled region" in { + val t = for { + finalized <- F.ref(0) + fiber <- F.start { + F.uncancelable { poll => + F.onCancel(poll(F.canceled), finalized.update(_ + 1)) + } + } + _ <- fiber.join + back <- finalized.get + } yield back + + pure.run(t) mustEqual Outcome.Succeeded(Some(1)) + } + + "observe pending self-cancel before running a polled region" in { + val t = for { + finalized <- F.ref(0) + ran <- F.ref(false) + fiber <- F.start { + F.uncancelable { poll => + F.canceled *> F.onCancel(poll(ran.set(true)), finalized.update(_ + 1)) + } + } + _ <- fiber.join + fin <- finalized.get + body <- ran.get + } yield (fin, body) + + pure.run(t) mustEqual Outcome.Succeeded(Some((1, false))) + } + "implement locals via Kleisli and FreeT" in { import cats.{~>, Eval, Id} import cats.data.Kleisli @@ -207,15 +261,13 @@ class PureConcSpec extends Specification with Discipline with BaseSpec { val _ = run(1) { withLocal(42) { read { i => - FreeT.liftT[Id, Kleisli[Eval, Int, *], Unit](Kleisli.liftF[Eval, Int, Unit](Eval.later({i mustEqual 42; ()}))).flatMap(_ => - read { i2 => - FreeT.liftT(Kleisli.liftF(Eval.later(i2 mustEqual 42))) - } - ) + FreeT + .liftT[Id, Kleisli[Eval, Int, *], Unit]( + Kleisli.liftF[Eval, Int, Unit](Eval.later { i mustEqual 42; () })) + .flatMap(_ => + read { i2 => FreeT.liftT(Kleisli.liftF(Eval.later(i2 mustEqual 42))) }) } - } *> read { i => - FreeT.liftT(Kleisli.liftF(Eval.later(i mustEqual 1))) - } + } *> read { i => FreeT.liftT(Kleisli.liftF(Eval.later(i mustEqual 1))) } } ok From d7dbb4680d126fe5bea7b74d01dedcd967a6665b Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Mon, 22 Jun 2026 00:13:25 -0500 Subject: [PATCH 4/6] Fix PureConc cancellation laws --- .../kernel/testkit/PureConcGenerators.scala | 4 +- .../cats/effect/kernel/testkit/TimeT.scala | 87 ++++++- .../cats/effect/kernel/testkit/pure.scala | 242 +++++++++++++----- .../scala/cats/effect/laws/PureConcSpec.scala | 157 +++++++++++- 4 files changed, 412 insertions(+), 78 deletions(-) diff --git a/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/PureConcGenerators.scala b/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/PureConcGenerators.scala index c6162d20d4..55ffa373d9 100644 --- a/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/PureConcGenerators.scala +++ b/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/PureConcGenerators.scala @@ -43,8 +43,8 @@ object PureConcGenerators { super .recursiveGen[B](deeper) .filterNot( - _._1 == "racePair" - ) // remove the racePair generator since it reifies nondeterminism, which cannot be law-tested + gen => gen._1 == "racePair" || gen._1 == "join" + ) // remove generators which reify nondeterminism and cannot be law-tested } implicit def arbitraryPureConc[E: Arbitrary: Cogen, A: Arbitrary: Cogen] diff --git a/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/TimeT.scala b/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/TimeT.scala index a74a18e241..e7e5820529 100644 --- a/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/TimeT.scala +++ b/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/TimeT.scala @@ -18,7 +18,7 @@ package cats.effect package kernel package testkit -import cats.{~>, Group, Monad, Monoid, Order} +import cats.{~>, Eq, Group, Monad, Monoid, Order} import cats.data.Kleisli import cats.syntax.all._ @@ -90,6 +90,9 @@ private[effect] object TimeT { a.map(_.inverse()) } + implicit def eqTimeT[F[_], A](implicit FA: Eq[F[A]]): Eq[TimeT[F, A]] = + Eq.by(TimeT.run) + implicit def orderTimeT[F[_], A](implicit FA: Order[F[A]]): Order[TimeT[F, A]] = Order.by(TimeT.run) @@ -111,15 +114,70 @@ private[effect] object TimeT { val forkA = time.fork() val forkB = time.fork() - // TODO this doesn't work (yet) because we need to force the "faster" effect to win the race, which right now isn't happening - F.racePair(fa.run(forkA), fb.run(forkB)).map { + def liftOutcome[C](oc: Outcome[F, E, C]): Outcome[TimeT[F, *], E, C] = + oc.mapK(TimeT.liftK[F]) + + F.racePair(fa.run(forkA), fb.run(forkB)).flatMap { case Left((oca, delegate)) => - time.now = forkA.now - Left((oca.mapK(TimeT.liftK[F]), fiberize(forkB, delegate))) + F.onCancel(F.race(delegate.join, F.cede), delegate.cancel).map { + case Left(ocb) if forkB.now < forkA.now => + time.now = forkB.now + Right((completedFiber(forkA, liftOutcome(oca)), liftOutcome(ocb))) + + case _ => + time.now = forkA.now + Left((liftOutcome(oca), fiberize(forkB, delegate))) + } case Right((delegate, ocb)) => - time.now = forkB.now - Right((fiberize(forkA, delegate), ocb.mapK(TimeT.liftK[F]))) + F.onCancel(F.race(delegate.join, F.cede), delegate.cancel).map { + case Left(oca) if forkA.now < forkB.now => + time.now = forkA.now + Left((liftOutcome(oca), completedFiber(forkB, liftOutcome(ocb)))) + + case _ => + time.now = forkB.now + Right((fiberize(forkA, delegate), liftOutcome(ocb))) + } + } + } + + override def race[A, B](fa: TimeT[F, A], fb: TimeT[F, B]): TimeT[F, Either[A, B]] = + uncancelable { poll => + poll(racePair(fa, fb)).flatMap { + case Left((oc, f)) => + oc match { + case Outcome.Succeeded(fa) => f.cancel *> fa.map(Left(_)) + case Outcome.Errored(ea) => f.cancel *> raiseError(ea) + case Outcome.Canceled() => + f.cancel *> poll(f.join) flatMap { + case Outcome.Succeeded(fb) => fb.map(Right(_)) + case Outcome.Errored(eb) => raiseError(eb) + case Outcome.Canceled() => poll(canceled) *> never + } + } + + case Right((f, oc)) => + oc match { + case Outcome.Succeeded(fb) => f.cancel *> fb.map(Right(_)) + case Outcome.Errored(eb) => f.cancel *> raiseError(eb) + case Outcome.Canceled() => + f.cancel *> poll(f.join) flatMap { + case Outcome.Succeeded(fa) => fa.map(Left(_)) + case Outcome.Errored(ea) => raiseError(ea) + case Outcome.Canceled() => poll(canceled) *> never + } + } + } + } + + override def raceOutcome[A, B](fa: TimeT[F, A], fb: TimeT[F, B]): TimeT[ + F, + Either[Outcome[TimeT[F, *], E, A], Outcome[TimeT[F, *], E, B]]] = + uncancelable { poll => + poll(racePair(fa, fb)).flatMap { + case Left((oc, f)) => f.cancel.as(Left(oc)) + case Right((f, oc)) => f.cancel.as(Right(oc)) } } @@ -156,5 +214,20 @@ private[effect] object TimeT { } } } + + private[this] def completedFiber[A]( + forked: Time, + outcome: Outcome[TimeT[F, *], E, A]): Fiber[TimeT[F, *], E, A] = + new Fiber[TimeT[F, *], E, A] { + + val cancel = + unit + + val join = + Kleisli { outerTime => + outerTime.now = outerTime.now.max(forked.now) + F.pure(outcome) + } + } } } diff --git a/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/pure.scala b/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/pure.scala index b51164d97a..f35c1dd5df 100644 --- a/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/pure.scala +++ b/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/pure.scala @@ -69,12 +69,112 @@ object pure { case object Absent extends MaskUpdate } - final case class FiberCtx[E]( - self: PureFiber[E, _], - masks: List[MaskId] = Nil, - finalizers: List[PureConc[E, Unit]] = Nil, - selfCancelationBoundary: Option[Int] = None, - finalizing: Boolean = false) + final class FiberCtx[E] private[pure] ( + val self: PureFiber[E, _], + val masks: List[MaskId], + val finalizers: List[PureConc[E, Unit]], + private[pure] val selfCancelationBoundary: Option[Int], + private[pure] val finalizing: Boolean) + extends Product3[PureFiber[E, _], List[MaskId], List[PureConc[E, Unit]]] + with Serializable { + + def this( + self: PureFiber[E, _], + masks: List[MaskId], + finalizers: List[PureConc[E, Unit]]) = + this(self, masks, finalizers, None, false) + + def this(self: PureFiber[E, _], masks: List[MaskId]) = + this(self, masks, Nil) + + def this(self: PureFiber[E, _]) = + this(self, Nil, Nil) + + def copy( + self: PureFiber[E, _] = this.self, + masks: List[MaskId] = this.masks, + finalizers: List[PureConc[E, Unit]] = this.finalizers): FiberCtx[E] = + FiberCtx.internal(self, masks, finalizers, selfCancelationBoundary, finalizing) + + private[pure] def withSelfCancelationBoundary(boundary: Option[Int]): FiberCtx[E] = + FiberCtx.internal(self, masks, finalizers, boundary, finalizing) + + private[pure] def withFinalizing(value: Boolean): FiberCtx[E] = + FiberCtx.internal(self, masks, finalizers, selfCancelationBoundary, value) + + def _1: PureFiber[E, _] = self + + def _2: List[MaskId] = masks + + def _3: List[PureConc[E, Unit]] = finalizers + + override def canEqual(that: Any): Boolean = + that.isInstanceOf[FiberCtx[_]] + + override def productArity: Int = 3 + + override def productElement(n: Int): Any = + n match { + case 0 => self + case 1 => masks + case 2 => finalizers + case _ => throw new IndexOutOfBoundsException(n.toString) + } + + override def productPrefix: String = "FiberCtx" + + override def equals(that: Any): Boolean = + that match { + case that: FiberCtx[_] => + that.canEqual(this) && + self == that.self && + masks == that.masks && + finalizers == that.finalizers + case _ => + false + } + + override def hashCode: Int = + (self, masks, finalizers).## + + override def toString: String = + s"FiberCtx($self,$masks,$finalizers)" + } + + object FiberCtx { + def $lessinit$greater$default$2[E]: List[MaskId] = + Nil + + def $lessinit$greater$default$3[E]: Nil.type = + Nil + + def apply[E]( + self: PureFiber[E, _], + masks: List[MaskId] = Nil, + finalizers: List[PureConc[E, Unit]] = Nil): FiberCtx[E] = + new FiberCtx(self, masks, finalizers, None, false) + + def tupled[E] + : ((PureFiber[E, _], List[MaskId], List[PureConc[E, Unit]])) => FiberCtx[E] = { + case (self, masks, finalizers) => apply(self, masks, finalizers) + } + + def curried[E] + : PureFiber[E, _] => List[MaskId] => List[PureConc[E, Unit]] => FiberCtx[E] = + self => masks => finalizers => apply(self, masks, finalizers) + + private[pure] def internal[E]( + self: PureFiber[E, _], + masks: List[MaskId], + finalizers: List[PureConc[E, Unit]], + selfCancelationBoundary: Option[Int], + finalizing: Boolean): FiberCtx[E] = + new FiberCtx(self, masks, finalizers, selfCancelationBoundary, finalizing) + + def unapply[E]( + ctx: FiberCtx[E]): Option[(PureFiber[E, _], List[MaskId], List[PureConc[E, Unit]])] = + Some((ctx.self, ctx.masks, ctx.finalizers)) + } type ResolvedPC[E, A] = ThreadT[IdOC[E, *], A] @@ -106,7 +206,12 @@ object pure { .self .hasActivePoll .ifM( - ().pure[PureConc[E, *]], + ctx + .self + .realizeExternalCancelationWith(ctx) + .ifM( + ApplicativeThread[PureConc[E, *]].done[Unit], + ().pure[PureConc[E, *]]), ctx .self .realizeCancelationWith(ctx) @@ -263,7 +368,10 @@ object pure { }) def cede: PureConc[E, Unit] = - Thread.cede + withCtx { ctx => + Thread.cede *> + ctx.self.realizeExternalCancelationWith(ctx).ifM(Thread.done, unit) + } def never[A]: PureConc[E, A] = withCtx[E, A] { ctx => @@ -388,13 +496,13 @@ object pure { fibA <- start(fa) fibB <- start(fb) - _ <- Thread.start( + _ <- start( fibA.join.flatMap(oc => result.complete( Left(oc): Either[ Outcome[PureConc[E, *], E, A], Outcome[PureConc[E, *], E, B]]).void)) - _ <- Thread.start( + _ <- start( fibB.join.flatMap(oc => result.complete( Right(oc): Either[ @@ -451,9 +559,8 @@ object pure { val restoreF = restore(update) val pollCtx = update match { case MaskUpdate.Removed => - callCtx.copy( - selfCancelationBoundary = - callCtx.selfCancelationBoundary.orElse(Some(selfCancelationBoundary))) + callCtx.withSelfCancelationBoundary( + callCtx.selfCancelationBoundary.orElse(Some(selfCancelationBoundary))) case MaskUpdate.Shadowed | MaskUpdate.Absent => callCtx @@ -492,7 +599,7 @@ object pure { val back = result.pure[PureConc[E, *]].rethrow self.hasActivePoll.ifM( - back, + self.realizeSelfCancelationWith(ctx).ifM(Thread.done, back), self.realizeCancelationWith(ctx).ifM(Thread.done, back)) case MaskUpdate.Shadowed | MaskUpdate.Absent => @@ -559,27 +666,23 @@ object pure { // todo: MVar is not Serializable, release then update here final class PureFiber[E, A]( val state0: MVar[Outcome[PureConc[E, *], E, A]], - val canceled0: MVar[CancelationSignal[E]], - val masks: MVar[List[MaskFrame]], - val cancelationListeners: MVar[List[CancelationListener[E]]], - val finalizing: MVar[Boolean], - val activePolls: MVar[Int]) + private[this] val canceled0: MVar[CancelationSignal[E]], + private[pure] val masks: MVar[List[MaskFrame]], + private[this] val cancelationListeners: MVar[List[CancelationListener[E]]], + private[this] val finalizing: MVar[Boolean], + private[this] val activePolls: MVar[Int]) extends Fiber[PureConc[E, *], E, A] with Serializable { - def this( - state0: MVar[Outcome[PureConc[E, *], E, A]], - canceled0: MVar[CancelationSignal[E]], - masks: MVar[List[MaskFrame]]) = - this(state0, canceled0, masks, null, null, null) - - def this( - state0: MVar[Outcome[PureConc[E, *], E, A]], - canceled0: MVar[CancelationSignal[E]]) = - this(state0, canceled0, null, null, null, null) + def this(state0: MVar[Outcome[PureConc[E, *], E, A]]) = + this(state0, null, null, null, null, null) private[this] val state = state0[PureConc[E, *]] + private[pure] val currentMasks: PureConc[E, List[MaskFrame]] = + if (masks eq null) List.empty[MaskFrame].pure[PureConc[E, *]] + else masks.read[PureConc[E, *]] + private[pure] val hasActivePoll: PureConc[E, Boolean] = if (activePolls eq null) false.pure[PureConc[E, *]] else activePolls.read[PureConc[E, *]].map(_ > 0) @@ -598,9 +701,6 @@ object pure { activePolls.swap[PureConc[E, *]]((n - 1) max 0).void } - private[pure] val canceled: PureConc[E, Boolean] = - canceled0.tryRead[PureConc[E, *]].map(_.as(true).getOrElse(false)) - private[pure] def registerCancelationListener( notify: PureConc[E, Unit]): PureConc[E, CancelationListenerId] = { val id = new CancelationListenerId @@ -631,7 +731,7 @@ object pure { fb: PureConc[E, B]): PureConc[E, B] = { val Thread = ApplicativeThread[PureConc[E, *]] - ctx.self.masks.read[PureConc[E, *]].flatMap { + ctx.self.currentMasks.flatMap { case Nil => MVar.empty[PureConc[E, *], Option[B]].flatMap { signal => val notifyCancelation = signal.tryPut[PureConc[E, *]](None).void @@ -680,7 +780,7 @@ object pure { ctx: FiberCtx[E], finalizers: List[PureConc[E, Unit]]): PureConc[E, Boolean] = localCtx( - ctx.copy(finalizers = Nil, finalizing = true), + ctx.copy(finalizers = Nil).withFinalizing(true), allocateForPureConc[E].uncancelable(_ => finalizers.sequence_) *> (state0.tryPut[PureConc[E, *]](Outcome.Canceled()).flatMap { case true => true.pure[PureConc[E, *]] @@ -709,7 +809,7 @@ object pure { } private[this] def whileFinalizing[B](ctx: FiberCtx[E])(fb: PureConc[E, B]): PureConc[E, B] = - localCtx(ctx.copy(finalizers = Nil, finalizing = true), setFinalizing(true) *> fb) + localCtx(ctx.copy(finalizers = Nil).withFinalizing(true), setFinalizing(true) *> fb) private[this] def finalizationOutcome: PureConc[E, Boolean] = state.read.map { @@ -722,33 +822,43 @@ object pure { signal: CancelationSignal[E]): PureConc[E, Boolean] = whileFinalizing(ctx)(finalizeWith(ctx, cancelationFinalizers(signal, ctx))) - private[pure] def realizeCancelationWith(ctx: FiberCtx[E]): PureConc[E, Boolean] = + private[this] def realizeCancelationIf(ctx: FiberCtx[E])( + accepts: CancelationSignal[E] => Boolean): PureConc[E, Boolean] = if (ctx.finalizing) false.pure[PureConc[E, *]] else isFinalizing.ifM( finalizationOutcome, - ctx.self.masks.read[PureConc[E, *]].map(_.isEmpty).ifM( + ctx.self.currentMasks.map(_.isEmpty).ifM( canceled0.tryRead[PureConc[E, *]].flatMap { - case Some(signal) => - // if unmasked and canceled, finalize + case Some(signal) if accepts(signal) => realizeCancelationWithSignal(ctx, signal) - case None => - // if unmasked but not canceled, ignore + case Some(_) | None => false.pure[PureConc[E, *]] }, - // if masked, ignore cancelation state but retain until unmasked false.pure[PureConc[E, *]] )) - private[pure] val realizeCancelation: PureConc[E, Boolean] = - withCtx(realizeCancelationWith) + private[pure] def realizeExternalCancelationWith(ctx: FiberCtx[E]): PureConc[E, Boolean] = + realizeCancelationIf(ctx) { + case CancelationSignal.External() => true + case _ => false + } + + private[pure] def realizeSelfCancelationWith(ctx: FiberCtx[E]): PureConc[E, Boolean] = + realizeCancelationIf(ctx) { + case CancelationSignal.Self(_) => true + case _ => false + } + + private[pure] def realizeCancelationWith(ctx: FiberCtx[E]): PureConc[E, Boolean] = + realizeCancelationIf(ctx)((_: CancelationSignal[E]) => true) private[pure] def awaitCancelationWith(ctx: FiberCtx[E]): PureConc[E, Boolean] = { def blocked = MVar.empty[PureConc[E, *], Unit].flatMap(_.read[PureConc[E, *]]).as(false) if (ctx.finalizing) blocked - else ctx.self.masks.read[PureConc[E, *]].flatMap { + else ctx.self.currentMasks.flatMap { case Nil => isFinalizing.ifM( canceled0.tryRead[PureConc[E, *]].map(_.isEmpty), @@ -761,18 +871,17 @@ object pure { } } - private[pure] val awaitCancelation: PureConc[E, Boolean] = - withCtx(awaitCancelationWith) - private[pure] def cancelAndRealizeWith(ctx: FiberCtx[E]): PureConc[E, Boolean] = if (ctx.finalizing) false.pure[PureConc[E, *]] else isFinalizing.ifM( - ctx.self.masks.read[PureConc[E, *]].map(_.isEmpty), - ctx.self.masks.read[PureConc[E, *]].map(_.isEmpty).ifM( + ctx.self.currentMasks.map(_.isEmpty), + ctx.self.currentMasks.map(_.isEmpty).ifM( whileFinalizing(ctx) { - val finalizers = selfCancelationFinalizers(ctx) + val finalizers = + selfCancelationFinalizers(ctx) + val selfCancelation = CancelationSignal.Self(finalizers) - canceled0.tryPut[PureConc[E, *]](CancelationSignal.Self(finalizers)).flatMap { + canceled0.tryPut[PureConc[E, *]](selfCancelation).flatMap { case true => notifyCancelationListeners *> finalizeWith(ctx, finalizers) case false => canceled0 @@ -786,19 +895,26 @@ object pure { .tryPut[PureConc[E, *]](CancelationSignal.Self(Nil)) .flatMap(inserted => if (inserted) notifyCancelationListeners.as(false) else false.pure[PureConc[E, *]]))) - private[pure] val cancelAndRealize: PureConc[E, Boolean] = - withCtx(cancelAndRealizeWith) - - val join: PureConc[E, Outcome[PureConc[E, *], E, A]] = { - withCtx { ctx => - ctx.self.interruptible(ctx)(state.read) + val join: PureConc[E, Outcome[PureConc[E, *], E, A]] = + if (canceled0 eq null) state.read + else { + withCtx { ctx => + ctx.self.interruptible(ctx)(state.read) + } } - } val cancel: PureConc[E, Unit] = - canceled0.tryPut[PureConc[E, *]](CancelationSignal.External()).flatMap { - case true => notifyCancelationListeners - case false => ().pure[PureConc[E, *]] - } *> join.void + if (canceled0 eq null) state.tryPut(Outcome.Canceled()).void + else + allocateForPureConc[E].uncancelable { _ => + state.tryRead.flatMap { + case Some(_) => ().pure[PureConc[E, *]] + case None => + canceled0.tryPut[PureConc[E, *]](CancelationSignal.External()).flatMap { + case true => notifyCancelationListeners + case false => ().pure[PureConc[E, *]] + } *> state.read.void + } + } } } diff --git a/laws/shared/src/test/scala/cats/effect/laws/PureConcSpec.scala b/laws/shared/src/test/scala/cats/effect/laws/PureConcSpec.scala index 410254de1b..faedb75a67 100644 --- a/laws/shared/src/test/scala/cats/effect/laws/PureConcSpec.scala +++ b/laws/shared/src/test/scala/cats/effect/laws/PureConcSpec.scala @@ -17,26 +17,39 @@ package cats.effect package laws +import cats.{Eq, Order} import cats.effect.kernel.testkit.{pure, OutcomeGenerators, PureConcGenerators, TimeT} -import cats.effect.kernel.testkit.TimeT._ +import cats.effect.kernel.testkit.TimeT.{eqTimeT => _, orderTimeT => _, _} import cats.effect.kernel.testkit.pure._ import cats.laws.discipline.arbitrary._ import org.scalacheck.Prop -import org.scalacheck.rng.Seed import org.specs2.mutable._ -import org.specs2.scalacheck.Parameters import org.typelevel.discipline.specs2.mutable.Discipline import scala.concurrent.duration._ -class PureConcSpec extends Specification with Discipline with BaseSpec { +private[laws] trait PureConcSpecLowPriorityTimeTInstances { + implicit def orderTimeTPureConcFiniteDuration( + implicit FA: Order[PureConc[Int, FiniteDuration]]): Order[ + TimeT[PureConc[Int, *], FiniteDuration]] = + TimeT.orderTimeT +} + +class PureConcSpec + extends Specification + with Discipline + with PureConcSpecLowPriorityTimeTInstances { import PureConcGenerators._ import OutcomeGenerators._ implicit def exec(fb: TimeT[PureConc[Int, *], Boolean]): Prop = Prop(pure.run(TimeT.run(fb)).fold(false, _ => false, _.getOrElse(false))) + implicit def eqTimeTPureConc[A](implicit FA: Eq[PureConc[Int, A]]): Eq[ + TimeT[PureConc[Int, *], A]] = + TimeT.eqTimeT + "parallel utilities" should { import cats.effect.kernel.{GenConcurrent, Outcome} import cats.effect.kernel.implicits._ @@ -80,7 +93,7 @@ class PureConcSpec extends Specification with Discipline with BaseSpec { } "core PC state machine" should { - import cats.effect.kernel.{GenConcurrent, Outcome} + import cats.effect.kernel.{GenConcurrent, GenTemporal, Outcome} import cats.effect.kernel.implicits._ import cats.syntax.all._ @@ -163,6 +176,22 @@ class PureConcSpec extends Specification with Discipline with BaseSpec { pure.run(t) mustEqual Outcome.Succeeded(None) } + "hang when canceling fiber blocked on cancel finalization" in { + val t = for { + targetStarted <- F.deferred[Unit] + finalizerStarted <- F.deferred[Unit] + target <- F.start( + (targetStarted.complete(()) *> F.never[Unit]) + .onCancel(finalizerStarted.complete(()) *> F.never[Unit])) + _ <- targetStarted.get + canceler <- F.start(target.cancel) + _ <- finalizerStarted.get + _ <- canceler.cancel + } yield () + + pure.run(t) mustEqual Outcome.Succeeded(None) + } + "run finalizers in order" in { val t = for { results <- F.ref[String]("") @@ -176,6 +205,36 @@ class PureConcSpec extends Specification with Discipline with BaseSpec { pure.run(t) mustEqual Outcome.Succeeded(Some("AB")) } + "ignore cancelation of a fiber after racePair has completed" in { + val t = for { + finalized <- F.ref(0) + fiber <- F.start { + F.racePair(F.unit, F.never[Unit]).void.onCancel(finalized.update(_ + 1)) + } + _ <- fiber.join + _ <- fiber.cancel + _ <- F.cede + back <- finalized.get + } yield back + + pure.run(t) mustEqual Outcome.Succeeded(Some(0)) + } + + "ignore cancelation of a fiber after race has completed" in { + val t = for { + finalized <- F.ref(0) + fiber <- F.start { + F.race(F.unit, F.never[Unit]).void.onCancel(finalized.update(_ + 1)) + } + _ <- fiber.join + _ <- fiber.cancel + _ <- F.cede + back <- finalized.get + } yield back + + pure.run(t) mustEqual Outcome.Succeeded(Some(0)) + } + "correctly interpret uncancelable cancelation followed by suspension" in { val t = F.uncancelable(_ => F.canceled *> F.never[Unit]) pure.run(t) mustEqual Outcome.Succeeded(None) @@ -206,6 +265,27 @@ class PureConcSpec extends Specification with Discipline with BaseSpec { pure.run(t) mustEqual Outcome.Succeeded(None) } + "observe external cancelation while blocked inside poll" in { + val t = for { + started <- F.deferred[Unit] + polled <- F.deferred[Unit] + gate <- F.deferred[Unit] + ran <- F.ref(false) + fiber <- F.start { + F.uncancelable { poll => + started.complete(()) *> + poll(polled.complete(()) *> gate.get *> ran.set(true)) + } + } + canceler <- F.start(polled.get *> fiber.cancel) + _ <- started.get + _ <- canceler.join + back <- ran.get + } yield back + + pure.run(t) mustEqual Outcome.Succeeded(Some(false)) + } + "run finalizers around a self-canceling polled region" in { val t = for { finalized <- F.ref(0) @@ -221,6 +301,23 @@ class PureConcSpec extends Specification with Discipline with BaseSpec { pure.run(t) mustEqual Outcome.Succeeded(Some(1)) } + "run outer finalizers around a self-canceling polled region" in { + val t = for { + finalized <- F.ref(0) + fiber <- F.start { + F.onCancel( + F.uncancelable { poll => + poll(F.canceled) + }, + finalized.update(_ + 1)) + } + _ <- fiber.join + back <- finalized.get + } yield back + + pure.run(t) mustEqual Outcome.Succeeded(Some(1)) + } + "observe pending self-cancel before running a polled region" in { val t = for { finalized <- F.ref(0) @@ -238,6 +335,23 @@ class PureConcSpec extends Specification with Discipline with BaseSpec { pure.run(t) mustEqual Outcome.Succeeded(Some((1, false))) } + "observe nested self-cancel inside a polled region before continuing" in { + val t = for { + ran <- F.ref(false) + fiber <- F.start { + F.uncancelable { poll => + poll { + F.uncancelable(_ => F.canceled) *> ran.set(true) + } + } + } + _ <- fiber.join + back <- ran.get + } yield back + + pure.run(t) mustEqual Outcome.Succeeded(Some(false)) + } + "implement locals via Kleisli and FreeT" in { import cats.{~>, Eval, Id} import cats.data.Kleisli @@ -272,10 +386,41 @@ class PureConcSpec extends Specification with Discipline with BaseSpec { ok } + + "race TimeT values against never" in { + type T[A] = TimeT[F, A] + val T = GenTemporal[T, Int] + + pure.run(TimeT.run(T.race(T.pure(1), T.never[Unit]))) mustEqual Outcome.Succeeded( + Some(Left(1))) + pure.run(TimeT.run(T.race(T.never[Unit], T.pure(1)))) mustEqual Outcome.Succeeded( + Some(Right(1))) + pure.run(TimeT.run(T.race(T.sleep(1.second).as(1), T.never[Unit]))) mustEqual Outcome + .Succeeded(Some(Left(1))) + pure.run(TimeT.run(T.race(T.never[Unit], T.sleep(1.second).as(1)))) mustEqual Outcome + .Succeeded(Some(Right(1))) + pure.run(TimeT.run(T.race(T.sleep(2.seconds).as("slow"), T.sleep(1.second).as("fast")))) mustEqual + Outcome.Succeeded(Some(Right("fast"))) + pure.run(TimeT.run(T.race(T.sleep(1.second).as("fast"), T.sleep(2.seconds).as("slow")))) mustEqual + Outcome.Succeeded(Some(Left("fast"))) + pure.run(TimeT.run(T.race(T.canceled, T.never[Unit]).void)) mustEqual Outcome.Canceled() + pure.run(TimeT.run(T.race(T.never[Unit], T.canceled).void)) mustEqual Outcome.Canceled() + pure.run( + TimeT.run(T.race(TimeT.liftF(F.uncancelable(_ => F.canceled.as(1))), T.never[Unit]))) mustEqual + Outcome.Canceled() + pure.run( + TimeT.run(T.race(T.never[Unit], TimeT.liftF(F.uncancelable(_ => F.canceled.as(1)))))) mustEqual + Outcome.Canceled() + pure.run(TimeT.run(T.race(TimeT.liftF(F.start(F.unit).flatMap(_.join).as(1)), T.never[Unit]))) mustEqual + Outcome.Succeeded(Some(Left(1))) + pure.run(TimeT.run(T.race(T.never[Unit], TimeT.liftF(F.start(F.unit).flatMap(_.join).as(1))))) mustEqual + Outcome.Succeeded(Some(Right(1))) + } + } checkAll( "TimeT[PureConc]", GenTemporalTests[TimeT[PureConc[Int, *], *], Int].temporal[Int, Int, Int](10.millis) - )(Parameters(seed = Seed.fromBase64("ogn64yom4GXCEX0mXdqSfsqSeJxI2RbPUFC5YkvDtzD=").toOption)) + ) } From 6b2193b80eb40404ef4aef02ce45ca8963334ce7 Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Mon, 22 Jun 2026 10:36:37 -0500 Subject: [PATCH 5/6] Re-unified external and self-cancelation --- .../cats/effect/kernel/testkit/pure.scala | 133 +++++++----------- .../scala/cats/effect/laws/PureConcSpec.scala | 17 +++ 2 files changed, 69 insertions(+), 81 deletions(-) diff --git a/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/pure.scala b/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/pure.scala index f35c1dd5df..ea82567f7a 100644 --- a/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/pure.scala +++ b/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/pure.scala @@ -43,12 +43,9 @@ object pure { private[pure] final case class MaskFrame(id: MaskId) - private[pure] sealed trait CancelationSignal[E] - - private[pure] object CancelationSignal { - final case class External[E]() extends CancelationSignal[E] - final case class Self[E](finalizers: List[PureConc[E, Unit]]) extends CancelationSignal[E] - } + // None defers finalizer selection until observation; Some scopes an in-fiber request. + private[pure] final case class CancelationSignal[E]( + finalizers: Option[List[PureConc[E, Unit]]]) private[pure] final class CancelationListenerId @@ -199,19 +196,14 @@ object pure { val back = Kleisli.ask[IdOC[E, *], FiberCtx[E]] map { ctx => val checker = ctx .self - .isFinalizing + .hasActivePoll .ifM( ().pure[PureConc[E, *]], ctx .self - .hasActivePoll + .isFinalizing .ifM( - ctx - .self - .realizeExternalCancelationWith(ctx) - .ifM( - ApplicativeThread[PureConc[E, *]].done[Unit], - ().pure[PureConc[E, *]]), + ().pure[PureConc[E, *]], ctx .self .realizeCancelationWith(ctx) @@ -291,7 +283,7 @@ object pure { identifiedCompletion.map(a => Succeeded[Id, E, A](a): IdOC[E, A]) handleError { e => Errored(e) } - } + } Kleisli.ask[ResolvedPC[E, *], MVar.Universe].map { u => ApplicativeThread[ResolvedPC[E, *]].start(body.run(u)) >> results.run(u) @@ -370,7 +362,7 @@ object pure { def cede: PureConc[E, Unit] = withCtx { ctx => Thread.cede *> - ctx.self.realizeExternalCancelationWith(ctx).ifM(Thread.done, unit) + ctx.self.realizeCancelationWith(ctx).ifM(Thread.done, unit) } def never[A]: PureConc[E, A] = @@ -598,9 +590,7 @@ object pure { case MaskUpdate.Removed => val back = result.pure[PureConc[E, *]].rethrow - self.hasActivePoll.ifM( - self.realizeSelfCancelationWith(ctx).ifM(Thread.done, back), - self.realizeCancelationWith(ctx).ifM(Thread.done, back)) + self.realizeCancelationWith(ctx).ifM(Thread.done, back) case MaskUpdate.Shadowed | MaskUpdate.Absent => result.pure[PureConc[E, *]].rethrow @@ -791,23 +781,6 @@ object pure { } } <* setFinalizing(false))) - private[this] def cancelationFinalizers( - signal: CancelationSignal[E], - ctx: FiberCtx[E]): List[PureConc[E, Unit]] = { - signal match { - case CancelationSignal.External() => ctx.finalizers - case CancelationSignal.Self(Nil) if ctx.selfCancelationBoundary.nonEmpty => - selfCancelationFinalizers(ctx) - case CancelationSignal.Self(finalizers) => finalizers - } - } - - private[this] def selfCancelationFinalizers(ctx: FiberCtx[E]): List[PureConc[E, Unit]] = - ctx.selfCancelationBoundary match { - case Some(boundary) => ctx.finalizers.take((ctx.finalizers.length - boundary) max 0) - case None => ctx.finalizers - } - private[this] def whileFinalizing[B](ctx: FiberCtx[E])(fb: PureConc[E, B]): PureConc[E, B] = localCtx(ctx.copy(finalizers = Nil).withFinalizing(true), setFinalizing(true) *> fb) @@ -817,42 +790,42 @@ object pure { case _ => false } + private[this] def cancelationFinalizers( + signal: CancelationSignal[E], + ctx: FiberCtx[E]): List[PureConc[E, Unit]] = + signal.finalizers match { + case None => ctx.finalizers + case Some(Nil) if ctx.selfCancelationBoundary.nonEmpty => + cancelationBoundaryFinalizers(ctx) + case Some(finalizers) => finalizers + } + private[this] def realizeCancelationWithSignal( ctx: FiberCtx[E], signal: CancelationSignal[E]): PureConc[E, Boolean] = - whileFinalizing(ctx)(finalizeWith(ctx, cancelationFinalizers(signal, ctx))) - - private[this] def realizeCancelationIf(ctx: FiberCtx[E])( - accepts: CancelationSignal[E] => Boolean): PureConc[E, Boolean] = if (ctx.finalizing) false.pure[PureConc[E, *]] else isFinalizing.ifM( finalizationOutcome, ctx.self.currentMasks.map(_.isEmpty).ifM( - canceled0.tryRead[PureConc[E, *]].flatMap { - case Some(signal) if accepts(signal) => - realizeCancelationWithSignal(ctx, signal) - - case Some(_) | None => - false.pure[PureConc[E, *]] - }, + whileFinalizing(ctx)(finalizeWith(ctx, cancelationFinalizers(signal, ctx))), false.pure[PureConc[E, *]] )) - private[pure] def realizeExternalCancelationWith(ctx: FiberCtx[E]): PureConc[E, Boolean] = - realizeCancelationIf(ctx) { - case CancelationSignal.External() => true - case _ => false - } + private[pure] def realizeCancelationWith(ctx: FiberCtx[E]): PureConc[E, Boolean] = + if (ctx.finalizing) false.pure[PureConc[E, *]] + else isFinalizing.ifM( + finalizationOutcome, + canceled0.tryRead[PureConc[E, *]].flatMap { + case Some(signal) => realizeCancelationWithSignal(ctx, signal) + case None => false.pure[PureConc[E, *]] + }) - private[pure] def realizeSelfCancelationWith(ctx: FiberCtx[E]): PureConc[E, Boolean] = - realizeCancelationIf(ctx) { - case CancelationSignal.Self(_) => true - case _ => false + private[this] def cancelationBoundaryFinalizers(ctx: FiberCtx[E]): List[PureConc[E, Unit]] = + ctx.selfCancelationBoundary match { + case Some(boundary) => ctx.finalizers.take((ctx.finalizers.length - boundary) max 0) + case None => Nil } - private[pure] def realizeCancelationWith(ctx: FiberCtx[E]): PureConc[E, Boolean] = - realizeCancelationIf(ctx)((_: CancelationSignal[E]) => true) - private[pure] def awaitCancelationWith(ctx: FiberCtx[E]): PureConc[E, Boolean] = { def blocked = MVar.empty[PureConc[E, *], Unit].flatMap(_.read[PureConc[E, *]]).as(false) @@ -875,25 +848,26 @@ object pure { if (ctx.finalizing) false.pure[PureConc[E, *]] else isFinalizing.ifM( ctx.self.currentMasks.map(_.isEmpty), - ctx.self.currentMasks.map(_.isEmpty).ifM( - whileFinalizing(ctx) { - val finalizers = - selfCancelationFinalizers(ctx) - val selfCancelation = CancelationSignal.Self(finalizers) - - canceled0.tryPut[PureConc[E, *]](selfCancelation).flatMap { - case true => notifyCancelationListeners *> finalizeWith(ctx, finalizers) - case false => - canceled0 - .tryRead[PureConc[E, *]] - .flatMap( - _.fold(finalizeWith(ctx, finalizers))(signal => - finalizeWith(ctx, cancelationFinalizers(signal, ctx)))) + ctx.self.currentMasks.flatMap { + case Nil => + whileFinalizing(ctx) { + canceled0 + .tryPut[PureConc[E, *]](CancelationSignal[E](Some(ctx.finalizers))) + .flatMap { + case true => notifyCancelationListeners *> finalizeWith(ctx, ctx.finalizers) + case false => finalizeWith(ctx, ctx.finalizers) + } } - }, - canceled0 - .tryPut[PureConc[E, *]](CancelationSignal.Self(Nil)) - .flatMap(inserted => if (inserted) notifyCancelationListeners.as(false) else false.pure[PureConc[E, *]]))) + + case _ => + requestCancelation(Some(Nil)).as(false) + }) + + private[this] def requestCancelation( + finalizers: Option[List[PureConc[E, Unit]]]): PureConc[E, Unit] = + canceled0 + .tryPut[PureConc[E, *]](CancelationSignal[E](finalizers)) + .flatMap(inserted => if (inserted) notifyCancelationListeners else ().pure[PureConc[E, *]]) val join: PureConc[E, Outcome[PureConc[E, *], E, A]] = if (canceled0 eq null) state.read @@ -910,10 +884,7 @@ object pure { state.tryRead.flatMap { case Some(_) => ().pure[PureConc[E, *]] case None => - canceled0.tryPut[PureConc[E, *]](CancelationSignal.External()).flatMap { - case true => notifyCancelationListeners - case false => ().pure[PureConc[E, *]] - } *> state.read.void + requestCancelation(None) *> state.read.void } } } diff --git a/laws/shared/src/test/scala/cats/effect/laws/PureConcSpec.scala b/laws/shared/src/test/scala/cats/effect/laws/PureConcSpec.scala index faedb75a67..af2627f0dc 100644 --- a/laws/shared/src/test/scala/cats/effect/laws/PureConcSpec.scala +++ b/laws/shared/src/test/scala/cats/effect/laws/PureConcSpec.scala @@ -318,6 +318,23 @@ class PureConcSpec pure.run(t) mustEqual Outcome.Succeeded(Some(1)) } + "run outer finalizers when a masked self-cancel is observed inside poll" in { + val t = for { + finalized <- F.ref(0) + fiber <- F.start { + F.onCancel( + F.uncancelable { poll => + poll(F.uncancelable(_ => F.canceled)) + }, + finalized.update(_ + 1)) + } + _ <- fiber.join + back <- finalized.get + } yield back + + pure.run(t) mustEqual Outcome.Succeeded(Some(1)) + } + "observe pending self-cancel before running a polled region" in { val t = for { finalized <- F.ref(0) From 3ae77bb0b4cecff30f31f85d6c8503743454a696 Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Mon, 22 Jun 2026 10:49:14 -0500 Subject: [PATCH 6/6] Scalafmt --- .../kernel/testkit/PureConcGenerators.scala | 5 +- .../cats/effect/kernel/testkit/TimeT.scala | 5 +- .../cats/effect/kernel/testkit/pure.scala | 212 +++++++++--------- .../scala/cats/effect/laws/PureConcSpec.scala | 44 ++-- 4 files changed, 139 insertions(+), 127 deletions(-) diff --git a/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/PureConcGenerators.scala b/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/PureConcGenerators.scala index 55ffa373d9..d0b92ffd1e 100644 --- a/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/PureConcGenerators.scala +++ b/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/PureConcGenerators.scala @@ -42,9 +42,8 @@ object PureConcGenerators { override def recursiveGen[B: Arbitrary: Cogen](deeper: GenK[PureConc[E, *]]) = super .recursiveGen[B](deeper) - .filterNot( - gen => gen._1 == "racePair" || gen._1 == "join" - ) // remove generators which reify nondeterminism and cannot be law-tested + .filterNot(gen => + gen._1 == "racePair" || gen._1 == "join") // remove generators which reify nondeterminism and cannot be law-tested } implicit def arbitraryPureConc[E: Arbitrary: Cogen, A: Arbitrary: Cogen] diff --git a/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/TimeT.scala b/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/TimeT.scala index e7e5820529..ce63a742d9 100644 --- a/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/TimeT.scala +++ b/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/TimeT.scala @@ -171,9 +171,8 @@ private[effect] object TimeT { } } - override def raceOutcome[A, B](fa: TimeT[F, A], fb: TimeT[F, B]): TimeT[ - F, - Either[Outcome[TimeT[F, *], E, A], Outcome[TimeT[F, *], E, B]]] = + override def raceOutcome[A, B](fa: TimeT[F, A], fb: TimeT[F, B]) + : TimeT[F, Either[Outcome[TimeT[F, *], E, A], Outcome[TimeT[F, *], E, B]]] = uncancelable { poll => poll(racePair(fa, fb)).flatMap { case Left((oc, f)) => f.cancel.as(Left(oc)) diff --git a/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/pure.scala b/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/pure.scala index ea82567f7a..e538126ae8 100644 --- a/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/pure.scala +++ b/kernel-testkit/shared/src/main/scala/cats/effect/kernel/testkit/pure.scala @@ -75,10 +75,7 @@ object pure { extends Product3[PureFiber[E, _], List[MaskId], List[PureConc[E, Unit]]] with Serializable { - def this( - self: PureFiber[E, _], - masks: List[MaskId], - finalizers: List[PureConc[E, Unit]]) = + def this(self: PureFiber[E, _], masks: List[MaskId], finalizers: List[PureConc[E, Unit]]) = this(self, masks, finalizers, None, false) def this(self: PureFiber[E, _], masks: List[MaskId]) = @@ -124,9 +121,9 @@ object pure { that match { case that: FiberCtx[_] => that.canEqual(this) && - self == that.self && - masks == that.masks && - finalizers == that.finalizers + self == that.self && + masks == that.masks && + finalizers == that.finalizers case _ => false } @@ -151,13 +148,11 @@ object pure { finalizers: List[PureConc[E, Unit]] = Nil): FiberCtx[E] = new FiberCtx(self, masks, finalizers, None, false) - def tupled[E] - : ((PureFiber[E, _], List[MaskId], List[PureConc[E, Unit]])) => FiberCtx[E] = { + def tupled[E]: ((PureFiber[E, _], List[MaskId], List[PureConc[E, Unit]])) => FiberCtx[E] = { case (self, masks, finalizers) => apply(self, masks, finalizers) } - def curried[E] - : PureFiber[E, _] => List[MaskId] => List[PureConc[E, Unit]] => FiberCtx[E] = + def curried[E]: PureFiber[E, _] => List[MaskId] => List[PureConc[E, Unit]] => FiberCtx[E] = self => masks => finalizers => apply(self, masks, finalizers) private[pure] def internal[E]( @@ -207,9 +202,8 @@ object pure { ctx .self .realizeCancelationWith(ctx) - .ifM( - ApplicativeThread[PureConc[E, *]].done[Unit], - ().pure[PureConc[E, *]]))) + .ifM(ApplicativeThread[PureConc[E, *]].done[Unit], ().pure[PureConc[E, *]])) + ) checker >> mvarLiftF(ThreadT.liftF(ka)) } @@ -280,9 +274,8 @@ object pure { ta.mapK(fk) } - identifiedCompletion.map(a => Succeeded[Id, E, A](a): IdOC[E, A]) handleError { - e => Errored(e) - } + identifiedCompletion.map(a => + Succeeded[Id, E, A](a): IdOC[E, A]) handleError { e => Errored(e) } } Kleisli.ask[ResolvedPC[E, *], MVar.Universe].map { u => @@ -377,9 +370,7 @@ object pure { def deferred[A]: PureConc[E, Deferred[PureConc[E, *], A]] = MVar.empty[PureConc[E, *], A].flatMap(mVar => Kleisli.pure(unsafeDeferred(mVar))) - private[this] def interruptible[A]( - ctx: FiberCtx[E], - fa: PureConc[E, A]): PureConc[E, A] = + private[this] def interruptible[A](ctx: FiberCtx[E], fa: PureConc[E, A]): PureConc[E, A] = ctx.self.interruptible(ctx)(fa) private def unsafeRef[A](mVar: MVar[A]): Ref[PureConc[E, *], A] = @@ -438,9 +429,7 @@ object pure { private def unsafeDeferred[A](mVar: MVar[A]): Deferred[PureConc[E, *], A] = new Deferred[PureConc[E, *], A] { override def get: PureConc[E, A] = - withCtx { ctx => - interruptible(ctx, mVar.read[PureConc[E, *]]) - } + withCtx { ctx => interruptible(ctx, mVar.read[PureConc[E, *]]) } override def complete(a: A): PureConc[E, Boolean] = mVar.tryPut[PureConc[E, *]](a) @@ -452,24 +441,25 @@ object pure { MVar.empty[PureConc[E, *], Outcome[PureConc[E, *], E, A]].flatMap { state => MVar.empty[PureConc[E, *], CancelationSignal[E]] flatMap { canceled => MVar[PureConc[E, *], List[MaskFrame]](Nil) flatMap { masks => - MVar[PureConc[E, *], List[CancelationListener[E]]](Nil) flatMap { cancelationListeners => - MVar[PureConc[E, *], Boolean](false) flatMap { finalizing => - MVar[PureConc[E, *], Int](0) flatMap { activePolls => - val fiber = - new PureFiber[E, A]( - state, - canceled, - masks, - cancelationListeners, - finalizing, - activePolls) - - // the tryPut here is interesting: it encodes first-wins semantics on cancelation/completion - val body = guaranteeCase(fa)(state.tryPut[PureConc[E, *]](_).void) - val identified = localCtx(FiberCtx(fiber), body) - Thread.start(identified.attempt.void).as(fiber) + MVar[PureConc[E, *], List[CancelationListener[E]]](Nil) flatMap { + cancelationListeners => + MVar[PureConc[E, *], Boolean](false) flatMap { finalizing => + MVar[PureConc[E, *], Int](0) flatMap { activePolls => + val fiber = + new PureFiber[E, A]( + state, + canceled, + masks, + cancelationListeners, + finalizing, + activePolls) + + // the tryPut here is interesting: it encodes first-wins semantics on cancelation/completion + val body = guaranteeCase(fa)(state.tryPut[PureConc[E, *]](_).void) + val identified = localCtx(FiberCtx(fiber), body) + Thread.start(identified.attempt.void).as(fiber) + } } - } } } } @@ -483,23 +473,30 @@ object pure { (Fiber[PureConc[E, *], E, A], Outcome[PureConc[E, *], E, B])]] = uncancelable { poll => for { - result <- deferred[Either[Outcome[PureConc[E, *], E, A], Outcome[PureConc[E, *], E, B]]] + result <- deferred[ + Either[Outcome[PureConc[E, *], E, A], Outcome[PureConc[E, *], E, B]]] fibA <- start(fa) fibB <- start(fb) _ <- start( - fibA.join.flatMap(oc => - result.complete( - Left(oc): Either[ - Outcome[PureConc[E, *], E, A], - Outcome[PureConc[E, *], E, B]]).void)) + fibA + .join + .flatMap(oc => + result + .complete(Left(oc): Either[ + Outcome[PureConc[E, *], E, A], + Outcome[PureConc[E, *], E, B]]) + .void)) _ <- start( - fibB.join.flatMap(oc => - result.complete( - Right(oc): Either[ - Outcome[PureConc[E, *], E, A], - Outcome[PureConc[E, *], E, B]]).void)) + fibB + .join + .flatMap(oc => + result + .complete(Right(oc): Either[ + Outcome[PureConc[E, *], E, A], + Outcome[PureConc[E, *], E, B]]) + .void)) back <- onCancel( poll(result.get), @@ -552,7 +549,9 @@ object pure { val pollCtx = update match { case MaskUpdate.Removed => callCtx.withSelfCancelationBoundary( - callCtx.selfCancelationBoundary.orElse(Some(selfCancelationBoundary))) + callCtx + .selfCancelationBoundary + .orElse(Some(selfCancelationBoundary))) case MaskUpdate.Shadowed | MaskUpdate.Absent => callCtx @@ -578,7 +577,9 @@ object pure { Thread.done, restoreF *> result.pure[PureConc[E, *]].rethrow) }), - restoreF)) + restoreF + ) + ) } else fa } @@ -708,17 +709,14 @@ object pure { if (cancelationListeners eq null) ().pure[PureConc[E, *]] else cancelationListeners.read[PureConc[E, *]].flatMap { listeners => - cancelationListeners - .swap[PureConc[E, *]](listeners.filterNot(_.id === id)) - .void + cancelationListeners.swap[PureConc[E, *]](listeners.filterNot(_.id === id)).void } private[this] def notifyCancelationListeners: PureConc[E, Unit] = if (cancelationListeners eq null) ().pure[PureConc[E, *]] else cancelationListeners.swap[PureConc[E, *]](Nil).flatMap(_.traverse_(_.action)) - private[pure] def interruptible[B](ctx: FiberCtx[E])( - fb: PureConc[E, B]): PureConc[E, B] = { + private[pure] def interruptible[B](ctx: FiberCtx[E])(fb: PureConc[E, B]): PureConc[E, B] = { val Thread = ApplicativeThread[PureConc[E, *]] ctx.self.currentMasks.flatMap { @@ -734,7 +732,8 @@ object pure { signal.tryRead[PureConc[E, *]].flatMap { case Some(_) => ().pure[PureConc[E, *]] case None => - ctx.self + ctx + .self .realizeCancelationWith(ctx) .ifM(notifyCancelation, ().pure[PureConc[E, *]]) } @@ -779,7 +778,8 @@ object pure { case Outcome.Canceled() => true case _ => false } - } <* setFinalizing(false))) + } <* setFinalizing(false)) + ) private[this] def whileFinalizing[B](ctx: FiberCtx[E])(fb: PureConc[E, B]): PureConc[E, B] = localCtx(ctx.copy(finalizers = Nil).withFinalizing(true), setFinalizing(true) *> fb) @@ -804,21 +804,29 @@ object pure { ctx: FiberCtx[E], signal: CancelationSignal[E]): PureConc[E, Boolean] = if (ctx.finalizing) false.pure[PureConc[E, *]] - else isFinalizing.ifM( - finalizationOutcome, - ctx.self.currentMasks.map(_.isEmpty).ifM( - whileFinalizing(ctx)(finalizeWith(ctx, cancelationFinalizers(signal, ctx))), - false.pure[PureConc[E, *]] - )) + else + isFinalizing.ifM( + finalizationOutcome, + ctx + .self + .currentMasks + .map(_.isEmpty) + .ifM( + whileFinalizing(ctx)(finalizeWith(ctx, cancelationFinalizers(signal, ctx))), + false.pure[PureConc[E, *]] + ) + ) private[pure] def realizeCancelationWith(ctx: FiberCtx[E]): PureConc[E, Boolean] = if (ctx.finalizing) false.pure[PureConc[E, *]] - else isFinalizing.ifM( - finalizationOutcome, - canceled0.tryRead[PureConc[E, *]].flatMap { - case Some(signal) => realizeCancelationWithSignal(ctx, signal) - case None => false.pure[PureConc[E, *]] - }) + else + isFinalizing.ifM( + finalizationOutcome, + canceled0.tryRead[PureConc[E, *]].flatMap { + case Some(signal) => realizeCancelationWithSignal(ctx, signal) + case None => false.pure[PureConc[E, *]] + } + ) private[this] def cancelationBoundaryFinalizers(ctx: FiberCtx[E]): List[PureConc[E, Unit]] = ctx.selfCancelationBoundary match { @@ -831,50 +839,54 @@ object pure { MVar.empty[PureConc[E, *], Unit].flatMap(_.read[PureConc[E, *]]).as(false) if (ctx.finalizing) blocked - else ctx.self.currentMasks.flatMap { - case Nil => - isFinalizing.ifM( - canceled0.tryRead[PureConc[E, *]].map(_.isEmpty), - canceled0.read[PureConc[E, *]].flatMap(realizeCancelationWithSignal(ctx, _))) + else + ctx.self.currentMasks.flatMap { + case Nil => + isFinalizing.ifM( + canceled0.tryRead[PureConc[E, *]].map(_.isEmpty), + canceled0.read[PureConc[E, *]].flatMap(realizeCancelationWithSignal(ctx, _))) - case _ => - isFinalizing.ifM( - canceled0.tryRead[PureConc[E, *]].map(_.isEmpty), - blocked) - } + case _ => + isFinalizing.ifM(canceled0.tryRead[PureConc[E, *]].map(_.isEmpty), blocked) + } } private[pure] def cancelAndRealizeWith(ctx: FiberCtx[E]): PureConc[E, Boolean] = if (ctx.finalizing) false.pure[PureConc[E, *]] - else isFinalizing.ifM( - ctx.self.currentMasks.map(_.isEmpty), - ctx.self.currentMasks.flatMap { - case Nil => - whileFinalizing(ctx) { - canceled0 - .tryPut[PureConc[E, *]](CancelationSignal[E](Some(ctx.finalizers))) - .flatMap { - case true => notifyCancelationListeners *> finalizeWith(ctx, ctx.finalizers) - case false => finalizeWith(ctx, ctx.finalizers) + else + isFinalizing.ifM( + ctx.self.currentMasks.map(_.isEmpty), + ctx + .self + .currentMasks + .flatMap { + case Nil => + whileFinalizing(ctx) { + canceled0 + .tryPut[PureConc[E, *]](CancelationSignal[E](Some(ctx.finalizers))) + .flatMap { + case true => + notifyCancelationListeners *> finalizeWith(ctx, ctx.finalizers) + case false => finalizeWith(ctx, ctx.finalizers) + } } - } - case _ => - requestCancelation(Some(Nil)).as(false) - }) + case _ => + requestCancelation(Some(Nil)).as(false) + } + ) private[this] def requestCancelation( finalizers: Option[List[PureConc[E, Unit]]]): PureConc[E, Unit] = canceled0 .tryPut[PureConc[E, *]](CancelationSignal[E](finalizers)) - .flatMap(inserted => if (inserted) notifyCancelationListeners else ().pure[PureConc[E, *]]) + .flatMap(inserted => + if (inserted) notifyCancelationListeners else ().pure[PureConc[E, *]]) val join: PureConc[E, Outcome[PureConc[E, *], E, A]] = if (canceled0 eq null) state.read else { - withCtx { ctx => - ctx.self.interruptible(ctx)(state.read) - } + withCtx { ctx => ctx.self.interruptible(ctx)(state.read) } } val cancel: PureConc[E, Unit] = diff --git a/laws/shared/src/test/scala/cats/effect/laws/PureConcSpec.scala b/laws/shared/src/test/scala/cats/effect/laws/PureConcSpec.scala index af2627f0dc..c9c6da67db 100644 --- a/laws/shared/src/test/scala/cats/effect/laws/PureConcSpec.scala +++ b/laws/shared/src/test/scala/cats/effect/laws/PureConcSpec.scala @@ -31,8 +31,8 @@ import scala.concurrent.duration._ private[laws] trait PureConcSpecLowPriorityTimeTInstances { implicit def orderTimeTPureConcFiniteDuration( - implicit FA: Order[PureConc[Int, FiniteDuration]]): Order[ - TimeT[PureConc[Int, *], FiniteDuration]] = + implicit FA: Order[PureConc[Int, FiniteDuration]]) + : Order[TimeT[PureConc[Int, *], FiniteDuration]] = TimeT.orderTimeT } @@ -46,8 +46,8 @@ class PureConcSpec implicit def exec(fb: TimeT[PureConc[Int, *], Boolean]): Prop = Prop(pure.run(TimeT.run(fb)).fold(false, _ => false, _.getOrElse(false))) - implicit def eqTimeTPureConc[A](implicit FA: Eq[PureConc[Int, A]]): Eq[ - TimeT[PureConc[Int, *], A]] = + implicit def eqTimeTPureConc[A]( + implicit FA: Eq[PureConc[Int, A]]): Eq[TimeT[PureConc[Int, *], A]] = TimeT.eqTimeT "parallel utilities" should { @@ -290,9 +290,7 @@ class PureConcSpec val t = for { finalized <- F.ref(0) fiber <- F.start { - F.uncancelable { poll => - F.onCancel(poll(F.canceled), finalized.update(_ + 1)) - } + F.uncancelable { poll => F.onCancel(poll(F.canceled), finalized.update(_ + 1)) } } _ <- fiber.join back <- finalized.get @@ -305,11 +303,7 @@ class PureConcSpec val t = for { finalized <- F.ref(0) fiber <- F.start { - F.onCancel( - F.uncancelable { poll => - poll(F.canceled) - }, - finalized.update(_ + 1)) + F.onCancel(F.uncancelable { poll => poll(F.canceled) }, finalized.update(_ + 1)) } _ <- fiber.join back <- finalized.get @@ -323,9 +317,7 @@ class PureConcSpec finalized <- F.ref(0) fiber <- F.start { F.onCancel( - F.uncancelable { poll => - poll(F.uncancelable(_ => F.canceled)) - }, + F.uncancelable { poll => poll(F.uncancelable(_ => F.canceled)) }, finalized.update(_ + 1)) } _ <- fiber.join @@ -416,21 +408,31 @@ class PureConcSpec .Succeeded(Some(Left(1))) pure.run(TimeT.run(T.race(T.never[Unit], T.sleep(1.second).as(1)))) mustEqual Outcome .Succeeded(Some(Right(1))) - pure.run(TimeT.run(T.race(T.sleep(2.seconds).as("slow"), T.sleep(1.second).as("fast")))) mustEqual + pure.run( + TimeT.run( + T.race(T.sleep(2.seconds).as("slow"), T.sleep(1.second).as("fast")))) mustEqual Outcome.Succeeded(Some(Right("fast"))) - pure.run(TimeT.run(T.race(T.sleep(1.second).as("fast"), T.sleep(2.seconds).as("slow")))) mustEqual + pure.run( + TimeT.run( + T.race(T.sleep(1.second).as("fast"), T.sleep(2.seconds).as("slow")))) mustEqual Outcome.Succeeded(Some(Left("fast"))) pure.run(TimeT.run(T.race(T.canceled, T.never[Unit]).void)) mustEqual Outcome.Canceled() pure.run(TimeT.run(T.race(T.never[Unit], T.canceled).void)) mustEqual Outcome.Canceled() pure.run( - TimeT.run(T.race(TimeT.liftF(F.uncancelable(_ => F.canceled.as(1))), T.never[Unit]))) mustEqual + TimeT.run( + T.race(TimeT.liftF(F.uncancelable(_ => F.canceled.as(1))), T.never[Unit]))) mustEqual Outcome.Canceled() pure.run( - TimeT.run(T.race(T.never[Unit], TimeT.liftF(F.uncancelable(_ => F.canceled.as(1)))))) mustEqual + TimeT.run( + T.race(T.never[Unit], TimeT.liftF(F.uncancelable(_ => F.canceled.as(1)))))) mustEqual Outcome.Canceled() - pure.run(TimeT.run(T.race(TimeT.liftF(F.start(F.unit).flatMap(_.join).as(1)), T.never[Unit]))) mustEqual + pure.run( + TimeT.run( + T.race(TimeT.liftF(F.start(F.unit).flatMap(_.join).as(1)), T.never[Unit]))) mustEqual Outcome.Succeeded(Some(Left(1))) - pure.run(TimeT.run(T.race(T.never[Unit], TimeT.liftF(F.start(F.unit).flatMap(_.join).as(1))))) mustEqual + pure.run( + TimeT.run( + T.race(T.never[Unit], TimeT.liftF(F.start(F.unit).flatMap(_.join).as(1))))) mustEqual Outcome.Succeeded(Some(Right(1))) }