231 lines
7.8 KiB
Python
231 lines
7.8 KiB
Python
"""bandwidth-worker: polls frps dashboard, writes deltas to Postgres via PostgREST.
|
|
|
|
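Also enforces the per-tunnel quota: when a tunnel crosses its quota it is marked
inactive and its auth-cache entry is invalidated, so the next frps Ping/NewProxy
is rejected, terminating the live session (frps OSS exposes no kill endpoint).
The edge active-state key (tunnel:active:<subdomain>), read by the Caddy
forward_auth gate, is also set to "0" so requests through a still-connected
tunnel are denied.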
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
import sys
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
|
|
import httpx
|
|
import redis
|
|
|
|
# Seconds to sleep between polls, and where/how to reach the frps dashboard
# API. The required variables raise KeyError at import time when unset.
POLL_INTERVAL = int(os.getenv("POLL_INTERVAL_SECONDS", "60"))
FRPS_API = os.getenv("FRPS_API_URL", "http://frps:7500/api/proxy/http")
FRPS_USER = os.environ["FRPS_DASHBOARD_USER"]
FRPS_PASS = os.environ["FRPS_DASHBOARD_PASS"]

# PostgREST base URL and the service-role key sent with every request to it.
SUPABASE_URL = os.getenv("SUPABASE_URL", "http://supabase-kong:8000")
SERVICE_ROLE = os.environ["SUPABASE_SERVICE_ROLE_KEY"]

# Redis holds the per-proxy byte totals, the auth cache and the edge
# active-state keys.
REDIS_URL = os.environ["REDIS_URL"]

# Expiry (seconds) of the edge active-state key tunnel:active:<subdomain>,
# which the Caddy forward_auth gate reads. Kept short so a deactivation
# propagates quickly and a stale "1" expires on its own within one poll
# cycle; the worker rewrites the key whenever it records traffic.
ACTIVE_TTL = int(os.getenv("ACTIVE_STATE_TTL_SECONDS", "45"))
|
|
|
|
# Plain-text log lines on stdout (the container log stream), INFO and above.
logging.basicConfig(
    stream=sys.stdout,
    format="%(asctime)s %(levelname)s %(message)s",
    level=logging.INFO,
)
log = logging.getLogger("bandwidth-worker")
|
|
|
|
# Shared Redis client; decode_responses=True makes reads return str, not bytes.
rds = redis.from_url(REDIS_URL, decode_responses=True)

# In-process cache of the last frps byte total seen per proxy name. The
# tunnel:bytes_total:<name> Redis key is consulted when a name is not cached.
last_seen: dict[str, int] = dict()
|
|
|
|
|
|
def _headers(extra: dict[str, str] | None = None) -> dict[str, str]:
    """Build PostgREST request headers authenticated with the service-role key.

    Args:
        extra: optional additional headers; a key already present in the
            defaults is overridden by the value given here.

    Returns:
        A new dict on every call, so callers may mutate it freely.
    """
    base = {
        "apikey": SERVICE_ROLE,
        "authorization": f"Bearer {SERVICE_ROLE}",
        "content-type": "application/json",
    }
    return {**base, **extra} if extra else base
|
|
|
|
|
|
def fetch_proxies(client: httpx.Client) -> list[dict[str, Any]]:
    """Return the proxy list reported by the frps dashboard API.

    Authenticates with the dashboard user/password and raises (via
    ``raise_for_status``) on an error status. A missing or null ``proxies``
    field yields an empty list.
    """
    response = client.get(FRPS_API, auth=(FRPS_USER, FRPS_PASS), timeout=10.0)
    response.raise_for_status()
    payload = response.json()
    return payload.get("proxies") or []
|
|
|
|
|
|
def previous_total(subdomain: str) -> int:
    """Return the last recorded frps byte total for ``subdomain``.

    The in-process ``last_seen`` cache wins; otherwise the value stored under
    the ``tunnel:bytes_total:<subdomain>`` Redis key is used. Returns 0 when
    neither has a value. Any error while reading or parsing the Redis value
    is logged as a warning and also yields 0.
    """
    try:
        return last_seen[subdomain]
    except KeyError:
        pass

    try:
        stored = rds.get(f"tunnel:bytes_total:{subdomain}")
        return 0 if stored is None else int(stored)
    except Exception as exc:
        log.warning("redis get failed for %s: %s", subdomain, exc)
        return 0
|
|
|
|
|
|
def persist_total(subdomain: str, total: int) -> None:
    """Remember ``total`` as the latest frps byte total for ``subdomain``.

    The in-process cache is always updated. The value is mirrored to the
    ``tunnel:bytes_total:<subdomain>`` Redis key on a best-effort basis: a
    Redis failure is logged as a warning, never raised.
    """
    last_seen[subdomain] = total
    redis_key = f"tunnel:bytes_total:{subdomain}"
    try:
        rds.set(redis_key, total)
    except Exception as exc:
        log.warning("redis set failed for %s: %s", subdomain, exc)
|
|
|
|
|
|
def enforce_quota(client: httpx.Client, subdomain: str, row: dict[str, Any], new_used: int) -> None:
    """Deactivate a tunnel that has exhausted its quota and invalidate the
    auth cache so the next frps Ping/NewProxy is rejected.

    Args:
        client: HTTP client used for the PostgREST PATCH.
        subdomain: tunnel subdomain used as the ``tunnels`` row filter and in
            the ``tunnel:active:<subdomain>`` Redis key.
        row: tunnel row as selected from PostgREST; ``quota_bytes``,
            ``is_active`` and ``token`` are read from it.
        new_used: bytes used after the latest delta was applied.

    Does nothing when the tunnel has no quota (``quota_bytes`` missing or
    <= 0), is already inactive, or is still under quota. If the PATCH fails
    the error is logged and the Redis keys are left untouched. Redis
    failures are logged as warnings, not raised.
    """
    quota = int(row.get("quota_bytes") or 0)
    if quota <= 0 or not row.get("is_active") or new_used < quota:
        return

    # The filter goes through ``params`` so httpx percent-encodes the
    # subdomain; interpolating it into the URL breaks on '&', '#', '%', etc.
    upd = client.patch(
        f"{SUPABASE_URL}/rest/v1/tunnels",
        params={"subdomain": f"eq.{subdomain}"},
        headers=_headers({"prefer": "return=minimal"}),
        json={"is_active": False},
        timeout=10.0,
    )
    if upd.status_code >= 300:
        log.error("quota deactivate failed %s: %s", upd.status_code, upd.text)
        return

    token = row.get("token") or ""
    if token:
        try:
            rds.delete(f"tunnel:token:{token}")
        except Exception as e:
            log.warning("redis auth-cache invalidate failed for %s: %s", subdomain, e)

    # Flip the edge active-state key so the Caddy forward_auth gate denies the
    # next request even on a still-connected tunnel.
    try:
        rds.set(f"tunnel:active:{subdomain}", "0", ex=ACTIVE_TTL)
    except Exception as e:
        log.warning("redis active-state deactivate failed for %s: %s", subdomain, e)

    log.warning(
        "QUOTA EXCEEDED: subdomain=%s used=%d quota=%d -> deactivated",
        subdomain, new_used, quota,
    )
|
|
|
|
|
|
def record_delta(client: httpx.Client, subdomain: str, delta: int) -> None:
    """Record ``delta`` bytes of traffic for ``subdomain``.

    Steps:
      1. insert a ``usage_samples`` row;
      2. add ``delta`` to the tunnel's ``bytes_used`` (read, then PATCH) and
         stamp ``last_seen_at``;
      3. run quota enforcement;
      4. refresh the edge active-state key ``tunnel:active:<subdomain>``.

    Raises:
        RuntimeError: if the ``usage_samples`` insert fails. Nothing has been
            written at that point, so the caller can keep its byte-total
            watermark and retry the delta on the next poll.

    Failures after the sample is stored (tunnel select/update) are logged
    and swallowed, because retrying would insert the sample a second time.
    NOTE(review): in that case ``bytes_used`` under-counts this delta.
    """
    r = client.post(
        f"{SUPABASE_URL}/rest/v1/usage_samples",
        headers=_headers({"prefer": "return=minimal"}),
        json={"subdomain": subdomain, "bytes_delta": delta},
        timeout=10.0,
    )
    if r.status_code >= 300:
        log.error("usage_samples insert failed %s: %s", r.status_code, r.text)
        # Returning normally here would let the caller advance its watermark
        # and drop these bytes from accounting for good.
        raise RuntimeError(f"usage_samples insert failed with status {r.status_code}")

    # Filters go through ``params`` so httpx percent-encodes the subdomain.
    tunnel_filter = {"subdomain": f"eq.{subdomain}"}
    cur = client.get(
        f"{SUPABASE_URL}/rest/v1/tunnels",
        params={"select": "bytes_used,quota_bytes,is_active,token", **tunnel_filter},
        headers=_headers(),
        timeout=10.0,
    )
    if cur.status_code >= 300:
        log.error("tunnels select failed %s: %s", cur.status_code, cur.text)
        return
    rows = cur.json()
    if not rows:
        log.warning("tunnel row missing for subdomain=%s; sample retained", subdomain)
        return
    row = rows[0]

    # NOTE(review): non-atomic read-modify-write of bytes_used; assumes this
    # worker is the only writer.
    new_used = int(row.get("bytes_used") or 0) + delta
    upd = client.patch(
        f"{SUPABASE_URL}/rest/v1/tunnels",
        params=tunnel_filter,
        headers=_headers({"prefer": "return=minimal"}),
        json={
            "bytes_used": new_used,
            "last_seen_at": datetime.now(timezone.utc).isoformat(),
        },
        timeout=10.0,
    )
    if upd.status_code >= 300:
        log.error("tunnels update failed %s: %s", upd.status_code, upd.text)
        return

    enforce_quota(client, subdomain, row, new_used)

    # Keep the edge active-state key fresh so the forward_auth gate has a hot,
    # authoritative value (covers reactivation and TTL expiry of an active key).
    quota = int(row.get("quota_bytes") or 0)
    active_now = bool(row.get("is_active")) and (quota <= 0 or new_used < quota)
    try:
        rds.set(f"tunnel:active:{subdomain}", "1" if active_now else "0", ex=ACTIVE_TTL)
    except Exception as e:
        log.warning("redis active-state refresh failed for %s: %s", subdomain, e)
|
|
|
|
|
|
# Set once the frps API schema has been inspected, so the check (and its log
# line) happens only on the first non-empty poll of this process.
_schema_checked = False

# Traffic counters poll_once reads from every frps proxy entry.
_REQUIRED_TRAFFIC_FIELDS = ("todayTrafficIn", "todayTrafficOut")


def assert_api_schema(proxies: list[dict[str, Any]]) -> None:
    """Fail loud if the frps dashboard API stops returning the expected
    camelCase traffic fields (e.g. after an frps upgrade), so the quota
    accounting bug cannot silently return.

    Both ``todayTrafficIn`` and ``todayTrafficOut`` are required: a missing
    counter is read as 0 by the poller, which silently under-counts usage.
    Only the first proxy of the first non-empty poll is inspected; the
    outcome is logged once (ERROR on mismatch, INFO otherwise).
    """
    global _schema_checked
    if _schema_checked or not proxies:
        return
    sample = proxies[0]
    missing = [field for field in _REQUIRED_TRAFFIC_FIELDS if field not in sample]
    if missing:
        log.error(
            "frps API schema mismatch: %s missing from proxy keys=%s; quota "
            "accounting will under-count until the field names are fixed",
            missing,
            sorted(sample.keys()),
        )
    else:
        log.info("frps API schema OK: %s present", ", ".join(_REQUIRED_TRAFFIC_FIELDS))
    _schema_checked = True
|
|
|
|
|
|
def poll_once(client: httpx.Client) -> None:
    """Run one accounting pass over every proxy frps reports.

    For each named proxy, today's inbound+outbound byte counters are compared
    with the previously recorded total. A non-zero difference is passed to
    record_delta, and the new total becomes the watermark only if that call
    succeeds; a failure is logged and the delta is retried next poll. A total
    below the watermark is treated as a daily counter reset, so the whole
    total counts as new traffic. Unnamed proxies are skipped. Errors from
    fetching the proxy list propagate to the caller.
    """
    proxies = fetch_proxies(client)
    log.info("poll: %d proxies", len(proxies))
    assert_api_schema(proxies)

    for proxy in proxies:
        name = proxy.get("name") or ""
        if not name:
            continue

        total = int(proxy.get("todayTrafficIn") or 0) + int(proxy.get("todayTrafficOut") or 0)
        prev = previous_total(name)
        # frps "today" counters reset daily; a drop below the watermark means a
        # new day started, so everything counted since the reset is new.
        delta = total - prev if total >= prev else total

        if delta:
            try:
                record_delta(client, name, delta)
                persist_total(name, total)
                log.info("recorded delta=%d for %s (total=%d)", delta, name, total)
            except Exception as err:
                log.error("record_delta failed for %s: %s", name, err)
        else:
            # No new traffic; still refresh the stored watermark.
            persist_total(name, total)
|
|
|
|
|
|
def main() -> None:
    """Poll frps forever, sleeping POLL_INTERVAL seconds between polls.

    Any exception from a poll is logged at ERROR level, with its traceback,
    and the loop continues with the next cycle.
    """
    log.info("bandwidth-worker starting; interval=%ss api=%s", POLL_INTERVAL, FRPS_API)
    with httpx.Client() as client:
        while True:
            try:
                poll_once(client)
            except Exception as e:
                # log.exception keeps the traceback that log.error("%s", e)
                # discarded; the first line of the record is unchanged.
                log.exception("poll failed: %s", e)
            time.sleep(POLL_INTERVAL)


if __name__ == "__main__":
    main()
|