-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpart_2.py
More file actions
64 lines (47 loc) · 2.04 KB
/
part_2.py
File metadata and controls
64 lines (47 loc) · 2.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from cactus_sdk import INCOMING_EVENTS, CactusClient, CactusHandler
"""
Instructions:
1. Replace the `event_handler` with one built into `MyHandler`. It should behave exactly like the code from part 1. Consider starting by moving your existing code into the class's `on_other` method.
2. Ensure there are no runtime errors.
3. Remove the `event_handler` function; it's no longer needed.
"""
# Module-level SDK client; event_handler uses it to parse raw event bodies
# and to retrieve the related orders and movies.
client = CactusClient()
class MyHandler(CactusHandler):
    """Placeholder subclass of CactusHandler that adds no behavior yet.

    Per the instructions in this file, the logic currently living in
    `event_handler` is meant to move into this class (for example by
    starting with its `on_other` method).
    """

    # TODO: implement me!
# Handler instance; the __main__ loop calls its .handle() for each event body.
handler = MyHandler()
def event_handler(body: str):
    """Parse a raw event body and print a one-line summary for its type.

    The body is parsed into a thin event with the module-level ``client``.
    Depending on the event type, the details printed come from retrieving
    the related object, from pulling the full event, or from both.

    Args:
        body: The raw event payload to parse.

    Raises:
        ValueError: If the parsed event's type is not one handled here.
    """
    incoming = client.parse_event_v2(body)
    kind = incoming.type

    if kind == "order.shipped":
        # The local must be called `order`: the `=` specifier in the f-string
        # echoes the expression text into the printed output.
        order = client.retrieve_order(incoming.related_object.id)
        print(f" Created a database record for {order.id} w/ {order.num_items=}")
        return

    if kind == "order.delivery_attempted":
        full = incoming.pull()
        message = f" Order {full.related_object.id} has been delivered after {full.data.attempt_num} attempt(s)!"
        print(message)
        return

    if kind == "order.lost":
        full = incoming.pull()
        message = f" An order was last seen in {full.data.last_seen_city}... we have no additional information"
        print(message)
        return

    if kind == "movie.started":
        film = client.retrieve_movie(incoming.related_object.id)
        print(f" Someone started watching {film.title}")
        return

    if kind == "movie.completed":
        # Pull the full event first, then fetch the related movie.
        full = incoming.pull()
        film = incoming.fetch_related_object()
        message = f" User {full.data.user} just finished {film.title} ({film.release_year}) and rated it {full.data.rating} stars."
        print(message)
        return

    raise ValueError(f'unhandled event with type "{kind}"')
if __name__ == "__main__":
    # Run every sample payload through the handler. On any failure, echo the
    # offending payload and then let the original exception propagate.
    for position, payload in enumerate(INCOMING_EVENTS):
        print(f"\n== parsing event {position}")
        try:
            handler.handle(payload)
        except BaseException:
            print(" failed to handle:", payload, "\n")
            raise