알고리즘

d star 알고리즘

정지홍 2025. 2. 2. 01:39

그림1
코드1
그림2 ( (key1,key2) , node )형태이다. 이는 코드1 을 보면 g와 rhs의 최솟값 + 휴리스틱이 key1이다. 여기서 g는 무한댕이며 rhs는 0이다. 그리고 휴리스틱이 8칸이니 두개를 더하면 min( 무한대 , 0 ) + ( 8 )= (0)+(8)=(8)이다. 그리고 g와 rhs의 min은 0이다. 그래서 결과적으로 (8,0)이다.

d star 알고리즘

  • path planning에서 주로 사용되는 알고리즘
    • 특히, 동적인 환경에서 효율적으로 경로를 찾고 재계산하는데 최적화 되어있다.
    • a star의 확장형으로, 장애물이 바뀌거나 새로운 정보가 추가될때 전체 경로를 처음부터 다시 계산하는 것이 아니라 기존 경로를 효율적이게 수정한다.
  • 목적
    • start에서 goal까지 장애물을 피하면서 최적의 경로를 찾는것이다.
      • 만약 환경이 변화하거나 새로운 장애물이 감지되면 빠르게 경로를 수정한다.
  • a star와의 차이
    • a star는 정적 환경에서 최적화 된 알고리즘이며, 이는 환경이 변하면 경로를 처음부터 다시 계산해야함.
    • d star는 기존의 탐색결과를 재사용하니 보다 빠르게 새로운 경로를 찾는게 가능하다.
  •  장점
    • 동적인 환경에서 빠르게 경로 수정이 가능하며, 이로 인해서 기존 탐색 결과를 재사용하니 효율성이 향상됨.
  • 단점
    • 정적 환경에서는 a star보다 복잡도가 높다. 그리고 구현도 더 복잡.
    • 환경변화가 과도하면 성능이 저하됨.
  • d star 변형 알고리즘들...
    • d lite star 알고리즘 구현을 간소화하였으며, 구현이 쉽고 효율적이다.
    • focused d star 탐색 범위를 더 좁혀서 속도를 개선함.
    • anytime d star 시간이 제한된 상황에서만 빠르게 경로를 찾으며, 시간이 더 주어지면 점진적으로 개선함.

 


 

 

식1

d star 알고리즘의 주요 개념 및 구성 요소

  • 1. node : 로봇이 이동할 수 있는 위치
  • 2. weight : 두 node를 연결하는 경로의 비용
  • 3. edge : node간 이동 가능한 경로
  • 4. cost values 
    • 4-1. g(n) : 실제 비용
      • 시작점에서 노드 n까지 도달하는  최소비용을 의미.
        출발점에서 목표점까지의 경로를 평가할 때 사용하며, 작을수록 최적경로에 가깝다.
    • 4-2. rhs(n) : 예상 비용 
      • 노드 n이 최적의 부모노드를 선택했을때 예상되는 비용. ( 주변 노드의 g값을 참조하여 갱신 )
      • 식1을 이용하여 이웃한 노드들중에서 최소 비용인 것을 선택한다.
    • 일반적으로는 g(n) = rhs(n)이지만, 환경이 변하면 g(n) != rhs(n)이 될수도 있다.
      이 차이를 이용해서 경로를 동적으로 업데이트 함.
  • 5. 우선순위 큐
    • d star에서는 key값을 기준으로 정렬한다.
    • 노드는  ( key1 , key2 )의 형태를 따른다.
      • key1 : 현재 예상 비용 + 휴리스틱
      • key2 : 같은 key1값을 가진 노드들 사이에서 부 정렬한다.
      • key1값이 가장 작은 노드를 꺼내서 탐색함.

 


 

 

