From 42e42bc4e5ce82d78060611aadd641afbe93ce05 Mon Sep 17 00:00:00 2001 From: Zeeshan Lakhani Date: Sat, 17 Jan 2026 04:40:25 +0000 Subject: [PATCH] Add per-member multicast source filtering Implements IGMPv3/MLDv2-based source filtering for multicast subscriptions, including: - SourceFilter and FilterMode types in oxide-vpc API - Fast-path optimization for EXCLUDE() (accept all) case - Per-member/port filtering on both Tx and Rx paths - Per-next-hop forwarding filtering: Check aggregated SourceFilter before forwarding to each remote sled - Allows skipping forwarding to sleds where no subscriber would accept the packet - This will involve additions to Omicron, but we have to do that anyway for OPTE changes - DTrace probe and stats for filtered/filtered-out packets - Benchmark(s) and integration tests - This PR also includes optimizations to the xde test suite setup for mcast - Note: Contains an API bump - Minor: clippy test cleanup --- .cargo/config.toml | 1 + .github/buildomat/jobs/bench.sh | 1 + .github/buildomat/jobs/test.sh | 3 + .github/buildomat/jobs/xde.sh | 7 + bench/Cargo.toml | 4 + bench/benches/multicast.rs | 134 +++ bin/opteadm/src/bin/opteadm.rs | 14 +- crates/opte-api/src/ip.rs | 75 +- crates/opte-api/src/lib.rs | 2 +- dtrace/opte-mcast-delivery.d | 89 +- lib/oxide-vpc/src/api.rs | 120 +- lib/oxide-vpc/src/engine/geneve.rs | 2 +- lib/oxide-vpc/src/print.rs | 64 +- xde-tests/src/lib.rs | 266 ++++- xde-tests/tests/multicast_multi_nexthop.rs | 30 +- xde-tests/tests/multicast_multi_sub.rs | 95 +- xde-tests/tests/multicast_rx.rs | 45 +- xde-tests/tests/multicast_source_filter.rs | 1195 ++++++++++++++++++++ xde-tests/tests/multicast_validation.rs | 142 +-- xde/src/dev_map.rs | 57 +- xde/src/stats.rs | 26 + xde/src/xde.rs | 364 ++++-- 22 files changed, 2375 insertions(+), 361 deletions(-) create mode 100644 bench/benches/multicast.rs create mode 100644 xde-tests/tests/multicast_source_filter.rs diff --git a/.cargo/config.toml b/.cargo/config.toml index 
935da798..f06b1eda 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,6 +1,7 @@ [alias] xtask = "run --package xtask --" ubench = "bench --package opte-bench --bench userland --profile release-lto --" +mbench = "bench --package opte-bench --bench multicast --profile release-lto --" kbench = "bench --package opte-bench --bench xde --" [env] diff --git a/.github/buildomat/jobs/bench.sh b/.github/buildomat/jobs/bench.sh index abd922c5..b3a8cc33 100644 --- a/.github/buildomat/jobs/bench.sh +++ b/.github/buildomat/jobs/bench.sh @@ -114,6 +114,7 @@ pfexec add_drv xde banner "bench" cargo kbench local cargo ubench +cargo mbench cp -r target/criterion $OUT_DIR cp -r target/xde-bench $OUT_DIR diff --git a/.github/buildomat/jobs/test.sh b/.github/buildomat/jobs/test.sh index a389dc38..1d127abd 100755 --- a/.github/buildomat/jobs/test.sh +++ b/.github/buildomat/jobs/test.sh @@ -95,6 +95,9 @@ pfexec /input/xde/work/test/multicast_multi_sub --nocapture --test-threads=1 pfexec chmod +x /input/xde/work/test/multicast_validation pfexec /input/xde/work/test/multicast_validation --nocapture --test-threads=1 +pfexec chmod +x /input/xde/work/test/multicast_source_filter +pfexec /input/xde/work/test/multicast_source_filter --nocapture --test-threads=1 + banner "teardown" # Ensure full driver teardown is exercised after tests complete pfexec rem_drv xde diff --git a/.github/buildomat/jobs/xde.sh b/.github/buildomat/jobs/xde.sh index 82baf11c..714ef4bd 100755 --- a/.github/buildomat/jobs/xde.sh +++ b/.github/buildomat/jobs/xde.sh @@ -17,6 +17,7 @@ #: "=/work/test/multicast_rx", #: "=/work/test/multicast_multi_sub", #: "=/work/test/multicast_validation", +#: "=/work/test/multicast_source_filter", #: "=/work/xde.conf", #: ] #: @@ -134,8 +135,14 @@ multicast_validation_test=$( cargo build -q --test multicast_validation --message-format=json |\ jq -r "select(.profile.test == true) | .filenames[]" ) +cargo build --test multicast_source_filter +multicast_source_filter_test=$( + 
cargo build -q --test multicast_source_filter --message-format=json |\ + jq -r "select(.profile.test == true) | .filenames[]" +) mkdir -p /work/test cp $loopback_test /work/test/loopback cp $multicast_rx_test /work/test/multicast_rx cp $multicast_multi_sub_test /work/test/multicast_multi_sub cp $multicast_validation_test /work/test/multicast_validation +cp $multicast_source_filter_test /work/test/multicast_source_filter diff --git a/bench/Cargo.toml b/bench/Cargo.toml index 2124141e..721a2b33 100644 --- a/bench/Cargo.toml +++ b/bench/Cargo.toml @@ -41,3 +41,7 @@ harness = false [[bench]] name = "xde" harness = false + +[[bench]] +name = "multicast" +harness = false diff --git a/bench/benches/multicast.rs b/bench/benches/multicast.rs new file mode 100644 index 00000000..3bb21330 --- /dev/null +++ b/bench/benches/multicast.rs @@ -0,0 +1,134 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +// Copyright 2026 Oxide Computer Company + +//! Multicast microbenchmarks. + +use criterion::BenchmarkId; +use criterion::Criterion; +use criterion::Throughput; +use criterion::criterion_group; +use criterion::criterion_main; +use oxide_vpc::api::FilterMode; +use oxide_vpc::api::IpAddr; +use oxide_vpc::api::Ipv4Addr; +use oxide_vpc::api::Ipv6Addr; +use oxide_vpc::api::SourceFilter; +use std::collections::BTreeSet; +use std::hint::black_box; + +/// Generate a source IP address for filter testing (10.0.0.x). +/// These are unicast source addresses, not multicast group destinations. +fn make_src_v4(i: u32) -> IpAddr { + IpAddr::Ip4(Ipv4Addr::from(0x0a000000u32 + i)) +} + +/// Generate a source IP address for filter testing (fd00::x). +/// These are unicast source addresses, not multicast group destinations. 
+fn make_src_v6(i: u32) -> IpAddr { + let mut bytes = [0u8; 16]; + bytes[0..4].copy_from_slice(&[0xfd, 0x00, 0x00, 0x00]); + bytes[12..16].copy_from_slice(&i.to_be_bytes()); + IpAddr::Ip6(Ipv6Addr::from(bytes)) +} + +/// Benchmark [`SourceFilter::allows`] for various filter configurations. +fn source_filter_allows(c: &mut Criterion) { + let mut group = c.benchmark_group("source_filter/allows"); + group.throughput(Throughput::Elements(1)); + + let src_v4 = make_src_v4(100); // Not in any source list + let src_v6 = make_src_v6(100); + + // Fast path: EXCLUDE() with empty sources (*, G) + let filter_any = SourceFilter::default(); + group.bench_function("exclude_empty_v4", |b| { + b.iter(|| black_box(filter_any.allows(black_box(src_v4)))) + }); + group.bench_function("exclude_empty_v6", |b| { + b.iter(|| black_box(filter_any.allows(black_box(src_v6)))) + }); + + // EXCLUDE with sources: "Miss" case where source is not in exclusion list + for size in [1, 5, 10, 50, 100] { + let sources_v4: BTreeSet<_> = (0..size).map(make_src_v4).collect(); + let filter_v4 = + SourceFilter { mode: FilterMode::Exclude, sources: sources_v4 }; + group.bench_with_input( + BenchmarkId::new("exclude_miss_v4", size), + &filter_v4, + |b, f| b.iter(|| black_box(f.allows(black_box(src_v4)))), + ); + let src_in_list_v4 = make_src_v4(0); + group.bench_with_input( + BenchmarkId::new("exclude_hit_v4", size), + &filter_v4, + |b, f| b.iter(|| black_box(f.allows(black_box(src_in_list_v4)))), + ); + + let sources_v6: BTreeSet<_> = (0..size).map(make_src_v6).collect(); + let filter_v6 = + SourceFilter { mode: FilterMode::Exclude, sources: sources_v6 }; + group.bench_with_input( + BenchmarkId::new("exclude_miss_v6", size), + &filter_v6, + |b, f| b.iter(|| black_box(f.allows(black_box(src_v6)))), + ); + let src_in_list_v6 = make_src_v6(0); + group.bench_with_input( + BenchmarkId::new("exclude_hit_v6", size), + &filter_v6, + |b, f| b.iter(|| black_box(f.allows(black_box(src_in_list_v6)))), + ); + } + + // 
INCLUDE with sources: "Hit" case where source is in inclusion list + for size in [1, 5, 10, 50, 100] { + let sources_v4: BTreeSet<_> = (0..size).map(make_src_v4).collect(); + let filter_v4 = + SourceFilter { mode: FilterMode::Include, sources: sources_v4 }; + let src_in_list_v4 = make_src_v4(0); + group.bench_with_input( + BenchmarkId::new("include_hit_v4", size), + &filter_v4, + |b, f| b.iter(|| black_box(f.allows(black_box(src_in_list_v4)))), + ); + group.bench_with_input( + BenchmarkId::new("include_miss_v4", size), + &filter_v4, + |b, f| b.iter(|| black_box(f.allows(black_box(src_v4)))), + ); + + let sources_v6: BTreeSet<_> = (0..size).map(make_src_v6).collect(); + let filter_v6 = + SourceFilter { mode: FilterMode::Include, sources: sources_v6 }; + let src_in_list_v6 = make_src_v6(0); + group.bench_with_input( + BenchmarkId::new("include_hit_v6", size), + &filter_v6, + |b, f| b.iter(|| black_box(f.allows(black_box(src_in_list_v6)))), + ); + group.bench_with_input( + BenchmarkId::new("include_miss_v6", size), + &filter_v6, + |b, f| b.iter(|| black_box(f.allows(black_box(src_v6)))), + ); + } + + // INCLUDE() empty, rejecting all + let filter_none = + SourceFilter { mode: FilterMode::Include, sources: BTreeSet::new() }; + group.bench_function("include_empty_v4", |b| { + b.iter(|| black_box(filter_none.allows(black_box(src_v4)))) + }); + group.bench_function("include_empty_v6", |b| { + b.iter(|| black_box(filter_none.allows(black_box(src_v6)))) + }); + + group.finish(); +} + +criterion_group!(benches, source_filter_allows); +criterion_main!(benches); diff --git a/bin/opteadm/src/bin/opteadm.rs b/bin/opteadm/src/bin/opteadm.rs index fb4334db..74bc5962 100644 --- a/bin/opteadm/src/bin/opteadm.rs +++ b/bin/opteadm/src/bin/opteadm.rs @@ -43,6 +43,7 @@ use oxide_vpc::api::FirewallRule; use oxide_vpc::api::IpCfg; use oxide_vpc::api::Ipv4Cfg; use oxide_vpc::api::Ipv6Cfg; +use oxide_vpc::api::McastForwardingNextHop; use oxide_vpc::api::McastSubscribeReq; use 
oxide_vpc::api::McastUnsubscribeAllReq; use oxide_vpc::api::McastUnsubscribeReq; @@ -64,6 +65,7 @@ use oxide_vpc::api::SetMcast2PhysReq; use oxide_vpc::api::SetMcastForwardingReq; use oxide_vpc::api::SetVirt2BoundaryReq; use oxide_vpc::api::SetVirt2PhysReq; +use oxide_vpc::api::SourceFilter; use oxide_vpc::api::TunnelEndpoint; use oxide_vpc::api::VpcCfg; use oxide_vpc::print::print_mcast_fwd; @@ -926,7 +928,11 @@ fn main() -> anyhow::Result<()> { let next_hop_addr = NextHopV6::new(next_hop, next_hop_vni); let req = SetMcastForwardingReq { underlay, - next_hops: vec![(next_hop_addr, replication)], + next_hops: vec![McastForwardingNextHop { + next_hop: next_hop_addr, + replication, + source_filter: SourceFilter::default(), + }], }; hdl.set_mcast_fwd(&req)?; } @@ -945,7 +951,11 @@ fn main() -> anyhow::Result<()> { } Command::McastSubscribe { port, group } => { - let req = McastSubscribeReq { port_name: port, group }; + let req = McastSubscribeReq { + port_name: port, + group, + filter: SourceFilter::default(), + }; hdl.mcast_subscribe(&req)?; } diff --git a/crates/opte-api/src/ip.rs b/crates/opte-api/src/ip.rs index b505c7a5..f80c2514 100644 --- a/crates/opte-api/src/ip.rs +++ b/crates/opte-api/src/ip.rs @@ -308,6 +308,7 @@ pub enum IpAddr { } impl IpAddr { + /// Returns true if this is a multicast address. pub const fn is_multicast(&self) -> bool { match self { IpAddr::Ip4(v4) => v4.is_multicast(), @@ -315,6 +316,38 @@ impl IpAddr { } } + /// Returns true if this is the unspecified address. + pub const fn is_unspecified(&self) -> bool { + match self { + IpAddr::Ip4(v4) => v4.is_unspecified(), + IpAddr::Ip6(v6) => v6.is_unspecified(), + } + } + + /// Returns true if this is a loopback address. + pub const fn is_loopback(&self) -> bool { + match self { + IpAddr::Ip4(v4) => v4.is_loopback(), + IpAddr::Ip6(v6) => v6.is_loopback(), + } + } + + /// Returns true if this is a link-local address. 
+ pub const fn is_link_local(&self) -> bool { + match self { + IpAddr::Ip4(v4) => v4.is_link_local(), + IpAddr::Ip6(v6) => v6.is_link_local(), + } + } + + /// Returns true if this is a broadcast address (IPv4 only, IPv6 has no broadcast). + pub const fn is_broadcast(&self) -> bool { + match self { + IpAddr::Ip4(v4) => v4.is_broadcast(), + IpAddr::Ip6(_) => false, + } + } + /// Return the multicast MAC address associated with this multicast IP address. /// If the IP address is not multicast, None will be returned. /// @@ -410,6 +443,7 @@ pub struct Ipv4Addr { impl Ipv4Addr { pub const ANY_ADDR: Self = Self { inner: [0; 4] }; + pub const LOCALHOST: Self = Self { inner: [127, 0, 0, 1] }; pub const LOCAL_BCAST: Self = Self { inner: [255; 4] }; /// Return the bytes of the address. @@ -455,10 +489,31 @@ impl Ipv4Addr { u32::from_be_bytes(self.bytes()).to_be() } + /// Returns true if this is a multicast address (224.0.0.0/4). pub const fn is_multicast(&self) -> bool { matches!(self.inner[0], 224..240) } + /// Returns true if this is the unspecified address (0.0.0.0). + pub const fn is_unspecified(&self) -> bool { + matches!(self.inner, [0, 0, 0, 0]) + } + + /// Returns true if this is a loopback address (127.0.0.0/8). + pub const fn is_loopback(&self) -> bool { + self.inner[0] == 127 + } + + /// Returns true if this is the broadcast address (255.255.255.255). + pub const fn is_broadcast(&self) -> bool { + matches!(self.inner, [255, 255, 255, 255]) + } + + /// Returns true if this is a link-local address (169.254.0.0/16). + pub const fn is_link_local(&self) -> bool { + self.inner[0] == 169 && self.inner[1] == 254 + } + /// Return the multicast MAC address associated with this multicast IPv4 /// address. If the IPv4 address is not multicast, None will be returned. /// @@ -621,6 +676,9 @@ impl Ipv6Addr { /// The unspecified IPv6 address, i.e., `::` or all zeros. pub const ANY_ADDR: Self = Self { inner: [0; 16] }; + /// The loopback address, i.e., `::1`. 
+ pub const LOCALHOST: Self = Self::from_const([0, 0, 0, 0, 0, 0, 0, 1]); + /// The All-Routers multicast address, used in the Neighbor Discovery /// Protocol. pub const ALL_ROUTERS: Self = @@ -694,11 +752,26 @@ impl Ipv6Addr { &self.inner[..EXPECTED.len()] == EXPECTED } - /// Return `true` if this is a multicast IPv6 address, and `false` otherwise + /// Returns true if this is a multicast address (ff00::/8). pub const fn is_multicast(&self) -> bool { self.inner[0] == 0xFF } + /// Returns true if this is the unspecified address (::). + pub const fn is_unspecified(&self) -> bool { + matches!(self.inner, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]) + } + + /// Returns true if this is the loopback address (::1). + pub const fn is_loopback(&self) -> bool { + matches!(self.inner, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]) + } + + /// Returns true if this is a link-local address (fe80::/10). + pub const fn is_link_local(&self) -> bool { + self.inner[0] == 0xfe && (self.inner[1] & 0xc0) == 0x80 + } + /// Return `true` if this is a multicast IPv6 address with the ff04::/16 prefix /// (admin-local scope with flags=0) as used by Omicron for underlay multicast. /// diff --git a/crates/opte-api/src/lib.rs b/crates/opte-api/src/lib.rs index 558a6e41..3fb2fac1 100644 --- a/crates/opte-api/src/lib.rs +++ b/crates/opte-api/src/lib.rs @@ -51,7 +51,7 @@ pub use ulp::*; /// /// We rely on CI and the check-api-version.sh script to verify that /// this number is incremented anytime the oxide-api code changes. -pub const API_VERSION: u64 = 38; +pub const API_VERSION: u64 = 39; /// Major version of the OPTE package. 
pub const MAJOR_VERSION: u64 = 0; diff --git a/dtrace/opte-mcast-delivery.d b/dtrace/opte-mcast-delivery.d index 7ed9d3c6..babd5a1d 100644 --- a/dtrace/opte-mcast-delivery.d +++ b/dtrace/opte-mcast-delivery.d @@ -12,11 +12,12 @@ #include "common.h" /* Local print formats (avoid colliding with common.h FLOW_FMT macros) */ -#define M_HDR_FMT "%-12s %-6s %-39s %-39s\n" -#define M_LINE_FMT "%-12s %-6u %-39s %-39s\n" -#define M_FWD_HDR_FMT "%-12s %-6s %-39s %-39s\n" +#define M_HDR_FMT "%-12s %-6s %-39s %-39s\n" +#define M_LINE_FMT "%-12s %-6u %-39s %-39s\n" +#define M_FWD_HDR_FMT "%-12s %-6s %-39s %-39s\n" #define M_FWD_LINE_FMT "%-12s %-6u %-39s %-39s\n" -#define DBG_LINE_FMT "%-20s %-30s %s\n" +#define M_FILT_FMT "%-12s %-6u %-39s %-39s src=%-15s mode=%s\n" +#define DBG_LINE_FMT "%-20s %-30s %s\n" /* Macro to reduce code duplication for group address formatting */ #define MCAST_GROUP_STR(af, ptr) \ @@ -390,6 +391,84 @@ mcast-rx-pullup-fail printf(M_LINE_FMT, "RX_FAIL", 0, "-", "-"); } +mcast-tx-no-inner-ip { + /* arg0=mblk_addr */ + + /* Always track aggregations */ + @by_event["TX_NO_IP"] = count(); +} + +mcast-tx-no-inner-ip +/!suppress_output/ +{ + printf(M_LINE_FMT, "TX_NO_IP", 0, "-", "-"); +} + +mcast-source-filtered { + /* arg0=af, arg1=inner_src, arg2=inner_dst, arg3=vni, arg4=port, arg5=filter_mode */ + this->af = arg0; + this->src_ptr = arg1; + this->dst_ptr = arg2; + this->vni = arg3; + this->port = stringof(arg4); + this->filter_mode = arg5; + this->src_str = MCAST_GROUP_STR(this->af, this->src_ptr); + this->dst_str = MCAST_GROUP_STR(this->af, this->dst_ptr); + this->mode_str = this->filter_mode == 0 ? 
"INCLUDE" : "EXCLUDE"; + + /* Always track aggregations */ + @by_event["FILTERED"] = count(); + @by_vni["FILTERED", this->vni] = count(); + @by_port[this->port] = count(); + @filtered_by_mode[this->mode_str] = count(); +} + +mcast-source-filtered +/!suppress_output/ +{ + if (num >= HEADER_REPRINT_INTERVAL) { + printf(M_HDR_FMT, "EVENT", "VNI", "GROUP", "PORT/NEXTHOP"); + num = 0; + } + + printf(M_FILT_FMT, "FILTERED", this->vni, this->dst_str, this->port, + this->src_str, this->mode_str); + num++; +} + +mcast-fwd-source-filtered { + /* arg0=af, arg1=inner_src, arg2=inner_dst, arg3=vni, arg4=next_hop, arg5=filter_mode */ + this->af = arg0; + this->src_ptr = arg1; + this->dst_ptr = arg2; + this->vni = arg3; + this->next_hop = (in6_addr_t *)arg4; + this->filter_mode = arg5; + this->src_str = MCAST_GROUP_STR(this->af, this->src_ptr); + this->dst_str = MCAST_GROUP_STR(this->af, this->dst_ptr); + this->next_hop_str = inet_ntoa6(this->next_hop); + this->mode_str = this->filter_mode == 0 ? "INCLUDE" : "EXCLUDE"; + + /* Always track aggregations */ + @by_event["FWD_FILT"] = count(); + @by_vni["FWD_FILT", this->vni] = count(); + @by_nexthop_unicast[this->next_hop_str] = count(); + @filtered_by_mode[this->mode_str] = count(); +} + +mcast-fwd-source-filtered +/!suppress_output/ +{ + if (num >= HEADER_REPRINT_INTERVAL) { + printf(M_FWD_HDR_FMT, "EVENT", "VNI", "UNDERLAY_MCAST", "ROUTE_UNICAST"); + num = 0; + } + + printf(M_FILT_FMT, "FWD_FILT", this->vni, this->dst_str, this->next_hop_str, + this->src_str, this->mode_str); + num++; +} + mcast-no-fwd-entry { /* arg0=underlay_ptr, arg1=vni */ this->underlay = (in6_addr_t *)arg0; @@ -422,6 +501,8 @@ END printa(@by_port); printf("\nForwarding by unicast next hop (routing address):\n"); printa(@by_nexthop_unicast); + printf("\nSource filtering by mode:\n"); + printa(@filtered_by_mode); printf("\nConfig ops:\n"); printa(@cfg_counts); } diff --git a/lib/oxide-vpc/src/api.rs b/lib/oxide-vpc/src/api.rs index 8c67ec25..7af7f746 100644 
--- a/lib/oxide-vpc/src/api.rs +++ b/lib/oxide-vpc/src/api.rs @@ -758,14 +758,29 @@ pub enum DelRouterEntryResp { /// currently. The VNI in NextHopV6 must be 77 - other values are rejected. #[derive(Clone, Debug, Deserialize, Serialize)] pub struct SetMcastForwardingReq { - /// The underlay IPv6 multicast address (outer IPv6 dst in transmitted packets) - /// Must be admin-scoped ff04::/16 + /// The underlay IPv6 multicast address (outer IPv6 dst in transmitted packets). + /// Must be admin-scoped ff04::/16. pub underlay: MulticastUnderlay, - /// Switch endpoints and Tx-only replication instructions. - /// Each NextHopV6.addr is the unicast IPv6 of a switch (for routing). - /// The Replication is a Tx-only instruction indicating which port groups - /// the switch should use. - pub next_hops: Vec<(NextHopV6, Replication)>, + /// Switch endpoints with replication instructions and aggregated source filters. + pub next_hops: Vec<McastForwardingNextHop>, +} + +/// A forwarding entry for a single next hop with its aggregated source filter. +/// +/// The source filter is the union of all subscriber filters on the destination +/// sled. Omicron computes this aggregation. OPTE checks the filter before +/// forwarding to avoid sending packets to sleds where all subscribers would +/// filter them. +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] +pub struct McastForwardingNextHop { + /// The unicast IPv6 address of the switch endpoint (for routing). + pub next_hop: NextHopV6, + /// Tx-only instruction for switch port group replication. + pub replication: Replication, + /// Aggregated source filter for this destination sled. + /// Default (Exclude with empty sources) means accept any source. + #[serde(default)] + pub source_filter: SourceFilter, } /// Clear multicast forwarding entries for an underlay multicast group.
@@ -789,8 +804,8 @@ impl CmdOk for DumpMcastForwardingResp {} pub struct McastForwardingEntry { /// The underlay IPv6 multicast address (admin-scoped ff04::/16) pub underlay: MulticastUnderlay, - /// The next hops (underlay IPv6 addresses) with Tx-only replication instructions - pub next_hops: Vec<(NextHopV6, Replication)>, + /// The next hops with replication instructions and source filters + pub next_hops: Vec<McastForwardingNextHop>, } impl opte::api::cmd::CmdOk for DelRouterEntryResp {} @@ -808,17 +823,100 @@ impl CmdOk for DumpMcastSubscriptionsResp {} pub struct McastSubscriptionEntry { /// The underlay IPv6 multicast address (admin-scoped ff04::/16, subscription key) pub underlay: MulticastUnderlay, - /// Port names subscribed to this group on this sled - pub ports: Vec<String>, + /// Port subscriptions with their source filters + pub subscribers: Vec<McastSubscriberEntry>, +} + +impl McastSubscriptionEntry { + /// Returns true if the given port name is subscribed to this group. + pub fn has_port(&self, name: &str) -> bool { + self.subscribers.iter().any(|s| s.port == name) + } +} + +/// A port's subscription to a multicast group with its source filter. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct McastSubscriberEntry { + /// The port name + pub port: String, + /// The source filter for this port's subscription + pub filter: SourceFilter, +} + +/// Filter mode for multicast source filtering per IGMPv3/MLDv2 semantics. +/// +/// Determines how the source list is interpreted for a (port, group) subscription. +/// +/// See [RFD 488] for Oxide multicast architecture and [RFC 3376]/[RFC 3810] +/// for protocol details. +/// +/// [RFD 488]: https://rfd.shared.oxide.computer/rfd/488 +/// [RFC 3376]: https://www.rfc-editor.org/rfc/rfc3376 +/// [RFC 3810]: https://www.rfc-editor.org/rfc/rfc3810 +#[derive( + Clone, Copy, Debug, Default, Deserialize, Serialize, Eq, PartialEq, +)] +#[repr(u8)] +pub enum FilterMode { + /// Accept packets only from sources in the list.
+ /// Empty list means no sources are accepted. + Include = 0, + /// Accept packets from any source except those in the list. + /// Empty list means all sources are accepted (*, G). + #[default] + Exclude = 1, +} + +/// Per-member source filter for multicast subscriptions. +/// +/// Each port subscribed to a multicast group can have its own source filter, +/// allowing fine-grained control over which sources are accepted: +/// - `EXCLUDE()`: accept any source (*, G) +/// - `EXCLUDE(S1, S2)`: accept any except listed +/// - `INCLUDE(S1, S2)`: accept only listed sources +/// - `INCLUDE()`: accept nothing +#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)] +pub struct SourceFilter { + pub mode: FilterMode, + pub sources: BTreeSet<IpAddr>, +} + +impl SourceFilter { + /// Returns true if this filter allows packets from the given source. + pub fn allows(&self, src: IpAddr) -> bool { + match self.mode { + FilterMode::Include => self.sources.contains(&src), + FilterMode::Exclude => { + // Fast path for (*, G) subscriptions: EXCLUDE() with empty + // sources is the default and most common case encountered. + // Checking is_empty() avoids the BTreeSet lookup on every + // packet. + self.sources.is_empty() || !self.sources.contains(&src) + } + } + } + + /// Returns true if this filter accepts any source (*, G). + pub fn accepts_any(&self) -> bool { + matches!(self.mode, FilterMode::Exclude) && self.sources.is_empty() + } } /// Subscribe a port to a multicast group. +/// +/// The group address must be a valid IP multicast address (IPv4 in +/// 224.0.0.0/4 or IPv6 in ff00::/8). Non-multicast addresses are +/// rejected. Non-IP multicast frames (L2-only) are not delivered. #[derive(Clone, Debug, Deserialize, Serialize)] pub struct McastSubscribeReq { /// The port name to subscribe pub port_name: String, /// The multicast group address pub group: IpAddr, + /// Source filter for this subscription.
Defaults to Exclude with empty + /// sources (accept any source) if not specified. + #[serde(default)] + pub filter: SourceFilter, } /// Unsubscribe a port from a multicast group. diff --git a/lib/oxide-vpc/src/engine/geneve.rs b/lib/oxide-vpc/src/engine/geneve.rs index f26a2fd1..55bf179f 100644 --- a/lib/oxide-vpc/src/engine/geneve.rs +++ b/lib/oxide-vpc/src/engine/geneve.rs @@ -532,7 +532,7 @@ mod test { // Build a minimal packet with just one Multicast option #[rustfmt::skip] - let buf = vec![ + let buf = [ // UDP source 0x1E, 0x61, // UDP dest diff --git a/lib/oxide-vpc/src/print.rs b/lib/oxide-vpc/src/print.rs index 5a014702..e29295db 100644 --- a/lib/oxide-vpc/src/print.rs +++ b/lib/oxide-vpc/src/print.rs @@ -13,9 +13,11 @@ use crate::api::DumpMcastForwardingResp; use crate::api::DumpMcastSubscriptionsResp; use crate::api::DumpVirt2BoundaryResp; use crate::api::DumpVirt2PhysResp; +use crate::api::FilterMode; use crate::api::GuestPhysAddr; use crate::api::Ipv4Addr; use crate::api::Ipv6Addr; +use crate::api::SourceFilter; use opte::api::IpCidr; use opte::api::Vni; use opte::print::*; @@ -140,7 +142,7 @@ fn print_v2p_ip6( /// Print the header for the [`print_mcast_fwd()`] output. fn print_mcast_fwd_header(t: &mut impl Write) -> std::io::Result<()> { - writeln!(t, "GROUP IP\tUNDERLAY IP\tVNI\tREPLICATION") + writeln!(t, "GROUP IP\tUNDERLAY IP\tVNI\tREPLICATION\tFILTER") } /// Print a [`DumpMcastForwardingResp`]. 
@@ -161,12 +163,17 @@ pub fn print_mcast_fwd_into( write_hr(&mut t)?; for entry in &resp.entries { - for (next_hop, replication) in &entry.next_hops { - writeln!( + for hop in &entry.next_hops { + write!( t, - "{}\t{}\t{}\t{replication:?}", - entry.underlay, next_hop.addr, next_hop.vni + "{}\t{}\t{}\t{:?}\t", + entry.underlay, + hop.next_hop.addr, + hop.next_hop.vni, + hop.replication )?; + write_source_filter(&mut t, &hop.source_filter)?; + writeln!(t)?; } } writeln!(t)?; @@ -175,7 +182,7 @@ pub fn print_mcast_fwd_into( /// Print the header for the [`print_mcast_subs()`] output. fn print_mcast_subs_header(t: &mut impl Write) -> std::io::Result<()> { - writeln!(t, "UNDERLAY GROUP\tSUBSCRIBED PORTS") + writeln!(t, "UNDERLAY GROUP\tPORT\tFILTER") } /// Print a [`DumpMcastSubscriptionsResp`]. @@ -198,9 +205,50 @@ pub fn print_mcast_subs_into( write_hr(&mut t)?; for entry in &resp.entries { - let ports = entry.ports.join(", "); - writeln!(t, "{}\t{ports}", entry.underlay)?; + for sub in &entry.subscribers { + write!(t, "{}\t{}\t", entry.underlay, sub.port)?; + write_source_filter(&mut t, &sub.filter)?; + writeln!(t)?; + } } writeln!(t)?; t.flush() } + +/// Write a source filter to the given writer. +/// +/// Uses notation inspired by RFC 3376 (IGMPv3) and RFC 3810 (MLDv2): +/// - `INCLUDE(S1, S2)` - accept only from listed sources +/// - `EXCLUDE(S1, S2)` - accept from all except listed sources +/// - `EXCLUDE()` - accept any source (*, G) +/// - `INCLUDE()` - accept nothing +/// +/// See <https://www.rfc-editor.org/rfc/rfc3376> (IGMPv3) and +/// <https://www.rfc-editor.org/rfc/rfc3810> (MLDv2).
+fn write_source_filter( + t: &mut impl Write, + filter: &SourceFilter, +) -> std::io::Result<()> { + let mode = match filter.mode { + FilterMode::Include => "INCLUDE", + FilterMode::Exclude => "EXCLUDE", + }; + if filter.sources.is_empty() { + if matches!(filter.mode, FilterMode::Exclude) { + write!(t, "{mode}() (any)") + } else { + write!(t, "{mode}() (none)") + } + } else { + write!(t, "{mode}(")?; + let mut first = true; + for source in &filter.sources { + if !first { + write!(t, ", ")?; + } + write!(t, "{source}")?; + first = false; + } + write!(t, ")") + } +} diff --git a/xde-tests/src/lib.rs b/xde-tests/src/lib.rs index f97a99fe..69a77a1d 100644 --- a/xde-tests/src/lib.rs +++ b/xde-tests/src/lib.rs @@ -28,6 +28,7 @@ use oxide_vpc::api::Ipv4Cfg; use oxide_vpc::api::Ipv6Addr; use oxide_vpc::api::Ipv6Cfg; use oxide_vpc::api::MacAddr; +use oxide_vpc::api::McastForwardingNextHop; use oxide_vpc::api::McastSubscribeReq; use oxide_vpc::api::McastUnsubscribeReq; use oxide_vpc::api::MulticastUnderlay; @@ -40,6 +41,7 @@ use oxide_vpc::api::SNat6Cfg; use oxide_vpc::api::SetMcast2PhysReq; use oxide_vpc::api::SetMcastForwardingReq; use oxide_vpc::api::SetVirt2PhysReq; +use oxide_vpc::api::SourceFilter; use oxide_vpc::api::Vni; use oxide_vpc::api::VpcCfg; use rand::Rng; @@ -429,13 +431,24 @@ impl OptePort { &self.name } - /// Subscribe this port to a multicast group. + /// Subscribe this port to a multicast group (accepting any source). /// Automatically tracks the subscription for cleanup on drop. pub fn subscribe_multicast(&self, group: IpAddr) -> Result<()> { + self.subscribe_multicast_filtered(group, SourceFilter::default()) + } + + /// Subscribe this port to a multicast group with source filtering. + /// Automatically tracks the subscription for cleanup on drop. 
+ pub fn subscribe_multicast_filtered( + &self, + group: IpAddr, + filter: SourceFilter, + ) -> Result<()> { let adm = OpteHdl::open()?; adm.mcast_subscribe(&McastSubscribeReq { port_name: self.name.clone(), group, + filter, })?; self.mcast_subscriptions.borrow_mut().push(group); Ok(()) @@ -626,7 +639,7 @@ impl SnoopGuard { /// for negative assertions (verifying that traffic is not flowing). /// /// # Example - /// ```no_run + /// ```ignore /// let mut snoop = SnoopGuard::start("xde_test_sim1", "udp port 9999")?; /// // ... perform operations that should NOT generate traffic ... /// snoop.assert_no_packet("on unsubscribed node B"); @@ -646,7 +659,7 @@ impl SnoopGuard { /// assertions (typically verifying that traffic is flowing). /// /// # Example - /// ```no_run + /// ```ignore /// let mut snoop = SnoopGuard::start("xde_test_sim1", "udp port 9999")?; /// // ... perform operations that should generate traffic ... /// let output = snoop.assert_packet("on subscribed node B"); @@ -727,10 +740,7 @@ impl MulticastGroup { /// Set multicast forwarding entries for this group. pub fn set_forwarding( &self, - next_hops: Vec<( - oxide_vpc::api::NextHopV6, - oxide_vpc::api::Replication, - )>, + next_hops: Vec<McastForwardingNextHop>, ) -> Result<()> { let hdl = OpteHdl::open()?; hdl.set_mcast_fwd(&SetMcastForwardingReq { @@ -806,6 +816,7 @@ pub struct Topology { /// devices as underlay links. These simnet devices are connected to each /// other. /// +/// ```text /// zone a /// #============# /// | *--------* | *-------* @@ -819,6 +830,7 @@ pub struct Topology { /// | *--------* | *-------* /// #============# /// zone b +/// ``` /// /// The following system of overlay/underlay routes is set up /// @@ -901,16 +913,32 @@ pub fn two_node_topology() -> Result<Topology> { let zfs = Arc::new(Zfs::new("opte2node")?); // Create a pair of zones to simulate our VM instances.
- println!("start zone {zone_a_name}"); - let a = OpteZone::new(zone_a_name, &zfs, &[&opte0.name], brand)?; - println!("start zone {zone_b_name}"); - let b = OpteZone::new(zone_b_name, &zfs, &[&opte1.name], brand)?; - - println!("setup zone {zone_a_name}"); - a.setup(&opte0.name, opte0.ip())?; - - println!("setup zone {zone_b_name}"); - b.setup(&opte1.name, opte1.ip())?; + // Boot zones in parallel to reduce setup time. + // + // Thread-safety: each zone has a unique name and dataset (zfs creates + // unique child datasets per zone). zoneadm operations on different + // zones are independent, and each zone uses its own OPTE port. + println!("start zones {zone_a_name}, {zone_b_name}"); + let (a, b) = std::thread::scope(|s| { + let handle_a = + s.spawn(|| OpteZone::new(zone_a_name, &zfs, &[&opte0.name], brand)); + let handle_b = + s.spawn(|| OpteZone::new(zone_b_name, &zfs, &[&opte1.name], brand)); + (handle_a.join().unwrap(), handle_b.join().unwrap()) + }); + let a = a?; + let b = b?; + + // Setup zones in parallel + println!("setup zones {zone_a_name}, {zone_b_name}"); + let (name0, ip0) = (opte0.name.clone(), opte0.ip()); + let (name1, ip1) = (opte1.name.clone(), opte1.ip()); + std::thread::scope(|s| { + let handle_a = s.spawn(|| a.setup(&name0, ip0)); + let handle_b = s.spawn(|| b.setup(&name1, ip1)); + handle_a.join().unwrap()?; + handle_b.join().unwrap() + })?; Ok(Topology { nodes: vec![ @@ -977,24 +1005,30 @@ pub fn two_node_topology_dualstack() -> Result<Topology> { let zfs = Arc::new(Zfs::new("opte2node")?); - println!("start zone {zone_a_name}"); - let a = OpteZone::new(zone_a_name, &zfs, &[&opte0.name], brand)?; - println!("start zone {zone_b_name}"); - let b = OpteZone::new(zone_b_name, &zfs, &[&opte1.name], brand)?; - - println!("setup zone {zone_a_name}"); - a.setup_dualstack( - &opte0.name, - opte0.ip(), - opte0.ipv6().expect("dualstack port must have IPv6"), - )?; - - println!("setup zone {zone_b_name}"); - b.setup_dualstack( - &opte1.name, - opte1.ip(), -
opte1.ipv6().expect("dualstack port must have IPv6"), - )?; + // Boot zones in parallel to reduce setup time. + println!("start zones {zone_a_name}, {zone_b_name}"); + let (a, b) = std::thread::scope(|s| { + let handle_a = + s.spawn(|| OpteZone::new(zone_a_name, &zfs, &[&opte0.name], brand)); + let handle_b = + s.spawn(|| OpteZone::new(zone_b_name, &zfs, &[&opte1.name], brand)); + (handle_a.join().unwrap(), handle_b.join().unwrap()) + }); + let a = a?; + let b = b?; + + // Setup zones in parallel + println!("setup zones {zone_a_name}, {zone_b_name}"); + let (name0, ip0, ip0v6) = + (opte0.name.clone(), opte0.ip(), opte0.ipv6().unwrap()); + let (name1, ip1, ip1v6) = + (opte1.name.clone(), opte1.ip(), opte1.ipv6().unwrap()); + std::thread::scope(|s| { + let handle_a = s.spawn(|| a.setup_dualstack(&name0, ip0, ip0v6)); + let handle_b = s.spawn(|| b.setup_dualstack(&name1, ip1, ip1v6)); + handle_a.join().unwrap()?; + handle_b.join().unwrap() + })?; Ok(Topology { nodes: vec![ @@ -1066,21 +1100,159 @@ pub fn three_node_topology() -> Result { let zfs = Arc::new(Zfs::new("opte3node")?); - println!("start zone {zone_a_name}"); - let a = OpteZone::new(zone_a_name, &zfs, &[&opte0.name], brand)?; - println!("start zone {zone_b_name}"); - let b = OpteZone::new(zone_b_name, &zfs, &[&opte1.name], brand)?; - println!("start zone {zone_c_name}"); - let c = OpteZone::new(zone_c_name, &zfs, &[&opte2.name], brand)?; + // Boot zones in parallel to reduce setup time. 
+ println!("start zones {zone_a_name}, {zone_b_name}, {zone_c_name}"); + let (a, b, c) = std::thread::scope(|s| { + let handle_a = + s.spawn(|| OpteZone::new(zone_a_name, &zfs, &[&opte0.name], brand)); + let handle_b = + s.spawn(|| OpteZone::new(zone_b_name, &zfs, &[&opte1.name], brand)); + let handle_c = + s.spawn(|| OpteZone::new(zone_c_name, &zfs, &[&opte2.name], brand)); + ( + handle_a.join().unwrap(), + handle_b.join().unwrap(), + handle_c.join().unwrap(), + ) + }); + let a = a?; + let b = b?; + let c = c?; + + // Setup zones in parallel + println!("setup zones {zone_a_name}, {zone_b_name}, {zone_c_name}"); + let (name0, ip0) = (opte0.name.clone(), opte0.ip()); + let (name1, ip1) = (opte1.name.clone(), opte1.ip()); + let (name2, ip2) = (opte2.name.clone(), opte2.ip()); + std::thread::scope(|s| { + let handle_a = s.spawn(|| a.setup(&name0, ip0)); + let handle_b = s.spawn(|| b.setup(&name1, ip1)); + let handle_c = s.spawn(|| c.setup(&name2, ip2)); + handle_a.join().unwrap()?; + handle_b.join().unwrap()?; + handle_c.join().unwrap() + })?; + + Ok(Topology { + nodes: vec![ + TestNode { zone: a, port: opte0 }, + TestNode { zone: b, port: opte1 }, + TestNode { zone: c, port: opte2 }, + ], + null_ports: vec![], + v6_routes: vec![r0, r1, r2], + xde, + lls: vec![ll0, ll1], + vnics: vec![vn0, vn1], + simnet: Some(sim), + zfs, + }) +} + +/// Three-node topology with dual-stack (IPv4 + IPv6) guest addresses. 
+pub fn three_node_topology_dualstack() -> Result { + let brand = "omicron1"; + let zone_a_name = "a"; + let zone_b_name = "b"; + let zone_c_name = "c"; - println!("setup zone {zone_a_name}"); - a.setup(&opte0.name, opte0.ip())?; + let sim = SimnetLink::new("xde_test_sim0", "xde_test_sim1")?; + let vn0 = Vnic::new("xde_test_vnic0", &sim.end_a)?; + let vn1 = Vnic::new("xde_test_vnic1", &sim.end_b)?; + let ll0 = LinkLocal::new(&vn0.name, "ll")?; + let ll1 = LinkLocal::new(&vn1.name, "ll")?; - println!("setup zone {zone_b_name}"); - b.setup(&opte1.name, opte1.ip())?; + Xde::set_xde_underlay(&vn0.name, &vn1.name)?; + let xde = Xde {}; - println!("setup zone {zone_c_name}"); - c.setup(&opte2.name, opte2.ip())?; + // Set up V2P mappings for three nodes + Xde::set_v2p("10.0.0.1", "a8:40:25:ff:00:01", "fd44::1")?; + Xde::set_v2p("10.0.0.2", "a8:40:25:ff:00:02", "fd77::1")?; + Xde::set_v2p("10.0.0.3", "a8:40:25:ff:00:03", "fd88::1")?; + + // Create three dual-stack OPTE ports + let opte0 = OptePort::new_dualstack( + "opte0", + "10.0.0.1", + "fd00::1", + "a8:40:25:ff:00:01", + "fd44::1", + )?; + opte0.add_router_entry("10.0.0.2")?; + opte0.add_router_entry("10.0.0.3")?; + opte0.fw_allow_all()?; + + let opte1 = OptePort::new_dualstack( + "opte1", + "10.0.0.2", + "fd00::2", + "a8:40:25:ff:00:02", + "fd77::1", + )?; + opte1.add_router_entry("10.0.0.1")?; + opte1.add_router_entry("10.0.0.3")?; + opte1.fw_allow_all()?; + + let opte2 = OptePort::new_dualstack( + "opte2", + "10.0.0.3", + "fd00::3", + "a8:40:25:ff:00:03", + "fd88::1", + )?; + opte2.add_router_entry("10.0.0.1")?; + opte2.add_router_entry("10.0.0.2")?; + opte2.fw_allow_all()?; + + println!("adding underlay route 0"); + let r0 = + RouteV6::new(opte0.underlay_ip(), 64, ll0.ip, Some(vn1.name.clone()))?; + + println!("adding underlay route 1"); + let r1 = + RouteV6::new(opte1.underlay_ip(), 64, ll1.ip, Some(vn0.name.clone()))?; + + println!("adding underlay route 2"); + let r2 = + RouteV6::new(opte2.underlay_ip(), 64, 
ll1.ip, Some(vn0.name.clone()))?; + + let zfs = Arc::new(Zfs::new("opte3node")?); + + // Boot zones in parallel to reduce setup time. + println!("start zones {zone_a_name}, {zone_b_name}, {zone_c_name}"); + let (a, b, c) = std::thread::scope(|s| { + let handle_a = + s.spawn(|| OpteZone::new(zone_a_name, &zfs, &[&opte0.name], brand)); + let handle_b = + s.spawn(|| OpteZone::new(zone_b_name, &zfs, &[&opte1.name], brand)); + let handle_c = + s.spawn(|| OpteZone::new(zone_c_name, &zfs, &[&opte2.name], brand)); + ( + handle_a.join().unwrap(), + handle_b.join().unwrap(), + handle_c.join().unwrap(), + ) + }); + let a = a?; + let b = b?; + let c = c?; + + // Setup zones in parallel + println!("setup zones {zone_a_name}, {zone_b_name}, {zone_c_name}"); + let (name0, ip0, ip0v6) = + (opte0.name.clone(), opte0.ip(), opte0.ipv6().unwrap()); + let (name1, ip1, ip1v6) = + (opte1.name.clone(), opte1.ip(), opte1.ipv6().unwrap()); + let (name2, ip2, ip2v6) = + (opte2.name.clone(), opte2.ip(), opte2.ipv6().unwrap()); + std::thread::scope(|s| { + let handle_a = s.spawn(|| a.setup_dualstack(&name0, ip0, ip0v6)); + let handle_b = s.spawn(|| b.setup_dualstack(&name1, ip1, ip1v6)); + let handle_c = s.spawn(|| c.setup_dualstack(&name2, ip2, ip2v6)); + handle_a.join().unwrap()?; + handle_b.join().unwrap()?; + handle_c.join().unwrap() + })?; Ok(Topology { nodes: vec![ diff --git a/xde-tests/tests/multicast_multi_nexthop.rs b/xde-tests/tests/multicast_multi_nexthop.rs index 79f86514..4c1f9ffd 100644 --- a/xde-tests/tests/multicast_multi_nexthop.rs +++ b/xde-tests/tests/multicast_multi_nexthop.rs @@ -16,9 +16,11 @@ use opte_test_utils::geneve_verify; use oxide_vpc::api::DEFAULT_MULTICAST_VNI; use oxide_vpc::api::IpCidr; use oxide_vpc::api::Ipv4Addr; +use oxide_vpc::api::McastForwardingNextHop; use oxide_vpc::api::MulticastUnderlay; use oxide_vpc::api::NextHopV6; use oxide_vpc::api::Replication; +use oxide_vpc::api::SourceFilter; use oxide_vpc::api::Vni; use xde_tests::GENEVE_UNDERLAY_FILTER; 
use xde_tests::IPV4_MULTICAST_CIDR; @@ -56,8 +58,16 @@ fn test_multicast_multi_nexthop_fanout() -> Result<()> { let nexthop2: oxide_vpc::api::Ipv6Addr = "fd77::2".parse().unwrap(); mcast.set_forwarding(vec![ - (NextHopV6::new(nexthop1, vni), Replication::External), - (NextHopV6::new(nexthop2, vni), Replication::Underlay), + McastForwardingNextHop { + next_hop: NextHopV6::new(nexthop1, vni), + replication: Replication::External, + source_filter: SourceFilter::default(), + }, + McastForwardingNextHop { + next_hop: NextHopV6::new(nexthop2, vni), + replication: Replication::Underlay, + source_filter: SourceFilter::default(), + }, ])?; // Allow IPv4 multicast traffic (224.0.0.0/4) via Multicast target @@ -88,10 +98,10 @@ fn test_multicast_multi_nexthop_fanout() -> Result<()> { // Verify External replication next hop is present assert!( - entry.next_hops.iter().any(|(nexthop, rep)| { - *rep == Replication::External - && nexthop.addr == nexthop1 - && nexthop.vni == vni + entry.next_hops.iter().any(|hop| { + hop.replication == Replication::External + && hop.next_hop.addr == nexthop1 + && hop.next_hop.vni == vni }), "expected External replication to {nexthop1:?} in forwarding table; got: {:?}", entry.next_hops @@ -99,10 +109,10 @@ fn test_multicast_multi_nexthop_fanout() -> Result<()> { // Verify Underlay replication next hop is present assert!( - entry.next_hops.iter().any(|(nexthop, rep)| { - *rep == Replication::Underlay - && nexthop.addr == nexthop2 - && nexthop.vni == vni + entry.next_hops.iter().any(|hop| { + hop.replication == Replication::Underlay + && hop.next_hop.addr == nexthop2 + && hop.next_hop.vni == vni }), "expected Underlay replication to {nexthop2:?} in forwarding table; got: {:?}", entry.next_hops diff --git a/xde-tests/tests/multicast_multi_sub.rs b/xde-tests/tests/multicast_multi_sub.rs index 7aed7424..e6a6c7b7 100644 --- a/xde-tests/tests/multicast_multi_sub.rs +++ b/xde-tests/tests/multicast_multi_sub.rs @@ -30,9 +30,11 @@ use 
oxide_vpc::api::DEFAULT_MULTICAST_VNI; use oxide_vpc::api::IpCidr; use oxide_vpc::api::Ipv4Addr; use oxide_vpc::api::Ipv6Addr; +use oxide_vpc::api::McastForwardingNextHop; use oxide_vpc::api::MulticastUnderlay; use oxide_vpc::api::NextHopV6; use oxide_vpc::api::Replication; +use oxide_vpc::api::SourceFilter; use oxide_vpc::api::Vni; use xde_tests::GENEVE_UNDERLAY_FILTER; use xde_tests::IPV4_MULTICAST_CIDR; @@ -81,10 +83,11 @@ fn test_multicast_tx_forwarding_sender_only_subscribed() -> Result<()> { let fake_switch_addr = topol.nodes[1].port.underlay_ip().into(); // Set up Tx forwarding with `Replication::External` mode. - mcast.set_forwarding(vec![( - NextHopV6::new(fake_switch_addr, vni), - Replication::External, - )])?; + mcast.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch_addr, vni), + replication: Replication::External, + source_filter: SourceFilter::default(), + }])?; // Allow IPv4 multicast traffic via Multicast target let mcast_cidr = IpCidr::Ip4(IPV4_MULTICAST_CIDR.parse().unwrap()); @@ -111,14 +114,14 @@ fn test_multicast_tx_forwarding_sender_only_subscribed() -> Result<()> { let p1 = topol.nodes[1].port.name().to_string(); let p2 = topol.nodes[2].port.name().to_string(); assert!( - s_entry.ports.contains(&p0), + s_entry.has_port(&p0), "expected {p0} to be subscribed; got {:?}", - s_entry.ports + s_entry.subscribers ); assert!( - !s_entry.ports.contains(&p1) && !s_entry.ports.contains(&p2), + !s_entry.has_port(&p1) && !s_entry.has_port(&p2), "expected {p1} and {p2} not to be subscribed; got {:?}", - s_entry.ports + s_entry.subscribers ); // Start snoops on nodes B and C to verify no delivery (not subscribed) @@ -224,11 +227,9 @@ fn test_multicast_tx_same_sled_only() -> Result<()> { let p1 = topol.nodes[1].port.name().to_string(); let p2 = topol.nodes[2].port.name().to_string(); assert!( - s_entry.ports.contains(&p0) - && s_entry.ports.contains(&p1) - && s_entry.ports.contains(&p2), + s_entry.has_port(&p0) && 
s_entry.has_port(&p1) && s_entry.has_port(&p2), "expected {p0}, {p1}, {p2} to be subscribed; got {:?}", - s_entry.ports + s_entry.subscribers ); // Verify no forwarding entries exist @@ -314,10 +315,11 @@ fn test_multicast_underlay_replication_no_local_subscribers() -> Result<()> { // Set up Tx forwarding with `Replication::Underlay` mode. // Tx behavior: forward to underlay with multicast encapsulation. // Rx behavior: same-sled delivery to subscribers (none in this test). - mcast.set_forwarding(vec![( - NextHopV6::new(fake_switch_addr, vni), - Replication::Underlay, - )])?; + mcast.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch_addr, vni), + replication: Replication::Underlay, + source_filter: SourceFilter::default(), + }])?; // Allow IPv4 multicast traffic via Multicast target. // @@ -432,10 +434,11 @@ fn test_multicast_external_replication_no_local_subscribers() -> Result<()> { // Tx behavior: forward to underlay with `Replication::External` flag for // boundary switch replication. // Rx behavior: same-sled delivery to subscribers (none in this test). - mcast.set_forwarding(vec![( - NextHopV6::new(fake_switch_addr, vni), - Replication::External, - )])?; + mcast.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch_addr, vni), + replication: Replication::External, + source_filter: SourceFilter::default(), + }])?; // Allow IPv4 multicast traffic via Multicast target // @@ -535,10 +538,11 @@ fn test_multicast_both_replication() -> Result<()> { // (to front panel) & `Replication::Underlay` (sled-to-sled multicast). // Rx behavior: same-sled delivery occurs independently, driven purely by // port subscriptions (not the `Replication` mode). 
- mcast.set_forwarding(vec![( - NextHopV6::new(fake_switch_addr, vni), - Replication::Both, - )])?; + mcast.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch_addr, vni), + replication: Replication::Both, + source_filter: SourceFilter::default(), + }])?; // Allow IPv4 multicast traffic via Multicast target and subscribe to the group let mcast_cidr = IpCidr::Ip4(IPV4_MULTICAST_CIDR.parse().unwrap()); @@ -561,11 +565,9 @@ fn test_multicast_both_replication() -> Result<()> { let p1 = topol.nodes[1].port.name().to_string(); let p2 = topol.nodes[2].port.name().to_string(); assert!( - s_entry.ports.contains(&p0) - && s_entry.ports.contains(&p1) - && s_entry.ports.contains(&p2), + s_entry.has_port(&p0) && s_entry.has_port(&p1) && s_entry.has_port(&p2), "expected {p0}, {p1}, {p2} to be subscribed; got {:?}", - s_entry.ports + s_entry.subscribers ); // Start snoops on nodes B and C (same-sled delivery) and underlay @@ -653,11 +655,9 @@ fn test_multicast_sender_self_exclusion() -> Result<()> { let p1 = topol.nodes[1].port.name().to_string(); let p2 = topol.nodes[2].port.name().to_string(); assert!( - s_entry.ports.contains(&p0) - && s_entry.ports.contains(&p1) - && s_entry.ports.contains(&p2), + s_entry.has_port(&p0) && s_entry.has_port(&p1) && s_entry.has_port(&p2), "expected all 3 ports subscribed (including sender A); got {:?}", - s_entry.ports + s_entry.subscribers ); // Start snoops on ALL nodes (A, B, C) @@ -718,10 +718,11 @@ fn test_partial_unsubscribe() -> Result<()> { // underlay egress; the actual packet destination is the multicast address. 
let fake_switch_addr = topol.nodes[1].port.underlay_ip().into(); - mcast.set_forwarding(vec![( - NextHopV6::new(fake_switch_addr, vni), - Replication::External, - )])?; + mcast.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch_addr, vni), + replication: Replication::External, + source_filter: SourceFilter::default(), + }])?; let mcast_cidr = IpCidr::Ip4(IPV4_MULTICAST_CIDR.parse().unwrap()); for node in &topol.nodes { @@ -743,11 +744,9 @@ fn test_partial_unsubscribe() -> Result<()> { .find(|e| e.underlay == mcast_underlay) .expect("missing multicast subscription entry"); assert!( - s_entry.ports.contains(&p0) - && s_entry.ports.contains(&p1) - && s_entry.ports.contains(&p2), + s_entry.has_port(&p0) && s_entry.has_port(&p1) && s_entry.has_port(&p2), "expected all 3 ports subscribed initially; got {:?}", - s_entry.ports + s_entry.subscribers ); // Send packet and verify B and C receive (A is sender, won't receive its own) @@ -786,14 +785,14 @@ fn test_partial_unsubscribe() -> Result<()> { .find(|e| e.underlay == mcast_underlay) .expect("subscription entry should still exist"); assert!( - s_entry2.ports.contains(&p0) && s_entry2.ports.contains(&p2), + s_entry2.has_port(&p0) && s_entry2.has_port(&p2), "expected p0 and p2 to remain subscribed; got {:?}", - s_entry2.ports + s_entry2.subscribers ); assert!( - !s_entry2.ports.contains(&p1), + !s_entry2.has_port(&p1), "expected p1 to be unsubscribed; got {:?}", - s_entry2.ports + s_entry2.subscribers ); // Verify forwarding table unchanged (forwarding is independent of local subs) @@ -804,10 +803,10 @@ fn test_partial_unsubscribe() -> Result<()> { .find(|e| e.underlay == mcast_underlay) .expect("forwarding entry should still exist"); assert!( - fwd_entry.next_hops.iter().any(|(nexthop, rep)| { - *rep == Replication::External - && nexthop.addr == fake_switch_addr - && nexthop.vni == vni + fwd_entry.next_hops.iter().any(|hop| { + hop.replication == Replication::External + && hop.next_hop.addr 
== fake_switch_addr + && hop.next_hop.vni == vni }), "forwarding table should be unchanged" ); diff --git a/xde-tests/tests/multicast_rx.rs b/xde-tests/tests/multicast_rx.rs index 3b6974b5..2415ab15 100644 --- a/xde-tests/tests/multicast_rx.rs +++ b/xde-tests/tests/multicast_rx.rs @@ -21,9 +21,11 @@ use oxide_vpc::api::DEFAULT_MULTICAST_VNI; use oxide_vpc::api::IpCidr; use oxide_vpc::api::Ipv4Addr; use oxide_vpc::api::Ipv6Addr; +use oxide_vpc::api::McastForwardingNextHop; use oxide_vpc::api::MulticastUnderlay; use oxide_vpc::api::NextHopV6; use oxide_vpc::api::Replication; +use oxide_vpc::api::SourceFilter; use oxide_vpc::api::Vni; use xde_tests::GENEVE_UNDERLAY_FILTER; use xde_tests::IPV4_MULTICAST_CIDR; @@ -72,10 +74,11 @@ fn test_xde_multicast_rx_dual_family() -> Result<()> { // (when packet loops back via u1→u2 from the underlay). This double-delivery // is a test artifact. In production multi-sled, only Rx delivery occurs when // receiving from other sleds. - mcast.set_forwarding(vec![( - NextHopV6::new(fake_switch_addr, vni), - Replication::Underlay, - )])?; + mcast.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch_addr, vni), + replication: Replication::Underlay, + source_filter: SourceFilter::default(), + }])?; // Allow IPv4 multicast traffic (224.0.0.0/4) via Multicast target. 
let mcast_cidr = IpCidr::Ip4(IPV4_MULTICAST_CIDR.parse().unwrap()); @@ -105,9 +108,9 @@ fn test_xde_multicast_rx_dual_family() -> Result<()> { let p0 = topol.nodes[0].port.name().to_string(); let p1 = topol.nodes[1].port.name().to_string(); assert!( - s_entry.ports.contains(&p0) && s_entry.ports.contains(&p1), + s_entry.has_port(&p0) && s_entry.has_port(&p1), "expected both {p0} and {p1} to be subscribed; got {:?}", - s_entry.ports + s_entry.subscribers ); // Assert forwarding table contains expected next hop + replication @@ -118,10 +121,10 @@ fn test_xde_multicast_rx_dual_family() -> Result<()> { .find(|e| e.underlay == mcast_underlay) .expect("missing multicast forwarding entry for underlay group"); assert!( - entry.next_hops.iter().any(|(nexthop, rep)| { - *rep == Replication::Underlay - && nexthop.addr == fake_switch_addr - && nexthop.vni == vni + entry.next_hops.iter().any(|hop| { + hop.replication == Replication::Underlay + && hop.next_hop.addr == fake_switch_addr + && hop.next_hop.vni == vni }), "expected Underlay replication to {fake_switch_addr:?} in forwarding table; got: {:?}", entry.next_hops @@ -181,9 +184,9 @@ fn test_xde_multicast_rx_dual_family() -> Result<()> { .find(|e| e.underlay == mcast_underlay) .expect("missing multicast subscription entry after unsubscribe"); assert!( - !s_entry2.ports.contains(&p1), + !s_entry2.has_port(&p1), "expected {p1} to be unsubscribed; got {:?}", - s_entry2.ports + s_entry2.subscribers ); let mut snoop2 = SnoopGuard::start(&dev_name_b, &filter)?; @@ -207,10 +210,11 @@ fn test_xde_multicast_rx_dual_family() -> Result<()> { MulticastGroup::new(mcast_group_v6.into(), mcast_underlay_v6)?; // Reuse same forwarding config - mcast_v6.set_forwarding(vec![( - NextHopV6::new(fake_switch_addr, vni), - Replication::Underlay, - )])?; + mcast_v6.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch_addr, vni), + replication: Replication::Underlay, + source_filter: SourceFilter::default(), + }])?; // 
Allow IPv6 multicast traffic (ff04::/16 admin-local) via Multicast target let mcast_cidr_v6 = @@ -338,10 +342,11 @@ fn test_multicast_config_no_spurious_traffic() -> Result<()> { let fake_switch_addr = topol.nodes[1].port.underlay_ip().into(); // Set up forwarding with Underlay replication - mcast.set_forwarding(vec![( - NextHopV6::new(fake_switch_addr, vni), - Replication::Underlay, - )])?; + mcast.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch_addr, vni), + replication: Replication::Underlay, + source_filter: SourceFilter::default(), + }])?; let mcast_cidr = IpCidr::Ip4(IPV4_MULTICAST_CIDR.parse().unwrap()); for node in &topol.nodes { diff --git a/xde-tests/tests/multicast_source_filter.rs b/xde-tests/tests/multicast_source_filter.rs new file mode 100644 index 00000000..7e624093 --- /dev/null +++ b/xde-tests/tests/multicast_source_filter.rs @@ -0,0 +1,1195 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +// Copyright 2026 Oxide Computer Company + +//! Source filtering tests for multicast subscriptions. +//! +//! These validate source filtering with semantics based on IGMPv3/MLDv2: +//! - INCLUDE(sources) only accepts packets from listed sources. +//! - INCLUDE() (empty) blocks all sources. +//! - EXCLUDE(sources) blocks listed sources but accepts others. +//! - EXCLUDE() (empty) accepts all sources (*, G) -> default ASM behavior. +//! - Filter changes via resubscribe take effect immediately. +//! +//! See RFC 3376 (IGMPv3) and RFC 3810 (MLDv2) for the original protocol +//! definitions that inspired this design. 
+ +use anyhow::Result; +use oxide_vpc::api::DEFAULT_MULTICAST_VNI; +use oxide_vpc::api::FilterMode; +use oxide_vpc::api::IpAddr; +use oxide_vpc::api::IpCidr; +use oxide_vpc::api::Ipv4Addr; +use oxide_vpc::api::Ipv6Addr; +use oxide_vpc::api::McastForwardingNextHop; +use oxide_vpc::api::MulticastUnderlay; +use oxide_vpc::api::NextHopV6; +use oxide_vpc::api::Replication; +use oxide_vpc::api::SourceFilter; +use oxide_vpc::api::Vni; +use xde_tests::GENEVE_UNDERLAY_FILTER; +use xde_tests::IPV4_MULTICAST_CIDR; +use xde_tests::IPV6_ADMIN_LOCAL_MULTICAST_CIDR; +use xde_tests::MCAST_TEST_PORT; +use xde_tests::MulticastGroup; +use xde_tests::SnoopGuard; +use xde_tests::UNDERLAY_TEST_DEVICE; + +/// Create an INCLUDE filter with specified sources. +fn include_filter(sources: impl IntoIterator) -> SourceFilter { + SourceFilter { + mode: FilterMode::Include, + sources: sources.into_iter().collect(), + } +} + +/// Create an EXCLUDE filter with specified sources. +fn exclude_filter(sources: impl IntoIterator) -> SourceFilter { + SourceFilter { + mode: FilterMode::Exclude, + sources: sources.into_iter().collect(), + } +} + +#[test] +fn test_include_filter_allows_listed_source() -> Result<()> { + // When subscribed with INCLUDE(sender_ip, other_ip), packets from sender + // should be delivered. 
+ + let topol = xde_tests::two_node_topology_dualstack()?; + let vni = Vni::new(DEFAULT_MULTICAST_VNI)?; + let fake_switch_addr = topol.nodes[1].port.underlay_ip().into(); + let dev_name_b = topol.nodes[1].port.name().to_string(); + + // Add router entries for both families + let mcast_cidr_v4 = IpCidr::Ip4(IPV4_MULTICAST_CIDR.parse().unwrap()); + let mcast_cidr_v6 = + IpCidr::Ip6(IPV6_ADMIN_LOCAL_MULTICAST_CIDR.parse().unwrap()); + topol.nodes[0].port.add_multicast_router_entry(mcast_cidr_v4)?; + topol.nodes[0].port.add_multicast_router_entry(mcast_cidr_v6)?; + topol.nodes[1].port.add_multicast_router_entry(mcast_cidr_v4)?; + topol.nodes[1].port.add_multicast_router_entry(mcast_cidr_v6)?; + + // IPv4 + { + let mcast_group = Ipv4Addr::from([224, 0, 0, 252]); + let mcast_underlay = + MulticastUnderlay::new("ff04::e000:fc".parse().unwrap()).unwrap(); + let mcast = MulticastGroup::new(mcast_group.into(), mcast_underlay)?; + mcast.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch_addr, vni), + replication: Replication::Underlay, + source_filter: SourceFilter::default(), + }])?; + + let sender_ip: IpAddr = topol.nodes[0].port.ip().into(); + let other_allowed: IpAddr = Ipv4Addr::from([10, 88, 88, 88]).into(); + + topol.nodes[1].port.subscribe_multicast_filtered( + mcast_group.into(), + include_filter([sender_ip, other_allowed]), + )?; + + let filter = + format!("udp and ip dst {mcast_group} and port {MCAST_TEST_PORT}"); + let mut snoop = SnoopGuard::start(&dev_name_b, &filter)?; + + topol.nodes[0].zone.send_udp_v4( + topol.nodes[0].port.ip(), + mcast_group, + MCAST_TEST_PORT, + "allowed source test", + )?; + + let output = snoop.assert_packet("IPv4: from allowed source"); + let stdout = String::from_utf8_lossy(&output.stdout); + assert!( + stdout.contains("224.0.0.252"), + "expected multicast dest: {stdout}" + ); + } + + // IPv6 + { + let mcast_group: Ipv6Addr = "ff04::e000:200".parse().unwrap(); + let mcast_underlay = + 
MulticastUnderlay::new("ff04::e000:200".parse().unwrap()).unwrap(); + let mcast = MulticastGroup::new(mcast_group.into(), mcast_underlay)?; + mcast.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch_addr, vni), + replication: Replication::Underlay, + source_filter: SourceFilter::default(), + }])?; + + let sender_ip: IpAddr = topol.nodes[0] + .port + .ipv6() + .expect("dualstack port must have IPv6") + .into(); + + topol.nodes[1].port.subscribe_multicast_filtered( + mcast_group.into(), + include_filter([sender_ip]), + )?; + + let filter = + format!("udp and ip6 dst {mcast_group} and port {MCAST_TEST_PORT}"); + let mut snoop = SnoopGuard::start(&dev_name_b, &filter)?; + + topol.nodes[0].zone.send_udp_v6( + topol.nodes[0].port.ipv6().unwrap(), + mcast_group, + MCAST_TEST_PORT, + "allowed v6 source", + )?; + + let output = snoop.assert_packet("IPv6: from allowed source"); + let stdout = String::from_utf8_lossy(&output.stdout); + assert!( + stdout.contains("ff04::e000:200"), + "expected multicast dest: {stdout}" + ); + } + + Ok(()) +} + +#[test] +fn test_include_filter_blocks_unlisted_source() -> Result<()> { + // When subscribed with INCLUDE(other_ip), packets from sender should + // be blocked because sender is not in the include list. 
+ + let topol = xde_tests::two_node_topology_dualstack()?; + let vni = Vni::new(DEFAULT_MULTICAST_VNI)?; + let fake_switch_addr = topol.nodes[1].port.underlay_ip().into(); + let dev_name_b = topol.nodes[1].port.name().to_string(); + + let mcast_cidr_v4 = IpCidr::Ip4(IPV4_MULTICAST_CIDR.parse().unwrap()); + let mcast_cidr_v6 = + IpCidr::Ip6(IPV6_ADMIN_LOCAL_MULTICAST_CIDR.parse().unwrap()); + topol.nodes[0].port.add_multicast_router_entry(mcast_cidr_v4)?; + topol.nodes[0].port.add_multicast_router_entry(mcast_cidr_v6)?; + topol.nodes[1].port.add_multicast_router_entry(mcast_cidr_v4)?; + topol.nodes[1].port.add_multicast_router_entry(mcast_cidr_v6)?; + + // IPv4 + { + let mcast_group = Ipv4Addr::from([224, 0, 0, 253]); + let mcast_underlay = + MulticastUnderlay::new("ff04::e000:fd".parse().unwrap()).unwrap(); + let mcast = MulticastGroup::new(mcast_group.into(), mcast_underlay)?; + mcast.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch_addr, vni), + replication: Replication::Underlay, + source_filter: SourceFilter::default(), + }])?; + + let other_ip: IpAddr = Ipv4Addr::from([10, 99, 99, 99]).into(); + topol.nodes[1].port.subscribe_multicast_filtered( + mcast_group.into(), + include_filter([other_ip]), + )?; + + let filter = + format!("udp and ip dst {mcast_group} and port {MCAST_TEST_PORT}"); + let mut snoop = SnoopGuard::start(&dev_name_b, &filter)?; + + topol.nodes[0].zone.send_udp_v4( + topol.nodes[0].port.ip(), + mcast_group, + MCAST_TEST_PORT, + "blocked source test", + )?; + + snoop + .assert_no_packet("IPv4: from unlisted source with INCLUDE filter"); + } + + // IPv6 + { + let mcast_group: Ipv6Addr = "ff04::e000:201".parse().unwrap(); + let mcast_underlay = + MulticastUnderlay::new("ff04::e000:201".parse().unwrap()).unwrap(); + let mcast = MulticastGroup::new(mcast_group.into(), mcast_underlay)?; + mcast.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch_addr, vni), + replication: 
Replication::Underlay, + source_filter: SourceFilter::default(), + }])?; + + let other_ip: IpAddr = + "fd00:9999::1".parse::().unwrap().into(); + topol.nodes[1].port.subscribe_multicast_filtered( + mcast_group.into(), + include_filter([other_ip]), + )?; + + let filter = + format!("udp and ip6 dst {mcast_group} and port {MCAST_TEST_PORT}"); + let mut snoop = SnoopGuard::start(&dev_name_b, &filter)?; + + topol.nodes[0].zone.send_udp_v6( + topol.nodes[0].port.ipv6().unwrap(), + mcast_group, + MCAST_TEST_PORT, + "blocked v6 source", + )?; + + snoop + .assert_no_packet("IPv6: from unlisted source with INCLUDE filter"); + } + + Ok(()) +} + +#[test] +fn test_include_empty_blocks_all() -> Result<()> { + // INCLUDE() means accept nothing, so all packets should be blocked. + + let topol = xde_tests::two_node_topology_dualstack()?; + let vni = Vni::new(DEFAULT_MULTICAST_VNI)?; + let fake_switch_addr = topol.nodes[1].port.underlay_ip().into(); + let dev_name_b = topol.nodes[1].port.name().to_string(); + + let mcast_cidr_v4 = IpCidr::Ip4(IPV4_MULTICAST_CIDR.parse().unwrap()); + let mcast_cidr_v6 = + IpCidr::Ip6(IPV6_ADMIN_LOCAL_MULTICAST_CIDR.parse().unwrap()); + topol.nodes[0].port.add_multicast_router_entry(mcast_cidr_v4)?; + topol.nodes[0].port.add_multicast_router_entry(mcast_cidr_v6)?; + topol.nodes[1].port.add_multicast_router_entry(mcast_cidr_v4)?; + topol.nodes[1].port.add_multicast_router_entry(mcast_cidr_v6)?; + + // IPv4 + { + let mcast_group = Ipv4Addr::from([224, 0, 0, 254]); + let mcast_underlay = + MulticastUnderlay::new("ff04::e000:fe".parse().unwrap()).unwrap(); + let mcast = MulticastGroup::new(mcast_group.into(), mcast_underlay)?; + mcast.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch_addr, vni), + replication: Replication::Underlay, + source_filter: SourceFilter::default(), + }])?; + + topol.nodes[1].port.subscribe_multicast_filtered( + mcast_group.into(), + include_filter(std::iter::empty::()), + )?; + + let filter = + 
format!("udp and ip dst {mcast_group} and port {MCAST_TEST_PORT}"); + let mut snoop = SnoopGuard::start(&dev_name_b, &filter)?; + + topol.nodes[0].zone.send_udp_v4( + topol.nodes[0].port.ip(), + mcast_group, + MCAST_TEST_PORT, + "should be blocked", + )?; + + snoop.assert_no_packet("IPv4: with INCLUDE() (empty) filter"); + } + + // IPv6 + { + let mcast_group: Ipv6Addr = "ff04::e000:203".parse().unwrap(); + let mcast_underlay = + MulticastUnderlay::new("ff04::e000:203".parse().unwrap()).unwrap(); + let mcast = MulticastGroup::new(mcast_group.into(), mcast_underlay)?; + mcast.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch_addr, vni), + replication: Replication::Underlay, + source_filter: SourceFilter::default(), + }])?; + + topol.nodes[1].port.subscribe_multicast_filtered( + mcast_group.into(), + include_filter(std::iter::empty::()), + )?; + + let filter = + format!("udp and ip6 dst {mcast_group} and port {MCAST_TEST_PORT}"); + let mut snoop = SnoopGuard::start(&dev_name_b, &filter)?; + + topol.nodes[0].zone.send_udp_v6( + topol.nodes[0].port.ipv6().unwrap(), + mcast_group, + MCAST_TEST_PORT, + "should be blocked v6", + )?; + + snoop.assert_no_packet("IPv6: with INCLUDE() (empty) filter"); + } + + Ok(()) +} + +#[test] +fn test_exclude_empty_allows_all() -> Result<()> { + // EXCLUDE() means accept any source, which is the default ASM behavior. 
+ + let topol = xde_tests::two_node_topology_dualstack()?; + let vni = Vni::new(DEFAULT_MULTICAST_VNI)?; + let fake_switch_addr = topol.nodes[1].port.underlay_ip().into(); + let dev_name_b = topol.nodes[1].port.name().to_string(); + + let mcast_cidr_v4 = IpCidr::Ip4(IPV4_MULTICAST_CIDR.parse().unwrap()); + let mcast_cidr_v6 = + IpCidr::Ip6(IPV6_ADMIN_LOCAL_MULTICAST_CIDR.parse().unwrap()); + topol.nodes[0].port.add_multicast_router_entry(mcast_cidr_v4)?; + topol.nodes[0].port.add_multicast_router_entry(mcast_cidr_v6)?; + topol.nodes[1].port.add_multicast_router_entry(mcast_cidr_v4)?; + topol.nodes[1].port.add_multicast_router_entry(mcast_cidr_v6)?; + + // IPv4 + { + let mcast_group = Ipv4Addr::from([224, 0, 1, 1]); + let mcast_underlay = + MulticastUnderlay::new("ff04::e000:101".parse().unwrap()).unwrap(); + let mcast = MulticastGroup::new(mcast_group.into(), mcast_underlay)?; + mcast.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch_addr, vni), + replication: Replication::Underlay, + source_filter: SourceFilter::default(), + }])?; + + topol.nodes[1].port.subscribe_multicast_filtered( + mcast_group.into(), + exclude_filter(std::iter::empty::()), + )?; + + let filter = + format!("udp and ip dst {mcast_group} and port {MCAST_TEST_PORT}"); + let mut snoop = SnoopGuard::start(&dev_name_b, &filter)?; + + topol.nodes[0].zone.send_udp_v4( + topol.nodes[0].port.ip(), + mcast_group, + MCAST_TEST_PORT, + "should be allowed", + )?; + + let output = + snoop.assert_packet("IPv4: with EXCLUDE() (any source) filter"); + let stdout = String::from_utf8_lossy(&output.stdout); + assert!( + stdout.contains("224.0.1.1"), + "expected multicast dest: {stdout}" + ); + } + + // IPv6 + { + let mcast_group: Ipv6Addr = "ff04::e000:204".parse().unwrap(); + let mcast_underlay = + MulticastUnderlay::new("ff04::e000:204".parse().unwrap()).unwrap(); + let mcast = MulticastGroup::new(mcast_group.into(), mcast_underlay)?; + 
mcast.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch_addr, vni), + replication: Replication::Underlay, + source_filter: SourceFilter::default(), + }])?; + + topol.nodes[1].port.subscribe_multicast_filtered( + mcast_group.into(), + exclude_filter(std::iter::empty::<IpAddr>()), + )?; + + let filter = + format!("udp and ip6 dst {mcast_group} and port {MCAST_TEST_PORT}"); + let mut snoop = SnoopGuard::start(&dev_name_b, &filter)?; + + topol.nodes[0].zone.send_udp_v6( + topol.nodes[0].port.ipv6().unwrap(), + mcast_group, + MCAST_TEST_PORT, + "should be allowed v6", + )?; + + let output = + snoop.assert_packet("IPv6: with EXCLUDE() (any source) filter"); + let stdout = String::from_utf8_lossy(&output.stdout); + assert!( + stdout.contains("ff04::e000:204"), + "expected multicast dest: {stdout}" + ); + } + + Ok(()) +} + +#[test] +fn test_exclude_filter_blocks_listed_source() -> Result<()> { + // EXCLUDE(sender_ip, other_ip) should block packets from sender. + + let topol = xde_tests::two_node_topology_dualstack()?; + let vni = Vni::new(DEFAULT_MULTICAST_VNI)?; + let fake_switch_addr = topol.nodes[1].port.underlay_ip().into(); + let dev_name_b = topol.nodes[1].port.name().to_string(); + + let mcast_cidr_v4 = IpCidr::Ip4(IPV4_MULTICAST_CIDR.parse().unwrap()); + let mcast_cidr_v6 = + IpCidr::Ip6(IPV6_ADMIN_LOCAL_MULTICAST_CIDR.parse().unwrap()); + topol.nodes[0].port.add_multicast_router_entry(mcast_cidr_v4)?; + topol.nodes[0].port.add_multicast_router_entry(mcast_cidr_v6)?; + topol.nodes[1].port.add_multicast_router_entry(mcast_cidr_v4)?; + topol.nodes[1].port.add_multicast_router_entry(mcast_cidr_v6)?; + + // IPv4 + { + let mcast_group = Ipv4Addr::from([224, 0, 1, 2]); + let mcast_underlay = + MulticastUnderlay::new("ff04::e000:102".parse().unwrap()).unwrap(); + let mcast = MulticastGroup::new(mcast_group.into(), mcast_underlay)?; + mcast.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch_addr, vni), + 
replication: Replication::Underlay, + source_filter: SourceFilter::default(), + }])?; + + let sender_ip: IpAddr = topol.nodes[0].port.ip().into(); + let other_blocked: IpAddr = Ipv4Addr::from([10, 77, 77, 77]).into(); + + topol.nodes[1].port.subscribe_multicast_filtered( + mcast_group.into(), + exclude_filter([sender_ip, other_blocked]), + )?; + + let filter = + format!("udp and ip dst {mcast_group} and port {MCAST_TEST_PORT}"); + let mut snoop = SnoopGuard::start(&dev_name_b, &filter)?; + + topol.nodes[0].zone.send_udp_v4( + topol.nodes[0].port.ip(), + mcast_group, + MCAST_TEST_PORT, + "excluded source test", + )?; + + snoop.assert_no_packet("IPv4: from excluded source"); + } + + // IPv6 + { + let mcast_group: Ipv6Addr = "ff04::e000:202".parse().unwrap(); + let mcast_underlay = + MulticastUnderlay::new("ff04::e000:202".parse().unwrap()).unwrap(); + let mcast = MulticastGroup::new(mcast_group.into(), mcast_underlay)?; + mcast.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch_addr, vni), + replication: Replication::Underlay, + source_filter: SourceFilter::default(), + }])?; + + let sender_ip: IpAddr = topol.nodes[0] + .port + .ipv6() + .expect("dualstack port must have IPv6") + .into(); + + topol.nodes[1].port.subscribe_multicast_filtered( + mcast_group.into(), + exclude_filter([sender_ip]), + )?; + + let filter = + format!("udp and ip6 dst {mcast_group} and port {MCAST_TEST_PORT}"); + let mut snoop = SnoopGuard::start(&dev_name_b, &filter)?; + + topol.nodes[0].zone.send_udp_v6( + topol.nodes[0].port.ipv6().unwrap(), + mcast_group, + MCAST_TEST_PORT, + "excluded v6 source", + )?; + + snoop.assert_no_packet("IPv6: from excluded source"); + } + + Ok(()) +} + +#[test] +fn test_exclude_filter_allows_unlisted_source() -> Result<()> { + // EXCLUDE(other_ip) should allow packets from sender (not in exclude list). 
+ + let topol = xde_tests::two_node_topology_dualstack()?; + let vni = Vni::new(DEFAULT_MULTICAST_VNI)?; + let fake_switch_addr = topol.nodes[1].port.underlay_ip().into(); + let dev_name_b = topol.nodes[1].port.name().to_string(); + + let mcast_cidr_v4 = IpCidr::Ip4(IPV4_MULTICAST_CIDR.parse().unwrap()); + let mcast_cidr_v6 = + IpCidr::Ip6(IPV6_ADMIN_LOCAL_MULTICAST_CIDR.parse().unwrap()); + topol.nodes[0].port.add_multicast_router_entry(mcast_cidr_v4)?; + topol.nodes[0].port.add_multicast_router_entry(mcast_cidr_v6)?; + topol.nodes[1].port.add_multicast_router_entry(mcast_cidr_v4)?; + topol.nodes[1].port.add_multicast_router_entry(mcast_cidr_v6)?; + + // IPv4 + { + let mcast_group = Ipv4Addr::from([224, 0, 1, 4]); + let mcast_underlay = + MulticastUnderlay::new("ff04::e000:104".parse().unwrap()).unwrap(); + let mcast = MulticastGroup::new(mcast_group.into(), mcast_underlay)?; + mcast.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch_addr, vni), + replication: Replication::Underlay, + source_filter: SourceFilter::default(), + }])?; + + let other_ip: IpAddr = Ipv4Addr::from([10, 99, 99, 99]).into(); + topol.nodes[1].port.subscribe_multicast_filtered( + mcast_group.into(), + exclude_filter([other_ip]), + )?; + + let filter = + format!("udp and ip dst {mcast_group} and port {MCAST_TEST_PORT}"); + let mut snoop = SnoopGuard::start(&dev_name_b, &filter)?; + + topol.nodes[0].zone.send_udp_v4( + topol.nodes[0].port.ip(), + mcast_group, + MCAST_TEST_PORT, + "allowed by exclude filter", + )?; + + let output = snoop + .assert_packet("IPv4: from unlisted source with EXCLUDE filter"); + let stdout = String::from_utf8_lossy(&output.stdout); + assert!( + stdout.contains("224.0.1.4"), + "expected multicast dest: {stdout}" + ); + } + + // IPv6 + { + let mcast_group: Ipv6Addr = "ff04::e000:207".parse().unwrap(); + let mcast_underlay = + MulticastUnderlay::new("ff04::e000:207".parse().unwrap()).unwrap(); + let mcast = 
MulticastGroup::new(mcast_group.into(), mcast_underlay)?; + mcast.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch_addr, vni), + replication: Replication::Underlay, + source_filter: SourceFilter::default(), + }])?; + + let other_ip: IpAddr = + "fd00:9999::1".parse::<Ipv6Addr>().unwrap().into(); + topol.nodes[1].port.subscribe_multicast_filtered( + mcast_group.into(), + exclude_filter([other_ip]), + )?; + + let filter = + format!("udp and ip6 dst {mcast_group} and port {MCAST_TEST_PORT}"); + let mut snoop = SnoopGuard::start(&dev_name_b, &filter)?; + + topol.nodes[0].zone.send_udp_v6( + topol.nodes[0].port.ipv6().unwrap(), + mcast_group, + MCAST_TEST_PORT, + "allowed by exclude filter v6", + )?; + + let output = snoop + .assert_packet("IPv6: from unlisted source with EXCLUDE filter"); + let stdout = String::from_utf8_lossy(&output.stdout); + assert!( + stdout.contains("ff04::e000:207"), + "expected multicast dest: {stdout}" + ); + } + + Ok(()) +} + +#[test] +fn test_filter_update_via_resubscribe() -> Result<()> { + // Resubscribing with a different filter should update the filter + // and take effect immediately. 
+ + let topol = xde_tests::two_node_topology_dualstack()?; + let vni = Vni::new(DEFAULT_MULTICAST_VNI)?; + let fake_switch_addr = topol.nodes[1].port.underlay_ip().into(); + let dev_name_b = topol.nodes[1].port.name().to_string(); + + let mcast_cidr_v4 = IpCidr::Ip4(IPV4_MULTICAST_CIDR.parse().unwrap()); + let mcast_cidr_v6 = + IpCidr::Ip6(IPV6_ADMIN_LOCAL_MULTICAST_CIDR.parse().unwrap()); + topol.nodes[0].port.add_multicast_router_entry(mcast_cidr_v4)?; + topol.nodes[0].port.add_multicast_router_entry(mcast_cidr_v6)?; + topol.nodes[1].port.add_multicast_router_entry(mcast_cidr_v4)?; + topol.nodes[1].port.add_multicast_router_entry(mcast_cidr_v6)?; + + // IPv4 + { + let mcast_group = Ipv4Addr::from([224, 0, 1, 3]); + let mcast_underlay = + MulticastUnderlay::new("ff04::e000:103".parse().unwrap()).unwrap(); + let mcast = MulticastGroup::new(mcast_group.into(), mcast_underlay)?; + mcast.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch_addr, vni), + replication: Replication::Underlay, + source_filter: SourceFilter::default(), + }])?; + + let sender_ip: IpAddr = topol.nodes[0].port.ip().into(); + let filter = + format!("udp and ip dst {mcast_group} and port {MCAST_TEST_PORT}"); + + // Case: INCLUDE(sender) allows delivery + topol.nodes[1].port.subscribe_multicast_filtered( + mcast_group.into(), + include_filter([sender_ip]), + )?; + + let mut snoop1 = SnoopGuard::start(&dev_name_b, &filter)?; + topol.nodes[0].zone.send_udp_v4( + topol.nodes[0].port.ip(), + mcast_group, + MCAST_TEST_PORT, + "case 1", + )?; + snoop1.assert_packet("IPv4: INCLUDE(sender) allows"); + + // Case: resubscribe with EXCLUDE(sender) blocks delivery + topol.nodes[1].port.subscribe_multicast_filtered( + mcast_group.into(), + exclude_filter([sender_ip]), + )?; + + let mut snoop2 = SnoopGuard::start(&dev_name_b, &filter)?; + topol.nodes[0].zone.send_udp_v4( + topol.nodes[0].port.ip(), + mcast_group, + MCAST_TEST_PORT, + "case 2", + )?; + 
snoop2.assert_no_packet("IPv4: EXCLUDE(sender) blocks"); + + // Case: resubscribe with EXCLUDE() allows delivery again + topol.nodes[1].port.subscribe_multicast_filtered( + mcast_group.into(), + exclude_filter(std::iter::empty::<IpAddr>()), + )?; + + let mut snoop3 = SnoopGuard::start(&dev_name_b, &filter)?; + topol.nodes[0].zone.send_udp_v4( + topol.nodes[0].port.ip(), + mcast_group, + MCAST_TEST_PORT, + "case 3", + )?; + snoop3.assert_packet("IPv4: EXCLUDE() allows"); + } + + // IPv6 + { + let mcast_group: Ipv6Addr = "ff04::e000:205".parse().unwrap(); + let mcast_underlay = + MulticastUnderlay::new("ff04::e000:205".parse().unwrap()).unwrap(); + let mcast = MulticastGroup::new(mcast_group.into(), mcast_underlay)?; + mcast.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch_addr, vni), + replication: Replication::Underlay, + source_filter: SourceFilter::default(), + }])?; + + let sender_ip: IpAddr = topol.nodes[0] + .port + .ipv6() + .expect("dualstack port must have IPv6") + .into(); + let filter = + format!("udp and ip6 dst {mcast_group} and port {MCAST_TEST_PORT}"); + + // Case: INCLUDE(sender) allows delivery + topol.nodes[1].port.subscribe_multicast_filtered( + mcast_group.into(), + include_filter([sender_ip]), + )?; + + let mut snoop1 = SnoopGuard::start(&dev_name_b, &filter)?; + topol.nodes[0].zone.send_udp_v6( + topol.nodes[0].port.ipv6().unwrap(), + mcast_group, + MCAST_TEST_PORT, + "case 1 v6", + )?; + snoop1.assert_packet("IPv6: INCLUDE(sender) allows"); + + // Case: resubscribe with EXCLUDE(sender) blocks delivery + topol.nodes[1].port.subscribe_multicast_filtered( + mcast_group.into(), + exclude_filter([sender_ip]), + )?; + + let mut snoop2 = SnoopGuard::start(&dev_name_b, &filter)?; + topol.nodes[0].zone.send_udp_v6( + topol.nodes[0].port.ipv6().unwrap(), + mcast_group, + MCAST_TEST_PORT, + "case 2 v6", + )?; + snoop2.assert_no_packet("IPv6: EXCLUDE(sender) blocks"); + + // Case: resubscribe with EXCLUDE() allows delivery again 
+ topol.nodes[1].port.subscribe_multicast_filtered( + mcast_group.into(), + exclude_filter(std::iter::empty::<IpAddr>()), + )?; + + let mut snoop3 = SnoopGuard::start(&dev_name_b, &filter)?; + topol.nodes[0].zone.send_udp_v6( + topol.nodes[0].port.ipv6().unwrap(), + mcast_group, + MCAST_TEST_PORT, + "case 3 v6", + )?; + snoop3.assert_packet("IPv6: EXCLUDE() allows"); + } + + Ok(()) +} + +#[test] +fn test_tx_same_sled_source_filtering() -> Result<()> { + // This tests source filtering on Tx same-sled delivery path. + // + // This exercises the mcast_tx_source_filtered code path by verifying + // per-member filtering where one subscriber receives and another is blocked. + // + // Setup: Three nodes (A sender, B and C receivers) on the same sled. + // - Node B subscribes with INCLUDE(sender_ip) -> should be received + // - Node C subscribes with INCLUDE(other_ip) -> should be blocked + // - No forwarding configured, so only Tx same-sled delivery is tested + + let topol = xde_tests::three_node_topology_dualstack()?; + + let mcast_cidr_v4 = IpCidr::Ip4(IPV4_MULTICAST_CIDR.parse().unwrap()); + let mcast_cidr_v6 = + IpCidr::Ip6(IPV6_ADMIN_LOCAL_MULTICAST_CIDR.parse().unwrap()); + for node in &topol.nodes { + node.port.add_multicast_router_entry(mcast_cidr_v4)?; + node.port.add_multicast_router_entry(mcast_cidr_v6)?; + } + + let dev_name_b = topol.nodes[1].port.name().to_string(); + let dev_name_c = topol.nodes[2].port.name().to_string(); + + // IPv4 + { + let mcast_group = Ipv4Addr::from([224, 0, 1, 5]); + let mcast_underlay = + MulticastUnderlay::new("ff04::e000:105".parse().unwrap()).unwrap(); + let _mcast = MulticastGroup::new(mcast_group.into(), mcast_underlay)?; + + let sender_ip: IpAddr = topol.nodes[0].port.ip().into(); + let other_ip: IpAddr = Ipv4Addr::from([10, 99, 99, 99]).into(); + + topol.nodes[0].port.subscribe_multicast(mcast_group.into())?; + topol.nodes[1].port.subscribe_multicast_filtered( + mcast_group.into(), + include_filter([sender_ip]), + )?; + 
topol.nodes[2].port.subscribe_multicast_filtered( + mcast_group.into(), + include_filter([other_ip]), + )?; + + let filter = + format!("udp and ip dst {mcast_group} and port {MCAST_TEST_PORT}"); + let mut snoop_b = SnoopGuard::start(&dev_name_b, &filter)?; + let mut snoop_c = SnoopGuard::start(&dev_name_c, &filter)?; + + topol.nodes[0].zone.send_udp_v4( + topol.nodes[0].port.ip(), + mcast_group, + MCAST_TEST_PORT, + "tx same-sled filter test", + )?; + + let output_b = + snoop_b.assert_packet("IPv4: node B with INCLUDE(sender)"); + let stdout_b = String::from_utf8_lossy(&output_b.stdout); + assert!( + stdout_b.contains("224.0.1.5"), + "expected multicast dest: {stdout_b}" + ); + + snoop_c + .assert_no_packet("IPv4: node C with INCLUDE(other) blocks sender"); + } + + // IPv6 + { + let mcast_group: Ipv6Addr = "ff04::e000:206".parse().unwrap(); + let mcast_underlay = + MulticastUnderlay::new("ff04::e000:206".parse().unwrap()).unwrap(); + let _mcast = MulticastGroup::new(mcast_group.into(), mcast_underlay)?; + + let sender_ip: IpAddr = topol.nodes[0] + .port + .ipv6() + .expect("dualstack port must have IPv6") + .into(); + let other_ip: IpAddr = + "fd00:9999::1".parse::<Ipv6Addr>().unwrap().into(); + + topol.nodes[0].port.subscribe_multicast(mcast_group.into())?; + topol.nodes[1].port.subscribe_multicast_filtered( + mcast_group.into(), + include_filter([sender_ip]), + )?; + topol.nodes[2].port.subscribe_multicast_filtered( + mcast_group.into(), + include_filter([other_ip]), + )?; + + let filter = + format!("udp and ip6 dst {mcast_group} and port {MCAST_TEST_PORT}"); + let mut snoop_b = SnoopGuard::start(&dev_name_b, &filter)?; + let mut snoop_c = SnoopGuard::start(&dev_name_c, &filter)?; + + topol.nodes[0].zone.send_udp_v6( + topol.nodes[0].port.ipv6().unwrap(), + mcast_group, + MCAST_TEST_PORT, + "tx same-sled v6 filter test", + )?; + + let output_b = + snoop_b.assert_packet("IPv6: node B with INCLUDE(sender)"); + let stdout_b = String::from_utf8_lossy(&output_b.stdout); + 
assert!( + stdout_b.contains("ff04::e000:206"), + "expected multicast dest: {stdout_b}" + ); + + snoop_c + .assert_no_packet("IPv6: node C with INCLUDE(other) blocks sender"); + } + + Ok(()) +} + +#[test] +fn test_forwarding_source_filter() -> Result<()> { + // Test forwarding-level source filtering. Packets are blocked from being + // forwarded to remote sleds when the aggregated source filter for that + // next hop rejects the source. + // + // This exercises mcast_tx_fwd_source_filtered. + + let topol = xde_tests::two_node_topology_dualstack()?; + let vni = Vni::new(DEFAULT_MULTICAST_VNI)?; + + let mcast_group = Ipv4Addr::from([224, 1, 2, 240]); + let mcast_underlay = MulticastUnderlay::new(Ipv6Addr::from([ + 0xff, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 224, 1, 2, 240, + ])) + .unwrap(); + + let mcast = MulticastGroup::new(mcast_group.into(), mcast_underlay)?; + + let fake_switch_addr = topol.nodes[1].port.underlay_ip().into(); + let sender_ip: IpAddr = topol.nodes[0].port.ip().into(); + + // Set up forwarding with INCLUDE filter that blocks the sender. + // The aggregated filter for this next hop only accepts packets from + // 10.99.99.99 (an address that doesn't exist), so packets from the + // actual sender should be filtered. 
+ let other_ip: IpAddr = Ipv4Addr::from([10, 99, 99, 99]).into(); + mcast.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch_addr, vni), + replication: Replication::Underlay, + source_filter: include_filter([other_ip]), + }])?; + + let mcast_cidr = IpCidr::Ip4(IPV4_MULTICAST_CIDR.parse().unwrap()); + topol.nodes[0].port.add_multicast_router_entry(mcast_cidr)?; + + // Subscribe sender to enable Tx processing + topol.nodes[0] + .port + .subscribe_multicast(mcast_group.into()) + .expect("subscribe should succeed"); + + // Snoop underlay for Geneve packets + let mut snoop_underlay = + SnoopGuard::start(UNDERLAY_TEST_DEVICE, GENEVE_UNDERLAY_FILTER)?; + + // Send multicast packet -> this should be filtered at forwarding level + topol.nodes[0].zone.send_udp_v4( + topol.nodes[0].port.ip(), + mcast_group, + MCAST_TEST_PORT, + "filtered at forwarding", + )?; + + // No packet should appear on underlay because forwarding filter blocked it + snoop_underlay.assert_no_packet( + "forwarding INCLUDE(other) should block sender from underlay", + ); + + // Now update forwarding to allow the sender + mcast.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch_addr, vni), + replication: Replication::Underlay, + source_filter: include_filter([sender_ip]), + }])?; + + let mut snoop_underlay2 = + SnoopGuard::start(UNDERLAY_TEST_DEVICE, GENEVE_UNDERLAY_FILTER)?; + + topol.nodes[0].zone.send_udp_v4( + topol.nodes[0].port.ip(), + mcast_group, + MCAST_TEST_PORT, + "allowed at forwarding", + )?; + + // Packet should now appear on underlay + snoop_underlay2 + .assert_packet("forwarding INCLUDE(sender) should allow to underlay"); + + // Test EXCLUDE mode: EXCLUDE(sender) should block + mcast.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch_addr, vni), + replication: Replication::Underlay, + source_filter: exclude_filter([sender_ip]), + }])?; + + let mut snoop_underlay3 = + 
SnoopGuard::start(UNDERLAY_TEST_DEVICE, GENEVE_UNDERLAY_FILTER)?; + + topol.nodes[0].zone.send_udp_v4( + topol.nodes[0].port.ip(), + mcast_group, + MCAST_TEST_PORT, + "excluded at forwarding", + )?; + + snoop_underlay3.assert_no_packet( + "forwarding EXCLUDE(sender) should block sender from underlay", + ); + + // EXCLUDE() (empty) should allow any source + mcast.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch_addr, vni), + replication: Replication::Underlay, + source_filter: SourceFilter::default(), // EXCLUDE() = any + }])?; + + let mut snoop_underlay4 = + SnoopGuard::start(UNDERLAY_TEST_DEVICE, GENEVE_UNDERLAY_FILTER)?; + + topol.nodes[0].zone.send_udp_v4( + topol.nodes[0].port.ip(), + mcast_group, + MCAST_TEST_PORT, + "default filter allows", + )?; + + snoop_underlay4.assert_packet( + "forwarding EXCLUDE() (default) should allow any source", + ); + + // IPv6 + { + let mcast_group_v6: Ipv6Addr = "ff04::e000:2f0".parse().unwrap(); + let mcast_underlay_v6 = + MulticastUnderlay::new("ff04::e000:2f0".parse().unwrap()).unwrap(); + let mcast_v6 = + MulticastGroup::new(mcast_group_v6.into(), mcast_underlay_v6)?; + + let sender_ip_v6: IpAddr = topol.nodes[0] + .port + .ipv6() + .expect("dualstack port must have IPv6") + .into(); + + // INCLUDE(other) should block + let other_ip_v6: IpAddr = "fd00::99:99:99:99".parse().unwrap(); + mcast_v6.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch_addr, vni), + replication: Replication::Underlay, + source_filter: include_filter([other_ip_v6]), + }])?; + + let mcast_cidr_v6 = + IpCidr::Ip6(IPV6_ADMIN_LOCAL_MULTICAST_CIDR.parse().unwrap()); + topol.nodes[0].port.add_multicast_router_entry(mcast_cidr_v6)?; + + topol.nodes[0] + .port + .subscribe_multicast(mcast_group_v6.into()) + .expect("subscribe should succeed"); + + let mut snoop_v6_1 = + SnoopGuard::start(UNDERLAY_TEST_DEVICE, GENEVE_UNDERLAY_FILTER)?; + + topol.nodes[0].zone.send_udp_v6( + 
topol.nodes[0].port.ipv6().unwrap(), + mcast_group_v6, + MCAST_TEST_PORT, + "v6 filtered", + )?; + + snoop_v6_1.assert_no_packet( + "IPv6: forwarding INCLUDE(other) should block sender", + ); + + // INCLUDE(sender) should allow + mcast_v6.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch_addr, vni), + replication: Replication::Underlay, + source_filter: include_filter([sender_ip_v6]), + }])?; + + let mut snoop_v6_2 = + SnoopGuard::start(UNDERLAY_TEST_DEVICE, GENEVE_UNDERLAY_FILTER)?; + + topol.nodes[0].zone.send_udp_v6( + topol.nodes[0].port.ipv6().unwrap(), + mcast_group_v6, + MCAST_TEST_PORT, + "v6 allowed", + )?; + + snoop_v6_2 + .assert_packet("IPv6: forwarding INCLUDE(sender) should allow"); + } + + Ok(()) +} + +#[test] +fn test_forwarding_multi_nexthop_different_filters() -> Result<()> { + // Test multi-next-hop forwarding with different source filters per hop. + // Verifies that filtering is applied independently per next hop. When two + // next hops have different filters, packets only forward to allowed hops. 
+ + let topol = xde_tests::two_node_topology_dualstack()?; + let vni = Vni::new(DEFAULT_MULTICAST_VNI)?; + + // IPv4 + { + let mcast_group = Ipv4Addr::from([224, 1, 2, 241]); + let mcast_underlay = MulticastUnderlay::new(Ipv6Addr::from([ + 0xff, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 224, 1, 2, 241, + ])) + .unwrap(); + + let mcast = MulticastGroup::new(mcast_group.into(), mcast_underlay)?; + + // Two synthetic next hop addresses + let nexthop_allowed: Ipv6Addr = "fd77::1".parse().unwrap(); + let nexthop_blocked: Ipv6Addr = "fd77::2".parse().unwrap(); + + let sender_ip: IpAddr = topol.nodes[0].port.ip().into(); + let other_ip: IpAddr = Ipv4Addr::from([10, 99, 99, 99]).into(); + + // Set up two next hops with different filters: + // - nexthop_allowed: INCLUDE(sender) -> should forward + // - nexthop_blocked: INCLUDE(other) -> should NOT forward + mcast.set_forwarding(vec![ + McastForwardingNextHop { + next_hop: NextHopV6::new(nexthop_allowed, vni), + replication: Replication::Underlay, + source_filter: include_filter([sender_ip]), + }, + McastForwardingNextHop { + next_hop: NextHopV6::new(nexthop_blocked, vni), + replication: Replication::External, + source_filter: include_filter([other_ip]), + }, + ])?; + + let mcast_cidr = IpCidr::Ip4(IPV4_MULTICAST_CIDR.parse().unwrap()); + topol.nodes[0].port.add_multicast_router_entry(mcast_cidr)?; + + topol.nodes[0] + .port + .subscribe_multicast(mcast_group.into()) + .expect("subscribe should succeed"); + + // Snoop underlay, where we should see exactly 1 packet + // (to nexthop_allowed) and not 2 (which would happen if both next hops + // received the packet) + let mut snoop = + SnoopGuard::start(UNDERLAY_TEST_DEVICE, GENEVE_UNDERLAY_FILTER)?; + + topol.nodes[0].zone.send_udp_v4( + topol.nodes[0].port.ip(), + mcast_group, + MCAST_TEST_PORT, + "multi-hop filter test", + )?; + + // Packet should reach allowed hop, while the blocked hop is filtered out. 
+ snoop.assert_packet("IPv4: should forward to allowed next hop"); + } + + // IPv6 + { + let mcast_group_v6: Ipv6Addr = "ff04::e000:2f1".parse().unwrap(); + let mcast_underlay_v6 = + MulticastUnderlay::new("ff04::e000:2f1".parse().unwrap()).unwrap(); + let mcast_v6 = + MulticastGroup::new(mcast_group_v6.into(), mcast_underlay_v6)?; + + let nexthop_allowed: Ipv6Addr = "fd77::3".parse().unwrap(); + let nexthop_blocked: Ipv6Addr = "fd77::4".parse().unwrap(); + + let sender_ip_v6: IpAddr = topol.nodes[0] + .port + .ipv6() + .expect("dualstack port must have IPv6") + .into(); + let other_ip_v6: IpAddr = "fd00::99:99:99:99".parse().unwrap(); + + mcast_v6.set_forwarding(vec![ + McastForwardingNextHop { + next_hop: NextHopV6::new(nexthop_allowed, vni), + replication: Replication::Underlay, + source_filter: include_filter([sender_ip_v6]), + }, + McastForwardingNextHop { + next_hop: NextHopV6::new(nexthop_blocked, vni), + replication: Replication::External, + source_filter: include_filter([other_ip_v6]), + }, + ])?; + + let mcast_cidr_v6 = + IpCidr::Ip6(IPV6_ADMIN_LOCAL_MULTICAST_CIDR.parse().unwrap()); + topol.nodes[0].port.add_multicast_router_entry(mcast_cidr_v6)?; + + topol.nodes[0] + .port + .subscribe_multicast(mcast_group_v6.into()) + .expect("subscribe should succeed"); + + let mut snoop_v6 = + SnoopGuard::start(UNDERLAY_TEST_DEVICE, GENEVE_UNDERLAY_FILTER)?; + + topol.nodes[0].zone.send_udp_v6( + topol.nodes[0].port.ipv6().unwrap(), + mcast_group_v6, + MCAST_TEST_PORT, + "multi-hop filter test v6", + )?; + + snoop_v6.assert_packet("IPv6: should forward to allowed next hop"); + } + + Ok(()) +} diff --git a/xde-tests/tests/multicast_validation.rs b/xde-tests/tests/multicast_validation.rs index a223814d..52dfdc36 100644 --- a/xde-tests/tests/multicast_validation.rs +++ b/xde-tests/tests/multicast_validation.rs @@ -21,12 +21,14 @@ use oxide_vpc::api::DEFAULT_MULTICAST_VNI; use oxide_vpc::api::IpCidr; use oxide_vpc::api::Ipv4Addr; use oxide_vpc::api::Ipv6Addr; +use 
oxide_vpc::api::McastForwardingNextHop; use oxide_vpc::api::McastSubscribeReq; use oxide_vpc::api::McastUnsubscribeAllReq; use oxide_vpc::api::McastUnsubscribeReq; use oxide_vpc::api::MulticastUnderlay; use oxide_vpc::api::NextHopV6; use oxide_vpc::api::Replication; +use oxide_vpc::api::SourceFilter; use oxide_vpc::api::Vni; use xde_tests::GENEVE_UNDERLAY_FILTER; use xde_tests::IPV4_MULTICAST_CIDR; @@ -81,9 +83,9 @@ fn test_subscribe_ff04_direct_without_m2p() -> Result<()> { .expect("missing multicast subscription entry for ff04 group"); let p0 = topol.nodes[0].port.name().to_string(); assert!( - entry.ports.contains(&p0), + entry.has_port(&p0), "expected {p0} to be subscribed; got {:?}", - entry.ports + entry.subscribers ); Ok(()) @@ -97,6 +99,7 @@ fn test_subscribe_nonexistent_port() -> Result<()> { let res = hdl.mcast_subscribe(&McastSubscribeReq { port_name: "this_port_does_not_exist_anywhere".to_string(), group: mcast_group.into(), + filter: SourceFilter::default(), }); assert!( @@ -116,6 +119,7 @@ fn test_subscribe_unicast_ip_as_group() -> Result<()> { let res = hdl.mcast_subscribe(&McastSubscribeReq { port_name: topol.nodes[0].port.name().to_string(), group: unicast_ip.into(), + filter: SourceFilter::default(), }); let err = res.expect_err("Expected error when subscribing to unicast IP"); @@ -147,10 +151,11 @@ fn test_double_subscribe() -> Result<()> { // Use node B's underlay address as the switch unicast address for routing. 
let fake_switch_addr = topol.nodes[1].port.underlay_ip().into(); - mcast.set_forwarding(vec![( - NextHopV6::new(fake_switch_addr, vni), - Replication::External, - )])?; + mcast.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch_addr, vni), + replication: Replication::External, + source_filter: SourceFilter::default(), + }])?; let mcast_cidr = IpCidr::Ip4(IPV4_MULTICAST_CIDR.parse().unwrap()); for node in &topol.nodes { @@ -177,9 +182,9 @@ fn test_double_subscribe() -> Result<()> { .expect("missing multicast subscription entry for group"); let p1 = topol.nodes[1].port.name().to_string(); assert!( - entry.ports.contains(&p1), + entry.has_port(&p1), "expected {p1} to be subscribed; got {:?}", - entry.ports + entry.subscribers ); let filter = @@ -243,10 +248,11 @@ fn test_subscribe_then_clear_m2p() -> Result<()> { // Use node B's underlay address as the switch unicast address for routing. let fake_switch_addr = topol.nodes[1].port.underlay_ip().into(); - mcast.set_forwarding(vec![( - NextHopV6::new(fake_switch_addr, vni), - Replication::External, - )])?; + mcast.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch_addr, vni), + replication: Replication::External, + source_filter: SourceFilter::default(), + }])?; let mcast_cidr = IpCidr::Ip4(IPV4_MULTICAST_CIDR.parse().unwrap()); for node in &topol.nodes { @@ -306,10 +312,11 @@ fn test_set_mcast_fwd_rejects_non_default_vni() -> Result<()> { let res = hdl.set_mcast_fwd(&oxide_vpc::api::SetMcastForwardingReq { underlay, - next_hops: vec![( - NextHopV6::new(fake_switch_addr, bad_vni), - Replication::External, - )], + next_hops: vec![oxide_vpc::api::McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch_addr, bad_vni), + replication: Replication::External, + source_filter: SourceFilter::default(), + }], }); assert!(res.is_err(), "set_mcast_fwd should reject non-default VNI"); @@ -336,10 +343,11 @@ fn test_set_mcast_fwd_rejects_multicast_next_hop() -> 
Result<()> { let res = hdl.set_mcast_fwd(&oxide_vpc::api::SetMcastForwardingReq { underlay, - next_hops: vec![( - NextHopV6::new(bad_next_hop, vni), - Replication::External, - )], + next_hops: vec![oxide_vpc::api::McastForwardingNextHop { + next_hop: NextHopV6::new(bad_next_hop, vni), + replication: Replication::External, + source_filter: SourceFilter::default(), + }], }); assert!(res.is_err(), "set_mcast_fwd should reject multicast next hop"); @@ -402,10 +410,11 @@ fn test_multiple_nexthops_accumulate() -> Result<()> { let switch_a = topol.nodes[0].port.underlay_ip().into(); let switch_b = topol.nodes[1].port.underlay_ip().into(); - mcast.set_forwarding(vec![( - NextHopV6::new(switch_a, vni), - Replication::External, - )])?; + mcast.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(switch_a, vni), + replication: Replication::External, + source_filter: SourceFilter::default(), + }])?; let hdl = OpteHdl::open()?; let fwd = hdl.dump_mcast_fwd()?; @@ -415,13 +424,14 @@ fn test_multiple_nexthops_accumulate() -> Result<()> { .find(|e| e.underlay == underlay) .expect("missing forwarding entry"); assert_eq!(entry.next_hops.len(), 1, "Expected 1 next hop after first set"); - assert_eq!(entry.next_hops[0].0.addr, switch_a); - assert_eq!(entry.next_hops[0].1, Replication::External); + assert_eq!(entry.next_hops[0].next_hop.addr, switch_a); + assert_eq!(entry.next_hops[0].replication, Replication::External); - mcast.set_forwarding(vec![( - NextHopV6::new(switch_b, vni), - Replication::Underlay, - )])?; + mcast.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(switch_b, vni), + replication: Replication::Underlay, + source_filter: SourceFilter::default(), + }])?; let fwd = hdl.dump_mcast_fwd()?; let entry = fwd @@ -438,29 +448,30 @@ fn test_multiple_nexthops_accumulate() -> Result<()> { let nexthop_a = entry .next_hops .iter() - .find(|(nexthop, _)| nexthop.addr == switch_a) + .find(|hop| hop.next_hop.addr == switch_a) 
.expect("switch_a not found"); let nexthop_b = entry .next_hops .iter() - .find(|(nexthop, _)| nexthop.addr == switch_b) + .find(|hop| hop.next_hop.addr == switch_b) .expect("switch_b not found"); assert_eq!( - nexthop_a.1, + nexthop_a.replication, Replication::External, "switch_a should have External" ); assert_eq!( - nexthop_b.1, + nexthop_b.replication, Replication::Underlay, "switch_b should have Underlay" ); - mcast.set_forwarding(vec![( - NextHopV6::new(switch_a, vni), - Replication::Both, - )])?; + mcast.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(switch_a, vni), + replication: Replication::Both, + source_filter: SourceFilter::default(), + }])?; let fwd = hdl.dump_mcast_fwd()?; let entry = fwd @@ -477,21 +488,21 @@ fn test_multiple_nexthops_accumulate() -> Result<()> { let nexthop_a = entry .next_hops .iter() - .find(|(nexthop, _)| nexthop.addr == switch_a) + .find(|hop| hop.next_hop.addr == switch_a) .expect("switch_a not found"); let nexthop_b = entry .next_hops .iter() - .find(|(nexthop, _)| nexthop.addr == switch_b) + .find(|hop| hop.next_hop.addr == switch_b) .expect("switch_b not found"); assert_eq!( - nexthop_a.1, + nexthop_a.replication, Replication::Both, "switch_a should now have Both (updated)" ); assert_eq!( - nexthop_b.1, + nexthop_b.replication, Replication::Underlay, "switch_b should still have Underlay" ); @@ -538,19 +549,19 @@ fn test_unsubscribe_all() -> Result<()> { let p0 = topol.nodes[0].port.name().to_string(); let p1 = topol.nodes[1].port.name().to_string(); assert_eq!( - entry.ports.len(), + entry.subscribers.len(), 2, "Expected 2 ports subscribed before unsubscribe_all" ); assert!( - entry.ports.contains(&p0), + entry.has_port(&p0), "expected {p0} to be subscribed; got {:?}", - entry.ports + entry.subscribers ); assert!( - entry.ports.contains(&p1), + entry.has_port(&p1), "expected {p1} to be subscribed; got {:?}", - entry.ports + entry.subscribers ); // Unsubscribe all ports from the group @@ -617,10 
+628,11 @@ fn test_clear_forwarding_stops_underlay_egress() -> Result<()> { // Route via node B's underlay address to select the egress link. let fake_switch_addr = topol.nodes[1].port.underlay_ip().into(); - mcast.set_forwarding(vec![( - NextHopV6::new(fake_switch_addr, vni), - Replication::Underlay, - )])?; + mcast.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch_addr, vni), + replication: Replication::Underlay, + source_filter: SourceFilter::default(), + }])?; // Allow IPv4 multicast traffic via Multicast target let mcast_cidr = IpCidr::Ip4(IPV4_MULTICAST_CIDR.parse().unwrap()); @@ -648,7 +660,7 @@ fn test_clear_forwarding_stops_underlay_egress() -> Result<()> { "Expected 1 next hop in forwarding table" ); assert_eq!( - entry.next_hops[0].1, + entry.next_hops[0].replication, Replication::Underlay, "Expected Underlay replication mode" ); @@ -754,9 +766,9 @@ fn test_multiple_simultaneous_groups() -> Result<()> { .find(|e| e.underlay == underlay_a) .expect("missing subscription entry for group A"); assert!( - entry_a.ports.contains(&p0) && !entry_a.ports.contains(&p1), + entry_a.has_port(&p0) && !entry_a.has_port(&p1), "group A should have only node 0; got {:?}", - entry_a.ports + entry_a.subscribers ); // Group B should have only node 1 @@ -766,22 +778,24 @@ fn test_multiple_simultaneous_groups() -> Result<()> { .find(|e| e.underlay == underlay_b) .expect("missing subscription entry for group B"); assert!( - entry_b.ports.contains(&p1) && !entry_b.ports.contains(&p0), + entry_b.has_port(&p1) && !entry_b.has_port(&p0), "group B should have only node 1; got {:?}", - entry_b.ports + entry_b.subscribers ); // Set up forwarding for both groups (needed for Tx path) let vni = Vni::new(DEFAULT_MULTICAST_VNI)?; let fake_switch = topol.nodes[1].port.underlay_ip().into(); - mcast_a.set_forwarding(vec![( - NextHopV6::new(fake_switch, vni), - Replication::Underlay, - )])?; - mcast_b.set_forwarding(vec![( - NextHopV6::new(fake_switch, vni), - 
Replication::Underlay, - )])?; + mcast_a.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch, vni), + replication: Replication::Underlay, + source_filter: SourceFilter::default(), + }])?; + mcast_b.set_forwarding(vec![McastForwardingNextHop { + next_hop: NextHopV6::new(fake_switch, vni), + replication: Replication::Underlay, + source_filter: SourceFilter::default(), + }])?; // Start snoops on node B (we send from node 0, so we snoop on node 1) let dev_b = topol.nodes[1].port.name().to_string(); diff --git a/xde/src/dev_map.rs b/xde/src/dev_map.rs index 01d3727f..e9bfcccc 100644 --- a/xde/src/dev_map.rs +++ b/xde/src/dev_map.rs @@ -8,7 +8,6 @@ use crate::postbox::Postbox; use crate::xde::XdeDev; use alloc::collections::btree_map::BTreeMap; use alloc::collections::btree_map::Entry; -use alloc::collections::btree_set::BTreeSet; use alloc::string::String; use alloc::sync::Arc; use alloc::vec::Vec; @@ -18,6 +17,7 @@ use opte::api::OpteError; use opte::api::Vni; use opte::ddi::sync::KRwLock; use opte::ddi::sync::KRwLockReadGuard; +use oxide_vpc::api::SourceFilter; /// A map/set lookup key for ports indexed on `(Vni, MacAddr)`. /// @@ -63,13 +63,14 @@ pub struct DevMap { devs: BTreeMap, names: BTreeMap, /// Subscriptions keyed by underlay IPv6 multicast group (admin-scoped ff04::/16). + /// Each port has its own source filter for per-member filtering. /// This table is sled-local and independent of any per-VPC VNI. VNI validation /// and VPC isolation are enforced during inbound overlay decapsulation on the /// destination port, not here. /// /// Rationale: multicast groups are fleet-wide; ports opt-in to receive a given /// underlay group, and the overlay layer subsequently filters by VNI as appropriate. 
- mcast_groups: BTreeMap>, + mcast_groups: BTreeMap>, } impl Default for DevMap { @@ -102,23 +103,30 @@ impl DevMap { pub fn remove(&mut self, name: &str) -> Option { let key = get_key(&self.names.remove(name)?); - self.mcast_groups.retain(|_group, subscribers| { - subscribers.remove(&key); - !subscribers.is_empty() + self.mcast_groups.retain(|_group, members| { + members.remove(&key); + !members.is_empty() }); self.devs.remove(&key) } - /// Allow a port to receive on a given multicast group. + /// Allow a port to receive on a given multicast group with source filtering. /// /// This takes the underlay IPv6 multicast group address (ff04::/16). /// Callers at the ioctl boundary may pass an overlay group; the handler - /// translates overlay→underlay via the M2P table before calling here. + /// translates overlay->underlay via the M2P table before calling here. + /// + /// The source filter determines which packet sources this port accepts: + /// - `Exclude` with empty sources: accept any source (*, G) + /// - `Include` with sources: accept only listed sources + /// + /// Re-subscribing with a different filter updates the existing subscription. 
pub fn mcast_subscribe( &mut self, name: &str, mcast_underlay: MulticastUnderlay, + filter: SourceFilter, ) -> Result<(), OpteError> { let port = self .names @@ -126,7 +134,10 @@ impl DevMap { .ok_or_else(|| OpteError::PortNotFound(name.into()))?; let key = get_key(port); - self.mcast_groups.entry(mcast_underlay).or_default().insert(key); + self.mcast_groups + .entry(mcast_underlay) + .or_default() + .insert(key, filter); Ok(()) } @@ -143,8 +154,13 @@ impl DevMap { .ok_or_else(|| OpteError::PortNotFound(name.into()))?; let key = get_key(port); - if let Entry::Occupied(set) = self.mcast_groups.entry(mcast_underlay) { - set.into_mut().remove(&key); + if let Entry::Occupied(mut entry) = + self.mcast_groups.entry(mcast_underlay) + { + entry.get_mut().remove(&key); + if entry.get().is_empty() { + entry.remove(); + } } Ok(()) @@ -155,11 +171,11 @@ impl DevMap { self.mcast_groups.remove(&mcast_underlay); } - /// Find the keys for all ports who want to receive a given multicast packet. - pub fn mcast_listeners( + /// Find all subscribers for a given multicast group with their source filters. + pub fn mcast_subscribers( &self, mcast_underlay: &MulticastUnderlay, - ) -> Option> { + ) -> Option> { self.mcast_groups.get(mcast_underlay).map(|v| v.iter()) } @@ -217,18 +233,21 @@ impl DevMap { } } - /// Dump all multicast subscriptions as a vector of (group, ports) pairs. + /// Dump all multicast subscriptions as a vector of (group, subscribers) pairs. 
pub fn dump_mcast_subscriptions( &self, - ) -> Vec<(MulticastUnderlay, Vec)> { + ) -> Vec<(MulticastUnderlay, Vec<(String, SourceFilter)>)> { let mut out = Vec::new(); for (group, subs) in self.mcast_groups.iter() { - let ports: Vec = subs + let subscribers: Vec<(String, SourceFilter)> = subs .iter() - .filter_map(|vm| self.devs.get(vm)) - .map(|d| d.devname.clone()) + .filter_map(|(vm, filter)| { + self.devs + .get(vm) + .map(|d| (d.devname.clone(), filter.clone())) + }) .collect(); - out.push((*group, ports)); + out.push((*group, subscribers)); } out } diff --git a/xde/src/stats.rs b/xde/src/stats.rs index d738d7fc..a9ada8b5 100644 --- a/xde/src/stats.rs +++ b/xde/src/stats.rs @@ -89,6 +89,16 @@ pub struct XdeStats { /// The number of multicast Rx packets dropped because the inner destination /// IP address is not multicast (malformed packet). mcast_rx_bad_inner_dst: KStatU64, + /// The number of multicast Tx packets blocked by source filtering. + mcast_tx_source_filtered: KStatU64, + /// The number of multicast Rx packets blocked by source filtering. + mcast_rx_source_filtered: KStatU64, + /// The number of multicast Tx packets dropped because no inner IP header + /// was found (non-IP frames cannot be source-filtered). + mcast_tx_no_inner_ip: KStatU64, + /// The number of multicast Tx packets not forwarded to a next hop because + /// the aggregated source filter for that sled rejected the source. 
+ mcast_tx_fwd_source_filtered: KStatU64, } impl XdeStats { @@ -136,6 +146,22 @@ impl XdeStats { &self.mcast_rx_bad_inner_dst } + pub fn mcast_tx_source_filtered(&self) -> &KStatU64 { + &self.mcast_tx_source_filtered + } + + pub fn mcast_rx_source_filtered(&self) -> &KStatU64 { + &self.mcast_rx_source_filtered + } + + pub fn mcast_tx_no_inner_ip(&self) -> &KStatU64 { + &self.mcast_tx_no_inner_ip + } + + pub fn mcast_tx_fwd_source_filtered(&self) -> &KStatU64 { + &self.mcast_tx_fwd_source_filtered + } + pub fn parse_error(&self, dir: Direction, err: &ParseError) { use Direction::*; (match (dir, err) { diff --git a/xde/src/xde.rs b/xde/src/xde.rs index 1c92974f..e07a6bad 100644 --- a/xde/src/xde.rs +++ b/xde/src/xde.rs @@ -284,7 +284,9 @@ use oxide_vpc::api::DumpVirt2BoundaryResp; use oxide_vpc::api::DumpVirt2PhysResp; use oxide_vpc::api::ListPortsResp; use oxide_vpc::api::McastForwardingEntry; +use oxide_vpc::api::McastForwardingNextHop; use oxide_vpc::api::McastSubscribeReq; +use oxide_vpc::api::McastSubscriberEntry; use oxide_vpc::api::McastSubscriptionEntry; use oxide_vpc::api::McastUnsubscribeAllReq; use oxide_vpc::api::McastUnsubscribeReq; @@ -299,6 +301,7 @@ use oxide_vpc::api::SetMcast2PhysReq; use oxide_vpc::api::SetMcastForwardingReq; use oxide_vpc::api::SetVirt2BoundaryReq; use oxide_vpc::api::SetVirt2PhysReq; +use oxide_vpc::api::SourceFilter; use oxide_vpc::cfg::IpCfg; use oxide_vpc::cfg::VpcCfg; use oxide_vpc::engine::VpcNetwork; @@ -315,9 +318,14 @@ use oxide_vpc::engine::router; const ETHERNET_MTU: u16 = 1500; // Type alias for multicast forwarding table: -// Maps IPv6 destination addresses to their next hop replication entries. -type McastForwardingTable = - BTreeMap>; +// Maps underlay multicast addresses to next hops with replication and source filters. +// The source filter is the aggregated filter for the destination sled (union of +// all subscriber filters on that sled). Packets are only forwarded if the +// aggregated filter allows the source. 
+type McastForwardingTable = BTreeMap< + MulticastUnderlay, + BTreeMap, +>; // Entry limits for the various flow tables. const FW_FT_LIMIT: NonZeroU32 = NonZeroU32::new(8096).unwrap(); @@ -424,10 +432,32 @@ unsafe extern "C" { // Multicast dataplane problem probes pub safe fn __dtrace_probe_mcast__tx__pullup__fail(len: uintptr_t); pub safe fn __dtrace_probe_mcast__rx__pullup__fail(len: uintptr_t); + /// Fires when a multicast Tx packet is dropped because no inner IP header + /// was found (non-IP frames cannot be source-filtered). + pub safe fn __dtrace_probe_mcast__tx__no__inner__ip(mblk_addr: uintptr_t); pub safe fn __dtrace_probe_mcast__no__fwd__entry( underlay: *const oxide_vpc::api::Ipv6Addr, vni: uintptr_t, ); + /// Fires when a multicast packet is blocked by source filtering. + pub safe fn __dtrace_probe_mcast__source__filtered( + af: uintptr_t, // AF_INET or AF_INET6 + inner_src: uintptr_t, // source IP that was blocked + inner_dst: uintptr_t, // multicast group + vni: uintptr_t, + port: uintptr_t, // port name that filtered the packet + filter_mode: uintptr_t, // 0 = Include, 1 = Exclude + ); + /// Fires when a multicast packet is not forwarded to a next hop because + /// the aggregated source filter for that sled rejected the source. + pub safe fn __dtrace_probe_mcast__fwd__source__filtered( + af: uintptr_t, // AF_INET or AF_INET6 + inner_src: uintptr_t, // source IP that was blocked + inner_dst: uintptr_t, // multicast group + vni: uintptr_t, + next_hop: uintptr_t, // next hop address that was skipped + filter_mode: uintptr_t, // 0 = Include, 1 = Exclude + ); } fn bad_packet_parse_probe( @@ -2135,6 +2165,30 @@ fn guest_loopback( } } +/// Extract AF constant and address pointer from a single address for DTrace probes. 
+#[inline] +fn dtrace_addr(addr: &oxide_vpc::api::IpAddr) -> (usize, uintptr_t) { + match addr { + oxide_vpc::api::IpAddr::Ip4(v4) => { + (AF_INET as usize, v4 as *const _ as uintptr_t) + } + oxide_vpc::api::IpAddr::Ip6(v6) => { + (AF_INET6 as usize, v6 as *const _ as uintptr_t) + } + } +} + +/// Extract AF constant and address pointers from a src/dst pair for DTrace probes. +#[inline] +fn dtrace_addrs( + src: &oxide_vpc::api::IpAddr, + dst: &oxide_vpc::api::IpAddr, +) -> (usize, uintptr_t, uintptr_t) { + let (af, src_ptr) = dtrace_addr(src); + let (_, dst_ptr) = dtrace_addr(dst); + (af, src_ptr, dst_ptr) +} + /// Locate the Oxide Multicast Geneve option and return the offset to its body. /// /// Walks through Geneve options starting at `geneve_offset + 8` to find the @@ -2206,6 +2260,7 @@ fn update_mcast_replication( } struct MulticastTxContext<'a> { + inner_src: oxide_vpc::api::IpAddr, // Inner/overlay source IP (for source filtering) inner_dst: oxide_vpc::api::IpAddr, // Inner/overlay destination IP (for subscriptions) underlay_dst: Ipv6Addr, // Outer/underlay destination IP (for forwarding lookup) vni: Vni, @@ -2218,6 +2273,7 @@ struct MulticastTxContext<'a> { } struct MulticastRxContext<'a> { + inner_src: oxide_vpc::api::IpAddr, // Inner/overlay source IP (for source filtering) inner_dst: oxide_vpc::api::IpAddr, // Inner/overlay destination IP (for subscriptions) underlay_dst: Ipv6Addr, // Outer/underlay destination IP (for forwarding lookup) vni: Vni, @@ -2246,14 +2302,7 @@ fn handle_mcast_tx<'a>( cpu_mcast_fwd: &'a McastForwardingTable, ) { // DTrace probe: multicast Tx entry - let (af, addr_ptr) = match &ctx.inner_dst { - oxide_vpc::api::IpAddr::Ip4(v4) => { - (AF_INET as usize, AsRef::<[u8]>::as_ref(v4).as_ptr() as uintptr_t) - } - oxide_vpc::api::IpAddr::Ip6(v6) => { - (AF_INET6 as usize, AsRef::<[u8]>::as_ref(v6).as_ptr() as uintptr_t) - } - }; + let (af, addr_ptr) = dtrace_addr(&ctx.inner_dst); __dtrace_probe_mcast__tx(af, addr_ptr, ctx.vni.as_u32() as 
uintptr_t); // Compute packet offsets once (used for both local delivery and next hop forwarding) @@ -2272,13 +2321,36 @@ fn handle_mcast_tx<'a>( oxide_vpc::api::Ipv6Addr::from(ctx.underlay_dst.bytes()); let group_key = MulticastUnderlay::new_unchecked(underlay_addr); - if let Some(listeners) = devs.mcast_listeners(&group_key) { + if let Some(subscribers) = devs.mcast_subscribers(&group_key) { let my_key = VniMac::new(ctx.vni, src_dev.port.mac_addr()); - for el in listeners { + for (key, filter) in subscribers { // Skip delivering to self - if my_key == *el { + if my_key == *key { + continue; + } + + let Some(dev) = devs.get_by_key(*key) else { + let xde = get_xde_state(); + xde.stats.vals.mcast_tx_stale_local().incr(1); + continue; + }; + + if !filter.allows(ctx.inner_src) { + let xde = get_xde_state(); + xde.stats.vals.mcast_tx_source_filtered().incr(1); + let (af, src_ptr, dst_ptr) = + dtrace_addrs(&ctx.inner_src, &ctx.inner_dst); + __dtrace_probe_mcast__source__filtered( + af, + src_ptr, + dst_ptr, + ctx.vni.as_u32() as uintptr_t, + dev.port.name_cstr().as_ptr() as uintptr_t, + filter.mode as uintptr_t, + ); continue; } + // Note: The inner destination MAC is already set to the multicast MAC by // OPTE's `EncapAction` transformation. No manual rewrite needed for Tx path. 
let Ok(my_pkt) = ctx.out_pkt.pullup(NonZeroUsize::new(pullup_len)) @@ -2293,34 +2365,17 @@ fn handle_mcast_tx<'a>( continue; }; - match devs.get_by_key(*el) { - Some(dev) => { - // DTrace probe: local delivery - let (af, addr_ptr) = match &ctx.inner_dst { - oxide_vpc::api::IpAddr::Ip4(v4) => ( - AF_INET as usize, - AsRef::<[u8]>::as_ref(v4).as_ptr() as uintptr_t, - ), - oxide_vpc::api::IpAddr::Ip6(v6) => ( - AF_INET6 as usize, - AsRef::<[u8]>::as_ref(v6).as_ptr() as uintptr_t, - ), - }; - __dtrace_probe_mcast__local__delivery( - af, - addr_ptr, - ctx.vni.as_u32() as uintptr_t, - dev.port.name_cstr().as_ptr() as uintptr_t, - ); - guest_loopback(src_dev, dev, *el, my_pkt, postbox); - let xde = get_xde_state(); - xde.stats.vals.mcast_tx_local().incr(1); - } - None => { - let xde = get_xde_state(); - xde.stats.vals.mcast_tx_stale_local().incr(1); - } - } + // DTrace probe: local delivery + let (af, addr_ptr) = dtrace_addr(&ctx.inner_dst); + __dtrace_probe_mcast__local__delivery( + af, + addr_ptr, + ctx.vni.as_u32() as uintptr_t, + dev.port.name_cstr().as_ptr() as uintptr_t, + ); + guest_loopback(src_dev, dev, *key, my_pkt, postbox); + let xde = get_xde_state(); + xde.stats.vals.mcast_tx_local().incr(1); } } @@ -2346,7 +2401,28 @@ fn handle_mcast_tx<'a>( if let Some(next_hops) = cpu_mcast_fwd.get(&underlay_key) { // We found forwarding entries, replicate to each next hop - for (next_hop, replication) in next_hops.iter() { + for (next_hop, (replication, source_filter)) in next_hops.iter() { + // Check aggregated source filter before forwarding. + // This filter is the union of all subscriber filters on the + // destination sled. If no subscriber would accept this source, + // skip forwarding to avoid sending packets across the network + // just to have them filtered at the destination. 
+ if !source_filter.allows(ctx.inner_src) { + let xde = get_xde_state(); + xde.stats.vals.mcast_tx_fwd_source_filtered().incr(1); + let (af, src_ptr, dst_ptr) = + dtrace_addrs(&ctx.inner_src, &ctx.inner_dst); + __dtrace_probe_mcast__fwd__source__filtered( + af, + src_ptr, + dst_ptr, + ctx.vni.as_u32() as uintptr_t, + &next_hop.addr as *const _ as uintptr_t, + source_filter.mode as uintptr_t, + ); + continue; + } + // Clone packet with headers using pullup let Ok(mut fwd_pkt) = ctx.out_pkt.pullup(NonZeroUsize::new(pullup_len)) @@ -2497,14 +2573,7 @@ fn handle_mcast_rx( postbox: &mut Postbox, ) { // DTrace probe: multicast Rx entry - let (af, addr_ptr) = match &ctx.inner_dst { - oxide_vpc::api::IpAddr::Ip4(v4) => { - (AF_INET as usize, v4 as *const _ as uintptr_t) - } - oxide_vpc::api::IpAddr::Ip6(v6) => { - (AF_INET6 as usize, v6 as *const _ as uintptr_t) - } - }; + let (af, addr_ptr) = dtrace_addr(&ctx.inner_dst); __dtrace_probe_mcast__rx(af, addr_ptr, ctx.vni.as_u32() as uintptr_t); // Subscription is keyed by underlay (outer) IPv6 multicast address. @@ -2531,10 +2600,32 @@ fn handle_mcast_rx( return; }; - // Deliver to all local subscribers. VNI validation and VPC isolation - // are handled by OPTE's inbound overlay layer. - if let Some(ports) = devs.mcast_listeners(&group_key) { - for el in ports { + // Deliver to local subscribers that accept this source. + // VNI validation and VPC isolation are handled by OPTE's inbound overlay layer. 
+ if let Some(subscribers) = devs.mcast_subscribers(&group_key) { + for (key, filter) in subscribers { + let Some(dev) = devs.get_by_key(*key) else { + let xde = get_xde_state(); + xde.stats.vals.mcast_rx_stale_local().incr(1); + continue; + }; + + if !filter.allows(ctx.inner_src) { + let xde = get_xde_state(); + xde.stats.vals.mcast_rx_source_filtered().incr(1); + let (af, src_ptr, dst_ptr) = + dtrace_addrs(&ctx.inner_src, &ctx.inner_dst); + __dtrace_probe_mcast__source__filtered( + af, + src_ptr, + dst_ptr, + ctx.vni.as_u32() as uintptr_t, + dev.port.name_cstr().as_ptr() as uintptr_t, + filter.mode as uintptr_t, + ); + continue; + } + let Ok(mut my_pkt) = ctx.pkt.pullup(NonZeroUsize::new(ctx.pullup_len)) else { @@ -2562,32 +2653,17 @@ fn handle_mcast_rx( my_pkt[ctx.inner_eth_off..][..ETHER_ADDR_LEN] .copy_from_slice(&expected_mac); - match devs.get_by_key(*el) { - Some(dev) => { - // DTrace probe: Rx local delivery - let (af, addr_ptr) = match &ctx.inner_dst { - oxide_vpc::api::IpAddr::Ip4(v4) => { - (AF_INET as usize, v4 as *const _ as uintptr_t) - } - oxide_vpc::api::IpAddr::Ip6(v6) => { - (AF_INET6 as usize, v6 as *const _ as uintptr_t) - } - }; - __dtrace_probe_mcast__local__delivery( - af, - addr_ptr, - ctx.vni.as_u32() as uintptr_t, - dev.port.name_cstr().as_ptr() as uintptr_t, - ); - xde_rx_one_direct(stream, dev, *el, my_pkt, postbox); - let xde = get_xde_state(); - xde.stats.vals.mcast_rx_local().incr(1); - } - None => { - let xde = get_xde_state(); - xde.stats.vals.mcast_rx_stale_local().incr(1); - } - } + // DTrace probe: Rx local delivery + let (af, addr_ptr) = dtrace_addr(&ctx.inner_dst); + __dtrace_probe_mcast__local__delivery( + af, + addr_ptr, + ctx.vni.as_u32() as uintptr_t, + dev.port.name_cstr().as_ptr() as uintptr_t, + ); + xde_rx_one_direct(stream, dev, *key, my_pkt, postbox); + let xde = get_xde_state(); + xde.stats.vals.mcast_rx_local().incr(1); } } else { // No subscription entry found for this multicast group @@ -2716,15 +2792,17 @@ fn 
xde_mc_tx_one<'a>( let meta = parsed_pkt.meta(); - // Extract inner destination IP for potential multicast processing - let inner_dst_ip = match &meta.inner_l3 { - Some(ValidL3::Ipv4(v4)) => { - Some(oxide_vpc::api::IpAddr::from(v4.destination())) - } - Some(ValidL3::Ipv6(v6)) => { - Some(oxide_vpc::api::IpAddr::from(v6.destination())) - } - None => None, + // Extract inner source and destination IPs for potential multicast processing + let (inner_src_ip, inner_dst_ip) = match &meta.inner_l3 { + Some(ValidL3::Ipv4(v4)) => ( + Some(oxide_vpc::api::IpAddr::from(v4.source())), + Some(oxide_vpc::api::IpAddr::from(v4.destination())), + ), + Some(ValidL3::Ipv6(v6)) => ( + Some(oxide_vpc::api::IpAddr::from(v6.source())), + Some(oxide_vpc::api::IpAddr::from(v6.destination())), + ), + None => (None, None), }; let Ok(non_eth_payl_bytes) = @@ -2850,7 +2928,21 @@ fn xde_mc_tx_one<'a>( // ff0x:: address (via M2P table). if ip6_dst.is_multicast() { // This is a multicast packet, so we determine the inner - // destination from the packet contents or use a fallback + // source and destination from the packet contents or use a fallback + let inner_src = match inner_src_ip { + Some(ip) => ip, + None => { + // No inner L3 header. Multicast source filtering requires + // an IP source address; drop non-IP frames. 
+ opte::engine::dbg!( + "mcast Tx: no inner L3 for source filtering" + ); + let xde = get_xde_state(); + xde.stats.vals.mcast_tx_no_inner_ip().incr(1); + __dtrace_probe_mcast__tx__no__inner__ip(mblk_addr); + return; + } + }; let inner_dst = inner_dst_ip.unwrap_or_else(|| { // Fallback: derive from outer IPv6 multicast address // For IPv4 multicast mapped to IPv6, the last 4 bytes @@ -2878,6 +2970,7 @@ fn xde_mc_tx_one<'a>( mcast_fwd.get_or_insert_with(|| src_dev.mcast_fwd.read()); handle_mcast_tx( MulticastTxContext { + inner_src, inner_dst, underlay_dst: ip6_dst, vni, @@ -3339,7 +3432,7 @@ fn xde_rx_one( ); let vni = meta.outer_encap.vni(); - // Compute inner Ethernet offset and extract inner destination IP for multicast processing + // Compute inner Ethernet offset and extract inner src/dst IP for multicast processing let inner_eth_off = ( &meta.outer_eth, &meta.outer_v6, @@ -3347,9 +3440,15 @@ fn xde_rx_one( &meta.outer_encap, ) .packet_length(); - let inner_dst = match &meta.inner_l3 { - ValidL3::Ipv4(v4) => oxide_vpc::api::IpAddr::from(v4.destination()), - ValidL3::Ipv6(v6) => oxide_vpc::api::IpAddr::from(v6.destination()), + let (inner_src, inner_dst) = match &meta.inner_l3 { + ValidL3::Ipv4(v4) => ( + oxide_vpc::api::IpAddr::from(v4.source()), + oxide_vpc::api::IpAddr::from(v4.destination()), + ), + ValidL3::Ipv6(v6) => ( + oxide_vpc::api::IpAddr::from(v6.source()), + oxide_vpc::api::IpAddr::from(v6.destination()), + ), }; // Drop the parsed packet before calling handle_mcast_rx @@ -3359,6 +3458,7 @@ fn xde_rx_one( // (leaf node) handle_mcast_rx( MulticastRxContext { + inner_src, inner_dst, underlay_dst: ip6_dst, vni, @@ -3689,15 +3789,7 @@ fn set_m2p_hdlr(env: &mut IoctlEnvelope) -> Result { state.m2p.set(req.group, underlay); // DTrace: multicast map set - let (af, group_ptr): (usize, uintptr_t) = match req.group { - oxide_vpc::api::IpAddr::Ip4(v4) => { - (AF_INET as usize, AsRef::<[u8]>::as_ref(&v4).as_ptr() as uintptr_t) - } - 
oxide_vpc::api::IpAddr::Ip6(v6) => ( - AF_INET6 as usize, - AsRef::<[u8]>::as_ref(&v6).as_ptr() as uintptr_t, - ), - }; + let (af, group_ptr) = dtrace_addr(&req.group); __dtrace_probe_mcast__map__set( af as uintptr_t, group_ptr, @@ -3720,15 +3812,7 @@ fn clear_m2p_hdlr(env: &mut IoctlEnvelope) -> Result { state.m2p.remove(&req.group); // DTrace: multicast map clear - let (af, group_ptr): (usize, uintptr_t) = match req.group { - oxide_vpc::api::IpAddr::Ip4(v4) => { - (AF_INET as usize, AsRef::<[u8]>::as_ref(&v4).as_ptr() as uintptr_t) - } - oxide_vpc::api::IpAddr::Ip6(v6) => ( - AF_INET6 as usize, - AsRef::<[u8]>::as_ref(&v6).as_ptr() as uintptr_t, - ), - }; + let (af, group_ptr) = dtrace_addr(&req.group); __dtrace_probe_mcast__map__clear( af as uintptr_t, group_ptr, @@ -3773,25 +3857,25 @@ fn set_mcast_forwarding_hdlr( // Fleet-level multicast: enforce DEFAULT_MULTICAST_VNI for all replication modes. // NextHopV6.addr must be unicast (switch address for routing). // The packet will be sent to the multicast address (req.underlay). - for (next_hop, _rep) in &req.next_hops { - if next_hop.vni.as_u32() != DEFAULT_MULTICAST_VNI { + for entry in &req.next_hops { + if entry.next_hop.vni.as_u32() != DEFAULT_MULTICAST_VNI { return Err(OpteError::System { errno: EINVAL, msg: format!( "multicast next hop VNI must be DEFAULT_MULTICAST_VNI ({DEFAULT_MULTICAST_VNI}), got: {}", - next_hop.vni.as_u32() + entry.next_hop.vni.as_u32() ), }); } // NextHopV6.addr must be unicast (the switch endpoint for routing). // The actual packet destination is the multicast address (req.underlay). 
- if next_hop.addr.is_multicast() { + if entry.next_hop.addr.is_multicast() { return Err(OpteError::System { errno: EINVAL, msg: format!( "NextHopV6.addr must be unicast (switch address), got multicast: {}", - next_hop.addr + entry.next_hop.addr ), }); } @@ -3807,10 +3891,13 @@ fn set_mcast_forwarding_hdlr( // Get or create the next hop map for this underlay address let next_hop_map = mcast_fwd.entry(underlay).or_default(); - // Insert/update next hops: same next hop addr → replace replication mode, + // Insert/update next hops: same next hop addr → replace entry, // different next hop addr → add new entry (like `swadm route add`) - for (next_hop, rep) in req.next_hops { - next_hop_map.insert(next_hop, rep); + for entry in req.next_hops { + next_hop_map.insert( + entry.next_hop, + (entry.replication, entry.source_filter), + ); } } @@ -3888,7 +3975,11 @@ fn dump_mcast_forwarding_hdlr() -> Result { underlay: *underlay, next_hops: next_hops .iter() - .map(|(next_hop, rep)| (*next_hop, *rep)) + .map(|(next_hop, (rep, filter))| McastForwardingNextHop { + next_hop: *next_hop, + replication: *rep, + source_filter: filter.clone(), + }) .collect(), }) .collect(); @@ -3904,8 +3995,12 @@ fn dump_mcast_subscriptions_hdlr() let mut entries: alloc::vec::Vec = alloc::vec::Vec::new(); - for (underlay, ports) in devs.dump_mcast_subscriptions().into_iter() { - entries.push(McastSubscriptionEntry { underlay, ports }); + for (underlay, subs) in devs.dump_mcast_subscriptions().into_iter() { + let subscribers = subs + .into_iter() + .map(|(port, filter)| McastSubscriberEntry { port, filter }) + .collect(); + entries.push(McastSubscriptionEntry { underlay, subscribers }); } Ok(DumpMcastSubscriptionsResp { entries }) @@ -3929,6 +4024,25 @@ fn mcast_subscribe_hdlr(env: &mut IoctlEnvelope) -> Result { req.group ))); } + + // Validate source filter: sources must contain valid unicast addresses + for src in &req.filter.sources { + if src.is_multicast() { + return 
Err(OpteError::BadState(format!( + "source filter address {src} is multicast" + ))); + } + + if src.is_unspecified() + || src.is_loopback() + || src.is_broadcast() + || src.is_link_local() + { + return Err(OpteError::BadState(format!( + "source filter address {src} is a special-use address" + ))); + } + } let group_key = match req.group { oxide_vpc::api::IpAddr::Ip6(ip6) => { // If an overlay->underlay mapping exists, use it; otherwise, if the @@ -3960,7 +4074,7 @@ fn mcast_subscribe_hdlr(env: &mut IoctlEnvelope) -> Result { } }; - devs.mcast_subscribe(&req.port_name, group_key)?; + devs.mcast_subscribe(&req.port_name, group_key, req.filter.clone())?; // DTrace: subscribe let (af, group_ptr): (usize, uintptr_t) = (