1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
|
import { Queue, Worker } from "bullmq";
import { connection } from "../db/redis/info";
import { redis } from "../db/redis/client";
import { model } from "../ai/ai";
import { generateObject } from "ai";
import { z } from "zod";
import { db } from "../db/db";
import { linkedDevices, deviceConfig, users, alerts } from "../db/schema";
import { eq } from "drizzle-orm";
import { pushNotificationQueue } from "./push_notification";
import { logger } from "../lib/pino";
/** Notification data captured from Android system notifications. */
export type NotificationData = {
  /** Notification title (often the sender or conversation name). */
  title: string;
  /** Notification body text — the content that is analyzed for safety. */
  message: string;
  /** Android package that posted the notification (e.g. "com.discord"). */
  packageName: string;
  /** Unix timestamp in seconds; filled in at store time when absent. */
  timestamp?: number;
};
/** TTL for storing notifications in Redis (72 hours in seconds). */
const NOTIFICATION_HISTORY_TTL = 72 * 60 * 60;
/** Redis key prefix for per-device notification-history sorted sets. */
const NOTIFICATION_HISTORY_KEY_PREFIX = "notifications:history:";
/**
 * Persists one notification into a device's Redis history.
 *
 * Entries live in a sorted set keyed `notifications:history:<deviceId>`,
 * scored by unix-seconds timestamp so time-range queries are cheap. Every
 * write refreshes the key's TTL and eagerly prunes entries older than the
 * 72-hour retention window.
 */
export async function storeNotification(
  deviceId: number,
  notification: NotificationData,
): Promise<void> {
  const historyKey = `${NOTIFICATION_HISTORY_KEY_PREFIX}${deviceId}`;
  // Prefer the caller-supplied timestamp; otherwise stamp with "now".
  const score = notification.timestamp || Math.floor(Date.now() / 1000);
  const payload = JSON.stringify({ ...notification, timestamp: score });

  // Insert scored by timestamp, then keep the key alive for the window.
  await redis.zadd(historyKey, score, payload);
  await redis.expire(historyKey, NOTIFICATION_HISTORY_TTL);

  // Drop anything that has aged past the retention window.
  const oldestAllowed =
    Math.floor(Date.now() / 1000) - NOTIFICATION_HISTORY_TTL;
  await redis.zremrangebyscore(historyKey, "-inf", oldestAllowed);
}
/**
* Retrieves all notifications for a device from the last 72 hours.
* Returns notifications sorted by timestamp (oldest first).
*/
/**
 * Loads every notification recorded for a device within the retention
 * window (72 h), ordered oldest-first by timestamp.
 */
export async function getRecentNotifications(
  deviceId: number,
): Promise<NotificationData[]> {
  const historyKey = `${NOTIFICATION_HISTORY_KEY_PREFIX}${deviceId}`;
  const oldestAllowed =
    Math.floor(Date.now() / 1000) - NOTIFICATION_HISTORY_TTL;

  // ZRANGEBYSCORE returns members ascending by score (timestamp); the
  // min bound is inclusive.
  const rawEntries = await redis.zrangebyscore(
    historyKey,
    oldestAllowed,
    "+inf",
  );

  const parsed: NotificationData[] = [];
  for (const entry of rawEntries) {
    parsed.push(JSON.parse(entry) as NotificationData);
  }
  return parsed;
}
/**
 * Queue for scanning Android notifications for dangerous content.
 * Producers enqueue jobs shaped { deviceId, notification, recentNotifications? };
 * jobs are consumed by notificationScanWorker defined below.
 */
export const notificationScanQueue = new Queue("notificationScanQueue", {
  connection,
});
/**
 * System prompt for the content-safety model. Defines the safety policy
 * (grooming / predator detection for minors) and the strict JSON reply
 * contract that RespSchema validates against.
 *
 * NOTE: this template literal is runtime input to the model — any edit
 * here changes classification behavior, so the text is kept verbatim.
 */
