Skip to content

Commit 2e68ed6

Browse files
authored
Merge pull request #1512 from jetstreamapp/chore/add-email-storage
chore: add email storage
2 parents 24d9c7a + c1e4400 commit 2e68ed6

File tree

7 files changed

+339
-1
lines changed

7 files changed

+339
-1
lines changed
Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
import { ENV, logger, prisma } from '@jetstream/api-config';
2+
import type { Request, Response } from '@jetstream/api-types';
3+
import { getErrorMessageAndStackObj } from '@jetstream/shared/utils';
4+
import crypto from 'crypto';
5+
import { LRUCache } from 'lru-cache';
6+
import { z } from 'zod';
7+
8+
// Cache for tracking used webhook tokens to prevent replay attacks
9+
// Tokens expire after 15 minutes, same as our timestamp validation window
10+
const WEBHOOK_TOKEN_CACHE = new LRUCache<string, boolean>({
11+
max: 10000, // Store up to 10k recent tokens
12+
ttl: 1000 * 60 * 15, // 15 minutes
13+
});
14+
15+
// Maximum age for webhook timestamps (in seconds)
16+
const MAX_TIMESTAMP_AGE_SECONDS = 60 * 15; // 15 minutes
17+
18+
// Mailgun webhook payload schema based on their documentation
19+
const MailgunWebhookSignatureSchema = z.object({
20+
timestamp: z.string(),
21+
token: z.string(),
22+
signature: z.string(),
23+
});
24+
25+
const MailgunDeliveryStatusSchema = z
26+
.object({
27+
code: z.number().optional(),
28+
message: z.string().optional(),
29+
description: z.string().optional(),
30+
'enhanced-code': z.string().optional(),
31+
'attempt-no': z.number().optional(),
32+
'mx-host': z.string().optional(),
33+
'session-seconds': z.number().optional(),
34+
tls: z.boolean().optional(),
35+
'certificate-verified': z.boolean().optional(),
36+
})
37+
.optional();
38+
39+
const MailgunEnvelopeSchema = z
40+
.object({
41+
sender: z.string().optional(),
42+
'sending-ip': z.string().optional(),
43+
transport: z.string().optional(),
44+
targets: z.string().optional(),
45+
})
46+
.optional();
47+
48+
const MailgunMessageHeadersSchema = z
49+
.object({
50+
to: z.string().optional(),
51+
from: z.string().optional(),
52+
subject: z.string().optional(),
53+
'message-id': z.string().optional(),
54+
})
55+
.optional();
56+
57+
const MailgunMessageSchema = z
58+
.object({
59+
headers: MailgunMessageHeadersSchema,
60+
size: z.number().optional(),
61+
attachments: z.array(z.any()).optional(),
62+
})
63+
.optional();
64+
65+
const MailgunFlagsSchema = z
66+
.object({
67+
'is-test-mode': z.boolean().optional(),
68+
'is-routed': z.boolean().optional(),
69+
'is-authenticated': z.boolean().optional(),
70+
'is-system-test': z.boolean().optional(),
71+
})
72+
.optional();
73+
74+
const MailgunEventDataSchema = z.object({
75+
id: z.string().optional(),
76+
event: z.string(),
77+
timestamp: z.number(),
78+
'log-level': z.string().optional(),
79+
recipient: z.string(),
80+
'recipient-domain': z.string().optional(),
81+
'recipient-provider': z.string().optional(),
82+
'delivery-status': MailgunDeliveryStatusSchema,
83+
envelope: MailgunEnvelopeSchema,
84+
message: MailgunMessageSchema,
85+
flags: MailgunFlagsSchema,
86+
tags: z.array(z.string()).optional(),
87+
'user-variables': z.record(z.string(), z.any()).optional(),
88+
});
89+
90+
const MailgunWebhookPayloadSchema = z.object({
91+
signature: MailgunWebhookSignatureSchema,
92+
'event-data': MailgunEventDataSchema,
93+
});
94+
95+
export const routeDefinition = {
96+
webhook: {
97+
controllerFn: () => mailgunWebhookHandler,
98+
},
99+
};
100+
101+
const mailgunWebhookHandler = async (req: Request, res: Response) => {
102+
try {
103+
// Parse and validate the webhook payload
104+
const rawBody = req.body as Buffer;
105+
const parseResult = MailgunWebhookPayloadSchema.safeParse(JSON.parse(rawBody.toString()));
106+
107+
if (!parseResult.success) {
108+
logger.warn({ error: parseResult.error }, 'Invalid Mailgun webhook payload');
109+
return res.status(400).send('Invalid payload');
110+
}
111+
112+
const { signature, 'event-data': eventData } = parseResult.data;
113+
114+
// Verify webhook signature if signing key is configured
115+
if (ENV.MAILGUN_WEBHOOK_SIGNING_KEY) {
116+
// Check timestamp freshness to prevent replay attacks
117+
const timestampAge = Math.abs(Date.now() / 1000 - parseInt(signature.timestamp));
118+
if (timestampAge > MAX_TIMESTAMP_AGE_SECONDS) {
119+
logger.warn(
120+
{ timestamp: signature.timestamp, age: timestampAge },
121+
'Mailgun webhook timestamp too old or too far in the future',
122+
);
123+
return res.status(403).send('Invalid timestamp');
124+
}
125+
126+
// Check if this token has already been used (replay attack prevention)
127+
if (WEBHOOK_TOKEN_CACHE.has(signature.token)) {
128+
logger.warn({ token: signature.token }, 'Mailgun webhook token already used (replay attack)');
129+
return res.status(403).send('Token already used');
130+
}
131+
132+
// Verify the signature
133+
const isValid = verifyWebhookSignature({
134+
timestamp: signature.timestamp,
135+
token: signature.token,
136+
signature: signature.signature,
137+
signingKey: ENV.MAILGUN_WEBHOOK_SIGNING_KEY,
138+
});
139+
140+
if (!isValid) {
141+
logger.warn({ timestamp: signature.timestamp }, 'Invalid Mailgun webhook signature');
142+
return res.status(403).send('Invalid signature');
143+
}
144+
145+
// Cache the token to prevent replay attacks
146+
WEBHOOK_TOKEN_CACHE.set(signature.token, true);
147+
} else {
148+
logger.warn('Mailgun webhook signing key not configured - skipping signature verification');
149+
return res.status(500).send('Webhook signing key not configured');
150+
}
151+
152+
// Extract recipient domain from recipient email
153+
const recipientDomain = eventData['recipient-domain'] || eventData.recipient.split('@')[1] || 'unknown';
154+
155+
// Store the webhook event in the database
156+
await prisma.mailgunWebhookEvent.create({
157+
data: {
158+
// Event metadata
159+
eventId: eventData.id,
160+
event: eventData.event,
161+
timestamp: new Date(eventData.timestamp * 1000),
162+
logLevel: eventData['log-level'],
163+
164+
// Recipient information
165+
recipient: eventData.recipient,
166+
recipientDomain,
167+
recipientProvider: eventData['recipient-provider'],
168+
169+
// Message information
170+
subject: eventData.message?.headers?.subject,
171+
messageId: eventData.message?.headers?.['message-id'],
172+
fromAddress: eventData.message?.headers?.from,
173+
toAddress: eventData.message?.headers?.to,
174+
messageSize: eventData.message?.size,
175+
176+
// Delivery status
177+
deliveryCode: eventData['delivery-status']?.code,
178+
deliveryMessage: eventData['delivery-status']?.message,
179+
deliveryDescription: eventData['delivery-status']?.description,
180+
deliveryEnhancedCode: eventData['delivery-status']?.['enhanced-code'],
181+
deliveryAttemptNo: eventData['delivery-status']?.['attempt-no'],
182+
deliveryMxHost: eventData['delivery-status']?.['mx-host'],
183+
deliverySessionSeconds: eventData['delivery-status']?.['session-seconds'],
184+
deliveryTls: eventData['delivery-status']?.tls,
185+
deliveryCertVerified: eventData['delivery-status']?.['certificate-verified'],
186+
187+
// Envelope information
188+
envelopeSender: eventData.envelope?.sender,
189+
envelopeSendingIp: eventData.envelope?.['sending-ip'],
190+
envelopeTransport: eventData.envelope?.transport,
191+
192+
// Metadata
193+
tags: eventData.tags || [],
194+
userVariables: eventData['user-variables'],
195+
flags: eventData.flags,
196+
},
197+
});
198+
199+
res.status(200).end();
200+
} catch (err) {
201+
logger.error(getErrorMessageAndStackObj(err), 'Error processing Mailgun webhook');
202+
return res.status(500).send(`Error processing Mailgun webhook`);
203+
}
204+
};
205+
206+
function verifyWebhookSignature({
207+
timestamp,
208+
token,
209+
signature,
210+
signingKey,
211+
}: {
212+
timestamp: string;
213+
token: string;
214+
signature: string;
215+
signingKey: string;
216+
}): boolean {
217+
const encodedToken = crypto.createHmac('sha256', signingKey).update(timestamp.concat(token)).digest('hex');
218+
return encodedToken === signature;
219+
}
Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,19 @@
1+
import { createRateLimit } from '@jetstream/api-config';
12
import { raw } from 'body-parser';
23
import express, { Router } from 'express';
34
import { routeDefinition as billingController } from '../controllers/billing.controller';
5+
import { routeDefinition as mailgunWebhookController } from '../controllers/mailgun-webhook.controller';
46

