fix 使用CompletableFuture代替锁

This commit is contained in:
eden 2024-07-31 14:31:39 +08:00
parent 6a2a656947
commit ca640e2e2b

View File

@ -2,8 +2,9 @@ package net.maku.iot.mqtt.dto;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.LockSupport; import java.util.concurrent.TimeUnit;
/** /**
* 数据生产消费者 * 数据生产消费者
@ -13,18 +14,10 @@ public class Chan {
// 存储通道的 ConcurrentHashMap // 存储通道的 ConcurrentHashMap
private static final ConcurrentHashMap<String, Chan> CHANNEL = new ConcurrentHashMap<>(); private static final ConcurrentHashMap<String, Chan> CHANNEL = new ConcurrentHashMap<>();
// 超时时间单位用于将毫秒转换为纳秒 private final CompletableFuture<Object> future = new CompletableFuture<>();
private static final int UNIT = 1000_000;
// 存储数据的变量
private volatile Object data;
// 当前线程的变量
private volatile Thread t;
// 私有构造函数不允许外部直接实例化 // 私有构造函数不允许外部直接实例化
private Chan() { private Chan() {
} }
/** /**
@ -38,9 +31,7 @@ public class Chan {
if (!isNeedCreate) { if (!isNeedCreate) {
return CHANNEL.get(commandId); return CHANNEL.get(commandId);
} }
Chan chan = new Chan(); return CHANNEL.computeIfAbsent(commandId, k -> new Chan());
CHANNEL.put(commandId, chan);
return chan;
} }
/** /**
@ -55,11 +46,13 @@ public class Chan {
if (Objects.isNull(chan)) { if (Objects.isNull(chan)) {
return null; return null;
} }
chan.t = Thread.currentThread(); try {
LockSupport.parkNanos(chan.t, timeout * UNIT); return chan.future.get(timeout, TimeUnit.MILLISECONDS);
chan.t = null; } catch (Exception e) {
CHANNEL.remove(commandId); return null;
return chan.data; } finally {
CHANNEL.remove(commandId, chan);
}
} }
/** /**
@ -68,14 +61,14 @@ public class Chan {
* @param response 要放入的数据 * @param response 要放入的数据
*/ */
public void put(BaseCommandResponse response) { public void put(BaseCommandResponse response) {
Chan chan = CHANNEL.get(response.getCommandId()); String commandId = response.getCommandId();
if (commandId == null) {
return;
}
Chan chan = CHANNEL.get(commandId);
if (Objects.isNull(chan)) { if (Objects.isNull(chan)) {
return; return;
} }
chan.data = response; chan.future.complete(response);
if (chan.t == null) {
return;
}
LockSupport.unpark(chan.t);
} }
} }