Skip to content

Commit ca51668

Browse files
NIFI-15682: Add Bulk Replay feature for provenance events
1 parent 1d2c25f commit ca51668

64 files changed

Lines changed: 8444 additions & 1 deletion

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,14 @@ public class NiFiProperties extends ApplicationProperties {
104104
public static final String FLOWFILE_SWAP_MANAGER_IMPLEMENTATION = "nifi.swap.manager.implementation";
105105
public static final String QUEUE_SWAP_THRESHOLD = "nifi.queue.swap.threshold";
106106

107+
// bulk replay properties
// Property names and defaults governing the server-side Bulk Replay feature.

/** Number of worker threads available for executing bulk replay jobs; caps concurrent jobs. */
public static final String BULK_REPLAY_MAX_CONCURRENT = "nifi.bulk.replay.max.concurrent";
public static final int DEFAULT_BULK_REPLAY_MAX_CONCURRENT = 5;
/** Maximum number of job summaries retained in memory; oldest terminal jobs are evicted past this. */
public static final String BULK_REPLAY_MAX_JOBS = "nifi.bulk.replay.max.jobs";
public static final int DEFAULT_BULK_REPLAY_MAX_JOBS = 50;
/** NiFi duration string: how long a worker waits for a disconnected node to reconnect before failing its items. */
public static final String BULK_REPLAY_NODE_DISCONNECT_TIMEOUT = "nifi.bulk.replay.node.disconnect.timeout";
public static final String DEFAULT_BULK_REPLAY_NODE_DISCONNECT_TIMEOUT = "5 mins";
114+
107115
// provenance properties
108116
public static final String PROVENANCE_REPO_IMPLEMENTATION_CLASS = "nifi.provenance.repository.implementation";
109117
public static final String PROVENANCE_REPO_DIRECTORY_PREFIX = "nifi.provenance.repository.directory.";
@@ -503,6 +511,19 @@ public int getQueueSwapThreshold() {
503511
}
504512
}
505513

514+
/**
 * Returns the number of worker threads available for executing bulk replay jobs
 * ({@code nifi.bulk.replay.max.concurrent}), falling back to
 * {@link #DEFAULT_BULK_REPLAY_MAX_CONCURRENT} when the property is unset or blank.
 */
public int getBulkReplayMaxConcurrent() {
    return getIntegerProperty(BULK_REPLAY_MAX_CONCURRENT, DEFAULT_BULK_REPLAY_MAX_CONCURRENT);
}
517+
518+
/**
 * Returns the maximum number of bulk replay job summaries retained in memory
 * ({@code nifi.bulk.replay.max.jobs}), falling back to
 * {@link #DEFAULT_BULK_REPLAY_MAX_JOBS} when the property is unset or blank.
 */
