Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 22 additions & 19 deletions examples/decode_sensor_binary_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,10 @@ class PingViewerLogReader:
# timestamp format for recovery hh:mm:ss.xxx
# includes optional \x00 (null byte) before every character because Windows
TIMESTAMP_FORMAT = re.compile(
b'(\x00?\d){2}(\x00?:\x00?[0-5]\x00?\d){2}\x00?\.(\x00?\d){3}')
b'(\x00?\\d){2}(\x00?:\x00?[0-5]\x00?\\d){2}\x00?\\.(\x00?\\d){3}')
MAX_TIMESTAMP_LENGTH = 12 * 2
# byte encoding assumes posix until proven otherwise
ENCODING = 'UTF-8'

def __init__(self, filename: str):
self.filename = filename
Expand All @@ -107,20 +109,17 @@ def unpack_array(cls, file: IO[Any]):
if array_size <= cls.MAX_ARRAY_LENGTH:
return file.read(array_size)

def unpack_string(self, file: IO[Any]):
    """Read the next length-prefixed array from *file* and decode it as text.

    Decoding uses the reader's current ``ENCODING`` attribute rather than a
    hard-coded codec, so the codec can be switched after the header is read.
    """
    # NOTE(review): unpack_array appears to return None for oversized
    # arrays — decode would then raise AttributeError here; confirm that
    # callers only reach this with a sane length prefix.
    raw = self.unpack_array(file)
    return raw.decode(self.ENCODING)

@classmethod
def unpack_message(cls, file: IO[Any]):
timestamp = cls.unpack_string(file)
message = cls.unpack_array(file)
def unpack_message(self, file: IO[Any]):
    """Return the next ``(timestamp, message)`` pair from *file*.

    Reads a string timestamp followed by a raw message array. If the
    message array cannot be read (bad length prefix), falls back to
    ``recover`` to resynchronise on the next timestamp in the stream.
    """
    stamp = self.unpack_string(file)
    payload = self.unpack_array(file)
    if payload is None:
        # Length prefix was implausible — attempt stream recovery.
        return self.recover(file)
    return (stamp, payload)

@classmethod
def recover(cls, file: IO[Any]):
def recover(self, file: IO[Any]):
""" Attempt to recover from a failed read.

Assumed that a bad number has been read from the last cls.UINT.size
Expand All @@ -129,32 +128,32 @@ def recover(cls, file: IO[Any]):

"""
# TODO: log when recovery attempts occur, and bytes lost when they succeed
file.seek(current_pos := (file.tell() - cls.UINT.size))
file.seek(current_pos := (file.tell() - self.UINT.size))
prev_ = next_ = b''
start = amount_read = 0
while not (match := cls.TIMESTAMP_FORMAT.search(
while not (match := self.TIMESTAMP_FORMAT.search(
roi := (prev_ + next_), start)):
prev_ = next_
next_ = file.read(cls.MAX_ARRAY_LENGTH)
next_ = file.read(self.MAX_ARRAY_LENGTH)
if not next_:
break # run out of file
amount_read += cls.MAX_ARRAY_LENGTH
amount_read += self.MAX_ARRAY_LENGTH
if start == 0 and prev_:
# onto the second read
# -> match on potential overlap + new region, not the
# already-checked (impossible) region
start = cls.MAX_ARRAY_LENGTH - cls.MAX_TIMESTAMP_LENGTH
start = self.MAX_ARRAY_LENGTH - self.MAX_TIMESTAMP_LENGTH
else:
# match was found
end = match.end()
timestamp = roi[match.start():end].decode('UTF-8')
timestamp = roi[match.start():end].decode(self.ENCODING)
amount_read -= (len(roi) - end)
self.failed_bytes += amount_read
# return the file pointer to the end of this timestamp
file.seek(current_pos + amount_read)
# attempt to extract the corresponding message, or recover anew
if (message := cls.unpack_array(file)) is None:
return cls.recover(file)
if (message := self.unpack_array(file)) is None:
return self.recover(file)
return (timestamp, message)
# Calculate bytes from start of recovery attempt to end of the file
file_size = file.tell()
Expand All @@ -165,6 +164,10 @@ def recover(cls, file: IO[Any]):

def unpack_header(self, file: IO[Any]):
self.header.string = self.unpack_string(file)
if '\x00' in self.header.string:
# Windows uses big-endian wide characters
self.ENCODING = 'UTF-16-be'
self.header.string = self.header.string.encode('UTF-8').decode(self.ENCODING)
self.header.version = self.unpack_int(file)

for info in ('hash_commit', 'date', 'tag', 'os_name', 'os_version'):
Expand Down
Loading