Source code for hardware.brushless_motor

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020-2025 by Murray Altheim. All rights reserved. This file is part
# of the Robot Operating System project, released under the MIT License. Please
# see the LICENSE file included as part of this package.
#
# author:   Murray Altheim
# created:  2025-06-02
# modified: 2025-06-09
#
# A controller for a DFRobot Brushless DC Motor, which contains its own internal
# hardware controller and an 'FG' feedback pin permitting closed-loop control.
# Its PWM is inverted: 100% is stopped, 0% is full speed.
#
# This motor controller includes support for open- or closed-loop control,
# stall-and-recovery, deadband control, and control by target RPM when operating
# in closed-loop mode. Given a wheel diameter this also provides for odometric
# distance and speed measurements.
#
# There are three PWM controller implementations: software PWM, hardware PWM,
# and using a TI TLC59711 PWM controller.
#
# Status: note that this file is no longer maintained, and is only currently
# suitable for use with for one motor.

import time
import traceback
import asyncio
import itertools
from enum import Enum
from threading import Thread
from math import pi as π
from datetime import datetime as dt
import pigpio
from colorama import init, Fore, Style
init()

from core.logger import Logger, Level
from hardware.controller_channel import ControllerChannel
from hardware.slew_limiter import SlewLimiter
from hardware.pwm_controller_impl import PWMControllerImpl
from hardware.tlc59711 import TLC59711

# ┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈
[docs] class BrushlessMotor: # defaults: PWM_GPIO_PIN = 18 DIR_GPIO_PIN = 23 FG_ENCODER_PIN = 24 DIRECTION_FORWARD = 1 DIRECTION_REVERSE = 0 WHEEL_DIAMETER_MM = 70 # calibration constant: millimeters traveled per 1% speed per second CALIBRATION_MM_PER_PERCENT_PER_SEC = 5.48 # based on 10 rotations being 2199mm ''' When operating in open loop mode, speeds are percentages between -100 and 100. When operating in closed loop mode, speeds are translated from percentages to RPM values between -159 and 159. Both are called via set_speed(). You can also call set_target_rpm() directly. Note that stall recovery is only enabled in closed loop mode. :param pi: the pigpio instance :param name: the optional motor name :param config: the application configuration (dict) :param pwm_pin: the GPIO pin used for PWM :param dir_pin: the GPIO pin used to set the motor direction :param closed_loop_enabled: if True, operates in closed loop mode. If None is provided, uses configuration instead :param level: the log level ''' def __init__(self, *, pi=None, name='ctrl', config=None, pwm_pin=None, dir_pin=None, closed_loop_enabled=True, pwm_impl: PWMControllerImpl = PWMControllerImpl.HARDWARE_CONTROLLER, tlc_controller: TLC59711 = None, channel: ControllerChannel = None, level=Level.INFO): self._log = Logger('motor-{}'.format(name), level=level) try: if pi is None: raise ValueError('no pi instance provided.') self._pi = pi if config is None: raise ValueError('no configuration provided.') _cfg = config['kros'].get('hardware').get('motor_controller') # configuration ┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈ self._verbose = _cfg.get('verbose', True) _closed_loop_enabled = _cfg.get('closed_loop_enabled', True) self._closed_loop_enabled = _closed_loop_enabled if closed_loop_enabled is None else closed_loop_enabled self._pwm_pin = _cfg.get('pwm_pin', self.PWM_GPIO_PIN) self._dir_pin = _cfg.get('dir_pin', self.DIR_GPIO_PIN) self._pi.set_mode(self._dir_pin, pigpio.OUTPUT) # set motor direction pin as output self._encoder_pin = _cfg.get('encoder_pin', self.FG_ENCODER_PIN) self._pi.set_mode(self._encoder_pin, pigpio.INPUT) self._log.info("configuration pins: " + Style.BRIGHT + 'pwm={}; direction={}; encoder={}'.format(self._pwm_pin, self._dir_pin, self._encoder_pin)) self._pi.set_pull_up_down(self._encoder_pin, pigpio.PUD_OFF) # establish callback from FB on GPIO pin using falling edge detection self._callback = self._pi.callback(self._encoder_pin, pigpio.FALLING_EDGE, self._fg_callback) self._pwm_freq = _cfg.get('pwm_freq', 25000) self._accel_delay = _cfg.get('accel_delay_sec', 0.1) self._enable_ramping = _cfg.get('enable_ramping', True) self._ramp_step = _cfg.get('ramp_step', 20) self._kickstart_speed = _cfg.get('kickstart_speed', 14) # speed threshold to kickstart motor from zero self._feedback_interval = _cfg.get('feedback_interval', 0.05) # seconds between corrections self._kp = _cfg.get('kp', 0.1) self._ki = _cfg.get('ki', 0.0) self._kd = _cfg.get('kd', 0.0) # hardare & geometry ┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈ self._gear_ratio = _cfg.get('gear_ratio', 45) # from motor spec on DFRobot page self._pulses_per_motor_rev = _cfg.get('pulses_per_motor_rev', 6) self._pulses_per_output_rev = self._pulses_per_motor_rev * self._gear_ratio # 270 _wheel_diameter = _cfg.get('wheel_diameter_mm', self.WHEEL_DIAMETER_MM) self._circumference_mm = _wheel_diameter * π # slew limiter ┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈ _max_delta_rpm_per_sec = _cfg.get('max_delta_rpm_per_sec', 120.0) # used for RPM slew limiter self._rpm_limiter = SlewLimiter(max_delta_per_sec=_max_delta_rpm_per_sec) _max_delta_speed_per_sec = _cfg.get('max_delta_speed_per_sec', 100.0) # used for speed slew limiter self._speed_limiter = SlewLimiter(max_delta_per_sec=_max_delta_speed_per_sec) self._slew_limiter_enabled = _cfg.get('slew_limiter_enabled', True) # deadband configuration ┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈ self._stall_grace_period_ms = _cfg.get('stall_grace_period_ms', 400) # or tweak to ~500ms self._stall_timeout_ms = _cfg.get('stall_timeout_ms', 300) # duration with no pulses to declare stall (in ms) self._deadband_rpm = _cfg.get('deadband_rpm', 6) # don't try speed less than this self._fixed_deadband = _cfg.get('fixed_deadband', True) self._dyn_deadband = _cfg.get('dynamic_deadband', 5) # dynamic deadband as percentage # variables ┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈ self._current_speed = 0 # current speed (-100 to 100) self._target_speed = 0 # target speed (-100 to 100) self._start_time = None # when current motion started self._stop_time = None # when motor stopped or last speed change self._enabled = True # open or closed loop support ┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈ self._pulse_count = 0 self._last_tick = None self._pulse_intervals = [] self._max_interval_buffer = 10 # average over last N intervals self._rpm_errors = [] # for calculating PID performance self._feedback_task = None # async task self._direction = self.DIRECTION_FORWARD self._rpm = 0 # odometry ┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈ self._cumulative_distance_mm = 0.0 self._initial_distance = 0.0 self._target_distance = None self._distance_target_reached = False self._distance_future = None # stall & recovery management ┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈ self._target_rpm = 0 # target RPM (used in closed-loop mode) self._is_stalled = False self._last_target_rpm_set_dt = None self._last_tick_dt = None # timestamp of last FG pulse # PWM controller implementation ┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈ self._log.info("using {} PWM controller.".format(pwm_impl.name)) self._pwm_controller = pwm_impl.create( pi=self._pi, config=config, pin=self._pwm_pin, freq=self._pwm_freq, tlc_controller=tlc_controller, channel=channel, level=level ) # callback handlers ┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈ self._on_stall_callback = None self._on_recovery_callback = None if self.closed_loop_enabled: self.set_on_stall(self._handle_stall) self.set_on_recovery(self._handle_recovery) # asyncio event loop in background thread ┈┈┈┈┈┈ self._loop = asyncio.new_event_loop() self._loop, self._loop_thread = self._start_event_loop_thread() # start periodic stall monitor asyncio.run_coroutine_threadsafe(self._stall_monitor(), self._loop) self._log.info('ready.') except Exception as e: self._log.error('{} raised initialising motor controller: {}\n{}'.format(type(e), e, traceback.format_exc())) def _start_event_loop_thread(self): loop = asyncio.new_event_loop() thread = Thread(target=loop.run_forever, daemon=True) thread.start() return loop, thread
[docs] def enable(self): self._enabled = True
[docs] def disable(self): self._enabled = False
# properties ┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈ @property def enabled(self): return self._enabled @property def closed_loop_enabled(self): ''' Returns True if closed loop mode is enabled. ''' return self._closed_loop_enabled @property def measured_rpm(self): ''' Returns the RPM indicated by counting the pulses coming from FG, the motor feedback pin. Returns None if running in open loop mode. ''' if self._rpm is None: self._log.warning("RPM not yet calculated.") return 0.0 # apply direction sign dynamically when reading return -abs(self._rpm) if self._direction == self.DIRECTION_REVERSE else abs(self._rpm) @property def measured_mm_per_sec(self): ''' Returns the current measured speed in mm/sec using the formula: linear speed = wheel circumference * RPM / 60 Returns None if running in open loop mode. ''' return (self._circumference_mm * self.measured_rpm) / 60.0 if self._closed_loop_enabled else None @property def cumulative_distance_mm(self): ''' Return the cumulative distance in millimeters. Returns None if running in open loop mode. ''' return self._cumulative_distance_mm if self._closed_loop_enabled else None # stall & recovery support ┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈
[docs] def set_on_stall(self, callback): self._on_stall_callback = callback
[docs] def set_on_recovery(self, callback): self._on_recovery_callback = callback
def _handle_stall(self): self._log.info("stall detected — attempting recovery…") self._stop_rpm_control() # pause PID temporarily asyncio.run_coroutine_threadsafe(self._attempt_recovery(), self._loop) async def _attempt_recovery(self): if abs(self._target_rpm) < self._deadband_rpm: self._log.warning("recovery aborted — target RPM below minimum threshold.") return self._log.info("attempting stall recovery…") direction = 1 if self._target_rpm >= 0 else -1 ramp_speeds = range(20, 80, 10) # Try 20% to 70% in steps for s in ramp_speeds: speed = direction * s self._log.info(f"Trying recovery speed: {speed}%") await self._apply_pwm_async(speed) await asyncio.sleep(0.3) # if FG pulses resume, _handle_recovery() will be called # so we just exit and let PID resume now = dt.now() if self._last_tick_dt and (now - self._last_tick_dt).total_seconds() < 0.3: self._log.info("Motor response detected during recovery.") return self._log.warning("Recovery ramp failed — no FG detected.") def _handle_recovery(self): self._log.info("motor has recovered and is moving again.") if self._target_rpm > 0.0: # smooth re-ramp to target RPM after recovery asyncio.run_coroutine_threadsafe( self._resume_after_recovery(), self._loop ) async def _resume_after_recovery(self): self._log.info("smoothly resuming PID after recovery.") await self._apply_pwm_async(0) # reset to zero (optional) await asyncio.sleep(0.2) self._set_target_rpm(self._target_rpm) def _check_stall(self): now = dt.now() # cooldown window: ignore stall detection immediately after RPM changes if self._last_target_rpm_set_dt: since_change_ms = (now - self._last_target_rpm_set_dt).total_seconds() * 1000 if since_change_ms < self._stall_grace_period_ms: return # skip checking # determine if we should be moving target_moving = ( (self.closed_loop_enabled and self._target_rpm > 0.0 and abs(self._target_rpm) >= self._deadband_rpm) or (not self.closed_loop_enabled and self._target_speed and abs(self._target_speed) >= 10) ) if not target_moving: self._is_stalled = False return if not hasattr(self, '_last_tick_dt') or self._last_tick_dt is None: return # haven’t seen a pulse yet elapsed_ms = (now - self._last_tick_dt).total_seconds() * 1000 if elapsed_ms > self._stall_timeout_ms: if not self._is_stalled: self._is_stalled = True self._log.warning("STALL DETECTED: no FG pulse in {:.1f}ms.".format(elapsed_ms)) if self._on_stall_callback: self._on_stall_callback() else: if self._is_stalled: self._is_stalled = False self._log.info("RECOVERY DETECTED: FG pulses resumed.") # reset speed and PID state to prevent jerk on recovery self._current_speed = 0 self._reset_pid_state() if self._on_recovery_callback: self._on_recovery_callback() def _reset_pid_state(self): ''' Reset PID control state variables. ''' self._integral_error = 0 self._last_error = 0 async def _stall_monitor(self): while True: self._check_stall() await asyncio.sleep(0.1) # check every 100 ms
[docs] def enable_closed_loop(self, enable=True): self._closed_loop_enabled = enable
[docs] def set_speed(self, speed, target_mm=0): ''' Sets the target speed for both open and closed loop mode. ''' if self.closed_loop_enabled: if speed < -100 or speed > 100: raise ValueError("Percentage must be between -100 and 100") # convert percentage to RPM, where -100% corresponds to -159 RPM and 100% corresponds to 159 RPM rpm = (speed / 100) * 159 self._set_target_rpm(rpm, target_mm=target_mm) else: self._set_open_loop_speed(speed, target_mm=target_mm, poll_delay=0.01)
def _set_target_rpm(self, rpm, target_mm=0): if not self.enabled: self._log.warning('motor controller not enabled.') return elif not self._closed_loop_enabled: raise Exception("closed-loop mode must be enabled for RPM control.") elif rpm is None: raise ValueError("null rpm argument.") elif rpm == 0 or abs(rpm) < self._deadband_rpm: # stop if RPM is zero or within deadband self._log.debug("target RPM ({}) is 0 or below viable threshold.".format(rpm)) self.stop() self._target_rpm = 0.0 return if self._slew_limiter_enabled: limited_rpm = int(self._rpm_limiter.limit(rpm)) if self._verbose: if rpm != limited_rpm: self._log.info("set target RPM: " + Fore.GREEN + "{:>6.2f}".format(rpm) + Fore.CYAN + Style.DIM + " (limited to {:>6.2f})".format(limited_rpm)) else: self._log.info("set target RPM: " + Fore.GREEN + "{:>6.2f}".format(rpm)) rpm = limited_rpm else: self._log.info("set target RPM {}".format(rpm)) # setup target distance tracking if target_mm > 0: self._initial_distance = self.cumulative_distance_mm self._target_distance = self._initial_distance + target_mm else: self._initial_distance = None self._target_distance = None self._distance_target_reached = False self._target_rpm = rpm self._last_target_rpm_set_dt = dt.now() self._log.debug("closed loop target set to: " + Fore.GREEN + Style.DIM + "{} RPM".format(rpm)) # apply kickstart if starting from zero and below threshold if self.measured_rpm == 0 and abs(rpm) < self._kickstart_speed: direction = 1 if rpm >= 0 else -1 kick_speed = direction * self._kickstart_speed asyncio.run_coroutine_threadsafe(self._apply_kickstart(kick_speed), self._loop).result() self._reset_pid_state() time.sleep(0.1) # start feedback coroutine if not already running if self._feedback_task is None: try: self._log.debug("scheduling feedback coroutine…") future = asyncio.run_coroutine_threadsafe(self._run_rpm_feedback(), self._loop) self._feedback_task = future self._log.debug("feedback coroutine started successfully.") except Exception as e: self._log.error("{} raised starting feedback coroutine: {}".format(type(e), e)) async def _apply_kickstart(self, speed): if self._verbose: self._log.info("applying kickstart for low RPM startup…") await self._apply_pwm_async(speed) await asyncio.sleep(0.3) # brief burst async def _run_rpm_feedback(self): if self._verbose: self._log.info(Fore.WHITE + "starting RPM feedback task…") self._integral_error = 0 self._last_error = 0 self._last_target_rpm = None dt = self._feedback_interval max_adjustment = 10 # max PID output adjustment (%) try: counter = itertools.count(1) while abs(self._target_rpm) > 0.0: # Reset integral if target RPM sign changes if self._last_target_rpm is None or (self._last_target_rpm * self._target_rpm < 0): self._integral_error = 0 self._last_target_rpm = self._target_rpm # check distance target if set if self._target_distance is not None and not self._distance_target_reached: if self._check_distance_target_reached(): self._log.info( Fore.MAGENTA + "Target distance {:.1f} mm reached, stopping motor.".format( abs(self._target_distance - self._initial_distance) ) ) self.stop() break adjustment, error = self._compute_pid_adjustment(dt, max_adjustment) new_speed = self._compute_new_speed(adjustment) if self._verbose: if next(counter) % 4 == 0: self._log.info( "PID control target set to: " + Fore.GREEN + "{} RPM ({} measured); ".format(self._target_rpm, int(self.measured_rpm)) + Fore.YELLOW + "distance: {:.1f}mm; ".format(self.cumulative_distance_mm) + Fore.WHITE + "Kp={:.2f}; Ki={:.2f}; Kd={:.2f}; ".format(self._kp, self._ki, self._kd) + Fore.CYAN + Style.DIM + "error={:.2f}, i={:.2f}, adj={:.2f}, new_speed={:.2f}".format( error, self._integral_error, adjustment, new_speed ) ) await self._apply_pwm_async(new_speed) await asyncio.sleep(dt) except asyncio.CancelledError: if self._verbose: self._log.info(Fore.WHITE + Style.DIM + "RPM feedback task cancelled.") def _check_distance_target_reached(self): current = self.cumulative_distance_mm start = self._initial_distance target = self._target_distance if (self._target_rpm > 0 and current >= target) or (self._target_rpm < 0 and current <= target): self._distance_target_reached = True return True return False def _compute_pid_adjustment(self, dt, max_adjustment): error = self._target_rpm - self.measured_rpm self._integral_error += error * dt derivative = (error - self._last_error) / dt if dt > 0 else 0 self._last_error = error adjustment = self._kp * error + self._ki * self._integral_error + self._kd * derivative adjustment = self._clamp(adjustment, -max_adjustment, max_adjustment) if abs(adjustment) > 20: self._log.warning("Huge PID adjustment: {:.2f}".format(adjustment)) return adjustment, error def _compute_new_speed(self, adjustment): new_speed = self._clamp(self._current_speed + adjustment, -100, 100) if self._fixed_deadband: new_speed = self._apply_deadband(new_speed, self._deadband_rpm) else: threshold = max(1, self._dyn_deadband * (self._target_rpm / 100)) new_speed = self._apply_deadband(new_speed, threshold) return new_speed def _clamp(self, value, min_val, max_val): return max(min_val, min(max_val, value)) def _apply_deadband(self, value, threshold): return 0 if abs(value) < threshold else value def _stop_rpm_control(self): if self.closed_loop_enabled and self._feedback_task: self._feedback_task.cancel() self._feedback_task = None self._log.debug("closed-loop RPM control disabled.") def _fg_callback(self, gpio, level, tick): ''' Callback for open or closed loop control. ''' # self._log.info(Style.DIM + "tick: {}".format(tick)) self._pulse_count += 1 self._last_tick_dt = dt.now() # record time of this tick if self._last_tick is not None: interval = pigpio.tickDiff(self._last_tick, tick) # in microseconds self._pulse_intervals.append(interval) if len(self._pulse_intervals) > self._max_interval_buffer: self._pulse_intervals.pop(0) self._calculate_rpm() # update cumulative distance based on direction distance_per_pulse = self._circumference_mm / self._pulses_per_output_rev if self._direction == self.DIRECTION_FORWARD: self._cumulative_distance_mm += distance_per_pulse else: self._cumulative_distance_mm -= distance_per_pulse self._check_stall() self._last_tick = tick def _calculate_rpm(self): if not self._pulse_intervals: self._rpm = 0.0 self._log.debug("no pulse intervals available, RPM set to 0.") return avg_interval_us = sum(self._pulse_intervals) / len(self._pulse_intervals) if avg_interval_us == 0: self._rpm = 0.0 self._log.info("average pulse interval is zero, RPM set to 0.") return pulses_per_minute = 60_000_000 / avg_interval_us output_shaft_rpm = pulses_per_minute / self._pulses_per_output_rev # adjust RPM sign based on current direction if self._direction == self.DIRECTION_REVERSE: output_shaft_rpm = -abs(output_shaft_rpm) else: output_shaft_rpm = abs(output_shaft_rpm) self._rpm = output_shaft_rpm self._log.debug("measured RPM: {:.2f}".format(self._rpm))
[docs] def get_distance_future(self): ''' Used to create blocking return while running the task. ''' return self._distance_future
def _start_loop(self): self._log.info(Fore.GREEN + 'starting asyncio loop…') asyncio.set_event_loop(self._loop) self._loop.run_forever()
[docs] def stop_loop(self): ''' Stops the asyncio loop, to be used upon closing. ''' self._log.info(Fore.YELLOW + 'stopping asyncio loop…') self._loop.stop() self._log.info(Fore.YELLOW + "event loop stopped..")
[docs] def reset_distance(self): ''' Reset odometry tracking to zero. ''' self._start_time = dt.now() self._stop_time = None self._pulse_count = 0 # reset session pulse count for get_distance_mm() self._log.info("distance reset.")
[docs] def get_distance_mm(self): ''' Calculate distance traveled in millimeters based on elapsed time and current speed using the calibration constant. ''' if self.closed_loop_enabled: rotations = self._pulse_count / self._pulses_per_output_rev self._log.info(Style.DIM + "{:.2f} wheel rotations.".format(rotations)) return rotations * self._circumference_mm else: if self._start_time is None: return 0.0 end_time = self._stop_time or dt.now() elapsed = (end_time - self._start_time).total_seconds() # Distance = speed (%) * elapsed time (sec) * calibration constant (mm/%/sec) distance = abs(self._target_speed) * elapsed * self.CALIBRATION_MM_PER_PERCENT_PER_SEC return distance
def _set_open_loop_speed(self, speed, target_mm=0, poll_delay=0.01): ''' Set motor speed (-100 to 100) in open loop. If target_mm > 0, run until distance reached then stop. ''' if not self.enabled: self._log.warning('motor controller not enabled.') return # clamp speed to [-100, 100] speed = max(-100, min(100, speed)) if speed == 0: self._stop_time = dt.now() self._target_speed = 0 self._apply_pwm_sync(0) return if self._slew_limiter_enabled: _limited_speed = self._speed_limiter.limit(speed) if self._verbose: self._log.info("set target speed: " + Fore.GREEN + "{:>6.2f}".format(speed) + Fore.CYAN + Style.DIM + " (limited to {:>6.2f})".format(_limited_speed)) speed = _limited_speed elif self._verbose: self._log.info("set target speed: " + Fore.GREEN + "{:>6.2f}".format(speed)) if target_mm > 0: self.reset_distance() self._target_speed = speed self._start_time = dt.now() self._apply_pwm_sync(speed) self._log.info("speed set to {:+.2f}%, running to target {:.1f}mm".format(speed, target_mm)) # run stop-at-distance coroutine in background self._log.info("scheduling feedback coroutine.") future = asyncio.run_coroutine_threadsafe( self._stop_at_distance_async(target_mm, poll_delay), self._loop ) # holds a concurrent.futures.Future representing the scheduled coroutine. self._distance_future = future else: self._target_speed = speed self._start_time = dt.now() self._apply_pwm_sync(speed) self._distance_future = None self._calculate_rpm() async def _stop_at_distance_async(self, target_mm, poll_delay): self._log.info(Fore.MAGENTA + "monitoring distance to stop at {:.1f}mm".format(target_mm)) try: counter = itertools.count(1) while self.get_distance_mm() < target_mm: if next(counter) % 10 == 0: # print every 10th iteration self._log.info(Fore.MAGENTA + "current distance: {:.1f}mm".format(self.get_distance_mm())) await asyncio.sleep(poll_delay) except Exception as e: self._log.error("{} raised by _stop_at_distance_async: {}".format(type(e), e)) finally: self.stop() self._log.info(Fore.MAGENTA + "target distance {:.1f}mm reached, motor stopped.".format(target_mm)) def _apply_pwm_sync(self, speed): ''' Synchronously ramps motor speed to target, sending PWM signals with delay between steps. ''' if self._enable_ramping: for _speed in self._ramp_speeds(speed): self._send_pwm(_speed) self._log.debug("_apply_pwm_sync() ramping to: {:3.2f} (direction: {})".format(_speed, "FORWARD" if self._direction == self.DIRECTION_FORWARD else "REVERSE")) time.sleep(self._accel_delay) self._log.debug("_apply_pwm_sync() final speed: {:3.2f} (direction: {})".format(_speed, "FORWARD" if self._direction == self.DIRECTION_FORWARD else "REVERSE")) else: self._log.debug("_apply_pwm_sync() speed: {:3.2f} (direction: {})".format(_speed, "FORWARD" if self._direction == self.DIRECTION_FORWARD else "REVERSE")) self._send_pwm(_speed) async def _apply_pwm_async(self, speed): ''' Asynchronously ramps motor speed to target, sending PWM signals with async delays between steps. This is the older version that uses _ramp_speeds(). ''' if self._enable_ramping: async def async_sleep(): await asyncio.sleep(self._accel_delay) for _speed in self._ramp_speeds(speed): self._send_pwm(_speed) self._log.debug("_apply_pwm_async() ramping to: {:3.2f} (direction: {})".format(_speed, "FORWARD" if self._direction == self.DIRECTION_FORWARD else "REVERSE")) await async_sleep() self._log.debug("_apply_pwm_async() final speed: {:3.2f} (direction: {})".format(_speed, "FORWARD" if self._direction == self.DIRECTION_FORWARD else "REVERSE")) else: self._log.debug("_apply_pwm_async() speed: {:3.2f} (direction: {})".format(_speed, "FORWARD" if self._direction == self.DIRECTION_FORWARD else "REVERSE")) self._send_pwm(_speed) def _send_pwm(self, speed): ''' Send PWM and direction to the motor. :param speed: signed percentage of speed (magnitude used for PWM duty) ''' self._direction = self.DIRECTION_FORWARD if speed >= 0 else self.DIRECTION_REVERSE self._current_speed = speed abs_speed = abs(speed) if abs_speed == 0: self._pwm_controller.stop_pwm() self._pi.write(self._dir_pin, self.DIRECTION_FORWARD) # default to forward when stopped if self._verbose: self._log.debug("motor stopped from PWM.") else: self._pi.write(self._dir_pin, self._direction) self._pwm_controller.set_pwm(abs_speed) if self._verbose: self._log.debug("speed set to {:.2f}% (direction: {})".format( speed, ('FORWARD' if self._direction == self.DIRECTION_FORWARD else 'REVERSE'))) def _ramp_speeds(self, speed): ''' Generator yielding intermediate speeds from current to target with slew limiting. :param speed: target speed for ramp ''' speed = self._clamp(speed, -100, 100) if self._ramp_step <= 0: yield speed return delta = speed - self._current_speed if abs(delta) <= self._ramp_step: yield speed return step = self._ramp_step if delta > 0 else -self._ramp_step current = self._current_speed while abs(speed - current) > abs(step): current += step current = self._clamp(current, min(speed, current), max(speed, current)) yield current yield speed
[docs] def accelerate(self, start_speed, end_speed, step_speed, delay=None): ''' Gradually accelerate from start_speed to end_speed, with set delay between steps. ''' if delay is None: delay = self._accel_delay start_speed = max(-100, min(100, start_speed)) end_speed = max(-100, min(100, end_speed)) step_speed = abs(step_speed) if start_speed == end_speed: self.set_speed(start_speed) return self._log.info("ramp speed from {} to {} with step {} and delay {}.".format(start_speed, end_speed, step_speed, delay)) direction = 1 if end_speed > start_speed else -1 current_speed = start_speed while (direction == 1 and current_speed <= end_speed) or (direction == -1 and current_speed >= end_speed): self.set_speed(current_speed) time.sleep(delay) current_speed += direction * step_speed self.set_speed(end_speed)
[docs] def stop(self): ''' Stop motor immediately. ''' self._close_distance_future() self._stop_rpm_control() self._rpm_limiter.reset() self._speed_limiter.reset() self._reset_pid_state() self._target_rpm = 0.0 self._target_speed = 0 self._apply_pwm_sync(0) self._reset_fg_state() self._log.debug("motor stopped.")
def _close_distance_future(self): if self._distance_future and not self._distance_future.done(): self._log.info("cancelling distance monitoring task…") self._distance_future.cancel() self._distance_future = None def _reset_fg_state(self): ''' Reset all FG (feedback) pulse tracking state. ''' self._rpm = 0.0 # reset measured RPM self._last_tick = None self._pulse_intervals.clear() self._pulse_count = 0 # optional, if you track odometry elsewhere
[docs] def close(self): ''' Stop motor and clean up resources. ''' self.stop() self.disable() if self._callback: if self._verbose: self._log.info("cancelling callback…") self._callback.cancel() # cancel all tasks for task in asyncio.all_tasks(self._loop): task.cancel() # stop the event loop self._loop.call_soon_threadsafe(self._loop.stop) self._loop_thread.join() self._pi.write(self._dir_pin, 0) self._pwm_controller.stop_pwm() self._log.info("motor closed.")
#EOF