138 lines
No EOL
4.5 KiB
Python
138 lines
No EOL
4.5 KiB
Python
"""
|
|
FastAPI dependencies for OAuth2 authentication and authorization.
|
|
"""
|
|
import logging
|
|
from typing import Dict, Any, Optional
|
|
from fastapi import Depends, HTTPException, status, Request
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from .services import TokenService, ScopeService
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def get_current_token_payload(
    request: Request,
) -> Dict[str, Any]:
    """
    Resolve the request's Bearer token into its decoded JWT payload.

    The ``Authorization`` header must consist of exactly two
    whitespace-separated parts, the first being ``bearer`` (compared
    case-insensitively). The second part is handed to
    ``TokenService.verify_token`` together with a session opened from
    ``request.app.state.session_factory``.

    Args:
        request: The incoming request; only its headers and ``app.state``
            are read.

    Returns:
        The value produced by ``TokenService.verify_token`` for the token.

    Raises:
        HTTPException: 401 if the header is missing/empty or malformed;
            500 if ``session_factory`` is absent from ``app.state`` or if
            verification fails with a non-HTTP exception. ``HTTPException``
            instances raised by the token service propagate unchanged.
    """
    header_value = request.headers.get("Authorization")
    if not header_value:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing Authorization header",
            headers={"WWW-Authenticate": "Bearer"},
        )

    pieces = header_value.split()
    looks_like_bearer = len(pieces) == 2 and pieces[0].lower() == "bearer"
    if not looks_like_bearer:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid Authorization header format. Expected: Bearer <token>",
            headers={"WWW-Authenticate": "Bearer error=\"invalid_token\""},
        )
    bearer_token = pieces[1]

    # The session factory is expected to have been attached to app.state by
    # the application; without it the token cannot be verified at all.
    if not hasattr(request.app.state, "session_factory"):
        logger.error("Application session_factory not found in app.state")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Internal server error",
        )

    open_session = request.app.state.session_factory
    async with open_session() as db_session:
        verifier = TokenService(db_session)
        try:
            decoded = await verifier.verify_token(bearer_token)
        except HTTPException:
            # Let the service's own HTTP errors reach the client untouched.
            raise
        except Exception as exc:
            # Anything else is logged with its traceback and masked as a
            # generic 500 so internals are not exposed in the response.
            logger.error(f"Unexpected error during token validation: {exc}", exc_info=True)
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Internal server error",
            )
        else:
            return decoded
|
|
|
|
|
|
async def get_current_token_scopes(
    payload: Dict[str, Any] = Depends(get_current_token_payload),
) -> list[str]:
    """
    Dependency that extracts scopes from the validated token payload.

    Reads the ``"scopes"`` entry of the payload and always hands back a
    list, so that callers can safely run membership tests against it.

    Args:
        payload: Decoded token payload supplied by
            ``get_current_token_payload``.

    Returns:
        The token's scopes as a list. An absent or null ``"scopes"`` entry
        yields an empty list; a string value is split on whitespace; any
        other iterable is copied into a new list.
    """
    raw_scopes = payload.get("scopes")
    if raw_scopes is None:
        # Covers both a missing key and an explicit null claim.
        return []
    if isinstance(raw_scopes, str):
        # A bare string would make ``scope in token_scopes`` a substring
        # test (e.g. "read" matching "read_write"), so split it into
        # individual scope names first.
        return raw_scopes.split()
    return list(raw_scopes)
|
|
|
|
|
|
async def require_scope(
    required_scope: str,
    token_scopes: list[str] = Depends(get_current_token_scopes),
) -> None:
    """
    Reject the request unless the token carries ``required_scope``.

    Args:
        required_scope: Scope name that has to appear in the token's scopes.
        token_scopes: Scopes of the current token, supplied by
            ``get_current_token_scopes``.

    Raises:
        HTTPException: 403 with an ``insufficient_scope`` challenge naming
            the required scope when it is not among ``token_scopes``.
    """
    # NOTE(review): ``required_scope`` has no default, so if this function is
    # passed directly to ``Depends`` FastAPI would presumably resolve it from
    # the request - confirm callers bind it themselves.
    if required_scope in token_scopes:
        return

    challenge = f"Bearer error=\"insufficient_scope\", scope=\"{required_scope}\""
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Insufficient scope",
        headers={"WWW-Authenticate": challenge},
    )
|
|
|
|
|
|
async def require_any_scope(
    required_scopes: list[str],
    token_scopes: list[str] = Depends(get_current_token_scopes),
) -> None:
    """
    Reject the request unless the token carries at least one listed scope.

    Args:
        required_scopes: Candidate scopes; one match is enough. An empty
            list can never match and therefore always results in a 403.
        token_scopes: Scopes of the current token, supplied by
            ``get_current_token_scopes``.

    Raises:
        HTTPException: 403 with an ``insufficient_scope`` challenge listing
            all candidate scopes (space-separated) when none is present.
    """
    # NOTE(review): ``required_scopes`` has no default, so if this function
    # is passed directly to ``Depends`` FastAPI would presumably resolve it
    # from the request - confirm callers bind it themselves.
    for candidate in required_scopes:
        if candidate in token_scopes:
            # First match is sufficient; nothing more to check.
            return

    wanted = " ".join(required_scopes)
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Insufficient scope",
        headers={"WWW-Authenticate": f"Bearer error=\"insufficient_scope\", scope=\"{wanted}\""},
    )
|
|
|
|
|
|
async def require_all_scopes(
    required_scopes: list[str],
    token_scopes: list[str] = Depends(get_current_token_scopes),
) -> None:
    """
    Reject the request unless the token carries every listed scope.

    Args:
        required_scopes: Scopes that must all be present. An empty list
            imposes no requirement.
        token_scopes: Scopes of the current token, supplied by
            ``get_current_token_scopes``.

    Raises:
        HTTPException: 403 whose detail names the first missing scope (in
            ``required_scopes`` order) and whose ``insufficient_scope``
            challenge lists all required scopes, space-separated.
    """
    # NOTE(review): ``required_scopes`` has no default, so if this function
    # is passed directly to ``Depends`` FastAPI would presumably resolve it
    # from the request - confirm callers bind it themselves.
    absent = [name for name in required_scopes if name not in token_scopes]
    if not absent:
        return

    # Only the first missing scope is reported in the detail message.
    first_missing = absent[0]
    wanted = " ".join(required_scopes)
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail=f"Missing required scope: {first_missing}",
        headers={"WWW-Authenticate": f"Bearer error=\"insufficient_scope\", scope=\"{wanted}\""},
    )