"""
DatabaseEngine — pure Python transaction monitor for MariaDB HA demos.

Mirrors the Swift DatabaseEngine behavior exactly:
- Fresh connection per transaction (matching MySQLStoreCoordinator pattern)
- INSERT → SELECT → DELETE cycle every 1 second
- Thread-safe state with SSE callback pattern

Design: No Flask dependency — testable in isolation.
"""

import logging
import threading
import time

import pymysql

logger = logging.getLogger(__name__)


class DatabaseEngine:
    """Manages the transaction monitoring loop and exposes events via callbacks.

    The public connection attributes (``host``, ``port``, ``user``,
    ``password``, ``database``, ``table_name``) may be overwritten after
    construction; they are read every time a connection is opened.
    """

    def __init__(self):
        # Connection settings
        self.host = "127.0.0.1"
        self.port = 4000
        self.user = "root"
        self.password = "monitor123"
        self.table_name = "test_io"
        self.database = "db_monitor_app"

        # State — counters, the running flag, the worker handle and the
        # callback list are all guarded by ``_lock``.
        self._lock = threading.Lock()
        self._running = False
        self._thread: threading.Thread | None = None
        # One event per run: a worker that outlives a timed-out ``stop()``
        # keeps seeing *its own* event set and can never be revived by a
        # later ``start()``.
        self._stop_event = threading.Event()
        self.total_tests = 0
        self.successful_tests = 0
        self._callbacks: list = []

    # ── Public API ──────────────────────────────────────────

    def initialize(self) -> dict:
        """Create the database and table if they do not exist.

        Returns:
            ``{'ok': True}`` on success, or ``{'error': str}`` carrying the
            driver's error message when any step raises ``pymysql.Error``.
        """
        conn = None
        try:
            # Connect without selecting a schema; it may not exist yet.
            conn = self._connect(database="")
            with conn.cursor() as cur:
                cur.execute(f"CREATE DATABASE IF NOT EXISTS {self.database}")
                cur.execute(f"USE {self.database}")
                cur.execute(
                    f"CREATE TABLE IF NOT EXISTS {self.table_name} "
                    f"(id INT AUTO_INCREMENT PRIMARY KEY, data_string VARCHAR(50))"
                )
            conn.commit()
            return {"ok": True}
        except pymysql.Error as e:
            return {"error": str(e)}
        finally:
            # Close on every path so a failed statement cannot leak the socket.
            self._close_quietly(conn)

    def start(self) -> None:
        """Begin the transaction monitoring loop in a background thread.

        Resets both counters. Does nothing if the loop is already running.
        """
        with self._lock:
            # Check-and-set under the lock so two concurrent callers cannot
            # both spawn a worker.
            if self._running:
                return
            self._running = True
            self.total_tests = 0
            self.successful_tests = 0
            stop_event = threading.Event()
            self._stop_event = stop_event
            self._thread = threading.Thread(
                target=self._run_loop, args=(stop_event,), daemon=True
            )
            self._thread.start()

    def stop(self) -> None:
        """Stop the monitoring loop, waiting up to 2 seconds for the worker."""
        with self._lock:
            self._running = False
            self._stop_event.set()  # wakes the worker out of its 1 s wait
            worker = self._thread
            self._thread = None
        # Join outside the lock: the worker takes the lock to update stats.
        # Skip the join when called from the worker itself (e.g. from inside
        # a callback) — joining the current thread raises RuntimeError.
        if (
            worker is not None
            and worker is not threading.current_thread()
            and worker.is_alive()
        ):
            worker.join(timeout=2)

    def on_event(self, callback) -> None:
        """Register a callback for SSE events. Called as callback(event_dict)."""
        with self._lock:
            self._callbacks.append(callback)

    def remove_callback(self, callback) -> None:
        """Remove a previously registered callback; unknown callbacks are ignored."""
        with self._lock:
            try:
                self._callbacks.remove(callback)
            except ValueError:
                pass  # was never registered (or already removed)

    @property
    def is_running(self) -> bool:
        """Whether the monitoring loop is currently flagged as running."""
        return self._running

    def get_state(self) -> dict:
        """Snapshot of current state for polling clients."""
        with self._lock:
            return {
                "running": self._running,
                "total": self.total_tests,
                "successful": self.successful_tests,
                "availability": self._availability(),
            }

    # ── Internal: Connection ────────────────────────────────

    def _connect(self, database: str | None = None) -> pymysql.Connection:
        """Create a fresh MariaDB connection — matches Swift's per-transaction pattern.

        Args:
            database: Schema to select. ``None`` means ``self.database``; an
                empty string is passed through to the driver unchanged.
        """
        db = database if database is not None else self.database
        return pymysql.connect(
            host=self.host,
            port=int(self.port),
            user=self.user,
            password=self.password,
            database=db,
            connect_timeout=3,
            read_timeout=5,
            write_timeout=5,
            autocommit=False,
        )

    @staticmethod
    def _close_quietly(conn) -> None:
        """Best-effort close of *conn* (may be ``None``); never raises."""
        if conn is None:
            return
        try:
            conn.close()
        except Exception:
            # The connection is being discarded either way; keep a trace only.
            logger.debug("Ignoring error while closing connection", exc_info=True)

    # ── Internal: Transaction Loop ──────────────────────────

    def _run_loop(self, stop_event: threading.Event | None = None) -> None:
        """Background thread loop: run one transaction per second.

        Args:
            stop_event: Event that ends this particular run. Defaults to the
                engine's current stop event.
        """
        if stop_event is None:
            stop_event = self._stop_event
        while self._running and not stop_event.is_set():
            success = self._run_transaction()
            self._update_stats(success)
            # Interruptible pause: returns as soon as stop() sets the event.
            stop_event.wait(1)

    def _run_transaction(self) -> bool:
        """Execute INSERT → SELECT → DELETE as a single atomic cycle.

        Returns True on success, False on any ``pymysql.Error``.
        Creates a fresh connection each time (matching Swift behavior).
        """
        conn = None
        try:
            conn = self._connect()
            with self._lock:
                test_id = self.total_tests + 1
            # Values are bound as driver parameters; the table name is an
            # identifier and cannot be bound, so it is still interpolated.
            with conn.cursor() as cur:
                # 1. INSERT
                cur.execute(
                    f"INSERT INTO {self.table_name} (data_string) VALUES (%s)",
                    (f"ping-{test_id}",),
                )
                last_id = cur.lastrowid
                # 2. SELECT
                # NOTE(review): the result is fetched but not inspected, so an
                # empty result still counts as success — confirm whether the
                # mirrored Swift engine checks it.
                cur.execute(
                    f"SELECT * FROM {self.table_name} WHERE id = %s",
                    (last_id,),
                )
                cur.fetchall()
                # 3. DELETE (clean up, prevent table bloat)
                cur.execute(
                    f"DELETE FROM {self.table_name} WHERE id = %s",
                    (last_id,),
                )
            conn.commit()
            return True
        except pymysql.Error as exc:
            logger.debug("Transaction failed: %s", exc)
            if conn is not None:
                try:
                    conn.rollback()
                except Exception:
                    # Rollback on a broken connection is expected to fail.
                    logger.debug("Ignoring error during rollback", exc_info=True)
            return False
        finally:
            self._close_quietly(conn)

    # ── Internal: State & Notifications ─────────────────────

    def _update_stats(self, success: bool) -> None:
        """Thread-safe stats update and SSE notification."""
        with self._lock:
            self.total_tests += 1
            if success:
                self.successful_tests += 1
            event = {
                "type": "status",
                "success": success,
                "total": self.total_tests,
                "successful": self.successful_tests,
                "availability": self._availability(),
            }
        # Notify after releasing the lock so a callback may call get_state(),
        # on_event(), remove_callback() or stop() without deadlocking.
        self._notify(event)

    def _availability(self) -> float:
        """Calculate availability percentage. Returns 100.0 if no tests yet."""
        if self.total_tests == 0:
            return 100.0
        return (self.successful_tests / self.total_tests) * 100.0

    def _notify(self, event: dict) -> None:
        """Push event to all registered SSE callbacks.

        A callback that raises is logged and skipped; the remaining
        callbacks are still invoked.
        """
        # Iterate over a snapshot: callbacks may be added or removed while
        # the notification is in flight.
        with self._lock:
            callbacks = tuple(self._callbacks)
        for cb in callbacks:
            try:
                cb(event)
            except Exception:
                logger.exception("Event callback %r raised; continuing", cb)