#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020-2025 by Murray Altheim. All rights reserved. This file is part
# of the Robot Operating System project, released under the MIT License. Please
# see the LICENSE file included as part of this package.
#
# author: Murray Altheim
# created: 2025-06-23
# modified: 2025-06-23
import serial
import time
from colorama import init, Fore, Style
init()
from uart.payload import Payload
from core.logger import Logger, Level
[docs]
class SyncUARTManager:
def __init__(self, port='/dev/serial0', baudrate=115200, tx_timeout_ms=10, rx_timeout_ms=25):
self._log = Logger('sync-uart-mgr', Level.INFO)
self._port_name = port
self._baudrate = baudrate
self._tx_timeout_s = tx_timeout_ms / 1000
self._rx_timeout_s = rx_timeout_ms / 1000
self._log.info('tx timeout: {}ms; rx timeout: {}ms'.format(tx_timeout_ms, rx_timeout_ms))
self._serial = None
self._errors = 0
self._rx_buffer = None # buffer for sync-header-based framing
self._log.info('ready.')
[docs]
def open(self):
if self._serial is None or not self._serial.is_open:
self._rx_buffer = bytearray()
self._serial = serial.Serial(self._port_name, self._baudrate, timeout=self._tx_timeout_s)
self._log.info("serial port {} opened at {} baud.".format(self._port_name, self._baudrate))
[docs]
def close(self):
if self._serial and self._serial.is_open:
self._serial.close()
self._log.info("serial port closed.")
self._serial = None
[docs]
def send_packet(self, payload):
packet_bytes = payload.to_bytes()
# ensure sync header is present for robust protocol
if not packet_bytes.startswith(Payload.SYNC_HEADER):
packet_bytes = Payload.SYNC_HEADER + packet_bytes[len(Payload.SYNC_HEADER):]
self._serial.write(packet_bytes)
self._serial.flush()
# self._log.info(Style.DIM + "sent: {}".format(repr(payload)))
[docs]
def receive_packet(self):
'''
Reads bytes, synchronizes on sync header, and returns the first valid Payload found.
'''
start_time = time.time()
while True:
if self._serial.in_waiting:
data = self._serial.read(self._serial.in_waiting)
self._rx_buffer += data
start_time = time.time()
idx = self._rx_buffer.find(Payload.SYNC_HEADER)
if idx == -1:
# not found: trim buffer if too large
if len(self._rx_buffer) > len(Payload.SYNC_HEADER):
self._rx_buffer = self._rx_buffer[-(len(Payload.SYNC_HEADER)-1):]
# tight loop, no sleep
if time.time() - start_time > self._rx_timeout_s:
self._errors += 1
self._log.error('UART RX timeout error {}; sync header not found, clearing buffer…'.format(self._errors))
self._rx_buffer = bytearray()
start_time = time.time()
continue
# found sync header: do we have a full packet?
if len(self._rx_buffer) - idx >= Payload.PACKET_SIZE:
packet = self._rx_buffer[idx: idx + Payload.PACKET_SIZE]
self._rx_buffer = self._rx_buffer[idx + Payload.PACKET_SIZE:]
try:
payload = Payload.from_bytes(packet)
return payload
except ValueError as e:
self._errors += 1
self._log.error('receive error {}: {}. Resyncing…'.format(self._errors, e))
# remove just the first header byte to attempt resync
self._rx_buffer = self._rx_buffer[idx+1:]
self._errors = 0
continue
else:
# not enough bytes yet for a full packet
# tight loop, no sleep
if time.time() - start_time > self._rx_timeout_s:
self._errors += 1
self._log.error('UART RX timeout error {}; incomplete packet, clearing buffer.'.format(self._errors))
self._rx_buffer = bytearray()
start_time = time.time()
continue
[docs]
def receive_values(self):
'''
Convenience method to receive a Payload and return the tuple (cmd, pfwd, sfwd, paft, saft).
'''
payload = self.receive_packet()
if payload:
return (payload.cmd, payload.pfwd, payload.sfwd, payload.paft, payload.saft)
return None
#EOF