Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .mockery.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,11 @@ packages:
config:
all: true
interfaces:
github.com/codesphere-cloud/cs-go/pkg/deploy:
config:
all: true
interfaces:
github.com/codesphere-cloud/cs-go/pkg/pipeline:
config:
all: true
interfaces:
101 changes: 101 additions & 0 deletions api/workspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,14 @@
package api

import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"strings"

"github.com/codesphere-cloud/cs-go/api/errors"
"github.com/codesphere-cloud/cs-go/api/openapi_client"
Expand Down Expand Up @@ -210,3 +217,97 @@ func (c Client) GitPull(workspaceId int, remote string, branch string) error {
_, err := req.Execute()
return errors.FormatAPIError(err)
}

// logEntry represents a single log line from the SSE stream.
type logEntry struct {
Timestamp string `json:"timestamp"`
Kind string `json:"kind"`
Data string `json:"data"`
}

// StreamLogs connects to the Codesphere SSE log endpoint and writes parsed
// log entries to the provided writer until the context is cancelled or the
// stream ends. This is used during pipeline execution to provide real-time
// log output.
func (c *Client) StreamLogs(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer) error {
endpoint := fmt.Sprintf("%s/workspaces/%d/logs/%s/%d", apiUrl, wsId, stage, step)

req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil)
if err != nil {
return fmt.Errorf("failed to construct log stream request: %w", err)
}

req.Header.Set("Accept", "text/event-stream")

// Set auth from the client's context token
if token, ok := ctx.Value(openapi_client.ContextAccessToken).(string); ok && token != "" {
req.Header.Set("Authorization", "Bearer "+token)
} else if token, ok := c.ctx.Value(openapi_client.ContextAccessToken).(string); ok && token != "" {
req.Header.Set("Authorization", "Bearer "+token)
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
// Context cancellation is expected when the stage finishes
if ctx.Err() != nil {
return nil
}
return fmt.Errorf("failed to connect to log stream: %w", err)
}
defer func() { _ = resp.Body.Close() }()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("log stream responded with status %d", resp.StatusCode)
}

reader := bufio.NewReader(resp.Body)

for {
// Check if context is done
select {
case <-ctx.Done():
return nil
default:
}

// Parse one SSE event
var eventData string
for {
line, err := reader.ReadString('\n')
if err != nil {
if ctx.Err() != nil || err == io.EOF {
return nil
}
return fmt.Errorf("failed to read log stream: %w", err)
}

line = strings.TrimSpace(line)

if strings.HasPrefix(line, "data:") {
data := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
if eventData != "" {
eventData += "\n" + data
} else {
eventData = data
}
} else if line == "" && eventData != "" {
// Empty line marks end of SSE event
break
}
}

// Parse and print log entries
var entries []logEntry
if err := json.Unmarshal([]byte(eventData), &entries); err != nil {
// Skip unparseable events (e.g. error responses)
log.Printf("⚠ log stream: %s", eventData)
eventData = ""
continue
}

for _, entry := range entries {
_, _ = fmt.Fprintf(w, "%s | %s\n", entry.Timestamp, entry.Data)
}
eventData = ""
}
}
1 change: 1 addition & 0 deletions cli/cmd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Client interface {
GetPipelineState(wsId int, stage string) ([]api.PipelineStatus, error)
GitPull(wsId int, remote string, branch string) error
DeployLandscape(wsId int, profile string) error
StreamLogs(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer) error
}

// CommandExecutor abstracts command execution for testing
Expand Down
81 changes: 81 additions & 0 deletions cli/cmd/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

131 changes: 17 additions & 114 deletions cli/cmd/start_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@ package cmd

import (
"fmt"
"log"
"slices"
"time"

"github.com/codesphere-cloud/cs-go/api"
"github.com/codesphere-cloud/cs-go/pkg/io"
"github.com/codesphere-cloud/cs-go/pkg/pipeline"

"github.com/spf13/cobra"
)
Expand All @@ -27,8 +26,6 @@ type StartPipelineOpts struct {
Timeout *time.Duration
}

const IdeServer string = "codesphere-ide"

func (c *StartPipelineCmd) RunE(_ *cobra.Command, args []string) error {

workspaceId, err := c.Opts.GetWorkspaceId()
Expand All @@ -41,11 +38,11 @@ func (c *StartPipelineCmd) RunE(_ *cobra.Command, args []string) error {
return fmt.Errorf("failed to create Codesphere client: %w", err)
}

return c.StartPipelineStages(client, workspaceId, args)
return c.StartPipelineStages(client, workspaceId, args, c.Opts.GetApiUrl())
}

