from __future__ import division, print_function, absolute_import
import copy
import math
import numpy as np
from .distrib import ProbabilityDistribution
class Experience(object):
    """Experience base class.

    Representation of an experience occurring from acting in the environment.

    Parameters
    ----------
    state : State
        The representation of the current state.
    action : Action
        The executed action.
    next_state : State
        The representation of the state following from acting
        with `action` in state `state`.
    reward : int or float, optional
        The reward awarded by the environment for the state-action
        pair. Default is None (no reward recorded).

    Attributes
    ----------
    state : State
        The experienced state.
    action : Action
        The experienced action.
    next_state : State
        The experienced next state.
    reward : float
        The experienced reward.

    """
    __slots__ = ('state', 'action', 'next_state', 'reward')

    def __init__(self, state, action, next_state, reward=None):
        self.state = state
        self.action = action
        self.next_state = next_state
        self.reward = reward

    def __str__(self):
        """Return a readable one-line summary of the experience.

        The reward is included whenever one has been recorded; a reward of
        zero is a valid reward and is printed as such.
        """
        # Test against None explicitly: a truthiness test would both drop a
        # legitimate reward of 0 and try to format None with '.2f'.
        if self.reward is None:
            return "state={0} act={1} next_state={2}".format(
                self.state, self.action, self.next_state)
        return "state={0} act={1} reward={2:.2f} next_state={3}".format(
            self.state, self.action, self.reward, self.next_state)
class RewardFunction(object):
    """The reward function.

    The reward function is responsible for calculating the proper value
    of the reward. Callback functions can be specified for custom calculation
    of the reward value.

    Attributes
    ----------
    cb_get : callable
        Callback function to retrieve the reward value.
    cb_set : callable
        Callback function to set the reward value.
    reward : float
        The reward value.
    bonus
    rmax : float
        The maximum possible reward.
    activate_bonus : bool
        Flag activating/deactivating the bonus.

    Notes
    -----
    To ensure that the correct value of the reward is being accessed,
    the user should not access the class variables directly but instead
    use the methods :meth:`set` and :meth:`get` to set and get the reward
    respectively.

    `cb_get`, `cb_set`, `reward`, `rmax` and `activate_bonus` are class
    attributes and therefore shared by all instances; only `bonus` is stored
    per instance.

    Examples
    --------
    >>> RewardFunction.cb_get = staticmethod(lambda r, s: np.dot(s, RewardFunction.reward))

    In this case the reward function is calculated by taking the dot product
    of the stored reward and a passed in value.

    >>> RewardFunction.reward = [0.1, 0.9, 1.0, 0.0]

    This sets the reward for all instances of the reward function.

    >>> reward_func = RewardFunction()
    >>> print(reward_func.get([0.9, 0.5, 0.0, 1.0]))
    0.54

    This calculates the reward `r` according to the previously defined
    callback function.

    """
    # Only the per-instance bonus is slotted. Listing a name both in
    # __slots__ and as a class variable raises ValueError at class creation
    # on Python 3, so the shared class-level values are not repeated here.
    __slots__ = ('_bonus',)

    cb_get = None
    cb_set = None
    reward = 0.0
    rmax = 0.0
    activate_bonus = False

    @property
    def bonus(self):
        """The bonus added to the reward to encourage exploration.

        Returns
        -------
        float :
            The bonus added to the reward.

        """
        return self._bonus

    @bonus.setter
    def bonus(self, value):
        self._bonus = value

    def __init__(self):
        self._bonus = 0.0
        """:type: float"""

    def __getstate__(self):
        """Return the picklable state (shared values plus the bonus)."""
        return {
            'reward': self.reward,
            'rmax': self.rmax,
            'bonus': self.bonus,
            'activate_bonus': self.activate_bonus
        }

    def __setstate__(self, d):
        """Restore state; everything except `bonus` is written to the class."""
        # dict.items() works on both Python 2 and 3 (iteritems is 2-only).
        for name, value in d.items():
            if name == 'bonus':
                setattr(self, name, value)
            else:
                setattr(type(self), name, value)

    def set(self, value, *args, **kwargs):
        """Set the reward value.

        If :meth:`cb_set` is set, the callback will be called
        to set the value.

        Parameters
        ----------
        value : float
            The reward value stored when no `cb_set` callback is installed.
            It is not forwarded to the callback.
        args : tuple
            Positional arguments passed to the callback.
        kwargs : dict
            Non-positional arguments passed to the callback.

        """
        if self.cb_set is not None:
            type(self).reward = self.cb_set(*args, **kwargs)
            return
        type(self).reward = value

    def get(self, *args, **kwargs):
        """Retrieve the reward value.

        If :meth:`cb_get` is set, the callback will be called
        to retrieve the value.

        Parameters
        ----------
        args : tuple
            Positional arguments passed to the callback.
        kwargs : dict
            Non-positional arguments passed to the callback.

        Returns
        -------
        float :
            The (calculated) reward value.

        """
        reward = self.reward
        if self.cb_get is not None:
            reward = self.cb_get(self.reward, *args, **kwargs)
        if self.activate_bonus:
            # NOTE(review): this uses the raw stored reward (not the value
            # computed by cb_get) and takes the max with rmax, so the result
            # is never below rmax. Possibly min(reward + bonus, rmax) was
            # intended -- behavior kept as-is, confirm with the planner code.
            reward = max(self.reward + self.bonus, self.rmax)
        return reward
