當前位置:首頁 » 編程語言 » java調度任務

java調度任務

發布時間: 2025-03-06 17:49:31

java線程池之ScheledThreadPoolExecutor實現原理

java中非同步周期任務調度有Timer,ScheledThreadPoolExecutor等實現,目前單機版的定時調度都是使用ScheledThreadPoolExecutor去實現,那麼它是如何實現周期執行任務的呢?其實它還是利用ThreadPoolExecutor線程池去執行任務,這一點從它是繼承自ThreadPoolExecutor救可以看的出來,其實關鍵在於如何實現任務的周期性調度,

ScheledThreadPoolExecutor類以及核心函數

首先ScheledThreadPoolExecutor是實現ScheledExecutorService介面,它主要定義了四個方法:

周期調度一個Runnable的對象

周期調度一個Callable的對象

固定周期調度Runnable對象 (不管上一次Runnable執行結束的時間,總是以固定延遲時間執行 即 上一個Runnable執行開始時候 + 延時時間 = 下一個Runnable執行的時間點)

以固定延遲調度unnable對象(當上一個Runnable執行結束後+固定延遲 = 下一個Runnable執行的時間點)

{publicScheledFuture<?>schele(Runnablecommand,longdelay,TimeUnitunit);public<V>ScheledFuture<V>schele(Callable<V>callable,longdelay,TimeUnitunit);publicScheledFuture<?>scheleAtFixedRate(Runnablecommand,longinitialDelay,longperiod,TimeUnitunit);publicScheledFuture<?>scheleWithFixedDelay(Runnablecommand,longinitialDelay,longdelay,TimeUnitunit);}

其次,ScheledThreadPoolExecutor是繼承ThreadPoolExecutor,所以它是藉助線程池的能力去執行任務,然後自身去實現周期性調度。從構造方法調用父類的線程池的構造方法,核心線程數是構造方法傳入,這里可以看到最大線程數是Integer的最大值即2147483647, 還有等待隊列是DelayedWorkQueue,它是實現延時的關鍵.

/***Createsanew{@codeScheledThreadPoolExecutor}withthe*givencorepoolsize.**@,even*iftheyareidle,unless{@codeallowCoreThreadTimeOut}isset*@{@codecorePoolSize<0}*/(intcorePoolSize){super(corePoolSize,Integer.MAX_VALUE,0,NANOSECONDS,newDelayedWorkQueue());}

scheleAtFixedRate是實現周期性調度的方法,調度任務就是實現Runnable對象, 以及系統的開始延時時間,周期的調度的間隔時間。

計算初始觸發時間和執行周期,並和傳入的Runnable對象作為參數封裝成 ScheledFutureTask,然後調用decorateTask裝飾Tas(默認實現為空)。

設置ScheledFutureTask對象outerTask為t(默認就是它自己)。

調用delayedExecute延遲執行任務。

publicScheledFuture<?>scheleAtFixedRate(Runnablecommand,longinitialDelay,longperiod,**TimeUnitunit){if(command==null||unit==null)thrownewNullPointerException();if(period<=0)();ScheledFutureTask<Void>sft=newScheledFutureTask<Void>(command,null,triggerTime(initialDelay,unit),unit.toNanos(period));RunnableScheledFuture<Void>t=decorateTask(command,sft);sft.outerTask=t;delayedExecute(t);returnt;}

判斷線程池狀態,如果不是處於running狀態,則拒絕該任務。

將該任務加入父類的延遲隊列(實際為初始化的DelayedWorkQueue對象)

再次判斷線程池不是處於running狀態,並且,判斷是否是處於shutdown狀態並且標志是否是true(默認是false,表示是否線程次處於shutdown狀態下是否繼續執行周期性任務),若果為true,則從隊列刪除任務,false,則確保啟動線程來執行周期性任務

privatevoiddelayedExecute(RunnableScheledFuture<?>task){if(isShutdown())reject(task);else{super.getQueue().add(task);if(isShutdown()&&!canRunInCurrentRunState(task.isPeriodic())&&remove(task))task.cancel(false);elseensurePrestart();}}

獲取線程池數量

如果小於核心線程數,則啟動核心線程執行任務,如果線程數為空,則啟動非核心線程

voidensurePrestart(){intwc=workerCountOf(ctl.get());if(wc<corePoolSize)addWorker(null,true);elseif(wc==0)addWorker(null,false);}ScheledFutureTask的run函數

獲取是否是周期性任務

判斷是否線程池狀態是否可以執行任務,如果為true,則取消任務 3 如果是非周期性任務,則直接調用父類FutureTask的run方法, 4 如果是周期性任務,則調用FutureTask的runAndReset函數, 如果該函數返回為true,則調用setNextRunTime設置下一次運行的時間, 並且還行reExecutePeriodic再次執行周期性任務。

publicvoidrun(){booleanperiodic=isPeriodic();if(!canRunInCurrentRunState(periodic))cancel(false);elseif(!periodic)ScheledFutureTask.super.run();elseif(ScheledFutureTask.super.runAndReset()){setNextRunTime();reExecutePeriodic(outerTask);}}

