怎麼寫netty伺服器
㈠ 如何實現Netty框架中伺服器端的消息推送
netty框架是用在伺服器端,客戶端是嵌入式編程,通過自定義的tcp通信協議進行連接的,現在需求是這樣的,伺服器端只是用來和客戶端進行通信,現在有第三方如微信端進行支付成功後在資料庫里生成了一條數據,表示要往某個客戶端發送指令,以下兩種方式可供參考:1、微信端生成通訊指令後調用TCP端的介面(負責通訊程序和資料庫交互的),在介面程序中通過定義Socket連到通訊程序伺服器端,根據通道編號去發送,但是這種會導致伺服器端的tcp客戶端連接變得更多。
2、直接在netty框架中定義了scheleAtF。
當然也可藉助第三方工具來完成推送。例如極光推送,極光推送具有以下功能:
1、多種消息類型
開發者可以輕松地通過極光發送各個移動平台的系統通知,還可以在控制台編輯多種富文本展示模板; 極光還提供自定義消息的透傳,客戶端接到消息內容後根據自己的邏輯自由處理。
2、用戶和推送統計
完整的消息生命周期查詢,並且可以形成「推送報表」與「用戶統計報表」呈現給開發者,用來觀察推送的效果和應用發展趨勢。
3、簡訊補充
通過極光後台推送APP通知消息,對於一些重要又不能遺漏的信息可以調用極光簡訊的後台對未收到的客戶端發送簡訊通知,保證消息的可靠性。
4、A/B 測試
合理的推送能夠激活用戶,提高用戶粘性,使用A/B分組測試的科學方法,根據測試反饋的結果,幫助開發者選擇最優化的推送方案。
5、極光推送安全包
為金融、新聞、政務及其他對推送安全要求極高的客戶提供安全嚴謹、穩定可靠的信息推送解決方案
6、可定製的私有雲
對於安全性要求更高,希望推送數據和系統存儲在自己伺服器的客戶,及個性化需求需要定製開發的,性能更高要求的,或者想擁有自己推送平台的甚至要求源碼授權二次開發的開發者,極光提供全功能的私有雲解決方案。
深圳市和訊華谷信息技術有限公司(極光 Aurora Mobile,納斯達克股票代碼:JG)成立於2011年,是中國領先的開發者服務提供商,專注於為開發者提供穩定高效的消息推送、一鍵認證以及流量變現等服務,助力開發者的運營、增長與變現。同時,極光的行業應用已經拓展至市場洞察、金融風控與商業地理服務,助力各行各業優化決策、提升效率。
㈡ 剛學習netty和java,不會用,但現在急需,您有沒有用netty編寫的伺服器和客戶端的例子,要能互相發送包
(一)handler處理篇
首先,是handler,初次接觸netty的朋友要注意,handler不是一個單例.即每個channel下都會有自己的一個handler實例.
public class ServerHandler extends SimpleChannelUpstreamHandler {
private static final Logger logger = Logger.getLogger(
ServerHandler.class.getName());
private final ThreadLocal<Boolean> COMMAND_FLAG = new ThreadLocal<Boolean>();
private final ServerChannelGroup serverChannelGroup = ServerChannelGroup.newInstance();
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
if (e instanceof ChannelStateEvent) {
logger.log(Level.INFO, "Channel state changed: {0}", e);
}
super.handleUpstream(ctx, e);
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
System.out.println(this);
String request = (String) e.getMessage();
//如果接受到客戶端發送的bye指令,那麼就給客戶端回復一個bye指令,客戶端接受到後,主動關閉連接
//伺服器端通過ChannelFutureListener.CLOSE,當它認為客戶端已經接受到伺服器發送的bye後,也主動關閉連接
if (request.toLowerCase().equals("bye")) {
ChannelFuture future = e.getChannel().write("bye\r\n");
future.addListener(ChannelFutureListener.CLOSE);
} else {
//以下是我初步解析客戶端發送過來的數據,然後決定處理方式
RecevieData receivedaData = MessageDecoder.decode(request);
if (null != receivedaData) {
//伺服器第5版
if (VersionCode.V5.equals(receivedaData.getVersion())) {
//然後判斷命令是否存在
for (String s : CommandCode.COMMANDS) {
if (s.equals(receivedaData.getActionType())) {
COMMAND_FLAG.set(true);
if (s.equals(CommandCode.KEEP_ALIVE)) {
serverChannelGroup.addChannel(e.getChannel());
}
break;
} else {
COMMAND_FLAG.set(false);
}
}
if (COMMAND_FLAG.get()) {
COMMAND_FLAG.set(false);
//將這個命令傳遞給下一個handler來處理.
//這里的"下一個handler"即為用戶自己定義的處理handler
ctx.sendUpstream(e);
} else {
e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.NOT_FOUND, StatusCode.NOT_FOUND_TEXT));
}
} else {
//版本錯誤
e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.VERSION_NOT_SUPPORTED, StatusCode.VERSION_NOT_SUPPORTED_TXET));
}
} else {
//如果格式錯誤,那麼直接返回
e.getChannel().write(MessageEncoder.encode(receivedaData, null, null));
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
logger.log(Level.WARNING, "Server side Unexpected exception from downstream.",
e.getCause());
e.getChannel().close();
ListenerManager.getListener(ConnectClosedByPeerListener.class).connectClosedByPeer(e.getCause());
}
}
在上面這個handler中,我使用了ctx.sendUpstream(e);來處理,個人覺得此處為了要實現執行運行時代碼,也可以使用介面等方式.但既然netty提供了sendUpstream 的方法,我們用這個豈不是更方便^_^
下面是使用SSL連接的handler
public class ServerSSLHandler extends SimpleChannelUpstreamHandler {
private static final Logger logger = Logger.getLogger(
ServerSSLHandler.class.getName());
private final ThreadLocal<Boolean> COMMAND_FLAG = new ThreadLocal<Boolean>();
private final ServerChannelGroup serverChannelGroup = ServerChannelGroup.newInstance();
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
if (e instanceof ChannelStateEvent) {
logger.log(Level.INFO, "Channel state changed: {0}", e);
}
super.handleUpstream(ctx, e);
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
//ssl握手
SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
sslHandler.handshake();
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
System.out.println(this);
String request = (String) e.getMessage();
//如果接受到客戶端發送的bye指令,那麼就給客戶端回復一個bye指令,客戶端接受到後,主動關閉連接
//伺服器端通過ChannelFutureListener.CLOSE,當它認為客戶端已經接受到伺服器發送的bye後,也主動關閉連接
if (request.toLowerCase().equals("bye")) {
ChannelFuture future = e.getChannel().write("bye\r\n");
future.addListener(ChannelFutureListener.CLOSE);
} else {
//以下是我初步解析客戶端發送過來的數據,然後決定處理方式
RecevieData receivedaData = MessageDecoder.decode(request);
if (null != receivedaData) {
//伺服器第5版
if (VersionCode.V5.equals(receivedaData.getVersion())) {
//然後判斷命令是否存在
for (String s : CommandCode.COMMANDS) {
if (s.equals(receivedaData.getActionType())) {
COMMAND_FLAG.set(true);
if (s.equals(CommandCode.KEEP_ALIVE)) {
serverChannelGroup.addChannel(e.getChannel());
}
break;
} else {
COMMAND_FLAG.set(false);
}
}
if (COMMAND_FLAG.get()) {
COMMAND_FLAG.set(false);
//將這個命令傳遞給下一個handler來處理.
//這里的"下一個handler"即為用戶自己定義的處理handler
ctx.sendUpstream(e);
} else {
e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.NOT_FOUND, StatusCode.NOT_FOUND_TEXT));
}
} else {
//版本錯誤
e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.VERSION_NOT_SUPPORTED, StatusCode.VERSION_NOT_SUPPORTED_TXET));
}
} else {
//如果格式錯誤,那麼直接返回
e.getChannel().write(MessageEncoder.encode(receivedaData, null, null));
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
logger.log(Level.WARNING, "Server side Unexpected exception from downstream.",
e.getCause());
e.getChannel().close();
ListenerManager.getListener(ConnectClosedByPeerListener.class).connectClosedByPeer(e.getCause());
}
}
當我們有了2個handler後,當然就是要把他們添加到我們的Pipeline中
public class ServerPipelineFactory implements
ChannelPipelineFactory {
public ChannelPipeline getPipeline() {
ChannelPipeline pipeline = pipeline();
ServerConfig config = ServerConfig.getInstance();
try {
if (config.ssl()) {
SSLEngine engine =
SecureSslContextFactory.getServerContext().createSSLEngine();
//說明是伺服器端SslContext
engine.setUseClientMode(false);
pipeline.addLast("ssl", new SslHandler(engine));
}
//此Decoder可以自動解析一句以\r\n結束的命令,我為了方便,也用了這個Decoder
//使用這個Decoder,我不用刻意發送命令長度用於解析,只要沒有收到\r\n說明數據還
//沒有發送完畢.這個Decoder會等到收到\r\n後調用下個handler
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(
8192, Delimiters.lineDelimiter()));
//字串解碼,可以自己設置charset
pipeline.addLast("decoder", new StringDecoder());
//字串編碼,可以自己設置charset
pipeline.addLast("encoder", new StringEncoder());
if (config.ssl()) {
//如果開啟了SSL,那麼使用sslhandler
pipeline.addLast("sslhandler", new ServerSSLHandler());
} else {
//如果沒有開啟SSL,那麼使用普通handler
pipeline.addLast("handler", new ServerHandler());
}
//遍歷配置文件中的伺服器handler,將其添加進Pipeline鏈中
for (Element e : config.handler()) {
pipeline.addLast(e.attribute(e.getQName("id")).getValue().trim(),
(ChannelHandler) Class.forName(e.attribute(e.getQName("class")).getValue().trim()).newInstance());
}
} catch (DocumentException ex) {
Logger.getLogger(ServerPipelineFactory.class.getName()).log(Level.SEVERE, ex.getMessage(), ex);
} catch (InstantiationException ex) {
Logger.getLogger(ServerPipelineFactory.class.getName()).log(Level.SEVERE, ex.getMessage(), ex);
} catch (IllegalAccessException ex) {
Logger.getLogger(ServerPipelineFactory.class.getName()).log(Level.SEVERE, ex.getMessage(), ex);
} catch (ClassNotFoundException ex) {
Logger.getLogger(ServerPipelineFactory.class.getName()).log(Level.SEVERE, ex.getMessage(), ex);
}
return pipeline;
}
}
server.xml,放到lib下即可,注意其中的handler 以及clienthandler 項,如果你新建了自己的handler,那麼需要在此xml中配置一下.
<?xml version="1.0" encoding="UTF-8"?>
<root>
<!-- 配置主機地址 -->
<host>127.0.0.1</host>
<!-- 配置服務埠 -->
<port>8080</port>
<!-- 是否啟用ssl,1為啟用,0為停用 -->
<ssl>0</ssl>
<!--伺服器業務handler -->
<handler id="timeHandler" class="com.chinatenet.nio.server.handler.ServerTimeHandler" />
<!--客戶端業務handler -->
<clienthandler id="timeHandler" class="com.chinatenet.nio.client.handler.ClientTimeHandler" />
</root>
到此,一個簡單的可擴展handler的伺服器雛形就出來了
下面,我們添加一個自定義的伺服器處理handler進來
public class ServerTimeHandler extends SimpleChannelUpstreamHandler {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
RecevieData receivedaData = MessageDecoder.decode((String) e.getMessage());
if (CommandCode.GET_TIME.equals(receivedaData.getActionType())
|| CommandCode.KEEP_ALIVE.equals(receivedaData.getActionType())) {
if (VersionCode.V5.equals(receivedaData.getVersion())) {
//回復客戶端後,即可進行自己的業務.當然.這里可以根據需要,看
//是先回復再處理還是等處理結果出來後,將結果返回客戶端
e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.OK,
System.currentTimeMillis() / 1000 + ""));
} else {
//版本錯誤
e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.VERSION_NOT_SUPPORTED,
StatusCode.VERSION_NOT_SUPPORTED_TXET));
}
} else {
//如果不是此handler處理的命令,那麼流下去
ctx.sendUpstream(e);
}
}
}
最後測試一下
public class Server {
public static void main(String[] args) throws DocumentException {
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
bootstrap.setPipelineFactory(new ServerPipelineFactory());
//因為我要用到長連接
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setOption("child.keepAlive", true);
ServerConfig config = ServerConfig.getInstance();
bootstrap.bind(new InetSocketAddress(Integer.valueOf(config.port())));
}
}
總結:在整個伺服器編碼中,剛開始會遇到"遠程主機強迫關閉了一個現有的連接。"等這類錯誤,最後修改成"相互告知對方我要關閉了"再進行關閉就可以了.
㈢ 如何用netty寫一個斷開tcp連接的方法
論一個TCP 連接是如何建立的以及通信結束後是如何終止的。
建立一個 TCP 連接
TCP使用三次握手 ( three-way handshake ) 協議來建立連接,圖 3-10 描述了三次握手的報文序列。這三次握手為:
請求端(通常稱為客戶)發送一個 SYN 報文段( SYN 為 1 )指明客戶打算連接的伺服器的埠,以及初始順序號( ISN )。
伺服器發回包含伺服器的初始順序號的 SYN 報文段( SYN 為 1 )作為應答。同時,將確認號設置為客戶的 ISN 加 1 以對客戶的 SYN 報文段進行確認( ACK 也為 1 )。
客戶必須將確認號設置為伺服器的 ISN 加 1 以對伺服器的 SYN 報文段進行確認( ACK 為 1 ),該報文通知目的主機雙方已完成連接建立。
㈣ netty寫的伺服器可以即接收socket請求又接收websocket請求嗎
接收只是一個介面,具體程序還是在後端處理的。 你只要各寫一個介面,然後把數據轉成相同形式,再調用你的處理代碼就可以了。
㈤ 用java的netty框架寫了一個udp服務端,怎麼測試它能承受的並發壓力
主從Reactor多線程Nio結構,主從Reactor線程模型的特點是:服務端用於接收客戶端連接的不再是個1個單獨的NIO線程,而是一個獨立的NIO線程池。Acceptor接收到客戶端TCP連接請求處理完成後(可能包含接入認證等),將新創建的SocketChannel注冊到IO線程池(sub reactor線程池)的某個IO線程上,由它負責SocketChannel的讀寫和編解碼工作。Acceptor線程池僅僅只用於客戶端的登陸、握手和安全認證,一旦鏈路建立成功,就將鏈路注冊到後端subReactor線程池的IO線程上,由IO線程負責後續的IO操作。
利用主從NIO線程模型,可以解決1個服務端監聽線程無法有效處理所有客戶端連接的性能不足問題。
它的工作流程總結如下:
從主線程池中隨機選擇一個Reactor線程作為Acceptor線程,用於綁定監聽埠,接收客戶端連接;
Acceptor線程接收客戶端連接請求之後創建新的SocketChannel,將其注冊到主線程池的其它Reactor線程上,由其負責接入認證、IP黑白名單過濾、握手等操作;
步驟2完成之後,業務層的鏈路正式建立,將SocketChannel從主線程池的Reactor線程的多路復用器上摘除,重新注冊到Sub線程池的線程上,用於處理I/O的讀寫操作。
㈥ 如何實現Netty框架中伺服器端的消息推送
etty框架是用在伺服器端,客戶端是嵌入式編程,通過自定義的tcp通信協議進行連接的,現在需求是這樣的,我的伺服器端只是用來和客戶端進行通信,現在有第三方如微信端進行支付成功後在資料庫里生成了一條數據,表示要往某個客戶端發送指令,我嘗試了兩種方式:
1、微信端生成通訊指令後調用TCP端的介面(負責通訊程序和資料庫交互的),在介面程序中通過定義Socket連到通訊程序伺服器端,根據通道編號去發送,但是這種會導致伺服器端的tcp客戶端連接變得更多
2、直接在netty框架中定義了scheleAtF
㈦ 怎麼使用netty寫一個http長連接伺服器
netty本身實現的長連接,就是一個連接一個worker。worker的數量是有限的(通常是cpu cores+1),所以你的伺服器要是連接數多的話,得考慮使用「非同步」Request(netty的http沒實現這么個功能),或者說「Continuation」,當連接「無事可做」的時候,放棄線程的使用權,當要處理事務的時候,才重新拿到一個線程。
當然,如果你只想實現長連接而不在意request 一直佔有worker,那麼你只要不放棄連接就可以了(websocket本身也是一種長連接,netty裡面有websocket的例子)。
㈧ 用Netty作http靜態資源伺服器,類似Nginx這樣的,大一點的文件響應不正常怎麼回事
為什麼Nginx的性能要比Apache高得多?
????這得益於Nginx使用了最新的epoll(Linux?2.6內核)和kqueue(freebsd)網路I/O模型,而Apache則使用的是傳統的select模型。目前Linux下能夠承受高並發訪問的Squid、Memcached都採用的是epoll網路I/O模型。
處理大量的連接的讀寫,Apache所採用的select網路I/O模型非常低效。下面用一個比喻來解析Apache採用的select模型和Nginx採用的epoll模型進行之間的區別:
假設你在大學讀書,住的宿舍樓有很多間房間,你的朋友要來找你。select版宿管大媽就會帶著你的朋友挨個房間去找,直到找到你為止。而epoll版宿管大媽會先記下每位同學的房間號,你的朋友來時,只需告訴你的朋友你住在哪個房間即可,不用親自帶著你的朋友滿大樓找人。如果來了10000個人,都要找自己住這棟樓的同學時,select版和epoll版宿管大媽,誰的效率更高,不言自明。同理,在高並發伺服器中,輪詢I/O是最耗時間的操作之一,select和epoll的性能誰的性能更高,同樣十分明了。為什麼會出現502錯誤呢?
nginx出現502有很多原因,但大部分原因可以歸結為資源數量不夠用,也就是說後端php-fpm處理有問題,nginx將正確的客戶端請求發給了後端的php-fpm進程,但是因為php-fpm進程的問題導致不能正確解析php代碼,最終返回給了客戶端502錯誤。優化php-fpm,優化代碼,加大內存才是解決502的根源。10000並發的話,nginx的表現怎麼樣?
2009年9月3日下午2:30,金山游戲《劍俠情緣網路版叄》臨時維護1小時,大量玩家上官網,論壇、評論、客服等動態應用Nginx伺服器集群,每台伺服器的Nginx活動連接數達到2.8萬。