|
53 | 53 | import org.elasticsearch.discovery.Discovery; |
54 | 54 | import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; |
55 | 55 | import org.elasticsearch.threadpool.ThreadPool; |
56 | | -import org.elasticsearch.transport.Transports; |
57 | 56 |
|
58 | 57 | import java.util.Arrays; |
59 | 58 | import java.util.Collections; |
@@ -173,14 +172,17 @@ ClusterState state() { |
173 | 172 | return clusterStateSupplier.get(); |
174 | 173 | } |
175 | 174 |
|
| 175 | + private static boolean isMasterUpdateThread() { |
| 176 | + return Thread.currentThread().getName().contains(MASTER_UPDATE_THREAD_NAME); |
| 177 | + } |
| 178 | + |
176 | 179 | public static boolean assertMasterUpdateThread() { |
177 | | - assert Thread.currentThread().getName().contains(MASTER_UPDATE_THREAD_NAME) : |
178 | | - "not called from the master service thread"; |
| 180 | + assert isMasterUpdateThread() : "not called from the master service thread"; |
179 | 181 | return true; |
180 | 182 | } |
181 | 183 |
|
182 | 184 | public static boolean assertNotMasterUpdateThread(String reason) { |
183 | | - assert Thread.currentThread().getName().contains(MASTER_UPDATE_THREAD_NAME) == false : |
| 185 | + assert isMasterUpdateThread() == false : |
184 | 186 | "Expected current thread [" + Thread.currentThread() + "] to not be the master service thread. Reason: [" + reason + "]"; |
185 | 187 | return true; |
186 | 188 | } |
@@ -240,10 +242,7 @@ protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs task |
240 | 242 | final PlainActionFuture<Void> fut = new PlainActionFuture<Void>() { |
241 | 243 | @Override |
242 | 244 | protected boolean blockingAllowed() { |
243 | | - // allow this one to block on the MasterServiceUpdateThread |
244 | | - return Transports.assertNotTransportThread(BLOCKING_OP_REASON) && |
245 | | - ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON) && |
246 | | - ClusterApplierService.assertNotClusterStateUpdateThread(BLOCKING_OP_REASON); |
| 245 | + return isMasterUpdateThread() || super.blockingAllowed(); |
247 | 246 | } |
248 | 247 | }; |
249 | 248 | clusterStatePublisher.publish(clusterChangedEvent, fut, taskOutputs.createAckListener(threadPool, clusterChangedEvent.state())); |
|
0 commit comments