xnio源码解析之启动流程

Posted by Brade on December 5, 2021

XNIO 启动流程

本文中 XNIO 使用的版本为 3.8.4.Final.

1 什么是 XNIO

JBOSS 开源的一个以 NIO 思想为基础的异步 IO 框架. 与 netty 类似,只是使用 ChannelListener 进行事件通知 .

同时 XNIO 继承重用了JDK NIOByteBuffer类; XNIO 通过封装 NIOFileChannel 中的方法 transferTotransferFrom 来实现 zero-copy(零拷贝).

XNIOapi 和默认实现 nio-imp 组成. 也可以在其 GitHub 上找到并使用其它实现.

2 创建 XNIO 服务器及客户端

2.1 创建服务端

public class XNioServer {

    private final int SERVER_PORT = 8080;

    public static void main(String[] args) throws IOException {
        XNioServer server = new XNioServer();
        server.createXnioServer();
    }

    public void createXnioServer() throws IOException {
        int readThreads = (int) Math.round(Math.random() * 10);
        if (readThreads == 0) {
            readThreads = 1;
        }
        int writeThreads = (int) Math.round(Math.random() * 10);
        if (writeThreads == 0) {
            writeThreads = 1;
        }
        Xnio xnio = Xnio.getInstance("nio", XNioServerTest.class.getClassLoader());
        OptionMap options = OptionMap.create(Options.WORKER_WRITE_THREADS, writeThreads, Options.WORKER_READ_THREADS, readThreads);
        XnioWorker worker = xnio.createWorker(options);
        SocketAddress bindAddress = new InetSocketAddress(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), SERVER_PORT);
        ChannelListener<AcceptingChannel<StreamConnection>> acceptListener = channel -> {
            try {
                final StreamConnection connection = channel.accept();
                System.out.println("server connection ===>: " + connection.isOpen());
                ByteBuffer inboundBuf = ByteBuffer.allocate(512);
                Charset charset = Charset.forName(charset);
                final ConduitStreamSourceChannel sourceChannel = connection.getSourceChannel();
                final ConduitStreamSinkChannel sinkChannel = connection.getSinkChannel();
                connection.getSourceChannel().setReadListener(c -> {
                    try {
                        if (c.read(inboundBuf) == -1) {
                            inboundBuf.flip();
                            System.out.print("server recive ===>: " + charset.decode(inboundBuf));
                            sinkChannel.write(ByteBuffer.wrap("create Server,hello world!".getBytes("UTF-8")));
                            sinkChannel.flush();
                            sinkChannel.shutdownWrites();
                        } else {
                            c.resumeReads();
                        }
                    } catch (IOException e) {
                        IoUtils.safeClose(c);
                    }
                });
                sourceChannel.resumeReads();
            } catch (IOException e) {
                IoUtils.safeClose(channel);
            }
        };
        AcceptingChannel<StreamConnection> server = worker.createStreamConnectionServer(bindAddress, acceptListener, options);
        server.resumeAccepts();
        System.out.println("server is open ===>: " + server.isOpen());

    }

}

2.2 创建客户端

public class XNioClient {
    public static void main(String[] args) throws Exception {
        new XNioClient().createXnioClient();
    }

    public void createXnioClient() throws Exception {
        final Charset charset = Charset.forName("utf-8");
        //创建Xnio实例,并构造XnioWorker
        final Xnio xnio = Xnio.getInstance();
        final XnioWorker worker = xnio.createWorker(OptionMap.EMPTY);
        final InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", 8080);
        final IoFuture<StreamConnection> streamConnectionIoFuture = worker
                .openStreamConnection(inetSocketAddress, null, OptionMap.EMPTY);
        StreamConnection connection = streamConnectionIoFuture.get();
        System.out.println("connection.isOpen ===>: " + connection.isOpen());
        ConduitStreamSinkChannel sinkChannel = connection.getSinkChannel();
        sinkChannel.write(ByteBuffer.wrap("Hello world! this client send. \n".getBytes(charset)));
        sinkChannel.flush();
        sinkChannel.shutdownWrites();
        ConduitStreamSourceChannel sourceChannel = connection.getSourceChannel();
        ByteBuffer inboundBuf = ByteBuffer.allocate(512);
        sourceChannel.setReadListener(channel -> {
            try {
                if (channel.read(inboundBuf) == -1) {
                    inboundBuf.flip();
                    System.out.println("client recive: ==>" + charset.decode(inboundBuf));
                    channel.shutdownReads();
                } else {
                    channel.resumeReads();
                }
            } catch (IOException e) {
                IoUtils.safeClose(channel);
            }

        });
        sourceChannel.resumeReads();
    }
}

3 启动流程解析

主要分析服务端的流程,客户端略.

