编程知识 cdmana.com

Netty source code 3-channelpipeline and channelhandler

Welcome to pay attention  github.com/hsfxuebao , I hope that's helpful , If you think it's OK, please give me a little Star Ha

   Netty Of Channel Filter implementation principle and Servlet、Filter The mechanism is consistent , It will Channel The data pipeline of is abstracted as ChannelPipeline, Message in ChannelPipeline Medium flow and transfer .ChannelPipeline hold I/O Event interceptors ChannelHandler The linked list of , from ChannelHandler Yes I/O Events are intercepted and handled , You can easily add and delete ChannelHandler To achieve different business logic customization , There is no need for what is already ChannelHandler Make changes , It can realize modification closure and extension support .

1. General situation

The following figure shows the specific event propagation flow chart :

  • (1) At the bottom SocketChannel read() Method reading ByteBuf, Trigger ChannelRead event , from I/O Threads NioEventLoop call ChannelPipeline Of fireChannelRead(Object msg) Method , The message is ( ByteBuf) Transferred to the ChannelPipeline in .

  • (2) The messages are in turn HeadHandler、ChannelHandlerl 、ChannelHandler2-.. ... TailHandler Intercept and process , In the process , whatever ChannelHandler Can interrupt the current process , End message delivery ;

  • (3) call ChannelHandlerContext Of write Method send message , Message from TailHandler Start , Via ChannelHanderN.....ChannelHandlerl. HeadHandler, Finally, it is added to the message sending buffer to wait for refresh and sending , In this process, the message delivery can also be interrupted , For example, when coding fails , You need to interrupt the process , Construct abnormal Future return .

 *                                                 I/O Request
 *                                            via {@link Channel} or
 *                                        {@link ChannelHandlerContext}
 *                                                      |
 *  +---------------------------------------------------+---------------+
 *  |                           ChannelPipeline         |               |
 *  |                                                  \|/              |
 *  |    +---------------------+            +-----------+----------+    |
 *  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  .               |
 *  |               .                                   .               |
 *  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
 *  |        [ method call]                       [method call]         |
 *  |               .                                   .               |
 *  |               .                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  +---------------+-----------------------------------+---------------+
 *                  |                                  \|/
 *  +---------------+-----------------------------------+---------------+
 *  |               |                                   |               |
 *  |       [ Socket.read() ]                    [ Socket.write() ]     |
 *  |                                                                   |
 *  |  Netty Internal I/O Threads (Transport Implementation) | * +-------------------------------------------------------------------+  Copy code 

First clarify a relationship :
One Channel Contains a ChannelPipeline , and ChannelPipeline In the middle Maintained a by ChannelHandlerContext A bidirectional linked list . This The head of the linked list is HeadContext, The end of the list is TailContext, And each ChannelHandlerContext There is also a connection between ChannelHandler( This channelHandlerContext It's just channelHandler Context of , It is used to save the context, context and before and after channelHandler Be a bridge to communicate ), Here's the picture :

image.png

2. ChannelHandler

2.1 Channel

Channel Concept and java.nio.channel Consistent concept , To connect to IO equipment (socket, Documents, etc. ) The link between . For example, read the network 、 Write , Client initiates connection , Active close connection , Link off , Get the network address of both sides of communication .

Channel Of IO There are two main types : Non blocking IO (NIO) And obstruction IO(OIO).

There are two types of data transmission : By event messaging (Message) And pass by byte (Byte).

There are also two types of : The server (ServerSocket) And the client (Socket). There are also some protocols based on the transmission protocol Channel, Such as : UDT、SCTP etc. .

Netty Design corresponding classes layer by layer according to types . The bottom layer is the abstract class  AbstractChannel, Then on this basis IO type 、 Data transfer type 、 Applicable party type implementation . Class diagrams can be seen at a glance , As shown in the figure below :

image.png

2.1.1 Channel state

image.png

channelRegistered state

/** * The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop} */
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
 Copy code 

You can see from the notes that it is in  Channel  Bound to the  Eventloop  Called at the time above .

Whether it's Server still Client, Bound to the  Eventloop  When , In the end, they call  Abstract.initAndRegister()  In this way (Server Is in  AbstractBootstrap.doBind()  Called when , Client Is in  Bootstrap.doConnect()  Called when ).

initAndRegister()  The method is defined as follows :

final ChannelFuture initAndRegister() {
    final Channel channel = channelFactory().newChannel();
    try {
        init(channel);
    } catch (Throwable t) {
        channel.unsafe().closeForcibly();
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }
    //  hold channel Bound to the Eventloop Go above the object 
    ChannelFuture regFuture = group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}
 Copy code 

Keep tracking and you'll find  AbstractChannel.AbstractUnsafe.register0()  On the way .

        private void register0(ChannelPromise promise) {
            try {
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                //  Do the actual binding action . hold Channel Register events of interest to Eventloop.selector above . The concrete realization is Abstract.doRegister() In the way 
                doRegister();
                neverRegistered = false;
                registered = true;

                //  adopt pipeline The communication mechanism of , Trigger handlerAdded event 
                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                
                //  adopt pipeline The communication mechanism of , Trigger channelRegistered event 
                pipeline.fireChannelRegistered();

                //  It's not bound yet , So here  isActive()  return false.
                if (isActive()) {
                    if (firstRegistration) {
                        //  If the current link is already active , Call channelActive() Method 
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
 Copy code 

You can also see from the above code , After calling  pipeline.fireChannelRegistered()  after , And then I call  isActive()  Judge whether the current link is activated , If activated, it will call  pipeline.fireChannelActive()  Method .

This is the time , about Client and Server Not activated yet , therefore , At this time, whether it is Server still Client Will not call  pipeline.fireChanenlActive()  Method .

channelActive state

From the starter  bind()  Interface start , Call down  doBind()  Method :

private ChannelFuture doBind(final SocketAddress localAddress) {
    //  Initialization and registration 
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        //  call  doBind0
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        ....
    }
}
 Copy code 

doBind  Method will call  doBind0()  Method , stay  doBind0()  The method will pass  EventLoop  To carry out  channel  Of  bind() Mission .

private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {

    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                //  call channel.bind Interface 
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}
 Copy code 

doBind0()  Method will be called to  pipeline.bind(localAddress, promise); Method , adopt  pipeline  The communication mechanism of , Will eventually be called to  AbstractChannel.AbstractUnsafe.bind()  Method , This method mainly does two things :

  • call  doBind(): Call the underlying layer JDK API Conduct Channel Port binding for .
  • call  pipeline.fireChannelActive().
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    
    ....
    
    // wasActive  Is... Before binding is successful  false
    boolean wasActive = isActive();
    try {
        //  call doBind() call JDK Bottom API Port binding 
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }
    //  After binding ,isActive()  return true
    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                //  Trigger channelActive event 
                pipeline.fireChannelActive();
            }
        });
    }
    safeSetSuccess(promise);
}
 Copy code 

That is, when there is a new client connection , Will become active .

channelInactive state

fireChannelnactive()  Methods are called in two places : Channel.close()  and  Channel.disconnect().

Before calling, it will confirm that the status is from  Active--->Inactive.

channelUnregistered state

fireChannelUnregistered()  The method is in  Channel  from  Eventloop  Called when deregistering in . Channel.close()  When triggered .

2.2 ChannelHandler

ChannelHandler Be similar to Servlet Of Filter filter , Responsible for I/O Events or I/O Operations to intercept and process , It can selectively intercept and handle events of interest to itself , You can also pass through and terminate the event . be based on ChannelHandler Interface , Users can easily customize business logic , For example, printing logs 、 Encapsulate exception information in a unified way 、 Performance statistics, message encoding and decoding, etc .

ChannelHandler Support annotation , There are currently two types of annotations supported .

  • Sharable: Multiple ChannelPipeline share ChannelHandler;
  • Skip: By Skip Annotated methods are not called , Directly ignored .

