728x90
반응형

Chunk 지향 프로그래밍

Chunk란 아이템이 트랜잭션에 commit되는 수를 말한다.
즉, 청크 지향 처리란 한 번에 하나씩 데이터를 읽어 Chunk라는 덩어리를 만든 뒤, Chunk 단위로 트랜잭션을 다루는 것을 의미한다. Chunk 단위로 트랜잭션을 수행하기 때문에, 수행이 실패한 경우 해당 Chunk 만큼만 롤백이 되고, 이전에 커밋된 트랜잭션 범위까지는 반영된다는 것을 의미한다.
Chunk 지향 프로세싱은 1000개의 데이터에 대해 배치 로직을 실행한다고 가정하면, Chunk 단위로 나누지 않았을 경우에는 한개만 실패해도 성공한 999개의 데이터가 롤백된다. Chunk 단위를 10으로 한다면, 작업 중에 다른 Chunk는 영향을 받지 않는다.
 
여기선 Reader, Processor에서는 1건씩 다뤄지고, Writer에서는 Chunk 단위로 처리된다는 것을 기억하면 된다.
ChunkOrientedTasklet
 
public class ChunkOrientedTasklet<I> implements Tasklet {
 
private static final String INPUTS_KEY = "INPUTS";
 
private final ChunkProcessor<I> chunkProcessor;
 
private final ChunkProvider<I> chunkProvider;
 
private boolean buffering = true;
 
private static Log logger = LogFactory.getLog(ChunkOrientedTasklet.class);
 
 
public ChunkOrientedTasklet(ChunkProvider<I> chunkProvider, ChunkProcessor<I> chunkProcessor) {
 
this.chunkProvider = chunkProvider;
 
this.chunkProcessor = chunkProcessor;
 
}
 
 
public void setBuffering(boolean buffering) {
 
this.buffering = buffering;
 
}
 
 
@Nullable
 
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
 
Chunk<I> inputs = (Chunk)chunkContext.getAttribute("INPUTS");
 
if (inputs == null) {
 
// Reader에서 데이터 가져오기
 
inputs = this.chunkProvider.provide(contribution);
 
if (this.buffering) {
 
chunkContext.setAttribute("INPUTS", inputs);
 
}
 
}
 
 
 
this.chunkProcessor.process(contribution, inputs); // Processor & Writer 처리
 
this.chunkProvider.postProcess(contribution, inputs);
 
if (inputs.isBusy()) {
 
logger.debug("Inputs still busy");
 
return RepeatStatus.CONTINUABLE;
 
} else {
 
chunkContext.removeAttribute("INPUTS");
 
chunkContext.setComplete();
 
if (logger.isDebugEnabled()) {
 
logger.debug("Inputs not busy, ended: " + inputs.isEnd());
 
}
 
 
return RepeatStatus.continueIf(!inputs.isEnd());
 
}
 
}
 
}
ChunkOrientedTasklet에서 Chunk 단위로 작업하기 위한 코드는 execute()에 있다.
 
SimpleChunkProcessor
ChunkProcessor 는 Processor와 Writer의 로직을 구현하고 있다.
 
public interface ChunkProcessor<I> {
 
void process(StepContribution var1, Chunk<I> var2) throws Exception;
 
}
ChunkProcessor는 인터페이스이기 때문에 실제 구현체가 있어야하며, 기본적으로 SimpleChunkProcessor가 사용된다.
 
//
 
// Source code recreated from a .class file by IntelliJ IDEA
 
// (powered by FernFlower decompiler)
 
//
 
 
package org.springframework.batch.core.step.item;
 
 
import io.micrometer.core.instrument.Tag;
 
import io.micrometer.core.instrument.Timer.Sample;
 
import java.util.Iterator;
 
import java.util.List;
 
import org.springframework.batch.core.StepContribution;
 
import org.springframework.batch.core.StepExecution;
 
import org.springframework.batch.core.StepListener;
 
import org.springframework.batch.core.listener.MulticasterBatchListener;
 
import org.springframework.batch.core.metrics.BatchMetrics;
 
import org.springframework.batch.core.step.item.Chunk.ChunkIterator;
 
import org.springframework.batch.item.ItemProcessor;
 
import org.springframework.batch.item.ItemWriter;
 
import org.springframework.beans.factory.InitializingBean;
 
import org.springframework.lang.Nullable;
 
import org.springframework.util.Assert;
 
 
public class SimpleChunkProcessor<I, O> implements ChunkProcessor<I>, InitializingBean {
 
private ItemProcessor<? super I, ? extends O> itemProcessor;
 
private ItemWriter<? super O> itemWriter;
 
private final MulticasterBatchListener<I, O> listener;
 
 
// ...
 
protected final O doProcess(I item) throws Exception {
 
if (this.itemProcessor == null) {
 
return item;
 
} else {
 
try {
 
this.listener.beforeProcess(item);
 
// ItemProcessor의 process()로 가공
 
O result = this.itemProcessor.process(item);
 
this.listener.afterProcess(item, result);
 
return result;
 
} catch (Exception var3) {
 
this.listener.onProcessError(item, var3);
 
throw var3;
 
}
 
}
 
}
 
 
protected final void doWrite(List<O> items) throws Exception {
 
if (this.itemWriter != null) {
 
try {
 
this.listener.beforeWrite(items);
 
// 가공된 데이터들을 doWirte()로 일괄 처리
 
this.writeItems(items);
 
this.doAfterWrite(items);
 
} catch (Exception var3) {
 
this.doOnWriteError(var3, items);
 
throw var3;
 
}
 
}
 
}
 
 
public final void process(StepContribution contribution, Chunk<I> inputs) throws Exception {
 
this.initializeUserData(inputs);
 
if (!this.isComplete(inputs)) {
 
// inputs는 이전에 `chunkProvider.privide()에서 받은 ChunkSize만큼 쌓인 item
 
Chunk<O> outputs = this.transform(contribution, inputs);
 
contribution.incrementFilterCount(this.getFilterCount(inputs, outputs));
 
// transform()을 통해 가공된 대량 데이터는 write()를 통해 일괄 저장된다.
 
// 이때 wirte()는 저장이 될 수도 있고, API 전송이 될 수도 있다. (ItemWriter 구현방식에 따라 다름)
 
this.write(contribution, inputs, this.getAdjustedOutputs(inputs, outputs));
 
}
 
}
 
 
// ...
 
 
protected void write(StepContribution contribution, Chunk<I> inputs, Chunk<O> outputs) throws Exception {
 
Sample sample = BatchMetrics.createTimerSample();
 
String status = "SUCCESS";
 
 
try {
 
this.doWrite(outputs.getItems());
 
} catch (Exception var10) {
 
inputs.clear();
 
status = "FAILURE";
 
throw var10;
 
} finally {
 
this.stopTimer(sample, contribution.getStepExecution(), "chunk.write", status, "Chunk writing");
 
}
 
 
contribution.incrementWriteCount(outputs.size());
 
}
 
 
// 전달받은 input을 doProcess()로 전달하고 변환 값을 받는다.
 
protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) throws Exception {
 
Chunk<O> outputs = new Chunk();
 
ChunkIterator iterator = inputs.iterator();
 
 
while(iterator.hasNext()) {
 
I item = iterator.next();
 
Sample sample = BatchMetrics.createTimerSample();
 
String status = "SUCCESS";
 
 
Object output;
 
try {
 
output = this.doProcess(item);
 
} catch (Exception var13) {
 
inputs.clear();
 
status = "FAILURE";
 
throw var13;
 
} finally {
 
this.stopTimer(sample, contribution.getStepExecution(), "item.process", status, "Item processing");
 
}
 
 
if (output != null) {
 
outputs.add(output);
 
} else {
 
iterator.remove();
 
}
 
}
 
 
return outputs;
 
}
 
 
}
여기서 Chunk 단위 처리를 담당하는 핵심 로직은 process()에 있다.
Page Size vs Chunk Size
Chunk Size는 한번에 처리될 트랜잭션 단위를 의미하며, Page Size는 한번에 조회할 Item의 양을 의미한다.
AbstractPagingItemReader
 
