from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Set, Optional, Dict
import asyncio
import json
import time
import aiomysql

# Every WebSocket currently registered by ws_endpoint, regardless of user.
clients: Set[WebSocket] = set()
# Registered sockets grouped by the ``email`` query parameter they connected
# with; used by kick_old() to enforce the per-email session limit.
email_clients: Dict[str, Set[WebSocket]] = {}

# Run with: uvicorn server_fastapi:app --host 0.0.0.0 --port 8000 --reload
app = FastAPI()
# Shared aiomysql pool. Assigned by startup_event() and reset to None by
# shutdown_event(); session_exists() raises if it is still None.
db_pool: Optional[aiomysql.Pool] = None

# Values of the ``Origin`` header that may use the WebSocket endpoint; any
# other origin is closed with code 1008.
# Add "http://localhost:8000" here when testing locally.
ALLOWED_ORIGINS = {
    "https://b52-9979.online",
    "https://football.b52-9979.online",
}

# Identifiers that receive the higher concurrent-session limit.
ADMIN_EMAILS = {"xx"}

# Concurrent sessions allowed per identifier: admins vs. everyone else.
ADMIN_MAX = 3
USER_MAX = 1

# Identifiers whose existing sessions are never kicked when a new one
# connects. is_no_kick() lower-cases its input, so keep entries lower-case.
NO_KICK_EMAILS = {"truc_nt", "dang"}

@app.on_event("startup")
async def startup_event():
    """Create the module-wide aiomysql connection pool at application start."""
    global db_pool
    # NOTE(review): connection details, including the password, are
    # hard-coded here; consider loading them from configuration instead.
    pool_settings = {
        "host": "127.0.0.1",
        "port": 3306,
        "user": "dang",
        "password": "dang",
        "db": "b52",
        "autocommit": True,
        "minsize": 1,
        "maxsize": 5,
    }
    db_pool = await aiomysql.create_pool(**pool_settings)
    print("? DB pool created")
    
@app.on_event("shutdown")
async def shutdown_event():
    """Close the aiomysql pool, if one was created, and clear the global."""
    global db_pool
    if db_pool is None:
        # Nothing to release: startup never ran or the pool is already gone.
        return

    db_pool.close()
    # close() only marks the pool as closing; wait for the connections.
    await db_pool.wait_closed()
    db_pool = None
    print("? DB pool closed")

async def session_exists(session_token: str) -> bool:
    """Report whether any row in ``users`` has the given session token.

    Args:
        session_token: Token to look up; passed as a bound query parameter.

    Returns:
        True if at least one matching row exists, otherwise False.

    Raises:
        RuntimeError: If the database pool has not been created yet.
    """
    if db_pool is None:
        raise RuntimeError("DB pool not initialized")

    query = "SELECT 1 FROM users WHERE session_token = %s LIMIT 1"
    async with db_pool.acquire() as conn:
        async with conn.cursor(aiomysql.DictCursor) as cur:
            await cur.execute(query, (session_token,))
            match = await cur.fetchone()
    return match is not None

# Module-level dict. Nothing visible in this file reads or writes it; the
# ``data`` names used inside the functions below are their own parameters
# or locals. NOTE(review): possibly a leftover - confirm before removing.
data = {}

async def send_safe(ws: WebSocket, data: str):
    """Send ``data`` as a text frame to ``ws``, ignoring any send failure.

    Delivery is best-effort: if ``ws.send_text`` raises an ``Exception``
    the error is dropped so one failing socket cannot break the caller.

    Args:
        ws: Target connection.
        data: Text payload, already serialized by the caller.
    """
    try:
        await ws.send_text(data)
    except Exception:
        # Deliberately swallowed; see docstring.
        return

async def broadcast_safe(data: str, *, exclude: WebSocket = None):
    """Send ``data`` to every registered client concurrently.

    Args:
        data: Text payload to deliver.
        exclude: Optional connection to skip (compared by identity).
    """
    # Iterate over a copy so that changes to ``clients`` made while the
    # sends are in flight cannot disturb the iteration.
    pending = [
        send_safe(peer, data)
        for peer in list(clients)
        if exclude is None or peer is not exclude
    ]
    if pending:
        await asyncio.gather(*pending, return_exceptions=True)

def max_sessions(email: str) -> int:
    """Return the concurrent-session limit for ``email``.

    Members of ``ADMIN_EMAILS`` (exact match) get ``ADMIN_MAX``; everyone
    else gets ``USER_MAX``.
    """
    if email in ADMIN_EMAILS:
        return ADMIN_MAX
    return USER_MAX

def is_no_kick(email: str) -> bool:
    """Tell whether ``email`` is exempt from session kicking.

    The value is stripped and lower-cased before the lookup in
    ``NO_KICK_EMAILS``; ``None`` or an empty string is treated as "".
    """
    normalized = (email or "").strip().lower()
    return normalized in NO_KICK_EMAILS

async def kick_old(email: str, new_ws: WebSocket):
    """Close existing sessions for ``email`` to make room for ``new_ws``.

    After this returns, at most ``max_sessions(email) - 1`` earlier sockets
    remain registered for the email, leaving one slot for the new
    connection. Each kicked socket is sent a ``kick`` message, closed with
    code 1008 and removed from both ``clients`` and the per-email set.

    Args:
        email: Key into ``email_clients``.
        new_ws: The connection being admitted; it is never kicked.
    """
    sessions = email_clients.get(email)
    if not sessions:
        return

    # Never kick the connection we are making room for.
    sessions.discard(new_ws)

    allowed = max_sessions(email)
    # Still below the limit: there is already a free slot.
    if len(sessions) < allowed:
        return

    # Drop just enough sessions to leave exactly one free slot for new_ws.
    excess = len(sessions) - allowed + 1
    # Sets are unordered, so which sessions get picked is arbitrary.
    victims = list(sessions)[:excess]

    if allowed == 1:
        reason = "another_session_connected"
    else:
        reason = "too_many_sessions"
    notice = json.dumps(
        {
            "type": "kick",
            "reason": reason,
            "email": email,
            "ts": time.time(),
        },
        ensure_ascii=False,
        separators=(",", ":"),
    )

    for stale in victims:
        await send_safe(stale, notice)
        try:
            await stale.close(code=1008)
        except Exception:
            # Best-effort close; the socket may already be gone.
            pass

        clients.discard(stale)
        sessions.discard(stale)

    # Remove the per-email entry once no sessions are left in it.
    if not sessions:
        email_clients.pop(email, None)


@app.websocket("/banh/ws")
async def ws_endpoint(ws: WebSocket):
    """Authenticate a WebSocket client and relay its data messages.

    Handshake: the connection is accepted first, then closed with code 1008
    when the ``Origin`` header is not in ``ALLOWED_ORIGINS``, when the
    ``token`` or ``email`` query parameter is missing, or when the token is
    neither the built-in bypass token nor found by ``session_exists``.

    Once admitted, older sessions for the same ``email`` are kicked (unless
    the email is exempt), the socket is registered in ``clients`` and
    ``email_clients``, and a ``ticker`` task starts pinging it. Every
    incoming text frame is expected to be a JSON object:

    * ``{"type": "getdata", "data": ...}`` is delivered once to the sender
      and once to every other client as ``{"type": "matches", ...}``.
    * ``{"type": "getdatatest", "data": ...}`` is delivered the same way as
      ``{"type": "matches_test", ...}``.
    * Invalid JSON, or JSON that is not an object, gets an ``error`` reply
      to the sender only. Any other ``type`` is ignored.

    On disconnect the ping task is cancelled and the socket is removed
    from both registries.
    """
    await ws.accept()
    if ws.headers.get("origin") not in ALLOWED_ORIGINS:
        await ws.close(code=1008)
        return

    token = ws.query_params.get("token")
    email = ws.query_params.get("email")
    if token is None or email is None:
        await ws.close(code=1008)
        return

    # NOTE(review): the literal below is a hard-coded token that bypasses the
    # database check - confirm this is intended and keep it out of source
    # control if possible.
    # NOTE(review): the token is validated on its own; nothing visible here
    # ties it to ``email``, so a valid session could present another user's
    # email - verify against the intended auth model.
    if token != "danimhdmaet" and not await session_exists(token):
        await ws.close(code=1008)
        return

    if not is_no_kick(email):
        await kick_old(email, ws)

    email_clients.setdefault(email, set()).add(ws)
    clients.add(ws)
    # Bound before the ``try`` so the ``finally`` clause can always cancel it.
    tick_task = asyncio.create_task(ticker(ws))
    try:
        while True:
            msg = await ws.receive_text()
            try:
                obj = json.loads(msg)
            except json.JSONDecodeError:
                await send_safe(ws, json.dumps({
                    "type": "error",
                    "error": "invalid_json",
                    "raw": msg,
                }, ensure_ascii=False))
                continue

            if not isinstance(obj, dict):
                await send_safe(ws, json.dumps({
                    "type": "error",
                    "error": "json_not_object",
                    "raw": obj,
                }, ensure_ascii=False))
                continue

            msg_type = obj.get("type")
            if msg_type == "getdata":
                out_type = "matches"
            elif msg_type == "getdatatest":
                out_type = "matches_test"
            else:
                # Unknown or missing type: nothing to relay.
                continue

            text = json.dumps(
                {"type": out_type, "data": obj.get("data")},
                ensure_ascii=False, separators=(",", ":")
            )
            # Reply to the sender directly (it may have been dropped from
            # ``clients`` by a kick), then fan out to everyone else. Excluding
            # the sender avoids delivering the same message to it twice.
            await send_safe(ws, text)
            await broadcast_safe(text, exclude=ws)
    except WebSocketDisconnect:
        pass
    finally:
        tick_task.cancel()
        clients.discard(ws)
        # Also drop the socket from its per-email set; otherwise closed
        # sockets pile up there and keep counting toward the session limit.
        sessions = email_clients.get(email)
        if sessions is not None:
            sessions.discard(ws)
            if not sessions:
                email_clients.pop(email, None)

async def ticker(ws: WebSocket):
    """Send a numbered ``ping`` message to ``ws`` every 12 seconds, forever.

    The counter starts at 0 and increases by one per ping. The loop has no
    exit condition of its own; it runs until the task is cancelled.
    """
    seq = 0
    while True:
        frame = json.dumps({"type": "ping", "n": seq})
        await send_safe(ws, frame)
        await asyncio.sleep(12)
        seq += 1

#
@app.post("/broadcast")
async def http_broadcast(payload: dict):
    """Push a JSON payload to every connected WebSocket client.

    Args:
        payload: Request body; wrapped as
            ``{"type": "broadcast", "payload": payload}`` before sending.

    Returns:
        ``{"ok": True, "sent_to": n}`` where ``n`` is the number of sockets
        a send was attempted on. Failed sends are swallowed by
        ``send_safe`` and are still counted.
    """
    # NOTE(review): no authentication is visible on this route - confirm it
    # is protected elsewhere (proxy, firewall) before exposing it.
    message = json.dumps({"type": "broadcast", "payload": payload})
    # Snapshot once so the reported count matches the sockets actually
    # targeted even if clients connect or disconnect during the sends.
    targets = list(clients)
    await asyncio.gather(*(send_safe(c, message) for c in targets))
    return {"ok": True, "sent_to": len(targets)}
