diff --git a/.gitignore b/.gitignore index 6cd1491..a3c39b8 100644 --- a/.gitignore +++ b/.gitignore @@ -18,6 +18,8 @@ bin/ cover.out coverage.out coverage.html +go.work.local +go.work.local.sum *.test *.prof vendor/ diff --git a/.mockery.yml b/.mockery.yml index 0aed8e7..bffc890 100644 --- a/.mockery.yml +++ b/.mockery.yml @@ -12,3 +12,11 @@ packages: structname: MockService interfaces: Service: + github.com/meigma/imgcli/internal/publish: + config: + dir: internal/publish/mocks + filename: uploads_client.go + pkgname: mocks + structname: MockUploadsClient + interfaces: + UploadsClient: diff --git a/go.mod b/go.mod index 65782e8..74b80b8 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,8 @@ require ( github.com/charmbracelet/colorprofile v0.4.2 github.com/gofrs/flock v0.13.0 github.com/lxc/incus-os/incus-osd v0.0.0-20260505023852-d32ba1f13f6f - github.com/meigma/imgcli/schemas v0.0.0-20260504225557-fa97d8c3fe0c + github.com/meigma/imgcli/schemas v0.0.0-20260505154605-5bbbe47a1e06 + github.com/meigma/imgsrv v0.0.0-20260505181350-0de592b46f88 github.com/spf13/cobra v1.10.2 github.com/spf13/pflag v1.0.10 github.com/spf13/viper v1.21.0 @@ -53,7 +54,7 @@ require ( github.com/sagikazarmark/locafero v0.12.0 // indirect github.com/spf13/afero v1.15.0 // indirect github.com/spf13/cast v1.10.0 // indirect - github.com/stretchr/objx v0.5.2 // indirect + github.com/stretchr/objx v0.5.3 // indirect github.com/subosito/gotenv v1.6.0 // indirect github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect github.com/zitadel/oidc/v3 v3.47.5 // indirect diff --git a/go.sum b/go.sum index db121ef..d6ae3a0 100644 --- a/go.sum +++ b/go.sum @@ -69,8 +69,10 @@ github.com/lxc/incus/v7 v7.0.0 h1:xLz1Q1Xk+yCNL148MFBOSWWrzJVOS1N6PcS0zd8usSc= github.com/lxc/incus/v7 v7.0.0/go.mod h1:Dxu4id/fVr+OmFPQt9tU3fu4E8LhW89NeFxCtjPLCdo= github.com/mattn/go-runewidth v0.0.23 h1:7ykA0T0jkPpzSvMS5i9uoNn2Xy3R383f9HDx3RybWcw= github.com/mattn/go-runewidth v0.0.23/go.mod h1:XBkDxAl56ILZc9knddidhrOlY5R/pDhgLpndooCuJAs= -github.com/meigma/imgcli/schemas v0.0.0-20260504225557-fa97d8c3fe0c h1:Uhz9SD1P/JEqDCuAxDL7AKbevYy2CTUP4i+OgFEwkdc= -github.com/meigma/imgcli/schemas v0.0.0-20260504225557-fa97d8c3fe0c/go.mod h1:d5JPNaAIyFEh8Evcgqi4ng1hW6K+BLQNIpbiz4XfX/M= +github.com/meigma/imgcli/schemas v0.0.0-20260505154605-5bbbe47a1e06 h1:v/8R2UInUH6gsz099SZQwPapXkFRh+Q5p3fSplImoTw= +github.com/meigma/imgcli/schemas v0.0.0-20260505154605-5bbbe47a1e06/go.mod h1:vgdSiTx7yikg0x4QmozD0dh4AYM90ZVALn0k45amZuQ= +github.com/meigma/imgsrv v0.0.0-20260505181350-0de592b46f88 h1:ebNLYGiyABFvk7LS43DkkSonxcolqqPqkAjSMaRqodM= +github.com/meigma/imgsrv v0.0.0-20260505181350-0de592b46f88/go.mod h1:aRf/9hRpxhb53jgUmZdo7XVOqQkEn2FEzVqmGEZtx3I= github.com/mitchellh/go-wordwrap v1.0.1 h1:TLuKupo69TCn6TQSyGxwI1EblZZEsQ0vMlAFQflz0v0= github.com/mitchellh/go-wordwrap v1.0.1/go.mod h1:R62XHJLzvMFRBbcrT7m7WgmE1eOyTSsCt+hzestvNj0= github.com/muesli/cancelreader v0.2.2 h1:3I4Kt4BQjOR54NavqnDogx/MIoWBFa0StPA8ELUXHmA= @@ -105,8 +107,8 @@ github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk= github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.21.0 h1:x5S+0EU27Lbphp4UKm1C+1oQO+rKx36vfCoaVebLFSU= github.com/spf13/viper v1.21.0/go.mod h1:P0lhsswPGWD/1lZJ9ny3fYnVqxiegrlNrEmgLjbTCAY= -github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= -github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/objx v0.5.3 h1:jmXUvGomnU1o3W/V5h2VEradbpJDwGrzugQQvL0POH4= +github.com/stretchr/objx v0.5.3/go.mod h1:rDQraq+vQZU7Fde9LOZLr8Tax6zZvy4kuNKF+QYS+U0= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= diff --git a/internal/cli/build.go b/internal/cli/build.go index c374beb..c692b9c 100644 --- a/internal/cli/build.go +++ b/internal/cli/build.go @@ -30,24 +30,12 @@ func newBuildCommand(rt *runtime) *cobra.Command { return err } - if rt.usesDefaultIncusOSCache() { - return withLockedCache(cmd.Context(), rt.config, func( - catalog incusosprovider.Catalog, - downloader incusosprovider.Downloader, - ) error { - ports, portsErr := rt.incusOSBuildPorts(catalog, downloader) - if portsErr != nil { - return portsErr - } - return runIncusOSBuild(cmd.Context(), config, ports, rt.opts.stdout()) - }) - } - - ports, err := rt.incusOSBuildPorts(nil, nil) + result, err := rt.runIncusOSBuild(cmd.Context(), config) if err != nil { return err } - return runIncusOSBuild(cmd.Context(), config, ports, rt.opts.stdout()) + + return printBuildArtifacts(rt.opts.stdout(), result) }, } } @@ -101,12 +89,47 @@ func (rt *runtime) incusOSBuildPorts( return ports, nil } +func (rt *runtime) runIncusOSBuild( + ctx context.Context, + config imgschemas.Config, +) (providers.BuildResult, error) { + if rt.usesDefaultIncusOSCache() { + var result providers.BuildResult + err := withLockedCache(ctx, rt.config, func( + catalog incusosprovider.Catalog, + downloader incusosprovider.Downloader, + ) error { + ports, portsErr := rt.incusOSBuildPorts(catalog, downloader) + if portsErr != nil { + return portsErr + } + + buildResult, buildErr := runIncusOSBuild(ctx, config, ports) + if buildErr != nil { + return buildErr + } + result = buildResult + return nil + }) + if err != nil { + return providers.BuildResult{}, err + } + + return result, nil + } + + ports, err := rt.incusOSBuildPorts(nil, nil) + if err != nil { + return providers.BuildResult{}, err + } + return runIncusOSBuild(ctx, config, ports) +} + func runIncusOSBuild( ctx context.Context, config imgschemas.Config, ports incusOSBuildPorts, - output io.Writer, -) error { +) (providers.BuildResult, error) { provider := incusosprovider.New(*config.Incusos, incusosprovider.Options{ Catalog: ports.catalog, Downloader: ports.downloader, @@ -121,9 +144,13 @@ func runIncusOSBuild( OutputDir: buildOutputDir(config.Output), }) if err != nil { - return err + return providers.BuildResult{}, err } + return result, nil +} + +func printBuildArtifacts(output io.Writer, result providers.BuildResult) error { for _, artifact := range result.Artifacts { if _, err := fmt.Fprintln(output, artifact.Path); err != nil { return fmt.Errorf("write build artifact path: %w", err) diff --git a/internal/cli/config.go b/internal/cli/config.go index 78c0039..90f1688 100644 --- a/internal/cli/config.go +++ b/internal/cli/config.go @@ -8,6 +8,7 @@ import ( "path/filepath" "strconv" "strings" + "time" "github.com/spf13/cobra" "github.com/spf13/pflag" @@ -29,6 +30,18 @@ const ( KeyCacheDir = "cache.dir" // KeyCacheMaxSize is the Viper key for the maximum cache size before LRU pruning. KeyCacheMaxSize = "cache.max-size" + // KeyImgsrvURL is the Viper key for the imgsrv API base URL used by publish. + KeyImgsrvURL = "imgsrv.url" + // KeyImgsrvToken is the Viper key for the optional imgsrv bearer token used by publish. + KeyImgsrvToken = "imgsrv.token" // #nosec G101 -- config key name, not a credential value. + // KeyPublishPartSize is the Viper key for the publish multipart upload part size. + KeyPublishPartSize = "publish.part-size" + // KeyPublishWait is the Viper key for waiting until uploaded blobs become CAS-ready. + KeyPublishWait = "publish.wait" + // KeyPublishTimeout is the Viper key for the publish wait timeout. + KeyPublishTimeout = "publish.timeout" + // KeyPublishPollInterval is the Viper key for the publish wait poll interval. + KeyPublishPollInterval = "publish.poll-interval" ) const ( @@ -38,18 +51,31 @@ const ( flagNoColor = "no-color" flagCacheDir = "cache-dir" flagCacheMaxSize = "cache-max-size" + + flagImgsrvURL = "imgsrv-url" + flagImgsrvToken = "imgsrv-token" // #nosec G101 -- flag name, not a credential value. + flagPublishPartSize = "publish-part-size" + flagPublishWait = "publish-wait" + flagPublishTimeout = "publish-timeout" + flagPublishPollInterval = "publish-poll-interval" ) const ( - defaultConfigDirName = "imgcli" - defaultConfigFileName = "config.yaml" - defaultLogLevel = "info" - defaultLogFormat = "text" - defaultCacheMaxSize = "10GB" + defaultConfigDirName = "imgcli" + defaultConfigFileName = "config.yaml" + defaultLogLevel = "info" + defaultLogFormat = "text" + defaultCacheMaxSize = "10GB" + defaultPublishPartSize = "64MB" + defaultPublishTimeout = "10m" + defaultPublishPollInterval = "2s" cacheSizeKiBShift = 10 cacheSizeMiBShift = 20 cacheSizeGiBShift = 30 + + minPublishPartSizeBytes = int64(5 * (1 << cacheSizeMiBShift)) + maxPublishPartSizeBytes = int64(5 * (1 << cacheSizeGiBShift)) ) // Config is the CLI edge configuration resolved from flags, environment, config file, and defaults. @@ -73,6 +99,15 @@ type Config struct { CacheMaxSizeBytes int64 } +type publishConfig struct { + imgsrvURL string + imgsrvToken string + partSizeBytes int64 + wait bool + timeout time.Duration + pollInterval time.Duration +} + func configureViper(vp *viper.Viper) { vp.SetEnvPrefix(envPrefix) vp.SetEnvKeyReplacer(strings.NewReplacer("-", "_", ".", "_")) @@ -83,6 +118,12 @@ func configureViper(vp *viper.Viper) { vp.SetDefault(KeyNoColor, false) vp.SetDefault(KeyCacheDir, "") vp.SetDefault(KeyCacheMaxSize, defaultCacheMaxSize) + vp.SetDefault(KeyImgsrvURL, "") + vp.SetDefault(KeyImgsrvToken, "") + vp.SetDefault(KeyPublishPartSize, defaultPublishPartSize) + vp.SetDefault(KeyPublishWait, true) + vp.SetDefault(KeyPublishTimeout, defaultPublishTimeout) + vp.SetDefault(KeyPublishPollInterval, defaultPublishPollInterval) } func (rt *runtime) registerGlobalFlags(root *cobra.Command) error { @@ -232,6 +273,52 @@ func parseSizeConfig(vp *viper.Viper, key string) (int64, error) { return int64(size), nil } +func loadPublishConfig(vp *viper.Viper) (publishConfig, error) { + partSizeBytes, err := parseSizeConfig(vp, KeyPublishPartSize) + if err != nil { + return publishConfig{}, err + } + timeout, err := parseDurationConfig(vp, KeyPublishTimeout) + if err != nil { + return publishConfig{}, err + } + pollInterval, err := parseDurationConfig(vp, KeyPublishPollInterval) + if err != nil { + return publishConfig{}, err + } + + cfg := publishConfig{ + imgsrvURL: strings.TrimSpace(vp.GetString(KeyImgsrvURL)), + imgsrvToken: strings.TrimSpace(vp.GetString(KeyImgsrvToken)), + partSizeBytes: partSizeBytes, + wait: vp.GetBool(KeyPublishWait), + timeout: timeout, + pollInterval: pollInterval, + } + if err := validatePublishConfig(cfg); err != nil { + return publishConfig{}, err + } + + return cfg, nil +} + +func parseDurationConfig(vp *viper.Viper, key string) (time.Duration, error) { + raw := strings.TrimSpace(vp.GetString(key)) + if raw == "" { + return 0, fmt.Errorf("invalid %s %q: duration is required", key, raw) + } + + duration, err := time.ParseDuration(raw) + if err != nil { + return 0, fmt.Errorf("invalid %s %q: %w", key, raw, err) + } + if duration <= 0 { + return 0, fmt.Errorf("invalid %s %q: duration must be positive", key, raw) + } + + return duration, nil +} + func parseSizeLiteral(raw string) (int64, error) { if raw == "" { return 0, errors.New("size is required") @@ -279,3 +366,40 @@ func validateConfig(cfg Config) error { } return nil } + +func validatePublishConfig(cfg publishConfig) error { + if cfg.imgsrvURL == "" { + return errors.New("publish requires imgsrv.url: set --imgsrv-url, IMGCLI_IMGSRV_URL, or config imgsrv.url") + } + if cfg.partSizeBytes < minPublishPartSizeBytes { + return fmt.Errorf( + "invalid %s %q: must be at least %s", + KeyPublishPartSize, + viperValueForError(cfg.partSizeBytes), + defaultSizeForError(minPublishPartSizeBytes), + ) + } + if cfg.partSizeBytes > maxPublishPartSizeBytes { + return fmt.Errorf( + "invalid %s %q: must be at most %s", + KeyPublishPartSize, + viperValueForError(cfg.partSizeBytes), + defaultSizeForError(maxPublishPartSizeBytes), + ) + } + return nil +} + +func viperValueForError(value int64) string { + return strconv.FormatInt(value, 10) +} + +func defaultSizeForError(value int64) string { + if value%(1< MaxPartSizeBytes { + return nil, fmt.Errorf("configure imgsrv uploader: part size must be at most %d bytes", MaxPartSizeBytes) + } + if options.Wait { + if options.Timeout <= 0 { + return nil, errors.New("configure imgsrv uploader: wait timeout must be positive") + } + if options.PollInterval <= 0 { + return nil, errors.New("configure imgsrv uploader: wait poll interval must be positive") + } + } + + return &Uploader{ + client: client, + options: options, + }, nil +} + +// UploadArtifact uploads one local artifact and optionally waits until imgsrv marks it ready. +func (u *Uploader) UploadArtifact(ctx context.Context, artifact Artifact) (Result, error) { + if artifact.Path == "" { + return Result{}, errors.New("upload artifact: path is required") + } + if artifact.SHA256 == "" { + return Result{}, fmt.Errorf("upload artifact %q: sha256 digest is required", artifact.Path) + } + if artifact.Size <= 0 { + return Result{}, fmt.Errorf("upload artifact %q: size must be positive", artifact.Path) + } + if parts := partCount(artifact.Size, u.options.PartSizeBytes); parts > maxPartNumber { + return Result{}, fmt.Errorf( + "upload artifact %q: %d parts exceeds imgsrv maximum %d; increase --publish-part-size", + artifact.Path, + parts, + maxPartNumber, + ) + } + + file, err := os.Open(artifact.Path) + if err != nil { + return Result{}, fmt.Errorf("open artifact %q: %w", artifact.Path, err) + } + defer file.Close() + + stat, err := file.Stat() + if err != nil { + return Result{}, fmt.Errorf("stat artifact %q: %w", artifact.Path, err) + } + if stat.Size() != artifact.Size { + return Result{}, fmt.Errorf( + "upload artifact %q: expected size %d, found %d", + artifact.Path, + artifact.Size, + stat.Size(), + ) + } + + digest := imgsrv.Digest("sha256:" + artifact.SHA256) + request := imgsrv.BeginUploadRequest{ + ExpectedDigest: digest, + ExpectedSizeBytes: artifact.Size, + MediaTypeHint: optionalString(artifact.MediaType), + FilenameHint: optionalString(filepath.Base(artifact.Path)), + } + session, err := u.client.BeginUpload(ctx, request) + if err != nil { + return Result{}, fmt.Errorf("begin imgsrv upload for %s: %w", digest, err) + } + uploadID := session.ID.String() + + parts, err := u.putParts(ctx, file, uploadID, artifact.Size) + if err != nil { + return Result{}, u.abortIncomplete(ctx, uploadID, err) + } + + session, err = u.client.CompleteUpload(ctx, uploadID, imgsrv.CompleteUploadRequest{Parts: parts}) + if err != nil { + return Result{}, u.abortIncomplete(ctx, uploadID, fmt.Errorf("complete imgsrv upload %s: %w", uploadID, err)) + } + + if u.options.Wait { + session, err = u.waitReady(ctx, session) + if err != nil { + return Result{}, err + } + } + + return Result{ + Digest: digest, + UploadID: session.ID, + State: session.State, + }, nil +} + +func (u *Uploader) abortIncomplete(ctx context.Context, uploadID string, cause error) error { + if uploadID == "" { + return cause + } + + abortCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), abortTimeout) + defer cancel() + + if _, abortErr := u.client.AbortUpload(abortCtx, uploadID); abortErr != nil { + return errors.Join(cause, fmt.Errorf("abort imgsrv upload %s: %w", uploadID, abortErr)) + } + + return cause +} + +func (u *Uploader) putParts( + ctx context.Context, + file *os.File, + uploadID string, + totalSize int64, +) ([]imgsrv.CompleteUploadPart, error) { + var parts []imgsrv.CompleteUploadPart + for offset, partNumber := int64(0), 1; offset < totalSize; partNumber++ { + size := min(u.options.PartSizeBytes, totalSize-offset) + reader := io.NewSectionReader(file, offset, size) + + part, err := u.client.PutUploadPart(ctx, uploadID, partNumber, reader, size) + if err != nil { + return nil, fmt.Errorf("upload imgsrv part %d for %s: %w", partNumber, uploadID, err) + } + + parts = append(parts, imgsrv.CompleteUploadPart{ + Number: part.PartNumber, + ETag: part.ETag, + SizeBytes: part.SizeBytes, + }) + offset += size + } + + return parts, nil +} + +func (u *Uploader) waitReady(ctx context.Context, session imgsrv.UploadSession) (imgsrv.UploadSession, error) { + if err := terminalStateError(session); err != nil { + return imgsrv.UploadSession{}, err + } + if session.State == imgsrv.UploadStateReady { + return session, nil + } + + waitCtx, cancel := context.WithTimeout(ctx, u.options.Timeout) + defer cancel() + + ticker := time.NewTicker(u.options.PollInterval) + defer ticker.Stop() + + last := session + for { + select { + case <-waitCtx.Done(): + if errors.Is(waitCtx.Err(), context.DeadlineExceeded) { + return imgsrv.UploadSession{}, fmt.Errorf( + "upload completed but did not become ready before timeout; last state was %q", + last.State, + ) + } + return imgsrv.UploadSession{}, fmt.Errorf("wait for imgsrv upload %s: %w", session.ID, waitCtx.Err()) + case <-ticker.C: + current, err := u.client.GetUpload(waitCtx, session.ID.String()) + if err != nil { + return imgsrv.UploadSession{}, fmt.Errorf("get imgsrv upload %s status: %w", session.ID, err) + } + if err := terminalStateError(current); err != nil { + return imgsrv.UploadSession{}, err + } + if current.State == imgsrv.UploadStateReady { + return current, nil + } + last = current + } + } +} + +func terminalStateError(session imgsrv.UploadSession) error { + switch session.State { + case imgsrv.UploadStateCreated, + imgsrv.UploadStateUploading, + imgsrv.UploadStateCompleted, + imgsrv.UploadStateIngesting, + imgsrv.UploadStateReady: + return nil + case imgsrv.UploadStateFailed: + return fmt.Errorf("imgsrv upload %s failed", session.ID) + case imgsrv.UploadStateAborted: + return fmt.Errorf("imgsrv upload %s was aborted", session.ID) + default: + return nil + } +} + +func optionalString(value string) *string { + if value == "" { + return nil + } + return &value +} + +func partCount(size int64, partSize int64) int64 { + return (size + partSize - 1) / partSize +} diff --git a/internal/publish/publish_test.go b/internal/publish/publish_test.go new file mode 100644 index 0000000..b846b36 --- /dev/null +++ b/internal/publish/publish_test.go @@ -0,0 +1,312 @@ +package publish_test + +import ( + "bytes" + "context" + "errors" + "io" + "net/http" + "os" + "path/filepath" + "testing" + "time" + + imgsrv "github.com/meigma/imgsrv/client" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/meigma/imgcli/internal/publish" + "github.com/meigma/imgcli/internal/publish/mocks" +) + +func TestUploaderUploadsMultipartArtifact(t *testing.T) { + uploads := mocks.NewMockUploadsClient(t) + body := append(bytes.Repeat([]byte("a"), int(publish.MinPartSizeBytes)), []byte("end")...) + path := writePublishTestArtifact(t, "artifact.raw.gz", body) + artifact := publish.Artifact{ + Path: path, + Size: int64(len(body)), + SHA256: "abc123", + MediaType: "application/gzip", + } + mediaTypeHint := "application/gzip" + filenameHint := "artifact.raw.gz" + + beginRequest := imgsrv.BeginUploadRequest{ + ExpectedDigest: "sha256:abc123", + ExpectedSizeBytes: int64(len(body)), + MediaTypeHint: &mediaTypeHint, + FilenameHint: &filenameHint, + } + uploads.EXPECT(). + BeginUpload(mock.Anything, beginRequest). + Return(imgsrv.UploadSession{ID: "upload-1", State: imgsrv.UploadStateCreated}, nil). + Once() + uploads.EXPECT(). + PutUploadPart(mock.Anything, "upload-1", 1, mock.Anything, publish.MinPartSizeBytes). + RunAndReturn(func(_ context.Context, _ string, _ int, reader io.Reader, _ int64) (imgsrv.UploadPart, error) { + assertReaderBytes(t, reader, body[:int(publish.MinPartSizeBytes)]) + return imgsrv.UploadPart{PartNumber: 1, ETag: "etag-1", SizeBytes: publish.MinPartSizeBytes}, nil + }). + Once() + uploads.EXPECT(). + PutUploadPart(mock.Anything, "upload-1", 2, mock.Anything, int64(3)). + RunAndReturn(func(_ context.Context, _ string, _ int, reader io.Reader, _ int64) (imgsrv.UploadPart, error) { + assertReaderBytes(t, reader, []byte("end")) + return imgsrv.UploadPart{PartNumber: 2, ETag: "etag-2", SizeBytes: 3}, nil + }). + Once() + uploads.EXPECT(). + CompleteUpload(mock.Anything, "upload-1", imgsrv.CompleteUploadRequest{ + Parts: []imgsrv.CompleteUploadPart{ + {Number: 1, ETag: "etag-1", SizeBytes: publish.MinPartSizeBytes}, + {Number: 2, ETag: "etag-2", SizeBytes: 3}, + }, + }). + Return(imgsrv.UploadSession{ID: "upload-1", State: imgsrv.UploadStateCompleted}, nil). + Once() + + uploader := newPublishTestUploader(t, uploads, publish.Options{ + PartSizeBytes: publish.MinPartSizeBytes, + Wait: false, + }) + result, err := uploader.UploadArtifact(context.Background(), artifact) + + require.NoError(t, err) + assert.Equal(t, imgsrv.Digest("sha256:abc123"), result.Digest) + assert.Equal(t, imgsrv.UploadID("upload-1"), result.UploadID) + assert.Equal(t, imgsrv.UploadStateCompleted, result.State) +} + +func TestUploaderWaitsUntilReady(t *testing.T) { + uploads := mocks.NewMockUploadsClient(t) + body := bytes.Repeat([]byte("a"), int(publish.MinPartSizeBytes)) + path := writePublishTestArtifact(t, "artifact.raw", body) + + expectSinglePartUpload(t, uploads, int64(len(body)), imgsrv.UploadStateCompleted) + uploads.EXPECT(). + GetUpload(mock.Anything, "upload-1"). + Return(imgsrv.UploadSession{ID: "upload-1", State: imgsrv.UploadStateIngesting}, nil). + Once() + uploads.EXPECT(). + GetUpload(mock.Anything, "upload-1"). + Return(imgsrv.UploadSession{ID: "upload-1", State: imgsrv.UploadStateReady}, nil). + Once() + + uploader := newPublishTestUploader(t, uploads, publish.Options{ + PartSizeBytes: publish.MinPartSizeBytes, + Wait: true, + Timeout: time.Second, + PollInterval: time.Nanosecond, + }) + result, err := uploader.UploadArtifact(context.Background(), publish.Artifact{ + Path: path, + Size: int64(len(body)), + SHA256: "abc123", + }) + + require.NoError(t, err) + assert.Equal(t, imgsrv.UploadStateReady, result.State) +} + +func TestUploaderFailsWhenUploadFails(t *testing.T) { + uploads := mocks.NewMockUploadsClient(t) + body := bytes.Repeat([]byte("a"), int(publish.MinPartSizeBytes)) + path := writePublishTestArtifact(t, "artifact.raw", body) + expectSinglePartUpload(t, uploads, int64(len(body)), imgsrv.UploadStateFailed) + + uploader := newPublishTestUploader(t, uploads, publish.Options{ + PartSizeBytes: publish.MinPartSizeBytes, + Wait: true, + Timeout: time.Second, + PollInterval: time.Nanosecond, + }) + _, err := uploader.UploadArtifact(context.Background(), publish.Artifact{ + Path: path, + Size: int64(len(body)), + SHA256: "abc123", + }) + + require.Error(t, err) + assert.ErrorContains(t, err, "imgsrv upload upload-1 failed") +} + +func TestUploaderTimesOutWhenUploadNeverBecomesReady(t *testing.T) { + uploads := mocks.NewMockUploadsClient(t) + body := bytes.Repeat([]byte("a"), int(publish.MinPartSizeBytes)) + path := writePublishTestArtifact(t, "artifact.raw", body) + expectSinglePartUpload(t, uploads, int64(len(body)), imgsrv.UploadStateCompleted) + + uploader := newPublishTestUploader(t, uploads, publish.Options{ + PartSizeBytes: publish.MinPartSizeBytes, + Wait: true, + Timeout: time.Nanosecond, + PollInterval: time.Hour, + }) + _, err := uploader.UploadArtifact(context.Background(), publish.Artifact{ + Path: path, + Size: int64(len(body)), + SHA256: "abc123", + }) + + require.Error(t, err) + require.ErrorContains(t, err, "upload completed but did not become ready before timeout") + require.ErrorContains(t, err, `last state was "completed"`) +} + +func TestUploaderSurfacesProblemErrorWithContext(t *testing.T) { + uploads := mocks.NewMockUploadsClient(t) + body := bytes.Repeat([]byte("a"), int(publish.MinPartSizeBytes)) + path := writePublishTestArtifact(t, "artifact.raw", body) + problem := &imgsrv.ProblemError{ + HTTPStatus: http.StatusInternalServerError, + Title: "Upload failed", + Detail: "object store unavailable", + } + uploads.EXPECT(). + BeginUpload(mock.Anything, mock.Anything). + Return(imgsrv.UploadSession{}, problem). + Once() + + uploader := newPublishTestUploader(t, uploads, publish.Options{ + PartSizeBytes: publish.MinPartSizeBytes, + Wait: false, + }) + _, err := uploader.UploadArtifact(context.Background(), publish.Artifact{ + Path: path, + Size: int64(len(body)), + SHA256: "abc123", + }) + + require.Error(t, err) + require.ErrorContains(t, err, "begin imgsrv upload for sha256:abc123") + require.ErrorContains(t, err, "imgsrv: 500 Upload failed: object store unavailable") + var gotProblem *imgsrv.ProblemError + require.ErrorAs(t, err, &gotProblem) +} + +func TestUploaderSurfacesHTTPErrorWithContext(t *testing.T) { + uploads := mocks.NewMockUploadsClient(t) + body := bytes.Repeat([]byte("a"), int(publish.MinPartSizeBytes)) + path := writePublishTestArtifact(t, "artifact.raw", body) + httpErr := &imgsrv.HTTPError{ + StatusCode: http.StatusServiceUnavailable, + Status: "503 Service Unavailable", + Body: []byte("down"), + } + uploads.EXPECT(). + BeginUpload(mock.Anything, mock.Anything). + Return(imgsrv.UploadSession{ID: "upload-1", State: imgsrv.UploadStateCreated}, nil). + Once() + uploads.EXPECT(). + PutUploadPart(mock.Anything, "upload-1", 1, mock.Anything, publish.MinPartSizeBytes). + Return(imgsrv.UploadPart{}, httpErr). + Once() + uploads.EXPECT(). + AbortUpload(mock.Anything, "upload-1"). + Return(imgsrv.UploadSession{ID: "upload-1", State: imgsrv.UploadStateAborted}, nil). + Once() + + uploader := newPublishTestUploader(t, uploads, publish.Options{ + PartSizeBytes: publish.MinPartSizeBytes, + Wait: false, + }) + _, err := uploader.UploadArtifact(context.Background(), publish.Artifact{ + Path: path, + Size: int64(len(body)), + SHA256: "abc123", + }) + + require.Error(t, err) + require.ErrorContains(t, err, "upload imgsrv part 1 for upload-1") + require.ErrorContains(t, err, "imgsrv: 503 Service Unavailable: down") + var gotHTTP *imgsrv.HTTPError + require.ErrorAs(t, err, &gotHTTP) +} + +func TestUploaderAbortsWhenCompleteFails(t *testing.T) { + uploads := mocks.NewMockUploadsClient(t) + body := bytes.Repeat([]byte("a"), int(publish.MinPartSizeBytes)) + path := writePublishTestArtifact(t, "artifact.raw", body) + completeErr := errors.New("object store failed") + uploads.EXPECT(). + BeginUpload(mock.Anything, mock.Anything). + Return(imgsrv.UploadSession{ID: "upload-1", State: imgsrv.UploadStateCreated}, nil). + Once() + uploads.EXPECT(). + PutUploadPart(mock.Anything, "upload-1", 1, mock.Anything, publish.MinPartSizeBytes). + Return(imgsrv.UploadPart{PartNumber: 1, ETag: "etag-1", SizeBytes: publish.MinPartSizeBytes}, nil). + Once() + uploads.EXPECT(). + CompleteUpload(mock.Anything, "upload-1", imgsrv.CompleteUploadRequest{ + Parts: []imgsrv.CompleteUploadPart{{Number: 1, ETag: "etag-1", SizeBytes: publish.MinPartSizeBytes}}, + }). + Return(imgsrv.UploadSession{}, completeErr). + Once() + uploads.EXPECT(). + AbortUpload(mock.Anything, "upload-1"). + Return(imgsrv.UploadSession{ID: "upload-1", State: imgsrv.UploadStateAborted}, nil). + Once() + + uploader := newPublishTestUploader(t, uploads, publish.Options{ + PartSizeBytes: publish.MinPartSizeBytes, + Wait: false, + }) + _, err := uploader.UploadArtifact(context.Background(), publish.Artifact{ + Path: path, + Size: int64(len(body)), + SHA256: "abc123", + }) + + require.ErrorIs(t, err, completeErr) + require.ErrorContains(t, err, "complete imgsrv upload upload-1") +} + +func expectSinglePartUpload( + t *testing.T, + uploads *mocks.MockUploadsClient, + size int64, + completeState imgsrv.UploadState, +) { + t.Helper() + + uploads.EXPECT(). + BeginUpload(mock.Anything, mock.Anything). + Return(imgsrv.UploadSession{ID: "upload-1", State: imgsrv.UploadStateCreated}, nil). + Once() + uploads.EXPECT(). + PutUploadPart(mock.Anything, "upload-1", 1, mock.Anything, size). + Return(imgsrv.UploadPart{PartNumber: 1, ETag: "etag-1", SizeBytes: size}, nil). + Once() + uploads.EXPECT(). + CompleteUpload(mock.Anything, "upload-1", imgsrv.CompleteUploadRequest{ + Parts: []imgsrv.CompleteUploadPart{{Number: 1, ETag: "etag-1", SizeBytes: size}}, + }). + Return(imgsrv.UploadSession{ID: "upload-1", State: completeState}, nil). + Once() +} + +func newPublishTestUploader(t *testing.T, uploads publish.UploadsClient, options publish.Options) *publish.Uploader { + t.Helper() + + uploader, err := publish.NewUploader(uploads, options) + require.NoError(t, err) + return uploader +} + +func writePublishTestArtifact(t *testing.T, name string, body []byte) string { + t.Helper() + + path := filepath.Join(t.TempDir(), name) + require.NoError(t, os.WriteFile(path, body, 0o600)) + return path +} + +func assertReaderBytes(t *testing.T, reader io.Reader, want []byte) { + t.Helper() + + got, err := io.ReadAll(reader) + require.NoError(t, err) + assert.Equal(t, want, got) +}