|
| 1 | +import getpass |
1 | 2 | import warnings |
2 | 3 | from datetime import datetime |
3 | | -from typing import Optional |
| 4 | +from typing import TYPE_CHECKING, Literal, Optional |
4 | 5 |
|
5 | 6 | import dateutil.parser |
6 | 7 | import requests |
7 | 8 |
|
8 | 9 | from marble_client.exceptions import ServiceNotAvailableError |
9 | 10 | from marble_client.services import MarbleService |
| 11 | +from marble_client.utils import check_rich_output_shell |
| 12 | + |
| 13 | +if TYPE_CHECKING: |
| 14 | + from marble_client.client import MarbleClient |
10 | 15 |
|
11 | 16 | __all__ = ["MarbleNode"] |
12 | 17 |
|
13 | 18 |
|
14 | 19 | class MarbleNode: |
15 | 20 | """A node in the Marble network.""" |
16 | 21 |
|
17 | | - def __init__(self, nodeid: str, jsondata: dict[str]) -> None: |
| 22 | + def __init__(self, nodeid: str, jsondata: dict[str], client: "MarbleClient") -> None: |
18 | 23 | self._nodedata = jsondata |
19 | 24 | self._id = nodeid |
20 | 25 | self._name = jsondata["name"] |
| 26 | + self._client = client |
21 | 27 |
|
22 | 28 | self._links_service = None |
23 | 29 | self._links_collection = None |
@@ -159,3 +165,102 @@ def __contains__(self, service: str) -> bool: |
159 | 165 | def __repr__(self) -> str: |
160 | 166 | """Return a repr containing id and name.""" |
161 | 167 | return f"<{self.__class__.__name__}(id: '{self.id}', name: '{self.name}')>" |
| 168 | + |
| 169 | + def _login(self, session: requests.Session, user_name: str | None, password: str | None) -> None: |
| 170 | + if user_name is None or not user_name.strip(): |
| 171 | + raise RuntimeError("Username or email is required") |
| 172 | + if password is None or not password.strip(): |
| 173 | + raise RuntimeError("Password is required") |
| 174 | + response = session.post( |
| 175 | + self.url.rstrip("/") + "/magpie/signin", |
| 176 | + json={"user_name": user_name, "password": password}, |
| 177 | + ) |
| 178 | + if response.ok: |
| 179 | + return response.json().get("detail", "Success") |
| 180 | + try: |
| 181 | + raise RuntimeError(response.json().get("detail", "Unable to log in")) |
| 182 | + except requests.exceptions.JSONDecodeError as e: |
| 183 | + raise RuntimeError("Unable to log in") from e |
| 184 | + |
| 185 | + def _widget_login(self, session: requests.Session) -> tuple[str, str]: |
| 186 | + import ipywidgets # type: ignore |
| 187 | + from IPython.display import display # type: ignore |
| 188 | + |
| 189 | + font_family = "Helvetica Neue" |
| 190 | + font_size = "16px" |
| 191 | + primary_colour = "#304FFE" |
| 192 | + label_style = {"font_family": font_family, "font_size": font_size, "text_color": primary_colour} |
| 193 | + input_style = {"description_width": "initial"} |
| 194 | + button_style = { |
| 195 | + "font_family": font_family, |
| 196 | + "font_size": font_size, |
| 197 | + "button_color": primary_colour, |
| 198 | + "text_color": "white", |
| 199 | + } |
| 200 | + credentials = {} |
| 201 | + |
| 202 | + username_label = ipywidgets.Label(value="Username or email", style=label_style) |
| 203 | + username_input = ipywidgets.Text(style=input_style) |
| 204 | + password_label = ipywidgets.Label(value="Password", style=label_style) |
| 205 | + password_input = ipywidgets.Password(style=input_style) |
| 206 | + login_button = ipywidgets.Button(description="Login", tooltip="Login", style=button_style) |
| 207 | + output = ipywidgets.Output() |
| 208 | + widgets = ipywidgets.VBox( |
| 209 | + [username_label, username_input, password_label, password_input, login_button, output] |
| 210 | + ) |
| 211 | + |
| 212 | + def _on_username_change(change: dict) -> None: |
| 213 | + try: |
| 214 | + credentials["user_name"] = change["new"] |
| 215 | + except KeyError as e: |
| 216 | + raise Exception(str(e), change) |
| 217 | + |
| 218 | + username_input.observe(_on_username_change, names="value") |
| 219 | + |
| 220 | + def _on_password_change(change: dict) -> None: |
| 221 | + credentials["password"] = change["new"] |
| 222 | + |
| 223 | + password_input.observe(_on_password_change, names="value") |
| 224 | + |
| 225 | + def _on_login_click(*_) -> None: |
| 226 | + output.clear_output() |
| 227 | + with output: |
| 228 | + try: |
| 229 | + message = self._login(session, credentials.get("user_name"), credentials.get("password")) |
| 230 | + except RuntimeError as e: |
| 231 | + display(ipywidgets.Label(value=str(e), style={**label_style, "text_color": "red"})) |
| 232 | + else: |
| 233 | + display(ipywidgets.Label(value=message, style={**label_style, "text_color": "green"})) |
| 234 | + |
| 235 | + login_button.on_click(_on_login_click) |
| 236 | + display(widgets) |
| 237 | + |
| 238 | + def _stdin_login(self, session: requests.Session) -> tuple[str, str]: |
| 239 | + message = self._login(session, input("Username or email: "), getpass.getpass("Password: ")) |
| 240 | + print(message) |
| 241 | + |
| 242 | + def login( |
| 243 | + self, session: requests.Session | None = None, input_type: Literal["stdin", "widget"] | None = None |
| 244 | + ) -> requests.Session: |
| 245 | + """ |
| 246 | + Return a requests session containing login cookies for this node. |
| 247 | +
|
| 248 | + This will get user name and password using user input using jupyter widgets |
| 249 | + if available. Otherwise it will prompt the user to input details from stdin. |
| 250 | +
|
| 251 | + If you want to force the function to use either stdin or widgets specify "stdin" |
| 252 | + or "widget" as the input type. Otherwise, this function will make its best guess |
| 253 | + which one to use. |
| 254 | + """ |
| 255 | + if session is None: |
| 256 | + session = requests.Session() |
| 257 | + if input_type is None: |
| 258 | + input_type = "widget" if check_rich_output_shell() else "stdin" |
| 259 | + if input_type == "widget": |
| 260 | + self._widget_login(session) |
| 261 | + elif input_type == "stdin": |
| 262 | + self._stdin_login(session) |
| 263 | + else: |
| 264 | + raise TypeError("input_type must be one of 'stdin', 'widget' or None.") |
| 265 | + |
| 266 | + return session |
0 commit comments