Skip to content

Commit ba5a953

Browse files
committed
validate flow output nfstream and cicflowmeter
1 parent bf4f5da commit ba5a953

5 files changed

Lines changed: 163 additions & 142 deletions

File tree

rustiflow/src/flows/cic_flow.rs

Lines changed: 90 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -133,18 +133,18 @@ impl Flow for CicFlow {
133133
self.basic_flow.get_first_timestamp(),
134134
self.basic_flow.get_flow_duration_usec(),
135135
// Packet Length Stats (fwd & bwd)
136-
self.packet_len_stats.fwd_packet_len.get_count(),
137-
self.packet_len_stats.bwd_packet_len.get_count(),
138-
self.packet_len_stats.fwd_packet_len.get_total(),
139-
self.packet_len_stats.bwd_packet_len.get_total(),
140-
self.packet_len_stats.fwd_packet_len.get_max(),
141-
self.packet_len_stats.fwd_packet_len.get_min(),
142-
self.packet_len_stats.fwd_packet_len.get_mean(),
143-
self.packet_len_stats.fwd_packet_len.get_std(),
144-
self.packet_len_stats.bwd_packet_len.get_max(),
145-
self.packet_len_stats.bwd_packet_len.get_min(),
146-
self.packet_len_stats.bwd_packet_len.get_mean(),
147-
self.packet_len_stats.bwd_packet_len.get_std(),
136+
self.payload_len_stats.fwd_payload_len.get_count(),
137+
self.payload_len_stats.bwd_payload_len.get_count(),
138+
self.payload_len_stats.fwd_payload_len.get_total(),
139+
self.payload_len_stats.bwd_payload_len.get_total(),
140+
self.payload_len_stats.fwd_payload_len.get_max(),
141+
self.payload_len_stats.fwd_payload_len.get_min(),
142+
self.payload_len_stats.fwd_payload_len.get_mean(),
143+
self.payload_len_stats.fwd_payload_len.get_std(),
144+
self.payload_len_stats.bwd_payload_len.get_max(),
145+
self.payload_len_stats.bwd_payload_len.get_min(),
146+
self.payload_len_stats.bwd_payload_len.get_mean(),
147+
self.payload_len_stats.bwd_payload_len.get_std(),
148148
// Rate Stats (Flow)
149149
safe_per_second_rate(
150150
self.packet_len_stats.flow_total(),
@@ -154,21 +154,21 @@ impl Flow for CicFlow {
154154
self.packet_len_stats.flow_count() as f64,
155155
self.basic_flow.get_flow_duration_usec() as f64
156156
),
157-
// IAT Stats
158-
self.iat_stats.iat.get_mean(),
159-
self.iat_stats.iat.get_std(),
160-
self.iat_stats.iat.get_max(),
161-
self.iat_stats.iat.get_min(),
162-
self.iat_stats.fwd_iat.get_total(),
163-
self.iat_stats.fwd_iat.get_mean(),
164-
self.iat_stats.fwd_iat.get_std(),
165-
self.iat_stats.fwd_iat.get_max(),
166-
self.iat_stats.fwd_iat.get_min(),
167-
self.iat_stats.bwd_iat.get_total(),
168-
self.iat_stats.bwd_iat.get_mean(),
169-
self.iat_stats.bwd_iat.get_std(),
170-
self.iat_stats.bwd_iat.get_max(),
171-
self.iat_stats.bwd_iat.get_min(),
157+
// IAT Stats in us instead of ms
158+
self.iat_stats.iat.get_mean() * 1000.0,
159+
self.iat_stats.iat.get_std() * 1000.0,
160+
self.iat_stats.iat.get_max() * 1000.0,
161+
self.iat_stats.iat.get_min() * 1000.0,
162+
self.iat_stats.fwd_iat.get_total() * 1000.0,
163+
self.iat_stats.fwd_iat.get_mean() * 1000.0,
164+
self.iat_stats.fwd_iat.get_std() * 1000.0,
165+
self.iat_stats.fwd_iat.get_max() * 1000.0,
166+
self.iat_stats.fwd_iat.get_min() * 1000.0,
167+
self.iat_stats.bwd_iat.get_total() * 1000.0,
168+
self.iat_stats.bwd_iat.get_mean() * 1000.0,
169+
self.iat_stats.bwd_iat.get_std() * 1000.0,
170+
self.iat_stats.bwd_iat.get_max() * 1000.0,
171+
self.iat_stats.bwd_iat.get_min() * 1000.0,
172172
// TCP Flags Stats (fwd & bwd)
173173
self.tcp_flags_stats.fwd_psh_flag_count,
174174
self.tcp_flags_stats.bwd_psh_flag_count,
@@ -188,12 +188,12 @@ impl Flow for CicFlow {
188188
self.packet_len_stats.bwd_packet_len.get_count() as f64,
189189
self.basic_flow.get_flow_duration_usec() as f64
190190
),
191-
// Packet Length Stats (Flow)
192-
self.packet_len_stats.flow_min(),
193-
self.packet_len_stats.flow_max(),
194-
self.packet_len_stats.flow_mean(),
195-
self.packet_len_stats.flow_std(),
196-
self.packet_len_stats.flow_variance(),
191+
// Payload Length Stats (Flow)
192+
self.payload_len_stats.payload_len.get_min(),
193+
self.payload_len_stats.payload_len.get_max(),
194+
self.payload_len_stats.payload_len.get_mean(),
195+
self.payload_len_stats.payload_len.get_std(),
196+
self.payload_len_stats.payload_len.get_std().powi(2),
197197
// TCP Flags Stats (Flow)
198198
self.tcp_flags_stats.fwd_fin_flag_count + self.tcp_flags_stats.bwd_fin_flag_count,
199199
self.tcp_flags_stats.fwd_syn_flag_count + self.tcp_flags_stats.bwd_syn_flag_count,
@@ -221,19 +221,19 @@ impl Flow for CicFlow {
221221
self.bulk_stats.bwd_bulk_rate(),
222222
// Subflow Stats
223223
safe_div_int(
224-
self.packet_len_stats.fwd_packet_len.get_count(),
224+
self.payload_len_stats.fwd_payload_len.get_count(),
225225
self.subflow_stats.subflow_count
226226
),
227227
safe_div(
228-
self.packet_len_stats.fwd_packet_len.get_total(),
228+
self.payload_len_stats.fwd_payload_len.get_total(),
229229
self.subflow_stats.subflow_count as f64
230230
),
231231
safe_div_int(
232-
self.packet_len_stats.bwd_packet_len.get_count(),
232+
self.payload_len_stats.bwd_payload_len.get_count(),
233233
self.subflow_stats.subflow_count
234234
),
235235
safe_div(
236-
self.packet_len_stats.bwd_packet_len.get_total(),
236+
self.payload_len_stats.bwd_payload_len.get_total(),
237237
self.subflow_stats.subflow_count as f64
238238
),
239239
// Window Size Stats
@@ -384,18 +384,18 @@ impl Flow for CicFlow {
384384
self.basic_flow.protocol,
385385
self.basic_flow.get_flow_duration_usec(),
386386
// Packet Length Stats (fwd & bwd)
387-
self.packet_len_stats.fwd_packet_len.get_count(),
388-
self.packet_len_stats.bwd_packet_len.get_count(),
389-
self.packet_len_stats.fwd_packet_len.get_total(),
390-
self.packet_len_stats.bwd_packet_len.get_total(),
391-
self.packet_len_stats.fwd_packet_len.get_max(),
392-
self.packet_len_stats.fwd_packet_len.get_min(),
393-
self.packet_len_stats.fwd_packet_len.get_mean(),
394-
self.packet_len_stats.fwd_packet_len.get_std(),
395-
self.packet_len_stats.bwd_packet_len.get_max(),
396-
self.packet_len_stats.bwd_packet_len.get_min(),
397-
self.packet_len_stats.bwd_packet_len.get_mean(),
398-
self.packet_len_stats.bwd_packet_len.get_std(),
387+
self.payload_len_stats.fwd_payload_len.get_count(),
388+
self.payload_len_stats.bwd_payload_len.get_count(),
389+
self.payload_len_stats.fwd_payload_len.get_total(),
390+
self.payload_len_stats.bwd_payload_len.get_total(),
391+
self.payload_len_stats.fwd_payload_len.get_max(),
392+
self.payload_len_stats.fwd_payload_len.get_min(),
393+
self.payload_len_stats.fwd_payload_len.get_mean(),
394+
self.payload_len_stats.fwd_payload_len.get_std(),
395+
self.payload_len_stats.bwd_payload_len.get_max(),
396+
self.payload_len_stats.bwd_payload_len.get_min(),
397+
self.payload_len_stats.bwd_payload_len.get_mean(),
398+
self.payload_len_stats.bwd_payload_len.get_std(),
399399
// Rate Stats (Flow)
400400
safe_per_second_rate(
401401
self.packet_len_stats.flow_total(),
@@ -405,21 +405,21 @@ impl Flow for CicFlow {
405405
self.packet_len_stats.flow_count() as f64,
406406
self.basic_flow.get_flow_duration_usec() as f64
407407
),
408-
// IAT Stats
409-
self.iat_stats.iat.get_mean(),
410-
self.iat_stats.iat.get_std(),
411-
self.iat_stats.iat.get_max(),
412-
self.iat_stats.iat.get_min(),
413-
self.iat_stats.fwd_iat.get_total(),
414-
self.iat_stats.fwd_iat.get_mean(),
415-
self.iat_stats.fwd_iat.get_std(),
416-
self.iat_stats.fwd_iat.get_max(),
417-
self.iat_stats.fwd_iat.get_min(),
418-
self.iat_stats.bwd_iat.get_total(),
419-
self.iat_stats.bwd_iat.get_mean(),
420-
self.iat_stats.bwd_iat.get_std(),
421-
self.iat_stats.bwd_iat.get_max(),
422-
self.iat_stats.bwd_iat.get_min(),
408+
// IAT Stats in us instead of ms
409+
self.iat_stats.iat.get_mean() * 1000.0,
410+
self.iat_stats.iat.get_std() * 1000.0,
411+
self.iat_stats.iat.get_max() * 1000.0,
412+
self.iat_stats.iat.get_min() * 1000.0,
413+
self.iat_stats.fwd_iat.get_total() * 1000.0,
414+
self.iat_stats.fwd_iat.get_mean() * 1000.0,
415+
self.iat_stats.fwd_iat.get_std() * 1000.0,
416+
self.iat_stats.fwd_iat.get_max() * 1000.0,
417+
self.iat_stats.fwd_iat.get_min() * 1000.0,
418+
self.iat_stats.bwd_iat.get_total() * 1000.0,
419+
self.iat_stats.bwd_iat.get_mean() * 1000.0,
420+
self.iat_stats.bwd_iat.get_std() * 1000.0,
421+
self.iat_stats.bwd_iat.get_max() * 1000.0,
422+
self.iat_stats.bwd_iat.get_min() * 1000.0,
423423
// TCP Flags Stats (fwd & bwd)
424424
self.tcp_flags_stats.fwd_psh_flag_count,
425425
self.tcp_flags_stats.bwd_psh_flag_count,
@@ -439,12 +439,12 @@ impl Flow for CicFlow {
439439
self.packet_len_stats.bwd_packet_len.get_count() as f64,
440440
self.basic_flow.get_flow_duration_usec() as f64
441441
),
442-
// Packet Length Stats (Flow)
443-
self.packet_len_stats.flow_min(),
444-
self.packet_len_stats.flow_max(),
445-
self.packet_len_stats.flow_mean(),
446-
self.packet_len_stats.flow_std(),
447-
self.packet_len_stats.flow_variance(),
442+
// Payload Length Stats (Flow)
443+
self.payload_len_stats.payload_len.get_min(),
444+
self.payload_len_stats.payload_len.get_max(),
445+
self.payload_len_stats.payload_len.get_mean(),
446+
self.payload_len_stats.payload_len.get_std(),
447+
self.payload_len_stats.payload_len.get_std().powi(2),
448448
// TCP Flags Stats (Flow)
449449
self.tcp_flags_stats.fwd_fin_flag_count + self.tcp_flags_stats.bwd_fin_flag_count,
450450
self.tcp_flags_stats.fwd_syn_flag_count + self.tcp_flags_stats.bwd_syn_flag_count,
@@ -455,8 +455,10 @@ impl Flow for CicFlow {
455455
self.tcp_flags_stats.fwd_cwr_flag_count + self.tcp_flags_stats.bwd_cwr_flag_count,
456456
self.tcp_flags_stats.fwd_ece_flag_count + self.tcp_flags_stats.bwd_ece_flag_count,
457457
// UP/DOWN Ratio
458-
self.packet_len_stats.bwd_packet_len.get_count() as f64
459-
/ self.packet_len_stats.fwd_packet_len.get_count() as f64,
458+
safe_div_int(
459+
self.packet_len_stats.bwd_packet_len.get_count(),
460+
self.packet_len_stats.fwd_packet_len.get_count()
461+
),
460462
// Payload Length Stats
461463
self.payload_len_stats.payload_len.get_mean(),
462464
self.payload_len_stats.fwd_payload_len.get_mean(),
@@ -469,14 +471,22 @@ impl Flow for CicFlow {
469471
self.bulk_stats.bwd_bulk_packets.get_mean(),
470472
self.bulk_stats.bwd_bulk_rate(),
471473
// Subflow Stats
472-
self.packet_len_stats.fwd_packet_len.get_count() as f64
473-
/ self.subflow_stats.subflow_count as f64,
474-
self.packet_len_stats.fwd_packet_len.get_total()
475-
/ self.subflow_stats.subflow_count as f64,
476-
self.packet_len_stats.bwd_packet_len.get_count() as f64
477-
/ self.subflow_stats.subflow_count as f64,
478-
self.packet_len_stats.bwd_packet_len.get_total()
479-
/ self.subflow_stats.subflow_count as f64,
474+
safe_div_int(
475+
self.payload_len_stats.fwd_payload_len.get_count(),
476+
self.subflow_stats.subflow_count
477+
),
478+
safe_div(
479+
self.payload_len_stats.fwd_payload_len.get_total(),
480+
self.subflow_stats.subflow_count as f64
481+
),
482+
safe_div_int(
483+
self.payload_len_stats.bwd_payload_len.get_count(),
484+
self.subflow_stats.subflow_count
485+
),
486+
safe_div(
487+
self.payload_len_stats.bwd_payload_len.get_total(),
488+
self.subflow_stats.subflow_count as f64
489+
),
480490
// Window Size Stats
481491
self.window_size_stats.fwd_init_window_size,
482492
self.window_size_stats.bwd_init_window_size,

rustiflow/src/flows/features/icmp_stats.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ impl IcmpStats {
1818
}
1919
}
2020

21-
pub fn get_type(&self) -> u8 {
22-
self.icmp_type.unwrap_or(0)
21+
pub fn get_type(&self) -> i16 {
22+
self.icmp_type.map(|v| v as i16).unwrap_or(-1)
2323
}
2424

25-
pub fn get_code(&self) -> u8 {
26-
self.icmp_code.unwrap_or(0)
25+
pub fn get_code(&self) -> i16 {
26+
self.icmp_code.map(|v| v as i16).unwrap_or(-1)
2727
}
2828
}
2929

rustiflow/src/flows/features/retransmission_stats.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1+
use std::collections::hash_map::DefaultHasher;
12
use std::collections::HashSet;
3+
use std::hash::{Hash, Hasher};
4+
5+
use pnet::packet::ip::IpNextHeaderProtocols;
26

37
use crate::{flows::util::FlowExpireCause, packet_features::PacketFeatures};
48

@@ -29,13 +33,25 @@ impl RetransmissionStats {
2933
impl FlowFeature for RetransmissionStats {
3034
fn update(&mut self, packet: &PacketFeatures, is_forward: bool, _last_timestamp_us: i64) {
3135
let seq = packet.sequence_number;
36+
let ack = packet.sequence_number_ack;
37+
38+
if packet.protocol != IpNextHeaderProtocols::Icmp.0
39+
|| packet.protocol == IpNextHeaderProtocols::Icmpv6.0
40+
{
41+
// Skip ICMP packets
42+
return;
43+
}
44+
45+
let mut hasher = DefaultHasher::new();
46+
(seq, ack).hash(&mut hasher);
47+
let hash = hasher.finish();
3248

3349
if is_forward {
34-
if !self.fwd_seen_seqs.insert(seq) {
50+
if !self.fwd_seen_seqs.insert(hash as u32) {
3551
self.fwd_retransmission_count += 1;
3652
}
3753
} else {
38-
if !self.bwd_seen_seqs.insert(seq) {
54+
if !self.bwd_seen_seqs.insert(hash as u32) {
3955
self.bwd_retransmission_count += 1;
4056
}
4157
}

rustiflow/src/flows/features/timing_stats.rs

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,32 +4,34 @@ use super::util::FlowFeature;
44

55
#[derive(Clone)]
66
pub struct TimingStats {
7-
pub first_timestamp_fwd: Option<i64>,
8-
pub first_timestamp_bwd: Option<i64>,
9-
pub last_timestamp_fwd: Option<i64>,
10-
pub last_timestamp_bwd: Option<i64>,
7+
pub first_timestamp_fwd_ms: Option<i64>,
8+
pub first_timestamp_bwd_ms: Option<i64>,
9+
pub last_timestamp_fwd_ms: Option<i64>,
10+
pub last_timestamp_bwd_ms: Option<i64>,
1111
}
1212

1313
impl TimingStats {
1414
pub fn new() -> Self {
1515
TimingStats {
16-
first_timestamp_fwd: None,
17-
first_timestamp_bwd: None,
18-
last_timestamp_fwd: None,
19-
last_timestamp_bwd: None,
16+
first_timestamp_fwd_ms: None,
17+
first_timestamp_bwd_ms: None,
18+
last_timestamp_fwd_ms: None,
19+
last_timestamp_bwd_ms: None,
2020
}
2121
}
2222

2323
pub fn get_fwd_duration(&self) -> i64 {
24-
if let (Some(first), Some(last)) = (self.first_timestamp_fwd, self.last_timestamp_fwd) {
24+
if let (Some(first), Some(last)) = (self.first_timestamp_fwd_ms, self.last_timestamp_fwd_ms)
25+
{
2526
last - first
2627
} else {
2728
0
2829
}
2930
}
3031

3132
pub fn get_bwd_duration(&self) -> i64 {
32-
if let (Some(first), Some(last)) = (self.first_timestamp_bwd, self.last_timestamp_bwd) {
33+
if let (Some(first), Some(last)) = (self.first_timestamp_bwd_ms, self.last_timestamp_bwd_ms)
34+
{
3335
last - first
3436
} else {
3537
0
@@ -41,15 +43,15 @@ impl FlowFeature for TimingStats {
4143
fn update(&mut self, packet: &PacketFeatures, is_forward: bool, _last_timestamp_us: i64) {
4244
let current_ts = packet.timestamp_us / 1000;
4345
if is_forward {
44-
if self.first_timestamp_fwd.is_none() {
45-
self.first_timestamp_fwd = Some(current_ts);
46+
if self.first_timestamp_fwd_ms.is_none() {
47+
self.first_timestamp_fwd_ms = Some(current_ts);
4648
}
47-
self.last_timestamp_fwd = Some(current_ts);
49+
self.last_timestamp_fwd_ms = Some(current_ts);
4850
} else {
49-
if self.first_timestamp_bwd.is_none() {
50-
self.first_timestamp_bwd = Some(current_ts);
51+
if self.first_timestamp_bwd_ms.is_none() {
52+
self.first_timestamp_bwd_ms = Some(current_ts);
5153
}
52-
self.last_timestamp_bwd = Some(current_ts);
54+
self.last_timestamp_bwd_ms = Some(current_ts);
5355
}
5456
}
5557

@@ -60,10 +62,10 @@ impl FlowFeature for TimingStats {
6062
fn dump(&self) -> String {
6163
format!(
6264
"{},{},{},{},{},{}",
63-
self.first_timestamp_fwd.map(|t| t as i64).unwrap_or(0),
64-
self.first_timestamp_bwd.map(|t| t as i64).unwrap_or(0),
65-
self.last_timestamp_fwd.map(|t| t as i64).unwrap_or(0),
66-
self.last_timestamp_bwd.map(|t| t as i64).unwrap_or(0),
65+
self.first_timestamp_fwd_ms.unwrap_or(0),
66+
self.first_timestamp_bwd_ms.unwrap_or(0),
67+
self.last_timestamp_fwd_ms.unwrap_or(0),
68+
self.last_timestamp_bwd_ms.unwrap_or(0),
6769
self.get_fwd_duration(),
6870
self.get_bwd_duration()
6971
)

0 commit comments

Comments
 (0)