教育行業(yè)A股IPO第一股(股票代碼 003032)

全國咨詢/投訴熱線:400-618-4000

Java培訓(xùn):java高性能并發(fā)計數(shù)器之巔峰對決

更新時間:2023年01月03日14時56分 來源:傳智教育 瀏覽次數(shù):

好口碑IT培訓(xùn)

  并發(fā)計數(shù)器各個方案介紹

  方案概述

  1. jdk5提供的原子更新長整型類 AtomicLong

  2. synchronized

  3. jdk8提供的 LongAdder 【單機(jī)推薦】

  4. Redisson分布式累加器【分布式推薦】

  方案介紹

  jdk5提供的原子更新長整型類 AtomicLong

  在JDK1.5開始就新增了并發(fā)的Integer/Long的操作工具類AtomicInteger和AtomicLong。

  AtomicLong 利用底層操作系統(tǒng)的CAS來保證原子性,在一個死循環(huán)內(nèi)不斷執(zhí)行CAS操作,直到操作成功。不過,CAS操作的一個問題是在并發(fā)量比較大的時候,可能很多次的執(zhí)行CAS操作都不成功,這樣性能就受到較大影響。

  示例代碼

AtomicLong value = new AtomicLong(0);  //定義
incrementAndGet(); //遞增1
```

  synchronized

  synchronized是一個重量級鎖,主要是因為線程競爭鎖會引起操作系統(tǒng)用戶態(tài)和內(nèi)核態(tài)切換,浪費(fèi)資源效率不高,在jdk1.5之前,synchronized沒有做任何優(yōu)化,但在jdk1.6做了性能優(yōu)化,它會經(jīng)歷偏向鎖,輕量級鎖,最后才到重量級鎖這個過程,在性能方面有了很大的提升,在jdk1.7的ConcurrentHashMap是基于ReentrantLock的實現(xiàn)了鎖,但在jdk1.8之后又替換成了synchronized,就從這一點可以看出JVM團(tuán)隊對synchronized的性能還是挺有信心的。下面我們分別來介紹下無鎖,偏向鎖,輕量級鎖,重量級鎖。

  jdk8提供的 LongAdder 【單機(jī)推薦】

  在JDK8中又新增了LongAdder,這是一個針對Long類型的數(shù)據(jù)的操作工具類。

  那我們知道,在ConcurrentHashMap中,對Map分割成多個segment,這樣多個Segment的操作就可以并行執(zhí)行,從而可以提高性能。在JDK8中,LongAdder與ConcurrentHashMap類似,將內(nèi)部操作數(shù)據(jù)value分離成一個Cell數(shù)組,每個線程訪問時,通過Hash等算法映射到其中一個Cell上。

  計算最終的數(shù)據(jù)結(jié)果,則是各個Cell數(shù)組的累計求和。

  LongAddr常用api方法

add():  //增加指定的數(shù)值;
increament(): //增加1;
decrement(): //減少1;
intValue();  //intValue();/floatValue()/doubleValue():得到最終計數(shù)后的結(jié)果
sum()://求和,得到最終計數(shù)結(jié)果
sumThenReset()://求和得到最終計數(shù)結(jié)果,并重置value。
```

  Redisson分布式累加器【分布式推薦】

  基于Redis的Redisson分布式整長型累加器(LongAdder)采用了與java.util.concurrent.atomic.LongAdder類似的接口。通過利用客戶端內(nèi)置的LongAdder對象,為分布式環(huán)境下遞增和遞減操作提供了很高得性能。據(jù)統(tǒng)計其性能最高比分布式AtomicLong對象快 10000 倍以上。

