Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 65 additions & 19 deletions hub/src/ws/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -300,29 +300,75 @@ export async function handleClientMessage(ws: ServerWebSocket<ClientWsData>, raw
message,
})

// Forward to channel or agent (Claude Code session)
// Forward to channel or agent (Claude Code session).
// Detect three failure modes so the UI never silently swallows a send:
// (a) no channel registered for this session (agent never connected, or
// was cleaned up by a close handler)
// (b) channel's underlying socket is not OPEN (readyState !== 1) — a
// half-open TCP connection where the agent process is gone but the
// hub hasn't observed the close yet
// (c) ws.send returns -1 (Bun backpressure / closed) — existing path
// In all three cases: clean up registry (if stale), broadcast offline,
// and emit a structured `send_refused` to the SENDER so the chat UI can
// surface "session offline — no runner connected" instead of spinning.
const channel = getChannel(msg.session_id)
if (channel) {
const forwardPayload: Record<string, unknown> = {
type: 'user_message',
id: message.id,
content: msg.content,
ts: message.created_at,
}
// Include images/attachments if present (used by agent connections)
if (msg.images) forwardPayload.images = msg.images
if (msg.attachments) forwardPayload.attachments = msg.attachments
const sent = channel.ws.send(JSON.stringify(forwardPayload))
if (sent === -1) {
// Socket is closed/dead — clean up the stale registry entry
console.log(`[client] channel send failed (socket dead), unregistering session=${msg.session_id}`)
const readyState = channel ? (channel.ws as any).readyState : undefined
const channelLive = !!channel && readyState === 1 // 1 = WebSocket.OPEN

if (!channelLive) {
const reason = !channel
? 'no_channel'
: `socket_not_open(readyState=${readyState})`
console.log(`[client] cannot forward session=${msg.session_id} reason=${reason}`)
if (channel && readyState !== 1) {
// Stale half-open entry — purge it so the next reconnect can register
// cleanly and existing clients re-render as offline.
unregisterChannel(msg.session_id)
broadcastToSubscribers(msg.session_id, { type: 'session_status', session_id: msg.session_id, status: 'offline' })
} else {
console.log(`[client] forwarding to channel session=${msg.session_id}`)
}
broadcastToSubscribers(msg.session_id, {
type: 'session_status',
session_id: msg.session_id,
status: 'offline',
})
try {
ws.send(JSON.stringify({
type: 'send_refused',
client_id: msg.id,
session_id: msg.session_id,
error: 'session_offline',
reason: 'No live runner is attached to this session. Start `claude-remote` in the project directory to reconnect.',
}))
} catch {}
return
}

const forwardPayload: Record<string, unknown> = {
type: 'user_message',
id: message.id,
content: msg.content,
ts: message.created_at,
}
// Include images/attachments if present (used by agent connections)
if (msg.images) forwardPayload.images = msg.images
if (msg.attachments) forwardPayload.attachments = msg.attachments
const sent = channel!.ws.send(JSON.stringify(forwardPayload))
if (sent === -1) {
// Socket reported OPEN moments ago but send failed — backpressure or
// a race with close. Treat as offline and notify the sender.
console.log(`[client] channel send failed (sent=-1), unregistering session=${msg.session_id}`)
unregisterChannel(msg.session_id)
broadcastToSubscribers(msg.session_id, { type: 'session_status', session_id: msg.session_id, status: 'offline' })
try {
ws.send(JSON.stringify({
type: 'send_refused',
client_id: msg.id,
session_id: msg.session_id,
error: 'session_offline',
reason: 'Send to runner failed (socket closed mid-send). Reconnect your agent and retry.',
}))
} catch {}
} else {
console.log(`[client] no channel connected for session=${msg.session_id}`)
console.log(`[client] forwarding to channel session=${msg.session_id} bytes=${sent}`)
}
}
}
Expand Down
25 changes: 25 additions & 0 deletions web/src/hooks/useChat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,31 @@ export function useChat(
}
}

// Hub refused this send (offline session, quota threshold, etc.).
// Surface as a transient assistant bubble so the user sees WHY their
// message didn't get a response. Only render in the currently active
// session — refusals for background sessions stay silent.
if (msg.type === 'send_refused' && typeof msg.session_id === 'string') {
const incomingSessionId = msg.session_id as string
if (incomingSessionId !== activeSessionRef.current) return
const reason = (msg.reason as string) || (msg.error as string) || 'Send refused.'
const synthetic: ChatMessage = {
id: `refused-${msg.client_id || Date.now()}`,
session_id: incomingSessionId,
role: 'assistant',
content: `⚠ ${reason}`,
status: 'interrupted',
created_at: new Date().toISOString(),
}
setMessages(prev => {
if (prev.some(m => m.id === synthetic.id)) return prev
const next = [...prev, synthetic]
cacheRef.current[incomingSessionId] = next
return next
})
return
}

// Live token streaming: append delta to the corresponding placeholder
// message so the user sees the response build up in real time. The
// bubble is already persisted to Postgres so a hub restart preserves
Expand Down
19 changes: 19 additions & 0 deletions web/src/hooks/useChatSurface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,25 @@ export function useChatSurface({
return
}

// Hub refused this send — surface as a transient assistant bubble so
// the cell visibly reflects WHY the runner never replied.
if (msg.type === 'send_refused') {
const reason = (msg.reason as string) || (msg.error as string) || 'Send refused.'
const synthetic: ChatMessage = {
id: `refused-${msg.client_id || Date.now()}`,
session_id: sessionIdRef.current!,
role: 'assistant',
content: `⚠ ${reason}`,
status: 'interrupted',
created_at: new Date().toISOString(),
}
setMessages(prev => {
if (prev.some(m => m.id === synthetic.id)) return prev
return [...prev, synthetic]
})
return
}

// Activity events — minimal reducer mirroring useActivity
if (msg.type === 'status') {
const state = msg.state as ActivityState['status']
Expand Down
10 changes: 10 additions & 0 deletions web/src/hooks/useWebSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,16 @@ export function useWebSocket(token: string | null) {
return
}

// The hub refused a send (offline session, quota threshold, etc.).
// Clear the in-flight entry so we don't replay on reconnect, then fall
// through to handlers so chat-level UI can surface the reason.
if (msg.type === 'send_refused' && typeof msg.client_id === 'string') {
if (inFlightRef.current.delete(msg.client_id)) {
persistInFlight()
}
// fall through — handlers below render the error in the chat surface
}

for (const handler of handlersRef.current) {
handler(msg)
}
Expand Down
Loading