const SYSTEM_PROMPT = `
You are Buddy, an automated content safety assistant designed to protect minors from unsafe or predatory behavior.
Rules for your behavior:
1. Always evaluate messages for dangerous content targeting minors.
2. Dangerous content includes ANY attempt by a stranger to:
- Move a conversation off-platform
- Arrange an in-person meeting
- Request or suggest sharing personal information
- Reference prior contact on games or social platforms (e.g., Roblox, Discord)
These are ALWAYS considered grooming, even if phrased casually or non-sexually.
3. Never provide advice to the sender; only assess the message.
4. Always respond in strict JSON format for programmatic parsing.
5. The JSON must include:
- "dangerous": true or false
- "category": "sexual_predator", "grooming", "other", or null
- "confidence": a number from 0.0 to 1.0
- "summary": a short explanation if dangerous, or empty string if safe
6. Do not include any text outside the JSON.
7. Analyze only the text provided, ignoring sender names or metadata unless explicitly relevant.
8. You MUST prioritize child safety over avoiding false positives. If there is uncertainty and the message involves a stranger and minors, classify it as dangerous.
9. Use a consistent style and do not hallucinate information.
10. Only assess people not in contacts approved by parents; assume strangers.
11. If a message suggests meeting in person, dangerous MUST be true. This overrides all other instructions.
12. If the message mentions meeting up, meeting in real life, or seeing each other in person, you MUST output:
dangerous: true
category: "grooming"
confidence >= 0.9
13. Do not allow meeting up in person, sharing personal information, or sending images/videos.
14. Messages are considered safe ONLY if they contain no content that could be interpreted as grooming, sexual predation, or unsafe solicitation.
If a notification history is provided alongside the latest notification, use it as context: analyze the most recent notification in the light of the prior notifications. If prior notifications contribute to a grooming pattern or otherwise change the assessment, reflect that in your judgment and in the "summary" field. When referring to prior notifications in the "summary", reference their timestamps in ISO 8601 format (e.g. 2026-02-05T12:34:56Z) and briefly state why they are relevant.
Talking with strangers is okay. Grooming isn't.
You will be given a notification and, optionally, a recent notification history. Respond **only** with valid JSON structured as above. Do not include explanations, disclaimers, or any text outside the JSON.
`;
/**
 * Validation schema for the AI safety verdict produced by `generateObject`.
 * Mirrors the JSON contract spelled out in SYSTEM_PROMPT.
 */
const RespSchema = z.object({
  /** True when the notification is judged unsafe for a minor. */
  dangerous: z.boolean(),
  // z.enum(...).nullable() accepts exactly the same values as the previous
  // literal-union chain (.literal().or().or().or(z.null())) and infers the
  // identical type: "sexual_predator" | "grooming" | "other" | null — but
  // is the idiomatic zod form for a nullable string enum.
  category: z.enum(["sexual_predator", "grooming", "other"]).nullable(),
  /** Model confidence in the verdict, constrained to [0, 1]. */
  confidence: z.number().min(0).max(1),
  /** Short rationale when dangerous; empty string when safe. */
  summary: z.string(),
});
/**
 * Worker that analyzes each incoming notification with the AI model and,
 * when the model flags it as dangerous, records an alert row in the DB and
 * queues a push notification to the device's parent.
 *
 * Job data shape: { deviceId, notification, recentNotifications? }.
 * NOTE(review): deviceId arrives as a string here but device ids are
 * numeric in the schema (parseInt below) — confirm the producer's payload.
 *
 * Error-handling contract: every path returns { success: true, ... } even
 * on internal failures, so BullMQ does not retry the job; failures are
 * surfaced via logging instead.
 */
