diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/BaseDeviceID.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/BaseDeviceID.java index 927290a..cf51887 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/BaseDeviceID.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/BaseDeviceID.java @@ -13,5 +13,5 @@ import lombok.Data; public class BaseDeviceID { @Schema(description = "设备ID") - protected String deviceID; + protected String deviceId; } diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/chan/CommandResponseChan.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/CommandResponseChan.java similarity index 96% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/chan/CommandResponseChan.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/CommandResponseChan.java index f83159d..adb1adf 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/chan/CommandResponseChan.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/CommandResponseChan.java @@ -1,7 +1,6 @@ -package net.maku.iot.communication.mqtt.chan; +package net.maku.iot.communication.dto; import lombok.extern.slf4j.Slf4j; -import net.maku.iot.communication.dto.BaseCommandResponseDTO; import java.util.Objects; import java.util.concurrent.CompletableFuture; diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/MQTTService.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/MQTTService.java index ee91daa..c782212 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/MQTTService.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/MQTTService.java @@ -6,7 +6,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import net.maku.framework.common.exception.ServerException; import net.maku.iot.communication.mqtt.MqttGateway; -import net.maku.iot.communication.mqtt.chan.CommandResponseChan; +import net.maku.iot.communication.dto.CommandResponseChan; import net.maku.iot.communication.dto.DeviceCommandDTO; import net.maku.iot.communication.dto.DeviceCommandResponseDTO; import net.maku.iot.dto.DeviceClientDTO; diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/TCPService.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/TCPService.java index 452bc8c..51c37ac 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/TCPService.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/TCPService.java @@ -1,12 +1,24 @@ package net.maku.iot.communication.service; +import cn.hutool.core.util.StrUtil; +import cn.hutool.json.JSONUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import net.maku.framework.common.exception.ServerException; +import net.maku.iot.communication.dto.DeviceCommandDTO; import net.maku.iot.communication.dto.DeviceCommandResponseDTO; +import net.maku.iot.communication.dto.CommandResponseChan; +import net.maku.iot.communication.mqtt.MqttGateway; +import net.maku.iot.communication.tcp.TcpGateway; +import net.maku.iot.dto.DeviceClientDTO; import net.maku.iot.entity.IotDeviceEntity; import net.maku.iot.enums.DeviceCommandEnum; +import net.maku.iot.enums.DeviceTopicEnum; +import net.maku.iot.service.IotDeviceServiceLogService; import org.springframework.stereotype.Component; +import java.util.UUID; + /** * @Description TODO * @Author LSF @@ -17,29 +29,110 @@ import org.springframework.stereotype.Component; @RequiredArgsConstructor public class TCPService implements BaseCommunication { + private final TcpGateway tcpGateway; + + private final IotDeviceServiceLogService iotDeviceEventLogService; + @Override public String asyncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload) { -// nettyClientConfig.sendMessage("asdddddddddddddddddd"); - return ""; + // 构建命令对象 + String commandId = StrUtil.replaceChars(UUID.randomUUID().toString(), "-", ""); + DeviceCommandDTO commandDTO = new DeviceCommandDTO(); + commandDTO.setCommand(command); + commandDTO.setId(commandId); + commandDTO.setPayload(payload); + String commandTopic = DeviceTopicEnum.COMMAND.buildTopic(DeviceClientDTO.from(device)); + + // 发送命令到设备命令主题 + try { + tcpGateway.sendCommandToDevice(device.getId(),commandTopic, JSONUtil.toJsonStr(commandDTO)); + } catch (Exception e) { + log.error(e.getMessage()); + throw new ServerException(StrUtil.format("发送'{}'命令:{} 到设备:{}-{}, Topic:{} 失败,原因:{}", + command.getTitle(), commandId, device.getCode(), device.getName(), commandTopic,e.getMessage())); + } + log.info("发送'{}'命令:{} 到设备:{}-{}, Topic:{} 成功", command.getTitle(), commandId, device.getCode(), device.getName(), commandTopic); + iotDeviceEventLogService.createAndSaveDeviceServiceLog(device.getId(), device.getTenantId(), command, commandId, payload); + return commandId; } @Override public DeviceCommandResponseDTO syncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload) { - return null; + // 构建并发送命令 + String commandId = asyncSendCommand(device, command, payload); + // 等待返回结果 + Object receiver = CommandResponseChan.getInstance(commandId, true).get(commandId); + if (receiver == null) { + throw new ServerException(StrUtil.format("{}设备没有回复", device.getName())); + } + return (DeviceCommandResponseDTO) receiver; } @Override public DeviceCommandResponseDTO syncSendCommandDebug(IotDeviceEntity device, DeviceCommandEnum command, String payload) { - return null; + // 构建并发送命令 + String commandId = asyncSendCommand(device, command, payload); + + // 2秒后模拟设备响应 + new Thread(() -> { + try { + //模拟设备正常响应 + Thread.sleep(2000); + //模拟设备超时响应 + //Thread.sleep(15000); + DeviceCommandResponseDTO simulateResponseDto = new DeviceCommandResponseDTO(); + simulateResponseDto.setCommandId(commandId); + simulateResponseDto.setResponsePayload(command.getTitle() + ",设备执行成功!"); + simulateResponseDto.setCommand(command); + simulateDeviceCommandResponseAttributeData(device, JSONUtil.toJsonStr(simulateResponseDto)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("模拟设备响应线程被中断", e); + } + }).start(); + + // 等待返回结果 + Object receiver = CommandResponseChan.getInstance(commandId, true).get(commandId); + if (receiver == null) { + throw new ServerException(StrUtil.format("{}设备没有回复", device.getName())); + } + return (DeviceCommandResponseDTO) receiver; } @Override public void simulateDeviceReportAttributeData(IotDeviceEntity device, String payload) { - return; + // 封装 设备属性上报的 topic + String commandTopic = DeviceTopicEnum.PROPERTY.buildTopic(DeviceClientDTO.from(device)); + try { + tcpGateway.sendCommandToDevice(device.getId(),commandTopic, payload); + } catch (Exception e) { + log.error(e.getMessage()); + throw new ServerException(StrUtil.format("模拟设备:{}-{},模拟属性上报失败! Topic:{} ", + device.getCode(), device.getName(), commandTopic)); + } } @Override public void simulateDeviceCommandResponseAttributeData(IotDeviceEntity device, String payload) { - return; + // 封装 设备命令执行结果的 topic + String commandTopic = DeviceTopicEnum.COMMAND_RESPONSE.buildTopic(DeviceClientDTO.from(device)); + try { + tcpGateway.sendCommandToDevice(device.getId(),commandTopic, payload); + } catch (Exception e) { + log.error(e.getMessage()); + throw new ServerException(StrUtil.format("模拟设备:{}-{},模拟发送命令执行结果失败! Topic:{} ", + device.getCode(), device.getName(), commandTopic)); + } + } + + + /** + * 设备命令响应处理,把设备响应结果放入通道中 + * + * @param commandResponse 设备命令响应 + */ + public void commandReplied(DeviceCommandResponseDTO commandResponse) { + CommandResponseChan commandResponseChan = CommandResponseChan.getInstance(commandResponse.getCommandId(), false); + commandResponseChan.put(commandResponse); } } diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/TcpGateway.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/TcpGateway.java new file mode 100644 index 0000000..24c7d29 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/TcpGateway.java @@ -0,0 +1,60 @@ +package net.maku.iot.communication.tcp; + +import cn.hutool.json.JSONUtil; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.util.CharsetUtil; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import net.maku.framework.common.exception.ServerException; +import net.maku.iot.communication.mqtt.config.MqttConfig; +import net.maku.iot.communication.tcp.config.NettyServerConfig; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; + +/** + * TCP 网关 + * + * @author LSF maku_lsf@163.com + */ +@Component +@Slf4j +public class TcpGateway { + + @Autowired + private final ConcurrentMap deviceChannels; + + @Autowired + public TcpGateway(ConcurrentMap deviceChannels) { + System.out.printf("-------------------------------->TcpGateway"); + this.deviceChannels = deviceChannels; + } + + /** + * 发送命令到设备 + * @param deviceId 设备ID + * @param commandTopic 命令主题 + * @param payload 命令内容 + */ + public void sendCommandToDevice(Long deviceId, String commandTopic, String payload) { + Channel channel = deviceChannels.get(deviceId); + if (channel != null && channel.isActive()) { + Map payloadMap = new HashMap(); + payloadMap.put("topic", commandTopic); + payloadMap.put("payload", payload); + + channel.writeAndFlush(Unpooled.copiedBuffer(JSONUtil.toJsonStr(payloadMap), CharsetUtil.UTF_8)); + log.info("发送命令到设备 {}: {}", deviceId, payload); + } else { + throw new ServerException("设备"+deviceId+"不在线或通道无效"); + } + } +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyClientConfig.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyClientConfig.java index bbf627c..4422f34 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyClientConfig.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyClientConfig.java @@ -1,55 +1,89 @@ package net.maku.iot.communication.tcp.config; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import io.netty.bootstrap.Bootstrap; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; -import lombok.Data; import lombok.extern.slf4j.Slf4j; -import net.maku.framework.common.utils.IpUtils; +import net.maku.framework.common.utils.JsonUtils; +import net.maku.iot.communication.dto.DevicePropertyDTO; +import net.maku.iot.communication.dto.TcpMsgDTO; +import net.maku.iot.dto.DeviceClientDTO; +import net.maku.iot.entity.IotDeviceEntity; +import net.maku.iot.enums.DevicePropertyEnum; +import net.maku.iot.enums.DeviceRunningStatusEnum; +import net.maku.iot.enums.DeviceTopicEnum; +import net.maku.iot.service.IotDeviceService; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -@Data +import java.util.HashMap; +import java.util.List; +import java.util.Map; + @Slf4j @Configuration public class NettyClientConfig { - private ChannelHandlerContext ctx; - + @Autowired + private IotDeviceService deviceService; @Bean public Bootstrap nettyClient() { Bootstrap nettyClient = new Bootstrap(); - // 设置事件循环组(主线程组和从线程组) - nettyClient.group(new io.netty.channel.nio.NioEventLoopGroup()) - //指定使用 NioServerSocketChannel 作为服务器通道 + nettyClient.group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) - .option(ChannelOption.SO_KEEPALIVE, true) - .handler(new ChannelInitializer() { - @Override - protected void initChannel(SocketChannel ch) { - ch.pipeline().addLast(new StringDecoder(), new StringEncoder(), new SimpleChannelInboundHandler() { - @Override - protected void channelRead0(ChannelHandlerContext ctx, String msg) { - log.info("<------------------------ 客户端接收到: {}", msg); - } - - @Override - public void channelActive(ChannelHandlerContext ctx) { - String msg = IpUtils.getHostName(); - log.info("------------------------> 发送消息到服务端: 我是 {}", msg); - ctx.writeAndFlush(msg); - } - }); - } - }); + .option(ChannelOption.SO_KEEPALIVE, true) // 设置为长连接 + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60*1000); // 设置连接超时时间 return nettyClient; } + public void configureBootstrap(Bootstrap bootstrap) { + List devices = deviceService.list(new LambdaQueryWrapper().eq(IotDeviceEntity::getProtocolType, "TCP")); + for (IotDeviceEntity device : devices) { + bootstrap.handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + ch.pipeline().addLast(new StringDecoder(), new StringEncoder(), new SimpleChannelInboundHandler() { + @Override + public void channelActive(ChannelHandlerContext ctx) { + //模拟设备认证 + Map authenticateMap = new HashMap(); + authenticateMap.put("authenticate", device.getId().toString()); + String authenticateMapJson = JsonUtils.toJsonString(authenticateMap); + log.info("------------------------> 发送认证信息到服务端: {}", authenticateMapJson); + ctx.writeAndFlush(authenticateMapJson); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, String msg) { + log.info("设备 {} 接收到服务端消息: {}", device.getId(), msg); + //模拟属性上报 + if(msg.contains("authenticate passed")){ + String commandTopic = DeviceTopicEnum.PROPERTY.buildTopic(DeviceClientDTO.from(device)); + + DevicePropertyDTO devicePropertyDTO = new DevicePropertyDTO(); + devicePropertyDTO.setDeviceId(device.getId().toString()); + devicePropertyDTO.setPropertyType(DevicePropertyEnum.RUNNING_STATUS); + devicePropertyDTO.setPayload(String.valueOf(DeviceRunningStatusEnum.ONLINE.getValue())); + + TcpMsgDTO tcpMsgDTO = new TcpMsgDTO(); + tcpMsgDTO.setTopic(commandTopic); + tcpMsgDTO.setMsg(devicePropertyDTO); + + String runningStatusjson = JsonUtils.toJsonString(tcpMsgDTO); + log.info("------------------------> 设备发送上线文本:{}",runningStatusjson); + ctx.writeAndFlush(runningStatusjson); + } + } + }); + } + }); + } + } } diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyClientStartupConfig.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyClientStartupConfig.java index 176b7f2..bcaae48 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyClientStartupConfig.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyClientStartupConfig.java @@ -2,7 +2,7 @@ package net.maku.iot.communication.tcp.config; import io.netty.bootstrap.Bootstrap; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.context.ApplicationListener; import org.springframework.context.annotation.Configuration; import org.springframework.context.event.ContextRefreshedEvent; @@ -11,16 +11,24 @@ import org.springframework.context.event.ContextRefreshedEvent; @Slf4j public class NettyClientStartupConfig implements ApplicationListener { - @Autowired - private Bootstrap nettyClient; + private final ObjectProvider nettyClientProvider; + private final NettyClientConfig nettyClientConfig; + + public NettyClientStartupConfig(ObjectProvider nettyClientProvider, NettyClientConfig nettyClientConfig) { + this.nettyClientProvider = nettyClientProvider; + this.nettyClientConfig = nettyClientConfig; + } @Override public void onApplicationEvent(ContextRefreshedEvent event) { - // 确保服务器启动完成后再启动客户端 try { Thread.sleep(5000); // 延迟5秒以确保服务器启动 - nettyClient.connect("127.0.0.1", 8888).sync(); - log.info("Connected to Netty server on port 8888"); + Bootstrap nettyClient = nettyClientProvider.getIfAvailable(); + if (nettyClient != null) { + nettyClientConfig.configureBootstrap(nettyClient); + nettyClient.connect("127.0.0.1", 8888).sync(); + log.info("Connected to Netty server on port 8888"); + } } catch (InterruptedException e) { log.error("Failed to connect to Netty server", e); } diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyServerConfig.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyServerConfig.java index 34208ac..e190ce9 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyServerConfig.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyServerConfig.java @@ -46,8 +46,8 @@ public class NettyServerConfig { ch.pipeline().addLast( new StringDecoder(), new StringEncoder(), -// new DeviceMsgHandler(deviceChannels), // 添加设备身份处理器 - new ConnectionHandler(deviceChannels,tcpMessageHandlerFactory) // 添加设备连接处理器 + // 添加设备连接处理器 + new ConnectionHandler(deviceChannels,tcpMessageHandlerFactory) ); } }) @@ -68,22 +68,3 @@ public class NettyServerConfig { } } } - - -// // 发送命令到设备 -// public void sendCommandToDevice(String deviceId, String command) { -// Channel channel = deviceChannels.get(deviceId); -// if (channel != null && channel.isActive()) { -// channel.writeAndFlush(Unpooled.copiedBuffer(command, CharsetUtil.UTF_8)); -// log.info("发送命令到设备 {}: {}", deviceId, command); -// } else { -// log.warn("设备 {} 不在线或通道无效", deviceId); -// } -// } -// -// // 假设有方法通过通道获取设备 ID -// private String getDeviceId(Channel channel) { -// // 这里应该有逻辑来从通道获取设备 ID -// return "deviceId"; -// } -//} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/TcpClient.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/TcpClient.java index 1fd0ee2..9c3ec6d 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/TcpClient.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/TcpClient.java @@ -20,7 +20,7 @@ public class TcpClient { PrintWriter writer = new PrintWriter(outputStream, true)) { DevicePropertyDTO dto = new DevicePropertyDTO(); - dto.setDeviceID("123456"); + dto.setDeviceId("123456"); dto.setPropertyType(DevicePropertyEnum.TEMPERATURE); dto.setPayload("60"); diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/ConnectionHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/ConnectionHandler.java index 8d9812d..aaa8abb 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/ConnectionHandler.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/ConnectionHandler.java @@ -1,19 +1,18 @@ package net.maku.iot.communication.tcp.handler; -import cn.hutool.core.map.MapUtil; -import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.AttributeKey; -import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import net.maku.framework.common.utils.JsonUtils; import net.maku.iot.communication.dto.TcpMsgDTO; import net.maku.iot.communication.tcp.factory.TcpMessageHandlerFactory; import java.util.concurrent.ConcurrentMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** * @Description TODO @@ -35,22 +34,29 @@ public class ConnectionHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) { - // 请求设备发送其 ID - ctx.writeAndFlush("ACK"); - + System.out.printf("channelActive"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { - if (msg!=null&& StrUtil.contains(msg.toString(),"topic")) { - // 处理 TCP 消息 - handleTcpMessage(ctx, JsonUtils.parseObject(msg.toString(), TcpMsgDTO.class)); - } else { - // 处理其他类型的消息 - log.warn("接收到未知的消息类型:{}", msg); + if (msg == null) { + return; + } + //鉴权认证 + if (authenticate(ctx, msg)) { + //这里可以根据业务自定义扩展消息处理 + if (StrUtil.contains(msg.toString(), "topic")) { + // 处理 TCP 消息 + handleTcpMessage( JsonUtils.parseObject(msg.toString(), TcpMsgDTO.class)); + } else { + // 处理其他类型的消息 + log.warn("接收到未知的消息类型:{}", msg); + } + } else { + ctx.close(); } - } + } @Override @@ -59,14 +65,14 @@ public class ConnectionHandler extends ChannelInboundHandlerAdapter { if (deviceId != null) { deviceChannels.remove(deviceId); } - log.info(" {} 断开连接", deviceId == null ? "未知设备" : deviceId); + log.info(" 设备 {} 断开连接", deviceId == null ? "未知设备" : deviceId); } - private void handleTcpMessage(ChannelHandlerContext ctx, TcpMsgDTO message) { + private void handleTcpMessage( TcpMsgDTO message) { String topic = message.getTopic(); if (topic != null) { tcpMessageHandlerFactory.getHandlersForTopic(topic).forEach(handler -> { - handler.handle(topic, message.getMsg().toString()); + handler.handle(topic, message.getMsg()); }); } else { log.warn("接收到主题为null的消息。"); @@ -74,8 +80,51 @@ public class ConnectionHandler extends ChannelInboundHandlerAdapter { } + /** + * TCP连接鉴权,自行根据业务扩展 + */ + private boolean authenticate(ChannelHandlerContext ctx, Object message) { + String messageRegex = "\"(authenticate|deviceId)\"\\s*:\\s*\"\\d+\""; + Pattern messagePattern = Pattern.compile(messageRegex); + Matcher matcherPattern = messagePattern.matcher(message.toString()); + if (!matcherPattern.find()) { + ctx.writeAndFlush("设备消息无法识别!"); + return false; + } + if (StrUtil.contains(message.toString(), "authenticate")) { + Pattern pattern = Pattern.compile("\"authenticate\"\\s*:\\s*\"(\\d+)\""); + Matcher matcher = pattern.matcher(message.toString()); + if (matcher.find()) { + String deviceId = matcher.group(1); + setDeviceId(ctx.channel(), deviceId); + deviceChannels.put(deviceId, ctx.channel()); + ctx.writeAndFlush("authenticate passed"); + } + } + + if (StrUtil.contains(message.toString(), "deviceId")) { + Pattern pattern = Pattern.compile("\"deviceId\"\\s*:\\s*\"(\\d+)\""); + Matcher matcher = pattern.matcher(message.toString()); + if (matcher.find()) { + String deviceId = matcher.group(1); + Channel channel = deviceChannels.get(deviceId); + if (channel == null) { + ctx.writeAndFlush("设备连接不存在!请重新连接"); + return false; + } + } + } + + return true; + + } + private String getDeviceId(Channel channel) { // 从 Channel 的属性中获取设备 ID return channel.attr(AttributeKey.valueOf("deviceId")).get(); } + + private String setDeviceId(Channel channel, String deviceId) { + return channel.attr(AttributeKey.valueOf("deviceId")).setIfAbsent(deviceId); + } } diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/DeviceCommandResponseTCPMessageHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/DeviceCommandResponseTCPMessageHandler.java index 5d51878..1ae82d8 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/DeviceCommandResponseTCPMessageHandler.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/DeviceCommandResponseTCPMessageHandler.java @@ -1,10 +1,21 @@ package net.maku.iot.communication.tcp.handler; +import cn.hutool.core.util.StrUtil; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import net.maku.framework.common.utils.JsonUtils; +import net.maku.iot.communication.dto.DeviceCommandResponseDTO; +import net.maku.iot.communication.dto.DevicePropertyDTO; +import net.maku.iot.communication.mqtt.factory.DeviceCommandResponseHandlerFactory; import net.maku.iot.communication.mqtt.handler.MqttMessageHandler; +import net.maku.iot.communication.service.MQTTService; +import net.maku.iot.communication.service.TCPService; import net.maku.iot.enums.DeviceTopicEnum; import org.springframework.stereotype.Component; +import java.util.Optional; + /** * @Description TODO * @Author LSF @@ -12,15 +23,57 @@ import org.springframework.stereotype.Component; */ @Slf4j @Component +@RequiredArgsConstructor public class DeviceCommandResponseTCPMessageHandler implements TCPMessageHandler { + + private final DeviceCommandResponseHandlerFactory deviceCommandResponseHandlerFactory; + + private final TCPService deviceTCPService; @Override public boolean supports(String topic) { return DeviceTopicEnum.startsWith(topic, DeviceTopicEnum.COMMAND_RESPONSE.getTopic()); } @Override - public void handle(String topic, String message) { - //TCP设备响应处理 - System.out.printf("TCP设备响应处理"); + public void handle(String topic, Object message) { + DeviceCommandResponseDTO commandResponseDTO = parseCommandReplyMessage(topic, message); + Optional.ofNullable(commandResponseDTO.getCommand()) + .orElseThrow(() -> new IllegalArgumentException(StrUtil.format("缺失指令类型! 主题:'{}',消息:{}", topic, message))); + Optional.ofNullable(commandResponseDTO.getCommandId()) + .orElseThrow(() -> new IllegalArgumentException(StrUtil.format("缺失指令ID! 主题:'{}',消息:{}", topic, message))); + Optional.ofNullable(commandResponseDTO) + .ifPresent(responseDTO -> { + // 调用设备命令执行器的命令响应处理逻辑 + try { + deviceTCPService.commandReplied( responseDTO); + } catch (Exception e) { + log.error(StrUtil.format("调用设备命令执行器响应处理方法出错,topic:{}, message:{}", topic, message), e); + } + // 调用自定义命令响应处理器 + try { + deviceCommandResponseHandlerFactory.getHandlers().forEach(h -> h.handle(topic, responseDTO)); + } catch (Exception e) { + log.error(StrUtil.format("调用设备命令响应响应处理器出错,topic:{}, message:{}", topic, message), e); + } + }); + + + + } + + private DeviceCommandResponseDTO parseCommandReplyMessage(String topic, Object message) { + try { + ObjectMapper mapper = new ObjectMapper(); + DeviceCommandResponseDTO commandResponse = mapper.convertValue(message, DeviceCommandResponseDTO.class); + if (StrUtil.isBlank(commandResponse.getCommandId())) { + log.error(StrUtil.format("主题'{}'的消息,缺失指令ID", topic)); + return null; + } + return commandResponse; + + } catch (Exception e) { + log.error(StrUtil.format("将主题'{}'的消息解析为设备命令响应对象失败", topic), e); + return null; + } } } diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/DevicePropertyTCPMessageHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/DevicePropertyTCPMessageHandler.java index 10fac43..8d7d048 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/DevicePropertyTCPMessageHandler.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/DevicePropertyTCPMessageHandler.java @@ -1,10 +1,16 @@ package net.maku.iot.communication.tcp.handler; +import cn.hutool.core.util.StrUtil; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import net.maku.iot.communication.mqtt.handler.MqttMessageHandler; +import net.maku.iot.communication.dto.DevicePropertyDTO; +import net.maku.iot.communication.mqtt.factory.DevicePropertyChangeHandlerFactory; import net.maku.iot.enums.DeviceTopicEnum; import org.springframework.stereotype.Component; +import java.util.Optional; + /** * @Description TODO * @Author LSF @@ -12,15 +18,31 @@ import org.springframework.stereotype.Component; */ @Slf4j @Component +@RequiredArgsConstructor public class DevicePropertyTCPMessageHandler implements TCPMessageHandler { + + private final DevicePropertyChangeHandlerFactory statusChangeHandlerFactory; + @Override public boolean supports(String topic) { return DeviceTopicEnum.startsWith(topic, DeviceTopicEnum.PROPERTY.getTopic()); } @Override - public void handle(String topic, String message) { - //TCP设备属性上报处理 - System.out.printf("TCP设备属性上报处理"); + public void handle(String topic, Object message) { + DevicePropertyDTO devicePropertyDTO = parseStatusMessage(topic, message); + Optional.ofNullable(devicePropertyDTO) + .ifPresent(deviceProperty -> statusChangeHandlerFactory.getHandlers() + .forEach(h -> h.handle(topic, deviceProperty))); + } + + private DevicePropertyDTO parseStatusMessage(String topic, Object message) { + try { + ObjectMapper mapper = new ObjectMapper(); + return mapper.convertValue(message, DevicePropertyDTO.class); + } catch (Exception e) { + log.error(StrUtil.format("将主题'{}'的消息解析为设备运行状态对象失败", topic), e); + return null; + } } } diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/TCPMessageHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/TCPMessageHandler.java index 900bc21..5632532 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/TCPMessageHandler.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/TCPMessageHandler.java @@ -22,5 +22,5 @@ public interface TCPMessageHandler { * @param topic * @param message */ - void handle(String topic, String message); + void handle(String topic, Object message); } diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/impl/IotDeviceServiceImpl.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/impl/IotDeviceServiceImpl.java index b9e48aa..c532d11 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/impl/IotDeviceServiceImpl.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/impl/IotDeviceServiceImpl.java @@ -201,9 +201,7 @@ public class IotDeviceServiceImpl extends BaseServiceImpl