NioXnioWorker -> QueuedNioTcpServer2 -> NioTcpServer -> NioSocketStreamConnection -> NioSocketConduit

3.1 Xnio 实例化

通过 XniogetInstance方法,最终采用的是JDKSPI技术,找到对应的接口XnioProvider,服务提供者实现.

    public static Xnio getInstance(String provider, final ClassLoader classLoader) {
        return doGetInstance(provider, (ServiceLoader)AccessController.doPrivileged(new PrivilegedAction<ServiceLoader<XnioProvider>>() {
            public ServiceLoader<XnioProvider> run() {
                return ServiceLoader.load(XnioProvider.class, classLoader);
            }
        }));
    }

3.1.1 XnioProvider

找到默认实现nio-impl包中META-INF/services目录下的org.xnio.XnioProvider文件,里面对应的接口是org.xnio.nio.NioXnioProvider.

3.1.2 NioXnioProvider

此实现类中只是简单的提供了一个获取实例的方法,对应的实例是NioXnio.

public final class NioXnioProvider implements XnioProvider {
    private static final Xnio INSTANCE = new NioXnio();

    public NioXnioProvider() {
    }

    public Xnio getInstance() {
        return INSTANCE;
    }

    public String getName() {
        return INSTANCE.getName();
    }
}

3.1.3 NioXnio

构造器中的逻辑是根据操作环境和jdk版本获取对应的选择器创建者SelectorCreator. 如果使用的window环境,一般都是使用的WindowsSelectorProvider实现.

3.2 XnioWorker 实例化

通过调用 Xnio#createWorker方法,再调用 XnioWorker.Builder#build方法,最后调用NioXnio#build方法实现

    public XnioWorker createWorker(OptionMap optionMap) throws IOException, IllegalArgumentException {
        return this.createWorker((ThreadGroup)null, optionMap);
    }
    public XnioWorker createWorker(ThreadGroup threadGroup, OptionMap optionMap, Runnable terminationTask) throws IOException, IllegalArgumentException {
        Builder workerBuilder = this.createWorkerBuilder();
        workerBuilder.populateFromOptions(optionMap);
        workerBuilder.setThreadGroup(threadGroup);
        workerBuilder.setTerminationTask(terminationTask);
        return workerBuilder.build();
    }
    protected XnioWorker build(Builder builder) {
        NioXnioWorker worker = new NioXnioWorker(builder);
        worker.start();
        return worker;
    }

注意 Builder 类,里面给了些配置参数的默认值

     /**
     * 工作线程池构建器: 核心线程 4, 最大工作线程池数 16,工作线程 1
     */
    public static class Builder {
    private final Xnio xnio;
    private ExecutorService externalExecutorService;
    private Runnable terminationTask;
    private String workerName;
    private int coreWorkerPoolSize = 4;
    private int maxWorkerPoolSize = 16;
    private ThreadGroup threadGroup;
    private boolean daemon;
    private int workerKeepAlive = 60000;
    private int workerIoThreads = 1;
    private long workerStackSize = 0L;
    private CidrAddressTable<InetSocketAddress> bindAddressConfigurations = new CidrAddressTable();
    }    

3.2.1 NioXnioWorker

先调用父类XnioWorker的构造方法,初始化线程池;

    /**
     * 构建一个新实例,旨在仅从实现中调用
     *
     * @param builder the worker builder
     */
    protected XnioWorker(final Builder builder) {
        this.xnio = builder.xnio;
        this.terminationTask = builder.terminationTask;
        final SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            sm.checkPermission(CREATE_WORKER_PERMISSION);
        }
        String workerName = builder.getWorkerName();
        // 线程名为 XNIO + 序列号
        if (workerName == null) {
            workerName = "XNIO-" + seq.getAndIncrement();
        }
        name = workerName;
        final boolean markThreadAsDaemon = builder.isDaemon();
        bindAddressTable = builder.getBindAddressConfigurations();
        final Runnable terminationTask = new Runnable() {
            public void run() {
                // 调用停止任务线程池方法,模板方法,由子类实现,默认为 NioXnioWorkder
                taskPoolTerminated();
            }
        };
        final ExecutorService executorService = builder.getExternalExecutorService();
        // 实例化任务线程池
        if (executorService != null) {
            // 增强的队列执行器任务线程池: jboss提供
            if (executorService instanceof EnhancedQueueExecutor) {
                taskPool = new ExternalTaskPool(
                        new EnhancedQueueExecutorTaskPool((EnhancedQueueExecutor) executorService));
            } else if (executorService instanceof ThreadPoolExecutor) {
                taskPool = new ExternalTaskPool(new ThreadPoolExecutorTaskPool((ThreadPoolExecutor) executorService));
            } else {
                taskPool = new ExternalTaskPool(new ExecutorServiceTaskPool(executorService));
            }
        } else if (EnhancedQueueExecutor.DISABLE_HINT) { // 禁用提示
            final int poolSize = max(builder.getMaxWorkerPoolSize(), builder.getCoreWorkerPoolSize());
            taskPool = new ThreadPoolExecutorTaskPool(new DefaultThreadPoolExecutor(
                poolSize,
                poolSize,
                builder.getWorkerKeepAlive(), TimeUnit.MILLISECONDS,
                new LinkedBlockingDeque<>(),
                new WorkerThreadFactory(builder.getThreadGroup(), builder.getWorkerStackSize(), markThreadAsDaemon),
                terminationTask));
        } else {
            taskPool = new EnhancedQueueExecutorTaskPool(new EnhancedQueueExecutor.Builder()
                .setCorePoolSize(builder.getCoreWorkerPoolSize())
                .setMaximumPoolSize(builder.getMaxWorkerPoolSize())
                .setKeepAliveTime(builder.getWorkerKeepAlive(), TimeUnit.MILLISECONDS)
                .setThreadFactory(new WorkerThreadFactory(builder.getThreadGroup(), builder.getWorkerStackSize(), markThreadAsDaemon))
                .setTerminationTask(terminationTask)
                .setRegisterMBean(true)
                .setMBeanName(workerName)
                .build()
            );
        }
    }

