@@ -63,6 +63,10 @@ def __init__(self) -> None:
6363 self .message_history : Dict [str , List [ChatMessage ]] = {}
6464 self .escalations : List [EscalationEvent ] = []
6565 self .user_connections : Dict [str , Set [str ]] = {}
66+ # status: "open" | "escalated" | "assigned" | "resolved"
67+ self .conversation_statuses : Dict [str , str ] = {}
68+ self .conversation_assignments : Dict [str , Optional [str ]] = {}
69+ self .conversation_escalated_at : Dict [str , datetime ] = {}
6670 self ._lock = asyncio .Lock ()
6771
6872 async def connect (
@@ -192,6 +196,9 @@ async def escalate_conversation(
192196
193197 async with self ._lock :
194198 self .escalations .append (escalation )
199+ self .conversation_statuses [conversation_id ] = "escalated"
200+ self .conversation_assignments [conversation_id ] = None
201+ self .conversation_escalated_at [conversation_id ] = escalation .timestamp
195202
196203 if conversation_id in self .active_connections :
197204 escalation_notification = {
@@ -225,6 +232,77 @@ async def escalate_conversation(
225232 )
226233 return escalation
227234
235+ async def assign_conversation (
236+ self , conversation_id : str , agent_id : str
237+ ) -> None :
238+ """Assign an escalated conversation to a support agent."""
239+ async with self ._lock :
240+ self .conversation_statuses [conversation_id ] = "assigned"
241+ self .conversation_assignments [conversation_id ] = agent_id
242+
243+ notification = {
244+ "type" : "assignment" ,
245+ "conversation_id" : conversation_id ,
246+ "agent_id" : agent_id ,
247+ "timestamp" : datetime .utcnow ().isoformat (),
248+ "message" : "This conversation has been assigned to a support agent." ,
249+ }
250+
251+ if conversation_id in self .active_connections :
252+ disconnected : List [WebSocket ] = []
253+ for websocket in self .active_connections [conversation_id ]:
254+ try :
255+ await websocket .send_text (json .dumps (notification ))
256+ except Exception as exc :
257+ log_warning (
258+ "Failed to send assignment notification" ,
259+ {"conversation_id" : conversation_id , "error" : str (exc )},
260+ )
261+ disconnected .append (websocket )
262+
263+ if disconnected :
264+ async with self ._lock :
265+ for ws in disconnected :
266+ if ws in self .active_connections .get (conversation_id , []):
267+ self .active_connections [conversation_id ].remove (ws )
268+
269+ log_info (
270+ "Conversation assigned" ,
271+ {"conversation_id" : conversation_id , "agent_id" : agent_id },
272+ )
273+
274+ def get_conversation_status (
275+ self , conversation_id : str
276+ ) -> Dict [str , Any ]:
277+ """Return the current status and assignment for a conversation."""
278+ status = self .conversation_statuses .get (conversation_id , "open" )
279+ assigned_agent_id = self .conversation_assignments .get (conversation_id )
280+ return {
281+ "conversation_id" : conversation_id ,
282+ "status" : status ,
283+ "assigned_agent_id" : assigned_agent_id ,
284+ }
285+
286+ def get_unassigned_queue (self ) -> List [Dict [str , Any ]]:
287+ """Return all escalated conversations with no assigned agent, ordered by escalated_at asc."""
288+ queue = []
289+ for conv_id , status in self .conversation_statuses .items ():
290+ if status == "escalated" and self .conversation_assignments .get (conv_id ) is None :
291+ escalated_at = self .conversation_escalated_at .get (conv_id )
292+ # Find the most recent escalation reason for this conversation
293+ reason = ""
294+ for esc in reversed (self .escalations ):
295+ if esc .conversation_id == conv_id :
296+ reason = esc .reason
297+ break
298+ queue .append ({
299+ "conversation_id" : conv_id ,
300+ "escalated_at" : escalated_at ,
301+ "reason" : reason ,
302+ })
303+ queue .sort (key = lambda x : x ["escalated_at" ] or datetime .min )
304+ return queue
305+
228306 def get_escalations (
229307 self , conversation_id : Optional [str ] = None
230308 ) -> List [EscalationEvent ]:
0 commit comments