
[BUG] The Default "dlq_pipeline" no longer receives failed events from OpenSearch sink #6643

@tillwulfram

Description


Describe the bug
I may be misunderstanding the intended design here, so please correct me if so and close the issue.
Before versions 2.14.0 / 2.14.1 (i.e., on 2.13.1), we were using the default dlq_pipeline feature as a separate pipeline to which all failed events (OpenSearch mapping errors, exhausted retries, etc.) were routed automatically, without any extra configuration. For example:

dlq_pipeline:
  sink:
    - stdout:

I saw the implementation from #6349, which is a great feature that we will use in the near future. Thanks for that!
As far as I understand, the current implementation does not distinguish between failed and successful events. Being able to do so would be a great addition, e.g. via an include option or, even better, a conditional expression (which would fit the current processor logic):

....
forward_to:
  pipelines:
    - dlq_pipeline
  with_data:
    source: "test-data"
  with_metadata:
    retry_count: 3
  # additionally
  include_events: all / failed / successful
  # or, for more flexibility, based on an expression
  forward_when: "/_failure_metadata != null"

Because currently it forwards every event, I would need an additional drop condition in the forwarding DLQ pipeline, like the one below, which adds overhead and a performance drawback:

....
sink:
  - opensearch:
      hosts: ["https://localhost:9200"]
      insecure: true
      index_type: custom
      index: fresh-new-index
      username: admin
      password: admin
      max_retries: 10
      bulk_size: 5
      forward_to:
        pipelines:
          - dlq_pipeline

dlq_pipeline:
  processor:
    - drop_events:
        drop_when: "/_failure_metadata == null"
  sink:
    - stdout:

Currently, in OpenSearchSink.java, useEventInBulkOperation is set only once, at construction:

this.useEventInBulkOperation = (getFailurePipeline() != null || sinkContext.getForwardToPipelines().size() > 0);

So when no forward_to is configured, getFailurePipeline() is always null, useEventInBulkOperation ends up false, and no DLQ objects are forwarded: BulkOperationWrapper is built without the Event object and failures are silently skipped.
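The flag-freezing problem can be illustrated with a small self-contained sketch. This is not Data Prepper's actual code; the class and method names here are hypothetical, and it only demonstrates why caching the flag at construction differs from evaluating it per call:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Data Prepper's OpenSearchSink.
class SinkSketch {
    private final List<String> forwardToPipelines = new ArrayList<>();

    // As in 2.14.x: the flag is computed once, at construction,
    // before any forwarding pipeline could be known.
    private final boolean useEventFrozen;

    SinkSketch() {
        this.useEventFrozen = !forwardToPipelines.isEmpty();
    }

    void addForwardPipeline(String name) {
        forwardToPipelines.add(name);
    }

    // Proposed alternative: evaluate the condition on every call
    // so later-registered pipelines are taken into account.
    boolean useEventPerCall() {
        return !forwardToPipelines.isEmpty();
    }

    boolean useEventFrozen() {
        return useEventFrozen;
    }
}
```

With this sketch, a pipeline added after construction is visible to the per-call check but not to the frozen flag, which mirrors the behavior described above.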

To Reproduce
Use version 2.14+ to see the current "bug" (and 2.13 for the expected behavior). Set up the default dlq_pipeline with a stdout sink, any source, and any processor; send events that intentionally cause mapping errors / timeouts etc. to the opensearch sink, and check whether, besides the Data Prepper log entry from BulkRetryStrategy, any DLQ objects exist.
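A minimal sketch of this repro setup could look roughly as follows. This is an assumption-laden example only: the source, hosts, credentials, and index names are placeholders (the sink options are taken from the workaround example above):

```yaml
# Sketch of a minimal repro pipeline; all values are placeholders.
repro-pipeline:
  source:
    http:
  sink:
    - opensearch:
        hosts: ["https://localhost:9200"]
        insecure: true
        index_type: custom
        index: repro-index
        username: admin
        password: admin

# Pre-2.14 behavior: failed events were expected to arrive here
# without any explicit forward_to configuration.
dlq_pipeline:
  sink:
    - stdout:
```

On 2.13.1, sending an event that fails indexing (e.g. a mapping conflict) should produce output on the dlq_pipeline's stdout sink; on 2.14.x it does not.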

Expected behavior
Use conditional forwards in the pipeline (which would be a great feature either way), and for the pre-existing behavior, perhaps always set useEventInBulkOperation = true (if nothing else depends on it) or evaluate it per call.


Environment (please complete the following information):

  • Docker, Kubernetes
  • Version 2.14.0, 2.14.1

