From bff24a7255f3da0bec618693254303fee2e267ee Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Sun, 1 Mar 2026 13:21:14 +0100 Subject: [PATCH] NIFI-15671 - Flow import/export with stateful components state Signed-off-by: Pierre Villard --- ...tandardVersionedComponentSynchronizer.java | 112 ++++ .../VersionedFlowSynchronizationContext.java | 37 ++ .../nifi/groups/StandardProcessGroup.java | 17 + .../flow/mapping/NiFiRegistryFlowMapper.java | 54 ++ .../flow/mapping/FlowMappingOptions.java | 64 +++ .../nifi/controller/FlowController.java | 6 +- .../apache/nifi/web/NiFiServiceFacade.java | 12 + .../nifi/web/StandardNiFiServiceFacade.java | 111 +++- .../nifi/web/api/ProcessGroupResource.java | 229 ++++++++- .../web/api/TestProcessGroupResource.java | 2 +- .../tests/system/StatefulCountProcessor.java | 70 +++ .../org.apache.nifi.processor.Processor | 1 + ...sterFlowDefinitionExportImportStateIT.java | 330 ++++++++++++ .../pg/FlowDefinitionExportImportStateIT.java | 485 ++++++++++++++++++ .../toolkit/client/ProcessGroupClient.java | 2 + .../client/impl/JerseyProcessGroupClient.java | 9 +- 16 files changed, 1516 insertions(+), 25 deletions(-) create mode 100644 nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/StatefulCountProcessor.java create mode 100644 nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/ClusterFlowDefinitionExportImportStateIT.java create mode 100644 nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/FlowDefinitionExportImportStateIT.java diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java index 4d9e6bbeaf97..a8d4a7bdf577 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java @@ -17,10 +17,15 @@ package org.apache.nifi.flow.synchronization; +import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.asset.Asset; import org.apache.nifi.asset.AssetManager; import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.Connection; @@ -57,6 +62,8 @@ import org.apache.nifi.flow.ParameterProviderReference; import org.apache.nifi.flow.VersionedAsset; import org.apache.nifi.flow.VersionedComponent; +import org.apache.nifi.flow.VersionedComponentState; +import org.apache.nifi.flow.VersionedConfigurableExtension; import org.apache.nifi.flow.VersionedConnection; import org.apache.nifi.flow.VersionedControllerService; import org.apache.nifi.flow.VersionedExternalFlow; @@ -64,6 +71,7 @@ import org.apache.nifi.flow.VersionedFlowCoordinates; import org.apache.nifi.flow.VersionedFunnel; import org.apache.nifi.flow.VersionedLabel; +import org.apache.nifi.flow.VersionedNodeState; import org.apache.nifi.flow.VersionedParameter; import org.apache.nifi.flow.VersionedParameterContext; import org.apache.nifi.flow.VersionedPort; @@ -125,6 +133,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.UncheckedIOException; import java.net.URL; import java.time.Duration; import java.util.ArrayList; @@ -327,6 +337,8 @@ public void synchronize(final ProcessGroup group, final VersionedExternalFlow ve createdAndModifiedExtensions.clear(); setSynchronizationOptions(options); + validateLocalStateTopology(versionedExternalFlow.getFlowContents()); + for (final FlowDifference diff : flowComparison.getDifferences()) { if (FlowDifferenceFilters.isPropertyMissingFromGhostComponent(diff, context.getFlowManager())) { continue; @@ -1411,6 +1423,8 @@ private ControllerServiceNode addControllerService(final ProcessGroup destinatio updateControllerService(newService, proposed, topLevelGroup); + restoreComponentState(newService.getIdentifier(), proposed.getComponentState(), newService); + return newService; } @@ -2708,6 +2722,8 @@ private ProcessorNode addProcessor(final ProcessGroup destination, final Version procNode.onConfigurationRestored(processContext); connectableAdditionTracker.addComponent(destination.getIdentifier(), proposed.getIdentifier(), procNode); + restoreComponentState(procNode.getIdentifier(), proposed.getComponentState(), procNode); + return procNode; } @@ -4039,6 +4055,102 @@ private Map getPropertyValues(final ComponentNode componentNode) return propertyValues; } + private void validateLocalStateTopology(final VersionedProcessGroup proposed) { + final int connectedNodeCount = context.getConnectedNodeCount(); + if (connectedNodeCount <= 0) { + return; + } + + final int maxSourceNodes = findMaxLocalStateNodeCount(proposed); + if (maxSourceNodes > connectedNodeCount) { + throw new IllegalStateException( + "Cannot import flow with component state: the flow definition contains local state from %d source node(s) but the destination cluster has only %d connected node(s). " + .formatted(maxSourceNodes, connectedNodeCount) + + "Import into a cluster with at least %d node(s), or export without component state.".formatted(maxSourceNodes)); + } + } + + private int findMaxLocalStateNodeCount(final VersionedProcessGroup group) { + int max = 0; + for (final VersionedConfigurableExtension ext : getStatefulExtensions(group)) { + final VersionedComponentState state = ext.getComponentState(); + if (state != null && state.getLocalNodeStates() != null) { + max = Math.max(max, state.getLocalNodeStates().size()); + } + } + if (group.getProcessGroups() != null) { + for (final VersionedProcessGroup child : group.getProcessGroups()) { + max = Math.max(max, findMaxLocalStateNodeCount(child)); + } + } + return max; + } + + private List getStatefulExtensions(final VersionedProcessGroup group) { + final List extensions = new ArrayList<>(); + if (group.getProcessors() != null) { + extensions.addAll(group.getProcessors()); + } + if (group.getControllerServices() != null) { + extensions.addAll(group.getControllerServices()); + } + return extensions; + } + + private void restoreComponentState(final String componentId, final VersionedComponentState componentState, final ComponentNode componentNode) { + if (componentState == null) { + return; + } + + final StateManagerProvider stateManagerProvider = context.getStateManagerProvider(); + if (stateManagerProvider == null) { + LOG.debug("StateManagerProvider is not available; skipping state restoration for component {}", componentId); + return; + } + + final ConfigurableComponent component = componentNode.getComponent(); + if (component == null) { + LOG.debug("Component {} is not available; skipping state restoration", componentId); + return; + } + + final Stateful stateful = component.getClass().getAnnotation(Stateful.class); + if (stateful == null) { + LOG.debug("Component {} ({}) is not annotated with @Stateful; skipping state restoration", componentId, component.getClass().getSimpleName()); + return; + } + + final Set supportedScopes = Set.of(stateful.scopes()); + final StateManager stateManager = stateManagerProvider.getStateManager(componentId); + + try { + if (supportedScopes.contains(Scope.CLUSTER) && componentState.getClusterState() != null && !componentState.getClusterState().isEmpty()) { + stateManager.setState(componentState.getClusterState(), Scope.CLUSTER); + LOG.debug("Restored cluster state for component {}", componentId); + } + + if (supportedScopes.contains(Scope.LOCAL) && componentState.getLocalNodeStates() != null && !componentState.getLocalNodeStates().isEmpty()) { + final int localNodeOrdinal = context.getLocalNodeOrdinal(); + if (localNodeOrdinal < 0) { + LOG.debug("Local node ordinal is not set; skipping local state restoration for component {}", componentId); + return; + } + + final List localNodeStates = componentState.getLocalNodeStates(); + final VersionedNodeState nodeState = localNodeOrdinal < localNodeStates.size() ? localNodeStates.get(localNodeOrdinal) : null; + final Map localState = nodeState != null ? nodeState.getState() : null; + if (localState != null && !localState.isEmpty()) { + stateManager.setState(localState, Scope.LOCAL); + LOG.debug("Restored local state for component {} from node ordinal {}", componentId, localNodeOrdinal); + } else { + LOG.debug("No local state found for component {} at node ordinal {}", componentId, localNodeOrdinal); + } + } + } catch (final IOException e) { + throw new UncheckedIOException("Failed to restore state for component %s".formatted(componentId), e); + } + } + private record CreatedOrModifiedExtension(ComponentNode extension, Map propertyValues) { } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/VersionedFlowSynchronizationContext.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/VersionedFlowSynchronizationContext.java index f0b2183b3609..0769e3049935 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/VersionedFlowSynchronizationContext.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/VersionedFlowSynchronizationContext.java @@ -18,6 +18,7 @@ package org.apache.nifi.flow.synchronization; import org.apache.nifi.asset.AssetManager; +import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.controller.ComponentNode; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ProcessorNode; @@ -45,6 +46,9 @@ public class VersionedFlowSynchronizationContext { private final Function processContextFactory; private final Function configurationContextFactory; private final AssetManager assetManager; + private final StateManagerProvider stateManagerProvider; + private final int localNodeOrdinal; + private final int connectedNodeCount; private VersionedFlowSynchronizationContext(final Builder builder) { this.componentIdGenerator = builder.componentIdGenerator; @@ -57,6 +61,9 @@ private VersionedFlowSynchronizationContext(final Builder builder) { this.processContextFactory = builder.processContextFactory; this.configurationContextFactory = builder.configurationContextFactory; this.assetManager = builder.assetManager; + this.stateManagerProvider = builder.stateManagerProvider; + this.localNodeOrdinal = builder.localNodeOrdinal; + this.connectedNodeCount = builder.connectedNodeCount; } public ComponentIdGenerator getComponentIdGenerator() { @@ -99,6 +106,18 @@ public AssetManager getAssetManager() { return assetManager; } + public StateManagerProvider getStateManagerProvider() { + return stateManagerProvider; + } + + public int getLocalNodeOrdinal() { + return localNodeOrdinal; + } + + public int getConnectedNodeCount() { + return connectedNodeCount; + } + public static class Builder { private ComponentIdGenerator componentIdGenerator; private FlowManager flowManager; @@ -110,6 +129,9 @@ public static class Builder { private Function processContextFactory; private Function configurationContextFactory; private AssetManager assetManager; + private StateManagerProvider stateManagerProvider; + private int localNodeOrdinal = -1; + private int connectedNodeCount = -1; public Builder componentIdGenerator(final ComponentIdGenerator componentIdGenerator) { this.componentIdGenerator = componentIdGenerator; @@ -161,6 +183,21 @@ public Builder assetManager(final AssetManager assetManager) { return this; } + public Builder stateManagerProvider(final StateManagerProvider stateManagerProvider) { + this.stateManagerProvider = stateManagerProvider; + return this; + } + + public Builder localNodeOrdinal(final int localNodeOrdinal) { + this.localNodeOrdinal = localNodeOrdinal; + return this; + } + + public Builder connectedNodeCount(final int connectedNodeCount) { + this.connectedNodeCount = connectedNodeCount; + return this; + } + public VersionedFlowSynchronizationContext build() { requireNonNull(componentIdGenerator, "Component ID Generator must be set"); requireNonNull(flowManager, "Flow Manager must be set"); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 1e6deb27ae08..8968be4ad19c 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -4043,6 +4043,20 @@ public void verifyCanUpdate(final VersionedExternalFlow updatedFlow, final boole private VersionedFlowSynchronizationContext createGroupSynchronizationContext(final ComponentIdGenerator componentIdGenerator, final ComponentScheduler componentScheduler, final FlowMappingOptions flowMappingOptions) { + final int localNodeOrdinal; + final int connectedNodeCount; + if (nodeTypeProvider.isClustered()) { + final Set members = nodeTypeProvider.getClusterMembers(); + final List sortedMembers = members.stream().sorted().toList(); + final String currentNode = nodeTypeProvider.getCurrentNode().orElse(null); + final int idx = currentNode != null ? sortedMembers.indexOf(currentNode) : -1; + localNodeOrdinal = idx >= 0 ? idx : 0; + connectedNodeCount = Math.max(sortedMembers.size(), 1); + } else { + localNodeOrdinal = 0; + connectedNodeCount = 1; + } + return new VersionedFlowSynchronizationContext.Builder() .componentIdGenerator(componentIdGenerator) .flowManager(flowManager) @@ -4054,6 +4068,9 @@ private VersionedFlowSynchronizationContext createGroupSynchronizationContext(fi .processContextFactory(this::createProcessContext) .configurationContextFactory(this::createConfigurationContext) .assetManager(assetManager) + .stateManagerProvider(stateManagerProvider) + .localNodeOrdinal(localNodeOrdinal) + .connectedNodeCount(connectedNodeCount) .build(); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java index 9f847a24649a..ed2994c80bdb 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java @@ -18,12 +18,17 @@ package org.apache.nifi.registry.flow.mapping; import org.apache.commons.lang3.ClassUtils; +import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.asset.Asset; import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.listen.ListenPortDefinition; import org.apache.nifi.components.resource.ResourceCardinality; import org.apache.nifi.components.resource.ResourceDefinition; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Funnel; @@ -53,6 +58,7 @@ import org.apache.nifi.flow.PortType; import org.apache.nifi.flow.Position; import org.apache.nifi.flow.VersionedAsset; +import org.apache.nifi.flow.VersionedComponentState; import org.apache.nifi.flow.VersionedConnection; import org.apache.nifi.flow.VersionedControllerService; import org.apache.nifi.flow.VersionedFlowAnalysisRule; @@ -61,6 +67,7 @@ import org.apache.nifi.flow.VersionedFunnel; import org.apache.nifi.flow.VersionedLabel; import org.apache.nifi.flow.VersionedListenPortDefinition; +import org.apache.nifi.flow.VersionedNodeState; import org.apache.nifi.flow.VersionedParameter; import org.apache.nifi.flow.VersionedParameterContext; import org.apache.nifi.flow.VersionedParameterProvider; @@ -92,6 +99,8 @@ import org.apache.nifi.remote.PublicPort; import org.apache.nifi.remote.RemoteGroupPort; +import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; @@ -505,10 +514,54 @@ public VersionedControllerService mapControllerService(final ControllerServiceNo versionedService.setPropertyDescriptors(mapPropertyDescriptors(controllerService, serviceProvider, includedGroupIds, externalControllerServiceReferences)); versionedService.setType(controllerService.getCanonicalClassName()); versionedService.setScheduledState(flowMappingOptions.getStateLookup().getState(controllerService)); + versionedService.setComponentState(mapComponentState(controllerService)); return versionedService; } + private VersionedComponentState mapComponentState(final ComponentNode componentNode) { + if (!flowMappingOptions.isMapComponentState()) { + return null; + } + + final ConfigurableComponent component = componentNode.getComponent(); + if (component == null) { + return null; + } + + final Stateful stateful = component.getClass().getAnnotation(Stateful.class); + if (stateful == null) { + return null; + } + + final String componentId = componentNode.getIdentifier(); + final StateManager stateManager = flowMappingOptions.getStateManagerProvider().getStateManager(componentId); + final VersionedComponentState result = new VersionedComponentState(); + boolean hasState = false; + + try { + for (final Scope scope : stateful.scopes()) { + final StateMap stateMap = stateManager.getState(scope); + if (stateMap != null && !stateMap.toMap().isEmpty()) { + if (scope == Scope.CLUSTER) { + result.setClusterState(stateMap.toMap()); + hasState = true; + } else if (scope == Scope.LOCAL) { + final int ordinal = flowMappingOptions.getLocalNodeOrdinal(); + final List localStates = new ArrayList<>(Collections.nCopies(ordinal + 1, null)); + localStates.set(ordinal, new VersionedNodeState(stateMap.toMap())); + result.setLocalNodeStates(localStates); + hasState = true; + } + } + } + } catch (final IOException e) { + throw new UncheckedIOException("Failed to retrieve state for component %s".formatted(componentId), e); + } + + return hasState ? result : null; + } + private Map mapProperties(final ComponentNode component, final ControllerServiceProvider serviceProvider) { final Map mapped = new HashMap<>(); @@ -774,6 +827,7 @@ public VersionedProcessor mapProcessor(final ProcessorNode procNode, final Contr processor.setRetriedRelationships(procNode.getRetriedRelationships()); processor.setBackoffMechanism(procNode.getBackoffMechanism().name()); processor.setMaxBackoffPeriod(procNode.getMaxBackoffPeriod()); + processor.setComponentState(mapComponentState(procNode)); return processor; } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/mapping/FlowMappingOptions.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/mapping/FlowMappingOptions.java index d6a96eb32e2b..eccf808188d2 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/mapping/FlowMappingOptions.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/mapping/FlowMappingOptions.java @@ -17,6 +17,8 @@ package org.apache.nifi.registry.flow.mapping; +import org.apache.nifi.components.state.StateManagerProvider; + import static java.util.Objects.requireNonNull; public class FlowMappingOptions { @@ -29,6 +31,9 @@ public class FlowMappingOptions { private final boolean mapControllerServiceReferencesToVersionedId; private final boolean mapFlowRegistryClientId; private final boolean mapAssetReferences; + private final boolean mapComponentState; + private final StateManagerProvider stateManagerProvider; + private final int localNodeOrdinal; private FlowMappingOptions(final Builder builder) { encryptor = builder.encryptor; @@ -40,6 +45,9 @@ private FlowMappingOptions(final Builder builder) { mapControllerServiceReferencesToVersionedId = builder.mapControllerServiceReferencesToVersionedId; mapFlowRegistryClientId = builder.mapFlowRegistryClientId; mapAssetReferences = builder.mapAssetReferences; + mapComponentState = builder.mapComponentState; + stateManagerProvider = builder.stateManagerProvider; + localNodeOrdinal = builder.localNodeOrdinal; } public SensitiveValueEncryptor getSensitiveValueEncryptor() { @@ -78,6 +86,18 @@ public boolean isMapAssetReferences() { return mapAssetReferences; } + public boolean isMapComponentState() { + return mapComponentState; + } + + public StateManagerProvider getStateManagerProvider() { + return stateManagerProvider; + } + + public int getLocalNodeOrdinal() { + return localNodeOrdinal; + } + public static class Builder { private SensitiveValueEncryptor encryptor; private VersionedComponentStateLookup stateLookup; @@ -88,6 +108,9 @@ public static class Builder { private boolean mapControllerServiceReferencesToVersionedId = true; private boolean mapFlowRegistryClientId = false; private boolean mapAssetReferences = false; + private boolean mapComponentState = false; + private StateManagerProvider stateManagerProvider; + private int localNodeOrdinal = 0; /** * Sets the SensitiveValueEncryptor to use for encrypting sensitive values. This value must be set @@ -190,6 +213,42 @@ public Builder mapAssetReferences(final boolean mapAssetReferences) { return this; } + /** + * Sets whether or not the component state should be mapped to the Versioned Component during export. + * If true, the {@link #stateManagerProvider(StateManagerProvider)} must be set. + * + * @param mapComponentState whether or not component state should be mapped + * @return the builder + */ + public Builder mapComponentState(final boolean mapComponentState) { + this.mapComponentState = mapComponentState; + return this; + } + + /** + * Sets the StateManagerProvider to use for retrieving component state. This value must be set + * if {@link #mapComponentState(boolean) mapComponentState} is set to true. + * + * @param stateManagerProvider the StateManagerProvider to use + * @return the builder + */ + public Builder stateManagerProvider(final StateManagerProvider stateManagerProvider) { + this.stateManagerProvider = stateManagerProvider; + return this; + } + + /** + * Sets the ordinal index of the local node within the cluster. In standalone mode, this defaults to 0. + * Used during export to key local-scoped state entries. + * + * @param localNodeOrdinal the ordinal index of the local node + * @return the builder + */ + public Builder localNodeOrdinal(final int localNodeOrdinal) { + this.localNodeOrdinal = localNodeOrdinal; + return this; + } + /** * Creates a FlowMappingOptions object, or throws an Exception if not all required configuration has been provided * @@ -206,6 +265,10 @@ public FlowMappingOptions build() { requireNonNull(encryptor, "Property Encryptor must be set when sensitive configuration is to be mapped"); } + if (mapComponentState) { + requireNonNull(stateManagerProvider, "State Manager Provider must be set when component state is to be mapped"); + } + return new FlowMappingOptions(this); } } @@ -224,6 +287,7 @@ public FlowMappingOptions build() { .mapControllerServiceReferencesToVersionedId(true) .mapFlowRegistryClientId(false) .mapAssetReferences(false) + .mapComponentState(false) .build(); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index cdbbc6e2c9be..a7e902f26154 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -2669,7 +2669,9 @@ public boolean isClustered() { @Override public Set getClusterMembers() { if (isClustered()) { - return clusterCoordinator.getConnectionStatuses().stream().map(s -> s.getNodeIdentifier().getApiAddress()).collect(Collectors.toSet()); + return clusterCoordinator.getConnectionStatuses().stream() + .map(s -> "%s:%d".formatted(s.getNodeIdentifier().getApiAddress(), s.getNodeIdentifier().getApiPort())) + .collect(Collectors.toSet()); } else { return Collections.emptySet(); } @@ -2678,7 +2680,7 @@ public Set getClusterMembers() { @Override public Optional getCurrentNode() { if (isClustered() && getNodeId() != null) { - return Optional.of(getNodeId().getApiAddress()); + return Optional.of("%s:%d".formatted(getNodeId().getApiAddress(), getNodeId().getApiPort())); } else { return Optional.empty(); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index 3b5babdc0692..284718e002b0 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -1722,6 +1722,18 @@ VersionControlInformationEntity setVersionControlInformation(Revision processGro */ RegisteredFlowSnapshot getCurrentFlowSnapshotByGroupIdWithReferencedControllerServices(String processGroupId); + /** + * Get the current state of the Process Group with the given ID, converted to a Versioned Flow Snapshot for download. + * Optionally includes referenced controller services from parent groups and component state. + * + * @param processGroupId the ID of the Process Group + * @param includeReferencedServices whether to include referenced controller services from parent groups + * @param includeComponentState whether to include component state in the export. When true, all processors must be stopped + * and all controller services must be disabled. + * @return the current Process Group converted to a Versioned Flow Snapshot for download + */ + RegisteredFlowSnapshot getCurrentFlowSnapshotByGroupId(String processGroupId, boolean includeReferencedServices, boolean includeComponentState); + /** * Returns the name of the Flow Registry that is registered with the given ID. If no Flow Registry exists with the given ID, will return * the ID itself as the name diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 940310d3e8c0..ab20ace4029c 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -77,6 +77,7 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.components.state.StateMap; import org.apache.nifi.components.validation.ValidationState; import org.apache.nifi.connectable.Connectable; @@ -496,6 +497,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { private FlowRegistryDAO flowRegistryDAO; private ParameterContextDAO parameterContextDAO; private ClusterCoordinator clusterCoordinator; + private StateManagerProvider stateManagerProvider; private HeartbeatMonitor heartbeatMonitor; private LeaderElectionManager leaderElectionManager; @@ -5404,6 +5406,51 @@ public RegisteredFlowSnapshot getCurrentFlowSnapshotByGroupIdWithReferencedContr return getCurrentFlowSnapshotByGroupId(processGroupId, true); } + @Override + public RegisteredFlowSnapshot getCurrentFlowSnapshotByGroupId(final String processGroupId, final boolean includeReferencedServices, final boolean includeComponentState) { + if (!includeComponentState) { + return getCurrentFlowSnapshotByGroupId(processGroupId, includeReferencedServices); + } + + final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId); + + // Validate all processors are stopped and all controller services are disabled + final List runningProcessors = processGroup.findAllProcessors().stream() + .filter(p -> p.getPhysicalScheduledState() != ScheduledState.STOPPED && p.getPhysicalScheduledState() != ScheduledState.DISABLED) + .toList(); + if (!runningProcessors.isEmpty()) { + throw new IllegalStateException("Cannot export component state because %d processor(s) are not stopped: %s".formatted( + runningProcessors.size(), + runningProcessors.stream().map(p -> "%s [%s]".formatted(p.getName(), p.getIdentifier())).limit(5).collect(Collectors.joining(", ")))); + } + + final List enabledServices = processGroup.findAllControllerServices().stream() + .filter(s -> s.getState() != ControllerServiceState.DISABLED) + .toList(); + if (!enabledServices.isEmpty()) { + throw new IllegalStateException("Cannot export component state because %d controller service(s) are not disabled: %s".formatted( + enabledServices.size(), + enabledServices.stream().map(s -> "%s [%s]".formatted(s.getName(), s.getIdentifier())).limit(5).collect(Collectors.joining(", ")))); + } + + final FlowMappingOptions mappingOptions = new FlowMappingOptions.Builder() + .sensitiveValueEncryptor(null) + .stateLookup(VersionedComponentStateLookup.ENABLED_OR_DISABLED) + .componentIdLookup(ComponentIdLookup.VERSIONED_OR_GENERATE) + .mapPropertyDescriptors(true) + .mapSensitiveConfiguration(false) + .mapInstanceIdentifiers(false) + .mapControllerServiceReferencesToVersionedId(true) + .mapFlowRegistryClientId(false) + .mapAssetReferences(false) + .mapComponentState(includeComponentState) + .stateManagerProvider(stateManagerProvider) + .localNodeOrdinal(computeLocalNodeOrdinal()) + .build(); + + return buildFlowSnapshot(processGroup, processGroupId, includeReferencedServices, mappingOptions); + } + private Set getAllSubGroups(ProcessGroup processGroup) { final Set result = processGroup.findAllProcessGroups().stream() .map(ProcessGroup::getIdentifier) @@ -5412,24 +5459,56 @@ private Set getAllSubGroups(ProcessGroup processGroup) { return result; } + /** + * Computes the ordinal index of the local node among connected cluster nodes. + * Nodes are sorted deterministically by apiAddress. In standalone mode, returns 0. + * + * @return the ordinal index of the local node, or 0 if not clustered + */ + private int computeLocalNodeOrdinal() { + if (clusterCoordinator == null) { + return 0; + } + + final NodeIdentifier localNodeId = clusterCoordinator.getLocalNodeIdentifier(); + if (localNodeId == null) { + return 0; + } + + final List connectedNodes = clusterCoordinator.getNodeIdentifiers(NodeConnectionState.CONNECTED).stream() + .sorted(Comparator.comparing((NodeIdentifier n) -> n.getApiAddress()).thenComparingInt(NodeIdentifier::getApiPort)) + .toList(); + + for (int i = 0; i < connectedNodes.size(); i++) { + if (connectedNodes.get(i).equals(localNodeId)) { + return i; + } + } + + return 0; + } + private RegisteredFlowSnapshot getCurrentFlowSnapshotByGroupId(final String processGroupId, final boolean includeReferencedControllerServices) { final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId); + return buildFlowSnapshot(processGroup, processGroupId, includeReferencedControllerServices, null); + } + + private RegisteredFlowSnapshot buildFlowSnapshot(final ProcessGroup processGroup, final String processGroupId, + final boolean includeReferencedControllerServices, final FlowMappingOptions customMappingOptions) { + final NiFiRegistryFlowMapper mapper = customMappingOptions != null + ? makeNiFiRegistryFlowMapper(controllerFacade.getExtensionManager(), customMappingOptions) + : makeNiFiRegistryFlowMapper(controllerFacade.getExtensionManager()); - // Create a complete (include descendant flows) VersionedProcessGroup snapshot of the flow as it is - // currently without any registry related fields populated, even if the flow is currently versioned. - final NiFiRegistryFlowMapper mapper = makeNiFiRegistryFlowMapper(controllerFacade.getExtensionManager()); final InstantiatedVersionedProcessGroup nonVersionedProcessGroup = mapper.mapNonVersionedProcessGroup(processGroup, controllerFacade.getControllerServiceProvider()); final Map parameterProviderReferences = new HashMap<>(); - - // Create a complete (include descendant flows) map of parameter contexts final Map parameterContexts = mapper.mapParameterContexts(processGroup, true, parameterProviderReferences); final Map externalControllerServiceReferences = Optional.ofNullable(nonVersionedProcessGroup.getExternalControllerServiceReferences()).orElse(Collections.emptyMap()); final Set controllerServices = new HashSet<>(nonVersionedProcessGroup.getControllerServices()); - final RegisteredFlowSnapshot nonVersionedFlowSnapshot = new RegisteredFlowSnapshot(); + final RegisteredFlowSnapshot flowSnapshot = new RegisteredFlowSnapshot(); ProcessGroup parentGroup = processGroup.getParent(); @@ -5445,24 +5524,25 @@ private RegisteredFlowSnapshot getCurrentFlowSnapshotByGroupId(final String proc if (externalControllerServiceReferences.keySet().contains(versionedControllerService.getIdentifier())) { versionedControllerService.setGroupIdentifier(processGroupId); + versionedControllerService.setComponentState(null); externalServices.add(versionedControllerService); } } } while ((parentGroup = parentGroup.getParent()) != null); controllerServices.addAll(externalServices); - nonVersionedFlowSnapshot.setExternalControllerServices(new HashMap<>()); + flowSnapshot.setExternalControllerServices(new HashMap<>()); } else { - nonVersionedFlowSnapshot.setExternalControllerServices(externalControllerServiceReferences); + flowSnapshot.setExternalControllerServices(externalControllerServiceReferences); } nonVersionedProcessGroup.setControllerServices(controllerServices); - nonVersionedFlowSnapshot.setFlowContents(nonVersionedProcessGroup); - nonVersionedFlowSnapshot.setParameterProviders(parameterProviderReferences); - nonVersionedFlowSnapshot.setParameterContexts(parameterContexts); - nonVersionedFlowSnapshot.setFlowEncodingVersion(FlowRegistryUtil.FLOW_ENCODING_VERSION); + flowSnapshot.setFlowContents(nonVersionedProcessGroup); + flowSnapshot.setParameterProviders(parameterProviderReferences); + flowSnapshot.setParameterContexts(parameterContexts); + flowSnapshot.setFlowEncodingVersion(FlowRegistryUtil.FLOW_ENCODING_VERSION); - return nonVersionedFlowSnapshot; + return flowSnapshot; } @Override @@ -7399,6 +7479,11 @@ public void setClusterCoordinator(final ClusterCoordinator coordinator) { this.clusterCoordinator = coordinator; } + @Autowired + public void setStateManagerProvider(final StateManagerProvider stateManagerProvider) { + this.stateManagerProvider = stateManagerProvider; + } + @Autowired(required = false) public void setHeartbeatMonitor(final HeartbeatMonitor heartbeatMonitor) { this.heartbeatMonitor = heartbeatMonitor; diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java index 6d55f9d0651a..da181318b419 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java @@ -57,15 +57,21 @@ import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserUtils; +import org.apache.nifi.cluster.coordination.http.replication.AsyncClusterResponse; import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.flow.ConnectableComponent; import org.apache.nifi.flow.ExecutionEngine; import org.apache.nifi.flow.VersionedComponent; +import org.apache.nifi.flow.VersionedComponentState; +import org.apache.nifi.flow.VersionedConfigurableExtension; +import org.apache.nifi.flow.VersionedControllerService; import org.apache.nifi.flow.VersionedFlowCoordinates; +import org.apache.nifi.flow.VersionedNodeState; import org.apache.nifi.flow.VersionedParameterContext; import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.flow.VersionedProcessor; import org.apache.nifi.flow.VersionedPropertyDescriptor; import org.apache.nifi.groups.VersionedComponentAdditions; import org.apache.nifi.parameter.ParameterContext; @@ -138,6 +144,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.URI; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -294,7 +301,8 @@ public Response getProcessGroup( @ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.") }, security = { - @SecurityRequirement(name = "Read - /process-groups/{uuid}") + @SecurityRequirement(name = "Read - /process-groups/{uuid}"), + @SecurityRequirement(name = "Write - /process-groups/{uuid} - Only required when includeComponentState is true") } ) public Response exportProcessGroup( @@ -305,19 +313,35 @@ public Response exportProcessGroup( @PathParam("id") final String groupId, @Parameter(description = "If referenced services from outside the target group should be included") @QueryParam("includeReferencedServices") - @DefaultValue("false") boolean includeReferencedServices) { - // authorize access + @DefaultValue("false") final boolean includeReferencedServices, + @Parameter(description = "If component state should be included in the exported flow definition. " + + "Requires all processors to be stopped and all controller services to be disabled.") + @QueryParam("includeComponentState") + @DefaultValue("false") final boolean includeComponentState) { + + // When exporting with component state in a cluster, replicate to all nodes so that each node + // contributes its LOCAL state. The coordinator merges the localNodeStates maps from all responses. + if (includeComponentState && isReplicateRequest()) { + return replicateAndMergeFlowExport(); + } + + // authorize access — exporting with component state requires WRITE (state access requires write permission) + final RequestAction requiredAction = includeComponentState ? RequestAction.WRITE : RequestAction.READ; serviceFacade.authorizeAccess(lookup -> { - // ensure access to process groups (nested), encapsulated controller services and referenced parameter contexts final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId); - authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true, + authorizeProcessGroup(groupAuthorizable, authorizer, lookup, requiredAction, true, false, false, false, true); }); // get the versioned flow - final RegisteredFlowSnapshot currentVersionedFlowSnapshot = includeReferencedServices - ? serviceFacade.getCurrentFlowSnapshotByGroupIdWithReferencedControllerServices(groupId) - : serviceFacade.getCurrentFlowSnapshotByGroupId(groupId); + final RegisteredFlowSnapshot currentVersionedFlowSnapshot; + if (includeComponentState) { + currentVersionedFlowSnapshot = serviceFacade.getCurrentFlowSnapshotByGroupId(groupId, includeReferencedServices, true); + } else if (includeReferencedServices) { + currentVersionedFlowSnapshot = serviceFacade.getCurrentFlowSnapshotByGroupIdWithReferencedControllerServices(groupId); + } else { + currentVersionedFlowSnapshot = serviceFacade.getCurrentFlowSnapshotByGroupId(groupId); + } // determine the name of the attachment - possible issues with spaces in file names final VersionedProcessGroup currentVersionedProcessGroup = currentVersionedFlowSnapshot.getFlowContents(); @@ -327,6 +351,160 @@ public Response exportProcessGroup( return generateOkResponse(currentVersionedFlowSnapshot).header(HttpHeaders.CONTENT_DISPOSITION, String.format("attachment; filename=\"%s\"", filename)).build(); } + /** + * Replicates the flow export request to all cluster nodes and merges the LOCAL state from each node + * into a single response. Each node returns a RegisteredFlowSnapshot containing only its own LOCAL + * state (keyed by its ordinal). This method collects all node responses and combines the localNodeStates + * maps so the exported flow contains LOCAL state from every node. + * + * @return the merged response containing LOCAL state from all nodes + */ + private Response replicateAndMergeFlowExport() { + final URI path = getAbsolutePath(); + final Map headers = getHeaders(); + + try { + final AsyncClusterResponse asyncResponse; + if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { + asyncResponse = getRequestReplicator().replicate(HttpMethod.GET, path, getRequestParameters(), headers); + } else { + // Not the coordinator — forward to coordinator which will handle the replicate-and-merge + return getRequestReplicator().forwardToCoordinator( + getClusterCoordinatorNode(), HttpMethod.GET, path, getRequestParameters(), headers + ).awaitMergedResponse().getResponse(); + } + + // Wait for all nodes to respond + asyncResponse.awaitMergedResponse(); + final Set nodeResponses = asyncResponse.getCompletedNodeResponses(); + + // Check for errors — if any node returned a non-2xx response, return that error + for (final NodeResponse nodeResponse : nodeResponses) { + if (!nodeResponse.is2xx()) { + return nodeResponse.getResponse(); + } + } + + // Deserialize each node's RegisteredFlowSnapshot and merge localNodeStates + RegisteredFlowSnapshot mergedSnapshot = null; + for (final NodeResponse nodeResponse : nodeResponses) { + final RegisteredFlowSnapshot nodeSnapshot = nodeResponse.getClientResponse().readEntity(RegisteredFlowSnapshot.class); + if (mergedSnapshot == null) { + mergedSnapshot = nodeSnapshot; + } else { + mergeLocalNodeStates(mergedSnapshot.getFlowContents(), nodeSnapshot.getFlowContents()); + } + } + + if (mergedSnapshot == null) { + return Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .entity("No responses received from cluster nodes") + .type(MediaType.TEXT_PLAIN) + .build(); + } + + final String flowName = mergedSnapshot.getFlowContents().getName(); + final String filename = flowName.replaceAll("\\s", "_") + ".json"; + + return generateOkResponse(mergedSnapshot) + .header(HttpHeaders.CONTENT_DISPOSITION, String.format("attachment; filename=\"%s\"", filename)) + .build(); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + return Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .entity("Request was interrupted while waiting for cluster responses") + .type(MediaType.TEXT_PLAIN) + .build(); + } + } + + /** + * Recursively merges localNodeStates from a source process group into a target process group. + * For each stateful component (processor or controller service), the LOCAL state entries from + * the source are added to the target's localNodeStates list. Cluster state is identical across + * nodes and is already present in the target. + * + * @param target the process group to merge into (from the first/client node response) + * @param source the process group to merge from (from another node's response) + */ + private void mergeLocalNodeStates(final VersionedProcessGroup target, final VersionedProcessGroup source) { + // Merge processor states + if (target.getProcessors() != null && source.getProcessors() != null) { + final Map sourceProcessors = new HashMap<>(); + for (final VersionedProcessor sp : source.getProcessors()) { + sourceProcessors.put(sp.getIdentifier(), sp); + } + for (final VersionedProcessor tp : target.getProcessors()) { + mergeComponentState(tp, sourceProcessors.get(tp.getIdentifier())); + } + } + + // Merge controller service states + if (target.getControllerServices() != null && source.getControllerServices() != null) { + final Map sourceServices = new HashMap<>(); + for (final VersionedControllerService ss : source.getControllerServices()) { + sourceServices.put(ss.getIdentifier(), ss); + } + for (final VersionedControllerService ts : target.getControllerServices()) { + mergeComponentState(ts, sourceServices.get(ts.getIdentifier())); + } + } + + // Recurse into child process groups + if (target.getProcessGroups() != null && source.getProcessGroups() != null) { + final Map sourceGroups = new HashMap<>(); + for (final VersionedProcessGroup sg : source.getProcessGroups()) { + sourceGroups.put(sg.getIdentifier(), sg); + } + for (final VersionedProcessGroup tg : target.getProcessGroups()) { + final VersionedProcessGroup sg = sourceGroups.get(tg.getIdentifier()); + if (sg != null) { + mergeLocalNodeStates(tg, sg); + } + } + } + } + + /** + * Merges the localNodeStates from a source component into a target component's VersionedComponentState. + * Each node contributes its own ordinal entry to localNodeStates — this method adds the source's entries + * to the target's list at the corresponding index positions. + * + * @param target the target component (already contains state from first node) + * @param source the source component (contains state from another node), may be null + */ + private void mergeComponentState(final VersionedConfigurableExtension target, final VersionedConfigurableExtension source) { + if (source == null) { + return; + } + + final VersionedComponentState sourceState = source.getComponentState(); + if (sourceState == null || sourceState.getLocalNodeStates() == null) { + return; + } + + VersionedComponentState targetState = target.getComponentState(); + if (targetState == null) { + targetState = new VersionedComponentState(); + target.setComponentState(targetState); + } + + final List sourceList = sourceState.getLocalNodeStates(); + if (targetState.getLocalNodeStates() == null) { + targetState.setLocalNodeStates(new ArrayList<>(sourceList)); + } else { + final List targetList = targetState.getLocalNodeStates(); + while (targetList.size() < sourceList.size()) { + targetList.add(null); + } + for (int i = 0; i < sourceList.size(); i++) { + if (sourceList.get(i) != null) { + targetList.set(i, sourceList.get(i)); + } + } + } + } + /** * Generates a copy response for the given copy request. * @@ -2600,6 +2778,11 @@ public Response initiateReplaceProcessGroup(@Parameter(description = "The proces throw new IllegalArgumentException("Versioned Flow Snapshot must be supplied"); } + if (containsComponentState(versionedFlowSnapshot.getFlowContents())) { + throw new IllegalArgumentException("Cannot replace an existing Process Group with a flow definition that contains component state. " + + "Component state can only be restored when uploading a flow definition as a new Process Group."); + } + // remove any registry-specific versioning content which could be present if the flow was exported from registry versionedFlowSnapshot.setFlow(null); versionedFlowSnapshot.setBucket(null); @@ -2624,6 +2807,31 @@ private void sanitizeRegistryInfo(final VersionedProcessGroup versionedProcessGr } } + private boolean containsComponentState(final VersionedProcessGroup group) { + if (group.getProcessors() != null) { + for (final VersionedProcessor processor : group.getProcessors()) { + if (processor.getComponentState() != null) { + return true; + } + } + } + if (group.getControllerServices() != null) { + for (final VersionedControllerService service : group.getControllerServices()) { + if (service.getComponentState() != null) { + return true; + } + } + } + if (group.getProcessGroups() != null) { + for (final VersionedProcessGroup child : group.getProcessGroups()) { + if (containsComponentState(child)) { + return true; + } + } + } + return false; + } + /** * Uploads the specified versioned flow definition and adds it to a new process group. * @@ -3264,6 +3472,11 @@ public Response replaceProcessGroup(@Parameter(description = "The process group throw new IllegalArgumentException("Versioned Flow Snapshot must be supplied."); } + if (containsComponentState(requestFlowSnapshot.getFlowContents())) { + throw new IllegalArgumentException("Cannot replace an existing Process Group with a flow definition that contains component state. " + + "Component state can only be restored when uploading a flow definition as a new Process Group."); + } + // Perform the request if (isReplicateRequest()) { return replicate(HttpMethod.PUT, importEntity); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestProcessGroupResource.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestProcessGroupResource.java index f34f0704afee..7fb43da69f8b 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestProcessGroupResource.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestProcessGroupResource.java @@ -60,7 +60,7 @@ public void testExportProcessGroup(@Mock RegisteredFlowSnapshot versionedFlowSna when(versionedFlowSnapshot.getFlowContents()).thenReturn(versionedProcessGroup); when(versionedProcessGroup.getName()).thenReturn("flowname"); - try (Response response = processGroupResource.exportProcessGroup(groupId, false)) { + try (Response response = processGroupResource.exportProcessGroup(groupId, false, false)) { assertEquals(200, response.getStatus()); assertEquals(versionedFlowSnapshot, response.getEntity()); } diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/StatefulCountProcessor.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/StatefulCountProcessor.java new file mode 100644 index 000000000000..85d13228e0c8 --- /dev/null +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/StatefulCountProcessor.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.tests.system; + +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.configuration.DefaultSchedule; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; + +@DefaultSchedule(period = "100 ms") +@Stateful(scopes = {Scope.CLUSTER, Scope.LOCAL}, description = "Stores a counter in both cluster and local state") +public class StatefulCountProcessor extends AbstractProcessor { + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .build(); + + @Override + public Set getRelationships() { + return Set.of(REL_SUCCESS); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + flowFile = session.create(); + } + + try { + incrementState(session, Scope.CLUSTER); + incrementState(session, Scope.LOCAL); + } catch (final IOException e) { + throw new ProcessException(e); + } + + session.transfer(flowFile, REL_SUCCESS); + } + + private void incrementState(final ProcessSession session, final Scope scope) throws IOException { + final StateMap stateMap = session.getState(scope); + final String countValue = stateMap.toMap().get("count"); + final int count = countValue == null ? 0 : Integer.parseInt(countValue); + session.setState(Map.of("count", String.valueOf(count + 1)), scope); + } +} diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index a12f954cb84a..d432d4d425a9 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -49,6 +49,7 @@ org.apache.nifi.processors.tests.system.SetState org.apache.nifi.processors.tests.system.Sleep org.apache.nifi.processors.tests.system.SplitByLine org.apache.nifi.processors.tests.system.SplitTextByLine +org.apache.nifi.processors.tests.system.StatefulCountProcessor org.apache.nifi.processors.tests.system.TerminateFlowFile org.apache.nifi.processors.tests.system.TransferBatch org.apache.nifi.processors.tests.system.ThrowExceptionInFlowFileFilter diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/ClusterFlowDefinitionExportImportStateIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/ClusterFlowDefinitionExportImportStateIT.java new file mode 100644 index 000000000000..97fe6af83467 --- /dev/null +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/ClusterFlowDefinitionExportImportStateIT.java @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.tests.system.pg; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.flow.VersionedComponentState; +import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.flow.VersionedProcessor; +import org.apache.nifi.registry.flow.RegisteredFlowSnapshot; +import org.apache.nifi.tests.system.NiFiInstanceFactory; +import org.apache.nifi.tests.system.NiFiSystemIT; +import org.apache.nifi.toolkit.client.NiFiClientException; +import org.apache.nifi.web.api.dto.ComponentStateDTO; +import org.apache.nifi.web.api.dto.StateEntryDTO; +import org.apache.nifi.web.api.entity.ComponentStateEntity; +import org.apache.nifi.web.api.entity.ProcessGroupEntity; +import org.apache.nifi.web.api.entity.ProcessGroupImportEntity; +import org.apache.nifi.web.api.entity.ProcessGroupReplaceRequestEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ClusterFlowDefinitionExportImportStateIT extends NiFiSystemIT { + + private static final ObjectMapper MAPPER = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + @Override + public NiFiInstanceFactory getInstanceFactory() { + return createTwoNodeInstanceFactory(); + } + + @Test + public void testClusterExportCapturesClusterState() throws NiFiClientException, IOException, InterruptedException { + final ProcessGroupEntity pg = getClientUtil().createProcessGroup("TestGroup", "root"); + final ProcessorEntity stateful = getClientUtil().createProcessor("GenerateFlowFile", pg.getId()); + getClientUtil().updateProcessorProperties(stateful, Collections.singletonMap("State Scope", "CLUSTER")); + final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile", pg.getId()); + getClientUtil().createConnection(stateful, terminate, "success"); + + getClientUtil().startProcessor(stateful); + waitForStatePopulated(stateful.getId(), Scope.CLUSTER); + getClientUtil().stopProcessor(stateful); + getClientUtil().waitForStoppedProcessor(stateful.getId()); + + final File exportFile = new File("target/st13-export.json"); + getNifiClient().getProcessGroupClient().exportProcessGroup(pg.getId(), true, true, exportFile); + + final RegisteredFlowSnapshot snapshot = MAPPER.readValue(exportFile, RegisteredFlowSnapshot.class); + final VersionedProcessor proc = findProcessorByType(snapshot.getFlowContents(), "GenerateFlowFile"); + assertNotNull(proc); + assertNotNull(proc.getComponentState()); + assertNotNull(proc.getComponentState().getClusterState()); + assertNotNull(proc.getComponentState().getClusterState().get("count")); + } + + @Test + public void testClusterExportCapturesLocalStateFromBothNodes() throws NiFiClientException, IOException, InterruptedException { + final ProcessGroupEntity pg = getClientUtil().createProcessGroup("TestGroup", "root"); + final ProcessorEntity stateful = getClientUtil().createProcessor("GenerateFlowFile", pg.getId()); + final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile", pg.getId()); + getClientUtil().createConnection(stateful, terminate, "success"); + + getClientUtil().startProcessor(stateful); + waitForStatePopulated(stateful.getId(), Scope.LOCAL); + getClientUtil().stopProcessor(stateful); + getClientUtil().waitForStoppedProcessor(stateful.getId()); + + final File exportFile = new File("target/st14-export.json"); + getNifiClient().getProcessGroupClient().exportProcessGroup(pg.getId(), true, true, exportFile); + + final RegisteredFlowSnapshot snapshot = MAPPER.readValue(exportFile, RegisteredFlowSnapshot.class); + final VersionedProcessor proc = findProcessorByType(snapshot.getFlowContents(), "GenerateFlowFile"); + assertNotNull(proc); + assertNotNull(proc.getComponentState()); + assertNotNull(proc.getComponentState().getLocalNodeStates()); + assertEquals(2, proc.getComponentState().getLocalNodeStates().size(), + "Should have local state from both nodes"); + assertNotNull(proc.getComponentState().getLocalNodeStates().get(0)); + assertNotNull(proc.getComponentState().getLocalNodeStates().get(1)); + } + + @Test + public void testClusterExportCapturesBothScopes() throws NiFiClientException, IOException, InterruptedException { + final ProcessGroupEntity pg = getClientUtil().createProcessGroup("TestGroup", "root"); + final ProcessorEntity stateful = getClientUtil().createProcessor("StatefulCountProcessor", pg.getId()); + final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile", pg.getId()); + getClientUtil().createConnection(stateful, terminate, "success"); + + getClientUtil().startProcessor(stateful); + waitForStatePopulated(stateful.getId(), Scope.CLUSTER); + waitForStatePopulated(stateful.getId(), Scope.LOCAL); + getClientUtil().stopProcessor(stateful); + getClientUtil().waitForStoppedProcessor(stateful.getId()); + + final File exportFile = new File("target/st15-export.json"); + getNifiClient().getProcessGroupClient().exportProcessGroup(pg.getId(), true, true, exportFile); + + final RegisteredFlowSnapshot snapshot = MAPPER.readValue(exportFile, RegisteredFlowSnapshot.class); + final VersionedProcessor proc = findProcessorByType(snapshot.getFlowContents(), "StatefulCountProcessor"); + assertNotNull(proc); + + final VersionedComponentState state = proc.getComponentState(); + assertNotNull(state); + assertNotNull(state.getClusterState(), "Cluster state should be present"); + assertNotNull(state.getClusterState().get("count"), "Cluster state should contain count"); + assertNotNull(state.getLocalNodeStates(), "Local node states should be present"); + assertEquals(2, state.getLocalNodeStates().size()); + assertNotNull(state.getLocalNodeStates().get(0).getState().get("count")); + assertNotNull(state.getLocalNodeStates().get(1).getState().get("count")); + } + + @Test + public void testClusterRoundTripSameTopologyBothScopes() throws NiFiClientException, IOException, InterruptedException { + final ProcessGroupEntity pg = getClientUtil().createProcessGroup("TestGroup", "root"); + final ProcessorEntity stateful = getClientUtil().createProcessor("StatefulCountProcessor", pg.getId()); + final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile", pg.getId()); + getClientUtil().createConnection(stateful, terminate, "success"); + + getClientUtil().startProcessor(stateful); + waitForStatePopulated(stateful.getId(), Scope.CLUSTER); + waitForStatePopulated(stateful.getId(), Scope.LOCAL); + getClientUtil().stopProcessor(stateful); + getClientUtil().waitForStoppedProcessor(stateful.getId()); + + final Map originalClusterState = getProcessorState(stateful.getId(), Scope.CLUSTER); + + final File exportFile = new File("target/st16-export.json"); + getNifiClient().getProcessGroupClient().exportProcessGroup(pg.getId(), true, true, exportFile); + + emptyQueuesAndDeleteProcessGroup(pg); + + final ProcessGroupEntity uploaded = getNifiClient().getProcessGroupClient().upload("root", exportFile, "ImportedGroup", 0.0, 0.0); + final ProcessorEntity importedProcessor = findProcessorByTypeInGroup(uploaded.getId(), "StatefulCountProcessor"); + assertNotNull(importedProcessor); + + final Map importedClusterState = getProcessorState(importedProcessor.getId(), Scope.CLUSTER); + assertEquals(originalClusterState.get("count"), importedClusterState.get("count"), + "Cluster state should be restored after round-trip"); + } + + @Test + public void testClusterExportRunningProcessorReturns409() throws NiFiClientException, IOException, InterruptedException { + final ProcessGroupEntity pg = getClientUtil().createProcessGroup("TestGroup", "root"); + final ProcessorEntity stateful = getClientUtil().createProcessor("StatefulCountProcessor", pg.getId()); + final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile", pg.getId()); + getClientUtil().createConnection(stateful, terminate, "success"); + + getClientUtil().startProcessor(stateful); + waitForStatePopulated(stateful.getId(), Scope.LOCAL); + + try { + final File exportFile = new File("target/st20-export.json"); + getNifiClient().getProcessGroupClient().exportProcessGroup(pg.getId(), false, true, exportFile); + throw new AssertionError("Expected export to fail when processors are running"); + } catch (final NiFiClientException e) { + assertNotNull(e.getMessage()); + } finally { + getClientUtil().stopProcessor(stateful); + getClientUtil().waitForStoppedProcessor(stateful.getId()); + } + } + + @Test + public void testClusterReplaceRejectsFlowDefinitionWithComponentState() throws NiFiClientException, IOException, InterruptedException { + final ProcessGroupEntity pg = getClientUtil().createProcessGroup("TestGroup", "root"); + final ProcessorEntity stateful = getClientUtil().createProcessor("StatefulCountProcessor", pg.getId()); + final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile", pg.getId()); + getClientUtil().createConnection(stateful, terminate, "success"); + + getClientUtil().startProcessor(stateful); + waitForStatePopulated(stateful.getId(), Scope.CLUSTER); + waitForStatePopulated(stateful.getId(), Scope.LOCAL); + getClientUtil().stopProcessor(stateful); + getClientUtil().waitForStoppedProcessor(stateful.getId()); + + final File exportFile = new File("target/st21-export.json"); + getNifiClient().getProcessGroupClient().exportProcessGroup(pg.getId(), true, true, exportFile); + + final RegisteredFlowSnapshot snapshot = MAPPER.readValue(exportFile, RegisteredFlowSnapshot.class); + final NiFiClientException exception = assertThrows(NiFiClientException.class, () -> replaceProcessGroup(pg, snapshot)); + assertTrue(exception.getMessage().contains("component state"), + "Expected rejection message about component state but got: " + exception.getMessage()); + } + + @Test + public void testClusterImportWithoutComponentStateHasNoState() throws NiFiClientException, IOException, InterruptedException { + final ProcessGroupEntity pg = getClientUtil().createProcessGroup("TestGroup", "root"); + final ProcessorEntity stateful = getClientUtil().createProcessor("StatefulCountProcessor", pg.getId()); + final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile", pg.getId()); + getClientUtil().createConnection(stateful, terminate, "success"); + + getClientUtil().startProcessor(stateful); + waitForStatePopulated(stateful.getId(), Scope.CLUSTER); + getClientUtil().stopProcessor(stateful); + getClientUtil().waitForStoppedProcessor(stateful.getId()); + + final File exportFile = new File("target/st22-export.json"); + getNifiClient().getProcessGroupClient().exportProcessGroup(pg.getId(), true, false, exportFile); + + emptyQueuesAndDeleteProcessGroup(pg); + + final ProcessGroupEntity uploaded = getNifiClient().getProcessGroupClient().upload("root", exportFile, "ImportedGroup", 0.0, 0.0); + final ProcessorEntity importedProcessor = findProcessorByTypeInGroup(uploaded.getId(), "StatefulCountProcessor"); + assertNotNull(importedProcessor); + + final Map importedClusterState = getProcessorState(importedProcessor.getId(), Scope.CLUSTER); + assertTrue(importedClusterState.isEmpty(), "State should be empty when imported without componentState"); + } + + private void waitForStatePopulated(final String processorId, final Scope scope) throws InterruptedException { + waitFor(() -> { + try { + final Map state = getProcessorState(processorId, scope); + return state.get("count") != null; + } catch (final Exception e) { + return false; + } + }); + } + + private Map getProcessorState(final String processorId, final Scope scope) throws NiFiClientException, IOException { + final ComponentStateEntity stateEntity = getNifiClient().getProcessorClient().getProcessorState(processorId); + final ComponentStateDTO componentState = stateEntity.getComponentState(); + final Map result = new HashMap<>(); + + switch (scope) { + case LOCAL: + if (componentState != null && componentState.getLocalState() != null && componentState.getLocalState().getState() != null) { + for (final StateEntryDTO entry : componentState.getLocalState().getState()) { + result.put(entry.getKey(), entry.getValue()); + } + } + break; + case CLUSTER: + if (componentState != null && componentState.getClusterState() != null && componentState.getClusterState().getState() != null) { + for (final StateEntryDTO entry : componentState.getClusterState().getState()) { + result.put(entry.getKey(), entry.getValue()); + } + } + break; + } + return result; + } + + private VersionedProcessor findProcessorByType(final VersionedProcessGroup group, final String typeSuffix) { + if (group.getProcessors() != null) { + for (final VersionedProcessor vp : group.getProcessors()) { + if (vp.getType() != null && vp.getType().endsWith(typeSuffix)) { + return vp; + } + } + } + return null; + } + + private ProcessorEntity findProcessorByTypeInGroup(final String groupId, final String typeSuffix) throws NiFiClientException, IOException { + return getNifiClient().getFlowClient().getProcessGroup(groupId) + .getProcessGroupFlow().getFlow().getProcessors().stream() + .filter(pe -> pe.getComponent().getType().endsWith(typeSuffix)) + .findFirst().orElse(null); + } + + private void emptyQueuesAndDeleteProcessGroup(final ProcessGroupEntity pg) throws NiFiClientException, IOException, InterruptedException { + getNifiClient().getProcessGroupClient().emptyQueues(pg.getId()); + waitFor(() -> { + try { + return getNifiClient().getProcessGroupClient().getProcessGroup(pg.getId()) + .getStatus().getAggregateSnapshot().getQueuedCount().equals("0"); + } catch (final Exception e) { + return false; + } + }); + final ProcessGroupEntity refreshed = getNifiClient().getProcessGroupClient().getProcessGroup(pg.getId()); + refreshed.setDisconnectedNodeAcknowledged(true); + getNifiClient().getProcessGroupClient().deleteProcessGroup(refreshed); + } + + private void replaceProcessGroup(final ProcessGroupEntity pg, final RegisteredFlowSnapshot snapshot) throws NiFiClientException, IOException, InterruptedException { + final ProcessGroupImportEntity importEntity = new ProcessGroupImportEntity(); + importEntity.setVersionedFlowSnapshot(snapshot); + importEntity.setProcessGroupRevision(getNifiClient().getProcessGroupClient().getProcessGroup(pg.getId()).getRevision()); + + final ProcessGroupReplaceRequestEntity replaceRequest = + getNifiClient().getProcessGroupClient().replaceProcessGroup(pg.getId(), importEntity); + final String requestId = replaceRequest.getRequest().getRequestId(); + + waitFor(() -> { + try { + final ProcessGroupReplaceRequestEntity req = + getNifiClient().getProcessGroupClient().getProcessGroupReplaceRequest(pg.getId(), requestId); + return req != null && req.getRequest().isComplete(); + } catch (final Exception e) { + return false; + } + }); + + final ProcessGroupReplaceRequestEntity finalRequest = + getNifiClient().getProcessGroupClient().getProcessGroupReplaceRequest(pg.getId(), requestId); + assertNull(finalRequest.getRequest().getFailureReason(), + "Replace failed: %s".formatted(finalRequest.getRequest().getFailureReason())); + } +} diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/FlowDefinitionExportImportStateIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/FlowDefinitionExportImportStateIT.java new file mode 100644 index 000000000000..91cb711ccb49 --- /dev/null +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/FlowDefinitionExportImportStateIT.java @@ -0,0 +1,485 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.tests.system.pg; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.flow.VersionedComponentState; +import org.apache.nifi.flow.VersionedNodeState; +import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.flow.VersionedProcessor; +import org.apache.nifi.registry.flow.RegisteredFlowSnapshot; +import org.apache.nifi.tests.system.NiFiSystemIT; +import org.apache.nifi.toolkit.client.NiFiClientException; +import org.apache.nifi.web.api.dto.ComponentStateDTO; +import org.apache.nifi.web.api.dto.StateEntryDTO; +import org.apache.nifi.web.api.entity.ComponentStateEntity; +import org.apache.nifi.web.api.entity.ControllerServiceEntity; +import org.apache.nifi.web.api.entity.ProcessGroupEntity; +import org.apache.nifi.web.api.entity.ProcessGroupImportEntity; +import org.apache.nifi.web.api.entity.ProcessGroupReplaceRequestEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class FlowDefinitionExportImportStateIT extends NiFiSystemIT { + + private static final ObjectMapper MAPPER = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + @Test + public void testExportWithoutStateFlagHasNoComponentState() throws NiFiClientException, IOException, InterruptedException { + final ProcessGroupEntity pg = getClientUtil().createProcessGroup("TestGroup", "root"); + final ProcessorEntity stateful = getClientUtil().createProcessor("StatefulCountProcessor", pg.getId()); + final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile", pg.getId()); + getClientUtil().createConnection(stateful, terminate, "success"); + + getClientUtil().startProcessor(stateful); + waitForStatePopulated(stateful.getId(), Scope.LOCAL); + getClientUtil().stopProcessor(stateful); + getClientUtil().waitForStoppedProcessor(stateful.getId()); + + final File exportFile = new File("target/st1-export.json"); + getNifiClient().getProcessGroupClient().exportProcessGroup(pg.getId(), false, false, exportFile); + + final RegisteredFlowSnapshot snapshot = MAPPER.readValue(exportFile, RegisteredFlowSnapshot.class); + for (final VersionedProcessor vp : snapshot.getFlowContents().getProcessors()) { + assertNull(vp.getComponentState(), "componentState should be null when includeComponentState=false"); + } + } + + @Test + public void testExportWithStateFlagHasBothScopes() throws NiFiClientException, IOException, InterruptedException { + final ProcessGroupEntity pg = getClientUtil().createProcessGroup("TestGroup", "root"); + final ProcessorEntity stateful = getClientUtil().createProcessor("StatefulCountProcessor", pg.getId()); + final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile", pg.getId()); + getClientUtil().createConnection(stateful, terminate, "success"); + + getClientUtil().startProcessor(stateful); + waitForStatePopulated(stateful.getId(), Scope.LOCAL); + getClientUtil().stopProcessor(stateful); + getClientUtil().waitForStoppedProcessor(stateful.getId()); + + final File exportFile = new File("target/st2-export.json"); + getNifiClient().getProcessGroupClient().exportProcessGroup(pg.getId(), true, true, exportFile); + + final RegisteredFlowSnapshot snapshot = MAPPER.readValue(exportFile, RegisteredFlowSnapshot.class); + final VersionedProcessor proc = findProcessorByType(snapshot.getFlowContents(), "StatefulCountProcessor"); + assertNotNull(proc); + + final VersionedComponentState state = proc.getComponentState(); + assertNotNull(state, "componentState should be present when includeComponentState=true"); + assertNotNull(state.getLocalNodeStates(), "localNodeStates should be present for StatefulCountProcessor"); + assertEquals(1, state.getLocalNodeStates().size()); + final VersionedNodeState nodeState = state.getLocalNodeStates().get(0); + assertNotNull(nodeState); + assertNotNull(nodeState.getState().get("count")); + assertNotNull(state.getClusterState(), "clusterState should be present (falls back to local provider on standalone)"); + assertNotNull(state.getClusterState().get("count")); + } + + @Test + public void testExportWithStateWhileRunningReturns409() throws NiFiClientException, IOException, InterruptedException { + final ProcessGroupEntity pg = getClientUtil().createProcessGroup("TestGroup", "root"); + final ProcessorEntity stateful = getClientUtil().createProcessor("StatefulCountProcessor", pg.getId()); + final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile", pg.getId()); + getClientUtil().createConnection(stateful, terminate, "success"); + + getClientUtil().startProcessor(stateful); + waitForStatePopulated(stateful.getId(), Scope.LOCAL); + + try { + final File exportFile = new File("target/st3-export.json"); + getNifiClient().getProcessGroupClient().exportProcessGroup(pg.getId(), false, true, exportFile); + throw new AssertionError("Expected export to fail when processors are running"); + } catch (final NiFiClientException e) { + assertNotNull(e.getMessage()); + } finally { + getClientUtil().stopProcessor(stateful); + getClientUtil().waitForStoppedProcessor(stateful.getId()); + } + } + + @Test + public void testExportWithStateWhileControllerServiceEnabledReturns409() throws NiFiClientException, IOException, InterruptedException { + final ProcessGroupEntity pg = getClientUtil().createProcessGroup("TestGroup", "root"); + final ControllerServiceEntity service = getClientUtil().createControllerService("StandardCountService", pg.getId()); + getClientUtil().enableControllerService(service); + + try { + final File exportFile = new File("target/st4-export.json"); + getNifiClient().getProcessGroupClient().exportProcessGroup(pg.getId(), false, true, exportFile); + throw new AssertionError("Expected export to fail when controller services are enabled"); + } catch (final NiFiClientException e) { + assertNotNull(e.getMessage()); + } finally { + getClientUtil().disableControllerService(service); + } + } + + @Test + public void testExportWithStateFlagNonStatefulProcessorHasNoComponentState() throws NiFiClientException, IOException, InterruptedException { + final ProcessGroupEntity pg = getClientUtil().createProcessGroup("TestGroup", "root"); + final ProcessorEntity stateful = getClientUtil().createProcessor("StatefulCountProcessor", pg.getId()); + final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile", pg.getId()); + getClientUtil().createConnection(stateful, terminate, "success"); + + getClientUtil().startProcessor(stateful); + waitForStatePopulated(stateful.getId(), Scope.LOCAL); + getClientUtil().stopProcessor(stateful); + getClientUtil().waitForStoppedProcessor(stateful.getId()); + + final File exportFile = new File("target/st5-export.json"); + getNifiClient().getProcessGroupClient().exportProcessGroup(pg.getId(), true, true, exportFile); + + final RegisteredFlowSnapshot snapshot = MAPPER.readValue(exportFile, RegisteredFlowSnapshot.class); + final VersionedProcessor statefulVersioned = findProcessorByType(snapshot.getFlowContents(), "StatefulCountProcessor"); + final VersionedProcessor terminateVersioned = findProcessorByType(snapshot.getFlowContents(), "TerminateFlowFile"); + + assertNotNull(statefulVersioned.getComponentState(), "Stateful processor should have componentState"); + assertNull(terminateVersioned.getComponentState(), "Non-stateful processor should not have componentState"); + } + + @Test + public void testExportDefaultNoParamHasNoState() throws NiFiClientException, IOException, InterruptedException { + final ProcessGroupEntity pg = getClientUtil().createProcessGroup("TestGroup", "root"); + final ProcessorEntity stateful = getClientUtil().createProcessor("StatefulCountProcessor", pg.getId()); + final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile", pg.getId()); + getClientUtil().createConnection(stateful, terminate, "success"); + + getClientUtil().startProcessor(stateful); + waitForStatePopulated(stateful.getId(), Scope.LOCAL); + getClientUtil().stopProcessor(stateful); + getClientUtil().waitForStoppedProcessor(stateful.getId()); + + final File exportFile = new File("target/st6-export.json"); + getNifiClient().getProcessGroupClient().exportProcessGroup(pg.getId(), false, exportFile); + + final RegisteredFlowSnapshot snapshot = MAPPER.readValue(exportFile, RegisteredFlowSnapshot.class); + for (final VersionedProcessor vp : snapshot.getFlowContents().getProcessors()) { + assertNull(vp.getComponentState(), "componentState should be null by default"); + } + } + + @Test + public void testStandaloneRoundTripViaUploadLocalState() throws NiFiClientException, IOException, InterruptedException { + final ProcessGroupEntity pg = getClientUtil().createProcessGroup("TestGroup", "root"); + final ProcessorEntity stateful = getClientUtil().createProcessor("StatefulCountProcessor", pg.getId()); + final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile", pg.getId()); + getClientUtil().createConnection(stateful, terminate, "success"); + + getClientUtil().startProcessor(stateful); + waitForStatePopulated(stateful.getId(), Scope.LOCAL); + getClientUtil().stopProcessor(stateful); + getClientUtil().waitForStoppedProcessor(stateful.getId()); + + final Map originalLocalState = getProcessorState(stateful.getId(), Scope.LOCAL); + assertNotNull(originalLocalState.get("count")); + + final File exportFile = new File("target/st7-export.json"); + getNifiClient().getProcessGroupClient().exportProcessGroup(pg.getId(), true, true, exportFile); + + emptyQueuesAndDeleteProcessGroup(pg); + + final ProcessGroupEntity uploaded = getNifiClient().getProcessGroupClient().upload("root", exportFile, "ImportedGroup", 0.0, 0.0); + + final ProcessorEntity importedProcessor = findProcessorByTypeInGroup(uploaded.getId(), "StatefulCountProcessor"); + assertNotNull(importedProcessor); + + final Map importedLocalState = getProcessorState(importedProcessor.getId(), Scope.LOCAL); + assertEquals(originalLocalState.get("count"), importedLocalState.get("count"), "Local state count should match after round-trip"); + } + + @Test + public void testReplaceRejectsFlowDefinitionWithComponentState() throws NiFiClientException, IOException, InterruptedException { + final ProcessGroupEntity pg = getClientUtil().createProcessGroup("TestGroup", "root"); + final ProcessorEntity stateful = getClientUtil().createProcessor("StatefulCountProcessor", pg.getId()); + final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile", pg.getId()); + getClientUtil().createConnection(stateful, terminate, "success"); + + getClientUtil().startProcessor(stateful); + waitForStatePopulated(stateful.getId(), Scope.LOCAL); + getClientUtil().stopProcessor(stateful); + getClientUtil().waitForStoppedProcessor(stateful.getId()); + + final File exportFile = new File("target/st8-export.json"); + getNifiClient().getProcessGroupClient().exportProcessGroup(pg.getId(), true, true, exportFile); + + final RegisteredFlowSnapshot snapshot = MAPPER.readValue(exportFile, RegisteredFlowSnapshot.class); + final NiFiClientException exception = assertThrows(NiFiClientException.class, () -> replaceProcessGroup(pg, snapshot)); + assertTrue(exception.getMessage().contains("component state"), + "Expected rejection message about component state but got: " + exception.getMessage()); + } + + @Test + public void testImportFlowWithoutComponentStateHasNoState() throws NiFiClientException, IOException, InterruptedException { + final ProcessGroupEntity pg = getClientUtil().createProcessGroup("TestGroup", "root"); + final ProcessorEntity stateful = getClientUtil().createProcessor("StatefulCountProcessor", pg.getId()); + final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile", pg.getId()); + getClientUtil().createConnection(stateful, terminate, "success"); + + getClientUtil().startProcessor(stateful); + waitForStatePopulated(stateful.getId(), Scope.LOCAL); + getClientUtil().stopProcessor(stateful); + getClientUtil().waitForStoppedProcessor(stateful.getId()); + + final File exportFile = new File("target/st9-export.json"); + getNifiClient().getProcessGroupClient().exportProcessGroup(pg.getId(), true, false, exportFile); + + emptyQueuesAndDeleteProcessGroup(pg); + + final ProcessGroupEntity uploaded = getNifiClient().getProcessGroupClient().upload("root", exportFile, "ImportedGroup", 0.0, 0.0); + final ProcessorEntity importedProcessor = findProcessorByTypeInGroup(uploaded.getId(), "StatefulCountProcessor"); + assertNotNull(importedProcessor); + + final Map importedLocalState = getProcessorState(importedProcessor.getId(), Scope.LOCAL); + assertTrue(importedLocalState.isEmpty(), "State should be empty when imported without componentState"); + } + + @Test + public void testNestedProcessGroupsStateExportedRecursively() throws NiFiClientException, IOException, InterruptedException { + final ProcessGroupEntity parent = getClientUtil().createProcessGroup("ParentGroup", "root"); + final ProcessGroupEntity child = getClientUtil().createProcessGroup("ChildGroup", parent.getId()); + final ProcessorEntity stateful = getClientUtil().createProcessor("StatefulCountProcessor", child.getId()); + final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile", child.getId()); + getClientUtil().createConnection(stateful, terminate, "success"); + + getClientUtil().startProcessor(stateful); + waitForStatePopulated(stateful.getId(), Scope.LOCAL); + getClientUtil().stopProcessor(stateful); + getClientUtil().waitForStoppedProcessor(stateful.getId()); + + final Map originalLocalState = getProcessorState(stateful.getId(), Scope.LOCAL); + + final File exportFile = new File("target/st10-export.json"); + getNifiClient().getProcessGroupClient().exportProcessGroup(parent.getId(), true, true, exportFile); + + final RegisteredFlowSnapshot snapshot = MAPPER.readValue(exportFile, RegisteredFlowSnapshot.class); + final VersionedProcessGroup childGroup = snapshot.getFlowContents().getProcessGroups().iterator().next(); + final VersionedProcessor nestedProcessor = findProcessorByType(childGroup, "StatefulCountProcessor"); + assertNotNull(nestedProcessor); + assertNotNull(nestedProcessor.getComponentState(), "Nested processor should have componentState"); + assertNotNull(nestedProcessor.getComponentState().getLocalNodeStates(), "Nested processor should have localNodeStates"); + + emptyQueuesAndDeleteProcessGroup(parent); + + final ProcessGroupEntity uploaded = getNifiClient().getProcessGroupClient().upload("root", exportFile, "ImportedParent", 0.0, 0.0); + final ProcessGroupEntity importedChild = getNifiClient().getFlowClient().getProcessGroup(uploaded.getId()) + .getProcessGroupFlow().getFlow().getProcessGroups().iterator().next(); + final ProcessorEntity importedProcessor = findProcessorByTypeInGroup(importedChild.getId(), "StatefulCountProcessor"); + assertNotNull(importedProcessor); + + assertEquals(originalLocalState.get("count"), getProcessorState(importedProcessor.getId(), Scope.LOCAL).get("count")); + } + + @Test + public void testMultipleStatefulProcessorsEachGetsOwnState() throws NiFiClientException, IOException, InterruptedException { + final ProcessGroupEntity pg = getClientUtil().createProcessGroup("TestGroup", "root"); + final ProcessorEntity gen1 = getClientUtil().createProcessor("GenerateFlowFile", pg.getId()); + final ProcessorEntity gen2 = getClientUtil().createProcessor("StatefulCountProcessor", pg.getId()); + final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile", pg.getId()); + getClientUtil().createConnection(gen1, terminate, "success"); + getClientUtil().createConnection(gen2, terminate, "success"); + + getClientUtil().startProcessor(gen1); + getClientUtil().startProcessor(gen2); + waitForStatePopulated(gen1.getId(), Scope.LOCAL); + waitForStatePopulated(gen2.getId(), Scope.LOCAL); + getClientUtil().stopProcessor(gen1); + getClientUtil().stopProcessor(gen2); + getClientUtil().waitForStoppedProcessor(gen1.getId()); + getClientUtil().waitForStoppedProcessor(gen2.getId()); + + final Map state1 = getProcessorState(gen1.getId(), Scope.LOCAL); + final Map state2 = getProcessorState(gen2.getId(), Scope.LOCAL); + + final File exportFile = new File("target/st11-export.json"); + getNifiClient().getProcessGroupClient().exportProcessGroup(pg.getId(), true, true, exportFile); + + emptyQueuesAndDeleteProcessGroup(pg); + + final ProcessGroupEntity uploaded = getNifiClient().getProcessGroupClient().upload("root", exportFile, "ImportedGroup", 0.0, 0.0); + final ProcessorEntity imported1 = findProcessorByTypeInGroup(uploaded.getId(), "GenerateFlowFile"); + final ProcessorEntity imported2 = findProcessorByTypeInGroup(uploaded.getId(), "StatefulCountProcessor"); + + assertEquals(state1.get("count"), getProcessorState(imported1.getId(), Scope.LOCAL).get("count")); + assertEquals(state2.get("count"), getProcessorState(imported2.getId(), Scope.LOCAL).get("count")); + } + + @Test + public void testExportWithStateEmptyStateNeverRan() throws NiFiClientException, IOException { + final ProcessGroupEntity pg = getClientUtil().createProcessGroup("TestGroup", "root"); + getClientUtil().createProcessor("StatefulCountProcessor", pg.getId()); + + final File exportFile = new File("target/st23-export.json"); + getNifiClient().getProcessGroupClient().exportProcessGroup(pg.getId(), true, true, exportFile); + + final RegisteredFlowSnapshot snapshot = MAPPER.readValue(exportFile, RegisteredFlowSnapshot.class); + final VersionedProcessor proc = findProcessorByType(snapshot.getFlowContents(), "StatefulCountProcessor"); + assertNull(proc.getComponentState(), "componentState should be null when processor has never run"); + } + + @Test + public void testExportWithStateNoStatefulComponents() throws NiFiClientException, IOException { + final ProcessGroupEntity pg = getClientUtil().createProcessGroup("TestGroup", "root"); + getClientUtil().createProcessor("TerminateFlowFile", pg.getId()); + + final File exportFile = new File("target/st24-export.json"); + getNifiClient().getProcessGroupClient().exportProcessGroup(pg.getId(), true, true, exportFile); + + final RegisteredFlowSnapshot snapshot = MAPPER.readValue(exportFile, RegisteredFlowSnapshot.class); + for (final VersionedProcessor vp : snapshot.getFlowContents().getProcessors()) { + assertNull(vp.getComponentState(), "Non-stateful processor should have null componentState"); + } + } + + @Test + public void testReplaceSucceedsWhenFlowExportedWithoutComponentState() throws NiFiClientException, IOException, InterruptedException { + final ProcessGroupEntity pg = getClientUtil().createProcessGroup("TestGroup", "root"); + final ProcessorEntity stateful = getClientUtil().createProcessor("StatefulCountProcessor", pg.getId()); + final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile", pg.getId()); + getClientUtil().createConnection(stateful, terminate, "success"); + + getClientUtil().startProcessor(stateful); + waitForStatePopulated(stateful.getId(), Scope.LOCAL); + getClientUtil().stopProcessor(stateful); + getClientUtil().waitForStoppedProcessor(stateful.getId()); + + final File exportFile = new File("target/st27-export.json"); + getNifiClient().getProcessGroupClient().exportProcessGroup(pg.getId(), true, false, exportFile); + + getNifiClient().getProcessGroupClient().emptyQueues(pg.getId()); + waitFor(() -> { + try { + return getNifiClient().getProcessGroupClient().getProcessGroup(pg.getId()) + .getStatus().getAggregateSnapshot().getQueuedCount().equals("0"); + } catch (final Exception e) { + return false; + } + }); + + final RegisteredFlowSnapshot snapshot = MAPPER.readValue(exportFile, RegisteredFlowSnapshot.class); + replaceProcessGroup(pg, snapshot); + + final ProcessorEntity importedProcessor = findProcessorByTypeInGroup(pg.getId(), "StatefulCountProcessor"); + assertNotNull(importedProcessor); + } + + private void waitForStatePopulated(final String processorId, final Scope scope) throws InterruptedException { + waitFor(() -> { + try { + final Map state = getProcessorState(processorId, scope); + return state.get("count") != null; + } catch (final Exception e) { + return false; + } + }); + } + + private Map getProcessorState(final String processorId, final Scope scope) throws NiFiClientException, IOException { + final ComponentStateEntity stateEntity = getNifiClient().getProcessorClient().getProcessorState(processorId); + final ComponentStateDTO componentState = stateEntity.getComponentState(); + final Map result = new HashMap<>(); + + switch (scope) { + case LOCAL: + if (componentState != null && componentState.getLocalState() != null && componentState.getLocalState().getState() != null) { + for (final StateEntryDTO entry : componentState.getLocalState().getState()) { + result.put(entry.getKey(), entry.getValue()); + } + } + break; + case CLUSTER: + if (componentState != null && componentState.getClusterState() != null && componentState.getClusterState().getState() != null) { + for (final StateEntryDTO entry : componentState.getClusterState().getState()) { + result.put(entry.getKey(), entry.getValue()); + } + } + break; + } + return result; + } + + private VersionedProcessor findProcessorByType(final VersionedProcessGroup group, final String typeSuffix) { + if (group.getProcessors() != null) { + for (final VersionedProcessor vp : group.getProcessors()) { + if (vp.getType() != null && vp.getType().endsWith(typeSuffix)) { + return vp; + } + } + } + return null; + } + + private ProcessorEntity findProcessorByTypeInGroup(final String groupId, final String typeSuffix) throws NiFiClientException, IOException { + return getNifiClient().getFlowClient().getProcessGroup(groupId) + .getProcessGroupFlow().getFlow().getProcessors().stream() + .filter(pe -> pe.getComponent().getType().endsWith(typeSuffix)) + .findFirst().orElse(null); + } + + private void emptyQueuesAndDeleteProcessGroup(final ProcessGroupEntity pg) throws NiFiClientException, IOException, InterruptedException { + getNifiClient().getProcessGroupClient().emptyQueues(pg.getId()); + waitFor(() -> { + try { + return getNifiClient().getProcessGroupClient().getProcessGroup(pg.getId()) + .getStatus().getAggregateSnapshot().getQueuedCount().equals("0"); + } catch (final Exception e) { + return false; + } + }); + final ProcessGroupEntity refreshed = getNifiClient().getProcessGroupClient().getProcessGroup(pg.getId()); + refreshed.setDisconnectedNodeAcknowledged(true); + getNifiClient().getProcessGroupClient().deleteProcessGroup(refreshed); + } + + private void replaceProcessGroup(final ProcessGroupEntity pg, final RegisteredFlowSnapshot snapshot) throws NiFiClientException, IOException, InterruptedException { + final ProcessGroupImportEntity importEntity = new ProcessGroupImportEntity(); + importEntity.setVersionedFlowSnapshot(snapshot); + importEntity.setProcessGroupRevision(getNifiClient().getProcessGroupClient().getProcessGroup(pg.getId()).getRevision()); + + final ProcessGroupReplaceRequestEntity replaceRequest = + getNifiClient().getProcessGroupClient().replaceProcessGroup(pg.getId(), importEntity); + final String requestId = replaceRequest.getRequest().getRequestId(); + + waitFor(() -> { + try { + final ProcessGroupReplaceRequestEntity req = + getNifiClient().getProcessGroupClient().getProcessGroupReplaceRequest(pg.getId(), requestId); + return req != null && req.getRequest().isComplete(); + } catch (final Exception e) { + return false; + } + }); + + final ProcessGroupReplaceRequestEntity finalRequest = + getNifiClient().getProcessGroupClient().getProcessGroupReplaceRequest(pg.getId(), requestId); + assertNull(finalRequest.getRequest().getFailureReason(), + "Replace failed: %s".formatted(finalRequest.getRequest().getFailureReason())); + } +} diff --git a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ProcessGroupClient.java b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ProcessGroupClient.java index 2d530eff05bb..a34c27e3d866 100644 --- a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ProcessGroupClient.java +++ b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ProcessGroupClient.java @@ -68,6 +68,8 @@ FlowEntity copySnippet(String processGroupId, CopySnippetRequestEntity copySnipp File exportProcessGroup(String processGroupId, boolean includeReferencedServices, File outputFile) throws NiFiClientException, IOException; + File exportProcessGroup(String processGroupId, boolean includeReferencedServices, boolean includeComponentState, File outputFile) throws NiFiClientException, IOException; + DropRequestEntity emptyQueues(String processGroupId) throws NiFiClientException, IOException; DropRequestEntity getEmptyQueuesRequest(String processGroupId, String requestId) throws NiFiClientException, IOException; diff --git a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyProcessGroupClient.java b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyProcessGroupClient.java index 19798f8015e8..f5e294e0030a 100644 --- a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyProcessGroupClient.java +++ b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyProcessGroupClient.java @@ -274,6 +274,12 @@ public FlowComparisonEntity getLocalModifications(final String processGroupId) t @Override public File exportProcessGroup(final String processGroupId, final boolean includeReferencedServices, final File outputFile) throws NiFiClientException, IOException { + return exportProcessGroup(processGroupId, includeReferencedServices, false, outputFile); + } + + @Override + public File exportProcessGroup(final String processGroupId, final boolean includeReferencedServices, final boolean includeComponentState, + final File outputFile) throws NiFiClientException, IOException { if (StringUtils.isBlank(processGroupId)) { throw new IllegalArgumentException("Process group id cannot be null or blank"); } @@ -282,7 +288,8 @@ public File exportProcessGroup(final String processGroupId, final boolean includ final WebTarget target = processGroupsTarget .path("{id}/download") .resolveTemplate("id", processGroupId) - .queryParam("includeReferencedServices", includeReferencedServices); + .queryParam("includeReferencedServices", includeReferencedServices) + .queryParam("includeComponentState", includeComponentState); final Response response = getRequestBuilder(target) .accept(MediaType.APPLICATION_JSON)