일반적으로는 g(n) = rhs(n)이지만, 환경이 변하면 g(n) != rhs(n)이 될수도 있다. 이 차이를 이용해서 경로를 동적으로 업데이트 함.
5. 우선순위 큐
d star에서는 key값을 기준으로 정렬한다.
노드는 ( key1 , key2 )의 형태를 따른다.
key1 : 현재 예상 비용 + 휴리스틱
key2 : 같은 key1값을 가진 노드들 사이에서 부 정렬한다.
key1값이 가장 작은 노드를 꺼내서 탐색함.
d star 알고리즘 작동 방식
1. 초기 경로 탐색 : 목표지점에서 출발 지점 방향으로 탐색함. ( 역방향 )
1-1. 초기화 ( 그림1 , 그림 2 참고 )
초기 비용은 무한대. 목표 지점의 예상 비용은 0으로 초기화. 그리고 목표 지점을 우선순위 큐에 추가. g(goal)=∞ rhs(goal)=0
1-2. 우선순위 큐 관리
탐색할 노드들은 ( key1 , key2 )의 key값으로 우선순위를 결정함.
1-3. 노드 확장 ( 최단 경로 계산..... open리스트가 empty or 시작노드가 rhs=g가 될때까지 반복 )
가장 우선순위가 높은 노드를 꺼냄. 그리고 이웃 노드들의 g와 rhs값을 업데이트 한다. 이렇게 모든 노드가 처리될 때까지 반복
탐색중인 노드들은 g != rhs 인 경우가 많다. 대신에 경로 탐색이 끝난 후에는 대부분의 노드에서 g = rhs가 된다. 만약에 환경변화가 존재시에는 g != rhs가 된다.
결과적으로 초기 탐색이 끝나면, 로봇은 최적 경로를 따라서 목표지점으로 향할 준비가 된것.
2. 로봇 이동
로봇은 초기 탐색 결과를 바탕으로 목표지점까지 이동을한다.
최적경로는 g(n)값이 최소인 이웃 노드를 따라서 이동한다.
로봇이 환경 변화를 감지하는 상황들
새로운 장애물 발견 or 장애물 사라짐. 비용이 변경된 지역을 탐지
3. 환경 변화 감지
3-1. 장애물 업데이트인 경우
장애물이 있는 노드의 비용을 ∞로 수정하고 해당 노드의 rhs값을 새로 계산. 그리고 영향을 받는 주변 노드들을 우선순위 큐에 추가
3-2. 비용 불일치을 탐지한 경우
장애물로 인해서 기존 경로가 유효하지 않으면 g != rhs 상태가 되며 이때 2가지 상태로 구분한다.
underconsistent : g>rhs인 경우이며 즉, 더 나은 경로가 존재
overconsistent : g<rhs인 경우이며 즉, 잘못된 경로를 따르고 있음을 의미.
4. 경로 수정 : 환경 변화가 감지되면, 가존 경로를 전체적으로 계산하는 것이 아닌 부분적으로만 수정함.
computeShortestPath()는 큐에 존재하는 노드들만 재탐색해서 경로를 빠르게 수정함.
4-1. 우선순위 큐에서 노드 선택
4-2. 비용 업데이트
4-3. 이웃 노드 처리
4-4. 경로 재구성
아래 표는 d star의 상태전이에 대한 표 이다.
상태 state
설명
new
초기 상태 ( 아직 탐색이 되지 않은 노드 )
open
탐색 중인 노드 ( 우선 순위 큐에 있음 )
closed
탐색 완료된 노드
inconsistent
g != rhs인 상태 --> 환경 변화로 인해서 비용 불일치가 발생
consistent
g == rhs 인 상태 --> 경로가 유효함
# 맵을 초기화
def initialize():
g[goal] = float('inf') # 초기 비용은 무한대
rhs[goal] = 0 # 목표 지점의 예상 비용은 0
OPEN.insert( goal , calculate_key(goal) ) # 목표 지점을 우선순위 큐에 넣어줌
# 우선순위 계산 함수
# 우선순위는 ( key1 , key2 )로 구성
# key1은 g값과 rhs값 중 더 작은 값 + 시작점까지의 휴리스틱 거리
# key2는 g값과 rhs값 중 더 작은 값
def calculate_key( node ):
return ( min(g[node] , rhs[node]) + heuristic(start , node) , min(g[node] , rhs[node]) )
# 노드 업데이트 함수
# 이웃 노드를 탐색하며 rhs값을 최소 비용으로 업데이트 함
def update_vertex( node ):
if node != goal:
rhs[node] = min( g[neighbor]+cost(node,neighbor) for neighbor in neghbors(node) )
if node in OPEN:
OPEN.remove(node)# 큐에 이미 존재하면 삭제...중복 방지를 위함
if g[node] != rhs[node]:
OPEN.insert( node , calculate_key(node) ) # g와 rhs가 다르면 불일치 상태이니 큐에 다시 추가해서 재탐색하게 함
# 최단 경로를 탐색하는 함수
def comput_shortest_path():
# 탐색해야할 노드가 로봇의 현재 위치보다 중요한가? or 시작점의 비용이 최적이 아닐때까지 반복
while OPEN.top_key() < calculate_key( start ) or rhs[start] != g[start]:
node = OPEN.pop() # 우선 순위가 가장 높은 노드를 꺼냄
if g[node] > rhs[node]: # 상태 g 값이 더 크면 rhs로 업데이트. 즉, 더 나은경로 발견. underconsitent
g[node] = rhs[node]
else:# overconsistent 상태. g값이 더 작으면 무효화 후에 재탐색
g[node] = float('inf')
update_vertex(node) # 다시 이웃 노드의 정보를 업데이트
for neighbor in neighbors( node ): # 이웃 노드들도 업데이트 하여 경로를 지속적으로 최적화
update_vertex( neighbor )
# 메인 루프
initialize()
while start != goal:
compute_shortest_path() # 현재까지의 최단 경로 탐색
move_along_path() # 로봇이 경로를 따라서 이동
if obstacle_detected(): # 새로운 장애물 감지시
update_environment() # 환경 정보를 업데이트
compute_shortest_path() # 변경된 정보를 기반으로 경로 재탐색
# g[node]는 현재까지 탐색된 실제 비용. 즉, 출발점에서 해당 노드까지의 거리
# rhs[node]는 예상 비용. 이론적으로 최적 경로를 따를 경우의 예상 비용
# cost( node , neighbor )는 두 노드 간의 이동비용
구현
import heapq
import numpy as np
INF = float('inf')
class DStar:
def __init__( self , grid , start , goal ):
self.grid = np.array( grid ) # 2D 격자 지도이며, 0은 이동가능 , 1은 장애물
self.start = tuple( start ) # 시작 위치이자 로봇의 현재 위치이다.
self.goal = tuple( goal ) # 목표 위치이다.
self.g = {} # 실제 경로 비용 ( g-value or g(n) )
self.rhs = {} # 예상 경로 비용 ( rhs-value )
self.OPEN = [] # 탐색할 노드들을 저장하는 우선 순위 큐이다. min-heap구조
self.init_grid() # 초기화 작업 수행
# g와 rhs 값을 초기화하고, 목표 지점을 OPEN 큐에 삽입
def init_grid( self ):
rows , cols = self.grid.shape
for r in range( rows ):
for c in range( cols ):
self.g[ (r,c) ] = INF # 모든 노드의 초기 g값은 무한대
self.rhs[ (r,c) ] = INF # 모든 노드의 초기 rhs 값도 무한대
self.rhs[ self.goal ] = 0 # 목표지점의 rhs는 항상 0이다. ( 도착 비용이 없으니... )
# 목표 지점을 open큐에 넣어서 역방향 탐색을 시작한다.
heapq.heappush( self.OPEN , ( self.calculate_key( self.goal) , self.goal ) )
print(f'초기화 함수입니다... 현재 open큐에는 {self.OPEN}이 존재합니다.\n\n')
# 노드의 우선순위 키를 계산하는 함수.
# g(node)는 현재까지의 실제비용
# rhs(node)는 예상되는 최소비용
# heuristic은 현재 노드에서 시작 지점까지의 거리 추정
def calculate_key( self , node ):
key1 = min(self.g[node] , self.rhs[node] ) + self.heuristic( self.start , node )
key2 = min( self.g[ node ] , self.rhs[node] )
return ( key1 , key2 )
# 맨해튼 거리
def heuristic( self , a , b ):
#print(f'\t\t\t휴리스틱 함수... 들어온 a값 , b값은 {a} {b} 이며 반환값은 {abs( a[0]-b[0] )} + {abs( a[1]-b[1] )} = {abs( a[0]-b[0] ) + abs( a[1]-b[1] ) }')
return abs( a[0]-b[0] ) + abs( a[1]-b[1] )
# 현재 노드의 이웃 노드 찾기 ( 상 하 좌 우 ). 대신 장애물이 아닌 노드만 반환하며, 격자 범위도 벗어나면 안됨
def get_neighbors( self , node ):
x , y = node
neighbors = []
for dx , dy in [ (-1,0) , (1,0) , (0,-1) , (0,1) ]:
nx , ny = x+dx , y+dy
if 0 <= nx < self.grid.shape[0] and 0<= ny < self.grid.shape[1]:
if self.grid[ nx , ny ] != 1: # 장애물이 아닌 경우
neighbors.append( (nx,ny) )
return neighbors
# 노드의 비용을 업데이트 하는 함수이다.
# g와 rhs값이 다르면 OPEN큐에 삽입 ( 즉, 재탐색이 필요한 경우 )
# 단, 목표지점은 업데이트 안함
def update_vertex( self ,node ):
print(f'\t\t\t노드{node}의 비용을 업데이트...')
if node != self.goal:
# 이웃한 노드들 중에서 가장 작은 g값 + 1 ( 이동 비용 )
self.rhs[ node ] = min( self.g[neighbor] + 1 for neighbor in self.get_neighbors( node ) )
print(f'\t\t\t\t이웃한 노드들중에서 가장 작은 g(n)+1의 결과는 {self.rhs[node]}')
# open큐에서 기존 노드를 삭제. (중복 방지 ) 즉, 방금 막 탐색 완료한 노드를 삭제하는것
self.OPEN = [ item for item in self.OPEN if item[1] != node ]
heapq.heapify( self.OPEN ) # 큐를 재정렬
print(f'\t\t\t\t최종적인 open: {self.OPEN}')
# 만약 g와 rhs가 다르면 open 큐에 다시 추가 ( 아직 최적 경로가 아닌 경우다.)
if self.g[node] != self.rhs[ node ]:
print(f'\t\t\t\t\tg!=rhs라서 {node}를 다시 open에 추가합니다...')
heapq.heappush( self.OPEN , ( self.calculate_key( node ) , node ) )
# 최단 경로 계산 함수이다.
def compute_shortest_path( self ):
print(f'\t최단 경로 계산함수입니다. 현재 open큐는 {self.OPEN}입니다. ')
# open큐가 비어있지 않고, g != rhs 이거나 open큐의 top_key가 현재 시작 위치의 key값보다 작은 경우
# (8,0)과 ((8,2)과 같이 튜플형식은 값 비교 가능...
while self.OPEN and ( self.OPEN[0][0] < self.calculate_key( self.start ) or self.rhs[self.start] != self.g[self.start] ):
print(f'\t\t최단 경로 계산함수의 while문 입니다... 현재 open큐는 {self.OPEN}입니다. ', end='')
_ , node = heapq.heappop( self.OPEN ) # 우선순위가 가장 높은 노드 선택
print(f'우선순위가 가장 높은 노드는 {node}입니다. 해당 노드의 g(n)은 {self.g[node]}이며 rhs(n)은 {self.rhs[node]}입니다.')
# 현재 실제 비용보다 예상되는 비용의 값이 더 작은 경우에 g값을 업데이트. 즉, 더나은 경로를 발견한 경우
if self.g[node] > self.rhs[node]:
print('\t\t\t더 나은 경로 발견! rhs값을 g에 넣어줌...')
self.g[node] = self.rhs[node]
else: # 경로가 더 이상 최적이 아닌 경우이다. g값을 초기화하고 재계산한다.
print(f'\t\t\t경로가 더이상 최적이 아님... g <= rhs인 경우 ')
self.g[node] = INF
self.update_vertex(node)
for neighbor in self.get_neighbors( node ):
print(f'\t\t\t\t이웃 {neighbor}도 업데이트')
self.update_vertex( neighbor )
# 최적 경로 찾기 함수
# 시작 지점부터 목표지점까지 g값이 가장 작은 방향으로 이동한다.
def find_path( self ):
print(f'find_path함수입니다. 이는 compute_shorest_path함수를 이용해서 최단 경로를 계산하고, 로봇의 이동경로를 반환 합니다.')
self.compute_shortest_path() # 우선 최단 경로 계산
path = []
current = self.start
while current != self.goal:
path.append( current )
neighbors = self.get_neighbors( current )
if not neighbors:
return None # 경로가 없는 경우이다. (막혀있음 )
# g값이 가장 작은 이웃 노드로 이동
current = min( neighbors , key=lambda n : self.g[n] )
path.append( self.goal )
return path
# 장애물을 추가하고 경로 재탐색
# 장애물이 생긴 위치 주변 노드의 비용을 업데이트 함
def add_obstacle( self , pos ):
self.grid[ pos ] = 1
for neighbor in self.get_neighbors(pos):
self.update_vertex(neighbor) # 주변 노드 업데이트
self.compute_shortest_path() # 최단 경로 재계산