add:提供多通信协议能力

This commit is contained in:
LSF 2024-08-14 10:08:01 +08:00
parent ca640e2e2b
commit 73ecc0bf4f
41 changed files with 1138 additions and 932 deletions

View File

@ -11,6 +11,7 @@ CREATE TABLE iot_device (
temperature varchar(10) DEFAULT NULL COMMENT '温度',
status tinyint NOT NULL DEFAULT '1' COMMENT '状态0禁用1启用',
running_status int NOT NULL DEFAULT '0' COMMENT '运行状态0.离线状态 1.在线状态 2.正常待机 3.用户使用中 4.OTA升级中',
protocol_type varchar(20) NOT NULL DEFAULT 'MQTT' COMMENT '协议类型',
up_time datetime DEFAULT NULL COMMENT '上线时间',
down_time datetime DEFAULT NULL COMMENT '下线时间',
tenant_id bigint DEFAULT NULL COMMENT '租户ID',
@ -94,6 +95,17 @@ INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, r
INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), '登出', 'SIGN_OFF', NULL, NULL, 3, NULL, 0, 1, 10000, now(), 10000, now());
INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), 'OTA升级', 'OTA_UPGRADE', NULL, NULL, 4, NULL, 0, 0, 10000, now(), 10000, now());
INSERT INTO sys_dict_type (dict_type,dict_name,remark,sort,tenant_id,version,deleted,creator,create_time,updater,update_time )VALUES('device_protocol_type', '设备协议类型', '设备协议类型', 0, 10000, 0, 0, 10000, now(), 10000, now() );
SET @typeId = @@identity;
INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), 'MQTT', 'MQTT', NULL, NULL, 0, NULL, 0, 0, 10000, now(), 10000, now());
INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), 'TCP', 'TCP', NULL, NULL, 1, NULL, 0, 0, 10000, now(), 10000, now());
INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), 'UDP', 'UDP', NULL, NULL, 2, NULL, 0, 1, 10000, now(), 10000, now());
INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), 'BLE', 'BLE', NULL, NULL, 3, NULL, 0, 1, 10000, now(), 10000, now());
INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), 'CoAP', 'CoAP', NULL, NULL, 4, NULL, 0, 0, 10000, now(), 10000, now());
INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), 'LwM2M', 'LwM2M', NULL, NULL, 4, NULL, 0, 0, 10000, now(), 10000, now());
INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), 'Modbus', 'Modbus', NULL, NULL, 4, NULL, 0, 0, 10000, now(), 10000, now());
INSERT INTO sys_dict_type (dict_type,dict_name,remark,sort,tenant_id,version,deleted,creator,create_time,updater,update_time )VALUES('device_property', '设备属性', '设备通用属性:运行状态|APP版本|电池电量百分比|温度', 0, 10000, 0, 0, 10000, now(), 10000, now() );
SET @typeId = @@identity;
INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), '运行状态', 'RUNNING_STATUS', NULL, NULL, 0, NULL, 0, 0, 10000, now(), 10000, now());

View File

@ -0,0 +1,28 @@
package net.maku.iot.communication;
import net.maku.iot.entity.IotDeviceEntity;
import net.maku.iot.enums.DeviceCommandEnum;
import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO;
/**
* 基础通信协议具备功能
*/
public interface BaseCommunication {
// 异步发送指令,不等待设备响应
String asyncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload);
//同步发送指定等待设备响应
DeviceCommandResponseDTO syncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload);
//同步发送指定等待设备响应调试实现
DeviceCommandResponseDTO syncSendCommandDebug(IotDeviceEntity device, DeviceCommandEnum command, String payload);
//模拟设备属性上报
void simulateDeviceReportAttributeData(IotDeviceEntity device, String payload);
//模拟设备服务指令响应数据
void simulateDeviceCommandResponseAttributeData(IotDeviceEntity device, String payload);
}

View File

