likes
comments
collection
share

Spring Batch(一):入门案例与详细案例分析

作者站长头像
站长
· 阅读数 26

一、Spring Batch简介

1. 何为批处理?

何为批处理,大白话:就是将数据分批次进行处理的过程。比如:银行对账逻辑,跨系统数据同步等。

常规的批处理操作步骤:系统A从数据库中导出数据到文件,系统B读取文件数据并写入到数据库

Spring Batch(一):入门案例与详细案例分析

典型批处理特点:

  • 自动执行,根据系统设定的工作步骤自动完成
  • 数据量大,少则百万,多则上千万甚至上亿。(如果是10亿,100亿那只能上大数据了)
  • 定时执行,比如:每天,每周,每月执行。

2. Spring Batch了解

官网介绍:docs.spring.io/spring-batc…

  • Sping Batch 是一个轻量级的、完善的的批处理框架,旨在帮助企业建立健壮、高效的批处理应用。
  • Spring Batch 是Spring的一个子项目,基于Spring框架为基础的开发的框架
  • Spring Batch 提供大量可重用的组件,比如:日志,追踪,事务,任务作业统计,任务重启,跳过,重复,资源管理等
  • Spring Batch 是一个批处理应用框架,不提供调度框架,如果需要定时处理需要额外引入-调度框架,比如: Quartz

3. Spring Batch 优势

Spring Batch 框架通过提供丰富的开箱即用的组件和高可靠性、高扩展性的能力,使得开发批处理应用的人员专注于业务处理,提高处理应用的开发能力。下面就是使用Spring Batch后能获取到优势:

  • 丰富的开箱即用组件
  • 面向Chunk的处理
  • 事务管理能力
  • 元数据管理
  • 易监控的批处理应用
  • 丰富的流程定义
  • 健壮的批处理应用
  • 易扩展的批处理应用
  • 复用企业现有的IT代码

4. Spring Batch 架构

Spring Batch 核心架构分三层:应用层,核心层,基础架构层。

Spring Batch(一):入门案例与详细案例分析

Application:应用层,包含所有的批处理作业,程序员自定义代码实现逻辑。

Batch Core:核心层,包含Spring Batch启动和控制所需要的核心类,比如:JobLauncher, Job,Step等。

Batch Infrastructure:基础架构层,提供通用的读,写与服务处理。

三层体系使得Spring Batch 架构可以在不同层面进行扩展,避免影响,实现高内聚低耦合设计。

二、入门案例

1. 批量处理流程

前面对Spring Batch 有大体了解之后,那么开始写个案例玩一下。

开始前,先了解一下Spring Batch程序运行大纲:

Spring Batch(一):入门案例与详细案例分析

JobLauncher:作业调度器,作业启动主要入口。

Job:作业,需要执行的任务逻辑,

Step:作业步骤,一个Job作业由1个或者多个Step组成,完成所有Step操作,一个完整Job才算执行结束。

ItemReader:Step步骤执行过程中数据输入。可以从数据源(文件系统,数据库,队列等)中读取Item(数据记录)。

ItemWriter:Step步骤执行过程中数据输出,将Item(数据记录)写入数据源(文件系统,数据库,队列等)。

ItemProcessor:Item数据加工逻辑(输入),比如:数据清洗,数据转换,数据过滤,数据校验等

JobRepository: 保存Job或者检索Job的信息。SpringBatch需要持久化Job(可以选择数据库/内存),JobRepository就是持久化的接口

2. 入门案例

需求:打印一个hello spring batch!不带读/写/处理

步骤1:导入依赖

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.3</version>
    <relativePath/>
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>

  <!--内存版-->
 <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.12</version>
</dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>

</dependencies>

步骤2:配置数据库四要素与初始化SQL脚本

spring:
  datasource:
    username: root
    password: admin
    url: jdbc:mysql://127.0.0.1:3306/springbatch?serverTimezone=GMT%2B8&useSSL=false&allowPublicKeyRetrieval=true
    driver-class-name: com.mysql.cj.jdbc.Driver
    # 初始化数据库,文件在依赖jar包中
  sql:
    init:
      schema-locations: classpath:org/springframework/batch/core/schema-mysql.sql
      mode: always
      #mode: never

