Undertow源码解析之启动流程

Posted by Brade on February 9, 2022

undertow 启动流程

本文中 Undertow 使用的版本为 2.2.14.Final. 具体可以查看 Github 或者 官网 .

1 关于 Undertow

  • Undertow 是一个用 java 编写的灵活的高性能 Web 服务器, 提供基于 NIO(XNIO) 的阻塞和非阻塞 API.
  • Undertow 具有基于组合的架构, 允许通过组合小型单一用途处理程序来构建 Web 服务器.
  • Undertow 被设计为完全可嵌入的, 具有易于使用的 fluent 构建器 API, Undertow 的生命周期完全由嵌入应用程序控制.
  • UndertowJBoss 赞助, 是 Wildfly 应用程序服务器中的默认 Web 服务器.
  • Spring Cloud 也同时支持 Undertow 作为内嵌 Servlet 容器, 内存占用相对 Tomcat 较小.

2 创建 Undertow 服务器

Undertow 此服务器非 Servlet 服务器.

    public static void main(String[] args) {
        new HelloWorldServer().create();
    }

    public void create() {
        Undertow server = Undertow.builder()
                .addHttpListener(port, "localhost")
                .setHandler(exchange -> {
                    exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "text/plain");
                    exchange.getResponseSender().send("Hello World,this is undertow example!");
                }).build();
        server.start();
    }

3 启动流程解析

3.1 Undertow 实例化

通过 Undertow 内部静态类 Builder实现.

public static final class Builder {
        /**
         * 缓存大小
         */
        private int bufferSize;
        /**
         * accept线程
         */
        private int ioThreads;
        /**
         * 工作线程
         */
        private int workerThreads;
        /**
         * 堆外内存
         */
        private boolean directBuffers;
        /**
         * 监听器集合
         */
        private final List<ListenerConfig> listeners = new ArrayList<>();
        /**
         * http处理器
         */
        private HttpHandler handler;
        /**
         * xnio 工作者
         */
        private XnioWorker worker;
        /**
         * ssl 执行器
         */
        private Executor sslEngineDelegatedTaskExecutor;
        /**
         * byteBuffer 池
         */
        private ByteBufferPool byteBufferPool;
        /**
         * 工作者参数
         */
        private final OptionMap.Builder workerOptions = OptionMap.builder();
        /**
         * socket 参数
         */
        private final OptionMap.Builder socketOptions = OptionMap.builder();
        /**
         * server 参数
         */
        private final OptionMap.Builder serverOptions = OptionMap.builder();

        private Builder() {
            // accept 线程,为 cup 核心数,最少为 2
            ioThreads = Math.max(Runtime.getRuntime().availableProcessors(), 2);
            // 工作线程为 accept 线程的 8 倍
            workerThreads = ioThreads * 8;
            long maxMemory = Runtime.getRuntime().maxMemory();
            //smaller than 64mb of ram we use 512b buffers
            // 根据系统内存采用不同缓存和堆外内存策略
            if (maxMemory < 64 * 1024 * 1024) {
                //use 512b buffers
                directBuffers = false;
                bufferSize = 512;
            } else if (maxMemory < 128 * 1024 * 1024) {
                //use 1k buffers
                directBuffers = true;
                bufferSize = 1024;
            } else {
                //use 16k buffers for best performance
                //as 16k is generally the max amount of data that can be sent in a single write() call
                directBuffers = true;
                bufferSize = 1024 * 16 - 20; //the 20 is to allow some space for protocol headers, see UNDERTOW-1209
            }

        }

        /**
         * 创建 Undertow 实例
         * @return
         */
        public Undertow build() {
            return new Undertow(this);
        }

        public Builder addListener(ListenerBuilder listenerBuilder) {
            listeners.add(new ListenerConfig(listenerBuilder));
            return this;
        }

        /**
         * 增加 http 监听器: 监听配置默认指定对应的监听类型
         * @param port
         * @param host
         * @return
         */
        public Builder addHttpListener(int port, String host) {
            listeners.add(new ListenerConfig(ListenerType.HTTP, port, host, null, null, null));
            return this;
        }

