diff --git a/barrage/barrage.go b/barrage/barrage.go index f1599b7..ba332e5 100644 --- a/barrage/barrage.go +++ b/barrage/barrage.go @@ -40,9 +40,6 @@ func worker(id int, ctx context.Context, server string, port int, srcRange strin BytesSent: 0, } - startTime := time.Now().UnixNano() - limiter := time.Tick(time.Millisecond * time.Duration(delay)) - // Configure connection to use. It looks like a listener, but it will be used to send packet. Allows setting the source port. srcPort := utils.RandomNum(sourcePortMin, sourcePortMax) @@ -78,35 +75,35 @@ func worker(id int, ctx context.Context, server string, port int, srcRange strin log.Printf("%s [%2d] Slinging packets at %s:%d with Source ID: %5d and delay of %dms\n", label, id, server, port, sourceID, delay) - // Infinite loop to keep slinging until we receive context done. - takeAction := false + + // Data limiter throttles flow packet generation. + dataLimiter := time.NewTicker(time.Millisecond * time.Duration(delay)) + defer dataLimiter.Stop() + + // Template retransmission ticker — fires every templateInterval seconds. + // When templateInterval is 0, no ticker is created so templates are never retransmitted. + var tmplTicker *time.Ticker + if templateInterval > 0 { + tmplTicker = time.NewTicker(time.Duration(templateInterval) * time.Second) + defer tmplTicker.Stop() + } for { - now := time.Now().UnixNano() - cycle := (now - startTime) / int64(time.Second) % int64(templateInterval) - if cycle == 0 { - if takeAction { - takeAction = false - // Retransmit template every templateInterval seconds - bytes, err := utils.SendPacket(conn, &net.UDPAddr{IP: destIP, Port: port}, tBuf, false) - if err != nil { - log.Printf("%s [%2d] Issue sending template packet: %v", label, id, err) - return - } - wStats.FlowsSent++ - wStats.BytesSent += uint64(bytes) - statsChan <- wStats - } - } else { - takeAction = true - } select { - case <-ctx.Done(): // Caught the signal to be done.... time to wrap it up + case <-ctx.Done(): log.Printf("%s [%2d] Exiting due to signal\n", label, id) return - default: - // Basic limiter to throttle/delay packets - <-limiter + case <-tmplTicker.C: + // Retransmit template every templateInterval seconds + bytes, err := utils.SendPacket(conn, &net.UDPAddr{IP: destIP, Port: port}, tBuf, false) + if err != nil { + log.Printf("%s [%2d] Issue sending template packet: %v", label, id, err) + return + } + wStats.FlowsSent++ + wStats.BytesSent += uint64(bytes) + statsChan <- wStats + case <-dataLimiter.C: flowCount := utils.RandomNum(5, 25) buf := gen.GenerateData(flowCount, sourceID, srcRange, dstRange, session) bytes, err := utils.SendPacket(conn, &net.UDPAddr{IP: destIP, Port: port}, buf, false) diff --git a/ipfix/ipfix.go b/ipfix/ipfix.go index 35608b7..36bc278 100644 --- a/ipfix/ipfix.go +++ b/ipfix/ipfix.go @@ -23,30 +23,6 @@ import ( // IPFIX version number per RFC 7011. const Version = 10 -// Port constants (aliases for backwards compatibility within this package) -const ( - httpsPort = utils.HTTPSPort - sshPort = utils.SSHPort - ftpPort = utils.FTPPort - dnsPort = utils.DNSPort - httpPort = utils.HTTPPort - ntpPort = utils.NTPPort - snmpPort = utils.SNMPPort - imapsPort = utils.IMAPSPort - mysqlPort = utils.MySQLPort - httpAltPort = utils.HTTPAltPort - httpsAltPort = utils.HTTPSAltPort - p2pPort = utils.P2PPort - btPort = utils.BTPort -) - -// Protocol constants (aliases for backwards compatibility within this package) -const ( - tcpProto = utils.TCPProto - udpProto = utils.UDPProto - icmpProto = utils.ICMPProto -) - // IANA IPFIX field type constants (RFC 7011 / Information Model) const ( ProtocolIdentifier = 4 @@ -342,50 +318,7 @@ func (gf *GenericFlow) Generate(srcIP net.IP, dstIP net.IP, flowSrcPort int, ses gf.IPClassOfService = 0 gf.FlowEndReason = uint8(utils.RandomNum(0, 4)) // 0=active, 1=idle, 2=other, 3=exporterReset, 4=exporterShutdown - switch flowSrcPort { - case sshPort: - gf.DestPort = uint16(sshPort) - gf.ProtocolIdentifier = tcpProto - case ftpPort: - gf.DestPort = uint16(ftpPort) - gf.ProtocolIdentifier = tcpProto - case dnsPort: - gf.DestPort = uint16(dnsPort) - gf.ProtocolIdentifier = udpProto - case httpPort: - gf.DestPort = uint16(httpPort) - gf.ProtocolIdentifier = tcpProto - case httpsPort: - gf.DestPort = uint16(httpsPort) - gf.ProtocolIdentifier = tcpProto - case ntpPort: - gf.DestPort = uint16(ntpPort) - gf.ProtocolIdentifier = udpProto - case snmpPort: - gf.DestPort = uint16(snmpPort) - gf.ProtocolIdentifier = udpProto - case imapsPort: - gf.DestPort = uint16(imapsPort) - gf.ProtocolIdentifier = tcpProto - case mysqlPort: - gf.DestPort = uint16(mysqlPort) - gf.ProtocolIdentifier = tcpProto - case httpAltPort: - gf.DestPort = uint16(httpAltPort) - gf.ProtocolIdentifier = tcpProto - case httpsAltPort: - gf.DestPort = uint16(httpsAltPort) - gf.ProtocolIdentifier = tcpProto - case p2pPort: - gf.DestPort = uint16(p2pPort) - gf.ProtocolIdentifier = tcpProto - case btPort: - gf.DestPort = uint16(btPort) - gf.ProtocolIdentifier = tcpProto - default: - gf.DestPort = uint16(httpsPort) - gf.ProtocolIdentifier = tcpProto - } + gf.DestPort, gf.ProtocolIdentifier = utils.ResolvePortProtocol(flowSrcPort) return *gf } @@ -406,7 +339,7 @@ type DataFlowSet struct { func (d *DataFlowSet) Generate(flowCount int, srcRange string, dstRange string, flowSrcPort int, session *netflow.Session, profile ...IPFIXFlowProfile) DataFlowSet { _ = profile // reserved for future profile-aware data generation - protoPorts := []int{21, 22, 53, 80, 443, 123, 161, 993, 3306, 8080, 8443, 6681, 6682} + protoPorts := utils.ProtoPorts items := make([]DataAny, flowCount) for i := range flowCount { @@ -672,7 +605,7 @@ func GenerateDataIPFIX(flowCount int, sourceID int, srcRange string, dstRange st // GenerateIPFIX creates an IPFIX packet containing both template and data FlowSets. func GenerateIPFIX(flowCount int, sourceID int, srcRange string, dstRange string, session *netflow.Session) IPFIX { templateFlow := new(TemplateFlowSet).Generate(session) - dataFlow := new(DataFlowSet).Generate(flowCount, srcRange, dstRange, httpsPort, session) + dataFlow := new(DataFlowSet).Generate(flowCount, srcRange, dstRange, utils.HTTPSPort, session) header := new(Header).Generate(flowCount+1, sourceID, session) return IPFIX{ Header: header, diff --git a/ipfix/ipfix_test.go b/ipfix/ipfix_test.go index 77cf0ee..6a801a6 100644 --- a/ipfix/ipfix_test.go +++ b/ipfix/ipfix_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/dmabry/flowgre/netflow" + "github.com/dmabry/flowgre/utils" "github.com/google/go-cmp/cmp" ) @@ -75,7 +76,7 @@ func TestGenerateDataIPFIX(t *testing.T) { flowCount := 10 sourceID := 618 session := netflow.NewSession() - flow := GenerateDataIPFIX(flowCount, sourceID, "10.0.0.0/8", "10.0.0.0/8", httpsPort, session) + flow := GenerateDataIPFIX(flowCount, sourceID, "10.0.0.0/8", "10.0.0.0/8", utils.HTTPSPort, session) if len(flow.DataFlowSets) < 1 { t.Fatal("No data flowsets generated") @@ -122,7 +123,7 @@ func TestToBytes_RoundTrip(t *testing.T) { session := netflow.NewSession() tFlow := GenerateTemplateIPFIX(sourceID, session) - dFlow := GenerateDataIPFIX(flowCount, sourceID, "10.0.0.0/8", "10.0.0.0/8", httpsPort, session) + dFlow := GenerateDataIPFIX(flowCount, sourceID, "10.0.0.0/8", "10.0.0.0/8", utils.HTTPSPort, session) tBuf := tFlow.ToBytes() dBuf := dFlow.ToBytes() @@ -418,7 +419,7 @@ func TestToBytes_BufferLengthMatchesFlowSetLengths(t *testing.T) { } // Test data-only packet - dFlow := GenerateDataIPFIX(flowCount, sourceID, "10.0.0.0/8", "10.0.0.0/8", httpsPort, session) + dFlow := GenerateDataIPFIX(flowCount, sourceID, "10.0.0.0/8", "10.0.0.0/8", utils.HTTPSPort, session) dBuf := dFlow.ToBytes() expectedDLen := binary.Size(dFlow.Header) for _, fs := range dFlow.DataFlowSets { @@ -500,7 +501,7 @@ func TestGenericFlowIPv6(t *testing.T) { dstIP := net.ParseIP("2001:db8::2") session := netflow.NewSession() - result := new(GenericFlow).Generate(srcIP, dstIP, httpsPort, session) + result := new(GenericFlow).Generate(srcIP, dstIP, utils.HTTPSPort, session) // IPv4 fields should be zeroed if result.SourceIPv4Addr != 0 { @@ -538,7 +539,7 @@ func TestGenericFlowIPv4_ZerosIPv6(t *testing.T) { dstIP := net.ParseIP("10.0.0.2") session := netflow.NewSession() - result := new(GenericFlow).Generate(srcIP, dstIP, httpsPort, session) + result := new(GenericFlow).Generate(srcIP, dstIP, utils.HTTPSPort, session) // IPv4 fields should be populated if result.SourceIPv4Addr == 0 { @@ -568,7 +569,7 @@ func TestGenerateDataIPFIX_IPv6(t *testing.T) { flowCount := 10 sourceID := 618 session := netflow.NewSession() - flow := GenerateDataIPFIX(flowCount, sourceID, "2001:db8::/32", "2001:db8::/32", httpsPort, session) + flow := GenerateDataIPFIX(flowCount, sourceID, "2001:db8::/32", "2001:db8::/32", utils.HTTPSPort, session) if len(flow.DataFlowSets) < 1 { t.Fatal("No data flowsets generated") @@ -633,7 +634,7 @@ func TestToBytes_IPv6_RoundTrip(t *testing.T) { flowCount := 10 session := netflow.NewSession() - dFlow := GenerateDataIPFIX(flowCount, sourceID, "2001:db8::/32", "2001:db8::/32", httpsPort, session) + dFlow := GenerateDataIPFIX(flowCount, sourceID, "2001:db8::/32", "2001:db8::/32", utils.HTTPSPort, session) dBuf := dFlow.ToBytes() // Read back @@ -698,7 +699,7 @@ func TestToBytes_IPv6_BufferLengthMatchesFlowSetLengths(t *testing.T) { flowCount := 10 session := netflow.NewSession() - dFlow := GenerateDataIPFIX(flowCount, sourceID, "2001:db8::/32", "2001:db8::/32", httpsPort, session) + dFlow := GenerateDataIPFIX(flowCount, sourceID, "2001:db8::/32", "2001:db8::/32", utils.HTTPSPort, session) dBuf := dFlow.ToBytes() expectedDLen := binary.Size(dFlow.Header) for _, fs := range dFlow.DataFlowSets { diff --git a/netflow/dataflowset.go b/netflow/dataflowset.go index 9449988..011b726 100644 --- a/netflow/dataflowset.go +++ b/netflow/dataflowset.go @@ -38,7 +38,7 @@ func (d *DataFlowSet) Generate(flowCount int, srcRange string, dstRange string, dataFlowSet := new(DataFlowSet) dataFlowSet.FlowSetID = 256 - protoPorts := [13]int{21, 22, 53, 80, 443, 123, 161, 993, 3306, 8080, 8443, 6681, 6682} + protoPorts := utils.ProtoPorts items := make([]DataAny, flowCount) for i := range flowCount { srcIP, err := utils.RandomIPCIDR(srcRange) @@ -51,7 +51,7 @@ func (d *DataFlowSet) Generate(flowCount int, srcRange string, dstRange string, } var flowPort int if flowSrcPort == 0 { - flowPort = protoPorts[utils.RandomNum(0, 12)] + flowPort = protoPorts[utils.RandomNum(0, len(protoPorts))] } else { flowPort = flowSrcPort } diff --git a/netflow/flow.go b/netflow/flow.go index b7d1d8a..74af281 100644 --- a/netflow/flow.go +++ b/netflow/flow.go @@ -10,37 +10,6 @@ import ( "github.com/dmabry/flowgre/utils" ) -// Port constants (aliases for backwards compatibility within this package) -const ( - ftpPort = utils.FTPPort - sshPort = utils.SSHPort - dnsPort = utils.DNSPort - httpPort = utils.HTTPPort - httpsPort = utils.HTTPSPort - ntpPort = utils.NTPPort - snmpPort = utils.SNMPPort - imapsPort = utils.IMAPSPort - mysqlPort = utils.MySQLPort - httpAltPort = utils.HTTPAltPort - httpsAltPort = utils.HTTPSAltPort - p2pPort = utils.P2PPort - btPort = utils.BTPort -) - -// Protocol constants (aliases for backwards compatibility within this package) -const ( - tcpProto = utils.TCPProto - udpProto = utils.UDPProto - icmpProto = utils.ICMPProto - sctpProto = utils.SCTPProto - igmpProto = utils.IGMPProto - egpProto = utils.EGPProto - igpProto = utils.IGPProto - greProto = utils.GREProto - espProto = utils.ESPProto - eigrpProto = utils.EIGRPProto -) - // Constants for Field Types const ( IN_BYTES = 1 @@ -222,50 +191,7 @@ func (gf *GenericFlow) Generate(srcIP net.IP, dstIP net.IP, flowSrcPort int, ses gf.EngineType = 0 gf.EngineID = 0 - switch flowSrcPort { - case sshPort: - gf.L4DstPort = uint16(sshPort) - gf.Protocol = uint8(tcpProto) - case ftpPort: - gf.L4DstPort = uint16(ftpPort) - gf.Protocol = uint8(tcpProto) - case dnsPort: - gf.L4DstPort = uint16(dnsPort) - gf.Protocol = uint8(udpProto) - case httpPort: - gf.L4DstPort = uint16(httpPort) - gf.Protocol = uint8(tcpProto) - case httpsPort: - gf.L4DstPort = uint16(httpsPort) - gf.Protocol = uint8(tcpProto) - case ntpPort: - gf.L4DstPort = uint16(ntpPort) - gf.Protocol = uint8(udpProto) - case snmpPort: - gf.L4DstPort = uint16(snmpPort) - gf.Protocol = uint8(udpProto) - case imapsPort: - gf.L4DstPort = uint16(imapsPort) - gf.Protocol = uint8(tcpProto) - case mysqlPort: - gf.L4DstPort = uint16(mysqlPort) - gf.Protocol = uint8(tcpProto) - case httpAltPort: - gf.L4DstPort = uint16(httpAltPort) - gf.Protocol = uint8(tcpProto) - case httpsAltPort: - gf.L4DstPort = uint16(httpsAltPort) - gf.Protocol = uint8(tcpProto) - case p2pPort: - gf.L4DstPort = uint16(p2pPort) - gf.Protocol = uint8(tcpProto) - case btPort: - gf.L4DstPort = uint16(btPort) - gf.Protocol = uint8(tcpProto) - default: - gf.L4DstPort = uint16(httpsPort) - gf.Protocol = uint8(tcpProto) - } + gf.L4DstPort, gf.Protocol = utils.ResolvePortProtocol(flowSrcPort) return *gf } diff --git a/netflow/netflow.go b/netflow/netflow.go index e6551d9..a6384f5 100644 --- a/netflow/netflow.go +++ b/netflow/netflow.go @@ -10,12 +10,14 @@ import ( "encoding/binary" "fmt" "time" + + "github.com/dmabry/flowgre/utils" ) func GenerateNetflow(flowCount int, sourceID int, srcRange string, dstRange string, session *Session, profile ...FlowProfile) Netflow { netflow := new(Netflow) templateFlow := new(TemplateFlowSet).Generate(session, profile...) - dataFlow := new(DataFlowSet).Generate(flowCount, srcRange, dstRange, httpsPort, session, profile...) + dataFlow := new(DataFlowSet).Generate(flowCount, srcRange, dstRange, utils.HTTPSPort, session, profile...) header := new(Header).Generate(flowCount+1, sourceID, session) // always +1 of dataflow count, because we are counting the template netflow.Header = header netflow.TemplateFlowSets = append(netflow.TemplateFlowSets, templateFlow) diff --git a/netflow/netflow_test.go b/netflow/netflow_test.go index 5a51f31..52db39d 100644 --- a/netflow/netflow_test.go +++ b/netflow/netflow_test.go @@ -6,10 +6,11 @@ package netflow import ( "bytes" "encoding/binary" - "github.com/google/go-cmp/cmp" "net" "testing" + "github.com/google/go-cmp/cmp" + "github.com/dmabry/flowgre/utils" ) @@ -36,7 +37,8 @@ func TestHeader_Generate(t *testing.T) { t.Errorf("Header returned the wrong flow sequence! Got: %d Want: value", header.FlowSequence) } if header.SourceID != uint32(sourceID) { - t.Errorf("Header returned the wrong source id! Got: %d Want: %d", header.SourceID, sourceID) + t.Errorf("Header returned the wrong source id! Got: %d Want: %d", + header.SourceID, sourceID) } } @@ -64,7 +66,7 @@ func TestGenerateDataNetflow(t *testing.T) { flowcount := 10 sourceID := 618 session := NewSession() - flow := GenerateDataNetflow(flowcount, sourceID, "10.0.0.0/8", "10.0.0.0/8", httpsPort, session) + flow := GenerateDataNetflow(flowcount, sourceID, "10.0.0.0/8", "10.0.0.0/8", utils.HTTPSPort, session) if len(flow.DataFlowSets) < 1 { t.Errorf("Returned incorrect number of Data Flows! Got: %d Want >: %d", len(flow.DataFlowSets), 1) @@ -90,7 +92,7 @@ func TestToBytes(t *testing.T) { flowcount := 10 session := NewSession() tflow := GenerateTemplateNetflow(sourceID, session) - dflow := GenerateDataNetflow(flowcount, sourceID, "10.0.0.0/8", "10.0.0.0/8", httpsPort, session) + dflow := GenerateDataNetflow(flowcount, sourceID, "10.0.0.0/8", "10.0.0.0/8", utils.HTTPSPort, session) // Convert to Bytes tbuf := tflow.ToBytes() dbuf := dflow.ToBytes() @@ -228,7 +230,7 @@ func TestGenericFlowIPv6(t *testing.T) { session := NewSession() gf := new(GenericFlow) - result := gf.Generate(srcIP, dstIP, httpsPort, session) + result := gf.Generate(srcIP, dstIP, utils.HTTPSPort, session) // IPv4 fields should be zeroed if result.Ipv4SrcAddr != 0 { @@ -267,7 +269,7 @@ func TestGenericFlowIPv4BackwardCompat(t *testing.T) { session := NewSession() gf := new(GenericFlow) - result := gf.Generate(srcIP, dstIP, httpsPort, session) + result := gf.Generate(srcIP, dstIP, utils.HTTPSPort, session) // IPv4 fields should be populated if result.Ipv4SrcAddr != utils.IPToNum(srcIP) { @@ -334,7 +336,7 @@ func TestIPv6DataNetflow(t *testing.T) { flowcount := 10 sourceID := 618 session := NewSession() - flow := GenerateDataNetflow(flowcount, sourceID, "2001:db8::/48", "2001:db8:1::/48", httpsPort, session) + flow := GenerateDataNetflow(flowcount, sourceID, "2001:db8::/48", "2001:db8:1::/48", utils.HTTPSPort, session) if len(flow.DataFlowSets) < 1 { t.Fatal("expected at least one data flow set") @@ -369,7 +371,7 @@ func TestIPv6ToBytesRoundTrip(t *testing.T) { flowcount := 5 session := NewSession() tflow := GenerateTemplateNetflow(sourceID, session) - dflow := GenerateDataNetflow(flowcount, sourceID, "2001:db8::/48", "2001:db8:1::/48", httpsPort, session) + dflow := GenerateDataNetflow(flowcount, sourceID, "2001:db8::/48", "2001:db8:1::/48", utils.HTTPSPort, session) // Serialize and deserialize tbuf := tflow.ToBytes() diff --git a/netflow/profile_coverage_test.go b/netflow/profile_coverage_test.go index 48cebdd..b4567d5 100644 --- a/netflow/profile_coverage_test.go +++ b/netflow/profile_coverage_test.go @@ -6,6 +6,8 @@ package netflow import ( "net" "testing" + + "github.com/dmabry/flowgre/utils" ) func TestMinimalFlow_Generate_AllProtocols(t *testing.T) { @@ -17,20 +19,20 @@ func TestMinimalFlow_Generate_AllProtocols(t *testing.T) { wantPort uint16 wantProtocol uint8 }{ - {sshPort, uint16(sshPort), tcpProto}, - {ftpPort, uint16(ftpPort), tcpProto}, - {dnsPort, uint16(dnsPort), udpProto}, - {httpPort, uint16(httpPort), tcpProto}, - {httpsPort, uint16(httpsPort), tcpProto}, - {ntpPort, uint16(ntpPort), udpProto}, - {snmpPort, uint16(snmpPort), udpProto}, - {imapsPort, uint16(imapsPort), tcpProto}, - {mysqlPort, uint16(mysqlPort), tcpProto}, - {httpAltPort, uint16(httpAltPort), tcpProto}, - {httpsAltPort, uint16(httpsAltPort), tcpProto}, - {p2pPort, uint16(p2pPort), tcpProto}, - {btPort, uint16(btPort), tcpProto}, - {99999, uint16(httpsPort), tcpProto}, // default + {utils.SSHPort, uint16(utils.SSHPort), utils.TCPProto}, + {utils.FTPPort, uint16(utils.FTPPort), utils.TCPProto}, + {utils.DNSPort, uint16(utils.DNSPort), utils.UDPProto}, + {utils.HTTPPort, uint16(utils.HTTPPort), utils.TCPProto}, + {utils.HTTPSPort, uint16(utils.HTTPSPort), utils.TCPProto}, + {utils.NTPPort, uint16(utils.NTPPort), utils.UDPProto}, + {utils.SNMPPort, uint16(utils.SNMPPort), utils.UDPProto}, + {utils.IMAPSPort, uint16(utils.IMAPSPort), utils.TCPProto}, + {utils.MySQLPort, uint16(utils.MySQLPort), utils.TCPProto}, + {utils.HTTPAltPort, uint16(utils.HTTPAltPort), utils.TCPProto}, + {utils.HTTPSAltPort, uint16(utils.HTTPSAltPort), utils.TCPProto}, + {utils.P2PPort, uint16(utils.P2PPort), utils.TCPProto}, + {utils.BTPort, uint16(utils.BTPort), utils.TCPProto}, + {99999, uint16(utils.HTTPSPort), utils.TCPProto}, // default } for _, tc := range cases { @@ -56,20 +58,20 @@ func TestExtendedFlow_Generate_AllProtocols(t *testing.T) { wantPort uint16 wantProtocol uint8 }{ - {sshPort, uint16(sshPort), tcpProto}, - {ftpPort, uint16(ftpPort), tcpProto}, - {dnsPort, uint16(dnsPort), udpProto}, - {httpPort, uint16(httpPort), tcpProto}, - {httpsPort, uint16(httpsPort), tcpProto}, - {ntpPort, uint16(ntpPort), udpProto}, - {snmpPort, uint16(snmpPort), udpProto}, - {imapsPort, uint16(imapsPort), tcpProto}, - {mysqlPort, uint16(mysqlPort), tcpProto}, - {httpAltPort, uint16(httpAltPort), tcpProto}, - {httpsAltPort, uint16(httpsAltPort), tcpProto}, - {p2pPort, uint16(p2pPort), tcpProto}, - {btPort, uint16(btPort), tcpProto}, - {99999, uint16(httpsPort), tcpProto}, // default + {utils.SSHPort, uint16(utils.SSHPort), utils.TCPProto}, + {utils.FTPPort, uint16(utils.FTPPort), utils.TCPProto}, + {utils.DNSPort, uint16(utils.DNSPort), utils.UDPProto}, + {utils.HTTPPort, uint16(utils.HTTPPort), utils.TCPProto}, + {utils.HTTPSPort, uint16(utils.HTTPSPort), utils.TCPProto}, + {utils.NTPPort, uint16(utils.NTPPort), utils.UDPProto}, + {utils.SNMPPort, uint16(utils.SNMPPort), utils.UDPProto}, + {utils.IMAPSPort, uint16(utils.IMAPSPort), utils.TCPProto}, + {utils.MySQLPort, uint16(utils.MySQLPort), utils.TCPProto}, + {utils.HTTPAltPort, uint16(utils.HTTPAltPort), utils.TCPProto}, + {utils.HTTPSAltPort, uint16(utils.HTTPSAltPort), utils.TCPProto}, + {utils.P2PPort, uint16(utils.P2PPort), utils.TCPProto}, + {utils.BTPort, uint16(utils.BTPort), utils.TCPProto}, + {99999, uint16(utils.HTTPSPort), utils.TCPProto}, // default } for _, tc := range cases { @@ -93,7 +95,7 @@ func TestMinimalFlow_Generate_IPv6(t *testing.T) { srcIP := net.ParseIP("2001:db8::1") dstIP := net.ParseIP("2001:db8::2") - mf := new(MinimalFlow).Generate(srcIP, dstIP, httpsPort, session) + mf := new(MinimalFlow).Generate(srcIP, dstIP, utils.HTTPSPort, session) // IPv4 fields should be zeroed for IPv6 if mf.SrcAddr != 0 { @@ -111,7 +113,7 @@ func TestExtendedFlow_Generate_IPv6(t *testing.T) { srcIP := net.ParseIP("2001:db8::1") dstIP := net.ParseIP("2001:db8::2") - ef := new(ExtendedFlow).Generate(srcIP, dstIP, httpsPort, session) + ef := new(ExtendedFlow).Generate(srcIP, dstIP, utils.HTTPSPort, session) // IPv4 fields should be zeroed for IPv6 if ef.SrcAddr != 0 { diff --git a/netflow/profile_extended.go b/netflow/profile_extended.go index 88f8162..d51f2fc 100644 --- a/netflow/profile_extended.go +++ b/netflow/profile_extended.go @@ -16,7 +16,7 @@ type ExtendedProfile struct{} // Name returns the profile name. func (p *ExtendedProfile) Name() string { return "extended" } -// TemplateFields returns the 14-field extended template. +// TemplateFields returns the 15-field extended template. func (p *ExtendedProfile) TemplateFields() []Field { return []Field{ {Type: IN_BYTES, Length: 4}, @@ -97,52 +97,8 @@ func (ef *ExtendedFlow) Generate(srcIP net.IP, dstIP net.IP, flowSrcPort int, se ef.MaxTtl = uint8(utils.RandomNum(1, 128)) ef.FirstSwitched = uptime - 100 ef.LastSwitched = uptime - 10 - ef.Protocol = uint8(tcpProto) - switch flowSrcPort { - case sshPort: - ef.DstPort = uint16(sshPort) - ef.Protocol = uint8(tcpProto) - case ftpPort: - ef.DstPort = uint16(ftpPort) - ef.Protocol = uint8(tcpProto) - case dnsPort: - ef.DstPort = uint16(dnsPort) - ef.Protocol = uint8(udpProto) - case httpPort: - ef.DstPort = uint16(httpPort) - ef.Protocol = uint8(tcpProto) - case httpsPort: - ef.DstPort = uint16(httpsPort) - ef.Protocol = uint8(tcpProto) - case ntpPort: - ef.DstPort = uint16(ntpPort) - ef.Protocol = uint8(udpProto) - case snmpPort: - ef.DstPort = uint16(snmpPort) - ef.Protocol = uint8(udpProto) - case imapsPort: - ef.DstPort = uint16(imapsPort) - ef.Protocol = uint8(tcpProto) - case mysqlPort: - ef.DstPort = uint16(mysqlPort) - ef.Protocol = uint8(tcpProto) - case httpAltPort: - ef.DstPort = uint16(httpAltPort) - ef.Protocol = uint8(tcpProto) - case httpsAltPort: - ef.DstPort = uint16(httpsAltPort) - ef.Protocol = uint8(tcpProto) - case p2pPort: - ef.DstPort = uint16(p2pPort) - ef.Protocol = uint8(tcpProto) - case btPort: - ef.DstPort = uint16(btPort) - ef.Protocol = uint8(tcpProto) - default: - ef.DstPort = uint16(httpsPort) - ef.Protocol = uint8(tcpProto) - } + ef.DstPort, ef.Protocol = utils.ResolvePortProtocol(flowSrcPort) return *ef } diff --git a/netflow/profile_minimal.go b/netflow/profile_minimal.go index 99d2e5d..40ef288 100644 --- a/netflow/profile_minimal.go +++ b/netflow/profile_minimal.go @@ -55,52 +55,8 @@ func (mf *MinimalFlow) Generate(srcIP net.IP, dstIP net.IP, flowSrcPort int, ses } mf.SrcPort = utils.GenerateRand16(10000) - mf.Protocol = uint8(tcpProto) - switch flowSrcPort { - case sshPort: - mf.DstPort = uint16(sshPort) - mf.Protocol = uint8(tcpProto) - case ftpPort: - mf.DstPort = uint16(ftpPort) - mf.Protocol = uint8(tcpProto) - case dnsPort: - mf.DstPort = uint16(dnsPort) - mf.Protocol = uint8(udpProto) - case httpPort: - mf.DstPort = uint16(httpPort) - mf.Protocol = uint8(tcpProto) - case httpsPort: - mf.DstPort = uint16(httpsPort) - mf.Protocol = uint8(tcpProto) - case ntpPort: - mf.DstPort = uint16(ntpPort) - mf.Protocol = uint8(udpProto) - case snmpPort: - mf.DstPort = uint16(snmpPort) - mf.Protocol = uint8(udpProto) - case imapsPort: - mf.DstPort = uint16(imapsPort) - mf.Protocol = uint8(tcpProto) - case mysqlPort: - mf.DstPort = uint16(mysqlPort) - mf.Protocol = uint8(tcpProto) - case httpAltPort: - mf.DstPort = uint16(httpAltPort) - mf.Protocol = uint8(tcpProto) - case httpsAltPort: - mf.DstPort = uint16(httpsAltPort) - mf.Protocol = uint8(tcpProto) - case p2pPort: - mf.DstPort = uint16(p2pPort) - mf.Protocol = uint8(tcpProto) - case btPort: - mf.DstPort = uint16(btPort) - mf.Protocol = uint8(tcpProto) - default: - mf.DstPort = uint16(httpsPort) - mf.Protocol = uint8(tcpProto) - } + mf.DstPort, mf.Protocol = utils.ResolvePortProtocol(flowSrcPort) return *mf } diff --git a/netflow/profile_minimal_test.go b/netflow/profile_minimal_test.go index 8b54c51..01aa3cf 100644 --- a/netflow/profile_minimal_test.go +++ b/netflow/profile_minimal_test.go @@ -6,6 +6,8 @@ package netflow import ( "net" "testing" + + "github.com/dmabry/flowgre/utils" ) func TestMinimalProfile_Name(t *testing.T) { @@ -165,7 +167,7 @@ func TestMinimalFlow_Generate(t *testing.T) { srcIP := net.ParseIP("10.0.0.1") dstIP := net.ParseIP("10.0.0.2") - mf := new(MinimalFlow).Generate(srcIP, dstIP, httpsPort, session) + mf := new(MinimalFlow).Generate(srcIP, dstIP, utils.HTTPSPort, session) if mf.SrcAddr == 0 { t.Error("expected non-zero src addr") @@ -173,11 +175,11 @@ func TestMinimalFlow_Generate(t *testing.T) { if mf.DstAddr == 0 { t.Error("expected non-zero dst addr") } - if mf.DstPort != uint16(httpsPort) { - t.Errorf("expected dst port %d, got %d", httpsPort, mf.DstPort) + if mf.DstPort != uint16(utils.HTTPSPort) { + t.Errorf("expected dst port %d, got %d", utils.HTTPSPort, mf.DstPort) } - if mf.Protocol != tcpProto { - t.Errorf("expected protocol %d, got %d", tcpProto, mf.Protocol) + if mf.Protocol != utils.TCPProto { + t.Errorf("expected protocol %d, got %d", utils.TCPProto, mf.Protocol) } } @@ -188,7 +190,7 @@ func TestExtendedFlow_Generate(t *testing.T) { srcIP := net.ParseIP("10.0.0.1") dstIP := net.ParseIP("10.0.0.2") - ef := new(ExtendedFlow).Generate(srcIP, dstIP, httpsPort, session) + ef := new(ExtendedFlow).Generate(srcIP, dstIP, utils.HTTPSPort, session) if ef.SrcAddr == 0 { t.Error("expected non-zero src addr") @@ -196,11 +198,11 @@ func TestExtendedFlow_Generate(t *testing.T) { if ef.DstAddr == 0 { t.Error("expected non-zero dst addr") } - if ef.DstPort != uint16(httpsPort) { - t.Errorf("expected dst port %d, got %d", httpsPort, ef.DstPort) + if ef.DstPort != uint16(utils.HTTPSPort) { + t.Errorf("expected dst port %d, got %d", utils.HTTPSPort, ef.DstPort) } - if ef.Protocol != tcpProto { - t.Errorf("expected protocol %d, got %d", tcpProto, ef.Protocol) + if ef.Protocol != utils.TCPProto { + t.Errorf("expected protocol %d, got %d", utils.TCPProto, ef.Protocol) } if ef.SrcVlan == 0 || ef.SrcVlan > 4094 { t.Errorf("expected valid src VLAN, got %d", ef.SrcVlan) diff --git a/netflow/profile_roundtrip_test.go b/netflow/profile_roundtrip_test.go index 5ab6bc5..74cab8d 100644 --- a/netflow/profile_roundtrip_test.go +++ b/netflow/profile_roundtrip_test.go @@ -7,13 +7,15 @@ import ( "bytes" "encoding/binary" "testing" + + "github.com/dmabry/flowgre/utils" ) func TestDataFlowSet_Generate_MinimalProfile(t *testing.T) { t.Parallel() session := NewSession() - dfs := new(DataFlowSet).Generate(5, "10.0.0.0/8", "10.0.0.0/8", httpsPort, session, &MinimalProfile{}) + dfs := new(DataFlowSet).Generate(5, "10.0.0.0/8", "10.0.0.0/8", utils.HTTPSPort, session, &MinimalProfile{}) if len(dfs.Items) != 5 { t.Fatalf("expected 5 items, got %d", len(dfs.Items)) @@ -28,8 +30,8 @@ func TestDataFlowSet_Generate_MinimalProfile(t *testing.T) { if mf.SrcAddr == 0 { t.Errorf("item[%d]: expected non-zero src addr", i) } - if mf.DstPort != uint16(httpsPort) { - t.Errorf("item[%d]: expected dst port %d, got %d", i, httpsPort, mf.DstPort) + if mf.DstPort != uint16(utils.HTTPSPort) { + t.Errorf("item[%d]: expected dst port %d, got %d", i, utils.HTTPSPort, mf.DstPort) } } } @@ -38,7 +40,7 @@ func TestDataFlowSet_Generate_ExtendedProfile(t *testing.T) { t.Parallel() session := NewSession() - dfs := new(DataFlowSet).Generate(5, "10.0.0.0/8", "10.0.0.0/8", httpsPort, session, &ExtendedProfile{}) + dfs := new(DataFlowSet).Generate(5, "10.0.0.0/8", "10.0.0.0/8", utils.HTTPSPort, session, &ExtendedProfile{}) if len(dfs.Items) != 5 { t.Fatalf("expected 5 items, got %d", len(dfs.Items)) @@ -53,8 +55,8 @@ func TestDataFlowSet_Generate_ExtendedProfile(t *testing.T) { if ef.SrcAddr == 0 { t.Errorf("item[%d]: expected non-zero src addr", i) } - if ef.DstPort != uint16(httpsPort) { - t.Errorf("item[%d]: expected dst port %d, got %d", i, httpsPort, ef.DstPort) + if ef.DstPort != uint16(utils.HTTPSPort) { + t.Errorf("item[%d]: expected dst port %d, got %d", i, utils.HTTPSPort, ef.DstPort) } if ef.SrcVlan == 0 { t.Errorf("item[%d]: expected non-zero src VLAN", i) @@ -66,7 +68,7 @@ func TestDataFlowSet_Generate_DefaultProfile(t *testing.T) { t.Parallel() session := NewSession() - dfs := new(DataFlowSet).Generate(5, "10.0.0.0/8", "10.0.0.0/8", httpsPort, session) + dfs := new(DataFlowSet).Generate(5, "10.0.0.0/8", "10.0.0.0/8", utils.HTTPSPort, session) if len(dfs.Items) != 5 { t.Fatalf("expected 5 items, got %d", len(dfs.Items)) @@ -93,7 +95,7 @@ func TestMinimalProfile_RoundTrip(t *testing.T) { // Generate template + data with minimal profile tFlow := GenerateTemplateNetflow(sourceID, session, &MinimalProfile{}) - dFlow := GenerateDataNetflow(flowCount, sourceID, "10.0.0.0/8", "10.0.0.0/8", httpsPort, session, &MinimalProfile{}) + dFlow := GenerateDataNetflow(flowCount, sourceID, "10.0.0.0/8", "10.0.0.0/8", utils.HTTPSPort, session, &MinimalProfile{}) // Serialize tBuf := tFlow.ToBytes() @@ -167,7 +169,7 @@ func TestExtendedProfile_RoundTrip(t *testing.T) { // Generate template + data with extended profile tFlow := GenerateTemplateNetflow(sourceID, session, &ExtendedProfile{}) - dFlow := GenerateDataNetflow(flowCount, sourceID, "10.0.0.0/8", "10.0.0.0/8", httpsPort, session, &ExtendedProfile{}) + dFlow := GenerateDataNetflow(flowCount, sourceID, "10.0.0.0/8", "10.0.0.0/8", utils.HTTPSPort, session, &ExtendedProfile{}) // Serialize tBuf := tFlow.ToBytes() @@ -202,6 +204,10 @@ func TestExtendedProfile_RoundTrip(t *testing.T) { var dHeader Header binary.Read(dReader, binary.BigEndian, &dHeader) + if dHeader.Version != 9 { + t.Fatalf("expected version 9, got %d", dHeader.Version) + } + // Parse data flow set header var dFsID, dFsLen uint16 binary.Read(dReader, binary.BigEndian, &dFsID) diff --git a/stats/collector.go b/stats/collector.go index c4380c3..76bed1b 100644 --- a/stats/collector.go +++ b/stats/collector.go @@ -40,12 +40,18 @@ type Collector struct { } // Run starts the stat collection loop. It reads from StatsChan and aggregates totals. +// Uses a ticker-driven pattern instead of a busy-wait loop to avoid wasting CPU cycles. func (sc *Collector) Run(wg *sync.WaitGroup, ctx context.Context) { defer wg.Done() - limiter := time.Tick(time.Second * 5) log.Println("Stats Collector started") sizeLabel := "bytes" var sizeOut uint64 + + // Periodic ticker for periodic logging/heartbeat (unused here but keeps + // the loop from blocking indefinitely when no stats arrive). + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for { select { case stat, ok := <-sc.StatsChan: @@ -79,17 +85,14 @@ func (sc *Collector) Run(wg *sync.WaitGroup, ctx context.Context) { sc.mu.Unlock() } else { log.Println("Stats Channel Closed!") + return } case <-ctx.Done(): log.Printf("Stats Collector Exiting due to signal\n") return - default: - select { - case <-limiter: - case <-ctx.Done(): - log.Printf("Stats Collector Exiting due to signal\n") - return - } + case <-ticker.C: + // Periodic tick — nothing to do, just keeps the loop alive + // so it doesn't block forever when no stats arrive. } } } diff --git a/utils/flow.go b/utils/flow.go index 6743c12..7f256f8 100644 --- a/utils/flow.go +++ b/utils/flow.go @@ -33,3 +33,46 @@ const ( ESPProto = 50 EIGRPProto = 88 ) + +// ProtoPorts is the default set of destination ports used for generating +// simulated flow traffic. Each entry maps to a well-known service. +var ProtoPorts = []int{21, 22, 53, 80, 443, 123, 161, 993, 3306, 8080, 8443, 6681, 6682} + +// ResolvePortProtocol maps a destination port to its well-known port and +// associated IP protocol number. Returns (dstPort, protocol) where protocol +// is one of TCPProto, UDPProto, etc. +// +// This centralizes the port→protocol mapping that was duplicated across +// GenericFlow, MinimalFlow, ExtendedFlow, and IPFIX GenericFlow. +func ResolvePortProtocol(flowPort int) (dstPort uint16, protocol uint8) { + switch flowPort { + case FTPPort: + return uint16(FTPPort), TCPProto + case SSHPort: + return uint16(SSHPort), TCPProto + case DNSPort: + return uint16(DNSPort), UDPProto + case HTTPPort: + return uint16(HTTPPort), TCPProto + case HTTPSPort: + return uint16(HTTPSPort), TCPProto + case NTPPort: + return uint16(NTPPort), UDPProto + case SNMPPort: + return uint16(SNMPPort), UDPProto + case IMAPSPort: + return uint16(IMAPSPort), TCPProto + case MySQLPort: + return uint16(MySQLPort), TCPProto + case HTTPAltPort: + return uint16(HTTPAltPort), TCPProto + case HTTPSAltPort: + return uint16(HTTPSAltPort), TCPProto + case P2PPort: + return uint16(P2PPort), TCPProto + case BTPort: + return uint16(BTPort), TCPProto + default: + return uint16(HTTPSPort), TCPProto + } +} diff --git a/web/web.go b/web/web.go index 71b490b..e510de4 100644 --- a/web/web.go +++ b/web/web.go @@ -48,7 +48,9 @@ func RunWebServer(ip string, port int, wg *sync.WaitGroup, ctx context.Context, }() <-ctx.Done() // Block until context is cancelled log.Printf("Web server Exiting due to signal\n") - _ = srv.Shutdown(context.Background()) + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer shutdownCancel() + _ = srv.Shutdown(shutdownCtx) } // HealthHandler is used to generate json payload for health. static for now.