Основные определения. Value iteration#

Среда, действия, награды#

В этом разделе мы рассмотрим ключевые концепции, связанные с обучением с подкреплением (Reinforcement Learning, RL), такие как среда, действия и награды, а также основные типы сред и особенности наград.

В общем случае, среда может вести себя непредсказуемым образом, и отследить причинно-следственные связи в такой среде было бы проблематично, что значительно бы усложнило обучение (если мы не знаем, какое действие привело к хорошей награде, мы не можем понять, какие действия стоит совершать чаще).

Но мы можем сделать ряд предположений о среде, которые позволят нам эффективно ее моделировать.

Марковские среды#

На практике многие среды обладают свойством «Маркова»: следующее состояние среды зависит только от текущего состояния и действия, совершаемого в данный момент.

Рассмотрим сначала конечные, марковские среды. Они описываются:

  • Конечным множеством состояний \(s\in\mathcal{S}\)

  • Конечным множеством действий \(a\in\mathcal{A}\)

  • Переходными вероятностями \(P(s_{n+1}|s_n,a_n,s_{n-1},a_{n-1},...) = P(s_{n+1}|s_n,a_n)\), характеризующими вероятности оказаться в состоянии \(s_{n+1}\) при совершениии действия \(a_n\) в состоянии \(s_n\)

  • Функцией награды \(R(s, a, s') \to \mathbb{R}\), приписывающей действительную награду переходу из состояния \(s\) в состояние \(s'\) при совершении действия \(a\)

Зададим их явно для некоторой игрушечной среды

Hide code cell source
#@title Установка зависимостей и вспомогательные функции

#!pip install gym --quiet

# most of this code was politely stolen from https://github.com/berkeleydeeprlcourse/homework/
# all credit goes to https://github.com/abhishekunique
# (if I got the author right)
import numpy as np
from gym.utils import seeding

try:
    from graphviz import Digraph
    import graphviz
    has_graphviz = True
except ImportError:
    has_graphviz = False


class MDP:
    def __init__(self, transition_probs, rewards, initial_state=None, seed=None):
        """
        Defines an MDP. Compatible with gym Env.
        :param transition_probs: transition_probs[s][a][s_next] = P(s_next | s, a)
            A dict[state -> dict] of dicts[action -> dict] of dicts[next_state -> prob]
            For each state and action, probabilities of next states should sum to 1
            If a state has no actions available, it is considered terminal
        :param rewards: rewards[s][a][s_next] = r(s,a,s')
            A dict[state -> dict] of dicts[action -> dict] of dicts[next_state -> reward]
            The reward for anything not mentioned here is zero.
        :param get_initial_state: a state where agent starts or a callable() -> state
            By default, picks initial state at random.

        States and actions can be anything you can use as dict keys, but we recommend that you use strings or integers

        Here's an example from MDP depicted on http://bit.ly/2jrNHNr
        transition_probs = {
            's0': {
                'a0': {'s0': 0.5, 's2': 0.5},
                'a1': {'s2': 1}
            },
            's1': {
                'a0': {'s0': 0.7, 's1': 0.1, 's2': 0.2},
                'a1': {'s1': 0.95, 's2': 0.05}
            },
            's2': {
                'a0': {'s0': 0.4, 's2': 0.6},
                'a1': {'s0': 0.3, 's1': 0.3, 's2': 0.4}
            }
        }
        rewards = {
            's1': {'a0': {'s0': +5}},
            's2': {'a1': {'s0': -1}}
        }
        """
        self._check_param_consistency(transition_probs, rewards)
        self._transition_probs = transition_probs
        self._rewards = rewards
        self._initial_state = initial_state
        self.n_states = len(transition_probs)
        self.reset()
        self.np_random, _ = seeding.np_random(seed)

    def get_all_states(self):
        """ return a tuple of all possiblestates """
        return tuple(self._transition_probs.keys())

    def get_possible_actions(self, state):
        """ return a tuple of possible actions in a given state """
        return tuple(self._transition_probs.get(state, {}).keys())

    def is_terminal(self, state):
        """ return True if state is terminal or False if it isn't """
        return len(self.get_possible_actions(state)) == 0

    def get_next_states(self, state, action):
        """ return a dictionary of {next_state1 : P(next_state1 | state, action), next_state2: ...} """
        assert action in self.get_possible_actions(state), "cannot do action %s from state %s" % (action, state)
        return self._transition_probs[state][action]

    def get_transition_prob(self, state, action, next_state):
        """ return P(next_state | state, action) """
        return self.get_next_states(state, action).get(next_state, 0.0)

    def get_reward(self, state, action, next_state):
        """ return the reward you get for taking action in state and landing on next_state"""
        assert action in self.get_possible_actions(state), "cannot do action %s from state %s" % (action, state)
        return self._rewards.get(state, {}).get(action, {}).get(next_state, 0.0)

    def reset(self):
        """ reset the game, return the initial state"""
        if self._initial_state is None:
            self._current_state = self.np_random.choice(
                tuple(self._transition_probs.keys()))
        elif self._initial_state in self._transition_probs:
            self._current_state = self._initial_state
        elif callable(self._initial_state):
            self._current_state = self._initial_state()
        else:
            raise ValueError(
                "initial state %s should be either a state or a function() -> state" % self._initial_state)
        return self._current_state

    def step(self, action):
        """ take action, return next_state, reward, is_done, empty_info """
        possible_states, probs = zip(*self.get_next_states(self._current_state, action).items())
        next_state = possible_states[self.np_random.choice(np.arange(len(possible_states)), p=probs)]
        reward = self.get_reward(self._current_state, action, next_state)
        is_done = self.is_terminal(next_state)
        self._current_state = next_state
        return next_state, reward, is_done, {}

    def render(self):
        print("Currently at %s" % self._current_state)

    def _check_param_consistency(self, transition_probs, rewards):
        for state in transition_probs:
            assert isinstance(transition_probs[state], dict), \
                "transition_probs for %s should be a dictionary but is instead %s" % (
                    state, type(transition_probs[state]))
            for action in transition_probs[state]:
                assert isinstance(transition_probs[state][action], dict), \
                    "transition_probs for %s, %s should be a a dictionary but is instead %s" % (
                        state, action, type(transition_probs[state][action]))
                next_state_probs = transition_probs[state][action]
                assert len(next_state_probs) != 0, "from state %s action %s leads to no next states" % (state, action)
                sum_probs = sum(next_state_probs.values())
                assert abs(sum_probs - 1) <= 1e-10, \
                    "next state probabilities for state %s action %s add up to %f (should be 1)" % (
                        state, action, sum_probs)
        for state in rewards:
            assert isinstance(rewards[state], dict), \
                "rewards for %s should be a dictionary but is instead %s" % (
                    state, type(rewards[state]))
            for action in rewards[state]:
                assert isinstance(rewards[state][action], dict), \
                    "rewards for %s, %s should be a a dictionary but is instead %s" % (
                        state, action, type(rewards[state][action]))
        msg = "The Enrichment Center once again reminds you that Android Hell is a real place where" \
              " you will be sent at the first sign of defiance."
        assert None not in transition_probs, "please do not use None as a state identifier. " + msg
        assert None not in rewards, "please do not use None as an action identifier. " + msg


class FrozenLakeEnv(MDP):
    """
    Winter is here. You and your friends were tossing around a frisbee at the park
    when you made a wild throw that left the frisbee out in the middle of the lake.
    The water is mostly frozen, but there are a few holes where the ice has melted.
    If you step into one of those holes, you'll fall into the freezing water.
    At this time, there's an international frisbee shortage, so it's absolutely imperative that
    you navigate across the lake and retrieve the disc.
    However, the ice is slippery, so you won't always move in the direction you intend.
    The surface is described using a grid like the following

        SFFF
        FHFH
        FFFH
        HFFG

    S : starting point, safe
    F : frozen surface, safe
    H : hole, fall to your doom
    G : goal, where the frisbee is located

    The episode ends when you reach the goal or fall in a hole.
    You receive a reward of 1 if you reach the goal, and zero otherwise.

    """

    MAPS = {
        "4x4": [
            "SFFF",
            "FHFH",
            "FFFH",
            "HFFG"
        ],
        "8x8": [
            "SFFFFFFF",
            "FFFFFFFF",
            "FFFHFFFF",
            "FFFFFHFF",
            "FFFHFFFF",
            "FHHFFFHF",
            "FHFFHFHF",
            "FFFHFFFG"
        ],
    }

    def __init__(self, desc=None, map_name="4x4", slip_chance=0.2, seed=None):
        if desc is None and map_name is None:
            raise ValueError('Must provide either desc or map_name')
        elif desc is None:
            desc = self.MAPS[map_name]
        assert ''.join(desc).count(
            'S') == 1, "this implementation supports having exactly one initial state"
        assert all(c in "SFHG" for c in
                   ''.join(desc)), "all cells must be either of S, F, H or G"

        self.desc = desc = np.asarray(list(map(list, desc)), dtype='str')
        self.lastaction = None

        nrow, ncol = desc.shape
        states = [(i, j) for i in range(nrow) for j in range(ncol)]
        actions = ["left", "down", "right", "up"]

        initial_state = states[np.array(desc == b'S').ravel().argmax()]

        def move(row, col, movement):
            if movement == 'left':
                col = max(col - 1, 0)
            elif movement == 'down':
                row = min(row + 1, nrow - 1)
            elif movement == 'right':
                col = min(col + 1, ncol - 1)
            elif movement == 'up':
                row = max(row - 1, 0)
            else:
                raise ("invalid action")
            return (row, col)

        transition_probs = {s: {} for s in states}
        rewards = {s: {} for s in states}
        for (row, col) in states:
            if desc[row, col] in "GH":
                continue
            for action_i in range(len(actions)):
                action = actions[action_i]
                transition_probs[(row, col)][action] = {}
                rewards[(row, col)][action] = {}
                for movement_i in [(action_i - 1) % len(actions), action_i,
                                   (action_i + 1) % len(actions)]:
                    movement = actions[movement_i]
                    newrow, newcol = move(row, col, movement)
                    prob = (1. - slip_chance) if movement == action else (
                        slip_chance / 2.)
                    if prob == 0:
                        continue
                    if (newrow, newcol) not in transition_probs[row, col][
                            action]:
                        transition_probs[row, col][action][
                            newrow, newcol] = prob
                    else:
                        transition_probs[row, col][action][
                            newrow, newcol] += prob
                    if desc[newrow, newcol] == 'G':
                        rewards[row, col][action][newrow, newcol] = 1.0

        MDP.__init__(self, transition_probs, rewards, initial_state, seed)

    def render(self):
        desc_copy = np.copy(self.desc)
        desc_copy[self._current_state] = '*'
        print('\n'.join(map(''.join, desc_copy)), end='\n\n')


def plot_graph(mdp, graph_size='20,20', s_node_size='1,5',
               a_node_size='0,5', rankdir='LR', ):
    """
    Function for pretty drawing MDP graph with graphviz library.
    Requirements:
    graphviz : https://www.graphviz.org/
    for ubuntu users: sudo apt-get install graphviz
    python library for graphviz
    for pip users: pip install graphviz
    :param mdp:
    :param graph_size: size of graph plot
    :param s_node_size: size of state nodes
    :param a_node_size: size of action nodes
    :param rankdir: order for drawing
    :return: dot object
    """
    s_node_attrs = {'shape': 'doublecircle',
                    'color': '#85ff75',
                    'style': 'filled',
                    'width': str(s_node_size),
                    'height': str(s_node_size),
                    'fontname': 'Arial',
                    'fontsize': '24'}

    a_node_attrs = {'shape': 'circle',
                    'color': 'lightpink',
                    'style': 'filled',
                    'width': str(a_node_size),
                    'height': str(a_node_size),
                    'fontname': 'Arial',
                    'fontsize': '20'}

    s_a_edge_attrs = {'style': 'bold',
                      'color': 'red',
                      'ratio': 'auto'}

    a_s_edge_attrs = {'style': 'dashed',
                      'color': 'blue',
                      'ratio': 'auto',
                      'fontname': 'Arial',
                      'fontsize': '16'}

    graph = Digraph(name='MDP')
    graph.attr(rankdir=rankdir, size=graph_size)
    for state_node in mdp._transition_probs:
        graph.node(state_node, **s_node_attrs)

        for posible_action in mdp.get_possible_actions(state_node):
            action_node = state_node + "-" + posible_action
            graph.node(action_node,
                       label=str(posible_action),
                       **a_node_attrs)
            graph.edge(state_node, state_node + "-" +
                       posible_action, **s_a_edge_attrs)

            for posible_next_state in mdp.get_next_states(state_node,
                                                          posible_action):
                probability = mdp.get_transition_prob(
                    state_node, posible_action, posible_next_state)
                reward = mdp.get_reward(
                    state_node, posible_action, posible_next_state)

                if reward != 0:
                    label_a_s_edge = 'p = ' + str(probability) + \
                                     '  ' + 'reward =' + str(reward)
                else:
                    label_a_s_edge = 'p = ' + str(probability)

                graph.edge(action_node, posible_next_state,
                           label=label_a_s_edge, **a_s_edge_attrs)
    return graph


def plot_graph_with_state_values(mdp, state_values):
    """ Plot graph with state values"""
    graph = plot_graph(mdp)
    for state_node in mdp._transition_probs:
        value = state_values[state_node]
        graph.node(state_node, label=str(state_node) + '\n' + 'V =' + str(value)[:4])
    return graph


def get_optimal_action_for_plot(mdp, state_values, state, get_action_value, gamma=0.9):
    """ Finds optimal action using formula above. """
    if mdp.is_terminal(state):
        return None
    next_actions = mdp.get_possible_actions(state)
    q_values = [get_action_value(mdp, state_values, state, action, gamma) for action in next_actions]
    optimal_action = next_actions[np.argmax(q_values)]
    return optimal_action


def plot_graph_optimal_strategy_and_state_values(mdp, state_values, get_action_value, gamma=0.9):
    """ Plot graph with state values and """
    graph = plot_graph(mdp)
    opt_s_a_edge_attrs = {'style': 'bold',
                          'color': 'green',
                          'ratio': 'auto',
                          'penwidth': '6'}

    for state_node in mdp._transition_probs:
        value = state_values[state_node]
        graph.node(state_node, label=str(state_node) + '\n' + 'V =' + str(value)[:4])
        for action in mdp.get_possible_actions(state_node):
            if action == get_optimal_action_for_plot(mdp, state_values, state_node, get_action_value, gamma):
                graph.edge(state_node, state_node + "-" + action, **opt_s_a_edge_attrs)
    return graph
# Множество состояний:
#   Три состояния
states = ['s0', 's1', 's2']

# Вероятности переходов:
#   transition_probs[s][a][s'] задает P(s'|s,a)
transition_probs = {
    's0': {
        'a0': {'s0': 0.5, 's2': 0.5},
        'a1': {'s2': 1}
    },
    's1': {
        'a0': {'s0': 0.7, 's1': 0.1, 's2': 0.2},
        'a1': {'s1': 0.95, 's2': 0.05}
    },
    's2': {
        'a0': {'s0': 0.4, 's2': 0.6},
        'a1': {'s0': 0.3, 's1': 0.3, 's2': 0.4}
    }
}


# Награды:
#   rewards[s][a][s'] задает награду R(s, a, s')
rewards = {
    's1': {'a0': {'s0': +5}},
    's2': {'a1': {'s0': -1}}
}

# Обертка для упрощения работы с этими словарями (смотрите, какие есть методы)
mdp = MDP(transition_probs, rewards, initial_state='s0')
from IPython.display import display

display(plot_graph(mdp))
../../_images/3999c8c95a3984054c40ca95e06d9b775d10661c9981fc037ea585bb2abfd294.svg

Принятие решений#

Политикой (policy) называется функция \(\pi(s)\to a\), которая каждому состоянию среды \(s\in\mathcal{S}\) сопостовляет действие \(a\in \mathcal{A}\). В этом семинаре мы ограничимся рассмотрением детерминированных политик.

В общем случае, решение MDP сводится к нахождению политики, максимизирующей нагрду в среде. Подробнее об этом далее.

Награда#

Reward hypothesis (R.Sutton)

Goals and purposes can be thought of as the maximization of the expected value of the cumulative sum of a received scalar signal

Кумулятивная награда (cumulative reward, return) в момент \(t\) записывается как сумма наград, полученных после \(t\):

\[G_t \overset{\Delta}{=} R_t + R_{t+1} + R_{t+2} + \dots + R_T\]

, где \(T\) - время окончания сессии взаимодействия со средой.

Но есть две причины, почему мы не можем оптимизировать непосредственно \(G\):

  • Если взаимодействие со средой идет бесконечно (\(T=\infty\)), то \(G\) - расходящийся ряд

  • Награда в момент \(t\) одинаково влияет на \(G_t\) и \(G_{t-100}\). Поэтому из большого значения \(G_t\) не следует, что действия принимаемые в момент \(t\) были оптимальны.

Решение: дисконтированная награда

\[G_t \overset{\Delta}{=} R_t + {\color{red} \gamma} R_{t+1} + {\color{red} \gamma^2} R_{t+2} + \dots + {\color{red} \gamma^{T-t}}R_T\]

Плюсы:

  • Ряд сходится абсолютно

  • Ближайшие награды входят с бо’льшим весом

  • (Важно!) \(G_t = R_t + \gamma G_{t+1}\)

В дальнейшем, мы будет заниматься именно оптимизацией кумулятивной дисконтированной награды.

Стохастичность

Так как среда недетерминированная, одним и тем же действиям будут соответствовать различные состояния и награды, а значит \(G\) - случайная величина.

Возьмем от нее матожидание: $\( \mathbb{E}_{\pi}G = \mathbb{E}_{s_1|s_0,a_0=\pi(s_0)}[R_0 + \mathbb{E}_{s_2|s_1,a_1=\pi(s_1)}[\gamma R_1 + \dots]] \)$

Видим, что помимо свойств среды и политики, \(\mathbb{E}_{\pi}G\) зависит только от начального состояния. Введем функцию $\( \mathcal{V}_\pi(s) \overset{\Delta}{=} \mathbb{E}_{\pi}[G|s_0=s] = \sum\limits_{s'}p(s'|s,a=\pi(s))[r(s, a=\pi(s), s') + \gamma \mathbb{E}_{\pi, s_0 = s'}G] = \sum\limits_{ s'}p(s'|s,a=\pi(s))[r(s, a=\pi(s), s') + \gamma \mathcal{V}_\pi(s')] \)\( , где \)p$ - вероятности перехода для среды

\(\mathcal{V}_\pi(s)\) - матожидание кумулятивной дисконтированной награды для агента с политикой \(\pi\), находящегося в состоянии \(s\)

Введем дополнительную функцию $\( q_\pi(s, a) = \mathbb{E}_{\pi}[G|s_0=s,a_0=a] = \sum\limits_{s'}p(s'|s,a)[r(s, a, s') + \gamma \mathcal{V}_\pi(s')] \)$

\(q_\pi(s, a)\) - матожидание кумулятивной дисконтированной награды для агента, находящегося в состоянии \(s\), соверщающего действие \(a\), а потом действующего по политике \(\pi\)

# Напишите код, вычисляющий Q по V и среде
def get_action_value(mdp, state_values, state, action, gamma):
    """ Computes Q(s,a) as in formula above
        :param mdp: Среда
            Используйте методы mdp.get_next_states(state, action)
                             и mdp.get_reward(state, action, next_state)
        :param state_values: V
            dict[state -> float]
        :param action: a
        :param gamma: gamma
    """
    Q=0
    s_dict = mdp.get_next_states(state, action)
    s_new_list = s_dict.keys()
    for s in s_new_list:
      Q += s_dict[s] * (mdp.get_reward(state, action, s) + gamma * state_values[s])


    return Q
import numpy as np
test_Vs = {s: i for i, s in enumerate(sorted(mdp.get_all_states()))}
assert np.isclose(get_action_value(mdp, test_Vs, 's2', 'a1', 0.9), 0.69)
assert np.isclose(get_action_value(mdp, test_Vs, 's1', 'a0', 0.9), 3.95)

Value Iteration#

Уравнения Бэллмана#

Оптимальная политика

Введем понятие оптимальной политики \(\pi_*\) - такой политики, что \(\forall \pi, \forall s\in \mathcal{S}: \mathcal{V}_{\pi_*} (s) \ge \mathcal{V}_\pi\)(s)

Для оптимальной политики выполняется:

\[ \mathcal{V}_{\pi_*} (s) = max_{a\in \mathcal{A}} [q_{\pi_*}(s, a)] \]

, так как принимаемое действие всегда максимизирует матожидание кумулятивной дисконтированной награды. То есть политику \(\pi_*\) можно записать как:

\[ \pi_*(s) = argmax_{a\in \mathcal{A}} [q_{\pi_*}(s, a)] = argmax_{a\in \mathcal{A}}[\sum\limits_{s'}p(s'|s,a)[r(s, a, s') + \gamma \mathcal{V}_\pi(s')]] = argmax_{a\in \mathcal{A}}[q_{\pi_*}(s, a)] \]

Если бы нам были известны \(\mathcal{V}_{\pi_*}(s)\) для оптимальной политики, то мы могли бы легко восстановить оптимальную политику. Но они нам неизвестны.

Мы должны их выучить.

Алгоритм#

Оператор Бэллмана

Пусть у нас есть некоторые предполагаемые значения \(\mathcal{V}_{\pi_*} = V(s) \to \mathcal{R}\). Введем на них оператор $\([\tau V](s) = \max_a \mathbb{E}_{s'|s,a}[r(s, a, s') + \gamma V(s')]\)$

, то есть мы записываем в \(V(s)\) новое предполагаемое значение, равное ожиданию \(V\), совершая действие, основываясь на предполагаемых значениях \(V\).

У этого отображения есть два полезных свойства:

  • \(\tau \mathcal{V}_{\pi_*} = \mathcal{V}_{\pi_*}\), как мы показали выше

  • \(\tau\) - сжимающее отображение, по метрике \(d(v, u) = \max_s|v(s) - u(s)|\)

Значит \(\mathcal{V}_{\pi_*}\) - стационарная точка сжимающего отображения. Может быть получена методом итерации.

Эту операцию можно записать как:

\[V_{(i+1)}(s) = \max_a \sum_{s'} P(s' | s,a) \cdot [ r(s,a,s') + \gamma V_{i}(s')] = \max_a Q_i(s,a)\]

Имплементация алгоритма#

# Напишите код, вычисляющий [\tau V](s) с помощью среды, V, s и gamma
# Используйте mdp.get_possible_actions(state) для нахождения множества s' и уже написанную вами функцию get_action_value для вычисления Q

def get_new_state_value(mdp, state_values, state, gamma):
    """ Computes next V(s) as in formula above. Please do not change state_values in process. """
    if mdp.is_terminal(state):
        return 0

    action_list = mdp.get_possible_actions(state)

    return max([get_action_value(mdp, state_values, state, action, gamma) for action in action_list])
test_Vs_copy = dict(test_Vs)
assert np.isclose(get_new_state_value(mdp, test_Vs, 's0', 0.9), 1.8)
assert np.isclose(get_new_state_value(mdp, test_Vs, 's2', 0.9), 1.08)
assert np.isclose(get_new_state_value(mdp, {'s0': -1e10, 's1': 0, 's2': -2e10}, 's0', 0.9), -13500000000.0), \
    "Please ensure that you handle negative Q-values of arbitrary magnitude correctly"
assert test_Vs == test_Vs_copy, "Please do not change state_values in get_new_state_value"

Решение простой задачи#

Теперь соберем готовый итерационный алгоритм

# parameters
gamma = 0.9            # discount for MDP
num_iter = 100         # maximum iterations, excluding initialization
# stop VI if new values are this close to old values (or closer)
min_difference = 0.001

# initialize V(s)
state_values = {s: 0 for s in mdp.get_all_states()}

if has_graphviz:
    display(plot_graph_with_state_values(mdp, state_values))

for i in range(num_iter):

    # Compute new state values using the functions you defined above.
    # It must be a dict {state : float V_new(state)}
    new_state_values = {s: get_new_state_value(mdp, state_values, s, gamma) for s in mdp.get_all_states() }

    assert isinstance(new_state_values, dict)

    # Compute difference
    diff = max(abs(new_state_values[s] - state_values[s])
               for s in mdp.get_all_states())
    print("iter %4i   |   diff: %6.5f   |   " % (i, diff), end="")
    print('   '.join("V(%s) = %.3f" % (s, v) for s, v in state_values.items()))
    state_values = new_state_values

    if diff < min_difference:
        print("Terminated")
        break
../../_images/96e79352894ae5f106921d26a51b7a8385a34ebb151b4c645747312f83753ffa.svg
iter    0   |   diff: 3.50000   |   V(s0) = 0.000   V(s1) = 0.000   V(s2) = 0.000
iter    1   |   diff: 0.64500   |   V(s0) = 0.000   V(s1) = 3.500   V(s2) = 0.000
iter    2   |   diff: 0.58050   |   V(s0) = 0.000   V(s1) = 3.815   V(s2) = 0.645
iter    3   |   diff: 0.43582   |   V(s0) = 0.581   V(s1) = 3.959   V(s2) = 0.962
iter    4   |   diff: 0.30634   |   V(s0) = 0.866   V(s1) = 4.395   V(s2) = 1.272
iter    5   |   diff: 0.27571   |   V(s0) = 1.145   V(s1) = 4.670   V(s2) = 1.579
iter    6   |   diff: 0.24347   |   V(s0) = 1.421   V(s1) = 4.926   V(s2) = 1.838
iter    7   |   diff: 0.21419   |   V(s0) = 1.655   V(s1) = 5.169   V(s2) = 2.075
iter    8   |   diff: 0.19277   |   V(s0) = 1.868   V(s1) = 5.381   V(s2) = 2.290
iter    9   |   diff: 0.17327   |   V(s0) = 2.061   V(s1) = 5.573   V(s2) = 2.481
iter   10   |   diff: 0.15569   |   V(s0) = 2.233   V(s1) = 5.746   V(s2) = 2.654
iter   11   |   diff: 0.14012   |   V(s0) = 2.389   V(s1) = 5.902   V(s2) = 2.810
iter   12   |   diff: 0.12610   |   V(s0) = 2.529   V(s1) = 6.042   V(s2) = 2.950
iter   13   |   diff: 0.11348   |   V(s0) = 2.655   V(s1) = 6.168   V(s2) = 3.076
iter   14   |   diff: 0.10213   |   V(s0) = 2.769   V(s1) = 6.282   V(s2) = 3.190
iter   15   |   diff: 0.09192   |   V(s0) = 2.871   V(s1) = 6.384   V(s2) = 3.292
iter   16   |   diff: 0.08272   |   V(s0) = 2.963   V(s1) = 6.476   V(s2) = 3.384
iter   17   |   diff: 0.07445   |   V(s0) = 3.045   V(s1) = 6.558   V(s2) = 3.467
iter   18   |   diff: 0.06701   |   V(s0) = 3.120   V(s1) = 6.633   V(s2) = 3.541
iter   19   |   diff: 0.06031   |   V(s0) = 3.187   V(s1) = 6.700   V(s2) = 3.608
iter   20   |   diff: 0.05428   |   V(s0) = 3.247   V(s1) = 6.760   V(s2) = 3.668
iter   21   |   diff: 0.04885   |   V(s0) = 3.301   V(s1) = 6.814   V(s2) = 3.723
iter   22   |   diff: 0.04396   |   V(s0) = 3.350   V(s1) = 6.863   V(s2) = 3.771
iter   23   |   diff: 0.03957   |   V(s0) = 3.394   V(s1) = 6.907   V(s2) = 3.815
iter   24   |   diff: 0.03561   |   V(s0) = 3.434   V(s1) = 6.947   V(s2) = 3.855
iter   25   |   diff: 0.03205   |   V(s0) = 3.469   V(s1) = 6.982   V(s2) = 3.891
iter   26   |   diff: 0.02884   |   V(s0) = 3.502   V(s1) = 7.014   V(s2) = 3.923
iter   27   |   diff: 0.02596   |   V(s0) = 3.530   V(s1) = 7.043   V(s2) = 3.951
iter   28   |   diff: 0.02336   |   V(s0) = 3.556   V(s1) = 7.069   V(s2) = 3.977
iter   29   |   diff: 0.02103   |   V(s0) = 3.580   V(s1) = 7.093   V(s2) = 4.001
iter   30   |   diff: 0.01892   |   V(s0) = 3.601   V(s1) = 7.114   V(s2) = 4.022
iter   31   |   diff: 0.01703   |   V(s0) = 3.620   V(s1) = 7.133   V(s2) = 4.041
iter   32   |   diff: 0.01533   |   V(s0) = 3.637   V(s1) = 7.150   V(s2) = 4.058
iter   33   |   diff: 0.01380   |   V(s0) = 3.652   V(s1) = 7.165   V(s2) = 4.073
iter   34   |   diff: 0.01242   |   V(s0) = 3.666   V(s1) = 7.179   V(s2) = 4.087
iter   35   |   diff: 0.01117   |   V(s0) = 3.678   V(s1) = 7.191   V(s2) = 4.099
iter   36   |   diff: 0.01006   |   V(s0) = 3.689   V(s1) = 7.202   V(s2) = 4.110
iter   37   |   diff: 0.00905   |   V(s0) = 3.699   V(s1) = 7.212   V(s2) = 4.121
iter   38   |   diff: 0.00815   |   V(s0) = 3.708   V(s1) = 7.221   V(s2) = 4.130
iter   39   |   diff: 0.00733   |   V(s0) = 3.717   V(s1) = 7.230   V(s2) = 4.138
iter   40   |   diff: 0.00660   |   V(s0) = 3.724   V(s1) = 7.237   V(s2) = 4.145
iter   41   |   diff: 0.00594   |   V(s0) = 3.731   V(s1) = 7.244   V(s2) = 4.152
iter   42   |   diff: 0.00534   |   V(s0) = 3.736   V(s1) = 7.249   V(s2) = 4.158
iter   43   |   diff: 0.00481   |   V(s0) = 3.742   V(s1) = 7.255   V(s2) = 4.163
iter   44   |   diff: 0.00433   |   V(s0) = 3.747   V(s1) = 7.260   V(s2) = 4.168
iter   45   |   diff: 0.00390   |   V(s0) = 3.751   V(s1) = 7.264   V(s2) = 4.172
iter   46   |   diff: 0.00351   |   V(s0) = 3.755   V(s1) = 7.268   V(s2) = 4.176
iter   47   |   diff: 0.00316   |   V(s0) = 3.758   V(s1) = 7.271   V(s2) = 4.179
iter   48   |   diff: 0.00284   |   V(s0) = 3.762   V(s1) = 7.275   V(s2) = 4.183
iter   49   |   diff: 0.00256   |   V(s0) = 3.764   V(s1) = 7.277   V(s2) = 4.185
iter   50   |   diff: 0.00230   |   V(s0) = 3.767   V(s1) = 7.280   V(s2) = 4.188
iter   51   |   diff: 0.00207   |   V(s0) = 3.769   V(s1) = 7.282   V(s2) = 4.190
iter   52   |   diff: 0.00186   |   V(s0) = 3.771   V(s1) = 7.284   V(s2) = 4.192
iter   53   |   diff: 0.00168   |   V(s0) = 3.773   V(s1) = 7.286   V(s2) = 4.194
iter   54   |   diff: 0.00151   |   V(s0) = 3.775   V(s1) = 7.288   V(s2) = 4.196
iter   55   |   diff: 0.00136   |   V(s0) = 3.776   V(s1) = 7.289   V(s2) = 4.197
iter   56   |   diff: 0.00122   |   V(s0) = 3.778   V(s1) = 7.291   V(s2) = 4.199
iter   57   |   diff: 0.00110   |   V(s0) = 3.779   V(s1) = 7.292   V(s2) = 4.200
iter   58   |   diff: 0.00099   |   V(s0) = 3.780   V(s1) = 7.293   V(s2) = 4.201
Terminated

Визуализация результата#

display(plot_graph_with_state_values(mdp, state_values))
../../_images/97a165571631178d8290ee5295dc2e0fcc70feac7d246d68e26296a398d46e1a.svg
print("Final state values:", state_values)

assert abs(state_values['s0'] - 3.781) < 0.01
assert abs(state_values['s1'] - 7.294) < 0.01
assert abs(state_values['s2'] - 4.202) < 0.01
Final state values: {'s0': 3.7810348735476405, 's1': 7.294006423867229, 's2': 4.202140275227048}

Теперь используем наше приближение \(V_{\pi_*}(s)\) чтобы найти оптимальные действия:

\[ \pi_*(s) = argmax_{a\in \mathcal{A}} [q_{\pi_*}(s, a)] = argmax_{a\in \mathcal{A}}[\sum\limits_{r, s'}p(r,s'|s,a)[r + \gamma \mathcal{V}_\pi(s')]] = argmax_{a\in \mathcal{A}}[q_{\pi_*}(s, a)] \]
# Напишите функцию, находящию оптимальное действие в состоянии state, используя state_values

def get_optimal_action(mdp, state_values, state, gamma=0.9):
    """ Finds optimal action using formula above. """
    if mdp.is_terminal(state):
        return None

    action_list = mdp.get_possible_actions(state)
    max_action = None
    q_max = -2e11
    for action in action_list:
        q = get_action_value(mdp, state_values, state, action, gamma)
        if q > q_max:
          q_max = q
          max_action = action

    return max_action
assert get_optimal_action(mdp, state_values, 's0', gamma) == 'a1'
assert get_optimal_action(mdp, state_values, 's1', gamma) == 'a0'
assert get_optimal_action(mdp, state_values, 's2', gamma) == 'a1'

assert get_optimal_action(mdp, {'s0': -1e10, 's1': 0, 's2': -2e10}, 's0', 0.9) == 'a0', \
    "Please ensure that you handle negative Q-values of arbitrary magnitude correctly"
assert get_optimal_action(mdp, {'s0': -2e10, 's1': 0, 's2': -1e10}, 's0', 0.9) == 'a1', \
    "Please ensure that you handle negative Q-values of arbitrary magnitude correctly"
display(plot_graph_optimal_strategy_and_state_values(mdp, state_values, get_action_value))
../../_images/6718f177453d709bcf965704f325921a3588f8d8ff1dbc6ddd4a7816a899caaa.svg
# Measure agent's average reward

s = mdp.reset()
rewards = []
for _ in range(10000):
    s, r, done, _ = mdp.step(get_optimal_action(mdp, state_values, s, gamma))
    rewards.append(r)

print("average reward: ", np.mean(rewards))

assert(0.40 < np.mean(rewards) < 0.55)
average reward:  0.4885