알고리즘

Lazy PRM

정지홍 2025. 2. 19. 13:58

Lazy PRM

  • PRM의 변형이며, 충돌 검사를 최소화해서 효율성을 높인다.
    • 기존의 PRM은 sampling단계에서 node를 generate하고, 연결가능한 path를 만든 다음에 collision-check를 한다. 하지만 Lazy PRM은 이 과정에서 collision-check를 지연(lazy)시켜서 성능을 향상시킨다.
  • 핵심 기본 개념
    • 1. 기존의 PRM에서는 sampling된 node들간의 연결힐때, 모든 edge에 대해서 collision-check를 하며, 유효한 path를 저장한다. 하지만 이 방식을 collision-check가 매우 많아지면서 연산량이 증가하는 문제가 존재한다.
      그래서 Lazy PRM은 이를 개선하기 위해서 collision-check를 lazy시킨다.
      • 즉, 다시말하면, path를 찾을 때까지는 edge를 무조건 유효하다고 가정하며, 실제로 path를 추출하는 순간에만 collision-check를 수행한다. 만약에 collision이 발생한다면, 해당 edge를 제거하고 다시 경로를 탐색하는 방식이다. 
        • ( 게으르게 최대한으로 collision-check를 미루다가, 마지막에 edge연결해야할때 충돌하는지 check하는것. )
  • 장점
    • 충돌 검사를 늦추면서도 성능을 향상시킬수있음
    • sampling된 노드가 많은 경우에는 , PRM보다 더 빠르게 경로 찾는것이 가능
    • 대규모 환경에서 연산량을 효과적으로 줄이는 것이 가능하다.
  • 단점
    • 처음 찾은 경로가 충돌이 많다면 반복적으로 재탐색 해야하니, 이러한 경우에는 성능이 떨어진다.
    • 충돌이 많은 환경에서는 기존의 PRM보다 더 많은 탐색을 할 가능성이 존재한다.

 

 

 