57
const routes: express.Router = Router();
68

9+
const webhookRateLimit = createRateLimit('webhook', {
10+
windowMs: 1000 * 60 * 1, // 1 minute
11+
limit: 1000,
12+
});
13+
714
routes.use(raw({ type: 'application/json' }));
815

9-
routes.post('/stripe', billingController.webhook.controllerFn());
16+
routes.post('/stripe', webhookRateLimit, billingController.webhook.controllerFn());
17+
routes.post('/mailgun', webhookRateLimit, mailgunWebhookController.webhook.controllerFn());
1018

1119
export default routes;

libs/api-config/src/lib/env-config.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ const envSchema = z.object({
188188
JETSTREAM_EMAIL_FROM_NAME: z.string().optional().default(''),
189189
JETSTREAM_EMAIL_REPLY_TO: z.string().optional().default(''),
190190
MAILGUN_API_KEY: z.string().optional(),
191+
MAILGUN_WEBHOOK_SIGNING_KEY: z.string().optional(),
191192
/**
192193
* Salesforce Org Connections
193194
* Connected App OAuth2 for connecting orgs

libs/auth/server/src/lib/auth.constants.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@ export const EMAIL_VERIFICATION_TOKEN_DURATION_HOURS = 1;
55
export const DELETE_AUTH_ACTIVITY_DAYS = 365;
66
export const DELETE_EMAIL_ACTIVITY_DAYS = 180;
77
export const DELETE_TOKEN_DAYS = 3;
8+
export const DELETE_MAILGUN_WEBHOOK_DAYS = 30;

libs/auth/server/src/lib/auth.db.service.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import { actionDisplayName, methodDisplayName } from './auth-logging.db.service'
3939
import {
4040
DELETE_AUTH_ACTIVITY_DAYS,
4141
DELETE_EMAIL_ACTIVITY_DAYS,
42+
DELETE_MAILGUN_WEBHOOK_DAYS,
4243
DELETE_TOKEN_DAYS,
4344
PASSWORD_RESET_DURATION_MINUTES,
4445
} from './auth.constants';
@@ -132,6 +133,11 @@ export async function pruneExpiredRecords() {
132133
expiresAt: { lte: addDays(startOfDay(new Date()), -DELETE_TOKEN_DAYS) },
133134
},
134135
});
136+
await prisma.mailgunWebhookEvent.deleteMany({
137+
where: {
138+
createdAt: { lte: addDays(startOfDay(new Date()), -DELETE_MAILGUN_WEBHOOK_DAYS) },
139+
},
140+
});
135141
}
136142

137143
async function findUserByProviderId(provider: OauthProviderType, providerAccountId: string) {
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
-- CreateTable
2+
CREATE TABLE "mailgun_webhook_event" (
3+
"id" UUID NOT NULL DEFAULT uuid_generate_v4(),
4+
"eventId" VARCHAR(255),
5+
"event" VARCHAR(50) NOT NULL,
6+
"timestamp" TIMESTAMP(3) NOT NULL,
7+
"logLevel" VARCHAR(20),
8+
"recipient" VARCHAR(255) NOT NULL,
9+
"recipientDomain" VARCHAR(255) NOT NULL,
10+
"recipientProvider" VARCHAR(100),
11+
"subject" VARCHAR(500),
12+
"messageId" VARCHAR(255),
13+
"fromAddress" VARCHAR(255),
14+
"toAddress" VARCHAR(255),
15+
"messageSize" INTEGER,
16+
"deliveryCode" INTEGER,
17+
"deliveryMessage" VARCHAR(500),
18+
"deliveryDescription" VARCHAR(500),
19+
"deliveryEnhancedCode" VARCHAR(50),
20+
"deliveryAttemptNo" INTEGER,
21+
"deliveryMxHost" VARCHAR(255),
22+
"deliverySessionSeconds" DOUBLE PRECISION,
23+
"deliveryTls" BOOLEAN,
24+
"deliveryCertVerified" BOOLEAN,
25+
"envelopeSender" VARCHAR(255),
26+
"envelopeSendingIp" VARCHAR(50),
27+
"envelopeTransport" VARCHAR(20),
28+
"tags" TEXT[] DEFAULT ARRAY[]::TEXT[],
29+
"userVariables" JSON,
30+
"flags" JSON,
31+
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
32+
33+
CONSTRAINT "mailgun_webhook_event_pkey" PRIMARY KEY ("id")
34+
);
35+
36+
-- CreateIndex
37+
CREATE INDEX "mailgun_webhook_event_createdAt_idx" ON "mailgun_webhook_event"("createdAt");
38+
39+
-- CreateIndex
40+
CREATE INDEX "mailgun_webhook_event_event_createdAt_idx" ON "mailgun_webhook_event"("event", "createdAt");
41+
42+
-- CreateIndex
43+
CREATE INDEX "mailgun_webhook_event_recipient_createdAt_idx" ON "mailgun_webhook_event"("recipient", "createdAt");
44+
45+
-- CreateIndex
46+
CREATE INDEX "mailgun_webhook_event_recipientDomain_createdAt_idx" ON "mailgun_webhook_event"("recipientDomain", "createdAt");
47+
48+
-- CreateIndex
49+
CREATE INDEX "mailgun_webhook_event_deliveryCode_event_idx" ON "mailgun_webhook_event"("deliveryCode", "event");
50+
51+
-- CreateIndex
52+
CREATE INDEX "mailgun_webhook_event_timestamp_idx" ON "mailgun_webhook_event"("timestamp");

prisma/schema.prisma

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,57 @@ model EmailActivity {
337337
createdAt DateTime @default(now())
338338
}
339339

340+
model MailgunWebhookEvent {
341+
id String @id @default(dbgenerated("uuid_generate_v4()")) @db.Uuid
342+
eventId String? @db.VarChar(255)
343+
event String @db.VarChar(50)
344+
timestamp DateTime
345+
logLevel String? @db.VarChar(20)
346+
347+
// Recipient information
348+
recipient String @db.VarChar(255)
349+
recipientDomain String @db.VarChar(255)
350+
recipientProvider String? @db.VarChar(100)
351+
352+
// Message information
353+
subject String? @db.VarChar(500)
354+
messageId String? @db.VarChar(255)
355+
fromAddress String? @db.VarChar(255)
356+
toAddress String? @db.VarChar(255)
357+
messageSize Int?
358+
359+
// Delivery status
360+
deliveryCode Int?
361+
deliveryMessage String? @db.VarChar(500)
362+
deliveryDescription String? @db.VarChar(500)
363+
deliveryEnhancedCode String? @db.VarChar(50)
364+
deliveryAttemptNo Int?
365+
deliveryMxHost String? @db.VarChar(255)
366+
deliverySessionSeconds Float?
367+
deliveryTls Boolean?
368+
deliveryCertVerified Boolean?
369+
370+
// Envelope information
371+
envelopeSender String? @db.VarChar(255)
372+
envelopeSendingIp String? @db.VarChar(50)
373+
envelopeTransport String? @db.VarChar(20)
374+
375+
// Additional metadata
376+
tags String[] @default([])
377+
userVariables Json? @db.Json
378+
flags Json? @db.Json
379+
380+
createdAt DateTime @default(now())
381+
382+
@@index([createdAt])
383+
@@index([event, createdAt])
384+
@@index([recipient, createdAt])
385+
@@index([recipientDomain, createdAt])
386+
@@index([deliveryCode, event])
387+
@@index([timestamp])
388+
@@map("mailgun_webhook_event")
389+
}
390+
340391
model UserPreference {
341392
id String @id @default(dbgenerated("uuid_generate_v4()")) @db.Uuid
342393
user User @relation(fields: [userId], references: [id], onDelete: Cascade)

0 commit comments

Comments
 (0)