RLongAddr itheimaLongAddr = redission.getLongAddr("itheimaLongAddr");
itheimaLongAddr.add(100);  //添加指定數(shù)量
itheimaLongAddr.increment(); //遞增1
itheimaLongAddr.increment(); //遞減1
itheimaLongAddr.sum(); //聚合求和
```

  基于Redis的Redisson分布式雙精度浮點累加器(DoubleAdder)采用了與java.util.concurrent.atomic.DoubleAdder類似的接口。通過利用客戶端內(nèi)置的DoubleAdder對象,為分布式環(huán)境下遞增和遞減操作提供了很高得性能。據(jù)統(tǒng)計其性能最高比分布式AtomicDouble對象快 12000 倍。

  示例代碼

RLongDouble itheimaDouble = redission.getLongDouble("itheimaLongDouble");
itheimaDouble.add(100);  //添加指定數(shù)量
itheimaDouble.increment(); //遞增1
itheimaDouble.increment(); //遞減1
itheimaDouble.sum(); //聚合求和
```

  以上【整長型累加器】和【雙精度浮點累加器】完美適用于分布式統(tǒng)計計量場景。

  各個方案性能測試

  測試代碼

```
package com.itheima._01性能比較;

import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

/**
 * @author 黑馬程序員
 */
public class CountTest {

    private int count = 0;

    @Test
    public void startCompare() {
        compareDetail(1, 100 * 10000);
        compareDetail(20, 100 * 10000);
        compareDetail(30, 100 * 10000);
        compareDetail(40, 100 * 10000);
        compareDetail(100, 100 * 10000);
    }

    /**
     * @param threadCount 線程數(shù)
     * @param times 每個線程增加的次數(shù)
     */
    public void compareDetail(int threadCount, int times) {
        try {
            System.out.println(String.format("threadCount: %s, times: %s", threadCount, times));
            long start = System.currentTimeMillis();
            testSynchronized(threadCount, times);
            System.out.println("testSynchronized cost: " + (System.currentTimeMillis() - start));

            start = System.currentTimeMillis();
            testAtomicLong(threadCount, times);
            System.out.println("testAtomicLong cost: " + (System.currentTimeMillis() - start));

            start = System.currentTimeMillis();
            testLongAdder(threadCount, times);
            System.out.println("testLongAdder cost: " + (System.currentTimeMillis() - start));
            System.out.println();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void testSynchronized(int threadCount, int times) throws InterruptedException {
        List<Thread> threadList = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            threadList.add(new Thread(()-> {
                for (int j = 0; j < times; j++) {
                    add();
                }
            }));
        }
        for (Thread thread : threadList) {
            thread.start();
        }
        for (Thread thread : threadList) {
            thread.join();
        }
    }

    public synchronized void add() {
        count++;
    }

    public void testAtomicLong(int threadCount, int times) throws InterruptedException {
        AtomicLong count = new AtomicLong();
        List<Thread> threadList = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            threadList.add(new Thread(()-> {
                for (int j = 0; j < times; j++) {
                    count.incrementAndGet();
                }
            }));
        }
        for (Thread thread : threadList) {
            thread.start();
        }
        for (Thread thread : threadList) {
            thread.join();
        }
    }

    public void testLongAdder(int threadCount, int times) throws InterruptedException {
        LongAdder count = new LongAdder();
        List<Thread> threadList = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            threadList.add(new Thread(()-> {
                for (int j = 0; j < times; j++) {
                    count.increment();
                }
            }));
        }
        for (Thread thread : threadList) {
            thread.start();
        }
        for (Thread thread : threadList) {
            thread.join();
        }
    }
}
```

  運(yùn)行結(jié)果

threadCount: 1, times: 1000000
testSynchronized cost: 69
testAtomicLong cost: 16
testLongAdder cost: 15

threadCount: 20, times: 1000000
testSynchronized cost: 639
testAtomicLong cost: 457
testLongAdder cost: 59

threadCount: 30, times: 1000000
testSynchronized cost: 273
testAtomicLong cost: 538
testLongAdder cost: 70

threadCount: 40, times: 1000000
testSynchronized cost: 312
testAtomicLong cost: 717
testLongAdder cost: 81

