Description
We need a test that checks that the CommandExecutor doesn't get stuck in a deadlock while running consume_results. However, the only command we can rely on is Bash (it should be available on all dev machines), so the test has to write a Bash script on the fly to a temp dir and execute that.
AI answer:
To test for deadlocks in process execution, especially when handling stdout and stderr, we need to simulate a full pipe buffer (this is buffer saturation, not a buffer overflow in the memory-safety sense). A deadlock typically occurs when a subprocess fills its OS pipe buffer (64 KiB by default on Linux) and blocks until the parent reads, while the parent is blocked waiting for the subprocess to finish.
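The stall described above can be reproduced in a few lines. This is a minimal sketch, not part of CommandExecutor: it uses the Python interpreter as the child process instead of Bash only to stay self-contained, and the 200,000-byte payload and 2-second timeout are arbitrary assumptions chosen to exceed a typical pipe buffer.

```python
import subprocess
import sys

# Child that writes 200,000 bytes to stdout, well beyond a typical pipe buffer.
child = [sys.executable, "-c", "import sys; sys.stdout.write('A' * 200_000)"]
proc = subprocess.Popen(child, stdout=subprocess.PIPE)

try:
    # Waiting before reading: the child is stuck in write(), so it cannot exit.
    proc.wait(timeout=2)
    stalled = False
except subprocess.TimeoutExpired:
    stalled = True

# Draining the pipe unblocks the child, which can then finish normally.
data = proc.stdout.read()
exit_code = proc.wait()
print(stalled, len(data), exit_code)
```

Without the timeout, the first wait() would block indefinitely on a system whose pipe buffer is smaller than the payload, which is exactly the hang the test is meant to catch.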
The pytest example below creates a temporary Bash script that writes more than one pipe buffer's worth of data to stdout and then to stderr. The script can only finish if both streams are drained concurrently, so the test verifies that the threading logic in consume_results does this correctly.
The Pytest Implementation
from unittest.mock import MagicMock

# Assumption: CommandExecutor lives in a file named executor.py;
# adjust this import to match your project layout.
from executor import CommandExecutor


def test_command_executor_no_deadlock(tmp_path):
    """
    Tests that CommandExecutor doesn't deadlock when the child writes more
    than one pipe buffer's worth of data to both stdout and stderr.
    """
    # 1. Create a Bash script that writes heavily to both streams.
    # 70,000 bytes exceeds the default 64 KiB (65,536-byte) pipe buffer on Linux.
    script_path = tmp_path / "flood_buffers.sh"
    script_content = """#!/bin/bash
# Print 70,000 'A's to stdout
printf 'A%.0s' {1..70000}
# Print 70,000 'B's to stderr
printf 'B%.0s' {1..70000} >&2
exit 0
"""
    script_path.write_text(script_content)

    # 2. Set up a mock logger
    mock_logger = MagicMock()
    executor = CommandExecutor(mock_logger)

    # 3. Execute and consume
    # Simple lists capture the output so we can verify that all of it was read
    captured_out = []
    captured_err = []

    def consume_out(line, _):
        captured_out.append(line)

    def consume_err(line, _):
        captured_err.append(line)

    # Running the script through "bash" means it does not need the executable bit
    result = executor.execute(["bash", str(script_path)])

    # If this deadlocks, pytest will hang here
    ret_code = result.consume_results(consume_out, consume_err)

    # 4. Assertions
    assert ret_code == 0

    # Join all chunks and check the length to ensure no data was lost or truncated
    full_out = "".join(captured_out)
    full_err = "".join(captured_err)
    assert len(full_out) >= 70000
    assert len(full_err) >= 70000
    assert "A" in full_out
    assert "B" in full_err

Why this works
- Buffer Saturation: By writing 70,000 bytes to each stream (more than the typical 64 KiB pipe buffer), the Bash script is forced to block until your Python code reads the data.
- Parallel Consumption: Because you used threading.Thread for both stdout and stderr in consume_results, the parent process drains both pipes concurrently.
- Deadlock Prevention: If you had tried to join the stdout thread before starting the stderr thread, or if you had called sub_process.wait() before reading, a pipe would stay full, the Bash script would stay blocked, and the Python script would wait forever.
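The parallel-consumption pattern from the list above can be sketched as follows. This is an illustration under stated assumptions, not the actual consume_results implementation: the names drain and run_and_drain, the 4096-byte chunk size, and the 10-second timeout are all hypothetical, and the child is a Python one-liner rather than a Bash script so the snippet runs anywhere.

```python
import subprocess
import sys
import threading


def drain(stream, sink):
    """Read a pipe to EOF in chunks so the child never blocks on a full buffer."""
    for chunk in iter(lambda: stream.read(4096), b""):
        sink.append(chunk)
    stream.close()


def run_and_drain(cmd, timeout=10.0):
    """Hypothetical helper: drain stdout and stderr concurrently, then reap the child."""
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = [], []
    readers = [
        threading.Thread(target=drain, args=(proc.stdout, out), daemon=True),
        threading.Thread(target=drain, args=(proc.stderr, err), daemon=True),
    ]
    # Start both readers before joining either one; joining the first
    # before starting the second would reintroduce the deadlock.
    for reader in readers:
        reader.start()
    for reader in readers:
        reader.join(timeout)
    if any(reader.is_alive() for reader in readers):
        # Fail loudly instead of hanging the caller (or the test run).
        proc.kill()
        raise TimeoutError("pipes were not drained in time")
    # Only wait for the exit code once both pipes have reached EOF.
    return proc.wait(), b"".join(out), b"".join(err)


child = [
    sys.executable,
    "-c",
    "import sys; sys.stdout.write('A' * 70000); sys.stderr.write('B' * 70000)",
]
code, out, err = run_and_drain(child)
print(code, len(out), len(err))
```

The join timeout is a design choice worth copying into the test itself: a regression then shows up as a failure after a few seconds rather than as a pytest run that never returns.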