Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
From 3893ce8d059f7d9365ba48b3f5e4f898701b71e8 Mon Sep 17 00:00:00 2001
From: Andrew Kenworthy <andrew.kenworthy@stackable.tech>
Date: Thu, 30 Apr 2026 18:05:11 +0200
Subject: check services are disabled before calling updateControllerService

---
...tandardVersionedComponentSynchronizer.java | 54 ++++++++++++++-----
1 file changed, 40 insertions(+), 14 deletions(-)

diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
index b79ee4d6e8..d5b4d8c36e 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
@@ -269,8 +269,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
final ProcessGroup newProcessGroup = addProcessGroup(group, processGroup, options.getComponentIdGenerator(),
additions.getParameterContexts(), additions.getParameterProviders(), group);
additionsBuilder.addProcessGroup(newProcessGroup);
- } catch (final ProcessorInstantiationException pie) {
- throw new RuntimeException(pie);
+ } catch (final ProcessorInstantiationException | FlowSynchronizationException e) {
+ throw new RuntimeException(e);
}
});

@@ -392,8 +392,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
final ProcessGroup topLevelGroup = syncOptions.getTopLevelGroupId() == null ? group : context.getFlowManager().getGroup(syncOptions.getTopLevelGroupId());
synchronize(group, versionedExternalFlow.getFlowContents(), versionedExternalFlow.getParameterContexts(),
parameterProviderReferences, topLevelGroup, syncOptions.isUpdateSettings());
- } catch (final ProcessorInstantiationException pie) {
- throw new RuntimeException(pie);
+ } catch (final ProcessorInstantiationException | FlowSynchronizationException e) {
+ throw new RuntimeException(e);
}
});

@@ -422,7 +422,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen

private void synchronize(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, VersionedParameterContext> versionedParameterContexts,
final Map<String, ParameterProviderReference> parameterProviderReferences, final ProcessGroup topLevelGroup, final boolean updateGroupSettings)
- throws ProcessorInstantiationException {
+ throws ProcessorInstantiationException, FlowSynchronizationException {

// Some components, such as Processors, may have a Scheduled State of RUNNING in the proposed flow. However, if we
// transition the service into the RUNNING state, and then we need to update a Connection that is connected to it,
@@ -687,7 +687,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen

private void synchronizeChildGroups(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, VersionedParameterContext> versionedParameterContexts,
final Map<String, ProcessGroup> childGroupsByVersionedId, final Map<String, ParameterProviderReference> parameterProviderReferences,
- final ProcessGroup topLevelGroup) throws ProcessorInstantiationException {
+ final ProcessGroup topLevelGroup) throws ProcessorInstantiationException, FlowSynchronizationException {

for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) {
final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier());
@@ -707,7 +707,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
}

private void synchronizeControllerServices(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, ControllerServiceNode> servicesByVersionedId,
- final ProcessGroup topLevelGroup) {
+ final ProcessGroup topLevelGroup) throws FlowSynchronizationException {
// Controller Services have to be handled a bit differently than other components. This is because Processors and Controller
// Services may reference other Controller Services. Since we may be adding Service A, which depends on Service B, before adding
// Service B, we need to ensure that we create all Controller Services first and then call updateControllerService for each
@@ -741,17 +741,43 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
updateControllerService(addedService, proposedService, topLevelGroup);
}

- // Update all of the Controller Services to match the VersionedControllerService
+ // Update all Controller Services to match the VersionedControllerService.
+ // Services may be ENABLED here if the outer "affected components" pass did not
+ // disable them (e.g. COMPONENT_ADDED diffs are skipped by AffectedComponentSet).
+ // We must disable before calling updateControllerService, which calls setProperties
+ // which calls verifyModifiable and throws IllegalStateException on ENABLED services.
+ final long stopTimeout = System.currentTimeMillis() + syncOptions.getComponentStopTimeout().toMillis();
for (final Map.Entry<ControllerServiceNode, VersionedControllerService> entry : services.entrySet()) {
final ControllerServiceNode service = entry.getKey();
final VersionedControllerService proposedService = entry.getValue();

if (updatedVersionedComponentIds.contains(proposedService.getIdentifier())) {
- updateControllerService(service, proposedService, topLevelGroup);
- // Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state,
- // so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state
- createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service)));
- LOG.info("Updated {}", service);
+ final Set<ComponentNode> referencesToRestart = new HashSet<>();
+ final Set<ControllerServiceNode> servicesToRestart = new HashSet<>();
+
+ try {
+ try {
+ stopControllerService(service, proposedService, stopTimeout,
+ syncOptions.getComponentStopTimeoutAction(),
+ referencesToRestart, servicesToRestart, syncOptions);
+ } catch (final TimeoutException e) {
+ throw new FlowSynchronizationException("Failed to stop Controller Service " + service + " in preparation for update", e);
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new FlowSynchronizationException("Interrupted while stopping Controller Service " + service, e);
+ }
+ updateControllerService(service, proposedService, topLevelGroup);
+ createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service)));
+ LOG.info("Updated {}", service);
+ } finally {
+ // Re-enable services and restart components that were stopped for the update,
+ // restoring the controller to its pre-update running state.
+ if (proposedService.getScheduledState() != org.apache.nifi.flow.ScheduledState.DISABLED) {
+ context.getControllerServiceProvider().enableControllerServicesAsync(servicesToRestart);
+ context.getControllerServiceProvider().scheduleReferencingComponents(
+ service, referencesToRestart, context.getComponentScheduler());
+ }
+ }
}
}