2.2.1 ChannelHandler Life cycle of

handlerAdded(): Add to  ChannelPipeline  Called when the .
handlerRemoved(): from  ChannelPipeline  Called when removed from .
exceptionCaught(): In the process of processing  ChannelPipeline  Call... When an error occurs in the .

Handle I/O Incident or interception I/O operation , And forward it to  ChannelPipeline  Next handler in . ChannelHandler  Itself does not provide many methods , But usually one of its subtypes must be implemented :

  • ChannelInboundHandler: Handle inbound data and various state changes .
  • ChannelOutboundHandler: Process outbound data and allow all operations to be intercepted .

2.2.2 ChannelInboundHandler Interface

channelRegistered(): When Channel Already registered with it EventLoop And be able to deal with I/O When called .
channelUnregistered(): When Channel From him EventLoop Log off and can't process any of I/O When called .
channelActive(): When Channel Called when active .
channelInactive(): When Channel Called when leaving the active state and no longer connecting to the remote node .
channelRead(): When from Channel Called when reading data .
channelReadComplete(): When Channel Is called when a read operation on is completed . When all readable bytes are from Channel After reading in , The callback method will be called .

2.2.2 ChannelOutboundHandler Interface

Outbound operations and data will be generated by ChannelOutboundHandler Handle . Its method will be  Channel ChannelPipeline  as well as  ChannelHandlerContext  call .

ChannelOutboundHandler  A powerful feature of is the ability to postpone operations or events on demand , This makes it possible to process requests in a number of complex ways . for example , If the write to the remote node is paused , Then you can postpone the refresh operation and continue later .

connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise): When the request will Channel Called when connecting to a remote node .
disconnect(ChannelHandlerContext ctx, ChannelPromise promise): When the request will Channel Called when disconnected from a remote node .
deregister(ChannelHandlerContext ctx, ChannelPromise promise): When the request will Channel From its EventLoop Called on logoff .
read(ChannelHandlerContext ctx): When requested from Channel Called when reading more data .
write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise): When the request passes Channel Called when writing data to a remote node .
flush(ChannelHandlerContext ctx): When requested from Channel Called when flushing queued data to a remote node .

ChannelPromise and ChannelFuture

ChannelFuture  Express  Channel  Medium asynchrony I/O Results of operation , stay netty All of the I/O Operations are asynchronous , I/O The call of will directly return , Can pass  ChannelFuture  To get I/O The result or status information of the operation .

When I/O At the beginning of the operation , A new object will be created . The new object is incomplete - It was neither successful , And there was no failure , Nor has it been cancelled , because I/O The operation is not complete yet .

If I/O Operation completed successfully ( Fail or cancel ), Mark the object as completed , It contains more specific information , For example, the cause of the failure .

Please note that , Even if the failure and cancellation belong to the completed state .

ChannelPromise  yes  ChannelFuture  A sub interface of , It defines some writable methods , Such as  setSuccess()  and  setFailure(), So that  ChannelFuture  immutable .

image.png

priority of use addListener(GenericFutureListener), Instead of await()

As a I/O When there are operations and any follow-up tasks , Recommended priority  addListener(GenericFutureListener)  To get notified , Instead of  await()

addListener(GenericFutureListener)  It's non blocking . It will put specific  ChannelFutureListener  Add to  ChannelFuture  in , then I/O Thread will be I/O Operation related future Notify the listener when finished .

ChannelFutureListener  It will be conducive to the best performance and resource utilization , Because it's not blocked at all . And it won't cause deadlock .

2.2.3 ChannelHandler Adapter

ChannelInboundHandlerAdapter  and  ChannelOutboundHandlerAdapter  These two adapter classes provide
ChannelInboundHandler  and  ChannelOutboundHandler  Basic implementation of , They inherit a common parent interface
ChannelHandler  Methods , Extended abstract classes  ChannelHandlerAdapter.

