Compare commits

...

2 Commits

Author SHA1 Message Date
hansi
84e1e152f8 Add session-state and cookies 2025-11-30 12:35:21 +01:00
hansi
06e5322931 Add initialize admin-account in migration 2025-11-29 16:27:34 +01:00
11 changed files with 626 additions and 22 deletions

View File

@@ -71,6 +71,10 @@ APP_ENV = os.environ.get("APP_ENV", "dev")
Dieses Projekt verwendet SQLite für die interne App-Datenbank. Dieses Projekt verwendet SQLite für die interne App-Datenbank.
Das Schema wird vollständig über SQL-Migrationsdateien verwaltet, die automatisch in der richtigen Reihenfolge ausgeführt werden. Das Schema wird vollständig über SQL-Migrationsdateien verwaltet, die automatisch in der richtigen Reihenfolge ausgeführt werden.
**WICHTIG:**
Bei der ersten Initialisierung muss ein admin-Passwort vergeben werden. Beim erstmaligen Anmelden an der App muss man sich dann als "adim" mit dem vergebenen Passwort anmelden. In der App können dann User angelegt.
### Ordnerstruktur ### Ordnerstruktur
```bash ```bash

View File

@@ -1,4 +1,7 @@
# app/auth.py
import sqlite3 import sqlite3
import time
import secrets
from contextlib import closing from contextlib import closing
import bcrypt import bcrypt
@@ -6,6 +9,38 @@ import streamlit as st
from db import get_conn from db import get_conn
SESSION_LIFETIME_SECONDS = 8 * 60 * 60 # 8 Stunden
# ---------- User-DB ----------
def init_auth_db():
"""Legt die users-Tabelle an, falls sie noch nicht existiert."""
with closing(get_conn()) as conn, conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password_hash BLOB NOT NULL,
role TEXT NOT NULL DEFAULT 'user'
)
"""
)
# Sessions-Tabelle
conn.execute(
"""
CREATE TABLE IF NOT EXISTS sessions (
id TEXT PRIMARY KEY,
username TEXT NOT NULL,
created_at INTEGER NOT NULL,
expires_at INTEGER NOT NULL,
FOREIGN KEY (username) REFERENCES users(username) ON DELETE CASCADE
)
"""
)
def create_user(username: str, password: str, role: str = "user") -> bool: def create_user(username: str, password: str, role: str = "user") -> bool:
pw_hash = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()) pw_hash = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
try: try:
@@ -32,15 +67,64 @@ def verify_user(username: str, password: str):
return (ok, role) if ok else (False, None) return (ok, role) if ok else (False, None)
# ---------- Session / Helper ---------- # ---------- Session-DB ----------
def set_session_user(username: str, role: str): def create_session(username: str) -> str:
"""Erzeugt eine neue Session-ID und speichert sie in der DB."""
sid = secrets.token_urlsafe(32) # schön zufällig, URL-tauglich
now = int(time.time())
expires_at = now + SESSION_LIFETIME_SECONDS
with closing(get_conn()) as conn, conn:
conn.execute(
"""
INSERT INTO sessions (id, username, created_at, expires_at)
VALUES (?, ?, ?, ?)
""",
(sid, username, now, expires_at),
)
return sid
def get_session(sid: str):
"""Liefert (username, role), wenn Session existiert und gültig ist, sonst (None, None)."""
now = int(time.time())
with closing(get_conn()) as conn:
row = conn.execute(
"""
SELECT s.username, u.role
FROM sessions s
JOIN users u ON u.username = s.username
WHERE s.id = ? AND s.expires_at > ?
""",
(sid, now),
).fetchone()
if not row:
return None, None
return row[0], row[1]
def delete_session(sid: str):
with closing(get_conn()) as conn, conn:
conn.execute("DELETE FROM sessions WHERE id = ?", (sid,))
# Optional: abgelaufene Sessions aufräumen
def cleanup_sessions():
now = int(time.time())
with closing(get_conn()) as conn, conn:
conn.execute("DELETE FROM sessions WHERE expires_at <= ?", (now,))
# ---------- SessionState-Helper ----------
def set_session_user(username: str, role: str, sid: str | None = None):
st.session_state["auth"] = True st.session_state["auth"] = True
st.session_state["username"] = username st.session_state["username"] = username
st.session_state["role"] = role st.session_state["role"] = role
if sid is not None:
st.session_state["session_id"] = sid
def clear_session_user(): def clear_session_user():
for k in ["auth", "username", "role"]: for k in ["auth", "username", "role", "session_id"]:
st.session_state.pop(k, None) st.session_state.pop(k, None)
@@ -49,16 +133,32 @@ def is_authenticated() -> bool:
def current_user(): def current_user():
"""(username, role) oder (None, None)""" return (
return st.session_state.get("username"), st.session_state.get("role") st.session_state.get("username"),
st.session_state.get("role"),
)
def ensure_logged_in(): def ensure_logged_in():
""" """
Am Anfang jeder Page aufrufen. Am Anfang jeder Page aufrufen.
Wenn kein Login: Login-View anzeigen und Script stoppen. Prüft erst st.session_state, dann ?session_id=... in der URL.
""" """
if not is_authenticated(): if is_authenticated():
return
# Versuch: session_id aus URL
params = st.experimental_get_query_params()
sid_list = params.get("session_id")
sid = sid_list[0] if sid_list else None
if sid:
username, role = get_session(sid)
if username:
set_session_user(username, role, sid)
return
# Wenn wir hier landen: kein gültiger Login → Login-View anzeigen
login_view() login_view()
st.stop() st.stop()
@@ -73,9 +173,18 @@ def login_view():
submitted = st.form_submit_button("Anmelden") submitted = st.form_submit_button("Anmelden")
if submitted: if submitted:
ok, role = verify_user(u.strip(), p) username = u.strip()
ok, role = verify_user(username, p)
if ok: if ok:
set_session_user(u.strip(), role) # neue Session in DB
sid = create_session(username)
# SessionState setzen
set_session_user(username, role, sid)
# URL-Parameter setzen (Arme-Leute-Cookie)
st.experimental_set_query_params(session_id=sid)
st.success("Erfolgreich angemeldet.") st.success("Erfolgreich angemeldet.")
st.rerun() st.rerun()
else: else:
@@ -88,6 +197,16 @@ def authed_header():
return return
st.sidebar.write(f"Angemeldet als **{username}** ({role})") st.sidebar.write(f"Angemeldet als **{username}** ({role})")
if st.sidebar.button("Logout"): if st.sidebar.button("Logout"):
# Session in DB löschen, falls vorhanden
sid = st.session_state.get("session_id")
if sid:
delete_session(sid)
clear_session_user() clear_session_user()
# URL-Parameter entfernen
st.experimental_set_query_params()
st.rerun() st.rerun()

127
app/auth_core.py Normal file
View File

@@ -0,0 +1,127 @@
# app/auth_core.py
from contextlib import closing
import bcrypt
from db import get_conn
# ---------------------------------------------------------------------------
# DB Initialisierung: `users`-Tabelle
# ---------------------------------------------------------------------------
def init_auth_db():
"""
Legt die users-Tabelle an.
WICHTIG: password_hash wird als TEXT gespeichert, damit sowohl
streamlit-authenticator als auch dein eigener bcrypt-Code kompatibel sind.
"""
with closing(get_conn()) as conn, conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
email TEXT,
password_hash TEXT NOT NULL,
role TEXT NOT NULL DEFAULT 'user'
)
"""
)
# ---------------------------------------------------------------------------
# Benutzer anlegen
# ---------------------------------------------------------------------------
def create_user(username: str, password: str, role: str = "user", email: str | None = None) -> bool:
"""
Passwort wird als bcrypt-Hash (TEXT) gespeichert.
"""
pw_hash = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
try:
with closing(get_conn()) as conn, conn:
conn.execute(
"INSERT INTO users (username, email, password_hash, role) VALUES (?, ?, ?, ?)",
(username, email, pw_hash, role),
)
return True
except Exception:
return False
# ---------------------------------------------------------------------------
# Benutzer überprüfen (z.B. für deine alte Streamlit-Login-Maske)
# ---------------------------------------------------------------------------
def verify_user(username: str, password: str):
"""
Vergleicht eingegebenes Passwort mit dem gespeicherten bcrypt-Hash.
Rückgabe: (True, role) oder (False, None)
"""
with closing(get_conn()) as conn:
row = conn.execute(
"SELECT password_hash, role FROM users WHERE username = ?",
(username,),
).fetchone()
if not row:
return False, None
stored_hash, role = row
# stored_hash ist TEXT → zurück nach bytes
ok = bcrypt.checkpw(password.encode("utf-8"), stored_hash.encode("utf-8"))
return (ok, role) if ok else (False, None)
# ---------------------------------------------------------------------------
# Rolle für einen Benutzer holen (für Streamlit-Inhalte)
# ---------------------------------------------------------------------------
def get_role_for_user(username: str) -> str:
with closing(get_conn()) as conn:
row = conn.execute(
"SELECT role FROM users WHERE username = ?",
(username,),
).fetchone()
return row[0] if row else "user"
# ---------------------------------------------------------------------------
# Credential-Lader für streamlit-authenticator
# ---------------------------------------------------------------------------
def load_credentials_from_db() -> dict:
"""
Baut das credentials-Dict so:
{
"usernames": {
"hansi": {
"email": "hansi@example.com",
"name": "hansi",
"password": "$2b$12$...",
},
...
}
}
streamlit-authenticator prüft dann Login + Cookies automatisch.
"""
creds = {"usernames": {}}
with closing(get_conn()) as conn:
rows = conn.execute(
"SELECT username, email, password_hash FROM users"
).fetchall()
for username, email, pw_hash in rows:
# pw_hash kommt als TEXT
creds["usernames"][username] = {
"name": username,
"email": email or f"{username}@example.local",
"password": pw_hash, # TEXT → streamlit-authenticator-kompatibel
}
return creds

