diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md index 97ca58b3..1aa4acf9 100644 --- a/DOCUMENTATION.md +++ b/DOCUMENTATION.md @@ -143,6 +143,7 @@ flowchart TD | `step.log` | Logs pipeline data for debugging | pipelinesteps | | `step.publish` | Publishes events to EventBus | pipelinesteps | | `step.event_publish` | Publishes events to EventBus with full envelope control | pipelinesteps | +| `step.event_decrypt` | Decrypts field-level-encrypted CloudEvents produced by step.event_publish | pipelinesteps | | `step.http_call` | Makes outbound HTTP requests | pipelinesteps | | `step.graphql` | Execute GraphQL queries/mutations with data extraction, pagination, batching, APQ | pipelinesteps | | `step.delegate` | Delegates to a named service | pipelinesteps | diff --git a/cmd/wfctl/type_registry.go b/cmd/wfctl/type_registry.go index b14b6f2e..e3e8786f 100644 --- a/cmd/wfctl/type_registry.go +++ b/cmd/wfctl/type_registry.go @@ -582,6 +582,11 @@ func KnownStepTypes() map[string]StepTypeInfo { Plugin: "pipelinesteps", ConfigKeys: []string{"topic", "stream", "broker", "provider", "payload", "data", "headers", "event_type", "source"}, }, + "step.event_decrypt": { + Type: "step.event_decrypt", + Plugin: "pipelinesteps", + ConfigKeys: []string{"key_id"}, + }, "step.http_call": { Type: "step.http_call", Plugin: "pipelinesteps", diff --git a/module/pipeline_step_event_decrypt.go b/module/pipeline_step_event_decrypt.go new file mode 100644 index 00000000..735991e7 --- /dev/null +++ b/module/pipeline_step_event_decrypt.go @@ -0,0 +1,125 @@ +package module + +import ( + "context" + "fmt" + + "github.com/GoCodeAlone/modular" +) + +// EventDecryptStep decrypts field-level encryption applied by step.event_publish. +// It reads the CloudEvents extension attributes ("encrypteddek", "encryptedfields", +// "keyid") from the current pipeline context and decrypts the specified fields +// inside the event's "data" object. +type EventDecryptStep struct { + name string + keyID string // overrides the keyid extension when set + app modular.Application + tmpl *TemplateEngine +} + +// NewEventDecryptStepFactory returns a StepFactory that creates EventDecryptStep instances. +func NewEventDecryptStepFactory() StepFactory { + return func(name string, config map[string]any, app modular.Application) (PipelineStep, error) { + step := &EventDecryptStep{ + name: name, + app: app, + tmpl: NewTemplateEngine(), + } + + // key_id overrides the keyid extension found in the event. + // Supports "${ENV_VAR}" references. + if k, ok := config["key_id"].(string); ok { + step.keyID = k + } + + return step, nil + } +} + +// Name returns the step name. +func (s *EventDecryptStep) Name() string { return s.name } + +// supportedEncryptionAlgorithm is the only algorithm this step can decrypt. +const supportedEncryptionAlgorithm = "AES-256-GCM" + +// Execute decrypts the fields in the incoming CloudEvent. +// +// Expected shape of pc.Current (CloudEvents envelope from step.event_publish): +// +// { +// "specversion": "1.0", // optional +// "type": "...", // optional +// "source": "...", // optional +// "id": "...", // optional +// "time": "...", // optional +// "encryption": "AES-256-GCM", // extension — validated before decryption +// "keyid": "", // extension +// "encrypteddek": "", // extension +// "encryptedfields": "field1,field2", // extension +// "data": { // payload with encrypted fields +// "field1": "", +// "field2": "", +// ... +// } +// } +// +// The step returns the same envelope structure with "data" containing the +// decrypted field values. The encryption extension attributes are preserved. +func (s *EventDecryptStep) Execute(_ context.Context, pc *PipelineContext) (*StepResult, error) { + event := pc.Current + if event == nil { + return &StepResult{Output: map[string]any{"decrypted": false, "reason": "no event data"}}, nil + } + + // Read encryption extension attributes. + encryptedDEKB64, _ := event["encrypteddek"].(string) + encryptedFields, _ := event["encryptedfields"].(string) + keyID, _ := event["keyid"].(string) + algorithm, _ := event["encryption"].(string) + + // Override keyID from step configuration if provided. + if s.keyID != "" { + keyID = s.keyID + } + + // If the event has no encryption metadata, pass through unchanged. + if encryptedDEKB64 == "" || encryptedFields == "" || keyID == "" { + return &StepResult{Output: event}, nil + } + + // Validate the encryption algorithm before attempting decryption. + // Events produced by an unknown scheme should not silently fail or + // produce garbage — return a clear error instead. + if algorithm != "" && algorithm != supportedEncryptionAlgorithm { + return nil, fmt.Errorf("event_decrypt step %q: unsupported encryption algorithm %q (supported: %s)", s.name, algorithm, supportedEncryptionAlgorithm) + } + + // Locate the payload — either under "data" (CloudEvents envelope) or the event itself. + payload, hasData := event["data"].(map[string]any) + if !hasData { + // Treat the whole event as the payload (flat structure without envelope). + payload = event + } + + decrypted, err := decryptEventFields(payload, encryptedDEKB64, encryptedFields, keyID) + if err != nil { + return nil, fmt.Errorf("event_decrypt step %q: %w", s.name, err) + } + + // Rebuild the output envelope, preserving all non-data fields. + output := make(map[string]any, len(event)) + for k, v := range event { + output[k] = v + } + if hasData { + output["data"] = decrypted + } else { + // Merge decrypted fields back into the top-level map. + for k, v := range decrypted { + output[k] = v + } + } + + return &StepResult{Output: output}, nil +} diff --git a/module/pipeline_step_event_decrypt_test.go b/module/pipeline_step_event_decrypt_test.go new file mode 100644 index 00000000..1ed875ef --- /dev/null +++ b/module/pipeline_step_event_decrypt_test.go @@ -0,0 +1,743 @@ +package module + +import ( + "context" + "encoding/json" + "os" + "strings" + "testing" +) + +// ---- encryption helper tests ------------------------------------------------ + +func TestResolveEncryptionKey_LiteralRejected(t *testing.T) { + // Literal key strings must now be rejected — only env-var references are accepted. + _, err := resolveEncryptionKey("mysecretkey") + if err == nil { + t.Fatal("expected error for literal key_id; only env-var references are allowed") + } + if !strings.Contains(err.Error(), "environment variable reference") { + t.Errorf("expected message about env-var reference, got: %v", err) + } +} + +func TestResolveEncryptionKey_EnvVar(t *testing.T) { + t.Setenv("TEST_ENC_KEY", "env-secret") + key, err := resolveEncryptionKey("$TEST_ENC_KEY") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(key) != 32 { + t.Errorf("expected 32-byte key, got %d bytes", len(key)) + } +} + +func TestResolveEncryptionKey_BraceEnvVar(t *testing.T) { + t.Setenv("TEST_BRACE_KEY", "brace-secret") + key, err := resolveEncryptionKey("${TEST_BRACE_KEY}") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(key) != 32 { + t.Errorf("expected 32-byte key, got %d bytes", len(key)) + } +} + +func TestResolveEncryptionKey_EmptyEnvVar(t *testing.T) { + os.Unsetenv("TEST_MISSING_KEY") + _, err := resolveEncryptionKey("$TEST_MISSING_KEY") + if err == nil { + t.Fatal("expected error for empty resolved key") + } +} + +func TestResolveEncryptionKey_EmptyLiteral(t *testing.T) { + _, err := resolveEncryptionKey("") + if err == nil { + t.Fatal("expected error for empty key") + } +} + +func TestEncryptDecryptFieldRoundTrip(t *testing.T) { + dek, err := generateDEK() + if err != nil { + t.Fatalf("generateDEK: %v", err) + } + plain := "sensitive-phone-number" + enc, err := encryptFieldWithDEK(dek, plain) + if err != nil { + t.Fatalf("encrypt: %v", err) + } + if enc == plain { + t.Fatal("encrypted value should differ from plaintext") + } + got, err := decryptFieldWithDEK(dek, enc) + if err != nil { + t.Fatalf("decrypt: %v", err) + } + if got != plain { + t.Errorf("round-trip failed: got %q, want %q", got, plain) + } +} + +func TestApplyEventFieldEncryption(t *testing.T) { + t.Setenv("TEST_MASTER_KEY", "test-master-key-value") + + cfg := &EventEncryptionConfig{ + Provider: "aes", + KeyID: "${TEST_MASTER_KEY}", + Fields: []string{"phone", "message_body"}, + Algorithm: "AES-256-GCM", + } + + payload := map[string]any{ + "phone": "+15551234567", + "message_body": "I need help", + "safe_field": "not encrypted", + } + + encrypted, meta, err := applyEventFieldEncryption(payload, cfg) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Encrypted fields should differ from originals. + if encrypted["phone"] == payload["phone"] { + t.Error("phone should be encrypted") + } + if encrypted["message_body"] == payload["message_body"] { + t.Error("message_body should be encrypted") + } + // Non-targeted fields should be unchanged. + if encrypted["safe_field"] != "not encrypted" { + t.Errorf("safe_field should be unchanged, got %v", encrypted["safe_field"]) + } + // Original payload must not be mutated. + if payload["phone"] != "+15551234567" { + t.Error("original payload should not be mutated") + } + + // Metadata checks. + if meta.Algorithm != "AES-256-GCM" { + t.Errorf("expected algorithm AES-256-GCM, got %v", meta.Algorithm) + } + if meta.KeyID != "${TEST_MASTER_KEY}" { + t.Errorf("expected keyID ${TEST_MASTER_KEY}, got %v", meta.KeyID) + } + if meta.EncryptedDEK == "" { + t.Error("expected non-empty EncryptedDEK") + } + if len(meta.EncryptedFields) != 2 { + t.Errorf("expected 2 encrypted fields, got %d", len(meta.EncryptedFields)) + } +} + +func TestDecryptEventFields(t *testing.T) { + t.Setenv("TEST_ROUND_TRIP_KEY", "round-trip-secret-value") + + cfg := &EventEncryptionConfig{ + Provider: "aes", + KeyID: "${TEST_ROUND_TRIP_KEY}", + Fields: []string{"phone", "message_body"}, + Algorithm: "AES-256-GCM", + } + original := map[string]any{ + "phone": "+15559876543", + "message_body": "Help me please", + "other": "untouched", + } + + encrypted, meta, err := applyEventFieldEncryption(original, cfg) + if err != nil { + t.Fatalf("encrypt: %v", err) + } + + decrypted, err := decryptEventFields(encrypted, meta.EncryptedDEK, strings.Join(meta.EncryptedFields, ","), meta.KeyID) + if err != nil { + t.Fatalf("decrypt: %v", err) + } + + if decrypted["phone"] != original["phone"] { + t.Errorf("phone: got %v, want %v", decrypted["phone"], original["phone"]) + } + if decrypted["message_body"] != original["message_body"] { + t.Errorf("message_body: got %v, want %v", decrypted["message_body"], original["message_body"]) + } + if decrypted["other"] != "untouched" { + t.Errorf("other should be untouched, got %v", decrypted["other"]) + } +} + +// ---- step.event_publish with encryption ------------------------------------- + +func TestEventPublishStep_EncryptionConfig_CloudEventsEnvelope(t *testing.T) { + t.Setenv("PUB_ENC_TEST_KEY", "test-key-secret-value") + + broker := newMockBroker() + app := mockAppWithBroker("bus", broker) + + factory := NewEventPublishStepFactory() + step, err := factory("pub-enc", map[string]any{ + "topic": "sensitive.events", + "broker": "bus", + "event_type": "user.contact", + "source": "/api/users", + "payload": map[string]any{ + "phone": "+15551234567", + "message": "please help", + "id": "user-1", + }, + "encryption": map[string]any{ + "provider": "aes", + "key_id": "${PUB_ENC_TEST_KEY}", + "fields": []any{"phone", "message"}, + "algorithm": "AES-256-GCM", + }, + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + if result.Output["published"] != true { + t.Errorf("expected published=true") + } + + var envelope map[string]any + if err := json.Unmarshal(broker.producer.published[0].message, &envelope); err != nil { + t.Fatalf("failed to unmarshal published message: %v", err) + } + + // CloudEvents base attributes. + if envelope["specversion"] != "1.0" { + t.Errorf("expected specversion=1.0, got %v", envelope["specversion"]) + } + if envelope["type"] != "user.contact" { + t.Errorf("expected type=user.contact, got %v", envelope["type"]) + } + + // Encryption extensions. + if envelope["encryption"] != "AES-256-GCM" { + t.Errorf("expected encryption=AES-256-GCM, got %v", envelope["encryption"]) + } + // keyid stores the original key_id config string (env-var reference). + if envelope["keyid"] != "${PUB_ENC_TEST_KEY}" { + t.Errorf("expected keyid=${PUB_ENC_TEST_KEY}, got %v", envelope["keyid"]) + } + if envelope["encrypteddek"] == "" { + t.Error("expected non-empty encrypteddek") + } + encryptedFields, _ := envelope["encryptedfields"].(string) + if !strings.Contains(encryptedFields, "phone") || !strings.Contains(encryptedFields, "message") { + t.Errorf("expected encryptedfields to contain phone,message; got %q", encryptedFields) + } + + // Data fields should be encrypted (not equal to original values). + data, ok := envelope["data"].(map[string]any) + if !ok { + t.Fatal("expected data field in envelope") + } + if data["phone"] == "+15551234567" { + t.Error("phone should be encrypted in published envelope") + } + if data["phone"] == nil { + t.Error("phone field missing from published data; encryption may not have run") + } + if data["message"] == "please help" { + t.Error("message should be encrypted in published envelope") + } + // Non-encrypted field stays unchanged. + if data["id"] != "user-1" { + t.Errorf("id should be unchanged, got %v", data["id"]) + } +} + +func TestEventPublishStep_EncryptionConfig_EnvVarKey(t *testing.T) { + t.Setenv("MY_EVENT_KEY", "runtime-secret-key") + + broker := newMockBroker() + app := mockAppWithBroker("bus", broker) + + factory := NewEventPublishStepFactory() + step, err := factory("pub-enc-env", map[string]any{ + "topic": "events", + "broker": "bus", + "payload": map[string]any{ + "phone": "+15550000001", + }, + "encryption": map[string]any{ + "key_id": "${MY_EVENT_KEY}", + "fields": []any{"phone"}, + }, + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + _, err = step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + var envelope map[string]any + if err := json.Unmarshal(broker.producer.published[0].message, &envelope); err != nil { + t.Fatalf("unmarshal: %v", err) + } + // When encryption is enabled, buildEventEnvelope always wraps under "data". + data, ok := envelope["data"].(map[string]any) + if !ok { + t.Fatalf("expected envelope with data field; phone encrypted events always produce a wrapper") + } + if data["phone"] == nil { + t.Error("phone field missing from data; encryption may not have run") + } + if data["phone"] == "+15550000001" { + t.Error("phone should be encrypted, not plaintext") + } +} + +func TestEventPublishStep_NoEncryption_Unchanged(t *testing.T) { + // Verify that when no encryption config is set, behaviour is identical to before. + broker := newMockBroker() + app := mockAppWithBroker("bus", broker) + + factory := NewEventPublishStepFactory() + step, err := factory("pub-no-enc", map[string]any{ + "topic": "events", + "broker": "bus", + "payload": map[string]any{ + "phone": "+15559999999", + }, + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + _, err = step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + var payload map[string]any + if err := json.Unmarshal(broker.producer.published[0].message, &payload); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if payload["phone"] != "+15559999999" { + t.Errorf("expected phone unchanged, got %v", payload["phone"]) + } +} + +func TestEventPublishStep_EncryptionConfigMissingKey_Ignored(t *testing.T) { + // Encryption config without key_id should be silently ignored (not configured). + broker := newMockBroker() + app := mockAppWithBroker("bus", broker) + + factory := NewEventPublishStepFactory() + step, err := factory("pub-no-keyid", map[string]any{ + "topic": "events", + "broker": "bus", + "payload": map[string]any{ + "phone": "+15558888888", + }, + "encryption": map[string]any{ + "fields": []any{"phone"}, + // key_id intentionally missing + }, + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + _, err = step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + var payload map[string]any + if err := json.Unmarshal(broker.producer.published[0].message, &payload); err != nil { + t.Fatalf("unmarshal: %v", err) + } + // Without encryption, phone should be plaintext. + if payload["phone"] != "+15558888888" { + t.Errorf("expected plaintext phone, got %v", payload["phone"]) + } +} + +func TestEventPublishStep_UnsupportedProvider_Error(t *testing.T) { + factory := NewEventPublishStepFactory() + _, err := factory("pub-bad-provider", map[string]any{ + "topic": "events", + "broker": "bus", + "encryption": map[string]any{ + "provider": "kms", // unsupported + "key_id": "${SOME_KEY}", + "fields": []any{"phone"}, + }, + }, NewMockApplication()) + if err == nil { + t.Fatal("expected error for unsupported provider") + } + if !strings.Contains(err.Error(), "unsupported encryption provider") { + t.Errorf("expected 'unsupported encryption provider' error, got: %v", err) + } +} + +func TestEventPublishStep_UnsupportedAlgorithm_Error(t *testing.T) { + factory := NewEventPublishStepFactory() + _, err := factory("pub-bad-algo", map[string]any{ + "topic": "events", + "broker": "bus", + "encryption": map[string]any{ + "key_id": "${SOME_KEY}", + "fields": []any{"phone"}, + "algorithm": "ChaCha20-Poly1305", // unsupported + }, + }, NewMockApplication()) + if err == nil { + t.Fatal("expected error for unsupported algorithm") + } + if !strings.Contains(err.Error(), "unsupported encryption algorithm") { + t.Errorf("expected 'unsupported encryption algorithm' error, got: %v", err) + } +} + +func TestEventPublishStep_EncryptionWithoutBroker_Error(t *testing.T) { + // Encryption requires a broker so the full envelope (with metadata) is published. + factory := NewEventPublishStepFactory() + _, err := factory("pub-enc-no-broker", map[string]any{ + "topic": "events", + // no broker configured + "encryption": map[string]any{ + "key_id": "${SOME_KEY}", + "fields": []any{"phone"}, + }, + }, NewMockApplication()) + if err == nil { + t.Fatal("expected error when encryption is configured without a broker") + } + if !strings.Contains(err.Error(), "'broker' or 'provider' is required when encryption") { + t.Errorf("expected broker-required error, got: %v", err) + } +} + +// ---- step.event_decrypt ----------------------------------------------------- + +func TestEventDecryptStep_RoundTrip(t *testing.T) { + t.Setenv("DECRYPT_STEP_KEY", "decrypt-step-secret-value") + + // Simulate the publish side: encrypt an event. + cfg := &EventEncryptionConfig{ + Provider: "aes", + KeyID: "${DECRYPT_STEP_KEY}", + Fields: []string{"phone", "message_body"}, + Algorithm: "AES-256-GCM", + } + originalData := map[string]any{ + "phone": "+15551234567", + "message_body": "I need help", + "id": "conv-1", + } + encData, meta, err := applyEventFieldEncryption(originalData, cfg) + if err != nil { + t.Fatalf("encrypt: %v", err) + } + + // Build a CloudEvents envelope as published by step.event_publish. + event := map[string]any{ + "specversion": "1.0", + "type": "test.event", + "source": "/test", + "id": "evt-1", + "time": "2026-01-01T00:00:00Z", + "encryption": meta.Algorithm, + "keyid": meta.KeyID, + "encrypteddek": meta.EncryptedDEK, + "encryptedfields": strings.Join(meta.EncryptedFields, ","), + "data": encData, + } + + // Now decrypt with step.event_decrypt. + factory := NewEventDecryptStepFactory() + step, err := factory("decrypt-step", map[string]any{}, NewMockApplication()) + if err != nil { + t.Fatalf("factory: %v", err) + } + + pc := NewPipelineContext(event, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute: %v", err) + } + + output := result.Output + data, ok := output["data"].(map[string]any) + if !ok { + t.Fatalf("expected data field in output, got %T", output["data"]) + } + + if data["phone"] != "+15551234567" { + t.Errorf("phone: got %v, want +15551234567", data["phone"]) + } + if data["message_body"] != "I need help" { + t.Errorf("message_body: got %v, want 'I need help'", data["message_body"]) + } + if data["id"] != "conv-1" { + t.Errorf("id: got %v, want conv-1", data["id"]) + } + + // CloudEvents attributes should be preserved. + if output["specversion"] != "1.0" { + t.Errorf("specversion should be preserved, got %v", output["specversion"]) + } + if output["type"] != "test.event" { + t.Errorf("type should be preserved, got %v", output["type"]) + } +} + +func TestEventDecryptStep_KeyIDOverride(t *testing.T) { + t.Setenv("OVERRIDE_KEY", "override-master-key-secret") + + // Encrypt using the env-var key. + cfg := &EventEncryptionConfig{ + Provider: "aes", + KeyID: "${OVERRIDE_KEY}", + Fields: []string{"phone"}, + Algorithm: "AES-256-GCM", + } + encData, meta, err := applyEventFieldEncryption(map[string]any{"phone": "+15550001111"}, cfg) + if err != nil { + t.Fatalf("encrypt: %v", err) + } + + // Simulate an event where the keyid extension has been tampered/replaced with + // a wrong value — the step config key_id should override it. + event := map[string]any{ + "encryption": meta.Algorithm, + "keyid": "$SOME_OTHER_UNSET_VAR", // wrong key in event + "encrypteddek": meta.EncryptedDEK, + "encryptedfields": strings.Join(meta.EncryptedFields, ","), + "data": encData, + } + + factory := NewEventDecryptStepFactory() + // The step uses key_id="${OVERRIDE_KEY}" which resolves to the correct value. + step, err := factory("decrypt-override", map[string]any{ + "key_id": "${OVERRIDE_KEY}", + }, NewMockApplication()) + if err != nil { + t.Fatalf("factory: %v", err) + } + + pc := NewPipelineContext(event, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute: %v", err) + } + + data := result.Output["data"].(map[string]any) + if data["phone"] != "+15550001111" { + t.Errorf("phone: got %v, want +15550001111", data["phone"]) + } +} + +func TestEventDecryptStep_NoEncryptionMetadata_Passthrough(t *testing.T) { + // An event without encryption metadata should be passed through unchanged. + event := map[string]any{ + "specversion": "1.0", + "type": "plain.event", + "data": map[string]any{ + "foo": "bar", + }, + } + + factory := NewEventDecryptStepFactory() + step, err := factory("decrypt-passthrough", map[string]any{}, NewMockApplication()) + if err != nil { + t.Fatalf("factory: %v", err) + } + + pc := NewPipelineContext(event, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute: %v", err) + } + + data, ok := result.Output["data"].(map[string]any) + if !ok { + t.Fatal("expected data field in output") + } + if data["foo"] != "bar" { + t.Errorf("expected foo=bar, got %v", data["foo"]) + } +} + +func TestEventDecryptStep_NilData_Passthrough(t *testing.T) { + factory := NewEventDecryptStepFactory() + step, err := factory("decrypt-nil", map[string]any{}, NewMockApplication()) + if err != nil { + t.Fatalf("factory: %v", err) + } + + // NewPipelineContext always creates a non-nil Current map, even with nil trigger data. + pc := NewPipelineContext(nil, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute: %v", err) + } + // No encryption metadata → passthrough of the empty map. + if result.Output == nil { + t.Error("expected non-nil output for empty event") + } +} + +func TestEventDecryptStep_WrongKey_Error(t *testing.T) { + t.Setenv("CORRECT_ENC_KEY", "correct-secret-value-abc") + t.Setenv("WRONG_ENC_KEY", "completely-different-value-xyz") + + cfg := &EventEncryptionConfig{ + Provider: "aes", + KeyID: "${CORRECT_ENC_KEY}", + Fields: []string{"phone"}, + Algorithm: "AES-256-GCM", + } + encData, meta, err := applyEventFieldEncryption(map[string]any{"phone": "+15550002222"}, cfg) + if err != nil { + t.Fatalf("encrypt: %v", err) + } + + // The event's keyid extension points to a different key than was used for encryption. + event := map[string]any{ + "encryption": meta.Algorithm, + "keyid": "${WRONG_ENC_KEY}", // different key → DEK unwrap must fail + "encrypteddek": meta.EncryptedDEK, + "encryptedfields": strings.Join(meta.EncryptedFields, ","), + "data": encData, + } + + factory := NewEventDecryptStepFactory() + step, err := factory("decrypt-wrong-key", map[string]any{}, NewMockApplication()) + if err != nil { + t.Fatalf("factory: %v", err) + } + + pc := NewPipelineContext(event, nil) + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error when using wrong key") + } + if !strings.Contains(err.Error(), "unwrap DEK") { + t.Errorf("expected 'unwrap DEK' error, got: %v", err) + } +} + +func TestEventDecryptStep_UnsupportedAlgorithm_Error(t *testing.T) { + // An event with an unknown encryption algorithm should return a clear error. + t.Setenv("SOME_TEST_KEY", "some-test-secret") + + event := map[string]any{ + "encryption": "ChaCha20-Poly1305", // unsupported + "keyid": "${SOME_TEST_KEY}", + "encrypteddek": "dGVzdA==", + "encryptedfields": "phone", + "data": map[string]any{ + "phone": "some-value", + }, + } + + factory := NewEventDecryptStepFactory() + step, err := factory("decrypt-bad-algo", map[string]any{}, NewMockApplication()) + if err != nil { + t.Fatalf("factory: %v", err) + } + + pc := NewPipelineContext(event, nil) + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error for unsupported algorithm") + } + if !strings.Contains(err.Error(), "unsupported encryption algorithm") { + t.Errorf("expected 'unsupported encryption algorithm' error, got: %v", err) + } +} + +// TestEventPublishAndDecrypt_FullPipeline tests the full publish→decrypt round trip +// using both steps together. +func TestEventPublishAndDecrypt_FullPipeline(t *testing.T) { + t.Setenv("PIPELINE_ENC_KEY", "full-pipeline-integration-secret") + + broker := newMockBroker() + app := mockAppWithBroker("bus", broker) + + // Publish step with encryption. + publishFactory := NewEventPublishStepFactory() + publishStep, err := publishFactory("publish", map[string]any{ + "topic": "healthcare.events", + "broker": "bus", + "event_type": "patient.contact", + "source": "/api/healthcare", + "payload": map[string]any{ + "phone": "+15559990000", + "responder_name": "Dr. Smith", + "case_id": "case-42", + }, + "encryption": map[string]any{ + "provider": "aes", + "key_id": "${PIPELINE_ENC_KEY}", + "fields": []any{"phone", "responder_name"}, + "algorithm": "AES-256-GCM", + }, + }, app) + if err != nil { + t.Fatalf("publish factory: %v", err) + } + + publishCtx := NewPipelineContext(nil, nil) + _, err = publishStep.Execute(context.Background(), publishCtx) + if err != nil { + t.Fatalf("publish: %v", err) + } + + // Retrieve the published message from the broker. + if len(broker.producer.published) != 1 { + t.Fatalf("expected 1 published message, got %d", len(broker.producer.published)) + } + var publishedEnvelope map[string]any + if err := json.Unmarshal(broker.producer.published[0].message, &publishedEnvelope); err != nil { + t.Fatalf("unmarshal: %v", err) + } + + // Simulate consumer receiving the message — decrypt step. + decryptFactory := NewEventDecryptStepFactory() + decryptStep, err := decryptFactory("decrypt", map[string]any{}, app) + if err != nil { + t.Fatalf("decrypt factory: %v", err) + } + + decryptCtx := NewPipelineContext(publishedEnvelope, nil) + decryptResult, err := decryptStep.Execute(context.Background(), decryptCtx) + if err != nil { + t.Fatalf("decrypt: %v", err) + } + + data, ok := decryptResult.Output["data"].(map[string]any) + if !ok { + t.Fatalf("expected data field in decrypt output, got type %T", decryptResult.Output["data"]) + } + + if data["phone"] != "+15559990000" { + t.Errorf("phone: got %v, want +15559990000", data["phone"]) + } + if data["responder_name"] != "Dr. Smith" { + t.Errorf("responder_name: got %v, want 'Dr. Smith'", data["responder_name"]) + } + if data["case_id"] != "case-42" { + t.Errorf("case_id should be unencrypted, got %v", data["case_id"]) + } +} diff --git a/module/pipeline_step_event_encryption.go b/module/pipeline_step_event_encryption.go new file mode 100644 index 00000000..38146b24 --- /dev/null +++ b/module/pipeline_step_event_encryption.go @@ -0,0 +1,233 @@ +package module + +import ( + "crypto/aes" + "crypto/cipher" + "crypto/rand" + "encoding/base64" + "fmt" + "io" + "maps" + "os" + "strings" +) + +// eventEncryptionMeta holds the per-event encryption metadata that is stored +// as CloudEvents extension attributes alongside the encrypted payload. +type eventEncryptionMeta struct { + // Algorithm is the encryption algorithm used (e.g. "AES-256-GCM"). + Algorithm string + // KeyID is the key identifier used to wrap the DEK. + KeyID string + // EncryptedDEK is the data encryption key (DEK) encrypted with the master key, + // base64-encoded. + EncryptedDEK string + // EncryptedFields is the list of field names that were encrypted. + EncryptedFields []string +} + +// resolveEncryptionKey resolves the master encryption key from the KeyID. +// Only env-var references are accepted ($VAR or ${VAR}) to prevent operators +// from accidentally treating a non-secret identifier (e.g. a KMS ARN) as key +// material. Literal key strings are rejected with a descriptive error. +func resolveEncryptionKey(keyID string) ([]byte, error) { + var raw string + + switch { + case strings.HasPrefix(keyID, "${") && strings.HasSuffix(keyID, "}"): + raw = os.Getenv(keyID[2 : len(keyID)-1]) + case strings.HasPrefix(keyID, "$"): + raw = os.Getenv(keyID[1:]) + default: + return nil, fmt.Errorf("key_id %q must be an environment variable reference (use $VAR or ${VAR})", keyID) + } + + if raw == "" { + return nil, fmt.Errorf("encryption key is empty (key_id=%q resolved to empty string)", keyID) + } + return deriveAES256Key(raw), nil +} + +// deriveAES256Key derives a 32-byte AES-256 key from an arbitrary string by +// taking the SHA-256 hash — matching the behaviour of NewFieldEncryptor. +func deriveAES256Key(raw string) []byte { + // Reuse NewFieldEncryptor to get the same SHA-256 derivation. + enc := NewFieldEncryptor(raw) + return enc.key +} + +// generateDEK generates a random 32-byte Data Encryption Key. +func generateDEK() ([]byte, error) { + dek := make([]byte, 32) + if _, err := io.ReadFull(rand.Reader, dek); err != nil { + return nil, fmt.Errorf("failed to generate DEK: %w", err) + } + return dek, nil +} + +// aesGCMEncryptBytes encrypts plaintext bytes using AES-256-GCM with the given key. +// Returns nonce || ciphertext. +func aesGCMEncryptBytes(key, plaintext []byte) ([]byte, error) { + block, err := aes.NewCipher(key) + if err != nil { + return nil, fmt.Errorf("failed to create AES cipher: %w", err) + } + aead, err := cipher.NewGCM(block) + if err != nil { + return nil, fmt.Errorf("failed to create GCM: %w", err) + } + nonce := make([]byte, aead.NonceSize()) + if _, err := io.ReadFull(rand.Reader, nonce); err != nil { + return nil, fmt.Errorf("failed to generate nonce: %w", err) + } + ciphertext := aead.Seal(nonce, nonce, plaintext, nil) + return ciphertext, nil +} + +// aesGCMDecryptBytes decrypts nonce || ciphertext using AES-256-GCM with the given key. +func aesGCMDecryptBytes(key, data []byte) ([]byte, error) { + block, err := aes.NewCipher(key) + if err != nil { + return nil, fmt.Errorf("failed to create AES cipher: %w", err) + } + aead, err := cipher.NewGCM(block) + if err != nil { + return nil, fmt.Errorf("failed to create GCM: %w", err) + } + nonceSize := aead.NonceSize() + if len(data) < nonceSize { + return nil, fmt.Errorf("ciphertext too short") + } + nonce, ciphertext := data[:nonceSize], data[nonceSize:] + plaintext, err := aead.Open(nil, nonce, ciphertext, nil) + if err != nil { + return nil, fmt.Errorf("AES-GCM decryption failed: %w", err) + } + return plaintext, nil +} + +// encryptFieldWithDEK encrypts a string field value using the DEK. +// Returns the base64-encoded ciphertext. +func encryptFieldWithDEK(dek []byte, plaintext string) (string, error) { + ciphertext, err := aesGCMEncryptBytes(dek, []byte(plaintext)) + if err != nil { + return "", err + } + return base64.StdEncoding.EncodeToString(ciphertext), nil +} + +// decryptFieldWithDEK decrypts a base64-encoded field value using the DEK. +func decryptFieldWithDEK(dek []byte, encoded string) (string, error) { + ciphertext, err := base64.StdEncoding.DecodeString(encoded) + if err != nil { + return "", fmt.Errorf("base64 decode failed: %w", err) + } + plaintext, err := aesGCMDecryptBytes(dek, ciphertext) + if err != nil { + return "", err + } + return string(plaintext), nil +} + +// applyEventFieldEncryption encrypts the configured fields in the payload using a +// per-event DEK. The DEK is wrapped with the master key derived from KeyID. +// Returns the modified payload and the encryption metadata to attach to the event envelope. +func applyEventFieldEncryption(payload map[string]any, cfg *EventEncryptionConfig) (map[string]any, *eventEncryptionMeta, error) { + masterKey, err := resolveEncryptionKey(cfg.KeyID) + if err != nil { + return nil, nil, err + } + + dek, err := generateDEK() + if err != nil { + return nil, nil, err + } + + // Wrap the DEK with the master key. + wrappedDEK, err := aesGCMEncryptBytes(masterKey, dek) + if err != nil { + return nil, nil, fmt.Errorf("failed to wrap DEK: %w", err) + } + + // Copy the payload so we don't mutate the caller's map. + result := make(map[string]any, len(payload)) + maps.Copy(result, payload) + + var encrypted []string + for _, field := range cfg.Fields { + val, ok := result[field] + if !ok { + continue + } + strVal, ok := val.(string) + if !ok { + continue + } + enc, encErr := encryptFieldWithDEK(dek, strVal) + if encErr != nil { + return nil, nil, fmt.Errorf("failed to encrypt field %q: %w", field, encErr) + } + result[field] = enc + encrypted = append(encrypted, field) + } + + meta := &eventEncryptionMeta{ + Algorithm: cfg.Algorithm, + KeyID: cfg.KeyID, + EncryptedDEK: base64.StdEncoding.EncodeToString(wrappedDEK), + EncryptedFields: encrypted, + } + return result, meta, nil +} + +// decryptEventFields decrypts fields in an event payload using the wrapped DEK stored +// in the CloudEvents extension attributes. +// +// Parameters: +// - payload: the (possibly nested) data map containing encrypted field values. +// - encryptedDEKB64: base64-encoded wrapped DEK from the "encrypteddek" extension. +// - encryptedFields: comma-separated field names from the "encryptedfields" extension. +// - keyID: master key identifier from the "keyid" extension. +// +// Returns a copy of payload with the specified fields decrypted. +func decryptEventFields(payload map[string]any, encryptedDEKB64, encryptedFields, keyID string) (map[string]any, error) { + masterKey, err := resolveEncryptionKey(keyID) + if err != nil { + return nil, err + } + + wrappedDEK, err := base64.StdEncoding.DecodeString(encryptedDEKB64) + if err != nil { + return nil, fmt.Errorf("failed to base64-decode encrypteddek: %w", err) + } + + dek, err := aesGCMDecryptBytes(masterKey, wrappedDEK) + if err != nil { + return nil, fmt.Errorf("failed to unwrap DEK: %w", err) + } + + result := make(map[string]any, len(payload)) + maps.Copy(result, payload) + + for _, field := range strings.Split(encryptedFields, ",") { + field = strings.TrimSpace(field) + if field == "" { + continue + } + val, ok := result[field] + if !ok { + continue + } + strVal, ok := val.(string) + if !ok { + continue + } + plain, decErr := decryptFieldWithDEK(dek, strVal) + if decErr != nil { + return nil, fmt.Errorf("failed to decrypt field %q: %w", field, decErr) + } + result[field] = plain + } + + return result, nil +} diff --git a/module/pipeline_step_event_publish.go b/module/pipeline_step_event_publish.go index 5fb6e00f..2650e7e9 100644 --- a/module/pipeline_step_event_publish.go +++ b/module/pipeline_step_event_publish.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "strings" "time" "github.com/GoCodeAlone/modular" @@ -11,19 +12,32 @@ import ( "github.com/google/uuid" ) +// EventEncryptionConfig holds the field-level encryption configuration for event publishing. +type EventEncryptionConfig struct { + // Provider is the encryption provider: "aes" or "envelope" (default: "aes"). + Provider string + // KeyID is the encryption key identifier. Supports "${ENV_VAR}" env-var references. + KeyID string + // Fields lists the payload field names to encrypt. + Fields []string + // Algorithm is the encryption algorithm (currently only "AES-256-GCM" is supported). + Algorithm string +} + // EventPublishStep publishes events to a messaging broker, EventPublisher, or EventBus // from pipeline execution. It supports CloudEvents envelope format and multiple // provider backends including external plugins (e.g., Bento). type EventPublishStep struct { - name string - topic string - payload map[string]any - headers map[string]string - eventType string - source string - broker string // service name for a MessageBroker or EventPublisher - app modular.Application - tmpl *TemplateEngine + name string + topic string + payload map[string]any + headers map[string]string + eventType string + source string + broker string // service name for a MessageBroker or EventPublisher + app modular.Application + tmpl *TemplateEngine + encryption *EventEncryptionConfig } // NewEventPublishStepFactory returns a StepFactory that creates EventPublishStep instances. @@ -70,6 +84,47 @@ func NewEventPublishStepFactory() StepFactory { step.broker, _ = config["provider"].(string) } + // Parse optional encryption config block. + if encCfg, ok := config["encryption"].(map[string]any); ok { + enc := &EventEncryptionConfig{ + Provider: "aes", + Algorithm: "AES-256-GCM", + } + if p, ok := encCfg["provider"].(string); ok && p != "" { + if p != "aes" && p != "envelope" { + return nil, fmt.Errorf("event_publish step %q: unsupported encryption provider %q (supported: aes, envelope)", name, p) + } + enc.Provider = p + } + if k, ok := encCfg["key_id"].(string); ok { + enc.KeyID = k + } + if a, ok := encCfg["algorithm"].(string); ok && a != "" { + if a != "AES-256-GCM" { + return nil, fmt.Errorf("event_publish step %q: unsupported encryption algorithm %q (supported: AES-256-GCM)", name, a) + } + enc.Algorithm = a + } + if fields, ok := encCfg["fields"].([]any); ok { + for _, f := range fields { + if fs, ok := f.(string); ok && fs != "" { + enc.Fields = append(enc.Fields, fs) + } + } + } else if fields, ok := encCfg["fields"].([]string); ok { + enc.Fields = fields + } + if len(enc.Fields) > 0 && enc.KeyID != "" { + // Encryption requires a broker/provider so that the full envelope + // (including encryption metadata) is published. The EventBus path + // does not carry the envelope's extension attributes. + if step.broker == "" { + return nil, fmt.Errorf("event_publish step %q: 'broker' or 'provider' is required when encryption is configured", name) + } + step.encryption = enc + } + } + return step, nil } } @@ -113,8 +168,17 @@ func (s *EventPublishStep) Execute(ctx context.Context, pc *PipelineContext) (*S } } + // Apply field-level encryption if configured. + var encMeta *eventEncryptionMeta + if s.encryption != nil { + resolvedPayload, encMeta, err = applyEventFieldEncryption(resolvedPayload, s.encryption) + if err != nil { + return nil, fmt.Errorf("event_publish step %q: encryption failed: %w", s.name, err) + } + } + // Build event envelope for broker/EventPublisher paths - event := s.buildEventEnvelope(resolvedPayload, resolvedHeaders, resolvedSource) + event := s.buildEventEnvelope(resolvedPayload, resolvedHeaders, resolvedSource, encMeta) if s.broker != "" { // Try EventPublisher interface first (supports external plugins like Bento) @@ -149,8 +213,9 @@ func (s *EventPublishStep) tryGetEventPublisher() (pub EventPublisher) { // envelope is emitted with specversion, type, source, id, time, and data fields. // When only headers are provided (without event_type/source), the payload is // wrapped as {data, headers} without adding CloudEvents-required attributes. -func (s *EventPublishStep) buildEventEnvelope(payload map[string]any, headers map[string]string, resolvedSource string) map[string]any { - if s.eventType == "" && resolvedSource == "" && len(headers) == 0 { +// Encryption metadata (if present) is added as CloudEvents extension attributes. +func (s *EventPublishStep) buildEventEnvelope(payload map[string]any, headers map[string]string, resolvedSource string, encMeta *eventEncryptionMeta) map[string]any { + if s.eventType == "" && resolvedSource == "" && len(headers) == 0 && encMeta == nil { return payload } envelope := map[string]any{ @@ -167,6 +232,13 @@ func (s *EventPublishStep) buildEventEnvelope(payload map[string]any, headers ma if len(headers) > 0 { envelope["headers"] = headers } + // Embed encryption metadata as CloudEvents extension attributes. + if encMeta != nil { + envelope["encryption"] = encMeta.Algorithm + envelope["keyid"] = encMeta.KeyID + envelope["encrypteddek"] = encMeta.EncryptedDEK + envelope["encryptedfields"] = strings.Join(encMeta.EncryptedFields, ",") + } return envelope } diff --git a/plugins/pipelinesteps/plugin.go b/plugins/pipelinesteps/plugin.go index fd729b96..ad40e47b 100644 --- a/plugins/pipelinesteps/plugin.go +++ b/plugins/pipelinesteps/plugin.go @@ -100,7 +100,8 @@ func New() *Plugin { "step.cli_print", "step.cli_invoke", "step.parallel", - "step.graphql", + "step.graphql", + "step.event_decrypt", }, WorkflowTypes: []string{"pipeline"}, OverridableTypes: []string{"step.authz_check"}, @@ -185,7 +186,8 @@ func (p *Plugin) StepFactories() map[string]plugin.StepFactory { "step.parallel": wrapStepFactory(module.NewParallelStepFactory(func() *module.StepRegistry { return p.concreteStepRegistry })), - "step.graphql": wrapStepFactory(module.NewGraphQLStepFactory()), + "step.graphql": wrapStepFactory(module.NewGraphQLStepFactory()), + "step.event_decrypt": wrapStepFactory(module.NewEventDecryptStepFactory()), } } diff --git a/plugins/pipelinesteps/plugin_test.go b/plugins/pipelinesteps/plugin_test.go index bad6fee3..87d5204a 100644 --- a/plugins/pipelinesteps/plugin_test.go +++ b/plugins/pipelinesteps/plugin_test.go @@ -40,6 +40,7 @@ func TestStepFactories(t *testing.T) { "step.jq", "step.publish", "step.event_publish", + "step.event_decrypt", "step.http_call", "step.request_parse", "step.db_query",