一文揭秘定时任务调度框架quartz

一文揭秘定时任务调度框架quartz

2023年7月21日发(作者:)

⼀⽂揭秘定时任务调度框架quartz之前写过quartz或者引⽤过quartz的⼀些⽂章,有很多⼈给我发消息问quartz的相关问题,趁着年底⽐较清闲,把quartz的问题整理了⼀下,顺带翻了翻源码,做了⼀些总结,希望能帮助到⼀些⼈或者减少⼈们探索的时间。注意,使⽤版本为quartz2.2.3 spring 的核⼼组件 1.1 Job组件1.1.1JobJob负责任务执⾏的逻辑,所有逻辑在execute()⽅法中,执⾏所需要的数据存放在JobExecutionContext 中Job实例:@PersistJobDataAfterExecution@DisallowConcurrentExecutionpublic class ColorJob implements Job { private static Logger _log = ger();

// parameter names specific to this job public static final String FAVORITE_COLOR = "favorite color"; public static final String EXECUTION_COUNT = "count";

// Since Quartz will re-instantiate a class every time it // gets executed, members non-static member variables can // not be used to maintain state! private int _counter = 1; /** *

* Empty constructor for job initialization *

*

* Quartz requires a public empty constructor so that the * scheduler can instantiate the class whenever it needs. *

*/ public ColorJob() { } /** *

* Called by the {@link ler} when a * {@link r} fires that is associated with * the Job. *

*

* @throws JobExecutionException * if there is an exception while executing the job. */ public void execute(JobExecutionContext context) throws JobExecutionException { // This job simply prints out its job name and the // date and time that it is running JobKey jobKey = Detail().getKey();

// Grab and print passed parameters JobDataMap data = Detail().getJobDataMap(); String favoriteColor = ing(FAVORITE_COLOR); int count = (EXECUTION_COUNT); _("ColorJob: " + jobKey + " executing at " + new Date() + "n" + " favorite color is " + favoriteColor + "n" +

" execution count (from job map) is " + count + "n" +

" execution count (from job member variable) is " + _counter);

// increment the count and store it back into the

// job map so that job state can be properly maintained count++; (EXECUTION_COUNT, count);

// Increment the local member variable

// This serves no real purpose since job state can not

// be maintained via member variables! _counter++; }}1.1.2 JobDetail存储Job的信息 主要负责1.指定执⾏的Job类,唯⼀标识(job名称和组别 名称)2.存储JobDataMap信息 // job1 will only run 5 times (at start time, plus 4 repeats), every 10 seconds JobDetail job1 = newJob().withIdentity("job1", "group1").build(); // pass initialization parameters into the job DataMap().put(TE_COLOR, "Green"); DataMap().put(ION_COUNT, 1); 数据库存储如下:1.1.3 Quartz JobBuilder提供了⼀个链式api创建JobDetail@Beanpublic JobDetail jobDetail() { return ().ofType() .storeDurably() .withIdentity("Qrtz_Job_Detail")

.withDescription("Invoke Sample ") .build();}1.1.4 Spring JobDetailFactoryBean spring提供的⼀个创建JobDetail的⽅式⼯⼚bean@Beanpublic JobDetailFactoryBean jobDetail() { JobDetailFactoryBean jobDetailFactory = new JobDetailFactoryBean(); Class(); cription("Invoke Sample "); ability(true); return jobDetailFactory;}

1.2 Trigger组件

trigger的状态不同trigger的状态 // STATES String STATE_WAITING = "WAITING"; String STATE_ACQUIRED = "ACQUIRED"; String STATE_EXECUTING = "EXECUTING"; String STATE_COMPLETE = "COMPLETE"; String STATE_BLOCKED = "BLOCKED"; String STATE_ERROR = "ERROR"; String STATE_PAUSED = "PAUSED"; String STATE_PAUSED_BLOCKED = "PAUSED_BLOCKED"; String STATE_DELETED = "DELETED";状态的表结构trigger的类型 // TRIGGER TYPES /** Simple Trigger type. */ String TTYPE_SIMPLE = "SIMPLE"; /** Cron Trigger type. */ String TTYPE_CRON = "CRON"; /** Calendar Interval Trigger type. */ String TTYPE_CAL_INT = "CAL_INT"; /** Daily Time Interval Trigger type. */ String TTYPE_DAILY_TIME_INT = "DAILY_I"; /** A general blob Trigger type. */ String TTYPE_BLOB = "BLOB";对应表结构

