Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions backend/internal/cli/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ func newImportCommand(ctx *commandContext) *cobra.Command {
var opts importOptions
cmd := &cobra.Command{
Use: "import",
Short: "Import projects and orchestrator sessions from a legacy AO install",
Short: "Import projects from a legacy AO install",
Long: "Import reads the legacy Agent Orchestrator flat-file store " +
"(~/.agent-orchestrator) read-only and ports its projects, per-project " +
"settings, and each project's live orchestrator session into the rewrite " +
"database. Legacy files are never modified, and a re-run skips rows that " +
"already exist, so it is safe to run more than once.\n\n" +
"(~/.agent-orchestrator) read-only and ports its projects and per-project " +
"settings into the rewrite database. Legacy files are never modified, and " +
"a re-run skips rows that already exist, so it is safe to run more than " +
"once.\n\n" +
"The daemon must be stopped: it is the sole writer of the database.",
Args: noArgs,
RunE: func(cmd *cobra.Command, args []string) error {
Expand Down Expand Up @@ -71,7 +71,7 @@ func (c *commandContext) runImport(cmd *cobra.Command, opts importOptions) error

if !opts.dryRun && !opts.yes {
ok, err := confirm(c.deps.In, cmd.OutOrStdout(),
fmt.Sprintf("Import projects and orchestrator sessions from %s?", root), true)
fmt.Sprintf("Import projects from %s?", root), true)
if err != nil {
return err
}
Expand All @@ -82,9 +82,8 @@ func (c *commandContext) runImport(cmd *cobra.Command, opts importOptions) error
}

rep, err := c.executeImport(cmd.Context(), cfg, legacyimport.Options{
Root: root,
DataDir: cfg.DataDir,
DryRun: opts.dryRun,
Root: root,
DryRun: opts.dryRun,
})
if err != nil {
return err
Expand Down Expand Up @@ -115,8 +114,6 @@ func writeImportSummary(w io.Writer, rep legacyimport.Report) error {
b.WriteString("Dry run — no changes written.\n")
}
fmt.Fprintf(&b, "Projects: %d imported, %d already present\n", rep.ProjectsImported, rep.ProjectsSkipped)
fmt.Fprintf(&b, "Orchestrators: %d imported, %d skipped, %d absent\n", rep.OrchestratorsImported, rep.OrchestratorsSkipped, rep.OrchestratorsAbsent)
fmt.Fprintf(&b, "Transcripts: %d relocated\n", rep.TranscriptsRelocated)
if len(rep.Notes) > 0 {
b.WriteString("\nNotes:\n")
for _, n := range rep.Notes {
Expand Down
54 changes: 4 additions & 50 deletions backend/internal/cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ import (
"github.com/spf13/cobra"

"github.com/aoagents/agent-orchestrator/backend/internal/config"
"github.com/aoagents/agent-orchestrator/backend/internal/legacyimport"
"github.com/aoagents/agent-orchestrator/backend/internal/runfile"
"github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite"
)

const defaultStartTimeout = 10 * time.Second
Expand Down Expand Up @@ -77,11 +75,10 @@ func (c *commandContext) startDaemon(ctx context.Context, opts startOptions) (da
}
}

// First-boot opt-in: before launching the daemon (so the import runs with the
// store unlocked and the daemon as sole writer afterwards), offer to import a
// legacy AO install. Declining or any import failure is non-fatal — the
// daemon still starts and the user can run `ao import` later.
c.maybeFirstBootImport(ctx, cfg)
// `ao start` is headless: it only launches the daemon. Detecting a legacy AO
// install and offering to import it is the dashboard's job; it polls
// GET /api/v1/import and POSTs to run the import through the live daemon. The
// CLI never prompts here; `ao import` remains for explicit offline imports.

exe, err := c.deps.Executable()
if err != nil {
Expand Down Expand Up @@ -118,49 +115,6 @@ func (c *commandContext) startDaemon(ctx context.Context, opts startOptions) (da
return ready, nil
}

// maybeFirstBootImport offers to import a legacy AO install the first time the
// daemon is started against an empty rewrite database. It is best-effort: every
// failure path degrades to "start the daemon fresh" so a broken or absent legacy
// store can never block startup. A non-interactive boot (Electron/headless)
// never auto-imports; it prints a one-line hint to run `ao import` explicitly.
func (c *commandContext) maybeFirstBootImport(ctx context.Context, cfg config.Config) {
root := legacyimport.DefaultLegacyRootDir()
if !legacyimport.HasLegacyData(root) {
return
}

store, err := sqlite.Open(cfg.DataDir)
if err != nil {
return // the daemon will surface a real store error on its own open
}
defer func() { _ = store.Close() }()

projects, err := store.ListProjects(ctx)
if err != nil || len(projects) > 0 {
// Already imported (or populated) — don't offer again.
return
}

out := c.deps.Out
if !stdinIsInteractive(c.deps.In) {
_, _ = fmt.Fprintf(out, "Found existing AO projects at %s. Run `ao import` to bring them in.\n", root)
return
}

ok, err := confirm(c.deps.In, out, "Found existing AO projects and sessions. Import them now?", true)
if err != nil || !ok {
_, _ = fmt.Fprintln(out, "Continuing fresh. Run `ao import` later to bring in your existing data.")
return
}

rep, err := legacyimport.Run(ctx, store, legacyimport.Options{Root: root, DataDir: cfg.DataDir})
if err != nil {
_, _ = fmt.Fprintf(out, "Import failed: %v\nContinuing fresh; legacy data is untouched. Retry with `ao import`.\n", err)
return
}
_ = writeImportSummary(out, rep)
}

func (c *commandContext) waitForReady(ctx context.Context, timeout time.Duration) (daemonStatus, error) {
if timeout <= 0 {
timeout = defaultStartTimeout
Expand Down
2 changes: 2 additions & 0 deletions backend/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/aoagents/agent-orchestrator/backend/internal/notify"
"github.com/aoagents/agent-orchestrator/backend/internal/ports"
"github.com/aoagents/agent-orchestrator/backend/internal/runfile"
importsvc "github.com/aoagents/agent-orchestrator/backend/internal/service/importer"
notificationsvc "github.com/aoagents/agent-orchestrator/backend/internal/service/notification"
projectsvc "github.com/aoagents/agent-orchestrator/backend/internal/service/project"
"github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite"
Expand Down Expand Up @@ -134,6 +135,7 @@ func Run() error {
Reviews: reviewSvc,
Notifications: notifier,
NotificationStream: notificationHub,
Import: importsvc.New(importsvc.Deps{Store: store}),
CDC: store,
Events: cdcPipe.Broadcaster,
Activity: lcStack.LCM,
Expand Down
4 changes: 4 additions & 0 deletions backend/internal/httpd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type APIDeps struct {
Reviews reviewsvc.Manager
Notifications controllers.NotificationService
NotificationStream controllers.NotificationStream
Import controllers.ImportService
CDC cdc.Source
Events cdcSubscriber
Telemetry ports.EventSink
Expand All @@ -40,6 +41,7 @@ type API struct {
prs *controllers.PRsController
reviews *controllers.ReviewsController
notifications *controllers.NotificationsController
imports *controllers.ImportController
events *EventsController
}

Expand All @@ -59,6 +61,7 @@ func NewAPI(cfg config.Config, deps APIDeps) *API {
prs: &controllers.PRsController{Svc: deps.PRs},
reviews: &controllers.ReviewsController{Svc: deps.Reviews},
notifications: &controllers.NotificationsController{Svc: deps.Notifications, Stream: deps.NotificationStream},
imports: &controllers.ImportController{Svc: deps.Import},
events: &EventsController{Source: deps.CDC, Live: deps.Events},
}
}
Expand All @@ -82,6 +85,7 @@ func (a *API) Register(root chi.Router) {
a.prs.Register(r)
a.reviews.Register(r)
a.notifications.Register(r)
a.imports.Register(r)
// Sibling REST controllers plug in here.
})
// Long-lived streams intentionally bypass the REST timeout middleware.
Expand Down
85 changes: 85 additions & 0 deletions backend/internal/httpd/apispec/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,55 @@ paths:
summary: Stream CDC events with durable replay
tags:
- events
/api/v1/import:
get:
operationId: importStatus
responses:
"200":
content:
application/json:
schema:
$ref: '#/components/schemas/ImportStatusResponse'
description: OK
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/APIError'
description: Internal Server Error
"501":
content:
application/json:
schema:
$ref: '#/components/schemas/APIError'
description: Not Implemented
summary: Report whether a legacy AO install is available to import
tags:
- import
post:
operationId: runImport
responses:
"200":
content:
application/json:
schema:
$ref: '#/components/schemas/ImportRunResponse'
description: OK
"500":
content:
application/json:
schema:
$ref: '#/components/schemas/APIError'
description: Internal Server Error
"501":
content:
application/json:
schema:
$ref: '#/components/schemas/APIError'
description: Not Implemented
summary: Import projects and the orchestrator from a legacy AO install
tags:
- import
/api/v1/notifications:
get:
operationId: listNotifications
Expand Down Expand Up @@ -1337,6 +1386,40 @@ components:
required:
- harness
type: object
ImportReport:
properties:
dryRun:
type: boolean
notes:
items:
type: string
type: array
projectsImported:
type: integer
projectsSkipped:
type: integer
required:
- dryRun
- projectsImported
- projectsSkipped
type: object
ImportRunResponse:
properties:
report:
$ref: '#/components/schemas/ImportReport'
required:
- report
type: object
ImportStatusResponse:
properties:
available:
type: boolean
legacyRoot:
type: string
required:
- available
- legacyRoot
type: object
KillSessionResponse:
properties:
freed:
Expand Down Expand Up @@ -1907,3 +1990,5 @@ tags:
name: notifications
- description: Server-sent CDC event stream with durable replay
name: events
- description: 'Legacy AO import: detection and trigger'
name: import
33 changes: 33 additions & 0 deletions backend/internal/httpd/apispec/specgen/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ func Build() ([]byte, error) {
"Durable dashboard notifications"),
*(&openapi31.Tag{Name: "events"}).WithDescription(
"Server-sent CDC event stream with durable replay"),
*(&openapi31.Tag{Name: "import"}).WithDescription(
"Legacy AO import: detection and trigger"),
}

for _, op := range operations() {
Expand Down Expand Up @@ -162,6 +164,11 @@ var schemaNames = map[string]string{
"ControllersNotificationTarget": "NotificationTarget",
"ControllersNotificationResponse": "NotificationResponse",
"ControllersListNotificationsResponse": "ListNotificationsResponse",
// httpd/controllers (import wire envelopes)
"ControllersImportStatusResponse": "ImportStatusResponse",
"ControllersImportRunResponse": "ImportRunResponse",
// internal/legacyimport
"LegacyimportReport": "ImportReport",
// httpd/controllers — PR wire envelopes
"ControllersMergePRResponse": "MergePRResponse",
"ControllersResolveCommentsRequest": "ResolveCommentsRequest",
Expand Down Expand Up @@ -259,9 +266,35 @@ func operations() []operation {
ops = append(ops, prOperations()...)
ops = append(ops, reviewOperations()...)
ops = append(ops, notificationOperations()...)
ops = append(ops, importOperations()...)
return ops
}

// importOperations declares the legacy-import operations. The set must stay
// 1:1 with the routes ImportController.Register mounts (TestRouteSpecParity).
func importOperations() []operation {
return []operation{
{
method: http.MethodGet, path: "/api/v1/import", id: "importStatus", tag: "import",
summary: "Report whether a legacy AO install is available to import",
resps: []respUnit{
{http.StatusOK, controllers.ImportStatusResponse{}},
{http.StatusInternalServerError, envelope.APIError{}},
{http.StatusNotImplemented, envelope.APIError{}},
},
},
{
method: http.MethodPost, path: "/api/v1/import", id: "runImport", tag: "import",
summary: "Import projects and the orchestrator from a legacy AO install",
resps: []respUnit{
{http.StatusOK, controllers.ImportRunResponse{}},
{http.StatusInternalServerError, envelope.APIError{}},
{http.StatusNotImplemented, envelope.APIError{}},
},
},
}
}

func notificationOperations() []operation {
return []operation{
{
Expand Down
14 changes: 14 additions & 0 deletions backend/internal/httpd/controllers/dto.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,23 @@ import (
"time"

"github.com/aoagents/agent-orchestrator/backend/internal/domain"
"github.com/aoagents/agent-orchestrator/backend/internal/legacyimport"
projectsvc "github.com/aoagents/agent-orchestrator/backend/internal/service/project"
)

// ImportStatusResponse is the body of GET /api/v1/import: whether a legacy AO
// install is available to import, and the root the daemon would read from.
type ImportStatusResponse struct {
Available bool `json:"available"`
LegacyRoot string `json:"legacyRoot"`
}

// ImportRunResponse is the body of POST /api/v1/import: the structured outcome
// of the import run (counts + notes), reused verbatim from the import engine.
type ImportRunResponse struct {
Report legacyimport.Report `json:"report"`
}

// HTTP response envelopes for the projects surface — the SINGLE definition of
// each wire shape. The handlers encode these (envelope.WriteJSON), and
// apispec.Build reflects these same types into openapi.yaml, so the served
Expand Down
Loading