알고리즘
CBS( Conflict-Based Search )
정지홍
2025. 2. 10. 13:30
CBS( Conflict-Based Search )
- multi-agent path finding문제에서 자중 사용되는 최적화 알고리즘이다.
각각의 로봇이 서로 충돌 없이 목표 지점까지 도달하게 경로를 찾는 문제- 여기서 충돌이란 롯봇이 간은 간선을 서로 다른 방향으로 지나는 경우와 서로 같은 시간에 서로 같은 위치에 도착하는 경우이다.
- CBS의 아이디어
- 각각의 로봇이 개별로 최단 경로를 찾게한다.
- 만약 경로들간에 충돌conflict가 발생하며, 해당 충돌을 해결하는 새로운 경로를 찾는다.
이를 위해서 search tree를 생성하고 분할,정복방식으로 문제를 해결한다.
- 장점
- 최적해를 보장
- 충동 햐결 방식의 직관적
- 필요한 경우에만 탐색을 수행하며 경로 계산 속도가 빠름
- 단점
- 경로가 많아지면 탐색 공간이 증가한다.
- 제약이 늘어날수록 속도가 저하됨
- 개선된 것
- ICBS ( Improved CBS )
- MA-CBS ( Meta-Agent CBS )
CBS( Conflict-Based Search )의 구조
- 고수준 탐색
- 탐색 트리 ( search tree )를 사용한다.
각 노드는 모든 로봇의 경로 집합과 충돌 목록을 포함한다.
root node에서는 모든 로봇들이 충돌을 무시하고 최단 경로로 이동
먼역 충돌이 발생하면, 이를 해결하기 위해서 트리를 분할(branching)을 한다.
- 탐색 트리 ( search tree )를 사용한다.
- 저수준 탐색
- 개별 로봇의 최단 경로를 a*등을 사용해서 계산한다.
- 만약 충돌 존재시 경로를 수정한다.
CBS( Conflict-Based Search )의 동작 과정
- 1. 초기 경로 찾기
- 각 로봇이 충돌을 고려하지 않고 개별적으로 최단 경로를 찾음. ( 주로 a*사용 )
- 2. 모든 로봇들의 경로를 검사해서 충돌이 존재하는지 확인
- 충돌이 없으면 최적 경로로 판단. 충돌 존재시 이를 해결해야함
- 3. 충돌 해결 branching
- 발견된 충돌을 해결하기 위해서 트리를 두개의 하위 문제로 나눈다.
각 분기branch는 한개의 로봇이 충돌을 피하게 제약을 추가한다. - ex) 만약 2개의 로봇이 같은 지점에서 충돌한다면...
- 첫번째 분기로는 로봇 A가 특정 시간에 해당 위치를 못지나가게 제한
- 두번째 분기로는 로봇 B가 해당 위치를 못지나게 제한
- 이후에 해당 2개의 새로운 노드에 대해서 각각 저수준 탐색을 다시 수행해서 새로운 경로 찾기
- 즉, 해결방법을 2가지로 나누어서 탐색하고 더 나은 방식을 고르는 것이다.
- 발견된 충돌을 해결하기 위해서 트리를 두개의 하위 문제로 나눈다.
- 4. 최적해를 탐색
- 탐색 트리를 확장해나가며 비용이 가장 낮은 올바를 경로만 찾는다.
주로 prority queue를 사용
- 탐색 트리를 확장해나가며 비용이 가장 낮은 올바를 경로만 찾는다.
import heapq
import matplotlib.pyplot as plt
import numpy as np
from collections import deque
import random
MAP_SIZE = 20
MOVES = [ (0,1) , (0,-1) , (1,0) , (-1,0) ]
# grid에 장애물을 생성하는 함수이다.
# 반환하는 것은 생성된 장애물들의 집합
def generate_obstacles( grid , num_obstacles=30 ):
obstacles = set() # 장애물 좌표를 저장할 집합이다.
while len(obstacles) < num_obstacles:
x,y = random.randint( 0 , MAP_SIZE-1 ) , random.randint( 0 , MAP_SIZE-1 )
if (x,y) not in obstacles:
obstacles.add( (x,y) )
grid[x][y] = 1
return obstacles
class AStar:
def __init__( self , grid , start , goal , constraints ):
self.grid = grid
self.start = start
self.goal = goal
self.constraints = constraints # 로봇이 피해야할 위치들
# 맨해튼 거리
def heuristic( self , a , b ):
return abs( a[0]-b[0] ) + abs( a[1]-b[1] )
def search( self ):
open_set = [] # 우선순위 큐를 초기화
heapq.heappush( open_set , ( 0 , self.start ) ) # 시작 지점을 푸쉬한다.
came_from = {} # 각 노드의 부모를 기록함
g_score = { self.start: 0 }
f_score = { self.start: self.heuristic( self.start , self.goal ) }
# 탐색할것이 없을때 까지 반복
while open_set:
_ , current = heapq.heappop( open_set ) # 가장 낮은 f_score을 가진 노드를 꺼냄 f = g + h
if current == self.goal: # 목표지점이면 경로를 복원해서 리턴
return self.reconstruct_path( came_from , current )
for move in MOVES: # 상하좌우
neighbor = ( current[0]+move[0] , current[1]+move[1] ) # 이동할 노드 탐색
if not ( 0 <= neighbor[0] < MAP_SIZE and 0 <= neighbor[1] < MAP_SIZE ): # 막히면 건너뜀
continue
if neighbor in self.constraints or self.grid[ neighbor[0] ][ neighbor[1] ] == 1: # 제약에 걸리거나, 장애물 좌표면 건너뜀
continue
temp_g_score = g_score[current] + 1 # 현재까지의 g score에서 이웃 노드로 이동하는 비용은 1이다. ( 격자이니 )
if neighbor not in g_score or temp_g_score < g_score[neighbor]: # 이웃노드가 미방문 or 더짧은 경로를 찾은 경우 업데이트
came_from[ neighbor ] = current # 이웃노드의 부모를 현재 노드로 설정( 즉, neighbor가 더 짧은 경로를 발견한것. 그리고 이어져야함 )
g_score[ neighbor ] = temp_g_score # 이웃노드의 g_score를 갱신
f_score[ neighbor ] = temp_g_score + self.heuristic( neighbor , self.goal ) # 이웃의 f_score도 갱신
heapq.heappush( open_set , ( f_score[neighbor] , neighbor ) )
return None # 모든 경로 탐색에도 목표에 도달 못한 경우
def reconstruct_path( self , came_from , current ):
path = []
while current in came_from:
path.append( current )
current = came_from[current] # 부모는 came_from에 없음
path.append(self.start)
path.reverse()
return path
# CBS 알고리즘으로 모든 로봇의 충돌없는 경로를 찾는다
class CBS:
def __init__( self , grid , starts , goals ):
self.grid = grid
self.starts = starts
self.goals = goals
def detect_conflict( self , paths ):
# 모든 경로들 중에서 가장 긴 경로의 길이를 구한다.
max_time = max( len(path) for path in paths )
# 각 시간 단계마다 모든 로봇의 위치를 확인
for t in range( max_time ):
positions = {} # 시간 단계 t에서 각 위치에 있는 로봇 번호를 저장할 딕셔너리
for i , path in enumerate( paths ):
# 각 로봇의 위치: 경로의 길이보다 시간이 크면 마지막 위치에서 대기하는 것으로 가정
pos = path[ min( t , len(path) -1 ) ]
if pos in positions: # 이미 다른 로봇이 해당위치에 존재시 충돌이 발생함.
print(f'\n충돌 발생 i is {i} , pos is {pos} , t is {t} , positions[pos] is {positions[pos]}')
return positions[pos] , i , pos , t # 충돌 정보 반환
positions[pos] = i # 현재 로봇의 위치 기록
return None # 시간 전체 확인해도 충돌 없다면 none을 반환
# 리턴은 경로들의 리스트와 충돌 해결 이유들의 리스트
def search(self):
root = { 'paths':[] , 'constraints':[] , 'reasons':[] }
print(f'초기의 root {root}')
# 각 로봇에 대해서 초기 경로(충돌 제약 조건 없이 )를 a star로 찾음
for i in range( len(self.starts) ):
astar = AStar( self.grid , self.starts[i] , self.goals[i] , set() )
path = astar.search()
if path is None:
return None , []
root['paths'].append(path)
print(f'astar를 수행하고 난 뒤의 root\n\n0번 {root['paths'][0]}\n\n1번 {root['paths'][1]}\n\n2번 {root['paths'][2]}\n\n3번 {root['paths'][3]}\n')
queue = deque( [root] ) # CBS 탐색을 위한 큐 생성 ( FIFO )
while queue:
node = queue.popleft()
conflict = self.detect_conflict( node['paths'] )
if not conflict:
return node['paths'] , node['reasons']
agent1 , agent2 , pos , time = conflict
print(f'agent1 is {agent1} agent2 is {agent2} pos is {pos} time is {time}')
for agent in [agent1 , agent2]:
new_constraints = node['constraints'][:] # 충돌시 기존 제약 조건을 복사해서 새로운 제약 조건 목록 생성
print(f'before new constraints {new_constraints}')
new_constraints.append( (agent,pos,time) ) # 현재 충돌 위치와 시간을 해당 로봇에 대한 제약 조건으로 추가
print(f'after new constraints {new_constraints}')
new_paths = node['paths'][:] # 기존의 경로들을 복사
# 해당 로봇의 제약 조건만 추려내어 set으로 전달
# 여기에서는 제약조건 튜플에서 로봇 번호가 일치하는 것의 위치만 전달함
astar = AStar( self.grid , self.starts[agent] , self.goals[agent] , { c[1] for c in new_constraints if c[0]==agent } )
new_path = astar.search()
if new_path is None:
continue
new_paths[agent] = new_path
reason=f"robot {agent} rerouted at {pos} due to conflict at time {time}"
queue.append({'paths': new_paths, 'constraints': new_constraints, 'reasons': node['reasons'] + [reason]})
return None, []
def visualize_paths_separately(paths, starts, goals, obstacles, reasons):
"""
paths: 각 로봇의 경로 리스트 (좌표 튜플의 리스트)
starts: 시작 위치 리스트
goals: 목표 위치 리스트
obstacles: 장애물 좌표들이 담긴 set
reasons: CBS 알고리즘에서 발생한 충돌 해결 사유 리스트
"""
# 각 로봇별로 별도의 그림 생성
for i, path in enumerate(paths):
plt.figure(figsize=(6, 6)) # 그림 크기 설정
# 경로를 numpy 배열로 변환 (인덱싱 용이)
path = np.array(path)
# x축은 열 인덱스, y축은 행 인덱스 (하지만 시각화를 위해 y축을 반전)
plt.plot(path[:, 1], MAP_SIZE - path[:, 0] - 1, marker='o', label=f'Robot {i}')
# 시작 지점을 검은색 사각형으로 표시
plt.scatter(starts[i][1], MAP_SIZE - starts[i][0] - 1, marker='s', color='black', s=100, label='Start')
# 목표 지점을 빨간색 별표로 표시
plt.scatter(goals[i][1], MAP_SIZE - goals[i][0] - 1, marker='*', color='red', s=150, label='Goal')
# 경로상의 각 점에 시간 스탬프(순서)를 텍스트로 표시
for j, (x, y) in enumerate(path):
plt.text(y, MAP_SIZE - x - 1, str(j), fontsize=9, ha='right', color='blue')
# 장애물 위치들을 회색 'X' 마커로 표시
# label은 첫 장애물에 대해서만 추가 (범례 중복 방지)
for obs in obstacles:
plt.scatter(obs[1], MAP_SIZE - obs[0] - 1, marker='X', color='gray', s=100,
label='Obstacle' if obs == list(obstacles)[0] else "")
plt.legend() # 범례 표시
plt.grid() # 그리드 표시
plt.title(f'Robot {i} Path') # 제목 설정
plt.show() # 그림 출력
# 모든 로봇의 충돌 해결 사유 출력
print("Conflict reasons:")
for reason in reasons:
print(reason)
if __name__ == "__main__":
# 맵을 0으로 초기화합니다. (0: 이동 가능 영역, 1: 장애물)
grid = [ [0] * MAP_SIZE for _ in range(MAP_SIZE) ]
# 격자에 무작위로 장애물을 생성하고, 장애물 좌표들을 obstacles에 저장
obstacles = generate_obstacles(grid)
# 여러 로봇의 시작점과 목표점을 미리 정의합니다.
starts = [(0, 0), (19, 19), (0, 19), (19, 0)]
goals = [(19, 19), (0, 0), (19, 0), (0, 19)]
# CBS 알고리즘 인스턴스 생성 (여러 로봇의 경로 계획)
cbs = CBS(grid, starts, goals)
# CBS 알고리즘을 실행하여 충돌 없는 경로를 탐색
solution, reasons = cbs.search()
# 경로를 찾은 경우
if solution:
# 각 로봇의 경로를 출력
for i, path in enumerate(solution):
print(f"Robot {i}: {path}")
# 각 로봇의 경로를 시각화 (각 로봇 별 그림 출력)
visualize_paths_separately(solution, starts, goals, obstacles, reasons)
else:
# 경로를 찾지 못한 경우 메시지 출력
print("No solution found.")

