Netty - 知识点

共享handler

Netty的逻辑是,每次有连接到来的时候,都会调用ChannelInitializer的initChannel()方法,然后pipeline中的所有handler都会被new一次,但是这里大部分的handler内部都是没有成员变量的,也就是说无状态的,我们可以使用单例模式,即调用pipeline().addLast()方法的时候,都直接使用单例,不需要每次都new,提高效率也避免了创建很多小的对象,注意:如果一个handler要被多个channel进行共享,必须要加上@ChannelHandler.Sharable显示地标明这个handler是支持多个channel共享的,否则会报错

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 1. 加上注解标识,表明该 handler 是可以多个 channel 共享的
@ChannelHandler.Sharable
public class LoginRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> {

// 2. 构造单例
public static final LoginRequestHandler INSTANCE = new LoginRequestHandler();

protected LoginRequestHandler() {
}

}
...
serverBootstrap
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
// ...单例模式,多个 channel 共享同一个 handler
ch.pipeline().addLast(LoginRequestHandler.INSTANCE);
// ...
}
});

压缩handler

利用MessageToMessageCodec

使用它可以让我们的编解码操作放到一个类里面去实现,它是一个无状态的handler,因此可以使用单例模式实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@ChannelHandler.Sharable
public class PacketCodecHandler extends MessageToMessageCodec<ByteBuf, Packet> {
public static final PacketCodecHandler INSTANCE = new PacketCodecHandler();

private PacketCodecHandler() {

}

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) {
out.add(PacketCodec.INSTANCE.decode(byteBuf));
}

@Override
protected void encode(ChannelHandlerContext ctx, Packet packet, List<Object> out) {
ByteBuf byteBuf = ctx.channel().alloc().ioBuffer();
PacketCodec.INSTANCE.encode(byteBuf, packet);
out.add(byteBuf);
}
}

缩短事件传播路径

压缩handler - 合并平行handler

平行handler即一个消息只会有一个handler触发处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@ChannelHandler.Sharable
public class IMHandler extends SimpleChannelInboundHandler<Packet> {
public static final IMHandler INSTANCE = new IMHandler();

private Map<Byte, SimpleChannelInboundHandler<? extends Packet>> handlerMap;

private IMHandler() {
handlerMap = new HashMap<>();

handlerMap.put(MESSAGE_REQUEST, MessageRequestHandler.INSTANCE);
handlerMap.put(CREATE_GROUP_REQUEST, CreateGroupRequestHandler.INSTANCE);
handlerMap.put(JOIN_GROUP_REQUEST, JoinGroupRequestHandler.INSTANCE);
handlerMap.put(QUIT_GROUP_REQUEST, QuitGroupRequestHandler.INSTANCE);
handlerMap.put(LIST_GROUP_MEMBERS_REQUEST, ListGroupMembersRequestHandler.INSTANCE);
handlerMap.put(GROUP_MESSAGE_REQUEST, GroupMessageRequestHandler.INSTANCE);
handlerMap.put(LOGOUT_REQUEST, LogoutRequestHandler.INSTANCE);
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, Packet packet) throws Exception {
handlerMap.get(packet.getCommand()).channelRead(ctx, packet);
}
}

更改事件传播源

如果outBound类型的handler较多,在写数据的时候能用ctx.writeAndFlush()就用这个方法
ctx.writeAndFlush()是从pipeline链中的当前节点开始往前找到第一个outBound类型的handler把对象往前进行传播,如果这个对象确认不需要经过其他outBound处理,就使用这个方法
ctx.channel().writeAndFlush()是从pipeline链中的最后一个outBound类型的handler开始,把对象往前进行传播,如果确认这个对象需要经过后面的outBound类型的handler,就使用这个方法

减少阻塞主线程的操作

默认情况下,Netty在启动的时候会开启2倍的CPU核数个NIO线程,而通常情况下我们单机会有几万或者十几万的连接,因此一条NIO线程会管理着几千或几万个连接,在传播事件的过程中,单条NIO线程的处理逻辑可以抽象成以下逻辑:

1
2
3
4
5
6
List<Channel> channelList = 已有数据可读的 channel
for (Channel channel in channelist) {
for (ChannelHandler handler in channel.pipeline()) {
handler.channelRead0();
}
}

只要有一个handler中的channelRead0()方法阻塞了NIO线程,都会拖慢绑定在该NIO线程上的所有的channel,我们需要把耗时的操作丢到我们的业务线程池中处理:

1
2
3
4
5
6
7
8
9
10
ThreadPool threadPool = xxx;

protected void channelRead0(ChannelHandlerContext ctx, T packet) {
threadPool.submit(new Runnable() {
// 1. balabala 一些逻辑
// 2. 数据库或者网络等一些耗时的操作
// 3. writeAndFlush()
// 4. balabala 其他的逻辑
})
}

准确统计处理时长

writeAndFlush()这个方法如果在非NIO线程中执行,它是一个异步的操作,调用之后是会立即返回的,剩下的所有操作都是Netty内部有一个任务队列异步执行的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected void channelRead0(ChannelHandlerContext ctx, T packet) {
threadPool.submit(new Runnable() {
long begin = System.currentTimeMillis();
// 1. balabala 一些逻辑
// 2. 数据库或者网络等一些耗时的操作

// 3. writeAndFlush
xxx.writeAndFlush().addListener(future -> {
if (future.isDone()) {
// 4. balabala 其他的逻辑
long time = System.currentTimeMillis() - begin;
}
});
})
}

心跳与定时检测

服务端空闲检测

服务端对于连接假死的应对策略就是空闲检测,Netty自带的IdelStateHandler就可以实现这个功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class IMIdleStateHandler extends IdleStateHandler {

private static final int READER_IDLE_TIME = 15;

public IMIdleStateHandler() {
super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
}

@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
System.out.println(READER_IDLE_TIME + "秒内未读到数据,关闭连接");
ctx.channel().close();
}
}

调用了父类IdleStateHandler的构造函数,有四个参数:

  1. 读空闲时间
  2. 写空闲时间
  3. 读写空闲时间,表示我们不关心这两类条件
  4. 时间单位

    客户端定时发送心跳

    服务端在一段时间内没有收到客户单数据,产生的原因可以分为以下两种:
  5. 连接假死
  6. 非假死状态下确实没有发送数据
    我们需要排除掉第二种可能性,那么连接自然就是假死的,我们可以在客户端定期发送数据到服务端,通常称为心跳数据包
1
2
3
4
5
6
7
8
9
10
11
12
13
public class HeartBeatTimerHandler extends ChannelInboundHandlerAdapter {

private static final int HEARTBEAT_INTERVAL = 5;

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.executor().scheduleAtFixedRate(() -> {
ctx.writeAndFlush(new HeartBeatRequestPacket());
}, HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);

super.channelActive(ctx);
}
}

ctx.executor()返回的是当前的channel绑定的NIO线程,scheduleAtFixedRate(),类型JDK的定时任务机制,可以每隔一段时间执行一个任务

服务端回复心跳与客户端空闲检测

客户端空闲检测与服务端一样,在客户端pipeline的最前方插入IdleStateHandler
为了排除是否是因为服务端非假死状态下确实没有发送数据,服务端也要定期发送心跳给客户端,服务端只要在收到心跳之后回复客户端,给客户端发送一个心跳响应包即可

评论