netty源码解析之NioEventLoop

NioEventLoop

Posted by Brade on July 5, 2021

NioEventLoop 源码解析

本文中 netty 采用的版本为 4.1.66.Final-SNAPSHOTNioEventLoop类图如下所示:

img.png 最顶层跟NioEventLoopGroup一样,都是Executor ==> ExecutorService

EventExecutor

接口,继承了EventExecutorGroup,是一个特殊的EventExecutorGroup,新定义了一些查看Thread是否在事件循环中执行。

public interface EventExecutor extends EventExecutorGroup {

    /**
     * Returns a reference to itself.
     */
    @Override
    EventExecutor next();

    /**
     * 返回当前 EventExecutor 的父级EventExecutorGroup
     * Return the {@link EventExecutorGroup} which is the parent of this {@link EventExecutor},
     */
    EventExecutorGroup parent();

    /**
     * 当前线程是否在 EventLoop
     * Calls {@link #inEventLoop(Thread)} with {@link Thread#currentThread()} as argument
     */
    boolean inEventLoop();

    /**
     * 给出的指定线程是否在 EventLoop
     * Return {@code true} if the given {@link Thread} is executed in the event loop,
     * {@code false} otherwise.
     */
    boolean inEventLoop(Thread thread);

    /**
     * Return a new {@link Promise}.
     */
    <V> Promise<V> newPromise();

    /**
     * Create a new {@link ProgressivePromise}.
     */
    <V> ProgressivePromise<V> newProgressivePromise();

    /**
     * Create a new {@link Future} which is marked as succeeded already. So {@link Future#isSuccess()}
     * will return {@code true}. All {@link FutureListener} added to it will be notified directly. Also
     * every call of blocking methods will just return without blocking.
     */
    <V> Future<V> newSucceededFuture(V result);

    /**
     * Create a new {@link Future} which is marked as failed already. So {@link Future#isSuccess()}
     * will return {@code false}. All {@link FutureListener} added to it will be notified directly. Also
     * every call of blocking methods will just return without blocking.
     */
    <V> Future<V> newFailedFuture(Throwable cause);
}

OrderedEventExecutor

接口,只继承了 EventExecutor,未做任何处理,空白接口。

AbstractExecutorService

JDK中定义普通类,是ExecutorService的默认实现类。此类说明:该类使用newTaskFor 返回的RunnableFuture 实现submitinvokeAnyinvokeAll 方法,默认为该包中提供的FutureTask 类。例如,submit(Runnable) 的实现创建了一个关联的 RunnableFuture,它被执行并返回。 子类可以覆盖 newTaskFor 方法以返回除 FutureTask 之外的 RunnableFuture 实现。

 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

AbstractEventExecutor

抽象类,继承AbstractExecutorService 然后实现了EventExecutor中定义的和继承自EventExecutorGroup的接口。

AbstractScheduledEventExecutor

抽象类,继承AbstractEventExecutor,扩展了支持调度的AbstractEventExecutor

初始化了一个长度为11的调度队列DefaultPriorityQueue,队列是一个ScheduledFutureTask数组。

    PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
        if (scheduledTaskQueue == null) {
            scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
                    SCHEDULED_FUTURE_TASK_COMPARATOR,
                    // Use same initial capacity as java.util.PriorityQueue
                    11);
        }
        return scheduledTaskQueue;
    }

队列长度增长算法如下:当队列长度小于64时,增长为原来的2倍 + 2,大于等于是增长为原来的1.5倍

    @Override
    public boolean offer(T e) {
        if (e.priorityQueueIndex(this) != INDEX_NOT_IN_QUEUE) {
            throw new IllegalArgumentException("e.priorityQueueIndex(): " + e.priorityQueueIndex(this) +
                    " (expected: " + INDEX_NOT_IN_QUEUE + ") + e: " + e);
        }

        // Check that the array capacity is enough to hold values by doubling capacity.
        if (size >= queue.length) {
            // Use a policy which allows for a 0 initial capacity. Same policy as JDK's priority queue, double when
            // "small", then grow by 50% when "large".
            queue = Arrays.copyOf(queue, queue.length + ((queue.length < 64) ?
                                                         (queue.length + 2) :
                                                         (queue.length >>> 1)));
        }

        bubbleUp(size++, e);
        return true;
    }

