알고리즘

[RRT] RRT - Connect

정지홍 2025. 4. 11. 13:06

RRT - Connect : An efficient Approach to single-Query path planning

 

RRT - Connect : An efficient Approach to single-Query path planning

Steven M. LaValle Steven M. LaValleRapidly exploring Random Trees (RRTs) Back in June 1998, I introduced the RRT (see this Iowa State tech report), which is a simple, iterative algorithm that quickly searches complicated, high-dimensional spaces for feasi

jihong.tistory.com

 

RRT - Connect

  • RRT를 개선한 양뱡향 탐색 기반의 path planning algorithm이다.
  • 기존의 RRT보다 빠르며 더 좋은 경로를 찾는데 효과적이다. 그리고 connect를 통해서 자연스러운 경로가 나온다. 
    • 특히, 고차원 공간이나 장애물이 많은 환경에서도 잘 작동한다.
  • 핵심 아이디어
    • 양방향 트리 : start에서 자라는 트리와 goal에서 자라는 트리를 동시에 확장한다.
    • connect 전략 : 트리가 확장될때, 가능한 한 goal을 향해 직선으로 최대한 많이 확장하려고 시도한다.
      • 예를 들어, q_near가 q_rand방향으로 확장하다가 충돌이 없다면 계속 q_rand방향으로 확장한다는것이다.
    • 만약에 start에서 자라는 트리와 goal에서 자라는 트리가 연결되면, 전체 경로가 완성된다.
  • 특징
    • 1. start와 goal에서 트리를 자라게 해서 더 빠르게 경로가 생성될수있다.
    • 2. connect 전략으로 일반적인 rrt보다 공격적인 탐색이 가능하다.
    • 3. 고차원에 적합하며, rrt기반이니 역시 무작위성이 있다.
  • 기본적인 용어
    • q_start : 시작 지점 
    • q_goal : 목표 지점 
    • T_start , T_goal : 각각 시작/목표에서 자라는 트리
    • q_rand : 무작위로 샘플링된 점
    • q_near : 트리에서 q_rand와 가장 가까운 노드
    • q_new : q_near에서 q_rand를 향해서 확장한 새 노드( q_new는 q_rand와 q_near를 이은 edge상에 존재하는것. 단, 장애물이 없는 구간에서 )
    • Trapped : 장애물등에 막혀서 한 발자국도 못나감.
    • Advanced : 한 걸음 전진짐. ( 단, goal에는 도달을 못함 )
    • Reached : goal에 도달하였음.
    • Extend : 한번 ( step_size )만 트리를 확장한다. 즉, '한 걸음'만 뻗어나가서 새 노드를 생성.
    • Connect : 목표 방행으로 갈수있음 만큼 계속 확장한다. 즉, '여러 걸음'을 한번에 진행해서 공격적으로 전진.
  • 알고리즘은 전반적인 흐름
    • 1. T_start에서 q_rand로 한번 확장 ( Extend )
    • 2. T_goal에서 q_new를 향해서 가능한 계속 연결을 시도 ( Connect )
    • 3. 연결이 된다면, 양쪽 트리를 이어서 전체 경로 생성
    • 4. 매 반복마다, 두 트리를 swap해가며 번갈아가며 탐색한다.
      • swap을 통해서 start쪽 트리와 goal쪽 트리가 번갈아가며 확장됨.
def Extend( tree , q_rand ):
	q_near = Nearest( tree , q_rand )
        q_new = Steer( q_near , q_rand )
        if CollisionFree( q_near , q_new ):
            tree.add( q_new )
            return q_new
        else:
            return None

 

def Connect( T , q_target ):
	while True:
            result = Extend( T , q_target )
            if result is None: 
            	return "Trapped"
            if result == q_target: 
            	return "Reached"

 

T_start <= {q_start}
T_goal <= {q_goal}

while not path_found:
	q_rand = RandomSampling()
        q_new = Extend( T_start , q_rand )
        if q_new != None:
            result = Connect( T_goal , q_new )
            if result == Reached:
                path_found=True
        swap( T_start , T_goal )

위와 같이 서로를 향해서 나아갈것이다.
결과1
결과2
결과3

import random
import math
import matplotlib.pyplot as plt

class Map2D:
    def __init__( self , width=10 , height=10 ):
        self.width = width
        self.height = height
        self.obstacles = [ (2, 10, 3, 2) , (6,0,7,8,) ]
    def is_in_obstacle( self , x , y ):
        for ( x_min , y_min , x_max , y_max ) in self.obstacles:
            if( x_min <= x_max) and ( y_min <= y_max ):
                if ( x_min <= x <= x_max ) and ( y_min <= y <= y_max ):
                    return True
            elif ( x_max <= x_min ) and ( y_min <= y_max ):
                if ( x_max <= x <= x_min ) and ( y_min <= y <= y_max ):
                    return True
            elif ( x_max <= x_min ) and ( y_max <= y_min ):
                if ( x_max <= x <= x_min ) and ( y_max <= y <= y_min ):
                    return True
            elif ( x_min <= x_max ) and ( y_max <= y_min ):
                if ( x_min <= x <= x_max ) and ( y_max <= y <= y_min ):
                    return True
        return False
    def plot_map( self ):
        fig , ax = plt.subplots()
        ax.set_xlim( 0 , self.width )
        ax.set_ylim( 0 , self.height )
        ax.set_aspect("equal" , adjustable="box" )
        ax.set_title("2d map")
        for ( x_min , y_min , x_max , y_max ) in self.obstacles:
            rect_width = x_max - x_min
            rect_height = y_max - y_min
            obstacle_rect = plt.Rectangle( (x_min,y_min) , rect_width , rect_height , fill=True , alpha=0.4 ) # alpha는 불투명도
            ax.add_patch( obstacle_rect )
        return fig , ax
class Node:
    def __init__( self , x , y , parent=None ):
        self.x , self.y , self.parent = x , y, parent
