-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathreassembler.cc
More file actions
112 lines (94 loc) · 3.58 KB
/
reassembler.cc
File metadata and controls
112 lines (94 loc) · 3.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#include "reassembler.hh"
#include "debug.hh"
#include <cassert>
using namespace std;
void Reassembler::insert( uint64_t first_index, string data, bool is_last_substring )
{
// Mark first_close_index_
if ( is_last_substring ) {
first_close_index_ = first_index + data.size();
is_closed_ = true;
}
uint64_t first_unacceptable_index = first_unassemebled_index_ + output_.writer().available_capacity();
// Segment index range: [first_index, end_index)
uint64_t end_index = first_index + data.size();
// Do nothing if the entire new string is after first_unacceptable_index or first_close_index_
// or is before first_unassemebled_index_
if ( first_index >= first_unacceptable_index || ( is_closed_ && first_index > first_close_index_ ) || end_index < first_unassemebled_index_ ) {
return;
}
// Remove assembled data
if ( first_index < first_unassemebled_index_ && data.size() > 0 ) {
data = data.substr(first_unassemebled_index_ - first_index);
first_index = first_unassemebled_index_;
end_index = first_index + data.size();
}
bool is_handled = data.size() == 0 ? true : false ; // whether the new string is totally handled by the existing segment(s)
list<Segment>::iterator it = segments_.begin();
for ( ; !is_handled && it != segments_.end(); ++it ) {
uint64_t seg_end_index = it->first_index + it->data.size();
// the string is not overlap with this segment
// and before this segment
// and not able to merge
if ( end_index < it->first_index ) {
break;
}
// merge the new string's front part: [first_index, it->first_index)
if ( first_index < it->first_index && end_index >= it->first_index ) {
it->data = data.substr(0, it->first_index - first_index) + it->data;
it->first_index = first_index;
assert(seg_end_index == it->first_index + it->data.size());
}
// totally overlap with this segment after merge
if ( it->first_index <= first_index && end_index <= seg_end_index ) {
is_handled = true;
break;
}
// Trim the new string to [seg_end_index, end_index)
// if first_index < seg_end_index
if ( first_index < seg_end_index ) {
data = data.substr(seg_end_index - first_index);
first_index = seg_end_index;
}
}
if ( !is_handled ) {
// Discard the bytes beyond the stream's available capacity
// or discard the bytes start from first_close_index_
if (is_closed_) {
first_unacceptable_index = min(first_close_index_, first_unacceptable_index);
}
if (end_index > first_unacceptable_index) {
data = data.substr(0, data.size() - (end_index - first_unacceptable_index));
}
if (data.size() > 0) {
segments_.insert(it, Segment(first_index, data, false));
}
}
// Push data
for ( auto &seg : segments_ ) {
if ( seg.first_index == first_unassemebled_index_ ) {
output_.writer().push(seg.data);
uint64_t seg_end_index = seg.first_index + seg.data.size();
first_unassemebled_index_ = seg_end_index;
seg.pushed = true;
} else {
break;
}
}
// Clean up the pushed segments
segments_.remove_if([](Segment seg){ return seg.pushed; });
// Close byte stream if all bytes are pushed
if ( is_closed_ && first_unassemebled_index_ == first_close_index_ ) {
output_.writer().close();
}
}
// Number of bytes currently buffered inside the Reassembler itself, i.e. the
// combined length of the data held by every entry of `segments_`.
// Intended for tests only; no additional state should be kept to support it.
uint64_t Reassembler::count_bytes_pending() const
{
  uint64_t pending_bytes = 0;
  for ( auto seg_it = segments_.cbegin(); seg_it != segments_.cend(); ++seg_it ) {
    pending_bytes += seg_it->data.size();
  }
  return pending_bytes;
}