Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions backend/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion backend/api-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,7 @@ pyo3 = { version = "0.18.0", features = ["auto-initialize"] }
env_logger = "0.11"

# Shared logic crate
shared-logic = { path = "../shared-logic" }
shared-logic = { path = "../shared-logic" }

# CSV serialization/deserialization
csv = "1.4"
158 changes: 156 additions & 2 deletions backend/api-server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use axum::{
extract::State,
extract::Path,
extract::Query,
http::StatusCode,
routing::{get, post},
Json,
Expand All @@ -17,17 +18,35 @@ use pyo3::Python;
use pyo3::types::{PyList, PyModule, PyTuple};
use pyo3::PyResult;
use pyo3::{IntoPy, ToPyObject};
use chrono::{DateTime, Utc};
use axum::http::{HeaderMap, HeaderValue, header};
use axum::response::IntoResponse;

// shared logic library
use shared_logic::db::{initialize_connection, DbClient};
use shared_logic::models::{User, NewUser, UpdateUser, Session, FrontendState};
use shared_logic::db::{initialize_connection, DbClient, get_eeg_data_by_range, get_earliest_eeg_timestamp, get_time_labels_by_range};
use shared_logic::models::{User, NewUser, UpdateUser, Session, FrontendState, TimeLabel, NewTimeLabel, EegDataRow, EegDataQuery};

// Define application state
#[derive(Clone)]
struct AppState {
db_client: DbClient,
}

// define request struct for exporting EEG data
// Request body for POST /api/sessions/{session_id}/eeg_data/export.
// Pairs the download filename with the export options.
#[derive(Deserialize)]
struct ExportEEGRequest {
filename: String, // name used for the Content-Disposition attachment
options: ExportOptions // format / header / optional time-range settings
}

// Options controlling an EEG CSV export.
//
// Field names mirror the JSON payload sent by the frontend. `includeHeader`
// is intentionally camelCase to match the wire format, so we silence the
// snake_case lint on the struct rather than renaming the field (a serde
// rename would also work, but would require touching every usage site).
#[allow(non_snake_case)]
#[derive(Deserialize)]
struct ExportOptions {
    format: String,                    // export format; only "csv" is supported today
    includeHeader: bool,               // whether the CSV starts with a header row
    start_time: Option<DateTime<Utc>>, // range start; defaults to the session's earliest sample
    end_time: Option<DateTime<Utc>>,   // range end; defaults to "now" (UTC)
}

// creates new user when POST /users is called
async fn create_user(
State(app_state): State<AppState>,
Expand Down Expand Up @@ -194,6 +213,136 @@ async fn get_frontend_state(
}
}

// Handler for POST /api/sessions/{session_id}/eeg_data/export
async fn export_eeg_data(
    State(app_state): State<AppState>,
    Path(session_id): Path<i32>,
    Json(request): Json<ExportEEGRequest>,
) -> Result<impl IntoResponse, (StatusCode, String)> {
    // Exports a session's EEG data as a CSV attachment.
    //
    // Range defaults when omitted in the request:
    //   - end_time   -> Utc::now()
    //   - start_time -> earliest EEG timestamp recorded for this session
    //
    // Errors:
    //   - 400: unsupported format, inverted time range, or un-encodable filename
    //   - 404: start_time must be derived but the session has no EEG data
    //   - 500: any database failure
    info!("Received request to export EEG data for session {}", session_id);

    // CSV is the only supported export format for now; reject anything else up front.
    if request.options.format.to_lowercase() != "csv" {
        return Err((StatusCode::BAD_REQUEST, format!("Unsupported export format: {}", request.options.format)));
    }

    // End of range defaults to the current time.
    let end_time = request.options.end_time.unwrap_or_else(Utc::now);

    // Start of range defaults to the earliest sample stored for this session.
    let start_time = match request.options.start_time {
        Some(t) => t,
        None => match get_earliest_eeg_timestamp(&app_state.db_client, session_id).await {
            Ok(Some(t)) => t,
            Ok(None) => return Err((StatusCode::NOT_FOUND, format!("No EEG data found for session {}", session_id))),
            Err(e) => return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to get earliest EEG timestamp: {}", e))),
        },
    };

    if start_time > end_time {
        return Err((StatusCode::BAD_REQUEST, "start_time cannot be after end_time".to_string()));
    }

    let header_included = request.options.includeHeader;

    // Delegate the query + CSV serialization to the shared db layer.
    let return_csv = match shared_logic::db::export_eeg_data_as_csv(&app_state.db_client, session_id, start_time, end_time, header_included).await {
        Ok(csv_data) => csv_data,
        Err(e) => {
            error!("Failed to export EEG data: {}", e);
            return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to export EEG data: {}", e)));
        }
    };

    // Sanitize the filename for the Content-Disposition header: drop quotes
    // (they would terminate the quoted-string early) and control characters
    // (CR/LF are rejected by HeaderValue and are a header-injection vector).
    // Fall back to a sensible default if nothing usable remains.
    let mut filename: String = request
        .filename
        .chars()
        .filter(|c| *c != '"' && !c.is_control())
        .collect();
    if filename.is_empty() {
        filename = "export.csv".to_string();
    }

    let mut headers = HeaderMap::new();
    headers.insert(header::CONTENT_TYPE, HeaderValue::from_static("text/csv; charset=utf-8"));

    let content_disp = format!("attachment; filename=\"{}\"", filename);
    headers.insert(
        header::CONTENT_DISPOSITION,
        HeaderValue::from_str(&content_disp).map_err(|e| {
            (StatusCode::BAD_REQUEST, format!("Invalid filename for header: {}", e))
        })?,
    );

    // Return the headers plus the CSV body; axum converts the tuple into a response.
    Ok((headers, return_csv))
}


// Handler for POST /api/sessions/{session_id}/time-label
// Receives a batch of time labels from the frontend after a session ends and stores them in the DB.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also add a GET handler for the time-label endpoint and, similar to get_eeg_data, allow the frontend to pass in a time range.

async fn store_time_labels(
    State(app_state): State<AppState>, // shared DB connection pool
    Path(session_id): Path<i32>,       // session ID taken from the URL path
    Json(labels): Json<Vec<NewTimeLabel>>, // batch of {timestamp, label} records from the body
) -> Result<StatusCode, (StatusCode, String)> {
    info!("Received request to store {} time labels for session {}", labels.len(), session_id);

    // Guard clause: bail out with a 500 on any insert failure.
    if let Err(e) = shared_logic::db::insert_time_labels(&app_state.db_client, session_id, labels).await {
        error!("Failed to store time labels: {}", e);
        return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to store time labels: {}", e)));
    }

    info!("Time labels stored successfully for session {}", session_id);
    Ok(StatusCode::CREATED) // 201 — write-only, nothing to return
}

// Handler for GET /api/sessions/{session_id}/time-label
// Returns time labels within a given time range (passed as ?start=...&end=... query params).
async fn get_time_labels(
    State(app_state): State<AppState>,  // shared DB connection pool
    Path(session_id): Path<i32>,        // session ID taken from the URL path
    Query(params): Query<EegDataQuery>, // ?start=...&end=... range bounds
) -> Result<Json<Vec<TimeLabel>>, (StatusCode, String)> {
    info!("Received request to get time labels for session {} from {} to {}", session_id, params.start, params.end);

    // Fetch and translate the db result straight into the handler's result type.
    get_time_labels_by_range(&app_state.db_client, session_id, params.start, params.end)
        .await
        .map(|labels| {
            info!("Retrieved {} time labels", labels.len());
            Json(labels)
        })
        .map_err(|e| {
            error!("Failed to get time labels: {}", e);
            (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to get time labels: {}", e))
        })
}

// Handler for GET /api/sessions/{session_id}/eeg-data
// Returns EEG data rows within a given time range (passed as ?start=...&end=... query params).
async fn get_eeg_data(
    State(app_state): State<AppState>,  // shared DB connection pool
    Path(session_id): Path<i32>,        // session ID taken from the URL path
    Query(params): Query<EegDataQuery>, // ?start=...&end=... range bounds
) -> Result<Json<Vec<EegDataRow>>, (StatusCode, String)> {
    info!("Received request to get EEG data for session {} from {} to {}", session_id, params.start, params.end);

    // Run the range query first, then translate both outcomes.
    let result = get_eeg_data_by_range(&app_state.db_client, session_id, params.start, params.end).await;
    match result {
        Err(e) => {
            error!("Failed to get EEG data: {}", e);
            Err((StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to get EEG data: {}", e)))
        }
        Ok(rows) => {
            info!("Retrieved {} EEG data rows", rows.len());
            Ok(Json(rows))
        }
    }
}

async fn run_python_script_handler() -> Result<Json<Value>, (StatusCode, String)> {
info!("Received request to run Python script.");
Expand Down Expand Up @@ -285,6 +434,11 @@ async fn main() {
.route("/api/sessions/:session_id/frontend-state", post(set_frontend_state))
.route("/api/sessions/:session_id/frontend-state", get(get_frontend_state))

.route("/api/sessions/:session_id/time-label", post(store_time_labels))
.route("/api/sessions/:session_id/time-label", get(get_time_labels))
.route("/api/sessions/:session_id/eeg-data", get(get_eeg_data))
.route("/api/sessions/:session_id/eeg_data/export", post(export_eeg_data))

// Share application state with all handlers
.with_state(app_state);

Expand Down
9 changes: 9 additions & 0 deletions backend/migrations/20260219120000_add_time_labels.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- add time_labels table to store event labels attached to EEG timestamps during sessions

CREATE TABLE IF NOT EXISTS time_labels (
    id SERIAL PRIMARY KEY,
    session_id INTEGER NOT NULL
        REFERENCES sessions(id) ON DELETE CASCADE,
    timestamp TIMESTAMPTZ NOT NULL, -- timestamp of the EEG data with timezone info
    label TEXT NOT NULL -- label for the EEG data
);

-- Labels are always read by session + time range (get_time_labels_by_range),
-- so index those columns; mirrors eeg_data_session_time_idx on eeg_data.
CREATE INDEX IF NOT EXISTS time_labels_session_time_idx ON time_labels (session_id, timestamp);
8 changes: 8 additions & 0 deletions backend/migrations/20263101120000_sessions_on_eeg_data.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- Attach every eeg_data row to a session via a NOT NULL foreign key.
-- NOTE: adding a NOT NULL column with no default fails if eeg_data already
-- contains rows; this migration assumes the table is empty (no real data yet).
-- If it is not, run `TRUNCATE TABLE eeg_data;` before applying this migration.
ALTER TABLE eeg_data
ADD COLUMN session_id INTEGER NOT NULL,
ADD CONSTRAINT fk_session FOREIGN KEY (session_id) REFERENCES sessions(id) ON DELETE CASCADE;

-- Index (session_id, time) since the bulk of our queries filter on both;
-- DESC because recent samples are expected to be queried most often.
CREATE INDEX eeg_data_session_time_idx ON eeg_data (session_id, time DESC);
5 changes: 4 additions & 1 deletion backend/shared-logic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,7 @@ lsl = "0.1.1"

# working with python
pyo3 = { version = "0.18.0", features = ["auto-initialize"] }
numpy = "0.18"
numpy = "0.18"

# CSV serialization/deserialization
csv = "1.4"
10 changes: 7 additions & 3 deletions backend/shared-logic/src/bc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub async fn start_broadcast(
write: Arc<Mutex<SplitSink<WebSocketStream<TcpStream>, Message>>>,
cancel_token: CancellationToken,
processing_config: ProcessingConfig, // takes in signal processing configuration from frontend
session_id: i32, // takes in session id to tag incoming data with the correct session
) {
let (tx, _rx) = broadcast::channel::<Arc<EEGDataPacket>>(1000); // size of the broadcast buffer, not recommand below 500, websocket will miss messages
let rx_ws = tx.subscribe();
Expand Down Expand Up @@ -53,7 +54,7 @@ pub async fn start_broadcast(

// Subscribe for database Receiver
tokio::spawn(async move {
db_receiver( rx_db).await;
db_receiver( rx_db, session_id).await;
});

//waits for sender to complete.
Expand Down Expand Up @@ -109,7 +110,10 @@ pub async fn ws_receiver(write: &Arc<Mutex<SplitSink<WebSocketStream<TcpStream>,

//db_broadcast_receiver takes EEGDataPacket struct from the broadcast sender and inserts it into the database
// it inserts as a batch of 100.
pub async fn db_receiver(mut rx_db: Receiver<Arc<EEGDataPacket>>){
pub async fn db_receiver(
mut rx_db: Receiver<Arc<EEGDataPacket>>,
session_id: i32,
){
let db_client = get_db_client();

let mut packet_count = 0; // for debug purposes
Expand All @@ -129,7 +133,7 @@ pub async fn db_receiver(mut rx_db: Receiver<Arc<EEGDataPacket>>){
// Insert the packet directly
tokio::spawn(async move {
let now = Instant::now(); // for debug purposes
if let Err(e) = insert_batch_eeg(&db_client_clone, &eeg_packet).await {
if let Err(e) = insert_batch_eeg(&db_client_clone, session_id, &eeg_packet).await {
error!("Packet insert failed: {:?}", e);
}
info!("Packet insert took {:?}", now.elapsed()); // for debug purposes
Expand Down
Loading