-
Notifications
You must be signed in to change notification settings - Fork 17
Add typing to worker module #38
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,20 +5,25 @@ | |
| import json | ||
| import os | ||
| import sys | ||
| from typing import Sequence, Any, NoReturn | ||
|
|
||
| DEFAULT_SECRET_PHRASES = ("key", "password", "secret") | ||
| DEFAULT_SECRET_PHRASES: Sequence[str] = ("key", "password", "secret") | ||
|
|
||
|
|
||
| class Worker: | ||
| READ_TIMEOUT = 3 # seconds | ||
|
|
||
| def __init__(self, job_directory, secret_phrases): | ||
| def __init__( | ||
| self, | ||
| job_directory: str | None, | ||
| secret_phrases: Sequence[str] | None = None, | ||
| ) -> None: | ||
| if job_directory is None: | ||
| if len(sys.argv) > 1: | ||
| job_directory = sys.argv[1] | ||
| else: | ||
| job_directory = "/job" | ||
| self.job_directory = job_directory | ||
| self.job_directory: str | None = job_directory | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. May look unnecessary but without this explicit annotation it clashes with L40 |
||
| if secret_phrases is None: | ||
| self.secret_phrases = DEFAULT_SECRET_PHRASES | ||
| else: | ||
|
|
@@ -57,20 +62,20 @@ def __init__(self, job_directory, secret_phrases): | |
| self.__set_proxies() | ||
|
|
||
| # Finally run check tlp | ||
| if not (self.__check_tlp()): | ||
| if not self.__check_tlp(): | ||
| self.error("TLP is higher than allowed.") | ||
|
|
||
| if not (self.__check_pap()): | ||
| if not self.__check_pap(): | ||
| self.error("PAP is higher than allowed.") | ||
|
|
||
| def __set_proxies(self): | ||
| def __set_proxies(self) -> None: | ||
| if self.http_proxy is not None: | ||
| os.environ["http_proxy"] = self.http_proxy | ||
| if self.https_proxy is not None: | ||
| os.environ["https_proxy"] = self.https_proxy | ||
|
|
||
| @staticmethod | ||
| def __set_encoding(): | ||
| def __set_encoding() -> None: | ||
| try: | ||
| if sys.stdout.encoding != "UTF-8": | ||
| sys.stdout = codecs.getwriter("utf-8")(sys.stdout.buffer, "strict") | ||
|
|
@@ -79,7 +84,13 @@ def __set_encoding(): | |
| except Exception: | ||
| pass # nosec B110 | ||
|
|
||
| def __get_param(self, source, name, default=None, message=None): | ||
| def __get_param( | ||
| self, | ||
| source: dict, | ||
| name: str | list[str], | ||
| default: Any = None, | ||
| message: str | None = None, | ||
| ) -> Any: | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Without validation I'm not convinced it is worth it to narrow the return type beyond Other option is to break compatibility and add a function with a |
||
| """Extract a specific parameter from given source. | ||
| :param source: Python dict to search through | ||
| :param name: Name of the parameter to get. JSON-like syntax, | ||
|
|
@@ -104,17 +115,17 @@ def __get_param(self, source, name, default=None, message=None): | |
| self.error(message) | ||
| return default | ||
|
|
||
| def __check_tlp(self): | ||
| def __check_tlp(self) -> bool: | ||
| """Check if tlp is okay or not; returns False if too high.""" | ||
|
|
||
| return not (self.enable_check_tlp and self.tlp > self.max_tlp) | ||
|
|
||
| def __check_pap(self): | ||
| def __check_pap(self) -> bool: | ||
| """Check if pap is okay or not; returns False if too high.""" | ||
|
|
||
| return not (self.enable_check_pap and self.pap > self.max_pap) | ||
|
|
||
| def __write_output(self, data, ensure_ascii=False): | ||
| def __write_output(self, data: dict, ensure_ascii: bool = False) -> None: | ||
| if self.job_directory is None: | ||
| json.dump(data, sys.stdout, ensure_ascii=ensure_ascii) | ||
| else: | ||
|
|
@@ -124,31 +135,35 @@ def __write_output(self, data, ensure_ascii=False): | |
| with open(output_path, mode="w") as f_output: | ||
| json.dump(data, f_output, ensure_ascii=ensure_ascii) | ||
|
|
||
| def get_data(self): | ||
| def get_data(self) -> Any: | ||
| """Wrapper for getting data from input dict. | ||
|
|
||
| :return: Data (observable value) given through Cortex""" | ||
| return self.get_param("data", None, "Missing data field") | ||
|
|
||
| @staticmethod | ||
| def build_operation(op_type, **parameters): | ||
| def build_operation(op_type: str, **parameters: dict) -> dict: | ||
| """ | ||
| :param op_type: an operation type as a string | ||
| :param parameters: a dict including the operation's params | ||
| :return: dict | ||
| """ | ||
| operation = {"type": op_type} | ||
| operation: dict = {"type": op_type} | ||
| operation.update(parameters) | ||
|
|
||
| return operation | ||
|
|
||
| def operations(self, raw): | ||
| def operations(self, raw: dict) -> list[dict]: | ||
| """Returns the list of operations to be executed after the job completes | ||
|
|
||
| :returns: by default return an empty array""" | ||
| return [] | ||
|
|
||
| def get_param(self, name, default=None, message=None): | ||
| def get_param( | ||
| self, | ||
| name: str, | ||
| default: Any = None, | ||
| message: str | None = None, | ||
| ) -> Any: | ||
| """Just a wrapper for Analyzer.__get_param. | ||
| :param name: Name of the parameter to get. | ||
| JSON-like syntax, e.g. `config.username` | ||
|
|
@@ -159,7 +174,12 @@ def get_param(self, name, default=None, message=None): | |
|
|
||
| return self.__get_param(self._input, name, default, message) | ||
|
|
||
| def get_env(self, key, default=None, message=None): | ||
| def get_env( | ||
| self, | ||
| key: str, | ||
| default: Any = None, | ||
| message: str | None = None, | ||
| ) -> Any: | ||
| """Wrapper for getting configuration values from the environment. | ||
| :param key: Key of the environment variable to get. | ||
| :param default: Default value, if not found. Default: None | ||
|
|
@@ -174,7 +194,7 @@ def get_env(self, key, default=None, message=None): | |
| self.error(message) | ||
| return default | ||
|
|
||
| def error(self, message, ensure_ascii=False): | ||
| def error(self, message: str, ensure_ascii: bool = False) -> NoReturn: | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As the issue states, I think here is 80% of the benefit of typing this class |
||
| """Stop analyzer with an error message. | ||
|
|
||
| Changing ensure_ascii can be helpful when stuck with ascii <-> utf-8 issues. | ||
|
|
@@ -203,25 +223,25 @@ def error(self, message, ensure_ascii=False): | |
| # Force exit after error | ||
| sys.exit(1) | ||
|
|
||
| def summary(self, raw): | ||
| def summary(self, raw: dict) -> dict: | ||
| """Returns a summary, needed for 'short.html' template. | ||
|
|
||
| Overwrite it for your needs! | ||
|
|
||
| :returns: by default return an empty dict""" | ||
| return {} | ||
|
|
||
| def artifacts(self, raw): | ||
| def artifacts(self, raw: dict) -> list[dict]: | ||
| return [] | ||
|
|
||
| def report(self, output, ensure_ascii=False): | ||
| def report(self, output: dict, ensure_ascii: bool = False) -> None: | ||
| """Returns a json dict via stdout. | ||
|
|
||
| :param output: worker output. | ||
| :param ensure_ascii: Force ascii output. Default: False""" | ||
|
|
||
| self.__write_output(output, ensure_ascii=ensure_ascii) | ||
|
|
||
| def run(self): | ||
| def run(self) -> None: | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Return |
||
| """Overwritten by analyzers""" | ||
| pass | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks unused. Again, chance to break compatibility