@Nullable
 
protected T doRead() throws Exception {
 
synchronized(this.lock) {
 
 
if (this.results == null || this.current >= this.pageSize) {
 
if (this.logger.isDebugEnabled()) {
 
this.logger.debug("Reading page " + this.getPage());
 
}
 
 
this.doReadPage();
 
++this.page;
 
if (this.current >= this.pageSize) {
 
this.current = 0;
 
}
 
}
 
 
int next = this.current++;
 
return next < this.results.size() ? this.results.get(next) : null;
 
}
 
}
현재 읽어올 데이터가 없거나, pageSize를 초과한 경우 doReadPage()를 호출하는 것을 볼 수 있다. 즉, Page 단위로 끊어서 호출하는 것을 볼 수 있다.
Page Size와 Chunk Size를 다르게 설정하는 경우의 예를 들어보자. 만약 PageSize가 10, Chunk Size가 50이라면, ItemReader에서 Page조회가 5번 일어나면 1번의 트랜잭션이 발생하여, Chunk가 처리될 것이다.
한번의 트랜잭션의 처리를 위해 5번의 쿼리 조회가 발생하는 것은 성능샹 이슈가 발생할 수 있다. Spring Batch에서는 다음과 같이 설명이 되어있다.
Setting a fairly large page size and using a commit interval that matches the page size should provide better performance. (상당히 큰 페이지 크기를 설정하고 페이지 크기와 일치하는 커미트 간격을 사용하면 성능이 향상됩니다.)
추가적으로 JPA 사용시에도 lazeException 오류가 발생할 수 있다.
 
org.hibernate.LazyInitializationException: failed to lazily initialize a collection of role: com.blogcode.example3.domain.PurchaseOrder.productList, could not initialize proxy - no Session
 
