티스토리 뷰

반응형

부제: RabbitMQ Spring 연동, RabbitMQ 연동

이전 글에서 이어집니다.

 

이번 시리즈에서는 Spring AMQP에 대해 소개하고 간단한 예제를 작성해본다. 이전 글에서 이어서 간단한 예제를 소개할 것이다.

 

 

1. 사전 준비 사항

 먼저 Spring Boot와 JAVA를 구동할 적절한 IDE가 필요하다. IntelliJ 를 추천한다. 그리고 실행 중인 RabbitMQ가 필요한데, 이번 예제에서는 Docker를 활용해서 간단하게 실행해볼 예정이다. 그러므로 Docker 설치가 필요하다.

 

 Docker가 설치된 PC에서 아래 명령어를 실행하면 RabbitMQ를 실행할 수 있다.

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

 

2. 소스코드 작성

 Spring boot 프로젝트를 생성한다. Spring boot에서는 amqp 연동을 위한 starter를 제공하므로 그것을 활용할 것이다. build.gradle을 아래와 같이 작성한다.

plugins {
    id 'org.springframework.boot' version '2.3.1.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'java'
}

group = 'com.preamtree.rabbitdemo'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

repositories {
    mavenCentral()
}

def springBootVersion = '2.3.1.RELEASE'
dependencies {
    implementation group: 'org.springframework.boot', name: 'spring-boot-starter-web', version: springBootVersion
    implementation group: 'org.springframework.boot', name:'spring-boot-configuration-processor', version: springBootVersion
    implementation group: 'org.springframework.boot', name:'spring-boot-starter-amqp', version: '2.2.8.RELEASE'

    testImplementation('org.springframework.boot:spring-boot-starter-test:2.3.1.RELEASE') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
}

test {
    useJUnitPlatform()
}

 

 

 

 기본 생성되는 build.gradle에 spring-boot-starter-amqp 관련 의존성을 추가한 모습이다. 이 라이브러리에 대해 설정을 해주자. application.yaml 파일을 다음과 같이 작성한다.

 

spring:
  rabbitmq:
    host: localhost # rabbitMQ host (docker로 띄웠음)
    port: 5672 # default port
    username: guest # default username
    password: guest # default password
config:
  app:
    duration: 10000

 

 예제의 동작은 RabbitMQ 공식 사이트에 소개된 방식을 그대로 따를 예정이다. publisher와 subscriber를 하나의 프로젝트에 모두 구현하고, profile 환경변수로 publisher와 subscriber를 구분할 것이다. 그리고 scheduler를 활용하여 주기적으로 RabbitMQ에 publish할 것이다. main class는 다음과 같이 작성한다.

 

package com.preamtree.rabbitdemo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling // 주기적으로 publish 하기 위해 넣었음. Sender.class의 send() 메소드에 붙은 어노테이션 @Scheduled 참고.
public class RabbitdemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitdemoApplication.class, args);
    }

}

 

그리고 별다른 Controller 작성 없이 실행과 동시에 앱을 구동할 수 있도록 CommandLineRunner 하나를 구현해준다.

package com.preamtree.rabbitdemo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;

// duration값 (default: 10seconds) 만큼 앱을 구동시킴.
@Profile("!test") // 테스트 버그 방지를 위함.
@Component
public class AppRunner implements CommandLineRunner {

    @Value("${config.app.duration:0}")
    private int duration;

    @Autowired
    private ConfigurableApplicationContext ctx;

    @Override
    public void run(String... arg0) throws Exception {
        System.out.println("Ready ... running for " + duration + "ms");
        Thread.sleep(duration);
        ctx.close();
    }
}

 

 10초간 앱을 작동시킨 후 앱을 종료한다. 이제 이 10초동안 동작할 pub-sub 예제를 작성할 것이다. 지금부터 소개하는 클래스는 Configuration 관련 내용을 다루는 클래스이다.

 

 

package com.preamtree.rabbitdemo.pubsub.component;

import com.preamtree.rabbitdemo.pubsub.RabbitMqConstants;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/*
 * publisher, subscriber 모두에게 적용되는 내용.
 */
@Configuration
public class PubsubConfig {

    // exchange 정의.
    @Bean
    public FanoutExchange exchange() {
        return new FanoutExchange(RabbitMqConstants.EXCHANGE_NAME);
    }

    @Bean
    public Jackson2JsonMessageConverter jackson2MessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

 common config이다. 이전 글에서 잠깐 다루었듯, Exchange는 publisher의 메시지를 subscriber가 수신하는 곳이다. 즉 publisher와 subscriber 모두에게 필요한 정의이다.

 

package com.preamtree.rabbitdemo.pubsub.component;

import com.preamtree.rabbitdemo.pubsub.RabbitMqConstants;
import com.preamtree.rabbitdemo.pubsub.Receiver;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;

// subscriber는 exchange, queue 그리고 둘을 매핑하는 binding을 모두 정의한다.
@Profile("receiver")
@Configuration
public class ReceiverConfig {

    // durable, non-exclusive, non-autoDelete queue 1개 선언.
    @Bean
    public Queue queue() {
        return new Queue(RabbitMqConstants.QUEUE_NAME);
    }


    // exchange와 queue를 binding 해준다. 즉, exchange에서 데이터를 꺼내올 Queue 정의는 Receiver의 책임이다.
    @Bean
    public Binding binding(FanoutExchange exchange,
                           Queue tutorialQueue1) {
        return BindingBuilder.bind(tutorialQueue1).to(exchange);
    }