public int getBulkReplayMaxJobs() {
    return getIntegerProperty(BULK_REPLAY_MAX_JOBS, DEFAULT_BULK_REPLAY_MAX_JOBS);
}
521+
522+
public String getBulkReplayNodeDisconnectTimeout() {
523+
final String value = getProperty(BULK_REPLAY_NODE_DISCONNECT_TIMEOUT);
524+
return (value == null || value.isBlank()) ? DEFAULT_BULK_REPLAY_NODE_DISCONNECT_TIMEOUT : value.trim();
525+
}
526+
506527
public Integer getIntegerProperty(final String propertyName, final Integer defaultValue) {
507528
final String value = getProperty(propertyName);
508529
if (value == null || value.isBlank()) {

nifi-docs/src/main/asciidoc/administration-guide.adoc

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3149,6 +3149,25 @@ may increase the rate at which the Provenance Repository is able to process thes
31493149
|`nifi.provenance.repository.buffer.size`|The Provenance Repository buffer size. The default value is `100000` provenance events.
31503150
|====
31513151

3152+
[[bulk_replay_properties]]
3153+
=== Bulk Replay Properties
3154+
3155+
The following properties control the server-side behavior of the Bulk Replay feature (see link:user-guide.html#bulk_replay[Bulk Replay] in the User Guide).
3156+
Bulk replay jobs are tracked in memory and are lost when the entire cluster is restarted. In a cluster, jobs execute on the primary node; if the primary node is lost, interrupted jobs are automatically resumed by the new primary.
3157+
3158+
|====
3159+
|*Property*|*Description*
3160+
|`nifi.bulk.replay.max.concurrent`|The number of worker threads available for executing bulk replay jobs.
3161+
Each thread handles one job at a time, so this value caps the number of jobs that can run concurrently.
3162+
Raising this value allows more jobs to execute in parallel but increases load on the node.
3163+
The default value is `5`.
3164+
|`nifi.bulk.replay.max.jobs`|The maximum number of job summaries retained in memory.
3165+
When this limit is exceeded, the oldest terminal (completed, partial success, failed, or cancelled) job is automatically evicted. Active jobs (queued, running, or interrupted) are never evicted.
3166+
The default value is `50`.
3167+
|`nifi.bulk.replay.node.disconnect.timeout`|When a replay item's content resides on a cluster node that is currently disconnected, the worker waits up to this duration for the node to reconnect before failing the item.
3168+
The value is a NiFi duration string (e.g., `5 mins`, `30 secs`).
3169+
The default value is `5 mins`.
3170+
|====
31523171

31533172
=== Status History Repository
31543173

42.4 KB
Loading
138 KB
Loading
47 KB
Loading
146 KB
Loading

nifi-docs/src/main/asciidoc/user-guide.adoc

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,7 @@ NOTE: For Processors, Ports, Remote Process Groups, Connections and Labels, it i
306306
- *Enable* or *Disable*: This option allows the user to enable or disable a Processor; the option will be either Enable or Disable, depending on the current state of the Processor.
307307
- *View data provenance*: This option displays the NiFi Data Provenance table, with information about data provenance events for the FlowFiles routed through that Processor (see <<data_provenance>>).
308308
- *Replay last event*: This option will replay the last Provenance event, effectively requeuing the last FlowFile that was processed by the Processor (see <<replay_flowfile>>).
309+
- *Bulk Replay*: This option opens a provenance search dialog scoped to the selected Processor, allowing the user to select multiple events and replay them all at once (see <<bulk_replay>>).
309310
- *View status history*: This option opens a graphical representation of the Processor's statistical information over time.
310311
- *View usage*: This option takes the user to the Processor's usage documentation.
311312
- *View connections->Upstream*: This option allows the user to see and "jump to" upstream connections that are coming into the Processor. This is particularly useful when processors connect into and out of other Process Groups.
@@ -2963,6 +2964,68 @@ item. From here, the user can choose to either replay the last event from just t
29632964

29642965
image:event-content.png["Event Content", width=700]
29652966

2967+
[[bulk_replay]]
2968+
=== Bulk Replay
2969+
2970+
Bulk Replay allows a user to replay multiple provenance events for a Processor in a single operation. This is useful when a configuration error or downstream failure has caused a batch of FlowFiles to be processed incorrectly, and those FlowFiles need to be requeued for reprocessing.
2971+
2972+
==== Starting a Bulk Replay
2973+
2974+
To start a Bulk Replay, right-click on a Processor and select *Bulk Replay* from the context menu.
2975+
2976+
image:bulkReplayProcessorMenu.png["Bulk Replay context menu option"]
2977+
2978+
This opens a provenance search dialog scoped to the selected Processor. Use the date range, FlowFile attribute, and other search filters to locate the events to be replayed, then select the desired events and confirm. You can also provide a job name to help identify the replay later; if left blank, the name defaults to the processor name and submission timestamp.
2979+
2980+
image:bulkReplaySelectWindow.png["Bulk Replay event selection dialog", width=900]
2981+
2982+
Once submitted, the replay job is created on the server and appears in the *Bulk Replay Status* page, accessible from the Global Menu.
2983+
2984+
==== Bulk Replay Status Page
2985+
2986+
The Bulk Replay Status page shows a table of all replay jobs known to the NiFi instance.
2987+
2988+
image:bulkReplayStatusWindow.png["Bulk Replay Status page", width=900]
2989+
2990+
For each job the table displays:
2991+
2992+
- *Job Name* — the name of the job (defaults to the processor name and timestamp if not specified).
2993+
- *Submitted* — the time the job was submitted.
2994+
- *Submitted By* — the user who submitted the job.
2995+
- *Processor Name* — the name of the Processor whose events are being replayed.
2996+
- *Processor Type* — the type of the Processor (e.g., `GenerateFlowFile`).
2997+
- *Status* — the current job status (`QUEUED`, `RUNNING`, `COMPLETED`, `PARTIAL_SUCCESS`, `CANCELLED`, `FAILED`, or `INTERRUPTED`).
2998+
- *Progress* — a progress bar shown while the job is running.
2999+
- *Total* — total number of events in the job.
3000+
- *Replayed* — number of events successfully replayed.
3001+
- *Failed* — number of events that could not be replayed.
3002+
3003+
The table supports filtering by job name, processor name, processor type, status, or the submitting user, and all columns are sortable. An auto-refresh toggle (default on, 5-second interval) keeps the counts and progress current while jobs are running.
3004+
3005+
The *View Details* button opens a dialog showing per-event results, including the event ID, FlowFile UUID, event time, replay status, and any failure reason. This dialog also supports filtering and auto-refresh. The *Go to Processor* button navigates directly to the Processor on the canvas.
3006+
3007+
image:bulkReplayViewDetails.png["Bulk Replay job detail dialog", width=900]
3008+
3009+
Running or queued jobs can be cancelled using the cancel button in the actions column. Completed jobs can be removed from the list using the eraser icon, which offers the option to clear successful jobs only, all finished jobs, or all jobs.
3010+
3011+
==== Cluster Behavior
3012+
3013+
In a clustered environment, bulk replay jobs execute on the primary node. When a job is submitted, it is replicated to all nodes so every node has a local copy, but only the primary node starts the worker thread. Requests for job listings, details, and item-level status are forwarded to the primary node. Cancel and delete operations are replicated to all nodes.
3014+
3015+
If the primary node is lost while a job is running, the worker detects the loss of primary role and sets the job status to `INTERRUPTED`. When the new primary node is elected, interrupted jobs are automatically detected and restarted. Because replay creates new FlowFiles, restarting a job from scratch is safe.
3016+
3017+
If a replay item's content resides on a cluster node that is currently disconnected, the worker waits for the node to reconnect (up to the timeout configured by `nifi.bulk.replay.node.disconnect.timeout`, default 5 minutes). If the node does not reconnect in time, items on that node are marked as failed. See the link:administration-guide.html#bulk_replay_properties[Bulk Replay Properties] section for details.
3018+
3019+
==== Concurrency
3020+
3021+
The number of jobs that can execute at the same time is controlled by the `nifi.bulk.replay.max.concurrent` property in `nifi.properties` (default 5). Each worker thread processes one job at a time, replaying items sequentially within the job. See the link:administration-guide.html#bulk_replay_properties[Bulk Replay Properties] section of the System Administrator's Guide for details.
3022+
3023+
==== Persistence
3024+
3025+
Bulk replay jobs are held in memory by default and are lost when NiFi is restarted. The maximum number of jobs retained is controlled by the `nifi.bulk.replay.max.jobs` property (default 50). When the limit is reached, the oldest terminal (completed, partial success, failed, or cancelled) job is automatically removed. Active jobs (queued, running, or interrupted) are never evicted.
3026+
3027+
If the entire cluster is restarted, any in-progress replay will not complete and pending items will not be dispatched. Completed job results should be noted before restarting if they are needed. However, if only the primary node is lost and a new primary is elected, interrupted jobs are automatically resumed (see Cluster Behavior above).
3028+
29663029
=== Viewing FlowFile Lineage
29673030

29683031
It is often useful to see a graphical representation of the lineage or path a FlowFile took within the dataflow. To see a FlowFile's lineage,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.nifi.web.api.dto.bulkreplay;
18+
19+
/**
 * Status of a single item within a bulk replay job.
 *
 * <p>Declaration order is significant ({@code ordinal()} / {@code values()});
 * do not reorder the constants.</p>
 */
public enum BulkReplayItemStatus {
    /** Item is waiting to be processed. */
    QUEUED,
    /** Item is currently being processed. */
    RUNNING,
    /** Item was replayed successfully. */
    SUCCEEDED,
    /** Item could not be replayed. */
    FAILED,
    /** Item was not replayed (skipped). */
    SKIPPED
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.nifi.web.api.dto.bulkreplay;
18+
19+
import io.swagger.v3.oas.annotations.media.Schema;
20+
import jakarta.xml.bind.annotation.XmlType;
21+
22+
import java.util.List;
23+
24+
/**
25+
* Submission body for a bulk replay job. Extends the summary with the list of items to replay.
26+
* The {@code items} field is populated by the client on POST and is absent in GET responses
27+
* (items are retrieved separately via {@code GET /bulk-replay/jobs/{id}/items}).
28+
*/
29+
@XmlType(name = "bulkReplayJobDetail")
30+
public class BulkReplayJobDetailDTO extends BulkReplayJobSummaryDTO {
31+
32+
private List<BulkReplayJobItemDTO> items;
33+
34+
@Schema(description = "Items to replay. Populated on submission; absent in responses.")
35+
public List<BulkReplayJobItemDTO> getItems() {
36+
return items;
37+
}
38+
39+
public void setItems(List<BulkReplayJobItemDTO> items) {
40+
this.items = items;
41+
}
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.nifi.web.api.dto.bulkreplay;
18+
19+
import io.swagger.v3.oas.annotations.media.Schema;
20+
import jakarta.xml.bind.annotation.XmlType;
21+
22+
/**
 * A single item in a bulk replay job. On submission, only the provenance-related fields are
 * required. Server-assigned fields (itemId, itemIndex, status, times) are populated in responses.
 *
 * <p>Getter-level {@code @Schema} annotations drive the generated OpenAPI documentation.
 * Time fields are transported as strings; the exact format is produced server-side —
 * NOTE(review): format not visible here, confirm against the serializer.</p>
 */
@XmlType(name = "bulkReplayJobItem")
public class BulkReplayJobItemDTO {

    // --- Server-assigned ---
    private String itemId;
    private Integer itemIndex;
    private BulkReplayItemStatus status;
    private String errorMessage;
    private String startTime;
    private String endTime;
    private String lastUpdated;

    // --- Submitted by client ---
    private Long provenanceEventId;
    private String clusterNodeId;
    private String flowFileUuid;
    private String eventType;
    private String eventTime;
    private String componentName;

    // Content size in bytes; consulted when the owning node is disconnected (see getter Schema).
    private Long fileSizeBytes;

    @Schema(description = "Server-assigned unique id of this item.")
    public String getItemId() {
        return itemId;
    }

    public void setItemId(String itemId) {
        this.itemId = itemId;
    }

    @Schema(description = "Zero-based index of this item within the job.")
    public Integer getItemIndex() {
        return itemIndex;
    }

    public void setItemIndex(Integer itemIndex) {
        this.itemIndex = itemIndex;
    }

    @Schema(description = "Current status of this item.")
    public BulkReplayItemStatus getStatus() {
        return status;
    }

    public void setStatus(BulkReplayItemStatus status) {
        this.status = status;
    }

    @Schema(description = "Error message if replay failed, null otherwise.")
    public String getErrorMessage() {
        return errorMessage;
    }

    public void setErrorMessage(String errorMessage) {
        this.errorMessage = errorMessage;
    }

    @Schema(description = "Time at which replay of this item began.", type = "string")
    public String getStartTime() {
        return startTime;
    }

    public void setStartTime(String startTime) {
        this.startTime = startTime;
    }

    @Schema(description = "Time at which replay of this item completed.", type = "string")
    public String getEndTime() {
        return endTime;
    }

    public void setEndTime(String endTime) {
        this.endTime = endTime;
    }

    @Schema(description = "Time at which this record was last updated.", type = "string")
    public String getLastUpdated() {
        return lastUpdated;
    }

    public void setLastUpdated(String lastUpdated) {
        this.lastUpdated = lastUpdated;
    }

    @Schema(description = "The numeric id of the provenance event to replay.")
    public Long getProvenanceEventId() {
        return provenanceEventId;
    }

    public void setProvenanceEventId(Long provenanceEventId) {
        this.provenanceEventId = provenanceEventId;
    }

    @Schema(description = "The cluster node id that owns the provenance event. Null in standalone mode.")
    public String getClusterNodeId() {
        return clusterNodeId;
    }

    public void setClusterNodeId(String clusterNodeId) {
        this.clusterNodeId = clusterNodeId;
    }

    @Schema(description = "The UUID of the FlowFile associated with this event.")
    public String getFlowFileUuid() {
        return flowFileUuid;
    }

    public void setFlowFileUuid(String flowFileUuid) {
        this.flowFileUuid = flowFileUuid;
    }

    @Schema(description = "The type of the provenance event.")
    public String getEventType() {
        return eventType;
    }

    public void setEventType(String eventType) {
        this.eventType = eventType;
    }

    @Schema(description = "The timestamp of the provenance event.")
    public String getEventTime() {
        return eventTime;
    }

    public void setEventTime(String eventTime) {
        this.eventTime = eventTime;
    }

    @Schema(description = "The name of the component that generated the provenance event.")
    public String getComponentName() {
        return componentName;
    }

    public void setComponentName(String componentName) {
        this.componentName = componentName;
    }

    @Schema(description = "The size of the FlowFile content in bytes. Used to determine if replay can proceed when a cluster node is disconnected.")
    public Long getFileSizeBytes() {
        return fileSizeBytes;
    }

    public void setFileSizeBytes(Long fileSizeBytes) {
        this.fileSizeBytes = fileSizeBytes;
    }
}

0 commit comments

Comments
 (0)