Compare commits

12 Commits
main ... dev

Author SHA1 Message Date
knedlik
96be7567eb Add database functions 2025-12-05 22:52:38 +01:00
handi
3c0e58dfc1 Add sidebar builder 2025-12-04 22:08:15 +01:00
handi
e2d96c712e Update migrations 2025-12-03 22:03:15 +01:00
handi
ff7560b263 Add logo 2025-12-02 08:37:20 +01:00
handi
ed04a75ce2 Add navigation in sidebar 2025-11-30 23:03:25 +01:00
handi
5277830c50 Add change password 2025-11-30 16:34:34 +01:00
handi
111d0d2756 Add some new fields for users table 2025-11-30 14:55:02 +01:00
hansi
84e1e152f8 Add session-state and cookies 2025-11-30 12:35:21 +01:00
hansi
06e5322931 Add initialize admin-account in migration 2025-11-29 16:27:34 +01:00
hansi
6158f2ddff Add authentication 2025-11-29 12:04:58 +01:00
hansi
fe33d714bd add logging and database migration 2025-11-29 00:15:41 +01:00
hansi
6f9c5eb34e Initialize project structure 2025-11-28 08:53:29 +01:00
41 changed files with 1697 additions and 1 deletions

154
README.md
View File

@@ -1,3 +1,155 @@
# co_app
Controlling-App
## Insallation
1. Git-Repository clonen
2.
## Entwicklungs- und Produktionsumgebung steuern (APP_ENV)
Die App unterscheidet zwischen Entwicklung und Produktion über die Umgebungsvariable APP_ENV.
### Entwicklung (lokal)
Im Terminal setzen:
```bash
export APP_ENV=dev
```
Die Variable gilt, solange das Terminal geöffnet ist.
Beim Starten der App:
```bash
streamlit run app/main.py
```
Die App erkennt automatisch den Dev-Modus.
### Produktion (Systemd-Dienst)
Für den produktiven Betrieb setzt systemd die Variable dauerhaft auf prod:
```ini
Environment=APP_ENV=prod
```
In prod-Modus laufen z. B. Logging und Pfade anders als lokal.
### systemd-Dienst einrichten
``/etc/systemd/system/myapp.service``
```ini
[Unit]
Description=MyApp Streamlit Service
After=network.target
[Service]
User=co_app
Group=co_app
WorkingDirectory=/opt/co_app
Environment=APP_ENV=prod
ExecStart=/usr/bin/python3 -m streamlit run app/main.py --server.port=8501
Restart=on-failure
[Install]
WantedBy=multi-user.target
```
**Installation & Start**
```bash
sudo systemctl daemon-reload
sudo systemctl enable co_app.service
sudo systemctl start co_app.service
```
**Logs ansehen**
Da streamlit + python ins systemd-Journal loggen:
```bash
journalctl -u co_app.service -f
```
**Wichtig**
- APP_ENV=dev → Entwicklung, lokale Logs, lokale DB
- APP_ENV=prod → Systemd-Betrieb, serverseitige Pfade, Logging ins Journal
- Die App liest die Variable mit:
```python
import os
APP_ENV = os.environ.get("APP_ENV", "dev")
```
## Database Setup & Migration Guide
Dieses Projekt verwendet SQLite für die interne App-Datenbank.
Das Schema wird vollständig über SQL-Migrationsdateien verwaltet, die automatisch in der richtigen Reihenfolge ausgeführt werden.
**WICHTIG:**
Bei der ersten Initialisierung muss ein admin-Passwort vergeben werden. Beim erstmaligen Anmelden an der App muss man sich dann als "adim" mit dem vergebenen Passwort anmelden. In der App können dann User angelegt.
### Ordnerstruktur
```bash
data/ ← enthält die erzeugte SQLite-Datenbank (app.db)
migrations/ ← enthält alle SQL-Migrationsdateien
migrate.py ← Python-Skript zum Anwenden der Migrationen
```
### Erstellen der Datenbank (Initial Setup)
```bash
python migrate.py
```
Das Skript:
1. erstellt den Ordner db/ (falls nicht vorhanden)
2. erzeugt eine neue Datei db/app.db
3. führt alle SQL-Dateien im Ordner migrations/ der Reihe nach aus
4. protokolliert die angewendeten Migrationen in der Tabelle schema_version
Danach ist die Datenbank vollständig initialisiert und einsatzbereit.
### Arbeiten mit Migrationen (Änderungen am Schema)
Änderungen an der Datenbankstruktur werden nie direkt an der DB vorgenommen.
Stattdessen erzeugst du eine neue Migrationsdatei im Ordner migrations/.
**Beispiel: neue Spalte hinzufügen**
Lege eine Datei an
```bash
touch /migrations/20250301_101500_add_last_login.sql
```
Dateiinhalt
```sql
BEGIN;
ALTER TABLE users ADD COLUMN last_login DATETIME;
INSERT INTO schema_version (version)
VALUES ('20250301_101500_add_last_login');
COMMIT;
```
Migration anwenden
```bash
python migrate.py
```
Das Skript erkennt automatisch, dass diese Migration neu ist und führt sie aus.
### Erneutes Ausführen (Updates)
Wenn eine bestehende Installation aktualisiert werden soll:
```bash
git pull
python migrate.py
```
Das Skript:
- liest die Tabelle schema_version
- führt nur Migrationen aus, die noch nicht angewendet wurden
Bestehende Strukturen bleiben unberührt.
### Entwicklungsphase: Hinweise
Solange das Projekt noch nicht produktiv genutzt wird, gilt:
- du kannst Migrationsdateien löschen/zusammenführen
- du kannst die komplette app.db jederzeit löschen
- vor dem ersten echten Deploy kannst du alle Migrationen zu einer sauberen initial.sql zusammenfassen
👉 **Nach dem ersten produktiven Einsatz** werden Migrationen nie mehr gelöscht, sondern nur ergänzt.
### Optional: DB neu erzeugen für Tests
Um eine frische Datenbank zu bekommen:
```bash
rm -f data/app.db
python migrate.py
```

View File

@@ -0,0 +1,2 @@
[client]
showSidebarNavigation = false

0
app/__init__.py Normal file
View File

BIN
app/app_db/app.db Normal file

Binary file not shown.

17
app/app_db/app_db.py Normal file
View File

