Compare commits

...

29 Commits

Author SHA1 Message Date
knedlik
20cd0b8547 Änderung: Filterfunktionen costobjects.py implementiert 2026-02-26 07:58:25 +01:00
knedlik
3f7e405824 Neues Dashboard: Kostenobjekte 2026-02-23 08:11:38 +01:00
knedlik
07854cc0ad Modify dashboard management 2025-12-29 11:04:16 +01:00
knedlik
3725e7fe5d Add dashboard management delete and edit 2025-12-29 09:46:37 +01:00
hansi
3f3ec5097d Add dashboard management - edit function 2025-12-22 22:02:36 +01:00
hansi
13385db5ef Add dashboard management 2025-12-22 08:21:04 +01:00
hansi
c711a75eed reorder database ids 2025-12-20 00:42:33 +01:00
hansi
00692e1641 Add role management 2025-12-19 22:59:50 +01:00
knedlik
4207fe29d6 code cleanup 2025-12-19 20:27:47 +01:00
knedlik
52010b0730 add groups management 2025-12-19 11:01:21 +01:00
knedlik
9c5ed967ec Add field 'acitve' to user authentication 2025-12-19 08:27:36 +01:00
knedlik
37d1daa052 Add tool numgen 2025-12-18 22:22:26 +01:00
hansi
9435399096 Modify the order of the sidebar, remove tabs from users 2025-12-16 22:09:55 +01:00
knedlik
0143d9579b Add user edit functionality 2025-12-16 08:53:36 +01:00
knedlik
13d747827d added users administration tool 2025-12-15 08:21:18 +01:00
knedlik
e2091ec677 clea up code and layout 2025-12-10 22:52:22 +01:00
knedlik
7f21716358 Add pages permission 2025-12-10 07:43:10 +01:00
knedlik
96be7567eb Add database functions 2025-12-05 22:52:38 +01:00
handi
3c0e58dfc1 Add sidebar builder 2025-12-04 22:08:15 +01:00
handi
e2d96c712e Update migrations 2025-12-03 22:03:15 +01:00
handi
ff7560b263 Add logo 2025-12-02 08:37:20 +01:00
handi
ed04a75ce2 Add navigation in sidebar 2025-11-30 23:03:25 +01:00
handi
5277830c50 Add change password 2025-11-30 16:34:34 +01:00
handi
111d0d2756 Add some new fields for users table 2025-11-30 14:55:02 +01:00
hansi
84e1e152f8 Add session-state and cookies 2025-11-30 12:35:21 +01:00
hansi
06e5322931 Add initialize admin-account in migration 2025-11-29 16:27:34 +01:00
hansi
6158f2ddff Add authentication 2025-11-29 12:04:58 +01:00
hansi
fe33d714bd add logging and database migration 2025-11-29 00:15:41 +01:00
hansi
6f9c5eb34e Initialize project structure 2025-11-28 08:53:29 +01:00
57 changed files with 3554 additions and 1 deletions

154
README.md
View File

@@ -1,3 +1,155 @@
# co_app
Controlling-App
## Insallation
1. Git-Repository clonen
2.
## Entwicklungs- und Produktionsumgebung steuern (APP_ENV)
Die App unterscheidet zwischen Entwicklung und Produktion über die Umgebungsvariable APP_ENV.
### Entwicklung (lokal)
Im Terminal setzen:
```bash
export APP_ENV=dev
```
Die Variable gilt, solange das Terminal geöffnet ist.
Beim Starten der App:
```bash
streamlit run app/main.py
```
Die App erkennt automatisch den Dev-Modus.
### Produktion (Systemd-Dienst)
Für den produktiven Betrieb setzt systemd die Variable dauerhaft auf prod:
```ini
Environment=APP_ENV=prod
```
In prod-Modus laufen z. B. Logging und Pfade anders als lokal.
### systemd-Dienst einrichten
``/etc/systemd/system/myapp.service``
```ini
[Unit]
Description=MyApp Streamlit Service
After=network.target
[Service]
User=co_app
Group=co_app
WorkingDirectory=/opt/co_app
Environment=APP_ENV=prod
ExecStart=/usr/bin/python3 -m streamlit run app/main.py --server.port=8501
Restart=on-failure
[Install]
WantedBy=multi-user.target
```
**Installation & Start**
```bash
sudo systemctl daemon-reload
sudo systemctl enable co_app.service
sudo systemctl start co_app.service
```
**Logs ansehen**
Da streamlit + python ins systemd-Journal loggen:
```bash
journalctl -u co_app.service -f
```
**Wichtig**
- APP_ENV=dev → Entwicklung, lokale Logs, lokale DB
- APP_ENV=prod → Systemd-Betrieb, serverseitige Pfade, Logging ins Journal
- Die App liest die Variable mit:
```python
import os
APP_ENV = os.environ.get("APP_ENV", "dev")
```
## Database Setup & Migration Guide
Dieses Projekt verwendet SQLite für die interne App-Datenbank.
Das Schema wird vollständig über SQL-Migrationsdateien verwaltet, die automatisch in der richtigen Reihenfolge ausgeführt werden.
**WICHTIG:**
Bei der ersten Initialisierung muss ein admin-Passwort vergeben werden. Beim erstmaligen Anmelden an der App muss man sich dann als "adim" mit dem vergebenen Passwort anmelden. In der App können dann User angelegt.
### Ordnerstruktur
```bash
data/ ← enthält die erzeugte SQLite-Datenbank (app.db)
migrations/ ← enthält alle SQL-Migrationsdateien
migrate.py ← Python-Skript zum Anwenden der Migrationen
```
### Erstellen der Datenbank (Initial Setup)
```bash
python migrate.py
```
Das Skript:
1. erstellt den Ordner db/ (falls nicht vorhanden)
2. erzeugt eine neue Datei db/app.db
3. führt alle SQL-Dateien im Ordner migrations/ der Reihe nach aus
4. protokolliert die angewendeten Migrationen in der Tabelle schema_version
Danach ist die Datenbank vollständig initialisiert und einsatzbereit.
### Arbeiten mit Migrationen (Änderungen am Schema)
Änderungen an der Datenbankstruktur werden nie direkt an der DB vorgenommen.
Stattdessen erzeugst du eine neue Migrationsdatei im Ordner migrations/.
**Beispiel: neue Spalte hinzufügen**
Lege eine Datei an
```bash
touch /migrations/20250301_101500_add_last_login.sql
```
Dateiinhalt
```sql
BEGIN;
ALTER TABLE users ADD COLUMN last_login DATETIME;
INSERT INTO schema_version (version)
VALUES ('20250301_101500_add_last_login');
COMMIT;
```
Migration anwenden
```bash
python migrate.py
```
Das Skript erkennt automatisch, dass diese Migration neu ist und führt sie aus.
### Erneutes Ausführen (Updates)
Wenn eine bestehende Installation aktualisiert werden soll:
```bash
git pull
python migrate.py
```
Das Skript:
- liest die Tabelle schema_version
- führt nur Migrationen aus, die noch nicht angewendet wurden
Bestehende Strukturen bleiben unberührt.
### Entwicklungsphase: Hinweise
Solange das Projekt noch nicht produktiv genutzt wird, gilt:
- du kannst Migrationsdateien löschen/zusammenführen
- du kannst die komplette app.db jederzeit löschen
- vor dem ersten echten Deploy kannst du alle Migrationen zu einer sauberen initial.sql zusammenfassen
👉 **Nach dem ersten produktiven Einsatz** werden Migrationen nie mehr gelöscht, sondern nur ergänzt.
### Optional: DB neu erzeugen für Tests
Um eine frische Datenbank zu bekommen:
```bash
rm -f data/app.db
python migrate.py
```

BIN
app.db Normal file

Binary file not shown.

View File

@@ -0,0 +1,11 @@
[client]
showSidebarNavigation = false
# toolbarMode = "minimal"
# toolbarMode = "auto"
toolbarMode = "viewer"
[theme]
# primaryColor = "blue"
# backgroundColor = "black"
# secondaryBackgroundColor = "blue"
# borderColor = "blue"

21
app/.streamlit/style.css Normal file
View File

@@ -0,0 +1,21 @@
/* Footer entfernen */
footer {visibility: hidden !important;}
div[data-testid="stStatusWidget"] {display: none !important;}
/* Sidebar-Spacing kompakter */
section[data-testid="stSidebar"] > div {
padding-top: 0.5rem !important;
}
/* Haupt-Container kompakter */
.block-container {
padding-top: 0.6rem !important;
padding-bottom: 0.6rem !important;
}
/* Widgets enger */
.stButton, .stTextInput, .stSelectbox {
margin-bottom: 0.3rem !important;
}

0
app/__init__.py Normal file
View File

86
app/admin_users.py Normal file
View File

@@ -0,0 +1,86 @@
# admin_users.py
import streamlit as st
import pandas as pd
from db_users import init_db, list_users, get_user, create_user, update_user, set_password, delete_user
st.set_page_config(page_title="User Admin", layout="wide")
init_db()
st.title("Benutzerverwaltung")
# --- Übersicht ---
users = list_users()
df = pd.DataFrame(users)
if df.empty:
st.info("Noch keine Benutzer vorhanden.")
df = pd.DataFrame(columns=["id","username","display_name","email","role","is_active","created_at"])
st.subheader("Übersicht")
st.dataframe(df, use_container_width=True, hide_index=True)
st.divider()
colA, colB = st.columns(2, gap="large")
# --- Neuen Benutzer anlegen ---
with colA:
st.subheader("Neuen Benutzer anlegen")
with st.form("create_user", clear_on_submit=True):
username = st.text_input("Username*", max_chars=50)
password = st.text_input("Initiales Passwort*", type="password")
display_name = st.text_input("Anzeigename")
email = st.text_input("E-Mail")
role = st.selectbox("Rolle", ["user", "admin"], index=0)
is_active = st.checkbox("Aktiv", value=True)
submitted = st.form_submit_button("Anlegen")
if submitted:
if not username or not password:
st.error("Username und Passwort sind Pflicht.")
else:
try:
create_user(username, password, display_name, email, role, is_active)
st.success("Benutzer angelegt.")
st.rerun()
except Exception as e:
st.error(f"Fehler beim Anlegen: {e}")
# --- Bestehenden Benutzer bearbeiten/löschen ---
with colB:
st.subheader("Benutzer bearbeiten / löschen")
user_ids = df["id"].tolist() if "id" in df.columns else []
selected_id = st.selectbox("Benutzer wählen", user_ids, format_func=lambda uid: f"{uid} {df.loc[df.id==uid,'username'].values[0] if len(df)>0 else uid}" if uid in user_ids else str(uid))
user = get_user(int(selected_id)) if selected_id else None
if user:
with st.form("edit_user"):
display_name2 = st.text_input("Anzeigename", value=user.get("display_name") or "")
email2 = st.text_input("E-Mail", value=user.get("email") or "")
role2 = st.selectbox("Rolle", ["user", "admin"], index=0 if user["role"]=="user" else 1)
is_active2 = st.checkbox("Aktiv", value=bool(user["is_active"]))
save = st.form_submit_button("Änderungen speichern")
if save:
update_user(int(selected_id), display_name2, email2, role2, is_active2)
st.success("Gespeichert.")
st.rerun()
st.markdown("**Passwort zurücksetzen**")
with st.form("reset_pw", clear_on_submit=True):
new_pw = st.text_input("Neues Passwort", type="password")
reset = st.form_submit_button("Passwort setzen")
if reset:
if not new_pw:
st.error("Bitte ein Passwort eingeben.")
else:
set_password(int(selected_id), new_pw)
st.success("Passwort aktualisiert.")
st.markdown("**Löschen**")
confirm = st.checkbox("Ich möchte diesen Benutzer wirklich löschen.")
if st.button("Benutzer löschen", disabled=not confirm):
delete_user(int(selected_id))
st.success("Benutzer gelöscht.")
st.rerun()

BIN
app/app_db/app.db Normal file

Binary file not shown.

45
app/app_db/app_db.py Normal file
View File

@@ -0,0 +1,45 @@
import sqlite3
from pathlib import Path
import pandas as pd
BASE_DIR = Path(__file__).resolve().parent
# DB_PATH = BASE_DIR / "app_db" / "app.db"
DB_PATH = BASE_DIR / "app.db"
#-------------------------------------------------------------------------------------
# Allgemeine Datenbankfunktionen
#-------------------------------------------------------------------------------------
def get_conn():
# check_same_thread=False, damit Streamlit mehrere Threads nutzen kann
return sqlite3.connect(DB_PATH, check_same_thread=False)
def get_list(sql, params=None):
conn = get_conn()
df = pd.read_sql_query(sql, conn, params=params)
conn.close()
return df
def get_value(sql, params=None):
conn = get_conn()
df = pd.read_sql_query(sql, conn, params=params)
conn.close()
value = df.iloc[0,0]
return value
def send_cmd(sql, params=None):
try:
with get_conn() as conn:
if params is None:
conn.execute(sql)
else:
conn.execute(sql, params)
return True
except Exception as e:
print("DB-Fehler:", e)
return False
#-------------------------------------------------------------------------------------
#
#-------------------------------------------------------------------------------------

224
app/auth.py Normal file
View File

