"""bandwidth-worker: polls frps dashboard, writes deltas to Postgres via PostgREST. Also enforces per-tunnel quota: when a tunnel crosses its quota it is marked inactive and its auth-cache entry is invalidated so the next frps Ping/NewProxy is rejected, terminating the live session (frps OSS exposes no kill endpoint). """ from __future__ import annotations import logging import os import sys import time from datetime import datetime, timezone from typing import Any import httpx import redis POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL_SECONDS", "60")) FRPS_API = os.environ.get("FRPS_API_URL", "http://frps:7500/api/proxy/http") FRPS_USER = os.environ["FRPS_DASHBOARD_USER"] FRPS_PASS = os.environ["FRPS_DASHBOARD_PASS"] SUPABASE_URL = os.environ.get("SUPABASE_URL", "http://supabase-kong:8000") SERVICE_ROLE = os.environ["SUPABASE_SERVICE_ROLE_KEY"] REDIS_URL = os.environ["REDIS_URL"] logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s", stream=sys.stdout, ) log = logging.getLogger("bandwidth-worker") rds = redis.from_url(REDIS_URL, decode_responses=True) last_seen: dict[str, int] = {} def _headers(extra: dict[str, str] | None = None) -> dict[str, str]: h = { "apikey": SERVICE_ROLE, "authorization": f"Bearer {SERVICE_ROLE}", "content-type": "application/json", } if extra: h.update(extra) return h def fetch_proxies(client: httpx.Client) -> list[dict[str, Any]]: resp = client.get(FRPS_API, auth=(FRPS_USER, FRPS_PASS), timeout=10.0) resp.raise_for_status() return resp.json().get("proxies") or [] def previous_total(subdomain: str) -> int: if subdomain in last_seen: return last_seen[subdomain] try: v = rds.get(f"tunnel:bytes_total:{subdomain}") if v is not None: return int(v) except Exception as e: log.warning("redis get failed for %s: %s", subdomain, e) return 0 def persist_total(subdomain: str, total: int) -> None: last_seen[subdomain] = total try: rds.set(f"tunnel:bytes_total:{subdomain}", total) except Exception as e: log.warning("redis set failed for %s: %s", subdomain, e) def enforce_quota(client: httpx.Client, subdomain: str, row: dict[str, Any], new_used: int) -> None: """Deactivate a tunnel that has exhausted its quota and invalidate the auth cache so the next frps Ping/NewProxy is rejected.""" quota = int(row.get("quota_bytes") or 0) is_active = bool(row.get("is_active")) token = row.get("token") or "" if quota <= 0 or not is_active or new_used < quota: return upd = client.patch( f"{SUPABASE_URL}/rest/v1/tunnels?subdomain=eq.{subdomain}", headers=_headers({"prefer": "return=minimal"}), json={"is_active": False}, timeout=10.0, ) if upd.status_code >= 300: log.error("quota deactivate failed %s: %s", upd.status_code, upd.text) return if token: try: rds.delete(f"tunnel:token:{token}") except Exception as e: log.warning("redis auth-cache invalidate failed for %s: %s", subdomain, e) log.warning( "QUOTA EXCEEDED: subdomain=%s used=%d quota=%d -> deactivated", subdomain, new_used, quota, ) def record_delta(client: httpx.Client, subdomain: str, delta: int) -> None: r = client.post( f"{SUPABASE_URL}/rest/v1/usage_samples", headers=_headers({"prefer": "return=minimal"}), json={"subdomain": subdomain, "bytes_delta": delta}, timeout=10.0, ) if r.status_code >= 300: log.error("usage_samples insert failed %s: %s", r.status_code, r.text) return cur = client.get( f"{SUPABASE_URL}/rest/v1/tunnels?select=bytes_used,quota_bytes,is_active,token&subdomain=eq.{subdomain}", headers=_headers(), timeout=10.0, ) if cur.status_code >= 300: log.error("tunnels select failed %s: %s", cur.status_code, cur.text) return rows = cur.json() if not rows: log.warning("tunnel row missing for subdomain=%s; sample retained", subdomain) return row = rows[0] current = int(row.get("bytes_used") or 0) new_used = current + delta upd = client.patch( f"{SUPABASE_URL}/rest/v1/tunnels?subdomain=eq.{subdomain}", headers=_headers({"prefer": "return=minimal"}), json={ "bytes_used": new_used, "last_seen_at": datetime.now(timezone.utc).isoformat(), }, timeout=10.0, ) if upd.status_code >= 300: log.error("tunnels update failed %s: %s", upd.status_code, upd.text) return enforce_quota(client, subdomain, row, new_used) def poll_once(client: httpx.Client) -> None: proxies = fetch_proxies(client) log.info("poll: %d proxies", len(proxies)) for p in proxies: name = p.get("name") or "" if not name: continue traffic_in = int(p.get("today_traffic_in") or 0) traffic_out = int(p.get("today_traffic_out") or 0) total = traffic_in + traffic_out prev = previous_total(name) delta = total - prev if delta < 0: delta = total # daily counter reset if delta == 0: persist_total(name, total) continue try: record_delta(client, name, delta) persist_total(name, total) log.info("recorded delta=%d for %s (total=%d)", delta, name, total) except Exception as e: log.error("record_delta failed for %s: %s", name, e) def main() -> None: log.info("bandwidth-worker starting; interval=%ss api=%s", POLL_INTERVAL, FRPS_API) with httpx.Client() as client: while True: try: poll_once(client) except Exception as e: log.error("poll failed: %s", e) time.sleep(POLL_INTERVAL) if __name__ == "__main__": main()