Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions internal/intg/ct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ func inspectContainer(ctx context.Context, dockerClient *client.Client, containe
}

func TestDockerExecutor(t *testing.T) {
requireDockerDaemon(t)

tests := []dockerExecutorTest{
{
name: "BasicExecution",
Expand Down Expand Up @@ -110,6 +112,7 @@ type containerTest struct {
}

func TestDAGLevelContainer(t *testing.T) {
requireDockerDaemon(t)
t.Parallel()

tests := []containerTest{
Expand Down Expand Up @@ -414,6 +417,8 @@ steps:
}

func TestContainerPullPolicy(t *testing.T) {
requireDockerDaemon(t)

th := test.Setup(t)

pullPolicyTestDAG := fmt.Sprintf(`
Expand Down Expand Up @@ -446,6 +451,8 @@ steps:
}

func TestContainerStartup_Entrypoint_WithHealthyFallback(t *testing.T) {
requireDockerDaemon(t)

th := test.Setup(t)

// Use nginx which stays up by default; most tags have no healthcheck,
Expand All @@ -469,6 +476,7 @@ steps:
}

func TestContainerStartup_Command_LongRunning(t *testing.T) {
requireDockerDaemon(t)
t.Parallel()

th := test.Setup(t)
Expand All @@ -493,9 +501,7 @@ steps:

func TestDockerExecutor_ExecInExistingContainer(t *testing.T) {
th := test.Setup(t)
dockerClient, err := client.New(client.FromEnv)
require.NoError(t, err, "failed to create docker client")
defer func() { _ = dockerClient.Close() }()
dockerClient := requireDockerClient(t)

containerName := fmt.Sprintf("dagu-existing-%d", time.Now().UnixNano())
containerID := createLongRunningContainer(t, th, dockerClient, containerName)
Expand All @@ -521,6 +527,8 @@ steps:
}

func TestDockerExecutor_ErrorIncludesRecentStderr(t *testing.T) {
requireDockerDaemon(t)

th := test.Setup(t)

dagConfig := fmt.Sprintf(`
Expand Down Expand Up @@ -655,6 +663,7 @@ func waitForContainerStop(t *testing.T, th test.Helper, dockerClient *client.Cli
// which allows specifying a container field directly on a step instead of
// using the executor syntax.
func TestStepLevelContainer(t *testing.T) {
requireDockerDaemon(t)
t.Parallel()

tests := []containerTest{
Expand Down Expand Up @@ -1077,9 +1086,7 @@ func TestContainerExecMode(t *testing.T) {
t.Parallel()

th := test.Setup(t)
dockerClient, err := client.New(client.FromEnv)
require.NoError(t, err, "failed to create docker client")
defer func() { _ = dockerClient.Close() }()
dockerClient := requireDockerClient(t)

// Create a long-running container for exec tests
containerName := fmt.Sprintf("dagu-exec-mode-%d", time.Now().UnixNano())
Expand Down Expand Up @@ -1254,6 +1261,7 @@ steps:

// TestContainerExecNotFound tests that exec mode fails when the container doesn't exist.
func TestContainerExecNotFound(t *testing.T) {
requireDockerDaemon(t)
t.Parallel()

th := test.Setup(t)
Expand All @@ -1276,9 +1284,7 @@ func TestContainerExecNotRunning(t *testing.T) {
t.Parallel()

th := test.Setup(t)
dockerClient, err := client.New(client.FromEnv)
require.NoError(t, err, "failed to create docker client")
defer func() { _ = dockerClient.Close() }()
dockerClient := requireDockerClient(t)

// Create a container but don't start it
containerName := fmt.Sprintf("dagu-exec-stopped-%d", time.Now().UnixNano())
Expand Down Expand Up @@ -1322,6 +1328,7 @@ steps:
// is passed to Docker and waitFor: healthy waits for the container to become
// healthy (not just running). Uses a file creation delay to prove actual waiting.
func TestContainerCustomHealthcheck(t *testing.T) {
requireDockerDaemon(t)
t.Parallel()

th := test.Setup(t)
Expand Down Expand Up @@ -1353,6 +1360,7 @@ steps:

// TestContainerCustomHealthcheck_StepLevel tests custom healthcheck at step level.
func TestContainerCustomHealthcheck_StepLevel(t *testing.T) {
requireDockerDaemon(t)
t.Parallel()

th := test.Setup(t)
Expand Down Expand Up @@ -1388,9 +1396,7 @@ func TestContainerExecVariableExpansion(t *testing.T) {
t.Parallel()

th := test.Setup(t)
dockerClient, err := client.New(client.FromEnv)
require.NoError(t, err, "failed to create docker client")
defer func() { _ = dockerClient.Close() }()
dockerClient := requireDockerClient(t)

// Create a long-running container for exec tests
containerName := fmt.Sprintf("dagu-exec-var-%d", time.Now().UnixNano())
Expand Down
44 changes: 44 additions & 0 deletions internal/intg/docker_helper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright (C) 2026 Yota Hamada
// SPDX-License-Identifier: GPL-3.0-or-later

package intg_test

import (
"context"
"testing"
"time"

"github.com/moby/moby/client"
)

func requireDockerDaemon(t *testing.T) {
t.Helper()

dockerClient := newDockerClientOrSkip(t)
defer func() { _ = dockerClient.Close() }()
}

func requireDockerClient(t *testing.T) *client.Client {
t.Helper()

dockerClient := newDockerClientOrSkip(t)
t.Cleanup(func() { _ = dockerClient.Close() })
return dockerClient
}

func newDockerClientOrSkip(t *testing.T) *client.Client {
t.Helper()

dockerClient, err := client.New(client.FromEnv)
if err != nil {
t.Skipf("Skipping Docker-backed integration test: failed to create docker client: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

if _, err := dockerClient.Info(ctx, client.InfoOptions{}); err != nil {
_ = dockerClient.Close()
t.Skipf("Skipping Docker-backed integration test: docker daemon unavailable: %v", err)
}
return dockerClient
}
1 change: 1 addition & 0 deletions internal/intg/mltcmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ steps:
}

func TestMultipleCommands_Docker(t *testing.T) {
requireDockerDaemon(t)
t.Parallel()

const testImage = "alpine:3"
Expand Down
1 change: 1 addition & 0 deletions internal/intg/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type redisTest struct {
}

func TestDAGLevelRedis(t *testing.T) {
requireDockerDaemon(t)
t.Parallel()

tests := []redisTest{
Expand Down
1 change: 1 addition & 0 deletions internal/intg/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const minioImage = "minio/minio:RELEASE.2024-10-02T17-50-41Z"
// TestMinIOContainer_WithMCCommands tests S3-like operations using MinIO's mc client
// inside a container. This validates the container-based workflow pattern for object storage.
func TestMinIOContainer_WithMCCommands(t *testing.T) {
requireDockerDaemon(t)
t.Parallel()

tempDir, err := os.MkdirTemp("", "dagu-s3-test-*")
Expand Down
5 changes: 1 addition & 4 deletions internal/intg/sftp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"path/filepath"
"testing"

"github.com/moby/moby/client"
"github.com/stretchr/testify/require"

"github.com/dagucloud/dagu/internal/core"
Expand All @@ -24,9 +23,7 @@ func TestSFTPExecutorIntegration(t *testing.T) {

th := test.Setup(t)

dockerClient, err := client.New(client.FromEnv)
require.NoError(t, err, "failed to create docker client")
defer func() { _ = dockerClient.Close() }()
dockerClient := requireDockerClient(t)

// Start SSH server container (reuses helpers from ssh_test.go)
sshServer := startSSHServer(t, th, dockerClient)
Expand Down
4 changes: 1 addition & 3 deletions internal/intg/ssh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,7 @@ func TestSSHExecutorIntegration(t *testing.T) {

th := test.Setup(t)

dockerClient, err := client.New(client.FromEnv)
require.NoError(t, err, "failed to create docker client")
defer func() { _ = dockerClient.Close() }()
dockerClient := requireDockerClient(t)

// Start SSH server container
sshServer := startSSHServer(t, th, dockerClient)
Expand Down
1 change: 1 addition & 0 deletions internal/license/claims.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type LicenseClaims struct {
Features []string `json:"features"`
ActivationID string `json:"activation_id"`
WarningCode string `json:"wc,omitempty"`
GraceDays *int `json:"grace_days,omitempty"`
}

// HasFeature returns true if the given feature is included in the license claims.
Expand Down
22 changes: 22 additions & 0 deletions internal/license/claims_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,28 @@ func TestLicenseClaims_WarningCode(t *testing.T) {
})
}

func TestLicenseClaims_GraceDays(t *testing.T) {
t.Parallel()

t.Run("round-trips zero grace days through JSON", func(t *testing.T) {
t.Parallel()
zero := 0
original := &LicenseClaims{
Plan: "trial",
Features: []string{FeatureAudit},
GraceDays: &zero,
}
data, err := json.Marshal(original)
require.NoError(t, err)
assert.Contains(t, string(data), `"grace_days":0`)

var decoded LicenseClaims
require.NoError(t, json.Unmarshal(data, &decoded))
require.NotNil(t, decoded.GraceDays)
assert.Equal(t, 0, *decoded.GraceDays)
})
}

func TestLicenseClaims_HasFeature(t *testing.T) {
t.Parallel()

Expand Down
4 changes: 4 additions & 0 deletions internal/license/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,10 @@ func (m *Manager) doHeartbeat(ctx context.Context, ad *ActivationData) {
slog.String("error", cloudErr.Message))
m.state.Update(nil, "")
return
case 400: // Expired - keep cached token so runtime can enforce expiry/grace locally
m.logger.Warn("License heartbeat reported an expired license, continuing with cached token",
slog.String("error", cloudErr.Message))
return
}
}
// Network error or other transient failure - continue with cached JWT
Expand Down
24 changes: 24 additions & 0 deletions internal/license/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,30 @@ func TestManager_doHeartbeat(t *testing.T) {
assert.Equal(t, "pro", m.Checker().Plan())
})

t.Run("400 Bad Request keeps expired token state for local enforcement", func(t *testing.T) {
t.Parallel()

pub, priv := testKeyPair(t)
claims := expiredInGraceClaims()
token := signToken(t, priv, claims)

srv := newMockCloudServer(t, mockCloudServerConfig{
heartbeatHandler: errorHandlerFn(http.StatusBadRequest, "license has expired"),
})

m := NewManager(ManagerConfig{
LicenseDir: t.TempDir(),
CloudURL: srv.URL,
}, pub, nil, slog.Default())
m.state.Update(claims, token)

m.doHeartbeat(context.Background(), makeAD("server-001"))

assert.False(t, m.Checker().IsCommunity())
assert.Equal(t, "pro", m.Checker().Plan())
assert.True(t, m.Checker().IsGracePeriod())
})

t.Run("invalid refreshed token leaves state unchanged", func(t *testing.T) {
t.Parallel()

Expand Down
28 changes: 27 additions & 1 deletion internal/license/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,26 @@ func cloneClaims(c *LicenseClaims) *LicenseClaims {
return nil
}
cp := *c
if c.ExpiresAt != nil {
expiresAt := *c.ExpiresAt
cp.ExpiresAt = &expiresAt
}
if c.NotBefore != nil {
notBefore := *c.NotBefore
cp.NotBefore = &notBefore
}
if c.IssuedAt != nil {
issuedAt := *c.IssuedAt
cp.IssuedAt = &issuedAt
}
if c.Features != nil {
cp.Features = make([]string, len(c.Features))
copy(cp.Features, c.Features)
}
if c.GraceDays != nil {
graceDays := *c.GraceDays
cp.GraceDays = &graceDays
}
return &cp
}

Expand Down Expand Up @@ -106,7 +122,17 @@ func (s *State) isInGracePeriod() bool {
if time.Now().Before(expiry) {
return false // not expired yet
}
return time.Now().Before(expiry.Add(gracePeriod))
return time.Now().Before(expiry.Add(s.graceDuration()))
}

func (s *State) graceDuration() time.Duration {
if s.claims == nil || s.claims.GraceDays == nil {
return gracePeriod
}
if *s.claims.GraceDays <= 0 {
return 0
}
return time.Duration(*s.claims.GraceDays) * 24 * time.Hour
}

// WarningCode returns the warning code from the license claims.
Expand Down
Loading