diff options
| author | JustZvan <justzvan@justzvan.xyz> | 2026-02-06 12:16:40 +0100 |
|---|---|---|
| committer | JustZvan <justzvan@justzvan.xyz> | 2026-02-06 12:16:40 +0100 |
| commit | e904e9634548e47d611bdcbb88d7b180b927fd5f (patch) | |
| tree | 21aa5be08fc5b22585508c0263ee5ea4effcc593 /src/queue/push_notification.ts | |
feat: initial commit!
Diffstat (limited to 'src/queue/push_notification.ts')
| -rw-r--r-- | src/queue/push_notification.ts | 115 |
1 files changed, 115 insertions, 0 deletions
diff --git a/src/queue/push_notification.ts b/src/queue/push_notification.ts new file mode 100644 index 0000000..4912a18 --- /dev/null +++ b/src/queue/push_notification.ts @@ -0,0 +1,115 @@ +import { Queue, Worker } from "bullmq"; +import { connection } from "../db/redis/info"; +import { + sendPushNotifications, + PushNotificationData, +} from "../notifications/push"; +import { logger } from "../lib/pino"; + +/** Job data for sending push notifications to multiple devices */ +export type PushNotificationJob = { + pushTokens: string[]; + notification: PushNotificationData; +}; + +/** Queue for batching and sending push notifications to parents */ +export const pushNotificationQueue = new Queue<PushNotificationJob>( + "pushNotificationQueue", + { + connection, + }, +); + +/** Worker that processes push notification jobs and sends them via Expo */ +export const pushNotificationWorker = new Worker<PushNotificationJob>( + "pushNotificationQueue", + async (job) => { + const { pushTokens, notification } = job.data; + + try { + if (!Array.isArray(pushTokens)) { + logger.error( + { jobId: job.id }, + "pushTokens is not an array in job data", + ); + throw new Error("Invalid pushTokens"); + } + + if (pushTokens.length === 0) { + logger.warn({ jobId: job.id }, "Empty pushTokens array in job"); + return { success: false, sent: 0, reason: "No tokens" }; + } + + if (!notification || typeof notification !== "object") { + logger.error({ jobId: job.id }, "Invalid notification data in job"); + throw new Error("Invalid notification data"); + } + + if (!notification.title || !notification.body) { + logger.error( + { jobId: job.id }, + "Notification missing title or body in job", + ); + throw new Error("Notification must have title and body"); + } + + logger.info( + { + jobId: job.id, + tokenCount: pushTokens.length, + title: notification.title, + }, + "Processing push notification job", + ); + + const result = await sendPushNotifications(pushTokens, notification); + + logger.info( + { + jobId: job.id, + success: result.success, + sent: result.results.length, + tokenCount: pushTokens.length, + }, + "Push notifications sent", + ); + + return { success: result.success, sent: result.results.length }; + } catch (error) { + logger.error( + { error, jobId: job.id, tokenCount: pushTokens?.length }, + "Failed to send push notifications in job", + ); + throw error; + } + }, + { + connection, + concurrency: 10, + }, +); + +pushNotificationWorker.on("active", (job) => { + logger.debug( + { jobId: job!.id, tokenCount: job!.data.pushTokens.length }, + "Push notification job is active", + ); +}); + +pushNotificationWorker.on("completed", (job, returnvalue) => { + logger.info( + { jobId: job!.id, result: returnvalue }, + "Push notification job completed", + ); +}); + +pushNotificationWorker.on("error", (err) => { + logger.error({ error: err }, "Push notification worker error"); +}); + +pushNotificationWorker.on("failed", (job, err) => { + logger.error( + { error: err, jobId: job?.id, tokenCount: job?.data.pushTokens?.length }, + "Push notification job failed", + ); +}); |