Skip to content
This repository was archived by the owner on Mar 21, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@
<artifactId>jackson-dataformat-yaml</artifactId>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.graylog.plugins.netflow.codecs.NetFlowCodec;
import org.graylog.plugins.netflow.inputs.NetFlowUdpInput;
import org.graylog.plugins.netflow.transport.NetFlowUdpTransport;
import org.graylog2.plugin.PluginConfigBean;
import org.graylog2.plugin.PluginModule;

Expand All @@ -37,5 +38,6 @@ public Set<? extends PluginConfigBean> getConfigBeans() {
protected void configure() {
addMessageInput(NetFlowUdpInput.class);
addCodec("netflow", NetFlowCodec.class);
addTransport("netflow-udp", NetFlowUdpTransport.class);
}
}
147 changes: 109 additions & 38 deletions src/main/java/org/graylog/plugins/netflow/codecs/NetFlowCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,74 +16,74 @@
*/
package org.graylog.plugins.netflow.codecs;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.inject.assistedinject.Assisted;
import org.apache.commons.lang3.SystemUtils;
import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import org.graylog.plugins.netflow.flows.FlowException;
import org.graylog.plugins.netflow.flows.NetFlowParser;
import org.graylog.plugins.netflow.flows.NetFlowFormatter;
import org.graylog.plugins.netflow.v5.NetFlowV5Packet;
import org.graylog.plugins.netflow.v5.NetFlowV5Parser;
import org.graylog.plugins.netflow.v9.NetFlowV9FieldTypeRegistry;
import org.graylog.plugins.netflow.v9.NetFlowV9TemplateCache;
import org.graylog.plugins.netflow.v9.NetFlowV9Journal;
import org.graylog.plugins.netflow.v9.NetFlowV9OptionTemplate;
import org.graylog.plugins.netflow.v9.NetFlowV9Packet;
import org.graylog.plugins.netflow.v9.NetFlowV9Parser;
import org.graylog.plugins.netflow.v9.NetFlowV9Record;
import org.graylog.plugins.netflow.v9.NetFlowV9Template;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.ResolvableInetSocketAddress;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.configuration.ConfigurationRequest;
import org.graylog2.plugin.configuration.fields.ConfigurationField;
import org.graylog2.plugin.configuration.fields.NumberField;
import org.graylog2.plugin.configuration.fields.TextField;
import org.graylog2.plugin.inputs.annotations.Codec;
import org.graylog2.plugin.inputs.annotations.ConfigClass;
import org.graylog2.plugin.inputs.annotations.FactoryClass;
import org.graylog2.plugin.inputs.codecs.AbstractCodec;
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
import org.graylog2.plugin.inputs.codecs.MultiMessageCodec;
import org.graylog2.plugin.inputs.transports.NettyTransport;
import org.graylog2.plugin.journal.RawMessage;
import org.graylog2.shared.utilities.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Named;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.concurrent.ScheduledExecutorService;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@Codec(name = "netflow", displayName = "NetFlow")
public class NetFlowCodec extends AbstractCodec implements MultiMessageCodec {
private static final Logger LOG = LoggerFactory.getLogger(NetFlowCodec.class);

@VisibleForTesting
static final String CK_CACHE_SIZE = "cache_size";
@VisibleForTesting
static final String CK_CACHE_PATH = "cache_path";
@VisibleForTesting
static final String CK_CACHE_SAVE_INTERVAL = "cache_save_interval";
/**
* Marker byte which signals that the contained netflow packet should be parsed as is.
*/
public static final byte PASSTHROUGH_MARKER = 0x00;
/**
* Marker byte which signals that the contained netflow v9 packet is non-RFC:
* It contains all necessary template flows before any data flows and can be completely parsed without a template cache.
*/
public static final byte ORDERED_V9_MARKER = 0x01;
@VisibleForTesting
static final String CK_NETFLOW9_DEFINITION_PATH = "netflow9_definitions_Path";

private static final int DEFAULT_CACHE_SIZE = 1000;
private static final String DEFAULT_CACHE_PATH = SystemUtils.getJavaIoTmpDir().toPath().resolve("netflow-templates.json").toString();
private static final int DEFAULT_CACHE_SAVE_INTERVAL = 15 * 60;

private final NetFlowV9TemplateCache templateCache;
private static final Logger LOG = LoggerFactory.getLogger(NetFlowCodec.class);
private final NetFlowV9FieldTypeRegistry typeRegistry;

@Inject
protected NetFlowCodec(@Assisted Configuration configuration,
@Named("daemonScheduler") ScheduledExecutorService scheduler,
ObjectMapper objectMapper) throws IOException {
protected NetFlowCodec(@Assisted Configuration configuration) throws IOException {
super(configuration);

final int cacheSize = configuration.getInt(CK_CACHE_SIZE, DEFAULT_CACHE_SIZE);
final int cacheSaveInterval = configuration.getInt(CK_CACHE_SAVE_INTERVAL, DEFAULT_CACHE_SAVE_INTERVAL);
final String configCachePath = configuration.getString(CK_CACHE_PATH, DEFAULT_CACHE_PATH);
final Path cachePath = Paths.get(configCachePath);
this.templateCache = new NetFlowV9TemplateCache(cacheSize, cachePath, cacheSaveInterval, scheduler, objectMapper);

final String netFlow9DefinitionsPath = configuration.getString(CK_NETFLOW9_DEFINITION_PATH);
if (netFlow9DefinitionsPath == null || netFlow9DefinitionsPath.trim().isEmpty()) {
this.typeRegistry = NetFlowV9FieldTypeRegistry.create();
Expand All @@ -94,6 +94,13 @@ protected NetFlowCodec(@Assisted Configuration configuration,
}
}

@Nullable
@Override
public CodecAggregator getAggregator() {
// this is intentional: we replace the entire channel handler in NetFlowUdpTransport because we need a different signature
return null;
}

@Nullable
@Override
public Message decode(@Nonnull RawMessage rawMessage) {
Expand All @@ -104,13 +111,82 @@ public Message decode(@Nonnull RawMessage rawMessage) {
@Override
public Collection<Message> decodeMessages(@Nonnull RawMessage rawMessage) {
try {
return NetFlowParser.parse(rawMessage, templateCache, typeRegistry);
final ResolvableInetSocketAddress remoteAddress = rawMessage.getRemoteAddress();
final InetSocketAddress sender = remoteAddress != null ? remoteAddress.getInetSocketAddress() : null;

final byte[] payload = rawMessage.getPayload();
if (payload.length < 3) {
LOG.debug("NetFlow message (source: {}) doesn't even fit the NetFlow version (size: {} bytes)",
sender, payload.length);
return null;
}

final ByteBuf buffer = Unpooled.wrappedBuffer(payload);
switch (buffer.readByte()) {
case PASSTHROUGH_MARKER:
final NetFlowV5Packet netFlowV5Packet = NetFlowV5Parser.parsePacket(buffer);

return netFlowV5Packet.records().stream()
.map(record -> NetFlowFormatter.toMessage(netFlowV5Packet.header(), record, sender))
.collect(Collectors.toList());
case ORDERED_V9_MARKER:
// our "custom" netflow v9 that has all the templates in the same packet
return decodeV9(sender, buffer);
default:
final List<RawMessage.SourceNode> sourceNodes = rawMessage.getSourceNodes();
final RawMessage.SourceNode sourceNode = sourceNodes.isEmpty() ? null : sourceNodes.get(sourceNodes.size() - 1);
final String inputId = sourceNode == null ? "<unknown>" : sourceNode.inputId;
LOG.warn("Unsupported NetFlow packet on input {} (source: {})", inputId, sender);
return null;
}
} catch (FlowException e) {
LOG.error("Error parsing NetFlow packet <{}> received from <{}>", rawMessage.getId(), rawMessage.getRemoteAddress(), e);
if (LOG.isDebugEnabled()) {
LOG.debug("NetFlow packet hexdump:\n{}", ByteBufUtil.prettyHexDump(Unpooled.wrappedBuffer(rawMessage.getPayload())));
}
return null;
} catch (InvalidProtocolBufferException e) {
LOG.error("Invalid NetFlowV9 entry found, cannot parse the messages", ExceptionUtils.getRootCause(e));
return null;
}
}

@VisibleForTesting
Collection<Message> decodeV9(InetSocketAddress sender, ByteBuf buffer) throws InvalidProtocolBufferException {
final List<NetFlowV9Packet> netFlowV9Packets = decodeV9Packets(buffer);

return netFlowV9Packets.stream().map(netFlowV9Packet -> netFlowV9Packet.records().stream()
.filter(record -> record instanceof NetFlowV9Record)
.map(record -> NetFlowFormatter.toMessage(netFlowV9Packet.header(), record, sender))
.collect(Collectors.toList())
).flatMap(Collection::stream)
.collect(Collectors.toList());
}

@VisibleForTesting
List<NetFlowV9Packet> decodeV9Packets(ByteBuf buffer) throws InvalidProtocolBufferException {
byte[] v9JournalEntry = new byte[buffer.readableBytes()];
buffer.readBytes(v9JournalEntry);
final NetFlowV9Journal.RawNetflowV9 rawNetflowV9 = NetFlowV9Journal.RawNetflowV9.parseFrom(v9JournalEntry);

// parse all templates used in the packet
final Map<Integer, NetFlowV9Template> templateMap = Maps.newHashMap();
rawNetflowV9.getTemplatesMap().forEach((templateId, byteString) -> {
final NetFlowV9Template netFlowV9Template = NetFlowV9Parser.parseTemplate(
Unpooled.wrappedBuffer(byteString.toByteArray()), typeRegistry);
templateMap.put(templateId, netFlowV9Template);
});
final NetFlowV9OptionTemplate[] optionTemplate = {null};
rawNetflowV9.getOptionTemplateMap().forEach((templateId, byteString) -> {
optionTemplate[0] = NetFlowV9Parser.parseOptionTemplate(Unpooled.wrappedBuffer(byteString.toByteArray()), typeRegistry);
});

return rawNetflowV9.getPacketsList().stream()
.map(bytes -> Unpooled.wrappedBuffer(bytes.toByteArray()))
.map(buf -> NetFlowV9Parser.parsePacket(buf, typeRegistry, templateMap, optionTemplate[0]))
.collect(Collectors.toList());
}

@FactoryClass
public interface Factory extends AbstractCodec.Factory<NetFlowCodec> {
@Override
Expand All @@ -132,12 +208,7 @@ public void overrideDefaultValues(@Nonnull ConfigurationRequest cr) {
@Override
public ConfigurationRequest getRequestedConfiguration() {
final ConfigurationRequest configuration = super.getRequestedConfiguration();

configuration.addField(new NumberField(CK_CACHE_SIZE, "Maximum cache size", DEFAULT_CACHE_SIZE, "Maximum number of elements in the NetFlow 9 template cache", ConfigurationField.Optional.OPTIONAL));
configuration.addField(new TextField(CK_CACHE_PATH, "Cache file path", DEFAULT_CACHE_PATH, "Path to the file persisting the the NetFlow 9 template cache", ConfigurationField.Optional.OPTIONAL));
configuration.addField(new NumberField(CK_CACHE_SAVE_INTERVAL, "Cache save interval (seconds)", DEFAULT_CACHE_SAVE_INTERVAL, "Interval in seconds for persisting the cache contents", ConfigurationField.Optional.OPTIONAL));
configuration.addField(new TextField(CK_NETFLOW9_DEFINITION_PATH, "Netflow 9 field definitions", "", "Path to the YAML file containing Netflow 9 field definitions", ConfigurationField.Optional.OPTIONAL));

return configuration;
}
}
Expand Down
Loading