Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion packages/nx/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
"string-width": "^4.2.3",
"tar-stream": "~2.2.0",
"tmp": "~0.2.1",
"tree-kill": "^1.2.2",
"tsconfig-paths": "catalog:typescript",
"tslib": "catalog:typescript",
"yaml": "^2.6.0",
Expand Down
53 changes: 31 additions & 22 deletions packages/nx/src/executors/run-commands/run-commands.impl.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@ import { readFileSync, writeFileSync } from 'fs';
import { env } from 'npm-run-path';
import { relative } from 'path';
import { dirSync, fileSync } from 'tmp';
import runCommands, {
interpolateArgsIntoCommand,
LARGE_BUFFER,
} from './run-commands.impl';
import runCommands, { interpolateArgsIntoCommand } from './run-commands.impl';

function normalize(p: string) {
return p.startsWith('/private') ? p.substring(8) : p;
Expand Down Expand Up @@ -588,7 +585,7 @@ describe('Run Commands', () => {

describe('--color', () => {
it('should not set FORCE_COLOR=true', async () => {
const exec = jest.spyOn(require('child_process'), 'exec');
const spawnSpy = jest.spyOn(require('child_process'), 'spawn');
await runCommands(
{
commands: [`echo 'Hello World'`, `echo 'Hello Universe'`],
Expand All @@ -598,27 +595,31 @@ describe('Run Commands', () => {
context
);

expect(exec).toHaveBeenCalledTimes(2);
expect(exec).toHaveBeenNthCalledWith(1, `echo 'Hello World'`, {
maxBuffer: LARGE_BUFFER,
expect(spawnSpy).toHaveBeenCalledTimes(2);
expect(spawnSpy).toHaveBeenNthCalledWith(1, `echo 'Hello World'`, [], {
shell: true,
detached: process.platform !== 'win32',
env: {
...process.env,
...env(),
},
windowsHide: false,
stdio: ['inherit', 'pipe', 'pipe'],
});
expect(exec).toHaveBeenNthCalledWith(2, `echo 'Hello Universe'`, {
maxBuffer: LARGE_BUFFER,
expect(spawnSpy).toHaveBeenNthCalledWith(2, `echo 'Hello Universe'`, [], {
shell: true,
detached: process.platform !== 'win32',
env: {
...process.env,
...env(),
},
windowsHide: false,
stdio: ['inherit', 'pipe', 'pipe'],
});
});

it('should not set FORCE_COLOR=true when --no-color is passed', async () => {
const exec = jest.spyOn(require('child_process'), 'exec');
const spawnSpy = jest.spyOn(require('child_process'), 'spawn');
await runCommands(
{
commands: [`echo 'Hello World'`, `echo 'Hello Universe'`],
Expand All @@ -629,27 +630,31 @@ describe('Run Commands', () => {
context
);

expect(exec).toHaveBeenCalledTimes(2);
expect(exec).toHaveBeenNthCalledWith(1, `echo 'Hello World'`, {
maxBuffer: LARGE_BUFFER,
expect(spawnSpy).toHaveBeenCalledTimes(2);
expect(spawnSpy).toHaveBeenNthCalledWith(1, `echo 'Hello World'`, [], {
shell: true,
detached: process.platform !== 'win32',
env: {
...process.env,
...env(),
},
windowsHide: false,
stdio: ['inherit', 'pipe', 'pipe'],
});
expect(exec).toHaveBeenNthCalledWith(2, `echo 'Hello Universe'`, {
maxBuffer: LARGE_BUFFER,
expect(spawnSpy).toHaveBeenNthCalledWith(2, `echo 'Hello Universe'`, [], {
shell: true,
detached: process.platform !== 'win32',
env: {
...process.env,
...env(),
},
windowsHide: false,
stdio: ['inherit', 'pipe', 'pipe'],
});
});

it('should set FORCE_COLOR=true when running with --color', async () => {
const exec = jest.spyOn(require('child_process'), 'exec');
const spawnSpy = jest.spyOn(require('child_process'), 'spawn');
await runCommands(
{
commands: [`echo 'Hello World'`, `echo 'Hello Universe'`],
Expand All @@ -660,16 +665,20 @@ describe('Run Commands', () => {
context
);

expect(exec).toHaveBeenCalledTimes(2);
expect(exec).toHaveBeenNthCalledWith(1, `echo 'Hello World'`, {
maxBuffer: LARGE_BUFFER,
expect(spawnSpy).toHaveBeenCalledTimes(2);
expect(spawnSpy).toHaveBeenNthCalledWith(1, `echo 'Hello World'`, [], {
shell: true,
detached: process.platform !== 'win32',
env: { ...process.env, FORCE_COLOR: `true`, ...env() },
windowsHide: false,
stdio: ['inherit', 'pipe', 'pipe'],
});
expect(exec).toHaveBeenNthCalledWith(2, `echo 'Hello Universe'`, {
maxBuffer: LARGE_BUFFER,
expect(spawnSpy).toHaveBeenNthCalledWith(2, `echo 'Hello Universe'`, [], {
shell: true,
detached: process.platform !== 'win32',
env: { ...process.env, FORCE_COLOR: `true`, ...env() },
windowsHide: false,
stdio: ['inherit', 'pipe', 'pipe'],
});
});
});
Expand Down
170 changes: 81 additions & 89 deletions packages/nx/src/executors/run-commands/running-tasks.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import * as pc from 'picocolors';
import { ChildProcess, exec, Serializable } from 'child_process';
import { ChildProcess, spawn, Serializable } from 'child_process';
import { env as appendLocalEnv } from 'npm-run-path';
import { isAbsolute, join } from 'path';
import * as treeKill from 'tree-kill';
import { ExecutorContext } from '../../config/misc-interfaces';
import { killProcessTree, killProcessTreeGraceful } from '../../native';
import {
createPseudoTerminal,
PseudoTerminal,
Expand All @@ -17,7 +17,6 @@ import {
import { registerTaskProcessStart } from '../../tasks-runner/task-io-service';
import { signalToCode } from '../../utils/exit-codes';
import {
LARGE_BUFFER,
NormalizedRunCommandsOptions,
RunCommandsCommandOptions,
} from './run-commands.impl';
Expand Down Expand Up @@ -77,16 +76,8 @@ export class ParallelRunningTasks implements RunningTask {
}
}

async kill(signal?: NodeJS.Signals) {
await Promise.all(
this.childProcesses.map(async (p) => {
try {
return p.kill();
} catch (e) {
console.error(`Unable to terminate "${p.command}"\nError:`, e);
}
})
);
async kill(signal?: NodeJS.Signals): Promise<void> {
await Promise.allSettled(this.childProcesses.map((p) => p.kill(signal)));
}

private async run() {
Expand Down Expand Up @@ -155,10 +146,7 @@ export class ParallelRunningTasks implements RunningTask {
failureDetails = { childProcess, code, terminalOutput };

// Immediately terminate all other running processes
await this.terminateRemainingProcesses(
runningProcesses,
childProcess
);
this.terminateRemainingProcesses(runningProcesses, childProcess);
}

runningProcesses.delete(childProcess);
Expand Down Expand Up @@ -186,38 +174,16 @@ export class ParallelRunningTasks implements RunningTask {
}
}

private async terminateRemainingProcesses(
private terminateRemainingProcesses(
runningProcesses: Set<RunningNodeProcess>,
failedProcess: RunningNodeProcess
): Promise<void> {
const terminationPromises: Promise<void>[] = [];

): void {
const processesToTerminate = [...runningProcesses].filter(
(p) => p !== failedProcess
);
for (const process of processesToTerminate) {
runningProcesses.delete(process);

// Terminate the process
terminationPromises.push(
process.kill('SIGTERM').catch((err) => {
// Log error but don't fail the entire operation
if (this.streamOutput) {
console.error(
`Failed to terminate process "${process.command}":`,
err
);
}
})
);
}

// Wait for all terminations to complete with a timeout
if (terminationPromises.length > 0) {
await Promise.race([
Promise.all(terminationPromises),
new Promise<void>((resolve) => setTimeout(resolve, 5_000)),
]);
process.kill('SIGTERM').catch(() => {});
}
}
}
Expand Down Expand Up @@ -275,7 +241,7 @@ export class SeriallyRunningTasks implements RunningTask {
}

kill(signal?: NodeJS.Signals) {
return this.currentProcess.kill(signal);
return this.currentProcess?.kill(signal);
}

private async run(
Expand Down Expand Up @@ -380,6 +346,8 @@ class RunningNodeProcess implements RunningTask {
private exitCallbacks: Array<(code: number, terminalOutput: string) => void> =
[];
private outputCallbacks: Array<(terminalOutput: string) => void> = [];
private killPromise: Promise<void> | null = null;
private closedPromise: Promise<void>;
public command: string;

constructor(
Expand All @@ -399,11 +367,17 @@ class RunningNodeProcess implements RunningTask {
if (streamOutput) {
process.stdout.write(header);
}
this.childProcess = exec(commandConfig.command, {
maxBuffer: LARGE_BUFFER,
this.childProcess = spawn(commandConfig.command, [], {
shell: true,
detached: process.platform !== 'win32',
env,
cwd,
windowsHide: false,
stdio: ['inherit', 'pipe', 'pipe'],
});

this.closedPromise = new Promise<void>((resolve) => {
this.childProcess.on('close', resolve);
});

// Register process for metrics collection
Expand Down Expand Up @@ -436,17 +410,23 @@ class RunningNodeProcess implements RunningTask {
}

kill(signal?: NodeJS.Signals): Promise<void> {
return new Promise<void>((res, rej) => {
treeKill(this.childProcess.pid, signal, (err) => {
// On Windows, tree-kill (which uses taskkill) may fail when the process or its child process is already terminated.
// Ignore the errors, otherwise we will log them unnecessarily.
if (err && process.platform !== 'win32') {
rej(err);
} else {
res();
}
});
});
if (this.killPromise) return this.killPromise;
if (!this.childProcess.pid) return Promise.resolve();
// Cache the promise so concurrent callers (e.g. cleanup() after a
// fire-and-forget kill from cleanUpUnneededContinuousTasks) await the
// same in-progress operation instead of no-oping.
this.killPromise = killProcessTreeGraceful(
this.childProcess.pid,
signal
).then(() =>
// Wait for stdio drain and close events before the orchestrator
// tears down the event loop via process.exit().
Promise.race([
this.closedPromise,
new Promise<void>((resolve) => setTimeout(resolve, 1000)),
])
);
return this.killPromise;
}

private triggerOutputListeners(output: string) {
Expand Down Expand Up @@ -519,25 +499,29 @@ class RunningNodeProcess implements RunningTask {
}
}
});
// Terminate any task processes on exit
// Terminate any task processes on exit (sync, last resort).
// Skip if graceful kill is already in progress — it tracks cleanup
// subprocesses and we must not interfere with them.
process.on('exit', () => {
this.kill();
});
process.on('SIGINT', () => {
this.kill('SIGTERM');
// we exit here because we don't need to write anything to cache.
process.exit(signalToCode('SIGINT'));
});
process.on('SIGTERM', () => {
this.kill('SIGTERM');
// no exit here because we expect child processes to terminate which
// will store results to the cache and will terminate this process
});
process.on('SIGHUP', () => {
this.kill('SIGTERM');
// no exit here because we expect child processes to terminate which
// will store results to the cache and will terminate this process
if (this.childProcess.pid && !this.killPromise) {
killProcessTree(this.childProcess.pid);
}
});
// In the direct path, detached children don't get OS SIGINT (own process
// group via setsid); the orchestrator's cleanup() sends SIGTERM via
// killProcessTreeGraceful. Signal handlers here are only needed in the
// forked path where there's no orchestrator to dispatch signals.
if (process.env.NX_FORKED_TASK_EXECUTOR) {
process.on('SIGINT', () => {
this.kill('SIGTERM');
});
process.on('SIGTERM', () => {
this.kill('SIGTERM');
});
process.on('SIGHUP', () => {
this.kill('SIGTERM');
});
}
}
}

Expand Down Expand Up @@ -704,23 +688,31 @@ function registerProcessListener(
}
});

// Terminate any task processes on exit
// Terminate any task processes on exit (sync, last resort).
// The per-child exit handlers and PseudoTerminal.shutdown() use the
// sync killProcessTree for this path. We call kill() here as a
// best-effort fallback — it returns a Promise but on 'exit' only
// synchronous work runs, so the initial signal is sent but the
// grace period won't be awaited. That's acceptable: 'exit' is the
// last resort after SIGINT/SIGTERM handlers have already had their
// chance to do graceful shutdown.
process.on('exit', () => {
runningTask.kill();
});
process.on('SIGINT', () => {
runningTask.kill('SIGTERM');
// we exit here because we don't need to write anything to cache.
process.exit(signalToCode('SIGINT'));
});
process.on('SIGTERM', () => {
runningTask.kill('SIGTERM');
// no exit here because we expect child processes to terminate which
// will store results to the cache and will terminate this process
});
process.on('SIGHUP', () => {
runningTask.kill('SIGTERM');
// no exit here because we expect child processes to terminate which
// will store results to the cache and will terminate this process
});
// In the direct path, the orchestrator handles signal dispatch to children.
// These handlers are only needed in the forked path where there's no
// orchestrator.
if (process.env.NX_FORKED_TASK_EXECUTOR) {
process.on('SIGINT', () => {
Promise.resolve(runningTask.kill('SIGTERM')).finally(() => {
process.exit(signalToCode('SIGINT'));
});
});
process.on('SIGTERM', () => {
runningTask.kill('SIGTERM');
});
process.on('SIGHUP', () => {
runningTask.kill('SIGTERM');
});
}
}
Loading
Loading