Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 47 additions & 45 deletions core/src/main/scala/fs2/io/ssh/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,57 +41,54 @@ final class Client[F[_]] private (client: SshClient)(implicit F: Async[F]) {
import Client.Error
import MinaFuture._

@SuppressWarnings(
Array(
"org.wartremover.warts.DefaultArguments"))
def exec(
cc: ConnectionConfig,
command: String,
chunkSize: Int = 4096)(
implicit FR: Raise[F, Error])
: Resource[F, Process[F]] = {
@SuppressWarnings(Array("org.wartremover.warts.DefaultArguments"))
def exec(cc: ConnectionConfig, command: String, chunkSize: Int = 4096)(
implicit FR: Raise[F, Error]
): Resource[F, Process[F]] = {

for {
session <- this.session(cc)

channel <- Resource.make(
for {
channel <- F.delay(session.createExecChannel(command))
_ <- F.delay(channel.setStreaming(StreamingChannel.Streaming.Async))
opened <- fromFuture(F.delay(channel.open()))
// TODO handle failure opening
} yield channel)(
channel => fromFuture(F.delay(channel.close(false))).void)
channel <- Resource.make(for {
channel <- F.delay(session.createExecChannel(command))
_ <- F.delay(channel.setStreaming(StreamingChannel.Streaming.Async))
opened <- fromFuture(F.delay(channel.open()))
// TODO handle failure opening
} yield channel)(channel =>
fromFuture(F.delay(channel.close(false))).void
)
} yield new Process[F](channel, chunkSize)
}

def portForward(
cc: ConnectionConfig,
local: InetSocketAddress,
remote: InetSocketAddress)(
implicit FR: Raise[F, Error])
: Resource[F, Unit] = {
this.session(cc).flatMap { clientSession =>
val l = SshdSocketAddress.toSshdSocketAddress(local)
val r = SshdSocketAddress.toSshdSocketAddress(remote)
Resource.make(F.delay(clientSession.startLocalPortForwarding(l, r)))(address => F.delay(clientSession.stopLocalPortForwarding(address)))
}.void
cc: ConnectionConfig,
local: InetSocketAddress,
remote: InetSocketAddress
)(implicit FR: Raise[F, Error]): Resource[F, Unit] = {
this
.session(cc)
.flatMap { clientSession =>
val l = SshdSocketAddress.toSshdSocketAddress(local)
val r = SshdSocketAddress.toSshdSocketAddress(remote)
Resource.make(F.delay(clientSession.startLocalPortForwarding(l, r)))(
address => F.delay(clientSession.stopLocalPortForwarding(address))
)
}
.void
}

@SuppressWarnings(
Array(
"org.wartremover.warts.Null",
"org.wartremover.warts.ToString"))
Array("org.wartremover.warts.Null", "org.wartremover.warts.ToString")
)
def session(
cc: ConnectionConfig)(
implicit FR: Raise[F, Error])
: Resource[F, ClientSession] = {
cc: ConnectionConfig
)(implicit FR: Raise[F, Error]): Resource[F, ClientSession] = {

for {
session <-
Resource.make(
fromFuture(F.delay(client.connect(cc.username, cc.host))))(
session => fromFuture(F.delay(session.close(false))).void)
fromFuture(F.delay(client.connect(cc.username, cc.host)))
)(session => fromFuture(F.delay(session.close(false))).void)

_ <- cc.auth match {
case Auth.Password(text) =>
Expand All @@ -115,7 +112,9 @@ final class Client[F[_]] private (client: SshClient)(implicit F: Async[F]) {
}

case None =>
F.delay(provider.setPasswordFinder(FilePasswordProvider.EMPTY))
F.delay(
provider.setPasswordFinder(FilePasswordProvider.EMPTY)
)
}

pairs <- F.blocking(provider.loadKeys(session))
Expand All @@ -142,8 +141,8 @@ final class Client[F[_]] private (client: SshClient)(implicit F: Async[F]) {

_ <- Resource eval {
success match {
case Left(_: SshException) =>
FR.raise(Error.Authentication)
case Left(ex: SshException) =>
FR.raise(Error.SshErr(ex))

case Left(e) =>
F.raiseError(e)
Expand All @@ -168,7 +167,11 @@ object Client {

// send periodic SSH_MSG_IGNOREs to prevent the connection from dying
// without activity
client.setSessionHeartbeat(HeartbeatType.IGNORE, TimeUnit.SECONDS, HeartbeatInterval)
client.setSessionHeartbeat(
HeartbeatType.IGNORE,
TimeUnit.SECONDS,
HeartbeatInterval
)

client.start()
client
Expand All @@ -178,15 +181,14 @@ object Client {
}

// convenience function that really should live elsewhere
def resolve[F[_]: Async](
hostname: String,
port: Int)
: F[InetSocketAddress] =
Async[F].blocking(new InetSocketAddress(InetAddress.getByName(hostname), port))
def resolve[F[_]: Async](hostname: String, port: Int): F[InetSocketAddress] =
Async[F].blocking(
new InetSocketAddress(InetAddress.getByName(hostname), port)
)

sealed trait Error extends Product with Serializable

object Error {
case object Authentication extends Error
case class SshErr(ex: SshException) extends Error
}
}
Loading