add:优化

This commit is contained in:
LSF 2024-08-17 16:34:44 +08:00
parent 3a786fa2eb
commit a784937a68
15 changed files with 488 additions and 191 deletions

View File

@ -13,5 +13,5 @@ import lombok.Data;
public class BaseDeviceID {
@Schema(description = "设备ID")
protected String deviceID;
protected String deviceId;
}

View File

@ -1,7 +1,6 @@
package net.maku.iot.communication.mqtt.chan;
package net.maku.iot.communication.dto;
import lombok.extern.slf4j.Slf4j;
import net.maku.iot.communication.dto.BaseCommandResponseDTO;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

View File

@ -6,7 +6,7 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.maku.framework.common.exception.ServerException;
import net.maku.iot.communication.mqtt.MqttGateway;
import net.maku.iot.communication.mqtt.chan.CommandResponseChan;
import net.maku.iot.communication.dto.CommandResponseChan;
import net.maku.iot.communication.dto.DeviceCommandDTO;
import net.maku.iot.communication.dto.DeviceCommandResponseDTO;
import net.maku.iot.dto.DeviceClientDTO;

View File

@ -1,12 +1,24 @@
package net.maku.iot.communication.service;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.maku.framework.common.exception.ServerException;
import net.maku.iot.communication.dto.DeviceCommandDTO;
import net.maku.iot.communication.dto.DeviceCommandResponseDTO;
import net.maku.iot.communication.dto.CommandResponseChan;
import net.maku.iot.communication.mqtt.MqttGateway;
import net.maku.iot.communication.tcp.TcpGateway;
import net.maku.iot.dto.DeviceClientDTO;
import net.maku.iot.entity.IotDeviceEntity;
import net.maku.iot.enums.DeviceCommandEnum;
import net.maku.iot.enums.DeviceTopicEnum;
import net.maku.iot.service.IotDeviceServiceLogService;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
* @Description TODO
* @Author LSF
@ -17,29 +29,110 @@ import org.springframework.stereotype.Component;
@RequiredArgsConstructor
public class TCPService implements BaseCommunication {
private final TcpGateway tcpGateway;
private final IotDeviceServiceLogService iotDeviceEventLogService;
@Override
public String asyncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload) {
// nettyClientConfig.sendMessage("asdddddddddddddddddd");
return "";
// 构建命令对象
String commandId = StrUtil.replaceChars(UUID.randomUUID().toString(), "-", "");
DeviceCommandDTO commandDTO = new DeviceCommandDTO();
commandDTO.setCommand(command);
commandDTO.setId(commandId);
commandDTO.setPayload(payload);
String commandTopic = DeviceTopicEnum.COMMAND.buildTopic(DeviceClientDTO.from(device));
// 发送命令到设备命令主题
try {
tcpGateway.sendCommandToDevice(device.getId(),commandTopic, JSONUtil.toJsonStr(commandDTO));
} catch (Exception e) {
log.error(e.getMessage());
throw new ServerException(StrUtil.format("发送'{}'命令:{} 到设备:{}-{}, Topic:{} 失败,原因:{}",
command.getTitle(), commandId, device.getCode(), device.getName(), commandTopic,e.getMessage()));
}
log.info("发送'{}'命令:{} 到设备:{}-{}, Topic:{} 成功", command.getTitle(), commandId, device.getCode(), device.getName(), commandTopic);
iotDeviceEventLogService.createAndSaveDeviceServiceLog(device.getId(), device.getTenantId(), command, commandId, payload);
return commandId;
}
@Override
public DeviceCommandResponseDTO syncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload) {
return null;
// 构建并发送命令
String commandId = asyncSendCommand(device, command, payload);
// 等待返回结果
Object receiver = CommandResponseChan.getInstance(commandId, true).get(commandId);
if (receiver == null) {
throw new ServerException(StrUtil.format("{}设备没有回复", device.getName()));
}
return (DeviceCommandResponseDTO) receiver;
}
@Override
public DeviceCommandResponseDTO syncSendCommandDebug(IotDeviceEntity device, DeviceCommandEnum command, String payload) {
return null;
// 构建并发送命令
String commandId = asyncSendCommand(device, command, payload);
// 2秒后模拟设备响应
new Thread(() -> {
try {
//模拟设备正常响应
Thread.sleep(2000);
//模拟设备超时响应
//Thread.sleep(15000);
DeviceCommandResponseDTO simulateResponseDto = new DeviceCommandResponseDTO();
simulateResponseDto.setCommandId(commandId);
simulateResponseDto.setResponsePayload(command.getTitle() + ",设备执行成功!");
simulateResponseDto.setCommand(command);
simulateDeviceCommandResponseAttributeData(device, JSONUtil.toJsonStr(simulateResponseDto));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("模拟设备响应线程被中断", e);
}
}).start();
// 等待返回结果
Object receiver = CommandResponseChan.getInstance(commandId, true).get(commandId);
if (receiver == null) {
throw new ServerException(StrUtil.format("{}设备没有回复", device.getName()));
}
return (DeviceCommandResponseDTO) receiver;
}
@Override
public void simulateDeviceReportAttributeData(IotDeviceEntity device, String payload) {
return;
// 封装 设备属性上报的 topic
String commandTopic = DeviceTopicEnum.PROPERTY.buildTopic(DeviceClientDTO.from(device));
try {
tcpGateway.sendCommandToDevice(device.getId(),commandTopic, payload);
} catch (Exception e) {
log.error(e.getMessage());
throw new ServerException(StrUtil.format("模拟设备:{}-{},模拟属性上报失败! Topic:{} ",
device.getCode(), device.getName(), commandTopic));
}
}
@Override
public void simulateDeviceCommandResponseAttributeData(IotDeviceEntity device, String payload) {
return;
// 封装 设备命令执行结果的 topic
String commandTopic = DeviceTopicEnum.COMMAND_RESPONSE.buildTopic(DeviceClientDTO.from(device));
try {
tcpGateway.sendCommandToDevice(device.getId(),commandTopic, payload);
} catch (Exception e) {
log.error(e.getMessage());
throw new ServerException(StrUtil.format("模拟设备:{}-{},模拟发送命令执行结果失败! Topic:{} ",
device.getCode(), device.getName(), commandTopic));
}
}
/**
* 设备命令响应处理把设备响应结果放入通道中
*
* @param commandResponse 设备命令响应
*/
public void commandReplied(DeviceCommandResponseDTO commandResponse) {
CommandResponseChan commandResponseChan = CommandResponseChan.getInstance(commandResponse.getCommandId(), false);
commandResponseChan.put(commandResponse);
}
}

