# Source code for ccsdspy.packet_types

"""High-level Object-oriented API for the different types of packets
(FixedLength and VariableLength) supported by the package.
"""

import csv
import os
import warnings

import numpy as np

from .converters import Converter
from .decode import _decode_fixed_length, _decode_variable_length
from .packet_fields import PacketField, PacketArray


__author__ = "Daniel da Silva <mail@danieldasilva.org>"


class _BasePacket:
    """Base class of FixedLength and VariableLength. Not to be instantiated
    directly.
    """

    def _init(self, fields):
        """
        Parameters
        ----------
        fields : list of `ccsdspy.PacketField`
            Layout of packet fields contained in the definition.
        """
        # List of PacketField instances
        self._fields = fields[:]

        # Dictionary mapping input name to tuple (output_name: str, Converter instance)
        self._converters = {}

    @classmethod
    def from_file(cls, file):
        """
        Parameters
        ----------
        file : str
           Path to file on the local file system that defines the packet fields.
           Currently only supports csv files.
           See :download:`basic_csv_3col.csv <../../ccsdspy/tests/data/packet_def/basic_csv_3col.csv>`  # noqa: E501
           and :download:`extended_csv_4col.csv <../../ccsdspy/tests/data/packet_def/extended_csv_4col.csv>`  # noqa: E501

        Returns
        -------
        An instance of FixedLength.
        """
        file_extension = os.path.splitext(file)
        if file_extension[1] == ".csv":
            fields = _get_fields_csv_file(file)
        else:
            raise ValueError(f"File type {file_extension[1]} not supported.")

        return cls(fields)

    def add_converted_field(self, input_field_name, output_field_name, converter):
        """Add a converted field to the packet definition, used to apply
        post-processing transformations of decoded fields.

        Parameters
        ----------
        input_field_name : str or list/tuple
           Name of input field, or list/tuple of names of fields. There must be field(s)
           which exists in the packet definition corresponding to these name(s).
        output_field_name : str
           Name of output field. When the packet is decoded using `pkt.load()`,
           a new field named this will be present in the output dictionary.
        converter : instance of subclass of `:py:class:~ccsdspy.converters.Converter`
           A converter object to apply post-processing conversions, such as
           calibration curves or value replacement. Converter objects
           can be found in`:py:mod:~ccsdspy.converters`.

        Raises
        ------
        TypeError
           If one of the arguments is not of the correct type.
        ValueError
           The provided `input_field_name` is not present in the packet definition
        """
        if not isinstance(output_field_name, str):
            raise TypeError("output_field_name must be a str")
        if not isinstance(converter, Converter):
            raise TypeError("converter must be an instance of a Converter subclass")

        # Get tuple of input field names for storing; this handles the input_field_name
        # argument being either a str, or list/tuple
        if isinstance(input_field_name, str):
            input_field_names = (input_field_name,)
        elif isinstance(input_field_name, (list, tuple)):
            input_field_names = tuple(input_field_name)
        else:
            raise TypeError("input_field_name must be either str, list, or tuple")

        del input_field_name  # don't use the variable again in this function

        # Check that each of the input field names exists in the packet, and report
        # the missing fields if not
        # Collect valid names of fields, which include primary header fields as well
        # as fields defined in the packet.
        fields_in_packet_set = set()

        for field in _prepend_primary_header_fields(self._fields):
            fields_in_packet_set.add(field._name)

        input_field_names_set = set(input_field_names)
        all_fields_present = input_field_names_set <= fields_in_packet_set  # subset

        if not all_fields_present:
            missing_fields = input_field_names_set - fields_in_packet_set  # set op A \ B
            raise ValueError(
                "Some fields specified as inputs to converters were missing: "
                f"{sorted(missing_fields)}"
            )

        self._converters[input_field_names] = (output_field_name, converter)


