Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 0 additions & 24 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,30 +71,6 @@ metadata:
name: manager-role
namespace: openshift-lightspeed
rules:
- apiGroups:
- ""
resources:
- pods
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
- pods/log
verbs:
- get
- apiGroups:
- batch
resources:
- jobs
verbs:
- create
- delete
- get
- list
- watch
- apiGroups:
- operators.coreos.com
resources:
Expand Down
169 changes: 3 additions & 166 deletions internal/controller/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,16 @@ package controller

import (
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"io"
"math/rand"
"strconv"
"strings"
"time"

"github.com/openstack-k8s-operators/lib-common/modules/common/condition"
apiv1beta1 "github.com/openstack-lightspeed/operator/api/v1beta1"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/client-go/kubernetes"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"

common_helper "github.com/openstack-k8s-operators/lib-common/modules/common/helper"
corev1 "k8s.io/api/core/v1"
k8s_errors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
uns "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -139,7 +130,6 @@ func PatchOLSConfig(
helper *common_helper.Helper,
instance *apiv1beta1.OpenStackLightspeed,
olsConfig *uns.Unstructured,
indexID string,
) error {
// Patch the Providers section
providersPatch := []interface{}{
Expand All @@ -165,10 +155,12 @@ func PatchOLSConfig(
}

// Patch the RAG section
// NOTE(lucasagomes): We don't need indexID here because the tag on our RAG images
// already matches the indexID that the Vector DB used when it was built. OLS leverages
// that to set the right index.
openstackRAG := []interface{}{
map[string]interface{}{
"image": instance.Spec.RAGImage,
"indexID": indexID,
"indexPath": OpenStackLightspeedVectorDBPath,
},
}
Expand Down Expand Up @@ -261,161 +253,6 @@ func IsOLSConfigReady(ctx context.Context, helper *common_helper.Helper) (bool,
return true, nil
}

// ResolveIndexID - returns index ID for the data stored in the vector DB container image. The discovery of the
// index ID is done through spawning a pod with the rag-content image and looking at the INDEX_NAME env variable value.
func ResolveIndexID(
Comment thread
umago marked this conversation as resolved.
ctx context.Context,
helper *common_helper.Helper,
instance *apiv1beta1.OpenStackLightspeed,
) (string, ctrl.Result, error) {
err := createOLSJob(ctx, helper, instance)
if err != nil {
return "", ctrl.Result{}, err
}

podList := &corev1.PodList{}
labelSelector := client.MatchingLabels{"app": OpenStackLightspeedJobName}
if err := helper.GetClient().List(ctx, podList, client.InNamespace(instance.Namespace), labelSelector); err != nil {
return "", ctrl.Result{}, err
}

var OLSPod *corev1.Pod
for _, pod := range podList.Items {
if pod.Spec.Containers[0].Image == instance.Spec.RAGImage {
OLSPod = &pod
break
}
}
if OLSPod == nil {
return requeueWaitingPod(helper, instance)
}

switch OLSPod.Status.Phase {
case corev1.PodSucceeded:
indexName, err := extractEnvFromPodLogs(ctx, OLSPod, "INDEX_NAME")
if err != nil && k8s_errors.IsNotFound(err) {
return requeueWaitingPod(helper, instance)
}
return indexName, ctrl.Result{}, err
case corev1.PodFailed:
return "", ctrl.Result{}, fmt.Errorf("failed to start OpenStack Lightpseed RAG pod")
default:
return requeueWaitingPod(helper, instance)
}
}

// extractEnvFromPodLogs - discovers an environment variable value from the pod logs. The pod must be started using
// createOLSJob.
func extractEnvFromPodLogs(ctx context.Context, pod *corev1.Pod, envVarName string) (string, error) {
cfg, err := config.GetConfig()
if err != nil {
return "", err
}

k8sClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
return "", err
}

req := k8sClient.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &corev1.PodLogOptions{})
podLogs, err := req.Stream(ctx)
if err != nil {
return "", err
}
defer func() {
_ = podLogs.Close()
}()

buf := new(strings.Builder)
_, err = io.Copy(buf, podLogs)
if err != nil {
return "", fmt.Errorf("error in copying logs: %w", err)
}

logs := buf.String()
for _, envLine := range strings.Split(logs, "\n") {
parts := strings.Split(envLine, "=")
if len(parts) != 2 {
continue
}

if parts[0] == envVarName {
return parts[1], nil
}
}

return "", fmt.Errorf("env var not discovered: %s", envVarName)
}

