Bỏ qua, đến nội dung

Hướng dẫn cài đặt — Queue Adapter Redis

Yêu cầu

  • Server Digiforce đang chạy
  • Redis server >= 5.0 (yêu cầu Streams + Consumer Groups)

Bước 1: Kích hoạt Plugin

Vào Settings → Plugin Manager, tìm plugin-queue-adapter-redis và bật.

Bước 2: Cấu hình biến môi trường

Đặt hai biến bắt buộc:

bash
QUEUE_ADAPTER=redis
QUEUE_ADAPTER_REDIS_URL=redis://localhost:6379/0

Quan trọng: QUEUE_ADAPTER=redis là điều kiện tiên quyết. Nếu thiếu, plugin không kích hoạt adapter.

Tất cả biến môi trường

BiếnMặc địnhMô tả
QUEUE_ADAPTER(bắt buộc)Đặt redis
QUEUE_ADAPTER_REDIS_URL(bắt buộc)URL kết nối Redis
QUEUE_CONNECTION_RETRY_MAX_ATTEMPTS20Số lần retry kết nối
QUEUE_CONNECTION_RETRY_DELAY200Delay giữa các lần retry (ms)

Bước 3: Khởi động lại server

Kiểm tra log:

  • Kết nối thành công: redis queue channel (<channel>) client connected
  • Kết nối thất bại: can not connect redis at url: ...

Cách hoạt động chi tiết

Consumer Groups

Plugin sử dụng Consumer Group DIGIFORCE_APP với clientId = app.instanceId. Mỗi instance trong cluster là một consumer trong cùng group, đảm bảo mỗi message chỉ được xử lý bởi 1 instance.

Subscribe — tạo consumer

Khi subscribe một channel:

  1. Tạo Redis client riêng cho channel đó
  2. Tạo Consumer Group (XGROUP CREATE) nếu chưa tồn tại (MKSTREAM)
  3. Bắt đầu vòng lặp consume

Vòng lặp consume

Xử lý message

Mỗi message được xử lý:

  1. Parse content (JSON) và options
  2. Gọi event.process(content, { id, retried, signal: AbortSignal.timeout(timeout) })
  3. Nếu thành công → XACK + XDEL
  4. Nếu thất bại và retried < maxRetries → re-publish message mới với retried + 1
  5. Cuối cùng luôn XACK + XDEL message gốc

Publish message

XADD <channel> * content "<json>" options "<json>" MAXLEN ~ 100000

Stream tự động trim giữ khoảng 100,000 entries.

Busy consumer — xử lý khi quá tải

Nếu event.idle() trả về false (consumer đang xử lý đủ job):

  1. Các message đã đọc được ACK + XDEL rồi re-publish (đẩy lại cuối stream)
  2. Consumer chờ interval ms rồi thử lại

Nếu concurrency slots đã đầy:

  • Consumer chờ tất cả reading hiện tại hoàn thành trước khi đọc thêm

Graceful Shutdown

Khi server tắt:

  1. Đặt ready = false → tất cả vòng lặp consume dừng
  2. Chờ tất cả job đang xử lý hoàn thành (processing)
  3. Đóng tất cả Redis client

Lưu ý triển khai

  • Mỗi subscription tạo Redis client riêng — 5 queues = 5 connections
  • Redis >= 5.0 (yêu cầu Streams + Consumer Groups)
  • Message sau khi xử lý bị xóa (XDEL) — không giữ lại history
  • Nếu consumer tắt đột ngột, message chưa ACK sẽ nằm trong PEL (Pending Entries List) — cần xử lý bằng XCLAIM thủ công