From deee6afd75a2e9111db36dcbc95a6a331e466cd3 Mon Sep 17 00:00:00 2001 From: itang06 Date: Thu, 19 Feb 2026 02:31:43 -0800 Subject: [PATCH 01/11] add migration for time_labels table --- backend/migrations/20260219120000_add_time_labels.sql | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 backend/migrations/20260219120000_add_time_labels.sql diff --git a/backend/migrations/20260219120000_add_time_labels.sql b/backend/migrations/20260219120000_add_time_labels.sql new file mode 100644 index 0000000..c7f44ec --- /dev/null +++ b/backend/migrations/20260219120000_add_time_labels.sql @@ -0,0 +1,9 @@ +-- add time_labels table to store event labels attached to EEG timestamps during sessions + +CREATE TABLE IF NOT EXISTS time_labels ( + id SERIAL PRIMARY KEY, + session_id INTEGER NOT NULL + REFERENCES sessions(id) ON DELETE CASCADE, + timestamp TIMESTAMPTZ NOT NULL, -- timestamp of the EEG data with timezone info + label TEXT NOT NULL -- label for the EEG data +); From d6274c14626171b333746e6839d028b6f412abbb Mon Sep 17 00:00:00 2001 From: itang06 Date: Thu, 19 Feb 2026 02:50:46 -0800 Subject: [PATCH 02/11] added model structs for time labels and EEG data queries --- backend/shared-logic/src/models.rs | 36 +++++++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/backend/shared-logic/src/models.rs b/backend/shared-logic/src/models.rs index d9fac77..92c51b5 100644 --- a/backend/shared-logic/src/models.rs +++ b/backend/shared-logic/src/models.rs @@ -45,6 +45,40 @@ pub struct Session { #[derive(Debug, Serialize, Deserialize, sqlx::FromRow)] pub struct FrontendState { pub session_id: i32, - pub data: Value, + pub data: Value, pub updated_at: chrono::DateTime, +} + +// Struct for a time label row coming OUT of the DB (includes all columns) +#[derive(Debug, Serialize, Deserialize, sqlx::FromRow)] +pub struct TimeLabel { + pub id: i32, + pub session_id: i32, + pub timestamp: DateTime, + pub label: String, +} + +// Struct for a time label coming INTO the API from the frontend +// No id (auto-generated) or session_id (comes from URL path) +#[derive(Debug, Serialize, Deserialize)] +pub struct NewTimeLabel { + pub timestamp: DateTime, + pub label: String, +} + +// Struct for a row of EEG data coming OUT of the DB +#[derive(Debug, Serialize, Deserialize, sqlx::FromRow)] +pub struct EegDataRow { + pub time: DateTime, + pub channel1: i32, + pub channel2: i32, + pub channel3: i32, + pub channel4: i32, +} + +// Struct for the query parameters on GET /api/sessions/{session_id}/eeg-data +#[derive(Debug, Deserialize)] +pub struct EegDataQuery { + pub start: DateTime, + pub end: DateTime, } \ No newline at end of file From 6ebf1127e0363c05624ae9902a12d9c30f176f3e Mon Sep 17 00:00:00 2001 From: itang06 Date: Thu, 19 Feb 2026 03:07:05 -0800 Subject: [PATCH 03/11] added DB functions for inserting time labels and querying EEG data by range --- backend/shared-logic/src/db.rs | 48 +++++++++++++++++++++++++++++++++- 1 file changed, 47 insertions(+), 1 deletion(-) diff --git a/backend/shared-logic/src/db.rs b/backend/shared-logic/src/db.rs index b62018b..9c9b42e 100644 --- a/backend/shared-logic/src/db.rs +++ b/backend/shared-logic/src/db.rs @@ -7,7 +7,7 @@ use tokio::time::{self, Duration}; use log::{info, error, warn}; use chrono::{DateTime, Utc}; use dotenvy::dotenv; -use super::models::{User, NewUser, TimeSeriesData, UpdateUser, Session, FrontendState}; +use super::models::{User, NewUser, TimeSeriesData, UpdateUser, Session, FrontendState, NewTimeLabel, EegDataRow}; use crate::{lsl::EEGDataPacket}; use once_cell::sync::OnceCell; use std::sync::Arc; @@ -346,4 +346,50 @@ pub async fn get_frontend_state(client: &DbClient, session_id: i32) -> Result) -> Result<(), sqlx::Error> { + if labels.is_empty() { + info!("Skipping insert - no labels to insert"); + return Ok(()); + } + + let mut query_builder = sqlx::QueryBuilder::new( + "INSERT INTO time_labels (session_id, timestamp, label) " + ); + + query_builder.push_values(labels.iter(), |mut b, label| { + b.push_bind(session_id) + .push_bind(label.timestamp) + .push_bind(&label.label); + }); + + query_builder.build().execute(&**client).await?; + info!("Inserted {} time labels for session {}", labels.len(), session_id); + Ok(()) +} + +/// Get EEG data rows within a time range. +/// +/// Returns all rows from eeg_data where time is between start and end (inclusive), +/// ordered by time. Note: eeg_data has no session_id column, so this filters by +/// time range only — if two sessions overlap in time, their data will mix. +pub async fn get_eeg_data_by_range(client: &DbClient, start: DateTime, end: DateTime) -> Result, Error> { + info!("Retrieving EEG data from {} to {}", start, end); + + let data = sqlx::query_as!( + EegDataRow, + "SELECT time, channel1, channel2, channel3, channel4 FROM eeg_data WHERE time >= $1 AND time <= $2 ORDER BY time", + start, + end + ) + .fetch_all(&**client) + .await?; + + info!("Retrieved {} EEG data rows.", data.len()); + Ok(data) } \ No newline at end of file From e3daa864a3e14f4dbb8a5c92293d9051816fdbf3 Mon Sep 17 00:00:00 2001 From: itang06 Date: Fri, 20 Feb 2026 00:35:41 -0800 Subject: [PATCH 04/11] added API handlers and routes for time labels and EEG data endpoints --- backend/api-server/src/main.rs | 50 +++++++++++++++++++++++++++++++++- backend/shared-logic/src/db.rs | 7 ++--- 2 files changed, 52 insertions(+), 5 deletions(-) diff --git a/backend/api-server/src/main.rs b/backend/api-server/src/main.rs index 88671c6..84f6da5 100644 --- a/backend/api-server/src/main.rs +++ b/backend/api-server/src/main.rs @@ -1,6 +1,7 @@ use axum::{ extract::State, extract::Path, + extract::Query, http::StatusCode, routing::{get, post}, Json, @@ -20,7 +21,7 @@ use pyo3::{IntoPy, ToPyObject}; // shared logic library use shared_logic::db::{initialize_connection, DbClient}; -use shared_logic::models::{User, NewUser, UpdateUser, Session, FrontendState}; +use shared_logic::models::{User, NewUser, UpdateUser, Session, FrontendState, NewTimeLabel, EegDataRow, EegDataQuery}; // Define application state #[derive(Clone)] @@ -195,6 +196,50 @@ async fn get_frontend_state( } +// Handler for POST /api/sessions/{session_id}/time-label +// Receives a batch of time labels from the frontend after a session ends and stores them in the DB. +async fn store_time_labels( + State(app_state): State, // DB connection pool + Path(session_id): Path, // session ID from the URL path + Json(labels): Json>, // array of {timestamp, label} objects from the request body +) -> Result { + info!("Received request to store {} time labels for session {}", labels.len(), session_id); + + match shared_logic::db::insert_time_labels(&app_state.db_client, session_id, labels).await { + Ok(_) => { + info!("Time labels stored successfully for session {}", session_id); + Ok(StatusCode::CREATED) // 201 — write-only, nothing to return + } + Err(e) => { + error!("Failed to store time labels: {}", e); + Err((StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to store time labels: {}", e))) + } + } +} + +// Handler for GET /api/sessions/{session_id}/eeg-data +// Returns EEG data rows within a given time range (passed as ?start=...&end=... query params). +async fn get_eeg_data( + State(app_state): State, // DB connection pool + Path(session_id): Path, // session ID from the URL path + Query(params): Query, // ?start=...&end=... parsed into EegDataQuery struct +) -> Result>, (StatusCode, String)> { + info!("Received request to get EEG data for session {} from {} to {}", session_id, params.start, params.end); + + // Note: session_id is accepted for URL consistency but not used in the query — + // the eeg_data table has no session_id column, so we can only filter by time range. + match shared_logic::db::get_eeg_data_by_range(&app_state.db_client, params.start, params.end).await { + Ok(rows) => { + info!("Retrieved {} EEG data rows", rows.len()); + Ok(Json(rows)) + } + Err(e) => { + error!("Failed to get EEG data: {}", e); + Err((StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to get EEG data: {}", e))) + } + } +} + async fn run_python_script_handler() -> Result, (StatusCode, String)> { info!("Received request to run Python script."); @@ -285,6 +330,9 @@ async fn main() { .route("/api/sessions/:session_id/frontend-state", post(set_frontend_state)) .route("/api/sessions/:session_id/frontend-state", get(get_frontend_state)) + .route("/api/sessions/:session_id/time-label", post(store_time_labels)) + .route("/api/sessions/:session_id/eeg-data", get(get_eeg_data)) + // Share application state with all handlers .with_state(app_state); diff --git a/backend/shared-logic/src/db.rs b/backend/shared-logic/src/db.rs index 9c9b42e..f87be04 100644 --- a/backend/shared-logic/src/db.rs +++ b/backend/shared-logic/src/db.rs @@ -381,12 +381,11 @@ pub async fn insert_time_labels(client: &DbClient, session_id: i32, labels: Vec< pub async fn get_eeg_data_by_range(client: &DbClient, start: DateTime, end: DateTime) -> Result, Error> { info!("Retrieving EEG data from {} to {}", start, end); - let data = sqlx::query_as!( - EegDataRow, + let data = sqlx::query_as::<_, EegDataRow>( "SELECT time, channel1, channel2, channel3, channel4 FROM eeg_data WHERE time >= $1 AND time <= $2 ORDER BY time", - start, - end ) + .bind(start) + .bind(end) .fetch_all(&**client) .await?; From 633894ca768bdf69127194904d43ab43ad221f41 Mon Sep 17 00:00:00 2001 From: AgastyaRai Date: Sun, 8 Feb 2026 11:27:35 -0800 Subject: [PATCH 05/11] added export --- backend/Cargo.lock | 24 +++++ backend/api-server/Cargo.toml | 5 +- backend/api-server/src/main.rs | 90 +++++++++++++++- .../20263101120000_sessions_on_eeg_data.sql | 8 ++ backend/shared-logic/Cargo.toml | 5 +- backend/shared-logic/src/bc.rs | 10 +- backend/shared-logic/src/db.rs | 101 ++++++++++++++++-- backend/websocket-server/Cargo.toml | 5 +- backend/websocket-server/src/main.rs | 42 +++++--- 9 files changed, 254 insertions(+), 36 deletions(-) create mode 100644 backend/migrations/20263101120000_sessions_on_eeg_data.sql diff --git a/backend/Cargo.lock b/backend/Cargo.lock index f531389..9738c65 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -116,6 +116,7 @@ version = "0.1.0" dependencies = [ "axum", "chrono", + "csv", "dotenvy", "env_logger", "log", @@ -396,6 +397,27 @@ dependencies = [ "typenum", ] +[[package]] +name = "csv" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52cd9d68cf7efc6ddfaaee42e7288d3a99d613d4b50f76ce9827ae0c6e14f938" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde_core", +] + +[[package]] +name = "csv-core" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "704a3c26996a80471189265814dbc2c257598b96b8a7feae2d31ace646bb9782" +dependencies = [ + "memchr", +] + [[package]] name = "data-encoding" version = "2.9.0" @@ -1793,6 +1815,7 @@ name = "shared-logic" version = "0.1.0" dependencies = [ "chrono", + "csv", "dotenvy", "futures-util", "log", @@ -2608,6 +2631,7 @@ dependencies = [ name = "websocket-server" version = "0.1.0" dependencies = [ + "csv", "dotenvy", "env_logger", "futures-util", diff --git a/backend/api-server/Cargo.toml b/backend/api-server/Cargo.toml index 4215795..f98083b 100644 --- a/backend/api-server/Cargo.toml +++ b/backend/api-server/Cargo.toml @@ -30,4 +30,7 @@ pyo3 = { version = "0.18.0", features = ["auto-initialize"] } env_logger = "0.11" # Shared logic crate -shared-logic = { path = "../shared-logic" } \ No newline at end of file +shared-logic = { path = "../shared-logic" } + +# CSV serialization/deserialization +csv = "1.4" \ No newline at end of file diff --git a/backend/api-server/src/main.rs b/backend/api-server/src/main.rs index 84f6da5..8e0a6a0 100644 --- a/backend/api-server/src/main.rs +++ b/backend/api-server/src/main.rs @@ -18,6 +18,9 @@ use pyo3::Python; use pyo3::types::{PyList, PyModule, PyTuple}; use pyo3::PyResult; use pyo3::{IntoPy, ToPyObject}; +use chrono::{DateTime, Utc}; +use axum::http::{HeaderMap, HeaderValue, header}; +use axum::response::IntoResponse; // shared logic library use shared_logic::db::{initialize_connection, DbClient}; @@ -29,6 +32,21 @@ struct AppState { db_client: DbClient, } +// define request struct for exporting EEG data +#[derive(Deserialize)] +struct ExportEEGRequest { + filename: String, + options: ExportOptions +} + +#[derive(Deserialize)] +struct ExportOptions { + format: String, + includeHeader: bool, + start_time: Option>, + end_time: Option>, +} + // creates new user when POST /users is called async fn create_user( State(app_state): State, @@ -195,6 +213,73 @@ async fn get_frontend_state( } } +// Handler for POST /api/sessions/{session_id}/eeg_data/export +async fn export_eeg_data( + State(app_state): State, + Path(session_id): Path, + Json(request): Json, +) -> Result { + info!("Received request to export EEG data for session {}", session_id); + + // right now the only export format supported is CSV, so we just check for that + if request.options.format.to_lowercase() != "csv" { + return Err((StatusCode::BAD_REQUEST, format!("Unsupported export format: {}", request.options.format))); + } + + // check for time range, else use defaults + // for end time, we default to the current time + // for start time, we default to the earliest timestamp for the session + let end_time = match request.options.end_time { + Some(t) => t, + None => Utc::now(), + }; + + let start_time = match request.options.start_time { + Some(t) => t, + None => { + // we call the helper function in db.rs to get the earliest timestamp + match shared_logic::db::get_earliest_eeg_timestamp(&app_state.db_client, session_id).await { + Ok(Some(t)) => t, + Ok(None) => return Err((StatusCode::NOT_FOUND, format!("No EEG data found for session {}", session_id))), + Err(e) => return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to get earliest EEG timestamp: {}", e))), + } + } + }; + + if start_time > end_time { + return Err((StatusCode::BAD_REQUEST, "start_time cannot be after end_time".to_string())); + } + + let header_included = request.options.includeHeader; + + // finally call the export function in db.rs + let return_csv = match shared_logic::db::export_eeg_data_as_csv(&app_state.db_client, session_id, start_time, end_time, header_included).await { + Ok(csv_data) => csv_data, + Err(e) => { + error!("Failed to export EEG data: {}", e); + return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to export EEG data: {}", e))); + } + }; + + // small safety: avoid quotes breaking header + let filename = request.filename.replace('"', ""); + + let mut headers = HeaderMap::new(); + headers.insert(header::CONTENT_TYPE, HeaderValue::from_static("text/csv; charset=utf-8")); + + let content_disp = format!("attachment; filename=\"{}\"", filename); + headers.insert( + header::CONTENT_DISPOSITION, + HeaderValue::from_str(&content_disp).map_err(|e| { + (StatusCode::BAD_REQUEST, format!("Invalid filename for header: {}", e)) + })?, + ); + + // return CSV directly as the body + Ok((headers, return_csv)) + +} + // Handler for POST /api/sessions/{session_id}/time-label // Receives a batch of time labels from the frontend after a session ends and stores them in the DB. @@ -226,9 +311,7 @@ async fn get_eeg_data( ) -> Result>, (StatusCode, String)> { info!("Received request to get EEG data for session {} from {} to {}", session_id, params.start, params.end); - // Note: session_id is accepted for URL consistency but not used in the query — - // the eeg_data table has no session_id column, so we can only filter by time range. - match shared_logic::db::get_eeg_data_by_range(&app_state.db_client, params.start, params.end).await { + match shared_logic::db::get_eeg_data_by_range(&app_state.db_client, session_id, params.start, params.end).await { Ok(rows) => { info!("Retrieved {} EEG data rows", rows.len()); Ok(Json(rows)) @@ -332,6 +415,7 @@ async fn main() { .route("/api/sessions/:session_id/time-label", post(store_time_labels)) .route("/api/sessions/:session_id/eeg-data", get(get_eeg_data)) + .route("/api/sessions/:session_id/eeg_data/export", post(export_eeg_data)) // Share application state with all handlers .with_state(app_state); diff --git a/backend/migrations/20263101120000_sessions_on_eeg_data.sql b/backend/migrations/20263101120000_sessions_on_eeg_data.sql new file mode 100644 index 0000000..95083df --- /dev/null +++ b/backend/migrations/20263101120000_sessions_on_eeg_data.sql @@ -0,0 +1,8 @@ +-- note that this assumes that eeg_data has no rows (since as of this migration there should be no real data yet) +-- to do so just run TRUNCATE TABLE eeg_data before applying this migration +ALTER TABLE eeg_data +ADD COLUMN session_id INTEGER NOT NULL, +ADD CONSTRAINT fk_session FOREIGN KEY (session_id) REFERENCES sessions(id) ON DELETE CASCADE; + +-- we can create an index on session_id and time, since the bulk of our queries will be filtering based on these +CREATE INDEX eeg_data_session_time_idx ON eeg_data (session_id, time DESC); -- using DESC since i'm expecting recent data to be more relevant \ No newline at end of file diff --git a/backend/shared-logic/Cargo.toml b/backend/shared-logic/Cargo.toml index ed01e32..bde9bff 100644 --- a/backend/shared-logic/Cargo.toml +++ b/backend/shared-logic/Cargo.toml @@ -43,4 +43,7 @@ lsl = "0.1.1" # working with python pyo3 = { version = "0.18.0", features = ["auto-initialize"] } -numpy = "0.18" \ No newline at end of file +numpy = "0.18" + +# CSV serialization/deserialization +csv = "1.4" \ No newline at end of file diff --git a/backend/shared-logic/src/bc.rs b/backend/shared-logic/src/bc.rs index 0ed09a6..12ca971 100644 --- a/backend/shared-logic/src/bc.rs +++ b/backend/shared-logic/src/bc.rs @@ -23,6 +23,7 @@ pub async fn start_broadcast( write: Arc, Message>>>, cancel_token: CancellationToken, processing_config: ProcessingConfig, // takes in signal processing configuration from frontend + session_id: i32, // takes in session id to tag incoming data with the correct session ) { let (tx, _rx) = broadcast::channel::>(1000); // size of the broadcast buffer, not recommand below 500, websocket will miss messages let rx_ws = tx.subscribe(); @@ -53,7 +54,7 @@ pub async fn start_broadcast( // Subscribe for database Receiver tokio::spawn(async move { - db_receiver( rx_db).await; + db_receiver( rx_db, session_id).await; }); //waits for sender to complete. @@ -109,7 +110,10 @@ pub async fn ws_receiver(write: &Arc, //db_broadcast_receiver takes EEGDataPacket struct from the broadcast sender and inserts it into the database // it inserts as a batch of 100. -pub async fn db_receiver(mut rx_db: Receiver>){ +pub async fn db_receiver( + mut rx_db: Receiver>, + session_id: i32, +){ let db_client = get_db_client(); let mut packet_count = 0; // for debug purposes @@ -129,7 +133,7 @@ pub async fn db_receiver(mut rx_db: Receiver>){ // Insert the packet directly tokio::spawn(async move { let now = Instant::now(); // for debug purposes - if let Err(e) = insert_batch_eeg(&db_client_clone, &eeg_packet).await { + if let Err(e) = insert_batch_eeg(&db_client_clone, session_id, &eeg_packet).await { error!("Packet insert failed: {:?}", e); } info!("Packet insert took {:?}", now.elapsed()); // for debug purposes diff --git a/backend/shared-logic/src/db.rs b/backend/shared-logic/src/db.rs index f87be04..eb61a65 100644 --- a/backend/shared-logic/src/db.rs +++ b/backend/shared-logic/src/db.rs @@ -11,11 +11,22 @@ use super::models::{User, NewUser, TimeSeriesData, UpdateUser, Session, Frontend use crate::{lsl::EEGDataPacket}; use once_cell::sync::OnceCell; use std::sync::Arc; +use serde::{Serialize, Deserialize}; pub static DB_POOL: OnceCell> = OnceCell::new(); pub type DbClient = Arc; +// struct for EEG rows to convert to CSV +#[derive(serde::Serialize)] +struct EEGCsvRow { + time: String, + channel1: i32, + channel2: i32, + channel3: i32, + channel4: i32, +} + pub async fn initialize_connection() -> Result { dotenv().ok(); let database_url = std::env::var("DATABASE_URL") @@ -118,7 +129,7 @@ pub async fn get_testtime_series_data(client: &DbClient) -> Result Result<(), sqlx::Error> { +pub async fn insert_batch_eeg(client: &DbClient, session_id: i32, packet: &EEGDataPacket) -> Result<(), sqlx::Error> { let n_samples = packet.timestamps.len(); @@ -130,7 +141,7 @@ pub async fn insert_batch_eeg(client: &DbClient, packet: &EEGDataPacket) -> Resu // Construct a single SQL insert statement let mut query_builder = sqlx::QueryBuilder::new( - "INSERT INTO eeg_data (time, channel1, channel2, channel3, channel4) " + "INSERT INTO eeg_data (session_id, time, channel1, channel2, channel3, channel4) " ); // Iterate through all data in the packet, pairing timestamp to the signal, and insert them @@ -138,6 +149,7 @@ pub async fn insert_batch_eeg(client: &DbClient, packet: &EEGDataPacket) -> Resu query_builder.push_values( (0..n_samples).map(|sample_idx| { ( + session_id, &packet.timestamps[sample_idx], packet.signals[0][sample_idx], // Channel 0 packet.signals[1][sample_idx], // Channel 1 @@ -145,8 +157,9 @@ pub async fn insert_batch_eeg(client: &DbClient, packet: &EEGDataPacket) -> Resu packet.signals[3][sample_idx], // Channel 3 ) }), - |mut b, (timestamp, ch0, ch1, ch2, ch3)| { - b.push_bind(timestamp) + |mut b, (session_id, timestamp, ch0, ch1, ch2, ch3)| { + b.push_bind(session_id) + .push_bind(timestamp) .push_bind(ch0) .push_bind(ch1) .push_bind(ch2) @@ -373,17 +386,17 @@ pub async fn insert_time_labels(client: &DbClient, session_id: i32, labels: Vec< Ok(()) } -/// Get EEG data rows within a time range. +/// Get EEG data rows for a given session within a time range. /// -/// Returns all rows from eeg_data where time is between start and end (inclusive), -/// ordered by time. Note: eeg_data has no session_id column, so this filters by -/// time range only — if two sessions overlap in time, their data will mix. -pub async fn get_eeg_data_by_range(client: &DbClient, start: DateTime, end: DateTime) -> Result, Error> { - info!("Retrieving EEG data from {} to {}", start, end); +/// Returns all rows from eeg_data where session_id matches and time is between +/// start and end (inclusive), ordered by time. +pub async fn get_eeg_data_by_range(client: &DbClient, session_id: i32, start: DateTime, end: DateTime) -> Result, Error> { + info!("Retrieving EEG data for session {} from {} to {}", session_id, start, end); let data = sqlx::query_as::<_, EegDataRow>( - "SELECT time, channel1, channel2, channel3, channel4 FROM eeg_data WHERE time >= $1 AND time <= $2 ORDER BY time", + "SELECT time, channel1, channel2, channel3, channel4 FROM eeg_data WHERE session_id = $1 AND time >= $2 AND time <= $3 ORDER BY time", ) + .bind(session_id) .bind(start) .bind(end) .fetch_all(&**client) @@ -391,4 +404,70 @@ pub async fn get_eeg_data_by_range(client: &DbClient, start: DateTime, end: info!("Retrieved {} EEG data rows.", data.len()); Ok(data) +} + +/// Export the EEG data for a given session ID and time range as a CSV string. +/// +/// Returns the CSV data on success. +pub async fn export_eeg_data_as_csv(client: &DbClient, session_id: i32, start_time: DateTime, end_time: DateTime, include_header: bool) -> Result { + info!("Exporting EEG data for session id {} from {} to {}", session_id, start_time, end_time); + + // get the data from the database + let data = sqlx::query!( + "SELECT time, channel1, channel2, channel3, channel4 FROM eeg_data + WHERE session_id = $1 AND time >= $2 AND time <= $3 + ORDER BY time ASC", + session_id, + start_time, + end_time + ) + .fetch_all(&**client) + .await?; + + // build the CSV using the csv crate + + let mut writer = csv::WriterBuilder::new() + .has_headers(false) + .from_writer(vec![]); + + // write the header based on include_header flag + if include_header { + writer.write_record(&["time", "channel1", "channel2", "channel3", "channel4"]) + .map_err(|e| Error::Protocol(e.to_string()))?; + } + + // now, iterate through the data and write each row + for row in data { + writer.serialize(EEGCsvRow { + time: row.time.to_rfc3339(), + channel1: row.channel1, + channel2: row.channel2, + channel3: row.channel3, + channel4: row.channel4, + }) + .map_err(|e| Error::Protocol(e.to_string()))?; + } + + let byte_stream = writer.into_inner() + .map_err(|e| Error::Protocol(e.to_string()))?; + + // now, we convert the CSV data to a string and return it + let csv_data = String::from_utf8(byte_stream) + .map_err(|e| Error::Protocol(e.to_string()))?; + + Ok(csv_data) +} + +/// Helper function for eeg data to find the earliest timestamp for a given session +/// +/// Returns the earliest timestamp on success. +pub async fn get_earliest_eeg_timestamp(client: &DbClient, session_id: i32) -> Result>, Error> { + let row = sqlx::query!( + "SELECT MIN(time) as earliest_time FROM eeg_data WHERE session_id = $1", + session_id + ) + .fetch_one(&**client) + .await?; + + Ok(row.earliest_time) } \ No newline at end of file diff --git a/backend/websocket-server/Cargo.toml b/backend/websocket-server/Cargo.toml index bad7c5e..f6452bd 100644 --- a/backend/websocket-server/Cargo.toml +++ b/backend/websocket-server/Cargo.toml @@ -14,5 +14,6 @@ tokio-util = "0.7.15" # Shared logic crate shared-logic = { path = "../shared-logic" } -serde = "1.0.228" -serde_json = "1" \ No newline at end of file +serde = { version = "1", features = ["derive"] } +serde_json = "1" +csv = "1.4" \ No newline at end of file diff --git a/backend/websocket-server/src/main.rs b/backend/websocket-server/src/main.rs index 3d9b41f..02e57de 100644 --- a/backend/websocket-server/src/main.rs +++ b/backend/websocket-server/src/main.rs @@ -16,6 +16,13 @@ use shared_logic::lsl::{ProcessingConfig}; // get ProcessingConfig from lsl.rs use dotenvy::dotenv; use log::{info, error}; use serde_json; // used to parse ProcessingConfig from JSON sent by frontend +use serde::Deserialize; + +#[derive(Deserialize)] +struct WebSocketInitMessage{ + session_id: i32, + processing_config: ProcessingConfig, +} #[tokio::main] @@ -88,40 +95,45 @@ async fn handle_connection(ws_stream: WebSocketStream) { // setup registration for signal processing configuration let signal_config = read.next().await; - // we have the ProcessingConfig struct - // check if we received a message (two layers of unwrapping needed) - let processing_config: ProcessingConfig = match signal_config { - - Some(Ok(config_json)) => { - - // here, we parse the json into a signal config struct using serde_json - let config_text = config_json.to_text().unwrap(); + // we have the WebSocketInitMessage struct, with a session id and processing config + // check if we received a message (some unwrapping needed) + let init_message: WebSocketInitMessage = match signal_config { + Some(Ok(msg)) => { + let text = match msg.to_text() { + Ok(t) => t, + Err(e) => { + error!("Failed to convert init message to text: {}", e); + return; + } + }; - match serde_json::from_str(config_text) { - Ok(config) => config, + match serde_json::from_str::(text) { + Ok(init_msg) => init_msg, Err(e) => { - error!("Error parsing signal configuration JSON: {}", e); + error!("Failed to parse init message JSON: {}", e); return; } } - } Some(Err(e)) => { - error!("Error receiving signal configuration: {}", e); + error!("Error receiving initialization message: {}", e); return; } None => { - error!("No signal configuration received from client. Closing connection."); + error!("No initialization message received from client. Closing connection."); return; } }; + let session_id = init_message.session_id; + let processing_config = init_message.processing_config; + // spawns the broadcast task let mut broadcast = Some(tokio::spawn(async move { // pass ProcessingConfig into broadcast so it reaches receive_eeg - start_broadcast(write_clone, cancel_clone, processing_config).await; + start_broadcast(write_clone, cancel_clone, processing_config, session_id).await; })); From b831799e0adcc9d542dcd6b4110ea73c3077489d Mon Sep 17 00:00:00 2001 From: itang06 Date: Wed, 25 Feb 2026 21:54:47 -0800 Subject: [PATCH 06/11] cherry picked eeg session_id migration and added filtering by session_id for get endpoint --- backend/shared-logic/src/db.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/backend/shared-logic/src/db.rs b/backend/shared-logic/src/db.rs index eb61a65..d4c8aaa 100644 --- a/backend/shared-logic/src/db.rs +++ b/backend/shared-logic/src/db.rs @@ -413,14 +413,14 @@ pub async fn export_eeg_data_as_csv(client: &DbClient, session_id: i32, start_ti info!("Exporting EEG data for session id {} from {} to {}", session_id, start_time, end_time); // get the data from the database - let data = sqlx::query!( + let data = sqlx::query_as::<_, EegDataRow>( "SELECT time, channel1, channel2, channel3, channel4 FROM eeg_data WHERE session_id = $1 AND time >= $2 AND time <= $3 ORDER BY time ASC", - session_id, - start_time, - end_time ) + .bind(session_id) + .bind(start_time) + .bind(end_time) .fetch_all(&**client) .await?; @@ -462,12 +462,12 @@ pub async fn export_eeg_data_as_csv(client: &DbClient, session_id: i32, start_ti /// /// Returns the earliest timestamp on success. pub async fn get_earliest_eeg_timestamp(client: &DbClient, session_id: i32) -> Result>, Error> { - let row = sqlx::query!( - "SELECT MIN(time) as earliest_time FROM eeg_data WHERE session_id = $1", - session_id + let earliest = sqlx::query_scalar::<_, Option>>( + "SELECT MIN(time) FROM eeg_data WHERE session_id = $1", ) + .bind(session_id) .fetch_one(&**client) .await?; - Ok(row.earliest_time) + Ok(earliest) } \ No newline at end of file From 6a5ea8831343214c28fbf5e6f39f2851b12ea1fd Mon Sep 17 00:00:00 2001 From: itang06 Date: Sun, 1 Mar 2026 00:54:13 -0800 Subject: [PATCH 07/11] used compile time query_as! in get_eeg_data_by_range --- backend/shared-logic/src/db.rs | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/backend/shared-logic/src/db.rs b/backend/shared-logic/src/db.rs index d4c8aaa..8d0bcd0 100644 --- a/backend/shared-logic/src/db.rs +++ b/backend/shared-logic/src/db.rs @@ -393,12 +393,13 @@ pub async fn insert_time_labels(client: &DbClient, session_id: i32, labels: Vec< pub async fn get_eeg_data_by_range(client: &DbClient, session_id: i32, start: DateTime, end: DateTime) -> Result, Error> { info!("Retrieving EEG data for session {} from {} to {}", session_id, start, end); - let data = sqlx::query_as::<_, EegDataRow>( + let data = sqlx::query_as!( + EegDataRow, "SELECT time, channel1, channel2, channel3, channel4 FROM eeg_data WHERE session_id = $1 AND time >= $2 AND time <= $3 ORDER BY time", + session_id, + start, + end, ) - .bind(session_id) - .bind(start) - .bind(end) .fetch_all(&**client) .await?; @@ -413,14 +414,14 @@ pub async fn export_eeg_data_as_csv(client: &DbClient, session_id: i32, start_ti info!("Exporting EEG data for session id {} from {} to {}", session_id, start_time, end_time); // get the data from the database - let data = sqlx::query_as::<_, EegDataRow>( + let data = sqlx::query!( "SELECT time, channel1, channel2, channel3, channel4 FROM eeg_data WHERE session_id = $1 AND time >= $2 AND time <= $3 ORDER BY time ASC", + session_id, + start_time, + end_time ) - .bind(session_id) - .bind(start_time) - .bind(end_time) .fetch_all(&**client) .await?; @@ -462,12 +463,12 @@ pub async fn export_eeg_data_as_csv(client: &DbClient, session_id: i32, start_ti /// /// Returns the earliest timestamp on success. pub async fn get_earliest_eeg_timestamp(client: &DbClient, session_id: i32) -> Result>, Error> { - let earliest = sqlx::query_scalar::<_, Option>>( - "SELECT MIN(time) FROM eeg_data WHERE session_id = $1", + let row = sqlx::query!( + "SELECT MIN(time) as earliest_time FROM eeg_data WHERE session_id = $1", + session_id ) - .bind(session_id) .fetch_one(&**client) .await?; - Ok(earliest) + Ok(row.earliest_time) } \ No newline at end of file From 9b0b7bf9375ee09ed10f327f26b4312826ca9f6c Mon Sep 17 00:00:00 2001 From: itang06 Date: Wed, 4 Mar 2026 15:25:05 -0800 Subject: [PATCH 08/11] cleaned up shared_logic::db imports in main.rs --- backend/api-server/src/main.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/backend/api-server/src/main.rs b/backend/api-server/src/main.rs index 8e0a6a0..c3ce45b 100644 --- a/backend/api-server/src/main.rs +++ b/backend/api-server/src/main.rs @@ -23,7 +23,7 @@ use axum::http::{HeaderMap, HeaderValue, header}; use axum::response::IntoResponse; // shared logic library -use shared_logic::db::{initialize_connection, DbClient}; +use shared_logic::db::{initialize_connection, DbClient, get_eeg_data_by_range, get_earliest_eeg_timestamp}; use shared_logic::models::{User, NewUser, UpdateUser, Session, FrontendState, NewTimeLabel, EegDataRow, EegDataQuery}; // Define application state @@ -238,7 +238,7 @@ async fn export_eeg_data( Some(t) => t, None => { // we call the helper function in db.rs to get the earliest timestamp - match shared_logic::db::get_earliest_eeg_timestamp(&app_state.db_client, session_id).await { + match get_earliest_eeg_timestamp(&app_state.db_client, session_id).await { Ok(Some(t)) => t, Ok(None) => return Err((StatusCode::NOT_FOUND, format!("No EEG data found for session {}", session_id))), Err(e) => return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to get earliest EEG timestamp: {}", e))), @@ -311,7 +311,7 @@ async fn get_eeg_data( ) -> Result>, (StatusCode, String)> { info!("Received request to get EEG data for session {} from {} to {}", session_id, params.start, params.end); - match shared_logic::db::get_eeg_data_by_range(&app_state.db_client, session_id, params.start, params.end).await { + match get_eeg_data_by_range(&app_state.db_client, session_id, params.start, params.end).await { Ok(rows) => { info!("Retrieved {} EEG data rows", rows.len()); Ok(Json(rows)) From 6f3033baf0817b906778778f25504272eb3428a7 Mon Sep 17 00:00:00 2001 From: itang06 Date: Wed, 4 Mar 2026 15:30:43 -0800 Subject: [PATCH 09/11] added get_time_labels_by_range db function --- backend/api-server/src/main.rs | 4 ++-- backend/shared-logic/src/db.rs | 21 +++++++++++++++++++++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/backend/api-server/src/main.rs b/backend/api-server/src/main.rs index c3ce45b..723f76d 100644 --- a/backend/api-server/src/main.rs +++ b/backend/api-server/src/main.rs @@ -23,8 +23,8 @@ use axum::http::{HeaderMap, HeaderValue, header}; use axum::response::IntoResponse; // shared logic library -use shared_logic::db::{initialize_connection, DbClient, get_eeg_data_by_range, get_earliest_eeg_timestamp}; -use shared_logic::models::{User, NewUser, UpdateUser, Session, FrontendState, NewTimeLabel, EegDataRow, EegDataQuery}; +use shared_logic::db::{initialize_connection, DbClient, get_eeg_data_by_range, get_earliest_eeg_timestamp, get_time_labels_by_range}; +use shared_logic::models::{User, NewUser, UpdateUser, Session, FrontendState, TimeLabel, NewTimeLabel, EegDataRow, EegDataQuery}; // Define application state #[derive(Clone)] diff --git a/backend/shared-logic/src/db.rs b/backend/shared-logic/src/db.rs index 8d0bcd0..ae5af9e 100644 --- a/backend/shared-logic/src/db.rs +++ b/backend/shared-logic/src/db.rs @@ -407,6 +407,27 @@ pub async fn get_eeg_data_by_range(client: &DbClient, session_id: i32, start: Da Ok(data) } +/// Get time labels for a given session within a time range. +/// +/// Returns all rows from time_labels where session_id matches and timestamp is between +/// start and end (inclusive), ordered by timestamp. +pub async fn get_time_labels_by_range(client: &DbClient, session_id: i32, start: DateTime, end: DateTime) -> Result, Error> { + info!("Retrieving time labels for session {} from {} to {}", session_id, start, end); + + let labels = sqlx::query_as!( + TimeLabel, + "SELECT id, session_id, timestamp, label FROM time_labels WHERE session_id = $1 AND timestamp >= $2 AND timestamp <= $3 ORDER BY timestamp", + session_id, + start, + end, + ) + .fetch_all(&**client) + .await?; + + info!("Retrieved {} time labels.", labels.len()); + Ok(labels) +} + /// Export the EEG data for a given session ID and time range as a CSV string. /// /// Returns the CSV data on success. From 5cca58ad05612b74d68fcce1714807f94ac137f5 Mon Sep 17 00:00:00 2001 From: itang06 Date: Wed, 4 Mar 2026 15:34:48 -0800 Subject: [PATCH 10/11] added get endpoint for time labels by range and registered route --- backend/api-server/src/main.rs | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/backend/api-server/src/main.rs b/backend/api-server/src/main.rs index 723f76d..bb9c27e 100644 --- a/backend/api-server/src/main.rs +++ b/backend/api-server/src/main.rs @@ -302,6 +302,27 @@ async fn store_time_labels( } } +// Handler for GET /api/sessions/{session_id}/time-label +// Returns time labels within a given time range (passed as ?start=...&end=... query params). +async fn get_time_labels( + State(app_state): State, + Path(session_id): Path, + Query(params): Query, +) -> Result>, (StatusCode, String)> { + info!("Received request to get time labels for session {} from {} to {}", session_id, params.start, params.end); + + match get_time_labels_by_range(&app_state.db_client, session_id, params.start, params.end).await { + Ok(labels) => { + info!("Retrieved {} time labels", labels.len()); + Ok(Json(labels)) + } + Err(e) => { + error!("Failed to get time labels: {}", e); + Err((StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to get time labels: {}", e))) + } + } +} + // Handler for GET /api/sessions/{session_id}/eeg-data // Returns EEG data rows within a given time range (passed as ?start=...&end=... query params). async fn get_eeg_data( @@ -414,6 +435,7 @@ async fn main() { .route("/api/sessions/:session_id/frontend-state", get(get_frontend_state)) .route("/api/sessions/:session_id/time-label", post(store_time_labels)) + .route("/api/sessions/:session_id/time-label", get(get_time_labels)) .route("/api/sessions/:session_id/eeg-data", get(get_eeg_data)) .route("/api/sessions/:session_id/eeg_data/export", post(export_eeg_data)) From 8659f7a7313f7c912103be36d594b675865246d8 Mon Sep 17 00:00:00 2001 From: itang06 Date: Wed, 4 Mar 2026 18:17:06 -0800 Subject: [PATCH 11/11] fixed timelabel import --- backend/shared-logic/src/db.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/shared-logic/src/db.rs b/backend/shared-logic/src/db.rs index ae5af9e..285ecf8 100644 --- a/backend/shared-logic/src/db.rs +++ b/backend/shared-logic/src/db.rs @@ -7,7 +7,7 @@ use tokio::time::{self, Duration}; use log::{info, error, warn}; use chrono::{DateTime, Utc}; use dotenvy::dotenv; -use super::models::{User, NewUser, TimeSeriesData, UpdateUser, Session, FrontendState, NewTimeLabel, EegDataRow}; +use super::models::{User, NewUser, TimeSeriesData, UpdateUser, Session, FrontendState, TimeLabel, NewTimeLabel, EegDataRow}; use crate::{lsl::EEGDataPacket}; use once_cell::sync::OnceCell; use std::sync::Arc;