序
本文主要研究一下PowerJob的LightTaskTracker
TaskTracker
tech/powerjob/worker/core/tracker/task/TaskTracker.java
@Slf4j | |
public abstract class TaskTracker { | |
/** | |
* TaskTracker创建时间 | |
*/ | |
protected final long createTime; | |
/** | |
* 任务实例ID,使用频率过高,从 InstanceInfo 提取出来单独保存一份 | |
*/ | |
protected final long instanceId; | |
/** | |
* 任务实例信息 | |
*/ | |
protected final InstanceInfo instanceInfo; | |
/** | |
* 追加的工作流上下文数据 | |
* | |
* @since 2021/02/05 | |
*/ | |
protected final Map<String, String> appendedWfContext; | |
/** | |
* worker 运行时元数据 | |
*/ | |
protected final WorkerRuntime workerRuntime; | |
/** | |
* 是否结束 | |
*/ | |
protected final AtomicBoolean finished; | |
/** | |
* 连续上报多次失败后放弃上报,视为结果不可达,TaskTracker down | |
*/ | |
protected int reportFailedCnt = 0; | |
protected static final int MAX_REPORT_FAILED_THRESHOLD = 5; | |
protected TaskTracker(ServerScheduleJobReq req, WorkerRuntime workerRuntime) { | |
this.createTime = System.currentTimeMillis(); | |
this.workerRuntime = workerRuntime; | |
this.instanceId = req.getInstanceId(); | |
this.instanceInfo = new InstanceInfo(); | |
// PowerJob 值拷贝场景不多,引入三方值拷贝类库可能引入类冲突等问题,综合评估手写 ROI 最高 | |
instanceInfo.setJobId(req.getJobId()); | |
instanceInfo.setInstanceId(req.getInstanceId()); | |
instanceInfo.setWfInstanceId(req.getWfInstanceId()); | |
instanceInfo.setExecuteType(req.getExecuteType()); | |
instanceInfo.setProcessorType(req.getProcessorType()); | |
instanceInfo.setProcessorInfo(req.getProcessorInfo()); | |
instanceInfo.setJobParams(req.getJobParams()); | |
instanceInfo.setInstanceParams(req.getInstanceParams()); | |
instanceInfo.setThreadConcurrency(req.getThreadConcurrency()); | |
instanceInfo.setTaskRetryNum(req.getTaskRetryNum()); | |
instanceInfo.setLogConfig(req.getLogConfig()); | |
// 特殊处理超时时间 | |
if (instanceInfo.getInstanceTimeoutMS() <= 0) { | |
instanceInfo.setInstanceTimeoutMS(Integer.MAX_VALUE); | |
} | |
// 只有工作流中的任务允许向工作流中追加上下文数据 | |
this.appendedWfContext = req.getWfInstanceId() == null ? Collections.emptyMap() : Maps.newConcurrentMap(); | |
this.finished = new AtomicBoolean(false); | |
} | |
/** | |
* 销毁 | |
*/ | |
public abstract void destroy(); | |
/** | |
* 停止任务 | |
*/ | |
public abstract void stopTask(); | |
/** | |
* 查询任务实例的详细运行状态 | |
* | |
* @return 任务实例的详细运行状态 | |
*/ | |
public abstract InstanceDetail fetchRunningStatus(); | |
//...... | |
} |
TaskTracker是个抽象类,其构造器接收ServerScheduleJobReq、WorkerRuntime,然后根据ServerScheduleJobReq构建InstanceInfo;它定义了destroy、stopTask、fetchRunningStatus抽象方法
LightTaskTracker
tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java
@Slf4j | |
public class LightTaskTracker extends TaskTracker { | |
/** | |
* statusReportScheduledFuture | |
*/ | |
private final ScheduledFuture > statusReportScheduledFuture; | |
/** | |
* timeoutCheckScheduledFuture | |
*/ | |
private final ScheduledFuture > timeoutCheckScheduledFuture; | |
/** | |
* processFuture | |
*/ | |
private final Future<ProcessResult> processFuture; | |
/** | |
* 执行线程 | |
*/ | |
private final AtomicReference<Thread> executeThread; | |
/** | |
* 处理器信息 | |
*/ | |
private final ProcessorBean processorBean; | |
/** | |
* 上下文 | |
*/ | |
private final TaskContext taskContext; | |
/** | |
* 任务状态 | |
*/ | |
private TaskStatus status; | |
/** | |
* 任务开始执行的时间 | |
*/ | |
private Long taskStartTime; | |
/** | |
* 任务执行结束的时间 或者 任务被 kill 掉的时间 | |
*/ | |
private Long taskEndTime; | |
/** | |
* 任务处理结果 | |
*/ | |
private ProcessResult result; | |
private final AtomicBoolean timeoutFlag = new AtomicBoolean(false); | |
protected final AtomicBoolean stopFlag = new AtomicBoolean(false); | |
protected final AtomicBoolean destroyFlag = new AtomicBoolean(false); | |
public LightTaskTracker(ServerScheduleJobReq req, WorkerRuntime workerRuntime) { | |
super(req, workerRuntime); | |
try { | |
taskContext = constructTaskContext(req, workerRuntime); | |
// 等待处理 | |
status = TaskStatus.WORKER_RECEIVED; | |
// 加载 Processor | |
processorBean = workerRuntime.getProcessorLoader().load(new ProcessorDefinition().setProcessorType(req.getProcessorType()).setProcessorInfo(req.getProcessorInfo())); | |
executeThread = new AtomicReference<>(); | |
long delay = Integer.parseInt(System.getProperty(PowerJobDKey.WORKER_STATUS_CHECK_PERIOD, "15")) * 1000L; | |
// 初始延迟加入随机值,避免在高并发场景下所有请求集中在一个时间段 | |
long initDelay = RandomUtils.nextInt(5000, 10000); | |
// 上报任务状态 | |
statusReportScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleWithFixedDelay(this::checkAndReportStatus, initDelay, delay, TimeUnit.MILLISECONDS); | |
// 超时控制 | |
if (instanceInfo.getInstanceTimeoutMS() != Integer.MAX_VALUE) { | |
if (instanceInfo.getInstanceTimeoutMS() < 1000L) { | |
timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(this::timeoutCheck, instanceInfo.getInstanceTimeoutMS(), instanceInfo.getInstanceTimeoutMS() / 10, TimeUnit.MILLISECONDS); | |
} else { | |
// 执行时间超过 1 s 的任务,超时检测最小颗粒度为 1 s | |
timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(this::timeoutCheck, instanceInfo.getInstanceTimeoutMS(), 1000L, TimeUnit.MILLISECONDS); | |
} | |
} else { | |
timeoutCheckScheduledFuture = null; | |
} | |
// 提交任务到线程池 | |
processFuture = workerRuntime.getExecutorManager().getLightweightTaskExecutorService().submit(this::processTask); | |
} catch (Exception e) { | |
log.error("[TaskTracker-{}] fail to create TaskTracker for req:{} ", instanceId, req); | |
destroy(); | |
throw e; | |
} | |
} | |
//...... | |
} |
LightTaskTracker继承了TaskTracker,其构造器根据ServerScheduleJobReq创建ProcessorDefinition,然后使用workerRuntime.getProcessorLoader().load方法进行加载,之后通过workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleWithFixedDelay调度checkAndReportStatus;若设置了instanceTimeout则调度timeoutCheck;最后通过workerRuntime.getExecutorManager().getLightweightTaskExecutorService().submit来执行processTask
checkAndReportStatus
private synchronized void checkAndReportStatus() { | |
if (destroyFlag.get()) { | |
// 已经被销毁,不需要上报状态 | |
log.info("[TaskTracker-{}] has been destroyed,final status is {},needn't to report status!", instanceId, status); | |
return; | |
} | |
TaskTrackerReportInstanceStatusReq reportInstanceStatusReq = new TaskTrackerReportInstanceStatusReq(); | |
reportInstanceStatusReq.setAppId(workerRuntime.getAppId()); | |
reportInstanceStatusReq.setJobId(instanceInfo.getJobId()); | |
reportInstanceStatusReq.setInstanceId(instanceId); | |
reportInstanceStatusReq.setWfInstanceId(instanceInfo.getWfInstanceId()); | |
reportInstanceStatusReq.setTotalTaskNum(1); | |
reportInstanceStatusReq.setReportTime(System.currentTimeMillis()); | |
reportInstanceStatusReq.setStartTime(createTime); | |
reportInstanceStatusReq.setSourceAddress(workerRuntime.getWorkerAddress()); | |
reportInstanceStatusReq.setSucceedTaskNum(0); | |
reportInstanceStatusReq.setFailedTaskNum(0); | |
if (stopFlag.get()) { | |
if (finished.get()) { | |
// 已经被成功打断 | |
destroy(); | |
return; | |
} | |
final Thread workerThread = executeThread.get(); | |
if (!finished.get() && workerThread != null) { | |
// 未能成功打断任务,强制停止 | |
try { | |
if (tryForceStopThread(workerThread)) { | |
finished.set(true); | |
taskEndTime = System.currentTimeMillis(); | |
result = new ProcessResult(false, SystemInstanceResult.USER_STOP_INSTANCE_FORCE_STOP); | |
log.warn("[TaskTracker-{}] task need stop, force stop thread {} success!", instanceId, workerThread.getName()); | |
// 被终止的任务不需要上报状态 | |
destroy(); | |
return; | |
} | |
} catch (Exception e) { | |
log.warn("[TaskTracker-{}] task need stop,fail to stop thread {}", instanceId, workerThread.getName(), e); | |
} | |
} | |
} | |
if (finished.get()) { | |
if (result.isSuccess()) { | |
reportInstanceStatusReq.setSucceedTaskNum(1); | |
reportInstanceStatusReq.setInstanceStatus(InstanceStatus.SUCCEED.getV()); | |
} else { | |
reportInstanceStatusReq.setFailedTaskNum(1); | |
reportInstanceStatusReq.setInstanceStatus(InstanceStatus.FAILED.getV()); | |
} | |
// 处理工作流上下文 | |
if (taskContext.getWorkflowContext().getWfInstanceId() != null) { | |
reportInstanceStatusReq.setAppendedWfContext(taskContext.getWorkflowContext().getAppendedContextData()); | |
} | |
reportInstanceStatusReq.setResult(suit(result.getMsg())); | |
reportInstanceStatusReq.setEndTime(taskEndTime); | |
// 微操一下,上报最终状态时重新设置下时间,并且增加一小段偏移,保证在并发上报运行中状态以及最终状态时,最终状态的上报时间晚于运行中的状态 | |
reportInstanceStatusReq.setReportTime(System.currentTimeMillis() + 1); | |
reportFinalStatusThenDestroy(workerRuntime, reportInstanceStatusReq); | |
return; | |
} | |
// 未完成的任务,只需要上报状态 | |
reportInstanceStatusReq.setInstanceStatus(InstanceStatus.RUNNING.getV()); | |
log.info("[TaskTracker-{}] report status({}) success,real status is {}", instanceId, reportInstanceStatusReq, status); | |
TransportUtils.ttReportInstanceStatus(reportInstanceStatusReq, workerRuntime.getServerDiscoveryService().getCurrentServerAddress(), workerRuntime.getTransporter()); | |
} |
checkAndReportStatus方法构建TaskTrackerReportInstanceStatusReq,然后根据stopFlag和finished进行对应处理,针对未完成的任务执行TransportUtils.ttReportInstanceStatus进行上报
timeoutCheck
private void timeoutCheck() { | |
if (taskStartTime == null || System.currentTimeMillis() - taskStartTime < instanceInfo.getInstanceTimeoutMS()) { | |
return; | |
} | |
if (finished.get() && result != null) { | |
timeoutCheckScheduledFuture.cancel(true); | |
return; | |
} | |
// 首次判断超时 | |
if (timeoutFlag.compareAndSet(false, true)) { | |
// 超时,仅尝试打断任务 | |
log.warn("[TaskTracker-{}] task timeout,taskStarTime:{},currentTime:{},runningTimeLimit:{}, try to interrupt it.", instanceId, taskStartTime, System.currentTimeMillis(), instanceInfo.getInstanceTimeoutMS()); | |
processFuture.cancel(true); | |
return; | |
} | |
if (finished.get()) { | |
// 已经成功被打断 | |
log.warn("[TaskTracker-{}] task timeout,taskStarTime:{},endTime:{}, interrupt success.", instanceId, taskStartTime, taskEndTime); | |
return; | |
} | |
Thread workerThread = executeThread.get(); | |
if (workerThread == null) { | |
return; | |
} | |
// 未能成功打断任务,强制终止 | |
try { | |
if (tryForceStopThread(workerThread)) { | |
finished.set(true); | |
taskEndTime = System.currentTimeMillis(); | |
result = new ProcessResult(false, SystemInstanceResult.INSTANCE_EXECUTE_TIMEOUT_FORCE_STOP); | |
log.warn("[TaskTracker-{}] task timeout, force stop thread {} success!", instanceId, workerThread.getName()); | |
} | |
} catch (Exception e) { | |
log.warn("[TaskTracker-{}] task timeout,fail to stop thread {}", instanceId, workerThread.getName(), e); | |
} | |
} | |
private boolean tryForceStopThread(Thread thread) { | |
String threadName = thread.getName(); | |
String allowStopThread = System.getProperty(PowerJobDKey.WORKER_ALLOWED_FORCE_STOP_THREAD); | |
if (!StringUtils.equalsIgnoreCase(allowStopThread, Boolean.TRUE.toString())) { | |
log.warn("[TaskTracker-{}] PowerJob not allowed to force stop a thread by config", instanceId); | |
return false; | |
} | |
log.warn("[TaskTracker-{}] fail to interrupt the thread[{}], try to force stop.", instanceId, threadName); | |
try { | |
thread.stop(); | |
return true; | |
} catch (Throwable t) { | |
log.warn("[TaskTracker-{}] stop thread[{}] failed, msg: {}", instanceId, threadName, t.getMessage()); | |
} | |
return false; | |
} |
timeoutCheck先判断是否超时,未超时则直接返回;若任务已finished且result不为空,则取消超时检查的定时任务(timeoutCheckScheduledFuture);首次判定超时时会更新timeoutFlag,并通过processFuture.cancel(true)尝试打断任务;后续检查时若任务仍未结束,则通过tryForceStopThread强制终止,这里用了thread.stop这个废弃方法
processTask
private ProcessResult processTask() { | |
executeThread.set(Thread.currentThread()); | |
// 设置任务开始执行的时间 | |
taskStartTime = System.currentTimeMillis(); | |
status = TaskStatus.WORKER_PROCESSING; | |
// 开始执行时,提交任务判断是否超时 | |
ProcessResult res = null; | |
do { | |
Thread.currentThread().setContextClassLoader(processorBean.getClassLoader()); | |
if (res != null && !res.isSuccess()) { | |
// 重试 | |
taskContext.setCurrentRetryTimes(taskContext.getCurrentRetryTimes() + 1); | |
log.warn("[TaskTracker-{}] process failed, TaskTracker will have a retry,current retryTimes : {}", instanceId, taskContext.getCurrentRetryTimes()); | |
} | |
try { | |
res = processorBean.getProcessor().process(taskContext); | |
} catch (InterruptedException e) { | |
log.warn("[TaskTracker-{}] task has been interrupted !", instanceId, e); | |
Thread.currentThread().interrupt(); | |
if (timeoutFlag.get()) { | |
res = new ProcessResult(false, SystemInstanceResult.INSTANCE_EXECUTE_TIMEOUT_INTERRUPTED); | |
} else if (stopFlag.get()) { | |
res = new ProcessResult(false, SystemInstanceResult.USER_STOP_INSTANCE_INTERRUPTED); | |
} else { | |
res = new ProcessResult(false, e.toString()); | |
} | |
} catch (Exception e) { | |
log.warn("[TaskTracker-{}] process failed !", instanceId, e); | |
res = new ProcessResult(false, e.toString()); | |
} | |
if (res == null) { | |
log.warn("[TaskTracker-{}] processor return null !", instanceId); | |
res = new ProcessResult(false, "Processor return null"); | |
} | |
} while (!res.isSuccess() && taskContext.getCurrentRetryTimes() < taskContext.getMaxRetryTimes() && !timeoutFlag.get() && !stopFlag.get()); | |
executeThread.set(null); | |
taskEndTime = System.currentTimeMillis(); | |
finished.set(true); | |
result = res; | |
status = result.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED; | |
// 取消超时检查任务 | |
if (timeoutCheckScheduledFuture != null) { | |
timeoutCheckScheduledFuture.cancel(true); | |
} | |
log.info("[TaskTracker-{}] task complete ! create time:{},queue time:{},use time:{},result:{}", instanceId, createTime, taskStartTime - createTime, System.currentTimeMillis() - taskStartTime, result); | |
// 执行完成后立即上报一次 | |
checkAndReportStatus(); | |
return result; | |
} |
processTask通过一个do-while循环来执行,循环继续的条件是处理结果非成功、重试次数小于最大重试次数、任务未超时且stopFlag为false;循环内部执行的是processorBean.getProcessor().process(taskContext),它会捕获InterruptedException及Exception;循环外则更新任务结束时间,取消timeoutCheckScheduledFuture,最后执行checkAndReportStatus进行上报
LightTaskTracker.create
/** | |
* 静态方法创建 TaskTracker | |
* | |
* @param req 服务端调度任务请求 | |
* @return LightTaskTracker | |
*/ | |
public static LightTaskTracker create(ServerScheduleJobReq req, WorkerRuntime workerRuntime) { | |
try { | |
return new LightTaskTracker(req, workerRuntime); | |
} catch (Exception e) { | |
reportCreateErrorToServer(req, workerRuntime, e); | |
} | |
return null; | |
} |
LightTaskTracker提供了静态方法create用于创建LightTaskTracker
TaskTrackerActor
tech/powerjob/worker/actors/TaskTrackerActor.java
public class TaskTrackerActor { | |
private final WorkerRuntime workerRuntime; | |
public TaskTrackerActor(WorkerRuntime workerRuntime) { | |
this.workerRuntime = workerRuntime; | |
} | |
/** | |
* 服务器任务调度处理器 | |
*/ | |
public void onReceiveServerScheduleJobReq(ServerScheduleJobReq req) { | |
log.debug("[TaskTrackerActor] server schedule job by request: {}.", req); | |
Long instanceId = req.getInstanceId(); | |
// 区分轻量级任务模型以及重量级任务模型 | |
if (isLightweightTask(req)) { | |
final LightTaskTracker taskTracker = LightTaskTrackerManager.getTaskTracker(instanceId); | |
if (taskTracker != null) { | |
log.warn("[TaskTrackerActor] LightTaskTracker({}) for instance(id={}) already exists.", taskTracker, instanceId); | |
return; | |
} | |
// 判断是否已经 overload | |
if (LightTaskTrackerManager.currentTaskTrackerSize() >= workerRuntime.getWorkerConfig().getMaxLightweightTaskNum() * LightTaskTrackerManager.OVERLOAD_FACTOR) { | |
// ignore this request | |
log.warn("[TaskTrackerActor] this worker is overload,ignore this request(instanceId={}),current size = {}!",instanceId,LightTaskTrackerManager.currentTaskTrackerSize()); | |
return; | |
} | |
if (LightTaskTrackerManager.currentTaskTrackerSize() >= workerRuntime.getWorkerConfig().getMaxLightweightTaskNum()) { | |
log.warn("[TaskTrackerActor] this worker will be overload soon,current size = {}!",LightTaskTrackerManager.currentTaskTrackerSize()); | |
} | |
// 创建轻量级任务 | |
LightTaskTrackerManager.atomicCreateTaskTracker(instanceId, ignore -> LightTaskTracker.create(req, workerRuntime)); | |
} else { | |
HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.getTaskTracker(instanceId); | |
if (taskTracker != null) { | |
log.warn("[TaskTrackerActor] HeavyTaskTracker({}) for instance(id={}) already exists.", taskTracker, instanceId); | |
return; | |
} | |
// 判断是否已经 overload | |
if (HeavyTaskTrackerManager.currentTaskTrackerSize() >= workerRuntime.getWorkerConfig().getMaxHeavyweightTaskNum()) { | |
// ignore this request | |
log.warn("[TaskTrackerActor] this worker is overload,ignore this request(instanceId={})! current size = {},", instanceId, HeavyTaskTrackerManager.currentTaskTrackerSize()); | |
return; | |
} | |
// 原子创建,防止多实例的存在 | |
HeavyTaskTrackerManager.atomicCreateTaskTracker(instanceId, ignore -> HeavyTaskTracker.create(req, workerRuntime)); | |
} | |
} | |
//...... | |
} |
TaskTrackerActor的path为taskTracker,它用于处理server的jobInstance请求和worker的task请求;其onReceiveServerScheduleJobReq方法的path为runJob,它接收ServerScheduleJobReq,然后通过isLightweightTask判断任务模型,是轻量级任务的话,则通过LightTaskTrackerManager.getTaskTracker(instanceId)获取taskTracker,接着判断当前实例的LightTaskTracker数量是否过多,过多则直接返回;最后通过LightTaskTrackerManager.atomicCreateTaskTracker来维护instanceId与LightTaskTracker的关系,若不存在则通过LightTaskTracker.create(req, workerRuntime)创建LightTaskTracker
isLightweightTask
private boolean isLightweightTask(ServerScheduleJobReq serverScheduleJobReq) { | |
final ExecuteType executeType = ExecuteType.valueOf(serverScheduleJobReq.getExecuteType()); | |
// 非单机执行的一定不是 | |
if (executeType != ExecuteType.STANDALONE){ | |
return false; | |
} | |
TimeExpressionType timeExpressionType = TimeExpressionType.valueOf(serverScheduleJobReq.getTimeExpressionType()); | |
// 固定频率以及固定延迟的也一定不是 | |
return timeExpressionType != TimeExpressionType.FIXED_DELAY && timeExpressionType != TimeExpressionType.FIXED_RATE; | |
} |
isLightweightTask的判断逻辑是如果executeType不是单机类型则不是轻量级任务,接着判断serverScheduleJobReq的timeExpressionType,类型不是FIXED_DELAY也不是FIXED_RATE的才是轻量级任务
小结
LightTaskTracker继承了TaskTracker,其构造器根据ServerScheduleJobReq创建ProcessorDefinition,然后使用workerRuntime.getProcessorLoader().load方法进行加载,之后调度checkAndReportStatus、timeoutCheck;最后把processTask提交到线程池执行(没有把任务处理放到start方法,这些都在构造器里执行了);TaskTrackerActor的path为taskTracker,它用于处理server的jobInstance请求和worker的task请求;其onReceiveServerScheduleJobReq方法的path为runJob,它接收ServerScheduleJobReq,用于根据请求的instanceId来创建和执行LightTaskTracker(通过ConcurrentHashMap来维护instanceId与LightTaskTracker的关系,避免重复执行)。