1.2.1 trigger实例 SimpleTrigger trigger1 = newTrigger().withIdentity("trigger1", "group1").startAt(startTime) .withSchedule(simpleSchedule().withIntervalInSeconds(10).withRepeatCount(4)).build();Trigger存储在mysql中 1.2.2 Quartz TriggerBuilder提供了⼀个链式创建Trigger的api@Beanpublic Trigger trigger(JobDetail job) { return gger().forJob(job) .withIdentity("Qrtz_Trigger") .withDescription("Sample trigger") .withSchedule(simpleSchedule().repeatForever().withIntervalInHours(1)) .build();}1.2.3 Spring SimpleTriggerFactoryBean spring提供的⼀个创建SimpleTrigger的⼯⼚类@Beanpublic SimpleTriggerFactoryBean trigger(JobDetail job) { SimpleTriggerFactoryBean trigger = new SimpleTriggerFactoryBean(); Detail(job); eatInterval(3600000); eatCount(_INDEFINITELY); return trigger;}1.3 调度组件1.3.1 quartz提供的⼯⼚类@Beanpublic Scheduler scheduler(Trigger trigger, JobDetail job) { StdSchedulerFactory factory = new StdSchedulerFactory(); lize(new ClassPathResource("ties").getInputStream());

Scheduler scheduler = eduler(); Factory(springBeanJobFactory()); leJob(job, trigger);

(); return scheduler;}1.3.2 spring提供的⼯⼚bean@Beanpublic SchedulerFactoryBean scheduler(Trigger trigger, JobDetail job) { SchedulerFactoryBean schedulerFactory = new SchedulerFactoryBean(); figLocation(new ClassPathResource("ties"));

Factory(springBeanJobFactory()); Details(job); ggers(trigger); return schedulerFactory;}2.⼯作原理 2.1 核⼼类QuartzSchedulerScheduler实现类StdScheduler封装了核⼼⼯作类QuartzScheduler /** *

* Construct a StdScheduler instance to proxy the given * QuartzScheduler instance, and with the given SchedulingContext. *

*/ public StdScheduler(QuartzScheduler sched) { = sched; } 2.2 JobDetail的存取 public void addJob(JobDetail jobDetail, boolean replace, boolean storeNonDurableWhileAwaitingScheduling) throws SchedulerException { validateState(); if (!storeNonDurableWhileAwaitingScheduling && !ble()) { throw new SchedulerException( "Jobs added with no trigger must be durable."); } Store().storeJob(jobDetail, replace); notifySchedulerThread(0L); notifySchedulerListenersJobAdded(jobDetail); }2.2.1 存储JobDetail信息(以mysql Jdbc⽅式为例) /** *

* Insert or update a job. *

*/ protected void storeJob(Connection conn,

JobDetail newJob, boolean replaceExisting) throws JobPersistenceException { boolean existingJob = jobExists(conn, ()); try { if (existingJob) { if (!replaceExisting) {

throw new ObjectAlreadyExistsException(newJob);

} getDelegate().updateJobDetail(conn, newJob); } else { getDelegate().insertJobDetail(conn, newJob); } } catch (IOException e) { throw new JobPersistenceException("Couldn't store job: " + sage(), e); } catch (SQLException e) { throw new JobPersistenceException("Couldn't store job: " + sage(), e); } }调⽤StdJDBCDelegate实现 /** *

* Insert the job detail record. *

*

* @param conn * the DB Connection * @param job * the job to insert * @return number of rows inserted * @throws IOException * if there were problems serializing the JobDataMap */ public int insertJobDetail(Connection conn, JobDetail job) throws IOException, SQLException { ByteArrayOutputStream baos = serializeJobData(DataMap()); PreparedStatement ps = null; int insertResult = 0; try { ps = eStatement(rtp(INSERT_JOB_DETAIL)); ing(1, ().getName()); ing(2, ().getGroup()); ing(3, cription()); ing(4, Class().getName()); setBoolean(ps, 5, ble()); setBoolean(ps, 6, urrentExectionDisallowed()); setBoolean(ps, 7, istJobDataAfterExecution()); setBoolean(ps, 8, tsRecovery()); setBytes(ps, 9, baos); insertResult = eUpdate(); } finally { closeStatement(ps); } return insertResult; }注意:JobDataMap序列化后以Blob形式存储到数据库中StdJDBCConstants中执⾏sql如下: String INSERT_JOB_DETAIL = "INSERT INTO " + TABLE_PREFIX_SUBST + TABLE_JOB_DETAILS + " ("

