();
+
+ if (location == StandardLocation.CLASS_PATH && kinds.contains(Kind.CLASS)) {
+ for (JavaFileObject file : fileObjects.values()) {
+ if (file.getKind() == Kind.CLASS && file.getName().startsWith(packageName)) {
+ files.add(file);
+ }
+ }
+
+ files.addAll(classLoader.files());
+ } else if (location == StandardLocation.SOURCE_PATH && kinds.contains(Kind.SOURCE)) {
+ for (JavaFileObject file : fileObjects.values()) {
+ if (file.getKind() == Kind.SOURCE && file.getName().startsWith(packageName)) {
+ files.add(file);
+ }
+ }
+ }
+
+ for (JavaFileObject file : result) {
+ files.add(file);
+ }
+
+ return files;
+ }
+ }
+
+
+}
diff --git a/job-core/src/main/java/com/lts/job/core/constant/Constants.java b/job-core/src/main/java/com/lts/job/core/constant/Constants.java
index 2218ed44c..c6fddfb3e 100644
--- a/job-core/src/main/java/com/lts/job/core/constant/Constants.java
+++ b/job-core/src/main/java/com/lts/job/core/constant/Constants.java
@@ -1,6 +1,8 @@
package com.lts.job.core.constant;
+import java.util.regex.Pattern;
+
/**
* @author Robert HG (254963746@qq.com) on 7/24/14.
* 一些配置常量
@@ -24,4 +26,37 @@ public interface Constants {
// 默认TaskTracker节点组
public static final String DEFAULT_NODE_TASK_TRACKER_GROUP = "taskTrackerGroup";
+ public static final String CHARSET = "utf-8";
+
+ public static final int DEFAULT_TIMEOUT = 1000;
+
+ public static final String TIMEOUT_KEY = "timeout";
+
+ public static final String SESSION_TIMEOUT_KEY = "session";
+
+ public static final int DEFAULT_SESSION_TIMEOUT = 60 * 1000;
+
+ public static final String REGISTER = "register";
+
+ public static final String UNREGISTER = "unregister";
+
+ /**
+ * 注册中心失败事件重试事件
+ */
+ public static final String REGISTRY_RETRY_PERIOD_KEY = "retry.period";
+
+ /**
+ * 重试周期
+ */
+ public static final int DEFAULT_REGISTRY_RETRY_PERIOD = 5 * 1000;
+
+ public static final Pattern COMMA_SPLIT_PATTERN = Pattern.compile("\\s*[,]+\\s*");
+
+ /**
+ * 注册中心自动重连时间
+ */
+ public static final String REGISTRY_RECONNECT_PERIOD_KEY = "reconnect.period";
+
+ public static final int DEFAULT_REGISTRY_RECONNECT_PERIOD = 3 * 1000;
+
}
diff --git a/job-core/src/main/java/com/lts/job/core/constant/EcTopic.java b/job-core/src/main/java/com/lts/job/core/constant/EcTopic.java
new file mode 100644
index 000000000..a950b6443
--- /dev/null
+++ b/job-core/src/main/java/com/lts/job/core/constant/EcTopic.java
@@ -0,0 +1,14 @@
+package com.lts.job.core.constant;
+
+/**
+ * @author Robert HG (254963746@qq.com) on 5/11/15.
+ */
+public interface EcTopic {
+
+ // 工作线程变化
+ String WORK_THREAD_CHANGE = "WORK_THREAD_CHANGE";
+ // 节点启用
+ String NODE_ENABLE = "NODE_ENABLE";
+ // 节点禁用
+ String NODE_DISABLE = "NODE_DISABLE";
+}
diff --git a/job-core/src/main/java/com/lts/job/core/domain/Job.java b/job-core/src/main/java/com/lts/job/core/domain/Job.java
index ca7606bca..72bcca006 100644
--- a/job-core/src/main/java/com/lts/job/core/domain/Job.java
+++ b/job-core/src/main/java/com/lts/job/core/domain/Job.java
@@ -1,6 +1,7 @@
package com.lts.job.core.domain;
+import com.lts.job.core.exception.JobSubmitException;
import com.lts.job.core.util.JSONUtils;
import com.lts.job.remoting.annotation.NotNull;
@@ -18,9 +19,8 @@ public class Job {
/**
* 优先级 (数值越大 优先级越低)
*/
- protected Integer priority = 0;
+ protected Integer priority = 100;
// 提交的节点 (可以手动指定)
- @NotNull
protected String submitNodeGroup;
// 执行的节点
@NotNull
@@ -136,4 +136,13 @@ public void setTriggerTime(Long triggerTime) {
public String toString() {
return JSONUtils.toJSONString(this);
}
+
+ public void checkField() throws JobSubmitException {
+ if(taskId == null){
+ throw new JobSubmitException("taskId不能为空!");
+ }
+ if(taskTrackerNodeGroup == null){
+ throw new JobSubmitException("taskTrackerNodeGroup不能为空!");
+ }
+ }
}
diff --git a/job-core/src/main/java/com/lts/job/core/domain/JobNodeConfig.java b/job-core/src/main/java/com/lts/job/core/domain/JobNodeConfig.java
deleted file mode 100644
index 9b46944e7..000000000
--- a/job-core/src/main/java/com/lts/job/core/domain/JobNodeConfig.java
+++ /dev/null
@@ -1,111 +0,0 @@
-package com.lts.job.core.domain;
-
-import com.lts.job.core.cluster.NodeType;
-import com.lts.job.core.util.JSONUtils;
-
-/**
- * @author Robert HG (254963746@qq.com) on 8/20/14.
- * 任务节点配置
- */
-public class JobNodeConfig {
-
- // 应用节点组
- private String nodeGroup;
- // 唯一标识
- private String identity;
- // 工作线程
- private int workThreads;
- // 节点类型
- private NodeType nodeType;
- // zookeeper 地址
- private String zookeeperAddress;
- // 远程连接超时时间
- private int invokeTimeoutMillis;
- // 监听端口
- private int listenPort;
- // 任务信息存储路径(譬如TaskTracker反馈任务信息给JobTracker, JobTracker down掉了, 那么存储下来等待JobTracker可用时再发送)
- private String jobInfoSavePath;
- // 集群名字
- private String clusterName;
-
- public String getClusterName() {
- return clusterName;
- }
-
- public void setClusterName(String clusterName) {
- this.clusterName = clusterName;
- }
-
- public String getNodeGroup() {
- return nodeGroup;
- }
-
- public void setNodeGroup(String nodeGroup) {
- this.nodeGroup = nodeGroup;
- }
-
- public String getIdentity() {
- return identity;
- }
-
- public void setIdentity(String identity) {
- this.identity = identity;
- }
-
- public int getWorkThreads() {
- return workThreads;
- }
-
- public void setWorkThreads(int workThreads) {
- this.workThreads = workThreads;
- }
-
- public NodeType getNodeType() {
- return nodeType;
- }
-
- public void setNodeType(NodeType nodeType) {
- this.nodeType = nodeType;
- }
-
- public String getZookeeperAddress() {
- return zookeeperAddress;
- }
-
- public void setZookeeperAddress(String zookeeperAddress) {
- this.zookeeperAddress = zookeeperAddress;
- }
-
- public int getInvokeTimeoutMillis() {
- return invokeTimeoutMillis;
- }
-
- public void setInvokeTimeoutMillis(int invokeTimeoutMillis) {
- this.invokeTimeoutMillis = invokeTimeoutMillis;
- }
-
- public int getListenPort() {
- return listenPort;
- }
-
- public void setListenPort(int listenPort) {
- this.listenPort = listenPort;
- }
-
- public String getJobInfoSavePath() {
- return jobInfoSavePath;
- }
-
- public void setJobInfoSavePath(String jobInfoSavePath) {
- this.jobInfoSavePath = jobInfoSavePath + "/.lts";
- }
-
- public String getFilePath() {
- return jobInfoSavePath + "/" + nodeType + "/" + nodeGroup + "/";
- }
-
- @Override
- public String toString() {
- return JSONUtils.toJSONString(this);
- }
-}
diff --git a/job-core/src/main/java/com/lts/job/core/exception/JobSubmitException.java b/job-core/src/main/java/com/lts/job/core/exception/JobSubmitException.java
new file mode 100644
index 000000000..fc79f1c6e
--- /dev/null
+++ b/job-core/src/main/java/com/lts/job/core/exception/JobSubmitException.java
@@ -0,0 +1,27 @@
+package com.lts.job.core.exception;
+
+/**
+ * @author Robert HG (254963746@qq.com) on 5/12/15.
+ */
+public class JobSubmitException extends RuntimeException {
+
+ public JobSubmitException() {
+ super();
+ }
+
+ public JobSubmitException(String message) {
+ super(message);
+ }
+
+ public JobSubmitException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public JobSubmitException(Throwable cause) {
+ super(cause);
+ }
+
+ protected JobSubmitException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git a/job-core/src/main/java/com/lts/job/core/exception/NodeRegistryException.java b/job-core/src/main/java/com/lts/job/core/exception/NodeRegistryException.java
new file mode 100644
index 000000000..3748c91f1
--- /dev/null
+++ b/job-core/src/main/java/com/lts/job/core/exception/NodeRegistryException.java
@@ -0,0 +1,27 @@
+package com.lts.job.core.exception;
+
+/**
+ * @author Robert HG (254963746@qq.com) on 5/17/15.
+ */
+public class NodeRegistryException extends RuntimeException {
+
+ public NodeRegistryException() {
+ super();
+ }
+
+ public NodeRegistryException(String message) {
+ super(message);
+ }
+
+ public NodeRegistryException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public NodeRegistryException(Throwable cause) {
+ super(cause);
+ }
+
+ protected NodeRegistryException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git a/job-core/src/main/java/com/lts/job/core/extension/Activate.java b/job-core/src/main/java/com/lts/job/core/extension/Activate.java
new file mode 100644
index 000000000..0c94855df
--- /dev/null
+++ b/job-core/src/main/java/com/lts/job/core/extension/Activate.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 1999-2012 Alibaba Group.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.lts.job.core.extension;
+
+import java.lang.annotation.*;
+
+/**
+ * Activate
+ *
+ * 对于可以被框架中自动激活加载扩展,此Annotation用于配置扩展被自动激活加载条件。
+ * 比如,过滤扩展,有多个实现,使用Activate Annotation的扩展可以根据条件被自动加载。
+ *
+ * {@link Activate#group()}生效的Group。具体的有哪些Group值由框架SPI给出。
+ * {@link Activate#value()}在{@link com.lts.job.core.cluster.Config}中Key集合中有,则生效。
+ *
+ *
+ *
+ * 底层框架SPI提供者通过{@link ExtensionLoader}的{@link ExtensionLoader#getAdaptiveExtension()}方法
+ * 获得条件的扩展。
+ *
+ * @author william.liangf
+ * @author ding.lid
+ * @export
+ * @see SPI
+ * @see ExtensionLoader
+ */
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE, ElementType.METHOD})
+public @interface Activate {
+ /**
+ * Group过滤条件。
+ *
+ * 包含{@link ExtensionLoader#getAdaptiveExtension()}的group参数给的值,则返回扩展。
+ *
+ * 如没有Group设置,则不过滤。
+ */
+ String[] group() default {};
+
+ /**
+ * Key过滤条件。包含{@link ExtensionLoader#getAdaptiveExtension()}的URL的参数Key中有,则返回扩展。
+ *
+ * 示例:
+ * 注解的值 @Activate("cache,validatioin")
,
+ * 则{@link ExtensionLoader#getAdaptiveExtension()}的URL的参数有cache
Key,或是validatioin
则返回扩展。
+ *
+ * 如没有设置,则不过滤。
+ */
+ String[] value() default {};
+
+ /**
+ * 排序信息,可以不提供。
+ */
+ String[] before() default {};
+
+ /**
+ * 排序信息,可以不提供。
+ */
+ String[] after() default {};
+
+ /**
+ * 排序信息,可以不提供。
+ */
+ int order() default 0;
+}
\ No newline at end of file
diff --git a/job-core/src/main/java/com/lts/job/core/extension/Adaptive.java b/job-core/src/main/java/com/lts/job/core/extension/Adaptive.java
new file mode 100644
index 000000000..c2478b989
--- /dev/null
+++ b/job-core/src/main/java/com/lts/job/core/extension/Adaptive.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 1999-2012 Alibaba Group.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.lts.job.core.extension;
+
+import com.lts.job.core.cluster.Config;
+
+import java.lang.annotation.*;
+
+/**
+ * 在{@link ExtensionLoader}生成Extension的Adaptive Instance时,为{@link ExtensionLoader}提供信息。
+ * @export
+ *
+ * @see ExtensionLoader
+ * @see com.lts.job.core.cluster.Config
+ */
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE, ElementType.METHOD})
+public @interface Adaptive {
+
+ /**
+ * 从{@link Config}的Key名,对应的Value作为要Adapt成的Extension名。
+ *
+ * 如果{@link Config}这些Key都没有Value,使用 用 缺省的扩展(在接口的{@link SPI}中设定的值)。
+ * 比如,String[] {"key1", "key2"}
,表示
+ *
+ * 先在URL上找key1的Value作为要Adapt成的Extension名;
+ * key1没有Value,则使用key2的Value作为要Adapt成的Extension名。
+ * key2没有Value,使用缺省的扩展。
+ * 如果没有设定缺省扩展,则方法调用会抛出{@link IllegalStateException}。
+ *
+ *
+ * 如果不设置则缺省使用Extension接口类名的点分隔小写字串。
+ * 即对于Extension接口{@code com.lts.job.core.XxxYyyService}的缺省值为String[] {"xxx.yyy.service"}
+ *
+ * @see SPI#value()
+ */
+ String[] value() default {};
+
+}
\ No newline at end of file
diff --git a/job-core/src/main/java/com/lts/job/core/extension/ExtensionFactory.java b/job-core/src/main/java/com/lts/job/core/extension/ExtensionFactory.java
new file mode 100644
index 000000000..0a4529632
--- /dev/null
+++ b/job-core/src/main/java/com/lts/job/core/extension/ExtensionFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 1999-2012 Alibaba Group.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.lts.job.core.extension;
+
+/**
+ * Created by hugui on 5/18/15.
+ */
+@SPI
+public interface ExtensionFactory {
+
+ /**
+ * Get extension.
+ *
+ * @param type object type.
+ * @param name object name.
+ * @return object instance.
+ */
+ T getExtension(Class type, String name);
+
+}
+
diff --git a/job-core/src/main/java/com/lts/job/core/extension/ExtensionLoader.java b/job-core/src/main/java/com/lts/job/core/extension/ExtensionLoader.java
new file mode 100644
index 000000000..721bb7466
--- /dev/null
+++ b/job-core/src/main/java/com/lts/job/core/extension/ExtensionLoader.java
@@ -0,0 +1,735 @@
+/*
+ * Copyright 1999-2012 Alibaba Group.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.lts.job.core.extension;
+
+import com.lts.job.core.cluster.Config;
+import com.lts.job.core.util.ConcurrentHashSet;
+import com.lts.job.core.util.Holder;
+import com.lts.job.core.util.StringUtils;
+import com.lts.job.core.compiler.Compiler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.regex.Pattern;
+
+/**
+ * Dubbo使用的扩展点获取。
+ *
+ * 自动注入关联扩展点。
+ * 自动Wrap上扩展点的Wrap类。
+ * 缺省获得的的扩展点是一个Adaptive Instance。
+ *
+ *
+ * @author william.liangf
+ * @author ding.lid
+ * @author Robert HG (254963746@qq.com)
+ * @see JDK5.0的自动发现机制实现
+ * @see SPI
+ * @see Adaptive
+ * @see Activate
+ */
+public class ExtensionLoader {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ExtensionLoader.class);
+
+ private static final String DUBBO_DIRECTORY = "META-INF/lts/";
+
+ private static final String DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + "internal/";
+
+ private static final Pattern NAME_SEPARATOR = Pattern.compile("\\s*[,]+\\s*");
+
+ private static final ConcurrentMap, ExtensionLoader>> EXTENSION_LOADERS = new ConcurrentHashMap, ExtensionLoader>>();
+
+ private static final ConcurrentMap, Object> EXTENSION_INSTANCES = new ConcurrentHashMap, Object>();
+
+ private final Class> type;
+
+ private final ExtensionFactory objectFactory;
+
+ private final ConcurrentMap, String> cachedNames = new ConcurrentHashMap, String>();
+
+ private final Holder>> cachedClasses = new Holder>>();
+
+ private final Map cachedActivates = new ConcurrentHashMap();
+
+ private volatile Class> cachedAdaptiveClass = null;
+
+ private final ConcurrentMap> cachedInstances = new ConcurrentHashMap>();
+
+ // 配置在@SPI
+ private String cachedDefaultName;
+
+ private final Holder cachedAdaptiveInstance = new Holder();
+ private volatile Throwable createAdaptiveInstanceError;
+
+ private Set> cachedWrapperClasses;
+
+ private Map exceptions = new ConcurrentHashMap();
+
+ private static boolean withExtensionAnnotation(Class type) {
+ return type.isAnnotationPresent(SPI.class);
+ }
+
+ @SuppressWarnings("unchecked")
+ public static ExtensionLoader getExtensionLoader(Class type) {
+ if (type == null)
+ throw new IllegalArgumentException("Extension type == null");
+ if (!type.isInterface()) {
+ throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
+ }
+ if (!withExtensionAnnotation(type)) {
+ throw new IllegalArgumentException("Extension type(" + type +
+ ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
+ }
+
+ ExtensionLoader loader = (ExtensionLoader) EXTENSION_LOADERS.get(type);
+ if (loader == null) {
+ EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader(type));
+ loader = (ExtensionLoader) EXTENSION_LOADERS.get(type);
+ }
+ return loader;
+ }
+
+ private ExtensionLoader(Class> type) {
+ this.type = type;
+ objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
+ }
+
+ public String getExtensionName(T extensionInstance) {
+ return getExtensionName(extensionInstance.getClass());
+ }
+
+ public String getExtensionName(Class> extensionClass) {
+ return cachedNames.get(extensionClass);
+ }
+
+ private boolean isMatchGroup(String group, String[] groups) {
+ if (group == null || group.length() == 0) {
+ return true;
+ }
+ if (groups != null && groups.length > 0) {
+ for (String g : groups) {
+ if (group.equals(g)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * 返回扩展点实例,如果没有指定的扩展点或是还没加载(即实例化)则返回null
。注意:此方法不会触发扩展点的加载。
+ *
+ * 一般应该调用{@link #getExtension(String)}方法获得扩展,这个方法会触发扩展点加载。
+ *
+ * @see #getExtension(String)
+ */
+ @SuppressWarnings("unchecked")
+ public T getLoadedExtension(String name) {
+ if (name == null || name.length() == 0)
+ throw new IllegalArgumentException("Extension name == null");
+ Holder holder = cachedInstances.get(name);
+ if (holder == null) {
+ cachedInstances.putIfAbsent(name, new Holder());
+ holder = cachedInstances.get(name);
+ }
+ return (T) holder.get();
+ }
+
+ /**
+ * 返回已经加载的扩展点的名字。
+ *
+ * 一般应该调用{@link #getSupportedExtensions()}方法获得扩展,这个方法会返回所有的扩展点。
+ *
+ * @see #getSupportedExtensions()
+ */
+ public Set getLoadedExtensions() {
+ return Collections.unmodifiableSet(new TreeSet(cachedInstances.keySet()));
+ }
+
+ /**
+ * 返回指定名字的扩展。如果指定名字的扩展不存在,则抛异常 {@link IllegalStateException}.
+ *
+ * @param name
+ * @return
+ */
+ @SuppressWarnings("unchecked")
+ public T getExtension(String name) {
+ if (name == null || name.length() == 0)
+ throw new IllegalArgumentException("Extension name == null");
+ if ("true".equals(name)) {
+ return getDefaultExtension();
+ }
+ Holder holder = cachedInstances.get(name);
+ if (holder == null) {
+ cachedInstances.putIfAbsent(name, new Holder());
+ holder = cachedInstances.get(name);
+ }
+ Object instance = holder.get();
+ if (instance == null) {
+ synchronized (holder) {
+ instance = holder.get();
+ if (instance == null) {
+ instance = createExtension(name);
+ holder.set(instance);
+ }
+ }
+ }
+ return (T) instance;
+ }
+
+ /**
+ * 返回缺省的扩展,如果没有设置则返回null
。
+ */
+ public T getDefaultExtension() {
+ getExtensionClasses();
+ if (null == cachedDefaultName || cachedDefaultName.length() == 0
+ || "true".equals(cachedDefaultName)) {
+ return null;
+ }
+ return getExtension(cachedDefaultName);
+ }
+
+ public boolean hasExtension(String name) {
+ if (name == null || name.length() == 0)
+ throw new IllegalArgumentException("Extension name == null");
+ try {
+ return getExtensionClass(name) != null;
+ } catch (Throwable t) {
+ return false;
+ }
+ }
+
+ public Set getSupportedExtensions() {
+ Map> clazzes = getExtensionClasses();
+ return Collections.unmodifiableSet(new TreeSet(clazzes.keySet()));
+ }
+
+ /**
+ * 返回缺省的扩展点名,如果没有设置缺省则返回null
。
+ */
+ public String getDefaultExtensionName() {
+ getExtensionClasses();
+ return cachedDefaultName;
+ }
+
+ /**
+ * 编程方式添加新扩展点。
+ *
+ * @param name 扩展点名
+ * @param clazz 扩展点类
+ * @throws IllegalStateException 要添加扩展点名已经存在。
+ */
+ public void addExtension(String name, Class> clazz) {
+ getExtensionClasses(); // load classes
+
+ if (!type.isAssignableFrom(clazz)) {
+ throw new IllegalStateException("Input type " +
+ clazz + "not implement Extension " + type);
+ }
+ if (clazz.isInterface()) {
+ throw new IllegalStateException("Input type " +
+ clazz + "can not be interface!");
+ }
+
+ if (!clazz.isAnnotationPresent(Adaptive.class)) {
+ if (StringUtils.isEmpty(name)) {
+ throw new IllegalStateException("Extension name is blank (Extension " + type + ")!");
+ }
+ if (cachedClasses.get().containsKey(name)) {
+ throw new IllegalStateException("Extension name " +
+ name + " already existed(Extension " + type + ")!");
+ }
+
+ cachedNames.put(clazz, name);
+ cachedClasses.get().put(name, clazz);
+ } else {
+ if (cachedAdaptiveClass != null) {
+ throw new IllegalStateException("Adaptive Extension already existed(Extension " + type + ")!");
+ }
+
+ cachedAdaptiveClass = clazz;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public T getAdaptiveExtension() {
+ Object instance = cachedAdaptiveInstance.get();
+ if (instance == null) {
+ if (createAdaptiveInstanceError == null) {
+ synchronized (cachedAdaptiveInstance) {
+ instance = cachedAdaptiveInstance.get();
+ if (instance == null) {
+ try {
+ instance = createAdaptiveExtension();
+ cachedAdaptiveInstance.set(instance);
+ } catch (Throwable t) {
+ createAdaptiveInstanceError = t;
+ throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
+ }
+ }
+ }
+ } else {
+ throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
+ }
+ }
+
+ return (T) instance;
+ }
+
+ private IllegalStateException findException(String name) {
+ for (Map.Entry entry : exceptions.entrySet()) {
+ if (entry.getKey().toLowerCase().contains(name.toLowerCase())) {
+ return entry.getValue();
+ }
+ }
+ StringBuilder buf = new StringBuilder("No such extension " + type.getName() + " by name " + name);
+
+
+ int i = 1;
+ for (Map.Entry entry : exceptions.entrySet()) {
+ if (i == 1) {
+ buf.append(", possible causes: ");
+ }
+
+ buf.append("\r\n(");
+ buf.append(i++);
+ buf.append(") ");
+ buf.append(entry.getKey());
+ buf.append(":\r\n");
+ buf.append(StringUtils.toString(entry.getValue()));
+ }
+ return new IllegalStateException(buf.toString());
+ }
+
+ @SuppressWarnings("unchecked")
+ private T createExtension(String name) {
+ Class> clazz = getExtensionClasses().get(name);
+ if (clazz == null) {
+ throw findException(name);
+ }
+ try {
+ T instance = (T) EXTENSION_INSTANCES.get(clazz);
+ if (instance == null) {
+ EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
+ instance = (T) EXTENSION_INSTANCES.get(clazz);
+ }
+ injectExtension(instance);
+ Set> wrapperClasses = cachedWrapperClasses;
+ if (wrapperClasses != null && wrapperClasses.size() > 0) {
+ for (Class> wrapperClass : wrapperClasses) {
+ instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
+ }
+ }
+ return instance;
+ } catch (Throwable t) {
+ throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
+ type + ") could not be instantiated: " + t.getMessage(), t);
+ }
+ }
+
+ private T injectExtension(T instance) {
+ try {
+ if (objectFactory != null) {
+ for (Method method : instance.getClass().getMethods()) {
+ if (method.getName().startsWith("set")
+ && method.getParameterTypes().length == 1
+ && Modifier.isPublic(method.getModifiers())) {
+ Class> pt = method.getParameterTypes()[0];
+ try {
+ String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
+ Object object = objectFactory.getExtension(pt, property);
+ if (object != null) {
+ method.invoke(instance, object);
+ }
+ } catch (Exception e) {
+ LOGGER.error("fail to inject via method " + method.getName()
+ + " of interface " + type.getName() + ": " + e.getMessage(), e);
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ return instance;
+ }
+
+ private Class> getExtensionClass(String name) {
+ if (type == null)
+ throw new IllegalArgumentException("Extension type == null");
+ if (name == null)
+ throw new IllegalArgumentException("Extension name == null");
+ Class> clazz = getExtensionClasses().get(name);
+ if (clazz == null)
+ throw new IllegalStateException("No such extension \"" + name + "\" for " + type.getName() + "!");
+ return clazz;
+ }
+
+ private Map> getExtensionClasses() {
+ Map> classes = cachedClasses.get();
+ if (classes == null) {
+ synchronized (cachedClasses) {
+ classes = cachedClasses.get();
+ if (classes == null) {
+ classes = loadExtensionClasses();
+ cachedClasses.set(classes);
+ }
+ }
+ }
+ return classes;
+ }
+
+ // 此方法已经getExtensionClasses方法同步过。
+ private Map> loadExtensionClasses() {
+ final SPI defaultAnnotation = type.getAnnotation(SPI.class);
+ if (defaultAnnotation != null) {
+ String value = defaultAnnotation.value();
+ if (value != null && (value = value.trim()).length() > 0) {
+ String[] names = NAME_SEPARATOR.split(value);
+ if (names.length > 1) {
+ throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
+ + ": " + Arrays.toString(names));
+ }
+ if (names.length == 1) cachedDefaultName = names[0];
+ }
+ }
+
+ Map> extensionClasses = new HashMap>();
+ loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
+ loadFile(extensionClasses, DUBBO_DIRECTORY);
+ return extensionClasses;
+ }
+
+ private void loadFile(Map> extensionClasses, String dir) {
+ String fileName = dir + type.getName();
+ try {
+ Enumeration configs;
+ ClassLoader classLoader = findClassLoader();
+ if (classLoader != null) {
+ configs = classLoader.getResources(fileName);
+ } else {
+ configs = ClassLoader.getSystemResources(fileName);
+ }
+ if (configs != null) {
+ while (configs.hasMoreElements()) {
+ java.net.URL config = configs.nextElement();
+ try {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(config.openStream(), "utf-8"));
+ try {
+ String line = null;
+ while ((line = reader.readLine()) != null) {
+ final int ci = line.indexOf('#');
+ if (ci >= 0) line = line.substring(0, ci);
+ line = line.trim();
+ if (line.length() > 0) {
+ try {
+ String name = null;
+ int i = line.indexOf('=');
+ if (i > 0) {
+ name = line.substring(0, i).trim();
+ line = line.substring(i + 1).trim();
+ }
+ if (line.length() > 0) {
+ Class> clazz = Class.forName(line, true, classLoader);
+ if (!type.isAssignableFrom(clazz)) {
+ throw new IllegalStateException("Error when load extension class(interface: " +
+ type + ", class line: " + clazz.getName() + "), class "
+ + clazz.getName() + "is not subtype of interface.");
+ }
+ if (clazz.isAnnotationPresent(Adaptive.class)) {
+ if (cachedAdaptiveClass == null) {
+ cachedAdaptiveClass = clazz;
+ } else if (!cachedAdaptiveClass.equals(clazz)) {
+ throw new IllegalStateException("More than 1 adaptive class found: "
+ + cachedAdaptiveClass.getClass().getName()
+ + ", " + clazz.getClass().getName());
+ }
+ } else {
+ try {
+ clazz.getConstructor(type);
+ Set> wrappers = cachedWrapperClasses;
+ if (wrappers == null) {
+ cachedWrapperClasses = new ConcurrentHashSet>();
+ wrappers = cachedWrapperClasses;
+ }
+ wrappers.add(clazz);
+ } catch (NoSuchMethodException e) {
+ clazz.getConstructor();
+ if (name == null || name.length() == 0) {
+ if (clazz.getSimpleName().length() > type.getSimpleName().length()
+ && clazz.getSimpleName().endsWith(type.getSimpleName())) {
+ name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase();
+ } else {
+ throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + config);
+ }
+ }
+ String[] names = NAME_SEPARATOR.split(name);
+ if (names != null && names.length > 0) {
+ Activate activate = clazz.getAnnotation(Activate.class);
+ if (activate != null) {
+ cachedActivates.put(names[0], activate);
+ }
+ for (String n : names) {
+ if (!cachedNames.containsKey(clazz)) {
+ cachedNames.put(clazz, n);
+ }
+ Class> c = extensionClasses.get(n);
+ if (c == null) {
+ extensionClasses.put(n, clazz);
+ } else if (c != clazz) {
+ throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
+ }
+ }
+ }
+ }
+ }
+ }
+ } catch (Throwable t) {
+ IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + config + ", cause: " + t.getMessage(), t);
+ exceptions.put(line, e);
+ }
+ }
+ } // end of while read lines
+ } finally {
+ reader.close();
+ }
+ } catch (Throwable t) {
+ LOGGER.error("Exception when load extension class(interface: " +
+ type + ", class file: " + config + ") in " + config, t);
+ }
+ } // end of while configs
+ }
+ } catch (Throwable t) {
+ LOGGER.error("Exception when load extension class(interface: " +
+ type + ", description file: " + fileName + ").", t);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private T createAdaptiveExtension() {
+ try {
+ return injectExtension((T) getAdaptiveExtensionClass().newInstance());
+ } catch (Exception e) {
+ throw new IllegalStateException("Can not create adaptive extenstion " + type + ", cause: " + e.getMessage(), e);
+ }
+ }
+
+ private Class> getAdaptiveExtensionClass() {
+ getExtensionClasses();
+ if (cachedAdaptiveClass != null) {
+ return cachedAdaptiveClass;
+ }
+ return cachedAdaptiveClass = createAdaptiveExtensionClass();
+ }
+
+ private Class> createAdaptiveExtensionClass() {
+ String code = createAdaptiveExtensionClassCode();
+ ClassLoader classLoader = findClassLoader();
+ Compiler compiler = ExtensionLoader.getExtensionLoader(Compiler.class).getAdaptiveExtension();
+ return compiler.compile(code, classLoader);
+ }
+
+ private String createAdaptiveExtensionClassCode() {
+ StringBuilder codeBuidler = new StringBuilder();
+ Method[] methods = type.getMethods();
+ boolean hasAdaptiveAnnotation = false;
+ for (Method m : methods) {
+ if (m.isAnnotationPresent(Adaptive.class)) {
+ hasAdaptiveAnnotation = true;
+ break;
+ }
+ }
+ // 完全没有Adaptive方法,则不需要生成Adaptive类
+ if (!hasAdaptiveAnnotation)
+ throw new IllegalStateException("No adaptive method on extension " + type.getName() + ", refuse to create the adaptive class!");
+
+ codeBuidler.append("package " + type.getPackage().getName() + ";");
+ codeBuidler.append("\nimport " + ExtensionLoader.class.getName() + ";");
+ codeBuidler.append("\npublic class " + type.getSimpleName() + "$Adpative" + " implements " + type.getCanonicalName() + " {");
+
+ for (Method method : methods) {
+ Class> rt = method.getReturnType();
+ Class>[] pts = method.getParameterTypes();
+ Class>[] ets = method.getExceptionTypes();
+
+ Adaptive adaptiveAnnotation = method.getAnnotation(Adaptive.class);
+ StringBuilder code = new StringBuilder(512);
+ if (adaptiveAnnotation == null) {
+ code.append("throw new UnsupportedOperationException(\"method ")
+ .append(method.toString()).append(" of interface ")
+ .append(type.getName()).append(" is not adaptive method!\");");
+ } else {
+ int configTypeIndex = -1;
+ for (int i = 0; i < pts.length; ++i) {
+ if (pts[i].equals(Config.class)) {
+ configTypeIndex = i;
+ break;
+ }
+ }
+ // 有类型为URL的参数
+ if (configTypeIndex != -1) {
+ // Null Point check
+ String s = String.format("\nif (arg%d == null) throw new IllegalArgumentException(\"config == null\");",
+ configTypeIndex);
+ code.append(s);
+
+ s = String.format("\n%s config = arg%d;", Config.class.getName(), configTypeIndex);
+ code.append(s);
+ }
+ // 参数没有URL类型
+ else {
+ String attribMethod = null;
+
+ // 找到参数的URL属性
+ LBL_PTS:
+ for (int i = 0; i < pts.length; ++i) {
+ Method[] ms = pts[i].getMethods();
+ for (Method m : ms) {
+ String name = m.getName();
+ if ((name.startsWith("get") || name.length() > 3)
+ && Modifier.isPublic(m.getModifiers())
+ && !Modifier.isStatic(m.getModifiers())
+ && m.getParameterTypes().length == 0
+ && m.getReturnType() == Config.class) {
+ configTypeIndex = i;
+ attribMethod = name;
+ break LBL_PTS;
+ }
+ }
+ }
+ if (attribMethod == null) {
+ throw new IllegalStateException("fail to create adative class for interface " + type.getName()
+ + ": not found config parameter or config attribute in parameters of method " + method.getName());
+ }
+
+ // Null point check
+ String s = String.format("\nif (arg%d == null) throw new IllegalArgumentException(\"%s argument == null\");",
+ configTypeIndex, pts[configTypeIndex].getName());
+ code.append(s);
+ s = String.format("\nif (arg%d.%s() == null) throw new IllegalArgumentException(\"%s argument %s() == null\");",
+ configTypeIndex, attribMethod, pts[configTypeIndex].getName(), attribMethod);
+ code.append(s);
+
+ s = String.format("%s config = arg%d.%s();", Config.class.getName(), configTypeIndex, attribMethod);
+ code.append(s);
+ }
+
+ String[] value = adaptiveAnnotation.value();
+ // 没有设置Key,则使用“扩展点接口名的点分隔 作为Key
+ if (value.length == 0) {
+ char[] charArray = type.getSimpleName().toCharArray();
+ StringBuilder sb = new StringBuilder(128);
+ for (int i = 0; i < charArray.length; i++) {
+ if (Character.isUpperCase(charArray[i])) {
+ if (i != 0) {
+ sb.append(".");
+ }
+ sb.append(Character.toLowerCase(charArray[i]));
+ } else {
+ sb.append(charArray[i]);
+ }
+ }
+ value = new String[]{sb.toString()};
+ }
+
+ String defaultExtName = cachedDefaultName;
+ String getNameCode = null;
+ for (int i = value.length - 1; i >= 0; --i) {
+ if (i == value.length - 1) {
+ if (null != defaultExtName) {
+ getNameCode = String.format("config.getParameter(\"%s\", \"%s\")", value[i], defaultExtName);
+ } else {
+ getNameCode = String.format("config.getParameter(\"%s\")", value[i]);
+ }
+ } else {
+ getNameCode = String.format("config.getParameter(\"%s\", %s)", value[i], getNameCode);
+ }
+ }
+ code.append("\nString extName = ").append(getNameCode).append(";");
+ // check extName == null?
+ String s = String.format("\nif(extName == null) " +
+ "throw new IllegalStateException(\"Fail to get extension(%s) name from config(\" + config.toString() + \") use keys(%s)\");",
+ type.getName(), Arrays.toString(value));
+ code.append(s);
+
+ s = String.format("\n%s extension = (% 0) {
+ codeBuidler.append(", ");
+ }
+ codeBuidler.append(pts[i].getCanonicalName());
+ codeBuidler.append(" ");
+ codeBuidler.append("arg" + i);
+ }
+ codeBuidler.append(")");
+ if (ets.length > 0) {
+ codeBuidler.append(" throws ");
+ for (int i = 0; i < ets.length; i++) {
+ if (i > 0) {
+ codeBuidler.append(", ");
+ }
+ codeBuidler.append(pts[i].getCanonicalName());
+ }
+ }
+ codeBuidler.append(" {");
+ codeBuidler.append(code.toString());
+ codeBuidler.append("\n}");
+ }
+ codeBuidler.append("\n}");
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(codeBuidler.toString());
+ }
+ return codeBuidler.toString();
+ }
+
+ private static ClassLoader findClassLoader() {
+ return ExtensionLoader.class.getClassLoader();
+ }
+
+ @Override
+ public String toString() {
+ return this.getClass().getName() + "[" + type.getName() + "]";
+ }
+
+}
+
diff --git a/job-core/src/main/java/com/lts/job/core/extension/SPI.java b/job-core/src/main/java/com/lts/job/core/extension/SPI.java
new file mode 100644
index 000000000..768ebd7e1
--- /dev/null
+++ b/job-core/src/main/java/com/lts/job/core/extension/SPI.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 1999-2012 Alibaba Group.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.lts.job.core.extension;
+
+import java.lang.annotation.*;
+
+/**
+ * @author william.liangf
+ */
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE})
+public @interface SPI {
+
+ /**
+ * 缺省扩展点名。
+ */
+ String value() default "";
+
+}
\ No newline at end of file
diff --git a/job-core/src/main/java/com/lts/job/core/extension/factory/AdaptiveExtensionFactory.java b/job-core/src/main/java/com/lts/job/core/extension/factory/AdaptiveExtensionFactory.java
new file mode 100644
index 000000000..3257137a7
--- /dev/null
+++ b/job-core/src/main/java/com/lts/job/core/extension/factory/AdaptiveExtensionFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 1999-2012 Alibaba Group.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.lts.job.core.extension.factory;
+
+import com.lts.job.core.extension.Adaptive;
+import com.lts.job.core.extension.ExtensionFactory;
+import com.lts.job.core.extension.ExtensionLoader;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * AdaptiveExtensionFactory
+ *
+ * @author william.liangf
+ */
+@Adaptive
+public class AdaptiveExtensionFactory implements ExtensionFactory {
+
+ private final List factories;
+
+ public AdaptiveExtensionFactory() {
+ ExtensionLoader loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class);
+ List list = new ArrayList();
+ for (String name : loader.getSupportedExtensions()) {
+ list.add(loader.getExtension(name));
+ }
+ factories = Collections.unmodifiableList(list);
+ }
+
+ public T getExtension(Class type, String name) {
+ for (ExtensionFactory factory : factories) {
+ T extension = factory.getExtension(type, name);
+ if (extension != null) {
+ return extension;
+ }
+ }
+ return null;
+ }
+
+}
\ No newline at end of file
diff --git a/job-core/src/main/java/com/lts/job/core/extension/factory/SpiExtensionFactory.java b/job-core/src/main/java/com/lts/job/core/extension/factory/SpiExtensionFactory.java
new file mode 100644
index 000000000..3ca4dee2d
--- /dev/null
+++ b/job-core/src/main/java/com/lts/job/core/extension/factory/SpiExtensionFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 1999-2012 Alibaba Group.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.lts.job.core.extension.factory;
+
+import com.lts.job.core.extension.ExtensionFactory;
+import com.lts.job.core.extension.ExtensionLoader;
+import com.lts.job.core.extension.SPI;
+
+/**
+ * SpiExtensionFactory
+ *
+ * @author william.liangf
+ */
+public class SpiExtensionFactory implements ExtensionFactory {
+
+ public T getExtension(Class type, String name) {
+ if (type.isInterface() && type.isAnnotationPresent(SPI.class)) {
+ ExtensionLoader loader = ExtensionLoader.getExtensionLoader(type);
+ if (loader.getSupportedExtensions().size() > 0) {
+ return loader.getAdaptiveExtension();
+ }
+ }
+ return null;
+ }
+
+}
diff --git a/job-core/src/main/java/com/lts/job/core/factory/JobNodeConfigFactory.java b/job-core/src/main/java/com/lts/job/core/factory/JobNodeConfigFactory.java
index b28c4d339..edc9014fb 100644
--- a/job-core/src/main/java/com/lts/job/core/factory/JobNodeConfigFactory.java
+++ b/job-core/src/main/java/com/lts/job/core/factory/JobNodeConfigFactory.java
@@ -1,7 +1,7 @@
package com.lts.job.core.factory;
import com.lts.job.core.constant.Constants;
-import com.lts.job.core.domain.JobNodeConfig;
+import com.lts.job.core.cluster.Config;
import com.lts.job.core.util.StringUtils;
/**
@@ -9,12 +9,12 @@
*/
public class JobNodeConfigFactory {
- public static JobNodeConfig getDefaultConfig() {
- JobNodeConfig config = new JobNodeConfig();
+ public static Config getDefaultConfig() {
+ Config config = new Config();
config.setIdentity(StringUtils.generateUUID());
config.setWorkThreads(Constants.AVAILABLE_PROCESSOR);
config.setNodeGroup("lts");
- config.setZookeeperAddress("localhost:2181");
+ config.setRegistryAddress("zookeeper://127.0.0.1:2181");
config.setInvokeTimeoutMillis(1000 * 6);
config.setListenPort(0);
config.setJobInfoSavePath(Constants.USER_HOME);
diff --git a/job-core/src/main/java/com/lts/job/registry/util/NamedThreadFactory.java b/job-core/src/main/java/com/lts/job/core/factory/NamedThreadFactory.java
similarity index 94%
rename from job-core/src/main/java/com/lts/job/registry/util/NamedThreadFactory.java
rename to job-core/src/main/java/com/lts/job/core/factory/NamedThreadFactory.java
index 5e5a065fa..79414222e 100755
--- a/job-core/src/main/java/com/lts/job/registry/util/NamedThreadFactory.java
+++ b/job-core/src/main/java/com/lts/job/core/factory/NamedThreadFactory.java
@@ -1,9 +1,10 @@
-package com.lts.job.registry.util;
+package com.lts.job.core.factory;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
+ * 带有名称的 线程 工厂类
* @author Robert HG (254963746@qq.com) on 5/5/14.
*/
public class NamedThreadFactory implements ThreadFactory {
diff --git a/job-core/src/main/java/com/lts/job/core/factory/NodeFactory.java b/job-core/src/main/java/com/lts/job/core/factory/NodeFactory.java
index 2d99c5fd3..901a04ebb 100644
--- a/job-core/src/main/java/com/lts/job/core/factory/NodeFactory.java
+++ b/job-core/src/main/java/com/lts/job/core/factory/NodeFactory.java
@@ -1,8 +1,7 @@
package com.lts.job.core.factory;
import com.lts.job.core.cluster.Node;
-import com.lts.job.core.domain.JobNodeConfig;
-import com.lts.job.core.registry.PathParser;
+import com.lts.job.core.cluster.Config;
import com.lts.job.core.util.NetUtils;
/**
@@ -11,7 +10,7 @@
*/
public class NodeFactory {
- public static T create(PathParser pathParser, Class clazz, JobNodeConfig config) {
+ public static T create(Class clazz, Config config) {
try {
T node = clazz.newInstance();
node.setIp(NetUtils.getLocalHost());
@@ -19,7 +18,6 @@ public static T create(PathParser pathParser, Class clazz, J
node.setThreads(config.getWorkThreads());
node.setPort(config.getListenPort());
node.setIdentity(config.getIdentity());
- node.setPath(pathParser.getPath(node));
return node;
} catch (InstantiationException e) {
throw new RuntimeException(e);
diff --git a/job-core/src/main/java/com/lts/job/core/listener/MasterNodeChangeListener.java b/job-core/src/main/java/com/lts/job/core/listener/MasterChangeListener.java
similarity index 89%
rename from job-core/src/main/java/com/lts/job/core/listener/MasterNodeChangeListener.java
rename to job-core/src/main/java/com/lts/job/core/listener/MasterChangeListener.java
index c7b9e8cdf..d5f84cdef 100644
--- a/job-core/src/main/java/com/lts/job/core/listener/MasterNodeChangeListener.java
+++ b/job-core/src/main/java/com/lts/job/core/listener/MasterChangeListener.java
@@ -6,7 +6,7 @@
* @author Robert HG (254963746@qq.com) on 8/23/14.
* Master 节点变化 监听器
*/
-public interface MasterNodeChangeListener {
+public interface MasterChangeListener {
/**
* master 为 master节点
diff --git a/job-core/src/main/java/com/lts/job/core/listener/MasterNodeElectionListener.java b/job-core/src/main/java/com/lts/job/core/listener/MasterElectionListener.java
similarity index 52%
rename from job-core/src/main/java/com/lts/job/core/listener/MasterNodeElectionListener.java
rename to job-core/src/main/java/com/lts/job/core/listener/MasterElectionListener.java
index 72df356fa..d1f2de3ff 100644
--- a/job-core/src/main/java/com/lts/job/core/listener/MasterNodeElectionListener.java
+++ b/job-core/src/main/java/com/lts/job/core/listener/MasterElectionListener.java
@@ -2,38 +2,43 @@
import com.lts.job.core.Application;
import com.lts.job.core.cluster.Node;
+import com.lts.job.core.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
/**
* @author Robert HG (254963746@qq.com) on 8/23/14.
- * 用来监听 自己类型 节点的变化,用来选举master
+ * 用来监听 自己类型 节点的变化,用来选举master
*/
-public class MasterNodeElectionListener implements NodeChangeListener {
+public class MasterElectionListener implements NodeChangeListener {
private Application application;
- public MasterNodeElectionListener(Application application) {
+ public MasterElectionListener(Application application) {
this.application = application;
}
- @Override
- public void addNode(Node node) {
- if (isSameGroup(node)) {
- application.getMasterElector().addNode(node);
+ public void removeNodes(List nodes) {
+ if (CollectionUtils.isEmpty(nodes)) {
+ return;
}
- }
-
- @Override
- public void removeNode(Node node) {
- if (isSameGroup(node)) {
- application.getMasterElector().removeNode(node);
+ // 只需要和当前节点相同的节点类型和组
+ List groupNodes = new ArrayList();
+ for (Node node : nodes) {
+ if (isSameGroup(node)) {
+ groupNodes.add(node);
+ }
+ }
+ if (groupNodes.size() > 0) {
+ application.getMasterElector().removeNode(groupNodes);
}
}
- @Override
public void addNodes(List nodes) {
+ if (CollectionUtils.isEmpty(nodes)) {
+ return;
+ }
// 只需要和当前节点相同的节点类型和组
List groupNodes = new ArrayList();
for (Node node : nodes) {
@@ -41,18 +46,20 @@ public void addNodes(List nodes) {
groupNodes.add(node);
}
}
- if(groupNodes.size() > 0){
+ if (groupNodes.size() > 0) {
application.getMasterElector().addNodes(groupNodes);
}
}
/**
* 是否和当前节点是相同的GROUP
+ *
* @param node
* @return
*/
- private boolean isSameGroup(Node node){
+ private boolean isSameGroup(Node node) {
return node.getNodeType().equals(application.getConfig().getNodeType())
&& node.getGroup().equals(application.getConfig().getNodeGroup());
}
+
}
diff --git a/job-core/src/main/java/com/lts/job/core/listener/NodeChangeListener.java b/job-core/src/main/java/com/lts/job/core/listener/NodeChangeListener.java
index 0d73a0ef7..08f958c66 100644
--- a/job-core/src/main/java/com/lts/job/core/listener/NodeChangeListener.java
+++ b/job-core/src/main/java/com/lts/job/core/listener/NodeChangeListener.java
@@ -5,28 +5,21 @@
import java.util.List;
/**
- * @author Robert HG (254963746@qq.com) on 8/17/14.
- * 节点变化监听器
+ * Created by hugui on 5/18/15.
*/
public interface NodeChangeListener {
/**
* 添加节点
*
- * @param node
- */
- public void addNode(Node node);
-
- /**
- * 删除节点
- *
- * @param node
+ * @param nodes
*/
- public void removeNode(Node node);
+ public void addNodes(List nodes);
/**
- * 批量添加节点
+ * 移除节点
* @param nodes
*/
- public void addNodes(List nodes);
+ public void removeNodes(List nodes);
+
}
diff --git a/job-core/src/main/java/com/lts/job/core/listener/SelfChangeListener.java b/job-core/src/main/java/com/lts/job/core/listener/SelfChangeListener.java
new file mode 100644
index 000000000..fc261334d
--- /dev/null
+++ b/job-core/src/main/java/com/lts/job/core/listener/SelfChangeListener.java
@@ -0,0 +1,63 @@
+package com.lts.job.core.listener;
+
+import com.lts.job.core.Application;
+import com.lts.job.core.cluster.Node;
+import com.lts.job.core.cluster.NodeType;
+import com.lts.job.core.constant.EcTopic;
+import com.lts.job.core.cluster.Config;
+import com.lts.job.core.util.CollectionUtils;
+import com.lts.job.ec.EventCenter;
+import com.lts.job.ec.EventInfo;
+
+import java.util.List;
+
+/**
+ * 用来监听自己的节点信息变化
+ *
+ * @author Robert HG (254963746@qq.com) on 5/11/15.
+ */
+public class SelfChangeListener implements NodeChangeListener {
+
+ private Config config;
+ private EventCenter eventCenter;
+
+ public SelfChangeListener(Application application) {
+ this.config = application.getConfig();
+ this.eventCenter = application.getEventCenter();
+ }
+
+
+ private void change(Node node) {
+ if (node.getIdentity().equals(config.getIdentity())) {
+ // 是当前节点, 看看节点配置是否发生变化
+ // 1. 看 threads 有没有改变 , 目前只有 TASK_TRACKER 对 threads起作用
+ if (node.getNodeType().equals(NodeType.TASK_TRACKER)
+ && (node.getThreads() != config.getWorkThreads())) {
+ config.setWorkThreads(node.getThreads());
+ eventCenter.publishAsync(new EventInfo(EcTopic.WORK_THREAD_CHANGE));
+ }
+
+ // 2. 看 available 有没有改变
+ if (node.isAvailable() != config.isAvailable()) {
+ String topic = node.isAvailable() ? EcTopic.NODE_ENABLE : EcTopic.NODE_DISABLE;
+ config.setAvailable(node.isAvailable());
+ eventCenter.publishAsync(new EventInfo(topic));
+ }
+ }
+ }
+
+ @Override
+ public void addNodes(List nodes) {
+ if (CollectionUtils.isEmpty(nodes)) {
+ return;
+ }
+ for (Node node : nodes) {
+ change(node);
+ }
+ }
+
+ @Override
+ public void removeNodes(List nodes) {
+
+ }
+}
diff --git a/job-core/src/main/java/com/lts/job/core/loadbalance/AbstractLoadBalance.java b/job-core/src/main/java/com/lts/job/core/loadbalance/AbstractLoadBalance.java
index fb8c0f111..aca3c3f7c 100644
--- a/job-core/src/main/java/com/lts/job/core/loadbalance/AbstractLoadBalance.java
+++ b/job-core/src/main/java/com/lts/job/core/loadbalance/AbstractLoadBalance.java
@@ -1,5 +1,7 @@
package com.lts.job.core.loadbalance;
+import com.lts.job.core.cluster.Config;
+
import java.util.List;
/**
@@ -8,7 +10,7 @@
public abstract class AbstractLoadBalance implements LoadBalance {
@Override
- public S select(List shards, String seed) {
+ public S select(Config config, List shards, String seed) {
if (shards == null || shards.size() == 0) {
return null;
}
diff --git a/job-core/src/main/java/com/lts/job/core/loadbalance/LoadBalance.java b/job-core/src/main/java/com/lts/job/core/loadbalance/LoadBalance.java
index 080ba086f..b3e0fe13e 100644
--- a/job-core/src/main/java/com/lts/job/core/loadbalance/LoadBalance.java
+++ b/job-core/src/main/java/com/lts/job/core/loadbalance/LoadBalance.java
@@ -1,12 +1,18 @@
package com.lts.job.core.loadbalance;
+import com.lts.job.core.cluster.Config;
+import com.lts.job.core.extension.Adaptive;
+import com.lts.job.core.extension.SPI;
+
import java.util.List;
/**
* Robert HG (254963746@qq.com) on 3/25/15.
*/
+@SPI("random")
public interface LoadBalance {
- public S select(List shards, String seed);
+ @Adaptive("loadbalance")
+ public S select(Config config, List shards, String seed);
}
diff --git a/job-core/src/main/java/com/lts/job/core/registry/AbstractRegistry.java b/job-core/src/main/java/com/lts/job/core/registry/AbstractRegistry.java
new file mode 100644
index 000000000..895275059
--- /dev/null
+++ b/job-core/src/main/java/com/lts/job/core/registry/AbstractRegistry.java
@@ -0,0 +1,189 @@
+package com.lts.job.core.registry;
+
+import com.lts.job.core.Application;
+import com.lts.job.core.cluster.Node;
+import com.lts.job.core.util.CollectionUtils;
+import com.lts.job.core.util.ConcurrentHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * @author Robert HG (254963746@qq.com) on 5/17/15.
+ */
+public abstract class AbstractRegistry implements Registry {
+
+ protected final static Logger LOGGER = LoggerFactory.getLogger(Registry.class);
+
+ private final Set registered = new ConcurrentHashSet();
+ private final ConcurrentMap> subscribed = new ConcurrentHashMap>();
+
+ protected Application application;
+ private Node node;
+
+ public AbstractRegistry(Application application) {
+
+ this.application = application;
+ }
+
+ @Override
+ public void register(Node node) {
+ if (node == null) {
+ throw new IllegalArgumentException("register node == null");
+ }
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Register: " + node);
+ }
+ registered.add(node);
+ }
+
+ @Override
+ public void unregister(Node node) {
+ if (node == null) {
+ throw new IllegalArgumentException("unregister node == null");
+ }
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Unregister: " + node);
+ }
+ registered.remove(node);
+ }
+
+ @Override
+ public void subscribe(Node node, NotifyListener listener) {
+ if (node == null) {
+ throw new IllegalArgumentException("subscribe node == null");
+ }
+ if (listener == null) {
+ throw new IllegalArgumentException("subscribe listener == null");
+ }
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Subscribe: " + node);
+ }
+ Set listeners = subscribed.get(node);
+ if (listeners == null) {
+ subscribed.putIfAbsent(node, new ConcurrentHashSet());
+ listeners = subscribed.get(node);
+ }
+ listeners.add(listener);
+
+ }
+
+ @Override
+ public void unsubscribe(Node node, NotifyListener listener) {
+ if (node == null) {
+ throw new IllegalArgumentException("unsubscribe node == null");
+ }
+ if (listener == null) {
+ throw new IllegalArgumentException("unsubscribe listener == null");
+ }
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("unsubscribe: " + node);
+ }
+ Set listeners = subscribed.get(node);
+ if (listeners != null) {
+ listeners.remove(listener);
+ }
+ }
+
+ protected void notify(NotifyEvent event, List nodes, NotifyListener listener) {
+ if (event == null) {
+ throw new IllegalArgumentException("notify event == null");
+ }
+ if (listener == null) {
+ throw new IllegalArgumentException("notify listener == null");
+ }
+ if (CollectionUtils.isEmpty(nodes)) {
+ LOGGER.warn("Ignore empty notify nodes for subscribe node " + getNode());
+ return;
+ }
+
+ listener.notify(event, nodes);
+ }
+
+ @Override
+ public void destroy() {
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Destroy registry:" + getNode());
+ }
+ Set destroyRegistered = new HashSet(getRegistered());
+ if (!destroyRegistered.isEmpty()) {
+ for (Node node : new HashSet(getRegistered())) {
+ try {
+ unregister(node);
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Destroy unregister node " + node);
+ }
+ } catch (Throwable t) {
+ LOGGER.warn("Failed to unregister node " + node + " to registry " + getNode() + " on destroy, cause: " + t.getMessage(), t);
+ }
+ }
+ }
+ Map> destroySubscribed = new HashMap>(getSubscribed());
+ if (!destroySubscribed.isEmpty()) {
+ for (Map.Entry> entry : destroySubscribed.entrySet()) {
+ Node node = entry.getKey();
+ for (NotifyListener listener : entry.getValue()) {
+ try {
+ unsubscribe(node, listener);
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Destroy unsubscribe node " + node);
+ }
+ } catch (Throwable t) {
+ LOGGER.warn("Failed to unsubscribe node " + node + " to registry " + getNode() + " on destroy, cause: " + t.getMessage(), t);
+ }
+ }
+ }
+ }
+ }
+
+ protected Set getRegistered() {
+ return registered;
+ }
+
+ protected ConcurrentMap> getSubscribed() {
+ return subscribed;
+ }
+
+ public Node getNode() {
+ return node;
+ }
+
+ public void setNode(Node node) {
+ this.node = node;
+ }
+
+ /**
+ * 恢复
+ *
+ * @throws Exception
+ */
+ protected void recover() throws Exception {
+ // register
+ Set recoverRegistered = new HashSet(getRegistered());
+ if (!recoverRegistered.isEmpty()) {
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Recover register node " + recoverRegistered);
+ }
+ for (Node node : recoverRegistered) {
+ register(node);
+ }
+ }
+ // subscribe
+ Map> recoverSubscribed = new HashMap>(getSubscribed());
+ if (!recoverSubscribed.isEmpty()) {
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Recover subscribe node " + recoverSubscribed.keySet());
+ }
+ for (Map.Entry> entry : recoverSubscribed.entrySet()) {
+ Node node = entry.getKey();
+ for (NotifyListener listener : entry.getValue()) {
+ subscribe(node, listener);
+ }
+ }
+ }
+ }
+
+}
diff --git a/job-core/src/main/java/com/lts/job/core/registry/FailbackRegistry.java b/job-core/src/main/java/com/lts/job/core/registry/FailbackRegistry.java
new file mode 100644
index 000000000..d02504ca7
--- /dev/null
+++ b/job-core/src/main/java/com/lts/job/core/registry/FailbackRegistry.java
@@ -0,0 +1,325 @@
+package com.lts.job.core.registry;
+
+import com.lts.job.core.Application;
+import com.lts.job.core.cluster.Node;
+import com.lts.job.core.constant.Constants;
+import com.lts.job.core.factory.NamedThreadFactory;
+import com.lts.job.core.util.ConcurrentHashSet;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * @author Robert HG (254963746@qq.com) on 5/17/15.
+ */
+public abstract class FailbackRegistry extends AbstractRegistry {
+
+ // 定时任务执行器
+ private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1,
+ new NamedThreadFactory("LTSRegistryFailedRetryTimer", true));
+
+ // 失败重试定时器,定时检查是否有请求失败,如有,无限次重试
+ private ScheduledFuture> retryFuture;
+
+ // 注册失败的定时重试
+ private final Set failedRegistered = new ConcurrentHashSet();
+ private final Set failedUnRegistered = new ConcurrentHashSet();
+ private final ConcurrentMap> failedSubscribed = new ConcurrentHashMap>();
+ private final ConcurrentMap> failedUnsubscribed = new ConcurrentHashMap>();
+ private final ConcurrentMap>>> failedNotified = new ConcurrentHashMap>>>();
+
+ public FailbackRegistry(Application application) {
+ super(application);
+
+ int retryPeriod = application.getConfig().getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
+
+ this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
+ public void run() {
+ // 检测并连接注册中心
+ try {
+ retry();
+ } catch (Throwable t) { // 防御性容错
+ LOGGER.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
+ }
+ }
+ }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void register(Node node) {
+ try {
+ super.register(node);
+ failedRegistered.clear();
+ doRegister(node);
+ } catch (Exception e) {
+ // 将失败的注册请求记录到失败列表,定时重试
+ failedRegistered.add(node);
+ }
+ }
+
+ @Override
+ public void unregister(Node node) {
+ try {
+ super.unregister(node);
+ failedUnRegistered.clear();
+ doUnRegister(node);
+ } catch (Exception e) {
+ // 将失败的取消注册请求记录到失败列表,定时重试
+ failedUnRegistered.add(node);
+ }
+ }
+
+ @Override
+ public void subscribe(Node node, NotifyListener listener) {
+ try {
+ super.subscribe(node, listener);
+
+ removeFailedSubscribed(node, listener);
+
+ doSubscribe(node, listener);
+
+ } catch (Exception e) {
+ addFailedSubscribed(node, listener);
+ }
+ }
+
+ @Override
+ public void unsubscribe(Node node, NotifyListener listener) {
+ try {
+ super.unsubscribe(node, listener);
+
+ removeFailedSubscribed(node, listener);
+
+ doUnsubscribe(node, listener);
+
+ } catch (Exception e) {
+ addFailedUnsubscribed(node, listener);
+ }
+ }
+
+ protected void addFailedUnsubscribed(Node node, NotifyListener listener) {
+ // 将失败的取消订阅请求记录到失败列表,定时重试
+ Set listeners = failedUnsubscribed.get(node);
+ if (listeners == null) {
+ failedUnsubscribed.putIfAbsent(node, new ConcurrentHashSet());
+ listeners = failedUnsubscribed.get(node);
+ }
+ listeners.add(listener);
+ }
+
+ @Override
+ protected void notify(NotifyEvent event, List nodes, NotifyListener listener) {
+ try {
+ super.notify(event, nodes, listener);
+ } catch (Exception e) {
+ // 将失败的通知请求记录到失败列表,定时重试
+ Map>> listeners = failedNotified.get(getNode());
+
+ if (listeners == null) {
+ failedNotified.putIfAbsent(getNode(), new ConcurrentHashMap>>());
+ listeners = failedNotified.get(getNode());
+ }
+ listeners.put(listener, new NotifyPair>(event, nodes));
+ LOGGER.error("Failed to notify, waiting for retry, cause: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void destroy() {
+ super.destroy();
+ try {
+ retryFuture.cancel(true);
+ } catch (Throwable t) {
+ LOGGER.warn(t.getMessage(), t);
+ }
+ }
+
+ @Override
+ protected void recover() throws Exception {
+ // register
+ Set recoverRegistered = new HashSet(getRegistered());
+ if (!recoverRegistered.isEmpty()) {
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Recover register node " + recoverRegistered);
+ }
+ for (Node node : recoverRegistered) {
+ failedRegistered.add(node);
+ }
+ }
+ // subscribe
+ Map> recoverSubscribed = new HashMap>(getSubscribed());
+ if (!recoverSubscribed.isEmpty()) {
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Recover subscribe node " + recoverSubscribed.keySet());
+ }
+ for (Map.Entry> entry : recoverSubscribed.entrySet()) {
+ Node node = entry.getKey();
+ for (NotifyListener listener : entry.getValue()) {
+ addFailedSubscribed(node, listener);
+ }
+ }
+ }
+ }
+
+ private void removeFailedSubscribed(Node node, NotifyListener listener) {
+ Set listeners = failedSubscribed.get(node);
+ if (listeners != null) {
+ listeners.remove(listener);
+ }
+ listeners = failedUnsubscribed.get(node);
+ if (listeners != null) {
+ listeners.remove(listener);
+ }
+ Map>> notified = failedNotified.get(node);
+ if (notified != null) {
+ notified.remove(listener);
+ }
+ }
+
+ private void addFailedSubscribed(Node node, NotifyListener listener) {
+ Set listeners = failedSubscribed.get(node);
+ if (listeners == null) {
+ failedSubscribed.putIfAbsent(node, new ConcurrentHashSet());
+ listeners = failedSubscribed.get(node);
+ }
+ listeners.add(listener);
+ }
+
+ protected void retry() {
+ if (!failedRegistered.isEmpty()) {
+ Set failed = new HashSet();
+ if (failed.size() > 0) {
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Retry register {}", failed);
+ }
+ try {
+ for (Node node : failed) {
+ doRegister(node);
+ }
+ } catch (Throwable t) { // 忽略所有异常,等待下次重试
+ LOGGER.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
+ }
+ }
+ }
+ if (!failedUnRegistered.isEmpty()) {
+ Set failed = new HashSet();
+ if (failed.size() > 0) {
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Retry unregister {}", failed);
+ }
+ try {
+ for (Node node : failed) {
+ doUnRegister(node);
+ }
+ } catch (Throwable t) { // 忽略所有异常,等待下次重试
+ LOGGER.warn("Failed to retry unregister " + failed + ", waiting for again, cause: " + t.getMessage(), t);
+ }
+ }
+ }
+ if (!failedSubscribed.isEmpty()) {
+ Map> failed = new HashMap>(failedSubscribed);
+ for (Map.Entry> entry : new HashMap>(failed).entrySet()) {
+ if (entry.getValue() == null || entry.getValue().size() == 0) {
+ failed.remove(entry.getKey());
+ }
+ }
+ if (failed.size() > 0) {
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Retry subscribe " + failed);
+ }
+ try {
+ for (Map.Entry> entry : failed.entrySet()) {
+ Node node = entry.getKey();
+ Set listeners = entry.getValue();
+ for (NotifyListener listener : listeners) {
+ try {
+ doSubscribe(node, listener);
+ listeners.remove(listener);
+ } catch (Throwable t) { // 忽略所有异常,等待下次重试
+ LOGGER.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
+ }
+ }
+ }
+ } catch (Throwable t) { // 忽略所有异常,等待下次重试
+ LOGGER.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
+ }
+ }
+ }
+ if (!failedUnsubscribed.isEmpty()) {
+ Map> failed = new HashMap>(failedUnsubscribed);
+ for (Map.Entry> entry : new HashMap>(failed).entrySet()) {
+ if (entry.getValue() == null || entry.getValue().size() == 0) {
+ failed.remove(entry.getKey());
+ }
+ }
+ if (failed.size() > 0) {
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Retry unsubscribe " + failed);
+ }
+ try {
+ for (Map.Entry> entry : failed.entrySet()) {
+ Node node = entry.getKey();
+ Set listeners = entry.getValue();
+ for (NotifyListener listener : listeners) {
+ try {
+ doUnsubscribe(node, listener);
+ listeners.remove(listener);
+ } catch (Throwable t) { // 忽略所有异常,等待下次重试
+ LOGGER.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
+ }
+ }
+ }
+ } catch (Throwable t) { // 忽略所有异常,等待下次重试
+ LOGGER.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
+ }
+ }
+ }
+ if (!failedNotified.isEmpty()) {
+ Map>>> failed = new HashMap>>>(failedNotified);
+ for (Map.Entry>>> entry : new HashMap>>>(failed).entrySet()) {
+ if (entry.getValue() == null || entry.getValue().size() == 0) {
+ failed.remove(entry.getKey());
+ }
+ }
+ if (failed.size() > 0) {
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Retry notify " + failed);
+ }
+ try {
+ for (Map>> values : failed.values()) {
+ for (Map.Entry>> entry : values.entrySet()) {
+ try {
+ NotifyListener listener = entry.getKey();
+ NotifyPair> notifyPair = entry.getValue();
+ listener.notify(notifyPair.event, notifyPair.nodes);
+ values.remove(listener);
+ } catch (Throwable t) { // 忽略所有异常,等待下次重试
+ LOGGER.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t);
+ }
+ }
+ }
+ } catch (Throwable t) { // 忽略所有异常,等待下次重试
+ LOGGER.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t);
+ }
+ }
+ }
+ }
+
+ private class NotifyPair {
+ T1 event;
+ T2 nodes;
+
+ public NotifyPair(T1 event, T2 nodes) {
+ this.event = event;
+ this.nodes = nodes;
+ }
+ }
+
+ protected abstract void doRegister(Node node);
+
+ protected abstract void doUnRegister(Node node);
+
+ protected abstract void doSubscribe(Node node, NotifyListener listener);
+
+ protected abstract void doUnsubscribe(Node node, NotifyListener listener);
+}
diff --git a/job-core/src/main/java/com/lts/job/core/registry/NodeRegistryUtils.java b/job-core/src/main/java/com/lts/job/core/registry/NodeRegistryUtils.java
new file mode 100644
index 000000000..a51593696
--- /dev/null
+++ b/job-core/src/main/java/com/lts/job/core/registry/NodeRegistryUtils.java
@@ -0,0 +1,121 @@
+package com.lts.job.core.registry;
+
+import com.lts.job.core.cluster.Node;
+import com.lts.job.core.cluster.NodeType;
+import com.lts.job.core.util.NetUtils;
+import com.lts.job.core.util.StringUtils;
+
+import java.util.Date;
+
+/**
+ * @author Robert HG (254963746@qq.com) on 5/11/15.
+ *
+ * /LTS/{集群名字}/NODES/TASK_TRACKER/TASK_TRACKER:\\192.168.0.150:8888?group=TASK_TRACKER&threads=8&identity=85750db6-e854-4eb3-a595-9227a5f2c8f6&createTime=1408189898185&isAvailable=true&listenNodeTypes=CLIENT,TASK_TRACKER
+ * /LTS/{集群名字}/NODES/JOB_CLIENT/JOB_CLIENT:\\192.168.0.150:8888?group=JOB_CLIENT&threads=8&identity=85750db6-e854-4eb3-a595-9227a5f2c8f6&createTime=1408189898185&isAvailable=true&listenNodeTypes=CLIENT,TASK_TRACKER
+ * /LTS/{集群名字}/NODES/JOB_TRACKER/JOB_TRACKER:\\192.168.0.150:8888?group=JOB_TRACKER&threads=8&identity=85750db6-e854-4eb3-a595-9227a5f2c8f6&createTime=1408189898185&isAvailable=true&listenNodeTypes=CLIENT,TASK_TRACKER
+ *
+ */
+public class NodeRegistryUtils {
+
+ public static String getRootPath(String clusterName) {
+ return "/LTS/" + clusterName + "/NODES";
+ }
+
+ public static String getNodeTypePath(String clusterName, NodeType nodeType) {
+ return NodeRegistryUtils.getRootPath(clusterName) + "/" + nodeType;
+ }
+
+ public static Node parse(String fullPath) {
+ Node node = new Node();
+ String[] nodeDir = fullPath.split("/");
+ NodeType nodeType = NodeType.valueOf(nodeDir[4]);
+ node.setNodeType(nodeType);
+ String url = nodeDir[5];
+
+ url = url.substring(nodeType.name().length() + 3);
+ String address = url.split("\\?")[0];
+ String ip = address.split(":")[0];
+
+ node.setIp(ip);
+ if (address.contains(":")) {
+ String port = address.split(":")[1];
+ if (port != null && !"".equals(port.trim())) {
+ node.setPort(Integer.valueOf(port));
+ }
+ }
+ String params = url.split("\\?")[1];
+
+ String[] paramArr = params.split("&");
+ for (String paramEntry : paramArr) {
+ String key = paramEntry.split("=")[0];
+ String value = paramEntry.split("=")[1];
+
+ if ("group".equals(key)) {
+ node.setGroup(value);
+ } else if ("threads".equals(key)) {
+ node.setThreads(Integer.valueOf(value));
+ } else if ("identity".equals(key)) {
+ node.setIdentity(value);
+ } else if ("createTime".equals(key)) {
+ node.setCreateTime(Long.valueOf(value));
+ } else if ("isAvailable".equals(key)) {
+ node.setAvailable(Boolean.valueOf(value));
+ }
+ }
+ return node;
+ }
+
+ public static String getFullPath(String clusterName, Node node) {
+ StringBuilder path = new StringBuilder();
+
+ path.append(getRootPath(clusterName))
+ .append("/")
+ .append(node.getNodeType())
+ .append("/")
+ .append(node.getNodeType())
+ .append(":\\\\")
+ .append(node.getIp());
+
+ if (node.getPort() != null && node.getPort() != 0) {
+ path.append(":").append(node.getPort());
+ }
+
+ path.append("?")
+ .append("group=")
+ .append(node.getGroup());
+ if (node.getThreads() != 0) {
+ path.append("&threads=")
+ .append(node.getThreads());
+ }
+
+ path.append("&identity=")
+ .append(node.getIdentity())
+ .append("&createTime=")
+ .append(node.getCreateTime())
+ .append("&isAvailable=")
+ .append(node.isAvailable());
+
+ return path.toString();
+ }
+
+ public static void main(String[] args) {
+ Node node = new Node();
+ node.setGroup("group1");
+ node.setIdentity(StringUtils.generateUUID());
+ node.setThreads(222);
+ node.setNodeType(NodeType.JOB_TRACKER);
+ node.setCreateTime(new Date().getTime());
+ node.setPort(2313);
+ node.setIp(NetUtils.getLocalHost());
+ String fullPath = NodeRegistryUtils.getFullPath("lts", node);
+ System.out.println(fullPath);
+
+ node = NodeRegistryUtils.parse(fullPath);
+ node.setNodeType(NodeType.JOB_CLIENT);
+ fullPath = NodeRegistryUtils.getFullPath("lts", node);
+ System.out.println(fullPath);
+
+ node = NodeRegistryUtils.parse(fullPath);
+ System.out.println(node);
+ }
+}
diff --git a/job-core/src/main/java/com/lts/job/core/registry/NotifyEvent.java b/job-core/src/main/java/com/lts/job/core/registry/NotifyEvent.java
new file mode 100644
index 000000000..a13d52395
--- /dev/null
+++ b/job-core/src/main/java/com/lts/job/core/registry/NotifyEvent.java
@@ -0,0 +1,10 @@
+package com.lts.job.core.registry;
+
+/**
+ * @author Robert HG (254963746@qq.com) on 5/17/15.
+ */
+public enum NotifyEvent {
+
+ ADD,
+ REMOVE
+}
diff --git a/job-core/src/main/java/com/lts/job/core/registry/NotifyListener.java b/job-core/src/main/java/com/lts/job/core/registry/NotifyListener.java
new file mode 100644
index 000000000..9d4f4981a
--- /dev/null
+++ b/job-core/src/main/java/com/lts/job/core/registry/NotifyListener.java
@@ -0,0 +1,14 @@
+package com.lts.job.core.registry;
+
+import com.lts.job.core.cluster.Node;
+
+import java.util.List;
+
+/**
+ * @author Robert HG (254963746@qq.com) on 5/17/15.
+ */
+public interface NotifyListener {
+
+ void notify(NotifyEvent event, List nodes);
+
+}
diff --git a/job-core/src/main/java/com/lts/job/core/registry/PathParser.java b/job-core/src/main/java/com/lts/job/core/registry/PathParser.java
deleted file mode 100644
index 54225be4e..000000000
--- a/job-core/src/main/java/com/lts/job/core/registry/PathParser.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package com.lts.job.core.registry;
-
-import com.lts.job.core.cluster.Node;
-
-/**
- * @author Robert HG (254963746@qq.com) on 3/27/15.
- */
-public interface PathParser {
-
- public Node parse(String path);
-
- public String getPath(Node node);
-
-}
diff --git a/job-core/src/main/java/com/lts/job/core/registry/Registry.java b/job-core/src/main/java/com/lts/job/core/registry/Registry.java
index c8affd58f..fe2772e3c 100644
--- a/job-core/src/main/java/com/lts/job/core/registry/Registry.java
+++ b/job-core/src/main/java/com/lts/job/core/registry/Registry.java
@@ -1,11 +1,10 @@
package com.lts.job.core.registry;
import com.lts.job.core.cluster.Node;
-import com.lts.job.core.listener.NodeChangeListener;
/**
* @author Robert HG (254963746@qq.com) on 6/22/14.
- * 节点注册接口
+ * 节点注册接口
*/
public interface Registry {
@@ -24,11 +23,22 @@ public interface Registry {
void unregister(Node node);
/**
- * 添加节点变化监听器
+ * 监听节点
+ *
* @param listener
*/
- void addNodeChangeListener(NodeChangeListener listener);
+ void subscribe(Node node, NotifyListener listener);
- void destroy();
+ /**
+ * 取消监听节点
+ *
+ * @param node
+ * @param listener
+ */
+ void unsubscribe(Node node, NotifyListener listener);
+ /**
+ * 销毁
+ */
+ void destroy();
}
diff --git a/job-core/src/main/java/com/lts/job/core/registry/RegistryFactory.java b/job-core/src/main/java/com/lts/job/core/registry/RegistryFactory.java
new file mode 100644
index 000000000..2a9f4d180
--- /dev/null
+++ b/job-core/src/main/java/com/lts/job/core/registry/RegistryFactory.java
@@ -0,0 +1,32 @@
+package com.lts.job.core.registry;
+
+import com.lts.job.core.Application;
+import com.lts.job.core.registry.redis.RedisRegistry;
+import com.lts.job.core.registry.zookeeper.ZookeeperRegistry;
+import com.lts.job.core.util.StringUtils;
+
+/**
+ * Created by hugui on 5/17/15.
+ */
+public class RegistryFactory {
+
+ public static Registry getRegistry(Application application) {
+ String address = application.getConfig().getRegistryAddress();
+ if (StringUtils.isEmpty(address)) {
+ throw new IllegalArgumentException("address is null!");
+ }
+ if (address.startsWith("zookeeper://")) {
+ application.getConfig().setRegistryAddress(
+ address.replace("zookeeper://", "")
+ );
+ return new ZookeeperRegistry(application);
+ } else if (address.startsWith("redis://")) {
+ application.getConfig().setRegistryAddress(
+ address.replace("redis://", "")
+ );
+ return new RedisRegistry(application);
+ }
+ throw new IllegalArgumentException("illegal address protocol");
+ }
+
+}
diff --git a/job-core/src/main/java/com/lts/job/core/registry/ZkNodeRegistry.java b/job-core/src/main/java/com/lts/job/core/registry/ZkNodeRegistry.java
deleted file mode 100644
index 53de44f96..000000000
--- a/job-core/src/main/java/com/lts/job/core/registry/ZkNodeRegistry.java
+++ /dev/null
@@ -1,173 +0,0 @@
-package com.lts.job.core.registry;
-
-import com.lts.job.core.Application;
-import com.lts.job.core.cluster.Node;
-import com.lts.job.core.cluster.NodeType;
-import com.lts.job.core.listener.NodeChangeListener;
-import com.lts.job.core.util.CollectionUtils;
-import com.lts.job.registry.zookeeper.ChildListener;
-import com.lts.job.registry.zookeeper.StateListener;
-import com.lts.job.registry.zookeeper.ZookeeperClient;
-import com.lts.job.registry.zookeeper.zkclient.ZkClientZookeeperClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * @author Robert HG (254963746@qq.com) on 6/22/14.
- * 节点注册器,并监听自己关注的节点
- */
-public class ZkNodeRegistry implements Registry {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(ZkNodeRegistry.class);
- private ZookeeperClient zkClient;
- // 用来记录父节点下的子节点的变化
- private final ConcurrentHashMap > NODE_CHILDREN_MAP = new ConcurrentHashMap>();
- private ChildChangeListener listener;
- private List nodeChangeListeners;
- private Application application;
- private ZkPathParser zkPathParser;
-
- public ZkNodeRegistry(Application application) {
- this.listener = new ChildChangeListener();
- this.application = application;
- this.zkPathParser = new ZkPathParser(application);
- application.setPathParser(this.zkPathParser);
- }
-
- /**
- * 添加节点变化监听器
- * @param nodeChangeListener
- */
- public void addNodeChangeListener(NodeChangeListener nodeChangeListener) {
- if (this.nodeChangeListeners == null) {
- this.nodeChangeListeners = new ArrayList();
- }
- this.nodeChangeListeners.add(nodeChangeListener);
- }
-
- @Override
- public void register(final Node node) {
- zkClient = new ZkClientZookeeperClient(application.getConfig().getZookeeperAddress());
- zkClient.addStateListener(new StateListener() {
- @Override
- public void stateChanged(int state) {
- if (state == RECONNECTED) {
- try {
- doSubscribe(node);
- } catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
- }
- }
- }
- });
- doSubscribe(node);
- }
-
- protected void doSubscribe(Node node) {
-
- if (zkClient.exists(node.getPath())) {
- return;
- }
-
- zkClient.create(node.getPath(), true, false);
-
- List listenNodeTypes = node.getListenNodeTypes();
- if (CollectionUtils.isNotEmpty(listenNodeTypes)) {
-
- for (NodeType nodeType : listenNodeTypes) {
- String listenNodePath = zkPathParser.getPath(nodeType);
- // 为自己关注的 节点 添加监听
- zkClient.addChildListener(listenNodePath, listener);
-
- // 将自己关注的 节点类型加入到节点管理中去
- List children = zkClient.getChildren(listenNodePath);
- if (CollectionUtils.isNotEmpty(children)) {
- List listenedNodes = new ArrayList();
- for (String child : children) {
- Node listenedNode = zkPathParser.parse(listenNodePath + "/" + child);
- listenedNodes.add(listenedNode);
- application.getNodeManager().addNode(listenedNode);
- }
- if (CollectionUtils.isNotEmpty(nodeChangeListeners)) {
- for (NodeChangeListener nodeChangeListener : nodeChangeListeners) {
- nodeChangeListener.addNodes(listenedNodes);
- }
- }
- NODE_CHILDREN_MAP.put(listenNodePath, children);
- }
- }
- }
- }
-
- protected void doUnSubscribe(Node node) {
- zkClient.delete(node.getPath());
-
- List