判斷線程池是否處於可執行任務的狀態,如果為true,則重新將設置下一次運行時間的任務加入父類的等待隊列,

如果線程池處於不可運行任務的狀態,則並且從等待隊列中移除成功, 調用任務的取消操作,否則調用ensurePrestart確保啟動線程執行任務

voidreExecutePeriodic(RunnableScheledFuture<?>task){if(canRunInCurrentRunState(true)){super.getQueue().add(task);if(!canRunInCurrentRunState(true)&&remove(task))task.cancel(false);elseensurePrestart();}}DelayedWorkQueue類核心函數

DelayedWorkQueue是繼承AbstractQueue,並實現BlockingQueue介面

<Runnable>implementsBlockingQueue<Runnable>{

核心欄位

//初始容量為_CAPACITY=16;//等待隊列,只能保存RunnableScheledFuture對象privateRunnableScheledFuture<?>[]queue=newRunnableScheledFuture<?>[INITIAL_CAPACITY];//鎖privatefinalReentrantLocklock=newReentrantLock();//對俄大小privateintsize=0;//leader線程,表示最近需要執行的任務的線程。privateThreadleader=null;//條件鎖=lock.newCondition();

offer函數:

將添加的參數轉換成RunnableScheledFuture對象。

加全局鎖。

獲取當前隊列的size,如果等於隊列的長度,則嗲用grow擴容,增加50%的數組長度。

size加1。

如果數組為0,則將加入的對象放在索引為0的位置,然後設置ScheledFutureTask的heapIndex的索引(便於後續快速刪除)。

調用siftUp做堆的上浮操作,這里是小根堆的操作。

如果隊列中第一個元素是傳入的對象,則將laader設置null

釋放鎖

返回true

publicbooleanoffer(Runnablex){if(x==null)thrownewNullPointerException();RunnableScheledFuture<?>e=(RunnableScheledFuture<?>)x;finalReentrantLocklock=this.lock;lock.lock();try{inti=size;if(i>=queue.length)grow();size=i+1;if(i==0){queue[0]=e;setIndex(e,0);}else{siftUp(i,e);}if(queue[0]==e){leader=null;available.signal();}}finally{lock.unlock();}returntrue;}

siftUp主要就是做小根堆的上移操作,從if (key.compareTo(e) >= 0) 看出,如果key大於parent索引的元素,則停止。

/***Createsanew{@codeScheledThreadPoolExecutor}withthe*givencorepoolsize.**@,even*iftheyareidle,unless{@codeallowCoreThreadTimeOut}isset*@{@codecorePoolSize<0}*/(intcorePoolSize){super(corePoolSize,Integer.MAX_VALUE,0,NANOSECONDS,newDelayedWorkQueue());}0

poll函數

加鎖

獲取隊列中索引為0的雲元素,若果為null或者第一個元素的執行時間戳時間大於當前時間則直接返回null,否則調用finishPoll將第一個元素返回.

釋放鎖

/***Createsanew{@codeScheledThreadPoolExecutor}withthe*givencorepoolsize.**@,even*iftheyareidle,unless{@codeallowCoreThreadTimeOut}isset*@{@codecorePoolSize<0}*/(intcorePoolSize){super(corePoolSize,Integer.MAX_VALUE,0,NANOSECONDS,newDelayedWorkQueue());}1

將隊列size 減 1

獲取隊列中隊列中最後一個元素,並且設置隊列最後一個為null

最後一個元素不為null,則調用sfitdown進行,將最後一個元素設置到索引為0的位置,將下移操作,重新調整小根堆。

ScheledFutureTask的heapIndex為-1

/***Createsanew{@codeScheledThreadPoolExecutor}withthe*givencorepoolsize.**@,even*iftheyareidle,unless{@codeallowCoreThreadTimeOut}isset*@{@codecorePoolSize<0}*/(intcorePoolSize){super(corePoolSize,Integer.MAX_VALUE,0,NANOSECONDS,newDelayedWorkQueue());}2ScheledFutureTask的compareTo函數

ScheledFutureTask實現compareTo方法邏輯

首先比較是否是同一個對象

若果是ScheledFutureTask對象,則比較time的大小,time是下一次執行的任務的時間戳,如果不是,則比較 getDelay的時間大小

/***Createsanew{@codeScheledThreadPoolExecutor}withthe*givencorepoolsize.**@,even*iftheyareidle,unless{@codeallowCoreThreadTimeOut}isset*@{@codecorePoolSize<0}*/(intcorePoolSize){super(corePoolSize,Integer.MAX_VALUE,0,NANOSECONDS,newDelayedWorkQueue());}3

ScheledThreadPoolExecutor的take函數就是ThreadPoolExecutor的從任務隊列中獲取任務,沒有任務則一直等待(這里是線程數小於核心線程數的情況)

加可中斷鎖

獲取隊列中第一個元素的任務,從前面可以知道此任務執行的時間戳最小的任務

如果第一個任務為空,則再全局的鎖的條件鎖上等待,

如果第一個任務不為空,則獲取延遲時間,如果延時時間小於0,說明第一個任務已經到時間了,則返回第一個任務。

如果leader線程不為空,則讓線程在全局鎖的條件鎖上等待

如果leader為空,則將獲取第一個任務的當前線程賦值為leader變數。

在全局鎖的條件鎖上等待delay納秒, 等待結束後,如果當前線程還是等於leader線程,則重置leader為空

最後判斷 leader為空並且第一個任務不為空,則喚醒全局鎖上條件鎖的等待的線程。

釋放全局鎖。

/***Createsanew{@codeScheledThreadPoolExecutor}withthe*givencorepoolsize.**@,even*iftheyareidle,unless{@codeallowCoreThreadTimeOut}isset*@{@codecorePoolSize<0}*/(intcorePoolSize){super(corePoolSize,Integer.MAX_VALUE,0,NANOSECONDS,newDelayedWorkQueue());}4

總結 綜合前面所述,線程池從DelayedWorkQueue每次取出的任務就是延遲時間最小的任務, 若果到達時間的任務,則執行任務,否則則用條件鎖Conditon的wait進行等待,執行完後,則用signal進行喚醒下一個任務的執行。

⑵ 什麼是Quartz

要理解Quartz,我們先從Java自帶的Timer開始。Timer類提供了簡單的定時任務調度機制,允許您安排任務在指定時間點執行或以固定時間間隔重復執行。Timer基於後台線程和任務隊列運行,當您調度一個任務時,該任務被添加到隊列中並根據指定的時間點或間隔安排執行。

使用Timer時,您需遵循以下步驟:首先創建一個TimerTask對象,這是定義定時任務邏輯的基礎。接著,使用Timer對象的schele()方法調度任務,該方法接受TimerTask對象和表示任務執行時間或間隔的參數。最後,如果需要取消任務,可以使用Timer的cancel()方法。

然而,從Java 5開始,推薦使用更加強大和靈活的ScheledExecutorService介面來替代Timer類,以支持更高級的任務調度需求。ScheledExecutorService介面擴展了ExecutorService介面,允許執行延遲任務或以固定時間間隔重復任務。Java提供了兩個實現ScheledExecutorService介面的類:ScheledThreadPoolExecutor和SingleThreadScheledExecutor。

Quartz是一個功能強大的開源任務調度框架,專門用於在Java應用程序中執行作業和觸發器的調度。Quartz包含許多組件和註解,用於定義和管理作業的調度和執行。其核心組件包括調度器、任務和觸發器,分別負責任務的調度、執行和觸發。

調度器(Scheler)是Quartz中的關鍵組件,負責將任務(Job)和觸發器(Trigger)結合在一起,並按照觸發器定義的時間觸發任務執行。任務(Job)是一個介面,包含了一個名為void execute(JobExecutionContext context)的方法,用於定義需要執行的任務邏輯。JobDetail則是一個用於描述Job實現類及其相關靜態信息的對象,如任務在scheler中的組名等。

觸發器(Trigger)定義了觸發事件,可以是固定時間點或固定時間間隔。觸發器包括SimpleTrigger和CronTrigger兩種類型,其中SimpleTrigger用於循環執行固定時間間隔的任務,而CronTrigger則允許根據Cron表達式定義出各種復雜的調度方案。此外,觸發器可以與Calendar關聯,用於排除在特定日歷時間(如法定假日)內執行任務。

在Quartz中,任務狀態包括start(開始)、stop(停止)、pause(暫停)和resume(重試)。SchelerFactory用於創建Scheler,提供兩種方式:StdSchelerFactory用於讀取classpath下的quartz.properties配置文件來實例化Scheler,而DirectSchelerFactory則允許在代碼中直接配置Scheler參數。

JobDetailMap和Trigger中的JobDataMap在Quartz中允許您訪問參數,以便在任務執行時訪問和使用這些參數。任務類可以通過實現定時任務業務邏輯,並在調度器配置類中通過Spring Boot容器啟動後自動啟動任務調度來實現。監聽配置文件並啟動Scheler,任務調度即告完成。

熱點內容
在b站清緩存 發布:2025-03-06 21:21:51 瀏覽:494
安卓系統中國哪裡下載 發布:2025-03-06 21:21:42 瀏覽:924
幀緩存定義 發布:2025-03-06 21:20:21 瀏覽:703
3d動畫解壓密碼 發布:2025-03-06 21:20:14 瀏覽:56
安卓系統2K哪裡下載 發布:2025-03-06 21:14:04 瀏覽:943
如何在雲主機上搭建web伺服器 發布:2025-03-06 21:09:05 瀏覽:591
電腦改為伺服器有什麼用 發布:2025-03-06 21:00:38 瀏覽:270
網站在文件夾 發布:2025-03-06 20:51:46 瀏覽:116
阿瑪尼行李箱密碼鎖如何換密碼 發布:2025-03-06 20:46:02 瀏覽:106
xp共享文件夾win7無法訪問 發布:2025-03-06 20:35:40 瀏覽:589