View File

@ -0,0 +1,60 @@
package net.maku.iot.communication.tcp;
import cn.hutool.json.JSONUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.util.CharsetUtil;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import net.maku.framework.common.exception.ServerException;
import net.maku.iot.communication.mqtt.config.MqttConfig;
import net.maku.iot.communication.tcp.config.NettyServerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
/**
* TCP 网关
*
* @author LSF maku_lsf@163.com
*/
@Component
@Slf4j
public class TcpGateway {
@Autowired
private final ConcurrentMap<String, Channel> deviceChannels;
@Autowired
public TcpGateway(ConcurrentMap<String, Channel> deviceChannels) {
System.out.printf("-------------------------------->TcpGateway");
this.deviceChannels = deviceChannels;
}
/**
* 发送命令到设备
* @param deviceId 设备ID
* @param commandTopic 命令主题
* @param payload 命令内容
*/
public void sendCommandToDevice(Long deviceId, String commandTopic, String payload) {
Channel channel = deviceChannels.get(deviceId);
if (channel != null && channel.isActive()) {
Map payloadMap = new HashMap();
payloadMap.put("topic", commandTopic);
payloadMap.put("payload", payload);
channel.writeAndFlush(Unpooled.copiedBuffer(JSONUtil.toJsonStr(payloadMap), CharsetUtil.UTF_8));
log.info("发送命令到设备 {}: {}", deviceId, payload);
} else {
throw new ServerException("设备"+deviceId+"不在线或通道无效");
}
}
}

