Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions kernel/shared/src/main/scala/cats/effect/kernel/Resource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.{Duration, FiniteDuration}

import java.util.concurrent.TimeoutException

/**
* `Resource` is a data structure which encodes the idea of executing an action which has an
* associated finalizer that needs to be run when the action completes.
Expand Down Expand Up @@ -1496,6 +1498,28 @@ private[effect] trait ResourceTemporal[F[_]]

def sleep(time: FiniteDuration): Resource[F, Unit] =
Resource.sleep(time)

// Overridden because the GenTemporal default is implemented in terms of
// racePair, whose Resource instance (via start) does not guarantee the
// source's finalizers have run by the time use returns (#4489, regressed by
// #4059). Resource#race has the correct finalizer semantics (#3226), so we
// express the timeout in terms of it instead.
override protected def timeoutTo[A](
fa: Resource[F, A],
duration: FiniteDuration,
fallback: Resource[F, A]): Resource[F, A] =
fa.race(Resource.sleep[F](duration)).flatMap {
case Left(a) => Resource.pure[F, A](a)
case Right(()) => fallback
}

override protected def timeout[A](fa: Resource[F, A], duration: FiniteDuration)(
implicit ev: TimeoutException <:< Throwable): Resource[F, A] =
fa.race(Resource.sleep[F](duration)).flatMap {
case Left(a) => Resource.pure[F, A](a)
case Right(()) =>
Resource.eval(F.raiseError[A](new TimeoutException(duration.toString())))
}
}

abstract private[effect] class ResourceAsync[F[_]]
Expand Down
58 changes: 58 additions & 0 deletions tests/shared/src/test/scala/cats/effect/ResourceSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1209,6 +1209,64 @@ class ResourceSuite extends BaseScalaCheckSuite with DisciplineSuite {
}
}

real("ensure a timed out resource runs its onCancel".ignore) {
IO.ref(true) flatMap { fired =>
Resource
.eval(IO.sleep(100.millis).onCancel(fired.set(false)).timeout(100.millis))
.use_
.attempt
.flatMap {
case Left(_) => IO.unit
case Right(_) =>
fired.get.ifM(IO.raiseError(new Exception("didn't run the cancelation")), IO.unit)
}
}
}

// github.com/typelevel/cats-effect/issues/4489
// doesn't show up with the ticker variant
real("run finalisers when winning a timeout race (real)") {
IO.ref(false).flatMap { ref =>
val res = Resource.make(ref.set(true))(_ => ref.set(false))
val timedRes = res.timeout(1.hour)
val check = timedRes.use_ *>
ref.get.ifM(IO.raiseError(new Exception("not released")), IO.unit)
check.replicateA_(10000)
}
}

ticked("run finalisers when winning a timeout race (ticked)") { implicit ticker =>
val go = IO.ref(false).flatMap { ref =>
val res = Resource.make(ref.set(true))(_ => ref.set(false))
val timedRes = res.timeout(1.hour)
timedRes.use_ *>
ref.get.ifM(IO.raiseError(new Exception("not released")), IO.unit)
}
assertCompleteAs(go, ())
}

ticked("run finalisers when losing a timeout race") { implicit ticker =>
val go = IO.ref(true).flatMap { ref =>
val res = Resource.make(IO.sleep(100.millis))(_ => ref.set(false))
val timedRes = res.timeout(1.milli)
timedRes.use_.attempt *> IO.sleep(150.millis) *>
ref.get.ifM(IO.raiseError(new Exception("not released")), IO.unit)
}
assertCompleteAs(go, ())
}

// TODO this test is failing, indicating a timing bug with Resource
ticked("run nested finalisers when succeeding at the same time as the timeout".ignore) {
implicit ticker =>
val go = IO.ref(false).flatMap { ref =>
val inner = Resource.make(ref.set(true))(_ => ref.set(false))
val res = Resource.make(IO.sleep(99.millis).flatMap(_ => inner.use_))(_ => IO.unit)
res.timeout(100.millis).use_.attempt *> IO.sleep(150.millis) *>
ref.get.ifM(IO.unit, IO.raiseError(new Exception("inner finaliser not run")))
}
assertCompleteAs(go, ())
}

ticked("attempt - releases resource on error") { implicit ticker =>
assertCompleteAs(
IO.ref(0)
Expand Down
Loading