想要閱讀Netty源碼的同學,建議從GitHub上把源碼拉下來,方便寫注釋、Debug調試哦~點我去下載!
先來看一個簡單的Echo服務端程序,監聽本地的9999端口,有客戶端接入時控制臺輸出一句話,接收到客戶端的數據后直接原樣寫回。
public class EchoServer {
// 綁定的端口
private final int port;
public EchoServer(int port) {
this.port = port;
}
public static void main(String[] args) {
// 啟動Echo服務
new EchoServer(9999).start();
}
public void start() {
/*
bossGroup負責客戶端的接入
workerGroup負責IO數據的讀寫
*/
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
new ServerBootstrap()
.group(boss, worker)
.channel(NIOServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
System.out.println("有新的客戶端連接...");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
/*
原樣寫回給客戶端,因為OutBoundHandler還要使用,因此不能釋放msg。
底層數據寫完后會自動釋放。
*/
ctx.writeAndFlush(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 出現異常了
cause.printStackTrace();
ctx.channel().close();
}
});
}
})
.bind(port);
}
}
代碼還是很簡單的,接下來會一步步分析,僅僅幾行代碼,Netty到底做了什么!
NioEventLoopGroup源碼分析
Netty程序要想成功運行,需要EventLoopGroup進行驅動,ServerBootstrap.bind()會將ServerSocketChannel綁定到本地端口,這樣服務端就可以接收客戶端的連接了,但是在這之前,必須確保設置了EventLoopGroup,ServerBootstrap調用bind()前會進行檢查,方法是validate(),源碼如下:
/**
* 驗證必要的參數
*/
public B validate() {
if (group == null) {//EventLoopGroup必須設置,依賴它驅動程序
throw new IllegalStateException("group not set");
}
if (channelFactory == null) {//依賴channelFactory創建ServerSocketChannel對象
throw new IllegalStateException("channel or channelFactory not set");
}
return self();
}
先來看看類的繼承關系:
NioEventLoopGroup實現了ScheduledExecutorService,說明它不僅可以執行異步任務,還可以執行定時任務。實現Iterable接口,是因為EventLoopGroup管理著一組EventLoop,需要對其進行迭代遍歷。
MultithreadEventExecutorGroup代表它是一個多線程的事件執行器,而它管理的EventLoop就是個單線程的事件執行器。
先來看構造函數,它的構造函數非常多,我們直接看參數最全的一個:
/**
* @param nThreads 線程數量,就是NioEventLoop的數量,默認CPU核心數*2
* @param executor NioEventLoop.run()的執行者,默認為ThreadPerTaskExecutor,NioEventLoop將利用它來啟動一個FastThreadLocalThread并執行
* @param chooserFactory 選擇器工廠,默認DefaultEventExecutorChooserFactory,輪詢選擇NioEventLoop
* @param selectorProvider 多路復用器提供者,DefaultSelectorProvider.create()
* @param selectStrategyFactory select策略工廠,指示EventLoop應該要做什么事情
* @param rejectedExecutionHandler 拒絕策略
* @param taskQueueFactory 任務隊列工廠,默認PlatformDependent.newMpscQueue(),Netty實現的高性能無鎖隊列
*/
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory,
final RejectedExecutionHandler rejectedExecutionHandler,
final EventLoopTaskQueueFactory taskQueueFactory) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
rejectedExecutionHandler, taskQueueFactory);
}
- nThreads:線程數,意味著Group需要創建多少個EventLoop,默認是CPU核心數*2。
- executor:NioEventLoop.run()的執行者,默認為ThreadPerTaskExecutor,NioEventLoop將利用它來啟動一個FastThreadLocalThread并執行。
- chooserFactory:選擇器工廠,默認DefaultEventExecutorChooserFactory,輪詢選擇NioEventLoop。
- selectorProvider:多路復用器提供者,DefaultSelectorProvider.create(),根據平臺會提供對應實現。
- selectStrategyFactory:select策略工廠,指示EventLoop應該要做什么事情。
- rejectedExecutionHandler:拒絕策略。
- taskQueueFactory:任務隊列工廠,默認PlatformDependent.newMpscQueue(),Netty實現的高性能無鎖隊列。
NioEventLoopGroup會把參數傳給父類構造器MultithreadEventLoopGroup,這里會對nThreads進行初始化設置:
/**
* 參數太多,以后也可能會改變,后面的參數直接用Object...接收了
*/
protected MultithreadEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
Object... args) {
// 如果nThreads=0,則默認為CPU核心數*2
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, chooserFactory, args);
}
再次調用父類構造器,核心初始化流程在
MultithreadEventExecutorGroup中:
/*
創建一個多線程的事件執行器組
*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
// 確保線程數大于0
checkPositive(nThreads, "nThreads");
/*
如果沒提供Executor,則創建默認的ThreadPerTaskExecutor。
ThreadPerTaskExecutor依賴于一個ThreadFactory,靠它創建線程來執行任務。
默認的ThreadFactory會使用FastThreadLocalThread來提升FastThreadLocal的性能。
*/
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// 創建子EventExecutor
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
// EventExecutor創建失敗,停機釋放資源
for (int j = 0; j < i; j++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
/*
創建選擇器:簡單輪詢
PowerOfTwoEventExecutorChooser:2的冪次方,位運算
GenericEventExecutorChooser:否則,取余
有事件/任務要執行時,取出一個EventExecutor
*/
chooser = chooserFactory.newChooser(children);
// 所有children停止時收到一個通知,優雅停機時用到
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e : children) {
e.terminationFuture().addListener(terminationListener);
}
// 返回一個只讀的children,iterator()迭代時使用
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
EventLoopGroup會管理EventLoop,EventLoop執行任務需要依賴Executor,Executor執行任務需要依賴ThreadFactory創建新的線程,我們看下Netty默認的Executor實現。
默認的ThreadFactory,會創建FastThreadLocalThread線程,來優化FastThreadLocal的性能,關于FastThreadLocal后面會有專門的文章介紹。
// 創建一個默認的線程工廠
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass());
}
/*
默認的線程工廠
*/
public class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolId = new AtomicInteger();
// 生成線程名稱時用到:prefix+nextId自增
private final AtomicInteger nextId = new AtomicInteger();
private final String prefix;//前綴
private final boolean daemon;//是否守護線程,默認false
private final int priority;//優先級 默認5
protected final ThreadGroup threadGroup;//所屬線程組
// 省略部分代碼......
@Override
public Thread newThread(Runnable r) {
// 創建一個FastThreadLocalThread線程,優化FastThreadLocal的性能
Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
try {
if (t.isDaemon() != daemon) {
t.setDaemon(daemon);
}
if (t.getPriority() != priority) {
t.setPriority(priority);
}
} catch (Exception ignored) {
// Doesn't matter even if failed to set.
}
return t;
}
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name);
}
}
有了ThreadFactory,Executor的實現就很簡單了,當要執行任務的時候,創建一個新線程去跑就好了。EventLoop會在第一次execute()時調用該方法,整個生命周期只會調用一次,即每個EventLoop只會創建一個線程,后續所有的任務,都是在run()方法里無限輪詢去執行。
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
}
/*
執行任務時,利用ThreadFactory創建一個新線程去跑。
EventLoop會在第一次execute()時調用該方法,整個生命周期只會調用一次,
即每個EventLoop只會創建一個線程,后續所有的任務,都是在run()方法里無限輪詢去執行。
*/
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
有了Executor,接下來就會調用newChild()進行children的初始化,對于NioEventLoopGroup來說,它管理的孩子是NioEventLoop,所以newChild()會創建NioEventLoop:
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
// EventLoop需要一個TaskQueue來存放待執行的任務,這里判斷是否有指定QueueFactory,沒有則使用默認的
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
// 創建NioEventLoop
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
EventLoopGroup本身不干活,向它提交任務,它只會交給它的孩子EventLoop執行,所以它依賴一個EventExecutorChooser,當有任務來臨時,從眾多的孩子中挑選出一個,默認的選擇策略就是簡單輪詢。
Netty這里做了一個小小的優化,如果孩子數量是2的冪次方數會使用位運算,否則取模。源碼如下:
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
// 單例模式,通過INSTANCE提供一個單例對象
private DefaultEventExecutorChooserFactory() { }
/*
創建一個選擇器,從一組EventExecutor中挑選出一個。
Netty默認的選擇策略就是:簡單輪詢。
*/
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
// 兩種Chooser實現都有一個AtomicLong計數器,每次next()先自增再取余
// 如果數量是2的冪次方數,則采用位運算
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
// 否則,對長度進行取余
return new GenericEventExecutorChooser(executors);
}
}
// 是否是2的冪次方數
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
/*
二進制中,最高位是符號位,0正數、1負數。剩下的就是這個數的絕對值部分。
原碼:設置符號位,其他0填充
反碼:正數的反碼與原碼相同,負數的反碼:除符號位外,其他位取反
補碼:正數的補碼與原碼相同,負數的補碼:除符號位外,其他位取反,然后在最后一位加1(計算機使用補碼)
如下舉例:
5:00000000 00000000 00000000 00000101(原碼)
5:00000000 00000000 00000000 00000101(反碼)
5:00000000 00000000 00000000 00000101(補碼)
-5:10000000 00000000 00000000 00000101(原碼)
-5:11111111 11111111 11111111 11111010(反碼)
-5:11111111 11111111 11111111 11111011(補碼)
5 & -5 = 00000000 00000000 00000000 00000001 = 1 不是2的冪次方數
8 & -8 = 00000000 00000000 00000000 00001000
& 11111111 11111111 11111111 11111000
= 00000000 00000000 00000000 00001000 = 8 是2的冪次方數
*/
}
// 2的冪次方數的選擇器,位運算
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
// 計數器自增 & 長度-1,和HashMap一樣
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
// 普通的選擇器,取余
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicLong idx = new AtomicLong();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
}
}
}
有了選擇器后,你向EventLoopGroup提交的任務,包括注冊Channel,它都會輪詢出一個EventLoop轉交任務,源碼如下:
@Override
public ChannelFuture register(Channel channel) {
// 選出一個孩子,讓它去執行
return next().register(channel);
}
EventLoopGroup還有一個方法特別有用,那就是shutdownGracefully()優雅停機,調用后它會停止接受新的任務,并把隊列中等待執行的任務(包括定時任務)處理完(Netty不保證100%處理完),然后釋放資源。由于EventLoopGroup本身不干活,因此它依然停止所有的EventLoop,,源碼如下:
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
for (EventExecutor l: children) {
// 將孩子按個停機
l.shutdownGracefully(quietPeriod, timeout, unit);
}
// 返回一個終止Future,停機后收到通知
return terminationFuture();
}
NioEventLoopGroup差不多就這樣,比較簡單,它只是負責管理EventLoop,核心還是在EventLoop上。
NioEventLoop源碼分析
NioEventLoopGroup在創建時,會根據線程數初始化NioEventLoop。NioEventLoop可以看作是一個單線程的線程池,也是真正干活的角色,它的繼承關系如下:
NioEventLoop的主要職責是負責處理注冊到其上的Channel的IO事件,除此之外它還可以執行用戶提交的系統任務和定時任務,例如:你可以每隔一段時間檢查一下連接是否斷開,如果斷開,客戶端可以重連,服務端需要及時釋放資源。
一個Channel只能被注冊到一個EventLoop上,一個EventLoop可以注冊多個Channel。一旦Channel注冊到EventLoop,該EventLoop就要負責處理它整個生命周期的所有事件。事件以回調的方式被觸發,所有的回調任務會被封裝成一個Runnable放入taskQueue,由EventLoop線程串行化處理。雖然看似「串行化處理」效率低下,但是這避免了線程切換的開銷和數據同步的問題,而且你可以開啟多個EventLoop,并行處理,充分利用CPU資源。
先看屬性,如下:
private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
// 是否禁用SelectionKey優化?默認為false
private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
// Selector重建的閾值,默認512,目的是解決JDK Selector空輪詢Bug
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
// 當前有幾個準備就緒的Channel?selectStrategy會用到,大于0代表有Channel事件需要處理
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
再看構造函數,源碼如下:
/**
* 創建一個NioEventLoop實例,用來執行注冊在其上的Channel事件
* @param parent 所屬Group
* @param executor
* @param selectorProvider 多路復用器提供者,不同平臺會使用不同實現
* @param strategy Selector.select()的策略
* @param rejectedExecutionHandler 拒絕策略
* @param queueFactory 任務隊列工廠
*/
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
/*
每個EventLoop都會有一個Selector,用來監聽注冊在其上的Channel事件。
對于BossGroup,處理的是Accept。
對于WorkerGroup,處理的是read、write...
SelectorTuple:Selector元組,Netty提供了一個Selector包裝,用來優化select()性能
*/
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrAppedSelector = selectorTuple.unwrappedSelector;
}
- parent:EventLoop隸屬的EventLoopGroup,由Group來管理和調度。
- executor:EventLoop需要executor開啟新線程跑自身的run()方法。
- selectorProvider:多路復用器提供者,不同平臺會使用不同實現。
- strategy:select策略工廠,指示EventLoop應該要做什么事情。
- rejectedExecutionHandler:拒絕策略。
- queueFactory:任務隊列工廠,負責創建taskQueue。
NioEventLoop首先創建了兩個TaskQueue來存放待執行的任務,run()方法會不斷消費任務。雖然可以多線程并發的往taskQueue中提交任務,但是由于EventLoop是單線程的,所有taskQueue的生產消費模型是:多生產者單消費者。針對這種消費場景,Netty實現了高性能的無鎖隊列「MpscQueue」,Queue的創建源碼如下:
// 創建TaskQueue,存放待執行的任務
private static Queue<Runnable> newTaskQueue(
EventLoopTaskQueueFactory queueFactory) {
if (queueFactory == null) {
// 默認創建Netty實現MpscQueue:Netty實現的高性能無鎖隊列,適用于多個生產者,單個消費者。
return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
}
return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
}
/*
根據最大隊列數創建Queue。
MpscQueue:Netty實現的高性能無鎖隊列,適用于多個生產者,單個消費者。
多個線程可以并發往EventLoop提交任務,但是EventLoop本身是單線程消費的。
*/
private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
// This event loop never calls takeTask()
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
關于MpscQueue,后面會專門寫文章介紹。
創建完taskQueue就是調用父類構造器進行相應的賦值操作了,這里略過,下面主要看openSelector()。
每個NioEventLoop被創建時,都會同時創建一個Selector多路復用器,這是JDK提供的,不熟悉的同學去看看JAVA Nio編程。EventLoopGroup會將Channel注冊到NioEventLoop上,實際上就是注冊到Selector上了。這樣NioEventLoop就可以通過Selector來監聽準備就緒的Channel,然后根據事件類型去觸發相應的回調,所以Selector是NioEventLoop的核心。
openSelector()會做一個優化,將JDK的SelectorImpl的selectedKeys、publicSelectedKeys屬性由HashSet替換成Netty的SelectedSelectionKeySet,內部是一個數組。當Selector監聽到有準備就緒的Channel時,會往HashSet里添加SelectionKey,當SelectionKey比較多時,就容易發生哈希沖突,時間復雜度會增加,而SelectedSelectionKeySet內部使用數組來保存,避免了哈希沖突,性能會有一定的提升。
/*
打開一個Selector多路復用器
*/
private SelectorTuple openSelector() {
final Selector unwrappedSelector;//未包裝的原生Selector
try {
// 基于SelectorProvider打開一個原生的Selector,這是JDK提供的。
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
// 如果禁用了SelectionKey優化,則unwrappedSelector和selector都指向原生Selector
if (DISABLE_KEY_SET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}
// 否則,使用SelectedSelectionKeySet來優化SelectionKey
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
if (!(maybeSelectorImplClass instanceof Class) ||
// ensure the current selector implementation is what we can instrument.
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
if (maybeSelectorImplClass instanceof Throwable) {
Throwable t = (Throwable) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
}
return new SelectorTuple(unwrappedSelector);
}
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
// 反射獲取SelectorImpl的selectedKeys、publicSelectedKeys屬性
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
// Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
// This allows us to also do this in Java9+ without any extra flags.
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
long publicSelectedKeysFieldOffset =
PlatformDependent.objectFieldOffset(publicSelectedKeysField);
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
PlatformDependent.putObject(
unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(
unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
}
// We could not retrieve the offset, lets try reflection as last-resort.
}
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
/*
通過反射將SelectorImpl的selectedKeys、publicSelectedKeys替換為selectedKeySet來提升性能。
*/
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});
if (maybeException instanceof Exception) {
selectedKeys = null;
Exception e = (Exception) maybeException;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
return new SelectorTuple(unwrappedSelector);
}
selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
// 創建一個SelectorTuple,包含一個原生的Selector,和優化過的Selector。
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
SelectedSelectionKeySet部分源碼如下:
/**
* Selector的 Set<SelectionKey> selectedKeys
* 默認用HashSet存儲,當有Channel準備就緒時,會添加到HashSet中,但如果發生沖突,HashSet的時間復雜度是O(n)鏈表/O(log n)紅黑樹
* Netty通過反射將selectedKeys、publicSelectedKeys替換成SelectedSelectionKeySet
* 使用數組來避免哈希沖突
*/
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
// 使用數組來保存,默認長度1024
SelectionKey[] keys;
int size;//keys大小
SelectedSelectionKeySet() {
keys = new SelectionKey[1024];
}
// 省略部分代碼.......
}
創建完Selector,NioEventLoop的初始化就完成了,但此時線程并未啟動,Netty這里做了懶加載處理,只有當EventLoop第一次被調用execute()執行任務時才會通過executor去創建線程跑run()方法。
用戶不主動提交任務的前提下,對于BossGroup的EventLoop來說,線程是在調用bind()方法將ServerSocketChannel注冊到EventLoop時被啟動的。對于WorkerGroup的EventLoop來說,線程是在BossGroup接收到客戶端連接時,將SocketChannel注冊到WorkerGroup時被啟動的。
不管是ServerSocketChannel.bind()還是接收到客戶端連接,都是要將Channel注冊到EventLoop,再由EventLoop去輪詢處理事件。register()源碼如下:
// 注冊Channel
@Override
public ChannelFuture register(Channel channel) {
// 創建一個DefaultChannelPromise,再注冊,目的是讓用戶可以在注冊完成時收到通知
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
// 轉交給Channel.Unsafe完成
promise.channel().unsafe().register(this, promise);
return promise;
}
這里需要說下Channel.Unsafe接口,對于bind()、write()、read()等這類方法,由于需要和底層API交互,Netty對開發者屏蔽了底層實現,不希望由開發者調用這類方法,于是將它們封裝到Channel.Unsafe中,從名字中也能看出來,這些操作是不安全的,開發者盡量不要去自己調用。
register()操作的目的其實就是將JDK的SocketChannel注冊到Selector多路復用器上,由于需要和底層API交互,于是轉交給Channel.Unsafe處理,源碼在
io.netty.channel.AbstractChannel.AbstractUnsafe#register(),如下所示:
/*
將Channel注冊到EventLoop,其實就是調用JDK底層的:SocketChannel.register(selector)。
將Channel注冊到多路復用器。
*/
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
// 重復注冊校驗
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
// 檢查是否兼容,Channel和EventLoop模式不能混用,例如Oio和Nio不兼容
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
/*
當前線程是否是EventLoop線程?
如果是就直接執行,否則提交一個任務,后面串行化執行。
*/
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
register()做了一些檢查,然后確保由EventLoop來執行注冊操作,前面說過了,EventLoop會負責處理Channel的所有事件。register0()完成注冊,并觸發相應的事件回調,通過Pipeline傳播出去。
private void register0(ChannelPromise promise) {
try {
// 確保Channel是打開狀態
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// JDK原生channel.register(selector)
doRegister();
neverRegistered = false;
registered = true;
// 觸發 ChannelHandler.handlerAdded()回調
pipeline.invokeHandlerAddedIfNeeded();
// 通知promise操作成功了,觸發回調
safeSetSuccess(promise);
// 注冊完成,觸發ChannelRegistered回調,通過pipeline傳播出去
pipeline.fireChannelRegistered();
// 如果連接激活了,則觸發active事件,只在首次注冊時會觸發
if (isActive()) {
if (firstRegistration) {
// 觸發ChannelRegistered回調,通過pipeline傳播出去
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
// 異常了,關閉資源,觸發失敗通知
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
doRegister()會調用JDK底層的注冊,源碼如下:
// 真正調用JDK底層API完成注冊
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 獲取Java原生SocketChannel注冊到未包裝的原生Selector上
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}
完成SocketChannel的注冊后,EventLoop就可以通過輪詢Selector來監聽準備就緒的Channel了,后面就是一系列的事件處理了。
在調用
io.netty.channel.AbstractChannel.AbstractUnsafe#register()時,EventLoop線程已經啟動并執行run()方法,在run()方法里,EventLoop線程會執行一個死循環,直到線程被停止。
在死循環里,EventLoop線程會不斷輪詢Selector是否有準備就緒的Channel需要處理?taskQueue是否有任務在等待執行?scheduledTaskQueue是否有定時任務需要執行?NioEventLoop.run()是任務處理的關鍵。
@Override
protected void run() {
/*
無效空輪詢的次數
JDK的Selector存在Bug,會導致空輪詢,CPU飆升。
Netty會檢測Selector.select()空輪詢次數,超過SELECTOR_AUTO_REBUILD_THRESHOLD則重建Selector。
有效輪詢:要么有IO事件到達、要么執行了Task。
*/
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
/*
NioEventLoop的執行策略:
有任務待執行嗎?
沒有:Selector.select()阻塞,等待IO事件到達(定時任務判斷)
有:非阻塞調用Selector.selectNow(),
*/
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:// 重試IO循環
continue;
case SelectStrategy.BUSY_WAIT:// NIO不支持忙等,走SELECT
case SelectStrategy.SELECT: // 隊列中沒有任務要執行
// 下一個要執行的定時任務截止時間
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE;//沒有定時任務
}
nextWakeupNanos.set(curDeadlineNanos);
try {
/*
如果沒有任務要執行,則在下一個任務要執行前,阻塞等待IO事件。
沒有定時任務,則等待超時為Long.MAX_VALUE,無限等待
*/
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
selectCnt = 0;
handleLoopException(e);
continue;
}
selectCnt++;//無效輪詢次數+1,后面會判斷是否重置
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) {
// 優先處理所有的IO事件后再去處理Task
try {
if (strategy > 0) {// 代表有準備就緒的Channel待處理
processSelectedKeys();
}
} finally {
// 處理完IO事件后,執行所有Task
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
// 先處理IO事件,并記錄所花的時間
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// 根據ioTime和ioRatio,計算處理Task能分配的時間
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
/*
有待執行的任務,且Selector.selectNow()返回0,沒有IO事件需要處理,那就先執行少量的Task。
每64個任務檢查一次超時,如果有足夠的任務,那么最少執行64個。
所以,不應該提交耗時任務,阻塞IO線程!!!
*/
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
if (ranTasks || strategy > 0) {
// 如果執行了任務或者有IO事件,說明這次輪詢是有效的,重置selectCnt
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // 意外喚醒時,是否需要重置selectCnt,解決Selector空輪詢Bug
selectCnt = 0;
}
} catch (CancelledKeyException e) {
// Harmless exception - log anyway
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
} catch (Error e) {
throw (Error) e;
} catch (Throwable t) {
handleLoopException(t);
} finally {
// 不管正常/異常停止,都要關閉,釋放資源。
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Error e) {
throw (Error) e;
} catch (Throwable t) {
handleLoopException(t);
}
}
}
}
SelectStrategy是一個選擇策略,其實就是告訴EventLoop線程需要做什么事。
- SELECT:表示當前沒有任務要執行,應該阻塞在Selector.select()上等待就緒的Channel。
- CONTINUE:重試IO循環。
- BUSY_WAIT:忙等,Nio不支持,會走SELECT邏輯。
- 大于0:代表有準備就緒的Channel需要處理。
NioEventLoop在沒有Channel事件,又沒有taskQueue任務時,會調用
nextScheduledTaskDeadlineNanos()計算距離下一次要執行的定時任務還有多長時間,在這之前,它會調用Selector.select(curDeadlineNanos)阻塞等待Channel事件(5微妙內不會阻塞),源碼如下:
// 在下一個定時任務要執行前,等待IO事件
private int select(long deadlineNanos) throws IOException {
if (deadlineNanos == NONE) {
// 沒有定時任務,直接阻塞
return selector.select();
}
// 如果截止時間在5微秒內,超時將為0
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
被喚醒,要么是因為有Channel事件了,要么是超時了需要執行定時任務了,開始走下面的邏輯。
ioRatio代表EventLoop執行IO事件和Task的時間比例,100代表優先執行完所有的IO事件再執行系統任務,否則會根據這個比例去調整執行Task所消耗的時間。
processSelectedKeys()會挨個處理準備就緒的Channel事件,前面說過,Netty默認會使用數組代替HashSet優化SelectionKey,這里會進行判斷:
/*
處理SelectionKey,分為優化后的處理,和普通處理
優化:HashSet<SelectionKey> --> SelectionKey[]
*/
private void processSelectedKeys() {
if (selectedKeys != null) {
// 說明Netty將HashSet優化為數組了,可以高效處理
processSelectedKeysOptimized();
} else {
// 沒優化過,普通處理
processSelectedKeysPlain(selector.selectedKeys());
}
}
不論如何,最終都會遍歷selectedKeys,挨個處理,源碼如下:
// 處理SelectionKey事件
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {//有效性檢查,Channel、Selector可能已經被關閉
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop == this) {
unsafe.close(unsafe.voidPromise());
}
return;
}
try {
// 準備就緒的事件標志位
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
// 連接就緒
unsafe.finishConnect();
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// 數據可寫
ch.unsafe().forceFlush();
}
// 數據可讀、有新的連接接入
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// 對于ServerSocketChannel只關心OP_ACCEPT事件
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
針對不同的就緒事件,會調用Channel.Unsafe對應的方法。
對于OP_CONNECT事件,會調用unsafe.finishConnect()方法,它主要就是判斷連接是否激活,如果激活則觸發ChannelActive回調,并通過Pipeline傳播出去。
對于OP_WRITE事件,會調用ch.unsafe().forceFlush()方法,這里的ch是指客戶端Channel,它會將ChannelOutboundBuffer緩沖的數據轉換成JDK的ByteBuffer并調用底層API通過SocketChannel響應給客戶端。
對于OP_ACCEPT事件,ServerSocketChannel會調用
io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read()方法來接收客戶端連接:
/*
NioEventLoop.processSelectedKey() 當Channel有 OP_READ | OP_ACCEPT 事件時調用該方法。
對于服務端Channel來說,就是 OP_ACCEPT.
*/
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
// 接收對端數據時,ByteBuf的分配策略,基于歷史數據動態調整初始化大小,避免太大浪費空間,太小又會頻繁擴容
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
/*
對于ServerSocketChannel來說,就是接收一個客戶端Channel,添加到readBuf
*/
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
// 遞增已讀取的消息數量
allocHandle.incMessagesRead(localRead);
} while (continueReading(allocHandle));
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 通過pipeline傳播ChannelRead事件
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
// 讀取完畢的回調,有的Handle會根據本次讀取的總字節數,自適應調整下次應該分配的緩沖區大小
allocHandle.readComplete();
// 通過pipeline傳播ChannelReadComplete事件
pipeline.fireChannelReadComplete();
if (exception != null) {// 事件處理異常了
// 是否需要關閉連接
closed = closeonReadError(exception);
// 通過pipeline傳播異常事件
pipeline.fireExceptionCaught(exception);
}
if (closed) {//如果需要關閉,那就關閉
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
主要是看doReadMessages()方法,Netty會調用accept()獲取到一個JDK原生SocketChannel,并把它包裝成Netty的NioSocketChannel:
/*
對于服務端Channel來說,處理 OP_ACCEPT 事件就是從Channel中接收一個客戶端Channel。
*/
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
// 獲取客戶端Channel,調用的就是JDK原生方法:serverSocketChannel.accept()
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
// 將原生SocketChannel包裝成Netty的NioSocketChannel
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
接收到客戶端的連接,并把它封裝成NioSocketChannel,隨后會觸發channelRead回調,在
ServerBootstrapAcceptor.ServerBootstrapAcceptor中,會把客戶端Channel注冊到WorkerGroup中,由WorkerGroup去完成后續的IO讀寫事件,BossGroup只負責連接的建立,這就是經典的Reactor線程模型。
同樣對于OP_ACCEPT事件,SocketChannel會調用
io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read()來接收對端發送的數據:
/*
客戶端發送數據時觸發。
見 io.netty.channel.nio.NioEventLoop.processSelectedKey
*/
@Override
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
// 分配一個ByteBuf,大小能容納可讀數據,又不過于浪費空間。
byteBuf = allocHandle.allocate(allocator);
/*
doReadBytes(byteBuf):ByteBuf內部有ByteBuffer,底層還是調用了SocketChannel.read(ByteBuffer)
allocHandle.lastBytesRead()根據讀取到的實際字節數,自適應調整下次分配的緩沖區大小。
*/
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// 沒數據可讀了.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
// 遞增已經讀取的消息數量
allocHandle.incMessagesRead(1);
readPending = false;
// 通過pipeline傳播ChannelRead事件
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());//判斷是否需要繼續讀
// 讀取完畢,pipeline傳播ChannelReadComplete事件
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
大體邏輯和ServerSocketChannel類似,只是接收到的數據不再是SocketChannel,而是ByteBuf。底層還是調用了JDK原生的SocketChannel.read(ByteBuffer),再將ByteBuffer轉換成Netty的ByteBuf。
數據接收到后,繼續通過Pipeline傳播ChannelRead和ChannelReadComplete回調。
到這里,基本就把EventLoop說的差不多了,整體工作流程已經了解了。細節的地方如:ByteBuf是如何動態分配的,ByteBuf是如何寫出到SocketChannel的等等,這些后面專門寫文章講吧,不然這篇文章太長了。
ServerBootstrap源碼分析
前面分別講了NioEventLoopGroup和NioEventLoop單獨的工作流程,還沒有把整個完整的流程給串起來。作為服務端啟動的引導類,ServerBootstrap是服務端整個啟動流程的入口,核心方法 bind() 會調用initAndRegister()創建一個ServerSocketChannel,并把它注冊到BossGroup的EventLoop的 Selector 上,這樣BossGroup就可以處理連接事件了。但此時是不會有連接事件的,因為還沒有綁定到本地端口,客戶端無法建立連接。
注冊完后,ServerBootstrap隨后會調用doBind0()將ServerSocketChannel綁定到本地端口,至此服務端啟動完成,耐心等待Channel事件即可。
/*
創建一個ServerSocketChannel,并綁定到本地端口
*/
public ChannelFuture bind(SocketAddress localAddress) {
// 數據驗證,group/channelFactory不能為null
validate();
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
private ChannelFuture doBind(final SocketAddress localAddress) {
/*
1.反射創建ServerSocketChannel
2.ServerSocketChannel的初始化,創建Pipeline、設置Options、Attrs。
3.將ServerSocketChannel注冊到EventLoop
此時,EventLoop可以開始輪詢Accept事件了,但是由于還未bind本地端口,所以不會有事件發生。
*/
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
// 如果異常了,直接返回
return regFuture;
}
if (regFuture.isDone()) {
// Register成功了,則開始綁定端口
ChannelPromise promise = channel.newPromise();
/*
將Channel綁定到本地端口,底層還是調用了JDK原生的channel.bind()。
由于bind()是一個出站事件,需要通過Pipeline傳播,所以會轉交給Pipeline執行:pipeline.bind(localAddress, promise)。
最終會傳播到DefaultChannelPipeline的HeadContext.bind(),它又會轉交給Channel.Unsafe.bind()。
Channel.Unsafe.bind()最終會調用JDK原生的javaChannel().bind(),詳見:io.netty.channel.socket.nio.NioServerSocketChannel.doBind()
綁定成功后,會觸發promise的回調
*/
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// 因為是異步的,防止Register還沒完成,通過注冊回調來綁定。
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
先看initAndRegister(),初始化ServerSocketChannel并注冊到BossGroup:
// 初始化和注冊
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
/*
channelFactory根據Channel.class反射創建實例
服務端:ServerSocketChannel
客戶端:SocketChannel
*/
channel = channelFactory.newChannel();
/*
初始化Channel:服務端和客戶端
1.設置ChannelPipeline
2.設置options
3.設置attrs
*/
init(channel);
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// 將Channel注冊到EventLoop,從Group中輪詢出一個EventLoop
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
init()會初始化Channel,分為服務端和客戶端兩種??蛻舳薈hannel初始化很簡單,就是設置Pipeline、Options、Attrs,這里就不貼代碼了。服務端復雜一些,除了設置自身的Pipeline、Options、Attrs,還要負責初始化客戶端接入的Channel,并把它注冊到WorkerGroup:
// 服務端Channel初始化
@Override
void init(Channel channel) {// 這里的channel是ServerSocketChannel
// 設置options
setChannelOptions(channel, newOptionsArray(), logger);
// 設置attrs
setAttributes(channel, newAttributesArray());
// 初始化ServerSocketChannel的ChannelPipeline
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
// 和ServerSocketChannel建立連接的客戶端SocketChannel需要設置的options和attrs
final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
/*
往服務端Channel添加Handler:
1.封裝HandlerAdded回調任務,保存在PendingHandlerCallback
2.后續的register()操作會觸發回調:pipeline.invokeHandlerAddedIfNeeded();
*/
p.addLast(new ChannelInitializer<Channel>() {
/*
initChannel()何時被調用?
ChannelHandler被添加到Pipeline有一個對應的回調:handlerAdded()
addLast()會提交一個任務,讓EventLoop來觸發這個回調
ChannelInitializer在handlerAdded()回調里會執行該初始化方法。
*/
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();//ServerBootstrap.handler()設置的
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// ServerBootstrapAcceptor是服務端接收客戶端連接的核心
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
服務端Channel在初始化Pipeline的時候會添加一個ServerBootstrapAcceptor,它是服務端接收客戶端連接的核心。
先看屬性,它保留了客戶端連接時創建Channel的必要信息:
private final EventLoopGroup childGroup;// Reactor模型中的WorkerGroup
private final ChannelHandler childHandler;// 客戶端Channel的ChannelHandler
private final Entry<ChannelOption<?>, Object>[] childOptions;// 客戶端Channel的Options
private final Entry<AttributeKey<?>, Object>[] childAttrs;// 客戶端Channel的Attrs
private final Runnable enableAutoReadTask; // 啟用自動讀取的任務
構造函數就不貼代碼了,都是屬性賦值操作。
需要重點關注的方法是channelRead(),前面已經分析過了,BossGroup監聽到有客戶端接入時會觸發該回調:
/*
有客戶端連接時,觸發.
見 io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe.read()
*/
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;// 這里的Channel是SocketChannel
// 設置客戶端Channel的Pipeline、Options、Attrs
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
/*
將客戶端Channel注冊到WorkerGroup:
1.next()輪詢出一個EventLoop.register()
2.Channel.Unsafe.register(),Channel注冊到Selector
3.觸發各種回調
Channel一旦注冊到EventLoop,就由該EventLoop負責處理它整個生命周期的所有事件。
*/
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// 如果注冊失敗,強制關閉連接
if (!future.isSuccess()) {
// 底層就是調用原生JDK的關閉方法:javaChannel().close();
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
這里處理的是客戶端的接入,設置Options、Attrs、Pipeline,并注冊到WorkerGroup,后續的所有讀寫事件交給WorkerGroup處理。
在doBind0()沒調用之前,所有的這一切都不會發生,所以最后只要看一下Netty是如何將ServerSocketChannel綁定到本地端口的,整個流程就全部分析結束了。
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// 往Channel綁定的EventLoop提交一個綁定任務,轉交給Channel去執行
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
由于bind()是一個出站事件,所以會轉交給Pipeline執行,需要它把事件傳播出去。
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
Pipeline會從TailContext開始傳播,TailContext會往后尋找能處理bind事件的ChannelHandler:
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
ObjectUtil.checkNotNull(localAddress, "localAddress");
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}
/*
TailContext會往后尋找能處理bind事件的ChannelHandler。
因為是出站事件,所以調用findContextOutbound()
*/
final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {// 讓EventLoop線程串行化處理
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null, false);
}
return promise;
}
如果用戶沒有重寫bind()回調的話,TailContext會把事件傳播給HeadContext,由于bind操作需要和底層API交互,HeadContext會將操作轉交給Channel.Unsafe執行,所以最終會調用
io.netty.channel.AbstractChannel.AbstractUnsafe#bind(),源碼如下:
/*
將ServerSocketChannel綁定到本地端口,如何被觸發的?
1.Bootstrap.bind()會往Channel注冊的EventLoop提交一個任務:Channel.bind()
2.由于bind()是一個出站事件,需要被Pipeline傳播出去,于是會被轉交給Pipeline執行:Pipeline.bind()
3.bind()事件從TailContext開始傳播,不出意外會傳播到HeadContext。
4.HeadContext會再將bind()任務轉交給Channel.Unsafe執行,于是被觸發。
總結:Channel.bind()會將事件通過Pipeline進行傳播,從TailContext到HeadContext。
*/
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();//確保是EventLoop線程執行
// promise標記為不可取消 確保Channel是Open狀態,如果close了就無法bind了
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.iswindows() && !PlatformDependent.maybeSuperUser()) {
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();//連接是否活躍
try {
/*
真正的綁定操作,子類實現。
看NioServerSocketChannel實現,就是調用了JDK原生的javaChannel().bind();
*/
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
// 連接處于活躍狀態,觸發Active回調,往EventLoop提交一個任務,通過Pipeline傳播出去。
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
doBind()又會調回到
NioServerSocketChannel.doBind(),其實就是調用JDK原生的ServerSocketChannel.bind(localAddress , backlog),源碼如下:
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
// 獲取JDK的ServerSocketChannel.bind()
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
綁定成功后就可以正常處理客戶端的接入了,之后客戶端Channel都會由WorkerGroup驅動IO的讀寫。
總結
這篇文章分析了Netty服務端啟動的全流程,從ServerSocketChannel的創建到綁定端口,再到BossGroup驅動客戶端連接的接入和WorkerGroup驅動數據的讀寫。
還重點分析了NioEventLoopGroup和NioEventLoop的工作模式,認真讀完,相信你會對Netty整體的工作機制有所了解。
數據接收ByteBuf的分配,數據write的底層細節沒有介紹到,包括Netty對高性能所作的努力也還沒有過多介紹,考慮到篇幅原因,后面會專門再開一篇文章。
寫到這里就結束了,此時此刻,我的電腦編輯器已經非常卡了,艱難的敲下這段文字后,是時候說再見了?。?!






