diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/MQTTService.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/MQTTService.java index c782212..50c654b 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/MQTTService.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/MQTTService.java @@ -176,7 +176,7 @@ public class MQTTService implements BaseCommunication { mqttGateway.sendToMqtt(commandTopic, payload); } catch (Exception e) { log.error(e.getMessage()); - throw new ServerException(StrUtil.format("模拟设备:{}-{},模拟发送命令执行结果失败! Topic:{} ", + throw new ServerException(StrUtil.format("模拟设备:{}-{},模拟命令执行结果上报失败! Topic:{} ", device.getCode(), device.getName(), commandTopic)); } } diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/TCPService.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/TCPService.java index 51c37ac..dc166f7 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/TCPService.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/TCPService.java @@ -5,10 +5,10 @@ import cn.hutool.json.JSONUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import net.maku.framework.common.exception.ServerException; +import net.maku.iot.communication.dto.CommandResponseChan; import net.maku.iot.communication.dto.DeviceCommandDTO; import net.maku.iot.communication.dto.DeviceCommandResponseDTO; -import net.maku.iot.communication.dto.CommandResponseChan; -import net.maku.iot.communication.mqtt.MqttGateway; +import net.maku.iot.communication.dto.DevicePropertyDTO; import net.maku.iot.communication.tcp.TcpGateway; import net.maku.iot.dto.DeviceClientDTO; import net.maku.iot.entity.IotDeviceEntity; @@ -30,7 +30,6 @@ import java.util.UUID; public class TCPService implements BaseCommunication { private final TcpGateway tcpGateway; - private final IotDeviceServiceLogService iotDeviceEventLogService; @Override @@ -40,6 +39,7 @@ public class TCPService implements BaseCommunication { DeviceCommandDTO commandDTO = new DeviceCommandDTO(); commandDTO.setCommand(command); commandDTO.setId(commandId); + commandDTO.setDeviceId(String.valueOf(device.getId())); commandDTO.setPayload(payload); String commandTopic = DeviceTopicEnum.COMMAND.buildTopic(DeviceClientDTO.from(device)); @@ -83,6 +83,7 @@ public class TCPService implements BaseCommunication { DeviceCommandResponseDTO simulateResponseDto = new DeviceCommandResponseDTO(); simulateResponseDto.setCommandId(commandId); simulateResponseDto.setResponsePayload(command.getTitle() + ",设备执行成功!"); + simulateResponseDto.setDeviceId(String.valueOf(device.getId())); simulateResponseDto.setCommand(command); simulateDeviceCommandResponseAttributeData(device, JSONUtil.toJsonStr(simulateResponseDto)); } catch (InterruptedException e) { @@ -104,7 +105,7 @@ public class TCPService implements BaseCommunication { // 封装 设备属性上报的 topic String commandTopic = DeviceTopicEnum.PROPERTY.buildTopic(DeviceClientDTO.from(device)); try { - tcpGateway.sendCommandToDevice(device.getId(),commandTopic, payload); + tcpGateway.simulateDeviceReport(device.getId(), commandTopic, payload, DevicePropertyDTO.class); } catch (Exception e) { log.error(e.getMessage()); throw new ServerException(StrUtil.format("模拟设备:{}-{},模拟属性上报失败! Topic:{} ", @@ -117,10 +118,10 @@ public class TCPService implements BaseCommunication { // 封装 设备命令执行结果的 topic String commandTopic = DeviceTopicEnum.COMMAND_RESPONSE.buildTopic(DeviceClientDTO.from(device)); try { - tcpGateway.sendCommandToDevice(device.getId(),commandTopic, payload); + tcpGateway.simulateDeviceReport(device.getId(), commandTopic, payload, DeviceCommandResponseDTO.class); } catch (Exception e) { log.error(e.getMessage()); - throw new ServerException(StrUtil.format("模拟设备:{}-{},模拟发送命令执行结果失败! Topic:{} ", + throw new ServerException(StrUtil.format("模拟设备:{}-{},模拟命令执行结果上报失败! Topic:{} ", device.getCode(), device.getName(), commandTopic)); } } diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/TcpGateway.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/TcpGateway.java index 24c7d29..d2fdaee 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/TcpGateway.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/TcpGateway.java @@ -1,23 +1,14 @@ package net.maku.iot.communication.tcp; -import cn.hutool.json.JSONUtil; -import io.netty.buffer.Unpooled; import io.netty.channel.Channel; -import io.netty.util.CharsetUtil; -import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import net.maku.framework.common.exception.ServerException; -import net.maku.iot.communication.mqtt.config.MqttConfig; -import net.maku.iot.communication.tcp.config.NettyServerConfig; +import net.maku.framework.common.utils.JsonUtils; +import net.maku.iot.communication.dto.DeviceCommandDTO; +import net.maku.iot.communication.dto.TcpMsgDTO; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.integration.annotation.MessagingGateway; -import org.springframework.integration.mqtt.support.MqttHeaders; -import org.springframework.integration.support.MessageBuilder; -import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.ConcurrentMap; /** @@ -34,27 +25,59 @@ public class TcpGateway { @Autowired public TcpGateway(ConcurrentMap deviceChannels) { - System.out.printf("-------------------------------->TcpGateway"); this.deviceChannels = deviceChannels; } /** + * 获取设备通道 + * + * @return + */ + public ConcurrentMap getTcpDeviceChannels() { + return deviceChannels; + } + + /** * 发送命令到设备 * @param deviceId 设备ID * @param commandTopic 命令主题 * @param payload 命令内容 */ public void sendCommandToDevice(Long deviceId, String commandTopic, String payload) { - Channel channel = deviceChannels.get(deviceId); + Channel channel = deviceChannels.get(deviceId.toString()); if (channel != null && channel.isActive()) { - Map payloadMap = new HashMap(); - payloadMap.put("topic", commandTopic); - payloadMap.put("payload", payload); + TcpMsgDTO tcpMsgDTO = new TcpMsgDTO(); + tcpMsgDTO.setTopic(commandTopic); + DeviceCommandDTO deviceCommandDTO = JsonUtils.parseObject(payload, DeviceCommandDTO.class); + deviceCommandDTO.setDeviceId(deviceId.toString()); + tcpMsgDTO.setMsg(deviceCommandDTO); - channel.writeAndFlush(Unpooled.copiedBuffer(JSONUtil.toJsonStr(payloadMap), CharsetUtil.UTF_8)); + String commandJson = JsonUtils.toJsonString(tcpMsgDTO); +// channel.writeAndFlush(commandJson); log.info("发送命令到设备 {}: {}", deviceId, payload); } else { throw new ServerException("设备"+deviceId+"不在线或通道无效"); } } + + public void simulateDeviceReport(Long deviceId, String commandTopic, String payload, Class reportDtoclazz) { + Channel channel = deviceChannels.get(deviceId.toString()); + if (channel != null && channel.isActive()) { + try { + TcpMsgDTO tcpMsgDTO = new TcpMsgDTO(); + tcpMsgDTO.setTopic(commandTopic); + tcpMsgDTO.setMsg(JsonUtils.parseObject(payload, reportDtoclazz)); + String devicePropertyJson = JsonUtils.toJsonString(tcpMsgDTO); + // 模拟上报,触发 channelRead 处理 + channel.pipeline().fireChannelRead(devicePropertyJson); + log.info("模拟设备 {} 上报数据: {}", deviceId, devicePropertyJson); + } catch (Exception e) { + log.error("模拟设备上报数据时出现错误", e); + } + } else { + throw new ServerException("设备 " + deviceId + " 不在线或通道无效"); + } + } + + } diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/ConnectionHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/ConnectionHandler.java index aaa8abb..f9be014 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/ConnectionHandler.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/ConnectionHandler.java @@ -15,15 +15,16 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; /** - * @Description TODO + * @Description TCP服务器连接处理器 * @Author LSF * @Date 2024/8/14 16:52 */ @Slf4j public class ConnectionHandler extends ChannelInboundHandlerAdapter { + public static final AttributeKey DEVICE_ID = AttributeKey.valueOf("DEVICE_ID"); - private final ConcurrentMap deviceChannels; + private ConcurrentMap deviceChannels; private final TcpMessageHandlerFactory tcpMessageHandlerFactory; public ConnectionHandler(ConcurrentMap deviceChannels,TcpMessageHandlerFactory tcpMessageHandlerFactory) { @@ -96,7 +97,8 @@ public class ConnectionHandler extends ChannelInboundHandlerAdapter { Matcher matcher = pattern.matcher(message.toString()); if (matcher.find()) { String deviceId = matcher.group(1); - setDeviceId(ctx.channel(), deviceId); +// setDeviceId(ctx.channel(), deviceId); + ctx.channel().attr(DEVICE_ID).set(deviceId); deviceChannels.put(deviceId, ctx.channel()); ctx.writeAndFlush("authenticate passed"); } @@ -114,17 +116,14 @@ public class ConnectionHandler extends ChannelInboundHandlerAdapter { } } } - return true; - } private String getDeviceId(Channel channel) { - // 从 Channel 的属性中获取设备 ID - return channel.attr(AttributeKey.valueOf("deviceId")).get(); + return channel.attr(DEVICE_ID).get(); } - private String setDeviceId(Channel channel, String deviceId) { - return channel.attr(AttributeKey.valueOf("deviceId")).setIfAbsent(deviceId); - } +// private String setDeviceId(Channel channel, String deviceId) { +// return channel.attr(AttributeKey.valueOf("deviceId")).setIfAbsent(deviceId); +// } } diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/DeviceCommandResponseTCPMessageHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/DeviceCommandResponseTCPMessageHandler.java index 1ae82d8..e8e00ef 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/DeviceCommandResponseTCPMessageHandler.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/DeviceCommandResponseTCPMessageHandler.java @@ -4,12 +4,8 @@ import cn.hutool.core.util.StrUtil; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import net.maku.framework.common.utils.JsonUtils; import net.maku.iot.communication.dto.DeviceCommandResponseDTO; -import net.maku.iot.communication.dto.DevicePropertyDTO; import net.maku.iot.communication.mqtt.factory.DeviceCommandResponseHandlerFactory; -import net.maku.iot.communication.mqtt.handler.MqttMessageHandler; -import net.maku.iot.communication.service.MQTTService; import net.maku.iot.communication.service.TCPService; import net.maku.iot.enums.DeviceTopicEnum; import org.springframework.stereotype.Component; diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/impl/IotDeviceServiceImpl.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/impl/IotDeviceServiceImpl.java index c532d11..0fc2e5e 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/impl/IotDeviceServiceImpl.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/impl/IotDeviceServiceImpl.java @@ -146,12 +146,14 @@ public class IotDeviceServiceImpl extends BaseServiceImpl