from contextlib import closing import bcrypt from app_db.app_db import get_conn, get_list # --------------------------------------------------------------------------- # Benutzer anlegen # --------------------------------------------------------------------------- def create_user( username: str, password: str, role_id: int = 1, email: str | None = None, firstname: str | None = None, lastname: str | None = None ) -> bool: """ Passwort wird als bcrypt-Hash (TEXT) gespeichert. """ pw_hash = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8") try: with closing(get_conn()) as conn, conn: conn.execute( "INSERT INTO users (username, email, password_hash, role_id, firstname, lastname) VALUES (?, ?, ?, ?, ?, ?)", (username, email, pw_hash, role_id, firstname, lastname), ) return True except Exception: return False # --------------------------------------------------------------------------- # Benutzer überprüfen # --------------------------------------------------------------------------- def verify_user(username: str, password: str): """ Vergleicht eingegebenes Passwort mit dem gespeicherten bcrypt-Hash. Rückgabe: (True, role_id) oder (False, None) """ with closing(get_conn()) as conn: row = conn.execute( "SELECT password_hash, role_id FROM users WHERE username = ?", (username,), ).fetchone() if not row: return False, None stored_hash, role_id = row # stored_hash ist TEXT → zurück nach bytes ok = bcrypt.checkpw(password.encode("utf-8"), stored_hash.encode("utf-8")) return (ok, role_id) if ok else (False, None) # --------------------------------------------------------------------------- # Rolle für einen Benutzer holen (für Streamlit-Inhalte) # --------------------------------------------------------------------------- def get_role_for_user(username: str) -> str: with closing(get_conn()) as conn: row = conn.execute( "SELECT r.role_text from users u left join roles r on u.role_id = r.role_id WHERE username = ?", (username,), ).fetchone() return row[0] if row else "user" # --------------------------------------------------------------------------- # Fullname für einen Benutzer holen (für Streamlit-Inhalte) # --------------------------------------------------------------------------- def get_fullname_for_user(username: str) -> str: with closing(get_conn()) as conn: row = conn.execute( "SELECT firstname ||' '|| lastname as fullname FROM users WHERE username = ?", (username,), ).fetchone() return row[0] if row else "user" # --------------------------------------------------------------------------- # Sidebar-Einträge für den Benutzer # --------------------------------------------------------------------------- def get_sidebar(role_text: str, username: str): if role_text == "admin": sql = """ select g.group_id, g.group_text, d.dash_id, d.dash_text, d.page_file, d.dash_type, d.order_no as dash_order, g.order_no as group_order from groups g left join dashboards d on g.group_id = d.group_id where g.active = 1 and d.active = 1 order by g.order_no, d.order_no """ else: sql = """ SELECT d.group_id, g.group_text, p.dash_id, d.dash_text, d.page_file, d.dash_type, d.order_no as dash_order, g.order_no as group_order FROM users u left join permissions p on u.role_id = p.role_id left join dashboards d on p.dash_id = d.dash_id left join groups g on d.group_id = g.group_id where u.active = 1 and g.active = 1 and d.active = 1 and p.active = 1 and u.username = ? order by g.order_no, d.order_no """ params = (username,) if "?" in sql else None df = get_list(sql, params) return df # --------------------------------------------------------------------------- # Credential-Lader für streamlit-authenticator # --------------------------------------------------------------------------- def load_credentials_from_db() -> dict: """ Baut das credentials-Dict so: { "usernames": { "hansi": { "email": "hansi@example.com", "name": "hansi", "password": "$2b$12$...", }, ... } } streamlit-authenticator prüft dann Login + Cookies automatisch. """ creds = {"usernames": {}} with closing(get_conn()) as conn: rows = conn.execute( "SELECT username, email, password_hash FROM users where active = 1" ).fetchall() for username, email, pw_hash in rows: # pw_hash kommt als TEXT creds["usernames"][username] = { "name": username, "email": email or f"{username}@example.local", "password": pw_hash, # TEXT → streamlit-authenticator-kompatibel } return creds # --------------------------------------------------------------------------- # Muss das Passwort geändert werden (Admin-Vorgabe)? # --------------------------------------------------------------------------- def needs_password_change(username: str) -> bool: """Prüft, ob der User ein neues Passwort setzen muss (new_pwd = 1).""" with closing(get_conn()) as conn: row = conn.execute( "SELECT new_pwd FROM users WHERE username = ?", (username,), ).fetchone() if not row: return False return row[0] == 1 # --------------------------------------------------------------------------- # Passwort ändern durch den Benutzer # --------------------------------------------------------------------------- def update_password(username: str, new_password: str, reset_flag: bool = True) -> bool: """ Setzt ein neues Passwort für den User. Wenn reset_flag=True: new_pwd wird auf 0 zurückgesetzt. """ pw_hash = bcrypt.hashpw(new_password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8") with closing(get_conn()) as conn, conn: if reset_flag: conn.execute( """ UPDATE users SET password_hash = ?, new_pwd = 0 WHERE username = ? """, (pw_hash, username), ) else: conn.execute( """ UPDATE users SET password_hash = ? WHERE username = ? """, (pw_hash, username), ) return True