94
app/auth_old.py Normal file
View File

@@ -0,0 +1,94 @@
import sqlite3
from contextlib import closing
import bcrypt
import streamlit as st
from db import get_conn
def create_user(username: str, password: str, role: str = "user") -> bool:
pw_hash = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
try:
with closing(get_conn()) as conn, conn:
conn.execute(
"INSERT INTO users (username, password_hash, role) VALUES (?, ?, ?)",
(username, pw_hash, role),
)
return True
except sqlite3.IntegrityError:
return False
def verify_user(username: str, password: str):
with closing(get_conn()) as conn:
row = conn.execute(
"SELECT password_hash, role FROM users WHERE username = ?",
(username,),
).fetchone()
if not row:
return False, None
stored_hash, role = row
ok = bcrypt.checkpw(password.encode("utf-8"), stored_hash)
return (ok, role) if ok else (False, None)
# ---------- Session / Helper ----------
def set_session_user(username: str, role: str):
st.session_state["auth"] = True
st.session_state["username"] = username
st.session_state["role"] = role
def clear_session_user():
for k in ["auth", "username", "role"]:
st.session_state.pop(k, None)
def is_authenticated() -> bool:
return bool(st.session_state.get("auth"))
def current_user():
"""(username, role) oder (None, None)"""
return st.session_state.get("username"), st.session_state.get("role")
def ensure_logged_in():
"""
Am Anfang jeder Page aufrufen.
Wenn kein Login: Login-View anzeigen und Script stoppen.
"""
if not is_authenticated():
login_view()
st.stop()
# ---------- UI ----------
def login_view():
st.title("Intranet Login")
with st.form("login"):
u = st.text_input("Username")
p = st.text_input("Passwort", type="password")
submitted = st.form_submit_button("Anmelden")
if submitted:
ok, role = verify_user(u.strip(), p)
if ok:
set_session_user(u.strip(), role)
st.success("Erfolgreich angemeldet.")
st.rerun()
else:
st.error("Login fehlgeschlagen.")
def authed_header():
username, role = current_user()
if not username:
return
st.sidebar.write(f"Angemeldet als **{username}** ({role})")
if st.sidebar.button("Logout"):
st.session_state.pop("app_started_logged", None)
clear_session_user()
st.rerun()

