Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.secrets
4 changes: 2 additions & 2 deletions backend/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ COPY ./requirements.txt /code/requirements.txt
RUN pip install -r requirements.txt

COPY ./app /code/app
EXPOSE 8000
EXPOSE 3000

CMD ["fastapi", "run", "app/main.py", "--host", "0.0.0.0", "--port", "8000"]
CMD ["fastapi", "run", "app/main.py", "--host", "0.0.0.0", "--port", "3000"]
92 changes: 72 additions & 20 deletions backend/app/api/data.py → backend/app/api/posts.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,67 @@
from fastapi import APIRouter, File, UploadFile, Request, HTTPException

import datetime
import json
import re
import shutil
import time
import re

import json
from typing import List, Optional

from fastapi import APIRouter, File, HTTPException, Request, UploadFile, Cookie
from sqlmodel import Session, select, text

from app.models.blog_post import BlogPost, BlogPostCreate, BlogPostUpdate
from app.core.database import SessionDep
from typing import List, Optional
from app.models.blog_post import BlogPost, BlogPostCreate, BlogPostUpdate
from app.api.users import get_current_user
import bleach

router = APIRouter()


def sanitise_html(html: str) -> str:
return html
allowed_tags = [
"b",
"i",
"u",
"em",
"strong",
"a",
"p",
"ul",
"ol",
"li",
"br",
"blockquote",
"code",
"pre",
"img",
"h1",
"h2",
"h3",
"h4",
"h5",
"h6",
]

allowed_attrs = {
"a": ["href", "title", "target", "rel"],
"img": ["src", "alt", "title", "width", "height"],
}

# Allowed protocols for href/src (data for embedded images)
allowed_protocols = ["http", "https", "mailto", "data"]

cleaned_html = bleach.clean(
html,
tags=allowed_tags,
attributes=allowed_attrs,
protocols=allowed_protocols,
strip=True, # Strip disallowed tags instead of escaping
)

return cleaned_html


def generate_unique_slug(title: str, db: Session) -> str:
# Slugify title (basic version; you can improve with unidecode or slugify lib)
# Slugify title (improve with unidecode or slugify lib)
base_slug = re.sub(r"[^a-z0-9]+", "-", title.lower()).strip("-")
slug = base_slug
counter = 1
Expand All @@ -32,12 +72,14 @@ def generate_unique_slug(title: str, db: Session) -> str:


@router.post("/posts/create_post", response_model=BlogPost)
def create_post(post: BlogPostCreate, db: SessionDep):
def create_post(post: BlogPostCreate, db: SessionDep, session_id: str = Cookie(None)):
user = get_current_user(db, session_id)
created_post = BlogPost(
title=post.title,
content=sanitise_html(post.content),
slug=generate_unique_slug(post.title, db),
cover_image=post.cover_image,
user_id=user.id,
)
db.add(created_post)
db.commit()
Expand All @@ -50,31 +92,37 @@ def read_posts(db: SessionDep):
return db.exec(select(BlogPost)).all()


@router.get("/posts/id/{post_id}", response_model=BlogPost)
def read_post(post_id: int, db: SessionDep):
post = db.get(BlogPost, post_id)
@router.get("/posts/slug/{post_slug}", response_model=BlogPost)
def read_post(post_slug: str, db: SessionDep):
post = db.exec(select(BlogPost).where(BlogPost.slug == post_slug)).first()
if not post:
raise HTTPException(status_code=404, detail="Post not found")
return post


@router.get("/posts/{post_slug}", response_model=BlogPost)
def read_post(post_slug: str, db: SessionDep):
post = db.exec(select(BlogPost).where(BlogPost.slug == post_slug)).first()
@router.get("/posts/id/{post_id}", response_model=BlogPost)
def read_post_by_id(post_id: int, db: SessionDep):
post = db.get(BlogPost, post_id)
if not post:
raise HTTPException(status_code=404, detail="Post not found")
return post


@router.put("/posts/{post_id}", response_model=BlogPost)
@router.put("/posts/id/{post_id}", response_model=BlogPost)
def update_post(
post_id: int,
updated_post: BlogPostUpdate,
db: SessionDep,
session_id: str = Cookie(None),
):
db_post = db.get(BlogPost, post_id)
if not db_post:
raise HTTPException(status_code=404, detail="Post not found")
user = get_current_user(db, session_id)
if user.id != db_post.user_id:
raise HTTPException(
status_code=403, detail="Not authorized to update this post"
)

if updated_post.title is not None:
db_post.title = updated_post.title
Expand All @@ -86,18 +134,22 @@ def update_post(
if updated_post.cover_image is not None:
db_post.cover_image = updated_post.cover_image

# db_post.timestamp = int(time.time())
db.add(db_post)
db.commit()
db.refresh(db_post)
return db_post


@router.delete("/posts/{post_id}")
def delete_post(post_id: int, db: SessionDep):
@router.delete("/posts/id/{post_id}")
def delete_post(post_id: int, db: SessionDep, session_id: str = Cookie(None)):
post = db.get(BlogPost, post_id)
if not post:
raise HTTPException(status_code=404, detail="Post not found")
user = get_current_user(db, session_id)
if user.id != post.user_id:
raise HTTPException(
status_code=403, detail="Not authorized to delete this post"
)
db.delete(post)
db.commit()
return {"detail": "Post deleted"}
134 changes: 134 additions & 0 deletions backend/app/api/users.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import datetime
import json
import os
import re
import shutil
import time
from pathlib import Path
from typing import List, Optional

import bcrypt
from dotenv import load_dotenv
from fastapi import (APIRouter, Cookie, Depends, File, HTTPException, Request,
Response, UploadFile, status)
from itsdangerous import TimestampSigner
from sqlmodel import Session, select, text

from app.core.database import SessionDep, get_session
from app.models.blog_post import BlogPost, BlogPostCreate, BlogPostUpdate
from app.models.session import SessionModel
from app.models.user import User, UserCreate, UserLogin

secret_file = Path("/run/secrets/backend_secret_key")
SECRET_KEY = secret_file.read_text().strip()
signer = TimestampSigner(SECRET_KEY)

router = APIRouter()


def hash_password(password: str) -> str:
hashed = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode()
return hashed


def verify_password(plain_password: str, hashed_password: str) -> bool:
return bcrypt.checkpw(
plain_password.encode("utf-8"), hashed_password.encode("utf-8")
)


def get_current_user(db: Session, session_id: str):
print(f"session_id: {session_id}, type: {type(session_id)}")
if not session_id:
raise HTTPException(status_code=401, detail="Not logged in")

try:
unsigned_id = signer.unsign(session_id, max_age=86400).decode()
except Exception as e:
raise HTTPException(
status_code=401,
detail=f"Invalid or expired session {session_id}, {e}",
)

session_obj = db.get(SessionModel, unsigned_id)
if not session_obj or session_obj.expires_at < time.time():
raise HTTPException(status_code=401, detail="Session expired")

user = db.get(User, session_obj.user_id)
if not user:
raise HTTPException(status_code=401, detail="User not found")

return user


def cleanup_expired_sessions(db: Session):
now = int(time.time())
expired_sessions = db.exec(
select(SessionModel).where(SessionModel.expires_at < now)
).all()

for session in expired_sessions:
db.delete(session)

db.commit()

return len(expired_sessions)


@router.post("/users/", response_model=User, status_code=status.HTTP_201_CREATED)
def create_user(user: UserCreate, db: SessionDep, session_id: str = Cookie(None)):
user = get_current_user(db, session_id)
if not user.is_admin:
raise HTTPException(status_code=403, detail="Not authorized to create users")
existing_user = db.exec(select(User).where(User.username == user.username)).first()
if existing_user:
raise HTTPException(status_code=400, detail="Username already registered")
new_user = User(
username=user.username,
hashed_password=hash_password(user.password),
)
db.add(new_user)
db.commit()
db.refresh(new_user)
return new_user


@router.post("/login/")
def login_attempt(login: UserLogin, response: Response, db: SessionDep):
user = db.exec(select(User).where(User.username == login.username)).first()
if not user or not verify_password(login.password, user.hashed_password):
raise HTTPException(status_code=401, detail="Invalid credentials")

# Check for existing session
existing_session = db.exec(
select(SessionModel).where(SessionModel.user_id == user.id)
).first()

if existing_session:
db.delete(existing_session)
db.commit()

session = SessionModel(user_id=user.id)
db.add(session)
db.commit()
db.refresh(session)

# Sign the session ID and set as HttpOnly cookie
signed_session_id = signer.sign(session.id).decode()
response.set_cookie(
"session_id",
signed_session_id,
httponly=True,
secure=False,
samesite="lax",
max_age=86400, # 1 day
path="/",
)

return {"message": "Login successful"}


@router.get("/me")
def read_me(db: SessionDep, session_id: str = Cookie(None)):
user = get_current_user(db, session_id)
return {"id": user.id, "username": user.username, "is_admin": user.is_admin}
2 changes: 1 addition & 1 deletion backend/app/core/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ def get_session():
yield session


SessionDep = Annotated[Session, Depends(get_session)]
SessionDep = Annotated[Session, Depends(get_session)]
38 changes: 38 additions & 0 deletions backend/app/core/db_init.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import os
from pathlib import Path

import bcrypt
from sqlmodel import select

from app.core.database import get_session
from app.models.user import User

ADMIN_USERNAME = os.getenv("ADMIN_USERNAME")
secret_file = Path("/run/secrets/admin_password")
ADMIN_PASSWORD = secret_file.read_text().strip()


def hash_password(password: str) -> str:
return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()


def create_initial_admin():
db = next(get_session())
existing_admin = db.exec(
select(User).where(User.username == ADMIN_USERNAME)
).first()
if not existing_admin:
admin_user = User(
username=ADMIN_USERNAME,
hashed_password=hash_password(ADMIN_PASSWORD),
is_admin=True,
)
db.add(admin_user)
db.commit()
print(f"Admin user '{ADMIN_USERNAME}' created.")
else:
print(f"Admin user '{ADMIN_USERNAME}' already exists.")


if __name__ == "__main__":
create_initial_admin()
Loading