diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java index 092d2f7e7bb9..a9a8eb70a953 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java @@ -270,8 +270,8 @@ public ComponentAdditions addVersionedComponentsToProcessGroup(final ProcessGrou final ProcessGroup newProcessGroup = addProcessGroup(group, processGroup, options.getComponentIdGenerator(), additions.getParameterContexts(), additions.getParameterProviders(), group); additionsBuilder.addProcessGroup(newProcessGroup); - } catch (final ProcessorInstantiationException pie) { - throw new RuntimeException(pie); + } catch (final ProcessorInstantiationException | FlowSynchronizationException e) { + throw new RuntimeException(e); } }); @@ -386,8 +386,8 @@ public void synchronize(final ProcessGroup group, final VersionedExternalFlow ve final ProcessGroup topLevelGroup = syncOptions.getTopLevelGroupId() == null ? group : context.getFlowManager().getGroup(syncOptions.getTopLevelGroupId()); synchronize(group, versionedExternalFlow.getFlowContents(), versionedExternalFlow.getParameterContexts(), parameterProviderReferences, topLevelGroup, syncOptions.isUpdateSettings()); - } catch (final ProcessorInstantiationException pie) { - throw new RuntimeException(pie); + } catch (final ProcessorInstantiationException | FlowSynchronizationException e) { + throw new RuntimeException(e); } }); @@ -416,7 +416,7 @@ public void synchronize(final ProcessGroup group, final VersionedExternalFlow ve private void synchronize(final ProcessGroup group, final VersionedProcessGroup proposed, final Map versionedParameterContexts, final Map parameterProviderReferences, final ProcessGroup topLevelGroup, final boolean updateGroupSettings) - throws ProcessorInstantiationException { + throws ProcessorInstantiationException, FlowSynchronizationException { // Some components, such as Processors, may have a Scheduled State of RUNNING in the proposed flow. However, if we // transition the service into the RUNNING state, and then we need to update a Connection that is connected to it, @@ -691,7 +691,7 @@ private String determineRegistryId(final VersionedFlowCoordinates coordinates) { private void synchronizeChildGroups(final ProcessGroup group, final VersionedProcessGroup proposed, final Map versionedParameterContexts, final Map childGroupsByVersionedId, final Map parameterProviderReferences, - final ProcessGroup topLevelGroup) throws ProcessorInstantiationException { + final ProcessGroup topLevelGroup) throws ProcessorInstantiationException, FlowSynchronizationException { for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) { final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier()); @@ -1193,21 +1193,39 @@ private void removeMissingComponents(final Pro private void synchronizeProcessors(final ProcessGroup group, final VersionedProcessGroup proposed, final Map processorsByVersionedId, final ProcessGroup topLevelGroup) - throws ProcessorInstantiationException { + throws ProcessorInstantiationException, FlowSynchronizationException { - for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) { - final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier()); - if (processor == null) { - final ProcessorNode added = addProcessor(group, proposedProcessor, context.getComponentIdGenerator(), topLevelGroup); - LOG.info("Added {} to {}", added, group); - } else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) { - updateProcessor(processor, proposedProcessor, topLevelGroup); - // Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state, - // so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state - createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(processor, getPropertyValues(processor))); - LOG.info("Updated {}", processor); - } else { - processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY())); + final Set processorsToRestart = new HashSet<>(); + + try { + for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) { + final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier()); + if (processor == null) { + final ProcessorNode added = addProcessor(group, proposedProcessor, context.getComponentIdGenerator(), topLevelGroup); + LOG.info("Added {} to {}", added, group); + } else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) { + final long processorStopDeadline = System.currentTimeMillis() + syncOptions.getComponentStopTimeout().toMillis(); + try { + final boolean stopped = stopOrTerminate(processor, processorStopDeadline, syncOptions); + if (stopped && proposedProcessor.getScheduledState() == org.apache.nifi.flow.ScheduledState.RUNNING) { + processorsToRestart.add(processor); + } + } catch (final TimeoutException e) { + throw new FlowSynchronizationException("Failed to stop processor " + processor + " in preparation for update", e); + } + updateProcessor(processor, proposedProcessor, topLevelGroup); + // Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state, + // so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state + createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(processor, getPropertyValues(processor))); + LOG.info("Updated {}", processor); + } else { + processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY())); + } + } + } finally { + for (final ProcessorNode processor : processorsToRestart) { + processor.getProcessGroup().startProcessor(processor, false); + notifyScheduledStateChange((ComponentNode) processor, syncOptions, org.apache.nifi.flow.ScheduledState.RUNNING); } } } @@ -1379,7 +1397,8 @@ private void verifyCanInstantiateConnections(final ProcessGroup group, final Set private ProcessGroup addProcessGroup(final ProcessGroup destination, final VersionedProcessGroup proposed, final ComponentIdGenerator componentIdGenerator, final Map versionedParameterContexts, - final Map parameterProviderReferences, ProcessGroup topLevelGroup) throws ProcessorInstantiationException { + final Map parameterProviderReferences, ProcessGroup topLevelGroup) + throws ProcessorInstantiationException, FlowSynchronizationException { final String id = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), destination.getIdentifier()); final String connectorId = destination.getConnectorIdentifier().orElse(null); final ProcessGroup group = context.getFlowManager().createProcessGroup(id, connectorId); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java index 1dd5bd17f7f1..d07309b333f1 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java @@ -25,6 +25,7 @@ import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Port; +import org.apache.nifi.controller.BackoffMechanism; import org.apache.nifi.controller.ComponentNode; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerServiceInitializationContext; @@ -577,6 +578,85 @@ public void testTerminateWhenProcessorDoesNotStop() throws FlowSynchronizationEx verify(processorA, times(1)).setName(versionedProcessor.getName()); } + private ProcessorNode createMappableProcessor(final ProcessGroup processGroup) { + final ProcessorNode processor = createMockProcessor(); + when(processor.getProcessGroup()).thenReturn(processGroup); + when(processor.getVersionedComponentId()).thenReturn(Optional.of(UUID.randomUUID().toString())); + when(processor.getBulletinLevel()).thenReturn(LogLevel.WARN); + when(processor.getCanonicalClassName()).thenReturn("org.apache.nifi.processors.Test"); + when(processor.getAutoTerminatedRelationships()).thenReturn(Collections.emptySet()); + when(processor.getMaxConcurrentTasks()).thenReturn(1); + when(processor.getExecutionNode()).thenReturn(ExecutionNode.ALL); + when(processor.getPenalizationPeriod()).thenReturn("30 sec"); + when(processor.getPosition()).thenReturn(new org.apache.nifi.connectable.Position(0, 0)); + when(processor.getRunDuration(TimeUnit.MILLISECONDS)).thenReturn(0L); + when(processor.getSchedulingPeriod()).thenReturn("0 sec"); + when(processor.getSchedulingStrategy()).thenReturn(SchedulingStrategy.TIMER_DRIVEN); + when(processor.getYieldPeriod()).thenReturn("1 sec"); + when(processor.getRetryCount()).thenReturn(10); + when(processor.getRetriedRelationships()).thenReturn(Collections.emptySet()); + when(processor.getBackoffMechanism()).thenReturn(BackoffMechanism.PENALIZE_FLOWFILE); + when(processor.getMaxBackoffPeriod()).thenReturn("10 mins"); + when(processor.getRelationships()).thenReturn(Collections.emptySet()); + return processor; + } + + @Test + public void testGroupSynchronizeStopsRunningProcessorBeforeUpdate() { + final ProcessGroup processGroup = createMockProcessGroup(); + final ProcessorNode runningProcessor = createMappableProcessor(processGroup); + when(runningProcessor.isRunning()).thenReturn(true); + when(runningProcessor.getScheduledState()).thenReturn(org.apache.nifi.controller.ScheduledState.RUNNING); + when(processGroup.stopProcessor(runningProcessor)).thenReturn(CompletableFuture.completedFuture(null)); + when(processGroup.getProcessors()).thenReturn(List.of(runningProcessor)); + + final VersionedProcessor versionedProcessor = createMinimalVersionedProcessor(); + versionedProcessor.setIdentifier(runningProcessor.getVersionedComponentId().orElse(runningProcessor.getIdentifier())); + versionedProcessor.setProperties(Collections.singletonMap("abc", "updated-value")); + versionedProcessor.setScheduledState(ScheduledState.RUNNING); + + final VersionedProcessGroup versionedGroup = new VersionedProcessGroup(); + versionedGroup.setIdentifier("pg-v1"); + versionedGroup.setProcessors(Set.of(versionedProcessor)); + + final VersionedExternalFlow externalFlow = new VersionedExternalFlow(); + externalFlow.setFlowContents(versionedGroup); + + assertDoesNotThrow(() -> synchronizer.synchronize(processGroup, externalFlow, synchronizationOptions)); + + verify(processGroup, atLeast(1)).stopProcessor(runningProcessor); + verify(runningProcessor).setProperties(eq(Collections.singletonMap("abc", "updated-value")), eq(true), anySet()); + verify(processGroup, atLeast(1)).startProcessor(runningProcessor, false); + } + + @Test + public void testGroupSynchronizeDoesNotRestartProcessorWhenProposedStateNotRunning() { + final ProcessGroup processGroup = createMockProcessGroup(); + final ProcessorNode runningProcessor = createMappableProcessor(processGroup); + when(runningProcessor.isRunning()).thenReturn(true); + when(runningProcessor.getScheduledState()).thenReturn(org.apache.nifi.controller.ScheduledState.RUNNING); + when(processGroup.stopProcessor(runningProcessor)).thenReturn(CompletableFuture.completedFuture(null)); + when(processGroup.getProcessors()).thenReturn(List.of(runningProcessor)); + + final VersionedProcessor versionedProcessor = createMinimalVersionedProcessor(); + versionedProcessor.setIdentifier(runningProcessor.getVersionedComponentId().orElse(runningProcessor.getIdentifier())); + versionedProcessor.setProperties(Collections.singletonMap("abc", "updated-value")); + versionedProcessor.setScheduledState(ScheduledState.ENABLED); + + final VersionedProcessGroup versionedGroup = new VersionedProcessGroup(); + versionedGroup.setIdentifier("pg-v1"); + versionedGroup.setProcessors(Set.of(versionedProcessor)); + + final VersionedExternalFlow externalFlow = new VersionedExternalFlow(); + externalFlow.setFlowContents(versionedGroup); + + assertDoesNotThrow(() -> synchronizer.synchronize(processGroup, externalFlow, synchronizationOptions)); + + verify(processGroup, atLeast(1)).stopProcessor(runningProcessor); + verify(runningProcessor).setProperties(eq(Collections.singletonMap("abc", "updated-value")), eq(true), anySet()); + verify(processGroup, never()).startProcessor(runningProcessor, false); + } + @Test public void testUpdateConnectionWithSourceDestStopped() throws FlowSynchronizationException, TimeoutException { final VersionedConnection versionedConnection = createMinimalVersionedConnection(processorA, processorB);