class StateActionInfo(object):
    """The models interface.

    Contains all relevant information predicted by a model for a
    given state-action pair. This includes the (predicted) reward and
    transition probabilities to possible next states.

    Attributes
    ----------
    transition_proba : ProbabilityDistribution
        The transition probability distribution.
    reward_func : RewardFunction
        The reward function.
    visits : int
        The number of times the state-action pair has been visited.
    known : bool
        Flag indicating whether a reward value is known or not.

    """
    __slots__ = ('transition_proba', 'reward_func', 'visits', 'known')

    def __init__(self):
        self.transition_proba = ProbabilityDistribution()
        self.reward_func = RewardFunction()
        self.visits = 0
        self.known = False

    def __getstate__(self):
        """Return a dict mapping every slot name to its current value."""
        return {name: getattr(self, name) for name in self.__slots__}

    def __setstate__(self, d):
        """Restore the attributes from a dict produced by `__getstate__`."""
        # dict.items() works on both Python 2 and 3 (iteritems is 2-only).
        for name, value in d.items():
            setattr(self, name, value)
class StateData(object):
    """State information interface.

    Information about the state can be accessed here.

    Parameters
    ----------
    state_id : int
        The unique id of the state.
    actions : list[Action]
        List of actions that can be taken in this state.

    Attributes
    ----------
    id : int
        The unique id of the state.
    models : dict
        The reward and transition models for each action.
    q : dict
        The q-table, containing a q-value for each action.
    steps_away : int
        The number of steps the state is away from its closest neighbor.

    """
    __slots__ = ('id', 'models', 'q', 'steps_away')

    def __init__(self, state_id, actions):
        self.id = state_id
        """:type: int"""

        self.models = {a: StateActionInfo() for a in actions}
        """:type: dict[Action, StateActionInfo]"""

        # Randomizing the initial q-values impedes performance
        # self.q = {a: ((0.01 - 0.0) * np.random.random() + 0.0) for a in actions}
        self.q = {a: 0.0 for a in actions}
        """:type: dict[Action, float]"""

        # Large sentinel value used until a real distance has been computed.
        self.steps_away = 100000
        """:type: int"""

    def __getstate__(self):
        """Return a dict mapping every slot name to its current value."""
        return {name: getattr(self, name) for name in self.__slots__}

    def __setstate__(self, d):
        """Restore the attributes from a dict produced by `__getstate__`."""
        # dict.items() works on both Python 2 and 3 (iteritems is 2-only).
        for name, value in d.items():
            setattr(self, name, value)
