diff --git a/.mockery.yml b/.mockery.yml index c1fe4a6..4e084af 100644 --- a/.mockery.yml +++ b/.mockery.yml @@ -38,3 +38,11 @@ packages: config: all: true interfaces: + github.com/codesphere-cloud/cs-go/pkg/deploy: + config: + all: true + interfaces: + github.com/codesphere-cloud/cs-go/pkg/pipeline: + config: + all: true + interfaces: diff --git a/api/workspace.go b/api/workspace.go index 2ec8ef1..530fe46 100644 --- a/api/workspace.go +++ b/api/workspace.go @@ -4,7 +4,14 @@ package api import ( + "bufio" + "context" + "encoding/json" "fmt" + "io" + "log" + "net/http" + "strings" "github.com/codesphere-cloud/cs-go/api/errors" "github.com/codesphere-cloud/cs-go/api/openapi_client" @@ -210,3 +217,97 @@ func (c Client) GitPull(workspaceId int, remote string, branch string) error { _, err := req.Execute() return errors.FormatAPIError(err) } + +// logEntry represents a single log line from the SSE stream. +type logEntry struct { + Timestamp string `json:"timestamp"` + Kind string `json:"kind"` + Data string `json:"data"` +} + +// StreamLogs connects to the Codesphere SSE log endpoint and writes parsed +// log entries to the provided writer until the context is cancelled or the +// stream ends. This is used during pipeline execution to provide real-time +// log output. +func (c *Client) StreamLogs(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer) error { + endpoint := fmt.Sprintf("%s/workspaces/%d/logs/%s/%d", apiUrl, wsId, stage, step) + + req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil) + if err != nil { + return fmt.Errorf("failed to construct log stream request: %w", err) + } + + req.Header.Set("Accept", "text/event-stream") + + // Set auth from the client's context token + if token, ok := ctx.Value(openapi_client.ContextAccessToken).(string); ok && token != "" { + req.Header.Set("Authorization", "Bearer "+token) + } else if token, ok := c.ctx.Value(openapi_client.ContextAccessToken).(string); ok && token != "" { + req.Header.Set("Authorization", "Bearer "+token) + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + // Context cancellation is expected when the stage finishes + if ctx.Err() != nil { + return nil + } + return fmt.Errorf("failed to connect to log stream: %w", err) + } + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("log stream responded with status %d", resp.StatusCode) + } + + reader := bufio.NewReader(resp.Body) + + for { + // Check if context is done + select { + case <-ctx.Done(): + return nil + default: + } + + // Parse one SSE event + var eventData string + for { + line, err := reader.ReadString('\n') + if err != nil { + if ctx.Err() != nil || err == io.EOF { + return nil + } + return fmt.Errorf("failed to read log stream: %w", err) + } + + line = strings.TrimSpace(line) + + if strings.HasPrefix(line, "data:") { + data := strings.TrimSpace(strings.TrimPrefix(line, "data:")) + if eventData != "" { + eventData += "\n" + data + } else { + eventData = data + } + } else if line == "" && eventData != "" { + // Empty line marks end of SSE event + break + } + } + + // Parse and print log entries + var entries []logEntry + if err := json.Unmarshal([]byte(eventData), &entries); err != nil { + // Skip unparseable events (e.g. error responses) + log.Printf("⚠ log stream: %s", eventData) + eventData = "" + continue + } + + for _, entry := range entries { + _, _ = fmt.Fprintf(w, "%s | %s\n", entry.Timestamp, entry.Data) + } + eventData = "" + } +} diff --git a/cli/cmd/client.go b/cli/cmd/client.go index 4c33402..ead6c47 100644 --- a/cli/cmd/client.go +++ b/cli/cmd/client.go @@ -32,6 +32,7 @@ type Client interface { GetPipelineState(wsId int, stage string) ([]api.PipelineStatus, error) GitPull(wsId int, remote string, branch string) error DeployLandscape(wsId int, profile string) error + StreamLogs(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer) error } // CommandExecutor abstracts command execution for testing diff --git a/cli/cmd/mocks.go b/cli/cmd/mocks.go index 5a6f9cf..5ddd4ba 100644 --- a/cli/cmd/mocks.go +++ b/cli/cmd/mocks.go @@ -888,6 +888,87 @@ func (_c *MockClient_StartPipelineStage_Call) RunAndReturn(run func(wsId int, pr return _c } +// StreamLogs provides a mock function for the type MockClient +func (_mock *MockClient) StreamLogs(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer) error { + ret := _mock.Called(ctx, apiUrl, wsId, stage, step, w) + + if len(ret) == 0 { + panic("no return value specified for StreamLogs") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func(context.Context, string, int, string, int, io.Writer) error); ok { + r0 = returnFunc(ctx, apiUrl, wsId, stage, step, w) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// MockClient_StreamLogs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StreamLogs' +type MockClient_StreamLogs_Call struct { + *mock.Call +} + +// StreamLogs is a helper method to define mock.On call +// - ctx context.Context +// - apiUrl string +// - wsId int +// - stage string +// - step int +// - w io.Writer +func (_e *MockClient_Expecter) StreamLogs(ctx interface{}, apiUrl interface{}, wsId interface{}, stage interface{}, step interface{}, w interface{}) *MockClient_StreamLogs_Call { + return &MockClient_StreamLogs_Call{Call: _e.mock.On("StreamLogs", ctx, apiUrl, wsId, stage, step, w)} +} + +func (_c *MockClient_StreamLogs_Call) Run(run func(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer)) *MockClient_StreamLogs_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 string + if args[1] != nil { + arg1 = args[1].(string) + } + var arg2 int + if args[2] != nil { + arg2 = args[2].(int) + } + var arg3 string + if args[3] != nil { + arg3 = args[3].(string) + } + var arg4 int + if args[4] != nil { + arg4 = args[4].(int) + } + var arg5 io.Writer + if args[5] != nil { + arg5 = args[5].(io.Writer) + } + run( + arg0, + arg1, + arg2, + arg3, + arg4, + arg5, + ) + }) + return _c +} + +func (_c *MockClient_StreamLogs_Call) Return(err error) *MockClient_StreamLogs_Call { + _c.Call.Return(err) + return _c +} + +func (_c *MockClient_StreamLogs_Call) RunAndReturn(run func(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer) error) *MockClient_StreamLogs_Call { + _c.Call.Return(run) + return _c +} + // WaitForWorkspaceRunning provides a mock function for the type MockClient func (_mock *MockClient) WaitForWorkspaceRunning(workspace *api.Workspace, timeout time.Duration) error { ret := _mock.Called(workspace, timeout) diff --git a/cli/cmd/start_pipeline.go b/cli/cmd/start_pipeline.go index 3807854..9d339e9 100644 --- a/cli/cmd/start_pipeline.go +++ b/cli/cmd/start_pipeline.go @@ -5,12 +5,11 @@ package cmd import ( "fmt" - "log" - "slices" "time" "github.com/codesphere-cloud/cs-go/api" "github.com/codesphere-cloud/cs-go/pkg/io" + "github.com/codesphere-cloud/cs-go/pkg/pipeline" "github.com/spf13/cobra" ) @@ -27,8 +26,6 @@ type StartPipelineOpts struct { Timeout *time.Duration } -const IdeServer string = "codesphere-ide" - func (c *StartPipelineCmd) RunE(_ *cobra.Command, args []string) error { workspaceId, err := c.Opts.GetWorkspaceId() @@ -41,11 +38,11 @@ func (c *StartPipelineCmd) RunE(_ *cobra.Command, args []string) error { return fmt.Errorf("failed to create Codesphere client: %w", err) } - return c.StartPipelineStages(client, workspaceId, args) + return c.StartPipelineStages(client, workspaceId, args, c.Opts.GetApiUrl()) } func AddStartPipelineCmd(start *cobra.Command, opts GlobalOptions) { - pipeline := StartPipelineCmd{ + p := StartPipelineCmd{ cmd: &cobra.Command{ Use: "pipeline", Short: "Start pipeline stages of a workspace", @@ -72,116 +69,22 @@ func AddStartPipelineCmd(start *cobra.Command, opts GlobalOptions) { Time: &api.RealTime{}, } - pipeline.Opts.Timeout = pipeline.cmd.Flags().Duration("timeout", 30*time.Minute, "Time to wait per stage before stopping the command execution (e.g. 10m)") - pipeline.Opts.Profile = pipeline.cmd.Flags().StringP("profile", "p", "", "CI profile to use (e.g. 'prod' for the profile defined in 'ci.prod.yml'), defaults to the ci.yml profile") - start.AddCommand(pipeline.cmd) - - pipeline.cmd.RunE = pipeline.RunE -} - -func (c *StartPipelineCmd) StartPipelineStages(client Client, wsId int, stages []string) error { - for _, stage := range stages { - if !isValidStage(stage) { - return fmt.Errorf("invalid pipeline stage: %s", stage) - } - } - for _, stage := range stages { - err := c.startStage(client, wsId, stage) - if err != nil { - return err - } - } - return nil -} - -func isValidStage(stage string) bool { - return slices.Contains([]string{"prepare", "test", "run"}, stage) -} - -func (c *StartPipelineCmd) startStage(client Client, wsId int, stage string) error { - log.Printf("starting %s stage on workspace %d...", stage, wsId) - - err := client.StartPipelineStage(wsId, *c.Opts.Profile, stage) - if err != nil { - log.Println() - return fmt.Errorf("failed to start pipeline stage %s: %w", stage, err) - } - - err = c.waitForPipelineStage(client, wsId, stage) - if err != nil { - return fmt.Errorf("failed waiting for stage %s to finish: %w", stage, err) + p.Opts.Timeout = p.cmd.Flags().Duration("timeout", 30*time.Minute, "Time to wait per stage before stopping the command execution (e.g. 10m)") + p.Opts.Profile = p.cmd.Flags().StringP("profile", "p", "", "CI profile to use (e.g. 'prod' for the profile defined in 'ci.prod.yml'), defaults to the ci.yml profile") + start.AddCommand(p.cmd) - } - return nil -} - -func (c *StartPipelineCmd) waitForPipelineStage(client Client, wsId int, stage string) error { - delay := 5 * time.Second - - maxWaitTime := c.Time.Now().Add(*c.Opts.Timeout) - for { - status, err := client.GetPipelineState(wsId, stage) - if err != nil { - log.Printf("\nError getting pipeline status: %s, trying again...", err.Error()) - c.Time.Sleep(delay) - continue - } - - if c.allFinished(status) { - log.Println("(finished)") - break - } - - if allRunning(status) && stage == "run" { - log.Println("(running)") - break - } - - err = shouldAbort(status) - if err != nil { - log.Println("(failed)") - return fmt.Errorf("stage %s failed: %w", stage, err) - } - - log.Print(".") - if c.Time.Now().After(maxWaitTime) { - log.Println() - return fmt.Errorf("timed out waiting for pipeline stage %s to be complete", stage) - } - c.Time.Sleep(delay) - } - return nil -} - -func allRunning(status []api.PipelineStatus) bool { - for _, s := range status { - // Run stage is only running customer servers, ignore IDE server - if s.Server != IdeServer && s.State != "running" { - return false - } - } - return true -} - -func (c *StartPipelineCmd) allFinished(status []api.PipelineStatus) bool { - io.Verboseln(*c.Opts.Verbose, "====") - for _, s := range status { - io.Verbosef(*c.Opts.Verbose, "Server: %s, State: %s, Replica: %s\n", s.Server, s.State, s.Replica) - } - for _, s := range status { - // Prepare and Test stage is only running in the IDE server, ignore customer servers - if s.Server == IdeServer && s.State != "success" { - return false - } - } - return true + p.cmd.RunE = p.RunE } -func shouldAbort(status []api.PipelineStatus) error { - for _, s := range status { - if slices.Contains([]string{"failure", "aborted"}, s.State) { - return fmt.Errorf("server %s, replica %s reached unexpected state %s", s.Server, s.Replica, s.State) - } +func (c *StartPipelineCmd) StartPipelineStages(client Client, wsId int, stages []string, apiUrl ...string) error { + url := "" + if len(apiUrl) > 0 { + url = apiUrl[0] } - return nil + runner := pipeline.NewRunner(client, c.Time) + return runner.RunStages(wsId, stages, pipeline.Config{ + Profile: *c.Opts.Profile, + Timeout: *c.Opts.Timeout, + ApiUrl: url, + }) } diff --git a/cli/cmd/start_pipeline_test.go b/cli/cmd/start_pipeline_test.go index c1f664d..c190f68 100644 --- a/cli/cmd/start_pipeline_test.go +++ b/cli/cmd/start_pipeline_test.go @@ -95,7 +95,8 @@ var _ = Describe("StartPipeline", func() { testStartCall := mockClient.EXPECT().StartPipelineStage(wsId, profile, stages[1]).Return(nil).NotBefore(prepareStatusCall) testStatusCall := mockClient.EXPECT().GetPipelineState(wsId, stages[1]).Return(reportedStatusSuccess, nil).NotBefore(testStartCall) - runStartCall := mockClient.EXPECT().StartPipelineStage(wsId, profile, stages[2]).Return(nil).NotBefore(testStatusCall) + syncCall := mockClient.EXPECT().DeployLandscape(wsId, profile).Return(nil).NotBefore(testStatusCall) + runStartCall := mockClient.EXPECT().StartPipelineStage(wsId, profile, stages[2]).Return(nil).NotBefore(syncCall) mockClient.EXPECT().GetPipelineState(wsId, stages[2]).Return(reportedStatusRunning, nil).NotBefore(runStartCall) }) @@ -125,7 +126,8 @@ var _ = Describe("StartPipeline", func() { testStatusCall := mockClient.EXPECT().GetPipelineState(wsId, stages[1]).Return(reportedStatusRunning, nil).Times(2).NotBefore(testStartCall) testStatusCallSuccess := mockClient.EXPECT().GetPipelineState(wsId, stages[1]).Return(reportedStatusSuccess, nil).NotBefore(testStatusCall) - runStartCall := mockClient.EXPECT().StartPipelineStage(wsId, profile, stages[2]).Return(nil).NotBefore(testStatusCallSuccess) + syncCall := mockClient.EXPECT().DeployLandscape(wsId, profile).Return(nil).NotBefore(testStatusCallSuccess) + runStartCall := mockClient.EXPECT().StartPipelineStage(wsId, profile, stages[2]).Return(nil).NotBefore(syncCall) mockClient.EXPECT().GetPipelineState(wsId, stages[2]).Return(reportedStatusWaiting, nil).Times(2).NotBefore(runStartCall) mockClient.EXPECT().GetPipelineState(wsId, stages[2]).Return(reportedStatusRunning, nil).NotBefore(runStartCall) @@ -145,7 +147,7 @@ var _ = Describe("StartPipeline", func() { mockClient.EXPECT().GetPipelineState(wsId, stages[1]).Return(reportedStatusRunning, nil).Times(8).NotBefore(testStartCall) err := c.StartPipelineStages(mockClient, wsId, stages) - Expect(err).To(MatchError("failed waiting for stage test to finish: timed out waiting for pipeline stage test to be complete")) + Expect(err).To(MatchError("timed out waiting for pipeline stage test to be complete")) }) }) @@ -155,7 +157,7 @@ var _ = Describe("StartPipeline", func() { mockClient.EXPECT().GetPipelineState(wsId, stages[0]).Return(reportedStatusFailure, nil).NotBefore(prepareStartCall) err := c.StartPipelineStages(mockClient, wsId, stages) - Expect(err).To(MatchError("failed waiting for stage prepare to finish: stage prepare failed: server A, replica 0 reached unexpected state failure")) + Expect(err).To(MatchError("stage prepare failed: server A, replica 0 reached unexpected state failure")) }) }) }) diff --git a/pkg/pipeline/mocks.go b/pkg/pipeline/mocks.go new file mode 100644 index 0000000..dc0aeb6 --- /dev/null +++ b/pkg/pipeline/mocks.go @@ -0,0 +1,308 @@ +// Code generated by mockery; DO NOT EDIT. +// github.com/vektra/mockery +// template: testify + +package pipeline + +import ( + "context" + "github.com/codesphere-cloud/cs-go/api" + mock "github.com/stretchr/testify/mock" + "io" +) + +// NewMockClient creates a new instance of MockClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockClient(t interface { + mock.TestingT + Cleanup(func()) +}) *MockClient { + mock := &MockClient{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} + +// MockClient is an autogenerated mock type for the Client type +type MockClient struct { + mock.Mock +} + +type MockClient_Expecter struct { + mock *mock.Mock +} + +func (_m *MockClient) EXPECT() *MockClient_Expecter { + return &MockClient_Expecter{mock: &_m.Mock} +} + +// DeployLandscape provides a mock function for the type MockClient +func (_mock *MockClient) DeployLandscape(wsId int, profile string) error { + ret := _mock.Called(wsId, profile) + + if len(ret) == 0 { + panic("no return value specified for DeployLandscape") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func(int, string) error); ok { + r0 = returnFunc(wsId, profile) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// MockClient_DeployLandscape_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeployLandscape' +type MockClient_DeployLandscape_Call struct { + *mock.Call +} + +// DeployLandscape is a helper method to define mock.On call +// - wsId int +// - profile string +func (_e *MockClient_Expecter) DeployLandscape(wsId interface{}, profile interface{}) *MockClient_DeployLandscape_Call { + return &MockClient_DeployLandscape_Call{Call: _e.mock.On("DeployLandscape", wsId, profile)} +} + +func (_c *MockClient_DeployLandscape_Call) Run(run func(wsId int, profile string)) *MockClient_DeployLandscape_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 int + if args[0] != nil { + arg0 = args[0].(int) + } + var arg1 string + if args[1] != nil { + arg1 = args[1].(string) + } + run( + arg0, + arg1, + ) + }) + return _c +} + +func (_c *MockClient_DeployLandscape_Call) Return(err error) *MockClient_DeployLandscape_Call { + _c.Call.Return(err) + return _c +} + +func (_c *MockClient_DeployLandscape_Call) RunAndReturn(run func(wsId int, profile string) error) *MockClient_DeployLandscape_Call { + _c.Call.Return(run) + return _c +} + +// GetPipelineState provides a mock function for the type MockClient +func (_mock *MockClient) GetPipelineState(wsId int, stage string) ([]api.PipelineStatus, error) { + ret := _mock.Called(wsId, stage) + + if len(ret) == 0 { + panic("no return value specified for GetPipelineState") + } + + var r0 []api.PipelineStatus + var r1 error + if returnFunc, ok := ret.Get(0).(func(int, string) ([]api.PipelineStatus, error)); ok { + return returnFunc(wsId, stage) + } + if returnFunc, ok := ret.Get(0).(func(int, string) []api.PipelineStatus); ok { + r0 = returnFunc(wsId, stage) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]api.PipelineStatus) + } + } + if returnFunc, ok := ret.Get(1).(func(int, string) error); ok { + r1 = returnFunc(wsId, stage) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockClient_GetPipelineState_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetPipelineState' +type MockClient_GetPipelineState_Call struct { + *mock.Call +} + +// GetPipelineState is a helper method to define mock.On call +// - wsId int +// - stage string +func (_e *MockClient_Expecter) GetPipelineState(wsId interface{}, stage interface{}) *MockClient_GetPipelineState_Call { + return &MockClient_GetPipelineState_Call{Call: _e.mock.On("GetPipelineState", wsId, stage)} +} + +func (_c *MockClient_GetPipelineState_Call) Run(run func(wsId int, stage string)) *MockClient_GetPipelineState_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 int + if args[0] != nil { + arg0 = args[0].(int) + } + var arg1 string + if args[1] != nil { + arg1 = args[1].(string) + } + run( + arg0, + arg1, + ) + }) + return _c +} + +func (_c *MockClient_GetPipelineState_Call) Return(vs []api.PipelineStatus, err error) *MockClient_GetPipelineState_Call { + _c.Call.Return(vs, err) + return _c +} + +func (_c *MockClient_GetPipelineState_Call) RunAndReturn(run func(wsId int, stage string) ([]api.PipelineStatus, error)) *MockClient_GetPipelineState_Call { + _c.Call.Return(run) + return _c +} + +// StartPipelineStage provides a mock function for the type MockClient +func (_mock *MockClient) StartPipelineStage(wsId int, profile string, stage string) error { + ret := _mock.Called(wsId, profile, stage) + + if len(ret) == 0 { + panic("no return value specified for StartPipelineStage") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func(int, string, string) error); ok { + r0 = returnFunc(wsId, profile, stage) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// MockClient_StartPipelineStage_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StartPipelineStage' +type MockClient_StartPipelineStage_Call struct { + *mock.Call +} + +// StartPipelineStage is a helper method to define mock.On call +// - wsId int +// - profile string +// - stage string +func (_e *MockClient_Expecter) StartPipelineStage(wsId interface{}, profile interface{}, stage interface{}) *MockClient_StartPipelineStage_Call { + return &MockClient_StartPipelineStage_Call{Call: _e.mock.On("StartPipelineStage", wsId, profile, stage)} +} + +func (_c *MockClient_StartPipelineStage_Call) Run(run func(wsId int, profile string, stage string)) *MockClient_StartPipelineStage_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 int + if args[0] != nil { + arg0 = args[0].(int) + } + var arg1 string + if args[1] != nil { + arg1 = args[1].(string) + } + var arg2 string + if args[2] != nil { + arg2 = args[2].(string) + } + run( + arg0, + arg1, + arg2, + ) + }) + return _c +} + +func (_c *MockClient_StartPipelineStage_Call) Return(err error) *MockClient_StartPipelineStage_Call { + _c.Call.Return(err) + return _c +} + +func (_c *MockClient_StartPipelineStage_Call) RunAndReturn(run func(wsId int, profile string, stage string) error) *MockClient_StartPipelineStage_Call { + _c.Call.Return(run) + return _c +} + +// StreamLogs provides a mock function for the type MockClient +func (_mock *MockClient) StreamLogs(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer) error { + ret := _mock.Called(ctx, apiUrl, wsId, stage, step, w) + + if len(ret) == 0 { + panic("no return value specified for StreamLogs") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func(context.Context, string, int, string, int, io.Writer) error); ok { + r0 = returnFunc(ctx, apiUrl, wsId, stage, step, w) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// MockClient_StreamLogs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StreamLogs' +type MockClient_StreamLogs_Call struct { + *mock.Call +} + +// StreamLogs is a helper method to define mock.On call +// - ctx context.Context +// - apiUrl string +// - wsId int +// - stage string +// - step int +// - w io.Writer +func (_e *MockClient_Expecter) StreamLogs(ctx interface{}, apiUrl interface{}, wsId interface{}, stage interface{}, step interface{}, w interface{}) *MockClient_StreamLogs_Call { + return &MockClient_StreamLogs_Call{Call: _e.mock.On("StreamLogs", ctx, apiUrl, wsId, stage, step, w)} +} + +func (_c *MockClient_StreamLogs_Call) Run(run func(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer)) *MockClient_StreamLogs_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 string + if args[1] != nil { + arg1 = args[1].(string) + } + var arg2 int + if args[2] != nil { + arg2 = args[2].(int) + } + var arg3 string + if args[3] != nil { + arg3 = args[3].(string) + } + var arg4 int + if args[4] != nil { + arg4 = args[4].(int) + } + var arg5 io.Writer + if args[5] != nil { + arg5 = args[5].(io.Writer) + } + run( + arg0, + arg1, + arg2, + arg3, + arg4, + arg5, + ) + }) + return _c +} + +func (_c *MockClient_StreamLogs_Call) Return(err error) *MockClient_StreamLogs_Call { + _c.Call.Return(err) + return _c +} + +func (_c *MockClient_StreamLogs_Call) RunAndReturn(run func(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer) error) *MockClient_StreamLogs_Call { + _c.Call.Return(run) + return _c +} diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go new file mode 100644 index 0000000..249ed2b --- /dev/null +++ b/pkg/pipeline/pipeline.go @@ -0,0 +1,233 @@ +// Copyright (c) Codesphere Inc. +// SPDX-License-Identifier: Apache-2.0 + +package pipeline + +//go:generate go tool mockery + +import ( + "context" + "fmt" + "io" + "log" + "os" + "slices" + "sync" + "time" + + "github.com/codesphere-cloud/cs-go/api" +) + +const IdeServer string = "codesphere-ide" + +// Client defines the API operations needed for pipeline execution. +type Client interface { + StartPipelineStage(wsId int, profile string, stage string) error + GetPipelineState(wsId int, stage string) ([]api.PipelineStatus, error) + DeployLandscape(wsId int, profile string) error + StreamLogs(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer) error +} + +// Config holds parameters for pipeline execution. +type Config struct { + Profile string + Timeout time.Duration + ApiUrl string +} + +// Runner orchestrates pipeline stage execution. +type Runner struct { + Client Client + Time api.Time +} + +// NewRunner creates a new pipeline runner with the given API client. +func NewRunner(client Client, clock api.Time) *Runner { + if clock == nil { + clock = &api.RealTime{} + } + return &Runner{Client: client, Time: clock} +} + +// RunStages runs pipeline stages sequentially: prepare and test are awaited, +// the run stage is preceded by a landscape sync and then fire-and-forget. +func (r *Runner) RunStages(wsId int, stages []string, cfg Config) error { + for _, stage := range stages { + if !IsValidStage(stage) { + return fmt.Errorf("invalid pipeline stage: %s", stage) + } + } + + for _, stage := range stages { + // Sync the landscape before the run stage + if stage == "run" { + fmt.Println(" 🔄 Syncing landscape...") + if err := r.Client.DeployLandscape(wsId, cfg.Profile); err != nil { + return fmt.Errorf("syncing landscape: %w", err) + } + fmt.Println(" ✅ Landscape synced.") + } + + if err := r.runStage(wsId, stage, cfg); err != nil { + return err + } + } + return nil +} + +func (r *Runner) runStage(wsId int, stage string, cfg Config) error { + log.Printf("starting %s stage on workspace %d...", stage, wsId) + + if err := r.Client.StartPipelineStage(wsId, cfg.Profile, stage); err != nil { + log.Println() + return fmt.Errorf("failed to start pipeline stage %s: %w", stage, err) + } + + // Step-aware log streaming for non-run stages. + // Each step gets its own context; when a new step is discovered the + // previous step's stream is cancelled and drained before moving on. + streamEnabled := stage != "run" && cfg.ApiUrl != "" + streamingStep := -1 + var stepCancel context.CancelFunc + var stepWg sync.WaitGroup + + // drainStream waits for the current stream to deliver logs, then cancels. + drainStream := func() { + if stepCancel == nil { + return + } + done := make(chan struct{}) + go func() { + stepWg.Wait() + close(done) + }() + select { + case <-done: + case <-time.After(3 * time.Second): + stepCancel() + stepWg.Wait() + } + } + + startStreamForStep := func(step int, totalSteps int) { + if !streamEnabled || step <= streamingStep { + return + } + + // Drain previous step before starting next + drainStream() + + streamingStep = step + fmt.Printf("\n 📋 Step %d/%d\n", step+1, totalSteps) + + ctx, cancel := context.WithCancel(context.Background()) + stepCancel = cancel + stepWg.Add(1) + go func() { + defer stepWg.Done() + if err := r.Client.StreamLogs(ctx, cfg.ApiUrl, wsId, stage, step, os.Stdout); err != nil { + _, _ = fmt.Fprintf(os.Stderr, "⚠ log stream error (step %d): %v\n", step, err) + } + }() + } + + err := r.waitForStageWithStepCallback(wsId, stage, cfg, startStreamForStep) + + // Drain final step's logs + drainStream() + + return err +} + +func (r *Runner) waitForStageWithStepCallback(wsId int, stage string, cfg Config, onStep func(step int, total int)) error { + delay := 5 * time.Second + timeout := cfg.Timeout + if timeout == 0 { + timeout = 30 * time.Minute + } + + maxWaitTime := r.Time.Now().Add(timeout) + for { + status, err := r.Client.GetPipelineState(wsId, stage) + if err != nil { + log.Printf("\nError getting pipeline status: %s, trying again...", err.Error()) + r.Time.Sleep(delay) + continue + } + + // Discover active step from IDE server's Steps array + if onStep != nil { + for _, s := range status { + if s.Server == IdeServer { + total := len(s.Steps) + for i, step := range s.Steps { + if step.State == "running" || step.State == "success" { + onStep(i, total) + } + } + break + } + } + } + + if AllFinished(status) { + log.Println("(finished)") + break + } + + if AllRunning(status) && stage == "run" { + log.Println("(running)") + break + } + + if err = ShouldAbort(status); err != nil { + log.Println("(failed)") + return fmt.Errorf("stage %s failed: %w", stage, err) + } + + log.Print(".") + if r.Time.Now().After(maxWaitTime) { + log.Println() + return fmt.Errorf("timed out waiting for pipeline stage %s to be complete", stage) + } + r.Time.Sleep(delay) + } + return nil +} + +// IsValidStage returns true if the given stage name is valid. +func IsValidStage(stage string) bool { + return slices.Contains([]string{"prepare", "test", "run"}, stage) +} + +// AllFinished returns true when all IDE server replicas have succeeded. +// Prepare and test stages only run in the IDE server; customer servers are ignored. +func AllFinished(status []api.PipelineStatus) bool { + for _, s := range status { + if s.Server == IdeServer && s.State != "success" { + return false + } + } + return true +} + +// AllRunning returns true when all customer server replicas are running. +// The IDE server is ignored since the run stage only applies to customer servers. +func AllRunning(status []api.PipelineStatus) bool { + for _, s := range status { + if s.Server != IdeServer && s.State != "running" { + return false + } + } + return true +} + +// ShouldAbort returns an error if any replica has reached a terminal failure state. +func ShouldAbort(status []api.PipelineStatus) error { + for _, s := range status { + if slices.Contains([]string{"failure", "aborted"}, s.State) { + return fmt.Errorf("server %s, replica %s reached unexpected state %s", s.Server, s.Replica, s.State) + } + } + return nil +} diff --git a/pkg/pipeline/pipeline_suite_test.go b/pkg/pipeline/pipeline_suite_test.go new file mode 100644 index 0000000..e353b48 --- /dev/null +++ b/pkg/pipeline/pipeline_suite_test.go @@ -0,0 +1,16 @@ +// Copyright (c) Codesphere Inc. +// SPDX-License-Identifier: Apache-2.0 + +package pipeline_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestPipeline(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Pipeline Suite") +} diff --git a/pkg/pipeline/pipeline_test.go b/pkg/pipeline/pipeline_test.go new file mode 100644 index 0000000..fc6e458 --- /dev/null +++ b/pkg/pipeline/pipeline_test.go @@ -0,0 +1,177 @@ +// Copyright (c) Codesphere Inc. +// SPDX-License-Identifier: Apache-2.0 + +package pipeline_test + +import ( + "context" + "fmt" + "io" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/stretchr/testify/mock" + + "github.com/codesphere-cloud/cs-go/api" + openapi "github.com/codesphere-cloud/cs-go/api/openapi_client" + "github.com/codesphere-cloud/cs-go/pkg/pipeline" +) + +func statusWithSteps(server string, state string, stepStates ...string) api.PipelineStatus { + steps := make([]openapi.WorkspacesPipelineStatus200ResponseInnerStepsInner, len(stepStates)) + for i, s := range stepStates { + steps[i] = openapi.WorkspacesPipelineStatus200ResponseInnerStepsInner{State: s} + } + return api.PipelineStatus{ + State: state, + Replica: "0", + Server: server, + Steps: steps, + } +} + +var _ = Describe("Runner", func() { + var ( + mockClient *pipeline.MockClient + mockTime *api.MockTime + runner *pipeline.Runner + wsId int + cfg pipeline.Config + ) + + BeforeEach(func() { + wsId = 42 + cfg = pipeline.Config{ + Profile: "", + Timeout: 30 * time.Second, + ApiUrl: "https://codesphere.com/api", + } + }) + + JustBeforeEach(func() { + mockClient = pipeline.NewMockClient(GinkgoT()) + mockTime = api.NewMockTime(GinkgoT()) + runner = pipeline.NewRunner(mockClient, mockTime) + + currentTime := time.Unix(1746190963, 0) + mockTime.EXPECT().Now().RunAndReturn(func() time.Time { + return currentTime + }).Maybe() + mockTime.EXPECT().Sleep(mock.Anything).Run(func(t time.Duration) { + currentTime = currentTime.Add(t) + }).Maybe() + }) + + Describe("log streaming during prepare stage", func() { + Context("with a single step", func() { + It("calls StreamLogs with step 0", func() { + mockClient.EXPECT().StartPipelineStage(wsId, cfg.Profile, "prepare").Return(nil) + + pollCount := 0 + mockClient.EXPECT().GetPipelineState(wsId, "prepare").RunAndReturn( + func(_ int, _ string) ([]api.PipelineStatus, error) { + pollCount++ + if pollCount == 1 { + return []api.PipelineStatus{ + statusWithSteps("codesphere-ide", "running", "running"), + }, nil + } + return []api.PipelineStatus{ + statusWithSteps("codesphere-ide", "success", "success"), + }, nil + }, + ) + + mockClient.EXPECT().StreamLogs( + mock.Anything, cfg.ApiUrl, wsId, "prepare", 0, mock.Anything, + ).Return(nil) + + err := runner.RunStages(wsId, []string{"prepare"}, cfg) + Expect(err).ToNot(HaveOccurred()) + }) + }) + + Context("with multiple steps", func() { + It("streams each step sequentially", func() { + mockClient.EXPECT().StartPipelineStage(wsId, cfg.Profile, "prepare").Return(nil) + + pollCount := 0 + mockClient.EXPECT().GetPipelineState(wsId, "prepare").RunAndReturn( + func(_ int, _ string) ([]api.PipelineStatus, error) { + pollCount++ + switch pollCount { + case 1: + return []api.PipelineStatus{ + statusWithSteps("codesphere-ide", "running", "running", "waiting"), + }, nil + case 2: + return []api.PipelineStatus{ + statusWithSteps("codesphere-ide", "running", "success", "running"), + }, nil + default: + return []api.PipelineStatus{ + statusWithSteps("codesphere-ide", "success", "success", "success"), + }, nil + } + }, + ) + + // Step 0 stream + step0Called := make(chan struct{}) + mockClient.EXPECT().StreamLogs( + mock.Anything, cfg.ApiUrl, wsId, "prepare", 0, mock.Anything, + ).RunAndReturn(func(_ context.Context, _ string, _ int, _ string, _ int, _ io.Writer) error { + close(step0Called) + return nil + }) + + // Step 1 stream — only called after step 0 + mockClient.EXPECT().StreamLogs( + mock.Anything, cfg.ApiUrl, wsId, "prepare", 1, mock.Anything, + ).RunAndReturn(func(_ context.Context, _ string, _ int, _ string, _ int, _ io.Writer) error { + select { + case <-step0Called: + // good — step 0 was called first + default: + return fmt.Errorf("step 1 started before step 0") + } + return nil + }) + + err := runner.RunStages(wsId, []string{"prepare"}, cfg) + Expect(err).ToNot(HaveOccurred()) + }) + }) + + Context("when ApiUrl is empty", func() { + It("does not call StreamLogs", func() { + cfg.ApiUrl = "" + + startCall := mockClient.EXPECT().StartPipelineStage(wsId, cfg.Profile, "prepare").Return(nil).Call + mockClient.EXPECT().GetPipelineState(wsId, "prepare").Return([]api.PipelineStatus{ + statusWithSteps("codesphere-ide", "success", "success"), + }, nil).NotBefore(startCall) + // StreamLogs should NOT be called — mockery will fail if it is + + err := runner.RunStages(wsId, []string{"prepare"}, cfg) + Expect(err).ToNot(HaveOccurred()) + }) + }) + + Context("for the run stage", func() { + It("does not stream logs", func() { + syncCall := mockClient.EXPECT().DeployLandscape(wsId, cfg.Profile).Return(nil).Call + startCall := mockClient.EXPECT().StartPipelineStage(wsId, cfg.Profile, "run").Return(nil).NotBefore(syncCall) + mockClient.EXPECT().GetPipelineState(wsId, "run").Return([]api.PipelineStatus{ + {State: "running", Replica: "0", Server: "A"}, + {State: "waiting", Replica: "0", Server: "codesphere-ide"}, + }, nil).NotBefore(startCall) + // StreamLogs should NOT be called + + err := runner.RunStages(wsId, []string{"run"}, cfg) + Expect(err).ToNot(HaveOccurred()) + }) + }) + }) +})