具体调度实现在这个方法:

    private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
        // 当前线程如果在EventLoop中,直接放到任务队列
        if (inEventLoop()) {
            scheduleFromEventLoop(task);
        } else {
            final long deadlineNanos = task.deadlineNanos();
            // 当前线程的不在EventLoop,调度任务也未过期,直接执行
            // beforeScheduledTaskSubmitted 和 afterScheduledTaskSubmitted 在NioEventLoop/EpollEventLoop中重写 
            if (beforeScheduledTaskSubmitted(deadlineNanos)) {
                execute(task);
            } else {
                // 懒加载执行,具体功能在子类SingleThreadEventExecutor 中的 private void execute(Runnable task, boolean immediate) 方法实现
                lazyExecute(task);
                // 调度任务已经提交
                if (afterScheduledTaskSubmitted(deadlineNanos)) {
                // 执行唤醒任务
                    execute(WAKEUP_TASK);
                }
            }
        }

        return task;
    }

SingleThreadEventExecutor

抽象类,继承了AbstractScheduledEventExecutor,实现了 OrderedEventExecutor单个线程中执行所有提交的任务。实际上是大部分实现都在这个类,跟MultithreadEventExecutorGroup对应,子类也是调用的它里面方法。

构造器主要是这个:

 /**
     * Create a new instance
     *
     * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
     * @param executor          the {@link Executor} which will be used for executing
     * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
     *                          executor thread
     * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
     * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
     */
    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
        // 当前Executor的直属集合
        super(parent);
        // 是否增加到唤醒任务,增加后会唤醒执行线程
        this.addTaskWakesUp = addTaskWakesUp;
        // 允许的最大挂起任务数,默认16
        this.maxPendingTasks = Math.max(16, maxPendingTasks);
        // 当前使用的执行器
        this.executor = ThreadExecutorMap.apply(executor, this);
        // 任务队列
        taskQueue = newTaskQueue(this.maxPendingTasks);
        // 拒绝策略,当超出最大挂起任务后执行
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }

