fix(admin): bound owner-email enrichment concurrency to avoid self-throttle
A large concurrent burst of getUserById calls during the tunnels-list email enrichment self-inflicts an upstream throttle (truncated/empty bodies) that even the per-call retry can't fully escape, intermittently rendering owner_email as '—'. Add mapWithConcurrency and resolve owner emails at most a few at a time so each lookup stays inside the throttle allowance; retry + null fallback preserved.
This commit is contained in:
+16
-7
@@ -1,6 +1,6 @@
|
|||||||
import type { User } from '@supabase/supabase-js';
|
import type { User } from '@supabase/supabase-js';
|
||||||
import { getSupabaseAdmin } from '@/lib/supabase/admin';
|
import { getSupabaseAdmin } from '@/lib/supabase/admin';
|
||||||
import { withAdminRetry } from '@/lib/admin/retry';
|
import { withAdminRetry, mapWithConcurrency } from '@/lib/admin/retry';
|
||||||
import {
|
import {
|
||||||
parseOrder,
|
parseOrder,
|
||||||
parseSort,
|
parseSort,
|
||||||
@@ -77,6 +77,12 @@ type TunnelRow = TunnelJoinRow & {
|
|||||||
const USER_SCAN_MAX_PAGES = 50;
|
const USER_SCAN_MAX_PAGES = 50;
|
||||||
const USER_SCAN_PER_PAGE = 1000;
|
const USER_SCAN_PER_PAGE = 1000;
|
||||||
|
|
||||||
|
// Resolve tunnel owner emails a few at a time rather than all at once: a large
|
||||||
|
// concurrent burst of getUserById calls self-inflicts an upstream throttle
|
||||||
|
// (truncated/empty bodies) that even per-call retries can't fully escape, which
|
||||||
|
// is what intermittently rendered owner_email as "—" under load.
|
||||||
|
const OWNER_EMAIL_CONCURRENCY = 4;
|
||||||
|
|
||||||
function userSortValue(u: User, sort: UserSort): string | number {
|
function userSortValue(u: User, sort: UserSort): string | number {
|
||||||
switch (sort) {
|
switch (sort) {
|
||||||
case 'email':
|
case 'email':
|
||||||
@@ -291,11 +297,14 @@ export async function getTunnelsList(opts: {
|
|||||||
|
|
||||||
// Resolve owner emails (per-row getUserById; acceptable for current scale).
|
// Resolve owner emails (per-row getUserById; acceptable for current scale).
|
||||||
// The user_id comes from an existing tunnel row, so an empty body here is a
|
// The user_id comes from an existing tunnel row, so an empty body here is a
|
||||||
// transient burst flake rather than a genuine not-found — retry it. The
|
// transient burst flake rather than a genuine not-found — retry it, and bound
|
||||||
// try/catch null fallback remains as a last resort so one bad row can never
|
// the concurrency so the enrichment doesn't self-throttle. The try/catch null
|
||||||
// 500 the whole list (it surfaces as "—" only if every retry still fails).
|
// fallback remains as a last resort so one bad row can never 500 the whole
|
||||||
const emails = await Promise.all(
|
// list (it surfaces as "—" only if every retry still fails).
|
||||||
rows.map(async (t) => {
|
const emails = await mapWithConcurrency(
|
||||||
|
rows,
|
||||||
|
OWNER_EMAIL_CONCURRENCY,
|
||||||
|
async (t) => {
|
||||||
try {
|
try {
|
||||||
const { data: u } = await withAdminRetry(() =>
|
const { data: u } = await withAdminRetry(() =>
|
||||||
admin.auth.admin.getUserById(t.user_id),
|
admin.auth.admin.getUserById(t.user_id),
|
||||||
@@ -304,7 +313,7 @@ export async function getTunnelsList(opts: {
|
|||||||
} catch {
|
} catch {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}),
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
const tunnels: TunnelItem[] = rows.map((t, i) => ({
|
const tunnels: TunnelItem[] = rows.map((t, i) => ({
|
||||||
|
|||||||
@@ -126,3 +126,33 @@ export async function withAdminRetry<R extends { error: MaybeError }>(
|
|||||||
if (threw) throw lastThrown;
|
if (threw) throw lastThrown;
|
||||||
return lastResult as R;
|
return lastResult as R;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Map over `items` running at most `concurrency` async tasks at a time, while
|
||||||
|
* preserving result order.
|
||||||
|
*
|
||||||
|
* Firing every GoTrue admin lookup at once (e.g. one `getUserById` per tunnel
|
||||||
|
* row) can self-inflict an upstream throttle: the proxy truncates the tail of a
|
||||||
|
* large concurrent burst, producing the very empty-body responses we retry on —
|
||||||
|
* and because the retries fire back into the same saturated window, a few rows
|
||||||
|
* can still fail. Bounding the concurrency keeps each lookup inside the
|
||||||
|
* throttle's allowance so {@link withAdminRetry} reliably resolves every row.
|
||||||
|
*/
|
||||||
|
export async function mapWithConcurrency<T, R>(
|
||||||
|
items: readonly T[],
|
||||||
|
concurrency: number,
|
||||||
|
task: (item: T, index: number) => Promise<R>,
|
||||||
|
): Promise<R[]> {
|
||||||
|
const results = new Array<R>(items.length);
|
||||||
|
const limit = Math.max(1, Math.min(concurrency, items.length || 1));
|
||||||
|
let next = 0;
|
||||||
|
async function worker(): Promise<void> {
|
||||||
|
for (;;) {
|
||||||
|
const i = next++;
|
||||||
|
if (i >= items.length) return;
|
||||||
|
results[i] = await task(items[i], i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
await Promise.all(Array.from({ length: limit }, () => worker()));
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user