[Spring Batch] 직접 AbstractPagingItemReader를 구현하여 도메인에 특화된 PagingCollectionsItemReader 만들어보기

2024. 5. 12. 21:48Backend/Spring

모두의 랜덤 디펜스에서는 '백준'의 문제 데이터를 이용하여 코딩테스트 문제를 제공합니다.

 

'백준' 문제의 메타데이터는 시간이 갈수록 쌓일 것이고, 이를 자체 DB에 주기적으로 업데이트 하는 과정이 필요했습니다.

 

데이터베이스에 저장된 백준 문제들은 정기적으로 업데이트되어야 합니다. 이 목적을 달성하기 위해 solved.ac API를 활용하며, 이 API의 URL 구조는 다음과 같습니다.

https://solved.ac/api/v3/search/problem?query={query}&page={page}

특정 문제의 데이터를 조회할 때, {query}에 'id:1000'과 같은 문제 ID를 입력함으로써 접근할 수 있습니다.

 

문제 메타데이터 업데이트 과정 

1. DB에서 문제 정보를 읽어온다.

2. 문제 메타 데이터의 변화가 있는지 문제 id를 포함하여 외부 API 호출을 통해 비교한다.

3. 변동사항을 반영하여 DB에 저장한다.

 

이 과정에서 API의 특성을 적극 활용하여 외부 요청 수를 최소화할 수 있습니다. {query} 파라미터를 이용하여 id:1000|id:1001|id:1002와 같이 여러 문제 ID를 '|(OR)' 연산자로 묶어 한 번의 요청으로 여러 문제 정보를 가져올 수 있습니다.

 

이 방식은 최대 50개의 문제 정보를 한 번에 가져올 수 있도록 페이징되어 있습니다.

 

외부 API 호출이 과도하게 이루어질 경우, solved.ac로부터 차단될 위험이 있습니다.

따라서 API의 특성을 이용하면서, 외부 API 요청의 수를 최소화할 수 있는 배치 설계가 필요했습니다.


구상해보기 - Batch Processor에서 id를 50개 모으는 방법은?

일반적으로 PagingItemReader를 사용하면, 각 아이템이 개별적으로 Processor와 Writer로 전달됩니다. 이 과정에서 각 문제 ID를 개별적으로 처리하게 되면, API 요청의 수가 많아져 시스템에 부담을 줄 수 있습니다. 따라서 이 문제를 해결하기 위해 새로운 접근 방법이 필요합니다.

 

API의 특성을 이용하여 API 요청 수를 최소화하려면 Reader를 통해 읽은 문제 ID들을 50개씩 묶어서 한 번의 API요청을 보내야 합니다.

 

따라서 문제 ID를 50개씩 어떻게 묶을 수 있을지 고민해보았습니다.

Writer에서는 chunk 사이즈에 맞춰 아이템을 bulk로 처리할 수 있으나, 외부 API 요청을 Writer 단계에서 처리하고 이를 수정하는 것은 적절하지 않다고 판단했습니다

또한, DB에서 문제를 읽는 Reader에서 API요청을 또 하는 하는 것도 적절하지 않다고 느꼈습니다.

 

이를 해결하기 위해 Reader에서 읽은 Item들을 List나 Set등의 Collection으로 묶어서 Processor로 넘겨주면 Processor에서 ID를 모아 API를 요청할 수 있지 않을까 생각했습니다.

 

이에, AbstractPagingItemReader를 구현한 JpaPagingItemReader를 참고하여, 문제 ID를 50개씩 묶어 처리할 수 있는 PagingCollectionsItemReader를 구현하는 방법을 선택했습니다

 

이 구현을 통해 Reader는 데이터베이스에서 읽어온 아이템을 바로 넘기는 것이 아니라, 이들을 50개 단위로 묶어서 한 번에 처리할 수 있습니다.


JpaPagingItemReader 살펴보기

JpaPagingItemReader와 그 기반 클래스인 AbstractPagingItemReader의 내부 동작 방식을 깊이 있게 분석해보겠습니다.

AbstractPagingItemReader에 정의된 doRead와 JpaPagingItemReader의 doReadPage

JpaPagingItemReader의 분석을 통해 알 수 있듯, AbstractPagingItemReader에 정의된 doRead 및 JpaPagingItemReader에서 구현된 doReadPage 메서드는 AbstractPagingItemReader에 정의된 protected volatile List<T> result를 사용하여 페이징 처리를 진행합니다

 

AbstractPagingItemReader에 정의된 doRead 메소드는 배치에서 아이템을 하나씩 읽는 데 사용됩니다. 이 메소드는 다음과 같은 과정을 통해 데이터를 처리합니다:

 

