diff --git a/cmd/ingestor/config.go b/cmd/ingestor/config.go index 70c18fbe..be191ce3 100644 --- a/cmd/ingestor/config.go +++ b/cmd/ingestor/config.go @@ -162,15 +162,24 @@ func LoadConfig(path string) (*Config, error) { } // ResolvedSources returns the final list of MQTT sources to connect to. +// +// Scheme mapping: +// +// mqtt:// → tcp:// (paho plain TCP) +// mqtts:// → ssl:// (paho TLS over TCP) +// ws:// (paho WebSocket — passed through, no mapping needed) +// wss:// (paho WebSocket TLS — passed through, no mapping needed) func (c *Config) ResolvedSources() []MQTTSource { for i := range c.MQTTSources { - // paho uses tcp:// and ssl:// not mqtt:// and mqtts:// + // paho uses tcp:// and ssl:// for plain MQTT; ws:// and wss:// are accepted natively. b := c.MQTTSources[i].Broker if strings.HasPrefix(b, "mqtt://") { c.MQTTSources[i].Broker = "tcp://" + b[7:] } else if strings.HasPrefix(b, "mqtts://") { c.MQTTSources[i].Broker = "ssl://" + b[8:] } + // ws:// and wss:// pass through unchanged — paho handles WebSocket + // connections natively via gorilla/websocket. } return c.MQTTSources } diff --git a/cmd/ingestor/config_test.go b/cmd/ingestor/config_test.go index 76b76f10..7f7c3fe1 100644 --- a/cmd/ingestor/config_test.go +++ b/cmd/ingestor/config_test.go @@ -284,3 +284,94 @@ func TestLoadConfigWithAllFields(t *testing.T) { t.Errorf("iataFilter=%v", src.IATAFilter) } } + +// TestResolvedSourcesSchemeMapping verifies that mqtt:// and mqtts:// are translated +// to the paho-native tcp:// and ssl:// schemes, while ws:// and wss:// pass through +// unchanged (paho handles WebSocket connections natively). +func TestResolvedSourcesSchemeMapping(t *testing.T) { + tests := []struct { + input string + want string + }{ + {"mqtt://host:1883", "tcp://host:1883"}, + {"mqtts://host:8883", "ssl://host:8883"}, + {"tcp://host:1883", "tcp://host:1883"}, // already translated + {"ssl://host:8883", "ssl://host:8883"}, // already translated + {"ws://host:9001", "ws://host:9001"}, // WebSocket: pass through + {"wss://host:9001", "wss://host:9001"}, // WebSocket TLS: pass through + {"ws://host:9001/mqtt", "ws://host:9001/mqtt"}, // with path + {"wss://host:9001/mqtt", "wss://host:9001/mqtt"}, + } + + for _, tt := range tests { + cfg := &Config{ + MQTTSources: []MQTTSource{ + {Name: "test", Broker: tt.input, Topics: []string{"meshcore/#"}}, + }, + } + sources := cfg.ResolvedSources() + if got := sources[0].Broker; got != tt.want { + t.Errorf("ResolvedSources(%q) = %q, want %q", tt.input, got, tt.want) + } + } +} + +// TestLoadConfigWSSource verifies that a WebSocket MQTT source round-trips through +// LoadConfig correctly — username/password preserved, scheme unchanged. +func TestLoadConfigWSSource(t *testing.T) { + t.Setenv("DB_PATH", "") + t.Setenv("MQTT_BROKER", "") + + dir := t.TempDir() + cfgPath := filepath.Join(dir, "config.json") + os.WriteFile(cfgPath, []byte(`{ + "dbPath": "test.db", + "mqttSources": [ + { + "name": "local-tcp", + "broker": "mqtt://localhost:1883", + "topics": ["meshcore/#"] + }, + { + "name": "wsmqtt-ws", + "broker": "wss://wsmqtt.example.com/mqtt", + "username": "corescope", + "password": "s3cr3t", + "topics": ["meshcore/#"] + } + ] + }`), 0o644) + + cfg, err := LoadConfig(cfgPath) + if err != nil { + t.Fatal(err) + } + if len(cfg.MQTTSources) != 2 { + t.Fatalf("mqttSources len=%d, want 2", len(cfg.MQTTSources)) + } + + tcp := cfg.MQTTSources[0] + if tcp.Name != "local-tcp" { + t.Errorf("name=%s, want local-tcp", tcp.Name) + } + + ws := cfg.MQTTSources[1] + if ws.Name != "wsmqtt-ws" { + t.Errorf("name=%s, want wsmqtt-ws", ws.Name) + } + if ws.Broker != "wss://wsmqtt.example.com/mqtt" { + t.Errorf("broker=%s, want wss://wsmqtt.example.com/mqtt", ws.Broker) + } + if ws.Username != "corescope" { + t.Errorf("username=%s, want corescope", ws.Username) + } + if ws.Password != "s3cr3t" { + t.Errorf("password=%s, want s3cr3t", ws.Password) + } + + // Verify ResolvedSources leaves wss:// intact + sources := cfg.ResolvedSources() + if sources[1].Broker != "wss://wsmqtt.example.com/mqtt" { + t.Errorf("ResolvedSources wss broker=%s, want unchanged", sources[1].Broker) + } +} diff --git a/cmd/ingestor/main.go b/cmd/ingestor/main.go index 481c7cc1..ac596695 100644 --- a/cmd/ingestor/main.go +++ b/cmd/ingestor/main.go @@ -133,8 +133,11 @@ func main() { opts.SetPassword(source.Password) } if source.RejectUnauthorized != nil && !*source.RejectUnauthorized { - opts.SetTLSConfig(&tls.Config{InsecureSkipVerify: true}) - } else if strings.HasPrefix(source.Broker, "ssl://") { + // rejectUnauthorized: false — skip TLS verification (self-signed certs) + opts.SetTLSConfig(&tls.Config{InsecureSkipVerify: true}) //nolint:gosec + } else if strings.HasPrefix(source.Broker, "ssl://") || strings.HasPrefix(source.Broker, "wss://") { + // TLS with system CA pool — valid for Caddy-proxied endpoints and + // any broker with a publicly-trusted certificate. opts.SetTLSConfig(&tls.Config{}) } diff --git a/config.example.json b/config.example.json index 5672ed31..e985d3ee 100644 --- a/config.example.json +++ b/config.example.json @@ -125,6 +125,16 @@ "OAK", "MRY" ] + }, + { + "_comment": "WebSocket MQTT broker (e.g. meshcore-mqtt-broker). Use ws:// for plain WebSocket or wss:// for TLS. Username/password supported.", + "name": "wsmqtt", + "broker": "wss://wsmqtt.example.com/mqtt", + "username": "corescope", + "password": "your-password", + "topics": [ + "meshcore/#" + ] } ], "channelKeys": { @@ -218,7 +228,7 @@ "maxAgeDays": 5, "_comment": "Neighbor edges older than this many days are pruned on startup and daily. Default: 5." }, - "_comment_mqttSources": "Each source connects to an MQTT broker. topics: what to subscribe to. iataFilter: only ingest packets from these regions (optional).", + "_comment_mqttSources": "Each source connects to an MQTT broker. Supported schemes: mqtt:// (plain TCP), mqtts:// (TLS), ws:// (WebSocket), wss:// (WebSocket TLS). topics: what to subscribe to. iataFilter: only ingest packets from these regions (optional).", "_comment_channelKeys": "Hex keys for decrypting channel messages. Key name = channel display name. public channel key is well-known.", "_comment_hashChannels": "Channel names whose keys are derived via SHA256. Key = SHA256(name)[:16]. Listed here so the ingestor can auto-derive keys.", "_comment_defaultRegion": "IATA code shown by default in region filters.",