fix 用通道替换线程数据传递
This commit is contained in:
parent
219cc12815
commit
6a2a656947
|
@ -0,0 +1,18 @@
|
|||
package net.maku.iot.mqtt.dto;
|
||||
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* 响应消息基础类
|
||||
*
|
||||
* @author eden on 2024/6/17
|
||||
*/
|
||||
@Data
|
||||
public class BaseCommandResponse {
|
||||
/**
|
||||
* 命令ID
|
||||
*/
|
||||
@Schema(description = "命令ID", required = true)
|
||||
protected String commandId;
|
||||
}
|
|
@ -0,0 +1,81 @@
|
|||
package net.maku.iot.mqtt.dto;
|
||||
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.locks.LockSupport;
|
||||
|
||||
/**
|
||||
* 数据生产消费者
|
||||
*/
|
||||
public class Chan {
|
||||
|
||||
// 存储通道的 ConcurrentHashMap
|
||||
private static final ConcurrentHashMap<String, Chan> CHANNEL = new ConcurrentHashMap<>();
|
||||
|
||||
// 超时时间单位,用于将毫秒转换为纳秒
|
||||
private static final int UNIT = 1000_000;
|
||||
|
||||
// 存储数据的变量
|
||||
private volatile Object data;
|
||||
|
||||
// 当前线程的变量
|
||||
private volatile Thread t;
|
||||
|
||||
// 私有构造函数,不允许外部直接实例化
|
||||
private Chan() {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取或创建通道实例
|
||||
*
|
||||
* @param commandId 通道标识
|
||||
* @param isNeedCreate 是否需要创建新的通道实例
|
||||
* @return 通道实例
|
||||
*/
|
||||
public static Chan getInstance(String commandId, boolean isNeedCreate) {
|
||||
if (!isNeedCreate) {
|
||||
return CHANNEL.get(commandId);
|
||||
}
|
||||
Chan chan = new Chan();
|
||||
CHANNEL.put(commandId, chan);
|
||||
return chan;
|
||||
}
|
||||
|
||||
/**
|
||||
* 从通道中获取数据,支持超时设置
|
||||
*
|
||||
* @param commandId 通道标识
|
||||
* @param timeout 超时时间(毫秒)
|
||||
* @return 获取的数据,如果超时返回 null
|
||||
*/
|
||||
public Object get(String commandId, long timeout) {
|
||||
Chan chan = CHANNEL.get(commandId);
|
||||
if (Objects.isNull(chan)) {
|
||||
return null;
|
||||
}
|
||||
chan.t = Thread.currentThread();
|
||||
LockSupport.parkNanos(chan.t, timeout * UNIT);
|
||||
chan.t = null;
|
||||
CHANNEL.remove(commandId);
|
||||
return chan.data;
|
||||
}
|
||||
|
||||
/**
|
||||
* 向通道中放入数据,并唤醒可能正在等待数据的线程
|
||||
*
|
||||
* @param response 要放入的数据
|
||||
*/
|
||||
public void put(BaseCommandResponse response) {
|
||||
Chan chan = CHANNEL.get(response.getCommandId());
|
||||
if (Objects.isNull(chan)) {
|
||||
return;
|
||||
}
|
||||
chan.data = response;
|
||||
if (chan.t == null) {
|
||||
return;
|
||||
}
|
||||
LockSupport.unpark(chan.t);
|
||||
}
|
||||
}
|
|
@ -3,6 +3,7 @@ package net.maku.iot.mqtt.dto;
|
|||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import net.maku.iot.enums.DeviceCommandEnum;
|
||||
|
||||
/**
|
||||
|
@ -10,10 +11,11 @@ import net.maku.iot.enums.DeviceCommandEnum;
|
|||
*
|
||||
* @author LSF maku_lsf@163.com
|
||||
*/
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
@Data
|
||||
@Schema(description = "设备命令响应DTO")
|
||||
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||
public class DeviceCommandResponseDTO {
|
||||
public class DeviceCommandResponseDTO extends BaseCommandResponse {
|
||||
/**
|
||||
* 命令类型
|
||||
*/
|
||||
|
@ -21,12 +23,6 @@ public class DeviceCommandResponseDTO {
|
|||
private DeviceCommandEnum command;
|
||||
|
||||
/**
|
||||
* 命令ID
|
||||
*/
|
||||
@Schema(description = "命令ID", required = true)
|
||||
private String commandId;
|
||||
|
||||
/**
|
||||
* 命令是否完成(默认true:命令已完成;false:命令未完成,后续命令完成将再次发送响应消息,服务端将继续等待该命令完成的响应)
|
||||
*/
|
||||
@Schema(description = "命令是否完成(默认true:命令已完成;false:命令未完成,后续命令完成将再次发送响应消息,服务端将继续等待该命令完成的响应)")
|
||||
|
|
|
@ -11,6 +11,7 @@ import net.maku.iot.enums.DeviceCommandEnum;
|
|||
import net.maku.iot.enums.DeviceServiceEnum;
|
||||
import net.maku.iot.enums.DeviceTopicEnum;
|
||||
import net.maku.iot.mqtt.MqttGateway;
|
||||
import net.maku.iot.mqtt.dto.Chan;
|
||||
import net.maku.iot.mqtt.dto.DeviceCommandDTO;
|
||||
import net.maku.iot.mqtt.dto.DeviceCommandResponseDTO;
|
||||
import net.maku.iot.service.IotDeviceServiceLogService;
|
||||
|
@ -105,8 +106,13 @@ public class DeviceMqttService {
|
|||
public DeviceCommandResponseDTO syncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload, boolean retained) {
|
||||
// 构建并发送命令
|
||||
String commandId = asyncSendCommand(device, command, payload, retained);
|
||||
// 等待返回结果
|
||||
return waitCommandResponse(command, commandId);
|
||||
// 等待返回结果 超时3秒(可控)
|
||||
Object receiver = Chan.getInstance(commandId, true).get(commandId, 3 * 1000L);
|
||||
if (receiver == null) {
|
||||
log.error("Failed to receive the message. {}", device.getName());
|
||||
throw new ServerException(StrUtil.format("{}设备没有回复", device.getName()));
|
||||
}
|
||||
return (DeviceCommandResponseDTO) receiver;
|
||||
}
|
||||
|
||||
|
||||
|
@ -180,24 +186,8 @@ public class DeviceMqttService {
|
|||
* @param commandResponse
|
||||
*/
|
||||
public void commandReplied(String topic, DeviceCommandResponseDTO commandResponse) {
|
||||
Exchanger<Object> exchanger = commandExchangers.remove(commandResponse.getCommandId());
|
||||
if (exchanger != null) {
|
||||
if (commandResponse.isCompleted()) {
|
||||
try {
|
||||
// 将响应交换到等待线程
|
||||
exchanger.exchange(commandResponse, 15, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
log.error("将主题'{}'的命令响应交换到等待线程失败,中断异常: {}", topic, e.getMessage());
|
||||
} catch (TimeoutException e) {
|
||||
log.error("将主题'{}'的命令响应交换到等待线程失败,超时异常: {}", topic, e.getMessage());
|
||||
}
|
||||
} else {
|
||||
log.warn("命令ID为'{}'的响应未完成,未通知等待线程", commandResponse.getCommandId());
|
||||
}
|
||||
} else {
|
||||
log.warn("找不到命令ID为'{}'的响应交换器", commandResponse.getCommandId());
|
||||
}
|
||||
Chan chan = Chan.getInstance(commandResponse.getCommandId(), false);
|
||||
chan.put(commandResponse);
|
||||
}
|
||||
|
||||
public void simulateDeviceReportAttributeData(IotDeviceEntity device, String payload) {
|
||||
|
|
Loading…
Reference in New Issue
Block a user