Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ require (
github.com/google/gofuzz v1.1.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gopherjs/gopherjs v1.17.2 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
Expand All @@ -54,6 +55,7 @@ require (
github.com/plar/go-adaptive-radix-tree v1.0.4 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/wiggin77/merror v1.0.5 // indirect
github.com/wiggin77/srslog v1.0.1 // indirect
golang.org/x/exp v0.0.0-20200908183739-ae8ad444f925 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,12 @@ github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2p
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/imdario/mergo v0.3.8/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/imdario/mergo v0.3.10/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU=
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
Expand Down
31 changes: 22 additions & 9 deletions service/docker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,26 @@ func (s *JobService) CreateJob(cfg job.Config, onStopCb job.StopCb) (job.Job, er

volumeID := jobPrefix + "-" + random.NewID()

securityOpts := []string{dockerSecurityOpts}
// Build mounts list starting with data volume
mounts := []mount.Mount{
{
Target: dockerVolumePath,
Source: volumeID,
Type: "volume",
},
}

// Add certificate mount if specified
// Note: We use JOBS_DOCKER_CERT_PATH instead of DOCKER_CERT_PATH to avoid
// conflicting with Docker's built-in TLS configuration variable
if certPath := os.Getenv("JOBS_DOCKER_CERT_PATH"); certPath != "" {
mounts = append(mounts, mount.Mount{
Target: "/certs",
Source: certPath,
Type: "bind",
ReadOnly: true,
})
}

resp, err := s.client.ContainerCreate(ctx, &container.Config{
Image: jb.Runner,
Expand All @@ -322,14 +341,8 @@ func (s *JobService) CreateJob(cfg job.Config, onStopCb job.StopCb) (job.Job, er
},
}, &container.HostConfig{
NetworkMode: networkMode,
Mounts: []mount.Mount{
{
Target: dockerVolumePath,
Source: volumeID,
Type: "volume",
},
},
SecurityOpt: securityOpts,
Mounts: mounts,
SecurityOpt: []string{dockerSecurityOpts},
}, nil, nil, "")
if err != nil {
return job.Job{}, fmt.Errorf("failed to create container: %w", err)
Expand Down
60 changes: 38 additions & 22 deletions service/kubernetes/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
k8s "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)

const (
Expand Down Expand Up @@ -80,9 +81,36 @@ type JobService struct {
}

func NewJobService(log mlog.LoggerIFace, cfg JobServiceConfig) (*JobService, error) {
config, err := rest.InClusterConfig()
if err != nil {
return nil, fmt.Errorf("failed to create in-cluster config: %w", err)
// Try to load kubeconfig for local development first
var config *rest.Config
var err error

// Check if KUBECONFIG env var is set, otherwise use default location
kubeconfig := os.Getenv("KUBECONFIG")
if kubeconfig == "" {
// Use default kubeconfig location
if home := os.Getenv("HOME"); home != "" {
kubeconfig = home + "/.kube/config"
}
}

// Try to load out-of-cluster config (for local development)
if kubeconfig != "" {
config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
log.Debug("failed to load kubeconfig, trying in-cluster config", mlog.Err(err))
} else {
log.Debug("using kubeconfig", mlog.String("path", kubeconfig))
}
}

// Fall back to in-cluster config if kubeconfig loading failed
if config == nil {
config, err = rest.InClusterConfig()
if err != nil {
return nil, fmt.Errorf("failed to create kubernetes config: %w", err)
}
log.Debug("using in-cluster kubernetes config")
}

cs, err := k8s.NewForConfig(config)
Expand Down Expand Up @@ -159,11 +187,13 @@ func (s *JobService) CreateJob(cfg job.Config, onStopCb job.StopCb) (job.Job, er
cfg.InputData.SetSiteURL(getSiteURLForJob(cfg.InputData.GetSiteURL()))
jobPrefix = job.RecordingJobPrefix
jobID = jobPrefix + "-job-" + random.NewID()
s.log.Debug("creating recording job", mlog.String("jobID", jobID), mlog.String("callID", fmt.Sprintf("%v", cfg.InputData["call_id"])))
env = append(env, getEnvFromJobInputData(cfg.InputData)...)
case job.TypeTranscribing:
cfg.InputData.SetSiteURL(getSiteURLForJob(cfg.InputData.GetSiteURL()))
jobPrefix = job.TranscribingJobPrefix
jobID = jobPrefix + "-job-" + random.NewID()
s.log.Debug("creating transcription job", mlog.String("jobID", jobID), mlog.String("transcriptionID", fmt.Sprintf("%v", cfg.InputData["transcription_id"])))
env = append(env, getEnvFromJobInputData(cfg.InputData)...)
}

Expand Down Expand Up @@ -203,19 +233,10 @@ func (s *JobService) CreateJob(cfg job.Config, onStopCb job.StopCb) (job.Job, er
ttlSecondsAfterFinished = newInt32(int32(s.cfg.FailedJobsRetentionTime.Seconds()))
}

volumes := []corev1.Volume{
{
Name: jobID,
},
}

if s.cfg.PersistentVolumeClaimName != "" {
s.log.Debug("using persistent volume claim", mlog.String("name", s.cfg.PersistentVolumeClaimName))
volumes[0].VolumeSource = corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: s.cfg.PersistentVolumeClaimName,
},
}
// Build volumes and mounts together to keep them in sync
volumes, volumeMounts, err := getVolumesAndMounts(jobID, s.cfg.PersistentVolumeClaimName, s.log)
if err != nil {
return job.Job{}, fmt.Errorf("failed to get volumes and mounts: %w", err)
}

spec := &batchv1.Job{
Expand Down Expand Up @@ -252,12 +273,7 @@ func (s *JobService) CreateJob(cfg job.Config, onStopCb job.StopCb) (job.Job, er
Name: jobID,
Image: cfg.Runner,
ImagePullPolicy: corev1.PullIfNotPresent,
VolumeMounts: []corev1.VolumeMount{
{
Name: jobID,
MountPath: k8sVolumePath,
},
},
VolumeMounts: volumeMounts,
Env: env,
Resources: s.cfg.JobsResourceRequirements[cfg.Type],
SecurityContext: getJobPodSecurityContext(),
Expand Down
74 changes: 74 additions & 0 deletions service/kubernetes/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (

"github.com/mattermost/calls-offloader/public/job"

"github.com/mattermost/mattermost/server/public/shared/mlog"

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/yaml"
Expand Down Expand Up @@ -156,3 +158,75 @@ func getJobPodSecurityContext() *corev1.SecurityContext {
Privileged: newBool(privileged),
}
}

func getVolumesAndMounts(jobID, persistentVolumeClaimName string, log mlog.LoggerIFace) ([]corev1.Volume, []corev1.VolumeMount, error) {
certConfigMap := os.Getenv("JOBS_K8S_CERT_CONFIGMAP")
certSecret := os.Getenv("JOBS_K8S_CERT_SECRET")
if certConfigMap != "" && certSecret != "" {
return nil, nil, fmt.Errorf("JOBS_K8S_CERT_CONFIGMAP and JOBS_K8S_CERT_SECRET are mutually exclusive, set only one")
}

// Start with the data volume
volumes := []corev1.Volume{
{
Name: jobID,
},
}

mounts := []corev1.VolumeMount{
{
Name: jobID,
MountPath: k8sVolumePath,
},
}

// Configure data volume source if using PVC
if persistentVolumeClaimName != "" {
log.Debug("using persistent volume claim", mlog.String("name", persistentVolumeClaimName))
volumes[0].VolumeSource = corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: persistentVolumeClaimName,
},
}
}

// Add certificate volume and mount if ConfigMap is specified
if certConfigMap != "" {
log.Debug("adding certificate ConfigMap volume", mlog.String("configMap", certConfigMap))
volumes = append(volumes, corev1.Volume{
Name: "certs",
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: certConfigMap,
},
},
},
})
mounts = append(mounts, corev1.VolumeMount{
Name: "certs",
MountPath: "/certs",
ReadOnly: true,
})
}

// Add certificate volume and mount if Secret is specified
if certSecret != "" {
log.Debug("adding certificate Secret volume", mlog.String("secret", certSecret))
volumes = append(volumes, corev1.Volume{
Name: "certs",
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: certSecret,
},
},
})
mounts = append(mounts, corev1.VolumeMount{
Name: "certs",
MountPath: "/certs",
ReadOnly: true,
})
}

return volumes, mounts, nil
}
97 changes: 97 additions & 0 deletions service/kubernetes/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"

