알고리즘

CBS( Conflict-Based Search )

정지홍 2025. 2. 10. 13:30

CBS( Conflict-Based Search )

  • multi-agent path finding문제에서 자중 사용되는 최적화 알고리즘이다. 
    각각의 로봇이 서로 충돌 없이 목표 지점까지 도달하게 경로를 찾는 문제
    • 여기서 충돌이란 롯봇이 간은 간선을 서로 다른 방향으로 지나는 경우와 서로 같은 시간에 서로 같은 위치에 도착하는 경우이다.
  • CBS의 아이디어
    • 각각의 로봇이 개별로 최단 경로를 찾게한다.
    • 만약 경로들간에 충돌conflict가 발생하며, 해당 충돌을 해결하는 새로운 경로를 찾는다.
      이를 위해서 search tree를 생성하고 분할,정복방식으로 문제를 해결한다.
  • 장점
    • 최적해를 보장
    • 충동 햐결 방식의 직관적
    • 필요한 경우에만 탐색을 수행하며 경로 계산 속도가 빠름
  • 단점
    • 경로가 많아지면 탐색 공간이 증가한다.
    • 제약이 늘어날수록 속도가 저하됨
  • 개선된 것
    • ICBS ( Improved CBS )
    • MA-CBS ( Meta-Agent CBS )

 

 

CBS( Conflict-Based Search )의 구조

  • 고수준 탐색
    • 탐색 트리 ( search tree )를 사용한다. 
      각 노드는 모든 로봇의 경로 집합과 충돌 목록을 포함한다.
      root node에서는 모든 로봇들이 충돌을 무시하고 최단 경로로 이동
      먼역 충돌이 발생하면, 이를 해결하기 위해서 트리를 분할(branching)을 한다.
  • 저수준 탐색
    • 개별 로봇의 최단 경로를 a*등을 사용해서 계산한다.
    • 만약 충돌 존재시 경로를 수정한다.

 

 

 

CBS( Conflict-Based Search )의 동작 과정

  • 1. 초기 경로 찾기
    • 각 로봇이 충돌을 고려하지 않고 개별적으로 최단 경로를 찾음. ( 주로 a*사용 )
  • 2. 모든 로봇들의 경로를 검사해서 충돌이 존재하는지 확인
    • 충돌이 없으면 최적 경로로 판단. 충돌 존재시 이를 해결해야함
  • 3. 충돌 해결 branching
    • 발견된 충돌을 해결하기 위해서 트리를 두개의 하위 문제로 나눈다. 
      각 분기branch는 한개의 로봇이 충돌을 피하게 제약을 추가한다.
    • ex) 만약 2개의 로봇이 같은 지점에서 충돌한다면...
      • 첫번째 분기로는 로봇 A가 특정 시간에 해당 위치를 못지나가게 제한
      • 두번째 분기로는 로봇 B가 해당 위치를 못지나게 제한
      • 이후에 해당 2개의 새로운 노드에 대해서 각각 저수준 탐색을 다시 수행해서 새로운 경로 찾기
        • 즉, 해결방법을 2가지로 나누어서 탐색하고 더 나은 방식을 고르는 것이다.
  • 4. 최적해를 탐색
    • 탐색 트리를 확장해나가며 비용이 가장 낮은 올바를 경로만 찾는다.
      주로 prority queue를 사용

 

import heapq                       
import matplotlib.pyplot as plt     
import numpy as np              
from collections import deque         
import random                         

MAP_SIZE = 20
MOVES = [ (0,1) , (0,-1) , (1,0) , (-1,0) ]

# grid에 장애물을 생성하는 함수이다.
# 반환하는 것은 생성된 장애물들의 집합
def generate_obstacles( grid , num_obstacles=30 ):
    obstacles = set() # 장애물 좌표를 저장할 집합이다.
    while len(obstacles) < num_obstacles:
        x,y = random.randint( 0 , MAP_SIZE-1 ) , random.randint( 0 , MAP_SIZE-1 )
        if (x,y) not in obstacles:
            obstacles.add( (x,y) )
            grid[x][y] = 1
    return obstacles