    // 정의된 메시지 리스너를 적용.
    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                             MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(RabbitMqConstants.QUEUE_NAME);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    // 메시지 리스너를 정의한다. Subscriber에 대한 설정 등.
    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver,
                                           Jackson2JsonMessageConverter jackson2JsonMessageConverter) {
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(receiver, "receive");
        messageListenerAdapter.setDefaultListenerMethod("receive"); // 실행할 메소드 지정.
        messageListenerAdapter.setMessageConverter(jackson2JsonMessageConverter); // Json형태로 받기 위해 MessageConverter 설정.
        return messageListenerAdapter;
    }

    @Bean
    public Receiver receiver() {
        return new Receiver();
    }
}

 

 Subscriber(receiver)에 대한 config class이다. 이 예제에서는 특정 조건의 메시지가 수신되면 "receive"라는 메소드를 실행하도록 설정되어있다. 이렇게 복잡한 설정 대신 원하는 메소드 위에 @RabbitListener 라는 어노테이션을 붙여서 해도 된다.

 

package com.preamtree.rabbitdemo.pubsub.component;

import com.preamtree.rabbitdemo.pubsub.Sender;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;

@Profile("sender")
@Configuration
public class SenderConfig {

    @Bean
    public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory,
                                         Jackson2JsonMessageConverter jackson2JsonMessageConverter) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter); // Json을 보내기 위해 메시지컨버터 세팅.
        return rabbitTemplate;
    }

    @Bean
    public Sender sender() {
        return new Sender();
    }
}

 

 publisher config class 이다. jackson 적용 외에는 특이사항이 없다. 그리고 custom-constant가 몇 개 있는데 간단하게 소개하고 넘어가겠다.

 

package com.preamtree.rabbitdemo.pubsub;

// 상수 모음.
public class RabbitMqConstants {
    public final static String EXCHANGE_NAME = "tutorial.pubsub";
    public final static String QUEUE_NAME = "tutorialQueue";
    public final static String ROUTING_KEY = ""; // FanoutExchange 를 쓰므로 routing Key는 DEFAULT을 사용.
}

 

 

그럼 이제 Sender와 Receiver, 그리고 주고 받는 메시지 모델에 대해 살펴보자.

 

package com.preamtree.rabbitdemo.pubsub;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class Sender {
    @Autowired
    private RabbitTemplate template;

    @Autowired
    private FanoutExchange fanout;

    AtomicInteger count = new AtomicInteger(0);

    // 앱이 실행되는 동안 10초 마다 메시지를 보낸다.
    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void send() throws JsonProcessingException {
        
        // 메시지 모델 조립
        EventPayload eventPayload = new EventPayload();
        eventPayload.setEventName("Hello" + count.incrementAndGet());
        Map<String, Object> eventData = new HashMap<>();
        eventData.put("test", 99999);
        eventPayload.setData(eventData);

        // 모든 Sender는 Queue에 직접 송신하지 않고, exchange를 지정하여 거기로 송신.
        template.convertAndSend(RabbitMqConstants.EXCHANGE_NAME, RabbitMqConstants.ROUTING_KEY, eventPayload);
        System.out.println("Publisher - Sent '" + new ObjectMapper().writeValueAsString(eventPayload) + "'");
    }
}

 

 주석에 기술되어 있지만 메시지를 보낼 때는 사전에 설정한(이번 예제에선 FanoutExchange로 설정했다.) Exchange로 보내야한다. 즉, Publisher(Sender)는 Exchange만 신경쓰면 되는거다.

 

package com.preamtree.rabbitdemo.pubsub;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

// subscriber를 정의함.
public class Receiver {

    // 이 부분에 @RabbitListener을 붙여도 된다. (이번 예제에서는 Config로 직접 설정해서 처리함.)
    public void receive(EventPayload eventPayload) throws JsonProcessingException {
        ObjectMapper objectMapper = new ObjectMapper();
        System.out.println("Subscriber - Received '" + objectMapper.writeValueAsString(eventPayload) + "'");
    }
}

 

  이제 마지막으로 EventPayload를 공개한다.

 

package com.preamtree.rabbitdemo.pubsub;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Map;
import java.util.Objects;

// publisher와 subscriber가 주고받는 class.
public class EventPayload {
    @JsonProperty("event_name")
    private String eventName;
    private Map<String, Object> data;

    public String getEventName() {
        return eventName;
    }

    public void setEventName(String eventName) {
        this.eventName = eventName;
    }

    public Map<String, Object> getData() {
        return data;
    }

    public void setData(Map<String, Object> data) {
        this.data = data;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        EventPayload that = (EventPayload) o;
        return Objects.equals(eventName, that.eventName) &&
                Objects.equals(data, that.data);
    }

    @Override
    public int hashCode() {
        return Objects.hash(eventName, data);
    }
}

 

 이 클래스는 구현하는 사람 맘대로 구현하는 것이다. 다만 Publisher와 Subscriber가 다른 앱일 경우가 대부분이므로, 상호간에 약속은 반드시 필요할 것이다. 이 예제에서는 String field 1개와, Map field 1개로 처리했다. 혹시 몰라서 Equals와 HashCode도 꼼꼼하게 구현했다.

 

이제 앱을 실행해보면 아래와 같은 결과가 나온다. 앱을 실행할때는 실행 환경 변수로 spring.profiles.active 값을 꼭 설정해줘야 한다. 아래 결과는 sender, receiver로 설정했을 때의 결과이다.

 

실행결과 콘솔 스크린샷

 

전체 프로젝트 Github 링크는 여기이다.

 

 

 

 

-끝-

 

 

 

출처 및 참고

https://www.rabbitmq.com/

 

 

 

 

«   2021/10   »
          1 2
3 4 5 6 7 8 9
10 11 12 13 14 15 16
17 18 19 20 21 22 23
24 25 26 27 28 29 30
31            
글 보관함
Total
743,149
Today
51
Yesterday
236