Node

In this section, we will discuss the concept of a Node. A node can be used to process data at a desired rate. This could for example be a classifier to detect objects in an image or a PID controller that reduces a control error. Here, we will go through the process of creating such a Node. We will create the ButterworthFilter Node, which can be used to filter signals.

The Node base class has four abstract methods we need to implement:

Full code is available here.

alternate text

In this section we will discuss the concept of a Node. It can be added to a Graph and is engine-agnostic.

spec

Here we define the specification of the ButterworthFilter. Since we will make use of the Butterworth filter implementation from scipy, we want to initialize the node with the arguments of this implementation. Because the signature of the initialize() is defined within the spec() method, we add the parameters N, Wn and btype to the config. These are the order of the filter, the critical frequency of the filter and the filter type, respectively. Furthermore, we add a converter to the NodeSpec of the ButterworthFilter, since we want to apply this filter on a scalar signal, while the input to the filter might be multidimensional. Therefore, we make use of the GetIndex_Float32MultiArray Processor, which selects the entry of a Float32MultiArray. Finally, we will set a SpaceConverter, such that we can directly connect() the ButterworthFilter to an action without having to define the OpenAI Gym Space every time.

from typing import Optional
from scipy.signal import butter, sosfilt

# IMPORT ROS
from std_msgs.msg import Float32MultiArray

# IMPORT EAGERX
import eagerx.core.register as register
from eagerx.utils.utils import Msg
from eagerx.core.entities import Node, Processor, SpaceConverter
from eagerx.core.constants import process


class ButterworthFilter(Node):
    @staticmethod
    @register.spec("ButterworthFilter", Node)
    def spec(
        spec,
        name: str,
        rate: float,
        index: int = 0,
        N: int = 2,
        Wn: float = 1,
        btype: str = "lowpass",
        process: Optional[int] = process.NEW_PROCESS,
        color: Optional[str] = "grey",
    ):
        # Modify default node params
        spec.config.name = name
        spec.config.rate = rate
        spec.config.process = process
        spec.config.color = color
        spec.config.inputs = ["signal"]
        spec.config.outputs = ["filtered"]

        # Modify custom node params
        spec.config.N = N  # The order of the filter.
        spec.config.Wn = Wn  # The critical frequency or frequencies.
        spec.config.btype = btype  # {‘lowpass’, ‘highpass’, ‘bandpass’, ‘bandstop’} The type of filter. Default is ‘lowpass’.

        # Add converter & space_converter
        spec.inputs.signal.window = "$(config N)"
        spec.inputs.signal.converter = Processor.make("GetIndex_Float32MultiArray", index=index)
        spec.inputs.signal.space_converter = SpaceConverter.make("Space_Float32MultiArray", [-3], [3], dtype="float32")

Note

Mind the usage of the spec() decorator. This specifies the ID of the Node. Also, mind the way the window is set. Here we specify that the window size is equal to the parameter N, which is the order of the filter. The syntax $(config [parameter_name]) allows to use a parameter as variable for setting another parameter.

initialize

Within the initialize() method, we will initialize the filter.

def initialize(self, N, Wn, btype):
    for i in self.inputs:
        if i["name"] == "signal":
            assert (
                int(i["window"]) >= N
            ), "The window size of the signal {} is too small to create a filter with order {}.".format(i["window"], N)
    self.filter = butter(N, Wn, btype, output="sos", fs=self.rate)
    self.N = N

Note

Mind that the signature of the initialize() method is specified by adding parameters to config wihtin spec().

reset

The reset() method is called by the user at the beginning of an episode. Here the state of the Node can be reset. However, in our case this is not needed.

@register.states()
def reset(self):
  pass

Note

Mind the usage of the states() decorator. If the Node would have had a state that should be reset, it should be registered here. We leave it empty because there is no state to reset.

callback

The callback() method is called with at the rate of the Node. This is were the actual signal processing takes place.

@register.inputs(signal=Float32MultiArray)
@register.outputs(filtered=Float32MultiArray)
def callback(self, t_n: float, signal: Optional[Msg] = None):
  msgs = signal.msgs

  # Only apply filtering if we have received enough messages (more than the order of the filter)
  if len(msgs) >= self.N:
      unfiltered = [msgs[i].data[0] for i in range(-self.N, 0)]
      filtered = msgs[-1].data if None in unfiltered else [sosfilt(self.filter, unfiltered)[-1]]
  # If we haven't received enough messages, no filtering is applied
  elif len(msgs) > 0:
      filtered = msgs[-1].data
  # If no messages were received, return 0.0
  else:
      filtered = [0.0]
  return dict(filtered=Float32MultiArray(data=filtered))

Note

Mind the usage of the inputs() and outputs() decorators. These register the inputs inputs and outputs of the Node and their message types. Also, note that the callback() method has the t_n argument, which is the time passed (seconds) since last reset.