Skip to content

Commit f6da729

Browse files
committed
Provide streaming infrastructure
A StreamingChannel is used to open a MeasurementStream. Using such a MeasurementStream allows for faster data acquisition. A measurement device can buffer single measurements and return them in larger chunks. Not every programming device will support a StreamingChannel. A first example implementation is provided for the SensorBridge using start_repeated_i2c_transceive function and its counterparts.
1 parent 5ca263d commit f6da729

File tree

3 files changed

+275
-13
lines changed

3 files changed

+275
-13
lines changed

sensirion_driver_adapters/channel.py

Lines changed: 73 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,82 @@
11
# -*- coding: utf-8 -*-
22
# (c) Copyright 2021 Sensirion AG, Switzerland
33

4+
from __future__ import annotations
5+
46
import abc
5-
from typing import Any, Iterable, Optional, Tuple
7+
from dataclasses import dataclass
8+
from typing import Any, Iterable, Optional, Tuple, Iterator, Protocol
69

710
from sensirion_driver_adapters.rx_tx_data import RxData
811

912

13+
@dataclass
14+
class MeasurementPacket:
15+
timestamp: float
16+
data: Optional[bytes]
17+
18+
19+
@dataclass
20+
class Measurement:
21+
stream_id: int # the id of the stream that produced this measurement
22+
timestamp: float # the timestamp of the measurement
23+
data: Tuple[Any, ...] # the measurement data
24+
25+
26+
class MeasurementStream(Protocol):
27+
28+
def open(self) -> Iterator[Measurement]:
29+
""" Open the stream"""
30+
31+
def close(self):
32+
""" Close the stream"""
33+
34+
def get_stream_id(self) -> int:
35+
"""Return the id of the stream"""
36+
37+
def __enter__(self) -> MeasurementStream:
38+
"""Enter the streaming context"""
39+
40+
def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
41+
"""Exit the streaming context"""
42+
43+
def __iter__(self) -> Iterator[Measurement]:
44+
"""Open the stream and return an iterator over the measurements"""
45+
46+
def __next__(self) -> Measurement:
47+
"""Get the next measurement from the stream"""
48+
49+
50+
class StreamingChannel(Protocol):
51+
def get_measurement_stream(self, tx_bytes: bytes,
52+
payload_offset: int,
53+
response: RxData,
54+
measurement_interval_us: int,
55+
buffered_samples: int = 100,
56+
sensor_busy_delay_us: int = 10,
57+
repeat_tx_data: bool = False) -> MeasurementStream:
58+
"""Start a measurement stream.
59+
60+
:param tx_bytes: The command and its parameters to trigger the measurement on the sensor.
61+
:param payload_offset: The bytes up to the payload_offset represent the command id
62+
:param response: The response is an object that is able to unpack a raw response.
63+
:param measurement_interval_us: The interval between two measurements in microseconds.
64+
:param buffered_samples: The number of samples that shall be buffered.
65+
:param sensor_busy_delay_us: The number of microseconds between write data tx and read result.
66+
:param repeat_tx_data: If True, the tx_bytes will be repeated for each measurement until the stream is closed.
67+
Otherwise, the tx_bytes will be transmitted only once to trigger the measurement.
68+
:param slave_address: Overwrite the i2c-address of the channel
69+
"""
70+
71+
1072
class TxRxChannel(abc.ABC):
1173
"""
12-
This is the abstract base class for any channel. A channel is a transportation medium to transfer data from any
13-
source to any destination.
74+
Defines an abstract base class for bidirectional communication with a sensor device.
75+
76+
This class provides an interface for transmitting and receiving data with a sensor.
77+
It also allows streaming measurements, stripping protocol-level data, and enforcing a
78+
timeout property for communication.
79+
1480
"""
1581