再调用构造方法进行工作线程的设置.

    NioXnioWorker(final Builder builder) {
        super(builder);
        final NioXnio xnio = (NioXnio) builder.getXnio();
        /**
         * 线程数量: 默认为 1
         */
        final int threadCount = builder.getWorkerIoThreads();
        /**
         * 工作栈大小
         */
        this.workerStackSize = builder.getWorkerStackSize();
        final String workerName = getName();
        /**
         * 工作线程数组: 根据线程数量创建
         */
        WorkerThread[] workerThreads;
        workerThreads = new WorkerThread[threadCount];
        /**
         * 线程组
          */
        final ThreadGroup threadGroup = builder.getThreadGroup();
        /**
         * 是否守护线程
         */
        final boolean markWorkerThreadAsDaemon = builder.isDaemon();
        boolean ok = false;
        try {
            // 一个工作线程 -> 一个线程选择器(属于给定的线程组,有指定的工作栈大小)
            for (int i = 0; i < threadCount; i++) {
                final Selector threadSelector;
                try {
                    // 线程选择器:主选择器创建器,windows 为 WindowsSelectorProvider 所创建
                    threadSelector = xnio.mainSelectorCreator.open();
                } catch (IOException e) {
                    throw Log.log.unexpectedSelectorOpenProblem(e);
                }
                // 创建工作线程并指定属于哪个线程组,指定栈大小
                final WorkerThread workerThread = new WorkerThread(this, threadSelector, String.format("%s I/O-%d", workerName, Integer.valueOf(i + 1)), threadGroup, workerStackSize, i);
                // Mark as daemon if the Options.THREAD_DAEMON has been set
                if (markWorkerThreadAsDaemon) {
                    workerThread.setDaemon(true);
                }
                workerThreads[i] = workerThread;
            }
            final Selector threadSelector;
            try {
                threadSelector = xnio.mainSelectorCreator.open();
            } catch (IOException e) {
                throw Log.log.unexpectedSelectorOpenProblem(e);
            }
            // 创建接受线程:用于接受请求,只用一个线程
            acceptThread = new WorkerThread(this, threadSelector, String.format("%s Accept", workerName), threadGroup, workerStackSize, threadCount);
            if (markWorkerThreadAsDaemon) {
                acceptThread.setDaemon(true);
            }
            ok = true;
        } finally {
            if (! ok) {
                for (WorkerThread worker : workerThreads) {
                    if (worker != null) safeClose(worker.getSelector());
                }
            }
        }
        this.workerThreads = workerThreads;
        // 工作线程技术指标
        this.metrics = new NioWorkerMetrics(workerName);
        metrics.register();
    }

调用线程启动worker.start()方法 .

    /**
     * 启动工作线程
     */
    void start() {
        for (WorkerThread worker : workerThreads) {
            openResourceUnconditionally();
            worker.start();
        }
        openResourceUnconditionally();
        acceptThread.start();
    }

3.2.2 WorkerThread

