Skip to content

Commit d386883

Browse files
authored
NIFI-15495 Restart Connectors that reference assets that were synchro… (#10806)
* NIFI-15495 Restart Connectors that reference assets that were synchronized - Ensure Connectors re-resolve property values before starting - Ensure asset clean up happens only after applyUpdate fully finishes - Add connector asset properties to default nifi.properties * Fix system test * Encapsulate restart logic in new method on ConnectorNode * Fix JavaDoc
1 parent cb479b5 commit d386883

File tree

16 files changed

+214
-60
lines changed

16 files changed

+214
-60
lines changed

nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/MockConnectorAssetManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
public class MockConnectorAssetManager implements AssetManager {
4242

4343
private static final String ASSET_STORAGE_LOCATION_PROPERTY = "directory";
44-
private static final String DEFAULT_ASSET_STORAGE_LOCATION = "target/mock-connector-assets";
44+
private static final String DEFAULT_ASSET_STORAGE_LOCATION = "target/mock_connector_assets";
4545

4646
private final Map<String, Asset> assets = new ConcurrentHashMap<>();
4747
private volatile File assetStorageLocation;

nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/asset/StandardConnectorAssetManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public class StandardConnectorAssetManager implements AssetManager {
4646
private static final Logger logger = LoggerFactory.getLogger(StandardConnectorAssetManager.class);
4747

4848
public static final String ASSET_STORAGE_LOCATION_PROPERTY = "directory";
49-
public static final String DEFAULT_ASSET_STORAGE_LOCATION = "./connector-assets";
49+
public static final String DEFAULT_ASSET_STORAGE_LOCATION = "./connector_assets";
5050

5151
private volatile File assetStorageLocation;
5252
private final Map<String, Asset> assets = new ConcurrentHashMap<>();

nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,23 @@ public interface ConnectorRepository {
7676
*/
7777
Future<Void> stopConnector(ConnectorNode connector);
7878

79+
/**
80+
* Restarts the given Connector, managing any appropriate lifecycle events.
81+
*
82+
* @param connector the Connector to restart
83+
* @return a CompletableFuture that will be completed when the Connector has restarted
84+
*/
85+
Future<Void> restartConnector(ConnectorNode connector);
86+
7987
void configureConnector(ConnectorNode connector, String stepName, StepConfiguration configuration) throws FlowUpdateException;
8088

8189
void applyUpdate(ConnectorNode connector, ConnectorUpdateContext context) throws FlowUpdateException;
8290

8391
void inheritConfiguration(ConnectorNode connector, List<VersionedConfigurationStep> activeFlowConfiguration,
8492
List<VersionedConfigurationStep> workingFlowConfiguration, Bundle flowContextBundle) throws FlowUpdateException;
8593

94+
void discardWorkingConfiguration(ConnectorNode connector);
95+
8696
SecretsManager getSecretsManager();
8797

8898
/**

nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/MutableConnectorConfigurationContext.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ public interface MutableConnectorConfigurationContext extends ConnectorConfigura
4141
*/
4242
ConfigurationUpdateResult replaceProperties(String stepName, StepConfiguration configuration);
4343

44+
/**
45+
* Resolves all existing property values.
46+
*/
47+
void resolvePropertyValues();
48+
4449
/**
4550
* Converts this mutable configuration context to an immutable ConnectorConfiguration.
4651
* @return the ConnectorConfiguration

nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,9 @@
324324
<exclude>src/test/resources/old-swap-file.swap</exclude>
325325
<exclude>src/test/resources/xxe_template.xml</exclude>
326326
<exclude>src/test/resources/swap/444-old-swap-file.swap</exclude>
327+
<exclude>src/test/resources/colors.txt</exclude>
328+
<exclude>src/test/resources/TestRuntimeManifest/nifi-test-components-nar/META-INF/docs/steps/org.example.TestConnector/Another_Test_Step.md</exclude>
329+
<exclude>src/test/resources/TestRuntimeManifest/nifi-test-components-nar/META-INF/docs/steps/org.example.TestConnector/Test_Step.md</exclude>
327330
</excludes>
328331
</configuration>
329332
</plugin>

nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/StandardConnectorAssetSynchronizer.java

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
2323
import org.apache.nifi.cluster.protocol.NodeIdentifier;
2424
import org.apache.nifi.components.connector.ConnectorNode;
25+
import org.apache.nifi.components.connector.ConnectorRepository;
26+
import org.apache.nifi.components.connector.ConnectorState;
2527
import org.apache.nifi.controller.FlowController;
2628
import org.apache.nifi.controller.flow.FlowManager;
2729
import org.apache.nifi.util.NiFiProperties;
@@ -37,8 +39,12 @@
3739
import java.time.Duration;
3840
import java.time.Instant;
3941
import java.util.Collection;
42+
import java.util.HashSet;
4043
import java.util.List;
4144
import java.util.Map;
45+
import java.util.Set;
46+
import java.util.concurrent.ExecutionException;
47+
import java.util.concurrent.Future;
4248
import java.util.function.Function;
4349
import java.util.stream.Collectors;
4450

@@ -55,6 +61,7 @@ public class StandardConnectorAssetSynchronizer implements AssetSynchronizer {
5561

5662
private final AssetManager assetManager;
5763
private final FlowManager flowManager;
64+
private final ConnectorRepository connectorRepository;
5865
private final ClusterCoordinator clusterCoordinator;
5966
private final WebClientService webClientService;
6067
private final NiFiProperties properties;
@@ -65,6 +72,7 @@ public StandardConnectorAssetSynchronizer(final FlowController flowController,
6572
final NiFiProperties properties) {
6673
this.assetManager = flowController.getConnectorAssetManager();
6774
this.flowManager = flowController.getFlowManager();
75+
this.connectorRepository = flowController.getConnectorRepository();
6876
this.clusterCoordinator = clusterCoordinator;
6977
this.webClientService = webClientService;
7078
this.properties = properties;
@@ -96,61 +104,96 @@ public void synchronize() {
96104
final List<ConnectorNode> connectors = flowManager.getAllConnectors();
97105
logger.info("Found {} connectors for synchronizing assets", connectors.size());
98106

107+
final Set<ConnectorNode> connectorsWithSynchronizedAssets = new HashSet<>();
99108
for (final ConnectorNode connector : connectors) {
100109
try {
101-
synchronize(assetsRestApiClient, connector);
110+
final boolean assetSynchronized = synchronize(assetsRestApiClient, connector);
111+
if (assetSynchronized) {
112+
connectorsWithSynchronizedAssets.add(connector);
113+
}
102114
} catch (final Exception e) {
103115
logger.error("Failed to synchronize assets for connector [{}]", connector.getIdentifier(), e);
104116
}
105117
}
118+
119+
restartConnectorsWithSynchronizedAssets(connectorsWithSynchronizedAssets);
120+
}
121+
122+
private void restartConnectorsWithSynchronizedAssets(final Set<ConnectorNode> connectorsWithSynchronizedAssets) {
123+
for (final ConnectorNode connector : connectorsWithSynchronizedAssets) {
124+
final ConnectorState currentState = connector.getDesiredState();
125+
if (currentState == ConnectorState.RUNNING) {
126+
logger.info("Restarting connector [{}] after asset synchronization", connector.getIdentifier());
127+
try {
128+
final Future<Void> restartFuture = connectorRepository.restartConnector(connector);
129+
restartFuture.get();
130+
logger.info("Successfully restarted connector [{}] after asset synchronization", connector.getIdentifier());
131+
} catch (final InterruptedException e) {
132+
Thread.currentThread().interrupt();
133+
logger.error("Interrupted while restarting connector [{}] after asset synchronization", connector.getIdentifier(), e);
134+
} catch (final ExecutionException e) {
135+
logger.error("Failed to restart connector [{}] after asset synchronization", connector.getIdentifier(), e.getCause());
136+
}
137+
} else {
138+
logger.info("Connector [{}] is not running (state={}): skipping restart after asset synchronization", connector.getIdentifier(), currentState);
139+
}
140+
}
106141
}
107142

108-
private void synchronize(final AssetsRestApiClient assetsRestApiClient, final ConnectorNode connector) {
143+
private boolean synchronize(final AssetsRestApiClient assetsRestApiClient, final ConnectorNode connector) {
109144
final String connectorId = connector.getIdentifier();
110145
final Map<String, Asset> existingAssets = assetManager.getAssets(connectorId).stream()
111146
.collect(Collectors.toMap(Asset::getIdentifier, Function.identity()));
112147

113148
final AssetsEntity coordinatorAssetsEntity = listConnectorAssetsWithRetry(assetsRestApiClient, connectorId);
114149
if (coordinatorAssetsEntity == null) {
115150
logger.error("Timeout listing assets from cluster coordinator for connector [{}]", connectorId);
116-
return;
151+
return false;
117152
}
118153

119154
final Collection<AssetEntity> coordinatorAssets = coordinatorAssetsEntity.getAssets();
120155
if (coordinatorAssets == null || coordinatorAssets.isEmpty()) {
121156
logger.info("Connector [{}] did not return any assets from the cluster coordinator", connectorId);
122-
return;
157+
return false;
123158
}
124159

125160
logger.info("Connector [{}] returned {} assets from the cluster coordinator", connectorId, coordinatorAssets.size());
126161

162+
boolean assetSynchronized = false;
127163
for (final AssetEntity coordinatorAssetEntity : coordinatorAssets) {
128164
final AssetDTO coordinatorAsset = coordinatorAssetEntity.getAsset();
129165
final Asset matchingAsset = existingAssets.get(coordinatorAsset.getId());
130166
try {
131-
synchronize(assetsRestApiClient, connectorId, coordinatorAsset, matchingAsset);
167+
final boolean assetWasSynchronized = synchronize(assetsRestApiClient, connectorId, coordinatorAsset, matchingAsset);
168+
if (assetWasSynchronized) {
169+
assetSynchronized = true;
170+
}
132171
} catch (final Exception e) {
133172
logger.error("Failed to synchronize asset [id={},name={}] for connector [{}]",
134173
coordinatorAsset.getId(), coordinatorAsset.getName(), connectorId, e);
135174
}
136175
}
176+
return assetSynchronized;
137177
}
138178

139-
private void synchronize(final AssetsRestApiClient assetsRestApiClient, final String connectorId, final AssetDTO coordinatorAsset, final Asset matchingAsset) {
179+
private boolean synchronize(final AssetsRestApiClient assetsRestApiClient, final String connectorId, final AssetDTO coordinatorAsset, final Asset matchingAsset) {
140180
final String assetId = coordinatorAsset.getId();
141181
final String assetName = coordinatorAsset.getName();
142182
if (matchingAsset == null || !matchingAsset.getFile().exists()) {
143183
logger.info("Synchronizing missing asset [id={},name={}] for connector [{}]", assetId, assetName, connectorId);
144184
synchronizeConnectorAssetWithRetry(assetsRestApiClient, connectorId, coordinatorAsset);
185+
return true;
145186
} else {
146187
final String coordinatorAssetDigest = coordinatorAsset.getDigest();
147188
final String matchingAssetDigest = matchingAsset.getDigest().orElse(null);
148189
if (!coordinatorAssetDigest.equals(matchingAssetDigest)) {
149190
logger.info("Synchronizing asset [id={},name={}] with updated digest [{}] for connector [{}]",
150191
assetId, assetName, coordinatorAssetDigest, connectorId);
151192
synchronizeConnectorAssetWithRetry(assetsRestApiClient, connectorId, coordinatorAsset);
193+
return true;
152194
} else {
153195
logger.info("Coordinator asset [id={},name={}] found for connector [{}]: retrieval not required", assetId, assetName, connectorId);
196+
return false;
154197
}
155198
}
156199
}

nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorConfigurationContext.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import org.apache.nifi.asset.AssetManager;
2222
import org.apache.nifi.components.connector.secrets.SecretProvider;
2323
import org.apache.nifi.components.connector.secrets.SecretsManager;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
2426

2527
import java.io.File;
2628
import java.io.IOException;
@@ -37,6 +39,8 @@
3739
import java.util.concurrent.locks.ReentrantReadWriteLock;
3840

3941
public class StandardConnectorConfigurationContext implements MutableConnectorConfigurationContext, Cloneable {
42+
private static final Logger logger = LoggerFactory.getLogger(StandardConnectorConfigurationContext.class);
43+
4044
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
4145
private final Lock readLock = rwLock.readLock();
4246
private final Lock writeLock = rwLock.writeLock();
@@ -205,6 +209,7 @@ private StringLiteralValue resolveAssetReferences(final AssetReference assetRefe
205209
.ifPresent(resolvedAssetValues::add);
206210
}
207211

212+
logger.debug("Resolved {} to {}", assetReference, resolvedAssetValues);
208213
return new StringLiteralValue(String.join(",", resolvedAssetValues));
209214
}
210215

@@ -265,6 +270,23 @@ public ConfigurationUpdateResult replaceProperties(final String stepName, final
265270
}
266271
}
267272

273+
@Override
274+
public void resolvePropertyValues() {
275+
writeLock.lock();
276+
try {
277+
for (final Map.Entry<String, StepConfiguration> entry : propertyConfigurations.entrySet()) {
278+
final String stepName = entry.getKey();
279+
final StepConfiguration stepConfig = entry.getValue();
280+
final Map<String, ConnectorValueReference> stepProperties = stepConfig.getPropertyValues();
281+
282+
final StepConfiguration resolvedConfig = resolvePropertyValues(stepProperties);
283+
this.resolvedPropertyConfigurations.put(stepName, resolvedConfig);
284+
}
285+
} finally {
286+
writeLock.unlock();
287+
}
288+
}
289+
268290
@Override
269291
public ConnectorConfiguration toConnectorConfiguration() {
270292
readLock.lock();

nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -373,11 +373,12 @@ public Future<Void> start(final FlowEngine scheduler) {
373373

374374
private void start(final FlowEngine scheduler, final CompletableFuture<Void> startCompleteFuture) {
375375
try {
376+
stateTransition.setDesiredState(ConnectorState.RUNNING);
377+
activeFlowContext.getConfigurationContext().resolvePropertyValues();
378+
376379
verifyCanStart();
377380

378-
stateTransition.setDesiredState(ConnectorState.RUNNING);
379381
final ConnectorState currentState = getCurrentState();
380-
381382
switch (currentState) {
382383
case STARTING -> {
383384
logger.debug("{} is already starting; adding future to pending start futures", this);
@@ -862,6 +863,8 @@ private String resolveAssetReferences(final AssetReference assetReference) {
862863
.map(File::getAbsolutePath)
863864
.ifPresent(resolvedAssetValues::add);
864865
}
866+
867+
logger.debug("Resolved {} to {} for {}", assetReference, resolvedAssetValues, this);
865868
return String.join(",", resolvedAssetValues);
866869
}
867870

0 commit comments

Comments
 (0)