threadCount: 100, times: 1000000
testSynchronized cost: 719
testAtomicLong cost: 2098
testLongAdder cost: 225
```

  結(jié)論

  

1661393077095_1.png

  并發(fā)量比較低的時候AtomicLong優(yōu)勢比較明顯,因為AtomicLong底層是一個樂觀鎖,不用阻塞線程,不斷cas即可。但是在并發(fā)比較高的時候用synchronized比較有優(yōu)勢,因為大量線程不斷cas,會導(dǎo)致cpu持續(xù)飆高,反而會降低效率

  LongAdder無論并發(fā)量高低,優(yōu)勢都比較明顯。且并發(fā)量越高,優(yōu)勢越明顯

  原理分析

  AtomicLong 實現(xiàn)原子操作原理

  非原子操作示例代碼

package com.itheima._02Unsafe測試;

import java.util.ArrayList;
import java.util.List;

/**
 * @author 黑馬程序員
 */
public class Test1 {

    private int value = 0;

    public static void main(String[] args) throws InterruptedException {
        Test1 test1 = new Test1();
        test1.increment();
        System.out.println("期待值:" + 100 * 100 + ",最終結(jié)果值:" + test1.value);
        //結(jié)果,期待值:10000,最終結(jié)果值:xxxx
    }

    private void increment() throws InterruptedException {
        List<Thread> list = new ArrayList<>();
        //啟動100個線程,每個線程對value進(jìn)行累加100次
        for (int i = 0; i < 100; i++) {
            Thread t = new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    value++;
                }
            });
            list.add(t);
            t.start();
        }

        //保證所有線程運(yùn)行完成
        for (Thread thread : list) {
            thread.join();
        }
    }

}
```

  運(yùn)行效果

1661393298046_2.jpg

  結(jié)論

  > 可以發(fā)現(xiàn)輸出的結(jié)果值錯誤,這是因為 `value++` 不是一個原子操作,它將 `value++` 拆分成了 3 個步驟 `load、add、store`,多線程并發(fā)有可能上一個線程 add 過后還沒有 store 下一個線程又執(zhí)行了 load 了這種重復(fù)造成得到的結(jié)果可能比最終值要小。

  AtomicLong是JDK1.5提供的原子操作示例代碼

package com.itheima._03AtomicLong的CAS原子操作示例;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @author 黑馬程序員
 */
public class Test2 {

    private AtomicLong value = new AtomicLong(0);

    public static void main(String[] args) throws InterruptedException {
        Test2 test1 = new Test2();
        test1.increment();
        System.out.println("期待值:" + 100 * 100 + ",最終結(jié)果值:" + test1.value);
        //結(jié)果,期待值:10000,最終結(jié)果值:10000
    }

    private void increment() throws InterruptedException {
        List<Thread> list = new ArrayList<>();
        //啟動100個線程,每個線程對value進(jìn)行累加100次
        for (int i = 0; i < 100; i++) {
            Thread t = new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    value.incrementAndGet();
                }
            });
            list.add(t);
            t.start();
        }

        //保證所有線程運(yùn)行完成
        for (Thread thread : list) {
            thread.join();
        }
    }

}
```

  運(yùn)行效果

1661393347277_3.jpg

  AtomicLong CAS原理介紹

  1.使用volatile保證內(nèi)存可見性,獲取主存中最新的操作數(shù)據(jù)

  2.使用CAS(Compare-And-Swap)操作保證數(shù)據(jù)原子性

  CAS算法是jdk對并發(fā)操作共享數(shù)據(jù)的支持,包含了3個操作數(shù)

  第一個操作數(shù):內(nèi)存值value(V)

  第二個操作數(shù):預(yù)估值expect(O)

  第三個操作數(shù):更新值new(N)

  含義:CAS比較交換的過程可以通俗的理解為CAS(V,O,N),包含三個值分別為:V 內(nèi)存地址(主存)存放的實際值;O 預(yù)期的值(舊值);N 更新的新值。當(dāng)V和O相同時,也就是說舊值和內(nèi)存中實際的值相同表明該值沒有被其他線程更改過,即該舊值O就是目前來說最新的值了,自然而然可以將新值N賦值給V;當(dāng)V和O不相同時,會一致循環(huán)下去直至修改成功。

  AtomicLong底層CAS實現(xiàn)原子操作原理

  查看incrementAndGet()方法源碼

public final long incrementAndGet() {
    return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
}
```

