#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020-2025 by Murray Altheim. All rights reserved. This file is part
# of the Robot Operating System project, released under the MIT License. Please
# see the LICENSE file included as part of this package.
#
# author: Murray Altheim
# created: 2025-06-12
# modified: 2025-06-12
import serial
import asyncio
from threading import Thread
from concurrent.futures import ThreadPoolExecutor
from colorama import init, Fore, Style
init()
from hardware.payload import Payload
from core.logger import Logger, Level
import time
[docs]
class AsyncUARTManager:
def __init__(self, port='/dev/serial0', baudrate=115200, tx_timeout_ms=25, rx_timeout_ms=25):
self._log = Logger('async-uart-mgr', Level.INFO)
self._port_name = port
self._baudrate = baudrate
self._tx_timeout_s = tx_timeout_ms / 1000
self._rx_timeout_s = rx_timeout_ms / 1000
self._log.info('TX timeout: {}ms; RX timeout: {}ms'.format(tx_timeout_ms, rx_timeout_ms))
self._serial = None
self._log.info('using port {} at {} baud.'.format(port, baudrate))
self._executor = ThreadPoolExecutor(max_workers=1)
# dedicated asyncio loop and thread
self._loop = asyncio.new_event_loop()
self._loop_thread = Thread(target=self._loop.run_forever, daemon=True)
self._loop_thread.start()
self._log.info('ready.')
# Buffer for sync-header-based framing
self._rx_buffer = bytearray()
self._log.info('ready.')
[docs]
def open(self):
if self._serial is None or not self._serial.is_open:
self._serial = serial.Serial(self._port_name, self._baudrate, timeout=self._tx_timeout_s)
self._log.info("serial port {} opened.".format(self._port_name))
[docs]
def close(self):
if self._serial and self._serial.is_open:
self._serial.close()
self._log.info("serial port closed.")
self._executor.shutdown(wait=False)
self._loop.call_soon_threadsafe(self._loop.stop)
self._loop_thread.join()
def _send_packet_sync(self, payload):
packet_bytes = payload.to_bytes()
# ensure sync header is present for robust protocol
if not packet_bytes.startswith(Payload.SYNC_HEADER):
packet_bytes = Payload.SYNC_HEADER + packet_bytes[len(Payload.SYNC_HEADER):]
self._serial.write(packet_bytes)
self._serial.flush()
# self._log.info(Style.DIM + "sent: {}".format(repr(payload)))
[docs]
def send_packet(self, payload):
'''
Synchronous wrapper: schedule async send on background loop.
'''
self._log.debug('send payload.')
future = asyncio.run_coroutine_threadsafe(
self._send_packet_async(payload), self._loop)
return future.result() # wait for completion
async def _send_packet_async(self, payload):
loop = asyncio.get_running_loop()
await loop.run_in_executor(self._executor, self._send_packet_sync, payload)
def _receive_packet_sync(self):
'''
Reads bytes, synchronizes on sync header, and returns the first valid Payload found.
'''
start_time = time.time()
while True:
if self._serial.in_waiting:
data = self._serial.read(self._serial.in_waiting)
# self._log.debug(f"RAW RX BYTES: {data.hex()}")
self._rx_buffer += data
self._log.debug('read {} bytes from serial; buffer size now: {}'.format(len(data), len(self._rx_buffer)))
start_time = time.time()
idx = self._rx_buffer.find(Payload.SYNC_HEADER)
if idx == -1:
# not found: trim buffer if too large
if len(self._rx_buffer) > len(Payload.SYNC_HEADER):
self._rx_buffer = self._rx_buffer[-(len(Payload.SYNC_HEADER)-1):]
time.sleep(0.005)
if time.time() - start_time > self._rx_timeout_s:
self._log.error("UART RX timeout; sync header not found, clearing buffer.")
self._rx_buffer = bytearray()
start_time = time.time()
continue
# found sync header: do we have a full packet?
if len(self._rx_buffer) - idx >= Payload.PACKET_SIZE:
packet = self._rx_buffer[idx: idx + Payload.PACKET_SIZE]
self._rx_buffer = self._rx_buffer[idx + Payload.PACKET_SIZE:]
try:
payload = Payload.from_bytes(packet)
self._log.debug("received: {}".format(repr(payload)))
return payload
except ValueError as e:
self._log.error("receive error: {}. Resyncing...".format(e))
# remove just the first header byte to attempt resync
self._rx_buffer = self._rx_buffer[idx+1:]
continue
else:
# not enough bytes yet for a full packet
time.sleep(0.005)
if time.time() - start_time > self._rx_timeout_s:
self._log.error('UART RX timeout; incomplete packet, clearing buffer.')
self._rx_buffer = bytearray()
start_time = time.time()
continue
[docs]
def receive_packet(self):
'''
Synchronous wrapper: schedule async receive on background loop.
'''
self._log.debug('receive packet.')
future = asyncio.run_coroutine_threadsafe(
self._receive_packet_async(), self._loop)
return future.result()
async def _receive_packet_async(self):
loop = asyncio.get_running_loop()
return await loop.run_in_executor(self._executor, self._receive_packet_sync)
[docs]
def receive_values(self):
'''
Convenience method to receive a Payload and return the tuple (cmd, pfwd, sfwd, paft, saft).
'''
payload = self.receive_packet()
if payload:
return (payload.cmd.decode('ascii'), payload.pfwd, payload.sfwd, payload.paft, payload.saft)
return None
#EOF