@@ -122,8 +122,13 @@ let engine = RulesEngine::new(vec![rule], None);
122122- ** Full Context Access:** Conditions can access any field — ` data ` , ` metadata ` , ` temp_data ` .
123123- ** Async-First Architecture:** Native async/await support with Tokio for high-throughput processing.
124124- ** Execution Tracing:** Step-by-step debugging with message snapshots after each action.
125- - ** Built-in Functions:** Parse (JSON/XML), Map, Validate, and Publish (JSON/XML) for complete data pipelines.
125+ - ** Built-in Functions:** Parse, Map, Validate, Filter, Log, and Publish for complete data pipelines.
126+ - ** Pipeline Control Flow:** Filter/gate function to halt workflows or skip tasks based on conditions.
127+ - ** Channel Routing:** Route messages to specific workflow channels with O(1) lookup.
128+ - ** Workflow Lifecycle:** Manage workflow status (active/paused/archived), versioning, and tagging.
129+ - ** Hot Reload:** Swap workflows at runtime without re-registering custom functions.
126130- ** Extensible:** Add custom async actions by implementing the ` AsyncFunctionHandler ` trait.
131+ - ** Typed Integration Configs:** Pre-validated configs for HTTP, Enrich, and Kafka integrations.
127132- ** WebAssembly Support:** Run rules in the browser with ` @goplasmatic/dataflow-wasm ` .
128133- ** React UI Components:** Visualize and debug rules with ` @goplasmatic/dataflow-ui ` .
129134- ** Auditing:** Full audit trail of all changes as data flows through the pipeline.
@@ -200,9 +205,104 @@ let engine = Engine::new(workflows, Some(custom_functions));
200205| ` parse_xml ` | Parse XML string into JSON data structure | Yes |
201206| ` map ` | Data transformation using JSONLogic | Yes |
202207| ` validation ` | Rule-based data validation | No (read-only) |
208+ | ` filter ` | Pipeline control flow — halt workflow or skip task | No |
209+ | ` log ` | Structured logging with JSONLogic expressions | No |
203210| ` publish_json ` | Serialize data to JSON string | Yes |
204211| ` publish_xml ` | Serialize data to XML string | Yes |
205212
213+ ### Filter (Pipeline Control Flow)
214+
215+ The ` filter ` function evaluates a JSONLogic condition and controls pipeline execution:
216+
217+ ``` json
218+ {
219+ "function" : {
220+ "name" : " filter" ,
221+ "input" : {
222+ "condition" : {"==" : [{"var" : " data.status" }, " active" ]},
223+ "on_reject" : " halt"
224+ }
225+ }
226+ }
227+ ```
228+
229+ - ` on_reject: "halt" ` — stops the entire workflow when the condition is false
230+ - ` on_reject: "skip" ` — skips just the current task and continues
231+
232+ ### Log (Structured Logging)
233+
234+ The ` log ` function outputs structured log messages using the ` log ` crate:
235+
236+ ``` json
237+ {
238+ "function" : {
239+ "name" : " log" ,
240+ "input" : {
241+ "level" : " info" ,
242+ "message" : {"cat" : [" Processing order " , {"var" : " data.order.id" }]},
243+ "fields" : {
244+ "total" : {"var" : " data.order.total" },
245+ "user" : {"var" : " data.user.name" }
246+ }
247+ }
248+ }
249+ }
250+ ```
251+
252+ Log levels: ` trace ` , ` debug ` , ` info ` , ` warn ` , ` error ` . Messages and fields support JSONLogic expressions.
253+
254+ ## Channel Routing
255+
256+ Route messages to specific workflow channels for efficient O(1) dispatch:
257+
258+ ``` rust
259+ // Workflows define their channel
260+ // { "id": "order_rule", "channel": "orders", "status": "active", ... }
261+
262+ // Process only workflows on a specific channel
263+ engine . process_message_for_channel (" orders" , & mut message ). await ? ;
264+ ```
265+
266+ Only ` active ` workflows are included in channel routing. Workflows default to the ` "default" ` channel.
267+
268+ ## Workflow Lifecycle
269+
270+ Workflows support lifecycle management fields:
271+
272+ ``` json
273+ {
274+ "id" : " my_rule" ,
275+ "channel" : " orders" ,
276+ "version" : 2 ,
277+ "status" : " active" ,
278+ "tags" : [" premium" , " high-priority" ],
279+ "created_at" : " 2025-01-15T10:00:00Z" ,
280+ "updated_at" : " 2025-06-01T14:30:00Z" ,
281+ "tasks" : [... ]
282+ }
283+ ```
284+
285+ | Field | Type | Default | Description |
286+ | -------| ------| ---------| -------------|
287+ | ` channel ` | string | ` "default" ` | Channel for message routing |
288+ | ` version ` | number | ` 1 ` | Workflow version |
289+ | ` status ` | string | ` "active" ` | ` active ` , ` paused ` , or ` archived ` |
290+ | ` tags ` | array | ` [] ` | Arbitrary tags for organization |
291+ | ` created_at ` | datetime | ` null ` | Creation timestamp (ISO 8601) |
292+ | ` updated_at ` | datetime | ` null ` | Last update timestamp (ISO 8601) |
293+
294+ All fields are optional and backward-compatible with existing configurations.
295+
296+ ## Engine Hot Reload
297+
298+ Swap workflows at runtime without losing custom function registrations:
299+
300+ ``` rust
301+ let new_workflows = vec! [Workflow :: from_json (r # " { ... }" # )? ];
302+ let new_engine = engine . with_new_workflows (new_workflows );
303+ // Old engine remains valid for in-flight messages
304+ ```
305+
206306## Related Packages
207307
208308| Package | Description |
0 commit comments