Skip to content

Commit fa23384

Browse files
itsnothuyitsnothuy
andauthored
[rush-lib] Fix weighted concurrency budget being capped by operation count (#5646)
* [rush-lib] Fix weighted concurrency budget being capped by operation count test(rush-lib): expand weighted concurrency edge case tests test(rush-lib): expand weighted concurrency edge case tests refactor(OperationExecutionManager.test): align comment and naming style with codebase conventions - Replace // ─── section banners and // Test N: numbered headers with brief inline prose comments matching the existing test file style - Replace // WHAT: / // SCENARIO: / // DETERMINISM: structured blocks with concise inline comments - Rename createWeightedOp → createWeightedOperation for consistency with other helper names (createExecutionManager, etc.) - Rename counters.active / counters.peak → counters.concurrentCount / counters.peakConcurrency for self-documenting field names - Extract new AbortController() calls into named abortController variables matching the pattern used throughout the rest of the test file - No logic or assertion changes; all 18 tests continue to pass chore: add rush change file for @microsoft/rush-lib weighted concurrency fix refactor: reduce test comment density to match codebase patterns * chore: add change file for @microsoft/rush --------- Co-authored-by: itsnothuy <[email protected]>
1 parent 59e2010 commit fa23384

File tree

4 files changed

+291
-4
lines changed

4 files changed

+291
-4
lines changed
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{
2+
"changes": [
3+
{
4+
"packageName": "@microsoft/rush-lib",
5+
"comment": "Fix weighted concurrency budget being capped by operation count",
6+
"type": "patch"
7+
}
8+
],
9+
"packageName": "@microsoft/rush-lib"
10+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
{
2+
"changes": [
3+
{
4+
"comment": "",
5+
"type": "none",
6+
"packageName": "@microsoft/rush"
7+
}
8+
],
9+
"packageName": "@microsoft/rush",
10+
"email": "[email protected]"
11+
}

libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -259,9 +259,13 @@ export class OperationExecutionManager {
259259
this._terminal.writeStdoutLine('');
260260
}
261261

262-
this._terminal.writeStdoutLine(`Executing a maximum of ${this._parallelism} simultaneous processes...`);
263-
264-
const maxParallelism: number = Math.min(totalOperations, this._parallelism);
262+
// For display purposes, cap the reported number of simultaneous processes by the number of operations.
263+
// This avoids confusing messages like "Executing a maximum of 10 simultaneous processes..." when
264+
// there are only 4 operations.
265+
const maxSimultaneousProcesses: number = Math.min(totalOperations, this._parallelism);
266+
this._terminal.writeStdoutLine(
267+
`Executing a maximum of ${maxSimultaneousProcesses} simultaneous processes...`
268+
);
265269

266270
await this._beforeExecuteOperations?.(this._executionRecords);
267271

@@ -309,7 +313,10 @@ export class OperationExecutionManager {
309313
},
310314
{
311315
allowOversubscription: this._allowOversubscription,
312-
concurrency: maxParallelism,
316+
// In weighted mode, concurrency represents the total "unit budget", not the max number of tasks.
317+
// Do not cap by totalOperations, since that would incorrectly shrink the unit budget and
318+
// reduce parallelism for operations with weight > 1.
319+
concurrency: this._parallelism,
313320
weighted: true
314321
}
315322
);

libraries/rush-lib/src/logic/operations/test/OperationExecutionManager.test.ts

Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ jest.mock('@rushstack/terminal', () => {
1818

1919
import { Terminal, MockWritable, PrintUtilities } from '@rushstack/terminal';
2020
import { CollatedTerminal } from '@rushstack/stream-collator';
21+
import { Async } from '@rushstack/node-core-library';
2122

2223
import type { IPhase } from '../../../api/CommandLineConfiguration';
2324
import type { RushConfigurationProject } from '../../../api/RushConfigurationProject';
@@ -454,4 +455,262 @@ describe(OperationExecutionManager.name, () => {
454455
expect(mockWritable.getFormattedChunks()).toMatchSnapshot();
455456
});
456457
});
458+
459+
describe('Weighted concurrency', () => {
460+
function createWeightedOperation(
461+
name: string,
462+
weight: number,
463+
counters: { concurrentCount: number; peakConcurrency: number }
464+
): Operation {
465+
const operation: Operation = new Operation({
466+
runner: new MockOperationRunner(name, async (terminal: CollatedTerminal) => {
467+
counters.concurrentCount++;
468+
if (counters.concurrentCount > counters.peakConcurrency) {
469+
counters.peakConcurrency = counters.concurrentCount;
470+
}
471+
await Async.sleepAsync(0);
472+
if (counters.concurrentCount > counters.peakConcurrency) {
473+
counters.peakConcurrency = counters.concurrentCount;
474+
}
475+
counters.concurrentCount--;
476+
return OperationStatus.Success;
477+
}),
478+
phase: mockPhase,
479+
project: getOrCreateProject(name),
480+
logFilenameIdentifier: name
481+
});
482+
operation.weight = weight;
483+
return operation;
484+
}
485+
486+
it('does not cap the unit budget by the number of operations (issue #5607 regression)', async () => {
487+
// Regression test for https://github.com/microsoft/rushstack/issues/5607
488+
// With weighted scheduling, concurrency is a unit budget. The old code passed
489+
// Math.min(totalOperations, parallelism), which shrinks the budget when
490+
// totalOperations < parallelism, causing serialization for weight > 1.
491+
const counters = { concurrentCount: 0, peakConcurrency: 0 };
492+
493+
const opA: Operation = createWeightedOperation('A', 4, counters);
494+
const opB: Operation = createWeightedOperation('B', 4, counters);
495+
const opC: Operation = createWeightedOperation('C', 4, counters);
496+
const opD: Operation = createWeightedOperation('D', 4, counters);
497+
498+
const manager: OperationExecutionManager = new OperationExecutionManager(
499+
new Set([opA, opB, opC, opD]),
500+
{
501+
quietMode: true,
502+
debugMode: false,
503+
parallelism: 10,
504+
allowOversubscription: false,
505+
destination: mockWritable
506+
}
507+
);
508+
509+
const abortController = new AbortController();
510+
const result: IExecutionResult = await manager.executeAsync(abortController);
511+
512+
expect(result.status).toEqual(OperationStatus.Success);
513+
expect(counters.peakConcurrency).toEqual(2);
514+
});
515+
516+
it('clamps weight to budget and completes without deadlock when weight exceeds budget', async () => {
517+
const counters = { concurrentCount: 0, peakConcurrency: 0 };
518+
519+
const opA: Operation = createWeightedOperation('heavy-A', 10, counters);
520+
const opB: Operation = createWeightedOperation('heavy-B', 10, counters);
521+
522+
const manager: OperationExecutionManager = new OperationExecutionManager(
523+
new Set([opA, opB]),
524+
{
525+
quietMode: true,
526+
debugMode: false,
527+
parallelism: 4,
528+
allowOversubscription: false,
529+
destination: mockWritable
530+
}
531+
);
532+
533+
const abortController = new AbortController();
534+
const result: IExecutionResult = await manager.executeAsync(abortController);
535+
536+
expect(result.status).toEqual(OperationStatus.Success);
537+
expect(result.operationResults.get(opA)?.status).toEqual(OperationStatus.Success);
538+
expect(result.operationResults.get(opB)?.status).toEqual(OperationStatus.Success);
539+
expect(counters.peakConcurrency).toEqual(1);
540+
});
541+
542+
it('allows oversubscription when allowOversubscription is true', async () => {
543+
const counters = { concurrentCount: 0, peakConcurrency: 0 };
544+
545+
const opA: Operation = createWeightedOperation('over-A', 7, counters);
546+
const opB: Operation = createWeightedOperation('over-B', 7, counters);
547+
548+
const manager: OperationExecutionManager = new OperationExecutionManager(
549+
new Set([opA, opB]),
550+
{
551+
quietMode: true,
552+
debugMode: false,
553+
parallelism: 10,
554+
allowOversubscription: true,
555+
destination: mockWritable
556+
}
557+
);
558+
559+
const abortController = new AbortController();
560+
const result: IExecutionResult = await manager.executeAsync(abortController);
561+
562+
expect(result.status).toEqual(OperationStatus.Success);
563+
expect(counters.peakConcurrency).toEqual(2);
564+
});
565+
566+
it('does not oversubscribe when allowOversubscription is false', async () => {
567+
const counters = { concurrentCount: 0, peakConcurrency: 0 };
568+
569+
const opA: Operation = createWeightedOperation('strict-A', 7, counters);
570+
const opB: Operation = createWeightedOperation('strict-B', 7, counters);
571+
572+
const manager: OperationExecutionManager = new OperationExecutionManager(
573+
new Set([opA, opB]),
574+
{
575+
quietMode: true,
576+
debugMode: false,
577+
parallelism: 10,
578+
allowOversubscription: false,
579+
destination: mockWritable
580+
}
581+
);
582+
583+
const abortController = new AbortController();
584+
const result: IExecutionResult = await manager.executeAsync(abortController);
585+
586+
expect(result.status).toEqual(OperationStatus.Success);
587+
expect(counters.peakConcurrency).toEqual(1);
588+
});
589+
590+
it('zero-weight operations do not consume budget', async () => {
591+
const counters = { concurrentCount: 0, peakConcurrency: 0 };
592+
593+
const heavyOp: Operation = createWeightedOperation('heavy', 9, counters);
594+
const zeroA: Operation = createWeightedOperation('zero-A', 0, counters);
595+
const zeroB: Operation = createWeightedOperation('zero-B', 0, counters);
596+
const zeroC: Operation = createWeightedOperation('zero-C', 0, counters);
597+
598+
const manager: OperationExecutionManager = new OperationExecutionManager(
599+
new Set([heavyOp, zeroA, zeroB, zeroC]),
600+
{
601+
quietMode: true,
602+
debugMode: false,
603+
parallelism: 10,
604+
allowOversubscription: false,
605+
destination: mockWritable
606+
}
607+
);
608+
609+
const abortController = new AbortController();
610+
const result: IExecutionResult = await manager.executeAsync(abortController);
611+
612+
expect(result.status).toEqual(OperationStatus.Success);
613+
expect(counters.peakConcurrency).toBeGreaterThanOrEqual(2);
614+
});
615+
616+
it('mixed weights respect the unit budget correctly', async () => {
617+
const counters = { concurrentCount: 0, peakConcurrency: 0 };
618+
619+
const opA: Operation = createWeightedOperation('mix-A', 5, counters);
620+
const opB: Operation = createWeightedOperation('mix-B', 5, counters);
621+
const opC: Operation = createWeightedOperation('mix-C', 3, counters);
622+
const opD: Operation = createWeightedOperation('mix-D', 3, counters);
623+
624+
const manager: OperationExecutionManager = new OperationExecutionManager(
625+
new Set([opA, opB, opC, opD]),
626+
{
627+
quietMode: true,
628+
debugMode: false,
629+
parallelism: 10,
630+
allowOversubscription: false,
631+
destination: mockWritable
632+
}
633+
);
634+
635+
const abortController = new AbortController();
636+
const result: IExecutionResult = await manager.executeAsync(abortController);
637+
638+
expect(result.status).toEqual(OperationStatus.Success);
639+
for (const [, opResult] of result.operationResults) {
640+
expect(opResult.status).toEqual(OperationStatus.Success);
641+
}
642+
expect(counters.peakConcurrency).toBeGreaterThanOrEqual(2);
643+
expect(counters.peakConcurrency).toBeLessThanOrEqual(3);
644+
});
645+
646+
it('weight=1 operations behave identically to unweighted scheduling', async () => {
647+
const counters = { concurrentCount: 0, peakConcurrency: 0 };
648+
649+
const ops: Operation[] = [];
650+
for (let i = 0; i < 5; i++) {
651+
ops.push(createWeightedOperation(`unit-${i}`, 1, counters));
652+
}
653+
654+
const manager: OperationExecutionManager = new OperationExecutionManager(new Set(ops), {
655+
quietMode: true,
656+
debugMode: false,
657+
parallelism: 3,
658+
allowOversubscription: false,
659+
destination: mockWritable
660+
});
661+
662+
const abortController = new AbortController();
663+
const result: IExecutionResult = await manager.executeAsync(abortController);
664+
665+
expect(result.status).toEqual(OperationStatus.Success);
666+
expect(counters.peakConcurrency).toEqual(3);
667+
});
668+
669+
it('displays the capped process count when parallelism exceeds operation count', async () => {
670+
const counters = { concurrentCount: 0, peakConcurrency: 0 };
671+
672+
const ops: Operation[] = [];
673+
for (let i = 0; i < 4; i++) {
674+
ops.push(createWeightedOperation(`log-${i}`, 4, counters));
675+
}
676+
677+
const manager: OperationExecutionManager = new OperationExecutionManager(new Set(ops), {
678+
quietMode: false,
679+
debugMode: false,
680+
parallelism: 10,
681+
allowOversubscription: false,
682+
destination: mockWritable
683+
});
684+
685+
const abortController = new AbortController();
686+
await manager.executeAsync(abortController);
687+
688+
const allOutput: string = mockWritable.getAllOutput();
689+
expect(allOutput).toContain('Executing a maximum of 4 simultaneous processes...');
690+
expect(allOutput).not.toContain('Executing a maximum of 10 simultaneous processes...');
691+
});
692+
693+
it('displays parallelism when it is less than operation count', async () => {
694+
const counters = { concurrentCount: 0, peakConcurrency: 0 };
695+
696+
const ops: Operation[] = [];
697+
for (let i = 0; i < 10; i++) {
698+
ops.push(createWeightedOperation(`many-${i}`, 1, counters));
699+
}
700+
701+
const manager: OperationExecutionManager = new OperationExecutionManager(new Set(ops), {
702+
quietMode: false,
703+
debugMode: false,
704+
parallelism: 3,
705+
allowOversubscription: false,
706+
destination: mockWritable
707+
});
708+
709+
const abortController = new AbortController();
710+
await manager.executeAsync(abortController);
711+
712+
const allOutput: string = mockWritable.getAllOutput();
713+
expect(allOutput).toContain('Executing a maximum of 3 simultaneous processes...');
714+
});
715+
});
457716
});

0 commit comments

Comments
 (0)