46 lines
1.2 KiB
Python
46 lines
1.2 KiB
Python
"""stripe-stub: pretends every checkout succeeds and forwards test webhooks."""
|
|
|
|
import os
|
|
import uuid
|
|
from typing import Any
|
|
|
|
import httpx
|
|
from fastapi import FastAPI, Request
|
|
|
|
DOMAIN = os.environ.get("DOMAIN", "linumiq.net")
|
|
EDGE_URL = os.environ.get(
|
|
"EDGE_STRIPE_WEBHOOK_URL",
|
|
"http://supabase-edge-functions:9000/stripe-webhook",
|
|
)
|
|
|
|
app = FastAPI()
|
|
|
|
|
|
@app.post("/v1/checkout/sessions")
|
|
async def create_session(_request: Request) -> dict[str, str]:
|
|
session_id = f"stub_{uuid.uuid4().hex}"
|
|
return {
|
|
"id": session_id,
|
|
"url": f"https://app.{DOMAIN}/billing/success?session={session_id}",
|
|
}
|
|
|
|
|
|
@app.post("/v1/webhooks/test")
|
|
async def forward_test(request: Request) -> Any:
|
|
body = await request.body()
|
|
async with httpx.AsyncClient(timeout=10.0) as client:
|
|
resp = await client.post(
|
|
EDGE_URL,
|
|
content=body,
|
|
headers={"content-type": request.headers.get("content-type", "application/json")},
|
|
)
|
|
try:
|
|
return resp.json()
|
|
except Exception:
|
|
return {"upstream_status": resp.status_code, "body": resp.text}
|
|
|
|
|
|
@app.get("/health")
|
|
def health() -> dict[str, str]:
|
|
return {"status": "ok"}
|