from abc import ABC, abstractmethod
from typing import Tuple, Union
import numpy as np
from numba import njit
from prl.typing import HistoryABC, Action, Reward, State, MemoryABC, StorageABC
from prl.utils import timeit
@njit
def calculate_returns(
all_rewards: np.ndarray,
dones: np.ndarray,
    horizon: Union[int, float],
discount_factor: float,
_index: int,
):
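    """Computes the discounted return for every step, restarting the sum at each
    episode boundary marked in ``dones``:

        G_t = r_t + discount_factor * r_{t+1} + discount_factor**2 * r_{t+2} + ...

    For example, rewards [1.0, 1.0, 1.0] with dones [False, False, True] and
    discount_factor 0.9 give returns [2.71, 1.9, 1.0].
    """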
    if np.any(dones) and (horizon == np.inf) and dones[_index - 1]:
assert 0.0 <= discount_factor <= 1.0
splits = [-1] + list(np.nonzero(dones)[0])
all_returns = np.zeros_like(all_rewards)
for s in range(len(splits) - 1):
start = splits[s] + 1
end = splits[s + 1] + 1
rewards = all_rewards[start:end]
returns = np.zeros_like(rewards)
discounts = np.zeros_like(rewards) + discount_factor
powers = np.arange(rewards.shape[0])
discounts = np.power(discounts, powers)
length = len(rewards)
for i in range(length):
trimmed_rewards = rewards[i:length]
trimmed_discounts = discounts[: (length - i)]
returns[i] = np.sum(trimmed_discounts * trimmed_rewards)
all_returns[start:end] = returns
return all_returns
else:
raise Exception(
"Returns available only for at least one complete episode, there can't be an incomplete episode"
"and the horizon must be np.inf"
)
@njit
def calculate_total_rewards(all_rewards: np.ndarray, dones: np.ndarray, _index: int):
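    """Computes the total (undiscounted) reward of each episode and broadcasts it to
    every step of that episode.

    For example, rewards [1.0, 2.0, 3.0, 1.0, 1.0] with dones
    [False, False, True, False, True] give [6.0, 6.0, 6.0, 2.0, 2.0].
    """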
if np.any(dones) and dones[_index - 1]:
splits = [-1] + list(np.nonzero(dones)[0])
all_total_rewards = np.zeros_like(all_rewards)
for s in range(len(splits) - 1):
start = splits[s] + 1
end = splits[s + 1] + 1
rewards = all_rewards[start:end]
total_reward = np.sum(rewards)
all_total_rewards[start:end] = total_reward
return all_total_rewards
else:
raise Exception(
"Returns available only for at least one complete episode and all episodes must be done"
)
class Storage(StorageABC, ABC):
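    """Abstract interface shared by the concrete storages below: History and Memory."""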
    @abstractmethod
def update(self, action, reward, done, state):
"""
        Updates the object with the latest state, reward, action and done flag.
        Args:
            action: action executed by the agent
            reward: reward from the environment
            done: done flag from the environment
            state: new state returned by the wrapped environment after executing the action
"""
    @abstractmethod
def new_state_update(self, state):
"""Overwrites newest state in the History
Args:
state: state array.
"""
    @abstractmethod
def get_states(self) -> np.ndarray:
"""Returns an array of all states.
Returns:
array of all states
"""
    @abstractmethod
def get_last_state(self) -> np.ndarray:
"""Returns only the last state.
Returns:
last state
"""
    @abstractmethod
def get_rewards(self) -> np.ndarray:
"""Returns an array of all rewards.
Returns:
array of all rewards
"""
    @abstractmethod
def get_actions(self) -> np.ndarray:
"""Returns an array of all actions.
Returns:
array of all actions
"""
    @abstractmethod
def get_dones(self) -> np.ndarray:
"""Returns an array of all done flags.
Returns:
array of all done flags
"""
    @abstractmethod
    def sample_batch(
        self, replay_buffer_size: int, batch_size: int, returns: bool, next_states: bool
) -> tuple:
"""Samples batch of examples from the Storage.
Args:
            replay_buffer_size: size of the replay buffer to sample examples from
batch_size: number of returned examples
returns: if True, the method will return the returns from each step instead of the rewards
            next_states: if True, the method will also return the next states (e.g. for the DQN algorithm)
Returns:
batch of samples from history in form of a tuple with np.ndarrays in order:
states, actions, rewards, dones, (new_states)
"""
@timeit
    def __getitem__(self, indices) -> tuple:
        return (
            self.get_states()[indices],
            self.get_actions()[indices],
            self.get_rewards()[indices],
            self.get_dones()[indices],
)
@abstractmethod
def __len__(self):
pass
@abstractmethod
def __repr__(self):
pass
class History(Storage, HistoryABC):
"""
    An object used to keep the episode history (used within the
    :py:class:`~prl.environments.environments.Environment` class and by some agents).
    An agent can use this object to keep the history of past episodes, calculate
    returns and total rewards, and sample batches from it.
    The object also supports indexing and slicing (it implements the Python sequence
    protocol), so functions that work on sequences, such as random.choice, can also
    be used on a history.
    Args:
        initial_state: initial state from the environment
        action_type: numpy type of the action (e.g. np.int32)
        initial_length: initial length of the history
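
    Example (illustrative sketch, assuming a Gym-style ``env`` and an agent object
    exposing an ``act`` method):

        history = History(env.reset(), action_type=np.int64)
        action = agent.act(history.get_last_state())
        state, reward, done, _ = env.step(action)
        history.update(action, reward, done, state)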
"""
@timeit
def __init__(
self, initial_state: np.ndarray, action_type: type, initial_length: int = 512
):
self._index = 0
self.states = np.empty(
(initial_length,) + initial_state.shape, dtype=np.float32
)
self.actions = np.empty((initial_length,), dtype=action_type)
self.rewards = np.empty((initial_length,))
        self.dones = np.empty((initial_length,), dtype=bool)
self.states[self._index] = initial_state
    @timeit
def update(self, action: Action, reward: Reward, done: bool, state: State):
if self._index == (self.states.shape[0] - 1):
self._enlarge()
self.actions[self._index] = action
self.rewards[self._index] = reward
self.dones[self._index] = done
self._index += 1
self.states[self._index] = state
    @timeit
def new_state_update(self, state: State):
self.states[self._index] = state
    @timeit
def get_states(self) -> np.ndarray:
return self.states[: self._index]
    @timeit
def get_last_state(self) -> np.ndarray:
return self.states[self._index]
    @timeit
def get_rewards(self) -> np.ndarray:
return self.rewards[: self._index]
    @timeit
def get_actions(self) -> np.ndarray:
return self.actions[: self._index]
    @timeit
def get_dones(self) -> np.ndarray:
return self.dones[: self._index]
    @timeit
def get_returns(
self, discount_factor: float = 1.0, horizon: float = np.inf
) -> np.ndarray:
"""Calculates returns for each step.
Returns:
array of discounted returns for each step
"""
return calculate_returns(
self.get_rewards(), self.get_dones(), horizon, discount_factor, self._index
)
    @timeit
def get_total_rewards(self) -> np.ndarray:
"""
        Calculates the sum of all rewards for each episode and reports it for every state,
        so every state in one episode has the same total reward value. This can be
        useful for filtering states from the best episodes (e.g. in the Cross-Entropy Method).
Returns:
total reward for each state
"""
return calculate_total_rewards(
self.get_rewards(), self.get_dones(), self._index
)
    @timeit
def get_number_of_episodes(self) -> int:
"""Returns a number of full episodes in history.
Returns:
number of full episodes in history
"""
return int(self.get_dones().sum())
    @timeit
def sample_batch(
self,
replay_buffer_size: int,
batch_size: int = 64,
returns: bool = False,
next_states: bool = False,
) -> tuple:
if returns:
raise NotImplementedError("The returns will be implemented soon")
elif next_states:
if self._index < 2:
raise Exception(
"Can't sample examples with next_state when the history has length 1."
)
indexes = np.random.randint(
np.max([0, self._index - replay_buffer_size]),
self._index - 1,
size=batch_size,
)
return self[indexes] + (self.get_states()[indexes + 1],)
else:
indexes = np.random.randint(
                np.max([0, self._index - replay_buffer_size]),
self._index,
size=batch_size,
)
return self[indexes]
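
    # Example usage (illustrative, assuming a DQN-style training loop):
    #
    #     states, actions, rewards, dones, next_states = history.sample_batch(
    #         replay_buffer_size=10000, batch_size=64, next_states=True
    #     )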
    def get_summary(self) -> Tuple[float, float, int]:
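        """Returns summary statistics of the stored history.
        Returns:
            tuple of (mean total reward over finished episodes, mean episode length,
            number of stored steps)
        """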
total_rewards_mean = self.get_total_rewards()[self.get_dones()].mean()
mean_length = len(self) / self.get_number_of_episodes()
return total_rewards_mean, mean_length, self._index
@timeit
def _enlarge(self):
new_shape = list(self.states.shape)
new_shape[0] *= 2
self.states = np.resize(self.states, new_shape)
new_shape = list(self.actions.shape)
new_shape[0] *= 2
self.actions = np.resize(self.actions, new_shape)
new_shape = list(self.rewards.shape)
new_shape[0] *= 2
self.rewards = np.resize(self.rewards, new_shape)
new_shape = list(self.dones.shape)
new_shape[0] *= 2
self.dones = np.resize(self.dones, new_shape)
print("Enlarging History. New max length: ", self.dones.shape[0])
def __add__(self, other):
raise NotImplementedError(
"You can only use inplace operators between History instances"
)
@timeit
def __iadd__(self, other: HistoryABC):
self.states = np.concatenate([self.get_states(), other.states])
self.actions = np.concatenate([self.get_actions(), other.actions])
self.rewards = np.concatenate([self.get_rewards(), other.rewards])
self.dones = np.concatenate([self.get_dones(), other.dones])
self._index += other._index
return self
def __len__(self):
return self._index
def __repr__(self):
representation = ""
for k, v in self.__dict__.items():
if isinstance(v, np.ndarray):
representation += "%s:\n%s\n" % (k, v[: self._index])
else:
representation += "%s:\n%s\n" % (k, v)
return representation
class Memory(Storage, MemoryABC):
"""
    An object to be used as a replay buffer. It doesn't keep full episodes and acts
    as a bounded FIFO queue. Implemented with double-sized numpy arrays holding
    duplicated data, which makes slicing and sampling very fast at the cost of
    higher memory usage.
    Args:
        initial_state: initial state from the environment
        action_type: numpy type of the action (e.g. np.int32)
        maximum_length: maximum number of examples to keep in the queue
"""
@timeit
def __init__(
self, initial_state: np.ndarray, action_type, maximum_length: int = 1000
):
self._maximum_length = maximum_length
self.states = np.empty(
(2 * maximum_length + 2,) + initial_state.shape, dtype=np.float32
)
self.actions = np.empty((2 * maximum_length + 2,), dtype=action_type)
self.rewards = np.empty((2 * maximum_length + 2,))
        self.dones = np.empty((2 * maximum_length + 2,), dtype=bool)
self.clear(initial_state)
    @timeit
def clear(self, initial_state):
self._lower_index = 0
self._index = 1
self._full = False
self.states[self._index] = initial_state
    @timeit
def update(self, action, reward, done, state):
self.actions[self._index] = action
self.rewards[self._index] = reward
self.dones[self._index] = done
if self._full:
self.actions[self._lower_index] = action
self.rewards[self._lower_index] = reward
self.dones[self._lower_index] = done
self._index += 1
if self._index > self._maximum_length + 1:
self._lower_index += 1
self._full = True
self.states[self._lower_index] = state
if self._index == 2 * self._maximum_length + 2:
self._index = self._maximum_length + 1
self._lower_index = 0
self.states[self._index] = state
    @timeit
def new_state_update(self, state):
self.states[self._index] = state
if self._full:
self.states[self._lower_index] = state
    @timeit
def get_states(self, include_last=False) -> np.ndarray:
index = self._index
if include_last:
index += 1
return self.states[(self._lower_index + 1) : index]
    @timeit
def get_last_state(self) -> np.ndarray:
return self.states[self._index]
    @timeit
def get_rewards(self) -> np.ndarray:
return self.rewards[(self._lower_index + 1) : self._index]
    @timeit
def get_actions(self) -> np.ndarray:
return self.actions[(self._lower_index + 1) : self._index]
    @timeit
def get_dones(self) -> np.ndarray:
return self.dones[(self._lower_index + 1) : self._index]
    @timeit
def sample_batch(
self,
        replay_buffer_size: int,
batch_size: int = 64,
returns: bool = False,
next_states: bool = False,
) -> tuple:
if returns:
raise NotImplementedError("The returns will be implemented soon")
elif next_states:
if self._index < 2:
raise Exception(
"Can't sample examples with next_state when the history has length 1."
)
            indices = np.random.randint(
                self._index - self._lower_index - 2, size=batch_size
            )
            return self[indices] + (self.get_states()[indices + 1],)
        else:
            # the stored window holds self._index - self._lower_index - 1 entries,
            # so sample indices strictly below that length
            indices = np.random.randint(
                self._index - self._lower_index - 1, size=batch_size
            )
            return self[indices]
def __len__(self):
return self._index - (self._lower_index + 1)
def __repr__(self):
representation = ""
for k, v in self.__dict__.items():
if isinstance(v, np.ndarray):
representation += "%s:\n%s\n" % (
k,
v[(self._lower_index + 1) : self._index],
)
else:
representation += "%s:\n%s\n" % (k, v)
return representation