Source code for eagerx.core.specs

from warnings import warn
import copy
from typing import Dict, Optional, Union, Type, TYPE_CHECKING, List
import gymnasium as gym
from yaml import dump
import numpy as np

import eagerx
from eagerx.core.view import SpecView
from eagerx.utils.utils import (
    replace_None,
    deepcopy,
)
from eagerx.utils.utils_sub import substitute_args


if TYPE_CHECKING:
    from eagerx.core.entities import Engine
    from eagerx import EngineGraph


class EntitySpec(object):
    def __init__(self, params):
        super(EntitySpec, self).__setattr__("_params", params)

    def __setattr__(self, name, value):
        raise AttributeError("You cannot set the new attributes to EntitySpec.")

    def __str__(self):
        return dump(self._params)

    @property
    @deepcopy
    def params(self):
        return self._params


[docs]class BackendSpec(EntitySpec): """A parameter specification that specifies how :class:`~eagerx.core.env.BaseEnv` should initialize the selected backend.""" def initialize(self, spec_cls): pass @property def config(self) -> SpecView: """Provides an API to get/set the parameters to initialize. :return: (mutable) API to get/set parameters. """ return SpecView(self, depth=["config"], unlocked=True)
[docs]class ProcessorSpec(EntitySpec): """A parameter specification that specifies how :class:`~eagerx.core.env.BaseEnv` should initialize the processor.""" def initialize(self, spec_cls): pass @property def config(self) -> SpecView: """Provides an API to get/set the parameters to initialize. :return: (mutable) API to get/set parameters. """ return SpecView(self, depth=[], unlocked=True)
[docs]class EngineStateSpec(EntitySpec): """A parameter specification that specifies how :class:`~eagerx.core.env.BaseEnv` should initialize the engine state.""" def initialize(self, spec_cls): pass @property def config(self) -> SpecView: """Provides an API to get/set the parameters to initialize. :return: API to get/set parameters. """ return SpecView(self, depth=[], unlocked=True)
class BaseNodeSpec(EntitySpec): def __init__(self, params): super().__init__(params) def _lookup(self, depth, unlocked=False): name = self._params["config"]["name"] return SpecView(self, depth=[depth], name=name, unlocked=unlocked) @property def config(self) -> SpecView: """Provides an API to set/get the parameters to initialize. The default parameters are: - .. py:attribute:: Spec.config.name: str User specified unique node name. - .. py:attribute:: Spec.config.rate: float Rate (Hz) at which the :func:`~eagerx.core.entities.Node.callback` is called. - .. py:attribute:: Spec.config.process: int = 0 Process in which the node is launched. See :class:`~eagerx.core.constants.process` for all options. - .. py:attribute:: Spec.config.color: str = grey Specifies the color of logged messages & node color in the GUI. Check-out the termcolor documentation for the supported colors. - .. py:attribute:: Spec.config.print_mode: int = 1 Specifies the different modes for printing: `{1: TERMCOLOR, 2: ROS}`. - .. py:attribute:: Spec.config.log_level: int = 30 Specifies the log level for the engine: `{0: SILENT, 10: DEBUG, 20: INFO, 30: WARN, 40: ERROR, 50: FATAL}` :return: API to get/set parameters. """ return self._lookup("config", unlocked=True) @property def inputs(self) -> SpecView: """Provides an API to set/get the parameters of registered :func:`eagerx.core.register.inputs`. The mutable parameters are: - .. py:attribute:: Spec.inputs.<name>.window: int = 1 A non-negative number that specifies the number of messages to pass to the node's :func:`~eagerx.core.entities.Node.callback`. - *window* = 1: Only the last received input message. - *window* = *x* > 1: The trailing last *x* received input messages. - *window* = 0: All input messages received since the last call to the node's :func:`~eagerx.core.entities.Node.callback`. .. note:: With *window* = 0, the number of input messages may vary and can even be zero. - .. py:attribute:: Spec.inputs.<name>.processor: ProcessorSpec = None A processor that preprocesses the received input message before passing it to the node's :func:`~eagerx.core.entities.Node.callback`. - .. py:attribute:: Spec.inputs.<name>.space: dict = None This space defines the format of valid messages. - .. py:attribute:: Spec.inputs.<name>.delay: float = 0.0 A non-negative simulated delay (seconds). This delay is ignored if :attr:`~eagerx.core.entities.Engine.simulate_delays` = True in the engine's :func:`~eagerx.core.entities.Engine.spec`. - .. py:attribute:: Spec.inputs.<name>.skip: bool = False Skip the dependency on this input during the first call to the node's :func:`~eagerx.core.entities.Node.callback`. May be necessary to ensure that the connected graph is directed and acyclic. :return: API to get/set parameters. """ return self._lookup("inputs") @property def outputs(self) -> SpecView: """Provides an API to set/get the parameters of registered :func:`eagerx.core.register.outputs`. The mutable parameters are: - .. py:attribute:: Spec.outputs.<name>.processor: ProcessorSpec = None A processor that preprocesses the output message, returned by :func:`~eagerx.core.entities.Node.callback`, before publishing it. - .. py:attribute:: Spec.outputs.<name>.space: dict = None This space defines the format of valid messages. :return: API to get/set parameters. """ return self._lookup("outputs") @property def states(self) -> SpecView: """Provides an API to set/get the parameters of registered :func:`eagerx.core.register.states`. The mutable parameters are: - .. py:attribute:: Spec.states.<name>.space: dict = None This space defines the format of valid messages. :return: API to get/set parameters. """ return self._lookup("states") def initialize(self, spec_cls): import eagerx.core.register as register try: params = register.LOOKUP_TYPES[spec_cls.callback] except KeyError: if spec_cls.__name__ == "EnvNode": params = dict() else: raise if "targets" in params: from eagerx.core.entities import ResetNode assert issubclass( spec_cls, ResetNode ), "You can only have targets registered for nodes that inherit from the ResetNode baseclass." add_ft = True else: add_ft = False # Set default components for component, cnames in params.items(): for cname, space in cnames.items(): if component == "outputs": if cname not in self.config.outputs: self.config.outputs.append(cname) mapping = dict( rate="$(config rate)", processor=None, space=space, ) # Add feedthrough entries for each output if node is a reset node (i.e. when it has a target) if add_ft: mapping_ft = dict( delay=0.0, window=1, skip=False, processor=None, space=space, address=None, ) with self.feedthroughs as d: d[cname] = mapping_ft elif component == "inputs": if cname not in self.config.inputs: self.config.inputs.append(cname) space = eagerx.Space(shape=(), dtype="int64") if cname == "tick" else space mapping = dict( delay=0.0, window=1, skip=False, processor=None, space=space, address=None, ) elif component == "targets": if cname not in self.config.targets: self.config.targets.append(cname) mapping = dict( processor=None, space=space, address=None, ) else: if cname not in self.config.states: self.config.states.append(cname) component = "states" mapping = dict( processor=None, space=space, ) with getattr(self, component) as d: d[cname] = mapping def add_input( self, cname: str, window: int = 1, delay: float = 0.0, skip: bool = False, address: str = None, processor: Optional[ProcessorSpec] = None, space: Optional[gym.spaces.Space] = None, ): mapping = dict( window=window, delay=delay, skip=skip, space=space, address=address, processor=processor.params if processor else None, ) with self.inputs as d: d[cname] = mapping def add_output( self, cname: str, processor: Optional[ProcessorSpec] = None, space: Optional[gym.spaces.Space] = None, ): mapping = dict( rate="$(config rate)", space=space, processor=processor.params if processor else None, ) with self.outputs as d: d[cname] = mapping def build(self, ns: str): params = self.params # Creates a deepcopy name = self.config.name entity_id = self.config.entity_id # Replace args in .yaml context = { "ns": {"env_name": ns, "node_name": name}, "config": params["config"], } substitute_args(params, context, only=["config", "ns"]) # Process inputs inputs = [] for cname in self.config.inputs: assert ( cname in params["inputs"] ), f'Received unknown {"input"} "{cname}". Check the spec of "{name}" with entity_id "{entity_id}".' assert ( "targets" not in params or cname not in params["targets"] ), f'Input "{cname}" cannot have the same cname as a target. Change either the input or target cname. Check the spec of "{name}" with entity_id "{entity_id}".' n = RxInput(name=cname, **params["inputs"][cname]) inputs.append(n) # Process outputs outputs = [] for cname in self.config.outputs: msg = f"The rate ({params['outputs'][cname]['rate']} Hz) set for action '{cname}' does not equal the environment rate ({self.config.rate} Hz)." assert params["outputs"][cname]["rate"] == self.config.rate, msg assert ( cname in params["outputs"] ), f'Received unknown {"output"} "{cname}". Check the spec of "{name}" with entity_id "{entity_id}".' if "address" in params["outputs"][cname]: address = params["outputs"][cname].pop("address") else: address = "%s/outputs/%s" % (name, cname) n = RxOutput(name=cname, address=address, **params["outputs"][cname]) outputs.append(n) states = [] for cname in self.config.states: assert ( cname in params["states"] ), f'Received unknown {"state"} "{cname}". Check the spec of "{name}" with entity_id "{entity_id}".' if "address" in params["states"][cname]: # if 'env/supervisor', the state address is pre-defined (like an input) n = RxState(name=cname, **params["states"][cname]) else: address = "%s/states/%s" % (name, cname) n = RxState(name=cname, address=address, **params["states"][cname]) states.append(n) targets = [] if "targets" in self.config: for cname in self.config.targets: assert ( cname in params["targets"] ), f'Received unknown {"target"} "{cname}". Check the spec of "{name}" with entity_id "{entity_id}".' n = RxState(name=cname, **params["targets"][cname]) targets.append(n) feedthroughs = [] if "feedthroughs" in params: assert "targets" in self.config, f'No targets defined for ResetNode "{name}".' assert len(self.config.targets) > 0, f'No targets selected for ResetNode "{name}".' for cname in self.config.outputs: # Add output details to feedthroughs assert ( cname in params["feedthroughs"] ), f'Feedthrough "{cname}" must directly correspond to a selected output. Check the spec of "{name}" with entity_id "{entity_id}".' params["feedthroughs"][cname]["space"] = params["outputs"][cname]["space"] n = RxFeedthrough(feedthrough_to=cname, **params["feedthroughs"][cname]) feedthroughs.append(n) params["outputs"] = [i.build(ns=ns) for i in outputs] params["inputs"] = [i.build(ns=ns) for i in inputs] params["states"] = [i.build(ns=ns) for i in states] params["targets"] = [i.build(ns=ns) for i in targets] params["feedthroughs"] = [i.build(ns=ns) for i in feedthroughs] # Create rate dictionary with outputs chars_ns = len(ns) + 1 rate_dict = dict() for i in params["outputs"]: assert i["rate"] is not None and isinstance(i["rate"], (int, float)) and i["rate"] > 0, ( f'The rate of node "{name}" (and output cname "{i["name"]}") is misspecified: rate="{i["rate"]}". ' 'Make sure that it is of type(rate)=("int", "float",) and rate > 0.' ) address = i["address"][chars_ns:] rate_dict[address] = i["rate"] # {'rate': i['rate']} # Put parameters in node namespace (watch out, order of dict keys probably matters...) node_params = {name: params, "rate": rate_dict} return replace_None(node_params)
[docs]class NodeSpec(BaseNodeSpec): """A parameter specification that specifies how :class:`~eagerx.core.env.BaseEnv` should initialize the node.""" pass
[docs]class ResetNodeSpec(BaseNodeSpec): """A parameter specification that specifies how :class:`~eagerx.core.env.BaseEnv` should initialize the node.""" @property def targets(self) -> SpecView: """Provides an API to set/get the parameters of registered :func:`eagerx.core.register.targets`. The mutable parameters are: - .. py:attribute:: Spec.targets.<name>.processor: ProcessorSpec = None A processor that preprocesses the received state message before passing it to the node's :func:`~eagerx.core.entities.ResetNode.callback`. :return: API to get/set parameters. """ return self._lookup("targets") @property def feedthroughs(self) -> SpecView: """Provides an API to set/get the parameters of a feedthrough corresponding to registered :func:`eagerx.core.register.outputs`. The mutable parameters are: - .. py:attribute:: Spec.feedthroughs.<name>.processor: ProcessorSpec = None A processor that preprocesses the received input message before passing it to the node's :func:`~eagerx.core.entities.Node.callback`. - .. py:attribute:: Spec.feedthroughs.<name>.space: dict = None This space defines the format of valid messages. - .. py:attribute:: Spec.feedthroughs.<name>.delay: float = 0.0 A non-negative simulated delay (seconds). This delay is ignored if :attr:`~eagerx.core.entities.Engine.simulate_delays` = True in the engine's :func:`~eagerx.core.entities.Engine.spec`. :return: API to get/set parameters. """ return self._lookup("feedthroughs")
[docs]class ObjectSpec(EntitySpec): """A parameter specification of an object.""" def __init__(self, params): super().__init__(params) def _lookup(self, depth, unlocked=False): name = self._params["config"]["name"] return SpecView(self, depth=[depth], name=name, unlocked=unlocked)
[docs] def gui( self, engine_cls: Type["Engine"], interactive: Optional[bool] = True, resolution: Optional[List[int]] = None, filename: Optional[str] = None, ) -> Union[None, np.ndarray]: """Opens a graphical user interface of the object's engine implementation. .. note:: Requires `eagerx-gui`: .. highlight:: python .. code-block:: python pip3 install eagerx-gui :param engine_cls: The class engine (not instance!) that was used to register the engine implementation (e.g. "PybulletEngine"). :param interactive: If `True`, an interactive application is launched. Otherwise, an RGB render of the GUI is returned. This could be useful when using a headless machine. :param resolution: Specifies the resolution of the returned render when `interactive` is `False`. If `interactive` is `True`, this argument is ignored. :param filename: If provided, the GUI is rendered to an svg file with this name. If `interactive` is `True`, this argument is ignored. :return: RGB render of the GUI if `interactive` is `False`. """ import eagerx.core.register as register spec_copy = ObjectSpec(self.params) spec_copy._params["engine"] = {} engine = engine_cls.get_specification() graph = register.add_engine(spec_copy, engine) return graph.gui(interactive=interactive, resolution=resolution, filename=filename)
@property def engine(self) -> Union[SpecView]: """Provides an API to set/get the parameters of an engine-specific implementation. The mutable parameters are: - Arguments (excluding spec) of the selected engine's :func:`~eagerx.core.entities.Engine.add_object` method. - .. py:attribute:: Spec.engine.states.<name>: EngineState Link an :class:`~eagerx.core.specs.EngineState` to a registered state with :func:`eagerx.core.register.states`. :return: API to get/set parameters. """ return SpecView(self, depth=["engine"], name=self._params["config"]["name"]) @property def sensors(self) -> SpecView: """Provides an API to set/get the parameters of registered :func:`eagerx.core.register.sensors`. The mutable parameters are: - .. py:attribute:: Spec.sensors.<name>.rate: float = 1.0 Rate (Hz) at which the sensor's :func:`~eagerx.core.entities.EngineNode.callback` is called. - .. py:attribute:: Spec.sensors.<name>.space: dict = None This space defines the format of valid messages. :return: API to get/set parameters. """ return self._lookup("sensors") @property def actuators(self) -> SpecView: """Provides an API to set/get the parameters of registered :func:`eagerx.core.register.actuators`. The mutable parameters are: - .. py:attribute:: Spec.actuators.<name>.rate: float = 1.0 Rate (Hz) at which the actuator's :func:`~eagerx.core.entities.EngineNode.callback` is called. - .. py:attribute:: Spec.actuators.<name>.window: int = 1 A non-negative number that specifies the number of messages to pass to the node's :func:`~eagerx.core.entities.EngineNode.callback`. - *window* = 1: Only the last received input message. - *window* = *x* > 1: The trailing last *x* received input messages. - *window* = 0: All input messages received since the last call to the node's :func:`~eagerx.core.entities.EngineNode.callback`. .. note:: With *window* = 0, the number of input messages may vary and can even be zero. - .. py:attribute:: Spec.actuators.<name>.space: dict = None This space defines the format of valid messages. - .. py:attribute:: Spec.actuators.<name>.delay: float = 0.0 A non-negative simulated delay (seconds). This delay is ignored if :attr:`~eagerx.core.entities.Engine.simulate_delays` = True in the engine's :func:`~eagerx.core.entities.Engine.spec`. - .. py:attribute:: Spec.actuators.<name>.skip: bool = False Skip the dependency on this input during the first call to the node's :func:`~eagerx.core.entities.EngineNode.callback`. May be necessary to ensure that the connected graph is directed and acyclic. :return: API to get/set parameters. """ return self._lookup("actuators") @property def states(self) -> SpecView: """Provides an API to set/get the parameters of registered :func:`eagerx.core.register.engine_states`. The mutable parameters are: - .. py:attribute:: Spec.states.<name>.space: dict = None This space defines the format of valid messages. :return: API to get/set parameters. """ return self._lookup("states") @property def config(self) -> SpecView: """Provides an API to set/get the parameters to initialize. The default parameters are: - Additional parameters registered with the :func:`eagerx.core.register.config` decorator. - .. py:attribute:: Spec.config.name: str User specified unique object name. - .. py:attribute:: Spec.config.actuators: list List with selected actuators. Must be a subset of the registered :func:`eagerx.core.register.actuators`. - .. py:attribute:: Spec.config.sensors: list List with selected sensors. Must be a subset of the registered :func:`eagerx.core.register.sensors`. - .. py:attribute:: Spec.config.states: list List with selected engine_states. Must be a subset of the registered :func:`eagerx.core.register.engine_states`. :return: API to get/set parameters. """ return self._lookup("config", unlocked=True) def initialize(self, spec_cls): import eagerx.core.register as register agnostic = register.LOOKUP_TYPES[spec_cls.make] # Set default components for component, cnames in agnostic.items(): for cname, space in cnames.items(): if component == "sensors": mapping = dict( rate=1, processor=None, space=space, ) elif component == "actuators": mapping = dict( rate=1, delay=0.0, window=1, skip=False, processor=None, space=space, ) else: component = "states" mapping = dict( processor=None, space=space, ) with getattr(self, component) as d: d[cname] = mapping # Select component per default if cname not in getattr(self.config, component): getattr(self.config, component).append(cname) def _initialize_object_graph(self): mapping = dict() for component in ["sensors", "actuators"]: try: mapping[component] = getattr(self, component) except AttributeError: continue from eagerx.core.graph_engine import EngineGraph graph = EngineGraph.create(**mapping) return graph
[docs]class EngineSpec(BaseNodeSpec): """A parameter specification that specifies how :class:`~eagerx.core.env.BaseEnv` should initialize the engine.""" @property def config(self) -> SpecView: """Provides an API to set/get the parameters to initialize. The default parameters are: - .. py:attribute:: Spec.config.rate: float Rate (Hz) at which the :func:`~eagerx.core.entities.Engine.callback` is called. - .. py:attribute:: Spec.config.process: int = 0 Process in which the engine is launched. See :class:`~eagerx.core.constants.process` for all options. - .. py:attribute:: Spec.config.sync: bool = True Flag that specifies whether we run reactive or asynchronous. - .. py:attribute:: Spec.config.real_time_factor: float = 0 A specified upper bound on the real-time factor. `Wall-clock-rate`=`real_time_factor`*`rate`. If `real_time_factor` < 1 the simulation is slower than real time. - .. py:attribute:: Spec.config.simulate_delays: bool = True Flag that specifies whether input delays are simulated. You probably want to set this to `False` when running in the real-world. - .. py:attribute:: Spec.config.color: str = grey Specifies the color of logged messages. Check-out the termcolor documentation for the supported colors. - .. py:attribute:: Spec.config.print_mode: int = 1 Specifies the different modes for printing: `{1: TERMCOLOR, 2: ROS}`. - .. py:attribute:: Spec.config.log_level: int = 30 Specifies the log level for the engine: `{0: SILENT, 10: DEBUG, 20: INFO, 30: WARN, 40: ERROR, 50: FATAL}`. :return: API to get/set parameters. """ return self._lookup("config", unlocked=True) @property def objects(self) -> SpecView: """Provides an API to set/get the parameters to add an object to the engine. To add a new object, please use :func:`~eagerx.core.specs.Enginespec.add_object`. Arguments correspond to the signature of :func:`~eagerx.core.entities.Engine.add_object`. :return: API to get/set parameters. """ return self._lookup("objects", unlocked=True)
[docs] def add_object( self, name: str, **kwargs: Union[bool, int, float, str, List, Dict], ) -> None: """Adds an object to the simulator that is interfaced by the engine. :param kwargs: Other arguments of :func:`~eagerx.core.entities.Engine.add_object`. """ # todo: check arguments of engine_spec.add_object(...) are arguments of engine.add_object(...). with self.objects as d: assert name not in d, f"There is already an object called `{name}` added. Names must be unique." d[name] = dict(engine_states=dict(), nodes=dict(), add_object=dict()) with d[name].add_object as s: s["name"] = name s.update(kwargs)
def build(self, ns: str): params = super().build(ns) # Convert engine states for name, obj in params[self.config.name]["objects"].items(): for cname, view in obj["engine_states"].items(): address = f"{name}/states/{cname}" processor = view["processor"] if view["processor"] != "null" else None spec = RxEngineState(cname, address, view["state"], processor, view["space"]) obj["engine_states"][cname] = spec.build(ns=ns) return replace_None(params) def _add_engine_states(self, name: str, spec: ObjectSpec): # Pop states that were not implemented. assert name in self.objects, f"There is no Object called `{name}' in engine.objects. First add the Object." states = spec.engine.states for cname in list(states.keys()): if cname in spec.config.states: if states[cname] is not None: self._add_engine_state( name, cname, states[cname], spec.states[cname]["space"], spec.states[cname]["processor"] ) else: warn( f"Engine state `{cname}` for object `{name}` will be ignored. " f"There is no implementation provided for it in Engine `{self.config.entity_id}`." ) def _add_engine_state(self, name, cname, engine_state, space, processor=None): with self.objects[name].engine_states as s: s[cname] = dict(state=None, space=None, processor=None) s[cname]["state"] = engine_state s[cname]["space"] = space s[cname]["processor"] = processor def _initialize_engine_config(self, spec, engine_config): # Add default config with spec.engine as d: d.update(engine_config) d["name"] = spec.config.name d["states"] = {} # Add all states to engine-specific params with d.states as s: for cname in spec.states.keys(): s[cname] = None def _register_object(self, spec: ObjectSpec) -> "EngineGraph": spec = copy.deepcopy(spec) # Construct context & replace placeholders context = {"config": spec.config.to_dict()} substitute_args(spec._params["config"], context, only=["config"]) # First resolve args within the context substitute_args(spec._params, context, only=["config"]) # Resolve rest of params # Add engine entry import eagerx.core.register as register spec._params["engine"] = {} return register.add_engine(spec, self)
# REQUIRED FOR BUILDING SPECS class Component(object): def __init__(self, **kwargs): # Iterates over provided arguments and sets the provided arguments as class properties for key, value in kwargs.items(): if key == "__class__": continue # Skip if __class__ type setattr(self, key, value) class RxInput(Component): def __init__( self, name: str, address: str, window: int = 0, processor: Dict = None, space: Dict = None, delay: float = 0.0, skip: bool = False, dtype: str = None, ): # Store parameters as properties in baseclass # IMPORTANT! Do not define variables locally you do **not** want to store # on the parameter server anywhere before calling the baseclass' constructor. kwargs = locals().copy() kwargs.pop("self") super(RxInput, self).__init__(**kwargs) # Calculate other parameters based on previously defined attributes. # Error check the parameters here. def build(self, ns=""): params = self.__dict__.copy() params["address"] = "/".join(filter(None, [ns, params["address"]])) # Set dtype if not already set by source if params["dtype"] is None: params["dtype"] = params["space"]["dtype"] return params class RxOutput(Component): def __init__( self, name: str, address: str, rate: float, processor: Dict = None, space: Dict = None, ): # Store parameters as properties in baseclass # IMPORTANT! Do not define variables locally you do **not** want to store # on the parameter server anywhere before calling the baseclass' constructor. kwargs = locals().copy() kwargs.pop("self") super(RxOutput, self).__init__(**kwargs) # Calculate other parameters based on previously defined attributes. # Error check the parameters here. def build(self, ns=""): params = self.__dict__.copy() params["address"] = "/".join(filter(None, [ns, params["address"]])) params["dtype"] = params["space"]["dtype"] return params class RxFeedthrough(Component): def __init__( self, address: str, feedthrough_to: str, window: int = 1, processor: Dict = None, space: Dict = None, delay: float = 0.0, skip: bool = False, dtype: str = None, ): # Store parameters as properties in baseclass # IMPORTANT! Do not define variables locally you do **not** want to store # on the parameter server anywhere before calling the baseclass' constructor. kwargs = locals().copy() kwargs.pop("self") super(RxFeedthrough, self).__init__(**kwargs) # Calculate other parameters based on previously defined attributes. # Error check the parameters here. def build(self, ns=""): params = self.__dict__.copy() params["address"] = "/".join(filter(None, [ns, params["address"]])) # Set dtype if not already set by source if params["dtype"] is None: params["dtype"] = params["space"]["dtype"] return params class RxState(Component): def __init__( self, name: str, address: str, space: Dict, processor: Dict = None, dtype: str = None, ): # Store parameters as properties in baseclass # IMPORTANT! Do not define variables locally you do **not** want to store # on the parameter server anywhere before calling the baseclass' constructor. kwargs = locals().copy() kwargs.pop("self") super(RxState, self).__init__(**kwargs) # Calculate other parameters based on previously defined attributes. # Error check the parameters here. def build(self, ns=""): params = self.__dict__.copy() params["address"] = "/".join(filter(None, [ns, params["address"]])) # Set dtype if not already set by source if params["dtype"] is None: params["dtype"] = params["space"]["dtype"] return params class RxEngineState(Component): def __init__( self, name: str, address: str, state: Dict, processor: Dict = None, space: Dict = None, ): # Store parameters as properties in baseclass # IMPORTANT! Do not define variables locally you do **not** want to store # on the parameter server anywhere before calling the baseclass' constructor. kwargs = locals().copy() kwargs.pop("self") super(RxEngineState, self).__init__(**kwargs) # Calculate other parameters based on previously defined attributes. # Error check the parameters here. def build(self, ns=""): params = self.__dict__.copy() params["address"] = "/".join(filter(None, [ns, params["address"]])) params["dtype"] = params["space"]["dtype"] return params