Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,6 @@ out/
### VS Code ###
.vscode/
/.env

## Agents
AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public interface MeetingRepository extends JpaRepository<Meeting, Long> {
SELECT m
FROM Meeting m
LEFT JOIN FETCH m.meetingAnalysis
WHERE m.team.id = :teamId AND m.isNormallyEnded IS TRUE
WHERE m.team.id = :teamId
ORDER BY m.startDateTime DESC
""")
List<Meeting> findWithAnalysis(@Param("teamId") Long teamId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,12 @@ public void autoEndMeetingIfEmpty(Long meetingId) {

@Transactional
public MeetingResponse.MeetingDeleteResponseDTO deleteMeeting(Long memberId, Long meetingId) {

meetingRepository.findById(meetingId)
Meeting meeting = meetingRepository.findById(meetingId)
.orElseThrow(MeetingNotFoundException::new);

meetingMemberRepository.findOwnerMeetingMember(memberId, meetingId, MeetingRole.OWNER)
.orElseThrow(() -> new ErrorHandler(MeetingErrorCode.MEETING_NOT_OWNER));

Meeting meeting = meetingRepository.findById(meetingId)
.orElseThrow(MeetingNotFoundException::new);
stopRecording(meeting);
meetingCleanupService.deleteByMeetingId(meetingId);
scheduleAfterCommit(() -> meetingSocketRoomService.closeRoom(meetingId));
Expand Down Expand Up @@ -182,16 +179,20 @@ private MeetingResponse.MeetingEndResponseDTO finishMeeting(Meeting meeting, boo
stopRecording(meeting);
meetingSocketRoomService.closeRoom(meeting.getId());

scheduleAfterCommit(() -> CompletableFuture.runAsync(() -> meetingAnalysisService.analyzeMeetingAudio(meeting.getId()))
.exceptionally(ex -> {
log.error("회의 오디오 분석 실패: meetingId={}", meeting.getId(), ex);
return null;
}));
scheduleAfterCommit(() -> analyzeMeetingAudioAsync(meeting.getId()));

return MeetingResponse.MeetingEndResponseDTO.builder()
.meetingId(meeting.getId())
.endDateTime(endDateTime)
.build();
}

private void analyzeMeetingAudioAsync(Long meetingId) {
CompletableFuture.runAsync(() -> meetingAnalysisService.analyzeMeetingAudio(meetingId))
.exceptionally(ex -> {
log.error("회의 오디오 분석 실패: meetingId={}", meetingId, ex);
return null;
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,27 @@ public class MeetingUseCase implements MeetingSpeakerResolver {

private final MeetingRepository meetingRepository;

public Meeting findMeetingById(Long id){
public Meeting findMeetingById(Long id) {
return meetingRepository.findById(id)
.orElseThrow(MeetingNotFoundException::new);
}

public Meeting findMeetingWithMembersById(Long id){
public Meeting findMeetingWithMembersById(Long id) {
return meetingRepository.findWithMembers(id)
.orElseThrow(MeetingNotFoundException::new);
}

public List<Meeting> findMeetingByTeamId(Long teamId){
public List<Meeting> findMeetingByTeamId(Long teamId) {
return meetingRepository.findWithAnalysis(teamId);
}

// 회의 참여자 수
public int getMeetingMemberCount(Meeting meeting){
public int getMeetingMemberCount(Meeting meeting) {
return meeting.getMeetingMembers().size();
}

// 회의의 참여자 정보
public List<Member> getParticipantsInfo(Meeting meeting){
public List<Member> getParticipantsInfo(Meeting meeting) {
return meeting.getMeetingMembers().stream()
.map(MeetingMember::getMember)
.toList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,22 @@ public void afterConnectionEstablished(@NonNull WebSocketSession session) throws
}

if (meetingSocketRoomService.existsParticipant(participant.meetingId(), participant.memberId())) {
sendError(session, MeetingMessageType.PARTICIPANT_ALREADY_JOINED, "이미 실시간으로 참여 중인 회의입니다.");
session.close(CloseStatus.NORMAL);
return;
sendError(session, MeetingMessageType.PARTICIPANT_ALREADY_JOINED, "이미 실시간으로 참여 중인 회의입니다.");
session.close(CloseStatus.NORMAL);
return;
}
meetingSocketRoomService.join(participant);

List<ParticipantSummary> currentParticipants = participantSummaries(participant.meetingId());
ConnectedMessage connectedMessage = ConnectedMessage.create(
participant, participantSummaries(participant.meetingId())
participant, currentParticipants
);
session.sendMessage(new TextMessage(JsonConverter.toJson(connectedMessage)));

broadcastRoster(participant.meetingId());
broadcastRoster(participant.meetingId(), currentParticipants);
meetingSocketRoomService.broadcastText(
participant.meetingId(),
JsonConverter.toJson(ParticipantJoinedMessage.create(participant))
new TextMessage(JsonConverter.toJson(ParticipantJoinedMessage.create(participant)))
);
}

Expand All @@ -68,7 +69,8 @@ protected void handleTextMessage(@NonNull WebSocketSession session, @NonNull Tex
try {
incoming = JsonConverter.readValue(message, MeetingSocketMessage.class);
} catch (JsonProcessingException exception) {
throw new IllegalArgumentException("Invalid websocket message payload", exception);
sendError(session, "Invalid websocket message payload");
return;
}

MeetingMessageType type = incoming.type();
Expand All @@ -78,63 +80,8 @@ protected void handleTextMessage(@NonNull WebSocketSession session, @NonNull Tex
}

switch (type) {
case CHAT -> {
logIncomingText(participant, type, incoming);
meetingSocketRoomService.broadcastChatText(
participant.meetingId(),
JsonConverter.toJson(MeetingTextMessage.createTextMessage(
participant,
type,
null,
Optional.ofNullable(incoming.text()).orElse(""),
incoming.payload()
))
);
}
case SPEECH -> {
logIncomingText(participant, type, incoming);
meetingSocketRoomService.broadcastSpeechText(
participant.meetingId(),
JsonConverter.toJson(MeetingTextMessage.createTextMessage(
participant,
type,
null,
Optional.ofNullable(incoming.text()).orElse(""),
incoming.payload()
))
);
}
case AUDIO_TEXT -> {
logIncomingText(participant, type, incoming);
meetingSocketRoomService.broadcastAudioText(
participant.meetingId(),
JsonConverter.toJson(MeetingTextMessage.createTextMessage(
participant,
type,
null,
Optional.ofNullable(incoming.text()).orElse(""),
incoming.payload()
))
);
}
case OFFER, ANSWER, ICE -> {
if (incoming.targetMemberId() == null) {
sendError(session, "targetMemberId is required for " + type.value());
return;
}

meetingSocketRoomService.sendToMember(
participant.meetingId(),
incoming.targetMemberId(),
JsonConverter.toJson(MeetingTextMessage.createTextMessage(
participant,
type,
incoming.targetMemberId(),
null,
incoming.payload()
))
);
}
case CHAT, SPEECH, AUDIO_TEXT -> broadcastTextMessage(participant, type, incoming);
case OFFER, ANSWER, ICE -> forwardSignal(session, participant, type, incoming);
default -> sendError(session, "Unsupported message type: " + type.value());
}
}
Expand Down Expand Up @@ -172,25 +119,26 @@ private void removeParticipant(WebSocketSession session) {
return;
}

meetingSocketRoomService.broadcastText(meetingId, JsonConverter.toJson(new ParticipantLeftMessage(
List<ParticipantSummary> currentParticipants = participantSummaries(meetingId);
meetingSocketRoomService.broadcastText(meetingId, new TextMessage(JsonConverter.toJson(new ParticipantLeftMessage(
MeetingMessageType.PARTICIPANT_LEFT,
meetingId,
removed.memberId(),
removed.name(),
now()
)));
broadcastRoster(meetingId);
))));
broadcastRoster(meetingId, currentParticipants);

if (meetingSocketRoomService.listParticipants(meetingId).isEmpty()) {
if (currentParticipants.isEmpty()) {
meetingCommandService.autoEndMeetingIfEmpty(meetingId);
}
}

// 현재 회의 참가자 목록을 모든 클라이언트에 전파합니다.
private void broadcastRoster(Long meetingId) {
private void broadcastRoster(Long meetingId, List<ParticipantSummary> participantSummaries) {
meetingSocketRoomService.broadcastText(
meetingId,
JsonConverter.toJson(RosterMessage.create(meetingId, participantSummaries(meetingId)))
new TextMessage(JsonConverter.toJson(RosterMessage.create(meetingId, participantSummaries)))
);
}

Expand Down Expand Up @@ -242,8 +190,46 @@ private List<ParticipantSummary> participantSummaries(Long meetingId) {
return meetingSocketRoomService.listParticipants(meetingId);
}

private void broadcastTextMessage(MeetingParticipant participant, MeetingMessageType type, MeetingSocketMessage incoming) {
logIncomingText(participant, type, incoming);
meetingSocketRoomService.broadcastText(
participant.meetingId(),
JsonConverter.toJson(MeetingTextMessage.createTextMessage(
participant,
type,
null,
Optional.ofNullable(incoming.text()).orElse(""),
incoming.payload()
))
);
}

private void forwardSignal(
WebSocketSession session,
MeetingParticipant participant,
MeetingMessageType type,
MeetingSocketMessage incoming
) {
if (incoming.targetMemberId() == null) {
sendError(session, "targetMemberId is required for " + type.value());
return;
}

meetingSocketRoomService.sendToMember(
participant.meetingId(),
incoming.targetMemberId(),
JsonConverter.toJson(MeetingTextMessage.createTextMessage(
participant,
type,
incoming.targetMemberId(),
null,
incoming.payload()
))
);
}

// 웹소켓 메시지에 사용할 현재 시각 문자열을 생성합니다.
private String now() {
private String now() {
return Instant.now().toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Function;

// 회의별 참가자 세션 저장소 역할을 하며 텍스트/오디오 메시지 전달을 담당합니다.
@Service
Expand Down Expand Up @@ -99,34 +98,12 @@ public List<ParticipantSummary> listParticipants(Long meetingId) {

// 텍스트 메시지를 회의방의 모든 참가자에게 브로드캐스트합니다.
public void broadcastText(Long meetingId, String payload) {
dispatch(() -> broadcast(
meetingId,
participant -> new TextMessage(payload),
participant -> false));
}

// 채팅 메시지를 별도 경로로 브로드캐스트합니다.
public void broadcastChatText(Long meetingId, String payload) {
dispatch(() -> broadcast(
meetingId,
participant -> new TextMessage(payload),
participant -> false));
broadcastText(meetingId, new TextMessage(payload));
}

// 실시간 자막/STT 메시지를 채팅과 분리해 브로드캐스트합니다.
public void broadcastSpeechText(Long meetingId, String payload) {
dispatch(() -> broadcast(
meetingId,
participant -> new TextMessage(payload),
participant -> false));
}

// 오디오 텍스트 변환 결과를 별도 경로로 브로드캐스트합니다.
public void broadcastAudioText(Long meetingId, String payload) {
dispatch(() -> broadcast(
meetingId,
participant -> new TextMessage(payload),
participant -> false));
// 이미 직렬화된 텍스트 메시지를 회의방의 모든 참가자에게 브로드캐스트합니다.
public void broadcastText(Long meetingId, TextMessage message) {
dispatch(() -> broadcast(meetingId, message));
}

// 특정 대상 참가자 한 명에게만 시그널링 메시지를 전달합니다.
Expand Down Expand Up @@ -178,11 +155,7 @@ private void sendToMemberInternal(Long meetingId, Long targetMemberId, String pa
}

// 회의방 참가자 전체를 순회하면서 메시지를 보내고 끊어진 세션은 정리합니다.
private void broadcast(
Long meetingId,
Function<MeetingParticipant, WebSocketMessage<?>> messageFactory,
Function<MeetingParticipant, Boolean> skipCondition
) {
private void broadcast(Long meetingId, WebSocketMessage<?> message) {

MeetingRoomRepository room = getRoom(meetingId);
if (room == null) {
Expand All @@ -191,11 +164,7 @@ private void broadcast(

List<MeetingParticipant> disconnectedParticipants = new ArrayList<>();
for (MeetingParticipant participant : new ArrayList<>(room.participants())) {
if (skipCondition.apply(participant)) {
continue;
}

if (!sendMessage(participant, messageFactory.apply(participant))) {
if (!sendMessage(participant, message)) {
disconnectedParticipants.add(participant);
}
}
Expand Down
Loading