Add some new fields for users table

This commit is contained in:
handi
2025-11-30 14:55:02 +01:00
parent 84e1e152f8
commit 111d0d2756
11 changed files with 184 additions and 139 deletions

212
app/auth__.py Normal file
View File

@@ -0,0 +1,212 @@
# app/auth.py
import sqlite3
import time
import secrets
from contextlib import closing
import bcrypt
import streamlit as st
from db import get_conn
SESSION_LIFETIME_SECONDS = 8 * 60 * 60 # 8 Stunden
# ---------- User-DB ----------
def init_auth_db():
"""Legt die users-Tabelle an, falls sie noch nicht existiert."""
with closing(get_conn()) as conn, conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password_hash BLOB NOT NULL,
role TEXT NOT NULL DEFAULT 'user'
)
"""
)
# Sessions-Tabelle
conn.execute(
"""
CREATE TABLE IF NOT EXISTS sessions (
id TEXT PRIMARY KEY,
username TEXT NOT NULL,
created_at INTEGER NOT NULL,
expires_at INTEGER NOT NULL,
FOREIGN KEY (username) REFERENCES users(username) ON DELETE CASCADE
)
"""
)
def create_user(username: str, password: str, role: str = "user") -> bool:
pw_hash = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
try:
with closing(get_conn()) as conn, conn:
conn.execute(
"INSERT INTO users (username, password_hash, role) VALUES (?, ?, ?)",
(username, pw_hash, role),
)
return True
except sqlite3.IntegrityError:
return False
def verify_user(username: str, password: str):
with closing(get_conn()) as conn:
row = conn.execute(
"SELECT password_hash, role FROM users WHERE username = ?",
(username,),
).fetchone()
if not row:
return False, None
stored_hash, role = row
ok = bcrypt.checkpw(password.encode("utf-8"), stored_hash)
return (ok, role) if ok else (False, None)
# ---------- Session-DB ----------
def create_session(username: str) -> str:
"""Erzeugt eine neue Session-ID und speichert sie in der DB."""
sid = secrets.token_urlsafe(32) # schön zufällig, URL-tauglich
now = int(time.time())
expires_at = now + SESSION_LIFETIME_SECONDS
with closing(get_conn()) as conn, conn:
conn.execute(
"""
INSERT INTO sessions (id, username, created_at, expires_at)
VALUES (?, ?, ?, ?)
""",
(sid, username, now, expires_at),
)
return sid
def get_session(sid: str):
"""Liefert (username, role), wenn Session existiert und gültig ist, sonst (None, None)."""
now = int(time.time())
with closing(get_conn()) as conn:
row = conn.execute(
"""
SELECT s.username, u.role
FROM sessions s
JOIN users u ON u.username = s.username
WHERE s.id = ? AND s.expires_at > ?
""",
(sid, now),
).fetchone()
if not row:
return None, None
return row[0], row[1]
def delete_session(sid: str):
with closing(get_conn()) as conn, conn:
conn.execute("DELETE FROM sessions WHERE id = ?", (sid,))
# Optional: abgelaufene Sessions aufräumen
def cleanup_sessions():
now = int(time.time())
with closing(get_conn()) as conn, conn:
conn.execute("DELETE FROM sessions WHERE expires_at <= ?", (now,))
# ---------- SessionState-Helper ----------
def set_session_user(username: str, role: str, sid: str | None = None):
st.session_state["auth"] = True
st.session_state["username"] = username
st.session_state["role"] = role
if sid is not None:
st.session_state["session_id"] = sid
def clear_session_user():
for k in ["auth", "username", "role", "session_id"]:
st.session_state.pop(k, None)
def is_authenticated() -> bool:
return bool(st.session_state.get("auth"))
def current_user():
return (
st.session_state.get("username"),
st.session_state.get("role"),
)
def ensure_logged_in():
"""
Am Anfang jeder Page aufrufen.
Prüft erst st.session_state, dann ?session_id=... in der URL.
"""
if is_authenticated():
return
# Versuch: session_id aus URL
params = st.experimental_get_query_params()
sid_list = params.get("session_id")
sid = sid_list[0] if sid_list else None
if sid:
username, role = get_session(sid)
if username:
set_session_user(username, role, sid)
return
# Wenn wir hier landen: kein gültiger Login → Login-View anzeigen
login_view()
st.stop()
# ---------- UI ----------
def login_view():
st.title("Intranet Login")
with st.form("login"):
u = st.text_input("Username")
p = st.text_input("Passwort", type="password")
submitted = st.form_submit_button("Anmelden")
if submitted:
username = u.strip()
ok, role = verify_user(username, p)
if ok:
# neue Session in DB
sid = create_session(username)
# SessionState setzen
set_session_user(username, role, sid)
# URL-Parameter setzen (Arme-Leute-Cookie)
st.experimental_set_query_params(session_id=sid)
st.success("Erfolgreich angemeldet.")
st.rerun()
else:
st.error("Login fehlgeschlagen.")
def authed_header():
username, role = current_user()
if not username:
return
st.sidebar.write(f"Angemeldet als **{username}** ({role})")
if st.sidebar.button("Logout"):
# Session in DB löschen, falls vorhanden
sid = st.session_state.get("session_id")
if sid:
delete_session(sid)
clear_session_user()
# URL-Parameter entfernen
st.experimental_set_query_params()
st.rerun()