From 88f88807e27096822bf892a87883cc9143c67e6c Mon Sep 17 00:00:00 2001 From: LSF <695944503@qq.com> Date: Wed, 14 Aug 2024 20:12:56 +0800 Subject: [PATCH] =?UTF-8?q?add:=E8=B0=83=E6=95=B4=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../maku/iot/communication/BaseCommunication.java | 28 --- .../communication/CommunicationServiceFactory.java | 36 ---- .../net/maku/iot/communication/MQTTService.java | 198 --------------------- .../net/maku/iot/communication/TCPService.java | 45 ----- .../communication/dto/BaseCommandResponseDTO.java | 18 ++ .../maku/iot/communication/dto/BaseDeviceID.java | 17 ++ .../iot/communication/dto/DeviceCommandDTO.java | 32 ++++ .../dto/DeviceCommandResponseDTO.java | 41 +++++ .../iot/communication/dto/DevicePropertyDTO.java | 26 +++ .../net/maku/iot/communication/dto/TcpMsgDTO.java | 16 ++ .../mqtt/chan/CommandResponseChan.java | 97 ++++++++++ .../mqtt/dto/BaseCommandResponse.java | 18 -- .../mqtt/dto/CommandResponseChan.java | 96 ---------- .../communication/mqtt/dto/DeviceCommandDTO.java | 32 ---- .../mqtt/dto/DeviceCommandResponseDTO.java | 41 ----- .../communication/mqtt/dto/DevicePropertyDTO.java | 26 --- .../mqtt/handler/DeviceCommandResponseHandler.java | 2 +- .../DeviceCommandResponseMqttMessageHandler.java | 4 +- .../mqtt/handler/DevicePropertyChangeHandler.java | 2 +- .../handler/DevicePropertyMqttMessageHandler.java | 2 +- .../communication/service/BaseCommunication.java | 28 +++ .../service/CommunicationServiceFactory.java | 36 ++++ .../iot/communication/service/MQTTService.java | 195 ++++++++++++++++++++ .../maku/iot/communication/service/TCPService.java | 45 +++++ .../tcp/config/NettyClientConfig.java | 55 ++++++ .../tcp/config/NettyClientStartupConfig.java | 28 +++ .../tcp/config/NettyServerConfig.java | 85 +++++++++ .../iot/communication/tcp/config/TcpClient.java | 42 +++++ .../tcp/factory/TcpMessageHandlerFactory.java | 47 +++++ .../tcp/handler/ConnectionHandler.java | 82 +++++++++ .../DeviceCommandResponseTCPMessageHandler.java | 21 +++ .../handler/DevicePropertyTCPMessageHandler.java | 21 +++ .../tcp/handler/TCPMessageHandler.java | 26 +++ .../net/maku/iot/service/IotDeviceService.java | 4 +- .../iot/service/impl/IotDeviceServiceImpl.java | 8 +- 35 files changed, 969 insertions(+), 531 deletions(-) delete mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/BaseCommunication.java delete mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/CommunicationServiceFactory.java delete mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/MQTTService.java delete mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/TCPService.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/BaseCommandResponseDTO.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/BaseDeviceID.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/DeviceCommandDTO.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/DeviceCommandResponseDTO.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/DevicePropertyDTO.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/TcpMsgDTO.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/chan/CommandResponseChan.java delete mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/BaseCommandResponse.java delete mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/CommandResponseChan.java delete mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DeviceCommandDTO.java delete mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DeviceCommandResponseDTO.java delete mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DevicePropertyDTO.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/BaseCommunication.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/CommunicationServiceFactory.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/MQTTService.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/TCPService.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyClientConfig.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyClientStartupConfig.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyServerConfig.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/TcpClient.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/factory/TcpMessageHandlerFactory.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/ConnectionHandler.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/DeviceCommandResponseTCPMessageHandler.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/DevicePropertyTCPMessageHandler.java create mode 100644 maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/TCPMessageHandler.java diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/BaseCommunication.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/BaseCommunication.java deleted file mode 100644 index ef40e9f..0000000 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/BaseCommunication.java +++ /dev/null @@ -1,28 +0,0 @@ -package net.maku.iot.communication; - -import net.maku.iot.entity.IotDeviceEntity; -import net.maku.iot.enums.DeviceCommandEnum; -import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO; - -/** - * 基础通信协议具备功能 - */ -public interface BaseCommunication { - - // 异步发送指令,不等待设备响应 - String asyncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload); - - //同步发送指定,等待设备响应 - DeviceCommandResponseDTO syncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload); - - //同步发送指定,等待设备响应,调试实现 - DeviceCommandResponseDTO syncSendCommandDebug(IotDeviceEntity device, DeviceCommandEnum command, String payload); - - //模拟设备属性上报 - void simulateDeviceReportAttributeData(IotDeviceEntity device, String payload); - - //模拟设备服务指令响应数据 - void simulateDeviceCommandResponseAttributeData(IotDeviceEntity device, String payload); - - -} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/CommunicationServiceFactory.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/CommunicationServiceFactory.java deleted file mode 100644 index c4535e1..0000000 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/CommunicationServiceFactory.java +++ /dev/null @@ -1,36 +0,0 @@ -package net.maku.iot.communication; - -import lombok.AllArgsConstructor; -import net.maku.framework.common.exception.ServerException; -import org.springframework.stereotype.Service; - -/** - * @Description TODO - * @Author LSF - * @Date 2024/8/9 14:53 - */ -@Service -@AllArgsConstructor -public class CommunicationServiceFactory { - - private final MQTTService mqttService; - private final TCPService tcpService; - - public BaseCommunication getProtocol(String protocolType) { - if (protocolType == null) { - new ServerException("协议不存在!"); - } - switch (protocolType) { - case "MQTT": - return mqttService; - case "TCP": - return tcpService; -// case "Modbus": -// return tcpService; - default: - return null; - } - } - - -} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/MQTTService.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/MQTTService.java deleted file mode 100644 index 462dc64..0000000 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/MQTTService.java +++ /dev/null @@ -1,198 +0,0 @@ -package net.maku.iot.communication; - -import cn.hutool.core.util.StrUtil; -import cn.hutool.json.JSONUtil; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import net.maku.framework.common.exception.ServerException; -import net.maku.iot.communication.mqtt.MqttGateway; -import net.maku.iot.communication.mqtt.dto.CommandResponseChan; -import net.maku.iot.communication.mqtt.dto.DeviceCommandDTO; -import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO; -import net.maku.iot.dto.DeviceClientDTO; -import net.maku.iot.entity.IotDeviceEntity; -import net.maku.iot.enums.DeviceCommandEnum; -import net.maku.iot.enums.DeviceTopicEnum; -import net.maku.iot.service.IotDeviceServiceLogService; -import org.springframework.stereotype.Component; - -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Exchanger; - -/** - * @Description TODO - * @Author LSF - * @Date 2024/8/9 14:21 - */ -@Slf4j -@Component -@RequiredArgsConstructor -public class MQTTService implements BaseCommunication { - - private final MqttGateway mqttGateway; - private final IotDeviceServiceLogService iotDeviceEventLogService; - - /** - * 异步发送命令,返回命令id - * - * @param device - * @param command - * @param payload - * @return - */ - @Override - public String asyncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload) { - return asyncSendCommand(device, command, payload, Boolean.FALSE); - } - - /** - * 异步发送命令,返回命令id - * - * @param device - * @param command - * @param payload - * @param retained - * @return - */ - public String asyncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload, boolean retained) { - // 构建命令对象 - String commandId = StrUtil.replaceChars(UUID.randomUUID().toString(), "-", ""); - DeviceCommandDTO commandDTO = new DeviceCommandDTO(); - commandDTO.setCommand(command); - commandDTO.setId(commandId); - commandDTO.setPayload(payload); - String commandTopic = DeviceTopicEnum.COMMAND.buildTopic(DeviceClientDTO.from(device)); - - // 发送命令到设备命令主题 - try { - mqttGateway.sendToMqtt(commandTopic, retained, JSONUtil.toJsonStr(commandDTO)); - } catch (Exception e) { - log.error(e.getMessage()); - throw new ServerException(StrUtil.format("发送'{}'命令:{} 到设备:{}-{}, Topic:{} 失败", - command.getTitle(), commandId, device.getCode(), device.getName(), commandTopic)); - } - log.info("发送'{}'命令:{} 到设备:{}-{}, Topic:{} 成功", command.getTitle(), commandId, device.getCode(), device.getName(), commandTopic); - iotDeviceEventLogService.createAndSaveDeviceServiceLog(device.getId(), device.getTenantId(), command, commandId, payload); - return commandId; - } - - /** - * 同步发送命令并返回响应结果 - * - * @param device - * @param command - * @param payload - * @return - */ - public DeviceCommandResponseDTO syncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload) { - return syncSendCommand(device, command, payload, Boolean.FALSE); - } - - /** - * 发送命令并返回响应结果 - * - * @param device - * @param command - * @param payload - * @param retained - * @return - */ - public DeviceCommandResponseDTO syncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload, boolean retained) { - // 构建并发送命令 - String commandId = asyncSendCommand(device, command, payload, retained); - // 等待返回结果 - Object receiver = CommandResponseChan.getInstance(commandId, true).get(commandId); - if (receiver == null) { - throw new ServerException(StrUtil.format("{}设备没有回复", device.getName())); - } - return (DeviceCommandResponseDTO) receiver; - } - - /** - * 发送命令并返回响应结果,模拟设备响应 - * - * @param device - * @param command - * @param payload - * @return - */ - public DeviceCommandResponseDTO syncSendCommandDebug(IotDeviceEntity device, DeviceCommandEnum command, String payload) { - // 构建并发送命令 - String commandId = asyncSendCommand(device, command, payload); - - // 2秒后模拟设备响应 - new Thread(() -> { - try { - //模拟设备正常响应 - Thread.sleep(2000); - //模拟设备超时响应 - //Thread.sleep(15000); - DeviceCommandResponseDTO simulateResponseDto = new DeviceCommandResponseDTO(); - simulateResponseDto.setCommandId(commandId); - simulateResponseDto.setResponsePayload(command.getTitle() + ",设备执行成功!"); - simulateResponseDto.setCommand(command); - simulateDeviceCommandResponseAttributeData(device, JSONUtil.toJsonStr(simulateResponseDto)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.error("模拟设备响应线程被中断", e); - } - }).start(); - - // 等待返回结果 - Object receiver = CommandResponseChan.getInstance(commandId, true).get(commandId); - if (receiver == null) { - throw new ServerException(StrUtil.format("{}设备没有回复", device.getName())); - } - return (DeviceCommandResponseDTO) receiver; - } - - /** - * 模拟设备属性上报 - */ - @Override - public void simulateDeviceReportAttributeData(IotDeviceEntity device, String payload) { - // 封装 设备属性上报的 topic - String commandTopic = DeviceTopicEnum.PROPERTY.buildTopic(DeviceClientDTO.from(device)); - try { - mqttGateway.sendToMqtt(commandTopic, payload); - } catch (Exception e) { - log.error(e.getMessage()); - throw new ServerException(StrUtil.format("模拟设备:{}-{},模拟属性上报失败! Topic:{} ", - device.getCode(), device.getName(), commandTopic)); - } - } - - - /** - * 模拟设备服务指令响应数据 - * - * @param device - * @param payload - */ - @Override - public void simulateDeviceCommandResponseAttributeData(IotDeviceEntity device, String payload) { - // 封装 设备命令执行结果的 topic - String commandTopic = DeviceTopicEnum.COMMAND_RESPONSE.buildTopic(DeviceClientDTO.from(device)); - try { - mqttGateway.sendToMqtt(commandTopic, payload); - } catch (Exception e) { - log.error(e.getMessage()); - throw new ServerException(StrUtil.format("模拟设备:{}-{},模拟发送命令执行结果失败! Topic:{} ", - device.getCode(), device.getName(), commandTopic)); - } - } - - /** - * 设备命令响应处理,把设备响应结果放入通道中 - * - * @param commandResponse 设备命令响应 - */ - public void commandReplied(DeviceCommandResponseDTO commandResponse) { - CommandResponseChan commandResponseChan = CommandResponseChan.getInstance(commandResponse.getCommandId(), false); - commandResponseChan.put(commandResponse); - } - - -} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/TCPService.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/TCPService.java deleted file mode 100644 index 8ee0d71..0000000 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/TCPService.java +++ /dev/null @@ -1,45 +0,0 @@ -package net.maku.iot.communication; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import net.maku.iot.entity.IotDeviceEntity; -import net.maku.iot.enums.DeviceCommandEnum; -import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO; -import org.springframework.stereotype.Component; - -/** - * @Description TODO - * @Author LSF - * @Date 2024/8/9 14:21 - */ -@Slf4j -@Component -@RequiredArgsConstructor -public class TCPService implements BaseCommunication { - - - @Override - public String asyncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload) { - return ""; - } - - @Override - public DeviceCommandResponseDTO syncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload) { - return null; - } - - @Override - public DeviceCommandResponseDTO syncSendCommandDebug(IotDeviceEntity device, DeviceCommandEnum command, String payload) { - return null; - } - - @Override - public void simulateDeviceReportAttributeData(IotDeviceEntity device, String payload) { - return; - } - - @Override - public void simulateDeviceCommandResponseAttributeData(IotDeviceEntity device, String payload) { - return; - } -} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/BaseCommandResponseDTO.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/BaseCommandResponseDTO.java new file mode 100644 index 0000000..db67d94 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/BaseCommandResponseDTO.java @@ -0,0 +1,18 @@ +package net.maku.iot.communication.dto; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +/** + * 响应消息基础类 + * + * @author eden on 2024/6/17 + */ +@Data +public class BaseCommandResponseDTO extends BaseDeviceID { + /** + * 命令ID + */ + @Schema(description = "命令ID", required = true) + protected String commandId; +} \ No newline at end of file diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/BaseDeviceID.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/BaseDeviceID.java new file mode 100644 index 0000000..927290a --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/BaseDeviceID.java @@ -0,0 +1,17 @@ +package net.maku.iot.communication.dto; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +/** + * @Description TODO + * @Author LSF + * @Date 2024/8/14 17:24 + */ +@Data +@Schema(description = "设备ID") +public class BaseDeviceID { + + @Schema(description = "设备ID") + protected String deviceID; +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/DeviceCommandDTO.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/DeviceCommandDTO.java new file mode 100644 index 0000000..7a921ff --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/DeviceCommandDTO.java @@ -0,0 +1,32 @@ +package net.maku.iot.communication.dto; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; +import net.maku.iot.enums.DeviceCommandEnum; + +/** + * 设备命令对象 + * + * @author LSF maku_lsf@163.com + */ +@Data +@Schema(description = "设备命令对象") +public class DeviceCommandDTO extends BaseDeviceID { + /** + * 命令类型 + */ + @Schema(description = "命令类型", required = true) + private DeviceCommandEnum command; + + /** + * 命令id + */ + @Schema(description = "命令id", required = true) + private String id; + + /** + * 命令内容 + */ + @Schema(description = "命令内容") + private String payload; +} \ No newline at end of file diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/DeviceCommandResponseDTO.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/DeviceCommandResponseDTO.java new file mode 100644 index 0000000..1793647 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/DeviceCommandResponseDTO.java @@ -0,0 +1,41 @@ +package net.maku.iot.communication.dto; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; +import lombok.EqualsAndHashCode; +import net.maku.iot.enums.DeviceCommandEnum; + +/** + * 设备命令响应DTO + * + * @author LSF maku_lsf@163.com + */ +@EqualsAndHashCode(callSuper = true) +@Data +@Schema(description = "设备命令响应DTO") +@JsonIgnoreProperties(ignoreUnknown = true) +public class DeviceCommandResponseDTO extends BaseCommandResponseDTO { + /** + * 命令类型 + */ + @Schema(description = "命令类型", required = true) + private DeviceCommandEnum command; + + /** + * 命令是否完成(默认true:命令已完成;false:命令未完成,后续命令完成将再次发送响应消息,服务端将继续等待该命令完成的响应) + */ + @Schema(description = "命令是否完成(默认true:命令已完成;false:命令未完成,后续命令完成将再次发送响应消息,服务端将继续等待该命令完成的响应)") + private boolean isCompleted = true; + + /** + * 响应状态码,0成功,其它数值异常,根据业务需要自定义 + */ + @Schema(description = "响应状态码,0成功,其它数值异常,根据业务需要自定义") + private Integer statusCode = 0; + /** + * 命令响应结果 + */ + @Schema(description = "命令响应结果") + private String responsePayload; +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/DevicePropertyDTO.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/DevicePropertyDTO.java new file mode 100644 index 0000000..e43249e --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/DevicePropertyDTO.java @@ -0,0 +1,26 @@ +package net.maku.iot.communication.dto; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; +import net.maku.iot.enums.DevicePropertyEnum; + +/** + * 设备属性对象 + * + * @author LSF maku_lsf@163.com + */ +@Data +@Schema(description = "设备属性对象") +public class DevicePropertyDTO extends BaseDeviceID { + /** + * 设备属性类型 + */ + @Schema(description = "设备属性类型") + private DevicePropertyEnum propertyType; + + /** + * 属性数据 + */ + @Schema(description = "状态数据,不同状态类型需传入相应的状态数据", required = true) + private String payload; +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/TcpMsgDTO.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/TcpMsgDTO.java new file mode 100644 index 0000000..b2fdf66 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/TcpMsgDTO.java @@ -0,0 +1,16 @@ +package net.maku.iot.communication.dto; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +/** + * @Description TODO + * @Author LSF + * @Date 2024/8/14 19:31 + */ +@Data +@Schema(description = "tcp通讯数据包装类") +public class TcpMsgDTO { + private String topic; + private Object msg; +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/chan/CommandResponseChan.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/chan/CommandResponseChan.java new file mode 100644 index 0000000..f83159d --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/chan/CommandResponseChan.java @@ -0,0 +1,97 @@ +package net.maku.iot.communication.mqtt.chan; + +import lombok.extern.slf4j.Slf4j; +import net.maku.iot.communication.dto.BaseCommandResponseDTO; + +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * 数据生产消费者通道 + */ +@Slf4j +public class CommandResponseChan { + + // 存储通道的 ConcurrentHashMap + private static final ConcurrentHashMap CHANNEL = new ConcurrentHashMap<>(); + + private final CompletableFuture future = new CompletableFuture<>(); + + private final Long DEFAULT_WAIT_MILLISECONDS = 5 * 1000L; + + // 私有构造函数,不允许外部直接实例化 + private CommandResponseChan() { + } + + /** + * 获取或创建通道实例 + * + * @param commandId 通道标识 + * @param isNeedCreate 是否需要创建新的通道实例 + * @return 通道实例 + */ + public static CommandResponseChan getInstance(String commandId, boolean isNeedCreate) { + if (!isNeedCreate) { + return CHANNEL.get(commandId); + } + return CHANNEL.computeIfAbsent(commandId, k -> new CommandResponseChan()); + } + + /** + * 从通道中获取数据,默认超时时间为 5 秒 + * + * @param commandId 通道标识 + * @return 获取的数据,如果超时返回 null + */ + public BaseCommandResponseDTO get(String commandId) { + return get(commandId, DEFAULT_WAIT_MILLISECONDS); + } + + /** + * 从通道中获取数据,支持超时设置 + * + * @param commandId 通道标识 + * @param timeout 超时时间(毫秒) + * @return 获取的数据,如果超时返回 null + */ + public BaseCommandResponseDTO get(String commandId, long timeout) { + CommandResponseChan channel = CHANNEL.get(commandId); + if (Objects.isNull(channel)) { + return null; + } + try { + return channel.future.get(timeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + // 超时异常处理 + log.error("Device response timeout. {}", commandId); + return null; + } catch (Exception e) { + // 其他异常处理 + e.printStackTrace(); + return null; + } finally { + // 确保在获取数据后移除通道 + CHANNEL.remove(commandId, channel); + } + } + + /** + * 向通道中放入数据,并唤醒可能正在等待数据的线程 + * + * @param response 要放入的数据 + */ + public void put(BaseCommandResponseDTO response) { + String commandId = response.getCommandId(); + if (commandId == null) { + return; + } + CommandResponseChan channel = CHANNEL.get(commandId); + if (Objects.isNull(channel)) { + return; + } + channel.future.complete(response); + } +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/BaseCommandResponse.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/BaseCommandResponse.java deleted file mode 100644 index 719fdd5..0000000 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/BaseCommandResponse.java +++ /dev/null @@ -1,18 +0,0 @@ -package net.maku.iot.communication.mqtt.dto; - -import io.swagger.v3.oas.annotations.media.Schema; -import lombok.Data; - -/** - * 响应消息基础类 - * - * @author eden on 2024/6/17 - */ -@Data -public class BaseCommandResponse { - /** - * 命令ID - */ - @Schema(description = "命令ID", required = true) - protected String commandId; -} \ No newline at end of file diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/CommandResponseChan.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/CommandResponseChan.java deleted file mode 100644 index 1896613..0000000 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/CommandResponseChan.java +++ /dev/null @@ -1,96 +0,0 @@ -package net.maku.iot.communication.mqtt.dto; - -import lombok.extern.slf4j.Slf4j; - -import java.util.Objects; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * 数据生产消费者通道 - */ -@Slf4j -public class CommandResponseChan { - - // 存储通道的 ConcurrentHashMap - private static final ConcurrentHashMap CHANNEL = new ConcurrentHashMap<>(); - - private final CompletableFuture future = new CompletableFuture<>(); - - private final Long DEFAULT_WAIT_MILLISECONDS = 5 * 1000L; - - // 私有构造函数,不允许外部直接实例化 - private CommandResponseChan() { - } - - /** - * 获取或创建通道实例 - * - * @param commandId 通道标识 - * @param isNeedCreate 是否需要创建新的通道实例 - * @return 通道实例 - */ - public static CommandResponseChan getInstance(String commandId, boolean isNeedCreate) { - if (!isNeedCreate) { - return CHANNEL.get(commandId); - } - return CHANNEL.computeIfAbsent(commandId, k -> new CommandResponseChan()); - } - - /** - * 从通道中获取数据,默认超时时间为 5 秒 - * - * @param commandId 通道标识 - * @return 获取的数据,如果超时返回 null - */ - public BaseCommandResponse get(String commandId) { - return get(commandId, DEFAULT_WAIT_MILLISECONDS); - } - - /** - * 从通道中获取数据,支持超时设置 - * - * @param commandId 通道标识 - * @param timeout 超时时间(毫秒) - * @return 获取的数据,如果超时返回 null - */ - public BaseCommandResponse get(String commandId, long timeout) { - CommandResponseChan channel = CHANNEL.get(commandId); - if (Objects.isNull(channel)) { - return null; - } - try { - return channel.future.get(timeout, TimeUnit.MILLISECONDS); - } catch (TimeoutException e) { - // 超时异常处理 - log.error("Device response timeout. {}", commandId); - return null; - } catch (Exception e) { - // 其他异常处理 - e.printStackTrace(); - return null; - } finally { - // 确保在获取数据后移除通道 - CHANNEL.remove(commandId, channel); - } - } - - /** - * 向通道中放入数据,并唤醒可能正在等待数据的线程 - * - * @param response 要放入的数据 - */ - public void put(BaseCommandResponse response) { - String commandId = response.getCommandId(); - if (commandId == null) { - return; - } - CommandResponseChan channel = CHANNEL.get(commandId); - if (Objects.isNull(channel)) { - return; - } - channel.future.complete(response); - } -} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DeviceCommandDTO.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DeviceCommandDTO.java deleted file mode 100644 index e8154de..0000000 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DeviceCommandDTO.java +++ /dev/null @@ -1,32 +0,0 @@ -package net.maku.iot.communication.mqtt.dto; - -import io.swagger.v3.oas.annotations.media.Schema; -import lombok.Data; -import net.maku.iot.enums.DeviceCommandEnum; - -/** - * 设备命令对象 - * - * @author LSF maku_lsf@163.com - */ -@Data -@Schema(description = "设备命令对象") -public class DeviceCommandDTO { - /** - * 命令类型 - */ - @Schema(description = "命令类型", required = true) - private DeviceCommandEnum command; - - /** - * 命令id - */ - @Schema(description = "命令id", required = true) - private String id; - - /** - * 命令内容 - */ - @Schema(description = "命令内容") - private String payload; -} \ No newline at end of file diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DeviceCommandResponseDTO.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DeviceCommandResponseDTO.java deleted file mode 100644 index eb762f9..0000000 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DeviceCommandResponseDTO.java +++ /dev/null @@ -1,41 +0,0 @@ -package net.maku.iot.communication.mqtt.dto; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import io.swagger.v3.oas.annotations.media.Schema; -import lombok.Data; -import lombok.EqualsAndHashCode; -import net.maku.iot.enums.DeviceCommandEnum; - -/** - * 设备命令响应DTO - * - * @author LSF maku_lsf@163.com - */ -@EqualsAndHashCode(callSuper = true) -@Data -@Schema(description = "设备命令响应DTO") -@JsonIgnoreProperties(ignoreUnknown = true) -public class DeviceCommandResponseDTO extends BaseCommandResponse { - /** - * 命令类型 - */ - @Schema(description = "命令类型", required = true) - private DeviceCommandEnum command; - - /** - * 命令是否完成(默认true:命令已完成;false:命令未完成,后续命令完成将再次发送响应消息,服务端将继续等待该命令完成的响应) - */ - @Schema(description = "命令是否完成(默认true:命令已完成;false:命令未完成,后续命令完成将再次发送响应消息,服务端将继续等待该命令完成的响应)") - private boolean isCompleted = true; - - /** - * 响应状态码,0成功,其它数值异常,根据业务需要自定义 - */ - @Schema(description = "响应状态码,0成功,其它数值异常,根据业务需要自定义") - private Integer statusCode = 0; - /** - * 命令响应结果 - */ - @Schema(description = "命令响应结果") - private String responsePayload; -} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DevicePropertyDTO.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DevicePropertyDTO.java deleted file mode 100644 index fed63de..0000000 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DevicePropertyDTO.java +++ /dev/null @@ -1,26 +0,0 @@ -package net.maku.iot.communication.mqtt.dto; - -import io.swagger.v3.oas.annotations.media.Schema; -import lombok.Data; -import net.maku.iot.enums.DevicePropertyEnum; - -/** - * 设备属性对象 - * - * @author LSF maku_lsf@163.com - */ -@Data -@Schema(description = "设备属性对象") -public class DevicePropertyDTO { - /** - * 设备属性类型 - */ - @Schema(description = "设备属性类型") - private DevicePropertyEnum propertyType; - - /** - * 属性数据 - */ - @Schema(description = "状态数据,不同状态类型需传入相应的状态数据", required = true) - private String payload; -} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseHandler.java index 3af2705..76bb221 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseHandler.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseHandler.java @@ -1,7 +1,7 @@ package net.maku.iot.communication.mqtt.handler; -import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO; +import net.maku.iot.communication.dto.DeviceCommandResponseDTO; /** * 设备命令响应处理器 diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseMqttMessageHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseMqttMessageHandler.java index 5389920..5cae0e2 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseMqttMessageHandler.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseMqttMessageHandler.java @@ -4,8 +4,8 @@ import cn.hutool.core.util.StrUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import net.maku.framework.common.utils.JsonUtils; -import net.maku.iot.communication.MQTTService; -import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO; +import net.maku.iot.communication.service.MQTTService; +import net.maku.iot.communication.dto.DeviceCommandResponseDTO; import net.maku.iot.communication.mqtt.factory.DeviceCommandResponseHandlerFactory; import net.maku.iot.enums.DeviceTopicEnum; import org.springframework.stereotype.Component; diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyChangeHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyChangeHandler.java index 3822fe7..17eee72 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyChangeHandler.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyChangeHandler.java @@ -1,7 +1,7 @@ package net.maku.iot.communication.mqtt.handler; -import net.maku.iot.communication.mqtt.dto.DevicePropertyDTO; +import net.maku.iot.communication.dto.DevicePropertyDTO; /** * 设备属性变化处理器 diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyMqttMessageHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyMqttMessageHandler.java index f8416f2..1299857 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyMqttMessageHandler.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyMqttMessageHandler.java @@ -4,7 +4,7 @@ import cn.hutool.core.util.StrUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import net.maku.framework.common.utils.JsonUtils; -import net.maku.iot.communication.mqtt.dto.DevicePropertyDTO; +import net.maku.iot.communication.dto.DevicePropertyDTO; import net.maku.iot.communication.mqtt.factory.DevicePropertyChangeHandlerFactory; import net.maku.iot.enums.DeviceTopicEnum; import org.springframework.stereotype.Component; diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/BaseCommunication.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/BaseCommunication.java new file mode 100644 index 0000000..15882ac --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/BaseCommunication.java @@ -0,0 +1,28 @@ +package net.maku.iot.communication.service; + +import net.maku.iot.entity.IotDeviceEntity; +import net.maku.iot.enums.DeviceCommandEnum; +import net.maku.iot.communication.dto.DeviceCommandResponseDTO; + +/** + * 基础通信协议具备功能 + */ +public interface BaseCommunication { + + // 异步发送指令,不等待设备响应 + String asyncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload); + + //同步发送指定,等待设备响应 + DeviceCommandResponseDTO syncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload); + + //同步发送指定,等待设备响应,调试实现 + DeviceCommandResponseDTO syncSendCommandDebug(IotDeviceEntity device, DeviceCommandEnum command, String payload); + + //模拟设备属性上报 + void simulateDeviceReportAttributeData(IotDeviceEntity device, String payload); + + //模拟设备服务指令响应数据 + void simulateDeviceCommandResponseAttributeData(IotDeviceEntity device, String payload); + + +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/CommunicationServiceFactory.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/CommunicationServiceFactory.java new file mode 100644 index 0000000..4d34019 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/CommunicationServiceFactory.java @@ -0,0 +1,36 @@ +package net.maku.iot.communication.service; + +import lombok.AllArgsConstructor; +import net.maku.framework.common.exception.ServerException; +import org.springframework.stereotype.Service; + +/** + * @Description TODO + * @Author LSF + * @Date 2024/8/9 14:53 + */ +@Service +@AllArgsConstructor +public class CommunicationServiceFactory { + + private final MQTTService mqttService; + private final TCPService tcpService; + + public BaseCommunication getProtocol(String protocolType) { + if (protocolType == null) { + new ServerException("协议不存在!"); + } + switch (protocolType) { + case "MQTT": + return mqttService; + case "TCP": + return tcpService; +// case "Modbus": +// return tcpService; + default: + return null; + } + } + + +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/MQTTService.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/MQTTService.java new file mode 100644 index 0000000..ee91daa --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/MQTTService.java @@ -0,0 +1,195 @@ +package net.maku.iot.communication.service; + +import cn.hutool.core.util.StrUtil; +import cn.hutool.json.JSONUtil; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import net.maku.framework.common.exception.ServerException; +import net.maku.iot.communication.mqtt.MqttGateway; +import net.maku.iot.communication.mqtt.chan.CommandResponseChan; +import net.maku.iot.communication.dto.DeviceCommandDTO; +import net.maku.iot.communication.dto.DeviceCommandResponseDTO; +import net.maku.iot.dto.DeviceClientDTO; +import net.maku.iot.entity.IotDeviceEntity; +import net.maku.iot.enums.DeviceCommandEnum; +import net.maku.iot.enums.DeviceTopicEnum; +import net.maku.iot.service.IotDeviceServiceLogService; +import org.springframework.stereotype.Component; + +import java.util.UUID; + +/** + * @Description TODO + * @Author LSF + * @Date 2024/8/9 14:21 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class MQTTService implements BaseCommunication { + + private final MqttGateway mqttGateway; + private final IotDeviceServiceLogService iotDeviceEventLogService; + + /** + * 异步发送命令,返回命令id + * + * @param device + * @param command + * @param payload + * @return + */ + @Override + public String asyncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload) { + return asyncSendCommand(device, command, payload, Boolean.FALSE); + } + + /** + * 异步发送命令,返回命令id + * + * @param device + * @param command + * @param payload + * @param retained + * @return + */ + public String asyncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload, boolean retained) { + // 构建命令对象 + String commandId = StrUtil.replaceChars(UUID.randomUUID().toString(), "-", ""); + DeviceCommandDTO commandDTO = new DeviceCommandDTO(); + commandDTO.setCommand(command); + commandDTO.setId(commandId); + commandDTO.setPayload(payload); + String commandTopic = DeviceTopicEnum.COMMAND.buildTopic(DeviceClientDTO.from(device)); + + // 发送命令到设备命令主题 + try { + mqttGateway.sendToMqtt(commandTopic, retained, JSONUtil.toJsonStr(commandDTO)); + } catch (Exception e) { + log.error(e.getMessage()); + throw new ServerException(StrUtil.format("发送'{}'命令:{} 到设备:{}-{}, Topic:{} 失败", + command.getTitle(), commandId, device.getCode(), device.getName(), commandTopic)); + } + log.info("发送'{}'命令:{} 到设备:{}-{}, Topic:{} 成功", command.getTitle(), commandId, device.getCode(), device.getName(), commandTopic); + iotDeviceEventLogService.createAndSaveDeviceServiceLog(device.getId(), device.getTenantId(), command, commandId, payload); + return commandId; + } + + /** + * 同步发送命令并返回响应结果 + * + * @param device + * @param command + * @param payload + * @return + */ + public DeviceCommandResponseDTO syncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload) { + return syncSendCommand(device, command, payload, Boolean.FALSE); + } + + /** + * 发送命令并返回响应结果 + * + * @param device + * @param command + * @param payload + * @param retained + * @return + */ + public DeviceCommandResponseDTO syncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload, boolean retained) { + // 构建并发送命令 + String commandId = asyncSendCommand(device, command, payload, retained); + // 等待返回结果 + Object receiver = CommandResponseChan.getInstance(commandId, true).get(commandId); + if (receiver == null) { + throw new ServerException(StrUtil.format("{}设备没有回复", device.getName())); + } + return (DeviceCommandResponseDTO) receiver; + } + + /** + * 发送命令并返回响应结果,模拟设备响应 + * + * @param device + * @param command + * @param payload + * @return + */ + public DeviceCommandResponseDTO syncSendCommandDebug(IotDeviceEntity device, DeviceCommandEnum command, String payload) { + // 构建并发送命令 + String commandId = asyncSendCommand(device, command, payload); + + // 2秒后模拟设备响应 + new Thread(() -> { + try { + //模拟设备正常响应 + Thread.sleep(2000); + //模拟设备超时响应 + //Thread.sleep(15000); + DeviceCommandResponseDTO simulateResponseDto = new DeviceCommandResponseDTO(); + simulateResponseDto.setCommandId(commandId); + simulateResponseDto.setResponsePayload(command.getTitle() + ",设备执行成功!"); + simulateResponseDto.setCommand(command); + simulateDeviceCommandResponseAttributeData(device, JSONUtil.toJsonStr(simulateResponseDto)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("模拟设备响应线程被中断", e); + } + }).start(); + + // 等待返回结果 + Object receiver = CommandResponseChan.getInstance(commandId, true).get(commandId); + if (receiver == null) { + throw new ServerException(StrUtil.format("{}设备没有回复", device.getName())); + } + return (DeviceCommandResponseDTO) receiver; + } + + /** + * 模拟设备属性上报 + */ + @Override + public void simulateDeviceReportAttributeData(IotDeviceEntity device, String payload) { + // 封装 设备属性上报的 topic + String commandTopic = DeviceTopicEnum.PROPERTY.buildTopic(DeviceClientDTO.from(device)); + try { + mqttGateway.sendToMqtt(commandTopic, payload); + } catch (Exception e) { + log.error(e.getMessage()); + throw new ServerException(StrUtil.format("模拟设备:{}-{},模拟属性上报失败! Topic:{} ", + device.getCode(), device.getName(), commandTopic)); + } + } + + + /** + * 模拟设备服务指令响应数据 + * + * @param device + * @param payload + */ + @Override + public void simulateDeviceCommandResponseAttributeData(IotDeviceEntity device, String payload) { + // 封装 设备命令执行结果的 topic + String commandTopic = DeviceTopicEnum.COMMAND_RESPONSE.buildTopic(DeviceClientDTO.from(device)); + try { + mqttGateway.sendToMqtt(commandTopic, payload); + } catch (Exception e) { + log.error(e.getMessage()); + throw new ServerException(StrUtil.format("模拟设备:{}-{},模拟发送命令执行结果失败! Topic:{} ", + device.getCode(), device.getName(), commandTopic)); + } + } + + /** + * 设备命令响应处理,把设备响应结果放入通道中 + * + * @param commandResponse 设备命令响应 + */ + public void commandReplied(DeviceCommandResponseDTO commandResponse) { + CommandResponseChan commandResponseChan = CommandResponseChan.getInstance(commandResponse.getCommandId(), false); + commandResponseChan.put(commandResponse); + } + + +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/TCPService.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/TCPService.java new file mode 100644 index 0000000..452bc8c --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/TCPService.java @@ -0,0 +1,45 @@ +package net.maku.iot.communication.service; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import net.maku.iot.communication.dto.DeviceCommandResponseDTO; +import net.maku.iot.entity.IotDeviceEntity; +import net.maku.iot.enums.DeviceCommandEnum; +import org.springframework.stereotype.Component; + +/** + * @Description TODO + * @Author LSF + * @Date 2024/8/9 14:21 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class TCPService implements BaseCommunication { + + @Override + public String asyncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload) { +// nettyClientConfig.sendMessage("asdddddddddddddddddd"); + return ""; + } + + @Override + public DeviceCommandResponseDTO syncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload) { + return null; + } + + @Override + public DeviceCommandResponseDTO syncSendCommandDebug(IotDeviceEntity device, DeviceCommandEnum command, String payload) { + return null; + } + + @Override + public void simulateDeviceReportAttributeData(IotDeviceEntity device, String payload) { + return; + } + + @Override + public void simulateDeviceCommandResponseAttributeData(IotDeviceEntity device, String payload) { + return; + } +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyClientConfig.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyClientConfig.java new file mode 100644 index 0000000..bbf627c --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyClientConfig.java @@ -0,0 +1,55 @@ +package net.maku.iot.communication.tcp.config; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import net.maku.framework.common.utils.IpUtils; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Data +@Slf4j +@Configuration +public class NettyClientConfig { + + private ChannelHandlerContext ctx; + + + @Bean + public Bootstrap nettyClient() { + Bootstrap nettyClient = new Bootstrap(); + // 设置事件循环组(主线程组和从线程组) + nettyClient.group(new io.netty.channel.nio.NioEventLoopGroup()) + //指定使用 NioServerSocketChannel 作为服务器通道 + .channel(NioSocketChannel.class) + .option(ChannelOption.SO_KEEPALIVE, true) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + ch.pipeline().addLast(new StringDecoder(), new StringEncoder(), new SimpleChannelInboundHandler() { + @Override + protected void channelRead0(ChannelHandlerContext ctx, String msg) { + log.info("<------------------------ 客户端接收到: {}", msg); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + String msg = IpUtils.getHostName(); + log.info("------------------------> 发送消息到服务端: 我是 {}", msg); + ctx.writeAndFlush(msg); + } + }); + } + }); + return nettyClient; + } + +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyClientStartupConfig.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyClientStartupConfig.java new file mode 100644 index 0000000..176b7f2 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyClientStartupConfig.java @@ -0,0 +1,28 @@ +package net.maku.iot.communication.tcp.config; + +import io.netty.bootstrap.Bootstrap; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationListener; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.event.ContextRefreshedEvent; + +@Configuration +@Slf4j +public class NettyClientStartupConfig implements ApplicationListener { + + @Autowired + private Bootstrap nettyClient; + + @Override + public void onApplicationEvent(ContextRefreshedEvent event) { + // 确保服务器启动完成后再启动客户端 + try { + Thread.sleep(5000); // 延迟5秒以确保服务器启动 + nettyClient.connect("127.0.0.1", 8888).sync(); + log.info("Connected to Netty server on port 8888"); + } catch (InterruptedException e) { + log.error("Failed to connect to Netty server", e); + } + } +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyServerConfig.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyServerConfig.java new file mode 100644 index 0000000..8b8a338 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyServerConfig.java @@ -0,0 +1,85 @@ +package net.maku.iot.communication.tcp.config; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import net.maku.iot.communication.mqtt.factory.MqttMessageHandlerFactory; +import net.maku.iot.communication.tcp.factory.TcpMessageHandlerFactory; +import net.maku.iot.communication.tcp.handler.ConnectionHandler; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + + +@Configuration +@Slf4j +public class NettyServerConfig { + + @Bean + public ConcurrentMap deviceChannels() { + return new ConcurrentHashMap<>(); + } + + @Bean + public ServerBootstrap nettyServer(ConcurrentMap deviceChannels) { + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup()) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + ch.pipeline().addLast( + new StringDecoder(), + new StringEncoder(), +// new DeviceMsgHandler(deviceChannels), // 添加设备身份处理器 + new ConnectionHandler(deviceChannels) // 添加设备连接处理器 + ); + } + }) + .option(ChannelOption.SO_BACKLOG, 128) + .childOption(ChannelOption.SO_KEEPALIVE, true); + return bootstrap; + } + + @Bean + public ChannelFuture serverChannelFuture(ServerBootstrap serverBootstrap) throws InterruptedException { + try { + ChannelFuture future = serverBootstrap.bind(8888).sync(); + log.info("------------------------ Netty 服务器在端口 8888 启动成功"); + return future; + } catch (Exception e) { + log.error("------------------------ Netty 服务器启动失败", e); + throw e; + } + } +} + + +// // 发送命令到设备 +// public void sendCommandToDevice(String deviceId, String command) { +// Channel channel = deviceChannels.get(deviceId); +// if (channel != null && channel.isActive()) { +// channel.writeAndFlush(Unpooled.copiedBuffer(command, CharsetUtil.UTF_8)); +// log.info("发送命令到设备 {}: {}", deviceId, command); +// } else { +// log.warn("设备 {} 不在线或通道无效", deviceId); +// } +// } +// +// // 假设有方法通过通道获取设备 ID +// private String getDeviceId(Channel channel) { +// // 这里应该有逻辑来从通道获取设备 ID +// return "deviceId"; +// } +//} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/TcpClient.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/TcpClient.java new file mode 100644 index 0000000..1fd0ee2 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/TcpClient.java @@ -0,0 +1,42 @@ +package net.maku.iot.communication.tcp.config; + +import cn.hutool.json.JSONUtil; +import net.maku.iot.communication.dto.DevicePropertyDTO; +import net.maku.iot.communication.dto.TcpMsgDTO; +import net.maku.iot.enums.DevicePropertyEnum; +import net.maku.iot.enums.DeviceTopicEnum; + +import java.io.OutputStream; +import java.io.PrintWriter; +import java.net.Socket; + +public class TcpClient { + public static void main(String[] args) { + String serverAddress = ""; // 服务端的地址 + int port = 8888; // 服务端的端口号 + + try (Socket socket = new Socket(serverAddress, port); + OutputStream outputStream = socket.getOutputStream(); + PrintWriter writer = new PrintWriter(outputStream, true)) { + + DevicePropertyDTO dto = new DevicePropertyDTO(); + dto.setDeviceID("123456"); + dto.setPropertyType(DevicePropertyEnum.TEMPERATURE); + dto.setPayload("60"); + + + TcpMsgDTO tcpMsgDTO = new TcpMsgDTO(); + tcpMsgDTO.setMsg(dto); + tcpMsgDTO.setTopic(DeviceTopicEnum.PROPERTY.getTopic()); + + + writer.println(JSONUtil.toJsonStr(tcpMsgDTO)); // 发送消息到服务端 + + System.out.println("Message sent: " + JSONUtil.toJsonStr(tcpMsgDTO)); + + Thread.sleep(100000); + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/factory/TcpMessageHandlerFactory.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/factory/TcpMessageHandlerFactory.java new file mode 100644 index 0000000..a15fcd0 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/factory/TcpMessageHandlerFactory.java @@ -0,0 +1,47 @@ +package net.maku.iot.communication.tcp.factory; + +import lombok.RequiredArgsConstructor; +import net.maku.iot.communication.tcp.handler.TCPMessageHandler; +import org.springframework.context.ApplicationContext; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** + * TCP消息处理器工厂,自动获取所有实现的处理器实例 + * + * @author LSF maku_lsf@163.com + */ +@Component +@RequiredArgsConstructor +public class TcpMessageHandlerFactory { + private final ApplicationContext applicationContext; + + /** + * 所有消息处理器 + */ + private List messageHandlers; + + private List loadHandlers() { + if (messageHandlers != null) { + return messageHandlers; + } + messageHandlers = new ArrayList<>(applicationContext.getBeansOfType(TCPMessageHandler.class).values()); + return messageHandlers; + } + + /** + * 获取与主题对应的tcp消息处理器 + * + * @param topic 主题 + * @return 处理器列表 + */ + public List getHandlersForTopic(String topic) { + return Collections.unmodifiableList(loadHandlers().stream() + .filter(handler -> handler.supports(topic)) + .collect(Collectors.toList())); + } +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/ConnectionHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/ConnectionHandler.java new file mode 100644 index 0000000..99053a2 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/ConnectionHandler.java @@ -0,0 +1,82 @@ +package net.maku.iot.communication.tcp.handler; + +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.util.AttributeKey; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import net.maku.framework.common.utils.JsonUtils; +import net.maku.iot.communication.dto.TcpMsgDTO; +import net.maku.iot.communication.tcp.factory.TcpMessageHandlerFactory; + +import java.util.concurrent.ConcurrentMap; + +/** + * @Description TODO + * @Author LSF + * @Date 2024/8/14 16:52 + */ +@Slf4j +public class ConnectionHandler extends ChannelInboundHandlerAdapter { + + + @Resource + private TcpMessageHandlerFactory tcpMessageHandlerFactory; + + private final ConcurrentMap deviceChannels; + + public ConnectionHandler(ConcurrentMap deviceChannels) { + this.deviceChannels = deviceChannels; + } + + + @Override + public void channelActive(ChannelHandlerContext ctx) { + // 请求设备发送其 ID + ctx.writeAndFlush("ACK"); + + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + if (msg!=null&& StrUtil.contains(msg.toString(),"topic")) { + // 处理 TCP 消息 + handleTcpMessage(ctx, JsonUtils.parseObject(msg.toString(), TcpMsgDTO.class)); + } else { + // 处理其他类型的消息 + log.warn("接收到未知的消息类型:{}", msg); + } + } + + + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + String deviceId = getDeviceId(ctx.channel()); + if (deviceId != null) { + deviceChannels.remove(deviceId); + } + log.info(" {} 断开连接", deviceId == null ? "未知设备" : deviceId); + } + + private void handleTcpMessage(ChannelHandlerContext ctx, TcpMsgDTO message) { + String topic = message.getTopic(); + if (topic != null) { + tcpMessageHandlerFactory.getHandlersForTopic(topic).forEach(handler -> { + handler.handle(topic, message.getMsg().toString()); + }); + } else { + log.warn("接收到主题为null的消息。"); + } + } + + + private String getDeviceId(Channel channel) { + // 从 Channel 的属性中获取设备 ID + return channel.attr(AttributeKey.valueOf("deviceId")).get(); + } +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/DeviceCommandResponseTCPMessageHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/DeviceCommandResponseTCPMessageHandler.java new file mode 100644 index 0000000..687212b --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/DeviceCommandResponseTCPMessageHandler.java @@ -0,0 +1,21 @@ +package net.maku.iot.communication.tcp.handler; + +import net.maku.iot.communication.mqtt.handler.MqttMessageHandler; +import net.maku.iot.enums.DeviceTopicEnum; + +/** + * @Description TODO + * @Author LSF + * @Date 2024/8/14 19:23 + */ +public class DeviceCommandResponseTCPMessageHandler implements MqttMessageHandler { + @Override + public boolean supports(String topic) { + return DeviceTopicEnum.startsWith(topic, DeviceTopicEnum.COMMAND_RESPONSE.getTopic()); + } + + @Override + public void handle(String topic, String message) { + //TCP设备响应处理 + } +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/DevicePropertyTCPMessageHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/DevicePropertyTCPMessageHandler.java new file mode 100644 index 0000000..23bf225 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/DevicePropertyTCPMessageHandler.java @@ -0,0 +1,21 @@ +package net.maku.iot.communication.tcp.handler; + +import net.maku.iot.communication.mqtt.handler.MqttMessageHandler; +import net.maku.iot.enums.DeviceTopicEnum; + +/** + * @Description TODO + * @Author LSF + * @Date 2024/8/14 19:24 + */ +public class DevicePropertyTCPMessageHandler implements MqttMessageHandler { + @Override + public boolean supports(String topic) { + return DeviceTopicEnum.startsWith(topic, DeviceTopicEnum.PROPERTY.getTopic()); + } + + @Override + public void handle(String topic, String message) { + //TCP设备属性上报处理 + } +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/TCPMessageHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/TCPMessageHandler.java new file mode 100644 index 0000000..900bc21 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/TCPMessageHandler.java @@ -0,0 +1,26 @@ +package net.maku.iot.communication.tcp.handler; + + +/** + * TCP消息处理接口 + * + * @author LSF maku_lsf@163.com + */ +public interface TCPMessageHandler { + + /** + * 是否支持处理指定的topic + * + * @param topic + * @return + */ + boolean supports(String topic); + + /** + * TCP消息处理接口 + * + * @param topic + * @param message + */ + void handle(String topic, String message); +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/IotDeviceService.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/IotDeviceService.java index 6588370..b2e0105 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/IotDeviceService.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/IotDeviceService.java @@ -3,9 +3,9 @@ package net.maku.iot.service; import net.maku.framework.common.utils.PageResult; import net.maku.framework.mybatis.service.BaseService; import net.maku.iot.entity.IotDeviceEntity; -import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO; +import net.maku.iot.communication.dto.DeviceCommandResponseDTO; import net.maku.iot.query.IotDeviceQuery; -import net.maku.iot.communication.BaseCommunication; +import net.maku.iot.communication.service.BaseCommunication; import net.maku.iot.vo.DeviceCommandResponseAttributeDataVO; import net.maku.iot.vo.DeviceCommandVO; import net.maku.iot.vo.DeviceReportAttributeDataVO; diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/impl/IotDeviceServiceImpl.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/impl/IotDeviceServiceImpl.java index 50fd92e..b9e48aa 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/impl/IotDeviceServiceImpl.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/impl/IotDeviceServiceImpl.java @@ -16,13 +16,13 @@ import net.maku.iot.convert.IotDeviceConvert; import net.maku.iot.dao.IotDeviceDao; import net.maku.iot.entity.IotDeviceEntity; import net.maku.iot.enums.*; -import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO; -import net.maku.iot.communication.mqtt.dto.DevicePropertyDTO; +import net.maku.iot.communication.dto.DeviceCommandResponseDTO; +import net.maku.iot.communication.dto.DevicePropertyDTO; import net.maku.iot.communication.mqtt.handler.DeviceCommandResponseHandler; import net.maku.iot.communication.mqtt.handler.DevicePropertyChangeHandler; import net.maku.iot.query.IotDeviceQuery; -import net.maku.iot.communication.BaseCommunication; -import net.maku.iot.communication.CommunicationServiceFactory; +import net.maku.iot.communication.service.BaseCommunication; +import net.maku.iot.communication.service.CommunicationServiceFactory; import net.maku.iot.service.IotDeviceEventLogService; import net.maku.iot.service.IotDeviceService; import net.maku.iot.vo.DeviceCommandResponseAttributeDataVO;