工作线程启动方法.

    /**
     * 从选择器获取连接及注册兴趣事件
     */
    public void run() {
        final Selector selector = this.selector;
        try {
            log.tracef("Starting worker thread %s", this);
            final Object lock = workLock;
            final Queue<Runnable> workQueue = selectorWorkQueue;
            final TreeSet<TimeKey> delayQueue = delayWorkQueue;
            log.debugf("Started channel thread '%s', selector %s", currentThread().getName(), selector);
            Runnable task;
            Iterator<TimeKey> iterator;
            long delayTime = Long.MAX_VALUE;
            Set<SelectionKey> selectedKeys;
            SelectionKey[] keys = new SelectionKey[16];
            int oldState;
            int keyCount;
            // 列循环执行线程任务
            for (;;) {
                // Run all tasks
                do {
                    synchronized (lock) {
                        // 从工作队列弹出工作线程
                        task = workQueue.poll();
                        if (task == null) {
                            iterator = delayQueue.iterator();
                            delayTime = Long.MAX_VALUE;
                            // 延期队列迭代器有值
                            if (iterator.hasNext()) {
                                final long now = nanoTime();
                                do {
                                    // 时间键
                                    final TimeKey key = iterator.next();
                                    // 截止时间 <= 当前时间 - 开始时间 ,即已超时
                                    if (key.deadline <= (now - START_TIME)) {
                                        // 将线程加到工作队列
                                        workQueue.add(key.command);
                                        // 延迟队列移除时间键
                                        iterator.remove();
                                    } else {
                                        // 延期时间(超时剩余时间) = 截止时间 - (当前时间 - 开始时间)
                                        delayTime = key.deadline - (now - START_TIME);
                                        // the rest are in the future
                                        break;
                                    }
                                } while (iterator.hasNext());
                            }
                            // 再次从工作队列弹出工作线程
                            task = workQueue.poll();
                        }
                    }
                    // clear interrupt status
                    Thread.interrupted();
                    // 执行工作线程
                    safeRun(task);
                } while (task != null);
                // all tasks have been run
                oldState = state;
                // 线程状态:未停止
                if ((oldState & SHUTDOWN) != 0) {
                    synchronized (lock) {
                        // 选择键数量
                        keyCount = selector.keys().size();
                        state = keyCount | SHUTDOWN;
                        // 队列线程执行完毕
                        if (keyCount == 0 && workQueue.isEmpty()) {
                            // no keys or tasks left, shut down (delay tasks are discarded)
                            return;
                        }
                    }
                    synchronized (selector) {
                        final Set<SelectionKey> keySet = selector.keys();
                        synchronized (keySet) {
                            // 清空当前选择器中的选择键
                            keys = keySet.toArray(keys);
                            Arrays.fill(keys, keySet.size(), keys.length, null);
                        }
                    }
                    // shut em down
                    // 两次遍历选择键
                    for (int i = 0; i < keys.length; i++) {
                        final SelectionKey key = keys[i];
                        // 为空则中断
                        if (key == null) break; //end of list
                        // 不为空置为空
                        keys[i] = null;
                        final NioHandle attachment = (NioHandle) key.attachment();
                        // 有处理器不为空强制关闭
                        if (attachment != null) {
                            safeClose(key.channel());
                            attachment.forceTermination();
                        }
                    }
                    // 再次从头到尾全部清空选择器中的选择键
                    Arrays.fill(keys, 0, keys.length, null);
                }
                // clear interrupt status
                Thread.interrupted();
                // perform select
                // 执行
                try {
                    // 线程状态不是停止
                    if ((oldState & SHUTDOWN) != 0) {
                        selectorLog.tracef("Beginning select on %s (shutdown in progress)", selector);
                        // 立刻返回选择器的通道,无空闲则返回0
                        selector.selectNow();
                    } else if (delayTime == Long.MAX_VALUE) {
                        selectorLog.tracef("Beginning select on %s", selector);
                        // 轮询状态置为 true
                        polling = true;
                        try {
                            Runnable item = null;
                            synchronized (lock) {
                                // 返回队列的头元素,不删除,队列为空则返回 null
                               item =  workQueue.peek();
                            }
                            if (item != null) {
                                log.tracef("SelectNow, queue is not empty");
                                selector.selectNow();
                            } else {
                                log.tracef("Select, queue is empty");
                                selector.select();
                            }
                        } finally {
                            polling = false;
                        }
                    } else {
                        final long millis = 1L + delayTime / 1000000L;
                        selectorLog.tracef("Beginning select on %s (with timeout)", selector);
                        polling = true;
                        try {
                            Runnable item = null;
                            synchronized (lock) {
                                // 返回队列的头元素,不删除,队列为空则返回 null
                               item =  workQueue.peek();
                            }
                            if (item != null) {
                                log.tracef("SelectNow, queue is not empty");
                                selector.selectNow();
                            } else {
                                log.tracef("Select, queue is empty");
                                // 延迟对应毫秒数后再返回对应选择器通道
                                selector.select(millis);
                            }
                        } finally {
                            polling = false;
                        }
                    }
                } catch (CancelledKeyException ignored) {
                    // Mac and other buggy implementations sometimes spits these out
                    selectorLog.trace("Spurious cancelled key exception");
                } catch (IOException e) {
                    selectorLog.selectionError(e);
                    // hopefully transient; should never happen
                }
                selectorLog.tracef("Selected on %s", selector);
                // iterate the ready key set
                synchronized (selector) {
                    selectedKeys = selector.selectedKeys();
                    synchronized (selectedKeys) {
                        // copy so that handlers can safely cancel keys
                        keys = selectedKeys.toArray(keys);
                        // 置空选择器键
                        Arrays.fill(keys, selectedKeys.size(), keys.length, null);
                        // 清空
                        selectedKeys.clear();
                    }
                }
                for (int i = 0; i < keys.length; i++) {
                    final SelectionKey key = keys[i];
                    if (key == null) break; //end of list
                    keys[i] = null;
                    final int ops;
                    try {
                        // 获取兴趣事件
                        ops = key.interestOps();
                        if (ops != 0) {
                            selectorLog.tracef("Selected key %s for %s", key, key.channel());
                            final NioHandle handle = (NioHandle) key.attachment();
                            // 对应处理器为空,直接取消
                            if (handle == null) {
                                cancelKey(key, false);
                            } else {
                                // clear interrupt status
                                // 测试当前线程是否被中断, 通过该方法清除线程的中断状态
                                Thread.interrupted();
                                selectorLog.tracef("Calling handleReady key %s for %s", key.readyOps(), key.channel());
                                handle.handleReady(key.readyOps());
                            }
                        }
                    } catch (CancelledKeyException ignored) {
                        selectorLog.tracef("Skipping selection of cancelled key %s", key);
                    } catch (Throwable t) {
                        selectorLog.tracef(t, "Unexpected failure of selection of key %s", key);
                    }
                }
                // all selected keys invoked; loop back to run tasks
            }
        } finally {
            log.tracef("Shutting down channel thread \"%s\"", this);
            // 关闭选择器
            safeClose(selector);
            getWorker().closeResource();
        }
    }

