diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/BaseCommandResponse.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/BaseCommandResponseDTO.java similarity index 72% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/BaseCommandResponse.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/BaseCommandResponseDTO.java index 719fdd5..db67d94 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/BaseCommandResponse.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/BaseCommandResponseDTO.java @@ -1,4 +1,4 @@ -package net.maku.iot.communication.mqtt.dto; +package net.maku.iot.communication.dto; import io.swagger.v3.oas.annotations.media.Schema; import lombok.Data; @@ -9,7 +9,7 @@ import lombok.Data; * @author eden on 2024/6/17 */ @Data -public class BaseCommandResponse { +public class BaseCommandResponseDTO extends BaseDeviceID { /** * 命令ID */ diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/BaseDeviceID.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/BaseDeviceID.java new file mode 100644 index 0000000..927290a --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/BaseDeviceID.java @@ -0,0 +1,17 @@ +package net.maku.iot.communication.dto; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +/** + * @Description TODO + * @Author LSF + * @Date 2024/8/14 17:24 + */ +@Data +@Schema(description = "设备ID") +public class BaseDeviceID { + + @Schema(description = "设备ID") + protected String deviceID; +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DeviceCommandDTO.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/DeviceCommandDTO.java similarity index 86% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DeviceCommandDTO.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/DeviceCommandDTO.java index e8154de..7a921ff 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DeviceCommandDTO.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/DeviceCommandDTO.java @@ -1,4 +1,4 @@ -package net.maku.iot.communication.mqtt.dto; +package net.maku.iot.communication.dto; import io.swagger.v3.oas.annotations.media.Schema; import lombok.Data; @@ -11,7 +11,7 @@ import net.maku.iot.enums.DeviceCommandEnum; */ @Data @Schema(description = "设备命令对象") -public class DeviceCommandDTO { +public class DeviceCommandDTO extends BaseDeviceID { /** * 命令类型 */ diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DeviceCommandResponseDTO.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/DeviceCommandResponseDTO.java similarity index 96% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DeviceCommandResponseDTO.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/DeviceCommandResponseDTO.java index eb762f9..1793647 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DeviceCommandResponseDTO.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/DeviceCommandResponseDTO.java @@ -1,4 +1,4 @@ -package net.maku.iot.communication.mqtt.dto; +package net.maku.iot.communication.dto; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import io.swagger.v3.oas.annotations.media.Schema; @@ -15,7 +15,7 @@ import net.maku.iot.enums.DeviceCommandEnum; @Data @Schema(description = "设备命令响应DTO") @JsonIgnoreProperties(ignoreUnknown = true) -public class DeviceCommandResponseDTO extends BaseCommandResponse { +public class DeviceCommandResponseDTO extends BaseCommandResponseDTO { /** * 命令类型 */ diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DevicePropertyDTO.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/DevicePropertyDTO.java similarity index 85% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DevicePropertyDTO.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/DevicePropertyDTO.java index fed63de..e43249e 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/DevicePropertyDTO.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/DevicePropertyDTO.java @@ -1,4 +1,4 @@ -package net.maku.iot.communication.mqtt.dto; +package net.maku.iot.communication.dto; import io.swagger.v3.oas.annotations.media.Schema; import lombok.Data; @@ -11,7 +11,7 @@ import net.maku.iot.enums.DevicePropertyEnum; */ @Data @Schema(description = "设备属性对象") -public class DevicePropertyDTO { +public class DevicePropertyDTO extends BaseDeviceID { /** * 设备属性类型 */ diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/TcpMsgDTO.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/TcpMsgDTO.java new file mode 100644 index 0000000..b2fdf66 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/dto/TcpMsgDTO.java @@ -0,0 +1,16 @@ +package net.maku.iot.communication.dto; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +/** + * @Description TODO + * @Author LSF + * @Date 2024/8/14 19:31 + */ +@Data +@Schema(description = "tcp通讯数据包装类") +public class TcpMsgDTO { + private String topic; + private Object msg; +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/CommandResponseChan.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/chan/CommandResponseChan.java similarity index 87% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/CommandResponseChan.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/chan/CommandResponseChan.java index 1896613..f83159d 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/dto/CommandResponseChan.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/chan/CommandResponseChan.java @@ -1,6 +1,7 @@ -package net.maku.iot.communication.mqtt.dto; +package net.maku.iot.communication.mqtt.chan; import lombok.extern.slf4j.Slf4j; +import net.maku.iot.communication.dto.BaseCommandResponseDTO; import java.util.Objects; import java.util.concurrent.CompletableFuture; @@ -17,7 +18,7 @@ public class CommandResponseChan { // 存储通道的 ConcurrentHashMap private static final ConcurrentHashMap CHANNEL = new ConcurrentHashMap<>(); - private final CompletableFuture future = new CompletableFuture<>(); + private final CompletableFuture future = new CompletableFuture<>(); private final Long DEFAULT_WAIT_MILLISECONDS = 5 * 1000L; @@ -45,7 +46,7 @@ public class CommandResponseChan { * @param commandId 通道标识 * @return 获取的数据,如果超时返回 null */ - public BaseCommandResponse get(String commandId) { + public BaseCommandResponseDTO get(String commandId) { return get(commandId, DEFAULT_WAIT_MILLISECONDS); } @@ -56,7 +57,7 @@ public class CommandResponseChan { * @param timeout 超时时间(毫秒) * @return 获取的数据,如果超时返回 null */ - public BaseCommandResponse get(String commandId, long timeout) { + public BaseCommandResponseDTO get(String commandId, long timeout) { CommandResponseChan channel = CHANNEL.get(commandId); if (Objects.isNull(channel)) { return null; @@ -82,7 +83,7 @@ public class CommandResponseChan { * * @param response 要放入的数据 */ - public void put(BaseCommandResponse response) { + public void put(BaseCommandResponseDTO response) { String commandId = response.getCommandId(); if (commandId == null) { return; diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseHandler.java index 3af2705..76bb221 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseHandler.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseHandler.java @@ -1,7 +1,7 @@ package net.maku.iot.communication.mqtt.handler; -import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO; +import net.maku.iot.communication.dto.DeviceCommandResponseDTO; /** * 设备命令响应处理器 diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseMqttMessageHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseMqttMessageHandler.java index 5389920..5cae0e2 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseMqttMessageHandler.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DeviceCommandResponseMqttMessageHandler.java @@ -4,8 +4,8 @@ import cn.hutool.core.util.StrUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import net.maku.framework.common.utils.JsonUtils; -import net.maku.iot.communication.MQTTService; -import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO; +import net.maku.iot.communication.service.MQTTService; +import net.maku.iot.communication.dto.DeviceCommandResponseDTO; import net.maku.iot.communication.mqtt.factory.DeviceCommandResponseHandlerFactory; import net.maku.iot.enums.DeviceTopicEnum; import org.springframework.stereotype.Component; diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyChangeHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyChangeHandler.java index 3822fe7..17eee72 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyChangeHandler.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyChangeHandler.java @@ -1,7 +1,7 @@ package net.maku.iot.communication.mqtt.handler; -import net.maku.iot.communication.mqtt.dto.DevicePropertyDTO; +import net.maku.iot.communication.dto.DevicePropertyDTO; /** * 设备属性变化处理器 diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyMqttMessageHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyMqttMessageHandler.java index f8416f2..1299857 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyMqttMessageHandler.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/mqtt/handler/DevicePropertyMqttMessageHandler.java @@ -4,7 +4,7 @@ import cn.hutool.core.util.StrUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import net.maku.framework.common.utils.JsonUtils; -import net.maku.iot.communication.mqtt.dto.DevicePropertyDTO; +import net.maku.iot.communication.dto.DevicePropertyDTO; import net.maku.iot.communication.mqtt.factory.DevicePropertyChangeHandlerFactory; import net.maku.iot.enums.DeviceTopicEnum; import org.springframework.stereotype.Component; diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/BaseCommunication.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/BaseCommunication.java similarity index 89% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/BaseCommunication.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/BaseCommunication.java index ef40e9f..15882ac 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/BaseCommunication.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/BaseCommunication.java @@ -1,8 +1,8 @@ -package net.maku.iot.communication; +package net.maku.iot.communication.service; import net.maku.iot.entity.IotDeviceEntity; import net.maku.iot.enums.DeviceCommandEnum; -import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO; +import net.maku.iot.communication.dto.DeviceCommandResponseDTO; /** * 基础通信协议具备功能 diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/CommunicationServiceFactory.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/CommunicationServiceFactory.java similarity index 94% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/CommunicationServiceFactory.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/CommunicationServiceFactory.java index c4535e1..4d34019 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/CommunicationServiceFactory.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/CommunicationServiceFactory.java @@ -1,4 +1,4 @@ -package net.maku.iot.communication; +package net.maku.iot.communication.service; import lombok.AllArgsConstructor; import net.maku.framework.common.exception.ServerException; diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/MQTTService.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/MQTTService.java similarity index 95% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/MQTTService.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/MQTTService.java index 462dc64..ee91daa 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/MQTTService.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/MQTTService.java @@ -1,4 +1,4 @@ -package net.maku.iot.communication; +package net.maku.iot.communication.service; import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSONUtil; @@ -6,9 +6,9 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import net.maku.framework.common.exception.ServerException; import net.maku.iot.communication.mqtt.MqttGateway; -import net.maku.iot.communication.mqtt.dto.CommandResponseChan; -import net.maku.iot.communication.mqtt.dto.DeviceCommandDTO; -import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO; +import net.maku.iot.communication.mqtt.chan.CommandResponseChan; +import net.maku.iot.communication.dto.DeviceCommandDTO; +import net.maku.iot.communication.dto.DeviceCommandResponseDTO; import net.maku.iot.dto.DeviceClientDTO; import net.maku.iot.entity.IotDeviceEntity; import net.maku.iot.enums.DeviceCommandEnum; @@ -17,9 +17,6 @@ import net.maku.iot.service.IotDeviceServiceLogService; import org.springframework.stereotype.Component; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Exchanger; /** * @Description TODO diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/TCPService.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/TCPService.java similarity index 86% rename from maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/TCPService.java rename to maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/TCPService.java index 8ee0d71..452bc8c 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/TCPService.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/service/TCPService.java @@ -1,10 +1,10 @@ -package net.maku.iot.communication; +package net.maku.iot.communication.service; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import net.maku.iot.communication.dto.DeviceCommandResponseDTO; import net.maku.iot.entity.IotDeviceEntity; import net.maku.iot.enums.DeviceCommandEnum; -import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO; import org.springframework.stereotype.Component; /** @@ -17,9 +17,9 @@ import org.springframework.stereotype.Component; @RequiredArgsConstructor public class TCPService implements BaseCommunication { - @Override public String asyncSendCommand(IotDeviceEntity device, DeviceCommandEnum command, String payload) { +// nettyClientConfig.sendMessage("asdddddddddddddddddd"); return ""; } diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyClientConfig.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyClientConfig.java new file mode 100644 index 0000000..bbf627c --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyClientConfig.java @@ -0,0 +1,55 @@ +package net.maku.iot.communication.tcp.config; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import net.maku.framework.common.utils.IpUtils; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Data +@Slf4j +@Configuration +public class NettyClientConfig { + + private ChannelHandlerContext ctx; + + + @Bean + public Bootstrap nettyClient() { + Bootstrap nettyClient = new Bootstrap(); + // 设置事件循环组(主线程组和从线程组) + nettyClient.group(new io.netty.channel.nio.NioEventLoopGroup()) + //指定使用 NioServerSocketChannel 作为服务器通道 + .channel(NioSocketChannel.class) + .option(ChannelOption.SO_KEEPALIVE, true) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + ch.pipeline().addLast(new StringDecoder(), new StringEncoder(), new SimpleChannelInboundHandler() { + @Override + protected void channelRead0(ChannelHandlerContext ctx, String msg) { + log.info("<------------------------ 客户端接收到: {}", msg); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + String msg = IpUtils.getHostName(); + log.info("------------------------> 发送消息到服务端: 我是 {}", msg); + ctx.writeAndFlush(msg); + } + }); + } + }); + return nettyClient; + } + +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyClientStartupConfig.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyClientStartupConfig.java new file mode 100644 index 0000000..176b7f2 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyClientStartupConfig.java @@ -0,0 +1,28 @@ +package net.maku.iot.communication.tcp.config; + +import io.netty.bootstrap.Bootstrap; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationListener; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.event.ContextRefreshedEvent; + +@Configuration +@Slf4j +public class NettyClientStartupConfig implements ApplicationListener { + + @Autowired + private Bootstrap nettyClient; + + @Override + public void onApplicationEvent(ContextRefreshedEvent event) { + // 确保服务器启动完成后再启动客户端 + try { + Thread.sleep(5000); // 延迟5秒以确保服务器启动 + nettyClient.connect("127.0.0.1", 8888).sync(); + log.info("Connected to Netty server on port 8888"); + } catch (InterruptedException e) { + log.error("Failed to connect to Netty server", e); + } + } +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyServerConfig.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyServerConfig.java new file mode 100644 index 0000000..8b8a338 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/NettyServerConfig.java @@ -0,0 +1,85 @@ +package net.maku.iot.communication.tcp.config; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import net.maku.iot.communication.mqtt.factory.MqttMessageHandlerFactory; +import net.maku.iot.communication.tcp.factory.TcpMessageHandlerFactory; +import net.maku.iot.communication.tcp.handler.ConnectionHandler; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + + +@Configuration +@Slf4j +public class NettyServerConfig { + + @Bean + public ConcurrentMap deviceChannels() { + return new ConcurrentHashMap<>(); + } + + @Bean + public ServerBootstrap nettyServer(ConcurrentMap deviceChannels) { + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup()) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + ch.pipeline().addLast( + new StringDecoder(), + new StringEncoder(), +// new DeviceMsgHandler(deviceChannels), // 添加设备身份处理器 + new ConnectionHandler(deviceChannels) // 添加设备连接处理器 + ); + } + }) + .option(ChannelOption.SO_BACKLOG, 128) + .childOption(ChannelOption.SO_KEEPALIVE, true); + return bootstrap; + } + + @Bean + public ChannelFuture serverChannelFuture(ServerBootstrap serverBootstrap) throws InterruptedException { + try { + ChannelFuture future = serverBootstrap.bind(8888).sync(); + log.info("------------------------ Netty 服务器在端口 8888 启动成功"); + return future; + } catch (Exception e) { + log.error("------------------------ Netty 服务器启动失败", e); + throw e; + } + } +} + + +// // 发送命令到设备 +// public void sendCommandToDevice(String deviceId, String command) { +// Channel channel = deviceChannels.get(deviceId); +// if (channel != null && channel.isActive()) { +// channel.writeAndFlush(Unpooled.copiedBuffer(command, CharsetUtil.UTF_8)); +// log.info("发送命令到设备 {}: {}", deviceId, command); +// } else { +// log.warn("设备 {} 不在线或通道无效", deviceId); +// } +// } +// +// // 假设有方法通过通道获取设备 ID +// private String getDeviceId(Channel channel) { +// // 这里应该有逻辑来从通道获取设备 ID +// return "deviceId"; +// } +//} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/TcpClient.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/TcpClient.java new file mode 100644 index 0000000..1fd0ee2 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/config/TcpClient.java @@ -0,0 +1,42 @@ +package net.maku.iot.communication.tcp.config; + +import cn.hutool.json.JSONUtil; +import net.maku.iot.communication.dto.DevicePropertyDTO; +import net.maku.iot.communication.dto.TcpMsgDTO; +import net.maku.iot.enums.DevicePropertyEnum; +import net.maku.iot.enums.DeviceTopicEnum; + +import java.io.OutputStream; +import java.io.PrintWriter; +import java.net.Socket; + +public class TcpClient { + public static void main(String[] args) { + String serverAddress = ""; // 服务端的地址 + int port = 8888; // 服务端的端口号 + + try (Socket socket = new Socket(serverAddress, port); + OutputStream outputStream = socket.getOutputStream(); + PrintWriter writer = new PrintWriter(outputStream, true)) { + + DevicePropertyDTO dto = new DevicePropertyDTO(); + dto.setDeviceID("123456"); + dto.setPropertyType(DevicePropertyEnum.TEMPERATURE); + dto.setPayload("60"); + + + TcpMsgDTO tcpMsgDTO = new TcpMsgDTO(); + tcpMsgDTO.setMsg(dto); + tcpMsgDTO.setTopic(DeviceTopicEnum.PROPERTY.getTopic()); + + + writer.println(JSONUtil.toJsonStr(tcpMsgDTO)); // 发送消息到服务端 + + System.out.println("Message sent: " + JSONUtil.toJsonStr(tcpMsgDTO)); + + Thread.sleep(100000); + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/factory/TcpMessageHandlerFactory.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/factory/TcpMessageHandlerFactory.java new file mode 100644 index 0000000..a15fcd0 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/factory/TcpMessageHandlerFactory.java @@ -0,0 +1,47 @@ +package net.maku.iot.communication.tcp.factory; + +import lombok.RequiredArgsConstructor; +import net.maku.iot.communication.tcp.handler.TCPMessageHandler; +import org.springframework.context.ApplicationContext; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** + * TCP消息处理器工厂,自动获取所有实现的处理器实例 + * + * @author LSF maku_lsf@163.com + */ +@Component +@RequiredArgsConstructor +public class TcpMessageHandlerFactory { + private final ApplicationContext applicationContext; + + /** + * 所有消息处理器 + */ + private List messageHandlers; + + private List loadHandlers() { + if (messageHandlers != null) { + return messageHandlers; + } + messageHandlers = new ArrayList<>(applicationContext.getBeansOfType(TCPMessageHandler.class).values()); + return messageHandlers; + } + + /** + * 获取与主题对应的tcp消息处理器 + * + * @param topic 主题 + * @return 处理器列表 + */ + public List getHandlersForTopic(String topic) { + return Collections.unmodifiableList(loadHandlers().stream() + .filter(handler -> handler.supports(topic)) + .collect(Collectors.toList())); + } +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/ConnectionHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/ConnectionHandler.java new file mode 100644 index 0000000..99053a2 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/ConnectionHandler.java @@ -0,0 +1,82 @@ +package net.maku.iot.communication.tcp.handler; + +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.util.AttributeKey; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import net.maku.framework.common.utils.JsonUtils; +import net.maku.iot.communication.dto.TcpMsgDTO; +import net.maku.iot.communication.tcp.factory.TcpMessageHandlerFactory; + +import java.util.concurrent.ConcurrentMap; + +/** + * @Description TODO + * @Author LSF + * @Date 2024/8/14 16:52 + */ +@Slf4j +public class ConnectionHandler extends ChannelInboundHandlerAdapter { + + + @Resource + private TcpMessageHandlerFactory tcpMessageHandlerFactory; + + private final ConcurrentMap deviceChannels; + + public ConnectionHandler(ConcurrentMap deviceChannels) { + this.deviceChannels = deviceChannels; + } + + + @Override + public void channelActive(ChannelHandlerContext ctx) { + // 请求设备发送其 ID + ctx.writeAndFlush("ACK"); + + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + if (msg!=null&& StrUtil.contains(msg.toString(),"topic")) { + // 处理 TCP 消息 + handleTcpMessage(ctx, JsonUtils.parseObject(msg.toString(), TcpMsgDTO.class)); + } else { + // 处理其他类型的消息 + log.warn("接收到未知的消息类型:{}", msg); + } + } + + + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + String deviceId = getDeviceId(ctx.channel()); + if (deviceId != null) { + deviceChannels.remove(deviceId); + } + log.info(" {} 断开连接", deviceId == null ? "未知设备" : deviceId); + } + + private void handleTcpMessage(ChannelHandlerContext ctx, TcpMsgDTO message) { + String topic = message.getTopic(); + if (topic != null) { + tcpMessageHandlerFactory.getHandlersForTopic(topic).forEach(handler -> { + handler.handle(topic, message.getMsg().toString()); + }); + } else { + log.warn("接收到主题为null的消息。"); + } + } + + + private String getDeviceId(Channel channel) { + // 从 Channel 的属性中获取设备 ID + return channel.attr(AttributeKey.valueOf("deviceId")).get(); + } +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/DeviceCommandResponseTCPMessageHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/DeviceCommandResponseTCPMessageHandler.java new file mode 100644 index 0000000..687212b --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/DeviceCommandResponseTCPMessageHandler.java @@ -0,0 +1,21 @@ +package net.maku.iot.communication.tcp.handler; + +import net.maku.iot.communication.mqtt.handler.MqttMessageHandler; +import net.maku.iot.enums.DeviceTopicEnum; + +/** + * @Description TODO + * @Author LSF + * @Date 2024/8/14 19:23 + */ +public class DeviceCommandResponseTCPMessageHandler implements MqttMessageHandler { + @Override + public boolean supports(String topic) { + return DeviceTopicEnum.startsWith(topic, DeviceTopicEnum.COMMAND_RESPONSE.getTopic()); + } + + @Override + public void handle(String topic, String message) { + //TCP设备响应处理 + } +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/DevicePropertyTCPMessageHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/DevicePropertyTCPMessageHandler.java new file mode 100644 index 0000000..23bf225 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/DevicePropertyTCPMessageHandler.java @@ -0,0 +1,21 @@ +package net.maku.iot.communication.tcp.handler; + +import net.maku.iot.communication.mqtt.handler.MqttMessageHandler; +import net.maku.iot.enums.DeviceTopicEnum; + +/** + * @Description TODO + * @Author LSF + * @Date 2024/8/14 19:24 + */ +public class DevicePropertyTCPMessageHandler implements MqttMessageHandler { + @Override + public boolean supports(String topic) { + return DeviceTopicEnum.startsWith(topic, DeviceTopicEnum.PROPERTY.getTopic()); + } + + @Override + public void handle(String topic, String message) { + //TCP设备属性上报处理 + } +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/TCPMessageHandler.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/TCPMessageHandler.java new file mode 100644 index 0000000..900bc21 --- /dev/null +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/communication/tcp/handler/TCPMessageHandler.java @@ -0,0 +1,26 @@ +package net.maku.iot.communication.tcp.handler; + + +/** + * TCP消息处理接口 + * + * @author LSF maku_lsf@163.com + */ +public interface TCPMessageHandler { + + /** + * 是否支持处理指定的topic + * + * @param topic + * @return + */ + boolean supports(String topic); + + /** + * TCP消息处理接口 + * + * @param topic + * @param message + */ + void handle(String topic, String message); +} diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/IotDeviceService.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/IotDeviceService.java index 6588370..b2e0105 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/IotDeviceService.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/IotDeviceService.java @@ -3,9 +3,9 @@ package net.maku.iot.service; import net.maku.framework.common.utils.PageResult; import net.maku.framework.mybatis.service.BaseService; import net.maku.iot.entity.IotDeviceEntity; -import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO; +import net.maku.iot.communication.dto.DeviceCommandResponseDTO; import net.maku.iot.query.IotDeviceQuery; -import net.maku.iot.communication.BaseCommunication; +import net.maku.iot.communication.service.BaseCommunication; import net.maku.iot.vo.DeviceCommandResponseAttributeDataVO; import net.maku.iot.vo.DeviceCommandVO; import net.maku.iot.vo.DeviceReportAttributeDataVO; diff --git a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/impl/IotDeviceServiceImpl.java b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/impl/IotDeviceServiceImpl.java index 50fd92e..b9e48aa 100644 --- a/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/impl/IotDeviceServiceImpl.java +++ b/maku-boot-module/maku-module-iot/src/main/java/net/maku/iot/service/impl/IotDeviceServiceImpl.java @@ -16,13 +16,13 @@ import net.maku.iot.convert.IotDeviceConvert; import net.maku.iot.dao.IotDeviceDao; import net.maku.iot.entity.IotDeviceEntity; import net.maku.iot.enums.*; -import net.maku.iot.communication.mqtt.dto.DeviceCommandResponseDTO; -import net.maku.iot.communication.mqtt.dto.DevicePropertyDTO; +import net.maku.iot.communication.dto.DeviceCommandResponseDTO; +import net.maku.iot.communication.dto.DevicePropertyDTO; import net.maku.iot.communication.mqtt.handler.DeviceCommandResponseHandler; import net.maku.iot.communication.mqtt.handler.DevicePropertyChangeHandler; import net.maku.iot.query.IotDeviceQuery; -import net.maku.iot.communication.BaseCommunication; -import net.maku.iot.communication.CommunicationServiceFactory; +import net.maku.iot.communication.service.BaseCommunication; +import net.maku.iot.communication.service.CommunicationServiceFactory; import net.maku.iot.service.IotDeviceEventLogService; import net.maku.iot.service.IotDeviceService; import net.maku.iot.vo.DeviceCommandResponseAttributeDataVO;