Mina基础(二):客户端、服务端实现


一、服务端

本次示例当中服务端由一下几部分构成

1、服务端Socket

2、服务端Socket封装类

3、DataHandler数据处理

4、项目启动后调用

1.1、创建Session服务端

package com.test.mina.server;

import org.apache.mina.core.filterchain.IoFilter;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;

/**
 * @ClassName SocketServer
 * @description: Mina服务端,提供socket端口侦听服务:用于监听客户端连接,并接受客户端发送过来的数据
 * @author: bozhiqiang
 * @Version 1.0.0
 * @createTime: 2022-07-08 13:49:19
 */
public class SocketServer {
    private final static Logger logger = LoggerFactory.getLogger(SocketServer.class);
    /**
     * 1、创建一个非阻塞的server端的Socket
     * 2、设置编码过滤器(使用Mina提供的文本换行符编解码器)
     * 3、设置读取数据的缓冲区大小
     * 4、设置读写通道无操作进入空闲状态的时间 10ms
     * 5、绑定真正的逻辑处理
     * 6、绑定端口
     */

    // 绑定的端口
    private int port;
    private String name;
    // 服务启动标志
    private boolean started=false;

    // 设置读取、设置数据的缓冲区大小
    private int readerIndleTime=60; //s
    private int writerIndleTime=900; //s

    private IoHandler dataHandler;

    private IoAcceptor acceptorServer;
    public SocketServer(String name,int port){
        this.name = name;
        this.port = port;
    }

    public boolean start(){
        if(started){
            return true;
        }

        try {
            // 创建一个非阻塞的server端的Socket
            acceptorServer = new NioSocketAcceptor();

            acceptorServer.getSessionConfig().setReaderIdleTime(getReaderIndleTime());
            acceptorServer.getSessionConfig().setWriterIdleTime(getWriterIndleTime());
            // 设置读写通道无操作进入空闲状态的时间 10ms
            acceptorServer.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);

            // 设置过滤链
            acceptorServer.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory()));
            acceptorServer.setHandler(getDataHandler());

            acceptorServer.bind(new InetSocketAddress(port));
            logger.info("{} listening port:{}",name,port);

            started = true;

            return true;
        } catch (IOException e) {
            logger.error("start",e);
            return false;
        }
    }


    public void close() {
        this.acceptorServer.dispose();
    }

    public int getReaderIndleTime() {
        return readerIndleTime;
    }

    public void setReaderIndleTime(int readerIndleTime) {
        this.readerIndleTime = readerIndleTime;
    }

    public int getWriterIndleTime() {
        return writerIndleTime;
    }

    public void setWriterIndleTime(int writerIndleTime) {
        this.writerIndleTime = writerIndleTime;
    }

    public IoHandler getDataHandler() {
        return dataHandler;
    }

    public void setDataHandler(IoHandler dataHandler) {
        this.dataHandler = dataHandler;
    }
}

1.2、创建服务端封装类

package com.techen.mina.server;

import com.techen.mina.utils.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @ClassName Fes
 * @description:
 * @author: bozhiqiang
 * @Version 1.0.0
 * @createTime: 2022-07-15 10:22:26
 */
public class Fes {
    private static final Logger logger= LoggerFactory.getLogger(Fes.class);

    // 终端上线端口服务
    private SocketServer terminalServer;
    // 主站侧端口服务
    private SocketServer clientServer;

    // 终端上线端口
    private int terminalPort;
    // 主站连接端口
    private int clientPort;

    private Config config;

    // 终端数据处理器
    private DataHandler terminalDataHandler;
    // 主站连接数据处理器
    private DataHandler clientDataHandler;

    public Fes(Config config) {
        this.config = config;
    }



