Overview

This project uses Python and MicroPython for control of up to four DFRobot 12V DC brushless motors (model number FIT0441). It is designed for use with a Raspberry Pi but could be adapted to any hardware with similar capabilities.

The DFRobot brushless motors require a Pulse-Width Modified (PWM) signal to control their speed, in an inverted mode: when fed a 100% duty cycle the motor is stopped, a 50% duty cycle is half speed, a 0% duty cycle is full speed.

Because the Raspberry Pi is a user-space OS rather than an RTOS, its software PWM is not particularly stable, as it is influenced by system load. The Pi has only two channels of hardware PWM available. Therefore, if you are trying to control more than two motors you’ll need either an external PWM board or connect to a microcontroller over a UART. In this latter mode the project supports either an RP2040 or an STM32 (i.e., the STM32H562, though it could with some pin configuration changes be used with others).

Installation

The Python files for the project are located in the root directory, core, hardware and uart directories. The upy directory contains the contents to be uploaded to the microcontroller.

Also, depending on your implementation choices you will be able to trim unused files.

Pinout

Here are the pin configurations for connecting a Raspberry Pi to a STM32H562.

UART Pins

This uses a custom build of MicroPython for the STM32H562, whose board definition can be found at mp-weact-stm32h562. The pins used for this implementation are defined in the board definition.

The STM32H562 build defines UARTs 1-3. UART 1 is discouraged as it is used to print console communications over /dev/serial0 when connected to the board (e.g., over rshell). UART 4’s pins conflict with the SD card so support for it as not built into MicroPython for the STM32H562. Therefore, only UART 2 or UART 3 are suitable: UART 2 is configured as the default.

UART

TX

RX

UART1

PA9

PA10

UART2

PA2

PA3

Note

You cannot specify tx/rx pins in MicroPython’s UART() constructor; it will use the defined pins for each.

The primary UART on the Raspberry Pi is GPIO 14 (TX) and GPIO 15 (RX). Using UART 2, GPIO 14 is therefore connected to PA3, GPIO 15 to PA2.

If using a NeoPixel or NeoPixel strip, it is connected to PA1. This is currently hard-coded in the upy/pixel.py class.

Device

Pin

NeoPixel

PA1

Motor Pins

Apart from power, there are three connections to each motor: a PWM pin used for speed control; a direction pin that sets the motor direction; and an encoder feedback pin whose ticks indicate motor rotation.

Hardware Timers on the STM32 have four channels. We use all four channels of a Timer to supply our PWM signals.

Id

Name

PWM Ch

PWM Pin

Dir Pin

Enc Pin

Enc Ch

0

M0

1

PB6

B12

PC6

1

M1

2

PB7

B14

PC7

2

M2

3

PB8

B3

PB0

3

M3

4

PB9

B5

PB1

All four motors use the same PWM Timer (4).

Modules

This project documentation intermixes the CPython and MicroPython code indescriminately. To differentiate, check the upy directory, which contains all the MicroPython code.

class core.component.Component(logger, suppressed=True, enabled=False)[source]

Bases: object

A basic component providing support for enable or disable, suppress or release, and close flags. The enable/disable and suppress/release differ in that in a disabled state a Component does not operate at all, whereas in a suppressed state it operates as normal but cannot send or receive messages. This functionality is provided solely as flags, not enforced by this class.

The Logger is passed in as an argument on the constructor. This is only implicitly an abstract class, (not explicitly) because while we expect it to be subclassed, but there is no reason to enforce an API or reimplement methods unless to hook additional functionality to them.

The Component is suppressed and disabled by default, though via optional constructor arguments either can set be set to True.

All Components are automatically added to the ComponentRegistry, which is an alternative means of gaining access to them within the application, by name.

Parameters:

logger – the Logger used for the Component

property classname

Return the name of this Component’s class.

close()[source]

Permanently close and disable the Component. This returns a True value to force currency.

property closed

Returns True if this Component is closed.

disable()[source]

Disable this Component. This returns a True value to force currency.

property disabled

Return the disabled state of this Component. This is a convenience method.

enable()[source]

Enable this Component.

property enabled

Return the enabled state of this Component.

property is_active

