Transport Layer

Provides the TransportLayer class used to establish and maintain bidirectional serial communication with Arduino and Teensy microcontrollers running the ataraxis-transport-layer-mc library over USB / UART interface.

class ataraxis_transport_layer_pc.transport_layer.TransportLayer(port, microcontroller_serial_buffer_size, baudrate, polynomial=np.uint8(7), initial_crc_value=np.uint8(0), final_crc_xor_value=np.uint8(0), *, test_mode=False)

Bases: object

Provides methods for sending and receiving serialized data over the USB and UART communication interfaces.

This class instantiates and manages all library assets used to transcode, validate, and bidirectionally transfer serial data over the target communication interface. Critically, this includes the transmission and reception buffers that are used to temporarily store the outgoing and incoming data payloads. All user-facing class methods interact with the data stored in one of these buffers.

Parameters:
  • port (str) – The name of the serial port to connect to, e.g.: ‘COM3’ or ‘/dev/ttyUSB0’. Use the ‘axtl-ports’ CLI command to discover available port names.

  • microcontroller_serial_buffer_size (int) – The size, in bytes, of the buffer used by the connected microcontroller’s serial communication interface. Usually, this information is available from the microcontroller’s manufacturer (UART / USB controller specification).

  • baudrate (int) – The baudrate to use for communication if the microcontroller uses the UART interface. Should match the value used by the microcontroller. This parameter is ignored when using the USB interface.

  • polynomial (TypeAliasType, default: np.uint8(7)) – The polynomial to use for the generation of the CRC lookup table. The polynomial must be standard (non-reflected / non-reversed).

  • initial_crc_value (TypeAliasType, default: np.uint8(0)) – The value to which the CRC checksum is initialized before calculation.

  • final_crc_xor_value (TypeAliasType, default: np.uint8(0)) – The value with which the CRC checksum is XORed after calculation.

  • test_mode (bool, default: False) – Determines whether the instance uses a pySerial (real) or a StreamMock (mocked) communication interface. This flag is used during testing and should be disabled for all production runtimes.

_opened

Tracks whether the serial communication has been opened (the port has been connected).

_port

Depending on the test_mode flag, stores either a SerialMock or Serial object that provides the serial communication interface.

_crc_processor

Stores the CRCProcessor instance that provides methods for working CRC checksums.

_cobs_processor

Stores the COBSProcessor instance that provides methods for encoding and decoding transmitted payloads.

_start_byte

Stores the byte-value that marks the beginning of transmitted and received packets.

_delimiter_byte

Stores the byte-value that marks the end of transmitted and received packets.

_timeout

Stores the number of microseconds to wait between receiving any two consecutive bytes of a packet.

_max_tx_payload_size

Stores the maximum number of bytes that can be transmitted as a single payload.

_max_rx_payload_size

Stores the maximum number of bytes that can be received from the microcontroller as a single payload.

_min_rx_payload_size

Stores the minimum number of bytes that can be received from the Microcontroller as a single payload.

_postamble_size

Stores the byte-size of the CRC checksum.

_transmission_buffer

The buffer used to stage the data to be sent to the Microcontroller.

_reception_buffer

The buffer used to store the decoded data received from the Microcontroller.

_bytes_in_transmission_buffer

Tracks how many bytes (relative to index 0) of the transmission buffer are currently used to store the payload to be transmitted.

_bytes_in_reception_buffer

Same as _bytes_in_transmission_buffer, but for the reception buffer.

_consumed_bytes

Tracks the number of the last received payload bytes that have been consumed by the read_data() method calls.

_leftover_bytes

A buffer used to preserve any ‘unconsumed’ bytes that were read from the serial port but not used to reconstruct the payload sent from the Microcontroller. This is used to minimize the number of calls to pySerial methods, as they are costly to run.

_accepted_numpy_scalars

Stores numpy types (classes) that can be used as scalar inputs or as ‘dtype’ fields of the numpy arrays that are provided to class methods.

_minimum_packet_size

Stores the minimum number of bytes that can represent a valid packet. This value is used to optimize packet reception logic.

Raises:
  • TypeError – If any of the input arguments are not of the expected type.

  • ValueError – If any of the input arguments have invalid values.

property available: bool

Returns True if enough bytes are available from the serial port to justify attempting to receive a packet.

property bytes_in_reception_buffer: int

