-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathtest_control_system.py
More file actions
191 lines (141 loc) · 5.55 KB
/
test_control_system.py
File metadata and controls
191 lines (141 loc) · 5.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
import asyncio
from dataclasses import dataclass
import pytest
from fastcs.attributes import AttributeIO, AttributeIORef, AttrR, AttrRW
from fastcs.control_system import FastCS, build_controller_api
from fastcs.controllers import Controller
from fastcs.datatypes import Int
from fastcs.methods import Command, command, scan
from fastcs.util import ONCE
@pytest.mark.asyncio
async def test_scan_tasks(controller):
    """Check that the controller's counter keeps advancing while FastCS serves.

    ``controller`` is a fixture supplied from outside this file; the test only
    relies on it exposing ``count`` and ``counter.period``.
    """
    # Inside a coroutine the running loop is the one we want; get_running_loop()
    # is the recommended way to obtain it.
    loop = asyncio.get_running_loop()
    transport_options = []
    fastcs = FastCS(controller, transport_options, loop)

    # Hold a strong reference: the event loop only keeps weak references to
    # tasks, so an unreferenced task may be garbage collected mid-execution.
    serve_task = asyncio.create_task(fastcs.serve(interactive=False))
    try:
        # Give serve() time to start up before sampling the counter.
        await asyncio.sleep(0.1)

        for _ in range(3):
            count = controller.count
            # Wait slightly longer than one period so one more update is due.
            await asyncio.sleep(controller.counter.period + 0.01)
            assert controller.count > count
    finally:
        # Do not leave the server running in the background after the test.
        serve_task.cancel()
def test_controller_api():
    """Check what build_controller_api reports for a small controller.

    The controller declares one class-level attribute, one attribute created in
    ``__init__``, one ``@command`` method and one ``@scan`` method; the built
    API is expected to list each of them by name and carry the description.
    """

    class MyTestController(Controller):
        # Declared on the class body.
        attr1: AttrRW[int] = AttrRW(Int())

        def __init__(self):
            super().__init__(description="Controller for testing")
            # Created per instance, after the base initialiser has run.
            self.attr2 = AttrRW(Int())

        @command()
        async def do_nothing(self):
            pass

        @scan(1.0)
        async def scan_nothing(self):
            pass

    instance = MyTestController()
    built_api = build_controller_api(instance)

    assert built_api.description == instance.description

    attribute_names = [*built_api.attributes]
    assert attribute_names == ["attr1", "attr2"]

    command_names = [*built_api.command_methods]
    assert command_names == ["do_nothing"]

    scan_names = [*built_api.scan_methods]
    assert scan_names == ["scan_nothing"]
@pytest.mark.asyncio
async def test_controller_api_methods():
    """Check that static and dynamically added commands are both callable.

    ``do_nothing_static`` is declared with ``@command`` on the class, while
    ``do_nothing_dynamic`` is only created inside ``initialise()``, which this
    test never calls directly. Both must be awaitable on the controller and
    through ``fastcs.controller_api.command_methods``.
    """

    class MyTestController(Controller):
        def __init__(self):
            super().__init__()

        async def initialise(self):
            async def do_nothing_dynamic() -> None:
                pass

            # Wrap the local coroutine function and attach it at runtime.
            self.do_nothing_dynamic = Command(do_nothing_dynamic)

        @command()
        async def do_nothing_static(self):
            pass

    controller = MyTestController()
    # Inside a coroutine, get_running_loop() is the recommended accessor.
    loop = asyncio.get_running_loop()
    transport_options = []
    fastcs = FastCS(controller, transport_options, loop)

    # Hold a strong reference: the event loop only keeps weak references to
    # tasks, so an unreferenced task may be garbage collected mid-execution.
    serve_task = asyncio.create_task(fastcs.serve(interactive=False))
    try:
        # Give serve() time to start before touching the dynamic command.
        await asyncio.sleep(0.1)

        await controller.do_nothing_static()
        await controller.do_nothing_dynamic()

        await fastcs.controller_api.command_methods["do_nothing_static"]()
        await fastcs.controller_api.command_methods["do_nothing_dynamic"]()
    finally:
        # Do not leave the server running in the background after the test.
        serve_task.cancel()
@pytest.mark.asyncio
async def test_update_periods():
    """Check how often attributes update for each kind of ``update_period``.

    Three read attributes share one IO that counts its own invocations per
    attribute: one with ``ONCE``, one with a 0.1 s period and one with ``None``.
    After half a second of serving, the periodic one must have updated more
    than once, the ``ONCE`` one exactly once, and the ``None`` one never.
    """

    @dataclass
    class AttributeIORefTimesCalled(AttributeIORef):
        update_period: float | None = None
        # Unannotated on purpose: this is a plain class attribute, not a
        # dataclass field. The first ``+= 1`` creates a per-instance counter.
        _times_called = 0

    class AttributeIOTimesCalled(AttributeIO[int, AttributeIORefTimesCalled]):
        async def update(self, attr: AttrR[int, AttributeIORefTimesCalled]):
            # Publish the running call count as the attribute's value.
            attr.io_ref._times_called += 1
            await attr.update(attr.io_ref._times_called)

    class MyController(Controller):
        update_once = AttrR(Int(), io_ref=AttributeIORefTimesCalled(update_period=ONCE))
        update_quickly = AttrR(
            Int(), io_ref=AttributeIORefTimesCalled(update_period=0.1)
        )
        update_never = AttrR(
            Int(), io_ref=AttributeIORefTimesCalled(update_period=None)
        )

    controller = MyController(ios=[AttributeIOTimesCalled()])
    # Inside a coroutine, get_running_loop() is the recommended accessor.
    loop = asyncio.get_running_loop()
    transport_options = []
    fastcs = FastCS(controller, transport_options, loop)

    # Nothing has been updated before serving starts.
    assert controller.update_quickly.get() == 0
    assert controller.update_once.get() == 0
    assert controller.update_never.get() == 0

    # Hold a strong reference: the event loop only keeps weak references to
    # tasks, so an unreferenced task may be garbage collected mid-execution.
    serve_task = asyncio.create_task(fastcs.serve(interactive=False))
    try:
        # Long enough for several 0.1 s periods to elapse.
        await asyncio.sleep(0.5)

        assert controller.update_quickly.get() > 1
        assert controller.update_once.get() == 1
        assert controller.update_never.get() == 0

        assert len(fastcs._scan_tasks) == 1
        assert len(fastcs._initial_coros) == 1
    finally:
        # Do not leave the server running in the background after the test.
        serve_task.cancel()
@pytest.mark.asyncio
async def test_scan_raises_exception_via_callback():
    """Check that an exception raised by a scan method ends up on its task.

    The controller's only scan method always raises ``ValueError``. After
    serving for long enough for the scan to run, every entry in
    ``fastcs._scan_tasks`` must carry that exception.
    """

    class MyTestController(Controller):
        def __init__(self):
            super().__init__()

        @scan(0.1)
        async def raise_exception(self):
            raise ValueError("Scan Exception")

    controller = MyTestController()
    # Inside a coroutine, get_running_loop() is the recommended accessor.
    loop = asyncio.get_running_loop()
    transport_options = []
    fastcs = FastCS(controller, transport_options, loop)

    exception_info = {}
    # Remember the current handler so it can be put back afterwards.
    previous_handler = loop.get_exception_handler()
    # This will intercept the exception raised in _scan_done
    loop.set_exception_handler(
        lambda _loop, context: exception_info.update(
            {"exception": context.get("exception")}
        )
    )

    serve_task = asyncio.create_task(fastcs.serve(interactive=False))
    try:
        # This allows scan time to run
        await asyncio.sleep(0.2)

        # Guard against a vacuous pass: with no scan tasks the loop below
        # would assert nothing at all.
        assert fastcs._scan_tasks

        # Distinct name so the serve task handle is not shadowed.
        for scan_task in fastcs._scan_tasks:
            internal_exception = scan_task.exception()
            assert internal_exception
            # The task exception comes from scan method raise_exception
            assert isinstance(internal_exception, ValueError)
            assert "Scan Exception" == str(internal_exception)
    finally:
        # Stop the server and undo the loop-level handler override.
        serve_task.cancel()
        loop.set_exception_handler(previous_handler)
@pytest.mark.asyncio
async def test_controller_connect_disconnect():
    """Check that serving connects the controller and cancelling disconnects it.

    The controller records its state in a ``connected`` flag that ``connect``
    sets and ``disconnect`` clears.
    """

    class MyTestController(Controller):
        def __init__(self):
            super().__init__()
            self.connected = False

        async def connect(self):
            self.connected = True

        async def disconnect(self):
            self.connected = False

    device = MyTestController()
    event_loop = asyncio.get_event_loop()
    server = FastCS(device, [], event_loop)

    serving = asyncio.create_task(server.serve(interactive=False))

    # connect is called at the start of serve
    await asyncio.sleep(0.1)
    assert device.connected

    # disconnect is called at the end of serve
    serving.cancel()
    await asyncio.sleep(0.1)
    assert not device.connected