Skip to content
This repository was archived by the owner on Mar 21, 2023. It is now read-only.

Commit 4e9d68f

Browse files
kroepkebernd
authored andcommitted
Improved NetFlowV9 support (#21)
* move template caching and flow buffering in custom message aggregator much of this is still wip * remove template cache * wip for codec aggregation and parsing custom format of v9 currently fixing tests * wip * migrate tests * fix v9 parsing by preserving the complete packets during buffering aggregating the data flowsets does not work, because all records are based on the packet's timestamp to simplify parsing in the codec, the aggregator now collects all templates/option templates into a protobuf and adds received and buffered data flows to be parsed. the netflow packets are preserved completely and can also contain templates. the codec will not use them, though, only the aggregator does * remove duplicated license header * update protobuf comment * update comment * tweak license header to avoid diff * fix guice setup for transport * fix handler setup the codec-aggregator is null in super class, so that the put call added it after the raw-message handler thus the code never ran and screwed up parsing * change how the packet cache works the previous implementation only checked for a single template id to be present for each packet, which in general is wrong if not all templates arrive at the same time (which might happen for large numbers of active templates) the new implementation manually checks each packet's template requirements agains the ids of received templates, for the current remoteaddress/source id combination * prefix netflow v9 fields with nf_ * remove unused optional template atomic ref * don't prefix unknown fields names, that is done centrally now fixes double prefixing of fields * don't forget to fix test * add flow timestamp fields from ipfix * fix test after field definition list update
1 parent f00cf72 commit 4e9d68f

23 files changed

+2804
-1270
lines changed

pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,13 @@
7272
<artifactId>jackson-dataformat-yaml</artifactId>
7373
</dependency>
7474

75+
<dependency>
76+
<groupId>com.google.protobuf</groupId>
77+
<artifactId>protobuf-java</artifactId>
78+
<version>${protobuf.version}</version>
79+
<scope>provided</scope>
80+
</dependency>
81+
7582
<dependency>
7683
<groupId>junit</groupId>
7784
<artifactId>junit</artifactId>

src/main/java/org/graylog/plugins/netflow/NetFlowPluginModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.graylog.plugins.netflow.codecs.NetFlowCodec;
2323
import org.graylog.plugins.netflow.inputs.NetFlowUdpInput;
24+
import org.graylog.plugins.netflow.transport.NetFlowUdpTransport;
2425
import org.graylog2.plugin.PluginConfigBean;
2526
import org.graylog2.plugin.PluginModule;
2627

@@ -37,5 +38,6 @@ public Set<? extends PluginConfigBean> getConfigBeans() {
3738
protected void configure() {
3839
addMessageInput(NetFlowUdpInput.class);
3940
addCodec("netflow", NetFlowCodec.class);
41+
addTransport("netflow-udp", NetFlowUdpTransport.class);
4042
}
4143
}

src/main/java/org/graylog/plugins/netflow/codecs/NetFlowCodec.java

Lines changed: 109 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -16,74 +16,74 @@
1616
*/
1717
package org.graylog.plugins.netflow.codecs;
1818

19-
import com.fasterxml.jackson.databind.ObjectMapper;
2019
import com.google.common.annotations.VisibleForTesting;
20+
import com.google.common.collect.Maps;
2121
import com.google.inject.assistedinject.Assisted;
22-
import org.apache.commons.lang3.SystemUtils;
22+
import com.google.protobuf.InvalidProtocolBufferException;
23+
import io.netty.buffer.ByteBuf;
24+
import io.netty.buffer.ByteBufUtil;
25+
import io.netty.buffer.Unpooled;
2326
import org.graylog.plugins.netflow.flows.FlowException;
24-
import org.graylog.plugins.netflow.flows.NetFlowParser;
27+
import org.graylog.plugins.netflow.flows.NetFlowFormatter;
28+
import org.graylog.plugins.netflow.v5.NetFlowV5Packet;
29+
import org.graylog.plugins.netflow.v5.NetFlowV5Parser;
2530
import org.graylog.plugins.netflow.v9.NetFlowV9FieldTypeRegistry;
26-
import org.graylog.plugins.netflow.v9.NetFlowV9TemplateCache;
31+
import org.graylog.plugins.netflow.v9.NetFlowV9Journal;
32+
import org.graylog.plugins.netflow.v9.NetFlowV9OptionTemplate;
33+
import org.graylog.plugins.netflow.v9.NetFlowV9Packet;
34+
import org.graylog.plugins.netflow.v9.NetFlowV9Parser;
35+
import org.graylog.plugins.netflow.v9.NetFlowV9Record;
36+
import org.graylog.plugins.netflow.v9.NetFlowV9Template;
2737
import org.graylog2.plugin.Message;
38+
import org.graylog2.plugin.ResolvableInetSocketAddress;
2839
import org.graylog2.plugin.configuration.Configuration;
2940
import org.graylog2.plugin.configuration.ConfigurationRequest;
3041
import org.graylog2.plugin.configuration.fields.ConfigurationField;
31-
import org.graylog2.plugin.configuration.fields.NumberField;
3242
import org.graylog2.plugin.configuration.fields.TextField;
3343
import org.graylog2.plugin.inputs.annotations.Codec;
3444
import org.graylog2.plugin.inputs.annotations.ConfigClass;
3545
import org.graylog2.plugin.inputs.annotations.FactoryClass;
3646
import org.graylog2.plugin.inputs.codecs.AbstractCodec;
47+
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
3748
import org.graylog2.plugin.inputs.codecs.MultiMessageCodec;
3849
import org.graylog2.plugin.inputs.transports.NettyTransport;
3950
import org.graylog2.plugin.journal.RawMessage;
51+
import org.graylog2.shared.utilities.ExceptionUtils;
4052
import org.slf4j.Logger;
4153
import org.slf4j.LoggerFactory;
4254

4355
import javax.annotation.Nonnull;
4456
import javax.annotation.Nullable;
4557
import javax.inject.Inject;
46-
import javax.inject.Named;
4758
import java.io.FileInputStream;
4859
import java.io.IOException;
4960
import java.io.InputStream;
50-
import java.nio.file.Path;
51-
import java.nio.file.Paths;
61+
import java.net.InetSocketAddress;
5262
import java.util.Collection;
53-
import java.util.concurrent.ScheduledExecutorService;
63+
import java.util.List;
64+
import java.util.Map;
65+
import java.util.stream.Collectors;
5466

5567
@Codec(name = "netflow", displayName = "NetFlow")
5668
public class NetFlowCodec extends AbstractCodec implements MultiMessageCodec {
57-
private static final Logger LOG = LoggerFactory.getLogger(NetFlowCodec.class);
58-
59-
@VisibleForTesting
60-
static final String CK_CACHE_SIZE = "cache_size";
61-
@VisibleForTesting
62-
static final String CK_CACHE_PATH = "cache_path";
63-
@VisibleForTesting
64-
static final String CK_CACHE_SAVE_INTERVAL = "cache_save_interval";
69+
/**
70+
* Marker byte which signals that the contained netflow packet should be parsed as is.
71+
*/
72+
public static final byte PASSTHROUGH_MARKER = 0x00;
73+
/**
74+
* Marker byte which signals that the contained netflow v9 packet is non-RFC:
75+
* It contains all necessary template flows before any data flows and can be completely parsed without a template cache.
76+
*/
77+
public static final byte ORDERED_V9_MARKER = 0x01;
6578
@VisibleForTesting
6679
static final String CK_NETFLOW9_DEFINITION_PATH = "netflow9_definitions_Path";
67-
68-
private static final int DEFAULT_CACHE_SIZE = 1000;
69-
private static final String DEFAULT_CACHE_PATH = SystemUtils.getJavaIoTmpDir().toPath().resolve("netflow-templates.json").toString();
70-
private static final int DEFAULT_CACHE_SAVE_INTERVAL = 15 * 60;
71-
72-
private final NetFlowV9TemplateCache templateCache;
80+
private static final Logger LOG = LoggerFactory.getLogger(NetFlowCodec.class);
7381
private final NetFlowV9FieldTypeRegistry typeRegistry;
7482

7583
@Inject
76-
protected NetFlowCodec(@Assisted Configuration configuration,
77-
@Named("daemonScheduler") ScheduledExecutorService scheduler,
78-
ObjectMapper objectMapper) throws IOException {
84+
protected NetFlowCodec(@Assisted Configuration configuration) throws IOException {
7985
super(configuration);
8086

81-
final int cacheSize = configuration.getInt(CK_CACHE_SIZE, DEFAULT_CACHE_SIZE);
82-
final int cacheSaveInterval = configuration.getInt(CK_CACHE_SAVE_INTERVAL, DEFAULT_CACHE_SAVE_INTERVAL);
83-
final String configCachePath = configuration.getString(CK_CACHE_PATH, DEFAULT_CACHE_PATH);
84-
final Path cachePath = Paths.get(configCachePath);
85-
this.templateCache = new NetFlowV9TemplateCache(cacheSize, cachePath, cacheSaveInterval, scheduler, objectMapper);
86-
8787
final String netFlow9DefinitionsPath = configuration.getString(CK_NETFLOW9_DEFINITION_PATH);
8888
if (netFlow9DefinitionsPath == null || netFlow9DefinitionsPath.trim().isEmpty()) {
8989
this.typeRegistry = NetFlowV9FieldTypeRegistry.create();
@@ -94,6 +94,13 @@ protected NetFlowCodec(@Assisted Configuration configuration,
9494
}
9595
}
9696

97+
@Nullable
98+
@Override
99+
public CodecAggregator getAggregator() {
100+
// this is intentional: we replace the entire channel handler in NetFlowUdpTransport because we need a different signature
101+
return null;
102+
}
103+
97104
@Nullable
98105
@Override
99106
public Message decode(@Nonnull RawMessage rawMessage) {
@@ -104,13 +111,82 @@ public Message decode(@Nonnull RawMessage rawMessage) {
104111
@Override
105112
public Collection<Message> decodeMessages(@Nonnull RawMessage rawMessage) {
106113
try {
107-
return NetFlowParser.parse(rawMessage, templateCache, typeRegistry);
114+
final ResolvableInetSocketAddress remoteAddress = rawMessage.getRemoteAddress();
115+
final InetSocketAddress sender = remoteAddress != null ? remoteAddress.getInetSocketAddress() : null;
116+
117+
final byte[] payload = rawMessage.getPayload();
118+
if (payload.length < 3) {
119+
LOG.debug("NetFlow message (source: {}) doesn't even fit the NetFlow version (size: {} bytes)",
120+
sender, payload.length);
121+
return null;
122+
}
123+
124+
final ByteBuf buffer = Unpooled.wrappedBuffer(payload);
125+
switch (buffer.readByte()) {
126+
case PASSTHROUGH_MARKER:
127+
final NetFlowV5Packet netFlowV5Packet = NetFlowV5Parser.parsePacket(buffer);
128+
129+
return netFlowV5Packet.records().stream()
130+
.map(record -> NetFlowFormatter.toMessage(netFlowV5Packet.header(), record, sender))
131+
.collect(Collectors.toList());
132+
case ORDERED_V9_MARKER:
133+
// our "custom" netflow v9 that has all the templates in the same packet
134+
return decodeV9(sender, buffer);
135+
default:
136+
final List<RawMessage.SourceNode> sourceNodes = rawMessage.getSourceNodes();
137+
final RawMessage.SourceNode sourceNode = sourceNodes.isEmpty() ? null : sourceNodes.get(sourceNodes.size() - 1);
138+
final String inputId = sourceNode == null ? "<unknown>" : sourceNode.inputId;
139+
LOG.warn("Unsupported NetFlow packet on input {} (source: {})", inputId, sender);
140+
return null;
141+
}
108142
} catch (FlowException e) {
109143
LOG.error("Error parsing NetFlow packet <{}> received from <{}>", rawMessage.getId(), rawMessage.getRemoteAddress(), e);
144+
if (LOG.isDebugEnabled()) {
145+
LOG.debug("NetFlow packet hexdump:\n{}", ByteBufUtil.prettyHexDump(Unpooled.wrappedBuffer(rawMessage.getPayload())));
146+
}
147+
return null;
148+
} catch (InvalidProtocolBufferException e) {
149+
LOG.error("Invalid NetFlowV9 entry found, cannot parse the messages", ExceptionUtils.getRootCause(e));
110150
return null;
111151
}
112152
}
113153

154+
@VisibleForTesting
155+
Collection<Message> decodeV9(InetSocketAddress sender, ByteBuf buffer) throws InvalidProtocolBufferException {
156+
final List<NetFlowV9Packet> netFlowV9Packets = decodeV9Packets(buffer);
157+
158+
return netFlowV9Packets.stream().map(netFlowV9Packet -> netFlowV9Packet.records().stream()
159+
.filter(record -> record instanceof NetFlowV9Record)
160+
.map(record -> NetFlowFormatter.toMessage(netFlowV9Packet.header(), record, sender))
161+
.collect(Collectors.toList())
162+
).flatMap(Collection::stream)
163+
.collect(Collectors.toList());
164+
}
165+
166+
@VisibleForTesting
167+
List<NetFlowV9Packet> decodeV9Packets(ByteBuf buffer) throws InvalidProtocolBufferException {
168+
byte[] v9JournalEntry = new byte[buffer.readableBytes()];
169+
buffer.readBytes(v9JournalEntry);
170+
final NetFlowV9Journal.RawNetflowV9 rawNetflowV9 = NetFlowV9Journal.RawNetflowV9.parseFrom(v9JournalEntry);
171+
172+
// parse all templates used in the packet
173+
final Map<Integer, NetFlowV9Template> templateMap = Maps.newHashMap();
174+
rawNetflowV9.getTemplatesMap().forEach((templateId, byteString) -> {
175+
final NetFlowV9Template netFlowV9Template = NetFlowV9Parser.parseTemplate(
176+
Unpooled.wrappedBuffer(byteString.toByteArray()), typeRegistry);
177+
templateMap.put(templateId, netFlowV9Template);
178+
});
179+
final NetFlowV9OptionTemplate[] optionTemplate = {null};
180+
rawNetflowV9.getOptionTemplateMap().forEach((templateId, byteString) -> {
181+
optionTemplate[0] = NetFlowV9Parser.parseOptionTemplate(Unpooled.wrappedBuffer(byteString.toByteArray()), typeRegistry);
182+
});
183+
184+
return rawNetflowV9.getPacketsList().stream()
185+
.map(bytes -> Unpooled.wrappedBuffer(bytes.toByteArray()))
186+
.map(buf -> NetFlowV9Parser.parsePacket(buf, typeRegistry, templateMap, optionTemplate[0]))
187+
.collect(Collectors.toList());
188+
}
189+
114190
@FactoryClass
115191
public interface Factory extends AbstractCodec.Factory<NetFlowCodec> {
116192
@Override
@@ -132,12 +208,7 @@ public void overrideDefaultValues(@Nonnull ConfigurationRequest cr) {
132208
@Override
133209
public ConfigurationRequest getRequestedConfiguration() {
134210
final ConfigurationRequest configuration = super.getRequestedConfiguration();
135-
136-
configuration.addField(new NumberField(CK_CACHE_SIZE, "Maximum cache size", DEFAULT_CACHE_SIZE, "Maximum number of elements in the NetFlow 9 template cache", ConfigurationField.Optional.OPTIONAL));
137-
configuration.addField(new TextField(CK_CACHE_PATH, "Cache file path", DEFAULT_CACHE_PATH, "Path to the file persisting the the NetFlow 9 template cache", ConfigurationField.Optional.OPTIONAL));
138-
configuration.addField(new NumberField(CK_CACHE_SAVE_INTERVAL, "Cache save interval (seconds)", DEFAULT_CACHE_SAVE_INTERVAL, "Interval in seconds for persisting the cache contents", ConfigurationField.Optional.OPTIONAL));
139211
configuration.addField(new TextField(CK_NETFLOW9_DEFINITION_PATH, "Netflow 9 field definitions", "", "Path to the YAML file containing Netflow 9 field definitions", ConfigurationField.Optional.OPTIONAL));
140-
141212
return configuration;
142213
}
143214
}

0 commit comments

Comments
 (0)