강화학습

정책 평가 , 정책 제어 , 반복적 정책 평가

정지홍 2024. 12. 5. 14:13

정책 평가

  • 정책이 주어졌을떼, 해당되는 정책의 가치 함수 v(s) or q(s,a)를 구하는 문제
    • v(s)상태 가치 함수 , q(s,a)행동 가치 함수

 

정책 제어

  • 정책을 조정하여, 최적 정책을 만들어내는 것을 의미.
  • 강화학습의 최종적인 목표

 

 

2번째식은 '추정치'라서 v가 아닌 대문자 V로 표가하였다.

벨만 방정식을 갱신적으로 변형

  • 다음 상태의 가치 함수 Vk(s')를 이용하여, 현재 상태의 가치 함수 Vk+1(s)를 개선한다.
    즉, 추정치 Vk(s')를 이용하여, Vk+1(s)를 갱신하는 것이다.
    • 위와 같은 식의 갱신하는 과정을 부트스트래핑bootstrapping이라고 한다.
  • ex)
    • 위의 식을 이용하여 V0(s)에서 V1(s)를 갱신한다. 그리고 이어서 V1(s)를 이용하여 V2(s)로 갱신한다.
      위의 과정을 반복하면 최종 목표인 V(s)가 가까워 짐.
      • 이러한 과정을 반복적 정책 평가( iterative policy evalutation )라고 한다.
      • 현실 문제에서는 갱신을 적절한 횟수로 결정해야 한다.

 

 

상태 전이가 결정적인 경우에서의 갱신적인 벨만 방적식

ex1) 2칸짜리 그리드에서의 예시 ( 반복적 정책 평가 )

  • V0(L1) = 0 이고 V0(L2)=0이라는 가정하에 다음 스텝으로 갱신해보자
    • 최적의 정책으로 수행한 경우에서, L0위치에서 최대로 낼수있는 추정값.
  • V1(L1) = 0.5{ -1 + 0.9*V0(L1)} + 0.5{ 1 + 0.9*V0(L2)} = 0 이다.
  • V1(L2) = 0.5{ 0 + 0.9*V0(L1)} + 0.5{ -1 + 0.9*V0(L2)} = -0.5 이다.
  • 위와 같은 과정으로 갱신하며, 아래는 파이썬으로 해보았다.

위의 결과를 보면 100번째 갱신할때는 거의 앞의 시점과 값이 유사함을 볼수있다.
임계값을 설정하여, 일정 값보다 낮아지면 중지.

 

 

 


ex2) 2칸짜리 그리드에서의 예시  ( 반복적 정책 평가 2 )

  • dp사용

 


ex3) 3x4칸짜리 그리드에서의 예시  ( 반복적 정책 평가 3 )

  • 가정...
  • agent는 상하좌우 움직이며, 벽 밖으로는 못나감.
  • 사과의 보상은 +1, 폭탄의 보상은 -1, 벽에 부딪히면 +0이다. 이외 나머지도 +0이다.
  • 환경에서 상태 전이는 결정적이다. 즉, 벽이나 장애물이 없다면, 반드시 이동한다.
  • 사과를 얻으면 바로 종료한다.
  • 1번째 코드는 GridWorld 클래스이며, 맵에 대한 정보와 에이전트가 수행할 수 있는 action등을 정의 함
  • 2번째 코드는 가치함수를 한 차례만 갱신을 하는 함수이다.
    • 즉, 정책과 환경을 바탕으로 한번의 가치 평가를 수행하며, 상태 가치 함수를 갱신함
  • 3번째 코드는 갱신되는 양의 최대값이 임계값보다 작아질때까지 반복적으로 업데이트 함.
    • 즉, delta값이 threshold보다 커야 계속 수행. delta가 더 작으면 break
import numpy as np