View File

@ -1,55 +1,89 @@
package net.maku.iot.communication.tcp.config;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import net.maku.framework.common.utils.IpUtils;
import net.maku.framework.common.utils.JsonUtils;
import net.maku.iot.communication.dto.DevicePropertyDTO;
import net.maku.iot.communication.dto.TcpMsgDTO;
import net.maku.iot.dto.DeviceClientDTO;
import net.maku.iot.entity.IotDeviceEntity;
import net.maku.iot.enums.DevicePropertyEnum;
import net.maku.iot.enums.DeviceRunningStatusEnum;
import net.maku.iot.enums.DeviceTopicEnum;
import net.maku.iot.service.IotDeviceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Data
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
@Configuration
public class NettyClientConfig {
private ChannelHandlerContext ctx;
@Autowired
private IotDeviceService deviceService;
@Bean
public Bootstrap nettyClient() {
Bootstrap nettyClient = new Bootstrap();
// 设置事件循环组主线程组和从线程组
nettyClient.group(new io.netty.channel.nio.NioEventLoopGroup())
//指定使用 NioServerSocketChannel 作为服务器通道
nettyClient.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new StringDecoder(), new StringEncoder(), new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
log.info("<------------------------ 客户端接收到: {}", msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
String msg = IpUtils.getHostName();
log.info("------------------------> 发送消息到服务端: 我是 {}", msg);
ctx.writeAndFlush(msg);
}
});
}
});
.option(ChannelOption.SO_KEEPALIVE, true) // 设置为长连接
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60*1000); // 设置连接超时时间
return nettyClient;
}
public void configureBootstrap(Bootstrap bootstrap) {
List<IotDeviceEntity> devices = deviceService.list(new LambdaQueryWrapper<IotDeviceEntity>().eq(IotDeviceEntity::getProtocolType, "TCP"));
for (IotDeviceEntity device : devices) {
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new StringDecoder(), new StringEncoder(), new SimpleChannelInboundHandler<String>() {
@Override
public void channelActive(ChannelHandlerContext ctx) {
//模拟设备认证
Map authenticateMap = new HashMap();
authenticateMap.put("authenticate", device.getId().toString());
String authenticateMapJson = JsonUtils.toJsonString(authenticateMap);
log.info("------------------------> 发送认证信息到服务端: {}", authenticateMapJson);
ctx.writeAndFlush(authenticateMapJson);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
log.info("设备 {} 接收到服务端消息: {}", device.getId(), msg);
//模拟属性上报
if(msg.contains("authenticate passed")){
String commandTopic = DeviceTopicEnum.PROPERTY.buildTopic(DeviceClientDTO.from(device));
DevicePropertyDTO devicePropertyDTO = new DevicePropertyDTO();
devicePropertyDTO.setDeviceId(device.getId().toString());
devicePropertyDTO.setPropertyType(DevicePropertyEnum.RUNNING_STATUS);
devicePropertyDTO.setPayload(String.valueOf(DeviceRunningStatusEnum.ONLINE.getValue()));
TcpMsgDTO tcpMsgDTO = new TcpMsgDTO();
tcpMsgDTO.setTopic(commandTopic);
tcpMsgDTO.setMsg(devicePropertyDTO);
String runningStatusjson = JsonUtils.toJsonString(tcpMsgDTO);
log.info("------------------------> 设备发送上线文本:{}",runningStatusjson);
ctx.writeAndFlush(runningStatusjson);
}
}
});
}
});
}
}
}

View File

