Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,8 @@ public ComponentAdditions addVersionedComponentsToProcessGroup(final ProcessGrou
final ProcessGroup newProcessGroup = addProcessGroup(group, processGroup, options.getComponentIdGenerator(),
additions.getParameterContexts(), additions.getParameterProviders(), group);
additionsBuilder.addProcessGroup(newProcessGroup);
} catch (final ProcessorInstantiationException pie) {
throw new RuntimeException(pie);
} catch (final ProcessorInstantiationException | FlowSynchronizationException e) {
throw new RuntimeException(e);
}
});

Expand Down Expand Up @@ -386,8 +386,8 @@ public void synchronize(final ProcessGroup group, final VersionedExternalFlow ve
final ProcessGroup topLevelGroup = syncOptions.getTopLevelGroupId() == null ? group : context.getFlowManager().getGroup(syncOptions.getTopLevelGroupId());
synchronize(group, versionedExternalFlow.getFlowContents(), versionedExternalFlow.getParameterContexts(),
parameterProviderReferences, topLevelGroup, syncOptions.isUpdateSettings());
} catch (final ProcessorInstantiationException pie) {
throw new RuntimeException(pie);
} catch (final ProcessorInstantiationException | FlowSynchronizationException e) {
throw new RuntimeException(e);
}
});

Expand Down Expand Up @@ -416,7 +416,7 @@ public void synchronize(final ProcessGroup group, final VersionedExternalFlow ve

private void synchronize(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, VersionedParameterContext> versionedParameterContexts,
final Map<String, ParameterProviderReference> parameterProviderReferences, final ProcessGroup topLevelGroup, final boolean updateGroupSettings)
throws ProcessorInstantiationException {
throws ProcessorInstantiationException, FlowSynchronizationException {

// Some components, such as Processors, may have a Scheduled State of RUNNING in the proposed flow. However, if we
// transition the service into the RUNNING state, and then we need to update a Connection that is connected to it,
Expand Down Expand Up @@ -691,7 +691,7 @@ private String determineRegistryId(final VersionedFlowCoordinates coordinates) {

private void synchronizeChildGroups(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, VersionedParameterContext> versionedParameterContexts,
final Map<String, ProcessGroup> childGroupsByVersionedId, final Map<String, ParameterProviderReference> parameterProviderReferences,
final ProcessGroup topLevelGroup) throws ProcessorInstantiationException {
final ProcessGroup topLevelGroup) throws ProcessorInstantiationException, FlowSynchronizationException {

for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) {
final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier());
Expand Down Expand Up @@ -1193,21 +1193,39 @@ private <C, V extends VersionedComponent> void removeMissingComponents(final Pro

private void synchronizeProcessors(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, ProcessorNode> processorsByVersionedId,
final ProcessGroup topLevelGroup)
throws ProcessorInstantiationException {
throws ProcessorInstantiationException, FlowSynchronizationException {

for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) {
final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier());
if (processor == null) {
final ProcessorNode added = addProcessor(group, proposedProcessor, context.getComponentIdGenerator(), topLevelGroup);
LOG.info("Added {} to {}", added, group);
} else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) {
updateProcessor(processor, proposedProcessor, topLevelGroup);
// Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state,
// so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state
createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(processor, getPropertyValues(processor)));
LOG.info("Updated {}", processor);
} else {
processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY()));
final Set<ProcessorNode> processorsToRestart = new HashSet<>();

try {
for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) {
final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier());
if (processor == null) {
final ProcessorNode added = addProcessor(group, proposedProcessor, context.getComponentIdGenerator(), topLevelGroup);
LOG.info("Added {} to {}", added, group);
} else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) {
final long processorStopDeadline = System.currentTimeMillis() + syncOptions.getComponentStopTimeout().toMillis();
try {
final boolean stopped = stopOrTerminate(processor, processorStopDeadline, syncOptions);
if (stopped && proposedProcessor.getScheduledState() == org.apache.nifi.flow.ScheduledState.RUNNING) {
processorsToRestart.add(processor);
}
} catch (final TimeoutException e) {
throw new FlowSynchronizationException("Failed to stop processor " + processor + " in preparation for update", e);
}
updateProcessor(processor, proposedProcessor, topLevelGroup);
// Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state,
// so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state
createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(processor, getPropertyValues(processor)));
LOG.info("Updated {}", processor);
} else {
processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY()));
}
}
} finally {
for (final ProcessorNode processor : processorsToRestart) {
processor.getProcessGroup().startProcessor(processor, false);
notifyScheduledStateChange((ComponentNode) processor, syncOptions, org.apache.nifi.flow.ScheduledState.RUNNING);
}
}
}
Expand Down Expand Up @@ -1379,7 +1397,8 @@ private void verifyCanInstantiateConnections(final ProcessGroup group, final Set

private ProcessGroup addProcessGroup(final ProcessGroup destination, final VersionedProcessGroup proposed, final ComponentIdGenerator componentIdGenerator,
final Map<String, VersionedParameterContext> versionedParameterContexts,
final Map<String, ParameterProviderReference> parameterProviderReferences, ProcessGroup topLevelGroup) throws ProcessorInstantiationException {
final Map<String, ParameterProviderReference> parameterProviderReferences, ProcessGroup topLevelGroup)
throws ProcessorInstantiationException, FlowSynchronizationException {
final String id = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), destination.getIdentifier());
final String connectorId = destination.getConnectorIdentifier().orElse(null);
final ProcessGroup group = context.getFlowManager().createProcessGroup(id, connectorId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.BackoffMechanism;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
Expand Down Expand Up @@ -577,6 +578,85 @@ public void testTerminateWhenProcessorDoesNotStop() throws FlowSynchronizationEx
verify(processorA, times(1)).setName(versionedProcessor.getName());
}

private ProcessorNode createMappableProcessor(final ProcessGroup processGroup) {
final ProcessorNode processor = createMockProcessor();
when(processor.getProcessGroup()).thenReturn(processGroup);
when(processor.getVersionedComponentId()).thenReturn(Optional.of(UUID.randomUUID().toString()));
when(processor.getBulletinLevel()).thenReturn(LogLevel.WARN);
when(processor.getCanonicalClassName()).thenReturn("org.apache.nifi.processors.Test");
when(processor.getAutoTerminatedRelationships()).thenReturn(Collections.emptySet());
when(processor.getMaxConcurrentTasks()).thenReturn(1);
when(processor.getExecutionNode()).thenReturn(ExecutionNode.ALL);
when(processor.getPenalizationPeriod()).thenReturn("30 sec");
when(processor.getPosition()).thenReturn(new org.apache.nifi.connectable.Position(0, 0));
when(processor.getRunDuration(TimeUnit.MILLISECONDS)).thenReturn(0L);
when(processor.getSchedulingPeriod()).thenReturn("0 sec");
when(processor.getSchedulingStrategy()).thenReturn(SchedulingStrategy.TIMER_DRIVEN);
when(processor.getYieldPeriod()).thenReturn("1 sec");
when(processor.getRetryCount()).thenReturn(10);
when(processor.getRetriedRelationships()).thenReturn(Collections.emptySet());
when(processor.getBackoffMechanism()).thenReturn(BackoffMechanism.PENALIZE_FLOWFILE);
when(processor.getMaxBackoffPeriod()).thenReturn("10 mins");
when(processor.getRelationships()).thenReturn(Collections.emptySet());
return processor;
}

@Test
public void testGroupSynchronizeStopsRunningProcessorBeforeUpdate() {
final ProcessGroup processGroup = createMockProcessGroup();
final ProcessorNode runningProcessor = createMappableProcessor(processGroup);
when(runningProcessor.isRunning()).thenReturn(true);
when(runningProcessor.getScheduledState()).thenReturn(org.apache.nifi.controller.ScheduledState.RUNNING);
when(processGroup.stopProcessor(runningProcessor)).thenReturn(CompletableFuture.completedFuture(null));
when(processGroup.getProcessors()).thenReturn(List.of(runningProcessor));

final VersionedProcessor versionedProcessor = createMinimalVersionedProcessor();
versionedProcessor.setIdentifier(runningProcessor.getVersionedComponentId().orElse(runningProcessor.getIdentifier()));
versionedProcessor.setProperties(Collections.singletonMap("abc", "updated-value"));
versionedProcessor.setScheduledState(ScheduledState.RUNNING);

final VersionedProcessGroup versionedGroup = new VersionedProcessGroup();
versionedGroup.setIdentifier("pg-v1");
versionedGroup.setProcessors(Set.of(versionedProcessor));

final VersionedExternalFlow externalFlow = new VersionedExternalFlow();
externalFlow.setFlowContents(versionedGroup);

assertDoesNotThrow(() -> synchronizer.synchronize(processGroup, externalFlow, synchronizationOptions));

verify(processGroup, atLeast(1)).stopProcessor(runningProcessor);
verify(runningProcessor).setProperties(eq(Collections.singletonMap("abc", "updated-value")), eq(true), anySet());
verify(processGroup, atLeast(1)).startProcessor(runningProcessor, false);
}

@Test
public void testGroupSynchronizeDoesNotRestartProcessorWhenProposedStateNotRunning() {
final ProcessGroup processGroup = createMockProcessGroup();
final ProcessorNode runningProcessor = createMappableProcessor(processGroup);
when(runningProcessor.isRunning()).thenReturn(true);
when(runningProcessor.getScheduledState()).thenReturn(org.apache.nifi.controller.ScheduledState.RUNNING);
when(processGroup.stopProcessor(runningProcessor)).thenReturn(CompletableFuture.completedFuture(null));
when(processGroup.getProcessors()).thenReturn(List.of(runningProcessor));

final VersionedProcessor versionedProcessor = createMinimalVersionedProcessor();
versionedProcessor.setIdentifier(runningProcessor.getVersionedComponentId().orElse(runningProcessor.getIdentifier()));
versionedProcessor.setProperties(Collections.singletonMap("abc", "updated-value"));
versionedProcessor.setScheduledState(ScheduledState.ENABLED);

final VersionedProcessGroup versionedGroup = new VersionedProcessGroup();
versionedGroup.setIdentifier("pg-v1");
versionedGroup.setProcessors(Set.of(versionedProcessor));

final VersionedExternalFlow externalFlow = new VersionedExternalFlow();
externalFlow.setFlowContents(versionedGroup);

assertDoesNotThrow(() -> synchronizer.synchronize(processGroup, externalFlow, synchronizationOptions));

verify(processGroup, atLeast(1)).stopProcessor(runningProcessor);
verify(runningProcessor).setProperties(eq(Collections.singletonMap("abc", "updated-value")), eq(true), anySet());
verify(processGroup, never()).startProcessor(runningProcessor, false);
}

@Test
public void testUpdateConnectionWithSourceDestStopped() throws FlowSynchronizationException, TimeoutException {
final VersionedConnection versionedConnection = createMinimalVersionedConnection(processorA, processorB);
Expand Down
Loading