#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020-2024 by Murray Altheim. All rights reserved. This file is part
# of the Robot Operating System project, released under the MIT License. Please
# see the LICENSE file included as part of this package.
#
# author: Murray Altheim
# created: 2021-06-29
# modified: 2025-06-27
#
# ConfigurationError, IllegalStateError and MissingComponentError at bottom
from threading import Lock
from collections import OrderedDict
from core.logger import Logger
from core.util import Util
from colorama import init, Fore, Style
init()
import core.globals as globals
globals.init()
[docs]
class Component(object):
    '''
    A basic component providing support for enable or disable, suppress or
    release, and close flags. The enable/disable and suppress/release differ
    in that in a disabled state a Component does not operate at all, whereas
    in a suppressed state it operates as normal but cannot send or receive
    messages. This functionality is provided solely as flags, not enforced by
    this class.

    The Logger is passed in as an argument on the constructor. This is only
    implicitly an abstract class (not explicitly): while we expect it to be
    subclassed, there is no reason to enforce an API or reimplement methods
    unless to hook additional functionality to them.

    The Component is suppressed and disabled by default; either flag can be
    set via the optional constructor arguments.

    All Components are automatically added to the ComponentRegistry, which is
    an alternative means of gaining access to them within the application, by
    name. The name used for registration is the name of the Logger.

    :param logger:      the Logger used for the Component
    :param suppressed:  the initial suppressed flag (default True)
    :param enabled:     the initial enabled flag (default False)
    :raises ValueError: if any argument is not of the expected type
    '''
    def __init__(self, logger, suppressed=True, enabled=False):
        # validate every argument before touching any state
        if not isinstance(logger, Logger):
            raise ValueError('wrong type for logger argument: {}'.format(type(logger)))
        if not isinstance(suppressed, bool):
            raise ValueError('wrong type for suppressed argument: {}'.format(type(suppressed)))
        if not isinstance(enabled, bool):
            raise ValueError('wrong type for enabled argument: {}'.format(type(enabled)))
        self._log = logger
        self._suppressed = suppressed
        self._enabled = enabled
        self._closed = False
        # Register last: once added, this Component is reachable by others
        # through the registry, so all of its flags must already exist.
        if not globals.has('component-registry'):
            globals.put('component-registry', ComponentRegistry(logger.level))
        self._registry = globals.get('component-registry')
        self._registry.add(logger.name, self)

    @property
    def classname(self):
        '''
        Return the name of this Component's class.
        '''
        return type(self).__name__

    @property
    def enabled(self):
        '''
        Return the enabled state of this Component.
        '''
        return self._enabled

    @property
    def disabled(self):
        '''
        Return the disabled state of this Component.
        This is a convenience property, the inverse of ``enabled``.
        '''
        return not self._enabled

    @property
    def suppressed(self):
        '''
        Return True if this Component is suppressed.
        '''
        return self._suppressed

    @property
    def is_active(self):
        '''
        A convenience property that returns True if this Component is enabled
        and released (i.e., not suppressed).
        '''
        return self.enabled and not self.suppressed

    @property
    def closed(self):
        '''
        Returns True if this Component is closed.
        '''
        return self._closed

    def enable(self):
        '''
        Enable this Component. If the Component has already been closed
        this only logs a warning and the enabled flag is left unchanged.
        '''
        if not self.closed:
            self._enabled = True
            self._log.debug('enabled.')
        else:
            self._log.warning('cannot enable: already closed.')

    def suppress(self):
        '''
        Suppresses this Component.
        '''
        self._suppressed = True
        self._log.debug('suppressed.')

    def release(self):
        '''
        Releases (un-suppresses) this Component.
        '''
        self._suppressed = False
        self._log.debug('released.')

    def disable(self):
        '''
        Disable this Component.

        :return: always True, whether or not the Component was enabled
        '''
        if self.enabled:
            self._enabled = False
            self._log.debug('disabled.')
        else:
            self._log.debug('already disabled.')
        return True

    def close(self):
        '''
        Permanently close and disable the Component. Once closed, a
        Component can no longer be enabled.

        :return: always True, whether or not the Component was already closed
        '''
        if not self.closed:
            # go through disable() so subclass overrides are honoured
            self.disable()
            self._closed = True
            self._log.debug('closed.')
        else:
            self._log.debug('already closed.')
        return True
[docs]
class ComponentRegistry(object):
    '''
    Maintains a registry of all Components, in the order in which they were created.

    :param level:  the log level used for the registry's own Logger
    '''
    def __init__(self, level):
        self._log = Logger("comp-registry", level)
        self._dict = OrderedDict()
        # A single lock shared by every call on this instance. A lock that is
        # created inside a method is private to that call and excludes nothing.
        self._mutex = Lock()

    def add(self, name, component):
        '''
        Add a component to the registry using a unique name, raising a
        ConfigurationError if a like-named component already exists in
        the registry.

        :param name:       the unique name under which to register
        :param component:  the component to be registered
        :raises ConfigurationError: if the name is already registered
        '''
        # guarded so that the dict cannot change size while print_registry()
        # is iterating over it
        with self._mutex:
            if name in self._dict:
                raise ConfigurationError('component \'{}\' already in registry.'.format(name))
            self._dict[name] = component
            _count = len(self._dict)
        self._log.info('added component \'{}\' to registry ({:d} total).'.format(name, _count))

    def get(self, name):
        '''
        Return the component by name, or None if there is no component
        registered under that name.
        '''
        return self._dict.get(name)

    def print_registry(self):
        '''
        Print the registry to the log: one line per component showing its
        registered name, its class name and its enabled state.
        '''
        with self._mutex:
            self._log.info('component list:')
            for _name, _component in self._dict.items():
                _classname = _component.classname
                self._log.info(' {} {}'.format(_name, Util.repeat(' ', 16 - len(_name)))
                        + Fore.YELLOW + '{}'.format(_classname)
                        + Fore.CYAN + '{} {}'.format(Util.repeat(' ', 24 - len(_classname)), _component.enabled))

    def get_registry(self):
        '''
        Return the backing registry as a dict. This is the live, ordered
        dictionary used by the registry itself, not a copy.
        '''
        return self._dict
[docs]
class ConfigurationError(RuntimeError):
    '''
    Raised whenever a problem traceable to configuration is encountered.

    :param message:  the text describing the error
    '''
    def __init__(self, message):
        super(ConfigurationError, self).__init__(message)
[docs]
class IllegalStateError(RuntimeError):
    '''
    Raised to indicate that a method was called at a moment when the
    object was not in a suitable state for that call.

    :param message:  the text describing the error
    '''
    def __init__(self, message):
        super(IllegalStateError, self).__init__(message)
[docs]
class MissingComponentError(Exception):
    '''
    Raised when a component that is required is not available.

    :param message:  the text describing the error
    '''
    def __init__(self, message):
        super(MissingComponentError, self).__init__(message)
#EOF