From 418c2d88d4db0cc7c2266083dc1569a5ee7bec02 Mon Sep 17 00:00:00 2001 From: Kevin Doran Date: Mon, 6 Apr 2026 21:14:12 -0400 Subject: [PATCH 1/2] NIFI-15801 Stop processors in synchronizeProcessors before updating synchronizeProcessors() calls updateProcessor() which requires the processor to be stopped (setAnnotationData throws IllegalStateException if isRunning). The NiFi REST path stops affected processors before calling synchronizeFlow, but the Connector applyUpdate path does not, exposing this gap. Stop processors via stopOrTerminate before updateProcessor, matching the single-processor synchronize(ProcessorNode, ...) path. Track stopped processors and restart them in a finally block, matching the pattern used in synchronizeProcessGroupSettings. --- ...tandardVersionedComponentSynchronizer.java | 44 +++++++--- ...ardVersionedComponentSynchronizerTest.java | 80 +++++++++++++++++++ 2 files changed, 111 insertions(+), 13 deletions(-) diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java index 092d2f7e7bb9..675eb8fe6f08 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java @@ -1195,19 +1195,37 @@ private void synchronizeProcessors(final ProcessGroup group, final VersionedProc final ProcessGroup topLevelGroup) throws ProcessorInstantiationException { - for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) { - final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier()); - if (processor == null) { - final ProcessorNode added = addProcessor(group, proposedProcessor, context.getComponentIdGenerator(), topLevelGroup); - LOG.info("Added {} to {}", added, group); - } else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) { - updateProcessor(processor, proposedProcessor, topLevelGroup); - // Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state, - // so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state - createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(processor, getPropertyValues(processor))); - LOG.info("Updated {}", processor); - } else { - processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY())); + final Set stoppedProcessors = new HashSet<>(); + + try { + for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) { + final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier()); + if (processor == null) { + final ProcessorNode added = addProcessor(group, proposedProcessor, context.getComponentIdGenerator(), topLevelGroup); + LOG.info("Added {} to {}", added, group); + } else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) { + final long processorStopDeadline = System.currentTimeMillis() + syncOptions.getComponentStopTimeout().toMillis(); + try { + final boolean stopped = stopOrTerminate(processor, processorStopDeadline, syncOptions); + if (stopped && proposedProcessor.getScheduledState() == org.apache.nifi.flow.ScheduledState.RUNNING) { + stoppedProcessors.add(processor); + } + } catch (final TimeoutException | FlowSynchronizationException e) { + throw new ProcessorInstantiationException(processor.getIdentifier(), e); + } + updateProcessor(processor, proposedProcessor, topLevelGroup); + // Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state, + // so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state + createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(processor, getPropertyValues(processor))); + LOG.info("Updated {}", processor); + } else { + processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY())); + } + } + } finally { + for (final ProcessorNode processor : stoppedProcessors) { + processor.getProcessGroup().startProcessor(processor, false); + notifyScheduledStateChange((ComponentNode) processor, syncOptions, org.apache.nifi.flow.ScheduledState.RUNNING); } } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java index 1dd5bd17f7f1..d07309b333f1 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java @@ -25,6 +25,7 @@ import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Port; +import org.apache.nifi.controller.BackoffMechanism; import org.apache.nifi.controller.ComponentNode; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerServiceInitializationContext; @@ -577,6 +578,85 @@ public void testTerminateWhenProcessorDoesNotStop() throws FlowSynchronizationEx verify(processorA, times(1)).setName(versionedProcessor.getName()); } + private ProcessorNode createMappableProcessor(final ProcessGroup processGroup) { + final ProcessorNode processor = createMockProcessor(); + when(processor.getProcessGroup()).thenReturn(processGroup); + when(processor.getVersionedComponentId()).thenReturn(Optional.of(UUID.randomUUID().toString())); + when(processor.getBulletinLevel()).thenReturn(LogLevel.WARN); + when(processor.getCanonicalClassName()).thenReturn("org.apache.nifi.processors.Test"); + when(processor.getAutoTerminatedRelationships()).thenReturn(Collections.emptySet()); + when(processor.getMaxConcurrentTasks()).thenReturn(1); + when(processor.getExecutionNode()).thenReturn(ExecutionNode.ALL); + when(processor.getPenalizationPeriod()).thenReturn("30 sec"); + when(processor.getPosition()).thenReturn(new org.apache.nifi.connectable.Position(0, 0)); + when(processor.getRunDuration(TimeUnit.MILLISECONDS)).thenReturn(0L); + when(processor.getSchedulingPeriod()).thenReturn("0 sec"); + when(processor.getSchedulingStrategy()).thenReturn(SchedulingStrategy.TIMER_DRIVEN); + when(processor.getYieldPeriod()).thenReturn("1 sec"); + when(processor.getRetryCount()).thenReturn(10); + when(processor.getRetriedRelationships()).thenReturn(Collections.emptySet()); + when(processor.getBackoffMechanism()).thenReturn(BackoffMechanism.PENALIZE_FLOWFILE); + when(processor.getMaxBackoffPeriod()).thenReturn("10 mins"); + when(processor.getRelationships()).thenReturn(Collections.emptySet()); + return processor; + } + + @Test + public void testGroupSynchronizeStopsRunningProcessorBeforeUpdate() { + final ProcessGroup processGroup = createMockProcessGroup(); + final ProcessorNode runningProcessor = createMappableProcessor(processGroup); + when(runningProcessor.isRunning()).thenReturn(true); + when(runningProcessor.getScheduledState()).thenReturn(org.apache.nifi.controller.ScheduledState.RUNNING); + when(processGroup.stopProcessor(runningProcessor)).thenReturn(CompletableFuture.completedFuture(null)); + when(processGroup.getProcessors()).thenReturn(List.of(runningProcessor)); + + final VersionedProcessor versionedProcessor = createMinimalVersionedProcessor(); + versionedProcessor.setIdentifier(runningProcessor.getVersionedComponentId().orElse(runningProcessor.getIdentifier())); + versionedProcessor.setProperties(Collections.singletonMap("abc", "updated-value")); + versionedProcessor.setScheduledState(ScheduledState.RUNNING); + + final VersionedProcessGroup versionedGroup = new VersionedProcessGroup(); + versionedGroup.setIdentifier("pg-v1"); + versionedGroup.setProcessors(Set.of(versionedProcessor)); + + final VersionedExternalFlow externalFlow = new VersionedExternalFlow(); + externalFlow.setFlowContents(versionedGroup); + + assertDoesNotThrow(() -> synchronizer.synchronize(processGroup, externalFlow, synchronizationOptions)); + + verify(processGroup, atLeast(1)).stopProcessor(runningProcessor); + verify(runningProcessor).setProperties(eq(Collections.singletonMap("abc", "updated-value")), eq(true), anySet()); + verify(processGroup, atLeast(1)).startProcessor(runningProcessor, false); + } + + @Test + public void testGroupSynchronizeDoesNotRestartProcessorWhenProposedStateNotRunning() { + final ProcessGroup processGroup = createMockProcessGroup(); + final ProcessorNode runningProcessor = createMappableProcessor(processGroup); + when(runningProcessor.isRunning()).thenReturn(true); + when(runningProcessor.getScheduledState()).thenReturn(org.apache.nifi.controller.ScheduledState.RUNNING); + when(processGroup.stopProcessor(runningProcessor)).thenReturn(CompletableFuture.completedFuture(null)); + when(processGroup.getProcessors()).thenReturn(List.of(runningProcessor)); + + final VersionedProcessor versionedProcessor = createMinimalVersionedProcessor(); + versionedProcessor.setIdentifier(runningProcessor.getVersionedComponentId().orElse(runningProcessor.getIdentifier())); + versionedProcessor.setProperties(Collections.singletonMap("abc", "updated-value")); + versionedProcessor.setScheduledState(ScheduledState.ENABLED); + + final VersionedProcessGroup versionedGroup = new VersionedProcessGroup(); + versionedGroup.setIdentifier("pg-v1"); + versionedGroup.setProcessors(Set.of(versionedProcessor)); + + final VersionedExternalFlow externalFlow = new VersionedExternalFlow(); + externalFlow.setFlowContents(versionedGroup); + + assertDoesNotThrow(() -> synchronizer.synchronize(processGroup, externalFlow, synchronizationOptions)); + + verify(processGroup, atLeast(1)).stopProcessor(runningProcessor); + verify(runningProcessor).setProperties(eq(Collections.singletonMap("abc", "updated-value")), eq(true), anySet()); + verify(processGroup, never()).startProcessor(runningProcessor, false); + } + @Test public void testUpdateConnectionWithSourceDestStopped() throws FlowSynchronizationException, TimeoutException { final VersionedConnection versionedConnection = createMinimalVersionedConnection(processorA, processorB); From 7fad80428a2549b02afc369c3e9e524c16d71fc7 Mon Sep 17 00:00:00 2001 From: Kevin Doran Date: Mon, 13 Apr 2026 13:17:32 -0400 Subject: [PATCH 2/2] Address peer review feedback --- ...tandardVersionedComponentSynchronizer.java | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java index 675eb8fe6f08..a9a8eb70a953 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java @@ -270,8 +270,8 @@ public ComponentAdditions addVersionedComponentsToProcessGroup(final ProcessGrou final ProcessGroup newProcessGroup = addProcessGroup(group, processGroup, options.getComponentIdGenerator(), additions.getParameterContexts(), additions.getParameterProviders(), group); additionsBuilder.addProcessGroup(newProcessGroup); - } catch (final ProcessorInstantiationException pie) { - throw new RuntimeException(pie); + } catch (final ProcessorInstantiationException | FlowSynchronizationException e) { + throw new RuntimeException(e); } }); @@ -386,8 +386,8 @@ public void synchronize(final ProcessGroup group, final VersionedExternalFlow ve final ProcessGroup topLevelGroup = syncOptions.getTopLevelGroupId() == null ? group : context.getFlowManager().getGroup(syncOptions.getTopLevelGroupId()); synchronize(group, versionedExternalFlow.getFlowContents(), versionedExternalFlow.getParameterContexts(), parameterProviderReferences, topLevelGroup, syncOptions.isUpdateSettings()); - } catch (final ProcessorInstantiationException pie) { - throw new RuntimeException(pie); + } catch (final ProcessorInstantiationException | FlowSynchronizationException e) { + throw new RuntimeException(e); } }); @@ -416,7 +416,7 @@ public void synchronize(final ProcessGroup group, final VersionedExternalFlow ve private void synchronize(final ProcessGroup group, final VersionedProcessGroup proposed, final Map versionedParameterContexts, final Map parameterProviderReferences, final ProcessGroup topLevelGroup, final boolean updateGroupSettings) - throws ProcessorInstantiationException { + throws ProcessorInstantiationException, FlowSynchronizationException { // Some components, such as Processors, may have a Scheduled State of RUNNING in the proposed flow. However, if we // transition the service into the RUNNING state, and then we need to update a Connection that is connected to it, @@ -691,7 +691,7 @@ private String determineRegistryId(final VersionedFlowCoordinates coordinates) { private void synchronizeChildGroups(final ProcessGroup group, final VersionedProcessGroup proposed, final Map versionedParameterContexts, final Map childGroupsByVersionedId, final Map parameterProviderReferences, - final ProcessGroup topLevelGroup) throws ProcessorInstantiationException { + final ProcessGroup topLevelGroup) throws ProcessorInstantiationException, FlowSynchronizationException { for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) { final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier()); @@ -1193,9 +1193,9 @@ private void removeMissingComponents(final Pro private void synchronizeProcessors(final ProcessGroup group, final VersionedProcessGroup proposed, final Map processorsByVersionedId, final ProcessGroup topLevelGroup) - throws ProcessorInstantiationException { + throws ProcessorInstantiationException, FlowSynchronizationException { - final Set stoppedProcessors = new HashSet<>(); + final Set processorsToRestart = new HashSet<>(); try { for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) { @@ -1208,10 +1208,10 @@ private void synchronizeProcessors(final ProcessGroup group, final VersionedProc try { final boolean stopped = stopOrTerminate(processor, processorStopDeadline, syncOptions); if (stopped && proposedProcessor.getScheduledState() == org.apache.nifi.flow.ScheduledState.RUNNING) { - stoppedProcessors.add(processor); + processorsToRestart.add(processor); } - } catch (final TimeoutException | FlowSynchronizationException e) { - throw new ProcessorInstantiationException(processor.getIdentifier(), e); + } catch (final TimeoutException e) { + throw new FlowSynchronizationException("Failed to stop processor " + processor + " in preparation for update", e); } updateProcessor(processor, proposedProcessor, topLevelGroup); // Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state, @@ -1223,7 +1223,7 @@ private void synchronizeProcessors(final ProcessGroup group, final VersionedProc } } } finally { - for (final ProcessorNode processor : stoppedProcessors) { + for (final ProcessorNode processor : processorsToRestart) { processor.getProcessGroup().startProcessor(processor, false); notifyScheduledStateChange((ComponentNode) processor, syncOptions, org.apache.nifi.flow.ScheduledState.RUNNING); } @@ -1397,7 +1397,8 @@ private void verifyCanInstantiateConnections(final ProcessGroup group, final Set private ProcessGroup addProcessGroup(final ProcessGroup destination, final VersionedProcessGroup proposed, final ComponentIdGenerator componentIdGenerator, final Map versionedParameterContexts, - final Map parameterProviderReferences, ProcessGroup topLevelGroup) throws ProcessorInstantiationException { + final Map parameterProviderReferences, ProcessGroup topLevelGroup) + throws ProcessorInstantiationException, FlowSynchronizationException { final String id = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), destination.getIdentifier()); final String connectorId = destination.getConnectorIdentifier().orElse(null); final ProcessGroup group = context.getFlowManager().createProcessGroup(id, connectorId);