    public void start() {

        // 启动终端通信监听
        terminalPort = config.getIntValue("terminal_port", 12000);
        terminalServer = new SocketServer("Terminal Service", terminalPort);
        terminalServer.setReaderIndleTime(config.getIntValue("terminal_idle", 300));
        terminalServer.setWriterIndleTime(config.getIntValue("terminal_idle", 300));
        terminalServer.setDataHandler(terminalDataHandler);
        terminalServer.start();


        // 启动主站通信监听
        clientPort = config.getIntValue("client_port", 12001);
        clientServer = new SocketServer("Client Service", clientPort);
        clientServer.setReaderIndleTime(config.getIntValue("client_idle", 60));

        clientServer.setDataHandler(clientDataHandler);
        clientServer.start();

        logger.info("fes started.");
    }

    public void close() {
        terminalServer.close();
        clientServer.close();
    }

    public void setTerminalDataHandler(DataHandler terminalDataHandler) {
        this.terminalDataHandler = terminalDataHandler;
    }

    public DataHandler getClientDataHandler() {
        return clientDataHandler;
    }

    public DataHandler getTerminalDataHandler() {
        return terminalDataHandler;
    }

    public void setClientDataHandler(DataHandler clientDataHandler) {
        this.clientDataHandler = clientDataHandler;
    }
}

1.3、数据处理

package com.techen.mina.server;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

/**
 * @ClassName DataHandler
 * @description:
 * @author: bozhiqiang
 * @Version 1.0.0
 * @createTime: 2022-07-15 09:59:56
 */
@Component
public abstract class DataHandler extends IoHandlerAdapter {
    private static final Logger logger = LoggerFactory.getLogger(DataHandler.class);

    public DataHandler() {
        super();
    }

    @Override
    public void sessionCreated(IoSession session) throws Exception {
        logger.info("连接成功");
    }

    @Override
    public void sessionOpened(IoSession session) throws Exception {
        logger.info("服务端与客户端连接打开");
    }

    @Override
    public void sessionClosed(IoSession session) throws Exception {
        logger.info("服务端与客户端连接关闭");
    }

    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        logger.info("服务端进入空闲状态");
    }

    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        logger.error("服务端发生异常", cause);
    }

    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        // 收到的是字符串(传输之前的数据类型)
        byte[] msgBytes = message.toString().getBytes(StandardCharsets.UTF_8);

        try {
            this.messageReceived(session, msgBytes);
        } catch (Exception var6) {
            logger.error(var6.getMessage(), var6);
        }
    }

    public abstract void messageReceived(IoSession var1, byte[] var2);

    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        logger.info("服务端发送消息成功");
    }

    @Override
    public void inputClosed(IoSession session) throws Exception {
        logger.info("输入关闭");
    }
}
/**-----------------------------------------------------------------------------------------------------**/
package com.techen.mina.server;

import org.apache.mina.core.session.IoSession;

/**
 * @ClassName ClientDataHandler
 * @description:
 * @author: bozhiqiang
 * @Version 1.0.0
 * @createTime: 2022-07-15 13:16:44
 */
public class ClientDataHandler extends DataHandler{

    @Override
    public void messageReceived(IoSession session, byte[] msgBytes) {
        System.out.println("收到客户端信息:"+new String(msgBytes));
    }
}
/**---------------------------------------------------------------------------------------------------**/
package com.techen.mina.server;

import org.apache.mina.core.session.IoSession;

/**
 * @ClassName TerminalDataHandler
 * @description:
 * @author: bozhiqiang
 * @Version 1.0.0
 * @createTime: 2022-07-15 16:18:34
 */
public class TerminalDataHandler extends DataHandler{

    @Override
    public void messageReceived(IoSession session, byte[] msgBytes) {
        System.out.println("收到设备端信息:"+new String(msgBytes));
    }
}    

1.4、项目启动后调用

package com.techen.mina;

import com.techen.mina.server.ClientDataHandler;
import com.techen.mina.server.Fes;
import com.techen.mina.server.TerminalDataHandler;
import com.techen.mina.utils.Configs;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SocketMinaApplication {

    public static void main(String[] args) {
        SpringApplication.run(SocketMinaApplication.class, args);
        Fes fes = new Fes(Configs.getConfig("/config/serverconfig.properties"));
        fes.setClientDataHandler(new ClientDataHandler());
        fes.setTerminalDataHandler(new TerminalDataHandler());
        fes.start();
    }
}

二、客户端