View File

@@ -1,10 +1,59 @@
from version import __version__ # app/main.py
from logging_config import setup_logging import streamlit as st
import os
from migrate import apply_migrations
APP_ENV = os.environ.get("APP_ENV", "dev") from auth import (
logger = setup_logging(APP_ENV) init_auth_db,
logger.info(f"Starting app in {APP_ENV} mode - APP-Version {__version__}") ensure_logged_in,
authed_header,
current_user,
create_user, # falls du Admin-Funktion brauchst
)
apply_migrations()
def content_for(role: str, username: str):
st.header("Dashboard")
st.info(f"Willkommen, {username}!")
if role == "admin":
st.subheader("Admin-Bereich")
st.write("Nur Admins sehen das hier.")
with st.expander("Neuen Nutzer anlegen"):
new_u = st.text_input("Neuer Username", key="new_u")
new_p = st.text_input("Neues Passwort", type="password", key="new_p")
new_role = st.selectbox("Rolle", ["user", "admin"], key="new_role")
if st.button("Anlegen"):
if new_u and new_p:
ok = create_user(new_u.strip(), new_p, new_role)
st.success("Nutzer angelegt.") if ok else st.error(
"Username bereits vorhanden."
)
else:
st.warning("Bitte Username und Passwort eingeben.")
st.subheader("Dein Bereich")
st.write(f"Personalisierter Content für **{username}**.")
# fachliche Inhalte …
def main():
st.set_page_config(
page_title="Intranet-Portal",
page_icon="🔒",
layout="centered",
)
init_auth_db()
# hier wird entweder:
# - st.session_state genutzt, oder
# - ?session_id=... aus URL geprüft, oder
# - Login-Form gezeigt (und Script gestoppt)
ensure_logged_in()
authed_header()
username, role = current_user()
content_for(role, username)
if __name__ == "__main__":
main()

