Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 28 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ edition = "2024"

[dependencies]
log = "0.4.28"
axum = { version = "0.8.6", features = ["multipart"] }
axum = { version = "0.8.8", features = ["multipart"] }
tokio = {version = "1.48.0", features = ["full"]}
tower = "0.5.2"
config = "0.15.18"
Expand All @@ -15,19 +15,19 @@ futures = "0.3.31"
uuid = { version = "1.18.1", features = ["v4", "serde", "v7"] }
chrono = { version = "0.4.42", features = ["serde"] }
tower-http = { version = "0.6.6", features = ["cors", "trace"] }
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
tracing = "0.1.44"
tracing-subscriber = { version = "0.3.22", features = ["env-filter"] }
sqlx = {version = "0.8.6", features = ["runtime-tokio", "postgres", "chrono", "uuid", "macros"]}
dotenv = "0.15.0"
serde_json = "1.0.145"
serde_json = "1.0.149"
tokio-stream = { version = "0.1.17", features = ["sync"] }
rdkafka = { version = "0.38.0", features = ["cmake-build", "tokio"] }
minio = { version = "0.3.0", features = ["default"] }
image = { version = "0.25.8"}
bytes = "1.10.1"
base64 = "0.22.1"
validator = { version = "0.20.0", features = ["derive"] }
redis = { version = "1.0.0-rc.3", features = ["tokio-comp", "connection-manager"] }
redis = { version = "1.0.2", features = ["tokio-comp", "connection-manager"] }


#keycloak:
Expand Down
46 changes: 45 additions & 1 deletion src/broadcast/event_broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub struct BroadcastChannel {
push_notification_producer: PushNotificationProducer
}


type UserConnectionMap = RwLock<HashMap<Uuid, Sender<Notification>>>;


Expand Down Expand Up @@ -144,5 +145,48 @@ impl BroadcastChannel {
}
}


}


#[cfg(test)]
mod tests {
use super::*;
use crate::cache::redis_cache::NoOpCache;
use crate::kafka::PushNotificationProducer;
use crate::core::KafkaConfig;
use crate::broadcast::Notification;
use crate::broadcast::NotificationEvent::UserReadChat;
use serde_json;
use std::sync::Arc;

#[tokio::test]
async fn send_event_to_subscribed_user_delivers_notification() {
// initialize broadcast channel singleton with NoOpCache and logger producer
let cache: Arc<dyn crate::cache::redis_cache::Cache> = Arc::new(NoOpCache);
let kafka_cfg = KafkaConfig { bootstrap_host: String::from(""), bootstrap_port: 0, topic: String::from(""), client_id: String::from(""), partition: vec![], consumer_group: String::from("") };
BroadcastChannel::init(cache, PushNotificationProducer::new(false, kafka_cfg)).await;

let bc = BroadcastChannel::get();

let user_id = uuid::Uuid::new_v4();
// subscribe
let mut rx = bc.subscribe_to_user_events(user_id).await;

let notification = Notification {
body: UserReadChat { user_id, room_id: uuid::Uuid::new_v4() },
created_at: chrono::Utc::now()
};

// send to all (only this user)
bc.send_event_to_all(vec![user_id], notification.clone()).await;

// receive
let received = rx.recv().await.expect("Should receive notification");

let sent_json = serde_json::to_string(&notification).expect("serialize sent");
let recv_json = serde_json::to_string(&received).expect("serialize recv");
println!("Sent: {}", sent_json);
println!("Received: {}", recv_json);
assert_eq!(sent_json, recv_json);
}
}
8 changes: 7 additions & 1 deletion src/broadcast/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,13 @@ pub enum NotificationEvent {
* Sending this event to all users in a room where a member has left
*/
#[serde(rename_all = "camelCase")]
RoomChangeEvent {message: MessageDTO, room_preview_text: LastMessagePreviewText}
RoomChangeEvent {message: MessageDTO, room_preview_text: LastMessagePreviewText},

/**
* Sending this event to all users in a room when a user has read the latest message
*/
#[serde(rename_all = "camelCase")]
UserReadChat {user_id: Uuid, room_id: Uuid}
}


Expand Down
1 change: 1 addition & 0 deletions src/keycloak/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ impl<I: ActionInput, O: ActionOutput> Action<I, O> {
}

