From 5b0e893bbd7cc2a968333b3007e294f7adea97f6 Mon Sep 17 00:00:00 2001
From: mck
Date: Tue, 11 Mar 2014 10:53:59 +0100
Subject: [PATCH 1/3] Replace the BlockingQueue with a lmax disruptor. This
gives a significant performance increase in highly multithreaded jvms.
http://lmax-exchange.github.io/disruptor/
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
The lmax disruptor's ringBuffer size is initialised to 16k.
Rationale here is that statsd msgs are short strings, and sending 10k/s is a normal use-case.
If the Handler thread fails to send out the udp packets fast enough
(given it also squashes statsd messages into available MTU space)
and the ringBuffer becomes full then new statsd messages will be dropped and
a InsufficientCapacityException passed to the exception handler…
Exceptions from the lmax disruptor and handler are passed through to the exception handler.
Throwables from the lmax disruptor are logged as SEVERE.
---
pom.xml | 5 +
.../statsd/NonBlockingStatsDClient.java | 135 ++++++++++++++----
2 files changed, 109 insertions(+), 31 deletions(-)
diff --git a/pom.xml b/pom.xml
index 285be49..d734c49 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,6 +47,11 @@
+
+ com.lmax
+ disruptor
+ 3.2.0
+
org.hamcrest
hamcrest-core
diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
index 88729c8..029dd81 100644
--- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
+++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
@@ -5,14 +5,21 @@
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.text.NumberFormat;
+import java.util.Arrays;
import java.util.Locale;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import com.lmax.disruptor.EventFactory;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.EventTranslatorOneArg;
+import com.lmax.disruptor.ExceptionHandler;
+import com.lmax.disruptor.FatalExceptionHandler;
+import com.lmax.disruptor.InsufficientCapacityException;
+import com.lmax.disruptor.dsl.Disruptor;
+
/**
* A simple StatsD client implementation facilitating metrics recording.
*
@@ -67,23 +74,37 @@ protected NumberFormat initialValue() {
}
};
+ private final static EventFactory FACTORY = new EventFactory() {
+ @Override
+ public Event newInstance() {
+ return new Event();
+ }
+ };
+
+ private static final EventTranslatorOneArg TRANSLATOR = new EventTranslatorOneArg() {
+ @Override
+ public void translateTo(Event event, long sequence, String msg) {
+ event.setValue(msg);
+ }
+ };
+
private final String prefix;
private final DatagramChannel clientChannel;
private final InetSocketAddress address;
private final StatsDClientErrorHandler handler;
private final String constantTagsRendered;
- private final ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
+ private final ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactory() {
final ThreadFactory delegate = Executors.defaultThreadFactory();
@Override public Thread newThread(Runnable r) {
Thread result = delegate.newThread(r);
- result.setName("StatsD-" + result.getName());
+ result.setName("StatsD-disruptor-" + result.getName());
result.setDaemon(true);
return result;
}
});
- private final BlockingQueue queue = new LinkedBlockingQueue();
+ private final Disruptor disruptor = new Disruptor(FACTORY, 16384, executor);
/**
* Create a new StatsD client communicating with a StatsD instance on the
@@ -182,7 +203,10 @@ public NonBlockingStatsDClient(String prefix, String hostname, int port, String[
} catch (Exception e) {
throw new StatsDClientException("Failed to start StatsD client", e);
}
- this.executor.submit(new QueueConsumer());
+
+ disruptor.handleEventsWith(new Handler());
+ disruptor.handleExceptionsWith(new DisruptorExceptionHandler(this.handler));
+ disruptor.start();
}
/**
@@ -192,6 +216,7 @@ public NonBlockingStatsDClient(String prefix, String hostname, int port, String[
@Override
public void stop() {
try {
+ disruptor.shutdown();
executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);
}
@@ -438,41 +463,52 @@ public void histogram(String aspect, int value, String... tags) {
}
private void send(String message) {
- queue.offer(message);
+ if(!disruptor.getRingBuffer().tryPublishEvent(TRANSLATOR, message)) {
+ handler.handle(InsufficientCapacityException.INSTANCE);
+ }
}
- private class QueueConsumer implements Runnable {
+ private static class Event {
+
+ private String value;
+
+ public void setValue(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public String toString() {
+ return "Event: " + value;
+ }
+ }
+
+ private class Handler implements EventHandler {
+
private final ByteBuffer sendBuffer = ByteBuffer.allocate(PACKET_SIZE_BYTES);
- @Override public void run() {
- while(!executor.isShutdown()) {
- try {
- String message = queue.poll(1, TimeUnit.SECONDS);
- if(null != message) {
- byte[] data = message.getBytes();
- if(sendBuffer.remaining() < (data.length + 1)) {
- blockingSend();
- }
- if(sendBuffer.position() > 0) {
- sendBuffer.put( (byte) '\n');
- }
- sendBuffer.put(data);
- if(null == queue.peek()) {
- blockingSend();
- }
- }
- } catch (Exception e) {
- handler.handle(e);
- }
+ @Override
+ public void onEvent(Event event, long sequence, boolean batchEnd) throws Exception {
+ String message = event.value;
+ byte[] data = message.getBytes();
+ if(sendBuffer.remaining() < (data.length + 1)) {
+ flush();
+ }
+ if(sendBuffer.position() > 0) {
+ sendBuffer.put( (byte) '\n');
+ }
+ sendBuffer.put(
+ data.length > sendBuffer.remaining() ? Arrays.copyOfRange(data, 0, sendBuffer.remaining()) : data);
+
+ if(batchEnd || 0 == sendBuffer.remaining()) {
+ flush();
}
}
- private void blockingSend() throws IOException {
+ private void flush() throws IOException {
int sizeOfBuffer = sendBuffer.position();
sendBuffer.flip();
int sentBytes = clientChannel.send(sendBuffer, address);
- sendBuffer.limit(sendBuffer.capacity());
- sendBuffer.rewind();
+ sendBuffer.clear();
if (sizeOfBuffer != sentBytes) {
handler.handle(
@@ -487,4 +523,41 @@ private void blockingSend() throws IOException {
}
}
}
+
+ private static class DisruptorExceptionHandler implements ExceptionHandler {
+
+ private final FatalExceptionHandler throwableHandler = new FatalExceptionHandler();
+ private final StatsDClientErrorHandler exceptionHandler;
+
+ public DisruptorExceptionHandler(StatsDClientErrorHandler handler) {
+ this.exceptionHandler = handler;
+ }
+
+ @Override
+ public void handleEventException(Throwable ex, long sequence, Object event) {
+ if(ex instanceof Exception) {
+ exceptionHandler.handle((Exception) ex);
+ } else {
+ throwableHandler.handleEventException(ex, sequence, event);
+ }
+ }
+
+ @Override
+ public void handleOnStartException(Throwable ex) {
+ if(ex instanceof Exception) {
+ exceptionHandler.handle((Exception) ex);
+ } else {
+ throwableHandler.handleOnStartException(ex);
+ }
+ }
+
+ @Override
+ public void handleOnShutdownException(Throwable ex) {
+ if(ex instanceof Exception) {
+ exceptionHandler.handle((Exception) ex);
+ } else {
+ throwableHandler.handleOnShutdownException(ex);
+ }
+ }
+ }
}
From 7d07455f3692507d25b6eaecf708ac8f84109a6e Mon Sep 17 00:00:00 2001
From: ssmiweve
Date: Mon, 30 Jun 2014 14:20:35 +0200
Subject: [PATCH 2/3] configurable ringBuffer size default is 16k msgs maximum.
---
.../statsd/NonBlockingStatsDClient.java | 32 ++++++++++++++++++-
1 file changed, 31 insertions(+), 1 deletion(-)
diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
index 029dd81..7d3981d 100644
--- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
+++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
@@ -19,6 +19,7 @@
import com.lmax.disruptor.FatalExceptionHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.util.Util;
/**
* A simple StatsD client implementation facilitating metrics recording.
@@ -45,11 +46,14 @@
* on any StatsD clients.
*
* @author Tom Denley
+ * @author mick semb wever
*
*/
public final class NonBlockingStatsDClient implements StatsDClient {
private static final int PACKET_SIZE_BYTES = 1500;
+ private static final int RINGBUFFER_DEFAULT_SIZE = 16384;
+ private static final int RINGBUFFER_MIN_SIZE = 128;
private static final StatsDClientErrorHandler NO_OP_HANDLER = new StatsDClientErrorHandler() {
@Override public void handle(Exception e) { /* No-op */ }
@@ -104,7 +108,7 @@ public void translateTo(Event event, long sequence, String msg) {
}
});
- private final Disruptor disruptor = new Disruptor(FACTORY, 16384, executor);
+ private final Disruptor disruptor = new Disruptor(FACTORY, calculateRingBufferSize(), executor);
/**
* Create a new StatsD client communicating with a StatsD instance on the
@@ -468,6 +472,32 @@ private void send(String message) {
}
}
+ private static int calculateRingBufferSize() {
+ int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
+
+ String userPreferredRBSize = System.getProperty(
+ "NonBlockingStatsDClient.ringBufferSize",
+ String.valueOf(RINGBUFFER_DEFAULT_SIZE));
+
+ try {
+ ringBufferSize = Integer.parseInt(userPreferredRBSize);
+ if (ringBufferSize < RINGBUFFER_MIN_SIZE) {
+ ringBufferSize = RINGBUFFER_MIN_SIZE;
+
+ LOGGER.warn(
+ "Invalid RingBufferSize {}, using minimum size {}.",
+ userPreferredRBSize,
+ RINGBUFFER_MIN_SIZE);
+ }
+ } catch (NumberFormatException ex) {
+ LOGGER.warn(
+ "Invalid RingBufferSize {}, using default size {}.",
+ userPreferredRBSize,
+ RINGBUFFER_DEFAULT_SIZE);
+ }
+ return Util.ceilingNextPowerOfTwo(ringBufferSize);
+ }
+
private static class Event {
private String value;
From c65b8935a6583bd3c8e729d8b0745235a8ff1df8 Mon Sep 17 00:00:00 2001
From: ssmiweve
Date: Mon, 30 Jun 2014 14:33:22 +0200
Subject: [PATCH 3/3] make the size of the ringBuffer configurable using the
"NonBlockingStatsDClient.ringBufferSize" system property. Its value must be
over 128.
---
.../com/timgroup/statsd/NonBlockingStatsDClient.java | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
index 7d3981d..e4d280a 100644
--- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
+++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
@@ -11,6 +11,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
@@ -55,6 +56,8 @@ public final class NonBlockingStatsDClient implements StatsDClient {
private static final int RINGBUFFER_DEFAULT_SIZE = 16384;
private static final int RINGBUFFER_MIN_SIZE = 128;
+ private final static Logger LOG = Logger.getLogger(NonBlockingStatsDClient.class.getName());
+
private static final StatsDClientErrorHandler NO_OP_HANDLER = new StatsDClientErrorHandler() {
@Override public void handle(Exception e) { /* No-op */ }
};
@@ -484,16 +487,16 @@ private static int calculateRingBufferSize() {
if (ringBufferSize < RINGBUFFER_MIN_SIZE) {
ringBufferSize = RINGBUFFER_MIN_SIZE;
- LOGGER.warn(
+ LOG.warning(String.format(
"Invalid RingBufferSize {}, using minimum size {}.",
userPreferredRBSize,
- RINGBUFFER_MIN_SIZE);
+ RINGBUFFER_MIN_SIZE));
}
} catch (NumberFormatException ex) {
- LOGGER.warn(
+ LOG.warning(String.format(
"Invalid RingBufferSize {}, using default size {}.",
userPreferredRBSize,
- RINGBUFFER_DEFAULT_SIZE);
+ RINGBUFFER_DEFAULT_SIZE));
}
return Util.ceilingNextPowerOfTwo(ringBufferSize);
}