Skip to content

fix: enforce disableBufferFullDiscard when qos>=1#4024

Open
pulkitvats2007-crypto wants to merge 1 commit intolf-edge:masterfrom
pulkitvats2007-crypto:fix-qos-buffer-full-discard
Open

fix: enforce disableBufferFullDiscard when qos>=1#4024
pulkitvats2007-crypto wants to merge 1 commit intolf-edge:masterfrom
pulkitvats2007-crypto:fix-qos-buffer-full-discard

Conversation

@pulkitvats2007-crypto
Copy link
Copy Markdown

Description

This PR fixes a critical data loss issue in the streaming pipeline where messages could be silently dropped under backpressure, even when QoS was set to At-Least-Once (1) or Exactly-Once (2).

Previously, when output channels became full, the system would drop the oldest messages by default (disableBufferFullDiscard == false). This behavior was not overridden for higher QoS levels, breaking delivery guarantees and potentially causing permanent data loss.

Fix

Updated SetQos in internal/topo/node/node.go to enforce safe backpressure handling:

func (o *defaultNode) SetQos(qos def.Qos) {
    o.qos = qos
    if qos >= def.AtLeastOnce {
        o.disableBufferFullDiscard = true
    }
}

This ensures that for QoS ≥ 1, the system blocks instead of dropping messages, preserving data integrity.

Testing

• Added TestSetQos to verify correct behavior of disableBufferFullDiscard for different QoS levels.
• All new tests pass successfully.
• Existing unrelated test failures were observed but are pre-existing.

Bug Summary

• Issue: Silent data loss due to buffer overflow handling ignoring QoS guarantees
• Root Cause: Lossy default backpressure strategy not overridden for QoS ≥ 1
• Affected Area: doBroadcast and QoS configuration logic

Impact

• Restores correct At-Least-Once and Exactly-Once semantics
• Prevents silent data loss under high throughput or slow sinks
• Ensures checkpoint integrity and accurate offset commits

Fixes #4023

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR aims to prevent silent data loss under congestion by ensuring that for QoS ≥ 1 (at-least-once / exactly-once) the pipeline uses backpressure (blocking) instead of dropping buffered messages.

Changes:

  • Changed RuleOption.DisableBufferFullDiscard from bool to *bool to distinguish “unset” from “explicitly false”.
  • Defaulted DisableBufferFullDiscard to true during rule option validation when Qos >= AtLeastOnce, and added a warning for inconsistent configuration.
  • Updated planner/node/test code to handle the pointer semantics (nil vs true/false).

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
internal/topo/topotest/rule_test.go Updates test options to pass *bool for DisableBufferFullDiscard.
internal/topo/planner/planner_test.go Updates planner test to use *bool for DisableBufferFullDiscard.
internal/topo/planner/planner.go Treats DisableBufferFullDiscard == nil as disabled; avoids nil deref.
internal/topo/node/node.go Initializes node discard behavior from *bool rule option.
internal/pkg/def/rule.go Changes DisableBufferFullDiscard type to *bool in RuleOption.
internal/conf/conf.go Enforces default disableBufferFullDiscard=true for QoS≥1 when unset; warns on explicit false.
Comments suppressed due to low confidence (1)

internal/topo/planner/planner.go:515

  • With DisableBufferFullDiscard now being auto-defaulted for QoS>=1 (ValidateRuleOption), this error can be triggered even when the user did not explicitly enable disableBufferFullDiscard. The message "disableBufferFullDiscard can't be enabled..." may be misleading in that case; consider making the error mention that QoS>=1 requires backpressure/no-discard and that shared streams are therefore incompatible (or otherwise clarify the root cause).
	if opt.DisableBufferFullDiscard == nil || !*opt.DisableBufferFullDiscard {
		return nil
	}
	for _, stream := range streams {
		if stream.stmt.Options.SHARED {
			return fmt.Errorf("disableBufferFullDiscard can't be enabled with shared stream %v", stream.stmt.Name)
		}

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread internal/topo/node/node.go
Comment thread internal/conf/conf.go
Comment thread internal/conf/conf.go
Comment thread internal/topo/topotest/rule_test.go Outdated
Comment thread internal/topo/planner/planner_test.go
Comment thread internal/topo/topotest/rule_test.go Outdated
Comment thread internal/topo/topotest/rule_test.go Outdated
@pulkitvats2007-crypto pulkitvats2007-crypto force-pushed the fix-qos-buffer-full-discard branch from 00937b8 to 5cb7238 Compare April 17, 2026 11:19
…tage

Signed-off-by: pulkitvats2007-crypto <pulkitvats2007@gmail.com>
@pulkitvats2007-crypto pulkitvats2007-crypto force-pushed the fix-qos-buffer-full-discard branch from 5cb7238 to 040b5ee Compare April 17, 2026 11:21
@ngjaying ngjaying changed the title fix: prevent silent data loss by enforcing backpressure for QoS >= 1 fix: enforce disableBufferFullDiscard when qos>=1 Apr 30, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Silent Data Loss and Broken QoS Guarantees Due To Default Buffer-Full Discard

2 participants