環境:Springboot2.6.12 + Spring Batch4.2.7
Spring Batch是一個輕量級的,完全面向Spring的批處理框架,可以應用于企業級大量的數據處理系統。Spring Batch以POJO和大家熟知的Spring框架為基礎,使開發者更容易的訪問和利用企業級服務。Spring Batch可以提供大量的,可重復的數據處理功能,包括日志記錄/跟蹤,事務管理,作業處理統計工作重新啟動、跳過,和資源管理等重要功能。
業務場景:
-
定期提交批處理。
-
并行批處理:作業的并行處理
-
分階段、企業消息驅動的處理
-
大規模并行批處理
-
故障后手動或計劃重新啟動
-
相關步驟的順序處理(擴展到工作流驅動的批處理)
-
部分處理:跳過記錄(例如,回滾時)
-
整批事務,適用于小批量或現有存儲過程/腳本的情況
技術目標:
-
批處理開發人員使用Spring編程模型:專注于業務邏輯,讓框架負責基礎設施。
-
基礎架構、批處理執行環境和批處理應用程序之間的關注點清晰分離。
-
提供通用的核心執行服務,作為所有項目都可以實現的接口。
-
提供可“開箱即用”的核心執行接口的簡單和默認實現。
-
通過在所有層中利用spring框架,可以輕松配置、定制和擴展服務。
-
所有現有的核心服務都應該易于替換或擴展,而不會對基礎架構層造成任何影響。
-
提供一個簡單的部署模型,使用Maven構建的架構JAR與應用程序完全分離。
Spring Batch的結構:

此分層體系結構突出了三個主要的高級組件:應用程序、核心和基礎架構。該應用程序包含開發人員使用SpringBatch編寫的所有批處理作業和自定義代碼。批處理核心包含啟動和控制批處理作業所需的核心運行時類。它包括JobLauncher、Job和Step的實現。應用程序和核心都構建在公共基礎架構之上。此基礎結構包含公共讀寫器和服務(如RetryTemplate),應用程序開發人員(讀寫器,如ItemReader和ItemWriter)和核心框架本身(retry,它是自己的庫)都使用這些服務。
下面介紹開發流程
本例完成 讀取文件內容,經過處理后,將數據保存到數據庫中
-
引入依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-batch</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency><dependency><groupId>MySQL</groupId><artifactId>mysql-connector-JAVA</artifactId></dependency><dependency><groupId>org.hibernate</groupId><artifactId>hibernate-validator</artifactId><version>6.0.7.Final</version></dependency>
-
應用配置文件
spring:datasource:driverClassName: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/batch?serverTimezone=GMT%2B8username: rootpassword: *******type: com.zaxxer.hikari.HikariDataSourcehikari:minimumIdle: 10maximumPoolSize: 200autoCommit: trueidleTimeout: 30000poolName: MasterDatabookHikariCPmaxLifetime: 1800000connectionTimeout: 30000connectionTestQuery: SELECT 1---spring:jpa:generateDdl: falsehibernate:ddlAuto: updateopenInView: trueshow-sql: true---spring:batch:job:enabled: false #是否自動執行任務initialize-schema: always #自動為我們創建數據庫腳本
-
開啟批處理功能
@Configuration@EnableBatchProcessingpublic class BatchConfig extends DefaultBatchConfigurer{}
-
任務啟動器
接著上一步的配置類BatchConfig重寫對應方法
@Overrideprotected JobLauncher createJobLauncher() throws Exception {SimpleJobLauncher jobLauncher = new SimpleJobLauncher();jobLauncher.setJobRepository(createJobRepository());jobLauncher.afterPropertiesSet();return jobLauncher;}
-
任務存儲
接著上一步的配置類BatchConfig重寫對應方法
@Resourceprivate PlatformTransactionManager transactionManager ;@Overrideprotected JobRepository createJobRepository() throws Exception {JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();factory.setDatabaseType("mysql");factory.setTransactionManager(transactionManager);factory.setDataSource(dataSource);factory.afterPropertiesSet();return factory.getObject();}
-
定義JOB
@Beanpublic Job myJob(JobBuilderFactory builder, @Qualifier("myStep")Step step){return builder.get("myJob").incrementer(new RunIdIncrementer()).flow(step).end().listener(jobExecutionListener).build();}
-
定義ItemReader讀取器
@Beanpublic ItemReader<Person> reader(){FlatFileItemReader<Person> reader = new FlatFileItemReader<>();reader.setResource(new ClassPathResource("cvs/persons.cvs"));reader.setLineMApper(new DefaultLineMapper<Person>() {// 代碼塊{setL.NETokenizer(new DelimitedLineTokenizer(",") {{setNames("id", "name");}}) ;setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{setTargetType(Person.class) ;}});}});return reader;}
-
定義ItemProcessor處理器
@Beanpublic ItemProcessor<Person, Person2> processorPerson(){return new ItemProcessor<Person, Person2>() {@Overridepublic Person2 process(Person item) throws Exception {Person2 p = new Person2() ;p.setId(item.getId()) ;p.setName(item.getName() + ", pk");return p ;}} ;}
-
定義ItemWriter寫數據
@Resourceprivate Validator<Person> validator ;@Resourceprivate EntityManagerFactory entityManagerFactory ;@Beanpublic ItemWriter<Person2> writerPerson(){JpAItemWriter<Person2> writer = null ;JpaItemWriterBuilder<Person2> builder = new JpaItemWriterBuilder<>() ;builder.entityManagerFactory(entityManagerFactory) ;writer = builder.build() ;return writer;}
-
定義Step
@Beanpublic Step myStep(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader, ItemWriter<Person> writer, ItemProcessor<Person, Person> processor){return stepBuilderFactory.get("myStep").<Person, Person>chunk(2) // Chunk的機制(即每次讀取一條數據,再處理一條數據,累積到一定數量后再一次性交給writer進行寫入操作).reader(reader).faultTolerant().retryLimit(3).retry(Exception.class).skip(Exception.class).skipLimit(2).listener(new MyReadListener()).processor(processor).writer(writer).faultTolerant().skip(Exception.class).skipLimit(2).listener(new MyWriteListener()).build();}
-
定義相應的監聽器
public class MyReadListener implements ItemReadListener<Person> {private Logger logger = LoggerFactory.getLogger(MyReadListener.class);@Overridepublic void beforeRead() {}@Overridepublic void afterRead(Person item) {System.out.println("reader after: " + Thread.currentThread().getName()) ;}@Overridepublic void onReadError(Exception ex) {logger.info("讀取數據錯誤:{}", ex);}}
@Componentpublic class MyWriteListener implements ItemWriteListener<Person> {private Logger logger = LoggerFactory.getLogger(MyWriteListener.class);@Overridepublic void beforeWrite(List<? extends Person> items) {}@Overridepublic void afterWrite(List<? extends Person> items) {System.out.println("writer after: " + Thread.currentThread().getName()) ;}@Overridepublic void onWriteError(Exception exception, List<? extends Person> items) {try {logger.info(format("%s%n", exception.getMessage()));for (Person item : items) {logger.info(format("Failed writing BlogInfo : %s", item.toString()));}} catch (Exception e) {e.printStackTrace();}}}
person.cvs文件內容

實體類:@Entity@Table(name = "t_person")public class Person { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Integer id ; private String name ;}
啟動任務執行
@RestController@RequestMapping("/demo")public class DemoController {@Resource@Qualifier("myJob")private Job job ;@Resourceprivate JobLauncher launcher ;@GetMapping("/index")public Object index() {JobParameters jobParameters = new JobParametersBuilder().toJobParameters() ;try {launcher.run(job, jobParameters) ;} catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException| JobParametersInvalidException e) {e.printStackTrace();}return "success" ;}}
啟動服務,自動為我們創建了表

執行任務
查看表情況



完畢!!!






