NIO的背景
為什么一個(gè)已經(jīng)存在10年的增強(qiáng)包還是JAVA的新I/O包呢?原因是對(duì)于大多數(shù)的Java程序員而言,基本的I/O操作都能夠勝任。在日常工作中,大部分的Java開(kāi)發(fā)者沒(méi)有必要去學(xué)習(xí)NIO。更進(jìn)一步,NIO不僅僅是一個(gè)性能提升包。相反,它是一個(gè)和Java I/O相關(guān)的不同功能的集合。NIO通過(guò)使得Java應(yīng)用的性能“更加接近實(shí)質(zhì)”來(lái)達(dá)到性能提升的效果,也就是意味著NIO和NIO.2的API暴露了低層次的系統(tǒng)操作的入口。NIO的代價(jià)就是它在提供更強(qiáng)大的I/O控制能力的同時(shí),也要求我們比使用基本的I/O編程更加細(xì)心地使用和練習(xí)。NIO的另一特點(diǎn)是它對(duì)于應(yīng)用程序的表現(xiàn)力的關(guān)注,這個(gè)我們會(huì)在下面的練習(xí)中看到。
Java NIO和IO的主要區(qū)別
- 面向流與面向緩沖. Java NIO和IO之間第一個(gè)最大的區(qū)別是,IO是面向流的,NIO是面向緩沖區(qū)的。Java IO面向流意味著每次從流中讀一個(gè)或多個(gè)字節(jié),直至讀取所有字節(jié),它們沒(méi)有被緩存在任何地方。此外,它不能前后移動(dòng)流中的數(shù)據(jù)。如果需要前后移動(dòng)從流中讀取的數(shù)據(jù),需要先將它緩存到一個(gè)緩沖區(qū)。 Java NIO的緩沖導(dǎo)向方法略有不同。數(shù)據(jù)讀取到一個(gè)它稍后處理的緩沖區(qū),需要時(shí)可在緩沖區(qū)中前后移動(dòng)。這就增加了處理過(guò)程中的靈活性。
- 阻塞與非阻塞IO Java IO的各種流是阻塞的。這意味著,當(dāng)一個(gè)線程調(diào)用read() 或 write()時(shí),該線程被阻塞,直到有一些數(shù)據(jù)被讀取,或數(shù)據(jù)完全寫入。該線程在此期間不能再干任何事情了。 Java NIO的非阻塞模式,使一個(gè)線程從某通道發(fā)送請(qǐng)求讀取數(shù)據(jù),但是它僅能得到目前可用的數(shù)據(jù),如果目前沒(méi)有數(shù)據(jù)可用時(shí),該線程可以繼續(xù)做其他的事情。 非阻塞寫也是如此。一個(gè)線程請(qǐng)求寫入一些數(shù)據(jù)到某通道,但不需要等待它完全寫入,這個(gè)線程同時(shí)可以去做別的事情。線程通常將非阻塞IO的空閑時(shí)間用于在其它通道上執(zhí)行IO操作,所以一個(gè)單獨(dú)的線程現(xiàn)在可以管理多個(gè)輸入和輸出通道(channel)。
- 選擇器(Selectors) Java NIO的選擇器允許一個(gè)單獨(dú)的線程來(lái)監(jiān)視多個(gè)輸入通道,你可以注冊(cè)多個(gè)通道使用一個(gè)選擇器,然后使用一個(gè)單獨(dú)的線程來(lái)“選擇”通道:這些通道里已經(jīng)有可以處理的輸入,或者選擇已準(zhǔn)備寫入的通道。這種選擇機(jī)制,使得一個(gè)單獨(dú)的線程很容易來(lái)管理多個(gè)通道。
最佳practice
SelectionKey.OP_WRITE訂閱時(shí)機(jī)
現(xiàn)象: cpu占用超高
原因: 訂閱了SelectionKey.OP_WRITE事件
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); iterator.remove(); if (selectionKey.isConnectable()) { SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); if (socketChannel.isConnectionPending()) { socketChannel.finishConnect(); } socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); }
分析: 當(dāng)socket緩沖區(qū)可寫入時(shí)就會(huì)觸發(fā)OP_WRITE事件. 而socket緩沖區(qū)大多時(shí)間都可寫入(網(wǎng)絡(luò)不擁堵),由于nio水平觸發(fā)的特性O(shè)P_WRITE會(huì)一直觸發(fā)導(dǎo)致while()一直空轉(zhuǎn)
水平觸發(fā): 簡(jiǎn)單解釋為只要滿足條件就一直觸發(fā),而不是發(fā)生狀態(tài)改變時(shí)才觸發(fā)(有點(diǎn)主動(dòng)和被動(dòng)觸發(fā)的感覺(jué))
最佳實(shí)踐:
方案一: 當(dāng)有寫數(shù)據(jù)需求時(shí)訂閱OP_WRITE事件,數(shù)據(jù)發(fā)送完成取消訂閱.
while (channel.isOpen()) { if (channel.isConnected() && writeBuffer.isReadable()) { //writeBuffer可讀 注冊(cè)write事件 channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); } //當(dāng)采用臨時(shí)訂閱OP_WRITE方式 必須使用select(ms)進(jìn)行超時(shí)返回 // 因?yàn)楹苡锌赡墚?dāng)select()前極短時(shí)間內(nèi)writeBuffer有數(shù)據(jù),而此時(shí)沒(méi)有訂閱OP_WRITE事件,會(huì)使select()一直阻塞 int ready = selector.select(300); if (ready > 0) { SelectionKey selectionKey = iterator.next(); iterator.remove(); SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); socketChannel.configureBlocking(false); if (selectionKey.isWritable()) { writeBuffer.flip(); while (writeBuffer.hasRemaining()) { channel.write(writeBuffer); } writeBuffer.clear(); socketChannel.register(selector, SelectionKey.OP_READ); } } }
當(dāng)使用臨時(shí)訂閱OP_WRITE事件方式時(shí),必須使用selector.select(long),進(jìn)行超時(shí)返回. 因?yàn)楹苡锌赡墚?dāng)select()前極短時(shí)間內(nèi)writeBuffer有數(shù)據(jù),而此時(shí)沒(méi)有訂閱OP_WRITE事件,會(huì)使select()一直阻塞
方案二: 不訂閱OP_WRITE事件,直接通過(guò)socketChannel.write()寫數(shù)據(jù).
Selector selector = Selector.open(); channel.register(selector, SelectionKey.OP_CONNECT); channel.connect(new InetSocketAddress("localhost", 5555)); while (channel.isOpen()) { if (channel.isConnected()) { writeBuffer.flip(); while (writeBuffer.hasRemaining()) { channel.write(writeBuffer); } writeBuffer.clear(); } int ready = selector.select(500); ...各種事件處理 }
方案三: 一直訂閱OP_WRITE,socketChannel主動(dòng)寫
while (channel.isOpen()) { //這里與方案一有區(qū)別 可以直接阻塞 int ready = selector.select(); if (ready > 0) { Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { ...緩沖區(qū)已寫數(shù)據(jù)清理 SelectionKey selectionKey = iterator.next(); iterator.remove(); SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); socketChannel.configureBlocking(false); if (selectionKey.isConnectable()) { if (socketChannel.isConnectionPending()) { socketChannel.finishConnect(); } //訂閱讀/寫事件 socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); } if (selectionKey.isReadable()) { ...讀事件處理 } if (selectionKey.isWritable()) { //改為主動(dòng)讀取式 ByteBuffer byteBuffer = awaitGetWrite(writeBuffer, 30, 50); if (byteBuffer != null) { int write = channel.write(byteBuffer); writeBuffer.readerIndex(writeBuffer.readerIndex() + write); if (write != byteBuffer.limit()) { System.out.print("a"); } } } } } } /** * 等待獲取寫緩存 * @param byteBuf 緩沖區(qū) * @param ms 緩沖時(shí)間 防止空轉(zhuǎn) * @param cap 閾值:超過(guò)則直接返回,沒(méi)超過(guò)等待ms后判斷是否超過(guò)閾值 * @return */ public ByteBuffer awaitGetWrite(ByteBuf byteBuf, long ms, int cap) { //緩沖大小 不要過(guò)大就行 自己調(diào)整 int socketCap = 1024 * 30; if (byteBuf.readableBytes() >= cap) {//>=cap直接返回 return ByteBuffer.allocate(byteBuf.readableBytes() > socketCap ? socketCap : byteBuf.readableBytes()); } else {//<cap時(shí)等待 CountDownLatch countDownLatch = new CountDownLatch(1); try { countDownLatch.await(ms, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); } if (byteBuf.readableBytes() > 0) { return ByteBuffer.allocate(byteBuf.readableBytes() > socketCap ? socketCap : byteBuf.readableBytes()); } else { return null; } } }
優(yōu)點(diǎn)缺點(diǎn)方案1當(dāng)網(wǎng)絡(luò)擁堵時(shí),不嘗試寫數(shù)據(jù)需要自己控制訂閱/取消訂閱的時(shí)機(jī)方案2不關(guān)心網(wǎng)絡(luò)擁堵,只要有數(shù)據(jù)就嘗試寫,當(dāng)網(wǎng)絡(luò)擁堵時(shí)做大量無(wú)用功編寫方便,無(wú)需關(guān)心OP_WRITE事件訂閱時(shí)機(jī)方案3相比方案1 編碼復(fù)雜度下降
綜合上述個(gè)人覺(jué)得還是方案3比較好
channel.write()寫數(shù)據(jù)問(wèn)題
現(xiàn)象: 網(wǎng)絡(luò)擁堵時(shí),cpu占用超高
原因: 網(wǎng)絡(luò)擁堵時(shí), channel.write()一直寫不進(jìn)去,導(dǎo)致while()空轉(zhuǎn)
采取上一問(wèn)題方案3可以避免該問(wèn)題
writeBuffer.flip(); while (writeBuffer.hasRemaining()) { channel.write(writeBuffer); } writeBuffer.clear();
分析: 當(dāng)網(wǎng)絡(luò)擁堵時(shí),channel.write()可能寫入0數(shù)據(jù),而這里采用死循環(huán)寫入數(shù)據(jù),假如一直寫不進(jìn)去就會(huì)導(dǎo)致空轉(zhuǎn)
最佳實(shí)踐:
while (writeBuffer.isReadable()) { //這里使用的是netty的ByteBuf ByteBuffer byteBuffer = writeBuffer.nioBuffer(); channel.write(byteBuffer); writeBuffer.readerIndex(writeBuffer.readerIndex() + byteBuffer.position()); int left = byteBuffer.limit() - byteBuffer.position(); if (left != 0) {//無(wú)法全部寫入到socket緩沖區(qū)中,說(shuō)明socket緩沖區(qū)已滿,可能發(fā)生空轉(zhuǎn) break System.err.print("a"); //防止空轉(zhuǎn) 依賴外層循環(huán)重新進(jìn)入 break; } }
結(jié)合OP_WRITE訂閱時(shí)機(jī)問(wèn)題,可以得知方案一的臨時(shí)訂閱OP_WRITE事件方式,能更好的防止channel.write(byteBuffer)空轉(zhuǎn)
TCP斷開(kāi)判斷
現(xiàn)象: 當(dāng)TCP一方斷開(kāi)時(shí),另一方cpu占用超高
原因: 當(dāng)TCP一方斷開(kāi)時(shí),一直會(huì)觸發(fā)OP_READ,導(dǎo)致空轉(zhuǎn).
分析: 當(dāng)TCP一方斷開(kāi)時(shí),觸發(fā)OP_READ,socketChannel.read(readBuffer)返回-1,表示對(duì)方連接已斷開(kāi),自己也需要斷開(kāi)連接socketChannel.close(),否則會(huì)一直觸發(fā)OP_READ,導(dǎo)致空轉(zhuǎn)
while (true) { int ready = selector.select(); if (ready > 0) { Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); iterator.remove(); if (selectionKey.isConnectable()) { SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); if (socketChannel.isConnectionPending()) { socketChannel.finishConnect(); } socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); } else if (selectionKey.isReadable()) { SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); socketChannel.configureBlocking(false); //The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream int read = socketChannel.read(readBuffer); readBuffer.flip(); //讀到-1 沒(méi)有處理 導(dǎo)致空轉(zhuǎn) if (read > 0) { System.out.print(new String(readBuffer.array(), 0, read)); } } ... } } } 復(fù)制代碼
最佳實(shí)踐:
if (selectionKey.isReadable()) { ByteBuffer readBuffer = Server.SocketContext.get(socketChannel).getReadBuffer(); int read = socketChannel.read(readBuffer); readBuffer.flip(); if (read > 0) { System.out.print(new String(readBuffer.array(), 0, read)); } else if (read == -1) {//對(duì)面已斷開(kāi) close System.out.println("斷開(kāi)..." + socketChannel.socket().getRemoteSocketAddress()); socketChannel.close(); } }
ByteBuf使用
ByteBuf,ByteBuffer對(duì)比
特性ByteBuffer1.有position,limit屬性,通過(guò)flip()切換讀寫模式 ,不支持同時(shí)讀/寫 2.定長(zhǎng) 3.直接內(nèi)存ByteBuf1.有rix,wix,cap,maxCap屬性,支持同時(shí)讀/寫 2.自動(dòng)擴(kuò)容 3.直接內(nèi)存,堆內(nèi)存,組合
建議使用ByteBuf
ByteBuf 的clear()和discardReadBytes()對(duì)比
現(xiàn)象: 使用clear()導(dǎo)致丟數(shù)據(jù)
原因: clear()實(shí)現(xiàn)通過(guò) rix=wix=0,假如此時(shí)同時(shí)有數(shù)據(jù)寫入,該部分?jǐn)?shù)據(jù)則丟失
if (selectionKey.isWritable()) { while (writeBuffer.isReadable()) { ByteBuffer byteBuffer = writeBuffer.nioBuffer(); channel.write(byteBuffer); writeBuffer.readerIndex(writeBuffer.readerIndex() + byteBuffer.position()); int left = byteBuffer.limit() - byteBuffer.position(); if (left != 0) {//無(wú)法一次性寫入到緩沖區(qū)中,可能發(fā)生空轉(zhuǎn) break ... break; } else { //清理已發(fā)送數(shù)據(jù) writeBuffer.clear(); } } ... } 復(fù)制代碼
最佳實(shí)踐:
使用discardReadBytes(),其通過(guò)arrayCopy方式并且線程安全,能夠防止數(shù)據(jù)丟失.但頻繁的arrayCopy會(huì)有性能問(wèn)題. 可以使用clear()和discardReadBytes()的組合
if (selectionKey.isWritable()) { while (writeBuffer.isReadable()) { //當(dāng)緩沖區(qū)使用>2/3事 且wix-rix< (maxCap*1/3) 對(duì)緩沖區(qū)進(jìn)行整理 if (writeBuffer.writerIndex() > (writeBuffer.maxCapacity() / 3 * 2) && writeBuffer.writerIndex() - writeBuffer .readerIndex() < (writeBuffer.maxCapacity() / 3)) { System.out.println(String.format("緩沖區(qū)使用超過(guò)2/3 discardReadBytes writerIndex:%d " + "readerIndex:%d", writeBuffer .writerIndex(), writeBuffer.readerIndex())); writeBuffer.discardReadBytes(); } ByteBuffer byteBuffer = writeBuffer.nioBuffer(); channel.write(byteBuffer); writeBuffer.readerIndex(writeBuffer.readerIndex() + byteBuffer.position()); int left = byteBuffer.limit() - byteBuffer.position(); if (left != 0) {//無(wú)法一次性寫入到緩沖區(qū)中,可能發(fā)生空轉(zhuǎn) break ... //防止空轉(zhuǎn) 等待下次write事件 break; } else { //注意clear()的使用 因?yàn)閣riteBuffer一直在寫入 writerIndex可能>readIndex if (writeBuffer.writerIndex() == writeBuffer.readerIndex()) { //TODO 因?yàn)椴皇窃舆^(guò)程 理論上會(huì)有問(wèn)題 但實(shí)際驗(yàn)證中卻沒(méi)問(wèn)題 待驗(yàn)證 writeBuffer.clear(); System.out.println("clear"); } } } ... }
使用快速收斂
在GunNetty中,快速收斂確保Selector中所有的key均為有效key,不包含失效key,該方法一般使用在關(guān)閉channel之后
@Override public int fastLimit() throws IOException { bootSelector.wakeup(); return bootSelector.select(0); }
1.如果正在阻塞輪訓(xùn),立刻終止,使用wakeup函數(shù)
2.立刻select(0)刪除已經(jīng)失效的key