# 处理器
作用:主要用于处理网络I/O事件。例如记录日志、对消息进行编解码等。
netty发送和接收数据handler处理器 主要是继承 SimpleChannelInboundHandler 和 ChannelInboundHandlerAdapter
一般用netty来发送和接收数据都会继承SimpleChannelInboundHandler和ChannelInboundHandlerAdapter这两个抽象类,那么这两个到底有什么区别呢?
其实用这两个抽象类是有讲究的,在客户端的业务Handler继承的是SimpleChannelInboundHandler,而在服务器端继承的是ChannelInboundHandlerAdapter。
最主要的区别就是SimpleChannelInboundHandler在接收到数据后会自动release掉数据占用的Bytebuffer资源(自动调用Bytebuffer.release())。而为何服务器端不能用呢,因为我们想让服务器把客户端请求的数据发送回去,而服务器端有可能在channelRead方法返回前还没有写完数据,因此不能让它自动release。
handler处理器 内置 方法
# channelActive
通道激活时触发,当客户端connect成功后,服务端就会接收到这个事件,从而可以把客户端的Channel记录下来,供后面复用
# channelRead
这个必须用啊,当收到对方发来的数据后,就会触发,参数msg就是发来的信息,可以是基础类型,也可以是序列化的复杂对象。
# channelReadComplete
channelRead执行后触发
# exceptionCaught
出错是会触发,做一些错误处理
/**
* netty服务器的监听 处理器
*
* @author flm 2017年10月27日
*/
public class IOHandler extends ChannelInboundHandlerAdapter {
private static Logger log = Logger.getLogger(IOHandler.class);
//netty AttributeKey 相对于 web session【重要】
public static final AttributeKey<DeviceSession> KEY = AttributeKey.valueOf("IO");
private Producer producer;
public IOHandler(Producer producer){
this.producer=producer;
}
/**
* 读取数据
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
DeviceSession session = ctx.channel().attr(KEY).get(); // 检测是否 自己注册的 客户端
ByteBuf buffer=(ByteBuf) msg;
if (buffer == null||session == null) {
closeConnection(ctx); // 关闭连接
}
MsgEntity msgEntity = new MsgEntity(buffer); // 解码 buffer 封装 msgEntity
log.info("# Accept Client data :"+msgEntity.toString());
if (MsgType.UNKNOW == msgEntity.getMsgType()) {
log.info("# 客户端 发送数据 类型未定义... :"+msgEntity.toString());
return;
}
if(!session.isActivity()){
session.setActivity(true);
session.setImei(msgEntity.getImei());
SessionManager.getSingleton().addClient(session);
}
producer.putData(msgEntity);
}
/**
* 客户端 注册
*/
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
log.info(String.format("# client registered...: %s ...", ctx.channel()));
DeviceSession session = new DeviceSession(ctx.channel());
// 绑定客户端到SOCKET
ctx.channel().attr(KEY).set(session);
}
/**
* 客户端 失去连接
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception
{
super.channelInactive(ctx);
log.info(String.format("# client out... : %s", ctx.channel()));
DeviceSession session = ctx.channel().attr(KEY).getAndSet(null);
// 移除 session 并删除 该客户端
SessionManager.getSingleton().removeClient(session, true);
if(session.getDeviceID() != null)
{
// producer.onData(new Request(new RootMessage(MessageType.LOGOUT, null, null), session));
}
}
/**
* 心跳机制 用户事件触发
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception
{
if (evt instanceof IdleStateEvent)
{
IdleStateEvent e = (IdleStateEvent) evt;
//检测 是否 这段时间没有和服务器联系
if (e.state() == IdleState.ALL_IDLE)
{
//检测心跳
checkIdle(ctx);
}
}
super.userEventTriggered(ctx, evt);
}
/**
* 报错 处理事件
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
log.error("# 客户端连接 Netty 出错...");
cause.printStackTrace();
//关闭连接
closeConnection(ctx);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123