Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 73 additions & 7 deletions sensirion_driver_adapters/channel.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,82 @@
# -*- coding: utf-8 -*-
# (c) Copyright 2021 Sensirion AG, Switzerland

from __future__ import annotations

import abc
from typing import Any, Iterable, Optional, Tuple
from dataclasses import dataclass
from typing import Any, Iterable, Optional, Tuple, Iterator, Protocol

from sensirion_driver_adapters.rx_tx_data import RxData


@dataclass
class MeasurementPacket:
timestamp: float
data: Optional[bytes]


@dataclass
class Measurement:
stream_id: int # the id of the stream that produced this measurement
timestamp: float # the timestamp of the measurement
data: Tuple[Any, ...] # the measurement data


class MeasurementStream(Protocol):

def open(self) -> Iterator[Measurement]:
""" Open the stream"""

def close(self):
""" Close the stream"""

def get_stream_id(self) -> int:
"""Return the id of the stream"""

def __enter__(self) -> MeasurementStream:
"""Enter the streaming context"""

def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
"""Exit the streaming context"""

def __iter__(self) -> Iterator[Measurement]:
"""Open the stream and return an iterator over the measurements"""

def __next__(self) -> Measurement:
"""Get the next measurement from the stream"""


class StreamingChannel(Protocol):
def get_measurement_stream(self, tx_bytes: bytes,
payload_offset: int,
response: RxData,
measurement_interval_us: int,
buffered_samples: int = 100,
sensor_busy_delay_us: int = 10,
repeat_tx_data: bool = False) -> MeasurementStream:
"""Start a measurement stream.

:param tx_bytes: The command and its parameters to trigger the measurement on the sensor.
:param payload_offset: The bytes up to the payload_offset represent the command id
:param response: The response is an object that is able to unpack a raw response.
:param measurement_interval_us: The interval between two measurements in microseconds.
:param buffered_samples: The number of samples that shall be buffered.
:param sensor_busy_delay_us: The number of microseconds between write data tx and read result.
:param repeat_tx_data: If True, the tx_bytes will be repeated for each measurement until the stream is closed.
Otherwise, the tx_bytes will be transmitted only once to trigger the measurement.
:param slave_address: Overwrite the i2c-address of the channel
"""


class TxRxChannel(abc.ABC):
"""
This is the abstract base class for any channel. A channel is a transportation medium to transfer data from any
source to any destination.
Defines an abstract base class for bidirectional communication with a sensor device.

This class provides an interface for transmitting and receiving data with a sensor.
It also allows streaming measurements, stripping protocol-level data, and enforcing a
timeout property for communication.

"""

@abc.abstractmethod
Expand All @@ -21,16 +87,16 @@ def write_read(self, tx_bytes: Iterable, payload_offset: int,
slave_address: Optional[int] = None,
ignore_errors: bool = False) -> Optional[Tuple[Any, ...]]:
"""
Transfers the data to and from sensor.
Transfers the data to and from a sensor.

:param tx_bytes:
Raw bytes to be transmitted
:param payload_offset:
The data my contain a header that needs to be left untouched, pushing the date through the protocol stack.
The data may contain a header that needs to be left untouched, pushing the date through the protocol stack.
The Payload offset points to the end of the header and the beginning of the data
:param response:
The response is an object that is able to unpack a raw response.
It has to provide a method 'interpret_response.
It has to provide a method 'interpret_response'.
:param device_busy_delay:
Indication how long the receiver of the message will be busy until processing of the data has been
completed.
Expand Down Expand Up @@ -61,7 +127,7 @@ def timeout(self) -> float:

class AbstractMultiChannel(TxRxChannel):
"""
This is the base class for any multi channel implementation. A multi channel is used to mimic simultaneous
This is the base class for any multichannel implementation. A multichannel is used to mimic simultaneous
communication with several sensors and is used by the MultiDeviceDecorator.
"""

Expand Down
56 changes: 56 additions & 0 deletions sensirion_driver_adapters/i2c_adapter/i2c_streaming_channel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# -*- coding: utf-8 -*-
# (c) Copyright 2026 Sensirion AG, Switzerland
import logging
from typing import Any, Optional, Tuple, Callable, Iterable

from sensirion_driver_adapters.channel import StreamingChannel, TxRxChannel, MeasurementStream
from sensirion_driver_adapters.i2c_adapter.i2c_channel import I2cChannel
from sensirion_driver_adapters.rx_tx_data import RxData

log = logging.getLogger(__name__)


class I2cStreamingChannel(StreamingChannel, TxRxChannel):
def __init__(self,
create_measurement_stream: Callable[[bytes,
int,
RxData,
int,
int,
int,
bool], MeasurementStream],
i2c_channel: I2cChannel) -> None:
self._i2_channel = i2c_channel
self._create_measurement_stream = create_measurement_stream

def get_measurement_stream(self, tx_bytes: bytes,
payload_offset: int,
response: RxData,
measurement_interval_us: int,
buffered_samples: int = 100,
sensor_busy_delay_us: int = 10,
repeat_tx_data: bool = False) -> MeasurementStream:
measurement_stream = self._create_measurement_stream(tx_bytes,
payload_offset,
response,
measurement_interval_us,
buffered_samples,
sensor_busy_delay_us,
repeat_tx_data)
return measurement_stream

def write_read(self, tx_bytes: Iterable, payload_offset: int,
response: RxData,
device_busy_delay: float = 0.0,
post_processing_delay: Optional[float] = None,
slave_address: Optional[int] = None,
ignore_errors: bool = False) -> Optional[Tuple[Any, ...]]:
return self._i2_channel.write_read(tx_bytes, payload_offset, response, device_busy_delay,
post_processing_delay, slave_address, ignore_errors)

def strip_protocol(self, data) -> None:
""""""
self._i2_channel.strip_protocol(data)

def timeout(self) -> float:
return self._i2_channel.timeout
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,124 @@
# (c) Copyright 2023 Sensirion AG, Switzerland

import time
from typing import Optional, Tuple
from datetime import datetime, timezone
from functools import partial
from typing import Optional, Tuple, Iterator, List

from sensirion_i2c_driver import I2cConnection
from sensirion_i2c_driver.errors import I2cChecksumError
from sensirion_shdlc_driver import ShdlcSerialPort, ShdlcConnection
from sensirion_shdlc_sensorbridge import (SensorBridgePort,
SensorBridgeShdlcDevice,
SensorBridgeI2cProxy)
from sensirion_shdlc_sensorbridge.device import ReadBufferResponse, RepeatedTransceiveHandle

from sensirion_driver_adapters.channel import TxRxChannel
from sensirion_driver_adapters.channel import TxRxChannel, MeasurementStream, Measurement
from sensirion_driver_adapters.channel_provider import I2cChannelProvider
from sensirion_driver_adapters.i2c_adapter.i2c_channel import I2cChannel
from sensirion_driver_adapters.i2c_adapter.i2c_channel import I2cChannel, RxData
from sensirion_driver_adapters.i2c_adapter.i2c_streaming_channel import I2cStreamingChannel


class SensorBridgeMeasurementStream(MeasurementStream):
def __init__(self, sensor_bridge: SensorBridgeShdlcDevice,
sensor_bridge_port: SensorBridgePort,
channel: I2cChannel,
tx_bytes: bytes,
payload_offset: int,
response: RxData,
measurement_interval_us: int,
buffered_samples: int = 100,
sensor_busy_delay_us: int = 10):
self._tx_bytes = tx_bytes
self._payload_offset = payload_offset
self._response_descriptor = response
self._measurement_interval_us = measurement_interval_us
self._buffered_samples = buffered_samples
self._sensor_busy_delay_us = sensor_busy_delay_us
self._channel = channel
self._i2c_address = channel._slave_address # noqa: protected-access
self._sensor_bridge_port = sensor_bridge_port
self._sensor_bridge = sensor_bridge
self._rx_length = response.rx_length * 3 // 2
self._handle: Optional[RepeatedTransceiveHandle] = None
self._start_time = datetime.now(tz=timezone.utc).timestamp()
self._stream_iterator = None

def open(self) -> MeasurementStream:
self._handle: RepeatedTransceiveHandle = self._sensor_bridge.start_repeated_i2c_transceive(
self._sensor_bridge_port,
self._measurement_interval_us,
self._i2c_address,
self._tx_bytes,
self._rx_length,
timeout_us=1000,
read_delay_us=self._sensor_busy_delay_us)
self._start_time = datetime.now(tz=timezone.utc).timestamp()
time.sleep(self._buffered_samples * self._measurement_interval_us / 1000000)
return self

def close(self) -> None:
print("closing stream")
if self._handle is not None:
self._sensor_bridge.stop_repeated_i2c_transceive(self._handle)
self._handle = None
self._stream_iterator = None

def __enter__(self) -> MeasurementStream:
return self.open()

def __exit__(self, exc_type, exc_val, exc_tb):
try:
self.close()
self._stream_iterator = None
return False
except Exception: # noqa
return True

def __iter__(self) -> Iterator[Measurement]:
self._stream_iterator = self.get_next_measurement()
return self

def __next__(self) -> Measurement:
assert self._stream_iterator is not None, "Illegal state: Stream iterator is None!"
return next(self._stream_iterator)

def get_next_measurement(self) -> Iterator[Measurement]:
def handle_buffer_response(response: ReadBufferResponse) -> List[Optional[bytes]]:
lost_packets = response.lost_bytes % self._rx_length
measurement_data = [None for _ in range(lost_packets)]
measurement_data += [None if d.error else d.data for d in response.values]
return measurement_data

interval_s = self._measurement_interval_us / 1_000_000
while self._handle is not None:
buffer_response: ReadBufferResponse = self._sensor_bridge.read_buffer(self._handle)
receive_time = datetime.now(tz=timezone.utc).timestamp()
measurements = handle_buffer_response(buffer_response)
computed_interval_s = interval_s
if len(measurements) > 0:
computed_interval_s = (receive_time - self._start_time) / len(measurements)
effective_measurement_interval = min(interval_s,
computed_interval_s) # shrink but to not extend the interval
start_time = self._start_time
for data in measurements:
unpacked_data = None
if data is not None:
try:
unpacked_data = self._channel.strip_protocol(data)
except I2cChecksumError: # errors are reported the same way as lost packets
...
yield Measurement(stream_id=self._handle.raw_handle,
timestamp=start_time,
data=None if data is None else self._response_descriptor.unpack(unpacked_data))
start_time += effective_measurement_interval
new_start_time = datetime.now(tz=timezone.utc).timestamp()
start_time = min(start_time, new_start_time) # measurements must not be in the future
next_receive_time = receive_time + effective_measurement_interval * self._buffered_samples
sleep_time = max(0.0, (next_receive_time - new_start_time))
if sleep_time > 1.0:
time.sleep(sleep_time)
self._start_time = start_time


class SensorBridgeI2cChannelProvider(I2cChannelProvider):
Expand Down Expand Up @@ -40,6 +147,7 @@ def __init__(self, sensor_bridge_port: SensorBridgePort,
self._shdlc_port: Optional[ShdlcSerialPort] = None
self._sensor_bridge: Optional[SensorBridgeShdlcDevice] = None
self._i2c_transceiver: Optional[SensorBridgeI2cProxy] = None
self._streams: List[SensorBridgeMeasurementStream] = []

def release_channel_resources(self):
"""
Expand All @@ -49,6 +157,9 @@ def release_channel_resources(self):
"""
if self._sensor_bridge is None:
return
for stream in self._streams:
stream.close()
self._streams.clear()
assert self._sensor_bridge_port is not None, "Illegal state: SensorBridgePort is None!"
assert self._shdlc_port is not None, "Illegal state: Shdlc port is None!"
self._sensor_bridge.switch_supply_off(self._sensor_bridge_port)
Expand Down Expand Up @@ -80,6 +191,31 @@ def get_channel(self, slave_address: int,
The crc calculator that can compute the crc checksum of the byte stream
"""

return I2cChannel(I2cConnection(self._i2c_transceiver),
slave_address=slave_address,
crc=self.try_create_crc_calculator(crc_parameters))
i2c_channel = I2cChannel(I2cConnection(self._i2c_transceiver),
slave_address=slave_address,
crc=self.try_create_crc_calculator(crc_parameters))
channel = I2cStreamingChannel(
partial(self._create_measurement_stream, i2c_channel),
i2c_channel=i2c_channel)
return channel

def _create_measurement_stream(self, i2c_channel: I2cChannel,
tx_bytes: bytes,
payload_offset: int,
response: RxData,
measurement_interval_us: int,
buffered_samples: int = 100,
sensor_busy_delay_us: int = 10,
_: bool = False
) -> MeasurementStream:
stream = SensorBridgeMeasurementStream(self._sensor_bridge,
self._sensor_bridge_port,
i2c_channel,
tx_bytes,
payload_offset,
response,
measurement_interval_us,
buffered_samples,
sensor_busy_delay_us)
self._streams.append(stream)
return stream
Loading