+ COL_SCHEDULER_NAME + ", " + COL_JOB_NAME + ", " + COL_JOB_GROUP + ", " + COL_DESCRIPTION + ", " + COL_JOB_CLASS + ", " + COL_IS_DURABLE + ", "

+ COL_IS_NONCONCURRENT + ", " + COL_IS_UPDATE_DATA + ", "

+ COL_REQUESTS_RECOVERY + ", " + COL_JOB_DATAMAP + ") " + " VALUES(" + SCHED_NAME_SUBST + ", ?, ?, ?, ?, ?, ?, ?, ?, ?)";2.2.2 查询JobDetail 强调⼀下,因JobDetail中的JobDataMap是以Blob形式存放到数据库中的(也可以通过useProperties属性修改成string存储,默认是false,Blob形式存储),所以查询时需要特殊处理:/** *

* Select the JobDetail object for a given job name / group name. *

*

* @param conn * the DB Connection * @return the populated JobDetail object * @throws ClassNotFoundException * if a class found during deserialization cannot be found or if * the job class could not be found * @throws IOException * if deserialization causes an error */ public JobDetail selectJobDetail(Connection conn, JobKey jobKey, ClassLoadHelper loadHelper) throws ClassNotFoundException, IOException, SQLException { PreparedStatement ps = null; ResultSet rs = null; try { ps = eStatement(rtp(SELECT_JOB_DETAIL)); ing(1, e()); ing(2, up()); rs = eQuery(); JobDetailImpl job = null; if (()) { job = new JobDetailImpl(); e(ing(COL_JOB_NAME)); up(ing(COL_JOB_GROUP)); cription(ing(COL_DESCRIPTION)); Class( ass(ing(COL_JOB_CLASS), )); ability(getBoolean(rs, COL_IS_DURABLE)); uestsRecovery(getBoolean(rs, COL_REQUESTS_RECOVERY)); Map map = null; if (canUseProperties()) { map = getMapFromProperties(rs); } else { map = (Map) getObjectFromBlob(rs, COL_JOB_DATAMAP); } if (null != map) { DataMap(new JobDataMap(map)); } } return job; } finally { closeResultSet(rs); closeStatement(ps); } }2.3 查询trigger /** *

* Retrieve the given {@link r}. *

*

* @return The desired Trigger, or null if there is no * match. */ public OperableTrigger retrieveTrigger(final TriggerKey triggerKey) throws JobPersistenceException { return (OperableTrigger)executeWithoutLock( // no locks necessary new TransactionCallback() { public Object execute(Connection conn) throws JobPersistenceException { return retrieveTrigger(conn, triggerKey); } }); }

protected OperableTrigger retrieveTrigger(Connection conn, TriggerKey key) throws JobPersistenceException { try { return getDelegate().selectTrigger(conn, key); } catch (Exception e) { throw new JobPersistenceException("Couldn't retrieve trigger: " + sage(), e); } } /** *

* Select a trigger. *

*

* @param conn * the DB Connection * @return the {@link r} object * @throws JobPersistenceException

*/ public OperableTrigger selectTrigger(Connection conn, TriggerKey triggerKey) throws SQLException, ClassNotFoundException, IOException, JobPersistenceException { PreparedStatement ps = null; ResultSet rs = null; try { OperableTrigger trigger = null; ps = eStatement(rtp(SELECT_TRIGGER)); ing(1, e()); ing(2, up()); rs = eQuery(); if (()) { String jobName = ing(COL_JOB_NAME); String jobGroup = ing(COL_JOB_GROUP); String description = ing(COL_DESCRIPTION); long nextFireTime = g(COL_NEXT_FIRE_TIME); long prevFireTime = g(COL_PREV_FIRE_TIME); String triggerType = ing(COL_TRIGGER_TYPE); long startTime = g(COL_START_TIME); long endTime = g(COL_END_TIME); String calendarName = ing(COL_CALENDAR_NAME); int misFireInstr = (COL_MISFIRE_INSTRUCTION); int priority = (COL_PRIORITY); Map map = null; if (canUseProperties()) { map = getMapFromProperties(rs); } else { map = (Map) getObjectFromBlob(rs, COL_JOB_DATAMAP); }

Date nft = null; if (nextFireTime > 0) { nft = new Date(nextFireTime); } Date pft = null; if (prevFireTime > 0) { pft = new Date(prevFireTime); } Date startTimeD = new Date(startTime); Date endTimeD = null; if (endTime > 0) { endTimeD = new Date(endTime); } if ((TTYPE_BLOB)) { (); rs = null; (); ps = null; ps = eStatement(rtp(SELECT_BLOB_TRIGGER)); ing(1, e()); ing(2, up()); rs = eQuery(); if (()) { trigger = (OperableTrigger) getObjectFromBlob(rs, COL_BLOB); } } else { TriggerPersistenceDelegate tDel = findTriggerPersistenceDelegate(triggerType);

if(tDel == null) throw new JobPersistenceException("No TriggerPersistenceDelegate for trigger discriminator type: " + triggerType); TriggerPropertyBundle triggerProps = null; try { triggerProps = tendedTriggerProperties(conn, triggerKey); } catch (IllegalStateException isex) { if (isTriggerStillPresent(ps)) { throw isex; } else { // QTZ-386 Trigger has been deleted return null; } } TriggerBuilder tb = newTrigger() .withDescription(description) .withPriority(priority) .startAt(startTimeD) .endAt(endTimeD) .withIdentity(triggerKey) .modifiedByCalendar(calendarName) .withSchedule(eduleBuilder()) .forJob(jobKey(jobName, jobGroup));

if (null != map) { obData(new JobDataMap(map)); }

trigger = (OperableTrigger) ();

fireInstruction(misFireInstr); tFireTime(nft); viousFireTime(pft);

setTriggerStateProperties(trigger, triggerProps); }

} return trigger; } finally { closeResultSet(rs); closeStatement(ps); } }执⾏的sql: String SELECT_TRIGGER = "SELECT * FROM " + TABLE_PREFIX_SUBST + TABLE_TRIGGERS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST + " AND " + COL_TRIGGER_NAME + " = ? AND " + COL_TRIGGER_GROUP + " = ?";和JobDetail⼀样,也存在Blob的问题,不再赘述。2.4 调度执⾏线程QuartzSchedulerThread /** *

* The main processing loop of the QuartzSchedulerThread. *

*/ @Override public void run() { boolean lastAcquireFailed = false; while (!()) { try { // check if we're supposed synchronized (sigLock) { while (paused && !()) { try { // wait until togglePause(false) (1000L); } catch (InterruptedException ignore) { } } if (()) { break; } } int availThreadCount = eadPool().blockForAvailableThreads(); if(availThreadCount > 0) { // will always be true, due to semantics List triggers = null; long now = tTimeMillis(); clearSignaledSchedulingChange(); try { triggers = Store().acquireNextTriggers( now + idleWaitTime, (availThreadCount, BatchSize()), chTimeWindow()); //1. lastAcquireFailed = false; if (gEnabled())

("batch acquisition of " + (triggers == null ? 0 : ()) + " triggers"); } catch (JobPersistenceException jpe) { if(!lastAcquireFailed) { SchedulerListenersError( "An error occurred while scanning for the next triggers to fire.", jpe); } lastAcquireFailed = true; continue; } catch (RuntimeException e) { if(!lastAcquireFailed) { getLog().error("quartzSchedulerThreadLoop: RuntimeException " +sage(), e); } lastAcquireFailed = true; continue; } if (triggers != null && !y()) { now = tTimeMillis(); long triggerTime = (0).getNextFireTime().getTime(); long timeUntilTrigger = triggerTime - now; while(timeUntilTrigger > 2) { synchronized (sigLock) { if (()) { break; } if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) { try { // we could have blocked a long while // on 'synchronize', so we must recompute now = tTimeMillis(); timeUntilTrigger = triggerTime - now; if(timeUntilTrigger >= 1) (timeUntilTrigger); } catch (InterruptedException ignore) { } } } if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) { break; } now = tTimeMillis(); timeUntilTrigger = triggerTime - now; } // this happens if releaseIfScheduleChangedSignificantly decided to release triggers if(y()) continue; // set triggers to 'executing' List bndles = new ArrayList(); boolean goAhead = true; synchronized(sigLock) { goAhead = !(); } if(goAhead) { try { List res = Store().triggersFired(triggers); //2 if(res != null) bndles = res; } catch (SchedulerException se) { SchedulerListenersError( "An error occurred while firing triggers '" + triggers + "'", se); //QTZ-179 : a problem occurred interacting with the triggers from the db //we release them and loop again for (int i = 0; i < (); i++) { Store().releaseAcquiredTrigger((i)); } continue; } } for (int i = 0; i < (); i++) { TriggerFiredResult result = (i); TriggerFiredBundle bndle = ggerFiredBundle(); Exception exception = eption(); if (exception instanceof RuntimeException) { getLog().error("RuntimeException while firing trigger " + (i), exception); Store().releaseAcquiredTrigger((i)); continue; } // it's possible to get 'null' if the triggers was paused, // blocked, or other similar occurrences that prevent it being // fired at or if the scheduler was shutdown (halted) if (bndle == null) { Store().releaseAcquiredTrigger((i)); continue; } JobRunShell shell = null; try { shell = RunShellFactory().createJobRunShell(bndle); lize(qs); } catch (SchedulerException se) { Store().triggeredJobComplete((i), Detail(), _ALL_JOB_TRIGGERS_ERROR); continue; } if (eadPool().runInThread(shell) == false) { // this case should never happen, as it is indicative of the // scheduler being shutdown or a bug in the thread pool or // a thread pool being used concurrently - which the docs // say not getLog().error("hread() return false!"); Store().triggeredJobComplete((i), Detail(), _ALL_JOB_TRIGGERS_ERROR); } } continue; // while (!halted) } } else { // if(availThreadCount > 0) // should never happen, if orAvailableThreads() follows contract continue; // while (!halted) } long now = tTimeMillis(); long waitTime = now + getRandomizedIdleWaitTime(); long timeUntilContinue = waitTime - now; synchronized(sigLock) { try { if(!()) { // QTZ-336 A job might have been completed in the mean time and we might have // missed the scheduled changed signal by not waiting for the notify() yet // Check that before waiting for too long in case this very job needs to be // scheduled very soon if (!isScheduleChanged()) { (timeUntilContinue); } } } catch (InterruptedException ignore) { } } } catch(RuntimeException re) { getLog().error("Runtime error occurred in main trigger firing loop.", re); } } // while (!halted) // drop references to scheduler stuff to aid qs = null; qsRsrcs = null; } 2.4.1 获取trigger(红⾊1)protected List acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow) throws JobPersistenceException { if (timeWindow < 0) { throw new IllegalArgumentException(); }

List acquiredTriggers = new ArrayList(); Set acquiredJobKeysForNoConcurrentExec = new HashSet(); final int MAX_DO_LOOP_RETRY = 3; int currentLoopCount = 0; do { currentLoopCount ++; try { List keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);

// No trigger is ready to fire yet. if (keys == null || () == 0) return acquiredTriggers; long batchEnd = noLaterThan; for(TriggerKey triggerKey: keys) { // If our trigger is no longer available, try a new one. OperableTrigger nextTrigger = retrieveTrigger(conn, triggerKey); if(nextTrigger == null) { continue; // next trigger }

// If trigger's job is set as @DisallowConcurrentExecution, and it has already been added to result, then // put it back into the timeTriggers set and continue to search for next trigger. JobKey jobKey = Key(); JobDetail job; try { job = retrieveJob(conn, jobKey); } catch (JobPersistenceException jpe) { try { getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe); getDelegate().updateTriggerState(conn, triggerKey, STATE_ERROR); } catch (SQLException sqle) { getLog().error("Unable to set trigger state to ERROR.", sqle); } continue; }

if (urrentExectionDisallowed()) { if (ns(jobKey)) { continue; // next trigger } else { (jobKey); } }

if (tFireTime().getTime() > batchEnd) { break; } // We now have a acquired trigger, let's add to return list. // If our trigger was no longer in the expected state, try a new one. int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING); if (rowsUpdated <= 0) { continue; // next trigger } eInstanceId(getFiredTriggerRecordId()); getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null); if(y()) { batchEnd = (tFireTime().getTime(), tTimeMillis()) + timeWindow; } (nextTrigger); } // if we didn't end up with any trigger to fire from that first // batch, try again for another batch. We allow with a max retry count. if(() == 0 && currentLoopCount < MAX_DO_LOOP_RETRY) { continue; }

// We are done with the while loop. break; } catch (Exception e) { throw new JobPersistenceException( "Couldn't acquire next trigger: " + sage(), e); } } while (true);

// Return the acquired trigger list return acquiredTriggers; }2.4.2 触发trigger(红⾊2) protected TriggerFiredBundle triggerFired(Connection conn, OperableTrigger trigger) throws JobPersistenceException { JobDetail job; Calendar cal = null; // Make sure trigger wasn't deleted, paused, try { // if trigger was deleted, state will be STATE_DELETED String state = getDelegate().selectTriggerState(conn, ()); if (!(STATE_ACQUIRED)) { return null; } } catch (SQLException e) { throw new JobPersistenceException("Couldn't select trigger state: " + sage(), e); } try { job = retrieveJob(conn, Key()); if (job == null) { return null; } } catch (JobPersistenceException jpe) { try { getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe); getDelegate().updateTriggerState(conn, (), STATE_ERROR); } catch (SQLException sqle) { getLog().error("Unable to set trigger state to ERROR.", sqle); } throw jpe; } if (endarName() != null) { cal = retrieveCalendar(conn, endarName()); if (cal == null) { return null; } } try { getDelegate().updateFiredTrigger(conn, trigger, STATE_EXECUTING, job); } catch (SQLException e) { throw new JobPersistenceException("Couldn't insert fired trigger: " + sage(), e); } Date prevFireTime = viousFireTime(); // call triggered - to update the trigger's red(cal); String state = STATE_WAITING; boolean force = true;

if (urrentExectionDisallowed()) { state = STATE_BLOCKED; force = false; try { getDelegate().updateTriggerStatesForJobFromOtherState(conn, (), STATE_BLOCKED, STATE_WAITING); getDelegate().updateTriggerStatesForJobFromOtherState(conn, (), STATE_BLOCKED, STATE_ACQUIRED); getDelegate().updateTriggerStatesForJobFromOtherState(conn, (), STATE_PAUSED_BLOCKED, STATE_PAUSED); } catch (SQLException e) { throw new JobPersistenceException( "Couldn't update states of blocked triggers: " + sage(), e); } }

if (tFireTime() == null) { state = STATE_COMPLETE; force = true; } storeTrigger(conn, trigger, job, true, state, force, false); DataMap().clearDirtyFlag(); return new TriggerFiredBundle(job, trigger, cal, ().getGroup() .equals(T_RECOVERY_GROUP), new Date(), trigger .getPreviousFireTime(), prevFireTime, tFireTime()); } 2.4.3 数据库锁 StdRowLockSemaphore针对⽀持select for update的数据库如mysqlUpdateLockRowSemaphore针对不⽀持select for update的数据库如mssqlserver StdRowLockSemaphore的实现如下: public static final String SELECT_FOR_LOCK = "SELECT * FROM " + TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST + " AND " + COL_LOCK_NAME + " = ? FOR UPDATE"; public static final String INSERT_LOCK = "INSERT INTO " + TABLE_PREFIX_SUBST + TABLE_LOCKS + "(" + COL_SCHEDULER_NAME + ", " + COL_LOCK_NAME + ") VALUES ("

+ SCHED_NAME_SUBST + ", ?)";

总结: 的三⼤组件Job/trigger/scheduler,job负责业务逻辑,trigger负责执⾏时机,scheduler负责调度Job和trigger来执⾏。 2.使⽤mysql作为存储的话,使⽤StdJDBCDelegate和数据库进⾏交互,交互的sql在StdJDBCConstants中定义 Scheduler是核⼼类,Scheduler做其代理,真正执⾏的是QuartzSchedulerThread re存储控制,JobStoreSupport的两个实现JobStoreCMT容器管理事务,不需要使⽤commit和rollback;JobStoreTX⽤在单机环境,需要处理commit和rollback5.数据库锁使⽤了悲观锁select for update,定义为_scheduler_state定义了扫描间隔集群扫描间隔

参考⽂献:【1】/spring-quartz-schedule 【2】/xiaojin21cen/article/details/79298883

发布者:admin,转转请注明出处:http://www.yc00.com/web/1689929410a295174.html

相关推荐

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信