
Netty: a high performance communication framework

What is Netty?

  • Netty is an asynchronous, event-driven network application framework for rapidly developing high-performance, highly reliable network I/O programs.
  • Netty mainly targets high-concurrency, client-facing applications over TCP, as well as peer-to-peer scenarios that transfer large volumes of data continuously.
  • At its core, Netty is an NIO framework, and it suits a wide range of server-communication scenarios.

Netty application scenarios

  • RPC frameworks for remote service invocation in distributed systems. Dubbo, for example, uses Netty for its RPC communication.
  • As a high-performance basic communication component, Netty provides protocol stacks such as TCP/UDP and HTTP, and it also lets you customize and develop private protocol stacks.

Before learning Netty, let's first look at why Netty is so widely used.

1. I/O Model

What is an I/O model?

Put simply, an I/O model is the kind of channel a program uses to send and receive data, and it largely determines the performance of the program's communication.

Java supports three network programming models (I/O modes):

  • BIO: synchronous and blocking

The server uses one thread per connection: whenever a client sends a connection request, the server starts a thread to handle it. This suits architectures with a small, fixed number of connections. It places relatively high demands on server resources, and a connection that does nothing still costs a thread.

  • NIO: synchronous and non-blocking

The server handles multiple requests (connections) with one thread: connection requests sent by clients are registered on a multiplexer, and the multiplexer polls the connections and processes those that have an I/O request. A Selector maintains the connection channels (Channel). Netty is implemented on top of NIO.

  • AIO: asynchronous and non-blocking

AIO introduces the concept of asynchronous channels and adopts the Proactor pattern, which simplifies programming: a thread is started only for a valid request. The operating system completes the I/O operation first and then notifies the server program, which starts a thread to process the result. AIO is generally used in applications with many long-lived connections.
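The article shows code for BIO and NIO but not for AIO. As a rough illustration only, the following sketch uses the JDK's AsynchronousServerSocketChannel with CompletionHandler callbacks to echo one chunk of data back to a client. The class name AioEchoSketch, the helper names, and the use of an ephemeral loopback port are assumptions made for this example, not part of the original article.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: an AIO (Proactor-style) server that echoes the first chunk it reads.
class AioEchoSketch {

    /** Starts an echo server on an ephemeral loopback port (assumption for this demo). */
    static AsynchronousServerSocketChannel startServer() throws IOException {
        AsynchronousServerSocketChannel server =
                AsynchronousServerSocketChannel.open().bind(new InetSocketAddress("127.0.0.1", 0));
        server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
            @Override
            public void completed(AsynchronousSocketChannel client, Void att) {
                server.accept(null, this); // re-arm accept for the next connection
                ByteBuffer buf = ByteBuffer.allocate(1024);
                // The read completes in the background; the callback runs once data has arrived
                client.read(buf, buf, new CompletionHandler<Integer, ByteBuffer>() {
                    @Override
                    public void completed(Integer n, ByteBuffer b) {
                        if (n < 0) { closeQuietly(client); return; }
                        b.flip();
                        client.write(b, b, new CompletionHandler<Integer, ByteBuffer>() {
                            @Override
                            public void completed(Integer w, ByteBuffer bb) { closeQuietly(client); }
                            @Override
                            public void failed(Throwable t, ByteBuffer bb) { closeQuietly(client); }
                        });
                    }
                    @Override
                    public void failed(Throwable t, ByteBuffer b) { closeQuietly(client); }
                });
            }
            @Override
            public void failed(Throwable t, Void att) { /* server channel was closed */ }
        });
        return server;
    }

    private static void closeQuietly(AsynchronousSocketChannel ch) {
        try { ch.close(); } catch (IOException ignored) { }
    }

    /** Sends msg to a fresh server and returns everything the server echoed back. */
    static String roundTrip(String msg) {
        try (AsynchronousServerSocketChannel server = startServer();
             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
            ch.connect(server.getLocalAddress()).get();
            ByteBuffer out = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
            while (out.hasRemaining()) ch.write(out).get();
            ByteBuffer reply = ByteBuffer.allocate(1024);
            while (ch.read(reply).get() != -1) { /* read until the server closes */ }
            reply.flip();
            return StandardCharsets.UTF_8.decode(reply).toString();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello aio"));
    }
}
```

The server side never blocks a thread waiting for data: each step is a callback invoked after the operating system has finished the operation. The client here uses the Future-returning variants only to keep the demo short.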

2. BIO Model

Each client connection is handled by its own thread, which performs all reads and writes for that connection.

[Figure: BIO model]

2.1 BIO programming flow

  1. The server starts a ServerSocket.
  2. After a client sends a request, it first asks whether the server has a thread available to respond. If not, the client waits or is rejected.
  3. If there is a response, the client thread waits for the request to finish and then continues executing.

Server side

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class BIOMain {
    public static void main(String[] args) {
        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket();
            serverSocket.bind(new InetSocketAddress(6666));
            System.out.println("Server started, port: 6666");
            while (true) {
                System.out.println("Waiting for a client to connect...");
                // accept() blocks until a client connects
                Socket socket = serverSocket.accept();
                // Log the client's (remote) address, not the server's local address
                System.out.println("Client " + socket.getRemoteSocketAddress() + " connected");
                // Start one thread for every client that connects
                new BioServer(socket).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (serverSocket != null) {
                System.out.println("Server is shutting down");
                try {
                    serverSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

Per-connection handler thread (started by the server for each client):

import java.io.BufferedInputStream;
import java.io.IOException;
import java.net.Socket;

public class BioServer extends Thread {

    private final Socket socket;

    public BioServer(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        try {
            // Wrap the input stream once, outside the read loop
            BufferedInputStream bufferedInputStream =
                    new BufferedInputStream(socket.getInputStream());
            byte[] bytes = new byte[1024];
            while (true) {
                System.out.println("Waiting for data...");
                // read() blocks while no data is available
                int read = bufferedInputStream.read(bytes, 0, bytes.length);
                if (read == -1) {
                    // The client closed the connection; stop reading
                    break;
                }
                String result = new String(bytes, 0, read);
                System.out.println(">>> " + result);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
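The article does not include a client for the BIO server. A minimal sketch of one, using only java.net.Socket, might look like the following. The class name BioClientSketch and its methods are hypothetical, and main assumes the BIOMain server above is already listening on localhost:6666. The selfCheck method exists only so the sketch can be exercised without that server.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of a blocking client for the BIO server.
class BioClientSketch {

    /** Connects, sends one message, and closes the socket. */
    static void send(String host, int port, String message) throws IOException {
        try (Socket socket = new Socket(host, port)) {
            OutputStream out = socket.getOutputStream();
            out.write(message.getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
    }

    /** Sends a message to a throwaway local ServerSocket and returns what arrived there. */
    static String selfCheck(String message) {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            send(server.getInetAddress().getHostAddress(), server.getLocalPort(), message);
            try (Socket accepted = server.accept()) {
                // readAllBytes() blocks until the client side has closed the connection
                return new String(accepted.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumption: BIOMain is running locally on port 6666
        send("127.0.0.1", 6666, "hello from the BIO client");
    }
}
```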

Drawback: every connection creates a thread, which consumes CPU resources. Adding a thread pool does not help much either, because a thread still blocks at accept() and read() while handling a connection, which wastes resources.
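To make the thread-pool point concrete, here is a small experiment under stated assumptions: a blocking echo server whose handlers run on a deliberately tiny pool of one thread. An idle first client occupies the only worker inside read(), so a second client that has already sent data is not served until the first one disconnects. The class name PooledBioSketch, the pool size, and the 300 ms timeout are choices made for this illustration.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: blocking handlers on a thread pool still stall behind an idle connection.
class PooledBioSketch {

    /** Blocking echo handler: keeps its worker thread until the client disconnects. */
    static void echo(Socket socket) {
        try (Socket s = socket) {
            InputStream in = s.getInputStream();
            byte[] buf = new byte[1024];
            int n;
            while ((n = in.read(buf)) != -1) { // blocks while the client is silent
                s.getOutputStream().write(buf, 0, n);
            }
        } catch (IOException ignored) { }
    }

    /** Returns true if the second client was served only after the idle client closed. */
    static boolean secondClientStalls() {
        ExecutorService pool = Executors.newFixedThreadPool(1, r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        });
        InetAddress lo = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 50, lo)) {
            Thread acceptor = new Thread(() -> {
                try {
                    while (true) {
                        Socket s = server.accept();
                        pool.submit(() -> echo(s));
                    }
                } catch (IOException serverClosed) { }
            });
            acceptor.setDaemon(true);
            acceptor.start();

            Socket idle = new Socket(lo, server.getLocalPort()); // connects but never sends
            try (Socket active = new Socket(lo, server.getLocalPort())) {
                active.getOutputStream().write('x');
                active.setSoTimeout(300);
                boolean stalled = false;
                try {
                    active.getInputStream().read();
                } catch (SocketTimeoutException e) {
                    stalled = true; // the only worker is stuck in read() on the idle client
                }
                idle.close(); // frees the worker, which then picks up the second client
                active.setSoTimeout(5000);
                int echoed = active.getInputStream().read();
                return stalled && echoed == 'x';
            }
        } catch (IOException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("second client stalled behind idle client: " + secondClientStalls());
    }
}
```

A larger pool only moves the limit: with N workers, N idle connections are enough to stall everyone else.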

3. NIO Model

As we have seen, the main problem with the BIO model is that threads block. NIO introduces the Selector to solve this thread-blocking problem.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;

public class NioServer {

    public static void main(String[] args) {
        try {
            // 1. Create a ServerSocketChannel
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

            // 2. Bind the listening port
            serverSocketChannel.socket().bind(new InetSocketAddress(6666));

            // 3. Switch to non-blocking mode
            serverSocketChannel.configureBlocking(false);

            // 4. Open a Selector
            Selector selector = Selector.open();

            // 5. Register serverSocketChannel with the selector, interested in OP_ACCEPT events
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            // 6. Loop, waiting for client events
            while (true) {
                // If no event becomes ready within 1000 ms, go to the next iteration
                if (selector.select(1000) == 0) {
                    continue;
                }
                // Get the SelectionKeys of the channels that are ready
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectionKeys.iterator();
                while (it.hasNext()) {
                    SelectionKey selectionKey = it.next();
                    // Dispatch on the event type
                    handler(selectionKey);
                    // Remove the key so it is not handled again on the next select
                    it.remove();
                }
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Event-based processing: handles the channel event that the key represents.
     * @param selectionKey key of the ready channel
     * @throws IOException if a channel operation fails
     */
    private static void handler(SelectionKey selectionKey) throws IOException {
        if (selectionKey.isAcceptable()) {  // OP_ACCEPT: a new client is connecting
            ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel();
            // Create the SocketChannel for this client
            SocketChannel socketChannel = channel.accept();
            // Set the SocketChannel to non-blocking
            socketChannel.configureBlocking(false);
            System.out.println("Client connected, SocketChannel created");
            // Register the SocketChannel with the selector for read events and attach a Buffer to it
            socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(1024));
        } else if (selectionKey.isReadable()) { // read event
            // Get the Channel back from the key
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            // Reuse the buffer that was attached at registration time
            ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
            buffer.clear();

            // Read the channel's data into the buffer
            int read = socketChannel.read(buffer);
            if (read == -1) {
                // The client closed the connection: cancel the key and close the channel
                selectionKey.cancel();
                socketChannel.close();
                return;
            }
            // Switch the buffer from writing to reading and decode only the bytes received
            buffer.flip();
            System.out.println("Data read from client: " + StandardCharsets.UTF_8.decode(buffer));

            // Reply to the client
            ByteBuffer reply = ByteBuffer.wrap("hello client".getBytes(StandardCharsets.UTF_8));
            socketChannel.write(reply);
            selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        } else if (selectionKey.isWritable()) { // write event
            System.out.println("Write event");
            // Go back to read-only interest; OP_WRITE stays ready while the send buffer has room
            selectionKey.interestOps(SelectionKey.OP_READ);
        }
    }
}

To give a clear picture of the NIO architecture, here is an overall flow chart:

[Figure: NIO overall flow chart]

To understand NIO, you need to know its three core parts:

  • Channel: a duplex connection between client and server. During a request, the Channel between client and server keeps cycling through "connect, query, disconnect" until the data is ready, and the data is then sent back through the Channel. There are four main Channel types: FileChannel (reads and writes file data), DatagramChannel (reads and writes data over UDP), SocketChannel (reads and writes data over TCP), and ServerSocketChannel (listens for incoming TCP connections).
  • Buffer: the container in which the client stores data from the server. When the server has data ready, it is delivered through the Channel into the Buffer. There are seven Buffer types: ByteBuffer, CharBuffer, DoubleBuffer, FloatBuffer, IntBuffer, LongBuffer, and ShortBuffer.
  • Selector: the multiplexer the server uses to select Channels. A Selector has two core tasks: monitoring whether data is ready, and responding to the Channel.
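As a small illustration of the Channel and Buffer relationship described above (data always moves between a channel and a buffer), the following sketch writes a string through a FileChannel and reads it back. The class name FileChannelSketch and the temp-file prefix are hypothetical choices for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch: buffer -> channel on write, channel -> buffer on read.
class FileChannelSketch {

    static String roundTrip(String text) {
        try {
            Path file = Files.createTempFile("channel-sketch", ".txt");
            try {
                // Write: drain a buffer into the channel
                try (FileChannel out = FileChannel.open(file, StandardOpenOption.WRITE)) {
                    ByteBuffer src = ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
                    while (src.hasRemaining()) {
                        out.write(src);
                    }
                }
                // Read: fill a buffer from the channel, then flip it before decoding
                try (FileChannel in = FileChannel.open(file, StandardOpenOption.READ)) {
                    ByteBuffer dst = ByteBuffer.allocate((int) in.size());
                    while (dst.hasRemaining() && in.read(dst) != -1) {
                        // keep reading until the buffer is full or end of file
                    }
                    dst.flip();
                    return StandardCharsets.UTF_8.decode(dst).toString();
                }
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("channel <-> buffer"));
    }
}
```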

How NIO works:

NIO is buffer-oriented. Data is read into a buffer, and you can move back and forth within the buffer as needed.
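"Moving back and forth in the buffer" comes down to the buffer's position and limit. The sketch below (class name BufferSketch is hypothetical) walks a ByteBuffer through put, flip, get, rewind, and clear, with the resulting position and limit noted in the comments.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of how position and limit move in a ByteBuffer.
class BufferSketch {

    static String demo() {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.put("netty".getBytes(StandardCharsets.US_ASCII)); // write mode: position=5, limit=16
        buf.flip();                                            // read mode:  position=0, limit=5
        byte first = buf.get();                                // 'n', position=1
        byte second = buf.get();                               // 'e', position=2
        buf.rewind();                                          // back to position=0, limit stays 5
        byte[] all = new byte[buf.remaining()];
        buf.get(all);                                          // reads all 5 bytes, position=5
        buf.clear();                                           // ready to write again: position=0, limit=16
        return (char) first + "" + (char) second + ":"
                + new String(all, StandardCharsets.US_ASCII) + ":"
                + buf.position() + "/" + buf.limit();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "ne:netty:0/16"
    }
}
```

Forgetting flip() before reading is the classic mistake: the reader would start at position 5 and see only the unwritten remainder of the buffer.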

NIO working mode: non-blocking

In Java NIO's non-blocking mode, a thread can ask to read data from a channel, but it gets only the data that is currently available. If no data is available, it gets nothing instead of staying blocked.
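A quick way to see this behavior is to read from a non-blocking SocketChannel before and after the peer has sent anything. In the sketch below (class name NonBlockingReadSketch is hypothetical, and the loopback setup is just scaffolding for the demo), the first read() returns 0 immediately, and a later read() returns the number of bytes that have arrived.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical sketch: a non-blocking read returns at once, even when there is no data.
class NonBlockingReadSketch {

    /** Returns { bytes read before the client sent anything, bytes read afterwards }. */
    static int[] demo() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel accepted = server.accept()) {
                accepted.configureBlocking(false);
                ByteBuffer buf = ByteBuffer.allocate(64);

                int before = accepted.read(buf); // nothing sent yet: returns 0, does not block

                client.write(ByteBuffer.wrap(new byte[] {1, 2, 3}));
                int after = 0;
                long deadline = System.nanoTime() + 5_000_000_000L;
                while (after == 0 && System.nanoTime() < deadline) {
                    after = accepted.read(buf); // each call returns immediately
                    Thread.onSpinWait();
                }
                return new int[] {before, after};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("before=" + r[0] + ", after=" + r[1]);
    }
}
```

The busy loop is only there to keep the demo short. Real code registers the channel with a Selector instead of spinning.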

NIO characteristics:

One thread maintains one Selector, and the Selector maintains multiple Channels. When an event occurs on a Channel, the thread processes it.
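The following self-contained sketch shows that shape in miniature: a single thread owns one Selector, accepts several client connections, and reads one message from each. The class name SelectorSketch, the message format, and the loopback setup are assumptions made for the demo. The clients are opened from the same thread purely to keep it runnable in one piece.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: one thread, one Selector, several channels.
class SelectorSketch {

    /** Returns the messages received from all clients (order across clients is not guaranteed). */
    static List<String> demo(int clients) {
        List<String> received = new ArrayList<>();
        List<SocketChannel> open = new ArrayList<>();
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);

            // Each client connects and sends one short message
            for (int i = 0; i < clients; i++) {
                SocketChannel c = SocketChannel.open(server.getLocalAddress());
                c.write(ByteBuffer.wrap(("client-" + i).getBytes(StandardCharsets.UTF_8)));
                open.add(c);
            }

            ByteBuffer buf = ByteBuffer.allocate(64);
            while (received.size() < clients && selector.select(5000) > 0) {
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isAcceptable()) {
                        SocketChannel ch = server.accept();
                        if (ch != null) {
                            ch.configureBlocking(false);
                            ch.register(selector, SelectionKey.OP_READ);
                            open.add(ch);
                        }
                    } else if (key.isReadable()) {
                        buf.clear();
                        int n = ((SocketChannel) key.channel()).read(buf);
                        if (n > 0) {
                            buf.flip();
                            received.add(StandardCharsets.UTF_8.decode(buf).toString());
                        }
                    }
                }
            }
            return received;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            for (SocketChannel c : open) {
                try { c.close(); } catch (IOException ignored) { }
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(3));
    }
}
```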

Comparison of BIO and NIO

  • BIO processes data as streams, while NIO processes data in blocks. Block processing is generally more efficient than stream processing.
  • BIO is blocking, while NIO is non-blocking.
  • BIO operates on byte streams and character streams, while NIO operates on channels and buffers: data is always read from a channel into a buffer or written from a buffer into a channel. A Selector listens for events on multiple channels (such as connection requests and data arrival), so a single thread can monitor multiple client channels.

NIO drawbacks:

Programming is complex, and with a Buffer you have to manage the switch between its read and write positions yourself. Netty wraps NIO, optimizes it, and provides an easy-to-use model and API, which is why Netty is so widely used in communication frameworks.

4. Netty

Netty is an asynchronous, event-driven network application framework that encapsulates NIO.

[Figure: Netty architecture]

[Figure: Netty execution flow]

Differences between Netty and NIO on the server and the client

              Netty                     NIO
Server side   NioServerSocketChannel    ServerSocketChannel
Client        NioSocketChannel          SocketChannel

Threading model

Netty is based on the master-slave Reactor multithreading model. It maintains two thread pools: one handles Accept (connection) events, and the other handles read and write events.
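To show the division of labor without depending on Netty, here is a rough JDK-only approximation of the idea: a "boss" thread that only accepts connections, and "worker" threads that each own a Selector and handle reads and writes for the channels handed to them. This is a simplified sketch, not Netty's implementation. For instance, the boss here uses a plain blocking accept() rather than its own selector, and names such as ReactorSketch, Worker, and adopt are invented for the example.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of a boss/worker (master-slave Reactor) split using plain NIO.
class ReactorSketch {

    /** Worker reactor: one thread, one Selector, read/write events for its own channels. */
    static final class Worker implements Runnable {
        final Selector selector;
        final Queue<SocketChannel> pending = new ConcurrentLinkedQueue<>();

        Worker() throws IOException { selector = Selector.open(); }

        /** Called by the boss thread; registration itself happens on the worker thread. */
        void adopt(SocketChannel ch) {
            pending.add(ch);
            selector.wakeup();
        }

        @Override
        public void run() {
            ByteBuffer buf = ByteBuffer.allocate(1024);
            try {
                while (true) {
                    selector.select();
                    for (SocketChannel ch; (ch = pending.poll()) != null; ) {
                        ch.configureBlocking(false);
                        ch.register(selector, SelectionKey.OP_READ);
                    }
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SocketChannel ch = (SocketChannel) it.next().channel();
                        it.remove();
                        buf.clear();
                        if (ch.read(buf) < 0) { ch.close(); continue; } // peer disconnected
                        buf.flip();
                        while (buf.hasRemaining()) ch.write(buf);       // echo back
                    }
                }
            } catch (IOException | ClosedSelectorException stopped) {
                // selector closed or I/O failure: this worker exits
            }
        }
    }

    static Thread daemon(Runnable r) {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    }

    /** Starts one boss and two workers, sends "a", "b", "c" on separate connections, returns the echoes. */
    static String demo() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            Worker[] workers = {new Worker(), new Worker()};
            for (Worker w : workers) daemon(w).start();
            // Boss: accepts only, then hands each connection to a worker round-robin
            daemon(() -> {
                try {
                    for (int i = 0; ; i++) workers[i % workers.length].adopt(server.accept());
                } catch (IOException serverClosed) { }
            }).start();

            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
            StringBuilder replies = new StringBuilder();
            for (String msg : new String[] {"a", "b", "c"}) {
                try (Socket s = new Socket(InetAddress.getLoopbackAddress(), port)) {
                    s.setSoTimeout(5000);
                    s.getOutputStream().write(msg.getBytes(StandardCharsets.UTF_8));
                    replies.append((char) s.getInputStream().read());
                }
            }
            for (Worker w : workers) w.selector.close();
            return replies.toString();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "abc"
    }
}
```

The queue plus wakeup() hand-off exists because registering a channel from another thread while the worker is inside select() can block. Doing the registration on the worker's own thread avoids that.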

Server side :

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;

@Slf4j
public class TcpServer extends Thread {
    private final Integer port;

    public TcpServer(Integer port) {
        this.port = port;
    }

    @Override
    public void run() {
        // Socket address to listen on (wildcard IP address + port number)
        InetSocketAddress socketAddress = new InetSocketAddress(port);
        // Boss group: handles Accept (connection) events. One thread is enough here, because
        // Netty handles connection events on a single thread by default; more would waste CPU resources
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // Worker group: runs the handlers, that is, the I/O reads and writes.
        // The default thread count is the number of CPU cores multiplied by 2
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        // Create the ServerBootstrap instance
        ServerBootstrap serverBootstrap = new ServerBootstrap()
                .group(bossGroup, workerGroup) // set the boss and worker thread groups
                .channel(NioServerSocketChannel.class) // the ServerChannel class to instantiate
                // Initialize the ChannelPipeline (chain of responsibility);
                // ServerChannelInitializer is not shown in this article
                .childHandler(new ServerChannelInitializer())
                .localAddress(socketAddress)
                .option(ChannelOption.SO_BACKLOG, 1024) // size of the pending-connection queue
                .childOption(ChannelOption.SO_KEEPALIVE, true); // enable TCP keep-alive

        try {
            // Bind the port and start accepting connections.
            // bind() is asynchronous; sync() waits until the bind has completed
            ChannelFuture channelFuture = serverBootstrap.bind(socketAddress).sync();
            log.info("TCP server is listening on port: {}", socketAddress.getPort());
            if (channelFuture.isSuccess()) {
                log.info("TCP service started successfully -------------------");
            }

            // This thread now waits here while the event-loop threads listen for and accept requests.
            // closeFuture() returns a future that completes when the channel is closed;
            // sync() blocks this thread until that happens
            channelFuture.channel().closeFuture().sync();
            log.info("TCP service has shut down");
        } catch (InterruptedException e) {
            log.error("tcp server exception: {}", e.getMessage());
            // Restore the interrupt flag for callers
            Thread.currentThread().interrupt();
        } finally {
            // Shut down the boss thread group
            bossGroup.shutdownGracefully();
            // Shut down the worker thread group
            workerGroup.shutdownGracefully();
        }
    }
}

Custom handler:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
public class TCPServerHandler extends ChannelInboundHandlerAdapter {

    // Assumption: CLIENT_MAP is not defined in the original listing; a shared
    // concurrent map from client id to Channel is assumed here
    private static final Map<String, Channel> CLIENT_MAP = new ConcurrentHashMap<>();

    /**
     * Called when a client connects.
     *
     * @param ctx channel context
     * @throws Exception on failure
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("Client connected: {}", ctx.channel().remoteAddress());
        // Get the unique id of the current client
        String uuid = ctx.channel().id().asLongText();
        log.info("Id of the currently connected client: {}", uuid);
        // Store the id and its channel in the map
        CLIENT_MAP.put(uuid, ctx.channel());
    }

    /**
     * Reads messages sent by the client.
     *
     * @param ctx channel context
     * @param msg data sent by the client
     * @throws Exception on failure
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // msg is Netty's own ByteBuf, which maintains a byte array; it is not the JDK's ByteBuffer
        ByteBuf byteBuf = (ByteBuf) msg;
        try {
            // Read byteBuf
            // Business processing
            // Send a message back to the client
        } finally {
            // ChannelInboundHandlerAdapter does not release messages automatically
            ReferenceCountUtil.release(msg);
        }
    }

    /**
     * Triggered when a client disconnects.
     * When the client actively closes its connection to the server, the channel becomes inactive:
     * the communication channel between client and server is closed and can no longer carry data.
     *
     * @param ctx channel context
     * @throws Exception on failure
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("Before disconnection, CLIENT_MAP: {}", CLIENT_MAP);
        // Remove only the disconnected client from the map; other clients stay cached
        CLIENT_MAP.remove(ctx.channel().id().asLongText());
        log.info("{} channel is inactive and will be closed.", ctx.channel().remoteAddress());
        log.info("After disconnection, CLIENT_MAP: {}", CLIENT_MAP);
        // Close the channel
        ctx.close();
    }

    /**
     * Triggered when an exception occurs.
     *
     * @param ctx   channel context
     * @param cause the exception
     * @throws Exception on failure
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("Exception caught: {}", cause.toString());
    }

    /**
     * Called after the channelRead calls for the current read have finished; replies to the client.
     *
     * @param ctx channel context
     * @throws Exception on failure
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // writeAndFlush = write + flush: write the data to the buffer and flush it.
        // The outgoing data must be encoded, so a charset is passed explicitly
        ctx.writeAndFlush(Unpooled.copiedBuffer("Message received, returning ok!", CharsetUtil.UTF_8));
    }
}

Client:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyClient {

    public void run() {
        // The client needs only one event loop group
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            // Client bootstrap helper
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new MyClientHandler());
                        }
                    });
            System.out.println("The client is ready, about to connect to the server...");
            // connect() is asynchronous and returns a ChannelFuture used for asynchronous notification.
            // In plain socket programming, waiting for a result blocks synchronously; Netty's
            // ChannelFuture instead delivers the result in observer-pattern style through listeners.
            // sync() is called here only so that this method waits until the connection is established
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
            // Wait until the channel is closed
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Shut down Netty's event loop group
            eventLoopGroup.shutdownGracefully();
        }
    }
}

Custom client handler:

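import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;

public class MyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client connected..");
        // Send a first message as soon as the connection is active
        ctx.writeAndFlush(Unpooled.copiedBuffer("msg", CharsetUtil.UTF_8));
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Read the message sent by the server
        ByteBuf byteBuf = (ByteBuf) msg;
        try {
            System.out.println("Message from server " + ctx.channel().remoteAddress() + ": "
                    + byteBuf.toString(CharsetUtil.UTF_8));
        } finally {
            // Release the buffer, since this adapter does not do it automatically
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println(cause.getMessage());
    }
}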

With that, a Netty client and server are in place. Start both services.

The client console prints:

The client is ready, about to connect to the server...
Client connected..
Message from server /127.0.0.1:6666: The server received your message and sent you an ok

The server console prints:

The server is ready
Client connection address: /127.0.0.1:6666, message received: msg

Source:

https://www.toutiao.com/i6930944968446362123/

This article is from the WeChat official account IT Big guy says (itdakashuo).

Original publication time : 2021-03-28

Copyright notice
This article was created by [IT tycoon says]. Please include the original link when reprinting. Thank you.
https://cdmana.com/2021/04/20210408111751717c.html