image.png

ChannelHandlerAdapter  It also provides practical methods  isSharable().

If its corresponding implementation is marked  Sharable, So this method will return  true, Indicates that it can be added to multiple  ChannelPipeline  in .

ChannelInboundHandlerAdapter  and  ChannelOutboundHandlerAdapter  The method body provided in calls its associated  ChannelHandlerContext  The equivalent method on the , Thus forwarding the event to  ChannelPipeline  Medium  ChannelHandler  in .

2.2.4 Netty Provided ChannelHandler

netty Provides part of handler, Developers can choose to inherit and implement some functions
codecs :ByteToMessageDecoder....
Half package processing :DelimiterBasedFrameDecoder,FixedLengthFrameDecoder...

3. ChannelPipeline

Through the interface, you can find ,channelPipeline There are two main functions :

  • The first management channelHandler Of crud, such as first,last,addxxx Other methods
  • the second IO Propagation and handling of events , For example, the source code contains firexxx The representation of triggers the next handler event

The inheritance diagram is as follows :

image.png

We already know one before Channel The basic process of initialization , Now let's review . The following code is AbstractChannel Constructors :

protected AbstractChannel(Channel parent, ChannelId id) {
    this.parent = parent;
    this.id = id;
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}
 Copy code 

3.1 HeadContext and TailContext

HeadContext Realized ChannelInboundHandler and ChannelOutboundHandler, and tail Realized ChannelInboundHandler Interface , And they all realized ChannelHandlerContext Interface , So we can say head and tail It's a ChannelHandler, Another ChannelHandlerContext. Then look at HeadContext The code in the constructor :


final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {

    private final Unsafe unsafe;

    HeadContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, HEAD_NAME, HeadContext.class);
        unsafe = pipeline.channel().unsafe();
        setAddComplete();
    }
}

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

    TailContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, TAIL_NAME, TailContext.class);
        setAddComplete();
    }

 Copy code 

AbstractChannel There is one pipeline Field , In the constructor, it will be initialized to DefaultChannelPipeline Example . The code here confirms one point : Every Channel There is one. ChannelPipeline. Then let's track DefaultChannelPipeline The initialization process , First go to DefaultChannelPipeline The constructor :

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}
 Copy code 

stay DefaultChannelPipeline The constructor , First, it will be associated with Channel Save to field channel in . Then instantiate two ChannelHandlerContext: One is HeadContext example head, The other is TailContext example tail. And then head and tail Point to each other , Form a two-way linked list .

Special note : We are in the beginning diagram head and tail Not included ChannelHandler, This is because HeadContext and TailContext Inherited from AbstractChannelHandlerContext At the same time, it also realizes ChannelHandler The interface , So they have Context and Handler The dual properties of .

3.2 Pipeline Event propagation mechanism

Netty There are two kinds of communication events in :Inbound Events and Outbound event . The following is from Netty Description of these two events on the official website , Look at the picture again :

image.png

As can be seen from the above figure ,inbound Events and outbound The flow of events is different :

  • inbound The popularity of events is from the bottom up , also inbound Is passed by calling the corresponding ChannelHandlerContext.fireIN_EVT() Method

    • for example :ChannelHandlerContext Of fireChannelRegistered() The call will send a ChannelRegistered Of inbound To the next ChannelHandlerContext
  • outbound Just the opposite , It's from top to bottom ., and outbound Method is passed by calling ChannelHandlerContext.OUT_EVT() Method .

    • for example :ChannelHandlerContext Of bind() When the method is called, a bind Of outbound Event to the next ChannelHandlerContext.

3.2.1 Outbound Event communication

outbound Similar to active triggering ( The event that initiated the request );

Outbound Events are requests (request event), That is to ask for something to happen , And then through Outbound Notice of the incident .Outbound The event The direction of communication is tail -> customContext -> head.

ChannelOutboundHandler There are methods :

