go-pipe v2: Stage API, pooled copies, MemoryWatch removal #51
This PR is still in draft mode. Do you want review/feedback already?

@mhagger I ran out of time to review the LLM output before vacation. I wanted to read through the changes more fully myself before inflicting them on anyone else so I left it in draft mode.
Ported from version-2 branch commits:

- 95dc2e8 pipeline_test.go: get rid of a bunch of unnecessary tmpdirs
- 5fdc22a TestPipelineStdinThatIsNeverClosed(): create stdin more simply
- c2c9802 pipeline_test.go: use WithStdoutCloser() to close stdout pipes

Tests that don't run external commands (or whose commands don't need a specific working directory) don't need t.TempDir().
Add some benchmarks that move MB-scale data through pipelines consisting of alternating commands and functions, one in small writes, and one buffered into larger writes, then processing it one line at a time. This is not so efficient, because every transition from `Function` → `Command` requires an extra (hidden) goroutine that copies the data from an `io.Reader` to a `*os.File`. We can make this faster!
* Rename
  * `newNopCloser()` → `newReaderNopCloser()`
  * `nopCloser` → `readerNopCloser`
  * `nopCloserWriterTo` → `readerWriterToNopCloser`
  * `nopWriteCloser` → `writerNopCloser`

  to help keep readers and writers straight and because only the `Close()` part is a NOP.
* Move `writerNopCloser` to `nop_closer.go` to be with its siblings.
Pull request overview
This PR modernizes the pipeline stage contract for v2 by letting stages declare I/O preferences and receive both stdin and stdout from the pipeline, enabling better pipe selection and removing the synthetic ioCopier stage.
Changes:
- Redesigns `Stage` with `Preferences()` and `Start(..., stdin, stdout)` so `Pipeline.Start()` can negotiate `os.Pipe` vs `io.Pipe`.
- Reworks command/function/memory-limit stages for the new interface, including pooled stdout copies for non-file command destinations.
- Updates module path to `/v2` and adds regression/benchmark coverage for pipe matching, empty pipelines, fast-path stdout, and start-failure cleanup.
Show a summary per file
| File | Description |
|---|---|
| README.md | Updates documentation links for the v2 module path. |
| go.mod | Changes the module path to github.com/github/go-pipe/v2. |
| internal/ptree/ptree_test.go | Updates internal import path for v2. |
| pipe/stage.go | Redefines the public Stage interface and adds I/O preference types. |
| pipe/pipeline.go | Reworks pipeline startup to negotiate pipe types and pass stdout directly. |
| pipe/command.go | Adapts command stages to the new interface and adds pooled stdout copy handling. |
| pipe/function.go | Adapts function stages to receive caller-provided stdout and panic handling. |
| pipe/filter-error.go | Forwards panic handlers through error-filtering wrappers. |
| pipe/memorylimit.go | Ports memory-watching wrappers to the new stage interface. |
| pipe/nop_closer.go | Splits reader/writer nop closers and adds test unwrapping support. |
| pipe/copy_pool.go | Adds pooled-buffer copy helper with ReaderFrom fast-path support. |
| pipe/iocopier.go | Removes the old synthetic copier stage. |
| pipe/scanner.go | Simplifies scanner error return. |
| pipe/command_linux.go | Updates internal import path for v2. |
| pipe/command_test.go | Applies formatting cleanup. |
| pipe/command_nil_panic_test.go | Updates direct Start call for the new signature. |
| pipe/pipeline_test.go | Updates tests/benchmarks for v2 behavior, empty pipelines, and panic forwarding. |
| pipe/memorylimit_test.go | Reworks memory-limit tests for the new pipeline flow. |
| pipe/pipe_matching_test.go | Adds coverage for negotiated stdin/stdout pipe types. |
| pipe/export_test.go | Exposes nop-closer unwrapping for external package tests. |
| pipe/command_stdout_fastpath_test.go | Adds tests pinning direct *os.File stdout handoff. |
| pipe/command_starterror_test.go | Adds regression coverage for start-failure copy-goroutine cleanup. |
Copilot's findings
- Files reviewed: 22/22 changed files
- Comments generated: 2
I think this is actually worth taking a look at now.
This will be needed in a moment.
Ignore all but the first call to `Close()`.
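A minimal sketch of what "ignore all but the first call" can look like, using `sync.Once`. The `onceCloser` and `countingCloser` types are hypothetical illustrations, not go-pipe's actual stream types, and returning the first call's result from later calls is an assumption.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// onceCloser is a hypothetical wrapper (not go-pipe's real type) that
// forwards only the first Close() call to the underlying closer.
type onceCloser struct {
	c    io.Closer
	once sync.Once
	err  error
}

// Close closes the underlying closer at most once. Later calls do not
// touch the underlying closer; they return the first call's result.
func (o *onceCloser) Close() error {
	o.once.Do(func() { o.err = o.c.Close() })
	return o.err
}

// countingCloser records how many times Close() reached it.
type countingCloser struct{ n int }

func (c *countingCloser) Close() error {
	c.n++
	return nil
}

func main() {
	cc := &countingCloser{}
	oc := &onceCloser{c: cc}
	_ = oc.Close()
	_ = oc.Close()
	fmt.Println(cc.n) // the underlying closer saw a single call
}
```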
In the type comments, explain why these types don't implement `io.Reader` and `io.Writer`. Otherwise, some helpful person is sure to come along and add `Read()` and `Write()` methods, to the detriment of performance and even changing some semantics.
Change the types of some `Pipeline` fields:

* `stdin` to `InputStream`
* `stdout` to `OutputStream`

That way we don't have to manage their closers separately.
The two aspects of a stage's stream requirements are coupled. For example, of the four possible combinations of `(StreamRequirement, NeedsFile)`, one of them, `(StreamForbidden, true)`, makes no sense. So instead of encoding these two aspects separately, encode the three meaningful combinations into a single `StreamRequirement` type with possible values `StreamAcceptAny`, `StreamPreferFile`, and `StreamForbidden`.
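As a rough sketch of the single-type encoding: the three constant names come from the commit message, but the underlying integer type, the constant order, and the body of `Validate()` are assumptions made for illustration.

```go
package main

import "fmt"

// StreamRequirement folds "is a stream wanted?" and "should it be a
// file?" into one value, so the meaningless combination cannot be
// expressed at all.
type StreamRequirement int

const (
	StreamAcceptAny  StreamRequirement = iota // any reader/writer is fine
	StreamPreferFile                          // works best with an *os.File
	StreamForbidden                           // the stream must not be supplied
)

// Validate rejects values outside the three known constants.
// (Assumed behavior; the real method may check more.)
func (r StreamRequirement) Validate() error {
	switch r {
	case StreamAcceptAny, StreamPreferFile, StreamForbidden:
		return nil
	default:
		return fmt.Errorf("invalid stream requirement %d", int(r))
	}
}

func main() {
	fmt.Println(StreamPreferFile.Validate())      // <nil>
	fmt.Println(StreamRequirement(42).Validate()) // invalid stream requirement 42
}
```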
The old code used a `stageStarter` type to help with starting up stages. There are two awkward things about that approach:

* The pipe that is needed to join stages depends on the two adjacent stages, not on the stdin and stdout of a single stage. So `stageStarter` wasn't able to figure out what pipe was needed; that logic kind of needed to live in the `Pipeline` type.
* Since the read and write ends of a pipe are created together, the `stageStarter` could also not be as helpful in closing those streams if there was an error.

So instead, use a new helper type, `stageJoiner`, to figure out how to join two adjacent stages together. The `stageJoiner` can now do part of the work for us:

* `stageJoiner.validate()` checks that the adjacent stages' requirements are met with respect to whether they need stdin/stdout to be supplied at all.
* `stageJoiner.createPipe()` figures out what kind of pipe to create for the two adjacent stages.
* `stageJoiner.closePipe()` closes the pipe (if it has already been created and needs closing) on errors.

This isn't a panacea; for example, some loops have to iterate over stages, and some over stage joiners. But I think that it's less confusing this way.
Move the part of the explanation that is specific to command stages to `command.go`.
needFilePipe() and validate() each call Stage.Requirements() for the adjacent stages, so a stage's requirements can be computed/allocated twice during startup. Caching the result on the joiner lets us call Requirements() only once.
Reorder Start() to run stageJoiner.validate() before createPipe(), and make validate() decide whether a stream is "connected" from the pipeline topology (the presence of an adjacent stage or an already-set input/output stream). This lets us reject an invalid pipeline without allocating any io.Pipe() objects or, in the case of os.Pipe(), making the system calls that create OS-level file descriptors.
Make the stream types more capable and use them in more places
Use a `stageJoiner` to join stages together
Document stream ownership and v2 early-close behavior
	if err := requirements.Stdin.Validate(); err != nil {
		return fmt.Errorf("stdin: %w", err)
	}
	if err := requirements.Stdout.Validate(); err != nil {
		return fmt.Errorf("stdout: %w", err)
	}
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Motivation: Stage pipe negotiation and pooled buffer copies
The original motivation of this work was the perf optimizations in #49 and #50. But rather than optimizing the `ioCopier`, we can instead completely[^1] remove it by adopting the approach taken in @mhagger's Stage interface redesign (#21). This branch rebases #21 onto a recent main and folds in the #49/#50 optimizations according to the new structure.

This PR is the integration branch for go-pipe v2. PRs included:

- Use a `stageJoiner` to join stages together #59

v2 interface changes

For a summary of the new `Stage` interface, see `pipe/stage.go`, `pipe/streams.go`, and `pipe/stream_requirement.go`.

Since we're doing a major version bump, we also take advantage of the opportunity to do some additional API cleanups. This happened in phases:

Phase 1: Stage interface pipe negotiation

The interface now hands each stage both its stdin and stdout (as `*InputStream`/`*OutputStream`), plus a new `StageOptions`, and asks each stage to declare its I/O requirements via `Requirements()` so the pipeline can pick the cheapest pipe type between neighbors.

Phase 2: rework MemoryWatch constructor

In #54, the three flavors of MemoryWatch stage are folded together to use options to configure a single kind of stage instead of having several almost-identical variants. (This stage is then removed entirely in Phase 4; see below.)
Phase 3: Separate close-responsibility and other concerns

In #53, rather than inferring who is responsible for closing a pipeline stage's stdin/stdout from the dynamic type, we communicate it explicitly. This is now modeled by the `InputStream`/`OutputStream` wrapper types. `Env` and the start options were also consolidated into the single `StageOptions` struct.

Phase 4: Remove non-essential Stage implementations and add `WithExtraEnv` stage

Since it's (almost) possible to implement the MemoryWatch stage entirely outside of go-pipe, in #56 we add the one interface necessary to allow that (a stage that exposes its `*os.Process`) and remove the stage. This let a lot of code be deleted: `pipe/memorylimit.go`, `pipe/iocopier.go`, `pipe/nop_closer.go`, `pipe/panic.go`, and all of `internal/ptree`.

In the other direction, we add a `WithExtraEnv(inner Stage, env []EnvVar)` wrapper that runs a single stage under a modified environment (this complements `StageOptions.Env`, which sets the environment for the overall pipeline).

Phase 5: Stream ownership, early-close semantics, and pipe negotiation (#57/#58/#59)

- … `*InputStream`/`*OutputStream` types above, making close-responsibility explicit at the type level rather than via a separate bool. `Close()` methods are made idempotent.
- `stageJoiner` (Use a `stageJoiner` to join stages together #59): a new helper (`pipe/stage_joiner.go`) owns the creation of the pipe between each pair of adjacent stages.
- … `EPIPE`/`io.ErrClosedPipe` itself (in v1 this was often, but not reliably, hidden). This is documented in the new "Migrating to v2" section of the README, along with the `pipe.IgnoreError(stage, IsPipeError)` remedy and other migration patterns.

Phase 1 Details

Since Phase 1 wasn't a separate PR and was repurposed as this integration branch, its original description is below.
Perf optimizations

We want to minimize data copies, whether by letting the kernel move bytes instead of Go, or, if we do copy in Go, doing it more efficiently. With the pipeline owning the connections it can:

- … `*os.File` destination's fd directly into the child, letting the kernel (`sendfile(2)`/`splice(2)` and friends) do the copy (so-called "zero-copy" i/o);
- … `exec.Cmd` allocate a fresh one per pipeline.

#49 and #50 were aiming for similar optimizations, but now they fall out of the structure instead of being bolted onto `ioCopier`.

Panic Handling

go-pipe runs user code (function stages, the now-removed memory-limit event handlers) in goroutines it spawns itself, so a panic there used to be able to take down the whole process. There was already panic handling, but it was part of the Stage interface, which caused a lot of error-prone boilerplate in stages that don't do any panic handling (which is most of them). Panic handling is now a callback (`StageOptions.PanicHandler`, set via `WithStagePanicHandler`) that the pipeline threads to every stage's `Start`, so a single handler covers all stages. If `PanicHandler` is nil the panic propagates and crashes, intentionally.

Supersedes

- … `Stage` interface to make stdin/stdout handling more flexible #21 (stale, merge conflicts)
- … `Stage2` interface that allows such stages to be started more flexibly #20 (the opt-in `Stage2` variant)
- … `sync.Pool` for `ioCopier` copy buffers): pool preserved in `copy_pool.go`
- … `ioCopier`): sendfile preserved structurally; the `WriterTo` pool-bypass workaround is gone, since we control the copy site now

The `git-systems/pooled-copies` branch (which carried #49 + #50) can be deleted after this merges.

/cc @mhagger @migue @carlosmn
Copilot Summary

Interface change

`Stage.Start` now takes both stdin and stdout as wrapper types plus a `StageOptions` struct (a struct so future run-scoped options don't break the interface again).

Each stage declares its I/O needs through `Requirements() StageRequirements`, and `Pipeline.Start()` uses those to negotiate the pipe type between adjacent stages (via the `stageJoiner` helper):

- `os.Pipe()` when either neighbor wants a real `*os.File` fd (`StreamPreferFile`)
- `io.Pipe()` when both neighbors are in-process Go functions (cheaper, all userspace)
- `stdout` passed directly to the last stage, with no synthetic `ioCopier`

`StreamForbidden` lets a stage declare that a given stream must be `nil` (validated before any pipes are created). The module path is bumped to `github.com/github/go-pipe/v2`.

Stream ownership (replaces dynamic-type-based close inference)

`InputStream`/`OutputStream` (`pipe/streams.go`) carry the underlying `io.Reader`/`io.Writer` and who owns closing it:

- `Input(r)`/`Output(w)`: non-closing; the stage reads/writes but must not close.
- `ClosingInput(r)`/`ClosingOutput(w)`: the stage owns and closes the stream.

`Reader()`/`Writer()` expose the concrete underlying value (preserving `*os.File`/`io.WriterTo`/`io.ReaderFrom` identity for fast paths); `Close()` is idempotent and nil-safe.

This replaces the old `NopCloser`/`Unwrap{Reader,Writer}` forwarding machinery, which is deleted.

Fast paths preserved from #49/#50

`ioCopier` is deleted; the optimizations it carried now live in the stage structure:

- `commandStage` writing to an `*os.File` destination dup's the fd into the child; for a non-`*os.File` writer that implements `io.ReaderFrom`, the copy goes through `ReadFrom` (sendfile where the kernel supports it). Pinned in `pipe/command_stdout_fastpath_test.go`.
- … `*os.File`, non-`ReaderFrom` stdout, stdout setup builds an `os.Pipe()` and copies through a `sync.Pool` buffer rather than letting `exec.Cmd` allocate per-pipeline. See `pipe/copy_pool.go`.

Panic handling

`StageOptions.PanicHandler` (`StagePanicHandler = func(p any) error`) is set on the pipeline via `WithStagePanicHandler` and threaded to every stage's `Start`. The previous `StagePanicHandlerAware` opt-in interface is removed; wrapper stages just forward `opts`. … `Function` stage `StageFunc`s (run in a library-spawned goroutine). If `PanicHandler` is nil the panic propagates and crashes; this is intentional.

Removals

- `pipe/memorylimit.go` (the MemoryWatch stage) and all of `internal/ptree`: replaced by an external-implementation hook (a stage exposing its `*os.Process`, added in add/remove stages: remove MemoryWatch and add WithExtraEnv #56).
- `pipe/iocopier.go`, `pipe/nop_closer.go`, `pipe/panic.go`: superseded by the stream/joiner structure and the threaded panic handler.

Additions

- `WithExtraEnv(inner Stage, env []EnvVar)` (`pipe/env_stage.go`): per-stage environment override, complementing pipeline-wide `StageOptions.Env`.
- `pipe/stage_joiner.go`: owns inter-stage pipe creation and requirement validation.
- … `IgnoreError(stage, IsPipeError)` remedy.

Commit structure

- … `version-2` (authorship preserved): linter shush, `pipeline_test.go` cleanup, pipeline benchmarks, NopCloser simplification, the Stage interface change, pipe-matching tests.
- … `MemoryLimitWithObserver`, restore the Function-stage panic handler, fix `memoryWatchStage.Wait()` to always `stopWatching()`, lint.
- … `stageJoiner` to join stages together #59): close-responsibility decoupling, MemoryWatch consolidation then removal, `WithExtraEnv`, stream types, `stageJoiner`, early-close regression tests, and the panic-handler threading.

Footnotes

1. almost