最近项目中,需要做一个基于UDP
的请求响应
这样的应答模型。
HTTP
本身就是支持请求-响应模型
的,但是TCP和UDP不是。
经过一番思考和讨论,提出了2个解决方案,这个地方做一个记录。我相信我这个方案应该不是最好的,都有各自的缺陷。如果你有更好的解决方案,还请不吝赐教。
JUC
的Lock
和Condition
Future
的get
该项目是基于Netty
的,以下会有Netty
的几个知识点:
SimpleChannelInboundHandler.channelRead0()
该方法会在Netty
接收到消息的时候被触发,希望从这个地方获取到消息ID,并唤醒阻塞的程序。channel.writeAndFlush(object)
该方法是将消息通过channel
发给出去。AgentContext
,用来保存channel
方便随时发送信息出去。EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
try {
Channel channel = bootstrap.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new AgentChannelInitializer(new TcpReceivedHandler()))
.connect(innerIp, innerPort)
.sync().channel();
//将channel保存,用于后续的消息发送
AgentContext.setChannel(channel);
}
CmdBo
public class CmdBo{
//唯一主键,建议通过uuid生成
private String id;
//请求体
private String request;
//响应内容
private String response;
}
方案一上下文有2个关联,用于存储关联信息:
id
同控制量
的关联ControlContext
id
同结果
的关联:ResultContext
其大体流程为:
ControlContext
中ResultMap
中获取。整体流程图如下所示:
Sendor
的部分的代码如下所示:
public CmdBo send(CmdBo cmdbo) {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
try {
String id = cmdBo.getId();
ControlContext.put(id, lock, condition);
lock.lock();
Channel channel = XXContenxt.CHANNEL;
channel.writeAndFlush(xxx);
//阻塞
condition.await();
// 被唤醒后,从结果上下文中根据id获取结果内容
CmdBo result = ResultContext.getResult(id);
return result;
}
catch (Exception e) {
e.printStackTrace();
}
return null;
}
receive
部分的代码如下所示:
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
String body = packet.content().toString(CharsetUtil.UTF_8);
CmdBo bo = JSONUtil.toBean(body, CmdBo.class);
String id = bo.getId();
//将结果放在ResultContext的上下文对象中
ResultContext.put(id, messageBo);
//通过id获取控制对象
Tuple2<Lock, Condition> tuple2 = ControlContext.get(id);
Lock lock = tuple2.first;
Condition condition = tuple2.second;
try {
lock.lock();
//唤醒sendor阻塞的现成
condition.signal();
}
catch (Exception e) {
e.printStackTrace();
}
finally {
lock.unlock();
}
}
使用方式:
Sendor sendor=new Sendor();
CmdBo result=sendor.send(xxx); //此处会一直阻塞到有结果返回。
上述使用到了:Lock
和Condition
2个知识点,不懂的可以自行百度,这里不展开讲。
上述使用还存在细节问题:
第二种方案比第一种方案好点,利用了Future
,其思路同第一个大致相似,不过通过CountDownlatch
来控制阻塞。
首先构建SyncFuture
对象,核心代码如下所示:
public class SyncFuture<T> implements Future<T> {
// 因为请求和响应是一一对应的,因此初始化CountDownLatch值为1。
private CountDownLatch latch = new CountDownLatch(1);
// 需要响应线程设置的响应结果
private T response;
// 获取响应结果,直到有结果才返回。
@Override
public T get() throws InterruptedException {
latch.await();
return this.response;
}
// 获取响应结果,直到有结果或者超过指定时间就返回。
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException {
if (latch.await(timeout, unit)) {
return this.response;
}
return null;
}
// 用于设置响应结果,并且做countDown操作,通知请求线程
public void setResponse(T response) {
this.response = response;
latch.countDown();
}
}
Sendor
部分代码:
public SyncFuture<MessageBo> send(MessageBo message) {
Channel channel = XXContenxt.CHANNEL;
//创建future对象并保存
SyncFuture<MessageBo> future = new SyncFuture<>();
//id同future映射保存到上下文中
FutureContext.addFuture(message.getId(), future);
//发送
channel.writeAndFlush(xxx);
return future;
}
Receiver
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
String body = packet.content().toString(CharsetUtil.UTF_8);
//System.out.println("客户端收到消息:" + body);
CmdBo bo = JSONUtil.toBean(body, CmdBo.class);
String id = bo.getId();
//获取future
SyncFuture future = FutureContext.getFuture(id);
//填充future
future.setResponse(messageBo);
}
使用方式
Sendor sendor=new Sendor();
SyncFuture<MessageBo> future = sendor.send(mock);
future.get();//该方法会一直阻塞
future.get(4,TimeUnit.Seconds);//阻塞一定的时间,推荐使用。
上述2种方式大同小异,都是在发送后阻塞,然后从获取的结果内容中进行线程唤醒。
个人感觉应该不属于最好的解决方案,如果你有更好的解决方案,请提出来,大家一起学习。
最后上一张图来概括下: