add:调整结构

This commit is contained in:
LSF 2024-08-14 20:12:56 +08:00
parent 73ecc0bf4f
commit 88f88807e2
35 changed files with 969 additions and 531 deletions

View File

@ -1,4 +1,4 @@
package net.maku.iot.communication.mqtt.dto;
package net.maku.iot.communication.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
@ -9,7 +9,7 @@ import lombok.Data;
* @author eden on 2024/6/17
*/
@Data
public class BaseCommandResponse {
public class BaseCommandResponseDTO extends BaseDeviceID {
/**
* 命令ID
*/

View File

@ -0,0 +1,17 @@
package net.maku.iot.communication.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
/**
* @Description TODO
* @Author LSF
* @Date 2024/8/14 17:24
*/
@Data
@Schema(description = "设备ID")
public class BaseDeviceID {
@Schema(description = "设备ID")
protected String deviceID;
}

View File

@ -1,4 +1,4 @@
package net.maku.iot.communication.mqtt.dto;
package net.maku.iot.communication.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
@ -11,7 +11,7 @@ import net.maku.iot.enums.DeviceCommandEnum;
*/
@Data
@Schema(description = "设备命令对象")
public class DeviceCommandDTO {
public class DeviceCommandDTO extends BaseDeviceID {
/**
* 命令类型
*/

View File

@ -1,4 +1,4 @@
package net.maku.iot.communication.mqtt.dto;
package net.maku.iot.communication.dto;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import io.swagger.v3.oas.annotations.media.Schema;
@ -15,7 +15,7 @@ import net.maku.iot.enums.DeviceCommandEnum;
@Data
@Schema(description = "设备命令响应DTO")
@JsonIgnoreProperties(ignoreUnknown = true)
public class DeviceCommandResponseDTO extends BaseCommandResponse {
public class DeviceCommandResponseDTO extends BaseCommandResponseDTO {
/**
* 命令类型
*/

View File

@ -1,4 +1,4 @@
package net.maku.iot.communication.mqtt.dto;
package net.maku.iot.communication.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
@ -11,7 +11,7 @@ import net.maku.iot.enums.DevicePropertyEnum;
*/
@Data
@Schema(description = "设备属性对象")
public class DevicePropertyDTO {
public class DevicePropertyDTO extends BaseDeviceID {
/**
* 设备属性类型
*/

View File

@ -0,0 +1,16 @@
package net.maku.iot.communication.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
/**
* @Description TODO
* @Author LSF
* @Date 2024/8/14 19:31
*/
@Data
@Schema(description = "tcp通讯数据包装类")
public class TcpMsgDTO {
private String topic;
private Object msg;
}

View File

@ -1,6 +1,7 @@
package net.maku.iot.communication.mqtt.dto;
package net.maku.iot.communication.mqtt.chan;
import lombok.extern.slf4j.Slf4j;
import net.maku.iot.communication.dto.BaseCommandResponseDTO;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
@ -17,7 +18,7 @@ public class CommandResponseChan {
// 存储通道的 ConcurrentHashMap
private static final ConcurrentHashMap<String, CommandResponseChan> CHANNEL = new ConcurrentHashMap<>();
private final CompletableFuture<BaseCommandResponse> future = new CompletableFuture<>();
private final CompletableFuture<BaseCommandResponseDTO> future = new CompletableFuture<>();
private final Long DEFAULT_WAIT_MILLISECONDS = 5 * 1000L;
@ -45,7 +46,7 @@ public class CommandResponseChan {
* @param commandId 通道标识
* @return 获取的数据如果超时返回 null
*/
public BaseCommandResponse get(String commandId) {
public BaseCommandResponseDTO get(String commandId) {
return get(commandId, DEFAULT_WAIT_MILLISECONDS);
}
@ -56,7 +57,7 @@ public class CommandResponseChan {
* @param timeout 超时时间毫秒
* @return 获取的数据如果超时返回 null
*/
public BaseCommandResponse get(String commandId, long timeout) {
public BaseCommandResponseDTO get(String commandId, long timeout) {
CommandResponseChan channel = CHANNEL.get(commandId);
if (Objects.isNull(channel)) {
return null;
@ -82,7 +83,7 @@ public class CommandResponseChan {
*
* @param response 要放入的数据
*/
public void put(BaseCommandResponse response) {
public void put(BaseCommandResponseDTO response) {
String commandId = response.getCommandId();
if (commandId == null) {
return;

View File

@ -1,7 +1,7 @@
package net.maku.iot.communication.mqtt.handler;
import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO;
import net.maku.iot.communication.dto.DeviceCommandResponseDTO;
/**
* 设备命令响应处理器

View File

@ -4,8 +4,8 @@ import cn.hutool.core.util.StrUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.maku.framework.common.utils.JsonUtils;
import net.maku.iot.communication.MQTTService;
import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO;
import net.maku.iot.communication.service.MQTTService;
import net.maku.iot.communication.dto.DeviceCommandResponseDTO;
import net.maku.iot.communication.mqtt.factory.DeviceCommandResponseHandlerFactory;
import net.maku.iot.enums.DeviceTopicEnum;
import org.springframework.stereotype.Component;

View File

@ -1,7 +1,7 @@
package net.maku.iot.communication.mqtt.handler;
import net.maku.iot.communication.mqtt.dto.DevicePropertyDTO;
import net.maku.iot.communication.dto.DevicePropertyDTO;
/**
* 设备属性变化处理器

View File

@ -4,7 +4,7 @@ import cn.hutool.core.util.StrUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.maku.framework.common.utils.JsonUtils;
import net.maku.iot.communication.mqtt.dto.DevicePropertyDTO;
import net.maku.iot.communication.dto.DevicePropertyDTO;
import net.maku.iot.communication.mqtt.factory.DevicePropertyChangeHandlerFactory;
import net.maku.iot.enums.DeviceTopicEnum;
import org.springframework.stereotype.Component;

View File

@ -1,8 +1,8 @@
package net.maku.iot.communication;
package net.maku.iot.communication.service;
import net.maku.iot.entity.IotDeviceEntity;
import net.maku.iot.enums.DeviceCommandEnum;
import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO;
import net.maku.iot.communication.dto.DeviceCommandResponseDTO;
/**
* 基础通信协议具备功能

View File

@ -1,4 +1,4 @@
package net.maku.iot.communication;
package net.maku.iot.communication.service;
import lombok.AllArgsConstructor;
import net.maku.framework.common.exception.ServerException;

View File

@ -1,4 +1,4 @@
package net.maku.iot.communication;
package net.maku.iot.communication.service;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
@ -6,9 +6,9 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.maku.framework.common.exception.ServerException;
import net.maku.iot.communication.mqtt.MqttGateway;
import net.maku.iot.communication.mqtt.dto.CommandResponseChan;
import net.maku.iot.communication.mqtt.dto.DeviceCommandDTO;
import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO;
import net.maku.iot.communication.mqtt.chan.CommandResponseChan;
import net.maku.iot.communication.dto.DeviceCommandDTO;
import net.maku.iot.communication.dto.DeviceCommandResponseDTO;
import net.maku.iot.dto.DeviceClientDTO;
import net.maku.iot.entity.IotDeviceEntity;
import net.maku.iot.enums.DeviceCommandEnum;
@ -17,9 +17,6 @@ import net.maku.iot.service.IotDeviceServiceLogService;
import org.springframework.stereotype.Component;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Exchanger;
/**
* @Description TODO

View File

@ -1,10 +1,10 @@
package net.maku.iot.communication;
package net.maku.iot.communication.service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.maku.iot.communication.dto.DeviceCommandResponseDTO;
import net.maku.iot.entity.IotDeviceEntity;
import net.maku.iot.enums.DeviceCommandEnum;
import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO;
import org.springframework.stereotype.Component;
/**
@ -17,9 +17,9 @@ import org.springframework.stereotype.Component;
@RequiredArgsConstructor
public class TCPService implements BaseCommunication {
@Override
public String asyncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload) {
// nettyClientConfig.sendMessage("asdddddddddddddddddd");
return "";
}

View File

@ -0,0 +1,55 @@
package net.maku.iot.communication.tcp.config;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import net.maku.framework.common.utils.IpUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Data
@Slf4j
@Configuration
public class NettyClientConfig {
private ChannelHandlerContext ctx;
@Bean
public Bootstrap nettyClient() {
Bootstrap nettyClient = new Bootstrap();
// 设置事件循环组主线程组和从线程组
nettyClient.group(new io.netty.channel.nio.NioEventLoopGroup())
//指定使用 NioServerSocketChannel 作为服务器通道
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new StringDecoder(), new StringEncoder(), new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
log.info("<------------------------ 客户端接收到: {}", msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
String msg = IpUtils.getHostName();
log.info("------------------------> 发送消息到服务端: 我是 {}", msg);
ctx.writeAndFlush(msg);
}
});
}
});
return nettyClient;
}
}

View File

@ -0,0 +1,28 @@
package net.maku.iot.communication.tcp.config;
import io.netty.bootstrap.Bootstrap;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;
@Configuration
@Slf4j
public class NettyClientStartupConfig implements ApplicationListener<ContextRefreshedEvent> {
@Autowired
private Bootstrap nettyClient;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
// 确保服务器启动完成后再启动客户端
try {
Thread.sleep(5000); // 延迟5秒以确保服务器启动
nettyClient.connect("127.0.0.1", 8888).sync();
log.info("Connected to Netty server on port 8888");
} catch (InterruptedException e) {
log.error("Failed to connect to Netty server", e);
}
}
}

View File

@ -0,0 +1,85 @@
package net.maku.iot.communication.tcp.config;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import net.maku.iot.communication.mqtt.factory.MqttMessageHandlerFactory;
import net.maku.iot.communication.tcp.factory.TcpMessageHandlerFactory;
import net.maku.iot.communication.tcp.handler.ConnectionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@Configuration
@Slf4j
public class NettyServerConfig {
@Bean
public ConcurrentMap<String, Channel> deviceChannels() {
return new ConcurrentHashMap<>();
}
@Bean
public ServerBootstrap nettyServer(ConcurrentMap<String, Channel> deviceChannels) {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(
new StringDecoder(),
new StringEncoder(),
// new DeviceMsgHandler(deviceChannels), // 添加设备身份处理器
new ConnectionHandler(deviceChannels) // 添加设备连接处理器
);
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
return bootstrap;
}
@Bean
public ChannelFuture serverChannelFuture(ServerBootstrap serverBootstrap) throws InterruptedException {
try {
ChannelFuture future = serverBootstrap.bind(8888).sync();
log.info("------------------------ Netty 服务器在端口 8888 启动成功");
return future;
} catch (Exception e) {
log.error("------------------------ Netty 服务器启动失败", e);
throw e;
}
}
}
// // 发送命令到设备
// public void sendCommandToDevice(String deviceId, String command) {
// Channel channel = deviceChannels.get(deviceId);
// if (channel != null && channel.isActive()) {
// channel.writeAndFlush(Unpooled.copiedBuffer(command, CharsetUtil.UTF_8));
// log.info("发送命令到设备 {}: {}", deviceId, command);
// } else {
// log.warn("设备 {} 不在线或通道无效", deviceId);
// }
// }
//
// // 假设有方法通过通道获取设备 ID
// private String getDeviceId(Channel channel) {
// // 这里应该有逻辑来从通道获取设备 ID
// return "deviceId";
// }
//}

View File

@ -0,0 +1,42 @@
package net.maku.iot.communication.tcp.config;
import cn.hutool.json.JSONUtil;
import net.maku.iot.communication.dto.DevicePropertyDTO;
import net.maku.iot.communication.dto.TcpMsgDTO;
import net.maku.iot.enums.DevicePropertyEnum;
import net.maku.iot.enums.DeviceTopicEnum;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;
public class TcpClient {
public static void main(String[] args) {
String serverAddress = ""; // 服务端的地址
int port = 8888; // 服务端的端口号
try (Socket socket = new Socket(serverAddress, port);
OutputStream outputStream = socket.getOutputStream();
PrintWriter writer = new PrintWriter(outputStream, true)) {
DevicePropertyDTO dto = new DevicePropertyDTO();
dto.setDeviceID("123456");
dto.setPropertyType(DevicePropertyEnum.TEMPERATURE);
dto.setPayload("60");
TcpMsgDTO tcpMsgDTO = new TcpMsgDTO();
tcpMsgDTO.setMsg(dto);
tcpMsgDTO.setTopic(DeviceTopicEnum.PROPERTY.getTopic());
writer.println(JSONUtil.toJsonStr(tcpMsgDTO)); // 发送消息到服务端
System.out.println("Message sent: " + JSONUtil.toJsonStr(tcpMsgDTO));
Thread.sleep(100000);
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,47 @@
package net.maku.iot.communication.tcp.factory;
import lombok.RequiredArgsConstructor;
import net.maku.iot.communication.tcp.handler.TCPMessageHandler;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
/**
* TCP消息处理器工厂自动获取所有实现的处理器实例
*
* @author LSF maku_lsf@163.com
*/
@Component
@RequiredArgsConstructor
public class TcpMessageHandlerFactory {
private final ApplicationContext applicationContext;
/**
* 所有消息处理器
*/
private List<TCPMessageHandler> messageHandlers;
private List<TCPMessageHandler> loadHandlers() {
if (messageHandlers != null) {
return messageHandlers;
}
messageHandlers = new ArrayList<>(applicationContext.getBeansOfType(TCPMessageHandler.class).values());
return messageHandlers;
}
/**
* 获取与主题对应的tcp消息处理器
*
* @param topic 主题
* @return 处理器列表
*/
public List<TCPMessageHandler> getHandlersForTopic(String topic) {
return Collections.unmodifiableList(loadHandlers().stream()
.filter(handler -> handler.supports(topic))
.collect(Collectors.toList()));
}
}

View File

@ -0,0 +1,82 @@
package net.maku.iot.communication.tcp.handler;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.AttributeKey;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import net.maku.framework.common.utils.JsonUtils;
import net.maku.iot.communication.dto.TcpMsgDTO;
import net.maku.iot.communication.tcp.factory.TcpMessageHandlerFactory;
import java.util.concurrent.ConcurrentMap;
/**
* @Description TODO
* @Author LSF
* @Date 2024/8/14 16:52
*/
@Slf4j
public class ConnectionHandler extends ChannelInboundHandlerAdapter {
@Resource
private TcpMessageHandlerFactory tcpMessageHandlerFactory;
private final ConcurrentMap<String, Channel> deviceChannels;
public ConnectionHandler(ConcurrentMap<String, Channel> deviceChannels) {
this.deviceChannels = deviceChannels;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 请求设备发送其 ID
ctx.writeAndFlush("ACK");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg!=null&& StrUtil.contains(msg.toString(),"topic")) {
// 处理 TCP 消息
handleTcpMessage(ctx, JsonUtils.parseObject(msg.toString(), TcpMsgDTO.class));
} else {
// 处理其他类型的消息
log.warn("接收到未知的消息类型:{}", msg);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
String deviceId = getDeviceId(ctx.channel());
if (deviceId != null) {
deviceChannels.remove(deviceId);
}
log.info(" {} 断开连接", deviceId == null ? "未知设备" : deviceId);
}
private void handleTcpMessage(ChannelHandlerContext ctx, TcpMsgDTO message) {
String topic = message.getTopic();
if (topic != null) {
tcpMessageHandlerFactory.getHandlersForTopic(topic).forEach(handler -> {
handler.handle(topic, message.getMsg().toString());
});
} else {
log.warn("接收到主题为null的消息。");
}
}
private String getDeviceId(Channel channel) {
// Channel 的属性中获取设备 ID
return channel.attr(AttributeKey.<String>valueOf("deviceId")).get();
}
}

View File

@ -0,0 +1,21 @@
package net.maku.iot.communication.tcp.handler;
import net.maku.iot.communication.mqtt.handler.MqttMessageHandler;
import net.maku.iot.enums.DeviceTopicEnum;
/**
* @Description TODO
* @Author LSF
* @Date 2024/8/14 19:23
*/
public class DeviceCommandResponseTCPMessageHandler implements MqttMessageHandler {
@Override
public boolean supports(String topic) {
return DeviceTopicEnum.startsWith(topic, DeviceTopicEnum.COMMAND_RESPONSE.getTopic());
}
@Override
public void handle(String topic, String message) {
//TCP设备响应处理
}
}

View File

@ -0,0 +1,21 @@
package net.maku.iot.communication.tcp.handler;
import net.maku.iot.communication.mqtt.handler.MqttMessageHandler;
import net.maku.iot.enums.DeviceTopicEnum;
/**
* @Description TODO
* @Author LSF
* @Date 2024/8/14 19:24
*/
public class DevicePropertyTCPMessageHandler implements MqttMessageHandler {
@Override
public boolean supports(String topic) {
return DeviceTopicEnum.startsWith(topic, DeviceTopicEnum.PROPERTY.getTopic());
}
@Override
public void handle(String topic, String message) {
//TCP设备属性上报处理
}
}

View File

@ -0,0 +1,26 @@
package net.maku.iot.communication.tcp.handler;
/**
* TCP消息处理接口
*
* @author LSF maku_lsf@163.com
*/
public interface TCPMessageHandler {
/**
* 是否支持处理指定的topic
*
* @param topic
* @return
*/
boolean supports(String topic);
/**
* TCP消息处理接口
*
* @param topic
* @param message
*/
void handle(String topic, String message);
}

View File

@ -3,9 +3,9 @@ package net.maku.iot.service;
import net.maku.framework.common.utils.PageResult;
import net.maku.framework.mybatis.service.BaseService;
import net.maku.iot.entity.IotDeviceEntity;
import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO;
import net.maku.iot.communication.dto.DeviceCommandResponseDTO;
import net.maku.iot.query.IotDeviceQuery;
import net.maku.iot.communication.BaseCommunication;
import net.maku.iot.communication.service.BaseCommunication;
import net.maku.iot.vo.DeviceCommandResponseAttributeDataVO;
import net.maku.iot.vo.DeviceCommandVO;
import net.maku.iot.vo.DeviceReportAttributeDataVO;

View File

@ -16,13 +16,13 @@ import net.maku.iot.convert.IotDeviceConvert;
import net.maku.iot.dao.IotDeviceDao;
import net.maku.iot.entity.IotDeviceEntity;
import net.maku.iot.enums.*;
import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO;
import net.maku.iot.communication.mqtt.dto.DevicePropertyDTO;
import net.maku.iot.communication.dto.DeviceCommandResponseDTO;
import net.maku.iot.communication.dto.DevicePropertyDTO;
import net.maku.iot.communication.mqtt.handler.DeviceCommandResponseHandler;
import net.maku.iot.communication.mqtt.handler.DevicePropertyChangeHandler;
import net.maku.iot.query.IotDeviceQuery;
import net.maku.iot.communication.BaseCommunication;
import net.maku.iot.communication.CommunicationServiceFactory;
import net.maku.iot.communication.service.BaseCommunication;
import net.maku.iot.communication.service.CommunicationServiceFactory;
import net.maku.iot.service.IotDeviceEventLogService;
import net.maku.iot.service.IotDeviceService;
import net.maku.iot.vo.DeviceCommandResponseAttributeDataVO;