Adventure Time - Finn 3

새소식

Spring

메세지 큐를 활용한 트랜잭션 관리 - 기술적 챌린지

  • -

크래프톤 정글 2기의 마지막 커리큘럼인 "나만의 무기 갖기" 단계에서 개발하면서 겪었던 기술적 챌린지를 정리하려고 합니다.
 
우선 우리의 서비스는 개발자의 GitHub 활동을 분석하여 개인화된 이력서 템플릿을 제공하며, 기업과 개발자 간의 채용 과정을 간편하고 효율적으로 만들어주는 시스템입니다.
 
개발자의 GitHub를 분석해야 하다 보니 많은 API 호출 및 GPT API를 호출하는 과정이 발생합니다. 처음에는 기능만 목표로 구현을 했기 때문에 동기적으로 데이터를 호출하고 데이터 베이스에 저장했습니다. 하지만 GitHub계정을 분석하는데 시간이 너무 오래 걸렸기 때문에 이 속도를 줄이기 위해 리액티브 프로그래밍인 FluxMono를 사용해서 비동기 호출로 코드를 변경하였습니다.
 
아래는 관련 코드들 중 하나입니다. 
 

public Mono<Void> savePatchs(String accessToken, Map<String, Map<String, List<String>>> orgRepoCommits) {
        List<Mono<JsonNode>> requestMonos = new ArrayList<>();

        for (Map.Entry<String, Map<String, List<String>>> orgEntry : orgRepoCommits.entrySet()) {
            String orgName = orgEntry.getKey();
            Map<String, List<String>> repoCommits = orgEntry.getValue();

            for (Map.Entry<String, List<String>> repoEntry : repoCommits.entrySet()) {
                String repoName = repoEntry.getKey();
                List<String> oids = repoEntry.getValue();

                for (String oid : oids) {
                    String commitDetailUrl = "https://api.github.com/repos/" + orgName + "/" + repoName + "/commits/" + oid;

                    Mono<JsonNode> requestMono = webClient.get()
                            .uri(commitDetailUrl)
                            .header("Authorization", "Bearer " + accessToken)
                            .retrieve()
                            .bodyToMono(JsonNode.class);

                    requestMonos.add(requestMono);
                }
            }
        }

        return Flux.merge(requestMonos)
                .flatMap(response -> processResponse(response))
                .retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(5)))
                .onErrorResume(WebClientResponseException.class, error -> {
                    log.error("Error while fetching commit detail: {}", error.getMessage());
                    return Mono.empty();
                })
                .then();
    }

 
 
위 코드는 Spring WebFlux에서 제공하는 비동기 HTTP 클라이언트 라이브러리인 webClient를 활용하여 서버의 데이터를 비동기적으로 요청하고, 각 응답을 Mono <JsonNode>로 변환하여 리스트에 저장합니다. 그리고 Flux.merge를 통해 병합한 후 처리하고 재시도와 오류 처리를 수행합니다. 
 
이렇게 비동기로 처리 방식을 바꿨더니 데이터 베이스에 데이터가 저장될 때 트랜잭션과  관련하여 문제가 발생했습니다. 매 번 비동기적으로 호출하고 저장할 때 몇 개의 데이터가 저장이 안 되는 상황이 발생했습니다. 
 
 
트랜잭션이란 데이터베이스 관리 시스템(DBMS)에서 데이터를 일관되고 안전하게 관리하기 위한 작업의 논리적 단위입니다.
트랜잭션은 원자성, 일관성, 격리성, 지속성 등의 특징을 가지는데 여기서 데이터가 저장이 안되는 상황은 원자성에 해당합니다. 원자성이란 작업들을 모두 완벽하게 수행하거나, 전혀 수행하지 않는 원자적인 단위로 처리됩니다. 즉, 작업 중 하나라도 실패하면 전체 트랜잭션이 롤백되어 이전 상태로 복원되는 특징입니다.
 
 
 

트랜잭션 특징

 
 
 
이 트랜잭션 문제를 해결하기 위해 여러가지 방법을 찾아보다가 각 비동기 작업이 완료되면 작업의 상태를 주기적으로 확인해서 트랜잭션을 관리하는 방법과 메시지 브로커를 이용하여 트랜잭션 관리를 메시지 브로커를 통해 이루어지는 방법을 알게 되었습니다. 모든 작업의 상태를 주기적으로 확인하기에는 데이터가 너무 많고 코드가 복잡하기 때문에 완벽하게 잡기에는 어렵다고 생각이 들었고 메시지 브로커를 이용하는 방법을 도입해 봤습니다.
 
 
우선 메시지 브로커는 분산 시스템에서 메세지 기반의 통신을 중개하고 관리하는 서비스입니다. 메세지 브로커는 메세지를 수신하고 발신하는 역할을 하며, 메세지 큐를 포함하여 다양한 패턴과 기능을 제공합니다.
 
 
 
여기서 저희는 메세지 큐와 관련된 RabbitMQ와 Kafka에 대해서 알아보게 되었고 두 개의 모델에 관련하여 차이점을 간단하게 설명하자면
 
 

