From 73ecc0bf4fddf3c80dc97f6280e173bb7471913c Mon Sep 17 00:00:00 2001 From: LSF <695944503@qq.com> Date: Wed, 14 Aug 2024 10:08:01 +0800 Subject: [PATCH] =?UTF-8?q?add:=E6=8F=90=E4=BE=9B=E5=A4=9A=E9=80=9A?= =?UTF-8?q?=E4=BF=A1=E5=8D=8F=E8=AE=AE=E8=83=BD=E5=8A=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- db/mysql/module/maku-module-iot.sql | 12 ++ .../maku/iot/communication/BaseCommunication.java | 28 +++ .../communication/CommunicationServiceFactory.java | 36 ++++ .../net/maku/iot/communication/MQTTService.java | 198 +++++++++++++++++++ .../net/maku/iot/communication/TCPService.java | 45 +++++ .../maku/iot/communication/mqtt/MqttGateway.java | 37 ++++ .../iot/communication/mqtt/config/MqttConfig.java | 159 +++++++++++++++ .../mqtt/dto/BaseCommandResponse.java | 18 ++ .../mqtt/dto/CommandResponseChan.java | 96 +++++++++ .../communication/mqtt/dto/DeviceCommandDTO.java | 32 +++ .../mqtt/dto/DeviceCommandResponseDTO.java | 41 ++++ .../communication/mqtt/dto/DevicePropertyDTO.java | 26 +++ .../DeviceCommandResponseHandlerFactory.java | 41 ++++ .../DevicePropertyChangeHandlerFactory.java | 41 ++++ .../mqtt/factory/MqttMessageHandlerFactory.java | 47 +++++ .../mqtt/handler/DeviceCommandResponseHandler.java | 19 ++ .../DeviceCommandResponseMqttMessageHandler.java | 73 +++++++ .../mqtt/handler/DevicePropertyChangeHandler.java | 19 ++ .../handler/DevicePropertyMqttMessageHandler.java | 49 +++++ .../mqtt/handler/MqttMessageHandler.java | 24 +++ .../java/net/maku/iot/entity/IotDeviceEntity.java | 5 + .../java/net/maku/iot/enums/IOTProtocolEnum.java | 28 +++ .../main/java/net/maku/iot/mqtt/MqttGateway.java | 37 ---- .../java/net/maku/iot/mqtt/config/MqttConfig.java | 159 --------------- .../net/maku/iot/mqtt/dto/BaseCommandResponse.java | 18 -- .../src/main/java/net/maku/iot/mqtt/dto/Chan.java | 74 ------- .../net/maku/iot/mqtt/dto/DeviceCommandDTO.java | 32 --- .../iot/mqtt/dto/DeviceCommandResponseDTO.java | 41 ---- .../net/maku/iot/mqtt/dto/DevicePropertyDTO.java | 26 --- .../DeviceCommandResponseHandlerFactory.java | 41 ---- .../DevicePropertyChangeHandlerFactory.java | 41 ---- .../mqtt/factory/MqttMessageHandlerFactory.java | 47 ----- .../mqtt/handler/DeviceCommandResponseHandler.java | 19 -- .../DeviceCommandResponseMqttMessageHandler.java | 73 ------- .../mqtt/handler/DevicePropertyChangeHandler.java | 19 -- .../handler/DevicePropertyMqttMessageHandler.java | 49 ----- .../maku/iot/mqtt/handler/MqttMessageHandler.java | 24 --- .../maku/iot/mqtt/service/DeviceMqttService.java | 217 --------------------- .../net/maku/iot/service/IotDeviceService.java | 24 ++- .../iot/service/impl/IotDeviceServiceImpl.java | 52 +++-- .../src/main/java/net/maku/iot/vo/IotDeviceVO.java | 3 + 41 files changed, 1138 insertions(+), 932 deletions(-) create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/BaseCommunication.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/CommunicationServiceFactory.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/MQTTService.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/TCPService.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/MqttGateway.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/config/MqttConfig.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/BaseCommandResponse.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/CommandResponseChan.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DeviceCommandDTO.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DeviceCommandResponseDTO.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DevicePropertyDTO.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/factory/DeviceCommandResponseHandlerFactory.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/factory/DevicePropertyChangeHandlerFactory.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/factory/MqttMessageHandlerFactory.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseHandler.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseMqttMessageHandler.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyChangeHandler.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyMqttMessageHandler.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/MqttMessageHandler.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/enums/IOTProtocolEnum.java delete mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/MqttGateway.java delete mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/config/MqttConfig.java delete mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/BaseCommandResponse.java delete mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/Chan.java delete mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/DeviceCommandDTO.java delete mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/DeviceCommandResponseDTO.java delete mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/DevicePropertyDTO.java delete mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/factory/DeviceCommandResponseHandlerFactory.java delete mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/factory/DevicePropertyChangeHandlerFactory.java delete mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/factory/MqttMessageHandlerFactory.java delete mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DeviceCommandResponseHandler.java delete mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DeviceCommandResponseMqttMessageHandler.java delete mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DevicePropertyChangeHandler.java delete mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DevicePropertyMqttMessageHandler.java delete mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/MqttMessageHandler.java delete mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/service/DeviceMqttService.java diff --git a/db/mysql/module/maku-module-iot.sql b/db/mysql/module/maku-module-iot.sql index 7d307ed..8bfd5f0 100644 --- a/db/mysql/module/maku-module-iot.sql +++ b/db/mysql/module/maku-module-iot.sql @@ -11,6 +11,7 @@ CREATE TABLE iot_device ( temperature varchar(10) DEFAULT NULL COMMENT '温度', status tinyint NOT NULL DEFAULT '1' COMMENT '状态,0禁用,1启用', running_status int NOT NULL DEFAULT '0' COMMENT '运行状态,0.离线状态 1.在线状态 2.正常待机 3.用户使用中 4.OTA升级中', + protocol_type varchar(20) NOT NULL DEFAULT 'MQTT' COMMENT '协议类型', up_time datetime DEFAULT NULL COMMENT '上线时间', down_time datetime DEFAULT NULL COMMENT '下线时间', tenant_id bigint DEFAULT NULL COMMENT '租户ID', @@ -94,6 +95,17 @@ INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, r INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), '登出', 'SIGN_OFF', NULL, NULL, 3, NULL, 0, 1, 10000, now(), 10000, now()); INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), 'OTA升级', 'OTA_UPGRADE', NULL, NULL, 4, NULL, 0, 0, 10000, now(), 10000, now()); +INSERT INTO sys_dict_type (dict_type,dict_name,remark,sort,tenant_id,version,deleted,creator,create_time,updater,update_time )VALUES('device_protocol_type', '设备协议类型', '设备协议类型', 0, 10000, 0, 0, 10000, now(), 10000, now() ); +SET @typeId = @@identity; +INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), 'MQTT', 'MQTT', NULL, NULL, 0, NULL, 0, 0, 10000, now(), 10000, now()); +INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), 'TCP', 'TCP', NULL, NULL, 1, NULL, 0, 0, 10000, now(), 10000, now()); +INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), 'UDP', 'UDP', NULL, NULL, 2, NULL, 0, 1, 10000, now(), 10000, now()); +INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), 'BLE', 'BLE', NULL, NULL, 3, NULL, 0, 1, 10000, now(), 10000, now()); +INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), 'CoAP', 'CoAP', NULL, NULL, 4, NULL, 0, 0, 10000, now(), 10000, now()); +INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), 'LwM2M', 'LwM2M', NULL, NULL, 4, NULL, 0, 0, 10000, now(), 10000, now()); +INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), 'Modbus', 'Modbus', NULL, NULL, 4, NULL, 0, 0, 10000, now(), 10000, now()); + + INSERT INTO sys_dict_type (dict_type,dict_name,remark,sort,tenant_id,version,deleted,creator,create_time,updater,update_time )VALUES('device_property', '设备属性', '设备通用属性:运行状态|APP版本|电池电量百分比|温度', 0, 10000, 0, 0, 10000, now(), 10000, now() ); SET @typeId = @@identity; INSERT INTO sys_dict_data (dict_type_id, dict_label, dict_value, label_class, remark, sort, tenant_id, version, deleted, creator, create_time, updater, update_time) VALUES ((SELECT @typeId), '运行状态', 'RUNNING_STATUS', NULL, NULL, 0, NULL, 0, 0, 10000, now(), 10000, now()); diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/BaseCommunication.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/BaseCommunication.java new file mode 100644 index 0000000..ef40e9f --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/BaseCommunication.java @@ -0,0 +1,28 @@ +package net.maku.iot.communication; + +import net.maku.iot.entity.IotDeviceEntity; +import net.maku.iot.enums.DeviceCommandEnum; +import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO; + +/** + * 基础通信协议具备功能 + */ +public interface BaseCommunication { + + // 异步发送指令,不等待设备响应 + String asyncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload); + + //同步发送指定,等待设备响应 + DeviceCommandResponseDTO syncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload); + + //同步发送指定,等待设备响应,调试实现 + DeviceCommandResponseDTO syncSendCommandDebug(IotDeviceEntity device, DeviceCommandEnum command, String payload); + + //模拟设备属性上报 + void simulateDeviceReportAttributeData(IotDeviceEntity device, String payload); + + //模拟设备服务指令响应数据 + void simulateDeviceCommandResponseAttributeData(IotDeviceEntity device, String payload); + + +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/CommunicationServiceFactory.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/CommunicationServiceFactory.java new file mode 100644 index 0000000..c4535e1 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/CommunicationServiceFactory.java @@ -0,0 +1,36 @@ +package net.maku.iot.communication; + +import lombok.AllArgsConstructor; +import net.maku.framework.common.exception.ServerException; +import org.springframework.stereotype.Service; + +/** + * @Description TODO + * @Author LSF + * @Date 2024/8/9 14:53 + */ +@Service +@AllArgsConstructor +public class CommunicationServiceFactory { + + private final MQTTService mqttService; + private final TCPService tcpService; + + public BaseCommunication getProtocol(String protocolType) { + if (protocolType == null) { + new ServerException("协议不存在!"); + } + switch (protocolType) { + case "MQTT": + return mqttService; + case "TCP": + return tcpService; +// case "Modbus": +// return tcpService; + default: + return null; + } + } + + +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/MQTTService.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/MQTTService.java new file mode 100644 index 0000000..462dc64 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/MQTTService.java @@ -0,0 +1,198 @@ +package net.maku.iot.communication; + +import cn.hutool.core.util.StrUtil; +import cn.hutool.json.JSONUtil; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import net.maku.framework.common.exception.ServerException; +import net.maku.iot.communication.mqtt.MqttGateway; +import net.maku.iot.communication.mqtt.dto.CommandResponseChan; +import net.maku.iot.communication.mqtt.dto.DeviceCommandDTO; +import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO; +import net.maku.iot.dto.DeviceClientDTO; +import net.maku.iot.entity.IotDeviceEntity; +import net.maku.iot.enums.DeviceCommandEnum; +import net.maku.iot.enums.DeviceTopicEnum; +import net.maku.iot.service.IotDeviceServiceLogService; +import org.springframework.stereotype.Component; + +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Exchanger; + +/** + * @Description TODO + * @Author LSF + * @Date 2024/8/9 14:21 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class MQTTService implements BaseCommunication { + + private final MqttGateway mqttGateway; + private final IotDeviceServiceLogService iotDeviceEventLogService; + + /** + * 异步发送命令,返回命令id + * + * @param device + * @param command + * @param payload + * @return + */ + @Override + public String asyncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload) { + return asyncSendCommand(device, command, payload, Boolean.FALSE); + } + + /** + * 异步发送命令,返回命令id + * + * @param device + * @param command + * @param payload + * @param retained + * @return + */ + public String asyncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload, boolean retained) { + // 构建命令对象 + String commandId = StrUtil.replaceChars(UUID.randomUUID().toString(), "-", ""); + DeviceCommandDTO commandDTO = new DeviceCommandDTO(); + commandDTO.setCommand(command); + commandDTO.setId(commandId); + commandDTO.setPayload(payload); + String commandTopic = DeviceTopicEnum.COMMAND.buildTopic(DeviceClientDTO.from(device)); + + // 发送命令到设备命令主题 + try { + mqttGateway.sendToMqtt(commandTopic, retained, JSONUtil.toJsonStr(commandDTO)); + } catch (Exception e) { + log.error(e.getMessage()); + throw new ServerException(StrUtil.format("发送'{}'命令:{} 到设备:{}-{}, Topic:{} 失败", + command.getTitle(), commandId, device.getCode(), device.getName(), commandTopic)); + } + log.info("发送'{}'命令:{} 到设备:{}-{}, Topic:{} 成功", command.getTitle(), commandId, device.getCode(), device.getName(), commandTopic); + iotDeviceEventLogService.createAndSaveDeviceServiceLog(device.getId(), device.getTenantId(), command, commandId, payload); + return commandId; + } + + /** + * 同步发送命令并返回响应结果 + * + * @param device + * @param command + * @param payload + * @return + */ + public DeviceCommandResponseDTO syncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload) { + return syncSendCommand(device, command, payload, Boolean.FALSE); + } + + /** + * 发送命令并返回响应结果 + * + * @param device + * @param command + * @param payload + * @param retained + * @return + */ + public DeviceCommandResponseDTO syncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload, boolean retained) { + // 构建并发送命令 + String commandId = asyncSendCommand(device, command, payload, retained); + // 等待返回结果 + Object receiver = CommandResponseChan.getInstance(commandId, true).get(commandId); + if (receiver == null) { + throw new ServerException(StrUtil.format("{}设备没有回复", device.getName())); + } + return (DeviceCommandResponseDTO) receiver; + } + + /** + * 发送命令并返回响应结果,模拟设备响应 + * + * @param device + * @param command + * @param payload + * @return + */ + public DeviceCommandResponseDTO syncSendCommandDebug(IotDeviceEntity device, DeviceCommandEnum command, String payload) { + // 构建并发送命令 + String commandId = asyncSendCommand(device, command, payload); + + // 2秒后模拟设备响应 + new Thread(() -> { + try { + //模拟设备正常响应 + Thread.sleep(2000); + //模拟设备超时响应 + //Thread.sleep(15000); + DeviceCommandResponseDTO simulateResponseDto = new DeviceCommandResponseDTO(); + simulateResponseDto.setCommandId(commandId); + simulateResponseDto.setResponsePayload(command.getTitle() + ",设备执行成功!"); + simulateResponseDto.setCommand(command); + simulateDeviceCommandResponseAttributeData(device, JSONUtil.toJsonStr(simulateResponseDto)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("模拟设备响应线程被中断", e); + } + }).start(); + + // 等待返回结果 + Object receiver = CommandResponseChan.getInstance(commandId, true).get(commandId); + if (receiver == null) { + throw new ServerException(StrUtil.format("{}设备没有回复", device.getName())); + } + return (DeviceCommandResponseDTO) receiver; + } + + /** + * 模拟设备属性上报 + */ + @Override + public void simulateDeviceReportAttributeData(IotDeviceEntity device, String payload) { + // 封装 设备属性上报的 topic + String commandTopic = DeviceTopicEnum.PROPERTY.buildTopic(DeviceClientDTO.from(device)); + try { + mqttGateway.sendToMqtt(commandTopic, payload); + } catch (Exception e) { + log.error(e.getMessage()); + throw new ServerException(StrUtil.format("模拟设备:{}-{},模拟属性上报失败! Topic:{} ", + device.getCode(), device.getName(), commandTopic)); + } + } + + + /** + * 模拟设备服务指令响应数据 + * + * @param device + * @param payload + */ + @Override + public void simulateDeviceCommandResponseAttributeData(IotDeviceEntity device, String payload) { + // 封装 设备命令执行结果的 topic + String commandTopic = DeviceTopicEnum.COMMAND_RESPONSE.buildTopic(DeviceClientDTO.from(device)); + try { + mqttGateway.sendToMqtt(commandTopic, payload); + } catch (Exception e) { + log.error(e.getMessage()); + throw new ServerException(StrUtil.format("模拟设备:{}-{},模拟发送命令执行结果失败! Topic:{} ", + device.getCode(), device.getName(), commandTopic)); + } + } + + /** + * 设备命令响应处理,把设备响应结果放入通道中 + * + * @param commandResponse 设备命令响应 + */ + public void commandReplied(DeviceCommandResponseDTO commandResponse) { + CommandResponseChan commandResponseChan = CommandResponseChan.getInstance(commandResponse.getCommandId(), false); + commandResponseChan.put(commandResponse); + } + + +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/TCPService.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/TCPService.java new file mode 100644 index 0000000..8ee0d71 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/TCPService.java @@ -0,0 +1,45 @@ +package net.maku.iot.communication; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import net.maku.iot.entity.IotDeviceEntity; +import net.maku.iot.enums.DeviceCommandEnum; +import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO; +import org.springframework.stereotype.Component; + +/** + * @Description TODO + * @Author LSF + * @Date 2024/8/9 14:21 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class TCPService implements BaseCommunication { + + + @Override + public String asyncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload) { + return ""; + } + + @Override + public DeviceCommandResponseDTO syncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload) { + return null; + } + + @Override + public DeviceCommandResponseDTO syncSendCommandDebug(IotDeviceEntity device, DeviceCommandEnum command, String payload) { + return null; + } + + @Override + public void simulateDeviceReportAttributeData(IotDeviceEntity device, String payload) { + return; + } + + @Override + public void simulateDeviceCommandResponseAttributeData(IotDeviceEntity device, String payload) { + return; + } +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/MqttGateway.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/MqttGateway.java new file mode 100644 index 0000000..e1a3fc4 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/MqttGateway.java @@ -0,0 +1,37 @@ +package net.maku.iot.communication.mqtt; + +import jakarta.annotation.Resource; +import net.maku.iot.communication.mqtt.config.MqttConfig; +import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Component; + +/** + * MQTT网关 + * + * @author LSF maku_lsf@163.com + */ +@Component +@MessagingGateway(defaultRequestChannel = MqttConfig.OUTBOUND_CHANNEL) +public class MqttGateway { + @Resource + private MqttConfig mqttConfig; + + public void sendToMqtt(String payload) { + mqttConfig.mqttOutboundHandler().handleMessage(MessageBuilder.withPayload(payload).build()); + } + + public void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload) { + mqttConfig.mqttOutboundHandler().handleMessage(MessageBuilder.withPayload(payload).setHeader(MqttHeaders.TOPIC, topic).build()); + } + + public void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload) { + mqttConfig.mqttOutboundHandler().handleMessage(MessageBuilder.withPayload(payload).setHeader(MqttHeaders.TOPIC, topic).setHeader(MqttHeaders.QOS, qos).build()); + } + + public void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.RETAINED) boolean retained, String payload) { + mqttConfig.mqttOutboundHandler().handleMessage(MessageBuilder.withPayload(payload).setHeader(MqttHeaders.TOPIC, topic).setHeader(MqttHeaders.RETAINED, retained).build()); + } +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/config/MqttConfig.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/config/MqttConfig.java new file mode 100644 index 0000000..fea1398 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/config/MqttConfig.java @@ -0,0 +1,159 @@ +package net.maku.iot.communication.mqtt.config; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.Resource; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import net.maku.iot.communication.mqtt.factory.MqttMessageHandlerFactory; +import net.maku.iot.enums.DeviceTopicEnum; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +/** + * MQTT 配置类,用于设置和管理 MQTT 连接和消息处理。 + * + * @author LSF maku_lsf@163.com + */ +@Data +@Slf4j +@Configuration +@IntegrationComponentScan +@ConfigurationProperties(prefix = "spring.mqtt") +public class MqttConfig { + public static final String OUTBOUND_CHANNEL = "mqttOutboundChannel"; + public static final String INPUT_CHANNEL = "mqttInputChannel"; + + // MQTT 用户名 + private String username; + + // MQTT 密码 + private String password; + + // MQTT 服务器 URL + private String host; + + // 客户端 ID + private String clientId; + + // 默认主题 + private String defaultTopic; + + // 处理 MQTT 消息的工厂 + @Resource + private MqttMessageHandlerFactory mqttMessageHandlerFactory; + + @PostConstruct + public void init() { + log.info("MQTT 主机: {} 客户端ID: {} 默认主题:{}", this.host, this.clientId, this.defaultTopic); + } + + /** + * 配置并返回一个 MqttPahoClientFactory 实例,用于创建 MQTT 客户端连接。 + * + * @return MqttPahoClientFactory + */ + @Bean + public MqttPahoClientFactory mqttClientFactory() { + // 设置连接选项,包括服务器 URI、用户名和密码。 + final MqttConnectOptions options = new MqttConnectOptions(); + options.setServerURIs(new String[]{host}); + options.setUserName(username); + options.setPassword(password.toCharArray()); + final DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); + factory.setConnectionOptions(options); + return factory; + } + + /** + * 创建一个用于发送 MQTT 消息的 MessageChannel。 + * + * @return MessageChannel + */ + @Bean(OUTBOUND_CHANNEL) + public MessageChannel mqttOutboundChannel() { + return new DirectChannel(); + } + + /** + * 配置用于发送 MQTT 消息的 MessageHandler。 + * + * @return MessageHandler + */ + @Bean + @ServiceActivator(inputChannel = OUTBOUND_CHANNEL) + public MessageHandler mqttOutboundHandler() { + // 使用 MqttPahoMessageHandler 创建一个新的 MQTT 客户端连接,用于发布消息。 + final MqttPahoMessageHandler handler = new MqttPahoMessageHandler(clientId + "_pub", mqttClientFactory()); + handler.setDefaultQos(1); + handler.setDefaultRetained(false); + handler.setDefaultTopic(defaultTopic); + handler.setAsync(true); + return handler; + } + + /** + * 创建用于接收 MQTT 消息的 MessageChannel。 + * + * @return MessageChannel + */ + @Bean + public MessageChannel mqttInputChannel() { + return new DirectChannel(); + } + + /** + * 配置 客户端,订阅的主题, + * PROPERTY:设备属性上报主题, + * COMMAND_RESPONSE:下发指令执行结果主题 + * + * @return MqttPahoMessageDrivenChannelAdapter + */ + @Bean + public MqttPahoMessageDrivenChannelAdapter mqttInboundAdapter() { + final MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( + clientId + "_sub", + mqttClientFactory(), DeviceTopicEnum.PROPERTY.getWildcard(), + DeviceTopicEnum.COMMAND_RESPONSE.getWildcard() + ); + adapter.setCompletionTimeout(15000); + adapter.setConverter(new DefaultPahoMessageConverter()); + adapter.setQos(1); + adapter.setOutputChannel(mqttInputChannel()); + return adapter; + } + + /** + * 通过通道获取数据并处理消息。 + * + * @return MessageHandler + */ + @Bean + @ServiceActivator(inputChannel = INPUT_CHANNEL) + public MessageHandler mqttMessageHandler() { + return message -> { + String topic = (String) message.getHeaders().get("mqtt_receivedTopic"); + if (topic != null) { + mqttMessageHandlerFactory.getHandlersForTopic(topic).forEach(handler -> { + if (log.isDebugEnabled()) { + log.debug("主题: {}, 消息内容: {}", topic, message.getPayload()); + } + handler.handle(topic, message.getPayload().toString()); + }); + } else { + log.warn("接收到主题为null的消息。"); + } + }; + } +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/BaseCommandResponse.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/BaseCommandResponse.java new file mode 100644 index 0000000..719fdd5 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/BaseCommandResponse.java @@ -0,0 +1,18 @@ +package net.maku.iot.communication.mqtt.dto; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +/** + * 响应消息基础类 + * + * @author eden on 2024/6/17 + */ +@Data +public class BaseCommandResponse { + /** + * 命令ID + */ + @Schema(description = "命令ID", required = true) + protected String commandId; +} \ No newline at end of file diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/CommandResponseChan.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/CommandResponseChan.java new file mode 100644 index 0000000..1896613 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/CommandResponseChan.java @@ -0,0 +1,96 @@ +package net.maku.iot.communication.mqtt.dto; + +import lombok.extern.slf4j.Slf4j; + +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * 数据生产消费者通道 + */ +@Slf4j +public class CommandResponseChan { + + // 存储通道的 ConcurrentHashMap + private static final ConcurrentHashMap CHANNEL = new ConcurrentHashMap<>(); + + private final CompletableFuture future = new CompletableFuture<>(); + + private final Long DEFAULT_WAIT_MILLISECONDS = 5 * 1000L; + + // 私有构造函数,不允许外部直接实例化 + private CommandResponseChan() { + } + + /** + * 获取或创建通道实例 + * + * @param commandId 通道标识 + * @param isNeedCreate 是否需要创建新的通道实例 + * @return 通道实例 + */ + public static CommandResponseChan getInstance(String commandId, boolean isNeedCreate) { + if (!isNeedCreate) { + return CHANNEL.get(commandId); + } + return CHANNEL.computeIfAbsent(commandId, k -> new CommandResponseChan()); + } + + /** + * 从通道中获取数据,默认超时时间为 5 秒 + * + * @param commandId 通道标识 + * @return 获取的数据,如果超时返回 null + */ + public BaseCommandResponse get(String commandId) { + return get(commandId, DEFAULT_WAIT_MILLISECONDS); + } + + /** + * 从通道中获取数据,支持超时设置 + * + * @param commandId 通道标识 + * @param timeout 超时时间(毫秒) + * @return 获取的数据,如果超时返回 null + */ + public BaseCommandResponse get(String commandId, long timeout) { + CommandResponseChan channel = CHANNEL.get(commandId); + if (Objects.isNull(channel)) { + return null; + } + try { + return channel.future.get(timeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + // 超时异常处理 + log.error("Device response timeout. {}", commandId); + return null; + } catch (Exception e) { + // 其他异常处理 + e.printStackTrace(); + return null; + } finally { + // 确保在获取数据后移除通道 + CHANNEL.remove(commandId, channel); + } + } + + /** + * 向通道中放入数据,并唤醒可能正在等待数据的线程 + * + * @param response 要放入的数据 + */ + public void put(BaseCommandResponse response) { + String commandId = response.getCommandId(); + if (commandId == null) { + return; + } + CommandResponseChan channel = CHANNEL.get(commandId); + if (Objects.isNull(channel)) { + return; + } + channel.future.complete(response); + } +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DeviceCommandDTO.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DeviceCommandDTO.java new file mode 100644 index 0000000..e8154de --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DeviceCommandDTO.java @@ -0,0 +1,32 @@ +package net.maku.iot.communication.mqtt.dto; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; +import net.maku.iot.enums.DeviceCommandEnum; + +/** + * 设备命令对象 + * + * @author LSF maku_lsf@163.com + */ +@Data +@Schema(description = "设备命令对象") +public class DeviceCommandDTO { + /** + * 命令类型 + */ + @Schema(description = "命令类型", required = true) + private DeviceCommandEnum command; + + /** + * 命令id + */ + @Schema(description = "命令id", required = true) + private String id; + + /** + * 命令内容 + */ + @Schema(description = "命令内容") + private String payload; +} \ No newline at end of file diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DeviceCommandResponseDTO.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DeviceCommandResponseDTO.java new file mode 100644 index 0000000..eb762f9 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DeviceCommandResponseDTO.java @@ -0,0 +1,41 @@ +package net.maku.iot.communication.mqtt.dto; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; +import lombok.EqualsAndHashCode; +import net.maku.iot.enums.DeviceCommandEnum; + +/** + * 设备命令响应DTO + * + * @author LSF maku_lsf@163.com + */ +@EqualsAndHashCode(callSuper = true) +@Data +@Schema(description = "设备命令响应DTO") +@JsonIgnoreProperties(ignoreUnknown = true) +public class DeviceCommandResponseDTO extends BaseCommandResponse { + /** + * 命令类型 + */ + @Schema(description = "命令类型", required = true) + private DeviceCommandEnum command; + + /** + * 命令是否完成(默认true:命令已完成;false:命令未完成,后续命令完成将再次发送响应消息,服务端将继续等待该命令完成的响应) + */ + @Schema(description = "命令是否完成(默认true:命令已完成;false:命令未完成,后续命令完成将再次发送响应消息,服务端将继续等待该命令完成的响应)") + private boolean isCompleted = true; + + /** + * 响应状态码,0成功,其它数值异常,根据业务需要自定义 + */ + @Schema(description = "响应状态码,0成功,其它数值异常,根据业务需要自定义") + private Integer statusCode = 0; + /** + * 命令响应结果 + */ + @Schema(description = "命令响应结果") + private String responsePayload; +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DevicePropertyDTO.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DevicePropertyDTO.java new file mode 100644 index 0000000..fed63de --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DevicePropertyDTO.java @@ -0,0 +1,26 @@ +package net.maku.iot.communication.mqtt.dto; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; +import net.maku.iot.enums.DevicePropertyEnum; + +/** + * 设备属性对象 + * + * @author LSF maku_lsf@163.com + */ +@Data +@Schema(description = "设备属性对象") +public class DevicePropertyDTO { + /** + * 设备属性类型 + */ + @Schema(description = "设备属性类型") + private DevicePropertyEnum propertyType; + + /** + * 属性数据 + */ + @Schema(description = "状态数据,不同状态类型需传入相应的状态数据", required = true) + private String payload; +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/factory/DeviceCommandResponseHandlerFactory.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/factory/DeviceCommandResponseHandlerFactory.java new file mode 100644 index 0000000..61322d0 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/factory/DeviceCommandResponseHandlerFactory.java @@ -0,0 +1,41 @@ +package net.maku.iot.communication.mqtt.factory; + +import lombok.RequiredArgsConstructor; +import net.maku.iot.communication.mqtt.handler.DeviceCommandResponseHandler; +import org.springframework.context.ApplicationContext; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * 设备命令响应处理器工厂,自动获取所有实现的handler实例 + * + * @author LSF maku_lsf@163.com + */ +@Component +@RequiredArgsConstructor +public class DeviceCommandResponseHandlerFactory { + private final ApplicationContext applicationContext; + + /** + * 所有设备命令响应handlers + */ + private List handlers; + + /** + * 获取设备命令响应handlers + * + * @return + */ + public List getHandlers() { + if (handlers != null) { + return handlers; + } + handlers = Collections.unmodifiableList( + new ArrayList<>(applicationContext.getBeansOfType( + DeviceCommandResponseHandler.class).values())); + return handlers; + } +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/factory/DevicePropertyChangeHandlerFactory.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/factory/DevicePropertyChangeHandlerFactory.java new file mode 100644 index 0000000..b84218c --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/factory/DevicePropertyChangeHandlerFactory.java @@ -0,0 +1,41 @@ +package net.maku.iot.communication.mqtt.factory; + +import lombok.RequiredArgsConstructor; +import net.maku.iot.communication.mqtt.handler.DevicePropertyChangeHandler; +import org.springframework.context.ApplicationContext; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * 设备运行状态变化处理器工厂,自动获取所有实现的handler实例 + * + * @author LSF maku_lsf@163.com + */ +@Component +@RequiredArgsConstructor +public class DevicePropertyChangeHandlerFactory { + private final ApplicationContext applicationContext; + + /** + * 所有设备运行属性变化handlers + */ + private List handlers; + + /** + * 获取设备运行状态变化handlers + * + * @return + */ + public List getHandlers() { + if (handlers != null) { + return handlers; + } + handlers = Collections.unmodifiableList( + new ArrayList<>(applicationContext.getBeansOfType( + DevicePropertyChangeHandler.class).values())); + return handlers; + } +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/factory/MqttMessageHandlerFactory.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/factory/MqttMessageHandlerFactory.java new file mode 100644 index 0000000..d672227 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/factory/MqttMessageHandlerFactory.java @@ -0,0 +1,47 @@ +package net.maku.iot.communication.mqtt.factory; + +import lombok.RequiredArgsConstructor; +import net.maku.iot.communication.mqtt.handler.MqttMessageHandler; +import org.springframework.context.ApplicationContext; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** + * MQTT消息处理器工厂,自动获取所有实现的处理器实例 + * + * @author LSF maku_lsf@163.com + */ +@Component +@RequiredArgsConstructor +public class MqttMessageHandlerFactory { + private final ApplicationContext applicationContext; + + /** + * 所有消息处理器 + */ + private List messageHandlers; + + private List loadHandlers() { + if (messageHandlers != null) { + return messageHandlers; + } + messageHandlers = new ArrayList<>(applicationContext.getBeansOfType(MqttMessageHandler.class).values()); + return messageHandlers; + } + + /** + * 获取与主题对应的处理器 + * + * @param topic 主题 + * @return 处理器列表 + */ + public List getHandlersForTopic(String topic) { + return Collections.unmodifiableList(loadHandlers().stream() + .filter(handler -> handler.supports(topic)) + .collect(Collectors.toList())); + } +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseHandler.java new file mode 100644 index 0000000..3af2705 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseHandler.java @@ -0,0 +1,19 @@ +package net.maku.iot.communication.mqtt.handler; + + +import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO; + +/** + * 设备命令响应处理器 + * + * @author LSF maku_lsf@163.com + */ +public interface DeviceCommandResponseHandler { + /** + * 设备命令响应处理 + * + * @param topic + * @param commandResponse + */ + void handle(String topic, DeviceCommandResponseDTO commandResponse); +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseMqttMessageHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseMqttMessageHandler.java new file mode 100644 index 0000000..5389920 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseMqttMessageHandler.java @@ -0,0 +1,73 @@ +package net.maku.iot.communication.mqtt.handler; + +import cn.hutool.core.util.StrUtil; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import net.maku.framework.common.utils.JsonUtils; +import net.maku.iot.communication.MQTTService; +import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO; +import net.maku.iot.communication.mqtt.factory.DeviceCommandResponseHandlerFactory; +import net.maku.iot.enums.DeviceTopicEnum; +import org.springframework.stereotype.Component; + +import java.util.Optional; + +/** + * 设备命令响应处理器 + * + * @author LSF maku_lsf@163.com + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class DeviceCommandResponseMqttMessageHandler implements MqttMessageHandler { + + private final DeviceCommandResponseHandlerFactory deviceCommandResponseHandlerFactory; + + private final MQTTService deviceMqttService; + + @Override + public boolean supports(String topic) { + return DeviceTopicEnum.startsWith(topic, DeviceTopicEnum.COMMAND_RESPONSE.getTopic()); + } + + @Override + public void handle(String topic, String message) { + DeviceCommandResponseDTO commandResponseDTO = parseCommandReplyMessage(topic, message); + Optional.ofNullable(commandResponseDTO.getCommand()) + .orElseThrow(() -> new IllegalArgumentException(StrUtil.format("缺失指令类型! 主题:'{}',消息:{}", topic, message))); + Optional.ofNullable(commandResponseDTO.getCommandId()) + .orElseThrow(() -> new IllegalArgumentException(StrUtil.format("缺失指令ID! 主题:'{}',消息:{}", topic, message))); + + Optional.ofNullable(commandResponseDTO) + .ifPresent(responseDTO -> { + // 调用设备命令执行器的命令响应处理逻辑 + try { + deviceMqttService.commandReplied( responseDTO); + } catch (Exception e) { + log.error(StrUtil.format("调用设备命令执行器响应处理方法出错,topic:{}, message:{}", topic, message), e); + } + // 调用自定义命令响应处理器 + try { + deviceCommandResponseHandlerFactory.getHandlers().forEach(h -> h.handle(topic, responseDTO)); + } catch (Exception e) { + log.error(StrUtil.format("调用设备命令响应响应处理器出错,topic:{}, message:{}", topic, message), e); + } + }); + } + + private DeviceCommandResponseDTO parseCommandReplyMessage(String topic, String message) { + try { + DeviceCommandResponseDTO commandResponse = JsonUtils.parseObject(message, DeviceCommandResponseDTO.class); + if (StrUtil.isBlank(commandResponse.getCommandId())) { + log.error(StrUtil.format("主题'{}'的消息,缺失指令ID", topic)); + return null; + } + return commandResponse; + + } catch (Exception e) { + log.error(StrUtil.format("将主题'{}'的消息解析为设备命令响应对象失败", topic), e); + return null; + } + } +} \ No newline at end of file diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyChangeHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyChangeHandler.java new file mode 100644 index 0000000..3822fe7 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyChangeHandler.java @@ -0,0 +1,19 @@ +package net.maku.iot.communication.mqtt.handler; + + +import net.maku.iot.communication.mqtt.dto.DevicePropertyDTO; + +/** + * 设备属性变化处理器 + * + * @author LSF maku_lsf@163.com + */ +public interface DevicePropertyChangeHandler { + /** + * 设备属性状态变化处理 + * + * @param topic + * @param deviceStatus + */ + void handle(String topic, DevicePropertyDTO deviceStatus); +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyMqttMessageHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyMqttMessageHandler.java new file mode 100644 index 0000000..f8416f2 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyMqttMessageHandler.java @@ -0,0 +1,49 @@ +package net.maku.iot.communication.mqtt.handler; + +import cn.hutool.core.util.StrUtil; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import net.maku.framework.common.utils.JsonUtils; +import net.maku.iot.communication.mqtt.dto.DevicePropertyDTO; +import net.maku.iot.communication.mqtt.factory.DevicePropertyChangeHandlerFactory; +import net.maku.iot.enums.DeviceTopicEnum; +import org.springframework.stereotype.Component; + +import java.util.Optional; + +/** + * 设备属性上报消息处理器 + * + * @author LSF maku_lsf@163.com + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class DevicePropertyMqttMessageHandler implements MqttMessageHandler { + + private final DevicePropertyChangeHandlerFactory statusChangeHandlerFactory; + + @Override + public boolean supports(String topic) { + return DeviceTopicEnum.startsWith(topic, DeviceTopicEnum.PROPERTY.getTopic()); + } + + @Override + public void handle(String topic, String message) { + DevicePropertyDTO devicePropertyDTO = parseStatusMessage(topic, message); + Optional.ofNullable(devicePropertyDTO) + .ifPresent(deviceProperty -> statusChangeHandlerFactory.getHandlers() + .forEach(h -> h.handle(topic, deviceProperty))); + } + + private DevicePropertyDTO parseStatusMessage(String topic, String message) { + try { + return JsonUtils.parseObject(message, DevicePropertyDTO.class); + } catch (Exception e) { + log.error(StrUtil.format("将主题'{}'的消息解析为设备运行状态对象失败", topic), e); + return null; + } + } + + +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/MqttMessageHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/MqttMessageHandler.java new file mode 100644 index 0000000..a16367a --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/MqttMessageHandler.java @@ -0,0 +1,24 @@ +package net.maku.iot.communication.mqtt.handler; + +/** + * MQTT订阅消息处理接口 + * + * @author LSF maku_lsf@163.com + */ +public interface MqttMessageHandler { + /** + * 是否支持处理指定的topic + * + * @param topic + * @return + */ + boolean supports(String topic); + + /** + * mqtt消息处理接口 + * + * @param topic + * @param message + */ + void handle(String topic, String message); +} \ No newline at end of file diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/entity/IotDeviceEntity.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/entity/IotDeviceEntity.java index 9f03409..2632c91 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/entity/IotDeviceEntity.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/entity/IotDeviceEntity.java @@ -33,6 +33,11 @@ public class IotDeviceEntity extends BaseEntity { private Integer type; /** + * 设备和服务器通信协议类型 + */ + private String protocolType; + + /** * 唯一标识码 */ private String uid; diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/enums/IOTProtocolEnum.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/enums/IOTProtocolEnum.java new file mode 100644 index 0000000..2e15eee --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/enums/IOTProtocolEnum.java @@ -0,0 +1,28 @@ +package net.maku.iot.enums; + +import cn.hutool.core.util.StrUtil; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +import java.util.Arrays; + +/** + * IOT常用的通信协议 + * + * @author LSF maku_lsf@163.com + */ +@Getter +@RequiredArgsConstructor +public enum IOTProtocolEnum { + + MQTT("MQTT"), + TCP("TCP"), + UDP("UDP"), + BLE("BLE"), + CoAP("CoAP"), + LwM2M("LwM2M"), + Modbus("Modbus"); + + private final String value; +} + diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/MqttGateway.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/MqttGateway.java deleted file mode 100644 index d1d2feb..0000000 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/MqttGateway.java +++ /dev/null @@ -1,37 +0,0 @@ -package net.maku.iot.mqtt; - -import jakarta.annotation.Resource; -import net.maku.iot.mqtt.config.MqttConfig; -import org.springframework.integration.annotation.MessagingGateway; -import org.springframework.integration.mqtt.support.MqttHeaders; -import org.springframework.integration.support.MessageBuilder; -import org.springframework.messaging.handler.annotation.Header; -import org.springframework.stereotype.Component; - -/** - * MQTT网关 - * - * @author LSF maku_lsf@163.com - */ -@Component -@MessagingGateway(defaultRequestChannel = MqttConfig.OUTBOUND_CHANNEL) -public class MqttGateway { - @Resource - private MqttConfig mqttConfig; - - public void sendToMqtt(String payload) { - mqttConfig.mqttOutboundHandler().handleMessage(MessageBuilder.withPayload(payload).build()); - } - - public void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload) { - mqttConfig.mqttOutboundHandler().handleMessage(MessageBuilder.withPayload(payload).setHeader(MqttHeaders.TOPIC, topic).build()); - } - - public void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload) { - mqttConfig.mqttOutboundHandler().handleMessage(MessageBuilder.withPayload(payload).setHeader(MqttHeaders.TOPIC, topic).setHeader(MqttHeaders.QOS, qos).build()); - } - - public void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.RETAINED) boolean retained, String payload) { - mqttConfig.mqttOutboundHandler().handleMessage(MessageBuilder.withPayload(payload).setHeader(MqttHeaders.TOPIC, topic).setHeader(MqttHeaders.RETAINED, retained).build()); - } -} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/config/MqttConfig.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/config/MqttConfig.java deleted file mode 100644 index ca3bf05..0000000 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/config/MqttConfig.java +++ /dev/null @@ -1,159 +0,0 @@ -package net.maku.iot.mqtt.config; - -import jakarta.annotation.PostConstruct; -import jakarta.annotation.Resource; -import lombok.Data; -import lombok.extern.slf4j.Slf4j; -import net.maku.iot.enums.DeviceTopicEnum; -import net.maku.iot.mqtt.factory.MqttMessageHandlerFactory; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.integration.annotation.IntegrationComponentScan; -import org.springframework.integration.annotation.ServiceActivator; -import org.springframework.integration.channel.DirectChannel; -import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; -import org.springframework.integration.mqtt.core.MqttPahoClientFactory; -import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; -import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; -import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; -import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.MessageHandler; - -/** - * MQTT 配置类,用于设置和管理 MQTT 连接和消息处理。 - * - * @author LSF maku_lsf@163.com - */ -@Data -@Slf4j -@Configuration -@IntegrationComponentScan -@ConfigurationProperties(prefix = "spring.mqtt") -public class MqttConfig { - public static final String OUTBOUND_CHANNEL = "mqttOutboundChannel"; - public static final String INPUT_CHANNEL = "mqttInputChannel"; - - // MQTT 用户名 - private String username; - - // MQTT 密码 - private String password; - - // MQTT 服务器 URL - private String host; - - // 客户端 ID - private String clientId; - - // 默认主题 - private String defaultTopic; - - // 处理 MQTT 消息的工厂 - @Resource - private MqttMessageHandlerFactory mqttMessageHandlerFactory; - - @PostConstruct - public void init() { - log.info("MQTT 主机: {} 客户端ID: {} 默认主题:{}", this.host, this.clientId, this.defaultTopic); - } - - /** - * 配置并返回一个 MqttPahoClientFactory 实例,用于创建 MQTT 客户端连接。 - * - * @return MqttPahoClientFactory - */ - @Bean - public MqttPahoClientFactory mqttClientFactory() { - // 设置连接选项,包括服务器 URI、用户名和密码。 - final MqttConnectOptions options = new MqttConnectOptions(); - options.setServerURIs(new String[]{host}); - options.setUserName(username); - options.setPassword(password.toCharArray()); - final DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); - factory.setConnectionOptions(options); - return factory; - } - - /** - * 创建一个用于发送 MQTT 消息的 MessageChannel。 - * - * @return MessageChannel - */ - @Bean(OUTBOUND_CHANNEL) - public MessageChannel mqttOutboundChannel() { - return new DirectChannel(); - } - - /** - * 配置用于发送 MQTT 消息的 MessageHandler。 - * - * @return MessageHandler - */ - @Bean - @ServiceActivator(inputChannel = OUTBOUND_CHANNEL) - public MessageHandler mqttOutboundHandler() { - // 使用 MqttPahoMessageHandler 创建一个新的 MQTT 客户端连接,用于发布消息。 - final MqttPahoMessageHandler handler = new MqttPahoMessageHandler(clientId + "_pub", mqttClientFactory()); - handler.setDefaultQos(1); - handler.setDefaultRetained(false); - handler.setDefaultTopic(defaultTopic); - handler.setAsync(true); - return handler; - } - - /** - * 创建用于接收 MQTT 消息的 MessageChannel。 - * - * @return MessageChannel - */ - @Bean - public MessageChannel mqttInputChannel() { - return new DirectChannel(); - } - - /** - * 配置 客户端,订阅的主题, - * PROPERTY:设备属性上报主题, - * COMMAND_RESPONSE:下发指令执行结果主题 - * - * @return MqttPahoMessageDrivenChannelAdapter - */ - @Bean - public MqttPahoMessageDrivenChannelAdapter mqttInboundAdapter() { - final MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( - clientId + "_sub", - mqttClientFactory(), DeviceTopicEnum.PROPERTY.getWildcard(), - DeviceTopicEnum.COMMAND_RESPONSE.getWildcard() - ); - adapter.setCompletionTimeout(15000); - adapter.setConverter(new DefaultPahoMessageConverter()); - adapter.setQos(1); - adapter.setOutputChannel(mqttInputChannel()); - return adapter; - } - - /** - * 通过通道获取数据并处理消息。 - * - * @return MessageHandler - */ - @Bean - @ServiceActivator(inputChannel = INPUT_CHANNEL) - public MessageHandler mqttMessageHandler() { - return message -> { - String topic = (String) message.getHeaders().get("mqtt_receivedTopic"); - if (topic != null) { - mqttMessageHandlerFactory.getHandlersForTopic(topic).forEach(handler -> { - if (log.isDebugEnabled()) { - log.debug("主题: {}, 消息内容: {}", topic, message.getPayload()); - } - handler.handle(topic, message.getPayload().toString()); - }); - } else { - log.warn("接收到主题为null的消息。"); - } - }; - } -} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/BaseCommandResponse.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/BaseCommandResponse.java deleted file mode 100644 index 2b01d95..0000000 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/BaseCommandResponse.java +++ /dev/null @@ -1,18 +0,0 @@ -package net.maku.iot.mqtt.dto; - -import io.swagger.v3.oas.annotations.media.Schema; -import lombok.Data; - -/** - * 响应消息基础类 - * - * @author eden on 2024/6/17 - */ -@Data -public class BaseCommandResponse { - /** - * 命令ID - */ - @Schema(description = "命令ID", required = true) - protected String commandId; -} \ No newline at end of file diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/Chan.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/Chan.java deleted file mode 100644 index 7936965..0000000 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/Chan.java +++ /dev/null @@ -1,74 +0,0 @@ -package net.maku.iot.mqtt.dto; - - -import java.util.Objects; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; - -/** - * 数据生产消费者 - */ -public class Chan { - - // 存储通道的 ConcurrentHashMap - private static final ConcurrentHashMap CHANNEL = new ConcurrentHashMap<>(); - - private final CompletableFuture future = new CompletableFuture<>(); - - // 私有构造函数,不允许外部直接实例化 - private Chan() { - } - - /** - * 获取或创建通道实例 - * - * @param commandId 通道标识 - * @param isNeedCreate 是否需要创建新的通道实例 - * @return 通道实例 - */ - public static Chan getInstance(String commandId, boolean isNeedCreate) { - if (!isNeedCreate) { - return CHANNEL.get(commandId); - } - return CHANNEL.computeIfAbsent(commandId, k -> new Chan()); - } - - /** - * 从通道中获取数据,支持超时设置 - * - * @param commandId 通道标识 - * @param timeout 超时时间(毫秒) - * @return 获取的数据,如果超时返回 null - */ - public Object get(String commandId, long timeout) { - Chan chan = CHANNEL.get(commandId); - if (Objects.isNull(chan)) { - return null; - } - try { - return chan.future.get(timeout, TimeUnit.MILLISECONDS); - } catch (Exception e) { - return null; - } finally { - CHANNEL.remove(commandId, chan); - } - } - - /** - * 向通道中放入数据,并唤醒可能正在等待数据的线程 - * - * @param response 要放入的数据 - */ - public void put(BaseCommandResponse response) { - String commandId = response.getCommandId(); - if (commandId == null) { - return; - } - Chan chan = CHANNEL.get(commandId); - if (Objects.isNull(chan)) { - return; - } - chan.future.complete(response); - } -} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/DeviceCommandDTO.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/DeviceCommandDTO.java deleted file mode 100644 index e304a29..0000000 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/DeviceCommandDTO.java +++ /dev/null @@ -1,32 +0,0 @@ -package net.maku.iot.mqtt.dto; - -import io.swagger.v3.oas.annotations.media.Schema; -import lombok.Data; -import net.maku.iot.enums.DeviceCommandEnum; - -/** - * 设备命令对象 - * - * @author LSF maku_lsf@163.com - */ -@Data -@Schema(description = "设备命令对象") -public class DeviceCommandDTO { - /** - * 命令类型 - */ - @Schema(description = "命令类型", required = true) - private DeviceCommandEnum command; - - /** - * 命令id - */ - @Schema(description = "命令id", required = true) - private String id; - - /** - * 命令内容 - */ - @Schema(description = "命令内容") - private String payload; -} \ No newline at end of file diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/DeviceCommandResponseDTO.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/DeviceCommandResponseDTO.java deleted file mode 100644 index e4f6133..0000000 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/DeviceCommandResponseDTO.java +++ /dev/null @@ -1,41 +0,0 @@ -package net.maku.iot.mqtt.dto; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import io.swagger.v3.oas.annotations.media.Schema; -import lombok.Data; -import lombok.EqualsAndHashCode; -import net.maku.iot.enums.DeviceCommandEnum; - -/** - * 设备命令响应DTO - * - * @author LSF maku_lsf@163.com - */ -@EqualsAndHashCode(callSuper = true) -@Data -@Schema(description = "设备命令响应DTO") -@JsonIgnoreProperties(ignoreUnknown = true) -public class DeviceCommandResponseDTO extends BaseCommandResponse { - /** - * 命令类型 - */ - @Schema(description = "命令类型", required = true) - private DeviceCommandEnum command; - - /** - * 命令是否完成(默认true:命令已完成;false:命令未完成,后续命令完成将再次发送响应消息,服务端将继续等待该命令完成的响应) - */ - @Schema(description = "命令是否完成(默认true:命令已完成;false:命令未完成,后续命令完成将再次发送响应消息,服务端将继续等待该命令完成的响应)") - private boolean isCompleted = true; - - /** - * 响应状态码,0成功,其它数值异常,根据业务需要自定义 - */ - @Schema(description = "响应状态码,0成功,其它数值异常,根据业务需要自定义") - private Integer statusCode = 0; - /** - * 命令响应结果 - */ - @Schema(description = "命令响应结果") - private String responsePayload; -} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/DevicePropertyDTO.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/DevicePropertyDTO.java deleted file mode 100644 index d75b329..0000000 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/dto/DevicePropertyDTO.java +++ /dev/null @@ -1,26 +0,0 @@ -package net.maku.iot.mqtt.dto; - -import io.swagger.v3.oas.annotations.media.Schema; -import lombok.Data; -import net.maku.iot.enums.DevicePropertyEnum; - -/** - * 设备属性对象 - * - * @author LSF maku_lsf@163.com - */ -@Data -@Schema(description = "设备属性对象") -public class DevicePropertyDTO { - /** - * 设备属性类型 - */ - @Schema(description = "设备属性类型") - private DevicePropertyEnum propertyType; - - /** - * 属性数据 - */ - @Schema(description = "状态数据,不同状态类型需传入相应的状态数据", required = true) - private String payload; -} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/factory/DeviceCommandResponseHandlerFactory.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/factory/DeviceCommandResponseHandlerFactory.java deleted file mode 100644 index 742584d..0000000 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/factory/DeviceCommandResponseHandlerFactory.java +++ /dev/null @@ -1,41 +0,0 @@ -package net.maku.iot.mqtt.factory; - -import lombok.RequiredArgsConstructor; -import net.maku.iot.mqtt.handler.DeviceCommandResponseHandler; -import org.springframework.context.ApplicationContext; -import org.springframework.stereotype.Component; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -/** - * 设备命令响应处理器工厂,自动获取所有实现的handler实例 - * - * @author LSF maku_lsf@163.com - */ -@Component -@RequiredArgsConstructor -public class DeviceCommandResponseHandlerFactory { - private final ApplicationContext applicationContext; - - /** - * 所有设备命令响应handlers - */ - private List handlers; - - /** - * 获取设备命令响应handlers - * - * @return - */ - public List getHandlers() { - if (handlers != null) { - return handlers; - } - handlers = Collections.unmodifiableList( - new ArrayList<>(applicationContext.getBeansOfType( - DeviceCommandResponseHandler.class).values())); - return handlers; - } -} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/factory/DevicePropertyChangeHandlerFactory.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/factory/DevicePropertyChangeHandlerFactory.java deleted file mode 100644 index e0d6bb4..0000000 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/factory/DevicePropertyChangeHandlerFactory.java +++ /dev/null @@ -1,41 +0,0 @@ -package net.maku.iot.mqtt.factory; - -import lombok.RequiredArgsConstructor; -import net.maku.iot.mqtt.handler.DevicePropertyChangeHandler; -import org.springframework.context.ApplicationContext; -import org.springframework.stereotype.Component; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -/** - * 设备运行状态变化处理器工厂,自动获取所有实现的handler实例 - * - * @author LSF maku_lsf@163.com - */ -@Component -@RequiredArgsConstructor -public class DevicePropertyChangeHandlerFactory { - private final ApplicationContext applicationContext; - - /** - * 所有设备运行属性变化handlers - */ - private List handlers; - - /** - * 获取设备运行状态变化handlers - * - * @return - */ - public List getHandlers() { - if (handlers != null) { - return handlers; - } - handlers = Collections.unmodifiableList( - new ArrayList<>(applicationContext.getBeansOfType( - DevicePropertyChangeHandler.class).values())); - return handlers; - } -} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/factory/MqttMessageHandlerFactory.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/factory/MqttMessageHandlerFactory.java deleted file mode 100644 index 368cf54..0000000 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/factory/MqttMessageHandlerFactory.java +++ /dev/null @@ -1,47 +0,0 @@ -package net.maku.iot.mqtt.factory; - -import lombok.RequiredArgsConstructor; -import net.maku.iot.mqtt.handler.MqttMessageHandler; -import org.springframework.context.ApplicationContext; -import org.springframework.stereotype.Component; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.stream.Collectors; - -/** - * MQTT消息处理器工厂,自动获取所有实现的处理器实例 - * - * @author LSF maku_lsf@163.com - */ -@Component -@RequiredArgsConstructor -public class MqttMessageHandlerFactory { - private final ApplicationContext applicationContext; - - /** - * 所有消息处理器 - */ - private List messageHandlers; - - private List loadHandlers() { - if (messageHandlers != null) { - return messageHandlers; - } - messageHandlers = new ArrayList<>(applicationContext.getBeansOfType(MqttMessageHandler.class).values()); - return messageHandlers; - } - - /** - * 获取与主题对应的处理器 - * - * @param topic 主题 - * @return 处理器列表 - */ - public List getHandlersForTopic(String topic) { - return Collections.unmodifiableList(loadHandlers().stream() - .filter(handler -> handler.supports(topic)) - .collect(Collectors.toList())); - } -} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DeviceCommandResponseHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DeviceCommandResponseHandler.java deleted file mode 100644 index 2c04a66..0000000 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DeviceCommandResponseHandler.java +++ /dev/null @@ -1,19 +0,0 @@ -package net.maku.iot.mqtt.handler; - - -import net.maku.iot.mqtt.dto.DeviceCommandResponseDTO; - -/** - * 设备命令响应处理器 - * - * @author LSF maku_lsf@163.com - */ -public interface DeviceCommandResponseHandler { - /** - * 设备命令响应处理 - * - * @param topic - * @param commandResponse - */ - void handle(String topic, DeviceCommandResponseDTO commandResponse); -} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DeviceCommandResponseMqttMessageHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DeviceCommandResponseMqttMessageHandler.java deleted file mode 100644 index 551a16b..0000000 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DeviceCommandResponseMqttMessageHandler.java +++ /dev/null @@ -1,73 +0,0 @@ -package net.maku.iot.mqtt.handler; - -import cn.hutool.core.util.StrUtil; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import net.maku.framework.common.utils.JsonUtils; -import net.maku.iot.enums.DeviceTopicEnum; -import net.maku.iot.mqtt.dto.DeviceCommandResponseDTO; -import net.maku.iot.mqtt.factory.DeviceCommandResponseHandlerFactory; -import net.maku.iot.mqtt.service.DeviceMqttService; -import org.springframework.stereotype.Component; - -import java.util.Optional; - -/** - * 设备命令响应处理器 - * - * @author LSF maku_lsf@163.com - */ -@Slf4j -@Component -@RequiredArgsConstructor -public class DeviceCommandResponseMqttMessageHandler implements MqttMessageHandler { - private final DeviceCommandResponseHandlerFactory deviceCommandResponseHandlerFactory; - - - private final DeviceMqttService deviceMqttService; - - @Override - public boolean supports(String topic) { - return DeviceTopicEnum.startsWith(topic, DeviceTopicEnum.COMMAND_RESPONSE.getTopic()); - } - - @Override - public void handle(String topic, String message) { - DeviceCommandResponseDTO commandResponseDTO = parseCommandReplyMessage(topic, message); - Optional.ofNullable(commandResponseDTO.getCommand()) - .orElseThrow(() -> new IllegalArgumentException(StrUtil.format("缺失指令类型! 主题:'{}',消息:{}", topic, message))); - Optional.ofNullable(commandResponseDTO.getCommandId()) - .orElseThrow(() -> new IllegalArgumentException(StrUtil.format("缺失指令ID! 主题:'{}',消息:{}", topic, message))); - - Optional.ofNullable(commandResponseDTO) - .ifPresent(responseDTO -> { - // 调用设备命令执行器的命令响应处理逻辑 - try { - deviceMqttService.commandReplied(topic, responseDTO); - } catch (Exception e) { - log.error(StrUtil.format("调用设备命令执行器响应处理方法出错,topic:{}, message:{}", topic, message), e); - } - // 调用自定义命令响应处理器 - try { - deviceCommandResponseHandlerFactory.getHandlers().forEach(h -> h.handle(topic, responseDTO)); - } catch (Exception e) { - log.error(StrUtil.format("调用设备命令响应响应处理器出错,topic:{}, message:{}", topic, message), e); - } - }); - } - - private DeviceCommandResponseDTO parseCommandReplyMessage(String topic, String message) { - try { - DeviceCommandResponseDTO commandResponse = JsonUtils.parseObject(message, DeviceCommandResponseDTO.class); - if (StrUtil.isBlank(commandResponse.getCommandId())) { - log.error(StrUtil.format("主题'{}'的消息,缺失指令ID", topic)); - return null; - } - return commandResponse; - - } catch (Exception e) { - log.error(StrUtil.format("将主题'{}'的消息解析为设备命令响应对象失败", topic), e); - return null; - } - } -} \ No newline at end of file diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DevicePropertyChangeHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DevicePropertyChangeHandler.java deleted file mode 100644 index 739f266..0000000 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DevicePropertyChangeHandler.java +++ /dev/null @@ -1,19 +0,0 @@ -package net.maku.iot.mqtt.handler; - - -import net.maku.iot.mqtt.dto.DevicePropertyDTO; - -/** - * 设备属性变化处理器 - * - * @author LSF maku_lsf@163.com - */ -public interface DevicePropertyChangeHandler { - /** - * 设备属性状态变化处理 - * - * @param topic - * @param deviceStatus - */ - void handle(String topic, DevicePropertyDTO deviceStatus); -} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DevicePropertyMqttMessageHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DevicePropertyMqttMessageHandler.java deleted file mode 100644 index f47e205..0000000 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/DevicePropertyMqttMessageHandler.java +++ /dev/null @@ -1,49 +0,0 @@ -package net.maku.iot.mqtt.handler; - -import cn.hutool.core.util.StrUtil; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import net.maku.framework.common.utils.JsonUtils; -import net.maku.iot.enums.DeviceTopicEnum; -import net.maku.iot.mqtt.dto.DevicePropertyDTO; -import net.maku.iot.mqtt.factory.DevicePropertyChangeHandlerFactory; -import org.springframework.stereotype.Component; - -import java.util.Optional; - -/** - * 设备属性上报消息处理器 - * - * @author LSF maku_lsf@163.com - */ -@Slf4j -@Component -@RequiredArgsConstructor -public class DevicePropertyMqttMessageHandler implements MqttMessageHandler { - - private final DevicePropertyChangeHandlerFactory statusChangeHandlerFactory; - - @Override - public boolean supports(String topic) { - return DeviceTopicEnum.startsWith(topic, DeviceTopicEnum.PROPERTY.getTopic()); - } - - @Override - public void handle(String topic, String message) { - DevicePropertyDTO devicePropertyDTO = parseStatusMessage(topic, message); - Optional.ofNullable(devicePropertyDTO) - .ifPresent(deviceProperty -> statusChangeHandlerFactory.getHandlers() - .forEach(h -> h.handle(topic, deviceProperty))); - } - - private DevicePropertyDTO parseStatusMessage(String topic, String message) { - try { - return JsonUtils.parseObject(message, DevicePropertyDTO.class); - } catch (Exception e) { - log.error(StrUtil.format("将主题'{}'的消息解析为设备运行状态对象失败", topic), e); - return null; - } - } - - -} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/MqttMessageHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/MqttMessageHandler.java deleted file mode 100644 index 33307ea..0000000 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/handler/MqttMessageHandler.java +++ /dev/null @@ -1,24 +0,0 @@ -package net.maku.iot.mqtt.handler; - -/** - * MQTT订阅消息处理接口 - * - * @author LSF maku_lsf@163.com - */ -public interface MqttMessageHandler { - /** - * 是否支持处理指定的topic - * - * @param topic - * @return - */ - boolean supports(String topic); - - /** - * mqtt消息处理接口 - * - * @param topic - * @param message - */ - void handle(String topic, String message); -} \ No newline at end of file diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/service/DeviceMqttService.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/service/DeviceMqttService.java deleted file mode 100644 index 0538be0..0000000 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/mqtt/service/DeviceMqttService.java +++ /dev/null @@ -1,217 +0,0 @@ -package net.maku.iot.mqtt.service; - -import cn.hutool.core.util.StrUtil; -import cn.hutool.json.JSONUtil; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import net.maku.framework.common.exception.ServerException; -import net.maku.iot.dto.DeviceClientDTO; -import net.maku.iot.entity.IotDeviceEntity; -import net.maku.iot.enums.DeviceCommandEnum; -import net.maku.iot.enums.DeviceServiceEnum; -import net.maku.iot.enums.DeviceTopicEnum; -import net.maku.iot.mqtt.MqttGateway; -import net.maku.iot.mqtt.dto.Chan; -import net.maku.iot.mqtt.dto.DeviceCommandDTO; -import net.maku.iot.mqtt.dto.DeviceCommandResponseDTO; -import net.maku.iot.service.IotDeviceServiceLogService; -import net.maku.iot.utils.MqttUtils; -import org.springframework.stereotype.Component; - -import java.util.UUID; -import java.util.concurrent.*; - -/** - * 设备命令发送服务 - **/ -@Slf4j -@Component -@RequiredArgsConstructor -public class DeviceMqttService { - private final MqttUtils mqttUtils; - private final MqttGateway mqttGateway; - private final IotDeviceServiceLogService iotDeviceEventLogService; - - /** - * 命令等待exchanger缓存,key: command id - */ - private final ConcurrentMap> commandExchangers = new ConcurrentHashMap<>(); - - /** - * 异步发送命令,返回命令id - * - * @param device - * @param command - * @param payload - * @return - */ - public String asyncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload) { - return asyncSendCommand(device, command, payload, Boolean.FALSE); - } - - - /** - * 异步发送命令,返回命令id - * - * @param device - * @param command - * @param payload - * @param retained - * @return - */ - public String asyncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload, boolean retained) { - // 构建命令对象 - String commandId = StrUtil.replaceChars(UUID.randomUUID().toString(), "-", ""); - DeviceCommandDTO commandDTO = new DeviceCommandDTO(); - commandDTO.setCommand(command); - commandDTO.setId(commandId); - commandDTO.setPayload(payload); - String commandTopic = DeviceTopicEnum.COMMAND.buildTopic(DeviceClientDTO.from(device)); - - // 发送命令到设备命令主题 - try { - mqttGateway.sendToMqtt(commandTopic, retained, JSONUtil.toJsonStr(commandDTO)); - } catch (Exception e) { - log.error(e.getMessage()); - throw new ServerException(StrUtil.format("发送'{}'命令:{} 到设备:{}-{}, Topic:{} 失败", - command.getTitle(), commandId, device.getCode(), device.getName(), commandTopic)); - } - log.info("发送'{}'命令:{} 到设备:{}-{}, Topic:{} 成功", command.getTitle(), commandId, device.getCode(), device.getName(), commandTopic); - iotDeviceEventLogService.createAndSaveDeviceServiceLog(device.getId(), device.getTenantId(), command, commandId, payload); - return commandId; - } - - - /** - * 同步发送命令并返回响应结果 - * - * @param device - * @param command - * @param payload - * @return - */ - public DeviceCommandResponseDTO syncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload) { - return syncSendCommand(device, command, payload, Boolean.FALSE); - } - - /** - * 发送命令并返回响应结果 - * - * @param device - * @param command - * @param payload - * @param retained - * @return - */ - public DeviceCommandResponseDTO syncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload, boolean retained) { - // 构建并发送命令 - String commandId = asyncSendCommand(device, command, payload, retained); - // 等待返回结果 超时3秒(可控) - Object receiver = Chan.getInstance(commandId, true).get(commandId, 3 * 1000L); - if (receiver == null) { - log.error("Failed to receive the message. {}", device.getName()); - throw new ServerException(StrUtil.format("{}设备没有回复", device.getName())); - } - return (DeviceCommandResponseDTO) receiver; - } - - - /** - * 发送命令并返回响应结果,模拟设备响应 - * - * @param device - * @param command - * @param payload - * @return - */ - public DeviceCommandResponseDTO syncSendCommandDebug(IotDeviceEntity device, DeviceCommandEnum command, String payload) { - // 构建并发送命令 - String commandId = asyncSendCommand(device, command, payload); - - // 2秒后模拟设备响应 - new Thread(() -> { - try { - //模拟设备正常响应 - Thread.sleep(2000); - //模拟设备超时响应 - //Thread.sleep(15000); - DeviceCommandResponseDTO simulateResponseDto = new DeviceCommandResponseDTO(); - simulateResponseDto.setCommandId(commandId); - simulateResponseDto.setResponsePayload(command.getTitle() + ",设备执行成功!"); - simulateResponseDto.setCommand(command); - simulateDeviceCommandResponseAttributeData(device, JSONUtil.toJsonStr(simulateResponseDto)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.error("模拟设备响应线程被中断", e); - } - }).start(); - - // 等待设备响应 - return waitCommandResponse(command, commandId); - } - - - /** - * 订阅设备命令响应主题并等待获取返回结果 - * - * @param command - * @param commandId - * @return - */ - private DeviceCommandResponseDTO waitCommandResponse(DeviceCommandEnum command, String commandId) { - // 创建命令响应等待exchanger - Exchanger commandExchanger = new Exchanger<>(); - commandExchangers.put(commandId, commandExchanger); - - try { - Object result = commandExchanger.exchange("", 10, TimeUnit.SECONDS); - return (DeviceCommandResponseDTO) result; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new ServerException(StrUtil.format("{} <{}>,{} 命令中断", - DeviceServiceEnum.COMMAND_ID.getValue(), commandId, command.getTitle()), e); - } catch (TimeoutException e) { - throw new ServerException(StrUtil.format("{} <{}>,{} 命令超时", - DeviceServiceEnum.COMMAND_ID.getValue(), commandId, command.getTitle()), e); - } finally { - // 移除命令响应等待exchanger - commandExchangers.remove(commandId); - } - } - - /** - * 设备命令响应处理 - * - * @param topic - * @param commandResponse - */ - public void commandReplied(String topic, DeviceCommandResponseDTO commandResponse) { - Chan chan = Chan.getInstance(commandResponse.getCommandId(), false); - chan.put(commandResponse); - } - - public void simulateDeviceReportAttributeData(IotDeviceEntity device, String payload) { - // 封装 设备属性上报的 topic - String commandTopic = DeviceTopicEnum.PROPERTY.buildTopic(DeviceClientDTO.from(device)); - try { - mqttGateway.sendToMqtt(commandTopic, payload); - } catch (Exception e) { - log.error(e.getMessage()); - throw new ServerException(StrUtil.format("模拟设备:{}-{},模拟属性上报失败! Topic:{} ", - device.getCode(), device.getName(), commandTopic)); - } - } - - public void simulateDeviceCommandResponseAttributeData(IotDeviceEntity device, String payload) { - // 封装 设备命令执行结果的 topic - String commandTopic = DeviceTopicEnum.COMMAND_RESPONSE.buildTopic(DeviceClientDTO.from(device)); - try { - mqttGateway.sendToMqtt(commandTopic, payload); - } catch (Exception e) { - log.error(e.getMessage()); - throw new ServerException(StrUtil.format("模拟设备:{}-{},模拟发送命令执行结果失败! Topic:{} ", - device.getCode(), device.getName(), commandTopic)); - } - } - -} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/IotDeviceService.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/IotDeviceService.java index eafbde4..6588370 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/IotDeviceService.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/IotDeviceService.java @@ -3,8 +3,9 @@ package net.maku.iot.service; import net.maku.framework.common.utils.PageResult; import net.maku.framework.mybatis.service.BaseService; import net.maku.iot.entity.IotDeviceEntity; -import net.maku.iot.mqtt.dto.DeviceCommandResponseDTO; +import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO; import net.maku.iot.query.IotDeviceQuery; +import net.maku.iot.communication.BaseCommunication; import net.maku.iot.vo.DeviceCommandResponseAttributeDataVO; import net.maku.iot.vo.DeviceCommandVO; import net.maku.iot.vo.DeviceReportAttributeDataVO; @@ -28,6 +29,27 @@ public interface IotDeviceService extends BaseService { void delete(List idList); /** + * 根据设备的协议类型获取发送服务 + * @param device 设备 + * @return + */ + BaseCommunication getSendService(IotDeviceEntity device); + + /** + * 根据协议类型获取发送服务 + * @param protocolType + * @return + */ + BaseCommunication getSendService(String protocolType); + + /** + * 根据设备ID获取发送服务 + * @param deviceId + * @return + */ + BaseCommunication getSendService(Long deviceId); + + /** * 对设备下发指令-同步响应模式 * * @param vo diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/impl/IotDeviceServiceImpl.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/impl/IotDeviceServiceImpl.java index d7bebfa..50fd92e 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/impl/IotDeviceServiceImpl.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/impl/IotDeviceServiceImpl.java @@ -16,12 +16,13 @@ import net.maku.iot.convert.IotDeviceConvert; import net.maku.iot.dao.IotDeviceDao; import net.maku.iot.entity.IotDeviceEntity; import net.maku.iot.enums.*; -import net.maku.iot.mqtt.dto.DeviceCommandResponseDTO; -import net.maku.iot.mqtt.dto.DevicePropertyDTO; -import net.maku.iot.mqtt.handler.DeviceCommandResponseHandler; -import net.maku.iot.mqtt.handler.DevicePropertyChangeHandler; -import net.maku.iot.mqtt.service.DeviceMqttService; +import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO; +import net.maku.iot.communication.mqtt.dto.DevicePropertyDTO; +import net.maku.iot.communication.mqtt.handler.DeviceCommandResponseHandler; +import net.maku.iot.communication.mqtt.handler.DevicePropertyChangeHandler; import net.maku.iot.query.IotDeviceQuery; +import net.maku.iot.communication.BaseCommunication; +import net.maku.iot.communication.CommunicationServiceFactory; import net.maku.iot.service.IotDeviceEventLogService; import net.maku.iot.service.IotDeviceService; import net.maku.iot.vo.DeviceCommandResponseAttributeDataVO; @@ -35,7 +36,7 @@ import java.time.LocalDateTime; import java.util.List; /** - * 设备表 + * 设备服务类 * * @author LSF maku_lsf@163.com */ @@ -45,8 +46,7 @@ import java.util.List; public class IotDeviceServiceImpl extends BaseServiceImpl implements IotDeviceService, DevicePropertyChangeHandler, DeviceCommandResponseHandler { - //todo 后续版本更改为根据物模型自动选择不同的通信层Service - private final DeviceMqttService mqttService; + private final CommunicationServiceFactory communicationService; private final IotDeviceEventLogService deviceEventLogService; @Override @@ -85,6 +85,28 @@ public class IotDeviceServiceImpl extends BaseServiceImpl