Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,12 @@ public enum FlowFilePolicy {
* When multiple rules match, they will all be executed against the original
* flow file.
*/
USE_ORIGINAL;
USE_ORIGINAL,
/**
* Rules are evaluated in order; each matching rule's actions are applied
* immediately so subsequent rules see the updated attributes. Basic dynamic
* properties are applied first as the baseline; rule actions override them.
* Produces one FlowFile with a single provenance event listing all matched rules.
*/
SEQUENTIAL;
}
Original file line number Diff line number Diff line change
Expand Up @@ -509,9 +509,20 @@ public void onTrigger(final ProcessContext context, final ProcessSession session

Map<String, Action> defaultActions = this.defaultActions;
List<FlowFile> flowFilesToTransfer = new LinkedList<>();
boolean sequential = false;

// if there is update criteria specified, evaluate it
if (criteria != null && evaluateCriteria(session, context, criteria, incomingFlowFile, matchedRules, stateInitialAttributes)) {
if (criteria != null && FlowFilePolicy.SEQUENTIAL.equals(criteria.getFlowFilePolicy())) {
// sequential mode: evaluate and apply each matching rule immediately so subsequent rules see updated attributes
incomingFlowFile = evaluateSequential(session, context, criteria, incomingFlowFile, defaultActions, stateInitialAttributes, stateWorkingAttributes);

if (debugEnabled) {
logger.debug("Updated attributes for {}; transferring to '{}'", incomingFlowFile, REL_SUCCESS.getName());
}

sequential = true;
flowFilesToTransfer.add(incomingFlowFile);
} else if (criteria != null && evaluateCriteria(session, context, criteria, incomingFlowFile, matchedRules, stateInitialAttributes)) {
// apply the actions for each rule and transfer the flowfile
for (final Map.Entry<FlowFile, List<Rule>> entry : matchedRules.entrySet()) {
FlowFile match = entry.getKey();
Expand Down Expand Up @@ -579,7 +590,10 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
}

for (FlowFile toTransfer: flowFilesToTransfer) {
session.getProvenanceReporter().modifyAttributes(toTransfer);
// sequential FlowFiles already have per-rule provenance events recorded during evaluation
if (!sequential) {
session.getProvenanceReporter().modifyAttributes(toTransfer);
}
}
session.transfer(flowFilesToTransfer, REL_SUCCESS);
}
Expand Down Expand Up @@ -624,6 +638,45 @@ private boolean evaluateCriteria(final ProcessSession session, final ProcessCont
return !matchedRules.isEmpty();
}

// Evaluates rules sequentially, applying each matching rule's actions immediately so that subsequent rules
// evaluate conditions against the progressively updated FlowFile. A single provenance event is emitted
// listing all matched rules.
private FlowFile evaluateSequential(final ProcessSession session, final ProcessContext context, final Criteria criteria, FlowFile flowfile,
final Map<String, Action> defaultActions, final Map<String, String> stateInitialAttributes, final Map<String, String> stateWorkingAttributes) {
Map<String, String> sequentialStateAttributes = stateInitialAttributes;

// apply defaults as the baseline (consistent with the no-match path in other modes)
FlowFile currentFlowFile = executeActions(session, context, null, defaultActions, flowfile, sequentialStateAttributes, stateWorkingAttributes);
if (stateWorkingAttributes != null) {
sequentialStateAttributes = new HashMap<>(stateWorkingAttributes);
}

final List<String> matchedRuleNames = new ArrayList<>();
for (final Rule rule : criteria.getRules()) {
if (evaluateRule(context, rule, currentFlowFile, sequentialStateAttributes)) {
if (debugEnabled) {
getLogger().debug("{} sequential rule '{}' matched; applying actions immediately.", this, rule.getName());
}
currentFlowFile = executeActions(session, context, List.of(rule), Map.of(), currentFlowFile, sequentialStateAttributes, stateWorkingAttributes);
matchedRuleNames.add(rule.getName());

if (stateWorkingAttributes != null) {
sequentialStateAttributes = new HashMap<>(stateWorkingAttributes);
}
}
}

if (!matchedRuleNames.isEmpty()) {
final String joinedNames = String.join(", ", matchedRuleNames);
currentFlowFile = session.putAttribute(currentFlowFile, getClass().getSimpleName() + ".matchedRules", joinedNames);
session.getProvenanceReporter().modifyAttributes(currentFlowFile, "Applied sequential rules: " + joinedNames);
} else if (!defaultActions.isEmpty()) {
session.getProvenanceReporter().modifyAttributes(currentFlowFile);
}

return currentFlowFile;
}

//Evaluates the specified rule on the specified flowfile.
private boolean evaluateRule(final ProcessContext context, final Rule rule, FlowFile flowfile, final Map<String, String> statefulAttributes) {
// go through each condition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,10 @@ Taken together, this rule says:
**Combining the Basic Usage with the Advanced Usage**

The UpdateAttribute processor allows you to make both basic usage changes (i.e., to every FlowFile) and advanced usage
changes (i.e., conditional) at the same time; however, if they both affect the same attribute(s), then the conditional
changes take precedence. This has the added benefit of supporting a type of "else" construct. In other words, if none of
the rules match for the attribute, then the basic usage changes will be made.
changes (i.e., conditional) at the same time. Basic properties are always applied first as the baseline attribute set.
Advanced rules are then evaluated and, if they match, their actions are applied on top — overriding any basic property
that targets the same attribute name. This has the added benefit of supporting a type of "else" construct: if none of
the advanced rules match a given attribute, the basic property value will stand as the final value.

**Deleting Attributes**

Expand All @@ -156,22 +157,73 @@ The delete attributes function does not produce a Provenance Event if the **alte

**FlowFile Policy**

Another setting in the Advanced UI is the FlowFile Policy. It is located in the upper-left corner of the UI, and it
defines the processor's behavior when multiple rules match. It may be changed using the slide toggle. By default, the
FlowFile Policy is set to use a clone of the original FlowFile for each matching rule.

If the FlowFile policy is set to "use clone", and multiple rules match, then a copy of the incoming FlowFile is created,
such that the number of outgoing FlowFiles is equal to the number of rules that match. In other words, if two rules (A
and B) both match, then there will be two outgoing FlowFiles, one for Rule A and one for Rule B. This can be useful in
situations where you want to add an attribute to use as a flag for routing later. In this example, there will be two
copies of the file available, one to route for the A path, and one to route for the B path.

If the FlowFile policy is set to "use original", then all matching rules are applied to the same incoming FlowFile, and
there is only one outgoing FlowFile with all the attribute changes applied. In this case, the order of the rules matters
and the action for each rule that matches will be applied in that order. If multiple rules contain actions that update
the same attribute, the action from the last matching rule will take precedence. Notably, you can drag and drop the
rules into a certain order within the Rules list once the FlowFile Policy is set to "use original" and the user has
toggled the "Reorder rules" control. While in this reordering mode, other Rule modifications are not allowed.
Another setting in the Advanced UI is the FlowFile Policy. It is located in the upper-left corner of the UI and
defines the processor's behavior when multiple rules match. It is selected using a three-way toggle. By default, the
FlowFile Policy is set to **Clone**.

**Clone**

If the FlowFile policy is set to "Clone", and multiple rules match, then a copy of the incoming FlowFile is created for
each matching rule, such that the number of outgoing FlowFiles equals the number of rules that match. In other words,
if two rules (A and B) both match, then there will be two outgoing FlowFiles, one for Rule A and one for Rule B. This
can be useful in situations where you want to add an attribute to use as a flag for routing later. Each clone receives
the attribute changes from its own matching rule only. All conditions for all rules are evaluated against the original
incoming FlowFile, so no rule can see attribute changes made by another rule.

**Original**

If the FlowFile policy is set to "Original", then all matching rules are applied to the same incoming FlowFile, and
there is only one outgoing FlowFile with all the attribute changes applied. All conditions for all rules are still
evaluated against the original incoming FlowFile — a rule's condition cannot see attribute updates made by a
previously matched rule. If multiple rules contain actions that update the same attribute, the action from the last
matching rule will take precedence. Rule order matters: you can drag and drop the rules into a certain order within the
Rules list once the FlowFile Policy is set to "Original" and the "Reorder rules" control is toggled on. While in
reordering mode, other rule modifications are not allowed.

**Sequential**

If the FlowFile policy is set to "Sequential", rules are evaluated in configured order and applied one at a time to the
same FlowFile, producing one output — similar to "Original". The key difference is that each rule's conditions are
evaluated against the FlowFile *as it has been updated by all previously matched rules*, rather than against the
original. When a rule's conditions match, its actions are applied immediately before the next rule is evaluated.

This enables incremental, dependent attribute transformations within a single processor — for example, initialising a
running total in one rule and then adding optional values to it in subsequent rules. Rule order is significant; use the
"Reorder rules" control to set the desired evaluation sequence.

*Basic properties and Sequential rules:* Basic dynamic properties (defined outside the Advanced UI) are always applied
first as the baseline for the FlowFile. Sequential rules are then evaluated on top of that baseline. If a sequential
rule sets the same attribute as a basic property, the rule's value takes precedence. If no sequential rules match,
the basic property values remain on the output FlowFile unchanged.

*Written attributes in Sequential mode:* When one or more rules match, the processor writes an
`UpdateAttribute.matchedRules` attribute to the FlowFile containing the names of all matched rules in evaluation
order, comma-separated. This attribute is not written when no rules match.

*Example — accumulating an optional hotel bill:*

Assume the following incoming attributes: `room.rate=200`, `room.service=35`, `in.room.entertainment=15`
(no `late.checkout` attribute).

| Rule | Condition | Action |
|---|---|---|
| Init | (always matches) | `hotel.bill.total = 0` |
| Room rate | `${room.rate:isEmpty():not()}` | `hotel.bill.total = ${hotel.bill.total:plus(${room.rate})}` |
| Room service | `${room.service:isEmpty():not()}` | `hotel.bill.total = ${hotel.bill.total:plus(${room.service})}` |
| Entertainment | `${in.room.entertainment:isEmpty():not()}` | `hotel.bill.total = ${hotel.bill.total:plus(${in.room.entertainment})}` |
| Late checkout | `${late.checkout:isEmpty():not()}` | `hotel.bill.total = ${hotel.bill.total:plus(${late.checkout})}` |

Sequential evaluation result:

* After Init: `hotel.bill.total = 0`
* After Room rate: `hotel.bill.total = 200`
* After Room service: `hotel.bill.total = 235`
* After Entertainment: `hotel.bill.total = 250`
* Late checkout skipped (attribute not present)
* **Final: `hotel.bill.total = 250`**

This would not be achievable with Clone or Original policies because those evaluate all rule conditions against the
original FlowFile, meaning `hotel.bill.total` would be `0` when every rule's action expression is evaluated.

**Filtering Rules**

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.attributes.UpdateAttribute;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.update.attributes.serde.CriteriaSerDe;
import org.apache.nifi.util.MockFlowFile;
Expand Down Expand Up @@ -895,6 +897,68 @@ void testMigrateProperties() {
assertEquals(expectedRenamed, propertyMigrationResult.getPropertiesRenamed());
}

@Test
public void testSequential() {
// Covers incremental accumulation across rules, prior-rule attribute visibility, conditions that reference
// attributes set by earlier rules, skipped non-matching rules, single output FlowFile, basic-property
// baseline, matchedRules attribute, and a single provenance event listing all matched rules.
final Criteria criteria = getCriteria();
criteria.setFlowFilePolicy(FlowFilePolicy.SEQUENTIAL);
addRule(criteria, "init", List.of("${literal(1):equals('1')}"), Map.of("hotel.bill.total", "0"));
addRule(criteria, "room.rate", List.of("${room.rate:isEmpty():not()}"), Map.of("hotel.bill.total", "${hotel.bill.total:plus(${room.rate})}"));
addRule(criteria, "room.service", List.of("${room.service:isEmpty():not()}"), Map.of("hotel.bill.total", "${hotel.bill.total:plus(${room.service})}"));
addRule(criteria, "entertainment", List.of("${in.room.entertainment:isEmpty():not()}"), Map.of("hotel.bill.total", "${hotel.bill.total:plus(${in.room.entertainment})}"));
addRule(criteria, "large.bill", List.of("${hotel.bill.total:toNumber():gt(100)}"), Map.of("bill.category", "large"));
addRule(criteria, "late.checkout", List.of("${late.checkout:isEmpty():not()}"), Map.of("hotel.bill.total", "${hotel.bill.total:plus(${late.checkout})}"));

runner.setProperty("currency", "USD");
runner.setAnnotationData(serialize(criteria));
runner.enqueue(new byte[0], Map.of("room.rate", "200", "room.service", "35", "in.room.entertainment", "15"));
runner.run();

runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 1);
final MockFlowFile ff = runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS).getFirst();

ff.assertAttributeEquals("hotel.bill.total", "250");
ff.assertAttributeEquals("bill.category", "large");
ff.assertAttributeEquals("currency", "USD");
ff.assertAttributeNotExists("late.checkout");
ff.assertAttributeEquals("UpdateAttribute.matchedRules", "init, room.rate, room.service, entertainment, large.bill");

final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
assertEquals(1, events.size());
assertEquals(ProvenanceEventType.ATTRIBUTES_MODIFIED, events.getFirst().getEventType());
assertTrue(events.getFirst().getDetails().contains("init"));
assertTrue(events.getFirst().getDetails().contains("large.bill"));
}

@Test
public void testSequentialUsesUpdatedStateForFollowingRules() throws Exception {
final Criteria criteria = getCriteria();
criteria.setFlowFilePolicy(FlowFilePolicy.SEQUENTIAL);
addRule(criteria, "increment",
List.of("${literal(1):equals('1')}"),
Map.of("count", "${getStateValue('count'):plus(1)}"));
addRule(criteria, "mark-threshold",
List.of("${getStateValue('count'):ge(1)}"),
Map.of("threshold.reached", "true"));

runner.setProperty(UpdateAttribute.STORE_STATE, STORE_STATE_LOCALLY);
runner.setProperty(UpdateAttribute.STATEFUL_VARIABLES_INIT_VALUE, "0");
runner.setProperty("count", "${getStateValue('count')}");
runner.setAnnotationData(serialize(criteria));

runner.enqueue(new byte[0]);
runner.run();

runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals("count", "1");
flowFile.assertAttributeEquals("threshold.reached", "true");

mockStateManager.assertStateEquals(Map.of("count", "1", "threshold.reached", "true"), Scope.LOCAL);
}

private Criteria getCriteria() {
return new Criteria();
}
Expand Down
Loading
Loading