Skip to content

Commit 75e2280

Browse files
committed
WIP first draft of streaming channel
1 parent 5ca263d commit 75e2280

File tree

3 files changed

+219
-13
lines changed

3 files changed

+219
-13
lines changed

sensirion_driver_adapters/channel.py

Lines changed: 52 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,60 @@
22
# (c) Copyright 2021 Sensirion AG, Switzerland
33

44
import abc
5-
from typing import Any, Iterable, Optional, Tuple
5+
from dataclasses import dataclass
6+
from typing import Any, Iterable, Optional, Tuple, Iterator, Protocol
67

78
from sensirion_driver_adapters.rx_tx_data import RxData
89

910

11+
@dataclass
12+
class MeasurementPacket:
13+
timestamp: float
14+
data: Optional[bytes]
15+
16+
17+
@dataclass
18+
class Measurement:
19+
stream_id: int # the id of the stream that produced this measurement
20+
timestamp: float # the timestamp of the measurement
21+
data: Tuple[Any, ...] # the measurement data
22+
23+
24+
class StreamingChannel(Protocol):
25+
def open_measurement_stream(self, tx_bytes: bytes,
26+
payload_offset: int,
27+
response: RxData,
28+
measurement_interval_us: int,
29+
time_before_first_read_us: int = 1000,
30+
repeat_tx_data: bool = False,
31+
slave_address: Optional[int] = None) -> Iterator[Measurement]:
32+
"""Start a measurement stream.
33+
34+
:param tx_bytes: The command and its parameters to trigger the measurement on the sensor.
35+
:param payload_offset: The bytes up to the payload_offset represent the command id
36+
:param response: The response is an object that is able to unpack a raw response.
37+
:param measurement_interval_us: The interval between two measurements in microseconds.
38+
:param time_before_first_read_us: The time to wait before the first measurement is taken.
39+
:param repeat_tx_data: If True, the tx_bytes will be repeated for each measurement until the stream is closed.
40+
Otherwise, the tx_bytes will be transmitted only once to trigger the measurement.
41+
:param slave_address: Overwrite the i2c-address of the channel
42+
"""
43+
44+
def close_measurement_stream(self, stream_id: int) -> None:
45+
"""
46+
"""
47+
48+
1049
class TxRxChannel(abc.ABC):
1150
"""
12-
This is the abstract base class for any channel. A channel is a transportation medium to transfer data from any
13-
source to any destination.
51+
Defines an abstract base class for bidirectional communication with a sensor device.
52+
53+
This class provides an interface for transmitting and receiving data with a sensor.
54+
It also allows streaming measurements, stripping protocol-level data, and enforcing a
55+
timeout property for communication.
56+
57+
Attributes:
58+
timeout (float): The maximum time duration for a communication attempt in seconds.
1459
"""
1560

