2222import org .apache .logging .log4j .Logger ;
2323import org .apache .logging .log4j .LogManager ;
2424import org .elasticsearch .cluster .ClusterState ;
25+ import org .elasticsearch .cluster .coordination .Coordinator ;
2526import org .elasticsearch .cluster .node .DiscoveryNode ;
2627import org .elasticsearch .cluster .routing .allocation .AllocationService ;
2728import org .elasticsearch .cluster .service .ClusterApplier ;
29+ import org .elasticsearch .cluster .service .ClusterApplierService ;
2830import org .elasticsearch .cluster .service .MasterService ;
31+ import org .elasticsearch .common .Randomness ;
2932import org .elasticsearch .common .io .stream .NamedWriteableRegistry ;
3033import org .elasticsearch .common .network .NetworkService ;
3134import org .elasticsearch .common .settings .ClusterSettings ;
5861import java .util .function .Supplier ;
5962import java .util .stream .Collectors ;
6063
64+ import static org .elasticsearch .node .Node .NODE_NAME_SETTING ;
65+
6166/**
6267 * A module for loading classes for node discovery.
6368 */
6469public class DiscoveryModule {
6570 private static final Logger logger = LogManager .getLogger (DiscoveryModule .class );
6671
72+ public static final String ZEN_DISCOVERY_TYPE = "zen" ;
73+ public static final String ZEN2_DISCOVERY_TYPE = "zen2" ;
74+
6775 public static final Setting <String > DISCOVERY_TYPE_SETTING =
68- new Setting <>("discovery.type" , "zen" , Function .identity (), Property .NodeScope );
76+ new Setting <>("discovery.type" , ZEN_DISCOVERY_TYPE , Function .identity (), Property .NodeScope );
6977 public static final Setting <List <String >> DISCOVERY_HOSTS_PROVIDER_SETTING =
7078 Setting .listSetting ("discovery.zen.hosts_provider" , Collections .emptyList (), Function .identity (), Property .NodeScope );
7179
@@ -75,14 +83,14 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
7583 NamedWriteableRegistry namedWriteableRegistry , NetworkService networkService , MasterService masterService ,
7684 ClusterApplier clusterApplier , ClusterSettings clusterSettings , List <DiscoveryPlugin > plugins ,
7785 AllocationService allocationService , Path configFile , GatewayMetaState gatewayMetaState ) {
78- final Collection <BiConsumer <DiscoveryNode ,ClusterState >> joinValidators = new ArrayList <>();
86+ final Collection <BiConsumer <DiscoveryNode , ClusterState >> joinValidators = new ArrayList <>();
7987 final Map <String , Supplier <UnicastHostsProvider >> hostProviders = new HashMap <>();
8088 hostProviders .put ("settings" , () -> new SettingsBasedHostsProvider (settings , transportService ));
8189 hostProviders .put ("file" , () -> new FileBasedUnicastHostsProvider (configFile ));
8290 for (DiscoveryPlugin plugin : plugins ) {
83- plugin .getZenHostsProviders (transportService , networkService ).entrySet (). forEach (entry -> {
84- if (hostProviders .put (entry . getKey (), entry . getValue () ) != null ) {
85- throw new IllegalArgumentException ("Cannot register zen hosts provider [" + entry . getKey () + "] twice" );
91+ plugin .getZenHostsProviders (transportService , networkService ).forEach (( key , value ) -> {
92+ if (hostProviders .put (key , value ) != null ) {
93+ throw new IllegalArgumentException ("Cannot register zen hosts provider [" + key + "] twice" );
8694 }
8795 });
8896 BiConsumer <DiscoveryNode , ClusterState > joinValidator = plugin .getJoinValidator ();
@@ -117,18 +125,21 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
117125 };
118126
119127 Map <String , Supplier <Discovery >> discoveryTypes = new HashMap <>();
120- discoveryTypes .put ("zen" ,
128+ discoveryTypes .put (ZEN_DISCOVERY_TYPE ,
121129 () -> new ZenDiscovery (settings , threadPool , transportService , namedWriteableRegistry , masterService , clusterApplier ,
122130 clusterSettings , hostsProvider , allocationService , Collections .unmodifiableCollection (joinValidators ), gatewayMetaState ));
131+ discoveryTypes .put (ZEN2_DISCOVERY_TYPE , () -> new Coordinator (NODE_NAME_SETTING .get (settings ), settings , clusterSettings ,
132+ transportService , namedWriteableRegistry , allocationService , masterService ,
133+ () -> gatewayMetaState .getPersistedState (settings , (ClusterApplierService ) clusterApplier ), hostsProvider , clusterApplier ,
134+ Randomness .get ()));
123135 discoveryTypes .put ("single-node" , () -> new SingleNodeDiscovery (settings , transportService , masterService , clusterApplier ));
124136 for (DiscoveryPlugin plugin : plugins ) {
125- plugin .getDiscoveryTypes (threadPool , transportService , namedWriteableRegistry ,
126- masterService , clusterApplier , clusterSettings , hostsProvider , allocationService , gatewayMetaState ).entrySet ()
127- .forEach (entry -> {
128- if (discoveryTypes .put (entry .getKey (), entry .getValue ()) != null ) {
129- throw new IllegalArgumentException ("Cannot register discovery type [" + entry .getKey () + "] twice" );
130- }
131- });
137+ plugin .getDiscoveryTypes (threadPool , transportService , namedWriteableRegistry , masterService , clusterApplier , clusterSettings ,
138+ hostsProvider , allocationService , gatewayMetaState ).forEach ((key , value ) -> {
139+ if (discoveryTypes .put (key , value ) != null ) {
140+ throw new IllegalArgumentException ("Cannot register discovery type [" + key + "] twice" );
141+ }
142+ });
132143 }
133144 String discoveryType = DISCOVERY_TYPE_SETTING .get (settings );
134145 Supplier <Discovery > discoverySupplier = discoveryTypes .get (discoveryType );
0 commit comments