Skip to content

Commit df67391

Browse files
committed
chore: Add better tests for serial data.
Try different combinations of serial data and SMP data. Test error handling.
1 parent c555894 commit df67391

File tree

1 file changed

+167
-35
lines changed

1 file changed

+167
-35
lines changed

tests/test_smp_serial_transport.py

Lines changed: 167 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from smp import packet as smppacket
1111

1212
from smpclient.requests.os_management import EchoWrite
13+
from smpclient.transport import SMPTransportDisconnected
1314
from smpclient.transport.serial import SMPSerialTransport
1415

1516

@@ -129,57 +130,188 @@ async def test_read_one_smp_packet() -> None:
129130
for p in packets:
130131
assert p == await t._read_one_smp_packet()
131132

133+
await t.disconnect()
134+
132135

133136
@pytest.mark.asyncio
134-
async def test_read_one_smp_packet_with_smp_server_logging(
135-
caplog: pytest.LogCaptureFixture,
136-
) -> None:
137+
async def test_send_and_receive() -> None:
138+
t = SMPSerialTransport()
139+
t.send = AsyncMock() # type: ignore
140+
t.receive = AsyncMock() # type: ignore
141+
142+
await t.send_and_receive(b"some data")
143+
144+
t.send.assert_awaited_once_with(b"some data")
145+
t.receive.assert_awaited_once_with()
146+
147+
148+
@pytest.mark.asyncio
149+
async def test_only_serial_data_no_smp(caplog: pytest.LogCaptureFixture) -> None:
150+
t = SMPSerialTransport()
151+
await t.connect("/dev/ttyACM0", timeout_s=1.0)
152+
153+
t._conn.read_all = MagicMock( # type: ignore
154+
side_effect=[
155+
b"First line\n",
156+
b"Second line\nThird line\n",
157+
b"Partial line",
158+
b" continues here\n",
159+
b"No newline at end",
160+
]
161+
)
162+
163+
with caplog.at_level(logging.WARNING):
164+
await asyncio.sleep(0.05) # Flush mock serial.
165+
messages = {r.message for r in caplog.records}
166+
assert "/dev/ttyACM0: First line" in messages
167+
assert "/dev/ttyACM0: Second line" in messages
168+
assert "/dev/ttyACM0: Third line" in messages
169+
assert "/dev/ttyACM0: Partial line continues here" in messages
170+
assert "/dev/ttyACM0: No newline at end" not in messages # NOT!
171+
172+
await t.disconnect()
173+
messages = {r.message for r in caplog.records}
174+
assert "/dev/ttyACM0: No newline at end" in messages
175+
176+
177+
@pytest.mark.asyncio
178+
async def test_only_smp_data_no_serial() -> None:
137179
t = SMPSerialTransport()
138180
await t.connect("/dev/ttyUSB0", timeout_s=1.0)
139181

140-
m1 = EchoWrite._Response.get_default()(sequence=0, r="Hello pytest!") # type: ignore
141-
m2 = EchoWrite._Response.get_default()(sequence=1, r="Hello computer!") # type: ignore
142-
p1 = [p for p in smppacket.encode(m1.BYTES, 8)]
143-
p2 = [p for p in smppacket.encode(m2.BYTES, 8)]
144-
packets = p1 + p2
182+
m1 = EchoWrite._Response.get_default()(sequence=0, r="SMP Message 1") # type: ignore
183+
m2 = EchoWrite._Response.get_default()(sequence=1, r="SMP Message 2") # type: ignore
184+
m3 = EchoWrite._Response.get_default()(sequence=2, r="SMP Message 3") # type: ignore
185+
186+
packets = (
187+
list(smppacket.encode(m1.BYTES, 512))
188+
+ list(smppacket.encode(m2.BYTES, 512))
189+
+ list(smppacket.encode(m3.BYTES, 512))
190+
)
191+
192+
t._conn.read_all = MagicMock(side_effect=packets) # type: ignore
193+
194+
for expected_msg in [m1.BYTES, m2.BYTES, m3.BYTES]:
195+
received = await t.receive()
196+
assert received == expected_msg
197+
198+
await t.disconnect()
199+
200+
201+
@pytest.mark.asyncio
202+
async def test_serial_and_smp(caplog: pytest.LogCaptureFixture) -> None:
203+
t = SMPSerialTransport()
204+
await t.connect("/dev/ttyUSB0", timeout_s=1.0)
205+
206+
m1 = EchoWrite._Response.get_default()(sequence=0, r="SMP1") # type: ignore
207+
m2 = EchoWrite._Response.get_default()(sequence=1, r="SMP2") # type: ignore
208+
209+
p1 = next(smppacket.encode(m1.BYTES, 512))
210+
p2 = next(smppacket.encode(m2.BYTES, 512))
145211

