驽马十驾 驽马十驾

驽马十驾,功在不舍

目录
IM系列3:消息处理框架的实现
/  

IM系列3:消息处理框架的实现

开篇

上一篇提到了,根据 Netty 来将 HTTP 升级为 WebSocket 协议,并且通过心跳机制来关闭因为各类操作导致的连接未关闭的问题 。

这一篇是核心内容,我们将通过WebSocket 进行聊天。

本篇内容的示例源码还是通过kotlin编写,继续给大家安利 kotlin

核心

消息体

服务端和客户端要进行通信,必然先定义好双方都统一的交互实体,这里给出了一个简单的定义,请根据自己的内容扩展:

class TransferInfo {

    var id: String = IdUtil.nextStrId()

    /**
     * 该信息是否是自己所发
     */
    var me: Boolean = false

    var from: String = "SERVER"

    var userName: String = "SERVER"

    var to: String = ""
  
		//标志不同的行为
    var cmd: String = ""

    var msgType: String = "0"

    var msg: String = ""

    var extra: String = ""
    
    var timestamp: String = System.currentTimeMillis().toString()
}

上面给出的是一个非常简单的定义,

  • msg表示的每次通信的核心消息
  • cmd表示的不同的命令,比如绑定、聊天、禁言等等操作

消息处理接口

要处理这些消息,需要实现一个基本的接口:

interface IMsgProcessor {
	
    fun cmd(): String

    fun process(ctx: ChannelHandlerContext, body: TransferInfo)

    /**
     * 接口中的实现类,利用java8后接口中可以有方法的特性
     * 优势如下
     * 1.避免再写一个抽象类了
     * 2.避免使用时候强转
     *
     * @param ctx ChannelHandlerContext
     * @param body TransferInfo
     */
    fun asyncProcess(ctx: ChannelHandlerContext, body: TransferInfo) {
        val executor = SpringBeanHelper.getBean(ThreadPoolTaskExecutor::class.java)
        executor.execute {
            process(ctx, body)
        }
    }
}

消息处理实现类容器

同时当通过一个实现类的容器类,将这些实现类全部收纳起来,本质就是一个Map<CMD,IMsgProcessor>

@Repository
class ProcessorHolder : ApplicationObjectSupport() {

    /**
     * 利用 kotlin 特性 延迟初始化获取到的 holder
     * 该特性要保证 Spring 初始化后才能够使用
     */
    val holder: Map<String, IMsgProcessor> by lazy {
        applicationContext!!.getBeansOfType(IMsgProcessor::class.java)
      					.values.associateBy { it.cmd() }
    }

}
  • 上述写法by lazy利用了kotlin延迟加载的特性,也可以替换InitingBean这一类在 Bean加载完成后该操作。
  • 核心思路就是:找到Spring容器中的IMsgProcessor实现类,通过实现类的cmd()方法获取到该类处理的任务CMD,并将其作为MapKey,而 Value则是实现类。

接下来就是消息的入口了。

消息处理入口

后端会有一个XxxHandler来接收传过来的实体,这里简单给出一个模型供大家参考:

@ChannelHandler.Sharable
@Component
class IMWebsocketHandler : SimpleChannelInboundHandler<WebSocketFrame>() {
    
    @Autowired
    private lateinit var processorHolder: ProcessorHolder
  
    override fun channelRead0(ctx: ChannelHandlerContext, frame: WebSocketFrame) {
       when (frame) {
            is TextWebSocketFrame -> {
                //获取消息内容text
                val body = frame.retain().text()
              	//... 省略了一些判定代码
  							//转为实体类
                val transferInfo = parseObj(body)
								//获取 cmd
                val cmd = transferInfo.cmd
                processorHolder.holder[cmd]?.let {
                    it.asyncProcess(ctx, transferInfo)
                }
            }
          	// 省略了一些其他代码... 
        }
    }
  • 复写了消息读取接口channelRead0,在其中读取传递过来的参数,然后根据不同的 cmd处理需求。
  • 通过ProcessorHolder获取CMD处理器的容器,然后找到对应的实体类来进行对应的操作。

通道绑定

前文有提到,Websocket 不同于 HTTP是长连接,且是有状态的。所以在客户端连接了后端Websocket后,在没有正式开始通信前,先要将通道用户进行绑定,后续信息交流的时候,就不需要每次都把标注用户身份的token带上。

定义的绑定命令可以类似如下操作:

{
  "id":"123123",
  "cmd":"001",
  "msg":"token..."
}
  • cmd 标志当前行为是 绑定
  • msg 是标志用户的 token 或者 userId

后端的处理器内容类似如下:

@Service
class BindProcessImpl : IMsgProcessor {
  	