@@ -1375,7 +1401,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen

private ProcessGroup addProcessGroup(final ProcessGroup destination, final VersionedProcessGroup proposed, final ComponentIdGenerator componentIdGenerator,
final Map<String, VersionedParameterContext> versionedParameterContexts,
- final Map<String, ParameterProviderReference> parameterProviderReferences, ProcessGroup topLevelGroup) throws ProcessorInstantiationException {
+ final Map<String, ParameterProviderReference> parameterProviderReferences, ProcessGroup topLevelGroup) throws ProcessorInstantiationException, FlowSynchronizationException {
final String id = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), destination.getIdentifier());
final ProcessGroup group = context.getFlowManager().createProcessGroup(id);
group.setVersionedComponentId(proposed.getIdentifier());
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
From 8f7007ac0ac2f83e27666d865727335156b5053b Mon Sep 17 00:00:00 2001
From: Andrew Kenworthy <andrew.kenworthy@stackable.tech>
Date: Wed, 29 Apr 2026 17:32:02 +0200
Subject: check services are disabled before calling updateControllerService

---
...tandardVersionedComponentSynchronizer.java | 54 ++++++++++++++-----
1 file changed, 40 insertions(+), 14 deletions(-)

diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
index 092d2f7e7b..7cbcc8c6dc 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
@@ -270,8 +270,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
final ProcessGroup newProcessGroup = addProcessGroup(group, processGroup, options.getComponentIdGenerator(),
additions.getParameterContexts(), additions.getParameterProviders(), group);
additionsBuilder.addProcessGroup(newProcessGroup);
- } catch (final ProcessorInstantiationException pie) {
- throw new RuntimeException(pie);
+ } catch (final ProcessorInstantiationException | FlowSynchronizationException e) {
+ throw new RuntimeException(e);
}
});

@@ -386,8 +386,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
final ProcessGroup topLevelGroup = syncOptions.getTopLevelGroupId() == null ? group : context.getFlowManager().getGroup(syncOptions.getTopLevelGroupId());
synchronize(group, versionedExternalFlow.getFlowContents(), versionedExternalFlow.getParameterContexts(),
parameterProviderReferences, topLevelGroup, syncOptions.isUpdateSettings());
- } catch (final ProcessorInstantiationException pie) {
- throw new RuntimeException(pie);
+ } catch (final ProcessorInstantiationException | FlowSynchronizationException e) {
+ throw new RuntimeException(e);
}
});

@@ -416,7 +416,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen

private void synchronize(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, VersionedParameterContext> versionedParameterContexts,
final Map<String, ParameterProviderReference> parameterProviderReferences, final ProcessGroup topLevelGroup, final boolean updateGroupSettings)
- throws ProcessorInstantiationException {
+ throws ProcessorInstantiationException, FlowSynchronizationException {

// Some components, such as Processors, may have a Scheduled State of RUNNING in the proposed flow. However, if we
// transition the service into the RUNNING state, and then we need to update a Connection that is connected to it,
@@ -691,7 +691,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen

private void synchronizeChildGroups(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, VersionedParameterContext> versionedParameterContexts,
final Map<String, ProcessGroup> childGroupsByVersionedId, final Map<String, ParameterProviderReference> parameterProviderReferences,
- final ProcessGroup topLevelGroup) throws ProcessorInstantiationException {
+ final ProcessGroup topLevelGroup) throws ProcessorInstantiationException, FlowSynchronizationException {

for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) {
final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier());
@@ -711,7 +711,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
}

private void synchronizeControllerServices(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, ControllerServiceNode> servicesByVersionedId,
- final ProcessGroup topLevelGroup) {
+ final ProcessGroup topLevelGroup) throws FlowSynchronizationException {
// Controller Services have to be handled a bit differently than other components. This is because Processors and Controller
// Services may reference other Controller Services. Since we may be adding Service A, which depends on Service B, before adding
// Service B, we need to ensure that we create all Controller Services first and then call updateControllerService for each
@@ -745,17 +745,43 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
updateControllerService(addedService, proposedService, topLevelGroup);
}

- // Update all of the Controller Services to match the VersionedControllerService
+ // Update all Controller Services to match the VersionedControllerService.
+ // Services may be ENABLED here if the outer "affected components" pass did not
+ // disable them (e.g. COMPONENT_ADDED diffs are skipped by AffectedComponentSet).
+ // We must disable before calling updateControllerService, which calls setProperties
+ // which calls verifyModifiable and throws IllegalStateException on ENABLED services.
+ final long stopTimeout = System.currentTimeMillis() + syncOptions.getComponentStopTimeout().toMillis();
for (final Map.Entry<ControllerServiceNode, VersionedControllerService> entry : services.entrySet()) {
final ControllerServiceNode service = entry.getKey();
final VersionedControllerService proposedService = entry.getValue();

if (updatedVersionedComponentIds.contains(proposedService.getIdentifier())) {
- updateControllerService(service, proposedService, topLevelGroup);
- // Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state,
- // so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state
- createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service)));
- LOG.info("Updated {}", service);
+ final Set<ComponentNode> referencesToRestart = new HashSet<>();
+ final Set<ControllerServiceNode> servicesToRestart = new HashSet<>();
+
+ try {
+ try {
+ stopControllerService(service, proposedService, stopTimeout,
+ syncOptions.getComponentStopTimeoutAction(),
+ referencesToRestart, servicesToRestart, syncOptions);
+ } catch (final TimeoutException e) {
+ throw new FlowSynchronizationException("Failed to stop Controller Service " + service + " in preparation for update", e);
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new FlowSynchronizationException("Interrupted while stopping Controller Service " + service, e);
+ }
+ updateControllerService(service, proposedService, topLevelGroup);
+ createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service)));
+ LOG.info("Updated {}", service);
+ } finally {
+ // Re-enable services and restart components that were stopped for the update,
+ // restoring the controller to its pre-update running state.
+ if (proposedService.getScheduledState() != org.apache.nifi.flow.ScheduledState.DISABLED) {
+ context.getControllerServiceProvider().enableControllerServicesAsync(servicesToRestart);
+ context.getControllerServiceProvider().scheduleReferencingComponents(
+ service, referencesToRestart, context.getComponentScheduler());
+ }
+ }
}
}

@@ -1379,7 +1405,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen

private ProcessGroup addProcessGroup(final ProcessGroup destination, final VersionedProcessGroup proposed, final ComponentIdGenerator componentIdGenerator,
final Map<String, VersionedParameterContext> versionedParameterContexts,
- final Map<String, ParameterProviderReference> parameterProviderReferences, ProcessGroup topLevelGroup) throws ProcessorInstantiationException {
+ final Map<String, ParameterProviderReference> parameterProviderReferences, ProcessGroup topLevelGroup) throws ProcessorInstantiationException, FlowSynchronizationException {
final String id = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), destination.getIdentifier());
final String connectorId = destination.getConnectorIdentifier().orElse(null);
final ProcessGroup group = context.getFlowManager().createProcessGroup(id, connectorId);
Loading