1. 메시지 모델

  • RabbitMQ: RabbitMQ는 AMQP(Advanced Message Queuing Protocol) 프로토콜을 기반으로 메시지 큐 형태의 모델을 가지고 있습니다. 메시지 큐를 통해 메시지를 저장하고 전달합니다.
  • Kafka: Kafka는 publish-subscribe 모델을 기반으로 하며, 데이터 스트림을 처리하기 위해 고안되었습니다. 데이터 스트림은 토픽을 통해 발행되고, 구독자들이 해당 토픽의 데이터를 소비합니다.

 

2. 데이터 처리 방식

  • RabbitMQ: RabbitMQ는 메시지를 큐에 저장하고 해당 큐의 구독자에게 전달합니다. 주로 작은 크기의 메시지 또는 태스크를 처리하는 데 적합합니다.
  • Kafka: Kafka는 대용량의 실시간 데이터 스트림을 처리하는 데 특화되어 있습니다. 이벤트 로그, 로그 파일, 센서 데이터 등과 같은 대량의 데이터 스트림을 처리하는 데 사용됩니다.

 
3. 지연 및 처리량

  • RabbitMQ: RabbitMQ는 작은 크기의 메시지를 처리하는 데 효율적이며, 짧은 지연 시간을 가집니다. 대량의 메시지를 처리할 때 처리량이 일부 제한될 수 있습니다.
  • Kafka: Kafka는 대용량의 데이터 스트림을 처리하는 데 특화되어 있으며, 높은 처리량을 제공하고 지연 시간을 최소화합니다.

 

4. 유연성 

  • RabbitMQ: RabbitMQ는 메시지 라우팅, 큐 설정, 다양한 메시지 패턴에 대한 유연한 구성이 가능합니다.
  • Kafka: Kafka는 데이터 유실 없이 지속적인 데이터 스트림을 처리하기 위해 설계되었기 때문에 데이터를 영구적으로 저장하고 스케일 아웃하는 데 중점을 둡니다.

 

5. 용도

  • RabbitMQ: RabbitMQ는 메시지 큐나 태스크 큐와 같은 비동기 작업 처리, 작은 규모의 이벤트 처리에 적합합니다.
  • Kafka: Kafka는 대용량 데이터 스트림 처리, 로그 처리, 이벤트 소싱 및 데이터 파이프라인 구축에 적합합니다.

 
 
위와 같은 차이점들 중에서 Kafka는 대용량 데이터 처리를 할 때 특화돼있고 RabbitMQ는 작은 크기의 메시지를 처리하는데 효율적이고 비동기 작업 처리에 적합한 점에서 RabbitMQ를 결정했습니다.
 
 

Mono<Void> saveAndSendMono = distinctPatchesToSave.collectList()
        .flatMap(patches -> {
            patchRepository.saveAll(patches);
            return Flux.fromIterable(patches)
                    .flatMap(patch -> messageOrgSenderService.orgPatchSendMessage(patch))
                    .then();
        });

operations.add(saveAndSendMono);

return Flux.merge(operations).then();

 
위 코드는 distinctPatchesToSave로부터 가져온 고유한 패치 목록을 저장하고, 각 패치를 메시지로 보내는 비동기 작업을 병렬로 처리합니다. API를 비동기로 호출하고 위와 같은 과정으로 메세지 큐에 전송을 한 다음에 메세지 큐에서 데이터를 순차적으로 처리를 하면서  전체적인 로직 속도가 약90초에서 19초정도로 79.6%정도 개선이 되었고 데이터베이스에 모든 데이터가 잘 저장됐습니다. 
 
 
처음에는 단순히 비동기식으로 코드를 바꾸기만 하면 문제가 없을 줄 알았지만 막상 코드를 바꿔보니 트랜잭션과 관련하여 여러 부분에서 문제가 발생하였고 이를 해결하려고 알아보다가 트랜잭션과 메시지 큐에 대해서 좀 더 알 수 있어서 좋은 경험이 되었던 것 같습니다.
 
하지만 이런 메세지큐를 구현할 때 주의해야 할 점이 있는데, 메시지가 중간에 유실되거나 처리되지 않는 문제에 대해서 생각해봐야 합니다.
 
다음 포스팅에서는 이런 부분을 반영해서 코드를 리팩터링 하여 문제점을 해결한 코드를 정리하겠습니다.
 
관련 전체 코드는 아래 깃허브 참고하세요.
https://github.com/DevProfiIe/Dev-Profile-Backend

GitHub - DevProfiIe/Dev-Profile-Backend: "Krafton Jungle: 나만의 무기 갖기" - 개발자의 GitHub 활동을 분석하여

"Krafton Jungle: 나만의 무기 갖기" - 개발자의 GitHub 활동을 분석하여 개인화된 이력서 템플릿을 제공하며, 기업과 개발자 간의 채용 과정을 간편하고 효율적으로 만들어주는 시스템 - GitHub - DevProfiI

github.com

 

Contents

포스팅 주소를 복사했습니다

이 글이 도움이 되었다면 공감 부탁드립니다.