@@ -0,0 +1,17 @@
import sqlite3
from pathlib import Path
import pandas as pd
BASE_DIR = Path(__file__).resolve().parent
# DB_PATH = BASE_DIR / "app_db" / "app.db"
DB_PATH = BASE_DIR / "app.db"
def get_conn():
# check_same_thread=False, damit Streamlit mehrere Threads nutzen kann
return sqlite3.connect(DB_PATH, check_same_thread=False)
def get_list(sql, params=None):
conn = get_conn()
df = pd.read_sql_query(sql, conn, params=params)
conn.close()
return df

168
app/auth.py Normal file
View File

@@ -0,0 +1,168 @@
from contextlib import closing
import bcrypt
from app_db.app_db import get_conn
# ---------------------------------------------------------------------------
# Benutzer anlegen
# ---------------------------------------------------------------------------
def create_user(
username: str,
password: str,
role_id: int = 1,
email: str | None = None,
firstname: str | None = None,
lastname: str | None = None
) -> bool:
"""
Passwort wird als bcrypt-Hash (TEXT) gespeichert.
"""
pw_hash = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
try:
with closing(get_conn()) as conn, conn:
conn.execute(
"INSERT INTO users (username, email, password_hash, role_id, firstname, lastname) VALUES (?, ?, ?, ?, ?, ?)",
(username, email, pw_hash, role_id, firstname, lastname),
)
return True
except Exception:
return False
# ---------------------------------------------------------------------------
# Benutzer überprüfen (z.B. für deine alte Streamlit-Login-Maske)
# ---------------------------------------------------------------------------
def verify_user(username: str, password: str):
"""
Vergleicht eingegebenes Passwort mit dem gespeicherten bcrypt-Hash.
Rückgabe: (True, role_id) oder (False, None)
"""
with closing(get_conn()) as conn:
row = conn.execute(
"SELECT password_hash, role_id FROM users WHERE username = ?",
(username,),
).fetchone()
if not row:
return False, None
stored_hash, role_id = row
# stored_hash ist TEXT → zurück nach bytes
ok = bcrypt.checkpw(password.encode("utf-8"), stored_hash.encode("utf-8"))
return (ok, role_id) if ok else (False, None)
# ---------------------------------------------------------------------------
# Rolle für einen Benutzer holen (für Streamlit-Inhalte)
# ---------------------------------------------------------------------------
def get_role_for_user(username: str) -> str:
with closing(get_conn()) as conn:
row = conn.execute(
"SELECT r.role_text from users u left join roles r on u.role_id = r.role_id WHERE username = ?",
(username,),
).fetchone()
return row[0] if row else "user"
# ---------------------------------------------------------------------------
# Fullname für einen Benutzer holen (für Streamlit-Inhalte)
# ---------------------------------------------------------------------------
def get_fullname_for_user(username: str) -> str:
with closing(get_conn()) as conn:
row = conn.execute(
"SELECT firstname ||' '|| lastname as fullname FROM users WHERE username = ?",
(username,),
).fetchone()
return row[0] if row else "user"
# ---------------------------------------------------------------------------
# Credential-Lader für streamlit-authenticator
# ---------------------------------------------------------------------------
def load_credentials_from_db() -> dict:
"""
Baut das credentials-Dict so:
{
"usernames": {
"hansi": {
"email": "hansi@example.com",
"name": "hansi",
"password": "$2b$12$...",
},
...
}
}
streamlit-authenticator prüft dann Login + Cookies automatisch.
"""
creds = {"usernames": {}}
with closing(get_conn()) as conn:
rows = conn.execute(
"SELECT username, email, password_hash FROM users"
).fetchall()
for username, email, pw_hash in rows:
# pw_hash kommt als TEXT
creds["usernames"][username] = {
"name": username,
"email": email or f"{username}@example.local",
"password": pw_hash, # TEXT → streamlit-authenticator-kompatibel
}
return creds
# ---------------------------------------------------------------------------
# Muss das Passwort geändert werden (Admin-Vorgabe)?
# ---------------------------------------------------------------------------
def needs_password_change(username: str) -> bool:
"""Prüft, ob der User ein neues Passwort setzen muss (new_pwd = 1)."""
with closing(get_conn()) as conn:
row = conn.execute(
"SELECT new_pwd FROM users WHERE username = ?",
(username,),
).fetchone()
if not row:
return False
return row[0] == 1
# ---------------------------------------------------------------------------
# Passwort ändern durch den Benutzer
# ---------------------------------------------------------------------------
def update_password(username: str, new_password: str, reset_flag: bool = True) -> bool:
"""
Setzt ein neues Passwort für den User.
Wenn reset_flag=True: new_pwd wird auf 0 zurückgesetzt.
"""
pw_hash = bcrypt.hashpw(new_password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
with closing(get_conn()) as conn, conn:
if reset_flag:
conn.execute(
"""
UPDATE users
SET password_hash = ?, new_pwd = 0
WHERE username = ?
""",
(pw_hash, username),
)
else:
conn.execute(
"""
UPDATE users
SET password_hash = ?
WHERE username = ?
""",
(pw_hash, username),
)
return True

71
app/auth_runtime.py Normal file
View File

@@ -0,0 +1,71 @@
import streamlit as st
import yaml
from yaml.loader import SafeLoader
import streamlit_authenticator as stauth
from streamlit_authenticator.utilities.exceptions import LoginError
from auth import load_credentials_from_db, needs_password_change, update_password
def get_authenticator():
with open("config/auth.yaml", "r", encoding="utf-8") as f:
base_config = yaml.load(f, Loader=SafeLoader)
db_creds = load_credentials_from_db()
base_config["credentials"] = db_creds
authenticator = stauth.Authenticate(
base_config["credentials"],
base_config["cookie"]["name"],
base_config["cookie"]["key"],
base_config["cookie"]["expiry_days"],
)
return authenticator
def require_login():
"""Sicherstellen, dass der User eingeloggt ist. Auf JEDER Page verwenden."""
authenticator = get_authenticator()
try:
authenticator.login(location="main", key="Login")
except LoginError:
authenticator.logout("ForceLogout", "sidebar")
st.error("Sitzung ungültig. Bitte neu einloggen.")
st.stop()
auth_status = st.session_state.get("authentication_status")
if auth_status is False:
st.error("Login fehlgeschlagen.")
st.stop()
if auth_status is None:
st.warning("Bitte Benutzername und Passwort eingeben.")
st.stop()
# Passwortwechsel erzwingen
username = st.session_state.get("username")
if needs_password_change(username):
st.warning("Du musst dein Passwort ändern, bevor du die Anwendung nutzen kannst.")
with st.form("pw_change_form"):
pw1 = st.text_input("Neues Passwort", type="password")
pw2 = st.text_input("Neues Passwort (Wiederholung)", type="password")
submitted = st.form_submit_button("Passwort ändern")
if submitted:
if not pw1 or not pw2:
st.error("Bitte beide Passwortfelder ausfüllen.")
st.stop()
if pw1 != pw2:
st.error("Passwörter stimmen nicht überein.")
st.stop()
if len(pw1) < 8:
st.error("Passwort sollte mindestens 8 Zeichen lang sein.")
st.stop()
update_password(username, pw1, reset_flag=True)
st.success("Passwort wurde geändert.")
st.rerun()
st.stop()
return authenticator

View File

@@ -0,0 +1,45 @@
import streamlit as st
from auth_runtime import require_login
from ui.sidebar import build_sidebar, hide_sidebar_if_logged_out
from auth import get_fullname_for_user
hide_sidebar_if_logged_out()
st.set_page_config(page_title="Co-App Home", page_icon="🏠")
authenticator = require_login()
st.session_state["authenticator"] = authenticator
build_sidebar()
def render(username: str, role: str):
st.title("Benutzerverwaltung")
with st.expander("Neuen Nutzer anlegen"):
new_u = st.text_input("Neuer Username", key="new_u")
new_fname = st.text_input("Vorname", key="new_fname")
new_lname = st.text_input("Nachname", key="new_lname")
new_email = st.text_input("E-Mail", key="new_email")
new_p = st.text_input("Neues Passwort", type="password", key="new_p")
new_role = st.selectbox("Rolle", ["user", "admin"], key="new_role")
if st.button("Anlegen"):
if new_u and new_p:
ok = create_user(
new_u.strip(),
new_p,
new_role,
new_email.strip() or None,
new_fname.strip() or None,
new_lname.strip() or None,
)
st.success("Nutzer angelegt.") if ok else st.error(
"Username bereits vorhanden oder Fehler."
)
else:
st.warning("Bitte Username und Passwort eingeben.")
st.subheader("Dein Bereich")
st.write(f"Personalisierter Content für **{username}**.")

View File

@@ -0,0 +1,24 @@
import importlib
def get_dashboard_renderer(code: str):
"""
Liefert die passende Render-Funktion zum Dashboard-Code.
- "home" -> interne Funktion home_dashboard
- alles andere -> Modul app.dashboards.<code> mit Funktion render(username, role)
"""
if code == "home":
return home_dashboard
module_name = f"app.dashboards.{code}"
try:
module = importlib.import_module(module_name)
except ModuleNotFoundError:
# Zum Debuggen:
st.error(f"Modul '{module_name}' wurde nicht gefunden.")
return None
if not hasattr(module, "render"):
st.error(f"Modul '{module_name}' hat keine Funktion render().")
return None
return module.render

46
app/dashboards/home.py Normal file
View File

@@ -0,0 +1,46 @@
import streamlit as st
from auth import create_user, get_fullname_for_user, get_role_for_user
from ui.sidebar import build_sidebar
build_sidebar
st.header("Controlling-Portal")
st.info(f"Willkommen, {username}!")
# if role == "admin":
# st.subheader("Admin-Bereich")
# st.write("Nur Admins sehen das hier.")
# with st.expander("Neuen Nutzer anlegen"):
# new_u = st.text_input("Neuer Username", key="new_u")
# new_fname = st.text_input("Vorname", key="new_fname")
# new_lname = st.text_input("Nachname", key="new_lname")
# new_email = st.text_input("E-Mail", key="new_email")
# new_p = st.text_input("Neues Passwort", type="password", key="new_p")
# new_role = st.selectbox("Rolle", ["user", "admin"], key="new_role")
# if st.button("Anlegen"):
# if new_u and new_p:
# ok = create_user(
# new_u.strip(),
# new_p,
# new_role,
# new_email.strip() or None,
# new_fname.strip() or None,
# new_lname.strip() or None,
# )
# st.success("Nutzer angelegt.") if ok else st.error(
# "Username bereits vorhanden oder Fehler."
# )
# else:
# st.warning("Bitte Username und Passwort eingeben.")
# st.subheader("Dein Bereich")
# st.write(f"Personalisierter Content für **{username}**.")

0
app/data/__init__.py Normal file
View File

36
app/data/db.py Normal file
View File

@@ -0,0 +1,36 @@
from sqlalchemy import create_engine, Text
import pandas as pd
from dotenv import load_dotenv
from pathlib import Path
from urllib.parse import quote
import os
import logging
env_path = Path("config/settings.env")
load_dotenv(env_path)
oracle_conn_str = os.getenv("oracle_conn_str")
co_dw_conn_str = os.getenv("co_dw_conn_str")
co_daten_conn_str = os.getenv("co_daten_conn_str")
def get_conn(db):
match db:
case "oracle":
engine = create_engine(oracle_conn_str)
case "co_dw":
engine = create_engine(co_dw_conn_str)
case "co_daten":
engine = create_engine(co_dw_conn_str)
case _:
logging.info(f"Datenbank {db} konnte nicht gefunden werden")
return engine
# def get_data(db):
# engine = get_conn(db)
# with engine.connect() as conn:
# print(engine)
# return
# if __name__ == "__main__":
# get_data("co_daten")

10
app/data/scriptloader.py Normal file
View File

@@ -0,0 +1,10 @@
from pathlib import Path
def get_sql(filename):
sql_query = Path(f"app/data/sql/{filename}.sql").read_text()
return sql_query
if __name__ == "__main__":
print(get_sql("sales_umsatz"))

View File

@@ -0,0 +1 @@
select * from bi.Dim_Kostenobjekt

View File

@@ -0,0 +1 @@
select * from umsatz

Binary file not shown.

After

Width:  |  Height:  |  Size: 28 KiB

24
app/logging_config.py Normal file
View File

@@ -0,0 +1,24 @@
import logging
from pathlib import Path
def setup_logging(app_env: str):
logger = logging.getLogger()
logger.setLevel(logging.INFO)
fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
# Always log to console (terminal / systemd)
sh = logging.StreamHandler()
sh.setFormatter(fmt)
logger.addHandler(sh)
# Dev → additional file logging
if app_env == "dev":
log_dir = Path("logs")
log_dir.mkdir(exist_ok=True)
fh = logging.FileHandler(log_dir / "app.log", encoding="utf-8")
fh.setFormatter(fmt)
logger.addHandler(fh)
return logger

32
app/main.py Normal file
View File

@@ -0,0 +1,32 @@
import streamlit as st
import logging
from logging_config import setup_logging
from version import __version__
from auth_runtime import require_login
from ui.sidebar import build_sidebar
import os
APP_ENV = os.environ.get("APP_ENV", "dev")
logger = setup_logging(APP_ENV)
#logger.info(f"Starting migration - APP-Version {__version__}")
logger = logging.getLogger(__name__)
def main():
st.set_page_config(
page_title=f"Co-App Start - V{__version__}",
page_icon="🔒",
layout="centered",
)
authenticator = require_login()
# damit build_sidebar den authenticator findet:
st.session_state["authenticator"] = authenticator
build_sidebar()
st.header("Controlling-Portal")
st.info(f"Willkommen, {st.session_state.get('username')}!")
if __name__ == "__main__":
main()

494
app/main_v1.py Normal file
View File

@@ -0,0 +1,494 @@
import importlib
import streamlit as st
import yaml
from yaml.loader import SafeLoader
import streamlit_authenticator as stauth
from streamlit_authenticator.utilities.exceptions import LoginError
import pandas as pd
from contextlib import closing
from auth import (
load_credentials_from_db,
get_role_for_user,
create_user,
needs_password_change,
update_password,
get_fullname_for_user,
)
from version import __version__
from db import get_conn # an deine Struktur anpassen
# ---------------------------------------------------------------------------
# Dashboards aus der DB laden
# ---------------------------------------------------------------------------
def load_dashboards_df() -> pd.DataFrame:
"""Lädt alle aktiven Dashboards aus der DB als DataFrame."""
with closing(get_conn()) as conn:
df = pd.read_sql_query(
"""
SELECT
d.dash_id,
d.dash_text as code,
d.dash_text as title,
g.group_text as grp
from
dashboards d
left join groups g
on d.group_id = g.group_id
where
d.active = 1
order by
d.dash_id
""",
conn,
)
return df
# ---------------------------------------------------------------------------
# Home-Dashboard (als Funktion, kein eigenes .py nötig)
# ---------------------------------------------------------------------------
def home_dashboard(username: str, role: str):
st.header("Controlling-Portal")
st.info(f"Willkommen, {username}!")
if role == "admin":
st.subheader("Admin-Bereich")
st.write("Nur Admins sehen das hier.")
with st.expander("Neuen Nutzer anlegen"):
new_u = st.text_input("Neuer Username", key="new_u")
new_fname = st.text_input("Vorname", key="new_fname")
new_lname = st.text_input("Nachname", key="new_lname")
new_email = st.text_input("E-Mail", key="new_email")
new_p = st.text_input("Neues Passwort", type="password", key="new_p")
new_role = st.selectbox("Rolle", ["user", "admin"], key="new_role")
if st.button("Anlegen"):
if new_u and new_p:
ok = create_user(
new_u.strip(),
new_p,
new_role,
new_email.strip() or None,
new_fname.strip() or None,
new_lname.strip() or None,
)
st.success("Nutzer angelegt.") if ok else st.error(
"Username bereits vorhanden oder Fehler."
)
else:
st.warning("Bitte Username und Passwort eingeben.")
st.subheader("Dein Bereich")
st.write(f"Personalisierter Content für **{username}**.")
# ---------------------------------------------------------------------------
# Dashboard-Resolver: code -> render-Funktion
# ---------------------------------------------------------------------------
def get_dashboard_renderer(code: str):
"""
Liefert die passende Render-Funktion zum Dashboard-Code.
Konvention:
- Home: interne Funktion home_dashboard(username, role)
- andere: Modul dashboards.<code> mit Funktion render(username, role)
"""
if code == "home":
return home_dashboard
module_name = f"dashboards.{code}"
try:
module = importlib.import_module(module_name)
except ModuleNotFoundError:
return None
# Wir erwarten eine Funktion render(username, role)
return getattr(module, "render", None)
# ---------------------------------------------------------------------------
# Sidebar: Home + Suche + DB-Dashboards (Gruppen-Expander)
# ---------------------------------------------------------------------------
def build_sidebar(authenticator, username: str, df_dashboards: pd.DataFrame):
role = get_role_for_user(username)
fullname = get_fullname_for_user(username)
with st.sidebar:
st.logo("app/images/GMN_Logo_neu_rgb.png",size="small")
# st.markdown("*Controlling*")
st.write(f"**{fullname}** ({role})")
col1, col2 = st.columns(2)
with col1:
if st.button("Logout", use_container_width=True):
authenticator.logout("Logout", "unrendered")
st.rerun()
#authenticator.logout("Logout", "sidebar")
with col2:
# st.page_link("pages/home.py", label="Home", icon="🏠")
if st.button("🏠 Home", use_container_width=True):
st.session_state["selected_dashboard_code"] = "home"
# st.divider()
st.markdown("### Dashboards")
# st.divider()
# Default-Auswahl
if "selected_dashboard_code" not in st.session_state:
st.session_state["selected_dashboard_code"] = "home"
# Home ist immer da
# if st.button("🏠 Home", use_container_width=True):
# st.session_state["selected_dashboard_code"] = "home"
# st.markdown("---")
# Suchfeld für DB-Dashboards
query = st.text_input("Suche", placeholder="Dashboard suchen …")
filtered = df_dashboards.copy()
if query:
q = query.lower()
filtered = filtered[
filtered["title"].str.lower().str.contains(q, na=False)
| filtered["grp"].str.lower().str.contains(q, na=False)
| filtered["description"].fillna("").str.lower().str.contains(q, na=False)
]
if filtered.empty:
st.info("Keine Dashboards zur Suche gefunden.")
return
# Gruppen -> Expander, darin Buttons für Dashboards
for grp, grp_df in filtered.groupby("grp"):
with st.expander(grp, expanded=False):
for _, row in grp_df.iterrows():
code = row["code"]
title = row["title"]
if st.button(title, key=f"dash_btn_{code}", use_container_width=True):
st.session_state["selected_dashboard_code"] = code
# ---------------------------------------------------------------------------
# Login + App-Shell
# ---------------------------------------------------------------------------
def main():
st.set_page_config(
page_title=f"Co-App Start - V{__version__}",
page_icon="🔒",
layout="centered",
)
# --- Authenticator vorbereiten ---
with open("config/auth.yaml", "r", encoding="utf-8") as f:
base_config = yaml.load(f, Loader=SafeLoader)
db_creds = load_credentials_from_db()
base_config["credentials"] = db_creds
authenticator = stauth.Authenticate(
base_config["credentials"],
base_config["cookie"]["name"],
base_config["cookie"]["key"],
base_config["cookie"]["expiry_days"],
)
# --- Login-View zeichnen ---
try:
authenticator.login(location="main", key="Login")
except LoginError:
authenticator.logout("ForceLogout", "sidebar")
st.error("Sitzung ungültig. Bitte neu einloggen.")
return
auth_status = st.session_state.get("authentication_status")
name = st.session_state.get("name")
username = st.session_state.get("username")
if auth_status is False:
st.error("Login fehlgeschlagen.")
return
if auth_status is None:
st.warning("Bitte Benutzername und Passwort eingeben.")
return
# ---- Ab hier eingeloggt (persistenter Cookie) ----
# 1) Passwortwechsel erzwingen?
if needs_password_change(username):
st.warning("Du musst dein Passwort ändern, bevor du die Anwendung nutzen kannst.")
with st.form("pw_change_form"):
pw1 = st.text_input("Neues Passwort", type="password")
pw2 = st.text_input("Neues Passwort (Wiederholung)", type="password")
submitted = st.form_submit_button("Passwort ändern")
if submitted:
if not pw1 or not pw2:
st.error("Bitte beide Passwortfelder ausfüllen.")
return
if pw1 != pw2:
st.error("Passwörter stimmen nicht überein.")
return
if len(pw1) < 8:
st.error("Passwort sollte mindestens 8 Zeichen lang sein.")
return
update_password(username, pw1, reset_flag=True)
st.success("Passwort wurde geändert.")
st.rerun()
# solange new_pwd=1: keinen weiteren Inhalt anzeigen
return
# 2) Dashboards aus DB laden (ohne Home)
df_dashboards = load_dashboards_df()
# 3) Sidebar aufbauen (User, Logout, Navigation)
build_sidebar(authenticator, username, df_dashboards)
# 4) Ausgewähltes Dashboard rendern
role = get_role_for_user(username)
selected_code = st.session_state.get("selected_dashboard_code", "home")
renderer = get_dashboard_renderer(selected_code)
if renderer is None:
st.error(f"Kein Dashboard-Modul für '{selected_code}' gefunden.")
return
# Titel setzen
if selected_code == "home":
title = "Home"
else:
row = df_dashboards[df_dashboards["code"] == selected_code]
title = row["title"].iloc[0] if not row.empty else selected_code
st.title(title)
renderer(username, role)
if __name__ == "__main__":
main()
# import importlib
# import streamlit as st
# import yaml
# from yaml.loader import SafeLoader
# import streamlit_authenticator as stauth
# from streamlit_authenticator.utilities.exceptions import LoginError
# from auth import (
# load_credentials_from_db,
# get_role_for_user,
# create_user,
# needs_password_change,
# update_password,
# get_fullname_for_user,
# )
# from version import __version__
# # später: from db import get_conn, um Dashboards aus DB zu laden
# # ---------------------------------------------------------------------------
# # Home-Dashboard (als Funktion, kein eigenes .py nötig)
# # ---------------------------------------------------------------------------
# def home_dashboard(username: str, role: str):
# # st.header("Controlling-Portal")
# st.info(f"Willkommen, {username}!")
# # ---------------------------------------------------------------------------
# # Dashboard-Resolver (später: weitere Dashboards dynamisch laden)
# # ---------------------------------------------------------------------------
# def get_dashboard_renderer(code: str):
# """Liefert die passende Render-Funktion zum Dashboard-Code."""
# if code == "home":
# # internes Home-Dashboard
# return home_dashboard
# # Später: Module anhand 'code' aus dashboards-Package laden
# # Beispiel: dashboards/sales.py → code = "sales"
# try:
# module = importlib.import_module(f"dashboards.{code}")
# except ModuleNotFoundError:
# return None
# return getattr(module, "render", None)
# # ---------------------------------------------------------------------------
# # Sidebar Navigation (Home + später DB-Dashboards)
# # ---------------------------------------------------------------------------
# def build_sidebar(authenticator, username: str):
# role = get_role_for_user(username)
# fullname = get_fullname_for_user(username)
# with st.sidebar:
# st.write(f"**{fullname}** ({role})")
# authenticator.logout("Logout", "sidebar")
# st.divider()
# # Default-Auswahl
# if "selected_dashboard_code" not in st.session_state:
# st.session_state["selected_dashboard_code"] = "home"
# # Home ist immer da
# if st.button("🏠 Home", use_container_width=True):
# st.session_state["selected_dashboard_code"] = "home"
# # Platzhalter für Suche + DB-Dashboards:
# # hier später: Suchfeld + Expander aus DB-Result (DataFrame)
# # z.B.:
# # query = st.text_input("Dashboards suchen …")
# # df = load_dashboards_df()
# # ... filtern, gruppieren, Buttons setzen ...
# # ---------------------------------------------------------------------------
# # Login + App-Shell
# # ---------------------------------------------------------------------------
# def main():
# st.set_page_config(
# page_title=f"Co-App Start - V{__version__}",
# page_icon="🔒",
# layout="centered",
# )
# # --- Authenticator vorbereiten ---
# with open("config/auth.yaml", "r", encoding="utf-8") as f:
# base_config = yaml.load(f, Loader=SafeLoader)
# db_creds = load_credentials_from_db()
# base_config["credentials"] = db_creds
# authenticator = stauth.Authenticate(
# base_config["credentials"],
# base_config["cookie"]["name"],
# base_config["cookie"]["key"],
# base_config["cookie"]["expiry_days"],
# )
# # --- Login-View zeichnen ---
# try:
# authenticator.login(location="main", key="Login")
# except LoginError:
# authenticator.logout("ForceLogout", "sidebar")
# st.error("Sitzung ungültig. Bitte neu einloggen.")
# return
# auth_status = st.session_state.get("authentication_status")
# name = st.session_state.get("name")
# username = st.session_state.get("username")
# if auth_status is False:
# st.error("Login fehlgeschlagen.")
# return
# if auth_status is None:
# st.warning("Bitte Benutzername und Passwort eingeben.")
# return
# # ---- Ab hier eingeloggt (persistenter Cookie) ----
# # 1) Passwortwechsel erzwingen?
# if needs_password_change(username):
# st.warning("Du musst dein Passwort ändern, bevor du die Anwendung nutzen kannst.")
# with st.form("pw_change_form"):
# pw1 = st.text_input("Neues Passwort", type="password")
# pw2 = st.text_input("Neues Passwort (Wiederholung)", type="password")
# submitted = st.form_submit_button("Passwort ändern")
# if submitted:
# if not pw1 or not pw2:
# st.error("Bitte beide Passwortfelder ausfüllen.")
# return
# if pw1 != pw2:
# st.error("Passwörter stimmen nicht überein.")
# return
# if len(pw1) < 8:
# st.error("Passwort sollte mindestens 8 Zeichen lang sein.")
# return
# update_password(username, pw1, reset_flag=True)
# st.success("Passwort wurde geändert.")
# st.rerun()
# # solange new_pwd=1: keinen weiteren Inhalt anzeigen
# return
# # 2) Sidebar aufbauen (User, Logout, Navigation)
# build_sidebar(authenticator, username)
# # 3) Ausgewähltes Dashboard rendern
# role = get_role_for_user(username)
# selected_code = st.session_state.get("selected_dashboard_code", "home")
# renderer = get_dashboard_renderer(selected_code)
# if renderer is None:
# st.error(f"Kein Dashboard-Modul für '{selected_code}' gefunden.")
# return
# # Titel setzen (Home vs. andere Dashboards)
# if selected_code == "home":
# title = "Home"
# else:
# title = selected_code # später kannst du den Titel aus der DB holen
# st.title(title)
# renderer(username, role)
# if __name__ == "__main__":
# main()

133
app/migrate.py Normal file
View File

@@ -0,0 +1,133 @@
import sqlite3
from version import __version__
from pathlib import Path
import logging
from logging_config import setup_logging
import os
from contextlib import closing
import getpass
from auth import create_user
APP_ENV = os.environ.get("APP_ENV", "dev")
logger = setup_logging(APP_ENV)
logger.info(f"Starting migration - APP-Version {__version__}")
logger = logging.getLogger(__name__)
BASE_DIR = Path(__file__).resolve().parents[1]
DB_DIR = BASE_DIR / "app" / "app_db"
DB_PATH = DB_DIR / "app.db"
MIGRATIONS_DIR = BASE_DIR / "migrations"
ADMIN_USERNAME = "admin"
print(BASE_DIR)
def get_connection() -> sqlite3.Connection:
DB_DIR.mkdir(exist_ok=True)
conn = sqlite3.connect(DB_PATH)
return conn
def get_applied_versions(conn: sqlite3.Connection) -> set[str]:
"""
Ließt aus schema_version, welche Migrationen schon gelaufen sind.
Falls die Tabelle noch nicht existiert → leeres Set.
"""
try:
cur = conn.execute("SELECT version FROM schema_version")
rows = cur.fetchall()
return {r[0] for r in rows}
except sqlite3.OperationalError:
# Tabelle existiert noch nicht (z.B. bei initialer DB)
return set()
def apply_migrations():
if not MIGRATIONS_DIR.exists():
raise SystemExit(f"Migrations-Ordner nicht gefunden: {MIGRATIONS_DIR}")
conn = get_connection()
try:
applied = get_applied_versions(conn)
# Alle .sql-Dateien alphabetisch sortiert
sql_files = sorted(MIGRATIONS_DIR.glob("*.sql"))
if not sql_files:
logger.info("Keine Migrationsdateien gefunden.")
return
for path in sql_files:
version = path.stem # z.B. '20250220_120000_initial'
if version in applied:
logger.info(f"[SKIP] {version} (bereits angewendet)")
continue
logger.info(f"[APPLY] {version}")
sql = path.read_text(encoding="utf-8")
try:
# Eine Transaktion pro Datei
with conn:
conn.executescript(sql)
except Exception as e:
logger.info(f"Fehler in Migration {version}: {e}")
# hier nicht weiter machen
raise
logger.info(f"Migrationen abgeschlossen. DB: {DB_PATH}")
create_admin_user()
finally:
conn.close()
def admin_exists() -> bool:
"""Prüft, ob der Admin-User bereits existiert."""
with closing(get_connection()) as conn:
row = conn.execute(
"SELECT 1 FROM users WHERE username = ?",
(ADMIN_USERNAME,),
).fetchone()
if row is not None:
print(row[0])
else:
print(ADMIN_USERNAME)
print("Kein Admin gefunden")
return row is not None
def create_admin_user():
if admin_exists():
logger.info("Adminkonto existiert bereits! Kein initiales Konto angelegt.")
return
logger.info("Adminkonto wird angelegt ...")
pw1 = getpass.getpass("Passwort: ")
pw2 = getpass.getpass("Passwort wiederholen: ")
if pw1 != pw2:
logger.warning("Passwörter stimmen nicht überein! Abbruch.")
return
ok = create_user(
username=ADMIN_USERNAME,
password=pw1,
role_id=1,
email="admin@co_app",
firstname="***",
lastname="admin"
)
if ok:
logger.info(f"Admin-Benutzer '{ADMIN_USERNAME}' wurde angelegt.")
else:
# Sollte eigentlich nicht passieren, weil wir vorher geprüft haben,
# aber falls z.B. Parallelzugriff o.Ä.
logger.info(f"Admin-Benutzer '{ADMIN_USERNAME}' konnte nicht angelegt werden.")
if __name__ == "__main__":
apply_migrations()

0
app/pages/__init__.py Normal file
View File

24
app/pages/costobjects.py Normal file
View File

@@ -0,0 +1,24 @@
import streamlit as st
import pandas as pd
from data.scriptloader import get_sql
from data.db import get_conn
def load_data():
sql = get_sql("co_kostenobjekte")
print(sql)
engine = get_conn("co_dw")
with engine.connect() as conn:
df = pd.read_sql(sql, engine)
print(df)
return df
st.dataframe(load_data())
if __name__ == "__main__":
df = load_data()
print(df)

17
app/pages/home.py Normal file
View File

@@ -0,0 +1,17 @@
import streamlit as st
from auth_runtime import require_login
from ui.sidebar import build_sidebar, hide_sidebar_if_logged_out
from auth import get_fullname_for_user
hide_sidebar_if_logged_out()
st.set_page_config(page_title="Co-App Home", page_icon="🏠")
authenticator = require_login()
st.session_state["authenticator"] = authenticator
build_sidebar()
username = st.session_state.get("username")
st.header("Controlling-Portal")
st.info(f"Willkommen, {get_fullname_for_user(username)}!")

43
app/pages/user.py Normal file
View File

@@ -0,0 +1,43 @@
import streamlit as st
from auth_runtime import require_login
from ui.sidebar import hide_sidebar_if_logged_out
from auth import create_user
hide_sidebar_if_logged_out()
st.set_page_config(page_title="Co-App Home", page_icon="🏠")
authenticator = require_login()
st.session_state["authenticator"] = authenticator
username = st.session_state.get("username")
# build_sidebar()
st.title("Benutzerverwaltung")
with st.expander("Neuen Nutzer anlegen"):
new_u = st.text_input("Neuer Username", key="new_u")
new_fname = st.text_input("Vorname", key="new_fname")
new_lname = st.text_input("Nachname", key="new_lname")
new_email = st.text_input("E-Mail", key="new_email")
new_p = st.text_input("Neues Passwort", type="password", key="new_p")
new_role = st.selectbox("Rolle", ["user", "admin"], key="new_role")
if st.button("Anlegen"):
if new_u and new_p:
ok = create_user(
new_u.strip(),
new_p,
new_role,
new_email.strip() or None,
new_fname.strip() or None,
new_lname.strip() or None,
)
st.success("Nutzer angelegt.") if ok else st.error(
"Username bereits vorhanden oder Fehler."
)
else:
st.warning("Bitte Username und Passwort eingeben.")
st.subheader("Dein Bereich")
st.write(f"Personalisierter Content für **{username}**.")

0
app/tools/__init__.py Normal file
View File

173
app/ui/sidebar.py Normal file
View File

@@ -0,0 +1,173 @@
from contextlib import closing
import streamlit as st
from auth import get_fullname_for_user, get_role_for_user
from app_db.app_db import get_conn, get_list
# import sqlite3
def build_sidebar():
if st.session_state.get("authentication_status") != True:
return
authenticator = st.session_state.get("authenticator")
username = st.session_state.get("username")
if not authenticator or not username:
return
role = get_role_for_user(username)
fullname = get_fullname_for_user(username)
if role == "admin":
sql = """
select
g.group_id,
g.group_text,
d.dash_id,
d.dash_text,
d.page_file,
d.dash_type
from
groups g
left join dashboards d
on g.group_id = d.group_id
where
g.active = 1
and d.active = 1
"""
else:
sql = """
SELECT
d.group_id,
g.group_text,
p.dash_id,
d.dash_text,
d.page_file,
d.dash_type
FROM
users u
left join permissions p
on u.role_id = p.role_id
left join dashboards d
on p.dash_id = d.dash_id
left join groups g
on d.group_id = g.group_id
where
u.active = 1
and g.active = 1
and d.active = 1
and p.active = 1
and u.username = ?
order by
g.order_no,
d.order_no
"""
params = (username,) if "?" in sql else None
df = get_list(sql, params)
with st.sidebar:
st.logo("app/images/GMN_Logo_neu_rgb.png", size="small")
st.write(f"**{fullname}** ({role})")
col1, col2 = st.columns(2)
with col1:
if st.button("Logout", use_container_width=True):
authenticator.logout("Logout", "unrendered")
st.rerun()
with col2:
if st.button("Home", use_container_width=True):
st.switch_page("pages/home.py")
st.divider()
st.markdown("## Menü")
# for group_text, df_group in df.groupby("group_text"):
# with st.expander(group_text, expanded=False):
# for _, row in df_group.iterrows():
# dash_type = row.get("dash_type")
# page_file = row.get("page_file")
# label = row.get("dash_text", "")
# print(dash_type, page_file, label)
# # 1) echte Streamlit-Page
# if dash_type == "page" and isinstance(page_file, str) and page_file.strip():
# st.page_link(
# page_file, # z.B. "pages/umsatz.py"
# label=label,
# )
# # 2) externer Link (oder interner HTTP-Link)
# elif dash_type == "url" and isinstance(page_file, str) and page_file.strip():
# st.markdown(
# f"[{label}]({page_file})",
# unsafe_allow_html=False,
# )
# # 3) Platzhalter / noch nicht implementiert
# else:
# st.write(f"▫️ {label} (in Vorbereitung)")
# --- Suchfeld ---
query = st.text_input("Menü-Suche", "", placeholder="z.B. Umsatz, Kosten, User ...")
query = query.strip()
# Aktive Seite ermitteln (für Expander-Status / Highlight)
current_page = st.session_state.get("_page_path")
# --- DF filtern, falls Suchbegriff gesetzt ---
if query:
mask = (
df["dash_text"].str.contains(query, case=False, na=False)
| df["group_text"].str.contains(query, case=False, na=False)
)
df_view = df[mask].copy()
else:
df_view = df
if df_view.empty:
st.info("Keine Einträge zum Suchbegriff gefunden.")
return
# --- Gruppiert durchlaufen ---
for group_text, df_group in df_view.groupby("group_text"):
# Expander offen, wenn:
# - aktuelle Seite in dieser Gruppe liegt
group_open = any(
(row["page_file"] == current_page)
for _, row in df_group.iterrows()
)
with st.expander(group_text, expanded=(group_open or bool(query))):
for _, row in df_group.iterrows():
dash_type = row.get("dash_type")
page_file = row.get("page_file")
label = row.get("dash_text", "")
# Streamlit-Page
if dash_type == "page" and isinstance(page_file, str) and page_file.strip():
st.page_link(page_file, label=label)
# Externer Link
elif dash_type == "url" and isinstance(page_file, str) and page_file.strip():
st.markdown(f"[{label}]({page_file})")
# Platzhalter / Sonstiges
else:
st.write(f"▫️ {label}")
# Damit die leere Sidebar komplett verschwindet nach dem Logout
def hide_sidebar_if_logged_out():
if st.session_state.get("authentication_status") != True:
st.markdown("""
<style>
/* komplette Sidebar + Toggle ausblenden */
[data-testid="stSidebar"] {display: none;}
[data-testid="stSidebarNav"] {display: none;}
[data-testid="collapsedControl"] {display: none;}
/* Content wieder ganz nach links */
[data-testid="stAppViewContainer"] {
margin-left: 0;
}
</style>
""", unsafe_allow_html=True)

1
app/version.py Normal file
View File

@@ -0,0 +1 @@
__version__ = "0.0.1"

7
config/auth.yaml Normal file
View File

@@ -0,0 +1,7 @@
cookie:
expiry_days: 7
key: "tHr1FXOYg-xsYBGVu7amRrY8PAdC2gM_zyjPEPPkPgG8tAsLY4QGPZvCvMS9win0D94uawAxsmZjN6O_VlRhUQ"
name: "co_app-session"
preauthorized:
emails: []

10
config/settings.env Normal file
View File

@@ -0,0 +1,10 @@
# Databases
# Oracle
oracle_conn_str=oracle+oracledb://sp84p:data@ora:1522/?service_name=prod84
# gmn-conn\co_dw
co_dw_conn_str = mssql+pyodbc://co_app:JRHmi1KLwjgnF6@gmn-cont\controlling/co_dw?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes
# gmn-conn\co_daten
co_daten_conn_str = mssql+pyodbc://dw_user:Schneewittchen%4089887%%21@gmn-cont\controlling/co_daten?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes

View File

@@ -0,0 +1,20 @@
BEGIN;
CREATE TABLE IF NOT EXISTS schema_version (
version TEXT PRIMARY KEY,
applied_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date_create TEXT NOT NULL DEFAULT (datetime('now')),
username TEXT UNIQUE NOT NULL,
password_hash BLOB NOT NULL,
role TEXT NOT NULL DEFAULT 'guest'
);
INSERT INTO schema_version (version) VALUES ('20251128_210200_initial');
COMMIT;

View File

@@ -0,0 +1,13 @@
begin;
CREATE TABLE IF NOT EXISTS sessions (
id TEXT PRIMARY KEY,
username TEXT NOT NULL,
created_at INTEGER NOT NULL,
expires_at INTEGER NOT NULL,
FOREIGN KEY (username) REFERENCES users(username) ON DELETE CASCADE
);
INSERT INTO schema_version (version) VALUES ('20251129_162900_add_table_sessions');
COMMIT;

View File

@@ -0,0 +1,8 @@
begin;
alter table users
add column email text not null;
INSERT INTO schema_version (version) VALUES ('20251130_124700_add_col_email_users');
COMMIT;

View File

@@ -0,0 +1,11 @@
begin;
alter table users
add column firstname text;
alter table users
add column lastname text;
INSERT INTO schema_version (version) VALUES ('20251130_1411_add_col_name_users');
COMMIT;

View File

@@ -0,0 +1,8 @@
begin;
alter table users
add column new_pwd integer not null default 1;
INSERT INTO schema_version (version) VALUES ('20251130_161100_add_col_newpwd_users');
COMMIT;

View File

@@ -0,0 +1,42 @@
begin;
create table if not exists dashboards (
dash_id integer unique not null,
dash_text text not null,
dash_description text,
group_id integer not null,
active integer not null default 1,
date_create TEXT NOT NULL DEFAULT (datetime('now')),
id integer primary key autoincrement
);
create table if not exists groups (
group_id text unique not null,
group_text text not null,
group_description text,
active integer not null default 1,
date_create TEXT NOT NULL DEFAULT (datetime('now')),
id integer primary key autoincrement
);
create table if not exists roles (
role_id integer unique not null,
role_text text not null,
role_description text,
active integer not null default 1,
date_create TEXT NOT NULL DEFAULT (datetime('now')),
id integer primary key autoincrement
);
create table if not exists permissions (
role_id integer not null,
dash_id integer not null,
active integer not null default 1,
date_create TEXT NOT NULL DEFAULT (datetime('now')),
id integer primary key autoincrement,
unique (role_id, dash_id)
);
INSERT INTO schema_version (version) VALUES ('20251130_191100_add_table_dashboards');
COMMIT;

View File

@@ -0,0 +1,14 @@
begin;
alter table dashboards
add column zgrp1 text;
alter table dashboards
add column zgrp2 text;
alter table dashboards
add column zgrp3 text;
INSERT INTO schema_version (version) VALUES ('20251130_200000_add_col_area');
COMMIT;

View File

@@ -0,0 +1,11 @@
begin;
alter table dashboards
add column order_no integer default 0;
alter table groups
add column order_no integer default 0;
INSERT INTO schema_version (version) VALUES ('20251130_200600_add_col_order');
COMMIT;

View File

@@ -0,0 +1,8 @@
begin;
alter table users
rename column role to role_id;
INSERT INTO schema_version (version) VALUES ('20251203_201300_rename_col_table_role');
COMMIT;

View File

@@ -0,0 +1,8 @@
begin;
alter table users
add column active integer default 1;
INSERT INTO schema_version (version) VALUES ('20251203_202200_add_col_active_table_users');
COMMIT;

View File

@@ -0,0 +1,11 @@
begin;
alter table roles
add column order_no integer;
alter table permissions
add column order_no integer;
INSERT INTO schema_version (version) VALUES ('20251203_202900_add_col_order_tables');
COMMIT;

View File

@@ -0,0 +1,11 @@
begin;
alter table dashboards
add column page_file text;
alter table dashboards
add column dash_type text;
INSERT INTO schema_version (version) VALUES ('20251204_210700_add_col_page_dashboards');
COMMIT;