Source code for core.component

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020-2024 by Murray Altheim. All rights reserved. This file is part
# of the Robot Operating System project, released under the MIT License. Please
# see the LICENSE file included as part of this package.
#
# author:   Murray Altheim
# created:  2021-06-29
# modified: 2025-06-27
#
# ConfigurationError, IllegalStateError and MissingComponentError at bottom

from threading import Lock
from collections import OrderedDict
from core.logger import Logger
from core.util import Util
from colorama import init, Fore, Style
init()

import core.globals as globals
globals.init()

[docs] class Component(object): ''' A basic component providing support for enable or disable, suppress or release, and close flags. The enable/disable and suppress/release differ in that in a disabled state a Component does not operate at all, whereas in a suppressed state it operates as normal but cannot send or receive messages. This functionality is provided solely as flags, not enforced by this class. The Logger is passed in as an argument on the constructor. This is only implicitly an abstract class, (not explicitly) because while we expect it to be subclassed, but there is no reason to enforce an API or reimplement methods unless to hook additional functionality to them. The Component is suppressed and disabled by default, though via optional constructor arguments either can set be set to ``True``. All Components are automatically added to the ComponentRegistry, which is an alternative means of gaining access to them within the application, by name. :param logger: the Logger used for the Component ''' def __init__(self, logger, suppressed=True, enabled=False): if not isinstance(logger, Logger): raise ValueError('wrong type for logger argument: {}'.format(type(logger))) self._log = logger if not isinstance(suppressed, bool): raise ValueError('wrong type for suppressed argument: {}'.format(type(suppressed))) self._suppressed = suppressed if not isinstance(enabled, bool): raise ValueError('wrong type for enabled argument: {}'.format(type(enabled))) if not globals.has('component-registry'): self._registry = ComponentRegistry(logger.level) globals.put('component-registry', self._registry) self._registry = globals.get('component-registry') self._registry.add(logger.name, self) self._enabled = enabled self._closed = False @property def classname(self): ''' Return the name of this Component's class. ''' return type(self).__name__ @property def enabled(self): ''' Return the enabled state of this Component. ''' return self._enabled @property def disabled(self): ''' Return the disabled state of this Component. This is a convenience method. ''' return not self._enabled @property def suppressed(self): ''' Return True if this Component is suppressed. ''' return self._suppressed @property def is_active(self): ''' A convenience method that returns True if this Component is enabled and released (i.e., not suppressed). ''' return self.enabled and not self.suppressed @property def closed(self): ''' Returns True if this Component is closed. ''' return self._closed
[docs] def enable(self): ''' Enable this Component. ''' if not self.closed: self._enabled = True self._log.debug('enabled.') else: self._log.warning('cannot enable: already closed.')
[docs] def suppress(self): ''' Suppresses this Component. ''' self._suppressed = True self._log.debug('suppressed.')
[docs] def release(self): ''' Releases (un-suppresses) this Component. ''' self._suppressed = False self._log.debug('released.')
[docs] def disable(self): ''' Disable this Component. This returns a True value to force currency. ''' if self.enabled: self._enabled = False self._log.debug('disabled.') else: self._log.debug('already disabled.') return True
[docs] def close(self): ''' Permanently close and disable the Component. This returns a True value to force currency. ''' if not self.closed: _nil = self.disable() self._closed = True self._log.debug('closed.') else: self._log.debug('already closed.') return True
[docs] class ComponentRegistry(object): ''' Maintains a registry of all Components, in the order in which they were created. ''' def __init__(self, level): self._log = Logger("comp-registry", level) self._dict = OrderedDict()
[docs] def add(self, name, component): ''' Add a component to the registry using a unique name, raising a ConfigurationError if a like-named component already exists in the registry. ''' if name in self._dict: raise ConfigurationError('component \'{}\' already in registry.'.format(name)) else: self._dict[name] = component self._log.info('added component \'{}\' to registry ({:d} total).'.format(name, len(self._dict)))
[docs] def get(self, name): ''' Return the component by name. ''' return self._dict.get(name)
[docs] def print_registry(self): ''' Print the registry to the log. ''' _mutex = Lock() with _mutex: self._log.info('component list:') for _name, _component in self._dict.items(): self._log.info(' {} {}'.format(_name, Util.repeat(' ', 16 - len(_name))) + Fore.YELLOW + '{}'.format(_component.classname) + Fore.CYAN + '{} {}'.format(Util.repeat(' ', 24 - len(_component.classname)), _component.enabled))
[docs] def get_registry(self): ''' Return the backing registry as a dict. ''' return self._dict
[docs] class ConfigurationError(RuntimeError): ''' This exception is thrown when any error due to configuration occurs. ''' def __init__(self, message): super().__init__(message)
[docs] class IllegalStateError(RuntimeError): ''' Signals that a method has been invoked at an illegal or inappropriate time. ''' def __init__(self, message): super().__init__(message)
[docs] class MissingComponentError(Exception): ''' Thrown when a required component is not available. ''' def __init__(self, message): super().__init__(message)
#EOF