# Worker
The Worker pulls jobs from a Queue, sends heartbeats, applies retries/backoff, and optionally chains jobs atomically within the same group to preserve FIFO.
```ts
import Redis from 'ioredis'
import { Queue, Worker } from 'groupmq'

const redis = new Redis()
const queue = new Queue<{ foo: string }>({ redis, namespace: 'demo' })

const worker = new Worker<{ foo: string }>({
  queue,
  heartbeatMs: 10_000,
  maxAttempts: 5,
  backoff: (attempt) => Math.min(30_000, 2 ** (attempt - 1) * 500),
  async handler(job) {
    // do work with job.data
    return { ok: true }
  },
})
```
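To make the `backoff` option in the example above concrete, the following sketch evaluates the same formula for the first few attempts. It assumes `attempt` is 1-based, which the `attempt - 1` exponent suggests but the page does not state; `schedule` is a name introduced here for illustration.

```ts
// Same formula as the example above: exponential growth from 500 ms, capped at 30 s.
// Assumption: the worker passes a 1-based attempt number.
const backoff = (attempt: number): number =>
  Math.min(30_000, 2 ** (attempt - 1) * 500)

// Delay (in ms) that would be returned for attempts 1 through 8.
const schedule = [1, 2, 3, 4, 5, 6, 7, 8].map(backoff)

console.log(schedule)
// [500, 1000, 2000, 4000, 8000, 16000, 30000, 30000]
```

Under that assumption the cap takes effect from the seventh attempt onward, so with `maxAttempts: 5` the cap would never be reached.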
## Constructor
`new Worker({ queue, handler, heartbeatMs?, maxAttempts?, backoff?, ... })`
## Options
- `queue` - a `Queue` instance
- `handler(job)` - async function receiving a reserved job
- `heartbeatMs` - heartbeat interval; defaults to `queue.jobTimeoutMs / 3`
- `maxAttempts` - worker-level cap (defaults to the queue default)
- `backoff(attempt)` - function returning the number of ms to wait before retrying
- `enableCleanup` - periodically runs cleanup and cron promotions (default `true`)
- `cleanupIntervalMs` - cadence for cleanup/promotions (default 60s)
- `blockingTimeoutSec` - reserve blocking timeout (default 5s)
- `atomicCompletion` - complete and try to reserve next in-group (default `true`)
- `concurrency` - number of jobs to process concurrently (default 1)
- `logger` - `true` to enable default logs, or pass a Logger
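
The documented defaults can be summarized as a small resolution function. This is only a sketch of how the defaults listed above combine, not the library's code: `QueueLike`, `WorkerOptionsLike`, and `resolveDefaults` are hypothetical names, `maxAttempts` is left out because the queue-level default is not given here, and whether the library rounds `jobTimeoutMs / 3` is not stated.

```ts
// Hypothetical shapes for illustration; only the option names and defaults come from the list above.
interface QueueLike {
  jobTimeoutMs: number
}

interface WorkerOptionsLike {
  queue: QueueLike
  heartbeatMs?: number
  enableCleanup?: boolean
  cleanupIntervalMs?: number
  blockingTimeoutSec?: number
  atomicCompletion?: boolean
  concurrency?: number
}

// Fill in every omitted option with its documented default.
function resolveDefaults(opts: WorkerOptionsLike) {
  return {
    heartbeatMs: opts.heartbeatMs ?? opts.queue.jobTimeoutMs / 3,
    enableCleanup: opts.enableCleanup ?? true,
    cleanupIntervalMs: opts.cleanupIntervalMs ?? 60_000, // 60s
    blockingTimeoutSec: opts.blockingTimeoutSec ?? 5, // 5s
    atomicCompletion: opts.atomicCompletion ?? true,
    concurrency: opts.concurrency ?? 1,
  }
}

const resolved = resolveDefaults({ queue: { jobTimeoutMs: 30_000 }, concurrency: 4 })
console.log(resolved.heartbeatMs, resolved.concurrency) // 10000 4
```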
### Reserve and completion
Workers reserve jobs using `reserveBlocking` and `reserveBatch` (for concurrency > 1), send heartbeats, and on success attempt atomic in-group chaining via `completeAndReserveNext`. When `concurrency > 1`, the worker can process multiple jobs simultaneously while maintaining per-group FIFO ordering.
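The ordering guarantee described above can be illustrated with an in-memory model. This is a hypothetical sketch, not groupmq's Redis-backed implementation: the class `GroupedQueue`, the `JobRef` type, and the method signatures are assumptions that only borrow the names `reserveBatch` and `completeAndReserveNext` from this page. The idea it shows is that a group with a job in flight is skipped when reserving, and that completing a job hands over the next job of the same group in one step.

```ts
type JobRef = { id: string; groupId: string }

// Hypothetical in-memory model of per-group FIFO reservation.
class GroupedQueue {
  private pending: JobRef[] = []
  private active = new Set<string>() // groups that currently have a job in flight

  add(job: JobRef): void {
    this.pending.push(job)
  }

  // Reserve up to `max` jobs, oldest first, at most one per group.
  reserveBatch(max: number): JobRef[] {
    const reserved: JobRef[] = []
    const rest: JobRef[] = []
    for (const job of this.pending) {
      if (reserved.length < max && !this.active.has(job.groupId)) {
        this.active.add(job.groupId)
        reserved.push(job)
      } else {
        rest.push(job)
      }
    }
    this.pending = rest
    return reserved
  }

  // Complete `job` and, in the same step, take the next pending job of its group.
  // The group stays marked active when a successor is handed out.
  completeAndReserveNext(job: JobRef): JobRef | undefined {
    const i = this.pending.findIndex((j) => j.groupId === job.groupId)
    if (i === -1) {
      this.active.delete(job.groupId)
      return undefined
    }
    return this.pending.splice(i, 1)[0]
  }
}

const a1: JobRef = { id: 'a1', groupId: 'a' }
const a2: JobRef = { id: 'a2', groupId: 'a' }
const b1: JobRef = { id: 'b1', groupId: 'b' }

const model = new GroupedQueue()
for (const job of [a1, a2, b1]) model.add(job)

const firstBatch = model.reserveBatch(2).map((j) => j.id) // a2 is skipped: group 'a' is busy
const afterA1 = model.completeAndReserveNext(a1)?.id // next job of group 'a'
const afterB1 = model.completeAndReserveNext(b1)?.id // group 'b' has nothing left

console.log(firstBatch, afterA1, afterB1) // ['a1', 'b1'] 'a2' undefined
```

In this model, two jobs from the same group are never in flight together, which is what keeps per-group order intact even with `concurrency > 1`.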
```ts
await worker.run()
```
## Events
- `ready`
- `completed` - `(job)`
- `failed` - `(job)`
- `error` - `(error)`
- `closed`
- `ioredis:close`
- `graceful-timeout` - `(job)`
```ts
worker.on('ready', () => console.log('worker ready'))
worker.on('completed', (job) => console.log('done', job.id))
worker.on('failed', (job) => console.log('failed', job.id, job.failedReason))
worker.on('graceful-timeout', (job) => console.warn('timed out', job.id))
worker.on('error', (err) => console.error('worker error', err))
```
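Beyond logging, the `completed` and `failed` events can feed simple counters. The sketch below is written against a structural interface rather than the real `Worker` class, so it runs without Redis; `JobEventSource`, `JobLike`, `trackOutcomes`, and `FakeWorker` are hypothetical names, and it is an assumption that the real worker's `on` method is compatible with this shape.

```ts
type JobLike = { id: string; failedReason?: string }
type OutcomeEvent = 'completed' | 'failed'

// Minimal structural view of the worker: only `on` for the two events used here.
interface JobEventSource {
  on(event: OutcomeEvent, listener: (job: JobLike) => void): unknown
}

// Subscribe once and keep running totals of job outcomes.
function trackOutcomes(source: JobEventSource) {
  const totals = {
    completed: 0,
    failed: 0,
    lastFailure: undefined as string | undefined,
  }
  source.on('completed', () => {
    totals.completed++
  })
  source.on('failed', (job) => {
    totals.failed++
    totals.lastFailure = job.failedReason
  })
  return totals
}

// Hypothetical stand-in used to exercise the helper.
class FakeWorker implements JobEventSource {
  private listeners = new Map<OutcomeEvent, Array<(job: JobLike) => void>>()

  on(event: OutcomeEvent, listener: (job: JobLike) => void): this {
    const list = this.listeners.get(event) ?? []
    list.push(listener)
    this.listeners.set(event, list)
    return this
  }

  emit(event: OutcomeEvent, job: JobLike): void {
    for (const listener of this.listeners.get(event) ?? []) listener(job)
  }
}

const fakeSource = new FakeWorker()
const totals = trackOutcomes(fakeSource)
fakeSource.emit('completed', { id: '1' })
fakeSource.emit('failed', { id: '2', failedReason: 'boom' })
console.log(totals) // { completed: 1, failed: 1, lastFailure: 'boom' }
```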
## Methods
- `run()` - start processing
- `close(gracefulTimeoutMs?)` - stop and wait for the current job
- `getCurrentJob()`
- `isProcessing()`
- `getWorkerMetrics()`
```ts
await worker.run()

// later
const metrics = worker.getWorkerMetrics()
console.log(metrics.currentJobId, metrics.blockingStats)

const current = worker.getCurrentJob()
if (current) console.log('processing', current.job.id, current.processingTimeMs)

await worker.close(30_000)
```
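A common use of `close(gracefulTimeoutMs?)` is a shutdown hook that is safe to trigger more than once. The following is a minimal sketch under stated assumptions: `Closable` and `makeShutdown` are hypothetical names, and it assumes `close` returns a promise, as the `await worker.close(30_000)` call above suggests.

```ts
// Structural stand-in for the worker; only close(gracefulTimeoutMs?) is taken from this page.
interface Closable {
  close(gracefulTimeoutMs?: number): Promise<void>
}

// Build a shutdown function that calls close() at most once,
// even if several signals arrive in quick succession.
function makeShutdown(target: Closable, gracefulTimeoutMs = 30_000): () => Promise<void> {
  let closing: Promise<void> | undefined
  return () => {
    if (!closing) closing = target.close(gracefulTimeoutMs)
    return closing
  }
}

// Hypothetical fake used to exercise the helper without Redis.
const closeCalls: number[] = []
const fakeTarget: Closable = {
  close: async (ms?: number) => {
    closeCalls.push(ms ?? -1) // record the timeout that was requested
  },
}

const shutdown = makeShutdown(fakeTarget)
void shutdown()
void shutdown() // reuses the first promise; close() is not called again
console.log(closeCalls) // [30000]
```

In a Node process, the returned function could be registered with `process.once('SIGTERM', () => void shutdown())` so that an in-flight job gets up to the graceful timeout to finish.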
## Behavior overview
- Reserves jobs via `reserveBlocking` (single job) or `reserveBatch` (multiple jobs when concurrency > 1)
- Emits events for lifecycle changes (`ready`, `completed`, `failed`, `error`, etc.)
- Sends heartbeats to extend visibility timeouts while processing
- On success: emits `completed` and records completion metadata
- On failure: applies backoff, emits `failed`, records attempt/final failure, and dead-letters when attempts are exhausted
- Attempts atomic in-group chaining with `completeAndReserveNext` to minimize reordering
- When `concurrency > 1`: processes multiple jobs simultaneously while respecting per-group FIFO constraints
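
The failure path in the list above (apply backoff, retry, dead-letter once attempts are exhausted) can be traced with a small synchronous simulation. It is a sketch of the described behavior, not the worker's actual loop: `simulateRetries`, `Outcome`, and the `succeedsOnAttempt` parameter are hypothetical, and it assumes 1-based attempt numbers with no delay scheduled after the final failed attempt.

```ts
type Outcome = {
  status: 'completed' | 'dead-lettered'
  attempts: number
  delaysMs: number[] // backoff delays applied between attempts
}

// Walk through attempts 1..maxAttempts; the job "succeeds" once the attempt
// number reaches `succeedsOnAttempt` (use Infinity for a job that always fails).
function simulateRetries(
  maxAttempts: number,
  backoff: (attempt: number) => number,
  succeedsOnAttempt: number,
): Outcome {
  const delaysMs: number[] = []
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    if (attempt >= succeedsOnAttempt) {
      return { status: 'completed', attempts: attempt, delaysMs }
    }
    // Failed attempt: wait for the backoff delay only if another attempt remains.
    if (attempt < maxAttempts) delaysMs.push(backoff(attempt))
  }
  return { status: 'dead-lettered', attempts: maxAttempts, delaysMs }
}

// Backoff function from the example at the top of this page.
const exampleBackoff = (attempt: number): number =>
  Math.min(30_000, 2 ** (attempt - 1) * 500)

const recovered = simulateRetries(5, exampleBackoff, 3)
const exhausted = simulateRetries(5, exampleBackoff, Infinity)

console.log(recovered) // { status: 'completed', attempts: 3, delaysMs: [500, 1000] }
console.log(exhausted) // { status: 'dead-lettered', attempts: 5, delaysMs: [500, 1000, 2000, 4000] }
```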