"github.com/mattermost/calls-offloader/public/job"
"github.com/mattermost/mattermost/server/public/shared/mlog"

corev1 "k8s.io/api/core/v1"

Expand Down Expand Up @@ -328,3 +329,99 @@ func TestGenInitContainersWithSecurityContext(t *testing.T) {
require.True(t, *cnts[0].SecurityContext.Privileged)
})
}

func TestGetVolumesAndMounts(t *testing.T) {
log, err := mlog.NewLogger()
require.NoError(t, err)

t.Run("default, no PVC, no cert", func(t *testing.T) {
volumes, mounts, err := getVolumesAndMounts("job-1", "", log)
require.NoError(t, err)
require.Len(t, volumes, 1)
require.Len(t, mounts, 1)

require.Equal(t, "job-1", volumes[0].Name)
require.Nil(t, volumes[0].VolumeSource.PersistentVolumeClaim)

require.Equal(t, "job-1", mounts[0].Name)
require.Equal(t, "/data", mounts[0].MountPath)
})

t.Run("with PVC", func(t *testing.T) {
volumes, mounts, err := getVolumesAndMounts("job-2", "my-pvc", log)
require.NoError(t, err)
require.Len(t, volumes, 1)
require.Len(t, mounts, 1)

require.NotNil(t, volumes[0].VolumeSource.PersistentVolumeClaim)
require.Equal(t, "my-pvc", volumes[0].VolumeSource.PersistentVolumeClaim.ClaimName)
})

t.Run("with ConfigMap cert", func(t *testing.T) {
t.Setenv("JOBS_K8S_CERT_CONFIGMAP", "my-ca-configmap")

volumes, mounts, err := getVolumesAndMounts("job-3", "", log)
require.NoError(t, err)
require.Len(t, volumes, 2)
require.Len(t, mounts, 2)

// Data volume
require.Equal(t, "job-3", volumes[0].Name)

// Cert volume
require.Equal(t, "certs", volumes[1].Name)
require.NotNil(t, volumes[1].VolumeSource.ConfigMap)
require.Equal(t, "my-ca-configmap", volumes[1].VolumeSource.ConfigMap.Name)

// Cert mount
require.Equal(t, "certs", mounts[1].Name)
require.Equal(t, "/certs", mounts[1].MountPath)
require.True(t, mounts[1].ReadOnly)
})

t.Run("with Secret cert", func(t *testing.T) {
t.Setenv("JOBS_K8S_CERT_SECRET", "my-ca-secret")

volumes, mounts, err := getVolumesAndMounts("job-4", "", log)
require.NoError(t, err)
require.Len(t, volumes, 2)
require.Len(t, mounts, 2)

// Cert volume
require.Equal(t, "certs", volumes[1].Name)
require.NotNil(t, volumes[1].VolumeSource.Secret)
require.Equal(t, "my-ca-secret", volumes[1].VolumeSource.Secret.SecretName)

// Cert mount
require.Equal(t, "certs", mounts[1].Name)
require.Equal(t, "/certs", mounts[1].MountPath)
require.True(t, mounts[1].ReadOnly)
})

t.Run("with PVC and ConfigMap cert", func(t *testing.T) {
t.Setenv("JOBS_K8S_CERT_CONFIGMAP", "my-ca-configmap")

volumes, mounts, err := getVolumesAndMounts("job-5", "my-pvc", log)
require.NoError(t, err)
require.Len(t, volumes, 2)
require.Len(t, mounts, 2)

// Data volume has PVC
require.NotNil(t, volumes[0].VolumeSource.PersistentVolumeClaim)
require.Equal(t, "my-pvc", volumes[0].VolumeSource.PersistentVolumeClaim.ClaimName)

// Cert volume has ConfigMap
require.Equal(t, "certs", volumes[1].Name)
require.NotNil(t, volumes[1].VolumeSource.ConfigMap)
})

t.Run("both ConfigMap and Secret set", func(t *testing.T) {
t.Setenv("JOBS_K8S_CERT_CONFIGMAP", "my-ca-configmap")
t.Setenv("JOBS_K8S_CERT_SECRET", "my-ca-secret")

volumes, mounts, err := getVolumesAndMounts("job-6", "", log)
require.EqualError(t, err, "JOBS_K8S_CERT_CONFIGMAP and JOBS_K8S_CERT_SECRET are mutually exclusive, set only one")
require.Nil(t, volumes)
require.Nil(t, mounts)
})
}
Loading