From b61912a16d8321c544d1df555760f8fd228e6f84 Mon Sep 17 00:00:00 2001 From: rintcius Date: Mon, 24 Nov 2025 12:18:35 +0100 Subject: [PATCH] Keep exception in case of error ... also rename because we can't assume these are always auth errors --- core/src/main/scala/fs2/io/ssh/Client.scala | 92 +++---- .../test/scala/fs2/io/ssh/ClientSpec.scala | 236 +++++++++--------- 2 files changed, 171 insertions(+), 157 deletions(-) diff --git a/core/src/main/scala/fs2/io/ssh/Client.scala b/core/src/main/scala/fs2/io/ssh/Client.scala index 6fc86d7..e4cafb9 100644 --- a/core/src/main/scala/fs2/io/ssh/Client.scala +++ b/core/src/main/scala/fs2/io/ssh/Client.scala @@ -41,57 +41,54 @@ final class Client[F[_]] private (client: SshClient)(implicit F: Async[F]) { import Client.Error import MinaFuture._ - @SuppressWarnings( - Array( - "org.wartremover.warts.DefaultArguments")) - def exec( - cc: ConnectionConfig, - command: String, - chunkSize: Int = 4096)( - implicit FR: Raise[F, Error]) - : Resource[F, Process[F]] = { + @SuppressWarnings(Array("org.wartremover.warts.DefaultArguments")) + def exec(cc: ConnectionConfig, command: String, chunkSize: Int = 4096)( + implicit FR: Raise[F, Error] + ): Resource[F, Process[F]] = { for { session <- this.session(cc) - channel <- Resource.make( - for { - channel <- F.delay(session.createExecChannel(command)) - _ <- F.delay(channel.setStreaming(StreamingChannel.Streaming.Async)) - opened <- fromFuture(F.delay(channel.open())) - // TODO handle failure opening - } yield channel)( - channel => fromFuture(F.delay(channel.close(false))).void) + channel <- Resource.make(for { + channel <- F.delay(session.createExecChannel(command)) + _ <- F.delay(channel.setStreaming(StreamingChannel.Streaming.Async)) + opened <- fromFuture(F.delay(channel.open())) + // TODO handle failure opening + } yield channel)(channel => + fromFuture(F.delay(channel.close(false))).void + ) } yield new Process[F](channel, chunkSize) } def portForward( - cc: ConnectionConfig, - local: InetSocketAddress, - remote: InetSocketAddress)( - implicit FR: Raise[F, Error]) - : Resource[F, Unit] = { - this.session(cc).flatMap { clientSession => - val l = SshdSocketAddress.toSshdSocketAddress(local) - val r = SshdSocketAddress.toSshdSocketAddress(remote) - Resource.make(F.delay(clientSession.startLocalPortForwarding(l, r)))(address => F.delay(clientSession.stopLocalPortForwarding(address))) - }.void + cc: ConnectionConfig, + local: InetSocketAddress, + remote: InetSocketAddress + )(implicit FR: Raise[F, Error]): Resource[F, Unit] = { + this + .session(cc) + .flatMap { clientSession => + val l = SshdSocketAddress.toSshdSocketAddress(local) + val r = SshdSocketAddress.toSshdSocketAddress(remote) + Resource.make(F.delay(clientSession.startLocalPortForwarding(l, r)))( + address => F.delay(clientSession.stopLocalPortForwarding(address)) + ) + } + .void } @SuppressWarnings( - Array( - "org.wartremover.warts.Null", - "org.wartremover.warts.ToString")) + Array("org.wartremover.warts.Null", "org.wartremover.warts.ToString") + ) def session( - cc: ConnectionConfig)( - implicit FR: Raise[F, Error]) - : Resource[F, ClientSession] = { + cc: ConnectionConfig + )(implicit FR: Raise[F, Error]): Resource[F, ClientSession] = { for { session <- Resource.make( - fromFuture(F.delay(client.connect(cc.username, cc.host))))( - session => fromFuture(F.delay(session.close(false))).void) + fromFuture(F.delay(client.connect(cc.username, cc.host))) + )(session => fromFuture(F.delay(session.close(false))).void) _ <- cc.auth match { case Auth.Password(text) => @@ -115,7 +112,9 @@ final class Client[F[_]] private (client: SshClient)(implicit F: Async[F]) { } case None => - F.delay(provider.setPasswordFinder(FilePasswordProvider.EMPTY)) + F.delay( + provider.setPasswordFinder(FilePasswordProvider.EMPTY) + ) } pairs <- F.blocking(provider.loadKeys(session)) @@ -142,8 +141,8 @@ final class Client[F[_]] private (client: SshClient)(implicit F: Async[F]) { _ <- Resource eval { success match { - case Left(_: SshException) => - FR.raise(Error.Authentication) + case Left(ex: SshException) => + FR.raise(Error.SshErr(ex)) case Left(e) => F.raiseError(e) @@ -168,7 +167,11 @@ object Client { // send periodic SSH_MSG_IGNOREs to prevent the connection from dying // without activity - client.setSessionHeartbeat(HeartbeatType.IGNORE, TimeUnit.SECONDS, HeartbeatInterval) + client.setSessionHeartbeat( + HeartbeatType.IGNORE, + TimeUnit.SECONDS, + HeartbeatInterval + ) client.start() client @@ -178,15 +181,14 @@ object Client { } // convenience function that really should live elsewhere - def resolve[F[_]: Async]( - hostname: String, - port: Int) - : F[InetSocketAddress] = - Async[F].blocking(new InetSocketAddress(InetAddress.getByName(hostname), port)) + def resolve[F[_]: Async](hostname: String, port: Int): F[InetSocketAddress] = + Async[F].blocking( + new InetSocketAddress(InetAddress.getByName(hostname), port) + ) sealed trait Error extends Product with Serializable object Error { - case object Authentication extends Error + case class SshErr(ex: SshException) extends Error } } diff --git a/core/src/test/scala/fs2/io/ssh/ClientSpec.scala b/core/src/test/scala/fs2/io/ssh/ClientSpec.scala index a3d4ec4..1fad416 100644 --- a/core/src/test/scala/fs2/io/ssh/ClientSpec.scala +++ b/core/src/test/scala/fs2/io/ssh/ClientSpec.scala @@ -53,7 +53,8 @@ class ClientSpec extends Specification with SshDockerService { val keyPassword = "password" "ssh client" should { - final case class WrappedError(e: Client.Error) extends RuntimeException(e.toString) + final case class WrappedError(e: Client.Error) + extends RuntimeException(e.toString) // useful for tests where we're implicitly asserting no errors implicit val throwingFR: Raise[IO, Client.Error] = @@ -65,81 +66,101 @@ class ClientSpec extends Specification with SshDockerService { } "authenticate with a password" in setup { (client, isa) => - client.exec( - ConnectionConfig( - isa, - testUser, - Auth.Password(testPassword)), - "whoami").void + client + .exec( + ConnectionConfig(isa, testUser, Auth.Password(testPassword)), + "whoami" + ) + .void } "report authentication failure" in setupF[EitherT[IO, Client.Error, *]]( { (client, isa) => - client.exec( - ConnectionConfig( - isa, - testUser, - Auth.Password("bippy")), - "whoami").void + client + .exec( + ConnectionConfig(isa, testUser, Auth.Password("bippy")), + "whoami" + ) + .void }, - _.value.unsafeRunTimed(Timeout) must beSome(beLeft(Client.Error.Authentication: Client.Error))) + (_: EitherT[IO, Client.Error, Unit]).value + .unsafeRunTimed(Timeout) must beSome(beLeft[Client.Error].like { + case _: Client.Error.SshErr => ok + }) + ) "authenticate with an unprotected key" in setup { (client, isa) => - client.exec( - ConnectionConfig( - isa, - testUser, - Auth.KeyFile(Paths.get("core", "src", "test", "resources", "nopassword"), None)), - "whoami").void + client + .exec( + ConnectionConfig( + isa, + testUser, + Auth.KeyFile( + Paths.get("core", "src", "test", "resources", "nopassword"), + None + ) + ), + "whoami" + ) + .void } "authenticate with an unprotected key in memory" in setup { (client, isa) => for { keyChunks <- - Files[IO].readAll( - fs2.io.file.Path.fromNioPath(Paths.get("core", "src", "test", "resources", "nopassword")), - 4096, - Flags.Read - ) - .chunks - .compile - .resource - .to(Seq) + Files[IO] + .readAll( + fs2.io.file.Path.fromNioPath( + Paths.get("core", "src", "test", "resources", "nopassword") + ), + 4096, + Flags.Read + ) + .chunks + .compile + .resource + .to(Seq) key = Chunk.concat(keyChunks).toArray[Byte] _ <- client.exec( - ConnectionConfig( - isa, - testUser, - Auth.KeyBytes(key, None)), - "whoami") + ConnectionConfig(isa, testUser, Auth.KeyBytes(key, None)), + "whoami" + ) } yield () } "authenticate with a protected key" in setup { (client, isa) => - client.exec( - ConnectionConfig( - isa, - testUser, - Auth.KeyFile( - Paths.get("core", "src", "test", "resources", "password"), - Some(keyPassword))), - "whoami").void + client + .exec( + ConnectionConfig( + isa, + testUser, + Auth.KeyFile( + Paths.get("core", "src", "test", "resources", "password"), + Some(keyPassword) + ) + ), + "whoami" + ) + .void } "authenticate with a protected key in memory" in setup { (client, isa) => for { keyChunks <- - Files[IO].readAll( - fs2.io.file.Path.fromNioPath(Paths.get("core", "src", "test", "resources", "password")), - 4096, - Flags.Read - ) - .chunks - .compile - .resource - .to(Seq) + Files[IO] + .readAll( + fs2.io.file.Path.fromNioPath( + Paths.get("core", "src", "test", "resources", "password") + ), + 4096, + Flags.Read + ) + .chunks + .compile + .resource + .to(Seq) key = Chunk.concat(keyChunks).toArray[Byte] @@ -147,22 +168,21 @@ class ClientSpec extends Specification with SshDockerService { ConnectionConfig( isa, testUser, - Auth.KeyBytes(key, Some(keyPassword))), - "whoami") + Auth.KeyBytes(key, Some(keyPassword)) + ), + "whoami" + ) } yield () } "read from stdout" in setup { (client, isa) => for { p <- client.exec( - ConnectionConfig( - isa, - testUser, - Auth.Password(testPassword)), - "whoami") + ConnectionConfig(isa, testUser, Auth.Password(testPassword)), + "whoami" + ) - results <- p.stdout - .chunks + results <- p.stdout.chunks .map(bytes => new String(bytes.toArray[Byte])) .foldMonoid .compile @@ -176,14 +196,11 @@ class ClientSpec extends Specification with SshDockerService { "read from stderr" in setup { (client, isa) => for { p <- client.exec( - ConnectionConfig( - isa, - testUser, - Auth.Password(testPassword)), - "whoami >&2") + ConnectionConfig(isa, testUser, Auth.Password(testPassword)), + "whoami >&2" + ) - results <- p.stderr - .chunks + results <- p.stderr.chunks .map(bytes => new String(bytes.toArray[Byte])) .foldMonoid .compile @@ -195,7 +212,8 @@ class ClientSpec extends Specification with SshDockerService { } "write to stdin" in setup { (client, isa) => - val data = "Hello, It's me, I've been wondering if after all these years you'd like to meet" + val data = + "Hello, It's me, I've been wondering if after all these years you'd like to meet" for { num <- Resource.eval(IO(math.abs(math.random() * 100000))) @@ -203,31 +221,27 @@ class ClientSpec extends Specification with SshDockerService { // we need to nest the resource here because we want to explicitly close the connection r = for { p1 <- client.exec( - ConnectionConfig( - isa, - testUser, - Auth.Password(testPassword)), - s"cat > /tmp/testing-${num}") - - _ <- Stream.chunk(Chunk.array(data.getBytes)) - .through(p1.stdin) - .compile - .resource - .drain + ConnectionConfig(isa, testUser, Auth.Password(testPassword)), + s"cat > /tmp/testing-${num}" + ) + + _ <- Stream + .chunk(Chunk.array(data.getBytes)) + .through(p1.stdin) + .compile + .resource + .drain } yield () // you know, a close function on Resource would be really great right about now... _ <- Resource.eval(r.use(_ => IO.unit)) p2 <- client.exec( - ConnectionConfig( - isa, - testUser, - Auth.Password(testPassword)), - s"cat /tmp/testing-${num}") + ConnectionConfig(isa, testUser, Auth.Password(testPassword)), + s"cat /tmp/testing-${num}" + ) - results <- p2.stdout - .chunks + results <- p2.stdout.chunks .map(bytes => new String(bytes.toArray[Byte])) .foldMonoid .compile @@ -243,11 +257,9 @@ class ClientSpec extends Specification with SshDockerService { now <- Resource.eval(IO(System.currentTimeMillis)) p <- client.exec( - ConnectionConfig( - isa, - testUser, - Auth.Password(testPassword)), - "sleep 5") + ConnectionConfig(isa, testUser, Auth.Password(testPassword)), + "sleep 5" + ) status <- Resource.eval(p.join) now2 <- Resource.eval(IO(System.currentTimeMillis)) @@ -261,32 +273,32 @@ class ClientSpec extends Specification with SshDockerService { } yield () } - "join on remote command, reporting non-zero status" in setup { (client, isa) => - for { - p <- client.exec( - ConnectionConfig( - isa, - testUser, - Auth.Password(testPassword)), - "exit 1") + "join on remote command, reporting non-zero status" in setup { + (client, isa) => + for { + p <- client.exec( + ConnectionConfig(isa, testUser, Auth.Password(testPassword)), + "exit 1" + ) - status <- Resource.eval(p.join) - _ <- Resource.eval(IO(status mustEqual 1)) - } yield () + status <- Resource.eval(p.join) + _ <- Resource.eval(IO(status mustEqual 1)) + } yield () } "forward the port" in setup { (client, isa) => client.portForward( - ConnectionConfig( - isa, - testUser, - Auth.Password(testPassword)), + ConnectionConfig(isa, testUser, Auth.Password(testPassword)), InetSocketAddress.createUnresolved("127.0.0.1", 3000), InetSocketAddress.createUnresolved(isa.getHostString, 3000) - ) >> OkHttpBuilder.withDefaultClient[IO].flatMap(_.resource).evalMap { client => - client.get("http://127.0.0.1:3000")(r => IO.pure(r.status.isSuccess)).flatMap { result => - IO.delay(result mustEqual true) - }.void + ) >> OkHttpBuilder.withDefaultClient[IO].flatMap(_.resource).evalMap { + client => + client + .get("http://127.0.0.1:3000")(r => IO.pure(r.status.isSuccess)) + .flatMap { result => + IO.delay(result mustEqual true) + } + .void } } } @@ -296,8 +308,8 @@ class ClientSpec extends Specification with SshDockerService { def setupF[F[_]: Async]( f: (Client[F], InetSocketAddress) => Resource[F, Unit], - finish: F[Unit] => Result) - : Result = { + finish: F[Unit] => Result + ): Result = { val r = for { client <- Client[F]