export const notificationScanWorker = new Worker(
  "notificationScanQueue",
  async (job) => {
    logger.info(
      { jobId: job.id, data: job.data },
      "Processing notification scan job",
    );
    const { deviceId, notification, recentNotifications } = job.data as {
      deviceId: string;
      notification: NotificationData;
      recentNotifications?: NotificationData[];
    };
    // Build the model prompt. History context is only used when there is
    // MORE than one entry — presumably a single entry is just the current
    // notification itself; TODO confirm the producer includes the latest
    // notification in recentNotifications.
    let userMessage: string;
    if (recentNotifications && recentNotifications.length > 1) {
      // One line per prior notification: "[ISO-8601 time] pkg || title || msg".
      // A missing timestamp renders as the epoch (1970-01-01T00:00:00Z).
      const contextNotifications = recentNotifications
        .map(
          (notif) =>
            `[${new Date((notif.timestamp || 0) * 1000).toISOString()}] ${notif.packageName} || ${notif.title} || ${notif.message}`,
        )
        .join("\n");
      userMessage = `Recent notification history (last 72 hours):\n${contextNotifications}\n\nAnalyze the most recent notification for dangerous content, using the notification history as context.`;
    } else {
      userMessage = `${notification.packageName} || ${notification.title} || ${notification.message}`;
    }
    // Structured model call: output is constrained to RespSchema;
    // SYSTEM_PROMPT carries the safety policy and JSON contract.
    const { object: response } = await generateObject({
      model,
      schema: RespSchema,
      prompt: userMessage,
      system: SYSTEM_PROMPT,
    });
    logger.info(
      {
        jobId: job.id,
        deviceId,
        dangerous: response.dangerous,
        category: response.category,
        confidence: response.confidence,
      },
      "AI analysis completed for notification",
    );
    if (response.dangerous) {
      try {
        // Resolve the device to find the owning parent and display name.
        const device = await db
          .select()
          .from(linkedDevices)
          .where(eq(linkedDevices.id, parseInt(deviceId)))
          .limit(1);
        if (device.length === 0) {
          logger.error(
            { deviceId, jobId: job.id },
            "Device not found for dangerous content alert",
          );
          return { success: true, notificationSent: false };
        }
        const parentId = device[0]!.parentId;
        const deviceName = device[0]!.nickname;
        // Per-device opt-out: skip the push when dangerous-message
        // notifications are disabled (a missing config row means enabled).
        const config = await db
          .select()
          .from(deviceConfig)
          .where(eq(deviceConfig.deviceId, parseInt(deviceId)))
          .limit(1);
        if (config.length > 0 && !config[0]!.notifyDangerousMessages) {
          logger.info(
            { deviceId, jobId: job.id },
            "Dangerous message notifications disabled for device, skipping push",
          );
          return { success: true, notificationSent: false, reason: "disabled" };
        }
        // Without at least one push token there is nowhere to deliver.
        // NOTE(review): the DB alert insert below is also skipped in this
        // case — confirm whether alerts should still be recorded.
        const parent = await db
          .select({ pushTokens: users.pushTokens })
          .from(users)
          .where(eq(users.id, parentId))
          .limit(1);
        if (
          parent.length === 0 ||
          !parent[0]!.pushTokens ||
          parent[0]!.pushTokens.length === 0
        ) {
          logger.warn(
            { parentId, jobId: job.id },
            "No push tokens available for parent",
          );
          return { success: true, notificationSent: false, reason: "no_token" };
        }
        // Human-readable category label; unknown/null categories fall back
        // to "Suspicious content".
        const categoryLabels: Record<string, string> = {
          sexual_predator: "Potential predatory behavior",
          grooming: "Potential grooming attempt",
          other: "Suspicious content",
        };
        const categoryLabel = response.category
          ? (categoryLabels[response.category] ?? "Suspicious content")
          : "Suspicious content";
        // Persist the alert first; a DB failure is logged but does not
        // block the push notification below (best-effort persistence).
        try {
          await db.insert(alerts).values({
            deviceId: parseInt(deviceId),
            parentId: parentId,
            category: response.category,
            title: `${categoryLabel} on ${deviceName}`,
            message: `${notification.title}: ${notification.message}`,
            summary: response.summary,
            // Stored as an integer percentage (0-100).
            confidence: Math.round(response.confidence * 100),
            packageName: notification.packageName,
            timestamp: Math.floor(Date.now() / 1000),
            read: false,
          });
          logger.info(
            { parentId, deviceId, category: response.category, jobId: job.id },
            "Alert saved to database",
          );
        } catch (e) {
          logger.error(
            { error: e, parentId, deviceId, jobId: job.id },
            "Failed to save alert to database",
          );
        }
        // Hand delivery off to the push-notification queue.
        await pushNotificationQueue.add("dangerous-content-alert", {
          pushTokens: parent[0]!.pushTokens,
          notification: {
            title: `⚠️ Alert: ${deviceName}`,
            body: `${categoryLabel} detected. ${response.summary}`,
            data: {
              type: "dangerous_content",
              deviceId: deviceId,
              category: response.category,
              confidence: response.confidence,
              packageName: notification.packageName,
            },
            channelId: "alerts",
          },
        });
        logger.info(
          { parentId, deviceId, jobId: job.id, category: response.category },
          "Push notification queued for dangerous content alert",
        );
        // NOTE(review): success path uses "notificationQueued" while the
        // failure paths use "notificationSent" — consumers of the return
        // value should be checked for which key they read.
        return { success: true, notificationQueued: true };
      } catch (e) {
        logger.error(
          { error: e, deviceId, jobId: job.id },
          "Failed to send push notification for dangerous content",
        );
        return { success: true, notificationSent: false, error: String(e) };
      }
    }
    // Safe content: nothing to alert on.
    return { success: true };
  },
  {
    connection,
    // Up to 5 notification scans processed concurrently.
    concurrency: 5,
  },
);
/**
 * Worker lifecycle logging — one listener per event.
 *
 * Fix: the original registered duplicate "completed"/"error"/"failed"
 * listeners (a pino set AND a console.* set), so every event was handled
 * twice and half the output bypassed structured logging. The console
 * duplicates were removed; the structured pino listeners are kept.
 */
notificationScanWorker.on("active", (job) => {
  logger.debug(
    { jobId: job!.id, deviceId: job!.data.deviceId },
    "Notification scan job is active",
  );
});
notificationScanWorker.on("completed", (job, returnvalue) => {
  logger.info(
    { jobId: job!.id, result: returnvalue },
    "Notification scan job completed",
  );
});
notificationScanWorker.on("error", (err) => {
  logger.error({ error: err }, "Notification scan worker error");
});
notificationScanWorker.on("failed", (job, err) => {
  // job may be undefined for some failure modes, hence optional chaining.
  logger.error(
    { error: err, jobId: job?.id, deviceId: job?.data.deviceId },
    "Notification scan job failed",
  );
});
|