Source code for hardware.digital_pot_async

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020-2024 by Murray Altheim. All rights reserved. This file is part
# of the Robot Operating System project, released under the MIT License. Please
# see the LICENSE file included as part of this package.
#
# author:   Murray Altheim
# created:  2020-09-19
# modified: 2025-06-09
#
# A simplified, asynchronous version of the DigitalPotentiometer class, used
# for testing.
#

import time
import asyncio
import traceback
import threading
import colorsys
from math import isclose
from colorama import init, Fore, Style
init()

import ioexpander as io
from core.logger import Logger, Level

# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
[docs] class DigitalPotentiometer: I2C_ADDR = 0x0C PIN_RED = 1 PIN_GREEN = 7 PIN_BLUE = 2 POT_ENC_A = 12 POT_ENC_B = 3 POT_ENC_C = 11 BRIGHTNESS = 0.5 PERIOD = int(255 / BRIGHTNESS) def __init__(self, fps=30, multiplier=100.0, level=Level.INFO): self._log = Logger('pot', level) self._max = 3.3 self._multiplier = multiplier self._red = 0 self._green = 0 self._blue = 0 self._fps = fps self._task = None self._loop = None self._use_deadzone = True # if near 50% disable LED self._loop_thread = None self._stop_event = threading.Event() try: self._ioe = io.IOE(i2c_addr=DigitalPotentiometer.I2C_ADDR) self._ioe.set_mode(DigitalPotentiometer.POT_ENC_A, io.PIN_MODE_PP) self._ioe.set_mode(DigitalPotentiometer.POT_ENC_B, io.PIN_MODE_PP) self._ioe.set_mode(DigitalPotentiometer.POT_ENC_C, io.ADC) self._ioe.output(DigitalPotentiometer.POT_ENC_A, 1) self._ioe.output(DigitalPotentiometer.POT_ENC_B, 0) self._ioe.set_pwm_period(DigitalPotentiometer.PERIOD) self._ioe.set_pwm_control(divider=2) self._ioe.set_mode(DigitalPotentiometer.PIN_RED, io.PWM, invert=True) self._ioe.set_mode(DigitalPotentiometer.PIN_GREEN, io.PWM, invert=True) self._ioe.set_mode(DigitalPotentiometer.PIN_BLUE, io.PWM, invert=True) self._log.info("running LED with {} brightness steps.".format(int(DigitalPotentiometer.PERIOD * DigitalPotentiometer.BRIGHTNESS))) self._log.info('ready.') except Exception as e: self._log.info('{} raised: {}'.format(type(e), e)) # ┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈ @property def analog(self): return self._ioe.input(self.POT_ENC_C) # ┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈ @property def value(self): ''' Return the analog value (voltage) from the ADC pin. ''' _value = max(0.0, min(self.analog, self._max)) return 1.0 - (_value / self._max) # ┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈ @property def normalised_value(self): ''' Return a normalised int value between -100 and 100, or the multiplier if changed in the constructor. ''' raw_value = (self.value * 2) - 1 # scale 0..1 to -1..+1 if self._use_deadzone and isclose(raw_value, 0.0, abs_tol=0.02): return 0 else: return int(self._multiplier * raw_value) # ┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈ @property def rgb(self): return self._red, self._green, self._blue # ┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈ @property def data(self): return self.normalised_value, self._red, self._green, self._blue # ┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈ def _update(self): hue = max(0.0, min(self.analog / self._max, 1.0)) if self._use_deadzone and isclose(hue, 0.5, abs_tol=0.01): self._red = self._green = self._blue = 0.0 self._ioe.output(self.PIN_RED, 0) self._ioe.output(self.PIN_GREEN, 0) self._ioe.output(self.PIN_BLUE, 0) else: self._red, self._green, self._blue = [int(c * self.PERIOD * self.BRIGHTNESS) for c in colorsys.hsv_to_rgb(hue, 1.0, 1.0)] self._ioe.output(self.PIN_RED, self._red) self._ioe.output(self.PIN_GREEN, self._green) self._ioe.output(self.PIN_BLUE, self._blue) # ┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈
[docs] def start(self): if self._task is not None: self._log.warning('already running.') return # already running def run_loop(): self._loop = asyncio.new_event_loop() asyncio.set_event_loop(self._loop) async def runner(): self._task = asyncio.create_task(self._run()) await self._task # wait for _run() to complete try: self._loop.run_until_complete(runner()) except Exception as e: self._log.error('{} raised in pot run loop: {}\n{}'.format(type(e), e, traceback.format_exc())) finally: self._loop.close() self._stop_event.clear() self._loop_thread = threading.Thread(target=run_loop, daemon=True) self._log.info('starting loop…') self._loop_thread.start()
async def _run(self): delay = 1.0 / self._fps while not self._stop_event.is_set(): self._update() await asyncio.sleep(delay) # ┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈
[docs] def off(self): self._log.info('off.') self._ioe.output(self.PIN_RED, 0) self._ioe.output(self.PIN_GREEN, 0) self._ioe.output(self.PIN_BLUE, 0)
# ┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈
[docs] def stop(self): if self._task is None: return # not running self._stop_event.set() # wait for the loop thread to finish gracefully if self._loop_thread: self._loop_thread.join() self._task = None self._loop = None self._loop_thread = None
#EOF