diff --git a/logback-core/src/main/java/ch/qos/logback/core/AsyncAppenderBase.java b/logback-core/src/main/java/ch/qos/logback/core/AsyncAppenderBase.java index a51f6efec3..c2ea32d985 100755 --- a/logback-core/src/main/java/ch/qos/logback/core/AsyncAppenderBase.java +++ b/logback-core/src/main/java/ch/qos/logback/core/AsyncAppenderBase.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; /** * This appender and derived classes, log events asynchronously. In order to @@ -45,6 +46,9 @@ */ public class AsyncAppenderBase extends UnsynchronizedAppenderBase implements AppenderAttachable { + private final AtomicInteger droppedEvents = new AtomicInteger(0); + private final AtomicInteger discardedEvents = new AtomicInteger(0); + AppenderAttachableImpl aai = new AppenderAttachableImpl(); BlockingQueue blockingQueue; @@ -160,6 +164,7 @@ public void stop() { @Override protected void append(E eventObject) { if (isQueueBelowDiscardingThreshold() && isDiscardable(eventObject)) { + discardedEvents.incrementAndGet(); return; } preprocess(eventObject); @@ -172,7 +177,10 @@ public boolean isQueueBelowDiscardingThreshold() { private void put(E eventObject) { if (neverBlock) { - blockingQueue.offer(eventObject); + boolean inserted = blockingQueue.offer(eventObject); + if (!inserted) { + droppedEvents.incrementAndGet(); + } } else { putUninterruptibly(eventObject); } @@ -196,6 +204,27 @@ private void putUninterruptibly(E eventObject) { } } + /** + * Returns the total number of discarded events due to reaching the discardingThreshold, + * since the creation of this appender. + * + * @return number of discarded events. + */ + public int getDiscardedEvents() { + return discardedEvents.get(); + } + + /** + * Returns the total number of dropped events since the creation of this appender. + * This can only return a non-zero value if the appender has been configured with + * {@link #setNeverBlock(boolean)} set to true and exceeding the queue size. + * + * @return number of dropped events. + */ + public int getDroppedEvents() { + return droppedEvents.get(); + } + public int getQueueSize() { return queueSize; } diff --git a/logback-core/src/test/java/ch/qos/logback/core/AsyncAppenderBaseTest.java b/logback-core/src/test/java/ch/qos/logback/core/AsyncAppenderBaseTest.java index 6b9bd9a55c..722568d255 100755 --- a/logback-core/src/test/java/ch/qos/logback/core/AsyncAppenderBaseTest.java +++ b/logback-core/src/test/java/ch/qos/logback/core/AsyncAppenderBaseTest.java @@ -39,6 +39,8 @@ public class AsyncAppenderBaseTest { AsyncAppenderBase asyncAppenderBase = new AsyncAppenderBase(); LossyAsyncAppender lossyAsyncAppender = new LossyAsyncAppender(); DelayingListAppender delayingListAppender = new DelayingListAppender(); + BlockingListAppender blockingListAppender = new BlockingListAppender(); + DiscardingAsyncAppender discardingAsyncAppender = new DiscardingAsyncAppender(); ListAppender listAppender = new ListAppender(); OnConsoleStatusListener onConsoleStatusListener = new OnConsoleStatusListener(); StatusChecker statusChecker = new StatusChecker(context); @@ -51,6 +53,7 @@ public void setUp() { asyncAppenderBase.setContext(context); lossyAsyncAppender.setContext(context); + discardingAsyncAppender.setContext(context); listAppender.setContext(context); listAppender.setName("list"); @@ -59,6 +62,10 @@ public void setUp() { delayingListAppender.setContext(context); delayingListAppender.setName("list"); delayingListAppender.start(); + + blockingListAppender.setContext(context); + blockingListAppender.setName("blocking list"); + blockingListAppender.start(); } @Test @@ -301,6 +308,51 @@ public void verifyInterruptionOfWorkerIsSwallowed() { Assertions.assertFalse(asyncAppenderBase.worker.isInterrupted()); } + @Test + public void verifyDiscardedEventsCounter() throws InterruptedException { + int bufferSize = 5; + int discardingThreshold = 3; + // One event will have been removed by the consuming thread. + int expectedDiscardedEvents = bufferSize - 1 - discardingThreshold; + discardingAsyncAppender.addAppender(blockingListAppender); + discardingAsyncAppender.setQueueSize(bufferSize); + discardingAsyncAppender.setDiscardingThreshold(discardingThreshold); + discardingAsyncAppender.start(); + + for (int i = 0; i < bufferSize; i++) { + discardingAsyncAppender.doAppend(i); + // Make sure the consuming thread is in a blocked state. + blockingListAppender.waitForAppend(); + } + blockingListAppender.unblock(); + discardingAsyncAppender.stop(); + + Assertions.assertEquals(expectedDiscardedEvents, discardingAsyncAppender.getDiscardedEvents()); + } + + @Test + public void verifyDroppedEventsCounter() throws InterruptedException { + int bufferSize = 5; + int loopLen = bufferSize * 2; + // One event will have been removed by the consuming thread. + int expectedDroppedEvents = bufferSize - 1; + asyncAppenderBase.setMaxFlushTime(1); + asyncAppenderBase.addAppender(blockingListAppender); + asyncAppenderBase.setQueueSize(bufferSize); + asyncAppenderBase.setNeverBlock(true); + asyncAppenderBase.start(); + + for (int i = 0; i < loopLen; i++) { + asyncAppenderBase.doAppend(i); + // Make sure the consuming thread is in a blocked state. + blockingListAppender.waitForAppend(); + } + blockingListAppender.unblock(); + asyncAppenderBase.stop(); + + Assertions.assertEquals(expectedDroppedEvents, asyncAppenderBase.getDroppedEvents()); + } + private void verify(ListAppender la, int atLeast) { // ListAppender passes as parameter should be stopped at this stage Assertions.assertFalse(la.isStarted()); @@ -309,6 +361,38 @@ private void verify(ListAppender la, int atLeast) { statusChecker.assertContainsMatch("Worker thread will flush remaining events before exiting."); } + static class BlockingListAppender extends ListAppender { + + private final CountDownLatch latch = new CountDownLatch(1); + private final CountDownLatch firstAppend = new CountDownLatch(1); + + public void unblock() { + latch.countDown(); + } + + public void waitForAppend() throws InterruptedException { + firstAppend.await(); + } + + @Override + protected void append(E e) { + try { + firstAppend.countDown(); + latch.await(); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + super.append(e); + } + } + + static class DiscardingAsyncAppender extends AsyncAppenderBase { + @Override + protected boolean isDiscardable(Integer i) { + return true; + } + } + static class LossyAsyncAppender extends AsyncAppenderBase { @Override protected boolean isDiscardable(Integer i) {