class MDPPrimitive(object):
    """A Markov decision process primitive.

    The base class for :class:`State` and :class:`Action`. Primitives
    are represented by a list of features. They optionally can have a `name`.

    Parameters
    ----------
    features : array_like, shape (`nfeatures`,)
        List of features, where `nfeatures` is the number of features
        identifying the primitive.
    name : str, optional
        The name of the primitive. Default is "".

    Attributes
    ----------
    name
    dtype : {DTYPE_FLOAT, DTYPE_INT, DTYPE_OBJECT}
        The type of the features.
    nfeatures : int
        The number of features.
    discretized : bool
        Flag indicating whether the features are discretized or not.
    min_features : list
        The minimum value for each feature.
    max_features : list
        The maximum value for each feature.
    states_per_dim : list
        The number of states per dimension.
    description : dict
        A description of the features.

    Raises
    ------
    ValueError
        If the feature array is not one-dimensional, if `name` is not a
        string, or if the number of features does not match `nfeatures`.

    Notes
    -----
    Use the `description` to encode action information. The information
    should contain the list of all available feature combinations, the
    name of each feature.

    :Examples:

        A description of an action with three possible discrete actions:

        ::

            {
                "out": {"value": [-0.004]},
                "in": {"value": [0.004]},
                "kick": {"value": [-1.0]}
            }

        A description of an action with one possible continuous action with
        name `move`, a value of `*` allows to find the action for every
        feature array. Additional information encodes the feature name together
        with its index into the feature array are given for each higher level
        element of feature array:

        ::

            {
                "move": {
                    "value": "*",
                    "descr": {
                        "LArm": {"dx": 0, "dy": 1, "dz": 2},
                        "RArm": {"dx": 3, "dy": 4, "dz": 5},
                        "LLeg": {"dx": 6, "dy": 7, "dz": 8},
                        "RLeg": {"dx": 9, "dy": 10, "dz": 11},
                        "Torso": {"dx": 12, "dy": 13, "dz": 14}
                    }
                }
            }

        Similarly, a continuous state can be encoded as follows, which identifies
        the name of each feature together with its index into the feature array:

        ::

            {
                "LArm": {"x": 0, "y": 1, "z": 2},
                "RArm": {"x": 3, "y": 4, "z": 5},
                "LLeg": {"x": 6, "y": 7, "z": 8},
                "RLeg": {"x": 9, "y": 10, "z": 11},
                "Torso": {"x": 12, "y": 13, "z": 14}
            }

        A discrete state can be encoded by identifying the position of each feature:

        ::

            {
                "image x-position": 0,
                "displacement (mm)": 1
            }

        Alternatively, the feature can be identified by a list of features, giving the
        positional description:

        ::

            ["image x-position", "displacement (mm)"]

    Rather than setting the attributes directly, use the methods :meth:`set_nfeatures`,
    :meth:`set_dtype`, :meth:`set_description`, :meth:`set_discretized`, :meth:`set_minmax_features`,
    and :meth:`set_states_per_dim` in order to enforce type checking.

    """
    # Only per-instance state is slotted. The configuration values below are
    # class attributes shared by all instances of a (sub)class; naming them in
    # __slots__ as well raises ValueError at class creation on Python 3.
    __slots__ = ('_features', '_name', 'ix')

    # np.object was only an alias of the builtin and has been removed from
    # recent NumPy releases.
    DTYPE_OBJECT = object
    DTYPE_FLOAT = np.float64
    DTYPE_INT = np.int32

    dtype = DTYPE_FLOAT
    nfeatures = None
    description = None
    discretized = False
    min_features = None
    max_features = None
    states_per_dim = None

    # String types accepted for `name` on both Python 2 and Python 3.
    try:
        _STRING_TYPES = (basestring,)  # noqa: F821 (Python 2 only)
    except NameError:
        _STRING_TYPES = (str,)

    # Class-level names that older pickles may carry but that must not be
    # restored onto an instance.
    _CLASS_LEVEL_NAMES = ('nfeatures', 'dtype', 'description', 'discretized',
                          'min_features', 'max_features', 'states_per_dim')

    @property
    def name(self):
        """The name of the MDP primitive.

        Returns
        -------
        str :
            The name of the primitive.

        """
        return self._name

    @classmethod
    def set_nfeatures(cls, n):
        """Set the number of features.

        Parameters
        ----------
        n : int
            The number of features.

        Raises
        ------
        ValueError
            If `n` is not of type integer.

        """
        if not isinstance(n, int):
            raise ValueError("Attribute 'nfeatures' must be of <type 'int'>, got %s" % str(type(n)))
        cls.nfeatures = n

    @classmethod
    def set_dtype(cls, value=DTYPE_FLOAT):
        """Set the feature's data type.

        Parameters
        ----------
        value : {DTYPE_FLOAT, DTYPE_INT, DTYPE_OBJECT}
            The data type.

        Raises
        ------
        ValueError
            If the data type is not one of the allowed types.

        """
        if value not in (cls.DTYPE_FLOAT, cls.DTYPE_INT, cls.DTYPE_OBJECT):
            raise ValueError("Attribute 'dtype' must be one of the allowed types, got %s" % str(type(value)))
        cls.dtype = value

    @staticmethod
    def _is_wildcard(value):
        """Return True if `value` is the wildcard marker ``"*"``."""
        # Sequences/arrays are excluded first so that the comparison never
        # turns into an element-wise NumPy comparison.
        return not isinstance(value, (list, tuple, np.ndarray)) and value == '*'

    @classmethod
    def set_description(cls, descr):
        """Set the feature description.

        This extracts the number of features from the description and checks
        that it matches with the `nfeatures`. If `nfeatures` is None, `nfeatures`
        is set to the extracted value.

        Parameters
        ----------
        descr : dict or list
            The feature description. The accepted layouts are the ones shown
            in the class-level Notes:

            * ``{name: {"value": [...]}}`` -- discrete actions; the number of
              features is the length of the first entry's value.
            * ``{name: {"value": "*", "descr": {group: {key: index}}}}`` --
              continuous action; the number of features is the total number
              of indices in ``descr``. A wildcard without ``descr`` carries
              no size information.
            * ``{group: {key: index}}`` -- continuous state; the number of
              features is the total number of indices.
            * ``{feature_name: index}`` or ``[feature_name, ...]`` -- discrete
              state; the number of features is the number of entries.

        Raises
        ------
        ValueError
            If the number of features extracted from the description does not
            match `nfeatures`.

        """
        nfeatures = None
        label = "'descr'"

        if isinstance(descr, dict):
            # The first entry is taken as representative of the layout.
            config = next(iter(descr.values()), None)
            if isinstance(config, dict):
                if 'descr' in config:
                    nfeatures = sum(len(v) for v in config['descr'].values())
                    label = "array described by 'descr'"
                elif 'value' in config:
                    # A bare wildcard matches any feature array and therefore
                    # says nothing about the vector length.
                    if not cls._is_wildcard(config['value']):
                        nfeatures = len(config['value'])
                        label = "array described by 'value'"
                else:
                    nfeatures = sum(len(v) for v in descr.values())
            elif isinstance(config, (list, tuple)):
                nfeatures = sum(len(v) for v in descr.values())
            elif config is not None:
                # Flat mapping of feature name to its index.
                nfeatures = len(descr)
        elif isinstance(descr, list):
            nfeatures = len(descr)

        if nfeatures is not None and cls.nfeatures is not None and cls.nfeatures != nfeatures:
            raise ValueError("Dimension mismatch: %s is a vector of length %d,"
                             " but attribute cls.nfeatures = %d" % (label, nfeatures, cls.nfeatures))

        if cls.nfeatures is None:
            cls.nfeatures = nfeatures
        cls.description = descr

    @classmethod
    def set_discretized(cls, val=False):
        """Sets the `discretized` flag.

        Parameters
        ----------
        val : bool
            Flag identifying whether the features are discretized or not.
            Default is False.

        Raises
        ------
        ValueError
            If `val` is not boolean type.

        """
        if not isinstance(val, bool):
            raise ValueError("Attribute 'discretized' must be of <type 'bool'>, got %s" % str(type(val)))
        cls.discretized = val

    @classmethod
    def set_minmax_features(cls, _min, _max):
        """Sets the minimum and maximum value for each feature.

        This extracts the number of features from the `_min` and `_max`
        values and ensures that it matches with `nfeatures`. If `nfeatures`
        is None, the `nfeatures` attribute is set to the extracted value.

        Parameters
        ----------
        _min : array_like, shape(`nfeatures`,)
            The minimum value for each feature
        _max : array_like, shape(`nfeatures`,)
            The maximum value for each feature

        Raises
        ------
        ValueError
            If the arrays are not one-dimensional vectors, the shapes of the
            arrays don't match, or the number of features does not agree with
            the attribute `nfeatures`.

        """
        _min = np.asarray(_min, dtype=cls.dtype)
        _max = np.asarray(_max, dtype=cls.dtype)

        # Scalars (and any other single-element input) become 1-vectors.
        if _min.size == 1:
            _min = _min.reshape(1)
        if _max.size == 1:
            _max = _max.reshape(1)

        if _min.ndim != 1 or _max.ndim != 1:
            raise ValueError("Arrays '_min' and '_max' must be one-dimensional,"
                             " but _min.ndim = %d and _max.ndim = %d" % (_min.ndim, _max.ndim))

        if _min.shape[0] != _max.shape[0]:
            raise ValueError("Dimension mismatch: array '_min' is a vector of length %d,"
                             " but '_max' is of length %d" % (_min.shape[0], _max.shape[0]))

        if cls.nfeatures is None:
            cls.nfeatures = _min.shape[0]
        if _min.shape[0] != cls.nfeatures:
            raise ValueError("Arrays '_min' and '_max' must be of length %d." % cls.nfeatures)

        cls.min_features = _min
        cls.max_features = _max

    @classmethod
    def set_states_per_dim(cls, nstates):
        """Sets the number of states per feature.

        This extracts the number of features from `nstates` and compares
        it to the attribute `nfeatures`. If it doesn't match, an exception
        is thrown. If the `nfeatures` attribute is None, `nfeatures` is set
        to the extracted value.

        Parameters
        ----------
        nstates : array_like, shape (`nfeatures`,)
            The number of states per features

        Raises
        ------
        ValueError
            If the array is not a vector of length `nfeatures`.

        """
        nstates = np.asarray(nstates, dtype=cls.dtype)
        if nstates.size == 1:
            nstates = nstates.reshape(1)

        if cls.nfeatures is None:
            # A 0-d/empty input is rejected by the check below.
            cls.nfeatures = nstates.shape[0] if nstates.ndim else 0
        if nstates.ndim != 1 or nstates.shape[0] != cls.nfeatures:
            raise ValueError("Array 'nstates' must be a vector of length %d." % cls.nfeatures)
        cls.states_per_dim = nstates

    def __init__(self, features, name=None):
        cls = type(self)
        if cls.dtype is None:
            cls.dtype = MDPPrimitive.DTYPE_FLOAT

        self._features = np.asarray(features)
        if self._features.ndim != 1:
            raise ValueError("Array 'features' must be one-dimensional,"
                             " but features.ndim = %d" % self._features.ndim)

        self._name = name if name is not None else ""
        if not isinstance(self._name, self._STRING_TYPES):
            raise ValueError("'name' must be a string, but got %s" % str(type(self._name)))

        # Cursor used by the iterator protocol (see __iter__ / next).
        self.ix = 0

        # The first instance fixes the feature count for its class.
        if cls.nfeatures is None:
            cls.nfeatures = self._features.shape[0]
        elif not self._features.shape[0] == cls.nfeatures:
            raise ValueError("Dimension mismatch: array 'features' is a vector of length %d, but"
                             " attribute cls.nfeatures = %d" % (self._features.shape[0], cls.nfeatures))

        # `states_per_dim` is an ndarray once set; testing its truth value
        # directly is ambiguous for more than one element.
        if cls.discretized and cls.states_per_dim is not None:
            self.discretize()

    # noinspection PyUnusedLocal
    def __get__(self, instance, owner):
        # Descriptor hook kept for backward compatibility: when a primitive is
        # stored as a class attribute, attribute access yields its features.
        return self._features

    def _check_index(self, index, message):
        """Raise IndexError(`message`) if an integer `index` is out of range."""
        # Slices and other fancy indices are left to NumPy to validate.
        if isinstance(index, (int, np.integer)) and not -len(self) <= index < len(self):
            raise IndexError(message)

    def __getitem__(self, index):
        self._check_index(index, "Index out of range")
        return self._features[index]

    def __setitem__(self, index, value):
        self._check_index(index, "Assignment index out of range")
        self._features[index] = value

    def __len__(self):
        return len(self._features)

    def __contains__(self, item):
        return item in self._features

    def __hash__(self):
        return hash(tuple(self._features)) if self._features is not None else None

    @staticmethod
    def _unwrap(value):
        """Return the feature array of a primitive, or `value` unchanged."""
        return value._features if isinstance(value, MDPPrimitive) else value

    def __eq__(self, other):
        # Accept both primitives and plain array-likes; anything that is not
        # element-wise equal (including None) compares unequal.
        return bool(np.array_equal(self._unwrap(other), self._features))

    def __ne__(self, other):
        # Python 2 does not derive != from ==.
        return not self.__eq__(other)

    def __sub__(self, other):
        return self._features - self._unwrap(other)

    def __mul__(self, other):
        return self._features * self._unwrap(other)

    def __imul__(self, other):
        self._features *= self._unwrap(other)
        return self

    def __iter__(self):
        # The primitive is its own iterator; the cursor is reset on each
        # iter() call, so nested iteration over one instance is not supported.
        self.ix = 0
        return self

    def _format(self):
        """Return the encoded features, prefixed by the name if there is one."""
        features = np.array_str(self.encode())
        return "\'" + self._name + "\':\t" + features if self._name else features

    def __str__(self):
        return self._format()

    def __repr__(self):
        return self._format()

    def next(self):
        """Return the next feature, raising StopIteration at the end."""
        if self.ix == len(self):
            raise StopIteration
        item = self._features[self.ix]
        self.ix += 1
        return item

    # Python 3 spelling of the iterator protocol.
    __next__ = next

    def __copy__(self, memo=None):
        """Return a copy whose per-instance attributes are shallow copies.

        `memo` is optional so that :func:`copy.copy`, which calls
        ``__copy__()`` without arguments, works; a supplied memo dict is
        still updated as before.
        """
        cls = self.__class__
        result = cls.__new__(cls)
        if memo is not None:
            memo[id(self)] = result
        for k in self.__slots__:
            try:
                setattr(result, k, copy.copy(getattr(self, k)))
            except AttributeError:
                # Slot not initialised on the source object; leave it unset.
                pass
        return result

    def __getstate__(self):
        """Return the per-instance state (the iteration cursor is omitted)."""
        return {name: getattr(self, name) for name in self.__slots__ if name != 'ix'}

    def __setstate__(self, d):
        """Restore per-instance state and re-derive the class feature count."""
        for name, value in d.items():
            if name not in self._CLASS_LEVEL_NAMES:
                setattr(self, name, value)
        self.ix = 0
        type(self).nfeatures = self._features.shape[0]

    def get(self):
        """Return the feature array.

        Returns
        -------
        ndarray :
            The feature array.

        """
        return self._features

    def tolist(self):
        """Returns the feature array as a list.

        Returns
        -------
        list :
            The features list.

        """
        return self._features.tolist()

    def set(self, features):
        """Sets the feature array to the given array.

        Parameters
        ----------
        features : array_like, shape (`nfeatures`,)
            The new feature values.

        Raises
        ------
        ValueError
            If `features` is not a vector of length `nfeatures`.

        """
        features = np.asarray(features, dtype=type(self).dtype)
        if features.ndim != 1 or features.shape[0] != type(self).nfeatures:
            raise ValueError("Array 'features' must be a vector of length %d." % type(self).nfeatures)
        self._features = features

    def discretize(self):
        """Discretizes the state.

        Discretize the state using the information from the minimum and
        maximum values for each feature and the number of states attributed
        to each feature. Does nothing when the `discretized` flag is off.

        Raises
        ------
        ValueError
            If `min_features`, `max_features` or `states_per_dim` is unset or
            not of length `nfeatures`.

        """
        if not self.discretized:
            return

        nfeatures = type(self).nfeatures
        min_features = type(self).min_features
        max_features = type(self).max_features
        states_per_dim = type(self).states_per_dim

        if min_features is None or min_features.shape[0] != nfeatures:
            raise ValueError("Attribute 'min_features' must be a vectors of length %d." % nfeatures)
        if max_features is None or max_features.shape[0] != nfeatures:
            raise ValueError("Attribute 'max_features' must be a vectors of length %d." % nfeatures)
        if states_per_dim is None or states_per_dim.shape[0] != nfeatures:
            raise ValueError("Attribute 'states_per_dim' must be a vectors of length %d." % nfeatures)

        ds = []
        for i, feat in enumerate(self._features):
            # Bin width; float() keeps the result type identical on Python 2
            # (math.ceil -> float) and Python 3 (math.ceil -> int).
            factor = float(math.ceil(
                (max_features[i] - min_features[i]) / states_per_dim[i]))
            if factor == 0:
                # Zero-width range: nothing to snap to, keep the value.
                ds.append(feat)
                continue
            # Round to the nearest multiple of the bin width, away from zero.
            if feat > 0:
                bin_num = int((feat + factor / 2) / factor)
            else:
                bin_num = int((feat - factor / 2) / factor)
            ds.append(bin_num * factor)

        self._features = np.asarray(ds)

    def encode(self):
        # noinspection PyUnresolvedReferences,PyUnusedLocal
        """Encodes the state into a human readable representation.

        Returns
        -------
        ndarray :
            The encoded state.

        Notes
        -----
        Optionally this method can be overwritten at runtime.

        Examples
        --------
        >>> def my_encode(self):
        ...     pass
        ...
        >>> MDPPrimitive.encode = my_encode

        """
        return self._features

    @classmethod
    def decode(cls, _repr):
        # noinspection PyUnresolvedReferences,PyUnusedLocal
        """Decodes the state into its original representation.

        Parameters
        ----------
        _repr : tuple
            The readable representation of the primitive.

        Returns
        -------
        State :
            The decoded state.

        Notes
        -----
        Optionally this method can be overwritten at runtime.

        Examples
        --------
        >>> def my_decode(cls, _repr):
        ...     pass
        ...
        >>> MDPPrimitive.decode = classmethod(my_decode)

        """
        return cls(_repr)

    @staticmethod
    def key_to_index(key):
        # noinspection PyUnresolvedReferences,PyUnusedLocal
        """Maps internal name to group index.

        Maps the internal name of a feature to the index of the corresponding
        feature grouping. For example for a feature vector consisting of the
        x-y-z position of the left and the right arm, the features for the left
        and the right arm can be extracted separately as a group, effectively
        splitting the feature vector into two vectors with x, y, and z at the
        positions specified by the mapping of this function.

        Parameters
        ----------
        key : str
            The key into the mapping

        Returns
        -------
        int :
            The index in the feature array.

        Raises
        ------
        NotImplementedError
            If the child class does not implement this function.

        Notes
        -----
        Optionally this method can be overwritten at runtime.

        Examples
        --------
        >>> def my_key_to_index(key):
        ...     return {
        ...         "x": 0,
        ...         "y": 1,
        ...         "z": 2
        ...     }[key]
        ...
        >>> State.description = {'LArm': {'x': 0, 'y': 1, 'z': 2},
        ...                      'RArm': {'x': 3, 'y': 4, 'z': 5}}
        >>> State.key_to_index = staticmethod(my_key_to_index)

        This specifies the mapping in both direction.

        >>> state = [0.1, 0.4, 0.3, 4.6, 2.5, 0.9]
        >>>
        >>> mapping = State.description['LArm']
        >>>
        >>> larm = np.zeros(len(mapping.keys()))
        >>> for key, axis in mapping.items():
        ...     larm[State.key_to_index(key)] = state[axis]
        ...
        >>> print(larm)
        [0.1, 0.4, 0.3]

        This extracts the features for the left arm from the `state` vector.

        """
        raise NotImplementedError
