Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions .github/copilot-instructions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# CCSDSPy AI Coding Instructions

## Project Overview
CCSDSPy is a Python library for reading tightly packed bits in CCSDS (Consultative Committee for Space Data Systems) format, used by NASA and ESA missions. It provides an object-oriented API for decoding fixed-length and variable-length packets from binary files.

## Core Architecture
- **Packet Types**: `FixedLength` for uniform packets, `VariableLength` for dynamic packets
- **Field Definitions**: `PacketField` for scalar fields, `PacketArray` for multidimensional arrays
- **Data Types**: `uint`, `int`, `float`, `str`, `fill` (padding)
- **Decoding**: Bit-level parsing with configurable byte order (big/little endian or custom)
- **Post-processing**: `Converter` subclasses for calibration, enums, datetime parsing

## Key Patterns
### Packet Definition
```python
from ccsdspy import FixedLength, PacketField, PacketArray

pkt = FixedLength([
PacketField(name='SHCOARSE', data_type='uint', bit_length=32),
PacketField(name='VOLTAGE', data_type='int', bit_length=8),
PacketArray(name='SENSOR_GRID', data_type='uint', bit_length=16,
array_shape=(32, 32), array_order='C'),
])
```

### Loading Data
```python
result = pkt.load('data.bin') # Returns dict of numpy arrays
```

### CSV-Based Definitions
Packets can be defined via CSV files with columns: `name,data_type,bit_length[,bit_offset]`

## Development Workflow
- **Install**: `pip install -e '.[dev]'` for development dependencies
- **Test**: `pytest --pyargs ccsdspy --cov ccsdspy`
- **Lint**: `flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics`
- **Format**: `black --check --diff ccsdspy`
- **Docs**: `sphinx-build docs docs/_build/html -W -b html`

## Conventions
- **Imports**: Use relative imports within package (`from .. import ...`)
- **Naming**: Descriptive field names matching telemetry mnemonics
- **Bit Offsets**: Automatic calculation unless specified; includes 48-bit primary header
- **Byte Order**: "big" (default), "little", or custom string like "4321"
- **Array Order**: 'C' (row-major) or 'F' (column-major) for PacketArray
- **Error Handling**: Raise `ValueError`/`TypeError` for invalid inputs, log warnings for data issues

## Testing
- Use `pytest` with descriptive test function names
- Test exception raising with `pytest.raises()`
- Mock binary data using `io.BytesIO` and `struct.pack()`
- Reference test data in `ccsdspy/tests/data/`

## Dependencies
- **Core**: `numpy`, `pyyaml`, `appdirs`
- **Dev**: `pytest`, `black`, `flake8`, `coverage`, `sphinx`

## File Structure
- `ccsdspy/packet_types.py`: Main packet classes
- `ccsdspy/packet_fields.py`: Field definitions
- `ccsdspy/decode.py`: Internal decoding logic
- `ccsdspy/converters.py`: Post-processing converters
- `ccsdspy/utils.py`: Packet utilities
- `ccsdspy/tests/`: Comprehensive test suite with data fixtures</content>
<parameter name="filePath">/Users/schriste/Developer/repos/ccsdspy/.github/copilot-instructions.md
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,7 @@ Icon
._*

_version.py

# created during tests
*.ccsds
*.bin
5 changes: 5 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"python.testing.pytestArgs": [],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
}
24 changes: 19 additions & 5 deletions ccsdspy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,31 @@
"""
IO Interface for Reading CCSDS Data in Python.
"""
import os.path

# For egg_info test builds to pass, put package imports here.

from . import converters
from .packet_fields import PacketField, PacketArray
from .packet_types import FixedLength, VariableLength
from .utils import split_by_apid

try:
from ._version import __version__
from ._version import version_tuple
except ImportError:
__version__ = "unknown"
version_tuple = (0, 0, "unknown version")

from .logger import _init_log
from .config import load_config, print_config

# Load user configuration
config = load_config()

log = _init_log(config=config)
log.info(f"CCSDSPy version {__version__} initialized.")

_package_directory = os.path.dirname(os.path.realpath(__file__))
_data_directory = os.path.join(_package_directory, "data")
_test_data_directory = os.path.join(_package_directory, "tests", "data")

from . import converters
from .packet_fields import PacketField, PacketArray
from .packet_types import FixedLength, VariableLength
from .utils import split_by_apid
5 changes: 3 additions & 2 deletions ccsdspy/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import sys

from .utils import split_by_apid
from ccsdspy import log


def module_main(argv=sys.argv, cwd=os.getcwd()):
Expand Down Expand Up @@ -42,14 +43,14 @@ def module_main(argv=sys.argv, cwd=os.getcwd()):

stream_by_apid = split_by_apid(args.file, valid_apids=valid_apids)

print("Parsing done!")
log.info("Parsing done!")

for apid in sorted(stream_by_apid):
if valid_apids and apid not in valid_apids:
continue

out_file_name = f"{cwd}/apid{apid:05d}.tlm"
print(f"Writing {out_file_name}")
log.info(f"Writing {out_file_name}")

with open(out_file_name, "wb") as file_out:
file_out.write(stream_by_apid[apid].read())
Expand Down
185 changes: 185 additions & 0 deletions ccsdspy/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
"""
This module provides configuration file functionality.

This code is based on that provided by SunPy see
licenses/SUNPY.rst
"""

from json import load
import os
import shutil
import yaml
from pathlib import Path
import warnings

from appdirs import AppDirs

import ccsdspy

__all__ = ["load_config", "copy_default_config", "print_config", "CONFIG_DIR"]

