首页 > 滚动 > > 内容页

【Netty源码分析】03 客户端接入流程 要闻速递

发表时间:2023-03-28 08:21:56 来源:腾讯云

Netty服务端启动完成,这时候客户端连接就可以接入进来了,下面我们就来分析下客户端连接接入的流程。

之前分析过NioEventLoop线程启动方法是startThread(),由于这个方法里面的逻辑比较复杂,并没有展开,这一节就是从这个方法开始分析。


(资料图片)

startThread

private void startThread() {    if (state == ST_NOT_STARTED) {        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {            try {                doStartThread();            } catch (Throwable cause) {                STATE_UPDATER.set(this, ST_NOT_STARTED);                PlatformDependent.throwException(cause);            }        }    }}

这个方法主要主要完成2件事:

利用casNioEventLoop的状态由ST_NOT_STARTED修改成ST_STARTED,即表示NioEventLoop线程启动;执行doStartThread()方法;

doStartThread()方法看着比较复杂,核心逻辑如下,向线程池执行器executor提交一个任务,而这个线程池执行器类型是ThreadPerTaskExecutor,即每次执行任务都会创建一个新线程,而且这个任务是无限循环的:事件轮询selector.select()、事件处理processSelectedKeys()和任务队列处理runAllTasks(),这样NioEventLoop就和具体的Thread线程进行了关联:

private void doStartThread() {    assert thread == null;    //executor线程执行器,类型是:ThreadPerTaskExecutor,即每次执行任务都会创建一个新线程    executor.execute(new Runnable() {        @Override        public void run() {            //将executor线程执行器创建的线程:FastThreadLocalThread保存到EventLoop的全局变量中,相当于thread和EventLoop的绑定            thread = Thread.currentThread();            if (interrupted) {                thread.interrupt();            }            boolean success = false;            updateLastExecutionTime();            try {                //然后调用EventLoop中的run方法进行启动                SingleThreadEventExecutor.this.run();                success = true;            } catch (Throwable t) {                logger.warn("Unexpected exception from an event executor: ", t);            }        }    });}

该方法大致完成2件事:

thread = Thread.currentThread();:将executor线程池分配的线程保存起来,这样就完成了NioEventLoopThread线程的关联;SingleThreadEventExecutor.this.run():具体实现在NioEventLoop.run()方法,所以,startThread()核心就是分配一个线程运行NioEventLoop.run()方法。
protected void run() {    for (;;) {        try {            try {                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {                case SelectStrategy.CONTINUE:// 默认实现下,不存在这个情况                    continue;                case SelectStrategy.BUSY_WAIT:                case SelectStrategy.SELECT:                    //selector.select轮询io事件                    select(wakenUp.getAndSet(false));                    if (wakenUp.get()) {                        selector.wakeup();                    }                default:                }            } catch (IOException e) {                rebuildSelector0();                handleLoopException(e);                continue;            }            cancelledKeys = 0;            needsToSelectAgain = false;            final int ioRatio = this.ioRatio;            if (ioRatio == 100) {               try {                    // 处理 Channel 感兴趣的就绪 IO 事件                    processSelectedKeys();                } finally {                    // 运行所有普通任务和定时任务,不限制时间                    runAllTasks();                }            } else {                final long ioStartTime = System.nanoTime();                try {                    // 处理IO事件                    processSelectedKeys();                } finally {                    // 运行所有普通任务和定时任务,限制时间                    final long ioTime = System.nanoTime() - ioStartTime;                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);                }            }        } catch (Throwable t) {            handleLoopException(t);        }        //  EventLoop 优雅关闭        try {            if (isShuttingDown()) {                closeAll();                if (confirmShutdown()) {                    return;                }            }        } catch (Throwable t) {            handleLoopException(t);        }    }}

该方法主要完成三件事:

select(wakenUp.getAndSet(false)):主要执行selector.select()方法进行事件轮询processSelectedKeys():如果轮询到事件,会在这里进行处理runAllTasks():处理任务队列和定时任务队列中的任务

下面我们就分别来分析下这三个方法。

select

private void select(boolean oldWakenUp) throws IOException {    Selector selector = this.selector;    try {        int selectCnt = 0;//计数器置0        long currentTimeNanos = System.nanoTime();        /**         * selectDeadLineNanos是select()方法运行的截止时间         *         * currentTimeNanos:可以看成当前时间         * delayNanos(currentTimeNanos):获取间隔时间,这里分为两种情况:         * 1、netty里面定时任务队列scheduledTaskQueue是按照延迟时间从小到大进行排序,如果定时任务队列中有任务,         * 则只需要获取到第一个任务的启动时间 - 当前时间 = select()方法可以运行的时间间隔,即:select()方法要在第一个定时任务执行之前退出,这样才能去执行定时任务         * 2、如果定时任务队列没有任务,则delayNanos(currentTimeNanos)返回1秒对应的时间间隔         *         */        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);        for (;;) {            //计算超时时间            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;            /**             * timeoutMillis <= 0表示当前已经超时了,不能继续向下执行select()方法了,需要立即退出select方法,在退出前还有个判断:selectCnt == 0             * selectCnt == 0表示第一次进入循环,则执行下Selector.selectNow()检出准备好的网络IO事件,该方法不会阻塞,             */            if (timeoutMillis <= 0) {                if (selectCnt == 0) {                    selector.selectNow();                    selectCnt = 1;                }                break;            }            /**             * 如果没有超时,但是通过hasTasks()判断到taskQueue任务队列中有需要执行的任务,这时也需要退出select()方法             * 1、利用cas将wakeUp值由false变成true,wakeUp=true表示线程处于唤醒状态,可以执行任务,进入select()方法前会把wakeUp设置成false             * 表示线程处于select()方法阻塞中,不能处理任务队列中的任务,这时只要处理Selector.select()             * 2、退出前执行一次:selector.selectNow()             */            if (hasTasks() && wakenUp.compareAndSet(false, true)) {                //有任务,进行一次非阻塞式的select                selector.selectNow();                selectCnt = 1;                break;            }            //调用select方法,阻塞时间为上面算出的最近一个将要超时的定时任务时间            /**             * 未超时,任务队列中也没有需要执行的任务,这时就可以放心的执行Selector.select()方法了,这里带上之前计算出的超时时间             * 如果之前计算时存在定时任务,则保证在第一个定时任务启动前唤醒即可,没有定时任务则默认超时1秒             */            int selectedKeys = selector.select(timeoutMillis);            //轮询次数+1            selectCnt ++;            /**             * 发生如下几种情况,select()方法都需要退出:             * 1、selectedKeys != 0:表示轮询到IO事件             * 2、oldWakenUp:这个是入参,值为false,是在select()方法中控制是否需要退出,默认是没有使用到的,没有意义             * 3、wakenUp.get():进入select()方法之前,wakeUp被设置成false,如果这里为true,表示已有外部线程对线程进行唤醒操作,             *      一般就是addTask()添加新任务时会触发唤醒,然后及时去执行taskQueue中的任务             * 4、hasTasks() || hasScheduledTasks():判断任务队列和定时任务队列是否有任务需要执行             */            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {                break;            }            //线程中断响应:如果线程被中断,计数器置1,break退出for循环,则退出select()检测            if (Thread.interrupted()) {                if (logger.isDebugEnabled()) {                    logger.debug("Selector.select() returned prematurely because " +                            "Thread.currentThread().interrupt() was called. Use " +                            "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");                }                selectCnt = 1;                break;            }            /**             * 正常情况下:time >= currentTimeNanos + TimeUnit.MILLISECONDS.toNanos(timeoutMillis)             * 但是,jdk nio中存在一个bug,selector.select(timeoutMillis)在没有IO事件触发时并不会等待超时而是立即返回,造成空轮询             *             * 下面就是Netty解决空轮询问题             * 1、if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos)             *      表示selector.select(timeoutMillis)经过超时后才被唤醒,属于正常情况,把selectCnt重置成1             * 2、如果不是,表示可能发生空轮询selectCnt不会被重置成1,for循环一次selectCnt就会被累加1次;             * 3、等到 selectCnt > 门限值,默认是512,可以通过io.netty.selectorAutoRebuildThreshold参数设置,             *      则判断真正发生了nio空循环bug,则重建Selector替换掉当前这个出问题的Selector             */            long time = System.nanoTime();            //判断执行了一次阻塞式select后,当前时间和开始时间是否大于超时时间。(大于是很正常的,小于的话,说明没有执行发生了空轮询)            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {                selectCnt = 1;             } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {                selector = selectRebuildSelector(selectCnt);                selectCnt = 1;                break;            }            currentTimeNanos = time;        }    } catch (CancelledKeyException e) {        if (logger.isDebugEnabled()) {            logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",                        selector, e);        }    }}

select()方法代码看着很复杂,其核心思想理解再来分析就比较简单的。select()主要是用来执行selector.select()IO事件进行轮询,作为server,这里就是轮询OP_ACCEPT事件,看是否有客户端接入进来。但是NioEventLoop是单线程处理模式,不可能让线程一直处理selector.select(),还有轮询到的事件以及任务队列中任务等等都需要使用这个线程进行处理,所以,上面一大堆代码都是用来判断什么时候退出select()方法的,总结下退出逻辑主要分为如下几种情况:

在执行selector.select()方法之前,计算出一个超时时间,超时时间默认是1秒,如果定时任务队列有任务,则取出第一个任务(按顺序存放),保证在该定时任务执行之前退出select()方法即可;如果超时就退出,退出前判断是否是第一次进入for循环,如果是在退出之前调用一次无阻塞的selector.selectNow()轮询下判断任务队列taskQueue中是否有任务,如果有则将wakenUp利用cas设置成true,执行下无阻塞的selector.selectNow()轮询后退出select()方法如果上面情况都不存在,开始执行阻塞selector.select(timeoutMillis)轮询,并将之前计算的超时时间带上;selector.select(timeoutMillis)执行完成后,继续判断是否需要退出select()方法,发生如下任一情况则要退出:轮询到IO事件,则需要退出select()方法去处理事件外部线程对对线程执行过唤醒操作,比如addTask()等操作需要唤醒线程执行队列任务,才能及时去执行taskQueue中的任务:进入select()方法之前,wakeUp被设置成false,如果这里为true,表示已有外部线程对线程进行唤醒操作任务队列taskQueue或定时任务队列scheduledTaskQueue中有需要处理的任务,这时需要退出select()方法,转去执行任务

processSelectedKeys

private void processSelectedKeys() {    //selectedKeys != null表示已对Selector进行优化过,替换掉Selector内部的selectedKeys,正常情况下进入这个流程    if (selectedKeys != null) {        processSelectedKeysOptimized();    } else {        processSelectedKeysPlain(selector.selectedKeys());    } }

processSelectedKeys()主要是对selector.select()方法轮询到的事件进行处理,作为server,如果轮询到OP_ACCEPT,就表示有客户端接入进来了,那我们就跟踪下这个方法,看接入进来的客户端处理流程。

Netty是对Selector进行了优化,将selectedKeysSet实现替换成了数组实现,提升性能,所以,这里一般走的是processSelectedKeysOptimized()这个流程:

private void processSelectedKeysOptimized() {    for (int i = 0; i < selectedKeys.size; ++i) {        final SelectionKey k = selectedKeys.keys[i];        selectedKeys.keys[i] = null;        //k.attachment()获取到的就是NioServerSocketChannel        final Object a = k.attachment();        if (a instanceof AbstractNioChannel) {//一般是走这个分支流程            processSelectedKey(k, (AbstractNioChannel) a);        } else {            @SuppressWarnings("unchecked")            NioTask task = (NioTask) a;            processSelectedKey(k, task);        }        if (needsToSelectAgain) {            selectedKeys.reset(i + 1);            selectAgain();            i = -1;        }    }}

这里关键一点是Object a = k.attachment();,之前分析过向selector注册时把NioServerSocketChannel作为attachment添加进去,所以,这里取出来的就是NioServerSocketChannel对象。processSelectedKey()方法通过if判断事件类型进行处理,server端这里肯定是OP_ACCEPT

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read();}

具体的处理逻辑交由Unsafe对象进行处理:

public void read() {    assert eventLoop().inEventLoop();    final ChannelConfig config = config();    final ChannelPipeline pipeline = pipeline();    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();    allocHandle.reset(config);    boolean closed = false;    Throwable exception = null;    try {        try {            do {                //doReadMessages()读出来一个客户端连接的Channel                int localRead = doReadMessages(readBuf);                if (localRead == 0) {                    break;                }                if (localRead < 0) {                    closed = true;                    break;                }                allocHandle.incMessagesRead(localRead);            } while (allocHandle.continueReading());        } catch (Throwable t) {            exception = t;        }        int size = readBuf.size();        for (int i = 0; i < size; i ++) {            readPending = false;            pipeline.fireChannelRead(readBuf.get(i));        }        readBuf.clear();        allocHandle.readComplete();        pipeline.fireChannelReadComplete();        if (exception != null) {            closed = closeOnReadError(exception);            pipeline.fireExceptionCaught(exception);        }        if (closed) {            inputShutdown = true;            if (isOpen()) {                close(voidPromise());            }        }    } finally {        if (!readPending && !config.isAutoRead()) {            removeReadOp();        }    }}

这个类主要完成2件事:

doReadMessages(readBuf):调用serverSocketChannel.accept()接收到客户端连接socketChannel,并封装成Netty中类型:NioSocketChannel,然后放入到readBuf集合中;pipeline.fireChannelRead(readBuf.get(i));:将读入的客户端连接作为参数,即NioSocketChannel对象,通过pipeline触发channelRead事件进行handler间传播,注意这里的pipelineNioServerSocketChannel中的,即server端的。最终会进入到ServerBootstrapAcceptor#channelRead方法中进行处理。
public void channelRead(ChannelHandlerContext ctx, Object msg) {    final Channel child = (Channel) msg;    child.pipeline().addLast(childHandler);    setChannelOptions(child, childOptions, logger);    for (Entry, Object> e: childAttrs) {        child.attr((AttributeKey) e.getKey()).set(e.getValue());    }    try {        //childGroup.register(child):会给客户端连接进来的Channel从线程池中获取一个EventLoop绑定给Channel        childGroup.register(child).addListener(new ChannelFutureListener() {            @Override            public void operationComplete(ChannelFuture future) throws Exception {                if (!future.isSuccess()) {                    forceClose(child, future.cause());                }            }        });    } catch (Throwable t) {        forceClose(child, t);    }}

这个方法主要完成3件事:

child.pipeline().addLast(childHandler):向NioSocketChannel中添加ServerBootstrap.childHandler(new TestServerInitializer()),后面通过触发handlerAdded()时回调initChannel()实现向pipeline添加handler;设置optionattr信息;childGroup.register(child):将客户端连接NioSocketChannel注册到NioEventLoop实例上,基本和之前分析NioServerSocketChannel注册逻辑一致,这个过程中会触发三个事件:handlerAddedchannelRegisteredchannelActive,之前NioServerSocketChannel注册时只能触发前两个,绑定端口后才能触发第三个事件,客户端连接不存在端口绑定问题,所以这里会直接触发channelActive。和NioServerSocketChannel一样,真正向selector注册感兴趣事件就是在channelActive触发这里:
public void channelActive(ChannelHandlerContext ctx) {    //触发channelActive事件传播    ctx.fireChannelActive();    //向selector注册真正关注的事件    readIfIsAutoRead();}

channelActive和之前分析NioServerSocketChannel的处理逻辑一致,就不再分析。

总结

分析到这里,基本搞清楚了客户端接入的处理流程,现在再次总结下:

NioServerSocketChannel绑定的NioEventLoop不停轮询OP_ACCEPT,触发后通过调用java api获取到ServerSocket,然后包装成NioSocketChannel;然后触发channelRead事件传播,然后会进入server pipeline中非常重要的一个handlerServerBootstrapAcceptor,连接处理器专门处理客户端连接;在ServerBootstrapAcceptor#channelRead()方法中,完成NioSocketChannel的设置:optionattrhandler添加等;最重要的是将channel注册到NioEventLoop上,注册过程中会触发三种事件:handlerAddedchannelRegisteredchannelActive,和之前分析server channel注册过程一样,最终在channelActive这里向selector注册真正感兴趣IO事件,整个流程全部完成。

Copyright ©  2015-2022 亚洲产业网版权所有  备案号:豫ICP备20022870号-9   联系邮箱:553 138 779@qq.com