本次示例当中客户端由一下几部分构成

1、客户端Socket

2、客户端Socket封装类

3、DataHandler数据处理

4、项目启动后调用

5、测试

2.1 客户端Socket

该类当中有一个主要的方法init,在该方法当中,添加监听端口connect.addListener()时,监听端口类可以实现IoServiceListener,去实现自动重连功能;connect方法用于连接服务端;sendMessage方法用于发送报文

package com.techen.mina.client;

import com.techen.mina.utils.CommUtils;
import com.techen.mina.utils.Configs;
import com.techen.mina.utils.SimpleThreadFactory;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.service.IoService;
import org.apache.mina.core.service.IoServiceListener;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @ClassName ClientServer:mina socket客户端封装。提供连接建立,自动重连等功能。
 * @description: IoSession是最重要的一个接口:当连接建立之后,不同的客户端会单独建立一个Session,在Session当中进行数据传输
 * @author: bozhiqiang
 * @Version 1.0.0
 * @createTime: 2022-07-18 17:39:39
 */
public class SocketClient {

    /**
     * 1、创建一个非阻塞的客户端
     * 2、设置超时时间
     * 3、设置编码解码器
     * 4、绑定逻辑处理类
     * 5、创建连接
     * 6、等待连接创建完成
     * 7、获取连接session
     * 8、发送数据
     * 9、等待关闭连接
     */
    private final static Logger logger = LoggerFactory.getLogger(SocketClient.class);
    private IoConnector connector;
    private IoSession ioSession;
    private AtomicBoolean beKilled=new AtomicBoolean(false);
    private AtomicBoolean isConnecting=new AtomicBoolean(false);

    private ProtocolCodecFactory codecFactory;
    // 延时连接 s
    private int delayConnect = Configs.getConfig().getIntValue("SOCKET_DELAY_CONNECT",5);
    // 重连间隔 s
    private int reConnectInterval = Configs.getConfig().getIntValue("SOCKET_RECONNECT_INTERVAL",10);
    // 连接超时时间 s
    private int connectTimeout = Configs.getConfig().getIntValue("SOCKET_CONNECT_TIMEOUT",20);
    // read idle time s
    private int readIdleTime = Configs.getConfig().getIntValue("SOCKET_READIDLE_TIME",30);

    /**
     * 自动重连
     */
    private boolean autoReconnect=true;

    private String ip;
    private int port;

    public SocketClient(String ip, int port) {
        init(ip,port,connectTimeout, readIdleTime);
    }
    public SocketClient(String ip, int port, int conTimeOut, int idleTime) {
        init(ip,port,conTimeOut, idleTime);
    }



