[최준영] sprint12#126
Hidden character warning
Conversation
Highjune
left a comment
There was a problem hiding this comment.
준영님 수고하셧어요! 분산 구현 관련해서 조금 미흡한 부분이 있는데 그것만 해결해봅시다!
자 준영님은 아래와 같아요.
새 메시지 -> kafka 발행
- 고정 groupId → App-1만 수신 하게 되죠?
- App-1이 알림 DB 저장
- App-1이 NotificationCreatedEvent 발행 (로컬에만!)
- App-1의 SseService.send() → App-1에 연결된 유저만 SSE 받음.
- App-2, App-3에 연결된 유저는 실시간 알림 못 받음 ✗
즉 알림에 대한 것을 다른 인스턴스에 전파해줄 필요가 있어요.
그래서 아래와 같이 중간에 추가해줘야 합니다.
새 메시지 → Kafka "discodeit.MessageCreatedEvent"
- 고정 groupId → App-1만 수신
- App-1이 알림 DB 저장
- App-1이 다시 Kafka "discodeit.SseBroadcast" 토픽에 발행 ← 여기가 핵심이에요! 2번째 발행하는거죠
- 고유 groupId → App-1, App-2, App-3 모두 수신
- 각 인스턴스가 자기에게 연결된 SSE 클라이언트에게 전송
- 모든 유저가 실시간 알림 받게 되어요
즉 카프카를 2번 거쳐요.
1차: 고정 groupId -> 1개 인스턴스만 db 저장(중복 방지)
2차: 고유 groupId(각각 달라요) -> 모든 인스턴스가 수신 -> 자기 클라이언트에게 sse 전송입니다.
소켓 메시지랑, sse 관련쪽은 카프카 (사실 레디스 써도 됩니다) 사용해서 해결하시는 것이 나쁘지 않을 것 같습니다. 그리고 jwt 도 현재는 인메모리잖아요? 분산에서는 공유가 되어야 하는데 현재는 인메모리라 인스턴스별 독립이에요. 이 부분은 레디스를 추천드려요
마지막 스프린트로 알고 있는데, 되게 중요한 점이라서 어떻게든 이해 & 추가 구현하셔야 합니다.
그리고 가볍게나마 아키텍쳐 그려보는 것 추천드려요. 문제를 더 정확하게 알 수 있습니다.
질문 몇 가지만 남겨놓을게요~!
카프카가 현재 2가지 역할 하잖아요?
역할 A: 알림 저장 등 1번만 처리 (고정 groupId)
역할 B: SSE 이벤트를 모든 인스턴스에 브로드캐스트 (고유 groupId)
Q1) 위에서 redis pub/sub 으로 대체할 수 있는 영역이랑 없는 영역은? 바꿀 수 있다면 장/단점은?
현재 SseMessageRepository는 JVM 메모리(ConcurrentHashMap)에 이벤트를 저장하고 있잖아요.
Q2) 분산 환경(인스턴스 3개)에서 이 구현이 실패하는 시나리오를 설명하고, Redis로 어떻게 개선할 수 있는지 한번 고민해봐주세요~!
| @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) | ||
| public void handleMessage(MessageCreatedEvent event) { | ||
| MessageDto messageDto = messageService.findMessage(event.messageId()); | ||
| messagingTemplate.convertAndSend( |
There was a problem hiding this comment.
준영님, 대부분의 실시간 이벤트가 로컬 전송이라 인스턴스가 여러개일 떄 다른 인스턴스의 클리어은트는 이벤트를 못 받는 구조인것 같은데요?
TransactionalEventListener 는 같은 jvm 내에서만 동작하는 거잖아요.
인스턴스 3개 띄우면
유저 A -> app1 에서만 채널 생성되고 app1 의 sse/소켓 클라이언트만 이벤트 받고, app2, app3 클라이언트는 아무것도 못 받지 않나요.
중간에 다른 매개체가 필요한 것 같아요. kafka 같은 것 통해서 모든 인스턴스에 브로드캐스트하는 방식으로 해결해야 할 것 같아요.
현재 kafka 는 그냥 알림 저장 용도로만 쓰고 있고, 실시간 이벤트 분배에은 사용되지 않는 것 같은데요!?
|
|
||
| @Override | ||
| public void registerStompEndpoints(StompEndpointRegistry registry) { | ||
| registry.addEndpoint("/ws").withSockJS(); |
There was a problem hiding this comment.
채널인터셉터가 없는 것 같은데요? 이러면 누구나 웹소켓에 연결해서 모든 채널의 메시지 구독할 수 잇어요. 심화 요구사항인 "WebSocket 인증/인가 처리" 가 빠져잇는 것 같습니다.
|
|
||
| private final ApplicationEventPublisher eventPublisher; | ||
|
|
||
| private final Map<UUID, Queue<JwtInformation>> origin = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
분산 환경에서 동작이 불가해요. jwt 레지스터리가 메모리 기반이잖아요?
인스턴스 3개 띄우면, app1 에서 로그인 -> app1 메모리에만 jwt 저장되어 있어서,
만약 app2 로 요청이 라우팅되면 -> app2 메모리에는 업으니까 해당 jwt 없음 -> 인증 실패
이렇게 될 것 같아요. 레디스로 구혀낳는 것 어떠실까요?
| # ─── Backend ───────────────────────────────────────────────────────────────── | ||
| app: | ||
| build: . | ||
| container_name: discodeit-app |
There was a problem hiding this comment.
컨테이너명 이렇게 명시하면 레플리카 사용이 되나요?
단일 인스턴스만 지정하는 것으로 되어있고, replicas 설정이 없어 보이는데요?
| private final SseMessageRepository messageRepository; | ||
|
|
||
| public SseEmitter connect(UUID receiverId, UUID lastEventId) { | ||
| SseEmitter emitter = new SseEmitter(Long.MAX_VALUE); |
There was a problem hiding this comment.
Long.Max_value -> 타임아웃 무한아닌가요. 클라이언트가 비정상 종료(모바일 네트워크 전환, 브라우저 강제 종료)하면 서버가 감지 못할 수도 있거든요. 30분 cleanup까지 좀비 emitter가 메모리에 남을 수 있어요.
5분 정도로 설정하고 클라이언트가 자동 재연결하도록 하는 것이 더 일반적이에요
| @Repository | ||
| public class SseMessageRepository { | ||
|
|
||
| private static final int MAX_SIZE = 1000; |
There was a problem hiding this comment.
맥스 사이즈 설정 아주 잘하셧어요. oom 방지로 잘했습니다.
| origin.entrySet().removeIf(entry -> entry.getValue().isEmpty()); | ||
|
|
||
| goingOffline.forEach(dto -> { | ||
| UserDto offlineDto = new UserDto(dto.id(), dto.username(), dto.email(), dto.profile(), false, dto.role()); |
There was a problem hiding this comment.
UserDto 를 직접 생성하고 있잖아요? false 만 추가한채로요. JwtLogoutHandler 에서도 마찬가지인것 같아요.
DTO 필드가 추가되면 여기도 수정이 필요하게 되잖아요. UserDto에 withOnline(false) 같은 메서드를 만들 거나 별도의 UserPresenceDto(userId, online) 같은 좀 더 가벼운 이벤트를 만드는 것이 더 좋을 것 같아요
| gzip on; | ||
| gzip_types text/plain text/css application/json application/javascript text/xml application/xml text/javascript; | ||
|
|
||
| upstream backend { |
There was a problem hiding this comment.
지금 app -> 1개잖아요?
만약에 docker compose 에서 app 을 Replicas 로 나중에 3개로 띄우면
nginx -> app:80 -> docker 내부 dns 가 3개 중 하나를 골라줘요(켠테이너1:80, 컨테이너2:80, 컨테이너3:80) .
Docker DNS 라운드로빈: app이라는 이름으로 요청하면, Docker가 알아서 3개 컨테이너 중 하나를 돌아가면서 연결해줌. nginx 설정을 안 바꿔도 "어느 정도" 동작은 하게 될거에요.
근데 왜 "투명하지 않다"고 했냐면
문제: nginx 입장에서는 app이 1개인지 3개인지 몰라요
- 로그에서 어떤 인스턴스로 갔는지 안 보임
- 트러블슈팅할 때 어디서 문제인지 추적 어려워요
upstream backend {
server app-1:80;
server app-2:80;
server app-3:80;
}
이런식으로 명시해주면, nginx가 직접 분배하고, 어디로 갔는지 로그에 남고, 전략도 지정 가능하거든어ㅛ.
지금 구조에서도 추후에 replicas 쓰면 Docker DNS가 알아서 해주긴 하지만, 운영 관점에서 덜 투명하다는 의미입니다. 지금은 사실 replicas도 안 쓰고 단일 인스턴스에요
| log.info("[KAFKA_CONSUMER] RoleUpdatedEvent 처리 완료 userId={}", event.userId()); | ||
| } catch (JsonProcessingException e) { | ||
| throw new RuntimeException(e); | ||
| log.error("[KAFKA_CONSUMER] RoleUpdatedEvent 역직렬화 실패 - 메시지 건너뜀", e); |
요구사항
기본
심화
스크린샷
멘토에게