class FixedLength(_BasePacket):
    """Packet definition for decoding fixed length packets.

    Fixed length packets have the same length and layout every time. A
    common example of this is housekeeping or status messages.
    """

    def __init__(self, fields):
        """
        Parameters
        ----------
        fields : list of :py:class:`~ccsdspy.PacketField` or :py:class:`~ccsdspy.PacketArray`
            Layout of packet fields contained in the definition.

        Raises
        ------
        ValueError
            one or more of the arguments are invalid
        """
        # A string array shape marks a variable-sized field, which this class
        # cannot decode
        for field in fields:
            if isinstance(field._array_shape, str):
                raise ValueError(
                    "The FixedLength class does not support variable fields. "
                    "Instead, use the VariableLength class."
                )

        self._init(fields)

    def load(self, file, include_primary_header=False, reset_file_obj=False):
        """Decode a file-like object containing a sequence of these packets.

        Parameters
        ----------
        file : str
            Path to file on the local file system, or file-like object
        include_primary_header : bool
            If True, provides the primary header in the output. The names of the
            fields are: `CCSDS_VERSION_NUMBER`, `CCSDS_PACKET_TYPE`,
            `CCSDS_SECONDARY_FLAG`, `CCSDS_SEQUENCE_FLAG`, `CCSDS_APID`,
            `CCSDS_SEQUENCE_COUNT`, and `CCSDS_PACKET_LENGTH`
        reset_file_obj : bool
            If True, leave the file object, when it is a file buffer, where it
            was before load is called. Otherwise (default), leave the file stream
            position after the read packets. Does not apply when file is a string.

        Returns
        -------
        field_arrays : dict, string to NumPy array
            dictionary mapping field names to NumPy arrays, with key order
            matching the order of fields in the packet.

        Warns
        -----
        UserWarning
            If the ccsds sequence count is not in order
        UserWarning
            If the ccsds sequence count is missing packets
        UserWarning
            If there are more than one APID
        """
        # The primary header is always decoded, because it is inspected below
        # whether or not the caller asked for it in the output.
        decoded = _load(
            file,
            self._fields,
            self._converters,
            "fixed_length",
            include_primary_header=True,
            reset_file_obj=reset_file_obj,
        )

        # Issue warnings about the sequence counts and APIDs if appropriate
        _inspect_primary_header_fields(decoded)

        if include_primary_header:
            return decoded

        _delete_primary_header_fields(decoded)
        return decoded
