Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions lib/logstash/codecs/netflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class LogStash::Codecs::Netflow < LogStash::Codecs::Base
def initialize(params = {})
super(params)
@threadsafe = true
@payload_buffer = ""
end

def clone
Expand All @@ -75,6 +76,12 @@ def register

def decode(payload, metadata = nil, &block)
# BinData::trace_reading do
unless @payload_buffer.empty?
# Reuse previously bufferized payload
payload = @payload_buffer + payload
@payload_buffer = ""
end

header = Header.read(payload)

unless @versions.include?(header.version)
Expand All @@ -100,9 +107,24 @@ def decode(payload, metadata = nil, &block)
end
elsif header.version == 10
# BinData::trace_reading do
flowset = IpfixPDU.read(payload)
flowset.records.each do |record|
decode_ipfix(flowset, record).each { |event| yield(event) }
while payload.bytesize > 4
flowset = IpfixShortPDU.read(payload)
if flowset.pdu_length > payload.bytesize
# Incomplete PDU => bufferize & wait for next call
@payload_buffer = payload
payload = ""
else
flowset = IpfixPDU.read(payload)
flowset.records.each do |record|
decode_ipfix(flowset, record).each { |event| yield(event) }
end
# Remove processed PDU from payload
payload = payload.byteslice(flowset.pdu_length..-1)
end
end
unless payload.empty?
# Not enough bytes to read PDU length => bufferize & wait for next call
@payload_buffer = payload
end
# end
else
Expand Down
6 changes: 6 additions & 0 deletions lib/logstash/codecs/netflow/util.rb
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,12 @@ class IpfixOptionFlowset < BinData::Record
end
end

class IpfixShortPDU < BinData::Record
endian :big
uint16 :version
uint16 :pdu_length
end

class IpfixPDU < BinData::Record
endian :big
uint16 :version
Expand Down
207 changes: 206 additions & 1 deletion spec/codecs/netflow_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -662,10 +662,167 @@
}
END

events << <<-END
{
"@timestamp" : "2015-05-13T11:20:26.000Z",
"netflow" : {
"destinationIPv4Address" : "10.4.36.64",
"destinationTransportPort" : 9200,
"egressInterface" : 0,
"flowEndSysUpTime" : 1356,
"flowStartSysUpTime" : 1356,
"icmpTypeCodeIPv4" : 0,
"ingressInterface" : 0,
"ipClassOfService" : 0,
"ipVersion" : 4,
"octetDeltaCount" : 60,
"packetDeltaCount" : 1,
"protocolIdentifier" : 6,
"sourceIPv4Address" : "192.168.253.130",
"sourceTransportPort" : 38254,
"tcpControlBits" : 2,
"version" : 10,
"vlanId" : 0
},
"@version" : "1"
}
END

events << <<-END
{
"@timestamp" : "2015-05-13T11:20:28.000Z",
"netflow" : {
"destinationIPv4Address" : "192.168.253.128",
"destinationTransportPort" : 22,
"egressInterface" : 0,
"flowEndSysUpTime" : 14611,
"flowStartSysUpTime" : 12727,
"icmpTypeCodeIPv4" : 0,
"ingressInterface" : 0,
"ipClassOfService" : 0,
"ipVersion" : 4,
"octetDeltaCount" : 256,
"packetDeltaCount" : 4,
"protocolIdentifier" : 6,
"sourceIPv4Address" : "192.168.253.1",
"sourceTransportPort" : 60560,
"tcpControlBits" : 24,
"version" : 10,
"vlanId" : 0
},
"@version" : "1"
}
END

events << <<-END
{
"@timestamp" : "2015-05-13T11:20:28.000Z",
"netflow" : {
"destinationIPv4Address" : "192.168.253.1",
"destinationTransportPort" : 60560,
"egressInterface" : 0,
"flowEndSysUpTime" : 14611,
"flowStartSysUpTime" : 12727,
"icmpTypeCodeIPv4" : 0,
"ingressInterface" : 0,
"ipClassOfService" : 0,
"ipVersion" : 4,
"octetDeltaCount" : 1916,
"packetDeltaCount" : 3,
"protocolIdentifier" : 6,
"sourceIPv4Address" : "192.168.253.128",
"sourceTransportPort" : 22,
"tcpControlBits" : 24,
"version" : 10,
"vlanId" : 0
},
"@version" : "1"
}
END

events << <<-END
{
"@timestamp" : "2015-05-13T11:20:28.000Z",
"netflow" : {
"destinationIPv4Address" : "192.168.253.128",
"destinationTransportPort" : 22,
"egressInterface" : 0,
"flowEndSysUpTime" : 12726,
"flowStartSysUpTime" : 12725,
"icmpTypeCodeIPv4" : 0,
"ingressInterface" : 0,
"ipClassOfService" : 0,
"ipVersion" : 4,
"octetDeltaCount" : 168,
"packetDeltaCount" : 2,
"protocolIdentifier" : 6,
"sourceIPv4Address" : "192.168.253.1",
"sourceTransportPort" : 65308,
"tcpControlBits" : 24,
"version" : 10,
"vlanId" : 0
},
"@version" : "1"
}
END


events << <<-END
{
"@timestamp" : "2015-05-13T11:20:28.000Z",
"netflow" : {
"destinationIPv4Address" : "192.168.253.1",
"destinationTransportPort" : 65308,
"egressInterface" : 0,
"flowEndSysUpTime" : 12726,
"flowStartSysUpTime" : 12725,
"icmpTypeCodeIPv4" : 0,
"ingressInterface" : 0,
"ipClassOfService" : 0,
"ipVersion" : 4,
"octetDeltaCount" : 84,
"packetDeltaCount" : 1,
"protocolIdentifier" : 6,
"sourceIPv4Address" : "192.168.253.128",
"sourceTransportPort" : 22,
"tcpControlBits" : 24,
"version" : 10,
"vlanId" : 0
},
"@version" : "1"
}
END

