From 5728da380fff72b12dc5d12333b345d2810adaea Mon Sep 17 00:00:00 2001 From: mugunthan-anbalagan Date: Fri, 26 Apr 2024 15:40:24 +0530 Subject: [PATCH] Handled maintaining active users using Redis cache --- .../tomcat/controller/DocumentEditorHub.java | 153 +++++++++++++----- 1 file changed, 115 insertions(+), 38 deletions(-) diff --git a/Java/Java web service/src/main/java/com/syncfusion/tomcat/controller/DocumentEditorHub.java b/Java/Java web service/src/main/java/com/syncfusion/tomcat/controller/DocumentEditorHub.java index 5cbee69..6088aba 100644 --- a/Java/Java web service/src/main/java/com/syncfusion/tomcat/controller/DocumentEditorHub.java +++ b/Java/Java web service/src/main/java/com/syncfusion/tomcat/controller/DocumentEditorHub.java @@ -26,9 +26,7 @@ import com.syncfusion.ej2.wordprocessor.ActionInfo; @Controller -public class DocumentEditorHub { - public static HashMap userActions = new HashMap(); - public static HashMap actions = new HashMap(); +public class DocumentEditorHub { public static final HashMap> roomList = new HashMap<>(); public static SimpMessagingTemplate messagingTemplate; private static final int MAX_RETRIES = 5; @@ -56,58 +54,137 @@ public DocumentEditorHub(SimpMessagingTemplate messagingTemplate) { @MessageMapping("/join/{documentName}") public void joinGroup(ActionInfo info, SimpMessageHeaderAccessor headerAccessor, - @DestinationVariable String documentName) { - + @DestinationVariable String documentName) throws JsonProcessingException { + // To get the connection Id String connectionId = headerAccessor.getSessionId(); - info.setConnectionId(connectionId); String docName = info.getRoomName(); - // info.setAction("connectionId"); HashMap additionalHeaders = new HashMap<>(); additionalHeaders.put("action", "connectionId"); MessageHeaders headers = new MessageHeaders(additionalHeaders); + // send the conection Id to the client broadcastToRoom(docName, info, headers); - - if (!actions.containsKey(connectionId)) { - actions.put(connectionId, info); + try (Jedis jedis = jedisPool.getResource()) { + // to maintain the session id with its corresponding ActionInfo details. + jedis.hset("documentMap", connectionId, documentName); + // add the user details to the Redis cache + jedis.sadd(docName, new com.fasterxml.jackson.databind.ObjectMapper().writeValueAsString(info)); + // Subscribe to the room, so that all users can get the JOIN/LEAVE notification + joinLeaveUsersubscribe(docName); + // publish the user list to the redis + jedis.publish(docName, "JOIN|" + connectionId); + + } catch (JedisConnectionException e) { + System.out.println(e); } - ArrayList actionsList = roomList.computeIfAbsent(documentName, k -> new ArrayList<>()); - // Add the new user info to the list - actionsList.add(info); - HashMap addUser = new HashMap<>(); - addUser.put("action", "addUser"); - MessageHeaders addUserheaders = new MessageHeaders(addUser); - broadcastToRoom(docName, actionsList, addUserheaders); } @EventListener public void handleWebSocketDisconnectListener(SessionDisconnectEvent event) throws Exception { String sessionId = event.getSessionId(); - HashMap userDetails = DocumentEditorHub.actions; - if (userDetails.containsKey(sessionId)) { - ActionInfo info = userDetails.get(sessionId); - - String docName = info.getRoomName(); - HashMap removeUser = new HashMap<>(); - removeUser.put("action", "removeUser"); - MessageHeaders removeUserheaders = new MessageHeaders(removeUser); - broadcastToRoom(docName, info, removeUserheaders); - userDetails.remove(sessionId); - ArrayList actionsList = roomList.computeIfAbsent(info.getRoomName(), k -> new ArrayList<>()); - for (ActionInfo action : actionsList) { + try (Jedis jedis = jedisPool.getResource()) { + //to get the user details of the provided sessionId + String docName = jedis.hget("documentMap", sessionId); + // Publish a message indicating the user's departure from the group + jedis.publish(docName, "LEAVE|" + sessionId); + } catch (JedisConnectionException e) { + System.out.println(e); + } + } - if (action.getConnectionId() == sessionId) { - actionsList.remove(action); - break; - } + private void joinLeaveUsersubscribe(String docName) { + new Thread(() -> { + try (Jedis jedis = jedisPool.getResource()) { + jedis.subscribe(new JedisPubSub() { + @Override + public void onMessage(String channel, String message) { + String[] parts = message.split("\\|"); + if (parts.length == 2) { + String eventType = parts[0]; + String sessionId = parts[1]; + notifyUsers(channel, eventType, sessionId); + } + } + }, docName); + } catch (JedisConnectionException e) { + System.out.println(e); } - if (userDetails.isEmpty()) { - Connection connection = DriverManager.getConnection(datasourceUrl, datasourceUsername, - datasourcePassword); - CollaborativeEditingController.updateOperationsToSourceDocument(docName, false, 0, connection, - datasourceAccessKey, datasourceSecretKey, datasourceBucketName); + }).start(); + } + + public void notifyUsers(String docName, String eventType, String sessionId) { + try (Jedis jedis = jedisPool.getResource()) { + if ("JOIN".equals(eventType)) { + HashMap addUser = new HashMap<>(); + addUser.put("action", "addUser"); + MessageHeaders addUserheaders = new MessageHeaders(addUser); + // get the list of users from Redis + Set userJsonStrings = jedis.smembers(docName); + System.out.println("userJsonStrings to join" + userJsonStrings); + ArrayList actionsList = new ArrayList<>(); + ObjectMapper mapper = new ObjectMapper(); + for (String userJson : userJsonStrings) { + try { + ActionInfo actionInfo = mapper.readValue(userJson, ActionInfo.class); + actionsList.add(actionInfo); + } catch (Exception e) { + System.err.println("Error parsing user information JSON: " + e.getMessage()); + } + } + //Boradcast the user list to all the users connected in that room + broadcastToRoom(docName, actionsList, addUserheaders); + } else if ("LEAVE".equals(eventType)) { + // get the user list from the redis + Set userJsonStrings = jedis.smembers(docName); + System.out.println("userJsonStrings to leave" + userJsonStrings); + if (!userJsonStrings.isEmpty()) { + ObjectMapper mapper = new ObjectMapper(); + for (String userJson : userJsonStrings) { + ActionInfo action = null; + try { + action = mapper.readValue(userJson, ActionInfo.class); + } catch (JsonMappingException e) { + e.printStackTrace(); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + if (action.getConnectionId().equals(sessionId)) { + // Remove the user from the user list + jedis.srem(docName, userJson); + HashMap removeUser = new HashMap<>(); + removeUser.put("action", "removeUser"); + MessageHeaders removeUserheaders = new MessageHeaders(removeUser); + // Broadcast the removal notification to all users in the document + broadcastToRoom(docName, action, removeUserheaders); + // Remove the session ID from the session-document mapping + jedis.hdel("documentMap", sessionId); + break; + } + } + } else { + System.out.println("No users found in the document."); + } + if (userJsonStrings.isEmpty()) { + Connection connection = null; + try { + connection = DriverManager.getConnection(datasourceUrl, datasourceUsername, datasourcePassword); + } catch (SQLException e1) { + e1.printStackTrace(); + } + try { + TomcatApplication.updateOperationsToSourceDocument(docName, + "doc_18d1e2a949604a1f8430710ae19aa354", false, 0, connection); + } catch (Exception e) { + e.printStackTrace(); + } + jedis.del(docName); + } } + + } catch (JedisConnectionException e) { + System.out.println(e); } + } public static void broadcastToRoom(String roomName, Object payload, MessageHeaders headers) {