怎么写netty服务器
㈠ 如何实现Netty框架中服务器端的消息推送
netty框架是用在服务器端,客户端是嵌入式编程,通过自定义的tcp通信协议进行连接的,现在需求是这样的,服务器端只是用来和客户端进行通信,现在有第三方如微信端进行支付成功后在数据库里生成了一条数据,表示要往某个客户端发送指令,以下两种方式可供参考:1、微信端生成通讯指令后调用TCP端的接口(负责通讯程序和数据库交互的),在接口程序中通过定义Socket连到通讯程序服务器端,根据通道编号去发送,但是这种会导致服务器端的tcp客户端连接变得更多。
2、直接在netty框架中定义了scheleAtF。
当然也可借助第三方工具来完成推送。例如极光推送,极光推送具有以下功能:
1、多种消息类型
开发者可以轻松地通过极光发送各个移动平台的系统通知,还可以在控制台编辑多种富文本展示模板; 极光还提供自定义消息的透传,客户端接到消息内容后根据自己的逻辑自由处理。
2、用户和推送统计
完整的消息生命周期查询,并且可以形成“推送报表”与“用户统计报表”呈现给开发者,用来观察推送的效果和应用发展趋势。
3、短信补充
通过极光后台推送APP通知消息,对于一些重要又不能遗漏的信息可以调用极光短信的后台对未收到的客户端发送短信通知,保证消息的可靠性。
4、A/B 测试
合理的推送能够激活用户,提高用户粘性,使用A/B分组测试的科学方法,根据测试反馈的结果,帮助开发者选择最优化的推送方案。
5、极光推送安全包
为金融、新闻、政务及其他对推送安全要求极高的客户提供安全严谨、稳定可靠的信息推送解决方案
6、可定制的私有云
对于安全性要求更高,希望推送数据和系统存储在自己服务器的客户,及个性化需求需要定制开发的,性能更高要求的,或者想拥有自己推送平台的甚至要求源码授权二次开发的开发者,极光提供全功能的私有云解决方案。
深圳市和讯华谷信息技术有限公司(极光 Aurora Mobile,纳斯达克股票代码:JG)成立于2011年,是中国领先的开发者服务提供商,专注于为开发者提供稳定高效的消息推送、一键认证以及流量变现等服务,助力开发者的运营、增长与变现。同时,极光的行业应用已经拓展至市场洞察、金融风控与商业地理服务,助力各行各业优化决策、提升效率。
㈡ 刚学习netty和java,不会用,但现在急需,您有没有用netty编写的服务器和客户端的例子,要能互相发送包
(一)handler处理篇
首先,是handler,初次接触netty的朋友要注意,handler不是一个单例.即每个channel下都会有自己的一个handler实例.
public class ServerHandler extends SimpleChannelUpstreamHandler {
private static final Logger logger = Logger.getLogger(
ServerHandler.class.getName());
private final ThreadLocal<Boolean> COMMAND_FLAG = new ThreadLocal<Boolean>();
private final ServerChannelGroup serverChannelGroup = ServerChannelGroup.newInstance();
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
if (e instanceof ChannelStateEvent) {
logger.log(Level.INFO, "Channel state changed: {0}", e);
}
super.handleUpstream(ctx, e);
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
System.out.println(this);
String request = (String) e.getMessage();
//如果接受到客户端发送的bye指令,那么就给客户端回复一个bye指令,客户端接受到后,主动关闭连接
//服务器端通过ChannelFutureListener.CLOSE,当它认为客户端已经接受到服务器发送的bye后,也主动关闭连接
if (request.toLowerCase().equals("bye")) {
ChannelFuture future = e.getChannel().write("bye\r\n");
future.addListener(ChannelFutureListener.CLOSE);
} else {
//以下是我初步解析客户端发送过来的数据,然后决定处理方式
RecevieData receivedaData = MessageDecoder.decode(request);
if (null != receivedaData) {
//服务器第5版
if (VersionCode.V5.equals(receivedaData.getVersion())) {
//然后判断命令是否存在
for (String s : CommandCode.COMMANDS) {
if (s.equals(receivedaData.getActionType())) {
COMMAND_FLAG.set(true);
if (s.equals(CommandCode.KEEP_ALIVE)) {
serverChannelGroup.addChannel(e.getChannel());
}
break;
} else {
COMMAND_FLAG.set(false);
}
}
if (COMMAND_FLAG.get()) {
COMMAND_FLAG.set(false);
//将这个命令传递给下一个handler来处理.
//这里的"下一个handler"即为用户自己定义的处理handler
ctx.sendUpstream(e);
} else {
e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.NOT_FOUND, StatusCode.NOT_FOUND_TEXT));
}
} else {
//版本错误
e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.VERSION_NOT_SUPPORTED, StatusCode.VERSION_NOT_SUPPORTED_TXET));
}
} else {
//如果格式错误,那么直接返回
e.getChannel().write(MessageEncoder.encode(receivedaData, null, null));
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
logger.log(Level.WARNING, "Server side Unexpected exception from downstream.",
e.getCause());
e.getChannel().close();
ListenerManager.getListener(ConnectClosedByPeerListener.class).connectClosedByPeer(e.getCause());
}
}
在上面这个handler中,我使用了ctx.sendUpstream(e);来处理,个人觉得此处为了要实现执行运行时代码,也可以使用接口等方式.但既然netty提供了sendUpstream 的方法,我们用这个岂不是更方便^_^
下面是使用SSL连接的handler
public class ServerSSLHandler extends SimpleChannelUpstreamHandler {
private static final Logger logger = Logger.getLogger(
ServerSSLHandler.class.getName());
private final ThreadLocal<Boolean> COMMAND_FLAG = new ThreadLocal<Boolean>();
private final ServerChannelGroup serverChannelGroup = ServerChannelGroup.newInstance();
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
if (e instanceof ChannelStateEvent) {
logger.log(Level.INFO, "Channel state changed: {0}", e);
}
super.handleUpstream(ctx, e);
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
//ssl握手
SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
sslHandler.handshake();
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
System.out.println(this);
String request = (String) e.getMessage();
//如果接受到客户端发送的bye指令,那么就给客户端回复一个bye指令,客户端接受到后,主动关闭连接
//服务器端通过ChannelFutureListener.CLOSE,当它认为客户端已经接受到服务器发送的bye后,也主动关闭连接
if (request.toLowerCase().equals("bye")) {
ChannelFuture future = e.getChannel().write("bye\r\n");
future.addListener(ChannelFutureListener.CLOSE);
} else {
//以下是我初步解析客户端发送过来的数据,然后决定处理方式
RecevieData receivedaData = MessageDecoder.decode(request);
if (null != receivedaData) {
//服务器第5版
if (VersionCode.V5.equals(receivedaData.getVersion())) {
//然后判断命令是否存在
for (String s : CommandCode.COMMANDS) {
if (s.equals(receivedaData.getActionType())) {
COMMAND_FLAG.set(true);
if (s.equals(CommandCode.KEEP_ALIVE)) {
serverChannelGroup.addChannel(e.getChannel());
}
break;
} else {
COMMAND_FLAG.set(false);
}
}
if (COMMAND_FLAG.get()) {
COMMAND_FLAG.set(false);
//将这个命令传递给下一个handler来处理.
//这里的"下一个handler"即为用户自己定义的处理handler
ctx.sendUpstream(e);
} else {
e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.NOT_FOUND, StatusCode.NOT_FOUND_TEXT));
}
} else {
//版本错误
e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.VERSION_NOT_SUPPORTED, StatusCode.VERSION_NOT_SUPPORTED_TXET));
}
} else {
//如果格式错误,那么直接返回
e.getChannel().write(MessageEncoder.encode(receivedaData, null, null));
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
logger.log(Level.WARNING, "Server side Unexpected exception from downstream.",
e.getCause());
e.getChannel().close();
ListenerManager.getListener(ConnectClosedByPeerListener.class).connectClosedByPeer(e.getCause());
}
}
当我们有了2个handler后,当然就是要把他们添加到我们的Pipeline中
public class ServerPipelineFactory implements
ChannelPipelineFactory {
public ChannelPipeline getPipeline() {
ChannelPipeline pipeline = pipeline();
ServerConfig config = ServerConfig.getInstance();
try {
if (config.ssl()) {
SSLEngine engine =
SecureSslContextFactory.getServerContext().createSSLEngine();
//说明是服务器端SslContext
engine.setUseClientMode(false);
pipeline.addLast("ssl", new SslHandler(engine));
}
//此Decoder可以自动解析一句以\r\n结束的命令,我为了方便,也用了这个Decoder
//使用这个Decoder,我不用刻意发送命令长度用于解析,只要没有收到\r\n说明数据还
//没有发送完毕.这个Decoder会等到收到\r\n后调用下个handler
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(
8192, Delimiters.lineDelimiter()));
//字串解码,可以自己设置charset
pipeline.addLast("decoder", new StringDecoder());
//字串编码,可以自己设置charset
pipeline.addLast("encoder", new StringEncoder());
if (config.ssl()) {
//如果开启了SSL,那么使用sslhandler
pipeline.addLast("sslhandler", new ServerSSLHandler());
} else {
//如果没有开启SSL,那么使用普通handler
pipeline.addLast("handler", new ServerHandler());
}
//遍历配置文件中的服务器handler,将其添加进Pipeline链中
for (Element e : config.handler()) {
pipeline.addLast(e.attribute(e.getQName("id")).getValue().trim(),
(ChannelHandler) Class.forName(e.attribute(e.getQName("class")).getValue().trim()).newInstance());
}
} catch (DocumentException ex) {
Logger.getLogger(ServerPipelineFactory.class.getName()).log(Level.SEVERE, ex.getMessage(), ex);
} catch (InstantiationException ex) {
Logger.getLogger(ServerPipelineFactory.class.getName()).log(Level.SEVERE, ex.getMessage(), ex);
} catch (IllegalAccessException ex) {
Logger.getLogger(ServerPipelineFactory.class.getName()).log(Level.SEVERE, ex.getMessage(), ex);
} catch (ClassNotFoundException ex) {
Logger.getLogger(ServerPipelineFactory.class.getName()).log(Level.SEVERE, ex.getMessage(), ex);
}
return pipeline;
}
}
server.xml,放到lib下即可,注意其中的handler 以及clienthandler 项,如果你新建了自己的handler,那么需要在此xml中配置一下.
<?xml version="1.0" encoding="UTF-8"?>
<root>
<!-- 配置主机地址 -->
<host>127.0.0.1</host>
<!-- 配置服务端口 -->
<port>8080</port>
<!-- 是否启用ssl,1为启用,0为停用 -->
<ssl>0</ssl>
<!--服务器业务handler -->
<handler id="timeHandler" class="com.chinatenet.nio.server.handler.ServerTimeHandler" />
<!--客户端业务handler -->
<clienthandler id="timeHandler" class="com.chinatenet.nio.client.handler.ClientTimeHandler" />
</root>
到此,一个简单的可扩展handler的服务器雏形就出来了
下面,我们添加一个自定义的服务器处理handler进来
public class ServerTimeHandler extends SimpleChannelUpstreamHandler {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
RecevieData receivedaData = MessageDecoder.decode((String) e.getMessage());
if (CommandCode.GET_TIME.equals(receivedaData.getActionType())
|| CommandCode.KEEP_ALIVE.equals(receivedaData.getActionType())) {
if (VersionCode.V5.equals(receivedaData.getVersion())) {
//回复客户端后,即可进行自己的业务.当然.这里可以根据需要,看
//是先回复再处理还是等处理结果出来后,将结果返回客户端
e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.OK,
System.currentTimeMillis() / 1000 + ""));
} else {
//版本错误
e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.VERSION_NOT_SUPPORTED,
StatusCode.VERSION_NOT_SUPPORTED_TXET));
}
} else {
//如果不是此handler处理的命令,那么流下去
ctx.sendUpstream(e);
}
}
}
最后测试一下
public class Server {
public static void main(String[] args) throws DocumentException {
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
bootstrap.setPipelineFactory(new ServerPipelineFactory());
//因为我要用到长连接
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setOption("child.keepAlive", true);
ServerConfig config = ServerConfig.getInstance();
bootstrap.bind(new InetSocketAddress(Integer.valueOf(config.port())));
}
}
总结:在整个服务器编码中,刚开始会遇到"远程主机强迫关闭了一个现有的连接。"等这类错误,最后修改成"相互告知对方我要关闭了"再进行关闭就可以了.
㈢ 如何用netty写一个断开tcp连接的方法
论一个TCP 连接是如何建立的以及通信结束后是如何终止的。
建立一个 TCP 连接
TCP使用三次握手 ( three-way handshake ) 协议来建立连接,图 3-10 描述了三次握手的报文序列。这三次握手为:
请求端(通常称为客户)发送一个 SYN 报文段( SYN 为 1 )指明客户打算连接的服务器的端口,以及初始顺序号( ISN )。
服务器发回包含服务器的初始顺序号的 SYN 报文段( SYN 为 1 )作为应答。同时,将确认号设置为客户的 ISN 加 1 以对客户的 SYN 报文段进行确认( ACK 也为 1 )。
客户必须将确认号设置为服务器的 ISN 加 1 以对服务器的 SYN 报文段进行确认( ACK 为 1 ),该报文通知目的主机双方已完成连接建立。
㈣ netty写的服务器可以即接收socket请求又接收websocket请求吗
接收只是一个接口,具体程序还是在后端处理的。 你只要各写一个接口,然后把数据转成相同形式,再调用你的处理代码就可以了。
㈤ 用java的netty框架写了一个udp服务端,怎么测试它能承受的并发压力
主从Reactor多线程Nio结构,主从Reactor线程模型的特点是:服务端用于接收客户端连接的不再是个1个单独的NIO线程,而是一个独立的NIO线程池。Acceptor接收到客户端TCP连接请求处理完成后(可能包含接入认证等),将新创建的SocketChannel注册到IO线程池(sub reactor线程池)的某个IO线程上,由它负责SocketChannel的读写和编解码工作。Acceptor线程池仅仅只用于客户端的登陆、握手和安全认证,一旦链路建立成功,就将链路注册到后端subReactor线程池的IO线程上,由IO线程负责后续的IO操作。
利用主从NIO线程模型,可以解决1个服务端监听线程无法有效处理所有客户端连接的性能不足问题。
它的工作流程总结如下:
从主线程池中随机选择一个Reactor线程作为Acceptor线程,用于绑定监听端口,接收客户端连接;
Acceptor线程接收客户端连接请求之后创建新的SocketChannel,将其注册到主线程池的其它Reactor线程上,由其负责接入认证、IP黑白名单过滤、握手等操作;
步骤2完成之后,业务层的链路正式建立,将SocketChannel从主线程池的Reactor线程的多路复用器上摘除,重新注册到Sub线程池的线程上,用于处理I/O的读写操作。
㈥ 如何实现Netty框架中服务器端的消息推送
etty框架是用在服务器端,客户端是嵌入式编程,通过自定义的tcp通信协议进行连接的,现在需求是这样的,我的服务器端只是用来和客户端进行通信,现在有第三方如微信端进行支付成功后在数据库里生成了一条数据,表示要往某个客户端发送指令,我尝试了两种方式:
1、微信端生成通讯指令后调用TCP端的接口(负责通讯程序和数据库交互的),在接口程序中通过定义Socket连到通讯程序服务器端,根据通道编号去发送,但是这种会导致服务器端的tcp客户端连接变得更多
2、直接在netty框架中定义了scheleAtF
㈦ 怎么使用netty写一个http长连接服务器
netty本身实现的长连接,就是一个连接一个worker。worker的数量是有限的(通常是cpu cores+1),所以你的服务器要是连接数多的话,得考虑使用“异步”Request(netty的http没实现这么个功能),或者说“Continuation”,当连接“无事可做”的时候,放弃线程的使用权,当要处理事务的时候,才重新拿到一个线程。
当然,如果你只想实现长连接而不在意request 一直占有worker,那么你只要不放弃连接就可以了(websocket本身也是一种长连接,netty里面有websocket的例子)。
㈧ 用Netty作http静态资源服务器,类似Nginx这样的,大一点的文件响应不正常怎么回事
为什么Nginx的性能要比Apache高得多?
????这得益于Nginx使用了最新的epoll(Linux?2.6内核)和kqueue(freebsd)网络I/O模型,而Apache则使用的是传统的select模型。目前Linux下能够承受高并发访问的Squid、Memcached都采用的是epoll网络I/O模型。
处理大量的连接的读写,Apache所采用的select网络I/O模型非常低效。下面用一个比喻来解析Apache采用的select模型和Nginx采用的epoll模型进行之间的区别:
假设你在大学读书,住的宿舍楼有很多间房间,你的朋友要来找你。select版宿管大妈就会带着你的朋友挨个房间去找,直到找到你为止。而epoll版宿管大妈会先记下每位同学的房间号,你的朋友来时,只需告诉你的朋友你住在哪个房间即可,不用亲自带着你的朋友满大楼找人。如果来了10000个人,都要找自己住这栋楼的同学时,select版和epoll版宿管大妈,谁的效率更高,不言自明。同理,在高并发服务器中,轮询I/O是最耗时间的操作之一,select和epoll的性能谁的性能更高,同样十分明了。为什么会出现502错误呢?
nginx出现502有很多原因,但大部分原因可以归结为资源数量不够用,也就是说后端php-fpm处理有问题,nginx将正确的客户端请求发给了后端的php-fpm进程,但是因为php-fpm进程的问题导致不能正确解析php代码,最终返回给了客户端502错误。优化php-fpm,优化代码,加大内存才是解决502的根源。10000并发的话,nginx的表现怎么样?
2009年9月3日下午2:30,金山游戏《剑侠情缘网络版叁》临时维护1小时,大量玩家上官网,论坛、评论、客服等动态应用Nginx服务器集群,每台服务器的Nginx活动连接数达到2.8万。