events << <<-END
{
"@timestamp" : "2015-05-13T11:20:28.000Z",
"netflow" : {
"destinationIPv4Address" : "224.0.0.251",
"destinationTransportPort" : 5353,
"egressInterface" : 0,
"flowEndSysUpTime" : 12741,
"flowStartSysUpTime" : 12741,
"icmpTypeCodeIPv4" : 0,
"ingressInterface" : 0,
"ipClassOfService" : 0,
"ipVersion" : 4,
"octetDeltaCount" : 232,
"packetDeltaCount" : 1,
"protocolIdentifier" : 17,
"sourceIPv4Address" : "192.168.253.1",
"sourceTransportPort" : 5353,
"tcpControlBits" : 0,
"version" : 10,
"vlanId" : 0
},
"@version" : "1"
}
END

end

it "should decode raw data" do
expect(decode.size).to eq(7)
expect(decode.size).to eq(13)

expect(decode[0].get("[netflow][version]")).to eq(10)
expect(decode[0].get("[netflow][systemInitTimeMilliseconds]")).to eq(1431516013506)
Expand Down Expand Up @@ -711,6 +868,48 @@
expect(decode[6].get("[netflow][destinationTransportPort]")).to eq(443)
expect(decode[6].get("[netflow][protocolIdentifier]")).to eq(6)
expect(decode[6].get("[netflow][tcpControlBits]")).to eq(26)

expect(decode[7].get("[netflow][sourceIPv4Address]")).to eq("192.168.253.130")
expect(decode[7].get("[netflow][destinationIPv4Address]")).to eq("10.4.36.64")
expect(decode[7].get("[netflow][sourceTransportPort]")).to eq(38254)
expect(decode[7].get("[netflow][destinationTransportPort]")).to eq(9200)
expect(decode[7].get("[netflow][protocolIdentifier]")).to eq(6)
expect(decode[7].get("[netflow][tcpControlBits]")).to eq(2)

expect(decode[8].get("[netflow][sourceIPv4Address]")).to eq("192.168.253.1")
expect(decode[8].get("[netflow][destinationIPv4Address]")).to eq("192.168.253.128")
expect(decode[8].get("[netflow][sourceTransportPort]")).to eq(60560)
expect(decode[8].get("[netflow][destinationTransportPort]")).to eq(22)
expect(decode[8].get("[netflow][protocolIdentifier]")).to eq(6)
expect(decode[8].get("[netflow][tcpControlBits]")).to eq(24)

expect(decode[9].get("[netflow][sourceIPv4Address]")).to eq("192.168.253.128")
expect(decode[9].get("[netflow][destinationIPv4Address]")).to eq("192.168.253.1")
expect(decode[9].get("[netflow][sourceTransportPort]")).to eq(22)
expect(decode[9].get("[netflow][destinationTransportPort]")).to eq(60560)
expect(decode[9].get("[netflow][protocolIdentifier]")).to eq(6)
expect(decode[9].get("[netflow][tcpControlBits]")).to eq(24)

expect(decode[10].get("[netflow][sourceIPv4Address]")).to eq("192.168.253.1")
expect(decode[10].get("[netflow][destinationIPv4Address]")).to eq("192.168.253.128")
expect(decode[10].get("[netflow][sourceTransportPort]")).to eq(65308)
expect(decode[10].get("[netflow][destinationTransportPort]")).to eq(22)
expect(decode[10].get("[netflow][protocolIdentifier]")).to eq(6)
expect(decode[10].get("[netflow][tcpControlBits]")).to eq(24)

expect(decode[11].get("[netflow][sourceIPv4Address]")).to eq("192.168.253.128")
expect(decode[11].get("[netflow][destinationIPv4Address]")).to eq("192.168.253.1")
expect(decode[11].get("[netflow][sourceTransportPort]")).to eq(22)
expect(decode[11].get("[netflow][destinationTransportPort]")).to eq(65308)
expect(decode[11].get("[netflow][protocolIdentifier]")).to eq(6)
expect(decode[11].get("[netflow][tcpControlBits]")).to eq(24)

expect(decode[12].get("[netflow][sourceIPv4Address]")).to eq("192.168.253.1")
expect(decode[12].get("[netflow][destinationIPv4Address]")).to eq("224.0.0.251")
expect(decode[12].get("[netflow][sourceTransportPort]")).to eq(5353)
expect(decode[12].get("[netflow][destinationTransportPort]")).to eq(5353)
expect(decode[12].get("[netflow][protocolIdentifier]")).to eq(17)
expect(decode[12].get("[netflow][tcpControlBits]")).to eq(0)
end

it "should serialize to json" do
Expand All @@ -721,6 +920,12 @@
expect(JSON.parse(decode[4].to_json)).to eq(JSON.parse(json_events[4]))
expect(JSON.parse(decode[5].to_json)).to eq(JSON.parse(json_events[5]))
expect(JSON.parse(decode[6].to_json)).to eq(JSON.parse(json_events[6]))
expect(JSON.parse(decode[7].to_json)).to eq(JSON.parse(json_events[7]))
expect(JSON.parse(decode[8].to_json)).to eq(JSON.parse(json_events[8]))
expect(JSON.parse(decode[9].to_json)).to eq(JSON.parse(json_events[9]))
expect(JSON.parse(decode[10].to_json)).to eq(JSON.parse(json_events[10]))
expect(JSON.parse(decode[11].to_json)).to eq(JSON.parse(json_events[11]))
expect(JSON.parse(decode[12].to_json)).to eq(JSON.parse(json_events[12]))
end

end
Expand Down