""" Flask application — MariaDB HA connection monitor. Serves the web dashboard and bridges engine.py events to the browser via Server-Sent Events (SSE). Single global DatabaseEngine instance. """ import json import queue import subprocess import urllib.request import urllib.error import base64 from flask import Flask, render_template, request, jsonify, Response from engine import DatabaseEngine app = Flask(__name__) engine = DatabaseEngine() # ── Page Routes ────────────────────────────────────────── @app.route("/") def index(): """Serve the main dashboard — MariaDB HA Live Demo.""" return render_template("dashboard.html") # ── API Routes ─────────────────────────────────────────── @app.route("/api/initialize", methods=["POST"]) def api_initialize(): """Create the test database and table.""" config = request.get_json(silent=True) or {} _apply_config(config) result = engine.initialize() if "error" in result: return jsonify(result), 500 return jsonify({"status": "initialized"}) @app.route("/api/start", methods=["POST"]) def api_start(): """Begin the transaction monitoring loop.""" config = request.get_json(silent=True) or {} _apply_config(config) if not engine.is_running: engine.start() return jsonify({"status": "started"}) @app.route("/api/stop", methods=["POST"]) def api_stop(): """Stop the monitoring loop.""" engine.stop() return jsonify({"status": "stopped"}) @app.route("/api/state", methods=["GET"]) def api_state(): """Return current engine state (for polling fallback).""" return jsonify(engine.get_state()) # ── Demo Dashboard API ─────────────────────────────────── @app.route("/api/maxscale/servers") def api_maxscale_servers(): """Proxy MaxScale REST API — returns server status for dashboard.""" try: req = urllib.request.Request("http://127.0.0.1:8989/v1/servers") auth = base64.b64encode(b"monitor:monitor123").decode() req.add_header("Authorization", f"Basic {auth}") with urllib.request.urlopen(req, timeout=3) as resp: data = json.loads(resp.read()) servers = [] for s in data.get("data", []): attr = s["attributes"] state = attr["state"] # Enhance: if state is just "Running" but server has slave GTIDs, mark as Slave if state.strip() == "Running": gtid = attr.get("gtid_current_pos") or "" # If GTID has multiple domains (e.g., "1-1-5,2-4-3"), it was a slave if "," in gtid: state = "Slave, Running" servers.append({ "id": s["id"], "state": state, "connections": attr["parameters"].get("connections", 0), "address": attr["parameters"]["address"], "port": attr["parameters"]["port"], }) return jsonify({"servers": servers}) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/api/docker/status") def api_docker_status(): """Return running Docker container status for dashboard.""" try: result = subprocess.run( ["docker", "ps", "--format", "{{.Names}}||{{.Status}}", "--filter", "name=mariadb", "--filter", "name=maxscale"], capture_output=True, text=True, timeout=5, ) containers = [] for line in result.stdout.strip().split("\n"): if "||" in line: name, status = line.split("||", 1) running = "Up" in status containers.append({"name": name, "running": running, "status": status}) return jsonify({"containers": containers}) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/api/demo/action", methods=["POST"]) def api_demo_action(): """Execute a demo action: kill or recover a node.""" data = request.get_json(silent=True) or {} action = data.get("action", "") node = data.get("node", "") allowed_nodes = ["mariadb1", "mariadb2", "mariadb3", "mariadb4", "mariadb5", "mariadb6", "maxscale1", "maxscale2"] if node and node not in allowed_nodes: return jsonify({"error": f"Unknown node: {node}"}), 400 def _find_dc_master(dc_servers): """Query MaxScale REST API to find the current master for a DC. dc_servers: list of server IDs like ['server1','server2','server3']""" try: req = urllib.request.Request("http://127.0.0.1:8989/v1/servers") auth = base64.b64encode(b"monitor:monitor123").decode() req.add_header("Authorization", f"Basic {auth}") with urllib.request.urlopen(req, timeout=3) as resp: data = json.loads(resp.read()) for s in data.get("data", []): if s["id"] in dc_servers and "Master" in s["attributes"]["state"]: return s["id"].replace("server", "mariadb") except Exception: pass return None try: if action == "kill" and node: subprocess.run( ["docker", "stop", "-t", "1", node], capture_output=True, text=True, timeout=5, ) return jsonify({"status": "killed", "node": node}) elif action == "recover" and node: subprocess.run( ["docker", "start", node], capture_output=True, text=True, timeout=10, ) return jsonify({"status": "recovered", "node": node}) elif action == "kill-dc1": master = _find_dc_master(["server1", "server2", "server3"]) target = master if master else "mariadb1" subprocess.run( ["docker", "stop", "-t", "1", target], capture_output=True, text=True, timeout=5, ) return jsonify({"status": "killed", "node": target}) elif action == "kill-dc2": master = _find_dc_master(["server4", "server5", "server6"]) target = master if master else "mariadb4" subprocess.run( ["docker", "stop", "-t", "1", target], capture_output=True, text=True, timeout=5, ) return jsonify({"status": "killed", "node": target}) elif action == "recover-dc1": subprocess.run( ["docker", "start", "mariadb1"], capture_output=True, text=True, timeout=10, ) return jsonify({"status": "recovered", "node": "mariadb1"}) elif action == "recover-dc2": subprocess.run( ["docker", "start", "mariadb4"], capture_output=True, text=True, timeout=10, ) return jsonify({"status": "recovered", "node": "mariadb4"}) elif action == "recover-all": for n in allowed_nodes: subprocess.run( ["docker", "start", n], capture_output=True, text=True, timeout=5, ) return jsonify({"status": "recovered", "node": "all"}) else: return jsonify({"error": f"Unknown action: {action}"}), 400 except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/api/demo/mode", methods=["POST"]) def api_demo_mode(): """Switch demo architecture mode: 'dr' (DC2 replicas of DC1) or 'standalone' (independent DCs).""" data = request.get_json(silent=True) or {} mode = data.get("mode", "dr") if mode not in ("dr", "standalone"): return jsonify({"error": f"Unknown mode: {mode}"}), 400 try: if mode == "dr": # DC2 replicas follow DC1 master (mariadb1) — DR site master_ip = subprocess.run( ["docker", "inspect", "-f", "{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}", "mariadb1"], capture_output=True, text=True, timeout=5, ).stdout.strip() for port in [3310, 3311, 3312]: subprocess.run( ["mariadb", "-h", "127.0.0.1", "-P", str(port), "-u", "root", "-pmonitor123", "--skip-ssl", "-e", f"STOP SLAVE; SET GLOBAL read_only=1; CHANGE MASTER TO MASTER_HOST='{master_ip}', MASTER_PORT=3306, MASTER_USER='repl', MASTER_PASSWORD='repl123', MASTER_USE_GTID=slave_pos; START SLAVE;"], capture_output=True, text=True, timeout=10, ) else: # standalone: DC2 has its own master (mariadb4 becomes independent master) dc2_master_ip = subprocess.run( ["docker", "inspect", "-f", "{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}", "mariadb4"], capture_output=True, text=True, timeout=5, ).stdout.strip() # mariadb4: stop replication, become standalone master (writable) subprocess.run( ["mariadb", "-h", "127.0.0.1", "-P", "3310", "-u", "root", "-pmonitor123", "--skip-ssl", "-e", "STOP SLAVE; RESET SLAVE ALL; SET GLOBAL read_only=0; CREATE USER IF NOT EXISTS 'repl'@'%' IDENTIFIED BY 'repl123'; GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%'; FLUSH PRIVILEGES;"], capture_output=True, text=True, timeout=10, ) # mariadb5-6: replicate from mariadb4 for port in [3311, 3312]: subprocess.run( ["mariadb", "-h", "127.0.0.1", "-P", str(port), "-u", "root", "-pmonitor123", "--skip-ssl", "-e", f"STOP SLAVE; CHANGE MASTER TO MASTER_HOST='{dc2_master_ip}', MASTER_PORT=3306, MASTER_USER='repl', MASTER_PASSWORD='repl123', MASTER_USE_GTID=slave_pos; START SLAVE;"], capture_output=True, text=True, timeout=10, ) return jsonify({"mode": mode, "status": "ok"}) except Exception as e: return jsonify({"error": str(e)}), 500 # ── MaxScale GUI Reverse Proxy ──────────────────────────── # Proxies /maxscale/* → MaxScale REST API + GUI on port 8989 # Rewrites paths in HTML, JS, and CSS so the Vue.js GUI works behind /maxscale @app.route("/maxscale", defaults={"subpath": ""}) @app.route("/maxscale/", defaults={"subpath": ""}) @app.route("/maxscale/") def proxy_maxscale(subpath): """Reverse proxy to MaxScale with content path rewriting.""" target_url = f"http://127.0.0.1:8989/{subpath}" if request.query_string: target_url += "?" + request.query_string.decode("utf-8") body = request.get_data() headers = {} for key, value in request.headers: if key.lower() not in ("host", "connection"): headers[key] = value try: headers["Accept-Encoding"] = "identity" # prevent gzip so we can rewrite req = urllib.request.Request(target_url, data=body, headers=headers, method=request.method) with urllib.request.urlopen(req, timeout=30) as resp: content = resp.read() content_type = resp.headers.get("Content-Type", "") # Rewrite paths in text-based responses (and any JS/CSS regardless of content-type) is_text = any(t in content_type for t in ("text/html", "application/javascript", "text/css", "text/javascript")) ext = subpath.rsplit(".", 1)[-1].lower() if "." in subpath else "" if is_text or ext in ("js", "css", "html"): text = content.decode("utf-8", errors="replace") # Rewrite webpack public path to be relative if '.p="/"' in text: text = text.replace('.p="/"', '.p="./"') # Rewrite axios base URL for API calls if 'baseURL:"/"' in text: text = text.replace('baseURL:"/"', 'baseURL:"/maxscale/"') # Fix paths that already contain "maxscale" (would double with baseURL) text = text.replace('"/maxscale?fields[maxscale]=version"', '"/?fields[maxscale]=version"') text = text.replace('"/maxscale/', '"/maxscale/') # no-op, just for clarity # Rewrite paths in HTML only (JS/CSS use baseURL and .p) if "text/html" in content_type: text = text.replace('"/js/', '"/maxscale/js/') text = text.replace("'/js/", "'/maxscale/js/") text = text.replace('"/css/', '"/maxscale/css/') text = text.replace("'/css/", "'/maxscale/css/") text = text.replace('"/img/', '"/maxscale/img/') text = text.replace("'/img/", "'/maxscale/img/") text = text.replace('"/fonts/', '"/maxscale/fonts/') text = text.replace("'/fonts/", "'/maxscale/fonts/") text = text.replace('"/v1/', '"/maxscale/v1/') text = text.replace("'/v1/", "'/maxscale/v1/") text = text.replace('href="/favicon', 'href="/maxscale/favicon') text = text.replace('href="/apple', 'href="/maxscale/apple') text = text.replace('href="/safari', 'href="/maxscale/safari') content = text.encode("utf-8") response_headers = {} for key, value in resp.headers.items(): if key.lower() not in ("transfer-encoding", "connection", "content-length"): response_headers[key] = value return Response(content, status=resp.status, headers=response_headers) except urllib.error.HTTPError as e: return Response(e.read(), status=e.code) except Exception as e: return jsonify({"error": str(e)}), 502 # ── SSE Stream ─────────────────────────────────────────── @app.route("/api/events") def api_events(): """Server-Sent Events stream — pushes transaction results to browser.""" event_queue = queue.Queue() def on_event(event): event_queue.put(event) engine.on_event(on_event) def generate(): try: # Send initial state yield _sse_event("connected", engine.get_state()) while engine.is_running or not event_queue.empty(): try: event = event_queue.get(timeout=0.5) yield _sse_event(event["type"], event) except queue.Empty: # Keep-alive comment to prevent proxy timeouts yield ": keepalive\n\n" yield _sse_event("stopped", {"status": "stopped"}) except GeneratorExit: pass finally: engine.remove_callback(on_event) return Response( generate(), mimetype="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", }, ) # ── Helpers ────────────────────────────────────────────── def _apply_config(config: dict) -> None: """Apply connection settings from request body to engine.""" if "host" in config: engine.host = config["host"] if "port" in config: engine.port = int(config["port"]) if "user" in config: engine.user = config["user"] if "password" in config: engine.password = config["password"] if "table_name" in config: engine.table_name = config["table_name"] def _sse_event(event_type: str, data: dict) -> str: """Format a dictionary as an SSE event.""" payload = json.dumps(data) return f"event: {event_type}\ndata: {payload}\n\n" # ── Entry Point ────────────────────────────────────────── if __name__ == "__main__": app.run(host="0.0.0.0", port=5000, debug=True, threaded=True)