mockapi/app/modules/admin/controller.py
2026-03-16 09:00:26 +00:00

705 lines
No EOL
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import json
from typing import Optional, Dict, Any
from datetime import datetime
from fastapi import APIRouter, Request, Form, Depends, HTTPException, status
from fastapi.responses import HTMLResponse, RedirectResponse, PlainTextResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy.ext.asyncio import AsyncSession
from pathlib import Path
from app.core.config import settings
from app.core.middleware.auth_middleware import verify_password, get_password_hash
from app.core.database import get_db
from app.modules.endpoints.repositories.endpoint_repository import EndpointRepository
from app.modules.endpoints.schemas.endpoint_schema import EndpointCreate, EndpointUpdate, EndpointResponse
from app.modules.endpoints.services.route_service import RouteManager
from app.modules.oauth2.repositories import OAuthClientRepository, OAuthTokenRepository, OAuthUserRepository
from app.modules.oauth2.schemas import OAuthClientCreate, OAuthClientUpdate
import secrets
# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)
# Every route declared on this router is served under the /admin prefix.
router = APIRouter(prefix="/admin", tags=["admin"])
# Templates are loaded from the "templates" directory located next to this file.
templates = Jinja2Templates(directory=str(Path(__file__).parent / "templates"))
# Helper to get route manager from app state
def get_route_manager(request: Request) -> RouteManager:
    """Return the ``route_manager`` object stored on the application state."""
    app_state = request.app.state
    return app_state.route_manager
# Helper to get repository
async def get_repository(db: AsyncSession = Depends(get_db)) -> EndpointRepository:
    """Build an endpoint repository around the session supplied by ``get_db``."""
    endpoint_repository = EndpointRepository(db)
    return endpoint_repository
# Helper to get OAuth client repository
async def get_oauth_client_repository(db: AsyncSession = Depends(get_db)) -> OAuthClientRepository:
    """Build an OAuth client repository around the session supplied by ``get_db``."""
    client_repository = OAuthClientRepository(db)
    return client_repository
# Helper to get OAuth token repository
async def get_oauth_token_repository(db: AsyncSession = Depends(get_db)) -> OAuthTokenRepository:
    """Build an OAuth token repository around the session supplied by ``get_db``."""
    token_repository = OAuthTokenRepository(db)
    return token_repository
# Helper to get OAuth user repository
async def get_oauth_user_repository(db: AsyncSession = Depends(get_db)) -> OAuthUserRepository:
    """Build an OAuth user repository around the session supplied by ``get_db``."""
    user_repository = OAuthUserRepository(db)
    return user_repository
def prepare_client_data(
    client_name: str,
    redirect_uris: str,
    grant_types: str,
    scopes: str,
    is_active: bool = True,
) -> dict:
    """Convert OAuth client form fields into a creation dict with new credentials.

    A random client id and client secret are generated; the secret is stored
    hashed under ``client_secret`` and its plaintext is returned under the
    temporary ``_plain_secret`` key so the caller can decide what to do with it.

    Args:
        client_name: Display name, stored under the ``name`` key.
        redirect_uris: Comma-separated redirect URIs.
        grant_types: Comma-separated grant types.
        scopes: Comma-separated scopes.
        is_active: Whether the client starts out active.

    Returns:
        Dict with ``client_id``, ``client_secret`` (hashed), ``name``,
        ``redirect_uris``, ``grant_types``, ``scopes``, ``is_active`` and
        ``_plain_secret``.
    """
    # ``secrets`` and ``get_password_hash`` are already imported at module
    # level, so no function-local imports are needed here.

    def _split_csv(raw: str) -> list:
        """Split a comma-separated string, trimming items and dropping blanks."""
        return [item.strip() for item in raw.split(",") if item.strip()]

    client_id = secrets.token_urlsafe(16)
    client_secret_plain = secrets.token_urlsafe(32)

    return {
        "client_id": client_id,
        # Only the hash is meant to be persisted.
        "client_secret": get_password_hash(client_secret_plain),
        "name": client_name,
        "redirect_uris": _split_csv(redirect_uris),
        "grant_types": _split_csv(grant_types),
        "scopes": _split_csv(scopes),
        "is_active": is_active,
        "_plain_secret": client_secret_plain,  # temporary for display
    }
