Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added cortexutils/py.typed
Empty file.
66 changes: 43 additions & 23 deletions cortexutils/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,25 @@
import json
import os
import sys
from typing import Sequence, Any, NoReturn

DEFAULT_SECRET_PHRASES = ("key", "password", "secret")
DEFAULT_SECRET_PHRASES: Sequence[str] = ("key", "password", "secret")


class Worker:
READ_TIMEOUT = 3 # seconds

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks unused. Again, chance to break compatibility


def __init__(self, job_directory, secret_phrases):
def __init__(
self,
job_directory: str | None,
secret_phrases: Sequence[str] | None = None,
) -> None:
if job_directory is None:
if len(sys.argv) > 1:
job_directory = sys.argv[1]
else:
job_directory = "/job"
self.job_directory = job_directory
self.job_directory: str | None = job_directory

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May look unnecessary but without this explicit annotation it clashes with L40 self.job_directory = None

if secret_phrases is None:
self.secret_phrases = DEFAULT_SECRET_PHRASES
else:
Expand Down Expand Up @@ -57,20 +62,20 @@ def __init__(self, job_directory, secret_phrases):
self.__set_proxies()

# Finally run check tlp
if not (self.__check_tlp()):
if not self.__check_tlp():
self.error("TLP is higher than allowed.")

if not (self.__check_pap()):
if not self.__check_pap():
self.error("PAP is higher than allowed.")

def __set_proxies(self):
def __set_proxies(self) -> None:
if self.http_proxy is not None:
os.environ["http_proxy"] = self.http_proxy
if self.https_proxy is not None:
os.environ["https_proxy"] = self.https_proxy

@staticmethod
def __set_encoding():
def __set_encoding() -> None:
try:
if sys.stdout.encoding != "UTF-8":
sys.stdout = codecs.getwriter("utf-8")(sys.stdout.buffer, "strict")
Expand All @@ -79,7 +84,13 @@ def __set_encoding():
except Exception:
pass # nosec B110

def __get_param(self, source, name, default=None, message=None):
def __get_param(
self,
source: dict,
name: str | list[str],
default: Any = None,
message: str | None = None,
) -> Any:

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without validation I'm not convinced it is worth it to narrow the return type beyond Any, but we could narrow it down to a JSON subset dict | list | str | int | float | bool | None (https://docs.python.org/3/library/json.html#json.JSONEncoder).

Other option is to break compatibility and add a function with a expected_type param that errors if an isinstance check fails, for example.

"""Extract a specific parameter from given source.
:param source: Python dict to search through
:param name: Name of the parameter to get. JSON-like syntax,
Expand All @@ -104,17 +115,17 @@ def __get_param(self, source, name, default=None, message=None):
self.error(message)
return default

def __check_tlp(self):
def __check_tlp(self) -> bool:
"""Check if tlp is okay or not; returns False if too high."""

return not (self.enable_check_tlp and self.tlp > self.max_tlp)

def __check_pap(self):
def __check_pap(self) -> bool:
"""Check if pap is okay or not; returns False if too high."""

return not (self.enable_check_pap and self.pap > self.max_pap)

def __write_output(self, data, ensure_ascii=False):
def __write_output(self, data: dict, ensure_ascii: bool = False) -> None:
if self.job_directory is None:
json.dump(data, sys.stdout, ensure_ascii=ensure_ascii)
else:
Expand All @@ -124,31 +135,35 @@ def __write_output(self, data, ensure_ascii=False):
with open(output_path, mode="w") as f_output:
json.dump(data, f_output, ensure_ascii=ensure_ascii)

def get_data(self):
def get_data(self) -> Any:
"""Wrapper for getting data from input dict.

:return: Data (observable value) given through Cortex"""
return self.get_param("data", None, "Missing data field")

@staticmethod
def build_operation(op_type, **parameters):
def build_operation(op_type: str, **parameters: dict) -> dict:
"""
:param op_type: an operation type as a string
:param parameters: a dict including the operation's params
:return: dict
"""
operation = {"type": op_type}
operation: dict = {"type": op_type}
operation.update(parameters)

return operation

def operations(self, raw):
def operations(self, raw: dict) -> list[dict]:
"""Returns the list of operations to be executed after the job completes

:returns: by default return an empty array"""
return []

def get_param(self, name, default=None, message=None):
def get_param(
self,
name: str,
default: Any = None,
message: str | None = None,
) -> Any:
"""Just a wrapper for Analyzer.__get_param.
:param name: Name of the parameter to get.
JSON-like syntax, e.g. `config.username`
Expand All @@ -159,7 +174,12 @@ def get_param(self, name, default=None, message=None):

return self.__get_param(self._input, name, default, message)

def get_env(self, key, default=None, message=None):
def get_env(
self,
key: str,
default: Any = None,
message: str | None = None,
) -> Any:
"""Wrapper for getting configuration values from the environment.
:param key: Key of the environment variable to get.
:param default: Default value, if not found. Default: None
Expand All @@ -174,7 +194,7 @@ def get_env(self, key, default=None, message=None):
self.error(message)
return default

def error(self, message, ensure_ascii=False):
def error(self, message: str, ensure_ascii: bool = False) -> NoReturn:

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As the issue states, I think here is 80% of the benefit of typing this class

"""Stop analyzer with an error message.

Changing ensure_ascii can be helpful when stuck with ascii <-> utf-8 issues.
Expand Down Expand Up @@ -203,25 +223,25 @@ def error(self, message, ensure_ascii=False):
# Force exit after error
sys.exit(1)

def summary(self, raw):
def summary(self, raw: dict) -> dict:
"""Returns a summary, needed for 'short.html' template.

Overwrite it for your needs!

:returns: by default return an empty dict"""
return {}

def artifacts(self, raw):
def artifacts(self, raw: dict) -> list[dict]:
return []

def report(self, output, ensure_ascii=False):
def report(self, output: dict, ensure_ascii: bool = False) -> None:
"""Returns a json dict via stdout.

:param output: worker output.
:param ensure_ascii: Force ascii output. Default: False"""

self.__write_output(output, ensure_ascii=ensure_ascii)

def run(self):
def run(self) -> None:

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return -> None might be opinionated here. We may want users to have more freedom if run should ever return something

"""Overwritten by analyzers"""
pass
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ dev = ["cortexutils[audit, lint, test, build]", "nox"]
[tool.setuptools.packages.find]
include = ["cortexutils*"]

[tool.setuptools.package-data]
cortexutils = ["py.typed"]

[tool.coverage.run]
omit = ["tests/*"]

Expand Down
Loading