diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 379346e9..3a31a91e 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -71,30 +71,6 @@ metadata: name: manager-role namespace: openshift-lightspeed rules: -- apiGroups: - - "" - resources: - - pods - verbs: - - get - - list - - watch -- apiGroups: - - "" - resources: - - pods/log - verbs: - - get -- apiGroups: - - batch - resources: - - jobs - verbs: - - create - - delete - - get - - list - - watch - apiGroups: - operators.coreos.com resources: diff --git a/internal/controller/funcs.go b/internal/controller/funcs.go index cea3eeea..7c807bfa 100644 --- a/internal/controller/funcs.go +++ b/internal/controller/funcs.go @@ -18,25 +18,16 @@ package controller import ( "context" - "crypto/sha256" "encoding/json" "fmt" - "io" "math/rand" "strconv" - "strings" - "time" - "github.com/openstack-k8s-operators/lib-common/modules/common/condition" apiv1beta1 "github.com/openstack-lightspeed/operator/api/v1beta1" - batchv1 "k8s.io/api/batch/v1" - "k8s.io/client-go/kubernetes" - ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/config" common_helper "github.com/openstack-k8s-operators/lib-common/modules/common/helper" - corev1 "k8s.io/api/core/v1" k8s_errors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" uns "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -139,7 +130,6 @@ func PatchOLSConfig( helper *common_helper.Helper, instance *apiv1beta1.OpenStackLightspeed, olsConfig *uns.Unstructured, - indexID string, ) error { // Patch the Providers section providersPatch := []interface{}{ @@ -165,10 +155,12 @@ func PatchOLSConfig( } // Patch the RAG section + // NOTE(lucasagomes): We don't need indexID here because the tag on our RAG images + // already matches the indexID that the Vector DB used when it was built. OLS leverages + // that to set the right index. openstackRAG := []interface{}{ map[string]interface{}{ "image": instance.Spec.RAGImage, - "indexID": indexID, "indexPath": OpenStackLightspeedVectorDBPath, }, } @@ -261,161 +253,6 @@ func IsOLSConfigReady(ctx context.Context, helper *common_helper.Helper) (bool, return true, nil } -// ResolveIndexID - returns index ID for the data stored in the vector DB container image. The discovery of the -// index ID is done through spawning a pod with the rag-content image and looking at the INDEX_NAME env variable value. -func ResolveIndexID( - ctx context.Context, - helper *common_helper.Helper, - instance *apiv1beta1.OpenStackLightspeed, -) (string, ctrl.Result, error) { - err := createOLSJob(ctx, helper, instance) - if err != nil { - return "", ctrl.Result{}, err - } - - podList := &corev1.PodList{} - labelSelector := client.MatchingLabels{"app": OpenStackLightspeedJobName} - if err := helper.GetClient().List(ctx, podList, client.InNamespace(instance.Namespace), labelSelector); err != nil { - return "", ctrl.Result{}, err - } - - var OLSPod *corev1.Pod - for _, pod := range podList.Items { - if pod.Spec.Containers[0].Image == instance.Spec.RAGImage { - OLSPod = &pod - break - } - } - if OLSPod == nil { - return requeueWaitingPod(helper, instance) - } - - switch OLSPod.Status.Phase { - case corev1.PodSucceeded: - indexName, err := extractEnvFromPodLogs(ctx, OLSPod, "INDEX_NAME") - if err != nil && k8s_errors.IsNotFound(err) { - return requeueWaitingPod(helper, instance) - } - return indexName, ctrl.Result{}, err - case corev1.PodFailed: - return "", ctrl.Result{}, fmt.Errorf("failed to start OpenStack Lightpseed RAG pod") - default: - return requeueWaitingPod(helper, instance) - } -} - -// extractEnvFromPodLogs - discovers an environment variable value from the pod logs. The pod must be started using -// createOLSJob. -func extractEnvFromPodLogs(ctx context.Context, pod *corev1.Pod, envVarName string) (string, error) { - cfg, err := config.GetConfig() - if err != nil { - return "", err - } - - k8sClient, err := kubernetes.NewForConfig(cfg) - if err != nil { - return "", err - } - - req := k8sClient.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &corev1.PodLogOptions{}) - podLogs, err := req.Stream(ctx) - if err != nil { - return "", err - } - defer func() { - _ = podLogs.Close() - }() - - buf := new(strings.Builder) - _, err = io.Copy(buf, podLogs) - if err != nil { - return "", fmt.Errorf("error in copying logs: %w", err) - } - - logs := buf.String() - for _, envLine := range strings.Split(logs, "\n") { - parts := strings.Split(envLine, "=") - if len(parts) != 2 { - continue - } - - if parts[0] == envVarName { - return parts[1], nil - } - } - - return "", fmt.Errorf("env var not discovered: %s", envVarName) -} - -// createOLSJob - starts OLS pod with entrypoint that lists environment variables after the start of the pod. It used -// to discover INDEX_NAME value. -func createOLSJob( - ctx context.Context, - helper *common_helper.Helper, - instance *apiv1beta1.OpenStackLightspeed, -) error { - imageHash := sha256.Sum256([]byte(instance.Spec.RAGImage)) - imageHashStr := fmt.Sprintf("%x", imageHash) - imageHashStr = imageHashStr[len(imageHashStr)-9:] - imageName := fmt.Sprintf("%s-%s", OpenStackLightspeedJobName, imageHashStr) - - ttlSecondsAfterFinished := int32(600) // 10 mins - activeDeadlineSeconds := int64(1200) // 20 mins - OLSPod := &batchv1.Job{ - ObjectMeta: metav1.ObjectMeta{ - Name: imageName, - Namespace: instance.Namespace, - Labels: map[string]string{ - "app": OpenStackLightspeedJobName, - }, - }, - Spec: batchv1.JobSpec{ - TTLSecondsAfterFinished: &ttlSecondsAfterFinished, - ActiveDeadlineSeconds: &activeDeadlineSeconds, - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - "app": OpenStackLightspeedJobName, - }, - }, - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "rag-content", - Image: instance.Spec.RAGImage, - Command: []string{"/bin/sh", "-c"}, - Args: []string{"env"}, - }, - }, - RestartPolicy: corev1.RestartPolicyNever, - }, - }, - }, - } - - if err := controllerutil.SetControllerReference(instance, OLSPod, helper.GetScheme()); err != nil { - return err - } - - err := helper.GetClient().Create(ctx, OLSPod) - if err != nil && !k8s_errors.IsAlreadyExists(err) { - return err - } - - return nil -} - -func requeueWaitingPod(helper *common_helper.Helper, instance *apiv1beta1.OpenStackLightspeed) (string, ctrl.Result, error) { - instance.Status.Conditions.Set(condition.FalseCondition( - apiv1beta1.OpenStackLightspeedReadyCondition, - condition.RequestedReason, - condition.SeverityInfo, - apiv1beta1.OpenStackLightspeedWaitingVectorDBMessage, - )) - helper.GetLogger().Info(apiv1beta1.OpenStackLightspeedReadyMessage) - return "", ctrl.Result{RequeueAfter: 5 * time.Second}, nil -} - // IsOwnedBy returns true if 'object' is owned by 'owner' based on OwnerReference UID. func IsOwnedBy(object metav1.Object, owner metav1.Object) bool { for _, ref := range object.GetOwnerReferences() { diff --git a/internal/controller/openstacklightspeed_controller.go b/internal/controller/openstacklightspeed_controller.go index 18d3cb3b..2813f354 100644 --- a/internal/controller/openstacklightspeed_controller.go +++ b/internal/controller/openstacklightspeed_controller.go @@ -63,9 +63,6 @@ func (r *OpenStackLightspeedReconciler) GetLogger(ctx context.Context) logr.Logg // +kubebuilder:rbac:groups=operators.coreos.com,resources=clusterserviceversions,namespace=openshift-lightspeed,verbs=update;patch;delete // +kubebuilder:rbac:groups=operators.coreos.com,resources=subscriptions,namespace=openshift-lightspeed,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=operators.coreos.com,resources=installplans,namespace=openshift-lightspeed,verbs=get;list;watch;update;delete -// +kubebuilder:rbac:groups=batch,resources=jobs,namespace=openshift-lightspeed,verbs=get;list;watch;create;delete -// +kubebuilder:rbac:groups="",resources=pods,namespace=openshift-lightspeed,verbs=get;list;watch -// +kubebuilder:rbac:groups="",resources=pods/log,namespace=openshift-lightspeed,verbs=get // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. @@ -189,21 +186,6 @@ func (r *OpenStackLightspeedReconciler) Reconcile(ctx context.Context, req ctrl. apiv1beta1.OpenShiftLightspeedOperatorReady, ) - // TODO(lpiwowar): Remove ResolveIndexID once OpenShift Lightspeed supports auto discovery of the indexID directly - // from the vector db image. - indexID, result, err := ResolveIndexID(ctx, helper, instance) - if err != nil { - instance.Status.Conditions.Set(condition.FalseCondition( - apiv1beta1.OpenStackLightspeedReadyCondition, - condition.ErrorReason, - condition.SeverityWarning, - condition.DeploymentReadyErrorMessage, - err.Error())) - return result, err - } else if (result != ctrl.Result{}) { - return result, nil - } - // NOTE: We cannot consume the OLSConfig definition directly from the OLS operator's code due to // a conflict in Go versions. When this comment was written, the min. required Go version for // openstack-operator was 1.21 whereas OLS operator required at least Go version 1.23. Once the @@ -232,7 +214,7 @@ func (r *OpenStackLightspeedReconciler) Reconcile(ctx context.Context, req ctrl. return fmt.Errorf("OLSConfig is managed by different OpenStackLightspeed instance") } - err = PatchOLSConfig(helper, instance, &olsConfig, indexID) + err = PatchOLSConfig(helper, instance, &olsConfig) if err != nil { return err }