"""JWT helper functions and the shared current-user dependency."""

import os
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from sqlalchemy.ext.asyncio import AsyncSession

from app.db.crud import get_user_by_id
from app.db.database import get_db

# NOTE(review): the fallback secret is a fixed, publicly visible string. Any
# deployment that does not set JWT_SECRET_KEY signs tokens with it — confirm
# production always provides the environment variable.
SECRET_KEY = os.getenv("JWT_SECRET_KEY", "meetspot-dev-secret")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_DAYS = int(os.getenv("ACCESS_TOKEN_EXPIRE_DAYS", "7"))

# auto_error=False lets a missing Authorization header reach get_current_user
# as a falsy value, so this module raises its own 401 with its own message.
bearer_scheme = HTTPBearer(auto_error=False)


def _unauthorized(detail: str) -> HTTPException:
    """Build a 401 error that carries the Bearer challenge header."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )


def create_access_token(data: dict) -> str:
    """Return a signed JWT containing *data* plus an expiry claim.

    Args:
        data: Claims to embed in the token. The mapping is copied, so the
            caller's dict is not modified. Any ``exp`` key it contains is
            overwritten.

    Returns:
        The encoded token, signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    to_encode = data.copy()
    # Timezone-aware "now": datetime.utcnow() is deprecated and returns a
    # naive value that is easy to misinterpret as local time.
    expire = datetime.now(timezone.utc) + timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS)
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def decode_token(token: str) -> Optional[dict]:
    """Decode and verify a JWT.

    Args:
        token: The encoded token string.

    Returns:
        The decoded claims, or ``None`` when decoding raises ``JWTError``.
    """
    try:
        return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return None


async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme),
    db: AsyncSession = Depends(get_db),
):
    """FastAPI dependency: resolve the user identified by the bearer token.

    Args:
        credentials: Bearer credentials supplied by ``bearer_scheme``; falsy
            when the request carries no usable Authorization header.
        db: Database session supplied by ``get_db``.

    Returns:
        Whatever ``get_user_by_id`` returns for the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when credentials are missing, when the token
            fails to decode or has no ``sub`` claim, or when no user is
            found for that ``sub``.
    """
    if not credentials:
        raise _unauthorized("缺少认证信息")

    payload = decode_token(credentials.credentials)
    if not payload or "sub" not in payload:
        raise _unauthorized("令牌无效")

    user = await get_user_by_id(db, payload["sub"])
    if not user:
        raise _unauthorized("用户不存在")
    return user