@ -0,0 +1,36 @@
package net.maku.iot.communication;
import lombok.AllArgsConstructor;
import net.maku.framework.common.exception.ServerException;
import org.springframework.stereotype.Service;
/**
* @Description TODO
* @Author LSF
* @Date 2024/8/9 14:53
*/
@Service
@AllArgsConstructor
public class CommunicationServiceFactory {
private final MQTTService mqttService;
private final TCPService tcpService;
public BaseCommunication getProtocol(String protocolType) {
if (protocolType == null) {
new ServerException("协议不存在!");
}
switch (protocolType) {
case "MQTT":
return mqttService;
case "TCP":
return tcpService;
// case "Modbus":
// return tcpService;
default:
return null;
}
}
}

View File

@ -1,43 +1,40 @@
package net.maku.iot.mqtt.service;
package net.maku.iot.communication;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.maku.framework.common.exception.ServerException;
import net.maku.iot.communication.mqtt.MqttGateway;
import net.maku.iot.communication.mqtt.dto.CommandResponseChan;
import net.maku.iot.communication.mqtt.dto.DeviceCommandDTO;
import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO;
import net.maku.iot.dto.DeviceClientDTO;
import net.maku.iot.entity.IotDeviceEntity;
import net.maku.iot.enums.DeviceCommandEnum;
import net.maku.iot.enums.DeviceServiceEnum;
import net.maku.iot.enums.DeviceTopicEnum;
import net.maku.iot.mqtt.MqttGateway;
import net.maku.iot.mqtt.dto.Chan;
import net.maku.iot.mqtt.dto.DeviceCommandDTO;
import net.maku.iot.mqtt.dto.DeviceCommandResponseDTO;
import net.maku.iot.service.IotDeviceServiceLogService;
import net.maku.iot.utils.MqttUtils;
import org.springframework.stereotype.Component;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Exchanger;
/**
* 设备命令发送服务
**/
* @Description TODO
* @Author LSF
* @Date 2024/8/9 14:21
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class DeviceMqttService {
private final MqttUtils mqttUtils;
public class MQTTService implements BaseCommunication {
private final MqttGateway mqttGateway;
private final IotDeviceServiceLogService iotDeviceEventLogService;
/**
* 命令等待exchanger缓存key: command id
*/
private final ConcurrentMap<String, Exchanger<Object>> commandExchangers = new ConcurrentHashMap<>();
/**
* 异步发送命令返回命令id
*
* @param device
@ -45,11 +42,11 @@ public class DeviceMqttService {
* @param payload
* @return
*/
@Override
public String asyncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload) {
return asyncSendCommand(device, command, payload, Boolean.FALSE);
}
/**
* 异步发送命令返回命令id
*
@ -81,7 +78,6 @@ public class DeviceMqttService {
return commandId;
}
/**
* 同步发送命令并返回响应结果
*
@ -106,16 +102,14 @@ public class DeviceMqttService {
public DeviceCommandResponseDTO syncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload, boolean retained) {
// 构建并发送命令
String commandId = asyncSendCommand(device, command, payload, retained);
// 等待返回结果 超时3秒(可控)
Object receiver = Chan.getInstance(commandId, true).get(commandId, 3 * 1000L);
// 等待返回结果
Object receiver = CommandResponseChan.getInstance(commandId, true).get(commandId);
if (receiver == null) {
log.error("Failed to receive the message. {}", device.getName());
throw new ServerException(StrUtil.format("{}设备没有回复", device.getName()));
}
return (DeviceCommandResponseDTO) receiver;
}
/**
* 发送命令并返回响应结果模拟设备响应
*
@ -146,50 +140,18 @@ public class DeviceMqttService {
}
}).start();
// 等待设备响应
return waitCommandResponse(command, commandId);
}
/**
* 订阅设备命令响应主题并等待获取返回结果
*
* @param command
* @param commandId
* @return
*/
private DeviceCommandResponseDTO waitCommandResponse(DeviceCommandEnum command, String commandId) {
// 创建命令响应等待exchanger
Exchanger<Object> commandExchanger = new Exchanger<>();
commandExchangers.put(commandId, commandExchanger);
try {
Object result = commandExchanger.exchange("", 10, TimeUnit.SECONDS);
return (DeviceCommandResponseDTO) result;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ServerException(StrUtil.format("{} <{}>,{} 命令中断",
DeviceServiceEnum.COMMAND_ID.getValue(), commandId, command.getTitle()), e);
} catch (TimeoutException e) {
throw new ServerException(StrUtil.format("{} <{}>,{} 命令超时",
DeviceServiceEnum.COMMAND_ID.getValue(), commandId, command.getTitle()), e);
} finally {
// 移除命令响应等待exchanger
commandExchangers.remove(commandId);
// 等待返回结果
Object receiver = CommandResponseChan.getInstance(commandId, true).get(commandId);
if (receiver == null) {
throw new ServerException(StrUtil.format("{}设备没有回复", device.getName()));
}
return (DeviceCommandResponseDTO) receiver;
}
/**
* 设备命令响应处理
*
* @param topic
* @param commandResponse
* 模拟设备属性上报
*/
public void commandReplied(String topic, DeviceCommandResponseDTO commandResponse) {
Chan chan = Chan.getInstance(commandResponse.getCommandId(), false);
chan.put(commandResponse);
}
@Override
public void simulateDeviceReportAttributeData(IotDeviceEntity device, String payload) {
// 封装 设备属性上报的 topic
String commandTopic = DeviceTopicEnum.PROPERTY.buildTopic(DeviceClientDTO.from(device));
@ -202,6 +164,14 @@ public class DeviceMqttService {
}
}
/**
* 模拟设备服务指令响应数据
*
* @param device
* @param payload
*/
@Override
public void simulateDeviceCommandResponseAttributeData(IotDeviceEntity device, String payload) {
// 封装 设备命令执行结果的 topic
String commandTopic = DeviceTopicEnum.COMMAND_RESPONSE.buildTopic(DeviceClientDTO.from(device));
@ -214,4 +184,15 @@ public class DeviceMqttService {
}
}
/**
* 设备命令响应处理把设备响应结果放入通道中
*
* @param commandResponse 设备命令响应
*/
public void commandReplied(DeviceCommandResponseDTO commandResponse) {
CommandResponseChan commandResponseChan = CommandResponseChan.getInstance(commandResponse.getCommandId(), false);
commandResponseChan.put(commandResponse);
}
}

View File

@ -0,0 +1,45 @@
package net.maku.iot.communication;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.maku.iot.entity.IotDeviceEntity;
import net.maku.iot.enums.DeviceCommandEnum;
import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO;
import org.springframework.stereotype.Component;
/**
* @Description TODO
* @Author LSF
* @Date 2024/8/9 14:21
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class TCPService implements BaseCommunication {
@Override
public String asyncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload) {
return "";
}
@Override
public DeviceCommandResponseDTO syncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload) {
return null;
}
@Override
public DeviceCommandResponseDTO syncSendCommandDebug(IotDeviceEntity device, DeviceCommandEnum command, String payload) {
return null;
}
@Override
public void simulateDeviceReportAttributeData(IotDeviceEntity device, String payload) {
return;
}
@Override
public void simulateDeviceCommandResponseAttributeData(IotDeviceEntity device, String payload) {
return;
}
}

View File

@ -1,7 +1,7 @@
package net.maku.iot.mqtt;
package net.maku.iot.communication.mqtt;
import jakarta.annotation.Resource;
import net.maku.iot.mqtt.config.MqttConfig;
import net.maku.iot.communication.mqtt.config.MqttConfig;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.integration.support.MessageBuilder;

View File

@ -1,11 +1,11 @@
package net.maku.iot.mqtt.config;
package net.maku.iot.communication.mqtt.config;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import net.maku.iot.communication.mqtt.factory.MqttMessageHandlerFactory;
import net.maku.iot.enums.DeviceTopicEnum;
import net.maku.iot.mqtt.factory.MqttMessageHandlerFactory;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;

View File

@ -1,4 +1,4 @@
package net.maku.iot.mqtt.dto;
package net.maku.iot.communication.mqtt.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;

View File

@ -0,0 +1,96 @@
package net.maku.iot.communication.mqtt.dto;
import lombok.extern.slf4j.Slf4j;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* 数据生产消费者通道
*/
@Slf4j
public class CommandResponseChan {
// 存储通道的 ConcurrentHashMap
private static final ConcurrentHashMap<String, CommandResponseChan> CHANNEL = new ConcurrentHashMap<>();
private final CompletableFuture<BaseCommandResponse> future = new CompletableFuture<>();
private final Long DEFAULT_WAIT_MILLISECONDS = 5 * 1000L;
// 私有构造函数不允许外部直接实例化
private CommandResponseChan() {
}
/**
* 获取或创建通道实例
*
* @param commandId 通道标识
* @param isNeedCreate 是否需要创建新的通道实例
* @return 通道实例
*/
public static CommandResponseChan getInstance(String commandId, boolean isNeedCreate) {
if (!isNeedCreate) {
return CHANNEL.get(commandId);
}
return CHANNEL.computeIfAbsent(commandId, k -> new CommandResponseChan());
}
/**
* 从通道中获取数据默认超时时间为 5
*
* @param commandId 通道标识
* @return 获取的数据如果超时返回 null
*/
public BaseCommandResponse get(String commandId) {
return get(commandId, DEFAULT_WAIT_MILLISECONDS);
}
/**
* 从通道中获取数据支持超时设置
*
* @param commandId 通道标识
* @param timeout 超时时间毫秒
* @return 获取的数据如果超时返回 null
*/
public BaseCommandResponse get(String commandId, long timeout) {
CommandResponseChan channel = CHANNEL.get(commandId);
if (Objects.isNull(channel)) {
return null;
}
try {
return channel.future.get(timeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
// 超时异常处理
log.error("Device response timeout. {}", commandId);
return null;
} catch (Exception e) {
// 其他异常处理
e.printStackTrace();
return null;
} finally {
// 确保在获取数据后移除通道
CHANNEL.remove(commandId, channel);
}
}
/**
* 向通道中放入数据并唤醒可能正在等待数据的线程
*
* @param response 要放入的数据
*/
public void put(BaseCommandResponse response) {
String commandId = response.getCommandId();
if (commandId == null) {
return;
}
CommandResponseChan channel = CHANNEL.get(commandId);
if (Objects.isNull(channel)) {
return;
}
channel.future.complete(response);
}
}

View File

@ -1,4 +1,4 @@
package net.maku.iot.mqtt.dto;
package net.maku.iot.communication.mqtt.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;

View File

@ -1,4 +1,4 @@
package net.maku.iot.mqtt.dto;
package net.maku.iot.communication.mqtt.dto;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import io.swagger.v3.oas.annotations.media.Schema;

View File

@ -1,4 +1,4 @@
package net.maku.iot.mqtt.dto;
package net.maku.iot.communication.mqtt.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;

View File

@ -1,7 +1,7 @@
package net.maku.iot.mqtt.factory;
package net.maku.iot.communication.mqtt.factory;
import lombok.RequiredArgsConstructor;
import net.maku.iot.mqtt.handler.DeviceCommandResponseHandler;
import net.maku.iot.communication.mqtt.handler.DeviceCommandResponseHandler;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

View File

@ -1,7 +1,7 @@
package net.maku.iot.mqtt.factory;
package net.maku.iot.communication.mqtt.factory;
import lombok.RequiredArgsConstructor;
import net.maku.iot.mqtt.handler.DevicePropertyChangeHandler;
import net.maku.iot.communication.mqtt.handler.DevicePropertyChangeHandler;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

View File

@ -1,7 +1,7 @@
package net.maku.iot.mqtt.factory;
package net.maku.iot.communication.mqtt.factory;
import lombok.RequiredArgsConstructor;
import net.maku.iot.mqtt.handler.MqttMessageHandler;
import net.maku.iot.communication.mqtt.handler.MqttMessageHandler;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

View File

@ -1,7 +1,7 @@
package net.maku.iot.mqtt.handler;
package net.maku.iot.communication.mqtt.handler;
import net.maku.iot.mqtt.dto.DeviceCommandResponseDTO;
import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO;
/**
* 设备命令响应处理器

View File

@ -1,13 +1,13 @@
package net.maku.iot.mqtt.handler;
package net.maku.iot.communication.mqtt.handler;
import cn.hutool.core.util.StrUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.maku.framework.common.utils.JsonUtils;
import net.maku.iot.communication.MQTTService;
import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO;
import net.maku.iot.communication.mqtt.factory.DeviceCommandResponseHandlerFactory;
import net.maku.iot.enums.DeviceTopicEnum;
import net.maku.iot.mqtt.dto.DeviceCommandResponseDTO;
import net.maku.iot.mqtt.factory.DeviceCommandResponseHandlerFactory;
import net.maku.iot.mqtt.service.DeviceMqttService;
import org.springframework.stereotype.Component;
import java.util.Optional;
@ -21,10 +21,10 @@ import java.util.Optional;
@Component
@RequiredArgsConstructor
public class DeviceCommandResponseMqttMessageHandler implements MqttMessageHandler {
private final DeviceCommandResponseHandlerFactory deviceCommandResponseHandlerFactory;
private final DeviceMqttService deviceMqttService;
private final MQTTService deviceMqttService;
@Override
public boolean supports(String topic) {
@ -43,7 +43,7 @@ public class DeviceCommandResponseMqttMessageHandler implements MqttMessageHandl
.ifPresent(responseDTO -> {
// 调用设备命令执行器的命令响应处理逻辑
try {
deviceMqttService.commandReplied(topic, responseDTO);
deviceMqttService.commandReplied( responseDTO);
} catch (Exception e) {
log.error(StrUtil.format("调用设备命令执行器响应处理方法出错topic:{}, message:{}", topic, message), e);
}

View File

@ -1,7 +1,7 @@
package net.maku.iot.mqtt.handler;
package net.maku.iot.communication.mqtt.handler;
import net.maku.iot.mqtt.dto.DevicePropertyDTO;
import net.maku.iot.communication.mqtt.dto.DevicePropertyDTO;
/**
* 设备属性变化处理器

View File

@ -1,12 +1,12 @@
package net.maku.iot.mqtt.handler;
package net.maku.iot.communication.mqtt.handler;
import cn.hutool.core.util.StrUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.maku.framework.common.utils.JsonUtils;
import net.maku.iot.communication.mqtt.dto.DevicePropertyDTO;
import net.maku.iot.communication.mqtt.factory.DevicePropertyChangeHandlerFactory;
import net.maku.iot.enums.DeviceTopicEnum;
import net.maku.iot.mqtt.dto.DevicePropertyDTO;
import net.maku.iot.mqtt.factory.DevicePropertyChangeHandlerFactory;
import org.springframework.stereotype.Component;
import java.util.Optional;

View File

@ -1,4 +1,4 @@
package net.maku.iot.mqtt.handler;
package net.maku.iot.communication.mqtt.handler;
/**
* MQTT订阅消息处理接口

View File

@ -33,6 +33,11 @@ public class IotDeviceEntity extends BaseEntity {
private Integer type;
/**
* 设备和服务器通信协议类型
*/
private String protocolType;
/**
* 唯一标识码
*/
private String uid;

View File

@ -0,0 +1,28 @@
package net.maku.iot.enums;
import cn.hutool.core.util.StrUtil;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import java.util.Arrays;
/**
* IOT常用的通信协议
*
* @author LSF maku_lsf@163.com
*/
@Getter
@RequiredArgsConstructor
public enum IOTProtocolEnum {
MQTT("MQTT"),
TCP("TCP"),
UDP("UDP"),
BLE("BLE"),
CoAP("CoAP"),
LwM2M("LwM2M"),
Modbus("Modbus");
private final String value;
}

View File

@ -1,74 +0,0 @@
package net.maku.iot.mqtt.dto;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* 数据生产消费者
*/
public class Chan {
// 存储通道的 ConcurrentHashMap
private static final ConcurrentHashMap<String, Chan> CHANNEL = new ConcurrentHashMap<>();
private final CompletableFuture<Object> future = new CompletableFuture<>();
// 私有构造函数不允许外部直接实例化
private Chan() {
}
/**
* 获取或创建通道实例
*
* @param commandId 通道标识
* @param isNeedCreate 是否需要创建新的通道实例
* @return 通道实例
*/
public static Chan getInstance(String commandId, boolean isNeedCreate) {
if (!isNeedCreate) {
return CHANNEL.get(commandId);
}
return CHANNEL.computeIfAbsent(commandId, k -> new Chan());
}
/**
* 从通道中获取数据支持超时设置
*
* @param commandId 通道标识
* @param timeout 超时时间毫秒
* @return 获取的数据如果超时返回 null
*/
public Object get(String commandId, long timeout) {
Chan chan = CHANNEL.get(commandId);
if (Objects.isNull(chan)) {
return null;
}
try {
return chan.future.get(timeout, TimeUnit.MILLISECONDS);
} catch (Exception e) {
return null;
} finally {
CHANNEL.remove(commandId, chan);
}
}
/**
* 向通道中放入数据并唤醒可能正在等待数据的线程
*
* @param response 要放入的数据
*/
public void put(BaseCommandResponse response) {
String commandId = response.getCommandId();
if (commandId == null) {
return;
}
Chan chan = CHANNEL.get(commandId);
if (Objects.isNull(chan)) {
return;
}
chan.future.complete(response);
}
}

View File

@ -3,8 +3,9 @@ package net.maku.iot.service;
import net.maku.framework.common.utils.PageResult;
import net.maku.framework.mybatis.service.BaseService;
import net.maku.iot.entity.IotDeviceEntity;
import net.maku.iot.mqtt.dto.DeviceCommandResponseDTO;
import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO;
import net.maku.iot.query.IotDeviceQuery;
import net.maku.iot.communication.BaseCommunication;
import net.maku.iot.vo.DeviceCommandResponseAttributeDataVO;
import net.maku.iot.vo.DeviceCommandVO;
import net.maku.iot.vo.DeviceReportAttributeDataVO;
@ -28,6 +29,27 @@ public interface IotDeviceService extends BaseService<IotDeviceEntity> {
void delete(List<Long> idList);
/**
* 根据设备的协议类型获取发送服务
* @param device 设备
* @return
*/
BaseCommunication getSendService(IotDeviceEntity device);
/**
* 根据协议类型获取发送服务
* @param protocolType
* @return
*/
BaseCommunication getSendService(String protocolType);
/**
* 根据设备ID获取发送服务
* @param deviceId
* @return
*/
BaseCommunication getSendService(Long deviceId);
/**
* 对设备下发指令-同步响应模式
*
* @param vo

View File

@ -16,12 +16,13 @@ import net.maku.iot.convert.IotDeviceConvert;
import net.maku.iot.dao.IotDeviceDao;
import net.maku.iot.entity.IotDeviceEntity;
import net.maku.iot.enums.*;
import net.maku.iot.mqtt.dto.DeviceCommandResponseDTO;
import net.maku.iot.mqtt.dto.DevicePropertyDTO;
import net.maku.iot.mqtt.handler.DeviceCommandResponseHandler;
import net.maku.iot.mqtt.handler.DevicePropertyChangeHandler;
import net.maku.iot.mqtt.service.DeviceMqttService;
import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO;
import net.maku.iot.communication.mqtt.dto.DevicePropertyDTO;
import net.maku.iot.communication.mqtt.handler.DeviceCommandResponseHandler;
import net.maku.iot.communication.mqtt.handler.DevicePropertyChangeHandler;
import net.maku.iot.query.IotDeviceQuery;
import net.maku.iot.communication.BaseCommunication;
import net.maku.iot.communication.CommunicationServiceFactory;
import net.maku.iot.service.IotDeviceEventLogService;
import net.maku.iot.service.IotDeviceService;
import net.maku.iot.vo.DeviceCommandResponseAttributeDataVO;
@ -35,7 +36,7 @@ import java.time.LocalDateTime;
import java.util.List;
/**
* 设备
* 设备服务类
*
* @author LSF maku_lsf@163.com
*/
@ -45,8 +46,7 @@ import java.util.List;
public class IotDeviceServiceImpl extends BaseServiceImpl<IotDeviceDao, IotDeviceEntity>
implements IotDeviceService, DevicePropertyChangeHandler, DeviceCommandResponseHandler {
//todo 后续版本更改为根据物模型自动选择不同的通信层Service
private final DeviceMqttService mqttService;
private final CommunicationServiceFactory communicationService;
private final IotDeviceEventLogService deviceEventLogService;
@Override
@ -85,6 +85,28 @@ public class IotDeviceServiceImpl extends BaseServiceImpl<IotDeviceDao, IotDevic
}
@Override
public BaseCommunication getSendService(IotDeviceEntity device) {
if (device != null) {
return getSendService(device.getProtocolType());
}
return null;
}
@Override
public BaseCommunication getSendService(String protocolType) {
return communicationService.getProtocol(protocolType);
}
@Override
public BaseCommunication getSendService(Long deviceId) {
IotDeviceEntity device = getById(deviceId);
if (device != null) {
return getSendService(device.getProtocolType());
}
return null;
}
@Override
@Transactional(rollbackFor = Exception.class)
public DeviceCommandResponseDTO syncSendCommand(DeviceCommandVO vo) {
@ -92,9 +114,8 @@ public class IotDeviceServiceImpl extends BaseServiceImpl<IotDeviceDao, IotDevic
Assert.notNull(device, "未注册的设备:{}", vo.getDeviceId());
DeviceCommandEnum commandEnum = DeviceCommandEnum.parse(vo.getCommand());
try {
return mqttService.syncSendCommand(getById(vo.getDeviceId()), commandEnum, vo.getPayload());
return getSendService(device).syncSendCommand(getById(vo.getDeviceId()), commandEnum, vo.getPayload());
} catch (ServerException e) {
if (DeviceCommandEnum.parse(vo.getCommand()).getEventType() != null
&& StrUtil.contains(e.getMessage(), DeviceServiceEnum.COMMAND_ID.getValue())) {
@ -112,23 +133,26 @@ public class IotDeviceServiceImpl extends BaseServiceImpl<IotDeviceDao, IotDevic
public DeviceCommandResponseDTO syncSendCommandDebug(DeviceCommandVO vo) {
IotDeviceEntity device = getById(vo.getDeviceId());
DeviceCommandEnum commandEnum = DeviceCommandEnum.parse(vo.getCommand());
return mqttService.syncSendCommandDebug(device, commandEnum, vo.getPayload());
return getSendService(device).syncSendCommandDebug(device, commandEnum, vo.getPayload());
}
@Override
@Transactional(rollbackFor = Exception.class)
public void asyncSendCommand(DeviceCommandVO vo) {
mqttService.asyncSendCommand(getById(vo.getDeviceId()), DeviceCommandEnum.parse(vo.getCommand()), vo.getPayload());
IotDeviceEntity device = getById(vo.getDeviceId());
getSendService(device).asyncSendCommand(device, DeviceCommandEnum.parse(vo.getCommand()), vo.getPayload());
}
@Override
public void simulateDeviceReportAttributeData(DeviceReportAttributeDataVO vo) {
mqttService.simulateDeviceReportAttributeData(getById(vo.getDeviceId()), JSONUtil.toJsonStr(vo));
IotDeviceEntity device = getById(vo.getDeviceId());
getSendService(device).simulateDeviceReportAttributeData(device, JSONUtil.toJsonStr(vo));
}
@Override
public void simulateDeviceCommandResponseAttributeData(DeviceCommandResponseAttributeDataVO vo) {
mqttService.simulateDeviceCommandResponseAttributeData(getById(vo.getDeviceId()), JSONUtil.toJsonStr(vo));
IotDeviceEntity device = getById(vo.getDeviceId());
getSendService(device).simulateDeviceCommandResponseAttributeData(device, JSONUtil.toJsonStr(vo));
}
/**

View File

@ -30,6 +30,9 @@ public class IotDeviceVO implements Serializable {
@Schema(description = "设备类型1.手持设备2.柜体3传感设备")
private Integer type;
@Schema(description = "协议类型")
private String protocolType;
@Schema(description = "唯一标识码")
private String uid;