go-pipe v2: Stage API, pooled copies, MemoryWatch removal#51

Open
znull wants to merge 95 commits into
mainfrom
znull/stage-v2-clean

Conversation

@znull

@znull znull commented Apr 9, 2026

Contributor

Motivation: Stage pipe negotiation and pooled buffer copies

The original motivation for this work was the perf optimizations in #49 and #50. But rather than optimizing the ioCopier, we can instead completely[1] remove it by adopting the approach taken in @mhagger's Stage interface redesign (#21). This branch rebases #21 onto a recent main and folds in the #49/#50 optimizations according to the new structure.

This PR is the integration branch for go-pipe v2. PRs included:

v2 interface changes

Summary of the new Stage interface (see pipe/stage.go, pipe/streams.go, pipe/stream_requirement.go):

type Stage interface {
    Name() string
    Requirements() StageRequirements
    Start(
        ctx context.Context, opts StageOptions,
        stdin *InputStream, stdout *OutputStream,
    ) error
    Wait() error
}

// StageOptions carries everything (other than ctx, stdin, and stdout)
// that a pipeline passes to Stage.Start.
type StageOptions struct {
    Env
    PanicHandler StagePanicHandler
}

type StagePanicHandler func(p any) error

// StageRequirements describes what a Stage needs from the streams
// connected to its stdin and stdout. The zero value is correct for
// stages happy with arbitrary io.Reader/io.Writer streams (e.g. Function).
type StageRequirements struct {
    Stdin  StreamRequirement
    Stdout StreamRequirement
}

type StreamRequirement int

const (
    StreamAcceptAny StreamRequirement = iota // no requirement (may even be nil)
    StreamPreferFile                         // prefers an *os.File fd, but works with anything
    StreamForbidden                          // stream must be nil; not read/written/closed
)

Since we're doing a major version bump, we also take advantage of the opportunity to do some additional API cleanups. This happened in phases:

Phase 1: Stage interface pipe negotiation

The interface now hands each stage both its stdin and stdout (as *InputStream/*OutputStream), plus a new StageOptions, and asks each stage to declare its I/O requirements via Requirements() so the pipeline can pick the cheapest pipe type between neighbors.

Phase 2: rework MemoryWatch constructor

In #54, the three flavors of MemoryWatch stage are folded together to use options to configure a single kind of stage instead of having several almost-identical variants. (This stage is then removed entirely in Phase 4 — see below.)

Phase 3: Separate close-responsibility and other concerns

In #53, rather than inferring who is responsible for closing a pipeline stage's stdin/stdout from the dynamic type, we communicate it explicitly. This is now modeled by the InputStream/OutputStream wrapper types. Env and the start options were also consolidated into the single StageOptions struct.

Phase 4: Remove non-essential Stage implementations and add WithExtraEnv stage

Since it's (almost) possible to implement the MemoryWatch stage entirely outside of go-pipe, in #56 we add the one interface necessary to allow that (a stage that exposes its *os.Process) and remove the stage. This allowed a lot of code to be deleted: pipe/memorylimit.go, pipe/iocopier.go, pipe/nop_closer.go, pipe/panic.go, and all of internal/ptree.

In the other direction, we add a WithExtraEnv(inner Stage, env []EnvVar) wrapper that runs a single stage under a modified environment (this complements StageOptions.Env, which sets the environment for the overall pipeline).

Phase 5: Stream ownership, early-close semantics, and pipe negotiation (#57/#58/#59)

  • Stream types (#58, "Make the stream types more capable and use them in more places"): stdin/stdout are passed as the *InputStream/*OutputStream types above, making close-responsibility explicit at the type level rather than via a separate bool. The Close() methods are idempotent.
  • stageJoiner (#59, "Use a stageJoiner to join stages together"): a new helper (pipe/stage_joiner.go) owns the creation of the pipe between each pair of adjacent stages.
  • Early-close regression tests + docs (#57, "Document stream ownership and v2 early-close behavior"): v2 connects producer stages more directly to a command's stdin, so a producer that keeps writing after a downstream stage exits will now see EPIPE/io.ErrClosedPipe itself (in v1 this was often, but not reliably, hidden). This is documented in the new "Migrating to v2" section of the README, along with the pipe.IgnoreError(stage, IsPipeError) remedy and other migration patterns.
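To make the early-close caveat concrete, here is a minimal standalone sketch of the kind of error a producer can now observe, and of a predicate that recognizes it. `isPipeError` and `writeAfterReaderClosed` are hypothetical names written for this illustration; the library's own `IsPipeError` may match a different set of errors.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"syscall"
)

// isPipeError is a hypothetical stand-in for the library's IsPipeError
// predicate. It treats both the OS-level EPIPE and the in-process
// io.ErrClosedPipe as "the reader went away".
func isPipeError(err error) bool {
	return errors.Is(err, syscall.EPIPE) || errors.Is(err, io.ErrClosedPipe)
}

// writeAfterReaderClosed simulates a producer that keeps writing after
// its consumer has exited, and returns the error the producer sees.
func writeAfterReaderClosed() error {
	r, w := io.Pipe()
	r.Close() // the "downstream stage" exits early
	_, err := w.Write([]byte("more data\n"))
	return err
}

func main() {
	err := writeAfterReaderClosed()
	fmt.Println("producer saw:", err)
	fmt.Println("is pipe error:", isPipeError(err))
}
```

A producer stage that is expected to outlive its consumer would filter exactly this class of error, which is what the IgnoreError(stage, IsPipeError) pattern from the README is for.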

Phase 1 Details

Since Phase 1 wasn't a separate PR and was repurposed as this integration branch, its original description is below.

Perf optimizations

We want to minimize data copies, whether by letting the kernel move bytes instead of Go, or, if we do copy in Go, doing it more efficiently. With the pipeline owning the connections it can:

  • hand a command's stdout fd straight to the next stage (or to the final destination), so there's no Go-side goroutine copying between stages and dirtying the heap;
  • pass an *os.File destination's fd directly into the child, letting the kernel (sendfile(2)/splice(2) and friends) do the copy (so-called "zero-copy" I/O);
  • and where we do have to copy in-process, run it through a pooled 32KB buffer instead of letting exec.Cmd allocate a fresh one per pipeline.

#49 and #50 were aiming for similar optimizations, but now they fall out of the structure instead of being bolted onto ioCopier.

Panic Handling

go-pipe runs user code (function stages, the now-removed memory-limit event handlers) in goroutines it spawns itself, so a panic there could take down the whole process. There was already panic handling, but it was part of the Stage interface, which forced error-prone boilerplate on stages that don't do any panic handling (which is most of them). Panic handling is now a callback (StageOptions.PanicHandler, set via WithStagePanicHandler) that the pipeline threads to every stage's Start, so a single handler covers all stages. If PanicHandler is nil, the panic propagates and crashes the process; this is intentional.
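As a rough illustration of the callback approach (not the code in this branch), the sketch below runs a function in a spawned goroutine and routes any panic through a handler. `StagePanicHandler` mirrors the type shown earlier; `runInGoroutine` and `panicToError` are hypothetical helpers invented for the example.

```go
package main

import "fmt"

// StagePanicHandler mirrors the callback type from the PR description.
type StagePanicHandler func(p any) error

// runInGoroutine is a hypothetical helper: it runs fn in its own
// goroutine and waits for the result. With a non-nil handler, a panic
// in fn is recovered and converted to an error by the handler. With a
// nil handler nothing is recovered, so the panic crashes the process.
func runInGoroutine(fn func() error, handler StagePanicHandler) error {
	done := make(chan error, 1)
	go func() {
		if handler != nil {
			defer func() {
				if p := recover(); p != nil {
					done <- handler(p)
				}
			}()
		}
		done <- fn()
	}()
	return <-done
}

// panicToError is an example handler that turns a panic into an error.
func panicToError(p any) error {
	return fmt.Errorf("stage panicked: %v", p)
}

func main() {
	err := runInGoroutine(func() error { panic("boom") }, panicToError)
	fmt.Println(err)
}
```

Because the handler is passed in rather than implemented by each stage, wrapper stages only need to forward it, which is the boilerplate reduction described above.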

Supersedes

The git-systems/pooled-copies branch (which carried #49 + #50) can be deleted after this merges.

/cc @mhagger @migue @carlosmn

Copilot Summary

Interface change

Stage.Start now takes both stdin and stdout as wrapper types plus a StageOptions struct (a struct so future run-scoped options don't break the interface again):

Start(ctx context.Context, opts StageOptions, stdin *InputStream, stdout *OutputStream) error

Each stage declares its I/O needs through Requirements() StageRequirements, and Pipeline.Start() uses those to negotiate the pipe type between adjacent stages (via the stageJoiner helper):

  • os.Pipe() when either neighbor wants a real *os.File fd (StreamPreferFile)
  • io.Pipe() when both neighbors are in-process Go functions (cheaper, all userspace)
  • the pipeline's own stdout passed directly to the last stage — no synthetic ioCopier

StreamForbidden lets a stage declare that a given stream must be nil (validated before any pipes are created). The module path is bumped to github.com/github/go-pipe/v2.
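The negotiation rules above can be sketched as a small decision function. This is an illustrative reduction, not the stageJoiner implementation: `choosePipe`, `pipeKind`, and `errForbidden` are hypothetical names, and only the `StreamRequirement` constants are taken from the interface summary.

```go
package main

import (
	"errors"
	"fmt"
)

// StreamRequirement mirrors the three values from the PR description.
type StreamRequirement int

const (
	StreamAcceptAny StreamRequirement = iota
	StreamPreferFile
	StreamForbidden
)

// pipeKind names the pipe flavor chosen for a pair of adjacent stages.
type pipeKind string

const (
	osPipe pipeKind = "os.Pipe"
	ioPipe pipeKind = "io.Pipe"
)

var errForbidden = errors.New("a stage forbids a stream that would be connected")

// choosePipe is a hypothetical sketch of the decision for the stream
// between two neighbors: reject forbidden streams first, use os.Pipe
// if either side prefers a file, and fall back to io.Pipe otherwise.
func choosePipe(upstreamStdout, downstreamStdin StreamRequirement) (pipeKind, error) {
	if upstreamStdout == StreamForbidden || downstreamStdin == StreamForbidden {
		return "", errForbidden
	}
	if upstreamStdout == StreamPreferFile || downstreamStdin == StreamPreferFile {
		return osPipe, nil
	}
	return ioPipe, nil
}

func main() {
	kind, err := choosePipe(StreamAcceptAny, StreamPreferFile)
	fmt.Println(kind, err)
	kind, err = choosePipe(StreamAcceptAny, StreamAcceptAny)
	fmt.Println(kind, err)
}
```

Checking for forbidden streams before choosing a pipe reflects the ordering described above, where validation happens before any pipes are created.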

Stream ownership (replaces dynamic-type-based close inference)

InputStream/OutputStream (pipe/streams.go) carry the underlying io.Reader/io.Writer and who owns closing it:

  • Input(r) / Output(w) — non-closing; the stage reads/writes but must not close.
  • ClosingInput(r) / ClosingOutput(w) — the stage owns and closes the stream.
  • Reader()/Writer() expose the concrete underlying value (preserving *os.File/io.WriterTo/io.ReaderFrom identity for fast paths); Close() is idempotent and nil-safe.

This replaces the old NopCloser/Unwrap{Reader,Writer} forwarding machinery, which is deleted.
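For readers who want a picture of the ownership model, here is a standalone re-creation of the input side. The names `InputStream`, `Input`, `ClosingInput`, `Reader`, and `Close` come from the description above; the struct fields, the `io.ReadCloser` parameter type, and the choice to return the first Close() result on later calls are assumptions made for this sketch.

```go
package main

import (
	"fmt"
	"io"
	"strings"
	"sync"
)

// InputStream pairs a reader with the knowledge of who closes it.
type InputStream struct {
	r      io.Reader
	closer io.Closer // nil when the stage must not close the stream
	once   sync.Once
	err    error
}

// Input wraps r as a stream that the stage may read but must not close.
func Input(r io.Reader) *InputStream { return &InputStream{r: r} }

// ClosingInput wraps r as a stream that the stage owns and closes.
func ClosingInput(r io.ReadCloser) *InputStream {
	return &InputStream{r: r, closer: r}
}

// Reader returns the underlying value unchanged, so callers can still
// type-assert it (for example to *os.File) for fast paths.
func (s *InputStream) Reader() io.Reader {
	if s == nil {
		return nil
	}
	return s.r
}

// Close is nil-safe and idempotent: only the first call reaches the
// underlying closer, and non-owning streams never close anything.
func (s *InputStream) Close() error {
	if s == nil || s.closer == nil {
		return nil
	}
	s.once.Do(func() { s.err = s.closer.Close() })
	return s.err
}

// countingCloser records how often Close is called (demo only).
type countingCloser struct {
	io.Reader
	closes int
}

func (c *countingCloser) Close() error { c.closes++; return nil }

// closeCounts closes an owning and a non-owning stream twice each and
// reports how many times each underlying closer was actually invoked.
func closeCounts() (owned, borrowed int) {
	a := &countingCloser{Reader: strings.NewReader("a")}
	b := &countingCloser{Reader: strings.NewReader("b")}
	in1, in2 := ClosingInput(a), Input(b)
	in1.Close()
	in1.Close()
	in2.Close()
	in2.Close()
	return a.closes, b.closes
}

func main() {
	owned, borrowed := closeCounts()
	fmt.Println("owned closes:", owned, "borrowed closes:", borrowed)
}
```

Note that the wrapper deliberately has no Read method of its own; callers go through Reader(), which keeps the concrete type visible.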

Fast paths preserved from #49/#50

ioCopier is deleted; the optimizations it carried now live in the stage structure:

  • fd dup-to-child / sendfile — a final commandStage writing to an *os.File destination dup's the fd into the child; for a non-*os.File writer that implements io.ReaderFrom, the copy goes through ReadFrom (sendfile where the kernel supports it). Pinned in pipe/command_stdout_fastpath_test.go.
  • pooled 32KB copy buffers — for a non-*os.File, non-ReaderFrom stdout, stdout setup builds an os.Pipe() and copies through a sync.Pool buffer rather than letting exec.Cmd allocate per-pipeline. See pipe/copy_pool.go.
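The pooled-buffer idea can be sketched with the standard library alone. This is not the contents of pipe/copy_pool.go: `copyBufPool`, `pooledCopy`, and `copyDemo` are hypothetical names, and the sketch leans on io.CopyBuffer, which already prefers WriterTo/ReaderFrom when either side implements them and only falls back to the supplied buffer otherwise.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
	"sync"
)

const copyBufSize = 32 * 1024

// copyBufPool hands out reusable 32 KB buffers. Pointers to slices are
// pooled so that Put does not allocate when boxing the value.
var copyBufPool = sync.Pool{
	New: func() any {
		buf := make([]byte, copyBufSize)
		return &buf
	},
}

// pooledCopy copies src to dst using a buffer borrowed from the pool
// instead of allocating a fresh one for every copy.
func pooledCopy(dst io.Writer, src io.Reader) (int64, error) {
	bufp := copyBufPool.Get().(*[]byte)
	defer copyBufPool.Put(bufp)
	return io.CopyBuffer(dst, src, *bufp)
}

// copyDemo pushes size bytes through pooledCopy. Both ends are wrapped
// in anonymous structs that expose only Read/Write, which hides the
// WriterTo/ReaderFrom fast paths and forces use of the pooled buffer.
func copyDemo(size int) (int64, error) {
	src := struct{ io.Reader }{strings.NewReader(strings.Repeat("x", size))}
	var out bytes.Buffer
	dst := struct{ io.Writer }{&out}
	return pooledCopy(dst, src)
}

func main() {
	n, err := copyDemo(100000)
	fmt.Println(n, err)
}
```

The benefit only shows up when many copies run over the life of a process, since the pool amortizes buffer allocation across them.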

Panic handling

  • StageOptions.PanicHandler (StagePanicHandler = func(p any) error) is set on the pipeline via WithStagePanicHandler and threaded to every stage's Start. The previous StagePanicHandlerAware opt-in interface is removed; wrapper stages just forward opts.
  • Covered: Function stage StageFuncs (run in a library-spawned goroutine). If PanicHandler is nil the panic propagates and crashes — intentional.

Removals

  • pipe/memorylimit.go (the MemoryWatch stage) and all of internal/ptree: replaced by an external-implementation hook (a stage exposing its *os.Process, added in #56, "add/remove stages: remove MemoryWatch and add WithExtraEnv").
  • pipe/iocopier.go, pipe/nop_closer.go, pipe/panic.go — superseded by the stream/joiner structure and the threaded panic handler.

Additions

  • WithExtraEnv(inner Stage, env []EnvVar) (pipe/env_stage.go) — per-stage environment override, complementing pipeline-wide StageOptions.Env.
  • pipe/stage_joiner.go — owns inter-stage pipe creation and requirement validation.
  • README "Migrating to v2" section documenting the early-close producer caveat and the IgnoreError(stage, IsPipeError) remedy.

Commit structure

Footnotes

  1. almost

@znull znull force-pushed the znull/stage-v2-clean branch from 409297e to fa6c12d Compare April 9, 2026 10:05
@znull znull changed the base branch from main to git-systems/pooled-copies April 9, 2026 10:06
@znull znull force-pushed the znull/stage-v2-clean branch 3 times, most recently from 79f4a1a to 911ed5b Compare April 9, 2026 13:37
@mhagger

mhagger commented Apr 27, 2026

Member

This PR is still in draft mode. Do you want review/feedback already?

@znull

znull commented May 4, 2026

Contributor Author

This PR is still in draft mode. Do you want review/feedback already?

@mhagger I ran out of time to review the LLM output before vacation. I wanted to read through the changes more fully myself before inflicting them on anyone else so I left it in draft mode.

@znull znull force-pushed the znull/stage-v2-clean branch from 911ed5b to 4764d95 Compare May 28, 2026 17:29
mhagger added 4 commits May 28, 2026 20:23
Ported from version-2 branch commits:
- 95dc2e8 pipeline_test.go: get rid of a bunch of unnecessary tmpdirs
- 5fdc22a TestPipelineStdinThatIsNeverClosed(): create stdin more simply
- c2c9802 pipeline_test.go: use WithStdoutCloser() to close stdout pipes

Tests that don't run external commands (or whose commands don't
need a specific working directory) don't need t.TempDir().

Add some benchmarks that move MB-scale data through pipelines
consisting of alternating commands and functions, one in small writes,
and one buffered into larger writes, then processing it one line at a
time. This is not so efficient, because every transition from
`Function` → `Command` requires an extra (hidden) goroutine that
copies the data from an `io.Reader` to a `*os.File`.

We can make this faster!

* Rename
  * `newNopCloser()` → `newReaderNopCloser()`
  * `nopCloser` → `readerNopCloser`
  * `nopCloserWriterTo` → `readerWriterToNopCloser`
  * `nopWriteCloser` → `writerNopCloser`

  to help keep readers and writers straight and because only the
  `Close()` part is a NOP.

* Move `writerNopCloser` to `nop_closer.go` to be with its siblings.
@znull znull force-pushed the znull/stage-v2-clean branch from 4764d95 to f97fddd Compare May 28, 2026 18:46
@znull znull changed the base branch from git-systems/pooled-copies to main May 28, 2026 18:46
@znull znull changed the title Stage interface redesign: pipe type negotiation, eliminate ioCopier Stage interface redesign: pipe type negotiation, pooled buffer copies May 28, 2026
@znull znull force-pushed the znull/stage-v2-clean branch 2 times, most recently from 2ed608b to d14ef7b Compare May 29, 2026 11:33
@znull znull self-assigned this May 29, 2026
@znull znull force-pushed the znull/stage-v2-clean branch from 08a9cf4 to fca1bfc Compare May 29, 2026 13:56
@znull znull marked this pull request as ready for review May 29, 2026 14:39
@znull znull requested a review from a team as a code owner May 29, 2026 14:39
Copilot AI review requested due to automatic review settings May 29, 2026 14:39

Copilot AI left a comment

Pull request overview

This PR modernizes the pipeline stage contract for v2 by letting stages declare I/O preferences and receive both stdin and stdout from the pipeline, enabling better pipe selection and removing the synthetic ioCopier stage.

Changes:

  • Redesigns Stage with Preferences() and Start(..., stdin, stdout) so Pipeline.Start() can negotiate os.Pipe vs io.Pipe.
  • Reworks command/function/memory-limit stages for the new interface, including pooled stdout copies for non-file command destinations.
  • Updates module path to /v2 and adds regression/benchmark coverage for pipe matching, empty pipelines, fast-path stdout, and start-failure cleanup.
| File | Description |
| --- | --- |
| README.md | Updates documentation links for the v2 module path. |
| go.mod | Changes the module path to github.com/github/go-pipe/v2. |
| internal/ptree/ptree_test.go | Updates internal import path for v2. |
| pipe/stage.go | Redefines the public Stage interface and adds I/O preference types. |
| pipe/pipeline.go | Reworks pipeline startup to negotiate pipe types and pass stdout directly. |
| pipe/command.go | Adapts command stages to the new interface and adds pooled stdout copy handling. |
| pipe/function.go | Adapts function stages to receive caller-provided stdout and panic handling. |
| pipe/filter-error.go | Forwards panic handlers through error-filtering wrappers. |
| pipe/memorylimit.go | Ports memory-watching wrappers to the new stage interface. |
| pipe/nop_closer.go | Splits reader/writer nop closers and adds test unwrapping support. |
| pipe/copy_pool.go | Adds pooled-buffer copy helper with ReaderFrom fast-path support. |
| pipe/iocopier.go | Removes the old synthetic copier stage. |
| pipe/scanner.go | Simplifies scanner error return. |
| pipe/command_linux.go | Updates internal import path for v2. |
| pipe/command_test.go | Applies formatting cleanup. |
| pipe/command_nil_panic_test.go | Updates direct Start call for the new signature. |
| pipe/pipeline_test.go | Updates tests/benchmarks for v2 behavior, empty pipelines, and panic forwarding. |
| pipe/memorylimit_test.go | Reworks memory-limit tests for the new pipeline flow. |
| pipe/pipe_matching_test.go | Adds coverage for negotiated stdin/stdout pipe types. |
| pipe/export_test.go | Exposes nop-closer unwrapping for external package tests. |
| pipe/command_stdout_fastpath_test.go | Adds tests pinning direct *os.File stdout handoff. |
| pipe/command_starterror_test.go | Adds regression coverage for start-failure copy-goroutine cleanup. |

Copilot's findings

  • Files reviewed: 22/22 changed files
  • Comments generated: 2

Comment thread pipe/memorylimit.go Outdated
Comment thread pipe/export_test.go Outdated
@znull znull force-pushed the znull/stage-v2-clean branch from d7948da to 33ca03c Compare May 29, 2026 14:56
@znull znull requested a review from Copilot May 29, 2026 15:01
@znull

znull commented May 29, 2026

Contributor Author

@mhagger I ran out of time to review the LLM output before vacation. I wanted to read through the changes more fully myself before inflicting them on anyone else so I left it in draft mode.

I think this is actually worth taking a look at now.

@znull znull requested a review from mhagger May 29, 2026 15:03
mhagger and others added 16 commits June 13, 2026 17:24
Ignore all but the first call to `Close()`.

In the type comments, explain why these types don't implement
`io.Reader` and `io.Writer`. Otherwise, some helpful person is sure to
come along and add `Read()` and `Write()` methods, to the detriment of
performance and even changing some semantics.

Change the types of some `Pipeline` fields:

* `stdin` to `InputStream`
* `stdout` to `OutputStream`

That way we don't have to manage their closers separately.

The two aspects of a stage's stream requirements are coupled. For
example, of the four possible combinations of `(StreamRequirement,
NeedsFile)`, one of them, `(StreamForbidden, true)`, makes no sense.

So instead of encoding these two aspects separately, encode the three
meaningful combinations into a single `StreamRequirement` type with
possible values `StreamAcceptAny`, `StreamPreferFile`, and
`StreamForbidden`.

The old code used a `stageStarter` type to help with starting up
stages. There are two awkward things about that approach:

* The pipe that is needed to join stages depends on the two adjacent
  stages, not on the stdin and stdout of a single stage. So
  `stageStarter` wasn't able to figure out what pipe was needed; that
  logic kind of needed to live in the `Pipeline` type.

* Since the read and write ends of a pipe are created together, the
  `stageStarter` could also not be as helpful in closing those streams
  if there was an error.

So instead, use a new helper type, `stageJoiner`, to figure out how to
join two adjacent stages together. The `stageJoiner` can now do part
of the work for us:

* `stageJoiner.validate()` checks that the adjacent stages'
  requirements are met with respect to whether they need stdin/stdout to be
  supplied at all.

* `stageJoiner.createPipe()` figures out what kind of pipe to create
  for the two adjacent stages.

* `stageJoiner.closePipe()` closes the pipe (if it has already been
  created and needs closing) on errors.

This isn't a panacea; for example, some loops have to iterate over
stages, and some over stage joiners. But I think that it's less
confusing this way.

Move the part of the explanation that is specific to command stages to
`command.go`.

needFilePipe() and validate() each call Stage.Requirements() for the
adjacent stages, so a stage's requirements can be computed/allocated
twice during startup. Caching the result on the joiner lets us call
Requirements() only once.

Reorder Start() to run stageJoiner.validate() before createPipe(), and
make validate() decide whether a stream is "connected" from the pipeline
topology (the presence of an adjacent stage or an already-set
input/output stream).

This lets us reject an invalid pipeline without allocating any io.Pipe()
objects, and in the case of os.Pipe(), doing the system calls to create
OS-level file descriptors.

Make the stream types more capable and use them in more places
Use a `stageJoiner` to join stages together
Document stream ownership and v2 early-close behavior
GitHub Advanced Security started work on behalf of znull June 15, 2026 17:00 View session
GitHub Advanced Security finished work on behalf of znull June 15, 2026 17:03
@znull znull requested a review from Copilot June 15, 2026 17:15

Copilot AI left a comment

Copilot's findings

  • Files reviewed: 33/33 changed files
  • Comments generated: 4

Comment thread pipe/pipeline.go
Comment on lines +291 to +293
if err := requirements.Stdin.Validate(); err != nil {
    return fmt.Errorf("stdin: %w", err)
}
Comment thread pipe/pipeline.go
Comment on lines +294 to 296
if err := requirements.Stdout.Validate(); err != nil {
    return fmt.Errorf("stdout: %w", err)
}
Comment thread pipe/stage.go Outdated
Comment thread README.md Outdated
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
GitHub Advanced Security started work on behalf of znull June 15, 2026 17:23 View session
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
GitHub Advanced Security started work on behalf of znull June 15, 2026 17:24 View session
GitHub Advanced Security finished work on behalf of znull June 15, 2026 17:26
GitHub Advanced Security finished work on behalf of znull June 15, 2026 17:27
GitHub Advanced Security started work on behalf of znull June 15, 2026 21:39 View session
GitHub Advanced Security finished work on behalf of znull June 15, 2026 21:41

4 participants