软件定制开发供应商Springboot结合Netty对接硬件,实现主动发送报文和接受硬件报文(ModbusRTU或者TCP以及DTU)

Springboot结合Netty对接硬件,软件定制开发供应商实现主动发送报文和接软件定制开发供应商受硬件报文(ModbusRTU或者TCP以及DTU)

一,认识

需要了解netty 软件定制开发供应商基本知识原理,软件定制开发供应商不了解的可以查看我之前的博客,软件定制开发供应商以及网上的资料,软件定制开发供应商这里不在过多撰述。

二,开发思路

软件定制开发供应商这里以对接硬件雷达水位计为例:

软件定制开发供应商说一下思路, 软件定制开发供应商这里场景各种设备连接DTU,然后通过DTU上报报文,和接收服务器下发的指令。

例如127.0.0.1:2233 就是你服务器的ip和端口,我们需要开发部署一个 JAVA 开发的Netty 服务器来监听 2233端口, 从机配置我们的服务器ip和端口连接到netty。

那么我们开发netty 的思路应该是什么样子的。

  1. netty 监听端口;
  2. netty 保存通道长链接;
  3. 将netty 的 里面的所有通过 存放到一个 ConcurrentHashMap 里面来进行管理;
  4. 通过 netty 监听 我们可以获取 从机上报到服务器的报文,我们进行业务处理;
  5. 通过Map 我们实现 定时下发报文,让从机回复响应;

三,准备工作

3.1 引入springboot依赖

springboot,依赖, 去掉tomcat ,我们这里只做服务器,并不需要tomcat,以及只用 starter

	<parent>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-parent</artifactId>        <version>2.1.3.RELEASE</version>    </parent>	<dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter</artifactId>            <exclusions>                <exclusion>                    <groupId>org.springframework.boot</groupId>                    <artifactId>spring-boot-starter-tomcat</artifactId>                </exclusion>                <exclusion>                    <groupId>org.slf4j</groupId>                    <artifactId>slf4j-log4j12</artifactId>                </exclusion>            </exclusions>    </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

3.2 netty 核心包

		<!-- netty架包 -->        <dependency>            <groupId>io.netty</groupId>            <artifactId>netty-all</artifactId>        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

3.2 以及hutool 这里我们会用 它的定时器

		<dependency>            <groupId>cn.hutool</groupId>            <artifactId>hutool-all</artifactId>            <version>4.6.1</version>        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

其他相关依赖 不在撰写, 数据库依赖以及 工具类依赖 ,自己按需引用

四,编写代码

4.1 编写netty服务器

不在过多解释代码,每行都有注释