# We use AppDirs to locate and create the config directory.
dirs = AppDirs("ccsdspy", "ccsdspy")
# Default one set by AppDirs
CONFIG_DIR = dirs.user_config_dir
CACHE_DIR = dirs.user_cache_dir


def load_config():
"""
Load and read the configuration file.

If a configuration file does not exist in the user's home directory,
it will read in the defaults from the package's data directory.

The selected configuration can be overridden by setting the `ccsdspy_CONFIGDIR`
environment variable. This environment variable will take precedence
over the mission specified in the configuration file.

Returns
-------
config : dict
The loaded configuration data, as a dictionary.
"""
config_path = Path(_get_user_configdir()) / "config.yml"
if not config_path.exists():
config_path = Path(ccsdspy.__file__).parent / "data" / "config.yml"

with open(config_path, "r") as file:
config = yaml.safe_load(file)

return config


def _get_user_configdir():
"""
Return the configuration directory path.

The configuration directory is determined by the "ccsdspy_CONFIGDIR"
environment variable or a default directory set by the application.

Returns
-------
str: The path to the configuration directory.

Raises
------
RuntimeError: If the configuration directory is not writable.
"""
configdir = os.environ.get("ccsdspy_CONFIGDIR", CONFIG_DIR)
if not _is_writable_dir(configdir):
raise RuntimeError(f'Could not write to ccsdspy_CONFIGDIR="{configdir}"')
return configdir


def _is_writable_dir(path):
"""
Check if the specified path is a writable directory.

Parameters
----------
path: str or Path
The path to check.

Returns
-------
bool: True if the path is a writable directory, False otherwise.

Raises
------
FileExistsError: If a file exists at the path instead of a directory.
"""
# Worried about multiple threads creating the directory at the same time.
try:
Path(path).mkdir(parents=True, exist_ok=True)
except FileExistsError: # raised if there's an existing file instead of a directory
return False
else:
return Path(path).is_dir() and os.access(path, os.W_OK)


def copy_default_config(overwrite=False):
"""
Copy the default configuration file to the user's configuration directory.

If the configuration file already exists, it will be overwritten if the
`overwrite` parameter is set to True.

Parameters
----------
overwrite : bool
Whether to overwrite an existing configuration file.

Raises
------
RuntimeError: If the configuration directory is not writable.
"""
config_filename = "config.yml"
config_file = Path(ccsdspy.__file__).parent / "data" / config_filename

# Note: get_user_configdir() ensures directory is writable
user_config_dir = Path(_get_user_configdir())
user_config_file = user_config_dir / config_filename

if user_config_file.exists():
if overwrite:
message = (
"User config file already exists. "
"This will be overwritten with a backup written in the same location."
)
warnings.warn(message)
os.rename(str(user_config_file), str(user_config_file) + ".bak")
shutil.copyfile(config_file, user_config_file)
else:
message = (
"User config file already exists. "
"To overwrite it use `copy_default_config(overwrite=True)`"
)
warnings.warn(message)
else:
shutil.copyfile(config_file, user_config_file)


def print_config():
"""
Print the current configuration options.
"""
config = load_config()
print("FILES USED:")
for file_ in _find_config_files():
print(" " + file_)

print("\nCONFIGURATION:")
for section, settings in config.items():
if isinstance(settings, dict): # Nested configuration
print(f" [{section}]")
for option, value in settings.items():
print(f" {option} = {value}")
print("")
else: # Not a nested configuration
print(f" {section} = {settings}")


def _find_config_files():
"""
Find the locations of configuration files.

Returns
-------
list: A list of paths to the configuration files.
"""
config_files = []
config_filename = "config.yml"

# find default configuration file
module_dir = Path(ccsdspy.__file__).parent
config_files.append(str(module_dir / "data" / config_filename))

# if a user configuration file exists, add that to list of files to read
# so that any values set there will override ones specified in the default
# config file
config_path = Path(_get_user_configdir())
if config_path.joinpath(config_filename).exists():
config_files.append(str(config_path.joinpath(config_filename)))

return config_files
30 changes: 30 additions & 0 deletions ccsdspy/data/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Configuration
# This is the default configuration file

general:
# Time Format to be used for displaying time in output (e.g. graphs)
# The default time format is based on ISO8601 (replacing the T with space)
# note that the extra '%'s are escape characters
time_format: "%Y-%m-%d %H:%M:%S"

logger:
# Threshold for the logging messages. Logging messages that are less severe
# than this level will be ignored. The levels are 'DEBUG', 'INFO', 'WARNING', 'ERROR'
log_level: DEBUG

log_format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

# Whether to always log messages to a log file
log_to_file: true

# Whether the log file should be in JSON format
log_file_json: false

# The file to log messages to
log_file_path: ccsdspy.log

# Threshold for logging messages to log_file_path
log_file_level: INFO

# Format for log file entries
log_file_format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
5 changes: 3 additions & 2 deletions ccsdspy/decode.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
from __future__ import division
from collections import namedtuple
import math
import warnings


import numpy as np

from ccsdspy.constants import (
BITS_PER_BYTE,
PRIMARY_HEADER_NUM_BYTES,
)
from ccsdspy import log

__author__ = "Daniel da Silva <[email protected]>"

Expand Down Expand Up @@ -316,7 +317,7 @@ def _decode_variable_length(file_bytes, fields):
message = (
f"File appears truncated - missing {missing_bytes} bytes (or maybe garbage at end)"
)
warnings.warn(message)
log.warning(message)

npackets = len(packet_starts)

Expand Down
Loading