前言
在 JAVA 中你不了解異步編程,crud 完全沒有問題,但是有的需求你無法優雅的實現。
js 也存在異步編程,當你理解了用同步的思維編寫異步的代碼時,相信你在編程上的造詣又更進一步。
大多人都在追捧微服務,可能他們只會用 Ribbon 和 Feign。微服務是一個架構上的選擇,當你沒有達到架構層次時,我認為你應該更加注重業務上的代碼編寫,即微服務中單體服務代碼的編寫。
單體服務性能極差,你的微服務整體性能也好不到哪里去,只能通過限流、熔斷外加多部署機器來解決并發低的問題。在你想玩微服務之前,并發玩好了再考慮高并發。先把 java 中 juc 包下的并發相關的知識整的明明白白再進行下一步,這花不了幾個時間。微服務是你進階之后再學的。
本來打算繼續寫 MySQL,但實在提不起來我的興致(還需要看書研究,畢竟是個黑盒研究),只好拿這篇完成任務了。
本文內容
- js 中 Promise 和 async await 的一個列子
- SpringBoot 中異步編程
- Future
- CompletableFuture
js 異步編程
要習慣使用 Promise ,避免把 fn 當成參數傳遞,避免回調地獄。這不僅僅是 api 調用的問題,這是你編程思想轉變。
const awaitFunc = function _awaitFunc() {
return Promise.resolve('awaitFunc').then(data => {
console.log(data);
return 'awaitFunc-then-return-data';
});
};
const async = async function _async() {
setTimeout(() => {
console.log('驗證加入了宏任務隊列---1');
}, 0);
// 加不加 await 有什么區別?
await awaitFunc().then(data => {
console.log(data);
setTimeout(() => {
console.log('驗證加入了宏任務隊列---2');
}, 0);
});
console.log('awaitFunc 執行完在打印');
};
async();
SpringBoot 中異步編程
在 SpringBoot @EnableAsync 和 @Async 就可以助你異步編程。底層原理就是 ThreadPoolExecutor 和 Future 的封裝。
我們拿這個燒水舉例子,當你同步串行執行,需要消耗 20 分鐘。同步編程思維模型較簡單,容易實現。
當你多線程異步執行,只需要消耗 16 分鐘。異步編程思維模型稍微復雜一點,多線程之間通信異步轉同步是一個挑戰。
@GetMApping("/tea/async")
public RetUtil makeTeaAsync() throws InterruptedException, ExecutionException {
// Stopwatch 用于計算代碼執行時間
final Stopwatch started = Stopwatch.createStarted();
final Future asyncResult = makeTeaService.boilWater();
final Future asyncResult1 = makeTeaService.washTeaCup();
asyncResult.get();
asyncResult1.get();
final long elapsed = started.elapsed(TimeUnit.SECONDS);
String str = StrUtil.format("任務執行了 {} 秒", elapsed);
final MakeTeaVO makeTeaVO = new MakeTeaVO();
makeTeaVO.setMessage(str);
return RetUtil.success(makeTeaVO);
}
@Service
public class IMakeTeaServiceImpl implements IMakeTeaService {
@Override
@Async
public AsyncResult<String> boilWater() throws InterruptedException {
System.out.println("洗水壺");
TimeUnit.SECONDS.sleep(1);
System.out.println("燒開水");
TimeUnit.SECONDS.sleep(15);
return new AsyncResult("洗水壺->燒開水");
}
@Override
@Async
public AsyncResult<String> washTeaCup() throws InterruptedException {
System.out.println("洗茶杯");
System.out.println("洗茶壺");
System.out.println("拿茶葉");
TimeUnit.SECONDS.sleep(4);
return new AsyncResult("洗茶杯,洗茶壺,拿茶葉");
}
}
AsyncResult 是 Future 的實現類,當調用 Future.get 會阻塞等待結果的返回。@Async 也可以指定在那個線程池中執行任務。
final Future asyncResult = makeTeaService.boilWater();
final Future asyncResult1 = makeTeaService.washTeaCup();
asyncResult.get();
asyncResult1.get();
這個 Demo 的實現,需要調用兩次 Furute.get() 算是個不優雅的實現。
@Override
public String makeTea() throws InterruptedException {
final CountDownLatch count = new CountDownLatch(2);
THREAD_POOL_EXECUTOR.execute(() -> {
System.out.println("洗水壺");
System.out.println("燒開水");
try {
TimeUnit.SECONDS.sleep(16);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
count.countDown();
}
});
THREAD_POOL_EXECUTOR.execute(() -> {
System.out.println("洗茶杯");
System.out.println("洗茶壺");
System.out.println("拿茶葉");
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
count.countDown();
}
});
count.await();
System.out.println("泡茶");
return "";
}
@GetMapping("/tea/async2")
public RetUtil makeTeaAsync2() throws InterruptedException, ExecutionException {
final Stopwatch started = Stopwatch.createStarted();
makeTeaService.makeTea();
final long elapsed = started.elapsed(TimeUnit.SECONDS);
String str = StrUtil.format("任務執行了 {} 秒", elapsed);
final MakeTeaVO makeTeaVO = new MakeTeaVO();
makeTeaVO.setMessage(str);
return RetUtil.success(makeTeaVO);
}
使用 CountDownLatch 將異步代碼轉換為同步返回,這只是另一個實現
Future
public interface Future<V> {
/**
* 嘗試取消這個任務的執行.
* 如果任務執行完成之后,調用 cancel 返回 false.
* 如果任務已經被取消了,調用 cancel 也會返回 false
*
* 如果任務已經執行了, mayInterruptIfRunning 標志是否中斷執行任務的線程.
* mayInterruptIfRunning 為 true 會觸發線程的中斷(當線程睡眠,會拋出異常 InterruptedException),
* 為 false 時不中斷任務執行,只改變 Future 的狀態
*
* 調用了 cancel 方法,調用 get 方法會拋出異常
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* 任務完成之前調用 cancel ,此方法返回 true
*/
boolean isCancelled();
/**
* 任務完成返回 true
*/
boolean isDone();
/**
* 等待任務完成,然后返回其結果
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an exception
* @throws InterruptedException if the current thread was interrupted while waiting
*/
V get() throws InterruptedException, ExecutionException;
/**
* 等待任務完成,然后返回其結果.超時沒有返回,拋出異常 TimeoutException
*/
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
Future.cancel(true) 會觸發線程休眠的中斷,即 TimeUnit.SECONDS.sleep(10); 會拋出異常。
Future.cancel(true) 或者 Future.cancel(false) 都會觸發 Future.get() 異常。
public static void main(String[] args) throws ExecutionException, InterruptedException {
final Future<String> submit = THREAD_POOL_EXECUTOR.submit(() -> {
System.out.println("任務開始執行");
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任務執行完畢");
return "ok";
});
THREAD_POOL_EXECUTOR.execute(() -> {
System.out.println("執行 submit.cancel");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
submit.cancel(false);
});
// submit.get();
System.out.println("整個流程執行結束");
}
JDK 提供 Future 的實現 FutureTask 源碼相對較簡單,不再展開。
CompletableFuture
由于 Future 使用的局限性:不能鏈式調用、多個異步計算的結果不能傳遞下一個異步任務(可以做到,但是編程稍微復雜),異步執行異常的捕獲處理
從 JDK 1.8 開始,大佬 Doug Lea 帶來了更加容易的異步編程模型,CompletableFuture。
CompletableFuture 可以做到
1、獲取異步執行的結果鏈式傳遞下一個異步去執行
2、異步執行時,你有機會處理異步執行時發生的異常
總之,CompletableFuture 很想。
CompletableFuture 實現比較復雜,有的地方不是那么容易理解,當你理解其實現思想,你也算是一只腳邁入了響應式編程中去了。
開胃小菜
public class CompletableFutureBlog1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
final Stopwatch started = Stopwatch.createStarted();
// 洗水壺,燒水
CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println("洗水壺");
System.out.println("燒水");
try {
TimeUnit.SECONDS.sleep(16);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "洗水壺 -> 燒水";
});
// 洗茶壺,洗茶杯 -> 拿茶葉
CompletableFuture<String> completableFuture2 =
CompletableFuture.supplyAsync(() -> {
System.out.println("洗茶壺");
System.out.println("洗茶杯");
System.out.println("拿茶葉");
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "洗茶壺,洗茶杯 -> 拿茶葉";
});
// 組合二者異步運算的結果,傳遞給方法計算
final CompletableFuture<String> completableFuture = completableFuture2.thenCombine(completableFuture1, (result2, result1) -> {
System.out.println(StrUtil.format("result2 是 洗茶壺,洗茶杯 -> 拿茶葉: {}", result2));
System.out.println(StrUtil.format("result1 是 洗水壺 -> 燒水: {}", result1));
System.out.println("泡茶");
return "泡茶";
});
completableFuture.get();
System.out.println("執行時間: " + started.elapsed(TimeUnit.SECONDS));
}
}
runAsync 和 supplyAsync 的區別
runAsync 和 supplyAsync 區別就是你是否需要獲取異步計算的結果。當你需要異步處理的結果,你需要 supplyAsync
public class CompletableFutureBlog2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
final Stopwatch started = Stopwatch.createStarted();
final CompletableFuture<Integer> ret = CompletableFuture.supplyAsync(() -> {
System.out.println("開始進行耗時的異步計算,消耗 3 秒");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
});
final Integer integer = ret.get();
System.out.println(StrUtil.format("異步執行的結果: {}", integer));
System.out.println("執行時間: " + started.elapsed(TimeUnit.SECONDS));
}
}
thenApplyAsync 、thenAcceptAsync 和 thenRunAsync
thenXX 都是為了在上一個異步計算的結束之后執行。
我們對異步計算的結果分為以下幾個情況:
-
需要依賴異步計算的結果,并且依賴異步計算的結果計算返回另個一個結果
thenApplyAsync -
依賴異步計算的結果,但是不會產生新的結果,
thenAcceptAsync -
不依賴計算計算的結果,并且沒有返回值
thenRunAsync
public class CompletableFutureBlog3 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
final Stopwatch started = Stopwatch.createStarted();
final CompletableFuture<Integer> ret = CompletableFuture.supplyAsync(() -> {
System.out.println("開始進行耗時的異步計算,消耗 3 秒");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
});
final Integer result = ret.thenApplyAsync(data -> {
System.out.println("依賴上一個異步計算,消耗 5 秒");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
return data + 12;
}).get();
System.out.println(StrUtil.format("異步執行的結果: {}", result));
System.out.println("執行時間: " + started.elapsed(TimeUnit.SECONDS));
}
}
thenCombineAsync
結合另一個 CompletableFuture 異步計算,當兩個異步計算執行完了,執行回調。
計算一個耗時的計算。將這個耗時計算拆成兩個耗時的異步計算,當兩個異步計算結束,在合并最終的結果
public class CompletableFutureBlog4 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
final Stopwatch started = Stopwatch.createStarted();
final CompletableFuture<Integer> ret1 = CompletableFuture.supplyAsync(() -> {
System.out.println("開始進行耗時的異步計算,消耗 3 秒");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
});
final CompletableFuture<Integer> ret2 = CompletableFuture.supplyAsync(() -> {
System.out.println("開始進行耗時的異步計算,消耗 5 秒");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
});
final CompletableFuture<Integer> integerCompletableFuture = ret2.thenCombineAsync(ret1, (result1, result2) -> result1 + result2);
final Integer result = integerCompletableFuture.get();
System.out.println(StrUtil.format("異步執行的結果: {}", result));
System.out.println("執行時間: " + started.elapsed(TimeUnit.SECONDS));
}
}
allOf 和 anyOf
可以組合多個 CompletableFuture ,當每個 CompletableFuture 都執行完,執行后續邏輯。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
return andTree(cfs, 0, cfs.length - 1);
}
可以組合多個 CompletableFuture ,當任何一個 CompletableFuture 都執行完,執行后續邏輯。
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
return orTree(cfs, 0, cfs.length - 1);
}
future,future2,future3 執行完之后,再執行后續邏輯。
public class CompletableFutureBlog5 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
final Stopwatch started = Stopwatch.createStarted();
final CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(1);
});
final CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(1);
});
final CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(1);
});
final CompletableFuture<Void> future1 = CompletableFuture.allOf(future3, future2, future);
future1.get();
System.out.println("執行時間: " + started.elapsed(TimeUnit.SECONDS));
}
}
將上述 demo 中 allOf 替換為 anyOf,當任一 CompletableFuture 執行完畢,future1.get(); 就會返回結果。
別的方法看參數和注釋就學會了。就不再一一列舉了。
當使用的時候,先考慮要不要依賴異步計算的結果,要不要處理異常,要不要返回新的異步計算結果,從這幾個方面就可以知道選擇哪個 api 了。
本文由 張攀欽的博客 http://www.mflyyou.cn/ 創作。 可自由轉載、引用,但需署名作者且注明文章出處。
如轉載至微信公眾號,請在文末添加作者公眾號二維碼。微信公眾號名稱:Mflyyou






