Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 24 additions & 27 deletions barrage/barrage.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ func worker(id int, ctx context.Context, server string, port int, srcRange strin
BytesSent: 0,
}

startTime := time.Now().UnixNano()
limiter := time.Tick(time.Millisecond * time.Duration(delay))

// Configure connection to use. It looks like a listener, but it will be used to send packet. Allows setting the source port.
srcPort := utils.RandomNum(sourcePortMin, sourcePortMax)

Expand Down Expand Up @@ -78,35 +75,35 @@ func worker(id int, ctx context.Context, server string, port int, srcRange strin

log.Printf("%s [%2d] Slinging packets at %s:%d with Source ID: %5d and delay of %dms\n",
label, id, server, port, sourceID, delay)
// Infinite loop to keep slinging until we receive context done.
takeAction := false

// Data limiter throttles flow packet generation.
dataLimiter := time.NewTicker(time.Millisecond * time.Duration(delay))
defer dataLimiter.Stop()

// Template retransmission ticker — fires every templateInterval seconds.
// When templateInterval is 0, no ticker is created so templates are never retransmitted.
var tmplTicker *time.Ticker
if templateInterval > 0 {
tmplTicker = time.NewTicker(time.Duration(templateInterval) * time.Second)
defer tmplTicker.Stop()
}

for {
now := time.Now().UnixNano()
cycle := (now - startTime) / int64(time.Second) % int64(templateInterval)
if cycle == 0 {
if takeAction {
takeAction = false
// Retransmit template every templateInterval seconds
bytes, err := utils.SendPacket(conn, &net.UDPAddr{IP: destIP, Port: port}, tBuf, false)
if err != nil {
log.Printf("%s [%2d] Issue sending template packet: %v", label, id, err)
return
}
wStats.FlowsSent++
wStats.BytesSent += uint64(bytes)
statsChan <- wStats
}
} else {
takeAction = true
}
select {
case <-ctx.Done(): // Caught the signal to be done.... time to wrap it up
case <-ctx.Done():
log.Printf("%s [%2d] Exiting due to signal\n", label, id)
return
default:
// Basic limiter to throttle/delay packets
<-limiter
case <-tmplTicker.C:
// Retransmit template every templateInterval seconds
bytes, err := utils.SendPacket(conn, &net.UDPAddr{IP: destIP, Port: port}, tBuf, false)
if err != nil {
log.Printf("%s [%2d] Issue sending template packet: %v", label, id, err)
return
}
wStats.FlowsSent++
wStats.BytesSent += uint64(bytes)
statsChan <- wStats
case <-dataLimiter.C:
flowCount := utils.RandomNum(5, 25)
buf := gen.GenerateData(flowCount, sourceID, srcRange, dstRange, session)
bytes, err := utils.SendPacket(conn, &net.UDPAddr{IP: destIP, Port: port}, buf, false)
Expand Down
73 changes: 3 additions & 70 deletions ipfix/ipfix.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,6 @@ import (
// IPFIX version number per RFC 7011.
const Version = 10

// Port constants (aliases for backwards compatibility within this package)
const (
httpsPort = utils.HTTPSPort
sshPort = utils.SSHPort
ftpPort = utils.FTPPort
dnsPort = utils.DNSPort
httpPort = utils.HTTPPort
ntpPort = utils.NTPPort
snmpPort = utils.SNMPPort
imapsPort = utils.IMAPSPort
mysqlPort = utils.MySQLPort
httpAltPort = utils.HTTPAltPort
httpsAltPort = utils.HTTPSAltPort
p2pPort = utils.P2PPort
btPort = utils.BTPort
)

// Protocol constants (aliases for backwards compatibility within this package)
const (
tcpProto = utils.TCPProto
udpProto = utils.UDPProto
icmpProto = utils.ICMPProto
)

// IANA IPFIX field type constants (RFC 7011 / Information Model)
const (
ProtocolIdentifier = 4
Expand Down Expand Up @@ -342,50 +318,7 @@ func (gf *GenericFlow) Generate(srcIP net.IP, dstIP net.IP, flowSrcPort int, ses
gf.IPClassOfService = 0
gf.FlowEndReason = uint8(utils.RandomNum(0, 4)) // 0=active, 1=idle, 2=other, 3=exporterReset, 4=exporterShutdown

switch flowSrcPort {
case sshPort:
gf.DestPort = uint16(sshPort)
gf.ProtocolIdentifier = tcpProto
case ftpPort:
gf.DestPort = uint16(ftpPort)
gf.ProtocolIdentifier = tcpProto
case dnsPort:
gf.DestPort = uint16(dnsPort)
gf.ProtocolIdentifier = udpProto
case httpPort:
gf.DestPort = uint16(httpPort)
gf.ProtocolIdentifier = tcpProto
case httpsPort:
gf.DestPort = uint16(httpsPort)
gf.ProtocolIdentifier = tcpProto
case ntpPort:
gf.DestPort = uint16(ntpPort)
gf.ProtocolIdentifier = udpProto
case snmpPort:
gf.DestPort = uint16(snmpPort)
gf.ProtocolIdentifier = udpProto
case imapsPort:
gf.DestPort = uint16(imapsPort)
gf.ProtocolIdentifier = tcpProto
case mysqlPort:
gf.DestPort = uint16(mysqlPort)
gf.ProtocolIdentifier = tcpProto
case httpAltPort:
gf.DestPort = uint16(httpAltPort)
gf.ProtocolIdentifier = tcpProto
case httpsAltPort:
gf.DestPort = uint16(httpsAltPort)
gf.ProtocolIdentifier = tcpProto
case p2pPort:
gf.DestPort = uint16(p2pPort)
gf.ProtocolIdentifier = tcpProto
case btPort:
gf.DestPort = uint16(btPort)
gf.ProtocolIdentifier = tcpProto
default:
gf.DestPort = uint16(httpsPort)
gf.ProtocolIdentifier = tcpProto
}
gf.DestPort, gf.ProtocolIdentifier = utils.ResolvePortProtocol(flowSrcPort)

return *gf
}
Expand All @@ -406,7 +339,7 @@ type DataFlowSet struct {
func (d *DataFlowSet) Generate(flowCount int, srcRange string, dstRange string, flowSrcPort int, session *netflow.Session, profile ...IPFIXFlowProfile) DataFlowSet {
_ = profile // reserved for future profile-aware data generation

protoPorts := []int{21, 22, 53, 80, 443, 123, 161, 993, 3306, 8080, 8443, 6681, 6682}
protoPorts := utils.ProtoPorts

items := make([]DataAny, flowCount)
for i := range flowCount {
Expand Down Expand Up @@ -672,7 +605,7 @@ func GenerateDataIPFIX(flowCount int, sourceID int, srcRange string, dstRange st
// GenerateIPFIX creates an IPFIX packet containing both template and data FlowSets.
func GenerateIPFIX(flowCount int, sourceID int, srcRange string, dstRange string, session *netflow.Session) IPFIX {
templateFlow := new(TemplateFlowSet).Generate(session)
dataFlow := new(DataFlowSet).Generate(flowCount, srcRange, dstRange, httpsPort, session)
dataFlow := new(DataFlowSet).Generate(flowCount, srcRange, dstRange, utils.HTTPSPort, session)
header := new(Header).Generate(flowCount+1, sourceID, session)
return IPFIX{
Header: header,
Expand Down
17 changes: 9 additions & 8 deletions ipfix/ipfix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/dmabry/flowgre/netflow"
"github.com/dmabry/flowgre/utils"
"github.com/google/go-cmp/cmp"
)

Expand Down Expand Up @@ -75,7 +76,7 @@ func TestGenerateDataIPFIX(t *testing.T) {
flowCount := 10
sourceID := 618
session := netflow.NewSession()
flow := GenerateDataIPFIX(flowCount, sourceID, "10.0.0.0/8", "10.0.0.0/8", httpsPort, session)
flow := GenerateDataIPFIX(flowCount, sourceID, "10.0.0.0/8", "10.0.0.0/8", utils.HTTPSPort, session)

if len(flow.DataFlowSets) < 1 {
t.Fatal("No data flowsets generated")
Expand Down Expand Up @@ -122,7 +123,7 @@ func TestToBytes_RoundTrip(t *testing.T) {
session := netflow.NewSession()

tFlow := GenerateTemplateIPFIX(sourceID, session)
dFlow := GenerateDataIPFIX(flowCount, sourceID, "10.0.0.0/8", "10.0.0.0/8", httpsPort, session)
dFlow := GenerateDataIPFIX(flowCount, sourceID, "10.0.0.0/8", "10.0.0.0/8", utils.HTTPSPort, session)

tBuf := tFlow.ToBytes()
dBuf := dFlow.ToBytes()
Expand Down Expand Up @@ -418,7 +419,7 @@ func TestToBytes_BufferLengthMatchesFlowSetLengths(t *testing.T) {
}

// Test data-only packet
dFlow := GenerateDataIPFIX(flowCount, sourceID, "10.0.0.0/8", "10.0.0.0/8", httpsPort, session)
dFlow := GenerateDataIPFIX(flowCount, sourceID, "10.0.0.0/8", "10.0.0.0/8", utils.HTTPSPort, session)
dBuf := dFlow.ToBytes()
expectedDLen := binary.Size(dFlow.Header)
for _, fs := range dFlow.DataFlowSets {
Expand Down Expand Up @@ -500,7 +501,7 @@ func TestGenericFlowIPv6(t *testing.T) {
dstIP := net.ParseIP("2001:db8::2")
session := netflow.NewSession()

result := new(GenericFlow).Generate(srcIP, dstIP, httpsPort, session)
result := new(GenericFlow).Generate(srcIP, dstIP, utils.HTTPSPort, session)

// IPv4 fields should be zeroed
if result.SourceIPv4Addr != 0 {
Expand Down Expand Up @@ -538,7 +539,7 @@ func TestGenericFlowIPv4_ZerosIPv6(t *testing.T) {
dstIP := net.ParseIP("10.0.0.2")
session := netflow.NewSession()

result := new(GenericFlow).Generate(srcIP, dstIP, httpsPort, session)
result := new(GenericFlow).Generate(srcIP, dstIP, utils.HTTPSPort, session)

// IPv4 fields should be populated
if result.SourceIPv4Addr == 0 {
Expand Down Expand Up @@ -568,7 +569,7 @@ func TestGenerateDataIPFIX_IPv6(t *testing.T) {
flowCount := 10
sourceID := 618
session := netflow.NewSession()
flow := GenerateDataIPFIX(flowCount, sourceID, "2001:db8::/32", "2001:db8::/32", httpsPort, session)
flow := GenerateDataIPFIX(flowCount, sourceID, "2001:db8::/32", "2001:db8::/32", utils.HTTPSPort, session)

if len(flow.DataFlowSets) < 1 {
t.Fatal("No data flowsets generated")
Expand Down Expand Up @@ -633,7 +634,7 @@ func TestToBytes_IPv6_RoundTrip(t *testing.T) {
flowCount := 10
session := netflow.NewSession()

dFlow := GenerateDataIPFIX(flowCount, sourceID, "2001:db8::/32", "2001:db8::/32", httpsPort, session)
dFlow := GenerateDataIPFIX(flowCount, sourceID, "2001:db8::/32", "2001:db8::/32", utils.HTTPSPort, session)
dBuf := dFlow.ToBytes()

// Read back
Expand Down Expand Up @@ -698,7 +699,7 @@ func TestToBytes_IPv6_BufferLengthMatchesFlowSetLengths(t *testing.T) {
flowCount := 10
session := netflow.NewSession()

dFlow := GenerateDataIPFIX(flowCount, sourceID, "2001:db8::/32", "2001:db8::/32", httpsPort, session)
dFlow := GenerateDataIPFIX(flowCount, sourceID, "2001:db8::/32", "2001:db8::/32", utils.HTTPSPort, session)
dBuf := dFlow.ToBytes()
expectedDLen := binary.Size(dFlow.Header)
for _, fs := range dFlow.DataFlowSets {
Expand Down
4 changes: 2 additions & 2 deletions netflow/dataflowset.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (d *DataFlowSet) Generate(flowCount int, srcRange string, dstRange string,

dataFlowSet := new(DataFlowSet)
dataFlowSet.FlowSetID = 256
protoPorts := [13]int{21, 22, 53, 80, 443, 123, 161, 993, 3306, 8080, 8443, 6681, 6682}
protoPorts := utils.ProtoPorts
items := make([]DataAny, flowCount)
for i := range flowCount {
srcIP, err := utils.RandomIPCIDR(srcRange)
Expand All @@ -51,7 +51,7 @@ func (d *DataFlowSet) Generate(flowCount int, srcRange string, dstRange string,
}
var flowPort int
if flowSrcPort == 0 {
flowPort = protoPorts[utils.RandomNum(0, 12)]
flowPort = protoPorts[utils.RandomNum(0, len(protoPorts))]
} else {
flowPort = flowSrcPort
}
Expand Down
76 changes: 1 addition & 75 deletions netflow/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,6 @@ import (
"github.com/dmabry/flowgre/utils"
)

// Port constants (aliases for backwards compatibility within this package)
const (
ftpPort = utils.FTPPort
sshPort = utils.SSHPort
dnsPort = utils.DNSPort
httpPort = utils.HTTPPort
httpsPort = utils.HTTPSPort
ntpPort = utils.NTPPort
snmpPort = utils.SNMPPort
imapsPort = utils.IMAPSPort
mysqlPort = utils.MySQLPort
httpAltPort = utils.HTTPAltPort
httpsAltPort = utils.HTTPSAltPort
p2pPort = utils.P2PPort
btPort = utils.BTPort
)

// Protocol constants (aliases for backwards compatibility within this package)
const (
tcpProto = utils.TCPProto
udpProto = utils.UDPProto
icmpProto = utils.ICMPProto
sctpProto = utils.SCTPProto
igmpProto = utils.IGMPProto
egpProto = utils.EGPProto
igpProto = utils.IGPProto
greProto = utils.GREProto
espProto = utils.ESPProto
eigrpProto = utils.EIGRPProto
)

// Constants for Field Types
const (
IN_BYTES = 1
Expand Down Expand Up @@ -222,50 +191,7 @@ func (gf *GenericFlow) Generate(srcIP net.IP, dstIP net.IP, flowSrcPort int, ses
gf.EngineType = 0
gf.EngineID = 0

switch flowSrcPort {
case sshPort:
gf.L4DstPort = uint16(sshPort)
gf.Protocol = uint8(tcpProto)
case ftpPort:
gf.L4DstPort = uint16(ftpPort)
gf.Protocol = uint8(tcpProto)
case dnsPort:
gf.L4DstPort = uint16(dnsPort)
gf.Protocol = uint8(udpProto)
case httpPort:
gf.L4DstPort = uint16(httpPort)
gf.Protocol = uint8(tcpProto)
case httpsPort:
gf.L4DstPort = uint16(httpsPort)
gf.Protocol = uint8(tcpProto)
case ntpPort:
gf.L4DstPort = uint16(ntpPort)
gf.Protocol = uint8(udpProto)
case snmpPort:
gf.L4DstPort = uint16(snmpPort)
gf.Protocol = uint8(udpProto)
case imapsPort:
gf.L4DstPort = uint16(imapsPort)
gf.Protocol = uint8(tcpProto)
case mysqlPort:
gf.L4DstPort = uint16(mysqlPort)
gf.Protocol = uint8(tcpProto)
case httpAltPort:
gf.L4DstPort = uint16(httpAltPort)
gf.Protocol = uint8(tcpProto)
case httpsAltPort:
gf.L4DstPort = uint16(httpsAltPort)
gf.Protocol = uint8(tcpProto)
case p2pPort:
gf.L4DstPort = uint16(p2pPort)
gf.Protocol = uint8(tcpProto)
case btPort:
gf.L4DstPort = uint16(btPort)
gf.Protocol = uint8(tcpProto)
default:
gf.L4DstPort = uint16(httpsPort)
gf.Protocol = uint8(tcpProto)
}
gf.L4DstPort, gf.Protocol = utils.ResolvePortProtocol(flowSrcPort)

return *gf
}
4 changes: 3 additions & 1 deletion netflow/netflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ import (
"encoding/binary"
"fmt"
"time"

"github.com/dmabry/flowgre/utils"
)

func GenerateNetflow(flowCount int, sourceID int, srcRange string, dstRange string, session *Session, profile ...FlowProfile) Netflow {
netflow := new(Netflow)
templateFlow := new(TemplateFlowSet).Generate(session, profile...)
dataFlow := new(DataFlowSet).Generate(flowCount, srcRange, dstRange, httpsPort, session, profile...)
dataFlow := new(DataFlowSet).Generate(flowCount, srcRange, dstRange, utils.HTTPSPort, session, profile...)
header := new(Header).Generate(flowCount+1, sourceID, session) // always +1 of dataflow count, because we are counting the template
netflow.Header = header
netflow.TemplateFlowSets = append(netflow.TemplateFlowSets, templateFlow)
Expand Down
Loading
Loading