// createOLSJob - starts OLS pod with entrypoint that lists environment variables after the start of the pod. It used
// to discover INDEX_NAME value.
func createOLSJob(
ctx context.Context,
helper *common_helper.Helper,
instance *apiv1beta1.OpenStackLightspeed,
) error {
imageHash := sha256.Sum256([]byte(instance.Spec.RAGImage))
imageHashStr := fmt.Sprintf("%x", imageHash)
imageHashStr = imageHashStr[len(imageHashStr)-9:]
imageName := fmt.Sprintf("%s-%s", OpenStackLightspeedJobName, imageHashStr)

ttlSecondsAfterFinished := int32(600) // 10 mins
activeDeadlineSeconds := int64(1200) // 20 mins
OLSPod := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: imageName,
Namespace: instance.Namespace,
Labels: map[string]string{
"app": OpenStackLightspeedJobName,
},
},
Spec: batchv1.JobSpec{
TTLSecondsAfterFinished: &ttlSecondsAfterFinished,
ActiveDeadlineSeconds: &activeDeadlineSeconds,
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": OpenStackLightspeedJobName,
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "rag-content",
Image: instance.Spec.RAGImage,
Command: []string{"/bin/sh", "-c"},
Args: []string{"env"},
},
},
RestartPolicy: corev1.RestartPolicyNever,
},
},
},
}

if err := controllerutil.SetControllerReference(instance, OLSPod, helper.GetScheme()); err != nil {
return err
}

err := helper.GetClient().Create(ctx, OLSPod)
if err != nil && !k8s_errors.IsAlreadyExists(err) {
return err
}

return nil
}

func requeueWaitingPod(helper *common_helper.Helper, instance *apiv1beta1.OpenStackLightspeed) (string, ctrl.Result, error) {
instance.Status.Conditions.Set(condition.FalseCondition(
apiv1beta1.OpenStackLightspeedReadyCondition,
condition.RequestedReason,
condition.SeverityInfo,
apiv1beta1.OpenStackLightspeedWaitingVectorDBMessage,
))
helper.GetLogger().Info(apiv1beta1.OpenStackLightspeedReadyMessage)
return "", ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}

// IsOwnedBy returns true if 'object' is owned by 'owner' based on OwnerReference UID.
func IsOwnedBy(object metav1.Object, owner metav1.Object) bool {
for _, ref := range object.GetOwnerReferences() {
Expand Down
20 changes: 1 addition & 19 deletions internal/controller/openstacklightspeed_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,6 @@ func (r *OpenStackLightspeedReconciler) GetLogger(ctx context.Context) logr.Logg
// +kubebuilder:rbac:groups=operators.coreos.com,resources=clusterserviceversions,namespace=openshift-lightspeed,verbs=update;patch;delete
// +kubebuilder:rbac:groups=operators.coreos.com,resources=subscriptions,namespace=openshift-lightspeed,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=operators.coreos.com,resources=installplans,namespace=openshift-lightspeed,verbs=get;list;watch;update;delete
// +kubebuilder:rbac:groups=batch,resources=jobs,namespace=openshift-lightspeed,verbs=get;list;watch;create;delete
// +kubebuilder:rbac:groups="",resources=pods,namespace=openshift-lightspeed,verbs=get;list;watch
// +kubebuilder:rbac:groups="",resources=pods/log,namespace=openshift-lightspeed,verbs=get

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
Expand Down Expand Up @@ -189,21 +186,6 @@ func (r *OpenStackLightspeedReconciler) Reconcile(ctx context.Context, req ctrl.
apiv1beta1.OpenShiftLightspeedOperatorReady,
)

// TODO(lpiwowar): Remove ResolveIndexID once OpenShift Lightspeed supports auto discovery of the indexID directly
// from the vector db image.
indexID, result, err := ResolveIndexID(ctx, helper, instance)
if err != nil {
instance.Status.Conditions.Set(condition.FalseCondition(
apiv1beta1.OpenStackLightspeedReadyCondition,
condition.ErrorReason,
condition.SeverityWarning,
condition.DeploymentReadyErrorMessage,
err.Error()))
return result, err
} else if (result != ctrl.Result{}) {
return result, nil
}

// NOTE: We cannot consume the OLSConfig definition directly from the OLS operator's code due to
// a conflict in Go versions. When this comment was written, the min. required Go version for
// openstack-operator was 1.21 whereas OLS operator required at least Go version 1.23. Once the
Expand Down Expand Up @@ -232,7 +214,7 @@ func (r *OpenStackLightspeedReconciler) Reconcile(ctx context.Context, req ctrl.
return fmt.Errorf("OLSConfig is managed by different OpenStackLightspeed instance")
}

err = PatchOLSConfig(helper, instance, &olsConfig, indexID)
err = PatchOLSConfig(helper, instance, &olsConfig)
if err != nil {
return err
}
Expand Down
Loading