亚洲视频二区_亚洲欧洲日本天天堂在线观看_日韩一区二区在线观看_中文字幕不卡一区

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.430618.com 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

前言

RocketMQ作為一款優秀的開源消息中間件,實現了文件的高性能存儲和讀取,在眾多消息中間件中脫穎而出,其文件模塊設計思想很值得我們學習和借鑒。因此很多開發者在使用的時候,也開始研究其文件存儲的實現原理,但是在學習過程中,由于自身知識儲備不足,往往只能了解其基本原理和整體架構,對于具體是怎么實現是,用到了什么技術,往往是一知半解。目前網上有很多介紹RocketMQ原理和源碼的文章,但是很多都是講解整體架構,對源代碼的分析也僅僅是停留在代碼注釋層面,導致對整體和細節的把握不能統一, 給人一種"不識廬山真面目,只緣身在此山中"的感覺。

筆者針對開發者在研究RocketMQ的過程中遇到的困惑,基于對RocketMQ的文件存儲原理和源碼研究,結合JAVA NIO的文件讀寫,自己動手實現了一個簡化版本的RocketMQ文件系統,分享出來,希望能抽絲剝繭,幫助開發者從本質上理解RocketMQ文件存儲的原理,起到拋磚引玉,舉一反三的作用。

本文不是一篇介紹RocketMQ存儲基本原理的文章,本文假設讀者對RocketMQ的CommitLog,ConsumeQueue,IndexFile已經有一定的了解,熟悉java NIO文件讀寫。本文適合對RocketMQ的文件存儲原理有一定的了解,并且希望進一步了解RocketMQ是如何通過java NIO實現的讀者。

 

核心原理

在向commitLog文件寫入消息的時候,需要記錄該條消息在commitLog文件的偏移量offset(消息在commitLog的起始字節數),讀取的時候根據offset讀取。RocketMQ保存offset的文件為consumeQueue 和indexFile。

 

RockeetMQ文件讀寫流程

 

RocketMQ文件存儲示意圖

RocketMQ文件邏輯存儲結構


RocketMQ文件offset查找示意圖

 

CommitLog讀寫

commitLog文件寫入的是完整的消息,長度不固定,因此讀取的時候只能根據文件存儲偏移量offset讀取。實際上offset保存在consumeQueue,indexFile文件中。

consumeQueue讀寫

consumeQueue在消費方拉取消息的時候讀取,讀取原理比較簡單。

consumeQueue每條數據固定長度是20(8:offset+4:msgLen+8:tagCode),順序寫入,每寫入一條消息,寫入位置postition+20。讀取的時候按消息序號index(第幾條消息)讀取。

假設消費方要消費消息序號index=2的消息(第2條消息),過程如下:

1.定位consumeQueue文件,然后讀位置postition定位到40(2*20),讀取數據。

2.根據1讀取 的數據取到offset值(存儲在consumeQueue的偏移量)。

3.根據2得到的offset值,定位commitLog文件,然后讀取commitLog上的整條消息。

參見RocketMQ文件offset查找示意圖

indexFile讀寫

indexFile由indexHead(長度40),500W個hash槽位(每個槽位長度固定4),2000W個indexData組成。

indexFile是為了方便通過messageId讀取消息而設計的,因此需要將messageId和消息序號index做一層映射,將messageId取模后得到槽位下標(第幾個槽位),然后將當前messageId對應的消息index(消息序號)放到對應的槽位,并將數據順序保存到indexFile的indexData部分。

寫入過程:

1.hash(messageId)%500W得到槽位(slot)的下標slot_index(第幾個槽位,槽位長度固定4),

然后將消息序號index存放到對應的槽位(為簡化設計,暫不考慮hash沖突的情況)。

2.存儲indexData數據,起始存儲位置postition 為

indexDataOffset = 40(文件頭長度) + 500W * 4+(index-1)*20

讀取過程:

1.hash(messageId) % 500W定位到槽位的下標slot_index(第幾個槽位)。

2.然后根據槽位下標計算槽位的偏移量slot_offset(每個槽位的固定長度 是4)。

slot_offset = 40(文件頭長度) + slot_index * 4。

3.然后根據slot_offset獲取到槽位上存儲的消息的序號index。

4.根據消息的index計算該條消息存儲在indexFile的indexData部分的偏移量indexDataOffset,

indexDataOffset = 40(文件頭長度) + 500W * 4+( index - 1 ) * 20

5.根據indexDataOffset讀取indexFile的IndexData部分,然后獲取commitLog的offset,即可讀取到實際的消息。

參見RocketMQ文件offset查找示意圖

 

代碼實現

1.手動生成10個消息,并創建commitLog文件,consumeQueue,indexFile文件

public class CommitLogWriteTest {

    private static Long commitLogOffset = 0L;//8byte(commitlog offset)
    private static List<ConsumerQueueData> consumerQueueDatas = new ArrayList<>();
    private static List<IndexFileItemData> indexFileItemDatas = new ArrayList<>();
    private static int MESSAGE_COUNT = 10;

    public static void main(String[] args) throws IOException {
        createCommitLog();
        createConsumerQueue();
        createIndexFile();
    }

    private static void createCommitLog() throws IOException {
         System.out.println("");
        System.out.println("commitLog file create!" );

        FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/commitLog.txt")),
                StandardOpenOption.WRITE, StandardOpenOption.READ);
        MAppedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600);
        fileChannel.close();
        Random random = new Random();

        int count = 0;
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String topic = "Topic-test";
            String msgId = UUID.randomUUID().toString();
            String msgBody = "消息內容" + "msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsg".substring(0, random.nextInt(48) + 1);//
            long queueOffset = i;//索引偏移量
            String transactionId = UUID.randomUUID().toString();


         /* 數據格式,位置固定
         int totalSize;//消息長度
         String msgId;
         String topic;
         long queueOffset;//索引偏移量
         long bodySize;//消息長度
         byte[] body;//消息內容
         String transactionId;
         long commitLogOffset;//從第一個文件開始算的偏移量

         */

            int msgTotalLen = 8 //msgTotalLen field
                    + 64  //msgId field長度
                    + 64 //topic field長度
                    + 8 //索引偏移量field長度
                    + 8 //消息長度field長度
                    + msgBody.getBytes(StandardCharsets.UTF_8).length //field
                    + 64  //transactionId field長度
                    + 64  //commitLogOffset field長度;
                    ;

            // 定位寫入文件的起始位置
            //如果3個消息長度分別是100,200,350,則偏移量分別是0,100,300
            mappedByteBuffer.position(Integer.valueOf(commitLogOffset + ""));

            mappedByteBuffer.putLong(msgTotalLen);//msgTotalLen
            mappedByteBuffer.put(getBytes(msgId, 64));//msgId
            mappedByteBuffer.put(getBytes(topic, 64));//topic,定長64
            mappedByteBuffer.putLong(queueOffset);//索引偏移量
            mappedByteBuffer.putLong(msgBody.getBytes(StandardCharsets.UTF_8).length);//bodySize
            mappedByteBuffer.put(msgBody.getBytes(StandardCharsets.UTF_8));//body
            mappedByteBuffer.put(getBytes(transactionId, 64));
            mappedByteBuffer.putLong(commitLogOffset);//commitLogOffset
           

            System.out.println("寫入消息,第:" + i + "次");

            System.out.println("msgTotalLen:" + msgTotalLen);
            System.out.println("msgId:" + msgId);
            System.out.println("topic:" + topic);
            System.out.println("msgBody:" + msgBody);
            System.out.println("transactionId:" + transactionId);
            System.out.println("commitLogOffset:" + commitLogOffset);

            ConsumerQueueData consumerQueueData = new ConsumerQueueData();
            consumerQueueData.setOffset(commitLogOffset);
            consumerQueueData.setMsgLength(msgTotalLen);
            consumerQueueData.setTagCode(100L);
            
          //準備生成consumeQueue文件
            consumerQueueDatas.add(consumerQueueData);
            IndexFileItemData indexFileItemData = new IndexFileItemData();
            indexFileItemData.setKeyHash(msgId.hashCode());
            indexFileItemData.setMessageId(msgId);
            indexFileItemData.setPhyOffset(commitLogOffset);
            //準備生成indexFile文件
            indexFileItemDatas.add(indexFileItemData);
            mappedByteBuffer.force();
          
            commitLogOffset = msgTotalLen + commitLogOffset;
             count++;
        }

        System.out.println("commitLog數據保存完成,totalSize:" + count);

    }


    public static void createConsumerQueue() throws IOException {
        System.out.println("");
        System.out.println("ConsumerQueue file create!" );

        FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/consumerQueue.txt")),
                StandardOpenOption.WRITE, StandardOpenOption.READ);
        MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
        fileChannel.close();
        int count = 0;
        for (int i = 0; i < consumerQueueDatas.size(); i++) {
            ConsumerQueueData consumerQueueData = consumerQueueDatas.get(i);
            //指定寫入位置
            mappedByteBuffer.position(i * 20);
            mappedByteBuffer.putLong(consumerQueueData.getOffset());//8byte(commitlog offset)
            mappedByteBuffer.putInt(consumerQueueData.getMsgLength());//4byte (msgLength)
            mappedByteBuffer.putLong(consumerQueueData.getTagCode());//8byte (tagCode)

            count++;
            System.out.println("consumerQueue數據寫入完成:" + JSON.toJSONString(consumerQueueData));
            mappedByteBuffer.force();

        }
        System.out.println("ConsumerQueue數據保存完成count:" + count);


    }


    public static void createIndexFile() throws IOException {
        System.out.println("");
        System.out.println("IndexFile file create!" );

        //文件場創建時間,在寫第一條消息的時候創建
        FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/index.txt")),
                StandardOpenOption.WRITE, StandardOpenOption.READ);
        MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600);
        ByteBuffer headerByteBuffer = mappedByteBuffer.slice();
        long firstDataTime = System.currentTimeMillis();

        fileChannel.close();

        //開始寫hash槽,從頭部后寫入
        /*  已經填充有index的slot數量
          (并不是每個slot槽下都掛載有index索引單元,這 里統計的是所有掛載了index索引單元的slot槽的數量,hash沖突)*/
        int hashSlotCount = 0;

        /* 已該indexFile中包含的索引單元個數(統計出當前indexFile中所有slot槽下掛載的所有index索引單元的數量之和),
        如果沒有hash沖突,hashSlotCount = indexCount*/
        int indexCount = 0;
        //假設建立100個槽位(總長度400)
        int soltNum = 100;

        for (int i = 0; i < MESSAGE_COUNT; i++) {
            IndexFileItemData indexFileItemData = indexFileItemDatas.get(i);
            int keyHash = indexFileItemData.getKeyHash();

            //取模,計算第幾個槽位
            int slotPos = keyHash % 100 > 0 ? keyHash % 100 : -1 * (keyHash % 100);

            // slot存放的文件偏移量(字節長度)
            int absSlotPos = 40 + slotPos * 4;

            // 存儲實際數據的文件偏移量(字節長度)
            int absIndexPos =
                    40 + soltNum * 4
                            + indexCount * 20;


            //將indexCount存到對應的hash槽
            mappedByteBuffer.putInt(absSlotPos, indexCount);

            //寫入數據(IndecFile的實際數據部分)
            mappedByteBuffer.putInt(absIndexPos, indexFileItemData.getKeyHash());//8byte msg hashcode
            mappedByteBuffer.putLong(absIndexPos + 4, indexFileItemData.getPhyOffset());//8byte msg hashcode
            mappedByteBuffer.putInt(absIndexPos + 4 + 8, Integer.valueOf((System.currentTimeMillis() - firstDataTime) + ""));//8byte (timeDiff)
            mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, 0);//8byte (preIndex)暫不考慮hash沖突的情況


            //模擬最后一個文件,寫入header
            if (i == 0) {
                //該indexFile中第一條消息的存儲時間
                headerByteBuffer.putLong(0, firstDataTime);
                //該indexFile種第一條消息在commitlog種的偏移量commitlog offset
                mappedByteBuffer.putLong(16, indexFileItemData.getPhyOffset());
            }
            //模擬第一個文件,寫入header
            if (i == MESSAGE_COUNT - 1) {
                //該indexFile種最后一條消息存儲時間
                headerByteBuffer.putLong(8, System.currentTimeMillis());
                //該indexFile中最后一條消息在commitlog中的偏移量commitlog offset
                headerByteBuffer.putLong(24, indexFileItemData.getPhyOffset());
            }
            //已經填充有index的slot數量
            headerByteBuffer.putInt(32, hashSlotCount + 1);
            //該indexFile中包含的索引單元個數
            headerByteBuffer.putInt(36, indexCount + 1);
            mappedByteBuffer.force();
            System.out.println("msgId:" + indexFileItemData.getMessageId() + ",keyHash:" + keyHash + ",保存槽位為" + slotPos + "的數據,absSlotPos=" + absSlotPos + ",值index=" + indexCount + ",絕對位置:" + absIndexPos + ",commit-phyOffset:" + indexFileItemData.getPhyOffset());

            indexCount++;
            hashSlotCount++;

        }

    }


    //將變長字符串定長byte[],方便讀取
    private static byte[] getBytes(String s, int length) {
        int fixLength = length - s.getBytes().length;
        if (s.getBytes().length < length) {
            byte[] S_bytes = new byte[length];
            System.arraycopy(s.getBytes(), 0, S_bytes, 0, s.getBytes().length);
            for (int x = length - fixLength; x < length; x++) {
                S_bytes[x] = 0x00;
            }
            return S_bytes;
        }
        return s.getBytes(StandardCharsets.UTF_8);
    }

}

運行結果:

commitLog file create!
寫入消息,第:0次
msgTotalLen:338
msgId:8d8eb486-d94c-4da1-bdfe-f0587161ea05
topic:Topic-test
msgBody:消息內容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgm
transactionId:874605e6-69d2-4301-a65e-01e63de75a4d
commitLogOffset:0
寫入消息,第:1次
msgTotalLen:338
msgId:57c74e53-4ea1-4a8c-9c7f-c50417d8681e
topic:Topic-test
msgBody:消息內容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgm
transactionId:b991a3e9-66fc-4a54-97fc-1492f7f54d3c
commitLogOffset:338
寫入消息,第:2次
msgTotalLen:296
msgId:a0c7c833-9811-4f17-800b-847766aef7dd
topic:Topic-test
msgBody:消息內容msgm
transactionId:9a836d21-704f-46ae-926c-b7933efe06a5
commitLogOffset:676
寫入消息,第:3次
msgTotalLen:299
msgId:050d6330-1f4a-4dff-a650-4f7eaee63356
topic:Topic-test
msgBody:消息內容msgmsgm
transactionId:19506313-c7ae-4282-8bc7-1f5ca7735c44
commitLogOffset:972
寫入消息,第:4次
msgTotalLen:306
msgId:f5c5be5b-2d9d-4dd8-a9e3-1fdcacc8c2c5
topic:Topic-test
msgBody:消息內容msgmsgmsgmsgms
transactionId:09f3b762-159e-4486-8820-0bce0ef7972d
commitLogOffset:1271
寫入消息,第:5次
msgTotalLen:313
msgId:e76911ad-8d05-4d0b-b735-0b2f487f89f1
topic:Topic-test
msgBody:消息內容msgmsgmsgmsgmsgmsgmsg
transactionId:42dce613-6aaf-466b-b185-02a3f7917579
commitLogOffset:1577
寫入消息,第:6次
msgTotalLen:321
msgId:05be27f8-fb7a-4662-904f-2263e8899086
topic:Topic-test
msgBody:消息內容msgmsgmsgmsgmsgmsgmsgmsgmsgms
transactionId:6c7db927-911c-4d19-a240-a951fad957bd
commitLogOffset:1890
寫入消息,第:7次
msgTotalLen:318
msgId:9a508d90-30f6-4a25-812f-25d750736afe
topic:Topic-test
msgBody:消息內容msgmsgmsgmsgmsgmsgmsgmsgms
transactionId:0bbc5e92-0a78-4699-a7a4-408e7bd3b897
commitLogOffset:2211
寫入消息,第:8次
msgTotalLen:305
msgId:63249e08-bd0c-4a5b-954b-aea83cb442be
topic:Topic-test
msgBody:消息內容msgmsgmsgmsgm
transactionId:22cc0dd6-2036-4423-8e6f-d7043b953724
commitLogOffset:2529
寫入消息,第:9次
msgTotalLen:329
msgId:93c46c53-b097-4dd0-90d7-06d5d877f489
topic:Topic-test
msgBody:消息內容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgm
transactionId:e9078205-15be-42b1-ad7e-55b9f5e229eb
commitLogOffset:2834
commitLog數據保存完成,totalSize:10

ConsumerQueue file create!
consumerQueue數據寫入完成:{"msgLength":338,"offset":0,"tagCode":100}
consumerQueue數據寫入完成:{"msgLength":338,"offset":338,"tagCode":100}
consumerQueue數據寫入完成:{"msgLength":296,"offset":676,"tagCode":100}
consumerQueue數據寫入完成:{"msgLength":299,"offset":972,"tagCode":100}
consumerQueue數據寫入完成:{"msgLength":306,"offset":1271,"tagCode":100}
consumerQueue數據寫入完成:{"msgLength":313,"offset":1577,"tagCode":100}
consumerQueue數據寫入完成:{"msgLength":321,"offset":1890,"tagCode":100}
consumerQueue數據寫入完成:{"msgLength":318,"offset":2211,"tagCode":100}
consumerQueue數據寫入完成:{"msgLength":305,"offset":2529,"tagCode":100}
consumerQueue數據寫入完成:{"msgLength":329,"offset":2834,"tagCode":100}
ConsumerQueue數據保存完成count:10

IndexFile file create!
msgId:8d8eb486-d94c-4da1-bdfe-f0587161ea05,keyHash:-358470777,保存槽位為77的數據,absSlotPos=348,值index=0,絕對位置:440,commit-phyOffset:338
msgId:57c74e53-4ea1-4a8c-9c7f-c50417d8681e,keyHash:466366793,保存槽位為93的數據,absSlotPos=412,值index=1,絕對位置:460,commit-phyOffset:676
msgId:a0c7c833-9811-4f17-800b-847766aef7dd,keyHash:1237522456,保存槽位為56的數據,absSlotPos=264,值index=2,絕對位置:480,commit-phyOffset:972
msgId:050d6330-1f4a-4dff-a650-4f7eaee63356,keyHash:-1115509881,保存槽位為81的數據,absSlotPos=364,值index=3,絕對位置:500,commit-phyOffset:1271
msgId:f5c5be5b-2d9d-4dd8-a9e3-1fdcacc8c2c5,keyHash:1219778974,保存槽位為74的數據,absSlotPos=336,值index=4,絕對位置:520,commit-phyOffset:1577
msgId:e76911ad-8d05-4d0b-b735-0b2f487f89f1,keyHash:460184183,保存槽位為83的數據,absSlotPos=372,值index=5,絕對位置:540,commit-phyOffset:1890
msgId:05be27f8-fb7a-4662-904f-2263e8899086,keyHash:-339624012,保存槽位為12的數據,absSlotPos=88,值index=6,絕對位置:560,commit-phyOffset:2211
msgId:9a508d90-30f6-4a25-812f-25d750736afe,keyHash:403329587,保存槽位為87的數據,absSlotPos=388,值index=7,絕對位置:580,commit-phyOffset:2529
msgId:63249e08-bd0c-4a5b-954b-aea83cb442be,keyHash:-1569335572,保存槽位為72的數據,absSlotPos=328,值index=8,絕對位置:600,commit-phyOffset:2834
msgId:93c46c53-b097-4dd0-90d7-06d5d877f489,keyHash:597856342,保存槽位為42的數據,absSlotPos=208,值index=9,絕對位置:620,commit-phyOffset:3163

 

2.讀取consumeQueue文件,并根據offset從commitLog讀取一條完整的消息

public class ConsumeQueueMessageReadTest {

    public static MappedByteBuffer mappedByteBuffer = null;
    private static int MESSAGE_COUNT = 10;

    public static void main(String[] args) throws IOException {
        FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/consumerQueue.txt")),
                StandardOpenOption.WRITE, StandardOpenOption.READ);
        MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600);
        fileChannel.close();
        //根據索引下標讀取索引,實際情況是用戶消費的最新點位(for循環的i值),
      //存在在broker的偏移量文件中
       
        int index = 0;
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            mappedByteBuffer.position(i * 20);

            long commitlogOffset = mappedByteBuffer.getLong();
            // System.out.println(commitlogOffset);
            long msgLen = mappedByteBuffer.getInt();
            Long tag = mappedByteBuffer.getLong();
            //System.out.println("======讀取到consumerQueue,commitlogOffset:"+commitlogOffset+",msgLen :"+msgLen+"===");
            //根據偏移量讀取CommitLog
            System.out.println("=================commitlog讀取第:"+index+"消息,偏移量為" + commitlogOffset + "===================");

            readCommitLogByOffset(Integer.valueOf(commitlogOffset + ""));
            index ++;

        }
    }


    public static MappedByteBuffer initFileChannel() throws IOException {
        if (mappedByteBuffer == null) {
            FileChannel commitLogfileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/commitLog.txt")),
                    StandardOpenOption.WRITE, StandardOpenOption.READ);

            mappedByteBuffer = commitLogfileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600);
            commitLogfileChannel.close();
        }

        return mappedByteBuffer;

    }


    /*
     *
     * 根據偏移量讀取commitLog
     * */
    public static void readCommitLogByOffset(int offset) throws IOException {

        /* 存放順序,讀到時候保持順序一致
           b.putLong(totalSize);//totalSize
            b.put(getBytes(msgId, 64));//msgId
            b.put(getBytes(topic, 64));//topic,定長64
            b.putLong(queueOffset);//索引偏移量
            b.putLong(msgBody.getBytes(StandardCharsets.UTF_8).length);//bodySize
            b.put(msgBody.getBytes(StandardCharsets.UTF_8));//body
            b.put(getBytes(transactionId, 64));
            b.putLong(commitLogOffset);//commitLogOffset
        */

        MappedByteBuffer mappedByteBuffer = initFileChannel();
        mappedByteBuffer.position(offset);


        long totalSize = mappedByteBuffer.getLong();//消息長度

        byte[] msgIdByte = new byte[64];//uuid 固定是64
        mappedByteBuffer.get(msgIdByte);

        byte[] topicByte = new byte[64];// 固定是64
        mappedByteBuffer.get(topicByte);
        long queueOffset = mappedByteBuffer.getLong();
        Long bodySize = mappedByteBuffer.getLong();
        int bSize = Integer.valueOf(bodySize + "");
        byte[] bodyByte = new byte[bSize];//bodySize 長度不固定
        mappedByteBuffer.get(bodyByte);
        byte[] transactionIdByte = new byte[64];//uuid 固定是64
        mappedByteBuffer.get(transactionIdByte);
        long commitLogOffset = mappedByteBuffer.getLong();//偏移量
        System.out.println("totalSize:" + totalSize);
        System.out.println("msgId:" + new String(msgIdByte));
        System.out.println("topic:" + new String(topicByte));
        System.out.println("queueOffset:" + queueOffset);
        System.out.println("bodySize:" + bodySize);
        System.out.println("body:" + new String(bodyByte));
        System.out.println("transactionId:" + new String(transactionIdByte));
        System.out.println("commitLogOffset:" + commitLogOffset);

    }

}

運行結果:

=================commitlog讀取第:0消息,偏移量為0===================
totalSize:338
msgId:8d8eb486-d94c-4da1-bdfe-f0587161ea05                            
topic:Topic-test                                                      
queueOffset:0
bodySize:58
body:消息內容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgm
transactionId:874605e6-69d2-4301-a65e-01e63de75a4d                            
commitLogOffset:0
=================commitlog讀取第:1消息,偏移量為338===================
totalSize:338
msgId:57c74e53-4ea1-4a8c-9c7f-c50417d8681e                            
topic:Topic-test                                                      
queueOffset:1
bodySize:58
body:消息內容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgm
transactionId:b991a3e9-66fc-4a54-97fc-1492f7f54d3c                            
commitLogOffset:338
=================commitlog讀取第:2消息,偏移量為676===================
totalSize:296
msgId:a0c7c833-9811-4f17-800b-847766aef7dd                            
topic:Topic-test                                                      
queueOffset:2
bodySize:16
body:消息內容msgm
transactionId:9a836d21-704f-46ae-926c-b7933efe06a5                            
commitLogOffset:676
=================commitlog讀取第:3消息,偏移量為972===================
totalSize:299
msgId:050d6330-1f4a-4dff-a650-4f7eaee63356                            
topic:Topic-test                                                      
queueOffset:3
bodySize:19
body:消息內容msgmsgm
transactionId:19506313-c7ae-4282-8bc7-1f5ca7735c44                            
commitLogOffset:972
=================commitlog讀取第:4消息,偏移量為1271===================
totalSize:306
msgId:f5c5be5b-2d9d-4dd8-a9e3-1fdcacc8c2c5                            
topic:Topic-test                                                      
queueOffset:4
bodySize:26
body:消息內容msgmsgmsgmsgms
transactionId:09f3b762-159e-4486-8820-0bce0ef7972d                            
commitLogOffset:1271
=================commitlog讀取第:5消息,偏移量為1577===================
totalSize:313
msgId:e76911ad-8d05-4d0b-b735-0b2f487f89f1                            
topic:Topic-test                                                      
queueOffset:5
bodySize:33
body:消息內容msgmsgmsgmsgmsgmsgmsg
transactionId:42dce613-6aaf-466b-b185-02a3f7917579                            
commitLogOffset:1577
=================commitlog讀取第:6消息,偏移量為1890===================
totalSize:321
msgId:05be27f8-fb7a-4662-904f-2263e8899086                            
topic:Topic-test                                                      
queueOffset:6
bodySize:41
body:消息內容msgmsgmsgmsgmsgmsgmsgmsgmsgms
transactionId:6c7db927-911c-4d19-a240-a951fad957bd                            
commitLogOffset:1890
=================commitlog讀取第:7消息,偏移量為2211===================
totalSize:318
msgId:9a508d90-30f6-4a25-812f-25d750736afe                            
topic:Topic-test                                                      
queueOffset:7
bodySize:38
body:消息內容msgmsgmsgmsgmsgmsgmsgmsgms
transactionId:0bbc5e92-0a78-4699-a7a4-408e7bd3b897                            
commitLogOffset:2211
=================commitlog讀取第:8消息,偏移量為2529===================
totalSize:305
msgId:63249e08-bd0c-4a5b-954b-aea83cb442be                            
topic:Topic-test                                                      
queueOffsmsgm
transactionId:22cc0dd6-2036-4423-8e6f-d7043b953724                            
commitLogOffset:2529
=================commitlog讀取第:9消息,偏移量為2834===================
totalSize:329
msgId:93c46c53-b097-4dd0-90d7-06d5d877f489                            
topic:Topic-test                                                      
queueOffset:9
bodySize:49
body:消息內容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgm
transactionId:e9078205-15be-42b1-ad7e-55b9f5e229eb                            
commitLogOffset:2834

3.根據messageId讀取indexFile,然后根據偏移量從CommitLog讀取一條完整的消息

 public class IndexFileMessageReadTest {

    public static MappedByteBuffer mappedByteBuffer = null;

    public static void main(String[] args) throws IOException {
        String msgId = "8b78474f-b28a-4442-99a0-6f7883f0302b";
        readByMessageId(msgId);

    }

    private static void readByMessageId(String messageId) throws IOException {
        FileChannel indexFileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/index.txt")),
                StandardOpenOption.WRITE, StandardOpenOption.READ);
        MappedByteBuffer indexMappedByteBuffer = indexFileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
        indexFileChannel.close();

        System.out.println("============get indexFile header===============");
        System.out.println("beginTimestampIndex:" + indexMappedByteBuffer.getLong());
        System.out.println("endTimestampIndex:" + indexMappedByteBuffer.getLong());
        System.out.println("beginPhyoffsetIndex:" + indexMappedByteBuffer.getLong());
        System.out.println("endPhyoffsetIndex:" + indexMappedByteBuffer.getLong());
        System.out.println("hashSlotcountIndex:" + indexMappedByteBuffer.getInt());
        System.out.println("indexCountIndex:" + indexMappedByteBuffer.getInt());
        System.out.println("");

        int keyHash = messageId.hashCode();

        //取模,計算第幾個槽位
        int slotPos = keyHash % 100 > 0 ? keyHash % 100 : -1 * (keyHash % 100);
        System.out.println("messageId:" + messageId + ",取模為:" + slotPos);

        // slot的文件偏移量(字節長度)
        int absSlotPos = 40 + slotPos * 4;
        System.out.println("哈希槽的字節數組位置:(40+" + slotPos + "*4)=" + absSlotPos);


        //獲取hash槽上存取的件索引,第幾個文件
        int index = indexMappedByteBuffer.getInt(absSlotPos);

        //計算數據需要存儲的文件偏移量(字節長度)
        int absIndexPos =
                40 + 100 * 4
                        + index * 20;

        System.out.println("第幾個文件index=" + index + ",實際存儲數據的字節數組位置:(40 + 100 * 4+index *20)=" + absIndexPos);

        long keyHash1 = indexMappedByteBuffer.getInt(absIndexPos);
        long pyhOffset = indexMappedByteBuffer.getLong(absIndexPos + 4);
        int timeDiff = indexMappedByteBuffer.getInt(absIndexPos + 4 + 8);
        int preIndexNo = indexMappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);


        System.out.println("從index獲取到的commitLog偏移量為:" + pyhOffset);
        System.out.println("");

        readCommitLogByOffset((int) pyhOffset);

    }


    public static MappedByteBuffer initFileChannel() throws IOException {
        if (mappedByteBuffer == null) {
            FileChannel commitLogfileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/commitLog.txt")),
                    StandardOpenOption.WRITE, StandardOpenOption.READ);

            mappedByteBuffer = commitLogfileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600);
            commitLogfileChannel.close();
        }

        return mappedByteBuffer;

    }


    /*
     *
     * 根據偏移量讀取CcommitLog
     * */
    public static void readCommitLogByOffset(int offset) throws IOException {


        /*b.putLong(totalSize);//totalSize
            b.put(getBytes(msgId, 64));//msgId
            b.put(getBytes(topic, 64));//topic,定長64
            b.putLong(queueOffset);//索引偏移量
            b.putLong(msgBody.getBytes(StandardCharsets.UTF_8).length);//bodySize
            b.put(msgBody.getBytes(StandardCharsets.UTF_8));//body
            b.put(getBytes(transactionId, 64));
            b.putLong(commitLogOffset);//commitLogOffset
        */
        System.out.println("=================commitlog讀取偏移量為" + offset + "的消息===================");

        MappedByteBuffer mappedByteBuffer = initFileChannel();
        mappedByteBuffer.position(offset);


        long totalSize = mappedByteBuffer.getLong();//消息長度

        byte[] msgIdByte = new byte[64];//uuid 固定是64
        mappedByteBuffer.get(msgIdByte);

        byte[] topicByte = new byte[64];// 固定是64
        mappedByteBuffer.get(topicByte);
        long queueOffset = mappedByteBuffer.getLong();
        Long bodySize = mappedByteBuffer.getLong();
        int bSize = Integer.valueOf(bodySize + "");
        byte[] bodyByte = new byte[bSize];//bodySize 長度不固定
        mappedByteBuffer.get(bodyByte);
        byte[] transactionIdByte = new byte[64];//uuid 固定是64
        mappedByteBuffer.get(transactionIdByte);
        long commitLogOffset = mappedByteBuffer.getLong();//偏移量
        System.out.println("totalSize:" + totalSize);
        System.out.println("msgId:" + new String(msgIdByte));
        System.out.println("topic:" + new String(topicByte));
        System.out.println("queueOffset:" + queueOffset);
        System.out.println("bodySize:" + bodySize);
        System.out.println("body:" + new String(bodyByte));
        System.out.println("transactionId:" + new String(transactionIdByte));
        System.out.println("commitLogOffset:" + commitLogOffset);

    }


    public static byte[] toByteArray(long number) {
        byte length = Long.BYTES;
        byte[] bytes = new byte[length];

        for (byte i = 0; i < length; i++) {
            bytes[length - 1 - i] = (byte) number;
            number >>= 8;
        }

        return bytes;
    }

}

運行結果:

============get indexFile header===============
beginTimestampIndex:1669602898169
endTimestampIndex:1669602898176
beginPhyoffsetIndex:338
endPhyoffsetIndex:3163
hashSlotcountIndex:10
indexCountIndex:10

messageId:9a508d90-30f6-4a25-812f-25d750736afe,取模為:87
哈希槽的字節數組位置:(40+87*4)=388
第幾個文件index=7,實際存儲數據的字節數組位置:(40 + 100 * 4+index *20)=580
從index獲取到的commitLog偏移量為:2529

=================commitlog讀取偏移量為2529的消息===================
totalSize:305
msgId:63249e08-bd0c-4a5b-954b-aea83cb442be                            
topic:Topic-test                                                      
queueOffset:8
bodySize:25
body:消息內容msgmsgmsgmsgm
transactionId:22cc0dd6-2036-4423-8e6f-d7043b953724                            
commitLogOffset:2529

結語

本文基于java NIO實現了RocketMQ的文件系統的最精簡的功能,希望能幫助開發人員加深對RocketMQ文件系統底層實現原理的了解,并能熟練運用Java NIO進行文件讀寫。歡迎一起交流討論,不足的地方歡迎指正。

分享到:
標簽:RocketMQ
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定