diff --git a/.gitignore b/.gitignore index 78d6394..358bdf9 100644 --- a/.gitignore +++ b/.gitignore @@ -157,4 +157,5 @@ Temporary Items # End of https://www.toptal.com/developers/gitignore/api/macos,intellij,go -/myshoes* \ No newline at end of file +/myshoes* +/server diff --git a/cmd/server/cmd.go b/cmd/server/cmd.go index a1df356..2064e13 100644 --- a/cmd/server/cmd.go +++ b/cmd/server/cmd.go @@ -15,6 +15,7 @@ import ( "github.com/whywaita/myshoes/pkg/gh" "github.com/whywaita/myshoes/pkg/logger" "github.com/whywaita/myshoes/pkg/runner" + "github.com/whywaita/myshoes/pkg/scaleset" "github.com/whywaita/myshoes/pkg/starter" "github.com/whywaita/myshoes/pkg/starter/safety/unlimited" "github.com/whywaita/myshoes/pkg/web" @@ -53,6 +54,7 @@ type myShoes struct { ds datastore.Datastore start *starter.Starter run *runner.Manager + ss *scaleset.Manager // nil if scale set mode disabled } // newShoes create myshoes. @@ -69,10 +71,29 @@ func newShoes() (*myShoes, error) { manager := runner.New(ds, config.Config.RunnerVersion) + var scalesetManager *scaleset.Manager + if config.Config.ScaleSetEnabled { + logger.Logf(false, "Scale set mode enabled") + scalesetManager = scaleset.New(ds, scaleset.ManagerConfig{ + AppID: config.Config.GitHub.AppID, + PrivateKeyPEM: config.Config.GitHub.PEMByte, + GitHubURL: config.Config.GitHubURL, + RunnerGroupName: config.Config.ScaleSetRunnerGroup, + MaxRunners: config.Config.ScaleSetMaxRunners, + ScaleSetPrefix: config.Config.ScaleSetNamePrefix, + RunnerVersion: config.Config.RunnerVersion, + RunnerUser: config.Config.RunnerUser, + RunnerBaseDir: config.Config.RunnerBaseDirectory, + }) + } else { + logger.Logf(false, "Scale set mode disabled (webhook mode)") + } + return &myShoes{ ds: ds, start: s, run: manager, + ss: scalesetManager, }, nil } @@ -99,6 +120,7 @@ func (m *myShoes) Run() error { time.Sleep(time.Second) } + // Web server runs in both modes (provides REST API + metrics) eg.Go(func() error { if err := web.Serve(ctx, m.ds); err != nil { logger.Logf(false, "failed to web.Serve: %+v", err) @@ -106,20 +128,35 @@ func (m *myShoes) Run() error { } return nil }) - eg.Go(func() error { - if err := m.start.Loop(ctx); err != nil { - logger.Logf(false, "failed to starter manager: %+v", err) - return fmt.Errorf("failed to starter loop: %w", err) - } - return nil - }) - eg.Go(func() error { - if err := m.run.Loop(ctx); err != nil { - logger.Logf(false, "failed to runner manager: %+v", err) - return fmt.Errorf("failed to runner loop: %w", err) - } - return nil - }) + + if m.ss != nil { + // Scale set mode: use long-polling listener instead of webhook + job queue + logger.Logf(false, "Starting in scale set mode") + eg.Go(func() error { + if err := m.ss.Loop(ctx); err != nil { + logger.Logf(false, "failed to scaleset manager: %+v", err) + return fmt.Errorf("failed to scaleset loop: %w", err) + } + return nil + }) + } else { + // Webhook mode: use traditional starter + runner loops + logger.Logf(false, "Starting in webhook mode") + eg.Go(func() error { + if err := m.start.Loop(ctx); err != nil { + logger.Logf(false, "failed to starter manager: %+v", err) + return fmt.Errorf("failed to starter loop: %w", err) + } + return nil + }) + eg.Go(func() error { + if err := m.run.Loop(ctx); err != nil { + logger.Logf(false, "failed to runner manager: %+v", err) + return fmt.Errorf("failed to runner loop: %w", err) + } + return nil + }) + } if err := eg.Wait(); err != nil { return fmt.Errorf("failed to wait errgroup: %w", err) diff --git a/docs/scaleset-mode.md b/docs/scaleset-mode.md new file mode 100644 index 0000000..7f793c0 --- /dev/null +++ b/docs/scaleset-mode.md @@ -0,0 +1,198 @@ +# Scale Set Mode + +## Overview + +Scale set mode provides long-polling-driven auto-scaling using the GitHub Actions Runner Scale Set API. It switches communication from the traditional webhook mode (GitHub → myshoes, push) to myshoes → GitHub (pull). + +### Differences from Webhook Mode + +| Item | Webhook Mode (existing) | Scale Set Mode (new) | +|------|------------------------|----------------------| +| **Communication** | GitHub → myshoes (push) | myshoes → GitHub (pull) | +| **Trigger** | GitHub webhook | Long-polling API | +| **Runner registration** | Registration token + config.sh | JIT (Just-In-Time) config | +| **Startup speed** | Normal | Fast (via JIT config) | +| **Job queue** | Used | Not used | +| **Scaling** | Managed by starter/runner loop | Managed by scale set manager | + +## GitHub App Permissions + +Required GitHub App permissions for scale set mode: + +| Permission | Repository scope | Organization scope | Reason | +|-----------|-----------------|-------------------|--------| +| `actions` | Read & Write | Read & Write | Runner registration/deletion (same as existing) | +| `administration` | Read | - | Read repository settings (same as existing) | +| `organization_self_hosted_runners` | - | Read & Write | **Newly required**: For organization-level scale set management | + +**Important changes**: +- Repository-level targets: Work with existing permissions (no changes needed) +- **Organization-level targets**: The `organization_self_hosted_runners` permission is **newly required** + +If you are already using organization-level targets in webhook mode, you need to add this permission. + +Reference: [Authenticating ARC to the GitHub API - GitHub Docs](https://docs.github.com/en/actions/tutorials/use-actions-runner-controller/authenticate-to-the-api) + +## Configuration + +### Environment Variables + +```bash +# Enable scale set mode +SCALESET_ENABLED=true # Enable scale set mode (default: false) +SCALESET_RUNNER_GROUP=default # Runner group name (default: "default") +SCALESET_MAX_RUNNERS=10 # Max runners per scale set (default: 10) +SCALESET_NAME_PREFIX=myshoes # Scale set name prefix (default: "myshoes") + +# Existing environment variables (still required) +GITHUB_APP_ID=123456 +GITHUB_PRIVATE_KEY_BASE64=... +GITHUB_URL=https://github.com # Change for GHES +RUNNER_VERSION=v2.311.0 +RUNNER_USER=runner +RUNNER_BASE_DIRECTORY=/tmp +PLUGIN=./shoes-lxd +# ... other existing settings +``` + +### Behavior When Scale Set Mode Is Enabled + +1. **Web server**: Starts (serves REST API + metrics) +2. **starter.Loop**: Does not start (replaced by scale set scaler) +3. **runner.Loop**: Does not start (replaced by HandleJobCompleted) +4. **scaleset.Manager.Loop**: Starts (new) + +## Web Endpoint Changes + +| Endpoint | Webhook Mode | Scale Set Mode | Reason | +|----------|-------------|----------------|--------| +| `/github/events` (POST) | Required | **Not required** | Does not receive webhooks from GitHub. Webhook URL in GitHub App settings is also unnecessary | +| `/target` (CRUD) | Required | **Required** | Scale set manager reads targets from the datastore | +| `/healthz` | Required | **Required** | Health check | +| `/metrics` | Required | **Required** | Prometheus metrics (scale set specific metrics are also added) | +| `/config/*` | Optional | **Optional** | Runtime configuration changes | + +**Important**: When scale set mode is enabled, the `/github/events` endpoint exists but is not used. You do not need to configure a Webhook URL in your GitHub App settings. + +## Flow Comparison + +### Webhook Mode (existing) + +``` +GitHub Actions → webhook → myshoes → job queue + ↓ + starter loop + ↓ + shoes plugin → runner + ↓ + runner manager (periodic cleanup) +``` + +### Scale Set Mode (new) + +``` +myshoes scale set manager → long-poll GitHub Scale Set API + ↓ (JobAssigned event) + generate JIT config + ↓ + shoes plugin → runner + ↓ (JobCompleted event) + HandleJobCompleted → immediate cleanup +``` + +## JIT Runner Characteristics + +- **No registration token needed**: Authentication credentials are included in the JIT config +- **No config.sh needed**: Starts directly with `./run.sh --jitconfig` +- **No RunnerService.js patch needed**: JIT runners are inherently ephemeral +- **Fast startup**: Token generation and config.sh steps are skipped + +### Comparison with Traditional Setup Script + +| Item | Webhook Mode | Scale Set Mode | +|------|-------------|----------------| +| Registration token retrieval | Required | Not required (included in JIT config) | +| `config.sh --unattended` | Executed | Not required | +| `RunnerService.js` patch | Applied | Not required | +| `--ephemeral`/`--once` flag | Required | Not required (JIT is inherently ephemeral) | +| Startup command | `./run.sh --once` | `./run.sh --jitconfig ` | + +## Compatibility with Existing Shoes Providers + +- **No proto changes**: The JIT config script is passed via the `setupScript` argument to `AddInstance` +- **Transparent support**: Providers handle it as a regular setup script +- **No migration needed**: Existing shoes-lxd, shoes-aws, and shoes-openstack work as-is + +## Scale Set Naming Convention + +- **Format**: `{SCALESET_NAME_PREFIX}-{sanitized-scope}` +- **Examples**: + - org `myorg` → scale set name `myshoes-myorg` + - repo `myorg/myrepo` → scale set name `myshoes-myorg-myrepo` +- **Sanitization**: `/` and other invalid characters are replaced with `-` + +## Prometheus Metrics + +Scale set mode specific metrics: + +| Metric Name | Type | Labels | Description | +|------------|------|--------|-------------| +| `myshoes_scaleset_listener_running` | gauge | target_scope | Number of running scale set listeners | +| `myshoes_scaleset_desired_runners` | gauge | target_scope | Number of desired runners | +| `myshoes_scaleset_active_runners` | gauge | target_scope | Number of active runners | +| `myshoes_scaleset_jobs_completed_total` | counter | target_scope | Total number of completed jobs | +| `myshoes_scaleset_provision_errors_total` | counter | target_scope | Total number of provisioning errors | + +## Limitations and Notes + +1. **Mode exclusivity**: Scale set mode and webhook mode are mutually exclusive (global switch) +2. **Installation required**: GitHub App installation is required to create scale sets +3. **1 target = 1 scale set**: One scale set is created per target +4. **Runner group**: Specified at scale set creation (default: "default") +5. **GHES support**: Supported by setting the GHES URL via the `GITHUB_URL` environment variable +6. **Permissions**: Organization-level targets require additional permission (`organization_self_hosted_runners`) + +## Migration Guide + +### Migrating from Webhook Mode to Scale Set Mode + +1. **Add GitHub App permissions** (if using organization-level targets) + - Add `organization_self_hosted_runners: Read & Write` in your GitHub App settings + +2. **Set environment variables** + ```bash + SCALESET_ENABLED=true + ``` + +3. **Remove Webhook URL** (optional) + - You can safely remove the Webhook URL from your GitHub App settings (it is not used in scale set mode) + +4. **Restart myshoes** + - Restart myshoes to apply the environment variable changes + +5. **Verify operation** + - Confirm that `myshoes_scaleset_*` metrics are output at the `/metrics` endpoint + - Confirm that `Starting in scale set mode` appears in the logs + - Trigger a workflow job and confirm that a runner is provisioned + +### Troubleshooting + +**Scale set is not created** +- Verify the runner group name is correct (default: "default") +- Verify that the GitHub App installation is set up correctly +- Check the logs for `failed to get runner group` errors + +**Runners are not provisioned** +- Check the `myshoes_scaleset_provision_errors_total` metric +- Check the logs for `failed to provision runner` errors +- Verify that the shoes plugin is working correctly + +**Errors with organization-level targets** +- Verify that `organization_self_hosted_runners` has been added to the GitHub App permissions +- Verify that the installation ID is being retrieved correctly + +## References + +- [GitHub Actions Runner Scale Set (scaleset) - GitHub](https://github.com/actions/scaleset) +- [Authenticating ARC to the GitHub API - GitHub Docs](https://docs.github.com/en/actions/tutorials/use-actions-runner-controller/authenticate-to-the-api) +- [Actions Runner Controller (ARC) - GitHub](https://github.com/actions/actions-runner-controller) diff --git a/go.mod b/go.mod index a9f0674..7ccd6c0 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/whywaita/myshoes -go 1.25 +go 1.25.3 require ( github.com/bradleyfalzon/ghinstallation/v2 v2.17.0 @@ -30,6 +30,7 @@ require ( github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c // indirect github.com/Microsoft/go-winio v0.6.2 // indirect github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect + github.com/actions/scaleset v0.1.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect @@ -49,7 +50,10 @@ require ( github.com/google/go-github/v75 v75.0.0 // indirect github.com/google/go-querystring v1.1.0 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/go-hclog v1.6.3 // indirect + github.com/hashicorp/go-retryablehttp v0.7.8 // indirect github.com/hashicorp/yamux v0.1.2 // indirect github.com/kylelemons/godebug v1.1.0 // indirect github.com/mattn/go-colorable v0.1.14 // indirect diff --git a/go.sum b/go.sum index 0ae1540..c26d68e 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,8 @@ github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERo github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= +github.com/actions/scaleset v0.1.0 h1:Rzov5AqcphrQV+VfcPWUAK+hdVJzzJihr/qof1YjZx8= +github.com/actions/scaleset v0.1.0/go.mod h1:ncR5vzCCTUSyLgvclAtZ5dRBgF6qwA2nbTfTXmOJp84= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bradleyfalzon/ghinstallation/v2 v2.17.0 h1:SmbUK/GxpAspRjSQbB6ARvH+ArzlNzTtHydNyXUQ6zg= @@ -73,10 +75,14 @@ github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaU github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= +github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k= github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= github.com/hashicorp/go-plugin v1.7.0 h1:YghfQH/0QmPNc/AZMTFE3ac8fipZyZECHdDPshfk+mA= github.com/hashicorp/go-plugin v1.7.0/go.mod h1:BExt6KEaIYx804z8k4gRzRLEvxKVb+kn0NMcihqOqb8= +github.com/hashicorp/go-retryablehttp v0.7.8 h1:ylXZWnqa7Lhqpk0L1P1LzDtGcCR0rPVUrx/c8Unxc48= +github.com/hashicorp/go-retryablehttp v0.7.8/go.mod h1:rjiScheydd+CxvumBsIrFKlx3iS0jrZ7LvzFGFmuKbw= github.com/hashicorp/go-version v1.8.0 h1:KAkNb1HAiZd1ukkxDFGmokVZe1Xy9HG6NUp+bPle2i4= github.com/hashicorp/go-version v1.8.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/yamux v0.1.2 h1:XtB8kyFOyHXYVFnwT5C3+Bdo8gArse7j2AQ0DA0Uey8= diff --git a/internal/testutils/testutils.go b/internal/testutils/testutils.go index a9582fb..c6a6e19 100644 --- a/internal/testutils/testutils.go +++ b/internal/testutils/testutils.go @@ -60,7 +60,8 @@ func IntegrationTestRunner(m *testing.M) int { createTablesIfNotExist() //SetupDefaultFixtures() - mux := web.NewMux(testDatastore) + // Tests use webhook mode (scale set mode disabled) + mux := web.NewMux(testDatastore, false) ts := httptest.NewServer(mux) testURL = ts.URL diff --git a/pkg/config/config.go b/pkg/config/config.go index 68b024f..95b2417 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -31,6 +31,11 @@ type Conf struct { DockerHubCredential DockerHubCredential ProvideDockerHubMetrics bool + + ScaleSetEnabled bool + ScaleSetRunnerGroup string + ScaleSetMaxRunners int + ScaleSetNamePrefix string } // DockerHubCredential is type of config value @@ -73,6 +78,10 @@ const ( EnvDockerHubUsername = "DOCKER_HUB_USERNAME" EnvDockerHubPassword = "DOCKER_HUB_PASSWORD" EnvProvideDockerHubMetrics = "PROVIDE_DOCKER_HUB_METRICS" + EnvScaleSetEnabled = "SCALESET_ENABLED" + EnvScaleSetRunnerGroup = "SCALESET_RUNNER_GROUP" + EnvScaleSetMaxRunners = "SCALESET_MAX_RUNNERS" + EnvScaleSetNamePrefix = "SCALESET_NAME_PREFIX" ) // ModeWebhookType is type value for GitHub webhook diff --git a/pkg/config/init.go b/pkg/config/init.go index 20554a3..0b64a80 100644 --- a/pkg/config/init.go +++ b/pkg/config/init.go @@ -154,6 +154,30 @@ func LoadWithDefault() Conf { c.ShoesPluginOutputPath = os.Getenv(EnvShoesPluginOutputPath) } + c.ScaleSetEnabled = false + if os.Getenv(EnvScaleSetEnabled) == "true" { + c.ScaleSetEnabled = true + } + + c.ScaleSetRunnerGroup = "default" + if os.Getenv(EnvScaleSetRunnerGroup) != "" { + c.ScaleSetRunnerGroup = os.Getenv(EnvScaleSetRunnerGroup) + } + + c.ScaleSetMaxRunners = 10 + if os.Getenv(EnvScaleSetMaxRunners) != "" { + maxRunners, err := strconv.Atoi(os.Getenv(EnvScaleSetMaxRunners)) + if err != nil { + log.Panicf("failed to parse %s: %+v", EnvScaleSetMaxRunners, err) + } + c.ScaleSetMaxRunners = maxRunners + } + + c.ScaleSetNamePrefix = "myshoes" + if os.Getenv(EnvScaleSetNamePrefix) != "" { + c.ScaleSetNamePrefix = os.Getenv(EnvScaleSetNamePrefix) + } + Config = c return c } diff --git a/pkg/scaleset/manager.go b/pkg/scaleset/manager.go new file mode 100644 index 0000000..a9b5974 --- /dev/null +++ b/pkg/scaleset/manager.go @@ -0,0 +1,306 @@ +package scaleset + +import ( + "context" + "fmt" + "regexp" + "strconv" + "strings" + "sync" + "time" + + "github.com/actions/scaleset" + "github.com/actions/scaleset/listener" + uuid "github.com/satori/go.uuid" + + "github.com/whywaita/myshoes/pkg/datastore" + "github.com/whywaita/myshoes/pkg/gh" + "github.com/whywaita/myshoes/pkg/logger" +) + +// Manager manages scale set lifecycle for all targets +type Manager struct { + ds datastore.Datastore + cfg ManagerConfig + scalers map[uuid.UUID]*targetScalerWrapper + mu sync.RWMutex +} + +// ManagerConfig contains configuration for scale set manager +type ManagerConfig struct { + AppID int64 + PrivateKeyPEM []byte + GitHubURL string + RunnerGroupName string + MaxRunners int + ScaleSetPrefix string + RunnerVersion string + RunnerUser string + RunnerBaseDir string +} + +type targetScalerWrapper struct { + scaler *targetScaler + cancelFunc context.CancelFunc +} + +// New creates a new scale set manager +func New(ds datastore.Datastore, cfg ManagerConfig) *Manager { + return &Manager{ + ds: ds, + cfg: cfg, + scalers: make(map[uuid.UUID]*targetScalerWrapper), + } +} + +// Loop periodically syncs targets and manages scale set listeners +func (m *Manager) Loop(ctx context.Context) error { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + // Initial sync + if err := m.syncTargets(ctx); err != nil { + logger.Logf(false, "[scaleset] initial sync failed: %+v", err) + } + + for { + select { + case <-ctx.Done(): + logger.Logf(false, "[scaleset] manager loop stopped") + m.stopAllListeners() + return ctx.Err() + case <-ticker.C: + if err := m.syncTargets(ctx); err != nil { + logger.Logf(false, "[scaleset] sync failed: %+v", err) + } + } + } +} + +func (m *Manager) syncTargets(ctx context.Context) error { + targets, err := datastore.ListTargets(ctx, m.ds) + if err != nil { + return fmt.Errorf("failed to list targets: %w", err) + } + + m.mu.Lock() + defer m.mu.Unlock() + + // Track active target IDs + activeTargets := make(map[uuid.UUID]bool) + + for _, target := range targets { + activeTargets[target.UUID] = true + + // Check if listener exists and if target config has changed + if wrapper, exists := m.scalers[target.UUID]; exists { + // Check if critical target config has changed + existingTarget := wrapper.scaler.target + if targetConfigChanged(existingTarget, target) { + // Config changed - restart listener with new config + logger.Logf(false, "[scaleset] target config changed for %s, restarting listener", target.Scope) + wrapper.cancelFunc() + delete(m.scalers, target.UUID) + // Will be restarted below + } else { + // Config unchanged - skip restart + continue + } + } + + // Start new listener + if err := m.startListener(ctx, target); err != nil { + logger.Logf(false, "[scaleset] failed to start listener for target %s: %+v", target.Scope, err) + continue + } + } + + // Stop listeners for deleted targets + for targetID, wrapper := range m.scalers { + if !activeTargets[targetID] { + logger.Logf(false, "[scaleset] stopping listener for deleted target %s", targetID) + wrapper.cancelFunc() + delete(m.scalers, targetID) + } + } + + return nil +} + +func (m *Manager) startListener(ctx context.Context, target datastore.Target) error { + logger.Logf(false, "[scaleset] starting listener for target: %s", target.Scope) + + // Resolve installation ID + installationID, err := gh.IsInstalledGitHubApp(ctx, target.Scope) + if err != nil { + return fmt.Errorf("failed to get installation ID: %w", err) + } + + // Create scaleset client + client, err := scaleset.NewClientWithGitHubApp( + scaleset.ClientWithGitHubAppConfig{ + GitHubConfigURL: fmt.Sprintf("%s/%s", m.cfg.GitHubURL, target.Scope), + GitHubAppAuth: scaleset.GitHubAppAuth{ + ClientID: strconv.FormatInt(m.cfg.AppID, 10), + InstallationID: installationID, + PrivateKey: string(m.cfg.PrivateKeyPEM), + }, + }, + ) + if err != nil { + return fmt.Errorf("failed to create scaleset client: %w", err) + } + + // Get or create scale set + scaleSetName := sanitizeScaleSetName(m.cfg.ScaleSetPrefix, target.Scope) + scaleSetID, err := m.ensureScaleSet(ctx, client, scaleSetName, target) + if err != nil { + return fmt.Errorf("failed to ensure scale set: %w", err) + } + + // Create scaler + scaler := &targetScaler{ + ds: m.ds, + target: target, + client: client, + scaleSetID: scaleSetID, + cfg: m.cfg, + } + + // Create message session + session, err := client.MessageSessionClient(ctx, scaleSetID, "myshoes-manager") + if err != nil { + return fmt.Errorf("failed to create message session: %w", err) + } + + // Start listener in background + listenerCtx, cancel := context.WithCancel(context.Background()) + wrapper := &targetScalerWrapper{ + scaler: scaler, + cancelFunc: cancel, + } + m.scalers[target.UUID] = wrapper + + go func() { + targetID := target.UUID + targetScope := target.Scope + + // Clean up scaler from map when listener stops (for any reason) + defer func() { + m.mu.Lock() + // Only delete if this goroutine's wrapper is still the current one. + // When syncTargets restarts a listener for a config change, it stores a + // new wrapper under the same key before the old goroutine exits. Deleting + // unconditionally would remove the replacement and lose tracking. + if current, exists := m.scalers[targetID]; exists && current == wrapper { + delete(m.scalers, targetID) + } + m.mu.Unlock() + logger.Logf(false, "[scaleset] removed stopped listener for target: %s", targetScope) + }() + + logger.Logf(false, "[scaleset] listener started for target: %s", targetScope) + metricScaleSetListenerRunning.WithLabelValues(targetScope).Set(1) + defer metricScaleSetListenerRunning.WithLabelValues(targetScope).Set(0) + + listenerConfig := listener.Config{ + ScaleSetID: scaleSetID, + MaxRunners: m.cfg.MaxRunners, + } + l, err := listener.New(session, listenerConfig) + if err != nil { + logger.Logf(false, "[scaleset] failed to create listener for target %s: %+v", targetScope, err) + return + } + + if err := l.Run(listenerCtx, scaler); err != nil && listenerCtx.Err() == nil { + // Non-cancellation error - log and exit, defer will clean up + logger.Logf(false, "[scaleset] listener error for target %s: %+v", targetScope, err) + } + + logger.Logf(false, "[scaleset] listener stopped for target: %s", targetScope) + }() + + return nil +} + +func (m *Manager) ensureScaleSet(ctx context.Context, client *scaleset.Client, name string, target datastore.Target) (int, error) { + // Get runner group by name + runnerGroup, err := client.GetRunnerGroupByName(ctx, m.cfg.RunnerGroupName) + if err != nil { + return 0, fmt.Errorf("failed to get runner group %s: %w", m.cfg.RunnerGroupName, err) + } + + // Try to get existing scale set + scaleSet, err := client.GetRunnerScaleSet(ctx, runnerGroup.ID, name) + if err == nil && scaleSet != nil { + logger.Logf(false, "[scaleset] found existing scale set: %s (id=%d)", name, scaleSet.ID) + return scaleSet.ID, nil + } + + // Create new scale set + logger.Logf(false, "[scaleset] creating new scale set: %s", name) + + labels := buildScaleSetLabels(name) + + scaleSet, err = client.CreateRunnerScaleSet(ctx, &scaleset.RunnerScaleSet{ + Name: name, + RunnerGroupID: runnerGroup.ID, + Labels: labels, + RunnerSetting: scaleset.RunnerSetting{ + DisableUpdate: true, + }, + }) + if err != nil { + return 0, fmt.Errorf("failed to create scale set: %w", err) + } + + logger.Logf(false, "[scaleset] created scale set: %s (id=%d)", name, scaleSet.ID) + return scaleSet.ID, nil +} + +// targetConfigChanged checks if critical target configuration has changed +func targetConfigChanged(existing, new datastore.Target) bool { + // Compare fields that affect runner provisioning + if existing.ResourceType != new.ResourceType { + return true + } + if existing.ProviderURL != new.ProviderURL { + return true + } + // Status changes are also important (e.g., suspend/resume) + if existing.Status != new.Status { + return true + } + return false +} + +func (m *Manager) stopAllListeners() { + m.mu.Lock() + defer m.mu.Unlock() + + for targetID, wrapper := range m.scalers { + logger.Logf(false, "[scaleset] stopping listener for target: %s", targetID) + wrapper.cancelFunc() + } + m.scalers = make(map[uuid.UUID]*targetScalerWrapper) +} + +// sanitizeScaleSetName creates a valid scale set name from prefix and scope +// Format: {prefix}-{sanitized-scope} +// Example: myshoes-myorg, myshoes-myorg-myrepo +func sanitizeScaleSetName(prefix, scope string) string { + // Replace / and other invalid characters with - + sanitized := regexp.MustCompile(`[^a-zA-Z0-9-]`).ReplaceAllString(scope, "-") + sanitized = strings.Trim(sanitized, "-") + return fmt.Sprintf("%s-%s", prefix, sanitized) +} + +// buildScaleSetLabels creates labels for a scale set. +// Type is left empty so that the library's applyDefaultLabelTypes() sets it to "System". +func buildScaleSetLabels(scaleSetName string) []scaleset.Label { + return []scaleset.Label{ + {Name: "self-hosted"}, + {Name: scaleSetName}, + } +} diff --git a/pkg/scaleset/manager_test.go b/pkg/scaleset/manager_test.go new file mode 100644 index 0000000..37eecdb --- /dev/null +++ b/pkg/scaleset/manager_test.go @@ -0,0 +1,288 @@ +package scaleset + +import ( + "context" + "testing" + "time" + + uuid "github.com/satori/go.uuid" + + "github.com/whywaita/myshoes/pkg/datastore" +) + +// mockDatastoreForManager is a minimal mock for manager tests +type mockDatastoreForManager struct { + datastore.Datastore + targets []datastore.Target + createRunnerCalled bool + deleteRunnerCalled bool +} + +func (m *mockDatastoreForManager) ListTargets(ctx context.Context) ([]datastore.Target, error) { + return m.targets, nil +} + +func (m *mockDatastoreForManager) CreateRunner(ctx context.Context, runner datastore.Runner) error { + m.createRunnerCalled = true + return nil +} + +func (m *mockDatastoreForManager) DeleteRunner(ctx context.Context, id uuid.UUID, deletedAt time.Time, reason datastore.RunnerStatus) error { + m.deleteRunnerCalled = true + return nil +} + +func TestNew(t *testing.T) { + mock := &mockDatastoreForManager{} + cfg := ManagerConfig{ + AppID: 12345, + PrivateKeyPEM: []byte("test-key"), + GitHubURL: "https://github.com", + RunnerGroupName: "default", + MaxRunners: 10, + ScaleSetPrefix: "myshoes", + RunnerVersion: "v2.311.0", + RunnerUser: "runner", + RunnerBaseDir: "/tmp", + } + + manager := New(mock, cfg) + + if manager == nil { + t.Fatal("New() returned nil") + } + + if manager.ds == nil { + t.Error("datastore not set correctly") + } + + if manager.cfg.AppID != cfg.AppID { + t.Error("config not set correctly") + } + + if manager.scalers == nil { + t.Error("scalers map not initialized") + } + + if len(manager.scalers) != 0 { + t.Error("scalers map should be empty initially") + } +} + +func TestSanitizeScaleSetName(t *testing.T) { + tests := []struct { + name string + prefix string + scope string + want string + }{ + { + name: "organization scope", + prefix: "myshoes", + scope: "myorg", + want: "myshoes-myorg", + }, + { + name: "repository scope", + prefix: "myshoes", + scope: "myorg/myrepo", + want: "myshoes-myorg-myrepo", + }, + { + name: "scope with special characters", + prefix: "prefix", + scope: "org/repo-name_v2", + want: "prefix-org-repo-name-v2", + }, + { + name: "scope with dots", + prefix: "test", + scope: "org.com/repo.name", + want: "test-org-com-repo-name", + }, + { + name: "custom prefix", + prefix: "custom-prefix", + scope: "organization", + want: "custom-prefix-organization", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := sanitizeScaleSetName(tt.prefix, tt.scope) + if got != tt.want { + t.Errorf("sanitizeScaleSetName() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestManager_StopAllListeners(t *testing.T) { + mock := &mockDatastoreForManager{} + cfg := ManagerConfig{ + RunnerGroupName: "default", + MaxRunners: 10, + } + + manager := New(mock, cfg) + + // Add some mock scalers + ctx, cancel1 := context.WithCancel(context.Background()) + defer cancel1() + _, cancel2 := context.WithCancel(context.Background()) + + manager.scalers[uuid.NewV4()] = &targetScalerWrapper{ + cancelFunc: cancel1, + } + manager.scalers[uuid.NewV4()] = &targetScalerWrapper{ + cancelFunc: cancel2, + } + + if len(manager.scalers) != 2 { + t.Fatal("setup failed: expected 2 scalers") + } + + manager.stopAllListeners() + + if len(manager.scalers) != 0 { + t.Error("stopAllListeners() should clear all scalers") + } + + // Verify context was cancelled + select { + case <-ctx.Done(): + // Expected: context should be cancelled + default: + t.Error("context should be cancelled after stopAllListeners()") + } +} + +func TestManager_DeferredCleanupDoesNotDeleteReplacement(t *testing.T) { + mock := &mockDatastoreForManager{} + cfg := ManagerConfig{ + RunnerGroupName: "default", + MaxRunners: 10, + } + manager := New(mock, cfg) + + targetID := uuid.NewV4() + + // Simulate: old goroutine's wrapper + _, oldCancel := context.WithCancel(context.Background()) + oldWrapper := &targetScalerWrapper{cancelFunc: oldCancel} + + // Simulate: new wrapper stored by syncTargets after config change + _, newCancel := context.WithCancel(context.Background()) + newWrapper := &targetScalerWrapper{cancelFunc: newCancel} + manager.scalers[targetID] = newWrapper + + // Simulate: old goroutine's deferred cleanup runs, but should NOT delete newWrapper + manager.mu.Lock() + if current, exists := manager.scalers[targetID]; exists && current == oldWrapper { + delete(manager.scalers, targetID) + } + manager.mu.Unlock() + + // newWrapper should still be in the map + if _, exists := manager.scalers[targetID]; !exists { + t.Error("deferred cleanup should not delete replacement wrapper") + } + if manager.scalers[targetID] != newWrapper { + t.Error("scalers map should still contain the new wrapper") + } +} + +func TestManager_DeferredCleanupDeletesOwnWrapper(t *testing.T) { + mock := &mockDatastoreForManager{} + cfg := ManagerConfig{ + RunnerGroupName: "default", + MaxRunners: 10, + } + manager := New(mock, cfg) + + targetID := uuid.NewV4() + + // Simulate: goroutine's wrapper is still the current one (no replacement) + _, cancel := context.WithCancel(context.Background()) + wrapper := &targetScalerWrapper{cancelFunc: cancel} + manager.scalers[targetID] = wrapper + + // Simulate: deferred cleanup should delete because it IS the same wrapper + manager.mu.Lock() + if current, exists := manager.scalers[targetID]; exists && current == wrapper { + delete(manager.scalers, targetID) + } + manager.mu.Unlock() + + if _, exists := manager.scalers[targetID]; exists { + t.Error("deferred cleanup should delete own wrapper when no replacement exists") + } +} + +func TestBuildScaleSetLabels(t *testing.T) { + const scaleSetName = "myshoes-myorg-myrepo" + + labels := buildScaleSetLabels(scaleSetName) + + if len(labels) != 2 { + t.Fatalf("expected 2 labels, got %d", len(labels)) + } + + // Type must be empty so that the library sets it to "System" + for i, l := range labels { + if l.Type != "" { + t.Errorf("labels[%d].Type = %q, want empty string", i, l.Type) + } + } + + if labels[0].Name != "self-hosted" { + t.Errorf("labels[0].Name = %q, want %q", labels[0].Name, "self-hosted") + } + if labels[1].Name != scaleSetName { + t.Errorf("labels[1].Name = %q, want %q", labels[1].Name, scaleSetName) + } +} + +func TestManagerConfig_Fields(t *testing.T) { + cfg := ManagerConfig{ + AppID: 12345, + PrivateKeyPEM: []byte("test-key"), + GitHubURL: "https://github.com", + RunnerGroupName: "custom-group", + MaxRunners: 25, + ScaleSetPrefix: "custom-prefix", + RunnerVersion: "v2.311.0", + RunnerUser: "ubuntu", + RunnerBaseDir: "/home/ubuntu/runner", + } + + // Verify all fields are accessible + if cfg.AppID != 12345 { + t.Error("AppID not set correctly") + } + if string(cfg.PrivateKeyPEM) != "test-key" { + t.Error("PrivateKeyPEM not set correctly") + } + if cfg.GitHubURL != "https://github.com" { + t.Error("GitHubURL not set correctly") + } + if cfg.RunnerGroupName != "custom-group" { + t.Error("RunnerGroupName not set correctly") + } + if cfg.MaxRunners != 25 { + t.Error("MaxRunners not set correctly") + } + if cfg.ScaleSetPrefix != "custom-prefix" { + t.Error("ScaleSetPrefix not set correctly") + } + if cfg.RunnerVersion != "v2.311.0" { + t.Error("RunnerVersion not set correctly") + } + if cfg.RunnerUser != "ubuntu" { + t.Error("RunnerUser not set correctly") + } + if cfg.RunnerBaseDir != "/home/ubuntu/runner" { + t.Error("RunnerBaseDir not set correctly") + } +} diff --git a/pkg/scaleset/metrics.go b/pkg/scaleset/metrics.go new file mode 100644 index 0000000..36d6e49 --- /dev/null +++ b/pkg/scaleset/metrics.go @@ -0,0 +1,60 @@ +package scaleset + +import "github.com/prometheus/client_golang/prometheus" + +var ( + // metricScaleSetListenerRunning is a gauge for scale set listeners + metricScaleSetListenerRunning = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "myshoes_scaleset_listener_running", + Help: "Number of running scale set listeners per target", + }, + []string{"target_scope"}, + ) + + // metricScaleSetDesiredRunners is a gauge for desired runners + metricScaleSetDesiredRunners = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "myshoes_scaleset_desired_runners", + Help: "Number of desired runners per target", + }, + []string{"target_scope"}, + ) + + // metricScaleSetActiveRunners is a gauge for active runners + metricScaleSetActiveRunners = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "myshoes_scaleset_active_runners", + Help: "Number of active runners per target", + }, + []string{"target_scope"}, + ) + + // metricScaleSetJobsCompletedTotal is a counter for completed jobs + metricScaleSetJobsCompletedTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "myshoes_scaleset_jobs_completed_total", + Help: "Total number of completed jobs per target", + }, + []string{"target_scope"}, + ) + + // metricScaleSetProvisionErrorsTotal is a counter for provision errors + metricScaleSetProvisionErrorsTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "myshoes_scaleset_provision_errors_total", + Help: "Total number of provision errors per target", + }, + []string{"target_scope"}, + ) +) + +func init() { + prometheus.MustRegister( + metricScaleSetListenerRunning, + metricScaleSetDesiredRunners, + metricScaleSetActiveRunners, + metricScaleSetJobsCompletedTotal, + metricScaleSetProvisionErrorsTotal, + ) +} diff --git a/pkg/scaleset/scaler.go b/pkg/scaleset/scaler.go new file mode 100644 index 0000000..d2a4239 --- /dev/null +++ b/pkg/scaleset/scaler.go @@ -0,0 +1,245 @@ +package scaleset + +import ( + "context" + "database/sql" + "fmt" + "sync" + "time" + + "github.com/actions/scaleset" + uuid "github.com/satori/go.uuid" + + "github.com/whywaita/myshoes/pkg/datastore" + "github.com/whywaita/myshoes/pkg/logger" + "github.com/whywaita/myshoes/pkg/runner" + "github.com/whywaita/myshoes/pkg/shoes" +) + +type runnerInfo struct { + runnerID uuid.UUID + cloudID string + labels []string + createdAt time.Time +} + +// targetScaler implements listener.Scaler interface +type targetScaler struct { + ds datastore.Datastore + target datastore.Target + client *scaleset.Client + scaleSetID int + cfg ManagerConfig + activeRunners sync.Map // runner name -> runnerInfo +} + +// HandleDesiredRunnerCount is called when scale set controller wants to scale up/down +func (ts *targetScaler) HandleDesiredRunnerCount(ctx context.Context, count int) (int, error) { + current := ts.getActiveRunnerCount() + logger.Logf(false, "[scaleset] HandleDesiredRunnerCount: target=%s, desired=%d, current=%d", ts.target.Scope, count, current) + + metricScaleSetDesiredRunners.WithLabelValues(ts.target.Scope).Set(float64(count)) + + if count > current { + // Scale up + toProvision := count - current + for i := 0; i < toProvision; i++ { + if err := ts.provisionRunner(ctx); err != nil { + logger.Logf(false, "[scaleset] failed to provision runner: %+v", err) + metricScaleSetProvisionErrorsTotal.WithLabelValues(ts.target.Scope).Inc() + // Continue provisioning other runners even if one fails + } + } + } + // Scale down is handled by HandleJobCompleted (ephemeral runners terminate automatically) + + actualCount := ts.getActiveRunnerCount() + metricScaleSetActiveRunners.WithLabelValues(ts.target.Scope).Set(float64(actualCount)) + + return actualCount, nil +} + +// HandleJobStarted is called when a job starts on a runner +func (ts *targetScaler) HandleJobStarted(ctx context.Context, event *scaleset.JobStarted) error { + logger.Logf(false, "[scaleset] HandleJobStarted: target=%s, runner=%s, job=%s", + ts.target.Scope, event.RunnerName, event.JobID) + + // Update metrics and log only + return nil +} + +// HandleJobCompleted is called when a job completes on a runner +func (ts *targetScaler) HandleJobCompleted(ctx context.Context, event *scaleset.JobCompleted) error { + logger.Logf(false, "[scaleset] HandleJobCompleted: target=%s, runner=%s, job=%s", + ts.target.Scope, event.RunnerName, event.JobID) + + metricScaleSetJobsCompletedTotal.WithLabelValues(ts.target.Scope).Inc() + + // Find runner info by name + runnerInfoRaw, ok := ts.activeRunners.Load(event.RunnerName) + if !ok { + // Runner not in memory (e.g., after process restart or listener re-creation). + // Fall back to datastore lookup to avoid leaking instances and stale rows. + logger.Logf(false, "[scaleset] runner %s not found in active runners, falling back to datastore", event.RunnerName) + return ts.handleJobCompletedFromDatastore(ctx, event) + } + + info := runnerInfoRaw.(runnerInfo) + + // Delete instance via shoes plugin + shoesClient, cleanup, err := shoes.GetClient() + if err != nil { + return fmt.Errorf("failed to get shoes client: %w", err) + } + defer cleanup() + + if err := shoesClient.DeleteInstance(ctx, info.cloudID, info.labels); err != nil { + // Preserve runner state on deletion failure to allow retry + // Without this, transient cloud/plugin errors leave orphaned instances + logger.Logf(false, "[scaleset] failed to delete instance (preserving state for retry): %+v", err) + return fmt.Errorf("failed to delete instance %s: %w", info.cloudID, err) + } + + // Only update datastore and remove from tracking after successful deletion + if err := ts.ds.DeleteRunner(ctx, info.runnerID, time.Now(), datastore.RunnerStatusCompleted); err != nil { + logger.Logf(false, "[scaleset] failed to delete runner from datastore: %+v", err) + // Continue even if datastore update fails - instance is already deleted + } + + // Remove from active runners + ts.activeRunners.Delete(event.RunnerName) + + // Update metrics + actualCount := ts.getActiveRunnerCount() + metricScaleSetActiveRunners.WithLabelValues(ts.target.Scope).Set(float64(actualCount)) + + return nil +} + +// handleJobCompletedFromDatastore cleans up a runner whose metadata is not in +// the in-memory activeRunners map (e.g., after a process restart or listener +// re-creation). It resolves the runner from the datastore and performs instance +// deletion and datastore cleanup. +func (ts *targetScaler) handleJobCompletedFromDatastore(ctx context.Context, event *scaleset.JobCompleted) error { + runnerUUID, err := runner.ToUUID(event.RunnerName) + if err != nil { + return fmt.Errorf("failed to parse runner UUID from name %s: %w", event.RunnerName, err) + } + + r, err := ts.ds.GetRunner(ctx, runnerUUID) + if err != nil { + return fmt.Errorf("failed to get runner %s from datastore: %w", runnerUUID, err) + } + + // Delete instance via shoes plugin + shoesClient, cleanup, err := shoes.GetClient() + if err != nil { + return fmt.Errorf("failed to get shoes client: %w", err) + } + defer cleanup() + + labels := []string{"scaleset", ts.target.Scope} + if err := shoesClient.DeleteInstance(ctx, r.CloudID, labels); err != nil { + logger.Logf(false, "[scaleset] failed to delete instance (preserving state for retry): %+v", err) + return fmt.Errorf("failed to delete instance %s: %w", r.CloudID, err) + } + + if err := ts.ds.DeleteRunner(ctx, r.UUID, time.Now(), datastore.RunnerStatusCompleted); err != nil { + logger.Logf(false, "[scaleset] failed to delete runner from datastore: %+v", err) + } + + return nil +} + +func (ts *targetScaler) provisionRunner(ctx context.Context) error { + runnerID := uuid.NewV4() + runnerName := runner.ToName(runnerID.String()) + + logger.Logf(false, "[scaleset] provisioning runner: %s", runnerName) + + // Generate JIT config + setting := &scaleset.RunnerScaleSetJitRunnerSetting{ + Name: runnerName, + WorkFolder: ts.cfg.RunnerBaseDir, + } + jitConfig, err := ts.client.GenerateJitRunnerConfig(ctx, setting, ts.scaleSetID) + if err != nil { + return fmt.Errorf("failed to generate JIT config: %w", err) + } + + // Generate setup script with JIT config + setupScript, err := GetJITSetupScript( + jitConfig.EncodedJITConfig, + ts.cfg.RunnerVersion, + ts.cfg.RunnerUser, + ts.cfg.RunnerBaseDir, + ) + if err != nil { + return fmt.Errorf("failed to generate JIT setup script: %w", err) + } + + // Create labels based on target scope + var labels []string + labels = append(labels, "scaleset") + labels = append(labels, ts.target.Scope) + + // Provision instance via shoes plugin + shoesClient, cleanup, err := shoes.GetClient() + if err != nil { + return fmt.Errorf("failed to get shoes client: %w", err) + } + defer cleanup() + + cloudID, ipAddress, shoesType, resourceType, err := shoesClient.AddInstance( + ctx, + runnerName, + setupScript, + ts.target.ResourceType, + labels, + ) + if err != nil { + return fmt.Errorf("failed to add instance: %w", err) + } + + // Store runner info in datastore + r := datastore.Runner{ + UUID: runnerID, + ShoesType: shoesType, + IPAddress: ipAddress, + TargetID: ts.target.UUID, + CloudID: cloudID, + Deleted: false, + Status: datastore.RunnerStatusCreated, + ResourceType: resourceType, + RunnerUser: sql.NullString{String: ts.cfg.RunnerUser, Valid: true}, + ProviderURL: ts.target.ProviderURL, + RepositoryURL: fmt.Sprintf("%s/%s", ts.cfg.GitHubURL, ts.target.Scope), + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + + if err := ts.ds.CreateRunner(ctx, r); err != nil { + logger.Logf(false, "[scaleset] failed to create runner in datastore: %+v", err) + } + + // Store in active runners + ts.activeRunners.Store(runnerName, runnerInfo{ + runnerID: runnerID, + cloudID: cloudID, + labels: labels, + createdAt: time.Now(), + }) + + logger.Logf(false, "[scaleset] runner provisioned: %s (cloud_id=%s)", runnerName, cloudID) + + return nil +} + +func (ts *targetScaler) getActiveRunnerCount() int { + count := 0 + ts.activeRunners.Range(func(key, value interface{}) bool { + count++ + return true + }) + return count +} diff --git a/pkg/scaleset/scaler_test.go b/pkg/scaleset/scaler_test.go new file mode 100644 index 0000000..5b117a9 --- /dev/null +++ b/pkg/scaleset/scaler_test.go @@ -0,0 +1,324 @@ +package scaleset + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/actions/scaleset" + uuid "github.com/satori/go.uuid" + + "github.com/whywaita/myshoes/pkg/datastore" + "github.com/whywaita/myshoes/pkg/runner" +) + +// mockDatastore is a minimal mock for testing +type mockDatastore struct { + datastore.Datastore + createRunnerCalled bool + deleteRunnerCalled bool + runners map[uuid.UUID]*datastore.Runner +} + +func (m *mockDatastore) CreateRunner(ctx context.Context, runner datastore.Runner) error { + m.createRunnerCalled = true + return nil +} + +func (m *mockDatastore) DeleteRunner(ctx context.Context, id uuid.UUID, deletedAt time.Time, reason datastore.RunnerStatus) error { + m.deleteRunnerCalled = true + return nil +} + +func (m *mockDatastore) GetRunner(ctx context.Context, id uuid.UUID) (*datastore.Runner, error) { + if m.runners == nil { + return nil, fmt.Errorf("runner not found: %s", id) + } + r, ok := m.runners[id] + if !ok { + return nil, fmt.Errorf("runner not found: %s", id) + } + return r, nil +} + +func TestTargetScaler_GetActiveRunnerCount(t *testing.T) { + ts := &targetScaler{} + + // Initially empty + if count := ts.getActiveRunnerCount(); count != 0 { + t.Errorf("expected 0 active runners, got %d", count) + } + + // Add some runners + ts.activeRunners.Store("runner-1", runnerInfo{runnerID: uuid.NewV4()}) + ts.activeRunners.Store("runner-2", runnerInfo{runnerID: uuid.NewV4()}) + + if count := ts.getActiveRunnerCount(); count != 2 { + t.Errorf("expected 2 active runners, got %d", count) + } + + // Remove one + ts.activeRunners.Delete("runner-1") + + if count := ts.getActiveRunnerCount(); count != 1 { + t.Errorf("expected 1 active runner, got %d", count) + } +} + +func TestTargetScaler_HandleDesiredRunnerCount(t *testing.T) { + tests := []struct { + name string + currentCount int + desiredCount int + expectIncrease bool + }{ + { + name: "no change - same count", + currentCount: 5, + desiredCount: 5, + expectIncrease: false, + }, + { + name: "scale down (no-op for ephemeral)", + currentCount: 10, + desiredCount: 5, + expectIncrease: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mock := &mockDatastore{} + ts := &targetScaler{ + ds: mock, + target: datastore.Target{ + UUID: uuid.NewV4(), + Scope: "test-org", + }, + cfg: ManagerConfig{ + RunnerVersion: "v2.311.0", + RunnerUser: "runner", + RunnerBaseDir: "/tmp", + }, + } + + // Simulate current runners + for i := 0; i < tt.currentCount; i++ { + ts.activeRunners.Store("runner-"+string(rune(i)), runnerInfo{ + runnerID: uuid.NewV4(), + }) + } + + ctx := context.Background() + actualCount, err := ts.HandleDesiredRunnerCount(ctx, tt.desiredCount) + + // Without scale up, should not error + if err != nil { + t.Errorf("HandleDesiredRunnerCount() error = %v, want nil", err) + } + + // Count should remain the same when not scaling up + if actualCount != tt.currentCount { + t.Errorf("actual count = %d, want %d", actualCount, tt.currentCount) + } + }) + } +} + +// TestTargetScaler_GetActiveRunnerCount tests the runner count logic +func TestTargetScaler_HandleDesiredRunnerCount_Logic(t *testing.T) { + ts := &targetScaler{ + target: datastore.Target{ + UUID: uuid.NewV4(), + Scope: "test-org", + }, + } + + // Test scale up detection (without actual provisioning) + current := ts.getActiveRunnerCount() + if current != 0 { + t.Errorf("expected 0 current runners, got %d", current) + } + + desired := 5 + toProvision := desired - current + if toProvision != 5 { + t.Errorf("expected to provision 5 runners, got %d", toProvision) + } + + // Test that scale down is no-op + for i := 0; i < 10; i++ { + ts.activeRunners.Store("runner-"+string(rune(i)), runnerInfo{ + runnerID: uuid.NewV4(), + }) + } + + current = ts.getActiveRunnerCount() + desired = 5 + if desired < current { + // This is the scale down case - should be no-op + toProvision = 0 + } else { + toProvision = desired - current + } + + if toProvision != 0 { + t.Errorf("scale down should be no-op, but got toProvision=%d", toProvision) + } +} + +func TestTargetScaler_HandleJobCompleted(t *testing.T) { + mock := &mockDatastore{} + ts := &targetScaler{ + ds: mock, + target: datastore.Target{ + UUID: uuid.NewV4(), + Scope: "test-org", + }, + } + + // Add a runner to active runners + runnerName := "test-runner" + runnerID := uuid.NewV4() + ts.activeRunners.Store(runnerName, runnerInfo{ + runnerID: runnerID, + cloudID: "cloud-123", + labels: []string{"test"}, + }) + + ctx := context.Background() + event := &scaleset.JobCompleted{ + RunnerName: runnerName, + JobMessageBase: scaleset.JobMessageBase{ + JobID: "job-123", + }, + } + + // Note: without real shoes client, deletion will fail and return early + // We're testing the logic structure, not the actual deletion + err := ts.HandleJobCompleted(ctx, event) + if err == nil { + t.Error("HandleJobCompleted() should error without real shoes client") + } + + // Runner won't be removed because shoes client fails early + // This is expected behavior in test environment +} + +func TestTargetScaler_HandleJobCompleted_FallbackToDatastore(t *testing.T) { + runnerUUID := uuid.NewV4() + runnerName := runner.ToName(runnerUUID.String()) + + mock := &mockDatastore{ + runners: map[uuid.UUID]*datastore.Runner{ + runnerUUID: { + UUID: runnerUUID, + CloudID: "cloud-456", + }, + }, + } + ts := &targetScaler{ + ds: mock, + target: datastore.Target{ + UUID: uuid.NewV4(), + Scope: "test-org", + }, + } + + // activeRunners is empty (simulates process restart) + ctx := context.Background() + event := &scaleset.JobCompleted{ + RunnerName: runnerName, + JobMessageBase: scaleset.JobMessageBase{ + JobID: "job-456", + }, + } + + // Should fall back to datastore and attempt cleanup + // shoes client is not configured, so deletion will fail with an error + err := ts.HandleJobCompleted(ctx, event) + if err == nil { + t.Error("HandleJobCompleted() should error without real shoes client, but should reach shoes client call") + } + + // Verify the runner was NOT just silently skipped (the old behavior would return nil) + // The error should be from shoes client, not from datastore lookup + if err != nil && err.Error() == "" { + t.Error("expected non-empty error message") + } +} + +func TestTargetScaler_HandleJobCompleted_FallbackDatastoreNotFound(t *testing.T) { + mock := &mockDatastore{} + ts := &targetScaler{ + ds: mock, + target: datastore.Target{ + UUID: uuid.NewV4(), + Scope: "test-org", + }, + } + + runnerUUID := uuid.NewV4() + runnerName := runner.ToName(runnerUUID.String()) + + // activeRunners is empty, datastore also doesn't have it + ctx := context.Background() + event := &scaleset.JobCompleted{ + RunnerName: runnerName, + JobMessageBase: scaleset.JobMessageBase{ + JobID: "job-789", + }, + } + + err := ts.HandleJobCompleted(ctx, event) + if err == nil { + t.Error("HandleJobCompleted() should error when runner not found in datastore") + } +} + +func TestTargetScaler_HandleJobCompleted_InvalidRunnerName(t *testing.T) { + mock := &mockDatastore{} + ts := &targetScaler{ + ds: mock, + target: datastore.Target{ + UUID: uuid.NewV4(), + Scope: "test-org", + }, + } + + // activeRunners is empty, runner name is not in myshoes format + ctx := context.Background() + event := &scaleset.JobCompleted{ + RunnerName: "invalid-not-a-uuid", + JobMessageBase: scaleset.JobMessageBase{ + JobID: "job-000", + }, + } + + err := ts.HandleJobCompleted(ctx, event) + if err == nil { + t.Error("HandleJobCompleted() should error when runner name cannot be parsed to UUID") + } +} + +func TestTargetScaler_HandleJobStarted(t *testing.T) { + ts := &targetScaler{ + target: datastore.Target{ + Scope: "test-org", + }, + } + + ctx := context.Background() + event := &scaleset.JobStarted{ + RunnerName: "test-runner", + JobMessageBase: scaleset.JobMessageBase{ + JobID: "job-123", + }, + } + + // HandleJobStarted should not return error (it only logs) + if err := ts.HandleJobStarted(ctx, event); err != nil { + t.Errorf("HandleJobStarted() error = %v, want nil", err) + } +} diff --git a/pkg/scaleset/scripts.go b/pkg/scaleset/scripts.go new file mode 100644 index 0000000..b39d421 --- /dev/null +++ b/pkg/scaleset/scripts.go @@ -0,0 +1,180 @@ +package scaleset + +import ( + "bytes" + "compress/gzip" + "encoding/base64" + "fmt" + "text/template" +) + +// GetJITSetupScript generates a simplified setup script for JIT (Just-In-Time) runner +// JIT config eliminates the need for registration token, config.sh, and RunnerService.js patch +func GetJITSetupScript(encodedJITConfig, runnerVersion, runnerUser, runnerBaseDir string) (string, error) { + rawScript, err := getJITRawScript(encodedJITConfig, runnerVersion, runnerUser, runnerBaseDir) + if err != nil { + return "", fmt.Errorf("failed to get raw JIT script: %w", err) + } + + var compressedScript bytes.Buffer + gz := gzip.NewWriter(&compressedScript) + if _, err := gz.Write([]byte(rawScript)); err != nil { + return "", fmt.Errorf("failed to compress gzip: %w", err) + } + if err := gz.Flush(); err != nil { + return "", fmt.Errorf("failed to flush gzip: %w", err) + } + if err := gz.Close(); err != nil { + return "", fmt.Errorf("failed to close gzip: %w", err) + } + encoded := base64.StdEncoding.EncodeToString(compressedScript.Bytes()) + + v := templateCompressedScriptValue{ + CompressedScript: encoded, + RunnerBaseDirectory: runnerBaseDir, + } + + t, err := template.New("templateCompressedScript").Parse(templateCompressedScript) + if err != nil { + return "", fmt.Errorf("failed to create template: %w", err) + } + var buff bytes.Buffer + if err := t.Execute(&buff, v); err != nil { + return "", fmt.Errorf("failed to execute compressed script: %w", err) + } + return buff.String(), nil +} + +func getJITRawScript(encodedJITConfig, runnerVersion, runnerUser, runnerBaseDir string) (string, error) { + v := templateJITRunnerValue{ + EncodedJITConfig: encodedJITConfig, + RunnerVersion: runnerVersion, + RunnerUser: runnerUser, + RunnerBaseDirectory: runnerBaseDir, + } + + t, err := template.New("templateJITRunner").Parse(templateJITRunner) + if err != nil { + return "", fmt.Errorf("failed to create template: %w", err) + } + var buff bytes.Buffer + if err := t.Execute(&buff, v); err != nil { + return "", fmt.Errorf("failed to execute JIT script: %w", err) + } + return buff.String(), nil +} + +type templateCompressedScriptValue struct { + CompressedScript string + RunnerBaseDirectory string +} + +const templateCompressedScript = `#!/bin/bash + +set -e + +# main script compressed base64 and gzip +export COMPRESSED_SCRIPT={{.CompressedScript}} +export MAIN_SCRIPT_PATH={{.RunnerBaseDirectory}}/main.sh + +echo ${COMPRESSED_SCRIPT} | base64 -d | gzip -d > ${MAIN_SCRIPT_PATH} + +chmod +x ${MAIN_SCRIPT_PATH} +bash -c ${MAIN_SCRIPT_PATH}` + +type templateJITRunnerValue struct { + EncodedJITConfig string + RunnerVersion string + RunnerUser string + RunnerBaseDirectory string +} + +// templateJITRunner is a simplified script template for JIT runner +// Unlike traditional registration, JIT runner only needs to: +// 1. Download runner binary +// 2. Run with --jitconfig flag +const templateJITRunner = `#!/bin/bash + +set -e + +RUNNER_VERSION={{.RunnerVersion}} +RUNNER_USER={{.RunnerUser}} +RUNNER_BASE_DIRECTORY={{.RunnerBaseDirectory}} +JIT_CONFIG={{.EncodedJITConfig}} + +sudo_prefix="" +if [ $(id -u) -eq 0 ]; then + sudo_prefix="sudo -E -u ${RUNNER_USER} " +fi + +echo "Configuring JIT runner" + +#--------------------------------------- +# Validate Environment +#--------------------------------------- +runner_plat=linux +[ ! -z "$(which sw_vers)" ] && runner_plat=osx; + +function fatal() +{ + echo "error: $1" >&2 + exit 1 +} + +function configure_environment() +{ + export HOME="/home/${RUNNER_USER}" + if [ "${runner_plat}" = "osx" ]; then + export HOME="/Users/${RUNNER_USER}" + fi +} + +function get_runner_file_name() +{ + runner_version=$1 + runner_plat=$2 + + trimmed_runner_version=$(echo ${RUNNER_VERSION:1}) + + if [ "${runner_plat}" = "linux" ]; then + echo "actions-runner-${runner_plat}-x64-${trimmed_runner_version}.tar.gz" + fi + + if [ "${runner_plat}" = "osx" ]; then + runner_arch=x64 + [ "$(uname -m)" = "arm64" ] && runner_arch=arm64; + echo "actions-runner-${runner_plat}-${runner_arch}-${trimmed_runner_version}.tar.gz" + fi +} + +function download_runner() +{ + runner_version=$1 + runner_file=$2 + + cd ${RUNNER_BASE_DIRECTORY} + + echo "Downloading ${runner_file}" + curl -sSL -o ${runner_file} https://github.com/actions/runner/releases/download/${runner_version}/${runner_file} + + ls -la *.tar.gz + tar xzf ./${runner_file} +} + +configure_environment + +runner_file=$(get_runner_file_name ${RUNNER_VERSION} ${runner_plat}) + +cd ${RUNNER_BASE_DIRECTORY} + +# Check if runner binary already exists +if [ ! -f "./run.sh" ]; then + download_runner ${RUNNER_VERSION} ${runner_file} +fi + +#--------------------------------------- +# Run with JIT config +#--------------------------------------- +echo "Starting runner with JIT config" +${sudo_prefix}./run.sh --jitconfig "${JIT_CONFIG}" +` diff --git a/pkg/scaleset/scripts_test.go b/pkg/scaleset/scripts_test.go new file mode 100644 index 0000000..e2295b6 --- /dev/null +++ b/pkg/scaleset/scripts_test.go @@ -0,0 +1,174 @@ +package scaleset + +import ( + "strings" + "testing" +) + +func TestGetJITSetupScript(t *testing.T) { + tests := []struct { + name string + encodedJITConfig string + runnerVersion string + runnerUser string + runnerBaseDir string + }{ + { + name: "basic JIT script generation", + encodedJITConfig: "eyJ0ZXN0IjoidmFsdWUifQ==", // base64 encoded JSON + runnerVersion: "v2.311.0", + runnerUser: "runner", + runnerBaseDir: "/tmp", + }, + { + name: "custom runner user", + encodedJITConfig: "test-config", + runnerVersion: "v2.311.0", + runnerUser: "ubuntu", + runnerBaseDir: "/home/ubuntu/actions-runner", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + script, err := GetJITSetupScript( + tt.encodedJITConfig, + tt.runnerVersion, + tt.runnerUser, + tt.runnerBaseDir, + ) + if err != nil { + t.Fatalf("GetJITSetupScript() error = %v", err) + } + + if script == "" { + t.Fatal("GetJITSetupScript() returned empty script") + } + + // Check that script starts with shebang + if !strings.HasPrefix(script, "#!/bin/bash") { + t.Error("script should start with #!/bin/bash") + } + + // Check compression infrastructure + if !strings.Contains(script, "COMPRESSED_SCRIPT") { + t.Error("script should contain COMPRESSED_SCRIPT variable") + } + + if !strings.Contains(script, "base64 -d") { + t.Error("script should contain base64 decoding") + } + + if !strings.Contains(script, "gzip -d") { + t.Error("script should contain gzip decompression") + } + }) + } +} + +func TestGetJITRawScript(t *testing.T) { + tests := []struct { + name string + encodedJITConfig string + runnerVersion string + runnerUser string + runnerBaseDir string + wantContains []string + wantNotContains []string + }{ + { + name: "basic JIT raw script", + encodedJITConfig: "eyJ0ZXN0IjoidmFsdWUifQ==", + runnerVersion: "v2.311.0", + runnerUser: "runner", + runnerBaseDir: "/tmp", + wantContains: []string{ + "#!/bin/bash", + "set -e", + "RUNNER_VERSION=v2.311.0", + "RUNNER_USER=runner", + "RUNNER_BASE_DIRECTORY=/tmp", + "JIT_CONFIG=eyJ0ZXN0IjoidmFsdWUifQ==", + "./run.sh --jitconfig", + }, + wantNotContains: []string{ + "config.sh", // JIT doesn't use config.sh + "RUNNER_TOKEN", // JIT doesn't use registration token + "RunnerService.js", // JIT doesn't need patch + "--ephemeral", // JIT is inherently ephemeral + "--once", // JIT doesn't use --once flag + }, + }, + { + name: "custom runner user", + encodedJITConfig: "test-config", + runnerVersion: "v2.311.0", + runnerUser: "ubuntu", + runnerBaseDir: "/home/ubuntu/actions-runner", + wantContains: []string{ + "RUNNER_USER=ubuntu", + "RUNNER_BASE_DIRECTORY=/home/ubuntu/actions-runner", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + script, err := getJITRawScript( + tt.encodedJITConfig, + tt.runnerVersion, + tt.runnerUser, + tt.runnerBaseDir, + ) + if err != nil { + t.Fatalf("getJITRawScript() error = %v", err) + } + + if script == "" { + t.Fatal("getJITRawScript() returned empty script") + } + + // Check expected content + for _, want := range tt.wantContains { + if !strings.Contains(script, want) { + t.Errorf("script should contain %q", want) + } + } + + // Check excluded content + for _, notWant := range tt.wantNotContains { + if strings.Contains(script, notWant) { + t.Errorf("script should NOT contain %q", notWant) + } + } + }) + } +} + +func TestGetJITSetupScriptCompression(t *testing.T) { + // Test that script is properly compressed and decompressed + encodedJITConfig := "test-jit-config-value" + runnerVersion := "v2.311.0" + runnerUser := "runner" + runnerBaseDir := "/tmp" + + script, err := GetJITSetupScript(encodedJITConfig, runnerVersion, runnerUser, runnerBaseDir) + if err != nil { + t.Fatalf("GetJITSetupScript() error = %v", err) + } + + // Compressed script should contain base64 decoding + if !strings.Contains(script, "base64 -d") { + t.Error("script should contain base64 decoding") + } + + // Compressed script should contain gzip decompression + if !strings.Contains(script, "gzip -d") { + t.Error("script should contain gzip decompression") + } + + // Should reference COMPRESSED_SCRIPT variable + if !strings.Contains(script, "COMPRESSED_SCRIPT") { + t.Error("script should reference COMPRESSED_SCRIPT variable") + } +} diff --git a/pkg/web/http.go b/pkg/web/http.go index 8251d93..c66d392 100644 --- a/pkg/web/http.go +++ b/pkg/web/http.go @@ -16,7 +16,7 @@ import ( ) // NewMux create routed mux -func NewMux(ds datastore.Datastore) *goji.Mux { +func NewMux(ds datastore.Datastore, scaleSetEnabled bool) *goji.Mux { mux := goji.NewMux() mux.HandleFunc(pat.Get("/healthz"), func(w http.ResponseWriter, r *http.Request) { @@ -32,10 +32,14 @@ func NewMux(ds datastore.Datastore) *goji.Mux { json.NewEncoder(w).Encode(h) }) - mux.HandleFunc(pat.Post("/github/events"), func(w http.ResponseWriter, r *http.Request) { - apacheLogging(r) - HandleGitHubEvent(w, r, ds) - }) + // Only enable webhook endpoint in webhook mode + // In scale set mode, jobs are received via long-polling, not webhooks + if !scaleSetEnabled { + mux.HandleFunc(pat.Post("/github/events"), func(w http.ResponseWriter, r *http.Request) { + apacheLogging(r) + HandleGitHubEvent(w, r, ds) + }) + } // REST API for targets mux.HandleFunc(pat.Post("/target"), func(w http.ResponseWriter, r *http.Request) { @@ -80,7 +84,7 @@ func NewMux(ds datastore.Datastore) *goji.Mux { // Serve start webhook receiver func Serve(ctx context.Context, ds datastore.Datastore) error { - mux := NewMux(ds) + mux := NewMux(ds, config.Config.ScaleSetEnabled) listenAddress := fmt.Sprintf(":%d", config.Config.Port) s := &http.Server{ Addr: listenAddress, diff --git a/plans/scale-set-client-integration-plan.md b/plans/scale-set-client-integration-plan.md new file mode 100644 index 0000000..0242db1 --- /dev/null +++ b/plans/scale-set-client-integration-plan.md @@ -0,0 +1,348 @@ +# Scale Set Client Integration Plan + +## Context + +GitHub Actions Runner Scale Set Client (`github.com/actions/scaleset`) が2026年2月にpublic previewとして公開された。これは Actions Runner Controller (ARC) から抽出されたスタンドアロンGoモジュールで、Kubernetesなしにカスタムオートスケーリングソリューションを構築できる。 + +myshoesは現在webhook駆動(GitHub → webhook → job queue → shoes plugin → runner)で動作しているが、scale setモードでは**long-polling駆動**(myshoes → scale set API → job受信 → shoes plugin → runner)に切り替わる。JIT (Just-In-Time) runner configにより、registration token + config.shが不要になり、ランナー起動が高速化される。 + +**目的**: `SCALESET_ENABLED=true` 環境変数で全targetをscale setモードに切り替え可能にする。 + +## Architecture Overview + +``` +[現在] GitHub webhook → myshoes web → job queue → starter → shoes plugin → runner +[新規] myshoes scaleset manager → long-poll scale set API → JIT config生成 → shoes plugin → runner +``` + +Scale setモード有効時: +- `web.Serve` → **起動する**(REST API + メトリクス提供。ただし `/github/events` は不要) +- `starter.Loop` → **起動しない**(scale set scalerが代替。job queue不使用) +- `runner.Loop` → **起動しない**(HandleJobCompletedが代替。定期ポーリング不要) +- `scaleset.Manager.Loop` → **起動する**(新規追加) + +## Implementation Steps + +### Step 1: 依存追加 + +**File**: `go.mod` +- `go get github.com/actions/scaleset@latest` + +### Step 2: Config拡張 + +**File**: `pkg/config/config.go` +- `Conf` structに追加: + ```go + ScaleSetEnabled bool + ScaleSetRunnerGroup string + ScaleSetMaxRunners int + ScaleSetNamePrefix string + ``` +- 環境変数定数追加: + - `SCALESET_ENABLED` (bool, default: false) + - `SCALESET_RUNNER_GROUP` (string, default: "default") + - `SCALESET_MAX_RUNNERS` (int, default: 10) + - `SCALESET_NAME_PREFIX` (string, default: "myshoes") + +**File**: `pkg/config/init.go` +- `LoadWithDefault()` に新環境変数の読み込みを追加 + +### Step 3: scaleset パッケージ新規作成 + +#### 3a. Manager (`pkg/scaleset/manager.go` NEW) + +Scale setライフサイクルを管理するオーケストレーター。 + +```go +type Manager struct { + ds datastore.Datastore + cfg ManagerConfig + scalers map[uuid.UUID]*targetScaler // target UUID -> scaler + mu sync.RWMutex +} + +type ManagerConfig struct { + AppID int64 + PrivateKeyPEM []byte + GitHubURL string + RunnerGroupName string + MaxRunners int + ScaleSetPrefix string + RunnerVersion string + RunnerUser string + RunnerBaseDir string +} +``` + +- `New(ds, cfg) *Manager` +- `Loop(ctx) error` - 30秒ごとにtargetリストを取得し、各targetに対してscale setとlistenerを起動/停止 + +**Loop処理フロー**: +1. `datastore.ListTargets(ctx, ds)` でアクティブなtargetを取得 +2. 各targetについて: + - installation IDを `gh.IsInstalledGitHubApp(ctx, scope)` で解決 + - scaleset clientを `scaleset.NewClientWithGitHubApp()` で作成 + - `GitHubConfigURL`: `{GitHubURL}/{scope}` (既存の `config.GitHubURL` を再利用) + - `GitHubAppAuth`: `{ClientID: strconv.FormatInt(AppID, 10), InstallationID: installationID, PrivateKey: string(PEMByte)}` + - `CreateRunnerScaleSet` or `GetRunnerScaleSet` でscale setを確保 + - `MessageSessionClient` でメッセージセッションを確立 + - `listener.New()` + `listener.Run(ctx, scaler)` でlistenerを起動 +3. 削除されたtargetのlistenerをcontext cancelで停止 + +#### 3b. Scaler (`pkg/scaleset/scaler.go` NEW) + +`listener.Scaler` interfaceを実装。scale set listenerとshoes pluginの橋渡し。 + +```go +type targetScaler struct { + ds datastore.Datastore + target datastore.Target + client *scaleset.Client + scaleSetID int + cfg ManagerConfig + activeRunners sync.Map // runner name -> runnerInfo +} +``` + +**`HandleDesiredRunnerCount(ctx, count) (int, error)`**: +1. 現在のアクティブランナー数を取得 +2. `count > current` の場合、差分のランナーをプロビジョニング: + - `client.GenerateJitRunnerConfig(ctx, setting, scaleSetID)` でJIT config生成 + - `GetJITSetupScript(encodedJITConfig, ...)` でスクリプト生成 + - `shoes.GetClient()` → `client.AddInstance(ctx, name, script, resourceType, labels)` でインスタンス作成 + - `ds.CreateRunner(ctx, runner)` でdatastoreに記録 +3. スケールダウンは不要(ephemeralランナーはジョブ完了後に自動終了) + +**`HandleJobStarted(ctx, *scaleset.JobStarted) error`**: +- ログ出力 + メトリクス更新のみ + +**`HandleJobCompleted(ctx, *scaleset.JobCompleted) error`**: +1. `activeRunners` からランナー情報を取得(RunnerName で検索) +2. `shoes.GetClient()` → `client.DeleteInstance(ctx, cloudID, labels)` でインスタンス削除 +3. `ds.DeleteRunner(ctx, id, time.Now(), RunnerStatusCompleted)` でdatastoreを更新 +4. `activeRunners` から削除 + +#### 3c. JIT Setup Script (`pkg/scaleset/scripts.go` NEW) + +JIT config用の簡略化されたsetup script。既存の `pkg/starter/scripts.go` のパターンを再利用するが、大幅にシンプル化。 + +```go +func GetJITSetupScript(encodedJITConfig, runnerVersion, runnerUser, runnerBaseDir string) (string, error) +``` + +**JIT scriptの特徴**(既存scriptとの差分): +- registration token不要(JIT configに含まれる) +- `config.sh --unattended` 不要(JIT configで自動設定) +- `RunnerService.js` パッチ不要 +- `--ephemeral`/`--once` フラグ不要(JITランナーは本質的にephemeral) +- ランナー起動: `./run.sh --jitconfig ` のみ + +スクリプトテンプレート(既存の `templateCreateLatestRunnerOnce` から圧縮テンプレートの仕組みを再利用): +1. ランナーバイナリダウンロード(既存ロジック再利用) +2. `./run.sh --jitconfig ${JIT_CONFIG}` で起動 + +#### 3d. Metrics (`pkg/scaleset/metrics.go` NEW) + +Prometheusメトリクス: +- `myshoes_scaleset_listener_running` (gauge, per target) +- `myshoes_scaleset_desired_runners` (gauge, per target) +- `myshoes_scaleset_active_runners` (gauge, per target) +- `myshoes_scaleset_jobs_completed_total` (counter, per target) +- `myshoes_scaleset_provision_errors_total` (counter, per target) + +### Step 4: Server統合 + +**File**: `cmd/server/cmd.go` + +- `myShoes` structに `ss *scaleset.Manager` を追加(nilの場合は無効) +- `newShoes()` で `config.Config.ScaleSetEnabled` を確認し、有効なら `scaleset.New()` を呼ぶ +- `Run()` の条件分岐: + ```go + // web.Serve は常に起動(REST API + metrics) + eg.Go(func() error { return web.Serve(ctx, m.ds) }) + + if m.ss != nil { + // Scale setモード: starter/runner loopは起動しない + eg.Go(func() error { return m.ss.Loop(ctx) }) + } else { + // Webhookモード: 従来のstarter/runner loopを起動 + eg.Go(func() error { return m.start.Loop(ctx) }) + eg.Go(func() error { return m.run.Loop(ctx) }) + } + ``` +- `newShoes()` で scale setモード時も `notifyEnqueueCh`, `Starter`, `runner.Manager` は初期化するが `Run()` では使わない(将来の混在モード対応のため構造を維持) + +### Step 5: テスト + +#### 5a. `pkg/scaleset/scaler_test.go` (NEW) +- `HandleDesiredRunnerCount` のスケールアップテスト +- `HandleJobCompleted` のクリーンアップテスト +- エラーハンドリングテスト + +#### 5b. `pkg/scaleset/scripts_test.go` (NEW) +- JIT setup scriptの生成テスト +- テンプレート出力の検証 + +#### 5c. `pkg/scaleset/manager_test.go` (NEW) +- Manager初期化テスト +- target追加/削除時のlistener起動/停止テスト + +#### 5d. `pkg/config/config_test.go` (MODIFY if exists) +- 新環境変数の読み込みテスト + +## Key Files to Modify/Create + +| File | Action | Description | +|------|--------|-------------| +| `go.mod` | MODIFY | `github.com/actions/scaleset` 依存追加 | +| `pkg/config/config.go` | MODIFY | ScaleSet関連フィールド + 環境変数定数追加 | +| `pkg/config/init.go` | MODIFY | 環境変数読み込み追加 | +| `cmd/server/cmd.go` | MODIFY | scaleset.Manager統合 | +| `pkg/scaleset/manager.go` | NEW | Scale set lifecycle manager | +| `pkg/scaleset/scaler.go` | NEW | listener.Scaler実装 | +| `pkg/scaleset/scripts.go` | NEW | JIT setup script生成 | +| `pkg/scaleset/metrics.go` | NEW | Prometheusメトリクス | +| `pkg/scaleset/manager_test.go` | NEW | Managerテスト | +| `pkg/scaleset/scaler_test.go` | NEW | Scalerテスト | +| `pkg/scaleset/scripts_test.go` | NEW | Scriptテスト | +| `docs/scaleset-mode.md` | NEW | Scale setモード運用ドキュメント | + +## Reusable Existing Code + +| 既存コード | 場所 | 再利用方法 | +|-----------|------|-----------| +| `shoes.GetClient()` / `shoes.Client` | `pkg/shoes/shoes.go` | インスタンス作成/削除にそのまま使用 | +| `gh.IsInstalledGitHubApp()` | `pkg/gh/jwt.go` | installation ID解決に使用 | +| `gh.DetectScope()` / `gh.DivideScope()` | `pkg/gh/scope.go` | scope解析に使用 | +| `runner.ToName()` | `pkg/runner/util.go` | ランナー命名規則に使用 | +| `datastore.ListTargets()` | `pkg/datastore/interface.go` | アクティブtarget取得に使用 | +| `datastore.Datastore.CreateRunner()` / `DeleteRunner()` | `pkg/datastore/interface.go` | ランナー記録に使用 | +| setup scriptのダウンロード関数群 | `pkg/starter/scripts.go` | テンプレートパターンを参考に簡略化版を作成 | +| `config.Config.GitHubURL` | `pkg/config/config.go` | GHES対応に再利用 | +| `logger.Logf()` | `pkg/logger/logger.go` | ログ出力に使用 | + +## Technical Notes + +- **Go version**: go.mod は `go 1.25` で scaleset client の要件(Go 1.25+)を満たしている +- **Auth**: scaleset clientの `GitHubAppAuth.ClientID` は AppID(int64→string変換)を受け付ける +- **GHES**: `GitHubConfigURL` に `config.GitHubURL + "/" + scope` を設定すればGHES対応可能 +- **proto変更なし**: JIT configをsetup scriptにラップすることで、既存のshoesプラグインインターフェースを変更せずに対応 +- **ラベル**: Scale set作成時に `Labels` を設定。targetのscopeベースでラベルを生成 +- **並行性**: 各targetのlistenerは独立したgoroutineで動作。`context.WithCancel` で個別に停止可能 + +### Step 6: ドキュメント作成 + +**File**: `docs/scaleset-mode.md` (NEW) + +Scale setモードに関する運用ドキュメントを作成。詳細な内容を含む(後述)。 + +## GitHub App 権限の変更 + +Scale setモードで必要なGitHub App権限: + +| 権限 | Repository scope | Organization scope | 理由 | +|------|-----------------|-------------------|------| +| `actions` | Read & Write | Read & Write | Runnerの登録・削除 (既存と同じ) | +| `administration` | Read | - | Repository設定の読み取り (既存と同じ) | +| `organization_self_hosted_runners` | - | Read & Write | **新規追加**: Organization-level scale set管理用 | + +**重要な変更点**: +- Repository-level targetの場合: 既存の権限で動作(変更不要) +- **Organization-level targetの場合**: `organization_self_hosted_runners` 権限が**新たに必要**になる + +既にWebhookモードでOrganization-level targetを使用している場合、この権限を追加する必要がある。 + +参考: [Authenticating ARC to the GitHub API - GitHub Docs](https://docs.github.com/en/actions/tutorials/use-actions-runner-controller/authenticate-to-the-api) + +## docs/scaleset-mode.md の内容 + +以下の詳細なドキュメントを作成: + +### 概要 +- Scale setモードとは: GitHub Actions Runner Scale Set APIを使用したlong-polling駆動の自動スケーリング +- Webhookモードとの違い: GitHub → myshoes (push) から myshoes → GitHub (pull) へ + +### GitHub App 権限 + +上記の権限表を含める。特に `organization_self_hosted_runners` 権限がOrganization-level targetで必要になることを強調。 + +### 設定方法 +```bash +SCALESET_ENABLED=true # Scale setモードを有効化 +SCALESET_RUNNER_GROUP=default # Runner group名 +SCALESET_MAX_RUNNERS=10 # Scale set あたりの最大ランナー数 +SCALESET_NAME_PREFIX=myshoes # Scale set名のプレフィックス +# 既存の環境変数(GITHUB_APP_ID, GITHUB_PRIVATE_KEY_BASE64 等)も必要 +``` + +### Webエンドポイントの変更 + +| エンドポイント | Webhookモード | Scale setモード | 理由 | +|---------------|--------------|----------------|------| +| `/github/events` (POST) | 必須 | **不要** | GitHubからのwebhookを受信しない。GitHub App設定でWebhook URLも不要 | +| `/target` (CRUD) | 必須 | **必須** | Scale set managerがdatastoreからtargetを読み取る | +| `/healthz` | 必須 | **必須** | ヘルスチェック | +| `/metrics` | 必須 | **必須** | Prometheusメトリクス(scale set固有メトリクスも追加) | +| `/config/*` | 任意 | **任意** | ランタイム設定変更 | + +**重要**: Scale setモード有効時、`/github/events` エンドポイントは存在するが使用されない。GitHub App設定でWebhook URLを設定する必要はない。 + +### 動作フロー比較 + +**Webhookモード** (既存): +``` +GitHub Actions → webhook → myshoes → job queue + ↓ + starter loop + ↓ + shoes plugin → runner + ↓ + runner manager (定期削除) +``` + +**Scale setモード** (新規): +``` +myshoes scale set manager → long-poll GitHub Scale Set API + ↓ (JobAssigned event) + generate JIT config + ↓ + shoes plugin → runner + ↓ (JobCompleted event) + HandleJobCompleted → 即座に削除 +``` + +### JIT Runnerの特徴 +- **Registration token不要**: JIT configに認証情報が含まれる +- **config.sh不要**: `./run.sh --jitconfig` で直接起動 +- **RunnerService.jsパッチ不要**: JITランナーは本質的にephemeral +- **高速起動**: トークン生成とconfig.shステップがスキップされる + +### 既存Shoes Providerとの互換性 +- **Proto変更なし**: `AddInstance` の `setupScript` 引数にJIT config含むスクリプトを渡す +- **透過的対応**: Providerは通常のsetup scriptとして扱うだけで動作 +- **移行不要**: 既存のshoes-lxd, shoes-aws, shoes-openstackがそのまま動作 + +### Scale Setの命名規則 +- Format: `{SCALESET_NAME_PREFIX}-{sanitized-scope}` +- 例: + - org `myorg` → scale set名 `myshoes-myorg` + - repo `myorg/myrepo` → scale set名 `myshoes-myorg-myrepo` + +### 制限事項・注意点 +- Scale setモードとWebhookモードは排他(グローバルスイッチ) +- Scale set作成にはGitHub App installationが必要 +- 各targetに対して1つのscale setが作成される +- Runner groupはscale set作成時に指定(デフォルト: "default") +- GHESサポート: `GITHUB_URL` 設定により対応 + +## Verification + +1. **ビルド確認**: `go build ./...` が成功すること +2. **ユニットテスト**: `go test ./pkg/scaleset/...` が全てパスすること +3. **既存テスト**: `go test ./...` で既存テストが壊れていないこと +4. **lint/fmt**: `go fmt ./...` でフォーマット済みであること +5. **統合テスト(手動)**: + - `SCALESET_ENABLED=true` + 他の必要な環境変数を設定してサーバー起動 + - targetが登録済みの状態でscale setが作成されることを確認 + - workflow jobをトリガーしてランナーがプロビジョニングされることを確認 + - ジョブ完了後にインスタンスがクリーンアップされることを確認