From 6d23310373e7750296967110b661abee65f10c2f Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 11 Dec 2022 05:42:20 +0000 Subject: [PATCH 01/31] Extract `PollingSystem` abstraction on Native --- build.sbt | 8 + .../cats/effect/IOCompanionPlatform.scala | 10 ++ .../scala/cats/effect/unsafe/EventLoop.scala | 26 +++ .../unsafe/EventLoopExecutorScheduler.scala | 150 ++++++++++++++++++ .../unsafe/IORuntimeCompanionPlatform.scala | 4 +- .../unsafe/PollingExecutorScheduler.scala | 130 +++------------ .../cats/effect/unsafe/PollingSystem.scala | 41 +++++ .../unsafe/SchedulerCompanionPlatform.scala | 3 +- ...cutorScheduler.scala => SleepSystem.scala} | 18 ++- 9 files changed, 270 insertions(+), 120 deletions(-) create mode 100644 core/native/src/main/scala/cats/effect/unsafe/EventLoop.scala create mode 100644 core/native/src/main/scala/cats/effect/unsafe/EventLoopExecutorScheduler.scala create mode 100644 core/native/src/main/scala/cats/effect/unsafe/PollingSystem.scala rename core/native/src/main/scala/cats/effect/unsafe/{QueueExecutorScheduler.scala => SleepSystem.scala} (66%) diff --git a/build.sbt b/build.sbt index a9a53ab154..bdb8f1a855 100644 --- a/build.sbt +++ b/build.sbt @@ -772,6 +772,14 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform) } else Seq() } ) + .nativeSettings( + mimaBinaryIssueFilters ++= Seq( + ProblemFilters.exclude[MissingClassProblem]( + "cats.effect.unsafe.PollingExecutorScheduler$SleepTask"), + ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.QueueExecutorScheduler"), + ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.QueueExecutorScheduler$") + ) + ) /** * Test support for the core project, providing various helpful instances like ScalaCheck diff --git a/core/native/src/main/scala/cats/effect/IOCompanionPlatform.scala b/core/native/src/main/scala/cats/effect/IOCompanionPlatform.scala index 71e71c7003..fb4e2a1875 100644 --- a/core/native/src/main/scala/cats/effect/IOCompanionPlatform.scala +++ b/core/native/src/main/scala/cats/effect/IOCompanionPlatform.scala @@ -17,6 +17,9 @@ package cats.effect import cats.effect.std.Console +import cats.effect.unsafe.EventLoop + +import scala.reflect.ClassTag import java.time.Instant @@ -62,4 +65,11 @@ private[effect] abstract class IOCompanionPlatform { this: IO.type => */ def readLine: IO[String] = Console[IO].readLine + + def eventLoop[Poller](implicit ct: ClassTag[Poller]): IO[Option[EventLoop[Poller]]] = + IO.executionContext.map { + case loop: EventLoop[_] if ct.runtimeClass.isInstance(loop.poller()) => + Some(loop.asInstanceOf[EventLoop[Poller]]) + case _ => None + } } diff --git a/core/native/src/main/scala/cats/effect/unsafe/EventLoop.scala b/core/native/src/main/scala/cats/effect/unsafe/EventLoop.scala new file mode 100644 index 0000000000..181c74ea07 --- /dev/null +++ b/core/native/src/main/scala/cats/effect/unsafe/EventLoop.scala @@ -0,0 +1,26 @@ +/* + * Copyright 2020-2022 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package unsafe + +import scala.concurrent.ExecutionContext + +trait EventLoop[Poller] extends ExecutionContext { + + def poller(): Poller + +} diff --git a/core/native/src/main/scala/cats/effect/unsafe/EventLoopExecutorScheduler.scala b/core/native/src/main/scala/cats/effect/unsafe/EventLoopExecutorScheduler.scala new file mode 100644 index 0000000000..acf22401e4 --- /dev/null +++ b/core/native/src/main/scala/cats/effect/unsafe/EventLoopExecutorScheduler.scala @@ -0,0 +1,150 @@ +/* + * Copyright 2020-2022 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package unsafe + +import scala.concurrent.{ExecutionContext, ExecutionContextExecutor} +import scala.concurrent.duration._ +import scala.scalanative.libc.errno +import scala.scalanative.meta.LinktimeInfo +import scala.scalanative.unsafe._ +import scala.util.control.NonFatal + +import java.util.{ArrayDeque, PriorityQueue} + +private final class EventLoopExecutorScheduler(pollEvery: Int, system: PollingSystem) + extends EventLoop[Any] + with ExecutionContextExecutor + with Scheduler { + + private[this] val _poller = system.makePoller() + + private[this] var needsReschedule: Boolean = true + + private[this] val executeQueue: ArrayDeque[Runnable] = new ArrayDeque + private[this] val sleepQueue: PriorityQueue[SleepTask] = new PriorityQueue + + private[this] val noop: Runnable = () => () + + private[this] def scheduleIfNeeded(): Unit = if (needsReschedule) { + ExecutionContext.global.execute(() => loop()) + needsReschedule = false + } + + final def execute(runnable: Runnable): Unit = { + scheduleIfNeeded() + executeQueue.addLast(runnable) + } + + final def sleep(delay: FiniteDuration, task: Runnable): Runnable = + if (delay <= Duration.Zero) { + execute(task) + noop + } else { + scheduleIfNeeded() + val now = monotonicNanos() + val sleepTask = new SleepTask(now + delay.toNanos, task) + sleepQueue.offer(sleepTask) + sleepTask + } + + def reportFailure(t: Throwable): Unit = t.printStackTrace() + + def nowMillis() = System.currentTimeMillis() + + override def nowMicros(): Long = + if (LinktimeInfo.isFreeBSD || LinktimeInfo.isLinux || LinktimeInfo.isMac) { + import scala.scalanative.posix.time._ + import scala.scalanative.posix.timeOps._ + val ts = stackalloc[timespec]() + if (clock_gettime(CLOCK_REALTIME, ts) != 0) + throw new RuntimeException(s"clock_gettime: ${errno.errno}") + ts.tv_sec * 1000000 + ts.tv_nsec / 1000 + } else { + super.nowMicros() + } + + def monotonicNanos() = System.nanoTime() + + def poller(): Any = _poller + + private[this] def loop(): Unit = { + needsReschedule = false + + var continue = true + + while (continue) { + // execute the timers + val now = monotonicNanos() + while (!sleepQueue.isEmpty() && sleepQueue.peek().at <= now) { + val task = sleepQueue.poll() + try task.runnable.run() + catch { + case t if NonFatal(t) => reportFailure(t) + case t: Throwable => IOFiber.onFatalFailure(t) + } + } + + // do up to pollEvery tasks + var i = 0 + while (i < pollEvery && !executeQueue.isEmpty()) { + val runnable = executeQueue.poll() + try runnable.run() + catch { + case t if NonFatal(t) => reportFailure(t) + case t: Throwable => IOFiber.onFatalFailure(t) + } + i += 1 + } + + // finally we poll + val timeout = + if (!executeQueue.isEmpty()) + 0 + else if (!sleepQueue.isEmpty()) + Math.max(sleepQueue.peek().at - monotonicNanos(), 0) + else + -1 + + val needsPoll = system.poll(_poller, timeout) + + continue = needsPoll || !executeQueue.isEmpty() || !sleepQueue.isEmpty() + } + + needsReschedule = true + } + + private[this] final class SleepTask( + val at: Long, + val runnable: Runnable + ) extends Runnable + with Comparable[SleepTask] { + + def run(): Unit = { + sleepQueue.remove(this) + () + } + + def compareTo(that: SleepTask): Int = + java.lang.Long.compare(this.at, that.at) + } + +} + +private object EventLoopExecutorScheduler { + lazy val global = new EventLoopExecutorScheduler(64, SleepSystem) +} diff --git a/core/native/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala b/core/native/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala index 42c0d19b1c..99c4d303a0 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala @@ -20,9 +20,9 @@ import scala.concurrent.ExecutionContext private[unsafe] abstract class IORuntimeCompanionPlatform { this: IORuntime.type => - def defaultComputeExecutionContext: ExecutionContext = QueueExecutorScheduler + def defaultComputeExecutionContext: ExecutionContext = EventLoopExecutorScheduler.global - def defaultScheduler: Scheduler = QueueExecutorScheduler + def defaultScheduler: Scheduler = EventLoopExecutorScheduler.global private[this] var _global: IORuntime = null diff --git a/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala b/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala index d111ce950c..d8db322fa9 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala @@ -17,65 +17,38 @@ package cats.effect package unsafe -import scala.concurrent.{ExecutionContext, ExecutionContextExecutor} +import scala.concurrent.ExecutionContextExecutor import scala.concurrent.duration._ -import scala.scalanative.libc.errno -import scala.scalanative.meta.LinktimeInfo -import scala.scalanative.unsafe._ -import scala.util.control.NonFatal - -import java.util.{ArrayDeque, PriorityQueue} +@deprecated("Use default runtime with a custom PollingSystem", "3.5.0") abstract class PollingExecutorScheduler(pollEvery: Int) extends ExecutionContextExecutor - with Scheduler { - - private[this] var needsReschedule: Boolean = true - - private[this] val executeQueue: ArrayDeque[Runnable] = new ArrayDeque - private[this] val sleepQueue: PriorityQueue[SleepTask] = new PriorityQueue - - private[this] val noop: Runnable = () => () - - private[this] def scheduleIfNeeded(): Unit = if (needsReschedule) { - ExecutionContext.global.execute(() => loop()) - needsReschedule = false - } + with Scheduler { outer => + + private[this] val loop = new EventLoopExecutorScheduler( + pollEvery, + new PollingSystem { + type Poller = outer.type + def makePoller(): Poller = outer + def close(poller: Poller): Unit = () + def poll(poller: Poller, nanos: Long): Boolean = + if (nanos == -1) outer.poll(Duration.Inf) else outer.poll(nanos.nanos) + } + ) - final def execute(runnable: Runnable): Unit = { - scheduleIfNeeded() - executeQueue.addLast(runnable) - } + final def execute(runnable: Runnable): Unit = + loop.execute(runnable) final def sleep(delay: FiniteDuration, task: Runnable): Runnable = - if (delay <= Duration.Zero) { - execute(task) - noop - } else { - scheduleIfNeeded() - val now = monotonicNanos() - val sleepTask = new SleepTask(now + delay.toNanos, task) - sleepQueue.offer(sleepTask) - sleepTask - } + loop.sleep(delay, task) - def reportFailure(t: Throwable): Unit = t.printStackTrace() + def reportFailure(t: Throwable): Unit = loop.reportFailure(t) - def nowMillis() = System.currentTimeMillis() + def nowMillis() = loop.nowMillis() - override def nowMicros(): Long = - if (LinktimeInfo.isFreeBSD || LinktimeInfo.isLinux || LinktimeInfo.isMac) { - import scala.scalanative.posix.time._ - import scala.scalanative.posix.timeOps._ - val ts = stackalloc[timespec]() - if (clock_gettime(CLOCK_REALTIME, ts) != 0) - throw new RuntimeException(s"clock_gettime: ${errno.errno}") - ts.tv_sec * 1000000 + ts.tv_nsec / 1000 - } else { - super.nowMicros() - } + override def nowMicros(): Long = loop.nowMicros() - def monotonicNanos() = System.nanoTime() + def monotonicNanos() = loop.monotonicNanos() /** * @param timeout @@ -90,65 +63,4 @@ abstract class PollingExecutorScheduler(pollEvery: Int) */ protected def poll(timeout: Duration): Boolean - private[this] def loop(): Unit = { - needsReschedule = false - - var continue = true - - while (continue) { - // execute the timers - val now = monotonicNanos() - while (!sleepQueue.isEmpty() && sleepQueue.peek().at <= now) { - val task = sleepQueue.poll() - try task.runnable.run() - catch { - case t if NonFatal(t) => reportFailure(t) - case t: Throwable => IOFiber.onFatalFailure(t) - } - } - - // do up to pollEvery tasks - var i = 0 - while (i < pollEvery && !executeQueue.isEmpty()) { - val runnable = executeQueue.poll() - try runnable.run() - catch { - case t if NonFatal(t) => reportFailure(t) - case t: Throwable => IOFiber.onFatalFailure(t) - } - i += 1 - } - - // finally we poll - val timeout = - if (!executeQueue.isEmpty()) - Duration.Zero - else if (!sleepQueue.isEmpty()) - Math.max(sleepQueue.peek().at - monotonicNanos(), 0).nanos - else - Duration.Inf - - val needsPoll = poll(timeout) - - continue = needsPoll || !executeQueue.isEmpty() || !sleepQueue.isEmpty() - } - - needsReschedule = true - } - - private[this] final class SleepTask( - val at: Long, - val runnable: Runnable - ) extends Runnable - with Comparable[SleepTask] { - - def run(): Unit = { - sleepQueue.remove(this) - () - } - - def compareTo(that: SleepTask): Int = - java.lang.Long.compare(this.at, that.at) - } - } diff --git a/core/native/src/main/scala/cats/effect/unsafe/PollingSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/PollingSystem.scala new file mode 100644 index 0000000000..e4a53b5eb6 --- /dev/null +++ b/core/native/src/main/scala/cats/effect/unsafe/PollingSystem.scala @@ -0,0 +1,41 @@ +/* + * Copyright 2020-2022 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package unsafe + +abstract class PollingSystem { + + type Poller + + def makePoller(): Poller + + def close(poller: Poller): Unit + + /** + * @param nanos + * the maximum duration for which to block, where `nanos == -1` indicates to block + * indefinitely. ''However'', if `timeout == -1` and there are no remaining events to poll + * for, this method should return `false` immediately. This is unfortunate but necessary so + * that the `EventLoop` can yield to the Scala Native global `ExecutionContext` which is + * currently hard-coded into every test framework, including JUnit, MUnit, and specs2. + * + * @return + * whether poll should be called again (i.e., there are more events to be polled) + */ + def poll(poller: Poller, nanos: Long): Boolean + +} diff --git a/core/native/src/main/scala/cats/effect/unsafe/SchedulerCompanionPlatform.scala b/core/native/src/main/scala/cats/effect/unsafe/SchedulerCompanionPlatform.scala index f6e4964808..e6fe2d258c 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/SchedulerCompanionPlatform.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/SchedulerCompanionPlatform.scala @@ -18,6 +18,7 @@ package cats.effect.unsafe private[unsafe] abstract class SchedulerCompanionPlatform { this: Scheduler.type => - def createDefaultScheduler(): (Scheduler, () => Unit) = (QueueExecutorScheduler, () => ()) + def createDefaultScheduler(): (Scheduler, () => Unit) = + (EventLoopExecutorScheduler.global, () => ()) } diff --git a/core/native/src/main/scala/cats/effect/unsafe/QueueExecutorScheduler.scala b/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala similarity index 66% rename from core/native/src/main/scala/cats/effect/unsafe/QueueExecutorScheduler.scala rename to core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala index c53036b5dc..247321cae1 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/QueueExecutorScheduler.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala @@ -14,18 +14,20 @@ * limitations under the License. */ -package cats.effect.unsafe +package cats.effect +package unsafe -import scala.concurrent.duration._ +object SleepSystem extends PollingSystem { -// JVM WSTP sets ExternalQueueTicks = 64 so we steal it here -private[effect] object QueueExecutorScheduler extends PollingExecutorScheduler(64) { + type Poller = Unit - def poll(timeout: Duration): Boolean = { - if (timeout != Duration.Zero && timeout.isFinite) { - val nanos = timeout.toNanos + def makePoller(): Poller = () + + def close(poller: Poller): Unit = () + + def poll(poller: Poller, nanos: Long): Boolean = { + if (nanos > 0) Thread.sleep(nanos / 1000000, (nanos % 1000000).toInt) - } false } From 727bfe891c7c50986413c2ba5fb453b97cc45b15 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 11 Dec 2022 05:55:18 +0000 Subject: [PATCH 02/31] Add `FileDescriptorPoller` abstraction --- .../effect/unsafe/FileDescriptorPoller.scala | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 core/native/src/main/scala/cats/effect/unsafe/FileDescriptorPoller.scala diff --git a/core/native/src/main/scala/cats/effect/unsafe/FileDescriptorPoller.scala b/core/native/src/main/scala/cats/effect/unsafe/FileDescriptorPoller.scala new file mode 100644 index 0000000000..4765116954 --- /dev/null +++ b/core/native/src/main/scala/cats/effect/unsafe/FileDescriptorPoller.scala @@ -0,0 +1,52 @@ +/* + * Copyright 2020-2022 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package unsafe + +trait FileDescriptorPoller { + + /** + * Registers a callback to be notified of read- and write-ready events on a file descriptor. + * Produces a runnable which unregisters the file descriptor. + * + * 1. It is the responsibility of the caller to set the file descriptor to non-blocking + * mode. + * 1. It is the responsibility of the caller to unregister the file descriptor when they are + * done. + * 1. A file descriptor should be registered at most once. To modify a registration, you + * must unregister and re-register the file descriptor. + * 1. The callback may be invoked "spuriously" claiming that a file descriptor is read- or + * write-ready when in fact it is not. You should be prepared to handle this. + * 1. The callback will be invoked at least once when the file descriptor transitions from + * blocked to read- or write-ready. You may additionally receive zero or more reminders + * of its readiness. However, you should not rely on any further callbacks until after + * the file descriptor has become blocked again. + */ + def registerFileDescriptor( + fileDescriptor: Int, + readReadyEvents: Boolean, + writeReadyEvents: Boolean)( + cb: FileDescriptorPoller.Callback + ): Runnable + +} + +object FileDescriptorPoller { + trait Callback { + def apply(readReady: Boolean, writeReady: Boolean): Unit + } +} From 9a8f7edfd5908f2f4e096f3d1093ee635d036e68 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 11 Dec 2022 08:22:16 +0000 Subject: [PATCH 03/31] Add `EpollSystem` and `KqueueSystem` Co-authored-by: Lee Tibbert --- .../cats/effect/unsafe/EpollSystem.scala | 159 ++++++++++++ .../scala/cats/effect/unsafe/EventLoop.scala | 2 +- .../unsafe/EventLoopExecutorScheduler.scala | 13 +- .../effect/unsafe/FileDescriptorPoller.scala | 14 +- .../cats/effect/unsafe/KqueueSystem.scala | 240 ++++++++++++++++++ .../unsafe/PollingExecutorScheduler.scala | 2 +- .../cats/effect/unsafe/PollingSystem.scala | 2 +- .../cats/effect/unsafe/SleepSystem.scala | 2 +- 8 files changed, 427 insertions(+), 7 deletions(-) create mode 100644 core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala create mode 100644 core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala diff --git a/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala new file mode 100644 index 0000000000..e3728a94df --- /dev/null +++ b/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala @@ -0,0 +1,159 @@ +/* + * Copyright 2020-2022 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package unsafe + +import scala.scalanative.libc.errno._ +import scala.scalanative.posix.string._ +import scala.scalanative.posix.unistd +import scala.scalanative.unsafe._ +import scala.scalanative.unsigned._ +import scala.util.control.NonFatal + +import java.io.IOException +import java.util.{Collections, IdentityHashMap, Set} + +import EpollSystem.epoll._ +import EpollSystem.epollImplicits._ + +final class EpollSystem private (maxEvents: Int) extends PollingSystem { + + def makePoller(): Poller = { + val fd = epoll_create1(0) + if (fd == -1) + throw new IOException(fromCString(strerror(errno))) + new Poller(fd, maxEvents) + } + + def close(poller: Poller): Unit = poller.close() + + def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean = + poller.poll(nanos, reportFailure) + + final class Poller private[EpollSystem] (private[EpollSystem] val epfd: Int, maxEvents: Int) + extends FileDescriptorPoller { + + private[this] val callbacks: Set[FileDescriptorPoller.Callback] = + Collections.newSetFromMap(new IdentityHashMap) + + private[EpollSystem] def close(): Unit = + if (unistd.close(epfd) != 0) + throw new IOException(fromCString(strerror(errno))) + + private[EpollSystem] def poll(timeout: Long, reportFailure: Throwable => Unit): Boolean = { + val noCallbacks = callbacks.isEmpty() + + if (timeout <= 0 && noCallbacks) + false // nothing to do here + else { + val timeoutMillis = if (timeout == -1) -1 else (timeout / 1000000).toInt + + val events = stackalloc[epoll_event](maxEvents.toUInt) + + val triggeredEvents = epoll_wait(epfd, events, maxEvents, timeoutMillis) + + if (triggeredEvents >= 0) { + var i = 0 + while (i < triggeredEvents) { + val event = events + i.toLong + val cb = FileDescriptorPoller.Callback.fromPtr(event.data) + try { + val e = event.events.toInt + val readReady = (e & EPOLLIN) != 0 + val writeReady = (e & EPOLLOUT) != 0 + cb.notifyFileDescriptorEvents(readReady, writeReady) + } catch { + case ex if NonFatal(ex) => reportFailure(ex) + } + i += 1 + } + } else { + throw new IOException(fromCString(strerror(errno))) + } + + !callbacks.isEmpty() + } + } + + def registerFileDescriptor(fd: Int, reads: Boolean, writes: Boolean)( + cb: FileDescriptorPoller.Callback): Runnable = { + val event = stackalloc[epoll_event]() + event.events = + (EPOLLET | (if (reads) EPOLLIN else 0) | (if (writes) EPOLLOUT else 0)).toUInt + event.data = FileDescriptorPoller.Callback.toPtr(cb) + + if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, event) != 0) + throw new IOException(fromCString(strerror(errno))) + callbacks.add(cb) + + () => { + callbacks.remove(cb) + if (epoll_ctl(epfd, EPOLL_CTL_DEL, fd, null) != 0) + throw new IOException(fromCString(strerror(errno))) + } + } + } + +} + +object EpollSystem { + def apply(maxEvents: Int): EpollSystem = new EpollSystem(maxEvents) + + @extern + private[unsafe] object epoll { + + final val EPOLL_CTL_ADD = 1 + final val EPOLL_CTL_DEL = 2 + final val EPOLL_CTL_MOD = 3 + + final val EPOLLIN = 0x001 + final val EPOLLOUT = 0x004 + final val EPOLLONESHOT = 1 << 30 + final val EPOLLET = 1 << 31 + + type epoll_event + type epoll_data_t = Ptr[Byte] + + def epoll_create1(flags: Int): Int = extern + + def epoll_ctl(epfd: Int, op: Int, fd: Int, event: Ptr[epoll_event]): Int = extern + + def epoll_wait(epfd: Int, events: Ptr[epoll_event], maxevents: Int, timeout: Int): Int = + extern + + } + + private[unsafe] object epollImplicits { + + implicit final class epoll_eventOps(epoll_event: Ptr[epoll_event]) { + def events: CUnsignedInt = !(epoll_event.asInstanceOf[Ptr[CUnsignedInt]]) + def events_=(events: CUnsignedInt): Unit = + !(epoll_event.asInstanceOf[Ptr[CUnsignedInt]]) = events + + def data: epoll_data_t = + !((epoll_event.asInstanceOf[Ptr[Byte]] + sizeof[CUnsignedInt]) + .asInstanceOf[Ptr[epoll_data_t]]) + def data_=(data: epoll_data_t): Unit = + !((epoll_event.asInstanceOf[Ptr[Byte]] + sizeof[CUnsignedInt]) + .asInstanceOf[Ptr[epoll_data_t]]) = data + } + + implicit val epoll_eventTag: Tag[epoll_event] = + Tag.materializeCArrayTag[Byte, Nat.Digit2[Nat._1, Nat._2]].asInstanceOf[Tag[epoll_event]] + + } +} diff --git a/core/native/src/main/scala/cats/effect/unsafe/EventLoop.scala b/core/native/src/main/scala/cats/effect/unsafe/EventLoop.scala index 181c74ea07..78cd33cee6 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/EventLoop.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/EventLoop.scala @@ -20,7 +20,7 @@ package unsafe import scala.concurrent.ExecutionContext trait EventLoop[Poller] extends ExecutionContext { - + def poller(): Poller } diff --git a/core/native/src/main/scala/cats/effect/unsafe/EventLoopExecutorScheduler.scala b/core/native/src/main/scala/cats/effect/unsafe/EventLoopExecutorScheduler.scala index acf22401e4..9b1b55b271 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/EventLoopExecutorScheduler.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/EventLoopExecutorScheduler.scala @@ -120,7 +120,7 @@ private final class EventLoopExecutorScheduler(pollEvery: Int, system: PollingSy else -1 - val needsPoll = system.poll(_poller, timeout) + val needsPoll = system.poll(_poller, timeout, reportFailure) continue = needsPoll || !executeQueue.isEmpty() || !sleepQueue.isEmpty() } @@ -146,5 +146,14 @@ private final class EventLoopExecutorScheduler(pollEvery: Int, system: PollingSy } private object EventLoopExecutorScheduler { - lazy val global = new EventLoopExecutorScheduler(64, SleepSystem) + lazy val global = { + val system = + if (LinktimeInfo.isLinux) + EpollSystem(64) + else if (LinktimeInfo.isMac) + KqueueSystem(64) + else + SleepSystem + new EventLoopExecutorScheduler(64, system) + } } diff --git a/core/native/src/main/scala/cats/effect/unsafe/FileDescriptorPoller.scala b/core/native/src/main/scala/cats/effect/unsafe/FileDescriptorPoller.scala index 4765116954..df8a0e3e00 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/FileDescriptorPoller.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/FileDescriptorPoller.scala @@ -17,6 +17,10 @@ package cats.effect package unsafe +import scala.scalanative.annotation.alwaysinline +import scala.scalanative.runtime._ +import scala.scalanative.unsafe._ + trait FileDescriptorPoller { /** @@ -47,6 +51,14 @@ trait FileDescriptorPoller { object FileDescriptorPoller { trait Callback { - def apply(readReady: Boolean, writeReady: Boolean): Unit + def notifyFileDescriptorEvents(readReady: Boolean, writeReady: Boolean): Unit + } + + object Callback { + @alwaysinline private[unsafe] def toPtr(cb: Callback): Ptr[Byte] = + fromRawPtr(Intrinsics.castObjectToRawPtr(cb)) + + @alwaysinline private[unsafe] def fromPtr[A](ptr: Ptr[Byte]): Callback = + Intrinsics.castRawPtrToObject(toRawPtr(ptr)).asInstanceOf[Callback] } } diff --git a/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala new file mode 100644 index 0000000000..4b40ff8bb7 --- /dev/null +++ b/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala @@ -0,0 +1,240 @@ +/* + * Copyright 2020-2022 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package unsafe + +import scala.collection.mutable.LongMap +import scala.scalanative.libc.errno._ +import scala.scalanative.posix.string._ +import scala.scalanative.posix.time._ +import scala.scalanative.posix.timeOps._ +import scala.scalanative.posix.unistd +import scala.scalanative.unsafe._ +import scala.scalanative.unsigned._ +import scala.util.control.NonFatal + +import java.io.IOException +import java.util.ArrayDeque + +import KqueueSystem.EvAdd +import KqueueSystem.event._ +import KqueueSystem.eventImplicits._ + +final class KqueueSystem private (maxEvents: Int) extends PollingSystem { + + def makePoller(): Poller = { + val fd = kqueue() + if (fd == -1) + throw new IOException(fromCString(strerror(errno))) + new Poller(fd, maxEvents) + } + + def close(poller: Poller): Unit = poller.close() + + def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean = + poller.poll(nanos, reportFailure) + + final class Poller private[KqueueSystem] (private[KqueueSystem] val kqfd: Int, maxEvents: Int) + extends FileDescriptorPoller { + + private[this] val changes: ArrayDeque[EvAdd] = new ArrayDeque + private[this] val callbacks: LongMap[FileDescriptorPoller.Callback] = new LongMap + + private[KqueueSystem] def close(): Unit = + if (unistd.close(kqfd) != 0) + throw new IOException(fromCString(strerror(errno))) + + private[KqueueSystem] def poll(timeout: Long, reportFailure: Throwable => Unit): Boolean = { + val noCallbacks = callbacks.isEmpty + + // pre-process the changes to filter canceled ones + val changelist = stackalloc[kevent64_s](changes.size().toLong) + var change = changelist + var changeCount = 0 + while (!changes.isEmpty()) { + val evAdd = changes.poll() + if (!evAdd.canceled) { + change.ident = evAdd.fd.toULong + change.filter = evAdd.filter + change.flags = (EV_ADD | EV_CLEAR).toUShort + change.udata = FileDescriptorPoller.Callback.toPtr(evAdd.cb) + change += 1 + changeCount += 1 + } + } + + if (timeout <= 0 && noCallbacks && changeCount == 0) + false // nothing to do here + else { + + val timeoutSpec = + if (timeout <= 0) null + else { + val ts = stackalloc[timespec]() + ts.tv_sec = timeout / 1000000000 + ts.tv_nsec = timeout % 1000000000 + ts + } + + val eventlist = stackalloc[kevent64_s](maxEvents.toLong) + val flags = (if (timeout == 0) KEVENT_FLAG_IMMEDIATE else KEVENT_FLAG_NONE).toUInt + val triggeredEvents = + kevent64(kqfd, changelist, changeCount, eventlist, maxEvents, flags, timeoutSpec) + + if (triggeredEvents >= 0) { + var i = 0 + var event = eventlist + while (i < triggeredEvents) { + if ((event.flags.toLong & EV_ERROR) != 0) { + + // TODO it would be interesting to propagate this failure via the callback + reportFailure( + new RuntimeException( + s"kevent64: flags=${event.flags.toHexString} errno=${event.data}" + ) + ) + + } else if (callbacks.contains(event.ident.toLong)) { + val filter = event.filter + val cb = FileDescriptorPoller.Callback.fromPtr(event.udata) + + try { + cb.notifyFileDescriptorEvents(filter == EVFILT_READ, filter == EVFILT_WRITE) + } catch { + case NonFatal(ex) => + reportFailure(ex) + } + } + + i += 1 + event += 1 + } + } else { + throw new IOException(fromCString(strerror(errno))) + } + + !changes.isEmpty() || callbacks.nonEmpty + } + } + + def registerFileDescriptor(fd: Int, reads: Boolean, writes: Boolean)( + cb: FileDescriptorPoller.Callback): Runnable = { + + val readEvent = + if (reads) + new EvAdd(fd, EVFILT_READ, cb) + else null + + val writeEvent = + if (writes) + new EvAdd(fd, EVFILT_WRITE, cb) + else null + + if (readEvent != null) + changes.add(readEvent) + if (writeEvent != null) + changes.add(writeEvent) + + callbacks(fd.toLong) = cb + + () => { + // we do not need to explicitly unregister the fd with the kqueue, + // b/c it will be unregistered automatically when the fd is closed + + // release the callback, so it can be GCed + callbacks.remove(fd.toLong) + + // cancel the events, such that if they are currently pending in the + // changes queue awaiting registration, they will not be registered + if (readEvent != null) readEvent.cancel() + if (writeEvent != null) writeEvent.cancel() + } + } + + } + +} + +object KqueueSystem { + def apply(maxEvents: Int): KqueueSystem = new KqueueSystem(maxEvents) + + private final class EvAdd( + val fd: Int, + val filter: Short, + val cb: FileDescriptorPoller.Callback + ) { + var canceled = false + def cancel() = canceled = true + } + + @extern + private[unsafe] object event { + // Derived from https://opensource.apple.com/source/xnu/xnu-7195.81.3/bsd/sys/event.h.auto.html + + final val EVFILT_READ = -1 + final val EVFILT_WRITE = -2 + + final val KEVENT_FLAG_NONE = 0x000000 + final val KEVENT_FLAG_IMMEDIATE = 0x000001 + + final val EV_ADD = 0x0001 + final val EV_DELETE = 0x0002 + final val EV_CLEAR = 0x0020 + final val EV_ERROR = 0x4000 + + type kevent64_s + + def kqueue(): CInt = extern + + def kevent64( + kq: CInt, + changelist: Ptr[kevent64_s], + nchanges: CInt, + eventlist: Ptr[kevent64_s], + nevents: CInt, + flags: CUnsignedInt, + timeout: Ptr[timespec] + ): CInt = extern + + } + + private[unsafe] object eventImplicits { + + implicit final class kevent64_sOps(kevent64_s: Ptr[kevent64_s]) { + def ident: CUnsignedLongInt = !(kevent64_s.asInstanceOf[Ptr[CUnsignedLongInt]]) + def ident_=(ident: CUnsignedLongInt): Unit = + !(kevent64_s.asInstanceOf[Ptr[CUnsignedLongInt]]) = ident + + def filter: CShort = !(kevent64_s.asInstanceOf[Ptr[CShort]] + 4) + def filter_=(filter: CShort): Unit = + !(kevent64_s.asInstanceOf[Ptr[CShort]] + 4) = filter + + def flags: CUnsignedShort = !(kevent64_s.asInstanceOf[Ptr[CUnsignedShort]] + 5) + def flags_=(flags: CUnsignedShort): Unit = + !(kevent64_s.asInstanceOf[Ptr[CUnsignedShort]] + 5) = flags + + def data: CLong = !(kevent64_s.asInstanceOf[Ptr[CLong]] + 2) + + def udata: Ptr[Byte] = !(kevent64_s.asInstanceOf[Ptr[Ptr[Byte]]] + 3) + def udata_=(udata: Ptr[Byte]): Unit = + !(kevent64_s.asInstanceOf[Ptr[Ptr[Byte]]] + 3) = udata + } + + implicit val kevent64_sTag: Tag[kevent64_s] = + Tag.materializeCArrayTag[Byte, Nat.Digit2[Nat._4, Nat._8]].asInstanceOf[Tag[kevent64_s]] + } +} diff --git a/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala b/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala index d8db322fa9..f4eaef586f 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala @@ -31,7 +31,7 @@ abstract class PollingExecutorScheduler(pollEvery: Int) type Poller = outer.type def makePoller(): Poller = outer def close(poller: Poller): Unit = () - def poll(poller: Poller, nanos: Long): Boolean = + def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean = if (nanos == -1) outer.poll(Duration.Inf) else outer.poll(nanos.nanos) } ) diff --git a/core/native/src/main/scala/cats/effect/unsafe/PollingSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/PollingSystem.scala index e4a53b5eb6..6f1cdef6ae 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/PollingSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/PollingSystem.scala @@ -36,6 +36,6 @@ abstract class PollingSystem { * @return * whether poll should be called again (i.e., there are more events to be polled) */ - def poll(poller: Poller, nanos: Long): Boolean + def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean } diff --git a/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala index 247321cae1..d1e0b1c399 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala @@ -25,7 +25,7 @@ object SleepSystem extends PollingSystem { def close(poller: Poller): Unit = () - def poll(poller: Poller, nanos: Long): Boolean = { + def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean = { if (nanos > 0) Thread.sleep(nanos / 1000000, (nanos % 1000000).toInt) false From 08894264c1ddbd6ec4e74dc3edc0656200266d68 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 11 Dec 2022 08:51:04 +0000 Subject: [PATCH 04/31] Add test for `Scheduler#sleep` --- .../src/test/scala/cats/effect/unsafe/SchedulerSpec.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/native/src/test/scala/cats/effect/unsafe/SchedulerSpec.scala b/tests/native/src/test/scala/cats/effect/unsafe/SchedulerSpec.scala index c55d919868..9031819c19 100644 --- a/tests/native/src/test/scala/cats/effect/unsafe/SchedulerSpec.scala +++ b/tests/native/src/test/scala/cats/effect/unsafe/SchedulerSpec.scala @@ -17,6 +17,8 @@ package cats.effect package unsafe +import scala.concurrent.duration._ + class SchedulerSpec extends BaseSpec { "Default scheduler" should { @@ -27,12 +29,18 @@ class SchedulerSpec extends BaseSpec { deltas = times.map(_ - start) } yield deltas.exists(_.toMicros % 1000 != 0) } + "correctly calculate real time" in real { IO.realTime.product(IO(System.currentTimeMillis())).map { case (realTime, currentTime) => (realTime.toMillis - currentTime) should be_<=(1L) } } + + "sleep for correct duration" in real { + val duration = 1500.millis + IO.sleep(duration).timed.map(_._1 should be_>=(duration)) + } } } From 4250049131bc98efa945b2817945758b0d2158a3 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 11 Dec 2022 10:03:07 +0000 Subject: [PATCH 05/31] Add `FileDescriptorPollerSpec` Co-authored-by: Lorenzo Gabriele --- .../unsafe/FileDescriptorPollerSpec.scala | 88 +++++++++++++++++++ 1 file changed, 88 insertions(+) create mode 100644 tests/native/src/test/scala/cats/effect/unsafe/FileDescriptorPollerSpec.scala diff --git a/tests/native/src/test/scala/cats/effect/unsafe/FileDescriptorPollerSpec.scala b/tests/native/src/test/scala/cats/effect/unsafe/FileDescriptorPollerSpec.scala new file mode 100644 index 0000000000..503fea32b4 --- /dev/null +++ b/tests/native/src/test/scala/cats/effect/unsafe/FileDescriptorPollerSpec.scala @@ -0,0 +1,88 @@ +/* + * Copyright 2020-2022 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package unsafe + +import cats.effect.std.{Dispatcher, Queue} +import cats.syntax.all._ + +import scala.scalanative.libc.errno._ +import scala.scalanative.posix.string._ +import scala.scalanative.posix.unistd._ +import scala.scalanative.unsafe._ +import scala.scalanative.unsigned._ + +import java.io.IOException + +class FileDescriptorPollerSpec extends BaseSpec { + + def mkPipe: Resource[IO, (Int, Int)] = + Resource.make { + IO { + val fd = stackalloc[CInt](2) + if (pipe(fd) != 0) + throw new IOException(fromCString(strerror(errno))) + else + (fd(0), fd(1)) + } + } { + case (fd0, fd1) => + IO { + close(fd0) + close(fd1) + () + } + } + + def onRead(poller: FileDescriptorPoller, fd: Int, cb: IO[Unit]): Resource[IO, Unit] = + Dispatcher + .sequential[IO] + .flatMap { dispatcher => + Resource.make { + IO { + poller.registerFileDescriptor(fd, true, false) { (readReady, _) => + dispatcher.unsafeRunAndForget(cb.whenA(readReady)) + } + } + }(unregister => IO(unregister.run())) + } + .void + + "FileDescriptorPoller" should { + "notify read-ready events" in real { + mkPipe.use { + case (readFd, writeFd) => + IO.eventLoop[FileDescriptorPoller].map(_.get.poller()).flatMap { poller => + Queue.unbounded[IO, Unit].flatMap { queue => + onRead(poller, readFd, queue.offer(())).surround { + for { + buf <- IO(new Array[Byte](4)) + _ <- IO(write(writeFd, Array[Byte](1, 2, 3).at(0), 3.toULong)) + _ <- queue.take + _ <- IO(read(readFd, buf.at(0), 3.toULong)) + _ <- IO(write(writeFd, Array[Byte](42).at(0), 1.toULong)) + _ <- queue.take + _ <- IO(read(readFd, buf.at(3), 1.toULong)) + } yield buf.toList must be_==(List[Byte](1, 2, 3, 42)) + } + } + } + } + } + } + +} From 0b9ac0223edcb1f5ee32fd6895dbbf391b19f4ed Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 11 Dec 2022 21:03:22 +0000 Subject: [PATCH 06/31] Consistent error-handling in `KqueueSystem` --- .../src/main/scala/cats/effect/unsafe/KqueueSystem.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala index 4b40ff8bb7..7b9a31d64c 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala @@ -102,11 +102,7 @@ final class KqueueSystem private (maxEvents: Int) extends PollingSystem { if ((event.flags.toLong & EV_ERROR) != 0) { // TODO it would be interesting to propagate this failure via the callback - reportFailure( - new RuntimeException( - s"kevent64: flags=${event.flags.toHexString} errno=${event.data}" - ) - ) + reportFailure(new IOException(fromCString(strerror(event.data.toInt)))) } else if (callbacks.contains(event.ident.toLong)) { val filter = event.filter From 956734ab8cda9346622e6ebe10e43c807aadcb37 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 11 Dec 2022 22:53:52 +0000 Subject: [PATCH 07/31] Make `pollingSystem` configurable in `IOApp` --- .../src/main/scala/cats/effect/IOApp.scala | 24 ++++++++++++++----- .../unsafe/IORuntimeCompanionPlatform.scala | 3 +++ 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/core/native/src/main/scala/cats/effect/IOApp.scala b/core/native/src/main/scala/cats/effect/IOApp.scala index 182fc874b9..1623a3ba55 100644 --- a/core/native/src/main/scala/cats/effect/IOApp.scala +++ b/core/native/src/main/scala/cats/effect/IOApp.scala @@ -20,6 +20,7 @@ import cats.effect.metrics.NativeCpuStarvationMetrics import scala.concurrent.CancellationException import scala.concurrent.duration._ +import scala.scalanative.meta.LinktimeInfo /** * The primary entry point to a Cats Effect application. Extend this trait rather than defining @@ -165,6 +166,21 @@ trait IOApp { */ protected def runtimeConfig: unsafe.IORuntimeConfig = unsafe.IORuntimeConfig() + /** + * The [[unsafe.PollingSystem]] used by the [[runtime]] which will evaluate the [[IO]] + * produced by `run`. It is very unlikely that users will need to override this method. + * + * [[unsafe.PollingSystem]] implementors may provide their own flavors of [[IOApp]] that + * override this method. + */ + protected def pollingSystem: unsafe.PollingSystem = + if (LinktimeInfo.isLinux) + unsafe.EpollSystem(64) + else if (LinktimeInfo.isMac) + unsafe.KqueueSystem(64) + else + unsafe.SleepSystem + /** * The entry point for your application. Will be called by the runtime when the process is * started. If the underlying runtime supports it, any arguments passed to the process will be @@ -186,12 +202,8 @@ trait IOApp { import unsafe.IORuntime val installed = IORuntime installGlobal { - IORuntime( - IORuntime.defaultComputeExecutionContext, - IORuntime.defaultComputeExecutionContext, - IORuntime.defaultScheduler, - () => (), - runtimeConfig) + val loop = IORuntime.createEventLoop(pollingSystem) + IORuntime(loop, loop, loop, () => (), runtimeConfig) } _runtime = IORuntime.global diff --git a/core/native/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala b/core/native/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala index 99c4d303a0..78c1594cfe 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala @@ -24,6 +24,9 @@ private[unsafe] abstract class IORuntimeCompanionPlatform { this: IORuntime.type def defaultScheduler: Scheduler = EventLoopExecutorScheduler.global + def createEventLoop(system: PollingSystem): ExecutionContext with Scheduler = + new EventLoopExecutorScheduler(64, system) + private[this] var _global: IORuntime = null private[effect] def installGlobal(global: => IORuntime): Boolean = { From dda54b7f9dea483d7e2164a6a864aa19cd414dda Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 12 Dec 2022 00:01:20 +0000 Subject: [PATCH 08/31] Nowarn unuseds --- .../native/src/main/scala/cats/effect/unsafe/EpollSystem.scala | 3 +++ .../src/main/scala/cats/effect/unsafe/KqueueSystem.scala | 3 +++ 2 files changed, 6 insertions(+) diff --git a/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala index e3728a94df..db33cc5d5f 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala @@ -17,6 +17,8 @@ package cats.effect package unsafe +import org.typelevel.scalaccompat.annotation._ + import scala.scalanative.libc.errno._ import scala.scalanative.posix.string._ import scala.scalanative.posix.unistd @@ -113,6 +115,7 @@ final class EpollSystem private (maxEvents: Int) extends PollingSystem { object EpollSystem { def apply(maxEvents: Int): EpollSystem = new EpollSystem(maxEvents) + @nowarn212 @extern private[unsafe] object epoll { diff --git a/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala index 7b9a31d64c..d0e5f3873b 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala @@ -17,6 +17,8 @@ package cats.effect package unsafe +import org.typelevel.scalaccompat.annotation._ + import scala.collection.mutable.LongMap import scala.scalanative.libc.errno._ import scala.scalanative.posix.string._ @@ -177,6 +179,7 @@ object KqueueSystem { def cancel() = canceled = true } + @nowarn212 @extern private[unsafe] object event { // Derived from https://opensource.apple.com/source/xnu/xnu-7195.81.3/bsd/sys/event.h.auto.html From 1206b273c7eaddb335b463b9f32dc43d241035e5 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 12 Dec 2022 19:29:27 +0000 Subject: [PATCH 09/31] Revise the fd poller spec --- .../effect/unsafe/FileDescriptorPollerSpec.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/native/src/test/scala/cats/effect/unsafe/FileDescriptorPollerSpec.scala b/tests/native/src/test/scala/cats/effect/unsafe/FileDescriptorPollerSpec.scala index 503fea32b4..ba9db40356 100644 --- a/tests/native/src/test/scala/cats/effect/unsafe/FileDescriptorPollerSpec.scala +++ b/tests/native/src/test/scala/cats/effect/unsafe/FileDescriptorPollerSpec.scala @@ -48,13 +48,13 @@ class FileDescriptorPollerSpec extends BaseSpec { } } - def onRead(poller: FileDescriptorPoller, fd: Int, cb: IO[Unit]): Resource[IO, Unit] = + def onRead(loop: EventLoop[FileDescriptorPoller], fd: Int, cb: IO[Unit]): Resource[IO, Unit] = Dispatcher .sequential[IO] .flatMap { dispatcher => Resource.make { IO { - poller.registerFileDescriptor(fd, true, false) { (readReady, _) => + loop.poller().registerFileDescriptor(fd, true, false) { (readReady, _) => dispatcher.unsafeRunAndForget(cb.whenA(readReady)) } } @@ -66,17 +66,17 @@ class FileDescriptorPollerSpec extends BaseSpec { "notify read-ready events" in real { mkPipe.use { case (readFd, writeFd) => - IO.eventLoop[FileDescriptorPoller].map(_.get.poller()).flatMap { poller => + IO.eventLoop[FileDescriptorPoller].map(_.get).flatMap { loop => Queue.unbounded[IO, Unit].flatMap { queue => - onRead(poller, readFd, queue.offer(())).surround { + onRead(loop, readFd, queue.offer(())).surround { for { buf <- IO(new Array[Byte](4)) _ <- IO(write(writeFd, Array[Byte](1, 2, 3).at(0), 3.toULong)) - _ <- queue.take - _ <- IO(read(readFd, buf.at(0), 3.toULong)) + .background + .surround(queue.take *> IO(read(readFd, buf.at(0), 3.toULong))) _ <- IO(write(writeFd, Array[Byte](42).at(0), 1.toULong)) - _ <- queue.take - _ <- IO(read(readFd, buf.at(3), 1.toULong)) + .background + .surround(queue.take *> IO(read(readFd, buf.at(3), 1.toULong))) } yield buf.toList must be_==(List[Byte](1, 2, 3, 42)) } } From e0c4ec33962551b2b241562345be8f2010517d70 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 12 Dec 2022 20:05:05 +0000 Subject: [PATCH 10/31] Remove `maxEvents` config from `EpollSystem` --- .../src/main/scala/cats/effect/IOApp.scala | 2 +- .../cats/effect/unsafe/EpollSystem.scala | 75 ++++++++++--------- .../unsafe/EventLoopExecutorScheduler.scala | 2 +- 3 files changed, 43 insertions(+), 36 deletions(-) diff --git a/core/native/src/main/scala/cats/effect/IOApp.scala b/core/native/src/main/scala/cats/effect/IOApp.scala index 1623a3ba55..9e979d8723 100644 --- a/core/native/src/main/scala/cats/effect/IOApp.scala +++ b/core/native/src/main/scala/cats/effect/IOApp.scala @@ -175,7 +175,7 @@ trait IOApp { */ protected def pollingSystem: unsafe.PollingSystem = if (LinktimeInfo.isLinux) - unsafe.EpollSystem(64) + unsafe.EpollSystem else if (LinktimeInfo.isMac) unsafe.KqueueSystem(64) else diff --git a/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala index db33cc5d5f..1e327f303e 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala @@ -19,6 +19,7 @@ package unsafe import org.typelevel.scalaccompat.annotation._ +import scala.annotation.tailrec import scala.scalanative.libc.errno._ import scala.scalanative.posix.string._ import scala.scalanative.posix.unistd @@ -29,16 +30,18 @@ import scala.util.control.NonFatal import java.io.IOException import java.util.{Collections, IdentityHashMap, Set} -import EpollSystem.epoll._ -import EpollSystem.epollImplicits._ +object EpollSystem extends PollingSystem { -final class EpollSystem private (maxEvents: Int) extends PollingSystem { + import epoll._ + import epollImplicits._ + + private[this] final val MaxEvents = 64 def makePoller(): Poller = { val fd = epoll_create1(0) if (fd == -1) throw new IOException(fromCString(strerror(errno))) - new Poller(fd, maxEvents) + new Poller(fd) } def close(poller: Poller): Unit = poller.close() @@ -46,8 +49,7 @@ final class EpollSystem private (maxEvents: Int) extends PollingSystem { def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean = poller.poll(nanos, reportFailure) - final class Poller private[EpollSystem] (private[EpollSystem] val epfd: Int, maxEvents: Int) - extends FileDescriptorPoller { + final class Poller private[EpollSystem] (epfd: Int) extends FileDescriptorPoller { private[this] val callbacks: Set[FileDescriptorPoller.Callback] = Collections.newSetFromMap(new IdentityHashMap) @@ -62,31 +64,41 @@ final class EpollSystem private (maxEvents: Int) extends PollingSystem { if (timeout <= 0 && noCallbacks) false // nothing to do here else { - val timeoutMillis = if (timeout == -1) -1 else (timeout / 1000000).toInt - - val events = stackalloc[epoll_event](maxEvents.toUInt) - - val triggeredEvents = epoll_wait(epfd, events, maxEvents, timeoutMillis) - - if (triggeredEvents >= 0) { - var i = 0 - while (i < triggeredEvents) { - val event = events + i.toLong - val cb = FileDescriptorPoller.Callback.fromPtr(event.data) - try { - val e = event.events.toInt - val readReady = (e & EPOLLIN) != 0 - val writeReady = (e & EPOLLOUT) != 0 - cb.notifyFileDescriptorEvents(readReady, writeReady) - } catch { - case ex if NonFatal(ex) => reportFailure(ex) + val events = stackalloc[epoll_event](MaxEvents.toLong) + + @tailrec + def processEvents(timeout: Int): Unit = { + + val triggeredEvents = epoll_wait(epfd, events, MaxEvents, timeout) + + if (triggeredEvents >= 0) { + var i = 0 + while (i < triggeredEvents) { + val event = events + i.toLong + val cb = FileDescriptorPoller.Callback.fromPtr(event.data) + try { + val e = event.events.toInt + val readReady = (e & EPOLLIN) != 0 + val writeReady = (e & EPOLLOUT) != 0 + cb.notifyFileDescriptorEvents(readReady, writeReady) + } catch { + case ex if NonFatal(ex) => reportFailure(ex) + } + i += 1 } - i += 1 + } else { + throw new IOException(fromCString(strerror(errno))) } - } else { - throw new IOException(fromCString(strerror(errno))) + + if (triggeredEvents >= MaxEvents) + processEvents(0) // drain the ready list + else + () } + val timeoutMillis = if (timeout == -1) -1 else (timeout / 1000000).toInt + processEvents(timeoutMillis) + !callbacks.isEmpty() } } @@ -110,14 +122,9 @@ final class EpollSystem private (maxEvents: Int) extends PollingSystem { } } -} - -object EpollSystem { - def apply(maxEvents: Int): EpollSystem = new EpollSystem(maxEvents) - @nowarn212 @extern - private[unsafe] object epoll { + private object epoll { final val EPOLL_CTL_ADD = 1 final val EPOLL_CTL_DEL = 2 @@ -140,7 +147,7 @@ object EpollSystem { } - private[unsafe] object epollImplicits { + private object epollImplicits { implicit final class epoll_eventOps(epoll_event: Ptr[epoll_event]) { def events: CUnsignedInt = !(epoll_event.asInstanceOf[Ptr[CUnsignedInt]]) diff --git a/core/native/src/main/scala/cats/effect/unsafe/EventLoopExecutorScheduler.scala b/core/native/src/main/scala/cats/effect/unsafe/EventLoopExecutorScheduler.scala index 9b1b55b271..e968c7e295 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/EventLoopExecutorScheduler.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/EventLoopExecutorScheduler.scala @@ -149,7 +149,7 @@ private object EventLoopExecutorScheduler { lazy val global = { val system = if (LinktimeInfo.isLinux) - EpollSystem(64) + EpollSystem else if (LinktimeInfo.isMac) KqueueSystem(64) else From eeeb3e65b9baf9030ae1c9361fc02173190c7b53 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 12 Dec 2022 20:26:35 +0000 Subject: [PATCH 11/31] Remove `maxEvents` config from `KqueueSystem` --- .../src/main/scala/cats/effect/IOApp.scala | 2 +- .../unsafe/EventLoopExecutorScheduler.scala | 2 +- .../cats/effect/unsafe/KqueueSystem.scala | 107 ++++++++++-------- 3 files changed, 64 insertions(+), 47 deletions(-) diff --git a/core/native/src/main/scala/cats/effect/IOApp.scala b/core/native/src/main/scala/cats/effect/IOApp.scala index 9e979d8723..1fc67b36e6 100644 --- a/core/native/src/main/scala/cats/effect/IOApp.scala +++ b/core/native/src/main/scala/cats/effect/IOApp.scala @@ -177,7 +177,7 @@ trait IOApp { if (LinktimeInfo.isLinux) unsafe.EpollSystem else if (LinktimeInfo.isMac) - unsafe.KqueueSystem(64) + unsafe.KqueueSystem else unsafe.SleepSystem diff --git a/core/native/src/main/scala/cats/effect/unsafe/EventLoopExecutorScheduler.scala b/core/native/src/main/scala/cats/effect/unsafe/EventLoopExecutorScheduler.scala index e968c7e295..5c7fe6517e 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/EventLoopExecutorScheduler.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/EventLoopExecutorScheduler.scala @@ -151,7 +151,7 @@ private object EventLoopExecutorScheduler { if (LinktimeInfo.isLinux) EpollSystem else if (LinktimeInfo.isMac) - KqueueSystem(64) + KqueueSystem else SleepSystem new EventLoopExecutorScheduler(64, system) diff --git a/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala index d0e5f3873b..db8dc4e01f 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala @@ -19,6 +19,7 @@ package unsafe import org.typelevel.scalaccompat.annotation._ +import scala.annotation.tailrec import scala.collection.mutable.LongMap import scala.scalanative.libc.errno._ import scala.scalanative.posix.string._ @@ -32,17 +33,18 @@ import scala.util.control.NonFatal import java.io.IOException import java.util.ArrayDeque -import KqueueSystem.EvAdd -import KqueueSystem.event._ -import KqueueSystem.eventImplicits._ +final object KqueueSystem extends PollingSystem { -final class KqueueSystem private (maxEvents: Int) extends PollingSystem { + import event._ + import eventImplicits._ + + private final val MaxEvents = 64 def makePoller(): Poller = { val fd = kqueue() if (fd == -1) throw new IOException(fromCString(strerror(errno))) - new Poller(fd, maxEvents) + new Poller(fd) } def close(poller: Poller): Unit = poller.close() @@ -50,8 +52,7 @@ final class KqueueSystem private (maxEvents: Int) extends PollingSystem { def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean = poller.poll(nanos, reportFailure) - final class Poller private[KqueueSystem] (private[KqueueSystem] val kqfd: Int, maxEvents: Int) - extends FileDescriptorPoller { + final class Poller private[KqueueSystem] (kqfd: Int) extends FileDescriptorPoller { private[this] val changes: ArrayDeque[EvAdd] = new ArrayDeque private[this] val callbacks: LongMap[FileDescriptorPoller.Callback] = new LongMap @@ -83,6 +84,56 @@ final class KqueueSystem private (maxEvents: Int) extends PollingSystem { false // nothing to do here else { + val eventlist = stackalloc[kevent64_s](MaxEvents.toLong) + + @tailrec + def processEvents(timeout: Ptr[timespec], changeCount: Int, flags: Int): Unit = { + + val triggeredEvents = + kevent64( + kqfd, + changelist, + changeCount, + eventlist, + MaxEvents, + flags.toUInt, + timeout + ) + + if (triggeredEvents >= 0) { + var i = 0 + var event = eventlist + while (i < triggeredEvents) { + if ((event.flags.toLong & EV_ERROR) != 0) { + + // TODO it would be interesting to propagate this failure via the callback + reportFailure(new IOException(fromCString(strerror(event.data.toInt)))) + + } else if (callbacks.contains(event.ident.toLong)) { + val filter = event.filter + val cb = FileDescriptorPoller.Callback.fromPtr(event.udata) + + try { + cb.notifyFileDescriptorEvents(filter == EVFILT_READ, filter == EVFILT_WRITE) + } catch { + case NonFatal(ex) => + reportFailure(ex) + } + } + + i += 1 + event += 1 + } + } else { + throw new IOException(fromCString(strerror(errno))) + } + + if (triggeredEvents >= MaxEvents) + processEvents(null, 0, KEVENT_FLAG_NONE) // drain the ready list + else + () + } + val timeoutSpec = if (timeout <= 0) null else { @@ -92,38 +143,9 @@ final class KqueueSystem private (maxEvents: Int) extends PollingSystem { ts } - val eventlist = stackalloc[kevent64_s](maxEvents.toLong) - val flags = (if (timeout == 0) KEVENT_FLAG_IMMEDIATE else KEVENT_FLAG_NONE).toUInt - val triggeredEvents = - kevent64(kqfd, changelist, changeCount, eventlist, maxEvents, flags, timeoutSpec) - - if (triggeredEvents >= 0) { - var i = 0 - var event = eventlist - while (i < triggeredEvents) { - if ((event.flags.toLong & EV_ERROR) != 0) { - - // TODO it would be interesting to propagate this failure via the callback - reportFailure(new IOException(fromCString(strerror(event.data.toInt)))) - - } else if (callbacks.contains(event.ident.toLong)) { - val filter = event.filter - val cb = FileDescriptorPoller.Callback.fromPtr(event.udata) - - try { - cb.notifyFileDescriptorEvents(filter == EVFILT_READ, filter == EVFILT_WRITE) - } catch { - case NonFatal(ex) => - reportFailure(ex) - } - } + val flags = if (timeout == 0) KEVENT_FLAG_IMMEDIATE else KEVENT_FLAG_NONE - i += 1 - event += 1 - } - } else { - throw new IOException(fromCString(strerror(errno))) - } + processEvents(timeoutSpec, changeCount, flags) !changes.isEmpty() || callbacks.nonEmpty } @@ -165,11 +187,6 @@ final class KqueueSystem private (maxEvents: Int) extends PollingSystem { } -} - -object KqueueSystem { - def apply(maxEvents: Int): KqueueSystem = new KqueueSystem(maxEvents) - private final class EvAdd( val fd: Int, val filter: Short, @@ -181,7 +198,7 @@ object KqueueSystem { @nowarn212 @extern - private[unsafe] object event { + private object event { // Derived from https://opensource.apple.com/source/xnu/xnu-7195.81.3/bsd/sys/event.h.auto.html final val EVFILT_READ = -1 @@ -211,7 +228,7 @@ object KqueueSystem { } - private[unsafe] object eventImplicits { + private object eventImplicits { implicit final class kevent64_sOps(kevent64_s: Ptr[kevent64_s]) { def ident: CUnsignedLongInt = !(kevent64_s.asInstanceOf[Ptr[CUnsignedLongInt]]) From c4a0a163473308c47eab74c12c9206e96b4460ef Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 12 Dec 2022 20:43:20 +0000 Subject: [PATCH 12/31] Add test for many simultaneous events --- .../unsafe/FileDescriptorPollerSpec.scala | 31 ++++++++++++++----- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/tests/native/src/test/scala/cats/effect/unsafe/FileDescriptorPollerSpec.scala b/tests/native/src/test/scala/cats/effect/unsafe/FileDescriptorPollerSpec.scala index ba9db40356..2083cb1370 100644 --- a/tests/native/src/test/scala/cats/effect/unsafe/FileDescriptorPollerSpec.scala +++ b/tests/native/src/test/scala/cats/effect/unsafe/FileDescriptorPollerSpec.scala @@ -17,7 +17,7 @@ package cats.effect package unsafe -import cats.effect.std.{Dispatcher, Queue} +import cats.effect.std.{CountDownLatch, Dispatcher, Queue} import cats.syntax.all._ import scala.scalanative.libc.errno._ @@ -30,20 +30,22 @@ import java.io.IOException class FileDescriptorPollerSpec extends BaseSpec { - def mkPipe: Resource[IO, (Int, Int)] = + case class Pipe(readFd: Int, writeFd: Int) + + def mkPipe: Resource[IO, Pipe] = Resource.make { IO { val fd = stackalloc[CInt](2) if (pipe(fd) != 0) throw new IOException(fromCString(strerror(errno))) else - (fd(0), fd(1)) + Pipe(fd(0), fd(1)) } } { - case (fd0, fd1) => + case Pipe(readFd, writeFd) => IO { - close(fd0) - close(fd1) + close(readFd) + close(writeFd) () } } @@ -63,9 +65,10 @@ class FileDescriptorPollerSpec extends BaseSpec { .void "FileDescriptorPoller" should { + "notify read-ready events" in real { mkPipe.use { - case (readFd, writeFd) => + case Pipe(readFd, writeFd) => IO.eventLoop[FileDescriptorPoller].map(_.get).flatMap { loop => Queue.unbounded[IO, Unit].flatMap { queue => onRead(loop, readFd, queue.offer(())).surround { @@ -83,6 +86,20 @@ class FileDescriptorPollerSpec extends BaseSpec { } } } + + "handle lots of simultaneous events" in real { + mkPipe.replicateA(1000).use { pipes => + IO.eventLoop[FileDescriptorPoller].map(_.get).flatMap { loop => + CountDownLatch[IO](1000).flatMap { latch => + pipes.traverse_(pipe => onRead(loop, pipe.readFd, latch.release)).surround { + IO { // trigger all the pipes at once + pipes.foreach(pipe => write(pipe.writeFd, Array[Byte](42).at(0), 1.toULong)) + }.background.surround(latch.await.as(true)) + } + } + } + } + } } } From aae6e972724fcb7893f26db8e9e33e69ec57575c Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 12 Dec 2022 21:24:16 +0000 Subject: [PATCH 13/31] Remove redundant `final` --- .../native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala index db8dc4e01f..bf6f660a56 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala @@ -33,7 +33,7 @@ import scala.util.control.NonFatal import java.io.IOException import java.util.ArrayDeque -final object KqueueSystem extends PollingSystem { +object KqueueSystem extends PollingSystem { import event._ import eventImplicits._ From 457f89c04735feb79ae9410b1c3e777d18f72ef3 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 12 Dec 2022 23:07:29 +0000 Subject: [PATCH 14/31] Update comment --- .../src/main/scala/cats/effect/unsafe/PollingSystem.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/native/src/main/scala/cats/effect/unsafe/PollingSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/PollingSystem.scala index 6f1cdef6ae..56c0f2a50f 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/PollingSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/PollingSystem.scala @@ -28,10 +28,10 @@ abstract class PollingSystem { /** * @param nanos * the maximum duration for which to block, where `nanos == -1` indicates to block - * indefinitely. ''However'', if `timeout == -1` and there are no remaining events to poll + * indefinitely. ''However'', if `nanos == -1` and there are no remaining events to poll * for, this method should return `false` immediately. This is unfortunate but necessary so * that the `EventLoop` can yield to the Scala Native global `ExecutionContext` which is - * currently hard-coded into every test framework, including JUnit, MUnit, and specs2. + * currently hard-coded into every test framework, including MUnit, specs2, and Weaver. * * @return * whether poll should be called again (i.e., there are more events to be polled) From 721c2fc46df3e0bdea1c3b7a08a913c14621150b Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 12 Dec 2022 23:31:29 +0000 Subject: [PATCH 15/31] Add test for pre-existing readiness --- .../effect/unsafe/FileDescriptorPollerSpec.scala | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/tests/native/src/test/scala/cats/effect/unsafe/FileDescriptorPollerSpec.scala b/tests/native/src/test/scala/cats/effect/unsafe/FileDescriptorPollerSpec.scala index 2083cb1370..6439a0fe0b 100644 --- a/tests/native/src/test/scala/cats/effect/unsafe/FileDescriptorPollerSpec.scala +++ b/tests/native/src/test/scala/cats/effect/unsafe/FileDescriptorPollerSpec.scala @@ -100,6 +100,20 @@ class FileDescriptorPollerSpec extends BaseSpec { } } } + + "notify of pre-existing readiness on registration" in real { + mkPipe.use { + case Pipe(readFd, writeFd) => + IO.eventLoop[FileDescriptorPoller].map(_.get).flatMap { loop => + val registerAndWait = IO.deferred[Unit].flatMap { gate => + onRead(loop, readFd, gate.complete(()).void).surround(gate.get) + } + + IO(write(writeFd, Array[Byte](42).at(0), 1.toULong)) *> + registerAndWait *> registerAndWait *> IO.pure(true) + } + } + } } } From 4f9e57b3ee59a9df859f133d1bf59e50473507d9 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 12 Dec 2022 23:48:29 +0000 Subject: [PATCH 16/31] Add test for no readiness --- .../effect/unsafe/FileDescriptorPollerSpec.scala | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/tests/native/src/test/scala/cats/effect/unsafe/FileDescriptorPollerSpec.scala b/tests/native/src/test/scala/cats/effect/unsafe/FileDescriptorPollerSpec.scala index 6439a0fe0b..7145892f4a 100644 --- a/tests/native/src/test/scala/cats/effect/unsafe/FileDescriptorPollerSpec.scala +++ b/tests/native/src/test/scala/cats/effect/unsafe/FileDescriptorPollerSpec.scala @@ -20,6 +20,7 @@ package unsafe import cats.effect.std.{CountDownLatch, Dispatcher, Queue} import cats.syntax.all._ +import scala.concurrent.duration._ import scala.scalanative.libc.errno._ import scala.scalanative.posix.string._ import scala.scalanative.posix.unistd._ @@ -114,6 +115,19 @@ class FileDescriptorPollerSpec extends BaseSpec { } } } + + "not notify if not ready" in real { + mkPipe.use { + case Pipe(readFd, _) => + IO.eventLoop[FileDescriptorPoller].map(_.get).flatMap { loop => + val registerAndWait = IO.deferred[Unit].flatMap { gate => + onRead(loop, readFd, gate.complete(()).void).surround(gate.get) + } + + registerAndWait.as(false).timeoutTo(1.second, IO.pure(true)) + } + } + } } } From a520aee4c0dedebfcb6c9acc57d3ffe051690e3a Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 19 Dec 2022 05:16:48 +0000 Subject: [PATCH 17/31] Reimagine `FileDescriptorPoller` --- .../cats/effect/FileDescriptorPoller.scala | 60 +++++++++++++++++ .../effect/unsafe/FileDescriptorPoller.scala | 64 ------------------- 2 files changed, 60 insertions(+), 64 deletions(-) create mode 100644 core/native/src/main/scala/cats/effect/FileDescriptorPoller.scala delete mode 100644 core/native/src/main/scala/cats/effect/unsafe/FileDescriptorPoller.scala diff --git a/core/native/src/main/scala/cats/effect/FileDescriptorPoller.scala b/core/native/src/main/scala/cats/effect/FileDescriptorPoller.scala new file mode 100644 index 0000000000..535d2c041b --- /dev/null +++ b/core/native/src/main/scala/cats/effect/FileDescriptorPoller.scala @@ -0,0 +1,60 @@ +/* + * Copyright 2020-2022 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect + +import scala.scalanative.annotation.alwaysinline +import scala.scalanative.runtime._ +import scala.scalanative.unsafe._ + +trait FileDescriptorPoller { + + /** + * Registers a file descriptor with the poller and monitors read- and/or write-ready events. + */ + def registerFileDescriptor( + fileDescriptor: Int, + read: Boolean, + monitorWrites: Boolean + ): Resource[IO, FileDescriptorPollHandle] + +} + +trait FileDescriptorPollHandle { + + /** + * Recursively invokes `f` until it is no longer blocked. Typically `f` will call `read` or + * `recv` on the file descriptor. + * - If `f` fails because the file descriptor is blocked, then it should return `Left[A]`. + * Then `f` will be invoked again with `A` at a later point, when the file handle is ready + * for reading. + * - If `f` is successful, then it should return a `Right[B]`. The `IO` returned from this + * method will complete with `B`. + */ + def pollReadRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B] + + /** + * Recursively invokes `f` until it is no longer blocked. Typically `f` will call `write` or + * `send` on the file descriptor. + * - If `f` fails because the file descriptor is blocked, then it should return `Left[A]`. + * Then `f` will be invoked again with `A` at a later point, when the file handle is ready + * for writing. + * - If `f` is successful, then it should return a `Right[B]`. The `IO` returned from this + * method will complete with `B`. + */ + def pollWriteRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B] + +} diff --git a/core/native/src/main/scala/cats/effect/unsafe/FileDescriptorPoller.scala b/core/native/src/main/scala/cats/effect/unsafe/FileDescriptorPoller.scala deleted file mode 100644 index df8a0e3e00..0000000000 --- a/core/native/src/main/scala/cats/effect/unsafe/FileDescriptorPoller.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright 2020-2022 Typelevel - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cats.effect -package unsafe - -import scala.scalanative.annotation.alwaysinline -import scala.scalanative.runtime._ -import scala.scalanative.unsafe._ - -trait FileDescriptorPoller { - - /** - * Registers a callback to be notified of read- and write-ready events on a file descriptor. - * Produces a runnable which unregisters the file descriptor. - * - * 1. It is the responsibility of the caller to set the file descriptor to non-blocking - * mode. - * 1. It is the responsibility of the caller to unregister the file descriptor when they are - * done. - * 1. A file descriptor should be registered at most once. To modify a registration, you - * must unregister and re-register the file descriptor. - * 1. The callback may be invoked "spuriously" claiming that a file descriptor is read- or - * write-ready when in fact it is not. You should be prepared to handle this. - * 1. The callback will be invoked at least once when the file descriptor transitions from - * blocked to read- or write-ready. You may additionally receive zero or more reminders - * of its readiness. However, you should not rely on any further callbacks until after - * the file descriptor has become blocked again. - */ - def registerFileDescriptor( - fileDescriptor: Int, - readReadyEvents: Boolean, - writeReadyEvents: Boolean)( - cb: FileDescriptorPoller.Callback - ): Runnable - -} - -object FileDescriptorPoller { - trait Callback { - def notifyFileDescriptorEvents(readReady: Boolean, writeReady: Boolean): Unit - } - - object Callback { - @alwaysinline private[unsafe] def toPtr(cb: Callback): Ptr[Byte] = - fromRawPtr(Intrinsics.castObjectToRawPtr(cb)) - - @alwaysinline private[unsafe] def fromPtr[A](ptr: Ptr[Byte]): Callback = - Intrinsics.castRawPtrToObject(toRawPtr(ptr)).asInstanceOf[Callback] - } -} From 5f8146b90d4fe535ced9a859788be0fac1d912f1 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 19 Dec 2022 07:05:27 +0000 Subject: [PATCH 18/31] Fix parameter names --- .../src/main/scala/cats/effect/FileDescriptorPoller.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/native/src/main/scala/cats/effect/FileDescriptorPoller.scala b/core/native/src/main/scala/cats/effect/FileDescriptorPoller.scala index 535d2c041b..d000caef7a 100644 --- a/core/native/src/main/scala/cats/effect/FileDescriptorPoller.scala +++ b/core/native/src/main/scala/cats/effect/FileDescriptorPoller.scala @@ -27,8 +27,8 @@ trait FileDescriptorPoller { */ def registerFileDescriptor( fileDescriptor: Int, - read: Boolean, - monitorWrites: Boolean + monitorReadReady: Boolean, + monitorWriteReady: Boolean ): Resource[IO, FileDescriptorPollHandle] } From 364060561dce34a701ad268d37919ee13d64f067 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 19 Dec 2022 20:24:18 +0000 Subject: [PATCH 19/31] Refactor/redesign `PollingSystem` ... again ... (: --- .../cats/effect/FileDescriptorPoller.scala | 4 - .../cats/effect/IOCompanionPlatform.scala | 2 +- .../cats/effect/unsafe/EpollSystem.scala | 140 +++++----- .../scala/cats/effect/unsafe/EventLoop.scala | 4 +- .../unsafe/EventLoopExecutorScheduler.scala | 8 +- .../cats/effect/unsafe/KqueueSystem.scala | 248 +++++++++--------- .../unsafe/PollingExecutorScheduler.scala | 12 +- .../cats/effect/unsafe/PollingSystem.scala | 18 +- .../cats/effect/unsafe/SleepSystem.scala | 13 +- 9 files changed, 238 insertions(+), 211 deletions(-) diff --git a/core/native/src/main/scala/cats/effect/FileDescriptorPoller.scala b/core/native/src/main/scala/cats/effect/FileDescriptorPoller.scala index d000caef7a..72604bbe66 100644 --- a/core/native/src/main/scala/cats/effect/FileDescriptorPoller.scala +++ b/core/native/src/main/scala/cats/effect/FileDescriptorPoller.scala @@ -16,10 +16,6 @@ package cats.effect -import scala.scalanative.annotation.alwaysinline -import scala.scalanative.runtime._ -import scala.scalanative.unsafe._ - trait FileDescriptorPoller { /** diff --git a/core/native/src/main/scala/cats/effect/IOCompanionPlatform.scala b/core/native/src/main/scala/cats/effect/IOCompanionPlatform.scala index fb4e2a1875..05070cc214 100644 --- a/core/native/src/main/scala/cats/effect/IOCompanionPlatform.scala +++ b/core/native/src/main/scala/cats/effect/IOCompanionPlatform.scala @@ -68,7 +68,7 @@ private[effect] abstract class IOCompanionPlatform { this: IO.type => def eventLoop[Poller](implicit ct: ClassTag[Poller]): IO[Option[EventLoop[Poller]]] = IO.executionContext.map { - case loop: EventLoop[_] if ct.runtimeClass.isInstance(loop.poller()) => + case loop: EventLoop[_] if ct.runtimeClass.isInstance(loop.poller) => Some(loop.asInstanceOf[EventLoop[Poller]]) case _ => None } diff --git a/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala index 1e327f303e..7c6cfd3be6 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala @@ -20,6 +20,7 @@ package unsafe import org.typelevel.scalaccompat.annotation._ import scala.annotation.tailrec +import scala.concurrent.ExecutionContext import scala.scalanative.libc.errno._ import scala.scalanative.posix.string._ import scala.scalanative.posix.unistd @@ -37,89 +38,94 @@ object EpollSystem extends PollingSystem { private[this] final val MaxEvents = 64 - def makePoller(): Poller = { + def makePoller(ec: ExecutionContext, data: () => PollData): Poller = new Poller + + def makePollData(): PollData = { val fd = epoll_create1(0) if (fd == -1) throw new IOException(fromCString(strerror(errno))) - new Poller(fd) + new PollData(fd) } - def close(poller: Poller): Unit = poller.close() + def closePollData(data: PollData): Unit = data.close() + + def poll(data: PollData, nanos: Long, reportFailure: Throwable => Unit): Boolean = + data.poll(nanos, reportFailure) - def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean = - poller.poll(nanos, reportFailure) + final class Poller private[EpollSystem] () - final class Poller private[EpollSystem] (epfd: Int) extends FileDescriptorPoller { + final class PollData private[EpollSystem] (epfd: Int) { - private[this] val callbacks: Set[FileDescriptorPoller.Callback] = - Collections.newSetFromMap(new IdentityHashMap) + // private[this] val callbacks: Set[FileDescriptorPoller.Callback] = + // Collections.newSetFromMap(new IdentityHashMap) private[EpollSystem] def close(): Unit = if (unistd.close(epfd) != 0) throw new IOException(fromCString(strerror(errno))) private[EpollSystem] def poll(timeout: Long, reportFailure: Throwable => Unit): Boolean = { - val noCallbacks = callbacks.isEmpty() - - if (timeout <= 0 && noCallbacks) - false // nothing to do here - else { - val events = stackalloc[epoll_event](MaxEvents.toLong) - - @tailrec - def processEvents(timeout: Int): Unit = { - - val triggeredEvents = epoll_wait(epfd, events, MaxEvents, timeout) - - if (triggeredEvents >= 0) { - var i = 0 - while (i < triggeredEvents) { - val event = events + i.toLong - val cb = FileDescriptorPoller.Callback.fromPtr(event.data) - try { - val e = event.events.toInt - val readReady = (e & EPOLLIN) != 0 - val writeReady = (e & EPOLLOUT) != 0 - cb.notifyFileDescriptorEvents(readReady, writeReady) - } catch { - case ex if NonFatal(ex) => reportFailure(ex) - } - i += 1 - } - } else { - throw new IOException(fromCString(strerror(errno))) - } - - if (triggeredEvents >= MaxEvents) - processEvents(0) // drain the ready list - else - () - } - - val timeoutMillis = if (timeout == -1) -1 else (timeout / 1000000).toInt - processEvents(timeoutMillis) - - !callbacks.isEmpty() - } + // val noCallbacks = callbacks.isEmpty() + + // if (timeout <= 0 && noCallbacks) + // false // nothing to do here + // else { + // val events = stackalloc[epoll_event](MaxEvents.toLong) + + // @tailrec + // def processEvents(timeout: Int): Unit = { + + // val triggeredEvents = epoll_wait(epfd, events, MaxEvents, timeout) + + // if (triggeredEvents >= 0) { + // var i = 0 + // while (i < triggeredEvents) { + // val event = events + i.toLong + // val cb = FileDescriptorPoller.Callback.fromPtr(event.data) + // try { + // val e = event.events.toInt + // val readReady = (e & EPOLLIN) != 0 + // val writeReady = (e & EPOLLOUT) != 0 + // cb.notifyFileDescriptorEvents(readReady, writeReady) + // } catch { + // case ex if NonFatal(ex) => reportFailure(ex) + // } + // i += 1 + // } + // } else { + // throw new IOException(fromCString(strerror(errno))) + // } + + // if (triggeredEvents >= MaxEvents) + // processEvents(0) // drain the ready list + // else + // () + // } + + // val timeoutMillis = if (timeout == -1) -1 else (timeout / 1000000).toInt + // processEvents(timeoutMillis) + + // !callbacks.isEmpty() + // } + ??? } - def registerFileDescriptor(fd: Int, reads: Boolean, writes: Boolean)( - cb: FileDescriptorPoller.Callback): Runnable = { - val event = stackalloc[epoll_event]() - event.events = - (EPOLLET | (if (reads) EPOLLIN else 0) | (if (writes) EPOLLOUT else 0)).toUInt - event.data = FileDescriptorPoller.Callback.toPtr(cb) - - if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, event) != 0) - throw new IOException(fromCString(strerror(errno))) - callbacks.add(cb) - - () => { - callbacks.remove(cb) - if (epoll_ctl(epfd, EPOLL_CTL_DEL, fd, null) != 0) - throw new IOException(fromCString(strerror(errno))) - } - } + // def registerFileDescriptor(fd: Int, reads: Boolean, writes: Boolean)( + // cb: FileDescriptorPoller.Callback): Runnable = { + // val event = stackalloc[epoll_event]() + // event.events = + // (EPOLLET | (if (reads) EPOLLIN else 0) | (if (writes) EPOLLOUT else 0)).toUInt + // event.data = FileDescriptorPoller.Callback.toPtr(cb) + + // if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, event) != 0) + // throw new IOException(fromCString(strerror(errno))) + // callbacks.add(cb) + + // () => { + // callbacks.remove(cb) + // if (epoll_ctl(epfd, EPOLL_CTL_DEL, fd, null) != 0) + // throw new IOException(fromCString(strerror(errno))) + // } + // } } @nowarn212 diff --git a/core/native/src/main/scala/cats/effect/unsafe/EventLoop.scala b/core/native/src/main/scala/cats/effect/unsafe/EventLoop.scala index 78cd33cee6..23dd70969b 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/EventLoop.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/EventLoop.scala @@ -19,8 +19,8 @@ package unsafe import scala.concurrent.ExecutionContext -trait EventLoop[Poller] extends ExecutionContext { +trait EventLoop[+Poller] extends ExecutionContext { - def poller(): Poller + def poller: Poller } diff --git a/core/native/src/main/scala/cats/effect/unsafe/EventLoopExecutorScheduler.scala b/core/native/src/main/scala/cats/effect/unsafe/EventLoopExecutorScheduler.scala index 5c7fe6517e..c91dbe7f3f 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/EventLoopExecutorScheduler.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/EventLoopExecutorScheduler.scala @@ -31,7 +31,9 @@ private final class EventLoopExecutorScheduler(pollEvery: Int, system: PollingSy with ExecutionContextExecutor with Scheduler { - private[this] val _poller = system.makePoller() + private[this] val pollData = system.makePollData() + + val poller: Any = system.makePoller(this, () => pollData) private[this] var needsReschedule: Boolean = true @@ -80,8 +82,6 @@ private final class EventLoopExecutorScheduler(pollEvery: Int, system: PollingSy def monotonicNanos() = System.nanoTime() - def poller(): Any = _poller - private[this] def loop(): Unit = { needsReschedule = false @@ -120,7 +120,7 @@ private final class EventLoopExecutorScheduler(pollEvery: Int, system: PollingSy else -1 - val needsPoll = system.poll(_poller, timeout, reportFailure) + val needsPoll = system.poll(pollData, timeout, reportFailure) continue = needsPoll || !executeQueue.isEmpty() || !sleepQueue.isEmpty() } diff --git a/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala index bf6f660a56..47ddb9019d 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala @@ -20,6 +20,7 @@ package unsafe import org.typelevel.scalaccompat.annotation._ import scala.annotation.tailrec +import scala.concurrent.ExecutionContext import scala.collection.mutable.LongMap import scala.scalanative.libc.errno._ import scala.scalanative.posix.string._ @@ -40,157 +41,162 @@ object KqueueSystem extends PollingSystem { private final val MaxEvents = 64 - def makePoller(): Poller = { + def makePoller(ec: ExecutionContext, data: () => PollData): Poller = new Poller + + def makePollData(): PollData = { val fd = kqueue() if (fd == -1) throw new IOException(fromCString(strerror(errno))) - new Poller(fd) + new PollData(fd) } - def close(poller: Poller): Unit = poller.close() + def closePollData(data: PollData): Unit = data.close() + + def poll(data: PollData, nanos: Long, reportFailure: Throwable => Unit): Boolean = + data.poll(nanos, reportFailure) - def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean = - poller.poll(nanos, reportFailure) + final class Poller private[KqueueSystem] () - final class Poller private[KqueueSystem] (kqfd: Int) extends FileDescriptorPoller { + final class PollData private[KqueueSystem] (kqfd: Int) { private[this] val changes: ArrayDeque[EvAdd] = new ArrayDeque - private[this] val callbacks: LongMap[FileDescriptorPoller.Callback] = new LongMap + // private[this] val callbacks: LongMap[FileDescriptorPoller.Callback] = new LongMap private[KqueueSystem] def close(): Unit = if (unistd.close(kqfd) != 0) throw new IOException(fromCString(strerror(errno))) private[KqueueSystem] def poll(timeout: Long, reportFailure: Throwable => Unit): Boolean = { - val noCallbacks = callbacks.isEmpty - - // pre-process the changes to filter canceled ones - val changelist = stackalloc[kevent64_s](changes.size().toLong) - var change = changelist - var changeCount = 0 - while (!changes.isEmpty()) { - val evAdd = changes.poll() - if (!evAdd.canceled) { - change.ident = evAdd.fd.toULong - change.filter = evAdd.filter - change.flags = (EV_ADD | EV_CLEAR).toUShort - change.udata = FileDescriptorPoller.Callback.toPtr(evAdd.cb) - change += 1 - changeCount += 1 - } - } - - if (timeout <= 0 && noCallbacks && changeCount == 0) - false // nothing to do here - else { - - val eventlist = stackalloc[kevent64_s](MaxEvents.toLong) - - @tailrec - def processEvents(timeout: Ptr[timespec], changeCount: Int, flags: Int): Unit = { - - val triggeredEvents = - kevent64( - kqfd, - changelist, - changeCount, - eventlist, - MaxEvents, - flags.toUInt, - timeout - ) - - if (triggeredEvents >= 0) { - var i = 0 - var event = eventlist - while (i < triggeredEvents) { - if ((event.flags.toLong & EV_ERROR) != 0) { - - // TODO it would be interesting to propagate this failure via the callback - reportFailure(new IOException(fromCString(strerror(event.data.toInt)))) - - } else if (callbacks.contains(event.ident.toLong)) { - val filter = event.filter - val cb = FileDescriptorPoller.Callback.fromPtr(event.udata) - - try { - cb.notifyFileDescriptorEvents(filter == EVFILT_READ, filter == EVFILT_WRITE) - } catch { - case NonFatal(ex) => - reportFailure(ex) - } - } - - i += 1 - event += 1 - } - } else { - throw new IOException(fromCString(strerror(errno))) - } - - if (triggeredEvents >= MaxEvents) - processEvents(null, 0, KEVENT_FLAG_NONE) // drain the ready list - else - () - } - - val timeoutSpec = - if (timeout <= 0) null - else { - val ts = stackalloc[timespec]() - ts.tv_sec = timeout / 1000000000 - ts.tv_nsec = timeout % 1000000000 - ts - } - - val flags = if (timeout == 0) KEVENT_FLAG_IMMEDIATE else KEVENT_FLAG_NONE - - processEvents(timeoutSpec, changeCount, flags) - - !changes.isEmpty() || callbacks.nonEmpty - } + // val noCallbacks = callbacks.isEmpty + + // // pre-process the changes to filter canceled ones + // val changelist = stackalloc[kevent64_s](changes.size().toLong) + // var change = changelist + // var changeCount = 0 + // while (!changes.isEmpty()) { + // val evAdd = changes.poll() + // if (!evAdd.canceled) { + // change.ident = evAdd.fd.toULong + // change.filter = evAdd.filter + // change.flags = (EV_ADD | EV_CLEAR).toUShort + // change.udata = FileDescriptorPoller.Callback.toPtr(evAdd.cb) + // change += 1 + // changeCount += 1 + // } + // } + + // if (timeout <= 0 && noCallbacks && changeCount == 0) + // false // nothing to do here + // else { + + // val eventlist = stackalloc[kevent64_s](MaxEvents.toLong) + + // @tailrec + // def processEvents(timeout: Ptr[timespec], changeCount: Int, flags: Int): Unit = { + + // val triggeredEvents = + // kevent64( + // kqfd, + // changelist, + // changeCount, + // eventlist, + // MaxEvents, + // flags.toUInt, + // timeout + // ) + + // if (triggeredEvents >= 0) { + // var i = 0 + // var event = eventlist + // while (i < triggeredEvents) { + // if ((event.flags.toLong & EV_ERROR) != 0) { + + // // TODO it would be interesting to propagate this failure via the callback + // reportFailure(new IOException(fromCString(strerror(event.data.toInt)))) + + // } else if (callbacks.contains(event.ident.toLong)) { + // val filter = event.filter + // val cb = FileDescriptorPoller.Callback.fromPtr(event.udata) + + // try { + // cb.notifyFileDescriptorEvents(filter == EVFILT_READ, filter == EVFILT_WRITE) + // } catch { + // case NonFatal(ex) => + // reportFailure(ex) + // } + // } + + // i += 1 + // event += 1 + // } + // } else { + // throw new IOException(fromCString(strerror(errno))) + // } + + // if (triggeredEvents >= MaxEvents) + // processEvents(null, 0, KEVENT_FLAG_NONE) // drain the ready list + // else + // () + // } + + // val timeoutSpec = + // if (timeout <= 0) null + // else { + // val ts = stackalloc[timespec]() + // ts.tv_sec = timeout / 1000000000 + // ts.tv_nsec = timeout % 1000000000 + // ts + // } + + // val flags = if (timeout == 0) KEVENT_FLAG_IMMEDIATE else KEVENT_FLAG_NONE + + // processEvents(timeoutSpec, changeCount, flags) + + // !changes.isEmpty() || callbacks.nonEmpty + // } + ??? } - def registerFileDescriptor(fd: Int, reads: Boolean, writes: Boolean)( - cb: FileDescriptorPoller.Callback): Runnable = { + // def registerFileDescriptor(fd: Int, reads: Boolean, writes: Boolean)( + // cb: FileDescriptorPoller.Callback): Runnable = { - val readEvent = - if (reads) - new EvAdd(fd, EVFILT_READ, cb) - else null + // val readEvent = + // if (reads) + // new EvAdd(fd, EVFILT_READ, cb) + // else null - val writeEvent = - if (writes) - new EvAdd(fd, EVFILT_WRITE, cb) - else null + // val writeEvent = + // if (writes) + // new EvAdd(fd, EVFILT_WRITE, cb) + // else null - if (readEvent != null) - changes.add(readEvent) - if (writeEvent != null) - changes.add(writeEvent) + // if (readEvent != null) + // changes.add(readEvent) + // if (writeEvent != null) + // changes.add(writeEvent) - callbacks(fd.toLong) = cb + // callbacks(fd.toLong) = cb - () => { - // we do not need to explicitly unregister the fd with the kqueue, - // b/c it will be unregistered automatically when the fd is closed + // () => { + // // we do not need to explicitly unregister the fd with the kqueue, + // // b/c it will be unregistered automatically when the fd is closed - // release the callback, so it can be GCed - callbacks.remove(fd.toLong) + // // release the callback, so it can be GCed + // callbacks.remove(fd.toLong) - // cancel the events, such that if they are currently pending in the - // changes queue awaiting registration, they will not be registered - if (readEvent != null) readEvent.cancel() - if (writeEvent != null) writeEvent.cancel() - } - } + // // cancel the events, such that if they are currently pending in the + // // changes queue awaiting registration, they will not be registered + // if (readEvent != null) readEvent.cancel() + // if (writeEvent != null) writeEvent.cancel() + // } + // } } private final class EvAdd( val fd: Int, val filter: Short, - val cb: FileDescriptorPoller.Callback + val cb: Any ) { var canceled = false def cancel() = canceled = true diff --git a/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala b/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala index f4eaef586f..ddaa239a30 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/PollingExecutorScheduler.scala @@ -17,7 +17,7 @@ package cats.effect package unsafe -import scala.concurrent.ExecutionContextExecutor +import scala.concurrent.{ExecutionContext, ExecutionContextExecutor} import scala.concurrent.duration._ @deprecated("Use default runtime with a custom PollingSystem", "3.5.0") @@ -29,10 +29,12 @@ abstract class PollingExecutorScheduler(pollEvery: Int) pollEvery, new PollingSystem { type Poller = outer.type - def makePoller(): Poller = outer - def close(poller: Poller): Unit = () - def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean = - if (nanos == -1) outer.poll(Duration.Inf) else outer.poll(nanos.nanos) + type PollData = outer.type + def makePoller(ec: ExecutionContext, data: () => PollData): Poller = outer + def makePollData(): PollData = outer + def closePollData(data: PollData): Unit = () + def poll(data: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean = + if (nanos == -1) data.poll(Duration.Inf) else data.poll(nanos.nanos) } ) diff --git a/core/native/src/main/scala/cats/effect/unsafe/PollingSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/PollingSystem.scala index 56c0f2a50f..594d5657e6 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/PollingSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/PollingSystem.scala @@ -17,13 +17,25 @@ package cats.effect package unsafe +import scala.concurrent.ExecutionContext + abstract class PollingSystem { + /** + * The user-facing Poller interface. + */ type Poller - def makePoller(): Poller + /** + * The thread-local data structure used for polling. + */ + type PollData + + def makePoller(ec: ExecutionContext, data: () => PollData): Poller + + def makePollData(): PollData - def close(poller: Poller): Unit + def closePollData(poller: PollData): Unit /** * @param nanos @@ -36,6 +48,6 @@ abstract class PollingSystem { * @return * whether poll should be called again (i.e., there are more events to be polled) */ - def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean + def poll(data: PollData, nanos: Long, reportFailure: Throwable => Unit): Boolean } diff --git a/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala index d1e0b1c399..47f8c0418c 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala @@ -17,15 +17,20 @@ package cats.effect package unsafe +import scala.concurrent.ExecutionContext + object SleepSystem extends PollingSystem { - type Poller = Unit + final class Poller private[SleepSystem] () + final class PollData private[SleepSystem] () + + def makePoller(ec: ExecutionContext, data: () => PollData): Poller = new Poller - def makePoller(): Poller = () + def makePollData(): PollData = new PollData - def close(poller: Poller): Unit = () + def closePollData(poller: PollData): Unit = () - def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean = { + def poll(poller: PollData, nanos: Long, reportFailure: Throwable => Unit): Boolean = { if (nanos > 0) Thread.sleep(nanos / 1000000, (nanos % 1000000).toInt) false From 42491da54a7d0dd0394add2f48f12eecdeccc5d9 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 19 Dec 2022 21:42:39 +0000 Subject: [PATCH 20/31] Dump `EventLoop` abstraction --- .../cats/effect/IOCompanionPlatform.scala | 8 +++--- .../scala/cats/effect/unsafe/EventLoop.scala | 26 ------------------- .../unsafe/EventLoopExecutorScheduler.scala | 5 ++-- 3 files changed, 6 insertions(+), 33 deletions(-) delete mode 100644 core/native/src/main/scala/cats/effect/unsafe/EventLoop.scala diff --git a/core/native/src/main/scala/cats/effect/IOCompanionPlatform.scala b/core/native/src/main/scala/cats/effect/IOCompanionPlatform.scala index 05070cc214..497e5a818d 100644 --- a/core/native/src/main/scala/cats/effect/IOCompanionPlatform.scala +++ b/core/native/src/main/scala/cats/effect/IOCompanionPlatform.scala @@ -17,7 +17,7 @@ package cats.effect import cats.effect.std.Console -import cats.effect.unsafe.EventLoop +import cats.effect.unsafe.EventLoopExecutorScheduler import scala.reflect.ClassTag @@ -66,10 +66,10 @@ private[effect] abstract class IOCompanionPlatform { this: IO.type => def readLine: IO[String] = Console[IO].readLine - def eventLoop[Poller](implicit ct: ClassTag[Poller]): IO[Option[EventLoop[Poller]]] = + def poller[Poller](implicit ct: ClassTag[Poller]): IO[Option[Poller]] = IO.executionContext.map { - case loop: EventLoop[_] if ct.runtimeClass.isInstance(loop.poller) => - Some(loop.asInstanceOf[EventLoop[Poller]]) + case loop: EventLoopExecutorScheduler if ct.runtimeClass.isInstance(loop.poller) => + Some(loop.poller.asInstanceOf[Poller]) case _ => None } } diff --git a/core/native/src/main/scala/cats/effect/unsafe/EventLoop.scala b/core/native/src/main/scala/cats/effect/unsafe/EventLoop.scala deleted file mode 100644 index 23dd70969b..0000000000 --- a/core/native/src/main/scala/cats/effect/unsafe/EventLoop.scala +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright 2020-2022 Typelevel - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cats.effect -package unsafe - -import scala.concurrent.ExecutionContext - -trait EventLoop[+Poller] extends ExecutionContext { - - def poller: Poller - -} diff --git a/core/native/src/main/scala/cats/effect/unsafe/EventLoopExecutorScheduler.scala b/core/native/src/main/scala/cats/effect/unsafe/EventLoopExecutorScheduler.scala index c91dbe7f3f..165fde120e 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/EventLoopExecutorScheduler.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/EventLoopExecutorScheduler.scala @@ -26,9 +26,8 @@ import scala.util.control.NonFatal import java.util.{ArrayDeque, PriorityQueue} -private final class EventLoopExecutorScheduler(pollEvery: Int, system: PollingSystem) - extends EventLoop[Any] - with ExecutionContextExecutor +private[effect] final class EventLoopExecutorScheduler(pollEvery: Int, system: PollingSystem) + extends ExecutionContextExecutor with Scheduler { private[this] val pollData = system.makePollData() From 786127ca2f1d14f8e12ee0e0af9ab67435402f6e Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 19 Dec 2022 22:06:29 +0000 Subject: [PATCH 21/31] Update the `FileDescriptorPollerSpec` --- .../effect/FileDescriptorPollerSpec.scala | 124 ++++++++++++++++ .../unsafe/FileDescriptorPollerSpec.scala | 133 ------------------ 2 files changed, 124 insertions(+), 133 deletions(-) create mode 100644 tests/native/src/test/scala/cats/effect/FileDescriptorPollerSpec.scala delete mode 100644 tests/native/src/test/scala/cats/effect/unsafe/FileDescriptorPollerSpec.scala diff --git a/tests/native/src/test/scala/cats/effect/FileDescriptorPollerSpec.scala b/tests/native/src/test/scala/cats/effect/FileDescriptorPollerSpec.scala new file mode 100644 index 0000000000..95d8594fcf --- /dev/null +++ b/tests/native/src/test/scala/cats/effect/FileDescriptorPollerSpec.scala @@ -0,0 +1,124 @@ +/* + * Copyright 2020-2022 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect + +import cats.effect.std.CountDownLatch +import cats.syntax.all._ + +import scala.concurrent.duration._ +import scala.scalanative.libc.errno._ +import scala.scalanative.posix.errno._ +import scala.scalanative.posix.string._ +import scala.scalanative.posix.unistd +import scala.scalanative.unsafe._ +import scala.scalanative.unsigned._ + +import java.io.IOException + +class FileDescriptorPollerSpec extends BaseSpec { + + final class Pipe( + val readFd: Int, + val writeFd: Int, + val readHandle: FileDescriptorPollHandle, + val writeHandle: FileDescriptorPollHandle + ) { + def read(buf: Array[Byte], offset: Int, length: Int): IO[Unit] = + readHandle + .pollReadRec(()) { _ => IO(guard(unistd.read(readFd, buf.at(offset), length.toULong))) } + .void + + def write(buf: Array[Byte], offset: Int, length: Int): IO[Unit] = + writeHandle + .pollWriteRec(()) { _ => + IO(guard(unistd.write(readFd, buf.at(offset), length.toULong))) + } + .void + + private def guard(thunk: => CInt): Either[Unit, CInt] = { + val rtn = thunk + if (rtn < 0) { + val en = errno + if (en == EAGAIN || en == EWOULDBLOCK) + Left(()) + else + throw new IOException(fromCString(strerror(errno))) + } else + Right(rtn) + } + } + + def mkPipe: Resource[IO, Pipe] = + Resource.make { + IO { + val fd = stackalloc[CInt](2) + if (unistd.pipe(fd) != 0) + throw new IOException(fromCString(strerror(errno))) + else + (fd(0), fd(1)) + } + } { + case (readFd, writeFd) => + IO { + unistd.close(readFd) + unistd.close(writeFd) + () + } + } >>= { + case (readFd, writeFd) => + Resource.eval(IO.poller[FileDescriptorPoller].map(_.get)).flatMap { poller => + ( + poller.registerFileDescriptor(readFd, true, false), + poller.registerFileDescriptor(writeFd, false, true) + ).mapN(new Pipe(readFd, writeFd, _, _)) + } + } + + "FileDescriptorPoller" should { + + "notify read-ready events" in real { + mkPipe.use { pipe => + for { + buf <- IO(new Array[Byte](4)) + _ <- pipe.write(Array[Byte](1, 2, 3), 0, 3).background.surround(pipe.read(buf, 0, 3)) + _ <- pipe.write(Array[Byte](42), 0, 1).background.surround(pipe.read(buf, 3, 1)) + } yield buf.toList must be_==(List[Byte](1, 2, 3, 42)) + } + } + + "handle lots of simultaneous events" in real { + mkPipe.replicateA(1000).use { pipes => + CountDownLatch[IO](1000).flatMap { latch => + pipes.traverse_(pipe => pipe.read(new Array[Byte](1), 0, 1).background).surround { + IO { // trigger all the pipes at once + pipes.foreach { pipe => + unistd.write(pipe.writeFd, Array[Byte](42).at(0), 1.toULong) + } + }.background.surround(latch.await.as(true)) + } + } + } + } + + "hang if never ready" in real { + mkPipe.use { pipe => + pipe.read(new Array[Byte](1), 0, 1).as(false).timeoutTo(1.second, IO.pure(true)) + } + } + } + +} diff --git a/tests/native/src/test/scala/cats/effect/unsafe/FileDescriptorPollerSpec.scala b/tests/native/src/test/scala/cats/effect/unsafe/FileDescriptorPollerSpec.scala deleted file mode 100644 index 7145892f4a..0000000000 --- a/tests/native/src/test/scala/cats/effect/unsafe/FileDescriptorPollerSpec.scala +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Copyright 2020-2022 Typelevel - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cats.effect -package unsafe - -import cats.effect.std.{CountDownLatch, Dispatcher, Queue} -import cats.syntax.all._ - -import scala.concurrent.duration._ -import scala.scalanative.libc.errno._ -import scala.scalanative.posix.string._ -import scala.scalanative.posix.unistd._ -import scala.scalanative.unsafe._ -import scala.scalanative.unsigned._ - -import java.io.IOException - -class FileDescriptorPollerSpec extends BaseSpec { - - case class Pipe(readFd: Int, writeFd: Int) - - def mkPipe: Resource[IO, Pipe] = - Resource.make { - IO { - val fd = stackalloc[CInt](2) - if (pipe(fd) != 0) - throw new IOException(fromCString(strerror(errno))) - else - Pipe(fd(0), fd(1)) - } - } { - case Pipe(readFd, writeFd) => - IO { - close(readFd) - close(writeFd) - () - } - } - - def onRead(loop: EventLoop[FileDescriptorPoller], fd: Int, cb: IO[Unit]): Resource[IO, Unit] = - Dispatcher - .sequential[IO] - .flatMap { dispatcher => - Resource.make { - IO { - loop.poller().registerFileDescriptor(fd, true, false) { (readReady, _) => - dispatcher.unsafeRunAndForget(cb.whenA(readReady)) - } - } - }(unregister => IO(unregister.run())) - } - .void - - "FileDescriptorPoller" should { - - "notify read-ready events" in real { - mkPipe.use { - case Pipe(readFd, writeFd) => - IO.eventLoop[FileDescriptorPoller].map(_.get).flatMap { loop => - Queue.unbounded[IO, Unit].flatMap { queue => - onRead(loop, readFd, queue.offer(())).surround { - for { - buf <- IO(new Array[Byte](4)) - _ <- IO(write(writeFd, Array[Byte](1, 2, 3).at(0), 3.toULong)) - .background - .surround(queue.take *> IO(read(readFd, buf.at(0), 3.toULong))) - _ <- IO(write(writeFd, Array[Byte](42).at(0), 1.toULong)) - .background - .surround(queue.take *> IO(read(readFd, buf.at(3), 1.toULong))) - } yield buf.toList must be_==(List[Byte](1, 2, 3, 42)) - } - } - } - } - } - - "handle lots of simultaneous events" in real { - mkPipe.replicateA(1000).use { pipes => - IO.eventLoop[FileDescriptorPoller].map(_.get).flatMap { loop => - CountDownLatch[IO](1000).flatMap { latch => - pipes.traverse_(pipe => onRead(loop, pipe.readFd, latch.release)).surround { - IO { // trigger all the pipes at once - pipes.foreach(pipe => write(pipe.writeFd, Array[Byte](42).at(0), 1.toULong)) - }.background.surround(latch.await.as(true)) - } - } - } - } - } - - "notify of pre-existing readiness on registration" in real { - mkPipe.use { - case Pipe(readFd, writeFd) => - IO.eventLoop[FileDescriptorPoller].map(_.get).flatMap { loop => - val registerAndWait = IO.deferred[Unit].flatMap { gate => - onRead(loop, readFd, gate.complete(()).void).surround(gate.get) - } - - IO(write(writeFd, Array[Byte](42).at(0), 1.toULong)) *> - registerAndWait *> registerAndWait *> IO.pure(true) - } - } - } - - "not notify if not ready" in real { - mkPipe.use { - case Pipe(readFd, _) => - IO.eventLoop[FileDescriptorPoller].map(_.get).flatMap { loop => - val registerAndWait = IO.deferred[Unit].flatMap { gate => - onRead(loop, readFd, gate.complete(()).void).surround(gate.get) - } - - registerAndWait.as(false).timeoutTo(1.second, IO.pure(true)) - } - } - } - } - -} From de3eea0fcb957306bded74951454f6a4acb769d8 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Tue, 20 Dec 2022 00:36:10 +0000 Subject: [PATCH 22/31] Rework `EpollSystem` --- .../cats/effect/unsafe/EpollSystem.scala | 249 +++++++++++++----- 1 file changed, 182 insertions(+), 67 deletions(-) diff --git a/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala index 7c6cfd3be6..314c51f492 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala @@ -17,16 +17,20 @@ package cats.effect package unsafe +import cats.effect.std.Semaphore +import cats.syntax.all._ + import org.typelevel.scalaccompat.annotation._ import scala.annotation.tailrec import scala.concurrent.ExecutionContext +import scala.scalanative.annotation.alwaysinline import scala.scalanative.libc.errno._ import scala.scalanative.posix.string._ import scala.scalanative.posix.unistd +import scala.scalanative.runtime._ import scala.scalanative.unsafe._ import scala.scalanative.unsigned._ -import scala.util.control.NonFatal import java.io.IOException import java.util.{Collections, IdentityHashMap, Set} @@ -38,7 +42,8 @@ object EpollSystem extends PollingSystem { private[this] final val MaxEvents = 64 - def makePoller(ec: ExecutionContext, data: () => PollData): Poller = new Poller + def makePoller(ec: ExecutionContext, data: () => PollData): Poller = + new Poller(ec, data) def makePollData(): PollData = { val fd = epoll_create1(0) @@ -50,82 +55,192 @@ object EpollSystem extends PollingSystem { def closePollData(data: PollData): Unit = data.close() def poll(data: PollData, nanos: Long, reportFailure: Throwable => Unit): Boolean = - data.poll(nanos, reportFailure) + data.poll(nanos) + + final class Poller private[EpollSystem] (ec: ExecutionContext, data: () => PollData) + extends FileDescriptorPoller { + + def registerFileDescriptor( + fd: Int, + reads: Boolean, + writes: Boolean + ): Resource[IO, FileDescriptorPollHandle] = + Resource + .make { + (Semaphore[IO](1), Semaphore[IO](1)).flatMapN { (readSemaphore, writeSemaphore) => + IO { + val handle = new PollHandle(readSemaphore, writeSemaphore) + val unregister = data().register(fd, reads, writes, handle) + (handle, unregister) + }.evalOn(ec) + } + }(_._2) + .map(_._1) + + } + + private final class PollHandle( + readSemaphore: Semaphore[IO], + writeSemaphore: Semaphore[IO] + ) extends FileDescriptorPollHandle { + + private[this] var readReadyCounter = 0 + private[this] var readCallback: Either[Throwable, Int] => Unit = null + + private[this] var writeReadyCounter = 0 + private[this] var writeCallback: Either[Throwable, Int] => Unit = null + + def notify(events: Int): Unit = { + if ((events & EPOLLIN) != 0) { + val counter = readReadyCounter + 1 + readReadyCounter = counter + val cb = readCallback + readCallback = null + if (cb ne null) cb(Right(counter)) + } + if ((events & EPOLLOUT) != 0) { + val counter = writeReadyCounter + 1 + writeReadyCounter = counter + val cb = writeCallback + writeCallback = null + if (cb ne null) cb(Right(counter)) + } + } + + def pollReadRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B] = + readSemaphore.permit.surround { + def go(a: A, before: Int): IO[B] = + f(a).flatMap { + case Left(a) => + IO(readReadyCounter).flatMap { after => + if (before != after) + // there was a read-ready notification since we started, try again immediately + go(a, after) + else + IO.async[Int] { cb => + IO { + readCallback = cb + // check again before we suspend + val now = readReadyCounter + if (now != before) { + cb(Right(now)) + readCallback = null + None + } else Some(IO(this.readCallback = null)) + } + }.flatMap(go(a, _)) + } + case Right(b) => IO.pure(b) + } + + IO(readReadyCounter).flatMap(go(a, _)) + } + + def pollWriteRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B] = + writeSemaphore.permit.surround { + def go(a: A, before: Int): IO[B] = + f(a).flatMap { + case Left(a) => + IO(writeReadyCounter).flatMap { after => + if (before != after) + // there was a write-ready notification since we started, try again immediately + go(a, after) + else + IO.async[Int] { cb => + IO { + writeCallback = cb + // check again before we suspend + val now = writeReadyCounter + if (now != before) { + cb(Right(now)) + writeCallback = null + None + } else Some(IO(this.writeCallback = null)) + } + }.flatMap(go(a, _)) + } + case Right(b) => IO.pure(b) + } + + IO(writeReadyCounter).flatMap(go(a, _)) + } - final class Poller private[EpollSystem] () + } final class PollData private[EpollSystem] (epfd: Int) { - // private[this] val callbacks: Set[FileDescriptorPoller.Callback] = - // Collections.newSetFromMap(new IdentityHashMap) + private[this] val handles: Set[PollHandle] = + Collections.newSetFromMap(new IdentityHashMap) private[EpollSystem] def close(): Unit = if (unistd.close(epfd) != 0) throw new IOException(fromCString(strerror(errno))) - private[EpollSystem] def poll(timeout: Long, reportFailure: Throwable => Unit): Boolean = { - // val noCallbacks = callbacks.isEmpty() - - // if (timeout <= 0 && noCallbacks) - // false // nothing to do here - // else { - // val events = stackalloc[epoll_event](MaxEvents.toLong) - - // @tailrec - // def processEvents(timeout: Int): Unit = { - - // val triggeredEvents = epoll_wait(epfd, events, MaxEvents, timeout) - - // if (triggeredEvents >= 0) { - // var i = 0 - // while (i < triggeredEvents) { - // val event = events + i.toLong - // val cb = FileDescriptorPoller.Callback.fromPtr(event.data) - // try { - // val e = event.events.toInt - // val readReady = (e & EPOLLIN) != 0 - // val writeReady = (e & EPOLLOUT) != 0 - // cb.notifyFileDescriptorEvents(readReady, writeReady) - // } catch { - // case ex if NonFatal(ex) => reportFailure(ex) - // } - // i += 1 - // } - // } else { - // throw new IOException(fromCString(strerror(errno))) - // } - - // if (triggeredEvents >= MaxEvents) - // processEvents(0) // drain the ready list - // else - // () - // } - - // val timeoutMillis = if (timeout == -1) -1 else (timeout / 1000000).toInt - // processEvents(timeoutMillis) - - // !callbacks.isEmpty() - // } - ??? + private[EpollSystem] def poll(timeout: Long): Boolean = { + val noHandles = handles.isEmpty() + + if (timeout <= 0 && noHandles) + false // nothing to do here + else { + val events = stackalloc[epoll_event](MaxEvents.toLong) + + @tailrec + def processEvents(timeout: Int): Unit = { + + val triggeredEvents = epoll_wait(epfd, events, MaxEvents, timeout) + + if (triggeredEvents >= 0) { + var i = 0 + while (i < triggeredEvents) { + val event = events + i.toLong + val handle = fromPtr(event.data) + handle.notify(event.events.toInt) + i += 1 + } + } else { + throw new IOException(fromCString(strerror(errno))) + } + + if (triggeredEvents >= MaxEvents) + processEvents(0) // drain the ready list + else + () + } + + val timeoutMillis = if (timeout == -1) -1 else (timeout / 1000000).toInt + processEvents(timeoutMillis) + + !handles.isEmpty() + } + } + + private[EpollSystem] def register( + fd: Int, + reads: Boolean, + writes: Boolean, + handle: PollHandle + ): IO[Unit] = { + val event = stackalloc[epoll_event]() + event.events = + (EPOLLET | (if (reads) EPOLLIN else 0) | (if (writes) EPOLLOUT else 0)).toUInt + event.data = toPtr(handle) + + if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, event) != 0) + throw new IOException(fromCString(strerror(errno))) + handles.add(handle) + + IO { + handles.remove(handle) + if (epoll_ctl(epfd, EPOLL_CTL_DEL, fd, null) != 0) + throw new IOException(fromCString(strerror(errno))) + } } - // def registerFileDescriptor(fd: Int, reads: Boolean, writes: Boolean)( - // cb: FileDescriptorPoller.Callback): Runnable = { - // val event = stackalloc[epoll_event]() - // event.events = - // (EPOLLET | (if (reads) EPOLLIN else 0) | (if (writes) EPOLLOUT else 0)).toUInt - // event.data = FileDescriptorPoller.Callback.toPtr(cb) - - // if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, event) != 0) - // throw new IOException(fromCString(strerror(errno))) - // callbacks.add(cb) - - // () => { - // callbacks.remove(cb) - // if (epoll_ctl(epfd, EPOLL_CTL_DEL, fd, null) != 0) - // throw new IOException(fromCString(strerror(errno))) - // } - // } + @alwaysinline private[this] def toPtr(handle: PollHandle): Ptr[Byte] = + fromRawPtr(Intrinsics.castObjectToRawPtr(handle)) + + @alwaysinline private[this] def fromPtr[A](ptr: Ptr[Byte]): PollHandle = + Intrinsics.castRawPtrToObject(toRawPtr(ptr)).asInstanceOf[PollHandle] } @nowarn212 From eb8ba8403f02b5f2df36fba303e34f56ccfe5e05 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Tue, 20 Dec 2022 01:14:04 +0000 Subject: [PATCH 23/31] Set pipes to non-blocking mode --- .../effect/FileDescriptorPollerSpec.scala | 54 +++++++++++-------- 1 file changed, 32 insertions(+), 22 deletions(-) diff --git a/tests/native/src/test/scala/cats/effect/FileDescriptorPollerSpec.scala b/tests/native/src/test/scala/cats/effect/FileDescriptorPollerSpec.scala index 95d8594fcf..c87ff8880a 100644 --- a/tests/native/src/test/scala/cats/effect/FileDescriptorPollerSpec.scala +++ b/tests/native/src/test/scala/cats/effect/FileDescriptorPollerSpec.scala @@ -63,30 +63,40 @@ class FileDescriptorPollerSpec extends BaseSpec { } def mkPipe: Resource[IO, Pipe] = - Resource.make { - IO { - val fd = stackalloc[CInt](2) - if (unistd.pipe(fd) != 0) - throw new IOException(fromCString(strerror(errno))) - else - (fd(0), fd(1)) - } - } { - case (readFd, writeFd) => + Resource + .make { IO { - unistd.close(readFd) - unistd.close(writeFd) - () - } - } >>= { - case (readFd, writeFd) => - Resource.eval(IO.poller[FileDescriptorPoller].map(_.get)).flatMap { poller => - ( - poller.registerFileDescriptor(readFd, true, false), - poller.registerFileDescriptor(writeFd, false, true) - ).mapN(new Pipe(readFd, writeFd, _, _)) + val fd = stackalloc[CInt](2) + if (unistd.pipe(fd) != 0) + throw new IOException(fromCString(strerror(errno))) + (fd(0), fd(1)) } - } + } { + case (readFd, writeFd) => + IO { + unistd.close(readFd) + unistd.close(writeFd) + () + } + } + .evalTap { + case (readFd, writeFd) => + IO { + if (fcntl(readFd, F_SETFL, O_NONBLOCK) != 0) + throw new IOException(fromCString(strerror(errno))) + if (fcntl(writeFd, F_SETFL, O_NONBLOCK) != 0) + throw new IOException(fromCString(strerror(errno))) + } + } + .flatMap { + case (readFd, writeFd) => + Resource.eval(IO.poller[FileDescriptorPoller].map(_.get)).flatMap { poller => + ( + poller.registerFileDescriptor(readFd, true, false), + poller.registerFileDescriptor(writeFd, false, true) + ).mapN(new Pipe(readFd, writeFd, _, _)) + } + } "FileDescriptorPoller" should { From 0124567afaacbc991e387144854d8b893772d6f7 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Tue, 20 Dec 2022 01:14:44 +0000 Subject: [PATCH 24/31] Add fcntl import --- .../src/test/scala/cats/effect/FileDescriptorPollerSpec.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/native/src/test/scala/cats/effect/FileDescriptorPollerSpec.scala b/tests/native/src/test/scala/cats/effect/FileDescriptorPollerSpec.scala index c87ff8880a..a998fb6115 100644 --- a/tests/native/src/test/scala/cats/effect/FileDescriptorPollerSpec.scala +++ b/tests/native/src/test/scala/cats/effect/FileDescriptorPollerSpec.scala @@ -22,6 +22,7 @@ import cats.syntax.all._ import scala.concurrent.duration._ import scala.scalanative.libc.errno._ import scala.scalanative.posix.errno._ +import scala.scalanative.posix.fcntl._ import scala.scalanative.posix.string._ import scala.scalanative.posix.unistd import scala.scalanative.unsafe._ From 72b05a78380339e34984769279e1bece3b8f500b Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Tue, 20 Dec 2022 01:25:01 +0000 Subject: [PATCH 25/31] Fix bugs in spec --- .../effect/FileDescriptorPollerSpec.scala | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/tests/native/src/test/scala/cats/effect/FileDescriptorPollerSpec.scala b/tests/native/src/test/scala/cats/effect/FileDescriptorPollerSpec.scala index a998fb6115..a321c3b5c7 100644 --- a/tests/native/src/test/scala/cats/effect/FileDescriptorPollerSpec.scala +++ b/tests/native/src/test/scala/cats/effect/FileDescriptorPollerSpec.scala @@ -46,7 +46,7 @@ class FileDescriptorPollerSpec extends BaseSpec { def write(buf: Array[Byte], offset: Int, length: Int): IO[Unit] = writeHandle .pollWriteRec(()) { _ => - IO(guard(unistd.write(readFd, buf.at(offset), length.toULong))) + IO(guard(unistd.write(writeFd, buf.at(offset), length.toULong))) } .void @@ -114,13 +114,17 @@ class FileDescriptorPollerSpec extends BaseSpec { "handle lots of simultaneous events" in real { mkPipe.replicateA(1000).use { pipes => CountDownLatch[IO](1000).flatMap { latch => - pipes.traverse_(pipe => pipe.read(new Array[Byte](1), 0, 1).background).surround { - IO { // trigger all the pipes at once - pipes.foreach { pipe => - unistd.write(pipe.writeFd, Array[Byte](42).at(0), 1.toULong) - } - }.background.surround(latch.await.as(true)) - } + pipes + .traverse_ { pipe => + (pipe.read(new Array[Byte](1), 0, 1) *> latch.release).background + } + .surround { + IO { // trigger all the pipes at once + pipes.foreach { pipe => + unistd.write(pipe.writeFd, Array[Byte](42).at(0), 1.toULong) + } + }.background.surround(latch.await.as(true)) + } } } } From d18fa76ddd8137ca4d54073d4e12c4c15ecdf43f Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Tue, 20 Dec 2022 04:03:55 +0000 Subject: [PATCH 26/31] Add some uncancelables --- .../cats/effect/unsafe/EpollSystem.scala | 96 ++++++++++--------- 1 file changed, 50 insertions(+), 46 deletions(-) diff --git a/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala index 314c51f492..d7243a441c 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala @@ -109,58 +109,62 @@ object EpollSystem extends PollingSystem { def pollReadRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B] = readSemaphore.permit.surround { - def go(a: A, before: Int): IO[B] = - f(a).flatMap { - case Left(a) => - IO(readReadyCounter).flatMap { after => - if (before != after) - // there was a read-ready notification since we started, try again immediately - go(a, after) - else - IO.async[Int] { cb => - IO { - readCallback = cb - // check again before we suspend - val now = readReadyCounter - if (now != before) { - cb(Right(now)) - readCallback = null - None - } else Some(IO(this.readCallback = null)) - } - }.flatMap(go(a, _)) - } - case Right(b) => IO.pure(b) - } + IO.uncancelable { poll => + def go(a: A, before: Int): IO[B] = + poll(f(a)).flatMap { + case Left(a) => + IO(readReadyCounter).flatMap { after => + if (before != after) + // there was a read-ready notification since we started, try again immediately + go(a, after) + else + poll(IO.async[Int] { cb => + IO { + readCallback = cb + // check again before we suspend + val now = readReadyCounter + if (now != before) { + cb(Right(now)) + readCallback = null + None + } else Some(IO(this.readCallback = null)) + } + }).flatMap(go(a, _)) + } + case Right(b) => IO.pure(b) + } + } IO(readReadyCounter).flatMap(go(a, _)) } def pollWriteRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B] = writeSemaphore.permit.surround { - def go(a: A, before: Int): IO[B] = - f(a).flatMap { - case Left(a) => - IO(writeReadyCounter).flatMap { after => - if (before != after) - // there was a write-ready notification since we started, try again immediately - go(a, after) - else - IO.async[Int] { cb => - IO { - writeCallback = cb - // check again before we suspend - val now = writeReadyCounter - if (now != before) { - cb(Right(now)) - writeCallback = null - None - } else Some(IO(this.writeCallback = null)) - } - }.flatMap(go(a, _)) - } - case Right(b) => IO.pure(b) - } + IO.uncancelable { poll => + def go(a: A, before: Int): IO[B] = + poll(f(a)).flatMap { + case Left(a) => + IO(writeReadyCounter).flatMap { after => + if (before != after) + // there was a write-ready notification since we started, try again immediately + go(a, after) + else + poll(IO.async[Int] { cb => + IO { + writeCallback = cb + // check again before we suspend + val now = writeReadyCounter + if (now != before) { + cb(Right(now)) + writeCallback = null + None + } else Some(IO(this.writeCallback = null)) + } + }).flatMap(go(a, _)) + } + case Right(b) => IO.pure(b) + } + } IO(writeReadyCounter).flatMap(go(a, _)) } From 4d3a916fc542a11566395a5ec4469b5d92f4c4ad Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Tue, 20 Dec 2022 04:50:16 +0000 Subject: [PATCH 27/31] Revert "Add some uncancelables" This reverts commit d18fa76ddd8137ca4d54073d4e12c4c15ecdf43f. --- .../cats/effect/unsafe/EpollSystem.scala | 96 +++++++++---------- 1 file changed, 46 insertions(+), 50 deletions(-) diff --git a/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala index d7243a441c..314c51f492 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala @@ -109,62 +109,58 @@ object EpollSystem extends PollingSystem { def pollReadRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B] = readSemaphore.permit.surround { - IO.uncancelable { poll => - def go(a: A, before: Int): IO[B] = - poll(f(a)).flatMap { - case Left(a) => - IO(readReadyCounter).flatMap { after => - if (before != after) - // there was a read-ready notification since we started, try again immediately - go(a, after) - else - poll(IO.async[Int] { cb => - IO { - readCallback = cb - // check again before we suspend - val now = readReadyCounter - if (now != before) { - cb(Right(now)) - readCallback = null - None - } else Some(IO(this.readCallback = null)) - } - }).flatMap(go(a, _)) - } - case Right(b) => IO.pure(b) - } - } + def go(a: A, before: Int): IO[B] = + f(a).flatMap { + case Left(a) => + IO(readReadyCounter).flatMap { after => + if (before != after) + // there was a read-ready notification since we started, try again immediately + go(a, after) + else + IO.async[Int] { cb => + IO { + readCallback = cb + // check again before we suspend + val now = readReadyCounter + if (now != before) { + cb(Right(now)) + readCallback = null + None + } else Some(IO(this.readCallback = null)) + } + }.flatMap(go(a, _)) + } + case Right(b) => IO.pure(b) + } IO(readReadyCounter).flatMap(go(a, _)) } def pollWriteRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B] = writeSemaphore.permit.surround { - IO.uncancelable { poll => - def go(a: A, before: Int): IO[B] = - poll(f(a)).flatMap { - case Left(a) => - IO(writeReadyCounter).flatMap { after => - if (before != after) - // there was a write-ready notification since we started, try again immediately - go(a, after) - else - poll(IO.async[Int] { cb => - IO { - writeCallback = cb - // check again before we suspend - val now = writeReadyCounter - if (now != before) { - cb(Right(now)) - writeCallback = null - None - } else Some(IO(this.writeCallback = null)) - } - }).flatMap(go(a, _)) - } - case Right(b) => IO.pure(b) - } - } + def go(a: A, before: Int): IO[B] = + f(a).flatMap { + case Left(a) => + IO(writeReadyCounter).flatMap { after => + if (before != after) + // there was a write-ready notification since we started, try again immediately + go(a, after) + else + IO.async[Int] { cb => + IO { + writeCallback = cb + // check again before we suspend + val now = writeReadyCounter + if (now != before) { + cb(Right(now)) + writeCallback = null + None + } else Some(IO(this.writeCallback = null)) + } + }.flatMap(go(a, _)) + } + case Right(b) => IO.pure(b) + } IO(writeReadyCounter).flatMap(go(a, _)) } From 9ba870f3059de3b83696cb8e9f0b60ac715fc44f Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Tue, 20 Dec 2022 07:10:25 +0000 Subject: [PATCH 28/31] Rework `KqueueSystem` --- .../cats/effect/unsafe/KqueueSystem.scala | 299 ++++++++++-------- 1 file changed, 162 insertions(+), 137 deletions(-) diff --git a/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala index 47ddb9019d..df5e82accc 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala @@ -17,11 +17,13 @@ package cats.effect package unsafe +import cats.effect.std.Semaphore +import cats.syntax.all._ + import org.typelevel.scalaccompat.annotation._ import scala.annotation.tailrec import scala.concurrent.ExecutionContext -import scala.collection.mutable.LongMap import scala.scalanative.libc.errno._ import scala.scalanative.posix.string._ import scala.scalanative.posix.time._ @@ -29,10 +31,9 @@ import scala.scalanative.posix.timeOps._ import scala.scalanative.posix.unistd import scala.scalanative.unsafe._ import scala.scalanative.unsigned._ -import scala.util.control.NonFatal import java.io.IOException -import java.util.ArrayDeque +import java.util.HashMap object KqueueSystem extends PollingSystem { @@ -41,7 +42,8 @@ object KqueueSystem extends PollingSystem { private final val MaxEvents = 64 - def makePoller(ec: ExecutionContext, data: () => PollData): Poller = new Poller + def makePoller(ec: ExecutionContext, data: () => PollData): Poller = + new Poller(ec, data) def makePollData(): PollData = { val fd = kqueue() @@ -53,153 +55,175 @@ object KqueueSystem extends PollingSystem { def closePollData(data: PollData): Unit = data.close() def poll(data: PollData, nanos: Long, reportFailure: Throwable => Unit): Boolean = - data.poll(nanos, reportFailure) + data.poll(nanos) + + final class Poller private[KqueueSystem] ( + ec: ExecutionContext, + data: () => PollData + ) extends FileDescriptorPoller { + def registerFileDescriptor( + fd: Int, + reads: Boolean, + writes: Boolean + ): Resource[IO, FileDescriptorPollHandle] = + Resource.eval { + (Semaphore[IO](1), Semaphore[IO](1)).mapN { + new PollHandle(ec, data, fd, _, _) + } + } + } - final class Poller private[KqueueSystem] () + private final class PollHandle( + ec: ExecutionContext, + data: () => PollData, + fd: Int, + readSemaphore: Semaphore[IO], + writeSemaphore: Semaphore[IO] + ) extends FileDescriptorPollHandle { + + private[this] val readEvent = KEvent(fd.toLong, EVFILT_READ) + private[this] val writeEvent = KEvent(fd.toLong, EVFILT_WRITE) + + def pollReadRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B] = + readSemaphore.permit.surround { + a.tailRecM { a => + f(a).flatTap { r => + if (r.isRight) + IO.unit + else + IO.async[Unit] { cb => + IO { + val kqueue = data() + kqueue.evSet(readEvent, EV_ADD.toUShort, cb) + Some(IO(kqueue.removeCallback(readEvent))) + } + }.evalOn(ec) + } + } + } + + def pollWriteRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B] = + writeSemaphore.permit.surround { + a.tailRecM { a => + f(a).flatTap { r => + if (r.isRight) + IO.unit + else + IO.async[Unit] { cb => + IO { + val kqueue = data() + kqueue.evSet(writeEvent, EV_ADD.toUShort, cb) + Some(IO(kqueue.removeCallback(writeEvent))) + } + }.evalOn(ec) + } + } + } - final class PollData private[KqueueSystem] (kqfd: Int) { + } - private[this] val changes: ArrayDeque[EvAdd] = new ArrayDeque - // private[this] val callbacks: LongMap[FileDescriptorPoller.Callback] = new LongMap + private final case class KEvent(ident: Long, filter: Short) - private[KqueueSystem] def close(): Unit = - if (unistd.close(kqfd) != 0) - throw new IOException(fromCString(strerror(errno))) - - private[KqueueSystem] def poll(timeout: Long, reportFailure: Throwable => Unit): Boolean = { - // val noCallbacks = callbacks.isEmpty - - // // pre-process the changes to filter canceled ones - // val changelist = stackalloc[kevent64_s](changes.size().toLong) - // var change = changelist - // var changeCount = 0 - // while (!changes.isEmpty()) { - // val evAdd = changes.poll() - // if (!evAdd.canceled) { - // change.ident = evAdd.fd.toULong - // change.filter = evAdd.filter - // change.flags = (EV_ADD | EV_CLEAR).toUShort - // change.udata = FileDescriptorPoller.Callback.toPtr(evAdd.cb) - // change += 1 - // changeCount += 1 - // } - // } - - // if (timeout <= 0 && noCallbacks && changeCount == 0) - // false // nothing to do here - // else { - - // val eventlist = stackalloc[kevent64_s](MaxEvents.toLong) - - // @tailrec - // def processEvents(timeout: Ptr[timespec], changeCount: Int, flags: Int): Unit = { - - // val triggeredEvents = - // kevent64( - // kqfd, - // changelist, - // changeCount, - // eventlist, - // MaxEvents, - // flags.toUInt, - // timeout - // ) - - // if (triggeredEvents >= 0) { - // var i = 0 - // var event = eventlist - // while (i < triggeredEvents) { - // if ((event.flags.toLong & EV_ERROR) != 0) { - - // // TODO it would be interesting to propagate this failure via the callback - // reportFailure(new IOException(fromCString(strerror(event.data.toInt)))) - - // } else if (callbacks.contains(event.ident.toLong)) { - // val filter = event.filter - // val cb = FileDescriptorPoller.Callback.fromPtr(event.udata) - - // try { - // cb.notifyFileDescriptorEvents(filter == EVFILT_READ, filter == EVFILT_WRITE) - // } catch { - // case NonFatal(ex) => - // reportFailure(ex) - // } - // } - - // i += 1 - // event += 1 - // } - // } else { - // throw new IOException(fromCString(strerror(errno))) - // } - - // if (triggeredEvents >= MaxEvents) - // processEvents(null, 0, KEVENT_FLAG_NONE) // drain the ready list - // else - // () - // } - - // val timeoutSpec = - // if (timeout <= 0) null - // else { - // val ts = stackalloc[timespec]() - // ts.tv_sec = timeout / 1000000000 - // ts.tv_nsec = timeout % 1000000000 - // ts - // } - - // val flags = if (timeout == 0) KEVENT_FLAG_IMMEDIATE else KEVENT_FLAG_NONE - - // processEvents(timeoutSpec, changeCount, flags) - - // !changes.isEmpty() || callbacks.nonEmpty - // } - ??? - } + final class PollData private[KqueueSystem] (kqfd: Int) { - // def registerFileDescriptor(fd: Int, reads: Boolean, writes: Boolean)( - // cb: FileDescriptorPoller.Callback): Runnable = { + private[this] val changelistArray = new Array[Byte](sizeof[kevent64_s].toInt * MaxEvents) + private[this] val changelist = changelistArray.at(0).asInstanceOf[Ptr[kevent64_s]] + private[this] var changeCount = 0 - // val readEvent = - // if (reads) - // new EvAdd(fd, EVFILT_READ, cb) - // else null + private[this] val callbacks = new HashMap[KEvent, Either[Throwable, Unit] => Unit]() - // val writeEvent = - // if (writes) - // new EvAdd(fd, EVFILT_WRITE, cb) - // else null + private[KqueueSystem] def evSet( + event: KEvent, + flags: CUnsignedShort, + cb: Either[Throwable, Unit] => Unit + ): Unit = { + val change = changelist + changeCount.toLong - // if (readEvent != null) - // changes.add(readEvent) - // if (writeEvent != null) - // changes.add(writeEvent) + change.ident = event.ident.toULong + change.filter = event.filter + change.flags = (flags.toInt | EV_ONESHOT).toUShort - // callbacks(fd.toLong) = cb + callbacks.put(event, cb) - // () => { - // // we do not need to explicitly unregister the fd with the kqueue, - // // b/c it will be unregistered automatically when the fd is closed + changeCount += 1 + } - // // release the callback, so it can be GCed - // callbacks.remove(fd.toLong) + private[KqueueSystem] def removeCallback(event: KEvent): Unit = { + callbacks.remove(event) + () + } - // // cancel the events, such that if they are currently pending in the - // // changes queue awaiting registration, they will not be registered - // if (readEvent != null) readEvent.cancel() - // if (writeEvent != null) writeEvent.cancel() - // } - // } + private[KqueueSystem] def close(): Unit = + if (unistd.close(kqfd) != 0) + throw new IOException(fromCString(strerror(errno))) - } + private[KqueueSystem] def poll(timeout: Long): Boolean = { + val noCallbacks = callbacks.isEmpty + + if (timeout <= 0 && noCallbacks && changeCount == 0) + false // nothing to do here + else { + + val eventlist = stackalloc[kevent64_s](MaxEvents.toLong) + + @tailrec + def processEvents(timeout: Ptr[timespec], changeCount: Int, flags: Int): Unit = { + + val triggeredEvents = + kevent64( + kqfd, + changelist, + changeCount, + eventlist, + MaxEvents, + flags.toUInt, + timeout + ) + + if (triggeredEvents >= 0) { + var i = 0 + var event = eventlist + while (i < triggeredEvents) { + val cb = callbacks.remove(KEvent(event.ident.toLong, event.filter)) + + if (cb ne null) + cb( + if ((event.flags.toLong & EV_ERROR) != 0) + Left(new IOException(fromCString(strerror(event.data.toInt)))) + else Either.unit + ) + + i += 1 + event += 1 + } + } else { + throw new IOException(fromCString(strerror(errno))) + } + + if (triggeredEvents >= MaxEvents) + processEvents(null, 0, KEVENT_FLAG_NONE) // drain the ready list + else + () + } + + val timeoutSpec = + if (timeout <= 0) null + else { + val ts = stackalloc[timespec]() + ts.tv_sec = timeout / 1000000000 + ts.tv_nsec = timeout % 1000000000 + ts + } + + val flags = if (timeout == 0) KEVENT_FLAG_IMMEDIATE else KEVENT_FLAG_NONE + + processEvents(timeoutSpec, changeCount, flags) + changeCount = 0 + + !callbacks.isEmpty() + } + } - private final class EvAdd( - val fd: Int, - val filter: Short, - val cb: Any - ) { - var canceled = false - def cancel() = canceled = true } @nowarn212 @@ -215,6 +239,7 @@ object KqueueSystem extends PollingSystem { final val EV_ADD = 0x0001 final val EV_DELETE = 0x0002 + final val EV_ONESHOT = 0x0010 final val EV_CLEAR = 0x0020 final val EV_ERROR = 0x4000 From 96738832dcfbbbc8ae1d1c707fce1b28b794e389 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Tue, 20 Dec 2022 18:37:35 +0000 Subject: [PATCH 29/31] Post-refactor typos --- .../src/main/scala/cats/effect/unsafe/PollingSystem.scala | 2 +- .../src/main/scala/cats/effect/unsafe/SleepSystem.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/native/src/main/scala/cats/effect/unsafe/PollingSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/PollingSystem.scala index 594d5657e6..a4f0a88248 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/PollingSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/PollingSystem.scala @@ -35,7 +35,7 @@ abstract class PollingSystem { def makePollData(): PollData - def closePollData(poller: PollData): Unit + def closePollData(data: PollData): Unit /** * @param nanos diff --git a/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala index 47f8c0418c..ce4b85cc4b 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/SleepSystem.scala @@ -28,9 +28,9 @@ object SleepSystem extends PollingSystem { def makePollData(): PollData = new PollData - def closePollData(poller: PollData): Unit = () + def closePollData(data: PollData): Unit = () - def poll(poller: PollData, nanos: Long, reportFailure: Throwable => Unit): Boolean = { + def poll(data: PollData, nanos: Long, reportFailure: Throwable => Unit): Boolean = { if (nanos > 0) Thread.sleep(nanos / 1000000, (nanos % 1000000).toInt) false From 43b0b0adf9e7c19be490643d66be5d575af4ebe5 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sat, 24 Dec 2022 04:08:32 +0000 Subject: [PATCH 30/31] Scope `.evalOn` even more tightly --- .../src/main/scala/cats/effect/unsafe/KqueueSystem.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala index df5e82accc..d4ec798b79 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/KqueueSystem.scala @@ -96,8 +96,8 @@ object KqueueSystem extends PollingSystem { val kqueue = data() kqueue.evSet(readEvent, EV_ADD.toUShort, cb) Some(IO(kqueue.removeCallback(readEvent))) - } - }.evalOn(ec) + }.evalOn(ec) + } } } } @@ -114,8 +114,8 @@ object KqueueSystem extends PollingSystem { val kqueue = data() kqueue.evSet(writeEvent, EV_ADD.toUShort, cb) Some(IO(kqueue.removeCallback(writeEvent))) - } - }.evalOn(ec) + }.evalOn(ec) + } } } } From e5dd04f5a0a1528a2a9aa7f2e122703a88627833 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sat, 24 Dec 2022 04:37:55 +0000 Subject: [PATCH 31/31] Use `asyncCheckAttempt` --- .../scala/cats/effect/unsafe/EpollSystem.scala | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala index 314c51f492..bb8666391f 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala @@ -117,16 +117,15 @@ object EpollSystem extends PollingSystem { // there was a read-ready notification since we started, try again immediately go(a, after) else - IO.async[Int] { cb => + IO.asyncCheckAttempt[Int] { cb => IO { readCallback = cb // check again before we suspend val now = readReadyCounter if (now != before) { - cb(Right(now)) readCallback = null - None - } else Some(IO(this.readCallback = null)) + Right(now) + } else Left(Some(IO(this.readCallback = null))) } }.flatMap(go(a, _)) } @@ -146,16 +145,15 @@ object EpollSystem extends PollingSystem { // there was a write-ready notification since we started, try again immediately go(a, after) else - IO.async[Int] { cb => + IO.asyncCheckAttempt[Int] { cb => IO { writeCallback = cb // check again before we suspend val now = writeReadyCounter if (now != before) { - cb(Right(now)) writeCallback = null - None - } else Some(IO(this.writeCallback = null)) + Right(now) + } else Left(Some(IO(this.writeCallback = null))) } }.flatMap(go(a, _)) }