Kei Minagawa's Blog

皆川圭(@keimina)のブログ、Pythonで試したことを書いていきます

gymモジュール Pendulum-v0 で、振り子を頂上で止まらせる

gymモジュール Pendulum-v0 で、振り子を頂上で止まらせる

今回は gym モジュールの Pendulum-v0 の環境において、強化学習により振り子を頂上でとまらせます。

1 前回の記事

前回の記事は以下の通りです。本記事を読むにあたり参考になるかもしれません。

1.1 gymモジュール Pendulum-v0 で振り子を等速度で回転させる

1.2 gymモジュール Pendulum-v0 で振り子を揺らす

1.3 gymモジュール Pendulum-v0 で振り子をトルクを与えて静止させる

2 勉強会で発表したスライド

勉強会でも発表しています。資料は以下におきました。

speakerdeck.com

3 今回行うこと

強化学習により振り子を頂上でとまらせます。

4 解決方法

今回は、強化学習の枠組みで、振り子が「頂上で止まる」ようにします。本章では、実装したコードの部分的な説明も行います。コード全体は コード全体 を参照してください。

4.1 報酬の定義

ここでは、報酬を \(\cos{\theta}\) のように定義します。頂上に近ければ高い報酬を、遠ければ低い報酬を与えて強化学習を行えば頂上で止まるようにしています。また、運動エネルギーが少ないほど高い報酬を与えるよう、\(\dot \theta^2\)で割り算しています。理論上は \(\cos{\theta}\) でも良いと思いますが、何度か試したところ、\(\cos{\theta}\)を3乗し、さらに運動エネルギーを考慮した報酬の式の方が頂上でとまることが多かったです。これが効率良い報酬の式だとは思いませんし、物理学的に頂上でとまるのに適した報酬の式があると思います。

表1: 報酬の定義
報酬の定義
\(r=\frac{\cos{\theta}^3}{\dot{\theta}^2}\)


表2: 振り子の位置と角度
振り子の位置 \(\theta\) \(\cos{\theta}\)
一番上 \(0^\circ\) 1
中間 \(90^\circ\) 0
一番下 \(180^\circ\) -1

4.2 状態遷移確率、方策の表の作成

状態遷移確率、方策の表の作成について説明します。