3.3 accept 线程监听ChannelListener 实例化

监听器中实现我们的业务,进行数据的读写操作.

3.4 创建并开启服务

调用 XnioWorker#createStreamConnectionServer,传入参数为监听地址,监听器,配置参数

AcceptingChannel<StreamConnection> server = worker.createStreamConnectionServer(bindAddress, acceptListener, options);

调用 NioXnioWorker.createTcpConnectionServer

    /**
     * 创建 tcp 连接服务器
     * @param bindAddress the address to bind to
     * @param acceptListener the initial accept listener
     * @param optionMap the initial configuration for the server
     * @return
     * @throws IOException
     */
    protected AcceptingChannel<StreamConnection> createTcpConnectionServer(final InetSocketAddress bindAddress, final ChannelListener<? super AcceptingChannel<StreamConnection>> acceptListener, final OptionMap optionMap) throws IOException {
        checkShutdown();
        boolean ok = false;
        // 打开服务套接字通道: jdk nio
        final ServerSocketChannel channel = ServerSocketChannel.open();
        try {
            if (optionMap.contains(Options.RECEIVE_BUFFER)) channel.socket().setReceiveBufferSize(optionMap.get(Options.RECEIVE_BUFFER, -1));
            channel.socket().setReuseAddress(optionMap.get(Options.REUSE_ADDRESSES, true));
            // 配置不阻塞
            channel.configureBlocking(false);
            // 如果配置了积压请求参数
            if (optionMap.contains(Options.BACKLOG)) {
                // 默认积压请求数为 128
                channel.socket().bind(bindAddress, optionMap.get(Options.BACKLOG, 128));
            } else {
                channel.socket().bind(bindAddress);
            }
            if (false) {
                final NioTcpServer server = new NioTcpServer(this, channel, optionMap, false);
                server.setAcceptListener(acceptListener);
                ok = true;
                return server;
            } else {
                // 默认走此分支: 监听存放在队列
                final QueuedNioTcpServer2 server = new QueuedNioTcpServer2(new NioTcpServer(this, channel, optionMap, true));
                // 设置接受请求的监听器
                server.setAcceptListener(acceptListener);
                ok = true;
                return server;
            }
        } finally {
            // 如果不成功,关闭服务器套接字通道
            if (! ok) {
                IoUtils.safeClose(channel);
            }
        }
    }

