fix: enforce disableBufferFullDiscard when qos>=1#4024
Open
pulkitvats2007-crypto wants to merge 1 commit intolf-edge:masterfrom
Open
fix: enforce disableBufferFullDiscard when qos>=1#4024pulkitvats2007-crypto wants to merge 1 commit intolf-edge:masterfrom
pulkitvats2007-crypto wants to merge 1 commit intolf-edge:masterfrom
Conversation
There was a problem hiding this comment.
Pull request overview
This PR aims to prevent silent data loss under congestion by ensuring that for QoS ≥ 1 (at-least-once / exactly-once) the pipeline uses backpressure (blocking) instead of dropping buffered messages.
Changes:
- Changed
RuleOption.DisableBufferFullDiscardfromboolto*boolto distinguish “unset” from “explicitly false”. - Defaulted
DisableBufferFullDiscardtotrueduring rule option validation whenQos >= AtLeastOnce, and added a warning for inconsistent configuration. - Updated planner/node/test code to handle the pointer semantics (
nilvstrue/false).
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| internal/topo/topotest/rule_test.go | Updates test options to pass *bool for DisableBufferFullDiscard. |
| internal/topo/planner/planner_test.go | Updates planner test to use *bool for DisableBufferFullDiscard. |
| internal/topo/planner/planner.go | Treats DisableBufferFullDiscard == nil as disabled; avoids nil deref. |
| internal/topo/node/node.go | Initializes node discard behavior from *bool rule option. |
| internal/pkg/def/rule.go | Changes DisableBufferFullDiscard type to *bool in RuleOption. |
| internal/conf/conf.go | Enforces default disableBufferFullDiscard=true for QoS≥1 when unset; warns on explicit false. |
Comments suppressed due to low confidence (1)
internal/topo/planner/planner.go:515
- With DisableBufferFullDiscard now being auto-defaulted for QoS>=1 (ValidateRuleOption), this error can be triggered even when the user did not explicitly enable disableBufferFullDiscard. The message "disableBufferFullDiscard can't be enabled..." may be misleading in that case; consider making the error mention that QoS>=1 requires backpressure/no-discard and that shared streams are therefore incompatible (or otherwise clarify the root cause).
if opt.DisableBufferFullDiscard == nil || !*opt.DisableBufferFullDiscard {
return nil
}
for _, stream := range streams {
if stream.stmt.Options.SHARED {
return fmt.Errorf("disableBufferFullDiscard can't be enabled with shared stream %v", stream.stmt.Name)
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
00937b8 to
5cb7238
Compare
…tage Signed-off-by: pulkitvats2007-crypto <pulkitvats2007@gmail.com>
5cb7238 to
040b5ee
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Description
This PR fixes a critical data loss issue in the streaming pipeline where messages could be silently dropped under backpressure, even when QoS was set to At-Least-Once (1) or Exactly-Once (2).
Previously, when output channels became full, the system would drop the oldest messages by default (
disableBufferFullDiscard == false). This behavior was not overridden for higher QoS levels, breaking delivery guarantees and potentially causing permanent data loss.Fix
Updated
SetQosininternal/topo/node/node.goto enforce safe backpressure handling:This ensures that for QoS ≥ 1, the system blocks instead of dropping messages, preserving data integrity.
Testing
• Added TestSetQos to verify correct behavior of disableBufferFullDiscard for different QoS levels.
• All new tests pass successfully.
• Existing unrelated test failures were observed but are pre-existing.
Bug Summary
• Issue: Silent data loss due to buffer overflow handling ignoring QoS guarantees
• Root Cause: Lossy default backpressure strategy not overridden for QoS ≥ 1
• Affected Area: doBroadcast and QoS configuration logic
Impact
• Restores correct At-Least-Once and Exactly-Once semantics
• Prevents silent data loss under high throughput or slow sinks
• Ensures checkpoint integrity and accurate offset commits
Fixes #4023