diff --git a/.github/workflows/commit.yml b/.github/workflows/commit.yml index d09e5d5..9d0ec77 100644 --- a/.github/workflows/commit.yml +++ b/.github/workflows/commit.yml @@ -5,7 +5,7 @@ on: [ push, workflow_dispatch ] jobs: semantic_library_workflow: name: push - uses: krogerco/Shared-CI-Workflow-Android/.github/workflows/semantic_library_workflow.yml@v1.5.0 + uses: krogerco/Shared-CI-Workflow-Android/.github/workflows/semantic_library_workflow.yml@v1 with: java_version: '24' ktlint_version: '-1' diff --git a/.husky/pre-commit b/.husky/pre-commit index 33558b4..17f7087 100755 --- a/.husky/pre-commit +++ b/.husky/pre-commit @@ -1,5 +1,4 @@ #!/bin/sh . "$(dirname "$0")/_/husky.sh" -ktlint "**/src/**/*.kt" "!**/generated/**" --android --reporter=plain?group_by_file - +./gradlew lintKotlin diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 6283b3f..cafd0cf 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -21,7 +21,7 @@ kgpDagger = "2.56.1" kgpAndroidDesugarJdkLibs = "2.1.5" kgpAndroidxComposeBom = "2025.05.00" kgpDokka = "2.0.0" -kgpJdk = "24" +kgpJdk = "17" kgpJvmTarget = "11" kgpJunit4 = "4.13.2" kgpJunitBom = "5.12.2" diff --git a/package.json b/package.json index 1f203f7..9273f00 100644 --- a/package.json +++ b/package.json @@ -5,6 +5,6 @@ "husky": "^7.0.1" }, "scripts": { - "prepare": "husky install" + "prepare": "husky" } } diff --git a/telemetry/src/main/java/com/kroger/telemetry/StandardTelemeter.kt b/telemetry/src/main/java/com/kroger/telemetry/StandardTelemeter.kt index 857146d..ecb7940 100644 --- a/telemetry/src/main/java/com/kroger/telemetry/StandardTelemeter.kt +++ b/telemetry/src/main/java/com/kroger/telemetry/StandardTelemeter.kt @@ -27,6 +27,7 @@ package com.kroger.telemetry import com.kroger.telemetry.facet.Facet import com.kroger.telemetry.facet.FacetResolver import com.kroger.telemetry.facet.Failure +import com.kroger.telemetry.facet.RelayFailure import com.kroger.telemetry.facet.ThreadData import com.kroger.telemetry.facet.UnresolvedFacet import kotlinx.coroutines.CoroutineScope @@ -55,18 +56,46 @@ internal class StandardTelemeter( relays.forEach { relay -> events .onEach { event -> + // Skip processing if this Relay is the cause of any RelayFailure event being + // processed. This prevents infinite loops when Relays fail while processing + // their own or each other's failures. + val shouldSkip = + event.facets + .filterIsInstance() + .any { relayFailure -> + relayIsSourceOfFailure(relayFailure, relay) + } + + if (shouldSkip) { + return@onEach + } + Result .runCatching { relay.process(event) - }.onFailure { - val message = "An error was caught during Relay processing. It was $it" - val facet = Failure(message = message, throwable = it) - // Avoid loops - if (event.facets.contains(facet)) return@onEach + }.onFailure { throwable -> + val message = + "An error was caught during Relay processing. It was $throwable" + + // If we were already processing a RelayFailure, note it as the cause. + val causeRelayFailure = + event.facets.filterIsInstance().firstOrNull() + + val relayFailureFacet = + RelayFailure( + sourceRelay = relay, + message = message, + throwable = throwable, + cause = causeRelayFailure, + ) + + val failureFacet = Failure(message = message, throwable = throwable) + val failureEvent = object : Event { override val description: String = message - override val facets: List = listOf(facet) + override val facets: List = + listOf(relayFailureFacet, failureFacet) } record(failureEvent) } @@ -74,6 +103,20 @@ internal class StandardTelemeter( } } + /** + * Checks if a relay appears anywhere in the cause chain of a [RelayFailure] + */ + private fun relayIsSourceOfFailure( + relayFailure: RelayFailure, + relay: Relay, + ): Boolean = + (relayFailure.sourceRelay === relay) || + ( + relayFailure.cause?.let { + relayIsSourceOfFailure(it, relay) + } ?: false + ) + override fun record( event: Event, withFacets: List?, diff --git a/telemetry/src/main/java/com/kroger/telemetry/facet/RelayFailure.kt b/telemetry/src/main/java/com/kroger/telemetry/facet/RelayFailure.kt new file mode 100644 index 0000000..2583c61 --- /dev/null +++ b/telemetry/src/main/java/com/kroger/telemetry/facet/RelayFailure.kt @@ -0,0 +1,54 @@ +/** + * MIT License + * + * Copyright (c) 2021 The Kroger Co. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.kroger.telemetry.facet + +import com.kroger.telemetry.Relay + +/** + * A facet representing a failure during a Relay's processing of events. + * + * A RelayFailure contains a reference to the relay that failed, allowing the Telemeter to prevent + * infinite loops by preventing that specific relay from processing its own failure events. A + * RelayFailure may include another RelayFailure as a [cause] + * + * The [cause] property enables progressive relay disqualification: when a relay fails + * while processing a RelayFailure, the new RelayFailure references the original as its cause. + * This creates a cause chain that prevents infinite loops when multiple relays fail + * while processing each other's failures. + * + * This facet is created automatically by the Telemeter. Consumers should not add this facet to their + * own events. + * + * @param sourceRelay the Relay where the failure occurred + * @param message a message associated with the failure + * @param throwable a [Throwable] associated with the failure + * @param cause an optional RelayFailure that was the cause of this failure + */ +public data class RelayFailure( + val sourceRelay: Relay, + val message: String? = null, + val throwable: Throwable? = null, + val cause: RelayFailure? = null, +) : Facet diff --git a/telemetry/src/test/java/com/kroger/telemetry/TelemeterTest.kt b/telemetry/src/test/java/com/kroger/telemetry/TelemeterTest.kt index d503afa..885cf99 100644 --- a/telemetry/src/test/java/com/kroger/telemetry/TelemeterTest.kt +++ b/telemetry/src/test/java/com/kroger/telemetry/TelemeterTest.kt @@ -28,6 +28,7 @@ import com.kroger.telemetry.facet.Facet import com.kroger.telemetry.facet.FacetResolver import com.kroger.telemetry.facet.Failure import com.kroger.telemetry.facet.Prefix +import com.kroger.telemetry.facet.RelayFailure import com.kroger.telemetry.facet.ThreadData import com.kroger.telemetry.facet.UnresolvedFacet import com.kroger.telemetry.util.FakeEvent @@ -39,6 +40,9 @@ import kotlinx.coroutines.test.UnconfinedTestDispatcher import kotlinx.coroutines.test.runTest import kotlinx.coroutines.withContext import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertSame import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test import kotlin.time.Duration.Companion.milliseconds @@ -488,16 +492,9 @@ internal class TelemeterTest { fun `GIVEN telemeter with relay WHEN relay throws THEN telemeter catches and records error without looping`() = runTest { val recorded = mutableListOf() - val goodRelay = - FakeRelay { - recorded.add(it) - } + val goodRelay = FakeRelay { recorded.add(it) } val exception = NullPointerException() - val badRelay = - FakeRelay { - // This would loop if the implementation didn't prevent it - throw exception - } + val badRelay = FakeRelay { throw exception } val telemeter = Telemeter.build( @@ -507,9 +504,260 @@ internal class TelemeterTest { telemeter.record(FakeEvent()) testScheduler.runCurrent() - val failureEvents = recorded.filter { it.facets.any { facet -> facet is Failure } } - val failureFacet = failureEvents[0].facets[0] as Failure - assertEquals(exception, failureFacet.throwable) + + // Should have 2 events: original + failure + assertEquals(2, recorded.size) + + // Failure event should have both facets + val failureEvent = recorded[1] + assertTrue(failureEvent.facets.any { it is Failure }) + assertTrue(failureEvent.facets.any { it is RelayFailure }) + + // RelayFailure should reference the bad relay + val relayFailure = failureEvent.facets.filterIsInstance().first() + assertSame(badRelay, relayFailure.sourceRelay) + assertEquals(exception, relayFailure.throwable) + } + + @Test + fun `GIVEN relay that fails on Failure facets WHEN it processes its own failure THEN loop is prevented`() = + runTest { + val recorded = mutableListOf() + val recordingRelay = FakeRelay { recorded.add(it) } + + val brokenRelay = + FakeRelay { event -> + // Throw when processing Failure facets + if (event.facets.any { it is Failure }) { + throw RuntimeException("Failed to process failure event") + } + } + + val telemeter = + Telemeter.build( + relays = listOf(brokenRelay, recordingRelay), + flowConfig = Telemeter.defaultTelemetryFlowConfig.copy(scope = backgroundScope), + ) + + telemeter.record(FakeEvent(facets = listOf(Failure("A failure for testing")))) + testScheduler.runCurrent() + + // Should have 2 events: original + one failure (loop prevented) + // Without loop prevention, there would be 3+ events (original + failure + failure of failure + ...) + assertEquals(2, recorded.size) + + // First is original event + assertTrue(recorded[0].facets.any { it is Failure }) + + // Second is failure event with RelayFailure pointing to broken relay + val failureEvent = recorded[1] + assertTrue(failureEvent.facets.any { it is Failure }) + + val relayFailure = failureEvent.facets.filterIsInstance().first() + assertSame(brokenRelay, relayFailure.sourceRelay) + } + + @Test + fun `GIVEN multiple relays WHEN one fails THEN other relays can process the RelayFailure event`() = + runTest { + val recorded = mutableListOf() + val relayA = FakeRelay { throw RuntimeException("Relay A failed") } + val relayB = FakeRelay { recorded.add(it) } + + val telemeter = + Telemeter.build( + relays = listOf(relayA, relayB), + flowConfig = Telemeter.defaultTelemetryFlowConfig.copy(scope = backgroundScope), + ) + + telemeter.record(FakeEvent()) + testScheduler.runCurrent() + + // RelayB should see both events: original + failure event from RelayA + assertEquals(2, recorded.size) + + // The failure event should have RelayFailure from relayA + val failureEvent = recorded[1] + val relayFailure = failureEvent.facets.filterIsInstance().first() + assertSame(relayA, relayFailure.sourceRelay) + + // RelayB was able to process it (not skipped) because it's not the source + } + + @Test + fun `GIVEN relay failure WHEN failure event created THEN includes both RelayFailure and Failure facets`() = + runTest { + val recorded = mutableListOf() + val recordingRelay = FakeRelay { recorded.add(it) } + val exception = IllegalStateException("Test error") + val failingRelay = FakeRelay { throw exception } + + val telemeter = + Telemeter.build( + relays = listOf(failingRelay, recordingRelay), + flowConfig = Telemeter.defaultTelemetryFlowConfig.copy(scope = backgroundScope), + ) + + telemeter.record(FakeEvent()) + testScheduler.runCurrent() + + val failureEvent = recorded[1] + + // Should have both facet types for backward compatibility + val relayFailure = failureEvent.facets.filterIsInstance().firstOrNull() + val failure = failureEvent.facets.filterIsInstance().firstOrNull() + + assertNotNull(relayFailure) + assertNotNull(failure) + + // Both should have the same message and throwable + assertEquals(relayFailure?.throwable, failure?.throwable) + assertEquals(exception, failure?.throwable) + } + + @Test + fun `GIVEN multiple relays that fail on RelayFailures WHEN they process each other's failures THEN progressive disqualification prevents infinite loop`() = + runTest { + val recorded = mutableListOf() + val recordingRelay = FakeRelay { recorded.add(it) } + + var relayAProcessCount = 0 + var relayBProcessCount = 0 + + // RelayA fails on first event, then fails on ANY RelayFailure + val relayA = + FakeRelay { event -> + relayAProcessCount++ + if (relayAProcessCount == 1 || event.facets.any { it is RelayFailure }) { + throw RuntimeException("RelayA failed") + } + } + + // RelayB fails when processing ANY RelayFailure + val relayB = + FakeRelay { event -> + relayBProcessCount++ + if (event.facets.any { it is RelayFailure }) { + throw RuntimeException("RelayB failed on RelayFailure") + } + } + + val telemeter = + Telemeter.build( + relays = listOf(relayA, relayB, recordingRelay), + flowConfig = Telemeter.defaultTelemetryFlowConfig.copy(scope = backgroundScope), + ) + + // Start with a normal event + telemeter.record(FakeEvent()) + testScheduler.runCurrent() + + // Without progressive disqualification, this would create an infinite loop: + // 1. Normal event → relayA fails → RelayFailure(A, cause=null) + // 2. RelayFailure(A) → relayB fails → RelayFailure(B, cause=A) + // 3. RelayFailure(B) → relayA fails → RelayFailure(A, cause=B) + // 4. RelayFailure(A) → relayB fails → RelayFailure(B, cause=A) → LOOP! + + // With progressive disqualification, we should have exactly 3 events: + // 1. Original event + // 2. RelayFailure from relayA (cause=null) + // 3. RelayFailure from relayB (cause=relayA's failure) - then relayA is disqualified + assertEquals(3, recorded.size) + + // First event is the original + assertFalse(recorded[0].facets.any { it is RelayFailure }) + + // Second event is relayA's failure with no cause + val relayAFailure = recorded[1].facets.filterIsInstance().first() + assertSame(relayA, relayAFailure.sourceRelay) + assertEquals(null, relayAFailure.cause) + + // Third event is relayB's failure with relayA as the cause + val relayBFailure = recorded[2].facets.filterIsInstance().first() + assertSame(relayB, relayBFailure.sourceRelay) + assertSame(relayAFailure, relayBFailure.cause) + + // No fourth event should exist (relayA is disqualified from processing relayB's failure) + } + + @Test + fun `GIVEN relay failure with cause chain WHEN relay appears in chain THEN relay is disqualified from processing`() = + runTest { + val recorded = mutableListOf() + val recordingRelay = FakeRelay { recorded.add(it) } + + val relayA = FakeRelay { } + val relayB = FakeRelay { } + + val telemeter = + Telemeter.build( + relays = listOf(relayA, relayB, recordingRelay), + flowConfig = Telemeter.defaultTelemetryFlowConfig.copy(scope = backgroundScope), + ) + + // Create a cause chain manually: relayC → relayB → relayA + val relayAFailure = + RelayFailure( + sourceRelay = relayA, + message = "RelayA failed", + throwable = RuntimeException("A"), + cause = null, + ) + + val relayBFailure = + RelayFailure( + sourceRelay = relayB, + message = "RelayB failed", + throwable = RuntimeException("B"), + cause = relayAFailure, + ) + + // Record an event with relayB's failure (which has relayA in its cause chain) + telemeter.record( + FakeEvent( + facets = listOf(relayBFailure, Failure("test")), + ), + ) + testScheduler.runCurrent() + + // recordingRelay should see the event (it's not in the chain) + assertEquals(1, recorded.size) + + // But if we check which relays would be disqualified: + // - relayA should be disqualified (appears in cause chain) + // - relayB should be disqualified (is the source) + // Only recordingRelay processes it + + val processedEvent = recorded[0] + val relayFailure = processedEvent.facets.filterIsInstance().first() + + // Verify the cause chain is intact + assertSame(relayB, relayFailure.sourceRelay) + assertSame(relayAFailure, relayFailure.cause) + } + + @Test + fun `GIVEN relay failure without cause WHEN failure recorded THEN cause is null`() = + runTest { + val recorded = mutableListOf() + val recordingRelay = FakeRelay { recorded.add(it) } + val failingRelay = FakeRelay { throw RuntimeException("Failed") } + + val telemeter = + Telemeter.build( + relays = listOf(failingRelay, recordingRelay), + flowConfig = Telemeter.defaultTelemetryFlowConfig.copy(scope = backgroundScope), + ) + + // Record a normal event (no RelayFailure in it) + telemeter.record(FakeEvent()) + testScheduler.runCurrent() + + val failureEvent = recorded[1] + val relayFailure = failureEvent.facets.filterIsInstance().first() + + // Since the original event had no RelayFailure, the cause should be null + assertEquals(null, relayFailure.cause) } @Test