Files
co_app/app/db_users.py
2025-12-15 08:21:18 +01:00

74 lines
2.5 KiB
Python

# db_users.py
import sqlite3
from contextlib import contextmanager
from typing import Optional
import bcrypt
# Location of the SQLite database file opened by get_conn(). The path is
# relative, so it is resolved against the process's current working directory.
DB_PATH = "app.db"
@contextmanager
def get_conn():
    """Yield a SQLite connection to ``DB_PATH`` and commit on clean exit.

    The connection is opened with ``check_same_thread=False`` and uses
    ``sqlite3.Row`` as its row factory, so fetched rows can be turned into
    dicts with ``dict(row)``. When the ``with`` body finishes without
    raising, the pending transaction is committed. The connection is closed
    in every case, whether or not the body raised.
    """
    db = sqlite3.connect(DB_PATH, check_same_thread=False)
    db.row_factory = sqlite3.Row
    try:
        yield db
        # Only reached when the caller's ``with`` body did not raise.
        db.commit()
    finally:
        db.close()
def init_db():
    """Create the ``users`` table if it does not exist yet.

    Safe to call repeatedly: the statement uses ``CREATE TABLE IF NOT
    EXISTS``, so an existing table is left untouched.
    """
    # Statement text is kept verbatim; ``username`` is UNIQUE, and ``role``,
    # ``is_active`` and ``created_at`` have column defaults.
    create_users_table = """
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
display_name TEXT,
email TEXT,
password_hash BLOB NOT NULL,
role TEXT NOT NULL DEFAULT 'user',
is_active INTEGER NOT NULL DEFAULT 1,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
)
"""
    with get_conn() as db:
        db.execute(create_users_table)
def list_users():
    """Return all users as a list of dicts, ordered by ``id``.

    Each dict carries the keys ``id``, ``username``, ``display_name``,
    ``email``, ``role``, ``is_active`` and ``created_at``. The password hash
    is not selected, so it never appears in the result.
    """
    query = """
SELECT id, username, display_name, email, role, is_active, created_at
FROM users ORDER BY id
"""
    with get_conn() as db:
        records = db.execute(query).fetchall()
        # sqlite3.Row -> plain dict, one per fetched row.
        return list(map(dict, records))
def get_user(user_id: int) -> Optional[dict]:
    """Fetch a single user by primary key.

    Args:
        user_id: Value matched against the ``id`` column.

    Returns:
        A dict with the keys ``id``, ``username``, ``display_name``,
        ``email``, ``role`` and ``is_active``, or ``None`` when no row
        matches. The password hash is not selected.
    """
    query = """
SELECT id, username, display_name, email, role, is_active
FROM users WHERE id = ?
"""
    with get_conn() as db:
        record = db.execute(query, (user_id,)).fetchone()
        if not record:
            return None
        return dict(record)
def create_user(
    username: str,
    password: str,
    display_name: str = "",
    email: str = "",
    role: str = "user",
    is_active: bool = True,
) -> int:
    """Insert a new user and return the id of the created row.

    The plaintext password is never stored; it is UTF-8 encoded and hashed
    with bcrypt using a freshly generated salt.

    Args:
        username: Login name for the new user.
        password: Plaintext password to hash.
        display_name: Optional human-readable name.
        email: Optional e-mail address.
        role: Role label stored with the user.
        is_active: Stored as ``1`` when true, ``0`` otherwise.

    Returns:
        The ``id`` of the newly inserted row.

    Raises:
        sqlite3.IntegrityError: If the insert violates a table constraint.
            The schema created by ``init_db`` declares ``username`` UNIQUE,
            so a duplicate username ends up here.
    """
    # NOTE(review): bcrypt only considers the first 72 bytes of the password
    # (newer bcrypt releases may reject longer input) — confirm callers
    # enforce a length limit.
    pw_hash = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
    with get_conn() as conn:
        cursor = conn.execute("""
INSERT INTO users (username, display_name, email, password_hash, role, is_active)
VALUES (?, ?, ?, ?, ?, ?)
""", (username, display_name, email, pw_hash, role, 1 if is_active else 0))
        # Hand the new primary key back so callers need not re-query for it.
        return cursor.lastrowid
def update_user(
    user_id: int,
    display_name: str,
    email: str,
    role: str,
    is_active: bool,
) -> bool:
    """Overwrite the editable profile fields of one user.

    ``username`` and the password hash are not touched by this function.

    Args:
        user_id: Value matched against the ``id`` column.
        display_name: New display name.
        email: New e-mail address.
        role: New role label.
        is_active: Stored as ``1`` when true, ``0`` otherwise.

    Returns:
        ``True`` if a row with ``user_id`` existed and was updated, ``False``
        if no row matched (previously an unknown id was a silent no-op).
    """
    with get_conn() as conn:
        cursor = conn.execute("""
UPDATE users
SET display_name=?, email=?, role=?, is_active=?
WHERE id=?
""", (display_name, email, role, 1 if is_active else 0, user_id))
        # rowcount is the number of rows the UPDATE touched (0 or 1 here,
        # since the filter is on the primary key).
        return cursor.rowcount > 0
def set_password(user_id: int, new_password: str) -> bool:
    """Replace a user's stored password hash.

    The plaintext password is UTF-8 encoded and hashed with bcrypt using a
    freshly generated salt; only the hash is written.

    Args:
        user_id: Value matched against the ``id`` column.
        new_password: Plaintext password to hash and store.

    Returns:
        ``True`` if a row with ``user_id`` existed and was updated, ``False``
        if no row matched (previously an unknown id was a silent no-op).
    """
    # NOTE(review): bcrypt only considers the first 72 bytes of the password
    # (newer bcrypt releases may reject longer input) — confirm callers
    # enforce a length limit.
    pw_hash = bcrypt.hashpw(new_password.encode("utf-8"), bcrypt.gensalt())
    with get_conn() as conn:
        cursor = conn.execute(
            "UPDATE users SET password_hash=? WHERE id=?",
            (pw_hash, user_id),
        )
        return cursor.rowcount > 0
def delete_user(user_id: int) -> bool:
    """Delete the user with the given id.

    Args:
        user_id: Value matched against the ``id`` column.

    Returns:
        ``True`` if a row was deleted, ``False`` if no user with ``user_id``
        existed (previously an unknown id was a silent no-op).
    """
    with get_conn() as conn:
        cursor = conn.execute("DELETE FROM users WHERE id=?", (user_id,))
        return cursor.rowcount > 0