Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion app/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from flask import Flask

from config import Config
from app.extensions import db, migrate
from app.extensions import db, migrate, oauth
from app.storage import public_url


Expand All @@ -12,6 +12,17 @@ def create_app(config_class=Config):
db.init_app(app)
migrate.init_app(app, db)

# Authlib loads the realm's OIDC discovery doc on first use to pick up
# endpoints and JWKS — no need to hard-code per-realm URLs.
oauth.init_app(app)
oauth.register(
name="keycloak",
server_metadata_url=f"{app.config['OIDC_ISSUER_URL'].rstrip('/')}/.well-known/openid-configuration",
client_id=app.config["OIDC_CLIENT_ID"],
client_secret=app.config["OIDC_CLIENT_SECRET"],
Comment on lines +17 to +22
client_kwargs={"scope": "openid email profile"},
)

from app.auth import bp as auth_bp
from app.library import bp as library_bp
from app.admin import bp as admin_bp
Expand Down
49 changes: 6 additions & 43 deletions app/admin/routes.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from flask import abort, flash, redirect, render_template, request, session, url_for
from werkzeug.security import generate_password_hash
from flask import flash, redirect, render_template, session, url_for
from sqlalchemy.orm import selectinload

from app.admin import bp
Expand Down Expand Up @@ -40,50 +39,14 @@ def users():
@admin_required
def delete_user(user_id):
user = User.query.get_or_404(user_id)

# Prevent admin from deleting themselves

# Self-protect: an admin shouldn't be able to wipe their own row out from
# under their session. Disabling in Keycloak is the right escape hatch.
if user.id == session.get("user_id"):
flash("You cannot delete your own account.", "error")
return redirect(url_for("admin.users"))

db.session.delete(user)
db.session.commit()
flash(f"User '{user.username}' has been deleted.", "success")
return redirect(url_for("admin.users"))


@bp.route("/admin/users/<int:user_id>/reset-password", methods=["POST"])
@admin_required
def reset_password(user_id):
user = User.query.get_or_404(user_id)
new_password = request.form.get("new_password", "")

if not new_password or len(new_password) < 6:
flash("Password must be at least 6 characters.", "error")
return redirect(url_for("admin.users"))

user.password_hash = generate_password_hash(new_password)
db.session.commit()
flash(f"Password for '{user.username}' has been reset.", "success")
return redirect(url_for("admin.users"))


@bp.route("/admin/users/<int:user_id>/change-role", methods=["POST"])
@admin_required
def change_role(user_id):
user = User.query.get_or_404(user_id)
new_role = request.form.get("role", "borrower")

if new_role not in ["borrower", "admin"]:
flash("Invalid role.", "error")
return redirect(url_for("admin.users"))

# Prevent admin from changing their own role
if user.id == session.get("user_id"):
flash("You cannot change your own role.", "error")
return redirect(url_for("admin.users"))

user.role = new_role
db.session.delete(user)
db.session.commit()
flash(f"Role for '{user.username}' changed to '{new_role}'.", "success")
flash(f"Local mirror row for '{user.username}' deleted. Disable in Keycloak to block sign-in.", "success")
return redirect(url_for("admin.users"))
2 changes: 1 addition & 1 deletion app/admin/templates/admin/dashboard.html
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ <h1>Admin dashboard</h1>
</div>
{% endif %}
<p><a href="{{ url_for('library.index') }}">Manage Books</a> — View, edit, and delete books in the library.</p>
<p><a href="{{ url_for('admin.users') }}">Manage Users</a> — View, delete, and reset passwords for all registered users.</p>
<p><a href="{{ url_for('admin.users') }}">Manage Users</a> — View the local mirror of users signed in via Keycloak. Passwords and roles are managed in Keycloak.</p>
<p><a href="{{ url_for('admin.borrowers') }}">Borrower Dashboard</a> — See all signed-up accounts and their borrowed books, borrow dates, due dates, and return dates.</p>
<p class="muted">This page is only visible to users with the <code>admin</code> role. Borrowers who try to visit it get a <code>403 Forbidden</code>.</p>
<p><a href="{{ url_for('library.index') }}">← Back home</a></p>
Expand Down
29 changes: 12 additions & 17 deletions app/admin/templates/admin/users.html
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,22 @@
<div class="card">
<h1>Manage Users</h1>
<p><a href="{{ url_for('admin.dashboard') }}">← Back to dashboard</a></p>