具体实现了父类的 AbstractScheduledEventExecutor lazyExecute() 方法:

    @Override
    public void lazyExecute(Runnable task) {
        execute(ObjectUtil.checkNotNull(task, "task"), false);
    }

    private void execute(Runnable task, boolean immediate) {
        // 判断当前线程是否在EventLoop,其实就是判断正在执行的线程与EventLoop中的线程是不是同一个
        boolean inEventLoop = inEventLoop();
        // 将线程增加到执行队列
        addTask(task);
        // 不在eventLoop
        if (!inEventLoop) {
            // 开始启动线程
            startThread();
            // 线程已经停止
            if (isShutdown()) {
                boolean reject = false;
                try {
                    // 先从队列移除线程
                    if (removeTask(task)) {
                        // 拒绝状态设置为true
                        reject = true;
                    }
                } catch (UnsupportedOperationException e) {
                    // The task queue does not support removal so the best thing we can do is to just move on and
                    // hope we will be able to pick-up the task before its completely terminated.
                    // In worst case we will log on termination.
                }
                // 拒绝状态为true
                if (reject) {
                    // 执行拒绝方法:抛个异常 RejectedExecutionException
                    reject();
                }
            }
        }

        // 如果线程不需要唤醒且立刻执行:
        if (!addTaskWakesUp && immediate) {
            // 执行唤醒方法,参数为是否在inEventLoop
            wakeup(inEventLoop);
        }
    }
    /**
     * 开始启动线程
     */
    private void startThread() {
        // 线程状态必须是 未启动:1
        if (state == ST_NOT_STARTED) {
            // SingleThreadEventExecutor 实例的状态字段state 从 ST_NOT_STARTED=1 设置为 ST_STARTED=2,即未启动==>启动
            // 调用的是compareAndSet方法,先比较再设值 ,这是一个原子方法,同时多线程状态效率高,采用分段set的方法
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                boolean success = false;
                try {
                    // 开始调用启动业务方法
                    doStartThread();
                    // 启动状态设置
                    success = true;
                } finally {
                    // 如果启动失败
                    if (!success) {
                        STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                    }
                }
            }
        }
    }
    /**
     * 线程启动的具体业务方法
     */
    private void doStartThread() {
        // 线程对象必须为空
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                // 线程对象设值为当前线程
                thread = Thread.currentThread();
                // 如果打断状态为true,执行线程打断的方法
                if (interrupted) {
                    thread.interrupt();
                }
                // 设置一个执行是否成功状态字段
                boolean success = false;
                // 更新最近一次执行提交的任务的时间
                updateLastExecutionTime();
                try {
                    // run 方法在子类 NioEventLoop 中重写,实际上调用的是NioEventLoop.run()
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    // 死循环,检查线程状态,直到线程执行完毕
                    for (;;) {
                        int oldState = state;
                        // 如果当前线程状态为已经停止或者设置当前实例状态为停止状态设置成功,就中断死循环
                        // 当前线程状态设置为 ST_SHUTTING_DOWN = 3
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }

                    // Check if confirmShutdown() was called at the end of the loop.
                    // 检查是否在循环结束时调用了 confirmShutdown() 确认线程停止
                    if (success && gracefulShutdownStartTime == 0) {
                        if (logger.isErrorEnabled()) {
                            logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                    SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                    "be called before run() implementation terminates.");
                        }
                    }

                    try {
                        // Run all remaining tasks and shutdown hooks. At this point the event loop
                        // is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
                        // graceful shutdown with quietPeriod.
                        // 运行任务队列中所有剩余的任务和关闭钩子。 此时,事件循环处于 ST_SHUTTING_DOWN 状态,仍在接受使用 quietPeriod 正常关闭所需的任务。
                        // 死循环,检查线程确实关闭
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }

                        // Now we want to make sure no more tasks can be added from this point. This is
                        // achieved by switching the state. Any new tasks beyond this point will be rejected.
                        // 现在我们要确保从这一点开始不能再添加任务。 这是通过切换状态来实现的。 任何超出此点的新任务都将被拒绝。
                        // 当前线程状态设置为 ST_SHUTTING_DOWN = 4
                        for (;;) {
                            int oldState = state;
                            if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
                                    SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
                                break;
                            }
                        }

                        // We have the final set of tasks in the queue now, no more can be added, run all remaining.
                        // No need to loop here, this is the final pass.
                        // 队列中的任务已经完成了,不能再添加了,运行所有剩余的任务。这里不需要循环,这是最后的关卡。
                        confirmShutdown();
                    } finally {
                        try {
                            // 当前类不实现,在子类NioEventLoop中重写,其实就是关闭选择器 selector.close();
                            cleanup();
                        } finally {
                            // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
                            // the future. The user may block on the future and once it unblocks the JVM may terminate
                            // and start unloading classes.
                            // See https://github.com/netty/netty/issues/6596.
                            // 移除线程的所有 FastThreadLocals。 用户可能会阻塞在future 中,一旦它解除阻塞,JVM 可能会终止并开始卸载类
                            FastThreadLocal.removeAll();

                            // 当前线程状态设置为 ST_TERMINATED=5
                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            // 倒计时计数器开始工作
                            threadLock.countDown();
                            int numUserTasks = drainTasks();
                            if (numUserTasks > 0 && logger.isWarnEnabled()) {
                                logger.warn("An event executor terminated with " +
                                        "non-empty task queue (" + numUserTasks + ')');
                            }
                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });
    }

    /**
     * 检查队列中的任务数,唤醒任务除外
     * @return
     */
    final int drainTasks() {
        int numTasks = 0;
        for (;;) {
            Runnable runnable = taskQueue.poll();
            if (runnable == null) {
                break;
            }
            // WAKEUP_TASK should be just discarded as these are added internally.
            // The important bit is that we not have any user tasks left.
            if (WAKEUP_TASK != runnable) {
                numTasks++;
            }
        }
        return numTasks;
    }

SingleThreadEventLoop

抽象类,继承了 SingleThreadEventExecutor ,再实现了EventLoop里面的接口。

比如注册 channelselector 上的接口


    @Override
    public EventLoop next() {
        //调用的父类 AbstractEventExecutor中的 next()
        return (EventLoop) super.next();
    }
    
    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        // 调用Channel==>默认实现类,抽象类AbstractChannel中的 AbstractUnsafe.register()
        // ==> AbstractUnsafe.register0() ==> AbstractNioChannel.doRegister()
        // 从而把channel注册在了selector上
        // 其中的selector是调用的 NioEventLoop.unwrappedSelector()
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
    

NioEventLoop

普通类,继承了SingleThreadEventLoop,实现多路复用selector的注册方法。

构造器,NioEventLoopGroupnewChild() 进行了调用:


    /**
     * 构造器,NioEventLoopGroup 的 newChild() 进行了调用
     * @param parent 
     * @param executor
     * @param selectorProvider
     * @param strategy
     * @param rejectedExecutionHandler
     * @param taskQueueFactory
     * @param tailTaskQueueFactory
     */
    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
        // 调用父类的构造器,单线程事件循环
        super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
                rejectedExecutionHandler);
        this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
        this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
        final SelectorTuple selectorTuple = openSelector();
        this.selector = selectorTuple.selector;
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }
    

