11 changes: 11 additions & 0 deletions cmd/sippy/load.go
@@ -39,6 +39,7 @@ import (
"github.com/openshift/sippy/pkg/dataloader/prowloader/github"
"github.com/openshift/sippy/pkg/dataloader/releaseloader"
"github.com/openshift/sippy/pkg/dataloader/testownershiploader"
"github.com/openshift/sippy/pkg/dataloader/testreportcacheloader"
"github.com/openshift/sippy/pkg/db"
"github.com/openshift/sippy/pkg/flags"
"github.com/openshift/sippy/pkg/github/commenter"
@@ -318,6 +319,16 @@ func NewLoadCommand() *cobra.Command {
loaders = append(loaders, fgLoader)
}

if l == "test-report-cache" {
if dbErr != nil {
return dbErr
}
if cacheErr != nil {
return errors.Wrap(cacheErr, "couldn't get cache client")
}
loaders = append(loaders, testreportcacheloader.New(dbc, cacheClient, releaseConfigs))
}

}

// Run loaders with the metrics wrapper
19 changes: 17 additions & 2 deletions pkg/api/README.md
@@ -408,10 +408,25 @@ Endpoint: `/api/tests`
| filter | Filter | Filters the results by the specified value. | See filtering |
| sortField| Field name | Sort by this field | |
| sort | asc / desc | Sort type, ascending or descending | "asc" or "desc" |
| limit | Integer | The maximum amount of results to return | N/A |
| limit | Integer | The maximum number of results to return (legacy; use perPage instead) | N/A |
| perPage | Integer | Number of results per page (enables server-side pagination) | 1-1000 |
| page | Integer | Zero-indexed page number (requires perPage) | 0+ |

When `perPage` is provided, the response is wrapped in a pagination envelope:

```json
{
"rows": [ ... ],
"total_rows": 2847,
"page_size": 25,
"page": 0
}
```

When `perPage` is omitted, the legacy array response format is returned for backward compatibility.
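
As a minimal client sketch, paging through the endpoint might look like the following (the base URL and `release` parameter are illustrative; the struct fields mirror the envelope above):

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// testsPage mirrors the pagination envelope documented above.
type testsPage struct {
	Rows      []json.RawMessage `json:"rows"`       // test records, kept raw here
	TotalRows int               `json:"total_rows"` // total matches across all pages
	PageSize  int               `json:"page_size"`  // echoes the perPage parameter
	Page      int               `json:"page"`       // zero-indexed page number
}

func main() {
	// Hypothetical Sippy instance; perPage switches on the paginated envelope.
	url := "https://sippy.example.com/api/tests?release=4.14&perPage=25&page=0"
	resp, err := http.Get(url)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var p testsPage
	if err := json.NewDecoder(resp.Body).Decode(&p); err != nil {
		panic(err)
	}
	fmt.Printf("page %d: %d of %d rows\n", p.Page, len(p.Rows), p.TotalRows)
}
```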

<details>
<summary>Example response</summary>
<summary>Example response (legacy, without pagination)</summary>

```json
[
98 changes: 86 additions & 12 deletions pkg/api/cache.go
@@ -1,9 +1,12 @@
package api

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
"reflect"
"strings"
"time"
@@ -15,6 +18,17 @@ import (

var defaultCacheDuration = 8 * time.Hour

func formatBytes(b int) string {
switch {
case b >= 1<<20:
return fmt.Sprintf("%.1f MB", float64(b)/float64(1<<20))
case b >= 1<<10:
return fmt.Sprintf("%.1f KB", float64(b)/float64(1<<10))
default:
return fmt.Sprintf("%d B", b)
}
}
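
// For reference, a few sample outputs from this helper (the values follow
// directly from the thresholds above):
//
//	formatBytes(512)     // "512 B"
//	formatBytes(1536)    // "1.5 KB"
//	formatBytes(3 << 20) // "3.0 MB"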

// CacheSpec specifies caching parameters for an individual query
type CacheSpec struct {
// function to turn the specified key struct into cacheable bytes
@@ -94,8 +108,12 @@ func GetDataFromCacheOrGenerate[T any](
"key": string(cacheKey),
"type": reflect.TypeOf(defaultVal).String(),
}).Infof("cache hit")
decompressed, err := cacheDecompress(res)
if err != nil {
return defaultVal, []error{errors.WithMessagef(err, "failed to decompress cached item. cacheKey=%+v", cacheKey)}
}
var cr T
if err := json.Unmarshal(res, &cr); err != nil {
if err := json.Unmarshal(decompressed, &cr); err != nil {
return defaultVal, []error{errors.WithMessagef(err, "failed to unmarshal cached item. cacheKey=%+v", cacheKey)}
}
return cr, nil
@@ -118,20 +136,73 @@ func GetDataFromCacheOrGenerate[T any](
return generateFn(ctx)
}

func CacheSet[T any](ctx context.Context, c cache.Cache, result T, cacheKey []byte, cacheDuration time.Duration) {
// gzipMagic is prepended to compressed cache entries so reads can detect them.
var gzipMagic = []byte{0x1f, 0x8b}

func CacheSet[T any](ctx context.Context, c cache.Cache, result T, cacheKey []byte, cacheDuration time.Duration, opts ...CacheSetOption) {
o := cacheSetOptions{}
for _, opt := range opts {
opt(&o)
}

cr, err := json.Marshal(result)
if err == nil {
if err := c.Set(ctx, string(cacheKey), cr, cacheDuration); err != nil {
if strings.Contains(err.Error(), "connection refused") {
logrus.WithError(err).Fatalf("redis URL specified but got connection refused, exiting due to cost issues in this configuration")
}
logrus.WithError(err).Warningf("couldn't persist new item to cache")
} else {
logrus.Debugf("cache set for cache key: %s", string(cacheKey))
if err != nil {
logrus.WithError(err).Errorf("Failed to marshall cache item")
return
}

stored := cr
if o.compress {
var buf bytes.Buffer
gz := gzip.NewWriter(&buf)
if _, err := gz.Write(cr); err != nil {
logrus.WithError(err).Errorf("Failed to gzip cache item")
return
}
if err := gz.Close(); err != nil {
logrus.WithError(err).Errorf("Failed to close gzip writer")
return
}
stored = buf.Bytes()
logrus.Infof("cache set: key=%s raw=%s compressed=%s", string(cacheKey), formatBytes(len(cr)), formatBytes(len(stored)))
} else {
logrus.WithError(err).Errorf("Failed to marshall cache item: %v", result)
logrus.Infof("cache set: key=%s size=%s", string(cacheKey), formatBytes(len(cr)))
}

if err := c.Set(ctx, string(cacheKey), stored, cacheDuration); err != nil {
if strings.Contains(err.Error(), "connection refused") {
logrus.WithError(err).Fatalf("redis URL specified but got connection refused, exiting due to cost issues in this configuration")
}
logrus.WithError(err).Warningf("couldn't persist new item to cache, size=%s", formatBytes(len(stored)))
}
}

type cacheSetOptions struct {
compress bool
}

// CacheSetOption configures optional CacheSet behavior.
type CacheSetOption func(*cacheSetOptions)

// WithCompression enables gzip compression for the cache entry.
func WithCompression() CacheSetOption {
return func(o *cacheSetOptions) {
o.compress = true
}
}
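
// Example call site (hypothetical names): large payloads can opt in to gzip
// while small entries keep the uncompressed default:
//
//	CacheSet(ctx, cacheClient, report, cacheKey, 8*time.Hour, WithCompression())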

// cacheDecompress returns decompressed bytes if the input is gzip-compressed,
// otherwise returns the input unchanged.
func cacheDecompress(data []byte) ([]byte, error) {
if len(data) < 2 || data[0] != gzipMagic[0] || data[1] != gzipMagic[1] {
return data, nil
}
gz, err := gzip.NewReader(bytes.NewReader(data))
if err != nil {
return nil, fmt.Errorf("failed to create gzip reader: %w", err)
}
defer gz.Close()
return io.ReadAll(gz)
}
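
// A sketch of the pass-through guarantee (hypothetical test): entries written
// before compression existed lack the gzip magic bytes and are returned
// unchanged, so entries cached before this change remain readable:
//
//	func TestCacheDecompressPassthrough(t *testing.T) {
//		plain := []byte(`{"rows":[]}`)
//		out, err := cacheDecompress(plain)
//		if err != nil || !bytes.Equal(out, plain) {
//			t.Fatal("legacy uncompressed entry should pass through unchanged")
//		}
//	}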

func CalculateRoundedCacheDuration(cacheOptions cache.RequestOptions) time.Duration {
@@ -198,7 +269,10 @@ func GetDataFromCacheOrMatview[T any](ctx context.Context,
"type": reflect.TypeOf(defaultVal).String(),
}).Debugf("cache hit")

if err := json.Unmarshal(cached, &cacheVal); err != nil {
decompressed, decompErr := cacheDecompress(cached)
if decompErr != nil {
logrus.WithError(decompErr).Warnf("failed to decompress cached item. cacheKey=%+v", cacheKey)
} else if err := json.Unmarshal(decompressed, &cacheVal); err != nil {
logrus.WithError(err).Warnf("failed to unmarshal cached item. cacheKey=%+v", cacheKey)
// fall through to generate the data instead
} else {