# Pagination: number of rows per page used by the admin list views below.
PAGE_SIZE = 20
# Hash of the configured admin password, computed once when this module is
# imported; the login handler verifies submitted passwords against it.
# (Described as bcrypt by the original author; the actual scheme is whatever
# get_password_hash implements.)
admin_password_hash = get_password_hash(settings.admin_password)
# ---------- Authentication Routes ----------
@router.get("/login", response_class=HTMLResponse)
async def login_page(request: Request, error: Optional[str] = None):
    """Render the admin login form, passing along an optional error message."""
    context = {"request": request, "error": error, "session": request.session}
    return templates.TemplateResponse("admin/login.html", context)
@router.post("/login", response_class=RedirectResponse)
async def login(
    request: Request,
    username: str = Form(...),
    password: str = Form(...),
):
    """Check the submitted credentials and start an admin session.

    Args:
        request: Current request; its session receives the ``username`` key on success.
        username: Submitted user name, compared with ``settings.admin_username``.
        password: Submitted password, verified against ``admin_password_hash``.

    Returns:
        A 302 redirect to ``/admin`` on success, or back to the login page with
        a generic error message on failure.
    """
    # Compare as bytes: compare_digest rejects non-ASCII ``str`` arguments.
    username_ok = secrets.compare_digest(
        username.encode("utf-8"), settings.admin_username.encode("utf-8")
    )
    # Always run the password check, even for an unknown user name, so the
    # response time does not reveal whether the user name was correct.
    password_ok = verify_password(password, admin_password_hash)

    if not (username_ok and password_ok):
        # %r escapes control characters, so a crafted user name cannot forge
        # extra log lines.
        if not username_ok:
            logger.warning("Failed login attempt: invalid username %r", username)
        else:
            logger.warning("Failed login attempt: invalid password for %r", username)
        return RedirectResponse(
            url="/admin/login?error=Invalid+credentials",
            status_code=status.HTTP_302_FOUND,
        )

    # Authentication successful, set session
    request.session["username"] = username
    logger.info("User %r logged in", username)
    return RedirectResponse(url="/admin", status_code=status.HTTP_302_FOUND)
@router.get("/logout")
async def logout(request: Request):
    """Empty the session and send the browser back to the login page."""
    request.session.clear()
    login_redirect = RedirectResponse(
        url="/admin/login",
        status_code=status.HTTP_302_FOUND,
    )
    return login_redirect
# ---------- Dashboard ----------
@router.get("/", response_class=HTMLResponse)
async def dashboard(
    request: Request,
    repository: EndpointRepository = Depends(get_repository),
    route_manager: RouteManager = Depends(get_route_manager),
):
    """Render the admin dashboard with endpoint and route statistics.

    The statistics are: number of endpoints (from a query limited to 1000
    rows), number of active endpoints, number of distinct methods among the
    fetched endpoints, and number of entries in the route manager's
    ``registered_routes``.
    """
    async with repository.session:
        every_endpoint = await repository.get_all(limit=1000)
        enabled_endpoints = await repository.get_active()

    distinct_methods = {endpoint.method for endpoint in every_endpoint}

    stats = {
        "total_endpoints": len(every_endpoint),
        "active_endpoints": len(enabled_endpoints),
        "methods_count": len(distinct_methods),
        "total_routes": len(route_manager.registered_routes),
    }
    context = {"request": request, "stats": stats, "session": request.session}
    return templates.TemplateResponse("admin/dashboard.html", context)