public interface ChannelOutboundHandler extends ChannelHandler {
 void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
 void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception;
 void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
 void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
 void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
 void read(ChannelHandlerContext ctx) throws Exception;
 void flush(ChannelHandlerContext ctx) throws Exception;
}
 Copy code 

3.2.2 Connection event Case on

We're going to start with connect Event as an example , Look at the Outbound Event propagation mechanism .

First , When the user calls Bootstrap Of connect() When the method is used , Will trigger a Connect Request event , This call will trigger such as calling the chain :

image.png

Continue tracking , We found out AbstractChannel Of connect() In fact, I called DefaultChannelPipeline Of connect() Method :

@Override
public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
    return tail.connect(remoteAddress, localAddress);
}
 Copy code 

You can see , When outbound event ( Here is connect event ) Pass on to Pipeline after , It's actually based on tail Start spreading as a starting point . and tail.connect() It's actually calling theta AbstractChannelHandlerContext Of connect() Method :

@Override
public ChannelFuture connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");

    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeConnect(remoteAddress, localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeConnect(remoteAddress, localAddress, promise);
            }
        }, promise, null, false);
    }
    return promise;
}
 Copy code 

findContextOutbound(MASK_CONNECT) Method as the name suggests , Its function is based on the current Context As a starting point , towards Pipeline Medium Context The front end of the two-way linked list looks for the first Association ChannelOutboundHandler Of Context, Then return . findContextOutbound(MASK_CONNECT) The method code is implemented as follows :

private AbstractChannelHandlerContext findContextOutbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    EventExecutor currentExecutor = executor();
    do {
        ctx = ctx.prev;
    } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
    return ctx;
}
 Copy code 

When we found one outbound Of Context after , I'm going to call its invokeConnect() Method , This method calls Context Its associated ChannelHandler Of connect() Method :

private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
 if (invokeHandler()) {
     try {
         ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);

     } catch (Throwable t) {
       notifyOutboundHandlerException(t, promise);
     }
  } else {
       connect(remoteAddress, localAddress, promise);
  }
}
 Copy code 

If the user does not rewrite ChannelHandler Of connect() Method , Then call ChannelOutboundHandlerAdapter Of connect() Realization :

public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {

 ctx.connect(remoteAddress, localAddress, promise);
}
 Copy code 

We see ,ChannelOutboundHandlerAdapter Of connect() Just call ctx.connect(), And this call goes back to :Context.connect -> Connect.findContextOutbound -> next.invokeConnect -> handler.connect -> Context.connect In such a cycle , until connect The event passed on to DefaultChannelPipeline The head node of the bidirectional linked list , namely head in . Why is it passed to head What about China? ? Think about it ,head Realized ChannelOutboundHandler. because head Itself is a ChannelHandlerContext, It's realized again ChannelOutboundHandler Interface , So when connect() The message is delivered to head after , The message will be forwarded to the corresponding ChannelHandler In dealing with , and head Of handler() Method returns head In itself :

So the final connect() Event is in head Processed in .head Of connect() The event processing logic is as follows :

public void connect(ChannelHandlerContext ctx,SocketAddress remoteAddress, SocketAddress localAddress,ChannelPromise promise) throws Exception {
 // Finally call here 
   unsafe.connect(remoteAddress, localAddress, promise);
}
 Copy code 

Come here , Whole connect() The request event is over . The whole... Is depicted in the figure below connect() Processing of request events :

image.png

3.2.3 Inbound Event communication

inbound Similar to event callback ( Event in response to the request ),

Inbound Its characteristic is that its propagation direction is head -> customContext -> tail.

ChannelInboundHandler There are methods :

