Skip to content

Commit bb93e71

Browse files
Add a packet deduplicator and tests using a bitset window
1 parent 118cd9f commit bb93e71

File tree

5 files changed

+362
-50
lines changed

5 files changed

+362
-50
lines changed

src/extension/network/NUClearNetwork.cpp

Lines changed: 30 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -675,39 +675,35 @@ namespace extension {
675675
// Check if this packet is a retransmission of data
676676
if (header.type == DATA_RETRANSMISSION) {
677677

678-
// See if we recently processed this packet
679-
// NOLINTNEXTLINE(readability-qualified-auto) MSVC disagrees
680-
auto it = std::find(remote->recent_packets.begin(),
681-
remote->recent_packets.end(),
682-
packet.packet_id);
683-
684678
// We recently processed this packet, this is just a failed ack
685-
// Send the ack again if it was reliable
686-
if (it != remote->recent_packets.end() && packet.reliable) {
679+
if (remote->deduplicator.is_duplicate(packet.packet_id)) {
687680

688-
// Allocate room for the whole ack packet
689-
std::vector<uint8_t> r(sizeof(ACKPacket) + (packet.packet_count / 8), 0);
690-
ACKPacket& response = *reinterpret_cast<ACKPacket*>(r.data());
691-
response = ACKPacket();
692-
response.packet_id = packet.packet_id;
693-
response.packet_no = packet.packet_no;
694-
response.packet_count = packet.packet_count;
681+
// Send the ack again if it was reliable
682+
if (packet.reliable) {
683+
// Allocate room for the whole ack packet
684+
std::vector<uint8_t> r(sizeof(ACKPacket) + (packet.packet_count / 8), 0);
685+
ACKPacket& response = *reinterpret_cast<ACKPacket*>(r.data());
686+
response = ACKPacket();
687+
response.packet_id = packet.packet_id;
688+
response.packet_no = packet.packet_no;
689+
response.packet_count = packet.packet_count;
695690

696-
// Set the bits for all packets (we got the whole thing)
697-
for (int i = 0; i < packet.packet_count; ++i) {
698-
(&response.packets)[i / 8] |= uint8_t(1 << (i % 8));
699-
}
691+
// Set the bits for all packets (we got the whole thing)
692+
for (int i = 0; i < packet.packet_count; ++i) {
693+
(&response.packets)[i / 8] |= uint8_t(1 << (i % 8));
694+
}
700695

701-
// Make who we are sending it to into a useable address
702-
const sock_t& to = remote->target;
696+
// Make who we are sending it to into a useable address
697+
const sock_t& to = remote->target;
703698

704-
// Send the packet
705-
::sendto(data_fd,
706-
reinterpret_cast<const char*>(r.data()),
707-
static_cast<socklen_t>(r.size()),
708-
0,
709-
&to.sock,
710-
to.size());
699+
// Send the packet
700+
::sendto(data_fd,
701+
reinterpret_cast<const char*>(r.data()),
702+
static_cast<socklen_t>(r.size()),
703+
0,
704+
&to.sock,
705+
to.size());
706+
}
711707

712708
// We don't need to process this packet we already did
713709
return;
@@ -739,13 +735,11 @@ namespace extension {
739735
0,
740736
&to.sock,
741737
to.size());
742-
743-
// Set this packet to have been recently received
744-
remote->recent_packets[remote->recent_packets_index
745-
.fetch_add(1, std::memory_order_relaxed)] =
746-
packet.packet_id;
747738
}
748739

740+
// Add the packet to our deduplicator
741+
remote->deduplicator.add_packet(packet.packet_id);
742+
749743
packet_callback(*remote, packet.hash, packet.reliable, std::move(out));
750744
}
751745
else {
@@ -851,17 +845,12 @@ namespace extension {
851845
&part.data + p.second.size() - sizeof(DataPacket) + 1);
852846
}
853847

848+
// Add the packet to our deduplicator
849+
remote->deduplicator.add_packet(packet.packet_id);
850+
854851
// Send our assembled data packet
855852
packet_callback(*remote, packet.hash, packet.reliable, std::move(out));
856853

857-
// If the packet was reliable add that it was recently received
858-
if (packet.reliable) {
859-
// Set this packet to have been recently received
860-
remote->recent_packets[remote->recent_packets_index
861-
.fetch_add(1, std::memory_order_relaxed)] =
862-
packet.packet_id;
863-
}
864-
865854
// We have completed this packet, discard the data
866855
assemblers.erase(assemblers.find(packet.packet_id));
867856
}

src/extension/network/NUClearNetwork.hpp

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939

4040
#include "../../util/network/sock_t.hpp"
4141
#include "../../util/platform.hpp"
42+
#include "PacketDeduplicator.hpp"
4243
#include "RTTEstimator.hpp"
4344
#include "wire_protocol.hpp"
4445

@@ -57,22 +58,14 @@ namespace extension {
5758
std::string name,
5859
const sock_t& target,
5960
const std::chrono::steady_clock::time_point& last_update = std::chrono::steady_clock::now())
60-
: name(std::move(name)), target(target), last_update(last_update) {
61-
62-
// Set our recent packets to an invalid value
63-
recent_packets.fill(-1);
64-
}
61+
: name(std::move(name)), target(target), last_update(last_update) {}
6562

6663
/// The name of the remote target
6764
std::string name;
6865
/// The socket address for the remote target
6966
sock_t target{};
7067
/// When we last received data from the remote target
7168
std::chrono::steady_clock::time_point last_update;
72-
/// A list of the last n packet groups to be received
73-
std::array<int, std::numeric_limits<uint8_t>::max()> recent_packets{};
74-
/// An index for the recent_packets (circular buffer)
75-
std::atomic<uint8_t> recent_packets_index{0};
7669
/// Mutex to protect the fragmented packet storage
7770
std::mutex assemblers_mutex;
7871
/// Storage for fragmented packets while we build them
@@ -82,6 +75,9 @@ namespace extension {
8275

8376
/// RTT estimator for this network target
8477
RTTEstimator rtt;
78+
79+
/// Packet deduplicator for this network target
80+
PacketDeduplicator deduplicator;
8581
};
8682

8783
NUClearNetwork() = default;
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* MIT License
3+
*
4+
* Copyright (c) 2025 NUClear Contributors
5+
*
6+
* This file is part of the NUClear codebase.
7+
* See https://github.com/Fastcode/NUClear for further info.
8+
*
9+
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
10+
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
11+
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
12+
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
13+
*
14+
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
15+
* Software.
16+
*
17+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
18+
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
19+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
20+
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21+
*/
22+
#include "PacketDeduplicator.hpp"
23+
24+
namespace NUClear {
25+
namespace extension {
26+
namespace network {
27+
28+
PacketDeduplicator::PacketDeduplicator() : newest_seen(0) {}
29+
30+
bool PacketDeduplicator::is_duplicate(uint16_t packet_id) const {
31+
// If we haven't seen any packets yet, nothing is a duplicate
32+
if (!initialized) {
33+
return false;
34+
}
35+
36+
// Calculate relative position in window using unsigned subtraction
37+
uint16_t relative_id = newest_seen - packet_id;
38+
39+
// If the packet is too old or too new, it's not a duplicate
40+
if (relative_id >= 256) {
41+
return false;
42+
}
43+
44+
return window[relative_id];
45+
}
46+
47+
void PacketDeduplicator::add_packet(uint16_t packet_id) {
48+
// If this is our first packet, just set it as newest_seen
49+
if (!initialized) {
50+
newest_seen = packet_id;
51+
window[0] = true;
52+
initialized = true;
53+
return;
54+
}
55+
56+
// Calculate relative position in window using unsigned subtraction
57+
uint16_t relative_id = newest_seen - packet_id;
58+
59+
// If the distance is more than half the range, the packet is newer than our newest_seen
60+
if (relative_id > 32768) {
61+
// Calculate how far to shift to make this packet our newest
62+
uint16_t shift_amount = packet_id - newest_seen;
63+
window <<= shift_amount;
64+
newest_seen = packet_id;
65+
window[0] = true;
66+
}
67+
// Packet is recent enough to be counted
68+
else if (relative_id < 256) {
69+
window[relative_id] = true;
70+
}
71+
}
72+
73+
} // namespace network
74+
} // namespace extension
75+
} // namespace NUClear
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* MIT License
3+
*
4+
* Copyright (c) 2025 NUClear Contributors
5+
*
6+
* This file is part of the NUClear codebase.
7+
* See https://github.com/Fastcode/NUClear for further info.
8+
*
9+
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
10+
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
11+
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
12+
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
13+
*
14+
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
15+
* Software.
16+
*
17+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
18+
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
19+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
20+
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21+
*/
22+
#ifndef NUCLEAR_EXTENSION_NETWORK_PACKET_DEDUPLICATOR_HPP
23+
#define NUCLEAR_EXTENSION_NETWORK_PACKET_DEDUPLICATOR_HPP
24+
25+
#include <bitset>
26+
#include <cstdint>
27+
28+
namespace NUClear {
29+
namespace extension {
30+
namespace network {
31+
32+
/**
33+
* A class that implements a sliding window bitset for packet deduplication.
34+
* Maintains a 256-bit window of recently seen packet IDs, sliding forward as new packets are added.
35+
*/
36+
class PacketDeduplicator {
37+
public:
38+
PacketDeduplicator();
39+
40+
/**
41+
* Check if a packet ID has been seen recently
42+
*
43+
* @param packet_id The packet ID to check
44+
*
45+
* @return true if the packet has been seen recently, false otherwise
46+
*/
47+
bool is_duplicate(uint16_t packet_id) const;
48+
49+
/**
50+
* Add a packet ID to the window
51+
*
52+
* @param packet_id The packet ID to add
53+
*/
54+
void add_packet(uint16_t packet_id);
55+
56+
private:
57+
/// Whether we've seen any packets yet
58+
bool initialized{false};
59+
/// The newest packet ID we've seen
60+
uint16_t newest_seen;
61+
/// The 256-bit window of seen packets (newest at 0, older at higher indices)
62+
std::bitset<256> window;
63+
};
64+
65+
} // namespace network
66+
} // namespace extension
67+
} // namespace NUClear
68+
69+
#endif // NUCLEAR_EXTENSION_NETWORK_PACKET_DEDUPLICATOR_HPP

0 commit comments

Comments
 (0)