@@ -0,0 +1,224 @@
from contextlib import closing
import bcrypt
from app_db.app_db import get_conn, get_list
# ---------------------------------------------------------------------------
# Benutzer anlegen
# ---------------------------------------------------------------------------
def create_user(
username: str,
password: str,
role_id: int = 1,
email: str | None = None,
firstname: str | None = None,
lastname: str | None = None
) -> bool:
"""
Passwort wird als bcrypt-Hash (TEXT) gespeichert.
"""
pw_hash = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
try:
with closing(get_conn()) as conn, conn:
conn.execute(
"INSERT INTO users (username, email, password_hash, role_id, firstname, lastname) VALUES (?, ?, ?, ?, ?, ?)",
(username, email, pw_hash, role_id, firstname, lastname),
)
return True
except Exception:
return False
# ---------------------------------------------------------------------------
# Benutzer überprüfen
# ---------------------------------------------------------------------------
def verify_user(username: str, password: str):
"""
Vergleicht eingegebenes Passwort mit dem gespeicherten bcrypt-Hash.
Rückgabe: (True, role_id) oder (False, None)
"""
with closing(get_conn()) as conn:
row = conn.execute(
"SELECT password_hash, role_id FROM users WHERE username = ?",
(username,),
).fetchone()
if not row:
return False, None
stored_hash, role_id = row
# stored_hash ist TEXT → zurück nach bytes
ok = bcrypt.checkpw(password.encode("utf-8"), stored_hash.encode("utf-8"))
return (ok, role_id) if ok else (False, None)
# ---------------------------------------------------------------------------
# Rolle für einen Benutzer holen (für Streamlit-Inhalte)
# ---------------------------------------------------------------------------
def get_role_for_user(username: str) -> str:
with closing(get_conn()) as conn:
row = conn.execute(
"SELECT r.role_text from users u left join roles r on u.role_id = r.role_id WHERE username = ?",
(username,),
).fetchone()
return row[0] if row else "user"
# ---------------------------------------------------------------------------
# Fullname für einen Benutzer holen (für Streamlit-Inhalte)
# ---------------------------------------------------------------------------
def get_fullname_for_user(username: str) -> str:
with closing(get_conn()) as conn:
row = conn.execute(
"SELECT firstname ||' '|| lastname as fullname FROM users WHERE username = ?",
(username,),
).fetchone()
return row[0] if row else "user"
# ---------------------------------------------------------------------------
# Sidebar-Einträge für den Benutzer
# ---------------------------------------------------------------------------
def get_sidebar(role_text: str, username: str):
if role_text == "admin":
sql = """
select
g.group_id,
g.group_text,
d.dash_id,
d.dash_text,
d.page_file,
d.dash_type,
d.order_no as dash_order,
g.order_no as group_order
from
groups g
left join dashboards d
on g.group_id = d.group_id
where
g.active = 1
and d.active = 1
order by g.order_no, d.order_no
"""
else:
sql = """
SELECT
d.group_id,
g.group_text,
p.dash_id,
d.dash_text,
d.page_file,
d.dash_type,
d.order_no as dash_order,
g.order_no as group_order
FROM
users u
left join permissions p
on u.role_id = p.role_id
left join dashboards d
on p.dash_id = d.dash_id
left join groups g
on d.group_id = g.group_id
where
u.active = 1
and g.active = 1
and d.active = 1
and p.active = 1
and u.username = ?
order by g.order_no, d.order_no
"""
params = (username,) if "?" in sql else None
df = get_list(sql, params)
return df
# ---------------------------------------------------------------------------
# Credential-Lader für streamlit-authenticator
# ---------------------------------------------------------------------------
def load_credentials_from_db() -> dict:
"""
Baut das credentials-Dict so:
{
"usernames": {
"hansi": {
"email": "hansi@example.com",
"name": "hansi",
"password": "$2b$12$...",
},
...
}
}
streamlit-authenticator prüft dann Login + Cookies automatisch.
"""
creds = {"usernames": {}}
with closing(get_conn()) as conn:
rows = conn.execute(
"SELECT username, email, password_hash FROM users where active = 1"
).fetchall()
for username, email, pw_hash in rows:
# pw_hash kommt als TEXT
creds["usernames"][username] = {
"name": username,
"email": email or f"{username}@example.local",
"password": pw_hash, # TEXT → streamlit-authenticator-kompatibel
}
return creds
# ---------------------------------------------------------------------------
# Muss das Passwort geändert werden (Admin-Vorgabe)?
# ---------------------------------------------------------------------------
def needs_password_change(username: str) -> bool:
"""Prüft, ob der User ein neues Passwort setzen muss (new_pwd = 1)."""
with closing(get_conn()) as conn:
row = conn.execute(
"SELECT new_pwd FROM users WHERE username = ?",
(username,),
).fetchone()
if not row:
return False
return row[0] == 1
# ---------------------------------------------------------------------------
# Passwort ändern durch den Benutzer
# ---------------------------------------------------------------------------
def update_password(username: str, new_password: str, reset_flag: bool = True) -> bool:
"""
Setzt ein neues Passwort für den User.
Wenn reset_flag=True: new_pwd wird auf 0 zurückgesetzt.
"""
pw_hash = bcrypt.hashpw(new_password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
with closing(get_conn()) as conn, conn:
if reset_flag:
conn.execute(
"""
UPDATE users
SET password_hash = ?, new_pwd = 0
WHERE username = ?
""",
(pw_hash, username),
)
else:
conn.execute(
"""
UPDATE users
SET password_hash = ?
WHERE username = ?
""",
(pw_hash, username),
)
return True

108
app/auth_runtime.py Normal file
View File

@@ -0,0 +1,108 @@
import streamlit as st
import yaml
from yaml.loader import SafeLoader
import streamlit_authenticator as stauth
from streamlit_authenticator.utilities.exceptions import LoginError
from auth import (
load_credentials_from_db,
needs_password_change,
update_password,
get_role_for_user,
get_fullname_for_user,
get_sidebar
)
st.markdown("""
<style>
/* Streamlit Hamburger-Menü ausblenden */
div[data-testid="stToolbar"] {
visibility: hidden !important;
}
/* Optional: ganz entfernen statt nur unsichtbar machen */
div[data-testid="stDecoration"] {
display: none !important;
}
</style>
""", unsafe_allow_html=True)
def get_authenticator():
with open("config/auth.yaml", "r", encoding="utf-8") as f:
base_config = yaml.load(f, Loader=SafeLoader)
db_creds = load_credentials_from_db() # Hier wird name, email und password geladen
base_config["credentials"] = db_creds
authenticator = stauth.Authenticate(
base_config["credentials"],
base_config["cookie"]["name"],
base_config["cookie"]["key"],
base_config["cookie"]["expiry_days"],
)
return authenticator
def require_login():
"""Sicherstellen, dass der User eingeloggt ist. Auf JEDER Page verwenden."""
authenticator = get_authenticator()
try:
authenticator.login(location="main", key="Login")
except LoginError:
authenticator.logout("ForceLogout", "sidebar")
st.error("Sitzung ungültig. Bitte neu einloggen.")
st.stop()
auth_status = st.session_state.get("authentication_status")
if auth_status is False:
st.error("Login fehlgeschlagen.")
st.stop()
if auth_status is None:
st.warning("Bitte Benutzername und Passwort eingeben.")
st.stop()
#--------------------------------------------------------------------------------------
# Ab hier bin ich eingeloggt
#--------------------------------------------------------------------------------------
username = st.session_state.get("username")
if "df_sidebar" not in st.session_state:
role_text = get_role_for_user(username)
fullname = get_fullname_for_user(username)
sidebar = get_sidebar(role_text, username)
st.session_state["role_text"] = role_text
st.session_state["fullname"] = fullname
st.session_state["df_sidebar"] = sidebar
# Passwortwechsel erzwingen
if needs_password_change(username):
st.warning("Du musst dein Passwort ändern, bevor du die Anwendung nutzen kannst.")
with st.form("pw_change_form"):
pw1 = st.text_input("Neues Passwort", type="password")
pw2 = st.text_input("Neues Passwort (Wiederholung)", type="password")
submitted = st.form_submit_button("Passwort ändern")
if submitted:
if not pw1 or not pw2:
st.error("Bitte beide Passwortfelder ausfüllen.")
st.stop()
if pw1 != pw2:
st.error("Passwörter stimmen nicht überein.")
st.stop()
if len(pw1) < 8:
st.error("Passwort sollte mindestens 8 Zeichen lang sein.")
st.stop()
update_password(username, pw1, reset_flag=True)
st.success("Passwort wurde geändert.")
st.rerun()
st.stop()
return authenticator

0
app/data/__init__.py Normal file
View File

3
app/data/datasets.py Normal file
View File

@@ -0,0 +1,3 @@
from scriptloader import get_sql
from db import get_conn

58
app/data/db.py Normal file
View File

@@ -0,0 +1,58 @@
import streamlit as st
from sqlalchemy import create_engine, Text
from data.scriptloader import get_sql
import oracledb
import pandas as pd
from dotenv import load_dotenv
from pathlib import Path
from urllib.parse import quote
import os
import logging
env_path = Path("config/settings.env")
load_dotenv(env_path)
oracle_conn_str = os.getenv("oracle_conn_str")
co_dw_conn_str = os.getenv("co_dw_conn_str")
co_daten_conn_str = os.getenv("co_daten_conn_str")
def get_conn(db):
match db:
case "oracle":
engine = create_engine(oracle_conn_str)
case "co_dw":
engine = create_engine(co_dw_conn_str)
case "co_daten":
engine = create_engine(co_dw_conn_str)
case _:
logging.info(f"Datenbank {db} konnte nicht gefunden werden")
return engine
def load_data(sql_file, db):
sql = get_sql(sql_file)
engine = get_conn(db)
with engine.connect():
df = pd.read_sql(sql, engine)
return df
# def get_data(db):
# engine = get_conn(db)
# with engine.connect() as conn:
# print(engine)
# return
# if __name__ == "__main__":
# get_data("co_daten")

10
app/data/scriptloader.py Normal file
View File

@@ -0,0 +1,10 @@
from pathlib import Path
def get_sql(filename):
sql_query = Path(f"app/data/sql/{filename}.sql").read_text()
return sql_query
if __name__ == "__main__":
print(get_sql("ergebnis_kpi"))

View File

@@ -0,0 +1 @@
select * from bi.Dim_Kostenobjekt

View File

@@ -0,0 +1,92 @@
/*******************************************************************************************
* Basis-Daten aus der Kostenrechnung
*******************************************************************************************/
with basis as(
Select
p.geschaeftsjahr as jahr,
p.geschaeftsperiode as monat,
p.periode_key,
b.zgrp1,
b.zgrp2,
b.zgrp3,
b.bezeichnung as Bereich,
ko.kostenobjekt,
ko.bezeichnung as obj_bezeichnung,
ko.obj_text,
ko.objektgruppe,
ko.objekttyp,
ko.verantwortlicher,
ks.co_koa_grp,
ks.co_grp_bez,
ks.co_grp_text,
ks.kostenart,
ks.koa_text,
coalesce(ws.uml_kz,'') as uml_kz,
coalesce(ws.zgrp_1_entlastung, '') as zgrp1_entlastung,
coalesce(ws.wert * -1, 0) as wert,
coalesce(ws.wert_plan * -1, 0) as wert_plan,
0 as wert_forecast
from
co_dw.bi.fact_wertsummen ws
left join co_dw.bi.dim_periode p
on ws.periode_key = p.periode_key
left join co_dw.bi.dim_kostenobjekt ko
on ws.objekt_key = ko.objekt_key
left join co_dw.bi.dim_koastruktur ks
on ws.kostenart = ks.kostenart
left join co_dw.bi.dim_bereich b
on ko.bereich_key = b.bereich_key
where
ks.co_koa_grp_01 = 'GMN001'
and ko.objektgruppe in ('HKST', 'VKST', 'KTRG', 'KtrBereich', 'KtrMNr')
and p.geschaeftsperiode < 14
and (p.geschaeftsjahr = :jahr or p.geschaeftsjahr = :jahr -1)
),
/*******************************************************************************************
* Gesamtergebnis
*******************************************************************************************/
ergebnis_gesamt as (
select
zgrp1,
zgrp2,
zgrp3,
kostenobjekt,
obj_bezeichnung,
obj_text,
co_koa_grp,
co_grp_bez,
co_grp_text,
'Ergebnis' as main_kpi,
'Gesamtergebnis' as kpi,
round(sum(case when jahr = :jahr-1 then wert else 0 end),0) as vor_jahr,
round(sum(case when jahr = :jahr-1 and monat <= :monat then wert else 0 end),0) as vor_jahr_bis_monat,
round(sum(case when jahr = :jahr-1 and monat = :monat then wert else 0 end),0) as vor_jahr_monat,
round(sum(case when jahr = :jahr then wert else 0 end),0) as akt_jahr,
round(sum(case when jahr = :jahr and monat <= :monat then wert else 0 end),0) as akt_jahr_bis_monat,
round(sum(case when jahr = :jahr and monat = :monat then wert else 0 end),0) as akt_jahr_monat,
round(sum(case when jahr = :jahr then wert_plan else 0 end),0) as plan_akt_jahr,
round(sum(case when jahr = :jahr and monat <= :monat then wert_plan else 0 end),0) as plan_akt_jahr_bis_monat,
round(sum(case when jahr = :jahr and monat = :monat then wert_plan else 0 end),0) as plan_akt_jahr_monat,
round(sum(case when jahr = :jahr then wert_forecast else 0 end),0) as forecast_akt_jahr,
round(sum(case when jahr = :jahr and monat <= :monat then wert_forecast else 0 end),0) as forecast_akt_jahr_bis_monat,
round(sum(case when jahr = :jahr and monat = :monat then wert_forecast else 0 end),0) as forecast_akt_jahr_monat
from
basis
group by
zgrp1,
zgrp2,
zgrp3,
kostenobjekt,
obj_bezeichnung,
obj_text,
co_koa_grp,
co_grp_bez,
co_grp_text
)
select * from ergebnis_gesamt
select * from co_dw.bi.Fact_Wertsummen

View File

@@ -0,0 +1,22 @@
/* Orlacle (PENTA) Kostenobjekte */
select
GESCHAEFTSJAHR as jahr,
OBJEKTTYP as typ,
KOSTENOBJEKT as obj,
BEZEICHNUNG,
VERANTWORTLICHER,
FELD_2_X30 as vorgesetzter,
ZUORDNUNGSGRUPPE_1 as zgrp1,
ZUORDNUNGSGRUPPE_2 as zgrp2,
ZUORDNUNGSGRUPPE_3 as zgrp3,
ZUORDNUNGSGRUPPE_4 as zgrp4,
ZUORDNUNGSGRUPPE_5 as zgrp5,
ZUORDNUNGSGRUPPE_6 as zgrp6,
FELD_1_X30 as fertigung,
OBJEKTGRUPPE as objgrp,
sysdate
from
pkos
where
objekttyp in ('01', '02')

View File

@@ -0,0 +1 @@
select * from umsatz

73
app/db_users.py Normal file
View File

@@ -0,0 +1,73 @@
# db_users.py
import sqlite3
from contextlib import contextmanager
from typing import Optional
import bcrypt
DB_PATH = "app.db"
@contextmanager
def get_conn():
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
conn.row_factory = sqlite3.Row
try:
yield conn
conn.commit()
finally:
conn.close()
def init_db():
with get_conn() as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
display_name TEXT,
email TEXT,
password_hash BLOB NOT NULL,
role TEXT NOT NULL DEFAULT 'user',
is_active INTEGER NOT NULL DEFAULT 1,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
)
""")
def list_users():
with get_conn() as conn:
rows = conn.execute("""
SELECT id, username, display_name, email, role, is_active, created_at
FROM users ORDER BY id
""").fetchall()
return [dict(r) for r in rows]
def get_user(user_id: int) -> Optional[dict]:
with get_conn() as conn:
r = conn.execute("""
SELECT id, username, display_name, email, role, is_active
FROM users WHERE id = ?
""", (user_id,)).fetchone()
return dict(r) if r else None
def create_user(username: str, password: str, display_name: str = "", email: str = "", role: str = "user", is_active: bool = True):
pw_hash = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
with get_conn() as conn:
conn.execute("""
INSERT INTO users (username, display_name, email, password_hash, role, is_active)
VALUES (?, ?, ?, ?, ?, ?)
""", (username, display_name, email, pw_hash, role, 1 if is_active else 0))
def update_user(user_id: int, display_name: str, email: str, role: str, is_active: bool):
with get_conn() as conn:
conn.execute("""
UPDATE users
SET display_name=?, email=?, role=?, is_active=?
WHERE id=?
""", (display_name, email, role, 1 if is_active else 0, user_id))
def set_password(user_id: int, new_password: str):
pw_hash = bcrypt.hashpw(new_password.encode("utf-8"), bcrypt.gensalt())
with get_conn() as conn:
conn.execute("UPDATE users SET password_hash=? WHERE id=?", (pw_hash, user_id))
def delete_user(user_id: int):
with get_conn() as conn:
conn.execute("DELETE FROM users WHERE id=?", (user_id,))

41
app/db_users_search.py Normal file
View File

@@ -0,0 +1,41 @@
# db_users_search.py
import sqlite3
from contextlib import contextmanager
DB_PATH = "app.db"
@contextmanager
def get_conn():
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
conn.row_factory = sqlite3.Row
try:
yield conn
conn.commit()
finally:
conn.close()
def search_users(q: str, limit: int, offset: int):
q = (q or "").strip()
like = f"%{q}%"
with get_conn() as conn:
rows = conn.execute("""
SELECT id, username, display_name, email, role, is_active, created_at
FROM users
WHERE (? = '')
OR (username LIKE ? COLLATE NOCASE
OR display_name LIKE ? COLLATE NOCASE
OR email LIKE ? COLLATE NOCASE)
ORDER BY username
LIMIT ? OFFSET ?
""", (q, like, like, like, limit, offset)).fetchall()
total = conn.execute("""
SELECT COUNT(*)
FROM users
WHERE (? = '')
OR (username LIKE ? COLLATE NOCASE
OR display_name LIKE ? COLLATE NOCASE
OR email LIKE ? COLLATE NOCASE)
""", (q, like, like, like)).fetchone()[0]
return [dict(r) for r in rows], int(total)

Binary file not shown.

After

Width:  |  Height:  |  Size: 28 KiB

BIN
app/images/home.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 308 B

View File

24
app/logging_config.py Normal file
View File

@@ -0,0 +1,24 @@
import logging
from pathlib import Path
def setup_logging(app_env: str):
logger = logging.getLogger()
logger.setLevel(logging.INFO)
fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
# Always log to console (terminal / systemd)
sh = logging.StreamHandler()
sh.setFormatter(fmt)
logger.addHandler(sh)
# Dev → additional file logging
if app_env == "dev":
log_dir = Path("logs")
log_dir.mkdir(exist_ok=True)
fh = logging.FileHandler(log_dir / "app.log", encoding="utf-8")
fh.setFormatter(fmt)
logger.addHandler(fh)
return logger

61
app/main.py Normal file
View File

@@ -0,0 +1,61 @@
import streamlit as st
import logging
from logging_config import setup_logging
from version import __version__
from auth import get_role_for_user, get_fullname_for_user, get_sidebar
from auth_runtime import require_login
from ui.sidebar import build_sidebar
import os
from app_db.app_db import get_list
from pathlib import Path
from tools.load_css import load_css
APP_ENV = os.environ.get("APP_ENV", "dev")
logger = setup_logging(APP_ENV)
logger = logging.getLogger(__name__)
load_css()
def main():
st.set_page_config(
page_title=f"Co-App Home - V{__version__}",
page_icon="🔒",
layout="centered",
menu_items=None,
)
authenticator = require_login()
#--------------------------------------------------------------------------------
# Ab hier ist Benutzer angemeldet!!
#--------------------------------------------------------------------------------
# Alles was man so braucht in der App wird in session_state abgelegt
# zuerst den authenticator holen ...
st.session_state["authenticator"] = authenticator
# ... dann ist der name des users in der sesstion_state verfügbar!
# Ich suche mir erst mal alles zusammen, was ich dann in session_state speichern will ...
username = st.session_state.get("name") # wird duch authenticator gesetzt
role_text = get_role_for_user(username)
fullname = get_fullname_for_user(username)
sidebar = get_sidebar(role_text, username)
# ... und lege es dann im session_state ab.
st.session_state["role_text"] = role_text
st.session_state["fullname"] = fullname
st.session_state["df_sidebar"] = sidebar
build_sidebar()
st.header("Controlling-Portal")
st.info(f"Willkommen, {st.session_state.get('username')}!")
st.text("Hier könnte eine Nachricht für den Benutzer stehen")
if __name__ == "__main__":
main()

494
app/main_v1.py Normal file
View File

@@ -0,0 +1,494 @@
import importlib
import streamlit as st
import yaml
from yaml.loader import SafeLoader
import streamlit_authenticator as stauth
from streamlit_authenticator.utilities.exceptions import LoginError
import pandas as pd
from contextlib import closing
from auth import (
load_credentials_from_db,
get_role_for_user,
create_user,
needs_password_change,
update_password,
get_fullname_for_user,
)
from version import __version__
from db import get_conn # an deine Struktur anpassen
# ---------------------------------------------------------------------------
# Dashboards aus der DB laden
# ---------------------------------------------------------------------------
def load_dashboards_df() -> pd.DataFrame:
"""Lädt alle aktiven Dashboards aus der DB als DataFrame."""
with closing(get_conn()) as conn:
df = pd.read_sql_query(
"""
SELECT
d.dash_id,
d.dash_text as code,
d.dash_text as title,
g.group_text as grp
from
dashboards d
left join groups g
on d.group_id = g.group_id
where
d.active = 1
order by
d.dash_id
""",
conn,
)
return df
# ---------------------------------------------------------------------------
# Home-Dashboard (als Funktion, kein eigenes .py nötig)
# ---------------------------------------------------------------------------
def home_dashboard(username: str, role: str):
st.header("Controlling-Portal")
st.info(f"Willkommen, {username}!")
if role == "admin":
st.subheader("Admin-Bereich")
st.write("Nur Admins sehen das hier.")
with st.expander("Neuen Nutzer anlegen"):
new_u = st.text_input("Neuer Username", key="new_u")
new_fname = st.text_input("Vorname", key="new_fname")
new_lname = st.text_input("Nachname", key="new_lname")
new_email = st.text_input("E-Mail", key="new_email")
new_p = st.text_input("Neues Passwort", type="password", key="new_p")
new_role = st.selectbox("Rolle", ["user", "admin"], key="new_role")
if st.button("Anlegen"):
if new_u and new_p:
ok = create_user(
new_u.strip(),
new_p,
new_role,
new_email.strip() or None,
new_fname.strip() or None,
new_lname.strip() or None,
)
st.success("Nutzer angelegt.") if ok else st.error(
"Username bereits vorhanden oder Fehler."
)
else:
st.warning("Bitte Username und Passwort eingeben.")
st.subheader("Dein Bereich")
st.write(f"Personalisierter Content für **{username}**.")
# ---------------------------------------------------------------------------
# Dashboard-Resolver: code -> render-Funktion
# ---------------------------------------------------------------------------
def get_dashboard_renderer(code: str):
"""
Liefert die passende Render-Funktion zum Dashboard-Code.
Konvention:
- Home: interne Funktion home_dashboard(username, role)
- andere: Modul dashboards.<code> mit Funktion render(username, role)
"""
if code == "home":
return home_dashboard
module_name = f"dashboards.{code}"
try:
module = importlib.import_module(module_name)
except ModuleNotFoundError:
return None
# Wir erwarten eine Funktion render(username, role)
return getattr(module, "render", None)
# ---------------------------------------------------------------------------
# Sidebar: Home + Suche + DB-Dashboards (Gruppen-Expander)
# ---------------------------------------------------------------------------
def build_sidebar(authenticator, username: str, df_dashboards: pd.DataFrame):
role = get_role_for_user(username)
fullname = get_fullname_for_user(username)
with st.sidebar:
st.logo("app/images/GMN_Logo_neu_rgb.png",size="small")
# st.markdown("*Controlling*")
st.write(f"**{fullname}** ({role})")
col1, col2 = st.columns(2)
with col1:
if st.button("Logout", use_container_width=True):
authenticator.logout("Logout", "unrendered")
st.rerun()
#authenticator.logout("Logout", "sidebar")
with col2:
# st.page_link("pages/home.py", label="Home", icon="🏠")
if st.button("🏠 Home", use_container_width=True):
st.session_state["selected_dashboard_code"] = "home"
# st.divider()
st.markdown("### Dashboards")
# st.divider()
# Default-Auswahl
if "selected_dashboard_code" not in st.session_state:
st.session_state["selected_dashboard_code"] = "home"
# Home ist immer da
# if st.button("🏠 Home", use_container_width=True):
# st.session_state["selected_dashboard_code"] = "home"
# st.markdown("---")
# Suchfeld für DB-Dashboards
query = st.text_input("Suche", placeholder="Dashboard suchen …")
filtered = df_dashboards.copy()
if query:
q = query.lower()
filtered = filtered[
filtered["title"].str.lower().str.contains(q, na=False)
| filtered["grp"].str.lower().str.contains(q, na=False)
| filtered["description"].fillna("").str.lower().str.contains(q, na=False)
]
if filtered.empty:
st.info("Keine Dashboards zur Suche gefunden.")
return
# Gruppen -> Expander, darin Buttons für Dashboards
for grp, grp_df in filtered.groupby("grp"):
with st.expander(grp, expanded=False):
for _, row in grp_df.iterrows():
code = row["code"]
title = row["title"]
if st.button(title, key=f"dash_btn_{code}", use_container_width=True):
st.session_state["selected_dashboard_code"] = code
# ---------------------------------------------------------------------------
# Login + App-Shell
# ---------------------------------------------------------------------------
def main():
st.set_page_config(
page_title=f"Co-App Start - V{__version__}",
page_icon="🔒",
layout="centered",
)
# --- Authenticator vorbereiten ---
with open("config/auth.yaml", "r", encoding="utf-8") as f:
base_config = yaml.load(f, Loader=SafeLoader)
db_creds = load_credentials_from_db()
base_config["credentials"] = db_creds
authenticator = stauth.Authenticate(
base_config["credentials"],
base_config["cookie"]["name"],
base_config["cookie"]["key"],
base_config["cookie"]["expiry_days"],
)
# --- Login-View zeichnen ---
try:
authenticator.login(location="main", key="Login")
except LoginError:
authenticator.logout("ForceLogout", "sidebar")
st.error("Sitzung ungültig. Bitte neu einloggen.")
return
auth_status = st.session_state.get("authentication_status")
name = st.session_state.get("name")
username = st.session_state.get("username")
if auth_status is False:
st.error("Login fehlgeschlagen.")
return
if auth_status is None:
st.warning("Bitte Benutzername und Passwort eingeben.")
return
# ---- Ab hier eingeloggt (persistenter Cookie) ----
# 1) Passwortwechsel erzwingen?
if needs_password_change(username):
st.warning("Du musst dein Passwort ändern, bevor du die Anwendung nutzen kannst.")
with st.form("pw_change_form"):
pw1 = st.text_input("Neues Passwort", type="password")
pw2 = st.text_input("Neues Passwort (Wiederholung)", type="password")
submitted = st.form_submit_button("Passwort ändern")
if submitted:
if not pw1 or not pw2:
st.error("Bitte beide Passwortfelder ausfüllen.")
return
if pw1 != pw2:
st.error("Passwörter stimmen nicht überein.")
return
if len(pw1) < 8:
st.error("Passwort sollte mindestens 8 Zeichen lang sein.")
return
update_password(username, pw1, reset_flag=True)
st.success("Passwort wurde geändert.")
st.rerun()
# solange new_pwd=1: keinen weiteren Inhalt anzeigen
return
# 2) Dashboards aus DB laden (ohne Home)
df_dashboards = load_dashboards_df()
# 3) Sidebar aufbauen (User, Logout, Navigation)
build_sidebar(authenticator, username, df_dashboards)
# 4) Ausgewähltes Dashboard rendern
role = get_role_for_user(username)
selected_code = st.session_state.get("selected_dashboard_code", "home")
renderer = get_dashboard_renderer(selected_code)
if renderer is None:
st.error(f"Kein Dashboard-Modul für '{selected_code}' gefunden.")
return
# Titel setzen
if selected_code == "home":
title = "Home"
else:
row = df_dashboards[df_dashboards["code"] == selected_code]
title = row["title"].iloc[0] if not row.empty else selected_code
st.title(title)
renderer(username, role)
if __name__ == "__main__":
main()
# import importlib
# import streamlit as st
# import yaml
# from yaml.loader import SafeLoader
# import streamlit_authenticator as stauth
# from streamlit_authenticator.utilities.exceptions import LoginError
# from auth import (
# load_credentials_from_db,
# get_role_for_user,
# create_user,
# needs_password_change,
# update_password,
# get_fullname_for_user,
# )
# from version import __version__
# # später: from db import get_conn, um Dashboards aus DB zu laden
# # ---------------------------------------------------------------------------
# # Home-Dashboard (als Funktion, kein eigenes .py nötig)
# # ---------------------------------------------------------------------------
# def home_dashboard(username: str, role: str):
# # st.header("Controlling-Portal")
# st.info(f"Willkommen, {username}!")
# # ---------------------------------------------------------------------------
# # Dashboard-Resolver (später: weitere Dashboards dynamisch laden)
# # ---------------------------------------------------------------------------
# def get_dashboard_renderer(code: str):
# """Liefert die passende Render-Funktion zum Dashboard-Code."""
# if code == "home":
# # internes Home-Dashboard
# return home_dashboard
# # Später: Module anhand 'code' aus dashboards-Package laden
# # Beispiel: dashboards/sales.py → code = "sales"
# try:
# module = importlib.import_module(f"dashboards.{code}")
# except ModuleNotFoundError:
# return None
# return getattr(module, "render", None)
# # ---------------------------------------------------------------------------
# # Sidebar Navigation (Home + später DB-Dashboards)
# # ---------------------------------------------------------------------------
# def build_sidebar(authenticator, username: str):
# role = get_role_for_user(username)
# fullname = get_fullname_for_user(username)
# with st.sidebar:
# st.write(f"**{fullname}** ({role})")
# authenticator.logout("Logout", "sidebar")
# st.divider()
# # Default-Auswahl
# if "selected_dashboard_code" not in st.session_state:
# st.session_state["selected_dashboard_code"] = "home"
# # Home ist immer da
# if st.button("🏠 Home", use_container_width=True):
# st.session_state["selected_dashboard_code"] = "home"
# # Platzhalter für Suche + DB-Dashboards:
# # hier später: Suchfeld + Expander aus DB-Result (DataFrame)
# # z.B.:
# # query = st.text_input("Dashboards suchen …")
# # df = load_dashboards_df()
# # ... filtern, gruppieren, Buttons setzen ...
# # ---------------------------------------------------------------------------
# # Login + App-Shell
# # ---------------------------------------------------------------------------
# def main():
# st.set_page_config(
# page_title=f"Co-App Start - V{__version__}",
# page_icon="🔒",
# layout="centered",
# )
# # --- Authenticator vorbereiten ---
# with open("config/auth.yaml", "r", encoding="utf-8") as f:
# base_config = yaml.load(f, Loader=SafeLoader)
# db_creds = load_credentials_from_db()
# base_config["credentials"] = db_creds
# authenticator = stauth.Authenticate(
# base_config["credentials"],
# base_config["cookie"]["name"],
# base_config["cookie"]["key"],
# base_config["cookie"]["expiry_days"],
# )
# # --- Login-View zeichnen ---
# try:
# authenticator.login(location="main", key="Login")
# except LoginError:
# authenticator.logout("ForceLogout", "sidebar")
# st.error("Sitzung ungültig. Bitte neu einloggen.")
# return
# auth_status = st.session_state.get("authentication_status")
# name = st.session_state.get("name")
# username = st.session_state.get("username")
# if auth_status is False:
# st.error("Login fehlgeschlagen.")
# return
# if auth_status is None:
# st.warning("Bitte Benutzername und Passwort eingeben.")
# return
# # ---- Ab hier eingeloggt (persistenter Cookie) ----
# # 1) Passwortwechsel erzwingen?
# if needs_password_change(username):
# st.warning("Du musst dein Passwort ändern, bevor du die Anwendung nutzen kannst.")
# with st.form("pw_change_form"):
# pw1 = st.text_input("Neues Passwort", type="password")
# pw2 = st.text_input("Neues Passwort (Wiederholung)", type="password")
# submitted = st.form_submit_button("Passwort ändern")
# if submitted:
# if not pw1 or not pw2:
# st.error("Bitte beide Passwortfelder ausfüllen.")
# return
# if pw1 != pw2:
# st.error("Passwörter stimmen nicht überein.")
# return
# if len(pw1) < 8:
# st.error("Passwort sollte mindestens 8 Zeichen lang sein.")
# return
# update_password(username, pw1, reset_flag=True)
# st.success("Passwort wurde geändert.")
# st.rerun()
# # solange new_pwd=1: keinen weiteren Inhalt anzeigen
# return
# # 2) Sidebar aufbauen (User, Logout, Navigation)
# build_sidebar(authenticator, username)
# # 3) Ausgewähltes Dashboard rendern
# role = get_role_for_user(username)
# selected_code = st.session_state.get("selected_dashboard_code", "home")
# renderer = get_dashboard_renderer(selected_code)
# if renderer is None:
# st.error(f"Kein Dashboard-Modul für '{selected_code}' gefunden.")
# return
# # Titel setzen (Home vs. andere Dashboards)
# if selected_code == "home":
# title = "Home"
# else:
# title = selected_code # später kannst du den Titel aus der DB holen
# st.title(title)
# renderer(username, role)
# if __name__ == "__main__":
# main()

133
app/migrate.py Normal file
View File

@@ -0,0 +1,133 @@
import sqlite3
from version import __version__
from pathlib import Path
import logging
from logging_config import setup_logging
import os
from contextlib import closing
import getpass
from auth import create_user
APP_ENV = os.environ.get("APP_ENV", "dev")
logger = setup_logging(APP_ENV)
logger.info(f"Starting migration - APP-Version {__version__}")
logger = logging.getLogger(__name__)
BASE_DIR = Path(__file__).resolve().parents[1]
DB_DIR = BASE_DIR / "app" / "app_db"
DB_PATH = DB_DIR / "app.db"
MIGRATIONS_DIR = BASE_DIR / "migrations"
ADMIN_USERNAME = "admin"
print(BASE_DIR)
def get_connection() -> sqlite3.Connection:
DB_DIR.mkdir(exist_ok=True)
conn = sqlite3.connect(DB_PATH)
return conn
def get_applied_versions(conn: sqlite3.Connection) -> set[str]:
"""
Ließt aus schema_version, welche Migrationen schon gelaufen sind.
Falls die Tabelle noch nicht existiert → leeres Set.
"""
try:
cur = conn.execute("SELECT version FROM schema_version")
rows = cur.fetchall()
return {r[0] for r in rows}
except sqlite3.OperationalError:
# Tabelle existiert noch nicht (z.B. bei initialer DB)
return set()
def apply_migrations():
if not MIGRATIONS_DIR.exists():
raise SystemExit(f"Migrations-Ordner nicht gefunden: {MIGRATIONS_DIR}")
conn = get_connection()
try:
applied = get_applied_versions(conn)
# Alle .sql-Dateien alphabetisch sortiert
sql_files = sorted(MIGRATIONS_DIR.glob("*.sql"))
if not sql_files:
logger.info("Keine Migrationsdateien gefunden.")
return
for path in sql_files:
version = path.stem # z.B. '20250220_120000_initial'
if version in applied:
logger.info(f"[SKIP] {version} (bereits angewendet)")
continue
logger.info(f"[APPLY] {version}")
sql = path.read_text(encoding="utf-8")
try:
# Eine Transaktion pro Datei
with conn:
conn.executescript(sql)
except Exception as e:
logger.info(f"Fehler in Migration {version}: {e}")
# hier nicht weiter machen
raise
logger.info(f"Migrationen abgeschlossen. DB: {DB_PATH}")
create_admin_user()
finally:
conn.close()
def admin_exists() -> bool:
"""Prüft, ob der Admin-User bereits existiert."""
with closing(get_connection()) as conn:
row = conn.execute(
"SELECT 1 FROM users WHERE username = ?",
(ADMIN_USERNAME,),
).fetchone()
if row is not None:
print(row[0])
else:
print(ADMIN_USERNAME)
print("Kein Admin gefunden")
return row is not None
def create_admin_user():
if admin_exists():
logger.info("Adminkonto existiert bereits! Kein initiales Konto angelegt.")
return
logger.info("Adminkonto wird angelegt ...")
pw1 = getpass.getpass("Passwort: ")
pw2 = getpass.getpass("Passwort wiederholen: ")
if pw1 != pw2:
logger.warning("Passwörter stimmen nicht überein! Abbruch.")
return
ok = create_user(
username=ADMIN_USERNAME,
password=pw1,
role_id=1,
email="admin@co_app",
firstname="***",
lastname="admin"
)
if ok:
logger.info(f"Admin-Benutzer '{ADMIN_USERNAME}' wurde angelegt.")
else:
# Sollte eigentlich nicht passieren, weil wir vorher geprüft haben,
# aber falls z.B. Parallelzugriff o.Ä.
logger.info(f"Admin-Benutzer '{ADMIN_USERNAME}' konnte nicht angelegt werden.")
if __name__ == "__main__":
apply_migrations()

0
app/pages/__init__.py Normal file
View File

153
app/pages/costobjects.py Normal file
View File

@@ -0,0 +1,153 @@
import streamlit as st
import pandas as pd
from data.scriptloader import get_sql
from data.db import load_data
from auth_runtime import require_login
from ui.sidebar import build_sidebar, hide_sidebar_if_logged_out
from auth import get_fullname_for_user
import numpy as np
st.set_page_config(page_title="Co-App Home", page_icon="🏠", layout="wide")
authenticator = require_login()
st.session_state["authenticator"] = authenticator
@st.cache_data
def cache_data() -> pd.DataFrame:
"""
Load and cache the base dataset for this page.
Streamlit reruns the script on every interaction; caching avoids
repeated I/O and makes filtering feel instant.
"""
try:
df = load_data("ora_kostenobjekte", "oracle")
return df
except:
st.warning("Fehler beim Laden der Daten", icon="⚠️")
def sidebar_filters(df) -> dict:
"""
Render the sidebar UI and return the current filter selections.
This function should contain UI concerns only (widgets, layout),
and not data filtering logic, to keep the code maintainable.
"""
st.sidebar.header("Filter")
if st.sidebar.button("Refresh (Global)"):
cache_data.clear()
st.rerun()
year = st.sidebar.selectbox(label="Jahr", options=(2025, 2026), index=1)
filter_text = st.sidebar.text_input(label="Textsuche", placeholder="Suche Objekt, Text, Verantwortlicher")
col_s1, col_s2 = st.sidebar.columns(2)
with col_s1:
typ = st.sidebar.selectbox(label="Typ", options=sorted(df["typ"].dropna().unique()), index=1)
with col_s2:
obj = st.sidebar.multiselect("obj", sorted(df["obj"].dropna().unique()))
zgrp1 = st.sidebar.multiselect("ZGrp1", sorted(df["zgrp1"].dropna().unique()))
zgrp2 = st.sidebar.multiselect("ZGrp2", sorted(df["zgrp2"].dropna().unique()))
zgrp3 = st.sidebar.multiselect("ZGrp3", sorted(df["zgrp3"].dropna().unique()))
return {"year": year, "filter_text": filter_text, "typ": typ, "obj": obj, "zgrp1": zgrp1, "zgrp2": zgrp2, "zgrp3": zgrp3}
def build_mask(df: pd.DataFrame, sidebar_filter: dict) -> np.ndarray:
"""
Build a boolean mask based on filter selections.
The mask approach keeps the logic readable and makes it easy
to add more conditions later.
"""
mask = np.ones(len(df), dtype=bool)
filter_text = (sidebar_filter.get("filter_text") or "").strip()
if filter_text:
m1 = df["bezeichnung"].astype("string").str.contains(filter_text, case=False, na=False, regex=False)
m2 = df["verantwortlicher"].astype("string").str.contains(filter_text, case=False, na=False, regex=False)
m3 = df["vorgesetzter"].astype("string").str.contains(filter_text, case=False, na=False, regex=False)
mask &= (m1 | m2 | m3)
if sidebar_filter["year"]:
mask &= df["jahr"].eq(sidebar_filter["year"])
if sidebar_filter["typ"]:
mask &= df["typ"].eq(sidebar_filter["typ"])
if sidebar_filter["obj"]:
mask &= df["obj"].isin(sidebar_filter["obj"])
if sidebar_filter["zgrp1"]:
mask &= df["zgrp1"].isin(sidebar_filter["zgrp1"])
if sidebar_filter["zgrp2"]:
mask &= df["zgrp2"].isin(sidebar_filter["zgrp2"])
if sidebar_filter["zgrp3"]:
mask &= df["zgrp3"].isin(sidebar_filter["zgrp3"])
return mask
def render_table(df):
"""
Render the result table.
Keep this function presentation-only: it should not modify data.
"""
st.dataframe(df, hide_index=True, width="stretch", height="stretch")
def page():
"""
Page entry point: orchestrates data loading, UI, filtering, and rendering.
"""
# -----------------------------
# Data loading (cached)
# -----------------------------
df = cache_data()
# -----------------------------
# UI (Sidebar)
# -----------------------------
sidebar_filter = sidebar_filters(df)
# -----------------------------
# Business logic (filtering)
# -----------------------------
mask = build_mask(df, sidebar_filter)
# -----------------------------
# Presentation
# -----------------------------
df = df.sort_values(by="obj", key=lambda s: pd.to_numeric(s, errors="coerce"))
df_clean = df[df.columns[:-1]] # letzte Spalten entfernen (sysdate)
df_display = df_clean.loc[mask]
render_table(df_display)
col1, col2 = st.columns(2)
data_as_of = df["sysdate"].max()
row_count = len(df_display)
with col1:
st.text(f"Datenstand: {data_as_of}")
st.text("Datenquelle: Oracle (PENTA)")
with col2:
st.markdown(f"<div style='text-align: right;'>Anzahl Zeilen: {row_count}</div>", unsafe_allow_html=True)
if __name__ == "__main__":
df = page()

318
app/pages/dashboards.py Normal file
View File

@@ -0,0 +1,318 @@
import streamlit as st
from auth_runtime import require_login
from pathlib import Path
from tools.load_css import load_css
from app_db.app_db import get_list, send_cmd
from ui.selectboxes import get_groups, get_id
from tools.numgen import get_num, update_num
DASH_NAME = Path(__file__).stem # Hier muss die dash_id aus der DB stehen -> wird gegen die session_state geprüft (User-Berechtigung)
DASH_PATH = Path(__file__).resolve().parent
load_css()
st.set_page_config(page_title="Co-App Benutzer", page_icon=":material/person:", layout="wide", initial_sidebar_state="collapsed")
authenticator = require_login()
st.session_state["authenticator"] = authenticator
def get_dashboards_from_pages():
dashboards = sorted(
d.stem
for d in DASH_PATH.glob("*.py")
if d.name != "__init__.py"
)
#dash_list = [""] + dashboards
return dashboards
def sidebar():
fullname = st.session_state.get("fullname")
role_text = st.session_state.get("role_text")
with st.sidebar:
st.logo("app/images/GMN_Logo_neu_rgb.png", size="small")
st.markdown(f"**{fullname}** ({role_text})")
col1, col2 = st.columns([2,2])
with col1:
authenticator.logout("Logout")
with col2:
if st.button("Home", use_container_width=True, icon=":material/home:"):
st.switch_page("pages/home.py")
dashborad()
#-------------------------------------------------------------------------------------
# Dialog Dashboard anlegen
#-------------------------------------------------------------------------------------
@st.dialog("Dashboard anlegen")
def dialog_create_dashboard(dash_id):
dash_types = ["page", "url"]
df_groups = get_groups()
groups = df_groups["group"].tolist()
dash_list = get_dashboards_from_pages()
dash_default_index = dash_list.index("default")
txt_dash_id = st.text_input("Dash-Id", value=dash_id, disabled=True)
col1, col2 = st.columns([2,1],vertical_alignment="center")
with col1:
txt_dash_text = st.text_input(label="Dashboard")
with col2:
is_active = st.checkbox(label="Aktiv", value=1)
txt_dash_description = st.text_area(label="Beschreibung")
cmb_dash_type = st.selectbox(label="Typ", options=dash_types)
if cmb_dash_type == "url":
txt_page_file = st.text_input(label="URL", placeholder="URL, z.B. https://www.gmn.de")
else:
txt_page_file = st.selectbox(label="Dashboard", options=dash_list, index=dash_default_index)
cmb_group = st.selectbox(label="Gruppe", options=groups, placeholder="Gruppe auswählen")
txt_order_no = st.text_input(label="Order-Nr")
if st.button("Save"):
sql = """
insert into dashboards (
dash_id,
dash_text,
dash_description,
group_id,
page_file,
dash_type,
active,
order_no
)
values (?, ?, ?, ?, ?, ?, ?, ?)
"""
params = (
txt_dash_id,
txt_dash_text,
txt_dash_description,
get_id(cmb_group),
txt_page_file,
cmb_dash_type,
is_active,
txt_order_no,
)
if send_cmd(sql, params):
st.session_state.save_msg = f"✅ Dashboard '{txt_dash_text}' erfolgreich gespeichert"
update_num(dash_id,1,"numgen_dashboard")
else:
st.session_state.save_msg = "❌ Fehler beim Speichern"
st.rerun()
#-------------------------------------------------------------------------------------
# Dialog Dashboard löschen
#-------------------------------------------------------------------------------------
@st.dialog("Dashboard löschen")
def dialog_delete_dashboard(dash_id):
if dash_id == None:
st.write("kein Dashboard ausgewählt")
else:
df = get_list("select dash_text from dashboards where dash_id = ?",(dash_id,))
dash_text = df.iloc[0]["dash_text"]
st.write(f"Das Dashboard {dash_text} wird gelöscht! Sind Sie sicher?")
if st.button("Löschen"):
if send_cmd("delete from dashboards where dash_id = ?",(dash_id,)):
st.session_state.delete_msg = f"✅ Dashboard '{dash_text}' erfolgreich gelöscht!"
else:
st.session_state.delete_msg = f"❌ Daschboard '{dash_id}' konnte nicht gelöscht werden!"
st.rerun()
#-------------------------------------------------------------------------------------
# Dialog Dashboard bearbeiten
#-------------------------------------------------------------------------------------
@st.dialog("Dashboard bearbeiten")
def dialog_edit_dashboard(dash_id):
if dash_id == None:
st.write("kein Dashboard ausgewählt")
else:
sql = """
select
d.dash_id,
d.dash_text,
d.dash_description,
d.group_id || ' | ' || g.group_text as "group",
d.page_file,
d.dash_type,
d.active,
d.order_no
from
dashboards d
left join groups g
on d.group_id = g.group_id
where
d.dash_id = ?
"""
df = get_list(sql,(dash_id,))
# Index für Gruppe aus DB für selectbox ermitteln
df_groups = get_groups()
groups = df_groups["group"].tolist()
group = df.iloc[0]["group"]
try:
idx = groups.index(group)
except:
idx = None
# Index für Dash-Typ aus DB für selectbox ermitteln
dash_types = ["page", "url"]
dash_type_index = dash_types.index(df.iloc[0]["dash_type"])
dash_list = get_dashboards_from_pages()
dash_default_index = dash_list.index(df.iloc[0]["page_file"]) if df.iloc[0]["page_file"] in dash_list else dash_list.index("default")
col1, col2 = st.columns([2,1],vertical_alignment="center")
with col1:
txt_dash_text = st.text_input(label="Dashboard", value=df.iloc[0]["dash_text"])
with col2:
is_active = st.checkbox(label="Aktiv", value=df.iloc[0]["active"])
txt_dash_description = st.text_area(label="Beschreibung", value=df.iloc[0]["dash_description"])
cmb_dash_type = st.selectbox(label="Typ", options=dash_types, index=dash_type_index)
if cmb_dash_type == "url":
if df.iloc[0]["dash_type"] == "page":
txt_page_file = st.text_input(label="URL", value="", placeholder="URL, z.B. https://www.gmn.de")
else:
txt_page_file = st.text_input(label="URL", value=df.iloc[0]["page_file"])
dash_default_index = dash_list.index("default")
else:
txt_page_file = st.selectbox(label="Dashboard", options=dash_list, index=dash_default_index)
cmb_group = st.selectbox(label="Gruppe", options=groups, placeholder="Gruppe auswählen", index=idx)
txt_order_no = st.text_input(label="Order-Nr",value=df.iloc[0]["order_no"])
if st.button("Save"):
sql = """
update dashboards set
dash_text = ?,
dash_description = ?,
group_id = ?,
page_file = ?,
dash_type = ?,
active = ?,
order_no = ?
where dash_id = ?
"""
params = (txt_dash_text, txt_dash_description, get_id(cmb_group), txt_page_file, cmb_dash_type, is_active, txt_order_no, dash_id)
if send_cmd(sql, params):
st.session_state.save_msg = f"✅ Dashboard '{txt_dash_text}' erfolgreich geändert"
else:
st.session_state.save_msg = "❌ Fehler beim Speichern"
st.rerun()
def dashborad():
if "selected_dash_id" not in st.session_state:
st.session_state.selected_dash_id = None
#--------------------------------------------------------------------------------------------------
# Dashboard-Verwaltung
#--------------------------------------------------------------------------------------------------
df = get_list("""
select
d.dash_id,
d.dash_text,
d.dash_description,
d.group_id || ' | ' || g.group_text as "group",
d.page_file,
d.dash_type,
d.active,
d.order_no
from
dashboards d
left join groups g
on d.group_id = g.group_id
""")
col_find_dashboard, col_create_dashboard, col_edit_dashboard, col_delete_dashboard = st.columns([3,2,2,2], vertical_alignment="bottom")
with col_find_dashboard:
txt_search = st.text_input(label="Suche", label_visibility="hidden", placeholder="Dashboard, Beschreibung ...", icon=":material/search:")
with col_create_dashboard:
if st.button(label="Dashboard anlegen", use_container_width=True, icon=":material/add:"):
dialog_create_dashboard(get_num("numgen_dashboard"))
if "save_msg" in st.session_state:
st.toast(st.session_state.save_msg)
del st.session_state.save_msg
with col_edit_dashboard:
if st.button(label="Dashboard bearbeiten", use_container_width=True, icon=":material/edit:"):
if not st.session_state.selected_dash_id is None:
dialog_edit_dashboard(st.session_state.selected_dash_id)
else:
st.toast("❌ Bitte erst eine Zeile auswählen")
if "save_msg" in st.session_state:
st.toast(st.session_state.save_msg)
del st.session_state.save_msg
with col_delete_dashboard:
if st.button(label="Dashboard löschen", use_container_width=True, icon=":material/delete:"):
if not st.session_state.selected_dash_id is None:
dialog_delete_dashboard(st.session_state.selected_dash_id)
else:
st.toast("❌ Bitte erst eine Zeile auswählen")
if "delete_msg" in st.session_state:
st.toast(st.session_state.delete_msg)
del st.session_state.delete_msg
if txt_search.strip():
txt_search_norm = txt_search.strip().lower()
mask = (
df["dash_text"].fillna("").str.lower().str.contains(txt_search_norm)
| df["dash_description"].fillna("").str.lower().str.contains(txt_search_norm)
| df["group"].fillna("").str.lower().str.contains(txt_search_norm)
)
df_view = df.loc[mask].copy()
else:
df_view = df.copy()
event = st.dataframe(df_view,key="dash_data", on_select="rerun", selection_mode="single-row")
rows = event.selection.rows
if rows:
row_idx = rows[0]
st.session_state.selected_dash_id = int(df_view.iloc[row_idx]["dash_id"])
else:
st.session_state.selected_dash_id = None
if __name__ == "__main__":
sidebar()

3
app/pages/default.py Normal file
View File

@@ -0,0 +1,3 @@
import streamlit as st
st.header("Baustelle!")

210
app/pages/groups.py Normal file
View File

@@ -0,0 +1,210 @@
import streamlit as st
from auth_runtime import require_login
# from ui.sidebar import build_sidebar
from pathlib import Path
from tools.load_css import load_css
from app_db.app_db import get_list, send_cmd
from tools.numgen import get_num, update_num
DASH_NAME = Path(__file__).stem
load_css()
st.set_page_config(page_title="Co-App Benutzer", page_icon=":material/person:", layout="wide", initial_sidebar_state="collapsed")
authenticator = require_login()
st.session_state["authenticator"] = authenticator
def sidebar():
fullname = st.session_state.get("fullname")
role_text = st.session_state.get("role_text")
with st.sidebar:
st.logo("app/images/GMN_Logo_neu_rgb.png", size="small")
st.markdown(f"**{fullname}** ({role_text})")
col1, col2 = st.columns([2,2])
with col1:
authenticator.logout("Logout")
with col2:
if st.button("Home", use_container_width=True, icon=":material/home:"):
st.switch_page("pages/home.py")
groups()
#-------------------------------------------------------------------------------------
# Dialog Gruppe anlegen
#-------------------------------------------------------------------------------------
@st.dialog("Gruppe anlegen")
def dialog_create_group(group_id):
txt_group_id = st.text_input("GrpId", value=group_id, disabled=True)
txt_group_text = st.text_input("Gruppenbezeichnung")
txt_description = st.text_area("Beschreibung")
txt_order_no = st.text_input("Reihenfolge")
if st.button("Save"):
if send_cmd(f"insert into groups (group_id, group_text, group_description, order_no) " \
"values (?, ?, ?, ?)",(txt_group_id, txt_group_text, txt_description, txt_order_no)):
st.session_state.save_msg = f"✅ Gruppe '{txt_group_text}' erfolgreich gespeichert"
update_num(group_id,1,"numgen_group")
else:
st.session_state.save_msg = "❌ Fehler beim Speichern"
st.rerun()
#-------------------------------------------------------------------------------------
# Dialog Gruppe bearbeiten
#-------------------------------------------------------------------------------------
@st.dialog("Gruppe bearbeiten")
def dialog_edit_group(group_id):
if group_id == None:
st.write("keine Gruppe ausgewählt")
else:
sql = """
select
group_id,
group_text,
group_description,
active,
order_no
from
groups
where
group_id = ?
order by
order_no
"""
df = get_list(sql,(group_id,))
col1, col2 = st.columns([2,1],vertical_alignment="center")
with col1:
txt_group_text = st.text_input(label="Gruppenbezeichnung", value=df.iloc[0]["group_text"])
with col2:
is_active = st.checkbox(label="Aktiv", value=df.iloc[0]["active"])
txt_group_description = st.text_area(label="Beschreibung", value=df.iloc[0]["group_description"])
txt_oder_no = st.text_input(label="Reihenfolge", value=df.iloc[0]["order_no"])
if st.button("Save"):
sql = """
update groups set
group_text = ?,
active = ?,
group_description = ?,
order_no = ?
where group_id = ?
"""
params = (txt_group_text, is_active, txt_group_description, txt_oder_no, group_id)
if send_cmd(sql, params):
st.session_state.save_msg = f"✅ Gruppe '{txt_group_text}' erfolgreich geändert"
else:
st.session_state.save_msg = "❌ Fehler beim Speichern"
st.rerun()
#-------------------------------------------------------------------------------------
# Dialog Gruppe löschen
#-------------------------------------------------------------------------------------
@st.dialog("Gruppe löschen")
def dialog_delete_group(group_id):
if id == None:
st.write("keine Gruppe ausgewählt")
else:
df = get_list("select group_text from groups where group_id = ?",(group_id,))
group_text = df.iloc[0]["group_text"]
st.write(f"Die Gruppe {group_text} wird gelöscht! Sind Sie sicher?")
if st.button("Löschen"):
if send_cmd("delete from groups where group_id = ?",(group_id,)):
st.session_state.delete_msg = f"✅ Gruppe '{group_text}' erfolgreich gelöscht!"
else:
st.session_state.delete_msg = f"❌ Gruppe '{group_text}' konnte nicht gelöscht werden!"
st.rerun()
def groups():
if "selected_group_id" not in st.session_state:
st.session_state.selected_group_id = None
#--------------------------------------------------------------------------------------------------
# Gruppenverwaltung
#--------------------------------------------------------------------------------------------------
df = get_list("""
select
group_id,
group_text,
group_description,
active,
order_no
from
groups
order by
order_no
""")
col_find_group, col_create_group, col_edit_group, col_delete_group = st.columns([3,2,2,2], vertical_alignment="bottom")
with col_find_group:
txt_search = st.text_input(label="Suche", label_visibility="hidden", placeholder="Gruppe", icon=":material/search:")
with col_create_group:
if st.button(label="Gruppe anlegen", use_container_width=True, icon=":material/add:"):
dialog_create_group(get_num("numgen_group"))
if "save_msg" in st.session_state:
st.toast(st.session_state.save_msg)
del st.session_state.save_msg
with col_edit_group:
if st.button(label="Gruppe bearbeiten", use_container_width=True, icon=":material/edit:"):
if not st.session_state.selected_group_id is None:
dialog_edit_group(st.session_state.selected_group_id)
else:
st.toast("❌ Bitte erst eine Zeile auswählen")
if "save_msg" in st.session_state:
st.toast(st.session_state.save_msg)
del st.session_state.save_msg
with col_delete_group:
if st.button(label="Gruppe löschen", use_container_width=True, icon=":material/delete:"):
if not st.session_state.selected_group_id is None:
dialog_delete_group(st.session_state.selected_group_id)
else:
st.toast("❌ Bitte erst eine Zeile auswählen")
if "delete_msg" in st.session_state:
st.toast(st.session_state.delete_msg)
del st.session_state.delete_msg
if txt_search.strip():
txt_search_norm = txt_search.strip().lower()
mask = (
df["group_text"].fillna("").str.lower().str.contains(txt_search_norm)
)
df_view = df.loc[mask].copy()
else:
df_view = df.copy()
event = st.dataframe(df_view, key="group_data", on_select="rerun", selection_mode="single-row", height="auto")
rows = event.selection.rows
if rows:
row_idx = rows[0]
st.session_state.selected_group_id = int(df_view.iloc[row_idx]["group_id"])
else:
st.session_state.selected_group_id = None
if __name__ == "__main__":
sidebar()

31
app/pages/home.py Normal file
View File

@@ -0,0 +1,31 @@
import streamlit as st
from auth_runtime import require_login
from ui.sidebar import build_sidebar
from auth import get_fullname_for_user
import pandas as pd
from numpy.random import default_rng as rng
from tools.load_css import load_css
load_css()
st.set_page_config(page_title="Co-App Home", page_icon=":material/home:", layout="centered", initial_sidebar_state="expanded")
authenticator = require_login()
st.session_state["authenticator"] = authenticator
build_sidebar()
def home():
username = st.session_state.get("name")
st.header("Controlling-Portal")
st.info(f"Willkommen, {get_fullname_for_user(username)}!")
st.markdown("**Hier könnte eine Hinweistext für den Benutzer stehen**")
df = pd.DataFrame(rng(0).standard_normal((20, 3)), columns=["a", "b", "c"])
st.area_chart(df)
if __name__ == "__main__":
home()

View File

@@ -0,0 +1,255 @@
import streamlit as st
import pandas as pd
from data.scriptloader import get_sql
from data.db import get_conn
from auth_runtime import require_login
from ui.sidebar import build_sidebar, hide_sidebar_if_logged_out
from auth import get_fullname_for_user
from sqlalchemy import text
import duckdb
import altair as alt
from tools.helpers import display_value, calc_variance_pct
# hide_sidebar_if_logged_out()
st.set_page_config(page_title="Co-App Home", page_icon="🏠", layout="wide")
authenticator = require_login()
st.session_state["authenticator"] = authenticator
DISPLAY_UNIT = "Mio. €"
def load_data():
sql = get_sql("ergebnis_kpi")
engine = get_conn("co_dw")
with engine.connect() as conn:
df = pd.read_sql(text(sql), con=conn, params={"jahr": 2025, "monat": 12})
return df
# def calc_variance_pct(actual, plan):
# variance = actual - plan
# if plan == 0:
# return None
# if actual * plan < 0:
# return None
# return variance / abs(plan)
def build_dashboard():
df = load_data()
# st.dataframe(df)
be_ist_kum = df["akt_jahr"].sum()
#++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
# Kalkulation KPI Betriebsergebnis
#++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
operating_result_actual_ytd = df["akt_jahr_bis_monat"].sum()
operating_result_actual_ytd_str = f"{int(operating_result_actual_ytd)/ANZ_EINHEIT:,.2f}".replace(",", "X").replace(".", ",").replace("X", ".") + " Mio. €"
operating_result_plan_ytd = df["plan_akt_jahr_bis_monat"].sum()
operating_result_actual_ytd_py = df["vor_jahr_bis_monat"].sum()
operating_result_variance = operating_result_actual_ytd - operating_result_plan_ytd
operating_result_vaiance_pct = calc_variance_pct(operating_result_actual_ytd, operating_result_plan_ytd)
#++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
# Dashboard - Ebene 1 Betriebsergebnis
#++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
col_1, col_2 = st.columns(2, border=True,vertical_alignment="center")
with col_1:
st.metric(label="Betriebsergebnis", value=operating_result_actual_ytd)
operation_result_monthly = df["akt_jahr_monat"].sum()
be_ist_vorjahr_kum = df["vor_jahr"].sum()
be_ist_monat = df["akt_jahr_monat"].sum()
be_ist_vorjahr_monat = df["vor_jahr_monat"].sum()
be_ist_kum_anz = f"{int(be_ist_kum)/ANZ_EINHEIT:,.2f}".replace(",", "X").replace(".", ",").replace("X", ".") + " Mio. €"
be_ist_monat_anz = f"{int(be_ist_monat)/ANZ_EINHEIT:,.2f}".replace(",", "X").replace(".", ",").replace("X", ".") + " Mio. €"
be_ist_vorjahr_kum_anz = f"{int(be_ist_vorjahr_kum)/ANZ_EINHEIT:,.2f}".replace(",", "X").replace(".", ",").replace("X", ".") + " Mio. €"
be_ist_monat_vorjahr_anz = f"{int(be_ist_vorjahr_monat)/ANZ_EINHEIT:,.2f}".replace(",", "X").replace(".", ",").replace("X", ".") + " Mio. €"
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
# Ebene 1 - Ergebnis-KPIs
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
# Beispiel-Daten (ersetze das durch deine echten Monatswerte)
months = pd.date_range(end=pd.Timestamp.today().normalize(), periods=12, freq="MS")
values = pd.Series([120, 80, -30, 50, 40, 10, -20, 60, 70, 30, 20, -10], index=months)
df = pd.DataFrame({"month": months, "BE": values}).set_index("month")
current = df["BE"].iloc[-1]
prev = df["BE"].iloc[-2]
delta = current - prev
col1, col2, col3 = st.columns(3)
with col1:
st.metric(
"Betriebsergebnis (intern)",
f"{current:,.0f}",
f"{delta:,.0f}"
)
with st.expander("Verlauf letzte 12 Monate", expanded=False):
# st.bar_chart(df["BE"])
# fig, ax = plt.subplots()
# ax.plot(df.index, df["BE"], marker="o")
# ax.axhline(0, linewidth=1) # Nulllinie für negative Werte
# ax.set_xlabel("")
# ax.set_ylabel("€")
# ax.tick_params(axis="x", rotation=45)
# st.pyplot(fig, clear_figure=True)
df_reset = df.reset_index()
df_reset["Monat"] = df_reset["month"].dt.strftime("%Y-%m")
chart = (
alt.Chart(df_reset)
.mark_bar(size=28) # Balkendicke
.encode(
x=alt.X("Monat:N", sort=None, title=""),
y=alt.Y("BE:Q", title=""),
color=alt.condition(
alt.datum.BE < 0,
alt.value("#d62728"), # rot
alt.value("#2ca02c"), # grün
)
)
)
labels = (
alt.Chart(df_reset)
.mark_text(dy=-8)
.encode(
x="Monat:N",
y="BE:Q",
text=alt.Text("BE:Q", format=",.0f")
)
)
spark = (
alt.Chart(df_reset)
.mark_line(point=True)
.encode(
x=alt.X("Monat:N", axis=None),
y=alt.Y("BE:Q", axis=None)
)
.properties(height=60)
)
st.altair_chart(spark, use_container_width=True)
# st.altair_chart(chart + labels, use_container_width=True)
# col_operating_result, col_contribution_margin = st.columns(2,border=True)
# col_ue_n, col_ae_f, col_ab_f = st.columns(3,border=True,)
# with col_ue_n:
# with st.expander(label="Umsatz",):
# st.metric(label="BE-Ist kumuliert (monat)",value=f"{be_ist_kum_anz} ({be_ist_monat_anz})", delta="-5", border=True)
# st.text("Umsatz")
# with col_ae_f:
# st.text("Auftragseingang fest")
# with col_ab_f:
# st.text("Auftragsbestand fest")
# with col_operating_result:
# internal_operating_result = f"BE = {be_ist_kum_anz} Mio. €"
# st.metric(label="BE-Ist kumuliert (monat)",value=f"{internal_operating_result} ({be_ist_monat_anz})", delta="-5", border=True)
# with st.expander(label=st.markdown(f"# {internal_operating_result}")):
# st.text("Verlauf...")
# # st.text("ERGTAB")
# st.metric(label="BE-Ist kumuliert (monat)",value=f"{be_ist_kum_anz} ({be_ist_monat_anz})", delta="-5", border=True)
# erg = duckdb.sql("""
# select
# 'Umsatz' as Kostenart,
# sum(case when co_koa_grp = 'CO1000' then akt_jahr else 0 end) as Aktuell,
# sum(case when co_koa_grp = 'CO1000' then plan_akt_jahr else 0 end) as Plan,
# sum(case when co_koa_grp = 'CO1000' then vor_jahr else 0 end) as Vorjahr
# from
# df
# """).fetchdf()
# st.metric(label="BE-Ist kumuliert (monat)",value=f"{be_ist_kum_anz} ({be_ist_monat_anz})", delta="-5", border=True)
# st.dataframe(erg, hide_index=True)
# col_erg_ist = st.columns(1)
# with col_erg_ist:
# st.metric(label="BE-Ist (monat)",value=be_ist_monat_anz, delta="-5", border=True)
# col1, col2 = st.columns(2)
# with col1:
# # st.metric(label="BE-Ist (kumuliert)",value=be_ist_kum_anz, delta="-5", border=True)
# with st.success("Ergebnis"):
# st.button(f"{be_ist_kum_anz}")
# # st.success("plus 10% zu Vorjahr")
# # with col2:
# # st.error("-10% zu Plan")
# # with col_erg_plan:
# # st.info("minus 5%")
# # st.warning("minus 10%")
# # st.success("plus 10%")
# # st.error("minus 20%")
# # with col_erg_vorjahr:
# # st.metric(label="BE-Vorjahr (kumuliert)",value=be_ist_vorjahr_kum_anz, delta="-5", border=True)
# # st.metric(label="BE-Vorjahr (monat)",value=be_ist_monat_anz, delta="-5", border=True)
# # st.dataframe(load_data())
if __name__ == "__main__":
df = build_dashboard()