public interface ChannelInboundHandler extends ChannelHandler {
 void channelRegistered(ChannelHandlerContext ctx) throws Exception;
 void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
 void channelActive(ChannelHandlerContext ctx) throws Exception;
 void channelInactive(ChannelHandlerContext ctx) throws Exception;
 void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
 void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
 void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
 void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
 void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
 Copy code 

3.2.4 Use fireXXX Method continues to propagate the event

Be careful , If we catch an event , And want to pass on this event , So you need to call Context Corresponding propagation method fireXXX.

As the following example code :MyInboundHandler Received a channelActive event , It's processed , If you want the event to continue to propagate, you need to call ctx.fireChannelActive() Method

public class MyInboundHandler extends ChannelInboundHandlerAdapter {
 @Override
 public void channelActive(ChannelHandlerContext ctx) throws Exception {
     System.out.println(" Successful connection ");
     ctx.fireChannelActive();
 }
}
 Copy code 

3.2.5 Activate the event Case on

Inbound Events and Outbound The processing of events is similar , It's just that the direction of transmission is different . Inbound An event is a notification event , That is, something has happened , And then through Inbound Notice of the incident .Inbound It usually happens in Channel A change in the state of or IO Event ready .

Inbound Its characteristic is that its propagation direction is head -> customContext -> tail.

Above we analyzed connect() This Outbound event , Then go on to analyze connect() What happens after the event Inbound event , And finally find Outbound and Inbound The connection between events . When connect() This Outbound Spread to unsafe after , In fact AbstractNioUnsafe Of connect() Method :

public final void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

     if (doConnect(remoteAddress, localAddress)) {
           fulfillConnectPromise(promise, wasActive);
     } else {
         ...
     } 
}
 Copy code 

stay AbstractNioUnsafe Of connect() In the method , First call doConnect() The actual method of Socket Connect , When connected, it will call fulfillConnectPromise() Method :

private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
     if (!wasActive && active) {
           pipeline().fireChannelActive();
     }
}
 Copy code 

We see , stay fulfillConnectPromise() in , By calling pipeline().fireChannelActive() Method to activate the message of the channel ( namely Socket Successful connection ) Send out . And here , When calling pipeline.fireXXX after , Namely Inbound Starting point of event . So when you call pipeline().fireChannelActive() after , And there's a ChannelActive Inbound event , Let's start here and look at this Inbound How events spread ?

public final ChannelPipeline fireChannelActive() {
     AbstractChannelHandlerContext.invokeChannelActive(head);
     return this;
}
 Copy code 

Sure enough , stay fireChannelActive() In the method , It's called head.invokeChannelActive(), So it can be proved that Inbound The incident happened in Pipeline The starting point of transmission in is head. that , stay head.invokeChannelActive() And what did you do in it ?

static void invokeChannelActive(final AbstractChannelHandlerContext next) {
     EventExecutor executor = next.executor();
     if (executor.inEventLoop()) {
         next.invokeChannelActive();
     } else {
         executor.execute(new Runnable() {
          @Override
          public void run() {
               next.invokeChannelActive();
         }
         });
     }
}
 Copy code 

The above code should be familiar . Think back in Outbound event ( for example connect() event ) During the transmission of , We have similar operations :

  • 1、 First call findContextInbound(), from Pipeline Find the first one in the two-way linked list Inbind The event Context, Then return it to .

  • 2、 call Context Of invokeChannelActive() Method ,invokeChannelActive() The source code of the method is as follows :

    private void invokeChannelActive() {
       if (invokeHandler()) {
           try {
                 ((ChannelInboundHandler) handler()).channelActive(this);
           } catch (Throwable t) {
                 notifyHandlerException(t);
           }
       } else {
           fireChannelActive();
       }
    }
     Copy code 

This method and Outbound The corresponding method of ( Such as :invokeConnect() Method ) cut from the same cloth . And Outbound equally , If the user does not rewrite channelActive() Method , That would call ChannelInboundHandlerAdapter Of channelActive() Method :

similarly , stay ChannelInboundHandlerAdapter Of channelActive() in , Just call ctx.fireChannelActive() Method , So it will enter Context.fireChannelActive() -> Connect.findContextInbound() ->nextContext.invokeChannelActive() ->nextHandler.channelActive() -> nextContext.fireChannelActive() In such a cycle . Empathy ,tail Itself has realized ChannelInboundHandler Interface , It's realized again ChannelHandlerContext Interface , So when channelActive() The message is delivered to tail after , The message will be forwarded to the corresponding ChannelHandler In dealing with , and tail Of handler() Back to you tail In itself :