getAndAddLong方法源碼

```java
public final long getAndAddLong(Object var1, long var2, long var4) {
    long var6;
    do {
        var6 = this.getLongVolatile(var1, var2);
    } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));

    return var6;
}
```

  > 這里是一個循環(huán)CAS操作

  compareAndSwapLong方法源碼

public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
```

  我們發(fā)現(xiàn)調(diào)用的是 native 的 `unsafe.compareAndSwapLong(Object obj, long valueOffset, Long expect, Long update)`,我們翻看 Hotspot 源碼發(fā)現(xiàn)在 unsafe.cpp 中定義了這樣一段代碼

  > Unsafe中基本都是調(diào)用native方法,那么就需要去JVM里面找對應(yīng)的實現(xiàn)。

  >

  > 到`http://hg.openjdk.java.net/` 進(jìn)行一步步選擇下載對應(yīng)的hotspot版本,我這里下載的是`http://hg.openjdk.java.net/jdk8u/jdk8u60/hotspot/archive/tip.tar.gz`,

  >

  > 然后解hotspot目錄,發(fā)現(xiàn) `\src\share\vm\prims\unsafe.cpp`,這個就是對應(yīng)jvm相關(guān)的c++實現(xiàn)類了。

  >

  > 比如我們對CAS部分的實現(xiàn)很感興趣,就可以在該文件中搜索compareAndSwapInt,此時可以看到對應(yīng)的JNI方法為`Unsafe_CompareAndSwapInt`

UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapLong(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jlong e, jlong x))
  UnsafeWrapper("Unsafe_CompareAndSwapLong");
  Handle p (THREAD, JNIHandles::resolve(obj));
  jlong* addr = (jlong*)(index_oop_from_field_offset_long(p(), offset));
#ifdef SUPPORTS_NATIVE_CX8
  return (jlong)(Atomic::cmpxchg(x, addr, e)) == e;
#else
  if (VM_Version::supports_cx8())
    return (jlong)(Atomic::cmpxchg(x, addr, e)) == e;
  else {
    jboolean success = false;
    MutexLockerEx mu(UnsafeJlong_lock, Mutex::_no_safepoint_check_flag);
    jlong val = Atomic::load(addr);
    if (val == e) { Atomic::store(x, addr); success = true; }
    return success;
  }
#endif
UNSAFE_END
```

  Atomic::cmpxchg c++源碼

  可以看到調(diào)用了“Atomic::cmpxchg”方法,“Atomic::cmpxchg”方法在linux_x86和windows_x86的實現(xiàn)如下。

  linux_x86的實現(xiàn):

1661393503544_4.jpg

inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {
  int mp = os::is_MP();
  __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
                    : "=a" (exchange_value)
                    : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
                    : "cc", "memory");
  return exchange_value;
}
```

  windows_x86的實現(xiàn)(c++源文件):

1661393567504_5.jpg

inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {
  // alternative for InterlockedCompareExchange
  int mp = os::is_MP();
  __asm {
    mov edx, dest
    mov ecx, exchange_value
    mov eax, compare_value
    LOCK_IF_MP(mp)
    cmpxchg dword ptr [edx], ecx
  }
}
```

  Atomic::cmpxchg方法解析:

  mp是“os::is_MP()”的返回結(jié)果,“os::is_MP()”是一個內(nèi)聯(lián)函數(shù),用來判斷當(dāng)前系統(tǒng)是否為多處理器。

  如果當(dāng)前系統(tǒng)是多處理器,該函數(shù)返回1。

  否則,返回0。

  LOCK_IF_MP(mp)會根據(jù)mp的值來決定是否為cmpxchg指令添加lock前綴。

  如果通過mp判斷當(dāng)前系統(tǒng)是多處理器(即mp值為1),則為cmpxchg指令添加lock前綴。

  否則,不加lock前綴。

  這是一種優(yōu)化手段,認(rèn)為單處理器的環(huán)境沒有必要添加lock前綴,只有在多核情況下才會添加lock前綴,因為lock會導(dǎo)致性能下降。cmpxchg是匯編指令,作用是比較并交換操作數(shù)。

  > 底層會調(diào)用cmpxchg匯編指令,如果是多核處理器會加鎖實現(xiàn)原子操作

  反匯編指令查詢

  查看java程序運(yùn)行的匯編指令資料

1661393650563_6.jpg

  將上圖2個文件拷貝到j(luò)re\bin目錄下,如下圖

1661393663582_7.jpg

  配置運(yùn)行參數(shù)

  ```

  -server -Xcomp -XX:+UnlockDiagnosticVMOptions -XX:+PrintAssembly -XX:CompileCommand=compileonly,*

  ```

1661393691738_8.jpg

1661393707679_9.jpg

1661393719174_10.jpg

  運(yùn)行Test2效果

1661393737130_11.jpg

  synchronized 實現(xiàn)同步操作原理

  鎖對象

  java中任何一個對象都可以稱為鎖對象,原因在于java對象在內(nèi)存中存儲結(jié)構(gòu),如下圖所示:

1661393750947_12.jpg

  在對象頭中主要存儲的主要是一些運(yùn)行時的數(shù)據(jù),如下所示:

1661393763314_13.jpg

  其中 在Mark Work中存儲著該對象作為鎖時的一些信息,如下所示是Mark Work中在64位系統(tǒng)中詳細(xì)信息:

1661393777046_14.jpg

  偏向鎖

  在無競爭環(huán)境中(沒有并發(fā))使用一種鎖

  > 偏向鎖的作用是當(dāng)有線程訪問同步代碼或方法時,線程只需要判斷對象頭的Mark Word中判斷一下是否有偏向鎖指向線程ID.

  >

  > 偏向鎖記錄過程

  >

  > - 線程搶到了對象的同步鎖(鎖標(biāo)志為01參考上圖即無其他線程占用)

  > - 對象Mark World 將是否偏向標(biāo)志位設(shè)置為1

  > - 記錄搶到鎖的線程ID

  > - 進(jìn)入偏向狀態(tài)

  輕量級鎖

  當(dāng)有另外一個線程競爭獲取這個鎖時,由于該鎖已經(jīng)是偏向鎖,當(dāng)發(fā)現(xiàn)對象頭 Mark Word 中的線程 ID 不是自己的線程 ID,就會進(jìn)行 CAS 操作獲取鎖,**如果獲取成功**,直接替換 Mark Word 中的線程 ID 為自己的 ID,該鎖會保持偏向鎖狀態(tài);**如果獲取鎖失敗**,代表當(dāng)前鎖有一定的競爭,偏向鎖將升級為輕量級鎖。

  - 舉個例子來說明一下什么時候需要升級偏向鎖

  假設(shè)A線程 持有鎖 X(此時X是偏向鎖) 這是有個B線程也同樣用到了鎖X,而B線程在檢查鎖對象的Mark World時發(fā)現(xiàn)偏向鎖的線程ID已經(jīng)指向了線程A。這時候就需要升級鎖X為輕量級鎖。輕量級鎖意味著標(biāo)示該資源現(xiàn)在處于競爭狀態(tài)。

  當(dāng)有其他線程想訪問加了輕量級鎖的資源時,會使用自旋鎖優(yōu)化,來進(jìn)行資源訪問。

  > 自旋策略

  >

  > JVM 提供了一種自旋鎖,可以通過自旋方式不斷嘗試獲取鎖,從而避免線程被掛起阻塞。這是基于大多數(shù)情況下,線程持有鎖的時間都不會太長,畢竟線程被掛起阻塞可能會得不償失。

  >

  > 從 JDK1.7 開始,自旋鎖默認(rèn)啟用,自旋次數(shù)由 JVM 設(shè)置決定,這里我不建議設(shè)置的重試次數(shù)過多,因為 CAS 重試操作意味著長時間地占用 CPU。自旋鎖重試之后如果搶鎖依然失敗,同步鎖就會升級至重量級鎖,鎖標(biāo)志位改為 10。在這個狀態(tài)下,未搶到鎖的線程都會進(jìn)入 Monitor,之后會被阻塞在 _WaitSet 隊列中。

  重量級鎖

  自旋失敗,很大概率 再一次自選也是失敗,因此直接升級成重量級鎖,進(jìn)行線程阻塞,減少cpu消耗。

  當(dāng)鎖升級為重量級鎖后,未搶到鎖的線程都會被阻塞,進(jìn)入阻塞隊列。

  重量級鎖在高并發(fā)下性能就會變慢,因為所有沒有獲取鎖的線程會進(jìn)行阻塞等待,到獲取鎖的時候被喚醒,這些操作都是消耗很多資源。

  輕量級鎖膨脹流程圖

1661393802532_15.jpg

  LongAdder 實現(xiàn)原子操作原理

  LongAdder實現(xiàn)高并發(fā)計數(shù)實現(xiàn)思路

  LongAdder實現(xiàn)高并發(fā)的秘密就是用空間換時間,對一個值的cas操作,變成對多個值的cas操作,當(dāng)獲取數(shù)量的時候,對這多個值加和即可。

1661393816329_16.jpg

  測試代碼

```
package com.itheima._04LongAddr使用測試;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.LongBinaryOperator;

