搶單任務源碼
Ⅰ #抬抬小手學Python# 用別人代碼完成我的工作,愉快摸魚「附源碼」
模塊是一個概念,它包含 1~N 個文件,如果文件是 Python 代碼文件(就是.py 結尾的文件),那每個文件中可以包含函數,類等內容。
在公司工作,很多項目都是協作開發來完成,一個項目後面可能存在很多工程師,為了開發方便,每個人負責的功能函數或者類都盡量封裝在一個 模塊 中,模塊英文請記住 mole ,有的地方叫做 庫 ,也有的地方叫做 包(package) ,對於現階段的你來說,當成一樣的內容就好。
互聯網上存在大量的開源模塊,這些模塊最大的優勢就是免費,很多時候使用這些模塊能極大的提高編碼效率,這也是很多人喜歡 Python 的原因之一。
模塊學習的過程,不能按照語法結構來學習,它是一種抽象的知識,是一種代碼的設計方式。例如將寫好的函授放到模塊中。
接下來就將上面的函數整合到一個模塊中去,建立一個新的文件 stir_fry.py 然後將兩個函數復制到新的文件中。
stir_fry.py 文件包含那兩個函數
好了,完成任務,一個模塊創建完畢了,這個 stir_fry.py 文件就是一個模塊。
你現在腦中肯定出現黑人問號臉了,What?這就完了。是的,完了,一個低配模塊完成。
下面就可以拿著這個模塊給別人使用去了。會寫模塊成為大佬之後,就可以給新入行的菜鳥指點江山,寫模塊了。
在另一個文件中,可以通過 import 模塊名 導入一個模塊,例如導入剛才創建的 stir_fry 模塊。
注意要新建一個文件,文件名隨意但是不要與模塊同名。
如果想要使用模塊中的函數,只需要參考下述語法格式即可。
通過 stir_fry 調用模塊中的函數。
當通過 import stir_fry 導入模塊之後,該模塊內的所有函數都一次性導入到新文件中了。
如果不想導入模塊的所有函數,而只導入某個函數,使用一下語法可以解決該問題。
修改上一節案例:
直接導入模塊中的函數,使用時不需要通過 模塊名. 的方式調用,直接書寫函數名即可。
導入模塊中多個函數
語法格式如下:
導入模塊所有函數
語法格式如下:
剛才通過模塊導入函數你應該發現一個潛在的問題,就是函數名稱太長怎麼辦,除了名稱太長,還存在一種情況,模塊中的函數名稱與當前文件中函數的名稱,存在重名的風險。此時可以學習一個新的內容,通過 as 給模塊導入進來的函數起個別名,然後在該文件都使用別名進行編碼。
語法格式如下:
上述內容應用到案例中如下述代碼:
as 別名也可直接作用於模塊,語法格式如下:
隨著程序設計變的越來越復雜,只把函數放到模塊中已經不能滿足要求了,需要將更高級的內容放到模塊中,也就是類。
首先在 dog_mole.py 文件中定義一個類。
此時的 dog_mole 就是模塊的名稱,而在該模塊中只有一個類 Dog ,也可以在該模塊中多創建幾個類,例如:
與導入模塊的函數部分知識一樣,如果希望導入一個模塊中的類,可以直接通過下述語法格式實現:
使用模塊中的類,語法格式如下:
具體代碼不在演示,自行完成吧。
導入模塊的類和導入模塊的的函數用法是一致的。
新建一個 demo.py 文件,在該文件導入 dog_mole 模塊中的類。
從模塊中導入多個類
該方式與函數的導入也一致,語法格式如下:
導入模塊中所有類
學到這里,你應該已經發現導入模塊中的函數與導入模塊中的類,從代碼編寫的角度幾乎看不出區別,對比著學習即可。
導入類的時候也可以應用別名,同樣使用 as 語法。
學習到這里你對模塊是什麼,模塊怎麼用已經有了一個基本認知,接下來先不用自己寫一個特別牛的模塊,我們先把一些常見的模塊應用起來。
通過隨機數模塊可以獲取到一個數字,它的使用場景非常廣,例如 游戲 相關開發、驗證碼相關、抽獎相關,學習了隨機數之後可以完成一些非常不錯的小案例。
randint 方法
導入隨機數模塊之後,可以通過 randint 方法隨機生成一個整數,例如下述代碼:
反復運行代碼會得到一個 1~10 之間的數字,由此可以 randint 方法中的參數含義。
choice 方法
通過 choice 方法可以配合列表實現一些效果,choice 可以隨機返回列表中的一個元素。
如果你想知道 choice 方法的具體用法,還記得怎麼查詢嗎?
shuffle 方法
該方法可以將一個列表的順序打亂。
簡單挑選了 random 模塊中的三個方法做為說明,對於模塊的學習,後面將為每個模塊單開一篇文章書寫。
時間模塊是 Python 中非常重要的一個內置模塊,很多場景都離不開它,內置模塊就是 Python 安裝好之後自帶的模塊。
time 方法
time 模塊主要用於操作時間,該方法中存在一個 time 對象,使用 time 方法之後,可以獲取從 1970年1月1日 00:00:00 到現在的秒數,很多地方會稱作時間戳。
輸出內容:
sleep 方法
該方法可以讓程序暫停,該方法的參數是的單位是 秒 。
使用語法格式為:
asctime 與 localtime 方法
以上兩個方法都可以返回當前系統時間,只是展示的形式不同。
time 模塊涉及的方法先只涉及這么多,後續滾雪球學習過程中在繼續補充。
Python 還內置了很多模塊,例如 sys 模塊、os 模塊、json 模塊、pickle 模塊、shelve 模塊、xml 模塊、re 模塊、logging 模塊等等內容,後續都將逐步學習到,有可能需要分開專題給大家講解。
Python 模塊,快速編碼的一種途徑,很多時候第三方模塊可以幫你解決大多數常見編碼場景,讓你在編碼的道路上飛奔。
Ⅱ 開發一套微信小程序搶單系統價格多少
要看選擇的開發方式,比如:
1、自己組建技術團隊自己開發,需要的人員有產品經理、框架工程師、java、PHP、前端、後端、測試工程師,開發周期在1-2個月。人員成本10-20萬,後期維護成本沒算。(不推薦)
2、購買別人的小程序源碼,並且自己配置伺服器,再找個技術人員專職維護。源碼費用一般10000-30000,伺服器一年至少3000,維護成本每月6000以上。(不推薦)
3、使用第三方小程序,購買第三方小程序使用賬號,總費用根據自身需求,費用2000以內到幾千元不等,不用擔心技術維護、不用建伺服器,拿過來就可以使用,還可以根據自己的搭建要求設計店鋪和綁定公眾號。(推薦)
Ⅲ 求java題源代碼,最好有注釋,
import java.awt.*;
import java.awt.event.*;
import java.awt.geom.*;
import java.util.*;
import javax.swing.*;
/**
* 多線程,小球演示. 打開Windows任務管理器,可看到線程變化。 可搜索到,run()方法/.start()
*
* : 程序技巧體會: 所謂產生一個小球,即是 new 其類對象,其屬性攜帶畫小球的 坐標、顏色、所在容器 等參數。
*
* 一個類,屬性用來作為參數容器用, 方法....完成功能。
* */
// 運行類
public class BouncePress {
//
public static void main(String[] args) {
JFrame frame = new BouncePressFrame(); // 生成窗口。執行構造。-----業務邏輯。
frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE); // similar to
// window
// listener
frame.show();
}
}
class BouncePressFrame extends JFrame {
private BallPressCanvas canvas;
public BouncePressFrame() {
setSize(600, 500); // 窗口大小
setTitle("Bounce Ball");
Container contentPane = getContentPane(); // Swing的窗口不能直接放入東西,只能在其上的ContentPane上放。
canvas = new BallPressCanvas(); // 生成一個新面板。-----canvas
contentPane.add(canvas, BorderLayout.CENTER); // 窗口中心 加入該面板。
JPanel buttonPanel = new JPanel(); // 再生成一個新面板。----buttonPanel
// 調用本類方法addButton。
addButton(buttonPanel, "Start", // 生成一個按鈕"Start"---加入面板buttonPanel
new ActionListener() { // |------>按鈕綁上 action監聽器。
public void actionPerformed(ActionEvent evt) { // | 小球容器對象的
addBall(Thread.NORM_PRIORITY - 4, Color.black); // 事件處理時,執行---addBall()方法。--->產生小球(參數對象)--->加入List中--->開始畫球。
}
}); // 按一次,addBall()一次--->產生一個新小球--->加入List中--->開始畫此新小球。
// --->畫球線程BallPressThread的run()--->小球(參數對象).move()--->每次畫時,先移動,再判斷,再畫。
// --->BallPressCanvas類的canvas對象.paint()--->自動調BallPressCanvas類的paintComponent(Graphics
// g)方法。
// --->該方法,從List中循環取出所有小球,第i個球,--->調該小球BallPress類
// .draw()方法--->調Graphics2D方法畫出小球。--使用color/
addButton(buttonPanel, "Express", new ActionListener() {
public void actionPerformed(ActionEvent evt) {
addBall(Thread.NORM_PRIORITY + 2, Color.red);
}
});
addButton(buttonPanel, "Close", new ActionListener() {
public void actionPerformed(ActionEvent evt) {
System.exit(0);
}
});
contentPane.add(buttonPanel, BorderLayout.SOUTH);
}
public void addButton(Container c, String title, ActionListener listener) {
JButton button = new JButton(title); // 生成一個按鈕。
c.add(button); // 加入容器中。
button.addActionListener(listener); // 按鈕綁上 action監聽器。
}
/** 主要業務方法。 */
public void addBall(int priority, Color color) {
// 生成 小球(參數對象)
BallPress b = new BallPress(canvas, color); // 生成BallPress對象,攜帶、初始化
// 畫Ball形小球,所需參數:所在容器組件,所需color--black/red.
// 小球加入 List中。
canvas.add(b); // 面板canvas 的ArrayList中 加入BallPress對象。
BallPressThread thread = new BallPressThread(b); // 生成畫小球的線程類BallPressThread對象。傳入BallPress對象(攜帶了畫球所需
// 容器、color參數)。
thread.setPriority(priority);
thread.start(); // call run(), ball start to move
// 畫球線程開始。--->BallPressThread的run()--->小球(參數對象).move()--->先移動,再畫。canvas.paint--->BallPressCanvas類的
}
}
// 畫球的線程類。
class BallPressThread extends Thread {
private BallPress b;
public BallPressThread(BallPress aBall) {
b = aBall;
}
// 畫球開始。
public void run() {
try {
for (int i = 1; i <= 1000; i++) { // 畫1000次。
b.move(); // 每次畫時,先移動,再判斷,再畫。
sleep(5); // 所以移動比Bounce.java的球慢。
}
} catch (InterruptedException e) {
}
}
}
// swing面板類.
// 作用1) 本類面板對象.paint()方法---->自動繪制面板,且自動調paintComponent(Graphics
// g)方法,--->重寫該方法,繪制面板(及其上組件)。
// 作用2) 該類對象 屬性ArrayList balls---兼作小球(參數對象)的容器。
class BallPressCanvas extends JPanel {
private ArrayList balls = new ArrayList();
public void add(BallPress b) {
balls.add(b); // 向ArrayList中添加球。當按下按鈕,添加多個球時,都保存在這個List中。
}
// 重寫了 javax.swing.JComponent的 paintComponent()方法。
// paint()方法自動調用該方法。
public void paintComponent(Graphics g) {
super.paintComponent(g);
Graphics2D g2 = (Graphics2D) g;
for (int i = 0; i < balls.size(); i++) { // 循環
BallPress b = (BallPress) balls.get(i); // 從List中取出第i個球,
b.draw(g2); // 畫此球。
}
}
}
/**
* 畫出球。
*
* 在 canvas上畫出,color色的小球圖形。
*
* 屬性,可用於攜帶畫小球所需參數。
*
*
*
* @author congan
*
*/
class BallPress {
private Component canvas;
private Color color;
private int x = 0;
private int y = 0;
private int dx = 2;
private int dy = 2;
// 構造 初始化 容器 顏色 參數。
public BallPress(Component c, Color aColor) {
canvas = c;
color = aColor;
}
// 制定位置,畫出小球。
public void draw(Graphics2D g2) {
g2.setColor(color);
g2.fill(new Ellipse2D.Double(x, y, 15, 15)); // ellipse:橢圓形
}
// 移動小球。
// 每次畫時,先移動,再判斷,再畫。
// 該方法每次執行,畫小球的起點坐標 (x,y), 每次各自+2, 即斜向右下運動。
public void move() {
x += dx; // x=x+dx; 畫小球的起點坐標 (x,y), 每次各自+2, 即斜向右下運動。
y += dy; // y=y+dy;
if (x < 0) { // 小球已到左邊框。保證,從左邊框開始畫。
x = 0;
dx = -dx; // 小球橫坐標變化值取反。開始反向運動。
}
if (x + 15 >= canvas.getWidth()) { // 小球右邊已經到畫板右邊。
x = canvas.getWidth() - 15;
dx = -dx; // 開始反向運動。
}
if (y < 0) { // 保證,從頂框開始畫。
y = 0;
dy = -dy;
}
if (y + 15 >= canvas.getHeight()) { // 小球已到畫板頂。
y = canvas.getHeight() - 15;
dy = -dy;
}
canvas.paint(canvas.getGraphics()); // 畫出面板對象canvas----(及其上所有組件)
// //.paint()方法,自動調用
}
}
/*import java.awt.*;
import java.awt.event.*;
import java.awt.geom.*;
import java.util.*;
import javax.swing.*;
*//**
* 單線程,小球演示 搜索不到,run()方法/.start()
*//*
public class Bounce {
public static void main(String[] args) {
JFrame frame = new BounceFrame();
frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE); // similar to
// window
// listener
frame.show();
}
}
不懂的再問啊。。。
Ⅳ 任務協作系統源碼和團隊協作源碼是一樣的嗎
有點差別,但共同點都是協作軟體源碼。Workless可量化的團隊協作 軟體提供源碼出售。
Ⅳ 源代碼什麼意思
源代碼(也稱源程序),是指一系列人類可讀的計算機語言指令。 在現代程序語言中,源代碼可以是以書籍或者磁帶的形式出現,但最為常用的格式是文本文件,這種典型格式的目的是為了編譯出計算機程序。計算機源代碼的最終目的是將人類可讀的文本翻譯成為計算機可以執行的二進制指令,這種過程叫做編譯,通過編譯器完成。
代碼組合
源代碼作為軟體的特殊部分,可能被包含在一個或多個文件中。一個程序不必用同一種格式的源代碼書寫。例如,一個程序如果有C語言庫的支持,那麼就可以用C語言;而另一部分為了達到比較高的運行效率,則可以用匯編語言編寫。
較為復雜的軟體,一般需要數十種甚至上百種的源代碼的參與。為了降低種復雜度,必須引入一種可以描述各個源代碼之間聯系,並且如何正確編譯的系統。在這樣的背景下,修訂控制系統(RCS)誕生了,並成為研發者對代碼修訂的必備工具之一。
還有另外一種組合:源代碼的編寫和編譯分別在不同的平台上實現,專業術語叫做軟體移植。
質量
對於計算機而言,並不存在真正意義上的「好」的源代碼;然而作為一個人,好的書寫習慣將決定源代碼的好壞。源代碼是否具有可讀性,成為好壞的重要標准。軟體文檔則是表明可讀性的關鍵。
源代碼主要功用有如下2種作用:
1、生成目標代碼,即計算機可以識別的代碼。
2、對軟體進行說明,即對軟體的編寫進行說明。為數不少的初學者,甚至少數有經驗的程序員都忽視軟體說明的編寫,因為這部分雖然不會在生成的程序中直接顯示,也不參與編譯。但是說明對軟體的學習、分享、維護和軟體復用都有巨大的好處。因此,書寫軟體說明在業界被認為是能創造優秀程序的良好習慣,一些公司也硬性規定必須書寫。
(需要指出的是,源代碼的修改不能改變已經生成的目標代碼。如果需要目標代碼做出相應的修改,必須重新編譯。 )
如果按照源代碼類型區分軟體,通常被分為兩類:自由軟體和非自由軟體。自由軟體一般是不僅可以免費得到,而且公開源代碼;相對應地,非自由軟體則是不公開源代碼。所有一切通過非正常手段獲得非自由軟體源代碼的行為都將被視為非法。
Ⅵ XXL admin 源碼解析
xxl-job 的 admin 服務是 xxl-job 的調度中心,負責管理和調度注冊的 job,關於 xxl-job 的使用,可以閱讀 「參考閱讀」 中的《XXL-JOB分布式調度框架全面詳解》,這里主要是介紹 admin 中的源碼。
admin 服務除了管理頁面上的一些介面外,還有一些核心功能,比如:
1、根據 job 的配置,自動調度 job;
2、接收 executor 實例的請求,實現注冊和下線;
3、監視失敗的 job,進行重試;
4、結束一些異常的 job;
5、清理和統計日誌;
這些功能都是在 admin 服務啟動後,在後台自動運行的,下面將詳細介紹 admin 服務這些功能敏罩的實現。
XxlJobAdminConfig 是 admin 服務的配置類,在 admin 服務啟動時,它除了配置 admin 服務的一些參數外,還會啟動 admin 服務的所有後台線程。
該類的屬性主要分為5類:
1、配置文件中的參數,比如 accessToken;
2、DAO 層各個數據表的 mapper;
3、Spring 容器中的一些 Bean,比如 JobAlarmer、DataSource 等;
4、私有變數 XxlJobScheler 對象;
5、私有靜態變數 adminConfig,指向實例自身。
該類有兩個重要方法,分別實現自介面 InitializingBean、DisposableBean,作用如下:
這兩個方法分別調用了 XxlJobScheler 對象的 init 、 destroy 方法,源碼如下:
XxlJobAdminConfig 作為 admin 服務的配置類,作用就是在 Spring 容器啟動時,調用 XxlJobScheler 的初始化方法,來初始化和啟動 admin 服務的功能。
XxlJobScheler 的作用就是調用各個輔助類(xxxHelper)來啟動和結束不同的線程和功能,初始化方法 init 的代碼如下:
下面我們主要介紹 init 中各個類及其作用,最後再簡單一下介紹 destroy 的作用纖拿森。
當 admin 服務向 executor 實例發出一個調度請求來執行 job 時,會調用 XxlJobTrigger.trigger() 方法把要傳輸的參數(比如 job_id、jobHandler、job_log_id、阻塞策略等,包裝成 TriggerParam 對象)傳給 ExecutorBiz 對象來執行一次調度。
xxl-job 對調度過程做了兩個優化:
JobTriggerPoolHelper 在 toStart 方法中初始化了它的兩個線程池屬性,代碼如下:
每次有調度請求時,就會在這兩個線程池中創建線程,創建線程的邏輯在 addTrigger 方法中。
不同 job 存在執行時長的差異,為了避免不同耗時 job 之間相互阻塞,xxl-job 根據 job 的響應時間,對 job 進行了區分,主要體現在:
如果快 job 與調用頻繁的慢 job 在同一個線程池中創建線程,慢 job 會佔用大量的線程,導致快 job 線程不能及時運行,降低了線程池和線程的利用率。xxl-job 通過快慢隔離,避免了這個問題。
不能,因為慢 job 還是會佔用大量線程,搶佔了快 job 的線程資源;增加線程池中的線程數不但沒有提升利用率,還會導致大量線程看空閑,利用率反而降低了。最好的方法還是用兩個線程池把兩者隔離,可以合理地使用各自線程池的資源。
為了記錄慢 job 的超時次毀畝數,代碼中使用一個 map(變數 jobTimeoutCountMap )來記錄一分鍾內 job 超時次數,key 值是 job_id,value 是超時次數。在調用 XxlJobTrigger.trigger() 方法之前,會先判斷 map 中,該 job_id 的超時次數是否大於 10,如果大於10,就是使用 slowTriggerPool,代碼如下:
調用 XxlJobTrigger.trigger() 方法後,根據兩個值來更新 jobTimeoutCountMap 的值:
和上面的代碼相結合,一個 job 在一分鍾內有10次調用超過 500 毫秒,就認為該 job 是一個 頻繁調度且耗時的 job。
代碼如下:
在該類中,屬性變數 minTim 和 jobTimeoutCountMap 都使用 volatile 來修飾,保證了並發調用 addTrigger 時數據的一致性和可見性。
admin 服務發起 job 調度請求時,是在靜態方法 public static void trigger() 中調用靜態變數 private static JobTriggerPoolHelper helper 的 addTrigger 方法來發起請求的。minTim 和 jobTimeoutCountMap 雖然不是 static 修飾的,但可以看做是全局唯一的(因為持有它們的對象是全局唯一的),因此這兩個參數維護的是 admin 服務全局的調度時間和超時次數,為了避免記錄的數據量過大,需要每分鍾清空一次數據的操作。
admin 服務提供了介面給 executor 來注冊和下線,另外,當 executor 長時間(90秒)沒有發心跳時,要把 executor 自動下線。前一個功能通過暴露一個介面來接收請求,後一個功能需要開啟一個線程,定時更新過期 executor 的狀態。
xxl-job 為了提升 admin 服務的性能,在前一個功能的介面接收到 executor 的請求時,不是同步執行,而是在線程池中開啟一個線程,非同步執行 executor 的注冊和下線請求。
JobRegistryHelper 類就負責管理這個線程池和定時線程的。
線程池的定義和初始化代碼如下:
executor 實例在發起注冊和下線請求時,會調用 AdminBizImpl 類的對應方法,該類的方法如下:
可以看到,AdminBizImpl 類的兩個方法都是調用了 JobRegistryHelper 方法來實現,其中 JobRegistryHelper.registry 方法代碼如下(registryRemove 代碼與之相似):
這兩個方法是通過在線程池 registryOrRemoveThreadPool 中創建線程來非同步執行請求,然後把數據更新或新建到數據表 xxl_job_registry 中。
當 executor 注冊到 admin 服務後(數據入庫到 xxl_job_registry 表),是不會在頁面上顯示的,需要要用戶手動添加 job_group 數據(添加到 xxl_job_group 表),admin 服務會自動把用戶添加的 job_group 數據與 xxl_job_registry 數據關聯。這就需要 admin 定時從 xxl_job_group 表讀取數據,關聯 xxl_job_registry 表和 xxl_job_group 表的數據。
這個功能是與 「executor 自動下線」 功能在同一個線程中實現,該線程的主要邏輯是:
相關代碼如下:
從這里可以看出,如果是對外介面(接收請求等)的功能,使用線程池和非同步線程來實現;如果是一些自動任務,則是通過一個線程來定時執行。
如果一個 Job 調度後,沒有響應返回,需要定時重試。作為一種「自動執行」的任務,很顯然可以像前面 JobRegistryHelper 一樣,使用一個線程定時重試。
在這個類中,定義了一個監視線程,以每10 秒一次的頻率運行,對失敗的 job 進行重試。如果 job 剩餘的重試次數大於0,就會 job 進行重試,並把發送告警信息。線程的定義如下:
在這個線程中,它利用 「資料庫執行 UPDATE 語句時會加上互斥鎖」 的特性,使用了 「基於資料庫的分布式鎖」,代碼如下所示:
在這個語句中,會把 jobLog 的狀態設置為 -1,這是一個無效狀態值,當其他線程通過有效狀態值來搜索失敗記錄時,會略過該記錄,這樣該記錄就不會被其他線程重試,達到的分布式鎖的功能(這個鎖是一個行鎖)。或者說,-1狀態類似於 java 中的對象頭的鎖標志位,表明該記錄已經被加鎖了,其他線程會「忽略」該記錄。
在 try 代碼塊中加鎖和解鎖,如果加鎖後重試時拋出異常,會導致該記錄永遠無法解鎖。所以,應該在 finnally 塊中執行解鎖操作,或者使用 redis 給鎖加一個過期時間來實現分布式鎖。
從失敗的日誌中取出 jobId,查詢出對應的 jobInfo 數據,如果日誌中的剩餘重試次數大於 0,就執行重試。代碼如下:
調度任務使用的就是前面介紹的 JobTriggerPoolHelper.trigger 方法,最後更新 jobLog 的 alarm_status 值,有兩個作用:
這個類與 JobRegistryHelper 類似,都有一個線程池、一個線程,通過前面 JobRegistryHelper 的學習,可以大膽猜測:
實際上,該類中線程池和線程的作用就是用來 「完成」 一個 job。
當 executor 接收到 admin 的調度請求後,會非同步執行 job,並立刻返回一個回調。
admin 接受到回調後,和前面的 「注冊、下線」 一樣,在線程池中創建線程來處理回調,主要是更新 job 和日誌。
當有回調請求時, public callback 方法(該方法被 AdminBizImpl 調用)會在線程池中創建一個線程,遍歷回調請求的參數列表,依次處理回調參數,代碼如下:
從代碼可以看出,最後調用 XxlJobCompleter.updateHandleInfoAndFinish 方法完成回調邏輯。
如果一個 job 較長時間前被調度,但是一直處於 「運行中」 且它所屬的 executor 已經超過 90 秒沒有心跳了,那麼可以認為該 job 已經丟失了,需要把該 job 結束掉。這個就是線程 monitorThread 的主要功能。
monitorThread 會以 60秒 一次的頻率,從 xxl_job_log 表中找出 10分鍾前調度、仍處於」運行中「狀態、executor 已經下線 的 job,然後調用 XxlJobCompleter.updateHandleInfoAndFinish 來更新 handler 的信息和結束 job,代碼如下:
從代碼可以看出,上面的兩個功能最後都調用了 XxlJobCompleter.updateHandleInfoAndFinish 方法,關於該方法的介紹,可以看後面 XxlJobCompleter 部分的介紹,這里不詳細展開。
如果去看 XxlJobTrigger.triger 方法,會發現每次調度 job 時,都會先新增一個 jobLog 記錄,這也是為什麼 JobFailMonitorHelper 中的線程在重試時,先查詢 jobLog 的原因。
JobLog 作為 job 的調度記錄,還可以用來統計一段時間內 job 的調度次數、成功數等;另外,會清理超出有效期(配置的參數 logretentiondays )的日誌,避免日誌數據過大。很顯然,這又是一個 」自動任務「,可以使用一個線程定時完成。
該類持有一個線程變數,線程以 每分鍾一次的頻率,執行兩個操作:
在線程 run 方法的前半部分,線程會統計 3 天內,每天的調度次數、運行次數、成功運行數、失敗次數;然後更新或新增 xxl_job_log_report 表的數據。
在線程 run 方法的後半部分,線程按天對日誌進行清理,如果當前時間與上次清理的時間相隔超過一天,就會清理日誌記錄,代碼如下:
如果不使用參數 lastCleanLogTime 來記錄上次清理的時間,只是清理一天前創建的數據記錄。那麼該線程每分鍾執行一次時,都會刪除前天當前時刻的數據,導致前一年的數不完整。
使用參數 lastCleanLogTime 來記錄上次清理的時間,並且與當前時間相差超過一天時才清理,能保證前一天的日誌是完整的。
不明白為什麼清理日誌時,不是一次性刪除全部的過期日誌,而是每次刪除 1000條。按理說,這些舊的日誌數據應該已經不在 buffer pool 中了,trigger_time 欄位又是普通索引,那麼 DELETE 操作會先更新到 change buffer 中,之後再合並。現在先查詢再刪除,相當於多了一次 IO 且沒有使用到 change buffer。
admin 服務是用來管理和調度 job 的,用戶也可以在它的管理後台新建一個 job,配置 CRON 和 JobHandler,然後 admin 服務就會按照配置的參數來調度 job。很顯然,這種「自動化工作」也是由線程定時執行的。
1、如果使用線程調度 Job,存在的第一個問題是:如果某個 Job 在調度時比較耗時,就可能阻塞後續的 Job,導致後續 job 的執行有延遲,怎麼解決這個問題?
在前面 JobTriggerPoolHelper 我們已經知道,admin 在調度 job 時是 」使用線程池、線程「 非同步執行調度任務,避免了主線程的阻塞。
2、使用線程定時調度 job,存在的第二個問題是:怎麼保證 job 在指定的時間執行,而不會出現大量延遲?
admin 使用 」預讀「 的方式,提前讀取在未來一段時間內要執行的 job,提前取到內存中,並使用 「時間輪演算法」 按時間分組 job,把未來要執行的 job 下一個時間段執行。
3、還隱藏第三個問題:admin 服務是可以多實例部署的,在這種情況下該怎麼避免一個 job 被多個實例重復調度?
admin 把一張數據表作為 「分布式鎖」 來保證只有一個 admin 實例能執行 job 調度,又通過隨機 sleep 線程一段時間,來降低線程之間的競爭。
下面我們就通過代碼來了解 xxl-job 是怎麼解決上述問題的。
在該類中,定義了一個調度線程,用來調度要執行的 job 和已經過期一段時間的 job,定義代碼如下:
該線程會預讀出 「下次執行時間 <= now + 5000 毫秒內」 的部分 job,根據它們下一次執行時間劃分成三段,執行三種不同的邏輯。
1、下次執行時間在 (- , now - 5000) 范圍內
說明過期時間已經大於 5000 毫秒,這時如果過期策略要求調度,就調度一次。代碼如下:
2、下次執行時間在 [now - 5000, now) 范圍內
說明過期時間小於5000毫秒,只能算是延遲不能算是過期,直接調度一次,代碼如下:
如果 job 的下一次執行時間在 5000 毫秒以內,為了省下下次預讀的 IO 耗時,這里會記錄下 job id,等待後面的調度。
3、下次執行時間在 [now, now + 5000) 范圍內
說明還沒到執行時間,先記錄下 job id, 等待後面的調度 ,代碼如下:
上面的3個步驟結束後,會更新 jobInfo 的 trigger_last_time、trigger_next_time、trigger_status 欄位:
可以看到,通過預讀,一方面會把過期一小段時間的 job 執行一遍,另一方面會把未來一小段時間內要執行的 job 取出,保存進一個 map 對象 ringData 中,等待另一個線程調度。這樣就避免了某些 job 到了時間還沒執行。
因為 admin 是可以多實例部署的,所以在調度 job 時,需要考慮怎麼避免 job 被多次調度。
xxl-job 在前面 JobFailMonitorHelper 中遍歷失敗的 job 時,會對每個 job 設置一個無效的狀態作為 」分布式行鎖「,如果設置失敗就跳過。而在這里,如果還使用該方法,有可能出現,一個 job 被設置為無效狀態後,線程就崩潰了,導致該 job 永遠無法被調度。因此,要盡量避免對 job 狀態的修改。
在這里,admin 服務使用一張表 xxl_job_lock 作為分布式鎖,每個 admin 實例都要先嘗試獲取該表的鎖,獲取成功才能繼續執行;同時,為了降低不同實例之間的競爭,會在線程開始執勤隨機 sleep 一段時間。
如何獲取分布式鎖?
在線程中會開啟一個事務,設置為手動提交,然後對表 xxl_job_lock 執行 FOR UPDATE 查詢。如果該線程執行語句成功,其他實例的線程就會排隊等待該表的鎖,實現了分布式鎖功能。代碼如下:
怎麼降低鎖的競爭?
為了降低鎖競爭,在線程開始前會先 sleep 4000 5000 毫秒的隨機值(不能大於 5000 毫秒,5000 毫秒是預讀的時間范圍);在線程結束當前循環時,會根據耗時和是否有預讀數據,選擇不同的 sleep 策略:
代碼如下:
在前面的線程中,對即將要開始的 job,不是立刻調度,而是按照執行的時刻(秒),把 job id 保存進一個 map 中,然後由 ringThread 線程按時刻進行調度,這只典型的「時間輪演算法」。代碼如下:
每次輪詢調度時,只取出當前時刻(秒)、前一秒內的 job,不會去調度與現在相隔太久的 job。
在執行輪詢調度前,有一個時間在 0 1000 毫秒范圍內的 sleep。如果沒有這個 sleep,該線程會一直執行,而 ringData 中當前時刻(秒)的數據可能已經為空,會導致大量無效的操作;增加了這個 sleep 之後,可以避免這種無效的操作。之所以 sleep 時間在 1000 毫秒以內,是因為調度時刻最小精確到秒,一秒的 sleep 可以避免 job 的延遲。
因為在前面的 scheleThread 線程中,最後一個操作是把 job 的 next_trigger_time 值更新為大於 now + 5000 毫秒,其他 admin 實例 scheleThread 線程的查詢條件是:next_trigger_time < now + 5000,不會查詢出這里調度的 job,所以不需要加分布式鎖。
至此,XxlJobScheler-init 方法的作用我們介紹完畢,下面我們簡單介紹一下 XxlJobScheler-destroy 方法
destroy 方法很簡單,就是銷毀前面初始化的線程池和線程,它銷毀的順序與前面啟動的順序相反。
代碼如下:
因為各個 toStop 方法都很相似,所以我們只介紹 JobScheleHelper 的 toStop 方法。
該方法的步驟如下:
1、設置停止標志位為 true;
2、sleep 一段時間,讓出 CPU 時間片給線程執行任務;
3、如果線程不是終止狀態(線程正在 sleep),中斷它;
4、線程執行 join 方法,直到線程結束,執行最後一次。
代碼如下:
至此,JobScheleHelper 的主要功能就介紹完了,可以看出, admin 服務在啟動時,啟動了多個線程池和線程,非同步執行任務和非同步響應 executor 的請求。
下面,我們介紹前面涉及到的 XxlJobTrigger 和 XxlJobCompleter。
XxlJobTrigger 是調度 job 時的封裝類,它主要工作就是接受傳入的 jobId、調度參數等,查詢對應的 jobGroup、jobInfo,然後調用 ExecutorBiz 對象來執行調度(run 方法)。
該類中三個核心方法及其調用關系如下: trigger -> processTrigger -> runExecutor ,
該方法的功能比較簡單,就是根據傳入的參數查詢 jobGroup 和 jobInfo 對象,設置相關的欄位值,然後調用 processTrigger 方法。
該方法的主要工作分為以下幾步:
1、保存一條調度日誌;
2、從 jobInfo、jobGroup 中取出欄位值,構造 TriggerParam 對象;
3、根據 jobInfo 的路由策略,從 jobGroup 中取出要調度的 executor 地址;
4、調用 runExecutor 方法執行調度;
5、保存調度參數、設置調度信息、更新日誌。
這里不會修改 jobInfo、jobGroup 對象的欄位值,只取出欄位值來使用,對這兩個對象欄位的修改,是在前一步 trigger 方法中進行的。
該方法會執行調度,並返回調度結果,它的核心代碼如下:
這里使用 XxlJobScheler 類取出 ExecutorBiz 對象,以 「懶載入」 的方式給每個 address 創建一個 ExecutorBiz 對象,代碼如下:
可以看出,該類中的三個方法其實可以歸類為:pre -> execute -> post,在執行前、執行時、執行後做一些前置和收尾工作。
該類在前面 JobCompleteHelper 中被使用,最終 job 的完成就是在該類中執行的,該類有兩個主要方法:
下面主要介紹 finishJob 方法。
finishJob 的主要功能是:如果當前任務執行成功了,就調度它的所有子任務,最後把子任務的調度消息添加到當前 job 的日誌中。代碼如下:
需要注意的是:
1、這里依賴於 JobTriggerPoolHelper 來調度 job,所以在 JobCompleteHelper 的監視線程開始時,有一個 50 秒的等待,就是等待 JobTriggerPoolHelper 啟動完成;
2、在 finishJob 方法中,調度子任務的時候,默認子任務的調度結果是成功,注意,這里是指 「調度」 這個行為是成功的,而不是指子任務執行是成功的。
1、XxlJobAdminConfig 作為 admin 服務的啟動入口,要盡可能保持簡潔,作用類似於一個倉庫,來管理和持有所有的類和對象,並不會去啟動具體的線程,它只需要「按下啟動器的按鈕」就可以了;
2、XxlJobScheler 是 admin 服務的啟動器類,它會調用各個輔助類(xxxHelper)來啟動對應的線程;
3、對外的介面,比如調度 job、接收注冊或下線等,都是使用線程池 + 線程 的非同步方式實現,避免 job 對主線程的阻塞;
4、對「自動任務「類的功能,都是使用線程定時執行;
XXL-JOB分布式調度框架全面詳解: https://juejin.cn/post/6948397386926391333
時間輪演算法:https://spongecaptain.cool/post/widget/timingwheel
一個開源的時間輪演算法介紹:https://spongecaptain.cool/post/widget/timingwheel2