class AStar:
    def __init__( self , grid , start , goal , constraints ):
        self.grid = grid
        self.start = start
        self.goal = goal
        self.constraints = constraints # 로봇이 피해야할 위치들

    # 맨해튼 거리
    def heuristic( self , a , b ):
        return abs( a[0]-b[0] ) + abs( a[1]-b[1] )

    def search( self ):
        open_set = [] # 우선순위 큐를 초기화
        heapq.heappush( open_set , ( 0 , self.start ) ) # 시작 지점을 푸쉬한다.
        came_from = {} # 각 노드의 부모를 기록함 
        g_score = { self.start: 0 }
        f_score = { self.start: self.heuristic( self.start , self.goal ) }
        # 탐색할것이 없을때 까지 반복
        while open_set:
            _ , current = heapq.heappop( open_set ) # 가장 낮은 f_score을 가진 노드를 꺼냄 f = g + h
            if current == self.goal: # 목표지점이면 경로를 복원해서 리턴
                return self.reconstruct_path( came_from , current )
            for move in MOVES: # 상하좌우
                neighbor = ( current[0]+move[0] , current[1]+move[1] ) # 이동할 노드 탐색
                if not ( 0 <= neighbor[0] < MAP_SIZE and 0 <= neighbor[1] < MAP_SIZE ): # 막히면 건너뜀
                    continue
                if neighbor in self.constraints or self.grid[ neighbor[0] ][ neighbor[1] ] == 1: # 제약에 걸리거나, 장애물 좌표면 건너뜀
                    continue
                temp_g_score = g_score[current] + 1 # 현재까지의 g score에서 이웃 노드로 이동하는 비용은 1이다. ( 격자이니 )
                if neighbor not in g_score or temp_g_score < g_score[neighbor]: # 이웃노드가 미방문 or  더짧은 경로를 찾은 경우 업데이트
                    came_from[ neighbor ] = current # 이웃노드의 부모를 현재 노드로 설정( 즉, neighbor가 더 짧은 경로를 발견한것. 그리고 이어져야함 )
                    g_score[ neighbor ] = temp_g_score # 이웃노드의 g_score를 갱신
                    f_score[ neighbor ] = temp_g_score + self.heuristic( neighbor , self.goal ) # 이웃의 f_score도 갱신
                    heapq.heappush( open_set , ( f_score[neighbor] , neighbor ) )
        return None # 모든 경로 탐색에도 목표에 도달 못한 경우 

    def reconstruct_path( self , came_from , current ):
        path = []
        while current in came_from:
            path.append( current )
            current = came_from[current] # 부모는 came_from에 없음 
        path.append(self.start)
        path.reverse()
        return path
# CBS 알고리즘으로 모든 로봇의 충돌없는 경로를 찾는다

class CBS:
    def __init__( self , grid , starts , goals ):
        self.grid = grid
        self.starts = starts
        self.goals = goals

    def detect_conflict( self , paths ):
        # 모든 경로들 중에서 가장 긴 경로의 길이를 구한다.
        max_time = max( len(path) for path in paths )
        # 각 시간 단계마다 모든 로봇의 위치를 확인
        for t in range( max_time ):
            positions = {} # 시간 단계 t에서 각 위치에 있는 로봇 번호를 저장할 딕셔너리
            for i , path in enumerate( paths ):
                # 각 로봇의 위치: 경로의 길이보다 시간이 크면 마지막 위치에서 대기하는 것으로 가정
                pos = path[ min( t , len(path) -1 ) ]
                if pos in positions: # 이미 다른 로봇이 해당위치에 존재시 충돌이 발생함.
                    print(f'\n충돌 발생 i is {i}  ,  pos is {pos} ,   t is {t}  ,  positions[pos] is {positions[pos]}')
                    return positions[pos] , i , pos , t # 충돌 정보 반환
                positions[pos] = i # 현재 로봇의 위치 기록
        return None # 시간 전체 확인해도 충돌 없다면 none을 반환

    # 리턴은 경로들의 리스트와 충돌 해결 이유들의 리스트
    def search(self):
        root = { 'paths':[] , 'constraints':[] , 'reasons':[] }
        print(f'초기의 root {root}')
        # 각 로봇에 대해서 초기 경로(충돌 제약 조건 없이 )를 a star로 찾음
        for i in range( len(self.starts) ):
            astar = AStar( self.grid , self.starts[i] , self.goals[i] , set() )
            path = astar.search()
            if path is None:
                return None , []
            root['paths'].append(path)
        print(f'astar를 수행하고 난 뒤의 root\n\n0번 {root['paths'][0]}\n\n1번 {root['paths'][1]}\n\n2번 {root['paths'][2]}\n\n3번 {root['paths'][3]}\n')
        queue = deque( [root] ) # CBS 탐색을 위한 큐 생성 ( FIFO )
        while queue:
            node = queue.popleft()
            conflict = self.detect_conflict( node['paths'] )
            if not conflict:
                return node['paths'] , node['reasons']
            agent1 , agent2 , pos , time = conflict
            print(f'agent1 is {agent1}   agent2 is {agent2}   pos is {pos}   time is {time}')
            for agent in [agent1 , agent2]:
                new_constraints = node['constraints'][:] # 충돌시 기존 제약 조건을 복사해서 새로운 제약 조건 목록 생성
                print(f'before new constraints {new_constraints}')
                new_constraints.append( (agent,pos,time) ) # 현재 충돌 위치와 시간을 해당 로봇에 대한 제약 조건으로 추가
                print(f'after new constraints {new_constraints}')
                new_paths = node['paths'][:] # 기존의 경로들을 복사
                # 해당 로봇의 제약 조건만 추려내어 set으로 전달
                # 여기에서는 제약조건 튜플에서 로봇 번호가 일치하는 것의 위치만 전달함 
                astar = AStar( self.grid , self.starts[agent] , self.goals[agent] , { c[1] for c in new_constraints if c[0]==agent } )
                new_path = astar.search()
                if new_path is None:
                    continue
                new_paths[agent] = new_path
                reason=f"robot {agent} rerouted at {pos} due to conflict at time {time}"
                queue.append({'paths': new_paths, 'constraints': new_constraints, 'reasons': node['reasons'] + [reason]})
        return None, []
