Processing Jobs

Create a Worker bound to an existing Queue and call run() to start processing jobs:

import { Worker } from 'groupmq';

// `queue` is a Queue instance created elsewhere (see the typed example below).
const worker = new Worker({
  queue,
  async handler(job) {
    // Your work here; the return value is recorded as the job result.
    return `processed ${job.id}`;
  },
});

worker.on('completed', (job) => {
  console.log('completed', job.id);
});

worker.on('failed', (job) => {
  console.log('failed', job.id, job.failedReason);
});

worker.run();

When your Queue is created with a generic type, the Worker handler automatically gets full type inference for job.data.

import { Queue, Worker } from 'groupmq';

type Task = { id: string; ms: number };

// `redis` is your Redis client instance.
const queue = new Queue<Task>({ redis, namespace: 'tasks' });

new Worker({
  queue,
  async handler(job) {
    job.data.id; // string
    job.data.ms; // number
  },
}).run();

Worker options (a configuration sketch using these options follows the list):

  • queue (required): The Queue<T> instance to consume jobs from.
  • handler (required): Async function that processes a reserved job. Return value is recorded as the job result.
  • name: Friendly name used in logs and metrics.
  • heartbeatMs: How often to extend the job visibility timeout. Default: queue.jobTimeoutMs / 3 (min 1000ms).
  • onError: Callback for unexpected errors in the worker loop or heartbeats.
  • maxAttempts: Per-worker cap on retries (cannot exceed job/queue limits). Default: queue.maxAttemptsDefault.
  • backoff: Function (attempt) => milliseconds to compute retry delay. Default: exponential with jitter up to 30s.
  • enableCleanup: Periodically run maintenance (cleanup, promote delayed, process repeating). Default: true.
  • cleanupIntervalMs: Interval for maintenance tasks. Default: 60_000 ms.
  • blockingTimeoutSec: Max seconds to block when waiting for jobs. Default: 5.
  • atomicCompletion: Try to atomically complete a job and immediately reserve the next from the same group to preserve FIFO. Default: true.
  • logger: Pass true for basic logging or a custom Logger instance.
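
To see how several of these options fit together, here is a minimal configuration sketch. The option names are those documented above; the values are only illustrative, queue is assumed to be an existing Queue instance, and the exact onError callback signature is an assumption.

import { Worker } from 'groupmq';

// Illustrative configuration; the values are placeholders, not recommended settings.
const worker = new Worker({
  queue,                  // an existing Queue<T> instance
  name: 'email-worker',   // appears in logs and metrics
  maxAttempts: 5,         // per-worker retry cap (cannot exceed job/queue limits)
  heartbeatMs: 10_000,    // extend the job visibility timeout every 10s
  blockingTimeoutSec: 5,  // block up to 5s while waiting for jobs
  atomicCompletion: true, // complete and reserve the next job in the group atomically
  backoff: (attempt) => Math.min(1_000 * 2 ** attempt, 30_000), // custom exponential delay, capped at 30s
  onError: (err) => console.error('worker error', err),         // callback signature assumed
  logger: true,           // basic built-in logging
  async handler(job) {
    return `processed ${job.id}`;
  },
});

worker.run();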