Space Converter

In this section we discuss the concept of a SpaceConverter. A space converter creates OpenAI Gym spaces for messages and defines how those messages are converted to and from a numpy.ndarray, which is the default data type in OpenAI Gym. We will walk through the creation of Space_AngleDecomposition, which converts a Float32MultiArray to a numpy.ndarray and, in the process, decomposes one entry of the Float32MultiArray into a sine and a cosine component. This space converter is useful when dealing with angular positions: a raw angle jumps discontinuously where it wraps around (for example from π to -π), whereas its sine and cosine vary continuously, which often makes learning more efficient.
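The effect of the wrap-around can be illustrated with a minimal sketch that uses only the Python standard library. The helper names decompose() and wrap() are hypothetical and not part of EAGERx; the interval [-π, π) is an assumed convention for the raw angle.

```python
import math


def decompose(angle):
    """Return the (sine, cosine) pair of an angle given in radians."""
    return math.sin(angle), math.cos(angle)


def wrap(angle):
    """Wrap an angle to the interval [-pi, pi) (hypothetical helper)."""
    return (angle + math.pi) % (2 * math.pi) - math.pi


# Two orientations that are 0.02 rad apart, on either side of the wrap-around.
a = wrap(math.pi - 0.01)
b = wrap(math.pi + 0.01)

# Distance between the raw angles: close to 2*pi, although the poses are near.
raw_gap = abs(a - b)

# Distance between the decomposed representations: close to 0.02.
sin_a, cos_a = decompose(a)
sin_b, cos_b = decompose(b)
decomposed_gap = math.hypot(sin_a - sin_b, cos_a - cos_b)
```

The raw values differ by almost a full turn, while the (sine, cosine) pairs stay close together, which is the property the decomposition is meant to exploit.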

The SpaceConverter base class has two class variables:

  • MSG_TYPE_A

  • MSG_TYPE_B

and has five abstract methods:

  • spec()

  • initialize()

  • get_space()

  • A_to_B()

  • B_to_A()

Full code is available here.

MSG_TYPE_A and MSG_TYPE_B

The class variables MSG_TYPE_A and MSG_TYPE_B specify the two message types that are converted into one another. The Gym space requires a numpy.ndarray, so MSG_TYPE_A is of this type. The second message type is Float32MultiArray, a ROS message that can be used to communicate multidimensional data over ROS topics.

# ROS IMPORTS
from std_msgs.msg import Float32MultiArray

# RX IMPORTS
import eagerx.core.register as register
from eagerx import Processor, SpaceConverter
from eagerx.core.specs import ProcessorSpec
import numpy as np
from gym.spaces import Box


class Space_AngleDecomposition(SpaceConverter):
    MSG_TYPE_A = np.ndarray
    MSG_TYPE_B = Float32MultiArray

spec

The spec() method specifies the arguments with which the SpaceConverter will be initialized. In our case, we add low, high and dtype to the config.

@staticmethod
@register.spec("Space_AngleDecomposition", SpaceConverter)
def spec(spec: ProcessorSpec, low=None, high=None, dtype="float32"):
    spec.config.update(low=low, high=high, dtype=dtype)

Note

Mind the use of the register.spec() decorator on the spec() method.

initialize

Next, we implement the initialize() method. Here, the arguments are the ones we have just defined in the spec() method: low, high and dtype.

def initialize(self, low=None, high=None, dtype="float32"):
    self.low = np.array(low, dtype=dtype)
    self.high = np.array(high, dtype=dtype)
    self.dtype = dtype

get_space

The get_space() method defines the Gym space. Here it returns a Box bounded by low and high.

def get_space(self):
    return Box(self.low, self.high, dtype=self.dtype)
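Because B_to_A() replaces the angle by its sine and cosine, the converted array has one more entry than the message, so low and high presumably need to describe the decomposed vector rather than the raw message. The sketch below assumes a message layout of [angle, angular_velocity] and an illustrative speed limit of 9 rad/s; neither is prescribed by the text. It uses plain NumPy for an element-wise bounds check, similar in spirit to what a Box space checks, so that it runs without Gym installed.

```python
import numpy as np

# Assumed message layout: [angle, angular_velocity]; max_speed is illustrative.
max_speed = 9.0

# Bounds for the decomposed vector [sin(angle), cos(angle), angular_velocity].
low = np.array([-1.0, -1.0, -max_speed], dtype="float32")
high = np.array([1.0, 1.0, max_speed], dtype="float32")

# A decomposed observation for angle = 2.0 rad and velocity = -3.5 rad/s.
angle, velocity = 2.0, -3.5
observation = np.array([np.sin(angle), np.cos(angle), velocity], dtype="float32")

# Element-wise bounds check: low <= observation <= high.
inside = bool(np.all(observation >= low) and np.all(observation <= high))
```

Sine and cosine always lie in [-1, 1], so only the remaining entries need problem-specific limits.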

A_to_B

The A_to_B() method takes as an argument a message of type MSG_TYPE_A and converts it into MSG_TYPE_B.

def A_to_B(self, msg):
    return Float32MultiArray(data=msg)

B_to_A

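The B_to_A() method takes as an argument a message of type MSG_TYPE_B and converts it into MSG_TYPE_A. In our case, we also decompose the angle here. The angle is the first entry of the message data, and it is replaced by its sine and cosine, so the returned array has one more entry than the message.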

def B_to_A(self, msg):
    angle = msg.data[0]
    return np.concatenate(([np.sin(angle), np.cos(angle)], msg.data[1:]), axis=0)
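To try the conversion without a ROS installation, the logic of B_to_A() can be exercised on a stand-in message. In the sketch below, FakeMultiArray and recover_angle() are hypothetical names introduced for illustration; the stand-in only mimics the data field of Float32MultiArray.

```python
import numpy as np


class FakeMultiArray:
    """Hypothetical stand-in for Float32MultiArray; only mimics the data field."""

    def __init__(self, data):
        self.data = list(data)


def b_to_a(msg):
    """Same logic as B_to_A() above, written as a free function."""
    angle = msg.data[0]
    return np.concatenate(([np.sin(angle), np.cos(angle)], msg.data[1:]), axis=0)


def recover_angle(array):
    """Hypothetical inverse: rebuild the angle from its sine and cosine."""
    return np.arctan2(array[0], array[1])


msg = FakeMultiArray([np.pi / 2, 0.5])  # assumed layout: [angle, angular_velocity]
obs = b_to_a(msg)                       # approximately [1.0, 0.0, 0.5]
angle = recover_angle(obs)              # approximately pi / 2
```

Note that A_to_B() as shown earlier wraps the array without undoing the decomposition. recover_angle() only illustrates that the angle remains recoverable from the two components.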

make

To use this SpaceConverter, call the make() method with the arguments defined in the spec() method.