"""User account helpers: creation, password checks and credential loading."""

import logging
from contextlib import closing

import bcrypt

from db import get_conn

_logger = logging.getLogger(__name__)


def _hash_password(password: str) -> str:
    """Return a freshly salted bcrypt hash of *password* as text."""
    hashed = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
    return hashed.decode("utf-8")


# ---------------------------------------------------------------------------
# Create a user
# ---------------------------------------------------------------------------
def create_user(
    username: str,
    password: str,
    role_id: str = "user",
    email: str | None = None,
    firstname: str | None = None,
    lastname: str | None = None,
) -> bool:
    """Insert a new row into ``users``.

    The password is stored as a bcrypt hash in text form.

    Args:
        username: Login name of the new user.
        password: Plain-text password; only its bcrypt hash is stored.
        role_id: Value written to the ``role_id`` column.
        email: Optional e-mail address.
        firstname: Optional first name.
        lastname: Optional last name.

    Returns:
        True if the insert succeeded, False if it raised.
    """
    pw_hash = _hash_password(password)
    try:
        # closing() closes the connection; the second context manager is the
        # connection itself (presumably commit/rollback, as with sqlite3-style
        # connections -- confirm against db.get_conn).
        with closing(get_conn()) as conn, conn:
            conn.execute(
                "INSERT INTO users (username, email, password_hash, role_id, firstname, lastname) VALUES (?, ?, ?, ?, ?, ?)",
                (username, email, pw_hash, role_id, firstname, lastname),
            )
    except Exception:
        # Callers rely on the boolean result, so the failure is still reported
        # as False -- but it is logged instead of vanishing silently.
        _logger.exception("Could not create user %r", username)
        return False
    return True


# ---------------------------------------------------------------------------
# Verify a user (e.g. for a Streamlit login form)
# ---------------------------------------------------------------------------
def verify_user(username: str, password: str) -> tuple[bool, str | None]:
    """Check *password* against the bcrypt hash stored for *username*.

    Args:
        username: Login name to look up.
        password: Plain-text password entered by the user.

    Returns:
        ``(True, role_id)`` when the password matches, otherwise
        ``(False, None)``. An unknown user, a missing hash and a malformed
        hash all count as a failed check.
    """
    with closing(get_conn()) as conn:
        row = conn.execute(
            "SELECT password_hash, role_id FROM users WHERE username = ?",
            (username,),
        ).fetchone()

    if not row:
        return False, None

    stored_hash, role_id = row
    if not stored_hash:
        # NULL or empty hash: nothing to compare against.
        return False, None
    if isinstance(stored_hash, str):
        # Hashes are written as text by this module; bcrypt needs bytes.
        stored_hash = stored_hash.encode("utf-8")

    try:
        matches = bcrypt.checkpw(password.encode("utf-8"), stored_hash)
    except ValueError:
        # bcrypt rejects a malformed hash ("Invalid salt"); treat it as a
        # failed login rather than crashing the caller.
        return False, None

    return (True, role_id) if matches else (False, None)


# ---------------------------------------------------------------------------
# Fetch the role of a user (for Streamlit content)
# ---------------------------------------------------------------------------
def get_role_for_user(username: str) -> str:
    """Return the ``role_text`` for *username*.

    Returns:
        The role text, or ``"user"`` when the user does not exist or has no
        matching entry in ``roles``.
    """
    with closing(get_conn()) as conn:
        row = conn.execute(
            "SELECT r.role_text from users u left join roles r on u.role_id = r.role_id WHERE username = ?",
            (username,),
        ).fetchone()

    # The LEFT JOIN yields a row with NULL role_text when the role is missing.
    if row and row[0] is not None:
        return row[0]
    return "user"


# ---------------------------------------------------------------------------
# Fetch the full name of a user (for Streamlit content)
# ---------------------------------------------------------------------------
def get_fullname_for_user(username: str) -> str:
    """Return "<firstname> <lastname>" for *username*.

    Missing name parts are skipped. If the user has no name stored at all the
    username is returned; if the user does not exist, ``"user"`` is returned.
    """
    with closing(get_conn()) as conn:
        row = conn.execute(
            "SELECT firstname, lastname FROM users WHERE username = ?",
            (username,),
        ).fetchone()

    if not row:
        return "user"

    # Joined in Python: SQL "a || ' ' || b" is NULL as soon as one part is
    # NULL, and both names are optional in create_user.
    fullname = " ".join(part for part in (row[0], row[1]) if part)
    return fullname or username


# ---------------------------------------------------------------------------
# Credential loader for streamlit-authenticator
# ---------------------------------------------------------------------------
def load_credentials_from_db() -> dict:
    """Build the credentials dict from all rows in ``users``.

    The result has this shape::

        {
            "usernames": {
                "hansi": {
                    "name": "hansi",
                    "email": "hansi@example.com",
                    "password": "$2b$12$...",
                },
                ...
            }
        }

    A missing e-mail is replaced by ``<username>@example.local``. The password
    entry is the stored bcrypt hash as text.
    """
    with closing(get_conn()) as conn:
        rows = conn.execute(
            "SELECT username, email, password_hash FROM users"
        ).fetchall()

    return {
        "usernames": {
            username: {
                "name": username,
                "email": email or f"{username}@example.local",
                "password": pw_hash,
            }
            for username, email, pw_hash in rows
        }
    }


# ---------------------------------------------------------------------------
# Does the password have to be changed (set by an admin)?
# ---------------------------------------------------------------------------
def needs_password_change(username: str) -> bool:
    """Return True if ``new_pwd`` equals 1 for *username*.

    Unknown users yield False.
    """
    with closing(get_conn()) as conn:
        row = conn.execute(
            "SELECT new_pwd FROM users WHERE username = ?",
            (username,),
        ).fetchone()
    return bool(row) and row[0] == 1


# ---------------------------------------------------------------------------
# Password change by the user
# ---------------------------------------------------------------------------
def update_password(username: str, new_password: str, reset_flag: bool = True) -> bool:
    """Store a new bcrypt hash for *username*.

    Args:
        username: Login name whose password is replaced.
        new_password: New plain-text password; only its hash is stored.
        reset_flag: When True, ``new_pwd`` is also reset to 0.

    Returns:
        False if the statement matched no row (unknown user), True otherwise.
    """
    pw_hash = _hash_password(new_password)
    if reset_flag:
        sql = "UPDATE users SET password_hash = ?, new_pwd = 0 WHERE username = ?"
    else:
        sql = "UPDATE users SET password_hash = ? WHERE username = ?"

    with closing(get_conn()) as conn, conn:
        affected = conn.execute(sql, (pw_hash, username)).rowcount

    # DB-API drivers may report -1 when the count is unknown; only an explicit
    # 0 means that no user was updated.
    return affected != 0