Skip to content
This repository was archived by the owner on Mar 21, 2023. It is now read-only.

Commit 2f3a98b

Browse files
author
Jochen Schalanda
committed
Add support for reading Logstash NetFlow definitions
(cherry picked from commit 3dfd948)
1 parent 7dc83da commit 2f3a98b

File tree

12 files changed

+289
-328
lines changed

12 files changed

+289
-328
lines changed

pom.xml

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,18 @@
4242
<graylog.plugin-dir>/usr/share/graylog-server/plugin</graylog.plugin-dir>
4343
</properties>
4444

45+
<dependencyManagement>
46+
<dependencies>
47+
<dependency>
48+
<groupId>com.fasterxml.jackson</groupId>
49+
<artifactId>jackson-bom</artifactId>
50+
<version>${jackson.version}</version>
51+
<type>pom</type>
52+
<scope>import</scope>
53+
</dependency>
54+
</dependencies>
55+
</dependencyManagement>
56+
4557
<dependencies>
4658
<dependency>
4759
<groupId>org.graylog2</groupId>
@@ -55,6 +67,10 @@
5567
<version>${auto-value.version}</version>
5668
<scope>provided</scope>
5769
</dependency>
70+
<dependency>
71+
<groupId>com.fasterxml.jackson.dataformat</groupId>
72+
<artifactId>jackson-dataformat-yaml</artifactId>
73+
</dependency>
5874

5975
<dependency>
6076
<groupId>junit</groupId>
@@ -68,12 +84,6 @@
6884
<version>${assertj-core.version}</version>
6985
<scope>test</scope>
7086
</dependency>
71-
<dependency>
72-
<groupId>com.fasterxml.jackson.dataformat</groupId>
73-
<artifactId>jackson-dataformat-yaml</artifactId>
74-
<version>${jackson.version}</version>
75-
<scope>test</scope>
76-
</dependency>
7787
<dependency>
7888
<groupId>io.pkts</groupId>
7989
<artifactId>pkts-core</artifactId>

