import db from "../../db.server";
import { ensureWebhookCleanupScheduled } from "./cleanup.server";

/**
 * How long a `status="processing"` reservation is considered "live" before we
 * assume the worker that claimed it crashed mid-process. After this window a
 * stale reservation may be reclaimed and the work retried.
 */
const STALE_LEASE_MS = 5 * 60 * 1000; // 5 minutes

/**
 * Upper bound on how many times {@link reserveWebhook} re-attempts the INSERT
 * when the conflicting row vanishes before we can classify or reclaim it
 * (i.e. its owner released it concurrently). Bounded so a pathological
 * create/delete churn can never spin forever.
 */
const MAX_RESERVE_ATTEMPTS = 3;

interface ProcessedRow {
  webhookId: string;
  status: string;
  receivedAt: Date;
}

/**
 * Minimal shape of the Prisma client surface we use — declared inline so the
 * helper can be unit-tested with a tiny stub instead of a real database.
 */
export interface DedupeDeps {
  db: {
    processedWebhook: {
      create: (args: {
        data: { webhookId: string; topic: string; shopDomain: string; status: string };
      }) => Promise<unknown>;
      findUnique: (args: { where: { webhookId: string } }) => Promise<ProcessedRow | null>;
      update: (args: {
        where: { webhookId: string };
        data: { status?: string; receivedAt?: Date };
      }) => Promise<unknown>;
      delete: (args: { where: { webhookId: string } }) => Promise<unknown>;
    };
  };
}

/**
 * A claim on a single Shopify webhook delivery. Obtained from
 * {@link reserveWebhook}. The caller MUST eventually `commit()` (work
 * succeeded — the delivery is permanently deduped) or `release()` (work
 * failed — drop the reservation so Shopify's retry re-runs the work).
 *
 * `commit`/`release` are no-ops for reservations without a webhook id (unit
 * tests / non-Shopify callers) and for the fail-open path.
 */
export interface WebhookReservation {
  webhookId: string | null;
  commit: () => Promise<void>;
  release: () => Promise<void>;
}

/** A reservation whose `commit`/`release` do nothing (no row is held). */
function noopReservation(webhookId: string | null): WebhookReservation {
  return {
    webhookId,
    commit: async () => {},
    release: async () => {},
  };
}

/**
 * Read a Prisma-style error `code` off an unknown thrown value. Duck-typed so
 * callers can stub the db without pulling in the real `Prisma` namespace.
 */
function prismaErrorCode(err: unknown): string | undefined {
  if (typeof err !== "object" || err === null) return undefined;
  const code = (err as { code?: unknown }).code;
  return typeof code === "string" ? code : undefined;
}

/** P2002 = unique-constraint violation (the webhook id is already claimed). */
function isP2002(err: unknown): boolean {
  return prismaErrorCode(err) === "P2002";
}

/** P2025 = the record an update/delete depends on was not found. */
function isP2025(err: unknown): boolean {
  return prismaErrorCode(err) === "P2025";
}

/**
 * Build the commit/release handles for a reservation row keyed by `webhookId`.
 * Both handles are best-effort: failures are logged, never thrown, because
 * the caller's work has already finished by the time they run.
 *
 * NOTE(review): the row is identified by `webhookId` alone, so if this
 * worker's lease went stale and another worker reclaimed it, `release()` /
 * `commit()` here act on the other worker's row. Fixing that would need a
 * per-lease token column — confirm whether that is worth the schema change.
 */
function makeReservation(
  webhookId: string,
  shop: string,
  topic: string,
  deps: DedupeDeps,
): WebhookReservation {
  return {
    webhookId,
    commit: async () => {
      try {
        await deps.db.processedWebhook.update({
          where: { webhookId },
          data: { status: "done" },
        });
      } catch (err) {
        // The work already succeeded; a failed commit just risks a later
        // duplicate (which the side-effect code is expected to tolerate).
        console.warn(`dedupe: failed to commit webhook ${webhookId} (${topic}/${shop}):`, err);
      }
    },
    release: async () => {
      try {
        await deps.db.processedWebhook.delete({ where: { webhookId } });
      } catch (err) {
        console.warn(`dedupe: failed to release webhook ${webhookId} (${topic}/${shop}):`, err);
      }
    },
  };
}

