import sqlite3 from version import __version__ from pathlib import Path import logging from logging_config import setup_logging import os from contextlib import closing import getpass from auth import create_user APP_ENV = os.environ.get("APP_ENV", "dev") logger = setup_logging(APP_ENV) logger.info(f"Starting migration - APP-Version {__version__}") logger = logging.getLogger(__name__) BASE_DIR = Path(__file__).resolve().parents[1] DB_DIR = BASE_DIR / "data" DB_PATH = DB_DIR / "app.db" MIGRATIONS_DIR = BASE_DIR / "migrations" ADMIN_USERNAME = "admin" def get_connection() -> sqlite3.Connection: DB_DIR.mkdir(exist_ok=True) conn = sqlite3.connect(DB_PATH) return conn def get_applied_versions(conn: sqlite3.Connection) -> set[str]: """ Ließt aus schema_version, welche Migrationen schon gelaufen sind. Falls die Tabelle noch nicht existiert → leeres Set. """ try: cur = conn.execute("SELECT version FROM schema_version") rows = cur.fetchall() return {r[0] for r in rows} except sqlite3.OperationalError: # Tabelle existiert noch nicht (z.B. bei initialer DB) return set() def apply_migrations(): if not MIGRATIONS_DIR.exists(): raise SystemExit(f"Migrations-Ordner nicht gefunden: {MIGRATIONS_DIR}") conn = get_connection() try: applied = get_applied_versions(conn) # Alle .sql-Dateien alphabetisch sortiert sql_files = sorted(MIGRATIONS_DIR.glob("*.sql")) if not sql_files: logger.info("Keine Migrationsdateien gefunden.") return for path in sql_files: version = path.stem # z.B. '20250220_120000_initial' if version in applied: logger.info(f"[SKIP] {version} (bereits angewendet)") continue logger.info(f"[APPLY] {version}") sql = path.read_text(encoding="utf-8") try: # Eine Transaktion pro Datei with conn: conn.executescript(sql) except Exception as e: logger.info(f"Fehler in Migration {version}: {e}") # hier nicht weiter machen raise logger.info(f"Migrationen abgeschlossen. DB: {DB_PATH}") create_admin_user() finally: conn.close() def admin_exists() -> bool: """Prüft, ob der Admin-User bereits existiert.""" with closing(get_connection()) as conn: row = conn.execute( "SELECT 1 FROM users WHERE username = ?", (ADMIN_USERNAME,), ).fetchone() return row is not None def create_admin_user(): if admin_exists(): logger.info("Adminkonto existiert bereits! Kein initiales Konto angelegt.") return logger.info("Adminkonto wird angelegt ...") pw1 = getpass.getpass("Passwort: ") pw2 = getpass.getpass("Passwort wiederholen: ") if pw1 != pw2: logger.warning("Passwörter stimmen nicht überein! Abbruch.") return ok = create_user( username=ADMIN_USERNAME, password=pw1, role="admin", email="admin@co_app", firstname="co_app", lastname="admin" ) if ok: logger.info(f"Admin-Benutzer '{ADMIN_USERNAME}' wurde angelegt.") else: # Sollte eigentlich nicht passieren, weil wir vorher geprüft haben, # aber falls z.B. Parallelzugriff o.Ä. logger.info(f"Admin-Benutzer '{ADMIN_USERNAME}' konnte nicht angelegt werden.") if __name__ == "__main__": apply_migrations()