亚洲视频二区_亚洲欧洲日本天天堂在线观看_日韩一区二区在线观看_中文字幕不卡一区

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.430618.com 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

Disruptor是一個開源框架,研發的初衷是為了解決高并發下隊列鎖的問題,最早由LMAX提出并使用,能夠在無鎖的情況下實現隊列的并發操作,并號稱能夠在一個線程里每秒處理6百萬筆訂單

官網:lmax-exchange.github.io/disruptor/

目前,包括Apache Storm、Camel、Log4j2在內的很多知名項目都應用了Disruptor以獲取高性能

為什么會產生Disruptor框架

「目前JAVA內置隊列保證線程安全的方式:」

ArrayBlockingQueue:基于數組形式的隊列,通過加鎖的方式,來保證多線程情況下數據的安全;

LinkedBlockingQueue:基于鏈表形式的隊列,也通過加鎖的方式,來保證多線程情況下數據的安全;

ConcurrentLinkedQueue:基于鏈表形式的隊列,通過CAS的方式

我們知道,在編程過程中,加鎖通常會嚴重地影響性能,所以盡量用無鎖方式,就產生了Disruptor這種無鎖高并發框架

基本概念

參考地址:github.com/LMAX-Exchan…

RingBuffer——Disruptor底層數據結構實現,核心類,是線程間交換數據的中轉地;

Sequencer——序號管理器,生產同步的實現者,負責消費者/生產者各自序號、序號柵欄的管理和協調,Sequencer有單生產者,多生產者兩種不同的模式,里面實現了各種同步的算法;

Sequence——序號,聲明一個序號,用于跟蹤ringbuffer中任務的變化和消費者的消費情況,disruptor里面大部分的并發代碼都是通過對Sequence的值同步修改實現的,而非鎖,這是disruptor高性能的一個主要原因;

SequenceBarrier——序號柵欄,管理和協調生產者的游標序號和各個消費者的序號,確保生產者不會覆蓋消費者未來得及處理的消息,確保存在依賴的消費者之間能夠按照正確的順序處理

EventProcessor——事件處理器,監聽RingBuffer的事件,并消費可用事件,從RingBuffer讀取的事件會交由實際的生產者實現類來消費;它會一直偵聽下一個可用的序號,直到該序號對應的事件已經準備好。

EventHandler——業務處理器,是實際消費者的接口,完成具體的業務邏輯實現,第三方實現該接口;代表著消費者。

Producer——生產者接口,第三方線程充當該角色,producer向RingBuffer寫入事件。

Wait Strategy:Wait Strategy決定了一個消費者怎么等待生產者將事件(Event)放入Disruptor中。

高性能無鎖并發框架Disruptor,太強了

 

等待策略

源碼地址:github.com/LMAX-Exchan…

「BlockingWaitStrategy」

Disruptor的默認策略是BlockingWaitStrategy。在BlockingWaitStrategy內部是使用鎖和condition來控制線程的喚醒。BlockingWaitStrategy是最低效的策略,但其對CPU的消耗最小并且在各種不同部署環境中能提供更加一致的性能表現。

「SleepingWaitStrategy」

SleepingWaitStrategy 的性能表現跟 BlockingWaitStrategy 差不多,對 CPU 的消耗也類似,但其對生產者線程的影響最小,通過使用LockSupport.parkNanos(1)來實現循環等待。

「YieldingWaitStrategy」

YieldingWaitStrategy是可以使用在低延遲系統的策略之一。YieldingWaitStrategy將自旋以等待序列增加到適當的值。在循環體內,將調用Thread.yield()以允許其他排隊的線程運行。在要求極高性能且事件處理線數小于 CPU 邏輯核心數的場景中,推薦使用此策略;例如,CPU開啟超線程的特性。

「BusySpinWaitStrategy」

性能最好,適合用于低延遲的系統。在要求極高性能且事件處理線程數小于CPU邏輯核心數的場景中,推薦使用此策略;例如,CPU開啟超線程的特性。

「PhasedBackoffWaitStrategy」

自旋 + yield + 自定義策略,CPU資源緊缺,吞吐量和延遲并不重要的場景。

使用舉例

參考地址:github.com/LMAX-Exchan…

<dependency>
      <groupId>com.lmax</groupId>
      <artifactId>disruptor</artifactId>
      <version>3.3.4</version>
   </dependency>
//定義事件event  通過Disruptor 進行交換的數據類型。
public class LongEvent {    private Long value;    public Long getValue() {
        return value;
    }    public void setValue(Long value) {        this.value = value;    }}
public class LongEventFactory implements EventFactory<LongEvent> {
    public LongEvent newInstance() {
        return new LongEvent();
    }}
//定義事件消費者
public class LongEventHandler implements EventHandler<LongEvent>  {    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {         System.out.println("消費者:"+event.getValue());
    }}
//定義生產者
public class LongEventProducer {
    public final RingBuffer<LongEvent> ringBuffer;
    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }
    public void onData(ByteBuffer byteBuffer) {
        // 1.ringBuffer 事件隊列 下一個槽
        long sequence = ringBuffer.next();
        Long data = null;
        try {
            //2.取出空的事件隊列
            LongEvent longEvent = ringBuffer.get(sequence);
            data = byteBuffer.getLong(0);
            //3.獲取事件隊列傳遞的數據
            longEvent.setValue(data);
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        } finally {
            System.out.println("生產這準備發送數據");
            //4.發布事件
            ringBuffer.publish(sequence);
        }
    }
}
public class DisruptorMain {
    public static void main(String[] args) {
        // 1.創建一個可緩存的線程 提供線程來出發Consumer 的事件處理
        ExecutorService executor = Executors.newCachedThreadPool();
        // 2.創建工廠
        EventFactory<LongEvent> eventFactory = new LongEventFactory();
        // 3.創建ringBuffer 大小
        int ringBufferSize = 1024 * 1024; // ringBufferSize大小一定要是2的N次方
        // 4.創建Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executor,
                ProducerType.SINGLE, new YieldingWaitStrategy());
        // 5.連接消費端方法
        disruptor.handleEventsWith(new LongEventHandler());
        // 6.啟動
        disruptor.start();
        // 7.創建RingBuffer容器
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        // 8.創建生產者
        LongEventProducer producer = new LongEventProducer(ringBuffer);
        // 9.指定緩沖區大小
        ByteBuffer byteBuffer = ByteBuffer.allocate(8);
        for (int i = 1; i <= 100; i++) {
            byteBuffer.putLong(0, i);
            producer.onData(byteBuffer);
        }
        //10.關閉disruptor和executor
        disruptor.shutdown();
        executor.shutdown();
    }
}

核心設計原理

Disruptor通過以下設計來解決隊列速度慢的問題:

「環形數組結構:」

為了避免垃圾回收,采用數組而非鏈表。同時,數組對處理器的緩存機制更加友好

?

原因:CPU緩存是由很多個緩存行組成的。每個緩存行通常是64字節,并且它有效地引用主內存中的一塊兒地址。一個Java的long類型變量是8字節,因此在一個緩存行中可以存8個long類型的變量。CPU每次從主存中拉取數據時,會把相鄰的數據也存入同一個緩存行。在訪問一個long數組的時候,如果數組中的一個值被加載到緩存中,它會自動加載另外7個。因此你能非??斓谋闅v這個數組。

?

「元素位置定位:」

數組長度2^n,通過位運算,加快定位的速度。下標采取遞增的形式。不用擔心index溢出的問題。index是long類型,即使100萬QPS的處理速度,也需要30萬年才能用完。

「無鎖設計:」

每個生產者或者消費者線程,會先申請可以操作的元素在數組中的位置,申請到之后,直接在該位置寫入或者讀取數據,整個過程通過原子變量CAS,保證操作的線程安全

數據結構

框架使用RingBuffer來作為隊列的數據結構,RingBuffer就是一個可自定義大小的環形數組。

除數組外還有一個序列號(sequence),用以指向下一個可用的元素,供生產者與消費者使用。

原理圖如下所示:

高性能無鎖并發框架Disruptor,太強了

 

Sequence

mark:Disruptor通過順序遞增的序號來編號管理通過其進行交換的數據(事件),對數據(事件)的處理過程總是沿著序號逐個遞增處理。

「數組+序列號設計的優勢是什么呢?」

回顧一下HashMap,在知道索引(index)下標的情況下,存與取數組上的元素時間復雜度只有O(1),而這個index我們可以通過序列號與數組的長度取模來計算得出,index=sequence % table.length。當然也可以用位運算來計算效率更高,此時table.length必須是2的冪次方。

寫數據流程

單線程寫數據的流程:

  1. 申請寫入m個元素;
  2. 若是有m個元素可以入,則返回最大的序列號。這兒主要判斷是否會覆蓋未讀的元素;
  3. 若是返回的正確,則生產者開始寫入元素。
高性能無鎖并發框架Disruptor,太強了

 

使用場景

經過測試,Disruptor的的延時和吞吐量都比ArrayBlockingQueue優秀很多,所以,當你在使用ArrayBlockingQueue出現性能瓶頸的時候,你就可以考慮采用Disruptor的代替。

參考:github.com/LMAX-Exchan…

高性能無鎖并發框架Disruptor,太強了

 


高性能無鎖并發框架Disruptor,太強了

 

當然,Disruptor性能高并不是必然的,所以,是否使用還得經過測試。

Disruptor的最常用的場景就是“生產者-消費者”場景,對場景的就是“一個生產者、多個消費者”的場景,并且要求順序處理。

舉個例子,我們從MySQL的BigLog文件中順序讀取數據,然后寫入到ElasticSearch(搜索引擎)中。在這種場景下,BigLog要求一個文件一個生產者,那個是一個生產者。而寫入到ElasticSearch,則嚴格要求順序,否則會出現問題,所以通常意義上的多消費者線程無法解決該問題,如果通過加鎖,則性能大打折扣

作者:月伴飛魚
鏈接:https://juejin.im/post/6869795029800452103
來源:掘金

分享到:
標簽:Disruptor
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定