Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// 把 NettyServer 的创建交给 Spring 管理
@Component
public class NettyServer {

private Logger logger = LoggerFactory.getLogger(getClass());

@Value("${netty.port}")
private Integer port;

@Resource
private NettyServerHandlerInitializer nettyServerHandlerInitializer;

// boss 线程组,用于服务端接受客户端的连接
private EventLoopGroup bossGroup = new NioEventLoopGroup();

// worker 线程组,用于服务端接受客户端的数据读写
private EventLoopGroup workerGroup = new NioEventLoopGroup();

// Netty Server Channel
private Channel channel;

// 启动 Netty Server
@PostConstruct
public void start() throws InterruptedException {
// 创建 ServerBootstrap 对象,用于 Netty Server 启动
ServerBootstrap bootstrap = new ServerBootstrap();
// 设置 ServerBootstrap 的各种属性
// 设置两个 EventLoopGroup 对象
bootstrap.group(bossGroup, workerGroup)
// 指定 Channel 为服务端 NioServerSocketChannel
.channel(NioServerSocketChannel.class)
// 设置 Netty Server 的端口
.localAddress(new InetSocketAddress(port))
// 服务端 accept 队列的大小
.option(ChannelOption.SO_BACKLOG, 1024)
// TCP Keepalive 机制,实现 TCP 层级的心跳保活功能
.childOption(ChannelOption.SO_KEEPALIVE, true)
// 允许较小的数据包的发送,降低延迟
.childOption(ChannelOption.TCP_NODELAY, true)
// 处理器
.childHandler(nettyServerHandlerInitializer);
// 绑定端口,并同步等待成功,即启动服务端
ChannelFuture future = bootstrap.bind().sync();
if (future.isSuccess()) {
channel = future.channel();
logger.info("[start][Netty Server 启动在 {} 端口]", port);
}
}

// 关闭 Netty Server
@PreDestroy
public void shutdown() {
// 关闭 Netty Server
if (channel != null) {
channel.close();
}
// <3.2> 优雅关闭两个 EventLoopGroup 对象
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

}

NettyServerHandlerInitializer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Component
public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> {

/**
* 心跳超时时间
*/
private static final Integer READ_TIMEOUT_SECONDS = 3 * 60;

@Resource
private NettyServerHandler nettyServerHandler;

@Override
protected void initChannel(Channel ch) {
// 获得 Channel 对应的 ChannelPipeline
ChannelPipeline channelPipeline = ch.pipeline();
// 添加一堆 NettyServerHandler 到 ChannelPipeline 中
channelPipeline
// 空闲检测
.addLast(new ReadTimeoutHandler(READ_TIMEOUT_SECONDS, TimeUnit.SECONDS))
// 编码器
.addLast(new InvocationEncoder())
// 解码器
.addLast(new InvocationDecoder())
// 消息分发器
.addLast(messageDispatcher)
// 服务端处理器
.addLast(nettyServerHandler)
;
}

}