Skip to content

Flaky kafka-newest test on Python 3.14 #2630

@xrmx

Description

@xrmx
____________________ test_kafka_consume_ongoing_transaction ____________________

instrument = None
elasticapm_client = <tests.fixtures.TempStoreClient object at 0x7f408d41d9d0>
producer = <kafka.producer.kafka.KafkaProducer object at 0x7f408d41ecf0>
consumer = <kafka.consumer.group.KafkaConsumer object at 0x7f408c2a1ba0>
topics = ['test', 'foo', 'bar']

    def test_kafka_consume_ongoing_transaction(instrument, elasticapm_client, producer, consumer, topics):
        def delayed_send():
            time.sleep(0.2)
            elasticapm_client.begin_transaction("foo")
            producer.send("test", key=b"foo", value=b"bar")
            producer.send("test", key=b"baz", value=b"bazzinga")
            producer.flush()
            elasticapm_client.end_transaction("foo")
    
        thread = threading.Thread(target=delayed_send)
        thread.start()
        transaction = elasticapm_client.begin_transaction("foo")
        for item in consumer:
            pass
        thread.join()
        elasticapm_client.end_transaction("foo")
        transactions = elasticapm_client.events[TRANSACTION]
        assert len(transactions) == 2
        external_spans = elasticapm_client.spans_for_transaction(transactions[0])
        spans = elasticapm_client.spans_for_transaction(transactions[1])
        assert len(external_spans) == 2
>       assert len(spans) == 2
E       assert 0 == 2
E        +  where 0 = len([])

consumer   = <kafka.consumer.group.KafkaConsumer object at 0x7f408c2a1ba0>
delayed_send = <function test_kafka_consume_ongoing_transaction.<locals>.delayed_send at 0x7f408c16c1a0>
elasticapm_client = <tests.fixtures.TempStoreClient object at 0x7f408d41d9d0>
external_spans = [{'action': 'send', 'context': {'destination': {'address': 'kafka', 'port': 9092, 'service': {'name': 'kafka', 'resour...vice': {'target': {'name': 'test', 'type': 'kafka'}}}, 'duration': 0.48899999999999993, 'id': 'f964a73fb9260ef4', ...}]
instrument = None
producer   = <kafka.producer.kafka.KafkaProducer object at 0x7f408d41ecf0>
spans      = []
thread     = <Thread(Thread-12 (delayed_send), stopped 139915119552192)>
topics     = ['test', 'foo', 'bar']
transaction = <elasticapm.traces.Transaction object at 0x7f408d3ffd90>
transactions = [{'context': {'tags': {}}, 'duration': 12.602, 'id': '0a2d2ff318df135c', 'name': 'foo', ...}, {'context': {'tags': {}}, 'duration': 502.87899999999996, 'id': '60f261ba230a8914', 'name': 'foo', ...}]

Metadata

Assignees

No one assigned

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions