# app/auth.py
"""Authentication helpers for the Streamlit intranet app.

Provides three layers:

* user storage (``users`` table, bcrypt password hashes),
* server-side sessions (``sessions`` table with an expiry timestamp),
* Streamlit glue (``st.session_state`` helpers, login view, logout header).

NOTE(review): the session id is carried in the ``?session_id=...`` URL query
parameter as a cookie substitute. Anything that sees the URL (browser history,
shared links, proxies/logs) can therefore see the session id. Confirm this is
acceptable for the deployment.
"""
import sqlite3
import time
import secrets
from contextlib import closing

import bcrypt
import streamlit as st

from db import get_conn

SESSION_LIFETIME_SECONDS = 8 * 60 * 60  # 8 hours

# Name of the URL query parameter that carries the session id.
_SESSION_PARAM = "session_id"


# ---------- User DB ----------

def init_auth_db() -> None:
    """Create the ``users`` and ``sessions`` tables if they do not exist yet."""
    with closing(get_conn()) as conn, conn:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT UNIQUE NOT NULL,
                password_hash BLOB NOT NULL,
                role TEXT NOT NULL DEFAULT 'user'
            )
            """
        )
        # Sessions table.
        # NOTE(review): SQLite only enforces the ON DELETE CASCADE below when
        # ``PRAGMA foreign_keys = ON`` is set on the connection; get_conn() is
        # not visible here - confirm it enables foreign keys.
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS sessions (
                id TEXT PRIMARY KEY,
                username TEXT NOT NULL,
                created_at INTEGER NOT NULL,
                expires_at INTEGER NOT NULL,
                FOREIGN KEY (username) REFERENCES users(username)
                    ON DELETE CASCADE
            )
            """
        )


def create_user(username: str, password: str, role: str = "user") -> bool:
    """Insert a new user with a bcrypt-hashed password.

    The username is stripped of surrounding whitespace before it is stored,
    because ``login_view`` strips the entered username before verification;
    storing the raw value would create accounts that can never log in.

    Returns:
        True if the user was created; False if the username or password is
        empty, or if the insert violated a constraint (e.g. the username
        already exists).
    """
    username = username.strip()
    if not username or not password:
        return False

    pw_hash = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
    try:
        with closing(get_conn()) as conn, conn:
            conn.execute(
                "INSERT INTO users (username, password_hash, role) VALUES (?, ?, ?)",
                (username, pw_hash, role),
            )
        return True
    except sqlite3.IntegrityError:
        return False


def verify_user(username: str, password: str) -> tuple[bool, str | None]:
    """Check a username/password pair against the stored bcrypt hash.

    Returns:
        ``(True, role)`` when the credentials match, otherwise
        ``(False, None)`` (unknown user or wrong password).
    """
    with closing(get_conn()) as conn:
        row = conn.execute(
            "SELECT password_hash, role FROM users WHERE username = ?",
            (username,),
        ).fetchone()

    if not row:
        return False, None

    stored_hash, role = row
    ok = bcrypt.checkpw(password.encode("utf-8"), stored_hash)
    return (ok, role) if ok else (False, None)


# ---------- Session DB ----------

def create_session(username: str) -> str:
    """Create a new session id for ``username``, store it, and return it.

    The session expires ``SESSION_LIFETIME_SECONDS`` after creation.
    """
    sid = secrets.token_urlsafe(32)  # random and safe to embed in a URL
    now = int(time.time())
    expires_at = now + SESSION_LIFETIME_SECONDS
    with closing(get_conn()) as conn, conn:
        conn.execute(
            """
            INSERT INTO sessions (id, username, created_at, expires_at)
            VALUES (?, ?, ?, ?)
            """,
            (sid, username, now, expires_at),
        )
    return sid


def get_session(sid: str) -> tuple[str | None, str | None]:
    """Look up a session by id.

    Returns:
        ``(username, role)`` if the session exists and has not expired,
        otherwise ``(None, None)``.
    """
    now = int(time.time())
    with closing(get_conn()) as conn:
        row = conn.execute(
            """
            SELECT s.username, u.role
            FROM sessions s
            JOIN users u ON u.username = s.username
            WHERE s.id = ? AND s.expires_at > ?
            """,
            (sid, now),
        ).fetchone()

    if not row:
        return None, None
    return row[0], row[1]


def delete_session(sid: str) -> None:
    """Delete the session with the given id (no-op if it does not exist)."""
    with closing(get_conn()) as conn, conn:
        conn.execute("DELETE FROM sessions WHERE id = ?", (sid,))