class GridWorld:
    def __init__( self ):
        self.action_space = [ 0, 1, 2 , 3 ]  # 행동 할 수 있는 가지 수
        self.action_meaning = { 0:'UP' , 1:'DOWN' , 2:'LEFT' , 3:'RIGHT' } # 위에서 정의한 행동 번호가 의미 하는 바
        self.reward_map = np.array( # 게임 맵
                    [ [ 0 , 0    , 0 ,  1.0 ] ,
                      [ 0 , None , 0 , -1.0 ] ,
                      [ 0 , 0    , 0 ,  0   ] ] )
        self.goal_state = ( 0 , 3 ) # 목표 지점
        self.wall_state = ( 1 , 1 ) # 벽 지점
        self.start_state = ( 2 , 0 ) # 시작 지점
        self.agent_state = self.start_state # 에이전트 초기 상태

    @property # 맵의 행 갯수
    def height( self ):
        return len( self.reward_map )

    @property # 맵의 컬럼 갯수
    def width( self ):
        return len( self.reward_map[0] )

    @property # 맵의 쉐잎
    def shape( self ):
        return self.reward_map.shape

    def actions( self ): # 행동할수 있는 범위 
        return self.action_space

    def states( self ):
       for h in range ( self.height):
            for w in range ( self.width):
                yield ( h , w ) # yield가 있는 라인은 함수를 호출한 쪽으로 프로그램의 제어를 넘겨줌을 의미 
                # yield는 return과 다르게 함수 종료가 아니라 일지중지이다. 그래서 다음에 함수 실행시 이전 상태를 기억하고 이어서 호출이 가능
    
    def next_state( self , state , action ):
        action_move_map = [ ( -1 , 0 ) , ( 1, 0 ) , ( 0 , -1 ) , ( 0 , 1 ) ]
        move = action_move_map[ action ] 
        next_state = ( state[0] + move[0] , state[1] + move[1] ) # 위치 이동에 대해서 계산을 수행
        ny , nx = next_state

        if nx < 0 or nx >= self.width or ny < 0 or ny >= self.height: # agent가 맵을 벗어나려는 경우
            next_state = state
        elif next_state == self.wall_state: # 벽에 부딪히는 경우
            next_state = state
        return next_state

    def reward( self , state , action , next_state ):
        return self.reward_map[ next_state ]
# 정책과 환경을 바탕으로 한번의 가치 평가를 수행하며, 상태 가치 함수를 갱신함.
def eval_onestep( pi , V , env , gamma=0.9 ): # pi는 정책 , V는 가치함수 , env는 환경 , 감마는 할인률
    for state in env.states(): # 각각의 상태에 접근함. 즉, 모든 상태를 하나씩 추적한다
        if state == env.goal_state: # 만약 목표 상태인 경우, 이때는 가치함수가 항상 0이다.
            V[ state ] = 0
            continue

        action_probabilities = pi[state] # 인자로 전달받은 정책들을 가져온다.  정책은 0.25확률로 아래로 , 0.25확률로 왼쪽으로 , 0.25확률로 오른쪽으로가는것
        new_V = 0 # 새로운 상태 가치를 계산하기 위한 변수 

        for action , action_probabilities in action_probabilities.items(): # action_probabilities는 key-value로 행동-확률 쌍이다.
            next_state = env.next_state( state , action ) # 현재의 state를 기반으로 다음 state르 계산한다.
            r = env.reward( state , action , next_state ) # 현재 state에서 다음 state로 이동하는 action을 하는경우의 보상을 구한다.
            new_V += action_probabilities * ( r + gamma * V[ next_state ] ) # 갱신적으로 변형한 벨만 방정식을 이용해서 상태 가치 추정

        V[ state ] = new_V # 계산된 상태 가치를 현재 상태에 반영. 즉 k+1을 의미

    return V
# 주어진 정책에 대해서, 상태가치 함수를 평가해서, 갱신된 양의 최대값이 임계값보다 작아질때까지 반복적 업데이트함.
# pi는 상태별 행동 확률을 나타내는 정책. 0.25확률로 위로, 0.25확률로 아래로 , 0.25확률로 왼쪽으로 , 0.25확률로 오른쪽으로...
# V는 초기 상태 가치 함수이며 0으로 초기화
# env는 환경
# gamma는 할인률
# threshold는 임계값
# return 값은 수렴된 상태 가치 함수이다.

def policy_eval( pi , V , env , gamma , threshold=0.001 ):
    while True:
        old_V = V.copy() # 갱신을 수행하기전의 가치함수를 copy한다.
        V = eval_onestep( pi , V , env , gamma ) # 한차례 가치를 갱신한다.
        delta = 0 # 수렴하는지 여부를 확인하기 위해 최대 변화량을 초기화
        # 각 상태에 대해서 변화량을 갱신함
        for state in V.keys(): # 여기에는 총 12개의 키가 존재하며, 각각 key값의 value는 상태의 가치를 의미
            t = abs( V[state] - old_V[ state ] ) # 현재 상태와 이전 상태의 가치의 차이를 계산
            if delta < t: # delta값을 업데이트. 이 값이 thresbhold보다 작아져야함.
                delta = t

        if delta < threshold:
            break

    return V

결과...
eval_onestep의 아래쪽은 갱신적으로 변형한 벨만 방정식이다.

  • pi = defaultdict( lambda: { 0:0.25 , 1:0.25 , 2:0.25 , 3:0.25 } )라는 부분이 있을것이다.
    이것이 정책을 의미하며, 이러한 정책을 주었을때의 정책을 평가한것이다.