146212
t._conn.read_all = MagicMock( # type: ignore
147-
side_effect=(
148-
[b"Hi, there!"]
149-
+ [b"newline\n"]
150-
+ [b"Another line\nAgain\n"]
151-
+ [b"log with no newline,"]
152-
+ p1
153-
+ [b" ends here\n"]
154-
+ [bytes([0, 1, 2, 3])]
155-
+ [b"Bye!\n"]
156-
+ p2
157-
+ [b"One more thing...\n"]
158-
+ [b"We \n could \n use \n newlines\n"]
159-
)
213+
side_effect=[
214+
b"Start\n" + p1[:10],
215+
p1[10:] + b"Mid1\n",
216+
b"Mid2\n" + p2,
217+
b"End\n",
218+
]
160219
)
161220

162221
with caplog.at_level(logging.WARNING):
163-
for p in packets:
164-
assert p == await t._read_one_smp_packet()
222+
received1 = await t.receive()
223+
assert received1 == m1.BYTES
165224

166-
await t.disconnect() # Flush remaining serial data
225+
received2 = await t.receive()
226+
assert received2 == m2.BYTES
167227

228+
await asyncio.sleep(0.05) # Flush mock serial.
168229
messages = {r.message for r in caplog.records}
169-
assert "/dev/ttyUSB0: Hi, there!newline" in messages
170-
assert "/dev/ttyUSB0: Another line" in messages
171-
assert "/dev/ttyUSB0: Again" in messages
172-
assert "/dev/ttyUSB0: log with no newline, ends here" in messages
173-
assert "/dev/ttyUSB0: \x00\x01\x02\x03Bye!" in messages
230+
assert "/dev/ttyUSB0: Start" in messages
231+
assert "/dev/ttyUSB0: Mid1" in messages
232+
assert "/dev/ttyUSB0: Mid2" in messages
233+
assert "/dev/ttyUSB0: End" in messages
234+
235+
await t.disconnect()
174236

175237

176238
@pytest.mark.asyncio
177-
async def test_send_and_receive() -> None:
239+
async def test_disconnect_flushes_partial_smp_as_serial(
240+
caplog: pytest.LogCaptureFixture,
241+
) -> None:
178242
t = SMPSerialTransport()
179-
t.send = AsyncMock() # type: ignore
180-
t.receive = AsyncMock() # type: ignore
243+
await t.connect("/dev/ttyACM0", timeout_s=1.0)
181244

182-
await t.send_and_receive(b"some data")
245+
partial_smp = smppacket.START_DELIMITER + b"AAAA"
183246

184-
t.send.assert_awaited_once_with(b"some data")
185-
t.receive.assert_awaited_once_with()
247+
t._conn.read_all = MagicMock( # type: ignore
248+
side_effect=[
249+
b"Normal serial\n",
250+
partial_smp,
251+
]
252+
)
253+
254+
with caplog.at_level(logging.WARNING):
255+
await asyncio.sleep(0.05) # Flush mock serial.
256+
await t.disconnect()
257+
messages = {r.message for r in caplog.records}
258+
assert "/dev/ttyACM0: Normal serial" in messages
259+
assert any(
260+
b"AAAA" in r.message.encode("utf-8", errors="ignore")
261+
for r in caplog.records
262+
)
263+
264+
265+
@pytest.mark.asyncio
266+
async def test_not_connected_exception_handling() -> None:
267+
t = SMPSerialTransport()
268+
269+
with pytest.raises(SMPTransportDisconnected):
270+
await t.receive()
271+
272+
273+
@pytest.mark.asyncio
274+
async def test_serial_callback_exception_handling(
275+
caplog: pytest.LogCaptureFixture,
276+
) -> None:
277+
async def bad_callback(data: bytes) -> None:
278+
raise ValueError("Callback error")
279+
280+
t = SMPSerialTransport(serial_line_callback=bad_callback)
281+
await t.connect("/dev/ttyACM0", timeout_s=1.0)
282+
283+
t._conn.read_all = MagicMock( # type: ignore
284+
side_effect=[b"Line 1\n", b"Line 2\n"]
285+
)
286+
287+
with caplog.at_level(logging.ERROR):
288+
await asyncio.sleep(0.05) # Flush mock serial.
289+
assert any(
290+
"Serial data callback raised an exception" in r.message
291+
for r in caplog.records
292+
)
293+
294+
await t.disconnect()
295+
296+
297+
@pytest.mark.asyncio
298+
async def test_zero_length_serial_lines(caplog: pytest.LogCaptureFixture) -> None:
299+
t = SMPSerialTransport()
300+
await t.connect("/dev/ttyUSB0", timeout_s=1.0)
301+
302+
t._conn.read_all = MagicMock( # type: ignore
303+
side_effect=[
304+
b"\n",
305+
b"\n\n\n",
306+
b"Text\n\n",
307+
b"\nText\n",
308+
b"",
309+
]
310+
)
311+
312+
with caplog.at_level(logging.WARNING):
313+
await asyncio.sleep(0.05) # Flush mock serial.
314+
empty_messages = [r for r in caplog.records if r.message == "/dev/ttyUSB0: "]
315+
assert len(empty_messages) >= 4
316+
317+
await t.disconnect()

0 commit comments

Comments
 (0)