add:优化

This commit is contained in:
LSF 2024-08-19 19:10:48 +08:00
parent a784937a68
commit 577c8df126
6 changed files with 60 additions and 39 deletions

View File

@ -176,7 +176,7 @@ public class MQTTService implements BaseCommunication {
mqttGateway.sendToMqtt(commandTopic, payload);
} catch (Exception e) {
log.error(e.getMessage());
throw new ServerException(StrUtil.format("模拟设备:{}-{},模拟发送命令执行结果失败! Topic:{} ",
throw new ServerException(StrUtil.format("模拟设备:{}-{},模拟命令执行结果上报失败! Topic:{} ",
device.getCode(), device.getName(), commandTopic));
}
}

View File

@ -5,10 +5,10 @@ import cn.hutool.json.JSONUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.maku.framework.common.exception.ServerException;
import net.maku.iot.communication.dto.CommandResponseChan;
import net.maku.iot.communication.dto.DeviceCommandDTO;
import net.maku.iot.communication.dto.DeviceCommandResponseDTO;
import net.maku.iot.communication.dto.CommandResponseChan;
import net.maku.iot.communication.mqtt.MqttGateway;
import net.maku.iot.communication.dto.DevicePropertyDTO;
import net.maku.iot.communication.tcp.TcpGateway;
import net.maku.iot.dto.DeviceClientDTO;
import net.maku.iot.entity.IotDeviceEntity;
@ -30,7 +30,6 @@ import java.util.UUID;
public class TCPService implements BaseCommunication {
private final TcpGateway tcpGateway;
private final IotDeviceServiceLogService iotDeviceEventLogService;
@Override
@ -40,6 +39,7 @@ public class TCPService implements BaseCommunication {
DeviceCommandDTO commandDTO = new DeviceCommandDTO();
commandDTO.setCommand(command);
commandDTO.setId(commandId);
commandDTO.setDeviceId(String.valueOf(device.getId()));
commandDTO.setPayload(payload);
String commandTopic = DeviceTopicEnum.COMMAND.buildTopic(DeviceClientDTO.from(device));
@ -83,6 +83,7 @@ public class TCPService implements BaseCommunication {
DeviceCommandResponseDTO simulateResponseDto = new DeviceCommandResponseDTO();
simulateResponseDto.setCommandId(commandId);
simulateResponseDto.setResponsePayload(command.getTitle() + ",设备执行成功!");
simulateResponseDto.setDeviceId(String.valueOf(device.getId()));
simulateResponseDto.setCommand(command);
simulateDeviceCommandResponseAttributeData(device, JSONUtil.toJsonStr(simulateResponseDto));
} catch (InterruptedException e) {
@ -104,7 +105,7 @@ public class TCPService implements BaseCommunication {
// 封装 设备属性上报的 topic
String commandTopic = DeviceTopicEnum.PROPERTY.buildTopic(DeviceClientDTO.from(device));
try {
tcpGateway.sendCommandToDevice(device.getId(),commandTopic, payload);
tcpGateway.simulateDeviceReport(device.getId(), commandTopic, payload, DevicePropertyDTO.class);
} catch (Exception e) {
log.error(e.getMessage());
throw new ServerException(StrUtil.format("模拟设备:{}-{},模拟属性上报失败! Topic:{} ",
@ -117,10 +118,10 @@ public class TCPService implements BaseCommunication {
// 封装 设备命令执行结果的 topic
String commandTopic = DeviceTopicEnum.COMMAND_RESPONSE.buildTopic(DeviceClientDTO.from(device));
try {
tcpGateway.sendCommandToDevice(device.getId(),commandTopic, payload);
tcpGateway.simulateDeviceReport(device.getId(), commandTopic, payload, DeviceCommandResponseDTO.class);
} catch (Exception e) {
log.error(e.getMessage());
throw new ServerException(StrUtil.format("模拟设备:{}-{},模拟发送命令执行结果失败! Topic:{} ",
throw new ServerException(StrUtil.format("模拟设备:{}-{},模拟命令执行结果上报失败! Topic:{} ",
device.getCode(), device.getName(), commandTopic));
}
}

View File

@ -1,23 +1,14 @@
package net.maku.iot.communication.tcp;
import cn.hutool.json.JSONUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.util.CharsetUtil;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import net.maku.framework.common.exception.ServerException;
import net.maku.iot.communication.mqtt.config.MqttConfig;
import net.maku.iot.communication.tcp.config.NettyServerConfig;
import net.maku.framework.common.utils.JsonUtils;
import net.maku.iot.communication.dto.DeviceCommandDTO;
import net.maku.iot.communication.dto.TcpMsgDTO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
/**
@ -34,27 +25,59 @@ public class TcpGateway {
@Autowired
public TcpGateway(ConcurrentMap<String, Channel> deviceChannels) {
System.out.printf("-------------------------------->TcpGateway");
this.deviceChannels = deviceChannels;
}
/**
* 获取设备通道
*
* @return
*/
public ConcurrentMap<String, Channel> getTcpDeviceChannels() {
return deviceChannels;
}
/**
* 发送命令到设备
* @param deviceId 设备ID
* @param commandTopic 命令主题
* @param payload 命令内容
*/
public void sendCommandToDevice(Long deviceId, String commandTopic, String payload) {
Channel channel = deviceChannels.get(deviceId);
Channel channel = deviceChannels.get(deviceId.toString());
if (channel != null && channel.isActive()) {
Map payloadMap = new HashMap();
payloadMap.put("topic", commandTopic);
payloadMap.put("payload", payload);
TcpMsgDTO tcpMsgDTO = new TcpMsgDTO();
tcpMsgDTO.setTopic(commandTopic);
DeviceCommandDTO deviceCommandDTO = JsonUtils.parseObject(payload, DeviceCommandDTO.class);
deviceCommandDTO.setDeviceId(deviceId.toString());
tcpMsgDTO.setMsg(deviceCommandDTO);
channel.writeAndFlush(Unpooled.copiedBuffer(JSONUtil.toJsonStr(payloadMap), CharsetUtil.UTF_8));
String commandJson = JsonUtils.toJsonString(tcpMsgDTO);
// channel.writeAndFlush(commandJson);
log.info("发送命令到设备 {}: {}", deviceId, payload);
} else {
throw new ServerException("设备"+deviceId+"不在线或通道无效");
}
}
public void simulateDeviceReport(Long deviceId, String commandTopic, String payload, Class reportDtoclazz) {
Channel channel = deviceChannels.get(deviceId.toString());
if (channel != null && channel.isActive()) {
try {
TcpMsgDTO tcpMsgDTO = new TcpMsgDTO();
tcpMsgDTO.setTopic(commandTopic);
tcpMsgDTO.setMsg(JsonUtils.parseObject(payload, reportDtoclazz));
String devicePropertyJson = JsonUtils.toJsonString(tcpMsgDTO);
// 模拟上报触发 channelRead 处理
channel.pipeline().fireChannelRead(devicePropertyJson);
log.info("模拟设备 {} 上报数据: {}", deviceId, devicePropertyJson);
} catch (Exception e) {
log.error("模拟设备上报数据时出现错误", e);
}
} else {
throw new ServerException("设备 " + deviceId + " 不在线或通道无效");
}
}
}

View File

@ -15,15 +15,16 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* @Description TODO
* @Description TCP服务器连接处理器
* @Author LSF
* @Date 2024/8/14 16:52
*/
@Slf4j
public class ConnectionHandler extends ChannelInboundHandlerAdapter {
public static final AttributeKey<String> DEVICE_ID = AttributeKey.valueOf("DEVICE_ID");
private final ConcurrentMap<String, Channel> deviceChannels;
private ConcurrentMap<String, Channel> deviceChannels;
private final TcpMessageHandlerFactory tcpMessageHandlerFactory;
public ConnectionHandler(ConcurrentMap<String, Channel> deviceChannels,TcpMessageHandlerFactory tcpMessageHandlerFactory) {
@ -96,7 +97,8 @@ public class ConnectionHandler extends ChannelInboundHandlerAdapter {
Matcher matcher = pattern.matcher(message.toString());
if (matcher.find()) {
String deviceId = matcher.group(1);
setDeviceId(ctx.channel(), deviceId);
// setDeviceId(ctx.channel(), deviceId);
ctx.channel().attr(DEVICE_ID).set(deviceId);
deviceChannels.put(deviceId, ctx.channel());
ctx.writeAndFlush("authenticate passed");
}
@ -114,17 +116,14 @@ public class ConnectionHandler extends ChannelInboundHandlerAdapter {
}
}
}
return true;
}
private String getDeviceId(Channel channel) {
// Channel 的属性中获取设备 ID
return channel.attr(AttributeKey.<String>valueOf("deviceId")).get();
return channel.attr(DEVICE_ID).get();
}
private String setDeviceId(Channel channel, String deviceId) {
return channel.attr(AttributeKey.<String>valueOf("deviceId")).setIfAbsent(deviceId);
}
// private String setDeviceId(Channel channel, String deviceId) {
// return channel.attr(AttributeKey.<String>valueOf("deviceId")).setIfAbsent(deviceId);
// }
}

View File

@ -4,12 +4,8 @@ import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.maku.framework.common.utils.JsonUtils;
import net.maku.iot.communication.dto.DeviceCommandResponseDTO;
import net.maku.iot.communication.dto.DevicePropertyDTO;
import net.maku.iot.communication.mqtt.factory.DeviceCommandResponseHandlerFactory;
import net.maku.iot.communication.mqtt.handler.MqttMessageHandler;
import net.maku.iot.communication.service.MQTTService;
import net.maku.iot.communication.service.TCPService;
import net.maku.iot.enums.DeviceTopicEnum;
import org.springframework.stereotype.Component;

View File

@ -146,12 +146,14 @@ public class IotDeviceServiceImpl extends BaseServiceImpl<IotDeviceDao, IotDevic
@Override
public void simulateDeviceReportAttributeData(DeviceReportAttributeDataVO vo) {
IotDeviceEntity device = getById(vo.getDeviceId());
vo.setDeviceId(vo.getDeviceId());
getSendService(device).simulateDeviceReportAttributeData(device, JSONUtil.toJsonStr(vo));
}
@Override
public void simulateDeviceCommandResponseAttributeData(DeviceCommandResponseAttributeDataVO vo) {
IotDeviceEntity device = getById(vo.getDeviceId());
vo.setDeviceId(vo.getDeviceId());
getSendService(device).simulateDeviceCommandResponseAttributeData(device, JSONUtil.toJsonStr(vo));
}