d star 알고리즘 작동 방식

  • 1. 초기 경로 탐색 : 목표지점에서 출발 지점 방향으로 탐색함. ( 역방향  )
    • 1-1. 초기화 ( 그림1 , 그림 2 참고 )
      • 초기 비용은 무한대. 목표 지점의 예상 비용은 0으로 초기화. 그리고 목표 지점을 우선순위 큐에 추가.
        g(goal)=∞
        rhs(goal)=0
    • 1-2. 우선순위 큐 관리
      • 탐색할 노드들은 ( key1 , key2 )의 key값으로 우선순위를 결정함.
    • 1-3. 노드 확장 ( 최단 경로 계산..... open리스트가 empty or 시작노드가 rhs=g가 될때까지 반복 )
      • 가장 우선순위가 높은 노드를 꺼냄. 그리고 이웃 노드들의 g와 rhs값을 업데이트 한다. 이렇게 모든 노드가 처리될 때까지 반복 
        • 탐색중인 노드들은 g != rhs 인 경우가 많다. 대신에 경로 탐색이 끝난 후에는 대부분의 노드에서 g = rhs가 된다. 만약에 환경변화가 존재시에는 g != rhs가 된다.
    • 결과적으로 초기 탐색이 끝나면, 로봇은 최적 경로를 따라서 목표지점으로 향할 준비가 된것.
  • 2. 로봇 이동
    • 로봇은 초기 탐색 결과를 바탕으로 목표지점까지 이동을한다.
      • 최적경로는 g(n)값이 최소인 이웃 노드를 따라서 이동한다.
    • 로봇이 환경 변화를 감지하는 상황들
      • 새로운 장애물 발견 or 장애물 사라짐. 비용이 변경된 지역을 탐지 
  • 3. 환경 변화 감지
    • 3-1. 장애물 업데이트인 경우
      • 장애물이 있는 노드의 비용을 ∞로 수정하고 해당 노드의 rhs값을 새로 계산. 그리고 영향을 받는 주변 노드들을 우선순위 큐에 추가 
    • 3-2. 비용 불일치을 탐지한 경우
      • 장애물로 인해서 기존 경로가 유효하지 않으면 g != rhs 상태가 되며 이때 2가지 상태로 구분한다.
        • underconsistent :  g>rhs인 경우이며 즉, 더 나은 경로가 존재
        • overconsistent :  g<rhs인 경우이며 즉, 잘못된 경로를 따르고 있음을 의미.
  • 4. 경로 수정 : 환경 변화가 감지되면, 가존 경로를 전체적으로 계산하는 것이 아닌 부분적으로만 수정함.
    • computeShortestPath()는 큐에 존재하는 노드들만 재탐색해서 경로를 빠르게 수정함.
    • 4-1. 우선순위 큐에서 노드 선택
    • 4-2. 비용 업데이트
    • 4-3. 이웃 노드 처리
    • 4-4. 경로 재구성
  • 아래 표는 d star의 상태전이에 대한 표 이다.
상태 state 설명
new 초기 상태 ( 아직 탐색이 되지 않은 노드 )
open 탐색 중인 노드 ( 우선 순위 큐에 있음 )
closed 탐색 완료된 노드
inconsistent g != rhs인 상태  --> 환경 변화로 인해서 비용 불일치가 발생
consistent g == rhs 인 상태  --> 경로가 유효함 

 

 

 

# 맵을 초기화 
def initialize():
	g[goal] = float('inf') # 초기 비용은 무한대
    rhs[goal] = 0 # 목표 지점의 예상 비용은 0 
    OPEN.insert( goal , calculate_key(goal) ) # 목표 지점을 우선순위 큐에 넣어줌
    
# 우선순위 계산 함수
# 우선순위는 ( key1 , key2 )로 구성
# key1은 g값과 rhs값 중  더 작은 값 + 시작점까지의 휴리스틱 거리
# key2는 g값과 rhs값 중 더 작은 값 
def calculate_key( node ):
	return ( min(g[node] , rhs[node]) + heuristic(start , node) , min(g[node] , rhs[node]) )
    
# 노드 업데이트 함수
# 이웃 노드를 탐색하며 rhs값을 최소 비용으로 업데이트 함 
def update_vertex( node ):
	if node != goal:
		rhs[node] = min( g[neighbor]+cost(node,neighbor) for neighbor in neghbors(node) )
	if node in OPEN:
    	OPEN.remove(node)# 큐에 이미 존재하면 삭제...중복 방지를 위함 
    if g[node] != rhs[node]:
    	OPEN.insert( node , calculate_key(node) ) # g와 rhs가 다르면 불일치 상태이니 큐에 다시 추가해서 재탐색하게 함

# 최단 경로를 탐색하는 함수
def comput_shortest_path():
	# 탐색해야할 노드가 로봇의 현재 위치보다 중요한가? or 시작점의 비용이 최적이 아닐때까지 반복
	while OPEN.top_key() < calculate_key( start ) or rhs[start] != g[start]:
    	node = OPEN.pop() # 우선 순위가 가장 높은 노드를 꺼냄 
        if g[node] > rhs[node]: # 상태 g 값이 더 크면 rhs로 업데이트. 즉, 더 나은경로 발견. underconsitent
        	g[node] = rhs[node]
        else:# overconsistent 상태. g값이 더 작으면 무효화 후에 재탐색
        	g[node] = float('inf')
            update_vertex(node) # 다시 이웃 노드의 정보를 업데이트
        for neighbor in neighbors( node ): # 이웃 노드들도 업데이트 하여 경로를 지속적으로 최적화
        	update_vertex( neighbor )
           
    
    
# 메인 루프
initialize()
while start != goal:
	compute_shortest_path() # 현재까지의 최단 경로 탐색
    move_along_path() # 로봇이 경로를 따라서 이동
    if obstacle_detected(): # 새로운 장애물 감지시 
    	update_environment() # 환경 정보를 업데이트
        compute_shortest_path() # 변경된 정보를 기반으로 경로 재탐색 
        
# g[node]는 현재까지 탐색된 실제 비용. 즉, 출발점에서 해당 노드까지의 거리
# rhs[node]는 예상 비용. 이론적으로 최적 경로를 따를 경우의 예상 비용
# cost( node , neighbor )는 두 노드 간의 이동비용

 

 

 


 

구현

import heapq
import numpy as np

INF = float('inf')

class DStar:
    def __init__( self , grid , start , goal ):
        self.grid = np.array( grid ) # 2D 격자 지도이며, 0은 이동가능 , 1은 장애물
        self.start = tuple( start ) # 시작 위치이자 로봇의 현재 위치이다.
        self.goal = tuple( goal ) # 목표 위치이다.
        
        self.g = {} # 실제 경로 비용 ( g-value  or g(n) )
        self.rhs = {} # 예상 경로 비용 ( rhs-value )
        self.OPEN = [] # 탐색할 노드들을 저장하는 우선 순위 큐이다. min-heap구조

        self.init_grid() # 초기화 작업 수행

    # g와 rhs 값을 초기화하고, 목표 지점을 OPEN 큐에 삽입
    def init_grid( self ):
        rows , cols = self.grid.shape
        for r in range( rows ):
            for c in range( cols ):
                self.g[ (r,c) ] = INF # 모든 노드의 초기 g값은 무한대 
                self.rhs[ (r,c) ] = INF # 모든 노드의 초기 rhs 값도 무한대
        self.rhs[ self.goal ] = 0 # 목표지점의 rhs는 항상 0이다. ( 도착 비용이 없으니... )
        # 목표 지점을 open큐에 넣어서 역방향 탐색을 시작한다.
        heapq.heappush( self.OPEN , ( self.calculate_key( self.goal)  , self.goal ) )
        print(f'초기화 함수입니다... 현재 open큐에는 {self.OPEN}이 존재합니다.\n\n')

    # 노드의 우선순위 키를 계산하는 함수.
    # g(node)는 현재까지의 실제비용
    # rhs(node)는 예상되는 최소비용
    # heuristic은 현재 노드에서 시작 지점까지의 거리 추정 
    def calculate_key( self , node ):
        key1 = min(self.g[node] , self.rhs[node] ) + self.heuristic( self.start , node )
        key2 = min( self.g[ node ] , self.rhs[node] )
        return ( key1 , key2 )

    # 맨해튼 거리 
    def heuristic( self , a , b ):
        #print(f'\t\t\t휴리스틱 함수... 들어온 a값 , b값은 {a}  {b} 이며 반환값은 {abs( a[0]-b[0] )} + {abs( a[1]-b[1] )} = {abs( a[0]-b[0] ) + abs( a[1]-b[1] ) }')
        return abs( a[0]-b[0] ) + abs( a[1]-b[1] )

    # 현재 노드의 이웃 노드 찾기 ( 상 하 좌 우 ). 대신 장애물이 아닌 노드만 반환하며, 격자 범위도 벗어나면 안됨
    def get_neighbors( self , node ):
        x , y = node
        neighbors = []
        for dx , dy in [ (-1,0) , (1,0) , (0,-1) , (0,1) ]:
            nx , ny = x+dx , y+dy
            if 0 <= nx < self.grid.shape[0] and 0<= ny < self.grid.shape[1]:
                if self.grid[ nx , ny ] != 1: # 장애물이 아닌 경우 
                    neighbors.append( (nx,ny) )
        return neighbors
    
    # 노드의 비용을 업데이트 하는 함수이다.
    # g와 rhs값이 다르면 OPEN큐에 삽입 ( 즉, 재탐색이 필요한 경우 )
    # 단, 목표지점은 업데이트 안함
    def update_vertex( self ,node ):
        print(f'\t\t\t노드{node}의 비용을 업데이트...')
        if node != self.goal:
            # 이웃한 노드들 중에서 가장 작은 g값 + 1 ( 이동 비용 )
            self.rhs[ node ] = min( self.g[neighbor] + 1 for neighbor in self.get_neighbors( node ) )
            print(f'\t\t\t\t이웃한 노드들중에서 가장 작은 g(n)+1의 결과는 {self.rhs[node]}')
        # open큐에서 기존 노드를 삭제. (중복 방지 ) 즉, 방금 막 탐색 완료한 노드를 삭제하는것
        self.OPEN = [ item for item in self.OPEN if item[1] != node ]
        heapq.heapify( self.OPEN ) # 큐를 재정렬
        print(f'\t\t\t\t최종적인 open: {self.OPEN}')

        # 만약 g와 rhs가 다르면 open 큐에 다시 추가 ( 아직 최적 경로가 아닌 경우다.)
        if self.g[node] != self.rhs[ node ]:
            print(f'\t\t\t\t\tg!=rhs라서 {node}를 다시 open에 추가합니다...')
            heapq.heappush( self.OPEN , ( self.calculate_key( node ) , node ) )
    
    # 최단 경로 계산 함수이다.
    def compute_shortest_path( self ):
        print(f'\t최단 경로 계산함수입니다. 현재 open큐는 {self.OPEN}입니다. ')
        # open큐가 비어있지 않고, g != rhs 이거나 open큐의 top_key가 현재 시작 위치의 key값보다 작은 경우
        # (8,0)과 ((8,2)과 같이 튜플형식은 값 비교 가능...
        while self.OPEN and ( self.OPEN[0][0] < self.calculate_key( self.start ) or self.rhs[self.start] != self.g[self.start] ):
            print(f'\t\t최단 경로 계산함수의 while문 입니다... 현재 open큐는 {self.OPEN}입니다. ', end='')
            _ , node = heapq.heappop( self.OPEN ) # 우선순위가 가장 높은 노드 선택
            print(f'우선순위가 가장 높은 노드는 {node}입니다. 해당 노드의 g(n)은 {self.g[node]}이며 rhs(n)은 {self.rhs[node]}입니다.')
            # 현재 실제 비용보다 예상되는 비용의 값이 더 작은 경우에 g값을 업데이트. 즉, 더나은 경로를 발견한 경우
            if self.g[node] > self.rhs[node]:
                print('\t\t\t더 나은 경로 발견! rhs값을 g에 넣어줌...')
                self.g[node] = self.rhs[node]
            else: # 경로가 더 이상 최적이 아닌 경우이다. g값을 초기화하고 재계산한다.
                print(f'\t\t\t경로가 더이상 최적이 아님... g <= rhs인 경우 ')
                self.g[node] = INF
                self.update_vertex(node)
            for neighbor in self.get_neighbors( node ):
                print(f'\t\t\t\t이웃 {neighbor}도 업데이트')
                self.update_vertex( neighbor )
    
    # 최적 경로 찾기 함수
    # 시작 지점부터 목표지점까지 g값이 가장 작은 방향으로 이동한다.
    def find_path( self ):
        print(f'find_path함수입니다. 이는 compute_shorest_path함수를 이용해서 최단 경로를 계산하고, 로봇의 이동경로를 반환 합니다.')
        self.compute_shortest_path() # 우선 최단 경로 계산
        path = []
        current = self.start 
        while current != self.goal:
            path.append( current )
            neighbors = self.get_neighbors( current )
            if not neighbors:
                return None # 경로가 없는 경우이다. (막혀있음 )
            # g값이 가장 작은 이웃 노드로 이동
            current = min( neighbors , key=lambda n : self.g[n] )
        path.append( self.goal )
        return path

    # 장애물을 추가하고 경로 재탐색
    # 장애물이 생긴 위치 주변 노드의 비용을 업데이트 함 
    def add_obstacle( self , pos ):
        self.grid[ pos ] = 1
        for neighbor in self.get_neighbors(pos):
            self.update_vertex(neighbor) # 주변 노드 업데이트 
        self.compute_shortest_path() # 최단 경로 재계산
grid = [
    [0, 0, 0, 0, 0],
    [0, 1, 1, 0, 0],
    [0, 0, 0, 1, 0],
    [0, 1, 0, 0, 0],
    [0, 0, 0, 0, 0]
]

# 시작점과 목표점 
start = (0, 0)
goal = (4, 4)

dstar = DStar(grid, start, goal )


# 초기 최적 경로 찾기
path = dstar.find_path() 
print("최적 경로:", path)


# 장애물 추가 후 경로 재탐색
dstar.add_obstacle((2, 2))
new_path = dstar.find_path()
print("장애물 추가 후 새로운 경로:", new_path)