
Add integration test for CommandExecutor that checks for deadlocks #32


Description

@tkilias

We need a test that checks that the CommandExecutor doesn't get stuck in a deadlock during consume_results. However, the only command we can rely on being present is Bash (it should be available on all dev machines), so the test has to write a Bash script on the fly to a temp dir.

AI answer:

To test for deadlocks in process execution—especially when handling stdout and stderr—we need to saturate the OS pipe buffers. A deadlock typically occurs when a subprocess fills its pipe buffer (usually 64 KB on Linux) and blocks until the parent reads from it, while the parent is itself blocked waiting for the subprocess to finish.
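
For illustration, here is a minimal, deliberately broken sketch of that anti-pattern using plain subprocess (the flooding command is a placeholder and has nothing to do with CommandExecutor itself); it hangs for exactly the reason described above:

import subprocess

# Deliberately broken sketch: the child writes 70,000 bytes, which exceeds the
# ~64 KB pipe buffer. The child blocks in write(), the parent blocks in wait(),
# and neither side can make progress.
proc = subprocess.Popen(
    ["bash", "-c", "printf 'A%.0s' {1..70000}"],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)
proc.wait()               # hangs forever: nobody is draining the full pipe
out = proc.stdout.read()  # never reached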

The following pytest example creates a temporary Bash script that floods both stdout and stderr simultaneously to ensure your threading logic in consume_results handles concurrent stream draining correctly.

The Pytest Implementation

import textwrap
from unittest.mock import MagicMock

import pytest

# Assuming the classes are in a file named executor.py
# from executor import CommandExecutor

def test_command_executor_no_deadlock(tmp_path):
    """
    Tests that CommandExecutor doesn't deadlock when stdout and stderr
    buffers are filled simultaneously.
    """
    # 1. Create a bash script that writes heavily to both streams
    # 70,000 chars exceeds the standard 64KB pipe buffer on most Linux systems
    script_path = tmp_path / "flood_buffers.sh"
    script_content = """#!/bin/bash
    # Print 70,000 'A's to stdout
    printf 'A%.0s' {1..70000}
    # Print 70,000 'B's to stderr 
    printf 'B%.0s' {1..70000} >&2
    exit 0
    """
    script_path.write_text(script_content)
    
    # 2. Setup Mock Logger
    mock_logger = MagicMock()
    executor = CommandExecutor(mock_logger)

    # 3. Execute and Consume
    # We use a simple list to capture output to verify we actually read it all
    captured_out = []
    captured_err = []

    def consume_out(line, _): captured_out.append(line)
    def consume_err(line, _): captured_err.append(line)

    result = executor.execute(["bash", str(script_path)])
    
    # If this deadlocks, pytest will hang here
    ret_code = result.consume_results(consume_out, consume_err)

    # 4. Assertions
    assert ret_code == 0
    # Join all chunks and check length to ensure no data was lost/truncated
    full_out = "".join(captured_out)
    full_err = "".join(captured_err)
    
    assert len(full_out) >= 70000
    assert len(full_err) >= 70000
    assert "A" in full_out
    assert "B" in full_err

Why this works

  1. Buffer Saturation: By writing 70,000 bytes (more than the typical 64 KB pipe buffer), the Bash script is forced to block until your Python code reads the data.
  2. Parallel Consumption: Because consume_results uses a threading.Thread for each of stdout and stderr, the parent process drains both pipes concurrently.
  3. Deadlock Prevention: If the stdout thread were joined before the stderr thread was started, or if sub_process.wait() were called before the pipes were drained, the buffers would stay full, the Bash script would stay blocked, and the Python process would wait forever. A minimal sketch of the safe pattern follows below.
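
For reference, here is a small, self-contained sketch of that concurrent-draining pattern against plain subprocess (the names _drain, out_lines, and err_lines are illustrative only and are not the actual CommandExecutor internals):

import subprocess
import threading

def _drain(stream, sink):
    # Read the pipe until EOF so the child can never block on a full buffer.
    for line in iter(stream.readline, ""):
        sink.append(line)
    stream.close()

proc = subprocess.Popen(
    ["bash", "-c", "printf 'A%.0s' {1..70000}; printf 'B%.0s' {1..70000} >&2"],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    text=True,
)
out_lines, err_lines = [], []
threads = [
    threading.Thread(target=_drain, args=(proc.stdout, out_lines)),
    threading.Thread(target=_drain, args=(proc.stderr, err_lines)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()            # both pipes are fully drained first ...
ret_code = proc.wait()  # ... so waiting for the child is now safe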