调用QueuedNioTcpServer2构造器

    QueuedNioTcpServer2(final NioTcpServer realServer) {
        super(realServer.getWorker());
        this.realServer = realServer;
        final NioXnioWorker worker = realServer.getWorker();
        final int cnt = worker.getIoThreadCount();
        acceptQueues = new ArrayList<>(cnt);
        // 根据 io 线程数, 采用接受监听队列
        for (int i = 0; i < cnt; i ++) {
            acceptQueues.add(new LinkedBlockingQueue<>());
        }
        realServer.getCloseSetter().set(ignored -> invokeCloseHandler());
        // NioTcpServer#accept() -> new NioSocketStreamConnection
        realServer.getAcceptSetter().set(ignored -> handleReady());
    }

调用NioTcpServer构造器.

    /**
     * 构造器
     * @param worker
     * @param channel
     * @param optionMap
     * @param useAcceptThreadOnly
     * @throws IOException
     */
    NioTcpServer(final NioXnioWorker worker, final ServerSocketChannel channel, final OptionMap optionMap, final boolean useAcceptThreadOnly) throws IOException {
        super(worker);
        this.channel = channel;
        final WorkerThread[] threads;
        final int threadCount;
        /**
         * 令牌
         */
        final int tokens;
        final int connections;
        if (useAcceptThreadOnly) {
            threads = new WorkerThread[] { worker.getAcceptThread() };
            threadCount = 1;
            tokens = 0;
            connections = 0;
        } else {
            threads = worker.getAll();
            threadCount = threads.length;
            if (threadCount == 0) {
                throw log.noThreads();
            }
            tokens = optionMap.get(Options.BALANCING_TOKENS, -1);
            connections = optionMap.get(Options.BALANCING_CONNECTIONS, 16);
            if (tokens != -1) {
                if (tokens < 1 || tokens >= threadCount) {
                    throw log.balancingTokens();
                }
                if (connections < 1) {
                    throw log.balancingConnectionCount();
                }
                tokenConnectionCount = connections;
            }
        }
        socket = channel.socket();
        if (optionMap.contains(Options.SEND_BUFFER)) {
            final int sendBufferSize = optionMap.get(Options.SEND_BUFFER, DEFAULT_BUFFER_SIZE);
            if (sendBufferSize < 1) {
                throw log.parameterOutOfRange("sendBufferSize");
            }
            sendBufferUpdater.set(this, sendBufferSize);
        }
        if (optionMap.contains(Options.KEEP_ALIVE)) {
            keepAliveUpdater.lazySet(this, optionMap.get(Options.KEEP_ALIVE, false) ? 1 : 0);
        }
        if (optionMap.contains(Options.TCP_OOB_INLINE)) {
            oobInlineUpdater.lazySet(this, optionMap.get(Options.TCP_OOB_INLINE, false) ? 1 : 0);
        }
        if (optionMap.contains(Options.TCP_NODELAY)) {
            tcpNoDelayUpdater.lazySet(this, optionMap.get(Options.TCP_NODELAY, false) ? 1 : 0);
        }
        if (optionMap.contains(Options.READ_TIMEOUT)) {
            readTimeoutUpdater.lazySet(this, optionMap.get(Options.READ_TIMEOUT, 0));
        }
        if (optionMap.contains(Options.WRITE_TIMEOUT)) {
            writeTimeoutUpdater.lazySet(this, optionMap.get(Options.WRITE_TIMEOUT, 0));
        }
        int perThreadLow, perThreadLowRem;
        int perThreadHigh, perThreadHighRem;
        if (optionMap.contains(Options.CONNECTION_HIGH_WATER) || optionMap.contains(Options.CONNECTION_LOW_WATER)) {
            final int highWater = optionMap.get(Options.CONNECTION_HIGH_WATER, Integer.MAX_VALUE);
            final int lowWater = optionMap.get(Options.CONNECTION_LOW_WATER, highWater);
            if (highWater <= 0) {
                throw badHighWater();
            }
            if (lowWater <= 0 || lowWater > highWater) {
                throw badLowWater(highWater);
            }
            final long highLowWater = (long) highWater << CONN_HIGH_BIT | (long) lowWater << CONN_LOW_BIT;
            connectionStatusUpdater.lazySet(this, highLowWater);
            perThreadLow = lowWater / threadCount;
            perThreadLowRem = lowWater % threadCount;
            perThreadHigh = highWater / threadCount;
            perThreadHighRem = highWater % threadCount;
        } else {
            perThreadLow = Integer.MAX_VALUE;
            perThreadLowRem = 0;
            perThreadHigh = Integer.MAX_VALUE;
            perThreadHighRem = 0;
            connectionStatusUpdater.lazySet(this, CONN_LOW_MASK | CONN_HIGH_MASK);
        }
        // 设置处理器,根据线程数
        final NioTcpServerHandle[] handles = new NioTcpServerHandle[threadCount];
        for (int i = 0, length = threadCount; i < length; i++) {
            final SelectionKey key = threads[i].registerChannel(channel);
            handles[i] = new NioTcpServerHandle(this, key, threads[i], i < perThreadHighRem ? perThreadHigh + 1 : perThreadHigh, i < perThreadLowRem ? perThreadLow + 1 : perThreadLow);
            key.attach(handles[i]);
        }
        this.handles = handles;
        if (tokens > 0) {
            for (int i = 0; i < threadCount; i ++) {
                // 初始化令牌数
                handles[i].initializeTokenCount(i < tokens ? connections : 0);
            }
        }
        mbeanHandle = worker.registerServerMXBean(
                new XnioServerMXBean() {
                    public String getProviderName() {
                        return "nio";
                    }

                    public String getWorkerName() {
                        return worker.getName();
                    }

                    public String getBindAddress() {
                        return String.valueOf(getLocalAddress());
                    }

                    public int getConnectionCount() {
                        final AtomicInteger counter = new AtomicInteger();
                        final CountDownLatch latch = new CountDownLatch(handles.length);
                        for (final NioTcpServerHandle handle : handles) {
                            handle.getWorkerThread().execute(() -> {
                                counter.getAndAdd(handle.getConnectionCount());
                                latch.countDown();
                            });
                        }
                        try {
                            latch.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                        return counter.get();
                    }

                    public int getConnectionLimitHighWater() {
                        return getHighWater(connectionStatus);
                    }

                    public int getConnectionLimitLowWater() {
                        return getLowWater(connectionStatus);
                    }
                }
        );

    }

调用AcceptingChannel<StreamConnection>#resumeAccepts 方法,最终调用的是NioTcpServer#resumeAccepts.

此方法最终调用会注册 SelectionKey#OP_ACCEP 兴趣事件到选择器上.

    public void resumeAccepts() {
        this.resumed = true;
        this.doResume(16);
    }

调用NioTcpServerHandle#resume,再调用NioHandle#resume,最终回到workerThread#setOps,入参16即为 SelectionKey#OP_ACCEPT=1 << 4.

    void resume() {
        WorkerThread thread = this.getWorkerThread();
        if (thread == Thread.currentThread()) {
            if (!this.stopped && !this.backOff && this.server.resumed) {
                super.resume(16);
            }
        } else {
            thread.execute(new Runnable() {
                public void run() {
                    NioTcpServerHandle.this.resume();
                }
            });
        }

    }
    void resume(int ops) {
        try {
            if (!Bits.allAreSet(this.selectionKey.interestOps(), ops)) {
                this.workerThread.setOps(this.selectionKey, ops);
            }
        } catch (CancelledKeyException var3) {
        }

    }

读写兴趣事件的注册: NioTcpServer#accept -> new NioSocketStreamConnection() -> SelectionKey#attach

    /**
     * 接受请求
     * @return
     * @throws ClosedChannelException
     */
    public NioSocketStreamConnection accept() throws ClosedChannelException {
        final WorkerThread current = WorkerThread.getCurrent();
        if (current == null) {
            return null;
        }
        /**
         * 服务器处理器
         */
        final NioTcpServerHandle handle;
        if (handles.length == 1) {
            handle = handles[0];
        } else {
            // 根据线程号从处理器集中获取对应处理器
            handle = handles[current.getNumber()];
        }
        if (! handle.getConnection()) {
            return null;
        }
        final SocketChannel accepted;
        boolean ok = false;
        try {
            // 接受请求通道
            accepted = channel.accept();
            if (accepted != null) try {
                // 获取 hashCode
                int hash = ThreadLocalRandom.current().nextInt();
                // 配置不阻塞
                accepted.configureBlocking(false);
                final Socket socket = accepted.socket();
                socket.setKeepAlive(keepAlive != 0);
                socket.setOOBInline(oobInline != 0);
                socket.setTcpNoDelay(tcpNoDelay != 0);
                final int sendBuffer = this.sendBuffer;
                if (sendBuffer > 0) socket.setSendBufferSize(sendBuffer);
                // io 线程
                final WorkerThread ioThread = worker.getIoThread(hash);
                // 选择器键:将接受线程管道注册到选择器
                final SelectionKey selectionKey = ioThread.registerChannel(accepted);
                // 创建新连接: 流连接
                final NioSocketStreamConnection newConnection = new NioSocketStreamConnection(ioThread, selectionKey, handle);
                newConnection.setOption(Options.READ_TIMEOUT, Integer.valueOf(readTimeout));
                newConnection.setOption(Options.WRITE_TIMEOUT, Integer.valueOf(writeTimeout));
                ok = true;
                // 重置回退时间
                handle.resetBackOff();
                return newConnection;
            } finally {
                if (! ok) safeClose(accepted);
            }
        } catch (ClosedChannelException e) {
            throw e;
        } catch (IOException e) {
            // something went wrong with the accept
            // it could be due to running out of file descriptors, or due to closed channel, or other things
            handle.startBackOff();
            log.acceptFailed(e, handle.getBackOffTime());
            return null;
        } finally {
            if (! ok) {
                handle.freeConnection();
            }
        }
        // by contract, only a resume will do
        return null;
    }
    /**
     * 构造器
     * @param workerThread
     * @param key
     * @param closedHandle
     */
    NioSocketStreamConnection(final WorkerThread workerThread, final SelectionKey key, final ChannelClosed closedHandle) {
        super(workerThread);
        // 创建导管,继承了 NioHandle,提供了读写方法
        conduit = new NioSocketConduit(workerThread, key, this);
        key.attach(conduit);
        this.closedHandle = closedHandle;
        // 设置导管信息目的
        setSinkConduit(conduit);
        // 设置导管的信息源头
        setSourceConduit(conduit);
    }
    /**
     * 准备就绪
     * @param ops
     */
    void handleReady(int ops) {
        try {
            if (ops == 0) {
                // the dreaded bug
                final SelectionKey key = getSelectionKey();
                // 兴趣事件
                final int interestOps = key.interestOps();
                if (interestOps != 0) {
                    ops = interestOps;
                } else {
                    // urp
                    forceTermination();
                    return;
                }
            }
            // 读兴趣事件
            if (Bits.allAreSet(ops, SelectionKey.OP_READ)) try {
                if (isReadShutdown()) suspendReads();
                // 调用读就绪处理器
                readReadyHandler.readReady();
            } catch (Throwable ignored) {
            }
            // 写兴趣事件
            if (Bits.allAreSet(ops, SelectionKey.OP_WRITE)) try {
                if (isWriteShutdown()) suspendWrites();
                // 调用写就绪处理器
                writeReadyHandler.writeReady();
            } catch (Throwable ignored) {
            }
        } catch (CancelledKeyException ignored) {}
    }

读写操作通过导管 NioSocketConduit 实现


    /**
     * 构造器
     * @param workerThread
     * @param selectionKey
     * @param connection
     */
    NioSocketConduit(final WorkerThread workerThread, final SelectionKey selectionKey, final NioSocketStreamConnection connection) {
        super(workerThread, selectionKey);
        this.connection = connection;
        this.socketChannel = (SocketChannel) selectionKey.channel();
    }
    
     public long transferTo(final long position, final long count, final FileChannel target) throws IOException {
        long res = target.transferFrom(socketChannel, position, count);
        checkReadTimeout(res > 0L);
        return res;
    }

    public long transferTo(final long count, final ByteBuffer throughBuffer, final StreamSinkChannel target) throws IOException {
        return Conduits.transfer(this, count, throughBuffer, target);
    }

    public int read(final ByteBuffer dst) throws IOException {
        int res;
        try {
            res = socketChannel.read(dst);
        } catch (ClosedChannelException e) {
            return -1;
        }
        if (res != -1) checkReadTimeout(res > 0);
        else terminateReads();
        return res;
    }

    public long read(final ByteBuffer[] dsts, final int offset, final int length) throws IOException {
        if (length == 1) {
            return read(dsts[offset]);
        }
        long res;
        try {
            res = socketChannel.read(dsts, offset, length);
        } catch (ClosedChannelException e) {
            return -1L;
        }
        if (res != -1L) checkReadTimeout(res > 0L);
        else terminateReads();
        return res;
    }
    
     /**
     * 传输
     * @param src the file to read from
     * @param position the position within the file from which the transfer is to begin
     * @param count the number of bytes to be transferred
     * @return
     * @throws IOException
     */
    public final long transferFrom(final FileChannel src, final long position, final long count) throws IOException {
        long res = src.transferTo(position, count, socketChannel);
        checkWriteTimeout(res > 0L);
        return res;
    }

    /**
     * 传输
     * @param source the source to read from
     * @param count the number of bytes to be transferred
     * @param throughBuffer the buffer to copy through.
     * @return
     * @throws IOException
     */
    public long transferFrom(final StreamSourceChannel source, final long count, final ByteBuffer throughBuffer) throws IOException {
        return Conduits.transfer(source, count, throughBuffer, this);
    }

    public int write(final ByteBuffer src) throws IOException {
        int res = socketChannel.write(src);
        checkWriteTimeout(res > 0);
        return res;
    }

    public long write(final ByteBuffer[] srcs, final int offset, final int length) throws IOException {
        if (length == 1) {
            return write(srcs[offset]);
        }
        long res = socketChannel.write(srcs, offset, length);
        checkWriteTimeout(res > 0L);
        return res;
    }