Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions core/src/main/scala/fs2/io/ssh/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.sshd.common.SshException
import org.apache.sshd.common.config.keys.FilePasswordProvider
import org.apache.sshd.common.keyprovider.FileKeyPairProvider

import scala.{Array, Int, None, Product, Serializable, Some}
import scala.{Array, Boolean, Int, None, Product, Serializable, Some}
import scala.util.{Left, Right}

import java.lang.{String, SuppressWarnings}
Expand All @@ -50,7 +50,8 @@ final class Client[F[_]: Concurrent: ContextShift] private (client: SshClient) {
cc: ConnectionConfig,
command: String,
blocker: Blocker,
chunkSize: Int = 4096)(
chunkSize: Int = 4096,
closeImmediately: Boolean = false)(
implicit FR: FunctorRaise[F, Error])
: Resource[F, Process[F]] = {

Expand All @@ -64,7 +65,7 @@ final class Client[F[_]: Concurrent: ContextShift] private (client: SshClient) {
opened <- fromFuture(F.delay(channel.open()))
// TODO handle failure opening
} yield channel)(
channel => fromFuture(F.delay(channel.close(false))).void)
channel => fromFuture(F.delay(channel.close(closeImmediately))).void)
} yield new Process[F](channel, chunkSize)
}

Expand Down
90 changes: 85 additions & 5 deletions core/src/test/scala/fs2/io/ssh/ClientSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import java.nio.file.Paths
// they will only run on Travis if you push your branch to upstream
class ClientSpec extends Specification with SshDockerService {
implicit val cs = IO.contextShift(ExecutionContext.global)
implicit val timer = IO.timer(ExecutionContext.global)

val Timeout = 30.seconds

Expand Down Expand Up @@ -284,6 +285,76 @@ class ClientSpec extends Specification with SshDockerService {
_ <- Resource.liftF(IO(status mustEqual 1))
} yield ()
}

"disconnect immediately when command is long-running" in {
val cc = ConnectionConfig(_, testUser, Auth.Password(testPassword))

val createTmp: (Blocker, Client[IO], InetSocketAddress) => Resource[IO, String] = (blocker, client, isa) =>
for {
num <- Resource.liftF(IO(math.abs(math.random * 100000)))

tmpFile = s"/tmp/testing-${num}"

_ <- client.exec(
cc(isa),
s"touch $tmpFile",
blocker)
} yield tmpFile

val longRunningRead: String => (Blocker, Client[IO], InetSocketAddress) => Resource[IO, String] =
tmpFile => (blocker, client, isa) =>
for {
p <- client.exec(
cc(isa),
s"tail -f -n0 $tmpFile",
blocker,
closeImmediately = true)

results <- p.stdout
.chunks
.through(text.utf8DecodeC.andThen(text.lines))
.collectFirst { case s@"1" => s }
.take(1)
.compile
.resource
.lastOrError
} yield results

val writeLine: (String, String) => (Blocker, Client[IO], InetSocketAddress) => Resource[IO, Unit] =
(tmpFile, line) => (blocker, client, isa) =>
for {
p <- client.exec(
cc(isa),
s"cat >> $tmpFile",
blocker)

_ <- Stream.chunk(Chunk.bytes(s"$line\n".getBytes))
.through(p.stdin)
.compile
.resource
.drain
} yield ()

val createWriteRead = for {
tmpFile <- setupIO(createTmp)

sleep5 = IO.sleep(5.seconds)
writeSlowly = for {
_ <- sleep5
_ <- setupIO(writeLine(tmpFile, "0"))
_ <- sleep5
_ <- setupIO(writeLine(tmpFile, "1"))
_ <- sleep5
} yield ()

writeRead <- IO.race(
writeSlowly,
setupIO(longRunningRead(tmpFile)).timeout(Timeout)
)
} yield writeRead

createWriteRead.unsafeRunTimed(Timeout) must beSome(beRight("1"))
}
}

def setup(f: (Blocker, Client[IO], InetSocketAddress) => Resource[IO, Unit]): Result =
Expand All @@ -294,13 +365,22 @@ class ClientSpec extends Specification with SshDockerService {
finish: F[Unit] => Result)
: Result = {

val r = for {
val r = setupResourceF(f)
finish(r.use(_ => Applicative[F].unit))
}

def setupIO[A](f: (Blocker, Client[IO], InetSocketAddress) => Resource[IO, A]): IO[A] =
setupResourceF(f).use(IO.pure)

def setupResourceF[F[_]: Concurrent: ContextShift, A](
f: (Blocker, Client[F], InetSocketAddress) => Resource[F, A])
: Resource[F, A] = {

for {
blocker <- Blocker[F]
client <- Client[F]
isa <- Resource.liftF(Client.resolve[F](testHost, testPort, blocker))
_ <- f(blocker, client, isa)
} yield ()

finish(r.use(_ => Applicative[F].unit))
res <- f(blocker, client, isa)
} yield res
}
}