run()方法,在 SingleThreadEventExecutor 中的 doStartThread() 中进行调用


    @Override
    protected void run() {
        int selectCnt = 0;
        // 死循环
        for (;;) {
            try {
                int strategy;
                try {
                    // 计算selector策略:如果任务队列不为空,返回 selectNow(),否则返回 SelectStrategy.SELECT
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO

                    case SelectStrategy.SELECT:
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            if (!hasTasks()) {
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // This update is just to help block unnecessary selector wakeups
                            // so use of lazySet is ok (no race condition)
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                    // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                    // the selector and retry. https://github.com/netty/netty/issues/8566
                    rebuildSelector0();
                    selectCnt = 0;
                    handleLoopException(e);
                    continue;
                }

                selectCnt++;
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                boolean ranTasks;
                // 如果用于处理I/O操作的比例为100%
                if (ioRatio == 100) {
                    try {
                        if (strategy > 0) {
                            // 调用处理 processSelectedKeys策略方法
                            processSelectedKeys();
                        }
                    } finally {
                        // Ensure we always run tasks.
                        // 执行所有任务
                        ranTasks = runAllTasks();
                    }
                } else if (strategy > 0) {
                    final long ioStartTime = System.nanoTime();
                    try {
                        // 调用处理 processSelectedKeys策略方法
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        // 轮询任务队列中的所有任务,如果运行时间超过设置的超时时间则停止队列任务并返回
                        // 默认50,超时时间刚好 ioTime * 1,50 < ioRatio < 100 则越靠近100,处理任务时间越长
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                } else {
                    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
                }

                // 任务执行完毕或者 strategy>0
                if (ranTasks || strategy > 0) {
                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                    selectCnt = 0;
                } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case) 意外唤醒(异常情况)
                    selectCnt = 0;
                }
            } catch (CancelledKeyException e) {
                // Harmless exception - log anyway
                if (logger.isDebugEnabled()) {
                    logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                            selector, e);
                }
            } catch (Error e) {
                throw (Error) e;
            } catch (Throwable t) {
                handleLoopException(t);
            } finally {
                // Always handle shutdown even if the loop processing threw an exception.
                try {
                    if (isShuttingDown()) {
                        // 关闭全部channel
                        closeAll();
                        // 确认全部线程关闭
                        if (confirmShutdown()) {
                            return;
                        }
                    }
                } catch (Error e) {
                    throw (Error) e;
                } catch (Throwable t) {
                    handleLoopException(t);
                }
            }
        }
    }

打开创建 selector


    /**
     * 打开选择器,返回选择器元组
     * @return SelectorTuple
     */
    private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
            // 默认调用WindowsSelectorProvider.openSelector()
            // ==> new WindowsSelectorImpl(this)
            // 因为jdk是下载的win环境下的
            unwrappedSelector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }

        if (DISABLE_KEY_SET_OPTIMIZATION) {
            return new SelectorTuple(unwrappedSelector);
        }

        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (Throwable cause) {
                    return cause;
                }
            }
        });

        if (!(maybeSelectorImplClass instanceof Class) ||
            // ensure the current selector implementation is what we can instrument.
            !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
            if (maybeSelectorImplClass instanceof Throwable) {
                Throwable t = (Throwable) maybeSelectorImplClass;
                logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
            }
            return new SelectorTuple(unwrappedSelector);
        }

        final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                    if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
                        // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
                        // This allows us to also do this in Java9+ without any extra flags.
                        long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                        long publicSelectedKeysFieldOffset =
                                PlatformDependent.objectFieldOffset(publicSelectedKeysField);

                        if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
                            PlatformDependent.putObject(
                                    unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                            PlatformDependent.putObject(
                                    unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                            return null;
                        }
                        // We could not retrieve the offset, lets try reflection as last-resort.
                    }

                    Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                    if (cause != null) {
                        return cause;
                    }
                    cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                    if (cause != null) {
                        return cause;
                    }

                    selectedKeysField.set(unwrappedSelector, selectedKeySet);
                    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                    return null;
                } catch (NoSuchFieldException e) {
                    return e;
                } catch (IllegalAccessException e) {
                    return e;
                }
            }
        });

        if (maybeException instanceof Exception) {
            selectedKeys = null;
            Exception e = (Exception) maybeException;
            logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
            return new SelectorTuple(unwrappedSelector);
        }
        selectedKeys = selectedKeySet;
        logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
        return new SelectorTuple(unwrappedSelector,
                                 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
    }

run方法中调用了处理SelectedKeys的策略。


    /**
     * 处理 SelectedKeys 策略
     */
    private void processSelectedKeys() {
        if (selectedKeys != null) {
            //优化过的selectedKeys采用数组,增加元素和扩容进行了优化,采用了数组,add操作永远都是O(1)的时间复杂度
            processSelectedKeysOptimized();
        } else {
            // 普通的selectedKeys是个set
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }