109 lines
2.8 KiB
Python
109 lines
2.8 KiB
Python
"""
|
|
API 依赖注入
|
|
|
|
定义 FastAPI 依赖项。
|
|
"""
|
|
|
|
from typing import Annotated
|
|
|
|
from fastapi import Depends, HTTPException, status
|
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.core.exceptions import (
|
|
TokenError,
|
|
UserDisabledError,
|
|
)
|
|
from app.database import get_db
|
|
from app.models.user import User
|
|
from app.services.auth import AuthService
|
|
|
|
# HTTP Bearer authentication scheme. Built with auto_error=False, so the
# dependants in this module decide how to respond when credentials are absent.
security = HTTPBearer(auto_error=False)
|
|
|
|
|
|
async def get_auth_service(
    session: Annotated[AsyncSession, Depends(get_db)],
) -> AuthService:
    """Build the authentication service for the current request.

    Args:
        session: Async database session injected through ``Depends(get_db)``.

    Returns:
        An ``AuthService`` constructed with ``session``.
    """
    service = AuthService(session)
    return service
|
|
|
|
|
|
async def get_current_user(
    credentials: Annotated[
        HTTPAuthorizationCredentials | None,
        Depends(security),
    ],
    auth_service: Annotated[AuthService, Depends(get_auth_service)],
) -> User:
    """Resolve the authenticated user from the request's Bearer token.

    Args:
        credentials: Bearer credentials produced by the ``security`` scheme,
            or ``None`` when the scheme yielded no credentials.
        auth_service: Authentication service used to look the user up from
            the raw token string.

    Returns:
        The user returned by ``auth_service.get_current_user``.

    Raises:
        HTTPException: 401 when no credentials were supplied or the auth
            service raises ``TokenError``; 403 when it raises
            ``UserDisabledError``.
    """
    if credentials is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="未提供认证信息",
            headers={"WWW-Authenticate": "Bearer"},
        )

    try:
        return await auth_service.get_current_user(credentials.credentials)
    except TokenError as exc:
        # Chain the domain error so the original cause stays visible in
        # tracebacks and logs instead of being reported as an error raised
        # "during handling of" another exception.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=exc.message,
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc
    except UserDisabledError as exc:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=exc.message,
        ) from exc
|
|
|
|
|
|
async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)],
) -> User:
    """Return the authenticated user only if the account is active.

    Args:
        current_user: User resolved by the ``get_current_user`` dependency.

    Returns:
        ``current_user`` unchanged when ``current_user.is_active`` is truthy.

    Raises:
        HTTPException: 403 when ``current_user.is_active`` is falsy.
    """
    if current_user.is_active:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="账户已被禁用",
    )
|
|
|
|
|
|
async def get_current_superuser(
    current_user: Annotated[User, Depends(get_current_active_user)],
) -> User:
    """Return the active user only if they hold superuser rights.

    Args:
        current_user: User resolved by the ``get_current_active_user``
            dependency.

    Returns:
        ``current_user`` unchanged when ``current_user.is_superuser`` is
        truthy.

    Raises:
        HTTPException: 403 when ``current_user.is_superuser`` is falsy.
    """
    if current_user.is_superuser:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="需要管理员权限",
    )
|
|
|
|
|
|
# Type aliases that shorten dependency-injection declarations in route signatures.
DbSession = Annotated[AsyncSession, Depends(get_db)]
CurrentUser = Annotated[User, Depends(get_current_user)]
ActiveUser = Annotated[User, Depends(get_current_active_user)]
SuperUser = Annotated[User, Depends(get_current_superuser)]
|
|
|