    /**
     * @description: 初始化连接参数
     * @author: bozhiqiang
     * @updateTime: 2022/7/18 18:59
     */
    private void init(String ip, int port,int conTimeOut, int idleTime) {
        this.ip = ip;
        this.port = port;
        // 创建一个非阻塞的客户端
        connector = new NioSocketConnector();
        connector.setConnectTimeoutMillis(conTimeOut * 1000);
        connector.getSessionConfig().setIdleTime(IdleStatus.READER_IDLE,idleTime);

        //设置过滤链
        connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory()));
        connector.getFilterChain().addLast("executor", new ExecutorFilter());
        connector.addListener(new ClientIoServiceListener());
    }

    /**
     * @description: mina socket连接状态监听类:mina socket连接状态监听类,在连接session失效后,自动重连
     * @author: bozhiqiang
     * @updateTime: 2022/7/18 19:02
     */
    private class ClientIoServiceListener implements IoServiceListener {
        @Override
        public void serviceActivated(IoService service) throws Exception {
            //logger.debug("socketClient serviceActivated");
        }

        @Override
        public void serviceDeactivated(IoService service) throws Exception {
            //logger.debug("socketClient serviceDeactivated");
        }
        @Override
        public void serviceIdle(IoService service, IdleStatus idleStatus)
                throws Exception {
            logger.debug("socketClient serviceIdle");
        }
        @Override
        public void sessionClosed(IoSession session) throws Exception {
            logger.debug("socketClient sessionClosed");
        }
        @Override
        public void sessionCreated(IoSession session) throws Exception {
            logger.debug("socketClient sessionCreated");
        }

        @Override
        public void sessionDestroyed(IoSession session) throws Exception {
            logger.debug("socketClient sessionDestroyed! ");
            if(autoReconnect){
                delayConnect(delayConnect, TimeUnit.SECONDS);
            }
        }
    }

    /**
     * socket连接断开后,延时重连
     * @Description: 在侦听到连接断开事件后,延时重连
     * @param delay		延时时间
     * @param timeUnit	延时时间单位
     * @return: void
     * @Modify:
     */
    private void delayConnect(long delay, TimeUnit timeUnit) {
        logger.debug("delay " + delay+" "+timeUnit  +" to reconnect");
        scheduleExecutor.schedule(new Runnable() {
            public void run() {
                try {
                    SocketClient.this.connect();
                } catch (Exception e) {
                    logger.error("error", e);
                }
            }
        }, delay, timeUnit);
    }
    /**
     * 连接建立线程池,目前只是内部自动重连采用该线程池,外部调用connect方法,还是通过调用线程
     */
    private static ScheduledExecutorService scheduleExecutor = Executors.newScheduledThreadPool(6, new SimpleThreadFactory("SocketClientConManager"));
    /**
     * 连接服务器
     * @Description: SocketClient实例创建后,根据连接参数与服务器建立连接。建立连接为阻塞过程,如连接无法建立当前线程将一直阻塞
     * @return: void
     * @Modify:
     */
    public void connect() {
        if (isConnecting.get() ||( ioSession != null &&  ioSession.isConnected())) {
            return;
        }
        isConnecting.set(true);
        beKilled.set(false);
        logger.info("connecting to {}:{}  ",ip,port );
        while(true){
            try{
                if(beKilled.get()==true){
                    logger.warn("socketClient was killed, break connect(). ");
                    break;
                }
                // 创建连接
                ConnectFuture cf = connector.connect(new InetSocketAddress(ip, port));
                // 等待连接创建完成:阻塞直到连接建立,因为我们后面要使用连接成功之后创建的Session对象来进行写数据的操作
                cf.awaitUninterruptibly();
                // 获取连接session
                ioSession = cf.getSession();
                if(ioSession.isConnected()){
                    logger.info("connected to {}:{}",ip,port);
                }
                isConnecting.set(false);
                break;
            } catch(Exception e){
                logger.error(e.getMessage(),e);
                if(autoReconnect){
                    logger.warn("connect to {}:{} failed! reconnect after {} s",ip,port,reConnectInterval);
                    try {
                        Thread.sleep(reConnectInterval*1000);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                }
                else{
                    break;
                }
            }
        }
    }



    // 当前连接是否可用
    public boolean isConnected(){
        return (ioSession != null && ioSession.isConnected());
    }

    /**
     * 关闭连接
     * @Description: 关闭socket连接,如close前,SocketClient实例正在进行连接建立(或重连)操作,则关闭后,连接建立或重连操作也将取消
     * @return: void
     * @Modify:
     */
    public void close() {
        if (ioSession != null && ioSession.isConnected()) {
            ioSession.closeNow();
        }
        if(connector!=null){
            connector.dispose();
        }
        beKilled.set(true);
    }


    /**
     * 发送报文
     * @Description: 通过ioSession.write发送报文到服务端
     * @param messages	byte[] 报文,AMR自定义报文格式
     * @return: void
     * @Modify:
     */
    public boolean sendMessage(byte[] messages) {
        if (ioSession != null && ioSession.isConnected()) {
            try {
                ioSession.write(messages);
                logger.debug("forward to server:" + CommUtils.byteToHexString(messages));
            } catch (Exception e) {
                logger.error("",e);
                return false;
            }
        } else {
            logger.info("IoSession is not connected! " + ioSession);
            return false;
        }
        return true;
    }




    public String getIp() {
        return ip;
    }
    public void setIp(String ip) {
        this.ip = ip;
    }
    public int getPort() {
        return port;
    }
    public void setPort(int port) {
        this.port = port;
    }
    public void setDataHandler(IoHandler dataHandler) {
        connector.setHandler(dataHandler);
    }
    public void setCodecFactory(ProtocolCodecFactory codecFactory) {
        this.codecFactory = codecFactory;
    }
    public IoSession getIoSession() {
        return ioSession;
    }
    public AtomicBoolean getBeKilled() {
        return beKilled;
    }
    public void setBeKilled(AtomicBoolean beKilled) {
        this.beKilled = beKilled;
    }
    public int getDelayConnect() {
        return delayConnect;
    }
    public void setDelayConnect(int delayConnect) {
        this.delayConnect = delayConnect;
    }
    public int getReConnectInterval() {
        return reConnectInterval;
    }
    public void setReConnectInterval(int reConnectInterval) {
        this.reConnectInterval = reConnectInterval;
    }
    public int getConnectTimeout() {
        return connectTimeout;
    }
    public void setConnectTimeout(int connectTimeout) {
        this.connectTimeout = connectTimeout;
    }
    public int getReadIdleTime() {
        return readIdleTime;
    }
    public void setReadIdleTime(int readIdleTime) {
        this.readIdleTime = readIdleTime;
    }
    public boolean isAutoReconnect() {
        return autoReconnect;
    }
    public void setAutoReconnect(boolean autoReconnect) {
        this.autoReconnect = autoReconnect;
    }

}

