From 5e8f2de38526aae76850b9959a3921a0e73e0fe1 Mon Sep 17 00:00:00 2001 From: Tadas Dailyda Date: Thu, 6 Feb 2020 13:55:50 +0200 Subject: [PATCH 1/2] Add closeImmediately param to Client.exec --- core/src/main/scala/fs2/io/ssh/Client.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/fs2/io/ssh/Client.scala b/core/src/main/scala/fs2/io/ssh/Client.scala index 8ab34a7..5bfc098 100644 --- a/core/src/main/scala/fs2/io/ssh/Client.scala +++ b/core/src/main/scala/fs2/io/ssh/Client.scala @@ -29,7 +29,7 @@ import org.apache.sshd.common.SshException import org.apache.sshd.common.config.keys.FilePasswordProvider import org.apache.sshd.common.keyprovider.FileKeyPairProvider -import scala.{Array, Int, None, Product, Serializable, Some} +import scala.{Array, Boolean, Int, None, Product, Serializable, Some} import scala.util.{Left, Right} import java.lang.{String, SuppressWarnings} @@ -50,7 +50,8 @@ final class Client[F[_]: Concurrent: ContextShift] private (client: SshClient) { cc: ConnectionConfig, command: String, blocker: Blocker, - chunkSize: Int = 4096)( + chunkSize: Int = 4096, + closeImmediately: Boolean = false)( implicit FR: FunctorRaise[F, Error]) : Resource[F, Process[F]] = { @@ -64,7 +65,7 @@ final class Client[F[_]: Concurrent: ContextShift] private (client: SshClient) { opened <- fromFuture(F.delay(channel.open())) // TODO handle failure opening } yield channel)( - channel => fromFuture(F.delay(channel.close(false))).void) + channel => fromFuture(F.delay(channel.close(closeImmediately))).void) } yield new Process[F](channel, chunkSize) } From 34ec15bb67a8a0844d554efefa60f4cc4f4cc457 Mon Sep 17 00:00:00 2001 From: Tadas Dailyda Date: Sat, 15 Feb 2020 18:48:59 +0200 Subject: [PATCH 2/2] Add test case to demonstrate closeImmediately=true use case --- .../test/scala/fs2/io/ssh/ClientSpec.scala | 90 +++++++++++++++++-- 1 file changed, 85 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/fs2/io/ssh/ClientSpec.scala b/core/src/test/scala/fs2/io/ssh/ClientSpec.scala index fcc97ae..03c4c9d 100644 --- a/core/src/test/scala/fs2/io/ssh/ClientSpec.scala +++ b/core/src/test/scala/fs2/io/ssh/ClientSpec.scala @@ -44,6 +44,7 @@ import java.nio.file.Paths // they will only run on Travis if you push your branch to upstream class ClientSpec extends Specification with SshDockerService { implicit val cs = IO.contextShift(ExecutionContext.global) + implicit val timer = IO.timer(ExecutionContext.global) val Timeout = 30.seconds @@ -284,6 +285,76 @@ class ClientSpec extends Specification with SshDockerService { _ <- Resource.liftF(IO(status mustEqual 1)) } yield () } + + "disconnect immediately when command is long-running" in { + val cc = ConnectionConfig(_, testUser, Auth.Password(testPassword)) + + val createTmp: (Blocker, Client[IO], InetSocketAddress) => Resource[IO, String] = (blocker, client, isa) => + for { + num <- Resource.liftF(IO(math.abs(math.random * 100000))) + + tmpFile = s"/tmp/testing-${num}" + + _ <- client.exec( + cc(isa), + s"touch $tmpFile", + blocker) + } yield tmpFile + + val longRunningRead: String => (Blocker, Client[IO], InetSocketAddress) => Resource[IO, String] = + tmpFile => (blocker, client, isa) => + for { + p <- client.exec( + cc(isa), + s"tail -f -n0 $tmpFile", + blocker, + closeImmediately = true) + + results <- p.stdout + .chunks + .through(text.utf8DecodeC.andThen(text.lines)) + .collectFirst { case s@"1" => s } + .take(1) + .compile + .resource + .lastOrError + } yield results + + val writeLine: (String, String) => (Blocker, Client[IO], InetSocketAddress) => Resource[IO, Unit] = + (tmpFile, line) => (blocker, client, isa) => + for { + p <- client.exec( + cc(isa), + s"cat >> $tmpFile", + blocker) + + _ <- Stream.chunk(Chunk.bytes(s"$line\n".getBytes)) + .through(p.stdin) + .compile + .resource + .drain + } yield () + + val createWriteRead = for { + tmpFile <- setupIO(createTmp) + + sleep5 = IO.sleep(5.seconds) + writeSlowly = for { + _ <- sleep5 + _ <- setupIO(writeLine(tmpFile, "0")) + _ <- sleep5 + _ <- setupIO(writeLine(tmpFile, "1")) + _ <- sleep5 + } yield () + + writeRead <- IO.race( + writeSlowly, + setupIO(longRunningRead(tmpFile)).timeout(Timeout) + ) + } yield writeRead + + createWriteRead.unsafeRunTimed(Timeout) must beSome(beRight("1")) + } } def setup(f: (Blocker, Client[IO], InetSocketAddress) => Resource[IO, Unit]): Result = @@ -294,13 +365,22 @@ class ClientSpec extends Specification with SshDockerService { finish: F[Unit] => Result) : Result = { - val r = for { + val r = setupResourceF(f) + finish(r.use(_ => Applicative[F].unit)) + } + + def setupIO[A](f: (Blocker, Client[IO], InetSocketAddress) => Resource[IO, A]): IO[A] = + setupResourceF(f).use(IO.pure) + + def setupResourceF[F[_]: Concurrent: ContextShift, A]( + f: (Blocker, Client[F], InetSocketAddress) => Resource[F, A]) + : Resource[F, A] = { + + for { blocker <- Blocker[F] client <- Client[F] isa <- Resource.liftF(Client.resolve[F](testHost, testPort, blocker)) - _ <- f(blocker, client, isa) - } yield () - - finish(r.use(_ => Applicative[F].unit)) + res <- f(blocker, client, isa) + } yield res } }