87
app/main_authenticator.py Normal file
View File

@@ -0,0 +1,87 @@
# app/main.py
import streamlit as st
import yaml
from yaml.loader import SafeLoader
import streamlit_authenticator as stauth
from auth_core import (
init_auth_db,
load_credentials_from_db,
get_role_for_user,
create_user,
)
from version import __version__
def content_for(username: str, role: str):
st.header("Dashboard")
st.info(f"Willkommen, {username}!")
if role == "admin":
st.subheader("Admin-Bereich")
st.write("Nur Admins sehen das hier.")
with st.expander("Neuen Nutzer anlegen"):
new_u = st.text_input("Neuer Username", key="new_u")
new_email = st.text_input("E-Mail", key="new_email")
new_p = st.text_input("Neues Passwort", type="password", key="new_p")
new_role = st.selectbox("Rolle", ["user", "admin"], key="new_role")
if st.button("Anlegen"):
if new_u and new_p:
ok = create_user(new_u.strip(), new_p, new_role, new_email.strip() or None)
st.success("Nutzer angelegt.") if ok else st.error("Username bereits vorhanden oder Fehler.")
else:
st.warning("Bitte Username und Passwort eingeben.")
st.subheader("Dein Bereich")
st.write(f"Personalisierter Content für **{username}**.")
def main():
st.set_page_config(
page_title=f"Intranet-Portal v{__version__}",
page_icon="🔒",
layout="centered",
)
# DB-Struktur sicherstellen
init_auth_db()
# --- Config laden (Cookie, etc.) ---
with open("config/auth.yaml", "r", encoding="utf-8") as f:
base_config = yaml.load(f, Loader=SafeLoader)
# --- Credentials dynamisch aus DB laden ---
db_creds = load_credentials_from_db()
base_config["credentials"] = db_creds
authenticator = stauth.Authenticate(
base_config["credentials"],
base_config["cookie"]["name"],
base_config["cookie"]["key"],
base_config["cookie"]["expiry_days"],
base_config.get("preauthorized", {}),
)
name, auth_status, username = authenticator.login("Login", "main")
if auth_status is False:
st.error("Login fehlgeschlagen.")
return
if auth_status is None:
st.warning("Bitte Benutzername und Passwort eingeben.")
return
# ---- Ab hier eingeloggt (persistenter Cookie) ----
role = get_role_for_user(username)
authenticator.logout("Logout", "sidebar")
st.sidebar.write(f"Angemeldet als **{name}** ({username}, Rolle: {role})")
content_for(username, role)
if __name__ == "__main__":
main()

70
app/main_old.py Normal file
View File

@@ -0,0 +1,70 @@
import streamlit as st
from version import __version__
from logging_config import setup_logging
import os
from auth import init_auth_db, ensure_logged_in, login_view, authed_header, current_user, create_user, clear_session_user
APP_ENV = os.environ.get("APP_ENV", "dev")
logger = setup_logging(APP_ENV)
def content_for(role: str, username: str):
st.header("Dashboard")
st.info(f"Willkommen, {username}!")
if role == "admin":
st.subheader("Admin-Bereich")
st.write("Nur Admins sehen das hier.")
with st.expander("Neuen Nutzer anlegen"):
new_u = st.text_input("Neuer Username", key="new_u")
new_p = st.text_input("Neues Passwort", type="password", key="new_p")
new_role = st.selectbox("Rolle", ["user", "admin"], key="new_role")
if st.button("Anlegen"):
if new_u and new_p:
ok = create_user(new_u.strip(), new_p, new_role)
st.success("Nutzer angelegt.") if ok else st.error(
"Username bereits vorhanden."
)
else:
st.warning("Bitte Username und Passwort eingeben.")
st.subheader("Dein Bereich")
st.write(f"Personalisierter Content für **{username}**.")
# hier: weitere Seite(n), DB-Zugriffe etc.
def main():
if "app_started_logged" not in st.session_state:
logger.info(f"Starting app in {APP_ENV} mode - APP-Version {__version__}")
st.session_state["app_started_logged"] = True
# logger.info(f"User Starting app in {APP_ENV} mode - APP-Version {__version__}")
st.set_page_config(
page_title="Intranet-Portal",
page_icon="🔒",
layout="centered",
)
# Falls kein Login: Login-View anzeigen und raus
if not st.session_state.get("auth"):
login_view()
return
# Ab hier sind wir eingeloggt
authed_header()
username, role = current_user()
content_for(role, username)
st.write(st.session_state)
if st.sidebar.button("set state"):
st.session_state["app_started_logged"] = True
#clear_session_user()
st.write(st.session_state)
#st.rerun()
if st.sidebar.button("delete state"):
st.session_state.pop("app_started_logged", None)
#clear_session_user()
st.write(st.session_state)
#st.rerun()
if __name__ == "__main__":
main()

