Key Features
Per‑Group FIFO
At most one job per groupId is in flight at any time, which guarantees strict ordering within a group. Jobs in the same group are processed sequentially, while different groups run in parallel for high throughput.
// Jobs in the same group execute sequentially
await queue.add({ groupId: 'user:1', data: { id: 'a', ms: 100 } });
await queue.add({ groupId: 'user:1', data: { id: 'b', ms: 100 } });
// Jobs in different groups may run in parallel across workers
await queue.add({ groupId: 'user:2', data: { id: 'c', ms: 100 } });
Under the hood, the worker uses atomic completion to immediately reserve the next job from the same group, minimizing group contention and preserving FIFO.
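The reservation rule described above can be illustrated with a small in‑memory model. This is only a sketch: the GroupFifo class and its method names are hypothetical and say nothing about how GroupMQ stores or locks groups in Redis.

```typescript
interface Job { id: string; groupId: string }

// Hypothetical in-memory model of "one in-flight job per group".
class GroupFifo {
  private queues = new Map<string, Job[]>(); // pending jobs per group, in arrival order
  private active = new Set<string>();        // groups that currently have a job in flight

  add(job: Job): void {
    const pending = this.queues.get(job.groupId) ?? [];
    pending.push(job);
    this.queues.set(job.groupId, pending);
  }

  // Reserve the head job of the first group that has nothing in flight.
  reserve(): Job | undefined {
    for (const [groupId, pending] of this.queues) {
      if (!this.active.has(groupId) && pending.length > 0) {
        this.active.add(groupId);
        return pending.shift();
      }
    }
    return undefined;
  }

  // Complete a job and, in the same step, hand back the next job of that group.
  // The group stays marked as active only if there was a next job to reserve.
  completeAndReserveNext(job: Job): Job | undefined {
    const next = this.queues.get(job.groupId)?.shift();
    if (next === undefined) this.active.delete(job.groupId);
    return next;
  }
}

const fifo = new GroupFifo();
fifo.add({ id: 'a', groupId: 'user:1' });
fifo.add({ id: 'b', groupId: 'user:1' });
fifo.add({ id: 'c', groupId: 'user:2' });

const first = fifo.reserve();  // job 'a' from user:1
const second = fifo.reserve(); // job 'c' from user:2, because user:1 is busy
const third = first ? fifo.completeAndReserveNext(first) : undefined; // job 'b'
```

Completing and reserving in one step means no other worker can take the group between the two operations, which is the property the sketch is meant to show.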
Order by Timestamp (orderMs)
If orderMs is provided, GroupMQ orders jobs within a group by this timestamp rather than by arrival time, so jobs are processed in the correct order even when producers are slightly out of sync. You can also set a global orderingDelayMs on the queue; this short delay gives late‑arriving jobs with earlier timestamps a chance to be enqueued before processing begins.
await queue.add({
  groupId: 'acct:42',
  data: { event: 'withdrawal' },
  orderMs: Date.now() - 1000, // earlier event
});
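To make the effect of timestamp ordering concrete, the sketch below sorts jobs by orderMs and holds back any job younger than the ordering delay. The TimedJob type, the eligible function, and the exact eligibility rule (orderMs + orderingDelayMs <= now) are assumptions made for illustration; GroupMQ's real rule may differ.

```typescript
interface TimedJob { id: string; orderMs: number; seq: number }

// Order by timestamp; fall back to arrival sequence when timestamps tie.
function byOrderMs(a: TimedJob, b: TimedJob): number {
  return a.orderMs - b.orderMs || a.seq - b.seq;
}

// Assumed rule: a job is eligible once it is at least orderingDelayMs old,
// which leaves a window for late jobs with earlier timestamps to arrive.
function eligible(jobs: TimedJob[], now: number, orderingDelayMs: number): TimedJob[] {
  return jobs.filter((j) => j.orderMs + orderingDelayMs <= now).sort(byOrderMs);
}

const jobs: TimedJob[] = [
  { id: 'deposit', orderMs: 2000, seq: 0 },    // arrived first, happened later
  { id: 'withdrawal', orderMs: 1000, seq: 1 }, // arrived second, happened earlier
];

const early = eligible(jobs, 2100, 200).map((j) => j.id); // ['withdrawal']
const later = eligible(jobs, 2500, 200).map((j) => j.id); // ['withdrawal', 'deposit']
```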
Idempotence
Avoid duplicate side‑effects by providing a stable jobId. GroupMQ de‑duplicates via an internal idempotence mapping and retains minimal metadata for inspection based on keepCompleted/keepFailed.
await queue.add({
  groupId: 'order:123',
  data: { step: 'charge' },
  jobId: 'order:123:charge:v1',
});
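The idea of an idempotence mapping can be sketched with a plain Map keyed by jobId. The IdempotentInbox class is hypothetical and in‑memory only; it illustrates the de‑duplication rule, not GroupMQ's Redis data layout or its retention behavior.

```typescript
// Hypothetical in-memory stand-in for an idempotence mapping.
class IdempotentInbox<T> {
  private seen = new Map<string, T>();

  // Returns true if the job was accepted, false if the jobId was already known.
  add(jobId: string, data: T): boolean {
    if (this.seen.has(jobId)) return false; // duplicate: keep the first payload
    this.seen.set(jobId, data);
    return true;
  }

  get size(): number {
    return this.seen.size;
  }
}

const inbox = new IdempotentInbox<{ step: string }>();
const accepted = inbox.add('order:123:charge:v1', { step: 'charge' }); // true
const repeated = inbox.add('order:123:charge:v1', { step: 'charge' }); // false
```

A stable jobId typically encodes the business key and the step (as in order:123:charge:v1), so a retried producer call maps to the same entry.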
Cron & Repeats
Schedule repeating jobs using fixed intervals or cron patterns. Repeats are materialized by the worker’s periodic cycle.
// Every 5 seconds
await queue.add({ groupId: 'cron', data: { id: 'tick', ms: 50 }, repeat: { every: 5000 } });
// Cron pattern (midnight)
await queue.add({ groupId: 'cron', data: { id: 'nightly', ms: 50 }, repeat: { pattern: '0 0 * * *' } });
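What "materialized by the worker's periodic cycle" might look like for fixed intervals is sketched below. The Repeat type and materializeDue function are hypothetical, and the choice to skip missed intervals instead of replaying them is an assumption, not documented GroupMQ behavior. Cron patterns are left out because they need a parser.

```typescript
interface Repeat { id: string; everyMs: number; nextRunMs: number }

// One scheduler cycle: emit one job per due repeat, then advance its next run.
function materializeDue(repeats: Repeat[], now: number): string[] {
  const emitted: string[] = [];
  for (const r of repeats) {
    if (r.everyMs <= 0) throw new RangeError('everyMs must be positive');
    if (r.nextRunMs <= now) {
      emitted.push(r.id);
      // Assumption: move past `now` so missed intervals are not replayed in a burst.
      while (r.nextRunMs <= now) r.nextRunMs += r.everyMs;
    }
  }
  return emitted;
}

const repeats: Repeat[] = [{ id: 'tick', everyMs: 5000, nextRunMs: 5000 }];
const tooEarly = materializeDue(repeats, 4000);  // []
const onTime = materializeDue(repeats, 5000);    // ['tick'], next run at 10000
const afterGap = materializeDue(repeats, 21000); // ['tick'], next run at 25000
```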
Delayed Jobs
Run jobs in the future using delay or a specific runAt timestamp. You can also change a job’s delay later with changeDelay.
// Run 2 seconds from now
await queue.add({ groupId: 'email', data: { id: 'welcome' }, delay: 2000 });
// Run at an absolute time, 7 days from now
await queue.add({ groupId: 'email', data: { id: 'weekly' }, runAt: Date.now() + 7 * 86400_000 });
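Both options boil down to a single "ready at" timestamp. The helper below shows one way to compute it; resolveRunAt is a hypothetical name, and giving runAt precedence over delay is an assumption because the text does not say what happens when both are set.

```typescript
interface ScheduleOptions { delay?: number; runAt?: number }

// Resolve relative and absolute scheduling options to one timestamp in ms.
function resolveRunAt(opts: ScheduleOptions, now: number): number {
  if (opts.runAt !== undefined) return Math.max(opts.runAt, now); // past times run immediately
  if (opts.delay !== undefined) return now + Math.max(0, opts.delay);
  return now; // no scheduling options: ready right away
}

const now = 1_000;
const delayed = resolveRunAt({ delay: 2000 }, now);                 // 3000
const weekly = resolveRunAt({ runAt: now + 7 * 86_400_000 }, now);  // 604801000
const overdue = resolveRunAt({ runAt: 500 }, now);                  // 1000
```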
Retries & Failures
Configure retry attempts at queue, job, or worker level. Failures are recorded with reasons and optional stack traces. Dead‑lettering occurs when attempts are exhausted.
// Queue default
const queue = new Queue({ redis, namespace: 'app', maxAttempts: 3 });
// Job override
await queue.add({ groupId: 'g1', data: { id: 'x' }, maxAttempts: 5 });
// Worker backoff
new Worker({
  queue,
  maxAttempts: 5,
  backoff: (attempt) => Math.min(30_000, 2 ** (attempt - 1) * 500),
  async handler(job) { /* ... */ },
}).run();
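To see what the backoff function in the worker example produces, the snippet below evaluates the same formula for the first eight attempts. It assumes attempts are numbered from 1, which is what the (attempt - 1) exponent suggests; the schedule variable is only for illustration.

```typescript
// Same formula as the worker example: exponential growth from 500 ms, capped at 30 s.
const backoff = (attempt: number): number =>
  Math.min(30_000, 2 ** (attempt - 1) * 500);

// Delays in milliseconds for attempts 1 through 8.
const schedule = Array.from({ length: 8 }, (_, i) => backoff(i + 1));
// [500, 1000, 2000, 4000, 8000, 16000, 30000, 30000]
```

The cap takes effect from the seventh attempt, where the uncapped value would be 32000 ms.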
Pause & Resume
Temporarily stop processing without losing state.
await queue.pause();
// ... do maintenance ...
await queue.resume();
Graceful Shutdown
Stop workers without dropping in‑flight jobs. The worker waits for the current job to finish or, if the timeout elapses first, emits a graceful-timeout event.
await worker.close(30_000); // wait up to 30s for the current job
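The wait‑or‑time‑out behavior can be modeled as a race between the in‑flight job and a timer. This is a generic sketch of the pattern, not GroupMQ's code: waitForCurrentJob, CloseResult, and sleep are hypothetical names, and the string result stands in for the graceful-timeout event.

```typescript
type CloseResult = 'drained' | 'graceful-timeout';

const sleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Wait for the in-flight job, but give up once timeoutMs has elapsed.
async function waitForCurrentJob(
  current: Promise<unknown>,
  timeoutMs: number,
): Promise<CloseResult> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<CloseResult>((resolve) => {
    timer = setTimeout(() => resolve('graceful-timeout'), timeoutMs);
  });
  // A job that fails has still finished, so both outcomes count as drained.
  const done = current.then(
    (): CloseResult => 'drained',
    (): CloseResult => 'drained',
  );
  try {
    return await Promise.race([done, timeout]);
  } finally {
    if (timer !== undefined) clearTimeout(timer); // do not keep the process alive
  }
}

// A 10 ms job finishes well within a 500 ms grace period.
const fastClose = waitForCurrentJob(sleep(10), 500);
// A 500 ms job does not finish within a 10 ms grace period.
const slowClose = waitForCurrentJob(sleep(500), 10);
```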
Adaptive Blocking & Low Redis Load
Workers use adaptive blocking reservations and recovery routines (e.g., promoting delayed jobs, recovering delayed groups) to reduce Redis churn while staying responsive under load.
BullMQ‑like API Shapes
Common method names and Job shapes align with BullMQ conventions where sensible (e.g., counts, status lists), easing integration with dashboards and tooling.