1682
@abc.abstractmethod
@@ -21,16 +87,16 @@ def write_read(self, tx_bytes: Iterable, payload_offset: int,
2187
slave_address: Optional[int] = None,
2288
ignore_errors: bool = False) -> Optional[Tuple[Any, ...]]:
2389
"""
24-
Transfers the data to and from sensor.
90+
Transfers the data to and from a sensor.
2591
2692
:param tx_bytes:
2793
Raw bytes to be transmitted
2894
:param payload_offset:
29-
The data my contain a header that needs to be left untouched, pushing the date through the protocol stack.
95+
The data may contain a header that needs to be left untouched, pushing the date through the protocol stack.
3096
The Payload offset points to the end of the header and the beginning of the data
3197
:param response:
3298
The response is an object that is able to unpack a raw response.
33-
It has to provide a method 'interpret_response.
99+
It has to provide a method 'interpret_response'.
34100
:param device_busy_delay:
35101
Indication how long the receiver of the message will be busy until processing of the data has been
36102
completed.
@@ -61,7 +127,7 @@ def timeout(self) -> float:
61127

62128
class AbstractMultiChannel(TxRxChannel):
63129
"""
64-
This is the base class for any multi channel implementation. A multi channel is used to mimic simultaneous
130+
This is the base class for any multichannel implementation. A multichannel is used to mimic simultaneous
65131
communication with several sensors and is used by the MultiDeviceDecorator.
66132
"""
67133

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
# -*- coding: utf-8 -*-
2+
# (c) Copyright 2026 Sensirion AG, Switzerland
3+
import logging
4+
from typing import Any, Optional, Tuple, Callable, Iterable
5+
6+
from sensirion_driver_adapters.channel import StreamingChannel, TxRxChannel, \
7+
MeasurementStream
8+
from sensirion_driver_adapters.i2c_adapter.i2c_channel import I2cChannel
9+
from sensirion_driver_adapters.rx_tx_data import RxData
10+
11+
log = logging.getLogger(__name__)
12+
13+
14+
class I2cStreamingChannel(StreamingChannel, TxRxChannel):
15+
def __init__(self,
16+
create_measurement_stream: Callable[[bytes,
17+
int,
18+
RxData,
19+
int,
20+
int,
21+
int,
22+
bool], MeasurementStream],
23+
i2c_channel: I2cChannel) -> None:
24+
25+
self._i2_channel = i2c_channel
26+
self._create_measurement_stream = create_measurement_stream
27+
28+
def get_measurement_stream(self, tx_bytes: bytes,
29+
payload_offset: int,
30+
response: RxData,
31+
measurement_interval_us: int,
32+
buffered_samples: int = 100,
33+
sensor_busy_delay_us: int = 10,
34+
repeat_tx_data: bool = False) -> MeasurementStream:
35+
36+
measurement_stream = self._create_measurement_stream(
37+
tx_bytes,
38+
payload_offset,
39+
response,
40+
measurement_interval_us,
41+
buffered_samples,
42+
sensor_busy_delay_us,
43+
repeat_tx_data)
44+
return measurement_stream
45+
46+
def write_read(self, tx_bytes: Iterable, payload_offset: int,
47+
response: RxData,
48+
device_busy_delay: float = 0.0,
49+
post_processing_delay: Optional[float] = None,
50+
slave_address: Optional[int] = None,
51+
ignore_errors: bool = False) -> Optional[Tuple[Any, ...]]:
52+
return self._i2_channel.write_read(tx_bytes, payload_offset, response, device_busy_delay,
53+
post_processing_delay, slave_address, ignore_errors)
54+
55+
def strip_protocol(self, data) -> None:
56+
""""""
57+
self._i2_channel.strip_protocol(data)
58+
59+
def timeout(self) -> float:
60+
return self._i2_channel.timeout

sensirion_driver_adapters/i2c_adapter/sensor_bridge_i2c_channel_provider.py

Lines changed: 142 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,124 @@
22
# (c) Copyright 2023 Sensirion AG, Switzerland
33

44
import time
5-
from typing import Optional, Tuple
5+
from datetime import datetime, timezone
6+
from functools import partial
7+
from typing import Optional, Tuple, Iterator, List
68

79
from sensirion_i2c_driver import I2cConnection
10+
from sensirion_i2c_driver.errors import I2cChecksumError
811
from sensirion_shdlc_driver import ShdlcSerialPort, ShdlcConnection
912
from sensirion_shdlc_sensorbridge import (SensorBridgePort,
1013
SensorBridgeShdlcDevice,
1114
SensorBridgeI2cProxy)
15+
from sensirion_shdlc_sensorbridge.device import ReadBufferResponse, RepeatedTransceiveHandle
1216

13-
from sensirion_driver_adapters.channel import TxRxChannel
17+
from sensirion_driver_adapters.channel import TxRxChannel, MeasurementStream, Measurement
1418
from sensirion_driver_adapters.channel_provider import I2cChannelProvider
15-
from sensirion_driver_adapters.i2c_adapter.i2c_channel import I2cChannel
19+
from sensirion_driver_adapters.i2c_adapter.i2c_channel import I2cChannel, RxData
20+
from sensirion_driver_adapters.i2c_adapter.i2c_streaming_channel import I2cStreamingChannel
21+
22+
23+
class SensorBridgeMeasurementStream(MeasurementStream):
24+
def __init__(self, sensor_bridge: SensorBridgeShdlcDevice,
25+
sensor_bridge_port: SensorBridgePort,
26+
channel: I2cChannel,
27+
tx_bytes: bytes,
28+
payload_offset: int,
29+
response: RxData,
30+
measurement_interval_us: int,
31+
buffered_samples: int = 100,
32+
sensor_busy_delay_us: int = 10):
33+
self._tx_bytes = tx_bytes
34+
self._payload_offset = payload_offset
35+
self._response_descriptor = response
36+
self._measurement_interval_us = measurement_interval_us
37+
self._buffered_samples = buffered_samples
38+
self._sensor_busy_delay_us = sensor_busy_delay_us
39+
self._channel = channel
40+
self._i2c_address = channel._slave_address # noqa: protected-access
41+
self._sensor_bridge_port = sensor_bridge_port
42+
self._sensor_bridge = sensor_bridge
43+
self._rx_length = response.rx_length * 3 // 2
44+
self._handle: Optional[RepeatedTransceiveHandle] = None
45+
self._start_time = datetime.now(tz=timezone.utc).timestamp()
46+
self._stream_iterator = None
47+
48+
def open(self) -> MeasurementStream:
49+
self._handle: RepeatedTransceiveHandle = self._sensor_bridge.start_repeated_i2c_transceive(
50+
self._sensor_bridge_port,
51+
self._measurement_interval_us,
52+
self._i2c_address,
53+
self._tx_bytes,
54+
self._rx_length,
55+
timeout_us=1000,
56+
read_delay_us=self._sensor_busy_delay_us)
57+
self._start_time = datetime.now(tz=timezone.utc).timestamp()
58+
time.sleep(self._buffered_samples * self._measurement_interval_us / 1000000)
59+
return self
60+
61+
def close(self) -> None:
62+
print("closing stream")
63+
if self._handle is not None:
64+
self._sensor_bridge.stop_repeated_i2c_transceive(self._handle)
65+
self._handle = None
66+
self._stream_iterator = None
67+
68+
def __enter__(self) -> MeasurementStream:
69+
return self.open()
70+
71+
def __exit__(self, exc_type, exc_val, exc_tb):
72+
try:
73+
self.close()
74+
self._stream_iterator = None
75+
return False
76+
except Exception: # noqa
77+
return True
78+
79+
def __iter__(self) -> Iterator[Measurement]:
80+
self._stream_iterator = self.get_next_measurement()
81+
return self
82+
83+
def __next__(self) -> Measurement:
84+
assert self._stream_iterator is not None, "Illegal state: Stream iterator is None!"
85+
return next(self._stream_iterator)
86+
87+
def get_next_measurement(self) -> Iterator[Measurement]:
88+
def handle_buffer_response(response: ReadBufferResponse) -> List[Optional[bytes]]:
89+
lost_packets = response.lost_bytes % self._rx_length
90+
measurement_data = [None for _ in range(lost_packets)]
91+
measurement_data += [None if d.error else d.data for d in response.values]
92+
return measurement_data
93+
94+
interval_s = self._measurement_interval_us / 1_000_000
95+
while self._handle is not None:
96+
buffer_response: ReadBufferResponse = self._sensor_bridge.read_buffer(self._handle)
97+
receive_time = datetime.now(tz=timezone.utc).timestamp()
98+
measurements = handle_buffer_response(buffer_response)
99+
computed_interval_s = interval_s
100+
if len(measurements) > 0:
101+
computed_interval_s = (receive_time - self._start_time) / len(measurements)
102+
effective_measurement_interval = min(interval_s,
103+
computed_interval_s) # shrink but to not extend the interval
104+
start_time = self._start_time
105+
for data in measurements:
106+
unpacked_data = None
107+
if data is not None:
108+
try:
109+
unpacked_data = self._channel.strip_protocol(data)
110+
except I2cChecksumError: # errors are reported the same way as lost packets
111+
...
112+
yield Measurement(stream_id=self._handle.raw_handle,
113+
timestamp=start_time,
114+
data=None if data is None else self._response_descriptor.unpack(unpacked_data))
115+
start_time += effective_measurement_interval
116+
new_start_time = datetime.now(tz=timezone.utc).timestamp()
117+
start_time = min(start_time, new_start_time) # measurements must not be in the future
118+
next_receive_time = receive_time + effective_measurement_interval * self._buffered_samples
119+
sleep_time = max(0.0, (next_receive_time - new_start_time))
120+
if sleep_time > 1.0:
121+
time.sleep(sleep_time)
122+
self._start_time = start_time
16123

17124

18125
class SensorBridgeI2cChannelProvider(I2cChannelProvider):
@@ -40,6 +147,7 @@ def __init__(self, sensor_bridge_port: SensorBridgePort,
40147
self._shdlc_port: Optional[ShdlcSerialPort] = None
41148
self._sensor_bridge: Optional[SensorBridgeShdlcDevice] = None
42149
self._i2c_transceiver: Optional[SensorBridgeI2cProxy] = None
150+
self._streams: List[SensorBridgeMeasurementStream] = []
43151

44152
def release_channel_resources(self):
45153
"""
@@ -49,6 +157,9 @@ def release_channel_resources(self):
49157
"""
50158
if self._sensor_bridge is None:
51159
return
160+
for stream in self._streams:
161+
stream.close()
162+
self._streams.clear()
52163
assert self._sensor_bridge_port is not None, "Illegal state: SensorBridgePort is None!"
53164
assert self._shdlc_port is not None, "Illegal state: Shdlc port is None!"
54165
self._sensor_bridge.switch_supply_off(self._sensor_bridge_port)
@@ -80,6 +191,31 @@ def get_channel(self, slave_address: int,
80191
The crc calculator that can compute the crc checksum of the byte stream
81192
"""
82193

