[모두의 랜덤 디펜스] Spring Batch 5.X로 업데이트된 문제 가져오는 Job 만들기 - 1편

2024. 3. 9. 18:45Backend/Spring

https://morandi.co.kr

 

모두의 랜덤 디펜스

모두의 랜덤 디펜스: 코딩테스트를 준비하는 최고의 솔루션!

morandi.co.kr

모두의 랜덤 디펜스는 코딩 테스트 실전 모의고사 서비스입니다.

기존 서비스를 아주 살짝~~ 피봇팅하여 게이밍 포인트를 추가한 서비스를 개발중에 있습니다.

 

1차 MVP로는 백준의 문제 풀을 이용하여 다양한 컨텐츠의 문제 풀이 환경을 제공하는 것을 목표로 하고 있는데, 문제 데이터의 지속 업데이트 기능이 필요하게 되어 Spring Batch에 관심을 가지게 되었습니다.

개요

모두의 랜덤 디펜스에서 '랜덤 디펜스' 기능은 3만개의 문제 리스트 중에서 저희가 '정의한 로직'을 바탕으로 문제를 뽑아 사용자에게 풀 수 있게 제공합니다!

 

'정의한 로직'은 사용자가 아직 풀어본 적이 없는 문제 중

 

solved.ac 기준 풀어낸 사용자 수, solved.ac 난이도, solved.ac 알고리즘 태그를 기준으로 문제를 출제하게 됩니다.

 

모두의 랜덤 디펜스를 아직 사용하지 않는 사용자들도 백준 문제를 풀게 될 것이고, 위 3개의 정보는 지속해서 변화하게 될 것입니다!

 

따라서, 문제 출제 로직과 밀접한 관련이 있는 만큼 주기적으로

문제들이 속하는 알고리즘 태그, solved.ac 난이도, 풀어낸 사용자 수와 같은 정보를

최신화하여 유지할 필요성을 느꼈습니다.

분석

Spring Batch를 사용하는 주요 이유는,

 

배치 작업인 대규모 데이터의 업데이트 작업을 비즈니스 로직이 작동중인 백엔드에서 함께 작동시키게 되면 많은 자원을 점유하여

 

사용자들의 경험 악화로 이어질 수 있기에 백엔드와 분리한 서버에서, 사용자들이 주로 사용하지 않는 시간대에 배치를 수행하여 해결하고자 하는 것입니다.

 

또한 Spring Batch를 이용하면 Batch의 Meta Table로 배치 프로세스의 실행 이력과 성공, 실패 여부를 추적할 수 있게됩니다.

 

배치 프로세스의 실행 이력 이런 걸 왜 추적해야해? 라고 생각할 수도 있지만

 

만약 예를 들어 문제에 포함된 알고리즘 태그를 업데이트 하다가 갑자기 Exception이 터져 작업이 종료되었다면? 데이터 누락이 발생하기 때문에 이에 대한 정보를 확인하고, 해결 후 다시 재 실행해줘야 할 것 입니다.

 

보통 배치는 야심한 시각에 주기적으로 실행하게 될텐데, 이런 정보가 없으면 항상 사람이 붙어서 쳐다보고 있어야 합니다~


AWS Batch?

만약 EC2위에 Spring Batch를 사용하려면

 

아마 Scheduler + Spring Batch를 이용하여 주기적으로 Batch 작업을 돌리게 될 것 같습니다.

 

하지만 Scheduler를 동작시키려면 EC2 인스턴스가 계속 동작하고 있어야하고, 이는 Spring Batch가 활성화 되지 않을 때조차 동작을 해야하기에 비용최적화 관점에서 아주 비효율적이게 됩니다.

 

Spring Batch 관리 도구로 Jenkins를 사용하는 경우도 많은데, 저희는 배포 자동화 툴로 Github Actions를 사용하고 있는 점으로 해당 사항은 검토하지 않았습니다.

 

저는 이런 상황에서 Fargate와 같은 서버리스 플랫폼을 사용하면 좋겠다~~ 라고 생각을 했는데, AWS Batch를 살펴보니 Fargate 옵션이 존재했습니다.

 

AWS에서도

30초 이내에 작업을 시작해야 하고, 작업에 vGPU 리소스가 필요하지 않고, vCPU가 4개 이하이고, 30GiB 미만의 메모리가 필요한 경우 Fargate를 선택합니다.

 

위처럼 소개해둔걸로 미루어보아 서버 인스턴스의 크기가 지나치게 크게 필요하지 않다면 Fargate를 사용하는 옵션을 적극적으로 검토할만 한 것 같습니다!

 

만약 AWS Batch로 Fargate를 이용하게 되면 Scheduling은 Spring Batch 내부가 아니라 AWS의 EventBridge를 이용하면 해결할 수 있습니다.


