-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Refactor Sniffer and make it testable #29638
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
e581c56
900d3aa
0d8a270
c9ec5a8
1858a4c
4c886a0
c2a88a3
5d7c636
9407f80
9658886
4094a4e
f3baab3
07dce21
5603296
440bd5c
2cf41b0
dccb673
880b6a9
1d5ac98
1759215
44b5178
2dd307e
8b22063
8a8d348
f80b119
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,12 +31,13 @@ | |
| import java.security.PrivilegedAction; | ||
| import java.util.List; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.Future; | ||
| import java.util.concurrent.ScheduledExecutorService; | ||
| import java.util.concurrent.ScheduledFuture; | ||
| import java.util.concurrent.ScheduledThreadPoolExecutor; | ||
| import java.util.concurrent.ThreadFactory; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.concurrent.atomic.AtomicReference; | ||
|
|
||
| /** | ||
| * Class responsible for sniffing nodes from some source (default is elasticsearch itself) and setting them to a provided instance of | ||
|
|
@@ -51,103 +52,84 @@ public class Sniffer implements Closeable { | |
| private static final Log logger = LogFactory.getLog(Sniffer.class); | ||
| private static final String SNIFFER_THREAD_NAME = "es_rest_client_sniffer"; | ||
|
|
||
| private final Task task; | ||
| private final HostsSniffer hostsSniffer; | ||
| private final RestClient restClient; | ||
|
|
||
| private final long sniffIntervalMillis; | ||
| private final long sniffAfterFailureDelayMillis; | ||
| private final Scheduler scheduler; | ||
|
|
||
| private final AtomicReference<Future> nextTask = new AtomicReference<>(); | ||
|
|
||
| Sniffer(RestClient restClient, HostsSniffer hostsSniffer, long sniffInterval, long sniffAfterFailureDelay) { | ||
| this.task = new Task(hostsSniffer, restClient, sniffInterval, sniffAfterFailureDelay); | ||
| this(restClient, hostsSniffer, new DefaultScheduler(), sniffInterval, sniffAfterFailureDelay); | ||
| } | ||
|
|
||
| Sniffer(RestClient restClient, HostsSniffer hostsSniffer, Scheduler scheduler, long sniffInterval, long sniffAfterFailureDelay) { | ||
| this.hostsSniffer = hostsSniffer; | ||
| this.restClient = restClient; | ||
| this.sniffIntervalMillis = sniffInterval; | ||
| this.sniffAfterFailureDelayMillis = sniffAfterFailureDelay; | ||
| this.scheduler = scheduler; | ||
| //first sniffing round is immediately executed, next one will be executed depending on the configured sniff interval | ||
| scheduleNextRound(0L, sniffIntervalMillis, false); | ||
| } | ||
|
|
||
| /** | ||
| * Triggers a new sniffing round and explicitly takes out the failed host provided as argument | ||
| * Triggers a new immediate sniffing round, which will schedule a new round in sniffAfterFailureDelayMillis ms | ||
| */ | ||
| public void sniffOnFailure(HttpHost failedHost) { | ||
| this.task.sniffOnFailure(failedHost); | ||
| public final void sniffOnFailure() { | ||
| scheduleNextRound(0L, sniffAfterFailureDelayMillis, true); | ||
| } | ||
|
|
||
| @Override | ||
| public void close() throws IOException { | ||
| task.shutdown(); | ||
| private void scheduleNextRound(long delay, long nextDelay, boolean mustCancelNextRound) { | ||
| Task task = new Task(nextDelay); | ||
| Future<?> nextFuture = task.schedule(delay); | ||
| Future<?> previousFuture = nextTask.getAndSet(nextFuture); | ||
| if (mustCancelNextRound) { | ||
| previousFuture.cancel(false); | ||
| } | ||
| } | ||
|
|
||
| private static class Task implements Runnable { | ||
| private final HostsSniffer hostsSniffer; | ||
| private final RestClient restClient; | ||
|
|
||
| private final long sniffIntervalMillis; | ||
| private final long sniffAfterFailureDelayMillis; | ||
| private final ScheduledExecutorService scheduledExecutorService; | ||
| private final AtomicBoolean running = new AtomicBoolean(false); | ||
| private ScheduledFuture<?> scheduledFuture; | ||
|
|
||
| private Task(HostsSniffer hostsSniffer, RestClient restClient, long sniffIntervalMillis, long sniffAfterFailureDelayMillis) { | ||
| this.hostsSniffer = hostsSniffer; | ||
| this.restClient = restClient; | ||
| this.sniffIntervalMillis = sniffIntervalMillis; | ||
| this.sniffAfterFailureDelayMillis = sniffAfterFailureDelayMillis; | ||
| SnifferThreadFactory threadFactory = new SnifferThreadFactory(SNIFFER_THREAD_NAME); | ||
| this.scheduledExecutorService = Executors.newScheduledThreadPool(1, threadFactory); | ||
| scheduleNextRun(0); | ||
| final class Task implements Runnable { | ||
| final long nextTaskDelay; | ||
|
|
||
| Task(long nextTaskDelay) { | ||
| this.nextTaskDelay = nextTaskDelay; | ||
| } | ||
|
|
||
| synchronized void scheduleNextRun(long delayMillis) { | ||
| if (scheduledExecutorService.isShutdown() == false) { | ||
| try { | ||
| if (scheduledFuture != null) { | ||
| //regardless of when the next sniff is scheduled, cancel it and schedule a new one with updated delay | ||
| this.scheduledFuture.cancel(false); | ||
| } | ||
| logger.debug("scheduling next sniff in " + delayMillis + " ms"); | ||
| this.scheduledFuture = this.scheduledExecutorService.schedule(this, delayMillis, TimeUnit.MILLISECONDS); | ||
| } catch(Exception e) { | ||
| logger.error("error while scheduling next sniffer task", e); | ||
| } | ||
| } | ||
| Future<?> schedule(long delay) { | ||
|
||
| return scheduler.schedule(this, delay); | ||
| } | ||
|
|
||
| @Override | ||
| public void run() { | ||
| sniff(null, sniffIntervalMillis); | ||
| } | ||
|
|
||
| void sniffOnFailure(HttpHost failedHost) { | ||
| sniff(failedHost, sniffAfterFailureDelayMillis); | ||
| } | ||
|
|
||
| void sniff(HttpHost excludeHost, long nextSniffDelayMillis) { | ||
| if (running.compareAndSet(false, true)) { | ||
| try { | ||
| List<HttpHost> sniffedHosts = hostsSniffer.sniffHosts(); | ||
| logger.debug("sniffed hosts: " + sniffedHosts); | ||
| if (excludeHost != null) { | ||
| sniffedHosts.remove(excludeHost); | ||
| } | ||
| if (sniffedHosts.isEmpty()) { | ||
| logger.warn("no hosts to set, hosts will be updated at the next sniffing round"); | ||
| } else { | ||
| this.restClient.setHosts(sniffedHosts.toArray(new HttpHost[sniffedHosts.size()])); | ||
| } | ||
| } catch (Exception e) { | ||
| logger.error("error while sniffing nodes", e); | ||
| } finally { | ||
| scheduleNextRun(nextSniffDelayMillis); | ||
| running.set(false); | ||
| } | ||
| try { | ||
| sniff(); | ||
| } catch (Exception e) { | ||
| logger.error("error while sniffing nodes", e); | ||
| } finally { | ||
| scheduleNextRound(nextTaskDelay, sniffIntervalMillis, false); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| synchronized void shutdown() { | ||
| scheduledExecutorService.shutdown(); | ||
| try { | ||
| if (scheduledExecutorService.awaitTermination(1000, TimeUnit.MILLISECONDS)) { | ||
| return; | ||
| } | ||
| scheduledExecutorService.shutdownNow(); | ||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| } | ||
| final void sniff() throws IOException { | ||
| List<HttpHost> sniffedHosts = hostsSniffer.sniffHosts(); | ||
| logger.debug("sniffed hosts: " + sniffedHosts); | ||
|
||
| if (sniffedHosts.isEmpty()) { | ||
| logger.warn("no hosts to set, hosts will be updated at the next sniffing round"); | ||
| } else { | ||
| restClient.setHosts(sniffedHosts.toArray(new HttpHost[sniffedHosts.size()])); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| nextTask.get().cancel(false); | ||
| this.scheduler.shutdown(); | ||
| } | ||
|
|
||
| /** | ||
| * Returns a new {@link SnifferBuilder} to help with {@link Sniffer} creation. | ||
| * | ||
|
|
@@ -158,8 +140,62 @@ public static SnifferBuilder builder(RestClient restClient) { | |
| return new SnifferBuilder(restClient); | ||
| } | ||
|
|
||
| private static class SnifferThreadFactory implements ThreadFactory { | ||
| /** | ||
| * The Scheduler interface allows to isolate the sniffing scheduling aspects so that we can test | ||
| * the sniffer by injecting when needed a custom scheduler that is more suited for testing. | ||
| */ | ||
| interface Scheduler { | ||
| /** | ||
| * Schedules the provided {@link Runnable} to be executed in <code>delayMillis</code> milliseconds | ||
| */ | ||
| Future<?> schedule(Task task, long delayMillis); | ||
|
|
||
| /** | ||
| * Shuts this scheduler down | ||
| */ | ||
| void shutdown(); | ||
| } | ||
|
|
||
| /** | ||
| * Default implementation of {@link Scheduler}, based on {@link ScheduledExecutorService} | ||
| */ | ||
| static final class DefaultScheduler implements Scheduler { | ||
| final ScheduledExecutorService executor; | ||
|
|
||
| DefaultScheduler() { | ||
| this(initScheduledExecutorService()); | ||
| } | ||
|
|
||
| DefaultScheduler(ScheduledExecutorService executor) { | ||
| this.executor = executor; | ||
| } | ||
|
|
||
| private static ScheduledExecutorService initScheduledExecutorService() { | ||
| ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, new SnifferThreadFactory(SNIFFER_THREAD_NAME)); | ||
| executor.setRemoveOnCancelPolicy(true); | ||
| return executor; | ||
| } | ||
|
|
||
| @Override | ||
| public Future<?> schedule(Task task, long delayMillis) { | ||
| return executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS); | ||
| } | ||
|
|
||
| @Override | ||
| public void shutdown() { | ||
| executor.shutdown(); | ||
| try { | ||
| if (executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) { | ||
| return; | ||
| } | ||
| executor.shutdownNow(); | ||
| } catch (InterruptedException ignore) { | ||
| Thread.currentThread().interrupt(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| static class SnifferThreadFactory implements ThreadFactory { | ||
| private final AtomicInteger threadNumber = new AtomicInteger(1); | ||
| private final String namePrefix; | ||
| private final ThreadFactory originalThreadFactory; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When I was playing with cancellation as part of reindex I found that canceling a Runnable was sort of "best effort". If you make a test that calls `sniffOnFailure` a bunch of times really fast together I'll bet you get multiple rounds of sniffing in parallel.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess it depends on whether the runnable has already started or not? That is what I've seen, but it's hard to test real-life behavior from a unit test...