1. Batch 시스템에서 Item을 하나 읽기 위해 AbstractPagingItemReader에 정의된 doRead를 호출하고

2. doRead에서는 current 인덱스를 이용해 protected volatile List<T> result에서 현재 처리할 아이템을 가져옵니다.

3. result에 읽으려는 값들이 없다면 this.doReadPage()를 호출하여 페이징 처리를 합니다. 이 때, JpaPagingItemReader에서 구현된 doReadPage메서드에서 페이징하여 Item을 원하는 크기만큼 읽어오고 result에 채웁니다.

 

4. 만약 더 이상 읽을 Item이 없다면 doRead에서 null을 반환하여 더 이상 읽을 Item이 없다는 것을 알립니다.

 

이 메커니즘 속에서, 원하는 것과 같이 여러 Item을 묶어서 Processor로 반환하고 싶다면 doReadPage에서 읽은 Item들을 result에 저장할 때 List<Item> 형태로 여러 개 저장하면 됩니다. 

 

배치 처리 중에 여러 아이템을 효과적으로 묶어 처리할 수 있는 PagingCollectionsItemReader를 개발했습니다. 이 커스텀 Reader는 doReadPage에서 읽은 아이템들을 result에 저장할 때, List<Item> 형태로 여러 개를 저장합니다.


PagingCollectionsItemReader 정의하기

 

필요한 필드들은 JpaPagingItemReader의 필드를 참고하여 만들었으며, 제네릭 부분만 수정하여

원하는 Collection으로 처리할 수 있게 C extends Collection<T>를 통해 T를 저장하는 Collection들이 Item으로 처리될 수 있게 만들었습니다.

PagingCollectionsItemReader의 doReadPage 메서드

 

doReadPage에서는 JpaPagingItemReader에 있던 트랜잭션 관련 코드를 제거했습니다.

코드 제거를 통해 Reader에서 읽은 Item들이 트랜잭션이 종료되면서 Processor에서 N+1 문제가 발생하는 것을 해결할 수 있습니다.

 

이 부분에 관해서는 다음 포스팅에서 다뤘습니다. 

https://miiiinju.tistory.com/14

 

[모두의 랜덤 디펜스] Spring Batch Processor에서 발생하는 N+1문제 삽질하기

혹시나 정답을 알고 계신 분이 있으시다면 제가 한 분석에 관해 피드백 주시면 정말 감사할 것 같습니다 ㅜㅜ default_batch_fetch_size옵션이 들어가있음에도 불구하고 @OneToMany에 대해 N+1문제가 발생

miiiinju.tistory.com

 

이 외에는, 116번째 줄의 setMaxResult에서 주석에 표시된 것과 같이, totalPageSize에 collectionLength와 chunkSize를 통해 한 번에 가져올 페이지의 크기를 지정해두고 페이징 하도록 구성했습니다.

140번 줄의 fetchQuery메서드에서 getResultList를 통해 페이징 해온 Item List들을

 

distributeResultsByChunk 메서드를 통해 CollectionLength만큼 잘라서 한 List<Item>으로 묶고, results에 add하도록 만들었습니다.

사용 예시

PagingCollectionsItemReader Bean 생성

PagingCollectionsItemReader는 Builder를 만들어 Bean으로 등록해주었습니다.

BatchConfig

 

Reader에서는 읽은 Problem들을 List<Problem>로 묶어서 Processor로 넘기도록 구성했습니다.

Processor는 내부적으로 API를 호출하여 업데이트된 정보를 가져오고, 업데이트된 정보에 따라 Problem을 수정하는 두 개의 Processor로 구성되어 있습니다.

Writer는 업데이트된 문제 데이터를 chunk 단위로 쓰는 역할을 합니다.

 

GetProblemUpdateInfoProcessor

 내부적으로 API 호출하여 업데이트된 정보를 가져오는 Processor에서는

 

getAPIURI 메서드를 통해 쿼리를 만들고, getProblemUpdate메서드를 통해 API요청을 진행합니다.

 

ProblemUpdateProcessor

다음 Processor에서는 problem 각각 업데이트된 'updatedProblemInfo'에 따라 변경을 반영하는 역할을 수행하며 기존에 설계했던 문제 업데이트 처리를 완료합니다.


결론

이를 통해 JpaPagingItemReader를 사용하는 것만으로 적절하지 않은 상황에서 

도메인 상황에 적합한 Reader를 직접 만들었습니다.

 

이를 통해 외부 API 호출 수를 최적화하여 문제 업데이트에 소모되는 시간을 최소화했습니다.

 

해당 Reader를 통해 어느 정도의 성능향상이 있었는지의 구체적인 비교는 다음 포스팅에서 다루겠습니다.