"""Apply pending SQL migration files to the application's SQLite database.

Every ``*.sql`` file in ``MIGRATIONS_DIR`` is executed in alphabetical order
unless its file stem is already listed in the ``schema_version`` table.
"""

import logging
import os
import sqlite3
from pathlib import Path

from version import __version__
from logging_config import setup_logging

APP_ENV = os.environ.get("APP_ENV", "dev")

# Logging is configured once at import time; the object returned by
# setup_logging() is only used for the start-up message and is then replaced
# by the conventional module-level logger.
logger = setup_logging(APP_ENV)
logger.info(f"Starting migration - APP-Version {__version__}")
logger = logging.getLogger(__name__)

# All paths are resolved relative to the directory one level above this file.
BASE_DIR = Path(__file__).resolve().parents[1]
DB_DIR = BASE_DIR / "data"
DB_PATH = DB_DIR / "app.db"
MIGRATIONS_DIR = BASE_DIR / "migrations"


def get_connection() -> sqlite3.Connection:
    """Open a connection to the database at ``DB_PATH``.

    The data directory is created first if it does not exist yet.

    Returns:
        An open ``sqlite3.Connection``; the caller is responsible for
        closing it.
    """
    DB_DIR.mkdir(parents=True, exist_ok=True)
    return sqlite3.connect(DB_PATH)


def get_applied_versions(conn: sqlite3.Connection) -> set[str]:
    """Return the migration versions recorded in ``schema_version``.

    Args:
        conn: Open connection to the database to inspect.

    Returns:
        The set of values in the ``version`` column, or an empty set when
        the ``schema_version`` table does not exist yet (e.g. on a fresh
        database).

    Raises:
        sqlite3.OperationalError: For any operational failure other than the
            table being absent (locked database, I/O error, ...).
    """
    try:
        rows = conn.execute("SELECT version FROM schema_version").fetchall()
    except sqlite3.OperationalError as exc:
        # Only a missing table means "nothing applied yet". Any other
        # operational error must not be mistaken for an empty history,
        # otherwise every migration would be executed a second time.
        if "no such table" in str(exc).lower():
            return set()
        raise
    return {row[0] for row in rows}


def apply_migrations() -> None:
    """Run every not-yet-applied ``*.sql`` file from ``MIGRATIONS_DIR``.

    Files are processed in alphabetical order of their paths; a file counts
    as applied when its stem is present in ``schema_version``. Processing
    stops at the first failing file.

    Raises:
        SystemExit: If ``MIGRATIONS_DIR`` does not exist.
        Exception: Whatever error a migration script raised, re-raised after
            it has been logged.
    """
    if not MIGRATIONS_DIR.exists():
        raise SystemExit(f"Migrations-Ordner nicht gefunden: {MIGRATIONS_DIR}")

    conn = get_connection()
    try:
        applied = get_applied_versions(conn)

        # All .sql files, sorted alphabetically.
        sql_files = sorted(MIGRATIONS_DIR.glob("*.sql"))
        if not sql_files:
            logger.info("Keine Migrationsdateien gefunden.")
            return

        for path in sql_files:
            version = path.stem  # e.g. '20250220_120000_initial'
            if version in applied:
                logger.info("[SKIP] %s (bereits angewendet)", version)
                continue

            logger.info("[APPLY] %s", version)
            sql = path.read_text(encoding="utf-8")

            # NOTE(review): this runner never writes to schema_version itself;
            # presumably each migration script inserts its own version row.
            # Confirm against the migration files.
            try:
                # NOTE: executescript() first commits any pending transaction
                # and adds no transaction control of its own, so a script is
                # only atomic if it contains its own BEGIN/COMMIT. The
                # `with conn` block rolls back a transaction that the script
                # opened but did not finish before failing.
                with conn:
                    conn.executescript(sql)
            except Exception as e:
                # Log as an error with traceback, then stop: later migrations
                # must not run on top of a failed one.
                logger.exception("Fehler in Migration %s: %s", version, e)
                raise

        logger.info("Migrationen abgeschlossen. DB: %s", DB_PATH)
    finally:
        conn.close()


if __name__ == "__main__":
    apply_migrations()