Skip to content
This repository was archived by the owner on May 15, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions Sources/Halos/Models/LazuliEvent.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,41 @@ public enum MessageStreamState: String, Sendable {
case aborted
}

public enum QueuedComposerMessageState: String, Sendable {
case queued
case editing
case steering
case followUp
case failed
}

public enum QueuedComposerMoveDirection: Sendable {
case up
case down
}

public struct QueuedComposerMessage: Identifiable, Equatable, Sendable {
public let id: String
public let text: String
public let createdAt: Date
public let order: Int
public let state: QueuedComposerMessageState

public init(
id: String,
text: String,
createdAt: Date,
order: Int,
state: QueuedComposerMessageState
) {
self.id = id
self.text = text
self.createdAt = createdAt
self.order = order
self.state = state
}
}

public enum VeyraApprovalDecision: String, Sendable {
case approve
case deny
Expand Down
172 changes: 172 additions & 0 deletions Sources/Halos/Stores/MissionControlStore.swift
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ public final class MissionControlStore: ObservableObject {
@Published public private(set) var slashCommandPanelState: SlashCommandPanelState?
@Published public private(set) var halosSessions: [HalosSessionSummary] = []
@Published public private(set) var currentSessionKey = MissionControlStore.halosSessionKey
@Published public private(set) var queuedComposerMessages: [QueuedComposerMessage] = []
@Published public var isCodeSessionBrowserPresented = true
@Published public var selectedPage: HalosPage = .code
@Published public var selectedAutomationID: String?
Expand Down Expand Up @@ -369,6 +370,9 @@ public final class MissionControlStore: ObservableObject {
private var didSelectInitialHalosSession = false
private var localSessionTitleOverrides: [String: LocalSessionTitleOverride] = [:]
private var deletedSessionKeys: Set<String> = []
private var queuedComposerMessagesBySessionKey: [String: [QueuedComposerMessage]] = [:]
private var deferredQueuedMessageIDs: Set<String> = []
private var gatewayRequestHandlerForTesting: ((String, [String: Any], @escaping (Bool, Any?) -> Void) -> Void)?

public init(
openClawConfigURL: URL = FileManager.default.homeDirectoryForCurrentUser.appending(path: ".openclaw/openclaw.json"),
Expand Down Expand Up @@ -418,6 +422,10 @@ public final class MissionControlStore: ObservableObject {
responseMarkerStatus.isActive
}

public var isComposerQueueingActive: Bool {
activeRunID != nil || responseMarkerStatus.isActive
}

public func sendDraft() {
let text = draft.trimmingCharacters(in: .whitespacesAndNewlines)
guard !text.isEmpty else { return }
Expand All @@ -432,6 +440,77 @@ public final class MissionControlStore: ObservableObject {
return
}
draft = ""
if isComposerQueueingActive {
enqueueComposerMessage(text)
return
}
sendTextAsNewRun(text)
}

public func steerQueuedComposerMessage(_ id: String) {
guard let item = queuedComposerMessage(id: id) else { return }
guard isComposerQueueingActive else {
removeQueuedComposerMessage(id)
sendTextAsNewRun(item.text)
return
}
updateQueuedComposerMessage(id, state: .steering)
sendGatewayRequest(
method: "chat.send",
params: [
"sessionKey": sessionKey,
"message": item.text,
"deliver": false,
"idempotencyKey": "queued-\(id)",
]
) { [weak self] ok, _ in
guard let self else { return }
if ok {
self.removeQueuedComposerMessage(id)
} else {
self.updateQueuedComposerMessage(id, state: .failed)
}
}
}

public func editQueuedComposerMessage(_ id: String) {
guard let item = queuedComposerMessage(id: id) else { return }
updateQueuedComposerMessage(id, state: .editing)
draft = item.text
removeQueuedComposerMessage(id)
}

public func sendQueuedComposerMessageAsFollowUp(_ id: String) {
guard let item = queuedComposerMessage(id: id) else { return }
if isComposerQueueingActive {
deferredQueuedMessageIDs.insert(id)
updateQueuedComposerMessage(id, state: .followUp)
} else {
removeQueuedComposerMessage(id)
sendTextAsNewRun(item.text)
}
}

public func deleteQueuedComposerMessage(_ id: String) {
removeQueuedComposerMessage(id)
}

public func moveQueuedComposerMessage(_ id: String, direction: QueuedComposerMoveDirection) {
var items = queuedComposerMessagesBySessionKey[sessionKey] ?? []
guard let index = items.firstIndex(where: { $0.id == id }) else { return }
let targetIndex: Int
switch direction {
case .up:
targetIndex = max(items.startIndex, index - 1)
case .down:
targetIndex = min(items.index(before: items.endIndex), index + 1)
}
guard targetIndex != index else { return }
items.swapAt(index, targetIndex)
setQueuedComposerMessages(items)
}

private func sendTextAsNewRun(_ text: String) {
if isCodeSessionBrowserPresented {
startFreshHalosSessionForComposer(initialText: text)
}
Expand Down Expand Up @@ -479,6 +558,74 @@ public final class MissionControlStore: ObservableObject {
}
}

private func enqueueComposerMessage(_ text: String) {
updateCurrentSessionTitle(with: text)
let nextOrder = ((queuedComposerMessagesBySessionKey[sessionKey] ?? []).map(\.order).max() ?? -1) + 1
let item = QueuedComposerMessage(
id: UUID().uuidString,
text: text,
createdAt: Date(),
order: nextOrder,
state: .queued
)
var items = queuedComposerMessagesBySessionKey[sessionKey] ?? []
items.append(item)
setQueuedComposerMessages(items)
}

private func queuedComposerMessage(id: String) -> QueuedComposerMessage? {
queuedComposerMessagesBySessionKey[sessionKey]?.first { $0.id == id }
}

private func updateQueuedComposerMessage(_ id: String, state: QueuedComposerMessageState) {
var items = queuedComposerMessagesBySessionKey[sessionKey] ?? []
guard let index = items.firstIndex(where: { $0.id == id }) else { return }
let item = items[index]
items[index] = QueuedComposerMessage(
id: item.id,
text: item.text,
createdAt: item.createdAt,
order: item.order,
state: state
)
setQueuedComposerMessages(items)
}

private func removeQueuedComposerMessage(_ id: String) {
deferredQueuedMessageIDs.remove(id)
var items = queuedComposerMessagesBySessionKey[sessionKey] ?? []
items.removeAll { $0.id == id }
setQueuedComposerMessages(items)
}

private func clearQueuedComposerMessages() {
deferredQueuedMessageIDs.subtract(queuedComposerMessages.map(\.id))
queuedComposerMessagesBySessionKey[sessionKey] = []
queuedComposerMessages = []
}

private func setQueuedComposerMessages(_ items: [QueuedComposerMessage]) {
let normalized = items.enumerated().map { index, item in
QueuedComposerMessage(
id: item.id,
text: item.text,
createdAt: item.createdAt,
order: index,
state: item.state
)
}
queuedComposerMessagesBySessionKey[sessionKey] = normalized
queuedComposerMessages = normalized
}

private func drainDeferredQueuedMessageIfPossible() {
guard !isComposerQueueingActive else { return }
guard gatewayConnection == .connected || gatewayRequestHandlerForTesting != nil else { return }
guard let item = queuedComposerMessages.first(where: { deferredQueuedMessageIDs.contains($0.id) }) else { return }
removeQueuedComposerMessage(item.id)
sendTextAsNewRun(item.text)
}

public func requestStop() {
if lazuliLifecycle != .idle {
lazuliTask?.send(.string(#"{"type":"stop"}"#)) { _ in }
Expand Down Expand Up @@ -687,6 +834,7 @@ public final class MissionControlStore: ObservableObject {
localSessionTitleOverrides.removeValue(forKey: key)

if sessionKey == key {
clearQueuedComposerMessages()
messages = []
isCodeSessionBrowserPresented = true
if let nextSession = remainingSessions.first {
Expand All @@ -703,6 +851,7 @@ public final class MissionControlStore: ObservableObject {
let suffix = UUID().uuidString.split(separator: "-").first.map(String.init) ?? "\(Int(Date().timeIntervalSince1970))"
let key = "\(Self.halosSessionKey):\(suffix.lowercased())"
let label = "Halos \(DateFormatter.localizedString(from: Date(), dateStyle: .none, timeStyle: .short))"
clearQueuedComposerMessages()
resetTranscriptState(for: key)
sendGatewayRequest(
method: "sessions.create",
Expand All @@ -723,6 +872,7 @@ public final class MissionControlStore: ObservableObject {
let suffix = UUID().uuidString.split(separator: "-").first.map(String.init) ?? "\(Int(Date().timeIntervalSince1970))"
let key = "\(Self.halosSessionKey):\(suffix.lowercased())"
let preview = Self.sessionTitlePreview(from: initialText)
clearQueuedComposerMessages()
resetTranscriptState(for: key)
localSessionTitleOverrides[key] = LocalSessionTitleOverride(preview: preview, updatedAt: Date())
halosSessions = sortHalosSessions(applyLocalSessionTitleOverrides(to: [currentSessionSummary] + halosSessions))
Expand All @@ -737,6 +887,7 @@ public final class MissionControlStore: ObservableObject {
}

private func clearLocalTranscript(command: SlashCommand) {
clearQueuedComposerMessages()
messages = []
activeRunID = nil
assistantMessageIDsByRunID.removeAll()
Expand Down Expand Up @@ -774,6 +925,7 @@ public final class MissionControlStore: ObservableObject {

private func switchHalosSession(_ key: String, loadsHistory: Bool) {
guard isHalosSessionKey(key) else { return }
clearQueuedComposerMessages()
resetTranscriptState(for: key)
ensureHalosSession(loadsHistory: loadsHistory)
refreshHalosSessions()
Expand All @@ -794,6 +946,7 @@ public final class MissionControlStore: ObservableObject {
activeSlashCommandRunID = nil
slashCommandPanelState = nil
messages = []
queuedComposerMessages = queuedComposerMessagesBySessionKey[key] ?? []
finishResponseRun(runID: key)
}

Expand Down Expand Up @@ -826,6 +979,20 @@ public final class MissionControlStore: ObservableObject {
handleGatewayDisconnect()
}

func beginResponseRunForTesting(runID: String = "test-active-run") {
runStartedAtByRunID[runID] = Date()
activateResponseRun(runID: runID, resetTokens: true)
upsertRunStatus(runID: runID, title: thinkingWord(for: runID), body: "Processing your message.")
}

func finishResponseRunForTesting(runID: String = "test-active-run") {
finishResponseRun(runID: runID)
}

func setGatewayRequestHandlerForTesting(_ handler: ((String, [String: Any], @escaping (Bool, Any?) -> Void) -> Void)?) {
gatewayRequestHandlerForTesting = handler
}

private func loadAutomations() {
guard
let data = try? Data(contentsOf: cronJobsURL),
Expand Down Expand Up @@ -1790,6 +1957,10 @@ public final class MissionControlStore: ObservableObject {
idPrefix: String = "req",
completion: @escaping (Bool, Any?) -> Void
) {
if let gatewayRequestHandlerForTesting {
gatewayRequestHandlerForTesting(method, params, completion)
return
}
guard let gatewayTask else {
completion(false, nil)
return
Expand Down Expand Up @@ -2562,6 +2733,7 @@ public final class MissionControlStore: ObservableObject {
}
responseStatusTask?.cancel()
responseStatusTask = nil
drainDeferredQueuedMessageIfPossible()
}

private func markResponseTokensVisible(runID: String) {
Expand Down
Loading
Loading