Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ The following emojis are used to highlight certain changes:

### Fixed

- `bitswap/network/bsnet`: stop marking a peer unresponsive on a single failed send attempt. `send()` is retried by `multiAttempt()`, which already marks the peer once all retries are exhausted; marking on the first failure could permanently sideline a peer that had just reconnected (the disconnect notification being suppressed), hanging fetches from it until it fully disconnected. [#1164](https://github.com/ipfs/boxo/pull/1164)

### Security

- `tracing`: bumped OpenTelemetry OTLP exporters to [v1.43.0](https://github.com/open-telemetry/opentelemetry-go/releases/tag/v1.43.0), which caps the HTTP exporter's response body at 4 MiB. A hostile or man-in-the-middle collector could otherwise exhaust its memory ([CVE-2026-39882](https://github.com/open-telemetry/opentelemetry-go/security/advisories/GHSA-w8rr-5gcm-pp58)). The gRPC exporter is unaffected.
Expand Down
2 changes: 0 additions & 2 deletions bitswap/network/bsnet/ipfs_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,6 @@ func (s *streamMessageSender) send(ctx context.Context, msg bsmsg.BitSwapMessage
stream, err := s.Connect(ctx)
if err != nil {
log.Infof("failed to open stream to %s: %s", s.to, err)
s.bsnet.connectEvtMgr.MarkUnresponsive(s.to)
Comment thread
gammazero marked this conversation as resolved.
return err
}

Expand All @@ -254,7 +253,6 @@ func (s *streamMessageSender) send(ctx context.Context, msg bsmsg.BitSwapMessage
timeout := s.opts.SendTimeout - time.Since(start)
if err = s.bsnet.msgToStream(ctx, stream, msg, timeout); err != nil {
log.Infof("failed to send message to %s: %s", s.to, err)
s.bsnet.connectEvtMgr.MarkUnresponsive(s.to)
return err
}

Expand Down
113 changes: 113 additions & 0 deletions bitswap/network/bsnet/ipfs_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,119 @@ func TestMessageResendAfterError(t *testing.T) {
}
}

// TestMessageSendErrorDoesNotMarkUnresponsive checks that a single recoverable
// send error does not mark the peer unresponsive. send() is retried by
// multiAttempt(), which only marks the peer once all retries are exhausted;
// marking on the first failure could permanently sideline a peer whose
// connection is in fact still live (e.g. right after a reconnect, where the
// disconnect notification is suppressed), with no recovery path.
// Regression test for https://github.com/ipfs/boxo/issues/1163.
func TestMessageSendErrorDoesNotMarkUnresponsive(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

p1 := tnet.RandIdentityOrFatal(t)
r1 := newReceiver()
p2 := tnet.RandIdentityOrFatal(t)
r2 := newReceiver()

eh, bsnet1, _, _, msg := prepareNetwork(t, ctx, p1, r1, p2, r2)

testSendErrorBackoff := 100 * time.Millisecond
ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &network.MessageSenderOpts{
MaxRetries: 3,
SendTimeout: 100 * time.Millisecond,
SendErrorBackoff: testSendErrorBackoff,
})
if err != nil {
t.Fatal(err)
}
defer ms.Reset()

// Fail the next send, then stop failing so the retry succeeds.
eh.setError(errMockNetErr)
go func() {
time.Sleep(testSendErrorBackoff / 2)
eh.setError(nil)
}()

if err = ms.SendMsg(ctx, msg); err != nil {
t.Fatal(err)
}

select {
case <-ctx.Done():
t.Fatal("did not receive message sent")
case <-r2.messageReceived:
}

// The single, recovered send error must not have marked the peer
// unresponsive. Marking it would emit a PeerDisconnected with no
// recovery (the peer stays connected, so no further event arrives),
// leaving the peer dropped from the receiver's set. Wait for the
// connection state to settle, then assert the peer is still connected.
require.Eventually(t, func() bool {
r1.mu.Lock()
defer r1.mu.Unlock()
_, ok := r1.peers[p2.ID()]
return ok
}, time.Second, 20*time.Millisecond, "peer was marked unresponsive after a single recoverable send error")

// Give a late erroneous PeerDisconnected a chance to land, and confirm
// the peer remains connected.
time.Sleep(300 * time.Millisecond)
r1.mu.Lock()
_, stillConnected := r1.peers[p2.ID()]
r1.mu.Unlock()
if !stillConnected {
t.Fatal("peer was marked unresponsive after a single recoverable send error")
}
}
Comment thread
gammazero marked this conversation as resolved.

// TestMessageSendErrorMarksUnresponsive checks that a peer whose sends keep
// failing is eventually marked unresponsive once multiAttempt() exhausts all
// retries, emitting a PeerDisconnected. This is the complement of
// TestMessageSendErrorDoesNotMarkUnresponsive: a single recoverable error must
// not mark the peer, but a persistently failing one must.
func TestMessageSendErrorMarksUnresponsive(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

p1 := tnet.RandIdentityOrFatal(t)
r1 := newReceiver()
p2 := tnet.RandIdentityOrFatal(t)
r2 := newReceiver()

eh, bsnet1, _, _, msg := prepareNetwork(t, ctx, p1, r1, p2, r2)

ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &network.MessageSenderOpts{
MaxRetries: 3,
SendTimeout: 100 * time.Millisecond,
SendErrorBackoff: 10 * time.Millisecond,
})
if err != nil {
t.Fatal(err)
}
defer ms.Reset()

// Keep failing every send so all retries are exhausted.
eh.setError(errMockNetErr)

// SendMsg must fail once retries are exhausted.
if err = ms.SendMsg(ctx, msg); err == nil {
t.Fatal("expected SendMsg to fail after exhausting retries")
}

// The exhausted retries must have marked the peer unresponsive, which
// emits a PeerDisconnected, dropping it from the receiver's set.
require.Eventually(t, func() bool {
r1.mu.Lock()
defer r1.mu.Unlock()
_, ok := r1.peers[p2.ID()]
return !ok
}, time.Second, 20*time.Millisecond, "unresponsive peer was not disconnected after exhausting retries")
}

func TestMessageSendTimeout(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
Expand Down
Loading