# Optional: purge expired sessions.
def cleanup_sessions() -> None:
    """Delete every session whose expiry time has passed."""
    now = int(time.time())
    with closing(get_conn()) as conn, conn:
        conn.execute("DELETE FROM sessions WHERE expires_at <= ?", (now,))


# ---------- Query-parameter helpers ----------

def _read_session_id_from_url() -> str | None:
    """Return the first ``session_id`` query parameter, or None if absent.

    Uses ``st.query_params`` when the installed Streamlit provides it and
    falls back to the deprecated ``st.experimental_get_query_params``.
    """
    if hasattr(st, "query_params"):
        values = st.query_params.get_all(_SESSION_PARAM)
    else:
        values = st.experimental_get_query_params().get(_SESSION_PARAM)
    return values[0] if values else None


def _write_session_id_to_url(sid: str | None) -> None:
    """Replace all URL query parameters with ``session_id=sid``.

    Passing ``None`` removes every query parameter. This mirrors the
    replace-everything behaviour of ``st.experimental_set_query_params``.
    """
    if hasattr(st, "query_params"):
        st.query_params.clear()
        if sid is not None:
            st.query_params[_SESSION_PARAM] = sid
    elif sid is not None:
        st.experimental_set_query_params(session_id=sid)
    else:
        st.experimental_set_query_params()


# ---------- SessionState helpers ----------

def set_session_user(username: str, role: str, sid: str | None = None) -> None:
    """Mark the current Streamlit session as logged in.

    ``sid`` is stored under ``"session_id"`` only when it is not None.
    """
    st.session_state["auth"] = True
    st.session_state["username"] = username
    st.session_state["role"] = role
    if sid is not None:
        st.session_state["session_id"] = sid


def clear_session_user() -> None:
    """Remove all login-related keys from ``st.session_state``."""
    for k in ["auth", "username", "role", "session_id"]:
        st.session_state.pop(k, None)


def is_authenticated() -> bool:
    """Return True if ``st.session_state`` carries a truthy ``"auth"`` flag."""
    return bool(st.session_state.get("auth"))


def current_user() -> tuple[str | None, str | None]:
    """Return ``(username, role)`` from ``st.session_state`` (None if unset)."""
    return (
        st.session_state.get("username"),
        st.session_state.get("role"),
    )


def ensure_logged_in() -> None:
    """Gate a page behind a login; call at the top of every page.

    Order of checks:

    1. ``st.session_state``: if it is flagged as authenticated and holds a
       session id, that id is re-validated against the database so that
       expiry and logouts take effect in already-open tabs. A login stored
       without a session id is trusted as-is.
    2. ``?session_id=...`` in the URL: a valid id restores the login.

    If neither yields a valid login, the login view is rendered and the
    script run is stopped via ``st.stop()``.
    """
    if is_authenticated():
        state_sid = st.session_state.get("session_id")
        if state_sid is None:
            # Login was set without a DB-backed session; nothing to validate.
            return
        username, _role = get_session(state_sid)
        if username:
            return
        # Session expired or was deleted: drop the stale login state.
        clear_session_user()

    # Attempt: session id from the URL.
    sid = _read_session_id_from_url()
    if sid:
        username, role = get_session(sid)
        if username:
            set_session_user(username, role, sid)
            return

    # Reaching this point means there is no valid login: show the login view.
    login_view()
    st.stop()


# ---------- UI ----------

def login_view() -> None:
    """Render the login form and, on success, start a new session.

    On a successful login a DB session is created, ``st.session_state`` is
    populated, the session id is written to the URL, and the script is rerun.
    """
    st.title("Intranet Login")

    with st.form("login"):
        u = st.text_input("Username")
        p = st.text_input("Passwort", type="password")
        submitted = st.form_submit_button("Anmelden")

    if submitted:
        username = u.strip()
        ok, role = verify_user(username, p)
        if ok:
            # New session in the DB.
            sid = create_session(username)
            # Populate the Streamlit session state.
            set_session_user(username, role, sid)
            # Put the session id into the URL (poor man's cookie).
            _write_session_id_to_url(sid)
            st.success("Erfolgreich angemeldet.")
            st.rerun()
        else:
            st.error("Login fehlgeschlagen.")


def authed_header() -> None:
    """Show the logged-in user and a logout button in the sidebar.

    Does nothing when no username is present in ``st.session_state``.
    """
    username, role = current_user()
    if not username:
        return

    st.sidebar.write(f"Angemeldet als **{username}** ({role})")
    if st.sidebar.button("Logout"):
        # Delete the DB session, if there is one.
        sid = st.session_state.get("session_id")
        if sid:
            delete_session(sid)
        clear_session_user()
        # Remove the URL parameters.
        _write_session_id_to_url(None)
        st.rerun()