定时任务重启后执行策略_quartz定时任务框架调度机制解析

定时任务重启后执行策略_quartz定时任务框架调度机制解析

2023年7月13日发(作者:)

定时任务重启后执⾏策略_quartz定时任务框架调度机制解析quartz2.2.1集群调度机制调研及源码分析引⾔quartz集群架构调度器实例化调度过程触发器的获取触发trigger:Job执⾏过程:总结:附:引⾔1.单独启动⼀个Job Server来跑job,不部署在web容器中.其他web节点当需要启动异步任务的时候,可以通过种种⽅式(DB, JMS, WebService, etc)通知Job Server,⽽Job Server收到这个通知之后,把异步任务加载到⾃⼰的任务队列中去。2.独⽴出⼀个job server,这个server上跑⼀个spring+quartz的应⽤,这个应⽤专门⽤来启动任务。在jobserver上加上hessain,得到业务接⼝,这样jobserver就可以调⽤web container中的业务操作,也就是正真执⾏任务的还是在cluster中的tomcat。在jobserver启动定时任务之后,轮流调⽤各地址上的业务操作(类似apache分发tomcat⼀样),这样可以让不同的定时任务在不同的节点上运⾏,减低了⼀台某个node的压⼒本⾝事实上也是⽀持集群的。在这种⽅案下,cluster上的每⼀个node都在跑quartz,然后也是通过数据中记录的状态来判断这个操作是否正在执⾏,这就要求cluster上所有的node的时间应该是⼀样的。⽽且每⼀个node都跑应⽤就意味着每⼀个node都需要有⾃⼰的线程池来跑quartz.总的来说,第⼀种⽅法,在单独的server上执⾏任务,对任务的适⽤范围有很⼤的限制,要访问在web环境中的各种资源⾮常⿇烦.但是集中式的管理容易从架构上规避了分布式环境的种种同步问题.第⼆种⽅法在在第⼀种⽅法的基础上减轻了jobserver的重量,只发送调⽤请求,不直接执⾏任务,这样解决了独⽴server⽆法访问web环境的问题,⽽且可以做到节点的轮询.可以有效地均衡负载.第三种⽅案是quartz⾃⾝⽀持的集群⽅案,在架构上完全是分布式的,没有集中的管理,quratz通过数据库锁以及标识字段保证多个节点对任务不重复获取,并且有负载平衡机制和容错机制,⽤少量的冗余,换取了⾼可⽤性(high avilable HA)和⾼可靠性.(个⼈认为和git的机制有异曲同⼯之处,分布式的冗余设计,换取可靠性和速度).本⽂旨在研究quratz为解决分布式任务调度中存在的防⽌重复执⾏和负载均衡等问题⽽建⽴的机制.以调度流程作为顺序,配合源码理解其中原理.quratz的配置,及具体应⽤请参考CRM项⽬组的另⼀篇⽂章:CRM使⽤Quartz集群总结分享quartz集群架构quartz的分布式架构如上图,可以看到数据库是各节点上调度器的枢纽.各个节点并不感知其他节点的存在,只是通过数据库来进⾏间接的沟通.实际上,quartz的分布式策略就是⼀种以数据库作为边界资源的并发策略.每个节点都遵守相同的操作规范,使得对数据库的操作可以串⾏执⾏.⽽不同名称的调度器⼜可以互不影响的并⾏运⾏.组件间的通讯图如下:(*注:主要的sql语句附在⽂章最后)quartz运⾏时由QuartzSchedulerThread类作为主体,循环执⾏调度流程。JobStore作为中间层,按照quartz的并发策略执⾏数据库操作,完成主要的调度逻辑。JobRunShellFactory负责实例化JobDetail对象,将其放⼊线程池运⾏。LockHandler负责获取LOCKS表中的数据库锁。整个quartz对任务调度的时序⼤致如下:梳理⼀下其中的流程,可以表⽰为:0.调度器线程run()1.获取待触发trigger1.1数据库LOCKS表TRIGGER_ACCESS⾏加锁1.2读取JobDetail信息1.3读取trigger表中触发器信息并标记为"已获取"1.4commit事务,释放锁2.触发trigger2.1数据库LOCKS表STATE_ACCESS⾏加锁2.2确认trigger的状态2.3读取trigger的JobDetail信息2.4读取trigger的Calendar信息2.3更新trigger信息2.3commit事务,释放锁3实例化并执⾏Job3.1从线程池获取线程执⾏JobRunShell的run⽅法可以看到,这个过程中有两个相似的过程:同样是对数据表的更新操作,同样是在执⾏操作前获取锁 操作完成后释放锁.这⼀规则可以看做是quartz解决集群问题的核⼼思想.规则流程图:进⼀步解释这条规则就是:⼀个调度器实例在执⾏涉及到分布式问题的数据库操作前,⾸先要获取QUARTZ2_LOCKS表中对应当前调度器的⾏级锁,获取锁后即可执⾏其他表中的数据库操作,随着操作事务的提交,⾏级锁被释放,供其他调度器实例获取.集群中的每⼀个调度器实例都遵循这样⼀种严格的操作规程,那么对于同⼀类调度器来说,每个实例对数据库的操作只能是串⾏的.⽽不同名的调度器之间却可以并⾏执⾏.下⾯我们深⼊源码,从微观上观察quartz集群调度的细节调度器实例化⼀个最简单的quartz helloworld应⽤如下:public class HelloWorldMain {Log log = ();public void run() {try {//取得Schedule对象SchedulerFactory sf = new StdSchedulerFactory();Scheduler sch = eduler();JobDetail jd = new JobDetail("HelloWorldJobDetail",T_GROUP,);Trigger tg = nutelyTrigger(1);e("HelloWorldTrigger");leJob(jd, tg);();} catch ( Exception e ) {tackTrace();}}public static void main(String[] args) {HelloWorldMain hw = new HelloWorldMain();();}}我们看到初始化⼀个调度器需要⽤⼯⼚类获取实例:然后启动:下⾯跟进StdSchedulerFactory的getScheduler()⽅法:public Scheduler getScheduler() throws SchedulerException {if (cfg == null) {initialize();}SchedulerRepository schedRep = tance();//从"调度器仓库"中根据properties的SchedulerName配置获取⼀个调度器实例Scheduler sched = (getSchedulerName());if (sched != null) {if (down()) {(getSchedulerName());} else {return sched;}}//初始化调度器sched = instantiate();return sched;}跟进初始化调度器⽅法sched = instantiate();发现是⼀个700多⾏的初始化⽅法,涉及到读取配置资源,⽣成QuartzScheduler对象,创建该对象的运⾏线程,并启动线程;初始化JobStore,QuartzScheduler,DBConnectionManager等重要组件,⾄此,调度器的初始化⼯作已完成,初始化⼯作中quratz读取了数据库中存放的对应当前调度器的锁信息,对应CRM中的表QRTZ2_LOCKS,中的STATE_ACCESS,TRIGGER_ACCESS两个LOCK_ void initialize(ClassLoadHelper loadHelper,SchedulerSignaler signaler) throws SchedulerConfigException {if (dsName == null) {throw new SchedulerConfigException("DataSource name not set.");}classLoadHelper = loadHelper;if(isThreadsInheritInitializersClassLoadContext()) {("JDBCJobStore threads will inherit ContextClassLoader of thread: " + tThread().getName());initializersLoader = tThread().getContextClassLoader();}ignaler = signaler;// If the user hasn't specified an explicit lock handler, then// choose one based on CMT/Clustered/ (getLockHandler() == null) {// If the user hasn't specified an explicit lock handler,// then we *must* use DB locks with clusteringif (isClustered()) {setUseDBLocks(true);}if (getUseDBLocks()) {if(getDriverDelegateClass() != null && getDriverDelegateClass().equals(e())) {if(getSelectWithLockSQL() == null) {//读取数据库LOCKS表中对应当前调度器的锁信息String msSqlDflt = "SELECT * FROM {0}LOCKS WITH (UPDLOCK,ROWLOCK) WHERE " + COL_SCHEDULER_NAME + " = {1}AND LOCK_NAME = ?";getLog().info("Detected usage of MSSQLDelegate class - defaulting 'selectWithLockSQL' to '" + msSqlDflt + "'.");setSelectWithLockSQL(msSqlDflt);}}getLog().info("Using db table-based data access locking (synchronization).");setLockHandler(new StdRowLockSemaphore(getTablePrefix(), getInstanceName(), getSelectWithLockSQL()));} else {getLog().info("Using thread monitor-based data access locking (synchronization).");setLockHandler(new SimpleSemaphore());}}}当调⽤();⽅法时,scheduler做了如下⼯作:1.通知listener开始启动2.启动调度器线程3.启动plugin4.通知listener启动完成public void start() throws SchedulerException {if (shuttingDown|| closed) {throw new SchedulerException("The Scheduler cannot be restarted after shutdown() has been called.");}// QTZ-212 : calling new schedulerStarting() method on the listeners// right after entering start()//通知该调度器的listener启动开始notifySchedulerListenersStarting();if (initialStart == null) {initialStart = new Date();//启动调度器的线程Store().schedulerStarted();//启动pluginsstartPlugins();} else {Store().schedulerResumed();}Pause(false);getLog().info("Scheduler " + queIdentifier() + " started.");//通知该调度器的listener启动完成notifySchedulerListenersStarted();}调度过程调度器启动后,调度器的线程就处于运⾏状态了,开始执⾏quartz的主要⼯作–调度任务.前⾯已介绍过,任务的调度过程⼤致分为三步:1.获取待触发trigger2.触发trigger3.实例化并执⾏Job下⾯分别分析三个阶段的源码.QuartzSchedulerThread是调度器线程类,调度过程的三个步骤就承载在run()⽅法中,分析见代码注释:按 Ctrl+C 复制代码按 Ctrl+C 复制代码调度器每次获取到的trigger是30s内需要执⾏的,所以要等待⼀段时间⾄trigger执⾏前2ms.在等待过程中涉及到⼀个新加进来更紧急的trigger的处理逻辑.分析写在注释中,不再赘述.可以看到调度器的只要在运⾏状态,就会不停地执⾏调度流程.值得注意的是,在流程的最后线程会等待⼀个随机的时间.这就是quartz⾃带的负载平衡机制.以下是三个步骤的跟进:触发器的获取调度器调⽤:在数据库中查找⼀定时间范围内将会被触发的trigger.参数的意义如下:参数1:nolaterthan = now+3000ms,即未来30s内将会被触发.参数2 最⼤获取数量,⼤⼩取线程池线程剩余量与定义值得较⼩者.参数3 时间窗⼝ 默认为0,程序会在nolaterthan后加上窗⼝⼤⼩来选择会在每次触发trigger后计算出trigger下次要执⾏的时间,并在数据库QRTZ2_TRIGGERS中的NEXT_FIRE_TIME字段中记录.查找时将当前毫秒数与该字段⽐较,就能找出下⼀段时间内将会触发的触发器.查找时,调⽤在JobStoreSupport类中的⽅法:public List acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)throws JobPersistenceException {String lockName;if(isAcquireTriggersWithinLock() || maxCount > 1) {lockName = LOCK_TRIGGER_ACCESS;} else {lockName = null;}return executeInNonManagedTXLock(lockName,new TransactionCallback>() {public List execute(Connection conn) throws JobPersistenceException {return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);}},new TransactionValidator>() {public Boolean validate(Connection conn, List result) throws JobPersistenceException {//...异常处理回调⽅法}});}该⽅法关键的⼀点在于执⾏了executeInNonManagedTXLock()⽅法,这⼀⽅法指定了⼀个锁名,两个回调函数.在开始执⾏时获得锁,在⽅法执⾏完毕后随着事务的提交锁被释放.在该⽅法的底层,使⽤ for update语句,在数据库中加⼊⾏级锁,保证了在该⽅法执⾏过程中,其他的调度器对trigger进⾏获取时将会等待该调度器释放该锁.此⽅法是前⾯介绍的quartz集群策略的的具体实现,这⼀模板⽅法在后⾯的trigger触发过程还会被使⽤.进⼀步解释:quratz在获取数据库资源之前,先要以for update⽅式访问LOCKS表中相应LOCK_NAME数据将改⾏锁定.如果在此前该⾏已经被锁定,那么等待,如果没有被锁定,那么读取满⾜要求的trigger,并把它们的status置为STATE_ACQUIRED,如果有tirgger已被置为STATE_ACQUIRED,那么说明该trigger已被别的调度器实例认领,⽆需再次认领,调度器会忽略此trigger.调度器实例之间的间接通信就体现在这⾥.eNextTrigger()⽅法中:int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED,STATE_WAITING);最后释放锁,这时如果下⼀个调度器在排队获取trigger的话,则仍会执⾏相同的步骤.这种机制保证了trigger不会被重复获取.按照这种算法正常运⾏状态下调度器每次读取的trigger中会有相当⼀部分已被标记为被获取.获取trigger的过程进⾏完毕.触发trigger:QuartzSchedulerThread line336:List res = Store().triggersFired(triggers);调⽤JobStoreSupport类的triggersFired()⽅法:public List triggersFired(final List triggers) throws JobPersistenceException {return executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,new TransactionCallback>() {public List execute(Connection conn) throws JobPersistenceException {List results = new ArrayList();TriggerFiredResult result;for (OperableTrigger trigger : triggers) {try {TriggerFiredBundle bundle = triggerFired(conn, trigger);result = new TriggerFiredResult(bundle);} catch (JobPersistenceException jpe) {result = new TriggerFiredResult(jpe);} catch(RuntimeException re) {result = new TriggerFiredResult(re);}(result);}return results;}},new TransactionValidator>() {@Overridepublic Boolean validate(Connection conn, List result) throws JobPersistenceException {//...异常处理回调⽅法}});}此处再次⽤到了quratz的⾏为规范:executeInNonManagedTXLock()⽅法,在获取锁的情况下对trigger进⾏触发操作.其中的触发细节如下:protected TriggerFiredBundle triggerFired(Connection conn,OperableTrigger trigger)throws JobPersistenceException {JobDetail job;Calendar cal = null;// Make sure trigger wasn't deleted, paused, { // if trigger was deleted, state will be STATE_DELETEDString state = getDelegate().selectTriggerState(conn,());if (!(STATE_ACQUIRED)) {return null;}} catch (SQLException e) {throw new JobPersistenceException("Couldn't select trigger state: "+ sage(), e);}try {job = retrieveJob(conn, Key());if (job == null) { return null; }} catch (JobPersistenceException jpe) {try {getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);getDelegate().updateTriggerState(conn, (),STATE_ERROR);} catch (SQLException sqle) {getLog().error("Unable to set trigger state to ERROR.", sqle);}throw jpe;}if (endarName() != null) {cal = retrieveCalendar(conn, endarName());if (cal == null) { return null; }}try {getDelegate().updateFiredTrigger(conn, trigger, STATE_EXECUTING, job);} catch (SQLException e) {throw new JobPersistenceException("Couldn't insert fired trigger: "+ sage(), e);}Date prevFireTime = viousFireTime();// call triggered - to update the trigger's d(cal);String state = STATE_WAITING;boolean force = true;if (urrentExectionDisallowed()) {state = STATE_BLOCKED;force = false;try {getDelegate().updateTriggerStatesForJobFromOtherState(conn, (),STATE_BLOCKED, STATE_WAITING);getDelegate().updateTriggerStatesForJobFromOtherState(conn, (),STATE_BLOCKED, STATE_ACQUIRED);getDelegate().updateTriggerStatesForJobFromOtherState(conn, (),STATE_PAUSED_BLOCKED, STATE_PAUSED);} catch (SQLException e) {throw new JobPersistenceException("Couldn't update states of blocked triggers: "+ sage(), e);}}if (tFireTime() == null) {state = STATE_COMPLETE;force = true;}storeTrigger(conn, trigger, job, true, state, force, false);DataMap().clearDirtyFlag();return new TriggerFiredBundle(job, trigger, cal, ().getGroup().equals(T_RECOVERY_GROUP), new Date(), viousFireTime(), prevFireTime, tFireTime());}该⽅法做了以下⼯作:1.获取trigger当前状态2.通过trigger中的JobKey读取trigger包含的Job信息3.将trigger更新⾄触发状态4.结合calendar的信息触发trigger,涉及多次状态更新5.更新数据库中trigger的信息,包括更改状态⾄STATE_COMPLETE,及计算下⼀次触发时间.6.返回trigger触发结果的数据传输类TriggerFiredBundle从该⽅法返回后,trigger的执⾏过程已基本完毕.回到执⾏quratz操作规范的executeInNonManagedTXLock⽅法,将数据库锁释放.trigger触发操作完成Job执⾏过程:再回到线程类QuartzSchedulerThread的 line353这时触发器都已出发完毕,job的详细信息都已就位QuartzSchedulerThread line:368为每个Job⽣成⼀个可运⾏的RunShell,并放⼊线程池运⾏.在最后调度线程⽣成了⼀个随机的等待时间,进⼊短暂的等待,这使得其他节点的调度器都有机会获取数据库资源.如此就实现了quratz的负载平衡.这样⼀次完整的调度过程就结束了.调度器线程进⼊下⼀次循环.总结:简单地说,quartz的分布式调度策略是以数据库为边界资源的⼀种异步策略.各个调度器都遵守⼀个基于数据库锁的操作规则保证了操作的唯⼀性.同时多个节点的异步运⾏保证了服务的可靠.但这种策略有⾃⼰的局限性.摘录官⽅⽂档中对quratz集群特性的说明:Only one node will fire the job for each firing. What I mean by that is, if the job has a repeating trigger that tells it to fireevery 10 seconds, then at 12:00:00 exactly one node will run the job, and at 12:00:10 exactly one node will run the job, won't necessarily be the same node each time - it will more or less be random which node runs it. The load balancingmechanism is near-random for busy schedulers (lots of triggers) but favors the same node for non-busy (e.g. few triggers) clustering feature works best for scaling out long-running and/or cpu-intensive jobs (distributing the work-load overmultiple nodes). If you need to scale out to support thousands of short-running (e.g 1 second) jobs, consider partitioningthe set of jobs by using multiple distinct schedulers (including multiple clustered schedulers for HA). The scheduler makesuse of a cluster-wide lock, a pattern that degrades performance as you add more nodes (when going beyond about threenodes - depending upon your database's capabilities, etc.).说明指出,集群特性对于⾼cpu使⽤率的任务效果很好,但是对于⼤量的短任务,各个节点都会抢占数据库锁,这样就出现⼤量的线程等待资源.这种情况随着节点的增加会越来越严重.附:通讯图中关键步骤的主要sql语句: TRIGGER_ACCESS from QRTZ2_LOCKS for TRIGGER_NAME,TRIGGER_GROUP,NEXT_FIRE_TIME,PRIORITYFROM QRTZ2_TRIGGERSWHERE SCHEDULER_NAME = 'CRMscheduler'AND TRIGGER_STATE = 'ACQUIRED'AND NEXT_FIRE_TIME <= '{timekey 30s latter}'AND ( MISFIRE_INSTR = -1OR ( MISFIRE_INSTR != -1AND NEXT_FIRE_TIME >= '{timekey now}' ) )ORDER BY NEXT_FIRE_TIME ASC,PRIORITY DESC; *FROM QRTZ2_JOB_DETAILSWHERE SCHEDULER_NAME = CRMschedulerAND JOB_NAME = ?AND JOB_GROUP = ?; TQRTZ2_TRIGGERSSET TRIGGER_STATE = 'ACQUIRED'WHERE SCHED_NAME = 'CRMscheduler'AND TRIGGER_NAME = '{triggerName}'AND TRIGGER_GROUP = '{triggerGroup}'AND TRIGGER_STATE = 'waiting'; INTO QRTZ2_FIRED_TRIGGERS(SCHEDULER_NAME,ENTRY_ID,TRIGGER_NAME,TRIGGER_GROUP,INSTANCE_NAME,FIRED_TIME,SCHED_TIME,STATE,JOB_NAME,JOB_GROUP,IS_NONCONCURRENT,REQUESTS_RECOVERY,PRIORITY)VALUES( 'CRMscheduler', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);; STAT_ACCESS from QRTZ2_LOCKS for TRIGGER_STATE FROM QRTZ2_TRIGGERS WHERE SCHEDULER_NAME = 'CRMscheduler' AND TRIGGER_NAME = AND TRIGGER_GROUP = ; TRIGGER_STATEFROM QRTZ2_TRIGGERSWHERE SCHEDULER_NAME = 'CRMscheduler'AND TRIGGER_NAME = ?AND TRIGGER_GROUP = ?; *FROM QRTZ2_JOB_DETAILSWHERE SCHEDULER_NAME = CRMschedulerAND JOB_NAME = ?AND JOB_GROUP = ?; *FROM QRTZ2_CALENDARSWHERE SCHEDULER_NAME = 'CRMscheduler'AND CALENDAR_NAME = ?; QRTZ2_FIRED_TRIGGERSSET INSTANCE_NAME = ?,FIRED_TIME = ?,SCHED_TIME = ?,ENTRY_STATE = ?,JOB_NAME = ?,JOB_GROUP = ?,IS_NONCONCURRENT = ?,REQUESTS_RECOVERY = ?WHERE SCHEDULER_NAME = 'CRMscheduler'AND ENTRY_ID = ?; TQRTZ2_TRIGGERSSET TRIGGER_STATE = ?WHERE SCHED_NAME = 'CRMscheduler'AND TRIGGER_NAME = '{triggerName}'AND TRIGGER_GROUP = '{triggerGroup}'AND TRIGGER_STATE = ?; QRTZ2_TRIGGERSSET JOB_NAME = ?,JOB_GROUP = ?,DESCRIPTION = ?,NEXT_FIRE_TIME = ?,PREV_FIRE_TIME = ?,TRIGGER_STATE = ?,TRIGGER_TYPE = ?,START_TIME = ?,END_TIME = ?,CALENDAR_NAME = ?,MISFIRE_INSTRUCTION = ?,PRIORITY = ?,JOB_DATAMAP = ?WHERE SCHEDULER_NAME = SCHED_NAME_SUBSTAND TRIGGER_NAME = ?AND TRIGGER_GROUP = ?;;

发布者:admin,转转请注明出处:http://www.yc00.com/web/1689246279a225610.html

相关推荐

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信