Returns the number of payload bytes stored inside the instance’s reception buffer.

property bytes_in_transmission_buffer: int

Returns the number of payload bytes stored inside the instance’s transmission buffer.

read_data(data_object)

Overwrites the input object’s data with the data from the instance’s reception buffer, consuming (discarding) all read bytes.

This method deserializes the objects stored in the reception buffer as a sequence of bytes. Calling this method consumes the read bytes, making it impossible to retrieve the same data from the reception buffer again.

Notes

At this time, the method only works with valid numpy scalars and arrays as well as python dataclasses entirely made out of valid numpy types.

The maximum runtime speed of this method is achieved when reading data as numpy arrays, which is optimized to a single read operation. The minimum runtime speed is achieved by reading dataclasses, as it involves looping over dataclass attributes.

Parameters:

data_object (Any) – An initialized numpy scalar or array object or a python dataclass made entirely out of valid numpy objects. Supported numpy types are: uint8, uint16, uint32, uint64, int8, int16, int32, int64, float32, float64, and bool. Array prototypes have to be 1-dimensional and not empty to be supported.

Return type:

Any

Returns:

The deserialized data object extracted from the instance’s reception buffer.

Raises:
  • TypeError – If the input object is not a supported numpy scalar, numpy array, or python dataclass.

  • ValueError – If the payload stored inside the reception buffer does not have enough unconsumed bytes available to reconstruct the requested object. If the input object is a multidimensional or empty numpy array.

receive_data()

Receives a data packet from the communication interface, verifies its integrity, and decodes its payload into the instance’s reception buffer.

Notes

Before attempting to receive the packet, the method uses the available property to check whether the communication interface is likely to store a well-formed packet. It is safe to call this method cyclically (as part of a loop) until a packet is received.

This method resets the instance’s reception buffer before attempting to receive the data, discarding any potentially unprocessed data.

Return type:

bool

Returns:

True if the packet was successfully received and unpacked and False if the communication interface does not contain enough bytes to justify processing the packet.

Raises:

RuntimeError – If the method runs into an error while receiving or processing the packet’s data.

property reception_buffer: ndarray[tuple[Any, ...], dtype[uint8]]

Returns a copy of the reception buffer array.

This buffer stores the decoded data received from the Microcontroller. Use this method to safely access the contents of the buffer.

reset_reception_buffer()

Resets the instance’s reception buffer, discarding any stored data.

Return type:

None

reset_transmission_buffer()

Resets the instance’s transmission buffer, discarding any stored data.

Return type:

None

send_data()

Packages the data inside the instance’s transmission buffer into a serialized packet and transmits it over the communication interface.

Return type:

None

Notes

This method resets the instance’s transmission buffer after transmitting the data, discarding any data stored inside the buffer.

property transmission_buffer: ndarray[tuple[Any, ...], dtype[uint8]]

Returns a copy of the transmission buffer array.

This buffer stores the ‘staged’ data to be sent to the Microcontroller. Use this method to safely access the contents of the buffer.

write_data(data_object)

Serializes and writes the input object’s data to the end of the payload stored in the instance’s transmission buffer.

Notes

At this time, the method only works with numpy scalars and arrays, as well as python dataclasses entirely made out of valid numpy types.

The maximum runtime speed for this method is achieved when writing data as numpy arrays, which is optimized to a single write operation. The minimum runtime speed is achieved by writing dataclasses, as it involves looping over dataclass attributes. When writing dataclasses, all attributes are serialized and written as a consecutive data block.

Parameters:

data_object (Any) – A numpy scalar or array object or a python dataclass made entirely out of valid numpy objects. Supported numpy types are: uint8, uint16, uint32, uint64, int8, int16, int32, int64, float32, float64, and bool. Arrays have to be 1-dimensional and not empty to be supported.

Raises:
  • TypeError – If the input object is not a supported numpy scalar, numpy array, or python dataclass.

  • ValueError – If the transmission buffer does not have enough space to accommodate the written object’s data. If the input object is a multidimensional or empty numpy array.

Return type:

None

class ataraxis_transport_layer_pc.transport_layer.TransportLayerStatus(*values)

Bases: IntEnum

Defines the status codes used by the TransportLayer class to communicate the state of various processing steps between the JIT-compiled methods and the user-facing API methods.

DELIMITER_FOUND_TOO_EARLY = 6

