api: introduce queues and workers for insights

Pedro Lucas Porcellis porcellis@eletrotupi.com 4 days ago c4e059e5c5f2c31a451ad9b27204e46056841ed3
Parents: 7345b48
10 file(s) changed
  • api/src/lib/queue/QueueRegistry.ts +5 -5
  • api/src/lib/queue/WorkerRegistry.ts +26 -8
  • api/src/lib/queue/index.ts +8 -4
  • api/src/lib/queue/processors/insights/energySleep.ts +100 -0
  • api/src/lib/queue/processors/insights/index.ts +62 -0
  • api/src/lib/queue/processors/insights/moodTrend.ts +81 -0
  • api/src/lib/queue/processors/insights/triggerPattern.ts +87 -0
  • api/src/lib/queue/processors/insights/utils.ts +10 -0
  • api/src/lib/queue/processors/mail.ts +10 -7
  • api/src/lib/queue/types.ts +32 -7
api/src/lib/queue/QueueRegistry.ts
@@ -1,17 +1,17 @@
1 - import { Queue, QueueOptions } from 'bullmq';
1 + import { Queue, QueueOptions } from "bullmq";
2 2
3 - export type QueueName = 'mail'; // TODO: add image here later on
3 + export type QueueName = "mail" | "insights";
4 4
5 5 const queues = new Map<QueueName, Queue>();
6 6
7 7 const defaultOpts: QueueOptions = {
8 8 connection: {
9 9 host: process.env.VALKEY_HOST,
10 - port: Number(process.env.VALKEY_PORT)
10 + port: Number(process.env.VALKEY_PORT),
11 11 },
12 12 defaultJobOptions: {
13 13 attempts: 3,
14 - backoff: { type: 'exponential', delay: 2000 },
14 + backoff: { type: "exponential", delay: 2000 },
15 15 removeOnComplete: { count: 100 },
16 16 removeOnFail: { count: 500 },
17 17 },
@@ -26,5 +26,5 @@ return queues.get(name)!;
26 26 }
27 27
28 28 export async function closeAllQueues(): Promise<void> {
29 - await Promise.all([...queues.values()].map(q => q.close()));
29 + await Promise.all([...queues.values()].map((q) => q.close()));
30 30 }
api/src/lib/queue/WorkerRegistry.ts
@@ -1,34 +1,40 @@
1 - import { Worker, WorkerOptions, Processor } from 'bullmq';
2 - import { QueueName } from '@app/lib/queue/QueueRegistry';
3 - import { mailProcessor } from '@app/lib/queue/processors/mail';
1 + import { Worker, WorkerOptions, Processor } from "bullmq";
2 + import { getQueue, QueueName } from "@app/lib/queue/QueueRegistry";
3 + import { mailProcessor } from "@app/lib/queue/processors/mail";
4 + import { insightProcessor } from "@app/lib/queue/processors/insights";
5 + import { InsightJobName } from "@app/lib/queue/types";
4 6
5 7 type ProcessorMap = Record<QueueName, Processor>;
6 8
7 9 const processorMap: ProcessorMap = {
8 - mail: mailProcessor
10 + mail: mailProcessor,
11 + insights: insightProcessor,
9 12 };
10 13
11 14 // Per-queue concurrency
12 15 // TODO: When we process images, we need to bump for that queue
13 16 const concurrencyMap: Record<QueueName, number> = {
14 - mail: 10
17 + mail: 10,
18 + insights: 5,
15 19 };
16 20
17 21 const workers: Worker[] = [];
18 22
19 23 export function bootWorkers(): void {
24 + console.log("Booting workers...");
20 25 for (const [name, processor] of Object.entries(processorMap)) {
21 26 const queueName = name as QueueName;
22 27
28 + console.log(`Worker ${queueName} booting...`);
23 29 const worker = new Worker(queueName, processor, {
24 30 connection: {
25 31 host: process.env.VALKEY_HOST,
26 - port: Number(process.env.VALKEY_PORT)
32 + port: Number(process.env.VALKEY_PORT),
27 33 },
28 34 concurrency: concurrencyMap[queueName],
29 35 } satisfies WorkerOptions);
30 36
31 - worker.on('failed', (job, err) => {
37 + worker.on("failed", (job, err) => {
32 38 console.error(`[${queueName}] job ${job?.id} failed:`, err.message);
33 39 });
34 40
@@ -36,6 +42,18 @@ workers.push(worker);
36 42 }
37 43 }
38 44
45 + export async function scheduleInsights(): Promise<void> {
46 + const queue = getQueue("insights");
47 + await queue.add(
48 + InsightJobName.FanOut,
49 + {},
50 + {
51 + repeat: { pattern: "0 3 * * 1" }, // Mondays at 3am
52 + jobId: "insight-fan-out-weekly", // prevents duplicate repeatable jobs on restart
53 + },
54 + );
55 + }
56 +
39 57 export async function closeAllWorkers(): Promise<void> {
40 - await Promise.all(workers.map(w => w.close()));
58 + await Promise.all(workers.map((w) => w.close()));
41 59 }
api/src/lib/queue/index.ts
@@ -1,5 +1,9 @@
1 - export { getQueue, closeAllQueues } from '@app/lib/queue/QueueRegistry';
2 - export { bootWorkers, closeAllWorkers } from '@app/lib/queue/WorkerRegistry';
3 - export { MailJobName } from '@app/lib/queue/types';
1 + export { getQueue, closeAllQueues } from "@app/lib/queue/QueueRegistry";
2 + export {
3 + bootWorkers,
4 + closeAllWorkers,
5 + scheduleInsights,
6 + } from "@app/lib/queue/WorkerRegistry";
7 + export { MailJobName } from "@app/lib/queue/types";
4 8
5 - export type { MailJobData } from '@app/lib/queue/types';
9 + export type { MailJobData } from "@app/lib/queue/types";
api/src/lib/queue/processors/insights/energySleep.ts
@@ -0,0 +1,100 @@
1 + import { prisma } from "@app/lib/prisma";
2 + import { InsightType, InsightPeriod } from "@prisma/client";
3 + import { InsightPeriodPayload } from "@app/lib/queue/types";
4 +
5 + // Pearson correlation coefficient, returns value in [-1, 1]
6 + // TODO: Document this more and write to thesis
7 + function pearson(xs: number[], ys: number[]): number {
8 + const n = xs.length;
9 + if (n < 2) return 0;
10 +
11 + const meanX = xs.reduce((a, b) => a + b, 0) / n;
12 + const meanY = ys.reduce((a, b) => a + b, 0) / n;
13 +
14 + const num = xs.reduce((sum, x, i) => sum + (x - meanX) * (ys[i] - meanY), 0);
15 + const denX = Math.sqrt(xs.reduce((sum, x) => sum + (x - meanX) ** 2, 0));
16 + const denY = Math.sqrt(ys.reduce((sum, y) => sum + (y - meanY) ** 2, 0));
17 +
18 + if (denX === 0 || denY === 0) return 0;
19 + return num / (denX * denY);
20 + }
21 +
22 + export async function processEnergySleep({
23 + userId,
24 + periodStart,
25 + periodEnd,
26 + }: InsightPeriodPayload): Promise<void> {
27 + const start = new Date(periodStart);
28 + const end = new Date(periodEnd);
29 +
30 + const sleepRecords = await prisma.sleepRecord.findMany({
31 + where: { userId, date: { gte: start, lte: end } },
32 + orderBy: { date: "asc" },
33 + });
34 +
35 + if (sleepRecords.length < 3) return;
36 +
37 + // For each sleep record, find the closest mood entry on the following day
38 + // XXX: What if the user logs two sleep entries on the same day?
39 + const pairs: Array<{ sleep: number; energy: number }> = [];
40 +
41 + for (const record of sleepRecords) {
42 + const nextDayStart = new Date(record.date);
43 + nextDayStart.setDate(nextDayStart.getDate() + 1);
44 + const nextDayEnd = new Date(nextDayStart);
45 + nextDayEnd.setDate(nextDayEnd.getDate() + 1);
46 +
47 + const mood = await prisma.mood.findFirst({
48 + where: { userId, moment: { gte: nextDayStart, lt: nextDayEnd } },
49 + orderBy: { moment: "asc" },
50 + select: { energyLevel: true },
51 + });
52 +
53 + if (mood) {
54 + pairs.push({ sleep: record.average, energy: mood.energyLevel });
55 + }
56 + }
57 +
58 + if (pairs.length < 3) return;
59 +
60 + const sleepValues = pairs.map((p) => p.sleep);
61 + const energyValues = pairs.map((p) => p.energy);
62 + const score = parseFloat(pearson(sleepValues, energyValues).toFixed(3));
63 +
64 + await prisma.insight.upsert({
65 + where: {
66 + userId_type_periodStart: {
67 + userId,
68 + type: InsightType.ENERGY_SLEEP_CORRELATION,
69 + periodStart: start,
70 + },
71 + },
72 + update: {
73 + body: buildBody(score, pairs.length),
74 + metadata: { correlationScore: score, sampleSize: pairs.length },
75 + generatedAt: new Date(),
76 + },
77 + create: {
78 + userId,
79 + type: InsightType.ENERGY_SLEEP_CORRELATION,
80 + period: InsightPeriod.WEEKLY,
81 + title: "Sono e Energia",
82 + body: buildBody(score, pairs.length),
83 + metadata: { correlationScore: score, sampleSize: pairs.length },
84 + periodStart: start,
85 + periodEnd: end,
86 + },
87 + });
88 + }
89 +
90 + function buildBody(score: number, sampleSize: number): string {
91 + const base = sampleSize < 5 ? " (dados limitados, continue registrando)" : "";
92 +
93 + if (score > 0.6)
94 + return `Seu sono tem forte impacto positivo na sua energia no dia seguinte.${base}`;
95 + if (score > 0.3)
96 + return `Seu sono tem impacto moderado na sua energia no dia seguinte.${base}`;
97 + if (score < -0.3)
98 + return `Curiosamente, mais sono está associado a menos energia — vale investigar.${base}`;
99 + return `Não encontramos uma relação clara entre seu sono e energia ainda.${base}`;
100 + }
api/src/lib/queue/processors/insights/index.ts
@@ -0,0 +1,62 @@
1 + import { Job } from "bullmq";
2 + import {
3 + InsightJobName,
4 + InsightJobData,
5 + InsightPeriodPayload,
6 + } from "@app/lib/queue/types";
7 + import { getQueue } from "@app/lib/queue/QueueRegistry";
8 + import { prisma } from "@app/lib/prisma";
9 + import { processMoodTrend } from "@app/lib/queue/processors/insights/moodTrend";
10 + import { processEnergySleep } from "@app/lib/queue/processors/insights/energySleep";
11 + import { processTriggerPattern } from "@app/lib/queue/processors/insights/triggerPattern";
12 +
13 + export async function insightProcessor(
14 + job: Job<InsightJobData["data"]>,
15 + ): Promise<void> {
16 + console.log(`Insight job ${job.name} started`);
17 +
18 + switch (job.name as InsightJobName) {
19 + case InsightJobName.FanOut:
20 + return fanOut();
21 +
22 + case InsightJobName.MoodTrend:
23 + return processMoodTrend(job.data as InsightPeriodPayload);
24 +
25 + case InsightJobName.EnergySleep:
26 + return processEnergySleep(job.data as InsightPeriodPayload);
27 +
28 + case InsightJobName.TriggerPattern:
29 + return processTriggerPattern(job.data as InsightPeriodPayload);
30 +
31 + default:
32 + throw new Error(`Unknown insight job: ${job.name}`);
33 + }
34 + }
35 +
36 + async function fanOut(): Promise<void> {
37 + const users = await prisma.user.findMany({
38 + where: { active: true },
39 + select: { id: true },
40 + });
41 +
42 + const queue = getQueue("insights");
43 + const periodEnd = new Date();
44 + const periodStart = new Date();
45 + periodStart.setDate(periodEnd.getDate() - 7);
46 +
47 + const payload: Omit<InsightPeriodPayload, "userId"> = {
48 + periodStart: periodStart.toISOString(),
49 + periodEnd: periodEnd.toISOString(),
50 + };
51 +
52 + await queue.addBulk(
53 + users.flatMap((u) => [
54 + { name: InsightJobName.MoodTrend, data: { userId: u.id, ...payload } },
55 + { name: InsightJobName.EnergySleep, data: { userId: u.id, ...payload } },
56 + {
57 + name: InsightJobName.TriggerPattern,
58 + data: { userId: u.id, ...payload },
59 + },
60 + ]),
61 + );
62 + }
api/src/lib/queue/processors/insights/moodTrend.ts
@@ -0,0 +1,81 @@
1 + import { prisma } from "@app/lib/prisma";
2 + import { InsightType, InsightPeriod, BaseMoodOption } from "@prisma/client";
3 + import { InsightPeriodPayload } from "@app/lib/queue/types";
4 +
5 + const MOOD_SCORE: Record<BaseMoodOption, number> = {
6 + GREAT: 5,
7 + GOOD: 4,
8 + NEUTRAL: 3,
9 + SAD: 2,
10 + ANGRY: 1,
11 + };
12 +
13 + const avg = (values: number[]) =>
14 + values.reduce((a, b) => a + b, 0) / values.length;
15 +
16 + export async function processMoodTrend({
17 + userId,
18 + periodStart,
19 + periodEnd,
20 + }: InsightPeriodPayload): Promise<void> {
21 + const start = new Date(periodStart);
22 + const end = new Date(periodEnd);
23 +
24 + const moods = await prisma.mood.findMany({
25 + where: { userId, moment: { gte: start, lte: end } },
26 + orderBy: { moment: "asc" },
27 + select: { selectedMood: true, energyLevel: true, moment: true },
28 + });
29 +
30 + if (moods.length < 3) return;
31 +
32 + const scores = moods.map((m) => MOOD_SCORE[m.selectedMood]);
33 + const mid = Math.floor(scores.length / 2);
34 + const avgFirst = avg(scores.slice(0, mid));
35 + const avgSecond = avg(scores.slice(mid));
36 + const delta = Math.round(((avgSecond - avgFirst) / avgFirst) * 100);
37 +
38 + // Dominant mood in the period
39 + const moodCounts = moods.reduce<Partial<Record<BaseMoodOption, number>>>(
40 + (acc, m) => ({ ...acc, [m.selectedMood]: (acc[m.selectedMood] ?? 0) + 1 }),
41 + {},
42 + );
43 + const dominantMood = (
44 + Object.entries(moodCounts) as [BaseMoodOption, number][]
45 + ).sort((a, b) => b[1] - a[1])[0][0];
46 +
47 + const avgEnergy = avg(moods.map((m) => m.energyLevel));
48 +
49 + await prisma.insight.upsert({
50 + where: {
51 + userId_type_periodStart: {
52 + userId,
53 + type: InsightType.MOOD_TREND,
54 + periodStart: start,
55 + },
56 + },
57 + update: {
58 + body: buildBody(delta),
59 + metadata: { delta, avgFirst, avgSecond, dominantMood, avgEnergy },
60 + generatedAt: new Date(),
61 + },
62 + create: {
63 + userId,
64 + type: InsightType.MOOD_TREND,
65 + period: InsightPeriod.WEEKLY,
66 + title: "Tendência de Humor",
67 + body: buildBody(delta),
68 + metadata: { delta, avgFirst, avgSecond, dominantMood, avgEnergy },
69 + periodStart: start,
70 + periodEnd: end,
71 + },
72 + });
73 + }
74 +
75 + function buildBody(delta: number): string {
76 + if (delta > 10)
77 + return `Seu humor melhorou ${delta}% em relação ao início do período.`;
78 + if (delta < -10)
79 + return `Seu humor caiu ${Math.abs(delta)}% em relação ao início do período.`;
80 + return "Seu humor se manteve estável durante o período.";
81 + }
api/src/lib/queue/processors/insights/triggerPattern.ts
@@ -0,0 +1,87 @@
1 + import { prisma } from "@app/lib/prisma";
2 + import { InsightType, InsightPeriod, TriggerType } from "@prisma/client";
3 + import { InsightPeriodPayload } from "@app/lib/queue/types";
4 +
5 + const TRIGGER_LABEL: Record<TriggerType, string> = {
6 + SOCIAL: "social",
7 + WORK: "trabalho",
8 + HEALTH: "saúde",
9 + PHYSICAL: "físico",
10 + FAMILY: "família",
11 + OTHER: "outros",
12 + };
13 +
14 + export async function processTriggerPattern({
15 + userId,
16 + periodStart,
17 + periodEnd,
18 + }: InsightPeriodPayload): Promise<void> {
19 + const start = new Date(periodStart);
20 + const end = new Date(periodEnd);
21 +
22 + const triggers = await prisma.trigger.findMany({
23 + where: { userId, moment: { gte: start, lte: end } },
24 + select: { category: true },
25 + });
26 +
27 + if (triggers.length === 0) return;
28 +
29 + // Count by category
30 + const counts = triggers.reduce<Partial<Record<TriggerType, number>>>(
31 + (acc, t) => ({ ...acc, [t.category]: (acc[t.category] ?? 0) + 1 }),
32 + {},
33 + );
34 +
35 + const sorted = (Object.entries(counts) as [TriggerType, number][]).sort(
36 + (a, b) => b[1] - a[1],
37 + );
38 +
39 + const [topCategory, topCount] = sorted[0];
40 + const distribution = Object.fromEntries(sorted);
41 +
42 + await prisma.insight.upsert({
43 + where: {
44 + userId_type_periodStart: {
45 + userId,
46 + type: InsightType.TRIGGER_PATTERN,
47 + periodStart: start,
48 + },
49 + },
50 + update: {
51 + body: buildBody(topCategory, topCount, triggers.length),
52 + metadata: {
53 + topCategory,
54 + topCount,
55 + total: triggers.length,
56 + distribution,
57 + },
58 + generatedAt: new Date(),
59 + },
60 + create: {
61 + userId,
62 + type: InsightType.TRIGGER_PATTERN,
63 + period: InsightPeriod.WEEKLY,
64 + title: "Padrão de Gatilhos",
65 + body: buildBody(topCategory, topCount, triggers.length),
66 + metadata: {
67 + topCategory,
68 + topCount,
69 + total: triggers.length,
70 + distribution,
71 + },
72 + periodStart: start,
73 + periodEnd: end,
74 + },
75 + });
76 + }
77 +
78 + function buildBody(top: TriggerType, count: number, total: number): string {
79 + const pct = Math.round((count / total) * 100);
80 + const label = TRIGGER_LABEL[top];
81 + return `
82 + Gatilhos de ${label} foram os mais frequentes,
83 + representando ${pct}% dos registros do período.
84 + `
85 + .replace(/\s*\n\s*/g, " ")
86 + .trim();
87 + }
api/src/lib/queue/processors/insights/utils.ts
@@ -0,0 +1,10 @@
1 + export function rollingPeriod(days = 7) {
2 + const periodEnd = new Date();
3 + const periodStart = new Date();
4 + periodStart.setDate(periodEnd.getDate() - days);
5 +
6 + return {
7 + periodStart: periodStart.toISOString(),
8 + periodEnd: periodEnd.toISOString(),
9 + };
10 + }
api/src/lib/queue/processors/mail.ts
@@ -1,18 +1,21 @@
1 - import { Job } from 'bullmq';
1 + import { Job } from "bullmq";
2 2 import {
3 - MailJobName, MailJobData,
3 + MailJobName,
4 + MailJobData,
4 5 WelcomeEmailPayload,
5 6 PasswordResetPayload,
6 - ActivateAccountEmailPayload
7 - } from '@app/lib/queue/types';
7 + ActivateAccountEmailPayload,
8 + } from "@app/lib/queue/types";
8 9
9 10 import {
10 11 sendWelcomeEmail,
11 12 sendResetPasswordEmail,
12 - sendActivateAccountEmail
13 - } from '@app/services/mail'
13 + sendActivateAccountEmail,
14 + } from "@app/services/mail";
14 15
15 - export async function mailProcessor(job: Job<MailJobData['data']>): Promise<void> {
16 + export async function mailProcessor(
17 + job: Job<MailJobData["data"]>,
18 + ): Promise<void> {
16 19 // job.name is the discriminant
17 20 switch (job.name as MailJobName) {
18 21 case MailJobName.WelcomeEmail: {
api/src/lib/queue/types.ts
@@ -1,15 +1,40 @@
1 1 // Mail queue
2 2 export enum MailJobName {
3 - WelcomeEmail = 'mail:welcome',
4 - PasswordReset = 'mail:password-reset',
5 - ActivateAccountEmail = 'mail:activate-account'
3 + WelcomeEmail = "mail:welcome",
4 + PasswordReset = "mail:password-reset",
5 + ActivateAccountEmail = "mail:activate-account",
6 6 }
7 7
8 - export type WelcomeEmailPayload = { userId: number; code: string };
9 - export type PasswordResetPayload = { userId: number; token: string };
8 + export type WelcomeEmailPayload = { userId: number; code: string };
9 + export type PasswordResetPayload = { userId: number; token: string };
10 10 export type ActivateAccountEmailPayload = { userId: number; code: string };
11 11
12 12 export type MailJobData =
13 - | { name: MailJobName.WelcomeEmail; data: WelcomeEmailPayload }
13 + | { name: MailJobName.WelcomeEmail; data: WelcomeEmailPayload }
14 14 | { name: MailJobName.PasswordReset; data: PasswordResetPayload }
15 - | { name: MailJobName.ActivateAccountEmail; data: ActivateAccountEmailPayload };
15 + | {
16 + name: MailJobName.ActivateAccountEmail;
17 + data: ActivateAccountEmailPayload;
18 + };
19 +
20 + // Insight queue
21 + export enum InsightJobName {
22 + FanOut = "insight:fan-out",
23 + MoodTrend = "insight:mood-trend",
24 + EnergySleep = "insight:energy-sleep",
25 + TriggerPattern = "insight:trigger-pattern",
26 + }
27 +
28 + export type InsightPeriodPayload = {
29 + userId: number;
30 + periodStart: string;
31 + periodEnd: string;
32 + };
33 +
34 + export type InsightFanOutPayload = Record<string, never>; // no data, just triggers
35 +
36 + export type InsightJobData =
37 + | { name: InsightJobName.FanOut; data: InsightFanOutPayload }
38 + | { name: InsightJobName.MoodTrend; data: InsightPeriodPayload }
39 + | { name: InsightJobName.EnergySleep; data: InsightPeriodPayload }
40 + | { name: InsightJobName.TriggerPattern; data: InsightPeriodPayload };