XNIO 启动流程
本文中
XNIO
使用的版本为3.8.4.Final
.
1 什么是 XNIO
JBOSS
开源的一个以NIO
思想为基础的异步IO
框架. 与netty
类似,只是使用ChannelListener
进行事件通知 .同时
XNIO
继承重用了JDK NIO
的ByteBuffer
类;XNIO
通过封装NIO
的FileChannel
中的方法transferTo
和transferFrom
来实现zero-copy(零拷贝)
.
XNIO
由api
和默认实现nio-imp
组成. 也可以在其 GitHub 上找到并使用其它实现.
2 创建 XNIO
服务器及客户端
2.1 创建服务端
public class XNioServer {
private final int SERVER_PORT = 8080;
public static void main(String[] args) throws IOException {
XNioServer server = new XNioServer();
server.createXnioServer();
}
public void createXnioServer() throws IOException {
int readThreads = (int) Math.round(Math.random() * 10);
if (readThreads == 0) {
readThreads = 1;
}
int writeThreads = (int) Math.round(Math.random() * 10);
if (writeThreads == 0) {
writeThreads = 1;
}
Xnio xnio = Xnio.getInstance("nio", XNioServerTest.class.getClassLoader());
OptionMap options = OptionMap.create(Options.WORKER_WRITE_THREADS, writeThreads, Options.WORKER_READ_THREADS, readThreads);
XnioWorker worker = xnio.createWorker(options);
SocketAddress bindAddress = new InetSocketAddress(Inet4Address.getByAddress(new byte[]{127, 0, 0, 1}), SERVER_PORT);
ChannelListener<AcceptingChannel<StreamConnection>> acceptListener = channel -> {
try {
final StreamConnection connection = channel.accept();
System.out.println("server connection ===>: " + connection.isOpen());
ByteBuffer inboundBuf = ByteBuffer.allocate(512);
Charset charset = Charset.forName(charset);
final ConduitStreamSourceChannel sourceChannel = connection.getSourceChannel();
final ConduitStreamSinkChannel sinkChannel = connection.getSinkChannel();
connection.getSourceChannel().setReadListener(c -> {
try {
if (c.read(inboundBuf) == -1) {
inboundBuf.flip();
System.out.print("server recive ===>: " + charset.decode(inboundBuf));
sinkChannel.write(ByteBuffer.wrap("create Server,hello world!".getBytes("UTF-8")));
sinkChannel.flush();
sinkChannel.shutdownWrites();
} else {
c.resumeReads();
}
} catch (IOException e) {
IoUtils.safeClose(c);
}
});
sourceChannel.resumeReads();
} catch (IOException e) {
IoUtils.safeClose(channel);
}
};
AcceptingChannel<StreamConnection> server = worker.createStreamConnectionServer(bindAddress, acceptListener, options);
server.resumeAccepts();
System.out.println("server is open ===>: " + server.isOpen());
}
}
2.2 创建客户端
public class XNioClient {
public static void main(String[] args) throws Exception {
new XNioClient().createXnioClient();
}
public void createXnioClient() throws Exception {
final Charset charset = Charset.forName("utf-8");
//创建Xnio实例,并构造XnioWorker
final Xnio xnio = Xnio.getInstance();
final XnioWorker worker = xnio.createWorker(OptionMap.EMPTY);
final InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", 8080);
final IoFuture<StreamConnection> streamConnectionIoFuture = worker
.openStreamConnection(inetSocketAddress, null, OptionMap.EMPTY);
StreamConnection connection = streamConnectionIoFuture.get();
System.out.println("connection.isOpen ===>: " + connection.isOpen());
ConduitStreamSinkChannel sinkChannel = connection.getSinkChannel();
sinkChannel.write(ByteBuffer.wrap("Hello world! this client send. \n".getBytes(charset)));
sinkChannel.flush();
sinkChannel.shutdownWrites();
ConduitStreamSourceChannel sourceChannel = connection.getSourceChannel();
ByteBuffer inboundBuf = ByteBuffer.allocate(512);
sourceChannel.setReadListener(channel -> {
try {
if (channel.read(inboundBuf) == -1) {
inboundBuf.flip();
System.out.println("client recive: ==>" + charset.decode(inboundBuf));
channel.shutdownReads();
} else {
channel.resumeReads();
}
} catch (IOException e) {
IoUtils.safeClose(channel);
}
});
sourceChannel.resumeReads();
}
}
3 启动流程解析
主要分析服务端的流程,客户端略.
NioXnioWorker
->QueuedNioTcpServer2
->NioTcpServer
->NioSocketStreamConnection
->NioSocketConduit
3.1 Xnio
实例化
通过
Xnio
的getInstance
方法,最终采用的是JDK
的SPI
技术,找到对应的接口XnioProvider
,服务提供者实现.
public static Xnio getInstance(String provider, final ClassLoader classLoader) {
return doGetInstance(provider, (ServiceLoader)AccessController.doPrivileged(new PrivilegedAction<ServiceLoader<XnioProvider>>() {
public ServiceLoader<XnioProvider> run() {
return ServiceLoader.load(XnioProvider.class, classLoader);
}
}));
}
3.1.1 XnioProvider
找到默认实现
nio-impl
包中META-INF/services
目录下的org.xnio.XnioProvider
文件,里面对应的接口是org.xnio.nio.NioXnioProvider
.
3.1.2 NioXnioProvider
此实现类中只是简单的提供了一个获取实例的方法,对应的实例是
NioXnio
.
public final class NioXnioProvider implements XnioProvider {
private static final Xnio INSTANCE = new NioXnio();
public NioXnioProvider() {
}
public Xnio getInstance() {
return INSTANCE;
}
public String getName() {
return INSTANCE.getName();
}
}
3.1.3 NioXnio
构造器中的逻辑是根据操作环境和
jdk
版本获取对应的选择器创建者SelectorCreator
. 如果使用的window
环境,一般都是使用的WindowsSelectorProvider
实现.
3.2 XnioWorker
实例化
通过调用
Xnio#createWorker
方法,再调用XnioWorker.Builder#build
方法,最后调用NioXnio#build
方法实现public XnioWorker createWorker(OptionMap optionMap) throws IOException, IllegalArgumentException { return this.createWorker((ThreadGroup)null, optionMap); }
public XnioWorker createWorker(ThreadGroup threadGroup, OptionMap optionMap, Runnable terminationTask) throws IOException, IllegalArgumentException { Builder workerBuilder = this.createWorkerBuilder(); workerBuilder.populateFromOptions(optionMap); workerBuilder.setThreadGroup(threadGroup); workerBuilder.setTerminationTask(terminationTask); return workerBuilder.build(); }
protected XnioWorker build(Builder builder) { NioXnioWorker worker = new NioXnioWorker(builder); worker.start(); return worker; }
注意
Builder
类,里面给了些配置参数的默认值/** * 工作线程池构建器: 核心线程 4, 最大工作线程池数 16,工作线程 1 */ public static class Builder { private final Xnio xnio; private ExecutorService externalExecutorService; private Runnable terminationTask; private String workerName; private int coreWorkerPoolSize = 4; private int maxWorkerPoolSize = 16; private ThreadGroup threadGroup; private boolean daemon; private int workerKeepAlive = 60000; private int workerIoThreads = 1; private long workerStackSize = 0L; private CidrAddressTable<InetSocketAddress> bindAddressConfigurations = new CidrAddressTable(); }
3.2.1 NioXnioWorker
先调用父类
XnioWorker
的构造方法,初始化线程池;
/**
* 构建一个新实例,旨在仅从实现中调用
*
* @param builder the worker builder
*/
protected XnioWorker(final Builder builder) {
this.xnio = builder.xnio;
this.terminationTask = builder.terminationTask;
final SecurityManager sm = System.getSecurityManager();
if (sm != null) {
sm.checkPermission(CREATE_WORKER_PERMISSION);
}
String workerName = builder.getWorkerName();
// 线程名为 XNIO + 序列号
if (workerName == null) {
workerName = "XNIO-" + seq.getAndIncrement();
}
name = workerName;
final boolean markThreadAsDaemon = builder.isDaemon();
bindAddressTable = builder.getBindAddressConfigurations();
final Runnable terminationTask = new Runnable() {
public void run() {
// 调用停止任务线程池方法,模板方法,由子类实现,默认为 NioXnioWorkder
taskPoolTerminated();
}
};
final ExecutorService executorService = builder.getExternalExecutorService();
// 实例化任务线程池
if (executorService != null) {
// 增强的队列执行器任务线程池: jboss提供
if (executorService instanceof EnhancedQueueExecutor) {
taskPool = new ExternalTaskPool(
new EnhancedQueueExecutorTaskPool((EnhancedQueueExecutor) executorService));
} else if (executorService instanceof ThreadPoolExecutor) {
taskPool = new ExternalTaskPool(new ThreadPoolExecutorTaskPool((ThreadPoolExecutor) executorService));
} else {
taskPool = new ExternalTaskPool(new ExecutorServiceTaskPool(executorService));
}
} else if (EnhancedQueueExecutor.DISABLE_HINT) { // 禁用提示
final int poolSize = max(builder.getMaxWorkerPoolSize(), builder.getCoreWorkerPoolSize());
taskPool = new ThreadPoolExecutorTaskPool(new DefaultThreadPoolExecutor(
poolSize,
poolSize,
builder.getWorkerKeepAlive(), TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<>(),
new WorkerThreadFactory(builder.getThreadGroup(), builder.getWorkerStackSize(), markThreadAsDaemon),
terminationTask));
} else {
taskPool = new EnhancedQueueExecutorTaskPool(new EnhancedQueueExecutor.Builder()
.setCorePoolSize(builder.getCoreWorkerPoolSize())
.setMaximumPoolSize(builder.getMaxWorkerPoolSize())
.setKeepAliveTime(builder.getWorkerKeepAlive(), TimeUnit.MILLISECONDS)
.setThreadFactory(new WorkerThreadFactory(builder.getThreadGroup(), builder.getWorkerStackSize(), markThreadAsDaemon))
.setTerminationTask(terminationTask)
.setRegisterMBean(true)
.setMBeanName(workerName)
.build()
);
}
}
再调用构造方法进行工作线程的设置.
NioXnioWorker(final Builder builder) {
super(builder);
final NioXnio xnio = (NioXnio) builder.getXnio();
/**
* 线程数量: 默认为 1
*/
final int threadCount = builder.getWorkerIoThreads();
/**
* 工作栈大小
*/
this.workerStackSize = builder.getWorkerStackSize();
final String workerName = getName();
/**
* 工作线程数组: 根据线程数量创建
*/
WorkerThread[] workerThreads;
workerThreads = new WorkerThread[threadCount];
/**
* 线程组
*/
final ThreadGroup threadGroup = builder.getThreadGroup();
/**
* 是否守护线程
*/
final boolean markWorkerThreadAsDaemon = builder.isDaemon();
boolean ok = false;
try {
// 一个工作线程 -> 一个线程选择器(属于给定的线程组,有指定的工作栈大小)
for (int i = 0; i < threadCount; i++) {
final Selector threadSelector;
try {
// 线程选择器:主选择器创建器,windows 为 WindowsSelectorProvider 所创建
threadSelector = xnio.mainSelectorCreator.open();
} catch (IOException e) {
throw Log.log.unexpectedSelectorOpenProblem(e);
}
// 创建工作线程并指定属于哪个线程组,指定栈大小
final WorkerThread workerThread = new WorkerThread(this, threadSelector, String.format("%s I/O-%d", workerName, Integer.valueOf(i + 1)), threadGroup, workerStackSize, i);
// Mark as daemon if the Options.THREAD_DAEMON has been set
if (markWorkerThreadAsDaemon) {
workerThread.setDaemon(true);
}
workerThreads[i] = workerThread;
}
final Selector threadSelector;
try {
threadSelector = xnio.mainSelectorCreator.open();
} catch (IOException e) {
throw Log.log.unexpectedSelectorOpenProblem(e);
}
// 创建接受线程:用于接受请求,只用一个线程
acceptThread = new WorkerThread(this, threadSelector, String.format("%s Accept", workerName), threadGroup, workerStackSize, threadCount);
if (markWorkerThreadAsDaemon) {
acceptThread.setDaemon(true);
}
ok = true;
} finally {
if (! ok) {
for (WorkerThread worker : workerThreads) {
if (worker != null) safeClose(worker.getSelector());
}
}
}
this.workerThreads = workerThreads;
// 工作线程技术指标
this.metrics = new NioWorkerMetrics(workerName);
metrics.register();
}
调用线程启动
worker.start()
方法 .
/**
* 启动工作线程
*/
void start() {
for (WorkerThread worker : workerThreads) {
openResourceUnconditionally();
worker.start();
}
openResourceUnconditionally();
acceptThread.start();
}
3.2.2 WorkerThread
工作线程启动方法.
/** * 从选择器获取连接及注册兴趣事件 */ public void run() { final Selector selector = this.selector; try { log.tracef("Starting worker thread %s", this); final Object lock = workLock; final Queue<Runnable> workQueue = selectorWorkQueue; final TreeSet<TimeKey> delayQueue = delayWorkQueue; log.debugf("Started channel thread '%s', selector %s", currentThread().getName(), selector); Runnable task; Iterator<TimeKey> iterator; long delayTime = Long.MAX_VALUE; Set<SelectionKey> selectedKeys; SelectionKey[] keys = new SelectionKey[16]; int oldState; int keyCount; // 列循环执行线程任务 for (;;) { // Run all tasks do { synchronized (lock) { // 从工作队列弹出工作线程 task = workQueue.poll(); if (task == null) { iterator = delayQueue.iterator(); delayTime = Long.MAX_VALUE; // 延期队列迭代器有值 if (iterator.hasNext()) { final long now = nanoTime(); do { // 时间键 final TimeKey key = iterator.next(); // 截止时间 <= 当前时间 - 开始时间 ,即已超时 if (key.deadline <= (now - START_TIME)) { // 将线程加到工作队列 workQueue.add(key.command); // 延迟队列移除时间键 iterator.remove(); } else { // 延期时间(超时剩余时间) = 截止时间 - (当前时间 - 开始时间) delayTime = key.deadline - (now - START_TIME); // the rest are in the future break; } } while (iterator.hasNext()); } // 再次从工作队列弹出工作线程 task = workQueue.poll(); } } // clear interrupt status Thread.interrupted(); // 执行工作线程 safeRun(task); } while (task != null); // all tasks have been run oldState = state; // 线程状态:未停止 if ((oldState & SHUTDOWN) != 0) { synchronized (lock) { // 选择键数量 keyCount = selector.keys().size(); state = keyCount | SHUTDOWN; // 队列线程执行完毕 if (keyCount == 0 && workQueue.isEmpty()) { // no keys or tasks left, shut down (delay tasks are discarded) return; } } synchronized (selector) { final Set<SelectionKey> keySet = selector.keys(); synchronized (keySet) { // 清空当前选择器中的选择键 keys = keySet.toArray(keys); Arrays.fill(keys, keySet.size(), keys.length, null); } } // shut em down // 两次遍历选择键 for (int i = 0; i < keys.length; i++) { final SelectionKey key = keys[i]; // 为空则中断 if (key == null) break; //end of list // 不为空置为空 keys[i] = null; final NioHandle attachment = (NioHandle) key.attachment(); // 有处理器不为空强制关闭 if (attachment != null) { safeClose(key.channel()); attachment.forceTermination(); } } // 再次从头到尾全部清空选择器中的选择键 Arrays.fill(keys, 0, keys.length, null); } // clear interrupt status Thread.interrupted(); // perform select // 执行 try { // 线程状态不是停止 if ((oldState & SHUTDOWN) != 0) { selectorLog.tracef("Beginning select on %s (shutdown in progress)", selector); // 立刻返回选择器的通道,无空闲则返回0 selector.selectNow(); } else if (delayTime == Long.MAX_VALUE) { selectorLog.tracef("Beginning select on %s", selector); // 轮询状态置为 true polling = true; try { Runnable item = null; synchronized (lock) { // 返回队列的头元素,不删除,队列为空则返回 null item = workQueue.peek(); } if (item != null) { log.tracef("SelectNow, queue is not empty"); selector.selectNow(); } else { log.tracef("Select, queue is empty"); selector.select(); } } finally { polling = false; } } else { final long millis = 1L + delayTime / 1000000L; selectorLog.tracef("Beginning select on %s (with timeout)", selector); polling = true; try { Runnable item = null; synchronized (lock) { // 返回队列的头元素,不删除,队列为空则返回 null item = workQueue.peek(); } if (item != null) { log.tracef("SelectNow, queue is not empty"); selector.selectNow(); } else { log.tracef("Select, queue is empty"); // 延迟对应毫秒数后再返回对应选择器通道 selector.select(millis); } } finally { polling = false; } } } catch (CancelledKeyException ignored) { // Mac and other buggy implementations sometimes spits these out selectorLog.trace("Spurious cancelled key exception"); } catch (IOException e) { selectorLog.selectionError(e); // hopefully transient; should never happen } selectorLog.tracef("Selected on %s", selector); // iterate the ready key set synchronized (selector) { selectedKeys = selector.selectedKeys(); synchronized (selectedKeys) { // copy so that handlers can safely cancel keys keys = selectedKeys.toArray(keys); // 置空选择器键 Arrays.fill(keys, selectedKeys.size(), keys.length, null); // 清空 selectedKeys.clear(); } } for (int i = 0; i < keys.length; i++) { final SelectionKey key = keys[i]; if (key == null) break; //end of list keys[i] = null; final int ops; try { // 获取兴趣事件 ops = key.interestOps(); if (ops != 0) { selectorLog.tracef("Selected key %s for %s", key, key.channel()); final NioHandle handle = (NioHandle) key.attachment(); // 对应处理器为空,直接取消 if (handle == null) { cancelKey(key, false); } else { // clear interrupt status // 测试当前线程是否被中断, 通过该方法清除线程的中断状态 Thread.interrupted(); selectorLog.tracef("Calling handleReady key %s for %s", key.readyOps(), key.channel()); handle.handleReady(key.readyOps()); } } } catch (CancelledKeyException ignored) { selectorLog.tracef("Skipping selection of cancelled key %s", key); } catch (Throwable t) { selectorLog.tracef(t, "Unexpected failure of selection of key %s", key); } } // all selected keys invoked; loop back to run tasks } } finally { log.tracef("Shutting down channel thread \"%s\"", this); // 关闭选择器 safeClose(selector); getWorker().closeResource(); } }
3.3 accept 线程
监听ChannelListener
实例化
监听器中实现我们的业务,进行数据的读写操作.
3.4 创建并开启服务
调用
XnioWorker#createStreamConnectionServer
,传入参数为监听地址,监听器,配置参数AcceptingChannel<StreamConnection> server = worker.createStreamConnectionServer(bindAddress, acceptListener, options);
调用
NioXnioWorker.createTcpConnectionServer
/** * 创建 tcp 连接服务器 * @param bindAddress the address to bind to * @param acceptListener the initial accept listener * @param optionMap the initial configuration for the server * @return * @throws IOException */ protected AcceptingChannel<StreamConnection> createTcpConnectionServer(final InetSocketAddress bindAddress, final ChannelListener<? super AcceptingChannel<StreamConnection>> acceptListener, final OptionMap optionMap) throws IOException { checkShutdown(); boolean ok = false; // 打开服务套接字通道: jdk nio final ServerSocketChannel channel = ServerSocketChannel.open(); try { if (optionMap.contains(Options.RECEIVE_BUFFER)) channel.socket().setReceiveBufferSize(optionMap.get(Options.RECEIVE_BUFFER, -1)); channel.socket().setReuseAddress(optionMap.get(Options.REUSE_ADDRESSES, true)); // 配置不阻塞 channel.configureBlocking(false); // 如果配置了积压请求参数 if (optionMap.contains(Options.BACKLOG)) { // 默认积压请求数为 128 channel.socket().bind(bindAddress, optionMap.get(Options.BACKLOG, 128)); } else { channel.socket().bind(bindAddress); } if (false) { final NioTcpServer server = new NioTcpServer(this, channel, optionMap, false); server.setAcceptListener(acceptListener); ok = true; return server; } else { // 默认走此分支: 监听存放在队列 final QueuedNioTcpServer2 server = new QueuedNioTcpServer2(new NioTcpServer(this, channel, optionMap, true)); // 设置接受请求的监听器 server.setAcceptListener(acceptListener); ok = true; return server; } } finally { // 如果不成功,关闭服务器套接字通道 if (! ok) { IoUtils.safeClose(channel); } } }
调用
QueuedNioTcpServer2构造器
QueuedNioTcpServer2(final NioTcpServer realServer) { super(realServer.getWorker()); this.realServer = realServer; final NioXnioWorker worker = realServer.getWorker(); final int cnt = worker.getIoThreadCount(); acceptQueues = new ArrayList<>(cnt); // 根据 io 线程数, 采用接受监听队列 for (int i = 0; i < cnt; i ++) { acceptQueues.add(new LinkedBlockingQueue<>()); } realServer.getCloseSetter().set(ignored -> invokeCloseHandler()); // NioTcpServer#accept() -> new NioSocketStreamConnection realServer.getAcceptSetter().set(ignored -> handleReady()); }
调用
NioTcpServer构造器
.
/**
* 构造器
* @param worker
* @param channel
* @param optionMap
* @param useAcceptThreadOnly
* @throws IOException
*/
NioTcpServer(final NioXnioWorker worker, final ServerSocketChannel channel, final OptionMap optionMap, final boolean useAcceptThreadOnly) throws IOException {
super(worker);
this.channel = channel;
final WorkerThread[] threads;
final int threadCount;
/**
* 令牌
*/
final int tokens;
final int connections;
if (useAcceptThreadOnly) {
threads = new WorkerThread[] { worker.getAcceptThread() };
threadCount = 1;
tokens = 0;
connections = 0;
} else {
threads = worker.getAll();
threadCount = threads.length;
if (threadCount == 0) {
throw log.noThreads();
}
tokens = optionMap.get(Options.BALANCING_TOKENS, -1);
connections = optionMap.get(Options.BALANCING_CONNECTIONS, 16);
if (tokens != -1) {
if (tokens < 1 || tokens >= threadCount) {
throw log.balancingTokens();
}
if (connections < 1) {
throw log.balancingConnectionCount();
}
tokenConnectionCount = connections;
}
}
socket = channel.socket();
if (optionMap.contains(Options.SEND_BUFFER)) {
final int sendBufferSize = optionMap.get(Options.SEND_BUFFER, DEFAULT_BUFFER_SIZE);
if (sendBufferSize < 1) {
throw log.parameterOutOfRange("sendBufferSize");
}
sendBufferUpdater.set(this, sendBufferSize);
}
if (optionMap.contains(Options.KEEP_ALIVE)) {
keepAliveUpdater.lazySet(this, optionMap.get(Options.KEEP_ALIVE, false) ? 1 : 0);
}
if (optionMap.contains(Options.TCP_OOB_INLINE)) {
oobInlineUpdater.lazySet(this, optionMap.get(Options.TCP_OOB_INLINE, false) ? 1 : 0);
}
if (optionMap.contains(Options.TCP_NODELAY)) {
tcpNoDelayUpdater.lazySet(this, optionMap.get(Options.TCP_NODELAY, false) ? 1 : 0);
}
if (optionMap.contains(Options.READ_TIMEOUT)) {
readTimeoutUpdater.lazySet(this, optionMap.get(Options.READ_TIMEOUT, 0));
}
if (optionMap.contains(Options.WRITE_TIMEOUT)) {
writeTimeoutUpdater.lazySet(this, optionMap.get(Options.WRITE_TIMEOUT, 0));
}
int perThreadLow, perThreadLowRem;
int perThreadHigh, perThreadHighRem;
if (optionMap.contains(Options.CONNECTION_HIGH_WATER) || optionMap.contains(Options.CONNECTION_LOW_WATER)) {
final int highWater = optionMap.get(Options.CONNECTION_HIGH_WATER, Integer.MAX_VALUE);
final int lowWater = optionMap.get(Options.CONNECTION_LOW_WATER, highWater);
if (highWater <= 0) {
throw badHighWater();
}
if (lowWater <= 0 || lowWater > highWater) {
throw badLowWater(highWater);
}
final long highLowWater = (long) highWater << CONN_HIGH_BIT | (long) lowWater << CONN_LOW_BIT;
connectionStatusUpdater.lazySet(this, highLowWater);
perThreadLow = lowWater / threadCount;
perThreadLowRem = lowWater % threadCount;
perThreadHigh = highWater / threadCount;
perThreadHighRem = highWater % threadCount;
} else {
perThreadLow = Integer.MAX_VALUE;
perThreadLowRem = 0;
perThreadHigh = Integer.MAX_VALUE;
perThreadHighRem = 0;
connectionStatusUpdater.lazySet(this, CONN_LOW_MASK | CONN_HIGH_MASK);
}
// 设置处理器,根据线程数
final NioTcpServerHandle[] handles = new NioTcpServerHandle[threadCount];
for (int i = 0, length = threadCount; i < length; i++) {
final SelectionKey key = threads[i].registerChannel(channel);
handles[i] = new NioTcpServerHandle(this, key, threads[i], i < perThreadHighRem ? perThreadHigh + 1 : perThreadHigh, i < perThreadLowRem ? perThreadLow + 1 : perThreadLow);
key.attach(handles[i]);
}
this.handles = handles;
if (tokens > 0) {
for (int i = 0; i < threadCount; i ++) {
// 初始化令牌数
handles[i].initializeTokenCount(i < tokens ? connections : 0);
}
}
mbeanHandle = worker.registerServerMXBean(
new XnioServerMXBean() {
public String getProviderName() {
return "nio";
}
public String getWorkerName() {
return worker.getName();
}
public String getBindAddress() {
return String.valueOf(getLocalAddress());
}
public int getConnectionCount() {
final AtomicInteger counter = new AtomicInteger();
final CountDownLatch latch = new CountDownLatch(handles.length);
for (final NioTcpServerHandle handle : handles) {
handle.getWorkerThread().execute(() -> {
counter.getAndAdd(handle.getConnectionCount());
latch.countDown();
});
}
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return counter.get();
}
public int getConnectionLimitHighWater() {
return getHighWater(connectionStatus);
}
public int getConnectionLimitLowWater() {
return getLowWater(connectionStatus);
}
}
);
}
调用
AcceptingChannel<StreamConnection>#resumeAccepts
方法,最终调用的是NioTcpServer#resumeAccepts
.
此方法最终调用会注册
SelectionKey#OP_ACCEP
兴趣事件到选择器上.
public void resumeAccepts() {
this.resumed = true;
this.doResume(16);
}
调用
NioTcpServerHandle#resume
,再调用NioHandle#resume
,最终回到workerThread#setOps
,入参16即为SelectionKey#OP_ACCEPT=1 << 4
.
void resume() {
WorkerThread thread = this.getWorkerThread();
if (thread == Thread.currentThread()) {
if (!this.stopped && !this.backOff && this.server.resumed) {
super.resume(16);
}
} else {
thread.execute(new Runnable() {
public void run() {
NioTcpServerHandle.this.resume();
}
});
}
}
void resume(int ops) {
try {
if (!Bits.allAreSet(this.selectionKey.interestOps(), ops)) {
this.workerThread.setOps(this.selectionKey, ops);
}
} catch (CancelledKeyException var3) {
}
}
读写兴趣事件的注册:
NioTcpServer#accept
->new NioSocketStreamConnection()
->SelectionKey#attach
/**
* 接受请求
* @return
* @throws ClosedChannelException
*/
public NioSocketStreamConnection accept() throws ClosedChannelException {
final WorkerThread current = WorkerThread.getCurrent();
if (current == null) {
return null;
}
/**
* 服务器处理器
*/
final NioTcpServerHandle handle;
if (handles.length == 1) {
handle = handles[0];
} else {
// 根据线程号从处理器集中获取对应处理器
handle = handles[current.getNumber()];
}
if (! handle.getConnection()) {
return null;
}
final SocketChannel accepted;
boolean ok = false;
try {
// 接受请求通道
accepted = channel.accept();
if (accepted != null) try {
// 获取 hashCode
int hash = ThreadLocalRandom.current().nextInt();
// 配置不阻塞
accepted.configureBlocking(false);
final Socket socket = accepted.socket();
socket.setKeepAlive(keepAlive != 0);
socket.setOOBInline(oobInline != 0);
socket.setTcpNoDelay(tcpNoDelay != 0);
final int sendBuffer = this.sendBuffer;
if (sendBuffer > 0) socket.setSendBufferSize(sendBuffer);
// io 线程
final WorkerThread ioThread = worker.getIoThread(hash);
// 选择器键:将接受线程管道注册到选择器
final SelectionKey selectionKey = ioThread.registerChannel(accepted);
// 创建新连接: 流连接
final NioSocketStreamConnection newConnection = new NioSocketStreamConnection(ioThread, selectionKey, handle);
newConnection.setOption(Options.READ_TIMEOUT, Integer.valueOf(readTimeout));
newConnection.setOption(Options.WRITE_TIMEOUT, Integer.valueOf(writeTimeout));
ok = true;
// 重置回退时间
handle.resetBackOff();
return newConnection;
} finally {
if (! ok) safeClose(accepted);
}
} catch (ClosedChannelException e) {
throw e;
} catch (IOException e) {
// something went wrong with the accept
// it could be due to running out of file descriptors, or due to closed channel, or other things
handle.startBackOff();
log.acceptFailed(e, handle.getBackOffTime());
return null;
} finally {
if (! ok) {
handle.freeConnection();
}
}
// by contract, only a resume will do
return null;
}
/**
* 构造器
* @param workerThread
* @param key
* @param closedHandle
*/
NioSocketStreamConnection(final WorkerThread workerThread, final SelectionKey key, final ChannelClosed closedHandle) {
super(workerThread);
// 创建导管,继承了 NioHandle,提供了读写方法
conduit = new NioSocketConduit(workerThread, key, this);
key.attach(conduit);
this.closedHandle = closedHandle;
// 设置导管信息目的
setSinkConduit(conduit);
// 设置导管的信息源头
setSourceConduit(conduit);
}
/**
* 准备就绪
* @param ops
*/
void handleReady(int ops) {
try {
if (ops == 0) {
// the dreaded bug
final SelectionKey key = getSelectionKey();
// 兴趣事件
final int interestOps = key.interestOps();
if (interestOps != 0) {
ops = interestOps;
} else {
// urp
forceTermination();
return;
}
}
// 读兴趣事件
if (Bits.allAreSet(ops, SelectionKey.OP_READ)) try {
if (isReadShutdown()) suspendReads();
// 调用读就绪处理器
readReadyHandler.readReady();
} catch (Throwable ignored) {
}
// 写兴趣事件
if (Bits.allAreSet(ops, SelectionKey.OP_WRITE)) try {
if (isWriteShutdown()) suspendWrites();
// 调用写就绪处理器
writeReadyHandler.writeReady();
} catch (Throwable ignored) {
}
} catch (CancelledKeyException ignored) {}
}
读写操作通过导管
NioSocketConduit
实现
/**
* 构造器
* @param workerThread
* @param selectionKey
* @param connection
*/
NioSocketConduit(final WorkerThread workerThread, final SelectionKey selectionKey, final NioSocketStreamConnection connection) {
super(workerThread, selectionKey);
this.connection = connection;
this.socketChannel = (SocketChannel) selectionKey.channel();
}
public long transferTo(final long position, final long count, final FileChannel target) throws IOException {
long res = target.transferFrom(socketChannel, position, count);
checkReadTimeout(res > 0L);
return res;
}
public long transferTo(final long count, final ByteBuffer throughBuffer, final StreamSinkChannel target) throws IOException {
return Conduits.transfer(this, count, throughBuffer, target);
}
public int read(final ByteBuffer dst) throws IOException {
int res;
try {
res = socketChannel.read(dst);
} catch (ClosedChannelException e) {
return -1;
}
if (res != -1) checkReadTimeout(res > 0);
else terminateReads();
return res;
}
public long read(final ByteBuffer[] dsts, final int offset, final int length) throws IOException {
if (length == 1) {
return read(dsts[offset]);
}
long res;
try {
res = socketChannel.read(dsts, offset, length);
} catch (ClosedChannelException e) {
return -1L;
}
if (res != -1L) checkReadTimeout(res > 0L);
else terminateReads();
return res;
}
/**
* 传输
* @param src the file to read from
* @param position the position within the file from which the transfer is to begin
* @param count the number of bytes to be transferred
* @return
* @throws IOException
*/
public final long transferFrom(final FileChannel src, final long position, final long count) throws IOException {
long res = src.transferTo(position, count, socketChannel);
checkWriteTimeout(res > 0L);
return res;
}
/**
* 传输
* @param source the source to read from
* @param count the number of bytes to be transferred
* @param throughBuffer the buffer to copy through.
* @return
* @throws IOException
*/
public long transferFrom(final StreamSourceChannel source, final long count, final ByteBuffer throughBuffer) throws IOException {
return Conduits.transfer(source, count, throughBuffer, this);
}
public int write(final ByteBuffer src) throws IOException {
int res = socketChannel.write(src);
checkWriteTimeout(res > 0);
return res;
}
public long write(final ByteBuffer[] srcs, final int offset, final int length) throws IOException {
if (length == 1) {
return write(srcs[offset]);
}
long res = socketChannel.write(srcs, offset, length);
checkWriteTimeout(res > 0L);
return res;
}