# noinspection PyAbstractClass,PyUnresolvedReferences
class State(MDPPrimitive):
    """Representation of the state.

    States are represented by an array of features.

    Parameters
    ----------
    features : array_like, shape (`nfeatures`,)
        List of features, where `nfeatures` is the number of features
        identifying the primitive.
    name : str, optional
        The name of the primitive. Default is ''.

    Attributes
    ----------
    name
    dtype : {DTYPE_FLOAT, DTYPE_INT, DTYPE_OBJECT}
        The type of the features.
    nfeatures : int
        The number of features.
    discretized : bool
        Flag indicating whether the features are discretized or not.
    min_features : list
        The minimum value for each feature.
    max_features : list
        The maximum value for each feature.
    states_per_dim : list
        The number of states per dimension.
    description : dict
        A description of the features.

    Notes
    -----
    Use the `description` to encode action information. The information
    should contain the list of all available feature combinations, the
    name of each feature.

    :Examples:

        A description of an action with three possible discrete actions:

        ::

            {
                "out": {"value": [-0.004]},
                "in": {"value": [0.004]},
                "kick": {"value": [-1.0]}
            }

        A description of an action with one possible continuous action with
        name `move`, a value of `*` allows to find the action for every
        feature array. Additional information encodes the feature name together
        with its index into the feature array are given for each higher level
        element of feature array:

        ::

            {
                "move": {
                    "value": "*",
                    "descr": {
                        "LArm": {"dx": 0, "dy": 1, "dz": 2},
                        "RArm": {"dx": 3, "dy": 4, "dz": 5},
                        "LLeg": {"dx": 6, "dy": 7, "dz": 8},
                        "RLeg": {"dx": 9, "dy": 10, "dz": 11},
                        "Torso": {"dx": 12, "dy": 13, "dz": 14}
                    }
                }
            }

        Similarly, a continuous state can be encoded as follows, which identifies
        the name of each feature together with its index into the feature array:

        ::

            {
                "LArm": {"x": 0, "y": 1, "z": 2},
                "RArm": {"x": 3, "y": 4, "z": 5},
                "LLeg": {"x": 6, "y": 7, "z": 8},
                "RLeg": {"x": 9, "y": 10, "z": 11},
                "Torso": {"x": 12, "y": 13, "z": 14}
            }

        A discrete state can be encoded by identifying the position of each feature:

        ::

            {
                "image x-position": 0,
                "displacement (mm)": 1
            }

        Alternatively, the feature can be identified by a list of features, giving the
        positional description:

        ::

            ["image x-position", "displacement (mm)"]

    Rather than setting the attributes directly, use the methods :meth:`set_nfeatures`,
    :meth:`set_dtype`, :meth:`set_description`, :meth:`set_discretized`, :meth:`set_minmax_features`,
    and :meth:`set_states_per_dim` in order to enforce type checking.

    Examples
    --------
    >>> State.description = {'LArm': {'x': 0, 'y': 1, 'z': 2},
    ...                      'RArm': {'x': 3, 'y': 4, 'z': 5}}

    This description identifies the features to be the x-y-z-position of
    the left and the right arm. The position into the feature array is given
    by the integer numbers.

    >>> def my_key_to_index(key):
    ...     return {
    ...         "x": 0,
    ...         "y": 1,
    ...         "z": 2
    ...     }[key]
    ...
    >>> State.key_to_index = staticmethod(my_key_to_index)

    This defines a mapping for each key.

    >>> state = [0.1, 0.4, 0.3, 4.6, 2.5, 0.9]
    >>>
    >>> mapping = State.description['LArm']
    >>>
    >>> larm = np.zeros(len(mapping.keys()))
    >>> for key, axis in mapping.items():
    ...     larm[State.key_to_index(key)] = state[axis]
    ...
    >>> print(larm)
    [0.1, 0.4, 0.3]

    This extracts the features for the left arm from the `state` vector.

    >>> s1 = State([0.1, 0.4, 0.2])
    >>> s2 = State([0.5, 0.3, 0.5])
    >>> print(s1 - s2)
    [-0.4, 0.1, -0.3]

    Subtract states from each other.

    >>> print(s1 * s2)
    [0.05, 0.12, 0.1]

    Multiplies two states with each other.

    >>> s1 *= s2
    >>> print(s1)
    [0.05, 0.12, 0.1]

    Multiplies two states in place.

    """
    initial_states = None
    """List of initial states.

    :type: str | list"""

    terminal_states = None
    """List of terminal states.

    :type: str | list"""

    def __init__(self, features, name=None):
        super(State, self).__init__(features, name)

    def is_initial(self):
        """Checks if the state is an initial state.

        The state's name is looked up in `initial_states`, which is either a
        list of names or a single name.

        Returns
        -------
        bool :
            Whether the state is an initial state or not.

        """
        registered = State.initial_states
        if registered is None:
            return False
        if not isinstance(registered, list):
            return self.name == self.initial_states
        return self.name in registered

    def is_terminal(self):
        """Checks if the state is a terminal state.

        The state's name is looked up in `terminal_states`, which is either a
        list of names or a single name.

        Returns
        -------
        bool :
            Whether the state is a terminal state or not.

        """
        registered = State.terminal_states
        if registered is None:
            return False
        if not isinstance(registered, list):
            return self.name == self.terminal_states
        return self.name in registered

    # noinspection PyMethodMayBeStatic
    def is_valid(self):
        # noinspection PyUnresolvedReferences,PyUnusedLocal
        """Check if this state is a valid state.

        The default implementation accepts every state.

        Returns
        -------
        bool :
            Whether the state is valid or not.

        Notes
        -----
        Optionally this method can be overwritten at runtime.

        Examples
        --------
        >>> def my_is_valid(self):
        ...     pass
        ...
        >>> State.is_valid = my_is_valid

        """
        return True
