// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
package org.springframework.batch.core.step.item;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer.Sample;
import java.util.Iterator;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepListener;
import org.springframework.batch.core.listener.MulticasterBatchListener;
import org.springframework.batch.core.metrics.BatchMetrics;
import org.springframework.batch.core.step.item.Chunk.ChunkIterator;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
// Chunk-oriented processor: transforms a chunk of input items with an ItemProcessor
// and writes the resulting outputs with an ItemWriter, multicasting lifecycle
// callbacks to registered listeners along the way.
// NOTE(review): this listing was recreated from a .class file and many body lines
// are elided below — treat it as an annotated excerpt, not compilable source.
public class SimpleChunkProcessor<I, O> implements ChunkProcessor<I>, InitializingBean {
// Optional transformation step; doProcess() checks this for null, which suggests
// a pass-through mode when absent — confirm against the full source.
private ItemProcessor<? super I, ? extends O> itemProcessor;
// Destination for the processed chunk; used by doWrite().
private ItemWriter<? super O> itemWriter;
// Fans out before/after/onError callbacks to all registered step listeners.
private final MulticasterBatchListener<I, O> listener;
// Processes a single item through the configured ItemProcessor, notifying the
// listener before and after processing, and on error.
// NOTE(review): lines are missing from this decompiled excerpt — the null check
// on itemProcessor appears inverted relative to the call below it, and the
// opening try, return statements, and closing braces are absent. In the original
// Spring Batch source, a null itemProcessor returns the item unchanged, and the
// listener/process calls sit inside a try block. Verify against upstream.
protected final O doProcess(I item) throws Exception {
if (this.itemProcessor == null) {
this.listener.beforeProcess(item);
// Transform the item via ItemProcessor.process()
O result = this.itemProcessor.process(item);
this.listener.afterProcess(item, result);
} catch (Exception var3) {
// Notify listeners of the processing failure; the original rethrows afterwards.
this.listener.onProcessError(item, var3);
// Writes the list of processed items through the configured ItemWriter,
// surrounding the write with before/after listener callbacks.
// NOTE(review): the actual delegation to itemWriter.write(items) is missing from
// this elided listing (only the after-write hook is visible), as are the opening
// try and closing braces. In the original, a failed write invokes
// doOnWriteError and rethrows. Verify against upstream Spring Batch source.
protected final void doWrite(List<O> items) throws Exception {
if (this.itemWriter != null) {
this.listener.beforeWrite(items);
// Bulk-write the processed items (original Korean comment: "processed data is
// written in bulk via doWrite()" — "doWirte" was a typo in the source comment)
this.doAfterWrite(items);
} catch (Exception var3) {
this.doOnWriteError(var3, items);
// Entry point for processing one chunk: transforms the inputs, records the
// filter count on the contribution, and writes the adjusted outputs.
// NOTE(review): closing braces are elided in this decompiled excerpt.
public final void process(StepContribution contribution, Chunk<I> inputs) throws Exception {
this.initializeUserData(inputs);
// Skip work if the chunk has already been fully processed (e.g. on restart).
if (!this.isComplete(inputs)) {
// inputs holds the items accumulated up to chunk size, previously obtained
// from chunkProvider.provide() ("privide" was a typo in the source comment).
Chunk<O> outputs = this.transform(contribution, inputs);
// Items dropped by the processor (null results) count as filtered, not written.
contribution.incrementFilterCount(this.getFilterCount(inputs, outputs));
// The bulk data produced by transform() is persisted in one pass via write().
// Depending on the ItemWriter implementation, write() may persist to a store
// or call an external API.
this.write(contribution, inputs, this.getAdjustedOutputs(inputs, outputs));
// Writes the chunk's outputs, timing the write with a Micrometer sample and
// incrementing the contribution's write count on success.
// NOTE(review): the opening try, the FAILURE status assignment, the rethrow in
// the catch, and closing braces are all elided in this decompiled excerpt —
// stopTimer likely runs in a finally block in the original. Verify upstream.
protected void write(StepContribution contribution, Chunk<I> inputs, Chunk<O> outputs) throws Exception {
// Start a timer sample so write latency is recorded under "chunk.write".
Sample sample = BatchMetrics.createTimerSample();
String status = "SUCCESS";
this.doWrite(outputs.getItems());
} catch (Exception var10) {
this.stopTimer(sample, contribution.getStepExecution(), "chunk.write", status, "Chunk writing");
contribution.incrementWriteCount(outputs.size());
// Passes each received input to doProcess() and collects the transformed value.
// NOTE(review): this method continues past the end of the visible listing; the
// lines assigning into outputs, the FAILURE path, and the return are not shown.
// Also note `output` is used below without a visible declaration — another sign
// of elided lines in this decompiled excerpt.
protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) throws Exception {
Chunk<O> outputs = new Chunk();
ChunkIterator iterator = inputs.iterator();
while(iterator.hasNext()) {
I item = iterator.next();
// Time each item's processing individually under the "item.process" metric.
Sample sample = BatchMetrics.createTimerSample();
String status = "SUCCESS";
output = this.doProcess(item);
} catch (Exception var13) {
this.stopTimer(sample, contribution.getStepExecution(), "item.process", status, "Item processing");