API - Queue
GroupMQ exposes a BullMQ-like Queue API with group-aware FIFO semantics and optional ordering delay. Below are all public methods with a brief explanation and a practical example for each.
import Redis from 'ioredis'
import { Queue } from 'groupmq'
const redis = new Redis()
const queue = new Queue<{ foo: string }>({
redis,
namespace: 'demo',
jobTimeoutMs: 30_000,
maxAttempts: 3,
orderingDelayMs: 0,
schedulerLockTtlMs: 1500, // (optional) default: 1500ms, controls min repeat interval
})
add(opts)
Section titled “add(opts)”Add a job to a group. If repeat
is provided, registers a repeating job definition.
// One-off job
await queue.add({ groupId: 'user:1', data: { foo: 'bar' } })
// Delayed job (run after 5s)
await queue.add({ groupId: 'user:1', data: { foo: 'baz' }, delay: 5000 })
// Cron job (every minute)
await queue.add({
groupId: 'reports',
data: { type: 'daily' },
repeat: { pattern: '* * * * *' },
})
reserve()
Section titled “reserve()”Reserve the next available job immediately (non-blocking). Returns null
if none.
const job = await queue.reserve()
if (job) {
// process
}
reserveBlocking(timeoutSec?, blockUntil?)
Section titled “reserveBlocking(timeoutSec?, blockUntil?)”Block for up to timeoutSec
seconds waiting for the next available job, adapting timeouts to activity and delayed jobs.
// Wait up to 5s for a job
const job = await queue.reserveBlocking(5)
if (job) {
// process
}
reserveBatch(maxBatch?)
Section titled “reserveBatch(maxBatch?)”Reserve up to maxBatch
jobs (one per available group) atomically. Used internally by workers with concurrency > 1 for efficient job batching.
// Get up to 4 jobs from different groups
const jobs = await queue.reserveBatch(4)
for (const job of jobs) {
// process each job
}
complete({ id, groupId })
Section titled “complete({ id, groupId })”Mark a reserved job as completed.
await queue.complete({ id: job.id, groupId: job.groupId })
completeAndReserveNext(completedJobId, groupId)
Section titled “completeAndReserveNext(completedJobId, groupId)”Atomically complete the current job and try to reserve the next job in the same group to minimize overtaking by other workers.
const next = await queue.completeAndReserveNext(job.id, job.groupId)
if (next) {
// process next from same group
}
retry(jobId, backoffMs?)
Section titled “retry(jobId, backoffMs?)”Requeue a job with optional backoff (ms). Returns -1 if queue-level max attempts reached.
await queue.retry(job.id, 2000)
updateData(jobId, data)
Section titled “updateData(jobId, data)”Replace the job payload in Redis.
await queue.updateData(job.id, { foo: 'updated' })
changeDelay(jobId, newDelay)
Section titled “changeDelay(jobId, newDelay)”Change or clear a job’s delay. Use promote
to force immediate execution.
await queue.changeDelay(job.id, 10_000) // push 10s into the future
await queue.promote(job.id) // or promote immediately
promote(jobId)
Section titled “promote(jobId)”Promote a delayed job to run now.
await queue.promote(job.id)
remove(jobId)
Section titled “remove(jobId)”Remove a job regardless of state (waiting, delayed, or processing).
await queue.remove(job.id)
clean(graceTimeMs, limit, status)
Section titled “clean(graceTimeMs, limit, status)”Remove finished jobs of a status older than graceTimeMs
. Status is one of 'completed' | 'failed' | 'delayed'
.
// Remove completed older than 1h
await queue.clean(60 * 60 * 1000, Number.MAX_SAFE_INTEGER, 'completed')
// Remove failed older than 1h
await queue.clean(60 * 60 * 1000, Number.MAX_SAFE_INTEGER, 'failed')
// Remove delayed regardless of age (negative grace)
await queue.clean(-24 * 60 * 60 * 1000, Number.MAX_SAFE_INTEGER, 'delayed')
deadLetter(jobId, groupId)
Section titled “deadLetter(jobId, groupId)”Remove the job from active processing and make it eligible for inspection or later replay (implementation stores metadata in job hash and removes unique mapping).
await queue.deadLetter(job.id, job.groupId)
getJob(id)
Section titled “getJob(id)”Fetch a single job as a Job
entity with enriched fields (compatible with BullBoard expectations).
const j = await queue.getJob(job.id)
console.log(j.status, j.attemptsMade, j.data)
getJobsByStatus(statuses, start?, end?)
Section titled “getJobsByStatus(statuses, start?, end?)”Fetch jobs across multiple statuses. Best-effort ordering, primarily for dashboards (BullBoard).
const jobs = await queue.getJobsByStatus(['waiting', 'active', 'failed'], 0, 50)
getActiveCount() / getWaitingCount() / getDelayedCount()
Section titled “getActiveCount() / getWaitingCount() / getDelayedCount()”Counts for active, waiting, and delayed jobs.
const [a, w, d] = await Promise.all([
queue.getActiveCount(),
queue.getWaitingCount(),
queue.getDelayedCount(),
])
getActiveJobs() / getWaitingJobs() / getDelayedJobs()
Section titled “getActiveJobs() / getWaitingJobs() / getDelayedJobs()”Lists of job IDs by state.
const ids = await queue.getWaitingJobs()
getUniqueGroups() / getUniqueGroupsCount()
Section titled “getUniqueGroups() / getUniqueGroupsCount()”List or count of group IDs that currently have jobs.
const groups = await queue.getUniqueGroups()
const groupCount = await queue.getUniqueGroupsCount()
getCompleted(limit?) / getFailed(limit?)
Section titled “getCompleted(limit?) / getFailed(limit?)”Fetch completed or failed jobs with metadata and return values.
const completed = await queue.getCompleted(20)
const failed = await queue.getFailed(20)
getCompletedJobs(limit?) / getFailedJobs(limit?)
Section titled “getCompletedJobs(limit?) / getFailedJobs(limit?)”Convenience: return completed/failed as Job
entities.
const finished = await queue.getCompletedJobs(10)
recordCompleted(…) / recordAttemptFailure(…) / recordFinalFailure(…)
Section titled “recordCompleted(…) / recordAttemptFailure(…) / recordFinalFailure(…)”Low-level helpers the worker uses to persist attempt/final outcomes. Generally not needed in app code unless building custom processors.
// Example: manually record completion metadata
await queue.recordCompleted({ id: job.id, groupId: job.groupId }, 'ok', {
processedOn: Date.now() - 100,
finishedOn: Date.now(),
attempts: job.attempts,
maxAttempts: job.maxAttempts,
})
processRepeatingJobs()
Section titled “processRepeatingJobs()”Move due repeating jobs into the queue. Workers call this periodically.
const moved = await queue.processRepeatingJobs()
promoteDelayedJobs()
Section titled “promoteDelayedJobs()”Promote delayed jobs that are now ready. Workers call this periodically.
await queue.promoteDelayedJobs()
recoverDelayedGroups()
Section titled “recoverDelayedGroups()”Recover groups that should be ready after ordering delay. Workers invoke this as a safety net.
await queue.recoverDelayedGroups()
removeRepeatingJob(groupId, repeat)
Section titled “removeRepeatingJob(groupId, repeat)”Remove a repeating job definition and clean up instances.
await queue.removeRepeatingJob('reports', { pattern: '* * * * *' })
pause() / resume() / isPaused()
Section titled “pause() / resume() / isPaused()”Pause or resume the queue globally.
await queue.pause()
const paused = await queue.isPaused()
await queue.resume()
waitForEmpty(timeoutMs?)
Section titled “waitForEmpty(timeoutMs?)”Wait until there are no active, waiting, or delayed jobs, or until timeout.
const drained = await queue.waitForEmpty(60_000)
heartbeat(job, extendMs?)
Section titled “heartbeat(job, extendMs?)”Extend a job’s visibility/lock. Usually handled by the worker heartbeat.
await queue.heartbeat({ id: job.id, groupId: job.groupId })
close()
Section titled “close()”Close underlying Redis connections.
await queue.close()