diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 1008ed7..606954a 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -48,5 +48,17 @@ jobs: - name: Compile with warnings as errors run: mix compile --warnings-as-errors - - name: Run tests + - name: Run unit tests run: mix test + + - name: Start SSH test container + run: cd test/docker && bash run.sh start + + - name: Verify SSH container is ready + run: | + ssh -i test/docker/.keys/test_key -p 2222 \ + -o BatchMode=yes -o StrictHostKeyChecking=no -o ConnectTimeout=5 \ + fusion_test@localhost echo ok + + - name: Run external tests + run: elixir --sname ci_test@localhost -S mix test --include external diff --git a/.gitignore b/.gitignore index 6f8cf11..d5bbb9e 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,6 @@ *.beam test/docker/.keys/ .worktrees/ +tmp/ +/cover/ +erl_crash.dump diff --git a/README.md b/README.md index 57767e0..1116588 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ Fusion connects to remote servers via SSH, sets up port tunnels for Erlang distr - Elixir ~> 1.18 / OTP 28+ - Remote server with Elixir/Erlang installed -- SSH access (key-based or password via `sshpass`) +- SSH access (key-based or password) ## Installation @@ -119,6 +119,21 @@ Disconnect when done: Fusion.NodeManager.disconnect(manager) ``` +### SSH Backend + +Fusion uses Erlang's built-in SSH module by default. No system `ssh` binary required. + +To use the legacy system SSH backend instead: + +```elixir +target = %Fusion.Target{ + host: "10.0.1.5", + username: "deploy", + auth: {:key, "~/.ssh/id_ed25519"}, + ssh_backend: Fusion.SshBackend.System # uses system ssh/sshpass +} +``` + ### Automatic Dependency Resolution When you run `RemoteHealth` remotely, Fusion reads the BEAM bytecode, walks the dependency tree, and pushes everything the module needs. You don't need to manually track the dependency chain. @@ -199,7 +214,11 @@ cd test/docker && ./run.sh stop Fusion (public API) ├── TaskRunner - Remote code execution + module pushing + dependency resolution ├── NodeManager - GenServer: tunnel setup, BEAM bootstrap, connection lifecycle -├── Target - SSH connection configuration struct +├── Target - SSH connection configuration struct (includes ssh_backend selection) +├── SshBackend - Behaviour for pluggable SSH implementations +│ ├── Erlang - Default: uses OTP's built-in :ssh module +│ └── System - Legacy: shells out to system ssh/sshpass binaries +├── SshKeyProvider - Custom ssh_client_key_api for specific key file paths ├── TunnelSupervisor - DynamicSupervisor for tunnel processes ├── Net - Port generation, EPMD utilities ├── Connector - SSH connection GenServer diff --git a/docs/plans/2026-03-07-erlang-ssh-design.md b/docs/plans/2026-03-07-erlang-ssh-design.md new file mode 100644 index 0000000..c403e7a --- /dev/null +++ b/docs/plans/2026-03-07-erlang-ssh-design.md @@ -0,0 +1,87 @@ +# Replace System SSH with Erlang :ssh Module — Design + +## Goal + +Replace all usage of system `ssh`/`sshpass` binaries with Erlang's built-in `:ssh` module while preserving the old approach as a pluggable alternative. + +## Architecture + +### Pluggable SSH Backend + +A behaviour that both backends implement: + +```elixir +defmodule Fusion.SshBackend do + @callback connect(target) :: {:ok, conn} | {:error, reason} + @callback forward_tunnel(conn, listen_port, remote_host, remote_port) :: {:ok, port} | {:error, reason} + @callback reverse_tunnel(conn, remote_port, local_host, local_port) :: {:ok, port} | {:error, reason} + @callback exec(conn, command) :: {:ok, output} | {:error, reason} + @callback close(conn) :: :ok +end +``` + +Two implementations: +- `Fusion.SshBackend.Erlang` — new, uses `:ssh` module (default) +- `Fusion.SshBackend.System` — current behavior, shells out to `ssh`/`sshpass` + +Configurable via `Fusion.Target`: +```elixir +%Fusion.Target{ + host: "10.0.1.5", + username: "deploy", + auth: {:key, "~/.ssh/id_ed25519"}, + ssh_backend: Fusion.SshBackend.Erlang # or Fusion.SshBackend.System +} +``` + +### Erlang SSH API Mapping + +| Fusion Need | Erlang :ssh API | +|-------------|-----------------| +| Forward tunnel (`ssh -L`) | `:ssh.tcpip_tunnel_to_server/6` | +| Reverse tunnel (`ssh -R`) | `:ssh.tcpip_tunnel_from_server/6` | +| Remote command execution | `:ssh_connection.session_channel/2` + `:ssh_connection.exec/4` | +| Password auth | `:ssh.connect(host, port, user: ..., password: ...)` | +| Key auth | `:ssh.connect(host, port, key_cb: {Fusion.SshKeyProvider, ...})` | +| Disconnect detection | `Process.monitor(conn_pid)` | +| Cleanup | `:ssh.close(conn)` | + +### Custom Key Provider + +`Fusion.SshKeyProvider` — implements `ssh_client_key_api` behaviour to load a specific key file by path. Preserves the current `{:key, "/path/to/key"}` UX instead of scanning a directory. + +### Module Changes + +**New modules:** +- `Fusion.SshBackend` — behaviour definition +- `Fusion.SshBackend.Erlang` — new `:ssh` based implementation +- `Fusion.SshBackend.System` — extracted from current code +- `Fusion.SshKeyProvider` — custom `ssh_client_key_api` for specific key files + +**Modified:** +- `Fusion.Target` — add `ssh_backend` field, default `Fusion.SshBackend.Erlang` +- `Fusion.NodeManager` — use backend behaviour instead of direct SSH calls +- `Fusion.Application` — ensure `:ssh` is started + +**Preserved (moved into System backend):** +- `Fusion.Utilities.Ssh` — used by System backend +- `Fusion.Utilities.Exec` — used by System backend +- `Fusion.SshPortTunnel` — used by System backend +- `Fusion.Connector` — refactored to use backend + +### Gotchas + +- All strings to `:ssh` must be charlists (`~c"..."`) +- `silently_accept_hosts: true` required for non-interactive use +- `user_interaction: false` to prevent stdin blocking +- `:ssh` app must be started before use +- One `session_channel` per `exec` call +- Exec output arrives async via `{:ssh_cm, conn, {:data, ...}}` messages +- OTP 22+ required for high-level tunnel APIs (we're on OTP 28) + +### Testing + +- Existing unit tests should continue passing +- Docker integration tests validate actual SSH behavior +- Add unit tests for `SshBackend.Erlang` and `SshKeyProvider` +- Test both backends in integration tests diff --git a/docs/plans/2026-03-07-erlang-ssh-plan.md b/docs/plans/2026-03-07-erlang-ssh-plan.md new file mode 100644 index 0000000..aa61be4 --- /dev/null +++ b/docs/plans/2026-03-07-erlang-ssh-plan.md @@ -0,0 +1,841 @@ +# Erlang SSH Backend Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. + +**Goal:** Replace system SSH with Erlang's `:ssh` module while keeping the old approach as a pluggable alternative. + +**Architecture:** Define a `Fusion.SshBackend` behaviour with two implementations: `Erlang` (new, default) and `System` (existing code). `NodeManager` delegates SSH operations through the backend. A custom `SshKeyProvider` implements `ssh_client_key_api` for specific key file paths. + +**Tech Stack:** Elixir, Erlang `:ssh` / `:ssh_connection`, OTP 28 + +--- + +### Task 1: Define the SshBackend behaviour + +**Files:** +- Create: `lib/fusion/ssh_backend.ex` +- Test: `test/fusion/ssh_backend_test.exs` + +**Step 1: Write the test** + +```elixir +# test/fusion/ssh_backend_test.exs +defmodule Fusion.SshBackendTest do + use ExUnit.Case, async: true + + test "Fusion.SshBackend defines the expected callbacks" do + callbacks = Fusion.SshBackend.behaviour_info(:callbacks) + assert {:connect, 1} in callbacks + assert {:forward_tunnel, 4} in callbacks + assert {:reverse_tunnel, 4} in callbacks + assert {:exec, 2} in callbacks + assert {:exec_async, 2} in callbacks + assert {:close, 1} in callbacks + end +end +``` + +**Step 2: Run test to verify it fails** + +Run: `cd /home/eyal/projects/join-with/oss/fusion/.worktrees/erlang-ssh && mix test test/fusion/ssh_backend_test.exs` +Expected: FAIL — module not found + +**Step 3: Write the behaviour** + +```elixir +# lib/fusion/ssh_backend.ex +defmodule Fusion.SshBackend do + @moduledoc """ + Behaviour for SSH backends. + + Fusion supports pluggable SSH implementations. The default is + `Fusion.SshBackend.Erlang` which uses OTP's built-in :ssh module. + The legacy `Fusion.SshBackend.System` shells out to the system ssh binary. + """ + + @type conn :: term() + @type target :: Fusion.Target.t() + + @doc "Open an SSH connection to the target." + @callback connect(target()) :: {:ok, conn()} | {:error, term()} + + @doc "Create a forward tunnel (local listen port -> remote host:port)." + @callback forward_tunnel(conn(), non_neg_integer(), String.t(), non_neg_integer()) :: + {:ok, non_neg_integer()} | {:error, term()} + + @doc "Create a reverse tunnel (remote listen port -> local host:port)." + @callback reverse_tunnel(conn(), non_neg_integer(), String.t(), non_neg_integer()) :: + {:ok, non_neg_integer()} | {:error, term()} + + @doc "Execute a command on the remote host synchronously. Returns stdout." + @callback exec(conn(), String.t()) :: {:ok, String.t()} | {:error, term()} + + @doc "Execute a command on the remote host asynchronously. Returns port/pid for monitoring." + @callback exec_async(conn(), String.t()) :: {:ok, pid()} | {:error, term()} + + @doc "Close the SSH connection." + @callback close(conn()) :: :ok +end +``` + +**Step 4: Run test to verify it passes** + +Run: `mix test test/fusion/ssh_backend_test.exs` +Expected: PASS + +**Step 5: Commit** + +```bash +git add lib/fusion/ssh_backend.ex test/fusion/ssh_backend_test.exs +git commit -m "feat: define SshBackend behaviour" +``` + +--- + +### Task 2: Add ssh_backend field to Target + +**Files:** +- Modify: `lib/fusion/target.ex` +- Modify: `test/fusion/target_test.exs` + +**Step 1: Write the test** + +```elixir +# Add to test/fusion/target_test.exs +describe "ssh_backend" do + test "defaults to Fusion.SshBackend.Erlang" do + target = %Target{host: "x", port: 22, username: "u", auth: {:key, "/k"}} + assert target.ssh_backend == Fusion.SshBackend.Erlang + end + + test "can be set to System backend" do + target = %Target{ + host: "x", + port: 22, + username: "u", + auth: {:key, "/k"}, + ssh_backend: Fusion.SshBackend.System + } + + assert target.ssh_backend == Fusion.SshBackend.System + end +end +``` + +**Step 2: Run test to verify it fails** + +Run: `mix test test/fusion/target_test.exs` +Expected: FAIL — no ssh_backend field + +**Step 3: Add the field to Target struct** + +In `lib/fusion/target.ex`, add `ssh_backend` to defstruct with default: + +```elixir +defstruct host: nil, port: 22, username: nil, auth: nil, ssh_backend: Fusion.SshBackend.Erlang + +@type t :: %__MODULE__{ + host: String.t(), + port: non_neg_integer(), + username: String.t(), + auth: auth(), + ssh_backend: module() + } +``` + +**Step 4: Run tests** + +Run: `mix test test/fusion/target_test.exs` +Expected: PASS + +**Step 5: Run all tests** + +Run: `mix test` +Expected: All pass (existing tests don't set ssh_backend, default is fine) + +**Step 6: Commit** + +```bash +git add lib/fusion/target.ex test/fusion/target_test.exs +git commit -m "feat: add ssh_backend field to Target with Erlang default" +``` + +--- + +### Task 3: Create SshKeyProvider (custom key_cb) + +**Files:** +- Create: `lib/fusion/ssh_key_provider.ex` +- Test: `test/fusion/ssh_key_provider_test.exs` + +**Step 1: Write the test** + +```elixir +# test/fusion/ssh_key_provider_test.exs +defmodule Fusion.SshKeyProviderTest do + use ExUnit.Case, async: true + + @moduletag :tmp_dir + + describe "is_host_key/5" do + test "accepts any host key when silently accepting" do + assert Fusion.SshKeyProvider.is_host_key(:fake_key, "host", 22, :ssh_rsa, []) + end + end + + describe "user_key/2" do + test "reads a key file from the provided path", %{tmp_dir: tmp_dir} do + # Generate a test key + key_path = Path.join(tmp_dir, "test_key") + {_, 0} = System.cmd("ssh-keygen", ["-t", "ed25519", "-f", key_path, "-N", "", "-q"]) + + opts = [key_path: key_path] + result = Fusion.SshKeyProvider.user_key(:"ssh-ed25519", opts) + assert {:ok, _key} = result + end + + test "returns error for missing key file" do + opts = [key_path: "/nonexistent/key"] + assert {:error, _} = Fusion.SshKeyProvider.user_key(:"ssh-ed25519", opts) + end + end +end +``` + +**Step 2: Run test to verify it fails** + +Run: `mix test test/fusion/ssh_key_provider_test.exs` +Expected: FAIL — module not found + +**Step 3: Implement SshKeyProvider** + +```elixir +# lib/fusion/ssh_key_provider.ex +defmodule Fusion.SshKeyProvider do + @moduledoc """ + Custom SSH key callback that loads a specific key file by path. + + Implements the `:ssh_client_key_api` behaviour so that Fusion can + use `{:key, "/path/to/specific/key"}` auth with Erlang's :ssh module. + """ + + @behaviour :ssh_client_key_api + + @impl true + def is_host_key(_key, _host, _port, _algorithm, _opts) do + # Accept all host keys (equivalent to StrictHostKeyChecking=no) + true + end + + @impl true + def user_key(algorithm, opts) do + key_path = Keyword.fetch!(opts, :key_path) + + case File.read(key_path) do + {:ok, pem} -> + decode_private_key(pem, algorithm) + + {:error, reason} -> + {:error, {:file_read_error, key_path, reason}} + end + end + + defp decode_private_key(pem, _algorithm) do + try do + [{entry, _}] = :public_key.pem_decode(pem) + key = :public_key.pem_entry_decode(entry) + {:ok, key} + rescue + e -> {:error, {:decode_error, e}} + end + end +end +``` + +**Step 4: Run test to verify it passes** + +Run: `mix test test/fusion/ssh_key_provider_test.exs` +Expected: PASS + +**Step 5: Commit** + +```bash +git add lib/fusion/ssh_key_provider.ex test/fusion/ssh_key_provider_test.exs +git commit -m "feat: add SshKeyProvider for specific key file paths" +``` + +--- + +### Task 4: Implement SshBackend.Erlang + +**Files:** +- Create: `lib/fusion/ssh_backend/erlang.ex` +- Test: `test/fusion/ssh_backend/erlang_test.exs` + +**Step 1: Write the test** + +```elixir +# test/fusion/ssh_backend/erlang_test.exs +defmodule Fusion.SshBackend.ErlangTest do + use ExUnit.Case, async: true + + alias Fusion.SshBackend.Erlang, as: Backend + alias Fusion.Target + + describe "connect_opts/1" do + test "builds password auth options" do + target = %Target{host: "h", port: 22, username: "user", auth: {:password, "secret"}} + opts = Backend.connect_opts(target) + + assert Keyword.get(opts, :user) == ~c"user" + assert Keyword.get(opts, :password) == ~c"secret" + assert Keyword.get(opts, :silently_accept_hosts) == true + assert Keyword.get(opts, :user_interaction) == false + end + + test "builds key auth options" do + target = %Target{host: "h", port: 22, username: "user", auth: {:key, "/home/user/.ssh/id_ed25519"}} + opts = Backend.connect_opts(target) + + assert Keyword.get(opts, :user) == ~c"user" + assert Keyword.get(opts, :key_cb) == {Fusion.SshKeyProvider, key_path: "/home/user/.ssh/id_ed25519"} + assert Keyword.get(opts, :silently_accept_hosts) == true + assert Keyword.get(opts, :user_interaction) == false + refute Keyword.has_key?(opts, :password) + end + end +end +``` + +**Step 2: Run test to verify it fails** + +Run: `mix test test/fusion/ssh_backend/erlang_test.exs` +Expected: FAIL — module not found + +**Step 3: Implement the Erlang backend** + +```elixir +# lib/fusion/ssh_backend/erlang.ex +defmodule Fusion.SshBackend.Erlang do + @moduledoc """ + SSH backend using Erlang's built-in :ssh module. + + This is the default backend. It uses OTP's SSH implementation + for connections, tunnels, and remote command execution. + """ + + @behaviour Fusion.SshBackend + + @connect_timeout 15_000 + @exec_timeout 30_000 + + @impl true + def connect(%Fusion.Target{} = target) do + :ssh.start() + + host = String.to_charlist(target.host) + opts = connect_opts(target) + + case :ssh.connect(host, target.port, opts, @connect_timeout) do + {:ok, conn} -> {:ok, conn} + {:error, reason} -> {:error, reason} + end + end + + @impl true + def forward_tunnel(conn, listen_port, connect_host, connect_port) do + :ssh.tcpip_tunnel_to_server( + conn, + ~c"127.0.0.1", + listen_port, + String.to_charlist(connect_host), + connect_port, + @connect_timeout + ) + end + + @impl true + def reverse_tunnel(conn, listen_port, connect_host, connect_port) do + :ssh.tcpip_tunnel_from_server( + conn, + ~c"0.0.0.0", + listen_port, + String.to_charlist(connect_host), + connect_port, + @connect_timeout + ) + end + + @impl true + def exec(conn, command) do + with {:ok, ch} <- :ssh_connection.session_channel(conn, @exec_timeout), + :success <- :ssh_connection.exec(conn, ch, String.to_charlist(command), @exec_timeout) do + collect_output(conn, ch, <<>>, nil) + else + :failure -> {:error, :exec_failed} + {:error, reason} -> {:error, reason} + end + end + + @impl true + def exec_async(conn, command) do + parent = self() + + pid = + spawn_link(fn -> + {:ok, ch} = :ssh_connection.session_channel(conn, @exec_timeout) + :success = :ssh_connection.exec(conn, ch, String.to_charlist(command), @exec_timeout) + + # Keep the process alive to receive SSH messages + receive do + {:ssh_cm, ^conn, {:closed, ^ch}} -> :ok + after + :infinity -> :ok + end + end) + + {:ok, pid} + end + + @impl true + def close(conn) do + :ssh.close(conn) + :ok + end + + @doc false + def connect_opts(%Fusion.Target{} = target) do + base_opts = [ + user: String.to_charlist(target.username), + silently_accept_hosts: true, + user_interaction: false + ] + + auth_opts = + case target.auth do + {:password, password} -> + [password: String.to_charlist(password)] + + {:key, key_path} -> + [key_cb: {Fusion.SshKeyProvider, key_path: key_path}] + end + + base_opts ++ auth_opts + end + + defp collect_output(conn, ch, stdout, exit_code) do + receive do + {:ssh_cm, ^conn, {:data, ^ch, 0, data}} -> + collect_output(conn, ch, stdout <> data, exit_code) + + {:ssh_cm, ^conn, {:data, ^ch, 1, _stderr}} -> + collect_output(conn, ch, stdout, exit_code) + + {:ssh_cm, ^conn, {:eof, ^ch}} -> + collect_output(conn, ch, stdout, exit_code) + + {:ssh_cm, ^conn, {:exit_status, ^ch, code}} -> + collect_output(conn, ch, stdout, code) + + {:ssh_cm, ^conn, {:closed, ^ch}} -> + case exit_code do + 0 -> {:ok, stdout} + nil -> {:ok, stdout} + code -> {:error, {:exit_code, code, stdout}} + end + after + @exec_timeout -> + :ssh_connection.close(conn, ch) + {:error, :timeout} + end + end +end +``` + +**Step 4: Run test to verify it passes** + +Run: `mix test test/fusion/ssh_backend/erlang_test.exs` +Expected: PASS + +**Step 5: Commit** + +```bash +git add lib/fusion/ssh_backend/erlang.ex test/fusion/ssh_backend/erlang_test.exs +git commit -m "feat: implement Erlang SSH backend" +``` + +--- + +### Task 5: Extract SshBackend.System from existing code + +**Files:** +- Create: `lib/fusion/ssh_backend/system.ex` +- Test: `test/fusion/ssh_backend/system_test.exs` + +**Step 1: Write the test** + +```elixir +# test/fusion/ssh_backend/system_test.exs +defmodule Fusion.SshBackend.SystemTest do + use ExUnit.Case, async: true + + alias Fusion.SshBackend.System, as: Backend + alias Fusion.Target + + test "implements the SshBackend behaviour" do + behaviours = + Backend.__info__(:attributes) + |> Keyword.get_values(:behaviour) + |> List.flatten() + + assert Fusion.SshBackend in behaviours + end + + test "connect returns a conn struct" do + target = %Target{ + host: "example.com", + port: 22, + username: "deploy", + auth: {:key, "~/.ssh/id_rsa"}, + ssh_backend: Backend + } + + # connect won't actually work without a real server, but we can verify it exists + assert function_exported?(Backend, :connect, 1) + assert function_exported?(Backend, :forward_tunnel, 4) + assert function_exported?(Backend, :reverse_tunnel, 4) + assert function_exported?(Backend, :exec, 2) + assert function_exported?(Backend, :exec_async, 2) + assert function_exported?(Backend, :close, 1) + end +end +``` + +**Step 2: Implement System backend** + +Extract the current SSH command-building and OS process approach into `Fusion.SshBackend.System`. This wraps the existing `Fusion.Utilities.Ssh` and `Fusion.Utilities.Exec` modules behind the behaviour. + +```elixir +# lib/fusion/ssh_backend/system.ex +defmodule Fusion.SshBackend.System do + @moduledoc """ + SSH backend that shells out to the system `ssh` and `sshpass` binaries. + + This is the legacy backend. Use `Fusion.SshBackend.Erlang` (the default) + for a pure-Erlang implementation with no system binary dependencies. + """ + + @behaviour Fusion.SshBackend + + alias Fusion.Utilities.Ssh + alias Fusion.Utilities.Exec + alias Fusion.Net.Spot + + defmodule Conn do + @moduledoc false + defstruct auth: nil, remote: nil, tunnels: [], os_pids: [] + end + + @impl true + def connect(%Fusion.Target{} = target) do + {auth, remote} = Fusion.Target.to_auth_and_spot(target) + {:ok, %Conn{auth: auth, remote: remote}} + end + + @impl true + def forward_tunnel(%Conn{} = conn, listen_port, connect_host, connect_port) do + to_spot = %Spot{host: connect_host, port: connect_port} + + cmd = + Ssh.cmd_port_tunnel(conn.auth, conn.remote, listen_port, to_spot, :forward) + + case Exec.capture_std_mon(cmd) do + {:ok, port, _os_pid} -> + {:ok, listen_port} + + error -> + error + end + end + + @impl true + def reverse_tunnel(%Conn{} = conn, listen_port, connect_host, connect_port) do + to_spot = %Spot{host: connect_host, port: connect_port} + + cmd = + Ssh.cmd_port_tunnel(conn.auth, conn.remote, listen_port, to_spot, :reverse) + + case Exec.capture_std_mon(cmd) do + {:ok, port, _os_pid} -> + {:ok, listen_port} + + error -> + error + end + end + + @impl true + def exec(%Conn{} = conn, command) do + cmd = Ssh.cmd_remote(command, conn.auth, conn.remote) + Exec.run_sync_capture_std(cmd) + end + + @impl true + def exec_async(%Conn{} = conn, command) do + cmd = Ssh.cmd_remote(command, conn.auth, conn.remote) + + case Exec.capture_std_mon(cmd) do + {:ok, port, os_pid} -> {:ok, {port, os_pid}} + error -> error + end + end + + @impl true + def close(%Conn{} = _conn) do + # System SSH tunnels are cleaned up via Port.close in NodeManager + :ok + end +end +``` + +**Step 3: Run test to verify it passes** + +Run: `mix test test/fusion/ssh_backend/system_test.exs` +Expected: PASS + +**Step 4: Commit** + +```bash +git add lib/fusion/ssh_backend/system.ex test/fusion/ssh_backend/system_test.exs +git commit -m "feat: extract System SSH backend from existing code" +``` + +--- + +### Task 6: Refactor NodeManager to use SshBackend + +**Files:** +- Modify: `lib/fusion/node_manager.ex` +- Modify: `test/fusion/node_manager_test.exs` + +**Step 1: Update the test** + +Add test to verify the backend is used from target: + +```elixir +# Add to test/fusion/node_manager_test.exs +describe "backend selection" do + test "uses the backend from the target" do + target = %Target{ + host: "x", + port: 22, + username: "u", + auth: {:key, "/k"}, + ssh_backend: Fusion.SshBackend.System + } + + {:ok, pid} = NodeManager.start_link(target) + assert NodeManager.status(pid) == :disconnected + GenServer.stop(pid) + end +end +``` + +**Step 2: Refactor NodeManager** + +Rewrite `do_connect/1` and `setup_tunnels/6` to call `target.ssh_backend.connect/1`, `.forward_tunnel/4`, `.reverse_tunnel/4`, `.exec_async/2` instead of directly building SSH commands. + +Key changes: +- `do_connect/1`: call `target.ssh_backend.connect(target)` to get a conn, store it in state +- `setup_tunnels`: call `conn |> backend.reverse_tunnel(...)` and `conn |> backend.forward_tunnel(...)` +- `start_remote_node`: call `backend.exec_async(conn, cmd)` instead of `Ssh.cmd_remote |> Exec.capture_std_mon` +- `do_disconnect`: call `backend.close(conn)` instead of killing OS pids +- `kill_remote_process`: call `backend.exec(conn, "kill ...")` instead of building SSH command +- Store `conn` and `backend` in state struct, remove `tunnel_ports` and `remote_os_pid` + +The complete refactored `NodeManager` should: +1. On connect: `backend.connect(target)` → store conn +2. Setup 3 tunnels via backend calls +3. Start remote node via `backend.exec_async(conn, elixir_cmd)` +4. Wait for `Node.connect` as before +5. On disconnect: `Node.disconnect`, `backend.exec(conn, "kill ...")`, `backend.close(conn)` + +**Step 3: Run all tests** + +Run: `mix test` +Expected: All pass + +**Step 4: Commit** + +```bash +git add lib/fusion/node_manager.ex test/fusion/node_manager_test.exs +git commit -m "refactor: NodeManager uses pluggable SshBackend" +``` + +--- + +### Task 7: Start :ssh in Application + +**Files:** +- Modify: `lib/fusion/application.ex` + +**Step 1: Add :ssh to extra_applications in mix.exs** + +In `mix.exs`, add `:ssh` and `:public_key` to `extra_applications`: + +```elixir +def application do + [ + extra_applications: [:logger, :ssh, :public_key], + mod: {Fusion.Application, []} + ] +end +``` + +**Step 2: Run all tests** + +Run: `mix test` +Expected: All pass + +**Step 3: Commit** + +```bash +git add mix.exs +git commit -m "feat: add :ssh and :public_key to extra_applications" +``` + +--- + +### Task 8: Integration test with both backends + +**Files:** +- Modify: `test/fusion/node_manager_integration_test.exs` + +**Step 1: Parameterize the integration test for both backends** + +Duplicate the existing integration test to run with both `Fusion.SshBackend.Erlang` and `Fusion.SshBackend.System`. Use a helper to avoid code duplication: + +```elixir +# Add at the bottom of node_manager_integration_test.exs + +for backend <- [Fusion.SshBackend.Erlang, Fusion.SshBackend.System] do + backend_name = backend |> Module.split() |> List.last() + + @tag timeout: 30_000 + test "connect with #{backend_name} backend" do + case skip_unless_ssh_available() do + {:skip, reason} -> + IO.puts("SKIP: #{reason}") + + {:ok, ssh_key} -> + user = System.get_env("USER") + + target = %Target{ + host: "localhost", + port: 22, + username: user, + auth: {:key, ssh_key}, + ssh_backend: unquote(backend) + } + + {:ok, manager} = NodeManager.start_link(target) + + case NodeManager.connect(manager) do + {:ok, remote_node} -> + assert is_atom(remote_node) + assert remote_node in Node.list() + assert NodeManager.disconnect(manager) == :ok + refute remote_node in Node.list() + + {:error, :local_node_not_alive} -> + IO.puts("SKIP: Local node not alive") + + {:error, reason} -> + flunk("Failed with #{unquote(backend_name)}: #{inspect(reason)}") + end + + GenServer.stop(manager) + end + end +end +``` + +**Step 2: Commit** + +```bash +git add test/fusion/node_manager_integration_test.exs +git commit -m "test: add integration tests for both SSH backends" +``` + +--- + +### Task 9: Update README and docs + +**Files:** +- Modify: `README.md` + +**Step 1: Update Requirements section** + +Remove `sshpass` mention, add note about backends: + +```markdown +## Requirements + +- Elixir ~> 1.18 / OTP 28+ +- Remote server with Elixir/Erlang installed +- SSH access (key-based or password) +``` + +**Step 2: Add backend configuration section** + +After the Usage section, add: + +```markdown +### SSH Backend + +Fusion uses Erlang's built-in SSH module by default. No system `ssh` binary required. + +To use the legacy system SSH backend instead: + +\```elixir +target = %Fusion.Target{ + host: "10.0.1.5", + username: "deploy", + auth: {:key, "~/.ssh/id_ed25519"}, + ssh_backend: Fusion.SshBackend.System # uses system ssh/sshpass +} +\``` +``` + +**Step 3: Commit** + +```bash +git add README.md +git commit -m "docs: update README for Erlang SSH backend + +Closes #33" +``` + +--- + +### Task 10: Final verification + +**Step 1: Run full test suite** + +Run: `mix test` +Expected: All tests pass + +**Step 2: Compile with warnings as errors** + +Run: `mix compile --warnings-as-errors` +Expected: Clean compile + +**Step 3: Check formatting** + +Run: `mix format --check-formatted` +Expected: No formatting issues + +**Step 4: Run integration tests if available** + +Run: `elixir --sname test@localhost -S mix test --include integration` +Expected: Integration tests pass with both backends (or skip gracefully) diff --git a/lib/fusion/connector.ex b/lib/fusion/connector.ex index 1300b99..8433666 100644 --- a/lib/fusion/connector.ex +++ b/lib/fusion/connector.ex @@ -60,7 +60,7 @@ defmodule Fusion.Connector do ## Private defp do_start_connector(%__MODULE__{auth: auth, remote: remote} = state) do - origin_node = Net.get_erl_node() + {:ok, origin_node} = Net.get_erl_node() remote_node = gen_remote_node_info(origin_node.host, origin_node.cookie) epmd_port = Net.get_epmd_port() epmd_remote_port = Net.gen_port() diff --git a/lib/fusion/net.ex b/lib/fusion/net.ex index 0a0b4d7..dd445f8 100644 --- a/lib/fusion/net.ex +++ b/lib/fusion/net.ex @@ -5,30 +5,50 @@ defmodule Fusion.Net do @default_epmd_port 4369 - @doc "Generate a random port in the ephemeral range." - def gen_port(start_range \\ 49152, end_range \\ 65535) do - range = end_range - start_range - start_range + :rand.uniform(range) + @doc """ + Generate a random available port in the ephemeral range. + + Briefly binds to port 0 to get an OS-assigned free port. Falls back + to random selection if binding fails. + """ + @spec gen_port() :: pos_integer() + def gen_port do + case :gen_tcp.listen(0, []) do + {:ok, socket} -> + try do + {:ok, port} = :inet.port(socket) + port + after + :gen_tcp.close(socket) + end + + {:error, _} -> + Enum.random(49152..65535) + end end @doc "Get the current node's ErlNode info." - def get_erl_node( - {name, host} \\ node_self(), - port \\ get_self_port_from_epmd(), - cookie \\ Node.get_cookie() - ) do - %ErlNode{name: name, host: host, port: port, cookie: cookie} + @spec get_erl_node({String.t(), String.t()}, atom()) :: + {:ok, ErlNode.t()} | {:error, term()} + def get_erl_node(name_host \\ node_self(), cookie \\ Node.get_cookie()) do + {name, host} = name_host + + with {:ok, port} <- get_self_port_from_epmd(name_host) do + {:ok, %ErlNode{name: name, host: host, port: port, cookie: cookie}} + end end @doc "Query EPMD for the current node's distribution port." - def get_self_port_from_epmd({name, host} \\ node_self()) do - {:port, port, _} = - :erl_epmd.port_please( - String.to_charlist(name), - String.to_charlist(host) - ) - - port + @spec get_self_port_from_epmd({String.t(), String.t()}) :: + {:ok, pos_integer()} | {:error, term()} + def get_self_port_from_epmd(name_host \\ node_self()) do + {name, host} = name_host + + case :erl_epmd.port_please(String.to_charlist(name), String.to_charlist(host)) do + {:port, port, _} -> {:ok, port} + :noport -> {:error, {:epmd_lookup_failed, "node #{name}@#{host} not registered"}} + {:error, reason} -> {:error, {:epmd_lookup_failed, reason}} + end end @doc """ @@ -39,18 +59,53 @@ defmodule Fusion.Net do iex> Fusion.Net.node_self(:"master@my-computer-v3475-ad345") {"master", "my-computer-v3475-ad345"} """ + @spec node_self(atom()) :: {String.t(), String.t()} def node_self(full_name \\ Node.self()) do - full_name |> Atom.to_string() |> String.split("@") |> List.to_tuple() + case full_name |> Atom.to_string() |> String.split("@") do + [name, host] -> + {name, host} + + _ -> + raise ArgumentError, "expected a full node name (name@host), got: #{inspect(full_name)}" + end + end + + @doc """ + Generate `count` unique ports. Prevents self-collision when multiple + ports are needed simultaneously (e.g., for tunnels). + """ + @spec gen_unique_ports(pos_integer()) :: [pos_integer()] + def gen_unique_ports(count) do + do_gen_unique_ports(count, MapSet.new(), []) + end + + defp do_gen_unique_ports(0, _seen, acc), do: Enum.reverse(acc) + + defp do_gen_unique_ports(remaining, seen, acc) do + port = gen_port() + + if MapSet.member?(seen, port) do + do_gen_unique_ports(remaining, seen, acc) + else + do_gen_unique_ports(remaining - 1, MapSet.put(seen, port), [port | acc]) + end end @doc "Get the EPMD port (from ERL_EPMD_PORT env var or default 4369)." + @spec get_epmd_port(String.t() | nil, pos_integer()) :: pos_integer() def get_epmd_port( port_str \\ System.get_env("ERL_EPMD_PORT"), default \\ @default_epmd_port ) do case port_str do - nil -> default - _ -> String.to_integer(port_str) + nil -> + default + + _ -> + case Integer.parse(port_str) do + {port, ""} when port > 0 and port <= 65535 -> port + _ -> raise ArgumentError, "invalid ERL_EPMD_PORT: #{inspect(port_str)}" + end end end end diff --git a/lib/fusion/node_manager.ex b/lib/fusion/node_manager.ex index 933c677..a3e8b49 100644 --- a/lib/fusion/node_manager.ex +++ b/lib/fusion/node_manager.ex @@ -3,7 +3,7 @@ defmodule Fusion.NodeManager do GenServer that bootstraps and connects a remote BEAM node via SSH. The NodeManager: - 1. Uses SSH tunnels (via Connector) to bridge Erlang distribution + 1. Uses a pluggable SSH backend (via `target.ssh_backend`) for connections and tunnels 2. Starts a remote Erlang node via SSH with correct EPMD port and cookie 3. Connects the remote node to the local cluster 4. Provides lifecycle management (connect/disconnect) @@ -13,24 +13,26 @@ defmodule Fusion.NodeManager do require Logger alias Fusion.Target - alias Fusion.Utilities.Ssh - alias Fusion.Utilities.Exec alias Fusion.Net @connect_timeout 15_000 @connect_retry_interval 500 + @tunnel_retry_attempts 10 + @tunnel_retry_interval 500 @default_elixir_path "/usr/bin/env elixir" + @tunnel_connect_host "127.0.0.1" defstruct target: nil, status: :disconnected, remote_node_name: nil, - remote_os_pid: nil, - remote_port: nil, - epmd_tunnel_port: nil, - node_tunnel_port: nil, - tunnel_ports: [] + conn: nil - @type t :: %__MODULE__{} + @type t :: %__MODULE__{ + target: Target.t() | nil, + status: :disconnected | :connected, + remote_node_name: atom() | nil, + conn: term() | nil + } ## Public API @@ -50,7 +52,7 @@ defmodule Fusion.NodeManager do @doc "Disconnect from the remote node and clean up." def disconnect(server) do - GenServer.call(server, :disconnect) + GenServer.call(server, :disconnect, 60_000) end @doc "Get the remote node name (atom) if connected." @@ -67,6 +69,16 @@ defmodule Fusion.NodeManager do @impl true def init(%Target{} = target) do + backend = target.ssh_backend + + unless Code.ensure_loaded?(backend) and + Enum.all?(Fusion.SshBackend.behaviour_info(:callbacks), fn {fun, arity} -> + function_exported?(backend, fun, arity) + end) do + raise ArgumentError, + "#{inspect(backend)} does not implement the Fusion.SshBackend behaviour" + end + {:ok, %__MODULE__{target: target}} end @@ -81,46 +93,40 @@ defmodule Fusion.NodeManager do end end + @impl true def handle_call(:connect, _from, %{status: :connected} = state) do {:reply, {:ok, state.remote_node_name}, state} end + @impl true def handle_call(:disconnect, _from, %{status: :connected} = state) do new_state = do_disconnect(state) {:reply, :ok, new_state} end + @impl true def handle_call(:disconnect, _from, %{status: :disconnected} = state) do {:reply, :ok, state} end + @impl true def handle_call(:remote_node, _from, state) do {:reply, state.remote_node_name, state} end + @impl true def handle_call(:status, _from, state) do {:reply, state.status, state} end @impl true - def handle_info({port, {:data, _data}}, state) when is_port(port) do - {:noreply, state} - end - - def handle_info({port, {:exit_status, _code}}, state) when is_port(port) do - if port in state.tunnel_ports do - Logger.warning("Tunnel process exited unexpectedly") - {:noreply, %{state | status: :disconnected}} - else - {:noreply, state} - end - end - - def handle_info({:nodedown, node}, %{remote_node_name: node} = state) do + def handle_info({:nodedown, node}, %{remote_node_name: node, status: :connected} = state) do Logger.warning("Remote node #{node} went down") - {:noreply, %{state | status: :disconnected}} + new_state = do_disconnect(state) + {:noreply, new_state} end + @impl true def handle_info(_msg, state) do {:noreply, state} end @@ -131,91 +137,138 @@ defmodule Fusion.NodeManager do :ok end + @impl true def terminate(_reason, _state), do: :ok ## Private defp do_connect(state) do target = state.target - {auth, remote} = Target.to_auth_and_spot(target) + backend = target.ssh_backend + + with {:ok, local_node} <- ensure_local_node_alive() do + [remote_node_port, epmd_tunnel_port] = Net.gen_unique_ports(2) + + ports = %{ + epmd: Net.get_epmd_port(), + remote_node: remote_node_port, + epmd_tunnel: epmd_tunnel_port + } - local_node = ensure_local_node_alive!() - epmd_port = Net.get_epmd_port() - remote_node_port = Net.gen_port() - epmd_tunnel_port = Net.gen_port() + remote_node_name = gen_remote_node_name(target.host) - remote_node_name = gen_remote_node_name(remote.host) + Logger.info("Connecting to #{target.host}:#{target.port} as #{target.username}") - Logger.info("Connecting to #{remote.host}:#{remote.port} as #{target.username}") + with {:ok, conn} <- backend.connect(target) do + case do_connect_with_conn(state, backend, conn, local_node, remote_node_name, ports) do + {:ok, _} = success -> + success - with {:ok, tunnel_ports} <- - setup_tunnels(auth, remote, local_node, epmd_port, remote_node_port, epmd_tunnel_port), - {:ok, remote_port, remote_os_pid} <- - start_remote_node(auth, remote, remote_node_name, epmd_tunnel_port, remote_node_port), + {:error, _} = error -> + cleanup_failed_connect(backend, conn, remote_node_name) + error + end + end + end + end + + defp do_connect_with_conn(state, backend, conn, local_node, remote_node_name, ports) do + with :ok <- setup_tunnels(backend, conn, local_node, ports), + :ok <- launch_remote_node(backend, conn, remote_node_name, ports), :ok <- wait_for_connection(remote_node_name, @connect_timeout) do Logger.info("Connected to remote node #{remote_node_name}") Node.monitor(remote_node_name, true) - {:ok, - %{ - state - | status: :connected, - remote_node_name: remote_node_name, - remote_os_pid: remote_os_pid, - remote_port: remote_port, - epmd_tunnel_port: epmd_tunnel_port, - node_tunnel_port: nil, - tunnel_ports: tunnel_ports - }} + {:ok, %{state | status: :connected, remote_node_name: remote_node_name, conn: conn}} end end - defp setup_tunnels(auth, remote, local_node, epmd_port, remote_node_port, epmd_tunnel_port) do - ports = [] - - # Reverse tunnel: make local node's distribution port accessible on remote - {:ok, p1, _} = - Ssh.cmd_port_tunnel( - auth, - remote, - local_node.port, - %Fusion.Net.Spot{host: "localhost", port: local_node.port}, - :reverse - ) - |> Exec.capture_std_mon() - - # Forward tunnel: make remote node's distribution port accessible locally - {:ok, p2, _} = - Ssh.cmd_port_tunnel( - auth, - remote, - remote_node_port, - %Fusion.Net.Spot{host: "localhost", port: remote_node_port}, - :forward - ) - |> Exec.capture_std_mon() - - # Reverse tunnel: make local EPMD accessible on remote via tunneled port - {:ok, p3, _} = - Ssh.cmd_port_tunnel( - auth, - remote, - epmd_tunnel_port, - %Fusion.Net.Spot{host: "localhost", port: epmd_port}, - :reverse - ) - |> Exec.capture_std_mon() + defp setup_tunnels(backend, conn, local_node, ports) do + with {:ok, _} <- + retry_tunnel(fn -> + backend.reverse_tunnel(conn, local_node.port, @tunnel_connect_host, local_node.port) + end), + {:ok, _} <- + backend.forward_tunnel( + conn, + ports.remote_node, + @tunnel_connect_host, + ports.remote_node + ), + {:ok, _} <- + retry_tunnel(fn -> + backend.reverse_tunnel(conn, ports.epmd_tunnel, @tunnel_connect_host, ports.epmd) + end) do + :ok + end + end + + # Retry tunnel setup to handle transient :not_accepted errors that occur when + # a previous SSH connection's tunnel listener hasn't been fully released yet. + defp retry_tunnel(fun, attempt \\ 1) do + case fun.() do + {:ok, _} = ok -> + ok + + {:error, :not_accepted} when attempt < @tunnel_retry_attempts -> + Logger.debug("Tunnel not accepted, retrying (#{attempt}/#{@tunnel_retry_attempts})") + Process.sleep(@tunnel_retry_interval) + retry_tunnel(fun, attempt + 1) + + error -> + error + end + end + + defp launch_remote_node(backend, conn, remote_node_name, ports) do + cmd = build_remote_node_cmd(remote_node_name, ports.epmd_tunnel, ports.remote_node) + + case backend.exec_async(conn, cmd) do + {:ok, _} -> :ok + {:error, _} = error -> error + end + end - {:ok, [p1, p2, p3 | ports]} + defp cleanup_failed_connect(backend, conn, remote_node_name) do + kill_remote_process(backend, conn, remote_node_name) + safe_call(fn -> backend.close(conn) end) end - defp start_remote_node(auth, remote, node_name, epmd_port, node_port) do - cmd = build_remote_node_cmd(node_name, epmd_port, node_port) - remote_cmd = Ssh.cmd_remote(cmd, auth, remote) - {:ok, port, os_pid} = Exec.capture_std_mon(remote_cmd) - {:ok, port, os_pid} + # Kill the remote BEAM process via SSH exec. Used as cleanup when the + # distribution layer is unavailable. remote_node_name is internally generated + # (not user input), so shell interpolation here is safe. + defp kill_remote_process(backend, conn, remote_node_name) do + safe_call(fn -> + backend.exec( + conn, + "kill -9 $(pgrep -f -- '--sname #{remote_node_name}') 2>/dev/null || true" + ) + end) + end + + defp safe_call(fun) do + try do + fun.() + rescue + e -> + Logger.debug("Cleanup call failed: #{inspect(e)}") + :ok + catch + _, reason -> + Logger.debug("Cleanup call failed: #{inspect(reason)}") + :ok + end end + # Extension point: if you need alternate remote node types (e.g., rebar3-based + # Erlang nodes), this is the function to replace. + # + # All values interpolated into this command (cookie, node_name, ports) come + # from trusted internal sources — cookie from Node.get_cookie(), node_name + # from gen_remote_node_name/1, and ports from Net.gen_port/0. + # + # Note: the cookie is visible via `ps aux` on the remote host. For multi-tenant + # environments, consider using ~/.erlang.cookie instead. defp build_remote_node_cmd(node_name, epmd_port, node_port) do cookie = Node.get_cookie() @@ -223,7 +276,7 @@ defmodule Fusion.NodeManager do "ERL_EPMD_PORT=#{epmd_port}", @default_elixir_path, "--sname #{node_name}", - "--cookie #{cookie}", + "--cookie '#{cookie}'", "--erl \"-kernel inet_dist_listen_min #{node_port} inet_dist_listen_max #{node_port}\"", "-e \"Process.sleep(:infinity)\"" ] @@ -254,44 +307,45 @@ defmodule Fusion.NodeManager do end defp do_disconnect(state) do - # Disconnect from the remote node + backend = state.target.ssh_backend + if state.remote_node_name do - Node.disconnect(state.remote_node_name) - end + # Stop monitoring before teardown to prevent :nodedown re-entering do_disconnect + safe_call(fn -> Node.monitor(state.remote_node_name, false) end) - # Kill the remote Elixir process - if state.remote_os_pid do - kill_remote_process(state) - end + # Request graceful shutdown via async RPC (cast won't block if remote is hung) + safe_call(fn -> :rpc.cast(state.remote_node_name, System, :stop, [0]) end) + safe_call(fn -> Node.disconnect(state.remote_node_name) end) - # Close tunnel ports - for port <- state.tunnel_ports, Port.info(port) != nil do - Port.close(port) + # Belt-and-suspenders: always kill via SSH as well, since rpc.cast is + # fire-and-forget and we cannot know if the graceful shutdown succeeded. + if state.conn do + kill_remote_process(backend, state.conn, state.remote_node_name) + end end - %{state | status: :disconnected, remote_node_name: nil, tunnel_ports: []} - end + # Close SSH connection + if state.conn do + safe_call(fn -> backend.close(state.conn) end) + end - defp kill_remote_process(state) do - {auth, remote} = Target.to_auth_and_spot(state.target) - kill_cmd = "kill #{state.remote_os_pid} 2>/dev/null || true" - Ssh.cmd_remote(kill_cmd, auth, remote) |> Exec.run_sync_capture_std() - rescue - _ -> :ok + %{state | status: :disconnected, remote_node_name: nil, conn: nil} end - defp ensure_local_node_alive! do - unless Node.alive?() do - raise "Local node must be alive (started with --sname or --name) to use Fusion" + defp ensure_local_node_alive do + if Node.alive?() do + Net.get_erl_node() + else + {:error, :local_node_not_alive} end - - Net.get_erl_node() end - defp gen_remote_node_name(_host) do - # Always use @localhost because all distribution traffic goes through SSH tunnels - # that bind on localhost. Using the actual remote hostname would bypass the tunnels. - id = :rand.uniform(999_999) |> Integer.to_string() |> String.pad_leading(6, "0") - :"fusion_worker_#{id}@localhost" + # Always use @localhost because all distribution traffic goes through SSH tunnels + # that bind on localhost. Using the actual remote hostname would bypass the tunnels. + # The host label is included for debuggability when multiple connections exist. + defp gen_remote_node_name(host) do + id = :rand.bytes(8) |> Base.encode16(case: :lower) + label = host |> String.replace(~r/[^a-zA-Z0-9]/, "_") |> String.slice(0, 20) + :"fusion_#{label}_#{id}@localhost" end end diff --git a/lib/fusion/ssh_backend.ex b/lib/fusion/ssh_backend.ex new file mode 100644 index 0000000..a031b24 --- /dev/null +++ b/lib/fusion/ssh_backend.ex @@ -0,0 +1,49 @@ +defmodule Fusion.SshBackend do + @moduledoc """ + Behaviour for SSH backends. + + Fusion supports pluggable SSH implementations. The default is + `Fusion.SshBackend.Erlang` which uses OTP's built-in :ssh module. + The legacy `Fusion.SshBackend.System` shells out to the system ssh binary. + """ + + @type conn :: term() + @type target :: Fusion.Target.t() + + @doc "Open an SSH connection to the target." + @callback connect(target()) :: {:ok, conn()} | {:error, term()} + + @doc "Create a forward tunnel (local listen port -> remote host:port)." + @callback forward_tunnel(conn(), non_neg_integer(), String.t(), non_neg_integer()) :: + {:ok, non_neg_integer()} | {:error, term()} + + @doc "Create a reverse tunnel (remote listen port -> local host:port)." + @callback reverse_tunnel(conn(), non_neg_integer(), String.t(), non_neg_integer()) :: + {:ok, non_neg_integer()} | {:error, term()} + + @type exec_error :: + {:exit_status, integer(), stdout :: String.t(), stderr :: String.t()} + | :exec_failed + | :timeout + | :output_exceeded_limit + + @doc """ + Execute a command on the remote host synchronously. Returns stdout on success. + + Non-zero exits return `{:error, {:exit_status, code, stdout, stderr}}`. + """ + @callback exec(conn(), String.t()) :: {:ok, String.t()} | {:error, exec_error()} + + @doc """ + Execute a command on the remote host asynchronously (fire-and-forget). + + Returns `{:ok, pid}` where `pid` is the process handling the async command. + The caller should not monitor or interact with this pid — it exists only to + confirm the command was launched. Output and exit status are discarded. + Use `exec/2` if you need results. + """ + @callback exec_async(conn(), String.t()) :: {:ok, pid()} | {:error, term()} + + @doc "Close the SSH connection." + @callback close(conn()) :: :ok +end diff --git a/lib/fusion/ssh_backend/erlang.ex b/lib/fusion/ssh_backend/erlang.ex new file mode 100644 index 0000000..5652e3d --- /dev/null +++ b/lib/fusion/ssh_backend/erlang.ex @@ -0,0 +1,207 @@ +defmodule Fusion.SshBackend.Erlang do + @moduledoc """ + SSH backend using Erlang's built-in :ssh module. + + This is the default backend. It uses OTP's SSH implementation + for connections, tunnels, and remote command execution. + + ## Security + + Host key verification is disabled (`silently_accept_hosts: true`). + All remote host keys are accepted without checking, equivalent to + `StrictHostKeyChecking=no` in OpenSSH. This means connections are + vulnerable to MITM attacks. This is acceptable for trusted networks + and development, but should be noted for production deployments. + """ + + require Logger + + @behaviour Fusion.SshBackend + + @localhost_bind ~c"127.0.0.1" + @connect_timeout 15_000 + @tunnel_timeout 15_000 + # Idle timeout: resets on each received message, not a wall-clock limit + @exec_timeout 30_000 + @exec_async_timeout 300_000 + @idle_timeout 120_000 + @max_output_bytes 10_485_760 + + @impl true + def connect(%Fusion.Target{} = target) do + Logger.debug("SSH connecting to #{target.host}:#{target.port} as #{target.username}") + + host = String.to_charlist(target.host) + opts = connect_opts(target) + + case :ssh.connect(host, target.port, opts, @connect_timeout) do + {:ok, conn} -> + Logger.debug("SSH connected to #{target.host}:#{target.port}") + {:ok, conn} + + {:error, reason} -> + Logger.warning( + "SSH connection to #{target.host}:#{target.port} failed: #{inspect(reason)}" + ) + + {:error, reason} + end + end + + @impl true + def forward_tunnel(conn, listen_port, connect_host, connect_port) do + Logger.debug( + "Creating forward tunnel: localhost:#{listen_port} -> #{connect_host}:#{connect_port}" + ) + + :ssh.tcpip_tunnel_to_server( + conn, + @localhost_bind, + listen_port, + String.to_charlist(connect_host), + connect_port, + @tunnel_timeout + ) + end + + @impl true + def reverse_tunnel(conn, listen_port, connect_host, connect_port) do + Logger.debug( + "Creating reverse tunnel: remote:#{listen_port} -> #{connect_host}:#{connect_port}" + ) + + :ssh.tcpip_tunnel_from_server( + conn, + @localhost_bind, + listen_port, + String.to_charlist(connect_host), + connect_port, + @tunnel_timeout + ) + end + + @impl true + def exec(conn, command) do + with {:ok, ch} <- :ssh_connection.session_channel(conn, @exec_timeout), + :success <- :ssh_connection.exec(conn, ch, String.to_charlist(command), @exec_timeout) do + collect_output(conn, ch, <<>>, <<>>, nil) + else + :failure -> + Logger.warning("SSH exec failed: :exec_failed") + {:error, :exec_failed} + + {:error, reason} -> + Logger.warning("SSH exec failed: #{inspect(reason)}") + {:error, reason} + end + end + + @impl true + def exec_async(conn, command) do + pid = + spawn(fn -> + case :ssh_connection.session_channel(conn, @exec_timeout) do + {:ok, ch} -> + case :ssh_connection.exec(conn, ch, String.to_charlist(command), @exec_timeout) do + :success -> + ref = Process.monitor(conn) + + receive do + {:ssh_cm, ^conn, {:closed, ^ch}} -> + Process.demonitor(ref, [:flush]) + + {:DOWN, ^ref, :process, ^conn, _reason} -> + :ok + after + @exec_async_timeout -> + Process.demonitor(ref, [:flush]) + :ssh_connection.close(conn, ch) + Logger.warning("SSH exec_async timed out after #{@exec_async_timeout}ms") + end + + :failure -> + :ssh_connection.close(conn, ch) + Logger.warning("SSH exec_async failed: exec returned :failure") + end + + {:error, reason} -> + Logger.warning("SSH exec_async failed to open channel: #{inspect(reason)}") + end + end) + + {:ok, pid} + end + + @impl true + def close(conn) do + try do + :ssh.close(conn) + Logger.debug("SSH connection closed") + rescue + e -> Logger.debug("SSH close failed: #{inspect(e)}") + catch + _, reason -> Logger.debug("SSH close failed: #{inspect(reason)}") + end + + :ok + end + + defp connect_opts(%Fusion.Target{} = target) do + base_opts = [ + user: String.to_charlist(target.username), + silently_accept_hosts: true, + user_interaction: false, + idle_time: @idle_timeout + ] + + auth_opts = + case target.auth do + {:password, password} -> + [password: String.to_charlist(password)] + + {:key, key_path} -> + [key_cb: {Fusion.SshKeyProvider, key_path: key_path}] + end + + base_opts ++ auth_opts + end + + defp collect_output(conn, ch, stdout, stderr, exit_code) do + if byte_size(stdout) + byte_size(stderr) > @max_output_bytes do + Logger.warning("SSH exec output exceeded #{@max_output_bytes} bytes, closing channel") + :ssh_connection.close(conn, ch) + {:error, :output_exceeded_limit} + else + do_collect_output(conn, ch, stdout, stderr, exit_code) + end + end + + defp do_collect_output(conn, ch, stdout, stderr, exit_code) do + receive do + {:ssh_cm, ^conn, {:data, ^ch, 0, data}} -> + collect_output(conn, ch, stdout <> data, stderr, exit_code) + + {:ssh_cm, ^conn, {:data, ^ch, 1, data}} -> + collect_output(conn, ch, stdout, stderr <> data, exit_code) + + {:ssh_cm, ^conn, {:eof, ^ch}} -> + collect_output(conn, ch, stdout, stderr, exit_code) + + {:ssh_cm, ^conn, {:exit_status, ^ch, code}} -> + collect_output(conn, ch, stdout, stderr, code) + + {:ssh_cm, ^conn, {:closed, ^ch}} -> + case exit_code do + 0 -> {:ok, stdout} + # Some SSH servers close the channel without sending exit_status. + # Treat this as success since we received all the output. + nil -> {:ok, stdout} + code -> {:error, {:exit_status, code, stdout, stderr}} + end + after + @exec_timeout -> + :ssh_connection.close(conn, ch) + {:error, :timeout} + end + end +end diff --git a/lib/fusion/ssh_backend/system.ex b/lib/fusion/ssh_backend/system.ex new file mode 100644 index 0000000..ce27885 --- /dev/null +++ b/lib/fusion/ssh_backend/system.ex @@ -0,0 +1,162 @@ +defmodule Fusion.SshBackend.System do + @moduledoc """ + SSH backend that shells out to the system `ssh` and `sshpass` binaries. + + This is the legacy backend. Use `Fusion.SshBackend.Erlang` (the default) + for a pure-Erlang implementation with no system binary dependencies. + """ + + @behaviour Fusion.SshBackend + + require Logger + + alias Fusion.Utilities.Ssh + alias Fusion.Utilities.Exec + alias Fusion.Net.Spot + + @drain_timeout 300_000 + + defmodule Conn do + @moduledoc false + @enforce_keys [:auth, :remote, :resource_tracker] + defstruct [:auth, :remote, :resource_tracker] + + defimpl Inspect do + def inspect(%{auth: auth, remote: remote}, opts) do + redacted_auth = + case auth do + %{password: _} = a -> %{a | password: "**REDACTED**"} + other -> other + end + + Inspect.Algebra.concat([ + "#Fusion.SshBackend.System.Conn<", + Inspect.Algebra.to_doc(%{auth: redacted_auth, remote: remote}, opts), + ">" + ]) + end + end + end + + # Note: unlike the Erlang backend, connect/1 does not establish a TCP connection. + # Connection validation is deferred until the first tunnel or exec call. + @impl true + def connect(%Fusion.Target{} = target) do + {auth, remote} = to_auth_and_spot(target) + {:ok, tracker} = Agent.start_link(fn -> [] end) + {:ok, %Conn{auth: auth, remote: remote, resource_tracker: tracker}} + end + + defp to_auth_and_spot(%Fusion.Target{} = target) do + auth = + case target.auth do + {:key, path} -> %{username: target.username, key_path: path} + {:password, pass} -> %{username: target.username, password: pass} + end + + remote = %Spot{host: target.host, port: target.port} + {auth, remote} + end + + @impl true + def forward_tunnel(%Conn{} = conn, listen_port, connect_host, connect_port) do + do_tunnel(conn, listen_port, connect_host, connect_port, :forward) + end + + @impl true + def reverse_tunnel(%Conn{} = conn, listen_port, connect_host, connect_port) do + do_tunnel(conn, listen_port, connect_host, connect_port, :reverse) + end + + defp do_tunnel(conn, listen_port, connect_host, connect_port, direction) do + to_spot = %Spot{host: connect_host, port: connect_port} + cmd = Ssh.cmd_port_tunnel(conn.auth, conn.remote, listen_port, to_spot, direction) + + case Exec.capture_std_mon(cmd, env: password_env(conn.auth)) do + {:ok, port, os_pid} -> + Agent.update(conn.resource_tracker, &[{port, os_pid} | &1]) + {:ok, listen_port} + + error -> + error + end + end + + @impl true + def exec(%Conn{} = conn, command) do + cmd = Ssh.cmd_remote(command, conn.auth, conn.remote) + + case Exec.run_sync_capture_std(cmd, env: password_env(conn.auth)) do + {:ok, output} -> {:ok, output} + {:error, {code, output}} -> {:error, {:exit_status, code, output, ""}} + end + end + + @impl true + def exec_async(%Conn{} = conn, command) do + cmd = Ssh.cmd_remote(command, conn.auth, conn.remote) + + port_env = + Enum.map(password_env(conn.auth), fn {k, v} -> + {String.to_charlist(k), String.to_charlist(v)} + end) + + pid = + spawn(fn -> + port = + Port.open({:spawn_executable, "/bin/sh"}, [ + :binary, + :exit_status, + :stderr_to_stdout, + {:args, ["-c", cmd]}, + {:env, port_env} + ]) + + drain_port(port) + end) + + {:ok, pid} + end + + defp drain_port(port) do + receive do + {^port, {:data, _data}} -> drain_port(port) + {^port, {:exit_status, _code}} -> :ok + after + @drain_timeout -> + Port.close(port) + Logger.warning("System exec_async drain_port timed out after #{@drain_timeout}ms") + end + end + + defp password_env(%{password: password}), do: [{"SSHPASS", password}] + defp password_env(_auth), do: [] + + @impl true + def close(%Conn{resource_tracker: tracker} = _conn) do + resources = Agent.get(tracker, & &1) + + # Close ports (sends SIGHUP), letting SSH processes cleanly disconnect. + # A clean disconnect sends SSH_MSG_DISCONNECT so the remote sshd + # immediately releases tunnel listeners. SIGKILL would skip this, + # leaving the remote holding stale listeners until TCP keepalive fires. + for {port, _os_pid} <- resources do + try do + Port.close(port) + catch + _, _ -> :ok + end + end + + # Brief wait for clean SSH disconnects to complete + Process.sleep(200) + + # SIGTERM any stragglers (still allows clean shutdown unlike SIGKILL) + for {_port, os_pid} <- resources do + System.cmd("kill", [to_string(os_pid)], stderr_to_stdout: true) + end + + Agent.stop(tracker) + :ok + end +end diff --git a/lib/fusion/ssh_key_provider.ex b/lib/fusion/ssh_key_provider.ex new file mode 100644 index 0000000..8cc99a0 --- /dev/null +++ b/lib/fusion/ssh_key_provider.ex @@ -0,0 +1,177 @@ +defmodule Fusion.SshKeyProvider do + @moduledoc """ + Custom SSH key callback that loads a specific key file by path. + + Implements the `:ssh_client_key_api` behaviour so that Fusion can + use `{:key, "/path/to/specific/key"}` auth with Erlang's :ssh module. + + ## Security + + Host key verification is disabled — `is_host_key/4,5` always returns `true`. + This is equivalent to `StrictHostKeyChecking=no` in OpenSSH. + """ + + @behaviour :ssh_client_key_api + + require Logger + + # RFC 8410 OIDs for EdDSA curves + @oid_ed25519 {1, 3, 101, 112} + @oid_ed448 {1, 3, 101, 113} + + @impl true + def is_host_key(_key, _host, _port, _algorithm, _opts) do + # Accept all host keys (equivalent to StrictHostKeyChecking=no) + true + end + + @impl true + def is_host_key(_key, _host, _algorithm, _opts) do + true + end + + @impl true + def add_host_key(_host, _port, _key, _opts) do + :ok + end + + @impl true + def add_host_key(_host, _key, _opts) do + :ok + end + + @impl true + def user_key(algorithm, opts) do + # Erlang SSH wraps key_cb options under :key_cb_private + private_opts = Keyword.get(opts, :key_cb_private, opts) + key_path = Keyword.fetch!(private_opts, :key_path) + + case File.read(key_path) do + {:ok, key_data} -> + case decode_private_key(key_data) do + {:ok, key} -> + expected = key_type_for_algorithm(algorithm) + actual = key_type(key) + + # nil means unknown algorithm — skip the check rather than false-positive + if expected != nil and expected != actual do + Logger.debug( + "SSH key type mismatch: requested #{algorithm} but key at #{key_path} is #{actual}" + ) + + {:error, :key_type_mismatch} + else + {:ok, key} + end + + error -> + error + end + + {:error, reason} -> + {:error, {:file_read_error, key_path, reason}} + end + end + + defp decode_private_key(data) do + # Try OpenSSH key v1 format first (modern ssh-keygen default), + # then fall back to PEM format for older keys. + # Short-circuit on definitive errors like :encrypted_key. + case decode_openssh_key(data) do + {:ok, _} = ok -> ok + {:error, :encrypted_key} = err -> err + {:error, _} -> fallback_decode_pem_key(data) + end + end + + defp fallback_decode_pem_key(data) do + case decode_pem_key(data) do + {:ok, _} = ok -> ok + {:error, :encrypted_key} = err -> err + {:error, _} -> {:error, :unsupported_key_format} + end + end + + defp decode_openssh_key(data) do + case :ssh_file.decode(data, :openssh_key_v1) do + [{key, _attrs} | _] -> + {:ok, key} + + _ -> + # Decode failed — check if the key is encrypted (cipher != "none"). + # OpenSSH v1 keys embed encryption info in the binary, not the PEM wrapper. + if openssh_key_encrypted?(data), + do: {:error, :encrypted_key}, + else: {:error, :openssh_decode_failed} + end + rescue + e -> + Logger.debug("OpenSSH key decode raised: #{inspect(e)}") + {:error, :openssh_decode_failed} + catch + _, reason -> + Logger.debug("OpenSSH key decode threw: #{inspect(reason)}") + {:error, :openssh_decode_failed} + end + + # Check if an OpenSSH v1 key is encrypted by inspecting the cipher field. + # Format: "openssh-key-v1\0" + uint32 cipher_len + cipher_name + ... + # Unencrypted keys use cipher "none". + defp openssh_key_encrypted?(pem_data) do + case :public_key.pem_decode(pem_data) do + [{_type, der, _info} | _] -> + case der do + <<"openssh-key-v1", 0, len::32, cipher::binary-size(len), _::binary>> -> + cipher != "none" + + _ -> + false + end + + _ -> + false + end + end + + defp decode_pem_key(data) do + case :public_key.pem_decode(data) do + [{_type, _der, :not_encrypted} = entry | _] -> + key = :public_key.pem_entry_decode(entry) + {:ok, key} + + [{_type, _der, _encryption_info} | _] -> + {:error, :encrypted_key} + + _ -> + {:error, :pem_decode_failed} + end + rescue + e -> + Logger.debug("PEM key decode raised: #{inspect(e)}") + {:error, :pem_decode_failed} + catch + _, reason -> + Logger.debug("PEM key decode threw: #{inspect(reason)}") + {:error, :pem_decode_failed} + end + + defp key_type_for_algorithm(:"ssh-ed25519"), do: :ed25519 + defp key_type_for_algorithm(:"ssh-ed448"), do: :ed448 + defp key_type_for_algorithm(:"ssh-rsa"), do: :rsa + defp key_type_for_algorithm(:"rsa-sha2-256"), do: :rsa + defp key_type_for_algorithm(:"rsa-sha2-512"), do: :rsa + defp key_type_for_algorithm(:"ecdsa-sha2-nistp256"), do: :ecdsa + defp key_type_for_algorithm(:"ecdsa-sha2-nistp384"), do: :ecdsa + defp key_type_for_algorithm(:"ecdsa-sha2-nistp521"), do: :ecdsa + defp key_type_for_algorithm(_), do: nil + + defp key_type({:ed_pri, :ed25519, _, _}), do: :ed25519 + defp key_type({:ed_pri, :ed448, _, _}), do: :ed448 + # OTP 28 may represent Ed25519/Ed448 as ECPrivateKey with namedCurve OIDs + defp key_type({:ECPrivateKey, _, _, {:namedCurve, @oid_ed25519}, _, _}), do: :ed25519 + defp key_type({:ECPrivateKey, _, _, {:namedCurve, @oid_ed448}, _, _}), do: :ed448 + # ECDSA curves (NIST P-256/P-384/P-521) + defp key_type({:ECPrivateKey, _, _, {:namedCurve, _}, _, _}), do: :ecdsa + defp key_type({:RSAPrivateKey, _, _, _, _, _, _, _, _, _, _}), do: :rsa + defp key_type(_), do: :unknown +end diff --git a/lib/fusion/target.ex b/lib/fusion/target.ex index 6b431f2..7ea67c8 100644 --- a/lib/fusion/target.ex +++ b/lib/fusion/target.ex @@ -1,30 +1,27 @@ defmodule Fusion.Target do @moduledoc "Represents an SSH connection target." - defstruct host: nil, port: 22, username: nil, auth: nil + @enforce_keys [:host, :username, :auth] + defstruct [:host, :username, :auth, port: 22, ssh_backend: Fusion.SshBackend.Erlang] @type auth :: {:key, String.t()} | {:password, String.t()} @type t :: %__MODULE__{ host: String.t(), - port: non_neg_integer(), + port: pos_integer(), username: String.t(), - auth: auth() + auth: auth(), + ssh_backend: module() } - @doc """ - Converts a Target into the legacy auth/remote format used by tunnel modules. + defimpl Inspect do + def inspect(%{auth: {:password, _}} = target, opts) do + redacted = %{target | auth: {:password, "**REDACTED**"}} + Inspect.Any.inspect(redacted, opts) + end - Returns `{auth_map, %Fusion.Net.Spot{}}`. - """ - def to_auth_and_spot(%__MODULE__{} = target) do - auth = - case target.auth do - {:key, path} -> %{username: target.username, key_path: path} - {:password, pass} -> %{username: target.username, password: pass} - end - - remote = %Fusion.Net.Spot{host: target.host, port: target.port} - {auth, remote} + def inspect(target, opts) do + Inspect.Any.inspect(target, opts) + end end end diff --git a/lib/fusion/utilities/bash.ex b/lib/fusion/utilities/bash.ex index 34eaefa..7e91bd8 100644 --- a/lib/fusion/utilities/bash.ex +++ b/lib/fusion/utilities/bash.ex @@ -4,14 +4,21 @@ defmodule Fusion.Utilities.Bash do @doc ~S""" Escapes a string for use inside double-quoted shell strings. + Escapes backslashes, double quotes, dollar signs, and backticks. + ## Examples iex> Fusion.Utilities.Bash.escape_str(~s(echo "$response")) ~s(echo \\"\\$response\\") + + iex> Fusion.Utilities.Bash.escape_str(~s(a\\b`c`)) + ~s(a\\\\b\\`c\\`) """ def escape_str(str) do str + |> String.replace("\\", "\\\\") |> String.replace("\"", "\\\"") |> String.replace("$", "\\$") + |> String.replace("`", "\\`") end end diff --git a/lib/fusion/utilities/exec.ex b/lib/fusion/utilities/exec.ex index e9da39a..2729185 100644 --- a/lib/fusion/utilities/exec.ex +++ b/lib/fusion/utilities/exec.ex @@ -9,12 +9,17 @@ defmodule Fusion.Utilities.Exec do - `{port, {:data, data}}` for stdout output - `{port, {:exit_status, status}}` when the process exits """ - def capture_std_mon(cmd) do + def capture_std_mon(cmd, opts \\ []) do + env = Keyword.get(opts, :env, []) + port_env = Enum.map(env, fn {k, v} -> {String.to_charlist(k), String.to_charlist(v)} end) + port = - Port.open({:spawn, cmd}, [ + Port.open({:spawn_executable, "/bin/sh"}, [ :binary, :exit_status, - :stderr_to_stdout + :stderr_to_stdout, + {:args, ["-c", cmd]}, + {:env, port_env} ]) {:os_pid, os_pid} = Port.info(port, :os_pid) @@ -25,8 +30,10 @@ defmodule Fusion.Utilities.Exec do Runs a command synchronously and captures output. Returns `{:ok, output}` or `{:error, {exit_code, output}}`. """ - def run_sync_capture_std(cmd) do - case System.cmd("/bin/sh", ["-c", cmd], stderr_to_stdout: true) do + def run_sync_capture_std(cmd, opts \\ []) do + env = Keyword.get(opts, :env, []) + + case System.cmd("/bin/sh", ["-c", cmd], stderr_to_stdout: true, env: env) do {output, 0} -> {:ok, output} {output, code} -> {:error, {code, output}} end diff --git a/lib/fusion/utilities/ssh.ex b/lib/fusion/utilities/ssh.ex index 7216162..1e6b009 100644 --- a/lib/fusion/utilities/ssh.ex +++ b/lib/fusion/utilities/ssh.ex @@ -23,13 +23,16 @@ defmodule Fusion.Utilities.Ssh do @doc """ Generate sshpass prefix for password authentication. + Uses `-e` flag to read password from SSHPASS environment variable, + avoiding shell injection and password exposure in `ps` output. + ## Examples - iex> Fusion.Utilities.Ssh.partial_cmd_sshpass("abcd1234!", "/usr/bin/sshpass") - "/usr/bin/sshpass -p abcd1234!" + iex> Fusion.Utilities.Ssh.partial_cmd_sshpass("/usr/bin/sshpass") + "/usr/bin/sshpass -e" """ - def partial_cmd_sshpass(password, sshpass_path \\ @default_sshpass_path) do - "#{sshpass_path} -p #{password}" + def partial_cmd_sshpass(sshpass_path \\ @default_sshpass_path) do + "#{sshpass_path} -e" end @doc """ @@ -59,24 +62,27 @@ defmodule Fusion.Utilities.Ssh do @doc """ Generate a full SSH command string. + For password auth, the caller must set the `SSHPASS` environment variable + before executing the returned command. + ## Examples iex> Fusion.Utilities.Ssh.cmd("-nNT -R 3001:localhost:3002", %{username: "john", password: "abcd1234"}, ...> %Fusion.Net.Spot{host: "example.com", port: 22}) - "/usr/bin/env sshpass -p abcd1234 /usr/bin/env ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o LogLevel=ERROR -p 22 -nNT -R 3001:localhost:3002 john@example.com" + "/usr/bin/env sshpass -e /usr/bin/env ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o LogLevel=ERROR -p 22 -nNT -R 3001:localhost:3002 john@example.com" iex> Fusion.Utilities.Ssh.cmd("-nNT -R 3001:localhost:3002", %{username: "john", key_path: "/home/john/.ssh/id_rsa"}, ...> %Fusion.Net.Spot{host: "example.com", port: 22}) - "/usr/bin/env ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o LogLevel=ERROR -p 22 -i /home/john/.ssh/id_rsa -nNT -R 3001:localhost:3002 john@example.com" + ~s(/usr/bin/env ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o LogLevel=ERROR -p 22 -i "/home/john/.ssh/id_rsa" -nNT -R 3001:localhost:3002 john@example.com) """ def cmd(cmd, auth, remote, ssh_path \\ @default_ssh_path) - def cmd(cmd, %{username: username, password: password}, %Spot{} = remote, ssh_path) do - "#{partial_cmd_sshpass(password)} #{ssh_path} #{@default_ssh_opts} -p #{remote.port} #{cmd} #{username}@#{remote.host}" + def cmd(cmd, %{username: username, password: _password}, %Spot{} = remote, ssh_path) do + "#{partial_cmd_sshpass()} #{ssh_path} #{@default_ssh_opts} -p #{remote.port} #{cmd} #{username}@#{remote.host}" end def cmd(cmd, %{username: username, key_path: key_path}, %Spot{} = remote, ssh_path) do - "#{ssh_path} #{@default_ssh_opts} -p #{remote.port} -i #{key_path} #{cmd} #{username}@#{remote.host}" + "#{ssh_path} #{@default_ssh_opts} -p #{remote.port} -i \"#{Bash.escape_str(key_path)}\" #{cmd} #{username}@#{remote.host}" end @doc "Generate an SSH command to execute a remote command." @@ -94,14 +100,14 @@ defmodule Fusion.Utilities.Ssh do ...> 4567, ...> %Fusion.Net.Spot{host: "localhost", port: 2345}, ...> :reverse) - "/usr/bin/env sshpass -p abcd1234 /usr/bin/env ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o LogLevel=ERROR -p 22 -nNT -R 4567:localhost:2345 john@example.com" + "/usr/bin/env sshpass -e /usr/bin/env ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o LogLevel=ERROR -p 22 -nNT -R 4567:localhost:2345 john@example.com" iex> Fusion.Utilities.Ssh.cmd_port_tunnel(%{username: "john", password: "abcd1234"}, ...> %Fusion.Net.Spot{host: "example.com", port: 22}, ...> 4567, ...> %Fusion.Net.Spot{host: "localhost", port: 2345}, ...> :forward) - "/usr/bin/env sshpass -p abcd1234 /usr/bin/env ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o LogLevel=ERROR -p 22 -nNT -4 -L 4567:localhost:2345 john@example.com" + "/usr/bin/env sshpass -e /usr/bin/env ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o LogLevel=ERROR -p 22 -nNT -4 -L 4567:localhost:2345 john@example.com" """ def cmd_port_tunnel(auth, %Spot{} = remote, from_port, %Spot{} = to_spot, :reverse) do partial_cmd_reverse_tunnel(from_port, to_spot) diff --git a/mix.exs b/mix.exs index 7efb8a4..b37016c 100644 --- a/mix.exs +++ b/mix.exs @@ -27,7 +27,7 @@ defmodule Fusion.MixProject do def application do [ - extra_applications: [:logger], + extra_applications: [:logger, :ssh, :public_key, :crypto], mod: {Fusion.Application, []} ] end diff --git a/test/docker/run.sh b/test/docker/run.sh index b2c4064..05e26a8 100755 --- a/test/docker/run.sh +++ b/test/docker/run.sh @@ -31,7 +31,7 @@ start() { # Wait for SSH to be ready echo -n "Waiting for SSH..." for i in $(seq 1 30); do - if docker exec "$CONTAINER_NAME" sh -c "ss -tlnp | grep -q ':22'" 2>/dev/null; then + if docker exec "$CONTAINER_NAME" sh -c "cat /proc/net/tcp 2>/dev/null | grep -q '00000000:0016'" 2>/dev/null; then echo " ready!" break fi diff --git a/test/fusion/external_test.exs b/test/fusion/external_test.exs index 29f0198..6cb9156 100644 --- a/test/fusion/external_test.exs +++ b/test/fusion/external_test.exs @@ -17,65 +17,218 @@ defmodule Fusion.ExternalTest do @moduletag :external - defp skip_unless_docker_available do - cond do - not Docker.available?() -> - IO.puts("SKIP: Docker container not running (cd test/docker && ./run.sh start)") - :skip - - not Docker.ssh_works?() -> - IO.puts("SKIP: SSH to Docker container failed") - :skip - - true -> - :ok + defp ensure_docker_available! do + unless Docker.available?() do + flunk("Docker container not running (cd test/docker && ./run.sh start)") end - end - defp with_connected_node(fun) do - case skip_unless_docker_available() do - :skip -> - :ok + unless Docker.ssh_works?() do + flunk("SSH to Docker container failed") + end + end - :ok -> - target = Docker.target() - {:ok, manager} = NodeManager.start_link(target) + defp with_connected_node(fun, opts \\ []) do + ensure_docker_available!() + backend = Keyword.get(opts, :backend) + auth = Keyword.get(opts, :auth) - case NodeManager.connect(manager) do - {:ok, remote_node} -> - try do - fun.(remote_node) - after - NodeManager.disconnect(manager) - GenServer.stop(manager) - end + target = + case {backend, auth} do + {nil, nil} -> Docker.target() + {nil, :password} -> Docker.target_password() + {b, nil} -> %{Docker.target() | ssh_backend: b} + {b, :password} -> %{Docker.target_password() | ssh_backend: b} + end - {:error, :local_node_not_alive} -> - IO.puts("SKIP: Run with --sname flag") + {:ok, manager} = NodeManager.start_link(target) - {:error, reason} -> - flunk("Connection to Docker container failed: #{inspect(reason)}") + case NodeManager.connect(manager) do + {:ok, remote_node} -> + try do + fun.(manager, remote_node) + after + NodeManager.disconnect(manager) + GenServer.stop(manager) + # Allow remote sshd to release tunnel listeners before the + # next test tries to bind the same local node port. + Process.sleep(1_000) end + + {:error, :local_node_not_alive} -> + flunk("Local node not alive (run with --sname flag)") + + {:error, reason} -> + flunk("Connection to Docker container failed: #{inspect(reason)}") + end + end + + ## NodeManager: backend connectivity + # + # Erlang backend tests both key and password auth through NodeManager. + # System backend tests one auth mode through NodeManager — its tunnel cleanup + # is asynchronous (remote sshd may hold listeners after close), so we limit + # NodeManager tests to reduce :not_accepted conflicts between tests. + + for backend <- [Fusion.SshBackend.Erlang] do + backend_name = backend |> Module.split() |> List.last() + + @tag timeout: 30_000 + test "connect and disconnect with #{backend_name} backend (key auth)" do + with_connected_node( + fn _manager, remote_node -> + assert is_atom(remote_node) + assert remote_node in Node.list() + end, + backend: unquote(backend) + ) + end + + @tag timeout: 30_000 + test "connect and disconnect with #{backend_name} backend (password auth)" do + with_connected_node( + fn _manager, remote_node -> + assert is_atom(remote_node) + assert remote_node in Node.list() + end, + backend: unquote(backend), + auth: :password + ) end end + ## NodeManager: status and lifecycle + + @tag timeout: 30_000 + test "NodeManager status and remote_node APIs" do + with_connected_node(fn manager, remote_node -> + assert NodeManager.status(manager) == :connected + assert NodeManager.remote_node(manager) == remote_node + end) + end + + @tag timeout: 60_000 + test "reconnect after disconnect" do + ensure_docker_available!() + target = Docker.target() + {:ok, manager} = NodeManager.start_link(target) + + {:ok, node1} = NodeManager.connect(manager) + assert node1 in Node.list() + + NodeManager.disconnect(manager) + assert NodeManager.status(manager) == :disconnected + assert NodeManager.remote_node(manager) == nil + Process.sleep(1_000) + + {:ok, node2} = NodeManager.connect(manager) + assert node2 in Node.list() + assert node1 != node2 + + NodeManager.disconnect(manager) + GenServer.stop(manager) + Process.sleep(1_000) + end + + ## SshBackend: direct exec + + @tag timeout: 15_000 + test "Erlang backend: exec command directly" do + ensure_docker_available!() + target = Docker.target() + + {:ok, conn} = Fusion.SshBackend.Erlang.connect(target) + {:ok, output} = Fusion.SshBackend.Erlang.exec(conn, "echo hello") + assert String.trim(output) == "hello" + assert Fusion.SshBackend.Erlang.close(conn) == :ok + end + + @tag timeout: 15_000 + test "System backend: exec command directly" do + ensure_docker_available!() + target = Docker.target() + + {:ok, conn} = Fusion.SshBackend.System.connect(target) + {:ok, output} = Fusion.SshBackend.System.exec(conn, "echo hello") + assert String.trim(output) == "hello" + assert Fusion.SshBackend.System.close(conn) == :ok + end + + @tag timeout: 15_000 + test "Erlang backend: exec returns error for failing command" do + ensure_docker_available!() + target = Docker.target() + + {:ok, conn} = Fusion.SshBackend.Erlang.connect(target) + assert {:error, {:exit_status, code, _, _}} = Fusion.SshBackend.Erlang.exec(conn, "exit 42") + assert code == 42 + Fusion.SshBackend.Erlang.close(conn) + end + + @tag timeout: 15_000 + test "System backend: exec returns error for failing command" do + ensure_docker_available!() + target = Docker.target() + + {:ok, conn} = Fusion.SshBackend.System.connect(target) + assert {:error, {:exit_status, code, _, _}} = Fusion.SshBackend.System.exec(conn, "exit 42") + assert code == 42 + Fusion.SshBackend.System.close(conn) + end + + ## SshBackend: tunnels + + @tag timeout: 15_000 + test "Erlang backend: forward and reverse tunnels" do + ensure_docker_available!() + target = Docker.target() + + {:ok, conn} = Fusion.SshBackend.Erlang.connect(target) + + [fwd_port, rev_port] = Fusion.Net.gen_unique_ports(2) + + assert {:ok, ^fwd_port} = + Fusion.SshBackend.Erlang.forward_tunnel(conn, fwd_port, "127.0.0.1", fwd_port) + + assert {:ok, ^rev_port} = + Fusion.SshBackend.Erlang.reverse_tunnel(conn, rev_port, "127.0.0.1", rev_port) + + Fusion.SshBackend.Erlang.close(conn) + end + + @tag timeout: 15_000 + test "System backend: forward and reverse tunnels" do + ensure_docker_available!() + target = Docker.target() + + {:ok, conn} = Fusion.SshBackend.System.connect(target) + + [fwd_port, rev_port] = Fusion.Net.gen_unique_ports(2) + + assert {:ok, ^fwd_port} = + Fusion.SshBackend.System.forward_tunnel(conn, fwd_port, "127.0.0.1", fwd_port) + + assert {:ok, ^rev_port} = + Fusion.SshBackend.System.reverse_tunnel(conn, rev_port, "127.0.0.1", rev_port) + + Fusion.SshBackend.System.close(conn) + Process.sleep(500) + end + + ## TaskRunner: remote execution + @tag timeout: 60_000 test "full pipeline: connect, push module, execute, disconnect" do - with_connected_node(fn remote_node -> - # Verify basic arithmetic works via MFA + with_connected_node(fn _manager, remote_node -> assert {:ok, 3} = TaskRunner.run(remote_node, Kernel, :+, [1, 2]) - # Run function from a compiled helper module via run_fun assert {:ok, "hello from remote"} = TaskRunner.run_fun(remote_node, &RemoteFuns.hello/0) - # Push a custom module and call it assert :ok = TaskRunner.push_module(remote_node, Fusion.Net) assert {:ok, port} = TaskRunner.run(remote_node, Fusion.Net, :gen_port, []) assert is_integer(port) - assert port >= 49152 + assert port > 0 - # Verify the remote node is a separate BEAM instance assert {:ok, remote_pid} = TaskRunner.run_fun(remote_node, &RemoteFuns.get_self/0) @@ -84,9 +237,25 @@ defmodule Fusion.ExternalTest do end) end + @tag timeout: 30_000 + test "run function on remote node" do + with_connected_node(fn _manager, remote_node -> + assert :ok = TaskRunner.push_module(remote_node, RemoteFuns) + assert {:ok, 42} = TaskRunner.run(remote_node, RemoteFuns, :multiply, [21, 2]) + end) + end + + @tag timeout: 30_000 + test "run_fun with auto-push of named function" do + with_connected_node(fn _manager, remote_node -> + assert {:ok, "hello from remote"} = + TaskRunner.run_fun(remote_node, &RemoteFuns.hello/0) + end) + end + @tag timeout: 60_000 test "run system command on remote container" do - with_connected_node(fn remote_node -> + with_connected_node(fn _manager, remote_node -> assert {:ok, {hostname, 0}} = TaskRunner.run(remote_node, System, :cmd, ["hostname", []]) @@ -97,7 +266,7 @@ defmodule Fusion.ExternalTest do @tag timeout: 60_000 test "push multiple modules and use them together" do - with_connected_node(fn remote_node -> + with_connected_node(fn _manager, remote_node -> assert :ok = TaskRunner.push_modules(remote_node, [ Fusion.Net, @@ -114,15 +283,36 @@ defmodule Fusion.ExternalTest do @tag timeout: 60_000 test "automatic transitive dependency pushing" do - with_connected_node(fn remote_node -> - # Push ONLY RemoteFuns - it references Fusion.Net.Spot via make_spot/1. - # The dependency should be resolved and pushed automatically. + with_connected_node(fn _manager, remote_node -> assert :ok = TaskRunner.push_module(remote_node, RemoteFuns) - # Call make_spot which creates a Fusion.Net.Spot struct. - # This would fail with UndefinedFunctionError if Spot wasn't auto-pushed. assert {:ok, %Fusion.Net.Spot{host: "test", port: 42}} = TaskRunner.run(remote_node, RemoteFuns, :make_spot, [42]) end) end + + ## Error handling + + @tag timeout: 30_000 + test "run returns error for undefined function on remote" do + with_connected_node(fn _manager, remote_node -> + assert {:error, _} = TaskRunner.run(remote_node, :nonexistent_module, :nope, []) + end) + end + + @tag timeout: 30_000 + test "connection to unreachable host fails" do + ensure_docker_available!() + + target = %Fusion.Target{ + host: "localhost", + port: 19999, + username: "fusion_test", + auth: {:key, Docker.key_path()} + } + + {:ok, manager} = NodeManager.start_link(target) + assert {:error, _reason} = NodeManager.connect(manager, 5_000) + GenServer.stop(manager) + end end diff --git a/test/fusion/net_test.exs b/test/fusion/net_test.exs index 8b6609d..8d4e7dc 100644 --- a/test/fusion/net_test.exs +++ b/test/fusion/net_test.exs @@ -3,9 +3,10 @@ defmodule Fusion.NetTest do doctest Fusion.Net describe "gen_port/0" do - test "generates port in ephemeral range" do + test "generates a valid port number" do port = Fusion.Net.gen_port() - assert port >= 49152 + assert is_integer(port) + assert port > 0 assert port <= 65535 end diff --git a/test/fusion/node_manager_integration_test.exs b/test/fusion/node_manager_integration_test.exs deleted file mode 100644 index 1b47681..0000000 --- a/test/fusion/node_manager_integration_test.exs +++ /dev/null @@ -1,95 +0,0 @@ -defmodule Fusion.NodeManagerIntegrationTest do - @moduledoc """ - Integration tests that SSH to localhost to test real node bootstrapping. - Run with: mix test --include integration - Requires: SSH server running locally, key-based auth configured for current user. - """ - use ExUnit.Case - - alias Fusion.NodeManager - alias Fusion.Target - - @moduletag :integration - - defp find_ssh_key do - ~w(~/.ssh/id_ed25519 ~/.ssh/id_rsa ~/.ssh/id_ecdsa) - |> Enum.map(&Path.expand/1) - |> Enum.find(&File.exists?/1) - end - - defp ssh_to_localhost_works?(ssh_key) do - user = System.get_env("USER") - - {_output, exit_code} = - System.cmd( - "ssh", - [ - "-i", - ssh_key, - "-o", - "BatchMode=yes", - "-o", - "ConnectTimeout=3", - "#{user}@localhost", - "echo", - "ok" - ], - stderr_to_stdout: true - ) - - exit_code == 0 - end - - defp skip_unless_ssh_available do - case find_ssh_key() do - nil -> - {:skip, "No SSH key found in ~/.ssh/"} - - ssh_key -> - if ssh_to_localhost_works?(ssh_key) do - {:ok, ssh_key} - else - {:skip, "SSH to localhost not configured (add local key to authorized_keys)"} - end - end - end - - @tag timeout: 30_000 - test "connect to localhost, bootstrap remote node, verify cluster connection" do - case skip_unless_ssh_available() do - {:skip, reason} -> - IO.puts("SKIP: #{reason}") - - {:ok, ssh_key} -> - user = System.get_env("USER") - - target = %Target{ - host: "localhost", - port: 22, - username: user, - auth: {:key, ssh_key} - } - - {:ok, manager} = NodeManager.start_link(target) - - case NodeManager.connect(manager) do - {:ok, remote_node} -> - assert is_atom(remote_node) - assert remote_node in Node.list() - assert NodeManager.status(manager) == :connected - - assert NodeManager.disconnect(manager) == :ok - assert NodeManager.status(manager) == :disconnected - refute remote_node in Node.list() - - {:error, :local_node_not_alive} -> - IO.puts("SKIP: Local node not alive (run with --sname)") - - {:error, reason} -> - flunk("Failed to connect: #{inspect(reason)}") - end - - GenServer.stop(manager) - end - end -end diff --git a/test/fusion/node_manager_test.exs b/test/fusion/node_manager_test.exs index 8ea42e4..46463ca 100644 --- a/test/fusion/node_manager_test.exs +++ b/test/fusion/node_manager_test.exs @@ -1,49 +1,174 @@ defmodule Fusion.NodeManagerTest do - use ExUnit.Case + use ExUnit.Case, async: true alias Fusion.NodeManager alias Fusion.Target describe "start_link/1" do test "starts with a target" do - target = %Target{ - host: "example.com", - port: 22, - username: "deploy", - auth: {:key, "~/.ssh/id_rsa"} - } - - {:ok, pid} = NodeManager.start_link(target) + pid = start_manager() assert is_pid(pid) assert NodeManager.status(pid) == :disconnected assert NodeManager.remote_node(pid) == nil - - GenServer.stop(pid) end end describe "status/1" do test "returns :disconnected initially" do - target = %Target{host: "x", port: 22, username: "u", auth: {:key, "/k"}} - {:ok, pid} = NodeManager.start_link(target) + pid = start_manager() assert NodeManager.status(pid) == :disconnected - GenServer.stop(pid) end end describe "disconnect/1" do test "disconnect when already disconnected is a no-op" do - target = %Target{host: "x", port: 22, username: "u", auth: {:key, "/k"}} - {:ok, pid} = NodeManager.start_link(target) + pid = start_manager() assert NodeManager.disconnect(pid) == :ok - GenServer.stop(pid) + end + + test "disconnect when connected resets state and calls close" do + conn_id = :disc_conn + table = :ets.new(:"tracking_mock_#{conn_id}", [:named_table, :public, :set]) + on_exit(fn -> if :ets.whereis(table) != :undefined, do: :ets.delete(table) end) + + pid = start_manager(ssh_backend: Fusion.Test.TrackingMockBackend) + force_connected(pid, :disc_test@localhost, conn_id) + + assert NodeManager.disconnect(pid) == :ok + assert NodeManager.status(pid) == :disconnected + assert NodeManager.remote_node(pid) == nil + + assert [{:close_count, 1}] = :ets.lookup(table, :close_count) end end describe "terminate/2" do - test "terminate handles disconnected state" do - state = %NodeManager{status: :disconnected} - assert NodeManager.terminate(:normal, state) == :ok + test "calls close on the backend during cleanup" do + conn_id = :term_conn + table = :ets.new(:"tracking_mock_#{conn_id}", [:named_table, :public, :set]) + on_exit(fn -> if :ets.whereis(table) != :undefined, do: :ets.delete(table) end) + + pid = start_manager(ssh_backend: Fusion.Test.TrackingMockBackend) + force_connected(pid, :term_test@localhost, conn_id) + + GenServer.stop(pid, :normal) + + assert [{:close_count, 1}] = :ets.lookup(table, :close_count) + end + end + + describe "connect error handling" do + @tag :distributed + test "returns error when backend.connect fails" do + pid = start_manager(ssh_backend: Fusion.Test.FailConnectBackend) + assert {:error, :connection_refused} = NodeManager.connect(pid) + assert NodeManager.status(pid) == :disconnected + end + + @tag :distributed + test "returns error when tunnel setup fails" do + pid = start_manager(ssh_backend: Fusion.Test.FailTunnelBackend) + assert {:error, :tunnel_failed} = NodeManager.connect(pid) + assert NodeManager.status(pid) == :disconnected + end + + @tag :distributed + test "returns error when exec_async fails" do + pid = start_manager(ssh_backend: Fusion.Test.FailExecAsyncBackend) + assert {:error, :exec_async_failed} = NodeManager.connect(pid) + assert NodeManager.status(pid) == :disconnected + end + + @tag :not_distributed + test "returns error when local node is not alive" do + pid = start_manager() + assert {:error, :local_node_not_alive} = NodeManager.connect(pid) + assert NodeManager.status(pid) == :disconnected + end + end + + describe "handle_info(:nodedown)" do + test "sets status to disconnected and cleans up" do + pid = start_manager() + force_connected(pid, :test_node@localhost) + + send(pid, {:nodedown, :test_node@localhost}) + # :sys.get_state forces the GenServer to process all prior messages + _ = :sys.get_state(pid) + assert NodeManager.status(pid) == :disconnected + assert NodeManager.remote_node(pid) == nil + end + + test "ignores arbitrary messages" do + pid = start_manager() + send(pid, :random_message) + _ = :sys.get_state(pid) + assert NodeManager.status(pid) == :disconnected + end + + test "ignores nodedown for unrelated nodes" do + pid = start_manager() + force_connected(pid, :my_node@localhost) + + send(pid, {:nodedown, :other_node@localhost}) + _ = :sys.get_state(pid) + assert NodeManager.status(pid) == :connected end end + + describe "connect when already connected" do + test "returns existing node name" do + pid = start_manager() + force_connected(pid, :fake@localhost) + + assert {:ok, :fake@localhost} = NodeManager.connect(pid) + end + end + + describe "backend validation" do + test "raises for module that does not implement SshBackend" do + Process.flag(:trap_exit, true) + + assert {:error, {%ArgumentError{message: msg}, _}} = + NodeManager.start_link(build_target(ssh_backend: String)) + + assert msg =~ "does not implement" + end + end + + describe "backend selection" do + test "uses the backend from the target" do + pid = start_manager(ssh_backend: Fusion.SshBackend.System) + state = :sys.get_state(pid) + assert state.target.ssh_backend == Fusion.SshBackend.System + end + + test "defaults to Erlang backend" do + pid = start_manager() + state = :sys.get_state(pid) + assert state.target.ssh_backend == Fusion.SshBackend.Erlang + end + end + + ## Helpers + + defp start_manager(overrides \\ []) do + target = build_target(overrides) + {:ok, pid} = NodeManager.start_link(target) + on_exit(fn -> if Process.alive?(pid), do: GenServer.stop(pid) end) + pid + end + + defp force_connected(pid, node_name, conn \\ :mock_conn) do + :sys.replace_state(pid, fn state -> + %{state | status: :connected, remote_node_name: node_name, conn: conn} + end) + end + + defp build_target(overrides) do + struct( + %Target{host: "x", port: 22, username: "u", auth: {:key, "/k"}}, + overrides + ) + end end diff --git a/test/fusion/ssh_backend/erlang_test.exs b/test/fusion/ssh_backend/erlang_test.exs new file mode 100644 index 0000000..8126714 --- /dev/null +++ b/test/fusion/ssh_backend/erlang_test.exs @@ -0,0 +1,18 @@ +defmodule Fusion.SshBackend.ErlangTest do + use ExUnit.Case, async: true + + import Fusion.Test.SshBackendSharedTests + + alias Fusion.SshBackend.Erlang, as: Backend + + assert_implements_ssh_backend(Backend) + + test "ensures :ssh application is started" do + started_apps = Application.started_applications() |> Enum.map(&elem(&1, 0)) + assert :ssh in started_apps + end + + test "close/1 returns :ok even with invalid connection" do + assert :ok = Backend.close(:not_a_real_connection) + end +end diff --git a/test/fusion/ssh_backend/system_test.exs b/test/fusion/ssh_backend/system_test.exs new file mode 100644 index 0000000..bfaa4e4 --- /dev/null +++ b/test/fusion/ssh_backend/system_test.exs @@ -0,0 +1,51 @@ +defmodule Fusion.SshBackend.SystemTest do + use ExUnit.Case, async: true + + import Fusion.Test.SshBackendSharedTests + + alias Fusion.SshBackend.System, as: Backend + alias Fusion.Target + + assert_implements_ssh_backend(Backend) + + test "connect returns a conn struct with auth and remote" do + target = %Target{ + host: "example.com", + port: 22, + username: "deploy", + auth: {:key, "~/.ssh/id_rsa"}, + ssh_backend: Backend + } + + assert {:ok, conn} = Backend.connect(target) + assert conn.auth == %{username: "deploy", key_path: "~/.ssh/id_rsa"} + assert conn.remote == %Fusion.Net.Spot{host: "example.com", port: 22} + end + + test "connect with password auth" do + target = %Target{ + host: "example.com", + port: 2222, + username: "admin", + auth: {:password, "secret"}, + ssh_backend: Backend + } + + assert {:ok, conn} = Backend.connect(target) + assert conn.auth == %{username: "admin", password: "secret"} + assert conn.remote == %Fusion.Net.Spot{host: "example.com", port: 2222} + end + + test "close returns :ok" do + target = %Target{ + host: "example.com", + port: 22, + username: "deploy", + auth: {:key, "~/.ssh/id_rsa"}, + ssh_backend: Backend + } + + {:ok, conn} = Backend.connect(target) + assert :ok = Backend.close(conn) + end +end diff --git a/test/fusion/ssh_backend_test.exs b/test/fusion/ssh_backend_test.exs new file mode 100644 index 0000000..f372e86 --- /dev/null +++ b/test/fusion/ssh_backend_test.exs @@ -0,0 +1,13 @@ +defmodule Fusion.SshBackendTest do + use ExUnit.Case, async: true + + test "Fusion.SshBackend defines the expected callbacks" do + callbacks = Fusion.SshBackend.behaviour_info(:callbacks) + assert {:connect, 1} in callbacks + assert {:forward_tunnel, 4} in callbacks + assert {:reverse_tunnel, 4} in callbacks + assert {:exec, 2} in callbacks + assert {:exec_async, 2} in callbacks + assert {:close, 1} in callbacks + end +end diff --git a/test/fusion/ssh_key_provider_test.exs b/test/fusion/ssh_key_provider_test.exs new file mode 100644 index 0000000..1c49ac5 --- /dev/null +++ b/test/fusion/ssh_key_provider_test.exs @@ -0,0 +1,101 @@ +defmodule Fusion.SshKeyProviderTest do + use ExUnit.Case, async: true + + describe "is_host_key/5" do + test "accepts any host key" do + assert Fusion.SshKeyProvider.is_host_key(:fake_key, "host", 22, :ssh_rsa, []) + end + end + + describe "is_host_key/4" do + test "accepts any host key" do + assert Fusion.SshKeyProvider.is_host_key(:fake_key, "host", :ssh_rsa, []) + end + end + + describe "add_host_key/4" do + test "returns :ok" do + assert :ok = Fusion.SshKeyProvider.add_host_key("host", 22, :fake_key, []) + end + end + + describe "add_host_key/3" do + test "returns :ok" do + assert :ok = Fusion.SshKeyProvider.add_host_key("host", :fake_key, []) + end + end + + describe "user_key/2" do + @tag :tmp_dir + test "reads an Ed25519 key file", %{tmp_dir: tmp_dir} do + key_path = generate_key(tmp_dir, "test_key", "ed25519") + assert {:ok, _key} = Fusion.SshKeyProvider.user_key(:"ssh-ed25519", key_path: key_path) + end + + @tag :tmp_dir + test "reads an RSA key file", %{tmp_dir: tmp_dir} do + key_path = generate_key(tmp_dir, "test_rsa_key", "rsa", ["-b", "2048"]) + assert {:ok, _key} = Fusion.SshKeyProvider.user_key(:"ssh-rsa", key_path: key_path) + end + + @tag :tmp_dir + test "reads an ECDSA key file", %{tmp_dir: tmp_dir} do + key_path = generate_key(tmp_dir, "test_ecdsa_key", "ecdsa", ["-b", "256"]) + + assert {:ok, _key} = + Fusion.SshKeyProvider.user_key(:"ecdsa-sha2-nistp256", key_path: key_path) + end + + test "returns error for missing key file" do + assert {:error, {:file_read_error, "/nonexistent/key", :enoent}} = + Fusion.SshKeyProvider.user_key(:"ssh-ed25519", key_path: "/nonexistent/key") + end + + test "raises KeyError when key_path option is missing" do + assert_raise KeyError, fn -> + Fusion.SshKeyProvider.user_key(:"ssh-ed25519", []) + end + end + + @tag :tmp_dir + test "returns error for garbage data", %{tmp_dir: tmp_dir} do + key_path = Path.join(tmp_dir, "bad_key") + File.write!(key_path, "this is not a key") + + assert {:error, :unsupported_key_format} = + Fusion.SshKeyProvider.user_key(:"ssh-rsa", key_path: key_path) + end + + @tag :tmp_dir + test "returns error for encrypted key", %{tmp_dir: tmp_dir} do + key_path = generate_key(tmp_dir, "encrypted_key", "ed25519", ["-N", "my_passphrase"]) + + assert {:error, :encrypted_key} = + Fusion.SshKeyProvider.user_key(:"ssh-ed25519", key_path: key_path) + end + + @tag :tmp_dir + test "returns error when key type does not match requested algorithm", %{tmp_dir: tmp_dir} do + key_path = generate_key(tmp_dir, "mismatch_key", "ed25519") + + assert {:error, :key_type_mismatch} = + Fusion.SshKeyProvider.user_key(:"ssh-rsa", key_path: key_path) + end + end + + ## Helpers + + defp generate_key(tmp_dir, name, type, extra_opts \\ []) do + key_path = Path.join(tmp_dir, name) + # Default: no passphrase, quiet mode + base_opts = ["-t", type, "-f", key_path, "-q"] + + opts = + if Enum.any?(extra_opts, &(&1 == "-N")), + do: base_opts ++ extra_opts, + else: base_opts ++ ["-N", ""] ++ extra_opts + + {_, 0} = System.cmd("ssh-keygen", opts) + key_path + end +end diff --git a/test/fusion/target_test.exs b/test/fusion/target_test.exs index da74db1..fcf0123 100644 --- a/test/fusion/target_test.exs +++ b/test/fusion/target_test.exs @@ -2,35 +2,23 @@ defmodule Fusion.TargetTest do use ExUnit.Case, async: true alias Fusion.Target - alias Fusion.Net.Spot - describe "to_auth_and_spot/1" do - test "converts key-based target" do - target = %Target{ - host: "10.0.1.5", - port: 22, - username: "deploy", - auth: {:key, "~/.ssh/id_rsa"} - } - - {auth, remote} = Target.to_auth_and_spot(target) - - assert auth == %{username: "deploy", key_path: "~/.ssh/id_rsa"} - assert remote == %Spot{host: "10.0.1.5", port: 22} + describe "ssh_backend" do + test "defaults to Fusion.SshBackend.Erlang" do + target = %Target{host: "x", port: 22, username: "u", auth: {:key, "/k"}} + assert target.ssh_backend == Fusion.SshBackend.Erlang end - test "converts password-based target" do + test "can be set to System backend" do target = %Target{ - host: "example.com", - port: 2222, - username: "admin", - auth: {:password, "secret"} + host: "x", + port: 22, + username: "u", + auth: {:key, "/k"}, + ssh_backend: Fusion.SshBackend.System } - {auth, remote} = Target.to_auth_and_spot(target) - - assert auth == %{username: "admin", password: "secret"} - assert remote == %Spot{host: "example.com", port: 2222} + assert target.ssh_backend == Fusion.SshBackend.System end end end diff --git a/test/fusion/task_runner_integration_test.exs b/test/fusion/task_runner_integration_test.exs deleted file mode 100644 index 8327a5e..0000000 --- a/test/fusion/task_runner_integration_test.exs +++ /dev/null @@ -1,105 +0,0 @@ -defmodule Fusion.TaskRunnerIntegrationTest do - @moduledoc """ - Integration tests for TaskRunner over a real SSH connection. - Run with: mix test --include integration - """ - use ExUnit.Case - - alias Fusion.NodeManager - alias Fusion.TaskRunner - alias Fusion.Target - - @moduletag :integration - - defp find_ssh_key do - ~w(~/.ssh/id_ed25519 ~/.ssh/id_rsa ~/.ssh/id_ecdsa) - |> Enum.map(&Path.expand/1) - |> Enum.find(&File.exists?/1) - end - - defp ssh_to_localhost_works?(ssh_key) do - user = System.get_env("USER") - - {_output, exit_code} = - System.cmd( - "ssh", - [ - "-i", - ssh_key, - "-o", - "BatchMode=yes", - "-o", - "ConnectTimeout=3", - "#{user}@localhost", - "echo", - "ok" - ], - stderr_to_stdout: true - ) - - exit_code == 0 - end - - defp with_connected_node(fun) do - case find_ssh_key() do - nil -> - IO.puts("SKIP: No SSH key found") - - ssh_key -> - unless ssh_to_localhost_works?(ssh_key) do - IO.puts("SKIP: SSH to localhost not configured") - else - user = System.get_env("USER") - - target = %Target{ - host: "localhost", - port: 22, - username: user, - auth: {:key, ssh_key} - } - - {:ok, manager} = NodeManager.start_link(target) - - case NodeManager.connect(manager) do - {:ok, remote_node} -> - try do - fun.(remote_node) - after - NodeManager.disconnect(manager) - GenServer.stop(manager) - end - - {:error, :local_node_not_alive} -> - IO.puts("SKIP: Local node not alive") - - {:error, reason} -> - flunk("Connection failed: #{inspect(reason)}") - end - end - end - end - - @tag timeout: 30_000 - test "run MFA on remote node" do - with_connected_node(fn remote_node -> - assert {:ok, 3} = TaskRunner.run(remote_node, Kernel, :+, [1, 2]) - end) - end - - @tag timeout: 30_000 - test "run anonymous function on remote node" do - with_connected_node(fn remote_node -> - assert {:ok, 42} = TaskRunner.run_fun(remote_node, fn -> 21 * 2 end) - end) - end - - @tag timeout: 30_000 - test "push and run custom module on remote node" do - with_connected_node(fn remote_node -> - assert :ok = TaskRunner.push_module(remote_node, Fusion.Net) - assert {:ok, port} = TaskRunner.run(remote_node, Fusion.Net, :gen_port, []) - assert is_integer(port) - assert port >= 49152 - end) - end -end diff --git a/test/helpers/mock_ssh_backends.ex b/test/helpers/mock_ssh_backends.ex new file mode 100644 index 0000000..7ca94f8 --- /dev/null +++ b/test/helpers/mock_ssh_backends.ex @@ -0,0 +1,146 @@ +defmodule Fusion.Test.MockBackend do + @moduledoc false + @behaviour Fusion.SshBackend + + @mock_async_timeout 5_000 + + @impl true + def connect(_target), do: {:ok, :mock_conn} + + @impl true + def forward_tunnel(_conn, port, _host, _remote_port), do: {:ok, port} + + @impl true + def reverse_tunnel(_conn, port, _host, _remote_port), do: {:ok, port} + + @impl true + def exec(_conn, _cmd), do: {:ok, ""} + + @impl true + def exec_async(_conn, _cmd) do + pid = + spawn(fn -> + receive do + :stop -> :ok + after + @mock_async_timeout -> :ok + end + end) + + {:ok, pid} + end + + @impl true + def close(_conn), do: :ok +end + +defmodule Fusion.Test.TrackingMockBackend do + @moduledoc false + @behaviour Fusion.SshBackend + + alias Fusion.Test.MockBackend + + @impl true + defdelegate connect(target), to: MockBackend + + @impl true + defdelegate forward_tunnel(conn, port, host, remote_port), to: MockBackend + + @impl true + defdelegate reverse_tunnel(conn, port, host, remote_port), to: MockBackend + + @impl true + defdelegate exec(conn, cmd), to: MockBackend + + @impl true + defdelegate exec_async(conn, cmd), to: MockBackend + + @impl true + def close(conn) when is_atom(conn) do + table = :"tracking_mock_#{conn}" + + if :ets.whereis(table) != :undefined do + :ets.update_counter(table, :close_count, 1, {:close_count, 0}) + end + + :ok + end + + @impl true + def close(_conn), do: :ok +end + +defmodule Fusion.Test.FailConnectBackend do + @moduledoc false + @behaviour Fusion.SshBackend + + alias Fusion.Test.MockBackend + + @impl true + def connect(_target), do: {:error, :connection_refused} + + @impl true + defdelegate forward_tunnel(conn, port, host, remote_port), to: MockBackend + + @impl true + defdelegate reverse_tunnel(conn, port, host, remote_port), to: MockBackend + + @impl true + defdelegate exec(conn, cmd), to: MockBackend + + @impl true + defdelegate exec_async(conn, cmd), to: MockBackend + + @impl true + defdelegate close(conn), to: MockBackend +end + +defmodule Fusion.Test.FailExecAsyncBackend do + @moduledoc false + @behaviour Fusion.SshBackend + + alias Fusion.Test.MockBackend + + @impl true + defdelegate connect(target), to: MockBackend + + @impl true + defdelegate forward_tunnel(conn, port, host, remote_port), to: MockBackend + + @impl true + defdelegate reverse_tunnel(conn, port, host, remote_port), to: MockBackend + + @impl true + defdelegate exec(conn, cmd), to: MockBackend + + @impl true + def exec_async(_conn, _cmd), do: {:error, :exec_async_failed} + + @impl true + defdelegate close(conn), to: MockBackend +end + +defmodule Fusion.Test.FailTunnelBackend do + @moduledoc false + @behaviour Fusion.SshBackend + + alias Fusion.Test.MockBackend + + @impl true + defdelegate connect(target), to: MockBackend + + @impl true + def forward_tunnel(_conn, _port, _host, _remote_port), do: {:error, :tunnel_failed} + + @impl true + def reverse_tunnel(_conn, _port, _host, _remote_port), do: {:error, :tunnel_failed} + + @impl true + defdelegate exec(conn, cmd), to: MockBackend + + @impl true + defdelegate exec_async(conn, cmd), to: MockBackend + + @impl true + defdelegate close(conn), to: MockBackend +end diff --git a/test/helpers/remote_funs.ex b/test/helpers/remote_funs.ex index 03ab32f..4eee006 100644 --- a/test/helpers/remote_funs.ex +++ b/test/helpers/remote_funs.ex @@ -6,4 +6,6 @@ defmodule Fusion.Test.Helpers.RemoteFuns do def get_self, do: self() def make_spot(port), do: %Fusion.Net.Spot{host: "test", port: port} + + def multiply(a, b), do: a * b end diff --git a/test/helpers/ssh_backend_shared_tests.ex b/test/helpers/ssh_backend_shared_tests.ex new file mode 100644 index 0000000..d6d825a --- /dev/null +++ b/test/helpers/ssh_backend_shared_tests.ex @@ -0,0 +1,32 @@ +defmodule Fusion.Test.SshBackendSharedTests do + @moduledoc false + + defmacro assert_implements_ssh_backend(module) do + quote do + test "implements the SshBackend behaviour" do + behaviours = + unquote(module).__info__(:attributes) + |> Keyword.get_values(:behaviour) + |> List.flatten() + + assert Fusion.SshBackend in behaviours + end + + test "exports all required callback functions" do + Code.ensure_loaded!(unquote(module)) + + for {fun, arity} <- [ + connect: 1, + forward_tunnel: 4, + reverse_tunnel: 4, + exec: 2, + exec_async: 2, + close: 1 + ] do + assert function_exported?(unquote(module), fun, arity), + "expected #{inspect(unquote(module))}.#{fun}/#{arity} to be exported" + end + end + end + end +end diff --git a/test/test_helper.exs b/test/test_helper.exs index 82abcd8..1361c58 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1 +1,3 @@ -ExUnit.start(exclude: [:integration, :external]) +base_excludes = [:external] +env_excludes = if Node.alive?(), do: [:not_distributed], else: [:distributed] +ExUnit.start(exclude: base_excludes ++ env_excludes)