From d5a1e0717e330cb1c319ffd59e1b58a64bfb830f Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Fri, 3 Jul 2026 16:52:48 -0500 Subject: [PATCH 1/4] feat(skynet): hold+reuse multihop routes via a yamux skyfwd mux (fix route-death) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The skynet resolving proxy "only worked well for direct routes": over a multihop route the routing rules expire and the whole route re-sets-up on every reconnect (seconds). Root cause — a rule's 10-min TTL is refreshed only by an OPEN RouteGroup's keepalive, and the native `.skynet` proxy dialed a FRESH route per SOCKS5 connection with no caching (embedded_skynetweb.go), closing it when the connection ended. Direct re-setup is instant; multihop isn't. skysocks-lite never had this because it caches+yamux-muxes its route and holds it open. Generalize that pattern: - **pkg/skyroute.Pool** — holds ONE route group per destination PK and yamux-muxes logical connections over it, so the held route's keepalive keeps every hop warm and reconnects reuse it with zero setup. Idle groups are reclaimed after ~10 min (DefaultIdleTTL); a holder that's done (e.g. a closing iframe window) can Release one eagerly. Because the route lives on the visor, this needs no browser "page-open" signal — it fixes the external-browser SOCKS5 case and the in-tab iframe case identically. - **skyfwd mux server** (skyenv.SkyForwardingMuxPort = 59) — one accepted route group carries a yamux session; each stream runs the SAME ready-byte + ClientMsg handshake + forward as the 1:1 SkyForwardingServerPort, reusing handleServerConn. The route group's peer PK is carried onto each stream (muxPeerConn) so the per-port PK whitelist still works. Additive + version-negotiated: a caller dials the mux port and falls back to the 1:1 port (ErrNoMux, negative-cached) against older visors. - **routerSkynetDialer** dials through the pool for the route path (direct transport + explicit source routes are unchanged); a real route-setup failure is surfaced, only ErrNoMux falls through to the legacy 1:1 dial. Unit-tested (pkg/skyroute): route-group reuse, ErrNoMux fallback + negative cache, idle reap, eager Release. Full build + vet clean. Design + the wider unified routing-control plan (skysocks-lite + skychat as the other two consumers of RoutingPolicy, the iframe control surface, and a voice-chat assessment) in docs/skynet-routing-control-rfc.md. This is Phase 1. --- docs/skynet-routing-control-rfc.md | 161 ++++++++++++++++ pkg/skyenv/skyenv.go | 11 ++ pkg/skyroute/pool.go | 286 +++++++++++++++++++++++++++++ pkg/skyroute/pool_test.go | 145 +++++++++++++++ pkg/visor/embedded_skynetweb.go | 49 +++++ pkg/visor/init_services.go | 93 ++++++++++ 6 files changed, 745 insertions(+) create mode 100644 docs/skynet-routing-control-rfc.md create mode 100644 pkg/skyroute/pool.go create mode 100644 pkg/skyroute/pool_test.go diff --git a/docs/skynet-routing-control-rfc.md b/docs/skynet-routing-control-rfc.md new file mode 100644 index 0000000000..dd0d816367 --- /dev/null +++ b/docs/skynet-routing-control-rfc.md @@ -0,0 +1,161 @@ +# RFC: Skynet routing control & multihop route reuse + +Status: draft · Author: operator + Claude · Relates to: skysocks-client-lite, the +resolving SOCKS5 proxy, skychat, and the wasm-visor iframe browser. + +## 1. Problem + +The skynet **resolving proxy** "only works well for a direct route." Over a +**multihop** route the routing rules expire and the whole route has to be set up +again — a multi-second stall on every reconnect — so multihop skynet browsing is +effectively unusable through a browser-configured SOCKS5 proxy. + +Separately, we now have **two-and-a-half implementations of the same thing** — dial +a PK over skynet with some routing policy, then carry bytes: + +- **skysocks-client-lite** (`cmd/wasm-visor/skysocks_js.go`) — clearnet egress. +- **the resolving proxy** (native `pkg/skynetweb` + wasm `fetchDmsg`) — skynet/dmsg sites. +- **skychat** (`skynet:1` networker) — messages, which also pick a route. + +None of them expose any routing control, and only one of them (skysocks-lite) +actually keeps its route alive. This RFC unifies the three and fixes the +route-death with a single primitive. + +## 2. Root cause of route-death (measured in code) + +- A routing rule's TTL is **10 min** — `DefaultRouteKeepAlive` (`pkg/router/router.go:34`), + checked by `ruleIsTimedOut` (`pkg/routing/table.go:261`), GC'd every 10 s + (`DefaultRulesGCInterval`, `router.go:36`; `router_gc.go`). +- Rules are refreshed **only by an open RouteGroup's own keepalive loop** — every + 5 min (`defaultRouteGroupKeepAliveInterval = DefaultRouteKeepAlive/2`, + `route_group.go:27`), which re-arms **every intermediate hop** (`sendKeepAlive`, + `route_group.go:1433`; `handleKeepAlivePacket` forwards it downstream, + `router_packet.go:206`). An open-but-idle RouteGroup keeps all hops warm; a + **closed** one stops refreshing and every hop's rule is reaped within ≤10 min. +- The native `.skynet` proxy dials a **fresh route per SOCKS5 connection with no + caching** — `serveSOCKS5`'s Dial calls `DialSkynet` once per connection + (`skynetweb/runtime.go:249`) → `router.DialRoutes` with a fresh ephemeral port + (`embedded_skynetweb.go:273-274`) — and the RouteGroup closes when that + connection ends. Browsers open/close SOCKS5 connections constantly, so every + page load re-runs route-finding + rule-setup across all hops. + +**Why direct "works" and multihop doesn't:** re-setup of a *direct* route is +instant (one transport, one hop); re-setup of a *multihop* route costs a +route-finder round-trip plus rule installation at every hop. Same code path, wildly +different cost. skysocks-lite doesn't exhibit this because it **caches and holds** +its route group per window (`skysocksSessions`, `skysocks_js.go:65`) and yamux-muxes +over it, so the keepalive loop keeps the multihop rules warm for the window's life. + +## 3. What each surface does today + +| Surface | Transport | Route reuse | Muxed? | Policy control | +|---|---|---|---|---| +| skysocks-lite (`skysocks_js.go`) | `DialRoutes`, `MuxRoutes=2` | **yes** — session cached per window | yes (yamux) | hardcoded | +| native resolving proxy `.skynet` (`embedded_skynetweb.go`) | `DialRoutes`, default opts | **no** — fresh per conn | no (1:1 forwarder) | none | +| native resolving proxy `.dmsg` | direct dmsg stream | n/a (long-lived session) | dmsg yamux | none | +| wasm resolving proxy (`fetchDmsg`) | **dmsg-direct only** (`FetchOverDmsg`) | n/a | dmsg | none | +| skychat `skynet:1` | `DialRoutes` via `NewSkywireNetworker` | per-conn (app framework) | app | none | + +Two facts worth calling out: + +- The **skynet forwarding server is a raw 1:1 port-forwarder** — `PerformHandshake` + dials a port and pipes (`skynetweb/runtime.go:58-63`); there is no `Accept` + loop, so a route can carry exactly one forwarded connection. This is *why* a + cache-map alone can't fix it: you can't reuse a 1:1 conn for a second browser + request. Reuse requires **muxing**. +- The **wasm iframe resolving proxy is dmsg-direct for everything**, including + `.skynet` — so it has no route-death problem *and* no real skynet multihop + routing. The route-death is a native-external-browser problem; giving the iframe + true skynet routing is a separate enhancement (§6). + +## 4. The fix: a held, muxed route group with idle-TTL (one fix for both cases) + +The operator's instinct was to hold the route open "while the page is open," and to +worry that an external browser can't signal that. The better lever is that **the +route lives on the visor, not the browser** — so no browser-side signal is needed. + +Generalize the skysocks-lite pattern into a shared primitive: + +> **`skyroute.Pool`** — keyed by `destPK` (+ remote port). `Get(destPK)` returns a +> **held, yamux-muxed RouteGroup**, dialing + caching it on first use and reusing +> it afterwards. Each caller opens a **yamux stream** per logical connection. The +> pool evicts a route group after an **idle TTL** (no open streams for N seconds, +> default 2–5 min) and closes it — until then its keepalive keeps every multihop +> hop warm, so the *next* request reuses a warm route with **zero setup**. + +This requires making the **skynet forwarding server yamux-aware**: one route group += one yamux session; each accepted stream runs the existing `PerformHandshake` + +forward. (Version-negotiated / new forwarding port so old peers still work 1:1.) + +Consequences: + +- **External-browser SOCKS5** benefit with no page-open signal — the visor holds + the warm route across the browser's connection churn; idle-TTL reclaims it. +- **iframe browser** benefits identically; window-close becomes an *optional eager + release* (free the route now instead of waiting for the idle TTL) — a + nice-to-have, not the mechanism. +- **skysocks-lite** collapses onto the same pool (it already muxes; just swap its + private `skysocksSessions` map for `skyroute.Pool`). +- **skychat** `skynet:1` messages ride the same warm pooled routes. + +## 5. Unified routing policy + +Extract a `RoutingPolicy` (a `router.DialOptions` preset) that all three surfaces +consume, instead of each hardcoding: + +``` +type RoutingPolicy struct { + MuxRoutes int // parallel routes (skysocks-lite uses 2) + MinHops int // 0 = direct-when-available; >=2 = force multihop + HoldTTL time.Duration // idle route-group hold (pool eviction) + Finder string // "latency" (default) | "hops" | ... +} +``` + +`skyroute.Pool` takes a `RoutingPolicy`; `DialSkynet`, skysocks-lite, and the +skychat networker all dial through it. One place to tune, one behavior everywhere. + +## 6. Control surface + +- **iframe ⚙ panel** (next to the proxy-log 🐞 toggle in `browse.js`): mux-route + count, "force multihop / min hops," "hold route" duration, and a **live + active-routes view** (hops + per-leg latency for the current dest). Drives both + the skysocks-lite window and the resolving-proxy window through the shared + policy. New JS hooks `skywireVisor.routePolicy(get/set)` + `skywireVisor.routes()` + (all async — they now cross the Web Worker boundary; see the worker migration). +- **skychat**: a per-conversation "route" selector (dmsg-direct vs skynet, mux, + min-hops) — the same policy object, so "control the exact route a message is sent + over" falls out for free. +- **native mirror**: `cli` flags / config for the external-browser SOCKS5 proxy so + it gets the same policy (hold-TTL, mux, min-hops). + +## 7. Phases + +1. **`skyroute.Pool` + yamux-aware forwarding server** (version-negotiated). Port + the native `.skynet` proxy onto it → fixes route-death. *Verifiable with a real + external browser on a multihop route.* +2. **Extract `RoutingPolicy`**; move skysocks-lite onto `skyroute.Pool` (dedup the + two impls); route skychat `skynet:1` through it. +3. **Control surface**: iframe ⚙ routing panel + active-routes view; native cli + mirror; optional real `DialRoutes` skynet path for the wasm iframe resolving + proxy (so it can do multihop, gated by the same policy). + +## Appendix A: skychat voice chat (separate track) + +Three distinct projects, in ascending difficulty: + +1. **wasm↔wasm over WebRTC — easy, best quality.** `getUserMedia` → media track on + `RTCPeerConnection` (Opus, jitter buffer, AEC, PLC all built in), signaling over + the existing WebRTC-over-dmsg channel. ~days. Caveat: `RTCPeerConnection` is not + available in a Web Worker, so the voice PeerConnection must live on the main + thread (or via the deferred main-thread PC-proxy the worker migration will want + anyway). Does **not** ride skywire routes — orthogonal to this RFC. +2. **Voice over skywire routes — hard.** Reliable/ordered routes+dmsg cause + head-of-line stalls; needs a **datagram transport** (faithful-UDP-over-dmsg, + ~40% built & stalled, or QUIC datagrams) + an Opus codec in Go/wasm + a jitter + buffer. Weeks; blocked on datagram transport. +3. **Native-visor voice — architectural friction.** No built-in audio pipeline; + mic/Opus/playback via cgo fights the pure-Go/TinyGo constraint → belongs in a + separate module (like `frank`), never in skywire's main `go.mod`. + +Recommendation: if voice is wanted, do (1) as a self-contained wasm-visor feature. diff --git a/pkg/skyenv/skyenv.go b/pkg/skyenv/skyenv.go index f0eb53b0f6..35b346bdd5 100644 --- a/pkg/skyenv/skyenv.go +++ b/pkg/skyenv/skyenv.go @@ -199,6 +199,17 @@ const ( // Previously 47 — conflicted with DmsgTransportSetupPort SkyForwardingServerPort uint16 = 57 + // SkyForwardingMuxPort is the yamux-multiplexed variant of the skyfwd server: + // one accepted route group carries a yamux session, and each stream runs the + // SAME ready-byte + ClientMsg handshake as SkyForwardingServerPort. This lets a + // caller hold ONE multihop route open and reuse it across many short + // connections (the route's keepalive keeps every hop warm), instead of dialing + // a fresh route per connection — the fix for multihop skynet routes dying under + // the resolving proxy. Version negotiation is by port availability: a caller + // dials this port for route reuse and falls back to the 1:1 + // SkyForwardingServerPort against older visors that don't serve it. + SkyForwardingMuxPort uint16 = 59 + // SkyPingPort dmsg port of sky ping // Previously 48 — conflicted with DmsgTransportSetupServicePort SkyPingPort uint16 = 58 diff --git a/pkg/skyroute/pool.go b/pkg/skyroute/pool.go new file mode 100644 index 0000000000..3509e02c4f --- /dev/null +++ b/pkg/skyroute/pool.go @@ -0,0 +1,286 @@ +// Package skyroute holds and reuses multihop skynet routes so that short-lived +// connections — browser SOCKS5 requests through the resolving proxy, skysocks-lite +// fetches, skychat messages — don't each pay a fresh route setup. +// +// A skywire routing rule expires after DefaultRouteKeepAlive (10 min) unless an +// OPEN RouteGroup's keepalive loop keeps re-arming every hop. The resolving proxy +// historically dialed a fresh route per connection and closed it when the +// connection ended, so multihop routes died and every page load re-ran +// route-finding + per-hop rule setup (seconds). Direct routes re-set-up instantly, +// which is why "it only worked for direct routes". +// +// Pool fixes that by holding ONE route group per destination open and multiplexing +// many logical connections over it with yamux — exactly the shape skysocks-lite +// already used privately. The held route group's keepalive keeps the multihop hops +// warm, so subsequent connections reuse it with zero setup. The route lives on the +// visor (not the browser), so this needs no "page is open" signal and fixes the +// external-browser SOCKS5 case and the in-tab iframe case identically. An idle +// route group is reclaimed after IdleTTL; a holder that knows it's done (an iframe +// window closing) can Release it eagerly. +// +// The far end must run the yamux-aware skyfwd server (skyenv.SkyForwardingMuxPort). +// Against older visors that don't, the first dial fails, OpenStream returns +// ErrNoMux (negative-cached briefly), and the caller falls back to a 1:1 dial. +package skyroute + +import ( + "context" + "errors" + "net" + "sync" + "time" + + "github.com/hashicorp/yamux" + "github.com/sirupsen/logrus" + + "github.com/skycoin/skywire/pkg/cipher" + "github.com/skycoin/skywire/pkg/skyenv" +) + +// ErrNoMux means the destination's visor does not serve the yamux forwarding port +// (an older build). The caller should fall back to a 1:1 SkyForwardingServerPort +// dial. The pool negative-caches this per destination so it isn't re-probed on +// every request. +var ErrNoMux = errors.New("skyroute: destination does not serve the mux forwarding port") + +const ( + // DefaultIdleTTL is how long a route group is held open after its last stream + // closes before the reaper reclaims it. The route's keepalive keeps the hops + // warm during this window so reconnects reuse it with no setup. + DefaultIdleTTL = 10 * time.Minute + // negativeCacheTTL is how long a destination found to lack mux support is + // skipped (returns ErrNoMux without re-dialing) before being re-probed. + negativeCacheTTL = 5 * time.Minute + // firstStreamTimeout bounds the first yamux stream open, so a route dialed to a + // non-listening mux port is detected as ErrNoMux quickly instead of blocking on + // yamux keepalive (~30s). + firstStreamTimeout = 12 * time.Second +) + +// DialRoute dials the destination's forwarding server on muxPort over a (possibly +// multihop) route and returns the route-group net.Conn. The pool yamux-muxes over +// whatever conn this returns; it can take several seconds (route setup) and is +// always called without the pool lock held. +type DialRoute func(ctx context.Context, dest cipher.PubKey, muxPort uint16) (net.Conn, error) + +type heldSession struct { + sess *yamux.Session + lastActive time.Time // last time this route group had ≥1 stream (or was opened) +} + +// Pool holds and reuses route groups keyed by destination PK. +type Pool struct { + dial DialRoute + idleTTL time.Duration + log logrus.FieldLogger + + mu sync.Mutex + sessions map[cipher.PubKey]*heldSession + negative map[cipher.PubKey]time.Time // dest -> earliest re-probe time + closed bool + stop chan struct{} +} + +// New builds a Pool. idleTTL ≤ 0 uses DefaultIdleTTL. A nil log discards output. +func New(dial DialRoute, idleTTL time.Duration, log logrus.FieldLogger) *Pool { + if idleTTL <= 0 { + idleTTL = DefaultIdleTTL + } + if log == nil { + l := logrus.New() + l.SetOutput(discard{}) + log = l + } + p := &Pool{ + dial: dial, + idleTTL: idleTTL, + log: log, + sessions: map[cipher.PubKey]*heldSession{}, + negative: map[cipher.PubKey]time.Time{}, + stop: make(chan struct{}), + } + go p.reapLoop() + return p +} + +// OpenStream returns a stream to dest's skyfwd server over a HELD, reused route +// group, dialing + caching the route group on first use (or after the previous one +// died). The caller then runs the skynet handshake (skynetweb.PerformHandshake) on +// the returned conn exactly as it would on a fresh route. Returns ErrNoMux when +// dest doesn't serve the mux port (caller should fall back to a 1:1 dial). +func (p *Pool) OpenStream(ctx context.Context, dest cipher.PubKey) (net.Conn, error) { + p.mu.Lock() + if p.closed { + p.mu.Unlock() + return nil, errors.New("skyroute: pool closed") + } + // Reuse an open session. + if s, ok := p.sessions[dest]; ok { + if !s.sess.IsClosed() { + s.lastActive = time.Now() + sess := s.sess + p.mu.Unlock() + if stream, err := sess.Open(); err == nil { + return stream, nil + } + // Session is up but won't yield a stream — drop it and redial below. + p.mu.Lock() + if cur, ok := p.sessions[dest]; ok && cur.sess == sess { + _ = sess.Close() //nolint:errcheck + delete(p.sessions, dest) + } + } else { + _ = s.sess.Close() //nolint:errcheck + delete(p.sessions, dest) + } + } + // Skip destinations recently found to lack mux support. + if t, ok := p.negative[dest]; ok { + if time.Now().Before(t) { + p.mu.Unlock() + return nil, ErrNoMux + } + delete(p.negative, dest) + } + p.mu.Unlock() + + // Dial the mux forwarding port over a route (may take seconds; no lock held). + conn, err := p.dial(ctx, dest, skyenv.SkyForwardingMuxPort) + if err != nil { + return nil, err + } + sess, err := yamux.Client(conn, yamux.DefaultConfig()) + if err != nil { + _ = conn.Close() //nolint:errcheck + return nil, err + } + // First stream doubles as a liveness probe: if the remote isn't serving the mux + // port, it won't accept, so bound the open and treat a timeout as ErrNoMux. + stream, err := openWithTimeout(ctx, sess, firstStreamTimeout) + if err != nil { + _ = sess.Close() //nolint:errcheck + _ = conn.Close() //nolint:errcheck + p.mu.Lock() + p.negative[dest] = time.Now().Add(negativeCacheTTL) + p.mu.Unlock() + return nil, ErrNoMux + } + // Publish, unless a concurrent dial for the same dest already won. + p.mu.Lock() + if p.closed { + p.mu.Unlock() + _ = stream.Close() //nolint:errcheck + _ = sess.Close() //nolint:errcheck + return nil, errors.New("skyroute: pool closed") + } + if cur, ok := p.sessions[dest]; ok && !cur.sess.IsClosed() { + other := cur.sess + p.mu.Unlock() + _ = stream.Close() //nolint:errcheck + _ = sess.Close() //nolint:errcheck + if s2, err := other.Open(); err == nil { + return s2, nil + } + return nil, errors.New("skyroute: raced session unusable") + } + p.sessions[dest] = &heldSession{sess: sess, lastActive: time.Now()} + p.mu.Unlock() + return stream, nil +} + +// Release eagerly closes and drops dest's held route group (e.g. an iframe window +// that owned it has closed). Idempotent. +func (p *Pool) Release(dest cipher.PubKey) { + p.mu.Lock() + defer p.mu.Unlock() + if s, ok := p.sessions[dest]; ok { + _ = s.sess.Close() //nolint:errcheck + delete(p.sessions, dest) + } +} + +// Close tears down the pool and every held route group. +func (p *Pool) Close() error { + p.mu.Lock() + defer p.mu.Unlock() + if p.closed { + return nil + } + p.closed = true + close(p.stop) + for dest, s := range p.sessions { + _ = s.sess.Close() //nolint:errcheck + delete(p.sessions, dest) + } + return nil +} + +func (p *Pool) reapLoop() { + t := time.NewTicker(p.idleTTL / 3) + defer t.Stop() + for { + select { + case <-p.stop: + return + case <-t.C: + p.reap() + } + } +} + +// reap closes route groups that have had no streams for longer than idleTTL, and +// expires stale negative-cache entries. A session with live streams has its +// lastActive refreshed so it only idles out after its last stream closes. +func (p *Pool) reap() { + p.mu.Lock() + defer p.mu.Unlock() + now := time.Now() + for dest, s := range p.sessions { + if s.sess.IsClosed() { + delete(p.sessions, dest) + continue + } + if s.sess.NumStreams() > 0 { + s.lastActive = now + continue + } + if now.Sub(s.lastActive) > p.idleTTL { + _ = s.sess.Close() //nolint:errcheck + delete(p.sessions, dest) + p.log.WithField("dest", dest.String()).Debug("skyroute: released idle route group") + } + } + for dest, t := range p.negative { + if now.After(t) { + delete(p.negative, dest) + } + } +} + +// openWithTimeout runs sess.Open bounded by the smaller of d and ctx, closing the +// session on timeout so the blocked Open unwinds. +func openWithTimeout(ctx context.Context, sess *yamux.Session, d time.Duration) (net.Conn, error) { + type res struct { + c net.Conn + err error + } + ch := make(chan res, 1) + go func() { + c, err := sess.Open() + ch <- res{c, err} + }() + timer := time.NewTimer(d) + defer timer.Stop() + select { + case r := <-ch: + return r.c, r.err + case <-timer.C: + return nil, errors.New("skyroute: first stream open timed out") + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +type discard struct{} + +func (discard) Write(p []byte) (int, error) { return len(p), nil } diff --git a/pkg/skyroute/pool_test.go b/pkg/skyroute/pool_test.go new file mode 100644 index 0000000000..671ecfd181 --- /dev/null +++ b/pkg/skyroute/pool_test.go @@ -0,0 +1,145 @@ +package skyroute + +import ( + "context" + "io" + "net" + "sync/atomic" + "testing" + "time" + + "github.com/hashicorp/yamux" + "github.com/stretchr/testify/require" + + "github.com/skycoin/skywire/pkg/cipher" +) + +// echoServer runs a yamux server on conn that echoes each accepted stream. +func echoServer(conn net.Conn) { + sess, err := yamux.Server(conn, yamux.DefaultConfig()) + if err != nil { + return + } + for { + st, err := sess.Accept() + if err != nil { + return + } + go func(s net.Conn) { _, _ = io.Copy(s, s); _ = s.Close() }(st) + } +} + +func testPK(b byte) cipher.PubKey { + var pk cipher.PubKey + pk[0] = 0x02 + pk[1] = b + return pk +} + +func roundtrip(t *testing.T, c net.Conn, msg string) { + t.Helper() + _, err := c.Write([]byte(msg)) + require.NoError(t, err) + buf := make([]byte, len(msg)) + _, err = io.ReadFull(c, buf) + require.NoError(t, err) + require.Equal(t, msg, string(buf)) +} + +// A working mux far-end reuses ONE route group across many OpenStream calls. +func TestPool_ReusesRouteGroup(t *testing.T) { + var dials int32 + dial := func(_ context.Context, _ cipher.PubKey, _ uint16) (net.Conn, error) { + atomic.AddInt32(&dials, 1) + a, b := net.Pipe() + go echoServer(b) + return a, nil + } + p := New(dial, time.Minute, nil) + defer p.Close() + + dest := testPK(1) + for i := 0; i < 4; i++ { + s, err := p.OpenStream(context.Background(), dest) + require.NoError(t, err) + roundtrip(t, s, "ping") + require.NoError(t, s.Close()) + } + require.EqualValues(t, 1, atomic.LoadInt32(&dials), "should dial the route once and reuse it") +} + +// A far-end that isn't serving the mux port → ErrNoMux, then negative-cached (no +// re-dial) so the caller falls back cheaply. +func TestPool_NoMuxFallbackAndNegativeCache(t *testing.T) { + var dials int32 + dial := func(_ context.Context, _ cipher.PubKey, _ uint16) (net.Conn, error) { + atomic.AddInt32(&dials, 1) + a, b := net.Pipe() + _ = b.Close() // no yamux server → session dies immediately + return a, nil + } + p := New(dial, time.Minute, nil) + defer p.Close() + + dest := testPK(2) + _, err := p.OpenStream(context.Background(), dest) + require.ErrorIs(t, err, ErrNoMux) + // Second call is served from the negative cache — no second dial. + _, err = p.OpenStream(context.Background(), dest) + require.ErrorIs(t, err, ErrNoMux) + require.EqualValues(t, 1, atomic.LoadInt32(&dials), "no-mux destination must be negative-cached") +} + +// After the idle TTL with no open streams, the held route group is reclaimed and +// the next OpenStream re-dials. +func TestPool_IdleReap(t *testing.T) { + var dials int32 + dial := func(_ context.Context, _ cipher.PubKey, _ uint16) (net.Conn, error) { + atomic.AddInt32(&dials, 1) + a, b := net.Pipe() + go echoServer(b) + return a, nil + } + p := New(dial, 60*time.Millisecond, nil) + defer p.Close() + + dest := testPK(3) + s, err := p.OpenStream(context.Background(), dest) + require.NoError(t, err) + roundtrip(t, s, "a") + require.NoError(t, s.Close()) + + require.Eventually(t, func() bool { + p.mu.Lock() + _, held := p.sessions[dest] + p.mu.Unlock() + return !held + }, 2*time.Second, 20*time.Millisecond, "idle route group should be reaped") + + _, err = p.OpenStream(context.Background(), dest) + require.NoError(t, err) + require.EqualValues(t, 2, atomic.LoadInt32(&dials), "should re-dial after idle reap") +} + +// Release eagerly drops the held route group so the next call re-dials. +func TestPool_Release(t *testing.T) { + var dials int32 + dial := func(_ context.Context, _ cipher.PubKey, _ uint16) (net.Conn, error) { + atomic.AddInt32(&dials, 1) + a, b := net.Pipe() + go echoServer(b) + return a, nil + } + p := New(dial, time.Minute, nil) + defer p.Close() + + dest := testPK(4) + s, err := p.OpenStream(context.Background(), dest) + require.NoError(t, err) + require.NoError(t, s.Close()) + + p.Release(dest) + _, err = p.OpenStream(context.Background(), dest) + require.NoError(t, err) + require.EqualValues(t, 2, atomic.LoadInt32(&dials), "Release should force a re-dial") +} diff --git a/pkg/visor/embedded_skynetweb.go b/pkg/visor/embedded_skynetweb.go index 5992e49b4d..98ed081944 100644 --- a/pkg/visor/embedded_skynetweb.go +++ b/pkg/visor/embedded_skynetweb.go @@ -13,6 +13,7 @@ package visor import ( "context" + "errors" "fmt" "net" "sync" @@ -31,6 +32,7 @@ import ( "github.com/skycoin/skywire/pkg/routing" "github.com/skycoin/skywire/pkg/skyenv" "github.com/skycoin/skywire/pkg/skynetweb" + "github.com/skycoin/skywire/pkg/skyroute" "github.com/skycoin/skywire/pkg/transport" types "github.com/skycoin/skywire/pkg/transport/types" "github.com/skycoin/skywire/pkg/visor/visorconfig" @@ -211,6 +213,16 @@ func (e *EmbeddedSkynetWeb) serve(ctx context.Context) { skynetMuxPtr: e.skynetMux, routeTimeout: time.Duration(e.cfg.RouteTimeout), } + // Hold + reuse multihop routes across the resolving proxy's short-lived SOCKS5 + // connections: without this, every connection dials a fresh route and closes it, + // so multihop routes die and re-set-up each time (the "only works for direct + // routes" limitation). The pool keeps one route group per destination warm and + // yamux-muxes over it; idle groups reclaim after ~10 min. See pkg/skyroute. + dialer.pool = skyroute.New(dialer.dialRouteForPool, skyroute.DefaultIdleTTL, e.log) + go func() { + <-ctx.Done() + _ = dialer.pool.Close() //nolint:errcheck + }() if err := skynetweb.Run(ctx, e.log, dialer, cfg); err != nil && err != context.Canceled { e.log.WithError(err).Warn("skynetweb runtime stopped") } @@ -226,6 +238,21 @@ type routerSkynetDialer struct { skynetMuxPtr **transport.VStreamMux // shared with forwarding server; deref at dial time routeTimeout time.Duration // 0 = use DefaultRouteKeepAlive nextPort uint32 // ephemeral port counter for route fallback + pool *skyroute.Pool // holds+reuses multihop routes per dest (nil = disabled) +} + +// dialRouteForPool dials the destination's mux forwarding port over a route, for +// skyroute.Pool. Mirrors the route-based path in DialSkynet (fresh ephemeral local +// port, optional keep-alive override) but targets muxPort so the pool can yamux the +// route group. +func (d *routerSkynetDialer) dialRouteForPool(ctx context.Context, dest cipher.PubKey, muxPort uint16) (net.Conn, error) { + var opts *router.DialOptions + if d.routeTimeout > 0 { + opts = router.DefaultDialOptions() + opts.KeepAlive = d.routeTimeout + } + lPort := routing.Port(atomic.AddUint32(&d.nextPort, 1)) //nolint:gosec + return d.router.DialRoutes(ctx, dest, lPort, routing.Port(muxPort), opts) } func (d *routerSkynetDialer) DialSkynet(ctx context.Context, remote cipher.PubKey, port uint16, route []skynetweb.RouteLabel) (net.Conn, error) { @@ -265,6 +292,28 @@ func (d *routerSkynetDialer) DialSkynet(ctx context.Context, remote cipher.PubKe // No direct transport (dial failed) — fall through to route-based dial. } + // Reuse a HELD, yamux-muxed multihop route via the pool: dial the mux forwarding + // port ONCE, then open a stream per connection. The held route group's keepalive + // keeps every hop warm, so reconnects don't re-run route setup — the fix for + // multihop skynet routes dying under the resolving proxy. Falls back to a fresh + // 1:1 route dial only against older visors that don't serve the mux port. + if d.pool != nil { + stream, err := d.pool.OpenStream(ctx, remote) + if err == nil { + if hErr := skynetweb.PerformHandshake(stream, port); hErr != nil { + _ = stream.Close() //nolint:errcheck,gosec + return nil, hErr + } + return stream, nil + } + if !errors.Is(err, skyroute.ErrNoMux) { + // A real route-setup failure — the 1:1 path would fail identically, so + // surface it rather than masking it behind a second route attempt. + return nil, err + } + d.log.WithField("remote", remote.String()).Debug("Skynet: mux forwarding unavailable; using 1:1 route") + } + var opts *router.DialOptions if d.routeTimeout > 0 { opts = router.DefaultDialOptions() diff --git a/pkg/visor/init_services.go b/pkg/visor/init_services.go index d4a4370904..ec41a782a9 100644 --- a/pkg/visor/init_services.go +++ b/pkg/visor/init_services.go @@ -15,6 +15,7 @@ import ( "time" "github.com/google/uuid" + "github.com/hashicorp/yamux" "github.com/skycoin/skywire/pkg/app/appevent" "github.com/skycoin/skywire/pkg/app/appnet" @@ -309,6 +310,32 @@ func initSkywireForwardConn(ctx context.Context, v *Visor, log *logging.Logger) } }() + // Also accept yamux-multiplexed forwarding route groups on SkyForwardingMuxPort: + // one accepted route group carries a yamux session, and each accepted stream runs + // the same handshake + forward as the 1:1 server above. This lets a client hold + // ONE multihop route open (its keepalive keeps the hops warm) and reuse it across + // many short connections instead of dialing a fresh route each time — the fix for + // multihop skynet routes dying under the resolving proxy. Additive and + // version-negotiated: clients that want reuse dial this port and fall back to + // SkyForwardingServerPort against older visors. See pkg/skyroute. + muxApp := appnet.Addr{ + Net: appnet.TypeSkynet, + PubKey: v.conf.PK, + Port: routing.Port(skyenv.SkyForwardingMuxPort), + } + if ml, err := appnet.ListenContext(ctx, muxApp); err != nil { + log.WithError(err).Warn("Failed to listen on skynet mux forwarding port; route reuse disabled") + } else { + v.pushCloseStack("sky_forwarding_mux", func() error { + cancel() + if cErr := ml.Close(); cErr != nil { + log.WithError(cErr).Error("Error closing mux listener.") + } + return nil + }) + go acceptSkyForwardingMux(ctx, log, ml, v) + } + // Also accept direct transport connections via VStreamMux (route ID 0). // This allows peers with a direct transport to skip route setup entirely. log.WithField("tpM_nil", v.tpM == nil).Debug("Checking transport manager for VStreamMux") @@ -394,6 +421,72 @@ func (c *vstreamConn) SetDeadline(_ time.Time) error { return nil } func (c *vstreamConn) SetReadDeadline(_ time.Time) error { return nil } func (c *vstreamConn) SetWriteDeadline(_ time.Time) error { return nil } +// acceptSkyForwardingMux accepts yamux-multiplexed forwarding route groups: each +// accepted route group is a yamux session whose streams each run handleServerConn +// (ready byte + ClientMsg + forward). See skyenv.SkyForwardingMuxPort and +// pkg/skyroute for the client half. +func acceptSkyForwardingMux(ctx context.Context, log *logging.Logger, l net.Listener, v *Visor) { + for { + conn, err := l.Accept() + if err != nil { + if errors.Is(err, appnet.ErrClosedConn) || ctx.Err() != nil { + return + } + log.WithError(err).Warn("Failed to accept mux forwarding conn, continuing") + continue + } + wrapped, err := appnet.WrapConn(conn) + if err != nil { + log.WithError(err).Warn("Failed to wrap mux forwarding conn, continuing") + _ = conn.Close() //nolint:errcheck,gosec + continue + } + go serveSkyForwardingMuxSession(log, wrapped, v) + } +} + +// serveSkyForwardingMuxSession yamux-serves one accepted forwarding route group, +// dispatching each accepted stream to handleServerConn. The route group's peer PK +// is carried onto every stream (muxPeerConn) so the per-port PK whitelist enforced +// in handleServerConn still works over the muxed path. +func serveSkyForwardingMuxSession(log *logging.Logger, routeConn net.Conn, v *Visor) { + defer routeConn.Close() //nolint:errcheck,gosec + var peerPK cipher.PubKey + if pk, ok := remotePKFromForwardingConn(routeConn); ok { + peerPK = pk + } + sess, err := yamux.Server(routeConn, yamux.DefaultConfig()) + if err != nil { + log.WithError(err).Warn("mux forwarding: yamux server init failed") + return + } + defer sess.Close() //nolint:errcheck,gosec + for { + stream, err := sess.Accept() + if err != nil { + return // session closed / route died + } + go func() { + defer func() { + if r := recover(); r != nil { + log.Errorf("Panic in mux stream handler: %v", r) + } + }() + handleServerConn(log, &muxPeerConn{Conn: stream, pk: peerPK}, v) + }() + } +} + +// muxPeerConn makes a yamux stream report the route group's peer PK via RemotePK, +// which remotePKFromForwardingConn prefers — so the forwarded-port PK whitelist +// enforced in handleServerConn works over the muxed path too. +type muxPeerConn struct { + net.Conn + pk cipher.PubKey +} + +func (c *muxPeerConn) RemotePK() cipher.PubKey { return c.pk } + func handleServerConn(log *logging.Logger, remoteConn net.Conn, v *Visor) { // Send ready signal to synchronize with client after noise handshake // This ensures the noise handshake is complete before data exchange From 3a8ef837ab532f19abaf224defb8771f321fbdec Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Fri, 3 Jul 2026 17:11:55 -0500 Subject: [PATCH 2/4] =?UTF-8?q?test(skynet):=20integration=20test=20?= =?UTF-8?q?=E2=80=94=20pool=20reuse=20over=20the=20real=20mux=20forwarding?= =?UTF-8?q?=20server?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wires the REAL yamux mux forwarding server (serveSkyForwardingMuxSession) to the REAL skyroute.Pool over one in-memory conn and asserts the core property: 5 logical connections reuse ONE route group (dial count == 1), each reaching a registered service through the real handleServerConn ready-byte + ClientMsg handshake + dispatch. A RouteGroup already satisfies net.Conn and runs under yamux in production (skysocks-lite), so this covers the mux-server↔pool integration the skyroute unit tests (which mock the far end) don't — without standing up the full transport/ route-setup stack. Also asserts muxPeerConn carries the route group's peer PK (keeps the per-port whitelist working over the muxed path). --- pkg/visor/skyfwd_mux_test.go | 90 ++++++++++++++++++++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 pkg/visor/skyfwd_mux_test.go diff --git a/pkg/visor/skyfwd_mux_test.go b/pkg/visor/skyfwd_mux_test.go new file mode 100644 index 0000000000..fbcd76a9d7 --- /dev/null +++ b/pkg/visor/skyfwd_mux_test.go @@ -0,0 +1,90 @@ +package visor + +import ( + "context" + "io" + "net" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/skycoin/skywire/pkg/cipher" + "github.com/skycoin/skywire/pkg/logging" + "github.com/skycoin/skywire/pkg/skynetweb" + "github.com/skycoin/skywire/pkg/skyroute" +) + +// TestSkyForwardingMux_PoolReuseOverRealServer wires the REAL yamux mux forwarding +// server (serveSkyForwardingMuxSession, the accept half) to the REAL skyroute.Pool +// (the client half) over a single in-memory conn, and verifies that many logical +// connections reuse ONE route group and each reaches the registered service through +// the real handleServerConn handshake+dispatch. +// +// A conn is a conn: a router RouteGroup already satisfies net.Conn and is run under +// yamux in production by skysocks-lite, so exercising the mux server + pool over a +// net.Pipe covers the integration the skyroute unit tests (which mock the far end) +// do not — without standing up the full transport/route-setup stack. +func TestSkyForwardingMux_PoolReuseOverRealServer(t *testing.T) { + const echoPort uint16 = 4321 + + // A minimal visor: the service-registry happy path in handleServerConn only + // needs v.services. Register an echo service on echoPort. + reg := NewServiceRegistry() + reg.Register(echoPort, "echo", func(c net.Conn) { + _, _ = io.Copy(c, c) + _ = c.Close() + }) + v := &Visor{services: reg} + log := logging.MustGetLogger("skyfwd_mux_test") + + // One route group == one net.Pipe. The server yamux-serves its end; the pool + // yamux-dials the other end (once) and multiplexes streams over it. + serverConn, clientConn := net.Pipe() + go serveSkyForwardingMuxSession(log, serverConn, v) + + var dials int32 + pool := skyroute.New(func(_ context.Context, _ cipher.PubKey, _ uint16) (net.Conn, error) { + atomic.AddInt32(&dials, 1) + return clientConn, nil // the single held route group + }, time.Minute, nil) + defer pool.Close() + + dest, _ := cipher.GenerateKeyPair() + + for i := 0; i < 5; i++ { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + stream, err := pool.OpenStream(ctx, dest) + require.NoError(t, err, "open stream %d", i) + require.NoError(t, skynetweb.PerformHandshake(stream, echoPort), "handshake %d", i) + + msg := []byte("ping") + _, err = stream.Write(msg) + require.NoError(t, err) + buf := make([]byte, len(msg)) + _, err = io.ReadFull(stream, buf) + require.NoError(t, err, "echo read %d", i) + require.Equal(t, "ping", string(buf)) + + _ = stream.Close() + cancel() + } + + require.EqualValues(t, 1, atomic.LoadInt32(&dials), + "all 5 connections must reuse ONE route group, not re-dial") +} + +// TestMuxPeerConn_CarriesPK asserts the yamux-stream wrapper reports the route +// group's peer PK — the property that keeps the per-port whitelist working over the +// muxed path (remotePKFromForwardingConn prefers RemotePK()). +func TestMuxPeerConn_CarriesPK(t *testing.T) { + pk, _ := cipher.GenerateKeyPair() + a, b := net.Pipe() + defer a.Close() //nolint:errcheck + defer b.Close() //nolint:errcheck + c := &muxPeerConn{Conn: a, pk: pk} + got, ok := remotePKFromForwardingConn(c) + require.True(t, ok) + require.Equal(t, pk, got) +} From aa5b6fad657800eca47810633d9ab3ee049ca617 Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Fri, 3 Jul 2026 17:36:35 -0500 Subject: [PATCH 3/4] test(skynet): satisfy errcheck (nolint on best-effort Close/Copy in tests) --- pkg/skyroute/pool_test.go | 12 ++++++------ pkg/visor/skyfwd_mux_test.go | 8 ++++---- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/skyroute/pool_test.go b/pkg/skyroute/pool_test.go index 671ecfd181..5808834193 100644 --- a/pkg/skyroute/pool_test.go +++ b/pkg/skyroute/pool_test.go @@ -25,7 +25,7 @@ func echoServer(conn net.Conn) { if err != nil { return } - go func(s net.Conn) { _, _ = io.Copy(s, s); _ = s.Close() }(st) + go func(s net.Conn) { _, _ = io.Copy(s, s); _ = s.Close() }(st) //nolint:errcheck } } @@ -56,7 +56,7 @@ func TestPool_ReusesRouteGroup(t *testing.T) { return a, nil } p := New(dial, time.Minute, nil) - defer p.Close() + defer p.Close() //nolint:errcheck dest := testPK(1) for i := 0; i < 4; i++ { @@ -75,11 +75,11 @@ func TestPool_NoMuxFallbackAndNegativeCache(t *testing.T) { dial := func(_ context.Context, _ cipher.PubKey, _ uint16) (net.Conn, error) { atomic.AddInt32(&dials, 1) a, b := net.Pipe() - _ = b.Close() // no yamux server → session dies immediately + _ = b.Close() //nolint:errcheck // no yamux server → session dies immediately return a, nil } p := New(dial, time.Minute, nil) - defer p.Close() + defer p.Close() //nolint:errcheck dest := testPK(2) _, err := p.OpenStream(context.Background(), dest) @@ -101,7 +101,7 @@ func TestPool_IdleReap(t *testing.T) { return a, nil } p := New(dial, 60*time.Millisecond, nil) - defer p.Close() + defer p.Close() //nolint:errcheck dest := testPK(3) s, err := p.OpenStream(context.Background(), dest) @@ -131,7 +131,7 @@ func TestPool_Release(t *testing.T) { return a, nil } p := New(dial, time.Minute, nil) - defer p.Close() + defer p.Close() //nolint:errcheck dest := testPK(4) s, err := p.OpenStream(context.Background(), dest) diff --git a/pkg/visor/skyfwd_mux_test.go b/pkg/visor/skyfwd_mux_test.go index fbcd76a9d7..2757063034 100644 --- a/pkg/visor/skyfwd_mux_test.go +++ b/pkg/visor/skyfwd_mux_test.go @@ -33,8 +33,8 @@ func TestSkyForwardingMux_PoolReuseOverRealServer(t *testing.T) { // needs v.services. Register an echo service on echoPort. reg := NewServiceRegistry() reg.Register(echoPort, "echo", func(c net.Conn) { - _, _ = io.Copy(c, c) - _ = c.Close() + _, _ = io.Copy(c, c) //nolint:errcheck + _ = c.Close() //nolint:errcheck }) v := &Visor{services: reg} log := logging.MustGetLogger("skyfwd_mux_test") @@ -49,7 +49,7 @@ func TestSkyForwardingMux_PoolReuseOverRealServer(t *testing.T) { atomic.AddInt32(&dials, 1) return clientConn, nil // the single held route group }, time.Minute, nil) - defer pool.Close() + defer pool.Close() //nolint:errcheck dest, _ := cipher.GenerateKeyPair() @@ -67,7 +67,7 @@ func TestSkyForwardingMux_PoolReuseOverRealServer(t *testing.T) { require.NoError(t, err, "echo read %d", i) require.Equal(t, "ping", string(buf)) - _ = stream.Close() + _ = stream.Close() //nolint:errcheck cancel() } From bf7cb0b667149d71c613ef65eda6068af753614f Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Fri, 3 Jul 2026 17:36:46 -0500 Subject: [PATCH 4/4] test(skynet): gofmt --- pkg/visor/skyfwd_mux_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/visor/skyfwd_mux_test.go b/pkg/visor/skyfwd_mux_test.go index 2757063034..d3f62256db 100644 --- a/pkg/visor/skyfwd_mux_test.go +++ b/pkg/visor/skyfwd_mux_test.go @@ -34,7 +34,7 @@ func TestSkyForwardingMux_PoolReuseOverRealServer(t *testing.T) { reg := NewServiceRegistry() reg.Register(echoPort, "echo", func(c net.Conn) { _, _ = io.Copy(c, c) //nolint:errcheck - _ = c.Close() //nolint:errcheck + _ = c.Close() //nolint:errcheck }) v := &Visor{services: reg} log := logging.MustGetLogger("skyfwd_mux_test")