213
app/pages/roles.py Normal file
View File

@@ -0,0 +1,213 @@
import streamlit as st
from auth_runtime import require_login
# from ui.sidebar import build_sidebar
from pathlib import Path
from tools.load_css import load_css
from app_db.app_db import get_list, send_cmd
from tools.numgen import get_num, update_num
DASH_NAME = Path(__file__).stem
load_css()
st.set_page_config(page_title="Co-App Benutzer", page_icon=":material/person:", layout="wide", initial_sidebar_state="collapsed")
authenticator = require_login()
st.session_state["authenticator"] = authenticator
def sidebar():
fullname = st.session_state.get("fullname")
role_text = st.session_state.get("role_text")
with st.sidebar:
st.logo("app/images/GMN_Logo_neu_rgb.png", size="small")
st.markdown(f"**{fullname}** ({role_text})")
col1, col2 = st.columns([2,2])
with col1:
authenticator.logout("Logout")
with col2:
if st.button("Home", use_container_width=True, icon=":material/home:"):
st.switch_page("pages/home.py")
roles()
#-------------------------------------------------------------------------------------
# Dialog Rollen anlegen
#-------------------------------------------------------------------------------------
@st.dialog("Rolle anlegen")
def dialog_create_role(role_id):
txt_role_id = st.text_input("RoleId", value=role_id, disabled=True)
txt_role_text = st.text_input("Gruppenbezeichnung")
txt_description = st.text_area("Beschreibung")
txt_order_no = st.text_input("Reihenfolge")
if st.button("Save"):
if send_cmd(f"insert into roles (role_id, role_text, role_description, order_no) " \
"values (?, ?, ?, ?)",(txt_role_id, txt_role_text, txt_description, txt_order_no)):
st.session_state.save_msg = f"✅ Rolle '{txt_role_text}' erfolgreich gespeichert"
update_num(role_id,1,"numgen_role")
else:
st.session_state.save_msg = "❌ Fehler beim Speichern"
st.rerun()
#-------------------------------------------------------------------------------------
# Dialog Rollen bearbeiten
#-------------------------------------------------------------------------------------
@st.dialog("Rolle bearbeiten")
def dialog_edit_role(role_id):
if role_id == None:
st.write("keine Rolle ausgewählt")
else:
sql = """
select
role_id,
role_text,
role_description,
active,
order_no
from
roles
where
role_id = ?
order by
order_no
"""
df = get_list(sql,(role_id,))
col1, col2 = st.columns([2,1],vertical_alignment="center")
with col1:
txt_role_text = st.text_input(label="Rollenbezeichnung", value=df.iloc[0]["role_text"])
with col2:
is_active = st.checkbox(label="Aktiv", value=df.iloc[0]["active"])
txt_role_description = st.text_area(label="Beschreibung", value=df.iloc[0]["role_description"])
txt_oder_no = st.text_input(label="Reihenfolge", value=df.iloc[0]["order_no"])
if st.button("Save"):
sql = """
update roles set
role_text = ?,
active = ?,
role_description = ?,
order_no = ?
where role_id = ?
"""
params = (txt_role_text, is_active, txt_role_description, txt_oder_no, role_id)
if send_cmd(sql, params):
st.session_state.save_msg = f"✅ Rolle '{txt_role_text}' erfolgreich geändert"
else:
st.session_state.save_msg = "❌ Fehler beim Speichern"
st.rerun()
#-------------------------------------------------------------------------------------
# Dialog Rollen löschen
#-------------------------------------------------------------------------------------
@st.dialog("Rolle löschen")
def dialog_delete_role(role_id):
if role_id == None:
st.write("keine Rolle ausgewählt")
else:
df = get_list("select role_text from roles where role_id = ?",(role_id,))
role_text = df.iloc[0]["role_text"]
if role_text != "admin":
st.write(f"Die Rolle {role_text} wird gelöscht! Sind Sie sicher?")
if st.button("Löschen"):
if send_cmd("delete from roles where role_id = ?",(role_id,)):
st.session_state.delete_msg = f"✅ Rolle '{role_text}' erfolgreich gelöscht!"
else:
st.session_state.delete_msg = f"❌ Rolle '{role_text}' konnte nicht gelöscht werden!"
else:
st.session_state.delete_msg = f"❌ Rolle '{role_text}' darf nicht gelöscht werden!"
st.rerun()
def roles():
if "selected_role_id" not in st.session_state:
st.session_state.selected_role_id = None
#--------------------------------------------------------------------------------------------------
# Rollenverwaltung
#--------------------------------------------------------------------------------------------------
df = get_list("""
select
role_id,
role_text,
role_description,
active,
order_no
from
roles
order by
order_no
""")
col_find_role, col_create_role, col_edit_role, col_delete_role = st.columns([3,2,2,2], vertical_alignment="bottom")
with col_find_role:
txt_search = st.text_input(label="Suche", label_visibility="hidden", placeholder="Rolle", icon=":material/search:")
with col_create_role:
if st.button(label="Rolle anlegen", use_container_width=True, icon=":material/add:"):
dialog_create_role(get_num("numgen_role"))
if "save_msg" in st.session_state:
st.toast(st.session_state.save_msg,)
del st.session_state.save_msg
with col_edit_role:
if st.button(label="Rolle bearbeiten", use_container_width=True, icon=":material/edit:"):
if not st.session_state.selected_role_id is None:
dialog_edit_role(st.session_state.selected_role_id)
else:
st.toast("❌ Bitte erst eine Zeile auswählen")
if "save_msg" in st.session_state:
st.toast(st.session_state.save_msg)
del st.session_state.save_msg
with col_delete_role:
if st.button(label="Rolle löschen", use_container_width=True, icon=":material/delete:"):
if not st.session_state.selected_role_id is None:
dialog_delete_role(st.session_state.selected_role_id)
else:
st.toast("❌ Bitte erst eine Zeile auswählen")
if "delete_msg" in st.session_state:
st.toast(st.session_state.delete_msg)
del st.session_state.delete_msg
if txt_search.strip():
txt_search_norm = txt_search.strip().lower()
mask = (
df["role_text"].fillna("").str.lower().str.contains(txt_search_norm)
)
df_view = df.loc[mask].copy()
else:
df_view = df.copy()
event = st.dataframe(df_view, key="role_data", on_select="rerun", selection_mode="single-row", height="auto")
rows = event.selection.rows
if rows:
row_idx = rows[0]
st.session_state.selected_role_id = int(df_view.iloc[row_idx]["role_id"])
else:
st.session_state.selected_role_id = None
if __name__ == "__main__":
sidebar()

