eletrotupi / tcc / api/src/lib/queue/WorkerRegistry.ts master
1.7 KB Raw
import { Worker, WorkerOptions, Processor } from "bullmq";
import { getQueue, QueueName } from "@app/lib/queue/QueueRegistry";
import { mailProcessor } from "@app/lib/queue/processors/mail";
import { insightProcessor } from "@app/lib/queue/processors/insights";
import { InsightJobName } from "@app/lib/queue/types";

type ProcessorMap = Record<QueueName, Processor>;

const processorMap: ProcessorMap = {
  mail: mailProcessor,
  insights: insightProcessor,
};

// Per-queue concurrency
// TODO: When we process images, we need to bump for that queue
const concurrencyMap: Record<QueueName, number> = {
  mail: 10,
  insights: 5,
};

const workers: Worker[] = [];

export function bootWorkers(): void {
  console.log("Booting workers...");
  for (const [name, processor] of Object.entries(processorMap)) {
    const queueName = name as QueueName;

    console.log(`Worker ${queueName} booting...`);
    const worker = new Worker(queueName, processor, {
      connection: {
        host: process.env.VALKEY_HOST,
        port: Number(process.env.VALKEY_PORT),
      },
      concurrency: concurrencyMap[queueName],
    } satisfies WorkerOptions);

    worker.on("failed", (job, err) => {
      console.error(`[${queueName}] job ${job?.id} failed:`, err.message);
    });

    workers.push(worker);
  }
}

export async function scheduleInsights(): Promise<void> {
  const queue = getQueue("insights");
  await queue.add(
    InsightJobName.FanOut,
    {},
    {
      repeat: { pattern: "0 3 * * 1" }, // Mondays at 3am
      jobId: "insight-fan-out-weekly", // prevents duplicate repeatable jobs on restart
    },
  );
}

export async function closeAllWorkers(): Promise<void> {
  await Promise.all(workers.map((w) => w.close()));
}