summaryrefslogtreecommitdiff
path: root/src/queue/push_notification.ts
blob: 4912a1822fc080fdedf00d2e38fce949f012c80a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import { Queue, Worker } from "bullmq";
import { connection } from "../db/redis/info";
import {
  sendPushNotifications,
  PushNotificationData,
} from "../notifications/push";
import { logger } from "../lib/pino";

/**
 * Job data for sending push notifications to multiple devices.
 *
 * This is the payload carried by jobs on `pushNotificationQueue` and read by
 * `pushNotificationWorker`. The worker re-validates the payload at runtime,
 * so the notes on each field describe what it accepts.
 */
export type PushNotificationJob = {
  /**
   * Push tokens of the target devices. The worker fails the job when this is
   * not an array and returns early (without sending) when it is empty.
   */
  pushTokens: string[];
  /**
   * Notification content forwarded to `sendPushNotifications`. The worker
   * fails the job unless both `title` and `body` are truthy.
   */
  notification: PushNotificationData;
};

/**
 * Queue for batching and sending push notifications to parents.
 *
 * Producers enqueue `PushNotificationJob` payloads here; they are consumed by
 * `pushNotificationWorker`, which subscribes under the same queue name.
 * Only the shared Redis `connection` is configured; no default job options
 * are set on the queue itself.
 */
export const pushNotificationQueue = new Queue<PushNotificationJob>(
  // Must stay identical to the name passed to the Worker in this file.
  "pushNotificationQueue",
  {
    connection,
  },
);

/**
 * Worker that processes push notification jobs and sends them via Expo
 * (through `sendPushNotifications`).
 *
 * Processing steps for each job:
 *  1. Validate the payload (`pushTokens` must be an array, `notification`
 *     must be an object with truthy `title` and `body`).
 *  2. Short-circuit with `{ success: false, sent: 0, reason: "No tokens" }`
 *     when `pushTokens` is empty; the job completes rather than fails.
 *  3. Call `sendPushNotifications` and return
 *     `{ success, sent: result.results.length }`.
 *
 * Throws `Error` for an invalid payload, and rethrows whatever
 * `sendPushNotifications` throws, so BullMQ marks the job as failed.
 *
 * Validation runs before the `try` block on purpose: a rejected payload is
 * logged once with its specific reason and is not additionally reported as
 * "Failed to send push notifications", which would be misleading because no
 * send was attempted.
 *
 * NOTE(review): validation failures are thrown as plain `Error`. If jobs are
 * enqueued with `attempts > 1`, BullMQ will presumably retry them even though
 * the payload cannot become valid; consider `UnrecoverableError` if the
 * installed BullMQ version provides it.
 */
export const pushNotificationWorker = new Worker<PushNotificationJob>(
  "pushNotificationQueue",
  async (job) => {
    const { pushTokens, notification } = job.data;

    // --- Payload validation (job data comes from Redis, so the static type
    // is not trusted at runtime) ---------------------------------------------
    if (!Array.isArray(pushTokens)) {
      logger.error(
        { jobId: job.id },
        "pushTokens is not an array in job data",
      );
      throw new Error("Invalid pushTokens");
    }

    // Nothing to deliver: complete the job instead of failing it.
    if (pushTokens.length === 0) {
      logger.warn({ jobId: job.id }, "Empty pushTokens array in job");
      return { success: false, sent: 0, reason: "No tokens" };
    }

    if (!notification || typeof notification !== "object") {
      logger.error({ jobId: job.id }, "Invalid notification data in job");
      throw new Error("Invalid notification data");
    }

    if (!notification.title || !notification.body) {
      logger.error(
        { jobId: job.id },
        "Notification missing title or body in job",
      );
      throw new Error("Notification must have title and body");
    }

    logger.info(
      {
        jobId: job.id,
        tokenCount: pushTokens.length,
        title: notification.title,
      },
      "Processing push notification job",
    );

    // --- Delivery -----------------------------------------------------------
    // Only failures from this point on are genuine send failures.
    try {
      const result = await sendPushNotifications(pushTokens, notification);

      logger.info(
        {
          jobId: job.id,
          success: result.success,
          sent: result.results.length,
          tokenCount: pushTokens.length,
        },
        "Push notifications sent",
      );

      return { success: result.success, sent: result.results.length };
    } catch (error) {
      // NOTE(review): pino's built-in error serializer is registered under
      // the `err` key by default; confirm ../lib/pino serializes `error`,
      // otherwise the message and stack may be dropped from this log line.
      logger.error(
        { error, jobId: job.id, tokenCount: pushTokens.length },
        "Failed to send push notifications in job",
      );
      // Rethrow so BullMQ records the job as failed.
      throw error;
    }
  },
  {
    connection,
    // BullMQ `concurrency` option for this worker instance.
    concurrency: 10,
  },
);

// Debug-level trace emitted when the worker picks up a job.
// This listener runs before the processor has validated the payload, so
// `pushTokens` may be missing or malformed here. Every access is optional
// (the same guard the "failed" listener uses) so that a bad payload cannot
// make the listener itself throw.
pushNotificationWorker.on("active", (job) => {
  logger.debug(
    { jobId: job?.id, tokenCount: job?.data?.pushTokens?.length },
    "Push notification job is active",
  );
});

// Records the processor's return value once a job has finished successfully.
pushNotificationWorker.on("completed", (finishedJob, result) => {
  const context = { jobId: finishedJob!.id, result };
  logger.info(context, "Push notification job completed");
});

// Logs errors emitted by the worker itself; this listener receives only the
// error, with no job attached.
pushNotificationWorker.on("error", (workerError) => {
  const context = { error: workerError };
  logger.error(context, "Push notification worker error");
});

// Logs every job that ends in the failed state. The job may be absent and its
// payload may be malformed, so each access is optional.
pushNotificationWorker.on("failed", (failedJob, failure) => {
  const context = {
    error: failure,
    jobId: failedJob?.id,
    tokenCount: failedJob?.data.pushTokens?.length,
  };
  logger.error(context, "Push notification job failed");
});