1661
@abc.abstractmethod
@@ -21,16 +66,16 @@ def write_read(self, tx_bytes: Iterable, payload_offset: int,
2166
slave_address: Optional[int] = None,
2267
ignore_errors: bool = False) -> Optional[Tuple[Any, ...]]:
2368
"""
24-
Transfers the data to and from sensor.
69+
Transfers the data to and from a sensor.
2570
2671
:param tx_bytes:
2772
Raw bytes to be transmitted
2873
:param payload_offset:
29-
The data my contain a header that needs to be left untouched, pushing the date through the protocol stack.
74+
The data may contain a header that needs to be left untouched, pushing the date through the protocol stack.
3075
The Payload offset points to the end of the header and the beginning of the data
3176
:param response:
3277
The response is an object that is able to unpack a raw response.
33-
It has to provide a method 'interpret_response.
78+
It has to provide a method 'interpret_response'.
3479
:param device_busy_delay:
3580
Indication how long the receiver of the message will be busy until processing of the data has been
3681
completed.
@@ -61,7 +106,7 @@ def timeout(self) -> float:
61106

62107
class AbstractMultiChannel(TxRxChannel):
63108
"""
64-
This is the base class for any multi channel implementation. A multi channel is used to mimic simultaneous
109+
This is the base class for any multichannel implementation. A multichannel is used to mimic simultaneous
65110
communication with several sensors and is used by the MultiDeviceDecorator.
66111
"""
67112

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
# -*- coding: utf-8 -*-
2+
# (c) Copyright 2026 Sensirion AG, Switzerland
3+
import datetime
4+
import logging
5+
import time
6+
from typing import Any, Iterator, Optional, Tuple, Callable, Iterable
7+
8+
from sensirion_driver_adapters.channel import StreamingChannel, Measurement, MeasurementPacket, TxRxChannel
9+
from sensirion_driver_adapters.i2c_adapter.i2c_channel import I2cChannel
10+
from sensirion_driver_adapters.rx_tx_data import RxData
11+
12+
log = logging.getLogger(__name__)
13+
14+
15+
class I2cStreamingChannel(StreamingChannel, TxRxChannel):
16+
def __init__(self,
17+
start_repeated_measurement: Callable[[int, int, bytes, bool, int], int],
18+
get_next_measurement_data: Callable[[int, RxData, float, int], Iterator[MeasurementPacket]],
19+
stop_repeated_measurement: Callable[[int], None],
20+
i2c_channel: I2cChannel) -> None:
21+
22+
self._i2_channel = i2c_channel
23+
self._start_repeated_measurement = start_repeated_measurement
24+
self._get_next_measurement_data = get_next_measurement_data
25+
self._stop_repeated_measurement = stop_repeated_measurement
26+
self._get_next_measurement = self.default_get_next_measurement
27+
self._stream_id = -1
28+
29+
def open_measurement_stream(self, tx_bytes: bytes,
30+
payload_offset: int,
31+
response: RxData,
32+
measurement_interval_us: int,
33+
time_before_first_read_us: int = 1000,
34+
repeat_tx_data: bool = False,
35+
slave_address: Optional[int] = None,
36+
_get_next_measurement: Optional[Callable] = None) -> Iterator[Measurement]:
37+
38+
if _get_next_measurement:
39+
# allow overriding the default get_next_measurement function
40+
self._get_next_measurement = _get_next_measurement
41+
tx_data = self._i2_channel.build_tx_data(tx_bytes, payload_offset, self._i2_channel._crc)
42+
i2c_address = self._i2_channel._slave_address
43+
rx_len = response.rx_length * 3 // 2 if self._i2_channel._crc else response.rx_length
44+
stream_id = self._start_repeated_measurement(measurement_interval_us,
45+
i2c_address,
46+
tx_data,
47+
repeat_tx_data,
48+
rx_len)
49+
return self._get_next_measurement(stream_id, self._get_next_measurement_data, response, measurement_interval_us)
50+
51+
def close_measurement_stream(self, stream_id: int) -> None:
52+
self._stop_repeated_measurement(stream_id)
53+
54+
def default_get_next_measurement(self, stream_id: int,
55+
get_next_measurement_data: Callable[[int, RxData, float, int],
56+
Iterator[MeasurementPacket]],
57+
response: RxData,
58+
measurement_interval_us: int) -> Iterator[Measurement]:
59+
60+
start_time: datetime.datetime = datetime.datetime.now()
61+
measurement_interval_s = measurement_interval_us / 1000000.0
62+
time.sleep(measurement_interval_s * 2) # wait for two samples before starting
63+
while True:
64+
nr_of_packets = 0
65+
time_stamp = start_time.timestamp()
66+
for new_packet in get_next_measurement_data(stream_id, response, time_stamp, measurement_interval_us):
67+
raw_data = self._i2_channel.strip_protocol(new_packet.data)
68+
next_measurement = Measurement(stream_id=stream_id,
69+
timestamp=new_packet.timestamp,
70+
data=response.unpack(raw_data))
71+
time_stamp = new_packet.timestamp
72+
yield next_measurement
73+
nr_of_packets += 1
74+
if nr_of_packets == 0:
75+
time.sleep(measurement_interval_s * 2) # wait for approx 50 samples
76+
start_time = datetime.datetime.now()
77+
continue
78+
next_measurement_time = datetime.datetime.fromtimestamp(time_stamp + measurement_interval_s)
79+
current_time = datetime.datetime.now()
80+
sleep_time = (next_measurement_time - current_time).total_seconds()
81+
if sleep_time > 0:
82+
time.sleep(sleep_time)
83+
start_time = datetime.datetime.now()
84+
start_time = next_measurement_time
85+
86+
def write_read(self, tx_bytes: Iterable, payload_offset: int,
87+
response: RxData,
88+
device_busy_delay: float = 0.0,
89+
post_processing_delay: Optional[float] = None,
90+
slave_address: Optional[int] = None,
91+
ignore_errors: bool = False) -> Optional[Tuple[Any, ...]]:
92+
return self._i2_channel.write_read(tx_bytes, payload_offset, response, device_busy_delay,
93+
post_processing_delay, slave_address, ignore_errors)
94+
95+
def strip_protocol(self, data) -> None:
96+
""""""
97+
self._i2_channel.strip_protocol(data)
98+
99+
def timeout(self) -> float:
100+
return self._i2_channel.timeout

sensirion_driver_adapters/i2c_adapter/sensor_bridge_i2c_channel_provider.py

Lines changed: 67 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,19 @@
22
# (c) Copyright 2023 Sensirion AG, Switzerland
33

44
import time
5-
from typing import Optional, Tuple
5+
from typing import Optional, Tuple, Iterator, Dict
66

77
from sensirion_i2c_driver import I2cConnection
88
from sensirion_shdlc_driver import ShdlcSerialPort, ShdlcConnection
99
from sensirion_shdlc_sensorbridge import (SensorBridgePort,
1010
SensorBridgeShdlcDevice,
1111
SensorBridgeI2cProxy)
12+
from sensirion_shdlc_sensorbridge.device import ReadBufferResponse, RepeatedTransceiveHandle
1213

13-
from sensirion_driver_adapters.channel import TxRxChannel
14+
from sensirion_driver_adapters.channel import TxRxChannel, MeasurementPacket
1415
from sensirion_driver_adapters.channel_provider import I2cChannelProvider
15-
from sensirion_driver_adapters.i2c_adapter.i2c_channel import I2cChannel
16+
from sensirion_driver_adapters.i2c_adapter.i2c_channel import I2cChannel, RxData
17+
from sensirion_driver_adapters.i2c_adapter.i2c_streaming_channel import I2cStreamingChannel
1618

1719

1820
class SensorBridgeI2cChannelProvider(I2cChannelProvider):
@@ -40,6 +42,7 @@ def __init__(self, sensor_bridge_port: SensorBridgePort,
4042
self._shdlc_port: Optional[ShdlcSerialPort] = None
4143
self._sensor_bridge: Optional[SensorBridgeShdlcDevice] = None
4244
self._i2c_transceiver: Optional[SensorBridgeI2cProxy] = None
45+
self._streams: Dict[int, RepeatedTransceiveHandle] = {}
4346

4447
def release_channel_resources(self):
4548
"""
@@ -49,6 +52,8 @@ def release_channel_resources(self):
4952
"""
5053
if self._sensor_bridge is None:
5154
return
55+
for stream_id in self._streams.keys():
56+
self._stop_repeated_measurement(stream_id)
5257
assert self._sensor_bridge_port is not None, "Illegal state: SensorBridgePort is None!"
5358
assert self._shdlc_port is not None, "Illegal state: Shdlc port is None!"
5459
self._sensor_bridge.switch_supply_off(self._sensor_bridge_port)
@@ -80,6 +85,62 @@ def get_channel(self, slave_address: int,
8085
The crc calculator that can compute the crc checksum of the byte stream
8186
"""
8287