# ---------- Endpoints CRUD ----------
@router.get("/endpoints", response_class=HTMLResponse)
async def list_endpoints(
    request: Request,
    page: int = 1,
    repository: EndpointRepository = Depends(get_repository),
):
    """List endpoints, ``PAGE_SIZE`` per page.

    Args:
        request: Current request; an ``error`` query parameter is forwarded to the template.
        page: 1-based page number.
        repository: Endpoint repository.

    Returns:
        The rendered list page, or a redirect to page 1 when ``page`` is out of range.
    """
    # Naive count: fetches up to 1000 rows just to count them.
    total = len(await repository.get_all(limit=1000))
    total_pages = max(1, (total + PAGE_SIZE - 1) // PAGE_SIZE)

    # Validate the page BEFORE querying it, so an out-of-range page (e.g. 0 or
    # a negative number) never reaches the repository as a negative offset.
    if page < 1 or page > total_pages:
        return RedirectResponse(url="/admin/endpoints?page=1")

    skip = (page - 1) * PAGE_SIZE
    endpoints = await repository.get_all(skip=skip, limit=PAGE_SIZE)

    return templates.TemplateResponse(
        "admin/endpoints.html",
        {
            "request": request,
            "session": request.session,
            "endpoints": endpoints,
            "page": page,
            "total_pages": total_pages,
            "error": request.query_params.get("error"),
        },
    )
@router.get("/endpoints/new", response_class=HTMLResponse)
async def new_endpoint_form(request: Request):
    """Display the form used to create a new endpoint.

    The ``error`` query parameter is forwarded to the template because the
    create handler redirects back here with ``?error=...`` when it fails; this
    mirrors what the OAuth client forms already do.
    """
    return templates.TemplateResponse(
        "admin/endpoint_form.html",
        {
            "request": request,
            "session": request.session,
            "action": "Create",
            "form_action": "/admin/endpoints",
            "endpoint": None,
            "errors": {},
            "error": request.query_params.get("error"),
        },
    )
@router.post("/endpoints", response_class=RedirectResponse)
async def create_endpoint(
    request: Request,
    route: str = Form(...),
    method: str = Form(...),
    response_body: str = Form(...),
    response_code: int = Form(200),
    content_type: str = Form("application/json"),
    is_active: bool = Form(True),
    variables: str = Form("{}"),
    headers: str = Form("{}"),
    delay_ms: int = Form(0),
    repository: EndpointRepository = Depends(get_repository),
    route_manager: RouteManager = Depends(get_route_manager),
):
    """Create a new endpoint from the submitted form and refresh the routes.

    ``variables`` and ``headers`` are JSON text; an empty value is treated as
    an empty object. The combined data is validated with ``EndpointCreate``
    before it is handed to the repository.

    Returns:
        A 302 redirect to the endpoint list on success, or back to the
        creation form with an ``error`` query parameter on failure.
    """
    from urllib.parse import quote_plus  # stdlib; local so this handler is self-contained

    def _back_to_form(message: str) -> RedirectResponse:
        """Redirect to the creation form carrying a URL-encoded error message."""
        return RedirectResponse(
            url="/admin/endpoints/new?error=" + quote_plus(message),
            status_code=status.HTTP_302_FOUND,
        )

    # Parse JSON fields
    try:
        variables_dict = json.loads(variables) if variables else {}
    except json.JSONDecodeError:
        return _back_to_form("Invalid JSON in variables")
    try:
        headers_dict = json.loads(headers) if headers else {}
    except json.JSONDecodeError:
        return _back_to_form("Invalid JSON in headers")

    # Validate using Pydantic schema
    try:
        endpoint_data = EndpointCreate(
            route=route,
            method=method,
            response_body=response_body,
            response_code=response_code,
            content_type=content_type,
            is_active=is_active,
            variables=variables_dict,
            headers=headers_dict,
            delay_ms=delay_ms,
        ).dict()
    except Exception as e:
        logger.error(f"Validation error: {e}")
        # The message may contain '&', '#', '=' or newlines; quote_plus keeps
        # it inside the single ``error`` query parameter.
        return _back_to_form(str(e))

    # Create endpoint
    endpoint = await repository.create(endpoint_data)
    if not endpoint:
        return _back_to_form("Failed to create endpoint")

    logger.info(f"Created endpoint {endpoint.id}: {method} {route}")
    # Refresh routes to include new endpoint
    await route_manager.refresh_routes()
    return RedirectResponse(url="/admin/endpoints", status_code=status.HTTP_302_FOUND)
@router.get("/endpoints/{endpoint_id}", response_class=HTMLResponse)
async def edit_endpoint_form(
    request: Request,
    endpoint_id: int,
    repository: EndpointRepository = Depends(get_repository),
):
    """Display the form used to edit an existing endpoint.

    The ``error`` query parameter is forwarded to the template because the
    update handler redirects back here with ``?error=...`` when it fails.

    Raises:
        HTTPException: 404 when no endpoint with ``endpoint_id`` exists.
    """
    endpoint = await repository.get_by_id(endpoint_id)
    if not endpoint:
        raise HTTPException(status_code=404, detail="Endpoint not found")
    return templates.TemplateResponse(
        "admin/endpoint_form.html",
        {
            "request": request,
            "session": request.session,
            "action": "Edit",
            "form_action": f"/admin/endpoints/{endpoint_id}",
            "endpoint": endpoint,
            "errors": {},
            "error": request.query_params.get("error"),
        },
    )
@router.post("/endpoints/{endpoint_id}", response_class=RedirectResponse)
async def update_endpoint(
    request: Request,
    endpoint_id: int,
    route: Optional[str] = Form(None),
    method: Optional[str] = Form(None),
    response_body: Optional[str] = Form(None),
    response_code: Optional[int] = Form(None),
    content_type: Optional[str] = Form(None),
    is_active: Optional[bool] = Form(None),
    variables: Optional[str] = Form(None),
    headers: Optional[str] = Form(None),
    delay_ms: Optional[int] = Form(None),
    repository: EndpointRepository = Depends(get_repository),
    route_manager: RouteManager = Depends(get_route_manager),
):
    """Update an existing endpoint, or delete it when the form requests DELETE.

    This handler and ``delete_endpoint`` are registered for the same path and
    method, and the router dispatches to the first registration, so every POST
    arrives here. A form carrying ``_method=DELETE`` is therefore forwarded to
    ``delete_endpoint``; otherwise only the submitted (non-``None``) fields are
    validated with ``EndpointUpdate`` and applied.

    Returns:
        A 302 redirect to the endpoint list on success, or back to the edit
        form with an ``error`` query parameter on failure.
    """
    from urllib.parse import quote_plus  # stdlib; local so this handler is self-contained

    # HTML forms cannot send DELETE, so honour the method-override field here.
    form = await request.form()
    if form.get("_method") == "DELETE":
        return await delete_endpoint(
            request,
            endpoint_id,
            repository=repository,
            route_manager=route_manager,
        )

    def _back_to_form(message: str) -> RedirectResponse:
        """Redirect to the edit form carrying a URL-encoded error message."""
        return RedirectResponse(
            url=f"/admin/endpoints/{endpoint_id}?error=" + quote_plus(message),
            status_code=status.HTTP_302_FOUND,
        )

    # Parse JSON fields if provided (an empty string means an empty object)
    variables_dict = None
    if variables is not None:
        try:
            variables_dict = json.loads(variables) if variables else {}
        except json.JSONDecodeError:
            return _back_to_form("Invalid JSON in variables")
    headers_dict = None
    if headers is not None:
        try:
            headers_dict = json.loads(headers) if headers else {}
        except json.JSONDecodeError:
            return _back_to_form("Invalid JSON in headers")

    # Build update dict (only include fields that are not None)
    submitted = {
        "route": route,
        "method": method,
        "response_body": response_body,
        "response_code": response_code,
        "content_type": content_type,
        "is_active": is_active,
        "variables": variables_dict,
        "headers": headers_dict,
        "delay_ms": delay_ms,
    }
    update_data = {key: value for key, value in submitted.items() if value is not None}

    # Validate using Pydantic schema (optional fields)
    try:
        validated = EndpointUpdate(**update_data).dict(exclude_unset=True)
    except Exception as e:
        logger.error(f"Validation error: {e}")
        # The message may contain '&', '#', '=' or newlines; quote_plus keeps
        # it inside the single ``error`` query parameter.
        return _back_to_form(str(e))

    # Update endpoint
    endpoint = await repository.update(endpoint_id, validated)
    if not endpoint:
        return _back_to_form("Failed to update endpoint")

    logger.info(f"Updated endpoint {endpoint_id}")
    # Refresh routes to reflect changes
    await route_manager.refresh_routes()
    return RedirectResponse(url="/admin/endpoints", status_code=status.HTTP_302_FOUND)


# NOTE: this registration uses the same path and method as update_endpoint and
# is shadowed by it; it is kept so the router's route table is unchanged.
# Deletion is reached through update_endpoint's ``_method`` check.
@router.post("/endpoints/{endpoint_id}", response_class=RedirectResponse, include_in_schema=False)
async def delete_endpoint(
    request: Request,
    endpoint_id: int,
    repository: EndpointRepository = Depends(get_repository),
    route_manager: RouteManager = Depends(get_route_manager),
):
    """Delete an endpoint (handled via POST with _method=DELETE).

    When the form does not carry ``_method=DELETE`` the request is treated as
    an update and forwarded to ``update_endpoint``.

    Returns:
        A 302 redirect to the endpoint list, with an ``error`` query parameter
        when the repository reports that the deletion failed.
    """
    # Check if method override is present (HTML forms can't send DELETE)
    form = await request.form()
    if form.get("_method") != "DELETE":

        def _text(key: str) -> Optional[str]:
            """Return the submitted text for ``key``, or None when absent or not text."""
            value = form.get(key)
            return value if isinstance(value, str) else None

        # A direct call bypasses FastAPI's Form(...) resolution, so every field
        # must be passed explicitly; otherwise the parameters would hold the
        # Form default objects instead of None.
        # NOTE(review): numeric/boolean fields are passed as raw form text and
        # rely on EndpointUpdate to coerce them - confirm against the schema.
        return await update_endpoint(
            request,
            endpoint_id,
            route=_text("route"),
            method=_text("method"),
            response_body=_text("response_body"),
            response_code=_text("response_code"),
            content_type=_text("content_type"),
            is_active=_text("is_active"),
            variables=_text("variables"),
            headers=_text("headers"),
            delay_ms=_text("delay_ms"),
            repository=repository,
            route_manager=route_manager,
        )

    success = await repository.delete(endpoint_id)
    if not success:
        return RedirectResponse(
            url="/admin/endpoints?error=Failed+to+delete+endpoint",
            status_code=status.HTTP_302_FOUND,
        )

    logger.info(f"Deleted endpoint {endpoint_id}")
    # Refresh routes to remove deleted endpoint
    await route_manager.refresh_routes()
    return RedirectResponse(url="/admin/endpoints", status_code=status.HTTP_302_FOUND)
# ---------- OAuth2 Management Routes ----------
@router.get("/oauth/clients", response_class=HTMLResponse, tags=["admin-oauth"])
async def list_oauth_clients(
    request: Request,
    page: int = 1,
    repository: OAuthClientRepository = Depends(get_oauth_client_repository),
):
    """List OAuth clients, ``PAGE_SIZE`` per page.

    Args:
        request: Current request; an ``error`` query parameter is forwarded to the template.
        page: 1-based page number.
        repository: OAuth client repository.

    Returns:
        The rendered list page, or a redirect to page 1 when ``page`` is out of range.
    """
    # Naive count: fetches up to 1000 rows just to count them.
    total = len(await repository.get_all(limit=1000))
    total_pages = max(1, (total + PAGE_SIZE - 1) // PAGE_SIZE)

    # Validate the page BEFORE querying it, so an out-of-range page (e.g. 0 or
    # a negative number) never reaches the repository as a negative offset.
    if page < 1 or page > total_pages:
        return RedirectResponse(url="/admin/oauth/clients?page=1")

    skip = (page - 1) * PAGE_SIZE
    clients = await repository.get_all(skip=skip, limit=PAGE_SIZE)

    return templates.TemplateResponse(
        "admin/oauth/clients.html",
        {
            "request": request,
            "session": request.session,
            "clients": clients,
            "page": page,
            "total_pages": total_pages,
            "error": request.query_params.get("error"),
        },
    )
@router.get("/oauth/clients/new", response_class=HTMLResponse, tags=["admin-oauth"])
async def new_oauth_client_form(request: Request):
    """Render the empty OAuth client form in "Create" mode."""
    context = {
        "request": request,
        "session": request.session,
        "action": "Create",
        "form_action": "/admin/oauth/clients",
        "client": None,
        "errors": {},
        "error": request.query_params.get("error"),
    }
    return templates.TemplateResponse("admin/oauth/client_form.html", context)
@router.post("/oauth/clients", response_class=RedirectResponse, tags=["admin-oauth"])
async def create_oauth_client(
    request: Request,
    client_name: str = Form(...),
    redirect_uris: str = Form(...),
    grant_types: str = Form(...),
    scopes: str = Form(...),
    is_active: bool = Form(True),
    repository: OAuthClientRepository = Depends(get_oauth_client_repository),
):
    """Create a new OAuth client with generated credentials.

    The form fields are converted by ``prepare_client_data`` (which also
    generates the client id and secret), validated with ``OAuthClientCreate``
    and then stored through the repository.

    Returns:
        A 302 redirect to the client list on success, or back to the creation
        form with an ``error`` query parameter on failure.
    """
    from urllib.parse import quote_plus  # stdlib; local so this handler is self-contained

    try:
        # Prepare client data with generated credentials
        data = prepare_client_data(
            client_name=client_name,
            redirect_uris=redirect_uris,
            grant_types=grant_types,
            scopes=scopes,
            is_active=is_active,
        )
        # The plaintext secret must not reach the schema or the repository.
        # NOTE(review): it is discarded here, so nobody ever sees the secret
        # of the created client - see the TODO below.
        data.pop("_plain_secret")
        # Validate using Pydantic schema
        client_data = OAuthClientCreate(**data).dict()
    except Exception as e:
        logger.error(f"Validation error: {e}")
        # The message may contain '&', '#', '=' or newlines; quote_plus keeps
        # it inside the single ``error`` query parameter.
        return RedirectResponse(
            url="/admin/oauth/clients/new?error=" + quote_plus(str(e)),
            status_code=status.HTTP_302_FOUND,
        )

    # Create client
    client = await repository.create(client_data)
    if not client:
        return RedirectResponse(
            url="/admin/oauth/clients/new?error=Failed+to+create+client",
            status_code=status.HTTP_302_FOUND,
        )

    logger.info(f"Created OAuth client {client.client_id}")
    # TODO: Display client secret only once (store in flash message)
    # For now, redirect to list with success message
    return RedirectResponse(url="/admin/oauth/clients", status_code=status.HTTP_302_FOUND)
@router.get("/oauth/clients/{client_id}/edit", response_class=HTMLResponse, tags=["admin-oauth"])
async def edit_oauth_client_form(
    request: Request,
    client_id: int,
    repository: OAuthClientRepository = Depends(get_oauth_client_repository),
):
    """Render the OAuth client form in "Edit" mode, or answer 404 if the client is missing."""
    client = await repository.get_by_id(client_id)
    if client:
        context = {
            "request": request,
            "session": request.session,
            "action": "Edit",
            "form_action": f"/admin/oauth/clients/{client_id}",
            "client": client,
            "errors": {},
            "error": request.query_params.get("error"),
        }
        return templates.TemplateResponse("admin/oauth/client_form.html", context)
    raise HTTPException(status_code=404, detail="Client not found")
@router.post("/oauth/clients/{client_id}", response_class=RedirectResponse, tags=["admin-oauth"])
async def update_oauth_client(
    request: Request,
    client_id: int,
    client_name: Optional[str] = Form(None),
    redirect_uris: Optional[str] = Form(None),
    grant_types: Optional[str] = Form(None),
    scopes: Optional[str] = Form(None),
    is_active: Optional[bool] = Form(None),
    repository: OAuthClientRepository = Depends(get_oauth_client_repository),
):
    """Update an existing OAuth client with the submitted fields.

    Only fields that were submitted (not ``None``) are applied. The
    comma-separated fields are split into lists of trimmed, non-empty items.
    When nothing was submitted the browser is sent back to the edit form.

    Returns:
        A 302 redirect to the client list on success, or back to the edit form
        (with an ``error`` query parameter when validation or the update failed).
    """
    from urllib.parse import quote_plus  # stdlib; local so this handler is self-contained

    edit_url = f"/admin/oauth/clients/{client_id}/edit"

    def _split_csv(raw: str) -> list:
        """Split a comma-separated string, trimming items and dropping blanks."""
        return [item.strip() for item in raw.split(",") if item.strip()]

    # Build update dict
    update_data = {}
    if client_name is not None:
        update_data["name"] = client_name
    if redirect_uris is not None:
        update_data["redirect_uris"] = _split_csv(redirect_uris)
    if grant_types is not None:
        update_data["grant_types"] = _split_csv(grant_types)
    if scopes is not None:
        update_data["scopes"] = _split_csv(scopes)
    if is_active is not None:
        update_data["is_active"] = is_active

    if not update_data:
        return RedirectResponse(url=edit_url, status_code=status.HTTP_302_FOUND)

    # Validate using Pydantic schema (optional fields)
    try:
        validated = OAuthClientUpdate(**update_data).dict(exclude_unset=True)
    except Exception as e:
        logger.error(f"Validation error: {e}")
        # The message may contain '&', '#', '=' or newlines; quote_plus keeps
        # it inside the single ``error`` query parameter.
        return RedirectResponse(
            url=edit_url + "?error=" + quote_plus(str(e)),
            status_code=status.HTTP_302_FOUND,
        )

    # Update client
    client = await repository.update(client_id, validated)
    if not client:
        return RedirectResponse(
            url=edit_url + "?error=Failed+to+update+client",
            status_code=status.HTTP_302_FOUND,
        )

    logger.info(f"Updated OAuth client {client_id}")
    return RedirectResponse(url="/admin/oauth/clients", status_code=status.HTTP_302_FOUND)
@router.post("/oauth/clients/{client_id}/delete", response_class=RedirectResponse, tags=["admin-oauth"])
async def delete_oauth_client(
    request: Request,
    client_id: int,
    repository: OAuthClientRepository = Depends(get_oauth_client_repository),
):
    """Soft-delete a client by setting ``is_active`` to False, then return to the list."""
    deactivated = await repository.update(client_id, {"is_active": False})
    if deactivated:
        logger.info(f"Soft-deleted OAuth client {client_id}")
        target = "/admin/oauth/clients"
    else:
        target = "/admin/oauth/clients?error=Failed+to+delete+client"
    return RedirectResponse(url=target, status_code=status.HTTP_302_FOUND)
@router.get("/oauth/tokens", response_class=HTMLResponse, tags=["admin-oauth"])
async def list_oauth_tokens(
    request: Request,
    page: int = 1,
    client_id: Optional[str] = None,
    user_id: Optional[int] = None,
    active: Optional[bool] = None,
    repository: OAuthTokenRepository = Depends(get_oauth_token_repository),
):
    """List OAuth tokens with filtering (client, user, active/expired).

    Up to 1000 tokens are fetched and filtered in memory; pagination is
    applied to the filtered result.

    Args:
        request: Current request; an ``error`` query parameter is forwarded to the template.
        page: 1-based page number.
        client_id: Keep only tokens whose ``client_id`` equals this value.
        user_id: Keep only tokens whose ``user_id`` equals this value.
        active: True keeps non-expired tokens, False keeps expired ones.
        repository: OAuth token repository.

    Returns:
        The rendered token page, or a redirect to page 1 (same filters) when
        ``page`` is out of range.
    """
    # One timestamp for the whole request, so the filter and the template's
    # "now" can never disagree about a token that expires mid-request.
    now = datetime.utcnow()

    def _matches(token) -> bool:
        """Tell whether a token passes every filter that was supplied."""
        if client_id is not None and token.client_id != client_id:
            return False
        if user_id is not None and token.user_id != user_id:
            return False
        if active is not None:
            is_expired = token.expires_at < now
            # Reject expired tokens when active=True and live ones when active=False.
            if active == is_expired:
                return False
        return True

    # Fetch all tokens (limited to reasonable count) for filtering
    all_tokens = await repository.get_all(limit=1000)
    filtered = [token for token in all_tokens if _matches(token)]

    # Pagination after filtering
    total = len(filtered)
    total_pages = max(1, (total + PAGE_SIZE - 1) // PAGE_SIZE)

    # Ensure page is within bounds
    if page < 1 or page > total_pages:
        from urllib.parse import urlencode  # stdlib; only needed on this path

        # Keep the active filters, otherwise the redirect would silently
        # switch the user to the unfiltered list.
        params = {"page": 1}
        if client_id is not None:
            params["client_id"] = client_id
        if user_id is not None:
            params["user_id"] = user_id
        if active is not None:
            params["active"] = str(active).lower()
        return RedirectResponse(url="/admin/oauth/tokens?" + urlencode(params))

    skip = (page - 1) * PAGE_SIZE
    tokens = filtered[skip:skip + PAGE_SIZE]

    return templates.TemplateResponse(
        "admin/oauth/tokens.html",
        {
            "request": request,
            "session": request.session,
            "tokens": tokens,
            "page": page,
            "total_pages": total_pages,
            "client_id": client_id,
            "user_id": user_id,
            "active": active,
            "now": now,
            "error": request.query_params.get("error"),
        },
    )
@router.post("/oauth/tokens/{token_id}/revoke", response_class=RedirectResponse, tags=["admin-oauth"])
async def revoke_oauth_token(
    request: Request,
    token_id: int,
    repository: OAuthTokenRepository = Depends(get_oauth_token_repository),
):
    """Revoke a token by deleting it through the repository, then return to the token list."""
    deleted = await repository.delete(token_id)
    if deleted:
        logger.info(f"Revoked OAuth token {token_id}")
        target = "/admin/oauth/tokens"
    else:
        target = "/admin/oauth/tokens?error=Failed+to+revoke+token"
    return RedirectResponse(url=target, status_code=status.HTTP_302_FOUND)
@router.get("/oauth/users", response_class=HTMLResponse, tags=["admin-oauth"])
async def list_oauth_users(
    request: Request,
    page: int = 1,
    repository: OAuthUserRepository = Depends(get_oauth_user_repository),
):
    """List OAuth users, ``PAGE_SIZE`` per page.

    Args:
        request: Current request; an ``error`` query parameter is forwarded to the template.
        page: 1-based page number.
        repository: OAuth user repository.

    Returns:
        The rendered list page, or a redirect to page 1 when ``page`` is out of range.
    """
    # Naive count: fetches up to 1000 rows just to count them.
    total = len(await repository.get_all(limit=1000))
    total_pages = max(1, (total + PAGE_SIZE - 1) // PAGE_SIZE)

    # Validate the page BEFORE querying it, so an out-of-range page (e.g. 0 or
    # a negative number) never reaches the repository as a negative offset.
    if page < 1 or page > total_pages:
        return RedirectResponse(url="/admin/oauth/users?page=1")

    skip = (page - 1) * PAGE_SIZE
    users = await repository.get_all(skip=skip, limit=PAGE_SIZE)

    return templates.TemplateResponse(
        "admin/oauth/users.html",
        {
            "request": request,
            "session": request.session,
            "users": users,
            "page": page,
            "total_pages": total_pages,
            "error": request.query_params.get("error"),
        },
    )
@router.post("/oauth/users/{user_id}/toggle", response_class=RedirectResponse, tags=["admin-oauth"])
async def toggle_oauth_user(
    request: Request,
    user_id: int,
    repository: OAuthUserRepository = Depends(get_oauth_user_repository),
):
    """Flip a user's ``is_active`` flag and return to the user list."""

    def _to_list(error_suffix: str = "") -> RedirectResponse:
        """Build the 302 redirect to the user list, optionally with an error query."""
        return RedirectResponse(
            url="/admin/oauth/users" + error_suffix,
            status_code=status.HTTP_302_FOUND,
        )

    user = await repository.get_by_id(user_id)
    if not user:
        return _to_list("?error=User+not+found")

    new_status = not user.is_active
    updated = await repository.update(user_id, {"is_active": new_status})
    if not updated:
        return _to_list("?error=Failed+to+toggle+user")

    logger.info(f"Toggled OAuth user {user_id} active status to {new_status}")
    return _to_list()