Delimiter byte value encountered before reaching the end of the encoded payload data block. It is expected that the last byte of the encoded payload is set to the delimiter value and that the value is not present anywhere else inside the encoded payload region. Encountering the delimiter early indicates packet corruption.

DELIMITER_NOT_FOUND = 7

Delimiter byte value not encountered at the end of the encoded payload data block. See code 104 description for more details, but this code also indicates packet corruption.

EMPTY_ARRAY_ERROR = -3

The data to be written or the prototype to be read is an empty NumPy array.

INSUFFICIENT_BUFFER_SPACE_ERROR = -1

The reception or transmission buffer does not have enough space for the requested operation.

MULTIDIMENSIONAL_ARRAY_ERROR = -2

The data to be written or the prototype to be read are not a one-dimensional NumPy array.

NOT_ENOUGH_CRC_BYTES = 3

Not enough bytes read to fully parse the packet. The packet payload was successfully parsed, but there were not enough bytes to fully parse the CRC postamble.

NOT_ENOUGH_PACKET_BYTES = 2

Not enough bytes read to fully parse the packet. The packet size was resolved, but there were not enough bytes to fully parse the packet (encoded payload + crc postamble).

NO_BYTES_TO_READ = 4

No start byte found, which is interpreted as ‘no bytes to read,’ as the class is configured to ignore start byte errors. Usually, this situation is caused by communication line noise generating ‘noise bytes’.

PACKET_PARSED = 1

Packet fully parsed.

PACKET_SIZE_UNKNOWN = 0

Not enough bytes read to fully parse the packet. The start byte was found, but packet size has not been resolved and, therefore, not known.

PAYLOAD_SIZE_MISMATCH = 5

Parsed payload_size value does not match the expected value. This likely indicates packet corruption or communication parameter mismatch between the TransportLayer instance and the connected Microcontroller.

ataraxis_transport_layer_pc.transport_layer.list_available_ports()

Provides the information about each serial port addressable through the pySerial library.

This function is intended to be used for discovering and selecting the serial port ‘names’ to use with TransportLayer instances.

Return type:

tuple[ListPortInfo, ...]

Returns:

A tuple of ListPortInfo instances, each storing ID and descriptive information about each discovered serial port.

ataraxis_transport_layer_pc.transport_layer.print_available_ports()

Prints all serial ports active on the host-system with descriptive information about the device connected to that port to the terminal.

This command is intended to be used for discovering the USB ports that can be connected to by a TransportLayer class instance.

Return type:

None

Helper Modules

Provides the low-level helper classes that support the runtime of TransportLayer class methods.

class ataraxis_transport_layer_pc.helper_modules.COBSProcessor

Bases: object

Exposes the API for encoding and decoding data using the Consistent Overhead Byte Stuffing (COBS) scheme.

This class wraps a JIT-compiled COBS processor implementation, combining the convenience of a pure-python API with the speed of the C-compiled processing code.

Notes

This class is intended to be used by the TransportLayer class and should not be used directly by the end-users. It makes specific assumptions about the layout and contents of the processed data buffers that are not verified during runtime and must be enforced through the use of the TransportLayer class.

_processor

Stores the jit-compiled _COBSProcessor instance, which carries out all computations.

decode_payload(packet)

Decodes the COBS-encoded payload from the input packet.

Expects the input packets to adhere to the following structure: [Overhead] … [COBS Encoded Payload] … [Delimiter].

Parameters:

packet (ndarray[tuple[Any, ...], dtype[uint8]]) – The COBS-encoded packet from which to decode the payload.

Return type:

ndarray[tuple[Any, ...], dtype[uint8]]

Returns:

The payload decoded from the packet.

Raises:

ValueError – If the decoding fails, indicating uncaught packet corruption.

encode_payload(payload)

Encodes the input payload into a transmittable packet using COBS scheme.

The encoding produces the following packet structure: [Overhead] … [COBS Encoded Payload] … [Delimiter].

Parameters:

payload (ndarray[tuple[Any, ...], dtype[uint8]]) – The payload to be encoded using the COBS scheme.

Return type:

ndarray[tuple[Any, ...], dtype[uint8]]

Returns:

The serialized packet encoded using the COBS scheme.

property processor: _COBSProcessor

Returns the jit-compiled COBS processor class instance.

This accessor allows external methods to directly interface with the JIT-compiled class, bypassing the Python wrapper.