def visualize_paths_separately(paths, starts, goals, obstacles, reasons):
    """
    paths: 각 로봇의 경로 리스트 (좌표 튜플의 리스트)
    starts: 시작 위치 리스트
    goals: 목표 위치 리스트
    obstacles: 장애물 좌표들이 담긴 set
    reasons: CBS 알고리즘에서 발생한 충돌 해결 사유 리스트
    """
    # 각 로봇별로 별도의 그림 생성
    for i, path in enumerate(paths):
        plt.figure(figsize=(6, 6))  # 그림 크기 설정
        
        # 경로를 numpy 배열로 변환 (인덱싱 용이)
        path = np.array(path)
        # x축은 열 인덱스, y축은 행 인덱스 (하지만 시각화를 위해 y축을 반전)
        plt.plot(path[:, 1], MAP_SIZE - path[:, 0] - 1, marker='o', label=f'Robot {i}')
        
        # 시작 지점을 검은색 사각형으로 표시
        plt.scatter(starts[i][1], MAP_SIZE - starts[i][0] - 1, marker='s', color='black', s=100, label='Start')
        # 목표 지점을 빨간색 별표로 표시
        plt.scatter(goals[i][1], MAP_SIZE - goals[i][0] - 1, marker='*', color='red', s=150, label='Goal')
        
        # 경로상의 각 점에 시간 스탬프(순서)를 텍스트로 표시
        for j, (x, y) in enumerate(path):
            plt.text(y, MAP_SIZE - x - 1, str(j), fontsize=9, ha='right', color='blue')
        
        # 장애물 위치들을 회색 'X' 마커로 표시
        # label은 첫 장애물에 대해서만 추가 (범례 중복 방지)
        for obs in obstacles:
            plt.scatter(obs[1], MAP_SIZE - obs[0] - 1, marker='X', color='gray', s=100,
                        label='Obstacle' if obs == list(obstacles)[0] else "")
        
        plt.legend()  # 범례 표시
        plt.grid()    # 그리드 표시
        plt.title(f'Robot {i} Path')  # 제목 설정
        plt.show()   # 그림 출력
    
    # 모든 로봇의 충돌 해결 사유 출력
    print("Conflict reasons:")
    for reason in reasons:
        print(reason)
if __name__ == "__main__":
    # 맵을 0으로 초기화합니다. (0: 이동 가능 영역, 1: 장애물)
    grid = [ [0] * MAP_SIZE for _ in range(MAP_SIZE) ]
    
    # 격자에 무작위로 장애물을 생성하고, 장애물 좌표들을 obstacles에 저장
    obstacles = generate_obstacles(grid)
    
    # 여러 로봇의 시작점과 목표점을 미리 정의합니다.
    starts = [(0, 0), (19, 19), (0, 19), (19, 0)]
    goals = [(19, 19), (0, 0), (19, 0), (0, 19)]
    
    # CBS 알고리즘 인스턴스 생성 (여러 로봇의 경로 계획)
    cbs = CBS(grid, starts, goals)
    
    # CBS 알고리즘을 실행하여 충돌 없는 경로를 탐색
    solution, reasons = cbs.search()
    
    # 경로를 찾은 경우
    if solution:
        # 각 로봇의 경로를 출력
        for i, path in enumerate(solution):
            print(f"Robot {i}: {path}")
        # 각 로봇의 경로를 시각화 (각 로봇 별 그림 출력)
        visualize_paths_separately(solution, starts, goals, obstacles, reasons)
    else:
        # 경로를 찾지 못한 경우 메시지 출력
        print("No solution found.")

'알고리즘' 카테고리의 다른 글

GTSP , TSP ( Traveling Salesman Problem )  (0) 2025.02.14
Artificial Potential Field (APF)  (0) 2025.02.11
PRM ( probabilistic Roadmap )  (0) 2025.02.06
RRT (Rapidly-Exploring Random Tree)  (0) 2025.02.03
d star 알고리즘  (0) 2025.02.02