mariadb-demo/engine.py
administrator c4c7dd3f05 chore: initial release — MariaDB HA Demo
- 6-node MariaDB cluster with GTID replication
- MaxScale 24.02 proxy with automatic failover
- Flask dashboard with SSE transaction monitor
- Per-server toggle controls + mode selector
- Systemd services for auto-start on boot
- One-command deploy.sh
2026-06-24 11:16:16 +00:00

197 lines
6.6 KiB
Python

"""
DatabaseEngine — pure Python transaction monitor for MariaDB HA demos.
Mirrors the Swift DatabaseEngine behavior exactly:
- Fresh connection per transaction (matching MySQLStoreCoordinator pattern)
- INSERT → SELECT → DELETE cycle every 1 second
- Thread-safe state with SSE callback pattern
Design: No Flask dependency — testable in isolation.
"""
import threading
import time
import pymysql
class DatabaseEngine:
"""Manages the transaction monitoring loop and exposes events via callbacks."""
def __init__(self):
# Connection settings
self.host = "127.0.0.1"
self.port = 4000
self.user = "root"
self.password = "monitor123"
self.table_name = "test_io"
self.database = "db_monitor_app"
# State
self._lock = threading.Lock()
self._running = False
self._thread = None
self.total_tests = 0
self.successful_tests = 0
self._callbacks = []
# ── Public API ──────────────────────────────────────────
def initialize(self) -> dict:
"""Create database and table. Returns {'ok': True} or {'error': str}."""
try:
conn = self._connect(database="")
with conn.cursor() as cur:
cur.execute(f"CREATE DATABASE IF NOT EXISTS {self.database}")
cur.execute(f"USE {self.database}")
cur.execute(
f"CREATE TABLE IF NOT EXISTS {self.table_name} "
f"(id INT AUTO_INCREMENT PRIMARY KEY, data_string VARCHAR(50))"
)
conn.commit()
conn.close()
return {"ok": True}
except pymysql.Error as e:
return {"error": str(e)}
def start(self) -> None:
"""Begin the transaction monitoring loop in a background thread."""
if self._running:
return
with self._lock:
self._running = True
self.total_tests = 0
self.successful_tests = 0
self._thread = threading.Thread(target=self._run_loop, daemon=True)
self._thread.start()
def stop(self) -> None:
"""Stop the monitoring loop."""
self._running = False
if self._thread and self._thread.is_alive():
self._thread.join(timeout=2)
self._thread = None
def on_event(self, callback) -> None:
"""Register a callback for SSE events. Called as callback(event_dict)."""
self._callbacks.append(callback)
def remove_callback(self, callback) -> None:
"""Remove a previously registered callback."""
if callback in self._callbacks:
self._callbacks.remove(callback)
@property
def is_running(self) -> bool:
return self._running
def get_state(self) -> dict:
"""Snapshot of current state for polling clients."""
with self._lock:
return {
"running": self._running,
"total": self.total_tests,
"successful": self.successful_tests,
"availability": self._availability(),
}
# ── Internal: Connection ────────────────────────────────
def _connect(self, database: str | None = None) -> pymysql.Connection:
"""Create a fresh MariaDB connection — matches Swift's per-transaction pattern."""
db = database if database is not None else self.database
return pymysql.connect(
host=self.host,
port=int(self.port),
user=self.user,
password=self.password,
database=db,
connect_timeout=3,
read_timeout=5,
write_timeout=5,
autocommit=False,
)
# ── Internal: Transaction Loop ──────────────────────────
def _run_loop(self) -> None:
"""Background thread loop: run one transaction per second."""
while self._running:
success = self._run_transaction()
self._update_stats(success)
time.sleep(1)
def _run_transaction(self) -> bool:
"""Execute INSERT → SELECT → DELETE as a single atomic cycle.
Returns True on success, False on any failure.
Creates a fresh connection each time (matching Swift behavior).
"""
conn = None
try:
conn = self._connect()
test_id = self.total_tests + 1
with conn.cursor() as cur:
# 1. INSERT
cur.execute(
f"INSERT INTO {self.table_name} (data_string) VALUES ('ping-{test_id}')"
)
last_id = cur.lastrowid
# 2. SELECT (verify the row exists)
cur.execute(
f"SELECT * FROM {self.table_name} WHERE id = {last_id}"
)
cur.fetchall()
# 3. DELETE (clean up, prevent table bloat)
cur.execute(
f"DELETE FROM {self.table_name} WHERE id = {last_id}"
)
conn.commit()
return True
except pymysql.Error:
if conn:
try:
conn.rollback()
except Exception:
pass
return False
finally:
if conn:
try:
conn.close()
except Exception:
pass
# ── Internal: State & Notifications ─────────────────────
def _update_stats(self, success: bool) -> None:
"""Thread-safe stats update and SSE notification."""
with self._lock:
self.total_tests += 1
if success:
self.successful_tests += 1
event = {
"type": "status",
"success": success,
"total": self.total_tests,
"successful": self.successful_tests,
"availability": self._availability(),
}
self._notify(event)
def _availability(self) -> float:
"""Calculate availability percentage. Returns 100.0 if no tests yet."""
if self.total_tests == 0:
return 100.0
return (self.successful_tests / self.total_tests) * 100.0
def _notify(self, event: dict) -> None:
"""Push event to all registered SSE callbacks."""
for cb in self._callbacks:
try:
cb(event)
except Exception:
pass