博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
通过源码分析Java开源任务调度框架Quartz的主要流程
阅读量:4982 次
发布时间:2019-06-12

本文共 8812 字,大约阅读时间需要 29 分钟。

通过源码分析Java开源任务调度框架Quartz的主要流程

从使用效果、调用链路跟踪、E-R图、循环调度逻辑几个方面分析Quartz。

github项目地址:  , 补充了SQL输出

 

系统说明:

IDE: IntelliJ

JDK:1.8

Quartz:2.2.1

 

使用效果

1.从github项目中,拉取项目到本地,导入IDEA。

    相信读者都有一定工作经验,这些细节不赘述。

2.本文采用Mysql数据库。

    请执行 resources/scripts/tables_mysql_innodb.sql

3.修改jdbc.properties中数据库配置

 

4.通过IDEA, Edit Configurations -> Add Tomcat Server, 部署到Tomcat

 

 

暴露的Restful 接口 /say-hello.do 以及添加好任务后的调用效果:

 

 

添加任务

在tomcat启动成功后,在首页点击“添加任务”,添加如下任务:

 

 

代码执行逻辑在SyncJobFactory类中,从Output中可以看到执行的输出信息,

调用链跟踪的最后会回到这个类来。

 

现在开始跟踪调用链路。 

 

IDEA 快捷键:
进入方法:  Ctrl + 鼠标左键
光标前进/后退: Ctrl + Shirt + 右方向键/左方向键
 
 
一、 调用链路跟踪

从配置文件applicationContext.xml配置中找到任务调度核心类SchedulerFactoryBean

 resources/applicationContext.xml

...

 

使用IDEA快捷键,点击进入SchedulerFactoryBean类,它实现了InitializingBean接口,

在Spring中凡是实现了InitializingBean接口的Bean,都会在Bean属性都设置完成后调用afterPropertiesSet()方法.

 SchedulerFactoryBean.java

//---------------------------------------------------------------------// Implementation of InitializingBean interface// 实现 InitializingBean 接口//---------------------------------------------------------------------public void afterPropertiesSet() throws Exception {    //...    // Create SchedulerFactory instance.    // 创建 SchedulerFactory 调度器工厂实例    SchedulerFactory schedulerFactory = (SchedulerFactory)            BeanUtils.instantiateClass(this.schedulerFactoryClass);    initSchedulerFactory(schedulerFactory);    //...    // Get Scheduler instance from SchedulerFactory.    // 通过调度器工厂 获取 调度器实例    try {        this.scheduler = createScheduler(schedulerFactory, this.schedulerName);    //...}

 

 SchedulerFactoryBean.java