class VariableLength(_BasePacket):
    """Packet definition for decoding variable length packets.

    Variable length packets are packets which have a different length each
    time. Variable length fields are defined as `~ccsdspy.PacketArray` fields
    where `array_shape="expand"` (causing the field to grow to fill the packet)
    or `array_shape="other_field"` (causes the field named `other_field` to set
    the number of elements in this array).

    Please note that while this class is able to parse fixed length packets,
    it is much slower. Use the :py:class:`~ccsdspy.FixedLength` class instead.

    Rules for variable length packets:

    * Only specify a `~ccsdspy.PacketArray` with `array_shape="other_field"`
      when `other_field` precedes it in the packet definition
    * Do not provide more than one expanding `~ccsdspy.PacketArray` with
      `array_shape="expand"`
    * Do not specify the primary header fields manually
    * Do not specify explicit bit_offsets (they will be computed automatically)
    """

    def __init__(self, fields):
        """
        Parameters
        ----------
        fields : list of :py:class:`~ccsdspy.PacketField` or :py:class:`~ccsdspy.PacketArray`
            Layout of packet fields contained in the definition. No more than
            one field should have array_shape="expand". The fields must not
            have bit_offsets. Do not include the primary header fields.

        Raises
        ------
        ValueError
            one or more of the arguments are invalid, or do not follow the
            specified rules.
        """
        # Rule: at most one expanding field in the packet definition
        n_expanding = sum(
            1
            for field in fields
            if isinstance(field, PacketArray) and field._array_shape == "expand"
        )

        if n_expanding > 1:
            raise ValueError(
                "The VariableLength class does not support more than one field "
                "with array_shape='expand', as the decoding process becomes "
                "ambiguous."
            )

        # Rule: an array sized by another field must come after that field
        names = [field._name for field in fields]

        for position, field in enumerate(fields):
            if not isinstance(field, PacketArray):
                continue

            shape = field._array_shape
            if not isinstance(shape, str) or shape == "expand":
                continue

            if shape not in names[:position]:
                raise ValueError(
                    "The VariableLength class requires that variable fields with "
                    "their sizes set by other fields only do so when the "
                    "previous field precedes it."
                )

        # Rule: no explicit bit offsets
        if any(field._bit_offset is not None for field in fields):
            raise ValueError(
                "The VariableLength class does not support explicit bit "
                "offsets. You must specify the entire packet so they can be "
                "determined automatically."
            )

        self._init(fields)

    def load(self, file, include_primary_header=False, reset_file_obj=False):
        """Decode a file-like object containing a sequence of these packets.

        Parameters
        ----------
        file : str
            Path to file on the local file system, or file-like object
        include_primary_header : bool
            If True, provides the primary header in the output. The names of the
            fields are: `CCSDS_VERSION_NUMBER`, `CCSDS_PACKET_TYPE`,
            `CCSDS_SECONDARY_FLAG`, `CCSDS_SEQUENCE_FLAG`, `CCSDS_APID`,
            `CCSDS_SEQUENCE_COUNT`, and `CCSDS_PACKET_LENGTH`
        reset_file_obj : bool
            If True, leave the file object, when it is a file buffer, where it
            was before load is called. Otherwise (default), leave the file stream
            position after the read packets. Does not apply when file is a string.

        Returns
        -------
        field_arrays : dict, string to NumPy array
            dictionary mapping field names to NumPy arrays, with key order
            matching the order of fields in the packet.

        Warns
        -----
        UserWarning
            If the ccsds sequence count is not in order
        UserWarning
            If the ccsds sequence count is missing packets
        UserWarning
            If there are more than one APID
        """
        # The variable length decoder requires the full packet definition, so
        # the primary header is always parsed and only removed afterwards when
        # the caller did not ask for it.
        decoded = _load(
            file,
            self._fields,
            self._converters,
            "variable_length",
            include_primary_header=True,
            reset_file_obj=reset_file_obj,
        )

        # Issue warnings about the sequence counts and APIDs if appropriate
        _inspect_primary_header_fields(decoded)

        if include_primary_header:
            return decoded

        _delete_primary_header_fields(decoded)
        return decoded