/**
 * Reserve this Shopify webhook delivery for processing.
 *
 * Shopify retries a delivery (re-using the same `X-Shopify-Webhook-Id`) when
 * it doesn't receive a 200 within its ~5s timeout. Naively recording the id as
 * "processed" *before* doing the work meant that if the heavy background work
 * later failed (SMTP/GraphQL/PDF error), Shopify's retry was dropped as a
 * duplicate and the invoice was never sent.
 *
 * This uses a two-phase reserve/commit keyed on the webhook id, with the
 * unique `webhookId` primary key as the concurrency lock:
 *
 * - RESERVE: insert a `status="processing"` row. A unique-constraint
 *   violation (`P2002`) means the id is already claimed; we then inspect the
 *   existing row:
 *   - `done` → genuine duplicate → return `null` (skip).
 *   - `processing`, fresh → another delivery is in flight → `null`.
 *   - `processing`, stale → previous worker crashed → reclaim & retry.
 *   - row vanished (released concurrently) → retry the insert, so the caller
 *     only ever proceeds while actually holding the row.
 * - COMMIT (caller, on success) → flip the row to `status="done"`.
 * - RELEASE (caller, on failure) → delete the row so a retry reprocesses.
 *
 * Fail-open: a dedupe-table error (other than P2002) never silently drops a
 * webhook — we return a no-op reservation and let the work run. The same
 * applies if the insert keeps losing races for {@link MAX_RESERVE_ATTEMPTS}
 * attempts.
 *
 * @param request  Incoming webhook request; only the `x-shopify-webhook-id`
 *                 header is read.
 * @param shop     Shop domain, stored on the row and used in log messages.
 * @param topic    Webhook topic, stored on the row and used in log messages.
 * @param deps     Database surface; defaults to the app's Prisma client.
 * @returns A {@link WebhookReservation} when the caller should process the
 *          delivery, or `null` when it must short-circuit (duplicate /
 *          concurrent).
 */
export async function reserveWebhook(
  request: Request,
  shop: string,
  topic: string,
  deps: DedupeDeps = { db },
): Promise<WebhookReservation | null> {
  // Opportunistically schedule TTL cleanup (runtime-only; never in build/CLI
  // since this is reached only while handling a live webhook request).
  ensureWebhookCleanupScheduled();

  const webhookId = request.headers.get("x-shopify-webhook-id");
  if (!webhookId) {
    // No id (unit tests / non-Shopify callers): process without dedupe.
    return noopReservation(null);
  }

  const reservation = makeReservation(webhookId, shop, topic, deps);

  for (let attempt = 1; attempt <= MAX_RESERVE_ATTEMPTS; attempt++) {
    // RESERVE: the unique webhookId is the lock; a successful insert owns it.
    try {
      await deps.db.processedWebhook.create({
        data: { webhookId, topic, shopDomain: shop, status: "processing" },
      });
      return reservation;
    } catch (err) {
      if (!isP2002(err)) {
        // Don't fail (or silently drop) a webhook on a logging-table issue.
        console.warn(`dedupe: failed to reserve webhook ${webhookId} (${topic}/${shop}):`, err);
        return noopReservation(webhookId);
      }
    }

    // A row already exists. Classify it.
    let existing: ProcessedRow | null;
    try {
      existing = await deps.db.processedWebhook.findUnique({ where: { webhookId } });
    } catch (err) {
      console.warn(`dedupe: failed to load existing webhook ${webhookId} (${topic}/${shop}):`, err);
      // Another worker owns the row and we can't classify it — be safe and skip.
      return null;
    }

    if (!existing) {
      // Raced with a release/delete between create() and findUnique(). We do
      // NOT hold a row yet, so retry the insert rather than proceeding unlocked.
      continue;
    }

    if (existing.status === "done") {
      console.log(
        `dedupe: skipping already-processed ${topic} for ${shop} (webhookId=${webhookId})`,
      );
      return null;
    }

    const age = Date.now() - new Date(existing.receivedAt).getTime();
    if (age > STALE_LEASE_MS) {
      // The worker that reserved this crashed mid-process (or left a stale row).
      // Renew the lease and retry the work.
      // NOTE(review): this update is keyed on webhookId only, so two deliveries
      // that both observe the same stale row can both reclaim it. Closing that
      // window needs a conditional update (e.g. filtered on the observed
      // `receivedAt`) that the DedupeDeps surface does not currently expose.
      try {
        await deps.db.processedWebhook.update({
          where: { webhookId },
          data: { status: "processing", receivedAt: new Date() },
        });
      } catch (err) {
        if (isP2025(err)) {
          // The stale row was released before we could reclaim it; nobody is
          // working on it, so retry the insert instead of dropping the webhook.
          continue;
        }
        console.warn(`dedupe: failed to reclaim stale webhook ${webhookId}:`, err);
        return null;
      }
      console.log(
        `dedupe: reclaiming stale ${topic} reservation for ${shop} ` +
          `(webhookId=${webhookId}, age=${Math.round(age / 1000)}s)`,
      );
      return reservation;
    }

    // A fresh "processing" row: another delivery is actively working on it.
    // Skip this concurrent delivery. Shopify will retry; if the active worker
    // fails it releases the reservation so a later retry reprocesses.
    console.log(
      `dedupe: ${topic} for ${shop} already in-flight (webhookId=${webhookId}); ` +
        `skipping concurrent delivery`,
    );
    return null;
  }

  // The row kept appearing and disappearing under us. Fail open rather than
  // silently dropping the delivery.
  console.warn(
    `dedupe: could not reserve webhook ${webhookId} (${topic}/${shop}) after ` +
      `${MAX_RESERVE_ATTEMPTS} attempts; processing without dedupe`,
  );
  return noopReservation(webhookId);
}