Otel logs source http service#6250
Otel logs source http service#6250KarstenSchnitter merged 23 commits intoopensearch-project:mainfrom
Conversation
KarstenSchnitter
left a comment
There was a problem hiding this comment.
Just a start of a review. Can you check the tests for unused imports. This is failing the CI builds. I need more time for the review. Please expect more comments to come.
| // no path provided. Will be set by config. | ||
| @Post("") | ||
| @Consumes(value = "application/json") | ||
| public ExportTraceServiceResponse exportLog(ExportLogsServiceRequest request) { |
There was a problem hiding this comment.
Why is this returning ExportTraceServiceResponse? Shouldn't this return a ExportLogServiceResponse?
There was a problem hiding this comment.
Right. I copied the Http-Service from the Trace-Source. This is the remnant from it. Fix is pushed.
1dd14dd to
965b93d
Compare
KarstenSchnitter
left a comment
There was a problem hiding this comment.
I already have some comments. I still need to review the tests.
| try { | ||
| logs = oTelProtoDecoder.parseExportLogsServiceRequest(request, Instant.now()); | ||
| } catch (Exception e) { | ||
| LOG.error("Failed to parse the request {} due to:", request, e); |
There was a problem hiding this comment.
Please use warning level and sensitive request marker.
| LOG.error("Failed to parse the request {} due to:", request, e); | |
| LOG.warn(DataPrepperMarkers.SENSITIVE, "Failed to parse request with error '{}'. Request body: {}.", e.getMessage(), request); |
There was a problem hiding this comment.
done.
Is this a general rule? Use WARN-level with SENSITIVE-Marker for errors?
Is there any documentation on how to logging should be done in DataPrepper? Could not find any in the README
| throw new BadRequestException(e.getMessage(), e); | ||
| } | ||
|
|
||
| final List<Record<Object>> records = logs.stream().map(log -> new Record<Object>(log)).collect(Collectors.toList()); |
There was a problem hiding this comment.
These records are only required in the else block of the following condition. Please move this operation to this block.
...-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java
Show resolved
Hide resolved
|
|
||
| if (oTelLogsSourceConfig.hasHealthCheck()) { | ||
| LOG.info("HTTP source health check is enabled"); | ||
| serverBuilder.service("/health", HealthCheckService.builder().longPolling(0).build()); |
There was a problem hiding this comment.
Please extract the String to a constant.
| serverBuilder.maxRequestLength(oTelLogsSourceConfig.getMaxRequestLength().getBytes()); | ||
| } | ||
| final int threadCount = oTelLogsSourceConfig.getThreadCount(); | ||
| serverBuilder.blockingTaskExecutor(new ScheduledThreadPoolExecutor(threadCount), true); |
There was a problem hiding this comment.
Please use the same logic as in CreateServer to configure the task executor.
e05da93 to
656475c
Compare
There was a problem hiding this comment.
This looks good so far. I would like to see an end-to-end test with OTLP/HTTP if possible. Can you add it? Please promote this PR to ready for review.
@TomasLongo please also fix the unused imports in the tests that lead to the failing Gradle builds.
|
Yes. I'll add an E2E Test. |
f6c5b40 to
5f2aeb3
Compare
KarstenSchnitter
left a comment
There was a problem hiding this comment.
Thanks for getting this PR ready. I like the end-to-end test. Can you please add another test case using protobuf message body and not JSON?
| .authority(String.format("127.0.0.1:%d", port)) | ||
| .method(HttpMethod.POST) | ||
| .path("/otel-logs-pipeline/logs") | ||
| .contentType(MediaType.JSON_UTF_8) |
There was a problem hiding this comment.
This sends a JSON message. What about protobuf?
| ) | ||
| ).build(); | ||
|
|
||
| return HttpData.copyOf(JsonFormat.printer().print(exportLogsServiceRequest).getBytes()); |
There was a problem hiding this comment.
This creates just the JSON representation. I think we also need the protobuf version.
| private final ApacheLogFaker apacheLogFaker = new ApacheLogFaker(); | ||
|
|
||
| @Test | ||
| public void testOtelLogsSourcePipelineEndToEnd() throws InvalidProtocolBufferException { |
There was a problem hiding this comment.
Can you please add another test case for HTTP/protobuf format?
|
please fix the unused imports warning. |
|
@TomasLongo I think I am missing something here. I can see a test using HTTP/JSON and one for GRPC protocol. But I am missing one using HTTP/protobuf. I would expect a different content type at https://github.com/sternadsoftware/data-prepper-second/blob/e9ebc47cba8877c99d4ec5d89cdd115ba4af7d59/e2e-test/log/src/integrationTest/java/org/opensearch/dataprepper/integration/log/EndToEndOtelLogsSourceTest.java#L110 accompanied with a different payload. The payload can be generated calling Can you add this as a second test case? |
|
Nope. All good. I simply was not getting it. I've added a unit test for enabled unframed requests. Will add an E2E-Test today. |
Signed-off-by: Tomas Longo <tlongo@sternad.de>
Signed-off-by: Tomas Longo <tlongo@sternad.de>
Signed-off-by: Tomas Longo <tlongo@sternad.de>
Signed-off-by: Tomas Longo <tlongo@sternad.de>
Signed-off-by: Tomas Longo <tlongo@sternad.de>
Signed-off-by: Tomas Longo <tlongo@sternad.de>
Signed-off-by: Tomas Longo <tlongo@sternad.de>
Signed-off-by: Tomas Longo <tlongo@sternad.de>
Signed-off-by: Tomas Longo <tlongo@sternad.de>
Signed-off-by: Tomas Longo <tlongo@sternad.de>
Signed-off-by: Tomas Longo <tlongo@sternad.de>
Signed-off-by: Tomas Longo <tlongo@sternad.de>
148ee32 to
7269fe9
Compare
KarstenSchnitter
left a comment
There was a problem hiding this comment.
Please fix the license header issues. I think this is a rather new check. Also have a look at the media type in the integration test.
| .authority(String.format("127.0.0.1:%d", SOURCE_PORT)) | ||
| .method(HttpMethod.POST) | ||
| .path(path) | ||
| .contentType(MediaType.JSON_UTF_8) |
There was a problem hiding this comment.
Shouldn't this media type depend on the payload? There is also protobuf data being ingested.
Signed-off-by: Tomas Longo <tlongo@sternad.de>
...egrationTest/java/org/opensearch/dataprepper/integration/log/EndToEndOtelLogsSourceTest.java
Show resolved
Hide resolved
KarstenSchnitter
left a comment
There was a problem hiding this comment.
I tried to address the license header checks with comments. Please review and merge. You can find the explanation at https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md#license-headers.
...rc/main/java/org/opensearch/dataprepper/plugins/source/otellogs/http/ArmeriaHttpService.java
Show resolved
Hide resolved
.../main/java/org/opensearch/dataprepper/plugins/source/otellogs/http/HttpExceptionHandler.java
Show resolved
Hide resolved
...rg/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceGrpcServiceRequestsTest.java
Show resolved
Hide resolved
...src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceHttpTest.java
Show resolved
Hide resolved
...va/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceUnframedRequestTest.java
Show resolved
Hide resolved
...tionTest/java/org/opensearch/dataprepper/integration/log/EndToEndOtelLogsSourceGrpcTest.java
Show resolved
Hide resolved
...egrationTest/java/org/opensearch/dataprepper/integration/log/EndToEndOtelLogsSourceTest.java
Show resolved
Hide resolved
e2e-test/log/src/integrationTest/resources/otel-logs-source-pipeline.yml
Show resolved
Hide resolved
e2e-test/log/src/integrationTest/resources/otel-logs-source-pipeline.yml
Show resolved
Hide resolved
e2e-test/log/src/integrationTest/resources/otel-logs-source-unframed-requests-pipeline.yml
Show resolved
Hide resolved
Signed-off-by: Tomas Longo <tlongo@sternad.de>
KarstenSchnitter
left a comment
There was a problem hiding this comment.
Thanks for providing the PR and implementing all the changes for my comments. @dlvenable and @kkondaka this is good to go from our side. Do you still have change requests or can we merge this PR?
KarstenSchnitter
left a comment
There was a problem hiding this comment.
@TomasLongo #6494 shows a missing configuration in the last change. This needs to be applied here, too.
|
|
||
| public ArmeriaHttpService(Buffer<Record<Object>> buffer, final PluginMetrics pluginMetrics, final int bufferWriteTimeoutInMillis) { | ||
| this.buffer = buffer; | ||
| this.oTelProtoDecoder = new OTelProtoOpensearchCodec.OTelProtoDecoder(); |
There was a problem hiding this comment.
This needs to respect the output format. See #6494 .
|
|
||
| public ArmeriaHttpService(Buffer<Record<Object>> buffer, final PluginMetrics pluginMetrics, final int bufferWriteTimeoutInMillis) { | ||
| this.buffer = buffer; | ||
| this.oTelProtoDecoder = new OTelProtoOpensearchCodec.OTelProtoDecoder(); |
There was a problem hiding this comment.
This needs to respect the output format. See #6494
Signed-off-by: Tomas Longo <tlongo@sternad.de>
KarstenSchnitter
left a comment
There was a problem hiding this comment.
I think we should follow the idea of @dlvenable to have the logic which decoder should be used centralized in the OTelLogsSource.
| final OTelOutputFormat otelOutputFormat | ||
| ) { | ||
| this.buffer = buffer; | ||
| this.oTelProtoDecoder = (otelOutputFormat == OTelOutputFormat.OPENSEARCH) ? new OTelProtoOpensearchCodec.OTelProtoDecoder() : new OTelProtoStandardCodec.OTelProtoDecoder(); |
There was a problem hiding this comment.
@dlvenable suggested to inject the oTelProtoDecoder directly and have just a single point, where the decision is taken in #6494 (comment) for the OtelTraceSource. I think this applies here as well. Let the OTelLogsSource provide the decoder directly and take the decision in that class.
| .exceptionHandler(createGrpExceptionHandler(oTelLogsSourceConfig)); | ||
| final OTelLogsGrpcService oTelLogsGrpcService = new OTelLogsGrpcService( | ||
| (int) (oTelLogsSourceConfig.getRequestTimeoutInMillis() * 0.8), | ||
| oTelLogsSourceConfig.getOutputFormat() == OTelOutputFormat.OPENSEARCH ? new OTelProtoOpensearchCodec.OTelProtoDecoder() : new OTelProtoStandardCodec.OTelProtoDecoder(), |
There was a problem hiding this comment.
Extract this logic into a private method and use the same method in line 183.
| LOG.info("Configuring HTTP service under {} ", path); | ||
|
|
||
|
|
||
| final ArmeriaHttpService armeriaHttpService = new ArmeriaHttpService(buffer, pluginMetrics, 100, oTelLogsSourceConfig.getOutputFormat()); |
There was a problem hiding this comment.
create the decoder directly as in line 213 and pass it to the ArmeriaHttpService.
|
@TomasLongo can you make the suggested change for the ProtoDecoder. Afterwards, I really think, this is good to go. |
|
@KarstenSchnitter Sorry for the delay. Your comments were burried in my mail box. I will implement your suggestion. |
Signed-off-by: Tomas Longo <tlongo@sternad.de>
KarstenSchnitter
left a comment
There was a problem hiding this comment.
This looks good to me. Thanks for the PR.
|
@dlvenable any reservations or can we proceed and merge this PR? |
8d85609
into
opensearch-project:main
Description
This PR adds an HTTP-Service to the OtelLogsSource, which runs alongside the gRPC-Service under a separate endpoint.
Both Services share the same underlying Armeria-Server, running under the same port.
Breakdown
The PR is kind of heavy. The OtelLogsSource had to be restructured resulting in the refactoring of the present tests.
A quick breakdown of the changes will certainly help with the review
One Server, two services
OtelLogsSourceis now inhabitetd by two services running under different paths behind the same port.The path of the services can be configured by the two fields
pathandhttp_pathpathis the already present field for configuring path of the grpc service and keeps its purposehttp_pathwill set the path for the new http serviceUnframed Requests
Since unframed requests served the purpose of sending plain
httprequests to the grpc server, this functionality has been dropped. However, for backwardscompatibilty of the config, it is still present in the config data structure.CreateServer
The class
CreateServerdeals with creating both, http and gRPC servers. However, in its current form, the intended use is to create a server with a single service. Two quick attempts to rewrite the Class for the purpose if this PR yielded no results.So the final version of the server creation loosely resembles the functionality found in
CreateServer.Refactoring of tests
Many tests were making sure that the former gRPC server, with unframed requests enabled, would handle http requests correctly. I have dropped this tests completely, replacing them with a separate test class that handles tests regarding the http functionality. I added a few improvements along the way like
Decisions made and additional considerations
Getting completely rid of unframed requests
enable_unframed_requestshas no effect any more, since its purpose has been replaced with the new http serviceConfiguration
A new field,
http_pathhas been added to specify the path of the http service. However, one could extend the config toCreateServer
I think that the intention of
CreateServer, to provide a unified mechanism to bootstrap armeria servers, is absolutely the right thing todo. However, in its current form, it only supports creating servers with a single service.It's worth thinking about how to refactor
CreateServerto streamline server creation for future plugins. The challenge I encountered was how to best model the fact, that different services might need different server capabilities/configurations.Figuring this out, would have gone well beyond the scope of this PR.
Issues Resolved
Resolves #[Issue number to be closed when this PR is merged]
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.