Giao diện
Hướng dẫn cài đặt — Queue Adapter Redis
Yêu cầu
- Server Digiforce đang chạy
- Redis server >= 5.0 (yêu cầu Streams + Consumer Groups)
Bước 1: Kích hoạt Plugin
Vào Settings → Plugin Manager, tìm plugin-queue-adapter-redis và bật.
Bước 2: Cấu hình biến môi trường
Đặt hai biến bắt buộc:
bash
QUEUE_ADAPTER=redis
QUEUE_ADAPTER_REDIS_URL=redis://localhost:6379/0Quan trọng:
QUEUE_ADAPTER=redislà điều kiện tiên quyết. Nếu thiếu, plugin không kích hoạt adapter.
Tất cả biến môi trường
| Biến | Mặc định | Mô tả |
|---|---|---|
QUEUE_ADAPTER | (bắt buộc) | Đặt redis |
QUEUE_ADAPTER_REDIS_URL | (bắt buộc) | URL kết nối Redis |
QUEUE_CONNECTION_RETRY_MAX_ATTEMPTS | 20 | Số lần retry kết nối |
QUEUE_CONNECTION_RETRY_DELAY | 200 | Delay giữa các lần retry (ms) |
Bước 3: Khởi động lại server
Kiểm tra log:
- Kết nối thành công:
redis queue channel (<channel>) client connected - Kết nối thất bại:
can not connect redis at url: ...
Cách hoạt động chi tiết
Consumer Groups
Plugin sử dụng Consumer Group DIGIFORCE_APP với clientId = app.instanceId. Mỗi instance trong cluster là một consumer trong cùng group, đảm bảo mỗi message chỉ được xử lý bởi 1 instance.
Subscribe — tạo consumer
Khi subscribe một channel:
- Tạo Redis client riêng cho channel đó
- Tạo Consumer Group (
XGROUP CREATE) nếu chưa tồn tại (MKSTREAM) - Bắt đầu vòng lặp consume
Vòng lặp consume
Xử lý message
Mỗi message được xử lý:
- Parse
content(JSON) vàoptions - Gọi
event.process(content, { id, retried, signal: AbortSignal.timeout(timeout) }) - Nếu thành công →
XACK+XDEL - Nếu thất bại và
retried < maxRetries→ re-publish message mới vớiretried + 1 - Cuối cùng luôn
XACK+XDELmessage gốc
Publish message
XADD <channel> * content "<json>" options "<json>" MAXLEN ~ 100000Stream tự động trim giữ khoảng 100,000 entries.
Busy consumer — xử lý khi quá tải
Nếu event.idle() trả về false (consumer đang xử lý đủ job):
- Các message đã đọc được ACK + XDEL rồi re-publish (đẩy lại cuối stream)
- Consumer chờ
intervalms rồi thử lại
Nếu concurrency slots đã đầy:
- Consumer chờ tất cả reading hiện tại hoàn thành trước khi đọc thêm
Graceful Shutdown
Khi server tắt:
- Đặt
ready = false→ tất cả vòng lặp consume dừng - Chờ tất cả job đang xử lý hoàn thành (
processing) - Đóng tất cả Redis client
Lưu ý triển khai
- Mỗi subscription tạo Redis client riêng — 5 queues = 5 connections
- Redis >= 5.0 (yêu cầu Streams + Consumer Groups)
- Message sau khi xử lý bị xóa (
XDEL) — không giữ lại history - Nếu consumer tắt đột ngột, message chưa ACK sẽ nằm trong PEL (Pending Entries List) — cần xử lý bằng
XCLAIMthủ công