275
app/pages/user.py Normal file
View File

@@ -0,0 +1,275 @@
import streamlit as st
from auth_runtime import require_login
# from ui.sidebar import build_sidebar
from auth import create_user
from pathlib import Path
from tools.load_css import load_css
from app_db.app_db import get_list, send_cmd
from ui.selectboxes import get_roles, get_id
import bcrypt
DASH_NAME = Path(__file__).stem # Hier muss die dash_id aus der DB stehen -> wird gegen die session_state geprüft (User-Berechtigung)
load_css()
st.set_page_config(page_title="Co-App Benutzer", page_icon=":material/person:", layout="wide", initial_sidebar_state="collapsed")
authenticator = require_login()
st.session_state["authenticator"] = authenticator
def sidebar():
fullname = st.session_state.get("fullname")
role_text = st.session_state.get("role_text")
with st.sidebar:
st.logo("app/images/GMN_Logo_neu_rgb.png", size="small")
st.markdown(f"**{fullname}** ({role_text})")
col1, col2 = st.columns([2,2])
with col1:
authenticator.logout("Logout")
with col2:
if st.button("Home", use_container_width=True, icon=":material/home:"):
st.switch_page("pages/home.py")
user()
@st.dialog("Benutzer anlegen")
def dialog_create_user():
txt_username = st.text_input("Benutzername")
txt_firstname = st.text_input("Vorname")
txt_lastname = st.text_input("Nachname")
txt_email = st.text_input("Email")
txt_pwd = st.text_input("Kennwort", type="password")
cmb_role = st.selectbox("Rolle", get_roles(), index=None)
if st.button("Save"):
if create_user(
username=txt_username,
firstname=txt_firstname,
lastname=txt_lastname,
email=txt_email,
role_id=get_id(cmb_role),
password=txt_pwd
):
st.session_state.save_msg = f"✅ Benutzer '{txt_username}' erfolgreich gespeichert"
else:
st.session_state.save_msg = "❌ Fehler beim Speichern"
st.rerun()
@st.dialog("Benutzer löschen")
def dialog_delete_user(id):
if id == None:
st.write("kein Benutzer ausgewählt")
else:
df = get_list("select username from users where id = ?",(id,))
username = df.iloc[0]["username"]
st.write(f"Der Benutzer {username} wird gelöscht! Sind Sie sicher?")
if st.button("Löschen"):
if username != "admin":
if send_cmd("delete from users where id = ?",(id,)):
st.session_state.delete_msg = f"✅ Benutzer '{username}' erfolgreich gelöscht!"
else:
st.session_state.delete_msg = f"❌ Benutzer '{username}' konnte nicht gelöscht werden!"
else:
st.session_state.delete_msg = f"❌ Benutzer '{username}' darf nicht gelöscht werden!"
st.rerun()
@st.dialog("Benutzer bearbeiten")
def dialog_edit_user(id):
if id == None:
st.write("kein Benutzer ausgewählt")
else:
sql = """
select
u.id,
u.username as user, -- ACHTUNG: nicht mit username arbeiten, da Überschneidung in sessionstate!!
u.firstname,
u.lastname,
u.email,
u.role_id || ' | ' || r.role_text as role,
r.role_text,
u.new_pwd,
u.active
from
users u
left join roles r
on u.role_id = r.role_id
where u.id = ?
"""
df = get_list(sql,(id,))
# df = get_list("select username from users where id = ?",(id,))
# st.session_state.orig_user_data = df
df_roles = get_roles()
roles = df_roles["text"].tolist()
role = df.iloc[0]["role"]
try:
idx = roles.index(role)
except:
idx = None
col1, col2 = st.columns([2,1],vertical_alignment="center")
with col1:
txt_username = st.text_input(label="Benutzername", value=df.iloc[0]["user"])
with col2:
is_active = st.checkbox(label="Aktiv", value=df.iloc[0]["active"])
txt_firstname = st.text_input(label="Vorname", value=df.iloc[0]["firstname"])
txt_lastname = st.text_input(label="Nachname", value=df.iloc[0]["lastname"])
txt_email = st.text_input(label="Email", value=df.iloc[0]["email"])
txt_pwd = st.text_input(label="Passwort", placeholder="Neues Passwort eingeben", type="password")
new_pwd = st.checkbox(label="Neues Passwort", value=df.iloc[0]["new_pwd"])
cmb_role = st.selectbox(label="Rolle", options=roles, placeholder="Rolle auswählen", index=idx)
if st.button("Save"):
pw_hash = bcrypt.hashpw(txt_pwd.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
if txt_pwd and txt_pwd.strip():
sql = """
update users set
username = ?,
active = ?,
firstname = ?,
lastname = ?,
email = ?,
password_hash = ?,
new_pwd = ?,
role_id = ?
where id = ?
"""
params = (txt_username, is_active, txt_firstname, txt_lastname, txt_email, pw_hash, new_pwd, get_id(cmb_role), id)
# send_cmd(sql,(txt_username, txt_firstname, txt_lastname, txt_email, pw_hash, new_pwd, get_id(cmb_role), id))
else:
sql = """
update users set
username = ?,
active = ?,
firstname = ?,
lastname = ?,
email = ?,
new_pwd = ?,
role_id = ?
where id = ?
"""
params = (txt_username, is_active, txt_firstname, txt_lastname, txt_email, new_pwd, get_id(cmb_role), id)
# send_cmd(sql,(txt_username, txt_firstname, txt_lastname, txt_email, new_pwd, get_id(cmb_role), id))
print (params)
if send_cmd(sql, params):
st.session_state.save_msg = f"✅ Benutzer '{txt_username}' erfolgreich geändert"
else:
st.session_state.save_msg = "❌ Fehler beim Speichern"
st.rerun()
def user():
if "selected_user_id" not in st.session_state:
st.session_state.selected_user_id = None
# tab_user, tab_role, tab_permission = st.tabs(["Benutzer", "Rollen", "Berechtigungen"])
#--------------------------------------------------------------------------------------------------
# Benutzerverwaltung
#--------------------------------------------------------------------------------------------------
# with tab_user:
df = get_list("""
select
u.id,
u.username,
u.firstname,
u.lastname,
u.role_id || ' | ' || r.role_text as role,
u.new_pwd,
u.active
from
users u
left join roles r
on u.role_id = r.role_id
""")
col_find_user, col_create_user, col_edit_user, col_delete_user = st.columns([3,2,2,2], vertical_alignment="bottom")
with col_find_user:
txt_search = st.text_input(label="Suche", label_visibility="hidden", placeholder="Benutzer, Vorname, ...", icon=":material/search:")
with col_create_user:
if st.button(label="Benutzer anlegen", use_container_width=True, icon=":material/person_add:"):
dialog_create_user()
if "save_msg" in st.session_state:
st.toast(st.session_state.save_msg)
del st.session_state.save_msg
with col_edit_user:
if st.button(label="Benutzer bearbeiten", use_container_width=True, icon=":material/person_edit:"):
if not st.session_state.selected_user_id is None:
dialog_edit_user(st.session_state.selected_user_id)
else:
st.toast("❌ Bitte erst eine Zeile auswählen")
if "save_msg" in st.session_state:
st.toast(st.session_state.save_msg)
del st.session_state.save_msg
with col_delete_user:
if st.button(label="Benutzer löschen", use_container_width=True, icon=":material/person_remove:"):
if not st.session_state.selected_user_id is None:
dialog_delete_user(st.session_state.selected_user_id)
else:
st.toast("❌ Bitte erst eine Zeile auswählen")
if "delete_msg" in st.session_state:
st.toast(st.session_state.delete_msg)
del st.session_state.delete_msg
if txt_search.strip():
txt_search_norm = txt_search.strip().lower()
mask = (
df["username"].fillna("").str.lower().str.contains(txt_search_norm)
| df["firstname"].fillna("").str.lower().str.contains(txt_search_norm)
| df["lastname"].fillna("").str.lower().str.contains(txt_search_norm)
)
df_view = df.loc[mask].copy()
else:
df_view = df.copy()
event = st.dataframe(df_view,key="data", on_select="rerun", selection_mode="single-row")
rows = event.selection.rows
if rows:
row_idx = rows[0]
st.session_state.selected_user_id = int(df_view.iloc[row_idx]["id"])
else:
st.session_state.selected_user_id = None
if __name__ == "__main__":
sidebar()

79
app/tools/helpers.py Normal file
View File

@@ -0,0 +1,79 @@
def calc_variance_pct(actual, plan):
"""
Calculates the percentage variance between actual and plan values
for reporting purposes.
The percentage variance is only returned if it is economically
meaningful and interpretable:
- The plan value must not be zero
- Actual and plan must have the same sign (no sign change)
In cases where a percentage variance would be misleading
(e.g. sign change from loss to profit), the function returns None
and absolute variance should be used instead.
Parameters
----------
actual : float | int
Actual (realized) value.
plan : float | int
Planned or budgeted value.
Returns
-------
float | None
Percentage variance relative to the absolute plan value,
or None if the percentage variance is not meaningful.
"""
variance = actual - plan
if plan == 0:
return None
if actual * plan < 0:
return None
return variance / abs(plan)
def display_value(value, unit):
"""
Formats a numeric KPI value for reporting output based on the configured
display unit (e.g. €, T€, Mio. €).
- Scales the input value according to DISPLAY_UNIT
- Applies European number formatting (thousands separator, decimal comma)
- Returns 'n/a' if the input value is None
Parameters
----------
value : float | int | None
Raw KPI value in base currency (e.g. EUR).
Returns
-------
str
Formatted value including display unit, ready for dashboard display.
"""
if value is None:
return "n/a"
unit_factors = {
"Mio. €": 1_000_000,
"T€": 1_000,
"": 1,
}
factor = unit_factors.get(unit, 1)
scaled = value / factor
formatted = f"{scaled:,.2f}"
formatted = (
formatted
.replace(",", "X")
.replace(".", ",")
.replace("X", ".")
)
return f"{formatted} {unit}"

8
app/tools/load_css.py Normal file
View File

@@ -0,0 +1,8 @@
import streamlit as st
from pathlib import Path
def load_css():
css_path = Path(__file__).parent.parent / ".streamlit" / "style.css"
if css_path.exists():
st.markdown(f"<style>{css_path.read_text()}</style>", unsafe_allow_html=True)

21
app/tools/numgen.py Normal file
View File

@@ -0,0 +1,21 @@
from app_db.app_db import get_value, send_cmd
from logging_config import setup_logging
import os
APP_ENV = os.environ.get("APP_ENV", "dev")
logger = setup_logging(APP_ENV)
def get_num(numgen):
num = get_value(f"select param_value from param where parameter = '{numgen}'")
#print(int(num) + step)
# update_num(num, step, numgen)
return num
def update_num(num, step, numgen):
print(num,step,numgen)
try:
send_cmd(f"update param set param_value = {int(num) + step} where parameter = '{numgen}'")
return True
except:
logger.warning(f"Fehler beim Hochzählen von {numgen}. Bitte manuel ändern!")
return False

32
app/ui/selectboxes.py Normal file
View File

@@ -0,0 +1,32 @@
from app_db.app_db import get_list
def get_roles():
sql = """
select
role_id || ' | ' || role_text as text
from
roles
"""
df = get_list(sql)
return df
def get_groups():
sql = """
select
group_id || ' | ' || group_text as "group"
from
groups
"""
df = get_list(sql)
return df
def get_id(id_text: str):
id = int(id_text.split("|")[0])
if not id:
id = ""
return id

98
app/ui/sidebar.py Normal file
View File

@@ -0,0 +1,98 @@
from contextlib import closing
import streamlit as st
from auth import get_fullname_for_user, get_role_for_user
from app_db.app_db import get_conn, get_list
# import sqlite3
def build_sidebar():
authenticator = st.session_state.get("authenticator")
username = st.session_state.get("name")
role_text = st.session_state.get("role_text")
fullname = st.session_state.get("fullname")
if not authenticator or not username:
return
df = st.session_state.get("df_sidebar")
with st.sidebar:
st.logo("app/images/GMN_Logo_neu_rgb.png", size="small")
st.markdown(f"**{fullname}** ({role_text})")
col1, col2 = st.columns([2,2])
with col1:
authenticator.logout("Logout")
with col2:
if st.button("Home", use_container_width=True, icon=":material/home:"):
st.switch_page("pages/home.py")
# --- Suchfeld ---
query = st.text_input("Menü-Suche", "", placeholder="z.B. Umsatz, Kosten, User ...")
query = query.strip()
# Aktive Seite ermitteln (für Expander-Status / Highlight)
current_page = st.session_state.get("_page_path")
# --- DF filtern, falls Suchbegriff gesetzt ---
if query:
mask = (
df["dash_text"].str.contains(query, case=False, na=False)
| df["group_text"].str.contains(query, case=False, na=False)
)
df_view = df[mask].copy()
else:
df_view = df
if df_view.empty:
st.info("Keine Einträge zum Suchbegriff gefunden.")
return
# --- Gruppiert durchlaufen ---
# df_view = df_view.sort_values(["group_order", "dash_order"]) # vorher noch sortieren
for group_text, df_group in df_view.groupby("group_text", sort=False): # und beim gruppieren nicht nach der Gruppe sortieren!
# Expander offen, wenn:
# - aktuelle Seite in dieser Gruppe liegt
group_open = any(
(row["page_file"] == current_page)
for _, row in df_group.iterrows()
)
with st.expander(group_text, expanded=(group_open or bool(query))):
for _, row in df_group.iterrows():
dash_type = row.get("dash_type")
page_file = row.get("page_file")
label = row.get("dash_text", "")
# Streamlit-Page
if dash_type == "page" and isinstance(page_file, str) and page_file.strip():
st.page_link(f"pages/{page_file}.py", label=label, help="Das ist die Hilfe")
# Externer Link
elif dash_type == "url" and isinstance(page_file, str) and page_file.strip():
st.markdown(f"[{label}]({page_file})")
# Platzhalter / Sonstiges
else:
st.write(f"▫️ {label}")
# Damit die leere Sidebar komplett verschwindet nach dem Logout
def hide_sidebar_if_logged_out():
if st.session_state.get("authentication_status") != True:
st.markdown("""
<style>
/* komplette Sidebar + Toggle ausblenden */
[data-testid="stSidebar"] {display: none;}
[data-testid="stSidebarNav"] {display: none;}
[data-testid="collapsedControl"] {display: none;}
/* Content wieder ganz nach links */
[data-testid="stAppViewContainer"] {
margin-left: 0;
}
</style>
""", unsafe_allow_html=True)

1
app/version.py Normal file
View File

@@ -0,0 +1 @@
__version__ = "0.0.1"

7
config/auth.yaml Normal file
View File

@@ -0,0 +1,7 @@
cookie:
expiry_days: 7
key: "tHr1FXOYg-xsYBGVu7amRrY8PAdC2gM_zyjPEPPkPgG8tAsLY4QGPZvCvMS9win0D94uawAxsmZjN6O_VlRhUQ"
name: "co_app-session"
preauthorized:
emails: []

10
config/settings.env Normal file
View File

@@ -0,0 +1,10 @@
# Databases
# Oracle
oracle_conn_str=oracle+oracledb://sp84p:data@ora:1522/?service_name=prod84
# gmn-conn\co_dw
co_dw_conn_str = mssql+pyodbc://co_app:JRHmi1KLwjgnF6@gmn-cont\controlling/co_dw?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes
# gmn-conn\co_daten
co_daten_conn_str = mssql+pyodbc://dw_user:Schneewittchen%4089887%%21@gmn-cont\controlling/co_daten?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes

View File

@@ -0,0 +1,20 @@
BEGIN;
CREATE TABLE IF NOT EXISTS schema_version (
version TEXT PRIMARY KEY,
applied_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date_create TEXT NOT NULL DEFAULT (datetime('now')),
username TEXT UNIQUE NOT NULL,
password_hash BLOB NOT NULL,
role TEXT NOT NULL DEFAULT 'guest'
);
INSERT INTO schema_version (version) VALUES ('20251128_210200_initial');
COMMIT;

View File

@@ -0,0 +1,13 @@
begin;
CREATE TABLE IF NOT EXISTS sessions (
id TEXT PRIMARY KEY,
username TEXT NOT NULL,
created_at INTEGER NOT NULL,
expires_at INTEGER NOT NULL,
FOREIGN KEY (username) REFERENCES users(username) ON DELETE CASCADE
);
INSERT INTO schema_version (version) VALUES ('20251129_162900_add_table_sessions');
COMMIT;

View File

@@ -0,0 +1,8 @@
begin;
alter table users
add column email text not null;
INSERT INTO schema_version (version) VALUES ('20251130_124700_add_col_email_users');
COMMIT;

View File

@@ -0,0 +1,11 @@
begin;
alter table users
add column firstname text;
alter table users
add column lastname text;
INSERT INTO schema_version (version) VALUES ('20251130_1411_add_col_name_users');
COMMIT;

View File

@@ -0,0 +1,8 @@
begin;
alter table users
add column new_pwd integer not null default 1;
INSERT INTO schema_version (version) VALUES ('20251130_161100_add_col_newpwd_users');
COMMIT;

View File

@@ -0,0 +1,42 @@
begin;
create table if not exists dashboards (
dash_id integer unique not null,
dash_text text not null,
dash_description text,
group_id integer not null,
active integer not null default 1,
date_create TEXT NOT NULL DEFAULT (datetime('now')),
id integer primary key autoincrement
);
create table if not exists groups (
group_id integer unique not null,
group_text text not null,
group_description text,
active integer not null default 1,
date_create TEXT NOT NULL DEFAULT (datetime('now')),
id integer primary key autoincrement
);
create table if not exists roles (
role_id integer unique not null,
role_text text not null,
role_description text,
active integer not null default 1,
date_create TEXT NOT NULL DEFAULT (datetime('now')),
id integer primary key autoincrement
);
create table if not exists permissions (
role_id integer not null,
dash_id integer not null,
active integer not null default 1,
date_create TEXT NOT NULL DEFAULT (datetime('now')),
id integer primary key autoincrement,
unique (role_id, dash_id)
);
INSERT INTO schema_version (version) VALUES ('20251130_191100_add_table_dashboards');
COMMIT;

View File

@@ -0,0 +1,14 @@
begin;
alter table dashboards
add column zgrp1 text;
alter table dashboards
add column zgrp2 text;
alter table dashboards
add column zgrp3 text;
INSERT INTO schema_version (version) VALUES ('20251130_200000_add_col_area');
COMMIT;

View File

@@ -0,0 +1,11 @@
begin;
alter table dashboards
add column order_no integer default 0;
alter table groups
add column order_no integer default 0;
INSERT INTO schema_version (version) VALUES ('20251130_200600_add_col_order');
COMMIT;

View File

@@ -0,0 +1,8 @@
begin;
alter table users
rename column role to role_id;
INSERT INTO schema_version (version) VALUES ('20251203_201300_rename_col_table_role');
COMMIT;

View File

@@ -0,0 +1,8 @@
begin;
alter table users
add column active integer default 1;
INSERT INTO schema_version (version) VALUES ('20251203_202200_add_col_active_table_users');
COMMIT;

View File

@@ -0,0 +1,11 @@
begin;
alter table roles
add column order_no integer;
alter table permissions
add column order_no integer;
INSERT INTO schema_version (version) VALUES ('20251203_202900_add_col_order_tables');
COMMIT;

View File

@@ -0,0 +1,11 @@
begin;
alter table dashboards
add column page_file text;
alter table dashboards
add column dash_type text;
INSERT INTO schema_version (version) VALUES ('20251204_210700_add_col_page_dashboards');
COMMIT;

View File

@@ -0,0 +1,14 @@
begin;
create table if not exists param (
parameter text unique not null,
parameter_text text not null,
param_description text,
param_value text not null,
date_create TEXT NOT NULL DEFAULT (datetime('now')),
id integer primary key autoincrement
);
INSERT INTO schema_version (version) VALUES ('20251218_211700_add_table_param');
COMMIT;