본문 바로가기

MicroService

Message-Driven 마이크로서비스

Message Driven 마이크로서비스에 관하여


이번 포스팅에서 마이크로서비스 간의 비동기 통신을 어떻게 구현하는지에 대해 살펴보자. 비동기 one-to-one, publish/subscribe 통신 스타일을 RabbitMq와 Spring Cloud Stream을 사용해서 구현해 볼 것이다.


이번 포스팅에서는

  • Spring Cloud Stream과 관련된 용어와 개념들
  • Binder 역할을 하는 RabbitMQ 메시지 브로커 사용하기
  • Spring Cloud Stream 프로그래밍 모델
  • Binding, Producers, Consumers 설정하기

Spring Cloud Stream이란?

Message Broker와 통신할 수 있도록 도와주는 것이다. Spring Cloud Stream으로 만들어진 application은 input/output 채널을 통해 다른 마이크로서비스와 통신할 수 있다. 이 채널들은 외부 메시지 브로커와 연결되어 있다. RabbitMq와 Kafka가 이 역할을 해주는 Binder 구현체이다. 

Spring Cloud Stream을 적용한 메시징 시스템 만들기

주문이 접수되면 order-service는 단지 주문을 데이터베이스에 저장만 하고 message broker에 이벤트를 던진다. 그럼 account-service가 해당 이벤트를 받아 다른 작업들을 asynchronous 진행한다. ( point-to-point )

account-service는 product-service 호출을 통해 가격을 조회하고, 계좌 잔액과 비교하여 주문 상태를 결정한다. 이 주문 상태를 order-service에게 message broker에게 돌려준다. 


Spring Cloud Stream 적용하기

Spring Cloud Stream에서 제공하는 어노테이션을 통해 어플리케이션과 메시지브로커를 연결할 수 있다. 
  • Sink : inbound 채널로부터 메시지를 받아오는 서비스에 기입
  • Source : outbound 채널에 메시지를 보내는 서비스에 기입
  • Processor : Sink + Source 둘다 필요한 경우 ( 위에서 order-service와 같은 경우 )

@SpringBootApplication
@EnableDiscoveryClient
@EnableBinding(Processor.class)
public class OrderApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(OrderApplication.class).web(true).run(args);
}

}


Channel 선언, 연결하기

Spring 덕분에 spring application과 message broker 구현체는 독립적이다. Spring Cloud Stream은 binder를 자동적으로 감지하고 사용한다. 

메시지 브로커에 주문 메시지를 던져 줄 Producer를 만들어보자.


@Service
public class OrderSender {

@Autowired
private Source source;

public boolean send(Order order) {
return this.source.output().send(MessageBuilder.withPayload(order).build());
}

}


@RestController
public class OrderController {

private static final Logger LOGGER = LoggerFactory.getLogger(OrderController.class);
private ObjectMapper mapper = new ObjectMapper();

@Autowired
OrderRepository repository;
@Autowired
OrderSender sender;

@PostMapping
public Order process(@RequestBody Order order) throws JsonProcessingException {
Order o = repository.add(order);
LOGGER.info("Order saved: {}", mapper.writeValueAsString(order));
boolean isSent = sender.send(o);
LOGGER.info("Order sent: {}", mapper.writeValueAsString(Collections.singletonMap("isSent", isSent)));
return o;
}

}


메시지 브로커에서 주문처리 이벤트를 들을 Receiver를 구현해보자.


@SpringBootApplication
@EnableDiscoveryClient
@EnableBinding(Processor.class)
public class AccountApplication {

@Autowired
AccountService service;

public static void main(String[] args) {
new SpringApplicationBuilder(AccountApplication.class).web(true).run(args);
}

@Bean
@StreamListener(Processor.INPUT)
public void receiveOrder(Order order) throws JsonProcessingException {
service.process(order);
}

}



Message Broker 연결 설정하기

포트 15672는 웹화면에서 RabbitMq를 볼 수 있다.

docker run -d --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management


스프링 프로젝트 application.yml에는

spring:
rabbitmq:
host: 192.168.99.100
port: 5672


spring:
cloud:
stream:
rabbit:
bindings:
output:
producer:
exchangeType: direct
input:
consumer:
exchangeType: direct


아래와 같이 channel에 이름도 줄 수 있다.

order-service 

spring:
cloud:
stream:
bindings:
output:
destination: orders-out
input:
destination: orders-in


account-service

spring:
cloud:
stream:
bindings:
output:
destination: orders-in
input:
destination: orders-out