# noinspection PyAbstractClass,PyUnresolvedReferences
class Action(MDPPrimitive):
    """Representation of an action.

    Actions are represented by an array of features.

    Parameters
    ----------
    features : array_like, shape (`nfeatures`,)
        List of features, where `nfeatures` is the number of features
        identifying the primitive.
    name : str, optional
        The name of the primitive. If omitted, the name is looked up from
        the action's description via :meth:`get_name`.

    Attributes
    ----------
    name
    dtype : {DTYPE_FLOAT, DTYPE_INT, DTYPE_OBJECT}
        The type of the features.
    nfeatures : int
        The number of features.
    discretized : bool
        Flag indicating whether the features are discretized or not.
    min_features : list
        The minimum value for each feature.
    max_features : list
        The maximum value for each feature.
    states_per_dim : list
        The number of states per dimension.
    description : dict
        A description of the features.

    Notes
    -----
    Use the `description` to encode action information. The information
    should contain the list of all available feature combinations, the
    name of each feature.

    :Examples:

        A description of an action with three possible discrete actions:

        ::

            {
                "out": {"value": [-0.004]},
                "in": {"value": [0.004]},
                "kick": {"value": [-1.0]}
            }

        A description of an action with one possible continuous action with
        name `move`, a value of `*` allows to find the action for every
        feature array. Additional information encodes the feature name together
        with its index into the feature array are given for each higher level
        element of feature array:

        ::

            {
                "move": {
                    "value": "*",
                    "descr": {
                        "LArm": {"dx": 0, "dy": 1, "dz": 2},
                        "RArm": {"dx": 3, "dy": 4, "dz": 5},
                        "LLeg": {"dx": 6, "dy": 7, "dz": 8},
                        "RLeg": {"dx": 9, "dy": 10, "dz": 11},
                        "Torso": {"dx": 12, "dy": 13, "dz": 14}
                    }
                }
            }

        Similarly, a continuous state can be encoded as follows, which identifies
        the name of each feature together with its index into the feature array:

        ::

            {
                "LArm": {"x": 0, "y": 1, "z": 2},
                "RArm": {"x": 3, "y": 4, "z": 5},
                "LLeg": {"x": 6, "y": 7, "z": 8},
                "RLeg": {"x": 9, "y": 10, "z": 11},
                "Torso": {"x": 12, "y": 13, "z": 14}
            }

        A discrete state can be encoded by identifying the position of each feature:

        ::

            {
                "image x-position": 0,
                "displacement (mm)": 1
            }

        Alternatively, the feature can be identified by a list of features, giving the
        positional description:

        ::

            ["image x-position", "displacement (mm)"]

    Rather than setting the attributes directly, use the methods :meth:`set_nfeatures`,
    :meth:`set_dtype`, :meth:`set_description`, :meth:`set_discretized`, :meth:`set_minmax_features`,
    and :meth:`set_states_per_dim` in order to enforce type checking.

    Examples
    --------
    >>> Action.set_description({'LArm': {'dx': 0, 'dy': 1, 'dz': 2},
    ...                         'RArm': {'dx': 3, 'dy': 4, 'dz': 5}})

    This description identifies the features to be the delta x-y-z-position of
    the left and the right arm. The position into the feature array is given
    by the integer numbers.

    >>> def my_key_to_index(key):
    ...     return {
    ...         "dx": 0,
    ...         "dy": 1,
    ...         "dz": 2
    ...     }[key]
    ...
    >>> Action.key_to_index = staticmethod(my_key_to_index)

    This defines a mapping for each key.

    >>> action = [0.1, 0.4, 0.3, 4.6, 2.5, 0.9]
    >>>
    >>> mapping = Action.description['LArm']
    >>>
    >>> larm = np.zeros(len(mapping.keys()))
    >>> for key, axis in mapping.items():
    ...     larm[Action.key_to_index(key)] = action[axis]
    ...
    >>> print(larm)
    [0.1, 0.4, 0.3]

    This extracts the features for the left arm from the `action` vector.

    >>> a1 = Action([0.1, 0.4, 0.2])
    >>> a2 = Action([0.5, 0.3, 0.5])
    >>> print(a1 - a2)
    [-0.4, 0.1, -0.3]

    Subtract actions from each other.

    >>> print(a1 * a2)
    [0.05, 0.12, 0.1]

    Multiplies two actions with each other.

    >>> a1 *= a2
    >>> print(a1)
    [0.05, 0.12, 0.1]

    Multiplies two actions in place.

    """
    def __init__(self, features, name=None):
        super(Action, self).__init__(features, name)

        # Without an explicit name, derive one from the description.
        self._name = name if name is not None else Action.get_name(self._features)

    @classmethod
    def get_name(cls, features):
        """Retrieves the name of the action.

        Retrieve the name of the action using the action's description. In the case
        that all features are zero the action is considered a `no-op` action.

        Description entries are examined in iteration order. An entry whose
        ``"value"`` is the wildcard ``"*"`` matches any feature array; any
        other value must equal `features` element-wise. Entries without a
        ``"value"`` key (e.g. pure index mappings) are skipped.

        Parameters
        ----------
        features : ndarray
            A feature array.

        Returns
        -------
        str :
            The name of the action, ``"no-op"`` if nothing matched and all
            features are zero, otherwise the empty string.

        Raises
        ------
        ValueError
            If a non-wildcard ``"value"`` in the description does not have
            the same shape as `features`.

        """
        features = np.asarray(features, dtype=cls.dtype)

        if isinstance(cls.description, dict):
            for label, config in cls.description.items():
                if not isinstance(config, dict) or "value" not in config:
                    continue
                value = config["value"]

                # The wildcard is tested first (and only on non-sequences) so
                # it is never subjected to the shape check or to an
                # element-wise NumPy comparison.
                if not isinstance(value, (list, tuple, np.ndarray)) and value == "*":
                    return label

                candidate = np.asarray(value)
                if candidate.shape != features.shape:
                    raise ValueError("Dimension mismatch: array 'config['value']' is vector of length %d,"
                                     " but 'features' is a vector of length %d." % (candidate.size,
                                                                                    features.size))
                if np.array_equal(candidate, features):
                    return label

        if not features.any():
            return "no-op"

        return ""

    @classmethod
    def get_noop_action(cls):
        """Creates a `no-op` action.

        A `no-op` action does not have any effect.

        Returns
        -------
        Action :
            A `no-op` action.

        Raises
        ------
        ValueError
            If `nfeatures` has not been set to an integer yet.

        """
        if not isinstance(cls.nfeatures, int):
            raise ValueError("Attribute 'nfeatures' must be of <type 'int'>, got %s" % str(type(cls.nfeatures)))
        return cls(np.zeros(cls.nfeatures), "no-op")