83-
return I2cChannel(I2cConnection(self._i2c_transceiver),
84-
slave_address=slave_address,
85-
crc=self.try_create_crc_calculator(crc_parameters))
194+
i2c_channel = I2cChannel(I2cConnection(self._i2c_transceiver),
195+
slave_address=slave_address,
196+
crc=self.try_create_crc_calculator(crc_parameters))
197+
channel = I2cStreamingChannel(
198+
partial(self._create_measurement_stream, i2c_channel),
199+
i2c_channel=i2c_channel)
200+
return channel
201+
202+
def _create_measurement_stream(self, i2c_channel: I2cChannel,
203+
tx_bytes: bytes,
204+
payload_offset: int,
205+
response: RxData,
206+
measurement_interval_us: int,
207+
buffered_samples: int = 100,
208+
sensor_busy_delay_us: int = 10,
209+
_: bool = False
210+
) -> MeasurementStream:
211+
stream = SensorBridgeMeasurementStream(self._sensor_bridge,
212+
self._sensor_bridge_port,
213+
i2c_channel,
214+
tx_bytes,
215+
payload_offset,
216+
response,
217+
measurement_interval_us,
218+
buffered_samples,
219+
sensor_busy_delay_us)
220+
self._streams.append(stream)
221+
return stream

0 commit comments

Comments
 (0)