diff --git a/pkg/network/tracer/testutil/tcp.go b/pkg/network/tracer/testutil/tcp.go index 417e407b1b2e..ca4241dc2e36 100644 --- a/pkg/network/tracer/testutil/tcp.go +++ b/pkg/network/tracer/testutil/tcp.go @@ -48,8 +48,8 @@ func (t *TCPServer) Addr() net.Addr { return t.ln.Addr() } -// Run starts the TCP server -func (t *TCPServer) Run() error { +// Listen sets up the socket with net.Listen +func (t *TCPServer) Listen() error { networkType := "tcp" if t.Network != "" { networkType = t.Network @@ -59,11 +59,16 @@ func (t *TCPServer) Run() error { return err } t.ln = ln + t.address = ln.Addr().String() + return nil +} +// StartAccepting starts up the server's Accept goroutine +func (t *TCPServer) StartAccepting() { go func() { for { - conn, err := ln.Accept() + conn, err := t.ln.Accept() if err != nil { return } @@ -74,7 +79,16 @@ func (t *TCPServer) Run() error { go t.onMessage(conn) } }() +} + +// Run starts the TCP server +func (t *TCPServer) Run() error { + err := t.Listen() + if err != nil { + return err + } + t.StartAccepting() return nil } @@ -101,7 +115,7 @@ func DialTCP(network, address string) (net.Conn, error) { func (t *TCPServer) Shutdown() { if t.ln != nil { _ = t.ln.Close() - t.ln = nil + // do not set t.ln = nil here, because otherwise t.ln.Accept() can panic later } } diff --git a/pkg/networkpath/traceroute/common/traceroute_parallel.go b/pkg/networkpath/traceroute/common/traceroute_parallel.go index 11a671dd8ee8..e377a8e4d0b1 100644 --- a/pkg/networkpath/traceroute/common/traceroute_parallel.go +++ b/pkg/networkpath/traceroute/common/traceroute_parallel.go @@ -7,10 +7,7 @@ package common import ( "context" - "errors" "fmt" - "net/netip" - "slices" "sync" "time" @@ -19,78 +16,9 @@ import ( "github.com/DataDog/datadog-agent/pkg/util/log" ) -// ReceiveProbeNoPktError is returned when ReceiveProbe() didn't find anything new. -// This is normal if the RTT is long -type ReceiveProbeNoPktError struct { - Err error -} - -func (e *ReceiveProbeNoPktError) Error() string { - return fmt.Sprintf("ReceiveProbe() didn't find any new packets: %s", e.Err) -} -func (e *ReceiveProbeNoPktError) Unwrap() error { - return e.Err -} - -// BadPacketError is a non-fatal error that occurs when a packet is malformed. -type BadPacketError struct { - Err error -} - -func (e *BadPacketError) Error() string { - return fmt.Sprintf("Failed to parse packet: %s", e.Err) -} -func (e *BadPacketError) Unwrap() error { - return e.Err -} - -// ProbeResponse is the response of a single probe in a traceroute -type ProbeResponse struct { - // TTL is the Time To Live of the probe that was originally sent - TTL uint8 - // IP is the IP address of the responding host - IP netip.Addr - // RTT is the round-trip time of the probe - RTT time.Duration - // IsDest is true if the responding host is the destination - IsDest bool -} - -// TracerouteDriverInfo is metadata about a TracerouteDriver -type TracerouteDriverInfo struct { - SupportsParallel bool -} - -// TracerouteDriver is an implementation of traceroute send+receive of packets -type TracerouteDriver interface { - // GetDriverInfo returns metadata about this driver - GetDriverInfo() TracerouteDriverInfo - // SendProbe sends a traceroute packet with a specific TTL - SendProbe(ttl uint8) error - // ReceiveProbe polls to get a traceroute response with a timeout. - ReceiveProbe(timeout time.Duration) (*ProbeResponse, error) -} - // TracerouteParallelParams are the parameters for TracerouteParallel type TracerouteParallelParams struct { - // MinTTL is the TTL to start the traceroute at - MinTTL uint8 - // MaxTTL is the TTL to end the traceroute at - MaxTTL uint8 - // TracerouteTimeout is the maximum time to wait for a response - TracerouteTimeout time.Duration - // PollFrequency is how often to poll for a response - PollFrequency time.Duration - // SendDelay is the delay between sending probes (typically small) - SendDelay time.Duration -} - -// ProbeCount returns the number of probes that will be sent -func (p TracerouteParallelParams) ProbeCount() int { - if p.MinTTL > p.MaxTTL { - return 0 - } - return int(p.MaxTTL) - int(p.MinTTL) + 1 + TracerouteParams } // MaxTimeout combines the timeout+probe delays into a total timeout for the traceroute @@ -101,12 +29,10 @@ func (p TracerouteParallelParams) MaxTimeout() time.Duration { // TracerouteParallel runs a traceroute in parallel func TracerouteParallel(ctx context.Context, t TracerouteDriver, p TracerouteParallelParams) ([]*ProbeResponse, error) { - if p.MinTTL > p.MaxTTL { - return nil, fmt.Errorf("min TTL must be less than or equal to max TTL") - } - if p.MinTTL < 1 { - return nil, fmt.Errorf("min TTL must be at least 1") + if err := p.validate(); err != nil { + return nil, err } + info := t.GetDriverInfo() if !info.SupportsParallel { return nil, fmt.Errorf("tried to call TracerouteParallel on a TracerouteDriver that doesn't support parallel") @@ -144,15 +70,13 @@ func TracerouteParallel(ctx context.Context, t TracerouteDriver, p TraceroutePar g.Go(func() error { for i := int(p.MinTTL); i <= int(p.MaxTTL); i++ { // leave if we got cancelled - select { - case <-writerCtx.Done(): + if writerCtx.Err() != nil { return nil - default: } err := t.SendProbe(uint8(i)) if err != nil { - return err + return fmt.Errorf("SendProbe() failed: %w", err) } time.Sleep(p.SendDelay) @@ -163,28 +87,19 @@ func TracerouteParallel(ctx context.Context, t TracerouteDriver, p TraceroutePar g.Go(func() error { for { // leave if we got cancelled, SendProbe() failed, etc - select { - // doesn't use writerCtx because even if we writerCancel(), we want to keep reading - case <-groupCtx.Done(): + // doesn't use writerCtx because when we find the destination, we writerCancel(), and we want to keep reading + if groupCtx.Err() != nil { return nil - default: } probe, err := t.ReceiveProbe(p.PollFrequency) - if CheckParallelRetryable("ReceiveProbe", err) { + if CheckProbeRetryable("ReceiveProbe", err) { continue } else if err != nil { + return fmt.Errorf("ReceiveProbe() failed: %w", err) + } else if err = p.validateProbe(probe); err != nil { return err } - if probe == nil { - return fmt.Errorf("ReceiveProbe() returned nil without an error (this indicates a bug in the TracerouteDriver)") - } - if probe.TTL == 0 { - return fmt.Errorf("ReceiveProbe() got TTL 0 which is only allowed for TracerouteSerial (this indicates a bug in the TracerouteDriver)") - } - if probe.TTL < p.MinTTL || probe.TTL > p.MaxTTL { - return fmt.Errorf("ReceiveProbe() received an invalid TTL: expected TTL in [%d, %d], got %d", p.MinTTL, p.MaxTTL, probe.TTL) - } writeProbe(probe) // no need to send more probes if we found the destination @@ -205,48 +120,5 @@ func TracerouteParallel(ctx context.Context, t TracerouteDriver, p TraceroutePar return nil, ctx.Err() } - destIdx := slices.IndexFunc(results, func(pr *ProbeResponse) bool { - return pr != nil && pr.IsDest - }) - // trim off anything after the destination - if destIdx != -1 { - results = slices.Clip(results[:destIdx+1]) - } - - return results[p.MinTTL:], nil -} - -// ToHops converts a list of ProbeResponses to a Results -// TODO remove this, and use a single type to represent results -func ToHops(p TracerouteParallelParams, probes []*ProbeResponse) ([]*Hop, error) { - if p.MinTTL != 1 { - return nil, fmt.Errorf("ToHops: processResults() requires MinTTL == 1") - } - hops := make([]*Hop, len(probes)) - for i, probe := range probes { - hops[i] = &Hop{} - if probe != nil { - hops[i].IP = probe.IP.AsSlice() - hops[i].RTT = probe.RTT - hops[i].IsDest = probe.IsDest - } - } - return hops, nil -} - -var badPktLimit = log.NewLogLimit(10, 5*time.Minute) - -// CheckParallelRetryable returns whether ReceiveProbe failed due to a real error or just an irrelevant packet -func CheckParallelRetryable(funcName string, err error) bool { - noPktErr := &ReceiveProbeNoPktError{} - badPktErr := &BadPacketError{} - if errors.As(err, &noPktErr) { - return true - } else if errors.As(err, &badPktErr) { - if badPktLimit.ShouldLog() { - log.Warnf("%s() saw a malformed packet: %s", funcName, err) - } - return true - } - return false + return clipResults(p.MinTTL, results), nil } diff --git a/pkg/networkpath/traceroute/common/traceroute_parallel_test.go b/pkg/networkpath/traceroute/common/traceroute_parallel_test.go index bbae03b02c61..29bc63334472 100644 --- a/pkg/networkpath/traceroute/common/traceroute_parallel_test.go +++ b/pkg/networkpath/traceroute/common/traceroute_parallel_test.go @@ -19,70 +19,14 @@ import ( "github.com/stretchr/testify/require" ) -type MockDriver struct { - t *testing.T - params TracerouteParallelParams - - sentTTLs map[uint8]struct{} - - info TracerouteDriverInfo - sendHandler func(ttl uint8) error - receiveHandler func() (*ProbeResponse, error) -} - -func initMockDriver(t *testing.T, params TracerouteParallelParams) *MockDriver { - return &MockDriver{ - t: t, - params: params, - info: TracerouteDriverInfo{ - SupportsParallel: true, - }, - sentTTLs: make(map[uint8]struct{}), - sendHandler: nil, - receiveHandler: nil, - } -} - -func (m *MockDriver) GetDriverInfo() TracerouteDriverInfo { - return m.info -} - -func (m *MockDriver) SendProbe(ttl uint8) error { - require.NotContains(m.t, m.sentTTLs, ttl, "same TTL sent twice") - m.sentTTLs[ttl] = struct{}{} - - m.t.Logf("wrote %d\n", ttl) - if m.sendHandler == nil { - return nil - } - return m.sendHandler(ttl) -} - -func (m *MockDriver) ReceiveProbe(timeout time.Duration) (*ProbeResponse, error) { - require.Equal(m.t, m.params.PollFrequency, timeout) - - if m.receiveHandler == nil { - return noData(timeout) - } - res, err := m.receiveHandler() - var errNoPkt *ReceiveProbeNoPktError - if !errors.As(err, &errNoPkt) { - m.t.Logf("read %+v, %v\n", res, err) - } - return res, err -} - -func noData(pollFrequency time.Duration) (*ProbeResponse, error) { - time.Sleep(pollFrequency) - return nil, &ReceiveProbeNoPktError{Err: fmt.Errorf("testing, no data")} -} - -var testParams = TracerouteParallelParams{ - MinTTL: 1, - MaxTTL: 10, - TracerouteTimeout: 500 * time.Millisecond, - PollFrequency: 1 * time.Millisecond, - SendDelay: 1 * time.Millisecond, +var parallelParams = TracerouteParallelParams{ + TracerouteParams: TracerouteParams{ + MinTTL: 1, + MaxTTL: 10, + TracerouteTimeout: 500 * time.Millisecond, + PollFrequency: 1 * time.Millisecond, + SendDelay: 1 * time.Millisecond, + }, } const mockDestTTL = 5 @@ -102,11 +46,11 @@ func mockResult(ttl uint8) *ProbeResponse { func TestParallelTraceroute(t *testing.T) { // basic test that checks if the traceroute runs correctly - m := initMockDriver(t, testParams) + m := initMockDriver(t, parallelParams.TracerouteParams, parallelInfo) t.Parallel() var expectedResults []*ProbeResponse - receiveProbes := make(chan *ProbeResponse, testParams.MaxTTL) + receiveProbes := make(chan *ProbeResponse, parallelParams.MaxTTL) expectedTTL := uint8(1) m.sendHandler = func(ttl uint8) error { @@ -128,12 +72,12 @@ func TestParallelTraceroute(t *testing.T) { m.receiveHandler = func() (*ProbeResponse, error) { probe, ok := <-receiveProbes if !ok { - return noData(testParams.PollFrequency) + return noData(parallelParams.PollFrequency) } return probe, nil } - results, err := TracerouteParallel(context.Background(), m, testParams) + results, err := TracerouteParallel(context.Background(), m, parallelParams) require.NoError(t, err) require.Equal(t, expectedResults, results) require.Len(t, results, mockDestTTL) @@ -144,11 +88,11 @@ func testParallelTracerouteShuffled(t *testing.T, seed int64) { // and expects them to come back in the correct order r := rand.New(rand.NewSource(seed)) - m := initMockDriver(t, testParams) + m := initMockDriver(t, parallelParams.TracerouteParams, parallelInfo) t.Parallel() var expectedResults []*ProbeResponse - receiveProbes := make(chan *ProbeResponse, testParams.MaxTTL) + receiveProbes := make(chan *ProbeResponse, parallelParams.MaxTTL) m.sendHandler = func(ttl uint8) error { result := mockResult(ttl) @@ -168,12 +112,12 @@ func testParallelTracerouteShuffled(t *testing.T, seed int64) { m.receiveHandler = func() (*ProbeResponse, error) { probe, ok := <-receiveProbes if !ok { - return noData(testParams.PollFrequency) + return noData(parallelParams.PollFrequency) } return probe, nil } - results, err := TracerouteParallel(context.Background(), m, testParams) + results, err := TracerouteParallel(context.Background(), m, parallelParams) require.NoError(t, err) require.Equal(t, expectedResults, results) require.Len(t, results, mockDestTTL) @@ -190,7 +134,7 @@ var errMock = errors.New("mock error") func TestParallelTracerouteSendErr(t *testing.T) { // this test checks that TracerouteParallel returns an error if SendProbe() fails - m := initMockDriver(t, testParams) + m := initMockDriver(t, parallelParams.TracerouteParams, parallelInfo) t.Parallel() hasCalled := false @@ -201,14 +145,14 @@ func TestParallelTracerouteSendErr(t *testing.T) { return errMock } - results, err := TracerouteParallel(context.Background(), m, testParams) + results, err := TracerouteParallel(context.Background(), m, parallelParams) require.Nil(t, results) require.ErrorIs(t, err, errMock) } func TestParallelTracerouteReceiveErr(t *testing.T) { // this test checks that TracerouteParallel returns an error if ReceiveProbe() fails - m := initMockDriver(t, testParams) + m := initMockDriver(t, parallelParams.TracerouteParams, parallelInfo) t.Parallel() hasCalled := false @@ -219,7 +163,7 @@ func TestParallelTracerouteReceiveErr(t *testing.T) { return nil, errMock } - results, err := TracerouteParallel(context.Background(), m, testParams) + results, err := TracerouteParallel(context.Background(), m, parallelParams) require.Nil(t, results) require.ErrorIs(t, err, errMock) } @@ -227,21 +171,21 @@ func TestParallelTracerouteReceiveErr(t *testing.T) { func TestParallelTracerouteTimeout(t *testing.T) { // this test checks that TracerouteParallel times out when it is waiting for // a response during the traceroute - m := initMockDriver(t, testParams) + m := initMockDriver(t, parallelParams.TracerouteParams, parallelInfo) t.Parallel() totalCalls := 0 m.receiveHandler = func() (*ProbeResponse, error) { totalCalls++ - return noData(testParams.PollFrequency) + return noData(parallelParams.PollFrequency) } start := time.Now() - results, err := TracerouteParallel(context.Background(), m, testParams) + results, err := TracerouteParallel(context.Background(), m, parallelParams) require.NoError(t, err) // divide by 2 to give margin for error - require.Greater(t, time.Since(start), testParams.TracerouteTimeout/2) + require.Greater(t, time.Since(start), parallelParams.TracerouteTimeout/2) // make sure it kept polling repeatedly require.Greater(t, totalCalls, 5) for _, res := range results { @@ -252,14 +196,14 @@ func TestParallelTracerouteTimeout(t *testing.T) { func TestParallelTracerouteMinTTL(t *testing.T) { // same as TestParallelTraceroute but it checks that we don't send TTL=1 when MinTTL=2 - // make a copy of testParams - testParams := testParams - testParams.MinTTL = 2 - m := initMockDriver(t, testParams) + // make a copy of parallelParams + parallelParams := parallelParams + parallelParams.MinTTL = 2 + m := initMockDriver(t, parallelParams.TracerouteParams, parallelInfo) t.Parallel() var expectedResults []*ProbeResponse - receiveProbes := make(chan *ProbeResponse, testParams.MaxTTL) + receiveProbes := make(chan *ProbeResponse, parallelParams.MaxTTL) // expectedTTL starts at 2 in this test expectedTTL := uint8(2) @@ -282,12 +226,12 @@ func TestParallelTracerouteMinTTL(t *testing.T) { m.receiveHandler = func() (*ProbeResponse, error) { probe, ok := <-receiveProbes if !ok { - return noData(testParams.PollFrequency) + return noData(parallelParams.PollFrequency) } return probe, nil } - results, err := TracerouteParallel(context.Background(), m, testParams) + results, err := TracerouteParallel(context.Background(), m, parallelParams) require.NoError(t, err) require.Equal(t, expectedResults, results) require.Len(t, results, mockDestTTL-1) @@ -295,14 +239,14 @@ func TestParallelTracerouteMinTTL(t *testing.T) { func TestParallelTracerouteReportsExternalCancellation(t *testing.T) { // this test checks that TracerouteParallel forwards a cancellation from the context - m := initMockDriver(t, testParams) + m := initMockDriver(t, parallelParams.TracerouteParams, parallelInfo) t.Parallel() ctx, cancel := context.WithCancelCause(context.Background()) // cancel it right away cancel(errMock) - results, err := TracerouteParallel(ctx, m, testParams) + results, err := TracerouteParallel(ctx, m, parallelParams) require.Nil(t, results) require.ErrorIs(t, err, context.Canceled) require.ErrorIs(t, context.Cause(ctx), errMock) @@ -310,11 +254,11 @@ func TestParallelTracerouteReportsExternalCancellation(t *testing.T) { func TestParallelTracerouteMissingHop(t *testing.T) { // this test simulates a missing hop at TTL=3 - m := initMockDriver(t, testParams) + m := initMockDriver(t, parallelParams.TracerouteParams, parallelInfo) t.Parallel() var expectedResults []*ProbeResponse - receiveProbes := make(chan *ProbeResponse, testParams.MaxTTL) + receiveProbes := make(chan *ProbeResponse, parallelParams.MaxTTL) m.sendHandler = func(ttl uint8) error { result := mockResult(ttl) @@ -339,12 +283,12 @@ func TestParallelTracerouteMissingHop(t *testing.T) { m.receiveHandler = func() (*ProbeResponse, error) { probe, ok := <-receiveProbes if !ok { - return noData(testParams.PollFrequency) + return noData(parallelParams.PollFrequency) } return probe, nil } - results, err := TracerouteParallel(context.Background(), m, testParams) + results, err := TracerouteParallel(context.Background(), m, parallelParams) require.NoError(t, err) require.Equal(t, expectedResults, results) require.Len(t, results, mockDestTTL) @@ -352,11 +296,11 @@ func TestParallelTracerouteMissingHop(t *testing.T) { func TestParallelTracerouteMissingDest(t *testing.T) { // this test simulates not getting the destination back - it should keep sending probes until MaxTTL - m := initMockDriver(t, testParams) + m := initMockDriver(t, parallelParams.TracerouteParams, parallelInfo) t.Parallel() var expectedResults []*ProbeResponse - receiveProbes := make(chan *ProbeResponse, testParams.MaxTTL) + receiveProbes := make(chan *ProbeResponse, parallelParams.MaxTTL) m.sendHandler = func(ttl uint8) error { result := mockResult(ttl) @@ -373,7 +317,7 @@ func TestParallelTracerouteMissingDest(t *testing.T) { } } - if ttl == testParams.MaxTTL { + if ttl == parallelParams.MaxTTL { close(receiveProbes) } @@ -382,14 +326,14 @@ func TestParallelTracerouteMissingDest(t *testing.T) { m.receiveHandler = func() (*ProbeResponse, error) { probe, ok := <-receiveProbes if !ok { - return noData(testParams.PollFrequency) + return noData(parallelParams.PollFrequency) } return probe, nil } - results, err := TracerouteParallel(context.Background(), m, testParams) + results, err := TracerouteParallel(context.Background(), m, parallelParams) require.NoError(t, err) - require.Len(t, results, int(testParams.MaxTTL)) + require.Len(t, results, int(parallelParams.MaxTTL)) for i, r := range results { // up to but excluding the destination TTL, we should have results if i < len(expectedResults) { @@ -404,7 +348,7 @@ func TestParallelTracerouteMissingDest(t *testing.T) { func TestParallelTracerouteProbeSanityCheck(t *testing.T) { // this probe checks that TracerouteParallel yells at you when it reads // a an invalid TTL - m := initMockDriver(t, testParams) + m := initMockDriver(t, parallelParams.TracerouteParams, parallelInfo) t.Parallel() hasReceived := false @@ -417,14 +361,14 @@ func TestParallelTracerouteProbeSanityCheck(t *testing.T) { return result, nil } - results, err := TracerouteParallel(context.Background(), m, testParams) + results, err := TracerouteParallel(context.Background(), m, parallelParams) require.Nil(t, results) require.ErrorContains(t, err, "received an invalid TTL") } func TestParallelTracerouteProbeReturnValueCheck(t *testing.T) { // this probe checks that TracerouteParallel yells at you when you return nothing at all - m := initMockDriver(t, testParams) + m := initMockDriver(t, parallelParams.TracerouteParams, parallelInfo) t.Parallel() hasReceived := false @@ -434,7 +378,7 @@ func TestParallelTracerouteProbeReturnValueCheck(t *testing.T) { return nil, nil } - results, err := TracerouteParallel(context.Background(), m, testParams) + results, err := TracerouteParallel(context.Background(), m, parallelParams) require.Nil(t, results) require.ErrorContains(t, err, "ReceiveProbe() returned nil without an error") } @@ -442,11 +386,11 @@ func TestParallelTracerouteProbeReturnValueCheck(t *testing.T) { func TestParallelTracerouteDoubleReceive(t *testing.T) { // same as TestParallelTraceroute but receives the probes a second time, with a larger RTT. // it should not overwrite the RTT - m := initMockDriver(t, testParams) + m := initMockDriver(t, parallelParams.TracerouteParams, parallelInfo) t.Parallel() var expectedResults []*ProbeResponse - receiveProbes := make(chan *ProbeResponse, 2*testParams.MaxTTL) + receiveProbes := make(chan *ProbeResponse, 2*parallelParams.MaxTTL) expectedTTL := uint8(1) m.sendHandler = func(ttl uint8) error { @@ -475,33 +419,33 @@ func TestParallelTracerouteDoubleReceive(t *testing.T) { m.receiveHandler = func() (*ProbeResponse, error) { probe, ok := <-receiveProbes if !ok { - return noData(testParams.PollFrequency) + return noData(parallelParams.PollFrequency) } return probe, nil } - results, err := TracerouteParallel(context.Background(), m, testParams) + results, err := TracerouteParallel(context.Background(), m, parallelParams) require.NoError(t, err) require.Equal(t, expectedResults, results) require.Len(t, results, mockDestTTL) } -func TestCheckParallelRetryable(t *testing.T) { - require.True(t, CheckParallelRetryable("test", &ReceiveProbeNoPktError{fmt.Errorf("foo")})) - require.True(t, CheckParallelRetryable("test", &BadPacketError{fmt.Errorf("foo")})) +func TestCheckProbeRetryable(t *testing.T) { + require.True(t, CheckProbeRetryable("test", &ReceiveProbeNoPktError{fmt.Errorf("foo")})) + require.True(t, CheckProbeRetryable("test", &BadPacketError{fmt.Errorf("foo")})) - require.False(t, CheckParallelRetryable("test", fmt.Errorf("foo"))) - require.False(t, CheckParallelRetryable("test", nil)) + require.False(t, CheckProbeRetryable("test", fmt.Errorf("foo"))) + require.False(t, CheckProbeRetryable("test", nil)) } func TestParallelTracerouteDestOverwrite(t *testing.T) { // this test checks that shouldUpdate is set to true when an IsDest == true probe comes // for the first time, even overwriting an ICMP probe with IsDest == false - m := initMockDriver(t, testParams) + m := initMockDriver(t, parallelParams.TracerouteParams, parallelInfo) t.Parallel() var expectedResults []*ProbeResponse - receiveProbes := make(chan *ProbeResponse, 2*testParams.MaxTTL) + receiveProbes := make(chan *ProbeResponse, 2*parallelParams.MaxTTL) expectedTTL := uint8(1) m.sendHandler = func(ttl uint8) error { @@ -531,13 +475,23 @@ func TestParallelTracerouteDestOverwrite(t *testing.T) { m.receiveHandler = func() (*ProbeResponse, error) { probe, ok := <-receiveProbes if !ok { - return noData(testParams.PollFrequency) + return noData(parallelParams.PollFrequency) } return probe, nil } - results, err := TracerouteParallel(context.Background(), m, testParams) + results, err := TracerouteParallel(context.Background(), m, parallelParams) require.NoError(t, err) require.Equal(t, expectedResults, results) require.Len(t, results, mockDestTTL) } + +func TestParallelSupport(t *testing.T) { + m := initMockDriver(t, parallelParams.TracerouteParams, TracerouteDriverInfo{ + SupportsParallel: false, + }) + t.Parallel() + + _, err := TracerouteParallel(context.Background(), m, parallelParams) + require.ErrorContains(t, err, "doesn't support parallel") +} diff --git a/pkg/networkpath/traceroute/common/traceroute_serial.go b/pkg/networkpath/traceroute/common/traceroute_serial.go new file mode 100644 index 000000000000..ed9177e06cdb --- /dev/null +++ b/pkg/networkpath/traceroute/common/traceroute_serial.go @@ -0,0 +1,87 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2025-present Datadog, Inc. + +package common + +import ( + "context" + "fmt" + "time" + + "github.com/DataDog/datadog-agent/pkg/util/log" +) + +// TracerouteSerialParams are the parameters for TracerouteSerial +type TracerouteSerialParams struct { + TracerouteParams +} + +// MaxTimeout combines the timeout+probe delays into a total timeout for the traceroute +func (p TracerouteSerialParams) MaxTimeout() time.Duration { + delaySum := (p.TracerouteTimeout) * time.Duration(p.ProbeCount()) + // add 500ms so that if all hops are missing, it doesn't race with the traceroute itself + return delaySum + 500*time.Millisecond +} + +// TracerouteSerial runs a traceroute in serial. Sometimes this is necessary over TracerouteParallel +// because the driver doesn't support parallel. +func TracerouteSerial(ctx context.Context, t TracerouteDriver, p TracerouteSerialParams) ([]*ProbeResponse, error) { + if err := p.validate(); err != nil { + return nil, err + } + globalTimeoutCtx, cancel := context.WithTimeout(ctx, p.MaxTimeout()) + defer cancel() + + results := make([]*ProbeResponse, int(p.MaxTTL)+1) + for i := int(p.MinTTL); i <= int(p.MaxTTL); i++ { + if ctx.Err() != nil { + break + } + sendDelay := time.After(p.SendDelay) + + timeoutCtx, cancel := context.WithTimeout(globalTimeoutCtx, p.TracerouteTimeout) + defer cancel() + + err := t.SendProbe(uint8(i)) + if err != nil { + return nil, fmt.Errorf("SendProbe() failed: %w", err) + } + + var probe *ProbeResponse + for probe == nil { + if timeoutCtx.Err() != nil { + break + } + + probe, err = t.ReceiveProbe(p.PollFrequency) + if CheckProbeRetryable("ReceiveProbe", err) { + continue + } else if err != nil { + return nil, fmt.Errorf("ReceiveProbe() failed: %w", err) + } else if err := p.validateProbe(probe); err != nil { + return nil, err + } + } + + // if we found the destination, no need to keep going + if probe != nil { + log.Tracef("found probe %+v", probe) + results[probe.TTL] = probe + if probe.IsDest { + break + } + } + + // wait for at least SendDelay to pass + <-sendDelay + } + + // if we got externally cancelled, report that + if ctx.Err() != nil { + return nil, ctx.Err() + } + + return clipResults(p.MinTTL, results), nil +} diff --git a/pkg/networkpath/traceroute/common/traceroute_serial_test.go b/pkg/networkpath/traceroute/common/traceroute_serial_test.go new file mode 100644 index 000000000000..f414351eefd6 --- /dev/null +++ b/pkg/networkpath/traceroute/common/traceroute_serial_test.go @@ -0,0 +1,255 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2025-present Datadog, Inc. + +package common + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +var serialParams = TracerouteSerialParams{ + TracerouteParams{ + MinTTL: 1, + MaxTTL: 10, + TracerouteTimeout: 100 * time.Millisecond, + PollFrequency: 1 * time.Millisecond, + SendDelay: 1 * time.Millisecond, + }, +} +var serialInfo = TracerouteDriverInfo{ + SupportsParallel: false, +} + +func TestTracerouteSerial(t *testing.T) { + m := initMockDriver(t, serialParams.TracerouteParams, serialInfo) + t.Parallel() + + var expectedResults []*ProbeResponse + receiveProbes := make(chan *ProbeResponse, 1) + + sendTTL := uint8(0) + m.sendHandler = func(ttl uint8) error { + sendTTL++ + require.Equal(t, sendTTL, ttl) + + result := mockResult(sendTTL) + if result != nil { + expectedResults = append(expectedResults, result) + select { + case receiveProbes <- result: + default: + // we expect this channel to never fill + t.Fatalf("TTL %d never got read by receiveHandler", sendTTL-1) + } + } + + return nil + } + m.receiveHandler = func() (*ProbeResponse, error) { + var probe *ProbeResponse + select { + case probe = <-receiveProbes: + default: + } + if probe == nil { + return noData(serialParams.PollFrequency) + } + return probe, nil + } + results, err := TracerouteSerial(context.Background(), m, serialParams) + require.NoError(t, err) + require.Equal(t, expectedResults, results) + require.Len(t, results, mockDestTTL) +} + +func TestTracerouteSerialMissingHop(t *testing.T) { + m := initMockDriver(t, serialParams.TracerouteParams, serialInfo) + t.Parallel() + + var expectedResults []*ProbeResponse + receiveProbes := make(chan *ProbeResponse, 1) + + sendTTL := uint8(0) + m.sendHandler = func(ttl uint8) error { + sendTTL++ + require.Equal(t, sendTTL, ttl) + + result := mockResult(sendTTL) + if result != nil { + // fake a missing hop + if ttl == 2 { + result = nil + } + expectedResults = append(expectedResults, result) + select { + case receiveProbes <- result: + default: + // we expect this channel to never fill + t.Fatalf("TTL %d never got read by receiveHandler", sendTTL-1) + } + } + + return nil + } + m.receiveHandler = func() (*ProbeResponse, error) { + var probe *ProbeResponse + select { + case probe = <-receiveProbes: + default: + } + if probe == nil { + return noData(serialParams.PollFrequency) + } + return probe, nil + } + results, err := TracerouteSerial(context.Background(), m, serialParams) + require.NoError(t, err) + require.Equal(t, expectedResults, results) + require.Len(t, results, mockDestTTL) +} + +func TestTracerouteSerialWrongHop(t *testing.T) { + // this test checks that TracerouteSerial correctly handles a probe that returns the wrong hop + m := initMockDriver(t, serialParams.TracerouteParams, serialInfo) + t.Parallel() + + var expectedResults []*ProbeResponse + receiveProbes := make(chan *ProbeResponse, 1) + + sendTTL := uint8(0) + m.sendHandler = func(ttl uint8) error { + sendTTL++ + require.Equal(t, sendTTL, ttl) + + result := mockResult(sendTTL) + if result != nil { + resultToRead := result + // send the old hop twice + if ttl == 2 { + result = nil + resultToRead = mockResult(sendTTL - 1) + } + expectedResults = append(expectedResults, result) + select { + case receiveProbes <- resultToRead: + default: + // we expect this channel to never fill + t.Fatalf("TTL %d never got read by receiveHandler", sendTTL-1) + } + } + + return nil + } + m.receiveHandler = func() (*ProbeResponse, error) { + var probe *ProbeResponse + select { + case probe = <-receiveProbes: + default: + } + if probe == nil { + return noData(serialParams.PollFrequency) + } + return probe, nil + } + results, err := TracerouteSerial(context.Background(), m, serialParams) + require.NoError(t, err) + require.Equal(t, expectedResults, results) + require.Len(t, results, mockDestTTL) +} + +func TestTracerouteSerialMissingDest(t *testing.T) { + m := initMockDriver(t, serialParams.TracerouteParams, serialInfo) + t.Parallel() + + var expectedResults []*ProbeResponse + receiveProbes := make(chan *ProbeResponse, 1) + + sendTTL := uint8(0) + m.sendHandler = func(ttl uint8) error { + sendTTL++ + require.Equal(t, sendTTL, ttl) + + result := mockResult(sendTTL) + if result != nil { + // fake a missing destination + if result.IsDest { + result = nil + } + expectedResults = append(expectedResults, result) + select { + case receiveProbes <- result: + default: + // we expect this channel to never fill + t.Fatalf("TTL %d never got read by receiveHandler", sendTTL-1) + } + } + + return nil + } + m.receiveHandler = func() (*ProbeResponse, error) { + var probe *ProbeResponse + select { + case probe = <-receiveProbes: + default: + } + if probe == nil { + return noData(serialParams.PollFrequency) + } + return probe, nil + } + results, err := TracerouteSerial(context.Background(), m, serialParams) + require.NoError(t, err) + + require.Len(t, results, int(parallelParams.MaxTTL)) + for i, r := range results { + // up to but excluding the destination TTL, we should have results + if i < len(expectedResults) { + require.Equal(t, expectedResults[i], r, "mismatch at index %d", i) + } else { + // after that, it should all be zero up to MaxTTL + require.Zero(t, r, "expected zero at index %d", i) + } + } +} + +func TestTracerouteSerialSendErr(t *testing.T) { + // this test checks that TracerouteSerial returns an error if SendProbe() fails + m := initMockDriver(t, serialParams.TracerouteParams, serialInfo) + t.Parallel() + + hasCalled := false + m.sendHandler = func(_ uint8) error { + require.False(t, hasCalled, "SendProbe() called more than once") + hasCalled = true + + return errMock + } + + results, err := TracerouteSerial(context.Background(), m, serialParams) + require.Nil(t, results) + require.ErrorIs(t, err, errMock) +} + +func TestTracerouteSerialReceiveErr(t *testing.T) { + // this test checks that TracerouteSerial returns an error if ReceiveProbe() fails + m := initMockDriver(t, serialParams.TracerouteParams, serialInfo) + t.Parallel() + + hasCalled := false + m.receiveHandler = func() (*ProbeResponse, error) { + require.False(t, hasCalled, "ReceiveProbe() called more than once") + hasCalled = true + + return nil, errMock + } + + results, err := TracerouteSerial(context.Background(), m, serialParams) + require.Nil(t, results) + require.ErrorIs(t, err, errMock) +} diff --git a/pkg/networkpath/traceroute/common/traceroute_types.go b/pkg/networkpath/traceroute/common/traceroute_types.go new file mode 100644 index 000000000000..ea08ee3cdeca --- /dev/null +++ b/pkg/networkpath/traceroute/common/traceroute_types.go @@ -0,0 +1,158 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2025-present Datadog, Inc. + +package common + +import ( + "errors" + "fmt" + "net/netip" + "slices" + "time" + + "github.com/DataDog/datadog-agent/pkg/util/log" +) + +// ReceiveProbeNoPktError is returned when ReceiveProbe() didn't find anything new. +// This is normal if the RTT is long +type ReceiveProbeNoPktError struct { + Err error +} + +func (e *ReceiveProbeNoPktError) Error() string { + return fmt.Sprintf("ReceiveProbe() didn't find any new packets: %s", e.Err) +} +func (e *ReceiveProbeNoPktError) Unwrap() error { + return e.Err +} + +// BadPacketError is a non-fatal error that occurs when a packet is malformed. +type BadPacketError struct { + Err error +} + +func (e *BadPacketError) Error() string { + return fmt.Sprintf("Failed to parse packet: %s", e.Err) +} +func (e *BadPacketError) Unwrap() error { + return e.Err +} + +// ProbeResponse is the response of a single probe in a traceroute +type ProbeResponse struct { + // TTL is the Time To Live of the probe that was originally sent + TTL uint8 + // IP is the IP address of the responding host + IP netip.Addr + // RTT is the round-trip time of the probe + RTT time.Duration + // IsDest is true if the responding host is the destination + IsDest bool +} + +// TracerouteDriverInfo is metadata about a TracerouteDriver +type TracerouteDriverInfo struct { + SupportsParallel bool +} + +// TracerouteDriver is an implementation of traceroute send+receive of packets +type TracerouteDriver interface { + // GetDriverInfo returns metadata about this driver + GetDriverInfo() TracerouteDriverInfo + // SendProbe sends a traceroute packet with a specific TTL + SendProbe(ttl uint8) error + // ReceiveProbe polls to get a traceroute response with a timeout. + ReceiveProbe(timeout time.Duration) (*ProbeResponse, error) +} + +// TracerouteParams are the parameters for a traceroute shared between serial and parallel +type TracerouteParams struct { + // MinTTL is the TTL to start the traceroute at + MinTTL uint8 + // MaxTTL is the TTL to end the traceroute at + MaxTTL uint8 + // TracerouteTimeout is the maximum time to wait for a response + TracerouteTimeout time.Duration + // PollFrequency is how often to poll for a response + PollFrequency time.Duration + // SendDelay is the delay between sending probes (typically small) + SendDelay time.Duration +} + +func (p TracerouteParams) validate() error { + if p.MinTTL > p.MaxTTL { + return fmt.Errorf("min TTL must be less than or equal to max TTL") + } + if p.MinTTL < 1 { + return fmt.Errorf("min TTL must be at least 1") + } + return nil +} + +func (p TracerouteParams) validateProbe(probe *ProbeResponse) error { + if probe == nil { + return fmt.Errorf("ReceiveProbe() returned nil without an error (this indicates a bug in the TracerouteDriver)") + } + if probe.TTL < p.MinTTL || probe.TTL > p.MaxTTL { + return fmt.Errorf("ReceiveProbe() received an invalid TTL: expected TTL in [%d, %d], got %d", p.MinTTL, p.MaxTTL, probe.TTL) + } + return nil +} + +// ProbeCount returns the number of probes that will be sent +func (p TracerouteParams) ProbeCount() int { + if p.MinTTL > p.MaxTTL { + return 0 + } + return int(p.MaxTTL) - int(p.MinTTL) + 1 +} + +// clipResults removes probes before the minTTL and after the destination +func clipResults(minTTL uint8, results []*ProbeResponse) []*ProbeResponse { + destIdx := slices.IndexFunc(results, func(pr *ProbeResponse) bool { + return pr != nil && pr.IsDest + }) + // trim off anything after the destination + if destIdx != -1 { + results = slices.Clip(results[:destIdx+1]) + } + + return results[minTTL:] +} + +// ToHops converts a list of ProbeResponses to a Results +// TODO remove this, and use a single type to represent results +func ToHops(p TracerouteParams, probes []*ProbeResponse) ([]*Hop, error) { + if p.MinTTL != 1 { + return nil, fmt.Errorf("ToHops: processResults() requires MinTTL == 1") + } + hops := make([]*Hop, len(probes)) + for i, probe := range probes { + hops[i] = &Hop{} + if probe != nil { + hops[i].IP = probe.IP.AsSlice() + hops[i].RTT = probe.RTT + hops[i].IsDest = probe.IsDest + } + } + return hops, nil +} + +var badPktLimit = log.NewLogLimit(10, 5*time.Minute) + +// CheckProbeRetryable returns whether ReceiveProbe failed due to a real error or just an irrelevant packet +func CheckProbeRetryable(funcName string, err error) bool { + noPktErr := &ReceiveProbeNoPktError{} + badPktErr := &BadPacketError{} + if errors.As(err, &noPktErr) { + return true + } else if errors.As(err, &badPktErr) { + if badPktLimit.ShouldLog() { + log.Warnf("%s() saw a malformed packet: %s", funcName, err) + } + return true + } + return false +} diff --git a/pkg/networkpath/traceroute/common/traceroute_types_test.go b/pkg/networkpath/traceroute/common/traceroute_types_test.go new file mode 100644 index 000000000000..3a8350ed4162 --- /dev/null +++ b/pkg/networkpath/traceroute/common/traceroute_types_test.go @@ -0,0 +1,114 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2025-present Datadog, Inc. + +package common + +import ( + "errors" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +type MockDriver struct { + t *testing.T + params TracerouteParams + + sentTTLs map[uint8]struct{} + + info TracerouteDriverInfo + sendHandler func(ttl uint8) error + receiveHandler func() (*ProbeResponse, error) +} + +var parallelInfo = TracerouteDriverInfo{ + SupportsParallel: true, +} + +func initMockDriver(t *testing.T, params TracerouteParams, info TracerouteDriverInfo) *MockDriver { + return &MockDriver{ + t: t, + params: params, + info: info, + sentTTLs: make(map[uint8]struct{}), + sendHandler: nil, + receiveHandler: nil, + } +} + +func (m *MockDriver) GetDriverInfo() TracerouteDriverInfo { + return m.info +} + +func (m *MockDriver) SendProbe(ttl uint8) error { + require.NotContains(m.t, m.sentTTLs, ttl, "same TTL sent twice") + m.sentTTLs[ttl] = struct{}{} + + m.t.Logf("wrote %d\n", ttl) + if m.sendHandler == nil { + return nil + } + return m.sendHandler(ttl) +} + +func (m *MockDriver) ReceiveProbe(timeout time.Duration) (*ProbeResponse, error) { + require.Equal(m.t, m.params.PollFrequency, timeout) + + if m.receiveHandler == nil { + return noData(timeout) + } + res, err := m.receiveHandler() + var errNoPkt *ReceiveProbeNoPktError + if !errors.As(err, &errNoPkt) { + m.t.Logf("read %+v, %v\n", res, err) + } + return res, err +} + +func noData(pollFrequency time.Duration) (*ProbeResponse, error) { + time.Sleep(pollFrequency) + return nil, &ReceiveProbeNoPktError{Err: fmt.Errorf("testing, no data")} +} + +func TestClipResultsDest(t *testing.T) { + results := []*ProbeResponse{ + nil, + {TTL: 1, IsDest: false}, + {TTL: 2, IsDest: false}, + {TTL: 3, IsDest: true}, + nil, + } + + clipped := clipResults(1, results) + require.Equal(t, results[1:4], clipped) +} + +func TestClipResultsNoDest(t *testing.T) { + results := []*ProbeResponse{ + nil, + {TTL: 1, IsDest: false}, + {TTL: 2, IsDest: false}, + {TTL: 3, IsDest: false}, + nil, + } + + clipped := clipResults(1, results) + require.Equal(t, results[1:], clipped) +} + +func TestClipResultsMinTTL(t *testing.T) { + results := []*ProbeResponse{ + nil, + nil, + {TTL: 2, IsDest: false}, + {TTL: 3, IsDest: false}, + nil, + } + + clipped := clipResults(2, results) + require.Equal(t, results[2:], clipped) +} diff --git a/pkg/networkpath/traceroute/filter/tcp.go b/pkg/networkpath/traceroute/filter/tcp.go deleted file mode 100644 index 90695c8154fe..000000000000 --- a/pkg/networkpath/traceroute/filter/tcp.go +++ /dev/null @@ -1,74 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2025-present Datadog, Inc. - -// Package filter has BPF filters for sockets useful for traceroutes -package filter - -import ( - "encoding/binary" - "fmt" - "net/netip" - - "golang.org/x/net/bpf" -) - -// TCP4FilterConfig is the config for GenerateTCP4Filter -type TCP4FilterConfig struct { - Src netip.AddrPort - Dst netip.AddrPort -} - -const ethHeaderSize = 14 - -// GenerateTCP4Filter creates a classic BPF filter for TCP SOCK_RAW sockets. -// It will only allow packets whose tuple matches the given config. -func (c TCP4FilterConfig) GenerateTCP4Filter() ([]bpf.RawInstruction, error) { - if !c.Src.Addr().Is4() || !c.Dst.Addr().Is4() { - return nil, fmt.Errorf("GenerateTCP4Filter2: src=%s and dst=%s must be IPv4", c.Src.Addr(), c.Dst.Addr()) - } - srcAddr := binary.BigEndian.Uint32(c.Src.Addr().AsSlice()) - dstAddr := binary.BigEndian.Uint32(c.Dst.Addr().AsSlice()) - srcPort := uint32(c.Src.Port()) - dstPort := uint32(c.Dst.Port()) - - // Process to derive the following program: - // 1. Generate the BPF program with placeholder values: - // tcpdump -i eth0 -d 'ip and tcp and src 2.4.6.8 and dst 1.3.5.7 and src port 1234 and dst port 5678' - // 2. Remove the first two instructions that check the ethernet header, since tcpdump uses AF_PACKET and we do not - // 3. Subtract the ethernet header size from all LoadAbsolutes - // 4. Replace the placeholder values with src/dst AddrPorts - return bpf.Assemble([]bpf.Instruction{ - // (002) ldb [23] -- load Protocol - bpf.LoadAbsolute{Size: 1, Off: 23 - ethHeaderSize}, - // (003) jeq #0x6 jt 4 jf 16 -- if TCP, goto 4, else 16 - bpf.JumpIf{Cond: bpf.JumpEqual, Val: 0x6, SkipTrue: 0, SkipFalse: 12}, - // (004) ld [26] -- load source IP - bpf.LoadAbsolute{Size: 4, Off: 26 - ethHeaderSize}, - // (005) jeq #0x2040608 jt 6 jf 16 -- if srcAddr matches, goto 6, else 16 - bpf.JumpIf{Cond: bpf.JumpEqual, Val: srcAddr, SkipTrue: 0, SkipFalse: 10}, - // (006) ld [30] -- load destination IP - bpf.LoadAbsolute{Size: 4, Off: 30 - ethHeaderSize}, - // (007) jeq #0x1030507 jt 8 jf 16 -- if dstAddr matches, goto 8, else 16 - bpf.JumpIf{Cond: bpf.JumpEqual, Val: dstAddr, SkipTrue: 0, SkipFalse: 8}, - // (008) ldh [20] -- load Fragment Offset - bpf.LoadAbsolute{Size: 2, Off: 20 - ethHeaderSize}, - // (009) jset #0x1fff jt 16 jf 10 -- if fragmented, goto 16, else 10 - bpf.JumpIf{Cond: bpf.JumpBitsSet, Val: 0x1fff, SkipTrue: 6, SkipFalse: 0}, - // (010) ldxb 4*([14]&0xf) -- x = IP header length - bpf.LoadMemShift{Off: 14 - ethHeaderSize}, - // (011) ldh [x + 14] -- load source port - bpf.LoadIndirect{Size: 2, Off: 14 - ethHeaderSize}, - // (012) jeq #0x4d2 jt 13 jf 16 -- if srcPort matches, goto 13, else 16 - bpf.JumpIf{Cond: bpf.JumpEqual, Val: srcPort, SkipTrue: 0, SkipFalse: 3}, - // (013) ldh [x + 16] -- load destination port - bpf.LoadIndirect{Size: 2, Off: 16 - ethHeaderSize}, - // (014) jeq #0x162e jt 15 jf 16 -- if dstPort matches, goto 15, else 16 - bpf.JumpIf{Cond: bpf.JumpEqual, Val: dstPort, SkipTrue: 0, SkipFalse: 1}, - // (015) ret #262144 -- accept packet - bpf.RetConstant{Val: 262144}, - // (016) ret #0 -- drop packet - bpf.RetConstant{Val: 0}, - }) -} diff --git a/pkg/networkpath/traceroute/filter/attach.go b/pkg/networkpath/traceroute/packets/attach.go similarity index 96% rename from pkg/networkpath/traceroute/filter/attach.go rename to pkg/networkpath/traceroute/packets/attach.go index dbea5299711c..46cc6a860ef9 100644 --- a/pkg/networkpath/traceroute/filter/attach.go +++ b/pkg/networkpath/traceroute/packets/attach.go @@ -3,7 +3,7 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2025-present Datadog, Inc. -package filter +package packets import "errors" diff --git a/pkg/networkpath/traceroute/filter/attach_linux.go b/pkg/networkpath/traceroute/packets/attach_linux.go similarity index 92% rename from pkg/networkpath/traceroute/filter/attach_linux.go rename to pkg/networkpath/traceroute/packets/attach_linux.go index a8e0e69a9112..65e7185b7425 100644 --- a/pkg/networkpath/traceroute/filter/attach_linux.go +++ b/pkg/networkpath/traceroute/packets/attach_linux.go @@ -5,7 +5,7 @@ //go:build linux -package filter +package packets import ( "errors" @@ -38,11 +38,6 @@ func SetBPF(c syscall.RawConn, filter []bpf.RawInstruction) error { return nil } -// this is a simple BPF program that drops all packets no matter what -var dropAllFilter = []bpf.RawInstruction{ - {Op: 0x6, Jt: 0, Jf: 0, K: 0x00000000}, -} - // SetBPFAndDrain sets the filter for a raw socket and drains old data, so that // new packets are guaranteed to match the filter func SetBPFAndDrain(c syscall.RawConn, filter []bpf.RawInstruction) error { diff --git a/pkg/networkpath/traceroute/filter/attach_nolinux.go b/pkg/networkpath/traceroute/packets/attach_nolinux.go similarity index 97% rename from pkg/networkpath/traceroute/filter/attach_nolinux.go rename to pkg/networkpath/traceroute/packets/attach_nolinux.go index eb92d8614d73..8eb82ed4f405 100644 --- a/pkg/networkpath/traceroute/filter/attach_nolinux.go +++ b/pkg/networkpath/traceroute/packets/attach_nolinux.go @@ -5,7 +5,7 @@ //go:build !linux -package filter +package packets import ( "syscall" diff --git a/pkg/networkpath/traceroute/packets/cbpf_filters_unix.go b/pkg/networkpath/traceroute/packets/cbpf_filters_unix.go new file mode 100644 index 000000000000..d49016fa8f16 --- /dev/null +++ b/pkg/networkpath/traceroute/packets/cbpf_filters_unix.go @@ -0,0 +1,61 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2025-present Datadog, Inc. + +//go:build unix || linux + +package packets + +import "golang.org/x/net/bpf" + +// this is a simple BPF program that drops all packets no matter what +var dropAllFilter = []bpf.RawInstruction{ + {Op: 0x6, Jt: 0, Jf: 0, K: 0x00000000}, +} + +// icmp4Filter is the verbatim output of tcpdump -i eth0 -dd 'icmp' +// It allows only ICMPv4 traffic. +var icmp4Filter = []bpf.RawInstruction{ + {Op: 0x28, Jt: 0, Jf: 0, K: 0x0000000c}, + {Op: 0x15, Jt: 0, Jf: 3, K: 0x00000800}, + {Op: 0x30, Jt: 0, Jf: 0, K: 0x00000017}, + {Op: 0x15, Jt: 0, Jf: 1, K: 0x00000001}, + {Op: 0x6, Jt: 0, Jf: 0, K: 0x00040000}, + {Op: 0x6, Jt: 0, Jf: 0, K: 0x00000000}, +} + +// icmpFilter is codegen'd from tcpdump -i eth0 -dd 'icmp || icmp6' +// It allows ICMPv4 and ICMPv6 traffic +var icmpFilter = []bpf.RawInstruction{ + {Op: 0x28, Jt: 0, Jf: 0, K: 0x0000000c}, + {Op: 0x15, Jt: 0, Jf: 2, K: 0x00000800}, + {Op: 0x30, Jt: 0, Jf: 0, K: 0x00000017}, + {Op: 0x15, Jt: 6, Jf: 7, K: 0x00000001}, + {Op: 0x15, Jt: 0, Jf: 6, K: 0x000086dd}, + {Op: 0x30, Jt: 0, Jf: 0, K: 0x00000014}, + {Op: 0x15, Jt: 3, Jf: 0, K: 0x0000003a}, + {Op: 0x15, Jt: 0, Jf: 3, K: 0x0000002c}, + {Op: 0x30, Jt: 0, Jf: 0, K: 0x00000036}, + {Op: 0x15, Jt: 0, Jf: 1, K: 0x0000003a}, + {Op: 0x6, Jt: 0, Jf: 0, K: 0x00040000}, + {Op: 0x6, Jt: 0, Jf: 0, K: 0x00000000}, +} + +// udpFilter is codegen'd from tcpdump -i eth0 -dd 'icmp || icmp6 || udp' +// it allows ICMPv4, ICMPv6, and UDP traffic (basically it omits TCP) +var udpFilter = []bpf.RawInstruction{ + {Op: 0x28, Jt: 0, Jf: 0, K: 0x0000000c}, + {Op: 0x15, Jt: 0, Jf: 2, K: 0x00000800}, + {Op: 0x30, Jt: 0, Jf: 0, K: 0x00000017}, + {Op: 0x15, Jt: 7, Jf: 6, K: 0x00000001}, + {Op: 0x15, Jt: 0, Jf: 7, K: 0x000086dd}, + {Op: 0x30, Jt: 0, Jf: 0, K: 0x00000014}, + {Op: 0x15, Jt: 4, Jf: 0, K: 0x0000003a}, + {Op: 0x15, Jt: 0, Jf: 2, K: 0x0000002c}, + {Op: 0x30, Jt: 0, Jf: 0, K: 0x00000036}, + {Op: 0x15, Jt: 1, Jf: 0, K: 0x0000003a}, + {Op: 0x15, Jt: 0, Jf: 1, K: 0x00000011}, + {Op: 0x6, Jt: 0, Jf: 0, K: 0x00040000}, + {Op: 0x6, Jt: 0, Jf: 0, K: 0x00000000}, +} diff --git a/pkg/networkpath/traceroute/common/frame_parser.go b/pkg/networkpath/traceroute/packets/frame_parser.go similarity index 71% rename from pkg/networkpath/traceroute/common/frame_parser.go rename to pkg/networkpath/traceroute/packets/frame_parser.go index e6b5835894cb..bc5661e180c3 100644 --- a/pkg/networkpath/traceroute/common/frame_parser.go +++ b/pkg/networkpath/traceroute/packets/frame_parser.go @@ -3,7 +3,7 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2025-present Datadog, Inc. -package common +package packets import ( "encoding/binary" @@ -12,38 +12,51 @@ import ( "net/netip" "slices" + "github.com/DataDog/datadog-agent/pkg/networkpath/traceroute/common" "github.com/google/gopacket" "github.com/google/gopacket/layers" ) // FrameParser parses traceroute responses using gopacket. type FrameParser struct { - Ethernet layers.Ethernet IP4 layers.IPv4 + IP6 layers.IPv6 TCP layers.TCP ICMP4 layers.ICMPv4 + ICMP6 layers.ICMPv6 Payload gopacket.Payload Layers []gopacket.LayerType - parser *gopacket.DecodingLayerParser + parserv4 *gopacket.DecodingLayerParser + parserv6 *gopacket.DecodingLayerParser } -var ignoredLayerErr = &ReceiveProbeNoPktError{ - Err: fmt.Errorf("FrameParser saw an a layer type not used by traceroute (e.g. ARP) and decided to ignore it"), +var ignoredLayerErr = &common.ReceiveProbeNoPktError{ + Err: fmt.Errorf("FrameParser saw an a layer type not used by traceroute (e.g. SCTP) and decided to ignore it"), } -const expectedLayerCount = 3 +const expectedLayerCount = 2 // NewFrameParser constructs a new FrameParser func NewFrameParser() *FrameParser { p := &FrameParser{} - p.parser = gopacket.NewDecodingLayerParser(layers.LayerTypeEthernet, &p.Ethernet, &p.IP4, &p.TCP, &p.ICMP4, &p.Payload) + p.parserv4 = gopacket.NewDecodingLayerParser(layers.LayerTypeIPv4, &p.IP4, &p.TCP, &p.ICMP4, &p.Payload) + // TODO: IPv6 is not actually implemented yet + p.parserv6 = gopacket.NewDecodingLayerParser(layers.LayerTypeIPv6, &p.IP6, &p.TCP, &p.ICMP6, &p.Payload) return p } // Parse parses an ethernet packet func (p *FrameParser) Parse(buffer []byte) error { - err := p.parser.DecodeLayers(buffer, &p.Layers) + parser, err := p.getParser(buffer) + if err != nil { + return err + } + // TODO: currently we don't support ipv6 + if parser == p.parserv6 { + return ignoredLayerErr + } + err = parser.DecodeLayers(buffer, &p.Layers) var unsupportedErr gopacket.UnsupportedLayerType if errors.As(err, &unsupportedErr) { if len(p.Layers) < expectedLayerCount { @@ -57,7 +70,7 @@ func (p *FrameParser) Parse(buffer []byte) error { return fmt.Errorf("Parse: %w", err) } if err := p.checkLayers(); err != nil { - return err + return &common.BadPacketError{Err: err} } return nil } @@ -67,7 +80,7 @@ func (p *FrameParser) GetIPLayer() gopacket.LayerType { if len(p.Layers) < expectedLayerCount { return gopacket.LayerTypeZero } - return p.Layers[1] + return p.Layers[0] } // GetTransportLayer gets the layer type of the transport layer (e.g. TCP, ICMP) @@ -75,7 +88,7 @@ func (p *FrameParser) GetTransportLayer() gopacket.LayerType { if len(p.Layers) < expectedLayerCount { return gopacket.LayerTypeZero } - return p.Layers[2] + return p.Layers[1] } // TODO IPv6 @@ -156,6 +169,8 @@ type ICMPInfo struct { IPPair IPPair // ICMPType is the kind of ICMP packet (e.g. TTL exceeded) ICMPType layers.ICMPv4TypeCode + // WrappedPacketID is the packet ID from the wrapped IP payload + WrappedPacketID uint16 // ICMPPair is the source/dest IPs from the wrapped IP payload ICMPPair IPPair // Payload is the payload from within the wrapped IP packet, typically containing the first 8 bytes of TCP/UDP. @@ -177,10 +192,11 @@ func (p *FrameParser) GetICMPInfo() (ICMPInfo, error) { } icmpInfo := ICMPInfo{ - IPPair: ipPair, - ICMPType: p.ICMP4.TypeCode, - ICMPPair: getIPv4Pair(&innerPkt), - Payload: slices.Clone(innerPkt.Payload), + IPPair: ipPair, + ICMPType: p.ICMP4.TypeCode, + WrappedPacketID: innerPkt.Id, + ICMPPair: getIPv4Pair(&innerPkt), + Payload: slices.Clone(innerPkt.Payload), } return icmpInfo, nil default: @@ -191,3 +207,32 @@ func (p *FrameParser) GetICMPInfo() (ICMPInfo, error) { // TTLExceeded4 is the TTL Exceeded ICMP4 TypeCode var TTLExceeded4 = layers.CreateICMPv4TypeCode(layers.ICMPv4TypeTimeExceeded, layers.ICMPv4CodeTTLExceeded) + +func (p *FrameParser) getParser(buffer []byte) (*gopacket.DecodingLayerParser, error) { + if len(buffer) < 1 { + return nil, fmt.Errorf("getParser: buffer was empty") + } + version := buffer[0] >> 4 + switch version { + case 4: + return p.parserv4, nil + case 6: + return p.parserv6, nil + default: + return nil, &common.BadPacketError{Err: fmt.Errorf("unexpected IP version %d", version)} + } +} + +// removes the preceding ethernet header from the buffer +func stripEthernetHeader(buf []byte) ([]byte, error) { + var eth layers.Ethernet + err := (ð).DecodeFromBytes(buf, gopacket.NilDecodeFeedback) + if err != nil { + return nil, fmt.Errorf("stripEthernetHeader failed to decode ethernet: %w", err) + } + // return ignoredLayerErr when the it's not an IP packet + if eth.EthernetType != layers.EthernetTypeIPv4 && eth.EthernetType != layers.EthernetTypeIPv6 { + return nil, ignoredLayerErr + } + return eth.Payload, nil +} diff --git a/pkg/networkpath/traceroute/common/frame_parser_test.go b/pkg/networkpath/traceroute/packets/frame_parser_test.go similarity index 75% rename from pkg/networkpath/traceroute/common/frame_parser_test.go rename to pkg/networkpath/traceroute/packets/frame_parser_test.go index d8e8b8d44f13..b6721ff51a93 100644 --- a/pkg/networkpath/traceroute/common/frame_parser_test.go +++ b/pkg/networkpath/traceroute/packets/frame_parser_test.go @@ -3,7 +3,7 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2025-present Datadog, Inc. -package common +package packets import ( "net" @@ -21,18 +21,12 @@ func clearLayer(layer *layers.BaseLayer) { // SerializeLayers doesn't populate these fields, so we exclude them from equality comparison func clearBuffers(parser *FrameParser) { - clearLayer(&parser.Ethernet.BaseLayer) clearLayer(&parser.IP4.BaseLayer) clearLayer(&parser.TCP.BaseLayer) clearLayer(&parser.ICMP4.BaseLayer) } func TestFrameParserTCP(t *testing.T) { - eth := &layers.Ethernet{ - SrcMAC: net.HardwareAddr{0x01, 0x02, 0x03, 0x04, 0x05, 0x06}, - DstMAC: net.HardwareAddr{0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c}, - EthernetType: layers.EthernetTypeIPv4, - } ip4 := &layers.IPv4{ Version: 4, TTL: 123, @@ -60,7 +54,7 @@ func TestFrameParserTCP(t *testing.T) { FixLengths: true, ComputeChecksums: true, } - err = gopacket.SerializeLayers(buf, opts, eth, ip4, tcp, payload) + err = gopacket.SerializeLayers(buf, opts, ip4, tcp, payload) require.NoError(t, err) parser := NewFrameParser() @@ -70,7 +64,6 @@ func TestFrameParserTCP(t *testing.T) { clearBuffers(parser) - require.EqualExportedValues(t, eth, &parser.Ethernet) require.EqualExportedValues(t, ip4, &parser.IP4) require.EqualExportedValues(t, tcp, &parser.TCP) require.Equal(t, payload, parser.Payload) @@ -91,11 +84,6 @@ func TestFrameParserICMP4(t *testing.T) { require.NoError(t, err) tcpBytes := buf.Bytes()[:8] - eth := &layers.Ethernet{ - SrcMAC: net.HardwareAddr{0x01, 0x02, 0x03, 0x04, 0x05, 0x06}, - DstMAC: net.HardwareAddr{0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c}, - EthernetType: layers.EthernetTypeIPv4, - } ip4 := &layers.IPv4{ Version: 4, TTL: 123, @@ -114,7 +102,7 @@ func TestFrameParserICMP4(t *testing.T) { FixLengths: true, ComputeChecksums: true, } - err = gopacket.SerializeLayers(buf, opts, eth, ip4, icmp4, payload) + err = gopacket.SerializeLayers(buf, opts, ip4, icmp4, payload) require.NoError(t, err) parser := NewFrameParser() @@ -124,7 +112,6 @@ func TestFrameParserICMP4(t *testing.T) { clearBuffers(parser) - require.EqualExportedValues(t, eth, &parser.Ethernet) require.EqualExportedValues(t, ip4, &parser.IP4) require.EqualExportedValues(t, icmp4, &parser.ICMP4) @@ -140,21 +127,26 @@ func TestFrameParserICMP4(t *testing.T) { } func TestFrameParserUnrecognizedPacket(t *testing.T) { - eth := &layers.Ethernet{ - SrcMAC: net.HardwareAddr{0x01, 0x02, 0x03, 0x04, 0x05, 0x06}, - DstMAC: net.HardwareAddr{0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c}, - EthernetType: layers.EthernetTypeARP, + ip4 := &layers.IPv4{ + Version: 4, + TTL: 123, + SrcIP: net.ParseIP("127.0.0.1"), + DstIP: net.ParseIP("127.0.0.2"), + Id: 41821, + Protocol: layers.IPProtocolSCTP, } - arp := &layers.ARP{ - Protocol: layers.EthernetTypeARP, + sctp := &layers.SCTP{ + SrcPort: 42, + DstPort: 123, } + payload := gopacket.Payload([]byte{42}) buf := gopacket.NewSerializeBuffer() opts := gopacket.SerializeOptions{ FixLengths: true, ComputeChecksums: true, } - err := gopacket.SerializeLayers(buf, opts, eth, arp) + err := gopacket.SerializeLayers(buf, opts, ip4, sctp, payload) require.NoError(t, err) parser := NewFrameParser() @@ -165,11 +157,6 @@ func TestFrameParserUnrecognizedPacket(t *testing.T) { func TestFrameParserTLSPacket(t *testing.T) { // tests a TLS packet which has one extra layer we don't care about - eth := &layers.Ethernet{ - SrcMAC: net.HardwareAddr{0x01, 0x02, 0x03, 0x04, 0x05, 0x06}, - DstMAC: net.HardwareAddr{0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c}, - EthernetType: layers.EthernetTypeIPv4, - } ip4 := &layers.IPv4{ Version: 4, TTL: 123, @@ -198,7 +185,7 @@ func TestFrameParserTLSPacket(t *testing.T) { FixLengths: true, ComputeChecksums: true, } - err = gopacket.SerializeLayers(buf, opts, eth, ip4, tcp, tls) + err = gopacket.SerializeLayers(buf, opts, ip4, tcp, tls) require.NoError(t, err) parser := NewFrameParser() @@ -208,7 +195,6 @@ func TestFrameParserTLSPacket(t *testing.T) { clearBuffers(parser) - require.EqualExportedValues(t, eth, &parser.Ethernet) require.EqualExportedValues(t, ip4, &parser.IP4) require.EqualExportedValues(t, tcp, &parser.TCP) } diff --git a/pkg/networkpath/traceroute/packets/packet_sink.go b/pkg/networkpath/traceroute/packets/packet_sink.go new file mode 100644 index 000000000000..63ff600f11e3 --- /dev/null +++ b/pkg/networkpath/traceroute/packets/packet_sink.go @@ -0,0 +1,19 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2025-present Datadog, Inc. + +package packets + +import ( + "net/netip" +) + +// Sink is an interface which sends IP packets +type Sink interface { + // WriteTo writes the given packet (buffer starts at the IP layer) to addrPort. + // (the port is required for compatibility with Windows) + WriteTo(buf []byte, addrPort netip.AddrPort) error + // Close closes the socket + Close() error +} diff --git a/pkg/networkpath/traceroute/packets/packet_sink_unix.go b/pkg/networkpath/traceroute/packets/packet_sink_unix.go new file mode 100644 index 000000000000..1fd99cc688a1 --- /dev/null +++ b/pkg/networkpath/traceroute/packets/packet_sink_unix.go @@ -0,0 +1,80 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2025-present Datadog, Inc. + +//go:build unix || linux + +package packets + +import ( + "errors" + "fmt" + "net/netip" + "os" + "syscall" + + "golang.org/x/sys/unix" +) + +// SinkUnix is an implementation of the packet sink interface for unix OSes +// TODO change to sinkUnix and don't export this? +type SinkUnix struct { + rawConn syscall.RawConn +} + +var _ Sink = &SinkUnix{} + +// NewSinkUnix returns a new SinkUnix implementing packet sink +func NewSinkUnix(addr netip.Addr) (*SinkUnix, error) { + if !addr.Is4() { + return nil, fmt.Errorf("SinkUnix only supports IPv4 addresses (for now)") + } + fd, err := unix.Socket(unix.AF_INET, unix.SOCK_RAW|unix.SOCK_NONBLOCK, unix.IPPROTO_RAW) + if err != nil { + return nil, fmt.Errorf("NewSinkUnix failed to create socket: %s", err) + } + + sock := os.NewFile(uintptr(fd), "") + rawConn, err := sock.SyscallConn() + if err != nil { + sock.Close() + return nil, fmt.Errorf("failed to get raw connection: %w", err) + } + + return &SinkUnix{ + rawConn: rawConn, + }, nil +} + +// WriteTo writes the given packet (buffer starts at the IP layer) to addrPort. +func (p *SinkUnix) WriteTo(buf []byte, addrPort netip.AddrPort) error { + if !addrPort.Addr().Is4() { + return fmt.Errorf("SinkUnix only supports IPv4 addresses (for now)") + } + sa := &unix.SockaddrInet4{ + Addr: addrPort.Addr().As4(), + } + + var sendtoErr error + err := p.rawConn.Write(func(fd uintptr) (done bool) { + err := unix.Sendto(int(fd), buf, 0, sa) + if err == nil { + return true + } + + return err == syscall.EAGAIN || err == syscall.EWOULDBLOCK + }) + + return errors.Join(sendtoErr, err) +} + +// Close closes the socket +func (p *SinkUnix) Close() error { + var closeErr error + err := p.rawConn.Control(func(fd uintptr) { + closeErr = unix.Close(int(fd)) + }) + + return errors.Join(closeErr, err) +} diff --git a/pkg/networkpath/traceroute/packets/packet_sink_windows.go b/pkg/networkpath/traceroute/packets/packet_sink_windows.go new file mode 100644 index 000000000000..b43764c4ac87 --- /dev/null +++ b/pkg/networkpath/traceroute/packets/packet_sink_windows.go @@ -0,0 +1,8 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2025-present Datadog, Inc. + +//go:build windows + +package packets diff --git a/pkg/networkpath/traceroute/common/packet_source.go b/pkg/networkpath/traceroute/packets/packet_source.go similarity index 58% rename from pkg/networkpath/traceroute/common/packet_source.go rename to pkg/networkpath/traceroute/packets/packet_source.go index d0cd6049943a..67df491b95c0 100644 --- a/pkg/networkpath/traceroute/common/packet_source.go +++ b/pkg/networkpath/traceroute/packets/packet_source.go @@ -3,33 +3,33 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2025-present Datadog, Inc. -package common +// Package packets has packet capture/emitting/filtering logic +package packets import ( - "encoding/hex" "errors" "fmt" "os" "time" - "github.com/DataDog/datadog-agent/pkg/util/log" + "github.com/DataDog/datadog-agent/pkg/networkpath/traceroute/common" ) -// PacketSource is an interface representing ethernet packet capture -type PacketSource interface { +// Source is an interface representing ethernet packet capture +type Source interface { // SetReadDeadline sets the deadline for when a Read() call must finish SetReadDeadline(t time.Time) error - // Read reads a packet (including the ethernet frame) + // Read reads a packet (starting with the IP frame) Read(buf []byte) (int, error) // Close closes the socket Close() error } // ReadAndParse reads from the given source into the buffer, and parses it with parser -func ReadAndParse(source PacketSource, buffer []byte, parser *FrameParser) error { +func ReadAndParse(source Source, buffer []byte, parser *FrameParser) error { n, err := source.Read(buffer) if errors.Is(err, os.ErrDeadlineExceeded) { - return &ReceiveProbeNoPktError{Err: err} + return &common.ReceiveProbeNoPktError{Err: err} } if err != nil { return fmt.Errorf("ConnHandle failed to Read: %w", err) @@ -40,11 +40,7 @@ func ReadAndParse(source PacketSource, buffer []byte, parser *FrameParser) error err = parser.Parse(buffer[:n]) if err != nil { - log.DebugFunc(func() string { - data := hex.EncodeToString(buffer[:n]) - return fmt.Sprintf("error parsing packet of length %d: %s, %s", n, err, data) - }) - return &BadPacketError{Err: fmt.Errorf("sackDriver failed to parse packet of length %d: %w", n, err)} + return err } return nil diff --git a/pkg/networkpath/traceroute/common/afpacket_source_linux.go b/pkg/networkpath/traceroute/packets/packet_source_linux.go similarity index 75% rename from pkg/networkpath/traceroute/common/afpacket_source_linux.go rename to pkg/networkpath/traceroute/packets/packet_source_linux.go index 4a488be63a56..9dabd5e00e55 100644 --- a/pkg/networkpath/traceroute/common/afpacket_source_linux.go +++ b/pkg/networkpath/traceroute/packets/packet_source_linux.go @@ -5,13 +5,14 @@ //go:build linux -package common +package packets import ( "fmt" "os" "time" + "golang.org/x/net/bpf" "golang.org/x/sys/unix" ) @@ -22,7 +23,7 @@ type AFPacketSource struct { sock *os.File } -var _ PacketSource = &AFPacketSource{} +var _ Source = &AFPacketSource{} // ethPAllNetwork is all protocols, in network byte order var ethPAllNetwork = htons(uint16(unix.ETH_P_ALL)) @@ -43,9 +44,18 @@ func (a *AFPacketSource) SetReadDeadline(t time.Time) error { return a.sock.SetReadDeadline(t) } -// Read reads a packet (including the ethernet frame) +// Read reads a packet (starting with the IP frame) func (a *AFPacketSource) Read(buf []byte) (int, error) { - return a.sock.Read(buf) + n, err := a.sock.Read(buf) + if err != nil { + return n, err + } + payload, err := stripEthernetHeader(buf[:n]) + if err != nil { + return n, err + } + copy(buf, payload) + return n, nil } // Close closes the socket @@ -53,6 +63,14 @@ func (a *AFPacketSource) Close() error { return a.sock.Close() } +func (a *AFPacketSource) setBpfAndDrain(filter []bpf.RawInstruction) error { + conn, err := a.sock.SyscallConn() + if err != nil { + return err + } + return SetBPFAndDrain(conn, filter) +} + // htons converts a short (uint16) from host-to-network byte order. func htons(i uint16) uint16 { return i<<8 | i>>8 diff --git a/pkg/networkpath/traceroute/packets/packet_source_windows.go b/pkg/networkpath/traceroute/packets/packet_source_windows.go new file mode 100644 index 000000000000..b43764c4ac87 --- /dev/null +++ b/pkg/networkpath/traceroute/packets/packet_source_windows.go @@ -0,0 +1,8 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2025-present Datadog, Inc. + +//go:build windows + +package packets diff --git a/pkg/networkpath/traceroute/filter/rawconn.go b/pkg/networkpath/traceroute/packets/rawconn_unix.go similarity index 95% rename from pkg/networkpath/traceroute/filter/rawconn.go rename to pkg/networkpath/traceroute/packets/rawconn_unix.go index 0fa644198cfb..87008d7370bb 100644 --- a/pkg/networkpath/traceroute/filter/rawconn.go +++ b/pkg/networkpath/traceroute/packets/rawconn_unix.go @@ -3,7 +3,9 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2025-present Datadog, Inc. -package filter +//go:build unix || linux + +package packets import ( "context" diff --git a/pkg/networkpath/traceroute/packets/tcp_filter.go b/pkg/networkpath/traceroute/packets/tcp_filter.go new file mode 100644 index 000000000000..3b0f078c4498 --- /dev/null +++ b/pkg/networkpath/traceroute/packets/tcp_filter.go @@ -0,0 +1,110 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2025-present Datadog, Inc. + +package packets + +import ( + "encoding/binary" + "fmt" + "net/netip" + + "golang.org/x/net/bpf" +) + +// TCPFilterConfig is the config for GenerateTCP4Filter +type TCPFilterConfig struct { + Src netip.AddrPort + Dst netip.AddrPort +} + +// GenerateTCP4Filter creates a classic BPF filter for TCP SOCK_RAW sockets. +// It will only allow packets whose tuple matches the given config. +func (c TCPFilterConfig) GenerateTCP4Filter() ([]bpf.RawInstruction, error) { + if !c.Src.Addr().Is4() || !c.Dst.Addr().Is4() { + return nil, fmt.Errorf("GenerateTCP4Filter2: src=%s and dst=%s must be IPv4", c.Src.Addr(), c.Dst.Addr()) + } + srcAddr := binary.BigEndian.Uint32(c.Src.Addr().AsSlice()) + dstAddr := binary.BigEndian.Uint32(c.Dst.Addr().AsSlice()) + srcPort := uint32(c.Src.Port()) + dstPort := uint32(c.Dst.Port()) + + // Process to derive the following program: + // 1. Generate the BPF program with placeholder values: + // tcpdump -i eth0 -d 'ip and tcp and src 2.4.6.8 and dst 1.3.5.7 and src port 1234 and dst port 5678' + // 2. Replace the placeholder values with src/dst AddrPorts + return bpf.Assemble([]bpf.Instruction{ + // (000) ldh [12] -- load EtherType + bpf.LoadAbsolute{Size: 2, Off: 12}, + // (001) jeq #0x800 jt 2 jf 17 -- if IPv4, continue, else drop + bpf.JumpIf{Cond: bpf.JumpEqual, Val: 0x800, SkipTrue: 0, SkipFalse: 15}, + // (002) ldb [23] -- load Protocol + bpf.LoadAbsolute{Size: 1, Off: 23}, + // (003) jeq #0x1 jt 16 jf 4 -- if ICMPv4, accept packet, else continue + bpf.JumpIf{Cond: bpf.JumpEqual, Val: 0x1, SkipTrue: 12, SkipFalse: 0}, + // (004) jeq #0x6 jt 5 jf 17 -- if TCP, continue, else drop + bpf.JumpIf{Cond: bpf.JumpEqual, Val: 0x6, SkipTrue: 0, SkipFalse: 12}, + // (005) ld [26] -- load source IP + bpf.LoadAbsolute{Size: 4, Off: 26}, + // (006) jeq #0x2040608 jt 7 jf 17 -- if srcAddr matches, continue, else drop + bpf.JumpIf{Cond: bpf.JumpEqual, Val: srcAddr, SkipTrue: 0, SkipFalse: 10}, + // (007) ld [30] -- load destination IP + bpf.LoadAbsolute{Size: 4, Off: 30}, + // (008) jeq #0x1030507 jt 9 jf 17 -- if dstAddr matches, continue, else drop + bpf.JumpIf{Cond: bpf.JumpEqual, Val: dstAddr, SkipTrue: 0, SkipFalse: 8}, + // (009) ldh [20] -- load Fragment Offset + bpf.LoadAbsolute{Size: 2, Off: 20}, + // (010) jset #0x1fff jt 17 jf 11 -- if fragmented, drop, else continue + bpf.JumpIf{Cond: bpf.JumpBitsSet, Val: 0x1fff, SkipTrue: 6, SkipFalse: 0}, + // (011) ldxb 4*([14]&0xf) -- x = IP header length + bpf.LoadMemShift{Off: 14}, + // (012) ldh [x + 14] -- load source port + bpf.LoadIndirect{Size: 2, Off: 14}, + // (013) jeq #0x4d2 jt 14 jf 17 -- if srcPort matches, continue, else drop + bpf.JumpIf{Cond: bpf.JumpEqual, Val: srcPort, SkipTrue: 0, SkipFalse: 3}, + // (014) ldh [x + 16] -- load destination port + bpf.LoadIndirect{Size: 2, Off: 16}, + // (015) jeq #0x162e jt 16 jf 17 -- if dstPort matches, continue, else drop + bpf.JumpIf{Cond: bpf.JumpEqual, Val: dstPort, SkipTrue: 0, SkipFalse: 1}, + // (016) ret #262144 -- accept packet + bpf.RetConstant{Val: 262144}, + // (017) ret #0 -- drop packet + bpf.RetConstant{Val: 0}, + + // // (000) ldh [12] + // bpf.LoadAbsolute{Size: 2, Off: 12}, + // // (001) jeq #0x800 jt 2 jf 16 -- if IPv4, goto 2, else 16 + // bpf.JumpIf{Cond: bpf.JumpEqual, Val: 0x800, SkipTrue: 0, SkipFalse: 14}, + // // (002) ldb [23] -- load Protocol + // bpf.LoadAbsolute{Size: 1, Off: 23}, + // // (003) jeq #0x6 jt 4 jf 16 -- if TCP, goto 4, else 16 + // bpf.JumpIf{Cond: bpf.JumpEqual, Val: 0x6, SkipTrue: 0, SkipFalse: 12}, + // // (004) ld [26] -- load source IP + // bpf.LoadAbsolute{Size: 4, Off: 26}, + // // (005) jeq #0x2040608 jt 6 jf 16 -- if srcAddr matches, goto 6, else 16 + // bpf.JumpIf{Cond: bpf.JumpEqual, Val: srcAddr, SkipTrue: 0, SkipFalse: 10}, + // // (006) ld [30] -- load destination IP + // bpf.LoadAbsolute{Size: 4, Off: 30}, + // // (007) jeq #0x1030507 jt 8 jf 16 -- if dstAddr matches, goto 8, else 16 + // bpf.JumpIf{Cond: bpf.JumpEqual, Val: dstAddr, SkipTrue: 0, SkipFalse: 8}, + // // (008) ldh [20] -- load Fragment Offset + // bpf.LoadAbsolute{Size: 2, Off: 20}, + // // (009) jset #0x1fff jt 16 jf 10 -- if fragmented, goto 16, else 10 + // bpf.JumpIf{Cond: bpf.JumpBitsSet, Val: 0x1fff, SkipTrue: 6, SkipFalse: 0}, + // // (010) ldxb 4*([14]&0xf) -- x = IP header length + // bpf.LoadMemShift{Off: 14}, + // // (011) ldh [x + 14] -- load source port + // bpf.LoadIndirect{Size: 2, Off: 14}, + // // (012) jeq #0x4d2 jt 13 jf 16 -- if srcPort matches, goto 13, else 16 + // bpf.JumpIf{Cond: bpf.JumpEqual, Val: srcPort, SkipTrue: 0, SkipFalse: 3}, + // // (013) ldh [x + 16] -- load destination port + // bpf.LoadIndirect{Size: 2, Off: 16}, + // // (014) jeq #0x162e jt 15 jf 16 -- if dstPort matches, goto 15, else 16 + // bpf.JumpIf{Cond: bpf.JumpEqual, Val: dstPort, SkipTrue: 0, SkipFalse: 1}, + // // (015) ret #262144 -- accept packet + // bpf.RetConstant{Val: 262144}, + // // (016) ret #0 -- drop packet + // bpf.RetConstant{Val: 0}, + }) +} diff --git a/pkg/networkpath/traceroute/filter/tcp_test.go b/pkg/networkpath/traceroute/packets/tcp_filter_test.go similarity index 50% rename from pkg/networkpath/traceroute/filter/tcp_test.go rename to pkg/networkpath/traceroute/packets/tcp_filter_test.go index 4c27b8cd9ec6..4c879b5471b2 100644 --- a/pkg/networkpath/traceroute/filter/tcp_test.go +++ b/pkg/networkpath/traceroute/packets/tcp_filter_test.go @@ -5,33 +5,75 @@ //go:build test && linux && linux_bpf -package filter +package packets import ( "bufio" - "context" "errors" "net" "net/netip" "os" - "syscall" + "runtime" "testing" "time" + "github.com/google/gopacket/layers" "github.com/stretchr/testify/require" + "github.com/vishvananda/netlink" + "github.com/vishvananda/netns" "github.com/DataDog/datadog-agent/pkg/network/tracer/testutil" + netnsutil "github.com/DataDog/datadog-agent/pkg/util/kernel/netns" ) type tcpTestCase struct { - filterConfig func(server, client netip.AddrPort) TCP4FilterConfig + filterConfig func(server, client netip.AddrPort) TCPFilterConfig shouldCapture bool } +func makeNetns() (netns.NsHandle, error) { + // Lock the OS Thread so we don't accidentally switch namespaces + runtime.LockOSThread() + defer runtime.UnlockOSThread() + + // Save the current network namespace + origns, err := netns.Get() + defer origns.Close() + if err != nil { + return netns.None(), err + } + // Switch back to the original namespace when we are done + defer netns.Set(origns) + + // Create a new network namespace + newns, err := netns.New() + if err != nil { + return netns.None(), err + } + + // get the loopback interface + lo, err := netlink.LinkByName("lo") + if err != nil { + return netns.None(), err + } + + // enable the loopback interface (it's down by default) + err = netlink.LinkSetUp(lo) + if err != nil { + return netns.None(), err + } + + return newns, err +} + func doTestCase(t *testing.T, tc tcpTestCase) { - // we use bound ports on the server and the client so this should be safe to parallelize + // we use a separate netns for each test so this is safe to parallelize t.Parallel() + testns, err := makeNetns() + require.NoError(t, err) + defer testns.Close() + server := testutil.NewTCPServerOnAddress("127.0.0.42:0", func(c net.Conn) { r := bufio.NewReader(c) r.ReadBytes(byte('\n')) @@ -39,7 +81,10 @@ func doTestCase(t *testing.T, tc tcpTestCase) { testutil.GracefulCloseTCP(c) }) t.Cleanup(server.Shutdown) - require.NoError(t, server.Run()) + // we only need testns to be active when we call bind()/listen() + err = netnsutil.WithNS(testns, server.Listen) + require.NoError(t, err) + server.StartAccepting() dialer := net.Dialer{ Timeout: time.Minute, @@ -49,7 +94,12 @@ func doTestCase(t *testing.T, tc tcpTestCase) { }, } - conn, err := dialer.Dial("tcp", server.Address()) + var conn net.Conn + err = netnsutil.WithNS(testns, func() error { + nsConn, nsErr := dialer.Dial("tcp", server.Address()) + conn = nsConn + return nsErr + }) require.NoError(t, err) defer testutil.GracefulCloseTCP(conn) @@ -62,42 +112,42 @@ func doTestCase(t *testing.T, tc tcpTestCase) { filter, err := cfg.GenerateTCP4Filter() require.NoError(t, err) - lc := &net.ListenConfig{ - Control: func(_network, _address string, c syscall.RawConn) error { - err := SetBPFAndDrain(c, filter) - require.NoError(t, err) - return err - }, - } - - rawConn, err := MakeRawConn(context.Background(), lc, "ip:tcp", clientAddrPort.Addr()) + var source *AFPacketSource + netnsutil.WithNS(testns, func() error { + nsSource, nsErr := NewAFPacketSource() + source = nsSource + return nsErr + }) + err = source.setBpfAndDrain(filter) require.NoError(t, err) conn.Write([]byte("bar\n")) buffer := make([]byte, 1024) + parser := NewFrameParser() + + source.SetReadDeadline(time.Now().Add(500 * time.Millisecond)) + err = ReadAndParse(source, buffer, parser) - rawConn.SetDeadline(time.Now().Add(500 * time.Millisecond)) - n, addr, err := rawConn.ReadFromIP(buffer) if !errors.Is(err, os.ErrDeadlineExceeded) { // ErrDeadlineExceeded is what the test checks for, so we should only blow up on real errors require.NoError(t, err) - require.NotZero(t, n) } hasCaptured := !errors.Is(err, os.ErrDeadlineExceeded) if tc.shouldCapture { require.True(t, hasCaptured, "expected to see a packet, but found nothing") - require.Equal(t, addr.IP, net.IP(cfg.Src.Addr().AsSlice())) + require.Equal(t, layers.LayerTypeIPv4, parser.GetIPLayer()) + require.Equal(t, net.IP(cfg.Src.Addr().AsSlice()), parser.IP4.SrcIP) } else { - require.False(t, hasCaptured, "expected not to see a packet, but found one from %s", addr) + require.False(t, hasCaptured, "expected not to see a packet, but found one from %s", parser.IP4.SrcIP) } } func TestTCPFilterMatch(t *testing.T) { doTestCase(t, tcpTestCase{ - filterConfig: func(server, client netip.AddrPort) TCP4FilterConfig { - return TCP4FilterConfig{Src: server, Dst: client} + filterConfig: func(server, client netip.AddrPort) TCPFilterConfig { + return TCPFilterConfig{Src: server, Dst: client} }, shouldCapture: true, }) @@ -114,8 +164,8 @@ func manglePort(ap netip.AddrPort) netip.AddrPort { func TestTCPFilterBadServerIP(t *testing.T) { doTestCase(t, tcpTestCase{ - filterConfig: func(server, client netip.AddrPort) TCP4FilterConfig { - return TCP4FilterConfig{Src: mangleIP(server), Dst: client} + filterConfig: func(server, client netip.AddrPort) TCPFilterConfig { + return TCPFilterConfig{Src: mangleIP(server), Dst: client} }, shouldCapture: false, }) @@ -123,8 +173,8 @@ func TestTCPFilterBadServerIP(t *testing.T) { func TestTCPFilterBadServerPort(t *testing.T) { doTestCase(t, tcpTestCase{ - filterConfig: func(server, client netip.AddrPort) TCP4FilterConfig { - return TCP4FilterConfig{Src: manglePort(server), Dst: client} + filterConfig: func(server, client netip.AddrPort) TCPFilterConfig { + return TCPFilterConfig{Src: manglePort(server), Dst: client} }, shouldCapture: false, }) @@ -132,8 +182,8 @@ func TestTCPFilterBadServerPort(t *testing.T) { func TestTCPFilterBadClientIP(t *testing.T) { doTestCase(t, tcpTestCase{ - filterConfig: func(server, client netip.AddrPort) TCP4FilterConfig { - return TCP4FilterConfig{Src: server, Dst: mangleIP(client)} + filterConfig: func(server, client netip.AddrPort) TCPFilterConfig { + return TCPFilterConfig{Src: server, Dst: mangleIP(client)} }, shouldCapture: false, }) @@ -141,8 +191,8 @@ func TestTCPFilterBadClientIP(t *testing.T) { func TestTCPFilterBadClientPort(t *testing.T) { doTestCase(t, tcpTestCase{ - filterConfig: func(server, client netip.AddrPort) TCP4FilterConfig { - return TCP4FilterConfig{Src: server, Dst: manglePort(client)} + filterConfig: func(server, client netip.AddrPort) TCPFilterConfig { + return TCPFilterConfig{Src: server, Dst: manglePort(client)} }, shouldCapture: false, }) diff --git a/pkg/networkpath/traceroute/runner/runner.go b/pkg/networkpath/traceroute/runner/runner.go index ad779f2a4141..9cb9829f732a 100644 --- a/pkg/networkpath/traceroute/runner/runner.go +++ b/pkg/networkpath/traceroute/runner/runner.go @@ -162,11 +162,13 @@ func makeSackParams(target net.IP, targetPort uint16, maxTTL uint8, timeout time return sack.Params{}, fmt.Errorf("invalid target IP") } parallelParams := common.TracerouteParallelParams{ - MinTTL: DefaultMinTTL, - MaxTTL: maxTTL, - TracerouteTimeout: timeout, - PollFrequency: 100 * time.Millisecond, - SendDelay: 10 * time.Millisecond, + TracerouteParams: common.TracerouteParams{ + MinTTL: DefaultMinTTL, + MaxTTL: maxTTL, + TracerouteTimeout: timeout, + PollFrequency: 100 * time.Millisecond, + SendDelay: 10 * time.Millisecond, + }, } params := sack.Params{ Target: netip.AddrPortFrom(targetAddr, targetPort), diff --git a/pkg/networkpath/traceroute/sack/portable_sack/portable_sack.go b/pkg/networkpath/traceroute/sack/portable_sack/portable_sack.go index 09be7eb895b4..70abffd6b18c 100644 --- a/pkg/networkpath/traceroute/sack/portable_sack/portable_sack.go +++ b/pkg/networkpath/traceroute/sack/portable_sack/portable_sack.go @@ -53,11 +53,13 @@ func main() { HandshakeTimeout: 500 * time.Millisecond, FinTimeout: 500 * time.Millisecond, ParallelParams: common.TracerouteParallelParams{ - MinTTL: 1, - MaxTTL: 30, - TracerouteTimeout: 1 * time.Second, - PollFrequency: 100 * time.Millisecond, - SendDelay: 10 * time.Millisecond, + TracerouteParams: common.TracerouteParams{ + MinTTL: 1, + MaxTTL: 30, + TracerouteTimeout: 1 * time.Second, + PollFrequency: 100 * time.Millisecond, + SendDelay: 10 * time.Millisecond, + }, }, LoosenICMPSrc: true, } diff --git a/pkg/networkpath/traceroute/sack/sack_driver_linux.go b/pkg/networkpath/traceroute/sack/sack_driver_linux.go index 6667e1ea64d6..12310ef07f13 100644 --- a/pkg/networkpath/traceroute/sack/sack_driver_linux.go +++ b/pkg/networkpath/traceroute/sack/sack_driver_linux.go @@ -8,31 +8,28 @@ package sack import ( - "context" "encoding/binary" "encoding/hex" "errors" "fmt" "math" - "net" "net/netip" "os" "time" "github.com/google/gopacket/layers" - "golang.org/x/net/ipv4" "github.com/DataDog/datadog-agent/pkg/networkpath/traceroute/common" - "github.com/DataDog/datadog-agent/pkg/networkpath/traceroute/filter" + "github.com/DataDog/datadog-agent/pkg/networkpath/traceroute/packets" "github.com/DataDog/datadog-agent/pkg/util/log" ) type sackDriver struct { - tcpConn *ipv4.RawConn + sink packets.Sink - source common.PacketSource + source packets.Source buffer []byte - parser *common.FrameParser + parser *packets.FrameParser sendTimes []time.Time localAddr netip.Addr @@ -41,22 +38,23 @@ type sackDriver struct { state *sackTCPState } -func newSackDriver(ctx context.Context, params Params, localAddr netip.Addr) (*sackDriver, error) { - tcpConn, err := filter.MakeRawConn(ctx, &net.ListenConfig{}, "ip:tcp", localAddr) +func newSackDriver(params Params, localAddr netip.Addr) (*sackDriver, error) { + sink, err := packets.NewSinkUnix(params.Target.Addr()) if err != nil { - return nil, fmt.Errorf("newSackDriver failed to make TCP raw conn: %w", err) + return nil, fmt.Errorf("newSackDriver failed to make SinkUnix: %w", err) } - source, err := common.NewAFPacketSource() + + source, err := packets.NewAFPacketSource() if err != nil { - tcpConn.Close() - return nil, fmt.Errorf("newSackDriver failed to make ICMP raw conn: %w", err) + sink.Close() + return nil, fmt.Errorf("newSackDriver failed to make AFPacketSource: %w", err) } retval := &sackDriver{ - tcpConn: tcpConn, + sink: sink, source: source, buffer: make([]byte, 1024), - parser: common.NewFrameParser(), + parser: packets.NewFrameParser(), sendTimes: make([]time.Time, params.ParallelParams.MaxTTL+1), localAddr: localAddr, localPort: 0, // to be set by ReadHandshake() @@ -66,8 +64,8 @@ func newSackDriver(ctx context.Context, params Params, localAddr netip.Addr) (*s } func (s *sackDriver) Close() { - s.tcpConn.Close() s.source.Close() + s.sink.Close() } func (s *sackDriver) GetDriverInfo() common.TracerouteDriverInfo { @@ -96,15 +94,15 @@ func (s *sackDriver) SendProbe(ttl uint8) error { state: *s.state, } // TODO ipv6 - header, packet, err := gen.generateV4(ttl) + packet, err := gen.generateV4(ttl) if err != nil { return fmt.Errorf("sackDriver failed to generate packet: %w", err) } log.TraceFunc(func() string { - return fmt.Sprintf("sending packet: %+v %s\n", header, hex.EncodeToString(packet)) + return fmt.Sprintf("sending packet: %s\n", hex.EncodeToString(packet)) }) - err = s.tcpConn.WriteTo(header, packet, nil) + err = s.sink.WriteTo(packet, s.params.Target) if err != nil { return fmt.Errorf("sackDriver failed to WriteToIP: %w", err) } @@ -119,7 +117,7 @@ func (s *sackDriver) ReceiveProbe(timeout time.Duration) (*common.ProbeResponse, if err != nil { return nil, fmt.Errorf("sackDriver failed to SetReadDeadline: %w", err) } - err = common.ReadAndParse(s.source, s.buffer, s.parser) + err = packets.ReadAndParse(s.source, s.buffer, s.parser) if err != nil { return nil, err } @@ -127,9 +125,9 @@ func (s *sackDriver) ReceiveProbe(timeout time.Duration) (*common.ProbeResponse, return s.handleProbeLayers(s.parser) } -func (s *sackDriver) ExpectedIPPair() common.IPPair { +func (s *sackDriver) ExpectedIPPair() packets.IPPair { // from the target to us - return common.IPPair{ + return packets.IPPair{ SrcAddr: s.params.Target.Addr(), DstAddr: s.localAddr, } @@ -178,7 +176,7 @@ func (s *sackDriver) getRTTFromRelSeq(relSeq uint32) (time.Duration, error) { var errPacketDidNotMatchTraceroute = &common.ReceiveProbeNoPktError{Err: fmt.Errorf("packet did not match the traceroute")} -func (s *sackDriver) handleProbeLayers(parser *common.FrameParser) (*common.ProbeResponse, error) { +func (s *sackDriver) handleProbeLayers(parser *packets.FrameParser) (*common.ProbeResponse, error) { ipPair, err := parser.GetIPPair() if err != nil { return nil, fmt.Errorf("sackDriver failed to get IP pair: %w", err) @@ -224,10 +222,10 @@ func (s *sackDriver) handleProbeLayers(parser *common.FrameParser) (*common.Prob if err != nil { return nil, &common.BadPacketError{Err: fmt.Errorf("sackDriver failed to get ICMP info: %w", err)} } - if icmpInfo.ICMPType != common.TTLExceeded4 { + if icmpInfo.ICMPType != packets.TTLExceeded4 { return nil, errPacketDidNotMatchTraceroute } - tcpInfo, err := common.ParseTCPFirstBytes(icmpInfo.Payload) + tcpInfo, err := packets.ParseTCPFirstBytes(icmpInfo.Payload) if err != nil { return nil, &common.BadPacketError{Err: fmt.Errorf("sackDriver failed to parse TCP info: %w", err)} } @@ -283,12 +281,12 @@ func (s *sackDriver) ReadHandshake(localPort uint16) error { } for !s.IsHandshakeFinished() { // we should have already connected by now so it should be over quickly - err = common.ReadAndParse(s.source, s.buffer, s.parser) + err = packets.ReadAndParse(s.source, s.buffer, s.parser) if errors.Is(err, os.ErrDeadlineExceeded) { return fmt.Errorf("sackDriver readHandshake timed out") // deadline exceeded is normally retryable, so this comes second in order - } else if common.CheckParallelRetryable("ReadHandshake", err) { + } else if common.CheckProbeRetryable("ReadHandshake", err) { continue } else if err != nil { return fmt.Errorf("sackDriver failed to readAndParse: %w", err) diff --git a/pkg/networkpath/traceroute/sack/sack_packet_linux.go b/pkg/networkpath/traceroute/sack/sack_packet_linux.go index 7769c4eb3c6f..dd0209deee35 100644 --- a/pkg/networkpath/traceroute/sack/sack_packet_linux.go +++ b/pkg/networkpath/traceroute/sack/sack_packet_linux.go @@ -11,11 +11,9 @@ import ( "encoding/binary" "fmt" + "github.com/DataDog/datadog-agent/pkg/networkpath/traceroute/packets" "github.com/google/gopacket" "github.com/google/gopacket/layers" - "golang.org/x/net/ipv4" - - "github.com/DataDog/datadog-agent/pkg/networkpath/traceroute/common" ) type sackTCPState struct { @@ -28,7 +26,7 @@ type sackTCPState struct { } type sackPacketGen struct { - ipPair common.IPPair + ipPair packets.IPPair sPort uint16 dPort uint16 @@ -83,10 +81,10 @@ func (s *sackPacketGen) generatePacketV4(ttl uint8) (*layers.IPv4, *layers.TCP, return ipLayer, tcpLayer, nil } -func (s *sackPacketGen) generateBufferV4(ttl uint8) (int, []byte, error) { +func (s *sackPacketGen) generateV4(ttl uint8) ([]byte, error) { ip4, tcp, err := s.generatePacketV4(ttl) if err != nil { - return 0, nil, fmt.Errorf("failed to generate packet: %w", err) + return nil, fmt.Errorf("failed to generate packet: %w", err) } buf := gopacket.NewSerializeBuffer() @@ -96,20 +94,7 @@ func (s *sackPacketGen) generateBufferV4(ttl uint8) (int, []byte, error) { } err = gopacket.SerializeLayers(buf, opts, ip4, tcp, gopacket.Payload([]byte{ttl})) if err != nil { - return 0, nil, fmt.Errorf("failed to serialize packet: %w", err) + return nil, fmt.Errorf("failed to serialize packet: %w", err) } - return 20, buf.Bytes(), nil -} - -func (s *sackPacketGen) generateV4(ttl uint8) (*ipv4.Header, []byte, error) { - headerLen, packet, err := s.generateBufferV4(ttl) - if err != nil { - return nil, nil, fmt.Errorf("failed to generate buffer: %w", err) - } - var ipHdr ipv4.Header - if err := ipHdr.Parse(packet[:headerLen]); err != nil { - return nil, nil, fmt.Errorf("failed to parse IP header of length %d: %w", headerLen, err) - } - - return &ipHdr, packet[headerLen:], nil + return buf.Bytes(), nil } diff --git a/pkg/networkpath/traceroute/sack/traceroute_sack_linux.go b/pkg/networkpath/traceroute/sack/traceroute_sack_linux.go index f1eb97185cb5..ba85cb1d1858 100644 --- a/pkg/networkpath/traceroute/sack/traceroute_sack_linux.go +++ b/pkg/networkpath/traceroute/sack/traceroute_sack_linux.go @@ -80,7 +80,7 @@ func runSackTraceroute(ctx context.Context, p Params) (*sackResult, error) { defer cancel() // create the raw packet connection which watches for TCP/ICMP responses - driver, err := newSackDriver(ctx, p, local.AddrPort().Addr()) + driver, err := newSackDriver(p, local.AddrPort().Addr()) if err != nil { return nil, fmt.Errorf("failed to init sack driver: %w", err) } @@ -136,7 +136,7 @@ func RunSackTraceroute(ctx context.Context, p Params) (*common.Results, error) { return nil, fmt.Errorf("sack traceroute failed: %w", err) } - hops, err := common.ToHops(p.ParallelParams, sackResult.Hops) + hops, err := common.ToHops(p.ParallelParams.TracerouteParams, sackResult.Hops) if err != nil { return nil, fmt.Errorf("sack traceroute ToHops failed: %w", err) } diff --git a/pkg/networkpath/traceroute/tcp/portable_syn/portable_syn.go b/pkg/networkpath/traceroute/tcp/portable_syn/portable_syn.go new file mode 100644 index 000000000000..d9b79ce50c33 --- /dev/null +++ b/pkg/networkpath/traceroute/tcp/portable_syn/portable_syn.go @@ -0,0 +1,67 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2025-present Datadog, Inc. + +// Package main contains a portable binary to easily run SYN traceroutes +package main + +import ( + "encoding/json" + "fmt" + "net/netip" + "os" + "time" + + "github.com/DataDog/datadog-agent/pkg/networkpath/traceroute/tcp" + + pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup" + pkglogsetup "github.com/DataDog/datadog-agent/pkg/util/log/setup" +) + +func main() { + loglevel := os.Getenv("LOG_LEVEL") + if loglevel == "" { + loglevel = "warn" + } + + err := pkglogsetup.SetupLogger( + pkglogsetup.LoggerName("tcp"), + loglevel, + "", + "", + false, + true, + false, + pkgconfigsetup.Datadog(), + ) + if err != nil { + fmt.Printf("SetupLogger failed: %s\n", err) + os.Exit(1) + } + + if len(os.Args) < 2 { + println("Usage: portable_syn ") + os.Exit(1) + } + target := netip.MustParseAddrPort(os.Args[1]) + compatibilityMode := os.Getenv("COMPAT") == "true" + + cfg := tcp.NewTCPv4(target.Addr().AsSlice(), target.Port(), 1, 1, 30, 10*time.Millisecond, 1*time.Second, compatibilityMode) + + params := tcp.Params{ + LoosenICMPSrc: true, + } + + results, err := cfg.Traceroute(params) + if err != nil { + fmt.Printf("Traceroute failed: %s\n", err) + os.Exit(1) + } + json, err := json.MarshalIndent(results, "", " ") + if err != nil { + fmt.Printf("Error marshalling results: %s\n", err) + os.Exit(1) + } + println(string(json)) +} diff --git a/pkg/networkpath/traceroute/tcp/tcp_driver.go b/pkg/networkpath/traceroute/tcp/tcp_driver.go new file mode 100644 index 000000000000..5ead4e3b2af9 --- /dev/null +++ b/pkg/networkpath/traceroute/tcp/tcp_driver.go @@ -0,0 +1,316 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2025-present Datadog, Inc. + +package tcp + +import ( + "errors" + "fmt" + "math/rand" + "net" + "net/netip" + "sync" + "time" + + "github.com/DataDog/datadog-agent/pkg/networkpath/traceroute/common" + "github.com/DataDog/datadog-agent/pkg/networkpath/traceroute/packets" + "github.com/DataDog/datadog-agent/pkg/util/log" + "github.com/google/gopacket/layers" +) + +// Params is extra TCP parameters specific to the tcpDriver +// TODO combine this with TCPv4? +type Params struct { + // LoosenICMPSrc disables checking the source IP/port in ICMP payloads when enabled. + // Reason: Some environments don't properly translate the payload of an ICMP TTL exceeded + // packet meaning you can't trust the source address to correspond to your own private IP. + LoosenICMPSrc bool +} + +type probeData struct { + sendTime time.Time + ttl uint8 + packetID uint16 + seqNum uint32 +} + +type tcpDriver struct { + config *TCPv4 + params Params + + sink packets.Sink + + source packets.Source + buffer []byte + parser *packets.FrameParser + + // mu guards against concurrent use of sentProbes + mu sync.Mutex + sentProbes []probeData + + // if CompatibilityMode is enabled, we randomize the packet ID starting from this base + basePacketID uint16 + // if CompatibilityMode is enabled, we store a single seqNum for the duration of the traceroute + seqNum uint32 + + portReservation net.Listener +} + +var _ common.TracerouteDriver = &tcpDriver{} + +func newTCPDriver(config *TCPv4, params Params, sink packets.Sink, source packets.Source) (*tcpDriver, error) { + var basePacketID uint16 + var seqNum uint32 + if config.CompatibilityMode { + basePacketID = uint16(rand.Uint32()) + seqNum = rand.Uint32() + } + + addr, conn, err := common.LocalAddrForHost(config.Target, config.DestPort) + if err != nil { + return nil, fmt.Errorf("failed to get local address for target: %w", err) + } + conn.Close() // we don't need the UDP port here + config.srcIP = addr.IP + + port, tcpListener, err := reserveLocalPort() + if err != nil { + return nil, fmt.Errorf("failed to create TCP listener: %w", err) + } + config.srcPort = port + + return &tcpDriver{ + config: config, + params: params, + + sink: sink, + + source: source, + buffer: make([]byte, 1024), + parser: packets.NewFrameParser(), + + sentProbes: nil, + + basePacketID: basePacketID, + seqNum: seqNum, + + portReservation: tcpListener, + }, nil +} + +func (t *tcpDriver) storeProbe(probeData probeData) { + t.mu.Lock() + defer t.mu.Unlock() + t.sentProbes = append(t.sentProbes, probeData) +} + +func (t *tcpDriver) findMatchingProbe(packetID *uint16, seqNum *uint32) probeData { + t.mu.Lock() + defer t.mu.Unlock() + for _, probe := range t.sentProbes { + if packetID != nil && probe.packetID != *packetID { + continue + } + if seqNum != nil && probe.seqNum != *seqNum { + continue + } + return probe + } + return probeData{} +} +func (t *tcpDriver) getLastSentProbe() (probeData, error) { + t.mu.Lock() + defer t.mu.Unlock() + if t.GetDriverInfo().SupportsParallel { + return probeData{}, fmt.Errorf("getLastSentProbe is only possible in a serial traceroute") + } + + if len(t.sentProbes) == 0 { + return probeData{}, fmt.Errorf("getLastSentProbe was called before we sent anything") + } + return t.sentProbes[len(t.sentProbes)-1], nil +} + +func (t *tcpDriver) getLocalAddrPort() netip.AddrPort { + addr, _ := common.UnmappedAddrFromSlice(t.config.srcIP) + return netip.AddrPortFrom(addr, t.config.srcPort) + +} + +func (t *tcpDriver) getTargetAddrPort() netip.AddrPort { + addr, _ := common.UnmappedAddrFromSlice(t.config.Target) + return netip.AddrPortFrom(addr, t.config.DestPort) +} + +// GetDriverInfo returns metadata about this driver +func (t *tcpDriver) GetDriverInfo() common.TracerouteDriverInfo { + return common.TracerouteDriverInfo{ + // in compatibility mode, we can't support parallel + SupportsParallel: !t.config.CompatibilityMode, + } +} + +func (t *tcpDriver) getNextPacketIDAndSeqNum(ttl uint8) (uint16, uint32) { + if t.config.CompatibilityMode { + return t.basePacketID + uint16(ttl), t.seqNum + } + return 41821, rand.Uint32() +} + +// SendProbe sends a traceroute packet with a specific TTL +func (t *tcpDriver) SendProbe(ttl uint8) error { + packetID, seqNum := t.getNextPacketIDAndSeqNum(ttl) + _, buffer, _, err := t.config.createRawTCPSynBuffer(packetID, seqNum, int(ttl)) + if err != nil { + return fmt.Errorf("tcpDriver SendProbe failed to createRawTCPSynBuffer: %w", err) + } + + log.Tracef("sending probe with ttl=%d, packetID=%d, seqNum=%d", ttl, packetID, seqNum) + t.storeProbe(probeData{ + sendTime: time.Now(), + ttl: ttl, + packetID: packetID, + seqNum: seqNum, + }) + + err = t.sink.WriteTo(buffer, t.getTargetAddrPort()) + if err != nil { + return fmt.Errorf("tcpDriver SendProbe failed to write packet: %w", err) + } + return nil +} + +// ReceiveProbe polls to get a traceroute response with a timeout. +func (t *tcpDriver) ReceiveProbe(timeout time.Duration) (*common.ProbeResponse, error) { + err := t.source.SetReadDeadline(time.Now().Add(timeout)) + if err != nil { + return nil, fmt.Errorf("tcpDriver failed to SetReadDeadline: %w", err) + } + + err = packets.ReadAndParse(t.source, t.buffer, t.parser) + if err != nil { + return nil, err + } + + return t.handleProbeLayers(t.parser) +} + +func (t *tcpDriver) ExpectedIPPair() packets.IPPair { + // from the target to us + return packets.IPPair{ + SrcAddr: t.getTargetAddrPort().Addr(), + DstAddr: t.getLocalAddrPort().Addr(), + } +} + +var errPacketDidNotMatchTraceroute = &common.ReceiveProbeNoPktError{Err: fmt.Errorf("packet did not match the traceroute")} + +func (t *tcpDriver) handleProbeLayers(parser *packets.FrameParser) (*common.ProbeResponse, error) { + ipPair, err := parser.GetIPPair() + if err != nil { + return nil, fmt.Errorf("tcpDriver failed to get IP pair: %w", err) + } + + var probe probeData + var isDest bool + + switch parser.GetTransportLayer() { + case layers.LayerTypeTCP: + // we only care about SYNACK + if !(parser.TCP.SYN && parser.TCP.ACK) { + return nil, errPacketDidNotMatchTraceroute + } + log.Tracef("saw synack") + + if ipPair != t.ExpectedIPPair() { + return nil, errPacketDidNotMatchTraceroute + } + log.Tracef("ports?") + // make sure the ports match + if t.config.DestPort != uint16(parser.TCP.SrcPort) || + t.config.srcPort != uint16(parser.TCP.DstPort) { + return nil, errPacketDidNotMatchTraceroute + } + + expectedSeq := parser.TCP.Ack - 1 + + log.Tracef("time to go") + if t.config.CompatibilityMode { + // in mode, traceroute is serialized and all we can do is use the most recent probe + lastProbe, err := t.getLastSentProbe() + if err != nil { + return nil, fmt.Errorf("tcpDriver handleProbeLayers failed to getLastSentProbe: %w", err) + } + log.Tracef("last probe %+v", lastProbe) + if lastProbe.seqNum == expectedSeq { + probe = lastProbe + } + } else { + // find the probe with the matching seq number + probe = t.findMatchingProbe(nil, &expectedSeq) + } + + if probe == (probeData{}) { + log.Warnf("got synack but couldn't find probe matching seqNum=%d", expectedSeq) + + } + isDest = true + case layers.LayerTypeICMPv4: + log.Tracef("saw icmp") + icmpInfo, err := parser.GetICMPInfo() + if err != nil { + return nil, &common.BadPacketError{Err: fmt.Errorf("sackDriver failed to get ICMP info: %w", err)} + } + if icmpInfo.ICMPType != packets.TTLExceeded4 { + return nil, errPacketDidNotMatchTraceroute + } + + // make sure the source/destination match + tcpInfo, err := packets.ParseTCPFirstBytes(icmpInfo.Payload) + if err != nil { + return nil, &common.BadPacketError{Err: fmt.Errorf("sackDriver failed to parse TCP info: %w", err)} + } + + icmpDst := netip.AddrPortFrom(icmpInfo.ICMPPair.DstAddr, tcpInfo.DstPort) + if icmpDst != t.getTargetAddrPort() { + return nil, errPacketDidNotMatchTraceroute + } + if !t.params.LoosenICMPSrc { + icmpSrc := netip.AddrPortFrom(icmpInfo.IPPair.SrcAddr, tcpInfo.SrcPort) + expectedSrc := t.getLocalAddrPort() + if icmpSrc != expectedSrc { + log.Tracef("icmp src mismatch. expected: %s actual: %s", expectedSrc, icmpSrc) + return nil, errPacketDidNotMatchTraceroute + } + } + + probe = t.findMatchingProbe(&icmpInfo.WrappedPacketID, &tcpInfo.Seq) + if probe == (probeData{}) { + log.Warnf("couldn't find probe matching packetID=%d and seqNum=%d", icmpInfo.WrappedPacketID, tcpInfo.Seq) + } + default: + return nil, errPacketDidNotMatchTraceroute + } + + if probe == (probeData{}) { + return nil, errPacketDidNotMatchTraceroute + } + rtt := time.Since(probe.sendTime) + + return &common.ProbeResponse{ + TTL: probe.ttl, + IP: ipPair.SrcAddr, + RTT: rtt, + IsDest: isDest, + }, nil +} + +// Close closes the tcpDriver +func (t *tcpDriver) Close() error { + sinkErr := t.sink.Close() + sourceErr := t.source.Close() + portErr := t.portReservation.Close() + return errors.Join(sinkErr, sourceErr, portErr) +} diff --git a/pkg/networkpath/traceroute/tcp/tcp_traceroute.go b/pkg/networkpath/traceroute/tcp/tcp_traceroute.go new file mode 100644 index 000000000000..657ef49dc055 --- /dev/null +++ b/pkg/networkpath/traceroute/tcp/tcp_traceroute.go @@ -0,0 +1,65 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2025-present Datadog, Inc. + +package tcp + +import ( + "context" + "fmt" + "time" + + "github.com/DataDog/datadog-agent/pkg/networkpath/traceroute/common" +) + +// Traceroute runs a TCP traceroute +func (t *TCPv4) Traceroute(driverParams Params) (*common.Results, error) { + driver, err := t.getTracerouteDriver(driverParams) + if err != nil { + return nil, err + } + + commonParams := common.TracerouteParams{ + MinTTL: t.MinTTL, + MaxTTL: t.MaxTTL, + TracerouteTimeout: t.Timeout, + PollFrequency: 100 * time.Millisecond, + SendDelay: t.Delay, + } + var resp []*common.ProbeResponse + if t.CompatibilityMode { + params := common.TracerouteSerialParams{ + TracerouteParams: commonParams, + } + resp, err = common.TracerouteSerial(context.Background(), driver, params) + if err != nil { + return nil, err + } + } else { + params := common.TracerouteParallelParams{ + TracerouteParams: commonParams, + } + ctx := context.Background() + resp, err = common.TracerouteParallel(ctx, driver, params) + if err != nil { + return nil, err + } + } + + hops, err := common.ToHops(commonParams, resp) + if err != nil { + return nil, fmt.Errorf("SYN traceroute ToHops failed: %w", err) + } + + result := &common.Results{ + Source: t.srcIP, + SourcePort: t.srcPort, + Target: t.Target, + DstPort: t.DestPort, + Hops: hops, + Tags: []string{"tcp_method:syn", fmt.Sprintf("tcp_compatibility_mode:%t", t.CompatibilityMode)}, + } + + return result, nil +} diff --git a/pkg/networkpath/traceroute/tcp/tcp_traceroute_unix.go b/pkg/networkpath/traceroute/tcp/tcp_traceroute_unix.go new file mode 100644 index 000000000000..8788df451947 --- /dev/null +++ b/pkg/networkpath/traceroute/tcp/tcp_traceroute_unix.go @@ -0,0 +1,39 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2025-present Datadog, Inc. + +package tcp + +import ( + "fmt" + + "github.com/DataDog/datadog-agent/pkg/networkpath/traceroute/common" + "github.com/DataDog/datadog-agent/pkg/networkpath/traceroute/packets" +) + +func (t *TCPv4) getTracerouteDriver(params Params) (*tcpDriver, error) { + targetAddr, ok := common.UnmappedAddrFromSlice(t.Target) + if !ok { + return nil, fmt.Errorf("failed to get netipAddr for target %s", t.Target) + } + t.Target = targetAddr.AsSlice() + + sink, err := packets.NewSinkUnix(targetAddr) + if err != nil { + return nil, fmt.Errorf("Traceroute failed to make SinkUnix: %w", err) + } + + source, err := packets.NewAFPacketSource() + if err != nil { + sink.Close() + return nil, fmt.Errorf("Traceroute failed to make AFPacketSource: %w", err) + } + + driver, err := newTCPDriver(t, params, sink, source) + if err != nil { + return nil, fmt.Errorf("getTracerouteDriver failed to make newTCPDriver: %w", err) + } + + return driver, nil +} diff --git a/pkg/networkpath/traceroute/tcp/tcpv4_unix.go b/pkg/networkpath/traceroute/tcp/tcpv4_unix.go index 7527d0e4a94a..89e4c652999a 100644 --- a/pkg/networkpath/traceroute/tcp/tcpv4_unix.go +++ b/pkg/networkpath/traceroute/tcp/tcpv4_unix.go @@ -19,7 +19,7 @@ import ( "golang.org/x/net/ipv4" "github.com/DataDog/datadog-agent/pkg/networkpath/traceroute/common" - "github.com/DataDog/datadog-agent/pkg/networkpath/traceroute/filter" + "github.com/DataDog/datadog-agent/pkg/networkpath/traceroute/packets" "github.com/DataDog/datadog-agent/pkg/util/log" ) @@ -66,7 +66,7 @@ func (t *TCPv4) TracerouteSequential() (*common.Results, error) { t.srcPort = port // create a socket filter for TCP to reduce CPU usage - filterCfg := filter.TCP4FilterConfig{ + filterCfg := packets.TCPFilterConfig{ Src: netip.AddrPortFrom(targetAddr, t.DestPort), Dst: netip.AddrPortFrom(localAddr, port), } @@ -76,17 +76,17 @@ func (t *TCPv4) TracerouteSequential() (*common.Results, error) { } tcpLc := &net.ListenConfig{ Control: func(_network, _address string, c syscall.RawConn) error { - return filter.SetBPFAndDrain(c, filterProg) + return packets.SetBPFAndDrain(c, filterProg) }, } log.Tracef("filtered on: %+v", filterCfg) // create raw sockets to listen to TCP and ICMP - rawIcmpConn, err := filter.MakeRawConn(context.Background(), &net.ListenConfig{}, "ip4:icmp", localAddr) + rawIcmpConn, err := packets.MakeRawConn(context.Background(), &net.ListenConfig{}, "ip4:icmp", localAddr) if err != nil { return nil, fmt.Errorf("failed to make ICMP raw socket: %w", err) } - rawTCPConn, err := filter.MakeRawConn(context.Background(), tcpLc, "ip4:tcp", localAddr) + rawTCPConn, err := packets.MakeRawConn(context.Background(), tcpLc, "ip4:tcp", localAddr) if err != nil { return nil, fmt.Errorf("failed to make TCP raw socket: %w", err) } diff --git a/tasks/system_probe.py b/tasks/system_probe.py index eca4b795b13f..90447f8a0809 100644 --- a/tasks/system_probe.py +++ b/tasks/system_probe.py @@ -58,7 +58,7 @@ "./pkg/gpu/...", "./pkg/system-probe/config/...", "./comp/metadata/inventoryagent/...", - "./pkg/networkpath/traceroute/filter/...", + "./pkg/networkpath/traceroute/packets/...", ] TEST_PACKAGES = " ".join(TEST_PACKAGES_LIST) # change `timeouts` in `test/new-e2e/system-probe/test-runner/main.go` if you change them here