Spring Batch 정의하기

Spring Batch 개념에 대한 내용은 이 블로그 보다 잘 설명한 블로그가 많을 것 같기에 과감하게 패스하고, 제가 정의헀던 Job에 대해 소개해보겠습니다.

 

Morandi Batch는 일단 크게 2개의 Job으로 작동됩니다.

 

1. 아직 업데이트 되지 않은 최신 문제가 있는지 확인하고 문제 목록을 업데이트한다.

 

2. 문제 목록을 스캔하여 알고리즘 태그, solved.ac 난이도, 풀어낸 사용자 수를 업데이트 한다.

 

이후 포스팅으로 올릴 내용인데, Algorithm 태그 종류는 정적이기 때문에 Backend가 동작할 때 PostConstruct을 통해 json을 바탕으로 초기화되도록 구현해뒀습니다.

 

먼저 1번의 Job을 정의하는 과정을 이번 포스팅에서 작성해 보겠습니다.


Spring Batch Job 정의하기

Spring Batch 5.X로 올라오면서 JobBuilderFactory가 Deprecated되고, 직접 JobBuilder를 생성해서 정의하게 바뀌었습니다.

Job Bean이 생성되면서 필요한 JobRepository와 transactionManger는 자동으로 DI됩니다.

 

JobRepository와 transactionManger의 역할은?

JobRepository는 배치 실행과 관련된 메타 데이터를 저장하고 있습니다.

작업의 실행 상태, 성공 여부, 재시작 가능 여부등을 추적합니다.

 

transactionManger는 배치 작업 시 DB와의 작업에서 데이터 일관성을 보장하고, 실행 중 예외가 발생하면 롤백할 수 있게하는 트랜잭션을 관리합니다. 이건 JPA에서도 들어본 적이 있으실 거에요.

 

여기에서 이용하는 PlatformTranscatonManager를 상속받은 AbstractPlatformTransactionManger 주상 클래스를 JpaTransactionManger에서 구현하고 있습니다.


Spring Batch Step 정의하기

StepBuilderFactory또한 이번에 Deprecated되면서 StepBuilder를 직접 생성하여 Step을 정의하게 됩니다.

Spring Batch에서는 Tasklet과 Chunk 두 개의 처리 방법이 존재합니다.

Tasklet

step이 간단하거나, 단일 작업을 수행하거나 할 때 사용합니다.

일반적으로 하나의 작업 단위가 실행될 때마다 트랜잭션 관리가 됩니다.

그 하나의 작업 단위에서 만약 예외가 터지면 전체가 롤백되고, 그 단위를 처음부터 싹 다시 실행해야 합니다 ㅜ

Chunk

ItemReader, ItemProcessor, ItemWriter를 사용하는 데이터를 쓰고, 처리하여, 쓰는 과정을 추상화합니다.

Chunk를 쓰는 장점은 ItemReader, ItemProcessor, ItemWriter로 분리하여 구조화할 수 있고,

재시작 가능한 지점을 유지하여, 많은 양의 처리 중 실패 시점부터 재개할 수 있다는 장점이 있습니다.

 

또한, Chunk 방식에서는 트랜잭션이 chunk 단위로 관리됩니다.

 

1번 Job같은 경우(새로운 문제 업데이트하는 Job) 사실 Tasklet으로 해도 관계 없기는 하지만

 

만약 DB가 비어있을 때에도 이 Batch를 사용해야하고, 그런 경우 약 3만개의 데이터를 DB에 로드해야 한다는 점으로 Chunk방식을 선택했습니다.

 

그리고 무엇보다도 Reader, Processor, Writer 3가지로 명확하게 분리가 된다는 점이 와닿았습니다!


NewProblemReader 구현하기

NewProblemReader는 solved.ac api를 이용합니다.

 

solved.ac API에서 queryParam에 qeury를 작성하면, page단위로 문제 목록을 조회할 수 있습니다.

 

ex) https://solved.ac/api/v3/search/problem?query={query}&page={page}

 

만약 저 query랑 page 자리를 채워주면 다음과 같습니다.

https://solved.ac/api/v3/search/problem?query=id:1000..&page=1

 

응답은 다음과 같습니다.

