Skip to content

Commit 89e6f9e

Browse files
Copilotintel352
andauthored
feat: field-level envelope encryption for step.event_publish + step.event_decrypt (#313)
* Initial plan * feat: add field-level encryption to step.event_publish and step.event_decrypt Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> * fix: address PR review - env-var only keys, validate provider/algo, broker required, fix test assertion, decrypt algorithm validation Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> * fix: add step.event_decrypt to plugin test list, documentation, and wfctl type registry Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> Co-authored-by: Jonathan Langevin <codingsloth@pm.me>
1 parent 159a3d4 commit 89e6f9e

8 files changed

Lines changed: 1196 additions & 14 deletions

DOCUMENTATION.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ flowchart TD
143143
| `step.log` | Logs pipeline data for debugging | pipelinesteps |
144144
| `step.publish` | Publishes events to EventBus | pipelinesteps |
145145
| `step.event_publish` | Publishes events to EventBus with full envelope control | pipelinesteps |
146+
| `step.event_decrypt` | Decrypts field-level-encrypted CloudEvents produced by step.event_publish | pipelinesteps |
146147
| `step.http_call` | Makes outbound HTTP requests | pipelinesteps |
147148
| `step.graphql` | Execute GraphQL queries/mutations with data extraction, pagination, batching, APQ | pipelinesteps |
148149
| `step.delegate` | Delegates to a named service | pipelinesteps |

cmd/wfctl/type_registry.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -582,6 +582,11 @@ func KnownStepTypes() map[string]StepTypeInfo {
582582
Plugin: "pipelinesteps",
583583
ConfigKeys: []string{"topic", "stream", "broker", "provider", "payload", "data", "headers", "event_type", "source"},
584584
},
585+
"step.event_decrypt": {
586+
Type: "step.event_decrypt",
587+
Plugin: "pipelinesteps",
588+
ConfigKeys: []string{"key_id"},
589+
},
585590
"step.http_call": {
586591
Type: "step.http_call",
587592
Plugin: "pipelinesteps",
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package module
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/GoCodeAlone/modular"
8+
)
9+
10+
// EventDecryptStep decrypts field-level encryption applied by step.event_publish.
11+
// It reads the CloudEvents extension attributes ("encrypteddek", "encryptedfields",
12+
// "keyid") from the current pipeline context and decrypts the specified fields
13+
// inside the event's "data" object.
14+
type EventDecryptStep struct {
15+
name string
16+
keyID string // overrides the keyid extension when set
17+
app modular.Application
18+
tmpl *TemplateEngine
19+
}
20+
21+
// NewEventDecryptStepFactory returns a StepFactory that creates EventDecryptStep instances.
22+
func NewEventDecryptStepFactory() StepFactory {
23+
return func(name string, config map[string]any, app modular.Application) (PipelineStep, error) {
24+
step := &EventDecryptStep{
25+
name: name,
26+
app: app,
27+
tmpl: NewTemplateEngine(),
28+
}
29+
30+
// key_id overrides the keyid extension found in the event.
31+
// Supports "${ENV_VAR}" references.
32+
if k, ok := config["key_id"].(string); ok {
33+
step.keyID = k
34+
}
35+
36+
return step, nil
37+
}
38+
}
39+
40+
// Name returns the step name.
41+
func (s *EventDecryptStep) Name() string { return s.name }
42+
43+
// supportedEncryptionAlgorithm is the only algorithm this step can decrypt.
44+
const supportedEncryptionAlgorithm = "AES-256-GCM"
45+
46+
// Execute decrypts the fields in the incoming CloudEvent.
47+
//
48+
// Expected shape of pc.Current (CloudEvents envelope from step.event_publish):
49+
//
50+
// {
51+
// "specversion": "1.0", // optional
52+
// "type": "...", // optional
53+
// "source": "...", // optional
54+
// "id": "...", // optional
55+
// "time": "...", // optional
56+
// "encryption": "AES-256-GCM", // extension — validated before decryption
57+
// "keyid": "<key-id>", // extension
58+
// "encrypteddek": "<base64>", // extension
59+
// "encryptedfields": "field1,field2", // extension
60+
// "data": { // payload with encrypted fields
61+
// "field1": "<base64-ciphertext>",
62+
// "field2": "<base64-ciphertext>",
63+
// ...
64+
// }
65+
// }
66+
//
67+
// The step returns the same envelope structure with "data" containing the
68+
// decrypted field values. The encryption extension attributes are preserved.
69+
func (s *EventDecryptStep) Execute(_ context.Context, pc *PipelineContext) (*StepResult, error) {
70+
event := pc.Current
71+
if event == nil {
72+
return &StepResult{Output: map[string]any{"decrypted": false, "reason": "no event data"}}, nil
73+
}
74+
75+
// Read encryption extension attributes.
76+
encryptedDEKB64, _ := event["encrypteddek"].(string)
77+
encryptedFields, _ := event["encryptedfields"].(string)
78+
keyID, _ := event["keyid"].(string)
79+
algorithm, _ := event["encryption"].(string)
80+
81+
// Override keyID from step configuration if provided.
82+
if s.keyID != "" {
83+
keyID = s.keyID
84+
}
85+
86+
// If the event has no encryption metadata, pass through unchanged.
87+
if encryptedDEKB64 == "" || encryptedFields == "" || keyID == "" {
88+
return &StepResult{Output: event}, nil
89+
}
90+
91+
// Validate the encryption algorithm before attempting decryption.
92+
// Events produced by an unknown scheme should not silently fail or
93+
// produce garbage — return a clear error instead.
94+
if algorithm != "" && algorithm != supportedEncryptionAlgorithm {
95+
return nil, fmt.Errorf("event_decrypt step %q: unsupported encryption algorithm %q (supported: %s)", s.name, algorithm, supportedEncryptionAlgorithm)
96+
}
97+
98+
// Locate the payload — either under "data" (CloudEvents envelope) or the event itself.
99+
payload, hasData := event["data"].(map[string]any)
100+
if !hasData {
101+
// Treat the whole event as the payload (flat structure without envelope).
102+
payload = event
103+
}
104+
105+
decrypted, err := decryptEventFields(payload, encryptedDEKB64, encryptedFields, keyID)
106+
if err != nil {
107+
return nil, fmt.Errorf("event_decrypt step %q: %w", s.name, err)
108+
}
109+
110+
// Rebuild the output envelope, preserving all non-data fields.
111+
output := make(map[string]any, len(event))
112+
for k, v := range event {
113+
output[k] = v
114+
}
115+
if hasData {
116+
output["data"] = decrypted
117+
} else {
118+
// Merge decrypted fields back into the top-level map.
119+
for k, v := range decrypted {
120+
output[k] = v
121+
}
122+
}
123+
124+
return &StepResult{Output: output}, nil
125+
}

0 commit comments

Comments
 (0)