2.2 客户端Socket封装

package com.techen.mina.client;

import com.techen.mina.utils.Configs;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @ClassName JobServer
 * @description:
 * @author: bozhiqiang
 * @Version 1.0.0
 * @createTime: 2022-07-18 19:50:26
 */
public class JobServer {
    String ip;
    int port;
    // 终端数据处理器
    private DataHandler clientTaskDataHandler;
    // 终端上线端口服务
    private SocketClient socketClient;

    /**
     * 任务服务器、前置机连接集合,key为前置机的serverId
     */
    public static Map<String,SocketClient> onlineClients;

    public JobServer(){
        onlineClients = new ConcurrentHashMap<>();
    }

    // 服务启动时调用即可
    public void start() {
        // 启动终端通信监听
        ip = Configs.getConfig().getValue("server_ip", "127.0.0.1");
        port = Configs.getConfig().getIntValue("server_port", 7950);

        // 初始化ip、port、创建创建一个非阻塞的客户端(connect)
        socketClient = new SocketClient(ip,port);
        // 设置处理器
        socketClient.setDataHandler(getClientTaskDataHandler());

        // 将连接加入到容器当中
        onlineClients.put("1",socketClient);
        // 创建连接、获取IoSession
        socketClient.connect();
    }


    public DataHandler getClientTaskDataHandler() {
        return clientTaskDataHandler;
    }
    public void setClientTaskDataHandler(DataHandler clientTaskDataHandler) {
        this.clientTaskDataHandler = clientTaskDataHandler;
    }

    public static Map<String, SocketClient> getOnlineClients() {
        return onlineClients;
    }
}

2.3 DataHandler数据处理

package com.techen.mina.client;

import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

/**
 * @ClassName DataHandler
 * @description:
 * @author: bozhiqiang
 * @Version 1.0.0
 * @createTime: 2022-07-15 09:59:56
 */
@Component
public abstract class DataHandler extends IoHandlerAdapter {
    private static final Logger logger = LoggerFactory.getLogger(DataHandler.class);

    public DataHandler() {
        super();
    }

    @Override
    public void sessionCreated(IoSession session) throws Exception {
        logger.info("客端登录成功");
    }

    @Override
    public void sessionOpened(IoSession session) throws Exception {
        logger.info("服务端与客户端连接打开");
    }