/**
 * @author 黑馬程序員
 */
public class Test3 {

    private LongAdder value = new LongAdder(); //默認(rèn)初始值0

    public static void main(String[] args) throws InterruptedException {
        Test3 test1 = new Test3();
        test1.increment();
        System.out.println("期待值:" + 100 * 100 + ",最終結(jié)果值:" + test1.value.sum());
        //結(jié)果,期待值:10000,最終結(jié)果值:10000
    }

    private void increment() throws InterruptedException {
        List<Thread> list = new ArrayList<>();
        //啟動100個線程,每個線程對value進(jìn)行累加100次
        for (int i = 0; i < 100; i++) {
            Thread t = new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    value.increment();
                }
            });
            list.add(t);
            t.start();
        }

        //保證所有線程運(yùn)行完成
        for (Thread thread : list) {
            thread.join();
        }

    }

}
```

  源碼分析

  1. 先對base變量進(jìn)行cas操作,cas成功后返回

  2. 對線程獲取一個hash值(調(diào)用getProbe),hash值對數(shù)組長度取模,定位到cell數(shù)組中的元素,對數(shù)組中的元素進(jìn)行cas

  增加數(shù)量源碼

public void increment() {
    add(1L);
}
```

```java
public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}
```

  當(dāng)數(shù)組不為空,并且根據(jù)線程hash值定位到數(shù)組某個下標(biāo)中的元素不為空,對這個元素cas成功則直接返回,否則進(jìn)入longAccumulate方法

1661393945975_17.jpg

  1. cell數(shù)組已經(jīng)初始化完成,主要是在cell數(shù)組中放元素,對cell數(shù)組進(jìn)行擴(kuò)容等操作

  2. cell數(shù)組沒有初始化,則對數(shù)組進(jìn)行初始化

  3. cell數(shù)組正在初始化,這時其他線程利用cas對baseCount進(jìn)行累加操作

  完整代碼

