Compare commits
2 Commits
6158f2ddff
...
84e1e152f8
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
84e1e152f8 | ||
|
|
06e5322931 |
@@ -71,6 +71,10 @@ APP_ENV = os.environ.get("APP_ENV", "dev")
|
|||||||
Dieses Projekt verwendet SQLite für die interne App-Datenbank.
|
Dieses Projekt verwendet SQLite für die interne App-Datenbank.
|
||||||
Das Schema wird vollständig über SQL-Migrationsdateien verwaltet, die automatisch in der richtigen Reihenfolge ausgeführt werden.
|
Das Schema wird vollständig über SQL-Migrationsdateien verwaltet, die automatisch in der richtigen Reihenfolge ausgeführt werden.
|
||||||
|
|
||||||
|
**WICHTIG:**
|
||||||
|
|
||||||
|
Bei der ersten Initialisierung muss ein admin-Passwort vergeben werden. Beim erstmaligen Anmelden an der App muss man sich dann als "adim" mit dem vergebenen Passwort anmelden. In der App können dann User angelegt.
|
||||||
|
|
||||||
### Ordnerstruktur
|
### Ordnerstruktur
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
|
|||||||
137
app/auth.py
137
app/auth.py
@@ -1,4 +1,7 @@
|
|||||||
|
# app/auth.py
|
||||||
import sqlite3
|
import sqlite3
|
||||||
|
import time
|
||||||
|
import secrets
|
||||||
from contextlib import closing
|
from contextlib import closing
|
||||||
|
|
||||||
import bcrypt
|
import bcrypt
|
||||||
@@ -6,6 +9,38 @@ import streamlit as st
|
|||||||
|
|
||||||
from db import get_conn
|
from db import get_conn
|
||||||
|
|
||||||
|
|
||||||
|
SESSION_LIFETIME_SECONDS = 8 * 60 * 60 # 8 Stunden
|
||||||
|
|
||||||
|
|
||||||
|
# ---------- User-DB ----------
|
||||||
|
def init_auth_db():
|
||||||
|
"""Legt die users-Tabelle an, falls sie noch nicht existiert."""
|
||||||
|
with closing(get_conn()) as conn, conn:
|
||||||
|
conn.execute(
|
||||||
|
"""
|
||||||
|
CREATE TABLE IF NOT EXISTS users (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
username TEXT UNIQUE NOT NULL,
|
||||||
|
password_hash BLOB NOT NULL,
|
||||||
|
role TEXT NOT NULL DEFAULT 'user'
|
||||||
|
)
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
# Sessions-Tabelle
|
||||||
|
conn.execute(
|
||||||
|
"""
|
||||||
|
CREATE TABLE IF NOT EXISTS sessions (
|
||||||
|
id TEXT PRIMARY KEY,
|
||||||
|
username TEXT NOT NULL,
|
||||||
|
created_at INTEGER NOT NULL,
|
||||||
|
expires_at INTEGER NOT NULL,
|
||||||
|
FOREIGN KEY (username) REFERENCES users(username) ON DELETE CASCADE
|
||||||
|
)
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def create_user(username: str, password: str, role: str = "user") -> bool:
|
def create_user(username: str, password: str, role: str = "user") -> bool:
|
||||||
pw_hash = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
|
pw_hash = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
|
||||||
try:
|
try:
|
||||||
@@ -32,15 +67,64 @@ def verify_user(username: str, password: str):
|
|||||||
return (ok, role) if ok else (False, None)
|
return (ok, role) if ok else (False, None)
|
||||||
|
|
||||||
|
|
||||||
# ---------- Session / Helper ----------
|
# ---------- Session-DB ----------
|
||||||
def set_session_user(username: str, role: str):
|
def create_session(username: str) -> str:
|
||||||
|
"""Erzeugt eine neue Session-ID und speichert sie in der DB."""
|
||||||
|
sid = secrets.token_urlsafe(32) # schön zufällig, URL-tauglich
|
||||||
|
now = int(time.time())
|
||||||
|
expires_at = now + SESSION_LIFETIME_SECONDS
|
||||||
|
with closing(get_conn()) as conn, conn:
|
||||||
|
conn.execute(
|
||||||
|
"""
|
||||||
|
INSERT INTO sessions (id, username, created_at, expires_at)
|
||||||
|
VALUES (?, ?, ?, ?)
|
||||||
|
""",
|
||||||
|
(sid, username, now, expires_at),
|
||||||
|
)
|
||||||
|
return sid
|
||||||
|
|
||||||
|
|
||||||
|
def get_session(sid: str):
|
||||||
|
"""Liefert (username, role), wenn Session existiert und gültig ist, sonst (None, None)."""
|
||||||
|
now = int(time.time())
|
||||||
|
with closing(get_conn()) as conn:
|
||||||
|
row = conn.execute(
|
||||||
|
"""
|
||||||
|
SELECT s.username, u.role
|
||||||
|
FROM sessions s
|
||||||
|
JOIN users u ON u.username = s.username
|
||||||
|
WHERE s.id = ? AND s.expires_at > ?
|
||||||
|
""",
|
||||||
|
(sid, now),
|
||||||
|
).fetchone()
|
||||||
|
if not row:
|
||||||
|
return None, None
|
||||||
|
return row[0], row[1]
|
||||||
|
|
||||||
|
|
||||||
|
def delete_session(sid: str):
|
||||||
|
with closing(get_conn()) as conn, conn:
|
||||||
|
conn.execute("DELETE FROM sessions WHERE id = ?", (sid,))
|
||||||
|
|
||||||
|
|
||||||
|
# Optional: abgelaufene Sessions aufräumen
|
||||||
|
def cleanup_sessions():
|
||||||
|
now = int(time.time())
|
||||||
|
with closing(get_conn()) as conn, conn:
|
||||||
|
conn.execute("DELETE FROM sessions WHERE expires_at <= ?", (now,))
|
||||||
|
|
||||||
|
|
||||||
|
# ---------- SessionState-Helper ----------
|
||||||
|
def set_session_user(username: str, role: str, sid: str | None = None):
|
||||||
st.session_state["auth"] = True
|
st.session_state["auth"] = True
|
||||||
st.session_state["username"] = username
|
st.session_state["username"] = username
|
||||||
st.session_state["role"] = role
|
st.session_state["role"] = role
|
||||||
|
if sid is not None:
|
||||||
|
st.session_state["session_id"] = sid
|
||||||
|
|
||||||
|
|
||||||
def clear_session_user():
|
def clear_session_user():
|
||||||
for k in ["auth", "username", "role"]:
|
for k in ["auth", "username", "role", "session_id"]:
|
||||||
st.session_state.pop(k, None)
|
st.session_state.pop(k, None)
|
||||||
|
|
||||||
|
|
||||||
@@ -49,16 +133,32 @@ def is_authenticated() -> bool:
|
|||||||
|
|
||||||
|
|
||||||
def current_user():
|
def current_user():
|
||||||
"""(username, role) oder (None, None)"""
|
return (
|
||||||
return st.session_state.get("username"), st.session_state.get("role")
|
st.session_state.get("username"),
|
||||||
|
st.session_state.get("role"),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def ensure_logged_in():
|
def ensure_logged_in():
|
||||||
"""
|
"""
|
||||||
Am Anfang jeder Page aufrufen.
|
Am Anfang jeder Page aufrufen.
|
||||||
Wenn kein Login: Login-View anzeigen und Script stoppen.
|
Prüft erst st.session_state, dann ?session_id=... in der URL.
|
||||||
"""
|
"""
|
||||||
if not is_authenticated():
|
if is_authenticated():
|
||||||
|
return
|
||||||
|
|
||||||
|
# Versuch: session_id aus URL
|
||||||
|
params = st.experimental_get_query_params()
|
||||||
|
sid_list = params.get("session_id")
|
||||||
|
sid = sid_list[0] if sid_list else None
|
||||||
|
|
||||||
|
if sid:
|
||||||
|
username, role = get_session(sid)
|
||||||
|
if username:
|
||||||
|
set_session_user(username, role, sid)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Wenn wir hier landen: kein gültiger Login → Login-View anzeigen
|
||||||
login_view()
|
login_view()
|
||||||
st.stop()
|
st.stop()
|
||||||
|
|
||||||
@@ -73,9 +173,18 @@ def login_view():
|
|||||||
submitted = st.form_submit_button("Anmelden")
|
submitted = st.form_submit_button("Anmelden")
|
||||||
|
|
||||||
if submitted:
|
if submitted:
|
||||||
ok, role = verify_user(u.strip(), p)
|
username = u.strip()
|
||||||
|
ok, role = verify_user(username, p)
|
||||||
if ok:
|
if ok:
|
||||||
set_session_user(u.strip(), role)
|
# neue Session in DB
|
||||||
|
sid = create_session(username)
|
||||||
|
|
||||||
|
# SessionState setzen
|
||||||
|
set_session_user(username, role, sid)
|
||||||
|
|
||||||
|
# URL-Parameter setzen (Arme-Leute-Cookie)
|
||||||
|
st.experimental_set_query_params(session_id=sid)
|
||||||
|
|
||||||
st.success("Erfolgreich angemeldet.")
|
st.success("Erfolgreich angemeldet.")
|
||||||
st.rerun()
|
st.rerun()
|
||||||
else:
|
else:
|
||||||
@@ -88,6 +197,16 @@ def authed_header():
|
|||||||
return
|
return
|
||||||
|
|
||||||
st.sidebar.write(f"Angemeldet als **{username}** ({role})")
|
st.sidebar.write(f"Angemeldet als **{username}** ({role})")
|
||||||
|
|
||||||
if st.sidebar.button("Logout"):
|
if st.sidebar.button("Logout"):
|
||||||
|
# Session in DB löschen, falls vorhanden
|
||||||
|
sid = st.session_state.get("session_id")
|
||||||
|
if sid:
|
||||||
|
delete_session(sid)
|
||||||
|
|
||||||
clear_session_user()
|
clear_session_user()
|
||||||
|
|
||||||
|
# URL-Parameter entfernen
|
||||||
|
st.experimental_set_query_params()
|
||||||
|
|
||||||
st.rerun()
|
st.rerun()
|
||||||
127
app/auth_core.py
Normal file
127
app/auth_core.py
Normal file
@@ -0,0 +1,127 @@
|
|||||||
|
# app/auth_core.py
|
||||||
|
|
||||||
|
from contextlib import closing
|
||||||
|
import bcrypt
|
||||||
|
|
||||||
|
from db import get_conn
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# DB Initialisierung: `users`-Tabelle
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def init_auth_db():
|
||||||
|
"""
|
||||||
|
Legt die users-Tabelle an.
|
||||||
|
WICHTIG: password_hash wird als TEXT gespeichert, damit sowohl
|
||||||
|
streamlit-authenticator als auch dein eigener bcrypt-Code kompatibel sind.
|
||||||
|
"""
|
||||||
|
with closing(get_conn()) as conn, conn:
|
||||||
|
conn.execute(
|
||||||
|
"""
|
||||||
|
CREATE TABLE IF NOT EXISTS users (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
username TEXT UNIQUE NOT NULL,
|
||||||
|
email TEXT,
|
||||||
|
password_hash TEXT NOT NULL,
|
||||||
|
role TEXT NOT NULL DEFAULT 'user'
|
||||||
|
)
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Benutzer anlegen
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def create_user(username: str, password: str, role: str = "user", email: str | None = None) -> bool:
|
||||||
|
"""
|
||||||
|
Passwort wird als bcrypt-Hash (TEXT) gespeichert.
|
||||||
|
"""
|
||||||
|
pw_hash = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
|
||||||
|
|
||||||
|
try:
|
||||||
|
with closing(get_conn()) as conn, conn:
|
||||||
|
conn.execute(
|
||||||
|
"INSERT INTO users (username, email, password_hash, role) VALUES (?, ?, ?, ?)",
|
||||||
|
(username, email, pw_hash, role),
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
except Exception:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Benutzer überprüfen (z.B. für deine alte Streamlit-Login-Maske)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def verify_user(username: str, password: str):
|
||||||
|
"""
|
||||||
|
Vergleicht eingegebenes Passwort mit dem gespeicherten bcrypt-Hash.
|
||||||
|
Rückgabe: (True, role) oder (False, None)
|
||||||
|
"""
|
||||||
|
with closing(get_conn()) as conn:
|
||||||
|
row = conn.execute(
|
||||||
|
"SELECT password_hash, role FROM users WHERE username = ?",
|
||||||
|
(username,),
|
||||||
|
).fetchone()
|
||||||
|
|
||||||
|
if not row:
|
||||||
|
return False, None
|
||||||
|
|
||||||
|
stored_hash, role = row
|
||||||
|
|
||||||
|
# stored_hash ist TEXT → zurück nach bytes
|
||||||
|
ok = bcrypt.checkpw(password.encode("utf-8"), stored_hash.encode("utf-8"))
|
||||||
|
|
||||||
|
return (ok, role) if ok else (False, None)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Rolle für einen Benutzer holen (für Streamlit-Inhalte)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def get_role_for_user(username: str) -> str:
|
||||||
|
with closing(get_conn()) as conn:
|
||||||
|
row = conn.execute(
|
||||||
|
"SELECT role FROM users WHERE username = ?",
|
||||||
|
(username,),
|
||||||
|
).fetchone()
|
||||||
|
return row[0] if row else "user"
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Credential-Lader für streamlit-authenticator
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def load_credentials_from_db() -> dict:
|
||||||
|
"""
|
||||||
|
Baut das credentials-Dict so:
|
||||||
|
{
|
||||||
|
"usernames": {
|
||||||
|
"hansi": {
|
||||||
|
"email": "hansi@example.com",
|
||||||
|
"name": "hansi",
|
||||||
|
"password": "$2b$12$...",
|
||||||
|
},
|
||||||
|
...
|
||||||
|
}
|
||||||
|
}
|
||||||
|
streamlit-authenticator prüft dann Login + Cookies automatisch.
|
||||||
|
"""
|
||||||
|
creds = {"usernames": {}}
|
||||||
|
|
||||||
|
with closing(get_conn()) as conn:
|
||||||
|
rows = conn.execute(
|
||||||
|
"SELECT username, email, password_hash FROM users"
|
||||||
|
).fetchall()
|
||||||
|
|
||||||
|
for username, email, pw_hash in rows:
|
||||||
|
# pw_hash kommt als TEXT
|
||||||
|
creds["usernames"][username] = {
|
||||||
|
"name": username,
|
||||||
|
"email": email or f"{username}@example.local",
|
||||||
|
"password": pw_hash, # TEXT → streamlit-authenticator-kompatibel
|
||||||
|
}
|
||||||
|
|
||||||
|
return creds
|
||||||
94
app/auth_old.py
Normal file
94
app/auth_old.py
Normal file
@@ -0,0 +1,94 @@
|
|||||||
|
import sqlite3
|
||||||
|
from contextlib import closing
|
||||||
|
|
||||||
|
import bcrypt
|
||||||
|
import streamlit as st
|
||||||
|
|
||||||
|
from db import get_conn
|
||||||
|
|
||||||
|
def create_user(username: str, password: str, role: str = "user") -> bool:
|
||||||
|
pw_hash = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
|
||||||
|
try:
|
||||||
|
with closing(get_conn()) as conn, conn:
|
||||||
|
conn.execute(
|
||||||
|
"INSERT INTO users (username, password_hash, role) VALUES (?, ?, ?)",
|
||||||
|
(username, pw_hash, role),
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
except sqlite3.IntegrityError:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def verify_user(username: str, password: str):
|
||||||
|
with closing(get_conn()) as conn:
|
||||||
|
row = conn.execute(
|
||||||
|
"SELECT password_hash, role FROM users WHERE username = ?",
|
||||||
|
(username,),
|
||||||
|
).fetchone()
|
||||||
|
if not row:
|
||||||
|
return False, None
|
||||||
|
stored_hash, role = row
|
||||||
|
ok = bcrypt.checkpw(password.encode("utf-8"), stored_hash)
|
||||||
|
return (ok, role) if ok else (False, None)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------- Session / Helper ----------
|
||||||
|
def set_session_user(username: str, role: str):
|
||||||
|
st.session_state["auth"] = True
|
||||||
|
st.session_state["username"] = username
|
||||||
|
st.session_state["role"] = role
|
||||||
|
|
||||||
|
|
||||||
|
def clear_session_user():
|
||||||
|
for k in ["auth", "username", "role"]:
|
||||||
|
st.session_state.pop(k, None)
|
||||||
|
|
||||||
|
|
||||||
|
def is_authenticated() -> bool:
|
||||||
|
return bool(st.session_state.get("auth"))
|
||||||
|
|
||||||
|
|
||||||
|
def current_user():
|
||||||
|
"""(username, role) oder (None, None)"""
|
||||||
|
return st.session_state.get("username"), st.session_state.get("role")
|
||||||
|
|
||||||
|
|
||||||
|
def ensure_logged_in():
|
||||||
|
"""
|
||||||
|
Am Anfang jeder Page aufrufen.
|
||||||
|
Wenn kein Login: Login-View anzeigen und Script stoppen.
|
||||||
|
"""
|
||||||
|
if not is_authenticated():
|
||||||
|
login_view()
|
||||||
|
st.stop()
|
||||||
|
|
||||||
|
|
||||||
|
# ---------- UI ----------
|
||||||
|
def login_view():
|
||||||
|
st.title("Intranet Login")
|
||||||
|
|
||||||
|
with st.form("login"):
|
||||||
|
u = st.text_input("Username")
|
||||||
|
p = st.text_input("Passwort", type="password")
|
||||||
|
submitted = st.form_submit_button("Anmelden")
|
||||||
|
|
||||||
|
if submitted:
|
||||||
|
ok, role = verify_user(u.strip(), p)
|
||||||
|
if ok:
|
||||||
|
set_session_user(u.strip(), role)
|
||||||
|
st.success("Erfolgreich angemeldet.")
|
||||||
|
st.rerun()
|
||||||
|
else:
|
||||||
|
st.error("Login fehlgeschlagen.")
|
||||||
|
|
||||||
|
|
||||||
|
def authed_header():
|
||||||
|
username, role = current_user()
|
||||||
|
if not username:
|
||||||
|
return
|
||||||
|
|
||||||
|
st.sidebar.write(f"Angemeldet als **{username}** ({role})")
|
||||||
|
if st.sidebar.button("Logout"):
|
||||||
|
st.session_state.pop("app_started_logged", None)
|
||||||
|
clear_session_user()
|
||||||
|
st.rerun()
|
||||||
65
app/main.py
65
app/main.py
@@ -1,10 +1,59 @@
|
|||||||
from version import __version__
|
# app/main.py
|
||||||
from logging_config import setup_logging
|
import streamlit as st
|
||||||
import os
|
|
||||||
from migrate import apply_migrations
|
|
||||||
|
|
||||||
APP_ENV = os.environ.get("APP_ENV", "dev")
|
from auth import (
|
||||||
logger = setup_logging(APP_ENV)
|
init_auth_db,
|
||||||
logger.info(f"Starting app in {APP_ENV} mode - APP-Version {__version__}")
|
ensure_logged_in,
|
||||||
|
authed_header,
|
||||||
|
current_user,
|
||||||
|
create_user, # falls du Admin-Funktion brauchst
|
||||||
|
)
|
||||||
|
|
||||||
apply_migrations()
|
|
||||||
|
def content_for(role: str, username: str):
|
||||||
|
st.header("Dashboard")
|
||||||
|
st.info(f"Willkommen, {username}!")
|
||||||
|
|
||||||
|
if role == "admin":
|
||||||
|
st.subheader("Admin-Bereich")
|
||||||
|
st.write("Nur Admins sehen das hier.")
|
||||||
|
with st.expander("Neuen Nutzer anlegen"):
|
||||||
|
new_u = st.text_input("Neuer Username", key="new_u")
|
||||||
|
new_p = st.text_input("Neues Passwort", type="password", key="new_p")
|
||||||
|
new_role = st.selectbox("Rolle", ["user", "admin"], key="new_role")
|
||||||
|
if st.button("Anlegen"):
|
||||||
|
if new_u and new_p:
|
||||||
|
ok = create_user(new_u.strip(), new_p, new_role)
|
||||||
|
st.success("Nutzer angelegt.") if ok else st.error(
|
||||||
|
"Username bereits vorhanden."
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
st.warning("Bitte Username und Passwort eingeben.")
|
||||||
|
|
||||||
|
st.subheader("Dein Bereich")
|
||||||
|
st.write(f"Personalisierter Content für **{username}**.")
|
||||||
|
# fachliche Inhalte …
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
st.set_page_config(
|
||||||
|
page_title="Intranet-Portal",
|
||||||
|
page_icon="🔒",
|
||||||
|
layout="centered",
|
||||||
|
)
|
||||||
|
|
||||||
|
init_auth_db()
|
||||||
|
|
||||||
|
# hier wird entweder:
|
||||||
|
# - st.session_state genutzt, oder
|
||||||
|
# - ?session_id=... aus URL geprüft, oder
|
||||||
|
# - Login-Form gezeigt (und Script gestoppt)
|
||||||
|
ensure_logged_in()
|
||||||
|
|
||||||
|
authed_header()
|
||||||
|
username, role = current_user()
|
||||||
|
content_for(role, username)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
|||||||
87
app/main_authenticator.py
Normal file
87
app/main_authenticator.py
Normal file
@@ -0,0 +1,87 @@
|
|||||||
|
# app/main.py
|
||||||
|
import streamlit as st
|
||||||
|
import yaml
|
||||||
|
from yaml.loader import SafeLoader
|
||||||
|
import streamlit_authenticator as stauth
|
||||||
|
|
||||||
|
from auth_core import (
|
||||||
|
init_auth_db,
|
||||||
|
load_credentials_from_db,
|
||||||
|
get_role_for_user,
|
||||||
|
create_user,
|
||||||
|
)
|
||||||
|
from version import __version__
|
||||||
|
|
||||||
|
|
||||||
|
def content_for(username: str, role: str):
|
||||||
|
st.header("Dashboard")
|
||||||
|
st.info(f"Willkommen, {username}!")
|
||||||
|
|
||||||
|
if role == "admin":
|
||||||
|
st.subheader("Admin-Bereich")
|
||||||
|
st.write("Nur Admins sehen das hier.")
|
||||||
|
|
||||||
|
with st.expander("Neuen Nutzer anlegen"):
|
||||||
|
new_u = st.text_input("Neuer Username", key="new_u")
|
||||||
|
new_email = st.text_input("E-Mail", key="new_email")
|
||||||
|
new_p = st.text_input("Neues Passwort", type="password", key="new_p")
|
||||||
|
new_role = st.selectbox("Rolle", ["user", "admin"], key="new_role")
|
||||||
|
|
||||||
|
if st.button("Anlegen"):
|
||||||
|
if new_u and new_p:
|
||||||
|
ok = create_user(new_u.strip(), new_p, new_role, new_email.strip() or None)
|
||||||
|
st.success("Nutzer angelegt.") if ok else st.error("Username bereits vorhanden oder Fehler.")
|
||||||
|
else:
|
||||||
|
st.warning("Bitte Username und Passwort eingeben.")
|
||||||
|
|
||||||
|
st.subheader("Dein Bereich")
|
||||||
|
st.write(f"Personalisierter Content für **{username}**.")
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
st.set_page_config(
|
||||||
|
page_title=f"Intranet-Portal v{__version__}",
|
||||||
|
page_icon="🔒",
|
||||||
|
layout="centered",
|
||||||
|
)
|
||||||
|
|
||||||
|
# DB-Struktur sicherstellen
|
||||||
|
init_auth_db()
|
||||||
|
|
||||||
|
# --- Config laden (Cookie, etc.) ---
|
||||||
|
with open("config/auth.yaml", "r", encoding="utf-8") as f:
|
||||||
|
base_config = yaml.load(f, Loader=SafeLoader)
|
||||||
|
|
||||||
|
# --- Credentials dynamisch aus DB laden ---
|
||||||
|
db_creds = load_credentials_from_db()
|
||||||
|
base_config["credentials"] = db_creds
|
||||||
|
|
||||||
|
authenticator = stauth.Authenticate(
|
||||||
|
base_config["credentials"],
|
||||||
|
base_config["cookie"]["name"],
|
||||||
|
base_config["cookie"]["key"],
|
||||||
|
base_config["cookie"]["expiry_days"],
|
||||||
|
base_config.get("preauthorized", {}),
|
||||||
|
)
|
||||||
|
|
||||||
|
name, auth_status, username = authenticator.login("Login", "main")
|
||||||
|
|
||||||
|
if auth_status is False:
|
||||||
|
st.error("Login fehlgeschlagen.")
|
||||||
|
return
|
||||||
|
|
||||||
|
if auth_status is None:
|
||||||
|
st.warning("Bitte Benutzername und Passwort eingeben.")
|
||||||
|
return
|
||||||
|
|
||||||
|
# ---- Ab hier eingeloggt (persistenter Cookie) ----
|
||||||
|
role = get_role_for_user(username)
|
||||||
|
|
||||||
|
authenticator.logout("Logout", "sidebar")
|
||||||
|
st.sidebar.write(f"Angemeldet als **{name}** ({username}, Rolle: {role})")
|
||||||
|
|
||||||
|
content_for(username, role)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
70
app/main_old.py
Normal file
70
app/main_old.py
Normal file
@@ -0,0 +1,70 @@
|
|||||||
|
import streamlit as st
|
||||||
|
from version import __version__
|
||||||
|
from logging_config import setup_logging
|
||||||
|
import os
|
||||||
|
from auth import init_auth_db, ensure_logged_in, login_view, authed_header, current_user, create_user, clear_session_user
|
||||||
|
|
||||||
|
APP_ENV = os.environ.get("APP_ENV", "dev")
|
||||||
|
logger = setup_logging(APP_ENV)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def content_for(role: str, username: str):
|
||||||
|
st.header("Dashboard")
|
||||||
|
st.info(f"Willkommen, {username}!")
|
||||||
|
|
||||||
|
if role == "admin":
|
||||||
|
st.subheader("Admin-Bereich")
|
||||||
|
st.write("Nur Admins sehen das hier.")
|
||||||
|
with st.expander("Neuen Nutzer anlegen"):
|
||||||
|
new_u = st.text_input("Neuer Username", key="new_u")
|
||||||
|
new_p = st.text_input("Neues Passwort", type="password", key="new_p")
|
||||||
|
new_role = st.selectbox("Rolle", ["user", "admin"], key="new_role")
|
||||||
|
if st.button("Anlegen"):
|
||||||
|
if new_u and new_p:
|
||||||
|
ok = create_user(new_u.strip(), new_p, new_role)
|
||||||
|
st.success("Nutzer angelegt.") if ok else st.error(
|
||||||
|
"Username bereits vorhanden."
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
st.warning("Bitte Username und Passwort eingeben.")
|
||||||
|
|
||||||
|
st.subheader("Dein Bereich")
|
||||||
|
st.write(f"Personalisierter Content für **{username}**.")
|
||||||
|
# hier: weitere Seite(n), DB-Zugriffe etc.
|
||||||
|
|
||||||
|
def main():
|
||||||
|
if "app_started_logged" not in st.session_state:
|
||||||
|
logger.info(f"Starting app in {APP_ENV} mode - APP-Version {__version__}")
|
||||||
|
st.session_state["app_started_logged"] = True
|
||||||
|
# logger.info(f"User Starting app in {APP_ENV} mode - APP-Version {__version__}")
|
||||||
|
st.set_page_config(
|
||||||
|
page_title="Intranet-Portal",
|
||||||
|
page_icon="🔒",
|
||||||
|
layout="centered",
|
||||||
|
)
|
||||||
|
# Falls kein Login: Login-View anzeigen und raus
|
||||||
|
if not st.session_state.get("auth"):
|
||||||
|
login_view()
|
||||||
|
return
|
||||||
|
|
||||||
|
# Ab hier sind wir eingeloggt
|
||||||
|
authed_header()
|
||||||
|
username, role = current_user()
|
||||||
|
content_for(role, username)
|
||||||
|
|
||||||
|
st.write(st.session_state)
|
||||||
|
|
||||||
|
if st.sidebar.button("set state"):
|
||||||
|
st.session_state["app_started_logged"] = True
|
||||||
|
#clear_session_user()
|
||||||
|
st.write(st.session_state)
|
||||||
|
#st.rerun()
|
||||||
|
if st.sidebar.button("delete state"):
|
||||||
|
st.session_state.pop("app_started_logged", None)
|
||||||
|
#clear_session_user()
|
||||||
|
st.write(st.session_state)
|
||||||
|
#st.rerun()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@@ -4,6 +4,9 @@ from pathlib import Path
|
|||||||
import logging
|
import logging
|
||||||
from logging_config import setup_logging
|
from logging_config import setup_logging
|
||||||
import os
|
import os
|
||||||
|
from contextlib import closing
|
||||||
|
import getpass
|
||||||
|
from auth import create_user
|
||||||
|
|
||||||
APP_ENV = os.environ.get("APP_ENV", "dev")
|
APP_ENV = os.environ.get("APP_ENV", "dev")
|
||||||
logger = setup_logging(APP_ENV)
|
logger = setup_logging(APP_ENV)
|
||||||
@@ -17,7 +20,7 @@ BASE_DIR = Path(__file__).resolve().parents[1]
|
|||||||
DB_DIR = BASE_DIR / "data"
|
DB_DIR = BASE_DIR / "data"
|
||||||
DB_PATH = DB_DIR / "app.db"
|
DB_PATH = DB_DIR / "app.db"
|
||||||
MIGRATIONS_DIR = BASE_DIR / "migrations"
|
MIGRATIONS_DIR = BASE_DIR / "migrations"
|
||||||
|
ADMIN_USERNAME = "admin"
|
||||||
|
|
||||||
|
|
||||||
def get_connection() -> sqlite3.Connection:
|
def get_connection() -> sqlite3.Connection:
|
||||||
@@ -74,11 +77,45 @@ def apply_migrations():
|
|||||||
# hier nicht weiter machen
|
# hier nicht weiter machen
|
||||||
raise
|
raise
|
||||||
logger.info(f"Migrationen abgeschlossen. DB: {DB_PATH}")
|
logger.info(f"Migrationen abgeschlossen. DB: {DB_PATH}")
|
||||||
#print(f"Migrationen abgeschlossen. DB: {DB_PATH}")
|
|
||||||
|
create_admin_user()
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
def admin_exists() -> bool:
|
||||||
|
"""Prüft, ob der Admin-User bereits existiert."""
|
||||||
|
with closing(get_connection()) as conn:
|
||||||
|
row = conn.execute(
|
||||||
|
"SELECT 1 FROM users WHERE username = ?",
|
||||||
|
(ADMIN_USERNAME,),
|
||||||
|
).fetchone()
|
||||||
|
return row is not None
|
||||||
|
|
||||||
|
|
||||||
|
def create_admin_user():
|
||||||
|
if admin_exists():
|
||||||
|
logger.info("Adminkonto existiert bereits! Kein initiales Konto angelegt.")
|
||||||
|
return
|
||||||
|
logger.info("Adminkonto wird angelegt ...")
|
||||||
|
|
||||||
|
pw1 = getpass.getpass("Passwort: ")
|
||||||
|
pw2 = getpass.getpass("Passwort wiederholen: ")
|
||||||
|
|
||||||
|
if pw1 != pw2:
|
||||||
|
logger.warning("Passwörter stimmen nicht überein! Abbruch.")
|
||||||
|
return
|
||||||
|
|
||||||
|
ok = create_user(ADMIN_USERNAME, pw1, role="admin")
|
||||||
|
|
||||||
|
if ok:
|
||||||
|
logger.info(f"Admin-Benutzer '{ADMIN_USERNAME}' wurde angelegt.")
|
||||||
|
else:
|
||||||
|
# Sollte eigentlich nicht passieren, weil wir vorher geprüft haben,
|
||||||
|
# aber falls z.B. Parallelzugriff o.Ä.
|
||||||
|
logger.info(f"Admin-Benutzer '{ADMIN_USERNAME}' konnte nicht angelegt werden.")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
apply_migrations()
|
apply_migrations()
|
||||||
7
config/auth.yaml
Normal file
7
config/auth.yaml
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
cookie:
|
||||||
|
expiry_days: 7
|
||||||
|
key: "please_change_this_cookie_signing_key"
|
||||||
|
name: "intranet_session"
|
||||||
|
|
||||||
|
preauthorized:
|
||||||
|
emails: []
|
||||||
BIN
data/app.db
BIN
data/app.db
Binary file not shown.
10
migrations/20251129_162900_add_table_sessions.sql
Normal file
10
migrations/20251129_162900_add_table_sessions.sql
Normal file
@@ -0,0 +1,10 @@
|
|||||||
|
CREATE TABLE IF NOT EXISTS sessions (
|
||||||
|
id TEXT PRIMARY KEY,
|
||||||
|
username TEXT NOT NULL,
|
||||||
|
created_at INTEGER NOT NULL,
|
||||||
|
expires_at INTEGER NOT NULL,
|
||||||
|
FOREIGN KEY (username) REFERENCES users(username) ON DELETE CASCADE
|
||||||
|
);
|
||||||
|
INSERT INTO schema_version (version) VALUES ('20251129_162900_add_table_sessions');
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
Reference in New Issue
Block a user