class RRTConnect:
    def __init__( self , map_2d , start , goal , step_size=0.5 , max_iter=100 , goal_sample_rate=0.05 , dist_threshold=0.3 ):
        self.map = map_2d
        self.start , self.goal = Node( start[0] , start[1] ) , Node( goal[0] , goal[1] ) # start와 goal 노드 생성
        self.step_size = step_size # 한번에 확장할 거리
        self.max_iter = max_iter # 알고리즘 최대 반복 횟수 
        self.goal_sample_rate = goal_sample_rate # goal 지점을 샘플링 할 확률 
        self.dist_threshold = dist_threshold # 트리 연결 판정 거리
        self.tree_start , self.tree_goal = [ self.start ] , [ self.goal ] # start와 goal 트리를 리스트로 관리 
        self.fig , self.ax = self.map.plot_map()

    # --------------------------
    # ---유틸리티 메서드 
    # --------------------------
    def distance( self , n1 , n2 ):
        return math.hypot( n2.x - n1.x , n2.y - n1.y ) # 두 노드 사이의 유클리드 거리 계산
        
    # 목표 지점으로 biasing을 고려한 랜덤한 노드 생성
    def get_random_node( self ):
        if random.random() < self.goal_sample_rate: # goal_sample_rate의 확률로 goal 노드를 사용
            return Node( self.goal.x , self.goal.y )
        else:
            x = random.uniform( 0 , self.map.width )
            y = random.uniform( 0 , self.map.height)
            return Node( x , y )
            
    # tree( 리스트 )내에서 node와 가장 가까운 노드를 찾음
    def nearest_node( self , tree , node ): # tree에 여러개의 값들이 있는데, 이들 값으로 거리들을 다 구하고 그중에서 거리가 가장 짧은 tree의 요소를 리턴하는 방식
        return min( tree , key=lambda nd: self.distance( nd , node ) ) # 거리가 key값이다.

    # node1에서 node2로 가는 선분이 장애물과 충돌하는지 검사. 선분을 여러 지점으로 샘플링해서, 각 지점이 장애물 내부에 존재하는지 확인
    def collision_free( self , node1 , node2 ):
        steps = max( int( self.distance(node1 , node2) / ( self.step_size * 0.5) ) , 1)
        for i in range( steps + 1 ):
            t = i/steps
            x = node1.x + t*( node2.x - node1.x )
            y = node1.y + t*( node2.y - node1.y )
          #  print(f'{x} , {y}')
            if self.map.is_in_obstacle( x , y ):
                return False
        return True

    # q_near에서 q_rand방향으로 step_size만큼 전진한 새로운 노드를 만든다.
    def steer( self , from_node , to_node ):
        dist = self.distance( from_node , to_node )
        if dist < self.step_size:
            new_node = Node( to_node.x , to_node.y )
            new_node.parent = from_node
            return new_node
        else:
            theta = math.atan2( to_node.y - from_node.y , to_node.x - from_node.x ) # 방향 벡터 구하기
            new_x = from_node.x + self.step_size*math.cos(theta)
            new_y = from_node.y + self.step_size*math.sin(theta)
            new_node = Node( new_x , new_y , parent=from_node )
            return new_node

    # ------------------------
    # ------ RRT - Connect의 핵심 
    # ------------------------
    def extend( self , tree , q_rand ):
        q_near = self.nearest_node( tree , q_rand )
        q_new = self.steer( q_near , q_rand )
        if self.collision_free( q_near , q_new ):
            tree.append( q_new )
            self.ax.plot( [q_near.x , q_new.x] , [q_near.y , q_new.y] , '-g' , linewidth=3 , alpha=0.5)
            return q_new
        return None

    def connect( self , tree , q_target ): # q_target은 반대편 트리에서 새로 생긴 노드 ( 목표 방향 )
        while True:
            q_new = self.extend( tree , q_target ) # 1번 확장
            if q_new is None:
                return "Trapped" # 더 이상 전진이 불가능하면 중단한다.
            dist = self.distance( q_new , q_target )
            if dist < self.step_size:
                return "Reached" # 충분히 가까워졌다면 reached

    # -------------------
    # ----main
    # -------------------
    def build( self ):
        print('build : 함수시작 ')
        for _ in range( self.max_iter ):
            q_rand = self.get_random_node() # 랜덤하게 노드 생성
            q_new = self.extend( self.tree_start , q_rand ) # 시작노드에서 1번 전진
            #print(f'build : 랜덤하게 생성한 노드는 ({q_rand.x},{q_rand.y})이며, q_new는 {q_new}입니다. 한쪽 트리는 extend를 완료...')
            if q_new is not None:
                result = self.connect( self.tree_goal , q_new ) # goal트리를 q_new 방향으로 connect를 시도한다.
               # print(f'        남은 한쪽 트리가 q_new방향으로 connect를 시도합니다... 결과는 {result}')
                if result == "Reached":
                    path = self.reconstruct_path()
                    return path
            self.tree_start , self.tree_goal = self.tree_goal , self.tree_start # 두 트리를 스왑
        return None
        
    def reconstruct_path( self ):
       # print('tree_Start')
      #  print(self.tree_start.length)
        #print('tree_goal')
        #print(self.tree_goal)
        last_start , last_goal = self.tree_start[-1] , self.tree_goal[-1] # 현재의 시점에서 두 트리의 마지막 노드를 확인
        print(f'last_start is {last_start.x},{last_start.y} and last_goal is {last_goal.x},{last_goal.y}')
        if self.distance( last_start , last_goal ) < self.dist_threshold: # 만약에 두 노드 간의 거리가 dist_threshold 이하라면 같은 지점으로 간주.
            meet_node_start = last_start
            meet_node_goal = last_goal
            print("!")
        else:
            meet_node_start , meet_node_goal = None , None
            min_dist = float('inf')
            for ns in self.tree_start:
                for ng in self.tree_goal:
                    d = self.distance( ns , ng )
                    if d < min_dist:
                        min_dist = d
                        meet_node_start = ns
                        meet_node_goal = ng
        path_start = []
        cur = meet_node_start
        while cur is not None:
            path_start.append( (cur.x , cur.y ) )
            cur = cur.parent
        path_start.reverse()

        path_goal = []
        cur = meet_node_goal
        while cur is not None:
            path_goal.append( (cur.x , cur.y ) )
            cur = cur.parent
        #path_goal.reverse()
        path_goal = path_goal[1:] # 처음 노드는 start와 중복될거임
        
        final_path = path_start + path_goal
        return final_path

    def visualize_path( self , path ):
        if path is None:
            print("error")
            return
        # 경로가 있으면 경로 시각화
        px, py = zip(*path)                           # (x, y) 리스트에서 x, y 별도 튜플로 분리
        print()
        print(f'  ({px}\n\n\n{py})  ')
        self.ax.plot(px, py, '-r', linewidth=1, label='Final Path')  # 최종 경로(빨간색 선)
        self.ax.plot(self.start.x, self.start.y, 'bo', label='Start')  # 시작점(파란 원)
        self.ax.plot(self.goal.x, self.goal.y, 'bx', label='Goal')     # 목표점(파란 X)
        self.ax.legend()                              # 범례 표시
        plt.show()                                    # 그래프 창 띄우기
if __name__ == "__main__":
    # 맵 생성
    my_map = Map2D()

    # 시작점, 목표점
    start_pos = (1, 1)
    goal_pos = (9, 9)

    # RRT-Connect 객체 생성
    rrt_connect = RRTConnect(
        map_2d=my_map,
        start=start_pos,
        goal=goal_pos,
        step_size=0.3,       # 확장 시 한 번에 이동할 거리
        max_iter=1500,       # 최대 반복 횟수
        goal_sample_rate=0.1 # goal을 직접 샘플링할 확률(10%)
    )

    # 알고리즘 실행
    final_path = rrt_connect.build()
    print(final_path)
    # 결과 시각화
    rrt_connect.visualize_path(final_path)

rrt-connect.ipynb
0.05MB