diff --git a/db/mysql/module/maku-module-iot.sql b/db/mysql/module/maku-module-iot.sql index 7d307ed..8bfd5f0 100644 --- a/db/mysql/module/maku-module-iot.sql +++ b/db/mysql/module/maku-module-iot.sql @@ -11,6 +11,7 @@ CREATE TABLE iot_device ( temperature varchar(10) DEFAULT NULL COMMENT '温度', status tinyint NOT NULL DEFAULT '1' COMMENT '状态,0禁用,1启用', running_status int NOT NULL DEFAULT '0' COMMENT '运行状态,0.离线状态 1.在线状态 2.正常待机 3.用户使用中 4.OTA升级中', + protocol_type varchar(20) NOT NULL DEFAULT 'MQTT' COMMENT '协议类型', up_time datetime DEFAULT NULL COMMENT '上线时间', down_time datetime DEFAULT NULL COMMENT '下线时间', tenant_id bigint DEFAULT NULL COMMENT '租户ID', @@ -94,6 +95,17 @@ INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, r INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), '登出', 'SIGN_OFF', NULL, NULL, 3, NULL, 0, 1, 10000, now(), 10000, now()); INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), 'OTA升级', 'OTA_UPGRADE', NULL, NULL, 4, NULL, 0, 0, 10000, now(), 10000, now()); +INSERT INTO sys_dict_type (dict_type,dict_name,remark,sort,tenant_id,version,deleted,creator,create_time,updater,update_time )VALUES('device_protocol_type', '设备协议类型', '设备协议类型', 0, 10000, 0, 0, 10000, now(), 10000, now() ); +SET @typeId = @@identity; +INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), 'MQTT', 'MQTT', NULL, NULL, 0, NULL, 0, 0, 10000, now(), 10000, now()); +INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), 'TCP', 'TCP', NULL, NULL, 1, NULL, 0, 0, 10000, now(), 10000, now()); +INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), 'UDP', 'UDP', NULL, NULL, 2, NULL, 0, 1, 10000, now(), 10000, now()); +INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), 'BLE', 'BLE', NULL, NULL, 3, NULL, 0, 1, 10000, now(), 10000, now()); +INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), 'CoAP', 'CoAP', NULL, NULL, 4, NULL, 0, 0, 10000, now(), 10000, now()); +INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), 'LwM2M', 'LwM2M', NULL, NULL, 4, NULL, 0, 0, 10000, now(), 10000, now()); +INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), 'Modbus', 'Modbus', NULL, NULL, 4, NULL, 0, 0, 10000, now(), 10000, now()); + + INSERT INTO sys_dict_type (dict_type,dict_name,remark,sort,tenant_id,version,deleted,creator,create_time,updater,update_time )VALUES('device_property', '设备属性', '设备通用属性:运行状态|APP版本|电池电量百分比|温度', 0, 10000, 0, 0, 10000, now(), 10000, now() ); SET @typeId = @@identity; INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), '运行状态', 'RUNNING_STATUS', NULL, NULL, 0, NULL, 0, 0, 10000, now(), 10000, now()); diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/BaseCommunication.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/BaseCommunication.java new file mode 100644 index 0000000..ef40e9f --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/BaseCommunication.java @@ -0,0 +1,28 @@ +package net.maku.iot.communication; + +import net.maku.iot.entity.IotDeviceEntity; +import net.maku.iot.enums.DeviceCommandEnum; +import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO; + +/** + * 基础通信协议具备功能 + */ +public interface BaseCommunication { + + // 异步发送指令,不等待设备响应 + String asyncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload); + + //同步发送指定,等待设备响应 + DeviceCommandResponseDTO syncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload); + + //同步发送指定,等待设备响应,调试实现 + DeviceCommandResponseDTO syncSendCommandDebug(IotDeviceEntity device, DeviceCommandEnum command, String payload); + + //模拟设备属性上报 + void simulateDeviceReportAttributeData(IotDeviceEntity device, String payload); + + //模拟设备服务指令响应数据 + void simulateDeviceCommandResponseAttributeData(IotDeviceEntity device, String payload); + + +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/CommunicationServiceFactory.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/CommunicationServiceFactory.java new file mode 100644 index 0000000..c4535e1 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/CommunicationServiceFactory.java @@ -0,0 +1,36 @@ +package net.maku.iot.communication; + +import lombok.AllArgsConstructor; +import net.maku.framework.common.exception.ServerException; +import org.springframework.stereotype.Service; + +/** + * @Description TODO + * @Author LSF + * @Date 2024/8/9 14:53 + */ +@Service +@AllArgsConstructor +public class CommunicationServiceFactory { + + private final MQTTService mqttService; + private final TCPService tcpService; + + public BaseCommunication getProtocol(String protocolType) { + if (protocolType == null) { + new ServerException("协议不存在!"); + } + switch (protocolType) { + case "MQTT": + return mqttService; + case "TCP": + return tcpService; +// case "Modbus": +// return tcpService; + default: + return null; + } + } + + +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/service/DeviceMqttService.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/MQTTService.java similarity index 71% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/service/DeviceMqttService.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/MQTTService.java index 0538be0..462dc64 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/service/DeviceMqttService.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/MQTTService.java @@ -1,43 +1,40 @@ -package net.maku.iot.mqtt.service; +package net.maku.iot.communication; import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSONUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import net.maku.framework.common.exception.ServerException; +import net.maku.iot.communication.mqtt.MqttGateway; +import net.maku.iot.communication.mqtt.dto.CommandResponseChan; +import net.maku.iot.communication.mqtt.dto.DeviceCommandDTO; +import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO; import net.maku.iot.dto.DeviceClientDTO; import net.maku.iot.entity.IotDeviceEntity; import net.maku.iot.enums.DeviceCommandEnum; -import net.maku.iot.enums.DeviceServiceEnum; import net.maku.iot.enums.DeviceTopicEnum; -import net.maku.iot.mqtt.MqttGateway; -import net.maku.iot.mqtt.dto.Chan; -import net.maku.iot.mqtt.dto.DeviceCommandDTO; -import net.maku.iot.mqtt.dto.DeviceCommandResponseDTO; import net.maku.iot.service.IotDeviceServiceLogService; -import net.maku.iot.utils.MqttUtils; import org.springframework.stereotype.Component; import java.util.UUID; -import java.util.concurrent.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Exchanger; /** - * 设备命令发送服务 - **/ + * @Description TODO + * @Author LSF + * @Date 2024/8/9 14:21 + */ @Slf4j @Component @RequiredArgsConstructor -public class DeviceMqttService { - private final MqttUtils mqttUtils; +public class MQTTService implements BaseCommunication { + private final MqttGateway mqttGateway; private final IotDeviceServiceLogService iotDeviceEventLogService; /** - * 命令等待exchanger缓存,key: command id - */ - private final ConcurrentMap> commandExchangers = new ConcurrentHashMap<>(); - - /** * 异步发送命令,返回命令id * * @param device @@ -45,11 +42,11 @@ public class DeviceMqttService { * @param payload * @return */ + @Override public String asyncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload) { return asyncSendCommand(device, command, payload, Boolean.FALSE); } - /** * 异步发送命令,返回命令id * @@ -81,7 +78,6 @@ public class DeviceMqttService { return commandId; } - /** * 同步发送命令并返回响应结果 * @@ -106,16 +102,14 @@ public class DeviceMqttService { public DeviceCommandResponseDTO syncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload, boolean retained) { // 构建并发送命令 String commandId = asyncSendCommand(device, command, payload, retained); - // 等待返回结果 超时3秒(可控) - Object receiver = Chan.getInstance(commandId, true).get(commandId, 3 * 1000L); + // 等待返回结果 + Object receiver = CommandResponseChan.getInstance(commandId, true).get(commandId); if (receiver == null) { - log.error("Failed to receive the message. {}", device.getName()); throw new ServerException(StrUtil.format("{}设备没有回复", device.getName())); } return (DeviceCommandResponseDTO) receiver; } - /** * 发送命令并返回响应结果,模拟设备响应 * @@ -146,50 +140,18 @@ public class DeviceMqttService { } }).start(); - // 等待设备响应 - return waitCommandResponse(command, commandId); - } - - - /** - * 订阅设备命令响应主题并等待获取返回结果 - * - * @param command - * @param commandId - * @return - */ - private DeviceCommandResponseDTO waitCommandResponse(DeviceCommandEnum command, String commandId) { - // 创建命令响应等待exchanger - Exchanger commandExchanger = new Exchanger<>(); - commandExchangers.put(commandId, commandExchanger); - - try { - Object result = commandExchanger.exchange("", 10, TimeUnit.SECONDS); - return (DeviceCommandResponseDTO) result; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new ServerException(StrUtil.format("{} <{}>,{} 命令中断", - DeviceServiceEnum.COMMAND_ID.getValue(), commandId, command.getTitle()), e); - } catch (TimeoutException e) { - throw new ServerException(StrUtil.format("{} <{}>,{} 命令超时", - DeviceServiceEnum.COMMAND_ID.getValue(), commandId, command.getTitle()), e); - } finally { - // 移除命令响应等待exchanger - commandExchangers.remove(commandId); + // 等待返回结果 + Object receiver = CommandResponseChan.getInstance(commandId, true).get(commandId); + if (receiver == null) { + throw new ServerException(StrUtil.format("{}设备没有回复", device.getName())); } + return (DeviceCommandResponseDTO) receiver; } /** - * 设备命令响应处理 - * - * @param topic - * @param commandResponse + * 模拟设备属性上报 */ - public void commandReplied(String topic, DeviceCommandResponseDTO commandResponse) { - Chan chan = Chan.getInstance(commandResponse.getCommandId(), false); - chan.put(commandResponse); - } - + @Override public void simulateDeviceReportAttributeData(IotDeviceEntity device, String payload) { // 封装 设备属性上报的 topic String commandTopic = DeviceTopicEnum.PROPERTY.buildTopic(DeviceClientDTO.from(device)); @@ -202,6 +164,14 @@ public class DeviceMqttService { } } + + /** + * 模拟设备服务指令响应数据 + * + * @param device + * @param payload + */ + @Override public void simulateDeviceCommandResponseAttributeData(IotDeviceEntity device, String payload) { // 封装 设备命令执行结果的 topic String commandTopic = DeviceTopicEnum.COMMAND_RESPONSE.buildTopic(DeviceClientDTO.from(device)); @@ -214,4 +184,15 @@ public class DeviceMqttService { } } + /** + * 设备命令响应处理,把设备响应结果放入通道中 + * + * @param commandResponse 设备命令响应 + */ + public void commandReplied(DeviceCommandResponseDTO commandResponse) { + CommandResponseChan commandResponseChan = CommandResponseChan.getInstance(commandResponse.getCommandId(), false); + commandResponseChan.put(commandResponse); + } + + } diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/TCPService.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/TCPService.java new file mode 100644 index 0000000..8ee0d71 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/TCPService.java @@ -0,0 +1,45 @@ +package net.maku.iot.communication; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import net.maku.iot.entity.IotDeviceEntity; +import net.maku.iot.enums.DeviceCommandEnum; +import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO; +import org.springframework.stereotype.Component; + +/** + * @Description TODO + * @Author LSF + * @Date 2024/8/9 14:21 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class TCPService implements BaseCommunication { + + + @Override + public String asyncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload) { + return ""; + } + + @Override + public DeviceCommandResponseDTO syncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload) { + return null; + } + + @Override + public DeviceCommandResponseDTO syncSendCommandDebug(IotDeviceEntity device, DeviceCommandEnum command, String payload) { + return null; + } + + @Override + public void simulateDeviceReportAttributeData(IotDeviceEntity device, String payload) { + return; + } + + @Override + public void simulateDeviceCommandResponseAttributeData(IotDeviceEntity device, String payload) { + return; + } +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/MqttGateway.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/MqttGateway.java similarity index 94% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/MqttGateway.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/MqttGateway.java index d1d2feb..e1a3fc4 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/MqttGateway.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/MqttGateway.java @@ -1,7 +1,7 @@ -package net.maku.iot.mqtt; +package net.maku.iot.communication.mqtt; import jakarta.annotation.Resource; -import net.maku.iot.mqtt.config.MqttConfig; +import net.maku.iot.communication.mqtt.config.MqttConfig; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.integration.support.MessageBuilder; diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/config/MqttConfig.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/config/MqttConfig.java similarity index 97% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/config/MqttConfig.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/config/MqttConfig.java index ca3bf05..fea1398 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/config/MqttConfig.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/config/MqttConfig.java @@ -1,11 +1,11 @@ -package net.maku.iot.mqtt.config; +package net.maku.iot.communication.mqtt.config; import jakarta.annotation.PostConstruct; import jakarta.annotation.Resource; import lombok.Data; import lombok.extern.slf4j.Slf4j; +import net.maku.iot.communication.mqtt.factory.MqttMessageHandlerFactory; import net.maku.iot.enums.DeviceTopicEnum; -import net.maku.iot.mqtt.factory.MqttMessageHandlerFactory; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/BaseCommandResponse.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/BaseCommandResponse.java similarity index 86% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/BaseCommandResponse.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/BaseCommandResponse.java index 2b01d95..719fdd5 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/BaseCommandResponse.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/BaseCommandResponse.java @@ -1,4 +1,4 @@ -package net.maku.iot.mqtt.dto; +package net.maku.iot.communication.mqtt.dto; import io.swagger.v3.oas.annotations.media.Schema; import lombok.Data; diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/CommandResponseChan.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/CommandResponseChan.java new file mode 100644 index 0000000..1896613 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/CommandResponseChan.java @@ -0,0 +1,96 @@ +package net.maku.iot.communication.mqtt.dto; + +import lombok.extern.slf4j.Slf4j; + +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * 数据生产消费者通道 + */ +@Slf4j +public class CommandResponseChan { + + // 存储通道的 ConcurrentHashMap + private static final ConcurrentHashMap CHANNEL = new ConcurrentHashMap<>(); + + private final CompletableFuture future = new CompletableFuture<>(); + + private final Long DEFAULT_WAIT_MILLISECONDS = 5 * 1000L; + + // 私有构造函数,不允许外部直接实例化 + private CommandResponseChan() { + } + + /** + * 获取或创建通道实例 + * + * @param commandId 通道标识 + * @param isNeedCreate 是否需要创建新的通道实例 + * @return 通道实例 + */ + public static CommandResponseChan getInstance(String commandId, boolean isNeedCreate) { + if (!isNeedCreate) { + return CHANNEL.get(commandId); + } + return CHANNEL.computeIfAbsent(commandId, k -> new CommandResponseChan()); + } + + /** + * 从通道中获取数据,默认超时时间为 5 秒 + * + * @param commandId 通道标识 + * @return 获取的数据,如果超时返回 null + */ + public BaseCommandResponse get(String commandId) { + return get(commandId, DEFAULT_WAIT_MILLISECONDS); + } + + /** + * 从通道中获取数据,支持超时设置 + * + * @param commandId 通道标识 + * @param timeout 超时时间(毫秒) + * @return 获取的数据,如果超时返回 null + */ + public BaseCommandResponse get(String commandId, long timeout) { + CommandResponseChan channel = CHANNEL.get(commandId); + if (Objects.isNull(channel)) { + return null; + } + try { + return channel.future.get(timeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + // 超时异常处理 + log.error("Device response timeout. {}", commandId); + return null; + } catch (Exception e) { + // 其他异常处理 + e.printStackTrace(); + return null; + } finally { + // 确保在获取数据后移除通道 + CHANNEL.remove(commandId, channel); + } + } + + /** + * 向通道中放入数据,并唤醒可能正在等待数据的线程 + * + * @param response 要放入的数据 + */ + public void put(BaseCommandResponse response) { + String commandId = response.getCommandId(); + if (commandId == null) { + return; + } + CommandResponseChan channel = CHANNEL.get(commandId); + if (Objects.isNull(channel)) { + return; + } + channel.future.complete(response); + } +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/DeviceCommandDTO.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DeviceCommandDTO.java similarity index 93% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/DeviceCommandDTO.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DeviceCommandDTO.java index e304a29..e8154de 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/DeviceCommandDTO.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DeviceCommandDTO.java @@ -1,4 +1,4 @@ -package net.maku.iot.mqtt.dto; +package net.maku.iot.communication.mqtt.dto; import io.swagger.v3.oas.annotations.media.Schema; import lombok.Data; diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/DeviceCommandResponseDTO.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DeviceCommandResponseDTO.java similarity index 96% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/DeviceCommandResponseDTO.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DeviceCommandResponseDTO.java index e4f6133..eb762f9 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/DeviceCommandResponseDTO.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DeviceCommandResponseDTO.java @@ -1,4 +1,4 @@ -package net.maku.iot.mqtt.dto; +package net.maku.iot.communication.mqtt.dto; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import io.swagger.v3.oas.annotations.media.Schema; diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/DevicePropertyDTO.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DevicePropertyDTO.java similarity index 92% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/DevicePropertyDTO.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DevicePropertyDTO.java index d75b329..fed63de 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/DevicePropertyDTO.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DevicePropertyDTO.java @@ -1,4 +1,4 @@ -package net.maku.iot.mqtt.dto; +package net.maku.iot.communication.mqtt.dto; import io.swagger.v3.oas.annotations.media.Schema; import lombok.Data; diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/factory/DeviceCommandResponseHandlerFactory.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/factory/DeviceCommandResponseHandlerFactory.java similarity index 89% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/factory/DeviceCommandResponseHandlerFactory.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/factory/DeviceCommandResponseHandlerFactory.java index 742584d..61322d0 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/factory/DeviceCommandResponseHandlerFactory.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/factory/DeviceCommandResponseHandlerFactory.java @@ -1,7 +1,7 @@ -package net.maku.iot.mqtt.factory; +package net.maku.iot.communication.mqtt.factory; import lombok.RequiredArgsConstructor; -import net.maku.iot.mqtt.handler.DeviceCommandResponseHandler; +import net.maku.iot.communication.mqtt.handler.DeviceCommandResponseHandler; import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/factory/DevicePropertyChangeHandlerFactory.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/factory/DevicePropertyChangeHandlerFactory.java similarity index 89% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/factory/DevicePropertyChangeHandlerFactory.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/factory/DevicePropertyChangeHandlerFactory.java index e0d6bb4..b84218c 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/factory/DevicePropertyChangeHandlerFactory.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/factory/DevicePropertyChangeHandlerFactory.java @@ -1,7 +1,7 @@ -package net.maku.iot.mqtt.factory; +package net.maku.iot.communication.mqtt.factory; import lombok.RequiredArgsConstructor; -import net.maku.iot.mqtt.handler.DevicePropertyChangeHandler; +import net.maku.iot.communication.mqtt.handler.DevicePropertyChangeHandler; import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/factory/MqttMessageHandlerFactory.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/factory/MqttMessageHandlerFactory.java similarity index 91% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/factory/MqttMessageHandlerFactory.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/factory/MqttMessageHandlerFactory.java index 368cf54..d672227 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/factory/MqttMessageHandlerFactory.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/factory/MqttMessageHandlerFactory.java @@ -1,7 +1,7 @@ -package net.maku.iot.mqtt.factory; +package net.maku.iot.communication.mqtt.factory; import lombok.RequiredArgsConstructor; -import net.maku.iot.mqtt.handler.MqttMessageHandler; +import net.maku.iot.communication.mqtt.handler.MqttMessageHandler; import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DeviceCommandResponseHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseHandler.java similarity index 72% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DeviceCommandResponseHandler.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseHandler.java index 2c04a66..3af2705 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DeviceCommandResponseHandler.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseHandler.java @@ -1,7 +1,7 @@ -package net.maku.iot.mqtt.handler; +package net.maku.iot.communication.mqtt.handler; -import net.maku.iot.mqtt.dto.DeviceCommandResponseDTO; +import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO; /** * 设备命令响应处理器 diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DeviceCommandResponseMqttMessageHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseMqttMessageHandler.java similarity index 88% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DeviceCommandResponseMqttMessageHandler.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseMqttMessageHandler.java index 551a16b..5389920 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DeviceCommandResponseMqttMessageHandler.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseMqttMessageHandler.java @@ -1,13 +1,13 @@ -package net.maku.iot.mqtt.handler; +package net.maku.iot.communication.mqtt.handler; import cn.hutool.core.util.StrUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import net.maku.framework.common.utils.JsonUtils; +import net.maku.iot.communication.MQTTService; +import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO; +import net.maku.iot.communication.mqtt.factory.DeviceCommandResponseHandlerFactory; import net.maku.iot.enums.DeviceTopicEnum; -import net.maku.iot.mqtt.dto.DeviceCommandResponseDTO; -import net.maku.iot.mqtt.factory.DeviceCommandResponseHandlerFactory; -import net.maku.iot.mqtt.service.DeviceMqttService; import org.springframework.stereotype.Component; import java.util.Optional; @@ -21,10 +21,10 @@ import java.util.Optional; @Component @RequiredArgsConstructor public class DeviceCommandResponseMqttMessageHandler implements MqttMessageHandler { + private final DeviceCommandResponseHandlerFactory deviceCommandResponseHandlerFactory; - - private final DeviceMqttService deviceMqttService; + private final MQTTService deviceMqttService; @Override public boolean supports(String topic) { @@ -43,7 +43,7 @@ public class DeviceCommandResponseMqttMessageHandler implements MqttMessageHandl .ifPresent(responseDTO -> { // 调用设备命令执行器的命令响应处理逻辑 try { - deviceMqttService.commandReplied(topic, responseDTO); + deviceMqttService.commandReplied( responseDTO); } catch (Exception e) { log.error(StrUtil.format("调用设备命令执行器响应处理方法出错,topic:{}, message:{}", topic, message), e); } diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DevicePropertyChangeHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyChangeHandler.java similarity index 72% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DevicePropertyChangeHandler.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyChangeHandler.java index 739f266..3822fe7 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DevicePropertyChangeHandler.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyChangeHandler.java @@ -1,7 +1,7 @@ -package net.maku.iot.mqtt.handler; +package net.maku.iot.communication.mqtt.handler; -import net.maku.iot.mqtt.dto.DevicePropertyDTO; +import net.maku.iot.communication.mqtt.dto.DevicePropertyDTO; /** * 设备属性变化处理器 diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DevicePropertyMqttMessageHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyMqttMessageHandler.java similarity index 88% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DevicePropertyMqttMessageHandler.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyMqttMessageHandler.java index f47e205..f8416f2 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DevicePropertyMqttMessageHandler.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyMqttMessageHandler.java @@ -1,12 +1,12 @@ -package net.maku.iot.mqtt.handler; +package net.maku.iot.communication.mqtt.handler; import cn.hutool.core.util.StrUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import net.maku.framework.common.utils.JsonUtils; +import net.maku.iot.communication.mqtt.dto.DevicePropertyDTO; +import net.maku.iot.communication.mqtt.factory.DevicePropertyChangeHandlerFactory; import net.maku.iot.enums.DeviceTopicEnum; -import net.maku.iot.mqtt.dto.DevicePropertyDTO; -import net.maku.iot.mqtt.factory.DevicePropertyChangeHandlerFactory; import org.springframework.stereotype.Component; import java.util.Optional; diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/MqttMessageHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/MqttMessageHandler.java similarity index 88% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/MqttMessageHandler.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/MqttMessageHandler.java index 33307ea..a16367a 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/MqttMessageHandler.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/MqttMessageHandler.java @@ -1,4 +1,4 @@ -package net.maku.iot.mqtt.handler; +package net.maku.iot.communication.mqtt.handler; /** * MQTT订阅消息处理接口 diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/entity/IotDeviceEntity.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/entity/IotDeviceEntity.java index 9f03409..2632c91 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/entity/IotDeviceEntity.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/entity/IotDeviceEntity.java @@ -33,6 +33,11 @@ public class IotDeviceEntity extends BaseEntity { private Integer type; /** + * 设备和服务器通信协议类型 + */ + private String protocolType; + + /** * 唯一标识码 */ private String uid; diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/enums/IOTProtocolEnum.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/enums/IOTProtocolEnum.java new file mode 100644 index 0000000..2e15eee --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/enums/IOTProtocolEnum.java @@ -0,0 +1,28 @@ +package net.maku.iot.enums; + +import cn.hutool.core.util.StrUtil; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +import java.util.Arrays; + +/** + * IOT常用的通信协议 + * + * @author LSF maku_lsf@163.com + */ +@Getter +@RequiredArgsConstructor +public enum IOTProtocolEnum { + + MQTT("MQTT"), + TCP("TCP"), + UDP("UDP"), + BLE("BLE"), + CoAP("CoAP"), + LwM2M("LwM2M"), + Modbus("Modbus"); + + private final String value; +} + diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/Chan.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/Chan.java deleted file mode 100644 index 7936965..0000000 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/Chan.java +++ /dev/null @@ -1,74 +0,0 @@ -package net.maku.iot.mqtt.dto; - - -import java.util.Objects; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; - -/** - * 数据生产消费者 - */ -public class Chan { - - // 存储通道的 ConcurrentHashMap - private static final ConcurrentHashMap CHANNEL = new ConcurrentHashMap<>(); - - private final CompletableFuture future = new CompletableFuture<>(); - - // 私有构造函数,不允许外部直接实例化 - private Chan() { - } - - /** - * 获取或创建通道实例 - * - * @param commandId 通道标识 - * @param isNeedCreate 是否需要创建新的通道实例 - * @return 通道实例 - */ - public static Chan getInstance(String commandId, boolean isNeedCreate) { - if (!isNeedCreate) { - return CHANNEL.get(commandId); - } - return CHANNEL.computeIfAbsent(commandId, k -> new Chan()); - } - - /** - * 从通道中获取数据,支持超时设置 - * - * @param commandId 通道标识 - * @param timeout 超时时间(毫秒) - * @return 获取的数据,如果超时返回 null - */ - public Object get(String commandId, long timeout) { - Chan chan = CHANNEL.get(commandId); - if (Objects.isNull(chan)) { - return null; - } - try { - return chan.future.get(timeout, TimeUnit.MILLISECONDS); - } catch (Exception e) { - return null; - } finally { - CHANNEL.remove(commandId, chan); - } - } - - /** - * 向通道中放入数据,并唤醒可能正在等待数据的线程 - * - * @param response 要放入的数据 - */ - public void put(BaseCommandResponse response) { - String commandId = response.getCommandId(); - if (commandId == null) { - return; - } - Chan chan = CHANNEL.get(commandId); - if (Objects.isNull(chan)) { - return; - } - chan.future.complete(response); - } -} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/IotDeviceService.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/IotDeviceService.java index eafbde4..6588370 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/IotDeviceService.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/IotDeviceService.java @@ -3,8 +3,9 @@ package net.maku.iot.service; import net.maku.framework.common.utils.PageResult; import net.maku.framework.mybatis.service.BaseService; import net.maku.iot.entity.IotDeviceEntity; -import net.maku.iot.mqtt.dto.DeviceCommandResponseDTO; +import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO; import net.maku.iot.query.IotDeviceQuery; +import net.maku.iot.communication.BaseCommunication; import net.maku.iot.vo.DeviceCommandResponseAttributeDataVO; import net.maku.iot.vo.DeviceCommandVO; import net.maku.iot.vo.DeviceReportAttributeDataVO; @@ -28,6 +29,27 @@ public interface IotDeviceService extends BaseService { void delete(List idList); /** + * 根据设备的协议类型获取发送服务 + * @param device 设备 + * @return + */ + BaseCommunication getSendService(IotDeviceEntity device); + + /** + * 根据协议类型获取发送服务 + * @param protocolType + * @return + */ + BaseCommunication getSendService(String protocolType); + + /** + * 根据设备ID获取发送服务 + * @param deviceId + * @return + */ + BaseCommunication getSendService(Long deviceId); + + /** * 对设备下发指令-同步响应模式 * * @param vo diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/impl/IotDeviceServiceImpl.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/impl/IotDeviceServiceImpl.java index d7bebfa..50fd92e 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/impl/IotDeviceServiceImpl.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/impl/IotDeviceServiceImpl.java @@ -16,12 +16,13 @@ import net.maku.iot.convert.IotDeviceConvert; import net.maku.iot.dao.IotDeviceDao; import net.maku.iot.entity.IotDeviceEntity; import net.maku.iot.enums.*; -import net.maku.iot.mqtt.dto.DeviceCommandResponseDTO; -import net.maku.iot.mqtt.dto.DevicePropertyDTO; -import net.maku.iot.mqtt.handler.DeviceCommandResponseHandler; -import net.maku.iot.mqtt.handler.DevicePropertyChangeHandler; -import net.maku.iot.mqtt.service.DeviceMqttService; +import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO; +import net.maku.iot.communication.mqtt.dto.DevicePropertyDTO; +import net.maku.iot.communication.mqtt.handler.DeviceCommandResponseHandler; +import net.maku.iot.communication.mqtt.handler.DevicePropertyChangeHandler; import net.maku.iot.query.IotDeviceQuery; +import net.maku.iot.communication.BaseCommunication; +import net.maku.iot.communication.CommunicationServiceFactory; import net.maku.iot.service.IotDeviceEventLogService; import net.maku.iot.service.IotDeviceService; import net.maku.iot.vo.DeviceCommandResponseAttributeDataVO; @@ -35,7 +36,7 @@ import java.time.LocalDateTime; import java.util.List; /** - * 设备表 + * 设备服务类 * * @author LSF maku_lsf@163.com */ @@ -45,8 +46,7 @@ import java.util.List; public class IotDeviceServiceImpl extends BaseServiceImpl implements IotDeviceService, DevicePropertyChangeHandler, DeviceCommandResponseHandler { - //todo 后续版本更改为根据物模型自动选择不同的通信层Service - private final DeviceMqttService mqttService; + private final CommunicationServiceFactory communicationService; private final IotDeviceEventLogService deviceEventLogService; @Override @@ -85,6 +85,28 @@ public class IotDeviceServiceImpl extends BaseServiceImpl