@@ -64,7 +64,6 @@ public class DatafeedManager {
6464 private final DatafeedJobBuilder datafeedJobBuilder ;
6565 private final TaskRunner taskRunner = new TaskRunner ();
6666 private final AutodetectProcessManager autodetectProcessManager ;
67- private volatile boolean isolated ;
6867
6968 public DatafeedManager (ThreadPool threadPool , Client client , ClusterService clusterService , DatafeedJobBuilder datafeedJobBuilder ,
7069 Supplier <Long > currentTimeSupplier , Auditor auditor , AutodetectProcessManager autodetectProcessManager ) {
@@ -130,18 +129,20 @@ public void stopAllDatafeedsOnThisNode(String reason) {
130129 * This is used before the JVM is killed. It differs from stopAllDatafeedsOnThisNode in that it leaves
131130 * the datafeed tasks in the "started" state, so that they get restarted on a different node.
132131 */
133- public void isolateAllDatafeedsOnThisNode () {
134- isolated = true ;
132+ public void isolateAllDatafeedsOnThisNodeBeforeShutdown () {
135133 Iterator <Holder > iter = runningDatafeedsOnThisNode .values ().iterator ();
136134 while (iter .hasNext ()) {
137135 Holder next = iter .next ();
138136 next .isolateDatafeed ();
139- next .setRelocating ();
137+ // TODO: it's not ideal that this "isolate" method does something a bit different to the one below
138+ next .setNodeIsShuttingDown ();
140139 iter .remove ();
141140 }
142141 }
143142
144143 public void isolateDatafeed (long allocationId ) {
144+ // This calls get() rather than remove() because we expect that the persistent task will
145+ // be removed shortly afterwards and that operation needs to be able to find the holder
145146 Holder holder = runningDatafeedsOnThisNode .get (allocationId );
146147 if (holder != null ) {
147148 holder .isolateDatafeed ();
@@ -195,7 +196,7 @@ protected void doRun() {
195196 holder .stop ("general_lookback_failure" , TimeValue .timeValueSeconds (20 ), e );
196197 return ;
197198 }
198- if (isolated == false ) {
199+ if (holder . isIsolated () == false ) {
199200 if (next != null ) {
200201 doDatafeedRealtime (next , holder .datafeedJob .getJobId (), holder );
201202 } else {
@@ -298,7 +299,7 @@ public class Holder {
298299 private final ProblemTracker problemTracker ;
299300 private final Consumer <Exception > finishHandler ;
300301 volatile Scheduler .Cancellable cancellable ;
301- private volatile boolean isRelocating ;
302+ private volatile boolean isNodeShuttingDown ;
302303
303304 Holder (TransportStartDatafeedAction .DatafeedTask task , String datafeedId , DatafeedJob datafeedJob ,
304305 ProblemTracker problemTracker , Consumer <Exception > finishHandler ) {
@@ -324,7 +325,7 @@ boolean isIsolated() {
324325 }
325326
326327 public void stop (String source , TimeValue timeout , Exception e ) {
327- if (isRelocating ) {
328+ if (isNodeShuttingDown ) {
328329 return ;
329330 }
330331
@@ -344,11 +345,12 @@ public void stop(String source, TimeValue timeout, Exception e) {
344345 if (cancellable != null ) {
345346 cancellable .cancel ();
346347 }
347- auditor .info (datafeedJob .getJobId (), Messages .getMessage (Messages .JOB_AUDIT_DATAFEED_STOPPED ));
348+ auditor .info (datafeedJob .getJobId (),
349+ Messages .getMessage (isIsolated () ? Messages .JOB_AUDIT_DATAFEED_ISOLATED : Messages .JOB_AUDIT_DATAFEED_STOPPED ));
348350 finishHandler .accept (e );
349351 logger .info ("[{}] datafeed [{}] for job [{}] has been stopped{}" , source , datafeedId , datafeedJob .getJobId (),
350352 acquired ? "" : ", but there may be pending tasks as the timeout [" + timeout .getStringRep () + "] expired" );
351- if (autoCloseJob ) {
353+ if (autoCloseJob && isIsolated () == false ) {
352354 closeJob ();
353355 }
354356 if (acquired ) {
@@ -361,16 +363,18 @@ public void stop(String source, TimeValue timeout, Exception e) {
361363 }
362364
363365 /**
364- * This stops a datafeed WITHOUT updating the corresponding persistent task. It must ONLY be called
365- * immediately prior to shutting down a node. Then the datafeed task can remain "started", and be
366- * relocated to a different node. Calling this method at any other time will ruin the datafeed.
366+ * This stops a datafeed WITHOUT updating the corresponding persistent task. When called it
367+ * will stop the datafeed from sending data to its job as quickly as possible. The caller
368+ * must do something sensible with the corresponding persistent task. If the node is shutting
369+ * down the task will automatically get reassigned. Otherwise the caller must take action to
370+ * remove or reassign the persistent task, or the datafeed will be left in limbo.
367371 */
368372 public void isolateDatafeed () {
369373 datafeedJob .isolate ();
370374 }
371375
372- public void setRelocating () {
373- isRelocating = true ;
376+ public void setNodeIsShuttingDown () {
377+ isNodeShuttingDown = true ;
374378 }
375379
376380 private Long executeLookBack (long startTime , Long endTime ) throws Exception {
0 commit comments