public abstract class AbstractPagingItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements InitializingBean {
 
protected Log logger = LogFactory.getLog(this.getClass());
 
private volatile boolean initialized = false;
 
private int pageSize = 10;
AbstractPagingItemReader를 보면 pageSize의 default 크기는 10인 것을 확인할 수 있다.
 
protected void doReadPage() {
 
EntityTransaction tx = null;
 
if (this.transacted) {
 
tx = this.entityManager.getTransaction();
 
tx.begin();
 
this.entityManager.flush();
 
this.entityManager.clear();
 
}
 
 
Query query = this.createQuery().setFirstResult(this.getPage() * this.getPageSize()).setMaxResults(this.getPageSize());
 
if (this.parameterValues != null) {
 
Iterator var3 = this.parameterValues.entrySet().iterator();
 
 
while(var3.hasNext()) {
 
Entry<String, Object> me = (Entry)var3.next();
 
query.setParameter((String)me.getKey(), me.getValue());
 
}
 
}
 
 
if (this.results == null) {
 
this.results = new CopyOnWriteArrayList();
 
} else {
 
this.results.clear();
 
}
 
 
if (!this.transacted) {
 
List<T> queryResult = query.getResultList();
 
Iterator var7 = queryResult.iterator();
 
 
while(var7.hasNext()) {
 
T entity = var7.next();
 
this.entityManager.detach(entity);
 
this.results.add(entity);
 
}
 
} else {
 
this.results.addAll(query.getResultList());
 
tx.commit();
 
}
 
 
}
그리고 JpaPagingItemReaderdoReadPage()를 보면 this.entityManager.flush(), this.entityManager.clear()로 이전 트랜잭션을 초기화 시키기때문에 만약 Chunk Size가 100, Page Size가 10이라면 마지막 조회를 제외한 9번의 조회결과들의 트랜잭션 세션이 전부 종료되어 오류가 발생하는 것을 볼 수 있다.
이 문제 또한, Page Size와 Chunk Size를 일치시키면 해결되는 것을 볼 수 있다.
2개의 값이 의미하는 바는 다르지만, 여러 이슈로 2개 값을 일치 시키는 것이 보편적으로 좋은 방법이며, 2개 값을 일치 시키는 것을 권장한다.
728x90
반응형
728x90
반응형

시퀀스(Sequence) 와 MAX +1 의 차이

일련번호 같이 기본키를 잡을만한 컬럼의 고유번호를 지정할 때 자동증분을 많이 이용하고는 한다.

보통 시퀀스(Sequence) 또는 MAX + 1 을 사용할텐데, 현업에서는 아직도 MAX + 1을 사용하는 곳이 많다고 한다.

 

이 둘의 가장 큰 차이점은 무엇일까?

일단 하나 들어보자면 데이터 중복의 허용과 비허용을 말할 수 있다.

 

예를 들어보자.

 

한번에 10명의 사용자가 동시접속을 해서 게시글을 작성했다고 생각해보자.

그렇다면 총 10번의 INSERT가 진행될 것이다.

 

각각의 INSERT에 대해서 일련번호를 MAX + 1, Sequence.NEXTVAL을 사용했다 치면 각각의 값들은 이런식으로 나올 수 있다.

물론 무조건 그렇다는게 아니라 그럴수도 있다는 점을 알아두면 좋겠다.

MAX + 1은 1,1,2,3,4,5,5,6,7,8 이라는 값이 나올 수 있고
Sequence.NEXTVAL 같은 경우에는 1,2,3,4,5,6,7,8,9,10이라는 값으로 나올 것이다.

여기서 말하고자 하는것은 MAX를 사용할 경우에는 중복이 될 수 있고, Sequence 같은 경우에는 중복이 발생할 수 없다는 점이다.

시퀀스 크리티컬 세션이 보장되기 때문에 절대 데이터의 중복처리가 되질 않는다.

MAX + 1 현재 가장 큰 값에 + 1을 시켜주는 것이기 때문에 중복이 될 확률이 높다.

 

일단 차이점에 대해 작성을 해봤는데, 개인적인 생각으로는 MAX + 1을 써야하는 이유가 전혀 없다고 생각된다.

 

개발이 편해서? 시퀀스도 편하다. 시퀀스를 생성하는게 번거로워서? 그냥 생성문만 실행시켜주면 끝이다.

그리고 성능을 많이 신경써야 하는 부분에 있어서 MAX + 1은 풀스캔을 하기 때문에 성능상 매우 좋지 않다.

 

되도록 시퀀스를 쓰기를 바란다.

728x90
반응형
728x90
반응형

--최근 수정된 데이터조회(수정된지 오래되었으면 오류발생)

SELECT
ORA_ROWSCN AS SCN
, TO_CHAR(SCN_TO_TIMESTAMP(ORA_ROWSCN),'YYYY-MM-DD HH24:MI:SS') AS TM
, MENU_ID
FROM TB_MENU_INFO;

 

 

--10분전 데이터 조회

SELECT  * 
  FROM TB_MENU_INFO  AS OF TIMESTAMP(SYSTIMESTAMP - INTERVAL '10' MINUTE) 
 WHERE  MENU_ID LIKE 'SC_%'

 

cf) SECOND, MINUTE, HOUR, DAY 로 조회 가능

 

--특정 날짜 시간 데이터 조회
SELECT  * 
  FROM TB_MENU_INFO  AS OF TIMESTAMP(TO_TIMESTAMP('2018-11-12 08:30:10', 'YYYY-MM-DD HH24:MI:SS')) 
 WHERE  MENU_ID LIKE 'SC_%'

 