A convenience method that returns True if this Component is enabled and released (i.e., not suppressed).

release()[source]

Releases (un-suppresses) this Component.

suppress()[source]

Suppresses this Component.

property suppressed

Return True if this Component is suppressed.

class core.component.ComponentRegistry(level)[source]

Bases: object

Maintains a registry of all Components, in the order in which they were created.

add(name, component)[source]

Add a component to the registry using a unique name, raising a ConfigurationError if a like-named component already exists in the registry.

get(name)[source]

Return the component by name.

get_registry()[source]

Return the backing registry as a dict.

print_registry()[source]

Print the registry to the log.

exception core.component.ConfigurationError(message)[source]

Bases: RuntimeError

This exception is thrown when any error due to configuration occurs.

exception core.component.IllegalStateError(message)[source]

Bases: RuntimeError

Signals that a method has been invoked at an illegal or inappropriate time.

exception core.component.MissingComponentError(message)[source]

Bases: Exception

Thrown when a required component is not available.

class core.config_loader.ConfigLoader(level=Level.INFO)[source]

Bases: object

A loader for a YAML configuration file.

configure(filename='config.yaml')[source]

Read and return configuration from the specified YAML file.

Pretty-prints the configuration object if the log level is set to DEBUG.

Parameters:

filename – the optional name of the YAML file to load. Default: config.yaml

export(config, filename='.config.yaml', comments=None)[source]

Export the provided YAML configuration to a file.

The optional list of comments are written to the beginning of the file.

class core.logger.Level(value)[source]

Bases: Enum

CRITICAL = 50
DEBUG = 10
ERROR = 40
INFO = 20
WARN = 30
static from_string(label)[source]
class core.logger.Logger(name=None, level=None)[source]

Bases: object

close()[source]

Closes down logging, and informs the logging system to perform an orderly shutdown by flushing and closing all handlers. This should be called at application exit and no further use of the logging system should be made after this call.

critical(message)[source]

Prints a critical or otherwise application-fatal message.

debug(message)[source]

Prints a debug message.

The optional ‘end’ argument is for special circumstances where a different end-of-line is desired.

error(message)[source]

Prints an error message.

The optional ‘end’ argument is for special circumstances where a different end-of-line is desired.

file(message)[source]

This is just info() but without any formatting.

info(message)[source]

Prints an informational message.

The optional ‘end’ argument is for special circumstances where a different end-of-line is desired.

is_at_least(level)[source]

Returns True if the current log level is less than or equals the argument. E.g.,

if self._log.is_at_least(Level.WARN):

# returns True for WARN or ERROR or CRITICAL

property level

Return the level of this logger.

property name

Return the name of this Logger.

notice(message)[source]

Functionally identical to info() except it prints the message brighter.

The optional ‘end’ argument is for special circumstances where a different end-of-line is desired.

release()[source]

Releases (un-suppresses) all log messages except critical errors and log-to-file messages. This is global across all Loggers.

suppress()[source]

Suppresses all log messages except critical errors and log-to-file messages. This is global across all Loggers.

property suppressed

Return True if this logger has been suppressed.

warning(message)[source]

Prints a warning message.

The optional ‘end’ argument is for special circumstances where a different end-of-line is desired.

class core.util.Util[source]

Bases: object

A collection of static utility methods.

static already_running(process_name)[source]

Returns true if there is already an instance of a process running.

This is a static method so other processes can check prior to starting a process rather than dealing with the RuntimeException thrown.

This parses the output of ‘ps –no-headers -f -C python3’ to see if a different instance of this script is already running:

UID PID PPID C STIME TTY TIME CMD pi 4186 1058 3 13:04 pts/0 00:00:31 python3 monitor_test.py pi 5985 1058 95 13:18 pts/0 00:00:00 python3 monitor_exec.py

static clip(value, min_value, max_value)[source]

A replacement for numpy’s clip():

_value = numpy.clip(target_value, _min, _max)

static ellipsis(string, max_length)[source]

Repeat ‘string’ a given number of times.

static export_configuration(log, config, filepath)[source]

Dump the configuration to a JSON file.

:param logger the logger to capture the result :param config the configuration dict to be serialised to JSON :param filepath the target file path

