springboot如何配置mq
㈠ spring boot 使用scheler要不要用activemq
越 女 詞(
㈡ rabbitmq+spring boot,我在配置中配置了重試間隔時間,但是不起作用是什麼原因
在介紹具體的實現思路之前,我們先來介紹一下RabbitMQ的兩個特性,一個是Time-To-Live Extensions,另一個是Dead Letter Exchanges。
Time-To-Live Extensions
RabbitMQ允許我們為消息或者隊列設置TTL(time to live),也就是過期時間。TTL表明了一條消息可在隊列中存活的最大時間,單位為毫秒。也就是說,當某條消息被設置了TTL或者當某條消息進入了設置了TTL的隊列時,這條消息會在經過TTL秒後「死亡」,成為Dead Letter。如果既配置了消息的TTL,又配置了隊列的TTL,那麼較小的那個值會被取用。更多資料請查閱官方文檔。
Dead Letter Exchange
剛才提到了,被設置了TTL的消息在過期後會成為Dead Letter。其實在RabbitMQ中,一共有三種消息的「死亡」形式:
消息被拒絕。通過調用basic.reject或者basic.nack並且設置的requeue參數為false。
消息因為設置了TTL而過期。
消息進入了一條已經達到最大長度的隊列。
㈢ spring boot rabbitmq 讀取完 什麼時候銷毀
收到重復消息的可能情況有如下幾點 發送方一條消息發送了多次。 接收到消息後,沒有正確通知rabbitmq消息已被消費,導致消息仍然處於隊列中,所以被再次發送。
㈣ spring boot中使用rabbitmq怎麼在代碼裡面獲取消費端的重試次數
在使用SpringBoot中,筆者使用到了RabbitMQ,其中踩了不少地雷,經過些許的刻版終於把它調通了,
筆者主要說的是從生產者生產數據並發送給消費者到後者接收並處理數據這么一個全過程,我這里的數據指的是實體對象.生產者和消費者是處在兩個不同的項目中的.
首先說明下整個過程.
在 Spring-AMQP 中比較重要的類就是 Message ,因為要發送的消息必須要構造成一個 Message 對象來進行傳輸。Message 對象包括兩部分 Body 和 Properties
消息生產者構造好 Message 之後,就會將 Message 發送到指定的 Exchange (交換機),再根據 Exchange 的類型及 routing-key 將消息路由到相應的 queue 中,最後被監聽該 queue 的消費者消費.
㈤ springboot中activemq中的url怎麼設置
於springmvc是通過DispatcherServlet來作為入口的,所以其實只要看Servlet不攔截所有後綴的方法。 如果只要攔截特定的幾個規則,可以配置一個Filter,在Filter中判斷後綴,後綴不對的全給他跳轉到404頁面去
㈥ springboot+maven+elasticsearch6.7.1的版本整合報錯,是包沒下好還是版本不兼容而卻少這個類
elasticsearch6.1配置
config下的elasticsearch.yml
1.pom
<dependencies>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.1.1</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>6.1.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.39</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.7</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.7</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.10</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
application.properties
# Elasticsearch
elasticsearch.ip=xxx.xxx.xxx
elasticsearch.port=9300
elasticsearch.pool=5
2.配置config
@Configuration
public class ElasticsearchConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfig.class);
/**
* elk集群地址
*/
@Value("${elasticsearch.ip}")
private String hostName;
/**
* 埠
*/
@Value("${elasticsearch.port}")
private String port;
/**
* 集群名稱
*/
@Value("${elasticsearch.cluster.name}")
private String clusterName;
/**
* 連接池
*/
@Value("${elasticsearch.pool}")
private String poolSize;
@Bean
public TransportClient init() {
LOGGER.info("初始化開始。。。。。");
TransportClient transportClient = null;
try {
// 配置信息
Settings esSetting = Settings.builder()
.put("client.transport.sniff", true)//增加嗅探機制,找到ES集群
.put("thread_pool.search.size", Integer.parseInt(poolSize))//增加線程池個數,暫時設為5
.build();
//配置信息Settings自定義,下面設置為EMPTY
transportClient = new PreBuiltTransportClient(Settings.EMPTY);
TransportAddress transportAddress = new TransportAddress(InetAddress.getByName(hostName), Integer.valueOf(port));
transportClient.addTransportAddresses(transportAddress);
} catch (Exception e) {
LOGGER.error("elasticsearch TransportClient create error!!!", e);
}
return transportClient;
}
}
3.配置工具類ElasticsearchUtils
@Component
public class ElasticsearchUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchUtils.class);
@Autowired
private TransportClient transportClient;
private static TransportClient client;
@PostConstruct
public void init() {
client = this.transportClient;
}
/**
* 創建索引
*
* @param index
* @return
*/
public static boolean createIndex(String index) {
if (!isIndexExist(index)) {
LOGGER.info("Index is not exits!");
}
CreateIndexResponse indexresponse = client.admin().indices().prepareCreate(index).execute().actionGet();
LOGGER.info("執行建立成功?" + indexresponse.isAcknowledged());
return indexresponse.isAcknowledged();
}
/**
* 刪除索引
*
* @param index
* @return
*/
public static boolean deleteIndex(String index) {
if (!isIndexExist(index)) {
LOGGER.info("Index is not exits!");
}
DeleteIndexResponse dResponse = client.admin().indices().prepareDelete(index).execute().actionGet();
if (dResponse.isAcknowledged()) {
LOGGER.info("delete index " + index + " successfully!");
} else {
LOGGER.info("Fail to delete index " + index);
}
return dResponse.isAcknowledged();
}
/**
* 判斷索引是否存在
*
* @param index
* @return
*/
public static boolean isIndexExist(String index) {
IndicesExistsResponse inExistsResponse = client.admin().indices().exists(new IndicesExistsRequest(index)).actionGet();
if (inExistsResponse.isExists()) {
LOGGER.info("Index [" + index + "] is exist!");
} else {
LOGGER.info("Index [" + index + "] is not exist!");
}
return inExistsResponse.isExists();
}
/**
* 數據添加,正定ID
*
* @param jsonObject 要增加的數據
* @param index 索引,類似資料庫
* @param type 類型,類似表
* @param id 數據ID
* @return
*/
public static String addData(JSONObject jsonObject, String index, String type, String id) {
IndexResponse response = client.prepareIndex(index, type, id).setSource(jsonObject).get();
LOGGER.info("addData response status:{},id:{}", response.status().getStatus(), response.getId());
return response.getId();
}
/**
* 數據添加
*
* @param jsonObject 要增加的數據
* @param index 索引,類似資料庫
* @param type 類型,類似表
* @return
*/
public static String addData(JSONObject jsonObject, String index, String type) {
return addData(jsonObject, index, type, UUID.randomUUID().toString().replaceAll("-", "").toUpperCase());
}
/**
* 通過ID刪除數據
*
* @param index 索引,類似資料庫
* @param type 類型,類似表
* @param id 數據ID
*/
public static void deleteDataById(String index, String type, String id) {
DeleteResponse response = client.prepareDelete(index, type, id).execute().actionGet();
LOGGER.info("deleteDataById response status:{},id:{}", response.status().getStatus(), response.getId());
}
/**
* 通過ID 更新數據
*
* @param jsonObject 要增加的數據
* @param index 索引,類似資料庫
* @param type 類型,類似表
* @param id 數據ID
* @return
*/
public static void updateDataById(JSONObject jsonObject, String index, String type, String id) {
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index(index).type(type).id(id).doc(jsonObject);
client.update(updateRequest);
}
/**
* 通過ID獲取數據
*
* @param index 索引,類似資料庫
* @param type 類型,類似表
* @param id 數據ID
* @param fields 需要顯示的欄位,逗號分隔(預設為全部欄位)
* @return
*/
public static Map<String, Object> searchDataById(String index, String type, String id, String fields) {
GetRequestBuilder getRequestBuilder = client.prepareGet(index, type, id);
if (StringUtils.isNotEmpty(fields)) {
getRequestBuilder.setFetchSource(fields.split(","), null);
}
GetResponse getResponse = getRequestBuilder.execute().actionGet();
return getResponse.getSource();
}
/**
* 使用分詞查詢
*
* @param index 索引名稱
* @param type 類型名稱,可傳入多個type逗號分隔
* @param fields 需要顯示的欄位,逗號分隔(預設為全部欄位)
* @param matchStr 過濾條件(xxx=111,aaa=222)
* @return
*/
public static List<Map<String, Object>> searchListData(String index, String type, String fields, String matchStr) {
return searchListData(index, type, 0, 0, null, fields, null, false, null, matchStr);
}
/**
* 使用分詞查詢
*
* @param index 索引名稱
* @param type 類型名稱,可傳入多個type逗號分隔
* @param fields 需要顯示的欄位,逗號分隔(預設為全部欄位)
* @param sortField 排序欄位
* @param matchPhrase true 使用,短語精準匹配
* @param matchStr 過濾條件(xxx=111,aaa=222)
* @return
*/
public static List<Map<String, Object>> searchListData(String index, String type, String fields, String sortField, boolean matchPhrase, String matchStr) {
return searchListData(index, type, 0, 0, null, fields, sortField, matchPhrase, null, matchStr);
}
/**
* 使用分詞查詢
*
* @param index 索引名稱
* @param type 類型名稱,可傳入多個type逗號分隔
* @param size 文檔大小限制
* @param fields 需要顯示的欄位,逗號分隔(預設為全部欄位)
* @param sortField 排序欄位
* @param matchPhrase true 使用,短語精準匹配
* @param highlightField 高亮欄位
* @param matchStr 過濾條件(xxx=111,aaa=222)
* @return
*/
public static List<Map<String, Object>> searchListData(String index, String type, Integer size, String fields, String sortField, boolean matchPhrase, String highlightField, String matchStr) {
return searchListData(index, type, 0, 0, size, fields, sortField, matchPhrase, highlightField, matchStr);
}
/**
* 使用分詞查詢
*
* @param index 索引名稱
* @param type 類型名稱,可傳入多個type逗號分隔
* @param startTime 開始時間
* @param endTime 結束時間
* @param size 文檔大小限制
* @param fields 需要顯示的欄位,逗號分隔(預設為全部欄位)
* @param sortField 排序欄位
* @param matchPhrase true 使用,短語精準匹配
* @param highlightField 高亮欄位
* @param matchStr 過濾條件(xxx=111,aaa=222)
* @return
*/
public static List<Map<String, Object>> searchListData(String index, String type, long startTime, long endTime, Integer size, String fields, String sortField, boolean matchPhrase, String highlightField, String matchStr) {
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index);
if (StringUtils.isNotEmpty(type)) {
searchRequestBuilder.setTypes(type.split(","));
}
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
if (startTime > 0 && endTime > 0) {
boolQuery.must(QueryBuilders.rangeQuery("processTime")
.format("epoch_millis")
.from(startTime)
.to(endTime)
.includeLower(true)
.includeUpper(true));
}
//搜索的的欄位
if (StringUtils.isNotEmpty(matchStr)) {
for (String s : matchStr.split(",")) {
String[] ss = s.split("=");
if (ss.length > 1) {
if (matchPhrase == Boolean.TRUE) {
boolQuery.must(QueryBuilders.matchPhraseQuery(s.split("=")[0], s.split("=")[1]));
} else {
boolQuery.must(QueryBuilders.matchQuery(s.split("=")[0], s.split("=")[1]));
}
}
}
}
// 高亮(xxx=111,aaa=222)
if (StringUtils.isNotEmpty(highlightField)) {
HighlightBuilder highlightBuilder = new HighlightBuilder();
//highlightBuilder.preTags("<span style='color:red' >");//設置前綴
//highlightBuilder.postTags("</span>");//設置後綴
// 設置高亮欄位
highlightBuilder.field(highlightField);
searchRequestBuilder.highlighter(highlightBuilder);
}
searchRequestBuilder.setQuery(boolQuery);
if (StringUtils.isNotEmpty(fields)) {
searchRequestBuilder.setFetchSource(fields.split(","), null);
}
searchRequestBuilder.setFetchSource(true);
if (StringUtils.isNotEmpty(sortField)) {
searchRequestBuilder.addSort(sortField, SortOrder.DESC);
}
if (size != null && size > 0) {
searchRequestBuilder.setSize(size);
}
//列印的內容 可以在 Elasticsearch head 和 Kibana 上執行查詢
LOGGER.info("\n{}", searchRequestBuilder);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
long totalHits = searchResponse.getHits().totalHits;
long length = searchResponse.getHits().getHits().length;
LOGGER.info("共查詢到[{}]條數據,處理數據條數[{}]", totalHits, length);
if (searchResponse.status().getStatus() == 200) {
// 解析對象
return setSearchResponse(searchResponse, highlightField);
}
return null;
}
/**
* 使用分詞查詢,並分頁
*
* @param index 索引名稱
* @param type 類型名稱,可傳入多個type逗號分隔
* @param currentPage 當前頁
* @param pageSize 每頁顯示條數
* @param startTime 開始時間
* @param endTime 結束時間
* @param fields 需要顯示的欄位,逗號分隔(預設為全部欄位)
* @param sortField 排序欄位
* @param matchPhrase true 使用,短語精準匹配
* @param highlightField 高亮欄位
* @param matchStr 過濾條件(xxx=111,aaa=222)
* @return
*/
public static EsPage searchDataPage(String index, String type, int currentPage, int pageSize, long startTime, long endTime, String fields, String sortField, boolean matchPhrase, String highlightField, String matchStr) {
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index);
if (StringUtils.isNotEmpty(type)) {
searchRequestBuilder.setTypes(type.split(","));
}
searchRequestBuilder.setSearchType(SearchType.QUERY_THEN_FETCH);
// 需要顯示的欄位,逗號分隔(預設為全部欄位)
if (StringUtils.isNotEmpty(fields)) {
searchRequestBuilder.setFetchSource(fields.split(","), null);
}
//排序欄位
if (StringUtils.isNotEmpty(sortField)) {
searchRequestBuilder.addSort(sortField, SortOrder.DESC);
}
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
if (startTime > 0 && endTime > 0) {
boolQuery.must(QueryBuilders.rangeQuery("processTime")
.format("epoch_millis")
.from(startTime)
.to(endTime)
.includeLower(true)
.includeUpper(true));
}
// 查詢欄位
if (StringUtils.isNotEmpty(matchStr)) {
for (String s : matchStr.split(",")) {
String[] ss = s.split("=");
if (matchPhrase == Boolean.TRUE) {
boolQuery.must(QueryBuilders.matchPhraseQuery(s.split("=")[0], s.split("=")[1]));
} else {
boolQuery.must(QueryBuilders.matchQuery(s.split("=")[0], s.split("=")[1]));
}
}
}
// 高亮(xxx=111,aaa=222)
if (StringUtils.isNotEmpty(highlightField)) {
HighlightBuilder highlightBuilder = new HighlightBuilder();
//highlightBuilder.preTags("<span style='color:red' >");//設置前綴
//highlightBuilder.postTags("</span>");//設置後綴
// 設置高亮欄位
highlightBuilder.field(highlightField);
searchRequestBuilder.highlighter(highlightBuilder);
}
searchRequestBuilder.setQuery(QueryBuilders.matchAllQuery());
searchRequestBuilder.setQuery(boolQuery);
// 分頁應用
searchRequestBuilder.setFrom(currentPage).setSize(pageSize);
// 設置是否按查詢匹配度排序
searchRequestBuilder.setExplain(true);
//列印的內容 可以在 Elasticsearch head 和 Kibana 上執行查詢
LOGGER.info("\n{}", searchRequestBuilder);
// 執行搜索,返回搜索響應信息
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
long totalHits = searchResponse.getHits().totalHits;
long length = searchResponse.getHits().getHits().length;
LOGGER.debug("共查詢到[{}]條數據,處理數據條數[{}]", totalHits, length);
if (searchResponse.status().getStatus() == 200) {
// 解析對象
List<Map<String, Object>> sourceList = setSearchResponse(searchResponse, highlightField);
return new EsPage(currentPage, pageSize, (int) totalHits, sourceList);
}
return null;
}
/**
* 高亮結果集 特殊處理
*
* @param searchResponse
* @param highlightField
*/
private static List<Map<String, Object>> setSearchResponse(SearchResponse searchResponse, String highlightField) {
List<Map<String, Object>> sourceList = new ArrayList<Map<String, Object>>();
StringBuffer stringBuffer = new StringBuffer();
for (SearchHit searchHit : searchResponse.getHits().getHits()) {
searchHit.getSourceAsMap().put("id", searchHit.getId());
if (StringUtils.isNotEmpty(highlightField)) {
System.out.println("遍歷 高亮結果集,覆蓋 正常結果集" + searchHit.getSourceAsMap());
Text[] text = searchHit.getHighlightFields().get(highlightField).getFragments();
if (text != null) {
for (Text str : text) {
stringBuffer.append(str.string());
}
//遍歷 高亮結果集,覆蓋 正常結果集
searchHit.getSourceAsMap().put(highlightField, stringBuffer.toString());
}
}
sourceList.add(searchHit.getSourceAsMap());
}
return sourceList;
}
}
4.配置分頁工具類EsPage
public class EsPage {
// 指定的或是頁面參數
private int currentPage; // 當前頁
private int pageSize; // 每頁顯示多少條
// 查詢es結果
private int recordCount; // 總記錄數
private List<Map<String, Object>> recordList; // 本頁的數據列表
// 計算
private int pageCount; // 總頁數
private int beginPageIndex; // 頁碼列表的開始索引(包含)
private int endPageIndex; // 頁碼列表的結束索引(包含)
/**
* 只接受前4個必要的屬性,會自動的計算出其他3個屬性的值
*
* @param currentPage
* @param pageSize
* @param recordCount
* @param recordList
*/
public EsPage(int currentPage, int pageSize, int recordCount, List<Map<String, Object>> recordList) {
this.currentPage = currentPage;
this.pageSize = pageSize;
this.recordCount = recordCount;
this.recordList = recordList;
// 計算總頁碼
pageCount = (recordCount + pageSize - 1) / pageSize;
// 計算 beginPageIndex 和 endPageIndex
// >> 總頁數不多於10頁,則全部顯示
if (pageCount <= 10) {
beginPageIndex = 1;
endPageIndex = pageCount;
}
// >> 總頁數多於10頁,則顯示當前頁附近的共10個頁碼
else {
// 當前頁附近的共10個頁碼(前4個 + 當前頁 + 後5個)
beginPageIndex = currentPage - 4;
endPageIndex = currentPage + 5;
// 當前面的頁碼不足4個時,則顯示前10個頁碼
if (beginPageIndex < 1) {
beginPageIndex = 1;
endPageIndex = 10;
}
// 當後面的頁碼不足5個時,則顯示後10個頁碼
if (endPageIndex > pageCount) {
endPageIndex = pageCount;
beginPageIndex = pageCount - 10 + 1;
}
}
}
public int getCurrentPage() {
return currentPage;
}
public void setCurrentPage(int currentPage) {
this.currentPage = currentPage;
}
public int getPageSize() {
return pageSize;
}
public void setPageSize(int pageSize) {
this.pageSize = pageSize;
}
public int getRecordCount() {
return recordCount;
}
public void setRecordCount(int recordCount) {
this.recordCount = recordCount;
}
public List<Map<String, Object>> getRecordList() {
return recordList;
}
public void setRecordLiswww.scjinhan.com?st<Map<String, Object>> recordList) {
this.recordList = recordList;
}
public int getPageCount() {
return pageCount;
}
public void setPageCount(int pageCount) {
this.pageCount = pageCount;
}
public int getBeginPageIndex() {
return beginPageIndex;
}
public void setBeginPageIndex(int beginPageIndex) {
this.beginPageIndex = beginPageIndex;
}
public int getEndPageIndex() {
return endPageIndex;
}
public void setEndPageIndex(int endPageIndex) {
this.endPageIndex = endPageIndex;
}
}
5.測試
@Test
public void createIndexTest() {
ElasticsearchUtils.createIndex("ymq_index");
ElasticsearchUtils.createIndex("ymq_indexsssss");
}
㈦ 微服務一定要用springboot嗎
SpringBoot設計的目的是為了簡化Spring應用初期工程的搭建以及開發過程。從一定的角度上說,SpringBoot並沒有在Spring的基礎上引入新的東西,只是在Spring和一些第三方的框架(比如:Mybatis、Redis、ActiveMQ...)的基礎上進行了整合和封裝,基於約定大於配置的思想,通過定義的註解替代了Spring應用中的.xml配置文件,使得項目的搭建、開發和部署變得簡單。
微服務從業務層面對項目進行分割,注重項目粒度的劃分,這也意味著一個項目將會被分成很多個子項目,比如:文件上傳為一個項目,用戶登錄鑒權為一個項目,項目之間獨立部署並通過協議進行數據交互。因此,利用SpringBoot做為微服務的開發框架,使得編碼、配置、部署、監控變得簡單。
㈧ spring boot怎樣向rabbitmq 中插入對象
SpringBoot將在類路徑中或從ServletContext的根目錄中提供名為/static(或/public或/resources或/META-INF/resources)的目錄中的靜態內容。也就是說默認情況下,可以將靜態文件放到static,public,resources,/META-INF/resources四個目錄下。如果一個文件可以放在四個路徑下,那肯定會有個先後之分,因此我做了一個十分簡單的驗證。驗證方法就是首先在四個路徑中放入相同名字的html文件,然後通過瀏覽器訪問,在確認了優先順序最高的那個之後,在其他的路徑中寫入另外一個相同文件名的html,再通過瀏覽器訪問,判斷出剩下的路徑的優先順序,以此類推在,直到將四個排序完成。代碼結構如下:在經過驗證之後,得出的結論為META-INF/resources>resources>static>public
㈨ springbootapplication包含哪些註解
1、##@SpringBootConfiguration:讀取配置文件,配置文件的路徑是當前根目錄(src/main/resources/application.yml等)。
2、##EnableAutoConfiguration:開啟自動配置,掃描當前的所有依賴的jar包,發現新的依賴出現將會將會根據依賴完各種自動配置(掃描start_web,自動配置內置tomcat默認路徑、埠;依賴了rabbitmq,自動配置rabbitTemble)。
3、##ComponetScan:屬於Spring框架。
(9)springboot如何配置mq擴展閱讀
SpringBoot基於所添加的依賴,去「猜測」你想要如何配置Spring。比如引入了spring-boot-starter-web,而這個啟動器中添加了tomcat、SpringMVC的依賴。
此時自動配置就知道是要開發一個web應用,所以就幫完成了web及SpringMVC的默認配置了!我們使用SpringBoot構建一個項目,只需要引入所需框架的依賴,配置就可以交給SpringBoot處理了。
㈩ spring-boot 如何才能集成mq 和kafka哪個好
Jafka/KafkaKafka是Apache下的一個子項目,是一個高性能跨語言分布式Publish/Subscribe消息隊列系統,而Jafka是在Kafka之上孵化而來的,即Kafka的一個升級版。具有以下特性:快速持久化,可以在O(1)的系統開銷下進行消息持久化;