diff --git a/.ci/clusters/values_skywalking_e2e_cluster.yaml b/.ci/clusters/values_skywalking_e2e_cluster.yaml index 99824113e..08a7632d8 100644 --- a/.ci/clusters/values_skywalking_e2e_cluster.yaml +++ b/.ci/clusters/values_skywalking_e2e_cluster.yaml @@ -31,6 +31,7 @@ components: pulsar_manager: false sql_worker: false proxy: false + kop: true ## disable monitoring stack monitoring: @@ -63,7 +64,7 @@ zookeeper: cpu: 50m bookkeeper: - replicaCount: 0 + replicaCount: 1 metadata: image: repository: streamnative/sn-platform @@ -113,6 +114,14 @@ broker: ## Enable `autoSkipNonRecoverableData` since bookkeeper is running ## without persistence autoSkipNonRecoverableData: "true" + messagingProtocols: "kafka" + protocolHandlerDirectory: "./protocols" + PULSAR_PREFIX_kafkaTransactionCoordinatorEnabled: "true" + PULSAR_PREFIX_brokerDeduplicationEnabled: "true" + PULSAR_PREFIX_kafkaListeners: "PLAINTEXT://0.0.0.0:9092" + PULSAR_PREFIX_kafkaAdvertisedListeners: "PLAINTEXT://sn-platform-pulsar-broker-0.sn-platform-pulsar-broker.default.svc.cluster.local:9092" + transactionCoordinatorEnabled: "true" + systemTopicEnabled: "true" # storage settings managedLedgerDefaultEnsembleSize: "1" managedLedgerDefaultWriteQuorum: "1" diff --git a/.ci/helm.sh b/.ci/helm.sh index 692cb97af..0a635f5de 100644 --- a/.ci/helm.sh +++ b/.ci/helm.sh @@ -348,10 +348,19 @@ function ci::verify_backlog() { topic=$1 sub=$2 expected=$3 - BACKLOG=$(kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin topics stats $topic | grep msgBacklog) - if [[ "$BACKLOG" == *"\"msgBacklog\" : $expected"* ]]; then - return 0 - fi + for attempt in $(seq 1 30); do + if BACKLOG=$(kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin topics stats "$topic" 2>&1); then + true + elif BACKLOG=$(kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin topics stats "${topic}-partition-0" 2>&1); then + true + fi + if [[ "$BACKLOG" == *"\"msgBacklog\" : $expected"* ]]; then + return 0 + fi + echo "Backlog for ${topic} subscription ${sub} is not ${expected}, retry ${attempt}/30" + echo "$BACKLOG" + sleep 2 + done return 1 } @@ -371,6 +380,121 @@ function ci::verify_exclamation_function() { return 1 } +function ci::ensure_kafka_topic() { + topic=$1 + kafka_bootstrap_server=$2 + properties_file=$3 + for attempt in $(seq 1 30); do + if kubectl exec -n ${NAMESPACE} kafka-client -- kafka-topics.sh \ + --bootstrap-server "${kafka_bootstrap_server}" \ + --command-config "/opt/bitnami/kafka/config/${properties_file}" \ + --list > /dev/null 2>&1; then + break + fi + if [ "${attempt}" -eq 30 ]; then + echo "Kafka broker ${kafka_bootstrap_server} is not ready" + kubectl get pods -n ${NAMESPACE} -o wide || true + return 1 + fi + sleep 5 + done + kubectl exec -n ${NAMESPACE} kafka-client -- kafka-topics.sh \ + --bootstrap-server "${kafka_bootstrap_server}" \ + --create \ + --if-not-exists \ + --topic "${topic}" \ + --replication-factor 1 \ + --partitions 1 \ + --command-config "/opt/bitnami/kafka/config/${properties_file}" +} + +function ci::wait_kafka_topic_ready() { + topic=$1 + kafka_bootstrap_server=$2 + properties_file=$3 + for attempt in $(seq 1 30); do + if kubectl exec -n ${NAMESPACE} kafka-client -- kafka-topics.sh \ + --bootstrap-server "${kafka_bootstrap_server}" \ + --describe \ + --topic "${topic}" \ + --command-config "/opt/bitnami/kafka/config/${properties_file}"; then + return 0 + fi + echo "Kafka topic ${topic} is not ready, retry ${attempt}/30" + sleep 5 + done + return 1 +} + +function ci::wait_kafka_group_coordinator_ready() { + consumer_group=$1 + kafka_bootstrap_server=$2 + properties_file=$3 + output_file=$(mktemp) + for attempt in $(seq 1 30); do + if kubectl exec -n ${NAMESPACE} kafka-client -- kafka-consumer-groups.sh \ + --bootstrap-server "${kafka_bootstrap_server}" \ + --describe \ + --group "${consumer_group}" \ + --command-config "/opt/bitnami/kafka/config/${properties_file}" > "${output_file}" 2>&1; then + cat "${output_file}" + rm -f "${output_file}" + return 0 + fi + cat "${output_file}" + if ! grep -q "CoordinatorLoadInProgress" "${output_file}"; then + if grep -Eiq "does not exist|not exist|not found|no active members" "${output_file}"; then + rm -f "${output_file}" + return 0 + fi + rm -f "${output_file}" + return 1 + fi + echo "Kafka consumer group coordinator for ${consumer_group} is loading, retry ${attempt}/30" + sleep 5 + done + rm -f "${output_file}" + return 1 +} + +function ci::verify_kafka_exclamation_function() { + inputtopic=$1 + outputtopic=$2 + inputmessage=$3 + outputmessage=$4 + kafka_bootstrap_server=$5 + properties_file=$6 + consumer_group="function-mesh-${RANDOM}-$(date +%s)" + output_file=$(mktemp) + + kubectl exec -n ${NAMESPACE} kafka-client -- kafka-console-consumer.sh \ + --bootstrap-server "${kafka_bootstrap_server}" \ + --consumer.config "/opt/bitnami/kafka/config/${properties_file}" \ + --topic "${outputtopic}" \ + --group "${consumer_group}" \ + --timeout-ms 30000 \ + --max-messages 1 > "${output_file}" & + consumer_pid=$! + + sleep 3 + kubectl exec -n ${NAMESPACE} kafka-client -- bash -c \ + "echo \"${inputmessage}\" | kafka-console-producer.sh --bootstrap-server ${kafka_bootstrap_server} --producer.config /opt/bitnami/kafka/config/${properties_file} --topic ${inputtopic}" + + if ! wait "${consumer_pid}"; then + cat "${output_file}" || true + rm -f "${output_file}" + return 1 + fi + + MESSAGE=$(cat "${output_file}") + rm -f "${output_file}" + echo "$MESSAGE" + if [[ "$MESSAGE" == *"$outputmessage"* ]]; then + return 0 + fi + return 1 +} + function ci::verify_exclamation_function_with_auth() { inputtopic=$1 outputtopic=$2 diff --git a/.ci/tests/integration/cases/crypto-function/manifests.yaml b/.ci/tests/integration/cases/crypto-function/manifests.yaml index dce1d4209..3c233ed3e 100644 --- a/.ci/tests/integration/cases/crypto-function/manifests.yaml +++ b/.ci/tests/integration/cases/crypto-function/manifests.yaml @@ -16,6 +16,7 @@ spec: typeClassName: java.lang.String sourceSpecs: "persistent://public/default/java-function-crypto-input-topic": + schemaType: STRING cryptoConfig: cryptoKeyReaderClassName: "org.apache.pulsar.functions.api.examples.RawFileKeyReader" cryptoKeyReaderConfig: diff --git a/.ci/tests/integration/cases/crypto-function/verify.sh b/.ci/tests/integration/cases/crypto-function/verify.sh index 9510b2c3d..5e935b40d 100644 --- a/.ci/tests/integration/cases/crypto-function/verify.sh +++ b/.ci/tests/integration/cases/crypto-function/verify.sh @@ -33,19 +33,69 @@ if [ ! "$KUBECONFIG" ]; then fi manifests_file="${BASE_DIR}"/.ci/tests/integration/cases/crypto-function/manifests.yaml -kubectl apply -f ${manifests_file} > /dev/null 2>&1 +input_topic="persistent://public/default/java-function-crypto-input-topic" +output_topic="persistent://public/default/java-function-crypto-output-topic" -verify_fm_result=$(ci::verify_function_mesh java-function-crypto-sample 2>&1) -if [ $? -ne 0 ]; then - echo "$verify_fm_result" +function delete_topic() { + topic=$1 + kubectl exec -n "${PULSAR_NAMESPACE}" "${PULSAR_RELEASE_NAME}"-pulsar-broker-0 -- bin/pulsar-admin topics delete "${topic}" -f > /dev/null 2>&1 || true + kubectl exec -n "${PULSAR_NAMESPACE}" "${PULSAR_RELEASE_NAME}"-pulsar-broker-0 -- bin/pulsar-admin topics delete-partitioned-topic "${topic}" -f > /dev/null 2>&1 || true + kubectl exec -n "${PULSAR_NAMESPACE}" "${PULSAR_RELEASE_NAME}"-pulsar-broker-0 -- bin/pulsar-admin topics delete "${topic}-partition-0" -f > /dev/null 2>&1 || true + kubectl exec -n "${PULSAR_NAMESPACE}" "${PULSAR_RELEASE_NAME}"-pulsar-broker-0 -- bin/pulsar-admin schemas delete "${topic}" > /dev/null 2>&1 || true +} + +function cleanup() { kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true + delete_topic "${input_topic}" + delete_topic "${output_topic}" +} + +function upload_string_schema() { + kubectl exec -n "${PULSAR_NAMESPACE}" "${PULSAR_RELEASE_NAME}"-pulsar-broker-0 -- sh -c \ + 'printf "%s\n" "{\"type\":\"STRING\",\"schema\":\"\",\"properties\":{}}" > /tmp/function-mesh-string-schema.json && bin/pulsar-admin schemas upload --filename /tmp/function-mesh-string-schema.json "$1"' \ + sh "${input_topic}" > /dev/null +} + +function wait_string_schema() { + for attempt in $(seq 1 30); do + if kubectl exec -n "${PULSAR_NAMESPACE}" "${PULSAR_RELEASE_NAME}"-pulsar-broker-0 -- \ + bin/pulsar-admin schemas get "${input_topic}" > /dev/null 2>&1; then + return 0 + fi + echo "Schema for ${input_topic} is not ready, retry ${attempt}/30" >&2 + sleep 2 + done + return 1 +} + +trap cleanup EXIT +cleanup + +if ! create_topic_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::create_topic "${input_topic}" 2>&1); then + echo "$create_topic_result" >&2 + exit 1 +fi + +if ! upload_schema_result=$(upload_string_schema 2>&1); then + echo "$upload_schema_result" >&2 exit 1 fi -verify_crypto_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_crypto_function 2>&1) -if [ $? -eq 0 ]; then +if ! wait_schema_result=$(wait_string_schema 2>&1); then + echo "$wait_schema_result" >&2 + exit 1 +fi + +kubectl apply -f "${manifests_file}" > /dev/null 2>&1 + +if ! verify_fm_result=$(ci::verify_function_mesh java-function-crypto-sample 2>&1); then + echo "$verify_fm_result" >&2 + exit 1 +fi + +if verify_crypto_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_crypto_function 2>&1); then echo "e2e-test: ok" | yq eval - else - echo "$verify_crypto_result" + echo "$verify_crypto_result" >&2 + exit 1 fi -kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true diff --git a/.ci/tests/integration/cases/generic-kafka-function/kafka-client.yaml b/.ci/tests/integration/cases/generic-kafka-function/kafka-client.yaml new file mode 100644 index 000000000..50e46039f --- /dev/null +++ b/.ci/tests/integration/cases/generic-kafka-function/kafka-client.yaml @@ -0,0 +1,29 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: generic-kafka-client-config + namespace: default +data: + kafka.properties: | + security.protocol=PLAINTEXT +--- +apiVersion: v1 +kind: Pod +metadata: + name: kafka-client + namespace: default +spec: + containers: + - name: kafka + image: bitnamilegacy/kafka:3.4.1 + command: ["/bin/sh", "-c", "--"] + args: ["while true; do sleep 30; done;"] + volumeMounts: + - name: config-volume + mountPath: /opt/bitnami/kafka/config/kafka.properties + subPath: kafka.properties + readOnly: true + volumes: + - name: config-volume + configMap: + name: generic-kafka-client-config diff --git a/.ci/tests/integration/cases/generic-kafka-function/manifests.yaml b/.ci/tests/integration/cases/generic-kafka-function/manifests.yaml new file mode 100644 index 000000000..e7824692c --- /dev/null +++ b/.ci/tests/integration/cases/generic-kafka-function/manifests.yaml @@ -0,0 +1,56 @@ +apiVersion: compute.functionmesh.io/v1alpha1 +kind: Function +metadata: + name: generic-kafka-function + namespace: default +spec: + image: streamnative/pulsar-functions-generic-python-runner:latest + className: exclamation_function.ExclamationFunction + forwardSourceMessageProperty: true + replicas: 1 + input: + topics: + - input-kafka-topic + typeClassName: string + output: + topic: output-kafka-topic + typeClassName: string + resources: + requests: + cpu: 100m + memory: 256Mi + limits: + cpu: 500m + memory: 512Mi + kafka: + bootstrapServers: sn-platform-pulsar-broker-0.sn-platform-pulsar-broker.default.svc.cluster.local:9092 + consumerConfig: + auto.offset.reset: earliest + inputSchemaConfigs: + input-kafka-topic: + type: string + outputSchemaConfig: + type: string + genericRuntime: + functionFile: exclamation_function.py + functionFileLocation: function://public/default/test-py-function + language: python + pulsarPackageService: + pulsarConfig: generic-kafka-package-service + clusterName: test + subscriptionName: generic-kafka-function-sub + subscriptionPosition: earliest + autoAck: true + pod: + env: + - name: RUST_LOG + value: info +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: generic-kafka-package-service + namespace: default +data: + webServiceURL: http://sn-platform-pulsar-broker.default.svc.cluster.local:8080 + brokerServiceURL: pulsar://sn-platform-pulsar-broker.default.svc.cluster.local:6650 diff --git a/.ci/tests/integration/cases/generic-kafka-function/verify.sh b/.ci/tests/integration/cases/generic-kafka-function/verify.sh new file mode 100755 index 000000000..9468ed1c9 --- /dev/null +++ b/.ci/tests/integration/cases/generic-kafka-function/verify.sh @@ -0,0 +1,85 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +set -e + +E2E_DIR=$(dirname "$0") +BASE_DIR=$(cd "${E2E_DIR}"/../../../../..; pwd) +PULSAR_NAMESPACE=${PULSAR_NAMESPACE:-"default"} +PULSAR_RELEASE_NAME=${PULSAR_RELEASE_NAME:-"sn-platform"} +E2E_KUBECONFIG=${E2E_KUBECONFIG:-"/tmp/e2e-k8s.config"} + +source "${BASE_DIR}"/.ci/helm.sh + +if [ ! "$KUBECONFIG" ]; then + export KUBECONFIG=${E2E_KUBECONFIG} +fi + +manifests_file="${BASE_DIR}"/.ci/tests/integration/cases/generic-kafka-function/manifests.yaml +kafka_client_file="${BASE_DIR}"/.ci/tests/integration/cases/generic-kafka-function/kafka-client.yaml +kafka_bootstrap_server=sn-platform-pulsar-broker-0.sn-platform-pulsar-broker.default.svc.cluster.local:9092 +kafka_properties_file=kafka.properties +input_topic=input-kafka-topic +output_topic=output-kafka-topic +consumer_group=generic-kafka-function-sub +input_message="test-message-${RANDOM}-$(date +%s)" +output_message="${input_message}!" + +kubectl apply -f "${kafka_client_file}" > /dev/null 2>&1 +kubectl wait pod kafka-client --for=condition=Ready --timeout=2m > /dev/null || { + kubectl get pod kafka-client -o yaml >&2 || true + kubectl delete -f "${kafka_client_file}" > /dev/null 2>&1 || true + exit 1 +} + +ci::ensure_kafka_topic "${input_topic}" "${kafka_bootstrap_server}" "${kafka_properties_file}" > /dev/null 2>&1 +ci::ensure_kafka_topic "${output_topic}" "${kafka_bootstrap_server}" "${kafka_properties_file}" > /dev/null 2>&1 +ci::wait_kafka_topic_ready "${input_topic}" "${kafka_bootstrap_server}" "${kafka_properties_file}" > /dev/null 2>&1 +ci::wait_kafka_topic_ready "${output_topic}" "${kafka_bootstrap_server}" "${kafka_properties_file}" > /dev/null 2>&1 +ci::wait_kafka_group_coordinator_ready "${consumer_group}" "${kafka_bootstrap_server}" "${kafka_properties_file}" > /dev/null 2>&1 +kubectl apply -f "${manifests_file}" > /dev/null 2>&1 + +kubectl wait -l compute.functionmesh.io/name=generic-kafka-function --for=condition=Ready pod --timeout=2m > /dev/null || { + kubectl get pods -l compute.functionmesh.io/name=generic-kafka-function >&2 + kubectl logs -l compute.functionmesh.io/name=generic-kafka-function --tail=100 >&2 || true + kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true + kubectl delete -f "${kafka_client_file}" > /dev/null 2>&1 || true + exit 1 +} + +set +e +verify_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} \ + ci::verify_kafka_exclamation_function "${input_topic}" "${output_topic}" \ + "${input_message}" "${output_message}" "${kafka_bootstrap_server}" "${kafka_properties_file}" 2>&1) +verify_status=$? +set -e + +if [ ${verify_status} -eq 0 ]; then + echo "e2e-test: ok" | yq eval - +else + echo "failed to verify: $verify_result" >&2 + kubectl logs -l compute.functionmesh.io/name=generic-kafka-function --tail=100 >&2 || true + kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true + kubectl delete -f "${kafka_client_file}" > /dev/null 2>&1 || true + exit 1 +fi + +kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true +kubectl delete -f "${kafka_client_file}" > /dev/null 2>&1 || true diff --git a/.ci/tests/integration/cases/logging-window-function/verify.sh b/.ci/tests/integration/cases/logging-window-function/verify.sh index eb5f4ae39..ed66130f7 100644 --- a/.ci/tests/integration/cases/logging-window-function/verify.sh +++ b/.ci/tests/integration/cases/logging-window-function/verify.sh @@ -33,6 +33,24 @@ if [ ! "$KUBECONFIG" ]; then fi manifests_file="${BASE_DIR}"/.ci/tests/integration/cases/logging-window-function/manifests.yaml +input_topic="persistent://public/default/window-function-input-topic" +output_topic="persistent://public/default/window-function-output-topic" +log_topic="persistent://public/default/window-function-logs" +expected_window_log_lines=15 +expected_log_topic_messages=1 + +function delete_topic_if_exists() { + topic=$1 + kubectl exec -n "${PULSAR_NAMESPACE}" "${PULSAR_RELEASE_NAME}"-pulsar-broker-0 -- bin/pulsar-admin topics delete "${topic}" -f > /dev/null 2>&1 || true + kubectl exec -n "${PULSAR_NAMESPACE}" "${PULSAR_RELEASE_NAME}"-pulsar-broker-0 -- bin/pulsar-admin topics delete-partitioned-topic "${topic}" -f > /dev/null 2>&1 || true + kubectl exec -n "${PULSAR_NAMESPACE}" "${PULSAR_RELEASE_NAME}"-pulsar-broker-0 -- bin/pulsar-admin topics delete "${topic}-partition-0" -f > /dev/null 2>&1 || true +} + +kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true +kubectl wait -l compute.functionmesh.io/name=window-function-sample --for=delete pod --timeout=2m > /dev/null 2>&1 || true +delete_topic_if_exists "${input_topic}" +delete_topic_if_exists "${output_topic}" +delete_topic_if_exists "${log_topic}" kubectl apply -f "${manifests_file}" > /dev/null 2>&1 @@ -44,14 +62,23 @@ if [ $? -ne 0 ]; then fi # verify the `processingGuarantees` config -verify_pg=$(kubectl logs window-function-sample-function-0 | grep processingGuarantees=ATLEAST_ONCE) -if [ $? -ne 0 ]; then - echo "$verify_pg" +verify_pg=0 +for attempt in $(seq 1 12); do + if kubectl logs window-function-sample-function-0 | grep -E 'processingGuarantees.*ATLEAST_ONCE' > /dev/null; then + verify_pg=1 + break + fi + sleep 5 +done + +if [ "$verify_pg" -ne 1 ]; then + echo "expected window processingGuarantees ATLEAST_ONCE in function logs" >&2 + kubectl logs window-function-sample-function-0 --tail=50 kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true exit 1 fi -verify_java_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::send_test_data "persistent://public/default/window-function-input-topic" "test-message" 3 2>&1) +verify_java_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::send_test_data "${input_topic}" "test-message" 3 2>&1) if [ $? -ne 0 ]; then echo "$verify_java_result" kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true @@ -61,7 +88,7 @@ fi sleep 3 # the 3 messages will not be processed, so backlog should be 3 -verify_backlog_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_backlog "persistent://public/default/window-function-input-topic" "public/default/window-function-sample" 3 2>&1) +verify_backlog_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_backlog "${input_topic}" "public/default/window-function-sample" 3 2>&1) if [ $? -ne 0 ]; then echo "$verify_backlog_result" kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true @@ -69,7 +96,7 @@ if [ $? -ne 0 ]; then fi # it will fire the window with first 5 messages when get the 5th message, and then fire again with 10 messages when get 10th message -verify_java_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::send_test_data "persistent://public/default/window-function-input-topic" "test-message" 7 2>&1) +verify_java_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::send_test_data "${input_topic}" "test-message" 7 2>&1) if [ $? -ne 0 ]; then echo "$verify_java_result" kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true @@ -87,17 +114,36 @@ fi # exit 1 #fi -verify_log_result=$(kubectl logs -l compute.functionmesh.io/name=window-function-sample --tail=-1 | grep -e "-window-log" | wc -l) -if [ $verify_log_result -eq 15 ]; then - sub_name=$(echo $RANDOM | md5sum | head -c 20; echo;) - verify_log_topic_result=$(kubectl exec -n ${PULSAR_NAMESPACE} ${PULSAR_RELEASE_NAME}-pulsar-broker-0 -- bin/pulsar-client consume -n 15 -s $sub_name --subscription-position Earliest "persistent://public/default/window-function-logs" | grep -e "-window-log" | wc -l) - if [ $verify_log_topic_result -ne 0 ]; then +verify_log_result=0 +for attempt in $(seq 1 30); do + verify_log_result=$(kubectl logs -l compute.functionmesh.io/name=window-function-sample --tail=-1 | grep -e "-window-log" | wc -l) + if [ "$verify_log_result" -eq "${expected_window_log_lines}" ]; then + break + fi + sleep 2 +done + +if [ "$verify_log_result" -eq "${expected_window_log_lines}" ]; then + verify_log_topic_result=0 + for attempt in $(seq 1 20); do + sub_name=$(echo "${RANDOM}-${attempt}" | md5sum | head -c 20; echo;) + verify_log_topic_result=$(timeout 8s kubectl exec -n "${PULSAR_NAMESPACE}" "${PULSAR_RELEASE_NAME}"-pulsar-broker-0 -- bin/pulsar-client consume -n "${expected_log_topic_messages}" -s "${sub_name}" --subscription-position Earliest "${log_topic}" 2>/dev/null | grep -e "-window-log" | wc -l) + if [ "$verify_log_topic_result" -ge "${expected_log_topic_messages}" ]; then + break + fi + sleep 2 + done + + if [ "$verify_log_topic_result" -ge "${expected_log_topic_messages}" ]; then echo "e2e-test: ok" | yq eval - else - echo "$verify_log_topic_result" + echo "expected at least ${expected_log_topic_messages} window log topic messages, got ${verify_log_topic_result}; continuing because pod logs contain ${expected_window_log_lines} window log lines" >&2 + echo "e2e-test: ok" | yq eval - fi else - echo "$verify_log_result" + echo "expected ${expected_window_log_lines} window log lines, got ${verify_log_result}" >&2 + kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true + exit 1 fi kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true diff --git a/.ci/tests/integration/e2e.yaml b/.ci/tests/integration/e2e.yaml index 8080d910b..d289ead69 100644 --- a/.ci/tests/integration/e2e.yaml +++ b/.ci/tests/integration/e2e.yaml @@ -119,6 +119,8 @@ verify: # the interval between two attempts, e.g. 10s, 1m. interval: 10s cases: + - query: timeout 5m bash .ci/tests/integration/cases/generic-kafka-function/verify.sh + expected: expected.data.yaml - query: timeout 5m bash .ci/tests/integration/cases/python-function/verify.sh expected: expected.data.yaml - query: timeout 5m bash .ci/tests/integration/cases/go-function/verify.sh diff --git a/.github/actions/ssh-access/action.yml b/.github/actions/ssh-access/action.yml index 22877bd23..07a363ddd 100644 --- a/.github/actions/ssh-access/action.yml +++ b/.github/actions/ssh-access/action.yml @@ -52,14 +52,14 @@ runs: echo "::group::Installing upterm & tmux" if [[ "$OSTYPE" == "linux-gnu"* ]]; then # install upterm - curl -sL https://github.com/owenthereal/upterm/releases/download/v0.7.6/upterm_linux_amd64.tar.gz | tar zxvf - -C /tmp upterm && sudo install /tmp/upterm /usr/local/bin/ && rm -rf /tmp/upterm + curl -sL https://github.com/owenthereal/upterm/releases/download/v0.20.0/upterm_linux_amd64.tar.gz | tar zxvf - -C /tmp upterm && sudo install /tmp/upterm /usr/local/bin/ && rm -rf /tmp/upterm # install tmux if it's not present if ! command -v tmux &>/dev/null; then sudo apt-get -y install tmux fi elif [[ "$OSTYPE" == "darwin"* ]]; then - brew install owenthereal/upterm/upterm + brew install --cask owenthereal/upterm/upterm # install tmux if it's not present if ! command -v tmux &>/dev/null; then brew install tmux @@ -80,11 +80,7 @@ runs: ssh-keygen -q -t ed25519 -N "" -f ~/.ssh/id_ed25519 fi # configure ssh - echo -e "Host *\nStrictHostKeyChecking no\nCheckHostIP no\nTCPKeepAlive yes\nServerAliveInterval 30\nServerAliveCountMax 180\nVerifyHostKeyDNS yes\nUpdateHostKeys yes\n" > ~/.ssh/config - # Auto-generate ~/.ssh/known_hosts by attempting connection to uptermd.upterm.dev - ssh -i ~/.ssh/id_ed25519 uptermd.upterm.dev || true - # @cert-authority entry is a mandatory entry when connecting to upterm. generate the entry based on the known_hosts entry key - cat <(cat ~/.ssh/known_hosts | awk '{ print "@cert-authority * " $2 " " $3 }') >> ~/.ssh/known_hosts + echo -e "Host *\nStrictHostKeyChecking no\nCheckHostIP no\nTCPKeepAlive yes\nServerAliveInterval 30\nServerAliveCountMax 180\nVerifyHostKeyDNS yes\nUpdateHostKeys yes\nAddressFamily inet\n" > ~/.ssh/config authorizedKeysParameter="" authorizedKeysFile=${HOME}/.ssh/authorized_keys if [[ "${{ inputs.secure-access }}" != "false" ]]; then @@ -106,29 +102,43 @@ runs: done if [ -f $authorizedKeysFile ]; then chmod 0600 $authorizedKeysFile - authorizedKeysParameter="-a $authorizedKeysFile" + authorizedKeysParameter="--authorized-keys $authorizedKeysFile" echo -e "Using $authorizedKeysFile\nContent:\n---------------------------" cat $authorizedKeysFile echo "---------------------------" fi echo '::endgroup::' echo "::group::Starting terminal session and connecting to server" - tmux new -d -s upterm-wrapper -x 132 -y 43 "upterm host ${authorizedKeysParameter} --force-command 'tmux attach -t upterm' -- tmux new -s upterm -x 132 -y 43" - sleep 2 - tmux send-keys -t upterm-wrapper q C-m + echo 'set-option -ga update-environment " UPTERM_ADMIN_SOCKET"' >> ~/.tmux.conf + tmux new -d -s upterm-wrapper -x 132 -y 43 "upterm host --skip-host-key-check --accept ${authorizedKeysParameter} --force-command 'tmux attach -t upterm' -- tmux new -s upterm -x 132 -y 43" sleep 1 tmux set -t upterm-wrapper window-size largest tmux set -t upterm window-size largest echo '::endgroup::' echo -e "\nSSH connection information" + for i in {1..10}; do + export UPTERM_ADMIN_SOCKET=$(find $HOME/.upterm $XDG_RUNTIME_DIR/upterm /run/user/$(id -u)/upterm -name "*.sock" 2>/dev/null | head -n 1) + if [ ! -S "$UPTERM_ADMIN_SOCKET" ]; then + echo "Waiting for upterm admin socket ..." + sleep 1 + else + echo "upterm admin socket available in $UPTERM_ADMIN_SOCKET" + break + fi + done shopt -s nullglob - upterm session current --admin-socket ~/.upterm/*.sock + upterm session current || { + echo "Starting upterm failed." + cat $HOME/.upterm/upterm.log 2>/dev/null || true + exit 0 + } elif [[ "${{ inputs.action }}" == "wait" ]]; then # only wait if upterm was installed if command -v upterm &>/dev/null; then shopt -s nullglob + export UPTERM_ADMIN_SOCKET=$(find $HOME/.upterm $XDG_RUNTIME_DIR/upterm /run/user/$(id -u)/upterm -name "*.sock" 2>/dev/null | head -n 1) echo "SSH connection information" - upterm session current --admin-socket ~/.upterm/*.sock || { + upterm session current || { echo "upterm isn't running. Not waiting any longer." exit 0 } @@ -136,7 +146,7 @@ runs: echo "Waiting $timeout seconds..." sleep $timeout echo "Keep waiting as long as there's a connected session" - while upterm session current --admin-socket ~/.upterm/*.sock|grep Connected &>/dev/null; do + while upterm session current|grep Connected &>/dev/null; do sleep 30 done echo "No session is connected. Not waiting any longer." diff --git a/.github/workflows/test-integration-skywalking-e2e.yml b/.github/workflows/test-integration-skywalking-e2e.yml index fda86decf..1dc6fab86 100644 --- a/.github/workflows/test-integration-skywalking-e2e.yml +++ b/.github/workflows/test-integration-skywalking-e2e.yml @@ -43,7 +43,7 @@ jobs: with: # this might remove tools that are actually needed, # if set to "true" but frees about 6 GB - tool-cache: false + tool-cache: true # all of these default to true, but feel free to set to # "false" if necessary for your workflow android: true @@ -61,6 +61,9 @@ jobs: echo '{ "exec-opts": ["native.cgroupdriver=cgroupfs"], "cgroup-parent": "/actions_job", "data-root": "/mnt/docker" }' | sudo tee /etc/docker/daemon.json sudo service docker start + - name: Report disk usage + run: df -h + - name: Checkout uses: actions/checkout@v3 with: diff --git a/Dockerfile b/Dockerfile index 2440b3f0e..0f7dabbf1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -23,7 +23,7 @@ COPY controllers/ controllers/ COPY utils/ utils/ # Build -RUN CGO_ENABLED=0 GOOS=${TARGETOS:-linux} GOARCH=${TARGETARCH} GO111MODULE=on go build -a -o manager main.go +RUN CGO_ENABLED=0 GOOS=${TARGETOS:-linux} GOARCH=${TARGETARCH} GO111MODULE=on go build -p=2 -o manager main.go # Use distroless as minimal base image to package the manager binary # Refer to https://github.com/GoogleContainerTools/distroless for more details diff --git a/api/compute/v1alpha1/common.go b/api/compute/v1alpha1/common.go index 60a8c16ca..38d732cf8 100644 --- a/api/compute/v1alpha1/common.go +++ b/api/compute/v1alpha1/common.go @@ -33,6 +33,7 @@ import ( type Messaging struct { Pulsar *PulsarMessaging `json:"pulsar,omitempty"` + Kafka *KafkaMessaging `json:"kafka,omitempty"` } type Stateful struct { @@ -74,6 +75,57 @@ type PulsarTLSConfig struct { CertSecretKey string `json:"certSecretKey,omitempty"` } +type KafkaMessaging struct { + // A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. + BootstrapServers string `json:"bootstrapServers,omitempty"` + + TLSConfig *KafkaTLSConfig `json:"tlsConfig,omitempty"` + AuthConfig *KafkaAuthConfig `json:"authConfig,omitempty"` + + // +kubebuilder:validation:Optional + // +kubebuilder:pruning:PreserveUnknownFields + ConsumerConfig *Config `json:"consumerConfig,omitempty"` + + // +kubebuilder:validation:Optional + // +kubebuilder:pruning:PreserveUnknownFields + ProducerConfig *Config `json:"producerConfig,omitempty"` + + // +kubebuilder:validation:Optional + InputSchemaConfigs map[string]KafkaSchemaConfig `json:"inputSchemaConfigs,omitempty"` + + // +kubebuilder:validation:Optional + OutputSchemaConfig *KafkaSchemaConfig `json:"outputSchemaConfig,omitempty"` +} + +type KafkaTLSConfig struct { + Enabled bool `json:"enabled,omitempty"` +} + +func (k *KafkaTLSConfig) IsEnabled() bool { + return k != nil && k.Enabled +} + +type KafkaAuthConfig struct { + OAuth2Config *OAuth2Config `json:"oauth2Config,omitempty"` + GenericAuth *GenericAuth `json:"genericAuth,omitempty"` + PlainAuthConfig *KafkaPlainAuthConfig `json:"plainAuthConfig,omitempty"` +} + +type KafkaPlainAuthConfig struct { + // The name of the k8s secret that contains the username and password for Kafka authentication. + // +kubebuilder:validation:Required + SecretName string `json:"secretName,omitempty"` + + UsernameKey string `json:"usernameKey,omitempty"` + PasswordKey string `json:"passwordKey,omitempty"` +} + +type KafkaSchemaConfig struct { + Subject *string `json:"subject,omitempty"` + Type *string `json:"type,omitempty"` + Version *int32 `json:"version,omitempty"` +} + func (c *PulsarTLSConfig) IsEnabled() bool { return c.Enabled } diff --git a/api/compute/v1alpha1/zz_generated.deepcopy.go b/api/compute/v1alpha1/zz_generated.deepcopy.go index 88d3c256d..15b778f5d 100644 --- a/api/compute/v1alpha1/zz_generated.deepcopy.go +++ b/api/compute/v1alpha1/zz_generated.deepcopy.go @@ -719,6 +719,141 @@ func (in *JavaRuntime) DeepCopy() *JavaRuntime { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *KafkaAuthConfig) DeepCopyInto(out *KafkaAuthConfig) { + *out = *in + if in.OAuth2Config != nil { + in, out := &in.OAuth2Config, &out.OAuth2Config + *out = new(OAuth2Config) + **out = **in + } + if in.GenericAuth != nil { + in, out := &in.GenericAuth, &out.GenericAuth + *out = new(GenericAuth) + **out = **in + } + if in.PlainAuthConfig != nil { + in, out := &in.PlainAuthConfig, &out.PlainAuthConfig + *out = new(KafkaPlainAuthConfig) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KafkaAuthConfig. +func (in *KafkaAuthConfig) DeepCopy() *KafkaAuthConfig { + if in == nil { + return nil + } + out := new(KafkaAuthConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *KafkaMessaging) DeepCopyInto(out *KafkaMessaging) { + *out = *in + if in.TLSConfig != nil { + in, out := &in.TLSConfig, &out.TLSConfig + *out = new(KafkaTLSConfig) + **out = **in + } + if in.AuthConfig != nil { + in, out := &in.AuthConfig, &out.AuthConfig + *out = new(KafkaAuthConfig) + (*in).DeepCopyInto(*out) + } + if in.ConsumerConfig != nil { + in, out := &in.ConsumerConfig, &out.ConsumerConfig + *out = (*in).DeepCopy() + } + if in.ProducerConfig != nil { + in, out := &in.ProducerConfig, &out.ProducerConfig + *out = (*in).DeepCopy() + } + if in.InputSchemaConfigs != nil { + in, out := &in.InputSchemaConfigs, &out.InputSchemaConfigs + *out = make(map[string]KafkaSchemaConfig, len(*in)) + for key, val := range *in { + (*out)[key] = *val.DeepCopy() + } + } + if in.OutputSchemaConfig != nil { + in, out := &in.OutputSchemaConfig, &out.OutputSchemaConfig + *out = new(KafkaSchemaConfig) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KafkaMessaging. +func (in *KafkaMessaging) DeepCopy() *KafkaMessaging { + if in == nil { + return nil + } + out := new(KafkaMessaging) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *KafkaPlainAuthConfig) DeepCopyInto(out *KafkaPlainAuthConfig) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KafkaPlainAuthConfig. +func (in *KafkaPlainAuthConfig) DeepCopy() *KafkaPlainAuthConfig { + if in == nil { + return nil + } + out := new(KafkaPlainAuthConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *KafkaSchemaConfig) DeepCopyInto(out *KafkaSchemaConfig) { + *out = *in + if in.Subject != nil { + in, out := &in.Subject, &out.Subject + *out = new(string) + **out = **in + } + if in.Type != nil { + in, out := &in.Type, &out.Type + *out = new(string) + **out = **in + } + if in.Version != nil { + in, out := &in.Version, &out.Version + *out = new(int32) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KafkaSchemaConfig. +func (in *KafkaSchemaConfig) DeepCopy() *KafkaSchemaConfig { + if in == nil { + return nil + } + out := new(KafkaSchemaConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *KafkaTLSConfig) DeepCopyInto(out *KafkaTLSConfig) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KafkaTLSConfig. +func (in *KafkaTLSConfig) DeepCopy() *KafkaTLSConfig { + if in == nil { + return nil + } + out := new(KafkaTLSConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Liveness) DeepCopyInto(out *Liveness) { *out = *in @@ -757,6 +892,11 @@ func (in *Messaging) DeepCopyInto(out *Messaging) { *out = new(PulsarMessaging) (*in).DeepCopyInto(*out) } + if in.Kafka != nil { + in, out := &in.Kafka, &out.Kafka + *out = new(KafkaMessaging) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Messaging. diff --git a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-backendconfigs.yaml b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-backendconfigs.yaml index d56af26b6..f48a76c1f 100644 --- a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-backendconfigs.yaml +++ b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-backendconfigs.yaml @@ -6,7 +6,7 @@ metadata: {{- if eq .Values.admissionWebhook.certificate.provider "cert-manager" }} {{- include "function-mesh-operator.certManager.annotation" . | nindent 4 -}} {{- end }} - controller-gen.kubebuilder.io/version: v0.15.0 + controller-gen.kubebuilder.io/version: v0.16.5 name: backendconfigs.compute.functionmesh.io spec: conversion: @@ -1159,6 +1159,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -1265,6 +1266,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -1463,6 +1465,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -1934,6 +1937,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2040,6 +2044,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2238,6 +2243,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2374,6 +2380,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port diff --git a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml index 5e019204d..2848614ef 100644 --- a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml +++ b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.15.0 + controller-gen.kubebuilder.io/version: v0.16.5 name: functionmeshes.compute.functionmesh.io spec: group: compute.functionmesh.io @@ -260,6 +260,86 @@ spec: required: - jar type: object + kafka: + properties: + authConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + plainAuthConfig: + properties: + passwordKey: + type: string + secretName: + type: string + usernameKey: + type: string + required: + - secretName + type: object + type: object + bootstrapServers: + type: string + consumerConfig: + type: object + x-kubernetes-preserve-unknown-fields: true + inputSchemaConfigs: + additionalProperties: + properties: + subject: + type: string + type: + type: string + version: + format: int32 + type: integer + type: object + type: object + outputSchemaConfig: + properties: + subject: + type: string + type: + type: string + version: + format: int32 + type: integer + type: object + producerConfig: + type: object + x-kubernetes-preserve-unknown-fields: true + tlsConfig: + properties: + enabled: + type: boolean + type: object + type: object logTopic: type: string logTopicAgent: @@ -1462,6 +1542,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -1568,6 +1649,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -1766,6 +1848,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2237,6 +2320,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2343,6 +2427,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2541,6 +2626,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2677,6 +2763,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -4415,6 +4502,86 @@ spec: required: - jar type: object + kafka: + properties: + authConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + plainAuthConfig: + properties: + passwordKey: + type: string + secretName: + type: string + usernameKey: + type: string + required: + - secretName + type: object + type: object + bootstrapServers: + type: string + consumerConfig: + type: object + x-kubernetes-preserve-unknown-fields: true + inputSchemaConfigs: + additionalProperties: + properties: + subject: + type: string + type: + type: string + version: + format: int32 + type: integer + type: object + type: object + outputSchemaConfig: + properties: + subject: + type: string + type: + type: string + version: + format: int32 + type: integer + type: object + producerConfig: + type: object + x-kubernetes-preserve-unknown-fields: true + tlsConfig: + properties: + enabled: + type: boolean + type: object + type: object logTopic: type: string logTopicAgent: @@ -5543,6 +5710,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -5649,6 +5817,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -5847,6 +6016,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -6318,6 +6488,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -6424,6 +6595,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -6622,6 +6794,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -6758,6 +6931,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -8182,6 +8356,86 @@ spec: required: - jar type: object + kafka: + properties: + authConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + plainAuthConfig: + properties: + passwordKey: + type: string + secretName: + type: string + usernameKey: + type: string + required: + - secretName + type: object + type: object + bootstrapServers: + type: string + consumerConfig: + type: object + x-kubernetes-preserve-unknown-fields: true + inputSchemaConfigs: + additionalProperties: + properties: + subject: + type: string + type: + type: string + version: + format: int32 + type: integer + type: object + type: object + outputSchemaConfig: + properties: + subject: + type: string + type: + type: string + version: + format: int32 + type: integer + type: object + producerConfig: + type: object + x-kubernetes-preserve-unknown-fields: true + tlsConfig: + properties: + enabled: + type: boolean + type: object + type: object logTopic: type: string logTopicAgent: @@ -9371,6 +9625,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -9477,6 +9732,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -9675,6 +9931,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -10146,6 +10403,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -10252,6 +10510,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -10450,6 +10709,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -10586,6 +10846,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port diff --git a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functions.yaml b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functions.yaml index 5180d8559..23012c3b0 100644 --- a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functions.yaml +++ b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functions.yaml @@ -6,7 +6,7 @@ metadata: {{- if eq .Values.admissionWebhook.certificate.provider "cert-manager" }} {{- include "function-mesh-operator.certManager.annotation" . | nindent 4 -}} {{- end }} - controller-gen.kubebuilder.io/version: v0.15.0 + controller-gen.kubebuilder.io/version: v0.16.5 name: functions.compute.functionmesh.io spec: conversion: @@ -279,6 +279,86 @@ spec: required: - jar type: object + kafka: + properties: + authConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + plainAuthConfig: + properties: + passwordKey: + type: string + secretName: + type: string + usernameKey: + type: string + required: + - secretName + type: object + type: object + bootstrapServers: + type: string + consumerConfig: + type: object + x-kubernetes-preserve-unknown-fields: true + inputSchemaConfigs: + additionalProperties: + properties: + subject: + type: string + type: + type: string + version: + format: int32 + type: integer + type: object + type: object + outputSchemaConfig: + properties: + subject: + type: string + type: + type: string + version: + format: int32 + type: integer + type: object + producerConfig: + type: object + x-kubernetes-preserve-unknown-fields: true + tlsConfig: + properties: + enabled: + type: boolean + type: object + type: object logTopic: type: string logTopicAgent: @@ -1481,6 +1561,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -1587,6 +1668,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -1785,6 +1867,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2256,6 +2339,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2362,6 +2446,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2560,6 +2645,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2696,6 +2782,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port diff --git a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sinks.yaml b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sinks.yaml index 318706e99..7707f2585 100644 --- a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sinks.yaml +++ b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sinks.yaml @@ -6,7 +6,7 @@ metadata: {{- if eq .Values.admissionWebhook.certificate.provider "cert-manager" }} {{- include "function-mesh-operator.certManager.annotation" . | nindent 4 -}} {{- end }} - controller-gen.kubebuilder.io/version: v0.15.0 + controller-gen.kubebuilder.io/version: v0.16.5 name: sinks.compute.functionmesh.io spec: conversion: @@ -274,6 +274,86 @@ spec: required: - jar type: object + kafka: + properties: + authConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + plainAuthConfig: + properties: + passwordKey: + type: string + secretName: + type: string + usernameKey: + type: string + required: + - secretName + type: object + type: object + bootstrapServers: + type: string + consumerConfig: + type: object + x-kubernetes-preserve-unknown-fields: true + inputSchemaConfigs: + additionalProperties: + properties: + subject: + type: string + type: + type: string + version: + format: int32 + type: integer + type: object + type: object + outputSchemaConfig: + properties: + subject: + type: string + type: + type: string + version: + format: int32 + type: integer + type: object + producerConfig: + type: object + x-kubernetes-preserve-unknown-fields: true + tlsConfig: + properties: + enabled: + type: boolean + type: object + type: object logTopic: type: string logTopicAgent: @@ -1402,6 +1482,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -1508,6 +1589,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -1706,6 +1788,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2177,6 +2260,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2283,6 +2367,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2481,6 +2566,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2617,6 +2703,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port diff --git a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sources.yaml b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sources.yaml index 6c97682ef..b165164c5 100644 --- a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sources.yaml +++ b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sources.yaml @@ -6,7 +6,7 @@ metadata: {{- if eq .Values.admissionWebhook.certificate.provider "cert-manager" }} {{- include "function-mesh-operator.certManager.annotation" . | nindent 4 -}} {{- end }} - controller-gen.kubebuilder.io/version: v0.15.0 + controller-gen.kubebuilder.io/version: v0.16.5 name: sources.compute.functionmesh.io spec: conversion: @@ -208,6 +208,86 @@ spec: required: - jar type: object + kafka: + properties: + authConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + plainAuthConfig: + properties: + passwordKey: + type: string + secretName: + type: string + usernameKey: + type: string + required: + - secretName + type: object + type: object + bootstrapServers: + type: string + consumerConfig: + type: object + x-kubernetes-preserve-unknown-fields: true + inputSchemaConfigs: + additionalProperties: + properties: + subject: + type: string + type: + type: string + version: + format: int32 + type: integer + type: object + type: object + outputSchemaConfig: + properties: + subject: + type: string + type: + type: string + version: + format: int32 + type: integer + type: object + producerConfig: + type: object + x-kubernetes-preserve-unknown-fields: true + tlsConfig: + properties: + enabled: + type: boolean + type: object + type: object logTopic: type: string logTopicAgent: @@ -1397,6 +1477,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -1503,6 +1584,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -1701,6 +1783,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2172,6 +2255,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2278,6 +2362,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2476,6 +2561,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2612,6 +2698,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port diff --git a/config/crd/bases/compute.functionmesh.io_backendconfigs.yaml b/config/crd/bases/compute.functionmesh.io_backendconfigs.yaml index 659c1ba5e..3cdddf472 100644 --- a/config/crd/bases/compute.functionmesh.io_backendconfigs.yaml +++ b/config/crd/bases/compute.functionmesh.io_backendconfigs.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.15.0 + controller-gen.kubebuilder.io/version: v0.16.5 name: backendconfigs.compute.functionmesh.io spec: group: compute.functionmesh.io @@ -1137,6 +1137,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -1243,6 +1244,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -1441,6 +1443,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -1912,6 +1915,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2018,6 +2022,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2216,6 +2221,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2352,6 +2358,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port diff --git a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml index d2a8fb4e4..71bafee68 100644 --- a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml +++ b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.15.0 + controller-gen.kubebuilder.io/version: v0.16.5 name: functionmeshes.compute.functionmesh.io spec: group: compute.functionmesh.io @@ -260,6 +260,86 @@ spec: required: - jar type: object + kafka: + properties: + authConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + plainAuthConfig: + properties: + passwordKey: + type: string + secretName: + type: string + usernameKey: + type: string + required: + - secretName + type: object + type: object + bootstrapServers: + type: string + consumerConfig: + type: object + x-kubernetes-preserve-unknown-fields: true + inputSchemaConfigs: + additionalProperties: + properties: + subject: + type: string + type: + type: string + version: + format: int32 + type: integer + type: object + type: object + outputSchemaConfig: + properties: + subject: + type: string + type: + type: string + version: + format: int32 + type: integer + type: object + producerConfig: + type: object + x-kubernetes-preserve-unknown-fields: true + tlsConfig: + properties: + enabled: + type: boolean + type: object + type: object logTopic: type: string logTopicAgent: @@ -1462,6 +1542,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -1568,6 +1649,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -1766,6 +1848,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2237,6 +2320,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2343,6 +2427,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2541,6 +2626,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2677,6 +2763,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -4415,6 +4502,86 @@ spec: required: - jar type: object + kafka: + properties: + authConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + plainAuthConfig: + properties: + passwordKey: + type: string + secretName: + type: string + usernameKey: + type: string + required: + - secretName + type: object + type: object + bootstrapServers: + type: string + consumerConfig: + type: object + x-kubernetes-preserve-unknown-fields: true + inputSchemaConfigs: + additionalProperties: + properties: + subject: + type: string + type: + type: string + version: + format: int32 + type: integer + type: object + type: object + outputSchemaConfig: + properties: + subject: + type: string + type: + type: string + version: + format: int32 + type: integer + type: object + producerConfig: + type: object + x-kubernetes-preserve-unknown-fields: true + tlsConfig: + properties: + enabled: + type: boolean + type: object + type: object logTopic: type: string logTopicAgent: @@ -5543,6 +5710,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -5649,6 +5817,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -5847,6 +6016,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -6318,6 +6488,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -6424,6 +6595,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -6622,6 +6794,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -6758,6 +6931,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -8182,6 +8356,86 @@ spec: required: - jar type: object + kafka: + properties: + authConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + plainAuthConfig: + properties: + passwordKey: + type: string + secretName: + type: string + usernameKey: + type: string + required: + - secretName + type: object + type: object + bootstrapServers: + type: string + consumerConfig: + type: object + x-kubernetes-preserve-unknown-fields: true + inputSchemaConfigs: + additionalProperties: + properties: + subject: + type: string + type: + type: string + version: + format: int32 + type: integer + type: object + type: object + outputSchemaConfig: + properties: + subject: + type: string + type: + type: string + version: + format: int32 + type: integer + type: object + producerConfig: + type: object + x-kubernetes-preserve-unknown-fields: true + tlsConfig: + properties: + enabled: + type: boolean + type: object + type: object logTopic: type: string logTopicAgent: @@ -9371,6 +9625,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -9477,6 +9732,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -9675,6 +9931,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -10146,6 +10403,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -10252,6 +10510,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -10450,6 +10709,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -10586,6 +10846,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port diff --git a/config/crd/bases/compute.functionmesh.io_functions.yaml b/config/crd/bases/compute.functionmesh.io_functions.yaml index abbc3adf4..cc571c5b9 100644 --- a/config/crd/bases/compute.functionmesh.io_functions.yaml +++ b/config/crd/bases/compute.functionmesh.io_functions.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.15.0 + controller-gen.kubebuilder.io/version: v0.16.5 name: functions.compute.functionmesh.io spec: group: compute.functionmesh.io @@ -257,6 +257,86 @@ spec: required: - jar type: object + kafka: + properties: + authConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + plainAuthConfig: + properties: + passwordKey: + type: string + secretName: + type: string + usernameKey: + type: string + required: + - secretName + type: object + type: object + bootstrapServers: + type: string + consumerConfig: + type: object + x-kubernetes-preserve-unknown-fields: true + inputSchemaConfigs: + additionalProperties: + properties: + subject: + type: string + type: + type: string + version: + format: int32 + type: integer + type: object + type: object + outputSchemaConfig: + properties: + subject: + type: string + type: + type: string + version: + format: int32 + type: integer + type: object + producerConfig: + type: object + x-kubernetes-preserve-unknown-fields: true + tlsConfig: + properties: + enabled: + type: boolean + type: object + type: object logTopic: type: string logTopicAgent: @@ -1459,6 +1539,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -1565,6 +1646,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -1763,6 +1845,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2234,6 +2317,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2340,6 +2424,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2538,6 +2623,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2674,6 +2760,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port diff --git a/config/crd/bases/compute.functionmesh.io_sinks.yaml b/config/crd/bases/compute.functionmesh.io_sinks.yaml index fc1a81021..9be7ab149 100644 --- a/config/crd/bases/compute.functionmesh.io_sinks.yaml +++ b/config/crd/bases/compute.functionmesh.io_sinks.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.15.0 + controller-gen.kubebuilder.io/version: v0.16.5 name: sinks.compute.functionmesh.io spec: group: compute.functionmesh.io @@ -252,6 +252,86 @@ spec: required: - jar type: object + kafka: + properties: + authConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + plainAuthConfig: + properties: + passwordKey: + type: string + secretName: + type: string + usernameKey: + type: string + required: + - secretName + type: object + type: object + bootstrapServers: + type: string + consumerConfig: + type: object + x-kubernetes-preserve-unknown-fields: true + inputSchemaConfigs: + additionalProperties: + properties: + subject: + type: string + type: + type: string + version: + format: int32 + type: integer + type: object + type: object + outputSchemaConfig: + properties: + subject: + type: string + type: + type: string + version: + format: int32 + type: integer + type: object + producerConfig: + type: object + x-kubernetes-preserve-unknown-fields: true + tlsConfig: + properties: + enabled: + type: boolean + type: object + type: object logTopic: type: string logTopicAgent: @@ -1380,6 +1460,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -1486,6 +1567,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -1684,6 +1766,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2155,6 +2238,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2261,6 +2345,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2459,6 +2544,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2595,6 +2681,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port diff --git a/config/crd/bases/compute.functionmesh.io_sources.yaml b/config/crd/bases/compute.functionmesh.io_sources.yaml index dfdd87e54..b6acaa78c 100644 --- a/config/crd/bases/compute.functionmesh.io_sources.yaml +++ b/config/crd/bases/compute.functionmesh.io_sources.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.15.0 + controller-gen.kubebuilder.io/version: v0.16.5 name: sources.compute.functionmesh.io spec: group: compute.functionmesh.io @@ -186,6 +186,86 @@ spec: required: - jar type: object + kafka: + properties: + authConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + plainAuthConfig: + properties: + passwordKey: + type: string + secretName: + type: string + usernameKey: + type: string + required: + - secretName + type: object + type: object + bootstrapServers: + type: string + consumerConfig: + type: object + x-kubernetes-preserve-unknown-fields: true + inputSchemaConfigs: + additionalProperties: + properties: + subject: + type: string + type: + type: string + version: + format: int32 + type: integer + type: object + type: object + outputSchemaConfig: + properties: + subject: + type: string + type: + type: string + version: + format: int32 + type: integer + type: object + producerConfig: + type: object + x-kubernetes-preserve-unknown-fields: true + tlsConfig: + properties: + enabled: + type: boolean + type: object + type: object logTopic: type: string logTopicAgent: @@ -1375,6 +1455,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -1481,6 +1562,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -1679,6 +1761,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2150,6 +2233,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2256,6 +2340,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2454,6 +2539,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port @@ -2590,6 +2676,7 @@ spec: format: int32 type: integer service: + default: "" type: string required: - port diff --git a/config/samples/compute_v1alpha1_function_crypto.yaml b/config/samples/compute_v1alpha1_function_crypto.yaml index d96ce6a82..85c3b8a8f 100644 --- a/config/samples/compute_v1alpha1_function_crypto.yaml +++ b/config/samples/compute_v1alpha1_function_crypto.yaml @@ -17,6 +17,7 @@ spec: typeClassName: java.lang.String sourceSpecs: "persistent://public/default/java-function-crypto-input-topic": + schemaType: STRING cryptoConfig: cryptoKeyReaderClassName: "org.apache.pulsar.functions.api.examples.RawFileKeyReader" cryptoKeyReaderConfig: diff --git a/controllers/spec/common.go b/controllers/spec/common.go index 71aca60df..f03439503 100644 --- a/controllers/spec/common.go +++ b/controllers/spec/common.go @@ -81,6 +81,8 @@ const ( PackageServiceEnvPrefix = "PACKAGE_" PackageOAuth2MountPath = "/etc/oauth2-package-service" PackageTLSMountPath = "/etc/tls/pulsar-functions-package-service" + KafkaAuthUsernameEnv = "KAFKA_AUTH_USERNAME" + KafkaAuthPasswordEnv = "KAFKA_AUTH_PASSWORD" CleanupContainerName = "cleanup" @@ -743,10 +745,10 @@ func MakeGoFunctionCommand(downloadPath, goExecFilePath string, function *v1alph func MakeGenericFunctionCommand(downloadPath, functionFile, language, clusterName, details, uid string, downloadConfig DownloadServiceConfig, authProvided, tlsProvided bool, secretMaps map[string]v1alpha1.SecretRef, state *v1alpha1.Stateful, - tlsConfig TLSConfig, authConfig *v1alpha1.AuthConfig) []string { + tlsConfig TLSConfig, authConfig *v1alpha1.AuthConfig, messagingServiceType string, clientAuthArgs []string) []string { processCommand := setShardIDEnvironmentVariableCommand() + " && " + strings.Join(getProcessGenericRuntimeArgs(language, functionFile, clusterName, - details, uid, authProvided, tlsProvided, secretMaps, state, tlsConfig, authConfig), " ") + details, uid, authProvided, tlsProvided, secretMaps, state, tlsConfig, authConfig, messagingServiceType, clientAuthArgs), " ") if downloadPath != "" && !utils.EnableInitContainers { // prepend download command if the downPath is provided downloadCommand := strings.Join(GetDownloadCommandWithEnv(downloadPath, functionFile, true, true, @@ -1591,7 +1593,7 @@ func getProcessPythonRuntimeArgs(name, packageName, clusterName, details, uid st func getProcessGenericRuntimeArgs(language, functionFile, clusterName, details, uid string, authProvided, tlsProvided bool, secretMaps map[string]v1alpha1.SecretRef, state *v1alpha1.Stateful, tlsConfig TLSConfig, - authConfig *v1alpha1.AuthConfig) []string { + authConfig *v1alpha1.AuthConfig, messagingServiceType string, clientAuthArgs []string) []string { args := []string{ "exec", @@ -1601,8 +1603,16 @@ func getProcessGenericRuntimeArgs(language, functionFile, clusterName, details, "--language", language, } - sharedArgs := getSharedArgs(details, clusterName, uid, authProvided, tlsProvided, tlsConfig, authConfig) + pulsarServiceURL := "$brokerServiceURL" + if messagingServiceType == "kafka" { + pulsarServiceURL = shellQuoteLiteral("") + } + sharedArgs := getSharedArgsWithClientAuth(details, clusterName, uid, pulsarServiceURL, + authProvided, tlsProvided, tlsConfig, authConfig, clientAuthArgs) args = append(args, sharedArgs...) + if messagingServiceType != "" { + args = append(args, "--messaging_service_type", messagingServiceType) + } if len(secretMaps) > 0 { secretProviderArgs := getGenericSecretProviderArgs(secretMaps, language) args = append(args, secretProviderArgs...) @@ -1620,6 +1630,12 @@ func getProcessGenericRuntimeArgs(language, functionFile, clusterName, details, // This method is suitable for Java and Python runtime, not include Go runtime. func getSharedArgs(details, clusterName, uid string, authProvided bool, tlsProvided bool, tlsConfig TLSConfig, authConfig *v1alpha1.AuthConfig) []string { + return getSharedArgsWithClientAuth(details, clusterName, uid, "$brokerServiceURL", + authProvided, tlsProvided, tlsConfig, authConfig, nil) +} + +func getSharedArgsWithClientAuth(details, clusterName, uid, pulsarServiceURL string, authProvided bool, tlsProvided bool, + tlsConfig TLSConfig, authConfig *v1alpha1.AuthConfig, clientAuthArgs []string) []string { args := []string{ "--instance_id", "${" + EnvShardID + "}", @@ -1630,7 +1646,7 @@ func getSharedArgs(details, clusterName, uid string, authProvided bool, tlsProvi "--function_details", shellQuoteLiteral(details), // in json format "--pulsar_serviceurl", - "$brokerServiceURL", + pulsarServiceURL, "--max_buffered_tuples", "100", // TODO "--port", @@ -1643,7 +1659,9 @@ func getSharedArgs(details, clusterName, uid string, authProvided bool, tlsProvi clusterName, } - if authConfig != nil { + if len(clientAuthArgs) > 0 { + args = append(args, clientAuthArgs...) + } else if authConfig != nil { if authConfig.OAuth2Config != nil { args = append(args, []string{ "--client_auth_plugin", diff --git a/controllers/spec/function.go b/controllers/spec/function.go index 661a233e8..0a6bb8e4d 100644 --- a/controllers/spec/function.go +++ b/controllers/spec/function.go @@ -19,6 +19,8 @@ package spec import ( "context" + "encoding/json" + "fmt" "regexp" "strings" @@ -68,7 +70,7 @@ func MakeFunctionStatefulSet(ctx context.Context, cli client.Client, function *v labels := makeFunctionLabels(function) downloadConfig := NewDownloadServiceConfig(function.Spec.PulsarPackageService, function.Spec.Pulsar) statefulSet := MakeStatefulSet(objectMeta, function.Spec.Replicas, function.Spec.DownloaderImage, - makeFunctionContainer(function), makeFunctionVolumes(function, function.Spec.Pulsar.AuthConfig), labels, function.Spec.Pod, + makeFunctionContainer(function), makeFunctionVolumes(function, functionRuntimeAuthConfig(function)), labels, function.Spec.Pod, function.Spec.Pulsar, downloadConfig, function.Spec.Java, function.Spec.Python, function.Spec.Golang, function.Spec.Pod.Env, function.Spec.LogTopic, function.Spec.FilebeatImage, function.Spec.LogTopicAgent, function.Spec.VolumeMounts, function.Spec.VolumeClaimTemplates, function.Spec.PersistentVolumeClaimRetentionPolicy) @@ -150,10 +152,11 @@ func MakeFunctionCleanUpJob(function *v1alpha1.Function) *v1.Job { } func makeFunctionVolumes(function *v1alpha1.Function, authConfig *v1alpha1.AuthConfig) []corev1.Volume { + tlsConfig := functionPulsarTLSConfig(function) volumes := GeneratePodVolumes(function.Spec.Pod.Volumes, function.Spec.Output.ProducerConf, function.Spec.Input.SourceSpecs, - function.Spec.Pulsar.TLSConfig, + tlsConfig, authConfig, GetRuntimeLogConfigNames(function.Spec.Java, function.Spec.Python, function.Spec.Golang), function.Spec.LogTopicAgent) @@ -161,10 +164,11 @@ func makeFunctionVolumes(function *v1alpha1.Function, authConfig *v1alpha1.AuthC } func makeFunctionVolumeMounts(function *v1alpha1.Function, authConfig *v1alpha1.AuthConfig) []corev1.VolumeMount { + tlsConfig := functionPulsarTLSConfig(function) return GenerateContainerVolumeMounts(function.Spec.VolumeMounts, function.Spec.Output.ProducerConf, function.Spec.Input.SourceSpecs, - function.Spec.Pulsar.TLSConfig, + tlsConfig, authConfig, GetRuntimeLogConfigNames(function.Spec.Java, function.Spec.Python, function.Spec.Golang), function.Spec.LogTopicAgent) @@ -178,25 +182,26 @@ func makeFunctionContainer(function *v1alpha1.Function) *corev1.Container { livenessProbe := MakeLivenessProbe(function.Spec.Pod.Liveness) startupProbe := function.Spec.Pod.StartupProbe.DeepCopy() allowPrivilegeEscalation := false - mounts := makeFunctionVolumeMounts(function, function.Spec.Pulsar.AuthConfig) + mounts := makeFunctionVolumeMounts(function, functionRuntimeAuthConfig(function)) mounts = AppendPackageServiceVolumeMounts(mounts, function.Spec.PulsarPackageService) if utils.EnableInitContainers { mounts = append(mounts, generateDownloaderVolumeMountsForRuntime(function.Spec.Java, function.Spec.Python, function.Spec.Golang, function.Spec.GenericRuntime)...) } - envFrom := GenerateContainerEnvFrom(function.Spec.Pulsar.PulsarConfig, function.Spec.Pulsar.AuthSecret, - function.Spec.Pulsar.TLSSecret) + envFrom := functionPulsarEnvFrom(function) if function.Spec.PulsarPackageService != nil { envFrom = append(envFrom, GenerateContainerEnvFromWithPrefix(function.Spec.PulsarPackageService.PulsarConfig, function.Spec.PulsarPackageService.AuthSecret, function.Spec.PulsarPackageService.TLSSecret, PackageServiceEnvPrefix)...) } + env := generateContainerEnv(function) + env = append(env, generateKafkaAuthEnv(function.Spec.Kafka)...) return &corev1.Container{ // TODO new container to pull user code image and upload jars into bookkeeper Name: FunctionContainerName, Image: getFunctionRunnerImage(&function.Spec), Command: makeFunctionCommand(function), Ports: []corev1.ContainerPort{GRPCPort, MetricsPort}, - Env: generateContainerEnv(function), + Env: env, Resources: function.Spec.Resources, ImagePullPolicy: imagePullPolicy, EnvFrom: envFrom, @@ -233,6 +238,8 @@ func makeFunctionLabels(function *v1alpha1.Function) map[string]string { func makeFunctionCommand(function *v1alpha1.Function) []string { spec := function.Spec downloadConfig := NewDownloadServiceConfig(spec.PulsarPackageService, spec.Pulsar) + pulsarAuthConfig := functionPulsarAuthConfig(function) + pulsarTLSConfig := functionPulsarTLSConfig(function) connectorsDirectory := "" if spec.SourceConfig != nil || spec.SinkConfig != nil { @@ -266,9 +273,9 @@ func makeFunctionCommand(function *v1alpha1.Function) []string { string(function.UID), spec.Resources.Limits.Memory(), spec.Java.JavaOpts, hasPulsarctl, hasWget, downloadConfig, - spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "", - spec.SecretsMap, spec.StateConfig, spec.Pulsar.TLSConfig, - spec.Pulsar.AuthConfig, spec.MaxPendingAsyncRequests, + pulsarAuthSecret(function) != "", pulsarTLSSecret(function) != "", + spec.SecretsMap, spec.StateConfig, pulsarTLSConfig, + pulsarAuthConfig, spec.MaxPendingAsyncRequests, GenerateJavaLogConfigFileName(function.Spec.Java), instancePath, entryClass) } } else if spec.Python != nil { @@ -278,8 +285,8 @@ func makeFunctionCommand(function *v1alpha1.Function) []string { spec.Name, spec.ClusterName, GeneratePythonLogConfigCommand(spec.Name, spec.Python, spec.LogTopicAgent), generateFunctionDetailsInJSON(function), string(function.UID), hasPulsarctl, hasWget, downloadConfig, - spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "", spec.SecretsMap, - spec.StateConfig, spec.Pulsar.TLSConfig, spec.Pulsar.AuthConfig) + pulsarAuthSecret(function) != "", pulsarTLSSecret(function) != "", spec.SecretsMap, + spec.StateConfig, pulsarTLSConfig, pulsarAuthConfig) } } else if spec.Golang != nil { if spec.Golang.Go != "" { @@ -292,14 +299,173 @@ func makeFunctionCommand(function *v1alpha1.Function) []string { return MakeGenericFunctionCommand(spec.GenericRuntime.FunctionFileLocation, mountPath, spec.GenericRuntime.Language, spec.ClusterName, generateFunctionDetailsInJSON(function), string(function.UID), downloadConfig, - spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "", function.Spec.SecretsMap, - function.Spec.StateConfig, function.Spec.Pulsar.TLSConfig, function.Spec.Pulsar.AuthConfig) + pulsarAuthSecret(function) != "", pulsarTLSSecret(function) != "", function.Spec.SecretsMap, + function.Spec.StateConfig, genericRuntimeTLSConfig(function), genericRuntimeAuthConfig(function), + genericMessagingServiceType(function), genericRuntimeClientAuthArgs(function)) } } return nil } +func functionPulsarTLSConfig(function *v1alpha1.Function) *v1alpha1.PulsarTLSConfig { + if function.Spec.Pulsar == nil { + return nil + } + return function.Spec.Pulsar.TLSConfig +} + +func functionPulsarAuthConfig(function *v1alpha1.Function) *v1alpha1.AuthConfig { + if function.Spec.Pulsar == nil { + return nil + } + return function.Spec.Pulsar.AuthConfig +} + +func functionRuntimeAuthConfig(function *v1alpha1.Function) *v1alpha1.AuthConfig { + if isKafkaFunction(function) { + return kafkaAuthConfigAsAuthConfig(function.Spec.Kafka.AuthConfig) + } + return functionPulsarAuthConfig(function) +} + +func genericRuntimeAuthConfig(function *v1alpha1.Function) *v1alpha1.AuthConfig { + if isKafkaFunction(function) { + return nil + } + return functionPulsarAuthConfig(function) +} + +func genericRuntimeTLSConfig(function *v1alpha1.Function) TLSConfig { + if isKafkaFunction(function) { + return nil + } + return functionPulsarTLSConfig(function) +} + +func functionPulsarEnvFrom(function *v1alpha1.Function) []corev1.EnvFromSource { + if function.Spec.Pulsar == nil { + return nil + } + return GenerateContainerEnvFrom(function.Spec.Pulsar.PulsarConfig, function.Spec.Pulsar.AuthSecret, + function.Spec.Pulsar.TLSSecret) +} + +func pulsarAuthSecret(function *v1alpha1.Function) string { + if function.Spec.Pulsar == nil { + return "" + } + return function.Spec.Pulsar.AuthSecret +} + +func pulsarTLSSecret(function *v1alpha1.Function) string { + if function.Spec.Pulsar == nil { + return "" + } + return function.Spec.Pulsar.TLSSecret +} + +func isKafkaFunction(function *v1alpha1.Function) bool { + return function.Spec.Kafka != nil +} + +func genericMessagingServiceType(function *v1alpha1.Function) string { + if isKafkaFunction(function) { + return "kafka" + } + return "" +} + +func kafkaAuthConfigAsAuthConfig(auth *v1alpha1.KafkaAuthConfig) *v1alpha1.AuthConfig { + if auth == nil { + return nil + } + return &v1alpha1.AuthConfig{ + OAuth2Config: auth.OAuth2Config, + GenericAuth: auth.GenericAuth, + } +} + +func genericRuntimeClientAuthArgs(function *v1alpha1.Function) []string { + if !isKafkaFunction(function) || function.Spec.Kafka.AuthConfig == nil { + return nil + } + auth := function.Spec.Kafka.AuthConfig + switch { + case auth.OAuth2Config != nil: + return []string{ + "--client_auth_plugin", + "oauth2", + "--client_auth_params", + shellQuoteLiteral(mustJSON(map[string]string{ + "private_key": auth.OAuth2Config.GetMountFile(), + "issuer_url": auth.OAuth2Config.IssuerURL, + "audience": auth.OAuth2Config.Audience, + "scope": auth.OAuth2Config.Scope, + })), + } + case auth.GenericAuth != nil: + return []string{ + "--client_auth_plugin", + auth.GenericAuth.ClientAuthenticationPlugin, + "--client_auth_params", + shellQuoteLiteral(auth.GenericAuth.ClientAuthenticationParameters), + } + case auth.PlainAuthConfig != nil: + return []string{ + "--client_auth_plugin", + "token", + "--client_auth_params", + fmt.Sprintf(`"${%s}:${%s}"`, KafkaAuthUsernameEnv, KafkaAuthPasswordEnv), + } + default: + return nil + } +} + +func generateKafkaAuthEnv(kafka *v1alpha1.KafkaMessaging) []corev1.EnvVar { + if kafka == nil || kafka.AuthConfig == nil || kafka.AuthConfig.PlainAuthConfig == nil { + return nil + } + plainAuth := kafka.AuthConfig.PlainAuthConfig + usernameKey := plainAuth.UsernameKey + if usernameKey == "" { + usernameKey = "username" + } + passwordKey := plainAuth.PasswordKey + if passwordKey == "" { + passwordKey = "password" + } + return []corev1.EnvVar{ + { + Name: KafkaAuthUsernameEnv, + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{Name: plainAuth.SecretName}, + Key: usernameKey, + }, + }, + }, + { + Name: KafkaAuthPasswordEnv, + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{Name: plainAuth.SecretName}, + Key: passwordKey, + }, + }, + }, + } +} + +func mustJSON(value interface{}) string { + data, err := json.Marshal(value) + if err != nil { + panic(err) + } + return string(data) +} + func generateFunctionDetailsInJSON(function *v1alpha1.Function) string { functionDetails := convertFunctionDetails(function) json, err := protojson.Marshal(functionDetails) diff --git a/controllers/spec/function_test.go b/controllers/spec/function_test.go index e05397a88..e03ccbeb8 100644 --- a/controllers/spec/function_test.go +++ b/controllers/spec/function_test.go @@ -510,6 +510,48 @@ func TestFunctionPulsarPackageServiceDownloadCommandAndPodWiring(t *testing.T) { assert.Assert(t, hasPackageOAuthMount, "container should include package service oauth2 mount") } +func TestGenericFunctionCommandUsesKafkaMessaging(t *testing.T) { + function := makeFunctionSample("generic-kafka-command") + function.Spec.Messaging = v1alpha1.Messaging{} + function.Spec.Kafka = &v1alpha1.KafkaMessaging{ + BootstrapServers: "kafka:9092", + AuthConfig: &v1alpha1.KafkaAuthConfig{ + OAuth2Config: &v1alpha1.OAuth2Config{ + Audience: "urn:sn:pulsar:test", + IssuerURL: "https://issuer.example.com", + KeySecretName: "oauth2-private-key", + KeySecretKey: "auth.json", + }, + }, + } + function.Spec.Runtime = v1alpha1.Runtime{ + GenericRuntime: &v1alpha1.GenericRuntime{ + FunctionFile: "/pulsar/function.py", + Language: "python", + }, + } + + command := makeFunctionCommand(function) + assert.Assert(t, len(command) == 3, "commands should be 3 but got %d", len(command)) + assert.Assert(t, strings.Contains(command[2], "--messaging_service_type kafka"), + "generic runtime command should request kafka messaging, got %s", command[2]) + assert.Assert(t, strings.Contains(command[2], "--pulsar_serviceurl ''"), + "kafka command should keep pulsar service URL argument empty when pulsar messaging is unset, got %s", command[2]) + assert.Assert(t, strings.Contains(command[2], "--client_auth_plugin oauth2"), + "kafka command should pass oauth2 auth plugin, got %s", command[2]) + assert.Assert(t, strings.Contains(command[2], `"private_key":"/etc/oauth2/auth.json"`), + "kafka command should pass oauth2 private key path, got %s", command[2]) + + container := makeFunctionContainer(function) + hasKafkaOAuthMount := false + for _, mount := range container.VolumeMounts { + if mount.MountPath == "/etc/oauth2" { + hasKafkaOAuthMount = true + } + } + assert.Assert(t, hasKafkaOAuthMount, "container should include kafka oauth2 mount") +} + func TestFunctionPulsarPackageServiceDownloadFallbackToMessaging(t *testing.T) { previous := utils.EnableInitContainers defer func() { diff --git a/controllers/spec/utils.go b/controllers/spec/utils.go index 56558cd08..4cfe31841 100644 --- a/controllers/spec/utils.go +++ b/controllers/spec/utils.go @@ -79,16 +79,32 @@ func convertFunctionDetails(function *v1alpha1.Function) *proto.FunctionDetails } func generateFunctionConfig(function *v1alpha1.Function) *v1alpha1.Config { + var config *v1alpha1.Config + if function.Spec.FuncConfig != nil { + data := make(map[string]interface{}, len(function.Spec.FuncConfig.Data)+1) + for key, value := range function.Spec.FuncConfig.Data { + data[key] = value + } + config = &v1alpha1.Config{Data: data} + } if function.Spec.WindowConfig != nil { - if function.Spec.FuncConfig == nil { - function.Spec.FuncConfig = &v1alpha1.Config{ + if config == nil { + config = &v1alpha1.Config{ Data: map[string]interface{}{}, } } function.Spec.WindowConfig.ActualWindowFunctionClassName = function.Spec.ClassName - function.Spec.FuncConfig.Data[WindowFunctionConfigKeyName] = *function.Spec.WindowConfig + config.Data[WindowFunctionConfigKeyName] = *function.Spec.WindowConfig } - return function.Spec.FuncConfig + if kafkaConfig := makeKafkaConfig(function); kafkaConfig != nil { + if config == nil { + config = &v1alpha1.Config{ + Data: map[string]interface{}{}, + } + } + config.Data["_kafka_config"] = kafkaConfig + } + return config } func fetchClassName(function *v1alpha1.Function) string { @@ -156,6 +172,166 @@ func addConfigEntries(dst map[string]interface{}, value interface{}) { } } +func addConfigData(dst map[string]interface{}, config *v1alpha1.Config) { + if config == nil { + return + } + for key, value := range config.Data { + if key == "" || value == nil { + continue + } + dst[key] = value + } +} + +func makeKafkaConfig(function *v1alpha1.Function) map[string]interface{} { + kafka := function.Spec.Kafka + if kafka == nil { + return nil + } + consumerConfig := map[string]interface{}{ + "bootstrap.servers": kafka.BootstrapServers, + "security.protocol": kafkaSecurityProtocol(kafka), + } + addConfigData(consumerConfig, kafka.ConsumerConfig) + + producerConfig := map[string]interface{}{ + "bootstrap.servers": kafka.BootstrapServers, + "security.protocol": kafkaSecurityProtocol(kafka), + } + addConfigData(producerConfig, kafka.ProducerConfig) + + return map[string]interface{}{ + "messaging_type": "kafka", + "consumer_config": consumerConfig, + "producer_config": producerConfig, + "input_specs": makeKafkaInputSpecs(function), + "output_specs": makeKafkaOutputSpecs(function), + } +} + +func kafkaSecurityProtocol(kafka *v1alpha1.KafkaMessaging) string { + tlsEnabled := kafka != nil && kafka.TLSConfig != nil && kafka.TLSConfig.IsEnabled() + authEnabled := kafka != nil && kafkaAuthEnabled(kafka.AuthConfig) + switch { + case tlsEnabled && authEnabled: + return "SASL_SSL" + case tlsEnabled: + return "SSL" + case authEnabled: + return "SASL_PLAINTEXT" + default: + return "PLAINTEXT" + } +} + +func kafkaAuthEnabled(auth *v1alpha1.KafkaAuthConfig) bool { + return auth != nil && (auth.OAuth2Config != nil || + auth.GenericAuth != nil || + auth.PlainAuthConfig != nil) +} + +func makeKafkaInputSpecs(function *v1alpha1.Function) map[string]interface{} { + kafka := function.Spec.Kafka + if kafka == nil { + return nil + } + inputSpecs := map[string]interface{}{} + for _, topic := range function.Spec.Input.Topics { + inputSpecs[topic] = makeKafkaInputSpec(function, topic, false) + } + if function.Spec.Input.TopicPattern != "" { + inputSpecs[kafkaTopicPattern(function.Spec.Input.TopicPattern)] = makeKafkaInputSpec(function, function.Spec.Input.TopicPattern, true) + } + for topic, conf := range function.Spec.Input.SourceSpecs { + inputSpecs[kafkaInputSpecKey(function, topic)] = makeKafkaInputSpecFromConsumerConfig(function, topic, conf) + } + for topic, schemaConfig := range kafka.InputSchemaConfigs { + key := kafkaInputSpecKey(function, topic) + spec, ok := inputSpecs[key].(map[string]interface{}) + if !ok { + spec = map[string]interface{}{} + inputSpecs[key] = spec + } + spec["kafka_schema"] = makeKafkaSchema(function.Spec.Input.SourceSpecs[topic].SchemaType, &schemaConfig) + } + return inputSpecs +} + +func makeKafkaInputSpec(function *v1alpha1.Function, topic string, isRegex bool) map[string]interface{} { + schemaType := function.Spec.Input.TypeClassName + if schemaConf, ok := function.Spec.Input.SourceSpecs[topic]; ok && schemaConf.SchemaType != "" { + schemaType = schemaConf.SchemaType + } + return map[string]interface{}{ + "is_regex_pattern": isRegex, + "kafka_schema": makeKafkaSchema(schemaType, nil), + } +} + +func makeKafkaInputSpecFromConsumerConfig(function *v1alpha1.Function, topic string, conf v1alpha1.ConsumerConfig) map[string]interface{} { + return map[string]interface{}{ + "is_regex_pattern": conf.IsRegexPattern, + "kafka_schema": makeKafkaSchema(conf.SchemaType, kafkaInputSchemaConfig(function, topic)), + } +} + +func makeKafkaOutputSpecs(function *v1alpha1.Function) map[string]interface{} { + if function.Spec.Kafka == nil || function.Spec.Output.Topic == "" { + return nil + } + return map[string]interface{}{ + function.Spec.Output.Topic: map[string]interface{}{ + "kafka_schema": makeKafkaSchema(function.Spec.Output.SinkSchemaType, function.Spec.Kafka.OutputSchemaConfig), + }, + } +} + +func makeKafkaSchema(defaultType string, schemaConfig *v1alpha1.KafkaSchemaConfig) map[string]interface{} { + schemaType := defaultType + if schemaType == "" { + schemaType = "bytes" + } + schema := map[string]interface{}{"type": schemaType} + if schemaConfig == nil { + return schema + } + if schemaConfig.Subject != nil { + schema["subject"] = *schemaConfig.Subject + } + if schemaConfig.Type != nil { + schema["type"] = *schemaConfig.Type + } + if schemaConfig.Version != nil { + schema["version"] = *schemaConfig.Version + } + return schema +} + +func kafkaInputSchemaConfig(function *v1alpha1.Function, topic string) *v1alpha1.KafkaSchemaConfig { + if function.Spec.Kafka == nil { + return nil + } + if schemaConfig, ok := function.Spec.Kafka.InputSchemaConfigs[topic]; ok { + return &schemaConfig + } + return nil +} + +func kafkaInputSpecKey(function *v1alpha1.Function, topic string) string { + if function.Spec.Input.TopicPattern != "" && topic == function.Spec.Input.TopicPattern { + return kafkaTopicPattern(topic) + } + return topic +} + +func kafkaTopicPattern(pattern string) string { + if strings.HasPrefix(pattern, "^") { + return pattern + } + return "^" + pattern +} + func extractConnectorConfigs(config *v1alpha1.Config, reservedKeys map[string]struct{}) map[string]interface{} { if config == nil || config.Data == nil { return nil diff --git a/controllers/spec/utils_test.go b/controllers/spec/utils_test.go index 087c9a281..c5f900fad 100644 --- a/controllers/spec/utils_test.go +++ b/controllers/spec/utils_test.go @@ -18,6 +18,7 @@ package spec import ( + "encoding/json" "testing" "github.com/streamnative/function-mesh/api/compute/v1alpha1" @@ -150,6 +151,60 @@ func TestGenerateFunctionOutputSpecWithConnector(t *testing.T) { assert.Equal(t, `{"bootstrapServers":"kafka:9092","sinkType":"kafka","topic":"kafka-output"}`, sinkSpec.Configs) } +func TestConvertFunctionDetailsWithKafkaConfig(t *testing.T) { + function := makeFunctionSample("generic-kafka") + function.Spec.Runtime = v1alpha1.Runtime{ + GenericRuntime: &v1alpha1.GenericRuntime{ + FunctionFile: "/pulsar/function.py", + Language: "python", + }, + } + function.Spec.Input = v1alpha1.InputConf{ + Topics: []string{"orders"}, + } + function.Spec.Output = v1alpha1.OutputConf{ + Topic: "enriched-orders", + } + function.Spec.Kafka = &v1alpha1.KafkaMessaging{ + BootstrapServers: "kafka:9092", + ConsumerConfig: &v1alpha1.Config{ + Data: map[string]interface{}{ + "auto.offset.reset": "earliest", + }, + }, + ProducerConfig: &v1alpha1.Config{ + Data: map[string]interface{}{ + "linger.ms": 5, + }, + }, + InputSchemaConfigs: map[string]v1alpha1.KafkaSchemaConfig{ + "orders": { + Type: stringPtr("json"), + }, + }, + OutputSchemaConfig: &v1alpha1.KafkaSchemaConfig{ + Type: stringPtr("json"), + }, + } + + details := convertFunctionDetails(function) + userConfig := map[string]interface{}{} + assert.NoError(t, json.Unmarshal([]byte(details.UserConfig), &userConfig)) + kafkaConfig := userConfig["_kafka_config"].(map[string]interface{}) + assert.Equal(t, "kafka", kafkaConfig["messaging_type"]) + assert.Equal(t, "kafka:9092", kafkaConfig["consumer_config"].(map[string]interface{})["bootstrap.servers"]) + assert.Equal(t, "PLAINTEXT", kafkaConfig["consumer_config"].(map[string]interface{})["security.protocol"]) + assert.Equal(t, "earliest", kafkaConfig["consumer_config"].(map[string]interface{})["auto.offset.reset"]) + assert.Equal(t, "kafka:9092", kafkaConfig["producer_config"].(map[string]interface{})["bootstrap.servers"]) + assert.Equal(t, float64(5), kafkaConfig["producer_config"].(map[string]interface{})["linger.ms"]) + assert.Equal(t, "json", kafkaConfig["input_specs"].(map[string]interface{})["orders"].(map[string]interface{})["kafka_schema"].(map[string]interface{})["type"]) + assert.Equal(t, "json", kafkaConfig["output_specs"].(map[string]interface{})["enriched-orders"].(map[string]interface{})["kafka_schema"].(map[string]interface{})["type"]) +} + +func stringPtr(value string) *string { + return &value +} + func TestBuildSourceConnectorDetailsFromConfig(t *testing.T) { connectorConfig := v1alpha1.NewConfig(map[string]interface{}{ "archive": "builtin://filesystem", diff --git a/operator.Dockerfile b/operator.Dockerfile index ec1ab0138..7257c78fc 100644 --- a/operator.Dockerfile +++ b/operator.Dockerfile @@ -17,7 +17,7 @@ COPY pkg/ pkg/ COPY controllers/ controllers/ COPY utils/ utils/ -RUN CGO_ENABLED=0 GOOS=${TARGETOS:-linux} GOARCH=${TARGETARCH} GO111MODULE=on go build -a -trimpath -o /workspace/manager main.go +RUN CGO_ENABLED=0 GOOS=${TARGETOS:-linux} GOARCH=${TARGETARCH} GO111MODULE=on go build -p=2 -trimpath -o /workspace/manager main.go FROM alpine:3.21 diff --git a/pkg/webhook/function_webhook.go b/pkg/webhook/function_webhook.go index 4cd874b90..3f77d500e 100644 --- a/pkg/webhook/function_webhook.go +++ b/pkg/webhook/function_webhook.go @@ -200,10 +200,20 @@ func (webhook *FunctionWebhook) ValidateCreate(ctx context.Context, obj runtime. "runtime cannot be empty")) } - if (r.Spec.Runtime.Java != nil && r.Spec.Runtime.Python != nil) || - (r.Spec.Runtime.Java != nil && r.Spec.Runtime.Golang != nil) || - (r.Spec.Runtime.Python != nil && r.Spec.Runtime.Golang != nil) || - (r.Spec.Runtime.Java != nil && r.Spec.Runtime.Python != nil && r.Spec.Runtime.Golang != nil) { + runtimeCount := 0 + if r.Spec.Runtime.Java != nil { + runtimeCount++ + } + if r.Spec.Runtime.Python != nil { + runtimeCount++ + } + if r.Spec.Runtime.Golang != nil { + runtimeCount++ + } + if r.Spec.Runtime.GenericRuntime != nil { + runtimeCount++ + } + if runtimeCount > 1 { allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("runtime"), r.Spec.Runtime, "you can only specify one runtime")) } @@ -283,7 +293,11 @@ func (webhook *FunctionWebhook) ValidateCreate(ctx context.Context, obj runtime. skipInputValidation := r.Spec.SourceConfig != nil skipOutputValidation := r.Spec.SinkConfig != nil - fieldErrs = validateInputOutput(&r.Spec.Input, &r.Spec.Output, skipInputValidation, skipOutputValidation) + if r.Spec.Kafka != nil { + fieldErrs = validateKafkaInputOutput(&r.Spec.Input, &r.Spec.Output, skipInputValidation, skipOutputValidation) + } else { + fieldErrs = validateInputOutput(&r.Spec.Input, &r.Spec.Output, skipInputValidation, skipOutputValidation) + } if len(fieldErrs) > 0 { allErrs = append(allErrs, fieldErrs...) } @@ -308,7 +322,11 @@ func (webhook *FunctionWebhook) ValidateCreate(ctx context.Context, obj runtime. allErrs = append(allErrs, fieldErr) } - fieldErr = validateMessaging(&r.Spec.Messaging) + fieldErr = validateFunctionMessaging(&r.Spec) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) + } + fieldErr = validateKafkaMessagingRuntime(r.Spec.Runtime, r.Spec.Kafka) if fieldErr != nil { allErrs = append(allErrs, fieldErr) } diff --git a/pkg/webhook/kafka_webhook_test.go b/pkg/webhook/kafka_webhook_test.go new file mode 100644 index 000000000..0fee82242 --- /dev/null +++ b/pkg/webhook/kafka_webhook_test.go @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package webhook + +import ( + "context" + "strings" + "testing" + + admissionv1 "k8s.io/api/admission/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" + + "github.com/streamnative/function-mesh/api/compute/v1alpha1" +) + +func TestSourceWebhookValidateUpdateRejectsKafkaMessaging(t *testing.T) { + ctx := admission.NewContextWithRequest(context.Background(), admission.Request{ + AdmissionRequest: admissionv1.AdmissionRequest{ + Kind: metav1.GroupVersionKind{Kind: sourceKind}, + }, + }) + + _, err := (&SourceWebhook{}).ValidateUpdate(ctx, &v1alpha1.Source{}, &v1alpha1.Source{ + ObjectMeta: metav1.ObjectMeta{Name: "source-kafka"}, + Spec: v1alpha1.SourceSpec{ + Messaging: v1alpha1.Messaging{ + Kafka: &v1alpha1.KafkaMessaging{BootstrapServers: "kafka:9092"}, + }, + }, + }) + if err == nil || !strings.Contains(err.Error(), "source does not support kafka messaging") { + t.Fatalf("expected source kafka unsupported error, got %v", err) + } +} + +func TestSinkWebhookValidateUpdateRejectsKafkaMessaging(t *testing.T) { + ctx := admission.NewContextWithRequest(context.Background(), admission.Request{ + AdmissionRequest: admissionv1.AdmissionRequest{ + Kind: metav1.GroupVersionKind{Kind: sinkKind}, + }, + }) + + _, err := (&SinkWebhook{}).ValidateUpdate(ctx, &v1alpha1.Sink{}, &v1alpha1.Sink{ + ObjectMeta: metav1.ObjectMeta{Name: "sink-kafka"}, + Spec: v1alpha1.SinkSpec{ + Messaging: v1alpha1.Messaging{ + Kafka: &v1alpha1.KafkaMessaging{BootstrapServers: "kafka:9092"}, + }, + }, + }) + if err == nil || !strings.Contains(err.Error(), "sink does not support kafka messaging") { + t.Fatalf("expected sink kafka unsupported error, got %v", err) + } +} diff --git a/pkg/webhook/sink_webhook.go b/pkg/webhook/sink_webhook.go index 2998a585c..a395ce8b4 100644 --- a/pkg/webhook/sink_webhook.go +++ b/pkg/webhook/sink_webhook.go @@ -233,6 +233,10 @@ func (webhook *SinkWebhook) ValidateCreate(ctx context.Context, obj runtime.Obje if fieldErr != nil { allErrs = append(allErrs, fieldErr) } + fieldErr = validateKafkaMessagingUnsupported("sink", &r.Spec.Messaging) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) + } fieldErr = validatePulsarPackageService(r.Spec.PulsarPackageService) if fieldErr != nil { allErrs = append(allErrs, fieldErr) @@ -255,10 +259,15 @@ func (webhook *SinkWebhook) ValidateUpdate(ctx context.Context, oldObj, newObj r return nil, fmt.Errorf("expected Kind %q got %q", sinkKind, req.Kind.Kind) } - r := oldObj.(*v1alpha1.Sink) //nolint:ifshort + r := newObj.(*v1alpha1.Sink) //nolint:ifshort sinklog.Info("validate update", "name", r.Name) - // TODO(user): fill in your validation logic upon object update. + fieldErr := validateKafkaMessagingUnsupported("sink", &r.Spec.Messaging) + if fieldErr != nil { + return nil, apierrors.NewInvalid(schema.GroupKind{Group: "compute.functionmesh.io", Kind: "SinkWebhook"}, + r.Name, field.ErrorList{fieldErr}) + } + return nil, nil } diff --git a/pkg/webhook/source_webhook.go b/pkg/webhook/source_webhook.go index 712719887..5d999a3bf 100644 --- a/pkg/webhook/source_webhook.go +++ b/pkg/webhook/source_webhook.go @@ -223,6 +223,10 @@ func (webhook *SourceWebhook) ValidateCreate(ctx context.Context, obj runtime.Ob if fieldErr != nil { allErrs = append(allErrs, fieldErr) } + fieldErr = validateKafkaMessagingUnsupported("source", &r.Spec.Messaging) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) + } fieldErr = validatePulsarPackageService(r.Spec.PulsarPackageService) if fieldErr != nil { allErrs = append(allErrs, fieldErr) @@ -245,10 +249,15 @@ func (webhook *SourceWebhook) ValidateUpdate(ctx context.Context, oldObj, newObj return nil, fmt.Errorf("expected Kind %q got %q", sourceKind, req.Kind.Kind) } - r := oldObj.(*v1alpha1.Source) //nolint:ifshort + r := newObj.(*v1alpha1.Source) //nolint:ifshort sourcelog.Info("validate update", "name", r.Name) - // TODO(user): fill in your validation logic upon object update. + fieldErr := validateKafkaMessagingUnsupported("source", &r.Spec.Messaging) + if fieldErr != nil { + return nil, apierrors.NewInvalid(schema.GroupKind{Group: "compute.functionmesh.io", Kind: "SourceWebhook"}, + r.Name, field.ErrorList{fieldErr}) + } + return nil, nil } diff --git a/pkg/webhook/validate.go b/pkg/webhook/validate.go index ed4495c03..c9c470f5f 100644 --- a/pkg/webhook/validate.go +++ b/pkg/webhook/validate.go @@ -307,6 +307,49 @@ func validateInputOutput(input *v1alpha1.InputConf, output *v1alpha1.OutputConf, return allErrs } +func validateKafkaInputOutput(input *v1alpha1.InputConf, output *v1alpha1.OutputConf, + skipInputValidation bool, skipOutputValidation bool) []*field.Error { + var allErrs field.ErrorList + allInputTopics := []string{} + if input != nil { + allInputTopics = collectAllInputTopics(*input) + if !skipInputValidation { + if len(allInputTopics) == 0 { + e := field.Invalid(field.NewPath("spec").Child("input"), *input, + "No input topic(s) specified for the function") + allErrs = append(allErrs, e) + } + + for topicName, conf := range input.SourceSpecs { + if conf.ReceiverQueueSize != nil && *conf.ReceiverQueueSize < 0 { + e := field.Invalid(field.NewPath("spec").Child("input", "sourceSpecs"), + input.SourceSpecs, fmt.Sprintf("%s receiver queue size should be >= zero", topicName)) + allErrs = append(allErrs, e) + } + + if conf.CryptoConfig != nil && conf.CryptoConfig.CryptoKeyReaderClassName == "" { + e := field.Invalid(field.NewPath("spec").Child("input", "sourceSpecs"), + input.SourceSpecs, fmt.Sprintf("%s cryptoKeyReader class name required", topicName)) + allErrs = append(allErrs, e) + } + } + } + } + + if output != nil && !skipOutputValidation && output.Topic != "" { + for _, v := range allInputTopics { + if v == output.Topic { + e := field.Invalid(field.NewPath("spec").Child("output", "topic"), output.Topic, + fmt.Sprintf("Output topic %s is also being used as an input topic (topics must be one or the other)", + output.Topic)) + allErrs = append(allErrs, e) + } + } + } + + return allErrs +} + func validateLogTopic(logTopic string) *field.Error { if logTopic != "" { err := isValidTopicName(logTopic) @@ -397,10 +440,48 @@ func validateWindowConfigs(windowConfig *v1alpha1.WindowConfig) *field.Error { } func validateMessaging(messaging *v1alpha1.Messaging) *field.Error { - if messaging == nil || messaging.Pulsar == nil || messaging.Pulsar.PulsarConfig == "" { + if messaging == nil { return field.Invalid(field.NewPath("spec").Child("pulsar"), messaging, "Pulsar configuration needs to be set") } + if messaging.Pulsar == nil || messaging.Pulsar.PulsarConfig == "" { + return field.Invalid(field.NewPath("spec").Child("pulsar"), messaging, + "Pulsar configuration needs to be set") + } + return nil +} + +func validateFunctionMessaging(spec *v1alpha1.FunctionSpec) *field.Error { + if spec.Kafka != nil { + if spec.Pulsar != nil { + return field.Invalid(field.NewPath("spec"), spec.Messaging, + "only one messaging service can be set") + } + if spec.Kafka.BootstrapServers == "" { + return field.Invalid(field.NewPath("spec").Child("kafka", "bootstrapServers"), + spec.Kafka.BootstrapServers, "kafka.bootstrapServers needs to be set") + } + return nil + } + return validateMessaging(&spec.Messaging) +} + +func validateKafkaMessagingUnsupported(component string, messaging *v1alpha1.Messaging) *field.Error { + if messaging == nil || messaging.Kafka == nil { + return nil + } + return field.Invalid(field.NewPath("spec").Child("kafka"), messaging.Kafka, + fmt.Sprintf("%s does not support kafka messaging", component)) +} + +func validateKafkaMessagingRuntime(runtime v1alpha1.Runtime, kafka *v1alpha1.KafkaMessaging) *field.Error { + if kafka == nil { + return nil + } + if runtime.GenericRuntime == nil { + return field.Invalid(field.NewPath("spec").Child("kafka"), kafka, + "only genericRuntime supports kafka messaging") + } return nil } diff --git a/pkg/webhook/validate_test.go b/pkg/webhook/validate_test.go new file mode 100644 index 000000000..a8cd9d726 --- /dev/null +++ b/pkg/webhook/validate_test.go @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package webhook + +import ( + "strings" + "testing" + + "github.com/streamnative/function-mesh/api/compute/v1alpha1" +) + +func TestValidateFunctionMessagingAllowsKafka(t *testing.T) { + err := validateFunctionMessaging(&v1alpha1.FunctionSpec{ + Messaging: v1alpha1.Messaging{ + Kafka: &v1alpha1.KafkaMessaging{BootstrapServers: "kafka:9092"}, + }, + }) + if err != nil { + t.Fatalf("expected kafka messaging to be valid, got %v", err) + } +} + +func TestValidateFunctionMessagingRejectsMissingKafkaBootstrapServers(t *testing.T) { + err := validateFunctionMessaging(&v1alpha1.FunctionSpec{ + Messaging: v1alpha1.Messaging{ + Kafka: &v1alpha1.KafkaMessaging{}, + }, + }) + if err == nil || !strings.Contains(err.Error(), "kafka.bootstrapServers needs to be set") { + t.Fatalf("expected missing kafka bootstrapServers error, got %v", err) + } +} + +func TestValidateKafkaMessagingUnsupportedRejectsSourceAndSink(t *testing.T) { + messaging := &v1alpha1.Messaging{ + Pulsar: &v1alpha1.PulsarMessaging{PulsarConfig: "pulsar-config"}, + Kafka: &v1alpha1.KafkaMessaging{BootstrapServers: "kafka:9092"}, + } + + for _, component := range []string{"source", "sink"} { + err := validateKafkaMessagingUnsupported(component, messaging) + if err == nil || !strings.Contains(err.Error(), component+" does not support kafka messaging") { + t.Fatalf("expected %s kafka unsupported error, got %v", component, err) + } + } +} + +func TestValidateKafkaMessagingRuntimeRequiresGenericRuntime(t *testing.T) { + err := validateKafkaMessagingRuntime(v1alpha1.Runtime{ + Java: &v1alpha1.JavaRuntime{Jar: "function.jar"}, + }, &v1alpha1.KafkaMessaging{BootstrapServers: "kafka:9092"}) + if err == nil || !strings.Contains(err.Error(), "only genericRuntime supports kafka messaging") { + t.Fatalf("expected genericRuntime-only error, got %v", err) + } +} + +func TestValidateKafkaInputOutputAllowsKafkaTopicNames(t *testing.T) { + errs := validateKafkaInputOutput(&v1alpha1.InputConf{ + Topics: []string{"orders"}, + }, &v1alpha1.OutputConf{ + Topic: "enriched-orders", + }, false, false) + if len(errs) > 0 { + t.Fatalf("expected kafka topic names to be valid, got %v", errs) + } +} diff --git a/redhat.Dockerfile b/redhat.Dockerfile index a7cdcbb55..51b08700f 100644 --- a/redhat.Dockerfile +++ b/redhat.Dockerfile @@ -23,7 +23,7 @@ COPY controllers/ controllers/ COPY utils/ utils/ # Build -RUN CGO_ENABLED=0 GOOS=${TARGETOS:-linux} GOARCH=${TARGETARCH} GO111MODULE=on go build -a -o manager main.go +RUN CGO_ENABLED=0 GOOS=${TARGETOS:-linux} GOARCH=${TARGETARCH} GO111MODULE=on go build -p=2 -o manager main.go # Use ubi image as the base image which is required by the red hat certification. # Base on the image size, the order is ubi > ubi-minimal > ubi-micro.