|
19 | 19 |
|
20 | 20 | package org.elasticsearch.indices.cluster; |
21 | 21 |
|
| 22 | +import org.elasticsearch.ExceptionsHelper; |
22 | 23 | import org.elasticsearch.Version; |
23 | 24 | import org.elasticsearch.action.ActionResponse; |
24 | 25 | import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest; |
|
40 | 41 | import org.elasticsearch.action.support.master.TransportMasterNodeAction; |
41 | 42 | import org.elasticsearch.action.support.master.TransportMasterNodeActionUtils; |
42 | 43 | import org.elasticsearch.cluster.ClusterState; |
43 | | -import org.elasticsearch.cluster.ClusterStateTaskConfig; |
44 | 44 | import org.elasticsearch.cluster.ClusterStateTaskExecutor; |
| 45 | +import org.elasticsearch.cluster.ClusterStateTaskExecutor.ClusterTasksResult; |
| 46 | +import org.elasticsearch.cluster.ClusterStateUpdateTask; |
45 | 47 | import org.elasticsearch.cluster.EmptyClusterInfoService; |
46 | | -import org.elasticsearch.cluster.NodeConnectionsService; |
47 | 48 | import org.elasticsearch.cluster.action.shard.ShardStateAction; |
48 | | -import org.elasticsearch.cluster.action.shard.ShardStateAction.FailedShardEntry; |
49 | 49 | import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardEntry; |
| 50 | +import org.elasticsearch.cluster.action.shard.ShardStateAction.FailedShardEntry; |
50 | 51 | import org.elasticsearch.cluster.metadata.AliasValidator; |
51 | 52 | import org.elasticsearch.cluster.metadata.IndexMetaData; |
52 | 53 | import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; |
|
56 | 57 | import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService; |
57 | 58 | import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService; |
58 | 59 | import org.elasticsearch.cluster.node.DiscoveryNode; |
59 | | -import org.elasticsearch.cluster.node.DiscoveryNodes; |
60 | 60 | import org.elasticsearch.cluster.routing.ShardRouting; |
61 | 61 | import org.elasticsearch.cluster.routing.allocation.AllocationService; |
62 | 62 | import org.elasticsearch.cluster.routing.allocation.FailedShard; |
|
65 | 65 | import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; |
66 | 66 | import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider; |
67 | 67 | import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider; |
68 | | -import org.elasticsearch.cluster.service.ClusterApplierService; |
69 | 68 | import org.elasticsearch.cluster.service.ClusterService; |
70 | | -import org.elasticsearch.common.Priority; |
71 | 69 | import org.elasticsearch.common.UUIDs; |
72 | 70 | import org.elasticsearch.common.component.AbstractComponent; |
73 | 71 | import org.elasticsearch.common.settings.ClusterSettings; |
|
94 | 92 | import java.util.Collections; |
95 | 93 | import java.util.HashSet; |
96 | 94 | import java.util.List; |
97 | | -import java.util.concurrent.atomic.AtomicReference; |
98 | 95 | import java.util.stream.Collectors; |
99 | 96 |
|
100 | 97 | import static com.carrotsearch.randomizedtesting.RandomizedTest.getRandom; |
101 | 98 | import static org.elasticsearch.env.Environment.PATH_HOME_SETTING; |
102 | | -import static org.junit.Assert.assertNotNull; |
103 | | -import static org.junit.Assert.assertNull; |
104 | | -import static org.junit.Assert.assertSame; |
| 99 | +import static org.hamcrest.Matchers.notNullValue; |
| 100 | +import static org.junit.Assert.assertThat; |
105 | 101 | import static org.mockito.Matchers.any; |
106 | 102 | import static org.mockito.Matchers.anyList; |
| 103 | +import static org.mockito.Matchers.anyString; |
| 104 | +import static org.mockito.Mockito.doAnswer; |
107 | 105 | import static org.mockito.Mockito.mock; |
108 | 106 | import static org.mockito.Mockito.when; |
109 | 107 |
|
110 | 108 | public class ClusterStateChanges extends AbstractComponent { |
111 | 109 |
|
112 | | - private final ClusterService clusterService; |
113 | 110 | private final AllocationService allocationService; |
114 | | - private final AtomicReference<Runnable> nextMasterTaskToRun; |
115 | | - private final AtomicReference<ClusterState> lastClusterStateRef; |
| 111 | + private final ClusterService clusterService; |
116 | 112 | private final ShardStateAction.ShardFailedClusterStateTaskExecutor shardFailedClusterStateTaskExecutor; |
117 | 113 | private final ShardStateAction.ShardStartedClusterStateTaskExecutor shardStartedClusterStateTaskExecutor; |
118 | 114 |
|
@@ -144,29 +140,9 @@ public ClusterStateChanges(NamedXContentRegistry xContentRegistry, ThreadPool th |
144 | 140 | DestructiveOperations destructiveOperations = new DestructiveOperations(settings, clusterSettings); |
145 | 141 | Environment environment = TestEnvironment.newEnvironment(settings); |
146 | 142 | Transport transport = mock(Transport.class); // it's not used |
147 | | - nextMasterTaskToRun = new AtomicReference<>(); |
148 | | - FakeThreadPoolMasterService masterService = new FakeThreadPoolMasterService("fake-master", nextMasterTaskToRun::set); |
149 | | - lastClusterStateRef = new AtomicReference<>(); |
150 | | - masterService.setClusterStateSupplier(lastClusterStateRef::get); |
151 | | - masterService.setClusterStatePublisher((event, publishListener, ackListener) -> { |
152 | | - lastClusterStateRef.set(event.state()); |
153 | | - publishListener.onResponse(null); |
154 | | - }); |
155 | | - clusterService = new ClusterService(settings, clusterSettings, masterService, mock(ClusterApplierService.class)); |
156 | | - clusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) { |
157 | | - @Override |
158 | | - public void connectToNodes(DiscoveryNodes discoveryNodes) { |
159 | | - // skip |
160 | | - } |
161 | | - |
162 | | - @Override |
163 | | - public void disconnectFromNodesExcept(DiscoveryNodes nodesToKeep) { |
164 | | - // skip |
165 | | - } |
166 | | - }); |
167 | | - clusterService.start(); |
168 | 143 |
|
169 | 144 | // mocks |
| 145 | + clusterService = mock(ClusterService.class); |
170 | 146 | IndicesService indicesService = mock(IndicesService.class); |
171 | 147 | // MetaDataCreateIndexService creates indices using its IndicesService instance to check mappings -> fake it here |
172 | 148 | try { |
@@ -286,40 +262,39 @@ public ClusterState applyStartedShards(ClusterState clusterState, List<ShardRout |
286 | 262 | } |
287 | 263 |
|
288 | 264 | private <T> ClusterState runTasks(ClusterStateTaskExecutor<T> executor, ClusterState clusterState, List<T> entries) { |
289 | | - lastClusterStateRef.set(clusterState); |
290 | | - assertNull(nextMasterTaskToRun.get()); |
291 | | - for (T task : entries) { |
292 | | - clusterService.submitStateUpdateTask(task.toString(), task, ClusterStateTaskConfig.build(Priority.NORMAL), executor, |
293 | | - (source, e) -> {}); |
294 | | - } |
295 | | - if (entries.isEmpty()) { |
296 | | - assertNull(nextMasterTaskToRun.get()); |
297 | | - } else { |
298 | | - assertNotNull(nextMasterTaskToRun.get()); |
299 | | - nextMasterTaskToRun.getAndSet(null).run(); |
300 | | - ClusterState firstClusterState = lastClusterStateRef.get(); |
301 | | - for (int i = 1; i < entries.size(); i++) { |
302 | | - nextMasterTaskToRun.getAndSet(null).run(); |
303 | | - assertSame(firstClusterState, lastClusterStateRef.get()); // due to batching, only the first actually does something |
| 265 | + try { |
| 266 | + ClusterTasksResult<T> result = executor.execute(clusterState, entries); |
| 267 | + for (ClusterStateTaskExecutor.TaskResult taskResult : result.executionResults.values()) { |
| 268 | + if (taskResult.isSuccess() == false) { |
| 269 | + throw taskResult.getFailure(); |
| 270 | + } |
304 | 271 | } |
| 272 | + return result.resultingState; |
| 273 | + } catch (Exception e) { |
| 274 | + throw ExceptionsHelper.convertToRuntime(e); |
305 | 275 | } |
306 | | - assertNull(nextMasterTaskToRun.get()); |
307 | | - return lastClusterStateRef.get(); |
308 | 276 | } |
309 | 277 |
|
310 | 278 | private <Request extends MasterNodeRequest<Request>, Response extends ActionResponse> ClusterState execute( |
311 | 279 | TransportMasterNodeAction<Request, Response> masterNodeAction, Request request, ClusterState clusterState) { |
312 | | - lastClusterStateRef.set(clusterState); |
313 | | - assertNull(nextMasterTaskToRun.get()); |
314 | | - try { |
315 | | - TransportMasterNodeActionUtils.runMasterOperation(masterNodeAction, request, clusterState, new PlainActionFuture<>()); |
316 | | - } catch (Exception e) { |
317 | | - throw new RuntimeException(e); |
318 | | - } |
319 | | - assertNotNull(nextMasterTaskToRun.get()); |
320 | | - nextMasterTaskToRun.getAndSet(null).run(); |
321 | | - assertNull(nextMasterTaskToRun.get()); |
322 | | - return lastClusterStateRef.get(); |
| 280 | + return executeClusterStateUpdateTask(clusterState, () -> { |
| 281 | + try { |
| 282 | + TransportMasterNodeActionUtils.runMasterOperation(masterNodeAction, request, clusterState, new PlainActionFuture<>()); |
| 283 | + } catch (Exception e) { |
| 284 | + throw new RuntimeException(e); |
| 285 | + } |
| 286 | + }); |
323 | 287 | } |
324 | 288 |
|
| 289 | + private ClusterState executeClusterStateUpdateTask(ClusterState state, Runnable runnable) { |
| 290 | + ClusterState[] result = new ClusterState[1]; |
| 291 | + doAnswer(invocationOnMock -> { |
| 292 | + ClusterStateUpdateTask task = (ClusterStateUpdateTask)invocationOnMock.getArguments()[1]; |
| 293 | + result[0] = task.execute(state); |
| 294 | + return null; |
| 295 | + }).when(clusterService).submitStateUpdateTask(anyString(), any(ClusterStateUpdateTask.class)); |
| 296 | + runnable.run(); |
| 297 | + assertThat(result[0], notNullValue()); |
| 298 | + return result[0]; |
| 299 | + } |
325 | 300 | } |
0 commit comments