static frange(start=0, stop=1, jump=0.1)[source]
static get_class_name_of_method(method)[source]
static get_formatted_time(label, value)[source]
static get_formatted_value(value)[source]
static get_timestamp()[source]

Return an ISO UTC timestamp.

static import_configuration(log, filepath)[source]

Read configuration from a JSON file.

:param logger the logger to capture the result :param filepath the source file path

static is_true(value)[source]

Returns True if the value is a 1, a “1”, “y”, “yes”, or “true” (with case-insensitive matching).

static list_methods(cls)[source]

Print the methods of the provided class.

static remap_range(value, in_min, in_max, out_min, out_max)[source]

Remaps a value in the input range to the same ratio’d value in the output range.

static repeat(string, number)[source]

Repeat ‘string’ a given number of times.

static to_bin(decimal)[source]
static to_bin_v2(x)[source]
static to_decimal(binary)[source]
class hardware.brushless_motor.BrushlessMotor(*, pi=None, name='ctrl', config=None, pwm_pin=None, dir_pin=None, closed_loop_enabled=True, pwm_impl: PWMControllerImpl = PWMControllerImpl.HARDWARE_CONTROLLER, tlc_controller: TLC59711 = None, channel: ControllerChannel = None, level=Level.INFO)[source]

Bases: object

CALIBRATION_MM_PER_PERCENT_PER_SEC = 5.48

When operating in open loop mode, speeds are percentages between -100 and 100. When operating in closed loop mode, speeds are translated from percentages to RPM values between -159 and 159.

Both are called via set_speed(). You can also call set_target_rpm() directly.

Note that stall recovery is only enabled in closed loop mode.

Parameters:
  • pi – the pigpio instance

  • name – the optional motor name

  • config – the application configuration (dict)

  • pwm_pin – the GPIO pin used for PWM

  • dir_pin – the GPIO pin used to set the motor direction

  • closed_loop_enabled – if True, operates in closed loop mode. If None is provided, uses configuration instead

  • level – the log level

DIRECTION_FORWARD = 1
DIRECTION_REVERSE = 0
DIR_GPIO_PIN = 23
FG_ENCODER_PIN = 24
PWM_GPIO_PIN = 18
WHEEL_DIAMETER_MM = 70
accelerate(start_speed, end_speed, step_speed, delay=None)[source]

Gradually accelerate from start_speed to end_speed, with set delay between steps.

close()[source]

Stop motor and clean up resources.

property closed_loop_enabled

Returns True if closed loop mode is enabled.

property cumulative_distance_mm

Return the cumulative distance in millimeters. Returns None if running in open loop mode.

disable()[source]
enable()[source]
enable_closed_loop(enable=True)[source]
property enabled
get_distance_future()[source]

Used to create blocking return while running the task.

get_distance_mm()[source]

Calculate distance traveled in millimeters based on elapsed time and current speed using the calibration constant.

property measured_mm_per_sec

Returns the current measured speed in mm/sec using the formula:

linear speed = wheel circumference * RPM / 60

Returns None if running in open loop mode.

property measured_rpm

Returns the RPM indicated by counting the pulses coming from FG, the motor feedback pin. Returns None if running in open loop mode.

reset_distance()[source]

Reset odometry tracking to zero.

set_on_recovery(callback)[source]
set_on_stall(callback)[source]
set_speed(speed, target_mm=0)[source]

Sets the target speed for both open and closed loop mode.

stop()[source]

Stop motor immediately.

stop_loop()[source]

Stops the asyncio loop, to be used upon closing.

class hardware.controller_channel.ControllerChannel(value)[source]

Bases: Enum

An enumeration of the 12 channels of the TLC59711 PWM controller.

CHANNEL_0 = ('R0', 3)
CHANNEL_1 = ('R1', 2)
CHANNEL_2 = ('R2', 1)
CHANNEL_3 = ('R3', 0)
property channel

Return the integer value of the enum member.

static from_string(pin_name: str) ControllerChannel[source]

Convert a string pin name (e.g., ‘R0’) to a ControllerChannel enum member.

property pin

Return the pin name on the TCL59711.

