最近为公司的某个业务实现了一个基于Netty
的IM
聊天应用的服务端,这里将关键思路和重点技术总结下,希望能对大家有个帮助。
这是 IM
系列第二篇。
Netty
是一门强大的NIO
框架,如果涉及到网络IO
,大多数情况都请应该选择他作为底层的IO
框架。
接下来你会了解下如下几个知识点:
HTTP
升级为 WebSocket
老规矩,直接上代码:
private fun startNetty() {
try {
val begin = System.currentTimeMillis()
//初始化
bossGroup = NioEventLoopGroup()
workGroup = NioEventLoopGroup()
serverBootstrap = ServerBootstrap().group(bossGroup, workGroup) //boss辅助客户端的tcp连接请求 worker负责与客户端之前的读写操作
.channel(NioServerSocketChannel::class.java) //配置客户端的channel类型
.option(ChannelOption.SO_BACKLOG, 1024) //配置TCP参数,握手字符串长度设置
.childOption(ChannelOption.SO_KEEPALIVE, true)//开启心跳包活机制,就是客户端、服务端建立连接处于ESTABLISHED状态,超过2小时没有交流,机制会被启动
.childOption(ChannelOption.RCVBUF_ALLOCATOR, FixedRecvByteBufAllocator(592048))//配置固定长度接收缓存区分配器
.childHandler(childChannelHandler) //绑定I/O事件的处理,类,WebSocketChildChannelHandler中定义
val end = System.currentTimeMillis()
channelFuture = serverBootstrap.bind(port).sync().addListener {
if (it.isSuccess) {
log.warn("[im-启动]Netty Websocket服务器启动\"成功\",耗时 " + (end - begin) + " ms,已绑定端口 " + port + " 阻塞式等候客户端连接")
} else {
log.warn("[im-启动]Netty Websocket服务器启动\"失败\",耗时 " + (end - begin) + " ms,已绑定端口 " + port + " 阻塞式等候客户端连接")
}
}
} catch (e: Exception) {
log.error("[im-启动]启动netty时报错", e)
bossGroup.shutdownGracefully()
workGroup.shutdownGracefully()
}
}
上面的就是 Netty
负责 IO
连接的模板代码,用过 Netty
的,都应该明白。
其中最重要的是childHandler(childChannelHandler)
,这个childChannelHandler
是根据业务需要实现的IO
处理器。
比如我为了解析WebSocket
实现如下:
@Component
class IMChildHandlerInitializer : ChannelInitializer<SocketChannel>() {
@Autowired
private lateinit var webSocketServerHandler: IMWebsocketHandler
override fun initChannel(ch: SocketChannel) {
//处理基本的 HTTP请求
ch.pipeline().addLast("http-codec", HttpServerCodec())
ch.pipeline().addLast("aggregator", HttpObjectAggregator(64 * 1024))
ch.pipeline().addLast("http-chunked", ChunkedWriteHandler())
//http 的请求升级到 websocket 请求有 2 种方式,一种是自己写,一种是 交给netty内置的连接来处理
ch.pipeline().addLast("http-up-wb", WebSocketServerProtocolHandler("/ws"))
ch.pipeline().addLast("heart-notice",IdleStateHandler(60, 60, 0, TimeUnit.SECONDS))
ch.pipeline().addLast("heart-handler", heartHandler)
ch.pipeline().addLast("websocket-handler", webSocketServerHandler)
}
}
websocket
是http
协议升级来的,所以最前 3 行代码都是解析http
addLast("http-up-wb", WebSocketServerProtocolHandler("/ue-ws"))
使用内置的WebSocketServerProtocolHandler
将 http
升级到websocket
协议。其中/ws
就是连接的后缀,你可以自己修改。addLast("heart-notice",IdleStateHandler(60, 60, 0, TimeUnit.SECONDS))
是一个内置的心跳处理器,会在没有读\写超过规定时间的时候,触发对应的接口。addLast("heart-handler", heartHandler)
通过心跳来判定用户是否下线addLast("websocket-handler", webSocketServerHandler)
这个就是核心的websocket
业务处理类了。websocket
是长连接,当客户端异常退出或者路由器断线等特殊情况的时候,连接可能并不会自动关闭。但是对于应用而言,每一个连接都是宝贵的,所以不能让冗余的连接存在来浪费资源。
为了处理这种实际上断开了连接,但是应用没有断开的情况,需要增加了心跳机制。
同时因为连接是客户端发起的,所以应该选择让客户端主动来发起心跳,服务端根据心跳来进行处理。
下面是服务端处理心跳的简化代码。
@Component
@ChannelHandler.Sharable
class HeartHandler : ChannelInboundHandlerAdapter() {
companion object {
/**
* 用户的基本信息
*/
private val LOSE_HEART_TIMES: AttributeKey<Int> = AttributeKey.valueOf("lose-heart-times")
/**
* 日志
*/
private val log: Logger = LoggerFactory.getLogger(javaClass)
/**
* 默认测次数,触发该事件,就应该是一次了
*/
private const val DEFAULT_TIMES: Int = 1
/**
* 最大次数
*/
private const val LIMIT_LOSE_TIMES: Int = 2
}
@Autowired
private lateinit var channelContextManager: ChannelContextManager
/**
* 只会触发一次的激活回调
*
* @param ctx ChannelHandlerContext
*/
override fun channelActive(ctx: ChannelHandlerContext) {
ctx.channel().attr(LOSE_HEART_TIMES).set(DEFAULT_TIMES)
super.channelActive(ctx)
}
/**
* 通用事件的处理,因为在此之前注册了心跳事件,所以这里触发的应该为心跳事件
*
* @param ctx ChannelHandlerContext
* @param evt Any
*/
override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
if (evt !is IdleStateEvent)
return
when (evt.state()) {
//某个 channel 读超时
IdleState.READER_IDLE -> {
if (loseHeartTimesOverLimit(ctx)) {
log.error("[im-心跳死亡]读超时,下线处理,用户下线:{}...")
ctx.close()
} else {
log.error("[im-心跳丢失]读超时{}...", channelContextManager.mtAttr(ctx)?.userInfo?.realName)
}
}
}
ctx.writeAndFlush(TextWebSocketFrame("服务端心跳事件..."))
}
/**
* 判定心跳的次数是否已经大于阈值
*
* @param ctx ChannelHandlerContext
*/
private fun loseHeartTimesOverLimit(ctx: ChannelHandlerContext): Boolean {
val times = ctx.channel().attr(LOSE_HEART_TIMES).get()!!
return if (times >= LIMIT_LOSE_TIMES) {
true
} else {
ctx.channel().attr(LOSE_HEART_TIMES).set(times + 1)
false
}
}
}
HeartHandler
是交给 Spring 进行管理的,所以会在多个连接中共享这个变量,按照Netty
要求,我们需要采用@ChannelHandler.Sharable
注解来注释该类,示意该变量是共享的。【备注1】
复写了channelActive
事件,作用是客户端第一次连接将心跳超时这个变量同这个Channel
绑定
复写了userEventTriggered
并且处理了IdleState.READER_IDLE
这个服务端读的事件,心跳判定就在这个这个事件里面处理。
loseHeartTimesOverLimit
利用了Channel
绑定的参数LOSE_HEART_TIMES
来判定当前是否已经超过了断线阈值。
本篇文章主要谈到了 2 个知识点:
Netty
来启动Websocket
的启动代码下一篇我们来讲讲具体的通信业务编写。
【备注1】20200722 最近看到SOFARPC
以及部分文章有提到,心跳的Handler
不应该共享,我后续代码已经改为不共享了,虽然共享我也没有发现出什么问题...~~~~