728x90
반응형
728x90
반응형

Pseudo 컬럼을 이용한 변경 체크

기존의 테이블에 컬럼 추가 없이 Pseudo 컬럼을 이용한 방법을 사용할 수 있다.

이는 10g에서의 SCN 번호를 가져올 수 있는 ORA_ROWSCN 함수를 사용해서 체크하는 방법을 이용하면 간단히 구현될 수 있다. 10g에서는 SCN 값의 변경사항 중 하나가 종래의 block level SCN에서 Row level SCN을 지원하게 되었다는 것이다(SCN은 System Change Number 또는 System Commit Number라고 병행해서 사용된다. 이 값은 커밋시마다 부여되는 오라클의 내부시계와 같은 역할을 수행한다).


기존에 BLOCK 레벨로 부여하던 SCN 값이 로우 레벨에 따라 다른 번호를 가질 수 있게 된 것이다. 따라서 이러한 ora_ rowscn 값을 이용해 테이블의 로우가 언제 변경되었는지에 대한 정보도 뽑아볼 수 있다. 그러나 모든 테이블에 다 적용되는 것이 아닌 ROW LEVEL SCN을 적용하게끔 테이블 생성시 ROW DEPENDENCIES 옵션으로 생성해야 한다는 것이다.

728x90
반응형
728x90
반응형

오라클 DB에 접속하는 Java 프로그램을 컴파일 하거나, 실행할 때 Unsupported majar.minor version 에러가 발생할 수 있습니다.

 

$ javac jdbcTest_CNT.java
warning: oracle/jdbc/driver/OracleDriver.class(oracle/jdbc/driver:OracleDriver.class): major version 51 is newer than 50, the highest major version supported by this compiler.
It is recommended that the compiler be upgraded.
1 warning
$ java jdbcTest_CNT
Exception in thread "main" java.lang.UnsupportedClassVersionError: oracle/jdbc/driver/OracleDriver : Unsupported major.minor version 51.0
       at java.lang.ClassLoader.defineClass1(Native Method)
       at java.lang.ClassLoader.defineClassCond(ClassLoader.java:631)
       at java.lang.ClassLoader.defineClass(ClassLoader.java:615)
       at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)
       at java.net.URLClassLoader.defineClass(URLClassLoader.java:283)
       at java.net.URLClassLoader.access$000(URLClassLoader.java:58)
       at java.net.URLClassLoader$1.run(URLClassLoader.java:197)
       at java.security.AccessController.doPrivileged(Native Method)
       at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
       at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
       at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
       at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
       at jdbcTest_CNT.main(jdbcTest_CNT.java:56)

 

원인은 Oracle JDBC 버전과 JDK 버전, Java 버전이 서로 다르기 때문입니다.

 

예를 들어,

아래와 같이 ClassPath 에는 ojdbc7.jar 파일을 사용하도록 설정해놓고,

컴파일 or 실행할 때는 Java 6 버전(1.6 버전)을 쓰게 되면 이런 에러가 발생합니다.

 

export CLASSPATH=.:$ORACLE_HOME/jdbc/lib/ojdbc7.jar
$ javac -version
javac 1.6.0_37

$ java -version
java version "1.6.0_37" Java(TM) SE Runtime Environment (build 1.6.0_37-b06)
Java HotSpot(TM) 64-Bit Server VM (build 20.12-b01, mixed mode)

 

이런 경우, JDK, Java 버전을 1.7 로 똑같이 맞춰주던가,

아니면, $ORACLE_HOME/jdbc/lib/ 디렉토리에 가보면, ojdbc6.jar 파일이 있습니다.

이걸 ClassPath 에 잡아주면 에러없이 잘 동작합니다.

 

오라클 DB버전별로 지원되는 JDBC 버전과 JDK 버전이 있습니다. 버전에 맞는 걸 정확히 확인하고 사용하는 것이 바람직합니다.

 

 

728x90
반응형

+ Recent posts