class hardware.digital_pot_async.DigitalPotentiometer(fps=30, multiplier=100.0, level=Level.INFO)[source]

Bases: object

BRIGHTNESS = 0.5
I2C_ADDR = 12
PERIOD = 510
PIN_BLUE = 2
PIN_GREEN = 7
PIN_RED = 1
POT_ENC_A = 12
POT_ENC_B = 3
POT_ENC_C = 11
property analog
property data
property normalised_value

Return a normalised int value between -100 and 100, or the multiplier if changed in the constructor.

off()[source]
property rgb
start()[source]
stop()[source]
property value

Return the analog value (voltage) from the ADC pin.

class hardware.payload.Payload(cmd: str, pfwd: float, sfwd: float, paft: float, saft: float)[source]

Bases: object

PACKET_SIZE = 19
PACK_FORMAT = '<2s4f'
static calculate_crc8(data: bytes, poly=7, init=0) int[source]
classmethod from_bytes(packet: bytes)[source]
to_bytes() bytes[source]
class hardware.pwm_controller.HardwarePWMController(pi, pwm_pin, pwm_freq, level=Level.INFO)[source]

Bases: PWMController

A hardware PWM controller, using one of the Raspberry Pi specific hardware PWM pins.

set_pwm(speed_percent)[source]
stop_pwm()[source]
class hardware.pwm_controller.PWMController[source]

Bases: ABC

Abstract base class for motor PWM controllers. All PWM controllers must implement set_pwm and stop_pwm.

FULL_SPEED = 0
STOPPED = 1000000
abstractmethod set_pwm(speed_percent)[source]
abstractmethod stop_pwm()[source]
class hardware.pwm_controller.SoftwarePWMController(pi, pwm_pin, pwm_freq, level=Level.INFO)[source]

Bases: PWMController

A software PWM controller, using one of the Raspberry Pi GPIO pins.

set_pwm(speed_percent)[source]
stop_pwm()[source]
class hardware.pwm_controller_impl.PWMControllerImpl(value)[source]

Bases: Enum

HARDWARE_CONTROLLER = 'hardware'
SOFTWARE_CONTROLLER = 'software'
TLC59711_CONTROLLER = 'tlc59711'

An enumeration of the implementing classes of the PWMController API, with a factory method and a utility to return the member matching its name.

create(*, pi: Any | None, config: dict | None, pin: int | None, freq: int | None, tlc_controller: TLC59711 | None = None, channel: ControllerChannel | None = None, level: Any = None)[source]
static from_string(name: str) PWMControllerImpl[source]

Convert a string representation to a PWMControllerImpl enum member.

class hardware.rotary_encoder.RotaryEncoder(i2c_addr=11, multiplier=1, brightness=0.5)[source]

Bases: object

I2C_ADDR = 15
PIN_BLUE = 2
PIN_GREEN = 7
PIN_RED = 1
POT_ENC_A = 12
POT_ENC_B = 3
POT_ENC_C = 11
static bounded_rollover(value, limit)[source]
off()[source]
start()[source]
update()[source]
class hardware.slew_limiter.SlewLimiter(max_delta_per_sec, safe_threshold=10)[source]

Bases: object

A slew limiter for the BrushlessMotor, used to limit the rate of change of the motor’s target speed, preventing sudden, abrupt changes that could be harmful to the motor and ensuring the changes to the motor’s speed are gradual, more elegant, rather than instantaneous.

limit(value)[source]

Returns the limited value based on the time passed since the last call,

reset(value=None)[source]

Resets the limiter to a specific value (or clears it).

class hardware.stm32_pwm_controller.STM32PWMController(pi, config: dict, level=Level.INFO)[source]

Bases: PWMController

Implements the PWMController API using an STM32F405 Pyboard for motor control.

set_pwm(speed)[source]

Set the motor speed by adjusting the PWM duty cycle. Note that direction is not indicated by a negative number but changing the value of the direction pin, therefore it’s not handled here.

Parameters:

speed – target speed as a percentage (0-100)

stop()[source]

Stop the motor by turning off the PWM signal.

stop_pwm()[source]

Stop the PWM output by setting all RGB values to zero (off).

