From cfa694afbdd2f1839387b2c55d85d11fe79e9052 Mon Sep 17 00:00:00 2001 From: Richard Scott Date: Thu, 26 Mar 2026 20:01:49 +1100 Subject: [PATCH] NIFI-15737: Add Sequential FlowFile policy to UpdateAttribute --- .../update/attributes/FlowFilePolicy.java | 9 +- .../attributes/UpdateAttribute.java | 57 +++++++++++- .../additionalDetails.md | 90 +++++++++++++++---- .../attributes/TestUpdateAttribute.java | 64 +++++++++++++ .../rule-listing/rule-listing.component.html | 39 +++++--- .../rule-listing.component.spec.ts | 20 +++++ .../ui/rule-listing/rule-listing.component.ts | 22 +++-- 7 files changed, 260 insertions(+), 41 deletions(-) diff --git a/nifi-extension-bundles/nifi-update-attribute-bundle/nifi-update-attribute-model/src/main/java/org/apache/nifi/update/attributes/FlowFilePolicy.java b/nifi-extension-bundles/nifi-update-attribute-bundle/nifi-update-attribute-model/src/main/java/org/apache/nifi/update/attributes/FlowFilePolicy.java index 4749870162e0..33f39d1e78ec 100644 --- a/nifi-extension-bundles/nifi-update-attribute-bundle/nifi-update-attribute-model/src/main/java/org/apache/nifi/update/attributes/FlowFilePolicy.java +++ b/nifi-extension-bundles/nifi-update-attribute-bundle/nifi-update-attribute-model/src/main/java/org/apache/nifi/update/attributes/FlowFilePolicy.java @@ -30,5 +30,12 @@ public enum FlowFilePolicy { * When multiple rules match, they will all be executed against the original * flow file. */ - USE_ORIGINAL; + USE_ORIGINAL, + /** + * Rules are evaluated in order; each matching rule's actions are applied + * immediately so subsequent rules see the updated attributes. Basic dynamic + * properties are applied first as the baseline; rule actions override them. + * Produces one FlowFile with a single provenance event listing all matched rules. + */ + SEQUENTIAL; } diff --git a/nifi-extension-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java b/nifi-extension-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java index 0a3a781e1498..6e09a235efca 100644 --- a/nifi-extension-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java +++ b/nifi-extension-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java @@ -509,9 +509,20 @@ public void onTrigger(final ProcessContext context, final ProcessSession session Map defaultActions = this.defaultActions; List flowFilesToTransfer = new LinkedList<>(); + boolean sequential = false; // if there is update criteria specified, evaluate it - if (criteria != null && evaluateCriteria(session, context, criteria, incomingFlowFile, matchedRules, stateInitialAttributes)) { + if (criteria != null && FlowFilePolicy.SEQUENTIAL.equals(criteria.getFlowFilePolicy())) { + // sequential mode: evaluate and apply each matching rule immediately so subsequent rules see updated attributes + incomingFlowFile = evaluateSequential(session, context, criteria, incomingFlowFile, defaultActions, stateInitialAttributes, stateWorkingAttributes); + + if (debugEnabled) { + logger.debug("Updated attributes for {}; transferring to '{}'", incomingFlowFile, REL_SUCCESS.getName()); + } + + sequential = true; + flowFilesToTransfer.add(incomingFlowFile); + } else if (criteria != null && evaluateCriteria(session, context, criteria, incomingFlowFile, matchedRules, stateInitialAttributes)) { // apply the actions for each rule and transfer the flowfile for (final Map.Entry> entry : matchedRules.entrySet()) { FlowFile match = entry.getKey(); @@ -579,7 +590,10 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } for (FlowFile toTransfer: flowFilesToTransfer) { - session.getProvenanceReporter().modifyAttributes(toTransfer); + // sequential FlowFiles already have per-rule provenance events recorded during evaluation + if (!sequential) { + session.getProvenanceReporter().modifyAttributes(toTransfer); + } } session.transfer(flowFilesToTransfer, REL_SUCCESS); } @@ -624,6 +638,45 @@ private boolean evaluateCriteria(final ProcessSession session, final ProcessCont return !matchedRules.isEmpty(); } + // Evaluates rules sequentially, applying each matching rule's actions immediately so that subsequent rules + // evaluate conditions against the progressively updated FlowFile. A single provenance event is emitted + // listing all matched rules. + private FlowFile evaluateSequential(final ProcessSession session, final ProcessContext context, final Criteria criteria, FlowFile flowfile, + final Map defaultActions, final Map stateInitialAttributes, final Map stateWorkingAttributes) { + Map sequentialStateAttributes = stateInitialAttributes; + + // apply defaults as the baseline (consistent with the no-match path in other modes) + FlowFile currentFlowFile = executeActions(session, context, null, defaultActions, flowfile, sequentialStateAttributes, stateWorkingAttributes); + if (stateWorkingAttributes != null) { + sequentialStateAttributes = new HashMap<>(stateWorkingAttributes); + } + + final List matchedRuleNames = new ArrayList<>(); + for (final Rule rule : criteria.getRules()) { + if (evaluateRule(context, rule, currentFlowFile, sequentialStateAttributes)) { + if (debugEnabled) { + getLogger().debug("{} sequential rule '{}' matched; applying actions immediately.", this, rule.getName()); + } + currentFlowFile = executeActions(session, context, List.of(rule), Map.of(), currentFlowFile, sequentialStateAttributes, stateWorkingAttributes); + matchedRuleNames.add(rule.getName()); + + if (stateWorkingAttributes != null) { + sequentialStateAttributes = new HashMap<>(stateWorkingAttributes); + } + } + } + + if (!matchedRuleNames.isEmpty()) { + final String joinedNames = String.join(", ", matchedRuleNames); + currentFlowFile = session.putAttribute(currentFlowFile, getClass().getSimpleName() + ".matchedRules", joinedNames); + session.getProvenanceReporter().modifyAttributes(currentFlowFile, "Applied sequential rules: " + joinedNames); + } else if (!defaultActions.isEmpty()) { + session.getProvenanceReporter().modifyAttributes(currentFlowFile); + } + + return currentFlowFile; + } + //Evaluates the specified rule on the specified flowfile. private boolean evaluateRule(final ProcessContext context, final Rule rule, FlowFile flowfile, final Map statefulAttributes) { // go through each condition diff --git a/nifi-extension-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.md b/nifi-extension-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.md index 3f730b68b8ae..cdfc041dd23e 100644 --- a/nifi-extension-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.md +++ b/nifi-extension-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.md @@ -136,9 +136,10 @@ Taken together, this rule says: **Combining the Basic Usage with the Advanced Usage** The UpdateAttribute processor allows you to make both basic usage changes (i.e., to every FlowFile) and advanced usage -changes (i.e., conditional) at the same time; however, if they both affect the same attribute(s), then the conditional -changes take precedence. This has the added benefit of supporting a type of "else" construct. In other words, if none of -the rules match for the attribute, then the basic usage changes will be made. +changes (i.e., conditional) at the same time. Basic properties are always applied first as the baseline attribute set. +Advanced rules are then evaluated and, if they match, their actions are applied on top — overriding any basic property +that targets the same attribute name. This has the added benefit of supporting a type of "else" construct: if none of +the advanced rules match a given attribute, the basic property value will stand as the final value. **Deleting Attributes** @@ -156,22 +157,73 @@ The delete attributes function does not produce a Provenance Event if the **alte **FlowFile Policy** -Another setting in the Advanced UI is the FlowFile Policy. It is located in the upper-left corner of the UI, and it -defines the processor's behavior when multiple rules match. It may be changed using the slide toggle. By default, the -FlowFile Policy is set to use a clone of the original FlowFile for each matching rule. - -If the FlowFile policy is set to "use clone", and multiple rules match, then a copy of the incoming FlowFile is created, -such that the number of outgoing FlowFiles is equal to the number of rules that match. In other words, if two rules (A -and B) both match, then there will be two outgoing FlowFiles, one for Rule A and one for Rule B. This can be useful in -situations where you want to add an attribute to use as a flag for routing later. In this example, there will be two -copies of the file available, one to route for the A path, and one to route for the B path. - -If the FlowFile policy is set to "use original", then all matching rules are applied to the same incoming FlowFile, and -there is only one outgoing FlowFile with all the attribute changes applied. In this case, the order of the rules matters -and the action for each rule that matches will be applied in that order. If multiple rules contain actions that update -the same attribute, the action from the last matching rule will take precedence. Notably, you can drag and drop the -rules into a certain order within the Rules list once the FlowFile Policy is set to "use original" and the user has -toggled the "Reorder rules" control. While in this reordering mode, other Rule modifications are not allowed. +Another setting in the Advanced UI is the FlowFile Policy. It is located in the upper-left corner of the UI and +defines the processor's behavior when multiple rules match. It is selected using a three-way toggle. By default, the +FlowFile Policy is set to **Clone**. + +**Clone** + +If the FlowFile policy is set to "Clone", and multiple rules match, then a copy of the incoming FlowFile is created for +each matching rule, such that the number of outgoing FlowFiles equals the number of rules that match. In other words, +if two rules (A and B) both match, then there will be two outgoing FlowFiles, one for Rule A and one for Rule B. This +can be useful in situations where you want to add an attribute to use as a flag for routing later. Each clone receives +the attribute changes from its own matching rule only. All conditions for all rules are evaluated against the original +incoming FlowFile, so no rule can see attribute changes made by another rule. + +**Original** + +If the FlowFile policy is set to "Original", then all matching rules are applied to the same incoming FlowFile, and +there is only one outgoing FlowFile with all the attribute changes applied. All conditions for all rules are still +evaluated against the original incoming FlowFile — a rule's condition cannot see attribute updates made by a +previously matched rule. If multiple rules contain actions that update the same attribute, the action from the last +matching rule will take precedence. Rule order matters: you can drag and drop the rules into a certain order within the +Rules list once the FlowFile Policy is set to "Original" and the "Reorder rules" control is toggled on. While in +reordering mode, other rule modifications are not allowed. + +**Sequential** + +If the FlowFile policy is set to "Sequential", rules are evaluated in configured order and applied one at a time to the +same FlowFile, producing one output — similar to "Original". The key difference is that each rule's conditions are +evaluated against the FlowFile *as it has been updated by all previously matched rules*, rather than against the +original. When a rule's conditions match, its actions are applied immediately before the next rule is evaluated. + +This enables incremental, dependent attribute transformations within a single processor — for example, initialising a +running total in one rule and then adding optional values to it in subsequent rules. Rule order is significant; use the +"Reorder rules" control to set the desired evaluation sequence. + +*Basic properties and Sequential rules:* Basic dynamic properties (defined outside the Advanced UI) are always applied +first as the baseline for the FlowFile. Sequential rules are then evaluated on top of that baseline. If a sequential +rule sets the same attribute as a basic property, the rule's value takes precedence. If no sequential rules match, +the basic property values remain on the output FlowFile unchanged. + +*Written attributes in Sequential mode:* When one or more rules match, the processor writes an +`UpdateAttribute.matchedRules` attribute to the FlowFile containing the names of all matched rules in evaluation +order, comma-separated. This attribute is not written when no rules match. + +*Example — accumulating an optional hotel bill:* + +Assume the following incoming attributes: `room.rate=200`, `room.service=35`, `in.room.entertainment=15` +(no `late.checkout` attribute). + +| Rule | Condition | Action | +|---|---|---| +| Init | (always matches) | `hotel.bill.total = 0` | +| Room rate | `${room.rate:isEmpty():not()}` | `hotel.bill.total = ${hotel.bill.total:plus(${room.rate})}` | +| Room service | `${room.service:isEmpty():not()}` | `hotel.bill.total = ${hotel.bill.total:plus(${room.service})}` | +| Entertainment | `${in.room.entertainment:isEmpty():not()}` | `hotel.bill.total = ${hotel.bill.total:plus(${in.room.entertainment})}` | +| Late checkout | `${late.checkout:isEmpty():not()}` | `hotel.bill.total = ${hotel.bill.total:plus(${late.checkout})}` | + +Sequential evaluation result: + +* After Init: `hotel.bill.total = 0` +* After Room rate: `hotel.bill.total = 200` +* After Room service: `hotel.bill.total = 235` +* After Entertainment: `hotel.bill.total = 250` +* Late checkout skipped (attribute not present) +* **Final: `hotel.bill.total = 250`** + +This would not be achievable with Clone or Original policies because those evaluate all rule conditions against the +original FlowFile, meaning `hotel.bill.total` would be `0` when every rule's action expression is evaluated. **Filtering Rules** diff --git a/nifi-extension-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java b/nifi-extension-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java index c83d877e74a6..c0c2c2da70a7 100644 --- a/nifi-extension-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java +++ b/nifi-extension-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java @@ -20,6 +20,8 @@ import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processors.attributes.UpdateAttribute; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.state.MockStateManager; import org.apache.nifi.update.attributes.serde.CriteriaSerDe; import org.apache.nifi.util.MockFlowFile; @@ -895,6 +897,68 @@ void testMigrateProperties() { assertEquals(expectedRenamed, propertyMigrationResult.getPropertiesRenamed()); } + @Test + public void testSequential() { + // Covers incremental accumulation across rules, prior-rule attribute visibility, conditions that reference + // attributes set by earlier rules, skipped non-matching rules, single output FlowFile, basic-property + // baseline, matchedRules attribute, and a single provenance event listing all matched rules. + final Criteria criteria = getCriteria(); + criteria.setFlowFilePolicy(FlowFilePolicy.SEQUENTIAL); + addRule(criteria, "init", List.of("${literal(1):equals('1')}"), Map.of("hotel.bill.total", "0")); + addRule(criteria, "room.rate", List.of("${room.rate:isEmpty():not()}"), Map.of("hotel.bill.total", "${hotel.bill.total:plus(${room.rate})}")); + addRule(criteria, "room.service", List.of("${room.service:isEmpty():not()}"), Map.of("hotel.bill.total", "${hotel.bill.total:plus(${room.service})}")); + addRule(criteria, "entertainment", List.of("${in.room.entertainment:isEmpty():not()}"), Map.of("hotel.bill.total", "${hotel.bill.total:plus(${in.room.entertainment})}")); + addRule(criteria, "large.bill", List.of("${hotel.bill.total:toNumber():gt(100)}"), Map.of("bill.category", "large")); + addRule(criteria, "late.checkout", List.of("${late.checkout:isEmpty():not()}"), Map.of("hotel.bill.total", "${hotel.bill.total:plus(${late.checkout})}")); + + runner.setProperty("currency", "USD"); + runner.setAnnotationData(serialize(criteria)); + runner.enqueue(new byte[0], Map.of("room.rate", "200", "room.service", "35", "in.room.entertainment", "15")); + runner.run(); + + runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 1); + final MockFlowFile ff = runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS).getFirst(); + + ff.assertAttributeEquals("hotel.bill.total", "250"); + ff.assertAttributeEquals("bill.category", "large"); + ff.assertAttributeEquals("currency", "USD"); + ff.assertAttributeNotExists("late.checkout"); + ff.assertAttributeEquals("UpdateAttribute.matchedRules", "init, room.rate, room.service, entertainment, large.bill"); + + final List events = runner.getProvenanceEvents(); + assertEquals(1, events.size()); + assertEquals(ProvenanceEventType.ATTRIBUTES_MODIFIED, events.getFirst().getEventType()); + assertTrue(events.getFirst().getDetails().contains("init")); + assertTrue(events.getFirst().getDetails().contains("large.bill")); + } + + @Test + public void testSequentialUsesUpdatedStateForFollowingRules() throws Exception { + final Criteria criteria = getCriteria(); + criteria.setFlowFilePolicy(FlowFilePolicy.SEQUENTIAL); + addRule(criteria, "increment", + List.of("${literal(1):equals('1')}"), + Map.of("count", "${getStateValue('count'):plus(1)}")); + addRule(criteria, "mark-threshold", + List.of("${getStateValue('count'):ge(1)}"), + Map.of("threshold.reached", "true")); + + runner.setProperty(UpdateAttribute.STORE_STATE, STORE_STATE_LOCALLY); + runner.setProperty(UpdateAttribute.STATEFUL_VARIABLES_INIT_VALUE, "0"); + runner.setProperty("count", "${getStateValue('count')}"); + runner.setAnnotationData(serialize(criteria)); + + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 1); + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS).getFirst(); + flowFile.assertAttributeEquals("count", "1"); + flowFile.assertAttributeEquals("threshold.reached", "true"); + + mockStateManager.assertStateEquals(Map.of("count", "1", "threshold.reached", "true"), Scope.LOCAL); + } + private Criteria getCriteria() { return new Criteria(); } diff --git a/nifi-frontend/src/main/frontend/apps/update-attribute/src/app/pages/update-attribute/ui/rule-listing/rule-listing.component.html b/nifi-frontend/src/main/frontend/apps/update-attribute/src/app/pages/update-attribute/ui/rule-listing/rule-listing.component.html index 9001da64ea06..fc8253e64382 100644 --- a/nifi-frontend/src/main/frontend/apps/update-attribute/src/app/pages/update-attribute/ui/rule-listing/rule-listing.component.html +++ b/nifi-frontend/src/main/frontend/apps/update-attribute/src/app/pages/update-attribute/ui/rule-listing/rule-listing.component.html @@ -30,18 +30,33 @@ @if (rulesList.length > 0 || !newRule) {
-
-
Use original FlowFile for matching rules
- -
+
FlowFile policy
- + + + Clone + + + Original + + + Sequential + +
@@ -56,7 +71,7 @@ }
- @if (flowFilePolicy === 'USE_ORIGINAL' && !reorderDisabled()) { + @if ((flowFilePolicy === 'USE_ORIGINAL' || flowFilePolicy === 'SEQUENTIAL') && !reorderDisabled()) {
Reorder rules
{ it('should create', () => { expect(component).toBeTruthy(); }); + + it('should disable rule reordering when switching to clone policy', () => { + component.setAllowRuleReordering({ checked: true } as MatSlideToggleChange); + + component.flowFilePolicyChanged('USE_CLONE'); + + expect(component.allowRuleReordering).toBeFalse(); + }); + + it('should disable rule reordering when clone policy is loaded', () => { + component.setAllowRuleReordering({ checked: true } as MatSlideToggleChange); + + component.evaluationContext = { + ruleOrder: [], + flowFilePolicy: 'USE_CLONE' + }; + + expect(component.allowRuleReordering).toBeFalse(); + }); }); diff --git a/nifi-frontend/src/main/frontend/apps/update-attribute/src/app/pages/update-attribute/ui/rule-listing/rule-listing.component.ts b/nifi-frontend/src/main/frontend/apps/update-attribute/src/app/pages/update-attribute/ui/rule-listing/rule-listing.component.ts index 9e8712f70561..222968ab7dbc 100644 --- a/nifi-frontend/src/main/frontend/apps/update-attribute/src/app/pages/update-attribute/ui/rule-listing/rule-listing.component.ts +++ b/nifi-frontend/src/main/frontend/apps/update-attribute/src/app/pages/update-attribute/ui/rule-listing/rule-listing.component.ts @@ -21,6 +21,7 @@ import { afterNextRender, Component, ElementRef, Input, inject } from '@angular/ import { CommonModule } from '@angular/common'; import { Store } from '@ngrx/store'; import { UpdateAttributeState } from '../../state'; +import { MatButtonToggleModule } from '@angular/material/button-toggle'; import { MatSlideToggleChange, MatSlideToggleModule } from '@angular/material/slide-toggle'; import { MatFormFieldModule } from '@angular/material/form-field'; import { FormBuilder, FormGroup, FormsModule, ReactiveFormsModule } from '@angular/forms'; @@ -50,6 +51,7 @@ import { selectEvaluationContextError } from '../../state/evaluation-context/eva selector: 'rule-listing', imports: [ CommonModule, + MatButtonToggleModule, MatSlideToggleModule, MatFormFieldModule, FormsModule, @@ -89,9 +91,9 @@ export class RuleListing { this.isEditable = editable; if (editable) { - this.flowFilePolicyForm.get('useOriginalFlowFilePolicy')?.enable(); + this.flowFilePolicyForm.get('flowFilePolicy')?.enable(); } else { - this.flowFilePolicyForm.get('useOriginalFlowFilePolicy')?.disable(); + this.flowFilePolicyForm.get('flowFilePolicy')?.disable(); } } @@ -107,7 +109,7 @@ export class RuleListing { searchForm: FormGroup; flowFilePolicyForm: FormGroup; - flowFilePolicy = 'USE_ORIGINAL'; + flowFilePolicy: string = 'USE_CLONE'; scrollToNewRule = false; @@ -115,7 +117,7 @@ export class RuleListing { constructor() { this.searchForm = this.formBuilder.group({ searchRules: '' }); - this.flowFilePolicyForm = this.formBuilder.group({ useOriginalFlowFilePolicy: true }); + this.flowFilePolicyForm = this.formBuilder.group({ flowFilePolicy: 'USE_CLONE' }); this.searchForm .get('searchRules') @@ -184,7 +186,10 @@ export class RuleListing { } private updateFlowFilePolicy(): void { - this.flowFilePolicyForm.get('useOriginalFlowFilePolicy')?.setValue(this.flowFilePolicy === 'USE_ORIGINAL'); + if (this.flowFilePolicy === 'USE_CLONE') { + this.allowRuleReordering = false; + } + this.flowFilePolicyForm.get('flowFilePolicy')?.setValue(this.flowFilePolicy); } reorderDisabled(): boolean { @@ -220,12 +225,15 @@ export class RuleListing { this.store.dispatch(promptRuleDeletion({ rule })); } - flowFilePolicyToggled(event: MatSlideToggleChange): void { + flowFilePolicyChanged(value: string): void { + if (value === 'USE_CLONE') { + this.allowRuleReordering = false; + } this.store.dispatch( saveEvaluationContext({ evaluationContext: { ruleOrder: this.ruleOrder, - flowFilePolicy: event.checked ? 'USE_ORIGINAL' : 'USE_CLONE' + flowFilePolicy: value } }) );