Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@

package org.apache.nifi.flow.synchronization;

import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.asset.Asset;
import org.apache.nifi.asset.AssetManager;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
Expand Down Expand Up @@ -57,13 +62,16 @@
import org.apache.nifi.flow.ParameterProviderReference;
import org.apache.nifi.flow.VersionedAsset;
import org.apache.nifi.flow.VersionedComponent;
import org.apache.nifi.flow.VersionedComponentState;
import org.apache.nifi.flow.VersionedConfigurableExtension;
import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedFlowAnalysisRule;
import org.apache.nifi.flow.VersionedFlowCoordinates;
import org.apache.nifi.flow.VersionedFunnel;
import org.apache.nifi.flow.VersionedLabel;
import org.apache.nifi.flow.VersionedNodeState;
import org.apache.nifi.flow.VersionedParameter;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedPort;
Expand Down Expand Up @@ -125,6 +133,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -327,6 +337,8 @@ public void synchronize(final ProcessGroup group, final VersionedExternalFlow ve
createdAndModifiedExtensions.clear();
setSynchronizationOptions(options);

validateLocalStateTopology(versionedExternalFlow.getFlowContents());

for (final FlowDifference diff : flowComparison.getDifferences()) {
if (FlowDifferenceFilters.isPropertyMissingFromGhostComponent(diff, context.getFlowManager())) {
continue;
Expand Down Expand Up @@ -1411,6 +1423,8 @@ private ControllerServiceNode addControllerService(final ProcessGroup destinatio

updateControllerService(newService, proposed, topLevelGroup);

restoreComponentState(newService.getIdentifier(), proposed.getComponentState(), newService);

return newService;
}

Expand Down Expand Up @@ -2708,6 +2722,8 @@ private ProcessorNode addProcessor(final ProcessGroup destination, final Version
procNode.onConfigurationRestored(processContext);
connectableAdditionTracker.addComponent(destination.getIdentifier(), proposed.getIdentifier(), procNode);

restoreComponentState(procNode.getIdentifier(), proposed.getComponentState(), procNode);

return procNode;
}

Expand Down Expand Up @@ -4039,6 +4055,102 @@ private Map<String, String> getPropertyValues(final ComponentNode componentNode)
return propertyValues;
}

private void validateLocalStateTopology(final VersionedProcessGroup proposed) {
final int connectedNodeCount = context.getConnectedNodeCount();
if (connectedNodeCount <= 0) {
return;
}

final int maxSourceNodes = findMaxLocalStateNodeCount(proposed);
if (maxSourceNodes > connectedNodeCount) {
throw new IllegalStateException(
"Cannot import flow with component state: the flow definition contains local state from %d source node(s) but the destination cluster has only %d connected node(s). "
.formatted(maxSourceNodes, connectedNodeCount)
+ "Import into a cluster with at least %d node(s), or export without component state.".formatted(maxSourceNodes));
}
}

private int findMaxLocalStateNodeCount(final VersionedProcessGroup group) {
int max = 0;
for (final VersionedConfigurableExtension ext : getStatefulExtensions(group)) {
final VersionedComponentState state = ext.getComponentState();
if (state != null && state.getLocalNodeStates() != null) {
max = Math.max(max, state.getLocalNodeStates().size());
}
}
if (group.getProcessGroups() != null) {
for (final VersionedProcessGroup child : group.getProcessGroups()) {
max = Math.max(max, findMaxLocalStateNodeCount(child));
}
}
return max;
}

private List<VersionedConfigurableExtension> getStatefulExtensions(final VersionedProcessGroup group) {
final List<VersionedConfigurableExtension> extensions = new ArrayList<>();
if (group.getProcessors() != null) {
extensions.addAll(group.getProcessors());
}
if (group.getControllerServices() != null) {
extensions.addAll(group.getControllerServices());
}
return extensions;
}

private void restoreComponentState(final String componentId, final VersionedComponentState componentState, final ComponentNode componentNode) {
if (componentState == null) {
return;
}

final StateManagerProvider stateManagerProvider = context.getStateManagerProvider();
if (stateManagerProvider == null) {
LOG.debug("StateManagerProvider is not available; skipping state restoration for component {}", componentId);
return;
}

final ConfigurableComponent component = componentNode.getComponent();
if (component == null) {
LOG.debug("Component {} is not available; skipping state restoration", componentId);
return;
}

final Stateful stateful = component.getClass().getAnnotation(Stateful.class);
if (stateful == null) {
LOG.debug("Component {} ({}) is not annotated with @Stateful; skipping state restoration", componentId, component.getClass().getSimpleName());
return;
}

final Set<Scope> supportedScopes = Set.of(stateful.scopes());
final StateManager stateManager = stateManagerProvider.getStateManager(componentId);

try {
if (supportedScopes.contains(Scope.CLUSTER) && componentState.getClusterState() != null && !componentState.getClusterState().isEmpty()) {
stateManager.setState(componentState.getClusterState(), Scope.CLUSTER);
LOG.debug("Restored cluster state for component {}", componentId);
}

if (supportedScopes.contains(Scope.LOCAL) && componentState.getLocalNodeStates() != null && !componentState.getLocalNodeStates().isEmpty()) {
final int localNodeOrdinal = context.getLocalNodeOrdinal();
if (localNodeOrdinal < 0) {
LOG.debug("Local node ordinal is not set; skipping local state restoration for component {}", componentId);
return;
}

final List<VersionedNodeState> localNodeStates = componentState.getLocalNodeStates();
final VersionedNodeState nodeState = localNodeOrdinal < localNodeStates.size() ? localNodeStates.get(localNodeOrdinal) : null;
final Map<String, String> localState = nodeState != null ? nodeState.getState() : null;
if (localState != null && !localState.isEmpty()) {
stateManager.setState(localState, Scope.LOCAL);
LOG.debug("Restored local state for component {} from node ordinal {}", componentId, localNodeOrdinal);
} else {
LOG.debug("No local state found for component {} at node ordinal {}", componentId, localNodeOrdinal);
}
}
} catch (final IOException e) {
throw new UncheckedIOException("Failed to restore state for component %s".formatted(componentId), e);
}
}

private record CreatedOrModifiedExtension(ComponentNode extension, Map<String, String> propertyValues) {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.nifi.flow.synchronization;

import org.apache.nifi.asset.AssetManager;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ProcessorNode;
Expand Down Expand Up @@ -45,6 +46,9 @@ public class VersionedFlowSynchronizationContext {
private final Function<ProcessorNode, ProcessContext> processContextFactory;
private final Function<ComponentNode, ConfigurationContext> configurationContextFactory;
private final AssetManager assetManager;
private final StateManagerProvider stateManagerProvider;
private final int localNodeOrdinal;
private final int connectedNodeCount;

private VersionedFlowSynchronizationContext(final Builder builder) {
this.componentIdGenerator = builder.componentIdGenerator;
Expand All @@ -57,6 +61,9 @@ private VersionedFlowSynchronizationContext(final Builder builder) {
this.processContextFactory = builder.processContextFactory;
this.configurationContextFactory = builder.configurationContextFactory;
this.assetManager = builder.assetManager;
this.stateManagerProvider = builder.stateManagerProvider;
this.localNodeOrdinal = builder.localNodeOrdinal;
this.connectedNodeCount = builder.connectedNodeCount;
}

public ComponentIdGenerator getComponentIdGenerator() {
Expand Down Expand Up @@ -99,6 +106,18 @@ public AssetManager getAssetManager() {
return assetManager;
}

public StateManagerProvider getStateManagerProvider() {
return stateManagerProvider;
}

public int getLocalNodeOrdinal() {
return localNodeOrdinal;
}

public int getConnectedNodeCount() {
return connectedNodeCount;
}

public static class Builder {
private ComponentIdGenerator componentIdGenerator;
private FlowManager flowManager;
Expand All @@ -110,6 +129,9 @@ public static class Builder {
private Function<ProcessorNode, ProcessContext> processContextFactory;
private Function<ComponentNode, ConfigurationContext> configurationContextFactory;
private AssetManager assetManager;
private StateManagerProvider stateManagerProvider;
private int localNodeOrdinal = -1;
private int connectedNodeCount = -1;

public Builder componentIdGenerator(final ComponentIdGenerator componentIdGenerator) {
this.componentIdGenerator = componentIdGenerator;
Expand Down Expand Up @@ -161,6 +183,21 @@ public Builder assetManager(final AssetManager assetManager) {
return this;
}

public Builder stateManagerProvider(final StateManagerProvider stateManagerProvider) {
this.stateManagerProvider = stateManagerProvider;
return this;
}

public Builder localNodeOrdinal(final int localNodeOrdinal) {
this.localNodeOrdinal = localNodeOrdinal;
return this;
}

public Builder connectedNodeCount(final int connectedNodeCount) {
this.connectedNodeCount = connectedNodeCount;
return this;
}

public VersionedFlowSynchronizationContext build() {
requireNonNull(componentIdGenerator, "Component ID Generator must be set");
requireNonNull(flowManager, "Flow Manager must be set");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4043,6 +4043,20 @@ public void verifyCanUpdate(final VersionedExternalFlow updatedFlow, final boole

private VersionedFlowSynchronizationContext createGroupSynchronizationContext(final ComponentIdGenerator componentIdGenerator, final ComponentScheduler componentScheduler,
final FlowMappingOptions flowMappingOptions) {
final int localNodeOrdinal;
final int connectedNodeCount;
if (nodeTypeProvider.isClustered()) {
final Set<String> members = nodeTypeProvider.getClusterMembers();
final List<String> sortedMembers = members.stream().sorted().toList();
final String currentNode = nodeTypeProvider.getCurrentNode().orElse(null);
final int idx = currentNode != null ? sortedMembers.indexOf(currentNode) : -1;
localNodeOrdinal = idx >= 0 ? idx : 0;
connectedNodeCount = Math.max(sortedMembers.size(), 1);
} else {
localNodeOrdinal = 0;
connectedNodeCount = 1;
}

return new VersionedFlowSynchronizationContext.Builder()
.componentIdGenerator(componentIdGenerator)
.flowManager(flowManager)
Expand All @@ -4054,6 +4068,9 @@ private VersionedFlowSynchronizationContext createGroupSynchronizationContext(fi
.processContextFactory(this::createProcessContext)
.configurationContextFactory(this::createConfigurationContext)
.assetManager(assetManager)
.stateManagerProvider(stateManagerProvider)
.localNodeOrdinal(localNodeOrdinal)
.connectedNodeCount(connectedNodeCount)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@
package org.apache.nifi.registry.flow.mapping;

import org.apache.commons.lang3.ClassUtils;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.asset.Asset;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.listen.ListenPortDefinition;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceDefinition;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
Expand Down Expand Up @@ -53,6 +58,7 @@
import org.apache.nifi.flow.PortType;
import org.apache.nifi.flow.Position;
import org.apache.nifi.flow.VersionedAsset;
import org.apache.nifi.flow.VersionedComponentState;
import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedFlowAnalysisRule;
Expand All @@ -61,6 +67,7 @@
import org.apache.nifi.flow.VersionedFunnel;
import org.apache.nifi.flow.VersionedLabel;
import org.apache.nifi.flow.VersionedListenPortDefinition;
import org.apache.nifi.flow.VersionedNodeState;
import org.apache.nifi.flow.VersionedParameter;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedParameterProvider;
Expand Down Expand Up @@ -92,6 +99,8 @@
import org.apache.nifi.remote.PublicPort;
import org.apache.nifi.remote.RemoteGroupPort;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -505,10 +514,54 @@ public VersionedControllerService mapControllerService(final ControllerServiceNo
versionedService.setPropertyDescriptors(mapPropertyDescriptors(controllerService, serviceProvider, includedGroupIds, externalControllerServiceReferences));
versionedService.setType(controllerService.getCanonicalClassName());
versionedService.setScheduledState(flowMappingOptions.getStateLookup().getState(controllerService));
versionedService.setComponentState(mapComponentState(controllerService));

return versionedService;
}

private VersionedComponentState mapComponentState(final ComponentNode componentNode) {
if (!flowMappingOptions.isMapComponentState()) {
return null;
}

final ConfigurableComponent component = componentNode.getComponent();
if (component == null) {
return null;
}

final Stateful stateful = component.getClass().getAnnotation(Stateful.class);
if (stateful == null) {
return null;
}

final String componentId = componentNode.getIdentifier();
final StateManager stateManager = flowMappingOptions.getStateManagerProvider().getStateManager(componentId);
final VersionedComponentState result = new VersionedComponentState();
boolean hasState = false;

try {
for (final Scope scope : stateful.scopes()) {
final StateMap stateMap = stateManager.getState(scope);
if (stateMap != null && !stateMap.toMap().isEmpty()) {
if (scope == Scope.CLUSTER) {
result.setClusterState(stateMap.toMap());
hasState = true;
} else if (scope == Scope.LOCAL) {
final int ordinal = flowMappingOptions.getLocalNodeOrdinal();
final List<VersionedNodeState> localStates = new ArrayList<>(Collections.nCopies(ordinal + 1, null));
localStates.set(ordinal, new VersionedNodeState(stateMap.toMap()));
result.setLocalNodeStates(localStates);
hasState = true;
}
}
}
} catch (final IOException e) {
throw new UncheckedIOException("Failed to retrieve state for component %s".formatted(componentId), e);
}

return hasState ? result : null;
}

private Map<String, String> mapProperties(final ComponentNode component, final ControllerServiceProvider serviceProvider) {
final Map<String, String> mapped = new HashMap<>();

Expand Down Expand Up @@ -774,6 +827,7 @@ public VersionedProcessor mapProcessor(final ProcessorNode procNode, final Contr
processor.setRetriedRelationships(procNode.getRetriedRelationships());
processor.setBackoffMechanism(procNode.getBackoffMechanism().name());
processor.setMaxBackoffPeriod(procNode.getMaxBackoffPeriod());
processor.setComponentState(mapComponentState(procNode));

return processor;
}
Expand Down
Loading
Loading