Add authentication
This commit is contained in:
93
app/auth.py
93
app/auth.py
@@ -0,0 +1,93 @@
|
||||
import sqlite3
|
||||
from contextlib import closing
|
||||
|
||||
import bcrypt
|
||||
import streamlit as st
|
||||
|
||||
from db import get_conn
|
||||
|
||||
def create_user(username: str, password: str, role: str = "user") -> bool:
|
||||
pw_hash = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
|
||||
try:
|
||||
with closing(get_conn()) as conn, conn:
|
||||
conn.execute(
|
||||
"INSERT INTO users (username, password_hash, role) VALUES (?, ?, ?)",
|
||||
(username, pw_hash, role),
|
||||
)
|
||||
return True
|
||||
except sqlite3.IntegrityError:
|
||||
return False
|
||||
|
||||
|
||||
def verify_user(username: str, password: str):
|
||||
with closing(get_conn()) as conn:
|
||||
row = conn.execute(
|
||||
"SELECT password_hash, role FROM users WHERE username = ?",
|
||||
(username,),
|
||||
).fetchone()
|
||||
if not row:
|
||||
return False, None
|
||||
stored_hash, role = row
|
||||
ok = bcrypt.checkpw(password.encode("utf-8"), stored_hash)
|
||||
return (ok, role) if ok else (False, None)
|
||||
|
||||
|
||||
# ---------- Session / Helper ----------
|
||||
def set_session_user(username: str, role: str):
|
||||
st.session_state["auth"] = True
|
||||
st.session_state["username"] = username
|
||||
st.session_state["role"] = role
|
||||
|
||||
|
||||
def clear_session_user():
|
||||
for k in ["auth", "username", "role"]:
|
||||
st.session_state.pop(k, None)
|
||||
|
||||
|
||||
def is_authenticated() -> bool:
|
||||
return bool(st.session_state.get("auth"))
|
||||
|
||||
|
||||
def current_user():
|
||||
"""(username, role) oder (None, None)"""
|
||||
return st.session_state.get("username"), st.session_state.get("role")
|
||||
|
||||
|
||||
def ensure_logged_in():
|
||||
"""
|
||||
Am Anfang jeder Page aufrufen.
|
||||
Wenn kein Login: Login-View anzeigen und Script stoppen.
|
||||
"""
|
||||
if not is_authenticated():
|
||||
login_view()
|
||||
st.stop()
|
||||
|
||||
|
||||
# ---------- UI ----------
|
||||
def login_view():
|
||||
st.title("Intranet Login")
|
||||
|
||||
with st.form("login"):
|
||||
u = st.text_input("Username")
|
||||
p = st.text_input("Passwort", type="password")
|
||||
submitted = st.form_submit_button("Anmelden")
|
||||
|
||||
if submitted:
|
||||
ok, role = verify_user(u.strip(), p)
|
||||
if ok:
|
||||
set_session_user(u.strip(), role)
|
||||
st.success("Erfolgreich angemeldet.")
|
||||
st.rerun()
|
||||
else:
|
||||
st.error("Login fehlgeschlagen.")
|
||||
|
||||
|
||||
def authed_header():
|
||||
username, role = current_user()
|
||||
if not username:
|
||||
return
|
||||
|
||||
st.sidebar.write(f"Angemeldet als **{username}** ({role})")
|
||||
if st.sidebar.button("Logout"):
|
||||
clear_session_user()
|
||||
st.rerun()
|
||||
10
app/db.py
10
app/db.py
@@ -0,0 +1,10 @@
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
|
||||
BASE_DIR = Path(__file__).resolve().parent.parent
|
||||
DB_PATH = BASE_DIR / "data" / "app.db"
|
||||
|
||||
|
||||
def get_conn():
|
||||
# check_same_thread=False, damit Streamlit mehrere Threads nutzen kann
|
||||
return sqlite3.connect(DB_PATH, check_same_thread=False)
|
||||
|
||||
Reference in New Issue
Block a user