From e904e9634548e47d611bdcbb88d7b180b927fd5f Mon Sep 17 00:00:00 2001 From: JustZvan Date: Fri, 6 Feb 2026 12:16:40 +0100 Subject: feat: initial commit! --- src/queue/push_notification.ts | 115 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100644 src/queue/push_notification.ts (limited to 'src/queue/push_notification.ts') diff --git a/src/queue/push_notification.ts b/src/queue/push_notification.ts new file mode 100644 index 0000000..4912a18 --- /dev/null +++ b/src/queue/push_notification.ts @@ -0,0 +1,115 @@ +import { Queue, Worker } from "bullmq"; +import { connection } from "../db/redis/info"; +import { + sendPushNotifications, + PushNotificationData, +} from "../notifications/push"; +import { logger } from "../lib/pino"; + +/** Job data for sending push notifications to multiple devices */ +export type PushNotificationJob = { + pushTokens: string[]; + notification: PushNotificationData; +}; + +/** Queue for batching and sending push notifications to parents */ +export const pushNotificationQueue = new Queue( + "pushNotificationQueue", + { + connection, + }, +); + +/** Worker that processes push notification jobs and sends them via Expo */ +export const pushNotificationWorker = new Worker( + "pushNotificationQueue", + async (job) => { + const { pushTokens, notification } = job.data; + + try { + if (!Array.isArray(pushTokens)) { + logger.error( + { jobId: job.id }, + "pushTokens is not an array in job data", + ); + throw new Error("Invalid pushTokens"); + } + + if (pushTokens.length === 0) { + logger.warn({ jobId: job.id }, "Empty pushTokens array in job"); + return { success: false, sent: 0, reason: "No tokens" }; + } + + if (!notification || typeof notification !== "object") { + logger.error({ jobId: job.id }, "Invalid notification data in job"); + throw new Error("Invalid notification data"); + } + + if (!notification.title || !notification.body) { + logger.error( + { jobId: job.id }, + "Notification missing title or body in job", + ); + throw new Error("Notification must have title and body"); + } + + logger.info( + { + jobId: job.id, + tokenCount: pushTokens.length, + title: notification.title, + }, + "Processing push notification job", + ); + + const result = await sendPushNotifications(pushTokens, notification); + + logger.info( + { + jobId: job.id, + success: result.success, + sent: result.results.length, + tokenCount: pushTokens.length, + }, + "Push notifications sent", + ); + + return { success: result.success, sent: result.results.length }; + } catch (error) { + logger.error( + { error, jobId: job.id, tokenCount: pushTokens?.length }, + "Failed to send push notifications in job", + ); + throw error; + } + }, + { + connection, + concurrency: 10, + }, +); + +pushNotificationWorker.on("active", (job) => { + logger.debug( + { jobId: job!.id, tokenCount: job!.data.pushTokens.length }, + "Push notification job is active", + ); +}); + +pushNotificationWorker.on("completed", (job, returnvalue) => { + logger.info( + { jobId: job!.id, result: returnvalue }, + "Push notification job completed", + ); +}); + +pushNotificationWorker.on("error", (err) => { + logger.error({ error: err }, "Push notification worker error"); +}); + +pushNotificationWorker.on("failed", (job, err) => { + logger.error( + { error: err, jobId: job?.id, tokenCount: job?.data.pushTokens?.length }, + "Push notification job failed", + ); +}); -- cgit v1.2.3