`
jandyfish
  • 浏览: 15641 次
社区版块
存档分类
最新评论

spring的cron定时任务解析

阅读更多
在用spring定时任务时,配置如下:
<task:executor id="executor" pool-size="5" />
	<task:scheduler id="scheduler" pool-size="10" />
	<task:annotation-driven executor="executor" scheduler="scheduler" />


配置项schema为spring-task-3.0.xsd。
对某方法定义cron如下:
@Scheduled(cron = "0 */5 * * * ?")
	pulic void run(){
		//other codes…
}

因run方法是每5分钟跑一次,而run方法中具有复杂的业务逻辑(大概包括调用第三方httpservice,导数据到内存计算,再分次批量写库)。Task可能会运行超过5分钟,此种情况下,预期目标是和linux cron一样,定点执行task,而不用理会前一个task是否已完成。

运行一段时间后,发现少了几个点的数据,然后针对数据查看了相应的日志,发现每次都只会有一个shedule线程跑同一个run方法,而这个run方法处于wait状态或5分钟内没跑完,会导致后续的task不能按时启动。

查看spring源码,shedule流程初始化如下:
1.在bean初始化后,通过反射机制,对实例化的bean的class进行注解扫描,判断bean的class定义中@Scheduled注解。若有则判断调度模式,如本题中为cron调度模式。此实现代码位于:
ScheduledAnnotationBeanPostProcessor.postProcessAfterInitialization
在扫描到@Scheduled的方法后,把每一个目标method封装为Runnable,代码如下:
MethodInvokingRunnable runnable = new MethodInvokingRunnable();
					runnable.setTargetObject(bean);
					runnable.setTargetMethod(method.getName());
					runnable.setArguments(new Object[0]);
					try {
						runnable.prepare();
					}
					catch (Exception ex) {
						throw new IllegalStateException("failed to prepare task", ex);
					}


此postProcessAfterInitialization是维护crontasks,fixedRateTasks容器,针对每一个目标method生成相应的MethodInvokingRunnable对实例。

2.针对cron任务,对每一个MethodInvokingRunnable注册相应的CronTrigger,并通过ConcurrentTaskSchedule.shedule创建ReschedulingRunnable执行器并解析cron表达式添加第1个task。
代码实现在ScheduledTaskRegistrar. afterPropertiesSet。

3. ReschedulingRunnable自身 串联schdule。如下代码所示:
public ScheduledFuture schedule() {
		synchronized (this.triggerContextMonitor) {
			this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
			if (this.scheduledExecutionTime == null) {
				return null;
			}
			long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();
			this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
			return this;
		}
	}

	@Override
	public void run() {
		Date actualExecutionTime = new Date();
		super.run();
		Date completionTime = new Date();
		synchronized (this.triggerContextMonitor) {
			this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
		}
		if (!this.currentFuture.isCancelled()) {
			schedule();
		}
	}


其中trigger.nextExecutionTime即获取下一个task的执行时间,并提交到executor执行器。

而见run中,是执行了当前的task后再根据corn表达式获取下一个时间点。如此若当前task超过5分钟则下一个5分钟时间点的任务不会被执行。而想要达到linux cron形式的task调度,则需要在此基础之上进行简单处理。
处理办法:
额外创建一个线程池executor如名称为A,在cron方法中提交runnable给这个A执行。
这样的情况可以满足和linux cron一样的定时需求,但得对task有独立性要求。若前后有存在依赖,或者可能导致数据一致性问题还是得慎重。

ps:依据spring中根据bean来解析相应class的schedule注解,如果相应class的实例为多个,就会导致同时有多个相同的task执行。
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics