亚洲视频二区_亚洲欧洲日本天天堂在线观看_日韩一区二区在线观看_中文字幕不卡一区

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.430618.com 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

想要閱讀Netty源碼的同學,建議從GitHub上把源碼拉下來,方便寫注釋、Debug調試哦~點我去下載!

 

先來看一個簡單的Echo服務端程序,監聽本地的9999端口,有客戶端接入時控制臺輸出一句話,接收到客戶端的數據后直接原樣寫回。

public class EchoServer {
	// 綁定的端口
	private final int port;

	public EchoServer(int port) {
		this.port = port;
	}

	public static void main(String[] args) {
		// 啟動Echo服務
		new EchoServer(9999).start();
	}

	public void start() {
		/*
		bossGroup負責客戶端的接入
		workerGroup負責IO數據的讀寫
		 */
		NioEventLoopGroup boss = new NioEventLoopGroup(1);
		NioEventLoopGroup worker = new NioEventLoopGroup();
		new ServerBootstrap()
				.group(boss, worker)
				.channel(NIOServerSocketChannel.class)
				.childHandler(new ChannelInitializer<SocketChannel>() {
					@Override
					protected void initChannel(SocketChannel sc) throws Exception {
						sc.pipeline().addLast(new ChannelInboundHandlerAdapter(){

							@Override
							public void channelActive(ChannelHandlerContext ctx) throws Exception {
								super.channelActive(ctx);
								System.out.println("有新的客戶端連接...");
							}

							@Override
							public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
								/*
								原樣寫回給客戶端,因為OutBoundHandler還要使用,因此不能釋放msg。
								底層數據寫完后會自動釋放。
								 */
								ctx.writeAndFlush(msg);
							}

							@Override
							public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
								// 出現異常了
								cause.printStackTrace();
								ctx.channel().close();
							}
						});
					}
				})
				.bind(port);
	}
}

代碼還是很簡單的,接下來會一步步分析,僅僅幾行代碼,Netty到底做了什么!

 

NioEventLoopGroup源碼分析

Netty程序要想成功運行,需要EventLoopGroup進行驅動,ServerBootstrap.bind()會將ServerSocketChannel綁定到本地端口,這樣服務端就可以接收客戶端的連接了,但是在這之前,必須確保設置了EventLoopGroup,ServerBootstrap調用bind()前會進行檢查,方法是validate(),源碼如下:

/**
* 驗證必要的參數
*/
public B validate() {
    if (group == null) {//EventLoopGroup必須設置,依賴它驅動程序
        throw new IllegalStateException("group not set");
    }
    if (channelFactory == null) {//依賴channelFactory創建ServerSocketChannel對象
        throw new IllegalStateException("channel or channelFactory not set");
    }
    return self();
}

先來看看類的繼承關系:

Netty服務端啟動全流程源碼分析

 

NioEventLoopGroup實現了ScheduledExecutorService,說明它不僅可以執行異步任務,還可以執行定時任務。實現Iterable接口,是因為EventLoopGroup管理著一組EventLoop,需要對其進行迭代遍歷。
MultithreadEventExecutorGroup代表它是一個多線程的事件執行器,而它管理的EventLoop就是個單線程的事件執行器。

 

先來看構造函數,它的構造函數非常多,我們直接看參數最全的一個:

/**
 * @param nThreads 線程數量,就是NioEventLoop的數量,默認CPU核心數*2
 * @param executor NioEventLoop.run()的執行者,默認為ThreadPerTaskExecutor,NioEventLoop將利用它來啟動一個FastThreadLocalThread并執行
 * @param chooserFactory 選擇器工廠,默認DefaultEventExecutorChooserFactory,輪詢選擇NioEventLoop
 * @param selectorProvider 多路復用器提供者,DefaultSelectorProvider.create()
 * @param selectStrategyFactory select策略工廠,指示EventLoop應該要做什么事情
 * @param rejectedExecutionHandler 拒絕策略
 * @param taskQueueFactory 任務隊列工廠,默認PlatformDependent.newMpscQueue(),Netty實現的高性能無鎖隊列
 */
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                         final SelectorProvider selectorProvider,
                         final SelectStrategyFactory selectStrategyFactory,
                         final RejectedExecutionHandler rejectedExecutionHandler,
                         final EventLoopTaskQueueFactory taskQueueFactory) {
    super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
            rejectedExecutionHandler, taskQueueFactory);
}
  • nThreads:線程數,意味著Group需要創建多少個EventLoop,默認是CPU核心數*2。
  • executor:NioEventLoop.run()的執行者,默認為ThreadPerTaskExecutor,NioEventLoop將利用它來啟動一個FastThreadLocalThread并執行。
  • chooserFactory:選擇器工廠,默認DefaultEventExecutorChooserFactory,輪詢選擇NioEventLoop。
  • selectorProvider:多路復用器提供者,DefaultSelectorProvider.create(),根據平臺會提供對應實現。
  • selectStrategyFactory:select策略工廠,指示EventLoop應該要做什么事情。
  • rejectedExecutionHandler:拒絕策略。
  • taskQueueFactory:任務隊列工廠,默認PlatformDependent.newMpscQueue(),Netty實現的高性能無鎖隊列。

 

NioEventLoopGroup會把參數傳給父類構造器MultithreadEventLoopGroup,這里會對nThreads進行初始化設置:

/**
* 參數太多,以后也可能會改變,后面的參數直接用Object...接收了
*/
protected MultithreadEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                                    Object... args) {
    // 如果nThreads=0,則默認為CPU核心數*2
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, chooserFactory, args);
}

再次調用父類構造器,核心初始化流程在
MultithreadEventExecutorGroup中:

/*
創建一個多線程的事件執行器組
 */
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    // 確保線程數大于0
    checkPositive(nThreads, "nThreads");

    /*
    如果沒提供Executor,則創建默認的ThreadPerTaskExecutor。
    ThreadPerTaskExecutor依賴于一個ThreadFactory,靠它創建線程來執行任務。
    默認的ThreadFactory會使用FastThreadLocalThread來提升FastThreadLocal的性能。
     */
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    // 創建子EventExecutor
    children = new EventExecutor[nThreads];
    for (int i = 0; i < nThreads; i++) {
        boolean success = false;
        try {
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                // EventExecutor創建失敗,停機釋放資源
                for (int j = 0; j < i; j++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    /*
    創建選擇器:簡單輪詢
        PowerOfTwoEventExecutorChooser:2的冪次方,位運算
        GenericEventExecutorChooser:否則,取余
    有事件/任務要執行時,取出一個EventExecutor
     */
    chooser = chooserFactory.newChooser(children);

    // 所有children停止時收到一個通知,優雅停機時用到
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };
    for (EventExecutor e : children) {
        e.terminationFuture().addListener(terminationListener);
    }

    // 返回一個只讀的children,iterator()迭代時使用
    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

EventLoopGroup會管理EventLoop,EventLoop執行任務需要依賴Executor,Executor執行任務需要依賴ThreadFactory創建新的線程,我們看下Netty默認的Executor實現。

 

默認的ThreadFactory,會創建FastThreadLocalThread線程,來優化FastThreadLocal的性能,關于FastThreadLocal后面會有專門的文章介紹。

// 創建一個默認的線程工廠
protected ThreadFactory newDefaultThreadFactory() {
    return new DefaultThreadFactory(getClass());
}

/*
默認的線程工廠
 */
public class DefaultThreadFactory implements ThreadFactory {

    private static final AtomicInteger poolId = new AtomicInteger();

    // 生成線程名稱時用到:prefix+nextId自增
    private final AtomicInteger nextId = new AtomicInteger();
    private final String prefix;//前綴
    private final boolean daemon;//是否守護線程,默認false
    private final int priority;//優先級 默認5
    protected final ThreadGroup threadGroup;//所屬線程組
	
    // 省略部分代碼......
    
    @Override
    public Thread newThread(Runnable r) {
        // 創建一個FastThreadLocalThread線程,優化FastThreadLocal的性能
        Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
        try {
            if (t.isDaemon() != daemon) {
                t.setDaemon(daemon);
            }

            if (t.getPriority() != priority) {
                t.setPriority(priority);
            }
        } catch (Exception ignored) {
            // Doesn't matter even if failed to set.
        }
        return t;
    }

    protected Thread newThread(Runnable r, String name) {
        return new FastThreadLocalThread(threadGroup, r, name);
    }
}

有了ThreadFactory,Executor的實現就很簡單了,當要執行任務的時候,創建一個新線程去跑就好了。EventLoop會在第一次execute()時調用該方法,整個生命周期只會調用一次,即每個EventLoop只會創建一個線程,后續所有的任務,都是在run()方法里無限輪詢去執行。

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
    }

    /*
    執行任務時,利用ThreadFactory創建一個新線程去跑。
    EventLoop會在第一次execute()時調用該方法,整個生命周期只會調用一次,
    即每個EventLoop只會創建一個線程,后續所有的任務,都是在run()方法里無限輪詢去執行。
     */
    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

有了Executor,接下來就會調用newChild()進行children的初始化,對于NioEventLoopGroup來說,它管理的孩子是NioEventLoop,所以newChild()會創建NioEventLoop:

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    // EventLoop需要一個TaskQueue來存放待執行的任務,這里判斷是否有指定QueueFactory,沒有則使用默認的
    EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
    
    // 創建NioEventLoop
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}

EventLoopGroup本身不干活,向它提交任務,它只會交給它的孩子EventLoop執行,所以它依賴一個EventExecutorChooser,當有任務來臨時,從眾多的孩子中挑選出一個,默認的選擇策略就是簡單輪詢。

Netty這里做了一個小小的優化,如果孩子數量是2的冪次方數會使用位運算,否則取模。源碼如下:

public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    // 單例模式,通過INSTANCE提供一個單例對象
    private DefaultEventExecutorChooserFactory() { }

    /*
    創建一個選擇器,從一組EventExecutor中挑選出一個。
    Netty默認的選擇策略就是:簡單輪詢。
     */
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        // 兩種Chooser實現都有一個AtomicLong計數器,每次next()先自增再取余

        // 如果數量是2的冪次方數,則采用位運算
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            // 否則,對長度進行取余
            return new GenericEventExecutorChooser(executors);
        }
    }

    // 是否是2的冪次方數
    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
        /*
        二進制中,最高位是符號位,0正數、1負數。剩下的就是這個數的絕對值部分。
        原碼:設置符號位,其他0填充
        反碼:正數的反碼與原碼相同,負數的反碼:除符號位外,其他位取反
        補碼:正數的補碼與原碼相同,負數的補碼:除符號位外,其他位取反,然后在最后一位加1(計算機使用補碼)

        如下舉例:
            5:00000000 00000000 00000000 00000101(原碼)
            5:00000000 00000000 00000000 00000101(反碼)
            5:00000000 00000000 00000000 00000101(補碼)

           -5:10000000 00000000 00000000 00000101(原碼)
           -5:11111111 11111111 11111111 11111010(反碼)
           -5:11111111 11111111 11111111 11111011(補碼)

           5 & -5 = 00000000 00000000 00000000 00000001 = 1 不是2的冪次方數
           8 & -8 = 00000000 00000000 00000000 00001000
                  & 11111111 11111111 11111111 11111000
                  = 00000000 00000000 00000000 00001000 = 8 是2的冪次方數
         */
    }

    // 2的冪次方數的選擇器,位運算
    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            // 計數器自增 & 長度-1,和HashMap一樣
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

    // 普通的選擇器,取余
    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicLong idx = new AtomicLong();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }
}

有了選擇器后,你向EventLoopGroup提交的任務,包括注冊Channel,它都會輪詢出一個EventLoop轉交任務,源碼如下:

@Override
public ChannelFuture register(Channel channel) {
    // 選出一個孩子,讓它去執行
    return next().register(channel);
}

EventLoopGroup還有一個方法特別有用,那就是shutdownGracefully()優雅停機,調用后它會停止接受新的任務,并把隊列中等待執行的任務(包括定時任務)處理完(Netty不保證100%處理完),然后釋放資源。由于EventLoopGroup本身不干活,因此它依然停止所有的EventLoop,,源碼如下:

public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
    for (EventExecutor l: children) {
        // 將孩子按個停機
        l.shutdownGracefully(quietPeriod, timeout, unit);
    }
    // 返回一個終止Future,停機后收到通知
    return terminationFuture();
}

NioEventLoopGroup差不多就這樣,比較簡單,它只是負責管理EventLoop,核心還是在EventLoop上。

 

NioEventLoop源碼分析

NioEventLoopGroup在創建時,會根據線程數初始化NioEventLoop。NioEventLoop可以看作是一個單線程的線程池,也是真正干活的角色,它的繼承關系如下:

Netty服務端啟動全流程源碼分析

 

NioEventLoop的主要職責是負責處理注冊到其上的Channel的IO事件,除此之外它還可以執行用戶提交的系統任務和定時任務,例如:你可以每隔一段時間檢查一下連接是否斷開,如果斷開,客戶端可以重連,服務端需要及時釋放資源。

 

一個Channel只能被注冊到一個EventLoop上,一個EventLoop可以注冊多個Channel。一旦Channel注冊到EventLoop,該EventLoop就要負責處理它整個生命周期的所有事件。事件以回調的方式被觸發,所有的回調任務會被封裝成一個Runnable放入taskQueue,由EventLoop線程串行化處理。雖然看似「串行化處理」效率低下,但是這避免了線程切換的開銷和數據同步的問題,而且你可以開啟多個EventLoop,并行處理,充分利用CPU資源。

 

先看屬性,如下:

private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.

// 是否禁用SelectionKey優化?默認為false
private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
    SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);

private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;

// Selector重建的閾值,默認512,目的是解決JDK Selector空輪詢Bug
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;

// 當前有幾個準備就緒的Channel?selectStrategy會用到,大于0代表有Channel事件需要處理
private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
        return selectNow();
    }
};

再看構造函數,源碼如下:

/**
 * 創建一個NioEventLoop實例,用來執行注冊在其上的Channel事件
 * @param parent 所屬Group
 * @param executor
 * @param selectorProvider 多路復用器提供者,不同平臺會使用不同實現
 * @param strategy Selector.select()的策略
 * @param rejectedExecutionHandler 拒絕策略
 * @param queueFactory 任務隊列工廠
 */
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
             EventLoopTaskQueueFactory queueFactory) {
    super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
            rejectedExecutionHandler);
    this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
    /*
    每個EventLoop都會有一個Selector,用來監聽注冊在其上的Channel事件。
    對于BossGroup,處理的是Accept。
    對于WorkerGroup,處理的是read、write...
    SelectorTuple:Selector元組,Netty提供了一個Selector包裝,用來優化select()性能
     */
    final SelectorTuple selectorTuple = openSelector();
    this.selector = selectorTuple.selector;
    this.unwrAppedSelector = selectorTuple.unwrappedSelector;
}
  • parent:EventLoop隸屬的EventLoopGroup,由Group來管理和調度。
  • executor:EventLoop需要executor開啟新線程跑自身的run()方法。
  • selectorProvider:多路復用器提供者,不同平臺會使用不同實現。
  • strategy:select策略工廠,指示EventLoop應該要做什么事情。
  • rejectedExecutionHandler:拒絕策略。
  • queueFactory:任務隊列工廠,負責創建taskQueue。

 

NioEventLoop首先創建了兩個TaskQueue來存放待執行的任務,run()方法會不斷消費任務。雖然可以多線程并發的往taskQueue中提交任務,但是由于EventLoop是單線程的,所有taskQueue的生產消費模型是:多生產者單消費者。針對這種消費場景,Netty實現了高性能的無鎖隊列「MpscQueue」,Queue的創建源碼如下:

// 創建TaskQueue,存放待執行的任務
private static Queue<Runnable> newTaskQueue(
    EventLoopTaskQueueFactory queueFactory) {
    if (queueFactory == null) {
        // 默認創建Netty實現MpscQueue:Netty實現的高性能無鎖隊列,適用于多個生產者,單個消費者。
        return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
    }
    return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
}

/*
根據最大隊列數創建Queue。
MpscQueue:Netty實現的高性能無鎖隊列,適用于多個生產者,單個消費者。
多個線程可以并發往EventLoop提交任務,但是EventLoop本身是單線程消費的。
*/
private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
    // This event loop never calls takeTask()
    return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
        : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}

關于MpscQueue,后面會專門寫文章介紹。

 

創建完taskQueue就是調用父類構造器進行相應的賦值操作了,這里略過,下面主要看openSelector()。

每個NioEventLoop被創建時,都會同時創建一個Selector多路復用器,這是JDK提供的,不熟悉的同學去看看JAVA Nio編程。EventLoopGroup會將Channel注冊到NioEventLoop上,實際上就是注冊到Selector上了。這樣NioEventLoop就可以通過Selector來監聽準備就緒的Channel,然后根據事件類型去觸發相應的回調,所以Selector是NioEventLoop的核心。

 

openSelector()會做一個優化,將JDK的SelectorImpl的selectedKeys、publicSelectedKeys屬性由HashSet替換成Netty的SelectedSelectionKeySet,內部是一個數組。當Selector監聽到有準備就緒的Channel時,會往HashSet里添加SelectionKey,當SelectionKey比較多時,就容易發生哈希沖突,時間復雜度會增加,而SelectedSelectionKeySet內部使用數組來保存,避免了哈希沖突,性能會有一定的提升。

/*
打開一個Selector多路復用器
 */
private SelectorTuple openSelector() {
    final Selector unwrappedSelector;//未包裝的原生Selector
    try {
        // 基于SelectorProvider打開一個原生的Selector,這是JDK提供的。
        unwrappedSelector = provider.openSelector();
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }

    // 如果禁用了SelectionKey優化,則unwrappedSelector和selector都指向原生Selector
    if (DISABLE_KEY_SET_OPTIMIZATION) {
        return new SelectorTuple(unwrappedSelector);
    }

    // 否則,使用SelectedSelectionKeySet來優化SelectionKey

    Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                return Class.forName(
                        "sun.nio.ch.SelectorImpl",
                        false,
                        PlatformDependent.getSystemClassLoader());
            } catch (Throwable cause) {
                return cause;
            }
        }
    });

    if (!(maybeSelectorImplClass instanceof Class) ||
        // ensure the current selector implementation is what we can instrument.
        !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
        if (maybeSelectorImplClass instanceof Throwable) {
            Throwable t = (Throwable) maybeSelectorImplClass;
            logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
        }
        return new SelectorTuple(unwrappedSelector);
    }

    final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

    Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                // 反射獲取SelectorImpl的selectedKeys、publicSelectedKeys屬性
                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
                    // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
                    // This allows us to also do this in Java9+ without any extra flags.
                    long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                    long publicSelectedKeysFieldOffset =
                            PlatformDependent.objectFieldOffset(publicSelectedKeysField);

                    if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
                        PlatformDependent.putObject(
                                unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                        PlatformDependent.putObject(
                                unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                        return null;
                    }
                    // We could not retrieve the offset, lets try reflection as last-resort.
                }

                Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                if (cause != null) {
                    return cause;
                }
                cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                if (cause != null) {
                    return cause;
                }

                /*
                通過反射將SelectorImpl的selectedKeys、publicSelectedKeys替換為selectedKeySet來提升性能。
                 */
                selectedKeysField.set(unwrappedSelector, selectedKeySet);
                publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                return null;
            } catch (NoSuchFieldException e) {
                return e;
            } catch (IllegalAccessException e) {
                return e;
            }
        }
    });

    if (maybeException instanceof Exception) {
        selectedKeys = null;
        Exception e = (Exception) maybeException;
        logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
        return new SelectorTuple(unwrappedSelector);
    }
    selectedKeys = selectedKeySet;
    logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
    // 創建一個SelectorTuple,包含一個原生的Selector,和優化過的Selector。
    return new SelectorTuple(unwrappedSelector,
                             new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}

SelectedSelectionKeySet部分源碼如下:

/**
 * Selector的 Set<SelectionKey> selectedKeys
 * 默認用HashSet存儲,當有Channel準備就緒時,會添加到HashSet中,但如果發生沖突,HashSet的時間復雜度是O(n)鏈表/O(log n)紅黑樹
 * Netty通過反射將selectedKeys、publicSelectedKeys替換成SelectedSelectionKeySet
 * 使用數組來避免哈希沖突
 */
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
    // 使用數組來保存,默認長度1024
    SelectionKey[] keys;
    int size;//keys大小

    SelectedSelectionKeySet() {
        keys = new SelectionKey[1024];
    }
    // 省略部分代碼.......
}

創建完Selector,NioEventLoop的初始化就完成了,但此時線程并未啟動,Netty這里做了懶加載處理,只有當EventLoop第一次被調用execute()執行任務時才會通過executor去創建線程跑run()方法。

 

用戶不主動提交任務的前提下,對于BossGroup的EventLoop來說,線程是在調用bind()方法將ServerSocketChannel注冊到EventLoop時被啟動的。對于WorkerGroup的EventLoop來說,線程是在BossGroup接收到客戶端連接時,將SocketChannel注冊到WorkerGroup時被啟動的。

 

不管是ServerSocketChannel.bind()還是接收到客戶端連接,都是要將Channel注冊到EventLoop,再由EventLoop去輪詢處理事件。register()源碼如下:

// 注冊Channel
@Override
public ChannelFuture register(Channel channel) {
    // 創建一個DefaultChannelPromise,再注冊,目的是讓用戶可以在注冊完成時收到通知
    return register(new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    // 轉交給Channel.Unsafe完成
    promise.channel().unsafe().register(this, promise);
    return promise;
}

這里需要說下Channel.Unsafe接口,對于bind()、write()、read()等這類方法,由于需要和底層API交互,Netty對開發者屏蔽了底層實現,不希望由開發者調用這類方法,于是將它們封裝到Channel.Unsafe中,從名字中也能看出來,這些操作是不安全的,開發者盡量不要去自己調用。

 

register()操作的目的其實就是將JDK的SocketChannel注冊到Selector多路復用器上,由于需要和底層API交互,于是轉交給Channel.Unsafe處理,源碼在
io.netty.channel.AbstractChannel.AbstractUnsafe#register(),如下所示:

/*
將Channel注冊到EventLoop,其實就是調用JDK底層的:SocketChannel.register(selector)。
將Channel注冊到多路復用器。
 */
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(eventLoop, "eventLoop");
    // 重復注冊校驗
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    // 檢查是否兼容,Channel和EventLoop模式不能混用,例如Oio和Nio不兼容
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    AbstractChannel.this.eventLoop = eventLoop;
    /*
    當前線程是否是EventLoop線程?
    如果是就直接執行,否則提交一個任務,后面串行化執行。
     */
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}

register()做了一些檢查,然后確保由EventLoop來執行注冊操作,前面說過了,EventLoop會負責處理Channel的所有事件。register0()完成注冊,并觸發相應的事件回調,通過Pipeline傳播出去。

private void register0(ChannelPromise promise) {
    try {
        // 確保Channel是打開狀態
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        // JDK原生channel.register(selector)
        doRegister();
        neverRegistered = false;
        registered = true;

        // 觸發 ChannelHandler.handlerAdded()回調
        pipeline.invokeHandlerAddedIfNeeded();

        // 通知promise操作成功了,觸發回調
        safeSetSuccess(promise);

        // 注冊完成,觸發ChannelRegistered回調,通過pipeline傳播出去
        pipeline.fireChannelRegistered();

        // 如果連接激活了,則觸發active事件,只在首次注冊時會觸發
        if (isActive()) {
            if (firstRegistration) {
                // 觸發ChannelRegistered回調,通過pipeline傳播出去
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    } catch (Throwable t) {
        // 異常了,關閉資源,觸發失敗通知
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

doRegister()會調用JDK底層的注冊,源碼如下:

// 真正調用JDK底層API完成注冊
@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // 獲取Java原生SocketChannel注冊到未包裝的原生Selector上
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                eventLoop().selectNow();
                selected = true;
            } else {
                throw e;
            }
        }
    }
}

完成SocketChannel的注冊后,EventLoop就可以通過輪詢Selector來監聽準備就緒的Channel了,后面就是一系列的事件處理了。

 

在調用
io.netty.channel.AbstractChannel.AbstractUnsafe#register()時,EventLoop線程已經啟動并執行run()方法,在run()方法里,EventLoop線程會執行一個死循環,直到線程被停止。

 

在死循環里,EventLoop線程會不斷輪詢Selector是否有準備就緒的Channel需要處理?taskQueue是否有任務在等待執行?scheduledTaskQueue是否有定時任務需要執行?NioEventLoop.run()是任務處理的關鍵。

@Override
protected void run() {
    /*
    無效空輪詢的次數
    JDK的Selector存在Bug,會導致空輪詢,CPU飆升。
    Netty會檢測Selector.select()空輪詢次數,超過SELECTOR_AUTO_REBUILD_THRESHOLD則重建Selector。
    有效輪詢:要么有IO事件到達、要么執行了Task。
     */
    int selectCnt = 0;
    for (;;) {
        try {
            int strategy;
            try {
                /*
                NioEventLoop的執行策略:
                有任務待執行嗎?
                    沒有:Selector.select()阻塞,等待IO事件到達(定時任務判斷)
                    有:非阻塞調用Selector.selectNow(),
                 */
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                case SelectStrategy.CONTINUE:// 重試IO循環
                    continue;

                case SelectStrategy.BUSY_WAIT:// NIO不支持忙等,走SELECT

                case SelectStrategy.SELECT: // 隊列中沒有任務要執行
                    // 下一個要執行的定時任務截止時間
                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                    if (curDeadlineNanos == -1L) {
                        curDeadlineNanos = NONE;//沒有定時任務
                    }
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        /*
                        如果沒有任務要執行,則在下一個任務要執行前,阻塞等待IO事件。
                        沒有定時任務,則等待超時為Long.MAX_VALUE,無限等待
                         */
                        if (!hasTasks()) {
                            strategy = select(curDeadlineNanos);
                        }
                    } finally {
                        // This update is just to help block unnecessary selector wakeups
                        // so use of lazySet is ok (no race condition)
                        nextWakeupNanos.lazySet(AWAKE);
                    }
                    // fall through
                default:
                }
            } catch (IOException e) {
                // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                // the selector and retry. https://github.com/netty/netty/issues/8566
                rebuildSelector0();
                selectCnt = 0;
                handleLoopException(e);
                continue;
            }

            selectCnt++;//無效輪詢次數+1,后面會判斷是否重置
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            if (ioRatio == 100) {
                // 優先處理所有的IO事件后再去處理Task
                try {
                    if (strategy > 0) {// 代表有準備就緒的Channel待處理
                        processSelectedKeys();
                    }
                } finally {
                    // 處理完IO事件后,執行所有Task
                    ranTasks = runAllTasks();
                }
            } else if (strategy > 0) {
                // 先處理IO事件,并記錄所花的時間
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // 根據ioTime和ioRatio,計算處理Task能分配的時間
                    final long ioTime = System.nanoTime() - ioStartTime;
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } else {
                /*
                有待執行的任務,且Selector.selectNow()返回0,沒有IO事件需要處理,那就先執行少量的Task。
                每64個任務檢查一次超時,如果有足夠的任務,那么最少執行64個。
                所以,不應該提交耗時任務,阻塞IO線程!!!
                 */
                ranTasks = runAllTasks(0); // This will run the minimum number of tasks
            }

            if (ranTasks || strategy > 0) {
                // 如果執行了任務或者有IO事件,說明這次輪詢是有效的,重置selectCnt
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
                selectCnt = 0;
            } else if (unexpectedSelectorWakeup(selectCnt)) { // 意外喚醒時,是否需要重置selectCnt,解決Selector空輪詢Bug
                selectCnt = 0;
            }
        } catch (CancelledKeyException e) {
            // Harmless exception - log anyway
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
        } catch (Error e) {
            throw (Error) e;
        } catch (Throwable t) {
            handleLoopException(t);
        } finally {
            // 不管正常/異常停止,都要關閉,釋放資源。
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Error e) {
                throw (Error) e;
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
}

SelectStrategy是一個選擇策略,其實就是告訴EventLoop線程需要做什么事。

  • SELECT:表示當前沒有任務要執行,應該阻塞在Selector.select()上等待就緒的Channel。
  • CONTINUE:重試IO循環。
  • BUSY_WAIT:忙等,Nio不支持,會走SELECT邏輯。
  • 大于0:代表有準備就緒的Channel需要處理。

 

NioEventLoop在沒有Channel事件,又沒有taskQueue任務時,會調用
nextScheduledTaskDeadlineNanos()計算距離下一次要執行的定時任務還有多長時間,在這之前,它會調用Selector.select(curDeadlineNanos)阻塞等待Channel事件(5微妙內不會阻塞),源碼如下:

// 在下一個定時任務要執行前,等待IO事件
private int select(long deadlineNanos) throws IOException {
    if (deadlineNanos == NONE) {
        // 沒有定時任務,直接阻塞
        return selector.select();
    }
    // 如果截止時間在5微秒內,超時將為0
    long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
    return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}

被喚醒,要么是因為有Channel事件了,要么是超時了需要執行定時任務了,開始走下面的邏輯。

 

ioRatio代表EventLoop執行IO事件和Task的時間比例,100代表優先執行完所有的IO事件再執行系統任務,否則會根據這個比例去調整執行Task所消耗的時間。

 

processSelectedKeys()會挨個處理準備就緒的Channel事件,前面說過,Netty默認會使用數組代替HashSet優化SelectionKey,這里會進行判斷:

/*
處理SelectionKey,分為優化后的處理,和普通處理
優化:HashSet<SelectionKey>  --> SelectionKey[]
 */
private void processSelectedKeys() {
    if (selectedKeys != null) {
        // 說明Netty將HashSet優化為數組了,可以高效處理
        processSelectedKeysOptimized();
    } else {
        // 沒優化過,普通處理
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

不論如何,最終都會遍歷selectedKeys,挨個處理,源碼如下:

// 處理SelectionKey事件
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {//有效性檢查,Channel、Selector可能已經被關閉
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            return;
        }
        if (eventLoop == this) {
            unsafe.close(unsafe.voidPromise());
        }
        return;
    }

    try {
        // 準備就緒的事件標志位
        int readyOps = k.readyOps();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            // 連接就緒
            unsafe.finishConnect();
        }

        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // 數據可寫
            ch.unsafe().forceFlush();
        }

        // 數據可讀、有新的連接接入
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            // 對于ServerSocketChannel只關心OP_ACCEPT事件
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

針對不同的就緒事件,會調用Channel.Unsafe對應的方法。

 

對于OP_CONNECT事件,會調用unsafe.finishConnect()方法,它主要就是判斷連接是否激活,如果激活則觸發ChannelActive回調,并通過Pipeline傳播出去。

 

對于OP_WRITE事件,會調用ch.unsafe().forceFlush()方法,這里的ch是指客戶端Channel,它會將ChannelOutboundBuffer緩沖的數據轉換成JDK的ByteBuffer并調用底層API通過SocketChannel響應給客戶端。

 

對于OP_ACCEPT事件,ServerSocketChannel會調用
io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read()方法來接收客戶端連接:

/*
NioEventLoop.processSelectedKey() 當Channel有 OP_READ | OP_ACCEPT 事件時調用該方法。
對于服務端Channel來說,就是 OP_ACCEPT.
 */
@Override
public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    // 接收對端數據時,ByteBuf的分配策略,基于歷史數據動態調整初始化大小,避免太大浪費空間,太小又會頻繁擴容
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);

    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                /*
                對于ServerSocketChannel來說,就是接收一個客戶端Channel,添加到readBuf
                 */
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }
                // 遞增已讀取的消息數量
                allocHandle.incMessagesRead(localRead);
            } while (continueReading(allocHandle));
        } catch (Throwable t) {
            exception = t;
        }

        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            // 通過pipeline傳播ChannelRead事件
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        // 讀取完畢的回調,有的Handle會根據本次讀取的總字節數,自適應調整下次應該分配的緩沖區大小
        allocHandle.readComplete();
        // 通過pipeline傳播ChannelReadComplete事件
        pipeline.fireChannelReadComplete();

        if (exception != null) {// 事件處理異常了
            // 是否需要關閉連接
            closed = closeonReadError(exception);

            // 通過pipeline傳播異常事件
            pipeline.fireExceptionCaught(exception);
        }

        if (closed) {//如果需要關閉,那就關閉
            inputShutdown = true;
            if (isOpen()) {
                close(voidPromise());
            }
        }
    } finally {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}

主要是看doReadMessages()方法,Netty會調用accept()獲取到一個JDK原生SocketChannel,并把它包裝成Netty的NioSocketChannel:

/*
對于服務端Channel來說,處理 OP_ACCEPT 事件就是從Channel中接收一個客戶端Channel。
 */
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    // 獲取客戶端Channel,調用的就是JDK原生方法:serverSocketChannel.accept()
    SocketChannel ch = SocketUtils.accept(javaChannel());
    try {
        if (ch != null) {
            // 將原生SocketChannel包裝成Netty的NioSocketChannel
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);
        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }
    return 0;
}

接收到客戶端的連接,并把它封裝成NioSocketChannel,隨后會觸發channelRead回調,在
ServerBootstrapAcceptor.ServerBootstrapAcceptor中,會把客戶端Channel注冊到WorkerGroup中,由WorkerGroup去完成后續的IO讀寫事件,BossGroup只負責連接的建立,這就是經典的Reactor線程模型。

 

同樣對于OP_ACCEPT事件,SocketChannel會調用
io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read()來接收對端發送的數據:

/*
客戶端發送數據時觸發。
見 io.netty.channel.nio.NioEventLoop.processSelectedKey
 */
@Override
public final void read() {
    final ChannelConfig config = config();
    if (shouldBreakReadReady(config)) {
        clearReadPending();
        return;
    }
    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            // 分配一個ByteBuf,大小能容納可讀數據,又不過于浪費空間。
            byteBuf = allocHandle.allocate(allocator);
            /*
            doReadBytes(byteBuf):ByteBuf內部有ByteBuffer,底層還是調用了SocketChannel.read(ByteBuffer)
            allocHandle.lastBytesRead()根據讀取到的實際字節數,自適應調整下次分配的緩沖區大小。
             */
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            if (allocHandle.lastBytesRead() <= 0) {
                // 沒數據可讀了.
                byteBuf.release();
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;
                if (close) {
                    // There is nothing left to read as we received an EOF.
                    readPending = false;
                }
                break;
            }

            // 遞增已經讀取的消息數量
            allocHandle.incMessagesRead(1);
            readPending = false;
            // 通過pipeline傳播ChannelRead事件
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } while (allocHandle.continueReading());//判斷是否需要繼續讀

        // 讀取完畢,pipeline傳播ChannelReadComplete事件
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();

        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}
}

大體邏輯和ServerSocketChannel類似,只是接收到的數據不再是SocketChannel,而是ByteBuf。底層還是調用了JDK原生的SocketChannel.read(ByteBuffer),再將ByteBuffer轉換成Netty的ByteBuf。

 

數據接收到后,繼續通過Pipeline傳播ChannelRead和ChannelReadComplete回調。

 

到這里,基本就把EventLoop說的差不多了,整體工作流程已經了解了。細節的地方如:ByteBuf是如何動態分配的,ByteBuf是如何寫出到SocketChannel的等等,這些后面專門寫文章講吧,不然這篇文章太長了。

 

ServerBootstrap源碼分析

 

前面分別講了NioEventLoopGroup和NioEventLoop單獨的工作流程,還沒有把整個完整的流程給串起來。作為服務端啟動的引導類,ServerBootstrap是服務端整個啟動流程的入口,核心方法 bind() 會調用initAndRegister()創建一個ServerSocketChannel,并把它注冊到BossGroup的EventLoop的 Selector 上,這樣BossGroup就可以處理連接事件了。但此時是不會有連接事件的,因為還沒有綁定到本地端口,客戶端無法建立連接。

注冊完后,ServerBootstrap隨后會調用doBind0()將ServerSocketChannel綁定到本地端口,至此服務端啟動完成,耐心等待Channel事件即可。

/*
創建一個ServerSocketChannel,并綁定到本地端口
 */
public ChannelFuture bind(SocketAddress localAddress) {
    // 數據驗證,group/channelFactory不能為null
    validate();
    return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}

private ChannelFuture doBind(final SocketAddress localAddress) {
    /*
    1.反射創建ServerSocketChannel
    2.ServerSocketChannel的初始化,創建Pipeline、設置Options、Attrs。
    3.將ServerSocketChannel注冊到EventLoop
    此時,EventLoop可以開始輪詢Accept事件了,但是由于還未bind本地端口,所以不會有事件發生。
     */
    final ChannelFuture regFuture = initAndRegister();

    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        // 如果異常了,直接返回
        return regFuture;
    }

    if (regFuture.isDone()) {
        // Register成功了,則開始綁定端口
        ChannelPromise promise = channel.newPromise();
        /*
        將Channel綁定到本地端口,底層還是調用了JDK原生的channel.bind()。
        由于bind()是一個出站事件,需要通過Pipeline傳播,所以會轉交給Pipeline執行:pipeline.bind(localAddress, promise)。
        最終會傳播到DefaultChannelPipeline的HeadContext.bind(),它又會轉交給Channel.Unsafe.bind()。
        Channel.Unsafe.bind()最終會調用JDK原生的javaChannel().bind(),詳見:io.netty.channel.socket.nio.NioServerSocketChannel.doBind()
        綁定成功后,會觸發promise的回調
         */
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // 因為是異步的,防止Register還沒完成,通過注冊回調來綁定。
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    promise.setFailure(cause);
                } else {
                    promise.registered();
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

先看initAndRegister(),初始化ServerSocketChannel并注冊到BossGroup:

// 初始化和注冊
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        /*
        channelFactory根據Channel.class反射創建實例
            服務端:ServerSocketChannel
            客戶端:SocketChannel
         */
        channel = channelFactory.newChannel();

        /*
        初始化Channel:服務端和客戶端
            1.設置ChannelPipeline
            2.設置options
            3.設置attrs
         */
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            channel.unsafe().closeForcibly();
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    // 將Channel注冊到EventLoop,從Group中輪詢出一個EventLoop
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

init()會初始化Channel,分為服務端和客戶端兩種??蛻舳薈hannel初始化很簡單,就是設置Pipeline、Options、Attrs,這里就不貼代碼了。服務端復雜一些,除了設置自身的Pipeline、Options、Attrs,還要負責初始化客戶端接入的Channel,并把它注冊到WorkerGroup:

// 服務端Channel初始化
@Override
void init(Channel channel) {// 這里的channel是ServerSocketChannel
    // 設置options
    setChannelOptions(channel, newOptionsArray(), logger);
    // 設置attrs
    setAttributes(channel, newAttributesArray());

    // 初始化ServerSocketChannel的ChannelPipeline
    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    // 和ServerSocketChannel建立連接的客戶端SocketChannel需要設置的options和attrs
    final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);

    /*
    往服務端Channel添加Handler:
        1.封裝HandlerAdded回調任務,保存在PendingHandlerCallback
        2.后續的register()操作會觸發回調:pipeline.invokeHandlerAddedIfNeeded();
     */
    p.addLast(new ChannelInitializer<Channel>() {
        /*
        initChannel()何時被調用?
            ChannelHandler被添加到Pipeline有一個對應的回調:handlerAdded()
            addLast()會提交一個任務,讓EventLoop來觸發這個回調
            ChannelInitializer在handlerAdded()回調里會執行該初始化方法。
         */
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();//ServerBootstrap.handler()設置的
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    // ServerBootstrapAcceptor是服務端接收客戶端連接的核心
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

服務端Channel在初始化Pipeline的時候會添加一個ServerBootstrapAcceptor,它是服務端接收客戶端連接的核心。

 

先看屬性,它保留了客戶端連接時創建Channel的必要信息:

private final EventLoopGroup childGroup;// Reactor模型中的WorkerGroup
private final ChannelHandler childHandler;// 客戶端Channel的ChannelHandler
private final Entry<ChannelOption<?>, Object>[] childOptions;// 客戶端Channel的Options
private final Entry<AttributeKey<?>, Object>[] childAttrs;// 客戶端Channel的Attrs
private final Runnable enableAutoReadTask; // 啟用自動讀取的任務

構造函數就不貼代碼了,都是屬性賦值操作。

 

需要重點關注的方法是channelRead(),前面已經分析過了,BossGroup監聽到有客戶端接入時會觸發該回調:

/*
有客戶端連接時,觸發.
見 io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe.read()
 */
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;// 這里的Channel是SocketChannel

    // 設置客戶端Channel的Pipeline、Options、Attrs
    child.pipeline().addLast(childHandler);
    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);

    try {
        /*
        將客戶端Channel注冊到WorkerGroup:
            1.next()輪詢出一個EventLoop.register()
            2.Channel.Unsafe.register(),Channel注冊到Selector
            3.觸發各種回調
        Channel一旦注冊到EventLoop,就由該EventLoop負責處理它整個生命周期的所有事件。
         */
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                // 如果注冊失敗,強制關閉連接
                if (!future.isSuccess()) {
                    // 底層就是調用原生JDK的關閉方法:javaChannel().close();
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

這里處理的是客戶端的接入,設置Options、Attrs、Pipeline,并注冊到WorkerGroup,后續的所有讀寫事件交給WorkerGroup處理。

 

在doBind0()沒調用之前,所有的這一切都不會發生,所以最后只要看一下Netty是如何將ServerSocketChannel綁定到本地端口的,整個流程就全部分析結束了。

private static void doBind0(
    final ChannelFuture regFuture, final Channel channel,
    final SocketAddress localAddress, final ChannelPromise promise) {

    // 往Channel綁定的EventLoop提交一個綁定任務,轉交給Channel去執行
   	channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

由于bind()是一個出站事件,所以會轉交給Pipeline執行,需要它把事件傳播出去。

@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}

Pipeline會從TailContext開始傳播,TailContext會往后尋找能處理bind事件的ChannelHandler:

@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(localAddress, "localAddress");
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }

    /*
    TailContext會往后尋找能處理bind事件的ChannelHandler。
    因為是出站事件,所以調用findContextOutbound()
     */
    final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {// 讓EventLoop線程串行化處理
        next.invokeBind(localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null, false);
    }
    return promise;
}

如果用戶沒有重寫bind()回調的話,TailContext會把事件傳播給HeadContext,由于bind操作需要和底層API交互,HeadContext會將操作轉交給Channel.Unsafe執行,所以最終會調用
io.netty.channel.AbstractChannel.AbstractUnsafe#bind(),源碼如下:

/*
將ServerSocketChannel綁定到本地端口,如何被觸發的?
1.Bootstrap.bind()會往Channel注冊的EventLoop提交一個任務:Channel.bind()
2.由于bind()是一個出站事件,需要被Pipeline傳播出去,于是會被轉交給Pipeline執行:Pipeline.bind()
3.bind()事件從TailContext開始傳播,不出意外會傳播到HeadContext。
4.HeadContext會再將bind()任務轉交給Channel.Unsafe執行,于是被觸發。
總結:Channel.bind()會將事件通過Pipeline進行傳播,從TailContext到HeadContext。
 */
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();//確保是EventLoop線程執行

    // promise標記為不可取消 確保Channel是Open狀態,如果close了就無法bind了
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
        localAddress instanceof InetSocketAddress &&
        !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
        !PlatformDependent.iswindows() && !PlatformDependent.maybeSuperUser()) {
        logger.warn(
                "A non-root user can't receive a broadcast packet if the socket " +
                "is not bound to a wildcard address; binding to a non-wildcard " +
                "address (" + localAddress + ") anyway as requested.");
    }

    boolean wasActive = isActive();//連接是否活躍
    try {
        /*
        真正的綁定操作,子類實現。
        看NioServerSocketChannel實現,就是調用了JDK原生的javaChannel().bind();
         */
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    if (!wasActive && isActive()) {
        // 連接處于活躍狀態,觸發Active回調,往EventLoop提交一個任務,通過Pipeline傳播出去。
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}

doBind()又會調回到
NioServerSocketChannel.doBind(),其實就是調用JDK原生的ServerSocketChannel.bind(localAddress , backlog),源碼如下:

@SuppressJava6Requirement(reason = "Usage guarded by java version check")
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        // 獲取JDK的ServerSocketChannel.bind()
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

綁定成功后就可以正常處理客戶端的接入了,之后客戶端Channel都會由WorkerGroup驅動IO的讀寫。

 

總結

這篇文章分析了Netty服務端啟動的全流程,從ServerSocketChannel的創建到綁定端口,再到BossGroup驅動客戶端連接的接入和WorkerGroup驅動數據的讀寫。

還重點分析了NioEventLoopGroup和NioEventLoop的工作模式,認真讀完,相信你會對Netty整體的工作機制有所了解。

 

數據接收ByteBuf的分配,數據write的底層細節沒有介紹到,包括Netty對高性能所作的努力也還沒有過多介紹,考慮到篇幅原因,后面會專門再開一篇文章。

 

寫到這里就結束了,此時此刻,我的電腦編輯器已經非常卡了,艱難的敲下這段文字后,是時候說再見了?。?!

分享到:
標簽:Netty
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定