diff --git a/README.md b/README.md index ee0cb311..7c973ad9 100644 --- a/README.md +++ b/README.md @@ -95,6 +95,22 @@ The operator runs the following controllers and webhooks: | **NetworkPolicy Controller** | Creates permissive or restrictive NetworkPolicies based on signature verification status | | **MLflow Controller** | Auto-discovers MLflow instances, creates experiments per agent, injects tracking env vars and RBAC | +## Bundle Service + +Kagenti includes a dedicated bundle service used by AuthBridge clients to fetch authorization bundles. + +This service is deployed using the manifests in `kagenti-operator/config/bundleservice/` and is intended for SRE operational use. + +Key facts: + +- Deployment name: `bundle-service` +- Namespace: `system` +- Service type: `ClusterIP` +- Port: `8080` +- Health endpoints: `/healthz`, `/readyz` + +Use `kagenti-operator/kagenti-operator/cmd/bundle-service/README.md` for SRE runbook guidance and operational details. + ## Quick Start ### Prerequisites diff --git a/charts/kagenti-operator/crds/agent.kagenti.dev_authorizationpolicies.yaml b/charts/kagenti-operator/crds/agent.kagenti.dev_authorizationpolicies.yaml new file mode 100644 index 00000000..ff0d240a --- /dev/null +++ b/charts/kagenti-operator/crds/agent.kagenti.dev_authorizationpolicies.yaml @@ -0,0 +1,109 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: authorizationpolicies.agent.kagenti.dev +spec: + group: agent.kagenti.dev + names: + kind: AuthorizationPolicy + listKind: AuthorizationPolicyList + plural: authorizationpolicies + singular: authorizationpolicy + shortNames: + - ap + scope: Namespaced + versions: + - name: v1alpha1 + served: true + storage: true + subresources: + status: {} + additionalPrinterColumns: + - name: Scope + type: string + jsonPath: .spec.scope + - name: ClientID + type: string + jsonPath: .spec.clientID + - name: Hash + type: string + jsonPath: .status.bundleHash + priority: 1 + - name: Age + type: date + jsonPath: .metadata.creationTimestamp + schema: + openAPIV3Schema: + type: object + required: + - spec + properties: + spec: + type: object + required: + - scope + - policies + properties: + scope: + type: string + enum: + - global + - namespace + - client + default: client + clientID: + type: string + maxLength: 253 + pattern: "^[a-z0-9]([a-z0-9._-]*[a-z0-9])?$" + policies: + type: array + minItems: 1 + items: + type: object + required: + - path + - content + properties: + path: + type: string + minLength: 1 + pattern: "^[a-z0-9][a-z0-9/_.-]*\\.rego$" + content: + type: string + minLength: 1 + x-kubernetes-validations: + - rule: "self.scope == 'client' ? self.clientID != '' : true" + message: "clientID is required when scope is 'client'" + - rule: "self.scope != 'client' ? !has(self.clientID) || self.clientID == '' : true" + message: "clientID must not be set when scope is 'global' or 'namespace'" + status: + type: object + properties: + bundleHash: + type: string + lastBuilt: + type: string + format: date-time + conditions: + type: array + items: + type: object + required: + - type + - status + properties: + type: + type: string + status: + type: string + enum: + - "True" + - "False" + - "Unknown" + lastTransitionTime: + type: string + format: date-time + reason: + type: string + message: + type: string diff --git a/kagenti-operator/api/v1alpha1/authorizationpolicy_conversion.go b/kagenti-operator/api/v1alpha1/authorizationpolicy_conversion.go new file mode 100644 index 00000000..458ce7da --- /dev/null +++ b/kagenti-operator/api/v1alpha1/authorizationpolicy_conversion.go @@ -0,0 +1,40 @@ +/* +Copyright 2025. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "fmt" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" +) + +func AuthorizationPolicyFromUnstructured(obj *unstructured.Unstructured) (*AuthorizationPolicy, error) { + var ap AuthorizationPolicy + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, &ap); err != nil { + return nil, fmt.Errorf("converting from unstructured: %w", err) + } + return &ap, nil +} + +func AuthorizationPolicyToUnstructured(ap *AuthorizationPolicy) (*unstructured.Unstructured, error) { + obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(ap) + if err != nil { + return nil, fmt.Errorf("converting to unstructured: %w", err) + } + return &unstructured.Unstructured{Object: obj}, nil +} diff --git a/kagenti-operator/api/v1alpha1/authorizationpolicy_conversion_test.go b/kagenti-operator/api/v1alpha1/authorizationpolicy_conversion_test.go new file mode 100644 index 00000000..37cf7398 --- /dev/null +++ b/kagenti-operator/api/v1alpha1/authorizationpolicy_conversion_test.go @@ -0,0 +1,141 @@ +/* +Copyright 2025. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +func TestAuthorizationPolicyFromUnstructured(t *testing.T) { + obj := &unstructured.Unstructured{ + Object: map[string]any{ + "apiVersion": "agent.kagenti.dev/v1alpha1", + "kind": "AuthorizationPolicy", + "metadata": map[string]any{ + "name": "test-client", + "namespace": "default", + }, + "spec": map[string]any{ + "scope": "client", + "clientID": "test-client", + "policies": []any{ + map[string]any{ + "path": "inbound/request.rego", + "content": "package authbridge.client\ndefault allow := true\n", + }, + }, + }, + }, + } + + ap, err := AuthorizationPolicyFromUnstructured(obj) + if err != nil { + t.Fatalf("AuthorizationPolicyFromUnstructured failed: %v", err) + } + if ap.Spec.Scope != PolicyScopeClient { + t.Fatalf("unexpected scope: %s", ap.Spec.Scope) + } + if ap.Spec.ClientID != "test-client" { + t.Fatalf("unexpected clientID: %s", ap.Spec.ClientID) + } + if len(ap.Spec.Policies) != 1 { + t.Fatalf("expected 1 policy, got %d", len(ap.Spec.Policies)) + } +} + +func TestAuthorizationPolicyFromUnstructured_Global(t *testing.T) { + obj := &unstructured.Unstructured{ + Object: map[string]any{ + "apiVersion": "agent.kagenti.dev/v1alpha1", + "kind": "AuthorizationPolicy", + "metadata": map[string]any{ + "name": "global-policy", + "namespace": "kagenti-system", + }, + "spec": map[string]any{ + "scope": "global", + "policies": []any{ + map[string]any{ + "path": "inbound/request.rego", + "content": "package authbridge.global\ndefault allow := true\n", + }, + }, + }, + }, + } + + ap, err := AuthorizationPolicyFromUnstructured(obj) + if err != nil { + t.Fatalf("AuthorizationPolicyFromUnstructured failed: %v", err) + } + if ap.Spec.Scope != PolicyScopeGlobal { + t.Fatalf("unexpected scope: %s", ap.Spec.Scope) + } + if ap.Spec.ClientID != "" { + t.Fatalf("expected empty clientID for global, got: %s", ap.Spec.ClientID) + } +} + +func TestAuthorizationPolicyRoundTrip(t *testing.T) { + original := &AuthorizationPolicy{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "agent.kagenti.dev/v1alpha1", + Kind: "AuthorizationPolicy", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "my-client", + Namespace: "default", + }, + Spec: AuthorizationPolicySpec{ + Scope: PolicyScopeClient, + ClientID: "my-client", + Policies: []PolicyEntry{ + { + Path: "inbound/request.rego", + Content: "package authbridge.client\ndefault allow := false\n", + }, + { + Path: "outbound/request.rego", + Content: "package authbridge.client\ndefault allow := true\n", + }, + }, + }, + } + + obj, err := AuthorizationPolicyToUnstructured(original) + if err != nil { + t.Fatalf("AuthorizationPolicyToUnstructured failed: %v", err) + } + + roundTripped, err := AuthorizationPolicyFromUnstructured(obj) + if err != nil { + t.Fatalf("AuthorizationPolicyFromUnstructured failed: %v", err) + } + + if roundTripped.Spec.Scope != original.Spec.Scope { + t.Fatalf("scope mismatch: %s vs %s", roundTripped.Spec.Scope, original.Spec.Scope) + } + if roundTripped.Spec.ClientID != original.Spec.ClientID { + t.Fatalf("clientID mismatch: %s vs %s", roundTripped.Spec.ClientID, original.Spec.ClientID) + } + if len(roundTripped.Spec.Policies) != len(original.Spec.Policies) { + t.Fatalf("policy count mismatch: %d vs %d", len(roundTripped.Spec.Policies), len(original.Spec.Policies)) + } +} diff --git a/kagenti-operator/api/v1alpha1/authorizationpolicy_types.go b/kagenti-operator/api/v1alpha1/authorizationpolicy_types.go new file mode 100644 index 00000000..e32eea78 --- /dev/null +++ b/kagenti-operator/api/v1alpha1/authorizationpolicy_types.go @@ -0,0 +1,97 @@ +/* +Copyright 2025. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +const ( + AuthorizationPolicyResource = "authorizationpolicies" + AuthorizationPolicyKind = "AuthorizationPolicy" +) + +func AuthorizationPolicyGVR() schema.GroupVersionResource { + return schema.GroupVersionResource{ + Group: GroupVersion.Group, + Version: GroupVersion.Version, + Resource: AuthorizationPolicyResource, + } +} + +type PolicyScope string + +const ( + PolicyScopeGlobal PolicyScope = "global" + PolicyScopeNamespace PolicyScope = "namespace" + PolicyScopeClient PolicyScope = "client" +) + +// +kubebuilder:object:root=true +// +kubebuilder:subresource:status +// +kubebuilder:resource:shortName=ap +// +kubebuilder:printcolumn:name="Scope",type=string,JSONPath=`.spec.scope` +// +kubebuilder:printcolumn:name="ClientID",type=string,JSONPath=`.spec.clientID` +// +kubebuilder:printcolumn:name="Hash",type=string,JSONPath=`.status.bundleHash`,priority=1 +// +kubebuilder:printcolumn:name="Age",type=date,JSONPath=`.metadata.creationTimestamp` +type AuthorizationPolicy struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + Spec AuthorizationPolicySpec `json:"spec"` + Status AuthorizationPolicyStatus `json:"status,omitempty"` +} + +type AuthorizationPolicySpec struct { + // +kubebuilder:validation:Enum=global;namespace;client + // +kubebuilder:default=client + Scope PolicyScope `json:"scope"` + + // +optional + // +kubebuilder:validation:MaxLength=253 + // +kubebuilder:validation:Pattern="^[a-z0-9]([a-z0-9._-]*[a-z0-9])?$" + ClientID string `json:"clientID,omitempty"` + + // +kubebuilder:validation:MinItems=1 + Policies []PolicyEntry `json:"policies"` +} + +type PolicyEntry struct { + // +kubebuilder:validation:MinLength=1 + // +kubebuilder:validation:Pattern="^[a-z0-9][a-z0-9/_.-]*\\.rego$" + Path string `json:"path"` + + // +kubebuilder:validation:MinLength=1 + Content string `json:"content"` +} + +type AuthorizationPolicyStatus struct { + BundleHash string `json:"bundleHash,omitempty"` + LastBuilt *metav1.Time `json:"lastBuilt,omitempty"` + Conditions []metav1.Condition `json:"conditions,omitempty"` +} + +// +kubebuilder:object:root=true +type AuthorizationPolicyList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []AuthorizationPolicy `json:"items"` +} + +func init() { + SchemeBuilder.Register(&AuthorizationPolicy{}, &AuthorizationPolicyList{}) +} diff --git a/kagenti-operator/api/v1alpha1/zz_generated.deepcopy.go b/kagenti-operator/api/v1alpha1/zz_generated.deepcopy.go index 5a87d25b..25bc2c2c 100644 --- a/kagenti-operator/api/v1alpha1/zz_generated.deepcopy.go +++ b/kagenti-operator/api/v1alpha1/zz_generated.deepcopy.go @@ -611,3 +611,123 @@ func (in *TargetRef) DeepCopy() *TargetRef { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *AuthorizationPolicy) DeepCopyInto(out *AuthorizationPolicy) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AuthorizationPolicy. +func (in *AuthorizationPolicy) DeepCopy() *AuthorizationPolicy { + if in == nil { + return nil + } + out := new(AuthorizationPolicy) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *AuthorizationPolicy) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *AuthorizationPolicyList) DeepCopyInto(out *AuthorizationPolicyList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]AuthorizationPolicy, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AuthorizationPolicyList. +func (in *AuthorizationPolicyList) DeepCopy() *AuthorizationPolicyList { + if in == nil { + return nil + } + out := new(AuthorizationPolicyList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *AuthorizationPolicyList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *AuthorizationPolicySpec) DeepCopyInto(out *AuthorizationPolicySpec) { + *out = *in + if in.Policies != nil { + in, out := &in.Policies, &out.Policies + *out = make([]PolicyEntry, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AuthorizationPolicySpec. +func (in *AuthorizationPolicySpec) DeepCopy() *AuthorizationPolicySpec { + if in == nil { + return nil + } + out := new(AuthorizationPolicySpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *AuthorizationPolicyStatus) DeepCopyInto(out *AuthorizationPolicyStatus) { + *out = *in + if in.LastBuilt != nil { + in, out := &in.LastBuilt, &out.LastBuilt + *out = (*in).DeepCopy() + } + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]v1.Condition, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AuthorizationPolicyStatus. +func (in *AuthorizationPolicyStatus) DeepCopy() *AuthorizationPolicyStatus { + if in == nil { + return nil + } + out := new(AuthorizationPolicyStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PolicyEntry) DeepCopyInto(out *PolicyEntry) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PolicyEntry. +func (in *PolicyEntry) DeepCopy() *PolicyEntry { + if in == nil { + return nil + } + out := new(PolicyEntry) + in.DeepCopyInto(out) + return out +} diff --git a/kagenti-operator/cmd/bundle-service/ARCHITECTURE.md b/kagenti-operator/cmd/bundle-service/ARCHITECTURE.md new file mode 100644 index 00000000..b750d327 --- /dev/null +++ b/kagenti-operator/cmd/bundle-service/ARCHITECTURE.md @@ -0,0 +1,183 @@ +# Bundle Service Architecture + +This document describes the architecture of the Kagenti bundle service and its HTTP API contract. + +## Overview + +The bundle service builds authorization bundles for AuthBridge clients and serves them as compressed tarballs. + +Bundles are assembled from three policy tiers: + +1. **Global policies** — cluster-wide rules defined in a global `AuthorizationPolicy` CR in the service namespace. The global CR owns the OPA query entry points (`package authbridge.inbound.request`, etc.) and contains the tier combination logic. +2. **Namespace policies** — scoped to the namespace from the client's SPIFFE ID. +3. **Client-specific policies** — scoped to the `AuthorizationPolicy` CR whose name and namespace match the SPIFFE ID. + +The global CR serves as the decision combiner — it defines how the three tiers interact (e.g., whether namespace can override, whether all tiers must allow). Platform engineers can modify this logic by editing the global CR without code changes. + +## Client identity + +Clients identify themselves via a SPIFFE ID passed as a query parameter: `?spiffe={trust-domain}/ns/{namespace}/sa/{name}`. The service parses the namespace and name to locate the relevant policy resources. + +Currently, no caller verification is performed (verification is handled upstream). In the future, the service will verify the caller's identity using either: + +- **mTLS** — validate that the client certificate's SPIFFE SAN matches the claimed identity +- **JWT** — validate a Kubernetes service account token and confirm the token's namespace/name matches the claimed identity + +The `Verifier` interface in `internal/bundleservice/identity/` is the extension point for this. + +## Policy model + +The service collects policies from `AuthorizationPolicy` CRs and packages them into the bundle payload. + +- **Global policies** — `scope: global`, must reside in the service namespace (`kagenti-system`). These declare the OPA query entry-point packages (e.g., `package authbridge.inbound.request`) and contain the decision logic that combines namespace and client tiers. +- **Namespace policies** — `scope: namespace`, scoped to the namespace from the client's SPIFFE ID. +- **Client policies** — `scope: client`, scoped to the CR whose name and namespace match the SPIFFE ID. + +### OPA query paths + +The OPA plugin queries these four paths: + +- `authbridge/inbound/request` +- `authbridge/inbound/response` +- `authbridge/outbound/request` +- `authbridge/outbound/response` + +The global CR's Rego declares these packages directly. Namespace and client policies use sub-packages (e.g., `package authbridge.ns.inbound.request`, `package authbridge.client.inbound.request`). + +### Default decision logic + +The default global CR (`config/bundleservice/default-policy.yaml`) implements: + +``` +allow if ns.override +allow if { ns_ok AND client_ok } +``` + +Where: +- `ns_ok` = namespace tier allows, or namespace tier is undefined (no namespace CR) +- `client_ok` = client tier allows, or client tier is undefined (no client CR) +- `ns.override` = namespace tier forces allow regardless of client tier + +Platform engineers can modify this logic (e.g., remove namespace override support, add additional tiers, change from AND to OR) by editing the global CR. + +### Bundle path structure + +Inside the tar.gz bundle: + +- `authbridge/global/` — policies from the global CR +- `authbridge/ns/` — policies from namespace CRs +- `authbridge/client/` — policies from client CRs +- `.manifest` — OPA bundle manifest with revision hash and roots + +OPA resolves policies by their `package` declaration, not their file path in the bundle. + +## Caching + +The bundle service uses three caches: + +| Cache | Key | Value | Eviction | +|-------|-----|-------|----------| +| **ETagCache** | client ID | content hash | Invalidated on policy change | +| **BundleCache** | client ID | gzipped tar + hash | LRU + 1-minute TTL | +| **PolicyCache** | `"global"` or namespace | parsed policy entries + hash | Invalidated on CR change | + +If the client sends an `If-None-Match` header matching the cached ETag, the service returns `304 Not Modified` without building a bundle. + +### Cache invalidation + +- Global CR change → invalidates all caches (all clients get fresh bundles) +- Namespace CR change → invalidates ETag and bundle caches for that namespace +- Client CR change → invalidates ETag and bundle caches for that specific client + +## Concurrency control + +The service uses two mechanisms to handle concurrent bundle requests efficiently: + +1. **Singleflight** — deduplicates concurrent requests for the same client identity. If 10 pods with the same identity request simultaneously, only one build runs; the others wait for its result. +2. **Build semaphore** — limits concurrent bundle builds across different identities to 10. This prevents thundering herd scenarios (e.g., cluster restart causing 1000 clients to miss cache simultaneously) from overwhelming the Kubernetes API server and etcd. + +Requests that cannot acquire a build slot block until one is available, respecting context cancellation (client disconnect). + +## Bundle size limit + +Bundles are limited to `5 MB`. If a generated bundle exceeds this size, the service returns `413 Payload Too Large`. + +## HTTP API + +### GET /bundles?spiffe={trust-domain}/ns/{namespace}/sa/{name} + +Fetch the authorization bundle for the client identified by a SPIFFE ID. + +Request: + +- Method: `GET` +- Path: `/bundles` +- Query parameter: `spiffe={trust-domain}/ns/{namespace}/sa/{name}` +- Optional header: `If-None-Match: ""` + +Response codes: + +- `200 OK` — the bundle is returned in the response body +- `304 Not Modified` — the bundle has not changed since the provided ETag +- `400 Bad Request` — the SPIFFE ID could not be parsed or is missing +- `403 Forbidden` — identity verification failed +- `413 Payload Too Large` — bundle exceeds 5 MB +- `503 Service Unavailable` — service not ready +- `500 Internal Server Error` — internal failure + +Response headers for `200 OK`: + +- `Content-Type: application/gzip` +- `ETag: ""` +- `Cache-Control: max-age=0, must-revalidate` + +Example request: + +```bash +curl -v 'http://bundle-service.kagenti-system.svc.cluster.local:8080/bundles?spiffe=localtest.me/ns/default/sa/my-agent' +``` + +ETag example: + +```bash +curl -v -H 'If-None-Match: "sha256:abc123..."' \ + 'http://bundle-service.kagenti-system.svc.cluster.local:8080/bundles?spiffe=localtest.me/ns/default/sa/my-agent' +``` + +### GET /healthz + +Returns `200 OK` and body `ok` when the service is alive. + +### GET /readyz + +Returns `200 OK` and body `ok` when the service is ready to serve bundles. If the service is not ready, it returns `503 Service Unavailable`. + +## Startup and readiness + +At startup, the service: + +1. Loads Kubernetes configuration from in-cluster credentials or `KUBECONFIG` +2. Creates dynamic clients +3. Starts the `AuthorizationPolicy` informer with scope-based indexing +4. Waits for informer sync before serving HTTP traffic +5. Begins serving on port 8080 + +## Logging + +The service logs structured messages via `log/slog`: + +- Bundle requests received (URL, method) +- Bundle responses sent (URL, status, size) +- Bundle build events (namespace, name, hash) +- Policy change events (scope, namespace) +- Errors (conversion failures, oversized bundles) + +Log level is configurable via `LOG_LEVEL` environment variable (`info`, `debug`, `warn`, `error`). + +## Error handling + +- Invalid or missing SPIFFE IDs return `400 Bad Request` +- Identity verification failures return `403 Forbidden` +- Oversized bundles return `413 Payload Too Large` +- Internal build or watch failures return `500 Internal Server Error` +- Readiness failures return `503 Service Unavailable` diff --git a/kagenti-operator/cmd/bundle-service/Dockerfile b/kagenti-operator/cmd/bundle-service/Dockerfile new file mode 100644 index 00000000..46af9c2b --- /dev/null +++ b/kagenti-operator/cmd/bundle-service/Dockerfile @@ -0,0 +1,25 @@ +# `--platform=$BUILDPLATFORM` pins the builder stage to the host arch so +# Go cross-compiles to $TARGETARCH instead of running under QEMU. With +# CGO disabled (below), the resulting binary is bit-for-bit identical to +# a native build but ~9x faster for the arm64 variant. +FROM --platform=$BUILDPLATFORM docker.io/golang:1.26 AS builder +ARG TARGETOS +ARG TARGETARCH + +WORKDIR /workspace +COPY go.mod go.mod +COPY go.sum go.sum +RUN go mod download + +COPY cmd/bundle-service/ cmd/bundle-service/ +COPY api/ api/ +COPY internal/bundleservice/ internal/bundleservice/ + +RUN CGO_ENABLED=0 GOOS=${TARGETOS:-linux} GOARCH=${TARGETARCH} go build -o bundle-service ./cmd/bundle-service/ + +FROM gcr.io/distroless/static:nonroot +WORKDIR / +COPY --from=builder /workspace/bundle-service . +USER 65532:65532 + +ENTRYPOINT ["/bundle-service"] diff --git a/kagenti-operator/cmd/bundle-service/README.md b/kagenti-operator/cmd/bundle-service/README.md new file mode 100644 index 00000000..bf5dd9ed --- /dev/null +++ b/kagenti-operator/cmd/bundle-service/README.md @@ -0,0 +1,138 @@ +# Bundle Service + +This package contains the Kagenti bundle-service binary and SRE-facing operational guidance for running the service in Kubernetes. + +## Purpose + +The bundle service serves OPA authorization bundles to AuthBridge clients over HTTP. It runs as a cluster service and provides policy bundles assembled from global, namespace, and client-specific `AuthorizationPolicy` CRs. + +## Deployment + +### Quick start (kind) + +```bash +./hack/bundle-service-kind.sh [cluster-name] [namespace] +# Defaults: cluster=kagenti, namespace=kagenti-system +``` + +This script builds the image, loads it into kind, installs the CRD, applies the default global policy CR, and deploys the service. + +### Production manifests + +Deploy using the manifests in `kagenti-operator/config/bundleservice/`: + +- `deployment.yaml` +- `service.yaml` +- `serviceaccount.yaml` +- `rbac.yaml` +- `networkpolicy.yaml` +- `default-policy.yaml` — the default global `AuthorizationPolicy` CR + +Default deployment settings: + +- Namespace: `kagenti-system` +- Deployment name: `bundle-service` +- Service name: `bundle-service` +- Port: `8080` + +### Prerequisites + +The `AuthorizationPolicy` CRD must be installed before deploying: + +```bash +kubectl apply -f config/crd/bases/agent.kagenti.dev_authorizationpolicies.yaml +``` + +The default global policy CR must be applied for the service to produce valid bundles: + +```bash +kubectl apply -f config/bundleservice/default-policy.yaml +``` + +### Runtime configuration + +| Environment Variable | Default | Description | +|---------------------|---------|-------------| +| `POD_NAMESPACE` | (from downward API) | Namespace where the service runs; used to identify global policies | +| `LOG_LEVEL` | `info` | Log verbosity: `debug`, `info`, `warn`, `error` | +| `KUBECONFIG` | (in-cluster) | Path to kubeconfig when running outside the cluster | + +## Health and readiness + +| Endpoint | Purpose | Success | Failure | +|----------|---------|---------|---------| +| `GET /healthz` | Liveness probe | `200 OK` | — | +| `GET /readyz` | Readiness probe | `200 OK` | `503 Service Unavailable` | + +The service reports ready once the Kubernetes informer has synced. Until then, all `/bundles` requests return `503`. + +## Operational behavior + +### Request flow + +1. Client sends `GET /bundles?spiffe={trust-domain}/ns/{namespace}/sa/{name}` +2. Service checks readiness, parses identity, verifies authorization +3. Fast path: ETagCache hit + `If-None-Match` match → `304 Not Modified` +4. Medium path: BundleCache hit → return cached bundle +5. Slow path: build bundle from CRs (deduplicated by singleflight, bounded by semaphore) + +### Concurrency limits + +At most 10 bundle builds run concurrently. Additional requests queue until a slot is available. This protects the Kubernetes API server and etcd from thundering herd during cluster restarts. + +Requests for the same client identity are deduplicated — only one build runs while others wait for its result. + +### Expected response codes + +| Code | Cause | +|------|-------| +| `200` | Bundle served successfully | +| `304` | Bundle unchanged (ETag match) | +| `400` | Missing or unparseable SPIFFE ID | +| `403` | Identity verification failed | +| `413` | Bundle exceeds 5 MB limit | +| `500` | Internal error during bundle build | +| `503` | Service not ready (informer not synced) | + +### Logs + +Structured logs via `log/slog`. Key log events: + +- `bundle request received` — every incoming request (URL, method) +- `bundle response` — every response (URL, status, size) +- `bundle built` — new bundle generated (namespace, name, hash) +- `bundle exceeds size limit` — bundle too large (namespace, name, size) +- Policy change events from watcher (scope, namespace, key) + +## Global policy CR + +The default global `AuthorizationPolicy` CR (`config/bundleservice/default-policy.yaml`) defines the decision logic for all four OPA query paths. It determines how namespace and client tiers are combined. + +Platform engineers can customize this CR to: + +- Remove namespace tier support entirely +- Add or remove namespace override capability +- Change combination logic (AND → OR, add additional checks) +- Set default allow/deny behavior + +If the global CR is deleted, OPA has no rules at the query paths and all decisions default to deny (fail-closed). + +## Audit and monitoring + +Monitor: + +- Deployment availability and pod restarts +- Readiness/liveness probe status +- `413` and `500` response rates +- Bundle build latency (via log timestamps) +- Cache effectiveness (frequency of `304` vs `200` responses) + +## Related resources + +- `kagenti-operator/config/bundleservice/` — deployment manifests and default policy +- `kagenti-operator/config/crd/bases/agent.kagenti.dev_authorizationpolicies.yaml` — CRD definition +- `kagenti-operator/internal/bundleservice/` — service implementation + +## Architecture and API details + +For architecture and API contract details, see [ARCHITECTURE.md](ARCHITECTURE.md). diff --git a/kagenti-operator/cmd/bundle-service/main.go b/kagenti-operator/cmd/bundle-service/main.go new file mode 100644 index 00000000..2d47a9a8 --- /dev/null +++ b/kagenti-operator/cmd/bundle-service/main.go @@ -0,0 +1,156 @@ +/* +Copyright 2025. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "log/slog" + "net/http" + "os" + "os/signal" + "path/filepath" + "syscall" + "time" + + "k8s.io/client-go/dynamic" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/homedir" + + "github.com/kagenti/operator/internal/bundleservice/cache" + "github.com/kagenti/operator/internal/bundleservice/handler" + "github.com/kagenti/operator/internal/bundleservice/identity" + "github.com/kagenti/operator/internal/bundleservice/provider" + "github.com/kagenti/operator/internal/bundleservice/watcher" +) + +const version = "0.3.1" + +func main() { + slog.Info("bundle-service starting", "version", version) + + level := slog.LevelInfo + if l := os.Getenv("LOG_LEVEL"); l != "" { + switch l { + case "debug": + level = slog.LevelDebug + case "warn": + level = slog.LevelWarn + case "error": + level = slog.LevelError + } + } + slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: level}))) + + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) + defer cancel() + + cfg, err := restConfig() + if err != nil { + slog.Error("failed to get kubernetes config", "error", err) + os.Exit(1) + } + + dynClient, err := dynamic.NewForConfig(cfg) + if err != nil { + slog.Error("failed to create dynamic client", "error", err) + os.Exit(1) + } + + serviceNamespace := os.Getenv("POD_NAMESPACE") + if serviceNamespace == "" { + serviceNamespace = detectNamespace() + } + slog.Info("service namespace", "namespace", serviceNamespace) + + etagCache := cache.NewETagCache() + bundleCache := cache.NewBundleCache(100, 1*time.Minute) + policyCache := cache.NewPolicyCache() + + w := watcher.New(dynClient, serviceNamespace, etagCache, bundleCache, policyCache) + go w.Run(ctx) + + // Block until informer syncs or context is cancelled + slog.Info("waiting for informer sync") + syncTimeout := time.After(60 * time.Second) + for !w.HasSynced() { + select { + case <-ctx.Done(): + slog.Error("context cancelled before informer sync") + os.Exit(1) + case <-syncTimeout: + slog.Error("timed out waiting for informer sync") + os.Exit(1) + default: + time.Sleep(50 * time.Millisecond) + } + } + slog.Info("informer synced, starting HTTP server") + + prov := provider.New(etagCache, bundleCache, policyCache, w) + h := handler.New(prov, w, identity.NoopVerifier{}) + mux := http.NewServeMux() + h.RegisterRoutes(mux) + + srv := &http.Server{ + Addr: ":8080", + Handler: mux, + ReadHeaderTimeout: 5 * time.Second, + ReadTimeout: 10 * time.Second, + WriteTimeout: 30 * time.Second, + IdleTimeout: 60 * time.Second, + MaxHeaderBytes: 1 << 20, + } + + go func() { + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + slog.Error("server error", "error", err) + os.Exit(1) + } + }() + + <-ctx.Done() + slog.Info("shutting down") + + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer shutdownCancel() + if err := srv.Shutdown(shutdownCtx); err != nil { + slog.Error("shutdown error", "error", err) + } +} + +func restConfig() (*rest.Config, error) { + cfg, err := rest.InClusterConfig() + if err == nil { + return cfg, nil + } + kubeconfig := os.Getenv("KUBECONFIG") + if kubeconfig == "" { + if home := homedir.HomeDir(); home != "" { + kubeconfig = filepath.Join(home, ".kube", "config") + } + } + return clientcmd.BuildConfigFromFlags("", kubeconfig) +} + +func detectNamespace() string { + // In-cluster: read from the mounted service account namespace file + if ns, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); err == nil { + return string(ns) + } + return "kagenti-system" +} diff --git a/kagenti-operator/config/bundleservice/default-policy.yaml b/kagenti-operator/config/bundleservice/default-policy.yaml new file mode 100644 index 00000000..ace9fc9b --- /dev/null +++ b/kagenti-operator/config/bundleservice/default-policy.yaml @@ -0,0 +1,64 @@ +apiVersion: agent.kagenti.dev/v1alpha1 +kind: AuthorizationPolicy +metadata: + name: default + namespace: kagenti-system +spec: + scope: global + policies: + - path: "inbound/request.rego" + content: | + package authbridge.inbound.request + import rego.v1 + default allow := false + allow if data.authbridge.ns.inbound.request.override + allow if { + ns_ok + client_ok + } + ns_ok if data.authbridge.ns.inbound.request.allow + ns_ok if not data.authbridge.ns.inbound.request + client_ok if data.authbridge.client.inbound.request.allow + client_ok if not data.authbridge.client.inbound.request + - path: "inbound/response.rego" + content: | + package authbridge.inbound.response + import rego.v1 + default allow := false + allow if data.authbridge.ns.inbound.response.override + allow if { + ns_ok + client_ok + } + ns_ok if data.authbridge.ns.inbound.response.allow + ns_ok if not data.authbridge.ns.inbound.response + client_ok if data.authbridge.client.inbound.response.allow + client_ok if not data.authbridge.client.inbound.response + - path: "outbound/request.rego" + content: | + package authbridge.outbound.request + import rego.v1 + default allow := false + allow if data.authbridge.ns.outbound.request.override + allow if { + ns_ok + client_ok + } + ns_ok if data.authbridge.ns.outbound.request.allow + ns_ok if not data.authbridge.ns.outbound.request + client_ok if data.authbridge.client.outbound.request.allow + client_ok if not data.authbridge.client.outbound.request + - path: "outbound/response.rego" + content: | + package authbridge.outbound.response + import rego.v1 + default allow := false + allow if data.authbridge.ns.outbound.response.override + allow if { + ns_ok + client_ok + } + ns_ok if data.authbridge.ns.outbound.response.allow + ns_ok if not data.authbridge.ns.outbound.response + client_ok if data.authbridge.client.outbound.response.allow + client_ok if not data.authbridge.client.outbound.response diff --git a/kagenti-operator/config/bundleservice/deployment.yaml b/kagenti-operator/config/bundleservice/deployment.yaml new file mode 100644 index 00000000..0dd0c77d --- /dev/null +++ b/kagenti-operator/config/bundleservice/deployment.yaml @@ -0,0 +1,58 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: bundle-service + namespace: system + labels: + app: bundle-service +spec: + replicas: 1 + selector: + matchLabels: + app: bundle-service + template: + metadata: + labels: + app: bundle-service + spec: + serviceAccountName: bundle-service + securityContext: + runAsNonRoot: true + containers: + - name: bundle-service + image: ghcr.io/kagenti/bundle-service:latest + ports: + - name: http + containerPort: 8080 + protocol: TCP + env: + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: LOG_LEVEL + value: info + livenessProbe: + httpGet: + path: /healthz + port: http + initialDelaySeconds: 5 + periodSeconds: 10 + readinessProbe: + httpGet: + path: /readyz + port: http + initialDelaySeconds: 2 + periodSeconds: 5 + resources: + requests: + cpu: 50m + memory: 64Mi + limits: + memory: 256Mi + securityContext: + allowPrivilegeEscalation: false + readOnlyRootFilesystem: true + capabilities: + drop: + - ALL diff --git a/kagenti-operator/config/bundleservice/networkpolicy.yaml b/kagenti-operator/config/bundleservice/networkpolicy.yaml new file mode 100644 index 00000000..f3bd4a31 --- /dev/null +++ b/kagenti-operator/config/bundleservice/networkpolicy.yaml @@ -0,0 +1,45 @@ +apiVersion: networking.k8s.io/v1 +kind: NetworkPolicy +metadata: + name: bundle-service + namespace: system +spec: + podSelector: + matchLabels: + app: bundle-service + policyTypes: + - Ingress + - Egress + ingress: + # Only allow traffic from pods with the authbridge label + - from: + - podSelector: + matchLabels: + kagenti.dev/authbridge: "true" + namespaceSelector: {} + ports: + - protocol: TCP + port: 8080 + egress: + # Allow DNS resolution + - to: + - namespaceSelector: + matchLabels: + kubernetes.io/metadata.name: kube-system + podSelector: + matchLabels: + k8s-app: kube-dns + ports: + - protocol: UDP + port: 53 + - protocol: TCP + port: 53 + # Allow access to Kubernetes API server + - to: + - ipBlock: + cidr: 0.0.0.0/0 + ports: + - protocol: TCP + port: 443 + - protocol: TCP + port: 6443 diff --git a/kagenti-operator/config/bundleservice/rbac.yaml b/kagenti-operator/config/bundleservice/rbac.yaml new file mode 100644 index 00000000..3420dde1 --- /dev/null +++ b/kagenti-operator/config/bundleservice/rbac.yaml @@ -0,0 +1,34 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: kagenti-bundle-service +rules: + - apiGroups: + - agent.kagenti.dev + resources: + - authorizationpolicies + verbs: + - get + - list + - watch + - apiGroups: + - agent.kagenti.dev + resources: + - authorizationpolicies/status + verbs: + - get + - update + - patch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: kagenti-bundle-service +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: kagenti-bundle-service +subjects: + - kind: ServiceAccount + name: bundle-service + namespace: system diff --git a/kagenti-operator/config/bundleservice/service.yaml b/kagenti-operator/config/bundleservice/service.yaml new file mode 100644 index 00000000..7df0b8c8 --- /dev/null +++ b/kagenti-operator/config/bundleservice/service.yaml @@ -0,0 +1,14 @@ +apiVersion: v1 +kind: Service +metadata: + name: bundle-service + namespace: system +spec: + type: ClusterIP + selector: + app: bundle-service + ports: + - name: http + port: 8080 + targetPort: http + protocol: TCP diff --git a/kagenti-operator/config/bundleservice/serviceaccount.yaml b/kagenti-operator/config/bundleservice/serviceaccount.yaml new file mode 100644 index 00000000..0aa1bc02 --- /dev/null +++ b/kagenti-operator/config/bundleservice/serviceaccount.yaml @@ -0,0 +1,5 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: bundle-service + namespace: system diff --git a/kagenti-operator/config/crd/bases/agent.kagenti.dev_authorizationpolicies.yaml b/kagenti-operator/config/crd/bases/agent.kagenti.dev_authorizationpolicies.yaml new file mode 100644 index 00000000..ff0d240a --- /dev/null +++ b/kagenti-operator/config/crd/bases/agent.kagenti.dev_authorizationpolicies.yaml @@ -0,0 +1,109 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: authorizationpolicies.agent.kagenti.dev +spec: + group: agent.kagenti.dev + names: + kind: AuthorizationPolicy + listKind: AuthorizationPolicyList + plural: authorizationpolicies + singular: authorizationpolicy + shortNames: + - ap + scope: Namespaced + versions: + - name: v1alpha1 + served: true + storage: true + subresources: + status: {} + additionalPrinterColumns: + - name: Scope + type: string + jsonPath: .spec.scope + - name: ClientID + type: string + jsonPath: .spec.clientID + - name: Hash + type: string + jsonPath: .status.bundleHash + priority: 1 + - name: Age + type: date + jsonPath: .metadata.creationTimestamp + schema: + openAPIV3Schema: + type: object + required: + - spec + properties: + spec: + type: object + required: + - scope + - policies + properties: + scope: + type: string + enum: + - global + - namespace + - client + default: client + clientID: + type: string + maxLength: 253 + pattern: "^[a-z0-9]([a-z0-9._-]*[a-z0-9])?$" + policies: + type: array + minItems: 1 + items: + type: object + required: + - path + - content + properties: + path: + type: string + minLength: 1 + pattern: "^[a-z0-9][a-z0-9/_.-]*\\.rego$" + content: + type: string + minLength: 1 + x-kubernetes-validations: + - rule: "self.scope == 'client' ? self.clientID != '' : true" + message: "clientID is required when scope is 'client'" + - rule: "self.scope != 'client' ? !has(self.clientID) || self.clientID == '' : true" + message: "clientID must not be set when scope is 'global' or 'namespace'" + status: + type: object + properties: + bundleHash: + type: string + lastBuilt: + type: string + format: date-time + conditions: + type: array + items: + type: object + required: + - type + - status + properties: + type: + type: string + status: + type: string + enum: + - "True" + - "False" + - "Unknown" + lastTransitionTime: + type: string + format: date-time + reason: + type: string + message: + type: string diff --git a/kagenti-operator/config/rbac/role.yaml b/kagenti-operator/config/rbac/role.yaml index 4c365137..3211652d 100644 --- a/kagenti-operator/config/rbac/role.yaml +++ b/kagenti-operator/config/rbac/role.yaml @@ -88,6 +88,22 @@ rules: - get - patch - update +- apiGroups: + - agent.kagenti.dev + resources: + - authorizationpolicies + verbs: + - get + - list + - watch +- apiGroups: + - agent.kagenti.dev + resources: + - authorizationpolicies/status + verbs: + - get + - update + - patch - apiGroups: - agents.x-k8s.io resources: diff --git a/kagenti-operator/hack/bundle-service-kind.sh b/kagenti-operator/hack/bundle-service-kind.sh new file mode 100755 index 00000000..45f7d4da --- /dev/null +++ b/kagenti-operator/hack/bundle-service-kind.sh @@ -0,0 +1,174 @@ +#!/usr/bin/env bash +# Build the bundle-service image, load it into a kind cluster, and deploy. +# +# Usage: +# ./hack/bundle-service-kind.sh [kind-cluster-name] [namespace] +# +# Defaults: +# cluster: kagenti +# namespace: kagenti-system + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +ROOT_DIR="$(cd "${SCRIPT_DIR}/.." && pwd)" + +CLUSTER="${1:-kagenti}" +NAMESPACE="${2:-kagenti-system}" +IMAGE="localhost/bundle-service:latest" + +echo "==> Building bundle-service image" +docker build -t "${IMAGE}" -f "${ROOT_DIR}/cmd/bundle-service/Dockerfile" "${ROOT_DIR}" + +echo "==> Loading image into kind cluster '${CLUSTER}'" +kind load docker-image "${IMAGE}" --name "${CLUSTER}" + +echo "==> Ensuring namespace '${NAMESPACE}' exists" +kubectl create namespace "${NAMESPACE}" --dry-run=client -o yaml | kubectl apply -f - + +echo "==> Ensuring CRD is installed" +if [ -f "${ROOT_DIR}/config/crd/bases/agent.kagenti.dev_authorizationpolicies.yaml" ]; then + kubectl apply -f "${ROOT_DIR}/config/crd/bases/agent.kagenti.dev_authorizationpolicies.yaml" +fi + +echo "==> Applying default global AuthorizationPolicy" +kubectl apply -f "${ROOT_DIR}/config/bundleservice/default-policy.yaml" + +echo "==> Deploying bundle-service to namespace '${NAMESPACE}'" + +# ServiceAccount +kubectl apply -f - < Waiting for rollout" +kubectl rollout status deployment/bundle-service -n "${NAMESPACE}" --timeout=60s + +echo "==> bundle-service deployed successfully" +echo " URL: http://bundle-service.${NAMESPACE}.svc.cluster.local:8080" +echo " To port-forward: kubectl port-forward -n ${NAMESPACE} svc/bundle-service 8080:8080" diff --git a/kagenti-operator/hack/kind-reload-all.sh b/kagenti-operator/hack/kind-reload-all.sh new file mode 100755 index 00000000..8301ec33 --- /dev/null +++ b/kagenti-operator/hack/kind-reload-all.sh @@ -0,0 +1,247 @@ +#!/usr/bin/env bash +# Build all operator services, load them into a Kind cluster, and deploy. +# Creates deployments if they don't exist, updates them if they do. +# +# Services: +# 1. kagenti-controller-manager (operator) +# 2. bundle-service (OPA bundle server) +# +# Usage: +# ./hack/kind-reload-all.sh [kind-cluster-name] [namespace] +# +# Defaults: +# cluster: kagenti +# namespace: kagenti-system + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +ROOT_DIR="$(cd "${SCRIPT_DIR}/.." && pwd)" + +CLUSTER="${1:-kagenti}" +NAMESPACE="${2:-kagenti-system}" +CONTAINER_TOOL="${CONTAINER_TOOL:-docker}" +IMAGE_TAG="$(git -C "${ROOT_DIR}" rev-parse --short HEAD)" + +OPERATOR_IMG="localhost/kagenti-operator:${IMAGE_TAG}" +BUNDLE_IMG="localhost/bundle-service:${IMAGE_TAG}" + +echo "============================================" +echo " Building and loading to Kind: ${CLUSTER}" +echo " Namespace: ${NAMESPACE}" +echo " Tag: ${IMAGE_TAG}" +echo "============================================" + +# --- Ensure namespace --- + +echo "" +echo "==> Ensuring namespace '${NAMESPACE}' exists" +kubectl create namespace "${NAMESPACE}" --dry-run=client -o yaml | kubectl apply -f - + +# --- Ensure CRDs --- + +echo "" +echo "==> Ensuring CRDs are installed" +if [ -f "${ROOT_DIR}/config/crd/bases/agent.kagenti.dev_authorizationpolicies.yaml" ]; then + kubectl apply -f "${ROOT_DIR}/config/crd/bases/agent.kagenti.dev_authorizationpolicies.yaml" +fi + +# --- Build images --- + +echo "" +echo "==> Building kagenti-operator image" +${CONTAINER_TOOL} build -t "${OPERATOR_IMG}" -f "${ROOT_DIR}/Dockerfile" "${ROOT_DIR}" + +echo "" +echo "==> Building bundle-service image" +${CONTAINER_TOOL} build -t "${BUNDLE_IMG}" -f "${ROOT_DIR}/cmd/bundle-service/Dockerfile" "${ROOT_DIR}" + +# --- Load into Kind --- + +echo "" +echo "==> Loading images into Kind cluster '${CLUSTER}'" +kind load docker-image "${OPERATOR_IMG}" --name "${CLUSTER}" +kind load docker-image "${BUNDLE_IMG}" --name "${CLUSTER}" + +# --- Deploy kagenti-controller-manager --- + +echo "" +echo "==> Deploying kagenti-controller-manager" +if kubectl get deployment kagenti-controller-manager -n "${NAMESPACE}" &>/dev/null; then + kubectl set image deployment/kagenti-controller-manager \ + manager="${OPERATOR_IMG}" \ + -n "${NAMESPACE}" +else + echo " Deployment not found — install the operator with 'make deploy' first" + echo " (the controller-manager requires webhook certs, RBAC, and CRDs from kustomize)" +fi + +# --- Deploy bundle-service --- + +echo "" +echo "==> Deploying bundle-service" + +# ServiceAccount +kubectl apply -f - < Deleting pods to pick up new images" +kubectl delete pods -n "${NAMESPACE}" -l app=bundle-service --wait=false +kubectl delete pods -n "${NAMESPACE}" -l control-plane=controller-manager --wait=false 2>/dev/null || true + +# --- Wait for rollouts --- + +echo "" +echo "==> Waiting for kagenti-controller-manager rollout" +kubectl rollout status deployment/kagenti-controller-manager -n "${NAMESPACE}" --timeout=120s 2>/dev/null || \ + echo " WARNING: kagenti-controller-manager rollout did not complete" + +echo "==> Waiting for bundle-service rollout" +kubectl rollout status deployment/bundle-service -n "${NAMESPACE}" --timeout=60s + +# --- Summary --- + +echo "" +echo "============================================" +echo " Done!" +echo "" +echo " Images loaded:" +echo " ${OPERATOR_IMG}" +echo " ${BUNDLE_IMG}" +echo "" +echo " Namespace: ${NAMESPACE}" +echo " - kagenti-controller-manager" +echo " - bundle-service" +echo "" +echo " bundle-service URL: http://bundle-service.${NAMESPACE}.svc.cluster.local:8080" +echo " To port-forward: kubectl port-forward -n ${NAMESPACE} svc/bundle-service 8080:8080" +echo "============================================" diff --git a/kagenti-operator/internal/bundleservice/bundle/builder.go b/kagenti-operator/internal/bundleservice/bundle/builder.go new file mode 100644 index 00000000..cadfc9df --- /dev/null +++ b/kagenti-operator/internal/bundleservice/bundle/builder.go @@ -0,0 +1,108 @@ +package bundle + +import ( + "archive/tar" + "bytes" + "compress/gzip" + "crypto/sha256" + "encoding/json" + "fmt" + "path/filepath" + "sort" + "strings" +) + +type Policy struct { + Path string + Content string +} + +type manifest struct { + Revision string `json:"revision"` + Roots []string `json:"roots"` +} + +func validatePath(path string) error { + if path == "" { + return fmt.Errorf("empty policy path") + } + if filepath.IsAbs(path) { + return fmt.Errorf("absolute path not allowed: %s", path) + } + if strings.Contains(path, "..") { + return fmt.Errorf("path traversal not allowed: %s", path) + } + clean := filepath.Clean(path) + if strings.HasPrefix(clean, "..") { + return fmt.Errorf("path escapes bundle root: %s", path) + } + return nil +} + +func Build(policies []Policy) ([]byte, string, error) { + for _, p := range policies { + if err := validatePath(p.Path); err != nil { + return nil, "", err + } + } + + sorted := make([]Policy, len(policies)) + copy(sorted, policies) + sort.Slice(sorted, func(i, j int) bool { + return sorted[i].Path < sorted[j].Path + }) + + var buf bytes.Buffer + gw := gzip.NewWriter(&buf) + tw := tar.NewWriter(gw) + + for _, p := range sorted { + content := []byte(p.Content) + hdr := &tar.Header{ + Name: p.Path, + Mode: 0644, + Size: int64(len(content)), + } + if err := tw.WriteHeader(hdr); err != nil { + return nil, "", fmt.Errorf("writing tar header for %s: %w", p.Path, err) + } + if _, err := tw.Write(content); err != nil { + return nil, "", fmt.Errorf("writing tar content for %s: %w", p.Path, err) + } + } + + h := sha256.Sum256(buf.Bytes()) + hash := fmt.Sprintf("sha256:%x", h) + + m := manifest{ + Revision: hash, + Roots: []string{"authbridge"}, + } + manifestJSON, err := json.Marshal(m) + if err != nil { + return nil, "", fmt.Errorf("marshaling manifest: %w", err) + } + + hdr := &tar.Header{ + Name: ".manifest", + Mode: 0644, + Size: int64(len(manifestJSON)), + } + if err := tw.WriteHeader(hdr); err != nil { + return nil, "", fmt.Errorf("writing manifest header: %w", err) + } + if _, err := tw.Write(manifestJSON); err != nil { + return nil, "", fmt.Errorf("writing manifest content: %w", err) + } + + if err := tw.Close(); err != nil { + return nil, "", fmt.Errorf("closing tar writer: %w", err) + } + if err := gw.Close(); err != nil { + return nil, "", fmt.Errorf("closing gzip writer: %w", err) + } + + data := buf.Bytes() + finalHash := fmt.Sprintf("sha256:%x", sha256.Sum256(data)) + return data, finalHash, nil +} diff --git a/kagenti-operator/internal/bundleservice/bundle/builder_test.go b/kagenti-operator/internal/bundleservice/bundle/builder_test.go new file mode 100644 index 00000000..643a2f0e --- /dev/null +++ b/kagenti-operator/internal/bundleservice/bundle/builder_test.go @@ -0,0 +1,194 @@ +package bundle + +import ( + "archive/tar" + "bytes" + "compress/gzip" + "encoding/json" + "io" + "testing" +) + +func TestBuild_ProducesValidTarGz(t *testing.T) { + policies := []Policy{ + {Path: "authbridge/inbound/request.rego", Content: "package authbridge.inbound.request\ndefault allow := true\n"}, + } + + data, hash, err := Build(policies) + if err != nil { + t.Fatalf("Build failed: %v", err) + } + if len(data) == 0 { + t.Fatal("empty bundle data") + } + if hash == "" { + t.Fatal("empty hash") + } + if len(hash) < 10 || hash[:7] != "sha256:" { + t.Fatalf("unexpected hash format: %s", hash) + } + + files := extractTarGz(t, data) + if _, ok := files["authbridge/inbound/request.rego"]; !ok { + t.Fatal("missing policy file in bundle") + } + if _, ok := files[".manifest"]; !ok { + t.Fatal("missing .manifest in bundle") + } +} + +func TestBuild_ManifestHasRevisionAndRoots(t *testing.T) { + policies := []Policy{ + {Path: "authbridge/outbound/request.rego", Content: "package authbridge.outbound.request\ndefault allow := true\n"}, + } + + data, _, err := Build(policies) + if err != nil { + t.Fatalf("Build failed: %v", err) + } + + files := extractTarGz(t, data) + var m manifest + if err := json.Unmarshal([]byte(files[".manifest"]), &m); err != nil { + t.Fatalf("unmarshal manifest: %v", err) + } + if m.Revision == "" { + t.Fatal("manifest revision is empty") + } + if len(m.Roots) != 1 || m.Roots[0] != "authbridge" { + t.Fatalf("unexpected roots: %v", m.Roots) + } +} + +func TestBuild_Deterministic(t *testing.T) { + policies := []Policy{ + {Path: "authbridge/inbound/request.rego", Content: "package authbridge.inbound.request\ndefault allow := true\n"}, + {Path: "authbridge/outbound/request.rego", Content: "package authbridge.outbound.request\ndefault allow := false\n"}, + } + + data1, hash1, _ := Build(policies) + data2, hash2, _ := Build(policies) + + if hash1 != hash2 { + t.Fatalf("non-deterministic hashes: %s vs %s", hash1, hash2) + } + if !bytes.Equal(data1, data2) { + t.Fatal("non-deterministic bundle bytes") + } +} + +func TestBuild_SortsByPath(t *testing.T) { + policies := []Policy{ + {Path: "authbridge/outbound/request.rego", Content: "out"}, + {Path: "authbridge/inbound/request.rego", Content: "in"}, + } + + data, _, err := Build(policies) + if err != nil { + t.Fatalf("Build failed: %v", err) + } + + names := extractTarGzOrder(t, data) + if len(names) < 2 { + t.Fatal("expected at least 2 entries") + } + if names[0] != "authbridge/inbound/request.rego" { + t.Fatalf("expected inbound first, got %s", names[0]) + } + if names[1] != "authbridge/outbound/request.rego" { + t.Fatalf("expected outbound second, got %s", names[1]) + } +} + +func TestBuild_RejectsPathTraversal(t *testing.T) { + cases := []struct { + name string + path string + }{ + {"dotdot prefix", "../etc/passwd"}, + {"dotdot middle", "authbridge/../../../etc/shadow"}, + {"absolute path", "/etc/passwd"}, + {"empty path", ""}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + policies := []Policy{{Path: tc.path, Content: "package test\n"}} + _, _, err := Build(policies) + if err == nil { + t.Fatalf("expected error for path %q, got nil", tc.path) + } + }) + } +} + +func TestBuild_AcceptsValidPaths(t *testing.T) { + cases := []string{ + "authbridge/inbound/request.rego", + "authbridge/outbound/response.rego", + "authbridge/tools/mcp-restrict.rego", + } + for _, path := range cases { + t.Run(path, func(t *testing.T) { + policies := []Policy{{Path: path, Content: "package test\n"}} + _, _, err := Build(policies) + if err != nil { + t.Fatalf("unexpected error for valid path %q: %v", path, err) + } + }) + } +} + +func extractTarGz(t *testing.T, data []byte) map[string]string { + t.Helper() + gr, err := gzip.NewReader(bytes.NewReader(data)) + if err != nil { + t.Fatalf("gzip reader: %v", err) + } + defer func() { + if err := gr.Close(); err != nil { + t.Errorf("failed to close gzip reader: %v", err) + } + }() + + tr := tar.NewReader(gr) + files := make(map[string]string) + for { + hdr, err := tr.Next() + if err == io.EOF { + break + } + if err != nil { + t.Fatalf("tar next: %v", err) + } + content, _ := io.ReadAll(tr) + files[hdr.Name] = string(content) + } + return files +} + +func extractTarGzOrder(t *testing.T, data []byte) []string { + t.Helper() + gr, err := gzip.NewReader(bytes.NewReader(data)) + if err != nil { + t.Fatalf("gzip reader: %v", err) + } + defer func() { + if err := gr.Close(); err != nil { + t.Errorf("failed to close gzip reader: %v", err) + } + }() + + tr := tar.NewReader(gr) + var names []string + for { + hdr, err := tr.Next() + if err == io.EOF { + break + } + if err != nil { + t.Fatalf("tar next: %v", err) + } + names = append(names, hdr.Name) + } + return names +} diff --git a/kagenti-operator/internal/bundleservice/bundle/hash.go b/kagenti-operator/internal/bundleservice/bundle/hash.go new file mode 100644 index 00000000..9d7a38d0 --- /dev/null +++ b/kagenti-operator/internal/bundleservice/bundle/hash.go @@ -0,0 +1,28 @@ +package bundle + +import ( + "crypto/sha256" + "fmt" + "io" + "sort" +) + +func ContentHash(policies []Policy) string { + if len(policies) == 0 { + return "sha256:empty" + } + sorted := make([]Policy, len(policies)) + copy(sorted, policies) + sort.Slice(sorted, func(i, j int) bool { + return sorted[i].Path < sorted[j].Path + }) + + h := sha256.New() + for _, p := range sorted { + _, _ = io.WriteString(h, p.Path) + _, _ = io.WriteString(h, "\x00") + _, _ = io.WriteString(h, p.Content) + _, _ = io.WriteString(h, "\x00") + } + return fmt.Sprintf("sha256:%x", h.Sum(nil)) +} diff --git a/kagenti-operator/internal/bundleservice/bundle/hash_test.go b/kagenti-operator/internal/bundleservice/bundle/hash_test.go new file mode 100644 index 00000000..dab5ec72 --- /dev/null +++ b/kagenti-operator/internal/bundleservice/bundle/hash_test.go @@ -0,0 +1,58 @@ +package bundle + +import "testing" + +func TestContentHash_Deterministic(t *testing.T) { + policies := []Policy{ + {Path: "authbridge/agent/inbound.rego", Content: "package test\ndefault allow := true\n"}, + {Path: "authbridge/global/base.rego", Content: "package global\n"}, + } + + h1 := ContentHash(policies) + h2 := ContentHash(policies) + + if h1 != h2 { + t.Fatalf("non-deterministic: %s vs %s", h1, h2) + } +} + +func TestContentHash_OrderIndependent(t *testing.T) { + p1 := []Policy{ + {Path: "b.rego", Content: "b"}, + {Path: "a.rego", Content: "a"}, + } + p2 := []Policy{ + {Path: "a.rego", Content: "a"}, + {Path: "b.rego", Content: "b"}, + } + + if ContentHash(p1) != ContentHash(p2) { + t.Fatal("hash should be order-independent") + } +} + +func TestContentHash_DifferentContent(t *testing.T) { + p1 := []Policy{{Path: "a.rego", Content: "v1"}} + p2 := []Policy{{Path: "a.rego", Content: "v2"}} + + if ContentHash(p1) == ContentHash(p2) { + t.Fatal("different content should produce different hash") + } +} + +func TestContentHash_Empty(t *testing.T) { + h := ContentHash(nil) + if h != "sha256:empty" { + t.Fatalf("unexpected empty hash: %s", h) + } +} + +func TestContentHash_PathContentSeparation(t *testing.T) { + // Ensure path+content boundary doesn't cause collisions + p1 := []Policy{{Path: "ab", Content: "cd"}} + p2 := []Policy{{Path: "a", Content: "bcd"}} + + if ContentHash(p1) == ContentHash(p2) { + t.Fatal("different path/content splits should not collide") + } +} diff --git a/kagenti-operator/internal/bundleservice/cache/bundle.go b/kagenti-operator/internal/bundleservice/cache/bundle.go new file mode 100644 index 00000000..4a75a85e --- /dev/null +++ b/kagenti-operator/internal/bundleservice/cache/bundle.go @@ -0,0 +1,130 @@ +package cache + +import ( + "container/list" + "sync" + "time" +) + +const ( + DefaultBundleMaxEntries = 100 + DefaultBundleTTL = 1 * time.Minute +) + +type bundleEntry struct { + key string + data []byte + etag string + expires time.Time +} + +type BundleCache struct { + mu sync.Mutex + maxEntries int + ttl time.Duration + entries map[string]*list.Element + evictList *list.List +} + +func NewBundleCache(maxEntries int, ttl time.Duration) *BundleCache { + if maxEntries <= 0 { + maxEntries = DefaultBundleMaxEntries + } + if ttl <= 0 { + ttl = DefaultBundleTTL + } + return &BundleCache{ + maxEntries: maxEntries, + ttl: ttl, + entries: make(map[string]*list.Element), + evictList: list.New(), + } +} + +func (c *BundleCache) Get(agentID string) ([]byte, string, bool) { + c.mu.Lock() + defer c.mu.Unlock() + el, ok := c.entries[agentID] + if !ok { + return nil, "", false + } + e := el.Value.(*bundleEntry) + if time.Now().After(e.expires) { + c.removeElement(el) + return nil, "", false + } + c.evictList.MoveToFront(el) + return e.data, e.etag, true +} + +func (c *BundleCache) Set(agentID string, data []byte, etag string) { + c.mu.Lock() + defer c.mu.Unlock() + + if el, ok := c.entries[agentID]; ok { + c.evictList.MoveToFront(el) + e := el.Value.(*bundleEntry) + e.data = data + e.etag = etag + e.expires = time.Now().Add(c.ttl) + return + } + + e := &bundleEntry{ + key: agentID, + data: data, + etag: etag, + expires: time.Now().Add(c.ttl), + } + el := c.evictList.PushFront(e) + c.entries[agentID] = el + + if c.evictList.Len() > c.maxEntries { + c.removeLRU() + } +} + +func (c *BundleCache) Invalidate(agentID string) { + c.mu.Lock() + defer c.mu.Unlock() + if el, ok := c.entries[agentID]; ok { + c.removeElement(el) + } +} + +func (c *BundleCache) InvalidateAll() { + c.mu.Lock() + defer c.mu.Unlock() + c.entries = make(map[string]*list.Element) + c.evictList.Init() +} + +func (c *BundleCache) InvalidateFunc(fn func(agentID string) bool) { + c.mu.Lock() + defer c.mu.Unlock() + for id, el := range c.entries { + if fn(id) { + c.removeElement(el) + delete(c.entries, id) + } + } +} + +func (c *BundleCache) Len() int { + c.mu.Lock() + defer c.mu.Unlock() + return c.evictList.Len() +} + +func (c *BundleCache) removeLRU() { + el := c.evictList.Back() + if el != nil { + c.removeElement(el) + } +} + +func (c *BundleCache) removeElement(el *list.Element) { + c.evictList.Remove(el) + e := el.Value.(*bundleEntry) + delete(c.entries, e.key) +} diff --git a/kagenti-operator/internal/bundleservice/cache/bundle_test.go b/kagenti-operator/internal/bundleservice/cache/bundle_test.go new file mode 100644 index 00000000..d800f17f --- /dev/null +++ b/kagenti-operator/internal/bundleservice/cache/bundle_test.go @@ -0,0 +1,81 @@ +package cache + +import ( + "testing" + "time" +) + +func TestBundleCache_SetAndGet(t *testing.T) { + c := NewBundleCache(10, 1*time.Minute) + c.Set("agent1", []byte("data"), "sha256:abc") + + data, etag, ok := c.Get("agent1") + if !ok { + t.Fatal("expected hit") + } + if string(data) != "data" { + t.Fatalf("unexpected data: %s", data) + } + if etag != "sha256:abc" { + t.Fatalf("unexpected etag: %s", etag) + } +} + +func TestBundleCache_Miss(t *testing.T) { + c := NewBundleCache(10, 1*time.Minute) + _, _, ok := c.Get("nonexistent") + if ok { + t.Fatal("expected miss") + } +} + +func TestBundleCache_ExpiresAfterTTL(t *testing.T) { + c := NewBundleCache(10, 50*time.Millisecond) + c.Set("agent1", []byte("data"), "hash") + + time.Sleep(60 * time.Millisecond) + + _, _, ok := c.Get("agent1") + if ok { + t.Fatal("expected miss after TTL expiry") + } +} + +func TestBundleCache_LRUEviction(t *testing.T) { + c := NewBundleCache(3, 1*time.Minute) + c.Set("a", []byte("1"), "h1") + c.Set("b", []byte("2"), "h2") + c.Set("c", []byte("3"), "h3") + c.Set("d", []byte("4"), "h4") // evicts "a" + + _, _, ok := c.Get("a") + if ok { + t.Fatal("expected 'a' to be evicted") + } + _, _, ok = c.Get("d") + if !ok { + t.Fatal("expected 'd' to be present") + } +} + +func TestBundleCache_InvalidateAll(t *testing.T) { + c := NewBundleCache(10, 1*time.Minute) + c.Set("a", []byte("1"), "h1") + c.Set("b", []byte("2"), "h2") + c.InvalidateAll() + + if c.Len() != 0 { + t.Fatalf("expected 0 entries, got %d", c.Len()) + } +} + +func TestBundleCache_Invalidate(t *testing.T) { + c := NewBundleCache(10, 1*time.Minute) + c.Set("a", []byte("1"), "h1") + c.Invalidate("a") + + _, _, ok := c.Get("a") + if ok { + t.Fatal("expected miss after invalidation") + } +} diff --git a/kagenti-operator/internal/bundleservice/cache/etag.go b/kagenti-operator/internal/bundleservice/cache/etag.go new file mode 100644 index 00000000..5fe5637d --- /dev/null +++ b/kagenti-operator/internal/bundleservice/cache/etag.go @@ -0,0 +1,55 @@ +package cache + +import "sync" + +type ETagCache struct { + mu sync.RWMutex + entries map[string]string // clientID → content hash +} + +func NewETagCache() *ETagCache { + return &ETagCache{ + entries: make(map[string]string), + } +} + +func (c *ETagCache) Get(agentID string) (string, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + hash, ok := c.entries[agentID] + return hash, ok +} + +func (c *ETagCache) Set(agentID, hash string) { + c.mu.Lock() + defer c.mu.Unlock() + c.entries[agentID] = hash +} + +func (c *ETagCache) Invalidate(agentID string) { + c.mu.Lock() + defer c.mu.Unlock() + delete(c.entries, agentID) +} + +func (c *ETagCache) InvalidateAll() { + c.mu.Lock() + defer c.mu.Unlock() + c.entries = make(map[string]string) +} + +func (c *ETagCache) InvalidateFunc(fn func(agentID string) bool) { + c.mu.Lock() + defer c.mu.Unlock() + for id := range c.entries { + if fn(id) { + delete(c.entries, id) + } + } +} + +func (c *ETagCache) Len() int { + c.mu.RLock() + defer c.mu.RUnlock() + return len(c.entries) +} diff --git a/kagenti-operator/internal/bundleservice/cache/etag_test.go b/kagenti-operator/internal/bundleservice/cache/etag_test.go new file mode 100644 index 00000000..04a8cf09 --- /dev/null +++ b/kagenti-operator/internal/bundleservice/cache/etag_test.go @@ -0,0 +1,69 @@ +package cache + +import ( + "strings" + "testing" +) + +func TestETagCache_SetAndGet(t *testing.T) { + c := NewETagCache() + c.Set("agent1", "sha256:abc") + + hash, ok := c.Get("agent1") + if !ok { + t.Fatal("expected hit") + } + if hash != "sha256:abc" { + t.Fatalf("unexpected hash: %s", hash) + } +} + +func TestETagCache_Miss(t *testing.T) { + c := NewETagCache() + _, ok := c.Get("nonexistent") + if ok { + t.Fatal("expected miss") + } +} + +func TestETagCache_Invalidate(t *testing.T) { + c := NewETagCache() + c.Set("agent1", "hash") + c.Invalidate("agent1") + + _, ok := c.Get("agent1") + if ok { + t.Fatal("expected miss after invalidation") + } +} + +func TestETagCache_InvalidateAll(t *testing.T) { + c := NewETagCache() + c.Set("a", "h1") + c.Set("b", "h2") + c.Set("c", "h3") + c.InvalidateAll() + + if c.Len() != 0 { + t.Fatalf("expected 0 entries, got %d", c.Len()) + } +} + +func TestETagCache_InvalidateFunc(t *testing.T) { + c := NewETagCache() + c.Set("ns1-agent1", "h1") + c.Set("ns1-agent2", "h2") + c.Set("ns2-agent3", "h3") + + c.InvalidateFunc(func(id string) bool { + return strings.HasPrefix(id, "ns1-") + }) + + if c.Len() != 1 { + t.Fatalf("expected 1 entry, got %d", c.Len()) + } + _, ok := c.Get("ns2-agent3") + if !ok { + t.Fatal("expected ns2-agent3 to survive") + } +} diff --git a/kagenti-operator/internal/bundleservice/cache/policy.go b/kagenti-operator/internal/bundleservice/cache/policy.go new file mode 100644 index 00000000..13050c1b --- /dev/null +++ b/kagenti-operator/internal/bundleservice/cache/policy.go @@ -0,0 +1,68 @@ +package cache + +import "sync" + +type PolicyEntry struct { + Path string + Content string +} + +type policyData struct { + policies []PolicyEntry + hash string +} + +type PolicyCache struct { + mu sync.RWMutex + global *policyData + ns map[string]*policyData +} + +func NewPolicyCache() *PolicyCache { + return &PolicyCache{ + ns: make(map[string]*policyData), + } +} + +func (c *PolicyCache) GetGlobal() ([]PolicyEntry, string, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + if c.global == nil { + return nil, "", false + } + return c.global.policies, c.global.hash, true +} + +func (c *PolicyCache) SetGlobal(policies []PolicyEntry, hash string) { + c.mu.Lock() + defer c.mu.Unlock() + c.global = &policyData{policies: policies, hash: hash} +} + +func (c *PolicyCache) InvalidateGlobal() { + c.mu.Lock() + defer c.mu.Unlock() + c.global = nil +} + +func (c *PolicyCache) GetNamespace(ns string) ([]PolicyEntry, string, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + d, ok := c.ns[ns] + if !ok { + return nil, "", false + } + return d.policies, d.hash, true +} + +func (c *PolicyCache) SetNamespace(ns string, policies []PolicyEntry, hash string) { + c.mu.Lock() + defer c.mu.Unlock() + c.ns[ns] = &policyData{policies: policies, hash: hash} +} + +func (c *PolicyCache) InvalidateNamespace(ns string) { + c.mu.Lock() + defer c.mu.Unlock() + delete(c.ns, ns) +} diff --git a/kagenti-operator/internal/bundleservice/cache/policy_test.go b/kagenti-operator/internal/bundleservice/cache/policy_test.go new file mode 100644 index 00000000..632e76e7 --- /dev/null +++ b/kagenti-operator/internal/bundleservice/cache/policy_test.go @@ -0,0 +1,78 @@ +package cache + +import "testing" + +func TestPolicyCache_Global(t *testing.T) { + c := NewPolicyCache() + + _, _, ok := c.GetGlobal() + if ok { + t.Fatal("expected miss before set") + } + + policies := []PolicyEntry{{Path: "inbound/request.rego", Content: "package test"}} + c.SetGlobal(policies, "hash1") + + got, hash, ok := c.GetGlobal() + if !ok { + t.Fatal("expected hit") + } + if hash != "hash1" { + t.Fatalf("unexpected hash: %s", hash) + } + if len(got) != 1 || got[0].Path != "inbound/request.rego" { + t.Fatal("unexpected policies") + } + + c.InvalidateGlobal() + _, _, ok = c.GetGlobal() + if ok { + t.Fatal("expected miss after invalidation") + } +} + +func TestPolicyCache_Namespace(t *testing.T) { + c := NewPolicyCache() + + _, _, ok := c.GetNamespace("test-ns") + if ok { + t.Fatal("expected miss before set") + } + + policies := []PolicyEntry{{Path: "outbound/request.rego", Content: "package ns"}} + c.SetNamespace("test-ns", policies, "hash2") + + got, hash, ok := c.GetNamespace("test-ns") + if !ok { + t.Fatal("expected hit") + } + if hash != "hash2" { + t.Fatalf("unexpected hash: %s", hash) + } + if len(got) != 1 || got[0].Path != "outbound/request.rego" { + t.Fatal("unexpected policies") + } + + c.InvalidateNamespace("test-ns") + _, _, ok = c.GetNamespace("test-ns") + if ok { + t.Fatal("expected miss after invalidation") + } +} + +func TestPolicyCache_MultipleNamespaces(t *testing.T) { + c := NewPolicyCache() + c.SetNamespace("ns1", []PolicyEntry{{Path: "a.rego", Content: "a"}}, "h1") + c.SetNamespace("ns2", []PolicyEntry{{Path: "b.rego", Content: "b"}}, "h2") + + c.InvalidateNamespace("ns1") + + _, _, ok := c.GetNamespace("ns1") + if ok { + t.Fatal("expected miss for ns1") + } + _, _, ok = c.GetNamespace("ns2") + if !ok { + t.Fatal("expected hit for ns2") + } +} diff --git a/kagenti-operator/internal/bundleservice/handler/handler.go b/kagenti-operator/internal/bundleservice/handler/handler.go new file mode 100644 index 00000000..97f6b179 --- /dev/null +++ b/kagenti-operator/internal/bundleservice/handler/handler.go @@ -0,0 +1,99 @@ +package handler + +import ( + "context" + "errors" + "log/slog" + "net/http" + + "github.com/kagenti/operator/internal/bundleservice/identity" + "github.com/kagenti/operator/internal/bundleservice/provider" +) + +type BundleProvider interface { + GetBundle(ctx context.Context, id identity.ClientIdentity, ifNoneMatch string) ([]byte, string, error) +} + +type ReadinessChecker interface { + Ready() bool +} + +type Handler struct { + provider BundleProvider + readyz ReadinessChecker + verifier identity.Verifier +} + +func New(p BundleProvider, readyz ReadinessChecker, verifier identity.Verifier) *Handler { + return &Handler{provider: p, readyz: readyz, verifier: verifier} +} + +func (h *Handler) RegisterRoutes(mux *http.ServeMux) { + mux.HandleFunc("/bundles", h.serveBundle) + mux.HandleFunc("/healthz", h.healthz) + mux.HandleFunc("/readyz", h.readyz_) +} + +func (h *Handler) serveBundle(w http.ResponseWriter, r *http.Request) { + slog.Info("bundle request received", "url", r.URL.String(), "method", r.Method) + + if h.readyz != nil && !h.readyz.Ready() { + http.Error(w, "service not ready", http.StatusServiceUnavailable) + return + } + + id, err := identity.FromRequest(r) + if err != nil { + http.Error(w, "invalid client identity", http.StatusBadRequest) + return + } + + if err := h.verifier.Verify(r, id); err != nil { + http.Error(w, "unauthorized", http.StatusForbidden) + return + } + + // Extract If-None-Match, strip quotes + ifNoneMatch := r.Header.Get("If-None-Match") + if len(ifNoneMatch) >= 2 && ifNoneMatch[0] == '"' && ifNoneMatch[len(ifNoneMatch)-1] == '"' { + ifNoneMatch = ifNoneMatch[1 : len(ifNoneMatch)-1] + } + + data, etag, err := h.provider.GetBundle(r.Context(), id, ifNoneMatch) + if err != nil { + if errors.Is(err, provider.ErrBundleTooLarge) { + http.Error(w, "bundle too large", http.StatusRequestEntityTooLarge) + return + } + http.Error(w, "internal server error", http.StatusInternalServerError) + return + } + + // nil data with non-empty etag signals 304 + if data == nil && etag != "" { + slog.Info("bundle response", "url", r.URL.String(), "status", http.StatusNotModified) + w.WriteHeader(http.StatusNotModified) + return + } + + quotedETag := `"` + etag + `"` + w.Header().Set("Content-Type", "application/gzip") + w.Header().Set("ETag", quotedETag) + w.Header().Set("Cache-Control", "max-age=0, must-revalidate") + _, _ = w.Write(data) + slog.Info("bundle response", "url", r.URL.String(), "status", http.StatusOK, "size", len(data)) +} + +func (h *Handler) healthz(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ok")) +} + +func (h *Handler) readyz_(w http.ResponseWriter, _ *http.Request) { + if h.readyz == nil || h.readyz.Ready() { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ok")) + return + } + http.Error(w, "not ready", http.StatusServiceUnavailable) +} diff --git a/kagenti-operator/internal/bundleservice/handler/handler_test.go b/kagenti-operator/internal/bundleservice/handler/handler_test.go new file mode 100644 index 00000000..788bc6da --- /dev/null +++ b/kagenti-operator/internal/bundleservice/handler/handler_test.go @@ -0,0 +1,211 @@ +package handler + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + + "github.com/kagenti/operator/internal/bundleservice/identity" + "github.com/kagenti/operator/internal/bundleservice/provider" +) + +type mockProvider struct { + bundles map[string]struct { + data []byte + etag string + } +} + +func (m *mockProvider) GetBundle(_ context.Context, id identity.ClientIdentity, ifNoneMatch string) ([]byte, string, error) { + key := id.CacheKey() + b, ok := m.bundles[key] + if !ok { + return []byte("empty-bundle"), "sha256:empty", nil + } + if ifNoneMatch != "" && ifNoneMatch == b.etag { + return nil, b.etag, nil + } + return b.data, b.etag, nil +} + +type mockReadiness struct { + ready bool +} + +func (m *mockReadiness) Ready() bool { return m.ready } + +func newTestHandler(p BundleProvider, readyz ReadinessChecker) http.Handler { + h := New(p, readyz, identity.NoopVerifier{}) + mux := http.NewServeMux() + h.RegisterRoutes(mux) + return mux +} + +func bundleURL(ns, name string) string { + return "/bundles?spiffe=localtest.me/ns/" + ns + "/sa/" + name +} + +func TestServeBundle_200(t *testing.T) { + p := &mockProvider{bundles: map[string]struct { + data []byte + etag string + }{ + "default/my-agent": {data: []byte("bundle-data"), etag: "sha256:abc123"}, + }} + + srv := newTestHandler(p, &mockReadiness{ready: true}) + req := httptest.NewRequest("GET", bundleURL("default", "my-agent"), nil) + w := httptest.NewRecorder() + srv.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + if w.Header().Get("Content-Type") != "application/gzip" { + t.Fatalf("unexpected content-type: %s", w.Header().Get("Content-Type")) + } + if w.Header().Get("ETag") != `"sha256:abc123"` { + t.Fatalf("unexpected etag: %s", w.Header().Get("ETag")) + } + if w.Body.String() != "bundle-data" { + t.Fatalf("unexpected body: %s", w.Body.String()) + } +} + +func TestServeBundle_304(t *testing.T) { + p := &mockProvider{bundles: map[string]struct { + data []byte + etag string + }{ + "default/my-agent": {data: []byte("bundle-data"), etag: "sha256:abc123"}, + }} + + srv := newTestHandler(p, &mockReadiness{ready: true}) + req := httptest.NewRequest("GET", bundleURL("default", "my-agent"), nil) + req.Header.Set("If-None-Match", `"sha256:abc123"`) + w := httptest.NewRecorder() + srv.ServeHTTP(w, req) + + if w.Code != http.StatusNotModified { + t.Fatalf("expected 304, got %d", w.Code) + } +} + +func TestServeBundle_UnknownClient(t *testing.T) { + p := &mockProvider{bundles: map[string]struct { + data []byte + etag string + }{}} + + srv := newTestHandler(p, &mockReadiness{ready: true}) + req := httptest.NewRequest("GET", bundleURL("default", "no-such-agent"), nil) + w := httptest.NewRecorder() + srv.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } +} + +func TestServeBundle_BundleTooLarge(t *testing.T) { + p := &errorProvider{err: provider.ErrBundleTooLarge} + + srv := newTestHandler(p, &mockReadiness{ready: true}) + req := httptest.NewRequest("GET", bundleURL("default", "big-agent"), nil) + w := httptest.NewRecorder() + srv.ServeHTTP(w, req) + + if w.Code != http.StatusRequestEntityTooLarge { + t.Fatalf("expected 413, got %d", w.Code) + } +} + +func TestServeBundle_MissingQueryParam(t *testing.T) { + p := &mockProvider{} + srv := newTestHandler(p, &mockReadiness{ready: true}) + req := httptest.NewRequest("GET", "/bundles", nil) + w := httptest.NewRecorder() + srv.ServeHTTP(w, req) + + if w.Code != http.StatusBadRequest { + t.Fatalf("expected 400, got %d", w.Code) + } +} + +func TestServeBundle_InvalidSpiffeID(t *testing.T) { + p := &mockProvider{} + srv := newTestHandler(p, &mockReadiness{ready: true}) + + cases := []struct { + name string + url string + }{ + {"missing path", "/bundles?spiffe=localtest.me"}, + {"wrong format", "/bundles?spiffe=localtest.me/wrong/path"}, + {"empty value", "/bundles?spiffe="}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + req := httptest.NewRequest("GET", tc.url, nil) + w := httptest.NewRecorder() + srv.ServeHTTP(w, req) + if w.Code != http.StatusBadRequest { + t.Fatalf("expected 400 for %s, got %d: %s", tc.url, w.Code, w.Body.String()) + } + }) + } +} + +func TestServeBundle_NotReady(t *testing.T) { + p := &mockProvider{} + srv := newTestHandler(p, &mockReadiness{ready: false}) + req := httptest.NewRequest("GET", bundleURL("default", "my-agent"), nil) + w := httptest.NewRecorder() + srv.ServeHTTP(w, req) + + if w.Code != http.StatusServiceUnavailable { + t.Fatalf("expected 503, got %d", w.Code) + } +} + +func TestHealthz(t *testing.T) { + srv := newTestHandler(&mockProvider{}, &mockReadiness{ready: true}) + req := httptest.NewRequest("GET", "/healthz", nil) + w := httptest.NewRecorder() + srv.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", w.Code) + } +} + +func TestReadyz_Ready(t *testing.T) { + srv := newTestHandler(&mockProvider{}, &mockReadiness{ready: true}) + req := httptest.NewRequest("GET", "/readyz", nil) + w := httptest.NewRecorder() + srv.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", w.Code) + } +} + +func TestReadyz_NotReady(t *testing.T) { + srv := newTestHandler(&mockProvider{}, &mockReadiness{ready: false}) + req := httptest.NewRequest("GET", "/readyz", nil) + w := httptest.NewRecorder() + srv.ServeHTTP(w, req) + + if w.Code != http.StatusServiceUnavailable { + t.Fatalf("expected 503, got %d", w.Code) + } +} + +type errorProvider struct { + err error +} + +func (e *errorProvider) GetBundle(_ context.Context, _ identity.ClientIdentity, _ string) ([]byte, string, error) { + return nil, "", e.err +} diff --git a/kagenti-operator/internal/bundleservice/identity/identity.go b/kagenti-operator/internal/bundleservice/identity/identity.go new file mode 100644 index 00000000..e1d62bc9 --- /dev/null +++ b/kagenti-operator/internal/bundleservice/identity/identity.go @@ -0,0 +1,61 @@ +package identity + +import ( + "fmt" + "net/http" + + "github.com/kagenti/operator/internal/bundleservice/spiffeid" +) + +// ClientIdentity represents a client's namespace and name derived from the +// request query parameters. The query parameter name identifies the scheme +// (e.g. "spiffe"), and the value is the scheme-specific identifier validated +// against that scheme's parser. +type ClientIdentity struct { + Namespace string + Name string +} + +// CacheKey returns a unique key for caching bundles per identity. +func (c ClientIdentity) CacheKey() string { + return c.Namespace + "/" + c.Name +} + +// FromRequest extracts the ClientIdentity from request query parameters. +// The query parameter name is the scheme, the value is the identifier: +// +// GET /bundles?spiffe=trust-domain/ns/{ns}/sa/{name} +// +// The value is always validated against the scheme's strict parser. +// Supported schemes: spiffe. +func FromRequest(r *http.Request) (ClientIdentity, error) { + q := r.URL.Query() + + if v := q.Get("spiffe"); v != "" { + id, err := spiffeid.Parse(v) + if err != nil { + return ClientIdentity{}, err + } + return ClientIdentity{ + Namespace: id.Namespace, + Name: id.Name, + }, nil + } + + return ClientIdentity{}, fmt.Errorf("missing client identity query parameter") +} + +// Verifier verifies that the caller is authorized to request bundles for the +// given identity. Implementations may check mTLS client certificates (SPIFFE) +// or JWT bearer tokens (Kubernetes service accounts). +type Verifier interface { + Verify(r *http.Request, id ClientIdentity) error +} + +// NoopVerifier performs no verification. Used when the service does not yet +// terminate TLS or validate tokens (verification is handled upstream). +type NoopVerifier struct{} + +func (NoopVerifier) Verify(_ *http.Request, _ ClientIdentity) error { + return nil +} diff --git a/kagenti-operator/internal/bundleservice/identity/identity_test.go b/kagenti-operator/internal/bundleservice/identity/identity_test.go new file mode 100644 index 00000000..5d4f903a --- /dev/null +++ b/kagenti-operator/internal/bundleservice/identity/identity_test.go @@ -0,0 +1,86 @@ +package identity + +import ( + "net/http/httptest" + "testing" +) + +func TestFromRequest(t *testing.T) { + tests := []struct { + name string + url string + wantNS string + wantN string + wantErr bool + }{ + { + name: "valid spiffe", + url: "/bundles?spiffe=localtest.me/ns/default/sa/my-agent", + wantNS: "default", + wantN: "my-agent", + }, + { + name: "different namespace", + url: "/bundles?spiffe=prod.io/ns/kube-system/sa/gateway", + wantNS: "kube-system", + wantN: "gateway", + }, + { + name: "missing query param", + url: "/bundles", + wantErr: true, + }, + { + name: "invalid spiffe value", + url: "/bundles?spiffe=bad-format", + wantErr: true, + }, + { + name: "empty spiffe value", + url: "/bundles?spiffe=", + wantErr: true, + }, + { + name: "unknown scheme only", + url: "/bundles?unknown=something", + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + req := httptest.NewRequest("GET", tt.url, nil) + id, err := FromRequest(req) + if tt.wantErr { + if err == nil { + t.Fatalf("expected error, got %+v", id) + } + return + } + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if id.Namespace != tt.wantNS { + t.Errorf("namespace = %q, want %q", id.Namespace, tt.wantNS) + } + if id.Name != tt.wantN { + t.Errorf("name = %q, want %q", id.Name, tt.wantN) + } + }) + } +} + +func TestClientIdentity_CacheKey(t *testing.T) { + id := ClientIdentity{Namespace: "ns1", Name: "agent1"} + if got := id.CacheKey(); got != "ns1/agent1" { + t.Errorf("CacheKey() = %q, want %q", got, "ns1/agent1") + } +} + +func TestNoopVerifier(t *testing.T) { + req := httptest.NewRequest("GET", "/bundles/test.tar.gz", nil) + id := ClientIdentity{Namespace: "default", Name: "agent"} + if err := (NoopVerifier{}).Verify(req, id); err != nil { + t.Fatalf("NoopVerifier should never error, got: %v", err) + } +} diff --git a/kagenti-operator/internal/bundleservice/provider/errors.go b/kagenti-operator/internal/bundleservice/provider/errors.go new file mode 100644 index 00000000..cdf22b8d --- /dev/null +++ b/kagenti-operator/internal/bundleservice/provider/errors.go @@ -0,0 +1,8 @@ +package provider + +import "errors" + +var ( + ErrBundleTooLarge = errors.New("bundle exceeds size limit") + ErrNotFound = errors.New("policy not found") +) diff --git a/kagenti-operator/internal/bundleservice/provider/provider.go b/kagenti-operator/internal/bundleservice/provider/provider.go new file mode 100644 index 00000000..a3008b2c --- /dev/null +++ b/kagenti-operator/internal/bundleservice/provider/provider.go @@ -0,0 +1,207 @@ +package provider + +import ( + "context" + "log/slog" + + "golang.org/x/sync/singleflight" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + + v1alpha1 "github.com/kagenti/operator/api/v1alpha1" + "github.com/kagenti/operator/internal/bundleservice/bundle" + "github.com/kagenti/operator/internal/bundleservice/cache" + "github.com/kagenti/operator/internal/bundleservice/identity" + "github.com/kagenti/operator/internal/bundleservice/watcher" +) + +const ( + maxBundleSize = 5 * 1024 * 1024 // 5MB + maxConcurrentBuilds = 10 +) + +type buildResult struct { + data []byte + hash string +} + +type Provider struct { + etagCache *cache.ETagCache + bundleCache *cache.BundleCache + policyCache *cache.PolicyCache + watcher *watcher.Watcher + buildGroup singleflight.Group + buildSem chan struct{} +} + +func New(etagCache *cache.ETagCache, bundleCache *cache.BundleCache, policyCache *cache.PolicyCache, w *watcher.Watcher) *Provider { + return &Provider{ + etagCache: etagCache, + bundleCache: bundleCache, + policyCache: policyCache, + watcher: w, + buildSem: make(chan struct{}, maxConcurrentBuilds), + } +} + +func (p *Provider) GetBundle(ctx context.Context, id identity.ClientIdentity, ifNoneMatch string) ([]byte, string, error) { + key := id.CacheKey() + + // Fast path: check ETag cache for 304 + if cachedHash, ok := p.etagCache.Get(key); ok { + if ifNoneMatch != "" && ifNoneMatch == cachedHash { + return nil, cachedHash, nil + } + } + + // Check bundle cache (1-min TTL) + if data, etag, ok := p.bundleCache.Get(key); ok { + p.etagCache.Set(key, etag) + return data, etag, nil + } + + // Build via singleflight (dedup concurrent requests for same client) + // Semaphore limits concurrent builds to avoid overwhelming etcd/API server + result, err, _ := p.buildGroup.Do(key, func() (interface{}, error) { + select { + case p.buildSem <- struct{}{}: + case <-ctx.Done(): + return nil, ctx.Err() + } + defer func() { <-p.buildSem }() + return p.buildBundle(ctx, id) + }) + if err != nil { + return nil, "", err + } + + br := result.(*buildResult) + return br.data, br.hash, nil +} + +func (p *Provider) buildBundle(ctx context.Context, id identity.ClientIdentity) (*buildResult, error) { + _ = ctx + key := id.CacheKey() + + // Gather policies from all three tiers + var allPolicies []bundle.Policy + + // 1. Global tier + globalPolicies := p.getGlobalPolicies() + for _, pol := range globalPolicies { + allPolicies = append(allPolicies, bundle.Policy{ + Path: "authbridge/global/" + pol.Path, + Content: pol.Content, + }) + } + + // 2. Namespace tier + nsPolicies := p.getNamespacePolicies(id.Namespace) + for _, pol := range nsPolicies { + allPolicies = append(allPolicies, bundle.Policy{ + Path: "authbridge/ns/" + pol.Path, + Content: pol.Content, + }) + } + + // 3. Client tier — look up CR by name in the namespace + clientPolicies, err := p.getClientPolicies(id) + if err != nil { + return nil, err + } + for _, pol := range clientPolicies { + allPolicies = append(allPolicies, bundle.Policy{ + Path: "authbridge/client/" + pol.Path, + Content: pol.Content, + }) + } + + // Compute content hash and update ETag cache + hash := bundle.ContentHash(allPolicies) + p.etagCache.Set(key, hash) + + // Build the tar.gz + data, _, err := bundle.Build(allPolicies) + if err != nil { + return nil, err + } + + if len(data) > maxBundleSize { + slog.Error("bundle exceeds size limit", "ns", id.Namespace, "name", id.Name, "size", len(data), "limit", maxBundleSize) + return nil, ErrBundleTooLarge + } + + // Store in bundle cache (short TTL for replica bursts) + p.bundleCache.Set(key, data, hash) + slog.Info("bundle built", "ns", id.Namespace, "name", id.Name, "hash", hash) + + return &buildResult{data: data, hash: hash}, nil +} + +func (p *Provider) getGlobalPolicies() []cache.PolicyEntry { + if policies, _, ok := p.policyCache.GetGlobal(); ok { + return policies + } + + objs := p.watcher.GetGlobalPolicies() + policies := extractPolicies(objs) + hash := hashPolicyEntries(policies) + p.policyCache.SetGlobal(policies, hash) + return policies +} + +func (p *Provider) getNamespacePolicies(ns string) []cache.PolicyEntry { + if policies, _, ok := p.policyCache.GetNamespace(ns); ok { + return policies + } + + objs := p.watcher.GetNamespacePolicies(ns) + policies := extractPolicies(objs) + hash := hashPolicyEntries(policies) + p.policyCache.SetNamespace(ns, policies, hash) + return policies +} + +func (p *Provider) getClientPolicies(id identity.ClientIdentity) ([]cache.PolicyEntry, error) { + obj, err := p.watcher.GetPolicy(id.Name, id.Namespace) + if err != nil { + return nil, err + } + if obj == nil { + return nil, nil + } + + ap, err := v1alpha1.AuthorizationPolicyFromUnstructured(obj) + if err != nil { + slog.Error("failed to convert client policy", "ns", id.Namespace, "name", id.Name, "error", err) + return nil, err + } + + policies := make([]cache.PolicyEntry, len(ap.Spec.Policies)) + for i, pe := range ap.Spec.Policies { + policies[i] = cache.PolicyEntry{Path: pe.Path, Content: pe.Content} + } + return policies, nil +} + +func extractPolicies(objs []*unstructured.Unstructured) []cache.PolicyEntry { + var result []cache.PolicyEntry + for _, obj := range objs { + ap, err := v1alpha1.AuthorizationPolicyFromUnstructured(obj) + if err != nil { + slog.Error("failed to convert policy CR", "name", obj.GetName(), "error", err) + continue + } + for _, pe := range ap.Spec.Policies { + result = append(result, cache.PolicyEntry{Path: pe.Path, Content: pe.Content}) + } + } + return result +} + +func hashPolicyEntries(entries []cache.PolicyEntry) string { + policies := make([]bundle.Policy, len(entries)) + for i, e := range entries { + policies[i] = bundle.Policy{Path: e.Path, Content: e.Content} + } + return bundle.ContentHash(policies) +} diff --git a/kagenti-operator/internal/bundleservice/provider/provider_test.go b/kagenti-operator/internal/bundleservice/provider/provider_test.go new file mode 100644 index 00000000..865816af --- /dev/null +++ b/kagenti-operator/internal/bundleservice/provider/provider_test.go @@ -0,0 +1,204 @@ +package provider + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "time" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + dynamicfake "k8s.io/client-go/dynamic/fake" + + "github.com/kagenti/operator/internal/bundleservice/cache" + "github.com/kagenti/operator/internal/bundleservice/identity" + "github.com/kagenti/operator/internal/bundleservice/watcher" +) + +func newFakeClient(objects ...runtime.Object) *dynamicfake.FakeDynamicClient { + scheme := runtime.NewScheme() + return dynamicfake.NewSimpleDynamicClientWithCustomListKinds(scheme, + map[schema.GroupVersionResource]string{ + {Group: "agent.kagenti.dev", Version: "v1alpha1", Resource: "authorizationpolicies"}: "AuthorizationPolicyList", + }, + objects..., + ) +} + +func newClientPolicy(name, ns string) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]any{ + "apiVersion": "agent.kagenti.dev/v1alpha1", + "kind": "AuthorizationPolicy", + "metadata": map[string]any{ + "name": name, + "namespace": ns, + }, + "spec": map[string]any{ + "scope": "client", + "policies": []any{ + map[string]any{ + "path": "inbound/request.rego", + "content": "package authbridge.client\ndefault allow := true\n", + }, + }, + }, + }, + } +} + +func newGlobalPolicy(name, ns string) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]any{ + "apiVersion": "agent.kagenti.dev/v1alpha1", + "kind": "AuthorizationPolicy", + "metadata": map[string]any{ + "name": name, + "namespace": ns, + }, + "spec": map[string]any{ + "scope": "global", + "policies": []any{ + map[string]any{ + "path": "base/allow-all.rego", + "content": "package authbridge.global\ndefault allow := true\n", + }, + }, + }, + }, + } +} + +func setupWatcher(t *testing.T, objects ...runtime.Object) (*watcher.Watcher, *cache.ETagCache, *cache.BundleCache, *cache.PolicyCache) { + t.Helper() + client := newFakeClient(objects...) + ec := cache.NewETagCache() + bc := cache.NewBundleCache(100, time.Minute) + pc := cache.NewPolicyCache() + + w := watcher.New(client, "kagenti-system", ec, bc, pc) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + go w.Run(ctx) + + deadline := time.After(5 * time.Second) + for !w.HasSynced() { + select { + case <-deadline: + t.Fatal("informer never synced") + default: + time.Sleep(10 * time.Millisecond) + } + } + return w, ec, bc, pc +} + +func TestProvider_GetBundle_BuildsFromCR(t *testing.T) { + obj := newClientPolicy("test-agent", "default") + w, ec, bc, pc := setupWatcher(t, obj) + + p := New(ec, bc, pc, w) + id := identity.ClientIdentity{Namespace: "default", Name: "test-agent"} + data, etag, err := p.GetBundle(context.Background(), id, "") + if err != nil { + t.Fatalf("GetBundle failed: %v", err) + } + if len(data) == 0 { + t.Fatal("expected non-empty bundle data") + } + if etag == "" { + t.Fatal("expected non-empty etag") + } +} + +func TestProvider_GetBundle_304WhenETagMatches(t *testing.T) { + obj := newClientPolicy("test-agent", "default") + w, ec, bc, pc := setupWatcher(t, obj) + + p := New(ec, bc, pc, w) + id := identity.ClientIdentity{Namespace: "default", Name: "test-agent"} + + _, etag, err := p.GetBundle(context.Background(), id, "") + if err != nil { + t.Fatalf("first GetBundle failed: %v", err) + } + + data, returnedEtag, err := p.GetBundle(context.Background(), id, etag) + if err != nil { + t.Fatalf("second GetBundle failed: %v", err) + } + if data != nil { + t.Fatal("expected nil data for 304") + } + if returnedEtag != etag { + t.Fatalf("expected same etag, got %s vs %s", returnedEtag, etag) + } +} + +func TestProvider_GetBundle_NoCRsReturnsEmptyBundle(t *testing.T) { + w, ec, bc, pc := setupWatcher(t) + + p := New(ec, bc, pc, w) + id := identity.ClientIdentity{Namespace: "default", Name: "no-cr-agent"} + data, _, err := p.GetBundle(context.Background(), id, "") + if err != nil { + t.Fatalf("expected no error, got: %v", err) + } + if len(data) == 0 { + t.Fatal("expected non-empty bundle (at least decision policy)") + } +} + +func TestProvider_GetBundle_IncludesGlobalPolicies(t *testing.T) { + global := newGlobalPolicy("global", "kagenti-system") + clientPol := newClientPolicy("my-agent", "default") + w, ec, bc, pc := setupWatcher(t, global, clientPol) + + p := New(ec, bc, pc, w) + id := identity.ClientIdentity{Namespace: "default", Name: "my-agent"} + data, _, err := p.GetBundle(context.Background(), id, "") + if err != nil { + t.Fatalf("GetBundle failed: %v", err) + } + if len(data) == 0 { + t.Fatal("expected non-empty bundle") + } +} + +func TestProvider_GetBundle_SingleflightDedup(t *testing.T) { + obj := newClientPolicy("test-agent", "default") + w, ec, bc, pc := setupWatcher(t, obj) + + p := New(ec, bc, pc, w) + + bc.InvalidateAll() + ec.InvalidateAll() + + var buildCount atomic.Int32 + var wg sync.WaitGroup + const concurrency = 50 + + id := identity.ClientIdentity{Namespace: "default", Name: "test-agent"} + + wg.Add(concurrency) + for i := 0; i < concurrency; i++ { + go func() { + defer wg.Done() + data, _, err := p.GetBundle(context.Background(), id, "") + if err != nil { + t.Errorf("GetBundle failed: %v", err) + return + } + if len(data) > 0 { + buildCount.Add(1) + } + }() + } + wg.Wait() + + if buildCount.Load() != concurrency { + t.Fatalf("expected %d successful results, got %d", concurrency, buildCount.Load()) + } +} diff --git a/kagenti-operator/internal/bundleservice/spiffeid/spiffeid.go b/kagenti-operator/internal/bundleservice/spiffeid/spiffeid.go new file mode 100644 index 00000000..d31da2f1 --- /dev/null +++ b/kagenti-operator/internal/bundleservice/spiffeid/spiffeid.go @@ -0,0 +1,45 @@ +package spiffeid + +import ( + "fmt" + "strings" +) + +type Identity struct { + TrustDomain string + Namespace string + Name string +} + +// Parse extracts namespace and service account name from a SPIFFE identity path. +// Expected format: {trust-domain}/ns/{namespace}/sa/{name} +func Parse(raw string) (Identity, error) { + slashIdx := strings.Index(raw, "/") + if slashIdx < 0 { + return Identity{}, fmt.Errorf("SPIFFE ID missing path: %q", raw) + } + + trustDomain := raw[:slashIdx] + if trustDomain == "" { + return Identity{}, fmt.Errorf("SPIFFE ID has empty trust domain: %q", raw) + } + path := raw[slashIdx+1:] + + parts := strings.Split(path, "/") + if len(parts) != 4 || parts[0] != "ns" || parts[2] != "sa" { + return Identity{}, fmt.Errorf("SPIFFE ID path must be ns/{namespace}/sa/{name}, got %q", path) + } + + ns := parts[1] + name := parts[3] + + if ns == "" || name == "" { + return Identity{}, fmt.Errorf("SPIFFE ID has empty namespace or name: %q", raw) + } + + return Identity{ + TrustDomain: trustDomain, + Namespace: ns, + Name: name, + }, nil +} diff --git a/kagenti-operator/internal/bundleservice/spiffeid/spiffeid_test.go b/kagenti-operator/internal/bundleservice/spiffeid/spiffeid_test.go new file mode 100644 index 00000000..8860ae1e --- /dev/null +++ b/kagenti-operator/internal/bundleservice/spiffeid/spiffeid_test.go @@ -0,0 +1,78 @@ +package spiffeid + +import ( + "testing" +) + +func TestParse(t *testing.T) { + tests := []struct { + name string + input string + want Identity + wantErr bool + }{ + { + name: "valid spiffe id", + input: "localtest.me/ns/default/sa/my-agent", + want: Identity{TrustDomain: "localtest.me", Namespace: "default", Name: "my-agent"}, + }, + { + name: "different trust domain", + input: "prod.example.com/ns/kube-system/sa/gateway", + want: Identity{TrustDomain: "prod.example.com", Namespace: "kube-system", Name: "gateway"}, + }, + { + name: "no slash", + input: "localtest.me", + wantErr: true, + }, + { + name: "wrong path format", + input: "localtest.me/ns/foo/bar/baz", + wantErr: true, + }, + { + name: "missing sa segment", + input: "localtest.me/ns/foo/notsa/bar", + wantErr: true, + }, + { + name: "empty namespace", + input: "localtest.me/ns//sa/bar", + wantErr: true, + }, + { + name: "empty name", + input: "localtest.me/ns/foo/sa/", + wantErr: true, + }, + { + name: "extra path segments", + input: "localtest.me/ns/foo/sa/bar/extra", + wantErr: true, + }, + { + name: "empty trust domain", + input: "/ns/foo/sa/bar", + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := Parse(tt.input) + if tt.wantErr { + if err == nil { + t.Errorf("Parse(%q) expected error, got %+v", tt.input, got) + } + return + } + if err != nil { + t.Fatalf("Parse(%q) unexpected error: %v", tt.input, err) + } + if got != tt.want { + t.Errorf("Parse(%q) = %+v, want %+v", tt.input, got, tt.want) + } + }) + } +} diff --git a/kagenti-operator/internal/bundleservice/watcher/watcher.go b/kagenti-operator/internal/bundleservice/watcher/watcher.go new file mode 100644 index 00000000..f7c6e749 --- /dev/null +++ b/kagenti-operator/internal/bundleservice/watcher/watcher.go @@ -0,0 +1,184 @@ +package watcher + +import ( + "context" + "fmt" + "log/slog" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" + "k8s.io/client-go/tools/cache" + + v1alpha1 "github.com/kagenti/operator/api/v1alpha1" + bundlecache "github.com/kagenti/operator/internal/bundleservice/cache" +) + +const ( + ScopeIndex = "spec.scope" +) + +type Watcher struct { + informer cache.SharedIndexInformer + etagCache *bundlecache.ETagCache + bundleCache *bundlecache.BundleCache + policyCache *bundlecache.PolicyCache + serviceNamespace string +} + +const defaultScope = "client" + +func New(client dynamic.Interface, serviceNamespace string, etagCache *bundlecache.ETagCache, bundleCache *bundlecache.BundleCache, policyCache *bundlecache.PolicyCache) *Watcher { + factory := dynamicinformer.NewDynamicSharedInformerFactory(client, 0) + informer := factory.ForResource(v1alpha1.AuthorizationPolicyGVR()).Informer() + + _ = informer.AddIndexers(cache.Indexers{ + ScopeIndex: func(obj interface{}) ([]string, error) { + u, ok := obj.(*unstructured.Unstructured) + if !ok { + return nil, fmt.Errorf("unexpected type %T", obj) + } + scope, _, _ := unstructured.NestedString(u.Object, "spec", "scope") + if scope == "" { + scope = defaultScope + } + return []string{scope}, nil + }, + }) + + w := &Watcher{ + informer: informer, + etagCache: etagCache, + bundleCache: bundleCache, + policyCache: policyCache, + serviceNamespace: serviceNamespace, + } + + _, _ = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { w.invalidate(obj) }, + UpdateFunc: func(_, newObj interface{}) { w.invalidate(newObj) }, + DeleteFunc: func(obj interface{}) { w.invalidate(obj) }, + }) + + return w +} + +func (w *Watcher) Run(ctx context.Context) { + slog.Info("starting AuthorizationPolicy watcher") + w.informer.Run(ctx.Done()) +} + +func (w *Watcher) HasSynced() bool { + return w.informer.HasSynced() +} + +func (w *Watcher) Ready() bool { + return w.informer.HasSynced() +} + +// GetGlobalPolicies returns global-scope CRs that reside in the service namespace. +// Global CRs in other namespaces are ignored. +func (w *Watcher) GetGlobalPolicies() []*unstructured.Unstructured { + items, err := w.informer.GetIndexer().ByIndex(ScopeIndex, "global") + if err != nil { + slog.Error("global index lookup failed", "error", err) + return nil + } + var result []*unstructured.Unstructured + for _, item := range items { + u, ok := item.(*unstructured.Unstructured) + if !ok { + continue + } + if u.GetNamespace() != w.serviceNamespace { + slog.Warn("ignoring global CR outside service namespace", + "name", u.GetName(), "namespace", u.GetNamespace(), + "expected", w.serviceNamespace) + continue + } + result = append(result, u) + } + return result +} + +// GetNamespacePolicies returns namespace-scope CRs that reside in the given namespace. +func (w *Watcher) GetNamespacePolicies(ns string) []*unstructured.Unstructured { + items, err := w.informer.GetIndexer().ByIndex(ScopeIndex, "namespace") + if err != nil { + slog.Error("namespace index lookup failed", "error", err) + return nil + } + var result []*unstructured.Unstructured + for _, item := range items { + u, ok := item.(*unstructured.Unstructured) + if !ok { + continue + } + if u.GetNamespace() == ns { + result = append(result, u) + } + } + return result +} + +// GetPolicy looks up a client-scope AuthorizationPolicy CR by name and namespace. +func (w *Watcher) GetPolicy(name, namespace string) (*unstructured.Unstructured, error) { + key := namespace + "/" + name + item, exists, err := w.informer.GetIndexer().GetByKey(key) + if err != nil { + return nil, fmt.Errorf("lookup %s: %w", key, err) + } + if !exists { + return nil, nil + } + u, ok := item.(*unstructured.Unstructured) + if !ok { + return nil, fmt.Errorf("unexpected type %T for key %s", item, key) + } + return u, nil +} + +func (w *Watcher) invalidate(obj interface{}) { + u, ok := obj.(*unstructured.Unstructured) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + return + } + u, ok = tombstone.Obj.(*unstructured.Unstructured) + if !ok { + return + } + } + + scope, _, _ := unstructured.NestedString(u.Object, "spec", "scope") + if scope == "" { + scope = "client" + } + + switch scope { + case "global": + if u.GetNamespace() != w.serviceNamespace { + return + } + slog.Info("global policy changed, invalidating all caches") + w.policyCache.InvalidateGlobal() + w.etagCache.InvalidateAll() + w.bundleCache.InvalidateAll() + + case "namespace": + ns := u.GetNamespace() + slog.Info("namespace policy changed", "namespace", ns) + w.policyCache.InvalidateNamespace(ns) + w.etagCache.InvalidateAll() + w.bundleCache.InvalidateAll() + + case "client": + ns := u.GetNamespace() + name := u.GetName() + key := ns + "/" + name + slog.Info("client policy changed", "key", key) + w.etagCache.Invalidate(key) + w.bundleCache.Invalidate(key) + } +} diff --git a/kagenti-operator/internal/bundleservice/watcher/watcher_test.go b/kagenti-operator/internal/bundleservice/watcher/watcher_test.go new file mode 100644 index 00000000..19fc0226 --- /dev/null +++ b/kagenti-operator/internal/bundleservice/watcher/watcher_test.go @@ -0,0 +1,249 @@ +package watcher + +import ( + "context" + "testing" + "time" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + dynamicfake "k8s.io/client-go/dynamic/fake" + + bundlecache "github.com/kagenti/operator/internal/bundleservice/cache" +) + +const testServiceNamespace = "kagenti-system" + +func newFakeClient(objects ...runtime.Object) *dynamicfake.FakeDynamicClient { + scheme := runtime.NewScheme() + return dynamicfake.NewSimpleDynamicClientWithCustomListKinds(scheme, + map[schema.GroupVersionResource]string{ + {Group: "agent.kagenti.dev", Version: "v1alpha1", Resource: "authorizationpolicies"}: "AuthorizationPolicyList", + }, + objects..., + ) +} + +func newPolicy(name, ns, scope, clientID string) *unstructured.Unstructured { + spec := map[string]any{ + "scope": scope, + "policies": []any{ + map[string]any{ + "path": "inbound/request.rego", + "content": "package test\ndefault allow := true\n", + }, + }, + } + if clientID != "" { + spec["clientID"] = clientID + } + return &unstructured.Unstructured{ + Object: map[string]any{ + "apiVersion": "agent.kagenti.dev/v1alpha1", + "kind": "AuthorizationPolicy", + "metadata": map[string]any{ + "name": name, + "namespace": ns, + }, + "spec": spec, + }, + } +} + +func waitForSync(t *testing.T, w *Watcher) { + t.Helper() + deadline := time.After(5 * time.Second) + for !w.HasSynced() { + select { + case <-deadline: + t.Fatal("informer never synced") + default: + time.Sleep(10 * time.Millisecond) + } + } +} + +func TestWatcher_GetPolicy(t *testing.T) { + obj := newPolicy("my-policy", "default", "client", "") + client := newFakeClient(obj) + + ec := bundlecache.NewETagCache() + bc := bundlecache.NewBundleCache(10, time.Minute) + pc := bundlecache.NewPolicyCache() + + w := New(client, testServiceNamespace, ec, bc, pc) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go w.Run(ctx) + waitForSync(t, w) + + result, err := w.GetPolicy("my-policy", "default") + if err != nil { + t.Fatalf("GetPolicy failed: %v", err) + } + if result == nil { + t.Fatal("expected non-nil result") + } +} + +func TestWatcher_GetPolicy_NotFound(t *testing.T) { + client := newFakeClient() + + ec := bundlecache.NewETagCache() + bc := bundlecache.NewBundleCache(10, time.Minute) + pc := bundlecache.NewPolicyCache() + + w := New(client, testServiceNamespace, ec, bc, pc) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go w.Run(ctx) + waitForSync(t, w) + + result, err := w.GetPolicy("nonexistent", "default") + if err != nil { + t.Fatalf("GetPolicy failed: %v", err) + } + if result != nil { + t.Fatal("expected nil result") + } +} + +func TestWatcher_GetGlobalPolicies(t *testing.T) { + obj := newPolicy("global-policy", testServiceNamespace, "global", "") + client := newFakeClient(obj) + + ec := bundlecache.NewETagCache() + bc := bundlecache.NewBundleCache(10, time.Minute) + pc := bundlecache.NewPolicyCache() + + w := New(client, testServiceNamespace, ec, bc, pc) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go w.Run(ctx) + waitForSync(t, w) + + result := w.GetGlobalPolicies() + if len(result) != 1 { + t.Fatalf("expected 1 global policy, got %d", len(result)) + } +} + +func TestWatcher_GetGlobalPolicies_IgnoresWrongNamespace(t *testing.T) { + obj := newPolicy("rogue-global", "other-namespace", "global", "") + client := newFakeClient(obj) + + ec := bundlecache.NewETagCache() + bc := bundlecache.NewBundleCache(10, time.Minute) + pc := bundlecache.NewPolicyCache() + + w := New(client, testServiceNamespace, ec, bc, pc) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go w.Run(ctx) + waitForSync(t, w) + + result := w.GetGlobalPolicies() + if len(result) != 0 { + t.Fatalf("expected 0 global policies (wrong namespace), got %d", len(result)) + } +} + +func TestWatcher_GetNamespacePolicies(t *testing.T) { + obj1 := newPolicy("ns-policy", "test-ns", "namespace", "") + obj2 := newPolicy("other-ns-policy", "other-ns", "namespace", "") + client := newFakeClient(obj1, obj2) + + ec := bundlecache.NewETagCache() + bc := bundlecache.NewBundleCache(10, time.Minute) + pc := bundlecache.NewPolicyCache() + + w := New(client, testServiceNamespace, ec, bc, pc) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go w.Run(ctx) + waitForSync(t, w) + + result := w.GetNamespacePolicies("test-ns") + if len(result) != 1 { + t.Fatalf("expected 1 namespace policy, got %d", len(result)) + } +} + +func TestWatcher_GlobalChangeInvalidatesAll(t *testing.T) { + obj := newPolicy("global-policy", testServiceNamespace, "global", "") + client := newFakeClient(obj) + + ec := bundlecache.NewETagCache() + bc := bundlecache.NewBundleCache(10, time.Minute) + pc := bundlecache.NewPolicyCache() + + ec.Set("default/agent1", "hash1") + ec.Set("default/agent2", "hash2") + bc.Set("default/agent1", []byte("data"), "hash1") + pc.SetGlobal([]bundlecache.PolicyEntry{{Path: "old.rego", Content: "old"}}, "oldhash") + + w := New(client, testServiceNamespace, ec, bc, pc) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go w.Run(ctx) + waitForSync(t, w) + + if ec.Len() != 0 { + t.Fatalf("expected etag cache cleared, got %d entries", ec.Len()) + } + if bc.Len() != 0 { + t.Fatalf("expected bundle cache cleared, got %d entries", bc.Len()) + } + if _, _, ok := pc.GetGlobal(); ok { + t.Fatal("expected policy cache global to be invalidated") + } +} + +func TestWatcher_GlobalChangeInWrongNamespaceIgnored(t *testing.T) { + obj := newPolicy("rogue-global", "wrong-namespace", "global", "") + client := newFakeClient(obj) + + ec := bundlecache.NewETagCache() + bc := bundlecache.NewBundleCache(10, time.Minute) + pc := bundlecache.NewPolicyCache() + + ec.Set("default/agent1", "hash1") + + w := New(client, testServiceNamespace, ec, bc, pc) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go w.Run(ctx) + waitForSync(t, w) + + if _, ok := ec.Get("default/agent1"); !ok { + t.Fatal("expected agent1 etag to survive — global CR is in wrong namespace") + } +} + +func TestWatcher_ClientChangeInvalidatesOnlyClient(t *testing.T) { + obj := newPolicy("agent1", "default", "client", "") + client := newFakeClient(obj) + + ec := bundlecache.NewETagCache() + bc := bundlecache.NewBundleCache(10, time.Minute) + pc := bundlecache.NewPolicyCache() + + ec.Set("default/agent1", "hash1") + ec.Set("default/agent2", "hash2") + bc.Set("default/agent1", []byte("data1"), "hash1") + bc.Set("default/agent2", []byte("data2"), "hash2") + + w := New(client, testServiceNamespace, ec, bc, pc) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go w.Run(ctx) + waitForSync(t, w) + + if _, ok := ec.Get("default/agent1"); ok { + t.Fatal("expected default/agent1 etag to be invalidated") + } + if _, ok := ec.Get("default/agent2"); !ok { + t.Fatal("expected default/agent2 etag to survive") + } +}