<p class="muted">
Identity is managed in Keycloak. Use the Keycloak admin console to
create users, reset passwords, or change roles. Deleting a row here
only removes the local mirror; the user can sign in again to recreate
it.
</p>

{% with messages = get_flashed_messages(with_categories=true) %}
{% if messages %}
{% for category, message in messages %}
<div class="flash {{ category }}">{{ message }}</div>
{% endfor %}
{% endif %}
{% endwith %}

{% if users %}
<table>
<thead>
Expand All @@ -105,23 +112,11 @@ <h1>Manage Users</h1>
<td>{{ user.id }}</td>
<td>{{ user.username }}</td>
<td>{{ user.email }}</td>
<td>
<form method="POST" action="{{ url_for('admin.change_role', user_id=user.id) }}" style="display: flex; gap: 0.5rem;">
<select name="role" style="padding: 0.3rem; border-radius: 4px; border: 1px solid var(--border);">
<option value="borrower" {% if user.role == 'borrower' %}selected{% endif %}>Borrower</option>
<option value="admin" {% if user.role == 'admin' %}selected{% endif %}>Admin</option>
</select>
<button type="submit" class="btn btn-info" style="padding: 0.3rem 0.6rem; font-size: 0.8rem;">Save</button>
</form>
</td>
<td>{{ user.role }}</td>
<td class="muted">{{ user.created_at.strftime('%Y-%m-%d') }}</td>
<td class="actions">
<form method="POST" action="{{ url_for('admin.reset_password', user_id=user.id) }}" style="display: flex; gap: 0.5rem;">
<input type="password" name="new_password" placeholder="New password" required minlength="6">
<button type="submit" class="btn btn-warning">Reset</button>
</form>
<form method="POST" action="{{ url_for('admin.delete_user', user_id=user.id) }}" onsubmit="return confirm('Are you sure you want to delete this user?');">
<button type="submit" class="btn btn-danger">Delete</button>
<form method="POST" action="{{ url_for('admin.delete_user', user_id=user.id) }}" onsubmit="return confirm('Delete the local mirror row for this user? They can sign in again to recreate it. To prevent sign-in entirely, disable the user in Keycloak.');">
<button type="submit" class="btn btn-danger">Delete local row</button>
</form>
</td>
</tr>
Expand Down
148 changes: 80 additions & 68 deletions app/auth/routes.py
Original file line number Diff line number Diff line change
@@ -1,80 +1,92 @@
from flask import redirect, render_template, request, session, url_for, flash, current_app
from werkzeug.security import generate_password_hash, check_password_hash
import logging
from urllib.parse import urlencode

logger = logging.getLogger(__name__)
from flask import current_app, redirect, request, session, url_for

from app.auth import bp
from app.extensions import db
from app.forms import SignupForm
from app.extensions import db, oauth
from app.models import User

@bp.route("/login", methods=["GET", "POST"])
logger = logging.getLogger(__name__)


@bp.route("/login")
def login():
error = None
if request.method == "POST":
username = request.form.get("username", "").strip()
password = request.form.get("password", "")

user = User.query.filter_by(username=username).first()
if user and check_password_hash(user.password_hash, password):
session["user_id"] = user.id
session["username"] = user.username
session["role"] = user.role
return redirect(url_for("library.index"))

error = "Invalid username or password."
return render_template("auth/login.html", error=error)


