1+ package io .github .timemachinelab .core .session .application ;
2+
3+ import io .github .timemachinelab .core .qatree .QaTree ;
4+ import io .github .timemachinelab .core .qatree .QaTreeDomain ;
5+ import io .github .timemachinelab .core .session .domain .entity .ConversationSession ;
6+ import io .github .timemachinelab .core .session .infrastructure .ai .QuestionGenerationOperation ;
7+ import lombok .extern .slf4j .Slf4j ;
8+ import org .springframework .stereotype .Service ;
9+ import org .springframework .web .servlet .mvc .method .annotation .SseEmitter ;
10+
11+ import javax .annotation .Resource ;
12+ import java .io .IOException ;
13+ import java .util .HashMap ;
14+ import java .util .Map ;
15+ import java .util .concurrent .ConcurrentHashMap ;
16+
17+ /**
18+ * SSE通知服务
19+ * 负责管理SSE连接和发送消息给客户端
20+ *
21+ * @author suifeng
22+ * 日期: 2025/1/27
23+ */
24+ @ Service
25+ @ Slf4j
26+ public class SseNotificationService {
27+
28+ @ Resource
29+ private SessionManagementService sessionManagementService ;
30+ @ Resource
31+ private QaTreeDomain qaTreeDomain ;
32+
33+ // SSE连接管理
34+ private final Map <String , SseEmitter > sseEmitters = new ConcurrentHashMap <>();
35+
36+ /**
37+ * 注册SSE连接
38+ *
39+ * @param sessionId 会话ID
40+ * @param emitter SSE发射器
41+ */
42+ public void registerSseConnection (String sessionId , SseEmitter emitter ) {
43+ sseEmitters .put (sessionId , emitter );
44+ log .info ("SSE连接已注册 - 会话: {}" , sessionId );
45+ }
46+
47+ /**
48+ * 移除SSE连接
49+ *
50+ * @param sessionId 会话ID
51+ */
52+ public void removeSseConnection (String sessionId ) {
53+ sseEmitters .remove (sessionId );
54+ log .info ("SSE连接已移除 - 会话: {}" , sessionId );
55+ }
56+
57+ /**
58+ * 发送SSE消息给客户端
59+ *
60+ * @param sessionId 会话ID
61+ * @param response 消息响应对象
62+ */
63+ public void sendSseMessage (String sessionId , QuestionGenerationOperation .QuestionGenerationResponse response ) {
64+ SseEmitter emitter = sseEmitters .get (sessionId );
65+ if (emitter != null ) {
66+ try {
67+ String currentNodeId = null ;
68+
69+ // 1. 先将AI生成的新问题添加到QaTree(只填入question,answer留空)
70+ ConversationSession session = sessionManagementService .getSessionById (sessionId );
71+ if (session != null && session .getQaTree () != null && response .getQuestion () != null ) {
72+ // 使用QaTreeDomain添加新节点,answer字段会自动为空
73+ // appendNode方法内部会调用session.getNextNodeId()获取新节点ID
74+ QaTree qaTree = qaTreeDomain .appendNode (
75+ session .getQaTree (),
76+ response .getParentId (),
77+ response .getQuestion (),
78+ session
79+ );
80+
81+ // 获取刚刚创建的节点ID(当前计数器的值)
82+ currentNodeId = String .valueOf (session .getNodeIdCounter ().get ());
83+
84+ log .info ("AI问题已添加到QaTree - 会话: {}, 父节点: {}, 新节点ID: {}, 问题类型: {}" ,
85+ sessionId , response .getParentId (), currentNodeId , response .getQuestion ().getType ());
86+ } else {
87+ log .warn ("无法添加问题到QaTree - 会话: {}, session存在: {}, qaTree存在: {}, question存在: {}" ,
88+ sessionId , session != null ,
89+ session != null && session .getQaTree () != null ,
90+ response .getQuestion () != null );
91+ }
92+
93+ // 2. 创建修改后的响应对象,包含currentNodeId和parentNodeId
94+ Map <String , Object > modifiedResponse = new HashMap <>();
95+ modifiedResponse .put ("question" , response .getQuestion ());
96+ modifiedResponse .put ("currentNodeId" , currentNodeId != null ? currentNodeId : response .getParentId ());
97+ modifiedResponse .put ("parentNodeId" , response .getParentId ());
98+
99+ // 3. 发送SSE消息给前端
100+ emitter .send (SseEmitter .event ()
101+ .name ("message" )
102+ .data (modifiedResponse ));
103+ log .info ("SSE消息发送成功 - 会话: {}, 当前节点ID: {}" , sessionId , currentNodeId );
104+ } catch (IOException e ) {
105+ log .error ("SSE消息发送失败 - 会话: {}, 错误: {}" , sessionId , e .getMessage ());
106+ sseEmitters .remove (sessionId );
107+ } catch (Exception e ) {
108+ log .error ("添加问题到QaTree失败 - 会话: {}, 错误: {}" , sessionId , e .getMessage ());
109+ // 即使QaTree更新失败,仍然发送SSE消息给前端
110+ try {
111+ Map <String , Object > fallbackResponse = new HashMap <>();
112+ fallbackResponse .put ("question" , response .getQuestion ());
113+ fallbackResponse .put ("currentNodeId" , response .getParentId ()); // 使用parentId作为fallback
114+ fallbackResponse .put ("parentNodeId" , response .getParentId ());
115+
116+ emitter .send (SseEmitter .event ()
117+ .name ("message" )
118+ .data (fallbackResponse ));
119+ log .info ("SSE消息发送成功(QaTree更新失败但消息已发送) - 会话: {}" , sessionId );
120+ } catch (IOException ioException ) {
121+ log .error ("SSE消息发送失败 - 会话: {}, 错误: {}" , sessionId , ioException .getMessage ());
122+ sseEmitters .remove (sessionId );
123+ }
124+ }
125+ } else {
126+ log .warn ("SSE连接不存在 - 会话: {}" , sessionId );
127+ }
128+ }
129+
130+ /**
131+ * 获取SSE连接状态
132+ *
133+ * @return 连接状态信息
134+ */
135+ public Map <String , Object > getSseStatus () {
136+ Map <String , Object > status = new ConcurrentHashMap <>();
137+ status .put ("connectedSessions" , sseEmitters .keySet ());
138+ status .put ("totalConnections" , sseEmitters .size ());
139+ status .put ("timestamp" , System .currentTimeMillis ());
140+ return status ;
141+ }
142+
143+ /**
144+ * 发送欢迎消息
145+ *
146+ * @param sessionId 会话ID
147+ * @param message 欢迎消息内容
148+ */
149+ public void sendWelcomeMessage (String sessionId , String message ) {
150+ SseEmitter emitter = sseEmitters .get (sessionId );
151+ if (emitter != null ) {
152+ try {
153+ emitter .send (SseEmitter .event ()
154+ .name ("connected" )
155+ .data (message ));
156+ log .info ("欢迎消息发送成功 - 会话: {}" , sessionId );
157+ } catch (IOException e ) {
158+ log .error ("欢迎消息发送失败 - 会话: {}, 错误: {}" , sessionId , e .getMessage ());
159+ sseEmitters .remove (sessionId );
160+ }
161+ }
162+ }
163+
164+ /**
165+ * 发送连接数据
166+ *
167+ * @param sessionId 会话ID
168+ * @param connectionData 连接数据
169+ */
170+ public void sendWelcomeMessage (String sessionId , Map <String , Object > connectionData ) {
171+ SseEmitter emitter = sseEmitters .get (sessionId );
172+ if (emitter != null ) {
173+ try {
174+ emitter .send (SseEmitter .event ()
175+ .name ("connected" )
176+ .data (connectionData ));
177+ log .info ("连接数据发送成功 - 会话: {}" , sessionId );
178+ } catch (IOException e ) {
179+ log .error ("连接数据发送失败 - 会话: {}, 错误: {}" , sessionId , e .getMessage ());
180+ sseEmitters .remove (sessionId );
181+ }
182+ }
183+ }
184+ }
0 commit comments