API - Worker

The Worker pulls jobs from a Queue, sends heartbeats, applies retries/backoff, and optionally chains jobs atomically within the same group to preserve FIFO.

```ts
import Redis from 'ioredis'
import { Queue, Worker } from 'groupmq'

const redis = new Redis()
const queue = new Queue<{ foo: string }>({ redis, namespace: 'demo' })

const worker = new Worker<{ foo: string }>({
  queue,
  heartbeatMs: 10_000,
  maxAttempts: 5,
  // Exponential backoff: 500 ms base, doubling per attempt, capped at 30 s
  backoff: (attempt) => Math.min(30_000, 2 ** (attempt - 1) * 500),
  async handler(job) {
    // do work with job.data
    return { ok: true }
  },
})
```

### Constructor

`new Worker({ queue, handler, heartbeatMs?, maxAttempts?, backoff?, ... })`

- `queue` - a `Queue` instance
- `handler(job)` - async function receiving a reserved job
- `heartbeatMs` - heartbeat interval; defaults to `queue.jobTimeoutMs / 3`
- `maxAttempts` - worker-level cap (defaults to queue default)
- `backoff(attempt)` - function returning ms to wait before retrying
- `enableCleanup` - periodically runs cleanup and cron promotions (default `true`)
- `cleanupIntervalMs` - cadence for cleanup/promotions (default 60s)
- `blockingTimeoutSec` - reserve blocking timeout (default 5s)
- `atomicCompletion` - complete and try to reserve next in-group (default `true`)
- `concurrency` - number of jobs to process concurrently (default 1)
- `logger` - `true` to enable default logs or pass a `Logger`
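
To make the `backoff` option concrete, the sketch below evaluates the formula from the example above for the first eight attempts. It assumes attempts are numbered from 1, which the `attempt - 1` in the formula suggests but this page does not state; `schedule` is a name introduced only for illustration.

```ts
// Same formula as the `backoff` option in the example above:
// 500 ms base, doubling per attempt, capped at 30 s.
// Assumption: `attempt` is 1-based.
const backoff = (attempt: number): number =>
  Math.min(30_000, 2 ** (attempt - 1) * 500)

// Delay (in ms) returned for attempts 1 through 8.
const schedule: number[] = [1, 2, 3, 4, 5, 6, 7, 8].map(backoff)

console.log(schedule)
// [500, 1000, 2000, 4000, 8000, 16000, 30000, 30000]
```

The cap takes effect from attempt 7 onward, where the uncapped value (32,000 ms) would exceed 30 s.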

### Reserve and completion

Workers reserve jobs using `reserveBlocking`, or `reserveBatch` when `concurrency > 1`, send heartbeats while a job is being processed, and on success attempt atomic in-group chaining via `completeAndReserveNext`. When `concurrency > 1`, the worker can process multiple jobs simultaneously while maintaining per-group FIFO ordering.
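
The interaction between concurrency and per-group FIFO can be illustrated with a small in-memory model. This is not GroupMQ's Redis-based implementation: `pickBatch`, the `Job` shape, and the round-based loop are hypothetical, and the model assumes every job in a round finishes before the next round starts. The one rule it demonstrates is that a batch never contains two jobs from the same group.

```ts
type Job = { id: string; groupId: string }

// Hypothetical helper: pick up to `limit` jobs in queue order, taking at
// most one job per group so that jobs within a group never run in parallel.
function pickBatch(pending: Job[], limit: number): Job[] {
  const picked: Job[] = []
  const takenGroups = new Set<string>()
  for (const job of pending) {
    if (picked.length >= limit) break
    if (takenGroups.has(job.groupId)) continue
    takenGroups.add(job.groupId)
    picked.push(job)
  }
  return picked
}

const concurrency = 2
let remaining: Job[] = [
  { id: 'a1', groupId: 'a' },
  { id: 'a2', groupId: 'a' },
  { id: 'b1', groupId: 'b' },
  { id: 'c1', groupId: 'c' },
  { id: 'a3', groupId: 'a' },
]

// Each round models one batch of jobs processed concurrently.
const rounds: string[][] = []
while (remaining.length > 0) {
  const batch = pickBatch(remaining, concurrency)
  rounds.push(batch.map((job) => job.id))
  remaining = remaining.filter((job) => !batch.includes(job))
}

console.log(rounds)
// [['a1', 'b1'], ['a2', 'c1'], ['a3']]
```

Group `a` is processed as `a1`, `a2`, `a3` across successive rounds, while jobs from groups `b` and `c` fill the second concurrency slot.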

```ts
await worker.run()
```

### Events

- `ready`
- `completed` - `(job)`
- `failed` - `(job)`
- `error` - `(error)`
- `closed`
- `ioredis:close`
- `graceful-timeout` - `(job)`

```ts
worker.on('ready', () => console.log('worker ready'))
worker.on('completed', (job) => console.log('done', job.id))
worker.on('failed', (job) => console.log('failed', job.id, job.failedReason))
worker.on('graceful-timeout', (job) => console.warn('timed out', job.id))
worker.on('error', (err) => console.error('worker error', err))
```

### Methods

- `run()` - start processing
- `close(gracefulTimeoutMs?)` - stop and wait for current job
- `getCurrentJob()`
- `isProcessing()`
- `getWorkerMetrics()`

```ts
await worker.run()

// later
const metrics = worker.getWorkerMetrics()
console.log(metrics.currentJobId, metrics.blockingStats)

const current = worker.getCurrentJob()
if (current) console.log('processing', current.job.id, current.processingTimeMs)

await worker.close(30_000)
```

### Behavior

- Reserves jobs via `reserveBlocking` (single job) or `reserveBatch` (multiple jobs when `concurrency > 1`)
- Emits events for lifecycle changes (`ready`, `completed`, `failed`, `error`, etc.)
- Sends heartbeats to extend visibility timeouts while processing
- On success: emits `completed` and records completion metadata
- On failure: applies backoff, emits `failed`, records attempt/final failure, and dead-letters when attempts are exhausted
- Attempts atomic in-group chaining with `completeAndReserveNext` to minimize reordering
- When `concurrency > 1`: processes multiple jobs simultaneously while respecting per-group FIFO constraints
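
The failure path described above (backoff between attempts, dead-lettering once attempts are exhausted) can be sketched as a plain retry loop. This is a simplified synchronous model, not the worker's actual code: `runWithRetries` and the `Outcome` type are hypothetical, and delays are recorded rather than waited on. It also assumes no backoff is applied after the final attempt.

```ts
type Outcome = {
  status: 'completed' | 'dead-lettered'
  attempts: number
  delays: number[] // backoff delays (ms) that would be applied between attempts
}

// Hypothetical helper: call `handler` up to `maxAttempts` times, recording
// the backoff delay after each failed attempt except the last.
function runWithRetries(
  handler: (attempt: number) => void,
  maxAttempts: number,
  backoff: (attempt: number) => number,
): Outcome {
  const delays: number[] = []
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      handler(attempt)
      return { status: 'completed', attempts: attempt, delays }
    } catch {
      // A real worker would make the job wait this long before a retry.
      if (attempt < maxAttempts) delays.push(backoff(attempt))
    }
  }
  // All attempts failed: the job would be moved to a dead-letter area.
  return { status: 'dead-lettered', attempts: maxAttempts, delays }
}

const backoff = (attempt: number): number =>
  Math.min(30_000, 2 ** (attempt - 1) * 500)

// Fails twice, then succeeds on the third attempt.
const flaky = runWithRetries((attempt) => {
  if (attempt < 3) throw new Error('transient failure')
}, 5, backoff)

// Fails on every attempt.
const doomed = runWithRetries(() => {
  throw new Error('permanent failure')
}, 5, backoff)

console.log(flaky)  // status 'completed', attempts 3, delays [500, 1000]
console.log(doomed) // status 'dead-lettered', attempts 5, delays [500, 1000, 2000, 4000]
```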