为什么netty只能netty 接收16进制数据2

> 博客详情
摘要: Handler在netty中,无疑占据着非常重要的地位。Handler与Servlet中的filter很像,通过Handler可以完成通讯报文的解码编码、拦截指定的报文、统一对日志错误进行处理、统一对请求进行计数、控制Handler执行与否。一句话,没有它做不到的只有你想不到的。
参考自:http://blog.csdn.net/u/article/details/
Handler在netty中,无疑占据着非常重要的地位。Handler与Servlet中的filter很像,通过Handler可以完成通讯报文的解码编码、拦截指定的报文、统一对日志错误进行处理、统一对请求进行计数、控制Handler执行与否。一句话,没有它做不到的只有你想不到的。
Netty中的所有handler都实现自ChannelHandler接口。按照输出输出来分,分为ChannelInboundHandler、ChannelOutboundHandler两大类。ChannelInboundHandler对从客户端发往服务器的报文进行处理,一般用来执行解码、读取客户端数据、进行业务处理等;ChannelOutboundHandler对从服务器发往客户端的报文进行处理,一般用来进行编码、发送报文到客户端。
Netty中,可以注册多个handler。ChannelInboundHandler按照注册的先后顺序执行;ChannelOutboundHandler按照注册的先后顺序逆序执行,如下图所示,按照注册的先后顺序对Handler进行排序,request进入Netty后的执行顺序为:
下面例子涉及的类包括:
一、HelloServer:
package com.yao.
import io.netty.bootstrap.ServerB
import io.netty.channel.ChannelF
import io.netty.channel.ChannelI
import io.netty.channel.ChannelO
import io.netty.channel.EventLoopG
import io.netty.channel.nio.NioEventLoopG
import io.netty.channel.socket.SocketC
import io.netty.channel.socket.nio.NioServerSocketC
public class HelloServer {
public void start(int port) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer&SocketChannel&() {
public void initChannel(SocketChannel ch) throws Exception {
// 注册两个OutboundHandler,执行顺序为注册顺序的逆序,所以应该是OutboundHandler2 OutboundHandler1
ch.pipeline().addLast(new OutboundHandler1());
ch.pipeline().addLast(new OutboundHandler2());
// 注册两个InboundHandler,执行顺序为注册顺序,所以应该是InboundHandler1 InboundHandler2
ch.pipeline().addLast(new InboundHandler1());
ch.pipeline().addLast(new InboundHandler2());
}).option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
public static void main(String[] args) throws Exception {
HelloServer server = new HelloServer();
server.start(8000);
} 二、InboundHandler1:
package com.yao.
import io.netty.channel.ChannelHandlerC
import io.netty.channel.ChannelInboundHandlerA
import org.apache.commons.logging.L
import org.apache.commons.logging.LogF
public class InboundHandler1 extends ChannelInboundHandlerAdapter {
private static Log logger = LogFactory.getLog(InboundHandler1.class);
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
logger.info("InboundHandler1.channelRead: ctx :" + ctx);
// 通知执行下一个InboundHandler
//ctx.fireChannelRead(msg);
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
logger.info("InboundHandler1.channelReadComplete");
ctx.flush();
} 三、InboundHandler2:
package com.yao.
import io.netty.buffer.ByteB
import io.netty.channel.ChannelHandlerC
import io.netty.channel.ChannelInboundHandlerA
import org.apache.commons.logging.L
import org.apache.commons.logging.LogF
public class InboundHandler2 extends ChannelInboundHandlerAdapter {
private static Log logger = LogFactory.getLog(InboundHandler2.class);
// 读取Client发送的信息,并打印出来
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
logger.info("InboundHandler2.channelRead: ctx :" + ctx);
ByteBuf result = (ByteBuf)
byte[] result1 = new byte[result.readableBytes()];
result.readBytes(result1);
String resultStr = new String(result1);
System.out.println("Client said:" + resultStr);
result.release();
ctx.write(msg);
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
logger.info("InboundHandler2.channelReadComplete");
ctx.flush();
} 四、OutboundHandler1 :
package com.yao.
import io.netty.buffer.ByteB
import io.netty.channel.ChannelHandlerC
import io.netty.channel.ChannelOutboundHandlerA
import io.netty.channel.ChannelP
import org.apache.commons.logging.L
import org.apache.commons.logging.LogF
public class OutboundHandler1 extends ChannelOutboundHandlerAdapter {
private static Log logger = LogFactory.getLog(OutboundHandler1.class);
// 向client发送消息
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
logger.info("OutboundHandler1.write");
String response = "I am ok!";
ByteBuf encoded = ctx.alloc().buffer(4 * response.length());
encoded.writeBytes(response.getBytes());
ctx.write(encoded);
ctx.flush();
五、OutboundHandler2:
package com.yao.
import io.netty.channel.ChannelHandlerC
import io.netty.channel.ChannelOutboundHandlerA
import io.netty.channel.ChannelP
import org.apache.commons.logging.L
import org.apache.commons.logging.LogF
public class OutboundHandler2 extends ChannelOutboundHandlerAdapter {
private static Log logger = LogFactory.getLog(OutboundHandler2.class);
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
logger.info("OutboundHandler2.write");
// 执行下一个OutboundHandler
super.write(ctx, msg, promise);
下面是客户端
六、HelloClient:
package com.yao.
import io.netty.bootstrap.B
import io.netty.channel.ChannelF
import io.netty.channel.ChannelI
import io.netty.channel.ChannelO
import io.netty.channel.EventLoopG
import io.netty.channel.nio.NioEventLoopG
import io.netty.channel.socket.SocketC
import io.netty.channel.socket.nio.NioSocketC
public class HelloClient {
public void connect(String host, int port) throws Exception {
EventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer&SocketChannel&() {
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new HelloClientIntHandler());
// Start the client.
ChannelFuture f = b.connect(host, port).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
public static void main(String[] args) throws Exception {
HelloClient client = new HelloClient();
client.connect("127.0.0.1", 8000);
七、HelloClientIntHandler:
package com.yao.
import io.netty.buffer.ByteB
import io.netty.channel.ChannelHandlerC
import io.netty.channel.ChannelInboundHandlerA
import org.apache.commons.logging.L
import org.apache.commons.logging.LogF
public class HelloClientIntHandler extends ChannelInboundHandlerAdapter {
private static Log logger = LogFactory.getLog(HelloClientIntHandler.class);
// 读取服务端的信息
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
logger.info("HelloClientIntHandler.channelRead");
ByteBuf result = (ByteBuf)
byte[] result1 = new byte[result.readableBytes()];
result.readBytes(result1);
result.release();
ctx.close();
System.out.println("Server said:" + new String(result1));
// 当连接建立的时候向服务端发送消息 ,channelActive 事件当连接建立的时候会触发
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("HelloClientIntHandler.channelActive");
String msg = "Are you ok?";
ByteBuf encoded = ctx.alloc().buffer(4 * msg.length());
encoded.writeBytes(msg.getBytes());
ctx.write(encoded);
ctx.flush();
} 八、总结:
在使用Handler的过程中,需要注意: 1、ChannelInboundHandler之间的传递,通过调用 ctx.fireChannelRead(msg) 实现;调用ctx.write(msg) 将传递到ChannelOutboundHandler。 2、ctx.write()方法执行后,需要调用flush()方法才能令它立即执行。 3、ChannelOutboundHandler 在注册的时候需要放在最后一个ChannelInboundHandler之前,否则将无法传递到ChannelOutboundHandler。 4、Handler的消费处理放在最后一个处理。
这位兄台说的让我更加茅塞顿开,惊天地泣鬼神,正瞌睡也不瞌睡,一口气能敲五百航代码了,也不喘气。
支付宝支付
微信扫码支付
打赏金额: ¥
已支付成功
打赏金额: ¥通过一个实例来说明Netty的使用。用1个服务器连接5个客户端线程,客户端连接上服务器以后就向服务器发送消息,服务器接收到消息后向客户端返回消息,客户端接收到消息以后,等待随机的时间,再向服务端发送消息,这样一直循环下去。
项目结构:
NettyServer.java:
import java.net.InetSocketA
import java.util.concurrent.E
import org.jboss.netty.bootstrap.ServerB
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelF
import org.jboss.netty.handler.execution.ExecutionH
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolE
import Util.C
public class NettyServer {
public static String host = "127.0.0.1";
// 创建1个线程池
static ExecutionHandler executionHandler = new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor(16, 48576));
public static void main(String[] args) {
// ChannelFactory
final ChannelFactory channelFactory = new NioServerSocketChannelFactory(
// Boss线程池,处理Socket请求
Executors.newCachedThreadPool(),
// Worker线程池,由于使用的是NIO,1个Worker线程可以管理多个Channel
Executors.newCachedThreadPool());
// ServerBootstrap
ServerBootstrap bootstrap = new ServerBootstrap(channelFactory);
ServerPipelineFactory serverPipelineFactory = new ServerPipelineFactory(executionHandler);
bootstrap.setPipelineFactory(serverPipelineFactory);
// 禁用nagle算法
bootstrap.setOption("child.tcpNoDelay", true);
// 启用TCP保活检测
bootstrap.setOption("child.keepAlive", true);
// 监听5个端口
bootstrap.bind(new InetSocketAddress(Constant.p1));
System.out.println("Listening port " + Constant.p1 + "...");
bootstrap.bind(new InetSocketAddress(Constant.p2));
System.out.println("Listening port " + Constant.p2 + "...");
bootstrap.bind(new InetSocketAddress(Constant.p3));
System.out.println("Listening port " + Constant.p3 + "...");
bootstrap.bind(new InetSocketAddress(Constant.p4));
System.out.println("Listening port " + Constant.p4 + "...");
bootstrap.bind(new InetSocketAddress(Constant.p5));
System.out.println("Listening port " + Constant.p5 + "...");
ServerPipelineFactory.java:
import org.jboss.netty.channel.ChannelP
import org.jboss.netty.channel.ChannelPipelineF
import org.jboss.netty.channel.C
import org.jboss.netty.handler.codec.string.StringD
import org.jboss.netty.handler.codec.string.StringE
import org.jboss.netty.handler.execution.ExecutionH
public class ServerPipelineFactory implements ChannelPipelineFactory {
private final ExecutionHandler executionH
public ServerPipelineFactory(ExecutionHandler executionHandler){
this.executionHandler = executionH
public ChannelPipeline getPipeline() throws Exception {
// TODO Auto-generated method stub
return Channels.pipeline(
new StringEncoder(),
new StringDecoder(),
// 多个pipeline之间必须共享同一个ExecutionHandler,放在业务逻辑handler之前
executionHandler,
// 业务逻辑handler
new MyServerHandler());
MyServerHandler.java:
import org.jboss.netty.channel.C
import org.jboss.netty.channel.ChannelHandlerC
import org.jboss.netty.channel.ChannelStateE
import org.jboss.netty.channel.ExceptionE
import org.jboss.netty.channel.MessageE
import org.jboss.netty.channel.SimpleChannelH
import Util.T
public class MyServerHandler extends SimpleChannelHandler{
@SuppressWarnings("static-access")
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
System.out.println("Server received:" + e.getMessage());
// 休息随机秒后发送消息
Thread th = Thread.currentThread();
int interval = Tool.getInterval(100);
th.sleep(interval*1000);
e.getChannel().write("from Server: Hello!");
super.messageReceived(ctx, e);
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
e.getCause().printStackTrace();
Channel ch = e.getChannel();
ch.close();
super.exceptionCaught(ctx, e);
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
System.out.println("A client connected!");
super.channelConnected(ctx, e);
NettyClient.java:
import java.net.InetSocketA
import java.util.concurrent.E
import org.jboss.netty.bootstrap.ClientB
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelF
import org.jboss.netty.handler.execution.ExecutionH
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolE
import Util.C
public class NettyClient extends Thread{
public static String host = "127.0.0.1";
// 创建1个线程池
static ExecutionHandler executionHandler = new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor(16, 48576));
public NettyClient(int port) {
this.port =
// ChannelFactory
final ChannelFactory channelFactory = new NioClientSocketChannelFactory(
// Boss线程池
Executors.newCachedThreadPool(),
// Worker线程池
Executors.newCachedThreadPool());
// ServerBootstrap
bootstrap = new ClientBootstrap(channelFactory);
ClientPipelineFactory clientPipelineFactory = new ClientPipelineFactory(executionHandler);
bootstrap.setPipelineFactory(clientPipelineFactory);
bootstrap.setOption("tcpNoDelay" ,true);
bootstrap.setOption("keepAlive", true);
bootstrap.connect(new InetSocketAddress(port));
public void run(){
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
// 开始试图连接
System.out.println("Connecting to port " + port + "...");
// 等待直到连接关闭或失败
future.getChannel().getCloseFuture().awaitUninterruptibly();
// 关闭线程池准备退出
bootstrap.releaseExternalResources();
public static void main(String[] args) {
NettyClient nc1 = new NettyClient(Constant.p1);
NettyClient nc2 = new NettyClient(Constant.p2);
NettyClient nc3 = new NettyClient(Constant.p3);
NettyClient nc4 = new NettyClient(Constant.p4);
NettyClient nc5 = new NettyClient(Constant.p5);
nc1.start();
nc2.start();
nc3.start();
nc4.start();
nc5.start();
ClientPipelineFactory.java:
import org.jboss.netty.channel.ChannelP
import org.jboss.netty.channel.ChannelPipelineF
import org.jboss.netty.channel.C
import org.jboss.netty.handler.codec.string.StringD
import org.jboss.netty.handler.codec.string.StringE
import org.jboss.netty.handler.execution.ExecutionH
public class ClientPipelineFactory implements ChannelPipelineFactory {
private final ExecutionHandler executionH
public ClientPipelineFactory(ExecutionHandler executionHandler){
this.executionHandler = executionH
public ChannelPipeline getPipeline() throws Exception {
// TODO Auto-generated method stub
return Channels.pipeline(
new StringEncoder(),
new StringDecoder(),
// 多个pipeline之间必须共享同一个ExecutionHandler,放在业务逻辑handler之前
executionHandler,
// 业务逻辑handler
new MyClientHandler());
MyClientHandler.java:
import org.jboss.netty.channel.C
import org.jboss.netty.channel.ChannelHandlerC
import org.jboss.netty.channel.ChannelStateE
import org.jboss.netty.channel.ExceptionE
import org.jboss.netty.channel.MessageE
import org.jboss.netty.channel.SimpleChannelH
import org.jboss.netty.channel.SimpleChannelUpstreamH
import Util.T
public class MyClientHandler extends SimpleChannelHandler{
// 连接到服务端时,发出消息
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
System.out.println("Connected to Server!");
e.getChannel().write("from Client: Hello! " + System.currentTimeMillis());
super.channelConnected(ctx, e);
@SuppressWarnings("static-access")
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
System.out.println("Client Received:" + e.getMessage());
// 休息随机秒后发送消息
Thread th = Thread.currentThread();
int interval = Tool.getInterval(5);
th.sleep(interval*1000);
e.getChannel().write("from Client: Hello! "
+ System.currentTimeMillis());
super.messageReceived(ctx, e);
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
e.getCause().printStackTrace();
Channel ch = e.getChannel();
ch.close();
super.exceptionCaught(ctx, e);
Constant.java:
public class Constant {
final static int start = 10000;
public static int p1 = start + 1;
public static int p2 = start + 2;
public static int p3 = start + 3;
public static int p4 = start + 4;
public static int p5 = start + 5;
Tool.java:
import java.util.R
public class Tool {
static Random rand = new Random();
public static int getInterval(int max){
return rand.nextInt(max);
阅读(...) 评论()& 1package org.jboss.netty.example.xlsvr.& 2& 3import java.util.HashM& 4import java.util.M& 5& 6/** *//**& 7 *& &#64;author hankchen<span style="color: # *&
下午02:46:52<span style="color: #&*/<span style="color: #<span style="color: #<span style="color: #/** *//**<span style="color: # * 响应数据<span style="color: #&*/<span style="color: #<span style="color: #/** *//**<span style="color: # * 通用协议介绍<span style="color: # * <span style="color: # * 通用报文格式:无论是请求还是响应,报文都由一个通用报文头和实际数据组成。报文头在前,数据在后<span style="color: # * (1)报文头:由数据解析类型,数据解析方法,编码,扩展字节,包长度组成,共16个字节:<span style="color: # * 编码方式(1byte)、加密(1byte)、扩展1(1byte)、扩展2(1byte)、会话ID(4byte)、命令或者结果码(4byte)、包长(4byte)<span style="color: # * (2)数据:由包长指定。请求或回复数据。类型对应为JAVA的Map&String,String&<span style="color: # * 数据格式定义:<span style="color: # * 字段1键名长度&&& 字段1键名 字段1值长度&&& 字段1值<span style="color: # * 字段2键名长度&&& 字段2键名 字段2值长度&&& 字段2值<span style="color: # * 字段3键名长度&&& 字段3键名 字段3值长度&&& 字段3值<span style="color: # * &#8230;&&& &#8230;&&& &#8230;&&& &#8230;<span style="color: # * 长度为整型,占4个字节<span style="color: #&*/<span style="color: #public&class XLResponse {<span style="color: #&&& private&byte// 数据编码格式。已定义:0:UTF-8,1:GBK,2:GB2312,3:ISO8859-1<span style="color: #&&& private&byte// 加密类型。0表示不加密<span style="color: #&&& private&byte extend1;// 用于扩展协议。暂未定义任何值<span style="color: #&&& private&byte extend2;// 用于扩展协议。暂未定义任何值<span style="color: #&&& private&int// 会话ID<span style="color: #&&& private&int// 结果码<span style="color: #&&& private&int// 数据包长<span style="color: #&&& <span style="color: #&&& private Map&String,String& values=new HashMap&String, String&();<span style="color: #&&& <span style="color: #&&& private S<span style="color: #&&& <span style="color: #&&& public&void setValue(String key,String value){<span style="color: #&&&&&&& values.put(key, value);<span style="color: #&&& }<span style="color: #&&& <span style="color: #&&& public String getValue(String key){<span style="color: #&&&&&&& if (key==null) {<span style="color: #&&&&&&&&&&& return&null;<span style="color: #&&&&&&& }<span style="color: #&&&&&&& return values.get(key);<span style="color: #&&& }<span style="color: #<span style="color: #&&& public&byte getEncode() {<span style="color: #&&&&&&& return<span style="color: #&&& }<span style="color: #<span style="color: #&&& public&void setEncode(byte encode) {<span style="color: #&&&&&&& this.encode =<span style="color: #&&& }<span style="color: #<span style="color: #&&& public&byte getEncrypt() {<span style="color: #&&&&&&& return<span style="color: #&&& }<span style="color: #<span style="color: #&&& public&void setEncrypt(byte encrypt) {<span style="color: #&&&&&&& this.encrypt =<span style="color: #&&& }<span style="color: #<span style="color: #&&& public&byte getExtend1() {<span style="color: #&&&&&&& return extend1;<span style="color: #&&& }<span style="color: #<span style="color: #&&& public&void setExtend1(byte extend1) {<span style="color: #&&&&&&& this.extend1 = extend1;<span style="color: #&&& }<span style="color: #<span style="color: #&&& public&byte getExtend2() {<span style="color: #&&&&&&& return extend2;<span style="color: #&&& }<span style="color: #<span style="color: #&&& public&void setExtend2(byte extend2) {<span style="color: #&&&&&&& this.extend2 = extend2;<span style="color: #&&& }<span style="color: #<span style="color: #&&& public&int getSessionid() {<span style="color: #&&&&&&& return<span style="color: #&&& }<span style="color: #<span style="color: #&&& public&void setSessionid(int sessionid) {<span style="color: #&&&&&&& this.sessionid =<span style="color: #&&& }<span style="color: #<span style="color: #&&& public&int getResult() {<span style="color: #&&&&&&& return<span style="color: #&&& }<span style="color: #<span style="color: #0&&& public&void setResult(int result) {<span style="color: #1&&&&&&& this.result =<span style="color: #2&&& }<span style="color: #3<span style="color: #4&&& public&int getLength() {<span style="color: #5&&&&&&& return<span style="color: #6&&& }<span style="color: #7<span style="color: #8&&& public&void setLength(int length) {<span style="color: #9&&&&&&& this.length =<span style="color: #0&&& }<span style="color: #1<span style="color: #2&&& public Map&String, String& getValues() {<span style="color: #3&&&&&&& return<span style="color: #4&&& }<span style="color: #5<span style="color: #6&&& public String getIp() {<span style="color: #7&&&&&&& return<span style="color: #8&&& }<span style="color: #9<span style="color: #0&&& public&void setIp(String ip) {<span style="color: #1&&&&&&& this.ip =<span style="color: #2&&& }<span style="color: #3<span style="color: #4&&& public&void setValues(Map&String, String& values) {<span style="color: #5&&&&&&& this.values =<span style="color: #6&&& }<span style="color: #7<span style="color: #8&&& &#64;Override<span style="color: #9&&& public String toString() {<span style="color: #0&&&&&&& return&"XLResponse [encode="&+ encode +&", encrypt="&+ encrypt +&", extend1="&+ extend1 +&", extend2="&+ extend2<span style="color: #1&&&&&&&&&&&&&&& +&", sessionid="&+ sessionid +&", result="&+ result +&", length="&+ length +&", values="&+ values +&", ip="&+ ip +&"]";<span style="color: #2&&& }<span style="color: #3} & & 1package org.jboss.netty.example.xlsvr.& 2& 3import java.util.HashM& 4import java.util.M& 5& 6/** *//**& 7 *& &#64;author hankchen& 8 *&
下午02:46:41& 9&*/<span style="color: #<span style="color: #/** *//**<span style="color: # * 请求数据<span style="color: #&*/<span style="color: #<span style="color: #/** *//**<span style="color: # * 通用协议介绍<span style="color: # * <span style="color: # * 通用报文格式:无论是请求还是响应,报文都由一个通用报文头和实际数据组成。报文头在前,数据在后<span style="color: # * (1)报文头:由数据解析类型,数据解析方法,编码,扩展字节,包长度组成,共16个字节:<span style="color: # * 编码方式(1byte)、加密(1byte)、扩展1(1byte)、扩展2(1byte)、会话ID(4byte)、命令或者结果码(4byte)、包长(4byte)<span style="color: # * (2)数据:由包长指定。请求或回复数据。类型对应为JAVA的Map&String,String&<span style="color: # * 数据格式定义:<span style="color: # * 字段1键名长度&&& 字段1键名 字段1值长度&&& 字段1值<span style="color: # * 字段2键名长度&&& 字段2键名 字段2值长度&&& 字段2值<span style="color: # * 字段3键名长度&&& 字段3键名 字段3值长度&&& 字段3值<span style="color: # * &#8230;&&& &#8230;&&& &#8230;&&& &#8230;<span style="color: # * 长度为整型,占4个字节<span style="color: #&*/<span style="color: #public&class XLRequest {<span style="color: #&&& private&byte// 数据编码格式。已定义:0:UTF-8,1:GBK,2:GB2312,3:ISO8859-1<span style="color: #&&& private&byte// 加密类型。0表示不加密<span style="color: #&&& private&byte extend1;// 用于扩展协议。暂未定义任何值<span style="color: #&&& private&byte extend2;// 用于扩展协议。暂未定义任何值<span style="color: #&&& private&int// 会话ID<span style="color: #&&& private&int// 命令<span style="color: #&&& private&int// 数据包长<span style="color: #&&& <span style="color: #&&& private Map&String,String& params=new HashMap&String, String&(); //参数<span style="color: #&&& <span style="color: #&&& private S<span style="color: #<span style="color: #&&& public&byte getEncode() {<span style="color: #&&&&&&& return<span style="color: #&&& }<span style="color: #<span style="color: #&&& public&void setEncode(byte encode) {<span style="color: #&&&&&&& this.encode =<span style="color: #&&& }<span style="color: #<span style="color: #&&& public&byte getEncrypt() {<span style="color: #&&&&&&& return<span style="color: #&&& }<span style="color: #<span style="color: #&&& public&void setEncrypt(byte encrypt) {<span style="color: #&&&&&&& this.encrypt =<span style="color: #&&& }<span style="color: #<span style="color: #&&& public&byte getExtend1() {<span style="color: #&&&&&&& return extend1;<span style="color: #&&& }<span style="color: #<span style="color: #&&& public&void setExtend1(byte extend1) {<span style="color: #&&&&&&& this.extend1 = extend1;<span style="color: #&&& }<span style="color: #<span style="color: #&&& public&byte getExtend2() {<span style="color: #&&&&&&& return extend2;<span style="color: #&&& }<span style="color: #<span style="color: #&&& public&void setExtend2(byte extend2) {<span style="color: #&&&&&&& this.extend2 = extend2;<span style="color: #&&& }<span style="color: #<span style="color: #&&& public&int getSessionid() {<span style="color: #&&&&&&& return<span style="color: #&&& }<span style="color: #<span style="color: #&&& public&void setSessionid(int sessionid) {<span style="color: #&&&&&&& this.sessionid =<span style="color: #&&& }<span style="color: #<span style="color: #&&& public&int getCommand() {<span style="color: #&&&&&&& return<span style="color: #&&& }<span style="color: #<span style="color: #&&& public&void setCommand(int command) {<span style="color: #&&&&&&& this.command =<span style="color: #&&& }<span style="color: #<span style="color: #&&& public&int getLength() {<span style="color: #&&&&&&& return<span style="color: #&&& }<span style="color: #<span style="color: #&&& public&void setLength(int length) {<span style="color: #&&&&&&& this.length =<span style="color: #&&& }<span style="color: #<span style="color: #&&& public Map&String, String& getParams() {<span style="color: #&&&&&&& return<span style="color: #0&&& }<span style="color: #1&&& <span style="color: #2&&& public&void setValue(String key,String value){<span style="color: #3&&&&&&& params.put(key, value);<span style="color: #4&&& }<span style="color: #5&&& <span style="color: #6&&& public String getValue(String key){<span style="color: #7&&&&&&& if (key==null) {<span style="color: #8&&&&&&&&&&& return&null;<span style="color: #9&&&&&&& }<span style="color: #0&&&&&&& return params.get(key);<span style="color: #1&&& }<span style="color: #2<span style="color: #3&&& public String getIp() {<span style="color: #4&&&&&&& return<span style="color: #5&&& }<span style="color: #6<span style="color: #7&&& public&void setIp(String ip) {<span style="color: #8&&&&&&& this.ip =<span style="color: #9&&& }<span style="color: #0<span style="color: #1&&& public&void setParams(Map&String, String& params) {<span style="color: #2&&&&&&& this.params =<span style="color: #3&&& }<span style="color: #4<span style="color: #5&&& &#64;Override<span style="color: #6&&& public String toString() {<span style="color: #7&&&&&&& return&"XLRequest [encode="&+ encode +&", encrypt="&+ encrypt +&", extend1="&+ extend1 +&", extend2="&+ extend2<span style="color: #8&&&&&&&&&&&&&&& +&", sessionid="&+ sessionid +&", command="&+ command +&", length="&+ length +&", params="&+ params +&", ip="&+ ip +&"]";<span style="color: #9&&& }<span style="color: #0}<span style="color: #1 二、协议的编码和解码
对于自定义二进制协议,编码解码器往往是Netty开发的重点。这里直接给出相关类的代码。
<span style="color: #package org.jboss.netty.example.xlsvr.<span style="color: #<span style="color: #import java.nio.ByteB<span style="color: #<span style="color: #import org.jboss.netty.buffer.ChannelB<span style="color: #import org.jboss.netty.buffer.ChannelB<span style="color: #import org.jboss.netty.channel.ChannelHandlerC<span style="color: #import org.jboss.netty.channel.C<span style="color: #import org.jboss.netty.channel.MessageE<span style="color: #import org.jboss.netty.channel.SimpleChannelDownstreamH<span style="color: #import org.jboss.netty.example.xlsvr.util.ProtocolU<span style="color: #import org.jboss.netty.example.xlsvr.vo.XLR<span style="color: #import org.slf4j.L<span style="color: #import org.slf4j.LoggerF<span style="color: #<span style="color: #/** *//**<span style="color: # *& &#64;author hankchen<span style="color: # *&
上午10:48:15<span style="color: #&*/<span style="color: #<span style="color: #/** *//**<span style="color: # * 服务器端编码器<span style="color: #&*/<span style="color: #public&class XLServerEncoder extends SimpleChannelDownstreamHandler {<span style="color: #&&& Logger logger=LoggerFactory.getLogger(XLServerEncoder.class);<span style="color: #&&& <span style="color: #&&& &#64;Override<span style="color: #&&& public&void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {<span style="color: #&&&&&&& XLResponse response=(XLResponse)e.getMessage();<span style="color: #&&&&&&& ByteBuffer headBuffer=ByteBuffer.allocate(<span style="color: #);<span style="color: #&&&&&&& /** *//**<span style="color: #&&&&&&&& * 先组织报文头<span style="color: #&&&&&&&& */<span style="color: #&&&&&&& headBuffer.put(response.getEncode());<span style="color: #&&&&&&& headBuffer.put(response.getEncrypt());<span style="color: #&&&&&&& headBuffer.put(response.getExtend1());<span style="color: #&&&&&&& headBuffer.put(response.getExtend2());<span style="color: #&&&&&&& headBuffer.putInt(response.getSessionid());<span style="color: #&&&&&&& headBuffer.putInt(response.getResult());<span style="color: #&&&&&&& <span style="color: #&&&&&&& /** *//**<span style="color: #&&&&&&&& * 组织报文的数据部分<span style="color: #&&&&&&&& */<span style="color: #&&&&&&& ChannelBuffer dataBuffer=ProtocolUtil.encode(response.getEncode(),response.getValues()); <span style="color: #&&&&&&& int length=dataBuffer.readableBytes();<span style="color: #&&&&&&& headBuffer.putInt(length);<span style="color: #&&&&&&& /** *//**<span style="color: #&&&&&&&& * 非常重要<span style="color: #&&&&&&&& * ByteBuffer需要手动flip(),ChannelBuffer不需要<span style="color: #&&&&&&&& */<span style="color: #&&&&&&& headBuffer.flip();<span style="color: #&&&&&&& ChannelBuffer totalBuffer=ChannelBuffers.dynamicBuffer();<span style="color: #&&&&&&& totalBuffer.writeBytes(headBuffer);<span style="color: #&&&&&&& logger.info("totalBuffer size="+totalBuffer.readableBytes());<span style="color: #&&&&&&& totalBuffer.writeBytes(dataBuffer);<span style="color: #&&&&&&& logger.info("totalBuffer size="+totalBuffer.readableBytes());<span style="color: #&&&&&&& Channels.write(ctx, e.getFuture(), totalBuffer);<span style="color: #&&& }<span style="color: #<span style="color: #}<span style="color: #
& <span style="color: #package org.jboss.netty.example.xlsvr.<span style="color: #<span style="color: #import org.jboss.netty.buffer.ChannelB<span style="color: #import org.jboss.netty.buffer.ChannelB<span style="color: #import org.jboss.netty.channel.C<span style="color: #import org.jboss.netty.channel.ChannelHandlerC<span style="color: #import org.jboss.netty.example.xlsvr.util.ProtocolU<span style="color: #import org.jboss.netty.example.xlsvr.vo.XLR<span style="color: #import org.jboss.netty.handler.codec.frame.FrameD<span style="color: #<span style="color: #/** *//**<span style="color: # *& &#64;author hankchen<span style="color: # *&
上午10:47:54<span style="color: #&*/<span style="color: #<span style="color: #/** *//**<span style="color: # * 客户端解码器<span style="color: #&*/<span style="color: #public&class XLClientDecoder extends FrameDecoder {<span style="color: #<span style="color: #&&& &#64;Override<span style="color: #&&& protected Object decode(ChannelHandlerContext context, Channel channel, ChannelBuffer buffer) throws Exception {<span style="color: #&&&&&&& if (buffer.readableBytes()&<span style="color: #) {<span style="color: #&&&&&&&&&&& return&null;<span style="color: #&&&&&&& }<span style="color: #&&&&&&& buffer.markReaderIndex();<span style="color: #&&&&&&& byte encode=buffer.readByte();<span style="color: #&&&&&&& byte encrypt=buffer.readByte();<span style="color: #&&&&&&& byte extend1=buffer.readByte();<span style="color: #&&&&&&& byte extend2=buffer.readByte();<span style="color: #&&&&&&& int sessionid=buffer.readInt();<span style="color: #&&&&&&& int result=buffer.readInt();<span style="color: #&&&&&&& int length=buffer.readInt(); // 数据包长<span style="color: #&&&&&&& if (buffer.readableBytes()&length) {<span style="color: #&&&&&&&&&&& buffer.resetReaderIndex();<span style="color: #&&&&&&&&&&& return&null;<span style="color: #&&&&&&& }<span style="color: #&&&&&&& ChannelBuffer dataBuffer=ChannelBuffers.buffer(length);<span style="color: #&&&&&&& buffer.readBytes(dataBuffer, length);<span style="color: #&&&&&&& <span style="color: #&&&&&&& XLResponse response=new XLResponse();<span style="color: #&&&&&&& response.setEncode(encode);<span style="color: #&&&&&&& response.setEncrypt(encrypt);<span style="color: #&&&&&&& response.setExtend1(extend1);<span style="color: #&&&&&&& response.setExtend2(extend2);<span style="color: #&&&&&&& response.setSessionid(sessionid);<span style="color: #&&&&&&& response.setResult(result);<span style="color: #&&&&&&& response.setLength(length);<span style="color: #&&&&&&& response.setValues(ProtocolUtil.decode(encode, dataBuffer));<span style="color: #&&&&&&& response.setIp(ProtocolUtil.getClientIp(channel));<span style="color: #&&&&&&& return<span style="color: #&&& }<span style="color: #<span style="color: #} & & 1package org.jboss.netty.example.xlsvr.& 2& 3import java.net.SocketA& 4import java.nio.charset.C& 5import java.util.HashM& 6import java.util.M& 7import java.util.Map.E& 8& 9import org.jboss.netty.buffer.ChannelB<span style="color: #import org.jboss.netty.buffer.ChannelB<span style="color: #import org.jboss.netty.channel.C<span style="color: #<span style="color: #/** *//**<span style="color: # *& &#64;author hankchen<span style="color: # *&
下午01:57:33<span style="color: #&*/<span style="color: #public&class ProtocolUtil {<span style="color: #&&& <span style="color: #&&& /** *//**<span style="color: #&&&& * 编码报文的数据部分<span style="color: #&&&& * &#64;param encode<span style="color: #&&&& * &#64;param values<span style="color: #&&&& * &#64;return<span style="color: #&&&& */<span style="color: #&&& public&static ChannelBuffer encode(int encode,Map&String,String& values){<span style="color: #&&&&&&& ChannelBuffer totalBuffer=null;<span style="color: #&&&&&&& if (values!=null&&& values.size()&<span style="color: #) {<span style="color: #&&&&&&&&&&& totalBuffer=ChannelBuffers.dynamicBuffer();<span style="color: #&&&&&&&&&&& int length=<span style="color: #,index=<span style="color: #;<span style="color: #&&&&&&&&&&& ChannelBuffer [] channelBuffers=new ChannelBuffer[values.size()];<span style="color: #&&&&&&&&&&& Charset charset=XLCharSetFactory.getCharset(encode);<span style="color: #&&&&&&&&&&& for(Entry&String,String& entry:values.entrySet()){<span style="color: #&&&&&&&&&&&&&&& String key=entry.getKey();<span style="color: #&&&&&&&&&&&&&&& String value=entry.getValue();<span style="color: #&&&&&&&&&&&&&&& ChannelBuffer buffer=ChannelBuffers.dynamicBuffer();<span style="color: #&&&&&&&&&&&&&&& buffer.writeInt(key.length());<span style="color: #&&&&&&&&&&&&&&& buffer.writeBytes(key.getBytes(charset));<span style="color: #&&&&&&&&&&&&&&& buffer.writeInt(value.length());<span style="color: #&&&&&&&&&&&&&&& buffer.writeBytes(value.getBytes(charset));<span style="color: #&&&&&&&&&&&&&&& channelBuffers[index++]=<span style="color: #&&&&&&&&&&&&&&& length+=buffer.readableBytes();<span style="color: #&&&&&&&&&&& }<span style="color: #&&&&&&&&&&& <span style="color: #&&&&&&&&&&& for (int i =&<span style="color: #; i & channelBuffers. i++) {<span style="color: #&&&&&&&&&&&&&&& totalBuffer.writeBytes(channelBuffers[i]);<span style="color: #&&&&&&&&&&& }<span style="color: #&&&&&&& }<span style="color: #&&&&&&& return totalB<span style="color: #&&& }<span style="color: #&&& <span style="color: #&&& /** *//**<span style="color: #&&&& * 解码报文的数据部分<span style="color: #&&&& * &#64;param encode<span style="color: #&&&& * &#64;param dataBuffer<span style="color: #&&&& * &#64;return<span style="color: #&&&& */<span style="color: #&&& public&static Map&String,String& decode(int encode,ChannelBuffer dataBuffer){<span style="color: #&&&&&&& Map&String,String& dataMap=new HashMap&String, String&();<span style="color: #&&&&&&& if (dataBuffer!=null&&& dataBuffer.readableBytes()&<span style="color: #) {<span style="color: #&&&&&&&&&&& int processIndex=<span style="color: #,length=dataBuffer.readableBytes();<span style="color: #&&&&&&&&&&& Charset charset=XLCharSetFactory.getCharset(encode);<span style="color: #&&&&&&&&&&& while(processIndex&length){<span style="color: #&&&&&&&&&&&&&&& /** *//**<span style="color: #&&&&&&&&&&&&&&&& * 获取Key<span style="color: #&&&&&&&&&&&&&&&& */<span style="color: #&&&&&&&&&&&&&&& int size=dataBuffer.readInt();<span style="color: #&&&&&&&&&&&&&&& byte [] contents=new&byte [size];<span style="color: #&&&&&&&&&&&&&&& dataBuffer.readBytes(contents);<span style="color: #&&&&&&&&&&&&&&& String key=new String(contents, charset);<span style="color: #&&&&&&&&&&&&&&& processIndex=processIndex+size+<span style="color: #;<span style="color: #&&&&&&&&&&&&&&& /** *//**<span style="color: #&&&&&&&&&&&&&&&& * 获取Value<span style="color: #&&&&&&&&&&&&&&&& */<span style="color: #&&&&&&&&&&&&&&& size=dataBuffer.readInt();<span style="color: #&&&&&&&&&&&&&&& contents=new&byte [size];<span style="color: #&&&&&&&&&&&&&&& dataBuffer.readBytes(contents);<span style="color: #&&&&&&&&&&&&&&& String value=new String(contents, charset);<span style="color: #&&&&&&&&&&&&&&& dataMap.put(key, value);<span style="color: #&&&&&&&&&&&&&&& processIndex=processIndex+size+<span style="color: #;<span style="color: #&&&&&&&&&&& }<span style="color: #&&&&&&& }<span style="color: #&&&&&&& return dataM<span style="color: #&&& }<span style="color: #&&& <span style="color: #&&& /** *//**<span style="color: #&&&& * 获取客户端IP<span style="color: #&&&& * &#64;param channel<span style="color: #&&&& * &#64;return<span style="color: #&&&& */<span style="color: #&&& public&static String getClientIp(Channel channel){<span style="color: #&&&&&&& /** *//**<span style="color: #&&&&&&&& * 获取客户端IP<span style="color: #&&&&&&&& */<span style="color: #&&&&&&& SocketAddress address = channel.getRemoteAddress();<span style="color: #&&&&&&& String ip =&"";<span style="color: #&&&&&&& if (address !=&null) {<span style="color: #&&&&&&&&&&& ip = address.toString().trim();<span style="color: #&&&&&&&&&&& int index = ip.lastIndexOf(':');<span style="color: #&&&&&&&&&&& if (index &&<span style="color: #) {<span style="color: #0&&&&&&&&&&&&&&& index = ip.length();<span style="color: #1&&&&&&&&&&& }<span style="color: #2&&&&&&&&&&& ip = ip.substring(<span style="color: #, index);<span style="color: #3&&&&&&& }<span style="color: #4&&&&&&& if (ip.length() &&<span style="color: #) {<span style="color: #5&&&&&&&&&&& ip = ip.substring(Math.max(ip.indexOf("/") +&<span style="color: #, ip.length() -&<span style="color: #));<span style="color: #6&&&&&&& }<span style="color: #7&&&&&&& return<span style="color: #8&&& }<span style="color: #9}<span style="color: #0 三、服务器端实现 服务器端提供的功能是: 1、接收客户端的请求(非关闭命令),返回XLResponse类型的数据。 2、如果客户端的请求是关闭命令:shutdown,则服务器端关闭自身进程。 为了展示多协议的运用,这里客户端的请求采用的是基于问本行(\n\r)的协议。 具体代码如下: <span style="color: #package org.jboss.netty.example.<span style="color: #<span style="color: #import java.net.InetSocketA<span style="color: #import java.util.concurrent.E<span style="color: #<span style="color: #import org.jboss.netty.bootstrap.ServerB<span style="color: #import org.jboss.netty.channel.C<span style="color: #import org.jboss.netty.channel.ChannelP<span style="color: #import org.jboss.netty.channel.group.ChannelG<span style="color: #import org.jboss.netty.channel.group.ChannelGroupF<span style="color: #import org.jboss.netty.channel.group.DefaultChannelG<span style="color: #import org.jboss.netty.channel.socket.nio.NioServerSocketChannelF<span style="color: #import org.jboss.netty.example.xlsvr.codec.XLServerE<span style="color: #import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameD<span style="color: #import org.jboss.netty.handler.codec.frame.D<span style="color: #import org.jboss.netty.handler.codec.string.StringD<span style="color: #import org.jboss.netty.util.CharsetU<span style="color: #import org.slf4j.L<span style="color: #import org.slf4j.LoggerF<span style="color: #<span style="color: #/** *//**<span style="color: # *& &#64;author hankchen<span style="color: # *&
下午03:21:38<span style="color: #&*/<span style="color: #<span style="color: #public&class XLServer {<span style="color: #&&& public&static&final&int port =<span style="color: #80;<span style="color: #&&& public&static&final Logger logger=LoggerFactory.getLogger(XLServer.class);<span style="color: #&&& public&static&final ChannelGroup allChannels=new DefaultChannelGroup("XLServer");<span style="color: #&&& private&static&final ServerBootstrap serverBootstrap=new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));<span style="color: #&&& <span style="color: #&&& public&static&void main(String [] args){<span style="color: #&&&&&&& try&{<span style="color: #&&&&&&&&&&& XLServer.startup();<span style="color: #&&&&&&& }&catch (Exception e) {<span style="color: #&&&&&&&&&&& e.printStackTrace();<span style="color: #&&&&&&& }<span style="color: #&&& }<span style="color: #&&& <span style="color: #&&& public&static&boolean startup() throws Exception{<span style="color: #&&&&&&& /** *//**<span style="color: #&&&&&&&& * 采用默认ChannelPipeline管道<span style="color: #&&&&&&&& * 这意味着同一个XLServerHandler实例将被多个Channel通道共享<span style="color: #&&&&&&&& * 这种方式对于XLServerHandler中无有状态的成员变量是可以的,并且可以提高性能!<span style="color: #&&&&&&&& */<span style="color: #&&&&&&& ChannelPipeline pipeline=serverBootstrap.getPipeline(); <span style="color: #&&&&&&& /** *//**<span style="color: #&&&&&&&& * 解码器是基于文本行的协议,\r\n或者\n\r<span style="color: #&&&&&&&& */<span style="color: #&&&&&&& pipeline.addLast("frameDecoder", new DelimiterBasedFrameDecoder(<span style="color: #, Delimiters.lineDelimiter()));<span style="color: #&&&&&&& pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));<span style="color: #&&&&&&& pipeline.addLast("encoder", new XLServerEncoder());<span style="color: #&&&&&&& pipeline.addLast("handler", new XLServerHandler());<span style="color: #&&&&&&& <span style="color: #&&&&&&& serverBootstrap.setOption("child.tcpNoDelay", true); //注意child前缀<span style="color: #&&&&&&& serverBootstrap.setOption("child.keepAlive", true); //注意child前缀<span style="color: #&&&&&&& <span style="color: #&&&&&&& /** *//**<span style="color: #&&&&&&&& * ServerBootstrap对象的bind方法返回了一个绑定了本地地址的服务端Channel通道对象<span style="color: #&&&&&&&& */<span style="color: #&&&&&&& Channel channel=serverBootstrap.bind(new InetSocketAddress(port));<span style="color: #&&&&&&& allChannels.add(channel);<span style="color: #&&&&&&& logger.info("server is started on port "+port);<span style="color: #&&&&&&& return&false;<span style="color: #&&& }<span style="color: #&&& <span style="color: #&&& public&static&void shutdown() throws Exception{<span style="color: #&&&&&&& try&{<span style="color: #&&&&&&&&&&& /** *//**<span style="color: #&&&&&&&&&&&& * 主动关闭服务器<span style="color: #&&&&&&&&&&&& */<span style="color: #&&&&&&&&&&& ChannelGroupFuture future=allChannels.close();<span style="color: #&&&&&&&&&&& future.awaitUninterruptibly();//阻塞,直到服务器关闭<span style="color: #&&&&&&&&&&& //serverBootstrap.releaseExternalResources();<span style="color: #&&&&&&& }&catch (Exception e) {<span style="color: #&&&&&&&&&&& e.printStackTrace();<span style="color: #&&&&&&&&&&& logger.error(e.getMessage(),e);<span style="color: #&&&&&&& }<span style="color: #&&&&&&& finally{<span style="color: #&&&&&&&&&&& logger.info("server is shutdown on port "+port);<span style="color: #&&&&&&&&&&& System.exit(<span style="color: #);<span style="color: #&&&&&&& }<span style="color: #&&& }<span style="color: #}<span style="color: # & & 1package org.jboss.netty.example.& 2& 3import java.util.R& 4& 5import org.jboss.netty.channel.C& 6import org.jboss.netty.channel.ChannelF& 7import org.jboss.netty.channel.ChannelHandlerC& 8import org.jboss.netty.channel.ChannelHandler.S& 9import org.jboss.netty.channel.ChannelStateE<span style="color: #import org.jboss.netty.channel.ExceptionE<span style="color: #import org.jboss.netty.channel.MessageE<span style="color: #import org.jboss.netty.channel.SimpleChannelH<span style="color: #import org.jboss.netty.example.xlsvr.vo.XLR<span style="color: #import org.slf4j.L<span style="color: #import org.slf4j.LoggerF<span style="color: #<span style="color: #/** *//**<span style="color: # *& &#64;author hankchen<span style="color: # *&
下午03:22:24<span style="color: #&*/<span style="color: #<span style="color: #&#64;Sharable<span style="color: #public&class XLServerHandler extends SimpleChannelHandler {<span style="color: #&&& private&static&final Logger logger=LoggerFactory.getLogger(XLServerHandler.class);<span style="color: #&&& <span style="color: #&&& &#64;Override<span style="color: #&&& public&void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {<span style="color: #&&&&&&& logger.info("messageReceived");<span style="color: #&&&&&&& if (e.getMessage() instanceof String) {<span style="color: #&&&&&&&&&&& String content=(String)e.getMessage();<span style="color: #&&&&&&&&&&& logger.info("content is "+content);<span style="color: #&&&&&&&&&&& if ("shutdown".equalsIgnoreCase(content)) {<span style="color: #&&&&&&&&&&&&&&& //e.getChannel().close();<span style="color: #&&&&&&&&&&&&&&& XLServer.shutdown();<span style="color: #&&&&&&&&&&& }else&{<span style="color: #&&&&&&&&&&&&&&& sendResponse(ctx);<span style="color: #&&&&&&&&&&& }<span style="color: #&&&&&&& }else&{<span style="color: #&&&&&&&&&&& logger.error("message is not a String.");<span style="color: #&&&&&&&&&&& e.getChannel().close();<span style="color: #&&&&&&& }<span style="color: #&&& }<span style="color: #<span style="color: #&&& &#64;Override<span style="color: #&&& public&void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {<span style="color: #&&&&&&& logger.error(e.getCause().getMessage(),e.getCause());<span style="color: #&&&&&&& e.getCause().printStackTrace();<span style="color: #&&&&&&& e.getChannel().close();<span style="color: #&&& }<span style="color: #<span style="color: #&&& &#64;Override<span style="color: #&&& public&void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {<span style="color: #&&&&&&& logger.info("channelConnected");<span style="color: #&&&&&&& sendResponse(ctx);<span style="color: #&&& }<span style="color: #<span style="color: #&&& &#64;Override<span style="color: #&&& public&void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {<span style="color: #&&&&&&& logger.info("channelClosed");<span style="color: #&&&&&&& //删除通道<span style="color: #&&&&&&& XLServer.allChannels.remove(e.getChannel());<span style="color: #&&& }<span style="color: #<span style="color: #&&& &#64;Override<span style="color: #&&& public&void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {<span style="color: #&&&&&&& logger.info("channelDisconnected");<span style="color: #&&&&&&& super.channelDisconnected(ctx, e);<span style="color: #&&& }<span style="color: #<span style="color: #&&& &#64;Override<span style="color: #&&& public&void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {<span style="color: #&&&&&&& logger.info("channelOpen");<span style="color: #&&&&&&& //增加通道<span style="color: #&&&&&&& XLServer.allChannels.add(e.getChannel());<span style="color: #&&& }<span style="color: #<span style="color: #&&& /** *//**<span style="color: #&&&& * 发送响应内容<span style="color: #&&&& * &#64;param ctx<span style="color: #&&&& * &#64;param e<span style="color: #&&&& * &#64;return<span style="color: #&&&& */<span style="color: #&&& private ChannelFuture sendResponse(ChannelHandlerContext ctx){<span style="color: #&&&&&&& Channel channel=ctx.getChannel();<span style="color: #&&&&&&& Random random=new Random();<span style="color: #&&&&&&& XLResponse response=new XLResponse();<span style="color: #&&&&&&& response.setEncode((byte)<span style="color: #);<span style="color: #&&&&&&& response.setResult(<span style="color: #);<span style="color: #&&&&&&& response.setValue("name","hankchen");<span style="color: #&&&&&&& response.setValue("time", String.valueOf(System.currentTimeMillis()));<span style="color: #&&&&&&& response.setValue("age",String.valueOf(random.nextInt()));<span style="color: #&&&&&&& /** *//**<span style="color: #&&&&&&&& * 发送接收信息的时间戳到客户端<span style="color: #&&&&&&&& * 注意:Netty中所有的IO操作都是异步的!<span style="color: #&&&&&&&& */<span style="color: #&&&&&&& ChannelFuture future=channel.write(response); //发送内容<span style="color: #&&&&&&& return<span style="color: #&&& }<span style="color: #}<span style="color: #0 四、客户端实现 客户端的功能是连接服务器,发送10次请求,然后发送关闭服务器的命令,最后主动关闭客户端。 关键代码如下:
1/** *//** 2 *& Copyright (C): 2012 3 *& &#64;author hankchen 4 *&
下午03:21:26 5&*/ 6 7/** *//** 8 * 服务器特征: 9 * 1、使用专用解码器解析服务器发过来的数据<span style="color: # * 2、客户端主动关闭连接<span style="color: #&*/<span style="color: #public&class XLClient {<span style="color: #&&& public&static&final&int port =XLServer.<span style="color: #&&& public&static&final String host ="localhost";<span style="color: #&&& private&static&final Logger logger=LoggerFactory.getLogger(XLClient.class);<span style="color: #&&& private&static&final NioClientSocketChannelFactory clientSocketChannelFactory=new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool());<span style="color: #&&& private&static&final ClientBootstrap clientBootstrap=new ClientBootstrap(clientSocketChannelFactory);<span style="color: #&&& <span style="color: #&&& /** *//**<span style="color: #&&&& * &#64;param args<span style="color: #&&&& * &#64;throws Exception <span style="color: #&&&& */<span style="color: #&&& public&static&void main(String[] args) throws Exception {<span style="color: #&&&&&&& ChannelFuture future=XLClient.startup();<span style="color: #&&&&&&& logger.info("future state is "+future.isSuccess());<span style="color: #&&& }<span style="color: #&&& <span style="color: #&&& /** *//**<span style="color: #&&&& * 启动客户端<span style="color: #&&&& * &#64;return<span style="color: #&&&& * &#64;throws Exception<span style="color: #&&&& */<span style="color: #&&& public&static ChannelFuture startup() throws Exception {<span style="color: #&&&&&&& /** *//**<span style="color: #&&&&&&&& * 注意:由于XLClientHandler中有状态的成员变量,因此不能采用默认共享ChannelPipeline的方式<span style="color: #&&&&&&&& * 例如,下面的代码形式是错误的:<span style="color: #&&&&&&&& * ChannelPipeline pipeline=clientBootstrap.getPipeline();<span style="color: #&&&&&&&& * pipeline.addLast("handler", new XLClientHandler());<span style="color: #&&&&&&&& */<span style="color: #&&&&&&& clientBootstrap.setPipelineFactory(new XLClientPipelineFactory()); //只能这样设置<span style="color: #&&&&&&& /** *//**<span style="color: #&&&&&&&& * 请注意,这里不存在使用&#8220;child.&#8221;前缀的配置项,客户端的SocketChannel实例不存在父级Channel对象<span style="color: #&&&&&&&& */<span style="color: #&&&&&&& clientBootstrap.setOption("tcpNoDelay", true);<span style="color: #&&&&&&& clientBootstrap.setOption("keepAlive", true);<span style="color: #&&&&&&& <span style="color: #&&&&&&& ChannelFuture future=clientBootstrap.connect(new InetSocketAddress(host, port));<span style="color: #&&&&&&& /** *//**<span style="color: #&&&&&&&& * 阻塞式的等待,直到ChannelFuture对象返回这个连接操作的成功或失败状态<span style="color: #&&&&&&&& */<span style="color: #&&&&&&& future.awaitUninterruptibly();<span style="color: #&&&&&&& /** *//**<span style="color: #&&&&&&&& * 如果连接失败,我们将打印连接失败的原因。<span style="color: #&&&&&&&& * 如果连接操作没有成功或者被取消,ChannelFuture对象的getCause()方法将返回连接失败的原因。<span style="color: #&&&&&&&& */<span style="color: #&&&&&&& if (!future.isSuccess()) {<span style="color: #&&&&&&&&&&& future.getCause().printStackTrace();<span style="color: #&&&&&&& }else&{<span style="color: #&&&&&&&&&&& logger.info("client is connected to server "+host+":"+port);<span style="color: #&&&&&&& }<span style="color: #&&&&&&& return<span style="color: #&&& }<span style="color: #&&& <span style="color: #&&& /** *//**<span style="color: #&&&& * 关闭客户端<span style="color: #&&&& * &#64;param future<span style="color: #&&&& * &#64;throws Exception<span style="color: #&&&& */<span style="color: #&&& public&static&void shutdown(ChannelFuture future) throws Exception{<span style="color: #&&&&&&& try&{<span style="color: #&&&&&&&&&&& /** *//**<span style="color: #&&&&&&&&&&&& * 主动关闭客户端连接,会阻塞等待直到通道关闭<span style="color: #&&&&&&&&&&&& */<span style="color: #&&&&&&&&&&& future.getChannel().close().awaitUninterruptibly();<span style="color: #&&&&&&&&&&& //future.getChannel().getCloseFuture().awaitUninterruptibly();<span style="color: #&&&&&&&&&&& /** *//**<span style="color: #&&&&&&&&&&&& * 释放ChannelFactory通道工厂使用的资源。<span style="color: #&&&&&&&&&&&& * 这一步仅需要调用 releaseExternalResources()方法即可。<span style="color: #&&&&&&&&&&&& * 包括NIO Secector和线程池在内的所有资源将被自动的关闭和终止。<span style="color: #&&&&&&&&&&&& */<span style="color: #&&&&&&&&&&& clientBootstrap.releaseExternalResources();<span style="color: #&&&&&&& }&catch (Exception e) {<span style="color: #&&&&&&&&&&& e.printStackTrace();<span style="color: #&&&&&&&&&&& logger.error(e.getMessage(),e);<span style="color: #&&&&&&& }<span style="color: #&&&&&&& finally{<span style="color: #&&&&&&&&&&& System.exit(<span style="color: #);<span style="color: #&&&&&&&&&&& logger.info("client is shutdown to server "+host+":"+port);<span style="color: #&&&&&&& }<span style="color: #&&& }<span style="color: #} &
1public&class XLClientPipelineFactory implements ChannelPipelineFactory{ 2 3&&& &#64;Override 4&&& public ChannelPipeline getPipeline() throws Exception { 5&&&&&&& ChannelPipeline pipeline=Channels.pipeline(); 6&&&&&&& /** *//** 7&&&&&&&& * 使用专用的解码器,解决数据分段的问题 8&&&&&&&& * 从业务逻辑代码中分离协议处理部分总是一个很不错的想法。 9&&&&&&&& */<span style="color: #&&&&&&& pipeline.addLast("decoder", new XLClientDecoder());<span style="color: #&&&&&&& /** *//**<span style="color: #&&&&&&&& * 有专门的编码解码器,这时处理器就不需要管数据分段和数据格式问题,只需要关注业务逻辑了!<span style="color: #&&&&&&&& */<span style="color: #&&&&&&& pipeline.addLast("handler", new XLClientHandler());<span style="color: #&&&&&&& return<span style="color: #&&& }<span style="color: #<span style="color: #} &
1/** *//** 2 *& Copyright (C): 2012 3 *& &#64;author hankchen 4 *&
下午03:21:52 5&*/ 6 7/** *//** 8 * 服务器特征: 9 * 1、使用专用的编码解码器,解决数据分段的问题<span style="color: # * 2、使用POJO替代ChannelBuffer传输<span style="color: #&*/<span style="color: #public&class XLClientHandler extends SimpleChannelHandler {<span style="color: #&&& private&static&final Logger logger=LoggerFactory.getLogger(XLClientHandler.class);<span style="color: #&&& private&final AtomicInteger count=new AtomicInteger(<span style="color: #); //计数器<span style="color: #&&& <span style="color: #&&& &#64;Override<span style="color: #&&& public&void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {<span style="color: #&&&&&&& processMethod1(ctx, e); //处理方式一<span style="color: #&&& }<span style="color: #&&& <span style="color: #&&& /** *//**<span style="color: #&&&& * &#64;param ctx<span style="color: #&&&& * &#64;param e<span style="color: #&&&& * &#64;throws Exception<span style="color: #&&&& */<span style="color: #&&& public&void processMethod1(ChannelHandlerContext ctx, MessageEvent e) throws Exception{<span style="color: #&&&&&&& logger.info("processMethod1&#8230;&#8230;,count="+count.addAndGet(<span style="color: #));<span style="color: #&&&&&&& XLResponse serverTime=(XLResponse)e.getMessage();<span style="color: #&&&&&&& logger.info("messageReceived,content:"+serverTime.toString());<span style="color: #&&&&&&& Thread.sleep(<span style="color: #00);<span style="color: #&&&&&&& <span style="color: #&&&&&&& if (count.get()&<span style="color: #) {<span style="color: #&&&&&&&&&&& //从新发送请求获取最新的服务器时间<span style="color: #&&&&&&&&&&& ctx.getChannel().write(ChannelBuffers.wrappedBuffer("again\r\n".getBytes()));<span style="color: #&&&&&&& }else{<span style="color: #&&&&&&&&&&& //从新发送请求关闭服务器<span style="color: #&&&&&&&&&&& ctx.getChannel().write(ChannelBuffers.wrappedBuffer("shutdown\r\n".getBytes()));<span style="color: #&&&&&&& }<span style="color: #&&& }<span style="color: #&&& <span style="color: #&&& &#64;Override<span style="color: #&&& public&void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {<span style="color: #&&&&&&& logger.info("exceptionCaught");<span style="color: #&&&&&&& e.getCause().printStackTrace();<span style="color: #&&&&&&& ctx.getChannel().close();<span style="color: #&&& }<span style="color: #<span style="color: #&&& &#64;Override<span style="color: #&&& public&void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {<span style="color: #&&&&&&& logger.info("channelClosed");<span style="color: #&&&&&&& super.channelClosed(ctx, e);<span style="color: #&&& }<span style="color: #&&& <span style="color: #&&& <span style="color: #} 全文代码较多,写了很多注释,希望对读者有用,谢谢!(友情提示:本博文章欢迎转载,但请注明出处:hankchen,)
发现XLCharSetFactory类没有提供,能贴出来一下嘛?&&&&
&re: 使用Netty实现通用二进制协议的高效数据传输
质量很高的一篇博文,谢谢分享!!&&&&
&re: 使用Netty实现通用二进制协议的高效数据传输[未登录]
受益匪浅,感谢分享!&&&&
&re: 使用Netty实现通用二进制协议的高效数据传输
lz写的不错,我也写了一些文章,分享给各位道友:&&&&
&re: 使用Netty实现通用二进制协议的高效数据传输[未登录]
能问下netty对于实时传输支持吗,怎么实现(思路)&&&&

我要回帖

更多关于 netty 接收中文乱码 的文章

 

随机推荐