func AddStartPipelineCmd(start *cobra.Command, opts GlobalOptions) {
pipeline := StartPipelineCmd{
p := StartPipelineCmd{
cmd: &cobra.Command{
Use: "pipeline",
Short: "Start pipeline stages of a workspace",
Expand All @@ -72,116 +69,22 @@ func AddStartPipelineCmd(start *cobra.Command, opts GlobalOptions) {
Time: &api.RealTime{},
}

pipeline.Opts.Timeout = pipeline.cmd.Flags().Duration("timeout", 30*time.Minute, "Time to wait per stage before stopping the command execution (e.g. 10m)")
pipeline.Opts.Profile = pipeline.cmd.Flags().StringP("profile", "p", "", "CI profile to use (e.g. 'prod' for the profile defined in 'ci.prod.yml'), defaults to the ci.yml profile")
start.AddCommand(pipeline.cmd)

pipeline.cmd.RunE = pipeline.RunE
}

func (c *StartPipelineCmd) StartPipelineStages(client Client, wsId int, stages []string) error {
for _, stage := range stages {
if !isValidStage(stage) {
return fmt.Errorf("invalid pipeline stage: %s", stage)
}
}
for _, stage := range stages {
err := c.startStage(client, wsId, stage)
if err != nil {
return err
}
}
return nil
}

func isValidStage(stage string) bool {
return slices.Contains([]string{"prepare", "test", "run"}, stage)
}

func (c *StartPipelineCmd) startStage(client Client, wsId int, stage string) error {
log.Printf("starting %s stage on workspace %d...", stage, wsId)

err := client.StartPipelineStage(wsId, *c.Opts.Profile, stage)
if err != nil {
log.Println()
return fmt.Errorf("failed to start pipeline stage %s: %w", stage, err)
}

err = c.waitForPipelineStage(client, wsId, stage)
if err != nil {
return fmt.Errorf("failed waiting for stage %s to finish: %w", stage, err)
p.Opts.Timeout = p.cmd.Flags().Duration("timeout", 30*time.Minute, "Time to wait per stage before stopping the command execution (e.g. 10m)")
p.Opts.Profile = p.cmd.Flags().StringP("profile", "p", "", "CI profile to use (e.g. 'prod' for the profile defined in 'ci.prod.yml'), defaults to the ci.yml profile")
start.AddCommand(p.cmd)

}
return nil
}

func (c *StartPipelineCmd) waitForPipelineStage(client Client, wsId int, stage string) error {
delay := 5 * time.Second

maxWaitTime := c.Time.Now().Add(*c.Opts.Timeout)
for {
status, err := client.GetPipelineState(wsId, stage)
if err != nil {
log.Printf("\nError getting pipeline status: %s, trying again...", err.Error())
c.Time.Sleep(delay)
continue
}

if c.allFinished(status) {
log.Println("(finished)")
break
}

if allRunning(status) && stage == "run" {
log.Println("(running)")
break
}

err = shouldAbort(status)
if err != nil {
log.Println("(failed)")
return fmt.Errorf("stage %s failed: %w", stage, err)
}

log.Print(".")
if c.Time.Now().After(maxWaitTime) {
log.Println()
return fmt.Errorf("timed out waiting for pipeline stage %s to be complete", stage)
}
c.Time.Sleep(delay)
}
return nil
}

func allRunning(status []api.PipelineStatus) bool {
for _, s := range status {
// Run stage is only running customer servers, ignore IDE server
if s.Server != IdeServer && s.State != "running" {
return false
}
}
return true
}

func (c *StartPipelineCmd) allFinished(status []api.PipelineStatus) bool {
io.Verboseln(*c.Opts.Verbose, "====")
for _, s := range status {
io.Verbosef(*c.Opts.Verbose, "Server: %s, State: %s, Replica: %s\n", s.Server, s.State, s.Replica)
}
for _, s := range status {
// Prepare and Test stage is only running in the IDE server, ignore customer servers
if s.Server == IdeServer && s.State != "success" {
return false
}
}
return true
p.cmd.RunE = p.RunE
}

func shouldAbort(status []api.PipelineStatus) error {
for _, s := range status {
if slices.Contains([]string{"failure", "aborted"}, s.State) {
return fmt.Errorf("server %s, replica %s reached unexpected state %s", s.Server, s.Replica, s.State)
}
func (c *StartPipelineCmd) StartPipelineStages(client Client, wsId int, stages []string, apiUrl ...string) error {
url := ""
if len(apiUrl) > 0 {
url = apiUrl[0]
}
return nil
runner := pipeline.NewRunner(client, c.Time)
return runner.RunStages(wsId, stages, pipeline.Config{
Profile: *c.Opts.Profile,
Timeout: *c.Opts.Timeout,
ApiUrl: url,
})
}
Loading
Loading