目录
- 前言
- 架构
- 功能和特性
- 入门角色
- 写个例子
- 任务执行流程
- ScheduleJobBootstrap初始化
- ScheduleJobBootstrap执行
- 执行流程总结
- 分片的策略
前言
ElasticJob 是面向互联网生态和海量任务的分布式调度解决方案。 它通过弹性调度、资源管控、以及任务治理的功能,打造一个适用于互联网场景的分布式调度解决方案,并通过开放的架构设计,提供多元化的任务生态。 它的各个产品使用统一的任务 API,开发者仅需一次开发,即可随意部署。
架构
elasticjob由两个相互独立的子项目 ElasticJob-Lite 和 ElasticJob-Cloud 组成组成,这是ElasticJob-Lite 的架构图:
从架构图可以看到,左上角App1和App2两个业务模块中的Elastic-Job往zk中注册了信息,右边的Elastic-Job-Lite是监听了zk的,因此,整个任务的调度是由zk来完成的。下面的console通过Rest API去获取zk中的信息,得到调度数据和日志,并存盘。
这是ElasticJob-Cloud的架构图:
ElasticJob-Cloud的调度是依赖Mesos的,从架构图的理解,Mesos和zk结合做好任务调度,再分发给Mesos的代理并执行。
功能和特性
以下是ElasticJob的特性优点
- 支持任务在分布式场景下的分片和高可用
- 能够水平扩展任务的吞吐量和执行效率
- 任务处理能力随资源配备弹性伸缩
- 优化任务和资源调度
- 相同任务聚合至相同的执行器统一处理
- 动态调配追加资源至新分配的任务
- 失效转移
- 错过任务重新执行
- 分布式环境下任务自动诊断和修复
- 基于有向无环图 (DAG) 的任务依赖
- 基于有向无环图 (DAG) 的任务项目依赖
- 可扩展的任务类型统一接口
- 支持丰富的任务类型库--包括数据流、脚本、HTTP、文件、大数据
- 易于对接业务任务--兼容 Spring IOC
- 任务管控端
- 任务事件追踪
- 注册中心管理
入门角色
既然这么多优点,我们就入门试试吧。入门elasticjob-lite也继承了Quartz框架,同样的很简单,只要三个角色:
SimpleJob
:任务主体。如果用过Quartz,那么应该能够理解这个,基本上和Quartz的Job接口类似,只要实现一个execute方法就行了,入门用这个就行;
JobConfiguration
:任务配置。同样的可以理解为类似Quartz框架中的Trigger,最重要的就是配置任务的执行频率;
ScheduleJobBootstrap
:调度主体。这个一样,参考Quartz框架中的Scheduler对象,它把任务和配置结合起来,任务按照配置中的频率执行。
写个例子
我们创建这三种角色,首先创建任务主体:
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
/**
* (这个类的说明)
*
* @author mars酱
*/
public class MarsSimpleJob implements SimpleJob {
@Override
public void execute(final ShardingContext shardingContext) {
System.out.printf("Item: %s | Time: %s | Thread: %s | %s%n",
shardingContext.getShardingItem(),
new SimpleDateFormat("HH:mm:ss").format(new Date()),
Thread.currentThread().getId(),
"就是这么简单~");
}
}
再创建任务配置:
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.tracing.api.TracingConfiguration;
import javax.sql.DataSource;
import java.util.Objects;
/**
* (这个类的说明)
*
* @author mars酱
*/
public class JobConfigurationBuilder {
public static JobConfiguration buildJobConfiguration(String jobName, String cronExpression, TracingConfiguration<DataSource> tracingConfig) {
JobConfiguration.Builder builder = JobConfiguration.newBuilder(jobName, 3)
.cron(cronExpression)
.shardingItemParameters("0=a,1=b,2=c");
if (Objects.nonNull(tracingConfig)) {
builder.addExtraConfigurations(tracingConfig);
}
return builder.build();
}
}
最后创建调度器,并执行:
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.lite.example.job.simple.JavaSimpleJob;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
import org.apache.shardingsphere.elasticjob.tracing.api.TracingConfiguration;
import javax.sql.DataSource;
/**
* (这个类的说明)
*
* @author mars酱
*/
public final class SchedulerMain {
private static final int EMBED_ZOOKEEPER_PORT = 4181;
private static final String ZOOKEEPER_CONNECTION_STRING = "localhost:" + EMBED_ZOOKEEPER_PORT;
private static final String JOB_NAMESPACE = "elasticjob-marsz-lite-java";
// CHECKSTYLE:OFF
public static void main(final String[] args) {
// 内嵌zk服务
EmbedZookeeperServer.start(EMBED_ZOOKEEPER_PORT);
CoordinatorRegistryCenter regCenter = setUpRegistryCenter();
// 简单作业
setUpSimpleJob(regCenter, null);
}
private static CoordinatorRegistryCenter setUpRegistryCenter() {
ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(ZOOKEEPER_CONNECTION_STRING, JOB_NAMESPACE);
CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(zkConfig);
result.init();
return result;
}
private static void setUpSimpleJob(final CoordinatorRegistryCenter regCenter, final TracingConfiguration<DataSource> tracingConfig) {
new ScheduleJobBootstrap(regCenter,
new MarsSimpleJob(),
JobConfigurationBuilder.buildJobConfiguration("marsSimpleJob", "0/5 * * * * ?", tracingConfig)).schedule();
}
}
运行的效果:
截图中Item
是处理的分片项,Thread
是当前线程的id,看到了Quartz框架的影子...。
任务执行流程
既然能成功运行,我们看看内部的处理逻辑吧。Mars酱本机并没有安装zk,所以copy了官方的例子,在程序运行前先启用了一个内嵌的zk服务:
EmbedZookeeperServer.start(EMBED_ZOOKEEPER_PORT);
这个只能在模拟的时候使用,千万不能拿去放生产环境。接下来就是注册中心的配置了,我们需要的是CoordinatorRegistryCenter对象:
private static CoordinatorRegistryCenter setUpRegistryCenter() {
ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(ZOOKEEPER_CONNECTION_STRING, JOB_NAMESPACE);
CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(zkConfig);
result.init();
return result;
}
好了,zk的部分处理完成,下面就是直接SchedulerJobBootstrap的部分了。
ScheduleJobBootstrap初始化
ScheduleJobBootstrap的初始化在例子中需要三个参数:
CoordinatorRegistryCenter
:这个是协调用的注册中心。是一个接口类,它的实现在ElasticJob里面只有一个ZookeeperRegisterCenter对象,未来是不是会支持其他的注册中心呢?
ElasticJob
: Mars酱理解为任务对象。但是ElasticJob这个对象本身是个空接口,有两个子接口SimpleJob
和DataflowJob
,前者Mars酱的理解是和Quartz中的Job对象类似,只要实现execute函数就行,后者有需要实现两个接口,一个fetchData
获取数据,一个processData
处理数据。所以,ElasticJob这个接口留空,是为了还有其他扩展吧?
JobConfiguration
:弹性任务配置项。构建这个对象不能直接设置,只能用buider的方式构建。需要配置的属性很多,但是核心属性大致就是几个:任务名称、分片数、执行频率、分片参数。JobConfiguration的所有属性如下:
属性名 | 说明 |
String jobName | 任务名称 |
String cron | cron表达式 |
String timeZone | 任务运行的时区 |
int shardingTotalCount | 任务分片总数 |
String shardingItemParameters | 分片序号和参数,多个键值对之间用逗号分隔,从0开始,但是不能大于或等于任务分片的总数 |
String jobParameter | 任务自定义任务参数 |
boolean monitorExecution | 是否监听执行 |
boolean failover | 是否启用故障转移。开启表示如果任务在一次任务执行中途宕机,允许将该次未完成的任务在另一任务节点上补偿执行 |
boolean misfire | 不发火。哈哈,其实是是否开启错过任务重新执行 |
int maxTimeDiffSeconds | 最大时差 |
int reconcileIntervalMinutes | 间隔时长 |
String jobShardingStrategyType | 任务分片策略类型,总共三种 |
String jobExecutorServiceHandlerType | 任务执行程序服务处理程序类型 |
String jobErrorHandlerType | 任务错误处理类型 |
Collection jobListenerTypes | 任务监听类型 |
Collection extraConfigurations | 附加配置信息 |
String description | 任务描述 |
Properties props | 扩展用属性值 |
boolean disabled | 是否禁用 |
boolean overwrite | 是否覆盖 |
String label | 标签 |
boolean staticSharding | 是否支持静态分片 |
ScheduleJobBootstrap执行
同样的,例子中的MarsSimpleJob的execute函数,最终会被ElasticJob框架调用,我们按照被执行的反向顺序往上找。MarsSimpleJob是继承SimpleJob
的, 而SimpleJob
的execute函数是被SimpleJobExecutor
所调用:
/**
* Simple job executor.
*/
public final class SimpleJobExecutor implements ClassedJobItemExecutor<SimpleJob> {
@Override
public void process(final SimpleJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade, final ShardingContext shardingContext) {
// 这里调用execute函数
elasticJob.execute(shardingContext);
}
@Override
public Class<SimpleJob> getElasticJobClass() {
return SimpleJob.class;
}
}
再继续往上找,process的核心流程就是在ElasticJobExecutor
里面了,调用process的部分在ElasticJobExcutor
中几个重载的process方法调用的,两个process函数完成不同的功能,调用SimpleExecutor的process部分是这样:
@SuppressWarnings("unchecked")
private void process(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
jobFacade.postJobExecutionEvent(startEvent);
log.trace("Job '{}' executing, item is: '{}'.", jobConfig.getJobName(), item);
JobExecutionEvent completeEvent;
try {
// 这里调用SimpleJobExecutor的process
jobItemExecutor.process(elasticJob, jobConfig, jobFacade, shardingContexts.createShardingContext(item));
completeEvent = startEvent.executionSuccess();
log.trace("Job '{}' executed, item is: '{}'.", jobConfig.getJobName(), item);
jobFacade.postJobExecutionEvent(completeEvent);
// CHECKSTYLE:OFF
} catch (final Throwable cause) {
// CHECKSTYLE:ON
completeEvent = startEvent.executionFailure(ExceptionUtils.transform(cause));
jobFacade.postJobExecutionEvent(completeEvent);
itemErrorMessages.put(item, ExceptionUtils.transform(cause));
JobErrorHandler jobErrorHandler = executorContext.get(JobErrorHandler.class);
jobErrorHandler.handleException(jobConfig.getJobName(), cause);
}
}
上面这个process负责最终任务的执行部分,由JobItemExecutor对象调用,SimpleJobExecutor被JobItemExecutor接口定义。整个这个proces由guava包的EventBus处理消息事件,执行之前有startEvent,执行完成有completeEvent,异常也有对应的失败event,方面架构图中存盘事件日志、ELK日志收集动作。
调用这个process的部分,由另一个process完成,长这样的:
private void process(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
if (1 == items.size()) {
int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, item);
process(jobConfig, shardingContexts, item, jobExecutionEvent);
return;
}
CountDownLatch latch = new CountDownLatch(items.size());
for (int each : items) {
JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, each);
ExecutorService executorService = executorContext.get(ExecutorService.class);
if (executorService.isShutdown()) {
return;
}
// 提交给线程池执行
executorService.submit(() -> {
try {
process(jobConfig, shardingContexts, each, jobExecutionEvent);
} finally {
latch.countDown();
}
});
}
try {
latch.await();
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
上面这个process负责把分片参数依次组装好,设置好JobExecutionEvent中的ip、主机名等参数,然后放入线程池中去执行。再往上,看现在这个process被调用的部分:
private void execute(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
if (shardingContexts.getShardingItemParameters().isEmpty()) {
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobConfig.getJobName()));
return;
}
// 往注册中心注册ShardingContexts信息
jobFacade.registerJobBegin(shardingContexts);
String taskId = shardingContexts.getTaskId();
// 发送跟踪日志,标记任务正在运行
jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
try {
// 调用process
process(jobConfig, shardingContexts, executionSource);
} finally {
// TODO Consider increasing the status of job failure, and how to handle the overall loop of job failure
// 告知注册中心任务完成
jobFacade.registerJobCompleted(shardingContexts);
if (itemErrorMessages.isEmpty()) {
// 没有失败信息,通知任务完成
jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
} else {
// 否则通知失败
jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
itemErrorMessages.clear();
}
}
}
方法execute从注册中心注册ShardingContext信息,并发送跟踪日志事件,然后调用process,最后发送跟踪消息标记任务完成。再有一个重载的execute方法调用上面这个execute方法,如下:
public void execute() {
// job的配置信息
JobConfiguration jobConfig = jobFacade.loadJobConfiguration(true);
executorContext.reloadIfNecessary(jobConfig);
JobErrorHandler jobErrorHandler = executorContext.get(JobErrorHandler.class);
try {
jobFacade.checkJobExecutionEnvironment();
} catch (final JobExecutionEnvironmentException cause) {
jobErrorHandler.handleException(jobConfig.getJobName(), cause);
}
// 这里有玄机
ShardingContexts shardingContexts = jobFacade.getShardingContexts();
// 发送时间消息总线
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobConfig.getJobName()));
if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(),
State.TASK_FINISHED,
String.format(
"Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.",
jobConfig.getJobName(),
shardingContexts.getShardingItemParameters().keySet()));
return;
}
try {
// 任务执行的前置流程
jobFacade.beforeJobExecuted(shardingContexts);
//CHECKSTYLE:OFF
} catch (final Throwable cause) {
//CHECKSTYLE:ON
jobErrorHandler.handleException(jobConfig.getJobName(), cause);
}
// 调用上面的execute方法
execute(jobConfig, shardingContexts, ExecutionSource.NORMAL_TRIGGER);
while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
execute(jobConfig, shardingContexts, ExecutionSource.MISFIRE);
}
// 故障转移
jobFacade.failoverIfNecessary();
try {
// 任务执行的后置流程
jobFacade.afterJobExecuted(shardingContexts);
//CHECKSTYLE:OFF
} catch (final Throwable cause) {
//CHECKSTYLE:ON
jobErrorHandler.handleException(jobConfig.getJobName(), cause);
}
}
这个execute就由Quartz的JobRunShell调用了,Quartz的调用的过程在 Java | 一分钟掌握定时任务 | 6 - Quartz定时任务里面还好Mars酱分析过了。
执行流程总结
那么,追踪完源代码,大致的流程就应该是如下:
1.组装基本参数(任务、频率等) -> 2. ScheduleJobBootstrap初始化 -> 3.配置任务属性 -> 4.设置各种facade -> 5.初始化ElasticJobExecutor -> 6.调用scheduler执行任务 -> 7.获取任务执行器(SimpleJobExecutor) -> 8.各种校验逻辑 -> 9. 处理分片参数 -> 10. 设置任务为运行状态 -> 11. 提交任务到线程池 -> 12.执行任务 -> 13.处理任务后续逻辑
任务的调度过程由zk完成,取决于zk的任务调度策略吧?如果一台机器的定时运行时挂了,zk会转移到另一台运行中的机器中去。-- Mars酱
分片的策略
任务的分片策略,用于将任务在分布式环境下分解成为任务使用。
SPI 名称 | 详细说明 |
JobShardingStrategy | 作业分片策略接口 |
已知实现类 | 详细说明 |
AverageAllocationJobShardingStrategy | 根据分片项平均分片 |
OdevitySortByNameJobShardingStrategy | 根据任务名称哈希值的奇偶数决定按照任务服务器 IP 升序或是降序的方式分片 |
RoundRobinByNameJobShardingStrategy | 根据任务名称轮询分片 |
那么任务的分片策略在哪里使用的呢?就在代码中注释的“这里有玄机”那行。在getShardingContexts的方法中会调用ShardingService,它会去获取JobConfiguration
中配置的分片策略方式:
public void shardingIfNecessary() {
List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
if (!isNeedSharding() || availableJobInstances.isEmpty()) {
return;
}
if (!leaderService.isLeaderUntilBlock()) {
blockUntilShardingCompleted();
return;
}
waitingOtherShardingItemCompleted();
JobConfiguration jobConfig = configService.load(false);
int shardingTotalCount = jobConfig.getShardingTotalCount();
log.debug("Job '{}' sharding begin.", jobName);
jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
resetShardingInfo(shardingTotalCount);
// 获取任务分片策略
JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(jobConfig.getJobShardingStrategyType());
jobNodeStorage.executeInTransaction(getShardingResultTransactionOperations(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
log.debug("Job '{}' sharding complete.", jobName);
}
如果不设置,默认使用的是平均分片策略。