# API - Worker

## Worker

The Worker pulls jobs from a Queue, sends heartbeats, applies retries/backoff, and optionally chains jobs atomically within the same group to preserve FIFO order.
```ts
import Redis from 'ioredis'
import { Queue, Worker } from 'groupmq'

const redis = new Redis()
const queue = new Queue<{ foo: string }>({ redis, namespace: 'demo' })

const worker = new Worker<{ foo: string }>({
  queue,
  heartbeatMs: 10_000,
  maxAttempts: 5,
  backoff: (attempt) => Math.min(30_000, 2 ** (attempt - 1) * 500),
  async handler(job) {
    // do work with job.data
    return { ok: true }
  },
})
```
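To see what the `backoff` function in the example yields, the sketch below evaluates the same formula for the first few attempts. It assumes `attempt` is 1-based, which the `attempt - 1` exponent suggests but the page does not state. `backoffSchedule` is a hypothetical helper for illustration, not part of groupmq.

```ts
// Same formula as the example's `backoff` option: exponential growth from
// 500 ms, capped at 30 s. Assumes `attempt` starts at 1.
const backoff = (attempt: number): number =>
  Math.min(30_000, 2 ** (attempt - 1) * 500)

// Hypothetical helper (not a groupmq API): delays for attempts 1..n.
function backoffSchedule(n: number): number[] {
  return Array.from({ length: n }, (_, i) => backoff(i + 1))
}

console.log(backoffSchedule(8))
// 500, 1000, 2000, 4000, 8000, 16000, 30000, 30000
```

The cap takes effect from the seventh attempt onward, where the uncapped value would be 32 000 ms.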
### Constructor

`new Worker({ queue, handler, heartbeatMs?, maxAttempts?, backoff?, ... })`
### Options

- `queue` - a `Queue` instance
- `handler(job)` - async function receiving a reserved job
- `heartbeatMs` - heartbeat interval; defaults to `queue.jobTimeoutMs / 3`
- `maxAttempts` - worker-level cap (defaults to queue default)
- `backoff(attempt)` - function returning ms to wait before retrying
- `enableCleanup` - periodically runs cleanup and cron promotions (default true)
- `cleanupIntervalMs` - cadence for cleanup/promotions (default 60s)
- `blockingTimeoutSec` - reserve blocking timeout (default 5s)
- `atomicCompletion` - complete and try to reserve next in-group (default true)
- `concurrency` - number of jobs to process concurrently (default 1)
- `logger` - `true` to enable default logs or pass a Logger

Stalled Job Detection (new):

- `stalledInterval` - check interval for stalled jobs in ms (default: 30000)
- `maxStalledCount` - max times a job can stall before failing (default: 1)
- `stalledGracePeriod` - grace period before marking as stalled in ms (default: 0)
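The defaults listed above can be summarized in a small sketch. `WorkerTuning` and `withDefaults` are hypothetical names used only for illustration, and the real groupmq types and internal resolution logic may differ. The `jobTimeoutMs` argument stands in for `queue.jobTimeoutMs`, and the value of 30 000 ms used below is an assumed example, not a documented default.

```ts
// Hypothetical shape for illustration; the real groupmq option types may differ.
interface WorkerTuning {
  heartbeatMs?: number
  enableCleanup?: boolean
  cleanupIntervalMs?: number
  blockingTimeoutSec?: number
  atomicCompletion?: boolean
  concurrency?: number
  stalledInterval?: number
  maxStalledCount?: number
  stalledGracePeriod?: number
}

// Fill in the documented defaults for any option the caller left out.
// `jobTimeoutMs` stands in for `queue.jobTimeoutMs`.
function withDefaults(opts: WorkerTuning, jobTimeoutMs: number): Required<WorkerTuning> {
  return {
    heartbeatMs: opts.heartbeatMs ?? jobTimeoutMs / 3,
    enableCleanup: opts.enableCleanup ?? true,
    cleanupIntervalMs: opts.cleanupIntervalMs ?? 60_000,
    blockingTimeoutSec: opts.blockingTimeoutSec ?? 5,
    atomicCompletion: opts.atomicCompletion ?? true,
    concurrency: opts.concurrency ?? 1,
    stalledInterval: opts.stalledInterval ?? 30_000,
    maxStalledCount: opts.maxStalledCount ?? 1,
    stalledGracePeriod: opts.stalledGracePeriod ?? 0,
  }
}

// Assumed job timeout of 30 s, so the heartbeat default works out to 10 s.
const resolved = withDefaults({ concurrency: 4 }, 30_000)
console.log(resolved.heartbeatMs, resolved.concurrency) // 10000 4
```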
### Reserve and completion
Workers reserve jobs using `reserveBlocking` and `reserveBatch` (for concurrency > 1), send heartbeats, and on success attempt atomic in-group chaining via `completeAndReserveNext`. When `concurrency > 1`, the worker can process multiple jobs simultaneously while maintaining per-group FIFO ordering.
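The per-group FIFO constraint described above can be pictured with a toy in-memory model. This is only a sketch of the idea that at most one job per group is in flight at a time; it is not groupmq's Redis-backed implementation. The class `GroupedQueue` is hypothetical, and its `reserveBatch` and `complete` methods merely echo the names used on this page, with made-up signatures.

```ts
// Toy in-memory model for illustration; not groupmq's implementation.
interface Job {
  id: string
  groupId: string
}

class GroupedQueue {
  private waiting: Job[] = []
  private activeGroups = new Set<string>()

  add(job: Job): void {
    this.waiting.push(job)
  }

  // Reserve up to `max` jobs in arrival order, skipping any group that
  // already has a job in flight, so each group runs one job at a time.
  reserveBatch(max: number): Job[] {
    const reserved: Job[] = []
    const remaining: Job[] = []
    for (const job of this.waiting) {
      if (reserved.length < max && !this.activeGroups.has(job.groupId)) {
        this.activeGroups.add(job.groupId)
        reserved.push(job)
      } else {
        remaining.push(job)
      }
    }
    this.waiting = remaining
    return reserved
  }

  // Completing a job frees its group so the next job in that group can run.
  complete(job: Job): void {
    this.activeGroups.delete(job.groupId)
  }
}

const q = new GroupedQueue()
const jobs: Job[] = [
  { id: 'a1', groupId: 'A' },
  { id: 'a2', groupId: 'A' },
  { id: 'b1', groupId: 'B' },
  { id: 'a3', groupId: 'A' },
]
jobs.forEach((j) => q.add(j))

// Simulate a worker with concurrency 2 that finishes each batch before
// reserving the next one.
const rounds: string[][] = []
let batch = q.reserveBatch(2)
while (batch.length > 0) {
  rounds.push(batch.map((j) => j.id))
  batch.forEach((j) => q.complete(j))
  batch = q.reserveBatch(2)
}
console.log(rounds) // rounds: [a1, b1], then [a2], then [a3]
```

Groups A and B run side by side in the first round, while `a2` and `a3` wait their turn even though a concurrency slot is free.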
```ts
await worker.run()
```
### Events

- `ready`
- `completed` - `(job)`
- `failed` - `(job)`
- `error` - `(error)`
- `closed`
- `ioredis:close`
- `graceful-timeout` - `(job)`
- `stalled` - `(jobId, groupId)` - emitted when a stalled job is detected
```ts
worker.on('ready', () => console.log('worker ready'))
worker.on('completed', (job) => console.log('done', job.id))
worker.on('failed', (job) => console.log('failed', job.id, job.failedReason))
worker.on('graceful-timeout', (job) => console.warn('timed out', job.id))
worker.on('error', (err) => console.error('worker error', err))
worker.on('stalled', (jobId, groupId) => console.warn('job stalled', jobId, groupId))
```
### Methods

- `run()` - start processing
- `close(gracefulTimeoutMs?)` - stop and wait for current job
- `getCurrentJob()`
- `isProcessing()`
- `getWorkerMetrics()`
```ts
await worker.run()

// later
const metrics = worker.getWorkerMetrics()
console.log(metrics.currentJobId, metrics.blockingStats)

const current = worker.getCurrentJob()
if (current) console.log('processing', current.job.id, current.processingTimeMs)

await worker.close(30_000)
```
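One way to picture the "stop and wait for current job" behavior of `close(gracefulTimeoutMs?)` is a race between the in-flight job and a timer. The page does not show how groupmq implements this or exactly when `graceful-timeout` is emitted, so the sketch below is only an assumption about the general pattern. `waitForDrain` and `CloseOutcome` are hypothetical names.

```ts
// Hypothetical helper, not part of groupmq: wait for an in-flight job to
// settle, but give up once `gracefulTimeoutMs` has elapsed.
type CloseOutcome = 'drained' | 'timed-out'

function waitForDrain(
  inFlight: Promise<unknown>,
  gracefulTimeoutMs: number,
): Promise<CloseOutcome> {
  return new Promise<CloseOutcome>((resolve) => {
    const timer = setTimeout(() => resolve('timed-out'), gracefulTimeoutMs)
    // A job that fails has still finished, so both outcomes count as drained.
    const finish = (): void => {
      clearTimeout(timer)
      resolve('drained')
    }
    inFlight.then(finish, finish)
  })
}
```

In this model, a job that settles within the timeout resolves to `'drained'`, and a slower job resolves to `'timed-out'`, which is roughly the situation in which a `graceful-timeout` event would be of interest.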
### Behavior overview

- Reserves jobs via `reserveBlocking` (single job) or `reserveBatch` (multiple jobs when concurrency > 1)
- Emits events for lifecycle changes (`ready`, `completed`, `failed`, `error`, etc.)
- Sends heartbeats to extend visibility timeouts while processing
- On success: emits `completed` and records completion metadata
- On failure: applies backoff, emits `failed`, records attempt/final failure, and dead-letters when attempts are exhausted
- Attempts atomic in-group chaining with `completeAndReserveNext` to minimize reordering
- When `concurrency > 1`: processes multiple jobs simultaneously while respecting per-group FIFO constraints
- Handles ordering methods:
  - `'none'`: Processes jobs immediately in `orderMs` order
  - `'scheduler'`: Respects Redis buffering window (requires scheduler enabled)
  - `'in-memory'`: Collects jobs during grace period before processing batch
    - Supports `orderingMaxWaitMultiplier` (max total wait time)
    - Supports `orderingGracePeriodDecay` (progressive wait reduction)
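The failure path in the list above (back off, retry, dead-letter once attempts run out) can be sketched as a small synchronous simulation. It records the delays it would wait instead of actually sleeping. `runWithRetries` and `Outcome` are hypothetical names, and the assumption that `backoff` receives the number of the attempt that just failed is not confirmed by this page.

```ts
// Hypothetical simulation, not groupmq code.
interface Outcome {
  status: 'completed' | 'dead-lettered'
  attempts: number
  waitedMs: number[] // delays that would be applied between attempts
}

function runWithRetries(
  handler: (attempt: number) => void,
  maxAttempts: number,
  backoff: (attempt: number) => number,
): Outcome {
  const waitedMs: number[] = []
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      handler(attempt)
      return { status: 'completed', attempts: attempt, waitedMs }
    } catch {
      // Only schedule a delay if another attempt remains.
      if (attempt < maxAttempts) waitedMs.push(backoff(attempt))
    }
  }
  // Every attempt failed: the job would be dead-lettered.
  return { status: 'dead-lettered', attempts: maxAttempts, waitedMs }
}

// A handler that fails twice, then succeeds.
let calls = 0
const flaky = (): void => {
  calls++
  if (calls < 3) throw new Error('transient')
}
console.log(runWithRetries(flaky, 5, (a) => a * 100))
// completed on attempt 3, after simulated waits of 100 ms and 200 ms
```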
### Stalled Job Recovery

Workers automatically detect and recover stalled jobs (jobs whose worker crashed or lost connection):

- Background checker runs every `stalledInterval` (default: 30s)
- Detects jobs past their deadline + `stalledGracePeriod`
- Recovers jobs by moving them back to waiting state
- After `maxStalledCount` stalls, jobs are permanently failed
- Emits `stalled` event for monitoring and alerting
```ts
const worker = new Worker({
  queue,
  handler: async (job) => { /* ... */ },
  stalledInterval: 30000,  // Check every 30 seconds
  maxStalledCount: 1,      // Fail after 1 stall
  stalledGracePeriod: 0,   // No grace period
})

worker.on('stalled', (jobId, groupId) => {
  console.warn(`Job ${jobId} stalled and recovered`)
  // `metrics` is assumed to be your own metrics client; it is not provided by groupmq
  metrics.increment('jobs.stalled')
})
```
This ensures jobs aren’t lost when workers crash unexpectedly.
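The checker's decision for each active job can be sketched as a pure function. This is a simplified model and not groupmq's Redis logic: `ActiveJob`, `StalledResult`, and `checkStalled` are hypothetical names, and the exact threshold (here, a job is failed once its stall count exceeds `maxStalledCount`) is an assumption drawn from the option's description as "max times a job can stall before failing".

```ts
// Hypothetical model for illustration; not groupmq code.
interface ActiveJob {
  id: string
  groupId: string
  deadlineMs: number // time by which the job should have finished or sent a heartbeat
  stalledCount: number
}

interface StalledResult {
  recovered: string[] // moved back to waiting
  failed: string[] // permanently failed
}

function checkStalled(
  active: ActiveJob[],
  nowMs: number,
  maxStalledCount: number,
  stalledGracePeriod: number,
): StalledResult {
  const result: StalledResult = { recovered: [], failed: [] }
  for (const job of active) {
    // Still within deadline + grace period: not stalled.
    if (nowMs <= job.deadlineMs + stalledGracePeriod) continue
    job.stalledCount += 1
    // Assumed threshold: fail once the count exceeds maxStalledCount.
    if (job.stalledCount > maxStalledCount) result.failed.push(job.id)
    else result.recovered.push(job.id)
  }
  return result
}

const active: ActiveJob[] = [
  { id: 'j1', groupId: 'A', deadlineMs: 1_000, stalledCount: 0 }, // overdue, first stall
  { id: 'j2', groupId: 'B', deadlineMs: 1_000, stalledCount: 1 }, // overdue, stalled before
  { id: 'j3', groupId: 'C', deadlineMs: 9_000, stalledCount: 0 }, // deadline not reached
]
console.log(checkStalled(active, 5_000, 1, 0))
// recovered: j1, failed: j2 (j3 is untouched)
```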
See the Ordering Methods guide for detailed configuration options including advanced settings like grace period decay.