        /**
         * 增加 https 监听器
         * @param port
         * @param host
         * @param keyManagers
         * @param trustManagers
         * @return
         */
        public Builder addHttpsListener(int port, String host, KeyManager[] keyManagers, TrustManager[] trustManagers) {
            listeners.add(new ListenerConfig(ListenerType.HTTPS, port, host, keyManagers, trustManagers, null));
            return this;
        }

        public Builder addHttpsListener(int port, String host, SSLContext sslContext) {
            listeners.add(new ListenerConfig(ListenerType.HTTPS, port, host, sslContext, null));
            return this;
        }

        /**
         * 增加 Ajp 监听器
         * @param port
         * @param host
         * @return
         */
        public Builder addAjpListener(int port, String host) {
            listeners.add(new ListenerConfig(ListenerType.AJP, port, host, null, null, null));
            return this;
        }

        public Builder addHttpListener(int port, String host, HttpHandler rootHandler) {
            listeners.add(new ListenerConfig(ListenerType.HTTP, port, host, null, null, rootHandler));
            return this;
        }

        public Builder addHttpsListener(int port, String host, KeyManager[] keyManagers, TrustManager[] trustManagers, HttpHandler rootHandler) {
            listeners.add(new ListenerConfig(ListenerType.HTTPS, port, host, keyManagers, trustManagers, rootHandler));
            return this;
        }

        public Builder addHttpsListener(int port, String host, SSLContext sslContext, HttpHandler rootHandler) {
            listeners.add(new ListenerConfig(ListenerType.HTTPS, port, host, sslContext, rootHandler));
            return this;
        }

        public Builder addAjpListener(int port, String host, HttpHandler rootHandler) {
            listeners.add(new ListenerConfig(ListenerType.AJP, port, host, null, null, rootHandler));
            return this;
        }

        public Builder setBufferSize(final int bufferSize) {
            this.bufferSize = bufferSize;
            return this;
        }

        public Builder setIoThreads(final int ioThreads) {
            this.ioThreads = ioThreads;
            return this;
        }

        public Builder setWorkerThreads(final int workerThreads) {
            this.workerThreads = workerThreads;
            return this;
        }

        public Builder setDirectBuffers(final boolean directBuffers) {
            this.directBuffers = directBuffers;
            return this;
        }

    /**
     * 增加处理器
     * @param handler
     * @return
     */
    public Builder setHandler(final HttpHandler handler) {
            this.handler = handler;
            return this;
        }

        public <T> Builder setServerOption(final Option<T> option, final T value) {
            serverOptions.set(option, value);
            return this;
        }

        public <T> Builder setSocketOption(final Option<T> option, final T value) {
            socketOptions.set(option, value);
            return this;
        }

        public <T> Builder setWorkerOption(final Option<T> option, final T value) {
            workerOptions.set(option, value);
            return this;
        }

        /**
         * When null (the default), a new {@link XnioWorker} will be created according
         * to the various worker-related configuration (ioThreads, workerThreads, workerOptions)
         * when {@link Undertow#start()} is called.
         * Additionally, this newly created worker will be shutdown when {@link Undertow#stop()} is called.
         * <br>
         * <p>
         * When non-null, the provided {@link XnioWorker} will be reused instead of creating a new {@link XnioWorker}
         * when {@link Undertow#start()} is called.
         * Additionally, the provided {@link XnioWorker} will NOT be shutdown when {@link Undertow#stop()} is called.
         * Essentially, the lifecycle of the provided worker must be maintained outside of the {@link Undertow} instance.
         */
        public Builder setWorker(XnioWorker worker) {
            this.worker = worker;
            return this;
        }

        public Builder setSslEngineDelegatedTaskExecutor(Executor sslEngineDelegatedTaskExecutor) {
            this.sslEngineDelegatedTaskExecutor = sslEngineDelegatedTaskExecutor;
            return this;
        }

        public Builder setByteBufferPool(ByteBufferPool byteBufferPool) {
            this.byteBufferPool = byteBufferPool;
            return this;
        }
    }

3.2 Undertow 启动

    public synchronized void start() {
        UndertowLogger.ROOT_LOGGER.infof("starting server: %s", Version.getFullVersionString());
        // 创建 xnio 实例
        xnio = Xnio.getInstance(Undertow.class.getClassLoader());
        channels = new ArrayList<>();
        try {
            if (internalWorker) {
                // 不提供指定的 xnioWorker ,自己创建一个
                worker = xnio.createWorker(OptionMap.builder()
                        .set(Options.WORKER_IO_THREADS, ioThreads)
                        .set(Options.CONNECTION_HIGH_WATER, 1000000)
                        .set(Options.CONNECTION_LOW_WATER, 1000000)
                        .set(Options.WORKER_TASK_CORE_THREADS, workerThreads)
                        .set(Options.WORKER_TASK_MAX_THREADS, workerThreads)
                        .set(Options.TCP_NODELAY, true)
                        .set(Options.CORK, true)
                        .addAll(workerOptions)
                        .getMap());
            }

            OptionMap socketOptions = OptionMap.builder()
                    .set(Options.WORKER_IO_THREADS, worker.getIoThreadCount())
                    .set(Options.TCP_NODELAY, true)
                    .set(Options.REUSE_ADDRESSES, true)
                    .set(Options.BALANCING_TOKENS, 1)
                    .set(Options.BALANCING_CONNECTIONS, 2)
                    .set(Options.BACKLOG, 1000)
                    .addAll(this.socketOptions)
                    .getMap();

            OptionMap serverOptions = OptionMap.builder()
                    .set(UndertowOptions.NO_REQUEST_TIMEOUT, 60 * 1000)
                    .addAll(this.serverOptions)
                    .getMap();


            ByteBufferPool buffers = this.byteBufferPool;
            if (buffers == null) {
                // 如果为空则创建默认的 byteBuffer池
                buffers = new DefaultByteBufferPool(directBuffers, bufferSize, -1, 4);
            }

            listenerInfo = new ArrayList<>();
            for (ListenerConfig listener : listeners) {
                UndertowLogger.ROOT_LOGGER.debugf("Configuring listener with protocol %s for interface %s and port %s", listener.type, listener.host, listener.port);
                // 根处理器默认从监听器配置中取,为空则从当前对象获取
                final HttpHandler rootHandler = listener.rootHandler != null ? listener.rootHandler : this.rootHandler;
                OptionMap socketOptionsWithOverrides = OptionMap.builder().addAll(socketOptions).addAll(listener.overrideSocketOptions).getMap();
                // AJP 协议
                if (listener.type == ListenerType.AJP) {
                    AjpOpenListener openListener = new AjpOpenListener(buffers, serverOptions);
                    openListener.setRootHandler(rootHandler);

                    final ChannelListener<StreamConnection> finalListener;
                    // 如果使用代理协议
                    if (listener.useProxyProtocol) {
                        // 采用代理连接监听器
                        finalListener = new ProxyProtocolOpenListener(openListener, null, buffers, OptionMap.EMPTY);
                    } else {
                        finalListener = openListener;
                    }
                    // 实例化 accept监听器
                    ChannelListener<AcceptingChannel<StreamConnection>> acceptListener = ChannelListeners.openListenerAdapter(finalListener);
                    // 实例化流连接通道,xnio
                    AcceptingChannel<? extends StreamConnection> server = worker.createStreamConnectionServer(new InetSocketAddress(Inet4Address.getByName(listener.host), listener.port), acceptListener, socketOptionsWithOverrides);
                    // 恢复连接,默认实现是 NioTcpServer
                    server.resumeAccepts();
                    channels.add(server);
                    // 增加 ajp 监听器信息
                    listenerInfo.add(new ListenerInfo("ajp", server.getLocalAddress(), openListener, null, server));
                } else {
                    OptionMap undertowOptions = OptionMap.builder().set(UndertowOptions.BUFFER_PIPELINED_DATA, true).addAll(serverOptions).getMap();
                    // 是否采用 http2
                    boolean http2 = serverOptions.get(UndertowOptions.ENABLE_HTTP2, false);
                    // http 协议
                    if (listener.type == ListenerType.HTTP) {
                        HttpOpenListener openListener = new HttpOpenListener(buffers, undertowOptions);
                        HttpHandler handler = rootHandler;
                        if (http2) {
                            handler = new Http2UpgradeHandler(handler);
                        }
                        openListener.setRootHandler(handler);
                        final ChannelListener<StreamConnection> finalListener;
                        if (listener.useProxyProtocol) {
                            finalListener = new ProxyProtocolOpenListener(openListener, null, buffers, OptionMap.EMPTY);
                        } else {
                            finalListener = openListener;
                        }
                        // 实例化 accept监听器
                        ChannelListener<AcceptingChannel<StreamConnection>> acceptListener = ChannelListeners.openListenerAdapter(finalListener);
                        // 实例化流连接通道,xnio
                        AcceptingChannel<? extends StreamConnection> server = worker.createStreamConnectionServer(new InetSocketAddress(Inet4Address.getByName(listener.host), listener.port), acceptListener, socketOptionsWithOverrides);
                        // 恢复连接,默认实现是 NioTcpServer
                        server.resumeAccepts();
                        channels.add(server);
                        // 增加 http 监听器信息
                        listenerInfo.add(new ListenerInfo("http", server.getLocalAddress(), openListener, null, server));
                        // https 协议
                    } else if (listener.type == ListenerType.HTTPS) {
                        OpenListener openListener;

                        HttpOpenListener httpOpenListener = new HttpOpenListener(buffers, undertowOptions);
                        httpOpenListener.setRootHandler(rootHandler);

                        if (http2) {
                            AlpnOpenListener alpn = new AlpnOpenListener(buffers, undertowOptions, httpOpenListener);
                            Http2OpenListener http2Listener = new Http2OpenListener(buffers, undertowOptions);
                            http2Listener.setRootHandler(rootHandler);
                            alpn.addProtocol(Http2OpenListener.HTTP2, http2Listener, 10);
                            alpn.addProtocol(Http2OpenListener.HTTP2_14, http2Listener, 7);
                            openListener = alpn;
                        } else {
                            openListener = httpOpenListener;
                        }

                        UndertowXnioSsl xnioSsl;
                        // 如果 ssl 上下文不为空
                        if (listener.sslContext != null) {
                            xnioSsl = new UndertowXnioSsl(xnio, OptionMap.create(Options.USE_DIRECT_BUFFERS, true), listener.sslContext, sslEngineDelegatedTaskExecutor);
                        } else {
                            OptionMap.Builder builder = OptionMap.builder()
                                    .addAll(socketOptionsWithOverrides);
                            if (!socketOptionsWithOverrides.contains(Options.SSL_PROTOCOL)) {
                                builder.set(Options.SSL_PROTOCOL, "TLSv1.2");
                            }
                            xnioSsl = new UndertowXnioSsl(
                                    xnio,
                                    OptionMap.create(Options.USE_DIRECT_BUFFERS, true),
                                    JsseSslUtils.createSSLContext(listener.keyManagers, listener.trustManagers, new SecureRandom(), builder.getMap()),
                                    sslEngineDelegatedTaskExecutor);
                        }

                        AcceptingChannel<? extends StreamConnection> sslServer;
                        // 使用代理协议
                        if (listener.useProxyProtocol) {
                            ChannelListener<AcceptingChannel<StreamConnection>> acceptListener = ChannelListeners.openListenerAdapter(new ProxyProtocolOpenListener(openListener, xnioSsl, buffers, socketOptionsWithOverrides));
                            sslServer = worker.createStreamConnectionServer(new InetSocketAddress(Inet4Address.getByName(listener.host), listener.port), (ChannelListener) acceptListener, socketOptionsWithOverrides);
                        } else {
                            ChannelListener<AcceptingChannel<StreamConnection>> acceptListener = ChannelListeners.openListenerAdapter(openListener);
                            sslServer = xnioSsl.createSslConnectionServer(worker, new InetSocketAddress(Inet4Address.getByName(listener.host), listener.port), (ChannelListener) acceptListener, socketOptionsWithOverrides);
                        }

                        sslServer.resumeAccepts();
                        channels.add(sslServer);
                        // 增加 https 监听器信息
                        listenerInfo.add(new ListenerInfo("https", sslServer.getLocalAddress(), openListener, xnioSsl, sslServer));
                    }
                }

            }

        } catch (Exception e) {
            if(internalWorker && worker != null) {
                worker.shutdownNow();
            }
            throw new RuntimeException(e);
        }
    }