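1. Background:
Use the JDK thread pool ThreadPoolExecutor to run batch insert and update operations asynchronously on multiple threads, improving insert throughput for data sets of around a million rows.
2. Details: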

2.1 Create a thread pool sized to the machine's processor count
// Create a thread pool sized to the machine's processor count
int processNum = Runtime.getRuntime().availableProcessors();
int corePoolSize = (int) (processNum / (1 - 0.2));
int maxPoolSize = (int) (processNum / (1 - 0.5));
ExecutorService executorService = new ThreadPoolExecutor(
        corePoolSize,
        maxPoolSize,
        2L,
        TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(3),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.CallerRunsPolicy());

@Override
public boolean batchInsert(List<Student> list) throws Exception {
    try {
        /*
         * submit and execute both hand a task to the thread pool.
         * submit accepts a Callable (or a Runnable) and returns a Future;
         * execute accepts only a Runnable and returns nothing.
         * A Runnable task has no return value, while a Callable task does,
         * so a Callable is normally run through ExecutorService.submit(Callable<T> task).
         * Thread control when several callers submit at once: multiple threads, multiple tasks.
         */
        Future<Boolean> a = executorService.submit(new BatchWay(list, studentService));
        // Blocks until the task finishes and returns the value produced by call()
        return a.get();
    } catch (Exception e) {
        // Submission or execution failed: report failure to the caller
        e.printStackTrace();
        return false;
    }
}
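The two divisions above resemble the commonly cited sizing heuristic threads = cores / (1 - blocking coefficient), with coefficients 0.2 and 0.5. The source does not say so, so treat that reading as an assumption. Below is a minimal sketch under that assumption; the class `PoolSizingSketch` and the helpers `sizeFor` and `newPool` are hypothetical names introduced only for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolSizingSketch {
    // Hypothetical helper: cores / (1 - blockingCoefficient), truncated by the int cast
    // exactly as in the snippet above. Assumes 0 <= blockingCoefficient < 1.
    static int sizeFor(int cores, double blockingCoefficient) {
        return (int) (cores / (1 - blockingCoefficient));
    }

    // Builds a pool with the same parameters as the article's snippet.
    static ThreadPoolExecutor newPool(int cores) {
        return new ThreadPoolExecutor(
                sizeFor(cores, 0.2),                 // core pool size
                sizeFor(cores, 0.5),                 // maximum pool size
                2L, TimeUnit.SECONDS,                // keep-alive for threads above the core size
                new LinkedBlockingQueue<>(3),        // small bounded queue
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy()); // overflow runs on the submitting thread
    }

    public static void main(String[] args) throws Exception {
        int cores = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor pool = newPool(cores);
        try {
            // submit(Callable) returns a Future that carries the task's result
            Callable<Boolean> task = () -> Boolean.TRUE;
            Future<Boolean> result = pool.submit(task);
            System.out.println("core=" + pool.getCorePoolSize()
                    + ", max=" + pool.getMaximumPoolSize()
                    + ", result=" + result.get());
        } finally {
            pool.shutdown();
        }
    }
}
```

On an 8-processor machine this gives a core size of 10 and a maximum of 16. Because the queue holds only 3 tasks, the pool grows toward the maximum quickly, and once both are full CallerRunsPolicy makes the submitting thread run the task itself, which throttles submission.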
2.2 Core business processing class:
import java.util.List;
import java.util.concurrent.Callable;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class BatchWay implements Callable<Boolean> {
    private int batch100 = 100;            // batch boundary: insert 100 rows at a time
    private List<Student> list;            // the large data set to insert
    private StudentService studentService;

    // Constructor taking the data and the service, for convenient initialization
    public BatchWay(List<Student> list, StudentService studentService) {
        this.list = list;
        this.studentService = studentService;
    }

    /** Inner thread pool (disabled) */
    // private ThreadPoolExecutor threadPoolExecutor =
    //         new ThreadPoolExecutor(
    //                 10,                                         // corePoolSize: number of core threads
    //                 Runtime.getRuntime().availableProcessors(), // maximum number of threads in the pool
    //                 5L,                                         // keepAliveTime: how long idle threads survive
    //                 TimeUnit.SECONDS,                           // unit of keepAliveTime: seconds
    //                 new LinkedBlockingQueue<>(100),             // blocking queue that buffers tasks
    //                 Executors.defaultThreadFactory(),
    //                 new ThreadPoolExecutor.CallerRunsPolicy()
    //         );

    /**
     * Description: implements Callable.call
     * @MethodName: call
     * @MethodParam: []
     * @Return: java.lang.Boolean
     * @Author: yyalin
     * @CreateDate: 2022/5/6 15:46
     */
    @Override
    public Boolean call() {
        try {
            batchOp(list);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * Description: save the data in batches
     * @MethodName: batchOp
     * @MethodParam: [list]
     * @Return: void
     * @Author: yyalin
     * @CreateDate: 2022/5/6 15:40
     */
    private void batchOp(List<Student> list) {
        if (!list.isEmpty()) {
            int size = list.size();
            if (size <= batch100) {
                // No larger than one batch: insert directly
                studentService.saveBatch(list);
            } else {
                // Split into batches first, then save
                batchOpSpilit(list, batch100);
            }
        }
    }

    /**
     * Description: split the list into batches
     * @MethodName: batchOpSpilit
     * @MethodParam: [list, batch100]
     * @Return: void
     * @Author: yyalin
     * @CreateDate: 2022/5/6 15:43
     */
    private void batchOpSpilit(List<Student> list, int batch100) {
        log.info("Start splitting...");
        List<List<Student>> list1 = SplitListUtils.pagingList(list, batch100);
        // Exceptions are not swallowed here: they propagate to call(), which then returns false
        for (List<Student> list2 : list1) {
            batchOp(list2);
            // Alternative (disabled): insert each sub-list on the inner thread pool
            // threadPoolExecutor.allowCoreThreadTimeOut(true);
            // threadPoolExecutor.execute(() -> {
            //     log.info("Thread starts saving data: " + Thread.currentThread().getName());
            //     batchOp(list2);
            // });
        }
        // log.info("Current pool size: " + threadPoolExecutor.getPoolSize());
        // Finally (disabled): stop accepting new tasks but finish the ones already submitted
        // threadPoolExecutor.shutdown();
    }
}
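The class above relies on `SplitListUtils.pagingList`, whose source is not shown in the article. The following is a hypothetical stand-in that shows one way such a helper could cut a list into fixed-size pages; the class name `SplitListSketch` and the exact behavior (copying each page, rejecting non-positive sizes) are assumptions, not the author's implementation.

```java
import java.util.ArrayList;
import java.util.List;

class SplitListSketch {
    // Hypothetical equivalent of SplitListUtils.pagingList:
    // cuts source into consecutive pages of at most pageSize elements.
    static <T> List<List<T>> pagingList(List<T> source, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        List<List<T>> pages = new ArrayList<>();
        for (int from = 0; from < source.size(); from += pageSize) {
            int to = Math.min(from + pageSize, source.size());
            // Copy the view so each page is independent of the source list
            pages.add(new ArrayList<>(source.subList(from, to)));
        }
        return pages;
    }

    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            data.add(i);
        }
        List<List<Integer>> pages = pagingList(data, 100);
        // prints "3 pages, last has 50"
        System.out.println(pages.size() + " pages, last has "
                + pages.get(pages.size() - 1).size());
    }
}
```

With pages of 100 rows, every page passed back into `batchOp` is at or below the batch boundary, so the recursion ends after one level.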
2.3 Generate test data and insert it asynchronously on multiple threads:
public String batchWay() throws Exception {
    log.info("Starting batch operation...");
    Random rand = new Random();
    List<Student> list = new ArrayList<>();
    // Build 1,000,003 sample rows
    for (int i = 0; i < 1000003; i++) {
        Student student = new Student();
        student.setStudentName("小李" + i);
        student.setAddr("上海" + rand.nextInt(9) * 1000);
        student.setAge(rand.nextInt(1000));
        student.setPhone("134" + rand.nextInt(9) * 1000);
        list.add(student);
    }
    long startTime = System.currentTimeMillis(); // start time
    boolean a = studentService.batchInsert(list);
    long endTime = System.currentTimeMillis();   // end time
    return "Finished, total time: " + (endTime - startTime) / 1000 + " s";
}
2.4 Test results

Summary of results:
| No. | Core threads (core_pool_size) | Rows inserted (×10,000) | Time (s) |
| --- | --- | --- | --- |
| 1 | 10 | 100 | 38 |
| 2 | 15 | 100 | 32 |
| 3 | 50 | 100 | 31 |
Personal recommendation: in Spring Boot, use the ThreadPoolTaskExecutor thread pool to process million-row data sets asynchronously.
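Summary: ThreadPoolTaskExecutor and ThreadPoolExecutor are more flexible than pools created through the Executors factory methods, because the core size, maximum size, queue, and rejection policy can all be set explicitly, so both are recommended over Executors. ThreadPoolTaskExecutor is Spring's wrapper around ThreadPoolExecutor, so its performance is essentially that of the underlying ThreadPoolExecutor; its advantage is that it fits more conveniently into Spring applications. For Spring Boot projects, ThreadPoolTaskExecutor is the recommended choice.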
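To make the flexibility point concrete, here is a minimal JDK-only sketch that compares a pool from `Executors.newFixedThreadPool` with an explicitly configured `ThreadPoolExecutor`. The class `ExecutorsVsExplicitSketch`, the helper `boundedPool`, and the sizes 4 and 100 are illustrative assumptions. The `instanceof` check is there because the factory method only promises an `ExecutorService`, even though current JDKs return a `ThreadPoolExecutor`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecutorsVsExplicitSketch {
    // Hypothetical helper: fixed-size pool with a bounded queue and an explicit rejection policy
    static ThreadPoolExecutor boundedPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public static void main(String[] args) {
        ExecutorService fixed = Executors.newFixedThreadPool(4);
        ThreadPoolExecutor bounded = boundedPool(4, 100);
        try {
            if (fixed instanceof ThreadPoolExecutor) {
                ThreadPoolExecutor tpe = (ThreadPoolExecutor) fixed;
                // The factory-made pool queues tasks in an effectively unbounded queue
                System.out.println("fixed pool queue capacity: "
                        + tpe.getQueue().remainingCapacity());
            }
            // The explicit pool caps pending tasks at the capacity chosen above
            System.out.println("bounded pool queue capacity: "
                    + bounded.getQueue().remainingCapacity());
        } finally {
            fixed.shutdown();
            bounded.shutdown();
        }
    }
}
```

With a bounded queue and CallerRunsPolicy, a burst of submissions slows the producer down instead of piling up in memory, which is the kind of control the factory methods do not expose.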