"count": 29876,
"items": [
{
"problemId": 1000,
"titleKo": "A+B",
.
.

 .  
 "acceptedUserCount": 283161,  

},
{
"problemId": 1001,
"titleKo": "A-B",
.
.

 .  
 "acceptedUserCount": 283161,  

}

그래서 위 응답에 알맞은 DTO 클래스를 만들고 WebClient를 통해 구현해보았습니다.

lastBaekjoonProblemId는 ProblemRepository에서 마지막 문제 번호를 조회하여 저장해둡니다.

 

ProblemResponse에는

ProblemDTO 리스트가 정의돼있습니다.

 

chunk 단위로 진행되는 Batch 특성상, ProblemResponse를 바로 전달하는 것은 비효율적이고, ProblemDTO를 한 item으로 전달해야합니다.

 

(chunk 사이즈랑 page사이즈를 같게 해야 효과적입니다. 이거는 PagingItemReader의 내용이긴 하지만)

Setting a fairly large page size and using a commit interval that matches the page size should provide better performance.
page 사이즈를 크게 하고, page size와 맞는 commit interval을 사용하면 성능이 향상된다.

 

트랜잭션 한 번 처리하는데, API 여러 번 호출되면 전체적인 성능에 효과적이지 못합니다

따라서 내부에 Queue를 구성하고, read 한 번 당 ProblemDTO 하나씩 반환하도록 설계를 수정했습니다.

수정한 New ProblemReader

@Component
@Slf4j
public class NewProblemPagingReader implements ItemReader<ProblemDTO>, InitializingBean {
    private final WebClient webClient;
    private final ProblemRepository problemRepository;
    private Long lastBaekjoonProblemId;
    private int nextPage = 1;
    private final ArrayDeque<ProblemDTO> problemsQueue = new ArrayDeque<>();

    public NewProblemPagingReader(WebClient.Builder webClientBuilder, ProblemRepository problemRepository) {
        this.webClient = webClientBuilder.baseUrl("https://solved.ac/api/v3").build();
        this.problemRepository = problemRepository;
    }
    @Override
    public void afterPropertiesSet() {
        this.lastBaekjoonProblemId = this.problemRepository.findLastBaekjoonProblemId();
        if (lastBaekjoonProblemId == null) {
            lastBaekjoonProblemId = 0L;
        }
    }

    @Override
    public ProblemDTO read() {
        if (problemsQueue.isEmpty()) {
            fetchProblems();
        }
        return problemsQueue.poll();
    }
    private void fetchProblems() {
        log.info("Fetching problems for page: {} and lastBaekjoonProblemId: {}", nextPage, lastBaekjoonProblemId);
        Mono<ProblemsResponse> problemsResponseMono = webClient.get()
                .uri(uriBuilder -> uriBuilder.path("/search/problem")
                        .queryParam("query", "id:" + (lastBaekjoonProblemId + 1) + "..")
                        .queryParam("page", nextPage++)
                        .build())
                .retrieve()
                .bodyToMono(ProblemsResponse.class);

        ProblemsResponse problemsResponse = problemsResponseMono.block();
        if (problemsResponse != null && problemsResponse.getItems() != null) {
            problemsQueue.addAll(problemsResponse.getItems());
        }
    }
}

Queue를 사용하여 read요청이 올 때마다 poll하여 하나씩 반환합니다.

 

만약 비어있다면 fetchProblems를 통해 50개 단위로 새로운 문제를 가져오게 됩니다.

 

LinkedList를 사용하지 않고 ArrayDeque를 사용한 이유는

ArrayDeque가 전후간의 연결 관계를 유지하지 않기 때문에 조금 더 좋은 성능이 나오게 됩니다!!

마지막 부분에서는 ProblemDTO를 반환하는 것과 ProblemResponse를 반환하는 두 가지 구현의 구체적인 성능 비교까지 해보겠습니다.


NewProblemProcessor 구현하기

ProblemProecessor는 간단합니다.

각 ProblemDTO를 받아오고, 각 내부 값에 따라 Problem 인스턴스를 만들어 반환합니다.


NewProblemWriter 구현하기

Writer에는 JdbcBatchItemWriter를 사용했습니다.

 

Writer에서 List를 problemRepository.saveAll(~~)하는 것 보다 JdbcBatchWriter를 통해 bulk insert하는 것이 효과적입니다.

 

Problem에서는 ID Generatetype을 Identity로 사용하고 있으면, saveAll 메소드를 사용해도 배치 처리가 되지 않습니다. 왜냐하면 IDENTITY 전략이 데이터베이스에 의존하여 각각의 삽입 후에 ID를 생성해야 하므로, 각 엔티티 삽입마다 별도의 데이터베이스 호출이 필요하기 때문입니다.

GenerationType.SEQUENCE와 적절한 hibernate.jdbc.batch_size 설정하면 되기는 할테지만, 이 부분의 성능 차이는  다음 포스팅에서 다루도록 하겠습니다.

 

Spring Batch에서는 내부적으로 chunkSize만큼 Item이 쌓이면 Writer로 넘어가는데, 이 때 Bulk Insert를 처리합니다.

 

chunk size와 page size를 맞춘 설계가 가장 이상적이라고 알려져있는데, 이 부분의 성능 비교를 다음 포스팅에서 해보겠습니다.