Skip to content
This repository was archived by the owner on Mar 21, 2023. It is now read-only.

Commit 07a5b50

Browse files
author
Jochen Schalanda
committed
Fix NetFlow 9 type parsing
The NetFlowV9Parser ignored unknown/invalid type definitions which lead to wrong byte offsets when processing NetFlowV9 flows. Additionally, custom length specifications were ignored when converting the field types to unsigned integers so that too many or too few bytes would be read. (cherry picked from commit 167d091)
1 parent 2f3a98b commit 07a5b50

File tree

9 files changed

+298
-52
lines changed

9 files changed

+298
-52
lines changed

src/main/java/org/graylog/plugins/netflow/flows/NetFlowFormatter.java

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,9 @@ private static String toMessageString(NetFlowV9BaseRecord record) {
5555
final long octetCount = (long) fields.get("in_bytes");
5656
final String srcAddr = (String) fields.get("ipv4_src_addr");
5757
final String dstAddr = (String) fields.get("ipv4_dst_addr");
58-
final int srcPort = (int) fields.get("l4_src_port");
59-
final int dstPort = (int) fields.get("l4_dst_port");
60-
final short protocol = (short) fields.get("protocol");
58+
final Integer srcPort = (Integer) fields.get("l4_src_port");
59+
final Integer dstPort = (Integer) fields.get("l4_dst_port");
60+
final Short protocol = (Short) fields.get("protocol");
6161

6262
return String.format("NetFlowV9 [%s]:%d <> [%s]:%d proto:%d pkts:%d bytes:%d",
6363
srcAddr, srcPort,
@@ -125,11 +125,11 @@ public static Message toMessage(NetFlowV9Header header,
125125

126126
final String srcAddr = (String) fields.get("ipv4_src_addr");
127127
final String dstAddr = (String) fields.get("ipv4_dst_addr");
128-
final int srcPort = (int) fields.get("l4_src_port");
129-
final int dstPort = (int) fields.get("l4_dst_port");
128+
final Object srcPort = fields.get("l4_src_port");
129+
final Object dstPort = fields.get("l4_dst_port");
130130
final String ipv4NextHop = (String) fields.get("ipv4_next_hop");
131-
final long first = (long) fields.get("first_switched");
132-
final long last = (long) fields.get("last_switched");
131+
final Long first = (Long) fields.get("first_switched");
132+
final Long last = (Long) fields.get("last_switched");
133133

134134
message.addField(MF_FLOW_PACKET_ID, header.sequence());
135135
message.addField(MF_TOS, fields.get("ip_tos"));
@@ -148,18 +148,22 @@ public static Message toMessage(NetFlowV9Header header,
148148
message.addField(MF_DST_MASK, fields.get("dst_mask"));
149149
message.addField(MF_SRC_AS, fields.get("src_as"));
150150
message.addField(MF_DST_AS, fields.get("dst_as"));
151-
message.addField(MF_PROTO, fields.get("protocol"));
152-
final Protocol protocolInfo = Protocol.getByNumber((short) fields.get("protocol"));
153-
if (protocolInfo != null) {
154-
message.addField(MF_PROTO_NAME, protocolInfo.getAlias());
151+
final Object protocol = fields.get("protocol");
152+
if (protocol != null) {
153+
message.addField(MF_PROTO, protocol);
154+
short protocolNumber = ((Number) protocol).shortValue();
155+
final Protocol protocolInfo = Protocol.getByNumber(protocolNumber);
156+
if (protocolInfo != null) {
157+
message.addField(MF_PROTO_NAME, protocolInfo.getAlias());
158+
}
155159
}
156160
message.addField(MF_TCP_FLAGS, fields.get("tcp_flags"));
157161

158-
if (first > 0) {
162+
if (first != null && first > 0) {
159163
long start = timestamp - (header.sysUptime() - first);
160164
message.addField(MF_START, new DateTime(start, DateTimeZone.UTC));
161165
}
162-
if (last > 0) {
166+
if (last != null && last > 0) {
163167
long stop = timestamp - (header.sysUptime() - last);
164168
message.addField(MF_STOP, new DateTime(stop, DateTimeZone.UTC));
165169
}

src/main/java/org/graylog/plugins/netflow/v9/NetFlowV9FieldDef.java

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -46,29 +46,21 @@ public Optional<Object> parse(ByteBuf bb) {
4646
int len = length() != 0 ? length() : type().valueType().getDefaultLength();
4747
switch (type().valueType()) {
4848
case UINT8:
49-
return Optional.of(bb.readUnsignedByte());
49+
case UINT16:
50+
case UINT24:
51+
case UINT32:
52+
case UINT64:
53+
return parseUnsignedNumber(bb, len);
5054
case INT8:
5155
return Optional.of(bb.readByte());
52-
case UINT16:
53-
return Optional.of(bb.readUnsignedShort());
5456
case INT16:
5557
return Optional.of(bb.readShort());
56-
case UINT24:
57-
return Optional.of(bb.readUnsignedMedium());
5858
case INT24:
5959
return Optional.of(bb.readMedium());
60-
case UINT32:
61-
return Optional.of(bb.readUnsignedInt());
6260
case INT32:
6361
return Optional.of(bb.readInt());
64-
case UINT64:
65-
byte[] uint64Bytes = new byte[8];
66-
bb.readBytes(uint64Bytes);
67-
return Optional.of(new BigInteger(uint64Bytes));
6862
case INT64:
6963
return Optional.of(bb.readLong());
70-
case VARINT:
71-
return Optional.of(parseLong(bb, len));
7264
case IPV4:
7365
byte[] b = new byte[4];
7466
bb.readBytes(b);
@@ -98,12 +90,23 @@ public Optional<Object> parse(ByteBuf bb) {
9890
}
9991
}
10092

101-
private long parseLong(ByteBuf bb, int length) {
102-
long l = 0;
103-
for (int i = 0; i < length; i++) {
104-
l <<= 8;
105-
l |= bb.readUnsignedByte();
93+
private Optional<Object> parseUnsignedNumber(ByteBuf bb, int length) {
94+
switch (length) {
95+
case 1:
96+
return Optional.of(bb.readUnsignedByte());
97+
case 2:
98+
return Optional.of(bb.readUnsignedShort());
99+
case 3:
100+
return Optional.of(bb.readUnsignedMedium());
101+
case 4:
102+
return Optional.of(bb.readUnsignedInt());
103+
case 8:
104+
return Optional.of(bb.readLong());
105+
default:
106+
byte[] uint64Bytes = new byte[length];
107+
bb.readBytes(uint64Bytes);
108+
return Optional.of(new BigInteger(uint64Bytes));
106109
}
107-
return l;
110+
108111
}
109112
}

src/main/java/org/graylog/plugins/netflow/v9/NetFlowV9FieldType.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,5 +53,18 @@ public enum ValueType {
5353
public int getDefaultLength() {
5454
return defaultLength;
5555
}
56+
57+
public static ValueType byLength(int length) {
58+
switch (length) {
59+
case 1: return UINT8;
60+
case 2: return UINT16;
61+
case 3: return UINT24;
62+
case 4: return UINT32;
63+
case 6: return MAC;
64+
case 8: return UINT64;
65+
case 16: return IPV6;
66+
default: return VARINT;
67+
}
68+
}
5669
}
5770
}

src/main/java/org/graylog/plugins/netflow/v9/NetFlowV9Parser.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,15 @@ public static NetFlowV9Packet parsePacket(ByteBuf bb, NetFlowV9TemplateCache cac
3434
final int dataLength = bb.readableBytes();
3535
final NetFlowV9Header header = parseHeader(bb);
3636

37-
List<NetFlowV9Template> templates = Collections.emptyList();
37+
final ImmutableList.Builder<NetFlowV9Template> allTemplates = ImmutableList.builder();
3838
NetFlowV9OptionTemplate optTemplate = null;
3939
List<NetFlowV9BaseRecord> records = Collections.emptyList();
4040
while (bb.isReadable()) {
4141
bb.markReaderIndex();
4242
int flowSetId = bb.readUnsignedShort();
4343
if (flowSetId == 0) {
44-
templates = parseTemplates(bb, typeRegistry);
44+
final List<NetFlowV9Template> templates = parseTemplates(bb, typeRegistry);
45+
allTemplates.addAll(templates);
4546
for (NetFlowV9Template t : templates) {
4647
cache.put(t);
4748
}
@@ -56,7 +57,7 @@ public static NetFlowV9Packet parsePacket(ByteBuf bb, NetFlowV9TemplateCache cac
5657

5758
return NetFlowV9Packet.create(
5859
header,
59-
templates,
60+
allTemplates.build(),
6061
optTemplate,
6162
records,
6263
dataLength);
@@ -116,10 +117,13 @@ public static List<NetFlowV9Template> parseTemplates(ByteBuf bb, NetFlowV9FieldT
116117
for (int i = 0; i < fieldCount; i++) {
117118
int fieldType = bb.readUnsignedShort();
118119
int fieldLength = bb.readUnsignedShort();
119-
final NetFlowV9FieldType type = typeRegistry.get(fieldType);
120-
if (type == null) {
121-
// Skip unknown/invalid field type
122-
continue;
120+
final NetFlowV9FieldType registeredType = typeRegistry.get(fieldType);
121+
final NetFlowV9FieldType type;
122+
if (registeredType == null) {
123+
// Unknown/invalid field type
124+
type = NetFlowV9FieldType.create(fieldType, NetFlowV9FieldType.ValueType.byLength(fieldLength), "nf_field_" + fieldType);
125+
} else {
126+
type = registeredType;
123127
}
124128
final NetFlowV9FieldDef fieldDef = NetFlowV9FieldDef.create(type, fieldLength);
125129
fieldDefs.add(fieldDef);

src/test/java/org/graylog/plugins/netflow/codecs/NetFlowCodecTest.java

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -197,8 +197,8 @@ public void decodeMessagesSuccessfullyDecodesNetFlowV9() throws Exception {
197197
.containsEntry("nf_src_address", "192.168.124.1")
198198
.containsEntry("nf_dst_address", "239.255.255.250")
199199
.containsEntry("nf_proto_name", "UDP")
200-
.containsEntry("nf_src_as", 0)
201-
.containsEntry("nf_dst_as", 0)
200+
.containsEntry("nf_src_as", 0L)
201+
.containsEntry("nf_dst_as", 0L)
202202
.containsEntry("nf_snmp_input", 0)
203203
.containsEntry("nf_snmp_output", 0);
204204

@@ -215,14 +215,14 @@ public void decodeMessagesSuccessfullyDecodesNetFlowV9() throws Exception {
215215
.containsEntry("nf_src_address", "192.168.124.20")
216216
.containsEntry("nf_dst_address", "121.161.231.32")
217217
.containsEntry("nf_proto_name", "UDP")
218-
.containsEntry("nf_src_as", 0)
219-
.containsEntry("nf_dst_as", 0)
218+
.containsEntry("nf_src_as", 0L)
219+
.containsEntry("nf_dst_as", 0L)
220220
.containsEntry("nf_snmp_input", 0)
221221
.containsEntry("nf_snmp_output", 0);
222222
}
223223

224224
@Test
225-
public void pcapNetFlowV5() throws Exception {
225+
public void pcap_softflowd_NetFlowV5() throws Exception {
226226
final List<Message> allMessages = new ArrayList<>();
227227
try (InputStream inputStream = Resources.getResource("netflow-data/netflow5.pcap").openStream()) {
228228
final Pcap pcap = Pcap.openStream(inputStream);
@@ -244,7 +244,29 @@ public void pcapNetFlowV5() throws Exception {
244244
}
245245

246246
@Test
247-
public void pcapNetFlowV9() throws Exception {
247+
public void pcap_pmacctd_NetFlowV5() throws Exception {
248+
final List<Message> allMessages = new ArrayList<>();
249+
try (InputStream inputStream = Resources.getResource("netflow-data/pmacctd-netflow5.pcap").openStream()) {
250+
final Pcap pcap = Pcap.openStream(inputStream);
251+
pcap.loop(packet -> {
252+
if (packet.hasProtocol(Protocol.UDP)) {
253+
final UDPPacket udp = (UDPPacket) packet.getPacket(Protocol.UDP);
254+
final InetSocketAddress source = new InetSocketAddress(udp.getSourceIP(), udp.getSourcePort());
255+
final Collection<Message> messages = codec.decodeMessages(new RawMessage(udp.getPayload().getArray(), source));
256+
assertThat(messages)
257+
.isNotNull()
258+
.isNotEmpty();
259+
allMessages.addAll(messages);
260+
}
261+
return true;
262+
}
263+
);
264+
assertThat(allMessages).hasSize(42);
265+
}
266+
}
267+
268+
@Test
269+
public void pcap_softflowd_NetFlowV9() throws Exception {
248270
final List<Message> allMessages = new ArrayList<>();
249271
try (InputStream inputStream = Resources.getResource("netflow-data/netflow9.pcap").openStream()) {
250272
final Pcap pcap = Pcap.openStream(inputStream);
@@ -264,4 +286,26 @@ public void pcapNetFlowV9() throws Exception {
264286
}
265287
assertThat(allMessages).hasSize(19);
266288
}
289+
290+
@Test
291+
public void pcap_pmacctd_NetFlowV9() throws Exception {
292+
final List<Message> allMessages = new ArrayList<>();
293+
try (InputStream inputStream = Resources.getResource("netflow-data/pmacctd-netflow9.pcap").openStream()) {
294+
final Pcap pcap = Pcap.openStream(inputStream);
295+
pcap.loop(packet -> {
296+
if (packet.hasProtocol(Protocol.UDP)) {
297+
final UDPPacket udp = (UDPPacket) packet.getPacket(Protocol.UDP);
298+
final InetSocketAddress source = new InetSocketAddress(udp.getSourceIP(), udp.getSourcePort());
299+
final Collection<Message> messages = codec.decodeMessages(new RawMessage(udp.getPayload().getArray(), source));
300+
assertThat(messages)
301+
.isNotNull()
302+
.isNotEmpty();
303+
allMessages.addAll(messages);
304+
}
305+
return true;
306+
}
307+
);
308+
}
309+
assertThat(allMessages).hasSize(6);
310+
}
267311
}

src/test/java/org/graylog/plugins/netflow/v5/NetFlowV5ParserTest.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,19 @@
1717

1818
import com.google.common.io.Resources;
1919
import com.google.common.net.InetAddresses;
20+
import io.netty.buffer.ByteBuf;
2021
import io.netty.buffer.Unpooled;
22+
import io.pkts.Pcap;
23+
import io.pkts.packet.UDPPacket;
24+
import io.pkts.protocol.Protocol;
2125
import org.junit.Test;
2226

2327
import java.io.IOException;
28+
import java.io.InputStream;
29+
import java.util.ArrayList;
2430
import java.util.List;
2531

32+
import static org.assertj.core.api.Assertions.assertThat;
2633
import static org.junit.Assert.assertEquals;
2734
import static org.junit.Assert.assertNotNull;
2835

@@ -119,4 +126,44 @@ public void testParse2() throws IOException {
119126
assertEquals(0, r.dstMask());
120127
assertEquals(110L, r.packetCount());
121128
}
129+
130+
@Test
131+
public void pcap_softflowd_NetFlowV5() throws Exception {
132+
final List<NetFlowV5Record> allRecords = new ArrayList<>();
133+
try (InputStream inputStream = Resources.getResource("netflow-data/netflow5.pcap").openStream()) {
134+
final Pcap pcap = Pcap.openStream(inputStream);
135+
pcap.loop(packet -> {
136+
if (packet.hasProtocol(Protocol.UDP)) {
137+
final UDPPacket udp = (UDPPacket) packet.getPacket(Protocol.UDP);
138+
final ByteBuf byteBuf = Unpooled.wrappedBuffer(udp.getPayload().getArray());
139+
final NetFlowV5Packet netFlowV5Packet = NetFlowV5Parser.parsePacket(byteBuf);
140+
assertThat(netFlowV5Packet).isNotNull();
141+
allRecords.addAll(netFlowV5Packet.records());
142+
}
143+
return true;
144+
}
145+
);
146+
}
147+
assertThat(allRecords).hasSize(4);
148+
}
149+
150+
@Test
151+
public void pcap_pmacctd_NetFlowV5() throws Exception {
152+
final List<NetFlowV5Record> allRecords = new ArrayList<>();
153+
try (InputStream inputStream = Resources.getResource("netflow-data/pmacctd-netflow5.pcap").openStream()) {
154+
final Pcap pcap = Pcap.openStream(inputStream);
155+
pcap.loop(packet -> {
156+
if (packet.hasProtocol(Protocol.UDP)) {
157+
final UDPPacket udp = (UDPPacket) packet.getPacket(Protocol.UDP);
158+
final ByteBuf byteBuf = Unpooled.wrappedBuffer(udp.getPayload().getArray());
159+
final NetFlowV5Packet netFlowV5Packet = NetFlowV5Parser.parsePacket(byteBuf);
160+
assertThat(netFlowV5Packet).isNotNull();
161+
allRecords.addAll(netFlowV5Packet.records());
162+
}
163+
return true;
164+
}
165+
);
166+
}
167+
assertThat(allRecords).hasSize(42);
168+
}
122169
}

0 commit comments

Comments
 (0)