메시지 브로커(2) BULL

뜨뜨미지근한물
8 min readJan 9, 2022

--

Bull is a Node library that implements a fast and robust queue system based on Redis.

Although it is possible to implement queues directly using Redis commands, this library provides an API that takes care of all the low-level details and enriches Redis basic functionality so that more complex use-cases can be handled easily.

루비온레일즈의 사이드킥 (Sidekiq) 처럼, NodeJs에서 레디스를 기반으로 한 비동기 작업을 처리할 수 있도록 도와준다.

Redis Queue를 활용하는 라이브러리 중에서는 가장 높은 사용률을 보이고 있으며, (npm trends) 요즘 핫한 프레임워크인 NestJs에서도 공식적인 비동기 처리 라이브러리로 활용하고 있다.

BULL

: Redis를 활용해 메세지 브로커를 구현한 NodeJs Library로, 대표적인 특징은 다음과 같다.

  • 레디스 List를 Queue로 활용하면서, 메시지 큐 기능을 구현.
  • redis에 대한 command는 file로 작성해 놓은 lua 스크립트를 가져와 실행하면서 처리.
  • Producer (큐 job을 만드는 책임 ~ before Queue),
    Consumer (큐 job을 가져와 처리하는 책임 ~ after Queue),
    Event Listener (이벤트 콜백으로 각 액션마다 후처리 수행)
    의 3 관점에서 복잡한 메시지 큐 기능 구현

Cf. 메시지 브로커로써 Redis?!

Sidekiq이나 Bull 에서는 메시지 브로커라는 말은 없고, Queue를 활용한 비동기 작업 처리를 도와주는 라이브러리로 소개되고 있다.

마이크로 서비스간의 통신 프로토콜 모듈로써의 기능이 부족하고. 메모리 디비에 메시지가 적재되면서, 메시지의 지속성 또한 없기 때문인 듯 하다.

그러나 메시지 패턴을 구현하는데 요구되는 Publish/Subscribe 패턴과 Message Queue 패턴은, 레디스의 펍섭과 큐(List)를 활용해 구현할 수 있다.

메시지 지향 미들웨어(MOM)이 일반적으로 비동기 메시지 전달에 기초한 것을 가리킨다. 는 관점에 입각해, Bull에 대한 글을 작성해 보겠다.

참고)

https://ko.wikipedia.org/wiki/%EB%A9%94%EC%8B%9C%EC%A7%80_%EB%B8%8C%EB%A1%9C%EC%BB%A4

BULL 조금 더 파보기

Queue & Job 관점

: BULL을 쓰면서, 표면적으로 Queue와 Job 객체를 활용하게 된다.

1, Queue: 큐 관련 기능을 수행하는 객체

  • Node EventEmitter를 상속받아, event callback에 Redis Pub/Sub 패턴을 붙여서 활용 가능 (이걸 AOP로 볼 수 있을까?)
  • 레디스의 기능을 활용하기 위해 3개의 커넥션을 가져 기능 수행
  • delay 기능 (큐 유틸)을 위해, NodeJs에서 setTimeOut 타이머를 활용
  • 각 큐마다 이름을 부여해, 복수의 큐 생성 및 활용 가능

2, Job: 메시지(job) 관련 기능을 수행하는 객체

  • Queue에 들어가게 되는 메시지로써, Redis hash 자료구조로 메시지 내용이 생성됨
  • 생성된 잡은 메시지 큐를 구성하는 여러 자료구조에 Id가 담겨 처리됨.
    - job이 enqueue되면 wait list에 들어가고.
    - 복수의 컨슈머는 wait list를 block pop (폴링과 유사) 하면서,
    들어온 job을 active list로 넘기고 처리를 시작함
    - delay와 우선순위 처리를 위해 Delay와 Priority라는 zset 활용
BULL의 Message Queue 구현을 위한 자료 구조 / 실제 레디스에 저장되는 구조 (ft medis)
  • 잡에 이름을 부여해, 하나의 큐 내부에서 작업별로 컨슈머 지정 가능

Producer & Consumer 관점

: 메시지를 생성하고 소비하는 관점에서 수행되는 로직은 Queue 객체의 메서드로 수행된다. 각각 어떤 기능을 갖고 어떤 로직으로 돌아가는지 보면 다음과 같다.

1, Producer: Queue.add() 메서드로 job을 생성하고 enqueue 함.
( Redis의 Wait List에 넣음 )

기본적인 bull enqueue 메서드

기능

  • attempts & backoff: 컨슈머가 잡을 실패하는 경우, 수행하는 재시도에 대해 각각 횟수와 재시도 정책을 설정할 수 있다.
    - 여기서 retry 횟수 모두를 실패하면, Redis failed zset으로, 성공하면 Redis success zset으로 JobId가 넘어간다.
    - 재시도 정책은, 일정시간마다 재시작 / 2 * 재시도 횟수 시간마다 재시작
    / 사용자가 직접 설정한 재시작 정책을 적용할 수 있다.
  • priority: add() 옵션으로 설정 가능
    - 오름차순으로 priority zset에 저장되며, 낮을수록 우선순위가 높아진다.
  • delay: add() 옵션으로 설정 가능
    - 오름차순으로 priority zset에 저장되며, Date()를 ms 로 전환해 기입
    - NodeJs에서 설정한 타이머가 돌 때, 지나거나 같은 작업을 수행한다.
    ( NodeJs 이벤트 루프가 timers 큐 처리하는 방식과 흡사?! )

2, Consumer: Queue.process() 메서드로 job을 가져와 처리함.
( Redis의 Wait List → Active List → Success / Failed Zset)

기본적인 bull pop 메서드

기능

  • args1: Queue에 담기는 job의 이름을 규정해, 특정 job만 처리하는 컨슈머 설정이 가능
  • args2: 동시성 처리
    - promise로 백그라운드 스레드가 processJob 함수를 수행하게끔 처리.
bull 내부의 동시성 처리

로직 ~ 아래의 프로세스가 내부 메서드로 구현 → processJobs
이 비동기 함수가 재귀를 돌면서, 큐에 대한 지속적인 subscribe 수행

  • (2) redis의 Wait List → Active List로 job 옮김 ( Redis BRPOPLPUSH )
    - BRPOPLPUSH으로 기본 5초 동안 롱 폴링하는 효과
    - 참고) 레디스 고급 커맨드
    - * Blocking: 자기 작업을 수행하다 다른 주체의 작업이 시작하면, 이를 끝까지 기다렸다가, 다시 자기 작업을 수행하는 작업
  • (3) 작업의 중복 처리를 막기 위해, 특정 컨슈머가 가져온 job에는 lock을 걸어 놓음 (기본 30초) ~ SQS 의 visibility timeout과 유사
  • Active 상태에서 특정시간 (lock 30초) 동안 처리 안되면, ( = 큐에서 작업 가져오고 잠적..) 큐에서 lock이 풀린 상태로 Active List에 남음
    ~ stalled job 으로 간주
  • (1) Active List의 stalled job이 있으면, 우선적으로 처리

참고자료

언급한 것 외에도 여러 기능이 있으며, 아래의 참고 자료를 통해 알아볼 수 있다. 공식 문서가 잘되어 있다!

--

--