4.2.1 表の作成

  1. 理論

    状態遷移確率を求めます。振り子の状態\(s\)に行動としてトルク τ を与えると状態が\(s'\)になります。今回は以下のような表形式で、状態\(s\)の時にトルク τ を与えた時の次の状態\(s'\)と\(r\)(報酬)を表すこととします。

    表3: 状態、トルク、次の状態、報酬
    \(s\) \(a\) \(r\) \(s'\)
    (\(\theta\), \(\dot \theta\)) τ \(\frac{\cos{\theta}^3}{\dot{\theta}^2}\) (\(\theta'\), \(\dot \theta'\))

    パラメータのとりうる値の範囲は以下になります。

    表4: パラメータのとりうる値の範囲
    \(\theta\) \(\dot \theta\) τ \(r(=\frac{\cos{\theta}^3}{\dot{\theta}^2})\) \(\theta'\) \(\dot \theta'\)
    \([0^\circ,360^\circ)\) \([-8, 8]\) \([-2, 2]\) \([-\infty, \infty]\) \([0^\circ, 360^\circ°)\) \([-8, 8]\)

    \(\dot \theta\) や τ のパラメータの上限、下限値は、 gym モジュールの Pendulum-v0 の環境でデフォルトで設定されている値です。これらのパラメータのとりうる値の範囲の詳細については以下を参照してください。

    github.com

    報酬 \(r(=\frac{\cos{\theta}^3}{\dot{\theta}^2})\)については、\(\dot \theta\) が 0 を取りうるため、計算すると \(\infty\) になる可能性があります。実際のプログラムでは \(\infty\) にならないように微小な値を分母に加算しています。

    上記をもとに\(s\) , \(a\) , \(r\) , \(s'\)の表(状態遷移確率)を作成します。パラメータの値は連続の値ですが、今回はこれを扱い易くするため離散化します。

    振り子の運動は物理的な現象ですので初期状態が同じなら必ず同じ結果になります。(\(\theta\), \(\dot \theta\), τ ) で行動すれば確率100%で一つの状態(\(\theta'\), \(\dot \theta'\))に遷移します。しかし、今回は離散化するため、そうならない時もありますので注意が必要です。

  2. 実装

    以下のコードにより、状態遷移確率、方策の表を作成します。

    • コード
    import numpy as np
    import pandas as pd
    
    ############################
    # パラメータの表を作成する #
    ############################
    from itertools import product
    
    class N():
        theta = 2
        theta_dot = 2
        tau = 2
        actions = tau
    
    ##################################
    # 表に使用するインデックスの定義 #
    ##################################
    # theta, tau, theta_dot の範囲を定義
    bin_theta = np.linspace(0, 360, N.theta + 1)
    bin_theta_dot = np.linspace(-8, 8, N.theta_dot + 1)
    bin_tau = np.linspace(-2, 2, N.tau + 1)
    # 角度、速度、トルク を離散化(pd.IntervalIndex.from_breaks を使用)
    theta_index = pd.IntervalIndex.from_breaks(bin_theta)
    theta_dot_index = pd.IntervalIndex.from_breaks(bin_theta_dot)
    tau_index = pd.IntervalIndex.from_breaks(bin_tau)
    # インデックスの作成
    index_0 = pd.MultiIndex.from_product(
        [theta_index, theta_dot_index, tau_index],
        names=["theta_cat", "theta_dot_cat", "tau_cat"])
    index_1 = pd.MultiIndex.from_product([theta_index, theta_dot_index],
                                         names=["theta_2_cat", "theta_dot_2_cat"])
    
    ############
    # P の作成 #
    ############
    # 「角度、速度、トルク」 - 「次の角度、次の速度」の表を作る
    P = pd.DataFrame(0, index=index_0, columns=index_1)
    
    ##################
    # 方策の表を作成 #
    ##################
    policy = pd.Series(1 / N.actions, index=index_0)
    
    ########
    # 表示 #
    ########
    print("状態遷移確率の表")
    print(P)
    print("方策の表")
    print(policy)
    
  3. 表のサンプル

    コードを実行すると以下のような状態遷移確率の表(P) と 方策の表(policy) が得られます。実際に学習に使用する表は大きすぎるため、サイズを小さくして表示しています。

    • 状態遷移確率の表(P)
    表5: 状態遷移確率(P)
    \(\theta'\)     (0.0, 180.0]   (180.0, 360.0]  
    \(\dot \theta'\)     (-8.0, 0.0] (0.0, 8.0] (-8.0, 0.0] (0.0, 8.0]
    \(\theta\) \(\dot \theta\) τ        
    (0.0, 180.0] (-8.0, 0.0] (-2.0, 0.0] 0 0 0 0
        (0.0, 2.0] 0 0 0 0
      (0.0, 8.0] (-2.0, 0.0] 0 0 0 0
        (0.0, 2.0] 0 0 0 0
    (180.0, 360.0] (-8.0, 0.0] (-2.0, 0.0] 0 0 0 0
        (0.0, 2.0] 0 0 0 0
      (0.0, 8.0] (-2.0, 0.0] 0 0 0 0
        (0.0, 2.0] 0 0 0 0
    • 方策の表(policy)
    表6: 方策の表(policy)
    \(\theta\) \(\dot \theta\) τ Prob
    (0.0, 180.0] (-8.0, 0.0] (-2.0, 0.0] 0.5
        (0.0, 2.0] 0.5
      (0.0, 8.0] (-2.0, 0.0] 0.5
        (0.0, 2.0] 0.5
    (180.0, 360.0] (-8.0, 0.0] (-2.0, 0.0] 0.5
        (0.0, 2.0] 0.5
      (0.0, 8.0] (-2.0, 0.0] 0.5
        (0.0, 2.0] 0.5

    状態遷移確率の実際の値についてはシミュレータを動かすまでわからないのですが、ここでは、その値を求めるのは後回しにし、ひとまず 0 にしています。方策の表については、どの方策が最適化わからないので、全ての行動をまんべんなく行うよう(上の表で Prob の値を 0.5 にしています)にしています。このような方策の表の値を強化学習アルゴリズムにより報酬が最大になるように調整します。

4.2.2 状態遷移確率の実際の値を求める

表の作成 の 表5で、状態遷移確率の表を作成しましたが、値を全て 0 にしていました。ここでは、実際にシミュレータを動かして実際の値を求めます。

  1. 考え方

    (\(\theta\), \(\dot \theta\), τ ) のそれぞれの要素は実数です。今回は、これを扱い易くするため、離散化します。以下のような、ビンの数で分割して、それぞれのビンの中間の値を使用することとします。全部で25*50*25通りのパターンになります。一つ一つのパターンについて、シミュレータに(\(\theta\), \(\dot \theta\), τ )を与え、(\(\theta'\), \(\dot \theta'\))を得ます。このときの、(\(\theta\), \(\dot \theta\), τ ) と (\(\theta'\), \(\dot \theta'\)) の値を、 状態遷移確率Pの表にキーとして与え、そのバリューを +1 します。最後に、 P の各行を25(行動数) で割り、確率にすれば、実際のPが得られます。

    表7: (\(\theta\), \(\dot \theta\), τ ) の分割数
      分割数
    \(\theta\) 25
    \(\dot \theta\) 50
    τ 25
  2. コード

    以下のコードにより、状態遷移確率(P) の実際の値を求めています。

    ########################
    # シミュレータを動かす #
    ########################
    ###############
    # gymの初期化 #
    ###############
    # 環境の作成
    env = gym.make('Pendulum-v0') 
    # 環境の初期化(明示的に行う必要がある)
    env.reset()  
    for theta_cat, theta_dot_cat, tau_cat in Psas.index:
        # 振り子の theta, speed を設定
        # ビンの中間の値を取得するため Interval オブジェクトの mid プロパティを使用
        env.env.state = np.deg2rad(theta_cat.mid), theta_dot_cat.mid
        obser, r, done, info = env.step([tau_cat.mid])
        theta_2, theta_dot_2 = env.env.state
        # デフォルトで rad なので degree に変換
        theta_2 = np.rad2deg(theta_2)
        theta_2 = f(theta_2)
        # カウントを +1 する
        Psas.loc[(theta_cat, theta_dot_cat, tau_cat), (theta_2, theta_dot_2)] += 1
    
    # Psas 確率に変換
    Psas = Psas / Psas.sum(axis=1).values.reshape(-1,1)
    

4.2.3 方策の初期化

以下のコードにより、方策(PI) を初期化しています。

# 方策の初期化
PIas =  pd.DataFrame(1/N.actions, index=index_0, columns=["PI_tau_Prob"])

4.3 学習

学習について説明します。

4.3.1 理論

学習はベルマン方程式を解析的に解くことにより行います。詳細な説明は 参考文献 を参考にしてください。ざっくりと説明すると、以下の計算を行うことで状態価値Vと行動価値Qを求め、それを用いて方策を更新することで学習を行います。

\begin{eqnarray} \bigl[\mathbf{P}^\pi\bigr]_{ss'} &=& \sum_a\pi(a|s)p(s'|s,a)\\ \bigl[\mathbf{R}^\pi\bigr]_{s} &=& \sum_a\pi(a|s)\sum_{s'}p(s'|s,a)r(s,a,s')\\ \mathbf{v} &=& \left(\mathbf{1} - \gamma\mathbf{P}^\pi\right)^{-1}\mathbf{R}^\pi\\ q_{\pi}(s,a) &=& \sum_{s'}p(s'|s,a)\biggl[r(s,a,s') + \gamma v_{\pi}(s')\biggr] \end{eqnarray}

(1)(2)(3)(4)は 参考文献 現場で使える!Python深層強化学習入門 強化学習と深層学習による探索と制御 P36, P32 より引用

4.3.2 コード

  1. 状態遷移確率(P) と方策の確率を合わせたものを求める

    以下のコードにより、状態遷移確率(P) と方策の確率を合わせたものを求めています。状態遷移確率と方策の行列の各要素の積をとり、行動 τ について和をとっています。

    # 状態遷移確率と方策の確率を合わせたものを求める
    P_PI_ss = Psas * PIas.values.reshape(-1, 1)
    P_PI_ss = P_PI_ss.sum(level=[0, 1])
    
  2. 状態遷移確率と方策から報酬の期待値を求める

    以下のコードにより、状態遷移確率と方策から報酬の期待値を求めます。状態遷移確率と方策と報酬の行列の各要素の積をとり、行動 τ について和をとっています。

    # R_PI_s を求める
    R_PI_s = Psas * PIas.values.reshape(-1, 1) * Rsas
    R_PI_s = R_PI_s.sum(axis=1).sum(level=[0, 1])
    
  3. ベルマン方程式を解き状態価値を求める

    以下のコードにより、ベルマン方程式を解き状態価値を求めます。ベルマン方程式を解くのは以下の行列計算で完了します。計算は逆行列の計算を含みます。

    # 状態価値 V を求める(ベルマン方程式を解析的に解く)
    size = len(R_PI_s)
    V_values = np.linalg.inv(np.eye(size) - gamma*P_PI_ss.values).dot(R_PI_s)
    V_s = pd.Series(V_values, index=R_PI_s.index)
    
  4. 行動価値を求める

    以下のコードにより、行動価値を求めます。行動価値は即時報酬、状態価値を使用して計算できます。

    # 行動価値 Q を求める
    Q_sa = Psas*(Rsas + gamma*V_s.values.reshape(1,-1))
    Q_sa = Q_sa.sum(axis=1)
    
  5. 方策を更新する

    以下のコードにより、方策を更新します。ここでは、行動価値が最大になる行動を確率1でとるように方策を更新します。ここまでの 状態遷移確率(P) と方策の確率を合わせたものを求める方策を更新する までが学習に必要な1ステップです。

    # 方策を更新
    # 状態のグループから行動価値が最大となるインデックスを求める
    # https://stackoverflow.com/questions/27842613/pandas-groupby-sort-within-groups
    largest_Qsa =  Q_sa.groupby(level=[0,1], group_keys=False).nlargest(1)
    PIas.loc[:] = 0
    for (theta_cat, theta_dot_cat), df in largest_Qsa.groupby(level=[0, 1]):
        PIas.loc[df.index] = 1
    
  6. 方策が更新されなくなるか、指定回数を超えたら方策の更新を打ち切る

    方策が更新されなくなるか、指定回数を超えたら方策の更新を打ち切ります。方策を更新したのに、前の方策と同じになった場合は、その次も同じ、その次もと永遠と同じ方策になるため更新を打ち切ります。今回は、指定回数を超えても終了するようにプログラムしています。

    # 方策が更新されなくなったら終了
    if (pre_PIas.values==PIas.values).all():
        break
    # 上限回数を超えても終了する
    cnt += 1
    if cnt > 10:
        break
    

5 コード全体

作成したコードの全体がこちらです。

import numpy as np
import pandas as pd
import gym
import time

# 乱数のシードを設定
seed = 100
np.random.seed(seed)

# 離散化に使用する分割数を定義
class N():
    theta = 25
    theta_dot = 50
    tau = 25
    actions = tau

# 角度を [0-360) にする関数を定義
f = lambda x: x - np.floor(x/360)*360

######################################
# 状態遷移確率の表の実際の値を求める #
######################################
##################
# ビンを作成する #
##################
# theta, tau, theta_dot の範囲を定義
bin_theta = np.linspace(0, 360, N.theta + 1)
bin_theta_dot = np.linspace(-8.01, 8.01, N.theta_dot + 1)
bin_tau = np.linspace(-2.01, 2.01, N.tau + 1)
# 角度、速度、トルク を離散化(pd.IntervalIndex.from_breaks を使用)
theta_index = pd.IntervalIndex.from_breaks(bin_theta)
theta_dot_index = pd.IntervalIndex.from_breaks(bin_theta_dot)
tau_index = pd.IntervalIndex.from_breaks(bin_tau)
# インデックスの作成
index_0 = pd.MultiIndex.from_product(
    [theta_index, theta_dot_index, tau_index],
    names=["theta_cat", "theta_dot_cat", "tau_cat"])
index_1 = pd.MultiIndex.from_product([theta_index, theta_dot_index],
                                     names=["theta_2_cat", "theta_dot_2_cat"])

print("Psasを作成")

# 状態遷移確率用
Psas = pd.DataFrame(0, index=index_0, columns=index_1)

print("Rsasを作成")

# 報酬は theta が 0°に近いほどよいかつ、運動エネルギーが小さいほどよいようにした。
data = []
for theta_2_cat, theta_dot_2_cat in Psas.columns:
    data.append(np.cos(np.deg2rad(theta_2_cat.mid))**3/(theta_dot_2_cat.mid**2 + 0.00001))

arr = np.zeros((len(Psas),1)) +  np.array(data)
Rsas = pd.DataFrame(arr, index=index_0, columns=index_1)

########################
# シミュレータを動かす #
########################
###############
# gymの初期化 #
###############
# 環境の作成
env = gym.make('Pendulum-v0') 
# 環境の初期化(明示的に行う必要がある)
env.reset()  
for theta_cat, theta_dot_cat, tau_cat in Psas.index:
    # 振り子の theta, speed を設定
    # ビンの中間の値を取得するため Interval オブジェクトの mid プロパティを使用
    env.env.state = np.deg2rad(theta_cat.mid), theta_dot_cat.mid
    obser, r, done, info = env.step([tau_cat.mid])
    theta_2, theta_dot_2 = env.env.state
    # デフォルトで rad なので degree に変換
    theta_2 = np.rad2deg(theta_2)
    theta_2 = f(theta_2)
    # カウントを +1 する
    Psas.loc[(theta_cat, theta_dot_cat, tau_cat), (theta_2, theta_dot_2)] += 1

# Psas 確率に変換
Psas = Psas / Psas.sum(axis=1).values.reshape(-1,1)

# 方策の初期化
PIas =  pd.DataFrame(1/N.actions, index=index_0, columns=["PI_tau_Prob"])

cnt = 0
gamma = 0.99

while True:
    print(cnt)
    # 状態遷移確率と方策の確率を合わせたものを求める
    P_PI_ss = Psas * PIas.values.reshape(-1, 1)
    P_PI_ss = P_PI_ss.sum(level=[0, 1])

    # R_PI_s を求める
    R_PI_s = Psas * PIas.values.reshape(-1, 1) * Rsas
    R_PI_s = R_PI_s.sum(axis=1).sum(level=[0, 1])

    # pre_V_s = V_s.copy()
    pre_PIas = PIas.copy()

    # 状態価値 V を求める(ベルマン方程式を解析的に解く)
    size = len(R_PI_s)
    V_values = np.linalg.inv(np.eye(size) - gamma*P_PI_ss.values).dot(R_PI_s)
    V_s = pd.Series(V_values, index=R_PI_s.index)

    # 行動価値 Q を求める
    Q_sa = Psas*(Rsas + gamma*V_s.values.reshape(1,-1))
    Q_sa = Q_sa.sum(axis=1)

    # 方策を更新
    # 状態のグループから行動価値が最大となるインデックスを求める
    # https://stackoverflow.com/questions/27842613/pandas-groupby-sort-within-groups
    largest_Qsa =  Q_sa.groupby(level=[0,1], group_keys=False).nlargest(1)
    PIas.loc[:] = 0
    for (theta_cat, theta_dot_cat), df in largest_Qsa.groupby(level=[0, 1]):
        PIas.loc[df.index] = 1

    # 状態価値の最小値を表示
    print(V_s.min())
    # 方策が更新されなくなったら終了
    if (pre_PIas.values==PIas.values).all():
        break
    # 上限回数を超えても終了する
    cnt += 1
    if cnt > 10:
        break
    first = False

##########################################
# 求めた方策に従い、シミュレータを動かす #
##########################################

# 環境の作成
env = gym.make('Pendulum-v0') 
env.seed(1)
for __ in range(10):
    # 環境の初期化(明示的に行う必要がある)
    env.reset()
    for _ in range(400):
        # 現在の状態を取得
        theta, theta_dot = env.env.state
        # デフォルトで rad なので degree に変換
        theta = np.rad2deg(theta)
        theta = f(theta)
        tau = PIas.loc[theta].loc[theta_dot].idxmax().values[0].mid
        env.step([tau])
        env.render()
        # time.sleep(0.03)

env.close()

6 出力結果

出力結果は以下になります。

https://raw.githubusercontent.com/keimina/gif/master/gymモジュール_Pendulum-v0_方策反復法_get_real_P_2.gif

図1. コード全体の実行結果

図1. が コード全体 実行結果ですが、頂上で止まっていません。ここから乱数のシードやコードを変えて試行錯誤を繰り返し手動でチューニングを行い学習させたところ、それらしい動きになりました。

https://github.com/keimina/gif/blob/master/gymモジュール_Pendulum-v0_方策反復法_get_real_P_good_2.gif?raw=true

図2. 手動チューニングを行い学習させた結果

7 考察

振り子の動きを見ると、左右に振って徐々に上昇させていることがわかります。これは、一方向に τ の上限のトルクを加えても、頂上まで上昇することがあまりないため、上昇させるには左右に振るのが定石だからです。(頂上付近だったり、初速度が大きい場合は一方向のトルクでも頂上にたどり着きます)また、左右に振るだけでも、振り子が一番下にいる確率を下げれますので、これだけで、マイナスの報酬を少なくできます。少なくともこの左右に振る動きを自動的に獲得することができました。ただし、頂上で静止するのは難しいようです。振り子が垂直の時、重力と釣り合うため、他の状態に比べ、少しのトルクで大きく動きます。そこに離散化による誤差の影響が加わり学習が難しくなっているかもしれません。ただ、上記図2.のように、パラメータによっては、うまく学習するときもあるようです。離散化の時に\(\dot \theta\)を他の分割数を他のパラメータの分割数より多くしているのは、他の分割数をあげるよりより、\(\dot \theta\)の分割数をあげる方が良い動きが見られたからです。離散化の方法も学習に大きな影響を及ぼすことがわかりました。出力結果をみると、真の最適な方策に収束しているようには思いませんが、それに近い方策になっていると思います。簡単な報酬を定義しただけで、アルゴリズムが自動的に最適に近い方策を出してくれるので便利です。

8 まとめ

Pendulum-v0 について、報酬を独自に定義し、連続値を離散化し状態遷移確率などを表で表し、ベルマン方程式を解くことにより、方策の学習を行いました。学習済のパラメータを用いて、シミュレーションを10回繰り返したところ、10回全てで振り子を頂上で止まらせることができました。(図2のアニメーションを参照)

9 参考文献

9.1 現場で使える!Python深層強化学習入門 強化学習と深層学習による探索と制御

9.2 これからの強化学習

www.morikita.co.jp

以上です。おやすみなさい。