class ataraxis_transport_layer_pc.helper_modules.CRCProcessor(polynomial, initial_crc_value, final_xor_value)

Bases: object

Exposes the API for working with Cyclic Redundancy Check (CRC) checksums used to verify the integrity of transferred data packets.

This class wraps a JIT-compiled CRC processor implementation, combining the convenience of a pure-python API with the speed of the C-compiled processing code.

Notes

This class is intended to be used by the TransportLayer class and should not be used directly by the end-users. It makes specific assumptions about the layout and contents of the processed data buffers that are not verified during runtime and must be enforced through the use of the TransportLayer class.

Parameters:
  • polynomial (TypeAliasType) – The polynomial to use for the generation of the CRC lookup table. The polynomial must be standard (non-reflected / non-reversed).

  • initial_crc_value (TypeAliasType) – The value to which the CRC checksum is initialized before calculation.

  • final_xor_value (TypeAliasType) – The value with which the CRC checksum is XORed after calculation.

_processor

Stores the jit-compiled _CRCProcessor instance, which carries out all computations.

Raises:

TypeError – If class initialization arguments are not of the valid type.

calculate_checksum(buffer, check)

Calculates the checksum for the data stored in the input buffer.

Depending on configuration, this method can be used to either generate and write the CRC checksum to the end of the packet or to verify the integrity of the incoming packet using its checksum postamble.

Parameters:
  • buffer (ndarray[tuple[Any, ...], dtype[uint8]]) – The buffer that contains the COBS-encoded packet for which to resolve the checksum. The buffer must include the space for the CRC checksum at the end of the packet.

  • check (bool) – Determines whether the method is called to verify the incoming packet’s data integrity or to generate and write the CRC checksum to the outgoing packet’s postamble section.

Return type:

uint16

Returns:

The calculated numpy uint8, uint16, or uint32 integer CRC checksum value.

Raises:

ValueError – If the method is unable to verify the incoming packet’s data integrity.

property crc_byte_length: uint8

Returns the byte-size used by the CRC checksums.

property crc_table: ndarray[tuple[Any, ...], dtype[CRCType]]

Returns the CRC checksum lookup table.

property final_xor_value: CRCType

Returns the final XOR value used for checksum calculation.

property initial_crc_value: CRCType

Returns the initial value used for checksum calculation.

property polynomial: CRCType

Returns the polynomial used for checksum calculation.

property processor: _CRCProcessor

Returns the jit-compiled CRC processor class instance.

This accessor allows external methods to directly interface with the JIT-compiled class, bypassing the Python wrapper.

type ataraxis_transport_layer_pc.helper_modules.CRCType = uint8 | uint16 | uint32
class ataraxis_transport_layer_pc.helper_modules.SerialMock

Bases: object

Mocks the behavior of the PySerial’s Serial class for testing purposes.

This class provides a mock implementation of the Serial class, enabling unit tests for the TransportLayer class without a hardware connection. It replicates the core functionalities of the PySerial’s Serial class that are relevant for testing, such as reading and writing data.

is_open

A flag indicating if the mock serial port is open.

tx_buffer

A byte buffer that stores transmitted data.

rx_buffer

A byte buffer that stores received data.

close()

Closes the mock serial port, setting is_open to False.

Return type:

None

property in_waiting: int

Returns the number of bytes stored in the rx_buffer.

open()

Opens the mock serial port, setting is_open to True.

Return type:

None

property out_waiting: int

Returns the number of bytes stored in the tx_buffer.

read(size=1)

Reads a specified number of bytes from the rx_buffer.

Parameters:

size (int, default: 1) – The number of bytes to read from the input buffer.

Return type:

bytes

Returns:

A bytes’ object containing the requested data from the rx_buffer.

Raises:

RuntimeError – If the mock serial port is not open.

reset_input_buffer()

Clears the rx_buffer attribute.

Raises:

RuntimeError – If the mock serial port is not open.

Return type:

None

reset_output_buffer()

Clears the tx_buffer attribute.

Raises:

RuntimeError – If the mock serial port is not open.

Return type:

None

write(data)

Writes data to the tx_buffer.

Parameters:

data (bytes) – The serialized data to be written to the output buffer.

Raises:
  • TypeError – If data is not a bytes’ object.

  • RuntimeError – If the mock serial port is not open.

Return type:

None