@ -2,7 +2,7 @@ package net.maku.iot.communication.tcp.config;
import io.netty.bootstrap.Bootstrap;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;
@ -11,16 +11,24 @@ import org.springframework.context.event.ContextRefreshedEvent;
@Slf4j
public class NettyClientStartupConfig implements ApplicationListener<ContextRefreshedEvent> {
@Autowired
private Bootstrap nettyClient;
private final ObjectProvider<Bootstrap> nettyClientProvider;
private final NettyClientConfig nettyClientConfig;
public NettyClientStartupConfig(ObjectProvider<Bootstrap> nettyClientProvider, NettyClientConfig nettyClientConfig) {
this.nettyClientProvider = nettyClientProvider;
this.nettyClientConfig = nettyClientConfig;
}
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
// 确保服务器启动完成后再启动客户端
try {
Thread.sleep(5000); // 延迟5秒以确保服务器启动
nettyClient.connect("127.0.0.1", 8888).sync();
log.info("Connected to Netty server on port 8888");
Bootstrap nettyClient = nettyClientProvider.getIfAvailable();
if (nettyClient != null) {
nettyClientConfig.configureBootstrap(nettyClient);
nettyClient.connect("127.0.0.1", 8888).sync();
log.info("Connected to Netty server on port 8888");
}
} catch (InterruptedException e) {
log.error("Failed to connect to Netty server", e);
}

View File

@ -46,8 +46,8 @@ public class NettyServerConfig {
ch.pipeline().addLast(
new StringDecoder(),
new StringEncoder(),
// new DeviceMsgHandler(deviceChannels), // 添加设备身份处理器
new ConnectionHandler(deviceChannels,tcpMessageHandlerFactory) // 添加设备连接处理器
// 添加设备连接处理器
new ConnectionHandler(deviceChannels,tcpMessageHandlerFactory)
);
}
})
@ -68,22 +68,3 @@ public class NettyServerConfig {
}
}
}
// // 发送命令到设备
// public void sendCommandToDevice(String deviceId, String command) {
// Channel channel = deviceChannels.get(deviceId);
// if (channel != null && channel.isActive()) {
// channel.writeAndFlush(Unpooled.copiedBuffer(command, CharsetUtil.UTF_8));
// log.info("发送命令到设备 {}: {}", deviceId, command);
// } else {
// log.warn("设备 {} 不在线或通道无效", deviceId);
// }
// }
//
// // 假设有方法通过通道获取设备 ID
// private String getDeviceId(Channel channel) {
// // 这里应该有逻辑来从通道获取设备 ID
// return "deviceId";
// }
//}

View File