package com.joygis.iot.netty.server;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Component;import java.net.InetSocketAddress;/** * 功能描述: netty服务启动类 * * @Author keLe * @Date 2022/8/26 */@Slf4j@Componentpublic class NettyServer {    public void start(InetSocketAddress address) {        //配置服务端的NIO线程组        EventLoopGroup bossGroup = new NioEventLoopGroup();        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            // 绑定线程池,编码解码            //服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝            ServerBootstrap bootstrap = new ServerBootstrap()                    .group(bossGroup, workerGroup)                    // 指定Channel                    .channel(NioServerSocketChannel.class)                    //使用指定的端口设置套接字地址                    .localAddress(address)                    //使用自定义处理类                    .childHandler(new NettyServerChannelInitializer())                    //服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数                    .option(ChannelOption.SO_BACKLOG, 128)                    //保持长连接,2小时无数据激活心跳机制                    .childOption(ChannelOption.SO_KEEPALIVE, true)                    //将小的数据包包装成更大的帧进行传送,提高网络的负载                    .childOption(ChannelOption.TCP_NODELAY, true);            // 绑定端口,开始接收进来的连接            ChannelFuture future = bootstrap.bind(address).sync();            if (future.isSuccess()) {                log.info("netty服务器开始监听端口:{}",address.getPort());            }            //关闭channel和块,直到它被关闭            future.channel().closeFuture().sync();        } catch (Exception e) {            e.printStackTrace();            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

4.2 编写netty服务器 自定义处理类

package com.joygis.iot.netty.server;import com.joygis.iot.netty.MyDecoder;import com.joygis.iot.netty.MyEncoder;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.socket.SocketChannel;import io.netty.handler.timeout.IdleStateHandler;import java.util.concurrent.TimeUnit;/** * 功能描述: 服务端初始化,客户端与服务器端连接一旦创建,这个类中方法就会被回调,设置出站编码器和入站解码器 * * @Author keLe * @Date 2022/8/26 */public class NettyServerChannelInitializer extends ChannelInitializer<SocketChannel> {    @Override    protected void initChannel(SocketChannel socketChannel) throws Exception {        ChannelPipeline pipeline = socketChannel.pipeline();        //接收消息格式,使用自定义解析数据格式        pipeline.addLast("decoder",new MyDecoder());        //发送消息格式,使用自定义解析数据格式        pipeline.addLast("encoder",new MyEncoder());        //针对客户端,如果在1分钟时没有想服务端发送写心跳(ALL),则主动断开        //如果是读空闲或者写空闲,不处理,这里根据自己业务考虑使用        //pipeline.addLast(new IdleStateHandler(600,0,0, TimeUnit.SECONDS));        //自定义的空闲检测        pipeline.addLast(new NettyServerHandler());    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

接收消息格式,使用自定义解析数据格式工具类

package com.joygis.iot.netty;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.ByteToMessageDecoder;import java.util.List;/** * 功能描述: 自定义接收消息格式 * * @Author keLe * @Date 2022/8/26 */public class MyDecoder extends ByteToMessageDecoder {    @Override    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {        //创建字节数组,buffer.readableBytes可读字节长度        byte[] b = new byte[byteBuf.readableBytes()];        //复制内容到字节数组b        byteBuf.readBytes(b);        //字节数组转字符串        String str = new String(b);        list.add(bytesToHexString(b));    }    public String bytesToHexString(byte[] bArray) {        StringBuffer sb = new StringBuffer(bArray.length);        String sTemp;        for (int i = 0; i < bArray.length; i++) {            sTemp = Integer.toHexString(0xFF & bArray[i]);            if (sTemp.length() < 2) {                sb.append(0);            }            sb.append(sTemp.toUpperCase());        }        return sb.toString();    }    public static String toHexString1(byte[] b) {        StringBuffer buffer = new StringBuffer();        for (int i = 0; i < b.length; ++i) {            buffer.append(toHexString1(b[i]));        }        return buffer.toString();    }    public static String toHexString1(byte b) {        String s = Integer.toHexString(b & 0xFF);        if (s.length() == 1) {            return "0" + s;        } else {            return s;        }    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58

自定义发送消息格式,使用自定义解析数据格式工具类

/** * 功能描述: 自定义发送消息格式 * * @Author keLe * @Date 2022/8/26 */public class MyEncoder extends MessageToByteEncoder<String> {    @Override    protected void encode(ChannelHandlerContext channelHandlerContext, String s, ByteBuf byteBuf) throws Exception {        //将16进制字符串转为数组        byteBuf.writeBytes(hexString2Bytes(s));    }    /**     * 功能描述: 16进制字符串转字节数组     * @Author keLe     * @Date 2022/8/26     * @param src 16进制字符串     * @return byte[]     */    public static byte[] hexString2Bytes(String src) {        int l = src.length() / 2;        byte[] ret = new byte[l];        for (int i = 0; i < l; i++) {            ret[i] = (byte) Integer.valueOf(src.substring(i * 2, i * 2 + 2), 16).byteValue();        }        return ret;    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

4.3 编写netty服务器 报文处理类

package com.joygis.iot.netty.server;import com.joygis.iot.netty.ChannelMap;import io.netty.channel.Channel;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelId;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.handler.timeout.IdleState;import io.netty.handler.timeout.IdleStateEvent;import lombok.extern.slf4j.Slf4j;import java.net.InetSocketAddress;/** * 功能描述: netty服务端处理类 * * @Author keLe * @Date 2022/8/26 */@Slf4jpublic class NettyServerHandler extends ChannelInboundHandlerAdapter {    /**     * 功能描述: 有客户端连接服务器会触发此函数     * @Author keLe     * @Date 2022/8/26     * @param  ctx 通道     * @return void     */    @Override    public void channelActive(ChannelHandlerContext ctx) {        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();        String clientIp = insocket.getAddress().getHostAddress();        int clientPort = insocket.getPort();        //获取连接通道唯一标识        ChannelId channelId = ctx.channel().id();        //如果map中不包含此连接,就保存连接        if (ChannelMap.getChannelMap().containsKey(channelId)) {            log.info("客户端:{},是连接状态,连接通道数量:{} ",channelId,ChannelMap.getChannelMap().size());        } else {            //保存连接            ChannelMap.addChannel(channelId, ctx.channel());            log.info("客户端:{},连接netty服务器[IP:{}-->PORT:{}]",channelId, clientIp,clientPort);            log.info("连接通道数量: {}",ChannelMap.getChannelMap().size());        }    }    /**     * 功能描述: 有客户端终止连接服务器会触发此函数     * @Author keLe     * @Date 2022/8/26     * @param  ctx 通道处理程序上下文     * @return void     */    @Override    public void channelInactive(ChannelHandlerContext ctx) {        InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();        String clientIp = inSocket.getAddress().getHostAddress();        ChannelId channelId = ctx.channel().id();        //包含此客户端才去删除        if (ChannelMap.getChannelMap().containsKey(channelId)) {            //删除连接            ChannelMap.getChannelMap().remove(channelId);            log.info("客户端:{},连接netty服务器[IP:{}-->PORT:{}]",channelId, clientIp,inSocket.getPort());            log.info("连接通道数量: " + ChannelMap.getChannelMap().size());        }    }    /**     * 功能描述: 有客户端发消息会触发此函数     * @Author keLe     * @Date 2022/8/26     * @param  ctx 通道处理程序上下文     * @param  msg 客户端发送的消息     * @return void     */    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        log.info("加载客户端报文,客户端id:{},客户端消息:{}",ctx.channel().id(), msg);        String data = String.valueOf(msg);        Integer water = Integer.parseInt(data.substring(6,10),16);        log.info("当前水位:{}cm",water);        //响应客户端        //this.channelWrite(ctx.channel().id(), msg);    }   /* @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        String bytes = "01 03 00 02 00 01 25 CA";        ctx.writeAndFlush(bytes);    }*/    /**     * 功能描述: 服务端给客户端发送消息     * @Author keLe     * @Date 2022/8/26     * @param  channelId 连接通道唯一id     * @param  msg 需要发送的消息内容     * @return void     */    public void channelWrite(ChannelId channelId, Object msg) throws Exception {        Channel channel = ChannelMap.getChannelMap().get(channelId);        if (channel == null) {            log.info("通道:{},不存在",channelId);            return;        }        if (msg == null || msg == "") {            log.info("服务端响应空的消息");            return;        }        //将客户端的信息直接返回写入ctx        channel.write(msg);        //刷新缓存区        channel.flush();    }    @Override    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {        String socketString = ctx.channel().remoteAddress().toString();        if (evt instanceof IdleStateEvent) {            IdleStateEvent event = (IdleStateEvent) evt;            if (event.state() == IdleState.READER_IDLE) {                log.info("Client:{},READER_IDLE 读超时",socketString);                ctx.disconnect();                Channel channel = ctx.channel();                ChannelId id = channel.id();                ChannelMap.removeChannelByName(id);            } else if (event.state() == IdleState.WRITER_IDLE) {                log.info("Client:{}, WRITER_IDLE 写超时",socketString);                ctx.disconnect();                Channel channel = ctx.channel();                ChannelId id = channel.id();                ChannelMap.removeChannelByName(id);            } else if (event.state() == IdleState.ALL_IDLE) {                log.info("Client:{},ALL_IDLE 总超时",socketString);                ctx.disconnect();                Channel channel = ctx.channel();                ChannelId id = channel.id();                ChannelMap.removeChannelByName(id);            }        }    }    /**     * 功能描述: 发生异常会触发此函数     * @Author keLe     * @Date 2022/8/26     * @param  ctx 通道处理程序上下文     * @param  cause 异常     * @return void     */    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        ctx.close();        log.info("{}:发生了错误,此连接被关闭。此时连通数量:{}",ctx.channel().id(),ChannelMap.getChannelMap().size());    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159

4.4 编写管理通道Map类

package com.joygis.iot.netty;import io.netty.channel.Channel;import io.netty.channel.ChannelId;import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;import org.springframework.util.CollectionUtils;import java.util.concurrent.ConcurrentHashMap;/** * 功能描述: 管理通道Map类 * * @Author keLe * @Date 2022/8/26 */public class ChannelMap {    /**     * 管理一个全局map,保存连接进服务端的通道数量     */    private static final ConcurrentHashMap<ChannelId, Channel> CHANNEL_MAP = new ConcurrentHashMap<>(128);    public static ConcurrentHashMap<ChannelId, Channel> getChannelMap() {        return CHANNEL_MAP;    }    /**     *  获取指定name的channel     */    public static Channel getChannelByName(ChannelId channelId){        if(CollectionUtils.isEmpty(CHANNEL_MAP)){            return null;        }        return CHANNEL_MAP.get(channelId);    }    /**     *  将通道中的消息推送到每一个客户端     */    public static boolean pushNewsToAllClient(String obj){        if(CollectionUtils.isEmpty(CHANNEL_MAP)){            return false;        }        for(ChannelId channelId: CHANNEL_MAP.keySet()) {            Channel channel = CHANNEL_MAP.get(channelId);            channel.writeAndFlush(new TextWebSocketFrame(obj));        }        return true;    }    /**     *  将channel和对应的name添加到ConcurrentHashMap     */    public static void addChannel(ChannelId channelId,Channel channel){        CHANNEL_MAP.put(channelId,channel);    }    /**     *  移除掉name对应的channel     */    public static boolean removeChannelByName(ChannelId channelId){        if(CHANNEL_MAP.containsKey(channelId)){            CHANNEL_MAP.remove(channelId);            return true;        }        return false;    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70

4.5 编写配置类

package com.joygis.iot.netty;import lombok.Getter;import lombok.Setter;import lombok.ToString;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.PropertySource;import org.springframework.stereotype.Component;/** * 功能描述: 配置类 * * @Author keLe * @Date 2022/8/26 */@Setter@Getter@ToString@Component@Configuration@PropertySource("classpath:application.yml")@ConfigurationProperties(prefix = "socket")public class SocketProperties {    private Integer port;    private String host;}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

appliction.yml

spring:  profiles:    active: test  resources:    cache:      period: 0  application:    name: iot-nettysocket:  # 监听端口 8090  port: 8090  #ip地址  host: 127.0.0.1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

4.6 springboot 启动,netty也自启动,任务定时器池也启动

package com.joygis.iot.config;import cn.hutool.cron.CronUtil;import com.joygis.iot.netty.SocketProperties;import com.joygis.iot.netty.server.NettyServer;import lombok.extern.slf4j.Slf4j;import org.springframework.boot.CommandLineRunner;import org.springframework.stereotype.Component;import javax.annotation.Resource;import java.net.InetSocketAddress;/** * 功能描述: 任务队列 * @Author keLe * @Date 2022/7/20 */@Component@Slf4jpublic class LaunchRunner implements CommandLineRunner {    @Resource    private NettyServer nettyServer;    @Resource    private SocketProperties socketProperties;    @Override    public void run(String... args) throws Exception {        TaskRunner();        InetSocketAddress address = new InetSocketAddress(socketProperties.getHost(),socketProperties.getPort());        log.info("netty服务器启动地址:"+socketProperties.getHost());        nettyServer.start(address);    }    /**     * 执行正在运行的任务     */    private  void TaskRunner() {        /**         * 任务队列启动         */        CronUtil.setMatchSecond(true);        CronUtil.start();        log.info("-----------------------任务服务启动------------------------\	" +                        "当前正在启动的{}个任务"+                        "-----------------------------------------------------------\	"                , CronUtil.getScheduler().size()        );    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

4.7 编写定时下发报文

两个定时器,一个定时下发报文,一个定时删除不活跃的连接

package com.joygis.iot.manage;import com.joygis.iot.netty.ChannelMap;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.Channel;import io.netty.channel.ChannelFutureListener;import io.netty.channel.ChannelId;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Component;import org.springframework.util.CollectionUtils;import javax.annotation.Resource;import java.util.Arrays;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;/** * 功能描述: 定时发送Dtu报文 * * @Author keLe * @Date 2022/8/29 */@Slf4j@Componentpublic class DtuManage {    public void sendMsg(){        ConcurrentHashMap<ChannelId, Channel> channelMap = ChannelMap.getChannelMap();        if(CollectionUtils.isEmpty(channelMap)){            return;        }        ConcurrentHashMap.KeySetView<ChannelId, Channel> channelIds = channelMap.keySet();        byte[] msgBytes = {0x01, 0x03, 0x00, 0x02, 0x00, 0x01, 0x25, (byte) 0xCA};        for(ChannelId channelId : channelIds){            Channel channel = ChannelMap.getChannelByName(channelId);            // 判断是否活跃            if(channel==null || !channel.isActive()){                ChannelMap.getChannelMap().remove(channelId);                log.info("客户端:{},连接已经中断",channelId);                return ;            }            // 指令发送            ByteBuf buffer = Unpooled.buffer();            log.info("开始发送报文:{}",Arrays.toString(msgBytes));            buffer.writeBytes(msgBytes);            channel.writeAndFlush(buffer).addListener((ChannelFutureListener) future -> {                if (future.isSuccess()) {                    log.info("客户端:{},回写成功:{}",channelId,Arrays.toString(msgBytes));                } else {                    log.info("客户端:{},回写失败:{}",channelId,Arrays.toString(msgBytes));                }            });        }    }    /**     * 功能描述: 定时删除不活跃的连接     * @Author keLe     * @Date 2022/8/26     * @return void     */    public void deleteInactiveConnections(){        ConcurrentHashMap<ChannelId, Channel> channelMap = ChannelMap.getChannelMap();        if(!CollectionUtils.isEmpty(channelMap)){            for (Map.Entry<ChannelId, Channel> next : channelMap.entrySet()) {                ChannelId channelId = next.getKey();                Channel channel = next.getValue();                if (!channel.isActive()) {                    channelMap.remove(channelId);                    log.info("客户端:{},连接已经中断",channelId);                }            }        }    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78

五 ,测试

使用网络助手进行调试
百度云盘下载地址:https://pan.baidu.com/s/1dcVk9MH88RMRF9dmR3mH5g
提取码:z7h0

5.1 启动服务

5.2 通过网络助手连接发送指令

发送报文

定时发送报文

六 总结

我们有了方向,才有了思路,就会有具体落地。
如果对netty还有不太明白的地方,可以看看我的后续博客,持续更新中。

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发