/** * Create the Scheduler instance for the given factory and scheduler name. * 通过制定工厂和调度器名称创建调度器实例 * Called by {
@link #afterPropertiesSet}. *

The default implementation invokes SchedulerFactory's getScheduler * method. Can be overridden for custom Scheduler creation. */protected Scheduler createScheduler(SchedulerFactory schedulerFactory, String schedulerName) throws SchedulerException { //... try { SchedulerRepository repository = SchedulerRepository.getInstance(); synchronized (repository) { Scheduler existingScheduler = (schedulerName != null ? repository.lookup(schedulerName) : null); Scheduler newScheduler = schedulerFactory.getScheduler(); if (newScheduler == existingScheduler) { throw new IllegalStateException("Active Scheduler of name '" + schedulerName + "' already registered " + "in Quartz SchedulerRepository. Cannot create a new Spring-managed Scheduler of the same name!"); } //...}

 

 这个项目走的逻辑是 StdSchedulerFactory.getScheduler()方法,可自行debug。

 StdSchedulerFactory.java

/** * Returns a handle to the Scheduler produced by this factory. * 返回该工厂创造的调度器的句柄 */public Scheduler getScheduler() throws SchedulerException {    if (cfg == null) {        initialize();    }    SchedulerRepository schedRep = SchedulerRepository.getInstance();    Scheduler sched = schedRep.lookup(getSchedulerName());    //...    sched = instantiate();    return sched;}

 

StdSchedulerFactory.java

private Scheduler instantiate() throws SchedulerException {    //...    //大量的配置初始化、实例化代码    //...    //第1298行代码    qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);    //...}

 

QuartzScheduler.java

/** * Create a QuartzScheduler with the given configuration * 根据给定的配置 创建Quartz调度器 */public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)        throws SchedulerException {        this.resources = resources;        if (resources.getJobStore() instanceof JobListener) {            addInternalJobListener((JobListener)resources.getJobStore());        }        //private QuartzSchedulerThread schedThread;        this.schedThread = new QuartzSchedulerThread(this, resources);        ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();        //通过线程池执行 Quartz调度器线程        schedThreadExecutor.execute(this.schedThread);        //...}

 

 QuartzSchedulerThread.java

/** * 

* The main processing loop of the QuartzSchedulerThread. * Quartz调度器线程的主循环逻辑 *

*/@Overridepublic void run() { //while循环执行,只要调度器为被暂停 while(!halted.get()){ JobRunShell shell = null; try { shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle); shell.initialize(qs); } if (qsRsrcs.getThreadPool().runInThread(shell) == false){} }}

 

 JobRunShell.java

public void run() {        //...        Job job = jec.getJobInstance();        //...        try {            log.debug("Calling execute on job " + jobDetail.getKey());            //执行            job.execute(jec);            endTime = System.currentTimeMillis();        }        //...        //更新Trigger触发器状态,删除FIRED_TRIGGERS触发记录        instCode = trigger.executionComplete(jec, jobExEx);        //...}

 

QuartzJobBean.java

/** * This implementation applies the passed-in job data map as bean property * values, and delegates to executeInternal afterwards. * 这个实现 把传入的map数据作为bean属性值,然后委托给 executeInternal 方法 */public final void execute(JobExecutionContext context) throws JobExecutionException {    try {    //执行    executeInternal(context);}

 

  SyncJobFactory.java

//回到了我们的业务类SyncJobFactory的executeInternal方法,//里面执行我们的业务代码protected void executeInternal(JobExecutionContext context) throws JobExecutionException {    try {        LOG.info("SyncJobFactory execute" + IPAddressKowalski.getIpAddressAndPort() + " port:"+IPAddressKowalski.getTomcatPort());    }    //...    System.out.println("jobName:" + scheduleJob.getJobName() + "  " + scheduleJob);    //...}

 

 

 二、E-R图

梳理6张主要的Quartz表:

 

 
QRTZ_TRIGGERS 触发器表

    SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。 联合主键,QRTZ_JOB_DETAILS表SCHED_NAME外键

    JOB_NAME,任务名。自定义值。 联合主键,QRTZ_JOB_DETAILS表JOB_NAME外键

    JOB_GROUP,任务组。 自定义值。联合主键,QRTZ_JOB_DETAILS表JOB_GROUP外键

    TRIGGER_STATE,触发器状态: WAITING , ACQUIRED, BLOCKING

    NEXT_FIRE_TIME, 下次触发时间:

    MISFIRE_INSTR,执行失败后的指令,

        非失败策略 MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY = -1; 

        失败策略 MISFIRE_INSTRUCTION_SMART_POLICY = 0;

    TRIGGER_TYPE, 触发器类型,例如CRON,cron表达式类型的触发器

    PRIORITY,优先级

 

QRTZ_CRON_TRIGGERS cron类型触发器表

    SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。 联合主键,QRTZ_TRIGGERS表SCHED_NAME外键

    JOB_NAME,任务名。自定义值。 联合主键,QRTZ_TRIGGERS表JOB_NAME外键

    JOB_GROUP,任务组。 自定义值。联合主键,QRTZ_TRIGGERS表JOB_GROUP外键

    CRON_EXPRESSION, cron表达式, 例如每30秒执行一次, 0/30 * * * * ?

 

QRTZ_JOB_DETAILS 任务详细表

    SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。联合主键

    JOB_NAME,任务名。自定义值。 联合主键

    JOB_GROUP,任务组。 自定义值。联合主键

    JOB_DATA,blob类型,任务参数

 

QRTZ_FIRED_TRIGGERS 任务触发表

    SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。联合主键

    ENTRY_ID,entry id,联合主键

    JOB_NAME,任务名。自定义值。 

    JOB_GROUP,任务组。 自定义值。

    FIRED_TIME, 任务触发时间

    STATE,状态

    INSTANCE_NAME, 服务器实例名

    PRIORITY,优先级

 

QRTZ_SCHEDULER_STATE 

    SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。联合主键

    INSTANCE_NAME,服务器实例名。联合主键

    LAST_CHECKIN_TIME,上次检查时间

    CHECKIN_INTERVAL,检查间隔

 

QRTZ_LOCKS 全局锁

    SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。联合主键

    LOCK_NAME,锁名称,例如,TRIGGER_ACCESS。联合主键

   

 

三、循环调度逻辑

    主要流程如下:

 

    源码如下:

QuartzSchedulerThread.java

public void run() {        //...        while (!halted.get()) {            try {                //合理休眠                //...                        //获取接下来的触发器                        //1.状态为WAITING                        //2.触发时间在30秒内                        //3.不是错过执行的或者错过了但是时间不超过两分钟                        triggers = qsRsrcs.getJobStore().acquireNextTriggers(                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());                                                       //...                                 //触发任务                                List
res = qsRsrcs.getJobStore().triggersFired(triggers); //... JobRunShell shell = null; //... //执行代码 if (qsRsrcs.getThreadPool().runInThread(shell) == false) { //... } // while (!halted) //.. }

 

 JobRunShell.java

protected QuartzScheduler qs = null;        public void run() {        qs.addInternalSchedulerListener(this);        try {            //...            do {                Job job = jec.getJobInstance();                // execute the job                try {                    //执行任务代码                    job.execute(jec);                //更新触发器,删除触发记录                qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);                break;            } while (true);        }     //...    }

 

 

四、扩展

 

除了对主线程 QuartzSchedulerThread 的分析

继续分析JobStoreSupport类的两个线程 ClusterManager 和 MisfireHandler 的分析, 它们维护触发器的MISFIRE_INSTR状态,和调度器状态QRTZ_SCHEDULER_STATE。

 

转载于:https://www.cnblogs.com/tanliwei/p/10020787.html

你可能感兴趣的文章