IT/Spring & Spring Boot

[IT/일상/Java] MQTT 통신 코드 개선 일지 (feat. QueueChannel, Future)

땅일단 2024. 12. 14. 23:00

배경

현재 프로젝트에서 장치와 통신하는 데에 MQTT를 사용하고 있다.

MQTT가 뭔지에 대해서는 아래를 참고하자.

 

 

[IT] MQTT 프로토콜에 대해

MQTT란?MQTT는 Publisher, Broker, Subscriber의 세 가지 분류로 이루어진 프로토콜이다.Publisher가 Broker 에게 특정 Topic으로 메시지를 보내면, Subscriber는 Broker로부터 받고 싶은 Topic만을 구독하여 메시지

doringri.tistory.com

 

아무튼, 우리 장치는 어떤 토픽으로 명령에 대한 메시지를 보내면 장치가 다른 토픽을 통해 그 명령이 잘 수행됐는지 메시지를 주는 구조이다.

문제는 클라이언트가 HTTP로 명령 요청을 보내면 그 명령이 성공했는지에 대한 결과를 백엔드에서 응답으로 줘야 된다는 것이다. 또한 이 요청은 여러 사용자가 동시다발적으로 보낼 수 있다.

 

즉 구조가 아래와 같다.

 

이것을 처리하기 위해 기존에는 QueueChannel을 이용했었다.

"장치 켜줘" 라는 요청이 들어오면 ConcurrentHashMap (멀티스레드를 지원하므로) 에 해당 메시지에 대한 고유 key를 저장해놓는다.

그리고 "켰어" 라는 MQTT 메시지를 받으면 MQTT 핸들러를 통해 OutputChannel을 QueueChannel로 설정한다.

 

그러면 이제 해시맵에 저장된 고유 key를 이용하여 QueueChannel에서 "켰어" 라는 메시지를 빼내고, 해시맵에서도 삭제한다.

@Bean
@ServiceActivator(inputChannel="mqttInputChannel", outputChannel="queueChannel")
public MessageHandler MqttHandler() {
    return message;
}

@Bean
QueueChannel queueChannel() {
    return new QueueChannel(10);
}

 

ServiceActivator를 비롯한 Spring Integeration은 아래 블로그에 잘 정리되어 있다.

https://sup2is.github.io/2020/08/12/what-is-spring-integration.html

 

아무튼 QueueChannel의 문제점은, 선입선출의 구조상 큐에 잘못된 메시지가 들어갔을 경우 그 뒤에 오는 메시지들이 제대로 처리되지 못했고, 큐에 용량이 존재했기에 가득 찼을 경우 올바르게 동작하지 못한다.

무엇보다 잘 쓰이지 않는 방식인지라 자료를 찾기가 쉽지 않았다.

 

 

Future 객체

위의 QueueChannel을 이용하는 방식은 내가 만든 코드였는데, 위의 문제 때문에 선임분께서 코드를 수정해주셨다.

바로 Java의 Future 객체를 이용하는 방식이다.

 

Future 객체는 비동기 작업의 결과를 처리하는 인터페이스이다.

보통은 향상된 Future인 CompletableFuture라는 강력한 기능의 객체를 주로 이용한다. Java 8부터 추가되었다고 한다.

우리 프로젝트에서는 수동 완료 기능이 필요했기에, CompletableFuture에만 있는 .complete() 메소드를 사용해야 한다.

 

CompletableFuture는 비동기 작업을 위한 다양한 메소드를 지원하는데, 

thenApply(), thenAccept() 등을 통해 작업 완료 시 처리할 동작을 정의할 수도 있으며,

orTimeout() 을 통해 설정된 시간 안에 작업을 끝내지 못할 경우 예외를 발생시킬 수 있다.

thenCompose()로 여러 작업을 이어서 실행할 수 있다.

 

여러모로 자바스크립트의 Promise 객체와 비슷한 느낌이다. 하지만 자바스크립트는 싱글 스레드 기반이라는 점이 가장 큰 차이점인 것 같다.

 

 

개선

방식은 아래와 같이 바뀌었다.

 

  1. "장치 켜줘" 메시지를 보내면 ConcurrentHashMap에 고유 keyCompletableFuture 객체를 저장
  2. 요청보낸 메소드에서 CompletableFuture 객체의 complete() 메소드가 실행될 때까지 .orTimeout() 을 통해 대기
  3. "켰어" 메시지가 들어오면 핸들러에서 .complete() 메소드로 비동기 작업의 완료 알림 및 해시맵에서 삭제
  4. 만약 .orTimeout()에서 설정된 시간이 되도록 오지 않으면 예외 처리 후 해시맵에서 삭제

 

기존 방법보다 좀 더 직관적이고 부작용 없는 코드가 된 것 같다.