dev: add parallel dev environment under /docker/dev

Near-1:1 clone of the prod remote-access stack, isolated on a new external
dev_edge network and fronted by the same shared Caddy instance (dual-homed on
edge + dev_edge). Dev is manual-start (not on boot).

- Hostnames: app-dev / api-dev .linumiq.net, tunnels under *.dev.linumiq.net,
  dev tunnel ingress on port 7001.
- Dev Supabase (project supabase-dev, *-dev containers), web, frps, redis,
  stripe-stub, bandwidth-worker with fresh independent secrets (gitignored).
- Shared Caddyfile: app-dev -> web-dev, api-dev -> dev kong (+webhook block),
  *.dev -> frps-dev vhost. Caddy compose dual-homed on dev_edge.
- On-demand-TLS authorizer (prod check-subdomain, in gitignored volumes/)
  extended additively: app-dev/api-dev -> 200; *.dev delegated to the dev
  authorizer. Prod allow-list logic unchanged.
- dev.sh manual up/down/ps helper; README documents topology + secrets.

Secrets, frps.toml, volumes/, web worktree and data dirs are gitignored.
This commit is contained in:
2026-05-30 13:23:34 +02:00
parent 50ab46dbe1
commit 7fe0cc3753
25 changed files with 1473 additions and 0 deletions
+186
View File
@@ -0,0 +1,186 @@
"""bandwidth-worker: polls frps dashboard, writes deltas to Postgres via PostgREST.
Also enforces per-tunnel quota: when a tunnel crosses its quota it is marked
inactive and its auth-cache entry is invalidated so the next frps Ping/NewProxy
is rejected, terminating the live session (frps OSS exposes no kill endpoint).
"""
from __future__ import annotations
import logging
import os
import sys
import time
from datetime import datetime, timezone
from typing import Any
import httpx
import redis
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL_SECONDS", "60"))
FRPS_API = os.environ.get("FRPS_API_URL", "http://frps:7500/api/proxy/http")
FRPS_USER = os.environ["FRPS_DASHBOARD_USER"]
FRPS_PASS = os.environ["FRPS_DASHBOARD_PASS"]
SUPABASE_URL = os.environ.get("SUPABASE_URL", "http://supabase-kong:8000")
SERVICE_ROLE = os.environ["SUPABASE_SERVICE_ROLE_KEY"]
REDIS_URL = os.environ["REDIS_URL"]
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
stream=sys.stdout,
)
log = logging.getLogger("bandwidth-worker")
rds = redis.from_url(REDIS_URL, decode_responses=True)
last_seen: dict[str, int] = {}
def _headers(extra: dict[str, str] | None = None) -> dict[str, str]:
h = {
"apikey": SERVICE_ROLE,
"authorization": f"Bearer {SERVICE_ROLE}",
"content-type": "application/json",
}
if extra:
h.update(extra)
return h
def fetch_proxies(client: httpx.Client) -> list[dict[str, Any]]:
resp = client.get(FRPS_API, auth=(FRPS_USER, FRPS_PASS), timeout=10.0)
resp.raise_for_status()
return resp.json().get("proxies") or []
def previous_total(subdomain: str) -> int:
if subdomain in last_seen:
return last_seen[subdomain]
try:
v = rds.get(f"tunnel:bytes_total:{subdomain}")
if v is not None:
return int(v)
except Exception as e:
log.warning("redis get failed for %s: %s", subdomain, e)
return 0
def persist_total(subdomain: str, total: int) -> None:
last_seen[subdomain] = total
try:
rds.set(f"tunnel:bytes_total:{subdomain}", total)
except Exception as e:
log.warning("redis set failed for %s: %s", subdomain, e)
def enforce_quota(client: httpx.Client, subdomain: str, row: dict[str, Any], new_used: int) -> None:
"""Deactivate a tunnel that has exhausted its quota and invalidate the
auth cache so the next frps Ping/NewProxy is rejected."""
quota = int(row.get("quota_bytes") or 0)
is_active = bool(row.get("is_active"))
token = row.get("token") or ""
if quota <= 0 or not is_active or new_used < quota:
return
upd = client.patch(
f"{SUPABASE_URL}/rest/v1/tunnels?subdomain=eq.{subdomain}",
headers=_headers({"prefer": "return=minimal"}),
json={"is_active": False},
timeout=10.0,
)
if upd.status_code >= 300:
log.error("quota deactivate failed %s: %s", upd.status_code, upd.text)
return
if token:
try:
rds.delete(f"tunnel:token:{token}")
except Exception as e:
log.warning("redis auth-cache invalidate failed for %s: %s", subdomain, e)
log.warning(
"QUOTA EXCEEDED: subdomain=%s used=%d quota=%d -> deactivated",
subdomain, new_used, quota,
)
def record_delta(client: httpx.Client, subdomain: str, delta: int) -> None:
r = client.post(
f"{SUPABASE_URL}/rest/v1/usage_samples",
headers=_headers({"prefer": "return=minimal"}),
json={"subdomain": subdomain, "bytes_delta": delta},
timeout=10.0,
)
if r.status_code >= 300:
log.error("usage_samples insert failed %s: %s", r.status_code, r.text)
return
cur = client.get(
f"{SUPABASE_URL}/rest/v1/tunnels?select=bytes_used,quota_bytes,is_active,token&subdomain=eq.{subdomain}",
headers=_headers(),
timeout=10.0,
)
if cur.status_code >= 300:
log.error("tunnels select failed %s: %s", cur.status_code, cur.text)
return
rows = cur.json()
if not rows:
log.warning("tunnel row missing for subdomain=%s; sample retained", subdomain)
return
row = rows[0]
current = int(row.get("bytes_used") or 0)
new_used = current + delta
upd = client.patch(
f"{SUPABASE_URL}/rest/v1/tunnels?subdomain=eq.{subdomain}",
headers=_headers({"prefer": "return=minimal"}),
json={
"bytes_used": new_used,
"last_seen_at": datetime.now(timezone.utc).isoformat(),
},
timeout=10.0,
)
if upd.status_code >= 300:
log.error("tunnels update failed %s: %s", upd.status_code, upd.text)
return
enforce_quota(client, subdomain, row, new_used)
def poll_once(client: httpx.Client) -> None:
proxies = fetch_proxies(client)
log.info("poll: %d proxies", len(proxies))
for p in proxies:
name = p.get("name") or ""
if not name:
continue
traffic_in = int(p.get("today_traffic_in") or 0)
traffic_out = int(p.get("today_traffic_out") or 0)
total = traffic_in + traffic_out
prev = previous_total(name)
delta = total - prev
if delta < 0:
delta = total # daily counter reset
if delta == 0:
persist_total(name, total)
continue
try:
record_delta(client, name, delta)
persist_total(name, total)
log.info("recorded delta=%d for %s (total=%d)", delta, name, total)
except Exception as e:
log.error("record_delta failed for %s: %s", name, e)
def main() -> None:
log.info("bandwidth-worker starting; interval=%ss api=%s", POLL_INTERVAL, FRPS_API)
with httpx.Client() as client:
while True:
try:
poll_once(client)
except Exception as e:
log.error("poll failed: %s", e)
time.sleep(POLL_INTERVAL)
if __name__ == "__main__":
main()