api: initial draft for queues
Parents:
17fb5225 file(s) changed
- api/src/lib/queue/QueueRegistry.ts +30 -0
- api/src/lib/queue/WorkerRegistry.ts +41 -0
- api/src/lib/queue/index.ts +5 -0
- api/src/lib/queue/processors/mail.ts +28 -0
- api/src/lib/queue/types.ts +12 -0
api/src/lib/queue/QueueRegistry.ts
@@ -0,0 +1,30 @@
1 + import { Queue, QueueOptions } from 'bullmq';
2 +
3 + export type QueueName = 'mail'; // TODO: add image here later on
4 +
5 + const queues = new Map<QueueName, Queue>();
6 +
7 + const defaultOpts: QueueOptions = {
8 + connection: {
9 + host: process.env.VALKEY_HOST,
10 + port: Number(process.env.VALKEY_PORT)
11 + },
12 + defaultJobOptions: {
13 + attempts: 3,
14 + backoff: { type: 'exponential', delay: 2000 },
15 + removeOnComplete: { count: 100 },
16 + removeOnFail: { count: 500 },
17 + },
18 + };
19 +
20 + export function getQueue(name: QueueName): Queue {
21 + if (!queues.has(name)) {
22 + queues.set(name, new Queue(name, defaultOpts));
23 + }
24 +
25 + return queues.get(name)!;
26 + }
27 +
28 + export async function closeAllQueues(): Promise<void> {
29 + await Promise.all([...queues.values()].map(q => q.close()));
30 + }
api/src/lib/queue/WorkerRegistry.ts
@@ -0,0 +1,41 @@
1 + import { Worker, WorkerOptions, Processor } from 'bullmq';
2 + import { QueueName } from '@app/lib/queue/QueueRegistry';
3 + import { mailProcessor } from '@app/lib/queue/processors/mail';
4 +
5 + type ProcessorMap = Record<QueueName, Processor>;
6 +
7 + const processorMap: ProcessorMap = {
8 + mail: mailProcessor
9 + };
10 +
11 + // Per-queue concurrency
12 + // TODO: When we process images, we need to bump for that queue
13 + const concurrencyMap: Record<QueueName, number> = {
14 + mail: 10
15 + };
16 +
17 + const workers: Worker[] = [];
18 +
19 + export function bootWorkers(): void {
20 + for (const [name, processor] of Object.entries(processorMap)) {
21 + const queueName = name as QueueName;
22 +
23 + const worker = new Worker(queueName, processor, {
24 + connection: {
25 + host: process.env.VALKEY_HOST,
26 + port: Number(process.env.VALKEY_PORT)
27 + },
28 + concurrency: concurrencyMap[queueName],
29 + } satisfies WorkerOptions);
30 +
31 + worker.on('failed', (job, err) => {
32 + console.error(`[${queueName}] job ${job?.id} failed:`, err.message);
33 + });
34 +
35 + workers.push(worker);
36 + }
37 + }
38 +
39 + export async function closeAllWorkers(): Promise<void> {
40 + await Promise.all(workers.map(w => w.close()));
41 + }
api/src/lib/queue/index.ts
@@ -0,0 +1,5 @@
1 + export { getQueue, closeAllQueues } from '@app/lib/queue/QueueRegistry';
2 + export { bootWorkers, closeAllWorkers } from '@app/lib/queue/WorkerRegistry';
3 + export { MailJobName } from '@app/lib/queue/types';
4 +
5 + export type { MailJobData } from '@app/lib/queue/types';
api/src/lib/queue/processors/mail.ts
@@ -0,0 +1,28 @@
1 + import { Job } from 'bullmq';
2 + import {
3 + MailJobName, MailJobData,
4 + WelcomeEmailPayload,
5 + PasswordResetPayload,
6 + } from '@app/lib/queue/types';
7 +
8 + export async function mailProcessor(job: Job<MailJobData['data']>): Promise<void> {
9 + // job.name is the discriminant
10 + switch (job.name as MailJobName) {
11 + case MailJobName.WelcomeEmail: {
12 + const data = job.data as WelcomeEmailPayload;
13 + // await sendWelcomeEmail(data.userId);
14 +
15 + console.log("Dispatching an welcome email", data);
16 + break;
17 + }
18 +
19 + case MailJobName.PasswordReset: {
20 + const data = job.data as PasswordResetPayload;
21 + // await sendPasswordResetEmail(data.userId, data.token);
22 + break;
23 + }
24 +
25 + default:
26 + throw new Error(`Unknown mail job: ${job.name}`);
27 + }
28 + }
api/src/lib/queue/types.ts
@@ -0,0 +1,12 @@
1 + // Mail queue
2 + export enum MailJobName {
3 + WelcomeEmail = 'mail:welcome',
4 + PasswordReset = 'mail:password-reset',
5 + }
6 +
7 + export type WelcomeEmailPayload = { userId: string };
8 + export type PasswordResetPayload = { userId: string; token: string };
9 +
10 + export type MailJobData =
11 + | { name: MailJobName.WelcomeEmail; data: WelcomeEmailPayload }
12 + | { name: MailJobName.PasswordReset; data: PasswordResetPayload };