diff --git a/build.sbt b/build.sbt index a9a53ab154..bdb8f1a855 100644 --- a/build.sbt +++ b/build.sbt @@ -772,6 +772,14 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform) } else Seq() } ) + .nativeSettings( + mimaBinaryIssueFilters ++= Seq( + ProblemFilters.exclude[MissingClassProblem]( + "cats.effect.unsafe.PollingExecutorScheduler$SleepTask"), + ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.QueueExecutorScheduler"), + ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.QueueExecutorScheduler$") + ) + ) /** * Test support for the core project, providing various helpful instances like ScalaCheck diff --git a/core/native/src/main/scala/cats/effect/FileDescriptorPoller.scala b/core/native/src/main/scala/cats/effect/FileDescriptorPoller.scala new file mode 100644 index 0000000000..72604bbe66 --- /dev/null +++ b/core/native/src/main/scala/cats/effect/FileDescriptorPoller.scala @@ -0,0 +1,56 @@ +/* + * Copyright 2020-2022 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect + +trait FileDescriptorPoller { + + /** + * Registers a file descriptor with the poller and monitors read- and/or write-ready events. + */ + def registerFileDescriptor( + fileDescriptor: Int, + monitorReadReady: Boolean, + monitorWriteReady: Boolean + ): Resource[IO, FileDescriptorPollHandle] + +} + +trait FileDescriptorPollHandle { + + /** + * Recursively invokes `f` until it is no longer blocked. Typically `f` will call `read` or + * `recv` on the file descriptor. + * - If `f` fails because the file descriptor is blocked, then it should return `Left[A]`. + * Then `f` will be invoked again with `A` at a later point, when the file handle is ready + * for reading. + * - If `f` is successful, then it should return a `Right[B]`. The `IO` returned from this + * method will complete with `B`. + */ + def pollReadRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B] + + /** + * Recursively invokes `f` until it is no longer blocked. Typically `f` will call `write` or + * `send` on the file descriptor. + * - If `f` fails because the file descriptor is blocked, then it should return `Left[A]`. + * Then `f` will be invoked again with `A` at a later point, when the file handle is ready + * for writing. + * - If `f` is successful, then it should return a `Right[B]`. The `IO` returned from this + * method will complete with `B`. + */ + def pollWriteRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B] + +} diff --git a/core/native/src/main/scala/cats/effect/IOApp.scala b/core/native/src/main/scala/cats/effect/IOApp.scala index 182fc874b9..1fc67b36e6 100644 --- a/core/native/src/main/scala/cats/effect/IOApp.scala +++ b/core/native/src/main/scala/cats/effect/IOApp.scala @@ -20,6 +20,7 @@ import cats.effect.metrics.NativeCpuStarvationMetrics import scala.concurrent.CancellationException import scala.concurrent.duration._ +import scala.scalanative.meta.LinktimeInfo /** * The primary entry point to a Cats Effect application. Extend this trait rather than defining @@ -165,6 +166,21 @@ trait IOApp { */ protected def runtimeConfig: unsafe.IORuntimeConfig = unsafe.IORuntimeConfig() + /** + * The [[unsafe.PollingSystem]] used by the [[runtime]] which will evaluate the [[IO]] + * produced by `run`. It is very unlikely that users will need to override this method. + * + * [[unsafe.PollingSystem]] implementors may provide their own flavors of [[IOApp]] that + * override this method. + */ + protected def pollingSystem: unsafe.PollingSystem = + if (LinktimeInfo.isLinux) + unsafe.EpollSystem + else if (LinktimeInfo.isMac) + unsafe.KqueueSystem + else + unsafe.SleepSystem + /** * The entry point for your application. Will be called by the runtime when the process is * started. If the underlying runtime supports it, any arguments passed to the process will be @@ -186,12 +202,8 @@ trait IOApp { import unsafe.IORuntime val installed = IORuntime installGlobal { - IORuntime( - IORuntime.defaultComputeExecutionContext, - IORuntime.defaultComputeExecutionContext, - IORuntime.defaultScheduler, - () => (), - runtimeConfig) + val loop = IORuntime.createEventLoop(pollingSystem) + IORuntime(loop, loop, loop, () => (), runtimeConfig) } _runtime = IORuntime.global diff --git a/core/native/src/main/scala/cats/effect/IOCompanionPlatform.scala b/core/native/src/main/scala/cats/effect/IOCompanionPlatform.scala index 71e71c7003..497e5a818d 100644 --- a/core/native/src/main/scala/cats/effect/IOCompanionPlatform.scala +++ b/core/native/src/main/scala/cats/effect/IOCompanionPlatform.scala @@ -17,6 +17,9 @@ package cats.effect import cats.effect.std.Console +import cats.effect.unsafe.EventLoopExecutorScheduler + +import scala.reflect.ClassTag import java.time.Instant @@ -62,4 +65,11 @@ private[effect] abstract class IOCompanionPlatform { this: IO.type => */ def readLine: IO[String] = Console[IO].readLine + + def poller[Poller](implicit ct: ClassTag[Poller]): IO[Option[Poller]] = + IO.executionContext.map { + case loop: EventLoopExecutorScheduler if ct.runtimeClass.isInstance(loop.poller) => + Some(loop.poller.asInstanceOf[Poller]) + case _ => None + } } diff --git a/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala new file mode 100644 index 0000000000..bb8666391f --- /dev/null +++ b/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala @@ -0,0 +1,288 @@ +/* + * Copyright 2020-2022 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package unsafe + +import cats.effect.std.Semaphore +import cats.syntax.all._ + +import org.typelevel.scalaccompat.annotation._ + +import scala.annotation.tailrec +import scala.concurrent.ExecutionContext +import scala.scalanative.annotation.alwaysinline +import scala.scalanative.libc.errno._ +import scala.scalanative.posix.string._ +import scala.scalanative.posix.unistd +import scala.scalanative.runtime._ +import scala.scalanative.unsafe._ +import scala.scalanative.unsigned._ + +import java.io.IOException +import java.util.{Collections, IdentityHashMap, Set} + +object EpollSystem extends PollingSystem { + + import epoll._ + import epollImplicits._ + + private[this] final val MaxEvents = 64 + + def makePoller(ec: ExecutionContext, data: () => PollData): Poller = + new Poller(ec, data) + + def makePollData(): PollData = { + val fd = epoll_create1(0) + if (fd == -1) + throw new IOException(fromCString(strerror(errno))) + new PollData(fd) + } + + def closePollData(data: PollData): Unit = data.close() + + def poll(data: PollData, nanos: Long, reportFailure: Throwable => Unit): Boolean = + data.poll(nanos) + + final class Poller private[EpollSystem] (ec: ExecutionContext, data: () => PollData) + extends FileDescriptorPoller { + + def registerFileDescriptor( + fd: Int, + reads: Boolean, + writes: Boolean + ): Resource[IO, FileDescriptorPollHandle] = + Resource + .make { + (Semaphore[IO](1), Semaphore[IO](1)).flatMapN { (readSemaphore, writeSemaphore) => + IO { + val handle = new PollHandle(readSemaphore, writeSemaphore) + val unregister = data().register(fd, reads, writes, handle) + (handle, unregister) + }.evalOn(ec) + } + }(_._2) + .map(_._1) + + } + + private final class PollHandle( + readSemaphore: Semaphore[IO], + writeSemaphore: Semaphore[IO] + ) extends FileDescriptorPollHandle { + + private[this] var readReadyCounter = 0 + private[this] var readCallback: Either[Throwable, Int] => Unit = null + + private[this] var writeReadyCounter = 0 + private[this] var writeCallback: Either[Throwable, Int] => Unit = null + + def notify(events: Int): Unit = { + if ((events & EPOLLIN) != 0) { + val counter = readReadyCounter + 1 + readReadyCounter = counter + val cb = readCallback + readCallback = null + if (cb ne null) cb(Right(counter)) + } + if ((events & EPOLLOUT) != 0) { + val counter = writeReadyCounter + 1 + writeReadyCounter = counter + val cb = writeCallback + writeCallback = null + if (cb ne null) cb(Right(counter)) + } + } + + def pollReadRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B] = + readSemaphore.permit.surround { + def go(a: A, before: Int): IO[B] = + f(a).flatMap { + case Left(a) => + IO(readReadyCounter).flatMap { after => + if (before != after) + // there was a read-ready notification since we started, try again immediately + go(a, after) + else + IO.asyncCheckAttempt[Int] { cb => + IO { + readCallback = cb + // check again before we suspend + val now = readReadyCounter + if (now != before) { + readCallback = null + Right(now) + } else Left(Some(IO(this.readCallback = null))) + } + }.flatMap(go(a, _)) + } + case Right(b) => IO.pure(b) + } + + IO(readReadyCounter).flatMap(go(a, _)) + } + + def pollWriteRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B] = + writeSemaphore.permit.surround { + def go(a: A, before: Int): IO[B] = + f(a).flatMap { + case Left(a) => + IO(writeReadyCounter).flatMap { after => + if (before != after) + // there was a write-ready notification since we started, try again immediately + go(a, after) + else + IO.asyncCheckAttempt[Int] { cb => + IO { + writeCallback = cb + // check again before we suspend + val now = writeReadyCounter + if (now != before) { + writeCallback = null + Right(now) + } else Left(Some(IO(this.writeCallback = null))) + } + }.flatMap(go(a, _)) + } + case Right(b) => IO.pure(b) + } + + IO(writeReadyCounter).flatMap(go(a, _)) + } + + } + + final class PollData private[EpollSystem] (epfd: Int) { + + private[this] val handles: Set[PollHandle] = + Collections.newSetFromMap(new IdentityHashMap) + + private[EpollSystem] def close(): Unit = + if (unistd.close(epfd) != 0) + throw new IOException(fromCString(strerror(errno))) + + private[EpollSystem] def poll(timeout: Long): Boolean = { + val noHandles = handles.isEmpty() + + if (timeout <= 0 && noHandles) + false // nothing to do here + else { + val events = stackalloc[epoll_event](MaxEvents.toLong) + + @tailrec + def processEvents(timeout: Int): Unit = { + + val triggeredEvents = epoll_wait(epfd, events, MaxEvents, timeout) + + if (triggeredEvents >= 0) { + var i = 0 + while (i < triggeredEvents) { + val event = events + i.toLong + val handle = fromPtr(event.data) + handle.notify(event.events.toInt) + i += 1 + } + } else { + throw new IOException(fromCString(strerror(errno))) + } + + if (triggeredEvents >= MaxEvents) + processEvents(0) // drain the ready list + else + () + } + + val timeoutMillis = if (timeout == -1) -1 else (timeout / 1000000).toInt + processEvents(timeoutMillis) + + !handles.isEmpty() + } + } + + private[EpollSystem] def register( + fd: Int, + reads: Boolean, + writes: Boolean, + handle: PollHandle + ): IO[Unit] = { + val event = stackalloc[epoll_event]() + event.events = + (EPOLLET | (if (reads) EPOLLIN else 0) | (if (writes) EPOLLOUT else 0)).toUInt + event.data = toPtr(handle) + + if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, event) != 0) + throw new IOException(fromCString(strerror(errno))) + handles.add(handle) + + IO { + handles.remove(handle) + if (epoll_ctl(epfd, EPOLL_CTL_DEL, fd, null) != 0) + throw new IOException(fromCString(strerror(errno))) + } + } + + @alwaysinline private[this] def toPtr(handle: PollHandle): Ptr[Byte] = + fromRawPtr(Intrinsics.castObjectToRawPtr(handle)) + + @alwaysinline private[this] def fromPtr[A](ptr: Ptr[Byte]): PollHandle = + Intrinsics.castRawPtrToObject(toRawPtr(ptr)).asInstanceOf[PollHandle] + } + + @nowarn212 + @extern + private object epoll { + + final val EPOLL_CTL_ADD = 1 + final val EPOLL_CTL_DEL = 2 + final val EPOLL_CTL_MOD = 3 + + final val EPOLLIN = 0x001 + final val EPOLLOUT = 0x004 + final val EPOLLONESHOT = 1 << 30 + final val EPOLLET = 1 << 31 + + type epoll_event + type epoll_data_t = Ptr[Byte] + + def epoll_create1(flags: Int): Int = extern + + def epoll_ctl(epfd: Int, op: Int, fd: Int, event: Ptr[epoll_event]): Int = extern + + def epoll_wait(epfd: Int, events: Ptr[epoll_event], maxevents: Int, timeout: Int): Int = + extern + + } + + private object epollImplicits { + + implicit final class epoll_eventOps(epoll_event: Ptr[epoll_event]) { + def events: CUnsignedInt = !(epoll_event.asInstanceOf[Ptr[CUnsignedInt]]) + def events_=(events: CUnsignedInt): Unit = + !(epoll_event.asInstanceOf[Ptr[CUnsignedInt]]) = events + + def data: epoll_data_t = + !((epoll_event.asInstanceOf[Ptr[Byte]] + sizeof[CUnsignedInt]) + .asInstanceOf[Ptr[epoll_data_t]]) + def data_=(data: epoll_data_t): Unit = + !((epoll_event.asInstanceOf[Ptr[Byte]] + sizeof[CUnsignedInt]) + .asInstanceOf[Ptr[epoll_data_t]]) = data + } + + implicit val epoll_eventTag: Tag[epoll_event] = + Tag.materializeCArrayTag[Byte, Nat.Digit2[Nat._1, Nat._2]].asInstanceOf[Tag[epoll_event]] + + } +} diff --git a/core/native/src/main/scala/cats/effect/unsafe/EventLoopExecutorScheduler.scala b/core/native/src/main/scala/cats/effect/unsafe/EventLoopExecutorScheduler.scala new file mode 100644 index 0000000000..165fde120e --- /dev/null +++ b/core/native/src/main/scala/cats/effect/unsafe/EventLoopExecutorScheduler.scala @@ -0,0 +1,158 @@ +/* + * Copyright 2020-2022 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package unsafe + +import scala.concurrent.{ExecutionContext, ExecutionContextExecutor} +import scala.concurrent.duration._ +import scala.scalanative.libc.errno +import scala.scalanative.meta.LinktimeInfo +import scala.scalanative.unsafe._ +import scala.util.control.NonFatal + +import java.util.{ArrayDeque, PriorityQueue} + +private[effect] final class EventLoopExecutorScheduler(pollEvery: Int, system: PollingSystem) + extends ExecutionContextExecutor + with Scheduler { + + private[this] val pollData = system.makePollData() + + val poller: Any = system.makePoller(this, () => pollData) + + private[this] var needsReschedule: Boolean = true + + private[this] val executeQueue: ArrayDeque[Runnable] = new ArrayDeque + private[this] val sleepQueue: PriorityQueue[SleepTask] = new PriorityQueue + + private[this] val noop: Runnable = () => () + + private[this] def scheduleIfNeeded(): Unit = if (needsReschedule) { + ExecutionContext.global.execute(() => loop()) + needsReschedule = false + } + + final def execute(runnable: Runnable): Unit = { + scheduleIfNeeded() + executeQueue.addLast(runnable) + } + + final def sleep(delay: FiniteDuration, task: Runnable): Runnable = + if (delay <= Duration.Zero) { + execute(task) + noop + } else { + scheduleIfNeeded() + val now = monotonicNanos() + val sleepTask = new SleepTask(now + delay.toNanos, task) + sleepQueue.offer(sleepTask) + sleepTask + } + + def reportFailure(t: Throwable): Unit = t.printStackTrace() + + def nowMillis() = System.currentTimeMillis() + + override def nowMicros(): Long = + if (LinktimeInfo.isFreeBSD || LinktimeInfo.isLinux || LinktimeInfo.isMac) { + import scala.scalanative.posix.time._ + import scala.scalanative.posix.timeOps._ + val ts = stackalloc[timespec]() + if (clock_gettime(CLOCK_REALTIME, ts) != 0) + throw new RuntimeException(s"clock_gettime: ${errno.errno}") + ts.tv_sec * 1000000 + ts.tv_nsec / 1000 + } else { + super.nowMicros() + } + + def monotonicNanos() = System.nanoTime() + + private[this] def loop(): Unit = { + needsReschedule = false + + var continue = true + + while (continue) { + // execute the timers + val now = monotonicNanos() + while (!sleepQueue.isEmpty() && sleepQueue.peek().at <= now) { + val task = sleepQueue.poll() + try task.runnable.run() + catch { + case t if NonFatal(t) => reportFailure(t) + case t: Throwable => IOFiber.onFatalFailure(t) + } + } + + // do up to pollEvery tasks + var i = 0 + while (i < pollEvery && !executeQueue.isEmpty()) { + val runnable = executeQueue.poll() + try runnable.run() + catch { + case t if NonFatal(t) => reportFailure(t) + case t: Throwable => IOFiber.onFatalFailure(t) + } + i += 1 + } + + // finally we poll + val timeout = + if (!executeQueue.isEmpty()) + 0 + else if (!sleepQueue.isEmpty()) + Math.max(sleepQueue.peek().at - monotonicNanos(), 0) + else + -1 + + val needsPoll = system.poll(pollData, timeout, reportFailure) + + continue = needsPoll || !executeQueue.isEmpty() || !sleepQueue.isEmpty() + } + + needsReschedule = true + } + + private[this] final class SleepTask( + val at: Long, + val runnable: Runnable + ) extends Runnable + with Comparable[SleepTask] { + + def run(): Unit = { + sleepQueue.remove(this) + () + } + + def compareTo(that: SleepTask): Int = + java.lang.Long.compare(this.at, that.at) + } + +} + +private object EventLoopExecutorScheduler { + lazy val global = { + val system = + if (LinktimeInfo.isLinux) + EpollSystem + else if (LinktimeInfo.isMac) + KqueueSystem + else + SleepSystem + new EventLoopExecutorScheduler(64, system) + } +} diff --git a/core/native/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala b/core/native/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala index 42c0d19b1c..78c1594cfe 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala @@ -20,9 +20,12 @@ import scala.concurrent.ExecutionContext private[unsafe] abstract class IORuntimeCompanionPlatform { this: IORuntime.type => - def defaultComputeExecutionContext: ExecutionContext = QueueExecutorScheduler + def defaultComputeExecutionContext: ExecutionContext = EventLoopExecutorScheduler.global - def defaultScheduler: Scheduler = QueueExecutorScheduler + def defaultScheduler: Scheduler = EventLoopExecutorScheduler.global + + def createEventLoop(system: PollingSystem): ExecutionContext with Scheduler = + new EventLoopExecutorScheduler(64, system) private[this] var _global: IORuntime = null diff --git a/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala new file mode 100644 index 0000000000..d4ec798b79 --- /dev/null +++ b/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala @@ -0,0 +1,287 @@ +/* + * Copyright 2020-2022 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package unsafe + +import cats.effect.std.Semaphore +import cats.syntax.all._ + +import org.typelevel.scalaccompat.annotation._ + +import scala.annotation.tailrec +import scala.concurrent.ExecutionContext +import scala.scalanative.libc.errno._ +import scala.scalanative.posix.string._ +import scala.scalanative.posix.time._ +import scala.scalanative.posix.timeOps._ +import scala.scalanative.posix.unistd +import scala.scalanative.unsafe._ +import scala.scalanative.unsigned._ + +import java.io.IOException +import java.util.HashMap + +object KqueueSystem extends PollingSystem { + + import event._ + import eventImplicits._ + + private final val MaxEvents = 64 + + def makePoller(ec: ExecutionContext, data: () => PollData): Poller = + new Poller(ec, data) + + def makePollData(): PollData = { + val fd = kqueue() + if (fd == -1) + throw new IOException(fromCString(strerror(errno))) + new PollData(fd) + } + + def closePollData(data: PollData): Unit = data.close() + + def poll(data: PollData, nanos: Long, reportFailure: Throwable => Unit): Boolean = + data.poll(nanos) + + final class Poller private[KqueueSystem] ( + ec: ExecutionContext, + data: () => PollData + ) extends FileDescriptorPoller { + def registerFileDescriptor( + fd: Int, + reads: Boolean, + writes: Boolean + ): Resource[IO, FileDescriptorPollHandle] = + Resource.eval { + (Semaphore[IO](1), Semaphore[IO](1)).mapN { + new PollHandle(ec, data, fd, _, _) + } + } + } + + private final class PollHandle( + ec: ExecutionContext, + data: () => PollData, + fd: Int, + readSemaphore: Semaphore[IO], + writeSemaphore: Semaphore[IO] + ) extends FileDescriptorPollHandle { + + private[this] val readEvent = KEvent(fd.toLong, EVFILT_READ) + private[this] val writeEvent = KEvent(fd.toLong, EVFILT_WRITE) + + def pollReadRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B] = + readSemaphore.permit.surround { + a.tailRecM { a => + f(a).flatTap { r => + if (r.isRight) + IO.unit + else + IO.async[Unit] { cb => + IO { + val kqueue = data() + kqueue.evSet(readEvent, EV_ADD.toUShort, cb) + Some(IO(kqueue.removeCallback(readEvent))) + }.evalOn(ec) + } + } + } + } + + def pollWriteRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B] = + writeSemaphore.permit.surround { + a.tailRecM { a => + f(a).flatTap { r => + if (r.isRight) + IO.unit + else + IO.async[Unit] { cb => + IO { + val kqueue = data() + kqueue.evSet(writeEvent, EV_ADD.toUShort, cb) + Some(IO(kqueue.removeCallback(writeEvent))) + }.evalOn(ec) + } + } + } + } + + } + + private final case class KEvent(ident: Long, filter: Short) + + final class PollData private[KqueueSystem] (kqfd: Int) { + + private[this] val changelistArray = new Array[Byte](sizeof[kevent64_s].toInt * MaxEvents) + private[this] val changelist = changelistArray.at(0).asInstanceOf[Ptr[kevent64_s]] + private[this] var changeCount = 0 + + private[this] val callbacks = new HashMap[KEvent, Either[Throwable, Unit] => Unit]() + + private[KqueueSystem] def evSet( + event: KEvent, + flags: CUnsignedShort, + cb: Either[Throwable, Unit] => Unit + ): Unit = { + val change = changelist + changeCount.toLong + + change.ident = event.ident.toULong + change.filter = event.filter + change.flags = (flags.toInt | EV_ONESHOT).toUShort + + callbacks.put(event, cb) + + changeCount += 1 + } + + private[KqueueSystem] def removeCallback(event: KEvent): Unit = { + callbacks.remove(event) + () + } + + private[KqueueSystem] def close(): Unit = + if (unistd.close(kqfd) != 0) + throw new IOException(fromCString(strerror(errno))) + + private[KqueueSystem] def poll(timeout: Long): Boolean = { + val noCallbacks = callbacks.isEmpty + + if (timeout <= 0 && noCallbacks && changeCount == 0) + false // nothing to do here + else { + + val eventlist = stackalloc[kevent64_s](MaxEvents.toLong) + + @tailrec + def processEvents(timeout: Ptr[timespec], changeCount: Int, flags: Int): Unit = { + + val triggeredEvents = + kevent64( + kqfd, + changelist, + changeCount, + eventlist, + MaxEvents, + flags.toUInt, + timeout + ) + + if (triggeredEvents >= 0) { + var i = 0 + var event = eventlist + while (i < triggeredEvents) { + val cb = callbacks.remove(KEvent(event.ident.toLong, event.filter)) + + if (cb ne null) + cb( + if ((event.flags.toLong & EV_ERROR) != 0) + Left(new IOException(fromCString(strerror(event.data.toInt)))) + else Either.unit + ) + + i += 1 + event += 1 + } + } else { + throw new IOException(fromCString(strerror(errno))) + } + + if (triggeredEvents >= MaxEvents) + processEvents(null, 0, KEVENT_FLAG_NONE) // drain the ready list + else + () + } + + val timeoutSpec = + if (timeout <= 0) null + else { + val ts = stackalloc[timespec]() + ts.tv_sec = timeout / 1000000000 + ts.tv_nsec = timeout % 1000000000 + ts + } + + val flags = if (timeout == 0) KEVENT_FLAG_IMMEDIATE else KEVENT_FLAG_NONE + + processEvents(timeoutSpec, changeCount, flags) + changeCount = 0 + + !callbacks.isEmpty() + } + } + + } + + @nowarn212 + @extern + private object event { + // Derived from https://opensource.apple.com/source/xnu/xnu-7195.81.3/bsd/sys/event.h.auto.html + + final val EVFILT_READ = -1 + final val EVFILT_WRITE = -2 + + final val KEVENT_FLAG_NONE = 0x000000 + final val KEVENT_FLAG_IMMEDIATE = 0x000001 + + final val EV_ADD = 0x0001 + final val EV_DELETE = 0x0002 + final val EV_ONESHOT = 0x0010 + final val EV_CLEAR = 0x0020 + final val EV_ERROR = 0x4000 + + type kevent64_s + + def kqueue(): CInt = extern + + def kevent64( + kq: CInt, + changelist: Ptr[kevent64_s], + nchanges: CInt, + eventlist: Ptr[kevent64_s], + nevents: CInt, + flags: CUnsignedInt, + timeout: Ptr[timespec] + ): CInt = extern + + } + + private object eventImplicits { + + implicit final class kevent64_sOps(kevent64_s: Ptr[kevent64_s]) { + def ident: CUnsignedLongInt = !(kevent64_s.asInstanceOf[Ptr[CUnsignedLongInt]]) + def ident_=(ident: CUnsignedLongInt): Unit = + !(kevent64_s.asInstanceOf[Ptr[CUnsignedLongInt]]) = ident + + def filter: CShort = !(kevent64_s.asInstanceOf[Ptr[CShort]] + 4) + def filter_=(filter: CShort): Unit = + !(kevent64_s.asInstanceOf[Ptr[CShort]] + 4) = filter + + def flags: CUnsignedShort = !(kevent64_s.asInstanceOf[Ptr[CUnsignedShort]] + 5) + def flags_=(flags: CUnsignedShort): Unit = + !(kevent64_s.asInstanceOf[Ptr[CUnsignedShort]] + 5) = flags + + def data: CLong = !(kevent64_s.asInstanceOf[Ptr[CLong]] + 2) + + def udata: Ptr[Byte] = !(kevent64_s.asInstanceOf[Ptr[Ptr[Byte]]] + 3) + def udata_=(udata: Ptr[Byte]): Unit = + !(kevent64_s.asInstanceOf[Ptr[Ptr[Byte]]] + 3) = udata + } + + implicit val kevent64_sTag: Tag[kevent64_s] = + Tag.materializeCArrayTag[Byte, Nat.Digit2[Nat._4, Nat._8]].asInstanceOf[Tag[kevent64_s]] + } +} diff --git a/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala b/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala index d111ce950c..ddaa239a30 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala @@ -19,63 +19,38 @@ package unsafe import scala.concurrent.{ExecutionContext, ExecutionContextExecutor} import scala.concurrent.duration._ -import scala.scalanative.libc.errno -import scala.scalanative.meta.LinktimeInfo -import scala.scalanative.unsafe._ -import scala.util.control.NonFatal - -import java.util.{ArrayDeque, PriorityQueue} +@deprecated("Use default runtime with a custom PollingSystem", "3.5.0") abstract class PollingExecutorScheduler(pollEvery: Int) extends ExecutionContextExecutor - with Scheduler { - - private[this] var needsReschedule: Boolean = true - - private[this] val executeQueue: ArrayDeque[Runnable] = new ArrayDeque - private[this] val sleepQueue: PriorityQueue[SleepTask] = new PriorityQueue - - private[this] val noop: Runnable = () => () - - private[this] def scheduleIfNeeded(): Unit = if (needsReschedule) { - ExecutionContext.global.execute(() => loop()) - needsReschedule = false - } + with Scheduler { outer => + + private[this] val loop = new EventLoopExecutorScheduler( + pollEvery, + new PollingSystem { + type Poller = outer.type + type PollData = outer.type + def makePoller(ec: ExecutionContext, data: () => PollData): Poller = outer + def makePollData(): PollData = outer + def closePollData(data: PollData): Unit = () + def poll(data: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean = + if (nanos == -1) data.poll(Duration.Inf) else data.poll(nanos.nanos) + } + ) - final def execute(runnable: Runnable): Unit = { - scheduleIfNeeded() - executeQueue.addLast(runnable) - } + final def execute(runnable: Runnable): Unit = + loop.execute(runnable) final def sleep(delay: FiniteDuration, task: Runnable): Runnable = - if (delay <= Duration.Zero) { - execute(task) - noop - } else { - scheduleIfNeeded() - val now = monotonicNanos() - val sleepTask = new SleepTask(now + delay.toNanos, task) - sleepQueue.offer(sleepTask) - sleepTask - } + loop.sleep(delay, task) - def reportFailure(t: Throwable): Unit = t.printStackTrace() + def reportFailure(t: Throwable): Unit = loop.reportFailure(t) - def nowMillis() = System.currentTimeMillis() + def nowMillis() = loop.nowMillis() - override def nowMicros(): Long = - if (LinktimeInfo.isFreeBSD || LinktimeInfo.isLinux || LinktimeInfo.isMac) { - import scala.scalanative.posix.time._ - import scala.scalanative.posix.timeOps._ - val ts = stackalloc[timespec]() - if (clock_gettime(CLOCK_REALTIME, ts) != 0) - throw new RuntimeException(s"clock_gettime: ${errno.errno}") - ts.tv_sec * 1000000 + ts.tv_nsec / 1000 - } else { - super.nowMicros() - } + override def nowMicros(): Long = loop.nowMicros() - def monotonicNanos() = System.nanoTime() + def monotonicNanos() = loop.monotonicNanos() /** * @param timeout @@ -90,65 +65,4 @@ abstract class PollingExecutorScheduler(pollEvery: Int) */ protected def poll(timeout: Duration): Boolean - private[this] def loop(): Unit = { - needsReschedule = false - - var continue = true - - while (continue) { - // execute the timers - val now = monotonicNanos() - while (!sleepQueue.isEmpty() && sleepQueue.peek().at <= now) { - val task = sleepQueue.poll() - try task.runnable.run() - catch { - case t if NonFatal(t) => reportFailure(t) - case t: Throwable => IOFiber.onFatalFailure(t) - } - } - - // do up to pollEvery tasks - var i = 0 - while (i < pollEvery && !executeQueue.isEmpty()) { - val runnable = executeQueue.poll() - try runnable.run() - catch { - case t if NonFatal(t) => reportFailure(t) - case t: Throwable => IOFiber.onFatalFailure(t) - } - i += 1 - } - - // finally we poll - val timeout = - if (!executeQueue.isEmpty()) - Duration.Zero - else if (!sleepQueue.isEmpty()) - Math.max(sleepQueue.peek().at - monotonicNanos(), 0).nanos - else - Duration.Inf - - val needsPoll = poll(timeout) - - continue = needsPoll || !executeQueue.isEmpty() || !sleepQueue.isEmpty() - } - - needsReschedule = true - } - - private[this] final class SleepTask( - val at: Long, - val runnable: Runnable - ) extends Runnable - with Comparable[SleepTask] { - - def run(): Unit = { - sleepQueue.remove(this) - () - } - - def compareTo(that: SleepTask): Int = - java.lang.Long.compare(this.at, that.at) - } - } diff --git a/core/native/src/main/scala/cats/effect/unsafe/PollingSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/PollingSystem.scala new file mode 100644 index 0000000000..a4f0a88248 --- /dev/null +++ b/core/native/src/main/scala/cats/effect/unsafe/PollingSystem.scala @@ -0,0 +1,53 @@ +/* + * Copyright 2020-2022 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package unsafe + +import scala.concurrent.ExecutionContext + +abstract class PollingSystem { + + /** + * The user-facing Poller interface. + */ + type Poller + + /** + * The thread-local data structure used for polling. + */ + type PollData + + def makePoller(ec: ExecutionContext, data: () => PollData): Poller + + def makePollData(): PollData + + def closePollData(data: PollData): Unit + + /** + * @param nanos + * the maximum duration for which to block, where `nanos == -1` indicates to block + * indefinitely. ''However'', if `nanos == -1` and there are no remaining events to poll + * for, this method should return `false` immediately. This is unfortunate but necessary so + * that the `EventLoop` can yield to the Scala Native global `ExecutionContext` which is + * currently hard-coded into every test framework, including MUnit, specs2, and Weaver. + * + * @return + * whether poll should be called again (i.e., there are more events to be polled) + */ + def poll(data: PollData, nanos: Long, reportFailure: Throwable => Unit): Boolean + +} diff --git a/core/native/src/main/scala/cats/effect/unsafe/SchedulerCompanionPlatform.scala b/core/native/src/main/scala/cats/effect/unsafe/SchedulerCompanionPlatform.scala index f6e4964808..e6fe2d258c 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/SchedulerCompanionPlatform.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/SchedulerCompanionPlatform.scala @@ -18,6 +18,7 @@ package cats.effect.unsafe private[unsafe] abstract class SchedulerCompanionPlatform { this: Scheduler.type => - def createDefaultScheduler(): (Scheduler, () => Unit) = (QueueExecutorScheduler, () => ()) + def createDefaultScheduler(): (Scheduler, () => Unit) = + (EventLoopExecutorScheduler.global, () => ()) } diff --git a/core/native/src/main/scala/cats/effect/unsafe/QueueExecutorScheduler.scala b/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala similarity index 57% rename from core/native/src/main/scala/cats/effect/unsafe/QueueExecutorScheduler.scala rename to core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala index c53036b5dc..ce4b85cc4b 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/QueueExecutorScheduler.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala @@ -14,18 +14,25 @@ * limitations under the License. */ -package cats.effect.unsafe +package cats.effect +package unsafe -import scala.concurrent.duration._ +import scala.concurrent.ExecutionContext -// JVM WSTP sets ExternalQueueTicks = 64 so we steal it here -private[effect] object QueueExecutorScheduler extends PollingExecutorScheduler(64) { +object SleepSystem extends PollingSystem { - def poll(timeout: Duration): Boolean = { - if (timeout != Duration.Zero && timeout.isFinite) { - val nanos = timeout.toNanos + final class Poller private[SleepSystem] () + final class PollData private[SleepSystem] () + + def makePoller(ec: ExecutionContext, data: () => PollData): Poller = new Poller + + def makePollData(): PollData = new PollData + + def closePollData(data: PollData): Unit = () + + def poll(data: PollData, nanos: Long, reportFailure: Throwable => Unit): Boolean = { + if (nanos > 0) Thread.sleep(nanos / 1000000, (nanos % 1000000).toInt) - } false } diff --git a/tests/native/src/test/scala/cats/effect/FileDescriptorPollerSpec.scala b/tests/native/src/test/scala/cats/effect/FileDescriptorPollerSpec.scala new file mode 100644 index 0000000000..a321c3b5c7 --- /dev/null +++ b/tests/native/src/test/scala/cats/effect/FileDescriptorPollerSpec.scala @@ -0,0 +1,139 @@ +/* + * Copyright 2020-2022 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect + +import cats.effect.std.CountDownLatch +import cats.syntax.all._ + +import scala.concurrent.duration._ +import scala.scalanative.libc.errno._ +import scala.scalanative.posix.errno._ +import scala.scalanative.posix.fcntl._ +import scala.scalanative.posix.string._ +import scala.scalanative.posix.unistd +import scala.scalanative.unsafe._ +import scala.scalanative.unsigned._ + +import java.io.IOException + +class FileDescriptorPollerSpec extends BaseSpec { + + final class Pipe( + val readFd: Int, + val writeFd: Int, + val readHandle: FileDescriptorPollHandle, + val writeHandle: FileDescriptorPollHandle + ) { + def read(buf: Array[Byte], offset: Int, length: Int): IO[Unit] = + readHandle + .pollReadRec(()) { _ => IO(guard(unistd.read(readFd, buf.at(offset), length.toULong))) } + .void + + def write(buf: Array[Byte], offset: Int, length: Int): IO[Unit] = + writeHandle + .pollWriteRec(()) { _ => + IO(guard(unistd.write(writeFd, buf.at(offset), length.toULong))) + } + .void + + private def guard(thunk: => CInt): Either[Unit, CInt] = { + val rtn = thunk + if (rtn < 0) { + val en = errno + if (en == EAGAIN || en == EWOULDBLOCK) + Left(()) + else + throw new IOException(fromCString(strerror(errno))) + } else + Right(rtn) + } + } + + def mkPipe: Resource[IO, Pipe] = + Resource + .make { + IO { + val fd = stackalloc[CInt](2) + if (unistd.pipe(fd) != 0) + throw new IOException(fromCString(strerror(errno))) + (fd(0), fd(1)) + } + } { + case (readFd, writeFd) => + IO { + unistd.close(readFd) + unistd.close(writeFd) + () + } + } + .evalTap { + case (readFd, writeFd) => + IO { + if (fcntl(readFd, F_SETFL, O_NONBLOCK) != 0) + throw new IOException(fromCString(strerror(errno))) + if (fcntl(writeFd, F_SETFL, O_NONBLOCK) != 0) + throw new IOException(fromCString(strerror(errno))) + } + } + .flatMap { + case (readFd, writeFd) => + Resource.eval(IO.poller[FileDescriptorPoller].map(_.get)).flatMap { poller => + ( + poller.registerFileDescriptor(readFd, true, false), + poller.registerFileDescriptor(writeFd, false, true) + ).mapN(new Pipe(readFd, writeFd, _, _)) + } + } + + "FileDescriptorPoller" should { + + "notify read-ready events" in real { + mkPipe.use { pipe => + for { + buf <- IO(new Array[Byte](4)) + _ <- pipe.write(Array[Byte](1, 2, 3), 0, 3).background.surround(pipe.read(buf, 0, 3)) + _ <- pipe.write(Array[Byte](42), 0, 1).background.surround(pipe.read(buf, 3, 1)) + } yield buf.toList must be_==(List[Byte](1, 2, 3, 42)) + } + } + + "handle lots of simultaneous events" in real { + mkPipe.replicateA(1000).use { pipes => + CountDownLatch[IO](1000).flatMap { latch => + pipes + .traverse_ { pipe => + (pipe.read(new Array[Byte](1), 0, 1) *> latch.release).background + } + .surround { + IO { // trigger all the pipes at once + pipes.foreach { pipe => + unistd.write(pipe.writeFd, Array[Byte](42).at(0), 1.toULong) + } + }.background.surround(latch.await.as(true)) + } + } + } + } + + "hang if never ready" in real { + mkPipe.use { pipe => + pipe.read(new Array[Byte](1), 0, 1).as(false).timeoutTo(1.second, IO.pure(true)) + } + } + } + +} diff --git a/tests/native/src/test/scala/cats/effect/unsafe/SchedulerSpec.scala b/tests/native/src/test/scala/cats/effect/unsafe/SchedulerSpec.scala index c55d919868..9031819c19 100644 --- a/tests/native/src/test/scala/cats/effect/unsafe/SchedulerSpec.scala +++ b/tests/native/src/test/scala/cats/effect/unsafe/SchedulerSpec.scala @@ -17,6 +17,8 @@ package cats.effect package unsafe +import scala.concurrent.duration._ + class SchedulerSpec extends BaseSpec { "Default scheduler" should { @@ -27,12 +29,18 @@ class SchedulerSpec extends BaseSpec { deltas = times.map(_ - start) } yield deltas.exists(_.toMicros % 1000 != 0) } + "correctly calculate real time" in real { IO.realTime.product(IO(System.currentTimeMillis())).map { case (realTime, currentTime) => (realTime.toMillis - currentTime) should be_<=(1L) } } + + "sleep for correct duration" in real { + val duration = 1500.millis + IO.sleep(duration).timed.map(_._1 should be_>=(duration)) + } } }