metaq源碼
❶ rocketmq 發送失敗一般怎麼處理
一:RocketMQ簡介
RocketMQ是一款分布式、隊列模型的消息中間件,具有以下特點:
1.能夠保證嚴格的消息順序
2.提供豐富的消息拉取模式
3.高效的訂閱者水平擴展能力
4.實時的消息訂閱機制
5.億級消息堆積能力
二:安裝RocketMQ
下載源碼
首先我們從githup上獲取RocketMQ的源碼,目前最新的版本為3.5.8,下載地址為: 或者 wget /alibaba/RocketMQ/archive/v3.5.8.tar.gz。請注意:此時我們下載的是源碼,直接解壓時不能用的,所以我們需要編譯之後才能使用。
編譯源碼
在進行編譯源碼之前我們需要安裝JDK。如果你已經安裝過了,請跳過這里。如果你還沒有安裝過JDK,請參考這篇文章(linux環境下安裝JDK)。然後我們還需要安裝一下Maven。Maven的安裝還是比較簡單,只需要去官方上下載的安裝吧,然後直接解壓,再配置一下環境變數就OK。接下來我們把剛才下載來的RockeMQ的源碼解壓到/usr/local/rockemq-source文件夾中。在源碼中有一個Install.sh。如圖所示:
。運行sh install.sh。在編譯完成之後,我們只要target目錄下的alibaba-rocketmq這個文件夾中內容,把alibaba-rocketmq文件夾中的內容移動到/usr/local/rocketmq中。如果你不想編譯的話,可以從這里下載編譯之後的rocketmq。(rocketmq3.5.8)。
配置環境變數
接下來我們需要配置一下環境變數。在終端中輸入以下命令:vi /etc/profile ,在文件的末尾中添加如下兩句話:export rocketmq=/usr/local/rocketmq export PATH=$PATH:$rocketmq/bin。接下來我們使配置的換將變數生效:source /etc/profile.
三:啟動RocketMQ
接下來我們啟動一下剛才編譯的RocketMQ.在啟動之前我們需要修改一下RocketMQ啟動的內存大小(如果你的系統內存比較大的話,請忽略)。我們進入到/usr/local/rocketmq/bin中,在終端中輸入以下命令修改mqnamesrv的內存大小:vi runserver.sh.修改為如圖的內容:
,接下來修改broker的內存大小:vi runbroker.sh:
啟動mqnameserver
進入到/usr/local/rocketmq/bin中輸入以下命令:nohup sh mqnamesrv > ~/logs/rocketmqlogs/namesrv.log 2>&1 &。注意最後的這個 & 不要少。
啟動mqbroker
進入到/usr/local/rocketmq/bin中輸入以下命令:nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true > ~/logs/rocketmqlogs/broker.log 2>&1 &。注意:localhost可以換成你剛才啟動mqnamesrv的IP。autoCreateTopicEnable=true
這句話不要少了。最後的 & 也不要少了。
我們可以通過 ps aux | grep java命令來查看啟動的情況。
到此,rocketmq的安裝完畢。
四:RocketMQ的小例子
procer:
[java] view plain
package com.zkn.newlearn.rocketmq;
import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.procer.DefaultMQProcer;
import com.alibaba.rocketmq.client.procer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import java.util.concurrent.TimeUnit;
/**
* Created by zkn on 2016/10/27.
*/
public class ProcerTest01 {
public static void main(String[] args) {
/**
* 一個應用創建一個Procer,由應用來維護此對象,可以設置為全局對象或者單例<br>
* 注意:ProcerGroupName需要由應用來保證唯一<br>
* ProcerGroup這個概念發送普通的消息時,作用不大,但是發送分布式事務消息時,比較關鍵,
* 因為伺服器會回查這個Group下的任意一個Procer
*/
DefaultMQProcer procer = new DefaultMQProcer("ProcerGroupName");
//procer.setNamesrvAddr("192.168.180.1:9876");
procer.setNamesrvAddr("192.168.180.133:9876");
procer.setInstanceName("Procer");
/**
* Procer對象在使用之前必須要調用start初始化,初始化一次即可<br>
* 注意:切記不可以在每次發送消息時,都調用start方法
*/
try {
procer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
for (int i = 0; i < 100; i++) {
try {
/**
* 下面這段代碼表明一個Procer對象可以發送多個topic,多個tag的消息。
* 注意:send方法是同步調用,只要不拋異常就標識成功。但是發送成功也可會有多種狀態,<br>
* 例如消息寫入Master成功,但是Slave不成功,這種情況消息屬於成功,但是對於個別應用如果對消息可靠性要求極高,<br>
* 需要對這種情況做處理。另外,消息可能會存在發送失敗的情況,失敗重試由應用來處理。
*/
{
Message msg = new Message("TopicTest1",// topic
"TagA",// tag
"OrderID001",// key
("Hello MetaQ").getBytes());// body
SendResult sendResult = procer.send(msg);
System.out.println(sendResult);
}
{
Message msg = new Message("TopicTest2",
"TagB",
"OrderID001",
("Hello MetaQ TagB".getBytes()));
SendResult sendResult = procer.send(msg);
System.out.println(sendResult);
}
{
Message msg = new Message("TopicTest3",
"TagC",
"OrderID001",
("Hello MetaQ TagC").getBytes());
SendResult sendResult = procer.send(msg);
System.out.println(sendResult);
}
TimeUnit.MILLISECONDS.sleep(1000);
} catch (MQClientException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
}
}
/**
* 應用退出時,要調用shutdown來清理資源,關閉網路連接,從MetaQ伺服器上注銷自己
* 注意:我們建議應用在JBOSS、Tomcat等容器的退出銷毀方法里調用shutdown方法
*/
procer.shutdown();
}
}
❷ 怎麼指定rocketmq的jdk
一:RocketMQ簡介RocketMQ是一款分布式、隊列模型的消息中間件,具有以下特點:1.能夠保證嚴格的消息順序2.提供豐富的消息拉取模式3.高效的訂閱者水平擴展能力4.實時的消息訂閱機制5.億級消息堆積能力二:安裝RocketMQ下載源碼首先我們從githup上獲取RocketMQ的源碼,目前最新的版本為3.5.8,下載地址為:或者wget/alibaba/RocketMQ/archive/v3.5.8.tar.gz。請注意:此時我們下載的是源碼,直接解壓時不能用的,所以我們需要編譯之後才能使用。編譯源碼在進行編譯源碼之前我們需要安裝JDK。如果你已經安裝過了,請跳過這里。如果你還沒有安裝過JDK,請參考這篇文章(Linux環境下安裝JDK)。然後我們還需要安裝一下Maven。Maven的安裝還是比較簡單,只需要去官方上下載的安裝吧,然後直接解壓,再配置一下環境變數就OK。接下來我們把剛才下載來的RockeMQ的源碼解壓到/usr/local/rockemq-source文件夾中。在源碼中有一個Install.sh。如圖所示:。運行shinstall.sh。在編譯完成之後,我們只要target目錄下的alibaba-rocketmq這個文件夾中內容,把alibaba-rocketmq文件夾中的內容移動到/usr/local/rocketmq中。如果你不想編譯的話,可以從這里下載編譯之後的rocketmq。(rocketmq3.5.8)。配置環境變數接下來我們需要配置一下環境變數。在終端中輸入以下命令:vi/etc/profile,在文件的末尾中添加如下兩句話:exportrocketmq=/usr/local/rocketmqexportPATH=$PATH:$rocketmq/bin。接下來我們使配置的換將變數生效:source/etc/profile.三:啟動RocketMQ接下來我們啟動一下剛才編譯的RocketMQ.在啟動之前我們需要修改一下RocketMQ啟動的內存大小(如果你的系統內存比較大的話,請忽略)。我們進入到/usr/local/rocketmq/bin中,在終端中輸入以下命令修改mqnamesrv的內存大小:virunserver.sh.修改為如圖的內容:,接下來修改broker的內存大小:virunbroker.sh:啟動mqnameserver進入到/usr/local/rocketmq/bin中輸入以下命令:nohupshmqnamesrv>~/logs/rocketmqlogs/namesrv.log2>&1&。注意最後的這個&不要少。啟動mqbroker進入到/usr/local/rocketmq/bin中輸入以下命令:nohupshmqbroker-nlocalhost:9876autoCreateTopicEnable=true>~/logs/rocketmqlogs/broker.log2>&1&。注意:localhost可以換成你剛才啟動mqnamesrv的IP。autoCreateTopicEnable=true這句話不要少了。最後的&也不要少了。我們可以通過psaux|grepjava命令來查看啟動的情況。到此,rocketmq的安裝完畢。四:RocketMQ的小例子procer:[java]viewplainpackagecom.zkn.newlearn.rocketmq;importcom.alibaba.rocketmq.client.exception.MQBrokerException;importcom.alibaba.rocketmq.client.exception.MQClientException;importcom.alibaba.rocketmq.client.procer.DefaultMQProcer;importcom.alibaba.rocketmq.client.procer.SendResult;importcom.alibaba.rocketmq.common.message.Message;importcom.alibaba.rocketmq.remoting.exception.RemotingException;importjava.util.concurrent.TimeUnit;/***Createdbyzknon2016/10/27.*/publicclassProcerTest01{publicstaticvoidmain(String[]args){/***一個應用創建一個Procer,由應用來維護此對象,可以設置為全局對象或者單例*注意:ProcerGroupName需要由應用來保證唯一*ProcerGroup這個概念發送普通的消息時,作用不大,但是發送分布式事務消息時,比較關鍵,*因為伺服器會回查這個Group下的任意一個Procer*/DefaultMQProcerprocer=newDefaultMQProcer("ProcerGroupName");//procer.setNamesrvAddr("192.168.180.1:9876");procer.setNamesrvAddr("192.168.180.133:9876");procer.setInstanceName("Procer");/***Procer對象在使用之前必須要調用start初始化,初始化一次即可*注意:切記不可以在每次發送消息時,都調用start方法*/try{procer.start();}catch(MQClientExceptione){e.printStackTrace();}for(inti=0;i<100;i++){try{/***下面這段代碼表明一個Procer對象可以發送多個topic,多個tag的消息。*注意:send方法是同步調用,只要不拋異常就標識成功。但是發送成功也可會有多種狀態,*例如消息寫入Master成功,但是Slave不成功,這種情況消息屬於成功,但是對於個別應用如果對消息可靠性要求極高,*需要對這種情況做處理。另外,消息可能會存在發送失敗的情況,失敗重試由應用來處理。*/{Messagemsg=newMessage("TopicTest1",//topic"TagA",//tag"OrderID001",//key("HelloMetaQ").getBytes());//bodySendResultsendResult=procer.send(msg);System.out.println(sendResult);}{Messagemsg=newMessage("TopicTest2","TagB","OrderID001",("HelloMetaQTagB".getBytes()));SendResultsendResult=procer.send(msg);System.out.println(sendResult);}{Messagemsg=newMessage("TopicTest3","TagC","OrderID001",("HelloMetaQTagC").getBytes());SendResultsendResult=procer.send(msg);System.out.println(sendResult);}TimeUnit.MILLISECONDS.sleep(1000);}catch(MQClientExceptione){e.printStackTrace();}catch(InterruptedExceptione){e.printStackTrace();}catch(RemotingExceptione){e.printStackTrace();}catch(MQBrokerExceptione){e.printStackTrace();}}/***應用退出時,要調用shutdown來清理資源,關閉網路連接,從MetaQ伺服器上注銷自己*注意:我們建議應用在JBOSS、Tomcat等容器的退出銷毀方法里調用shutdown方法*/procer.shutdown();}}
❸ linux怎樣知道配置的rocketmq的環境變數是否生效
一:RocketMQ簡介
RocketMQ是一款分布式、隊列模型的消息中間件,具有以下特點:
1.能夠保證嚴格的消息順序
2.提供豐富的消息拉取模式
3.高效的訂閱者水平擴展能力
4.實時的消息訂閱機制
5.億級消息堆積能力
二:安裝RocketMQ
下載源碼
首先我們從githup上獲取RocketMQ的源碼,目前最新的版本為3.5.8,下載地址為:https://github.com/alibaba/RocketMQ/releases 或者 wget https://github.com/alibaba/RocketMQ/releases/alibaba/RocketMQ/archive/v3.5.8.tar.gz。請注意:此時我們下載的是源碼,直接解壓時不能用的,所以我們需要編譯之後才能使用。
編譯源碼
在進行編譯源碼之前我們需要安裝JDK。如果你已經安裝過了,請跳過這里。如果你還沒有安裝過JDK,請參考這篇文章(Linux環境下安裝JDK)。然後我們還需要安裝一下Maven。Maven的安裝還是比較簡單,只需要去官方上下載的安裝吧,然後直接解壓,再配置一下環境變數就OK。接下來我們把剛才下載來的RockeMQ的源碼解壓到/usr/local/rockemq-source文件夾中。在源碼中有一個Install.sh。如圖所示:
。運行sh install.sh。在編譯完成之後,我們只要target目錄下的alibaba-rocketmq這個文件夾中內容,把alibaba-rocketmq文件夾中的內容移動到/usr/local/rocketmq中。如果你不想編譯的話,可以從這里下載編譯之後的rocketmq。(rocketmq3.5.8)。
配置環境變數
接下來我們需要配置一下環境變數。在終端中輸入以下命令:vi /etc/profile ,在文件的末尾中添加如下兩句話:export rocketmq=/usr/local/rocketmq export PATH=$PATH:$rocketmq/bin。接下來我們使配置的換將變數生效:source /etc/profile.
三:啟動RocketMQ
接下來我們啟動一下剛才編譯的RocketMQ.在啟動之前我們需要修改一下RocketMQ啟動的內存大小(如果你的系統內存比較大的話,請忽略)。我們進入到/usr/local/rocketmq/bin中,在終端中輸入以下命令修改mqnamesrv的內存大小:vi runserver.sh.修改為如圖的內容:
,接下來修改broker的內存大小:vi runbroker.sh:
啟動mqnameserver
進入到/usr/local/rocketmq/bin中輸入以下命令:nohup sh mqnamesrv > ~/logs/rocketmqlogs/namesrv.log 2>&1 &。注意最後的這個 & 不要少。
啟動mqbroker
進入到/usr/local/rocketmq/bin中輸入以下命令:nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true > ~/logs/rocketmqlogs/broker.log 2>&1 &。注意:localhost可以換成你剛才啟動mqnamesrv的IP。autoCreateTopicEnable=true
這句話不要少了。最後的 & 也不要少了。
我們可以通過 ps aux | grep java命令來查看啟動的情況。
到此,rocketmq的安裝完畢。
四:RocketMQ的小例子
procer:
[java] view plain
package com.zkn.newlearn.rocketmq;
import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.procer.DefaultMQProcer;
import com.alibaba.rocketmq.client.procer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import java.util.concurrent.TimeUnit;
/**
* Created by zkn on 2016/10/27.
*/
public class ProcerTest01 {
public static void main(String[] args) {
/**
* 一個應用創建一個Procer,由應用來維護此對象,可以設置為全局對象或者單例<br>
* 注意:ProcerGroupName需要由應用來保證唯一<br>
* ProcerGroup這個概念發送普通的消息時,作用不大,但是發送分布式事務消息時,比較關鍵,
* 因為伺服器會回查這個Group下的任意一個Procer
*/
DefaultMQProcer procer = new DefaultMQProcer("ProcerGroupName");
//procer.setNamesrvAddr("192.168.180.1:9876");
procer.setNamesrvAddr("192.168.180.133:9876");
procer.setInstanceName("Procer");
/**
* Procer對象在使用之前必須要調用start初始化,初始化一次即可<br>
* 注意:切記不可以在每次發送消息時,都調用start方法
*/
try {
procer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
for (int i = 0; i < 100; i++) {
try {
/**
* 下面這段代碼表明一個Procer對象可以發送多個topic,多個tag的消息。
* 注意:send方法是同步調用,只要不拋異常就標識成功。但是發送成功也可會有多種狀態,<br>
* 例如消息寫入Master成功,但是Slave不成功,這種情況消息屬於成功,但是對於個別應用如果對消息可靠性要求極高,<br>
* 需要對這種情況做處理。另外,消息可能會存在發送失敗的情況,失敗重試由應用來處理。
*/
{
Message msg = new Message("TopicTest1",// topic
"TagA",// tag
"OrderID001",// key
("Hello MetaQ").getBytes());// body
SendResult sendResult = procer.send(msg);
System.out.println(sendResult);
}
{
Message msg = new Message("TopicTest2",
"TagB",
"OrderID001",
("Hello MetaQ TagB".getBytes()));
SendResult sendResult = procer.send(msg);
System.out.println(sendResult);
}
{
Message msg = new Message("TopicTest3",
"TagC",
"OrderID001",
("Hello MetaQ TagC").getBytes());
SendResult sendResult = procer.send(msg);
System.out.println(sendResult);
}
TimeUnit.MILLISECONDS.sleep(1000);
} catch (MQClientException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
}
}
/**
* 應用退出時,要調用shutdown來清理資源,關閉網路連接,從MetaQ伺服器上注銷自己
* 注意:我們建議應用在JBOSS、Tomcat等容器的退出銷毀方法里調用shutdown方法
*/
procer.shutdown();
}
}