final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) {
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {       // Try to attach new Cell
                    Cell r = new Cell(x);   // Optimistically create
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {      // Expand table unless stale
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = advanceProbe(h);
        }
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                    Cell[] rs = new Cell[2];
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}
```

  獲取計算數(shù)量源碼

public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}
```

  需要注意的是,調(diào)用sum()返回的數(shù)量有可能并不是當(dāng)前的數(shù)量,因為在調(diào)用sum()方法的過程中,可能有其他數(shù)組對base變量或者cell數(shù)組進(jìn)行了改動,所以需要確保所有線程運(yùn)行完再獲取就是準(zhǔn)確值

  LongAdder 的前世今生

  其實在 Jdk1.7 時代,LongAdder 還未誕生時,就有一些人想著自己去實現(xiàn)一個高性能的計數(shù)器了,比如一款 Java 性能監(jiān)控框架 dropwizard/metrics 就做了這樣事,在早期版本中,其優(yōu)化手段并沒有 Jdk1.8 的 LongAdder 豐富,而在 metrics 的最新版本中,其已經(jīng)使用 Jdk1.8 的 LongAdder 替換掉了自己的輪子。在最后的測評中,我們將 metrics 版本的 LongAdder 也作為一個參考對象。

  應(yīng)用場景

  AtomicLong等原子類的使用

  并發(fā)少競爭少(讀多寫少)的計數(shù)原子操作

  LongAdder 的使用

  高性能計數(shù)器的首選方案, 單體項目建議使用LongAddr,分布式環(huán)境建議使用Redisson分布式累加器

  應(yīng)用場景功能:獲取全局自增id值

  Synchronized與Lock的使用比較

  Synchronized 適合少量的同步并發(fā)競爭

  Lock 適合大量的同步并發(fā)競爭

1661394039753_18.jpg

  總結(jié)

  并發(fā)情況優(yōu)化鎖思路:

  互斥鎖 -> 樂觀鎖 -> 鎖的粒度控制

  在Java中對應(yīng)的實現(xiàn)方式:

  ReentrantLock或者Syschronized -> CAS + Volatile -> 拆分競爭點(longAddr,分布式累加器,ConcurrentHashMap等)

  ReentrantLock或者Syschronized 在高并發(fā)時都存在獲取鎖等待、阻塞、喚醒等操作,所以在使用的使用注意拆分競爭點。

  AtomicLong

  1. 并發(fā)量非常高,可能導(dǎo)致都在不停的爭搶該值,可能導(dǎo)致很多線程一致處于循環(huán)狀態(tài)而無法更新數(shù)據(jù),從而導(dǎo)致 CPU 資源的消耗過高。解決這個問題需要使用LongAdder

  2. ABA 問題,比如說上一個線程增加了某個值,又改變了某個值,然后后面的線程以為數(shù)據(jù)沒有發(fā)生過變化,其實已經(jīng)被改動了。解決這個問題請參考《擴(kuò)展:原子更新字段類-ABA問題解決》

  synchronized

  synchronized鎖升級實際上是把本來的悲觀鎖變成了 在一定條件下 使用無所(同樣線程獲取相同資源的偏向鎖),以及使用樂觀(自旋鎖 cas)和一定條件下悲觀(重量級鎖)的形式。

  偏向鎖:適用于單線程適用鎖的情況

  輕量級鎖:適用于競爭較不激烈的情況(這和樂觀鎖的使用范圍類似)

  重量級鎖:適用于競爭激烈的情況

  LongAdder

  - AtomicLong :并發(fā)場景下讀性能優(yōu)秀,寫性能急劇下降,不適合作為高性能的計數(shù)器方案。內(nèi)需求量少。

  - LongAdder :并發(fā)場景下寫性能優(yōu)秀,讀性能由于組合求值的原因,不如直接讀值的方案,但由于計數(shù)器場景寫多讀少的緣故,整體性能在幾個方案中最優(yōu),是高性能計數(shù)器的首選方案。由于 Cells 數(shù)組以及緩存行填充的緣故,占用內(nèi)存較大。

  最佳方案

  高性能計數(shù)器的首選方案, 單體項目建議使用LongAddr,分布式環(huán)境建議使用Redisson分布式累加器

  應(yīng)用場景功能:獲取全局自增id值

0 分享到:
和我們在線交談!