import pLimit from "p-limit";
import type { WebhookReservation } from "./dedupe.server";

/**
 * Background runner for webhook side-effects.
 *
 * Shopify expects a 200 response within ~5 seconds, otherwise it considers
 * the delivery failed and retries it. Heavy automation work (PDF render,
 * Shopify Files upload, SMTP send) routinely exceeded that budget, which
 * caused duplicate invoice emails before we added the dedupe table.
 *
 * Returning the response immediately and finishing the work afterwards keeps
 * Shopify happy. Two problems with a naive `void work()`:
 *
 *  1. DoS / resource exhaustion — an order burst would spawn unbounded
 *     concurrent PDF renders + SMTP sends. We cap concurrency with a small
 *     in-process queue (`p-limit`); excess tasks queue instead of piling up.
 *  2. Data loss on restart — `void work()` is invisible to shutdown, so a
 *     container stop (SIGTERM) killed in-flight invoice work mid-send. We
 *     track in-flight tasks and drain them (bounded) on SIGTERM/SIGINT.
 *
 * Reserve/commit dedupe (see dedupe.server.ts) is integrated here: on success
 * we `commit()` the reservation (permanently deduped); on failure we
 * `release()` it so Shopify's retry re-runs the work instead of being dropped
 * as a duplicate.
 */

/**
 * Largest delay Node's timers accept. Anything above it is coerced to 1 ms,
 * which would turn a "very long" drain timeout into an immediate give-up.
 */
const MAX_TIMER_DELAY_MS = 2_147_483_647;

/**
 * Maximum number of webhook tasks running at once (`WEBHOOK_CONCURRENCY`,
 * default 4, minimum 1). Floored because `p-limit` throws a TypeError for
 * non-integer concurrency, which would crash the module at import time.
 */
const CONCURRENCY = Math.max(
  1,
  Math.floor(Number(process.env.WEBHOOK_CONCURRENCY) || 4),
);

/**
 * Default upper bound for a shutdown drain (`WEBHOOK_DRAIN_TIMEOUT_MS`,
 * default 25s, minimum 1s), clamped to what `setTimeout` can represent.
 */
const DRAIN_TIMEOUT_MS = Math.min(
  MAX_TIMER_DELAY_MS,
  Math.max(1000, Number(process.env.WEBHOOK_DRAIN_TIMEOUT_MS) || 25_000),
);

const limit = pLimit(CONCURRENCY);
/** Every task that has been enqueued and has not settled yet (running or queued). */
const inFlight = new Set<Promise<void>>();
let draining = false;

/**
 * Enqueue `work` on the bounded background queue and return immediately.
 *
 * Errors never propagate to the caller: a failure is logged and the dedupe
 * reservation (if any) is released; on success the reservation is committed.
 *
 * @param description Human-readable label used in log messages.
 * @param work The side-effecting job to run once a concurrency slot is free.
 * @param reservation Optional dedupe reservation to commit/release with the outcome.
 */
export function runWebhookInBackground(
  description: string,
  work: () => Promise<unknown>,
  reservation?: WebhookReservation | null,
): void {
  if (draining) {
    // The process is shutting down. We still enqueue so the drain awaits this
    // task — the server has already stopped listening, so this is at most the
    // tail end of the last accepted request.
    console.warn(`[webhook-queue] enqueuing task during shutdown drain: ${description}`);
  }

  const task = limit(async () => {
    try {
      await work();
      await reservation?.commit();
    } catch (err) {
      // NOTE(review): a commit() failure after successful work also lands here
      // and releases the reservation, so a retry would repeat the side-effects
      // — confirm this is intended.
      console.error(`background webhook task '${description}' failed:`, err);
      // Drop the dedupe reservation so Shopify's retry re-runs the work.
      try {
        await reservation?.release();
      } catch (releaseErr) {
        console.error(
          `background webhook task '${description}': failed to release dedupe reservation:`,
          releaseErr,
        );
      }
    }
  });

  inFlight.add(task);
  // Registered before any drain attaches to the task, so the task is removed
  // from the set by the time a drain observes it as settled.
  void task.finally(() => inFlight.delete(task));
}

/**
 * Stop accepting new work (best-effort) and await in-flight + queued tasks,
 * bounded by `timeoutMs`, so a container stop drains invoice work instead of
 * killing it mid-send. Safe to call more than once.
 *
 * Tasks enqueued while the drain is already waiting are awaited too: the set
 * is re-snapshotted until it is empty or the timeout fires.
 *
 * @param timeoutMs Maximum time to wait, in milliseconds.
 * @returns Resolves when the queue is empty or the timeout elapsed; never rejects.
 */
export async function drainWebhookQueue(timeoutMs = DRAIN_TIMEOUT_MS): Promise<void> {
  draining = true;
  if (inFlight.size === 0) return;

  console.log(
    `[webhook-queue] draining ${inFlight.size} in-flight webhook task(s) (timeout ${timeoutMs}ms)...`,
  );

  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<"timeout">((resolve) => {
    timer = setTimeout(() => resolve("timeout"), timeoutMs);
    // Do not let the drain timer alone keep the process alive.
    if (typeof timer.unref === "function") timer.unref();
  });

  // A single snapshot would miss tasks enqueued during the drain, so keep
  // waiting on fresh snapshots. Each round removes every task it awaited, so
  // the loop only repeats when new work actually arrived.
  while (inFlight.size > 0) {
    const settled = Promise.allSettled([...inFlight]).then(() => "settled" as const);
    const outcome = await Promise.race([settled, timeout]);
    if (outcome === "timeout") break;
  }
  if (timer) clearTimeout(timer);

  if (inFlight.size > 0) {
    console.warn(
      `[webhook-queue] drain timed out with ${inFlight.size} task(s) still running`,
    );
  } else {
    console.log("[webhook-queue] drain complete");
  }
}

// Bridge for the custom server (server.js), which loads only the bundled
// build and cannot import this module directly. It awaits this drain before
// calling process.exit during graceful shutdown.
type DrainGlobal = typeof globalThis & {
  __linumiqWebhookDrain?: typeof drainWebhookQueue;
};
(globalThis as DrainGlobal).__linumiqWebhookDrain = drainWebhookQueue;

// Safety net for runtimes that don't go through server.js (e.g. `shopify app
// dev`): stop accepting work and best-effort drain. The custom server awaits
// the same (idempotent) drain before exiting.
for (const signal of ["SIGTERM", "SIGINT"] as const) {
  process.once(signal, () => {
    void drainWebhookQueue();
  });
}