@bp.route("/signup", methods=["GET", "POST"])
def signup():
form = SignupForm()
if request.method == "POST":
logger.info(f"POST data: {request.form}")
logger.info(f"Form validate_on_submit: {form.validate_on_submit()}")
logger.info(f"Form errors: {form.errors}")

if form.validate_on_submit():
username = form.username.data.strip()
email = form.email.data.strip()
password = form.password.data

logger.info(f"Signup attempt: username={username}, email={email}")

# Check if user already exists
if User.query.filter_by(username=username).first():
flash("Username already exists.", "error")
return render_template("auth/signup.html", form=form)

if User.query.filter_by(email=email).first():
flash("Email already registered.", "error")
return render_template("auth/signup.html", form=form)

# Create new user with hashed password
try:
new_user = User(
username=username,
email=email,
password_hash=generate_password_hash(password),
role="borrower"
)
db.session.add(new_user)
db.session.commit()

logger.info(f"User created successfully: id={new_user.id}, username={username}")

flash("Account created! Please log in.", "success")
return redirect(url_for("auth.login"))
except Exception as e:
db.session.rollback()
logger.error(f"Error creating user: {e}")
flash(f"Error creating account: {str(e)}", "error")

return render_template("auth/signup.html", form=form)
# Authlib generates state + PKCE verifier and stashes them in the session;
# `authorize_redirect` returns a 302 to Keycloak's /auth endpoint.
redirect_uri = url_for("auth.callback", _external=True)
return oauth.keycloak.authorize_redirect(redirect_uri)


@bp.route("/callback")
def callback():
token = oauth.keycloak.authorize_access_token()
claims = token.get("userinfo") or {}

sub = claims.get("sub")
if not sub:
logger.error("OIDC callback returned no sub claim: %s", claims)
return redirect(url_for("auth.login"))

user = _upsert_user_from_claims(claims)

session.clear()
session["user_id"] = user.id
session["username"] = user.username
session["role"] = user.role
# id_token kept for RP-initiated logout (`id_token_hint`).
session["id_token"] = token.get("id_token")
return redirect(url_for("library.index"))


@bp.route("/logout", methods=["POST"])
def logout():
id_token = session.get("id_token")
session.clear()
return redirect(url_for("auth.login"))

issuer = current_app.config.get("OIDC_ISSUER_URL")
if not (id_token and issuer):
return redirect(url_for("auth.login"))

# RP-initiated logout: tells Keycloak to end its session too, otherwise
# the next /login bounces straight back without a credential prompt.
params = {
"id_token_hint": id_token,
"post_logout_redirect_uri": url_for("auth.login", _external=True),
}
end_session = f"{issuer.rstrip('/')}/protocol/openid-connect/logout"
return redirect(f"{end_session}?{urlencode(params)}")


def _upsert_user_from_claims(claims):
"""Find-or-create the local mirror row for the authenticated Keycloak user.

Lookup order is sub → email, so the existing seeded admin (no sub yet)
gets linked the first time they sign in via Keycloak.
"""
sub = claims["sub"]
email = claims.get("email") or f"{sub}@unknown.local"
username = claims.get("preferred_username") or email

Comment on lines +68 to +70
# Keycloak realm roles are surfaced into userinfo via a realm-roles mapper
# configured on the client (see the keycloak-config repo). Without that
# mapper, every user defaults to borrower.
admin_role = current_app.config["OIDC_ADMIN_ROLE"]
realm_roles = (claims.get("realm_access") or {}).get("roles", [])
role = "admin" if admin_role in realm_roles else "borrower"

user = User.query.filter_by(keycloak_sub=sub).first()
if user is None:
user = User.query.filter_by(email=email).first()

if user is None:
user = User(keycloak_sub=sub, username=username, email=email, role=role)
db.session.add(user)
else:
user.keycloak_sub = sub
user.username = username
user.email = email
user.role = role

db.session.commit()
return user
Loading