#[cfg(test)]
#[allow(unused)]
mod test {
use assertr::prelude::*;

Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use ism::welcome::welcome;
//learn to code rust axum here:
//https://gitlab.com/famedly/conduit/-/tree/next?ref_type=heads
//https://github.com/AarambhDevHub/rust-backend-axum
//https://github.com/rust-lang/crates.io/
//https://github.com/rust-lang/crates.io/ <---- THE BEST!
#[tokio::main(flavor = "multi_thread")]
async fn main() {

Expand Down
10 changes: 10 additions & 0 deletions src/rooms/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,4 +200,14 @@ pub async fn handle_save_room_image(
} else {
Err(AppError::ValidationError("Required field 'image' not found in the upload.".to_string()))
}
}

pub async fn handle_get_read_states(
Extension(token): Extension<KeycloakToken<String>>,
State(state): State<Arc<AppState>>,
Path(room_id): Path<Uuid>
) -> Result<Json<Vec<RoomMember>>, AppError> {
check_user_in_room(&state, &token.subject, &room_id).await?;
let read_states = RoomService::get_read_states(state, room_id).await?;
Ok(Json(read_states))
}
98 changes: 97 additions & 1 deletion src/rooms/room_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use chrono::Utc;
use log::{error};
use uuid::Uuid;
use crate::broadcast::{BroadcastChannel, Notification};
use crate::broadcast::NotificationEvent::{LeaveRoom, RoomChangeEvent};
use crate::broadcast::NotificationEvent::{LeaveRoom, RoomChangeEvent, UserReadChat};
use crate::core::AppState;
use crate::errors::{AppError};
use crate::messaging::model::{Message, MessageBody, RoomChangeBody};
Expand Down Expand Up @@ -44,9 +44,45 @@ impl RoomService {
pub async fn mark_room_as_read(state: Arc<AppState>, client_id: Uuid, room_id: Uuid) -> Result<(), AppError> {
let pl = state.room_repository.get_connection();
state.room_repository.update_user_read_status(pl, &room_id, &client_id).await?;

let room = state.room_repository.select_room(&room_id).await?;
if let Some(latest_msg_time) = room.latest_message {
let user = state.room_repository.select_joined_user_by_id(&room_id, &client_id).await?;
if let Some(read_time) = user.last_message_read_at {
if read_time >= latest_msg_time {
let users_in_room = state.room_repository.select_room_participants_ids(&room_id).await?;
BroadcastChannel::get().send_event_to_all(
users_in_room,
Notification {
body: UserReadChat { user_id: client_id, room_id },
created_at: Utc::now()
}
).await;
}
}
} else {
let users_in_room = state.room_repository.select_room_participants_ids(&room_id).await?;
BroadcastChannel::get().send_event_to_all(
users_in_room,
Notification {
body: UserReadChat { user_id: client_id, room_id },
created_at: Utc::now()
}
).await;
}

Ok(())
}

pub async fn get_read_states(state: Arc<AppState>, room_id: Uuid) -> Result<Vec<RoomMember>, AppError> {
let users = state.room_repository.select_joined_user_in_room(&room_id).await?;
let room = state.room_repository.select_room(&room_id).await?;
let read_users: Vec<RoomMember> = users.into_iter().filter(|user| {
user_has_read(user, room.latest_message)
}).collect();
Ok(read_users)
}

pub async fn create_room(state: Arc<AppState>, client_id: Uuid, new_room: NewRoom) -> Result<ChatRoomDto, AppError> {
let room_entity = state.room_repository.insert_room(new_room.clone()).await?;
let users = new_room.invited_users;
Expand Down Expand Up @@ -204,6 +240,16 @@ impl RoomService {

}

// Helper used by `get_read_states` — extracted for easier unit testing of the read logic.
fn user_has_read(user: &RoomMember, room_latest: Option<chrono::DateTime<chrono::Utc>>) -> bool {
match (room_latest, user.last_message_read_at) {
(Some(latest_msg_time), Some(read_time)) => read_time >= latest_msg_time,
(Some(_), None) => false,
(None, _) => true,
}
}


async fn handle_leave_private_room(state: Arc<AppState>, room: ChatRoomEntity, users: Vec<RoomMember>) -> Result<(), AppError> {
let mut tx = state.room_repository.start_transaction().await?;
state.room_repository.delete_room(&mut *tx, &room.id).await?;
Expand Down Expand Up @@ -294,4 +340,54 @@ async fn save_room_change_message_and_broadcast(message: Message, state: &Arc<Ap

BroadcastChannel::get().send_event_to_all(to_users, notification).await;
Ok(())
}


#[cfg(test)]
mod tests {
use super::*;
use chrono::{Utc, Duration};
use uuid::Uuid;
use crate::model::room_member::{RoomMember, MembershipStatus};

fn make_member(read_at: Option<chrono::DateTime<Utc>>) -> RoomMember {
RoomMember {
id: Uuid::new_v4(),
display_name: "test".to_string(),
profile_picture: None,
joined_at: Utc::now(),
last_message_read_at: read_at,
membership_status: MembershipStatus::Joined
}
}

#[test]
fn user_has_read_when_no_latest_message() {
let user = make_member(None);
let result = user_has_read(&user, None);
assert!(result, "When room has no latest message, every user should be considered read");
}

#[test]
fn user_has_read_when_read_time_ge_latest() {
let latest = Utc::now();
let read_time = latest + Duration::seconds(1);
let user = make_member(Some(read_time));
assert!(user_has_read(&user, Some(latest)));
}

#[test]
fn user_has_not_read_when_read_time_before_latest() {
let latest = Utc::now();
let read_time = latest - Duration::seconds(10);
let user = make_member(Some(read_time));
assert!(!user_has_read(&user, Some(latest)));
}

#[test]
fn user_has_not_read_when_no_read_time_and_latest_present() {
let latest = Utc::now();
let user = make_member(None);
assert!(!user_has_read(&user, Some(latest)));
}
}
Loading