Node
In this section, we will discuss the concept of a Node.
A node can be used to process data at a desired rate.
This could for example be a classifier to detect objects in an image or a PID controller that reduces a control error.
Here, we will go through the process of creating such a Node.
We will create the ButterworthFilter Node, which can be used to filter signals.
The Node base class has four abstract methods we need to implement:
spec
In this section we will discuss the concept of a Node.
It can be added to a Graph and is engine-agnostic.
spec
Here we define the specification of the ButterworthFilter.
Since we will make use of the Butterworth filter implementation from scipy, we want to initialize the node with the arguments of this implementation.
Because the signature of the initialize() is defined within the spec() method, we add the parameters N, Wn and btype to the config.
These are the order of the filter, the critical frequency of the filter and the filter type, respectively.
Furthermore, we add a converter to the NodeSpec of the ButterworthFilter, since we want to apply this filter on a scalar signal, while the input to the filter might be multidimensional.
Therefore, we make use of the GetIndex_Float32MultiArray Processor, which selects the entry of a Float32MultiArray.
Finally, we will set a SpaceConverter, such that we can directly connect() the ButterworthFilter to an action without having to define the OpenAI Gym Space every time.
from typing import Optional
from scipy.signal import butter, sosfilt
# IMPORT ROS
from std_msgs.msg import Float32MultiArray
# IMPORT EAGERX
import eagerx.core.register as register
from eagerx.utils.utils import Msg
from eagerx.core.entities import Node, Processor, SpaceConverter
from eagerx.core.constants import process
class ButterworthFilter(Node):
@staticmethod
@register.spec("ButterworthFilter", Node)
def spec(
spec,
name: str,
rate: float,
index: int = 0,
N: int = 2,
Wn: float = 1,
btype: str = "lowpass",
process: Optional[int] = process.NEW_PROCESS,
color: Optional[str] = "grey",
):
# Modify default node params
spec.config.name = name
spec.config.rate = rate
spec.config.process = process
spec.config.color = color
spec.config.inputs = ["signal"]
spec.config.outputs = ["filtered"]
# Modify custom node params
spec.config.N = N # The order of the filter.
spec.config.Wn = Wn # The critical frequency or frequencies.
spec.config.btype = btype # {‘lowpass’, ‘highpass’, ‘bandpass’, ‘bandstop’} The type of filter. Default is ‘lowpass’.
# Add converter & space_converter
spec.inputs.signal.window = "$(config N)"
spec.inputs.signal.converter = Processor.make("GetIndex_Float32MultiArray", index=index)
spec.inputs.signal.space_converter = SpaceConverter.make("Space_Float32MultiArray", [-3], [3], dtype="float32")
Note
Mind the usage of the spec() decorator.
This specifies the ID of the Node.
Also, mind the way the window is set.
Here we specify that the window size is equal to the parameter N, which is the order of the filter.
The syntax $(config [parameter_name]) allows to use a parameter as variable for setting another parameter.
initialize
Within the initialize() method, we will initialize the filter.
def initialize(self, N, Wn, btype):
for i in self.inputs:
if i["name"] == "signal":
assert (
int(i["window"]) >= N
), "The window size of the signal {} is too small to create a filter with order {}.".format(i["window"], N)
self.filter = butter(N, Wn, btype, output="sos", fs=self.rate)
self.N = N
Note
Mind that the signature of the initialize() method is specified by adding parameters to config wihtin spec().
reset
The reset() method is called by the user at the beginning of an episode.
Here the state of the Node can be reset.
However, in our case this is not needed.
@register.states()
def reset(self):
pass
callback
The callback() method is called with at the rate of the Node.
This is were the actual signal processing takes place.
@register.inputs(signal=Float32MultiArray)
@register.outputs(filtered=Float32MultiArray)
def callback(self, t_n: float, signal: Optional[Msg] = None):
msgs = signal.msgs
# Only apply filtering if we have received enough messages (more than the order of the filter)
if len(msgs) >= self.N:
unfiltered = [msgs[i].data[0] for i in range(-self.N, 0)]
filtered = msgs[-1].data if None in unfiltered else [sosfilt(self.filter, unfiltered)[-1]]
# If we haven't received enough messages, no filtering is applied
elif len(msgs) > 0:
filtered = msgs[-1].data
# If no messages were received, return 0.0
else:
filtered = [0.0]
return dict(filtered=Float32MultiArray(data=filtered))