133 lines
3.7 KiB
Python
133 lines
3.7 KiB
Python
import sqlite3
|
|
from version import __version__
|
|
from pathlib import Path
|
|
import logging
|
|
from logging_config import setup_logging
|
|
import os
|
|
from contextlib import closing
|
|
import getpass
|
|
from auth import create_user
|
|
|
|
APP_ENV = os.environ.get("APP_ENV", "dev")
|
|
logger = setup_logging(APP_ENV)
|
|
logger.info(f"Starting migration - APP-Version {__version__}")
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
BASE_DIR = Path(__file__).resolve().parents[1]
|
|
DB_DIR = BASE_DIR / "app" / "app_db"
|
|
DB_PATH = DB_DIR / "app.db"
|
|
MIGRATIONS_DIR = BASE_DIR / "migrations"
|
|
ADMIN_USERNAME = "admin"
|
|
|
|
print(BASE_DIR)
|
|
|
|
def get_connection() -> sqlite3.Connection:
|
|
DB_DIR.mkdir(exist_ok=True)
|
|
conn = sqlite3.connect(DB_PATH)
|
|
return conn
|
|
|
|
def get_applied_versions(conn: sqlite3.Connection) -> set[str]:
|
|
"""
|
|
Ließt aus schema_version, welche Migrationen schon gelaufen sind.
|
|
Falls die Tabelle noch nicht existiert → leeres Set.
|
|
"""
|
|
try:
|
|
cur = conn.execute("SELECT version FROM schema_version")
|
|
rows = cur.fetchall()
|
|
return {r[0] for r in rows}
|
|
except sqlite3.OperationalError:
|
|
# Tabelle existiert noch nicht (z.B. bei initialer DB)
|
|
|
|
return set()
|
|
|
|
def apply_migrations():
|
|
if not MIGRATIONS_DIR.exists():
|
|
raise SystemExit(f"Migrations-Ordner nicht gefunden: {MIGRATIONS_DIR}")
|
|
|
|
conn = get_connection()
|
|
|
|
try:
|
|
applied = get_applied_versions(conn)
|
|
|
|
# Alle .sql-Dateien alphabetisch sortiert
|
|
sql_files = sorted(MIGRATIONS_DIR.glob("*.sql"))
|
|
|
|
if not sql_files:
|
|
logger.info("Keine Migrationsdateien gefunden.")
|
|
return
|
|
|
|
for path in sql_files:
|
|
version = path.stem # z.B. '20250220_120000_initial'
|
|
|
|
if version in applied:
|
|
logger.info(f"[SKIP] {version} (bereits angewendet)")
|
|
continue
|
|
|
|
logger.info(f"[APPLY] {version}")
|
|
sql = path.read_text(encoding="utf-8")
|
|
|
|
try:
|
|
# Eine Transaktion pro Datei
|
|
with conn:
|
|
conn.executescript(sql)
|
|
except Exception as e:
|
|
logger.info(f"Fehler in Migration {version}: {e}")
|
|
# hier nicht weiter machen
|
|
raise
|
|
logger.info(f"Migrationen abgeschlossen. DB: {DB_PATH}")
|
|
|
|
create_admin_user()
|
|
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def admin_exists() -> bool:
|
|
"""Prüft, ob der Admin-User bereits existiert."""
|
|
with closing(get_connection()) as conn:
|
|
row = conn.execute(
|
|
"SELECT 1 FROM users WHERE username = ?",
|
|
(ADMIN_USERNAME,),
|
|
).fetchone()
|
|
if row is not None:
|
|
print(row[0])
|
|
else:
|
|
print(ADMIN_USERNAME)
|
|
print("Kein Admin gefunden")
|
|
return row is not None
|
|
|
|
|
|
def create_admin_user():
|
|
if admin_exists():
|
|
logger.info("Adminkonto existiert bereits! Kein initiales Konto angelegt.")
|
|
return
|
|
|
|
logger.info("Adminkonto wird angelegt ...")
|
|
|
|
pw1 = getpass.getpass("Passwort: ")
|
|
pw2 = getpass.getpass("Passwort wiederholen: ")
|
|
|
|
if pw1 != pw2:
|
|
logger.warning("Passwörter stimmen nicht überein! Abbruch.")
|
|
return
|
|
|
|
ok = create_user(
|
|
username=ADMIN_USERNAME,
|
|
password=pw1,
|
|
role_id=1,
|
|
email="admin@co_app",
|
|
firstname="***",
|
|
lastname="admin"
|
|
)
|
|
|
|
if ok:
|
|
logger.info(f"Admin-Benutzer '{ADMIN_USERNAME}' wurde angelegt.")
|
|
else:
|
|
# Sollte eigentlich nicht passieren, weil wir vorher geprüft haben,
|
|
# aber falls z.B. Parallelzugriff o.Ä.
|
|
logger.info(f"Admin-Benutzer '{ADMIN_USERNAME}' konnte nicht angelegt werden.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
apply_migrations() |