SpringBatch中ListItemReader问题分析与总结
SpringBatch中ListItemReader问题分析与总结
项目中使用了SpringBatch作为批处理框架。在使用的过程中曾经遇到一系列问题,本文章主要针对ListItemReader的问题进行记录、分析、总结、分享,期望能够抛砖引玉以及提升独立思考与解决问题能力。
此处先贴一下ListItemReader的源码:
/**
* An {@link ItemReader} that pulls data from a list. Useful for testing.
*
* @author Dave Syer
* @author jojoldu
*
*/
public class ListItemReader<T> implements ItemReader<T> {
private List<T> list;
public ListItemReader(List<T> list) {
// If it is a proxy we assume it knows how to deal with its own state.
// (It's probably transaction aware.)
if (AopUtils.isAopProxy(list)) {
this.list = list;
}
else {
this.list = new LinkedList<>(list);
}
}
@Nullable
@Override
public T read() {
if (!list.isEmpty()) {
return list.remove(0);
}
return null;
}
}
1.问题现象
-
系统运行过程中频繁出现批处理超时预警:即batch_step_execution表中start_time、end_time相减,得到的处理时间超出预期,且与业务逻辑处理的实际执行时间相差较大。
-
系统导入/导出大数据量文件时出现慢、内存占用较高的现象。
2.问题分析
2.1 并发问题
2.1.1 问题原因
严格来说这并不算ListltemReader本身的问题,而是 Spring 在创建一个被@StepScope修饰的Bean的时候的处理机制的问题。
在创建Bean时,Spring 使用了synchronized来保证线程安全性,这意味着此时在整个应用中创建@StepScope的Bean是同步执行的。如果一旦创建的过程比较长,那么后续的创建Bean都将阻塞。
将这个情况放到项目中:如果有多个这样的Bean同时执行,则意味着其他的使用了@StepScope的job将无法启动(需等待同步代码块执行完后释放锁),这对系统来说是个致命问题。
以下是StepScope关键源码:
@Override
public Object get(String name, ObjectFactory<?> objectFactory) {
StepContext context = getContext();
Object scopedObject = context.getAttribute(name);
if (scopedObject == null) {
synchronized (mutex) {
scopedObject = context.getAttribute(name);
if (scopedObject == null) {
if (logger.isDebugEnabled()) {
logger.debug(String.format("Creating object in scope=%s, name=%s", this.getName(), name));
}
scopedObject = objectFactory.getObject();
context.setAttribute(name, scopedObject);
}
}
}
return scopedObject;
}
以下是一个典型的问题业务代码示例:
@StepScope
public ListItemReader<TestDO> reader() {
//典型的问题代码:执行数据库查询
List<TestDO> queryFromDatabaseList = this.dao.selectList();
ListItemReader<TestDO> reader = new ListItemReader<TestDO>(queryFromDatabaseList);
return reader;
}
2.1.2 解决方式
明白了问题的原因,那么解决办法也很简单:
2.1.2.1 使用其他Tasklet
上文提及的ItemReader是属于ChunkOrientedTasklet依赖的组件。 ChunkOrientedTasklet是SpringBatch中Tasklet的一个实现,是一个基于Chunk的批处理方式。与传统的ItemStream相比,它提供了更高效的处理方式, 可以更快、更高效地处理大量教据。 它的实现原理是将数据成多chunk,每个chunk由ItemReader 读取一组数据 ,然后交ItemProcessor进行处理,并最终给ItemWriter负责写入逻辑。每个chunk大小可以配置。
再回到我们的问题,我们的问题是使用了@StepScope的ItemReader在创建的过程中会阻塞其他job的执行。这是一个基于ChunkOrientedTasklet来实现的job。 那么解决方式之一,则是我们不使用ChunkOrientedTasklet,而使用其他的Tasklet来实现,例如TaskletStep。 但与此同时,程序也将失去了ChunkOrientedTasklet带来的优点,最典型的影响是
1.无法通过查询 batch_step_execution相关表确认数据处理状态了
2.无法利用ChunkOrientedTasklet的分批次处理大量数据,需要自行处理其中的分批次处理逻辑了
以下是一个使用Tasklet的示例:
@Bean
public Job job1() {
return jobs.get("job1")
.start(step1())
.next(step2())
.build();
}
@Bean
public Step step1() {
return steps.get("step1")
.tasklet((contribution, chunkContext) -> {
System.out.println("hello");
// simulate processing time
Thread.sleep(random.nextInt(3000));
return RepeatStatus.FINISHED;
})
.build();
}
2.1.2.1 使用其他ItemReader
使用TaskletStep的方式会丧失处理分批次大数据量数据的优点,如果希望继续使用ChunkOrientedTasklet,则可以考虑使用以下ItemReader来替代ListItemReader。 例如:数据库查询类:JdbcCurosorItemReader/文件处理类:FlatFileItemReader等。 更多的请查看官网[docs.spring.io/spring-batc…]
2.2.线程安全问题
这并不是ListItemReader的独有问题,而是使用了SpringBatch的多线程处理机制时将会遇到的问题。而ListItemReader的该问题较为突出。
以下是SpringBatch中使用多线程处理错误的示例代码。
@StepScope
public ListItemReader<TestDO> reader() {
List<TestDO> list = new ArrayList();
for(int i=0;i<10000;i++){
list.add(new TestDO());
}
ListItemReader<TestDO> reader = new ListItemReader<TestDO>(list);
return reader;
}
@Bean
public TaskExecutor taskExecutor(){
return new SimpleAsyncTaskExecutor("spring_batch");
}
@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
return this.stepBuilderFactory.get("sampleStep")
.<String, String>chunk(10)
.reader(reader())
.writer(writer())
.taskExecutor(taskExecutor)
.build();
}
2.2.1问题原因
ListItemReader本身是线程安全的,并且它是无状态的,但是当调用read方法时,由于其底层是ArrayList存储数据,那么如果结合SpringBatch 的多线程处理机制,将会导致线程安全性问题。
2.2.2 解决方式
2.2.2.1 不使用多线程进行处理
谨慎评估当前业务场景是否需要多线程进行处理,以下是可以参考的一些评估依据 1.要处理的数据量是否较大?当前业务场景是否确实需要? 2.系统资源(CPU、内存等)是否足够? 3.开发人员对多线程编程的掌握程度如何,是否能够hold住可能出现的问题?
2.2.2.2 使用同步关键字进行处理
以下是一个线程安全的ItemReader示例:
public class ThreadSafeIteratorItemReader<T> implements ItemReader<T> {
private final Iterator<T> iterator;
public ThreadSafeIteratorItemReader(Iterable<T> iterable) {
Assert.notNull(iterable, "Iterable argument cannot be null!");
this.iterator = iterable.iterator();
}
public ThreadSafeIteratorItemReader(Iterator<T> iterator) {
Assert.notNull(iterator, "Iterator argument cannot be null!");
this.iterator = iterator;
}
public T read() throws Exception {
T t;
synchronized (this) {
t = this.iterator.hasNext() ? this.iterator.next() : null;
}
return t;
}
}
2.2.2.3 使用线程安全的容器
如CopyOnWriteArrayList作为ListItemReader的底层实现。
2.3 性能问题
2.3.1 问题原因
-
ListItemReader在read时使用的是remove(0)来实现的.我们知道ArrayList擅长随机访问元素,不擅长插入/删除元素,以及LinkedList擅长插入/删除元素,不擅长随机访问无素,所以在大部分情况下,read 方法调用时性能是较差的。(MyBatis的selectList查询结果默认使用ArrayList)
-
ArrayList的remove(0)方法的工现是基于System.arraycopy来实现的,那么意味着这个过程需要大量的内存,一旦数据量较大,则容易出现大量gc甚至OOM的情况出现,从 而导致性能大幅度下降或者崩溃的情况。
-
ListItemReader的构造器中使用了isAopProxy来判定List 的实现类型,这基本上决定了没有场景可以使用到性能略好的LinkedList.
3. 总结与思考
-
综合以上情况可知,ListItemReader的使用场景非常局限,必须是小量数据且处理迅速且处理速度能够稳定的场景。考虑到主要大部分业务开发类型的job还是数据查询为主,数据量的评估对开发人员总体要求较高,一般情况不使用ListItemReader。
-
架构师/技术负责人在引入一项“新技术”到项目中时,不仅需要从宏观层面做好调研,在项目开发时、线上运行时亦需要关注实际使用情况,警惕盲区。
-
开发人员在根据项目中demo进行使用时,亦需要多看源码进行反思、总结,保持对技术的好奇心与保持独立思考的能力。
转载自:https://juejin.cn/post/7264921718421209142