@ -20,7 +20,7 @@ public class TcpClient {
PrintWriter writer = new PrintWriter(outputStream, true)) {
DevicePropertyDTO dto = new DevicePropertyDTO();
dto.setDeviceID("123456");
dto.setDeviceId("123456");
dto.setPropertyType(DevicePropertyEnum.TEMPERATURE);
dto.setPayload("60");

View File

@ -1,19 +1,18 @@
package net.maku.iot.communication.tcp.handler;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.AttributeKey;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import net.maku.framework.common.utils.JsonUtils;
import net.maku.iot.communication.dto.TcpMsgDTO;
import net.maku.iot.communication.tcp.factory.TcpMessageHandlerFactory;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* @Description TODO
@ -35,22 +34,29 @@ public class ConnectionHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 请求设备发送其 ID
ctx.writeAndFlush("ACK");
System.out.printf("channelActive");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg!=null&& StrUtil.contains(msg.toString(),"topic")) {
// 处理 TCP 消息
handleTcpMessage(ctx, JsonUtils.parseObject(msg.toString(), TcpMsgDTO.class));
} else {
// 处理其他类型的消息
log.warn("接收到未知的消息类型:{}", msg);
if (msg == null) {
return;
}
//鉴权认证
if (authenticate(ctx, msg)) {
//这里可以根据业务自定义扩展消息处理
if (StrUtil.contains(msg.toString(), "topic")) {
// 处理 TCP 消息
handleTcpMessage( JsonUtils.parseObject(msg.toString(), TcpMsgDTO.class));
} else {
// 处理其他类型的消息
log.warn("接收到未知的消息类型:{}", msg);
}
} else {
ctx.close();
}
}
}
@Override
@ -59,14 +65,14 @@ public class ConnectionHandler extends ChannelInboundHandlerAdapter {
if (deviceId != null) {
deviceChannels.remove(deviceId);
}
log.info(" {} 断开连接", deviceId == null ? "未知设备" : deviceId);
log.info(" 设备 {} 断开连接", deviceId == null ? "未知设备" : deviceId);
}
private void handleTcpMessage(ChannelHandlerContext ctx, TcpMsgDTO message) {
private void handleTcpMessage( TcpMsgDTO message) {
String topic = message.getTopic();
if (topic != null) {
tcpMessageHandlerFactory.getHandlersForTopic(topic).forEach(handler -> {
handler.handle(topic, message.getMsg().toString());
handler.handle(topic, message.getMsg());
});
} else {
log.warn("接收到主题为null的消息。");
@ -74,8 +80,51 @@ public class ConnectionHandler extends ChannelInboundHandlerAdapter {
}
/**
* TCP连接鉴权自行根据业务扩展
*/
private boolean authenticate(ChannelHandlerContext ctx, Object message) {
String messageRegex = "\"(authenticate|deviceId)\"\\s*:\\s*\"\\d+\"";
Pattern messagePattern = Pattern.compile(messageRegex);
Matcher matcherPattern = messagePattern.matcher(message.toString());
if (!matcherPattern.find()) {
ctx.writeAndFlush("设备消息无法识别!");
return false;
}
if (StrUtil.contains(message.toString(), "authenticate")) {
Pattern pattern = Pattern.compile("\"authenticate\"\\s*:\\s*\"(\\d+)\"");
Matcher matcher = pattern.matcher(message.toString());
if (matcher.find()) {
String deviceId = matcher.group(1);
setDeviceId(ctx.channel(), deviceId);
deviceChannels.put(deviceId, ctx.channel());
ctx.writeAndFlush("authenticate passed");
}
}
if (StrUtil.contains(message.toString(), "deviceId")) {
Pattern pattern = Pattern.compile("\"deviceId\"\\s*:\\s*\"(\\d+)\"");
Matcher matcher = pattern.matcher(message.toString());
if (matcher.find()) {
String deviceId = matcher.group(1);
Channel channel = deviceChannels.get(deviceId);
if (channel == null) {
ctx.writeAndFlush("设备连接不存在!请重新连接");
return false;
}
}
}
return true;
}
private String getDeviceId(Channel channel) {
// Channel 的属性中获取设备 ID
return channel.attr(AttributeKey.<String>valueOf("deviceId")).get();
}
private String setDeviceId(Channel channel, String deviceId) {
return channel.attr(AttributeKey.<String>valueOf("deviceId")).setIfAbsent(deviceId);
}
}

View File

@ -1,10 +1,21 @@
package net.maku.iot.communication.tcp.handler;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.maku.framework.common.utils.JsonUtils;
import net.maku.iot.communication.dto.DeviceCommandResponseDTO;
import net.maku.iot.communication.dto.DevicePropertyDTO;
import net.maku.iot.communication.mqtt.factory.DeviceCommandResponseHandlerFactory;
import net.maku.iot.communication.mqtt.handler.MqttMessageHandler;
import net.maku.iot.communication.service.MQTTService;
import net.maku.iot.communication.service.TCPService;
import net.maku.iot.enums.DeviceTopicEnum;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
* @Description TODO
* @Author LSF
@ -12,15 +23,57 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class DeviceCommandResponseTCPMessageHandler implements TCPMessageHandler {
private final DeviceCommandResponseHandlerFactory deviceCommandResponseHandlerFactory;
private final TCPService deviceTCPService;
@Override
public boolean supports(String topic) {
return DeviceTopicEnum.startsWith(topic, DeviceTopicEnum.COMMAND_RESPONSE.getTopic());
}
@Override
public void handle(String topic, String message) {
//TCP设备响应处理
System.out.printf("TCP设备响应处理");
public void handle(String topic, Object message) {
DeviceCommandResponseDTO commandResponseDTO = parseCommandReplyMessage(topic, message);
Optional.ofNullable(commandResponseDTO.getCommand())
.orElseThrow(() -> new IllegalArgumentException(StrUtil.format("缺失指令类型! 主题:'{}',消息:{}", topic, message)));
Optional.ofNullable(commandResponseDTO.getCommandId())
.orElseThrow(() -> new IllegalArgumentException(StrUtil.format("缺失指令ID! 主题:'{}',消息:{}", topic, message)));
Optional.ofNullable(commandResponseDTO)
.ifPresent(responseDTO -> {
// 调用设备命令执行器的命令响应处理逻辑
try {
deviceTCPService.commandReplied( responseDTO);
} catch (Exception e) {
log.error(StrUtil.format("调用设备命令执行器响应处理方法出错topic:{}, message:{}", topic, message), e);
}
// 调用自定义命令响应处理器
try {
deviceCommandResponseHandlerFactory.getHandlers().forEach(h -> h.handle(topic, responseDTO));
} catch (Exception e) {
log.error(StrUtil.format("调用设备命令响应响应处理器出错topic:{}, message:{}", topic, message), e);
}
});
}
private DeviceCommandResponseDTO parseCommandReplyMessage(String topic, Object message) {
try {
ObjectMapper mapper = new ObjectMapper();
DeviceCommandResponseDTO commandResponse = mapper.convertValue(message, DeviceCommandResponseDTO.class);
if (StrUtil.isBlank(commandResponse.getCommandId())) {
log.error(StrUtil.format("主题'{}'的消息,缺失指令ID", topic));
return null;
}
return commandResponse;
} catch (Exception e) {
log.error(StrUtil.format("将主题'{}'的消息解析为设备命令响应对象失败", topic), e);
return null;
}
}
}

View File

@ -1,10 +1,16 @@
package net.maku.iot.communication.tcp.handler;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.maku.iot.communication.mqtt.handler.MqttMessageHandler;
import net.maku.iot.communication.dto.DevicePropertyDTO;
import net.maku.iot.communication.mqtt.factory.DevicePropertyChangeHandlerFactory;
import net.maku.iot.enums.DeviceTopicEnum;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
* @Description TODO
* @Author LSF
@ -12,15 +18,31 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class DevicePropertyTCPMessageHandler implements TCPMessageHandler {
private final DevicePropertyChangeHandlerFactory statusChangeHandlerFactory;
@Override
public boolean supports(String topic) {
return DeviceTopicEnum.startsWith(topic, DeviceTopicEnum.PROPERTY.getTopic());
}
@Override
public void handle(String topic, String message) {
//TCP设备属性上报处理
System.out.printf("TCP设备属性上报处理");
public void handle(String topic, Object message) {
DevicePropertyDTO devicePropertyDTO = parseStatusMessage(topic, message);
Optional.ofNullable(devicePropertyDTO)
.ifPresent(deviceProperty -> statusChangeHandlerFactory.getHandlers()
.forEach(h -> h.handle(topic, deviceProperty)));
}
private DevicePropertyDTO parseStatusMessage(String topic, Object message) {
try {
ObjectMapper mapper = new ObjectMapper();
return mapper.convertValue(message, DevicePropertyDTO.class);
} catch (Exception e) {
log.error(StrUtil.format("将主题'{}'的消息解析为设备运行状态对象失败", topic), e);
return null;
}
}
}

View File

@ -22,5 +22,5 @@ public interface TCPMessageHandler {
* @param topic
* @param message
*/
void handle(String topic, String message);
void handle(String topic, Object message);
}

View File

@ -201,9 +201,7 @@ public class IotDeviceServiceImpl extends BaseServiceImpl<IotDeviceDao, IotDevic
private void handleRunningStatus(IotDeviceEntity device, DevicePropertyDTO deviceProperty, DeviceTopicEnum.DeviceTopicContext topicContext) {
DeviceRunningStatusEnum oldStatus = DeviceRunningStatusEnum.parse(device.getRunningStatus().toString());
DeviceRunningStatusEnum newStatus = DeviceRunningStatusEnum.parse(deviceProperty.getPayload());
if (newStatus.equals(oldStatus)) {
return;
}
device.setRunningStatus(newStatus.getValue());
if (DeviceRunningStatusEnum.ONLINE.equals(newStatus)) {
device.setUpTime(LocalDateTime.now());