class hardware.tlc59711.TLC59711(spi: spidev.SpiDev)[source]

Bases: object

Parameters:

spi – an instance of the SPI bus connected to the chip.

chip_set_BCData(chip_index: int, bcr: int = 127, bcg: int = 127, bcb: int = 127) None[source]

Set BC-Data.

set_chipheader_bits_in_buffer(*, chip_index: int = 0, part_bit_offset: int = 0, field: Dict[str, int] | None = None, value: int = 0) None[source]

Set chip header bits in buffer.

set_pixel_16bit_value(pixel_index: int, value_r: int, value_g: int, value_b: int) None[source]

Set the value for pixel.

stop()[source]
class hardware.tlc59711_pwm_controller.TLC59711PWMController(pi, config: dict, pwm_controller: TLC59711, channel: ControllerChannel, level=Level.INFO)[source]

Bases: PWMController

Implements the PWMController API using the Adafruit 12-channel 16-bit PWM controller board with the TLC59711 chip for motor control.

set_pwm(speed)[source]

Set the motor speed by adjusting the PWM duty cycle. Note that direction is not indicated by a negative number but changing the value of the direction pin, therefore it’s not handled here.

Parameters:

speed – target speed as a percentage (0-100)

stop()[source]

Stop the motor by turning off the PWM signal.

stop_pwm()[source]

Stop the PWM output by setting all RGB values to zero (off).

class hardware.value_provider.DigitalPotSpeedProvider(multiplier=100.0)[source]

Bases: ValueProvider

close()[source]
class hardware.value_provider.RotaryEncoderCommandProvider[source]

Bases: ValueProvider

close()[source]
class hardware.value_provider.ValueProvider[source]

Bases: object

off()[source]
class uart.async_uart_manager.AsyncUARTManager(port='/dev/serial0', baudrate=115200, tx_timeout_ms=25, rx_timeout_ms=25)[source]

Bases: object

close()[source]
open()[source]
receive_packet()[source]

Synchronous wrapper: schedule async receive on background loop.

receive_values()[source]

Convenience method to receive a Payload and return the tuple (cmd, pfwd, sfwd, paft, saft).

send_packet(payload)[source]

Synchronous wrapper: schedule async send on background loop.

class uart.payload.Payload(cmd, pfwd, sfwd, paft, saft)[source]

Bases: object

CRC_SIZE = 1
PACKET_SIZE = 21
PACK_FORMAT = '<2sffff'
PAYLOAD_SIZE = 18
SYNC_HEADER = b'zz'
static calculate_crc8(data: bytes) int[source]
property cmd
classmethod from_bytes(packet)[source]
property paft
property pfwd
property saft
property sfwd
to_bytes()[source]

A convenience method that calls __bytes__(). This is probably safer as in certain cases __bytes__() doesn’t get triggered correctly in MicroPython.

class uart.sync_uart_manager.SyncUARTManager(port='/dev/serial0', baudrate=115200, tx_timeout_ms=10, rx_timeout_ms=25)[source]

Bases: object

close()[source]
open()[source]
receive_packet()[source]

Reads bytes, synchronizes on sync header, and returns the first valid Payload found.

receive_values()[source]

Convenience method to receive a Payload and return the tuple (cmd, pfwd, sfwd, paft, saft).

send_packet(payload)[source]
class uart.uart_master.UARTMaster(port='/dev/ttyAMA0', baudrate=115200)[source]

Bases: object

ERROR_PAYLOAD = Payload(cmd=ER, pfwd=-1.0, sfwd=-1.0, paft=-1.0, saft=-1.0)

Uses UART 4 on port /dev/ttyAMA0 as the default.

receive_payload()[source]

Receive a Payload object.

run(command_source: Callable[[], int] | None = None, speed_source: Callable[[], int] | None = None, delay_sec=0)[source]

Main loop for communication with elapsed time measurement. This is currently used for testing but could easily be modified for continuous use.

send_payload(payload)[source]

Send a Payload object after converting it to bytes.

send_receive_payload(payload)[source]

Accept a Payload, send it, then wait for the response and return the Payload result. This method can be used without needing to run the full loop. If an error occurs this returns the ERROR_PAYLOAD.