TailContext Of channelActive() The method is empty . If you check by yourself TailContext Of Inbound When dealing with the method, you will find , Their implementation is empty . so , If it is Inbound, When the user does not implement a custom processor , Then the default is not to process . The following figure describes Inbound The transmission process of events :

image.png

3.3 Event summary

3.3.1 Outbound Event summary

1、Outbound An event is a request event ( from connect() Make a request , And finally by unsafe Process this request ).

2、Outbound The initiator of the incident was Channel.

3、Outbound The handler of the incident is unsafe.

4、Outbound The incident happened in Pipeline Medium The direction of transmission is tail -> head, Here it means ContextHandler

5、 stay ChannelHandler When handling events in , If this Handler Not the last one Handler, You need to call ctx Methods ( Such as : ctx.connect() Method ) Continue to spread this event . If you don't do that , Then the propagation of this event will be terminated in advance .

6、Outbound Flow of events :Context.OUT_EVT() -> Connect.findContextOutbound() -> nextContext.invokeOUT_EVT()-> nextHandler.OUT_EVT() -> nextContext.OUT_EVT()

3.3.2 Inbound Event summary

1、Inbound An event is a notification event , When something is ready , Inform the top .

2、Inbound The originator of the incident was unsafe.

3、Inbound The handler of the incident is Channel, If the user does not implement a custom processing method , that Inbound The default handler of the event is TailContext, And its processing method is empty implementation .

4、Inbound The incident happened in Pipeline The transmission direction in is head -> tail.

5、 stay ChannelHandler When handling events in , If this Handler Not the last one Handler, You need to call ctx.fireIN_EVT() things Pieces of ( Such as :ctx.fireChannelActive() Method ) Continue to spread this event . If you don't do that , Then the propagation of this event will be terminated in advance .

6、Outbound Flow of events :Context.fireIN_EVT() -> Connect.findContextInbound() -> nextContext.invokeIN_EVT() ->nextHandler.IN_EVT() -> nextContext.fireIN_EVT().outbound and inbound Events are very similar in design , also Context And Handler The direct calling relationship is also easy to confuse , So when we read the source code here , Need special attention .

4. ChannelHandlerContext Interface

ChannelHandlerContext  On behalf of  ChanelHandler  and  ChannelPipeline  Relationship between , Whenever there is  ChanelHandler  Add to  ChannelPipeline  in , Will create  ChannelHandlerContext.

ChannelHandlerContext  Its main function is to manage its associated  ChannelPipeline  And the same  ChannelPipeline  In the other  ChanelHandler  Interaction between .

ChannelHandlerContext  There are many ways , Some of these methods also exist in  Channel  and  ChannelPipeline  On , But there is an important difference .

If the  Channel  and  ChannelPipeline  These methods on will follow  ChannelPipeline  To spread ( Start from the beginning or the end ).
And the call is located at  ChannelHandlerContext  Same method on , From the currently associated  ChannelHandler  Start , And it's only going to spread to people in that area  ChannelPipeline  The next in the to handle the event  ChannelHandler.

This can reduce  ChannelHandler  Call overhead of .

4.1 Use ChannelHandlerContext

image.png

The picture above shows Channel ChannelPipeline ChannelHandler as well as ChannelHandlerContext The relationship between .

Reference documents

Netty Learning and source code analysis github Address
Netty From getting started to mastering video tutorials (B standing )
Netty Authoritative guide The second edition
ChannelPipeline and ChannelHandler
Netty And ChannelPipeline and ChannelHandler

版权声明
本文为[hsfxuebao]所创,转载请带上原文链接,感谢
https://cdmana.com/2022/134/202205141227436267.html

Scroll to Top