View File

@@ -4,6 +4,9 @@ from pathlib import Path
import logging import logging
from logging_config import setup_logging from logging_config import setup_logging
import os import os
from contextlib import closing
import getpass
from auth import create_user
APP_ENV = os.environ.get("APP_ENV", "dev") APP_ENV = os.environ.get("APP_ENV", "dev")
logger = setup_logging(APP_ENV) logger = setup_logging(APP_ENV)
@@ -17,7 +20,7 @@ BASE_DIR = Path(__file__).resolve().parents[1]
DB_DIR = BASE_DIR / "data" DB_DIR = BASE_DIR / "data"
DB_PATH = DB_DIR / "app.db" DB_PATH = DB_DIR / "app.db"
MIGRATIONS_DIR = BASE_DIR / "migrations" MIGRATIONS_DIR = BASE_DIR / "migrations"
ADMIN_USERNAME = "admin"
def get_connection() -> sqlite3.Connection: def get_connection() -> sqlite3.Connection:
@@ -74,11 +77,45 @@ def apply_migrations():
# hier nicht weiter machen # hier nicht weiter machen
raise raise
logger.info(f"Migrationen abgeschlossen. DB: {DB_PATH}") logger.info(f"Migrationen abgeschlossen. DB: {DB_PATH}")
#print(f"Migrationen abgeschlossen. DB: {DB_PATH}")
create_admin_user()
finally: finally:
conn.close() conn.close()
def admin_exists() -> bool:
"""Prüft, ob der Admin-User bereits existiert."""
with closing(get_connection()) as conn:
row = conn.execute(
"SELECT 1 FROM users WHERE username = ?",
(ADMIN_USERNAME,),
).fetchone()
return row is not None
def create_admin_user():
if admin_exists():
logger.info("Adminkonto existiert bereits! Kein initiales Konto angelegt.")
return
logger.info("Adminkonto wird angelegt ...")
pw1 = getpass.getpass("Passwort: ")
pw2 = getpass.getpass("Passwort wiederholen: ")
if pw1 != pw2:
logger.warning("Passwörter stimmen nicht überein! Abbruch.")
return
ok = create_user(ADMIN_USERNAME, pw1, role="admin")
if ok:
logger.info(f"Admin-Benutzer '{ADMIN_USERNAME}' wurde angelegt.")
else:
# Sollte eigentlich nicht passieren, weil wir vorher geprüft haben,
# aber falls z.B. Parallelzugriff o.Ä.
logger.info(f"Admin-Benutzer '{ADMIN_USERNAME}' konnte nicht angelegt werden.")
if __name__ == "__main__": if __name__ == "__main__":
apply_migrations() apply_migrations()

7
config/auth.yaml Normal file
View File

@@ -0,0 +1,7 @@
cookie:
expiry_days: 7
key: "please_change_this_cookie_signing_key"
name: "intranet_session"
preauthorized:
emails: []

Binary file not shown.

View File

@@ -0,0 +1,10 @@
CREATE TABLE IF NOT EXISTS sessions (
id TEXT PRIMARY KEY,
username TEXT NOT NULL,
created_at INTEGER NOT NULL,
expires_at INTEGER NOT NULL,
FOREIGN KEY (username) REFERENCES users(username) ON DELETE CASCADE
);
INSERT INTO schema_version (version) VALUES ('20251129_162900_add_table_sessions');
COMMIT;