Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ object PureConcGenerators {
override def recursiveGen[B: Arbitrary: Cogen](deeper: GenK[PureConc[E, *]]) =
super
.recursiveGen[B](deeper)
.filterNot(
_._1 == "racePair"
) // remove the racePair generator since it reifies nondeterminism, which cannot be law-tested
.filterNot(gen =>
gen._1 == "racePair" || gen._1 == "join") // remove generators which reify nondeterminism and cannot be law-tested
}

implicit def arbitraryPureConc[E: Arbitrary: Cogen, A: Arbitrary: Cogen]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package cats.effect
package kernel
package testkit

import cats.{~>, Group, Monad, Monoid, Order}
import cats.{~>, Eq, Group, Monad, Monoid, Order}
import cats.data.Kleisli
import cats.syntax.all._

Expand Down Expand Up @@ -90,6 +90,9 @@ private[effect] object TimeT {
a.map(_.inverse())
}

implicit def eqTimeT[F[_], A](implicit FA: Eq[F[A]]): Eq[TimeT[F, A]] =
Eq.by(TimeT.run)

implicit def orderTimeT[F[_], A](implicit FA: Order[F[A]]): Order[TimeT[F, A]] =
Order.by(TimeT.run)

Expand All @@ -111,15 +114,69 @@ private[effect] object TimeT {
val forkA = time.fork()
val forkB = time.fork()

// TODO this doesn't work (yet) because we need to force the "faster" effect to win the race, which right now isn't happening
F.racePair(fa.run(forkA), fb.run(forkB)).map {
def liftOutcome[C](oc: Outcome[F, E, C]): Outcome[TimeT[F, *], E, C] =
oc.mapK(TimeT.liftK[F])

F.racePair(fa.run(forkA), fb.run(forkB)).flatMap {
case Left((oca, delegate)) =>
time.now = forkA.now
Left((oca.mapK(TimeT.liftK[F]), fiberize(forkB, delegate)))
F.onCancel(F.race(delegate.join, F.cede), delegate.cancel).map {
case Left(ocb) if forkB.now < forkA.now =>
time.now = forkB.now
Right((completedFiber(forkA, liftOutcome(oca)), liftOutcome(ocb)))

case _ =>
time.now = forkA.now
Left((liftOutcome(oca), fiberize(forkB, delegate)))
}

case Right((delegate, ocb)) =>
time.now = forkB.now
Right((fiberize(forkA, delegate), ocb.mapK(TimeT.liftK[F])))
F.onCancel(F.race(delegate.join, F.cede), delegate.cancel).map {
case Left(oca) if forkA.now < forkB.now =>
time.now = forkA.now
Left((liftOutcome(oca), completedFiber(forkB, liftOutcome(ocb))))

case _ =>
time.now = forkB.now
Right((fiberize(forkA, delegate), liftOutcome(ocb)))
}
}
}

override def race[A, B](fa: TimeT[F, A], fb: TimeT[F, B]): TimeT[F, Either[A, B]] =
uncancelable { poll =>
poll(racePair(fa, fb)).flatMap {
case Left((oc, f)) =>
oc match {
case Outcome.Succeeded(fa) => f.cancel *> fa.map(Left(_))
case Outcome.Errored(ea) => f.cancel *> raiseError(ea)
case Outcome.Canceled() =>
f.cancel *> poll(f.join) flatMap {
case Outcome.Succeeded(fb) => fb.map(Right(_))
case Outcome.Errored(eb) => raiseError(eb)
case Outcome.Canceled() => poll(canceled) *> never
}
}

case Right((f, oc)) =>
oc match {
case Outcome.Succeeded(fb) => f.cancel *> fb.map(Right(_))
case Outcome.Errored(eb) => f.cancel *> raiseError(eb)
case Outcome.Canceled() =>
f.cancel *> poll(f.join) flatMap {
case Outcome.Succeeded(fa) => fa.map(Left(_))
case Outcome.Errored(ea) => raiseError(ea)
case Outcome.Canceled() => poll(canceled) *> never
}
}
}
}

override def raceOutcome[A, B](fa: TimeT[F, A], fb: TimeT[F, B])
: TimeT[F, Either[Outcome[TimeT[F, *], E, A], Outcome[TimeT[F, *], E, B]]] =
uncancelable { poll =>
poll(racePair(fa, fb)).flatMap {
case Left((oc, f)) => f.cancel.as(Left(oc))
case Right((f, oc)) => f.cancel.as(Right(oc))
}
}

Expand Down Expand Up @@ -156,5 +213,20 @@ private[effect] object TimeT {
}
}
}

private[this] def completedFiber[A](
forked: Time,
outcome: Outcome[TimeT[F, *], E, A]): Fiber[TimeT[F, *], E, A] =
new Fiber[TimeT[F, *], E, A] {

val cancel =
unit

val join =
Kleisli { outerTime =>
outerTime.now = outerTime.now.max(forked.now)
F.pure(outcome)
}
}
}
}
Loading
Loading