异步任务怎么配置
㈠ springmvc怎样异步处理
Spring MVC 3.2开始引入Servlet 3中的基于异步的处理request.往常是返回一个值,而现在是一个Controller方法可以返回一个java.util.concurrent.Callable对象和从Spring MVC的托管线程生产返回值.同时Servlet容器的主线程退出和释放,允许处理其他请求。Spring MVC通过TaskExecutor的帮助调用Callable在一个单独的线程。并且当这个Callable返回时,这个rquest被分配回Servlet容器使用由Callable的返回值继续处理。
这里有这样的Controller方法的一个例子:
@RequestMapping(method=RequestMethod.POST)
public Callable<String> processUpload(final MultipartFile file) {
return new Callable<String>() {
public String call() throws Exception {
// ...
return "someView";
}
};
}1234567891011
Controller方法的另外一个选择是返回一个DeferredResult的实例。在这种情况下,返回值也将由多线程产生.i.e. 一个不是由Spring MVC托管。例如可能产生的结果在应对一些外部事件,比如JMS消息,一个计划任务等等.这里有这样的Controller方法的一个例子:
@RequestMapping("/quotes")
@ResponseBody
public DeferredResult<String> quotes() {
DeferredResult<String> deferredResult = new DeferredResult<String>();
// Save the deferredResult somewhere..
return deferredResult;
}
// In some other thread...
deferredResult.setResult(data);12345678910
如果你没有了解过Servlet 3.0中的异步处理request的特性,这可能有点难以理解.这肯定会帮助阅读.这里有一些关于底层机制基本原理:
1、Servlet 3.0 异步机制
一个ServletRequest可以通过调用 request.startAsync()被放到一个异步的模块中.这样做的主要作用是Servlet,以及任何过滤器,可以退出,但是响应仍将允许开放直到处理完成后。
调用request.startAsync()返回AsyncContext可以用于进一步控制异步处理。例如,它提供一个方法叫dispatch,这个类似于Servlet API,并且它允许应用程序通过Servlet容器线程继续请求处理。
ServletRequest提供获取当前DispatcherType的接口.DispatcherType可以用来区分处理初始请求,一个异步分发,forward,以及其他分发类型。
记住上面的,下面是通过Callable异步请求处理事件的序列:
Controller返回一个Callable对象.
Spring MVC开始异步处理并且提交Callable到TaskExecutor在一个单独的线程中进行处理。
DispatcherServlet与所有的Filter的Servlet容器线程退出,但Response仍然开放。
Callable产生结果并且Spring MVC分发请求给Servlet容器继续处理.
DispatcherServlet再次被调用并且继续异步的处理由Callable产生的结果
DeferredResult的处理顺序与Callable十分相似,由应用程序多线程产生异步结果:
Controller返回一个DeferredResult对象,并且把它保存在内在队列当中或者可以访问它的列表中。
Spring MVC开始异步处理.
DispatcherServlet与所有的Filter的Servlet容器线程退出,但Response仍然开放。
application通过多线程返回DeferredResult中sets值.并且Spring MVC分发request给Servlet容器.
DispatcherServlet再次被调用并且继续异步的处理产生的结果.
为进一步在异步请求处理动机的背景,并且when或者why使用它请看this blog post series.
2、异步请求的异常处理
当Callable执行一个Controller方法的时候带有异常怎么办?简单的回答就是:和当执行一个普通Controller方法带有异常一样.它通过有规律的异常处理机制。详细点来说就是:当Callable带有异常时,Spring MVC以这个Exception为结果分发给Servlet容器.并且引导带有Exception处理request而不是Controller方法返回一个值。当使用DeferredResult时,你可以选择是否把Exception实例通过调用setResult或者setErrorResult进行传值.
3、拦截异步请求
一个HandlerInterceptor可样也可以实现AsyncHandlerInterceptor,通过实现它的方法进行回调.当进行异步处理的时候,将会调用而不是postHandle和afterCompletion.
一个HandlerInterceptor同样可以注册CallableProcessingInterceptor或者一个用于更深度的集成异步request的生命周期.例如处理一个timeout事件.你可以参看AsyncHandlerInterceptor的Javadoc了解更多细节.
DeferredResult类也提供了方法,像onTimeout(Runnable)和onCompletion(Runnable).可以看DeferredResult了解更多细节.
当使用Callable的时候,你可以通过WebAsyncTask来对它进行包装.这样也可以提供注册timeout与completion方法.
4、HTTP Streaming
一个Controller方法可以通过使用DeferredResult与Callable来异步的产生它的返回值.并且这个可以被用来实现long polling技术.就是服务器可以推动事件尽快给客户端。
如果当你需要在一个HTTP response中放入多个event怎么办?这个技术需要”Long Polling”.这就是所谓的”Long Polling”.Spring MVC通过使用ResponseBodyEmitter做为返回值来实现这种功能.这样可以被用来发送多个Object。而不是使用@ResponseBody这样。这样每个对象都可以对应一种HttpMessageConverter被写入到response中。
下面是一个简单的例子:
@RequestMapping("/events")
public ResponseBodyEmitter handle() {
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
// Save the emitter somewhere..
return emitter;
}
// In some other thread
emitter.send("Hello once");
// and again later on
emitter.send("Hello again");
// and done at some point
emitter.complete();123456789101112131415
注意:ResponseBodyEmitter同样也可以用做ResponseEntity中的body,用于定制化response的status与headers.
5、HTTP Streaming With Server-Sent Events
SseEmitter是ResponseBodyEmitter的子类,它提供Server-Sent Events.服务器事件发送是”HTTP Streaming”的另一个变种技术.只是从服务器发送的事件按照W3C Server-Sent Events规范来的.
Server-Sent Events能够来用于它们的预期使用目的,就是从server发送events到clients.在Spring MVC中可以很容易的实现.仅仅需要返回一个SseEmitter类型的值.
注意:IE浏览器并不支持Server-Sent Events并且对于更高级的web应用程序消息传递场景:例如在线游戏,在线协作,金融应用以及其它.最好考虑使用Spring的WebSocket来支持.它包含SockJS-style WebSocket的仿真可以被广泛的浏览器加高。(包含IE浏览器).在一个消息中心架构中通过发布-订阅模型也能进行更高级别的与客户消息传递模式进行交互.你可以通过看the following blog post了解更多.
6、HTTP Streaming Directly To The OutputStream
ResponseBodyEmitter允许通过HttpMessageConverter把发送的events写到对象到response中.这可能是最常见的情况。例如写JSON数据.可是有时候它被用来绕开message转换直接写入到response的OutputStream。例如文件下载.这样可以通过返回StreamingResponseBody类型的值做到.
下面就是一个简单的例子:
@RequestMapping("/download")
public StreamingResponseBody handle() {
return new StreamingResponseBody() {
@Override
public void writeTo(OutputStream outputStream) throws IOException {
// write...
}
};
}123456789
注意:StreamingResponseBody同样可以用来作为ResponseEntity中的body用来定制化response的状态与headers。
7、配置异步请求
7.1 Servlet容器配置
保证web.xml中application的配置的版本是3.0:
<web-app xmlns="http://java.sun.com/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
http://java.sun.com/xml/ns/javaee
http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
version="3.0">
...
</web-app>123456789
可以通过web.xml中的子元素<async-supported>true</async-supported>使得DispatcherServlet支持异步.此外的任何Filter参与异步语法处理必须配置为支持ASYNC分派器类型。这样可以确保Spring Framework提供的所有filter都能够异步分发.自从它们继承了OncePerRequestFilter之后.并且在runtime的时候会check filter是否需要被异步调用分发.
下面是web.xml的配置示例:
<web-app xmlns="http://java.sun.com/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://java.sun.com/xml/ns/javaee
http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
version="3.0">
<filter>
<filter-name>Spring OpenEntityManagerInViewFilter</filter-name>
<filter-class>org.springframework.~.OpenEntityManagerInViewFilter</filter-class>
<async-supported>true</async-supported>
</filter>
<filter-mapping>
<filter-name>Spring OpenEntityManagerInViewFilter</filter-name>
<url-pattern>/*</url-pattern>
<dispatcher>REQUEST</dispatcher>
<dispatcher>ASYNC</dispatcher>
</filter-mapping>
</web-app>
如果使用Sevlet3,Java配置可以通过WebApplicationInitializer,你同样需要像在web.xml中一样,设置”asyncSupported”标签为ASYNC.为了简化这个配置,考虑继承或者。它们会自动设置这些选项,使它很容易注册过滤器实例。
7.2. Spring MVC配置
Spring MVC提供Java Config与MVC namespace作为选择用来配置处理异步request.WebMvcConfigurer可以通过configureAsyncSupport来进行配置,而xml可以通过子元素来进行配置.
如果你不想依赖Servlet容器(e.g. Tomcat是10)配置的值,允许你配置异步请求默认的timeout值。你可以配置AsyncTaskExecutor用来包含Callable实例作为controller方法的返回值.强烈建议配置这个属性,因为在默认情况下Spring MVC使用SimpleAsyncTaskExecutor。Spring MVC中Java配置与namespace允许你注册CallableProcessingInterceptor与实例.
如果你想覆盖DeferredResult的默认过期时间,你可以选择使用合适的构造器.同样的,对于Callable,你可以通过WebAsyncTask来包装它并且使用相应的构造器来定制化过期时间.WebAsyncTask的构造器同样允许你提供一个AsyncTaskExecutor.
因为水平有限,翻译不足之处还望见谅。
原文地址:spring-framework-reference-4.2.6.RELEASE
㈡ 如何使用asynctask实现异步任务
AsyncTask(异步任务处理) 在使用AsyncTask时处理类需要继承AsyncTask,提供三个泛型参数,并且重载AsyncTask的四个方法(至少重载一个)。
三个泛型参数: 1.Param 任务执行器需要的数据类型 2.Progress 后台计算中使用的进度单位数据类型 3.R。
㈢ python 异步任务队列Celery 使用
在 Python 中定义 Celery 的时候,我们要引入 Broker,中文翻译过来就是“中间人”的意思。在工头(生产者)提出任务的时候,把所有的任务放到 Broker 里面,在 Broker 的另外一头,一群码农(消费者)等着取出一个个任务准备着手做。这种模式注定了整个系统会是个开环系统,工头对于码农们把任务做的怎样是不知情的。所以我们要引入 Backend 来保存每次任务的结果。这个 Backend 也是存储任务的信息用的,只不过这里存的是那些任务的返回结果。我们可以选择只让错误执行的任务返回结果到 Backend,这样我们取回结果,便可以知道有多少任务执行失败了。
其实现架构如下图所示:
可以看到,Celery 主要包含以下几个模块:
celery可以通过pip自动安装。
broker 可选择使用RabbitMQ/redis,backend可选择使用RabbitMQ/redis/MongoDB。RabbitMQ/redis/mongoDB的安装请参考对应的官方文档。
------------------------------rabbitmq相关----------------------------------------------------------
官网安装方法: http://www.rabbitmq.com/install-windows.html
启动管理插件:sbin/rabbitmq-plugins enable rabbitmq_management 启动rabbitmq:sbin/rabbitmq-server -detached
rabbitmq已经启动,可以打开页面来看看 地址: http://localhost:15672/#/
用户名密码都是guest 。进入可以看到具体页面。 关于rabbitmq的配置,网上很多 自己去搜以下就ok了。
------------------------------rabbitmq相关--------------------------------------------------------
项目结构如下:
使用前,需要三个方面:celery配置,celery实例,需执行的任务函数,如下:
Celery 的配置比较多,可以在 官方配置文档: http://docs.celeryproject.org/en/latest/userguide/configuration.html 查询每个配置项的含义。
当然,要保证上述异步任务and下述定时任务都能正常执行,就需要先启动celery worker,启动命令行如下:
需 启动beat ,执行定时任务时, Celery会通过celery beat进程来完成。Celery beat会保持运行, 一旦到了某一定时任务需要执行时, Celery beat便将其加入到queue中. 不像worker进程, Celery beat只需要一个即可。而且为了避免有重复的任务被发送出去,所以Celery beat仅能有一个。
命令行启动:
如果你想将celery worker/beat要放到后台运行,推荐可以扔给supervisor。
supervisor.conf如下:
㈣ 如何用python简单的设计开发异步任务调度队列
首先,客户端可以直接扔任务到一个web services的接口上 –》 web api接收到任务后,会根据客户端的ip和时间戳做task_id,返回给客户,紧接着在redis里面标记这任务的状态。 格式为 func,args,kwargs,timeout=xx,queue_level=xx,interval_time=xx
主服务端:
一个线程,会不停的扫描那个redis hash表,取出任务的interval_time后,进行取模,如果匹配成功,就会塞到 redis sorted set有续集和里面。
主线程,会不停的看看sorted set里面,有没有比自己实现小的任务,有的话,执行并删除。 这里的执行是用多进程,为毛用多进程,因为线程很多时候是不好控制强制干掉的。 每个任务都会用multiprocessing的方式去执行,去调用的时候,会多传进一个task_id,用来把相关的进度推送到redis里面。 另外,fork进程后,我会得到一个pid,我会把pid和timeout的信息,存放到kill_hash里面。 然后会不间断的查看,在指定的timeout内,这pid还在不在,如果还是存在,没有退出的话,说明他的任务不太正常,我们就可以在main(),里面干掉这些任务。
所谓的优先级就是个 High + middle +Low 的三合一链条而已,我每次都会坚持从高到低取任务,如果你的High级别的任务不断的话,那么我会一直干不了低级别的任务了。 代码的体现是在redis sorted set这边,设立三个有序集合,我的worker队列会从high开始做……
那么如果想干掉一个任务是如何操作的,首先我需要在 kill_hash 里面标记任务应该赶紧干掉,在就是在task_hash里面把那个task_id干掉,好让他不会被持续的加入待执行的队列里面。
㈤ 如何测试@Async异步任务
spring3支持@Async注解的异步任务,之前大家都是通过使用如线程池来完成,spring3也是使用这种方式,但更简单。
其具体实现在:org.springframework.aop.interceptor.AsyncExecutionInterceptor,是一个方法拦截器,其invoke方法的部分代码如下:
Java代码
Future<?> result = determineAsyncExecutor(specificMethod).submit(
new Callable<Object>() {
public Object call() throws Exception {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
catch (Throwable ex) {
ReflectionUtils.rethrowException(ex);
}
return null;
}
});
即把当前任务的调用提交给线程池,很简单。
1、测试无事务的异步任务
这个相对来说比较简单:
1.1、设置任务的返回值为Future:
Java代码
public Future sendSystemMessage(Long[] receiverIds, Message message);
1.2、调用future.get();等待任务结束。
Java代码
Future future = messageApi.sendSystemMessage(userIds, message);
future.get();
这个很简单。
2、测试带事务的异步任务
因为是带事务的,所以异步任务肯定要启动一个线程来执行任务,所以无法在主线程回滚,造成数据会commit到数据库,这在集成测试时肯定是不行的;解决方案是移除异步任务:
2.1、使用spring profile,在测试环境下不执行<task:annotation-driven>即可。
2.2、使用我提供的工具类,在测试时移除异步支持即可:
Java代码
//移除异步支持
if(AopProxyUtils.isAsync(messageApi)) {
AopProxyUtils.removeAsync(messageApi);
}
测试类可以参考MessageApiServiceIT.java
工具类下载 AopProxyUtils.java
3、包级别测试
Java代码
@Async
public void sendSystemMessage() {
sendSystemMessageInner();
}
void sendSystemMessageInner() {
//测试时测试这个方法即可
}
这样测试时测试这个包级别的sendSystemMessageInner方法即可
其实更好的做法是spring内部提供支持,支持这样异步调用的测试。
㈥ 如何运用Spring框架的@Async实现异步任务
异步的方法有3种
1. 最简单的异步调用,返回值为void
2. 带参数的异步调用 异步方法可以传入参数
3. 异常调用返回Future
㈦ 利用php +swoole如何实现异步任务队列
class msgServer
{
private $serv;
function __construct()
{
$this->serv = new SwooleServer("127.0.0.1", 9501);//创建一个服务
$this->serv->set(array('task_worker_num' => 4)); //配置task进程的数量
$this->serv->on('receive', array($this, 'onReceive'));//有数据进来的时候执行
$this->serv->on('task', array($this, 'onTask'));//有任务的时候执行
$this->serv->on('finish', array($this, 'onFinish'));//任务结束时执行
$this->serv->start();
}
public function onReceive($serv, $fd, $from_id, $data)
{
$data = json_decode($data, true);
$task_id = $serv->task($data);//这里发起了任务,于是上面的on('task', array($this, 'onTask'))就会执行
}
public function onTask($serv, $task_id, $from_id, $data)
{
$data['send_res'] = $this->sendMsg($data); //发送短信
//1.7.3之前,是$serv->finish("result");
return "result.";//这里告诉任务结束,于是上面的on('finish', array($this, 'onFinish'))就会执行
}
public function onFinish($serv, $task_id, $data)
{
$this->addSendLog($data); //添加短信发送记录
}
}
㈧ 关于java的Ajax请求到服务器处理完毕不响应json数据到浏览器的问题。
建议把上传后的数据异步处理,不要同步处理,同时返回给前台一个异步任务id,后续通过任务id查询,数据处理情况
㈨ SpringBoot中开启异步任务
顾名思义,异步指的是处理数据时、与主请求不在同一干线、分开支路处理请求,可以为用户提供较为良好的体验、在开发过程中,不影响正常使用的前提下,建议使用异步来处理请求。
1、在主启动类加上注解@EnableAsync
2、在要开启异步任务的方法上加注解@Async
3、异步任务开启成功
㈩ 定时任务类,异步类一般放在哪个包下
一、定时任务
步骤:
1:在启动类上写@EnableScheling注解
2:在要定时任务的类上写@component
3:在要定时执行的方法上写@Scheled(fixedRate=毫秒数)。
二、异步任务
步骤
1: 启动类里面使用@EnableAsync注解开启功能,自动扫描
2:在要异步任务的类上写@component
3:在定义异步任务类写@Async(写在类上代表整个类都是异步,在方法加上代表该类异步执行)