上一篇提到了,根据 Netty 来将 HTTP 升级为 WebSocket 协议,并且通过心跳机制来关闭因为各类操作导致的连接未关闭的问题 。
这一篇是核心内容,我们将通过WebSocket 进行聊天。
本篇内容的示例源码还是通过kotlin
编写,继续给大家安利 kotlin
服务端和客户端要进行通信,必然先定义好双方都统一的交互实体,这里给出了一个简单的定义,请根据自己的内容扩展:
class TransferInfo {
var id: String = IdUtil.nextStrId()
/**
* 该信息是否是自己所发
*/
var me: Boolean = false
var from: String = "SERVER"
var userName: String = "SERVER"
var to: String = ""
//标志不同的行为
var cmd: String = ""
var msgType: String = "0"
var msg: String = ""
var extra: String = ""
var timestamp: String = System.currentTimeMillis().toString()
}
上面给出的是一个非常简单的定义,
msg
表示的每次通信的核心消息cmd
表示的不同的命令,比如绑定、聊天、禁言等等操作要处理这些消息,需要实现一个基本的接口:
interface IMsgProcessor {
fun cmd(): String
fun process(ctx: ChannelHandlerContext, body: TransferInfo)
/**
* 接口中的实现类,利用java8后接口中可以有方法的特性
* 优势如下
* 1.避免再写一个抽象类了
* 2.避免使用时候强转
*
* @param ctx ChannelHandlerContext
* @param body TransferInfo
*/
fun asyncProcess(ctx: ChannelHandlerContext, body: TransferInfo) {
val executor = SpringBeanHelper.getBean(ThreadPoolTaskExecutor::class.java)
executor.execute {
process(ctx, body)
}
}
}
同时当通过一个实现类的容器类,将这些实现类全部收纳起来,本质就是一个Map<CMD,IMsgProcessor>
@Repository
class ProcessorHolder : ApplicationObjectSupport() {
/**
* 利用 kotlin 特性 延迟初始化获取到的 holder
* 该特性要保证 Spring 初始化后才能够使用
*/
val holder: Map<String, IMsgProcessor> by lazy {
applicationContext!!.getBeansOfType(IMsgProcessor::class.java)
.values.associateBy { it.cmd() }
}
}
by lazy
利用了kotlin
延迟加载的特性,也可以替换InitingBean
这一类在 Bean
加载完成后该操作。Spring
容器中的IMsgProcessor
实现类,通过实现类的cmd()
方法获取到该类处理的任务CMD
,并将其作为Map
的Key
,而 Value
则是实现类。接下来就是消息的入口了。
后端会有一个XxxHandler
来接收传过来的实体,这里简单给出一个模型供大家参考:
@ChannelHandler.Sharable
@Component
class IMWebsocketHandler : SimpleChannelInboundHandler<WebSocketFrame>() {
@Autowired
private lateinit var processorHolder: ProcessorHolder
override fun channelRead0(ctx: ChannelHandlerContext, frame: WebSocketFrame) {
when (frame) {
is TextWebSocketFrame -> {
//获取消息内容text
val body = frame.retain().text()
//... 省略了一些判定代码
//转为实体类
val transferInfo = parseObj(body)
//获取 cmd
val cmd = transferInfo.cmd
processorHolder.holder[cmd]?.let {
it.asyncProcess(ctx, transferInfo)
}
}
// 省略了一些其他代码...
}
}
channelRead0
,在其中读取传递过来的参数,然后根据不同的 cmd
处理需求。ProcessorHolder
获取CMD
处理器的容器,然后找到对应的实体类来进行对应的操作。前文有提到,Websocket 不同于 HTTP是长连接,且是有状态的。所以在客户端连接了后端Websocket
后,在没有正式开始通信前,先要将通道
同用户
进行绑定,后续信息交流的时候,就不需要每次都把标注用户身份的token
带上。
定义的绑定命令可以类似如下操作:
{
"id":"123123",
"cmd":"001",
"msg":"token..."
}
后端的处理器内容类似如下:
@Service
class BindProcessImpl : IMsgProcessor {
//连接的管理器
@Autowired
private lateinit var channelContextManager: ChannelContextManager
companion object{
private val USER_INFO: AttributeKey<UserInfo> = AttributeKey.valueOf("mt-user-info")
}
override fun cmd(): String {
return "001"
}
/**
* 处理 cmd的核心接口
*/
override fun process(ctx: ChannelHandlerContext, body: TransferInfo) {
val userInfo=bodyToUserInfo()
ctx.channel().attr(USER_INFO).set(userInfo)
channelContextManager.imUserBind(userInfo , ctx)
}
}
这里有一个新的东西就是ChannelContextManager
,我们来看看他到底是什么
每个用户连接后,服务端需要将这个连接保存下来。当需要对这个用户发送消息的时候,调用这个连接来进行消息发送。
ChannelContextManager
的作用就是这个,我们来看看具体的代码
@Repository
class ChannelContextManager {
companion object {
private val IM_USER_CHANNEL = ConcurrentHashMap<String, ChannelHandlerContext>()
}
fun imUserBind(userInfo: MTUserInfo, ctx: ChannelHandlerContext) {
IM_USER_CHANNEL[userInfo.userId.toString()] = ctx
//添加通道关闭的事件该监听器,方便移除资源
ctx.channel().closeFuture().addListener {
IM_USER_CHANNEL.remove(userInfo.userId,ctx)
}
}
fun userChannel(uId: String): ChannelHandlerContext? {
return IM_USER_CHANNEL[uId]
}
}
Map
容器IM_USER_CHANNEL
用来保存用户和其连接imUserBind
中定义了这个连接的关闭Listener
,当连接关闭后会自动触发IM_USER_CHANNEL.remove(userInfo.userId,ctx)
可以思考下:通过 Listener 来处理和自己捕捉关闭事件来处理的优势是什么?
用户的连接建立了,通道绑定了,接下来就是本文的目的了消息发送
首先为了方便消息发送,我们定义一个工具类:
class WbUtil {
companion object {
fun sendMsg(ctx: ChannelHandlerContext, msgBody: String) {
ctx.channel().writeAndFlush(TextWebSocketFrame(msgBody))
}
}
}
接下来就是聊天的消息处理了
@Component
class ChatProcessImpl : IMsgProcessor {
@Autowired
private lateinit var channelContextManager: ChannelContextManager
override fun cmd(): String {
return "110"
}
override fun process(ctx: ChannelHandlerContext, body: TransferInfo) {
val uId = body.to
channelContextManager.userChannel(uId)?.let {
//填充一些其他参数
fillBody(body)
WbUtil.sendMsg(it, body.toJson())
}
}
}
channelContextManager
获取到目的用户to
的通道真正的IM
系统绝对不止这么简单,还有很多需要考虑到,比如:
更多的是细节问题,需要结合各自的业务,一个一个分析进行处理。所以我这里仅仅是给出了一个大大的框架模板,具体的细节和更多的内容,请结合自己的业务完善吧。
后续有空,将这部分代码整理后,开源出来。