驽马十驾 驽马十驾

驽马十驾,功在不舍

目录
基于TCP/UDP的请求响应模型设计
/    

基于TCP/UDP的请求响应模型设计

开篇

最近项目中,需要做一个基于UDP请求响应这样的应答模型。

HTTP本身就是支持请求-响应模型的,但是TCP和UDP不是。

经过一番思考和讨论,提出了2个解决方案,这个地方做一个记录。我相信我这个方案应该不是最好的,都有各自的缺陷。如果你有更好的解决方案,还请不吝赐教。

  • 基于JUCLockCondition
  • 基于Futureget

背景

该项目是基于Netty的,以下会有Netty的几个知识点:

  1. SimpleChannelInboundHandler.channelRead0()该方法会在Netty接收到消息的时候被触发,希望从这个地方获取到消息ID,并唤醒阻塞的程序。
  2. channel.writeAndFlush(object)该方法是将消息通过channel发给出去。
  3. 项目启动类,请注意其中的AgentContext,用来保存channel方便随时发送信息出去。
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
try {
	Channel channel = bootstrap.group(group).channel(NioSocketChannel.class)
	                    .option(ChannelOption.TCP_NODELAY, true)
	                    .handler(new AgentChannelInitializer(new TcpReceivedHandler()))
	                    .connect(innerIp, innerPort)
	                    .sync().channel();
	//将channel保存,用于后续的消息发送
	AgentContext.setChannel(channel);
}
  1. 发送的对象和接收的对象一致,以下为大概的模型CmdBo
public class CmdBo{
    //唯一主键,建议通过uuid生成
    private String id;
    //请求体
    private String request;
    //响应内容
    private String response;
}

方法一:通过lock和condition

方案一上下文有2个关联,用于存储关联信息:

  • 一个是id控制量的关联ControlContext
  • 另外一个是id结果的关联:ResultContext

其大体流程为:

  1. 发送消息时候,生成一个唯一的id.
  2. 发送后阻塞,并将id同控制变量进行关联到ControlContext
  3. 接收消息时候,有2个操作:一个是解析出消息中的id,并将id同结果绑定。二是获取id对应的控制变量,将阻塞放开,后续从结果ResultMap中获取。

整体流程图如下所示:

基于网络的请求和响应图

Sendor的部分的代码如下所示:

public CmdBo send(CmdBo cmdbo) {
	Lock lock = new ReentrantLock();
	Condition condition = lock.newCondition();
	try {
		String id = cmdBo.getId();
		ControlContext.put(id, lock, condition);
		lock.lock();
		Channel channel = XXContenxt.CHANNEL;
		channel.writeAndFlush(xxx);
		//阻塞 
		condition.await();
		// 被唤醒后,从结果上下文中根据id获取结果内容
		CmdBo result = ResultContext.getResult(id);
		return result;
	}
	catch (Exception e) {
		e.printStackTrace();
	}
	return null;
}

receive部分的代码如下所示:

@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
	String body = packet.content().toString(CharsetUtil.UTF_8);
	CmdBo bo = JSONUtil.toBean(body, CmdBo.class);
	String id = bo.getId();
    //将结果放在ResultContext的上下文对象中
	ResultContext.put(id, messageBo);
    //通过id获取控制对象
	Tuple2<Lock, Condition> tuple2 = ControlContext.get(id);
	Lock lock = tuple2.first;
	Condition condition = tuple2.second;
	try {
		lock.lock();
        //唤醒sendor阻塞的现成
		condition.signal();
	}
	catch (Exception e) {
		e.printStackTrace();
	}
	finally {
		lock.unlock();
	}
}

使用方式:

Sendor sendor=new Sendor();
CmdBo result=sendor.send(xxx); //此处会一直阻塞到有结果返回。

上述使用到了:LockCondition2个知识点,不懂的可以自行百度,这里不展开讲。

上述使用还存在细节问题:

  1. 上下文的Map,使用完了需要清理。
  2. 超时问题,当前处理方式,会导致线程一直被阻塞住。

方法二:Future+CountdownLatch

第二种方案比第一种方案好点,利用了Future,其思路同第一个大致相似,不过通过CountDownlatch来控制阻塞。

首先构建SyncFuture对象,核心代码如下所示:

public class SyncFuture<T> implements Future<T> {

    // 因为请求和响应是一一对应的,因此初始化CountDownLatch值为1。
    private CountDownLatch latch = new CountDownLatch(1);
    
    // 需要响应线程设置的响应结果
    private T response;
    
    // 获取响应结果,直到有结果才返回。
    @Override
    public T get() throws InterruptedException {
        latch.await();
        return this.response;
    }

    // 获取响应结果,直到有结果或者超过指定时间就返回。
    @Override
    public T get(long timeout, TimeUnit unit) throws InterruptedException {
        if (latch.await(timeout, unit)) {
            return this.response;
        }
        return null;
    }

    // 用于设置响应结果,并且做countDown操作,通知请求线程
    public void setResponse(T response) {
        this.response = response;
        latch.countDown();
    }
}

Sendor部分代码:

public SyncFuture<MessageBo> send(MessageBo message) {
	Channel channel = XXContenxt.CHANNEL;
	//创建future对象并保存
	SyncFuture<MessageBo> future = new SyncFuture<>();
    //id同future映射保存到上下文中
	FutureContext.addFuture(message.getId(), future);
	//发送
	channel.writeAndFlush(xxx);
	return future;
}

Receiver

@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
	String body = packet.content().toString(CharsetUtil.UTF_8);
	//System.out.println("客户端收到消息:" + body);
	CmdBo bo = JSONUtil.toBean(body, CmdBo.class);
	String id = bo.getId();
    //获取future
	SyncFuture future = FutureContext.getFuture(id);
    //填充future
	future.setResponse(messageBo);
}

使用方式

Sendor sendor=new Sendor();
SyncFuture<MessageBo> future = sendor.send(mock);
future.get();//该方法会一直阻塞
future.get(4,TimeUnit.Seconds);//阻塞一定的时间,推荐使用。

总结

上述2种方式大同小异,都是在发送后阻塞,然后从获取的结果内容中进行线程唤醒。

个人感觉应该不属于最好的解决方案,如果你有更好的解决方案,请提出来,大家一起学习。

最后上一张图来概括下:
image.png