From 09524d78bcf38dccd16307108f048ca1c32d6d1a Mon Sep 17 00:00:00 2001 From: Lorenzo Gabriele Date: Sat, 11 Apr 2026 21:12:08 +0200 Subject: [PATCH 1/4] Support fs2.io.readOutputStream on Scala Native --- .../fs2/io/internal/PipedStreamBuffer.scala | 0 .../scala/fs2/io/internal/Synchronizer.scala | 0 .../src/main/scala/fs2/io/iojvmnative.scala | 56 +++++++++++++++++- io/jvm/src/main/scala/fs2/io/ioplatform.scala | 57 +------------------ 4 files changed, 57 insertions(+), 56 deletions(-) rename io/{jvm => jvm-native}/src/main/scala/fs2/io/internal/PipedStreamBuffer.scala (100%) rename io/{jvm => jvm-native}/src/main/scala/fs2/io/internal/Synchronizer.scala (100%) diff --git a/io/jvm/src/main/scala/fs2/io/internal/PipedStreamBuffer.scala b/io/jvm-native/src/main/scala/fs2/io/internal/PipedStreamBuffer.scala similarity index 100% rename from io/jvm/src/main/scala/fs2/io/internal/PipedStreamBuffer.scala rename to io/jvm-native/src/main/scala/fs2/io/internal/PipedStreamBuffer.scala diff --git a/io/jvm/src/main/scala/fs2/io/internal/Synchronizer.scala b/io/jvm-native/src/main/scala/fs2/io/internal/Synchronizer.scala similarity index 100% rename from io/jvm/src/main/scala/fs2/io/internal/Synchronizer.scala rename to io/jvm-native/src/main/scala/fs2/io/internal/Synchronizer.scala diff --git a/io/jvm-native/src/main/scala/fs2/io/iojvmnative.scala b/io/jvm-native/src/main/scala/fs2/io/iojvmnative.scala index f6ac066201..e10f4593e9 100644 --- a/io/jvm-native/src/main/scala/fs2/io/iojvmnative.scala +++ b/io/jvm-native/src/main/scala/fs2/io/iojvmnative.scala @@ -22,9 +22,12 @@ package fs2 package io -import cats.effect.kernel.Sync +import cats.effect.kernel.{Async, Deferred, Outcome, Resource, Sync} +import cats.effect.kernel.implicits._ import cats.syntax.all._ +import fs2.io.internal.PipedStreamBuffer +import java.io.{InputStream, OutputStream} import scala.reflect.ClassTag private[fs2] trait iojvmnative { @@ -56,4 +59,55 @@ private[fs2] trait iojvmnative { case None => Stream.raiseError(new IOException(s"Resource $name not found")) } + /** Take a function that emits to a `java.io.OutputStream` effectfully, + * and return a stream which, when run, will perform that function and emit + * the bytes recorded in the OutputStream as an fs2.Stream + * + * The stream produced by this will terminate if: + * - `f` returns + * - `f` calls `OutputStream#close` + * + * If none of those happens, the stream will run forever. + */ + def readOutputStream[F[_]: Async]( + chunkSize: Int + )( + f: OutputStream => F[Unit] + ): Stream[F, Byte] = { + val mkOutput: Resource[F, (OutputStream, InputStream)] = + Resource.make(Sync[F].delay { + val buf = new PipedStreamBuffer(chunkSize) + (buf.outputStream, buf.inputStream) + })(ois => + Sync[F].blocking { + // Piped(I/O)Stream implementations cant't throw on close, no need to nest the handling here. + ois._2.close() + ois._1.close() + } + ) + + Stream.resource(mkOutput).flatMap { case (os, is) => + Stream.eval(Deferred[F, Option[Throwable]]).flatMap { err => + // We need to close the output stream regardless of how `f` finishes + // to ensure an outstanding blocking read on the input stream completes. + // In such a case, there's a race between completion of the read + // stream and finalization of the write stream, so we capture the error + // that occurs when writing and rethrow it. + val write = f(os).guaranteeCase((outcome: Outcome[F, Throwable, Unit]) => + Sync[F].blocking(os.close()) *> err + .complete(outcome match { + case Outcome.Errored(t) => Some(t) + case _ => None + }) + .void + ) + val read = readInputStream(is.pure[F], chunkSize, closeAfterUse = false) + read.concurrently(Stream.eval(write)) ++ Stream.eval(err.get).flatMap { + case None => Stream.empty + case Some(t) => Stream.raiseError[F](t) + } + } + } + } + } diff --git a/io/jvm/src/main/scala/fs2/io/ioplatform.scala b/io/jvm/src/main/scala/fs2/io/ioplatform.scala index bcfcb8c961..82d75e45eb 100644 --- a/io/jvm/src/main/scala/fs2/io/ioplatform.scala +++ b/io/jvm/src/main/scala/fs2/io/ioplatform.scala @@ -23,14 +23,12 @@ package fs2 package io import cats.Show -import cats.effect.kernel.{Async, Outcome, Resource, Sync} +import cats.effect.kernel.{Async, Resource, Sync} import cats.effect.kernel.implicits._ -import cats.effect.kernel.Deferred import cats.syntax.all._ import fs2.internal.ThreadFactories -import fs2.io.internal.PipedStreamBuffer -import java.io.{InputStream, OutputStream} +import java.io.InputStream import java.nio.charset.Charset import java.nio.charset.StandardCharsets import java.util.concurrent.Executors @@ -116,57 +114,6 @@ private[fs2] trait ioplatform extends iojvmnative { ): Resource[F, InputStream] = JavaInputOutputStream.toInputStream(source) - /** Take a function that emits to a `java.io.OutputStream` effectfully, - * and return a stream which, when run, will perform that function and emit - * the bytes recorded in the OutputStream as an fs2.Stream - * - * The stream produced by this will terminate if: - * - `f` returns - * - `f` calls `OutputStream#close` - * - * If none of those happens, the stream will run forever. - */ - def readOutputStream[F[_]: Async]( - chunkSize: Int - )( - f: OutputStream => F[Unit] - ): Stream[F, Byte] = { - val mkOutput: Resource[F, (OutputStream, InputStream)] = - Resource.make(Sync[F].delay { - val buf = new PipedStreamBuffer(chunkSize) - (buf.outputStream, buf.inputStream) - })(ois => - Sync[F].blocking { - // Piped(I/O)Stream implementations cant't throw on close, no need to nest the handling here. - ois._2.close() - ois._1.close() - } - ) - - Stream.resource(mkOutput).flatMap { case (os, is) => - Stream.eval(Deferred[F, Option[Throwable]]).flatMap { err => - // We need to close the output stream regardless of how `f` finishes - // to ensure an outstanding blocking read on the input stream completes. - // In such a case, there's a race between completion of the read - // stream and finalization of the write stream, so we capture the error - // that occurs when writing and rethrow it. - val write = f(os).guaranteeCase((outcome: Outcome[F, Throwable, Unit]) => - Sync[F].blocking(os.close()) *> err - .complete(outcome match { - case Outcome.Errored(t) => Some(t) - case _ => None - }) - .void - ) - val read = readInputStream(is.pure[F], chunkSize, closeAfterUse = false) - read.concurrently(Stream.eval(write)) ++ Stream.eval(err.get).flatMap { - case None => Stream.empty - case Some(t) => Stream.raiseError[F](t) - } - } - } - } - // Using null instead of Option because null check is faster private lazy val vtExecutor: ExecutionContext = { val javaVersion: Int = From f46cd495440a5a3e29ebbe47d91a9e97fc10eca1 Mon Sep 17 00:00:00 2001 From: Lorenzo Gabriele Date: Sun, 12 Apr 2026 09:35:04 +0200 Subject: [PATCH 2/4] Support fs2.io.toInputStream[Resource] on Scala Native and run tests --- build.sbt | 3 ++- .../scala/fs2/io/JavaInputOutputStream.scala | 0 .../src/main/scala/fs2/io/iojvmnative.scala | 22 ++++++++++++++++ .../test/scala/fs2/io/IoPlatformSuite.scala | 3 ++- io/jvm/src/main/scala/fs2/io/ioplatform.scala | 25 +------------------ 5 files changed, 27 insertions(+), 26 deletions(-) rename io/{jvm => jvm-native}/src/main/scala/fs2/io/JavaInputOutputStream.scala (100%) rename io/{jvm => jvm-native}/src/test/scala/fs2/io/IoPlatformSuite.scala (98%) diff --git a/build.sbt b/build.sbt index aec1ff0c38..0e230eb195 100644 --- a/build.sbt +++ b/build.sbt @@ -378,7 +378,8 @@ lazy val root = tlCrossRootProject lazy val commonNativeSettings = Seq[Setting[?]]( tlVersionIntroduced := List("2.12", "2.13", "3").map(_ -> "3.13.0").toMap, - Test / nativeBrewFormulas += "openssl" + Test / nativeBrewFormulas += "openssl", + Test / nativeConfig ~= { _.withEmbedResources(true) } ) lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform) diff --git a/io/jvm/src/main/scala/fs2/io/JavaInputOutputStream.scala b/io/jvm-native/src/main/scala/fs2/io/JavaInputOutputStream.scala similarity index 100% rename from io/jvm/src/main/scala/fs2/io/JavaInputOutputStream.scala rename to io/jvm-native/src/main/scala/fs2/io/JavaInputOutputStream.scala diff --git a/io/jvm-native/src/main/scala/fs2/io/iojvmnative.scala b/io/jvm-native/src/main/scala/fs2/io/iojvmnative.scala index e10f4593e9..d5b22a8ab8 100644 --- a/io/jvm-native/src/main/scala/fs2/io/iojvmnative.scala +++ b/io/jvm-native/src/main/scala/fs2/io/iojvmnative.scala @@ -110,4 +110,26 @@ private[fs2] trait iojvmnative { } } + /** Pipe that converts a stream of bytes to a stream that will emit a single `java.io.InputStream`, + * that is closed whenever the resulting stream terminates. + * + * If the `close` of resulting input stream is invoked manually, then this will await until the + * original stream completely terminates. + * + * Because all `InputStream` methods block (including `close`), the resulting `InputStream` + * should be consumed on a different thread pool than the one that is backing the effect. + * + * Note that the implementation is not thread safe -- only one thread is allowed at any time + * to operate on the resulting `java.io.InputStream`. + */ + def toInputStream[F[_]: Async]: Pipe[F, Byte, InputStream] = + source => Stream.resource(toInputStreamResource(source)) + + /** Like [[toInputStream]] but returns a `Resource` rather than a single element stream. + */ + def toInputStreamResource[F[_]: Async]( + source: Stream[F, Byte] + ): Resource[F, InputStream] = + JavaInputOutputStream.toInputStream(source) + } diff --git a/io/jvm/src/test/scala/fs2/io/IoPlatformSuite.scala b/io/jvm-native/src/test/scala/fs2/io/IoPlatformSuite.scala similarity index 98% rename from io/jvm/src/test/scala/fs2/io/IoPlatformSuite.scala rename to io/jvm-native/src/test/scala/fs2/io/IoPlatformSuite.scala index 983832e95d..2469e013f3 100644 --- a/io/jvm/src/test/scala/fs2/io/IoPlatformSuite.scala +++ b/io/jvm-native/src/test/scala/fs2/io/IoPlatformSuite.scala @@ -275,7 +275,8 @@ class IoPlatformSuite extends Fs2Suite { bar.assertEquals("bar") } test("classloader") { - val size = readClassLoaderResource[IO]("fs2/io/foo", 8192).as(1L).compile.foldMonoid + val resourcePath = if (isNative) "/fs2/io/foo" else "fs2/io/foo" + val size = readClassLoaderResource[IO](resourcePath, 8192).as(1L).compile.foldMonoid size.assertEquals(3L) } } diff --git a/io/jvm/src/main/scala/fs2/io/ioplatform.scala b/io/jvm/src/main/scala/fs2/io/ioplatform.scala index 82d75e45eb..35c4774264 100644 --- a/io/jvm/src/main/scala/fs2/io/ioplatform.scala +++ b/io/jvm/src/main/scala/fs2/io/ioplatform.scala @@ -23,12 +23,11 @@ package fs2 package io import cats.Show -import cats.effect.kernel.{Async, Resource, Sync} +import cats.effect.kernel.{Async, Sync} import cats.effect.kernel.implicits._ import cats.syntax.all._ import fs2.internal.ThreadFactories -import java.io.InputStream import java.nio.charset.Charset import java.nio.charset.StandardCharsets import java.util.concurrent.Executors @@ -92,28 +91,6 @@ private[fs2] trait ioplatform extends iojvmnative { ): Pipe[F, O, Nothing] = _.map(_.show).through(text.encode(charset)).through(stdout) - /** Pipe that converts a stream of bytes to a stream that will emit a single `java.io.InputStream`, - * that is closed whenever the resulting stream terminates. - * - * If the `close` of resulting input stream is invoked manually, then this will await until the - * original stream completely terminates. - * - * Because all `InputStream` methods block (including `close`), the resulting `InputStream` - * should be consumed on a different thread pool than the one that is backing the effect. - * - * Note that the implementation is not thread safe -- only one thread is allowed at any time - * to operate on the resulting `java.io.InputStream`. - */ - def toInputStream[F[_]: Async]: Pipe[F, Byte, InputStream] = - source => Stream.resource(toInputStreamResource(source)) - - /** Like [[toInputStream]] but returns a `Resource` rather than a single element stream. - */ - def toInputStreamResource[F[_]: Async]( - source: Stream[F, Byte] - ): Resource[F, InputStream] = - JavaInputOutputStream.toInputStream(source) - // Using null instead of Option because null check is faster private lazy val vtExecutor: ExecutionContext = { val javaVersion: Int = From 3ec3a250ec825db8c7879b065ad4c562f059c726 Mon Sep 17 00:00:00 2001 From: Lorenzo Gabriele Date: Sun, 12 Apr 2026 10:52:09 +0200 Subject: [PATCH 3/4] Add workaround for nativeLink flakiness --- build.sbt | 3 +++ 1 file changed, 3 insertions(+) diff --git a/build.sbt b/build.sbt index 0e230eb195..282d769b16 100644 --- a/build.sbt +++ b/build.sbt @@ -2,6 +2,9 @@ import com.typesafe.tools.mima.core._ Global / onChangedBuildSource := ReloadOnSourceChanges +// Workaround for https://github.com/scala-native/scala-native/issues/2024 +Global / concurrentRestrictions += Tags.limit(NativeTags.Link, 1) + ThisBuild / tlBaseVersion := "3.13" ThisBuild / organization := "co.fs2" From 1184bfaf2e5b97d3e2d68f59ece6a8f07ff2c761 Mon Sep 17 00:00:00 2001 From: Lorenzo Gabriele Date: Fri, 17 Apr 2026 18:31:42 +0200 Subject: [PATCH 4/4] Increase `IoPlatformSuite` timeout to 5 minutes --- io/jvm-native/src/test/scala/fs2/io/IoPlatformSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/jvm-native/src/test/scala/fs2/io/IoPlatformSuite.scala b/io/jvm-native/src/test/scala/fs2/io/IoPlatformSuite.scala index 2469e013f3..8213451624 100644 --- a/io/jvm-native/src/test/scala/fs2/io/IoPlatformSuite.scala +++ b/io/jvm-native/src/test/scala/fs2/io/IoPlatformSuite.scala @@ -38,7 +38,7 @@ import java.util.concurrent.Executors class IoPlatformSuite extends Fs2Suite { // This suite runs for a long time, this avoids timeouts in CI. - override def munitIOTimeout: Duration = 2.minutes + override def munitIOTimeout: Duration = 5.minutes group("readInputStream") { test("reuses internal buffer on smaller chunks") {