Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cmd/runners/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,17 @@ func run() error {
defer notificationsConn.Close()

grpcServer := grpc.NewServer()
runnersv1.RegisterRunnersServiceServer(grpcServer, server.New(server.Options{
srv := server.New(server.Options{
Pool: pool,
IdentityClient: identityv1.NewIdentityServiceClient(identityConn),
AuthorizationClient: authorizationv1.NewAuthorizationServiceClient(authorizationConn),
AgentsClient: agentsv1.NewAgentsServiceClient(agentsConn),
ZitiManagementClient: zitiMgmtClient,
NotificationsClient: notificationsv1.NewNotificationsServiceClient(notificationsConn),
ZitiDialer: zitiManager,
}))
})
runnersv1.RegisterRunnersServiceServer(grpcServer, srv)
go srv.RunWorkloadActivitySweep(ctx, cfg.WorkloadActivitySweepInterval, cfg.WorkloadKeepaliveGrace)

listener, err := net.Listen("tcp", cfg.GRPCAddr)
if err != nil {
Expand Down
50 changes: 41 additions & 9 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,23 @@ const (
defaultZitiManagementAddress = "ziti-management:50051"
defaultNotificationsAddress = "notifications:50051"
defaultGRPCAddr = ":50051"
defaultWorkloadActivitySweep = 5 * time.Second
defaultKeepaliveGrace = 25 * time.Second
)

// Config captures runtime configuration derived from the environment.
type Config struct {
DatabaseURL string
IdentityAddress string
AuthorizationAddress string
AgentsAddress string
ZitiManagementAddress string
NotificationsAddress string
ZitiLeaseRenewalInterval time.Duration
ZitiEnrollmentTimeout time.Duration
GRPCAddr string
DatabaseURL string
IdentityAddress string
AuthorizationAddress string
AgentsAddress string
ZitiManagementAddress string
NotificationsAddress string
ZitiLeaseRenewalInterval time.Duration
ZitiEnrollmentTimeout time.Duration
WorkloadActivitySweepInterval time.Duration
WorkloadKeepaliveGrace time.Duration
GRPCAddr string
}

// Load reads configuration from environment variables, applying defaults when
Expand Down Expand Up @@ -71,6 +75,34 @@ func Load() (Config, error) {
if cfg.ZitiEnrollmentTimeout <= 0 {
return Config{}, fmt.Errorf("ZITI_ENROLLMENT_TIMEOUT must be greater than 0")
}

activitySweepInterval := strings.TrimSpace(os.Getenv("WORKLOAD_ACTIVITY_SWEEP_INTERVAL"))
if activitySweepInterval == "" {
cfg.WorkloadActivitySweepInterval = defaultWorkloadActivitySweep
} else {
parsed, err := time.ParseDuration(activitySweepInterval)
if err != nil {
return Config{}, fmt.Errorf("parse WORKLOAD_ACTIVITY_SWEEP_INTERVAL: %w", err)
}
cfg.WorkloadActivitySweepInterval = parsed
}
if cfg.WorkloadActivitySweepInterval <= 0 {
return Config{}, fmt.Errorf("WORKLOAD_ACTIVITY_SWEEP_INTERVAL must be greater than 0")
}

keepaliveGrace := strings.TrimSpace(os.Getenv("KEEPALIVE_GRACE"))
if keepaliveGrace == "" {
cfg.WorkloadKeepaliveGrace = defaultKeepaliveGrace
} else {
parsed, err := time.ParseDuration(keepaliveGrace)
if err != nil {
return Config{}, fmt.Errorf("parse KEEPALIVE_GRACE: %w", err)
}
cfg.WorkloadKeepaliveGrace = parsed
}
if cfg.WorkloadKeepaliveGrace <= 0 {
return Config{}, fmt.Errorf("KEEPALIVE_GRACE must be greater than 0")
}
cfg.GRPCAddr = readEnv("GRPC_ADDR", defaultGRPCAddr)

return cfg, nil
Expand Down
58 changes: 58 additions & 0 deletions internal/server/workload_activity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package server

import (
"context"
"fmt"
"log"
"time"
)

func (s *Server) RunWorkloadActivitySweep(ctx context.Context, interval, keepaliveGrace time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := s.sweepWorkloadActivity(ctx, time.Now().UTC(), keepaliveGrace); err != nil {
log.Printf("runners: workload activity sweep: %v", err)
}
}
}
}

func (s *Server) sweepWorkloadActivity(ctx context.Context, now time.Time, keepaliveGrace time.Duration) error {
cutoff := now.Add(-keepaliveGrace)
workloads, err := s.setIdleWorkloads(ctx, cutoff)
if err != nil {
return err
}
for _, workload := range workloads {
s.publishWorkloadUpdateNotifications(ctx, workload, false, false, false, true)
}
return nil
}

func (s *Server) setIdleWorkloads(ctx context.Context, cutoff time.Time) ([]workloadRecord, error) {
query := fmt.Sprintf("UPDATE workloads SET agent_state = $1, updated_at = NOW() WHERE status = $2 AND agent_state = $3 AND last_activity_at < $4 AND removed_at IS NULL RETURNING %s", workloadColumns)
rows, err := s.pool.Query(ctx, query, workloadAgentStateIdle, workloadStatusRunning, workloadAgentStateProcessing, cutoff)
if err != nil {
return nil, err
}
defer rows.Close()

workloads := []workloadRecord{}
for rows.Next() {
workload, err := scanWorkload(rows)
if err != nil {
return nil, err
}
workloads = append(workloads, workload)
}
if err := rows.Err(); err != nil {
return nil, err
}
return workloads, nil
}
6 changes: 3 additions & 3 deletions internal/server/workload_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func TestStreamWorkloadLogsProxiesRunnerStream(t *testing.T) {
}

workloadRows := pgxmock.NewRows(workloadRowColumns).
AddRow(workloadID, runnerID, threadID, agentID, organizationID, workloadStatusRunning, nil, nil, containersJSON, "ziti-id", int32(0), int64(0), instanceID, now, nil, nil, now, now)
AddRow(workloadID, runnerID, threadID, agentID, organizationID, workloadStatusRunning, workloadAgentStateProcessing, nil, nil, containersJSON, "ziti-id", int32(0), int64(0), instanceID, now, nil, nil, now, now)
workloadQuery := fmt.Sprintf("SELECT %s FROM workloads WHERE id = $1", workloadColumns)
mockPool.ExpectQuery(regexp.QuoteMeta(workloadQuery)).
WithArgs(workloadID).
Expand Down Expand Up @@ -238,7 +238,7 @@ func TestStreamWorkloadLogsRequiresViewWorkloads(t *testing.T) {
containersJSON := []byte("[]")

workloadRows := pgxmock.NewRows(workloadRowColumns).
AddRow(workloadID, runnerID, threadID, agentID, organizationID, workloadStatusRunning, nil, nil, containersJSON, "ziti-id", int32(0), int64(0), "instance-1", now, nil, nil, now, now)
AddRow(workloadID, runnerID, threadID, agentID, organizationID, workloadStatusRunning, workloadAgentStateProcessing, nil, nil, containersJSON, "ziti-id", int32(0), int64(0), "instance-1", now, nil, nil, now, now)
workloadQuery := fmt.Sprintf("SELECT %s FROM workloads WHERE id = $1", workloadColumns)
mockPool.ExpectQuery(regexp.QuoteMeta(workloadQuery)).
WithArgs(workloadID).
Expand Down Expand Up @@ -295,7 +295,7 @@ func TestStreamWorkloadLogsMissingInstanceID(t *testing.T) {
containersJSON := []byte("[]")

workloadRows := pgxmock.NewRows(workloadRowColumns).
AddRow(workloadID, runnerID, threadID, agentID, organizationID, workloadStatusRunning, nil, nil, containersJSON, "ziti-id", int32(0), int64(0), nil, now, nil, nil, now, now)
AddRow(workloadID, runnerID, threadID, agentID, organizationID, workloadStatusRunning, workloadAgentStateProcessing, nil, nil, containersJSON, "ziti-id", int32(0), int64(0), nil, now, nil, nil, now, now)
workloadQuery := fmt.Sprintf("SELECT %s FROM workloads WHERE id = $1", workloadColumns)
mockPool.ExpectQuery(regexp.QuoteMeta(workloadQuery)).
WithArgs(workloadID).
Expand Down
Loading
Loading