From b48501632e63cd3285d3806651c7471c0dfc7cad Mon Sep 17 00:00:00 2001 From: Christian Kruse Date: Wed, 4 Feb 2026 10:59:55 -0800 Subject: [PATCH 01/13] Add MultiKeyListener CRD and generated boilerplate Introduce the MultiKeyListener custom resource type with support for priority-based routing across multiple routing keys. This commit adds: - CRD schema definition (skupper_multikeylistener_crd.yaml) - API types with Priority strategy (multikeylistener_types.go) - Generated clientset, informers, and listers Signed-off-by: Christian Kruse --- .../bases/skupper_multikeylistener_crd.yaml | 230 ++++++++++++++++++ config/crd/kustomization.yaml | 1 + config/rbac/cluster/clusterrole.yaml | 2 + config/rbac/namespace/role.yaml | 2 + .../v2alpha1/multikeylistener_types.go | 193 +++++++++++++++ pkg/apis/skupper/v2alpha1/register.go | 2 +- .../skupper/v2alpha1/zz_generated.deepcopy.go | 197 +++++++++++++++ .../v2alpha1/fake/fake_multikeylistener.go | 52 ++++ .../v2alpha1/fake/fake_skupper_client.go | 4 + .../skupper/v2alpha1/generated_expansion.go | 2 + .../skupper/v2alpha1/multikeylistener.go | 70 ++++++ .../typed/skupper/v2alpha1/skupper_client.go | 5 + .../informers/externalversions/generic.go | 2 + .../skupper/v2alpha1/interface.go | 7 + .../skupper/v2alpha1/multikeylistener.go | 102 ++++++++ .../skupper/v2alpha1/expansion_generated.go | 8 + .../skupper/v2alpha1/multikeylistener.go | 70 ++++++ 17 files changed, 948 insertions(+), 1 deletion(-) create mode 100644 config/crd/bases/skupper_multikeylistener_crd.yaml create mode 100644 pkg/apis/skupper/v2alpha1/multikeylistener_types.go create mode 100644 pkg/generated/client/clientset/versioned/typed/skupper/v2alpha1/fake/fake_multikeylistener.go create mode 100644 pkg/generated/client/clientset/versioned/typed/skupper/v2alpha1/multikeylistener.go create mode 100644 pkg/generated/client/informers/externalversions/skupper/v2alpha1/multikeylistener.go create mode 100644 pkg/generated/client/listers/skupper/v2alpha1/multikeylistener.go diff --git a/config/crd/bases/skupper_multikeylistener_crd.yaml b/config/crd/bases/skupper_multikeylistener_crd.yaml new file mode 100644 index 000000000..1c565773c --- /dev/null +++ b/config/crd/bases/skupper_multikeylistener_crd.yaml @@ -0,0 +1,230 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.19.0 + name: multikeylisteners.skupper.io +spec: + group: skupper.io + names: + kind: MultiKeyListener + listKind: MultiKeyListenerList + plural: multikeylisteners + singular: multikeylistener + scope: Namespaced + versions: + - additionalPrinterColumns: + - description: The status of the multikeylistener + jsonPath: .status.status + name: Status + type: string + - description: Any human reandable message relevant to the multikeylistener + jsonPath: .status.message + name: Message + type: string + - description: Whether there is at least one connector in the network matched + by the strategy + jsonPath: .status.hasDestination + name: HasDestination + type: boolean + name: v2alpha1 + schema: + openAPIV3Schema: + description: |- + MultiKeyListeners bind a local connection endpoint to Connectors across the + Skupper network. A MultiKeyListener has a strategy that matches it to + Connector routing keys. + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + properties: + host: + description: |- + host is the hostname or IP address of the local listener. Clients at + this site use the listener host and port to establish connections to the + remote service. + type: string + port: + description: |- + port of the local listener. Clients at this site use the listener host + and port to establish connections to the remote service. + type: integer + requireClientCert: + description: |- + requireClientCert indicates that clients must present valid certificates + to the listener to connect. + type: boolean + settings: + additionalProperties: + type: string + description: |- + settings is a map containing additional settings. + + **Note:** In general, we recommend not changing settings from + their default values. + type: object + strategy: + description: |- + strategy for routing traffic from the local listener endpoint to one or + more connector instances by routing key. + properties: + priority: + description: |- + PriorityStrategySpec specifies an ordered set of routing keys to + route traffic to. + + With this strategy 100% of traffic will be directed to the first routing key + with a reachable connector. + properties: + routingKeys: + description: routingKeys to route traffic to in order of highest + to lowest priority. + items: + type: string + maxItems: 256 + minItems: 1 + type: array + x-kubernetes-list-type: set + required: + - routingKeys + type: object + type: object + x-kubernetes-validations: + - message: exactly one of the fields in [priority] must be + set + rule: '[has(self.priority)].filter(x,x==true).size() == + 1' + tlsCredentials: + description: tlsCredentials for client-to-listener + type: string + required: + - host + - port + - strategy + type: object + status: + properties: + conditions: + description: |- + conditions describing the current state of the multikeylistener + + - `Configured`: The multikeylistener configuration has been applied to the router. + - `Operational`: There is at least one connector corresponding to the multikeylistener strategy. + - `Ready`: The multikeylistener is ready to use. All other conditions are true.. + items: + description: Condition contains details for one aspect of the current + state of this API Resource. + properties: + lastTransitionTime: + description: |- + lastTransitionTime is the last time the condition transitioned from one status to another. + This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable. + format: date-time + type: string + message: + description: |- + message is a human readable message indicating details about the transition. + This may be an empty string. + maxLength: 32768 + type: string + observedGeneration: + description: |- + observedGeneration represents the .metadata.generation that the condition was set based upon. + For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date + with respect to the current state of the instance. + format: int64 + minimum: 0 + type: integer + reason: + description: |- + reason contains a programmatic identifier indicating the reason for the condition's last transition. + Producers of specific condition types may define expected values and meanings for this field, + and whether the values are considered a guaranteed API. + The value should be a CamelCase string. + This field may not be empty. + maxLength: 1024 + minLength: 1 + pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ + type: string + status: + description: status of the condition, one of True, False, Unknown. + enum: + - "True" + - "False" + - Unknown + type: string + type: + description: type of condition in CamelCase or in foo.example.com/CamelCase. + maxLength: 316 + pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ + type: string + required: + - lastTransitionTime + - message + - reason + - status + - type + type: object + type: array + hasDestination: + description: |- + hasDestination is set true when there is at least one connector in the + network with a routing key matched by the strategy. + type: boolean + message: + description: A human-readable status message. Error messages are reported + here. + type: string + status: + description: |- + The current state of the resource. + - `Pending`: The resource is being processed. + - `Error`: There was an error processing the resource. See `message` for more information. + - `Ready`: The resource is ready to use. + type: string + strategy: + properties: + priority: + description: priority status + properties: + routingKeysReachable: + description: |- + routingKeysReachable is a list of routingKeys with at least one + reachable connector given in priority order. + items: + type: string + type: array + required: + - routingKeysReachable + type: object + type: object + x-kubernetes-validations: + - message: exactly one of the fields in [priority] must be + set + rule: '[has(self.priority)].filter(x,x==true).size() == + 1' + type: object + required: + - spec + type: object + served: true + storage: true + subresources: + status: {} diff --git a/config/crd/kustomization.yaml b/config/crd/kustomization.yaml index 333d1e3d0..64f07e5e0 100644 --- a/config/crd/kustomization.yaml +++ b/config/crd/kustomization.yaml @@ -9,6 +9,7 @@ resources: - bases/skupper_connector_crd.yaml - bases/skupper_link_crd.yaml - bases/skupper_listener_crd.yaml +- bases/skupper_multikeylistener_crd.yaml - bases/skupper_router_access_crd.yaml - bases/skupper_secured_access_crd.yaml - bases/skupper_site_crd.yaml \ No newline at end of file diff --git a/config/rbac/cluster/clusterrole.yaml b/config/rbac/cluster/clusterrole.yaml index dddb3a78d..1dee2c6e9 100644 --- a/config/rbac/cluster/clusterrole.yaml +++ b/config/rbac/cluster/clusterrole.yaml @@ -133,6 +133,8 @@ rules: - accessgrants/status - listeners - listeners/status + - multikeylisteners + - multikeylisteners/status - connectors - connectors/status - attachedconnectors diff --git a/config/rbac/namespace/role.yaml b/config/rbac/namespace/role.yaml index 79219737c..58c036b09 100644 --- a/config/rbac/namespace/role.yaml +++ b/config/rbac/namespace/role.yaml @@ -134,6 +134,8 @@ rules: - accessgrants/status - listeners - listeners/status + - multikeylisteners + - multikeylisteners/status - connectors - connectors/status - attachedconnectors diff --git a/pkg/apis/skupper/v2alpha1/multikeylistener_types.go b/pkg/apis/skupper/v2alpha1/multikeylistener_types.go new file mode 100644 index 000000000..608c00a68 --- /dev/null +++ b/pkg/apis/skupper/v2alpha1/multikeylistener_types.go @@ -0,0 +1,193 @@ +package v2alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/api/meta" +) + +// +genclient +// +kubebuilder:object:root=true +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +// +kubebuilder:subresource:status +// +kubebuilder:printcolumn:name=Status,JSONPath=.status.status,description="The status of the multikeylistener",type=string +// +kubebuilder:printcolumn:name=Message,JSONPath=.status.message,description="Any human reandable message relevant to the multikeylistener",type=string +// +kubebuilder:printcolumn:name=HasDestination,JSONPath=.status.hasDestination,description="Whether there is at least one connector in the network matched by the strategy",type=boolean +// + +// MultiKeyListeners bind a local connection endpoint to Connectors across the +// Skupper network. A MultiKeyListener has a strategy that matches it to +// Connector routing keys. +type MultiKeyListener struct { + metav1.TypeMeta `json:",inline"` + // +optional + metav1.ObjectMeta `json:"metadata"` + // +required + Spec MultiKeyListenerSpec `json:"spec"` + // +optional + Status MultiKeyListenerStatus `json:"status"` +} + +// +kubebuilder:object:root=true +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// MultiKeyListenerList contains a list of MultiKeyListener +type MultiKeyListenerList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []MultiKeyListener `json:"items"` +} + +type MultiKeyListenerStatus struct { + // conditions describing the current state of the multikeylistener + // + // - `Configured`: The multikeylistener configuration has been applied to the router. + // - `Operational`: There is at least one connector corresponding to the multikeylistener strategy. + // - `Ready`: The multikeylistener is ready to use. All other conditions are true.. + Conditions []metav1.Condition `json:"conditions,omitempty"` + // The current state of the resource. + // - `Pending`: The resource is being processed. + // - `Error`: There was an error processing the resource. See `message` for more information. + // - `Ready`: The resource is ready to use. + StatusType StatusType `json:"status,omitempty"` + // A human-readable status message. Error messages are reported here. + Message string `json:"message,omitempty"` + // hasDestination is set true when there is at least one connector in the + // network with a routing key matched by the strategy. + HasDestination bool `json:"hasDestination,omitempty"` + + Strategy *StrategyStatus `json:"strategy,omitempty"` +} + +// +kubebuilder:validation:ExactlyOneOf=priority +type StrategyStatus struct { + // priority status + Priority *PriorityStrategyStatus `json:"priority,omitempty"` +} + +type PriorityStrategyStatus struct { + // routingKeysReachable is a list of routingKeys with at least one + // reachable connector given in priority order. + RoutingKeysReachable []string `json:"routingKeysReachable"` +} + +type MultiKeyListenerSpec struct { + // host is the hostname or IP address of the local listener. Clients at + // this site use the listener host and port to establish connections to the + // remote service. + Host string `json:"host"` + // port of the local listener. Clients at this site use the listener host + // and port to establish connections to the remote service. + Port int `json:"port"` + // tlsCredentials for client-to-listener + TlsCredentials string `json:"tlsCredentials,omitempty"` + // requireClientCert indicates that clients must present valid certificates + // to the listener to connect. + RequireClientCert bool `json:"requireClientCert,omitempty"` + + // settings is a map containing additional settings. + // + // **Note:** In general, we recommend not changing settings from + // their default values. + Settings map[string]string `json:"settings,omitempty"` + + // strategy for routing traffic from the local listener endpoint to one or + // more connector instances by routing key. + Strategy MultiKeyListenerStrategy `json:"strategy"` +} + +// MultiKeyListenerStrategy contains configuration for each strategy. Only one +// strategy can be specified at a time. +// +// Presently Priority Failover is the only strategy available. +// +// +kubebuilder:validation:ExactlyOneOf=priority +type MultiKeyListenerStrategy struct { + Priority *PriorityStrategySpec `json:"priority,omitempty"` +} + +// PriorityStrategySpec specifies an ordered set of routing keys to +// route traffic to. +// +// With this strategy 100% of traffic will be directed to the first routing key +// with a reachable connector. +type PriorityStrategySpec struct { + // +kubebuilder:validation:MinItems=1 + // +kubebuilder:validation:MaxItems=256 + // +listType=set + + // routingKeys to route traffic to in order of highest to lowest priority. + RoutingKeys []string `json:"routingKeys"` +} + +func (s *MultiKeyListenerStatus) SetCondition(conditionType string, state ConditionState, generation int64) bool { + condition := metav1.Condition{ + Type: conditionType, + ObservedGeneration: generation, + Status: state.Status, + Reason: string(state.Reason), + Message: state.Message, + } + return setStatusCondition(&s.Conditions, condition) +} + +func (s *MultiKeyListenerStatus) setReady(requiredConditions []string, generation int64) bool { + state := s.readyState(requiredConditions) + changed := false + if s.StatusType != state.Reason { + s.StatusType = state.Reason + changed = true + } + if s.Message != state.Message { + s.Message = state.Message + changed = true + } + return changed +} + +func (s *MultiKeyListenerStatus) readyState(requiredConditions []string) ConditionState { + for _, conditionType := range requiredConditions { + existing := meta.FindStatusCondition(s.Conditions, conditionType) + if existing == nil { + return PendingCondition("Not " + conditionType) + } else if existing.Status == metav1.ConditionFalse { + return ConditionState{ + Status: metav1.ConditionFalse, + Reason: StatusType(existing.Reason), + Message: existing.Message, + } + } + } + return ReadyCondition() +} + +func (m *MultiKeyListener) SetConfigured(err error) bool { + if m.Status.SetCondition(CONDITION_TYPE_CONFIGURED, ErrorOrReadyCondition(err), m.ObjectMeta.Generation) { + m.Status.setReady([]string{CONDITION_TYPE_CONFIGURED, CONDITION_TYPE_OPERATIONAL}, m.ObjectMeta.Generation) + return true + } + return false +} + +func (m *MultiKeyListener) operational() ConditionState { + if m.Status.HasDestination { + return ReadyCondition() + } + return PendingCondition("No matching connectors") +} + +func (m *MultiKeyListener) SetOperational() bool { + if m.Status.SetCondition(CONDITION_TYPE_OPERATIONAL, m.operational(), m.ObjectMeta.Generation) { + m.Status.setReady([]string{CONDITION_TYPE_CONFIGURED, CONDITION_TYPE_OPERATIONAL}, m.ObjectMeta.Generation) + return true + } + return false +} + +func (m *MultiKeyListener) SetHasDestination(value bool) bool { + if m.Status.HasDestination != value { + m.Status.HasDestination = value + m.SetOperational() + return true + } + return false +} diff --git a/pkg/apis/skupper/v2alpha1/register.go b/pkg/apis/skupper/v2alpha1/register.go index 117bed7e1..8dedf474f 100644 --- a/pkg/apis/skupper/v2alpha1/register.go +++ b/pkg/apis/skupper/v2alpha1/register.go @@ -26,7 +26,7 @@ var ( ) func addKnownTypes(scheme *runtime.Scheme) error { - scheme.AddKnownTypes(SchemeGroupVersion, &Site{}, &SiteList{}, &Listener{}, &ListenerList{}, &Connector{}, &ConnectorList{}, &Link{}, &LinkList{}, &AccessToken{}, &AccessTokenList{}, &AccessGrant{}, &AccessGrantList{}, &SecuredAccess{}, &SecuredAccessList{}, &Certificate{}, &CertificateList{}, &RouterAccess{}, &RouterAccessList{}, &AttachedConnector{}, &AttachedConnectorList{}, &AttachedConnectorBinding{}, &AttachedConnectorBindingList{}) + scheme.AddKnownTypes(SchemeGroupVersion, &Site{}, &SiteList{}, &Listener{}, &ListenerList{}, &Connector{}, &ConnectorList{}, &Link{}, &LinkList{}, &AccessToken{}, &AccessTokenList{}, &AccessGrant{}, &AccessGrantList{}, &SecuredAccess{}, &SecuredAccessList{}, &Certificate{}, &CertificateList{}, &RouterAccess{}, &RouterAccessList{}, &AttachedConnector{}, &AttachedConnectorList{}, &AttachedConnectorBinding{}, &AttachedConnectorBindingList{}, &MultiKeyListener{}, &MultiKeyListenerList{}) metav1.AddToGroupVersion(scheme, SchemeGroupVersion) return nil } diff --git a/pkg/apis/skupper/v2alpha1/zz_generated.deepcopy.go b/pkg/apis/skupper/v2alpha1/zz_generated.deepcopy.go index 2ad41f3be..293b3d0c0 100644 --- a/pkg/apis/skupper/v2alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/skupper/v2alpha1/zz_generated.deepcopy.go @@ -918,6 +918,140 @@ func (in *ListenerStatus) DeepCopy() *ListenerStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *MultiKeyListener) DeepCopyInto(out *MultiKeyListener) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MultiKeyListener. +func (in *MultiKeyListener) DeepCopy() *MultiKeyListener { + if in == nil { + return nil + } + out := new(MultiKeyListener) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *MultiKeyListener) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *MultiKeyListenerList) DeepCopyInto(out *MultiKeyListenerList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]MultiKeyListener, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MultiKeyListenerList. +func (in *MultiKeyListenerList) DeepCopy() *MultiKeyListenerList { + if in == nil { + return nil + } + out := new(MultiKeyListenerList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *MultiKeyListenerList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *MultiKeyListenerSpec) DeepCopyInto(out *MultiKeyListenerSpec) { + *out = *in + if in.Settings != nil { + in, out := &in.Settings, &out.Settings + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + in.Strategy.DeepCopyInto(&out.Strategy) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MultiKeyListenerSpec. +func (in *MultiKeyListenerSpec) DeepCopy() *MultiKeyListenerSpec { + if in == nil { + return nil + } + out := new(MultiKeyListenerSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *MultiKeyListenerStatus) DeepCopyInto(out *MultiKeyListenerStatus) { + *out = *in + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]v1.Condition, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + if in.Strategy != nil { + in, out := &in.Strategy, &out.Strategy + *out = new(StrategyStatus) + (*in).DeepCopyInto(*out) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MultiKeyListenerStatus. +func (in *MultiKeyListenerStatus) DeepCopy() *MultiKeyListenerStatus { + if in == nil { + return nil + } + out := new(MultiKeyListenerStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *MultiKeyListenerStrategy) DeepCopyInto(out *MultiKeyListenerStrategy) { + *out = *in + if in.Priority != nil { + in, out := &in.Priority, &out.Priority + *out = new(PriorityStrategySpec) + (*in).DeepCopyInto(*out) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MultiKeyListenerStrategy. +func (in *MultiKeyListenerStrategy) DeepCopy() *MultiKeyListenerStrategy { + if in == nil { + return nil + } + out := new(MultiKeyListenerStrategy) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PodDetails) DeepCopyInto(out *PodDetails) { *out = *in @@ -934,6 +1068,48 @@ func (in *PodDetails) DeepCopy() *PodDetails { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PriorityStrategySpec) DeepCopyInto(out *PriorityStrategySpec) { + *out = *in + if in.RoutingKeys != nil { + in, out := &in.RoutingKeys, &out.RoutingKeys + *out = make([]string, len(*in)) + copy(*out, *in) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PriorityStrategySpec. +func (in *PriorityStrategySpec) DeepCopy() *PriorityStrategySpec { + if in == nil { + return nil + } + out := new(PriorityStrategySpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PriorityStrategyStatus) DeepCopyInto(out *PriorityStrategyStatus) { + *out = *in + if in.RoutingKeysReachable != nil { + in, out := &in.RoutingKeysReachable, &out.RoutingKeysReachable + *out = make([]string, len(*in)) + copy(*out, *in) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PriorityStrategyStatus. +func (in *PriorityStrategyStatus) DeepCopy() *PriorityStrategyStatus { + if in == nil { + return nil + } + out := new(PriorityStrategyStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *RouterAccess) DeepCopyInto(out *RouterAccess) { *out = *in @@ -1410,3 +1586,24 @@ func (in *Status) DeepCopy() *Status { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *StrategyStatus) DeepCopyInto(out *StrategyStatus) { + *out = *in + if in.Priority != nil { + in, out := &in.Priority, &out.Priority + *out = new(PriorityStrategyStatus) + (*in).DeepCopyInto(*out) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StrategyStatus. +func (in *StrategyStatus) DeepCopy() *StrategyStatus { + if in == nil { + return nil + } + out := new(StrategyStatus) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/generated/client/clientset/versioned/typed/skupper/v2alpha1/fake/fake_multikeylistener.go b/pkg/generated/client/clientset/versioned/typed/skupper/v2alpha1/fake/fake_multikeylistener.go new file mode 100644 index 000000000..7af78dfb0 --- /dev/null +++ b/pkg/generated/client/clientset/versioned/typed/skupper/v2alpha1/fake/fake_multikeylistener.go @@ -0,0 +1,52 @@ +/* +Copyright 2021 The Skupper Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + v2alpha1 "github.com/skupperproject/skupper/pkg/apis/skupper/v2alpha1" + skupperv2alpha1 "github.com/skupperproject/skupper/pkg/generated/client/clientset/versioned/typed/skupper/v2alpha1" + gentype "k8s.io/client-go/gentype" +) + +// fakeMultiKeyListeners implements MultiKeyListenerInterface +type fakeMultiKeyListeners struct { + *gentype.FakeClientWithList[*v2alpha1.MultiKeyListener, *v2alpha1.MultiKeyListenerList] + Fake *FakeSkupperV2alpha1 +} + +func newFakeMultiKeyListeners(fake *FakeSkupperV2alpha1, namespace string) skupperv2alpha1.MultiKeyListenerInterface { + return &fakeMultiKeyListeners{ + gentype.NewFakeClientWithList[*v2alpha1.MultiKeyListener, *v2alpha1.MultiKeyListenerList]( + fake.Fake, + namespace, + v2alpha1.SchemeGroupVersion.WithResource("multikeylisteners"), + v2alpha1.SchemeGroupVersion.WithKind("MultiKeyListener"), + func() *v2alpha1.MultiKeyListener { return &v2alpha1.MultiKeyListener{} }, + func() *v2alpha1.MultiKeyListenerList { return &v2alpha1.MultiKeyListenerList{} }, + func(dst, src *v2alpha1.MultiKeyListenerList) { dst.ListMeta = src.ListMeta }, + func(list *v2alpha1.MultiKeyListenerList) []*v2alpha1.MultiKeyListener { + return gentype.ToPointerSlice(list.Items) + }, + func(list *v2alpha1.MultiKeyListenerList, items []*v2alpha1.MultiKeyListener) { + list.Items = gentype.FromPointerSlice(items) + }, + ), + fake, + } +} diff --git a/pkg/generated/client/clientset/versioned/typed/skupper/v2alpha1/fake/fake_skupper_client.go b/pkg/generated/client/clientset/versioned/typed/skupper/v2alpha1/fake/fake_skupper_client.go index 0df119160..04a797f85 100644 --- a/pkg/generated/client/clientset/versioned/typed/skupper/v2alpha1/fake/fake_skupper_client.go +++ b/pkg/generated/client/clientset/versioned/typed/skupper/v2alpha1/fake/fake_skupper_client.go @@ -60,6 +60,10 @@ func (c *FakeSkupperV2alpha1) Listeners(namespace string) v2alpha1.ListenerInter return newFakeListeners(c, namespace) } +func (c *FakeSkupperV2alpha1) MultiKeyListeners(namespace string) v2alpha1.MultiKeyListenerInterface { + return newFakeMultiKeyListeners(c, namespace) +} + func (c *FakeSkupperV2alpha1) RouterAccesses(namespace string) v2alpha1.RouterAccessInterface { return newFakeRouterAccesses(c, namespace) } diff --git a/pkg/generated/client/clientset/versioned/typed/skupper/v2alpha1/generated_expansion.go b/pkg/generated/client/clientset/versioned/typed/skupper/v2alpha1/generated_expansion.go index 471aea785..4c8eb86b2 100644 --- a/pkg/generated/client/clientset/versioned/typed/skupper/v2alpha1/generated_expansion.go +++ b/pkg/generated/client/clientset/versioned/typed/skupper/v2alpha1/generated_expansion.go @@ -34,6 +34,8 @@ type LinkExpansion interface{} type ListenerExpansion interface{} +type MultiKeyListenerExpansion interface{} + type RouterAccessExpansion interface{} type SecuredAccessExpansion interface{} diff --git a/pkg/generated/client/clientset/versioned/typed/skupper/v2alpha1/multikeylistener.go b/pkg/generated/client/clientset/versioned/typed/skupper/v2alpha1/multikeylistener.go new file mode 100644 index 000000000..f7f2c3659 --- /dev/null +++ b/pkg/generated/client/clientset/versioned/typed/skupper/v2alpha1/multikeylistener.go @@ -0,0 +1,70 @@ +/* +Copyright 2021 The Skupper Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package v2alpha1 + +import ( + context "context" + + skupperv2alpha1 "github.com/skupperproject/skupper/pkg/apis/skupper/v2alpha1" + scheme "github.com/skupperproject/skupper/pkg/generated/client/clientset/versioned/scheme" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + gentype "k8s.io/client-go/gentype" +) + +// MultiKeyListenersGetter has a method to return a MultiKeyListenerInterface. +// A group's client should implement this interface. +type MultiKeyListenersGetter interface { + MultiKeyListeners(namespace string) MultiKeyListenerInterface +} + +// MultiKeyListenerInterface has methods to work with MultiKeyListener resources. +type MultiKeyListenerInterface interface { + Create(ctx context.Context, multiKeyListener *skupperv2alpha1.MultiKeyListener, opts v1.CreateOptions) (*skupperv2alpha1.MultiKeyListener, error) + Update(ctx context.Context, multiKeyListener *skupperv2alpha1.MultiKeyListener, opts v1.UpdateOptions) (*skupperv2alpha1.MultiKeyListener, error) + // Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). + UpdateStatus(ctx context.Context, multiKeyListener *skupperv2alpha1.MultiKeyListener, opts v1.UpdateOptions) (*skupperv2alpha1.MultiKeyListener, error) + Delete(ctx context.Context, name string, opts v1.DeleteOptions) error + DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error + Get(ctx context.Context, name string, opts v1.GetOptions) (*skupperv2alpha1.MultiKeyListener, error) + List(ctx context.Context, opts v1.ListOptions) (*skupperv2alpha1.MultiKeyListenerList, error) + Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) + Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *skupperv2alpha1.MultiKeyListener, err error) + MultiKeyListenerExpansion +} + +// multiKeyListeners implements MultiKeyListenerInterface +type multiKeyListeners struct { + *gentype.ClientWithList[*skupperv2alpha1.MultiKeyListener, *skupperv2alpha1.MultiKeyListenerList] +} + +// newMultiKeyListeners returns a MultiKeyListeners +func newMultiKeyListeners(c *SkupperV2alpha1Client, namespace string) *multiKeyListeners { + return &multiKeyListeners{ + gentype.NewClientWithList[*skupperv2alpha1.MultiKeyListener, *skupperv2alpha1.MultiKeyListenerList]( + "multikeylisteners", + c.RESTClient(), + scheme.ParameterCodec, + namespace, + func() *skupperv2alpha1.MultiKeyListener { return &skupperv2alpha1.MultiKeyListener{} }, + func() *skupperv2alpha1.MultiKeyListenerList { return &skupperv2alpha1.MultiKeyListenerList{} }, + ), + } +} diff --git a/pkg/generated/client/clientset/versioned/typed/skupper/v2alpha1/skupper_client.go b/pkg/generated/client/clientset/versioned/typed/skupper/v2alpha1/skupper_client.go index 8882a207f..d22308175 100644 --- a/pkg/generated/client/clientset/versioned/typed/skupper/v2alpha1/skupper_client.go +++ b/pkg/generated/client/clientset/versioned/typed/skupper/v2alpha1/skupper_client.go @@ -36,6 +36,7 @@ type SkupperV2alpha1Interface interface { ConnectorsGetter LinksGetter ListenersGetter + MultiKeyListenersGetter RouterAccessesGetter SecuredAccessesGetter SitesGetter @@ -78,6 +79,10 @@ func (c *SkupperV2alpha1Client) Listeners(namespace string) ListenerInterface { return newListeners(c, namespace) } +func (c *SkupperV2alpha1Client) MultiKeyListeners(namespace string) MultiKeyListenerInterface { + return newMultiKeyListeners(c, namespace) +} + func (c *SkupperV2alpha1Client) RouterAccesses(namespace string) RouterAccessInterface { return newRouterAccesses(c, namespace) } diff --git a/pkg/generated/client/informers/externalversions/generic.go b/pkg/generated/client/informers/externalversions/generic.go index 67057c78d..455abaf1d 100644 --- a/pkg/generated/client/informers/externalversions/generic.go +++ b/pkg/generated/client/informers/externalversions/generic.go @@ -69,6 +69,8 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource return &genericInformer{resource: resource.GroupResource(), informer: f.Skupper().V2alpha1().Links().Informer()}, nil case v2alpha1.SchemeGroupVersion.WithResource("listeners"): return &genericInformer{resource: resource.GroupResource(), informer: f.Skupper().V2alpha1().Listeners().Informer()}, nil + case v2alpha1.SchemeGroupVersion.WithResource("multikeylisteners"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Skupper().V2alpha1().MultiKeyListeners().Informer()}, nil case v2alpha1.SchemeGroupVersion.WithResource("routeraccesses"): return &genericInformer{resource: resource.GroupResource(), informer: f.Skupper().V2alpha1().RouterAccesses().Informer()}, nil case v2alpha1.SchemeGroupVersion.WithResource("securedaccesses"): diff --git a/pkg/generated/client/informers/externalversions/skupper/v2alpha1/interface.go b/pkg/generated/client/informers/externalversions/skupper/v2alpha1/interface.go index 1bcbedbd6..80f43e4eb 100644 --- a/pkg/generated/client/informers/externalversions/skupper/v2alpha1/interface.go +++ b/pkg/generated/client/informers/externalversions/skupper/v2alpha1/interface.go @@ -40,6 +40,8 @@ type Interface interface { Links() LinkInformer // Listeners returns a ListenerInformer. Listeners() ListenerInformer + // MultiKeyListeners returns a MultiKeyListenerInformer. + MultiKeyListeners() MultiKeyListenerInformer // RouterAccesses returns a RouterAccessInformer. RouterAccesses() RouterAccessInformer // SecuredAccesses returns a SecuredAccessInformer. @@ -99,6 +101,11 @@ func (v *version) Listeners() ListenerInformer { return &listenerInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} } +// MultiKeyListeners returns a MultiKeyListenerInformer. +func (v *version) MultiKeyListeners() MultiKeyListenerInformer { + return &multiKeyListenerInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} +} + // RouterAccesses returns a RouterAccessInformer. func (v *version) RouterAccesses() RouterAccessInformer { return &routerAccessInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} diff --git a/pkg/generated/client/informers/externalversions/skupper/v2alpha1/multikeylistener.go b/pkg/generated/client/informers/externalversions/skupper/v2alpha1/multikeylistener.go new file mode 100644 index 000000000..fc7bad753 --- /dev/null +++ b/pkg/generated/client/informers/externalversions/skupper/v2alpha1/multikeylistener.go @@ -0,0 +1,102 @@ +/* +Copyright 2021 The Skupper Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package v2alpha1 + +import ( + context "context" + time "time" + + apisskupperv2alpha1 "github.com/skupperproject/skupper/pkg/apis/skupper/v2alpha1" + versioned "github.com/skupperproject/skupper/pkg/generated/client/clientset/versioned" + internalinterfaces "github.com/skupperproject/skupper/pkg/generated/client/informers/externalversions/internalinterfaces" + skupperv2alpha1 "github.com/skupperproject/skupper/pkg/generated/client/listers/skupper/v2alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + cache "k8s.io/client-go/tools/cache" +) + +// MultiKeyListenerInformer provides access to a shared informer and lister for +// MultiKeyListeners. +type MultiKeyListenerInformer interface { + Informer() cache.SharedIndexInformer + Lister() skupperv2alpha1.MultiKeyListenerLister +} + +type multiKeyListenerInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc + namespace string +} + +// NewMultiKeyListenerInformer constructs a new informer for MultiKeyListener type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewMultiKeyListenerInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredMultiKeyListenerInformer(client, namespace, resyncPeriod, indexers, nil) +} + +// NewFilteredMultiKeyListenerInformer constructs a new informer for MultiKeyListener type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFilteredMultiKeyListenerInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options v1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.SkupperV2alpha1().MultiKeyListeners(namespace).List(context.Background(), options) + }, + WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.SkupperV2alpha1().MultiKeyListeners(namespace).Watch(context.Background(), options) + }, + ListWithContextFunc: func(ctx context.Context, options v1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.SkupperV2alpha1().MultiKeyListeners(namespace).List(ctx, options) + }, + WatchFuncWithContext: func(ctx context.Context, options v1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.SkupperV2alpha1().MultiKeyListeners(namespace).Watch(ctx, options) + }, + }, + &apisskupperv2alpha1.MultiKeyListener{}, + resyncPeriod, + indexers, + ) +} + +func (f *multiKeyListenerInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredMultiKeyListenerInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *multiKeyListenerInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&apisskupperv2alpha1.MultiKeyListener{}, f.defaultInformer) +} + +func (f *multiKeyListenerInformer) Lister() skupperv2alpha1.MultiKeyListenerLister { + return skupperv2alpha1.NewMultiKeyListenerLister(f.Informer().GetIndexer()) +} diff --git a/pkg/generated/client/listers/skupper/v2alpha1/expansion_generated.go b/pkg/generated/client/listers/skupper/v2alpha1/expansion_generated.go index ea3038b50..14c09efd5 100644 --- a/pkg/generated/client/listers/skupper/v2alpha1/expansion_generated.go +++ b/pkg/generated/client/listers/skupper/v2alpha1/expansion_generated.go @@ -82,6 +82,14 @@ type ListenerListerExpansion interface{} // ListenerNamespaceLister. type ListenerNamespaceListerExpansion interface{} +// MultiKeyListenerListerExpansion allows custom methods to be added to +// MultiKeyListenerLister. +type MultiKeyListenerListerExpansion interface{} + +// MultiKeyListenerNamespaceListerExpansion allows custom methods to be added to +// MultiKeyListenerNamespaceLister. +type MultiKeyListenerNamespaceListerExpansion interface{} + // RouterAccessListerExpansion allows custom methods to be added to // RouterAccessLister. type RouterAccessListerExpansion interface{} diff --git a/pkg/generated/client/listers/skupper/v2alpha1/multikeylistener.go b/pkg/generated/client/listers/skupper/v2alpha1/multikeylistener.go new file mode 100644 index 000000000..b8e086f14 --- /dev/null +++ b/pkg/generated/client/listers/skupper/v2alpha1/multikeylistener.go @@ -0,0 +1,70 @@ +/* +Copyright 2021 The Skupper Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by lister-gen. DO NOT EDIT. + +package v2alpha1 + +import ( + skupperv2alpha1 "github.com/skupperproject/skupper/pkg/apis/skupper/v2alpha1" + labels "k8s.io/apimachinery/pkg/labels" + listers "k8s.io/client-go/listers" + cache "k8s.io/client-go/tools/cache" +) + +// MultiKeyListenerLister helps list MultiKeyListeners. +// All objects returned here must be treated as read-only. +type MultiKeyListenerLister interface { + // List lists all MultiKeyListeners in the indexer. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*skupperv2alpha1.MultiKeyListener, err error) + // MultiKeyListeners returns an object that can list and get MultiKeyListeners. + MultiKeyListeners(namespace string) MultiKeyListenerNamespaceLister + MultiKeyListenerListerExpansion +} + +// multiKeyListenerLister implements the MultiKeyListenerLister interface. +type multiKeyListenerLister struct { + listers.ResourceIndexer[*skupperv2alpha1.MultiKeyListener] +} + +// NewMultiKeyListenerLister returns a new MultiKeyListenerLister. +func NewMultiKeyListenerLister(indexer cache.Indexer) MultiKeyListenerLister { + return &multiKeyListenerLister{listers.New[*skupperv2alpha1.MultiKeyListener](indexer, skupperv2alpha1.Resource("multikeylistener"))} +} + +// MultiKeyListeners returns an object that can list and get MultiKeyListeners. +func (s *multiKeyListenerLister) MultiKeyListeners(namespace string) MultiKeyListenerNamespaceLister { + return multiKeyListenerNamespaceLister{listers.NewNamespaced[*skupperv2alpha1.MultiKeyListener](s.ResourceIndexer, namespace)} +} + +// MultiKeyListenerNamespaceLister helps list and get MultiKeyListeners. +// All objects returned here must be treated as read-only. +type MultiKeyListenerNamespaceLister interface { + // List lists all MultiKeyListeners in the indexer for a given namespace. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*skupperv2alpha1.MultiKeyListener, err error) + // Get retrieves the MultiKeyListener from the indexer for a given namespace and name. + // Objects returned here must be treated as read-only. + Get(name string) (*skupperv2alpha1.MultiKeyListener, error) + MultiKeyListenerNamespaceListerExpansion +} + +// multiKeyListenerNamespaceLister implements the MultiKeyListenerNamespaceLister +// interface. +type multiKeyListenerNamespaceLister struct { + listers.ResourceIndexer[*skupperv2alpha1.MultiKeyListener] +} From 0251d9c87d5fd077045181bfbdbc122ab6218a28 Mon Sep 17 00:00:00 2001 From: Christian Kruse Date: Wed, 4 Feb 2026 11:01:09 -0800 Subject: [PATCH 02/13] Extend router bridge config for MultiKeyListener support Add ListenerAddress entity and multiAddressStrategy field to support priority-based routing. - Add ListenerAddress type for mapping priority values to routing keys - Add ListenerAddressMap to BridgeConfig alongside TcpListeners/TcpConnectors - Extend TcpEndpoint with MultiAddressStrategy field - Update AMQP management to query/create/delete listenerAddress entities - Update config marshaling/unmarshaling for listenerAddress entities - Add ListenerAddressDifference for config synchronization Signed-off-by: Christian Kruse --- internal/qdr/amqp_mgmt.go | 66 +++++++++++++++--- internal/qdr/qdr.go | 142 ++++++++++++++++++++++++++++++++------ internal/qdr/qdr_test.go | 1 + 3 files changed, 179 insertions(+), 30 deletions(-) diff --git a/internal/qdr/amqp_mgmt.go b/internal/qdr/amqp_mgmt.go index bcccbdca4..d53a151e7 100644 --- a/internal/qdr/amqp_mgmt.go +++ b/internal/qdr/amqp_mgmt.go @@ -6,11 +6,12 @@ import ( "fmt" "log/slog" "os" - "slices" "strconv" "strings" "time" + "slices" + amqp "github.com/interconnectedcloud/go-amqp" "github.com/skupperproject/skupper/api/types" "github.com/skupperproject/skupper/internal/config" @@ -125,14 +126,15 @@ func (r Record) AsRecord(field string) Record { func asTcpEndpoint(record Record) TcpEndpoint { endpoint := TcpEndpoint{ - Name: record.AsString("name"), - Host: record.AsString("host"), - Port: record.AsString("port"), - Address: record.AsString("address"), - SiteId: record.AsString("siteId"), - SslProfile: record.AsString("sslProfile"), - Observer: record.AsString("observer"), - ProcessID: record.AsString("processId"), + Name: record.AsString("name"), + Host: record.AsString("host"), + Port: record.AsString("port"), + Address: record.AsString("address"), + SiteId: record.AsString("siteId"), + SslProfile: record.AsString("sslProfile"), + Observer: record.AsString("observer"), + ProcessID: record.AsString("processId"), + MultiAddressStrategy: record.AsString("multiAddressStrategy"), } if value, ok := record["verifyHostname"]; ok { if verify, ok := value.(bool); ok { @@ -142,6 +144,15 @@ func asTcpEndpoint(record Record) TcpEndpoint { return endpoint } +func asListenerAddress(record Record) ListenerAddress { + return ListenerAddress{ + Name: record.AsString("name"), + Address: record.AsString("address"), + Value: record.AsInt("value"), + ListenerRef: record.AsString("listenerRef"), + } +} + func asConnection(record Record) Connection { return Connection{ Role: record.AsString("role"), @@ -900,10 +911,38 @@ func (a *Agent) GetLocalBridgeConfig() (*BridgeConfig, error) { config.AddTcpListener(asTcpEndpoint(record)) } + results, err = a.Query("io.skupper.router.listenerAddress", []string{}) + if err != nil { + return nil, err + } + for _, record := range results { + config.AddListenerAddress(asListenerAddress(record)) + } + return &config, nil } +func (a *Agent) GetLocalListenerAddresses() (map[string]ListenerAddress, error) { + results, err := a.Query("io.skupper.router.listenerAddress", []string{}) + if err != nil { + return nil, err + } + addresses := map[string]ListenerAddress{} + for _, record := range results { + la := asListenerAddress(record) + addresses[la.Name] = la + } + return addresses, nil +} + func (a *Agent) UpdateLocalBridgeConfig(changes *BridgeConfigDifference) error { + // Delete listenerAddresses before their parent tcpListeners + for _, deleted := range changes.ListenerAddresses.Deleted { + if err := a.Delete("io.skupper.router.listenerAddress", deleted); err != nil { + return fmt.Errorf("Error deleting listener addresses: %s", err) + } + } + for _, deleted := range changes.TcpConnectors.Deleted { if err := a.Delete("io.skupper.router.tcpConnector", deleted); err != nil { return fmt.Errorf("Error deleting tcp connectors: %s", err) @@ -914,6 +953,7 @@ func (a *Agent) UpdateLocalBridgeConfig(changes *BridgeConfigDifference) error { return fmt.Errorf("Error deleting tcp listeners: %s", err) } } + for _, added := range changes.TcpConnectors.Added { if err := a.Create("io.skupper.router.tcpConnector", added.Name, added); err != nil { return fmt.Errorf("Error adding tcp connectors: %s", err) @@ -924,6 +964,14 @@ func (a *Agent) UpdateLocalBridgeConfig(changes *BridgeConfigDifference) error { return fmt.Errorf("Error adding tcp listeners: %s", err) } } + + // Add listenerAddresses after their parent tcpListeners + for _, added := range changes.ListenerAddresses.Added { + if err := a.Create("io.skupper.router.listenerAddress", added.Name, added); err != nil { + return fmt.Errorf("Error adding listener addresses: %s", err) + } + } + return nil } diff --git a/internal/qdr/qdr.go b/internal/qdr/qdr.go index dedfb3f3c..013cb270e 100644 --- a/internal/qdr/qdr.go +++ b/internal/qdr/qdr.go @@ -32,10 +32,12 @@ type RouterConfigHandler interface { } type TcpEndpointMap map[string]TcpEndpoint +type ListenerAddressMap map[string]ListenerAddress type BridgeConfig struct { - TcpListeners TcpEndpointMap - TcpConnectors TcpEndpointMap + TcpListeners TcpEndpointMap + TcpConnectors TcpEndpointMap + ListenerAddresses ListenerAddressMap } func InitialConfig(id string, siteId string, version string, edge bool, helloAge int) RouterConfig { @@ -51,8 +53,9 @@ func InitialConfig(id string, siteId string, version string, edge bool, helloAge Connectors: map[string]Connector{}, LogConfig: map[string]LogConfig{}, Bridges: BridgeConfig{ - TcpListeners: map[string]TcpEndpoint{}, - TcpConnectors: map[string]TcpEndpoint{}, + TcpListeners: map[string]TcpEndpoint{}, + TcpConnectors: map[string]TcpEndpoint{}, + ListenerAddresses: map[string]ListenerAddress{}, }, } if edge { @@ -77,8 +80,9 @@ func (r *RouterConfig) AddHealthAndMetricsListener(port int32) { func NewBridgeConfig() BridgeConfig { return BridgeConfig{ - TcpListeners: map[string]TcpEndpoint{}, - TcpConnectors: map[string]TcpEndpoint{}, + TcpListeners: map[string]TcpEndpoint{}, + TcpConnectors: map[string]TcpEndpoint{}, + ListenerAddresses: map[string]ListenerAddress{}, } } @@ -90,6 +94,9 @@ func NewBridgeConfigCopy(src BridgeConfig) BridgeConfig { for k, v := range src.TcpConnectors { newBridges.TcpConnectors[k] = v } + for k, v := range src.ListenerAddresses { + newBridges.ListenerAddresses[k] = v + } return newBridges } @@ -280,6 +287,26 @@ func (bc *BridgeConfig) RemoveTcpListener(name string) (bool, TcpEndpoint) { } } +func (bc *BridgeConfig) AddListenerAddress(la ListenerAddress) bool { + var updated = true + if existing, ok := bc.ListenerAddresses[la.Name]; ok { + if la == existing { + updated = false + } + } + bc.ListenerAddresses[la.Name] = la + return updated +} + +func (bc *BridgeConfig) RemoveListenerAddress(name string) (bool, ListenerAddress) { + la, ok := bc.ListenerAddresses[name] + if !ok { + return false, ListenerAddress{} + } + delete(bc.ListenerAddresses, name) + return true, la +} + func GetTcpConnectors(bridges []BridgeConfig) []TcpEndpoint { connectors := []TcpEndpoint{} for _, bridge := range bridges { @@ -553,15 +580,38 @@ type Address struct { } type TcpEndpoint struct { - Name string `json:"name,omitempty"` - Host string `json:"host,omitempty"` - Port string `json:"port,omitempty"` - Address string `json:"address,omitempty"` - SiteId string `json:"siteId,omitempty"` - SslProfile string `json:"sslProfile,omitempty"` - Observer string `json:"observer,omitempty"` - VerifyHostname *bool `json:"verifyHostname,omitempty"` - ProcessID string `json:"processId,omitempty"` + Name string `json:"name,omitempty"` + Host string `json:"host,omitempty"` + Port string `json:"port,omitempty"` + Address string `json:"address,omitempty"` + SiteId string `json:"siteId,omitempty"` + SslProfile string `json:"sslProfile,omitempty"` + Observer string `json:"observer,omitempty"` + VerifyHostname *bool `json:"verifyHostname,omitempty"` + ProcessID string `json:"processId,omitempty"` + MultiAddressStrategy string `json:"multiAddressStrategy,omitempty"` +} + +type ListenerAddress struct { + Name string `json:"name,omitempty"` + Address string `json:"address,omitempty"` + Value int `json:"value"` + ListenerRef string `json:"listenerRef,omitempty"` +} + +func (e ListenerAddress) toRecord() Record { + result := make(map[string]any) + if e.Name != "" { + result["name"] = e.Name + } + if e.Address != "" { + result["address"] = e.Address + } + result["value"] = e.Value + if e.ListenerRef != "" { + result["listenerRef"] = e.ListenerRef + } + return result } func (e TcpEndpoint) toRecord() Record { @@ -593,6 +643,9 @@ func (e TcpEndpoint) toRecord() Record { if e.ProcessID != "" { result["processId"] = e.ProcessID } + if e.MultiAddressStrategy != "" { + result["multiAddressStrategy"] = e.MultiAddressStrategy + } return result } @@ -638,8 +691,9 @@ func UnmarshalRouterConfig(config string) (RouterConfig, error) { Connectors: map[string]Connector{}, LogConfig: map[string]LogConfig{}, Bridges: BridgeConfig{ - TcpListeners: map[string]TcpEndpoint{}, - TcpConnectors: map[string]TcpEndpoint{}, + TcpListeners: map[string]TcpEndpoint{}, + TcpConnectors: map[string]TcpEndpoint{}, + ListenerAddresses: map[string]ListenerAddress{}, }, } var obj interface{} @@ -724,6 +778,13 @@ func UnmarshalRouterConfig(config string) (RouterConfig, error) { return result, fmt.Errorf("Invalid %s element got %#v", entityType, element[1]) } result.Bridges.TcpListeners[listener.Name] = listener + case "listenerAddress": + la := ListenerAddress{} + err = convert(element[1], &la) + if err != nil { + return result, fmt.Errorf("Invalid %s element got %#v", entityType, element[1]) + } + result.Bridges.ListenerAddresses[la.Name] = la default: } } @@ -779,6 +840,13 @@ func MarshalRouterConfig(config RouterConfig) (string, error) { } elements = append(elements, tuple) } + for _, e := range config.Bridges.ListenerAddresses { + tuple := []interface{}{ + "listenerAddress", + e, + } + elements = append(elements, tuple) + } for _, e := range config.LogConfig { tuple := []interface{}{ "log", @@ -917,9 +985,15 @@ type TcpEndpointDifference struct { Added []TcpEndpoint } +type ListenerAddressDifference struct { + Deleted []string + Added []ListenerAddress +} + type BridgeConfigDifference struct { TcpListeners TcpEndpointDifference TcpConnectors TcpEndpointDifference + ListenerAddresses ListenerAddressDifference AddedSslProfiles []string DeletedSSlProfiles []string logger *slog.Logger @@ -989,11 +1063,32 @@ func (a TcpEndpointMap) Difference(b TcpEndpointMap) TcpEndpointDifference { return result } +func (a ListenerAddressMap) Difference(b ListenerAddressMap) ListenerAddressDifference { + result := ListenerAddressDifference{} + for key, v1 := range b { + v2, ok := a[key] + if !ok { + result.Added = append(result.Added, v1) + } else if v1 != v2 { + result.Deleted = append(result.Deleted, v1.Name) + result.Added = append(result.Added, v1) + } + } + for key, v1 := range a { + _, ok := b[key] + if !ok { + result.Deleted = append(result.Deleted, v1.Name) + } + } + return result +} + func (a *BridgeConfig) Difference(b *BridgeConfig) *BridgeConfigDifference { result := BridgeConfigDifference{ - logger: slog.New(slog.Default().Handler()).With("component", "qdr.bridgeConfigDifference"), - TcpConnectors: a.TcpConnectors.Difference(b.TcpConnectors), - TcpListeners: a.TcpListeners.Difference(b.TcpListeners), + logger: slog.New(slog.Default().Handler()).With("component", "qdr.bridgeConfigDifference"), + TcpConnectors: a.TcpConnectors.Difference(b.TcpConnectors), + TcpListeners: a.TcpListeners.Difference(b.TcpListeners), + ListenerAddresses: a.ListenerAddresses.Difference(b.ListenerAddresses), } result.AddedSslProfiles, result.DeletedSSlProfiles = getSslProfilesDifference(a, b) @@ -1054,13 +1149,18 @@ func (a *TcpEndpointDifference) Empty() bool { return len(a.Deleted) == 0 && len(a.Added) == 0 } +func (a *ListenerAddressDifference) Empty() bool { + return len(a.Deleted) == 0 && len(a.Added) == 0 +} + func (a *BridgeConfigDifference) Empty() bool { - return a.TcpConnectors.Empty() && a.TcpListeners.Empty() + return a.TcpConnectors.Empty() && a.TcpListeners.Empty() && a.ListenerAddresses.Empty() } func (a *BridgeConfigDifference) Print() { a.logger.Info("TcpConnectors", slog.Any("added", a.TcpConnectors.Added), slog.Any("deleted", a.TcpConnectors.Deleted)) a.logger.Info("TcpListeners", slog.Any("added", a.TcpListeners.Added), slog.Any("deleted", a.TcpListeners.Deleted)) + a.logger.Info("ListenerAddresses", slog.Any("added", a.ListenerAddresses.Added), slog.Any("deleted", a.ListenerAddresses.Deleted)) a.logger.Info("SslProfiles", slog.Any("added", a.AddedSslProfiles), slog.Any("deleted", a.DeletedSSlProfiles)) } diff --git a/internal/qdr/qdr_test.go b/internal/qdr/qdr_test.go index bc4de35b9..268f26f0f 100644 --- a/internal/qdr/qdr_test.go +++ b/internal/qdr/qdr_test.go @@ -252,6 +252,7 @@ func TestMarshalUnmarshalRouterConfig(t *testing.T) { SiteId: "def", }, }, + ListenerAddresses: ListenerAddressMap{}, }, Addresses: map[string]Address{ "happy": Address{ From d0d44bda998a09578250e05404f195cef1bdaab0 Mon Sep 17 00:00:00 2001 From: Christian Kruse Date: Wed, 4 Feb 2026 11:02:03 -0800 Subject: [PATCH 03/13] Implement site bindings for MultiKeyListener Add core MultiKeyListener support to the site bindings package Signed-off-by: Christian Kruse --- internal/site/bindings.go | 76 +++++++++++++++++++++++++------ internal/site/multikeylistener.go | 74 ++++++++++++++++++++++++++++++ 2 files changed, 137 insertions(+), 13 deletions(-) create mode 100644 internal/site/multikeylistener.go diff --git a/internal/site/bindings.go b/internal/site/bindings.go index ab62171b6..f627d6afb 100644 --- a/internal/site/bindings.go +++ b/internal/site/bindings.go @@ -9,6 +9,7 @@ import ( type ListenerConfiguration func(siteId string, listener *skupperv2alpha1.Listener, config *qdr.BridgeConfig) type ConnectorConfiguration func(siteId string, connector *skupperv2alpha1.Connector, config *qdr.BridgeConfig) +type MultiKeyListenerConfiguration func(siteId string, mkl *skupperv2alpha1.MultiKeyListener, config *qdr.BridgeConfig) type BindingEventHandler interface { ListenerUpdated(listener *skupperv2alpha1.Listener) @@ -21,25 +22,29 @@ type ConnectorFunction func(*skupperv2alpha1.Connector) *skupperv2alpha1.Connect type ListenerFunction func(*skupperv2alpha1.Listener) *skupperv2alpha1.Listener type Bindings struct { - SiteId string - ProfilePath string - connectors map[string]*skupperv2alpha1.Connector - listeners map[string]*skupperv2alpha1.Listener - handler BindingEventHandler - configure struct { - listener ListenerConfiguration - connector ConnectorConfiguration + SiteId string + ProfilePath string + connectors map[string]*skupperv2alpha1.Connector + listeners map[string]*skupperv2alpha1.Listener + multiKeyListeners map[string]*skupperv2alpha1.MultiKeyListener + handler BindingEventHandler + configure struct { + listener ListenerConfiguration + connector ConnectorConfiguration + multiKeyListener MultiKeyListenerConfiguration } } func NewBindings(profilePath string) *Bindings { bindings := &Bindings{ - ProfilePath: profilePath, - connectors: map[string]*skupperv2alpha1.Connector{}, - listeners: map[string]*skupperv2alpha1.Listener{}, + ProfilePath: profilePath, + connectors: map[string]*skupperv2alpha1.Connector{}, + listeners: map[string]*skupperv2alpha1.Listener{}, + multiKeyListeners: map[string]*skupperv2alpha1.MultiKeyListener{}, } bindings.configure.listener = UpdateBridgeConfigForListener bindings.configure.connector = UpdateBridgeConfigForConnector + bindings.configure.multiKeyListener = UpdateBridgeConfigForMultiKeyListener return bindings } @@ -159,10 +164,47 @@ func (b *Bindings) deleteListener(name string) qdr.ConfigUpdate { return nil } +func (b *Bindings) GetMultiKeyListener(name string) *skupperv2alpha1.MultiKeyListener { + if existing, ok := b.multiKeyListeners[name]; ok { + return existing + } + return nil +} + +func (b *Bindings) UpdateMultiKeyListener(name string, mkl *skupperv2alpha1.MultiKeyListener) qdr.ConfigUpdate { + if mkl == nil { + return b.deleteMultiKeyListener(name) + } + return b.updateMultiKeyListener(mkl) +} + +func (b *Bindings) updateMultiKeyListener(mkl *skupperv2alpha1.MultiKeyListener) qdr.ConfigUpdate { + name := mkl.ObjectMeta.Name + existing, ok := b.multiKeyListeners[name] + b.multiKeyListeners[name] = mkl + if ok && reflect.DeepEqual(existing.Spec, mkl.Spec) { + return nil + } + return b +} + +func (b *Bindings) deleteMultiKeyListener(name string) qdr.ConfigUpdate { + if _, ok := b.multiKeyListeners[name]; !ok { + return nil + } + delete(b.multiKeyListeners, name) + return b +} + +func (b *Bindings) SetMultiKeyListenerConfiguration(configuration MultiKeyListenerConfiguration) { + b.configure.multiKeyListener = configuration +} + func (b *Bindings) ToBridgeConfig() qdr.BridgeConfig { config := qdr.BridgeConfig{ - TcpListeners: qdr.TcpEndpointMap{}, - TcpConnectors: qdr.TcpEndpointMap{}, + TcpListeners: qdr.TcpEndpointMap{}, + TcpConnectors: qdr.TcpEndpointMap{}, + ListenerAddresses: qdr.ListenerAddressMap{}, } for _, c := range b.connectors { b.configure.connector(b.SiteId, c, &config) @@ -170,6 +212,9 @@ func (b *Bindings) ToBridgeConfig() qdr.BridgeConfig { for _, l := range b.listeners { b.configure.listener(b.SiteId, l, &config) } + for _, mkl := range b.multiKeyListeners { + b.configure.multiKeyListener(b.SiteId, mkl, &config) + } return config } @@ -197,6 +242,11 @@ func (b *Bindings) AddSslProfiles(config *qdr.RouterConfig) bool { profiles[l.Spec.TlsCredentials] = qdr.ConfigureSslProfile(l.Spec.TlsCredentials, b.ProfilePath, true) } } + for _, mkl := range b.multiKeyListeners { + if _, ok := profiles[mkl.Spec.TlsCredentials]; mkl.Spec.TlsCredentials != "" && !ok { + profiles[mkl.Spec.TlsCredentials] = qdr.ConfigureSslProfile(mkl.Spec.TlsCredentials, b.ProfilePath, true) + } + } changed := false for _, profile := range profiles { if config.AddSslProfile(profile) { diff --git a/internal/site/multikeylistener.go b/internal/site/multikeylistener.go new file mode 100644 index 000000000..a4773393a --- /dev/null +++ b/internal/site/multikeylistener.go @@ -0,0 +1,74 @@ +package site + +import ( + "strconv" + + "github.com/skupperproject/skupper/internal/qdr" + skupperv2alpha1 "github.com/skupperproject/skupper/pkg/apis/skupper/v2alpha1" +) + +const unusedAddressSentinel = "_unused_" + +// multiAddressTcpListenerName returns the tcpListener name for a MultiKeyListener. +// The format "multiAddress/" avoids collisions with regular Listener resources. +func multiAddressTcpListenerName(name string) string { + return "multiAddress/" + name +} + +// listenerAddressName returns the listenerAddress name for a routing key. +// The format "/
" provides clear association with the parent listener. +func listenerAddressName(mklName, address string) string { + return mklName + "/" + address +} + +// UpdateBridgeConfigForMultiKeyListener creates the tcpListener and listenerAddress +// entities needed to implement a MultiKeyListener with priority strategy. +func UpdateBridgeConfigForMultiKeyListener(siteId string, mkl *skupperv2alpha1.MultiKeyListener, config *qdr.BridgeConfig) { + UpdateBridgeConfigForMultiKeyListenerWithHostAndPort(siteId, mkl, mkl.Spec.Host, mkl.Spec.Port, config) +} + +// UpdateBridgeConfigForMultiKeyListenerWithHostAndPort creates the tcpListener and +// listenerAddress entities with the specified host and port. +func UpdateBridgeConfigForMultiKeyListenerWithHostAndPort(siteId string, mkl *skupperv2alpha1.MultiKeyListener, host string, port int, config *qdr.BridgeConfig) { + name := mkl.Name + tcpListenerName := multiAddressTcpListenerName(name) + + config.AddTcpListener(qdr.TcpEndpoint{ + Name: tcpListenerName, + Address: unusedAddressSentinel, // TODO (ck) this address is unused but required for now + SiteId: siteId, + Host: host, + Port: strconv.Itoa(port), + SslProfile: mkl.Spec.TlsCredentials, + MultiAddressStrategy: "priorityFailover", + }) + + // Create listenerAddress entities for each routing key in the strategy + if mkl.Spec.Strategy.Priority != nil { + numKeys := len(mkl.Spec.Strategy.Priority.RoutingKeys) + for i, routingKey := range mkl.Spec.Strategy.Priority.RoutingKeys { + laName := listenerAddressName(name, routingKey) + config.AddListenerAddress(qdr.ListenerAddress{ + Name: laName, + Address: routingKey, + Value: numKeys - 1 - i, // higher value = higher priority + ListenerRef: tcpListenerName, + }) + } + } +} + +// RemoveBridgeConfigForMultiKeyListener removes the tcpListener and listenerAddress +// entities for a MultiKeyListener. This function removes all listenerAddress entities +// with the given listener name as their reference. +func RemoveBridgeConfigForMultiKeyListener(name string, config *qdr.BridgeConfig) { + tcpListenerName := multiAddressTcpListenerName(name) + // Remove all listenerAddresses that reference this listener + for laName, la := range config.ListenerAddresses { + if la.ListenerRef == tcpListenerName { + config.RemoveListenerAddress(laName) + } + } + // Remove the tcpListener + config.RemoveTcpListener(tcpListenerName) +} From dae3ad51dd8814ed9e756289ee4af072b2203019 Mon Sep 17 00:00:00 2001 From: Christian Kruse Date: Wed, 4 Feb 2026 11:11:31 -0800 Subject: [PATCH 04/13] Add MultiKeyListener watcher types and informer Add the watcher infrastructure for MultiKeyListener resources Signed-off-by: Christian Kruse --- internal/kube/watchers/resources.go | 2 ++ internal/kube/watchers/watchers.go | 9 +++++++++ 2 files changed, 11 insertions(+) diff --git a/internal/kube/watchers/resources.go b/internal/kube/watchers/resources.go index 520820842..898ae197d 100644 --- a/internal/kube/watchers/resources.go +++ b/internal/kube/watchers/resources.go @@ -58,6 +58,8 @@ type ( LinkWatcher = ResourceWatcher[*v2alpha1.Link] ListenerHandler = Handler[*v2alpha1.Listener] ListenerWatcher = ResourceWatcher[*v2alpha1.Listener] + MultiKeyListenerHandler = Handler[*v2alpha1.MultiKeyListener] + MultiKeyListenerWatcher = ResourceWatcher[*v2alpha1.MultiKeyListener] RouterAccessHandler = Handler[*v2alpha1.RouterAccess] RouterAccessWatcher = ResourceWatcher[*v2alpha1.RouterAccess] SecuredAccessHandler = Handler[*v2alpha1.SecuredAccess] diff --git a/internal/kube/watchers/watchers.go b/internal/kube/watchers/watchers.go index de6b9bf7c..95341763a 100644 --- a/internal/kube/watchers/watchers.go +++ b/internal/kube/watchers/watchers.go @@ -463,6 +463,15 @@ func (c *EventProcessor) WatchListeners(namespace string, handler ListenerHandle return addEventProcessorWatcher(c, handler, v2alpha1.SchemeGroupVersion, informer) } +func (c *EventProcessor) WatchMultiKeyListeners(namespace string, handler MultiKeyListenerHandler) *MultiKeyListenerWatcher { + informer := skupperv2alpha1informer.NewMultiKeyListenerInformer( + c.skupperClient, + namespace, + c.resyncShort, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + return addEventProcessorWatcher(c, handler, v2alpha1.SchemeGroupVersion, informer) +} + func (c *EventProcessor) WatchConnectors(namespace string, handler ConnectorHandler) *ConnectorWatcher { informer := skupperv2alpha1informer.NewConnectorInformer( c.skupperClient, From b0d167969524852ce1da0e1e5c2d9184275c3f27 Mon Sep 17 00:00:00 2001 From: Christian Kruse Date: Wed, 4 Feb 2026 11:12:21 -0800 Subject: [PATCH 05/13] Implement ExtendedBindings support for MultiKeyListener Add MultiKeyListener handling to the kube site ExtendedBindings: - Track multiKeyListenerHosts for service exposure management - Implement updateBridgeConfigForMultiKeyListener with port mapping - Add multiKeyListenerUpdated for exposing services - Add multiKeyListenerDeleted for cleaning up exposed services - Implement UpdateMultiKeyListener/GetMultiKeyListener methods - Update bindings_test.go to include ListenerAddresses in expected configs Signed-off-by: Christian Kruse --- internal/kube/site/bindings_test.go | 15 ++- internal/kube/site/extended_bindings.go | 161 +++++++++++++++++++++--- 2 files changed, 152 insertions(+), 24 deletions(-) diff --git a/internal/kube/site/bindings_test.go b/internal/kube/site/bindings_test.go index 1d6ba2c40..dfc4a206d 100644 --- a/internal/kube/site/bindings_test.go +++ b/internal/kube/site/bindings_test.go @@ -291,6 +291,7 @@ func TestBindingAdaptor_updateBridgeConfigForConnector(t *testing.T) { SiteId: "00000000-0000-0000-0000-000000000001", }, }, + ListenerAddresses: qdr.ListenerAddressMap{}, }, }, }, @@ -333,6 +334,7 @@ func TestBindingAdaptor_updateBridgeConfigForConnector(t *testing.T) { ProcessID: "30af5279-be83-41e4-86fe-cc45396786f4", }, }, + ListenerAddresses: qdr.ListenerAddressMap{}, }, }, }, @@ -358,8 +360,9 @@ func TestBindingAdaptor_updateBridgeConfigForConnector(t *testing.T) { }, expected: expected{ config: qdr.BridgeConfig{ - TcpListeners: map[string]qdr.TcpEndpoint{}, - TcpConnectors: map[string]qdr.TcpEndpoint{}, + TcpListeners: map[string]qdr.TcpEndpoint{}, + TcpConnectors: map[string]qdr.TcpEndpoint{}, + ListenerAddresses: qdr.ListenerAddressMap{}, }, }, }, @@ -383,8 +386,9 @@ func TestBindingAdaptor_updateBridgeConfigForConnector(t *testing.T) { }, expected: expected{ config: qdr.BridgeConfig{ - TcpListeners: map[string]qdr.TcpEndpoint{}, - TcpConnectors: map[string]qdr.TcpEndpoint{}, + TcpListeners: map[string]qdr.TcpEndpoint{}, + TcpConnectors: map[string]qdr.TcpEndpoint{}, + ListenerAddresses: qdr.ListenerAddressMap{}, }, }, }, @@ -599,7 +603,8 @@ func TestBindingAdaptor_updateBridgeConfigForListener(t *testing.T) { SiteId: "00000000-0000-0000-0000-000000000001", }, }, - TcpConnectors: map[string]qdr.TcpEndpoint{}, + TcpConnectors: map[string]qdr.TcpEndpoint{}, + ListenerAddresses: qdr.ListenerAddressMap{}, }, }, }, diff --git a/internal/kube/site/extended_bindings.go b/internal/kube/site/extended_bindings.go index 870e7440b..467209e34 100644 --- a/internal/kube/site/extended_bindings.go +++ b/internal/kube/site/extended_bindings.go @@ -5,6 +5,8 @@ import ( "fmt" "log/slog" + corev1 "k8s.io/api/core/v1" + "github.com/skupperproject/skupper/internal/kube/watchers" "github.com/skupperproject/skupper/internal/qdr" "github.com/skupperproject/skupper/internal/site" @@ -12,31 +14,34 @@ import ( ) type ExtendedBindings struct { - context BindingContext - mapping *qdr.PortMapping - exposed ExposedPorts - selectors map[string]TargetSelection - bindings *site.Bindings - connectors map[string]*AttachedConnector - perTargetListeners map[string]*PerTargetListener - listenerHosts map[string]string // listener name -> host - controller *watchers.EventProcessor - site *Site - logger *slog.Logger + context BindingContext + mapping *qdr.PortMapping + exposed ExposedPorts + selectors map[string]TargetSelection + bindings *site.Bindings + connectors map[string]*AttachedConnector + perTargetListeners map[string]*PerTargetListener + listenerHosts map[string]string // listener name -> host + multiKeyListenerHosts map[string]string // multikeylistener name -> host + controller *watchers.EventProcessor + site *Site + logger *slog.Logger } func NewExtendedBindings(controller *watchers.EventProcessor, profilePath string) *ExtendedBindings { eb := &ExtendedBindings{ - bindings: site.NewBindings(profilePath), - connectors: map[string]*AttachedConnector{}, - perTargetListeners: map[string]*PerTargetListener{}, - listenerHosts: map[string]string{}, - controller: controller, + bindings: site.NewBindings(profilePath), + connectors: map[string]*AttachedConnector{}, + perTargetListeners: map[string]*PerTargetListener{}, + listenerHosts: map[string]string{}, + multiKeyListenerHosts: map[string]string{}, + controller: controller, logger: slog.New(slog.Default().Handler()).With( slog.String("component", "kube.site.attached_connector"), ), } eb.bindings.SetListenerConfiguration(eb.updateBridgeConfigForListener) + eb.bindings.SetMultiKeyListenerConfiguration(eb.updateBridgeConfigForMultiKeyListener) return eb } @@ -50,6 +55,7 @@ func (a *ExtendedBindings) init(context BindingContext, config *qdr.RouterConfig a.bindings.SetBindingEventHandler(a) a.bindings.SetConnectorConfiguration(a.updateBridgeConfigForConnector) a.bindings.SetListenerConfiguration(a.updateBridgeConfigForListener) + a.bindings.SetMultiKeyListenerConfiguration(a.updateBridgeConfigForMultiKeyListener) } func (a *ExtendedBindings) cleanup() { @@ -174,13 +180,86 @@ func (a *ExtendedBindings) updateBridgeConfigForListener(siteId string, listener if a.mapping == nil { a.mapping = qdr.RecoverPortMapping(nil) } - if port, err := a.mapping.GetPortForKey(listener.Name); err == nil { - site.UpdateBridgeConfigForListenerWithHostAndPort(siteId, listener, "", port, config) - } else { + port, err := a.mapping.GetPortForKey(listener.Name) + if err != nil { bindings_logger.Error("Could not allocate port for %s/%s: %s", slog.String("namespace", listener.Namespace), slog.String("name", listener.Name)) + return + } + site.UpdateBridgeConfigForListenerWithHostAndPort(siteId, listener, "", port, config) +} + +func (a *ExtendedBindings) updateBridgeConfigForMultiKeyListener(siteId string, mkl *skupperv2alpha1.MultiKeyListener, config *qdr.BridgeConfig) { + if a.mapping == nil { + a.mapping = qdr.RecoverPortMapping(nil) + } + port, err := a.mapping.GetPortForKey(mkl.Name) + if err != nil { + bindings_logger.Error("Could not allocate port for multikeylistener", + slog.String("namespace", mkl.Namespace), + slog.String("name", mkl.Name)) + return + } + site.UpdateBridgeConfigForMultiKeyListenerWithHostAndPort(siteId, mkl, "", port, config) +} + +func (a *ExtendedBindings) multiKeyListenerUpdated(mkl *skupperv2alpha1.MultiKeyListener) { + allocatedRouterPort, err := a.mapping.GetPortForKey(mkl.Name) + if err != nil { + bindings_logger.Error("Unable to get port for multikeylistener", + slog.String("namespace", mkl.Namespace), + slog.String("name", mkl.Name), + slog.Any("error", err), + ) + return + } + port := Port{ + Name: mkl.Name, + Port: mkl.Spec.Port, + TargetPort: allocatedRouterPort, + Protocol: corev1.ProtocolTCP, + } + if exposed := a.exposed.Expose(mkl.Spec.Host, port); exposed != nil { + if err := a.context.Expose(exposed); err != nil { + bindings_logger.Error("Error exposing multikeylistener", + slog.String("namespace", mkl.Namespace), + slog.String("name", mkl.Name), + slog.Any("error", err)) + return + } + bindings_logger.Info("Exposed multikeylistener", + slog.String("namespace", mkl.Namespace), + slog.String("name", mkl.Name)) + } +} + +func (a *ExtendedBindings) multiKeyListenerDeleted(mkl *skupperv2alpha1.MultiKeyListener) { + + exposed := a.exposed.Unexpose(mkl.Spec.Host, mkl.Name) + if exposed == nil { + return + } + a.mapping.ReleasePortForKey(mkl.Name) + if exposed.empty() { + if err := a.context.Unexpose(mkl.Spec.Host); err != nil { + bindings_logger.Error("Error unexposing multikeylistener", + slog.String("namespace", mkl.Namespace), + slog.String("name", mkl.Name), + slog.Any("error", err)) + } + return } + if err := a.context.Expose(exposed); err != nil { + bindings_logger.Error("Error re-exposing service after deleting multikeylistener", + slog.String("namespace", mkl.Namespace), + slog.String("name", mkl.Name), + slog.Any("error", err)) + return + } + bindings_logger.Info("Re-exposed service after deleting multikeylistener", + slog.String("namespace", mkl.Namespace), + slog.String("name", mkl.Name)) } func (b *ExtendedBindings) SetListenerConfiguration(configuration site.ListenerConfiguration) { @@ -242,6 +321,50 @@ func (b *ExtendedBindings) UpdateListener(name string, listener *skupperv2alpha1 return b, errors.Join(errs...) } +func (b *ExtendedBindings) UpdateMultiKeyListener(name string, mkl *skupperv2alpha1.MultiKeyListener) (qdr.ConfigUpdate, error) { + var errs []error + updateConfig := false + + if mkl != nil { + // Handle host change - unexpose previous host if changed + if previousHost, ok := b.multiKeyListenerHosts[name]; ok && previousHost != mkl.Spec.Host { + if exposed := b.exposed.Unexpose(previousHost, name); exposed != nil && exposed.empty() { + if err := b.context.Unexpose(previousHost); err != nil { + errs = append(errs, err) + } + } + } + b.multiKeyListenerHosts[name] = mkl.Spec.Host + b.multiKeyListenerUpdated(mkl) + } else { + // Deletion case + if previousHost, ok := b.multiKeyListenerHosts[name]; ok { + existingMkl := b.bindings.GetMultiKeyListener(name) + if existingMkl != nil { + b.multiKeyListenerDeleted(existingMkl) + } + delete(b.multiKeyListenerHosts, name) + if exposed := b.exposed.Unexpose(previousHost, name); exposed != nil && exposed.empty() { + if err := b.context.Unexpose(previousHost); err != nil { + errs = append(errs, err) + } + } + } + } + + if b.bindings.UpdateMultiKeyListener(name, mkl) != nil { + updateConfig = true + } + if !updateConfig { + return nil, errors.Join(errs...) + } + return b, errors.Join(errs...) +} + +func (b *ExtendedBindings) GetMultiKeyListener(name string) *skupperv2alpha1.MultiKeyListener { + return b.bindings.GetMultiKeyListener(name) +} + func (b *ExtendedBindings) GetConnector(name string) *skupperv2alpha1.Connector { return b.bindings.GetConnector(name) } From e7a28f3913ce1c1f9be015e03edea36d1b00bef9 Mon Sep 17 00:00:00 2001 From: Christian Kruse Date: Wed, 4 Feb 2026 11:12:42 -0800 Subject: [PATCH 06/13] Wire MultiKeyListener into controller and site reconciliation Signed-off-by: Christian Kruse --- internal/kube/controller/controller.go | 72 +++++++++++++++++--------- internal/kube/site/site.go | 29 +++++++++++ 2 files changed, 76 insertions(+), 25 deletions(-) diff --git a/internal/kube/controller/controller.go b/internal/kube/controller/controller.go index 16b45c2fa..cc75816ca 100644 --- a/internal/kube/controller/controller.go +++ b/internal/kube/controller/controller.go @@ -30,31 +30,32 @@ import ( ) type Controller struct { - self skupperv2alpha1.Controller - deploymentName string - deploymentUid string - eventProcessor *watchers.EventProcessor - stopCh <-chan struct{} - siteWatcher *watchers.SiteWatcher - listenerWatcher *watchers.ListenerWatcher - connectorWatcher *watchers.ConnectorWatcher - linkAccessWatcher *watchers.RouterAccessWatcher - grantWatcher *watchers.AccessGrantWatcher - serviceWatcher *watchers.ServiceWatcher - sites map[string]*site.Site - startGrantServer func() - accessMgr *securedaccess.SecuredAccessManager - accessRecovery *securedaccess.SecuredAccessResourceWatcher - certMgr *certificates.CertificateManagerImpl - siteSizing *sizing.Registry - siteSizingWatcher *watchers.ConfigMapWatcher - labelling *labels.LabelsAndAnnotations - labellingWatcher *watchers.ConfigMapWatcher - attachableConnectors map[string]*skupperv2alpha1.AttachedConnector - disableSecContext bool - log *slog.Logger - namespaces *NamespaceConfig - observedServices map[string]string + self skupperv2alpha1.Controller + deploymentName string + deploymentUid string + eventProcessor *watchers.EventProcessor + stopCh <-chan struct{} + siteWatcher *watchers.SiteWatcher + listenerWatcher *watchers.ListenerWatcher + connectorWatcher *watchers.ConnectorWatcher + multiKeyListenerWatcher *watchers.MultiKeyListenerWatcher + linkAccessWatcher *watchers.RouterAccessWatcher + grantWatcher *watchers.AccessGrantWatcher + serviceWatcher *watchers.ServiceWatcher + sites map[string]*site.Site + startGrantServer func() + accessMgr *securedaccess.SecuredAccessManager + accessRecovery *securedaccess.SecuredAccessResourceWatcher + certMgr *certificates.CertificateManagerImpl + siteSizing *sizing.Registry + siteSizingWatcher *watchers.ConfigMapWatcher + labelling *labels.LabelsAndAnnotations + labellingWatcher *watchers.ConfigMapWatcher + attachableConnectors map[string]*skupperv2alpha1.AttachedConnector + disableSecContext bool + log *slog.Logger + namespaces *NamespaceConfig + observedServices map[string]string } func skupperRouterConfig() internalinterfaces.TweakListOptionsFunc { @@ -130,6 +131,7 @@ func NewController(cli internalclient.Clients, config *Config, options ...watche controller.siteWatcher = controller.eventProcessor.WatchSites(config.WatchNamespace, filter(controller, controller.checkSite)) controller.listenerWatcher = controller.eventProcessor.WatchListeners(config.WatchNamespace, filter(controller, controller.checkListener)) + controller.multiKeyListenerWatcher = controller.eventProcessor.WatchMultiKeyListeners(config.WatchNamespace, filter(controller, controller.checkMultiKeyListener)) controller.eventProcessor.WatchServices(listenerServices(), config.WatchNamespace, filter(controller, controller.checkListenerService)) controller.serviceWatcher = controller.eventProcessor.WatchServices(sansSkupperListenerServices(), config.WatchNamespace, filter(controller, controller.checkObservedService)) controller.connectorWatcher = controller.eventProcessor.WatchConnectors(config.WatchNamespace, filter(controller, controller.checkConnector)) @@ -289,6 +291,17 @@ func (c *Controller) init(stopCh <-chan struct{}) error { _, svcExists := c.observedServices[listener.ObjectMeta.Namespace+"/"+listener.Spec.Host] site.CheckListener(listener.ObjectMeta.Name, listener, svcExists) } + for _, mkl := range c.multiKeyListenerWatcher.List() { + if !c.namespaces.isControlled(mkl.Namespace) { + continue + } + site := c.getSite(mkl.ObjectMeta.Namespace) + c.log.Info("Recovering multikeylistener", + slog.String("namespace", mkl.Namespace), + slog.String("name", mkl.Name), + ) + site.CheckMultiKeyListener(mkl.ObjectMeta.Name, mkl) + } for _, la := range c.linkAccessWatcher.List() { if !c.namespaces.isControlled(la.Namespace) { continue @@ -395,6 +408,15 @@ func (c *Controller) checkListener(key string, listener *skupperv2alpha1.Listene return c.getSite(namespace).CheckListener(name, listener, svcExists) } +func (c *Controller) checkMultiKeyListener(key string, mkl *skupperv2alpha1.MultiKeyListener) error { + c.log.Debug("checkMultiKeyListener", slog.String("key", key)) + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return err + } + return c.getSite(namespace).CheckMultiKeyListener(name, mkl) +} + func (c *Controller) checkListenerService(key string, svc *corev1.Service) error { c.log.Debug("checkListenerService", slog.String("key", key)) if svc == nil { diff --git a/internal/kube/site/site.go b/internal/kube/site/site.go index 2be4ad3ee..83b9dcd46 100644 --- a/internal/kube/site/site.go +++ b/internal/kube/site/site.go @@ -958,6 +958,35 @@ func (s *Site) CheckListener(name string, listener *skupperv2alpha1.Listener, sv return s.updateListenerStatus(listener, stderrors.Join(err1, err2)) } +func (s *Site) CheckMultiKeyListener(name string, mkl *skupperv2alpha1.MultiKeyListener) error { + update, err1 := s.bindings.UpdateMultiKeyListener(name, mkl) + if s.site == nil { + if mkl == nil { + return nil + } + return s.updateMultiKeyListenerStatus(mkl, stderrors.New("No active site in namespace")) + } + if update == nil { + return nil + } + err2 := s.updateRouterConfig(update) + if mkl == nil { + return stderrors.Join(err1, err2) + } + return s.updateMultiKeyListenerStatus(mkl, stderrors.Join(err1, err2)) +} + +func (s *Site) updateMultiKeyListenerStatus(mkl *skupperv2alpha1.MultiKeyListener, err error) error { + if mkl.SetConfigured(err) { + updated, updateErr := s.clients.GetSkupperClient().SkupperV2alpha1().MultiKeyListeners(mkl.ObjectMeta.Namespace).UpdateStatus(context.TODO(), mkl, metav1.UpdateOptions{}) + if updateErr != nil { + return updateErr + } + s.bindings.UpdateMultiKeyListener(updated.Name, updated) + } + return nil +} + func (s *Site) setBindingsConfiguredStatus(err error) { lf := func(listener *skupperv2alpha1.Listener) *skupperv2alpha1.Listener { if listener.SetConfigured(nil) { From caafaeb1d9a9b51017544dd9d4cdb71a884dfc9f Mon Sep 17 00:00:00 2001 From: Christian Kruse Date: Wed, 4 Feb 2026 12:15:23 -0800 Subject: [PATCH 07/13] Implement READY condition reconciliation for MultiKeyListener Add status reconciliation that updates MultiKeyListener READY condition based on matching connectors in the network. The MultiKeyListener becomes READY when any routing key in its strategy has a reachable connector. Signed-off-by: Christian Kruse --- internal/kube/site/binding_status.go | 31 +++++++++++++++++++ internal/kube/site/extended_bindings.go | 4 +++ internal/kube/site/site.go | 5 +++ internal/site/bindings.go | 11 +++++++ .../v2alpha1/multikeylistener_types.go | 28 ++++++++++++++++- 5 files changed, 78 insertions(+), 1 deletion(-) diff --git a/internal/kube/site/binding_status.go b/internal/kube/site/binding_status.go index 2244f2bf7..22e8fa136 100644 --- a/internal/kube/site/binding_status.go +++ b/internal/kube/site/binding_status.go @@ -84,6 +84,37 @@ func (s *BindingStatus) updateMatchingListenerCountForAttachedConnector(connecto } } +func (s *BindingStatus) updateMultiKeyListenerDestination(mkl *skupperv2alpha1.MultiKeyListener) *skupperv2alpha1.MultiKeyListener { + routingKeys := mkl.GetRoutingKeys() + + // Find which routing keys have matching connectors, preserving priority order + var reachable []string + for _, key := range routingKeys { + if len(s.connectors[key]) > 0 { + reachable = append(reachable, key) + } + } + + hasDestination := len(reachable) > 0 + changed := mkl.SetHasDestination(hasDestination) + if mkl.SetRoutingKeysReachable(reachable) { + changed = true + } + + if changed { + updated, err := updateMultiKeyListenerStatus(s.client, mkl) + if err != nil { + s.logger.Error("Failed to update status for multikeylistener", + slog.String("namespace", mkl.Namespace), + slog.String("name", mkl.Name)) + s.errors = append(s.errors, err.Error()) + return nil + } + return updated + } + return nil +} + func (s *BindingStatus) error() error { if len(s.errors) > 0 { return fmt.Errorf("%s", strings.Join(s.errors, ", ")) diff --git a/internal/kube/site/extended_bindings.go b/internal/kube/site/extended_bindings.go index 467209e34..afe50092a 100644 --- a/internal/kube/site/extended_bindings.go +++ b/internal/kube/site/extended_bindings.go @@ -373,6 +373,10 @@ func (b *ExtendedBindings) Map(cf site.ConnectorFunction, lf site.ListenerFuncti b.bindings.Map(cf, lf) } +func (b *ExtendedBindings) MapOverMultiKeyListeners(mkf site.MultiKeyListenerFunction) { + b.bindings.MapOverMultiKeyListeners(mkf) +} + type AttachedConnectorFunction func(*AttachedConnector) func (b *ExtendedBindings) MapOverAttachedConnectors(cf AttachedConnectorFunction) { diff --git a/internal/kube/site/site.go b/internal/kube/site/site.go index 83b9dcd46..a5a431fbf 100644 --- a/internal/kube/site/site.go +++ b/internal/kube/site/site.go @@ -1198,6 +1198,7 @@ func (s *Site) NetworkStatusUpdated(network []skupperv2alpha1.SiteRecord) error bindingStatus := newBindingStatus(s.clients, network) s.bindings.Map(bindingStatus.updateMatchingListenerCount, bindingStatus.updateMatchingConnectorCount) + s.bindings.MapOverMultiKeyListeners(bindingStatus.updateMultiKeyListenerDestination) s.logger.Debug("Updating matching listeners for attached connectors") s.bindings.MapOverAttachedConnectors(bindingStatus.updateMatchingListenerCountForAttachedConnector) return bindingStatus.error() @@ -1470,6 +1471,10 @@ func updateListenerStatus(client internalclient.Clients, listener *skupperv2alph return client.GetSkupperClient().SkupperV2alpha1().Listeners(listener.ObjectMeta.Namespace).UpdateStatus(context.TODO(), listener, metav1.UpdateOptions{}) } +func updateMultiKeyListenerStatus(client internalclient.Clients, mkl *skupperv2alpha1.MultiKeyListener) (*skupperv2alpha1.MultiKeyListener, error) { + return client.GetSkupperClient().SkupperV2alpha1().MultiKeyListeners(mkl.ObjectMeta.Namespace).UpdateStatus(context.TODO(), mkl, metav1.UpdateOptions{}) +} + func getLabelsForRouter() map[string]string { return map[string]string{ "application": types.TransportDeploymentName, diff --git a/internal/site/bindings.go b/internal/site/bindings.go index f627d6afb..d23d51c52 100644 --- a/internal/site/bindings.go +++ b/internal/site/bindings.go @@ -20,6 +20,7 @@ type BindingEventHandler interface { type ConnectorFunction func(*skupperv2alpha1.Connector) *skupperv2alpha1.Connector type ListenerFunction func(*skupperv2alpha1.Listener) *skupperv2alpha1.Listener +type MultiKeyListenerFunction func(*skupperv2alpha1.MultiKeyListener) *skupperv2alpha1.MultiKeyListener type Bindings struct { SiteId string @@ -87,6 +88,16 @@ func (b *Bindings) Map(cf ConnectorFunction, lf ListenerFunction) { } } +func (b *Bindings) MapOverMultiKeyListeners(mkf MultiKeyListenerFunction) { + if mkf != nil { + for key, mkl := range b.multiKeyListeners { + if updated := mkf(mkl); updated != nil { + b.multiKeyListeners[key] = updated + } + } + } +} + func (b *Bindings) GetConnector(name string) *skupperv2alpha1.Connector { if existing, ok := b.connectors[name]; ok { return existing diff --git a/pkg/apis/skupper/v2alpha1/multikeylistener_types.go b/pkg/apis/skupper/v2alpha1/multikeylistener_types.go index 608c00a68..ba8c1582d 100644 --- a/pkg/apis/skupper/v2alpha1/multikeylistener_types.go +++ b/pkg/apis/skupper/v2alpha1/multikeylistener_types.go @@ -1,8 +1,10 @@ package v2alpha1 import ( - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "reflect" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // +genclient @@ -191,3 +193,27 @@ func (m *MultiKeyListener) SetHasDestination(value bool) bool { } return false } + +func (m *MultiKeyListener) SetRoutingKeysReachable(keys []string) bool { + if m.Status.Strategy == nil { + m.Status.Strategy = &StrategyStatus{} + } + if m.Status.Strategy.Priority == nil { + m.Status.Strategy.Priority = &PriorityStrategyStatus{} + } + if keys == nil { + keys = []string{} + } + if !reflect.DeepEqual(m.Status.Strategy.Priority.RoutingKeysReachable, keys) { + m.Status.Strategy.Priority.RoutingKeysReachable = keys + return true + } + return false +} + +func (m *MultiKeyListener) GetRoutingKeys() []string { + if m.Spec.Strategy.Priority != nil { + return m.Spec.Strategy.Priority.RoutingKeys + } + return nil +} From 3e28caf3628d969575d091c6f6edf9e9dc688e5c Mon Sep 17 00:00:00 2001 From: Christian Kruse Date: Thu, 5 Feb 2026 09:54:23 -0800 Subject: [PATCH 08/13] Make connector address and multiAddressStrategy mututally exclusive Signed-off-by: Christian Kruse --- internal/site/multikeylistener.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/internal/site/multikeylistener.go b/internal/site/multikeylistener.go index a4773393a..adf2ba12e 100644 --- a/internal/site/multikeylistener.go +++ b/internal/site/multikeylistener.go @@ -7,8 +7,6 @@ import ( skupperv2alpha1 "github.com/skupperproject/skupper/pkg/apis/skupper/v2alpha1" ) -const unusedAddressSentinel = "_unused_" - // multiAddressTcpListenerName returns the tcpListener name for a MultiKeyListener. // The format "multiAddress/" avoids collisions with regular Listener resources. func multiAddressTcpListenerName(name string) string { @@ -35,7 +33,6 @@ func UpdateBridgeConfigForMultiKeyListenerWithHostAndPort(siteId string, mkl *sk config.AddTcpListener(qdr.TcpEndpoint{ Name: tcpListenerName, - Address: unusedAddressSentinel, // TODO (ck) this address is unused but required for now SiteId: siteId, Host: host, Port: strconv.Itoa(port), From fff75b8c2077fcbde087c99253de8a10f03e48ec Mon Sep 17 00:00:00 2001 From: Christian Kruse Date: Mon, 9 Feb 2026 08:51:07 -0800 Subject: [PATCH 09/13] Implement RequireClientCert Signed-off-by: Christian Kruse --- internal/qdr/amqp_mgmt.go | 1 + internal/qdr/qdr.go | 36 ++++++++++++++++++++++++++++++- internal/site/multikeylistener.go | 1 + 3 files changed, 37 insertions(+), 1 deletion(-) diff --git a/internal/qdr/amqp_mgmt.go b/internal/qdr/amqp_mgmt.go index d53a151e7..f8778b356 100644 --- a/internal/qdr/amqp_mgmt.go +++ b/internal/qdr/amqp_mgmt.go @@ -135,6 +135,7 @@ func asTcpEndpoint(record Record) TcpEndpoint { Observer: record.AsString("observer"), ProcessID: record.AsString("processId"), MultiAddressStrategy: record.AsString("multiAddressStrategy"), + AuthenticatePeer: record.AsBool("authenticatePeer"), } if value, ok := record["verifyHostname"]; ok { if verify, ok := value.(bool); ok { diff --git a/internal/qdr/qdr.go b/internal/qdr/qdr.go index 013cb270e..4fdf125b6 100644 --- a/internal/qdr/qdr.go +++ b/internal/qdr/qdr.go @@ -590,6 +590,7 @@ type TcpEndpoint struct { VerifyHostname *bool `json:"verifyHostname,omitempty"` ProcessID string `json:"processId,omitempty"` MultiAddressStrategy string `json:"multiAddressStrategy,omitempty"` + AuthenticatePeer bool `json:"authenticatePeer,omitempty"` } type ListenerAddress struct { @@ -646,6 +647,9 @@ func (e TcpEndpoint) toRecord() Record { if e.MultiAddressStrategy != "" { result["multiAddressStrategy"] = e.MultiAddressStrategy } + if e.AuthenticatePeer { + result["authenticatePeer"] = true + } return result } @@ -1037,7 +1041,7 @@ func (a TcpEndpoint) Equivalent(b TcpEndpoint) bool { } if !equivalentHost(a.Host, b.Host) || a.Port != b.Port || a.Address != b.Address || a.SiteId != b.SiteId || a.ProcessID != b.ProcessID || !a.equivalentVerifyHostname(b) || - obsA != obsB { + obsA != obsB || a.AuthenticatePeer != b.AuthenticatePeer || a.SslProfile != b.SslProfile { return false } return true @@ -1091,6 +1095,36 @@ func (a *BridgeConfig) Difference(b *BridgeConfig) *BridgeConfigDifference { ListenerAddresses: a.ListenerAddresses.Difference(b.ListenerAddresses), } + // When a tcpListener is being replaced (deleted then re-added), the + // router requires all referencing listenerAddresses to be deleted + // before the tcpListener can be deleted. Ensure those dependent + // listenerAddresses are included in the diff even if they themselves + // haven't changed. + deletedListeners := make(map[string]bool, len(result.TcpListeners.Deleted)) + for _, name := range result.TcpListeners.Deleted { + deletedListeners[name] = true + } + if len(deletedListeners) > 0 { + alreadyDeletedLA := make(map[string]bool, len(result.ListenerAddresses.Deleted)) + for _, name := range result.ListenerAddresses.Deleted { + alreadyDeletedLA[name] = true + } + alreadyAddedLA := make(map[string]bool, len(result.ListenerAddresses.Added)) + for _, la := range result.ListenerAddresses.Added { + alreadyAddedLA[la.Name] = true + } + for _, la := range a.ListenerAddresses { + if deletedListeners[la.ListenerRef] && !alreadyDeletedLA[la.Name] { + result.ListenerAddresses.Deleted = append(result.ListenerAddresses.Deleted, la.Name) + } + } + for _, la := range b.ListenerAddresses { + if deletedListeners[la.ListenerRef] && !alreadyAddedLA[la.Name] { + result.ListenerAddresses.Added = append(result.ListenerAddresses.Added, la) + } + } + } + result.AddedSslProfiles, result.DeletedSSlProfiles = getSslProfilesDifference(a, b) return &result diff --git a/internal/site/multikeylistener.go b/internal/site/multikeylistener.go index adf2ba12e..233ab047d 100644 --- a/internal/site/multikeylistener.go +++ b/internal/site/multikeylistener.go @@ -38,6 +38,7 @@ func UpdateBridgeConfigForMultiKeyListenerWithHostAndPort(siteId string, mkl *sk Port: strconv.Itoa(port), SslProfile: mkl.Spec.TlsCredentials, MultiAddressStrategy: "priorityFailover", + AuthenticatePeer: mkl.Spec.RequireClientCert, }) // Create listenerAddress entities for each routing key in the strategy From 3e0b4f8005d812c58112518335a7e543ad4f0342 Mon Sep 17 00:00:00 2001 From: Christian Kruse Date: Sat, 14 Feb 2026 20:35:23 -0800 Subject: [PATCH 10/13] gracefully handle multikeylistener not installed Signed-off-by: Christian Kruse --- internal/kube/client/fake/fake_client.go | 13 ++++ internal/kube/controller/controller.go | 20 +++--- internal/kube/controller/controller_test.go | 67 +++++++++++++++++++++ internal/kube/resource/definitions.go | 8 +++ internal/kube/watchers/watchers.go | 8 +++ 5 files changed, 107 insertions(+), 9 deletions(-) diff --git a/internal/kube/client/fake/fake_client.go b/internal/kube/client/fake/fake_client.go index ed3462bda..ef2e3808f 100644 --- a/internal/kube/client/fake/fake_client.go +++ b/internal/kube/client/fake/fake_client.go @@ -128,5 +128,18 @@ func fakedApiResources() []*metav1.APIResourceList { }, }, }, + { + GroupVersion: "skupper.io/v2alpha1", + APIResources: []metav1.APIResource{ + { + Name: "multikeylisteners", + SingularName: "multikeylistener", + Namespaced: true, + Group: "skupper.io", + Version: "v2alpha1", + Kind: "MultiKeyListener", + }, + }, + }, } } diff --git a/internal/kube/controller/controller.go b/internal/kube/controller/controller.go index cc75816ca..3fb698d5f 100644 --- a/internal/kube/controller/controller.go +++ b/internal/kube/controller/controller.go @@ -291,16 +291,18 @@ func (c *Controller) init(stopCh <-chan struct{}) error { _, svcExists := c.observedServices[listener.ObjectMeta.Namespace+"/"+listener.Spec.Host] site.CheckListener(listener.ObjectMeta.Name, listener, svcExists) } - for _, mkl := range c.multiKeyListenerWatcher.List() { - if !c.namespaces.isControlled(mkl.Namespace) { - continue + if c.multiKeyListenerWatcher != nil { + for _, mkl := range c.multiKeyListenerWatcher.List() { + if !c.namespaces.isControlled(mkl.Namespace) { + continue + } + site := c.getSite(mkl.ObjectMeta.Namespace) + c.log.Info("Recovering multikeylistener", + slog.String("namespace", mkl.Namespace), + slog.String("name", mkl.Name), + ) + site.CheckMultiKeyListener(mkl.ObjectMeta.Name, mkl) } - site := c.getSite(mkl.ObjectMeta.Namespace) - c.log.Info("Recovering multikeylistener", - slog.String("namespace", mkl.Namespace), - slog.String("name", mkl.Name), - ) - site.CheckMultiKeyListener(mkl.ObjectMeta.Name, mkl) } for _, la := range c.linkAccessWatcher.List() { if !c.namespaces.isControlled(la.Namespace) { diff --git a/internal/kube/controller/controller_test.go b/internal/kube/controller/controller_test.go index 34f22c2a7..7571d50bc 100644 --- a/internal/kube/controller/controller_test.go +++ b/internal/kube/controller/controller_test.go @@ -25,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/rand" + discoveryfake "k8s.io/client-go/discovery/fake" "k8s.io/client-go/dynamic" fakedynamic "k8s.io/client-go/dynamic/fake" k8stesting "k8s.io/client-go/testing" @@ -1981,3 +1982,69 @@ func deleteTargetPod(name string, namespace string) WaitFunction { return true } } + +func TestMultiKeyListenerCRDNotInstalled(t *testing.T) { + flags := &flag.FlagSet{} + config, err := BoundConfig(flags) + assert.Assert(t, err) + + clients, err := fakeclient.NewFakeClient(config.Namespace, nil, []runtime.Object{ + f.site("mysite", "test", "", false, false), + f.listener("mylistener", "test", "mysvc", 8080), + }, "") + assert.Assert(t, err) + enableSSA(clients.GetDynamicClient()) + + // Remove MultiKeyListener from the fake discovery client to simulate + // the CRD not being installed. + if fd, ok := clients.Discovery.(*discoveryfake.FakeDiscovery); ok { + var filtered []*metav1.APIResourceList + for _, rl := range fd.Resources { + if rl.GroupVersion == "skupper.io/v2alpha1" { + var apis []metav1.APIResource + for _, r := range rl.APIResources { + if r.Name != "multikeylisteners" { + apis = append(apis, r) + } + } + if len(apis) > 0 { + filtered = append(filtered, &metav1.APIResourceList{ + GroupVersion: rl.GroupVersion, + APIResources: apis, + }) + } + } else { + filtered = append(filtered, rl) + } + } + fd.Resources = filtered + } + + controller, err := NewController(clients, config) + assert.Assert(t, err) + assert.Assert(t, controller.multiKeyListenerWatcher == nil) + + stopCh := make(chan struct{}) + defer close(stopCh) + err = controller.init(stopCh) + assert.Assert(t, err) + + // Process events for the skupper objects (site + listener). + for i := 0; i < 2; i++ { + controller.eventProcessor.TestProcess() + } + + // Verify the site was configured successfully. + site, err := clients.GetSkupperClient().SkupperV2alpha1().Sites("test").Get(context.Background(), "mysite", metav1.GetOptions{}) + assert.Assert(t, err) + configured := meta.FindStatusCondition(site.Status.Conditions, skupperv2alpha1.CONDITION_TYPE_CONFIGURED) + assert.Assert(t, configured != nil) + assert.Equal(t, configured.Status, metav1.ConditionTrue) + + // Verify the listener was configured successfully. + listener, err := clients.GetSkupperClient().SkupperV2alpha1().Listeners("test").Get(context.Background(), "mylistener", metav1.GetOptions{}) + assert.Assert(t, err) + listenerConfigured := meta.FindStatusCondition(listener.Status.Conditions, skupperv2alpha1.CONDITION_TYPE_CONFIGURED) + assert.Assert(t, listenerConfigured != nil) + assert.Equal(t, listenerConfigured.Status, metav1.ConditionTrue) +} diff --git a/internal/kube/resource/definitions.go b/internal/kube/resource/definitions.go index 22a77b17d..7e3625114 100644 --- a/internal/kube/resource/definitions.go +++ b/internal/kube/resource/definitions.go @@ -29,6 +29,14 @@ func TlsRouteResource() schema.GroupVersionResource { } } +func MultiKeyListenerResource() schema.GroupVersionResource { + return schema.GroupVersionResource{ + Group: "skupper.io", + Version: "v2alpha1", + Resource: "multikeylisteners", + } +} + func DeploymentResource() schema.GroupVersionResource { return schema.GroupVersionResource{ Group: "apps", diff --git a/internal/kube/watchers/watchers.go b/internal/kube/watchers/watchers.go index 95341763a..76320b416 100644 --- a/internal/kube/watchers/watchers.go +++ b/internal/kube/watchers/watchers.go @@ -154,6 +154,10 @@ func (c *EventProcessor) HasTlsRoute() bool { return resource.IsResourceAvailable(c.discoveryClient, resource.TlsRouteResource()) } +func (c *EventProcessor) HasMultiKeyListener() bool { + return resource.IsResourceAvailable(c.discoveryClient, resource.MultiKeyListenerResource()) +} + func (c *EventProcessor) GetRouteInterface() openshiftroute.Interface { return c.routeClient } @@ -464,6 +468,10 @@ func (c *EventProcessor) WatchListeners(namespace string, handler ListenerHandle } func (c *EventProcessor) WatchMultiKeyListeners(namespace string, handler MultiKeyListenerHandler) *MultiKeyListenerWatcher { + if !c.HasMultiKeyListener() { + c.logger.Warn("Cannot watch MultiKeyListeners; resource not installed") + return nil + } informer := skupperv2alpha1informer.NewMultiKeyListenerInformer( c.skupperClient, namespace, From 281ef95494c71f04da6b4168bc2a16e66525ed2a Mon Sep 17 00:00:00 2001 From: Christian Kruse Date: Mon, 16 Feb 2026 14:33:13 -0800 Subject: [PATCH 11/13] Implement MultiKeyListeners for system sites - Add MultiKeyListener to inputs - Wire up MultiKeyListener to SiteState for router config and status updates - Extend system apply and system delete commands for MultiKeyListener resources. Signed-off-by: Christian Kruse --- internal/cmd/skupper/common/constants.go | 21 ++-- .../skupper/system/nonkube/system_apply.go | 46 ++++--- .../skupper/system/nonkube/system_delete.go | 48 +++++--- internal/nonkube/client/fs/input_parser.go | 26 ++-- .../client/fs/multikeylistener_handler.go | 112 ++++++++++++++++++ .../nonkube/common/fs_config_renderer_test.go | 22 +++- internal/nonkube/common/site_state_loader.go | 5 + .../nonkube/common/site_state_loader_test.go | 5 + .../common/site_state_renderer_common.go | 3 + .../common/site_state_renderer_common_test.go | 2 + .../nonkube/common/site_state_validator.go | 40 +++++++ .../common/site_state_validator_test.go | 61 ++++++++++ pkg/nonkube/api/site_state.go | 85 +++++++++---- 13 files changed, 396 insertions(+), 80 deletions(-) create mode 100644 internal/nonkube/client/fs/multikeylistener_handler.go diff --git a/internal/cmd/skupper/common/constants.go b/internal/cmd/skupper/common/constants.go index 95cfe3945..66cec1bb2 100644 --- a/internal/cmd/skupper/common/constants.go +++ b/internal/cmd/skupper/common/constants.go @@ -11,16 +11,17 @@ var ( ) const ( - Connectors string = "Connector" - Listeners string = "Listener" - Sites string = "Site" - RouterAccesses string = "RouterAccess" - Links string = "Link" - AccessTokens string = "AccessToken" - Secrets string = "Secret" - ConfigMaps string = "ConfigMap" - Certificates string = "Certificate" - SecuredAccesses string = "SecuredAccess" + Connectors string = "Connector" + Listeners string = "Listener" + Sites string = "Site" + RouterAccesses string = "RouterAccess" + Links string = "Link" + AccessTokens string = "AccessToken" + Secrets string = "Secret" + ConfigMaps string = "ConfigMap" + Certificates string = "Certificate" + SecuredAccesses string = "SecuredAccess" + MultiKeyListeners string = "MultiKeyListener" ) const ( diff --git a/internal/cmd/skupper/system/nonkube/system_apply.go b/internal/cmd/skupper/system/nonkube/system_apply.go index c0934ec4f..7b6c1fdbc 100644 --- a/internal/cmd/skupper/system/nonkube/system_apply.go +++ b/internal/cmd/skupper/system/nonkube/system_apply.go @@ -17,23 +17,24 @@ import ( ) type CmdSystemApply struct { - Client skupperv2alpha1.SkupperV2alpha1Interface - KubeClient kubernetes.Interface - CobraCmd *cobra.Command - Namespace string - Flags *common.CommandSystemApplyFlags - ParseInput func(namespace string, reader *bufio.Reader, result *fs.InputFileResource) error - siteHandler *fs.SiteHandler - connectorHandler *fs.ConnectorHandler - listenerHandler *fs.ListenerHandler - linkHandler *fs.LinkHandler - routerAccessHandler *fs.RouterAccessHandler - accessTokenHandler *fs.AccessTokenHandler - certificateHandler *fs.CertificateHandler - securedAccessHandler *fs.SecuredAccessHandler - secretHandler *fs.SecretHandler - file string - logger *slog.Logger + Client skupperv2alpha1.SkupperV2alpha1Interface + KubeClient kubernetes.Interface + CobraCmd *cobra.Command + Namespace string + Flags *common.CommandSystemApplyFlags + ParseInput func(namespace string, reader *bufio.Reader, result *fs.InputFileResource) error + siteHandler *fs.SiteHandler + connectorHandler *fs.ConnectorHandler + listenerHandler *fs.ListenerHandler + multiKeyListenerHandler *fs.MultiKeyListenerHandler + linkHandler *fs.LinkHandler + routerAccessHandler *fs.RouterAccessHandler + accessTokenHandler *fs.AccessTokenHandler + certificateHandler *fs.CertificateHandler + securedAccessHandler *fs.SecuredAccessHandler + secretHandler *fs.SecretHandler + file string + logger *slog.Logger } func NewCmdSystemApply() *CmdSystemApply { @@ -52,6 +53,7 @@ func (cmd *CmdSystemApply) NewClient(cobraCommand *cobra.Command, args []string) cmd.connectorHandler = fs.NewConnectorHandler(cmd.Namespace) cmd.listenerHandler = fs.NewListenerHandler(cmd.Namespace) + cmd.multiKeyListenerHandler = fs.NewMultiKeyListenerHandler(cmd.Namespace) cmd.linkHandler = fs.NewLinkHandler(cmd.Namespace) cmd.routerAccessHandler = fs.NewRouterAccessHandler(cmd.Namespace) cmd.accessTokenHandler = fs.NewAccessTokenHandler(cmd.Namespace) @@ -163,6 +165,16 @@ func (cmd *CmdSystemApply) Run() error { } } + for _, multiKeyListener := range parsedInput.MultiKeyListener { + err := cmd.multiKeyListenerHandler.Add(multiKeyListener) + if err != nil { + cmd.logger.Error("Error while adding multi key listener", slog.String("multikeylistener", multiKeyListener.Name), slog.Any("error", err)) + } else { + crApplied = true + fmt.Printf("MultiKeyListener %s added\n", multiKeyListener.Name) + } + } + for _, link := range parsedInput.Link { err := cmd.linkHandler.Add(link) if err != nil { diff --git a/internal/cmd/skupper/system/nonkube/system_delete.go b/internal/cmd/skupper/system/nonkube/system_delete.go index 125c0f47c..a36ad3bdc 100644 --- a/internal/cmd/skupper/system/nonkube/system_delete.go +++ b/internal/cmd/skupper/system/nonkube/system_delete.go @@ -17,23 +17,24 @@ import ( ) type CmdSystemDelete struct { - Client skupperv2alpha1.SkupperV2alpha1Interface - KubeClient kubernetes.Interface - CobraCmd *cobra.Command - Namespace string - Flags *common.CommandSystemDeleteFlags - ParseInput func(namespace string, reader *bufio.Reader, result *fs.InputFileResource) error - siteHandler *fs.SiteHandler - connectorHandler *fs.ConnectorHandler - listenerHandler *fs.ListenerHandler - linkHandler *fs.LinkHandler - routerAccessHandler *fs.RouterAccessHandler - accessTokenHandler *fs.AccessTokenHandler - certificateHandler *fs.CertificateHandler - securedAccessHandler *fs.SecuredAccessHandler - secretHandler *fs.SecretHandler - file string - logger *slog.Logger + Client skupperv2alpha1.SkupperV2alpha1Interface + KubeClient kubernetes.Interface + CobraCmd *cobra.Command + Namespace string + Flags *common.CommandSystemDeleteFlags + ParseInput func(namespace string, reader *bufio.Reader, result *fs.InputFileResource) error + siteHandler *fs.SiteHandler + connectorHandler *fs.ConnectorHandler + listenerHandler *fs.ListenerHandler + multiKeyListenerHandler *fs.MultiKeyListenerHandler + linkHandler *fs.LinkHandler + routerAccessHandler *fs.RouterAccessHandler + accessTokenHandler *fs.AccessTokenHandler + certificateHandler *fs.CertificateHandler + securedAccessHandler *fs.SecuredAccessHandler + secretHandler *fs.SecretHandler + file string + logger *slog.Logger } func NewCmdSystemDelete() *CmdSystemDelete { @@ -52,6 +53,7 @@ func (cmd *CmdSystemDelete) NewClient(cobraCommand *cobra.Command, args []string cmd.connectorHandler = fs.NewConnectorHandler(cmd.Namespace) cmd.listenerHandler = fs.NewListenerHandler(cmd.Namespace) + cmd.multiKeyListenerHandler = fs.NewMultiKeyListenerHandler(cmd.Namespace) cmd.linkHandler = fs.NewLinkHandler(cmd.Namespace) cmd.routerAccessHandler = fs.NewRouterAccessHandler(cmd.Namespace) cmd.accessTokenHandler = fs.NewAccessTokenHandler(cmd.Namespace) @@ -168,6 +170,18 @@ func (cmd *CmdSystemDelete) Run() error { } } + for _, multiKeyListener := range parsedInput.MultiKeyListener { + if multiKeyListener.Name != "" { + err := cmd.multiKeyListenerHandler.Delete(multiKeyListener.Name) + if err != nil { + cmd.logger.Error("Error while deleting multi key listener", slog.String("multikeylistener", multiKeyListener.Name), slog.Any("error", err)) + } else { + crDeleted = true + fmt.Printf("MultiKeyListener %s deleted\n", multiKeyListener.Name) + } + } + } + for _, link := range parsedInput.Link { if link.Name != "" { err := cmd.linkHandler.Delete(link.Name) diff --git a/internal/nonkube/client/fs/input_parser.go b/internal/nonkube/client/fs/input_parser.go index bb20d1bb6..206e854ee 100644 --- a/internal/nonkube/client/fs/input_parser.go +++ b/internal/nonkube/client/fs/input_parser.go @@ -16,16 +16,17 @@ import ( ) type InputFileResource struct { - Site []v2alpha1.Site - Listener []v2alpha1.Listener - Connector []v2alpha1.Connector - RouterAccess []v2alpha1.RouterAccess - AccessGrant []v2alpha1.AccessGrant - Link []v2alpha1.Link - AccessToken []v2alpha1.AccessToken - Certificate []v2alpha1.Certificate - SecuredAccess []v2alpha1.SecuredAccess - Secret []corev1.Secret + Site []v2alpha1.Site + Listener []v2alpha1.Listener + Connector []v2alpha1.Connector + RouterAccess []v2alpha1.RouterAccess + AccessGrant []v2alpha1.AccessGrant + Link []v2alpha1.Link + AccessToken []v2alpha1.AccessToken + Certificate []v2alpha1.Certificate + SecuredAccess []v2alpha1.SecuredAccess + MultiKeyListener []v2alpha1.MultiKeyListener + Secret []corev1.Secret } func ParseInput(namespace string, reader *bufio.Reader, result *InputFileResource) error { @@ -98,6 +99,11 @@ func ParseInput(namespace string, reader *bufio.Reader, result *InputFileResourc convertTo(obj, &securedAccess) securedAccess.Namespace = namespace result.SecuredAccess = append(result.SecuredAccess, securedAccess) + case "MultiKeyListener": + var multiKeyListener v2alpha1.MultiKeyListener + convertTo(obj, &multiKeyListener) + multiKeyListener.Namespace = namespace + result.MultiKeyListener = append(result.MultiKeyListener, multiKeyListener) default: logInvalidResource(gvk) } diff --git a/internal/nonkube/client/fs/multikeylistener_handler.go b/internal/nonkube/client/fs/multikeylistener_handler.go new file mode 100644 index 000000000..5c77c7025 --- /dev/null +++ b/internal/nonkube/client/fs/multikeylistener_handler.go @@ -0,0 +1,112 @@ +package fs + +import ( + "os" + + "github.com/skupperproject/skupper/internal/cmd/skupper/common" + "github.com/skupperproject/skupper/pkg/apis/skupper/v2alpha1" +) + +type MultiKeyListenerHandler struct { + BaseCustomResourceHandler + pathProvider PathProvider +} + +func NewMultiKeyListenerHandler(namespace string) *MultiKeyListenerHandler { + return &MultiKeyListenerHandler{ + pathProvider: PathProvider{ + Namespace: namespace, + }, + } +} + +func (s *MultiKeyListenerHandler) Add(resource v2alpha1.MultiKeyListener) error { + + fileName := resource.Name + ".yaml" + content, err := s.EncodeToYaml(resource) + if err != nil { + return err + } + + err = s.WriteFile(s.pathProvider.GetNamespace(), fileName, content, common.MultiKeyListeners) + if err != nil { + return err + } + + return nil +} + +func (s *MultiKeyListenerHandler) Get(name string, opts GetOptions) (*v2alpha1.MultiKeyListener, error) { + var context v2alpha1.MultiKeyListener + fileName := name + ".yaml" + + if opts.RuntimeFirst == true { + // First read from runtime directory, where output is found after bootstrap + // has run. If no runtime multikeylisteners try and display configured ones + err, file := s.ReadFile(s.pathProvider.GetRuntimeNamespace(), fileName, common.MultiKeyListeners) + if err != nil { + if opts.LogWarning { + os.Stderr.WriteString("Site not initialized yet\n") + } + err, file = s.ReadFile(s.pathProvider.GetNamespace(), fileName, common.MultiKeyListeners) + if err != nil { + return nil, err + } + } + + if err = s.DecodeYaml(file, &context); err != nil { + return nil, err + } + } else { + // read from input directory to get latest config + err, file := s.ReadFile(s.pathProvider.GetNamespace(), fileName, common.MultiKeyListeners) + if err != nil { + return nil, err + } + if err := s.DecodeYaml(file, &context); err != nil { + return nil, err + } + } + + return &context, nil +} + +func (s *MultiKeyListenerHandler) Delete(name string) error { + fileName := name + ".yaml" + + if err := s.DeleteFile(s.pathProvider.GetNamespace(), fileName, common.MultiKeyListeners); err != nil { + return err + } + + return nil +} + +func (s *MultiKeyListenerHandler) List() ([]*v2alpha1.MultiKeyListener, error) { + var multiKeyListeners []*v2alpha1.MultiKeyListener + + // First read from runtime directory, where output is found after bootstrap + // has run. If no runtime multikeylisteners try and display configured ones + path := s.pathProvider.GetRuntimeNamespace() + err, files := s.ReadDir(path, common.MultiKeyListeners) + if err != nil { + os.Stderr.WriteString("Site not initialized yet\n") + path = s.pathProvider.GetNamespace() + err, files = s.ReadDir(path, common.MultiKeyListeners) + if err != nil { + return nil, err + } + } + + for _, file := range files { + err, mkl := s.ReadFile(path, file.Name(), common.MultiKeyListeners) + if err != nil { + return nil, err + } + var context v2alpha1.MultiKeyListener + if err = s.DecodeYaml(mkl, &context); err != nil { + return nil, err + } + multiKeyListeners = append(multiKeyListeners, &context) + } + return multiKeyListeners, nil +} diff --git a/internal/nonkube/common/fs_config_renderer_test.go b/internal/nonkube/common/fs_config_renderer_test.go index fadf18ed2..a5da25021 100644 --- a/internal/nonkube/common/fs_config_renderer_test.go +++ b/internal/nonkube/common/fs_config_renderer_test.go @@ -295,6 +295,26 @@ func fakeSiteState() *api.SiteState { Claims: make(map[string]*v2alpha1.AccessToken), Certificates: make(map[string]*v2alpha1.Certificate), SecuredAccesses: make(map[string]*v2alpha1.SecuredAccess), - ConfigMaps: make(map[string]*corev1.ConfigMap), + MultiKeyListeners: map[string]*v2alpha1.MultiKeyListener{ + "mkl-one": { + TypeMeta: metav1.TypeMeta{ + Kind: "MultiKeyListener", + APIVersion: "skupper.io/v2alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "mkl-one", + }, + Spec: v2alpha1.MultiKeyListenerSpec{ + Host: "10.0.0.3", + Port: 5678, + Strategy: v2alpha1.MultiKeyListenerStrategy{ + Priority: &v2alpha1.PriorityStrategySpec{ + RoutingKeys: []string{"key-primary", "key-secondary"}, + }, + }, + }, + }, + }, + ConfigMaps: make(map[string]*corev1.ConfigMap), } } diff --git a/internal/nonkube/common/site_state_loader.go b/internal/nonkube/common/site_state_loader.go index 6073b4565..f805fe559 100644 --- a/internal/nonkube/common/site_state_loader.go +++ b/internal/nonkube/common/site_state_loader.go @@ -85,6 +85,7 @@ func GetNamespacesFound(s *api.SiteState) []string { addNamespacesFromMap(s.Claims, nsMap) addNamespacesFromMap(s.Certificates, nsMap) addNamespacesFromMap(s.SecuredAccesses, nsMap) + addNamespacesFromMap(s.MultiKeyListeners, nsMap) addNamespacesFromMap(s.ConfigMaps, nsMap) for ns := range nsMap { namespaces = append(namespaces, ns) @@ -154,6 +155,10 @@ func LoadIntoSiteState(reader *bufio.Reader, siteState *api.SiteState) error { var securedAccess v2alpha1.SecuredAccess runtime.DefaultUnstructuredConverter.FromUnstructured(obj.(runtime.Unstructured).UnstructuredContent(), &securedAccess) siteState.SecuredAccesses[securedAccess.Name] = &securedAccess + case "MultiKeyListener": + var mkl v2alpha1.MultiKeyListener + runtime.DefaultUnstructuredConverter.FromUnstructured(obj.(runtime.Unstructured).UnstructuredContent(), &mkl) + siteState.MultiKeyListeners[mkl.Name] = &mkl default: logInvalidResource(gvk) } diff --git a/internal/nonkube/common/site_state_loader_test.go b/internal/nonkube/common/site_state_loader_test.go index 3c20afb90..b221cbf4d 100644 --- a/internal/nonkube/common/site_state_loader_test.go +++ b/internal/nonkube/common/site_state_loader_test.go @@ -28,4 +28,9 @@ func TestFileSystemSiteStateLoder(t *testing.T) { assert.Assert(t, err) assert.Assert(t, loadedSiteState != nil) assert.Equal(t, string(loadedSiteState.Site.ObjectMeta.UID), loadedSiteState.SiteId) + assert.Equal(t, len(loadedSiteState.MultiKeyListeners), len(ss.MultiKeyListeners)) + for name := range ss.MultiKeyListeners { + _, ok := loadedSiteState.MultiKeyListeners[name] + assert.Assert(t, ok, "MultiKeyListener %q not found in loaded state", name) + } } diff --git a/internal/nonkube/common/site_state_renderer_common.go b/internal/nonkube/common/site_state_renderer_common.go index 237d0c8d3..039f8cb39 100644 --- a/internal/nonkube/common/site_state_renderer_common.go +++ b/internal/nonkube/common/site_state_renderer_common.go @@ -21,6 +21,7 @@ func CopySiteState(siteState *api.SiteState) *api.SiteState { activeSiteState.Links = copySiteStateMap(siteState.Links) activeSiteState.Grants = copySiteStateMap(siteState.Grants) activeSiteState.SecuredAccesses = copySiteStateMap(siteState.SecuredAccesses) + activeSiteState.MultiKeyListeners = copySiteStateMap(siteState.MultiKeyListeners) activeSiteState.Certificates = copySiteStateMap(siteState.Certificates) activeSiteState.Secrets = copySiteStateMap(siteState.Secrets) activeSiteState.ConfigMaps = copySiteStateMap(siteState.ConfigMaps) @@ -51,6 +52,8 @@ func copySiteStateMap[T any](m map[string]T) map[string]T { c = vv.DeepCopy() case *v2alpha1.SecuredAccess: c = vv.DeepCopy() + case *v2alpha1.MultiKeyListener: + c = vv.DeepCopy() case *corev1.Secret: c = vv.DeepCopy() } diff --git a/internal/nonkube/common/site_state_renderer_common_test.go b/internal/nonkube/common/site_state_renderer_common_test.go index f2f8a5c52..90ce5cf03 100644 --- a/internal/nonkube/common/site_state_renderer_common_test.go +++ b/internal/nonkube/common/site_state_renderer_common_test.go @@ -21,6 +21,7 @@ func TestCopySiteState(t *testing.T) { assert.DeepEqual(t, ss.Claims, newSs.Claims) assert.DeepEqual(t, ss.Certificates, newSs.Certificates) assert.DeepEqual(t, ss.SecuredAccesses, newSs.SecuredAccesses) + assert.DeepEqual(t, ss.MultiKeyListeners, newSs.MultiKeyListeners) assert.DeepEqual(t, ss.ConfigMaps, newSs.ConfigMaps) assert.Assert(t, equalsButNotShallowCopy(ss.Listeners, newSs.Listeners)) assert.Assert(t, equalsButNotShallowCopy(ss.Connectors, newSs.Connectors)) @@ -31,6 +32,7 @@ func TestCopySiteState(t *testing.T) { assert.Assert(t, equalsButNotShallowCopy(ss.Claims, newSs.Claims)) assert.Assert(t, equalsButNotShallowCopy(ss.Certificates, newSs.Certificates)) assert.Assert(t, equalsButNotShallowCopy(ss.SecuredAccesses, newSs.SecuredAccesses)) + assert.Assert(t, equalsButNotShallowCopy(ss.MultiKeyListeners, newSs.MultiKeyListeners)) assert.Assert(t, equalsButNotShallowCopy(ss.ConfigMaps, newSs.ConfigMaps)) } diff --git a/internal/nonkube/common/site_state_validator.go b/internal/nonkube/common/site_state_validator.go index 64ca34728..c609c0361 100644 --- a/internal/nonkube/common/site_state_validator.go +++ b/internal/nonkube/common/site_state_validator.go @@ -51,6 +51,9 @@ func (s *SiteStateValidator) Validate(siteState *api.SiteState) error { if err = s.validateConnectors(siteState.Connectors); err != nil { return err } + if err = s.validateMultiKeyListeners(siteState.MultiKeyListeners, siteState.Listeners); err != nil { + return err + } return nil } @@ -173,6 +176,43 @@ func (s *SiteStateValidator) validateConnectors(connectors map[string]*v2alpha1. return nil } +func (s *SiteStateValidator) validateMultiKeyListeners(multiKeyListeners map[string]*v2alpha1.MultiKeyListener, listeners map[string]*v2alpha1.Listener) error { + // collect host:port pairs already used by listeners + hostPorts := map[string][]int{} + for _, listener := range listeners { + hostPorts[listener.Spec.Host] = append(hostPorts[listener.Spec.Host], listener.Spec.Port) + } + for name, mkl := range multiKeyListeners { + if err := ValidateName(mkl.Name); err != nil { + return fmt.Errorf("invalid multikeylistener name: %w", err) + } + if mkl.Spec.Host == "" || mkl.Spec.Port == 0 { + return fmt.Errorf("invalid multikeylistener: %s - host and port are required", mkl.Name) + } + ip := net.ParseIP(mkl.Spec.Host) + validHostname := hostnameRfc1123Regex.MatchString(mkl.Spec.Host) + if ip == nil && !validHostname { + return fmt.Errorf("invalid multikeylistener host: %s - a valid IP address or hostname is expected (multikeylistener: %q)", mkl.Spec.Host, name) + } + if slices.Contains(hostPorts[mkl.Spec.Host], mkl.Spec.Port) { + return fmt.Errorf("port %d is already mapped for host %q (multikeylistener: %q)", mkl.Spec.Port, mkl.Spec.Host, name) + } + hostPorts[mkl.Spec.Host] = append(hostPorts[mkl.Spec.Host], mkl.Spec.Port) + if mkl.Spec.Strategy.Priority == nil { + return fmt.Errorf("invalid multikeylistener: %s - strategy.priority is required", mkl.Name) + } + if len(mkl.Spec.Strategy.Priority.RoutingKeys) == 0 { + return fmt.Errorf("invalid multikeylistener: %s - routingKeys must not be empty", mkl.Name) + } + for _, key := range mkl.Spec.Strategy.Priority.RoutingKeys { + if key == "" { + return fmt.Errorf("invalid multikeylistener: %s - routingKey must not be empty", mkl.Name) + } + } + } + return nil +} + func ValidateName(name string) error { if !rfc1123Regex.MatchString(name) { return fmt.Errorf("invalid name %q: %s", name, rfc1123Error) diff --git a/internal/nonkube/common/site_state_validator_test.go b/internal/nonkube/common/site_state_validator_test.go index edff96a34..3be0f9be1 100644 --- a/internal/nonkube/common/site_state_validator_test.go +++ b/internal/nonkube/common/site_state_validator_test.go @@ -233,6 +233,67 @@ func TestSiteStateValidator_Validate(t *testing.T) { valid: false, errorContains: "invalid grant name: ", }, + { + info: "invalid-multikeylistener-name", + siteState: customize(func(siteState *api.SiteState) { + for _, mkl := range siteState.MultiKeyListeners { + mkl.Name = "bad_name" + } + }), + valid: false, + errorContains: "invalid multikeylistener name:", + }, + { + info: "invalid-multikeylistener-missing-host", + siteState: customize(func(siteState *api.SiteState) { + for _, mkl := range siteState.MultiKeyListeners { + mkl.Spec.Host = "" + } + }), + valid: false, + errorContains: "host and port are required", + }, + { + info: "invalid-multikeylistener-missing-port", + siteState: customize(func(siteState *api.SiteState) { + for _, mkl := range siteState.MultiKeyListeners { + mkl.Spec.Port = 0 + } + }), + valid: false, + errorContains: "host and port are required", + }, + { + info: "invalid-multikeylistener-missing-strategy", + siteState: customize(func(siteState *api.SiteState) { + for _, mkl := range siteState.MultiKeyListeners { + mkl.Spec.Strategy.Priority = nil + } + }), + valid: false, + errorContains: "strategy.priority is required", + }, + { + info: "invalid-multikeylistener-empty-routing-keys", + siteState: customize(func(siteState *api.SiteState) { + for _, mkl := range siteState.MultiKeyListeners { + mkl.Spec.Strategy.Priority.RoutingKeys = []string{} + } + }), + valid: false, + errorContains: "routingKeys must not be empty", + }, + { + info: "invalid-multikeylistener-port-conflict-with-listener", + siteState: customize(func(siteState *api.SiteState) { + for _, mkl := range siteState.MultiKeyListeners { + mkl.Spec.Host = "10.0.0.1" + mkl.Spec.Port = 1234 + } + }), + valid: false, + errorContains: "is already mapped for host", + }, { info: "valid-site-state", siteState: fakeSiteState(), diff --git a/pkg/nonkube/api/site_state.go b/pkg/nonkube/api/site_state.go index 93434c717..9bd929690 100644 --- a/pkg/nonkube/api/site_state.go +++ b/pkg/nonkube/api/site_state.go @@ -28,35 +28,37 @@ type StaticSiteStateRenderer interface { } type SiteState struct { - SiteId string - Site *v2alpha1.Site - Listeners map[string]*v2alpha1.Listener - Connectors map[string]*v2alpha1.Connector - RouterAccesses map[string]*v2alpha1.RouterAccess - Grants map[string]*v2alpha1.AccessGrant - Links map[string]*v2alpha1.Link - Claims map[string]*v2alpha1.AccessToken - Certificates map[string]*v2alpha1.Certificate - SecuredAccesses map[string]*v2alpha1.SecuredAccess - Secrets map[string]*corev1.Secret - ConfigMaps map[string]*corev1.ConfigMap - bundle bool + SiteId string + Site *v2alpha1.Site + Listeners map[string]*v2alpha1.Listener + Connectors map[string]*v2alpha1.Connector + RouterAccesses map[string]*v2alpha1.RouterAccess + Grants map[string]*v2alpha1.AccessGrant + Links map[string]*v2alpha1.Link + Claims map[string]*v2alpha1.AccessToken + Certificates map[string]*v2alpha1.Certificate + SecuredAccesses map[string]*v2alpha1.SecuredAccess + MultiKeyListeners map[string]*v2alpha1.MultiKeyListener + Secrets map[string]*corev1.Secret + ConfigMaps map[string]*corev1.ConfigMap + bundle bool } func NewSiteState(bundle bool) *SiteState { return &SiteState{ - Site: &v2alpha1.Site{}, - Listeners: make(map[string]*v2alpha1.Listener), - Connectors: make(map[string]*v2alpha1.Connector), - RouterAccesses: make(map[string]*v2alpha1.RouterAccess), - Grants: make(map[string]*v2alpha1.AccessGrant), - Links: make(map[string]*v2alpha1.Link), - Claims: make(map[string]*v2alpha1.AccessToken), - Certificates: make(map[string]*v2alpha1.Certificate), - SecuredAccesses: make(map[string]*v2alpha1.SecuredAccess), - Secrets: make(map[string]*corev1.Secret), - ConfigMaps: make(map[string]*corev1.ConfigMap), - bundle: bundle, + Site: &v2alpha1.Site{}, + Listeners: make(map[string]*v2alpha1.Listener), + Connectors: make(map[string]*v2alpha1.Connector), + RouterAccesses: make(map[string]*v2alpha1.RouterAccess), + Grants: make(map[string]*v2alpha1.AccessGrant), + Links: make(map[string]*v2alpha1.Link), + Claims: make(map[string]*v2alpha1.AccessToken), + Certificates: make(map[string]*v2alpha1.Certificate), + SecuredAccesses: make(map[string]*v2alpha1.SecuredAccess), + MultiKeyListeners: make(map[string]*v2alpha1.MultiKeyListener), + Secrets: make(map[string]*corev1.Secret), + ConfigMaps: make(map[string]*corev1.ConfigMap), + bundle: bundle, } } @@ -224,6 +226,16 @@ func (s *SiteState) CreateBridgeCertificates() { }) } } + for _, mkl := range s.MultiKeyListeners { + if mkl.Spec.TlsCredentials != "" { + s.Certificates[mkl.Spec.TlsCredentials] = s.newCertificate(mkl.Spec.TlsCredentials, &v2alpha1.CertificateSpec{ + Ca: caName, + Subject: mkl.Spec.Host, + Hosts: []string{mkl.Spec.Host}, + Server: true, + }) + } + } } func (s *SiteState) newCertificate(name string, spec *v2alpha1.CertificateSpec) *v2alpha1.Certificate { @@ -273,6 +285,10 @@ func (s *SiteState) bindings(sslProfileBasePath string) *site.Bindings { listener.SetConfigured(nil) _ = b.UpdateListener(name, listener) } + for name, mkl := range s.MultiKeyListeners { + mkl.SetConfigured(nil) + b.UpdateMultiKeyListener(name, mkl) + } return b } @@ -348,6 +364,7 @@ func (s *SiteState) SetNamespace(namespace string) { setNamespaceOnMap(s.Claims, namespace) setNamespaceOnMap(s.Certificates, namespace) setNamespaceOnMap(s.SecuredAccesses, namespace) + setNamespaceOnMap(s.MultiKeyListeners, namespace) setNamespaceOnMap(s.ConfigMaps, namespace) } @@ -381,6 +398,21 @@ func (s *SiteState) UpdateStatus(networkStatus network.NetworkStatusInfo) { for _, connector := range s.Connectors { connector.SetHasMatchingListener(network.HasMatchingPair(networkStatus, connector.Spec.RoutingKey)) } + for _, mkl := range s.MultiKeyListeners { + hasDestination := false + var reachableKeys []string + for _, routingKey := range mkl.GetRoutingKeys() { + for _, addr := range networkStatus.Addresses { + if addr.Name == routingKey && addr.ConnectorCount > 0 { + hasDestination = true + reachableKeys = append(reachableKeys, routingKey) + break + } + } + } + mkl.SetHasDestination(hasDestination) + mkl.SetRoutingKeysReachable(reachableKeys) + } } func marshal(outputDirectory, resourceType, resourceName string, resource interface{}) error { @@ -445,6 +477,9 @@ func MarshalSiteState(siteState SiteState, outputDirectory string) error { if err = marshalMap(outputDirectory, "SecuredAccess", siteState.SecuredAccesses); err != nil { return err } + if err = marshalMap(outputDirectory, "MultiKeyListener", siteState.MultiKeyListeners); err != nil { + return err + } if err = marshalMap(outputDirectory, "Secret", siteState.Secrets); err != nil { return err } From 35158fde1961742a52fda63cbd7e595b19358427 Mon Sep 17 00:00:00 2001 From: Christian Kruse Date: Tue, 17 Feb 2026 20:06:51 -0800 Subject: [PATCH 12/13] Include sample MultiKeyListener Adds a sample for operator bundles Signed-off-by: Christian Kruse --- config/samples/kustomization.yaml | 1 + .../samples/skupper_v2alpha1_multikeylistener.yaml | 12 ++++++++++++ 2 files changed, 13 insertions(+) create mode 100644 config/samples/skupper_v2alpha1_multikeylistener.yaml diff --git a/config/samples/kustomization.yaml b/config/samples/kustomization.yaml index 7fab14bf9..68f246dc2 100644 --- a/config/samples/kustomization.yaml +++ b/config/samples/kustomization.yaml @@ -8,6 +8,7 @@ resources: - skupper_v2alpha1_connector.yaml - skupper_v2alpha1_link.yaml - skupper_v2alpha1_listener.yaml +- skupper_v2alpha1_multikeylistener.yaml - skupper_v2alpha1_router_access.yaml - skupper_v2alpha1_secured_access.yaml - skupper_v2alpha1_site.yaml diff --git a/config/samples/skupper_v2alpha1_multikeylistener.yaml b/config/samples/skupper_v2alpha1_multikeylistener.yaml new file mode 100644 index 000000000..9d1588ff2 --- /dev/null +++ b/config/samples/skupper_v2alpha1_multikeylistener.yaml @@ -0,0 +1,12 @@ +apiVersion: skupper.io/v2alpha1 +kind: MultiKeyListener +metadata: + name: backend +spec: + host: backend + port: 8080 + strategy: + priority: + routingKeys: + - backend-primary + - backend-secondary From 971f20be4956c8385ac2f5567b408fd581f90438 Mon Sep 17 00:00:00 2001 From: Christian Kruse Date: Wed, 18 Feb 2026 08:52:05 -0800 Subject: [PATCH 13/13] Rename router configuration options multiAddressStrategy=priority instead of priorityFailover listenerAddress.listener instead of listenerRef Signed-off-by: Christian Kruse --- internal/qdr/amqp_mgmt.go | 8 ++++---- internal/qdr/qdr.go | 16 ++++++++-------- internal/site/multikeylistener.go | 12 ++++++------ 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/internal/qdr/amqp_mgmt.go b/internal/qdr/amqp_mgmt.go index f8778b356..cb1bad100 100644 --- a/internal/qdr/amqp_mgmt.go +++ b/internal/qdr/amqp_mgmt.go @@ -147,10 +147,10 @@ func asTcpEndpoint(record Record) TcpEndpoint { func asListenerAddress(record Record) ListenerAddress { return ListenerAddress{ - Name: record.AsString("name"), - Address: record.AsString("address"), - Value: record.AsInt("value"), - ListenerRef: record.AsString("listenerRef"), + Name: record.AsString("name"), + Address: record.AsString("address"), + Value: record.AsInt("value"), + Listener: record.AsString("listener"), } } diff --git a/internal/qdr/qdr.go b/internal/qdr/qdr.go index 4fdf125b6..2ab5ec0dd 100644 --- a/internal/qdr/qdr.go +++ b/internal/qdr/qdr.go @@ -594,10 +594,10 @@ type TcpEndpoint struct { } type ListenerAddress struct { - Name string `json:"name,omitempty"` - Address string `json:"address,omitempty"` - Value int `json:"value"` - ListenerRef string `json:"listenerRef,omitempty"` + Name string `json:"name,omitempty"` + Address string `json:"address,omitempty"` + Value int `json:"value"` + Listener string `json:"listener,omitempty"` } func (e ListenerAddress) toRecord() Record { @@ -609,8 +609,8 @@ func (e ListenerAddress) toRecord() Record { result["address"] = e.Address } result["value"] = e.Value - if e.ListenerRef != "" { - result["listenerRef"] = e.ListenerRef + if e.Listener != "" { + result["listener"] = e.Listener } return result } @@ -1114,12 +1114,12 @@ func (a *BridgeConfig) Difference(b *BridgeConfig) *BridgeConfigDifference { alreadyAddedLA[la.Name] = true } for _, la := range a.ListenerAddresses { - if deletedListeners[la.ListenerRef] && !alreadyDeletedLA[la.Name] { + if deletedListeners[la.Listener] && !alreadyDeletedLA[la.Name] { result.ListenerAddresses.Deleted = append(result.ListenerAddresses.Deleted, la.Name) } } for _, la := range b.ListenerAddresses { - if deletedListeners[la.ListenerRef] && !alreadyAddedLA[la.Name] { + if deletedListeners[la.Listener] && !alreadyAddedLA[la.Name] { result.ListenerAddresses.Added = append(result.ListenerAddresses.Added, la) } } diff --git a/internal/site/multikeylistener.go b/internal/site/multikeylistener.go index 233ab047d..f212c8ea7 100644 --- a/internal/site/multikeylistener.go +++ b/internal/site/multikeylistener.go @@ -37,7 +37,7 @@ func UpdateBridgeConfigForMultiKeyListenerWithHostAndPort(siteId string, mkl *sk Host: host, Port: strconv.Itoa(port), SslProfile: mkl.Spec.TlsCredentials, - MultiAddressStrategy: "priorityFailover", + MultiAddressStrategy: "priority", AuthenticatePeer: mkl.Spec.RequireClientCert, }) @@ -47,10 +47,10 @@ func UpdateBridgeConfigForMultiKeyListenerWithHostAndPort(siteId string, mkl *sk for i, routingKey := range mkl.Spec.Strategy.Priority.RoutingKeys { laName := listenerAddressName(name, routingKey) config.AddListenerAddress(qdr.ListenerAddress{ - Name: laName, - Address: routingKey, - Value: numKeys - 1 - i, // higher value = higher priority - ListenerRef: tcpListenerName, + Name: laName, + Address: routingKey, + Value: numKeys - 1 - i, // higher value = higher priority + Listener: tcpListenerName, }) } } @@ -63,7 +63,7 @@ func RemoveBridgeConfigForMultiKeyListener(name string, config *qdr.BridgeConfig tcpListenerName := multiAddressTcpListenerName(name) // Remove all listenerAddresses that reference this listener for laName, la := range config.ListenerAddresses { - if la.ListenerRef == tcpListenerName { + if la.Listener == tcpListenerName { config.RemoveListenerAddress(laName) } }