ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 스프링으로 시작하는 리액티브 프로그래밍 1 : 리액티브 입문과 Non-Blocking I/O
    책책책 책을 읽읍시다/프로그래밍 2024. 8. 5. 23:15

    저자 : 황정식

    들어가며


    표지

     리액티브 프로그래밍과 Spring WebFlux에 대해 부담없는 수준으로 알려주는 책이다. 개인적으로는 리액티브 프로그래밍에 대해 개념과 간단한 동작원리를 알 수 있어서 나쁘지 않았다. 다만, 이 한권으로 완벽히는 다 못가져갈 것 같고, 나도 다 읽기는 했지만 자바 ORM 표준 JPA 프로그래밍 때처럼 이 기술에 대해 어느정도 감을 잡은 것 같다라는 자신감은 얻지 못했다.

     

    Chapter 01. 리액티브 시스템과 리액티브 프로그래밍


     'reactive'의 사전적 의미에는 '반응을 하는'이라는 뜻이 있는데, 어떤 이벤트나 상황이 발생했을 때, 반응을 해서 그에 따라 적절하게 행동하는 것을 의미한다. 리액티브 시스템에서 반응을 잘한다는 것은 클라이언트의 요청에 머뭇거리지 않고 반응을 잘해서 즉시 응답해 주는 것을 의미한다. 다시 말해서 클라이언트의 요청에 즉각적으로 응답함으로써 지연 시간을 최소화한다고 볼 수 있다.

     

    아래는 리액티브라는 용어의 의미를 올바르게 정의하기 위해 노력하는 사람들이 만든 리액티브 시스템 구축을 위한 일종의 설계 원칙이자 리액티브 시스템의 특징이라고 할 수 있다.

    리액티브 선언문 설계 원칙

    • MEANS는 리액티브 시스템에서 주요 통신 수단으로 무엇을 사용할 것인지 표현한 것이다. 비동기 메시지 기반의 통신을 통해서 구성요소들 간의 느슨한 결합, 격리성, 위치 투명성을 보장한다.
    • FORM은 메시지 기반 통신을 통해서 어떠한 형태를 지니는 시스템으로 형성되는지를 나타낸다. 비동기 메시지 통신 기반하에 탄력성과 회복성을 가지는 시스템이어야 함을 보여준다.
      • 리액티브 시스템에서 탄력성(Elastic)이란 시스템의 작업량이 변화하더라도 일정한 응답을 유지하는 것을 의미한다. 즉, 시스템으로 유입되는 입력이 많든 적든 간에 시스템에서 요구하는 응답성을 일정하게 유지하는 것이다.
      • 회복성(Resilence)이란 시스템에 장애가 발생하더라도 응답성을 유지하는 것을 의미한다. 회복성이 없다면 장애 발생 시 시스템이 응답하지 못하는 심각한 문제에 직면하기 때문에 이를 방지하기 위해서 회복성은 리액티브 시스템의 중요한 설계 원칙이라고 볼 수 있다. 이러한 회복성을 확보하기 위해 리액티브 시스템의 구성요소들은 비동기 메시지 기반 통신을 통해 느슨한 결합과 격리성을 보장하는 것이다. 시스템의 구성요소들이 독립적으로 분리되기 때문에 장애가 발생하더라도 전체 시스템은 여전히 응답 가능하고 장애가 발생한 부분만 복구하면 된다는 의미다.
    • VALUE는 비동기 메시지 기반 통신을 바탕으로 한 회복성과 예측 가능한 규모 확장 알고리즘을 통해 시스템의 처리량을 자동으로 확장하고 축소하는 탄력성을 확보함으로써 즉각적으로 응답 가능한 시스템을 구축할 수 있음을 의미한다.

     

    Chapter 02. 리액티브 스트림즈(Reactive Streams)


    아래 표는 리액티브 스트림즈를 통해 구현해야 되는 API 컴포넌트들이다.

    컴포넌트 설명
    Publisher 데이터를 생성하고 통지(발행, 게시, 방출)하는 역할을 한다.
    Subscriber 구독한 Publisher로부터 통지(발행, 게시, 방출)된 데이터를 전달받아서 처리하는 역할을 한다.
    Subscription Publisher에 요청할 데이터의 개수를 지정하고, 데이터의 구독을 취소하는 역할을 한다.
    Processor Publisher와 Subscriber의 기능을 모두 가지고 있다. 즉, Subscriber로서 다른 Publisher를 구독할 수 있고, Publisher로서 다른 Subscirber가 구독할 수 있다.

    Publisher와 Subscriber의 동작 과정

     

     

    Chapter. 03 Blocking I/O와 Non-Blocking I/O


    3. 1 Blocking I/O

    Blocking I/O 예시

     본사 API 서버에서 지점 API 서버로 요청을 보내는 그 시점에 본사 API 서버에서 실행된 스레드(요청 스레드)는 차단되어 지점 API 서버의 스레드(작업 스레드)가 처리를 끝내고 응답을 반환하기 전까지 대기하게 된다.

     Blocking I/O 방식의 문제점을 보완하기 위해 멀티스레딩 기법으로 추가 스레드를 할당하여 차단된 그 시간을 효율적으로 사용할 수는 있지만, 컨텍스트 스위칭(Context Switching)으로 인한 스레드 전환 비용, 과다한 메모리 사용으로 오버헤드 발생, 스레드 풀(Thread Pool)에서의 응답 지연(유휴 스레드가 없을 경우)이라는 문제가 발생한다.

    3. 2 Non-Blocking I/O

     Non-Blocking I/O는 Blocking I/O와 반대로 스레드가 차단되지 않는다.

    Non-Blocking I/O 예시

     A 지점 API 서버로의 요청을 처리하는 동안 스레드가 차단되지 않기 때문에 대기 없이 B 지점 API 서버로 요청을 즉시 보낼 수 있다. 작업 스레드의 종료 여부와 관계없이 요청한 스레드는 차단되지 않는다. 스레드가 차단되지 않기 때문에 하나의 스레드로 많은 수의 요청을 처리할 수 있다. 즉, Blocking I/O 방식보다 더 적은 수의 스레드를 사용하기 때문에 Blocking I/O에서 멀티스레딩 기법을 사용할 때 발생한 문제점들이 생기지 않는다. 따라서 CPU 대기 시간 및 사용량에 있어서도 대단히 효율적이다. 단, CPU를 많이 사용하는 작업이나 작업 흐름 중 Blocking I/O 요소가 포함된 경우에는 Non-Blocking의 이점을 발휘하기 힘들다.

     

     Spring MVC와 WebFlux의 가장 큰 차이점은 MVC는 Blocking I/O 방식이고, WebFlux는 Non-Blocking I/O 방식이라는 점이다.

     

    이제 코드로 Blocking/Non-Blocking 방식의 차이점을 알아보자. 저자의 깃헙(https://github.com/bjpublic/Spring-Reactive) 레포지토리를 참조하였다.

    Blocking I/O 예제 코드

    Blocking I/O 본사 API 서버의 컨트롤러

    @Slf4j
    @RestController
    @RequestMapping("/v1/books")
    public class MvcController {
        private final RestTemplate restTemplate;
    
        URI baseUri = UriComponentsBuilder.newInstance().scheme("http")
                .host("localhost")
                .port(7070)
                .path("/v1/books")
                .build()
                .encode()
                .toUri();
    
        public MvcController(RestTemplateBuilder restTemplateBuilder) {
            this.restTemplate = restTemplateBuilder.build();
        }
    
        @ResponseStatus(HttpStatus.OK)
        @GetMapping("/{book-id}")
        public ResponseEntity<Book> getBook(@PathVariable("book-id") long bookId) {
            URI getBookUri = UriComponentsBuilder.fromUri(baseUri)
                    .path("/{book-id}")
                    .build()
                    .expand(bookId)
                    .encode()
                    .toUri(); // http://localhost:7070/v1/books/{book-id}
    
            ResponseEntity<Book> response = restTemplate.getForEntity(getBookUri, Book.class);
            Book book = response.getBody();
    
            return ResponseEntity.ok(book);
        }
    
    }

    우리에게 익숙한 MVC 환경 기반의 코드이다.

     

    Blocking I/O 지점 API 서버

    @Slf4j
    @RestController
    @RequestMapping("/v1/books")
    public class BranchController {
        private Map<Long, Book> bookMap;
    
        @Autowired
        public BranchController(Map<Long, Book> bookMap) {
            this.bookMap = bookMap;
        }
    
        @ResponseStatus(HttpStatus.OK)
        @GetMapping("/{book-id}")
        public ResponseEntity<Book> getBook(@PathVariable("book-id") long bookId)
                throws InterruptedException {
            Thread.sleep(5000);
    
            Book book = bookMap.get(bookId);
    
            return ResponseEntity.ok(book);
        }
    }

    Thread.sleep으로 의도적으로 지연을 발생시켜 오래 걸리는 작업인 것처럼 시뮬레이션한다.

     

    Blocking I/O 클라이언트

    @Slf4j
    @SpringBootApplication
    public class MvcApplication {
        private URI baseUri = UriComponentsBuilder.newInstance().scheme("http")
                .host("localhost")
                .port(8080)
                .path("/v1/books")
                .build()
                .encode()
                .toUri();
    
        public static void main(String[] args) {
            SpringApplication.run(MvcApplication.class, args);
        }
    
        @Bean
        public RestTemplateBuilder restTemplate() {
            return new RestTemplateBuilder();
        }
    
        @Bean
        public CommandLineRunner run() {
            return (String... args) -> {
                log.info("# 요청 시작 시간: {}", LocalTime.now());
    
                for (int i = 1; i <= 5; i++) {
                    Book book = this.getBook(i);
                    log.info("{}: book name: {}", LocalTime.now(), book.getName());
                }
            };
        }
    
        private Book getBook(long bookId) {
            RestTemplate restTemplate = new RestTemplate();
    
            URI getBooksUri = UriComponentsBuilder.fromUri(baseUri)
                    .path("/{book-id}")
                    .build()
                    .expand(bookId)
                    .encode()
                    .toUri(); // http://localhost:8080/v1/books/{book-id}
    
            ResponseEntity<Book> response =
                    restTemplate.getForEntity(getBooksUri, Book.class);
            Book book = response.getBody();
    
            return book;
        }
    }

    for 문에서 순회하며 본사 API 서버의 API를 5회 호출한다.

    실행 결과를 보면 전체 조회 시간이 약 25초 정도 걸렸다. Blocking I/O 방식이기 때문에 스레드가 차단되었기 때문이다.

     

    이번엔 Non-Blocking I/O 예제를 보자.

    Blocking I/O 실행결과

    작업 흐름은 본사에서 지점 API를 호출하는 것으로 같다.

     

    Non-Blocking I/O 본사 API 서버

    @Slf4j
    @RequestMapping("/v1/books")
    @RestController
    public class ReactiveHeadController {
        URI baseUri = UriComponentsBuilder.newInstance().scheme("http")
                .host("localhost")
                .port(5050)
                .path("/v1/books")
                .build()
                .encode()
                .toUri();
    
        @ResponseStatus(HttpStatus.OK)
        @GetMapping("/{book-id}")
        public Mono<Book> getBook(@PathVariable("book-id") long bookId) {
            URI getBookUri = UriComponentsBuilder.fromUri(baseUri)
                    .path("/{book-id}")
                    .build()
                    .expand(bookId)
                    .encode()
                    .toUri(); // http://localhost:5050/v1/books/{book-id}
    
            return WebClient.create()
                    .get()
                    .uri(getBookUri)
                    .retrieve()
                    .bodyToMono(Book.class);
        }
    }

     

    Spring MVC에서는 RestTemplate을 사용해 요청을 전송하고 응답으로 받은 데이터를 ResponseEntity 클래스를 사용해서 반환했다. WebFlux에서는 WebClient를 사용해서 요청을 전송한 후, 응답으로 전달받은 데이터를 Mono 타입으로 바꾸는 과정을 거친 후 최종적으로 Mono 타입의 객체를 반환한다. WebClient는 Non-Blocking I/O 방식을 지원하는 HTTP 클라이언트이고, Mono는 Reactor에서 지원하는 Publisher 타입 중 하나이다.

     

    Non-Blocking I/O 지점 API 서버

    @Slf4j
    @RequestMapping("/v1/books")
    @RestController
    public class ReactiveBranchController {
        private Map<Long, Book> bookMap;
    
        @Autowired
        public ReactiveBranchController(Map<Long, Book> bookMap) {
            this.bookMap = bookMap;
        }
    
        @ResponseStatus(HttpStatus.OK)
        @GetMapping("/{book-id}")
        public Mono<Book> getBook(@PathVariable("book-id") long bookId)
                throws InterruptedException {
            Thread.sleep(5000);
    
            Book book = bookMap.get(bookId);
            log.info("# book for response: {}, {}", book.getBookId(), book.getName());
            return Mono.just(book);
        }
    }

    마찬가지로 Therad.sleep을 통해 응답을 의도적으로 지연시켰다.

     

    Non-Blocking I/O 클라이언트

    @Slf4j
    @SpringBootApplication
    public class ReactiveHeadApplication {
        private URI baseUri = UriComponentsBuilder.newInstance().scheme("http")
                .host("localhost")
                .port(6060)
                .path("/v1/books")
                .build()
                .encode()
                .toUri();
        public static void main(String[] args) {
            System.setProperty("reactor.netty.ioWorkerCount", "1");
            SpringApplication.run(ReactiveHeadApplication.class, args);
        }
    
        @Bean
        public CommandLineRunner run() {
            return (String... args) -> {
                log.info("# 요청 시작 시간: {}", LocalTime.now());
    
                for (int i = 1; i <= 5; i++) {
                    int a = i;
                    this.getBook(i)
                            .subscribe(
                                    book -> {
                                        // 전달 받은 도서를 처리.
                                        log.info("{}: book name: {}",
                                                LocalTime.now(), book.getName());
                                    }
                            );
                }
            };
        }
    
        private Mono<Book> getBook(long bookId) {
            URI getBooksUri = UriComponentsBuilder.fromUri(baseUri)
                    .path("/{book-id}")
                    .build()
                    .expand(bookId)
                    .encode()
                    .toUri(); // http://localhost:6060/v1/books/{book-id}
    
            return WebClient.create()
                    .get()
                    .uri(getBooksUri)
                    .retrieve()
                    .bodyToMono(Book.class);
        }
    }

    Blocking 때와 마찬가지로 for문에서 본사 API를 5회 호출한다.

    Non-Blocking I/O 실행 결과

    Blocking I/O 기반 코드에서 25초가 걸렸던 것이 5.5초 정도 밖에 걸리지 않았다. 스레드가 차단되지 않았기 때문에 먼저 요청한 작업이 끝나지 않아도 미리 지점 API를 호출할 수 있어서이다.

     

     

    댓글

Designed by Tistory.