Lazy PRM의 작동 방식

  • 1. sampling 단계
    • 로봇이 이동할 수 있는 workspace에서 무작위로 node를 generate한다.
    • 샘플링 방법
      • 1. uniform sampling 균일 샘플링
        • 공간 내에서 무작위로 고르게 분포된 점을 선택하는 방식. ( 단, 장애물이 많으면 쓸데없는 sample이 많아질수있음. )
      • 2. Gaussian sampling 가우시안 샘플링
        • 장애물의 가장자리 근처에 sample을 많이 배치하는 방식. ( 장애물 회피가 중요한 환경에서 효과적 )
      • 3. bridge sampling 다리 샘플링
        • 장애물과 장애물 사이의 빈 공간에 sample을 생성하는 방식. ( 장애물 사이를 연결하는 경로를 만들기가 좋음 )
      • =====> lazy prm도 일반적인 prm처럼, 여러한 방법중에 하나를 선택해서 sample을 생성한다. 보통 uniform sampling 을 많이 사용하지만, 환경에 따라서 최적의 sampling 방법을 선택해야한다. 
  • 2. graph construction 
    • sampling된 node를 graph의 정점으로 사용하여, 가까운 node 끼리 연결한다.
    • 연결 기준은 가까운 node끼리 연결해야한다.
      • 1. k nearest neighbors ( K NN )
        • 각 노드에서 가장 가까운 k개의 노드를 찾는다. ( k개를 실험적으로 설정해야한다. 너무 크면 계산량이 많아지며, 작으면 그래프의 연결성이 약해짐. )
      • 2. radius-based connection
        • 노드 간 거리가 특정 반경 r 이내일 때만 연결하는 방식. ( 이것도 적절한 r값을 설정해야 효율적인 그래프 생성 가능 )
      • 3. knn + radius
        • 위의 두 방법을 조합하는 방식
      • ===> 핵심적 차이점은 PRM과 다르게 collision-check를 하지 않는다. 단순히 노드간의 연결만 생성하고 path에 대한 유효성 검증은 이후에 한다.

 

 

  • 3. 경로 탐색 단계 initial path search
    • 위에서 생성한 그래프를 가지고, 시작점과 목표점을 연결하는 경로를 찾는다. ( a* or dijkstra같은 알고리즘 사용 )
      • 여기 단계에서도 충돌 검사는 안한다. 즉, 경로를 탐색할때도 모든 간선을 "유효하다"라고 가정한다.

 

 

  • 4. Lazy collision checking 경로 검증 단계
    • 알고리즘이 반환한 경로를 실제 사용할수 있는지 확인한다. ( 해당 단계가 Lazy PRM의 핵심 )
      • 각 간선을 하나씩 확인한다. 그리고 해당 간선이 충돌이 존재하는지 확인하고, 충돌이 없으면 간선을 유지하며, 충돌이 발생하면 해당 간선을 제거하고 새로운 경로를 다시 찾는다. ( 이 과정을 유효한 경로 발견까지 반복한다.)
    • 해당 단계에서 충돌 검사가 최소화 되니, 기존의 PRM보다 연산량이 감소함.
      • 하지만 처음 찾은 경로에 충돌이 많으면 반복적인 탐색이 필요.
    • 충돌검사 방법...
      • 1. Ray Casting
        • edge가 직선이라고 가정하고, 공간 내에서 해당 직선이 장애물과 교차하는지 확인하는 방식.
          3D 환경에서는 ray tracing 기법을 활용할수도 있음
      • 2. Voxel Grid 
        • 환경을 격자로 나누고, 간선이 지나가는 격자 셀이 장애물인지 아닌지 확인하는 방식 
      • 3. Bounding Volume Hierachy ( BVH )
        • 복잡한 환경에서 빠르게 충돌 검사를 수행하는 계층적 충돌 검사 방법

 

 

  • 5. 반복 수행 Re-planning
    •  경로의 검증과정에서 충돌이 발견된다면, 해당 간선을 삭제하고, 경로를 다시 찾는다.
    • 재탐색 방법
      • 1. 충돌이 발생한 간선을 제거하고, A*와 같은 알고리즘을 다시 실행.
      • 2. 새로운 path를 찾고, 다시 collision-check를 수행
      • 3. 충돌이 없는 path가 발견되면 경로를 반환하고, 그렇지 않다면 다시 재탐색 수행.
      • 4. 특정 횟수 이상 반복될 경우에는 실패를 반환한다.
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt
from scipy.spatial import KDTree
from queue import PriorityQueue
from matplotlib.animation import FuncAnimation
class LazyPRM:
    def __init__( self , sample_size=100 , k=10 , collision_check_fn=None , num_obstacles=15 ):
        self.sample_size = sample_size # 샘플링 할 노드 개수
        self.k = k # k 최근접 이웃 연결 갯수
        self.collision_check_fn = collision_check_fn # 충돌 검사 함수
        self.graph = nx.Graph() # 네트워크 그래프 생성
        self.samples = [] # 샘플링된 노드 리스트
        self.kd_tree = None # KDTree를 저장할 변수. 여기에는 미리 샘플링된 노드에 대한 좌표 정보가 들어감 
        self.num_obstacles = max( 6 , num_obstacles ) # 최소 6개의 장애물 설정
        self.obstacles = self._generate_obstacles() # 장애물 생성 

    # 원형과 사각형 장애물을 무작위로 생성 
    def _generate_obstacles( self ):
        obstacles = []
        for _ in range( self.num_obstacles ):
            shape = np.random.choice( ["circle" , "rectangle"] ) # 장애물 유형 선택
            center = np.random.uniform( 2 , 8 , 2 ) # 장애물 중심 좌표
            size = np.random.uniform( 0.5 , 1.5 ) # 장애물 크기 
            obstacles.append( (shape , center , size) )
        #print(f'generate_obstacles함수입니다... {self.num_obstacles}개의 장애물을 무작위로 생성했습니다.')
        print(obstacles)
        print()
        return obstacles

    # 균등 분포를 이용한 샘플링
    def sample_points( self , bounds ): 
       # print(f'sample_points함수... 균둥 분포를 이용한 샘플링을 수행합니다.')
        self.samples = np.random.uniform( bounds[0] , bounds[1] , (self.sample_size , 2 ) ) # bounds는 샘플링할 좌표의 범위이다. sample_size행 2열의 배열 생성
        #print(f'\n\n샘플링된 노드 리스트를 출력합니다.\n{self.samples}')
        self.kd_tree = KDTree( self.samples )
        for i , point in enumerate( self.samples ):
            self.graph.add_node( i , pos=tuple(point) )
       # print(f'{self.graph}가 생성한 그래프이며 {self.graph.nodes}')

    # 각 노드를 k개의 최근접 이웃과 연결 
    def connect_nodes( self ):
        for i , point in enumerate( self.samples ):
            # KDTree의 query함수는 특징점 point에서 가장 가까운 k+1개의 이웃을 찾아준다. 민약 k=1로 하면 자기 자신만 찾음 
            # 반환해주는 distances는 point에서 k+1개의 노드까지의 거리 리스트. neighbors는 k+1개의 최근접 노드의 인덱스 리스트 
            distances , neighbors = self.kd_tree.query( point , self.k + 1 ) # KDTree를 이용한 최근접 이웃 탐색
            for j in range( 1 , len(neighbors) ):
                # 자기 자신이 아니면서, 충돌이 없는 경우에만, 해당 노드를 그래프에 연결한다.
                if neighbors[j] != i and self.is_collision_free( self.samples[i] , self.samples[ neighbors[j] ] ):
                    self.graph.add_edge( i , neighbors[j] , weight=distances[j] )

    # 두 점 사이 직선이 장애물과  충돌하는지 검사 
    def is_collision_free( self , p1 , p2 ):
        for shape , center , size in self.obstacles:
            if shape == "circle":
                if np.linalg.norm( np.cross( p2-p1 , p1-center ) ) / np.linalg.norm( p2-p1 ) < size:
                    return False
            elif shape == "rectangle":
                if all( np.abs( p1-center ) < size ) or all( np.abs( p2-center ) < size ):
                    return False
        return True

    # A star 알고리즘으로 최단 경로를 탐색합니다.
    def find_path( self , start , goal ):
        nearst_node_of_start_idx = self._find_nearest( start ) # 시작점에서 가장 가까운 '샘플링한 노드'를 찾습니다.
        nearst_node_of_goal_idx = self._find_nearest( goal ) # 목표지점에서 가장 가까운 '샘플힝 한 노드'를 찾습니다.
        self.graph.add_node("start" , pos=tuple(start) )
        self.graph.add_node("goal" , pos=tuple(goal) )
        # graph_add_edge( u , v , weight=w  )
        # u와 v는 선분을 이을 2개의 노드이다. weight는 간선의 가중치이다. 여기서는 두 점 사이의 거리이다.
        self.graph.add_edge("start", nearst_node_of_start_idx , weight=np.linalg.norm(np.array(start) - self.samples[nearst_node_of_start_idx]))
        self.graph.add_edge("goal", nearst_node_of_goal_idx , weight=np.linalg.norm(np.array(goal) - self.samples[nearst_node_of_goal_idx]))

        while True:
            try:
                # shortest_path함수는 source에서 targer까지의 최단 경로를 찾는다. 여기서 weight가 최단 경로를 찾는데 사용된다.
                # 만약에 none으로 설정한다면, 가중치 정보는 무시하고 간선의 개수를 기준으로 찾는다. 즉, 간선의 개수가 적은 경로를 선택함 
                # 이동하는 경로들의 인덱스를 차례대로 저장한 리스트를 반환
                path = nx.shortest_path( self.graph , source="start" , target="goal" , weight='weight' )
                valid , invalid_edge = self.lazy_validate(path)
                if valid:
                    return path
                self.graph.remove_edge( *invalid_edge )
            except nx.NetworkXNoPath:
                return None

    def _find_nearest( self , point ):
        _ , near_node_idx = self.kd_tree.query( point ) # point에서 가장 가까운 점을 찾습니다.
        return near_node_idx  # 거리에 대한 정보를 제외하고, 가장 가까운 노드의 인덱스만 리턴함.

    def lazy_validate( self , path ):
        for i in range( len(path) - 1 ):
            u , v = path[i] , path[i+1]
            if not self.is_collision_free( np.array( self.graph.nodes[u]['pos']) , np.array( self.graph.nodes[v]['pos'] ) ):
                self.graph.remove_edge( u , v )
                return False , ( u,v)
        return True , None

    def visualize(self, path=None):
        """PRM의 노드와 경로를 시각화"""
        plt.figure(figsize=(20, 20), dpi=100)
        pos = nx.get_node_attributes(self.graph, 'pos')
        nx.draw(self.graph, pos, node_size=20, edge_color='gray', alpha=0.5)
        
        if path:
            path_edges = [(path[i], path[i + 1]) for i in range(len(path) - 1)]
            nx.draw_networkx_edges(self.graph, pos, edgelist=path_edges, edge_color='red', width=2)
        
        plt.scatter(*zip(*self.samples), c='blue', s=10)
        plt.scatter(*self.graph.nodes["start"]['pos'], c='green', s=100, marker='s', label='Start')
        plt.scatter(*self.graph.nodes["goal"]['pos'], c='red', s=100, marker='o', label='Goal')
        
        for shape, center, size in self.obstacles:
            if shape == "circle":
                circle = plt.Circle(center, size, color='black', alpha=0.5)
                plt.gca().add_patch(circle)
            elif shape == "rectangle":
                rect = plt.Rectangle(center - size / 2, size, size, color='black', alpha=0.5)
                plt.gca().add_patch(rect)
        
        plt.legend()
        plt.xlim(0, 10)
        plt.ylim(0, 10)
        plt.show()
lazy_prm = LazyPRM(sample_size=500, k=10, num_obstacles=15) # 객체 생성
lazy_prm.sample_points(bounds=((0, 0), (12, 12))) # bounds는 샘플링할 좌표의 범위를 지정하는 튜플. bounds[0]은 최소 좌표(왼쪽 아래), bounds[1]은 최대 좌표(오른쪽 위)
lazy_prm.connect_nodes()
path = lazy_prm.find_path((1, 1), (9, 9)) 
print("Found path (node indices):", path)
lazy_prm.visualize(path)