Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion cmd/ingestor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,24 @@ func LoadConfig(path string) (*Config, error) {
}

// ResolvedSources returns the final list of MQTT sources to connect to.
//
// Scheme mapping:
//
// mqtt:// → tcp:// (paho plain TCP)
// mqtts:// → ssl:// (paho TLS over TCP)
// ws:// (paho WebSocket — passed through, no mapping needed)
// wss:// (paho WebSocket TLS — passed through, no mapping needed)
func (c *Config) ResolvedSources() []MQTTSource {
for i := range c.MQTTSources {
// paho uses tcp:// and ssl:// not mqtt:// and mqtts://
// paho uses tcp:// and ssl:// for plain MQTT; ws:// and wss:// are accepted natively.
b := c.MQTTSources[i].Broker
if strings.HasPrefix(b, "mqtt://") {
c.MQTTSources[i].Broker = "tcp://" + b[7:]
} else if strings.HasPrefix(b, "mqtts://") {
c.MQTTSources[i].Broker = "ssl://" + b[8:]
}
// ws:// and wss:// pass through unchanged — paho handles WebSocket
// connections natively via gorilla/websocket.
}
return c.MQTTSources
}
91 changes: 91 additions & 0 deletions cmd/ingestor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,3 +284,94 @@ func TestLoadConfigWithAllFields(t *testing.T) {
t.Errorf("iataFilter=%v", src.IATAFilter)
}
}

// TestResolvedSourcesSchemeMapping verifies that mqtt:// and mqtts:// are translated
// to the paho-native tcp:// and ssl:// schemes, while ws:// and wss:// pass through
// unchanged (paho handles WebSocket connections natively).
func TestResolvedSourcesSchemeMapping(t *testing.T) {
tests := []struct {
input string
want string
}{
{"mqtt://host:1883", "tcp://host:1883"},
{"mqtts://host:8883", "ssl://host:8883"},
{"tcp://host:1883", "tcp://host:1883"}, // already translated
{"ssl://host:8883", "ssl://host:8883"}, // already translated
{"ws://host:9001", "ws://host:9001"}, // WebSocket: pass through
{"wss://host:9001", "wss://host:9001"}, // WebSocket TLS: pass through
{"ws://host:9001/mqtt", "ws://host:9001/mqtt"}, // with path
{"wss://host:9001/mqtt", "wss://host:9001/mqtt"},
}

for _, tt := range tests {
cfg := &Config{
MQTTSources: []MQTTSource{
{Name: "test", Broker: tt.input, Topics: []string{"meshcore/#"}},
},
}
sources := cfg.ResolvedSources()
if got := sources[0].Broker; got != tt.want {
t.Errorf("ResolvedSources(%q) = %q, want %q", tt.input, got, tt.want)
}
}
}

// TestLoadConfigWSSource verifies that a WebSocket MQTT source round-trips through
// LoadConfig correctly — username/password preserved, scheme unchanged.
func TestLoadConfigWSSource(t *testing.T) {
t.Setenv("DB_PATH", "")
t.Setenv("MQTT_BROKER", "")

dir := t.TempDir()
cfgPath := filepath.Join(dir, "config.json")
os.WriteFile(cfgPath, []byte(`{
"dbPath": "test.db",
"mqttSources": [
{
"name": "local-tcp",
"broker": "mqtt://localhost:1883",
"topics": ["meshcore/#"]
},
{
"name": "wsmqtt-ws",
"broker": "wss://wsmqtt.example.com/mqtt",
"username": "corescope",
"password": "s3cr3t",
"topics": ["meshcore/#"]
}
]
}`), 0o644)

cfg, err := LoadConfig(cfgPath)
if err != nil {
t.Fatal(err)
}
if len(cfg.MQTTSources) != 2 {
t.Fatalf("mqttSources len=%d, want 2", len(cfg.MQTTSources))
}

tcp := cfg.MQTTSources[0]
if tcp.Name != "local-tcp" {
t.Errorf("name=%s, want local-tcp", tcp.Name)
}

ws := cfg.MQTTSources[1]
if ws.Name != "wsmqtt-ws" {
t.Errorf("name=%s, want wsmqtt-ws", ws.Name)
}
if ws.Broker != "wss://wsmqtt.example.com/mqtt" {
t.Errorf("broker=%s, want wss://wsmqtt.example.com/mqtt", ws.Broker)
}
if ws.Username != "corescope" {
t.Errorf("username=%s, want corescope", ws.Username)
}
if ws.Password != "s3cr3t" {
t.Errorf("password=%s, want s3cr3t", ws.Password)
}

// Verify ResolvedSources leaves wss:// intact
sources := cfg.ResolvedSources()
if sources[1].Broker != "wss://wsmqtt.example.com/mqtt" {
t.Errorf("ResolvedSources wss broker=%s, want unchanged", sources[1].Broker)
}
}
7 changes: 5 additions & 2 deletions cmd/ingestor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,11 @@ func main() {
opts.SetPassword(source.Password)
}
if source.RejectUnauthorized != nil && !*source.RejectUnauthorized {
opts.SetTLSConfig(&tls.Config{InsecureSkipVerify: true})
} else if strings.HasPrefix(source.Broker, "ssl://") {
// rejectUnauthorized: false — skip TLS verification (self-signed certs)
opts.SetTLSConfig(&tls.Config{InsecureSkipVerify: true}) //nolint:gosec
} else if strings.HasPrefix(source.Broker, "ssl://") || strings.HasPrefix(source.Broker, "wss://") {
// TLS with system CA pool — valid for Caddy-proxied endpoints and
// any broker with a publicly-trusted certificate.
opts.SetTLSConfig(&tls.Config{})
}

Expand Down
12 changes: 11 additions & 1 deletion config.example.json
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,16 @@
"OAK",
"MRY"
]
},
{
"_comment": "WebSocket MQTT broker (e.g. meshcore-mqtt-broker). Use ws:// for plain WebSocket or wss:// for TLS. Username/password supported.",
"name": "wsmqtt",
"broker": "wss://wsmqtt.example.com/mqtt",
"username": "corescope",
"password": "your-password",
"topics": [
"meshcore/#"
]
}
],
"channelKeys": {
Expand Down Expand Up @@ -218,7 +228,7 @@
"maxAgeDays": 5,
"_comment": "Neighbor edges older than this many days are pruned on startup and daily. Default: 5."
},
"_comment_mqttSources": "Each source connects to an MQTT broker. topics: what to subscribe to. iataFilter: only ingest packets from these regions (optional).",
"_comment_mqttSources": "Each source connects to an MQTT broker. Supported schemes: mqtt:// (plain TCP), mqtts:// (TLS), ws:// (WebSocket), wss:// (WebSocket TLS). topics: what to subscribe to. iataFilter: only ingest packets from these regions (optional).",
"_comment_channelKeys": "Hex keys for decrypting channel messages. Key name = channel display name. public channel key is well-known.",
"_comment_hashChannels": "Channel names whose keys are derived via SHA256. Key = SHA256(name)[:16]. Listed here so the ingestor can auto-derive keys.",
"_comment_defaultRegion": "IATA code shown by default in region filters.",
Expand Down