1010
1111import java .nio .file .Path ;
1212import java .util .*;
13+ import java .util .concurrent .ConcurrentHashMap ;
1314import java .util .stream .Collectors ;
1415
1516@ Slf4j
@@ -28,6 +29,9 @@ public class PublishManager {
2829 // Used to check if symlink target is already published
2930 private final Map <Path , Set <Path >> publishMap = new HashMap <>();
3031 private final PublishExecHolder execHolder = new PublishExecHolder ();
32+ private final Map <NodeLocation , Integer > currentPublishJobsPerNode = new ConcurrentHashMap <>();
33+ // Maximum number of parallel publish jobs per node
34+ private final int MAX_COPY_PER_NODE = 1 ;
3135
3236 /**
3337 * Maximum number of arguments for the publish command.
@@ -176,6 +180,7 @@ private void publishFiles( List<FileWrapper> items, NodeLocation node ) {
176180 }
177181 execHolder .finishedOnNode ( node );
178182 };
183+ currentPublishJobsPerNode .compute ( node , ( k , v ) -> v == null ? 1 : v + 1 );
179184 final PublishListener publishListener = new PublishListener ( scheduler , name , onFinish );
180185 execHolder .addRunnable ( node , () ->
181186 client .execCommand ( daemonName , scheduler .getNamespace (), command , publishListener )
@@ -215,7 +220,7 @@ private void createSymlinksIntern( final Symlink[] symlinks, final int start, in
215220
216221 String name = "Copying from node: " + node ;
217222 final String daemonName = scheduler .getDaemonNameOnNode ( node );
218- final Runnable onFinish = new InformPublishFinishedRunnable ( execHolder , location );
223+ final Runnable onFinish = new InformPublishFinishedRunnable ( execHolder , location , currentPublishJobsPerNode );
219224 final PublishListener publishListener = new PublishListener ( scheduler , name , onFinish );
220225 execHolder .addRunnable ( location , () ->
221226 client .execCommand ( daemonName , scheduler .getNamespace (), command , publishListener )
@@ -242,7 +247,9 @@ private void publishFirstX( final Map<NodeLocation, LinkedList<FileWrapper>> nod
242247 for ( Map .Entry <NodeLocation , LinkedList <FileWrapper >> entry : nodeItemsMap .entrySet () ) {
243248 final NodeLocation node = entry .getKey ();
244249 LinkedList <FileWrapper > items = entry .getValue ();
245- while ( currentlyCopyingTasksOnNode .getOrDefault ( node , 0 ) < maxCopyPerNode && items != null && !items .isEmpty () ) {
250+ while ( currentPublishJobsPerNode .getOrDefault ( node , 0 ) < MAX_COPY_PER_NODE
251+ && currentlyCopyingTasksOnNode .getOrDefault ( node , 0 ) < maxCopyPerNode
252+ && items != null && !items .isEmpty () ) {
246253 currentlyCopyingTasksOnNode .compute ( node , ( k , v ) -> v == null ? 1 : v + 1 );
247254 publishFiles ( removeUntil ( items , maxSize ), node );
248255 }
0 commit comments