src/main/java/org/graylog/plugins/netflow/codecs/NetFlowCodec.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@
4444
import javax.annotation.Nullable;
4545
import javax.inject.Inject;
4646
import javax.inject.Named;
47+
import java.io.FileInputStream;
48+
import java.io.IOException;
49+
import java.io.InputStream;
4750
import java.nio.file.Path;
4851
import java.nio.file.Paths;
4952
import java.util.Collection;
@@ -59,26 +62,36 @@ public class NetFlowCodec extends AbstractCodec implements MultiMessageCodec {
5962
static final String CK_CACHE_PATH = "cache_path";
6063
@VisibleForTesting
6164
static final String CK_CACHE_SAVE_INTERVAL = "cache_save_interval";
65+
@VisibleForTesting
66+
static final String CK_NETFLOW9_DEFINITION_PATH = "netflow9_definitions_Path";
6267

6368
private static final int DEFAULT_CACHE_SIZE = 1000;
6469
private static final String DEFAULT_CACHE_PATH = SystemUtils.getJavaIoTmpDir().toPath().resolve("netflow-templates.json").toString();
6570
private static final int DEFAULT_CACHE_SAVE_INTERVAL = 15 * 60;
6671

6772
private final NetFlowV9TemplateCache templateCache;
68-
private final NetFlowV9FieldTypeRegistry typeRegistry = new NetFlowV9FieldTypeRegistry();
73+
private final NetFlowV9FieldTypeRegistry typeRegistry;
6974

7075
@Inject
7176
protected NetFlowCodec(@Assisted Configuration configuration,
7277
@Named("daemonScheduler") ScheduledExecutorService scheduler,
73-
ObjectMapper objectMapper) {
78+
ObjectMapper objectMapper) throws IOException {
7479
super(configuration);
7580

7681
final int cacheSize = configuration.getInt(CK_CACHE_SIZE, DEFAULT_CACHE_SIZE);
7782
final int cacheSaveInterval = configuration.getInt(CK_CACHE_SAVE_INTERVAL, DEFAULT_CACHE_SAVE_INTERVAL);
7883
final String configCachePath = configuration.getString(CK_CACHE_PATH, DEFAULT_CACHE_PATH);
7984
final Path cachePath = Paths.get(configCachePath);
80-
81-
templateCache = new NetFlowV9TemplateCache(cacheSize, cachePath, cacheSaveInterval, scheduler, objectMapper);
85+
this.templateCache = new NetFlowV9TemplateCache(cacheSize, cachePath, cacheSaveInterval, scheduler, objectMapper);
86+
87+
final String netFlow9DefinitionsPath = configuration.getString(CK_NETFLOW9_DEFINITION_PATH);
88+
if (netFlow9DefinitionsPath == null || netFlow9DefinitionsPath.trim().isEmpty()) {
89+
this.typeRegistry = NetFlowV9FieldTypeRegistry.create();
90+
} else {
91+
try (InputStream inputStream = new FileInputStream(netFlow9DefinitionsPath)) {
92+
this.typeRegistry = NetFlowV9FieldTypeRegistry.create(inputStream);
93+
}
94+
}
8295
}
8396

8497
@Nullable
@@ -120,9 +133,10 @@ public void overrideDefaultValues(@Nonnull ConfigurationRequest cr) {
120133
public ConfigurationRequest getRequestedConfiguration() {
121134
final ConfigurationRequest configuration = super.getRequestedConfiguration();
122135

123-
configuration.addField(new NumberField(CK_CACHE_SIZE, "Maximum cache size", DEFAULT_CACHE_SIZE, "Maximum number of elements in the NetFlow9 template cache", ConfigurationField.Optional.OPTIONAL));
124-
configuration.addField(new TextField(CK_CACHE_PATH, "Cache file path", DEFAULT_CACHE_PATH, "Path to the file persisting the the NetFlow9 template cache", ConfigurationField.Optional.OPTIONAL));
136+
configuration.addField(new NumberField(CK_CACHE_SIZE, "Maximum cache size", DEFAULT_CACHE_SIZE, "Maximum number of elements in the NetFlow 9 template cache", ConfigurationField.Optional.OPTIONAL));
137+
configuration.addField(new TextField(CK_CACHE_PATH, "Cache file path", DEFAULT_CACHE_PATH, "Path to the file persisting the the NetFlow 9 template cache", ConfigurationField.Optional.OPTIONAL));
125138
configuration.addField(new NumberField(CK_CACHE_SAVE_INTERVAL, "Cache save interval (seconds)", DEFAULT_CACHE_SAVE_INTERVAL, "Interval in seconds for persisting the cache contents", ConfigurationField.Optional.OPTIONAL));
139+
configuration.addField(new TextField(CK_NETFLOW9_DEFINITION_PATH, "Netflow 9 field definitions", "", "Path to the YAML file containing Netflow 9 field definitions", ConfigurationField.Optional.OPTIONAL));
126140

127141
return configuration;
128142
}

src/main/java/org/graylog/plugins/netflow/v9/NetFlowV9FieldDef.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.google.auto.value.AutoValue;
2222
import io.netty.buffer.ByteBuf;
2323

24+
import java.math.BigInteger;
2425
import java.net.InetAddress;
2526
import java.net.UnknownHostException;
2627
import java.nio.charset.StandardCharsets;
@@ -52,10 +53,18 @@ public Optional<Object> parse(ByteBuf bb) {
5253
return Optional.of(bb.readUnsignedShort());
5354
case INT16:
5455
return Optional.of(bb.readShort());
56+
case UINT24:
57+
return Optional.of(bb.readUnsignedMedium());
58+
case INT24:
59+
return Optional.of(bb.readMedium());
5560
case UINT32:
5661
return Optional.of(bb.readUnsignedInt());
5762
case INT32:
5863
return Optional.of(bb.readInt());
64+
case UINT64:
65+
byte[] uint64Bytes = new byte[8];
66+
bb.readBytes(uint64Bytes);
67+
return Optional.of(new BigInteger(uint64Bytes));
5968
case INT64:
6069
return Optional.of(bb.readLong());
6170
case VARINT:

src/main/java/org/graylog/plugins/netflow/v9/NetFlowV9FieldType.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ public static NetFlowV9FieldType create(@JsonProperty("id") int id,
4040
}
4141

4242
public enum ValueType {
43-
UINT8(1), INT8(1), UINT16(2), INT16(2), UINT32(4), INT32(4), INT64(8), IPV4(4), IPV6(16), MAC(6), STRING(0), VARINT(0);
43+
UINT8(1), INT8(1), UINT16(2), INT16(2), UINT24(3), INT24(3),
44+
UINT32(4), INT32(4), UINT64(8), INT64(8), IPV4(4), IPV6(16),
45+
MAC(6), STRING(0), VARINT(0);
4446

4547
private final int defaultLength;
4648

Lines changed: 159 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,178 @@
11
package org.graylog.plugins.netflow.v9;
22

3-
import com.google.common.base.Splitter;
3+
import com.fasterxml.jackson.databind.JsonNode;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
46
import com.google.common.collect.ImmutableMap;
57
import com.google.common.io.Resources;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
610

711
import java.io.IOException;
8-
import java.io.UncheckedIOException;
12+
import java.io.InputStream;
913
import java.net.URL;
10-
import java.nio.charset.StandardCharsets;
11-
import java.util.List;
14+
import java.util.Iterator;
1215
import java.util.Map;
1316

1417
public class NetFlowV9FieldTypeRegistry {
18+
private static final Logger LOG = LoggerFactory.getLogger(NetFlowV9FieldTypeRegistry.class);
19+
private static final String DEFAULT_DEFINITIONS = "/netflow9.yml";
20+
1521
private final Map<Integer, NetFlowV9FieldType> fieldTypes;
1622

17-
public NetFlowV9FieldTypeRegistry() {
18-
final URL url = Resources.getResource(this.getClass(), "/netflow9.csv");
19-
final List<String> lines;
23+
private NetFlowV9FieldTypeRegistry(InputStream definitions) throws IOException {
24+
this(definitions, new ObjectMapper(new YAMLFactory()));
25+
}
26+
27+
private NetFlowV9FieldTypeRegistry(InputStream definitions, ObjectMapper yamlMapper) throws IOException {
2028
try {
21-
lines = Resources.readLines(url, StandardCharsets.UTF_8);
22-
} catch (IOException e) {
23-
throw new UncheckedIOException(e);
29+
this.fieldTypes = parseYaml(definitions, yamlMapper);
30+
} catch (Exception e) {
31+
throw new IllegalArgumentException("Unable to parse NetFlow 9 definitions", e);
32+
}
33+
}
34+
35+
public static NetFlowV9FieldTypeRegistry create() throws IOException {
36+
final URL url = Resources.getResource(NetFlowV9FieldTypeRegistry.class, DEFAULT_DEFINITIONS);
37+
try (InputStream inputStream = url.openStream()) {
38+
return new NetFlowV9FieldTypeRegistry(inputStream);
39+
}
40+
}
41+
42+
public static NetFlowV9FieldTypeRegistry create(InputStream definitions) throws IOException {
43+
return new NetFlowV9FieldTypeRegistry(definitions);
44+
}
45+
46+
private static Map<Integer, NetFlowV9FieldType> parseYaml(InputStream inputStream, ObjectMapper yamlMapper) throws IOException {
47+
final JsonNode node = yamlMapper.readValue(inputStream, JsonNode.class);
48+
final ImmutableMap.Builder<Integer, NetFlowV9FieldType> mapBuilder = ImmutableMap.builder();
49+
final Iterator<Map.Entry<String, JsonNode>> fields = node.fields();
50+
while (fields.hasNext()) {
51+
final Map.Entry<String, JsonNode> field = fields.next();
52+
53+
final Integer id;
54+
try {
55+
id = Integer.parseInt(field.getKey());
56+
} catch (NumberFormatException e) {
57+
LOG.debug("Skipping record with invalid id: {}", field.getKey(), e);
58+
continue;
59+
}
60+
61+
final JsonNode value = field.getValue();
62+
63+
if (!value.isArray()) {
64+
LOG.debug("Skipping invalid record: {}", field);
65+
continue;
66+
}
67+
68+
if (value.size() == 1 && ":skip".equals(value.get(0).asText())) {
69+
LOG.debug("Skipping record: {}", field);
70+
continue;
71+
}
72+
73+
if (value.size() != 2) {
74+
LOG.debug("Skipping incomplete record: {}", field);
75+
continue;
76+
}
77+
78+
final JsonNode typeNode = value.get(0);
79+
final NetFlowV9FieldType.ValueType type;
80+
if (typeNode.isTextual()) {
81+
type = symbolToValueType(typeNode.asText());
82+
} else if (typeNode.isInt()) {
83+
type = intToValueType(typeNode.asInt());
84+
} else {
85+
LOG.debug("Skipping invalid record type: {}", field);
86+
continue;
87+
}
88+
89+
final JsonNode nameNode = value.get(1);
90+
if (!nameNode.isTextual()) {
91+
LOG.debug("Skipping invalid record type: {}", field);
92+
continue;
93+
}
94+
95+
final String symbol = nameNode.asText();
96+
final String name = rubySymbolToString(symbol);
97+
98+
mapBuilder.put(id, NetFlowV9FieldType.create(id, type, name));
99+
}
100+
101+
return mapBuilder.build();
102+
}
103+
104+
private static String rubySymbolToString(String symbol) {
105+
if (symbol.charAt(0) == ':') {
106+
return symbol.substring(1);
107+
} else {
108+
return symbol;
24109
}
110+
}
25111

26-
final Splitter splitter = Splitter.on(',').trimResults().omitEmptyStrings();
27-
final ImmutableMap.Builder<Integer, NetFlowV9FieldType> fieldTypesBuilder = ImmutableMap.builder();
28-
for (String line : lines) {
29-
final List<String> items = splitter.splitToList(line);
30-
final int id = Integer.parseInt(items.get(0));
31-
final String name = items.get(1);
32-
final NetFlowV9FieldType.ValueType type = NetFlowV9FieldType.ValueType.valueOf(items.get(2));
112+
private static NetFlowV9FieldType.ValueType symbolToValueType(String type) {
113+
switch (type) {
114+
case ":int8":
115+
return NetFlowV9FieldType.ValueType.INT8;
116+
case ":uint8":
117+
return NetFlowV9FieldType.ValueType.UINT8;
118+
case ":int16":
119+
return NetFlowV9FieldType.ValueType.INT16;
120+
case ":uint16":
121+
return NetFlowV9FieldType.ValueType.UINT16;
122+
case ":int24":
123+
return NetFlowV9FieldType.ValueType.INT24;
124+
case ":uint24":
125+
return NetFlowV9FieldType.ValueType.UINT24;
126+
case ":int32":
127+
return NetFlowV9FieldType.ValueType.INT32;
128+
case ":uint32":
129+
return NetFlowV9FieldType.ValueType.UINT32;
130+
case ":int64":
131+
return NetFlowV9FieldType.ValueType.INT64;
132+
case ":uint64":
133+
return NetFlowV9FieldType.ValueType.UINT64;
134+
case ":ip4_addr":
135+
return NetFlowV9FieldType.ValueType.IPV4;
136+
case ":ip6_addr":
137+
return NetFlowV9FieldType.ValueType.IPV6;
138+
case ":mac_addr":
139+
return NetFlowV9FieldType.ValueType.MAC;
140+
case ":string":
141+
return NetFlowV9FieldType.ValueType.STRING;
142+
// HACK: http://www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.html#wp9000935
143+
case ":forwarding_status":
144+
return NetFlowV9FieldType.ValueType.UINT8;
145+
// HACK: http://www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.html#wp9000991
146+
case ":application_id":
147+
return NetFlowV9FieldType.ValueType.VARINT;
148+
// HACK: http://www.cisco.com/c/en/us/td/docs/security/asa/special/netflow/guide/asa_netflow.html#pgfId-1331620
149+
case ":acl_id_asa":
150+
return NetFlowV9FieldType.ValueType.VARINT;
151+
// HACK: https://www.iana.org/assignments/ipfix/ipfix.xml
152+
case "mpls_label_stack_octets":
153+
return NetFlowV9FieldType.ValueType.UINT32;
154+
default:
155+
LOG.debug("Unknown type: {}", type);
156+
return NetFlowV9FieldType.ValueType.STRING;
157+
}
158+
}
33159

34-
fieldTypesBuilder.put(id, NetFlowV9FieldType.create(id, type, name));
160+
private static NetFlowV9FieldType.ValueType intToValueType(int length) {
161+
switch (length) {
162+
case 1:
163+
return NetFlowV9FieldType.ValueType.UINT8;
164+
case 2:
165+
return NetFlowV9FieldType.ValueType.UINT16;
166+
case 3:
167+
return NetFlowV9FieldType.ValueType.UINT24;
168+
case 4:
169+
return NetFlowV9FieldType.ValueType.UINT32;
170+
case 8:
171+
return NetFlowV9FieldType.ValueType.UINT64;
172+
default:
173+
LOG.debug("Unknown type length: " + length);
174+
return NetFlowV9FieldType.ValueType.STRING;
35175
}
36-
this.fieldTypes = fieldTypesBuilder.build();
37176
}
38177

39178
public NetFlowV9FieldType get(int id) {
@@ -43,4 +182,5 @@ public NetFlowV9FieldType get(int id) {
43182
public Map<Integer, NetFlowV9FieldType> asMap() {
44183
return fieldTypes;
45184
}
185+
46186
}

0 commit comments

Comments
 (0)