    @Override
    public void sessionClosed(IoSession session) throws Exception {
        logger.info("客户端登录关闭");
    }

    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        logger.info("服务端进入空闲状态");
    }

    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        logger.error("服务端发生异常", cause);
    }

    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        byte[] msgBytes = message.toString().getBytes(StandardCharsets.UTF_8);

        try {
            this.messageReceived(session, msgBytes);
        } catch (Exception var6) {
            logger.error(var6.getMessage(), var6);
        }
    }

    public abstract void messageReceived(IoSession var1, byte[] var2);

    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
    }

    @Override
    public void inputClosed(IoSession session) throws Exception {
        logger.info("输入关闭");
    }
}
/**-----------------------------------------------------------------------------------------------------**/
    package com.techen.mina.client;

import com.techen.mina.utils.CommUtils;
import com.techen.mina.utils.Configs;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;

/**
 * @ClassName ClientTaskDataHandler
 * @description: 该处理器的作用主要是接收信息,并维持心跳
 * @author: bozhiqiang
 * @Version 1.0.0
 * @createTime: 2022-07-18 19:43:00
 */
public class ClientTaskDataHandler extends DataHandler {

    private static final Logger logger = LoggerFactory.getLogger(ClientTaskDataHandler.class);
    @Override
    public void messageReceived(IoSession session, byte[] message) {
        logger.info("客户端收到消息:{}", CommUtils.byteToHexString(message));
    }

    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        InetSocketAddress remoteSocketAddress = (InetSocketAddress)session.getRemoteAddress();
        //超过2个心跳周期未收到前置机报文,断开连接
        if(session.getIdleCount(IdleStatus.READER_IDLE) >Integer.parseInt(Configs.getConfig().getValue("max_heart_count", "2"))){
            logger.info("taskclient readidle count>=2, close now. {}",remoteSocketAddress);
            session.closeNow();
            return;
        }
        //发送心跳报文
        if(status==IdleStatus.READER_IDLE){
            logger.debug("send heart bytes to:{}",remoteSocketAddress);
            session.write("心跳报文");
        }
    }

    @Override
    public void sessionOpened(IoSession session) throws Exception {
        //发送登录报文
        InetSocketAddress remoteSocketAddress = (InetSocketAddress)session.getRemoteAddress();
        logger.info("send login bytes to:{}",remoteSocketAddress);
        session.write("登录报文");
    }
}

2.4 容器启动时创建

package com.techen.mina;

import com.techen.mina.client.ClientTaskDataHandler;
import com.techen.mina.client.JobServer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SocketMinaApplication {

    public static void main(String[] args) {
        SpringApplication.run(SocketMinaApplication.class, args);
        JobServer jobServer = new JobServer();
        jobServer.setClientTaskDataHandler(new ClientTaskDataHandler());
        jobServer.start();
    }

}

2.5 测试

package com.techen.mina.controller;

import com.techen.mina.client.JobServer;
import com.techen.mina.client.SocketClient;
import com.techen.mina.utils.CommUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @ClassName ClientController
 * @description:
 * @author: bozhiqiang
 * @Version 1.0.0
 * @createTime: 2022-07-19 10:06:19
 */
@RestController
public class ClientController {

    @GetMapping("/test1")
    public void test1(){

        SocketClient socketClient = JobServer.getOnlineClients().get("1");
        // 发送报文到服务端
        socketClient.sendMessage(CommUtils.hex2BinaryLH("168e200e200684b000586000210f70000010002cbbc0a100068562113000000681104333334333c1600000000000000000000000000000000574010090800c916"));

    }
    @GetMapping("/test2")
    public void test2(){
        SocketClient socketClient = JobServer.getOnlineClients().get("1");
        // 发送报文到服务端
        socketClient.sendMessage(CommUtils.hex2BinaryLH("268e200e200684b000586000210f70000010002cbbc0a100068562113000000681104333334333c1600000000000000000000000000000000574010090800c916"));

    }

    @GetMapping("/test3")
    public void test3(){
        SocketClient socketClient = JobServer.getOnlineClients().get("1");
        // 发送报文到服务端
        socketClient.sendMessage(CommUtils.hex2BinaryLH("368e200e200684b000586000210f70000010002cbbc0a100068562113000000681104333334333c1600000000000000000000000000000000574010090800c916"));

    }
}

文章作者: superzqbo
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 superzqbo !
评论
  目录