非同步任務怎麼配置
㈠ springmvc怎樣非同步處理
Spring MVC 3.2開始引入Servlet 3中的基於非同步的處理request.往常是返回一個值,而現在是一個Controller方法可以返回一個java.util.concurrent.Callable對象和從Spring MVC的託管線程生產返回值.同時Servlet容器的主線程退出和釋放,允許處理其他請求。Spring MVC通過TaskExecutor的幫助調用Callable在一個單獨的線程。並且當這個Callable返回時,這個rquest被分配回Servlet容器使用由Callable的返回值繼續處理。
這里有這樣的Controller方法的一個例子:
@RequestMapping(method=RequestMethod.POST)
public Callable<String> processUpload(final MultipartFile file) {
return new Callable<String>() {
public String call() throws Exception {
// ...
return "someView";
}
};
}1234567891011
Controller方法的另外一個選擇是返回一個DeferredResult的實例。在這種情況下,返回值也將由多線程產生.i.e. 一個不是由Spring MVC託管。例如可能產生的結果在應對一些外部事件,比如JMS消息,一個計劃任務等等.這里有這樣的Controller方法的一個例子:
@RequestMapping("/quotes")
@ResponseBody
public DeferredResult<String> quotes() {
DeferredResult<String> deferredResult = new DeferredResult<String>();
// Save the deferredResult somewhere..
return deferredResult;
}
// In some other thread...
deferredResult.setResult(data);12345678910
如果你沒有了解過Servlet 3.0中的非同步處理request的特性,這可能有點難以理解.這肯定會幫助閱讀.這里有一些關於底層機制基本原理:
1、Servlet 3.0 非同步機制
一個ServletRequest可以通過調用 request.startAsync()被放到一個非同步的模塊中.這樣做的主要作用是Servlet,以及任何過濾器,可以退出,但是響應仍將允許開放直到處理完成後。
調用request.startAsync()返回AsyncContext可以用於進一步控制非同步處理。例如,它提供一個方法叫dispatch,這個類似於Servlet API,並且它允許應用程序通過Servlet容器線程繼續請求處理。
ServletRequest提供獲取當前DispatcherType的介面.DispatcherType可以用來區分處理初始請求,一個非同步分發,forward,以及其他分發類型。
記住上面的,下面是通過Callable非同步請求處理事件的序列:
Controller返回一個Callable對象.
Spring MVC開始非同步處理並且提交Callable到TaskExecutor在一個單獨的線程中進行處理。
DispatcherServlet與所有的Filter的Servlet容器線程退出,但Response仍然開放。
Callable產生結果並且Spring MVC分發請求給Servlet容器繼續處理.
DispatcherServlet再次被調用並且繼續非同步的處理由Callable產生的結果
DeferredResult的處理順序與Callable十分相似,由應用程序多線程產生非同步結果:
Controller返回一個DeferredResult對象,並且把它保存在內在隊列當中或者可以訪問它的列表中。
Spring MVC開始非同步處理.
DispatcherServlet與所有的Filter的Servlet容器線程退出,但Response仍然開放。
application通過多線程返回DeferredResult中sets值.並且Spring MVC分發request給Servlet容器.
DispatcherServlet再次被調用並且繼續非同步的處理產生的結果.
為進一步在非同步請求處理動機的背景,並且when或者why使用它請看this blog post series.
2、非同步請求的異常處理
當Callable執行一個Controller方法的時候帶有異常怎麼辦?簡單的回答就是:和當執行一個普通Controller方法帶有異常一樣.它通過有規律的異常處理機制。詳細點來說就是:當Callable帶有異常時,Spring MVC以這個Exception為結果分發給Servlet容器.並且引導帶有Exception處理request而不是Controller方法返回一個值。當使用DeferredResult時,你可以選擇是否把Exception實例通過調用setResult或者setErrorResult進行傳值.
3、攔截非同步請求
一個HandlerInterceptor可樣也可以實現AsyncHandlerInterceptor,通過實現它的方法進行回調.當進行非同步處理的時候,將會調用而不是postHandle和afterCompletion.
一個HandlerInterceptor同樣可以注冊CallableProcessingInterceptor或者一個用於更深度的集成非同步request的生命周期.例如處理一個timeout事件.你可以參看AsyncHandlerInterceptor的Javadoc了解更多細節.
DeferredResult類也提供了方法,像onTimeout(Runnable)和onCompletion(Runnable).可以看DeferredResult了解更多細節.
當使用Callable的時候,你可以通過WebAsyncTask來對它進行包裝.這樣也可以提供注冊timeout與completion方法.
4、HTTP Streaming
一個Controller方法可以通過使用DeferredResult與Callable來非同步的產生它的返回值.並且這個可以被用來實現long polling技術.就是伺服器可以推動事件盡快給客戶端。
如果當你需要在一個HTTP response中放入多個event怎麼辦?這個技術需要」Long Polling」.這就是所謂的」Long Polling」.Spring MVC通過使用ResponseBodyEmitter做為返回值來實現這種功能.這樣可以被用來發送多個Object。而不是使用@ResponseBody這樣。這樣每個對象都可以對應一種HttpMessageConverter被寫入到response中。
下面是一個簡單的例子:
@RequestMapping("/events")
public ResponseBodyEmitter handle() {
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
// Save the emitter somewhere..
return emitter;
}
// In some other thread
emitter.send("Hello once");
// and again later on
emitter.send("Hello again");
// and done at some point
emitter.complete();123456789101112131415
注意:ResponseBodyEmitter同樣也可以用做ResponseEntity中的body,用於定製化response的status與headers.
5、HTTP Streaming With Server-Sent Events
SseEmitter是ResponseBodyEmitter的子類,它提供Server-Sent Events.伺服器事件發送是」HTTP Streaming」的另一個變種技術.只是從伺服器發送的事件按照W3C Server-Sent Events規范來的.
Server-Sent Events能夠來用於它們的預期使用目的,就是從server發送events到clients.在Spring MVC中可以很容易的實現.僅僅需要返回一個SseEmitter類型的值.
注意:IE瀏覽器並不支持Server-Sent Events並且對於更高級的web應用程序消息傳遞場景:例如在線游戲,在線協作,金融應用以及其它.最好考慮使用Spring的WebSocket來支持.它包含SockJS-style WebSocket的模擬可以被廣泛的瀏覽器加高。(包含IE瀏覽器).在一個消息中心架構中通過發布-訂閱模型也能進行更高級別的與客戶消息傳遞模式進行交互.你可以通過看the following blog post了解更多.
6、HTTP Streaming Directly To The OutputStream
ResponseBodyEmitter允許通過HttpMessageConverter把發送的events寫到對象到response中.這可能是最常見的情況。例如寫JSON數據.可是有時候它被用來繞開message轉換直接寫入到response的OutputStream。例如文件下載.這樣可以通過返回StreamingResponseBody類型的值做到.
下面就是一個簡單的例子:
@RequestMapping("/download")
public StreamingResponseBody handle() {
return new StreamingResponseBody() {
@Override
public void writeTo(OutputStream outputStream) throws IOException {
// write...
}
};
}123456789
注意:StreamingResponseBody同樣可以用來作為ResponseEntity中的body用來定製化response的狀態與headers。
7、配置非同步請求
7.1 Servlet容器配置
保證web.xml中application的配置的版本是3.0:
<web-app xmlns="http://java.sun.com/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
http://java.sun.com/xml/ns/javaee
http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
version="3.0">
...
</web-app>123456789
可以通過web.xml中的子元素<async-supported>true</async-supported>使得DispatcherServlet支持非同步.此外的任何Filter參與非同步語法處理必須配置為支持ASYNC分派器類型。這樣可以確保Spring Framework提供的所有filter都能夠非同步分發.自從它們繼承了OncePerRequestFilter之後.並且在runtime的時候會check filter是否需要被非同步調用分發.
下面是web.xml的配置示例:
<web-app xmlns="http://java.sun.com/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://java.sun.com/xml/ns/javaee
http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
version="3.0">
<filter>
<filter-name>Spring OpenEntityManagerInViewFilter</filter-name>
<filter-class>org.springframework.~.OpenEntityManagerInViewFilter</filter-class>
<async-supported>true</async-supported>
</filter>
<filter-mapping>
<filter-name>Spring OpenEntityManagerInViewFilter</filter-name>
<url-pattern>/*</url-pattern>
<dispatcher>REQUEST</dispatcher>
<dispatcher>ASYNC</dispatcher>
</filter-mapping>
</web-app>
如果使用Sevlet3,Java配置可以通過WebApplicationInitializer,你同樣需要像在web.xml中一樣,設置」asyncSupported」標簽為ASYNC.為了簡化這個配置,考慮繼承或者。它們會自動設置這些選項,使它很容易注冊過濾器實例。
7.2. Spring MVC配置
Spring MVC提供Java Config與MVC namespace作為選擇用來配置處理非同步request.WebMvcConfigurer可以通過configureAsyncSupport來進行配置,而xml可以通過子元素來進行配置.
如果你不想依賴Servlet容器(e.g. Tomcat是10)配置的值,允許你配置非同步請求默認的timeout值。你可以配置AsyncTaskExecutor用來包含Callable實例作為controller方法的返回值.強烈建議配置這個屬性,因為在默認情況下Spring MVC使用SimpleAsyncTaskExecutor。Spring MVC中Java配置與namespace允許你注冊CallableProcessingInterceptor與實例.
如果你想覆蓋DeferredResult的默認過期時間,你可以選擇使用合適的構造器.同樣的,對於Callable,你可以通過WebAsyncTask來包裝它並且使用相應的構造器來定製化過期時間.WebAsyncTask的構造器同樣允許你提供一個AsyncTaskExecutor.
因為水平有限,翻譯不足之處還望見諒。
原文地址:spring-framework-reference-4.2.6.RELEASE
㈡ 如何使用asynctask實現非同步任務
AsyncTask(非同步任務處理) 在使用AsyncTask時處理類需要繼承AsyncTask,提供三個泛型參數,並且重載AsyncTask的四個方法(至少重載一個)。
三個泛型參數: 1.Param 任務執行器需要的數據類型 2.Progress 後台計算中使用的進度單位數據類型 3.R。
㈢ python 非同步任務隊列Celery 使用
在 Python 中定義 Celery 的時候,我們要引入 Broker,中文翻譯過來就是「中間人」的意思。在工頭(生產者)提出任務的時候,把所有的任務放到 Broker 裡面,在 Broker 的另外一頭,一群碼農(消費者)等著取出一個個任務准備著手做。這種模式註定了整個系統會是個開環系統,工頭對於碼農們把任務做的怎樣是不知情的。所以我們要引入 Backend 來保存每次任務的結果。這個 Backend 也是存儲任務的信息用的,只不過這里存的是那些任務的返回結果。我們可以選擇只讓錯誤執行的任務返回結果到 Backend,這樣我們取回結果,便可以知道有多少任務執行失敗了。
其實現架構如下圖所示:
可以看到,Celery 主要包含以下幾個模塊:
celery可以通過pip自動安裝。
broker 可選擇使用RabbitMQ/redis,backend可選擇使用RabbitMQ/redis/MongoDB。RabbitMQ/redis/mongoDB的安裝請參考對應的官方文檔。
------------------------------rabbitmq相關----------------------------------------------------------
官網安裝方法: http://www.rabbitmq.com/install-windows.html
啟動管理插件:sbin/rabbitmq-plugins enable rabbitmq_management 啟動rabbitmq:sbin/rabbitmq-server -detached
rabbitmq已經啟動,可以打開頁面來看看 地址: http://localhost:15672/#/
用戶名密碼都是guest 。進入可以看到具體頁面。 關於rabbitmq的配置,網上很多 自己去搜以下就ok了。
------------------------------rabbitmq相關--------------------------------------------------------
項目結構如下:
使用前,需要三個方面:celery配置,celery實例,需執行的任務函數,如下:
Celery 的配置比較多,可以在 官方配置文檔: http://docs.celeryproject.org/en/latest/userguide/configuration.html 查詢每個配置項的含義。
當然,要保證上述非同步任務and下述定時任務都能正常執行,就需要先啟動celery worker,啟動命令行如下:
需 啟動beat ,執行定時任務時, Celery會通過celery beat進程來完成。Celery beat會保持運行, 一旦到了某一定時任務需要執行時, Celery beat便將其加入到queue中. 不像worker進程, Celery beat只需要一個即可。而且為了避免有重復的任務被發送出去,所以Celery beat僅能有一個。
命令行啟動:
如果你想將celery worker/beat要放到後台運行,推薦可以扔給supervisor。
supervisor.conf如下:
㈣ 如何用python簡單的設計開發非同步任務調度隊列
首先,客戶端可以直接扔任務到一個web services的介面上 –》 web api接收到任務後,會根據客戶端的ip和時間戳做task_id,返回給客戶,緊接著在redis裡面標記這任務的狀態。 格式為 func,args,kwargs,timeout=xx,queue_level=xx,interval_time=xx
主服務端:
一個線程,會不停的掃描那個redis hash表,取出任務的interval_time後,進行取模,如果匹配成功,就會塞到 redis sorted set有續集和裡面。
主線程,會不停的看看sorted set裡面,有沒有比自己實現小的任務,有的話,執行並刪除。 這里的執行是用多進程,為毛用多進程,因為線程很多時候是不好控制強制幹掉的。 每個任務都會用multiprocessing的方式去執行,去調用的時候,會多傳進一個task_id,用來把相關的進度推送到redis裡面。 另外,fork進程後,我會得到一個pid,我會把pid和timeout的信息,存放到kill_hash裡面。 然後會不間斷的查看,在指定的timeout內,這pid還在不在,如果還是存在,沒有退出的話,說明他的任務不太正常,我們就可以在main(),裡面幹掉這些任務。
所謂的優先順序就是個 High + middle +Low 的三合一鏈條而已,我每次都會堅持從高到低取任務,如果你的High級別的任務不斷的話,那麼我會一直幹不了低級別的任務了。 代碼的體現是在redis sorted set這邊,設立三個有序集合,我的worker隊列會從high開始做……
那麼如果想幹掉一個任務是如何操作的,首先我需要在 kill_hash 裡面標記任務應該趕緊幹掉,在就是在task_hash裡面把那個task_id幹掉,好讓他不會被持續的加入待執行的隊列裡面。
㈤ 如何測試@Async非同步任務
spring3支持@Async註解的非同步任務,之前大家都是通過使用如線程池來完成,spring3也是使用這種方式,但更簡單。
其具體實現在:org.springframework.aop.interceptor.AsyncExecutionInterceptor,是一個方法攔截器,其invoke方法的部分代碼如下:
Java代碼
Future<?> result = determineAsyncExecutor(specificMethod).submit(
new Callable<Object>() {
public Object call() throws Exception {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
catch (Throwable ex) {
ReflectionUtils.rethrowException(ex);
}
return null;
}
});
即把當前任務的調用提交給線程池,很簡單。
1、測試無事務的非同步任務
這個相對來說比較簡單:
1.1、設置任務的返回值為Future:
Java代碼
public Future sendSystemMessage(Long[] receiverIds, Message message);
1.2、調用future.get();等待任務結束。
Java代碼
Future future = messageApi.sendSystemMessage(userIds, message);
future.get();
這個很簡單。
2、測試帶事務的非同步任務
因為是帶事務的,所以非同步任務肯定要啟動一個線程來執行任務,所以無法在主線程回滾,造成數據會commit到資料庫,這在集成測試時肯定是不行的;解決方案是移除非同步任務:
2.1、使用spring profile,在測試環境下不執行<task:annotation-driven>即可。
2.2、使用我提供的工具類,在測試時移除非同步支持即可:
Java代碼
//移除非同步支持
if(AopProxyUtils.isAsync(messageApi)) {
AopProxyUtils.removeAsync(messageApi);
}
測試類可以參考MessageApiServiceIT.java
工具類下載 AopProxyUtils.java
3、包級別測試
Java代碼
@Async
public void sendSystemMessage() {
sendSystemMessageInner();
}
void sendSystemMessageInner() {
//測試時測試這個方法即可
}
這樣測試時測試這個包級別的sendSystemMessageInner方法即可
其實更好的做法是spring內部提供支持,支持這樣非同步調用的測試。
㈥ 如何運用Spring框架的@Async實現非同步任務
非同步的方法有3種
1. 最簡單的非同步調用,返回值為void
2. 帶參數的非同步調用 非同步方法可以傳入參數
3. 異常調用返回Future
㈦ 利用php +swoole如何實現非同步任務隊列
class msgServer
{
private $serv;
function __construct()
{
$this->serv = new SwooleServer("127.0.0.1", 9501);//創建一個服務
$this->serv->set(array('task_worker_num' => 4)); //配置task進程的數量
$this->serv->on('receive', array($this, 'onReceive'));//有數據進來的時候執行
$this->serv->on('task', array($this, 'onTask'));//有任務的時候執行
$this->serv->on('finish', array($this, 'onFinish'));//任務結束時執行
$this->serv->start();
}
public function onReceive($serv, $fd, $from_id, $data)
{
$data = json_decode($data, true);
$task_id = $serv->task($data);//這里發起了任務,於是上面的on('task', array($this, 'onTask'))就會執行
}
public function onTask($serv, $task_id, $from_id, $data)
{
$data['send_res'] = $this->sendMsg($data); //發送簡訊
//1.7.3之前,是$serv->finish("result");
return "result.";//這里告訴任務結束,於是上面的on('finish', array($this, 'onFinish'))就會執行
}
public function onFinish($serv, $task_id, $data)
{
$this->addSendLog($data); //添加簡訊發送記錄
}
}
㈧ 關於java的Ajax請求到伺服器處理完畢不響應json數據到瀏覽器的問題。
建議把上傳後的數據非同步處理,不要同步處理,同時返回給前台一個非同步任務id,後續通過任務id查詢,數據處理情況
㈨ SpringBoot中開啟非同步任務
顧名思義,非同步指的是處理數據時、與主請求不在同一干線、分開支路處理請求,可以為用戶提供較為良好的體驗、在開發過程中,不影響正常使用的前提下,建議使用非同步來處理請求。
1、在主啟動類加上註解@EnableAsync
2、在要開啟非同步任務的方法上加註解@Async
3、非同步任務開啟成功
㈩ 定時任務類,非同步類一般放在哪個包下
一、定時任務
步驟:
1:在啟動類上寫@EnableScheling註解
2:在要定時任務的類上寫@component
3:在要定時執行的方法上寫@Scheled(fixedRate=毫秒數)。
二、非同步任務
步驟
1: 啟動類裡面使用@EnableAsync註解開啟功能,自動掃描
2:在要非同步任務的類上寫@component
3:在定義非同步任務類寫@Async(寫在類上代表整個類都是非同步,在方法加上代表該類非同步執行)