Основные определения. Value iteration#
Среда, действия, награды#
В этом разделе мы рассмотрим ключевые концепции, связанные с обучением с подкреплением (Reinforcement Learning, RL), такие как среда, действия и награды, а также основные типы сред и особенности наград.
В общем случае, среда может вести себя непредсказуемым образом, и отследить причинно-следственные связи в такой среде было бы проблематично, что значительно бы усложнило обучение (если мы не знаем, какое действие привело к хорошей награде, мы не можем понять, какие действия стоит совершать чаще).
Но мы можем сделать ряд предположений о среде, которые позволят нам эффективно ее моделировать.
Марковские среды#
На практике многие среды обладают свойством «Маркова»: следующее состояние среды зависит только от текущего состояния и действия, совершаемого в данный момент.
Рассмотрим сначала конечные, марковские среды. Они описываются:
Конечным множеством состояний \(s\in\mathcal{S}\)
Конечным множеством действий \(a\in\mathcal{A}\)
Переходными вероятностями \(P(s_{n+1}|s_n,a_n,s_{n-1},a_{n-1},...) = P(s_{n+1}|s_n,a_n)\), характеризующими вероятности оказаться в состоянии \(s_{n+1}\) при совершениии действия \(a_n\) в состоянии \(s_n\)
Функцией награды \(R(s, a, s') \to \mathbb{R}\), приписывающей действительную награду переходу из состояния \(s\) в состояние \(s'\) при совершении действия \(a\)
Зададим их явно для некоторой игрушечной среды
Show code cell source
#@title Установка зависимостей и вспомогательные функции
#!pip install gym --quiet
# most of this code was politely stolen from https://github.com/berkeleydeeprlcourse/homework/
# all credit goes to https://github.com/abhishekunique
# (if I got the author right)
import numpy as np
from gym.utils import seeding
try:
from graphviz import Digraph
import graphviz
has_graphviz = True
except ImportError:
has_graphviz = False
class MDP:
def __init__(self, transition_probs, rewards, initial_state=None, seed=None):
"""
Defines an MDP. Compatible with gym Env.
:param transition_probs: transition_probs[s][a][s_next] = P(s_next | s, a)
A dict[state -> dict] of dicts[action -> dict] of dicts[next_state -> prob]
For each state and action, probabilities of next states should sum to 1
If a state has no actions available, it is considered terminal
:param rewards: rewards[s][a][s_next] = r(s,a,s')
A dict[state -> dict] of dicts[action -> dict] of dicts[next_state -> reward]
The reward for anything not mentioned here is zero.
:param get_initial_state: a state where agent starts or a callable() -> state
By default, picks initial state at random.
States and actions can be anything you can use as dict keys, but we recommend that you use strings or integers
Here's an example from MDP depicted on http://bit.ly/2jrNHNr
transition_probs = {
's0': {
'a0': {'s0': 0.5, 's2': 0.5},
'a1': {'s2': 1}
},
's1': {
'a0': {'s0': 0.7, 's1': 0.1, 's2': 0.2},
'a1': {'s1': 0.95, 's2': 0.05}
},
's2': {
'a0': {'s0': 0.4, 's2': 0.6},
'a1': {'s0': 0.3, 's1': 0.3, 's2': 0.4}
}
}
rewards = {
's1': {'a0': {'s0': +5}},
's2': {'a1': {'s0': -1}}
}
"""
self._check_param_consistency(transition_probs, rewards)
self._transition_probs = transition_probs
self._rewards = rewards
self._initial_state = initial_state
self.n_states = len(transition_probs)
self.reset()
self.np_random, _ = seeding.np_random(seed)
def get_all_states(self):
""" return a tuple of all possiblestates """
return tuple(self._transition_probs.keys())
def get_possible_actions(self, state):
""" return a tuple of possible actions in a given state """
return tuple(self._transition_probs.get(state, {}).keys())
def is_terminal(self, state):
""" return True if state is terminal or False if it isn't """
return len(self.get_possible_actions(state)) == 0
def get_next_states(self, state, action):
""" return a dictionary of {next_state1 : P(next_state1 | state, action), next_state2: ...} """
assert action in self.get_possible_actions(state), "cannot do action %s from state %s" % (action, state)
return self._transition_probs[state][action]
def get_transition_prob(self, state, action, next_state):
""" return P(next_state | state, action) """
return self.get_next_states(state, action).get(next_state, 0.0)
def get_reward(self, state, action, next_state):
""" return the reward you get for taking action in state and landing on next_state"""
assert action in self.get_possible_actions(state), "cannot do action %s from state %s" % (action, state)
return self._rewards.get(state, {}).get(action, {}).get(next_state, 0.0)
def reset(self):
""" reset the game, return the initial state"""
if self._initial_state is None:
self._current_state = self.np_random.choice(
tuple(self._transition_probs.keys()))
elif self._initial_state in self._transition_probs:
self._current_state = self._initial_state
elif callable(self._initial_state):
self._current_state = self._initial_state()
else:
raise ValueError(
"initial state %s should be either a state or a function() -> state" % self._initial_state)
return self._current_state
def step(self, action):
""" take action, return next_state, reward, is_done, empty_info """
possible_states, probs = zip(*self.get_next_states(self._current_state, action).items())
next_state = possible_states[self.np_random.choice(np.arange(len(possible_states)), p=probs)]
reward = self.get_reward(self._current_state, action, next_state)
is_done = self.is_terminal(next_state)
self._current_state = next_state
return next_state, reward, is_done, {}
def render(self):
print("Currently at %s" % self._current_state)
def _check_param_consistency(self, transition_probs, rewards):
for state in transition_probs:
assert isinstance(transition_probs[state], dict), \
"transition_probs for %s should be a dictionary but is instead %s" % (
state, type(transition_probs[state]))
for action in transition_probs[state]:
assert isinstance(transition_probs[state][action], dict), \
"transition_probs for %s, %s should be a a dictionary but is instead %s" % (
state, action, type(transition_probs[state][action]))
next_state_probs = transition_probs[state][action]
assert len(next_state_probs) != 0, "from state %s action %s leads to no next states" % (state, action)
sum_probs = sum(next_state_probs.values())
assert abs(sum_probs - 1) <= 1e-10, \
"next state probabilities for state %s action %s add up to %f (should be 1)" % (
state, action, sum_probs)
for state in rewards:
assert isinstance(rewards[state], dict), \
"rewards for %s should be a dictionary but is instead %s" % (
state, type(rewards[state]))
for action in rewards[state]:
assert isinstance(rewards[state][action], dict), \
"rewards for %s, %s should be a a dictionary but is instead %s" % (
state, action, type(rewards[state][action]))
msg = "The Enrichment Center once again reminds you that Android Hell is a real place where" \
" you will be sent at the first sign of defiance."
assert None not in transition_probs, "please do not use None as a state identifier. " + msg
assert None not in rewards, "please do not use None as an action identifier. " + msg
class FrozenLakeEnv(MDP):
"""
Winter is here. You and your friends were tossing around a frisbee at the park
when you made a wild throw that left the frisbee out in the middle of the lake.
The water is mostly frozen, but there are a few holes where the ice has melted.
If you step into one of those holes, you'll fall into the freezing water.
At this time, there's an international frisbee shortage, so it's absolutely imperative that
you navigate across the lake and retrieve the disc.
However, the ice is slippery, so you won't always move in the direction you intend.
The surface is described using a grid like the following
SFFF
FHFH
FFFH
HFFG
S : starting point, safe
F : frozen surface, safe
H : hole, fall to your doom
G : goal, where the frisbee is located
The episode ends when you reach the goal or fall in a hole.
You receive a reward of 1 if you reach the goal, and zero otherwise.
"""
MAPS = {
"4x4": [
"SFFF",
"FHFH",
"FFFH",
"HFFG"
],
"8x8": [
"SFFFFFFF",
"FFFFFFFF",
"FFFHFFFF",
"FFFFFHFF",
"FFFHFFFF",
"FHHFFFHF",
"FHFFHFHF",
"FFFHFFFG"
],
}
def __init__(self, desc=None, map_name="4x4", slip_chance=0.2, seed=None):
if desc is None and map_name is None:
raise ValueError('Must provide either desc or map_name')
elif desc is None:
desc = self.MAPS[map_name]
assert ''.join(desc).count(
'S') == 1, "this implementation supports having exactly one initial state"
assert all(c in "SFHG" for c in
''.join(desc)), "all cells must be either of S, F, H or G"
self.desc = desc = np.asarray(list(map(list, desc)), dtype='str')
self.lastaction = None
nrow, ncol = desc.shape
states = [(i, j) for i in range(nrow) for j in range(ncol)]
actions = ["left", "down", "right", "up"]
initial_state = states[np.array(desc == b'S').ravel().argmax()]
def move(row, col, movement):
if movement == 'left':
col = max(col - 1, 0)
elif movement == 'down':
row = min(row + 1, nrow - 1)
elif movement == 'right':
col = min(col + 1, ncol - 1)
elif movement == 'up':
row = max(row - 1, 0)
else:
raise ("invalid action")
return (row, col)
transition_probs = {s: {} for s in states}
rewards = {s: {} for s in states}
for (row, col) in states:
if desc[row, col] in "GH":
continue
for action_i in range(len(actions)):
action = actions[action_i]
transition_probs[(row, col)][action] = {}
rewards[(row, col)][action] = {}
for movement_i in [(action_i - 1) % len(actions), action_i,
(action_i + 1) % len(actions)]:
movement = actions[movement_i]
newrow, newcol = move(row, col, movement)
prob = (1. - slip_chance) if movement == action else (
slip_chance / 2.)
if prob == 0:
continue
if (newrow, newcol) not in transition_probs[row, col][
action]:
transition_probs[row, col][action][
newrow, newcol] = prob
else:
transition_probs[row, col][action][
newrow, newcol] += prob
if desc[newrow, newcol] == 'G':
rewards[row, col][action][newrow, newcol] = 1.0
MDP.__init__(self, transition_probs, rewards, initial_state, seed)
def render(self):
desc_copy = np.copy(self.desc)
desc_copy[self._current_state] = '*'
print('\n'.join(map(''.join, desc_copy)), end='\n\n')
def plot_graph(mdp, graph_size='20,20', s_node_size='1,5',
a_node_size='0,5', rankdir='LR', ):
"""
Function for pretty drawing MDP graph with graphviz library.
Requirements:
graphviz : https://www.graphviz.org/
for ubuntu users: sudo apt-get install graphviz
python library for graphviz
for pip users: pip install graphviz
:param mdp:
:param graph_size: size of graph plot
:param s_node_size: size of state nodes
:param a_node_size: size of action nodes
:param rankdir: order for drawing
:return: dot object
"""
s_node_attrs = {'shape': 'doublecircle',
'color': '#85ff75',
'style': 'filled',
'width': str(s_node_size),
'height': str(s_node_size),
'fontname': 'Arial',
'fontsize': '24'}
a_node_attrs = {'shape': 'circle',
'color': 'lightpink',
'style': 'filled',
'width': str(a_node_size),
'height': str(a_node_size),
'fontname': 'Arial',
'fontsize': '20'}
s_a_edge_attrs = {'style': 'bold',
'color': 'red',
'ratio': 'auto'}
a_s_edge_attrs = {'style': 'dashed',
'color': 'blue',
'ratio': 'auto',
'fontname': 'Arial',
'fontsize': '16'}
graph = Digraph(name='MDP')
graph.attr(rankdir=rankdir, size=graph_size)
for state_node in mdp._transition_probs:
graph.node(state_node, **s_node_attrs)
for posible_action in mdp.get_possible_actions(state_node):
action_node = state_node + "-" + posible_action
graph.node(action_node,
label=str(posible_action),
**a_node_attrs)
graph.edge(state_node, state_node + "-" +
posible_action, **s_a_edge_attrs)
for posible_next_state in mdp.get_next_states(state_node,
posible_action):
probability = mdp.get_transition_prob(
state_node, posible_action, posible_next_state)
reward = mdp.get_reward(
state_node, posible_action, posible_next_state)
if reward != 0:
label_a_s_edge = 'p = ' + str(probability) + \
' ' + 'reward =' + str(reward)
else:
label_a_s_edge = 'p = ' + str(probability)
graph.edge(action_node, posible_next_state,
label=label_a_s_edge, **a_s_edge_attrs)
return graph
def plot_graph_with_state_values(mdp, state_values):
""" Plot graph with state values"""
graph = plot_graph(mdp)
for state_node in mdp._transition_probs:
value = state_values[state_node]
graph.node(state_node, label=str(state_node) + '\n' + 'V =' + str(value)[:4])
return graph
def get_optimal_action_for_plot(mdp, state_values, state, get_action_value, gamma=0.9):
""" Finds optimal action using formula above. """
if mdp.is_terminal(state):
return None
next_actions = mdp.get_possible_actions(state)
q_values = [get_action_value(mdp, state_values, state, action, gamma) for action in next_actions]
optimal_action = next_actions[np.argmax(q_values)]
return optimal_action
def plot_graph_optimal_strategy_and_state_values(mdp, state_values, get_action_value, gamma=0.9):
""" Plot graph with state values and """
graph = plot_graph(mdp)
opt_s_a_edge_attrs = {'style': 'bold',
'color': 'green',
'ratio': 'auto',
'penwidth': '6'}
for state_node in mdp._transition_probs:
value = state_values[state_node]
graph.node(state_node, label=str(state_node) + '\n' + 'V =' + str(value)[:4])
for action in mdp.get_possible_actions(state_node):
if action == get_optimal_action_for_plot(mdp, state_values, state_node, get_action_value, gamma):
graph.edge(state_node, state_node + "-" + action, **opt_s_a_edge_attrs)
return graph
# Множество состояний:
# Три состояния
states = ['s0', 's1', 's2']
# Вероятности переходов:
# transition_probs[s][a][s'] задает P(s'|s,a)
transition_probs = {
's0': {
'a0': {'s0': 0.5, 's2': 0.5},
'a1': {'s2': 1}
},
's1': {
'a0': {'s0': 0.7, 's1': 0.1, 's2': 0.2},
'a1': {'s1': 0.95, 's2': 0.05}
},
's2': {
'a0': {'s0': 0.4, 's2': 0.6},
'a1': {'s0': 0.3, 's1': 0.3, 's2': 0.4}
}
}
# Награды:
# rewards[s][a][s'] задает награду R(s, a, s')
rewards = {
's1': {'a0': {'s0': +5}},
's2': {'a1': {'s0': -1}}
}
# Обертка для упрощения работы с этими словарями (смотрите, какие есть методы)
mdp = MDP(transition_probs, rewards, initial_state='s0')
from IPython.display import display
display(plot_graph(mdp))
Принятие решений#
Политикой (policy) называется функция \(\pi(s)\to a\), которая каждому состоянию среды \(s\in\mathcal{S}\) сопостовляет действие \(a\in \mathcal{A}\). В этом семинаре мы ограничимся рассмотрением детерминированных политик.
В общем случае, решение MDP сводится к нахождению политики, максимизирующей нагрду в среде. Подробнее об этом далее.
Награда#
Reward hypothesis (R.Sutton)
Goals and purposes can be thought of as the maximization of the expected value of the cumulative sum of a received scalar signal
Кумулятивная награда (cumulative reward, return) в момент \(t\) записывается как сумма наград, полученных после \(t\):
, где \(T\) - время окончания сессии взаимодействия со средой.
Но есть две причины, почему мы не можем оптимизировать непосредственно \(G\):
Если взаимодействие со средой идет бесконечно (\(T=\infty\)), то \(G\) - расходящийся ряд
Награда в момент \(t\) одинаково влияет на \(G_t\) и \(G_{t-100}\). Поэтому из большого значения \(G_t\) не следует, что действия принимаемые в момент \(t\) были оптимальны.
Решение: дисконтированная награда
Плюсы:
Ряд сходится абсолютно
Ближайшие награды входят с бо’льшим весом
(Важно!) \(G_t = R_t + \gamma G_{t+1}\)
В дальнейшем, мы будет заниматься именно оптимизацией кумулятивной дисконтированной награды.
Стохастичность
Так как среда недетерминированная, одним и тем же действиям будут соответствовать различные состояния и награды, а значит \(G\) - случайная величина.
Возьмем от нее матожидание: $\( \mathbb{E}_{\pi}G = \mathbb{E}_{s_1|s_0,a_0=\pi(s_0)}[R_0 + \mathbb{E}_{s_2|s_1,a_1=\pi(s_1)}[\gamma R_1 + \dots]] \)$
Видим, что помимо свойств среды и политики, \(\mathbb{E}_{\pi}G\) зависит только от начального состояния. Введем функцию $\( \mathcal{V}_\pi(s) \overset{\Delta}{=} \mathbb{E}_{\pi}[G|s_0=s] = \sum\limits_{s'}p(s'|s,a=\pi(s))[r(s, a=\pi(s), s') + \gamma \mathbb{E}_{\pi, s_0 = s'}G] = \sum\limits_{ s'}p(s'|s,a=\pi(s))[r(s, a=\pi(s), s') + \gamma \mathcal{V}_\pi(s')] \)\( , где \)p$ - вероятности перехода для среды
\(\mathcal{V}_\pi(s)\) - матожидание кумулятивной дисконтированной награды для агента с политикой \(\pi\), находящегося в состоянии \(s\)
Введем дополнительную функцию $\( q_\pi(s, a) = \mathbb{E}_{\pi}[G|s_0=s,a_0=a] = \sum\limits_{s'}p(s'|s,a)[r(s, a, s') + \gamma \mathcal{V}_\pi(s')] \)$
\(q_\pi(s, a)\) - матожидание кумулятивной дисконтированной награды для агента, находящегося в состоянии \(s\), соверщающего действие \(a\), а потом действующего по политике \(\pi\)
# Напишите код, вычисляющий Q по V и среде
def get_action_value(mdp, state_values, state, action, gamma):
""" Computes Q(s,a) as in formula above
:param mdp: Среда
Используйте методы mdp.get_next_states(state, action)
и mdp.get_reward(state, action, next_state)
:param state_values: V
dict[state -> float]
:param action: a
:param gamma: gamma
"""
Q=0
s_dict = mdp.get_next_states(state, action)
s_new_list = s_dict.keys()
for s in s_new_list:
Q += s_dict[s] * (mdp.get_reward(state, action, s) + gamma * state_values[s])
return Q
import numpy as np
test_Vs = {s: i for i, s in enumerate(sorted(mdp.get_all_states()))}
assert np.isclose(get_action_value(mdp, test_Vs, 's2', 'a1', 0.9), 0.69)
assert np.isclose(get_action_value(mdp, test_Vs, 's1', 'a0', 0.9), 3.95)
Value Iteration#
Уравнения Бэллмана#
Оптимальная политика
Введем понятие оптимальной политики \(\pi_*\) - такой политики, что \(\forall \pi, \forall s\in \mathcal{S}: \mathcal{V}_{\pi_*} (s) \ge \mathcal{V}_\pi\)(s)
Для оптимальной политики выполняется:
, так как принимаемое действие всегда максимизирует матожидание кумулятивной дисконтированной награды. То есть политику \(\pi_*\) можно записать как:
Если бы нам были известны \(\mathcal{V}_{\pi_*}(s)\) для оптимальной политики, то мы могли бы легко восстановить оптимальную политику. Но они нам неизвестны.
Мы должны их выучить.
Алгоритм#
Оператор Бэллмана
Пусть у нас есть некоторые предполагаемые значения \(\mathcal{V}_{\pi_*} = V(s) \to \mathcal{R}\). Введем на них оператор $\([\tau V](s) = \max_a \mathbb{E}_{s'|s,a}[r(s, a, s') + \gamma V(s')]\)$
, то есть мы записываем в \(V(s)\) новое предполагаемое значение, равное ожиданию \(V\), совершая действие, основываясь на предполагаемых значениях \(V\).
У этого отображения есть два полезных свойства:
\(\tau \mathcal{V}_{\pi_*} = \mathcal{V}_{\pi_*}\), как мы показали выше
\(\tau\) - сжимающее отображение, по метрике \(d(v, u) = \max_s|v(s) - u(s)|\)
Значит \(\mathcal{V}_{\pi_*}\) - стационарная точка сжимающего отображения. Может быть получена методом итерации.
Эту операцию можно записать как:
Имплементация алгоритма#
# Напишите код, вычисляющий [\tau V](s) с помощью среды, V, s и gamma
# Используйте mdp.get_possible_actions(state) для нахождения множества s' и уже написанную вами функцию get_action_value для вычисления Q
def get_new_state_value(mdp, state_values, state, gamma):
""" Computes next V(s) as in formula above. Please do not change state_values in process. """
if mdp.is_terminal(state):
return 0
action_list = mdp.get_possible_actions(state)
return max([get_action_value(mdp, state_values, state, action, gamma) for action in action_list])
test_Vs_copy = dict(test_Vs)
assert np.isclose(get_new_state_value(mdp, test_Vs, 's0', 0.9), 1.8)
assert np.isclose(get_new_state_value(mdp, test_Vs, 's2', 0.9), 1.08)
assert np.isclose(get_new_state_value(mdp, {'s0': -1e10, 's1': 0, 's2': -2e10}, 's0', 0.9), -13500000000.0), \
"Please ensure that you handle negative Q-values of arbitrary magnitude correctly"
assert test_Vs == test_Vs_copy, "Please do not change state_values in get_new_state_value"
Решение простой задачи#
Теперь соберем готовый итерационный алгоритм
# parameters
gamma = 0.9 # discount for MDP
num_iter = 100 # maximum iterations, excluding initialization
# stop VI if new values are this close to old values (or closer)
min_difference = 0.001
# initialize V(s)
state_values = {s: 0 for s in mdp.get_all_states()}
if has_graphviz:
display(plot_graph_with_state_values(mdp, state_values))
for i in range(num_iter):
# Compute new state values using the functions you defined above.
# It must be a dict {state : float V_new(state)}
new_state_values = {s: get_new_state_value(mdp, state_values, s, gamma) for s in mdp.get_all_states() }
assert isinstance(new_state_values, dict)
# Compute difference
diff = max(abs(new_state_values[s] - state_values[s])
for s in mdp.get_all_states())
print("iter %4i | diff: %6.5f | " % (i, diff), end="")
print(' '.join("V(%s) = %.3f" % (s, v) for s, v in state_values.items()))
state_values = new_state_values
if diff < min_difference:
print("Terminated")
break
iter 0 | diff: 3.50000 | V(s0) = 0.000 V(s1) = 0.000 V(s2) = 0.000
iter 1 | diff: 0.64500 | V(s0) = 0.000 V(s1) = 3.500 V(s2) = 0.000
iter 2 | diff: 0.58050 | V(s0) = 0.000 V(s1) = 3.815 V(s2) = 0.645
iter 3 | diff: 0.43582 | V(s0) = 0.581 V(s1) = 3.959 V(s2) = 0.962
iter 4 | diff: 0.30634 | V(s0) = 0.866 V(s1) = 4.395 V(s2) = 1.272
iter 5 | diff: 0.27571 | V(s0) = 1.145 V(s1) = 4.670 V(s2) = 1.579
iter 6 | diff: 0.24347 | V(s0) = 1.421 V(s1) = 4.926 V(s2) = 1.838
iter 7 | diff: 0.21419 | V(s0) = 1.655 V(s1) = 5.169 V(s2) = 2.075
iter 8 | diff: 0.19277 | V(s0) = 1.868 V(s1) = 5.381 V(s2) = 2.290
iter 9 | diff: 0.17327 | V(s0) = 2.061 V(s1) = 5.573 V(s2) = 2.481
iter 10 | diff: 0.15569 | V(s0) = 2.233 V(s1) = 5.746 V(s2) = 2.654
iter 11 | diff: 0.14012 | V(s0) = 2.389 V(s1) = 5.902 V(s2) = 2.810
iter 12 | diff: 0.12610 | V(s0) = 2.529 V(s1) = 6.042 V(s2) = 2.950
iter 13 | diff: 0.11348 | V(s0) = 2.655 V(s1) = 6.168 V(s2) = 3.076
iter 14 | diff: 0.10213 | V(s0) = 2.769 V(s1) = 6.282 V(s2) = 3.190
iter 15 | diff: 0.09192 | V(s0) = 2.871 V(s1) = 6.384 V(s2) = 3.292
iter 16 | diff: 0.08272 | V(s0) = 2.963 V(s1) = 6.476 V(s2) = 3.384
iter 17 | diff: 0.07445 | V(s0) = 3.045 V(s1) = 6.558 V(s2) = 3.467
iter 18 | diff: 0.06701 | V(s0) = 3.120 V(s1) = 6.633 V(s2) = 3.541
iter 19 | diff: 0.06031 | V(s0) = 3.187 V(s1) = 6.700 V(s2) = 3.608
iter 20 | diff: 0.05428 | V(s0) = 3.247 V(s1) = 6.760 V(s2) = 3.668
iter 21 | diff: 0.04885 | V(s0) = 3.301 V(s1) = 6.814 V(s2) = 3.723
iter 22 | diff: 0.04396 | V(s0) = 3.350 V(s1) = 6.863 V(s2) = 3.771
iter 23 | diff: 0.03957 | V(s0) = 3.394 V(s1) = 6.907 V(s2) = 3.815
iter 24 | diff: 0.03561 | V(s0) = 3.434 V(s1) = 6.947 V(s2) = 3.855
iter 25 | diff: 0.03205 | V(s0) = 3.469 V(s1) = 6.982 V(s2) = 3.891
iter 26 | diff: 0.02884 | V(s0) = 3.502 V(s1) = 7.014 V(s2) = 3.923
iter 27 | diff: 0.02596 | V(s0) = 3.530 V(s1) = 7.043 V(s2) = 3.951
iter 28 | diff: 0.02336 | V(s0) = 3.556 V(s1) = 7.069 V(s2) = 3.977
iter 29 | diff: 0.02103 | V(s0) = 3.580 V(s1) = 7.093 V(s2) = 4.001
iter 30 | diff: 0.01892 | V(s0) = 3.601 V(s1) = 7.114 V(s2) = 4.022
iter 31 | diff: 0.01703 | V(s0) = 3.620 V(s1) = 7.133 V(s2) = 4.041
iter 32 | diff: 0.01533 | V(s0) = 3.637 V(s1) = 7.150 V(s2) = 4.058
iter 33 | diff: 0.01380 | V(s0) = 3.652 V(s1) = 7.165 V(s2) = 4.073
iter 34 | diff: 0.01242 | V(s0) = 3.666 V(s1) = 7.179 V(s2) = 4.087
iter 35 | diff: 0.01117 | V(s0) = 3.678 V(s1) = 7.191 V(s2) = 4.099
iter 36 | diff: 0.01006 | V(s0) = 3.689 V(s1) = 7.202 V(s2) = 4.110
iter 37 | diff: 0.00905 | V(s0) = 3.699 V(s1) = 7.212 V(s2) = 4.121
iter 38 | diff: 0.00815 | V(s0) = 3.708 V(s1) = 7.221 V(s2) = 4.130
iter 39 | diff: 0.00733 | V(s0) = 3.717 V(s1) = 7.230 V(s2) = 4.138
iter 40 | diff: 0.00660 | V(s0) = 3.724 V(s1) = 7.237 V(s2) = 4.145
iter 41 | diff: 0.00594 | V(s0) = 3.731 V(s1) = 7.244 V(s2) = 4.152
iter 42 | diff: 0.00534 | V(s0) = 3.736 V(s1) = 7.249 V(s2) = 4.158
iter 43 | diff: 0.00481 | V(s0) = 3.742 V(s1) = 7.255 V(s2) = 4.163
iter 44 | diff: 0.00433 | V(s0) = 3.747 V(s1) = 7.260 V(s2) = 4.168
iter 45 | diff: 0.00390 | V(s0) = 3.751 V(s1) = 7.264 V(s2) = 4.172
iter 46 | diff: 0.00351 | V(s0) = 3.755 V(s1) = 7.268 V(s2) = 4.176
iter 47 | diff: 0.00316 | V(s0) = 3.758 V(s1) = 7.271 V(s2) = 4.179
iter 48 | diff: 0.00284 | V(s0) = 3.762 V(s1) = 7.275 V(s2) = 4.183
iter 49 | diff: 0.00256 | V(s0) = 3.764 V(s1) = 7.277 V(s2) = 4.185
iter 50 | diff: 0.00230 | V(s0) = 3.767 V(s1) = 7.280 V(s2) = 4.188
iter 51 | diff: 0.00207 | V(s0) = 3.769 V(s1) = 7.282 V(s2) = 4.190
iter 52 | diff: 0.00186 | V(s0) = 3.771 V(s1) = 7.284 V(s2) = 4.192
iter 53 | diff: 0.00168 | V(s0) = 3.773 V(s1) = 7.286 V(s2) = 4.194
iter 54 | diff: 0.00151 | V(s0) = 3.775 V(s1) = 7.288 V(s2) = 4.196
iter 55 | diff: 0.00136 | V(s0) = 3.776 V(s1) = 7.289 V(s2) = 4.197
iter 56 | diff: 0.00122 | V(s0) = 3.778 V(s1) = 7.291 V(s2) = 4.199
iter 57 | diff: 0.00110 | V(s0) = 3.779 V(s1) = 7.292 V(s2) = 4.200
iter 58 | diff: 0.00099 | V(s0) = 3.780 V(s1) = 7.293 V(s2) = 4.201
Terminated
Визуализация результата#
display(plot_graph_with_state_values(mdp, state_values))
print("Final state values:", state_values)
assert abs(state_values['s0'] - 3.781) < 0.01
assert abs(state_values['s1'] - 7.294) < 0.01
assert abs(state_values['s2'] - 4.202) < 0.01
Final state values: {'s0': 3.7810348735476405, 's1': 7.294006423867229, 's2': 4.202140275227048}
Теперь используем наше приближение \(V_{\pi_*}(s)\) чтобы найти оптимальные действия:
# Напишите функцию, находящию оптимальное действие в состоянии state, используя state_values
def get_optimal_action(mdp, state_values, state, gamma=0.9):
""" Finds optimal action using formula above. """
if mdp.is_terminal(state):
return None
action_list = mdp.get_possible_actions(state)
max_action = None
q_max = -2e11
for action in action_list:
q = get_action_value(mdp, state_values, state, action, gamma)
if q > q_max:
q_max = q
max_action = action
return max_action
assert get_optimal_action(mdp, state_values, 's0', gamma) == 'a1'
assert get_optimal_action(mdp, state_values, 's1', gamma) == 'a0'
assert get_optimal_action(mdp, state_values, 's2', gamma) == 'a1'
assert get_optimal_action(mdp, {'s0': -1e10, 's1': 0, 's2': -2e10}, 's0', 0.9) == 'a0', \
"Please ensure that you handle negative Q-values of arbitrary magnitude correctly"
assert get_optimal_action(mdp, {'s0': -2e10, 's1': 0, 's2': -1e10}, 's0', 0.9) == 'a1', \
"Please ensure that you handle negative Q-values of arbitrary magnitude correctly"
display(plot_graph_optimal_strategy_and_state_values(mdp, state_values, get_action_value))
# Measure agent's average reward
s = mdp.reset()
rewards = []
for _ in range(10000):
s, r, done, _ = mdp.step(get_optimal_action(mdp, state_values, s, gamma))
rewards.append(r)
print("average reward: ", np.mean(rewards))
assert(0.40 < np.mean(rewards) < 0.55)
average reward: 0.4885