API - Queue

GroupMQ exposes a BullMQ-like Queue API with group-aware FIFO semantics and an optional ordering delay. Each public method is listed below with a brief explanation and a practical example.

import Redis from 'ioredis'
import { Queue } from 'groupmq'

const redis = new Redis()
const queue = new Queue<{ foo: string }>({
  redis,
  namespace: 'demo',
  jobTimeoutMs: 30_000,
  maxAttempts: 3,
  orderingDelayMs: 0,
  schedulerLockTtlMs: 1500, // (optional) default: 1500ms, controls min repeat interval
})

add(…)

Add a job to a group. If repeat is provided, the call registers a repeating job definition.

// One-off job
await queue.add({ groupId: 'user:1', data: { foo: 'bar' } })

// Delayed job (run after 5s)
await queue.add({ groupId: 'user:1', data: { foo: 'baz' }, delay: 5000 })

// Cron job (every minute)
await queue.add({
  groupId: 'reports',
  data: { type: 'daily' },
  repeat: { pattern: '* * * * *' },
})

reserve()

Reserve the next available job immediately (non-blocking). Returns null if no job is available.

const job = await queue.reserve()
if (job) {
  // process
}

reserveBlocking(timeoutSec)

Block for up to timeoutSec seconds waiting for the next available job. The wait adapts its timeouts to recent activity and to pending delayed jobs.

// Wait up to 5s for a job
const job = await queue.reserveBlocking(5)
if (job) {
  // process
}
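
The reserve calls above combine with complete and retry (both described further down) into a simple processing loop. The sketch below is a hedged illustration, not how GroupMQ's own worker is implemented: `MinimalQueue`, `ReservedJob`, and `processOne` are hypothetical names, and the interface only mirrors the call shapes used in the examples on this page.

```typescript
// Assumed job shape: only the fields the examples on this page use.
interface ReservedJob<T> {
  id: string
  groupId: string
  data: T
}

// Hypothetical structural type covering the three calls used below.
interface MinimalQueue<T> {
  reserveBlocking(timeoutSec: number): Promise<ReservedJob<T> | null>
  complete(job: { id: string; groupId: string }): Promise<unknown>
  retry(jobId: string, backoffMs?: number): Promise<unknown>
}

// Reserve one job, run the handler, then complete or retry it.
// Resolves to 'idle' when no job arrived within the timeout.
async function processOne<T>(
  queue: MinimalQueue<T>,
  handler: (data: T) => Promise<void>,
  timeoutSec = 5,
): Promise<'idle' | 'completed' | 'retried'> {
  const job = await queue.reserveBlocking(timeoutSec)
  if (!job) return 'idle'
  try {
    await handler(job.data)
    await queue.complete({ id: job.id, groupId: job.groupId })
    return 'completed'
  } catch (_err) {
    // Requeue with a fixed 2s backoff; the value is arbitrary for this sketch.
    await queue.retry(job.id, 2000)
    return 'retried'
  }
}
```

Calling `processOne` in a loop gives a single-concurrency consumer; the return value makes it easy to back off or log when the queue is idle.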

reserveBatch(maxBatch)

Atomically reserve up to maxBatch jobs, at most one per available group. Workers with concurrency > 1 use this internally for efficient job batching.

// Get up to 4 jobs from different groups
const jobs = await queue.reserveBatch(4)
for (const job of jobs) {
  // process each job
}
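
To make "one per available group" concrete, here is a small in-memory model of the selection rule. It only illustrates the semantics described above; the real reservation happens atomically in Redis. `pickBatch` and `PendingJob` are hypothetical names, and treating a group with an in-flight job as unavailable is an assumption of this sketch.

```typescript
interface PendingJob {
  id: string
  groupId: string
}

// Pick up to maxBatch jobs from a FIFO-ordered list, taking at most one job
// (the oldest) per group and skipping groups listed as busy (assumed rule).
function pickBatch(
  pending: PendingJob[],
  maxBatch: number,
  busyGroups: Set<string> = new Set(),
): PendingJob[] {
  const picked: PendingJob[] = []
  const seen = new Set(busyGroups)
  for (const job of pending) {
    if (picked.length >= maxBatch) break
    if (seen.has(job.groupId)) continue
    seen.add(job.groupId)
    picked.push(job)
  }
  return picked
}

const pending: PendingJob[] = [
  { id: 'a1', groupId: 'user:1' },
  { id: 'a2', groupId: 'user:1' },
  { id: 'b1', groupId: 'user:2' },
  { id: 'c1', groupId: 'user:3' },
]

// Only three groups have work, so a batch of 4 yields three jobs.
console.log(pickBatch(pending, 4).map((j) => j.id).join(', ')) // a1, b1, c1
```

Job a2 stays queued behind a1 because both belong to user:1, which is what preserves per-group ordering.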

complete(…)

Mark a reserved job as completed.

await queue.complete({ id: job.id, groupId: job.groupId })

completeAndReserveNext(completedJobId, groupId)

Atomically complete the current job and try to reserve the next job in the same group to minimize overtaking by other workers.

const next = await queue.completeAndReserveNext(job.id, job.groupId)
if (next) {
  // process next from same group
}
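
Chaining completeAndReserveNext lets one worker stay on a group until that group has no more ready jobs. This is a minimal sketch, assuming the method resolves to the next job or null as in the example above. `drainGroup`, `ChainableQueue`, and `ReservedJob` are hypothetical names, and error handling is left out for brevity.

```typescript
// Assumed job shape: only the fields the examples on this page use.
interface ReservedJob<T> {
  id: string
  groupId: string
  data: T
}

// Hypothetical structural type for the single call used below.
interface ChainableQueue<T> {
  completeAndReserveNext(
    completedJobId: string,
    groupId: string,
  ): Promise<ReservedJob<T> | null>
}

// Process `first`, then keep following the same group until it yields no job.
// Resolves to the number of jobs handled.
async function drainGroup<T>(
  queue: ChainableQueue<T>,
  first: ReservedJob<T>,
  handler: (job: ReservedJob<T>) => Promise<void>,
): Promise<number> {
  let current: ReservedJob<T> | null = first
  let processed = 0
  while (current) {
    await handler(current)
    processed++
    // Complete the finished job and ask for the next one from the same group.
    current = await queue.completeAndReserveNext(current.id, current.groupId)
  }
  return processed
}
```

A production loop would also cap how long one worker may stay on a single group, so that a very busy group cannot starve the others.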

retry(…)

Requeue a job with an optional backoff in milliseconds. Returns -1 if the queue-level maximum number of attempts has been reached.

await queue.retry(job.id, 2000)
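
Because the backoff is a plain number of milliseconds, the schedule can be computed in application code. Here is a minimal sketch, assuming a capped exponential schedule and a numeric return value from retry. `backoffMs`, `retryWithBackoff`, and `Retryable` are hypothetical names, not part of GroupMQ.

```typescript
// Hypothetical helper: exponential backoff with an upper bound.
// `attempt` is 1-based (1 = first retry).
function backoffMs(attempt: number, baseMs = 1000, capMs = 60_000): number {
  const exponential = baseMs * 2 ** Math.max(0, attempt - 1)
  return Math.min(exponential, capMs)
}

// Assumed shape of the one method used here.
interface Retryable {
  retry(jobId: string, backoffMs?: number): Promise<number>
}

// Resolves to false when the queue reports -1 (max attempts reached).
async function retryWithBackoff(
  queue: Retryable,
  jobId: string,
  attempt: number,
): Promise<boolean> {
  const result = await queue.retry(jobId, backoffMs(attempt))
  return result !== -1
}

console.log(backoffMs(1)) // 1000
console.log(backoffMs(3)) // 4000
console.log(backoffMs(10)) // 60000 (capped)
```

When `retryWithBackoff` resolves to false, the job has run out of attempts and is a candidate for dead-lettering.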

updateData(…)

Replace the job payload in Redis.

await queue.updateData(job.id, { foo: 'updated' })

changeDelay(…)

Change or clear a job’s delay. Use promote to force immediate execution.

await queue.changeDelay(job.id, 10_000) // push 10s into the future
await queue.promote(job.id) // or promote immediately

promote(…)

Promote a delayed job so that it runs now.

await queue.promote(job.id)

remove(…)

Remove a job regardless of state (waiting, delayed, or processing).

await queue.remove(job.id)

clean(…)

Remove jobs with the given status that are older than graceTimeMs. The status is one of 'completed' | 'failed' | 'delayed'.

// Remove completed older than 1h
await queue.clean(60 * 60 * 1000, Number.MAX_SAFE_INTEGER, 'completed')

// Remove failed older than 1h
await queue.clean(60 * 60 * 1000, Number.MAX_SAFE_INTEGER, 'failed')

// Remove delayed regardless of age (negative grace)
await queue.clean(-24 * 60 * 60 * 1000, Number.MAX_SAFE_INTEGER, 'delayed')
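
The negative grace time in the last call can look odd. One way to read it is that jobs are selected by comparing a timestamp against `now - graceTimeMs`, which is an assumption about the selection rule and not something this page confirms. Under that reading, a negative value moves the cutoff into the future, so every job qualifies. `isOlderThanGrace` is a hypothetical helper used only to show the arithmetic.

```typescript
// Assumed rule: a job qualifies when its timestamp is at or before the cutoff.
function isOlderThanGrace(
  jobTimestampMs: number,
  nowMs: number,
  graceTimeMs: number,
): boolean {
  const cutoff = nowMs - graceTimeMs
  return jobTimestampMs <= cutoff
}

const now = 1_700_000_000_000
const hour = 60 * 60 * 1000

// Finished 2h ago, grace 1h: old enough to be removed.
console.log(isOlderThanGrace(now - 2 * hour, now, hour)) // true
// Finished 10 minutes ago, grace 1h: kept.
console.log(isOlderThanGrace(now - 10 * 60 * 1000, now, hour)) // false
// Due 1h from now, grace -24h: the cutoff is now + 24h, so it qualifies.
console.log(isOlderThanGrace(now + hour, now, -24 * hour)) // true
```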

deadLetter(…)

Remove the job from active processing and keep it available for inspection or later replay. The implementation stores metadata in the job hash and removes the unique mapping.

await queue.deadLetter(job.id, job.groupId)

getJob(…)

Fetch a single job as a Job entity with enriched fields, compatible with what BullBoard expects.

const j = await queue.getJob(job.id)
console.log(j.status, j.attemptsMade, j.data)

getJobsByStatus(…)

Fetch jobs across multiple statuses. Ordering is best-effort; the method is intended primarily for dashboards such as BullBoard.

const jobs = await queue.getJobsByStatus(['waiting', 'active', 'failed'], 0, 50)

getActiveCount() / getWaitingCount() / getDelayedCount()

Counts for active, waiting, and delayed jobs.

const [a, w, d] = await Promise.all([
  queue.getActiveCount(),
  queue.getWaitingCount(),
  queue.getDelayedCount(),
])

getActiveJobs() / getWaitingJobs() / getDelayedJobs()

Lists of job IDs by state.

const ids = await queue.getWaitingJobs()

getUniqueGroups() / getUniqueGroupsCount()

List or count of group IDs that currently have jobs.

const groups = await queue.getUniqueGroups()
const groupCount = await queue.getUniqueGroupsCount()

getCompleted(limit?) / getFailed(limit?)

Fetch completed or failed jobs with metadata and return values.

const completed = await queue.getCompleted(20)
const failed = await queue.getFailed(20)

getCompletedJobs(limit?) / getFailedJobs(limit?)

Convenience methods that return completed or failed jobs as Job entities.

const finished = await queue.getCompletedJobs(10)

recordCompleted(…) / recordAttemptFailure(…) / recordFinalFailure(…)

Low-level helpers that the worker uses to persist per-attempt and final outcomes. They are generally not needed in application code unless you are building custom processors.

// Example: manually record completion metadata
await queue.recordCompleted({ id: job.id, groupId: job.groupId }, 'ok', {
  processedOn: Date.now() - 100,
  finishedOn: Date.now(),
  attempts: job.attempts,
  maxAttempts: job.maxAttempts,
})

processRepeatingJobs()

Move due repeating jobs into the queue. Workers call this periodically.

const moved = await queue.processRepeatingJobs()

promoteDelayedJobs()

Promote delayed jobs that are now ready. Workers call this periodically.

await queue.promoteDelayedJobs()

recoverDelayedGroups()

Recover groups that should be ready once their ordering delay has elapsed. Workers invoke this as a safety net.

await queue.recoverDelayedGroups()

removeRepeatingJob(…)

Remove a repeating job definition and clean up its instances.

await queue.removeRepeatingJob('reports', { pattern: '* * * * *' })

pause() / resume() / isPaused()

Pause or resume the queue globally.

await queue.pause()
const paused = await queue.isPaused()
await queue.resume()

waitForEmpty(…)

Wait until there are no active, waiting, or delayed jobs, or until the timeout elapses.

const drained = await queue.waitForEmpty(60_000)

heartbeat(…)

Extend a job’s visibility/lock. Usually handled by the worker heartbeat.

await queue.heartbeat({ id: job.id, groupId: job.groupId })

close()

Close the underlying Redis connections.

await queue.close()
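
waitForEmpty and close combine naturally into a shutdown routine. The sketch below is one reasonable arrangement rather than a documented recipe. `drainAndClose` and `DrainableQueue` are hypothetical names, and the boolean result of waitForEmpty is assumed from the `drained` variable in the earlier example. The queue is deliberately not paused first, since a globally paused queue would presumably leave waiting jobs unprocessed and the wait would simply run into its timeout.

```typescript
// Hypothetical structural type for the two calls used below.
interface DrainableQueue {
  waitForEmpty(timeoutMs: number): Promise<boolean>
  close(): Promise<void>
}

// Wait for in-flight and pending work, then always release Redis connections.
// Resolves to true if the queue emptied before the timeout (assumed semantics).
async function drainAndClose(
  queue: DrainableQueue,
  timeoutMs = 60_000,
): Promise<boolean> {
  try {
    return await queue.waitForEmpty(timeoutMs)
  } finally {
    // Runs even if waitForEmpty rejects, so connections are not leaked.
    await queue.close()
  }
}
```

A caller would stop adding new jobs before invoking `drainAndClose`, and could log a warning when it resolves to false.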