83-
return I2cChannel(I2cConnection(self._i2c_transceiver),
84-
slave_address=slave_address,
85-
crc=self.try_create_crc_calculator(crc_parameters))
88+
i2c_channel = I2cChannel(I2cConnection(self._i2c_transceiver),
89+
slave_address=slave_address,
90+
crc=self.try_create_crc_calculator(crc_parameters))
91+
channel = I2cStreamingChannel(
92+
start_repeated_measurement=self._start_repeated_i2c_transceive,
93+
get_next_measurement_data=self._get_next_measurement_data,
94+
stop_repeated_measurement=self._stop_repeated_measurement,
95+
i2c_channel=i2c_channel)
96+
return channel
97+
98+
def _start_repeated_i2c_transceive(self, measurement_interval_us: int,
99+
i2c_address: int,
100+
tx_data: bytes,
101+
_: bool,
102+
rx_length: int) -> int:
103+
"""Adapter to for sensor bridge start_repeated_i2c_transceive function."""
104+
handle: RepeatedTransceiveHandle = self._sensor_bridge.start_repeated_i2c_transceive(self._sensor_bridge_port,
105+
measurement_interval_us,
106+
i2c_address, tx_data,
107+
rx_length, 1000000)
108+
self._streams[handle.raw_handle] = handle
109+
return handle.raw_handle
110+
111+
def _stop_repeated_measurement(self, stream_id) -> None:
112+
"""Adapter to for sensor bridge stop_repeated_measurement function."""
113+
handle = self._streams.get(stream_id)
114+
if handle is None:
115+
raise ValueError(f"No stream with id {stream_id} found!")
116+
self._sensor_bridge.stop_repeated_i2c_transceive(handle)
117+
118+
def _get_next_measurement_data(self, stream_id: int, rx_descriptor: RxData,
119+
start_time:
120+
float, interval_us: int) -> Iterator[MeasurementPacket]:
121+
122+
rx_length = rx_descriptor.rx_length * 3 // 2
123+
124+
def handle_buffer_response(response: ReadBufferResponse, start_time: float) -> Iterator[MeasurementPacket]:
125+
lost_packets = response.lost_bytes % rx_length
126+
for i in range(lost_packets):
127+
p = MeasurementPacket(timestamp=start_time, data=None)
128+
start_time += interval_us / 1000000
129+
yield p
130+
for d in response.values:
131+
p = MeasurementPacket(timestamp=start_time,
132+
data=None if d.error else d.data)
133+
start_time += interval_us / 1000000
134+
yield p
135+
136+
handle = self._streams.get(stream_id)
137+
if handle is None:
138+
raise ValueError(f"No stream with id {stream_id} found!")
139+
buffer_response: ReadBufferResponse = self._sensor_bridge.read_buffer(handle)
140+
for p in handle_buffer_response(buffer_response, start_time):
141+
yield p
142+
143+
while buffer_response.remaining_bytes > 0:
144+
buffer_response = self._sensor_bridge.read_buffer(handle)
145+
for p in handle_buffer_response(buffer_response, start_time):
146+
yield p

0 commit comments

Comments
 (0)