这里要注意, sql.init.model 第一次启动为always, 后面启动需要改为never,否则每次执行SQL都会异常。第一次启动会自动执行指定的脚本,后续不需要再初始化

Spring Batch(一):入门案例与详细案例分析

步骤3:创建测试方法

package com.langfeiyes.batch._01_hello;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
@EnableBatchProcessing
public class HelloJob {
    //job调度器
    @Autowired
    private JobLauncher jobLauncher;
    //job构造器工厂
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    //step构造器工厂
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    //任务-step执行逻辑由tasklet完成
    @Bean
    public Tasklet tasklet(){
        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                System.out.println("Hello SpringBatch....");
                return RepeatStatus.FINISHED;
            }
        };
    }
    //作业步骤-不带读/写/处理
    @Bean
    public Step step1(){
        return stepBuilderFactory.get("step1")
                .tasklet(tasklet())
                .build();
    }
    //定义作业
    @Bean
    public Job job(){
        return jobBuilderFactory.get("hello-job")
                .start(step1())
                .build();
    }
    public static void main(String[] args) {
        SpringApplication.run(HelloJob.class, args);
    }

}

步骤3:分析

例子是一个简单的SpringBatch 入门案例,使用了最简单的一种步骤处理模型:Tasklet模型,step1中没有带上读/写/处理逻辑,只有简单打印操作,后续随学习深入,我们再讲解更复杂化模型。

三、入门案例解析

1>@EnableBatchProcessing

批处理启动注解,要求贴配置类或者启动类上

@SpringBootApplication
@EnableBatchProcessing
public class HelloJob {
    ...
}

贴上@EnableBatchProcessing注解后,SpringBoot会自动加载JobLauncher JobBuilderFactory StepBuilderFactory 类并创建对象交给容器管理,要使用时,直接@Autowired即可

//job调度器
@Autowired
private JobLauncher jobLauncher;
//job构造器工厂
@Autowired
private JobBuilderFactory jobBuilderFactory;
//step构造器工厂
@Autowired
private StepBuilderFactory stepBuilderFactory;

2>配置数据库四要素

批处理允许重复执行,异常重试,此时需要保存批处理状态与数据,Spring Batch 将数据缓存在H2内存中或者缓存在指定数据库中。入门案例如果要保存在MySQL中,所以需要配置数据库四要素。

3>创建Tasklet对象

//任务-step执行逻辑由tasklet完成
@Bean
public Tasklet tasklet(){
    return new Tasklet() {
        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
            System.out.println("Hello SpringBatch....");
            return RepeatStatus.FINISHED;
        }
    };
}

Tasklet负责批处理step步骤中具体业务执行,它是一个接口,有且只有一个execute方法,用于定制step执行逻辑。

public interface Tasklet {
  RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception;
}

execute方法返回值是一个状态枚举类:RepeatStatus,里面有可继续执行态与已经完成态

public enum RepeatStatus {
  /**
   * 可继续执行的-tasklet返回这个状态会进入死循环
   */
  CONTINUABLE(true), 
  /**
   * 已经完成态
   */
  FINISHED(false);
    ....
}

4>创建Step对象

//作业步骤-不带读/写/处理
@Bean
public Step step1(){
    return stepBuilderFactory.get("step1")
        .tasklet(tasklet())
        .build();
}

Job作业执行靠Step步骤执行,入门案例选用最简单的Tasklet模式,后续再讲Chunk块处理模式。

5>创建Job并执行Job

//定义作业
@Bean
public Job job(){
    return jobBuilderFactory.get("hello-job")
        .start(step1())
        .build();
}

创建Job对象交给容器管理,当springboot启动之后,会自动去从容器中加载Job对象,并将Job对象交给JobLauncherApplicationRunner类,再借助JobLauncher类实现job执行。