java调度任务
⑴ java线程池之ScheledThreadPoolExecutor实现原理
java中异步周期任务调度有Timer,ScheledThreadPoolExecutor等实现,目前单机版的定时调度都是使用ScheledThreadPoolExecutor去实现,那么它是如何实现周期执行任务的呢?其实它还是利用ThreadPoolExecutor线程池去执行任务,这一点从它是继承自ThreadPoolExecutor救可以看的出来,其实关键在于如何实现任务的周期性调度,
ScheledThreadPoolExecutor类以及核心函数首先ScheledThreadPoolExecutor是实现ScheledExecutorService接口,它主要定义了四个方法:
周期调度一个Runnable的对象
周期调度一个Callable的对象
固定周期调度Runnable对象 (不管上一次Runnable执行结束的时间,总是以固定延迟时间执行 即 上一个Runnable执行开始时候 + 延时时间 = 下一个Runnable执行的时间点)
以固定延迟调度unnable对象(当上一个Runnable执行结束后+固定延迟 = 下一个Runnable执行的时间点)
{publicScheledFuture<?>schele(Runnablecommand,longdelay,TimeUnitunit);public<V>ScheledFuture<V>schele(Callable<V>callable,longdelay,TimeUnitunit);publicScheledFuture<?>scheleAtFixedRate(Runnablecommand,longinitialDelay,longperiod,TimeUnitunit);publicScheledFuture<?>scheleWithFixedDelay(Runnablecommand,longinitialDelay,longdelay,TimeUnitunit);}其次,ScheledThreadPoolExecutor是继承ThreadPoolExecutor,所以它是借助线程池的能力去执行任务,然后自身去实现周期性调度。从构造方法调用父类的线程池的构造方法,核心线程数是构造方法传入,这里可以看到最大线程数是Integer的最大值即2147483647, 还有等待队列是DelayedWorkQueue,它是实现延时的关键.
/***Createsanew{@codeScheledThreadPoolExecutor}withthe*givencorepoolsize.**@,even*iftheyareidle,unless{@codeallowCoreThreadTimeOut}isset*@{@codecorePoolSize<0}*/(intcorePoolSize){super(corePoolSize,Integer.MAX_VALUE,0,NANOSECONDS,newDelayedWorkQueue());}scheleAtFixedRate是实现周期性调度的方法,调度任务就是实现Runnable对象, 以及系统的开始延时时间,周期的调度的间隔时间。
计算初始触发时间和执行周期,并和传入的Runnable对象作为参数封装成 ScheledFutureTask,然后调用decorateTask装饰Tas(默认实现为空)。
设置ScheledFutureTask对象outerTask为t(默认就是它自己)。
调用delayedExecute延迟执行任务。
publicScheledFuture<?>scheleAtFixedRate(Runnablecommand,longinitialDelay,longperiod,**TimeUnitunit){if(command==null||unit==null)thrownewNullPointerException();if(period<=0)();ScheledFutureTask<Void>sft=newScheledFutureTask<Void>(command,null,triggerTime(initialDelay,unit),unit.toNanos(period));RunnableScheledFuture<Void>t=decorateTask(command,sft);sft.outerTask=t;delayedExecute(t);returnt;}判断线程池状态,如果不是处于running状态,则拒绝该任务。
将该任务加入父类的延迟队列(实际为初始化的DelayedWorkQueue对象)
再次判断线程池不是处于running状态,并且,判断是否是处于shutdown状态并且标志是否是true(默认是false,表示是否线程次处于shutdown状态下是否继续执行周期性任务),若果为true,则从队列删除任务,false,则确保启动线程来执行周期性任务
privatevoiddelayedExecute(RunnableScheledFuture<?>task){if(isShutdown())reject(task);else{super.getQueue().add(task);if(isShutdown()&&!canRunInCurrentRunState(task.isPeriodic())&&remove(task))task.cancel(false);elseensurePrestart();}}获取线程池数量
如果小于核心线程数,则启动核心线程执行任务,如果线程数为空,则启动非核心线程
voidensurePrestart(){intwc=workerCountOf(ctl.get());if(wc<corePoolSize)addWorker(null,true);elseif(wc==0)addWorker(null,false);}ScheledFutureTask的run函数获取是否是周期性任务
判断是否线程池状态是否可以执行任务,如果为true,则取消任务 3 如果是非周期性任务,则直接调用父类FutureTask的run方法, 4 如果是周期性任务,则调用FutureTask的runAndReset函数, 如果该函数返回为true,则调用setNextRunTime设置下一次运行的时间, 并且还行reExecutePeriodic再次执行周期性任务。
publicvoidrun(){booleanperiodic=isPeriodic();if(!canRunInCurrentRunState(periodic))cancel(false);elseif(!periodic)ScheledFutureTask.super.run();elseif(ScheledFutureTask.super.runAndReset()){setNextRunTime();reExecutePeriodic(outerTask);}}判断线程池是否处于可执行任务的状态,如果为true,则重新将设置下一次运行时间的任务加入父类的等待队列,
如果线程池处于不可运行任务的状态,则并且从等待队列中移除成功, 调用任务的取消操作,否则调用ensurePrestart确保启动线程执行任务
voidreExecutePeriodic(RunnableScheledFuture<?>task){if(canRunInCurrentRunState(true)){super.getQueue().add(task);if(!canRunInCurrentRunState(true)&&remove(task))task.cancel(false);elseensurePrestart();}}DelayedWorkQueue类核心函数DelayedWorkQueue是继承AbstractQueue,并实现BlockingQueue接口
<Runnable>implementsBlockingQueue<Runnable>{核心字段
//初始容量为_CAPACITY=16;//等待队列,只能保存RunnableScheledFuture对象privateRunnableScheledFuture<?>[]queue=newRunnableScheledFuture<?>[INITIAL_CAPACITY];//锁privatefinalReentrantLocklock=newReentrantLock();//对俄大小privateintsize=0;//leader线程,表示最近需要执行的任务的线程。privateThreadleader=null;//条件锁=lock.newCondition();offer函数:
将添加的参数转换成RunnableScheledFuture对象。
加全局锁。
获取当前队列的size,如果等于队列的长度,则嗲用grow扩容,增加50%的数组长度。
size加1。
如果数组为0,则将加入的对象放在索引为0的位置,然后设置ScheledFutureTask的heapIndex的索引(便于后续快速删除)。
调用siftUp做堆的上浮操作,这里是小根堆的操作。
如果队列中第一个元素是传入的对象,则将laader设置null
释放锁
返回true
publicbooleanoffer(Runnablex){if(x==null)thrownewNullPointerException();RunnableScheledFuture<?>e=(RunnableScheledFuture<?>)x;finalReentrantLocklock=this.lock;lock.lock();try{inti=size;if(i>=queue.length)grow();size=i+1;if(i==0){queue[0]=e;setIndex(e,0);}else{siftUp(i,e);}if(queue[0]==e){leader=null;available.signal();}}finally{lock.unlock();}returntrue;}siftUp主要就是做小根堆的上移操作,从if (key.compareTo(e) >= 0) 看出,如果key大于parent索引的元素,则停止。
/***Createsanew{@codeScheledThreadPoolExecutor}withthe*givencorepoolsize.**@,even*iftheyareidle,unless{@codeallowCoreThreadTimeOut}isset*@{@codecorePoolSize<0}*/(intcorePoolSize){super(corePoolSize,Integer.MAX_VALUE,0,NANOSECONDS,newDelayedWorkQueue());}0poll函数
加锁
获取队列中索引为0的云元素,若果为null或者第一个元素的执行时间戳时间大于当前时间则直接返回null,否则调用finishPoll将第一个元素返回.
释放锁
/***Createsanew{@codeScheledThreadPoolExecutor}withthe*givencorepoolsize.**@,even*iftheyareidle,unless{@codeallowCoreThreadTimeOut}isset*@{@codecorePoolSize<0}*/(intcorePoolSize){super(corePoolSize,Integer.MAX_VALUE,0,NANOSECONDS,newDelayedWorkQueue());}1将队列size 减 1
获取队列中队列中最后一个元素,并且设置队列最后一个为null
最后一个元素不为null,则调用sfitdown进行,将最后一个元素设置到索引为0的位置,将下移操作,重新调整小根堆。
ScheledFutureTask的heapIndex为-1
/***Createsanew{@codeScheledThreadPoolExecutor}withthe*givencorepoolsize.**@,even*iftheyareidle,unless{@codeallowCoreThreadTimeOut}isset*@{@codecorePoolSize<0}*/(intcorePoolSize){super(corePoolSize,Integer.MAX_VALUE,0,NANOSECONDS,newDelayedWorkQueue());}2ScheledFutureTask的compareTo函数ScheledFutureTask实现compareTo方法逻辑
首先比较是否是同一个对象
若果是ScheledFutureTask对象,则比较time的大小,time是下一次执行的任务的时间戳,如果不是,则比较 getDelay的时间大小
/***Createsanew{@codeScheledThreadPoolExecutor}withthe*givencorepoolsize.**@,even*iftheyareidle,unless{@codeallowCoreThreadTimeOut}isset*@{@codecorePoolSize<0}*/(intcorePoolSize){super(corePoolSize,Integer.MAX_VALUE,0,NANOSECONDS,newDelayedWorkQueue());}3ScheledThreadPoolExecutor的take函数就是ThreadPoolExecutor的从任务队列中获取任务,没有任务则一直等待(这里是线程数小于核心线程数的情况)
加可中断锁
获取队列中第一个元素的任务,从前面可以知道此任务执行的时间戳最小的任务
如果第一个任务为空,则再全局的锁的条件锁上等待,
如果第一个任务不为空,则获取延迟时间,如果延时时间小于0,说明第一个任务已经到时间了,则返回第一个任务。
如果leader线程不为空,则让线程在全局锁的条件锁上等待
如果leader为空,则将获取第一个任务的当前线程赋值为leader变量。
在全局锁的条件锁上等待delay纳秒, 等待结束后,如果当前线程还是等于leader线程,则重置leader为空
最后判断 leader为空并且第一个任务不为空,则唤醒全局锁上条件锁的等待的线程。
释放全局锁。
/***Createsanew{@codeScheledThreadPoolExecutor}withthe*givencorepoolsize.**@,even*iftheyareidle,unless{@codeallowCoreThreadTimeOut}isset*@{@codecorePoolSize<0}*/(intcorePoolSize){super(corePoolSize,Integer.MAX_VALUE,0,NANOSECONDS,newDelayedWorkQueue());}4总结 综合前面所述,线程池从DelayedWorkQueue每次取出的任务就是延迟时间最小的任务, 若果到达时间的任务,则执行任务,否则则用条件锁Conditon的wait进行等待,执行完后,则用signal进行唤醒下一个任务的执行。
⑵ 什么是Quartz
要理解Quartz,我们先从Java自带的Timer开始。Timer类提供了简单的定时任务调度机制,允许您安排任务在指定时间点执行或以固定时间间隔重复执行。Timer基于后台线程和任务队列运行,当您调度一个任务时,该任务被添加到队列中并根据指定的时间点或间隔安排执行。
使用Timer时,您需遵循以下步骤:首先创建一个TimerTask对象,这是定义定时任务逻辑的基础。接着,使用Timer对象的schele()方法调度任务,该方法接受TimerTask对象和表示任务执行时间或间隔的参数。最后,如果需要取消任务,可以使用Timer的cancel()方法。
然而,从Java 5开始,推荐使用更加强大和灵活的ScheledExecutorService接口来替代Timer类,以支持更高级的任务调度需求。ScheledExecutorService接口扩展了ExecutorService接口,允许执行延迟任务或以固定时间间隔重复任务。Java提供了两个实现ScheledExecutorService接口的类:ScheledThreadPoolExecutor和SingleThreadScheledExecutor。
Quartz是一个功能强大的开源任务调度框架,专门用于在Java应用程序中执行作业和触发器的调度。Quartz包含许多组件和注解,用于定义和管理作业的调度和执行。其核心组件包括调度器、任务和触发器,分别负责任务的调度、执行和触发。
调度器(Scheler)是Quartz中的关键组件,负责将任务(Job)和触发器(Trigger)结合在一起,并按照触发器定义的时间触发任务执行。任务(Job)是一个接口,包含了一个名为void execute(JobExecutionContext context)的方法,用于定义需要执行的任务逻辑。JobDetail则是一个用于描述Job实现类及其相关静态信息的对象,如任务在scheler中的组名等。
触发器(Trigger)定义了触发事件,可以是固定时间点或固定时间间隔。触发器包括SimpleTrigger和CronTrigger两种类型,其中SimpleTrigger用于循环执行固定时间间隔的任务,而CronTrigger则允许根据Cron表达式定义出各种复杂的调度方案。此外,触发器可以与Calendar关联,用于排除在特定日历时间(如法定假日)内执行任务。
在Quartz中,任务状态包括start(开始)、stop(停止)、pause(暂停)和resume(重试)。SchelerFactory用于创建Scheler,提供两种方式:StdSchelerFactory用于读取classpath下的quartz.properties配置文件来实例化Scheler,而DirectSchelerFactory则允许在代码中直接配置Scheler参数。
JobDetailMap和Trigger中的JobDataMap在Quartz中允许您访问参数,以便在任务执行时访问和使用这些参数。任务类可以通过实现定时任务业务逻辑,并在调度器配置类中通过Spring Boot容器启动后自动启动任务调度来实现。监听配置文件并启动Scheler,任务调度即告完成。