Space Converter
In this section we will discuss the concept of a SpaceConverter.
A space converter can be used to create Openai Gym Spaces for messages and define how we can convert them from and to a numpy.ndarray, which is the default data type in OpenAI Gym.
In this section we will go through the process of creating the Space_AngleDecomposition, which will allow to convert a Float32MultiArray to a numpy.ndarray.
At the same time, we will decompose one of the entries of the Float32MultiArray into a sine and cosine component.
This space converter can be used when dealing with angular positions, since learning on the sine and cosine is often more efficient due to the discontinuities in the angular position.
The SpaceConverter base class has two class variables:
MSG_TYPE_AMSG_TYPE_B
and has 5 abstract methods:
spec()initialize()get_space()A_to_B()B_to_A()
MSG_TYPE_A and MSG_TYPE_B
The class variables MSG_TYPE_A and MSG_TYPE_B specify the two message types that will be converted from one into the other.
For the Gym space, we need an numpy.ndarray, so MSG_TYPE_A will be of this type.
The second message type will be a Float32MultiArray, since this ROS message can be used for multidimensional data communication over ROS topics.
# ROS IMPORTS
from std_msgs.msg import Float32MultiArray
# RX IMPORTS
import eagerx.core.register as register
from eagerx import Processor, SpaceConverter
from eagerx.core.specs import ProcessorSpec
import numpy as np
from gym.spaces import Box
class Space_AngleDecomposition(SpaceConverter):
MSG_TYPE_A = np.ndarray
MSG_TYPE_B = Float32MultiArray
spec
The spec() method can be used to specify with which arguments the SpaceConverter will be initialized.
In our case, we add low, high and dtype to the config.
@staticmethod
@register.spec("Space_AngleDecomposition", SpaceConverter)
def spec(spec: ProcessorSpec, low=None, high=None, dtype="float32"):
spec.config.update(low=low, high=high, dtype=dtype)
Note
Mind the use of the spec() decorator.
initialize
Next, we implement the initialize() method.
Here, the arguments are the ones we have just defined in the spec() method: low, high and dtype.
def initialize(self, low=None, high=None, dtype="float32"):
self.low = np.array(low, dtype=dtype)
self.high = np.array(high, dtype=dtype)
self.dtype = dtype
get_space
The get_space() method should be used to define the Gym space.
def get_space(self):
return Box(self.low, self.high, dtype=self.dtype)
A_to_B
The A_to_B() method takes as an argument a message of type MSG_TYPE_A and converts it into MSG_TYPE_B.
def A_to_B(self, msg):
return Float32MultiArray(data=msg)
B_to_A
The B_to_A() method takes as an argument a message of type MSG_TYPE_B and converts it into MSG_TYPE_A.
In our case, we also decompose the angle here, which will be the first entry of the array.
def B_to_A(self, msg):
angle = msg.data[0]
return np.concatenate(([np.sin(angle), np.cos(angle)], msg.data[1:]), axis=0)
make
In order to use this SpaceConverter, the user should call the make() method with the arguments of the spec() method.