Otel logs source http service#6250

Merged
KarstenSchnitter merged 23 commits into opensearch-project:main from sternadsoftware:otel-logs-source-http-service
Feb 20, 2026
@TomasLongo
Contributor

Description

This PR adds an HTTP service to the OtelLogsSource. It runs alongside the gRPC service under a separate endpoint.

Both services share the same underlying Armeria server and therefore run on the same port.

Breakdown

The PR is fairly large. The OtelLogsSource had to be restructured, which in turn required refactoring the existing tests.

A quick breakdown of the changes should help with the review.

One Server, two services

OtelLogsSource now hosts two services running under different paths behind the same port.
The paths of the services can be configured with the two fields path and http_path:

  • path is the existing field for configuring the path of the gRPC service and keeps its purpose
  • http_path sets the path of the new HTTP service
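
As a rough illustration of the "one server, two services" layout, the sketch below binds two handlers to different paths on a single port. It deliberately uses the JDK's built-in `com.sun.net.httpserver.HttpServer` as a stand-in, not Armeria, and the class name, the example paths and the response bodies are hypothetical. The actual source registers a gRPC service and an HTTP service on one Armeria server.

```java
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

// Stand-in sketch: JDK HttpServer instead of Armeria, hypothetical names throughout.
final class TwoServicesOnePort {

    // Starts one server on one port and registers one handler per configured path.
    // Passing port 0 lets the operating system pick a free port.
    static HttpServer start(final int port, final String path, final String httpPath) throws IOException {
        final HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", port), 0);
        server.createContext(path, exchange -> respond(exchange, "grpc-service"));
        server.createContext(httpPath, exchange -> respond(exchange, "http-service"));
        server.start();
        return server;
    }

    // Writes a plain-text 200 response identifying which handler answered.
    private static void respond(final HttpExchange exchange, final String body) throws IOException {
        final byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        exchange.sendResponseHeaders(200, bytes.length);
        try (OutputStream out = exchange.getResponseBody()) {
            out.write(bytes);
        }
    }
}
```

Calling `TwoServicesOnePort.start(0, "/logs/grpc", "/logs/http")` yields a single listening port on which each path is answered by its own handler, which mirrors how path and http_path select between the two services.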

Unframed Requests

Since unframed requests served the purpose of sending plain HTTP requests to the gRPC server, this functionality has been dropped. However, for backwards compatibility of the config, the field is still present in the config data structure.

CreateServer

The class CreateServer deals with creating both HTTP and gRPC servers. However, in its current form, the intended use is to create a server with a single service. Two quick attempts to rewrite the class for the purpose of this PR yielded no results.

So the final version of the server creation loosely resembles the functionality found in CreateServer.

Refactoring of tests

Many tests were making sure that the former gRPC server, with unframed requests enabled, would handle HTTP requests correctly. I have dropped these tests completely, replacing them with a separate test class that covers the HTTP functionality. I added a few improvements along the way:

  • Getting rid of unnecessary mocks
  • Introducing parameterized tests
  • Using unified test data and fixtures

Decisions made and additional considerations

Getting completely rid of unframed requests

enable_unframed_requests no longer has any effect, since its purpose is now served by the new HTTP service.

Configuration

A new field, http_path, has been added to specify the path of the HTTP service. However, one could extend the config to

  • allow the user to enable only one of the two services by omitting one of the two paths. Currently, both services are always instantiated.

CreateServer

I think that the intention of CreateServer, to provide a unified mechanism to bootstrap Armeria servers, is absolutely the right thing to do. However, in its current form, it only supports creating servers with a single service.

It's worth thinking about how to refactor CreateServer to streamline server creation for future plugins. The challenge I encountered was how best to model the fact that different services might need different server capabilities or configurations.

Figuring this out would have gone well beyond the scope of this PR.

Issues Resolved

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@TomasLongo
Contributor Author

@KarstenSchnitter

Collaborator

@KarstenSchnitter left a comment

Just a start of a review. Can you check the tests for unused imports? They are failing the CI builds. I need more time for the review, so please expect more comments to come.

// no path provided. Will be set by config.
@Post("")
@Consumes(value = "application/json")
public ExportTraceServiceResponse exportLog(ExportLogsServiceRequest request) {
Collaborator

Why is this returning ExportTraceServiceResponse? Shouldn't this return an ExportLogsServiceResponse?

Contributor Author

@TomasLongo Nov 10, 2025

Right. I copied the HTTP service from the trace source, and this is a remnant of that. Fix is pushed.

@TomasLongo TomasLongo force-pushed the otel-logs-source-http-service branch 3 times, most recently from 1dd14dd to 965b93d Compare November 14, 2025 07:18
Collaborator

@KarstenSchnitter left a comment

I already have some comments. I still need to review the tests.

try {
logs = oTelProtoDecoder.parseExportLogsServiceRequest(request, Instant.now());
} catch (Exception e) {
LOG.error("Failed to parse the request {} due to:", request, e);
Collaborator

Please use the warning level and the sensitive request marker.

Suggested change
- LOG.error("Failed to parse the request {} due to:", request, e);
+ LOG.warn(DataPrepperMarkers.SENSITIVE, "Failed to parse request with error '{}'. Request body: {}.", e.getMessage(), request);

Contributor Author

done.

Is this a general rule, i.e. use the WARN level with the SENSITIVE marker for errors?

Is there any documentation on how logging should be done in Data Prepper? I could not find any in the README.

throw new BadRequestException(e.getMessage(), e);
}

final List<Record<Object>> records = logs.stream().map(log -> new Record<Object>(log)).collect(Collectors.toList());
Collaborator

These records are only required in the else block of the following condition. Please move this operation to this block.

Contributor Author

done


if (oTelLogsSourceConfig.hasHealthCheck()) {
LOG.info("HTTP source health check is enabled");
serverBuilder.service("/health", HealthCheckService.builder().longPolling(0).build());
Collaborator

Please extract the String to a constant.

Contributor Author

done

serverBuilder.maxRequestLength(oTelLogsSourceConfig.getMaxRequestLength().getBytes());
}
final int threadCount = oTelLogsSourceConfig.getThreadCount();
serverBuilder.blockingTaskExecutor(new ScheduledThreadPoolExecutor(threadCount), true);
Collaborator

Please use the same logic as in CreateServer to configure the task executor.

Contributor Author

done.

@TomasLongo TomasLongo force-pushed the otel-logs-source-http-service branch from e05da93 to 656475c Compare November 21, 2025 08:27
Collaborator

@KarstenSchnitter left a comment

This looks good so far. I would like to see an end-to-end test with OTLP/HTTP if possible. Can you add it? Please promote this PR to ready for review.

@TomasLongo please also fix the unused imports in the tests that lead to the failing Gradle builds.

@TomasLongo
Contributor Author

Yes. I'll add an E2E Test.

@TomasLongo TomasLongo force-pushed the otel-logs-source-http-service branch from f6c5b40 to 5f2aeb3 Compare November 26, 2025 07:12
@TomasLongo TomasLongo marked this pull request as ready for review November 30, 2025 21:52
Collaborator

@KarstenSchnitter left a comment

Thanks for getting this PR ready. I like the end-to-end test. Can you please add another test case that uses a protobuf message body instead of JSON?

.authority(String.format("127.0.0.1:%d", port))
.method(HttpMethod.POST)
.path("/otel-logs-pipeline/logs")
.contentType(MediaType.JSON_UTF_8)
Collaborator

This sends a JSON message. What about protobuf?

)
).build();

return HttpData.copyOf(JsonFormat.printer().print(exportLogsServiceRequest).getBytes());
Collaborator

This creates just the JSON representation. I think we also need the protobuf version.

private final ApacheLogFaker apacheLogFaker = new ApacheLogFaker();

@Test
public void testOtelLogsSourcePipelineEndToEnd() throws InvalidProtocolBufferException {
Collaborator

Can you please add another test case for HTTP/protobuf format?

@KarstenSchnitter
Collaborator

Please fix the unused imports warning.

@KarstenSchnitter
Collaborator

@TomasLongo I think I am missing something here. I can see a test using HTTP/JSON and one for GRPC protocol. But I am missing one using HTTP/protobuf.

I would expect a different content type at https://github.com/sternadsoftware/data-prepper-second/blob/e9ebc47cba8877c99d4ec5d89cdd115ba4af7d59/e2e-test/log/src/integrationTest/java/org/opensearch/dataprepper/integration/log/EndToEndOtelLogsSourceTest.java#L110 accompanied with a different payload. The payload can be generated calling exportLogsServiceRequest.toByteArray() at https://github.com/sternadsoftware/data-prepper-second/blob/e9ebc47cba8877c99d4ec5d89cdd115ba4af7d59/e2e-test/log/src/integrationTest/java/org/opensearch/dataprepper/integration/log/EndToEndOtelLogsSourceTest.java#L78.

Can you add this as a second test case?

@TomasLongo
Contributor Author

Nope. All good. I simply was not getting it.

I've added a unit test for enabled unframed requests. Will add an E2E-Test today.

Signed-off-by: Tomas Longo <tlongo@sternad.de>
@TomasLongo TomasLongo force-pushed the otel-logs-source-http-service branch from 148ee32 to 7269fe9 Compare January 7, 2026 10:25
Collaborator

@KarstenSchnitter left a comment

Please fix the license header issues. I think this is a rather new check. Also have a look at the media type in the integration test.

.authority(String.format("127.0.0.1:%d", SOURCE_PORT))
.method(HttpMethod.POST)
.path(path)
.contentType(MediaType.JSON_UTF_8)
Collaborator

Shouldn't this media type depend on the payload? There is also protobuf data being ingested.
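
One way to keep the content type in step with the payload is to carry both decisions in one value. The sketch below is an illustration under assumptions, not the test's actual code: `OtlpHttpEncoding` and `forJsonBody` are hypothetical names. It relies only on the OTLP/HTTP convention that binary protobuf bodies are sent as `application/x-protobuf` and JSON bodies as `application/json`.

```java
// Hypothetical helper: couples each OTLP/HTTP payload encoding with its Content-Type header value.
enum OtlpHttpEncoding {
    JSON("application/json"),
    PROTOBUF("application/x-protobuf");

    private final String contentType;

    OtlpHttpEncoding(final String contentType) {
        this.contentType = contentType;
    }

    // Header value to send along with a body in this encoding.
    String contentType() {
        return contentType;
    }

    // Picks the encoding from what the test already knows about the body, for example
    // whether it was produced by JsonFormat.printer() (JSON) or by toByteArray() (protobuf).
    static OtlpHttpEncoding forJsonBody(final boolean isJson) {
        return isJson ? JSON : PROTOBUF;
    }
}
```

A parameterized test could then take the encoding as an argument and set `encoding.contentType()` on the request instead of hard-coding the JSON media type.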

Signed-off-by: Tomas Longo <tlongo@sternad.de>
Collaborator

@KarstenSchnitter left a comment

I tried to address the license header checks with comments. Please review and merge. You can find the explanation at https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md#license-headers.

Signed-off-by: Tomas Longo <tlongo@sternad.de>
Collaborator

@KarstenSchnitter left a comment

Thanks for providing the PR and implementing all the changes for my comments. @dlvenable and @kkondaka this is good to go from our side. Do you still have change requests or can we merge this PR?

Collaborator

@KarstenSchnitter left a comment

@TomasLongo #6494 shows a missing configuration in the last change. This needs to be applied here, too.


public ArmeriaHttpService(Buffer<Record<Object>> buffer, final PluginMetrics pluginMetrics, final int bufferWriteTimeoutInMillis) {
this.buffer = buffer;
this.oTelProtoDecoder = new OTelProtoOpensearchCodec.OTelProtoDecoder();
Collaborator

This needs to respect the output format. See #6494 .


Signed-off-by: Tomas Longo <tlongo@sternad.de>
Collaborator

@KarstenSchnitter left a comment

I think we should follow @dlvenable's idea and centralize the decision about which decoder to use in the OTelLogsSource.

final OTelOutputFormat otelOutputFormat
) {
this.buffer = buffer;
this.oTelProtoDecoder = (otelOutputFormat == OTelOutputFormat.OPENSEARCH) ? new OTelProtoOpensearchCodec.OTelProtoDecoder() : new OTelProtoStandardCodec.OTelProtoDecoder();
Collaborator

In #6494 (comment), @dlvenable suggested for the OtelTraceSource that the oTelProtoDecoder be injected directly, so that the decision is made in a single place. I think this applies here as well. Let the OTelLogsSource make the decision and provide the decoder directly.

.exceptionHandler(createGrpExceptionHandler(oTelLogsSourceConfig));
final OTelLogsGrpcService oTelLogsGrpcService = new OTelLogsGrpcService(
(int) (oTelLogsSourceConfig.getRequestTimeoutInMillis() * 0.8),
oTelLogsSourceConfig.getOutputFormat() == OTelOutputFormat.OPENSEARCH ? new OTelProtoOpensearchCodec.OTelProtoDecoder() : new OTelProtoStandardCodec.OTelProtoDecoder(),
Collaborator

Extract this logic into a private method and use the same method in line 183.
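
The extraction requested here might look roughly like the sketch below. All types are simplified stand-ins defined inline (the real decoders are `OTelProtoOpensearchCodec.OTelProtoDecoder` and `OTelProtoStandardCodec.OTelProtoDecoder`), and `LogsSourceSketch`, `createDecoder` and the two service classes are hypothetical names. The point is only that the output-format decision lives in one private method and both services receive the resulting decoder through their constructors.

```java
// Stand-ins for OTelOutputFormat and the two decoder implementations (hypothetical names).
enum OutputFormat { OPENSEARCH, OTEL }

interface LogsDecoder { }

final class OpensearchDecoder implements LogsDecoder { }

final class StandardDecoder implements LogsDecoder { }

// Both services take the decoder as a constructor argument instead of choosing one themselves.
final class GrpcServiceSketch {
    final LogsDecoder decoder;

    GrpcServiceSketch(final LogsDecoder decoder) {
        this.decoder = decoder;
    }
}

final class HttpServiceSketch {
    final LogsDecoder decoder;

    HttpServiceSketch(final LogsDecoder decoder) {
        this.decoder = decoder;
    }
}

final class LogsSourceSketch {
    private final OutputFormat outputFormat;

    LogsSourceSketch(final OutputFormat outputFormat) {
        this.outputFormat = outputFormat;
    }

    // Single decision point: the only place that maps the output format to a decoder.
    private LogsDecoder createDecoder() {
        return outputFormat == OutputFormat.OPENSEARCH ? new OpensearchDecoder() : new StandardDecoder();
    }

    GrpcServiceSketch createGrpcService() {
        return new GrpcServiceSketch(createDecoder());
    }

    HttpServiceSketch createHttpService() {
        return new HttpServiceSketch(createDecoder());
    }
}
```

With this shape, adding a further output format would touch only `createDecoder`, and neither service needs to know the format exists.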

LOG.info("Configuring HTTP service under {} ", path);


final ArmeriaHttpService armeriaHttpService = new ArmeriaHttpService(buffer, pluginMetrics, 100, oTelLogsSourceConfig.getOutputFormat());
Collaborator

Create the decoder directly, as in line 213, and pass it to the ArmeriaHttpService.

@KarstenSchnitter
Collaborator

@TomasLongo can you make the suggested change for the ProtoDecoder? Afterwards, I really think this is good to go.

@TomasLongo
Contributor Author

@KarstenSchnitter Sorry for the delay. Your comments were buried in my mailbox. I will implement your suggestion.

Signed-off-by: Tomas Longo <tlongo@sternad.de>
Collaborator

@KarstenSchnitter left a comment

This looks good to me. Thanks for the PR.

@KarstenSchnitter
Collaborator

@dlvenable any reservations or can we proceed and merge this PR?

@KarstenSchnitter KarstenSchnitter merged commit 8d85609 into opensearch-project:main Feb 20, 2026
71 of 72 checks passed