diff --git a/examples/decode_sensor_binary_log.py b/examples/decode_sensor_binary_log.py index 44b5cf62d..40a710b7d 100755 --- a/examples/decode_sensor_binary_log.py +++ b/examples/decode_sensor_binary_log.py @@ -80,8 +80,10 @@ class PingViewerLogReader: # timestamp format for recovery hh:mm:ss.xxx # includes optional \x00 (null byte) before every character because Windows TIMESTAMP_FORMAT = re.compile( - b'(\x00?\d){2}(\x00?:\x00?[0-5]\x00?\d){2}\x00?\.(\x00?\d){3}') + b'(\x00?\\d){2}(\x00?:\x00?[0-5]\x00?\\d){2}\x00?\\.(\x00?\\d){3}') MAX_TIMESTAMP_LENGTH = 12 * 2 + # byte encoding assumes posix until proven otherwise + ENCODING = 'UTF-8' def __init__(self, filename: str): self.filename = filename @@ -107,20 +109,17 @@ def unpack_array(cls, file: IO[Any]): if array_size <= cls.MAX_ARRAY_LENGTH: return file.read(array_size) - @classmethod - def unpack_string(cls, file: IO[Any]): - return cls.unpack_array(file).decode('UTF-8') + def unpack_string(self, file: IO[Any]): + return self.unpack_array(file).decode(self.ENCODING) - @classmethod - def unpack_message(cls, file: IO[Any]): - timestamp = cls.unpack_string(file) - message = cls.unpack_array(file) + def unpack_message(self, file: IO[Any]): + timestamp = self.unpack_string(file) + message = self.unpack_array(file) if message is None: - return cls.recover(file) + return self.recover(file) return (timestamp, message) - @classmethod - def recover(cls, file: IO[Any]): + def recover(self, file: IO[Any]): """ Attempt to recover from a failed read. Assumed that a bad number has been read from the last cls.UINT.size @@ -129,32 +128,32 @@ def recover(cls, file: IO[Any]): """ # TODO: log when recovery attempts occur, and bytes lost when they succeed - file.seek(current_pos := (file.tell() - cls.UINT.size)) + file.seek(current_pos := (file.tell() - self.UINT.size)) prev_ = next_ = b'' start = amount_read = 0 - while not (match := cls.TIMESTAMP_FORMAT.search( + while not (match := self.TIMESTAMP_FORMAT.search( roi := (prev_ + next_), start)): prev_ = next_ - next_ = file.read(cls.MAX_ARRAY_LENGTH) + next_ = file.read(self.MAX_ARRAY_LENGTH) if not next_: break # run out of file - amount_read += cls.MAX_ARRAY_LENGTH + amount_read += self.MAX_ARRAY_LENGTH if start == 0 and prev_: # onto the second read # -> match on potential overlap + new region, not the # already-checked (impossible) region - start = cls.MAX_ARRAY_LENGTH - cls.MAX_TIMESTAMP_LENGTH + start = self.MAX_ARRAY_LENGTH - self.MAX_TIMESTAMP_LENGTH else: # match was found end = match.end() - timestamp = roi[match.start():end].decode('UTF-8') + timestamp = roi[match.start():end].decode(self.ENCODING) amount_read -= (len(roi) - end) self.failed_bytes += amount_read # return the file pointer to the end of this timestamp file.seek(current_pos + amount_read) # attempt to extract the corresponding message, or recover anew - if (message := cls.unpack_array(file)) is None: - return cls.recover(file) + if (message := self.unpack_array(file)) is None: + return self.recover(file) return (timestamp, message) # Calculate bytes from start of recovery attempt to end of the file file_size = file.tell() @@ -165,6 +164,10 @@ def recover(cls, file: IO[Any]): def unpack_header(self, file: IO[Any]): self.header.string = self.unpack_string(file) + if '\x00' in self.header.string: + # Windows uses big-endian wide characters + self.ENCODING = 'UTF-16-be' + self.header.string = self.header.string.encode('UTF-8').decode(self.ENCODING) self.header.version = self.unpack_int(file) for info in ('hash_commit', 'date', 'tag', 'os_name', 'os_version'):