def _inspect_primary_header_fields(packet_arrays): """Inspects the primary header fields. Checks for the following issues * all apids are the same * sequence count is not missing any values * sequence count is in order Parameters ----------- packet_arrays dictionary mapping field names to NumPy arrays, with key order matching the order fields in the packet. Modified in place """ seq_counts = packet_arrays["CCSDS_SEQUENCE_COUNT"] start, end = seq_counts[0], seq_counts[-1] missing_elements = sorted(set(range(start, end + 1)).difference(seq_counts)) if len(missing_elements) != 0: warnings.warn(f"Missing packets found {missing_elements}.", UserWarning) if not np.all(seq_counts == np.sort(seq_counts)): warnings.warn("Sequence count are out of order.", UserWarning) individual_ap_ids = set(packet_arrays["CCSDS_APID"]) if len(individual_ap_ids) != 1: warnings.warn(f"Found multiple AP IDs {individual_ap_ids}.", UserWarning) return None def _delete_primary_header_fields(packet_arrays): """Modifies in place the packet arrays dictionary to delete primary header fields. Parameters ----------- packet_arrays dictionary mapping field names to NumPy arrays, with key order matching the order fields in the packet. Modified in place """ header_fields = _prepend_primary_header_fields([]) for header_field in header_fields: del packet_arrays[header_field._name] def _expand_array_fields(existing_fields): """Expand arrays into multiple fields, one for each element. Returns a new list of fields as well as a data structure which can be used to reverse this process. See the `_unexpand_field_arrays()` function to reverse this process. 
Parameters ---------- existing_fields : list of `ccsdspy.PacketField` Layout of packet fields contained in the definition, with PacketArray Returns ------- return_fields : list of `ccsdspy.PacketField` Layout of packet fields contained in the definition, without PacketArray's expand_history : dict Dictionary mapping array name with shape/data-type and field expansions """ return_fields = [] expand_history = {} for existing_field in existing_fields: if existing_field._field_type != "array" or isinstance(existing_field._array_shape, str): return_fields.append(existing_field) continue array_shape = existing_field._array_shape array_order = existing_field._array_order index_vecs = [np.arange(dim) for dim in array_shape] index_grids = np.meshgrid(*index_vecs, indexing="ij") indices_flat = [index_grid.flatten(order=array_order) for index_grid in index_grids] expand_history[existing_field._name] = { "shape": array_shape, "data_type": existing_field._data_type, "fields": {}, } for i, indices in enumerate(zip(*indices_flat)): name = f"{existing_field._name}[{','.join(map(str,indices))}]" if existing_field._bit_offset is None: bit_offset = None else: bit_offset = existing_field._bit_offset + i * existing_field._bit_length return_field = PacketField( name=name, data_type=existing_field._data_type, bit_length=existing_field._bit_length, bit_offset=bit_offset, byte_order=existing_field._byte_order, ) expand_history[existing_field._name]["fields"][name] = indices return_fields.append(return_field) return return_fields, expand_history def _unexpand_field_arrays(field_arrays, expand_history): """Reverse the array expansion process from `_expand_array_fields`. Parameters ---------- field_arrays : dict, str to numpy array Dictionary mapping field names to NumPy arrays, with key order matching the order fields in the packet. Has a key for each array element. 
expand_history : dict Dictionary mapping array name with shape/data-type and field expansions Returns ------- return_field_arrays : dict, str to array Dictionary mapping field names to NumPy arrays, with key order matching the order fields in the packet. Has keys mapping to full arrays. """ npackets = list(field_arrays.values())[0].shape[0] return_field_arrays = field_arrays.copy() for array_name, array_details in expand_history.items(): array_shape = (npackets,) + array_details["shape"] array_dtype = field_arrays[list(array_details["fields"].keys())[0]].dtype array = np.zeros(array_shape, dtype=array_dtype) for element_name, indices in array_details["fields"].items(): array.__setitem__((slice(None),) + indices, field_arrays[element_name]) # get index of the position where the array field was pos = list(return_field_arrays.keys()).index(element_name) del return_field_arrays[element_name] # do the following trick to insert the unexpanded array where the expanded arrays fields were. return_field_items = list(return_field_arrays.items()) return_field_items.insert(pos, (array_name, array)) return_field_arrays = dict(return_field_items) return return_field_arrays def _prepend_primary_header_fields(existing_fields): """Helper function that prepends primary header fields to a list of packet fields, to support load(include_primary_header=True) Parameters ---------- existing_fields : list of `ccsdspy.PacketField` Non-primary header fields defined by the packet. Returns ------- New list of fields with the primary header fields prepended. 
""" return_fields = [ PacketField( name="CCSDS_VERSION_NUMBER", data_type="uint", bit_length=3, bit_offset=0, ), PacketField( name="CCSDS_PACKET_TYPE", data_type="uint", bit_length=1, bit_offset=3, ), PacketField( name="CCSDS_SECONDARY_FLAG", data_type="uint", bit_length=1, bit_offset=4, ), PacketField(name="CCSDS_APID", data_type="uint", bit_length=11, bit_offset=5), PacketField( name="CCSDS_SEQUENCE_FLAG", data_type="uint", bit_length=2, bit_offset=16, ), PacketField( name="CCSDS_SEQUENCE_COUNT", data_type="uint", bit_length=14, bit_offset=18, ), PacketField( name="CCSDS_PACKET_LENGTH", data_type="uint", bit_length=16, bit_offset=32, ), ] return_fields.extend(existing_fields) return return_fields def _parse_csv_array_shape(data_type_str): """Parse a data type string from a CSV to determine the array shape. Parameters ---------- data_type_str : str Full string specifying the data type, e.g. `uint(1, 2)` Returns ------- array_shape : str, int, tuple of int Parsed array shape to be used in loading CSV. """ array_shape_str = data_type_str[data_type_str.find("(") + 1 : data_type_str.find(")")] if array_shape_str == "expand": array_shape = "expand" elif "," in array_shape_str: try: array_shape = tuple(map(int, array_shape_str.split(", "))) except ValueError: raise ValueError( "Array shape must be `expand`, the name of another field, or a tuple of ints." ) else: # string is either another field for reference or a single integer for a one dimensional array shape array_shape = int(array_shape_str) if array_shape_str.isnumeric() else array_shape_str return array_shape def _get_fields_csv_file(csv_file): """Parse a simple comma-delimited file that defines a packet. Should not include the CCSDS header. The minimum set of columns are (name, data_type, bit_length). An optional bit_offset can also be provided. Parameters ---------- csv_file : str Path to file on the local file system Returns ------- fields : list A list of `PacketField` objects. 
""" req_columns = ["name", "data_type", "bit_length"] with open(csv_file, "r") as fp: fields = [] reader = csv.DictReader(fp, skipinitialspace=True) headers = reader.fieldnames if headers is None: raise RuntimeError("CSV file must not be empty") if not all((req_col in headers) for req_col in req_columns): raise ValueError(f"Minimum required columns are {req_columns}.") for row in reader: # skip the header row if "bit_offset" not in headers: # basic 3 col csv file if (row["data_type"].count("(") == 1) and (row["data_type"].count(")") == 1): data_type = row["data_type"].split("(")[0] array_shape = _parse_csv_array_shape(row["data_type"]) fields.append( PacketArray( name=row["name"], data_type=data_type, bit_length=int(row["bit_length"]), array_shape=array_shape, ) ) else: fields.append( PacketField( name=row["name"], data_type=row["data_type"], bit_length=int(row["bit_length"]), ) ) if "bit_offset" in headers: # extended 4 col csv file provides bit offsets # TODO: Check the consistency of bit_offsets versus previous bit_lengths if (row["data_type"].count("(") == 1) and (row["data_type"].count(")") == 1): data_type = row["data_type"].split("(")[0] array_shape = _parse_csv_array_shape(row["data_type"]) fields.append( PacketArray( name=row["name"], data_type=data_type, bit_length=int(row["bit_length"]), bit_offset=int(row["bit_offset"]), array_shape=array_shape, ) ) else: fields.append( PacketField( name=row["name"], data_type=row["data_type"], bit_length=int(row["bit_length"]), bit_offset=int(row["bit_offset"]), ) ) return fields def _load( file, fields, converters, decoder_name, include_primary_header=False, reset_file_obj=False ): """Decode a file-like object containing a sequence of these packets. Parameters ---------- file: str Path to file on the local file system, or file-like object fields : list of `ccsdspy.PacketField` Layout of packet fields contained in the definition. 
converters : dict, str to tuple (str, Converter) Dictionary of post-processing conversions. keys are input field names, values are tuples of (output_field_name, Converter instance) decoder_name: {'fixed_length', 'variable_length'} String identifying which decoder to use. include_primary_header: bool If True, provides the primary header in the output reset_file_obj : bool If True, leave the file object, when it is a file buffer, where it was before _load is called. Otherwise, (default), leave the file stream pos after the read packets. Does not apply when file is a string. Returns ------- dictionary mapping field names to NumPy arrays, with key order matching the order fields in the packet. Raises ------ ValueError the decoder_name is not one of the allowed values """ if hasattr(file, "read"): file_pos = file.tell() file_bytes = np.frombuffer(file.read(), "u1") else: file_bytes = np.fromfile(file, "u1") orig_fields = fields if include_primary_header: fields = _prepend_primary_header_fields(fields) fields, expand_history = _expand_array_fields(fields) if decoder_name == "fixed_length": field_arrays = _decode_fixed_length(file_bytes, fields) elif decoder_name == "variable_length": field_arrays = _decode_variable_length(file_bytes, fields) else: raise ValueError( f"Invalid decoder_name 'f{decoder_name}' specified. Must be " "either 'fixed_length', or 'variable_length'" ) field_arrays = _unexpand_field_arrays(field_arrays, expand_history) field_arrays = _apply_post_byte_reoderings(field_arrays, orig_fields) field_arrays = _apply_converters(field_arrays, converters) if hasattr(file, "read") and reset_file_obj: file.seek(file_pos) return field_arrays def _apply_converters(field_arrays, converters): """Apply post-processing converters in place to a dictionary of field arrays. 
Parameters ---------- field_arrays : dict of string to NumPy arrays The decoded packet field arrays without any post-processing applied converters : dict, str to tuple (str, Converter) Dictionary of post-processing conversions. keys are input field names, values are tuples of (output_field_name, Converter instance) Returns ------- converted_field_arrays : dict of string to NumPy arrays The converted decoded packet field arrays, as a dictionary with the same key as the passed `field_arrays`. """ converted = field_arrays.copy() for input_field_names, (output_field_name, converter) in converters.items(): # Collect list of input arrays to pass as *args to converter function input_arrays = [] for input_field_name in input_field_names: input_arrays.append(field_arrays[input_field_name]) # Call converter function converted[output_field_name] = converter.convert(*input_arrays) return converted def _apply_post_byte_reoderings(field_arrays, orig_fields): """Step of load procedure to apply post-processing byte reorderings. A field gets post-processing byte reordering if the attribute `field._byte_order_post` is not None. Parameters ---------- field_arrays : dict of string to NumPy arrays The decoded packet field arrays without any post-processing applied orig_fields : List of PacketField Original fields as specified in the packet, before any replacements which occur in the processing step. Returns ------- Reference to argument field_arrays (object was mutuated). 
""" for field in orig_fields: if field._byte_order_post is None: continue byte_order_string = field._byte_order_post byte_order_ints = [int(digit) for digit in byte_order_string] is_obj_array = field_arrays[field._name].dtype == object if is_obj_array: new_packet_arrays = [] for i, packet_array in enumerate(field_arrays[field._name]): field_arrays[field._name][i] = _do_array_byte_reordering( packet_array, byte_order_ints ) else: field_arrays[field._name] = _do_array_byte_reordering( field_arrays[field._name], byte_order_ints ) return field_arrays def _do_array_byte_reordering(array, byte_order_ints): """Reorder the bytes of an array. Parameters ---------- array : NumPy array May be multidimensional. Dtype of array must not be object. byte_order_ints : list of int Inceces of the bytes in order, e.g., 2314. Returns ------- Array with bytes reordered according to the passed order. """ assert array.dtype != object, "Error in byte reordering, please report a bug:.{array.dtype}" parsed_byte_length = array.itemsize native_byte_length = max(byte_order_ints) array_bytes = array.copy() array_bytes.dtype = np.uint8 array_bytes = array_bytes.reshape((array.size, parsed_byte_length)) digits_zero_idx = [digit - 1 for digit in reversed(byte_order_ints)] select_indeces = [] select_indeces.extend(digits_zero_idx) select_indeces.extend(sorted(set(range(array.itemsize)) - set(digits_zero_idx))) padding = array.itemsize - len(byte_order_ints) reordered = np.zeros_like(array_bytes) for i in range(reordered.shape[0]): reordered[i, :] = array_bytes[i, ::-1][select_indeces] shifted = np.zeros_like(reordered) if padding > 0: shifted[:, padding:] = reordered[:, :-padding] else: shifted[:] = reordered shifted.dtype = array.dtype shifted = shifted.reshape(array.shape) return shifted