multi-agent path finding문제에서 자중 사용되는 최적화 알고리즘이다. 각각의 로봇이 서로 충돌 없이 목표 지점까지 도달하게 경로를 찾는 문제
여기서 충돌이란 롯봇이 간은 간선을 서로 다른 방향으로 지나는 경우와 서로 같은 시간에 서로 같은 위치에 도착하는 경우이다.
CBS의 아이디어
각각의 로봇이 개별로 최단 경로를 찾게한다.
만약 경로들간에 충돌conflict가 발생하며, 해당 충돌을 해결하는 새로운 경로를 찾는다. 이를 위해서 search tree를 생성하고 분할,정복방식으로 문제를 해결한다.
장점
최적해를 보장
충동 햐결 방식의 직관적
필요한 경우에만 탐색을 수행하며 경로 계산 속도가 빠름
단점
경로가 많아지면 탐색 공간이 증가한다.
제약이 늘어날수록 속도가 저하됨
개선된 것
ICBS ( Improved CBS )
MA-CBS ( Meta-Agent CBS )
CBS( Conflict-Based Search )의 구조
고수준 탐색
탐색 트리 ( search tree )를 사용한다. 각 노드는 모든 로봇의 경로 집합과 충돌 목록을 포함한다. root node에서는 모든 로봇들이 충돌을 무시하고 최단 경로로 이동 먼역 충돌이 발생하면, 이를 해결하기 위해서 트리를 분할(branching)을 한다.
저수준 탐색
개별 로봇의 최단 경로를 a*등을 사용해서 계산한다.
만약 충돌 존재시 경로를 수정한다.
CBS( Conflict-Based Search )의 동작 과정
1. 초기 경로 찾기
각 로봇이 충돌을 고려하지 않고 개별적으로 최단 경로를 찾음. ( 주로 a*사용 )
2. 모든 로봇들의 경로를 검사해서 충돌이 존재하는지 확인
충돌이 없으면 최적 경로로 판단. 충돌 존재시 이를 해결해야함
3. 충돌 해결 branching
발견된 충돌을 해결하기 위해서 트리를 두개의 하위 문제로 나눈다. 각 분기branch는 한개의 로봇이 충돌을 피하게 제약을 추가한다.
ex) 만약 2개의 로봇이 같은 지점에서 충돌한다면...
첫번째 분기로는 로봇 A가 특정 시간에 해당 위치를 못지나가게 제한
두번째 분기로는 로봇 B가 해당 위치를 못지나게 제한
이후에 해당 2개의 새로운 노드에 대해서 각각 저수준 탐색을 다시 수행해서 새로운 경로 찾기
즉, 해결방법을 2가지로 나누어서 탐색하고 더 나은 방식을 고르는 것이다.
4. 최적해를 탐색
탐색 트리를 확장해나가며 비용이 가장 낮은 올바를 경로만 찾는다. 주로 prority queue를 사용
import heapq
import matplotlib.pyplot as plt
import numpy as np
from collections import deque
import random
MAP_SIZE = 20
MOVES = [ (0,1) , (0,-1) , (1,0) , (-1,0) ]
# grid에 장애물을 생성하는 함수이다.
# 반환하는 것은 생성된 장애물들의 집합
def generate_obstacles( grid , num_obstacles=30 ):
obstacles = set() # 장애물 좌표를 저장할 집합이다.
while len(obstacles) < num_obstacles:
x,y = random.randint( 0 , MAP_SIZE-1 ) , random.randint( 0 , MAP_SIZE-1 )
if (x,y) not in obstacles:
obstacles.add( (x,y) )
grid[x][y] = 1
return obstacles
class AStar:
def __init__( self , grid , start , goal , constraints ):
self.grid = grid
self.start = start
self.goal = goal
self.constraints = constraints # 로봇이 피해야할 위치들
# 맨해튼 거리
def heuristic( self , a , b ):
return abs( a[0]-b[0] ) + abs( a[1]-b[1] )
def search( self ):
open_set = [] # 우선순위 큐를 초기화
heapq.heappush( open_set , ( 0 , self.start ) ) # 시작 지점을 푸쉬한다.
came_from = {} # 각 노드의 부모를 기록함
g_score = { self.start: 0 }
f_score = { self.start: self.heuristic( self.start , self.goal ) }
# 탐색할것이 없을때 까지 반복
while open_set:
_ , current = heapq.heappop( open_set ) # 가장 낮은 f_score을 가진 노드를 꺼냄 f = g + h
if current == self.goal: # 목표지점이면 경로를 복원해서 리턴
return self.reconstruct_path( came_from , current )
for move in MOVES: # 상하좌우
neighbor = ( current[0]+move[0] , current[1]+move[1] ) # 이동할 노드 탐색
if not ( 0 <= neighbor[0] < MAP_SIZE and 0 <= neighbor[1] < MAP_SIZE ): # 막히면 건너뜀
continue
if neighbor in self.constraints or self.grid[ neighbor[0] ][ neighbor[1] ] == 1: # 제약에 걸리거나, 장애물 좌표면 건너뜀
continue
temp_g_score = g_score[current] + 1 # 현재까지의 g score에서 이웃 노드로 이동하는 비용은 1이다. ( 격자이니 )
if neighbor not in g_score or temp_g_score < g_score[neighbor]: # 이웃노드가 미방문 or 더짧은 경로를 찾은 경우 업데이트
came_from[ neighbor ] = current # 이웃노드의 부모를 현재 노드로 설정( 즉, neighbor가 더 짧은 경로를 발견한것. 그리고 이어져야함 )
g_score[ neighbor ] = temp_g_score # 이웃노드의 g_score를 갱신
f_score[ neighbor ] = temp_g_score + self.heuristic( neighbor , self.goal ) # 이웃의 f_score도 갱신
heapq.heappush( open_set , ( f_score[neighbor] , neighbor ) )
return None # 모든 경로 탐색에도 목표에 도달 못한 경우
def reconstruct_path( self , came_from , current ):
path = []
while current in came_from:
path.append( current )
current = came_from[current] # 부모는 came_from에 없음
path.append(self.start)
path.reverse()
return path
# CBS 알고리즘으로 모든 로봇의 충돌없는 경로를 찾는다
class CBS:
def __init__( self , grid , starts , goals ):
self.grid = grid
self.starts = starts
self.goals = goals
def detect_conflict( self , paths ):
# 모든 경로들 중에서 가장 긴 경로의 길이를 구한다.
max_time = max( len(path) for path in paths )
# 각 시간 단계마다 모든 로봇의 위치를 확인
for t in range( max_time ):
positions = {} # 시간 단계 t에서 각 위치에 있는 로봇 번호를 저장할 딕셔너리
for i , path in enumerate( paths ):
# 각 로봇의 위치: 경로의 길이보다 시간이 크면 마지막 위치에서 대기하는 것으로 가정
pos = path[ min( t , len(path) -1 ) ]
if pos in positions: # 이미 다른 로봇이 해당위치에 존재시 충돌이 발생함.
print(f'\n충돌 발생 i is {i} , pos is {pos} , t is {t} , positions[pos] is {positions[pos]}')
return positions[pos] , i , pos , t # 충돌 정보 반환
positions[pos] = i # 현재 로봇의 위치 기록
return None # 시간 전체 확인해도 충돌 없다면 none을 반환
# 리턴은 경로들의 리스트와 충돌 해결 이유들의 리스트
def search(self):
root = { 'paths':[] , 'constraints':[] , 'reasons':[] }
print(f'초기의 root {root}')
# 각 로봇에 대해서 초기 경로(충돌 제약 조건 없이 )를 a star로 찾음
for i in range( len(self.starts) ):
astar = AStar( self.grid , self.starts[i] , self.goals[i] , set() )
path = astar.search()
if path is None:
return None , []
root['paths'].append(path)
print(f'astar를 수행하고 난 뒤의 root\n\n0번 {root['paths'][0]}\n\n1번 {root['paths'][1]}\n\n2번 {root['paths'][2]}\n\n3번 {root['paths'][3]}\n')
queue = deque( [root] ) # CBS 탐색을 위한 큐 생성 ( FIFO )
while queue:
node = queue.popleft()
conflict = self.detect_conflict( node['paths'] )
if not conflict:
return node['paths'] , node['reasons']
agent1 , agent2 , pos , time = conflict
print(f'agent1 is {agent1} agent2 is {agent2} pos is {pos} time is {time}')
for agent in [agent1 , agent2]:
new_constraints = node['constraints'][:] # 충돌시 기존 제약 조건을 복사해서 새로운 제약 조건 목록 생성
print(f'before new constraints {new_constraints}')
new_constraints.append( (agent,pos,time) ) # 현재 충돌 위치와 시간을 해당 로봇에 대한 제약 조건으로 추가
print(f'after new constraints {new_constraints}')
new_paths = node['paths'][:] # 기존의 경로들을 복사
# 해당 로봇의 제약 조건만 추려내어 set으로 전달
# 여기에서는 제약조건 튜플에서 로봇 번호가 일치하는 것의 위치만 전달함
astar = AStar( self.grid , self.starts[agent] , self.goals[agent] , { c[1] for c in new_constraints if c[0]==agent } )
new_path = astar.search()
if new_path is None:
continue
new_paths[agent] = new_path
reason=f"robot {agent} rerouted at {pos} due to conflict at time {time}"
queue.append({'paths': new_paths, 'constraints': new_constraints, 'reasons': node['reasons'] + [reason]})
return None, []
def visualize_paths_separately(paths, starts, goals, obstacles, reasons):
"""
paths: 각 로봇의 경로 리스트 (좌표 튜플의 리스트)
starts: 시작 위치 리스트
goals: 목표 위치 리스트
obstacles: 장애물 좌표들이 담긴 set
reasons: CBS 알고리즘에서 발생한 충돌 해결 사유 리스트
"""
# 각 로봇별로 별도의 그림 생성
for i, path in enumerate(paths):
plt.figure(figsize=(6, 6)) # 그림 크기 설정
# 경로를 numpy 배열로 변환 (인덱싱 용이)
path = np.array(path)
# x축은 열 인덱스, y축은 행 인덱스 (하지만 시각화를 위해 y축을 반전)
plt.plot(path[:, 1], MAP_SIZE - path[:, 0] - 1, marker='o', label=f'Robot {i}')
# 시작 지점을 검은색 사각형으로 표시
plt.scatter(starts[i][1], MAP_SIZE - starts[i][0] - 1, marker='s', color='black', s=100, label='Start')
# 목표 지점을 빨간색 별표로 표시
plt.scatter(goals[i][1], MAP_SIZE - goals[i][0] - 1, marker='*', color='red', s=150, label='Goal')
# 경로상의 각 점에 시간 스탬프(순서)를 텍스트로 표시
for j, (x, y) in enumerate(path):
plt.text(y, MAP_SIZE - x - 1, str(j), fontsize=9, ha='right', color='blue')
# 장애물 위치들을 회색 'X' 마커로 표시
# label은 첫 장애물에 대해서만 추가 (범례 중복 방지)
for obs in obstacles:
plt.scatter(obs[1], MAP_SIZE - obs[0] - 1, marker='X', color='gray', s=100,
label='Obstacle' if obs == list(obstacles)[0] else "")
plt.legend() # 범례 표시
plt.grid() # 그리드 표시
plt.title(f'Robot {i} Path') # 제목 설정
plt.show() # 그림 출력
# 모든 로봇의 충돌 해결 사유 출력
print("Conflict reasons:")
for reason in reasons:
print(reason)
if __name__ == "__main__":
# 맵을 0으로 초기화합니다. (0: 이동 가능 영역, 1: 장애물)
grid = [ [0] * MAP_SIZE for _ in range(MAP_SIZE) ]
# 격자에 무작위로 장애물을 생성하고, 장애물 좌표들을 obstacles에 저장
obstacles = generate_obstacles(grid)
# 여러 로봇의 시작점과 목표점을 미리 정의합니다.
starts = [(0, 0), (19, 19), (0, 19), (19, 0)]
goals = [(19, 19), (0, 0), (19, 0), (0, 19)]
# CBS 알고리즘 인스턴스 생성 (여러 로봇의 경로 계획)
cbs = CBS(grid, starts, goals)
# CBS 알고리즘을 실행하여 충돌 없는 경로를 탐색
solution, reasons = cbs.search()
# 경로를 찾은 경우
if solution:
# 각 로봇의 경로를 출력
for i, path in enumerate(solution):
print(f"Robot {i}: {path}")
# 각 로봇의 경로를 시각화 (각 로봇 별 그림 출력)
visualize_paths_separately(solution, starts, goals, obstacles, reasons)
else:
# 경로를 찾지 못한 경우 메시지 출력
print("No solution found.")