    //连接的管理器
    @Autowired
    private lateinit var channelContextManager: ChannelContextManager
  
    companion object{
        private val USER_INFO: AttributeKey<UserInfo> = AttributeKey.valueOf("mt-user-info")
    }
    
    override fun cmd(): String {
        return "001"
    }

    /**
     * 处理 cmd的核心接口
     */
    override fun process(ctx: ChannelHandlerContext, body: TransferInfo) {
        val userInfo=bodyToUserInfo()
        ctx.channel().attr(USER_INFO).set(userInfo)
        channelContextManager.imUserBind(userInfo , ctx)
    }

}

这里有一个新的东西就是ChannelContextManager,我们来看看他到底是什么

Channel容器

每个用户连接后,服务端需要将这个连接保存下来。当需要对这个用户发送消息的时候,调用这个连接来进行消息发送。

ChannelContextManager的作用就是这个,我们来看看具体的代码

@Repository
class ChannelContextManager {

    companion object {
        private val IM_USER_CHANNEL = ConcurrentHashMap<String, ChannelHandlerContext>()
    }

    fun imUserBind(userInfo: MTUserInfo, ctx: ChannelHandlerContext) {
        IM_USER_CHANNEL[userInfo.userId.toString()] = ctx
        //添加通道关闭的事件该监听器,方便移除资源
 				ctx.channel().closeFuture().addListener {
            IM_USER_CHANNEL.remove(userInfo.userId,ctx)
        }
    }
  
    fun userChannel(uId: String): ChannelHandlerContext? {
        return IM_USER_CHANNEL[uId]
    }
}
  • Map容器IM_USER_CHANNEL用来保存用户和其连接
  • 在方法imUserBind中定义了这个连接的关闭Listener,当连接关闭后会自动触发IM_USER_CHANNEL.remove(userInfo.userId,ctx)

可以思考下:通过 Listener 来处理和自己捕捉关闭事件来处理的优势是什么?

消息接收和发送

用户的连接建立了,通道绑定了,接下来就是本文的目的了消息发送

首先为了方便消息发送,我们定义一个工具类:

class WbUtil {
    companion object {
        fun sendMsg(ctx: ChannelHandlerContext, msgBody: String) {
            ctx.channel().writeAndFlush(TextWebSocketFrame(msgBody))
        }
    }
}

接下来就是聊天的消息处理了

@Component
class ChatProcessImpl : IMsgProcessor {

    @Autowired
    private lateinit var channelContextManager: ChannelContextManager

    override fun cmd(): String {
        return "110"
    }

    override fun process(ctx: ChannelHandlerContext, body: TransferInfo) {
        val uId = body.to
        channelContextManager.userChannel(uId)?.let {
            //填充一些其他参数
            fillBody(body)
            WbUtil.sendMsg(it, body.toJson())
        }
    }
}
  • channelContextManager 获取到目的用户to的通道
  • 通过该通道给用户发送消息

结语

真正的IM系统绝对不止这么简单,还有很多需要考虑到,比如:

  • 发送消息时候,目标用户不在线的处理
  • 单机负载有上限,如何利用多台服务器负载的问题
  • ....

更多的是细节问题,需要结合各自的业务,一个一个分析进行处理。所以我这里仅仅是给出了一个大大的框架模板,具体的细节和更多的内容,请结合自己的业务完善吧。

后续有空,将这部分代码整理后,开源出来。

骐骥一跃,不能十步。驽马十驾,功在不舍。