From e0a503a7a0ffd6cd35f136ff36d1ee264ee82fd9 Mon Sep 17 00:00:00 2001 From: "rw-codebundle-agent[bot]" Date: Tue, 21 Apr 2026 13:09:23 +0000 Subject: [PATCH] Add k8s-airflow-workload-diagnostics CodeBundle Implements read-only diagnostics for Apache Airflow on Kubernetes: workload replica status, pod health and restarts, Warning events, PVC phases, scheduler log patterns, and executor pod saturation. Includes sli.robot with a 0-1 health score, generation rules for namespace discovery, and a minimal .test manifest for RunWhen Local workflows. Made-with: Cursor --- .../k8s-airflow-workload-diagnostics.yaml | 21 ++ .../k8s-airflow-workload-diagnostics-sli.yaml | 54 +++ .../k8s-airflow-workload-diagnostics-slx.yaml | 25 ++ ...-airflow-workload-diagnostics-taskset.yaml | 43 +++ .../.test/Taskfile.yaml | 71 ++++ .../.test/kubernetes/manifest.yaml | 7 + .../README.md | 65 ++++ .../check-airflow-executor-pods.sh | 51 +++ .../check-airflow-pod-health.sh | 65 ++++ .../fetch-airflow-events.sh | 68 ++++ .../list-airflow-workloads.sh | 75 ++++ .../runbook.robot | 326 ++++++++++++++++++ .../sample-airflow-scheduler-logs.sh | 73 ++++ .../sli-airflow-health.sh | 60 ++++ .../sli.robot | 113 ++++++ .../summarize-airflow-pvcs.sh | 56 +++ 16 files changed, 1173 insertions(+) create mode 100644 codebundles/k8s-airflow-workload-diagnostics/.runwhen/generation-rules/k8s-airflow-workload-diagnostics.yaml create mode 100644 codebundles/k8s-airflow-workload-diagnostics/.runwhen/templates/k8s-airflow-workload-diagnostics-sli.yaml create mode 100644 codebundles/k8s-airflow-workload-diagnostics/.runwhen/templates/k8s-airflow-workload-diagnostics-slx.yaml create mode 100644 codebundles/k8s-airflow-workload-diagnostics/.runwhen/templates/k8s-airflow-workload-diagnostics-taskset.yaml create mode 100644 codebundles/k8s-airflow-workload-diagnostics/.test/Taskfile.yaml create mode 100644 codebundles/k8s-airflow-workload-diagnostics/.test/kubernetes/manifest.yaml create mode 100644 
codebundles/k8s-airflow-workload-diagnostics/README.md create mode 100755 codebundles/k8s-airflow-workload-diagnostics/check-airflow-executor-pods.sh create mode 100755 codebundles/k8s-airflow-workload-diagnostics/check-airflow-pod-health.sh create mode 100755 codebundles/k8s-airflow-workload-diagnostics/fetch-airflow-events.sh create mode 100755 codebundles/k8s-airflow-workload-diagnostics/list-airflow-workloads.sh create mode 100644 codebundles/k8s-airflow-workload-diagnostics/runbook.robot create mode 100755 codebundles/k8s-airflow-workload-diagnostics/sample-airflow-scheduler-logs.sh create mode 100755 codebundles/k8s-airflow-workload-diagnostics/sli-airflow-health.sh create mode 100644 codebundles/k8s-airflow-workload-diagnostics/sli.robot create mode 100755 codebundles/k8s-airflow-workload-diagnostics/summarize-airflow-pvcs.sh diff --git a/codebundles/k8s-airflow-workload-diagnostics/.runwhen/generation-rules/k8s-airflow-workload-diagnostics.yaml b/codebundles/k8s-airflow-workload-diagnostics/.runwhen/generation-rules/k8s-airflow-workload-diagnostics.yaml new file mode 100644 index 00000000..e71f5bca --- /dev/null +++ b/codebundles/k8s-airflow-workload-diagnostics/.runwhen/generation-rules/k8s-airflow-workload-diagnostics.yaml @@ -0,0 +1,21 @@ +apiVersion: runwhen.com/v1 +kind: GenerationRules +spec: + generationRules: + - resourceTypes: + - namespace + matchRules: + - type: pattern + pattern: ".+" + properties: [name] + mode: substring + slxs: + - baseName: af-wl-diag + qualifiers: ["namespace", "cluster"] + baseTemplateName: k8s-airflow-workload-diagnostics + levelOfDetail: basic + outputItems: + - type: slx + - type: sli + - type: runbook + templateName: k8s-airflow-workload-diagnostics-taskset.yaml diff --git a/codebundles/k8s-airflow-workload-diagnostics/.runwhen/templates/k8s-airflow-workload-diagnostics-sli.yaml b/codebundles/k8s-airflow-workload-diagnostics/.runwhen/templates/k8s-airflow-workload-diagnostics-sli.yaml new file mode 100644 index 
00000000..4809f76d --- /dev/null +++ b/codebundles/k8s-airflow-workload-diagnostics/.runwhen/templates/k8s-airflow-workload-diagnostics-sli.yaml @@ -0,0 +1,54 @@ +apiVersion: runwhen.com/v1 +kind: ServiceLevelIndicator +metadata: + name: {{slx_name}} + labels: + {% include "common-labels.yaml" %} + annotations: + {% include "common-annotations.yaml" %} +spec: + displayUnitsLong: OK + displayUnitsShort: ok + locations: + - {{default_location}} + description: Measures Airflow workload health via workload readiness, pod readiness, and Warning event volume in the namespace. + codeBundle: + {% if repo_url %} + repoUrl: {{repo_url}} + {% else %} + repoUrl: https://github.com/runwhen-contrib/rw-cli-codecollection.git + {% endif %} + {% if ref %} + ref: {{ref}} + {% else %} + ref: main + {% endif %} + pathToRobot: codebundles/k8s-airflow-workload-diagnostics/sli.robot + intervalStrategy: intermezzo + intervalSeconds: 300 + configProvided: + - name: KUBERNETES_DISTRIBUTION_BINARY + value: {{custom.kubernetes_distribution_binary | default("kubectl")}} + - name: NAMESPACE + value: {{match_resource.resource.metadata.name}} + - name: CONTEXT + value: "{{context}}" + - name: AIRFLOW_LABEL_SELECTOR + value: "{{custom.airflow_label_selector | default('app.kubernetes.io/name=airflow')}}" + - name: AIRFLOW_DEPLOYMENT_NAME_PREFIX + value: "{{custom.airflow_deployment_name_prefix | default('airflow')}}" + - name: RW_LOOKBACK_WINDOW + value: "{{custom.rw_lookback_window | default('1h')}}" + - name: AIRFLOW_SLI_EVENT_THRESHOLD + value: "{{custom.airflow_sli_event_threshold | default('8')}}" + secretsProvided: + {% if wb_version %} + {% include "kubernetes-auth.yaml" ignore missing %} + {% else %} + - name: kubeconfig + workspaceKey: {{custom.kubeconfig_secret_name | default("kubeconfig")}} + {% endif %} + alertConfig: + tasks: + persona: eager-edgar + sessionTTL: 10m diff --git a/codebundles/k8s-airflow-workload-diagnostics/.runwhen/templates/k8s-airflow-workload-diagnostics-slx.yaml 
b/codebundles/k8s-airflow-workload-diagnostics/.runwhen/templates/k8s-airflow-workload-diagnostics-slx.yaml new file mode 100644 index 00000000..c368e777 --- /dev/null +++ b/codebundles/k8s-airflow-workload-diagnostics/.runwhen/templates/k8s-airflow-workload-diagnostics-slx.yaml @@ -0,0 +1,25 @@ +apiVersion: runwhen.com/v1 +kind: ServiceLevelX +metadata: + name: {{slx_name}} + labels: + {% include "common-labels.yaml" %} + annotations: + {% include "common-annotations.yaml" %} +spec: + imageURL: https://storage.googleapis.com/runwhen-nonprod-shared-images/icons/kubernetes/resources/labeled/deploy.svg + alias: {{namespace.name}} Airflow Workload Diagnostics + asMeasuredBy: Aggregate SLI over workload readiness, pod readiness, and Warning event volume for Airflow-labeled resources. + configProvided: + - name: NAMESPACE + value: {{match_resource.resource.metadata.name}} + owners: + - {{workspace.owner_email}} + statement: Airflow controllers and pods in this namespace should be ready and free of excessive Warning events. 
+ additionalContext: + {% include "kubernetes-hierarchy.yaml" ignore missing %} + qualified_name: "{{ match_resource.qualified_name }}" + tags: + {% include "kubernetes-tags.yaml" ignore missing %} + - name: access + value: read-only diff --git a/codebundles/k8s-airflow-workload-diagnostics/.runwhen/templates/k8s-airflow-workload-diagnostics-taskset.yaml b/codebundles/k8s-airflow-workload-diagnostics/.runwhen/templates/k8s-airflow-workload-diagnostics-taskset.yaml new file mode 100644 index 00000000..1c5027e3 --- /dev/null +++ b/codebundles/k8s-airflow-workload-diagnostics/.runwhen/templates/k8s-airflow-workload-diagnostics-taskset.yaml @@ -0,0 +1,43 @@ +apiVersion: runwhen.com/v1 +kind: Runbook +metadata: + name: {{slx_name}} + labels: + {% include "common-labels.yaml" %} + annotations: + {% include "common-annotations.yaml" %} +spec: + location: {{default_location}} + description: Diagnoses Apache Airflow workloads in Kubernetes for replica health, pods, events, PVCs, scheduler logs, and executor saturation. 
+ codeBundle: + {% if repo_url %} + repoUrl: {{repo_url}} + {% else %} + repoUrl: https://github.com/runwhen-contrib/rw-cli-codecollection.git + {% endif %} + {% if ref %} + ref: {{ref}} + {% else %} + ref: main + {% endif %} + pathToRobot: codebundles/k8s-airflow-workload-diagnostics/runbook.robot + configProvided: + - name: KUBERNETES_DISTRIBUTION_BINARY + value: {{custom.kubernetes_distribution_binary | default("kubectl")}} + - name: NAMESPACE + value: {{match_resource.resource.metadata.name}} + - name: CONTEXT + value: "{{context}}" + - name: AIRFLOW_LABEL_SELECTOR + value: "{{custom.airflow_label_selector | default('app.kubernetes.io/name=airflow')}}" + - name: AIRFLOW_DEPLOYMENT_NAME_PREFIX + value: "{{custom.airflow_deployment_name_prefix | default('airflow')}}" + - name: RW_LOOKBACK_WINDOW + value: "{{custom.rw_lookback_window | default('1h')}}" + secretsProvided: + {% if wb_version %} + {% include "kubernetes-auth.yaml" ignore missing %} + {% else %} + - name: kubeconfig + workspaceKey: {{custom.kubeconfig_secret_name | default("kubeconfig")}} + {% endif %} diff --git a/codebundles/k8s-airflow-workload-diagnostics/.test/Taskfile.yaml b/codebundles/k8s-airflow-workload-diagnostics/.test/Taskfile.yaml new file mode 100644 index 00000000..272ab965 --- /dev/null +++ b/codebundles/k8s-airflow-workload-diagnostics/.test/Taskfile.yaml @@ -0,0 +1,71 @@ +version: "3" + +tasks: + default: + desc: "Run/refresh RunWhen Local config" + cmds: + - task: generate-rwl-config + + clean: + desc: "Run cleanup tasks" + cmds: + - task: clean-rwl-discovery + + build-infra: + desc: "Build test infrastructure" + cmds: + - task: create-kubernetes-objects + + create-kubernetes-objects: + desc: "Apply manifests from kubernetes directory using kubectl" + cmds: + - kubectl apply -f kubernetes/manifest.yaml + silent: true + + remove-kubernetes-objects: + desc: "Delete kubernetes objects" + cmds: + - kubectl delete -f kubernetes/manifest.yaml + silent: true + + generate-rwl-config: + 
desc: "Generate RunWhen Local configuration (workspaceInfo.yaml)" + env: + RW_WORKSPACE: '{{.RW_WORKSPACE | default "my-workspace"}}' + cmds: + - | + repo_url=$(git config --get remote.origin.url) + branch_name=$(git rev-parse --abbrev-ref HEAD) + codebundle=$(basename "$(dirname "$PWD")") + namespace=$(yq e 'select(.kind == "Namespace") | .metadata.name' kubernetes/manifest.yaml -N) + cat << EOF > workspaceInfo.yaml + workspaceName: "$RW_WORKSPACE" + workspaceOwnerEmail: authors@runwhen.com + defaultLocation: location-01 + defaultLOD: none + cloudConfig: + kubernetes: + kubeconfigFile: /shared/kubeconfig + namespaceLODs: + $namespace: detailed + namespaces: + - $namespace + codeCollections: + - repoURL: "$repo_url" + branch: "$branch_name" + codeBundles: ["$codebundle"] + custom: + kubeconfig_secret_name: "kubeconfig" + kubernetes_distribution_binary: kubectl + airflow_label_selector: "app.kubernetes.io/name=airflow" + airflow_deployment_name_prefix: "airflow" + rw_lookback_window: "1h" + EOF + silent: true + + clean-rwl-discovery: + desc: "Clean RunWhen Local discovery output" + cmds: + - rm -rf output + - rm -f workspaceInfo.yaml + silent: true diff --git a/codebundles/k8s-airflow-workload-diagnostics/.test/kubernetes/manifest.yaml b/codebundles/k8s-airflow-workload-diagnostics/.test/kubernetes/manifest.yaml new file mode 100644 index 00000000..43498666 --- /dev/null +++ b/codebundles/k8s-airflow-workload-diagnostics/.test/kubernetes/manifest.yaml @@ -0,0 +1,7 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: test-airflow-workload-diagnostics + labels: + app.kubernetes.io/name: airflow + environment: test diff --git a/codebundles/k8s-airflow-workload-diagnostics/README.md b/codebundles/k8s-airflow-workload-diagnostics/README.md new file mode 100644 index 00000000..c7213e70 --- /dev/null +++ b/codebundles/k8s-airflow-workload-diagnostics/README.md @@ -0,0 +1,65 @@ +# Kubernetes Airflow Workload Diagnostics + +This CodeBundle collects Kubernetes-centric health
signals for Apache Airflow installations: workload controllers (webserver, scheduler, workers, triggerer), pod readiness and restarts, recent Warning events, PVCs for logs and DAGs, targeted scheduler log excerpts, and executor-style pod status. All tasks are read-only and do not trigger DAG runs or mutate workloads. + +## Overview + +- **Workload controllers**: Lists Deployments, StatefulSets, and DaemonSets that match the Airflow label selector or name prefix and compares desired versus ready replicas. +- **Pod health**: Checks Airflow-labeled pods for phase, Ready condition, restart counts, and recent termination reasons (for example OOMKilled). +- **Events**: Surfaces Warning events in the lookback window for Airflow-related object names. +- **Storage**: Summarizes PVCs tied to Airflow pods or common volume name patterns and flags non-Bound phases. +- **Scheduler logs**: Samples scheduler pod logs for DAG import errors and database connectivity hints. +- **Executors**: Best-effort summary of worker or executor-related pods that are Pending or have OOM terminations. +- **SLI**: Publishes a 0–1 health score from workload readiness, pod readiness, and Warning event volume (see `sli.robot`). + +## Configuration + +### Required variables + +- `CONTEXT`: Kubernetes context to use. +- `NAMESPACE`: Namespace that contains the Airflow release. + +### Optional variables + +- `AIRFLOW_LABEL_SELECTOR`: Label selector for Airflow workloads (default: `app.kubernetes.io/name=airflow`). +- `AIRFLOW_DEPLOYMENT_NAME_PREFIX`: Extra name prefix used when labels are inconsistent (default: `airflow`). +- `RW_LOOKBACK_WINDOW`: Time window for events and log sampling, for example `30m` or `1h` (default: `1h`). +- `KUBERNETES_DISTRIBUTION_BINARY`: `kubectl` or `oc` (default: `kubectl`). + +### SLI-only optional variables + +- `AIRFLOW_SLI_EVENT_THRESHOLD`: Maximum number of Warning events in the lookback window before the events sub-score fails (default: `8`). 
+ +### Bash script defaults (not imported in `runbook.robot`) + +- `AIRFLOW_RESTART_WARN_THRESHOLD`: Total container restart count above which the pod health task raises a warning (default: `10`). + +### Secrets + +- `kubeconfig`: Standard kubeconfig with read-only `get`, `list`, `describe`, and `logs` on workloads and events in the target namespace. + +## Tasks overview + +### List Airflow Workloads in Namespace + +Discovers Deployments, StatefulSets, and DaemonSets via the label selector and optional name prefix merge; raises issues when ready replicas are below desired counts. + +### Check Airflow Pod Health and Restarts in Namespace + +Evaluates Airflow-labeled pods for phase, Ready condition, high restart counts, and recent container termination reasons. + +### Fetch Recent Events for Airflow Resources in Namespace + +Collects Warning events since the lookback cutoff for involved objects related to Airflow naming or workloads. + +### Summarize PVC Status for Airflow Data Volumes in Namespace + +Lists PVCs referenced by Airflow pods or matching common DAGs, logs, or plugins name patterns; issues when a PVC is not Bound. + +### Sample Scheduler Logs for DAG Import Errors in Namespace + +Tails recent scheduler logs and flags traceback, import, or database connectivity patterns. + +### Check Worker or KubernetesExecutor Pod Saturation in Namespace + +Surfaces Pending executor-related pods and OOMKilled containers when Celery or executor components are present. diff --git a/codebundles/k8s-airflow-workload-diagnostics/check-airflow-executor-pods.sh b/codebundles/k8s-airflow-workload-diagnostics/check-airflow-executor-pods.sh new file mode 100755 index 00000000..adec2868 --- /dev/null +++ b/codebundles/k8s-airflow-workload-diagnostics/check-airflow-executor-pods.sh @@ -0,0 +1,51 @@ +#!/usr/bin/env bash +# Summarizes Celery/Kubernetes executor related pods: pending reasons and resource hints from describe. 
+set -euo pipefail +set -x + +: "${CONTEXT:?}" "${NAMESPACE:?}" + +OUTPUT_FILE="${OUTPUT_FILE:-check_airflow_executor_pods_issues.json}" +KUBECTL="${KUBERNETES_DISTRIBUTION_BINARY:-kubectl}" +LABEL_SEL="${AIRFLOW_LABEL_SELECTOR:-app.kubernetes.io/name=airflow}" + +if ! pods_json=$("${KUBECTL}" get pods -n "${NAMESPACE}" --context "${CONTEXT}" -l "${LABEL_SEL}" -o json 2>/dev/null); then + echo '[{"title":"Cannot list Airflow pods","details":"kubectl get pods failed","severity":4,"next_steps":"Verify RBAC."}]' | jq . > "$OUTPUT_FILE" + exit 0 +fi + +executor_json=$(echo "$pods_json" | jq '[.items[]? | select( + (.metadata.name | test("worker|celery|kubernetes|executor"; "i")) or + (.metadata.labels["app.kubernetes.io/component"]? // "" | test("worker|celery"; "i")) +)]') + +issues_json=$(echo "$executor_json" | jq --arg ns "$NAMESPACE" ' + [ .[]? | + .metadata.name as $n | + (.status.phase // "") as $ph | + (if $ph == "Pending" then + [{ + "title": ("Executor-related pod `" + $n + "` Pending in `" + $ns + "`"), + "details": ((.status.conditions // []) | map(.message // "") | join("; ")), + "severity": 3, + "next_steps": "Describe the pod for scheduling and volume mount errors; check cluster capacity." + }] + else [] end) + + ([.status.containerStatuses[]? | + .name as $c | + (.lastState.terminated.reason // "") as $reason | + select($reason == "OOMKilled") | + { + "title": ("OOMKilled in executor pod `" + $n + "` container `" + $c + "`"), + "details": "Last termination: OOMKilled", + "severity": 4, + "next_steps": "Raise memory limits or reduce task concurrency for workers." 
+ } + ]) + ] | flatten +') + +echo "$issues_json" > "$OUTPUT_FILE" + +echo "Executor-related pods:" +echo "$executor_json" | jq -r '.[] | [.metadata.name, .status.phase] | @tsv' || true diff --git a/codebundles/k8s-airflow-workload-diagnostics/check-airflow-pod-health.sh b/codebundles/k8s-airflow-workload-diagnostics/check-airflow-pod-health.sh new file mode 100755 index 00000000..8c838bbe --- /dev/null +++ b/codebundles/k8s-airflow-workload-diagnostics/check-airflow-pod-health.sh @@ -0,0 +1,65 @@ +#!/usr/bin/env bash +# Evaluates Airflow-labeled pods for phase, Ready condition, restarts, and termination reasons. +set -euo pipefail +set -x + +: "${CONTEXT:?}" "${NAMESPACE:?}" + +OUTPUT_FILE="${OUTPUT_FILE:-check_airflow_pod_health_issues.json}" +KUBECTL="${KUBERNETES_DISTRIBUTION_BINARY:-kubectl}" +LABEL_SEL="${AIRFLOW_LABEL_SELECTOR:-app.kubernetes.io/name=airflow}" +RESTART_WARN="${AIRFLOW_RESTART_WARN_THRESHOLD:-10}" + +if ! pods_json=$("${KUBECTL}" get pods -n "${NAMESPACE}" --context "${CONTEXT}" -l "${LABEL_SEL}" -o json 2>/dev/null); then + echo '[{"title":"Cannot list Airflow pods","details":"kubectl get pods failed","severity":4,"next_steps":"Verify RBAC and label selector AIRFLOW_LABEL_SELECTOR."}]' | jq . > "$OUTPUT_FILE" + echo "kubectl get pods failed." + exit 0 +fi + +issues_json=$(echo "$pods_json" | jq --arg ns "$NAMESPACE" --arg rw "$RESTART_WARN" ' + [ .items[]? | + .metadata.name as $name | + (.status.phase // "") as $phase | + ([.status.conditions[]? | select(.type == "Ready") | .status] | first // "Unknown") as $ready | + ([.status.containerStatuses[]? | .restartCount // 0] | add) as $restarts | + (if ($phase != "Running" and $phase != "Succeeded") then + [{ + "title": ("Pod `" + $name + "` phase " + $phase + " in `" + $ns + "`"), + "details": ("Pod phase is " + $phase + " (expected Running for active workloads)."), + "severity": 3, + "next_steps": "Describe the pod and check scheduling, image pull, and init containers." 
+ }] + else [] end) + + (if ($ready == "False" and $phase == "Running") then + [{ + "title": ("Pod `" + $name + "` not Ready in `" + $ns + "`"), + "details": "Ready condition is False while pod is Running.", + "severity": 3, + "next_steps": "Check readiness probes, failing containers, and recent events." + }] + else [] end) + + (if ($restarts > ($rw | tonumber)) then + [{ + "title": ("High restart count in pod `" + $name + "` in `" + $ns + "`"), + "details": ("Total container restarts: " + ($restarts | tostring)), + "severity": 2, + "next_steps": "Inspect container exit reasons and logs for crash loops." + }] + else [] end) + + ([.status.containerStatuses[]? | + .name as $cname | + (.lastState.terminated.reason // "") as $reason | + select($reason == "OOMKilled" or $reason == "Error" or $reason == "ContainerCannotRun") | + { + "title": ("Container `" + $cname + "` in `" + $name + "` terminated: " + $reason), + "details": ("Last termination reason: " + $reason), + "severity": (if $reason == "OOMKilled" then 4 else 3 end), + "next_steps": "Review memory limits and application logs for this container." + } + ]) + ] | flatten +') + +echo "$issues_json" > "$OUTPUT_FILE" + +echo "Checked $(echo "$pods_json" | jq '.items | length') pod(s) with label ${LABEL_SEL}." diff --git a/codebundles/k8s-airflow-workload-diagnostics/fetch-airflow-events.sh b/codebundles/k8s-airflow-workload-diagnostics/fetch-airflow-events.sh new file mode 100755 index 00000000..21c2cfe1 --- /dev/null +++ b/codebundles/k8s-airflow-workload-diagnostics/fetch-airflow-events.sh @@ -0,0 +1,68 @@ +#!/usr/bin/env bash +# Lists recent Warning events in the namespace relevant to Airflow workloads. 
+set -euo pipefail +set -x + +: "${CONTEXT:?}" "${NAMESPACE:?}" + +OUTPUT_FILE="${OUTPUT_FILE:-fetch_airflow_events_issues.json}" +KUBECTL="${KUBERNETES_DISTRIBUTION_BINARY:-kubectl}" +LABEL_SEL="${AIRFLOW_LABEL_SELECTOR:-app.kubernetes.io/name=airflow}" +PREFIX="${AIRFLOW_DEPLOYMENT_NAME_PREFIX:-airflow}" +LW="${RW_LOOKBACK_WINDOW:-1h}" + +if [[ "$LW" =~ ^([0-9]+)h$ ]]; then SEC=$((BASH_REMATCH[1] * 3600)) +elif [[ "$LW" =~ ^([0-9]+)m$ ]]; then SEC=$((BASH_REMATCH[1] * 60)) +elif [[ "$LW" =~ ^([0-9]+)s$ ]]; then SEC=$((BASH_REMATCH[1])) +else SEC=3600 +fi + +CUTOFF=$(date -u -d "@$(( $(date +%s) - SEC ))" +%Y-%m-%dT%H:%M:%SZ) + +if ! events_json=$("${KUBECTL}" get events -n "${NAMESPACE}" --context "${CONTEXT}" --field-selector type=Warning -o json 2>/dev/null); then + echo '[{"title":"Cannot list events","details":"kubectl get events failed","severity":4,"next_steps":"Verify RBAC for events in this namespace."}]' | jq . > "$OUTPUT_FILE" + exit 0 +fi + +if ! pods_json=$("${KUBECTL}" get pods -n "${NAMESPACE}" --context "${CONTEXT}" -l "${LABEL_SEL}" -o json 2>/dev/null); then + pods_json='{"items":[]}' +fi + +airflow_names=$(echo "$pods_json" | jq -r '[.items[]?.metadata.name] | join("|")') + +if ! workloads_json=$("${KUBECTL}" get deploy,sts,ds -n "${NAMESPACE}" --context "${CONTEXT}" -o json 2>/dev/null); then + workloads_json='{"items":[]}' +fi + +workload_names=$(echo "$workloads_json" | jq -r --arg p "$PREFIX" \ + '[.items[]? | select(.metadata.name | startswith($p)) | .metadata.name] | unique | join("|")') + +issues_json=$(echo "$events_json" | jq \ + --arg ns "$NAMESPACE" \ + --arg airflow "$airflow_names" \ + --arg wl "$workload_names" \ + --arg cutoff "$CUTOFF" ' + def name_matches($n; $pat): + ($pat != "") and (($pat | split("|")) as $parts | any($parts[]; . != "" and $n == .)); + def ts($o): + ($o.lastTimestamp // $o.firstTimestamp // ""); + [ .items[]? | + .involvedObject.name as $n | + .involvedObject.kind as $k | + select(ts(.) 
>= $cutoff) | + select( + name_matches($n; $airflow) or name_matches($n; $wl) or + ($n | test("scheduler|webserver|worker|triggerer|dag|airflow"; "i")) + ) | + { + "title": ("Warning event for " + $k + "/" + $n + " in `" + $ns + "`"), + "details": (.message // ""), + "severity": 2, + "next_steps": "Describe the involved object and check volume mounts, probes, and scheduling." + } + ] | unique_by(.title) +') + +echo "$issues_json" > "$OUTPUT_FILE" + +echo "Warning events (since ${CUTOFF}): $(echo "$issues_json" | jq 'length') issue(s) recorded." diff --git a/codebundles/k8s-airflow-workload-diagnostics/list-airflow-workloads.sh b/codebundles/k8s-airflow-workload-diagnostics/list-airflow-workloads.sh new file mode 100755 index 00000000..b53e731b --- /dev/null +++ b/codebundles/k8s-airflow-workload-diagnostics/list-airflow-workloads.sh @@ -0,0 +1,75 @@ +#!/usr/bin/env bash +# Lists Deployments, StatefulSets, and DaemonSets tied to Airflow via label selector +# and optional name prefix; emits issues when replicas are not ready. +set -euo pipefail +set -x + +: "${CONTEXT:?Must set CONTEXT}" +: "${NAMESPACE:?Must set NAMESPACE}" + +OUTPUT_FILE="${OUTPUT_FILE:-list_airflow_workloads_issues.json}" +KUBECTL="${KUBERNETES_DISTRIBUTION_BINARY:-kubectl}" +LABEL_SEL="${AIRFLOW_LABEL_SELECTOR:-app.kubernetes.io/name=airflow}" +PREFIX="${AIRFLOW_DEPLOYMENT_NAME_PREFIX:-airflow}" + +if ! labeled_json=$("${KUBECTL}" get deploy,sts,ds -n "${NAMESPACE}" --context "${CONTEXT}" -l "${LABEL_SEL}" -o json 2>/dev/null); then + labeled_json='{"items":[]}' +fi + +if ! 
all_json=$("${KUBECTL}" get deploy,sts,ds -n "${NAMESPACE}" --context "${CONTEXT}" -o json 2>/dev/null); then + err_msg="kubectl get deploy,sts,ds failed for namespace ${NAMESPACE}" + echo '[]' | jq \ + --arg title "Cannot list workloads in namespace \`${NAMESPACE}\`" \ + --arg details "${err_msg}" \ + --arg severity "4" \ + --arg next_steps "Verify kubeconfig, context, and RBAC for get/list on apps resources in this namespace." \ + '. + [{ + "title": $title, + "details": $details, + "severity": ($severity | tonumber), + "next_steps": $next_steps + }]' > "$OUTPUT_FILE" + echo "$err_msg" + exit 0 +fi + +merged_json=$(jq -n --argjson labeled "$labeled_json" --argjson all "$all_json" --arg prefix "$PREFIX" ' + ($labeled.items // []) as $li | + ($all.items // []) as $ai | + {items: (($li + ($ai | map(select(.metadata.name | startswith($prefix))))) | unique_by(.metadata.uid))} +') + +issues_json=$(echo "$merged_json" | jq --arg ns "$NAMESPACE" ' + [ .items[]? | + . as $o | + ($o.kind) as $k | + ($o.metadata.name) as $n | + (if $k == "DaemonSet" then + [($o.status.desiredNumberScheduled // 0), ($o.status.numberReady // 0)] + elif ($k == "Deployment" or $k == "StatefulSet") then + [($o.spec.replicas // 0), ($o.status.readyReplicas // 0)] + else + [0, 0] + end) as $pair | + ($pair[0] | tonumber) as $desired | + ($pair[1] | tonumber) as $ready | + select($desired > 0 and $ready < $desired) | + { + "title": ($k + " `" + $n + "` not fully ready in `" + $ns + "`"), + "details": ($k + " " + $n + ": ready " + ($ready | tostring) + " / desired " + ($desired | tostring)), + "severity": 3, + "next_steps": ("Inspect pod status, events, and resource limits for " + $n + ".") + } + ] +') + +echo "$issues_json" > "$OUTPUT_FILE" + +echo "Discovered $(echo "$merged_json" | jq '.items | length') Airflow-related workload object(s) (label \`${LABEL_SEL}\` and/or name prefix \`${PREFIX}\`)." +printf '%s\n' "--- Workload summary ---" +echo "$merged_json" | jq -r '.items[]? 
| [.kind, .metadata.name, + (if .kind == "DaemonSet" then + ("ready " + ((.status.numberReady // 0) | tostring) + " / desired " + ((.status.desiredNumberScheduled // 0) | tostring)) + else + ("ready " + ((.status.readyReplicas // 0) | tostring) + " / desired " + ((.spec.replicas // 0) | tostring)) + end)] | @tsv' diff --git a/codebundles/k8s-airflow-workload-diagnostics/runbook.robot b/codebundles/k8s-airflow-workload-diagnostics/runbook.robot new file mode 100644 index 00000000..f3da9910 --- /dev/null +++ b/codebundles/k8s-airflow-workload-diagnostics/runbook.robot @@ -0,0 +1,326 @@ +*** Settings *** +Documentation Collects Kubernetes health signals for Apache Airflow workloads: controllers, pods, events, PVCs, scheduler logs, and executor pods to diagnose misconfiguration and executor failures without mutating the cluster. +Metadata Author rw-codebundle-agent +Metadata Display Name Kubernetes Airflow Workload Diagnostics +Metadata Supports Kubernetes Airflow Workload Diagnostics + +Library String +Library BuiltIn +Library RW.Core +Library RW.CLI +Library RW.K8sHelper +Library RW.platform + +Force Tags Kubernetes Airflow Workload Diagnostics + +Suite Setup Suite Initialization + + +*** Tasks *** +List Airflow Workloads in Namespace `${NAMESPACE}` + [Documentation] Discovers Deployments, StatefulSets, and DaemonSets associated with Airflow via label selectors and name prefix; compares desired versus ready replicas. + [Tags] Kubernetes Airflow access:read-only data:logs-config + + ${result}= RW.CLI.Run Bash File + ... bash_file=list-airflow-workloads.sh + ... env=${env} + ... secret_file__kubeconfig=${kubeconfig} + ... timeout_seconds=180 + ... include_in_history=false + ... show_in_rwl_cheatsheet=true + ... cmd_override=./list-airflow-workloads.sh + + ${issues}= RW.CLI.Run Cli + ... cmd=cat list_airflow_workloads_issues.json + ... env=${env} + ... 
include_in_history=false
+
+    TRY
+        ${issue_list}=    Evaluate    json.loads(r'''${issues.stdout}''')    json
+    EXCEPT
+        Log    Failed to parse JSON for task, defaulting to empty list.    WARN
+        ${issue_list}=    Create List
+    END
+
+    # $issue_list (special-variable expression syntax) passes the list as a single
+    # Python object; @{issue_list} would expand/join items into the expression and
+    # make len() fail for empty lists and for lists with more than one issue.
+    IF    len($issue_list) > 0
+        FOR    ${issue}    IN    @{issue_list}
+            RW.Core.Add Issue
+            ...    severity=${issue['severity']}
+            ...    expected=Airflow-related workloads should have ready replicas matching desired counts in namespace `${NAMESPACE}`
+            ...    actual=Replica mismatch or API error reported for workloads in namespace `${NAMESPACE}`
+            ...    title=${issue['title']}
+            ...    reproduce_hint=${result.cmd}
+            ...    details=${issue['details']}
+            ...    next_steps=${issue['next_steps']}
+        END
+    END
+
+    RW.Core.Add Pre To Report    Airflow workload listing:
+    RW.Core.Add Pre To Report    ${result.stdout}
+
+Check Airflow Pod Health and Restarts in Namespace `${NAMESPACE}`
+    [Documentation]    Evaluates Airflow-labeled pods for phase, Ready condition, restart counts, and recent termination reasons such as OOMKilled or Error.
+    [Tags]    Kubernetes    Airflow    access:read-only    data:metrics
+
+    ${result}=    RW.CLI.Run Bash File
+    ...    bash_file=check-airflow-pod-health.sh
+    ...    env=${env}
+    ...    secret_file__kubeconfig=${kubeconfig}
+    ...    timeout_seconds=180
+    ...    include_in_history=false
+    ...    show_in_rwl_cheatsheet=true
+    ...    cmd_override=./check-airflow-pod-health.sh
+
+    ${issues}=    RW.CLI.Run Cli
+    ...    cmd=cat check_airflow_pod_health_issues.json
+    ...    env=${env}
+    ...    include_in_history=false
+
+    # Parse the issue list emitted by the script; fall back to empty on bad JSON.
+    TRY
+        ${issue_list}=    Evaluate    json.loads(r'''${issues.stdout}''')    json
+    EXCEPT
+        Log    Failed to parse JSON for task, defaulting to empty list.    WARN
+        ${issue_list}=    Create List
+    END
+
+    IF    len($issue_list) > 0
+        FOR    ${issue}    IN    @{issue_list}
+            RW.Core.Add Issue
+            ...    severity=${issue['severity']}
+            ...    expected=Airflow pods should be Running and Ready with stable containers in namespace `${NAMESPACE}`
+            ...    actual=Pod health issue detected among Airflow-labeled pods in namespace `${NAMESPACE}`
+            ...    title=${issue['title']}
+            ...    reproduce_hint=${result.cmd}
+            ...    details=${issue['details']}
+            ...    next_steps=${issue['next_steps']}
+        END
+    END
+
+    RW.Core.Add Pre To Report    Airflow pod health check:
+    RW.Core.Add Pre To Report    ${result.stdout}
+
+Fetch Recent Events for Airflow Resources in Namespace `${NAMESPACE}`
+    [Documentation]    Pulls Warning events in the lookback window for objects tied to Airflow pods or workload names to catch scheduling, volume, and probe failures.
+    [Tags]    Kubernetes    Airflow    access:read-only    data:logs-config
+
+    ${result}=    RW.CLI.Run Bash File
+    ...    bash_file=fetch-airflow-events.sh
+    ...    env=${env}
+    ...    secret_file__kubeconfig=${kubeconfig}
+    ...    timeout_seconds=180
+    ...    include_in_history=false
+    ...    show_in_rwl_cheatsheet=true
+    ...    cmd_override=./fetch-airflow-events.sh
+
+    ${issues}=    RW.CLI.Run Cli
+    ...    cmd=cat fetch_airflow_events_issues.json
+    ...    env=${env}
+    ...    include_in_history=false
+
+    TRY
+        ${issue_list}=    Evaluate    json.loads(r'''${issues.stdout}''')    json
+    EXCEPT
+        Log    Failed to parse JSON for task, defaulting to empty list.    WARN
+        ${issue_list}=    Create List
+    END
+
+    IF    len($issue_list) > 0
+        FOR    ${issue}    IN    @{issue_list}
+            RW.Core.Add Issue
+            ...    severity=${issue['severity']}
+            ...    expected=No unexpected Warning events for Airflow-related objects in namespace `${NAMESPACE}` during the lookback window
+            ...    actual=Warning events matched Airflow-related resources in namespace `${NAMESPACE}`
+            ...    title=${issue['title']}
+            ...    reproduce_hint=${result.cmd}
+            ...    details=${issue['details']}
+            ...    next_steps=${issue['next_steps']}
+        END
+    END
+
+    RW.Core.Add Pre To Report    Airflow-related Warning events:
+    RW.Core.Add Pre To Report    ${result.stdout}
+
+Summarize PVC Status for Airflow Data Volumes in Namespace `${NAMESPACE}`
+    [Documentation]    Lists PVCs tied to Airflow pods or common volume name patterns and flags phases such as Pending that indicate storage provisioning problems.
+    [Tags]    Kubernetes    Airflow    access:read-only    data:logs-config
+
+    ${result}=    RW.CLI.Run Bash File
+    ...    bash_file=summarize-airflow-pvcs.sh
+    ...    env=${env}
+    ...    secret_file__kubeconfig=${kubeconfig}
+    ...    timeout_seconds=180
+    ...    include_in_history=false
+    ...    show_in_rwl_cheatsheet=true
+    ...    cmd_override=./summarize-airflow-pvcs.sh
+
+    ${issues}=    RW.CLI.Run Cli
+    ...    cmd=cat summarize_airflow_pvcs_issues.json
+    ...    env=${env}
+    ...    include_in_history=false
+
+    TRY
+        ${issue_list}=    Evaluate    json.loads(r'''${issues.stdout}''')    json
+    EXCEPT
+        Log    Failed to parse JSON for task, defaulting to empty list.    WARN
+        ${issue_list}=    Create List
+    END
+
+    IF    len($issue_list) > 0
+        FOR    ${issue}    IN    @{issue_list}
+            RW.Core.Add Issue
+            ...    severity=${issue['severity']}
+            ...    expected=Airflow-related PVCs should be Bound and provisioned in namespace `${NAMESPACE}`
+            ...    actual=PVC phase or provisioning issue detected in namespace `${NAMESPACE}`
+            ...    title=${issue['title']}
+            ...    reproduce_hint=${result.cmd}
+            ...    details=${issue['details']}
+            ...    next_steps=${issue['next_steps']}
+        END
+    END
+
+    RW.Core.Add Pre To Report    Airflow PVC summary:
+    RW.Core.Add Pre To Report    ${result.stdout}
+
+Sample Scheduler Logs for DAG Import Errors in Namespace `${NAMESPACE}`
+    [Documentation]    Reads recent scheduler pod logs within the lookback window and flags common DAG import, traceback, or database connectivity patterns without executing DAGs.
+    [Tags]    Kubernetes    Airflow    access:read-only    data:logs
+
+    ${result}=    RW.CLI.Run Bash File
+    ...    bash_file=sample-airflow-scheduler-logs.sh
+    ...    env=${env}
+    ...    secret_file__kubeconfig=${kubeconfig}
+    ...    timeout_seconds=180
+    ...    include_in_history=false
+    ...    show_in_rwl_cheatsheet=true
+    ...    cmd_override=./sample-airflow-scheduler-logs.sh
+
+    ${issues}=    RW.CLI.Run Cli
+    ...    cmd=cat sample_airflow_scheduler_logs_issues.json
+    ...    env=${env}
+    ...    include_in_history=false
+
+    TRY
+        ${issue_list}=    Evaluate    json.loads(r'''${issues.stdout}''')    json
+    EXCEPT
+        Log    Failed to parse JSON for task, defaulting to empty list.    WARN
+        ${issue_list}=    Create List
+    END
+
+    IF    len($issue_list) > 0
+        FOR    ${issue}    IN    @{issue_list}
+            RW.Core.Add Issue
+            ...    severity=${issue['severity']}
+            ...    expected=Scheduler logs should be free of DAG import failures and critical connectivity errors in namespace `${NAMESPACE}`
+            ...    actual=Log patterns indicated potential DAG or database issues in namespace `${NAMESPACE}`
+            ...    title=${issue['title']}
+            ...    reproduce_hint=${result.cmd}
+            ...    details=${issue['details']}
+            ...    next_steps=${issue['next_steps']}
+        END
+    END
+
+    RW.Core.Add Pre To Report    Scheduler log sampling:
+    RW.Core.Add Pre To Report    ${result.stdout}
+
+Check Worker or KubernetesExecutor Pod Saturation in Namespace `${NAMESPACE}`
+    [Documentation]    When Celery or executor-style pods are present, surfaces Pending scheduling problems and OOM terminations using pod status (best-effort, read-only).
+    [Tags]    Kubernetes    Airflow    access:read-only    data:metrics
+
+    ${result}=    RW.CLI.Run Bash File
+    ...    bash_file=check-airflow-executor-pods.sh
+    ...    env=${env}
+    ...    secret_file__kubeconfig=${kubeconfig}
+    ...    timeout_seconds=180
+    ...    include_in_history=false
+    ...    show_in_rwl_cheatsheet=true
+    ...    cmd_override=./check-airflow-executor-pods.sh
+
+    ${issues}=    RW.CLI.Run Cli
+    ...    cmd=cat check_airflow_executor_pods_issues.json
+    ...    env=${env}
+    ...    include_in_history=false
+
+    TRY
+        ${issue_list}=    Evaluate    json.loads(r'''${issues.stdout}''')    json
+    EXCEPT
+        Log    Failed to parse JSON for task, defaulting to empty list.    WARN
+        ${issue_list}=    Create List
+    END
+
+    IF    len($issue_list) > 0
+        FOR    ${issue}    IN    @{issue_list}
+            RW.Core.Add Issue
+            ...    severity=${issue['severity']}
+            ...    expected=Executor and worker pods should schedule cleanly without OOM terminations in namespace `${NAMESPACE}`
+            ...    actual=Executor-related pod issue detected in namespace `${NAMESPACE}`
+            ...    title=${issue['title']}
+            ...    reproduce_hint=${result.cmd}
+            ...    details=${issue['details']}
+            ...    next_steps=${issue['next_steps']}
+        END
+    END
+
+    RW.Core.Add Pre To Report    Executor pod check:
+    RW.Core.Add Pre To Report    ${result.stdout}
+
+
+*** Keywords ***
+Suite Initialization
+    ${kubeconfig}=    RW.Core.Import Secret
+    ...    kubeconfig
+    ...    type=string
+    ...    description=Kubernetes kubeconfig with read-only list/get/describe/logs on workloads.
+    ...    pattern=\w*
+    ${KUBERNETES_DISTRIBUTION_BINARY}=    RW.Core.Import User Variable    KUBERNETES_DISTRIBUTION_BINARY
+    ...    type=string
+    ...    description=Kubernetes CLI binary to use.
+    ...    enum=[kubectl,oc]
+    ...    default=kubectl
+    ...    pattern=\w*
+    ${CONTEXT}=    RW.Core.Import User Variable    CONTEXT
+    ...    type=string
+    ...    description=Kubernetes context name.
+    ...    pattern=[^\\s]+
+    ${NAMESPACE}=    RW.Core.Import User Variable    NAMESPACE
+    ...    type=string
+    ...    description=Namespace that contains the Airflow release.
+    ...    pattern=[^\\s]+
+    ${AIRFLOW_LABEL_SELECTOR}=    RW.Core.Import User Variable    AIRFLOW_LABEL_SELECTOR
+    ...    type=string
+    ...    description=Label selector for Airflow workloads (pods, controllers).
+    ...    default=app.kubernetes.io/name=airflow
+    ...    pattern=.*
+    ${AIRFLOW_DEPLOYMENT_NAME_PREFIX}=    RW.Core.Import User Variable    AIRFLOW_DEPLOYMENT_NAME_PREFIX
+    ...    type=string
+    ...    description=Name prefix to match controllers when labels are inconsistent.
+    ...    default=airflow
+    ...    pattern=.*
+    ${RW_LOOKBACK_WINDOW}=    RW.Core.Import User Variable    RW_LOOKBACK_WINDOW
+    ...    type=string
+    ...    description=Lookback window for events and log sampling (for example 30m, 1h).
+    ...    default=1h
+    ...    pattern=.*
+
+    Set Suite Variable    ${kubeconfig}    ${kubeconfig}
+    Set Suite Variable    ${KUBERNETES_DISTRIBUTION_BINARY}    ${KUBERNETES_DISTRIBUTION_BINARY}
+    Set Suite Variable    ${CONTEXT}    ${CONTEXT}
+    Set Suite Variable    ${NAMESPACE}    ${NAMESPACE}
+    Set Suite Variable    ${AIRFLOW_LABEL_SELECTOR}    ${AIRFLOW_LABEL_SELECTOR}
+    Set Suite Variable    ${AIRFLOW_DEPLOYMENT_NAME_PREFIX}    ${AIRFLOW_DEPLOYMENT_NAME_PREFIX}
+    Set Suite Variable    ${RW_LOOKBACK_WINDOW}    ${RW_LOOKBACK_WINDOW}
+
+    ${env}=    Create Dictionary
+    ...    KUBECONFIG=./${kubeconfig.key}
+    ...    CONTEXT=${CONTEXT}
+    ...    NAMESPACE=${NAMESPACE}
+    ...    KUBERNETES_DISTRIBUTION_BINARY=${KUBERNETES_DISTRIBUTION_BINARY}
+    ...    AIRFLOW_LABEL_SELECTOR=${AIRFLOW_LABEL_SELECTOR}
+    ...    AIRFLOW_DEPLOYMENT_NAME_PREFIX=${AIRFLOW_DEPLOYMENT_NAME_PREFIX}
+    ...    RW_LOOKBACK_WINDOW=${RW_LOOKBACK_WINDOW}
+    Set Suite Variable    ${env}    ${env}
+
+    RW.K8sHelper.Verify Cluster Connectivity
+    ...    binary=${KUBERNETES_DISTRIBUTION_BINARY}
+    ...    context=${CONTEXT}
+    ...    env=${env}
+    ...    secret_file__kubeconfig=${kubeconfig}
diff --git a/codebundles/k8s-airflow-workload-diagnostics/sample-airflow-scheduler-logs.sh b/codebundles/k8s-airflow-workload-diagnostics/sample-airflow-scheduler-logs.sh
new file mode 100755
index 00000000..ad6a2229
--- /dev/null
+++ b/codebundles/k8s-airflow-workload-diagnostics/sample-airflow-scheduler-logs.sh
@@ -0,0 +1,73 @@
+#!/usr/bin/env bash
+# Fetches recent scheduler pod logs and flags common DAG import / traceback patterns.
+set -euo pipefail
+# set -x trace goes to stderr; stdout stays clean for the report.
+set -x
+
+# Required environment: CONTEXT and NAMESPACE must be set (fail fast otherwise).
+: "${CONTEXT:?}" "${NAMESPACE:?}"
+
+OUTPUT_FILE="${OUTPUT_FILE:-sample_airflow_scheduler_logs_issues.json}"
+KUBECTL="${KUBERNETES_DISTRIBUTION_BINARY:-kubectl}"
+LABEL_SEL="${AIRFLOW_LABEL_SELECTOR:-app.kubernetes.io/name=airflow}"
+LW="${RW_LOOKBACK_WINDOW:-1h}"
+
+# Normalize the lookback window into a kubectl --since value; anything that is
+# not Nh or Nm falls back to 1h.
+if [[ "$LW" =~ ^([0-9]+)h$ ]]; then SINCE="${BASH_REMATCH[1]}h"
+elif [[ "$LW" =~ ^([0-9]+)m$ ]]; then SINCE="${BASH_REMATCH[1]}m"
+else SINCE="1h"
+fi
+
+# First attempt: any Airflow-labeled pod whose name contains "scheduler"
+# (case-insensitive); jq's `first // empty` yields an empty string when none match.
+sched_pod=$("${KUBECTL}" get pods -n "${NAMESPACE}" --context "${CONTEXT}" -l "${LABEL_SEL}" -o json 2>/dev/null | \
+    jq -r '[.items[]? | select(.metadata.name | test("scheduler"; "i")) | .metadata.name] | first // empty')
+
+# Fallback: match on the component=scheduler label instead of the pod name.
+if [[ -z "$sched_pod" ]]; then
+    sched_pod=$("${KUBECTL}" get pods -n "${NAMESPACE}" --context "${CONTEXT}" -l "${LABEL_SEL},component=scheduler" -o jsonpath='{.items[0].metadata.name}' 2>/dev/null || true)
+fi
+
+# No scheduler found: emit a single informational issue and exit successfully
+# (exit 0 so the taskset continues; the issue file carries the finding).
+if [[ -z "$sched_pod" ]]; then
+    echo '[{"title":"No Airflow scheduler pod found","details":"Could not find a pod with scheduler in the name or component=scheduler under the Airflow label selector.","severity":3,"next_steps":"Adjust AIRFLOW_LABEL_SELECTOR or confirm the chart labels for the scheduler."}]' | jq . > "$OUTPUT_FILE"
+    echo "No scheduler pod found."
+    exit 0
+fi
+
+# Fetch recent logs; 2>&1 captures kubectl's error text into log_out so it can
+# be surfaced as issue details when the call fails (RBAC, pod not running, ...).
+if ! log_out=$("${KUBECTL}" logs -n "${NAMESPACE}" --context "${CONTEXT}" "$sched_pod" --since="${SINCE}" --tail=300 2>&1); then
+    issues_json=$(jq -n --arg d "$log_out" ' [{
+        "title": "Cannot read scheduler logs",
+        "details": $d,
+        "severity": 4,
+        "next_steps": "Verify RBAC for pods/log and pod state (running vs pending)."
+    } ]')
+    echo "$issues_json" | jq . > "$OUTPUT_FILE"
+    echo "kubectl logs failed."
+    exit 0
+fi
+
+issues_json='[]'
+# DAG import / Python error patterns; `|| true` keeps set -e happy when head
+# truncates or grep finds nothing on the second pass.
+if echo "$log_out" | grep -qiE 'Traceback|ImportError|ModuleNotFoundError|DAG.*import|Broken DAG|SyntaxError'; then
+    det=$(echo "$log_out" | grep -iE 'Traceback|ImportError|ModuleNotFoundError|DAG.*import|Broken DAG|SyntaxError' | head -20 || true)
+    issues_json=$(echo "$issues_json" | jq \
+        --arg title "DAG import or Python errors in scheduler logs (\`${sched_pod}\`)" \
+        --arg details "$det" \
+        '. + [{
+            "title": $title,
+            "details": $details,
+            "severity": 4,
+            "next_steps": "Fix DAG package dependencies and syntax; validate imports in a dev environment."
+        }]')
+fi
+
+# Metadata-database connectivity patterns. NOTE(review): the detail grep uses a
+# broader `MySQL` pattern than the detection grep's `MySQL.*lost connection` —
+# presumably intentional to capture surrounding MySQL context lines; confirm.
+if echo "$log_out" | grep -qiE 'DatabaseError|could not connect|timeout.*postgres|MySQL.*lost connection'; then
+    det=$(echo "$log_out" | grep -iE 'DatabaseError|could not connect|timeout.*postgres|MySQL' | head -10 || true)
+    issues_json=$(echo "$issues_json" | jq \
+        --arg title "Database connectivity errors in scheduler logs (\`${sched_pod}\`)" \
+        --arg details "$det" \
+        '. + [{
+            "title": $title,
+            "details": $details,
+            "severity": 3,
+            "next_steps": "Check metadata DB reachability and credentials (see postgres health bundle)."
+        }]')
+fi
+
+# jq validates and pretty-prints the accumulated issue array before writing.
+echo "$issues_json" | jq . > "$OUTPUT_FILE"
+
+printf '%s\n' "--- Scheduler log sample (${sched_pod}, since ${SINCE}) ---"
+echo "$log_out" | tail -80
diff --git a/codebundles/k8s-airflow-workload-diagnostics/sli-airflow-health.sh b/codebundles/k8s-airflow-workload-diagnostics/sli-airflow-health.sh
new file mode 100755
index 00000000..a341b22e
--- /dev/null
+++ b/codebundles/k8s-airflow-workload-diagnostics/sli-airflow-health.sh
@@ -0,0 +1,60 @@
+#!/usr/bin/env bash
+# Lightweight SLI: binary scores for workload readiness, pod readiness, and warning events (Airflow-scoped).
+# Prints one JSON object to stdout for sli.robot.
+set -euo pipefail
+
+# Required environment: CONTEXT and NAMESPACE must be set (fail fast otherwise).
+: "${CONTEXT:?}" "${NAMESPACE:?}"
+
+KUBECTL="${KUBERNETES_DISTRIBUTION_BINARY:-kubectl}"
+LABEL_SEL="${AIRFLOW_LABEL_SELECTOR:-app.kubernetes.io/name=airflow}"
+PREFIX="${AIRFLOW_DEPLOYMENT_NAME_PREFIX:-airflow}"
+LW="${RW_LOOKBACK_WINDOW:-1h}"
+EVENT_MAX="${AIRFLOW_SLI_EVENT_THRESHOLD:-8}"
+
+# Convert the lookback window (Nh / Nm) to seconds; default 1h on any other form.
+if [[ "$LW" =~ ^([0-9]+)h$ ]]; then SEC=$((BASH_REMATCH[1] * 3600))
+elif [[ "$LW" =~ ^([0-9]+)m$ ]]; then SEC=$((BASH_REMATCH[1] * 60))
+else SEC=3600
+fi
+# RFC3339 cutoff for event filtering. NOTE(review): `date -d "@..."` is a GNU
+# coreutils extension (fails on BSD/macOS date) — assumed fine for the Linux
+# runner image; confirm if this script must run elsewhere.
+CUTOFF=$(date -u -d "@$(( $(date +%s) - SEC ))" +%Y-%m-%dT%H:%M:%SZ)
+
+# --- Sub-score 1: workload readiness (label-selected plus name-prefix matches) ---
+workload_score=1
+if ! labeled_json=$("${KUBECTL}" get deploy,sts,ds -n "${NAMESPACE}" --context "${CONTEXT}" -l "${LABEL_SEL}" -o json 2>/dev/null); then
+    labeled_json='{"items":[]}'
+fi
+if ! all_json=$("${KUBECTL}" get deploy,sts,ds -n "${NAMESPACE}" --context "${CONTEXT}" -o json 2>/dev/null); then
+    all_json='{"items":[]}'
+fi
+# jq -s slurps both JSON documents; unique_by(.metadata.uid) removes overlap
+# between the label-selected and prefix-matched sets. A workload counts as bad
+# when desired > 0 and ready < desired.
+bad_w=$(echo "$labeled_json" "$all_json" | jq -s --arg prefix "$PREFIX" '
+    ((.[0].items // []) + (.[1].items // [] | map(select(.metadata.name | startswith($prefix)))))
+    | unique_by(.metadata.uid)
+    | map(
+        if .kind == "DaemonSet" then
+            [(.status.desiredNumberScheduled // 0), (.status.numberReady // 0)]
+        elif .kind == "Deployment" or .kind == "StatefulSet" then
+            [(.spec.replicas // 0), (.status.readyReplicas // 0)]
+        else [0,0] end
+    )
+    | map(select(.[0] > 0 and .[1] < .[0]))
+    | length
+')
+[[ "${bad_w}" -gt 0 ]] && workload_score=0
+
+# --- Sub-score 2: pod readiness (phase and Ready condition) ---
+pod_score=1
+if ! pods_json=$("${KUBECTL}" get pods -n "${NAMESPACE}" --context "${CONTEXT}" -l "${LABEL_SEL}" -o json 2>/dev/null); then
+    pods_json='{"items":[]}'
+fi
+# A pod is bad if it is neither Running nor Succeeded, or its Ready condition
+# is missing/False (`first // "False"` treats an absent condition as not ready).
+bad_p=$(echo "$pods_json" | jq '[.items[]? | select((.status.phase != "Running" and .status.phase != "Succeeded") or ([.status.conditions[]? | select(.type=="Ready") | .status] | first // "False") == "False")] | length')
+[[ "${bad_p}" -gt 0 ]] && pod_score=0
+
+# --- Sub-score 3: Warning event volume in the lookback window ---
+# Namespace-wide Warning events (not Airflow-filtered here); the sub-score
+# fails when the count exceeds AIRFLOW_SLI_EVENT_THRESHOLD.
+event_score=1
+if ! ev_json=$("${KUBECTL}" get events -n "${NAMESPACE}" --context "${CONTEXT}" --field-selector type=Warning -o json 2>/dev/null); then
+    ev_json='{"items":[]}'
+fi
+cnt=$(echo "$ev_json" | jq --arg c "$CUTOFF" '[.items[]? | select((.lastTimestamp // .firstTimestamp // "") >= $c)] | length')
+[[ "${cnt}" -gt "${EVENT_MAX}" ]] && event_score=0
+
+# Single JSON object on stdout; sli.robot parses these three binary sub-scores.
+jq -n \
+    --argjson w "$workload_score" \
+    --argjson p "$pod_score" \
+    --argjson e "$event_score" \
+    '{workload: $w, pods: $p, events: $e}'
diff --git a/codebundles/k8s-airflow-workload-diagnostics/sli.robot b/codebundles/k8s-airflow-workload-diagnostics/sli.robot
new file mode 100644
index 00000000..54279584
--- /dev/null
+++ b/codebundles/k8s-airflow-workload-diagnostics/sli.robot
@@ -0,0 +1,113 @@
+*** Settings ***
+Documentation       Measures Airflow namespace workload health using workload readiness, pod readiness, and Warning event volume. Produces a value between 0 (failing) and 1 (healthy).
+Metadata            Author    rw-codebundle-agent
+Metadata            Display Name    Kubernetes Airflow Workload Diagnostics
+Metadata            Supports    Kubernetes Airflow Workload Diagnostics
+
+Library             BuiltIn
+Library             RW.Core
+Library             RW.CLI
+Library             RW.platform
+Library             OperatingSystem
+Library             Collections
+
+Suite Setup         Suite Initialization
+
+
+*** Keywords ***
+Suite Initialization
+    ${kubeconfig}=    RW.Core.Import Secret
+    ...    kubeconfig
+    ...    type=string
+    ...    description=Kubernetes kubeconfig with read-only list/get/describe/logs on workloads.
+    ...    pattern=\w*
+    ${KUBERNETES_DISTRIBUTION_BINARY}=    RW.Core.Import User Variable    KUBERNETES_DISTRIBUTION_BINARY
+    ...    type=string
+    ...    description=Kubernetes CLI binary to use.
+    ...    enum=[kubectl,oc]
+    ...    default=kubectl
+    ...    pattern=\w*
+    ${CONTEXT}=    RW.Core.Import User Variable    CONTEXT
+    ...    type=string
+    ...    description=Kubernetes context name.
+    ...    pattern=[^\\s]+
+    ${NAMESPACE}=    RW.Core.Import User Variable    NAMESPACE
+    ...    type=string
+    ...    description=Namespace that contains the Airflow release.
+    ...    pattern=[^\\s]+
+    ${AIRFLOW_LABEL_SELECTOR}=    RW.Core.Import User Variable    AIRFLOW_LABEL_SELECTOR
+    ...    type=string
+    ...    description=Label selector for Airflow workloads.
+    ...    default=app.kubernetes.io/name=airflow
+    ...    pattern=.*
+    ${AIRFLOW_DEPLOYMENT_NAME_PREFIX}=    RW.Core.Import User Variable    AIRFLOW_DEPLOYMENT_NAME_PREFIX
+    ...    type=string
+    ...    description=Name prefix to match controllers when labels are inconsistent.
+    ...    default=airflow
+    ...    pattern=.*
+    ${RW_LOOKBACK_WINDOW}=    RW.Core.Import User Variable    RW_LOOKBACK_WINDOW
+    ...    type=string
+    ...    description=Lookback window for event counting (for example 30m, 1h).
+    ...    default=1h
+    ...    pattern=.*
+    ${AIRFLOW_SLI_EVENT_THRESHOLD}=    RW.Core.Import User Variable    AIRFLOW_SLI_EVENT_THRESHOLD
+    ...    type=string
+    ...    description=Maximum Warning events in the lookback window before the events sub-score fails.
+    ...    default=8
+    ...    pattern=^\\d+$
+
+    Set Suite Variable    ${kubeconfig}    ${kubeconfig}
+    Set Suite Variable    ${KUBERNETES_DISTRIBUTION_BINARY}    ${KUBERNETES_DISTRIBUTION_BINARY}
+    Set Suite Variable    ${CONTEXT}    ${CONTEXT}
+    Set Suite Variable    ${NAMESPACE}    ${NAMESPACE}
+    Set Suite Variable    ${AIRFLOW_LABEL_SELECTOR}    ${AIRFLOW_LABEL_SELECTOR}
+    Set Suite Variable    ${AIRFLOW_DEPLOYMENT_NAME_PREFIX}    ${AIRFLOW_DEPLOYMENT_NAME_PREFIX}
+    Set Suite Variable    ${RW_LOOKBACK_WINDOW}    ${RW_LOOKBACK_WINDOW}
+    Set Suite Variable    ${AIRFLOW_SLI_EVENT_THRESHOLD}    ${AIRFLOW_SLI_EVENT_THRESHOLD}
+
+    # Environment passed to sli-airflow-health.sh; KUBECONFIG points at the
+    # secret file materialized in the working directory.
+    ${env}=    Create Dictionary
+    ...    KUBECONFIG=./${kubeconfig.key}
+    ...    CONTEXT=${CONTEXT}
+    ...    NAMESPACE=${NAMESPACE}
+    ...    KUBERNETES_DISTRIBUTION_BINARY=${KUBERNETES_DISTRIBUTION_BINARY}
+    ...    AIRFLOW_LABEL_SELECTOR=${AIRFLOW_LABEL_SELECTOR}
+    ...    AIRFLOW_DEPLOYMENT_NAME_PREFIX=${AIRFLOW_DEPLOYMENT_NAME_PREFIX}
+    ...    RW_LOOKBACK_WINDOW=${RW_LOOKBACK_WINDOW}
+    ...    AIRFLOW_SLI_EVENT_THRESHOLD=${AIRFLOW_SLI_EVENT_THRESHOLD}
+    Set Suite Variable    ${env}    ${env}
+
+
+*** Tasks ***
+Generate Airflow Workload Health Score for Namespace `${NAMESPACE}`
+    [Documentation]    Runs a lightweight kubectl summary, pushes binary sub-scores for workload readiness, pod readiness, and Warning event volume, then averages them into a 0-1 health metric.
+    [Tags]    access:read-only    data:metrics
+
+    ${result}=    RW.CLI.Run Bash File
+    ...    bash_file=sli-airflow-health.sh
+    ...    env=${env}
+    ...    secret_file__kubeconfig=${kubeconfig}
+    ...    timeout_seconds=120
+    ...    include_in_history=false
+    ...    show_in_rwl_cheatsheet=true
+    ...    cmd_override=./sli-airflow-health.sh
+
+    # Parse the script's single JSON object; any parse/convert failure scores
+    # all three sub-metrics as 0 rather than failing the SLI run.
+    TRY
+        ${m}=    Evaluate    json.loads(r'''${result.stdout}''')    json
+        ${ws}=    Convert To Number    ${m['workload']}
+        ${ps}=    Convert To Number    ${m['pods']}
+        ${es}=    Convert To Number    ${m['events']}
+    EXCEPT
+        Log    SLI JSON parse failed; scoring zero.    WARN
+        ${ws}=    Set Variable    ${0}
+        ${ps}=    Set Variable    ${0}
+        ${es}=    Set Variable    ${0}
+    END
+
+    RW.Core.Push Metric    ${ws}    sub_name=workload_readiness
+    RW.Core.Push Metric    ${ps}    sub_name=pod_readiness
+    RW.Core.Push Metric    ${es}    sub_name=warning_events
+
+    # Unweighted mean of the three binary sub-scores, rounded to 2 decimals.
+    ${health_score}=    Evaluate    (${ws} + ${ps} + ${es}) / 3
+    ${health_score}=    Convert to Number    ${health_score}    2
+    RW.Core.Add to Report    Airflow workload health score: ${health_score} (workload=${ws}, pods=${ps}, events=${es})
+    RW.Core.Push Metric    ${health_score}
diff --git a/codebundles/k8s-airflow-workload-diagnostics/summarize-airflow-pvcs.sh b/codebundles/k8s-airflow-workload-diagnostics/summarize-airflow-pvcs.sh
new file mode 100755
index 00000000..a2793404
--- /dev/null
+++ b/codebundles/k8s-airflow-workload-diagnostics/summarize-airflow-pvcs.sh
@@ -0,0 +1,56 @@
+#!/usr/bin/env bash
+# Lists PVCs used by Airflow or matching common volume name patterns; flags non-Bound phases.
+set -euo pipefail +set -x + +: "${CONTEXT:?}" "${NAMESPACE:?}" + +OUTPUT_FILE="${OUTPUT_FILE:-summarize_airflow_pvcs_issues.json}" +KUBECTL="${KUBERNETES_DISTRIBUTION_BINARY:-kubectl}" +LABEL_SEL="${AIRFLOW_LABEL_SELECTOR:-app.kubernetes.io/name=airflow}" + +if ! pvc_json=$("${KUBECTL}" get pvc -n "${NAMESPACE}" --context "${CONTEXT}" -o json 2>/dev/null); then + echo '[{"title":"Cannot list PVCs","details":"kubectl get pvc failed","severity":4,"next_steps":"Verify RBAC for persistentvolumeclaims in this namespace."}]' | jq . > "$OUTPUT_FILE" + exit 0 +fi + +if ! pods_json=$("${KUBECTL}" get pods -n "${NAMESPACE}" --context "${CONTEXT}" -l "${LABEL_SEL}" -o json 2>/dev/null); then + pods_json='{"items":[]}' +fi + +used_claims=$(echo "$pods_json" | jq -r '[.items[]? | .spec.volumes[]? | .persistentVolumeClaim.claimName // empty] | unique | join("|")') + +issues_json=$(echo "$pvc_json" | jq \ + --arg used "$used_claims" \ + --arg ns "$NAMESPACE" ' + def claim_in_use($name): + (($used | split("|")) | map(select(length > 0)) | index($name)) != null; + def is_airflow_pvc($name): + claim_in_use($name) or ($name | test("dags|logs|plugins|airflow|git-sync|persistence"; "i")); + [ .items[]? + | select(is_airflow_pvc(.metadata.name)) + | (.metadata.name) as $n + | (.status.phase // "Unknown") as $ph + | (.spec.resources.requests.storage // "?") as $req + | select($ph != "Bound") + | { + "title": ("PVC `" + $n + "` is " + $ph + " in `" + $ns + "`"), + "details": ("Phase: " + $ph + ", requested: " + $req), + "severity": 3, + "next_steps": "Check storage class, provisioner, quota, and events for this PVC." + } + ] +') + +echo "$issues_json" > "$OUTPUT_FILE" + +echo "PVC summary (Airflow-related):" +echo "$pvc_json" | jq -r --arg used "$used_claims" ' + def claim_in_use($name): + (($used | split("|")) | map(select(length > 0)) | index($name)) != null; + [.items[]? 
| select( + claim_in_use(.metadata.name) or + (.metadata.name | test("dags|logs|plugins|airflow|git-sync|persistence"; "i")) + ) + | [.metadata.name, .status.phase, (.spec.resources.requests.storage // "-")] | @tsv + ] | .[]'