코딩 및 기타

[isaac-sim] 4방향 a*알고리즘

정지홍 2026. 4. 8. 16:20

 

대각선은 없고 상하좌우만 움직이는거

import os
import math
import heapq
import traceback
from pathlib import Path

import numpy as np
import yaml
from PIL import Image

os.environ["OMNI_KIT_ACCEPT_EULA"] = "YES"

from isaacsim import SimulationApp
simulation_app = SimulationApp({"headless": False})

scene_view = None
viewport_window = None


try:
    import omni
    from omni.kit.viewport.utility import get_active_viewport_window
    from omni.ui import scene as sc

    from isaacsim.core.api import World
    from isaacsim.core.utils.stage import add_reference_to_stage
    from isaacsim.storage.native import get_assets_root_path
    from isaacsim.robot.wheeled_robots.robots import WheeledRobot
    from isaacsim.robot.wheeled_robots.controllers.differential_controller import DifferentialController

    # ============================================================
    # 사용자 설정
    # ============================================================
    MAP_YAML = "/home/jihong/Downloads/maps/carter_warehouse_navigation.yaml"

    LOOKAHEAD = 0.8
    GOAL_TOL = 0.30
    GROUND_Z = 0.0

    WHEEL_RADIUS = 0.14
    WHEEL_BASE = 0.54

    # 시작 위치를 직접 지정하고 싶으면 None 대신 (x, y)로 넣기
    # 예: REQUESTED_START_WORLD = (-6.0, -4.0)
    REQUESTED_START_WORLD = None

    # ============================================================
    # 맵 로딩
    # ============================================================
    def load_ros_occupancy_map(yaml_path: str):
        yaml_path = Path(yaml_path).expanduser().resolve()
        with open(yaml_path, "r") as f:
            meta = yaml.safe_load(f)

        image_path = Path(meta["image"])
        if not image_path.is_absolute():
            image_path = (yaml_path.parent / image_path).resolve()

        img = Image.open(image_path).convert("L")
        img = np.array(img, dtype=np.float32)

        resolution = float(meta["resolution"])
        origin_x = float(meta["origin"][0])
        origin_y = float(meta["origin"][1])
        origin_yaw = float(meta["origin"][2])

        negate = int(meta.get("negate", 0))
        occupied_thresh = float(meta.get("occupied_thresh", 0.65))
        free_thresh = float(meta.get("free_thresh", 0.196))

        # ROS map_server 스타일
        if negate == 0:
            occ_prob = (255.0 - img) / 255.0
        else:
            occ_prob = img / 255.0

        # 0 = free, 1 = occupied, -1 = unknown
        grid_sem = np.full(img.shape, -1, dtype=np.int8)
        grid_sem[occ_prob > occupied_thresh] = 1
        grid_sem[occ_prob < free_thresh] = 0

        # planning grid: 0 = free, 1 = blocked
        plan_grid = np.ones_like(grid_sem, dtype=np.int8)
        plan_grid[grid_sem == 0] = 0
        plan_grid[grid_sem == 1] = 1
        plan_grid[grid_sem == -1] = 1

        return {
            "yaml_path": str(yaml_path),
            "image_path": str(image_path),
            "resolution": resolution,
            "origin_x": origin_x,
            "origin_y": origin_y,
            "origin_yaw": origin_yaw,
            "grid_sem": grid_sem,
            "plan_grid": plan_grid,
            "height": plan_grid.shape[0],
            "width": plan_grid.shape[1],
        }

    # ============================================================
    # 좌표 변환
    # ============================================================
    def world_to_cell(x, y, map_info):
        res = map_info["resolution"]
        ox = map_info["origin_x"]
        oy = map_info["origin_y"]
        h = map_info["height"]

        mx = int((x - ox) / res)
        my = int((y - oy) / res)

        col = mx
        row = h - 1 - my
        return row, col

    def cell_to_world(row, col, map_info):
        res = map_info["resolution"]
        ox = map_info["origin_x"]
        oy = map_info["origin_y"]
        h = map_info["height"]

        mx = col
        my = h - 1 - row

        x = ox + (mx + 0.5) * res
        y = oy + (my + 0.5) * res
        return x, y

    def in_bounds(grid, rc):
        r, c = rc
        return 0 <= r < grid.shape[0] and 0 <= c < grid.shape[1]

    def nearest_free_cell(grid, rc, max_radius=250):
        if in_bounds(grid, rc) and grid[rc[0], rc[1]] == 0:
            return rc

        r0, c0 = rc
        for rad in range(1, max_radius + 1):
            rmin, rmax = r0 - rad, r0 + rad
            cmin, cmax = c0 - rad, c0 + rad

            for r in range(rmin, rmax + 1):
                for c in (cmin, cmax):
                    if in_bounds(grid, (r, c)) and grid[r, c] == 0:
                        return (r, c)

            for c in range(cmin, cmax + 1):
                for r in (rmin, rmax):
                    if in_bounds(grid, (r, c)) and grid[r, c] == 0:
                        return (r, c)

        raise RuntimeError(f"nearest free cell not found near {rc}")

    # ============================================================
    # A*
    # ============================================================
    def heuristic(a, b):
        return abs(a[0] - b[0]) + abs(a[1] - b[1])

    def reconstruct_path(came_from, current):
        out = [current]
        while current in came_from:
            current = came_from[current]
            out.append(current)
        out.reverse()
        return out

    def astar(grid, start, goal):
        if not in_bounds(grid, start) or not in_bounds(grid, goal):
            raise ValueError(f"start/goal out of bounds: {start}, {goal}")
        if grid[start[0], start[1]] != 0:
            raise ValueError(f"start is blocked: {start}")
        if grid[goal[0], goal[1]] != 0:
            raise ValueError(f"goal is blocked: {goal}")

        open_heap = []
        heapq.heappush(open_heap, (heuristic(start, goal), 0, start))

        came_from = {}
        g_score = {start: 0}
        visited = set()
        neighbors = [(1, 0), (-1, 0), (0, 1), (0, -1)]

        while open_heap:
            _, gcur, cur = heapq.heappop(open_heap)
            if cur in visited:
                continue
            visited.add(cur)

            if cur == goal:
                return reconstruct_path(came_from, cur)

            r, c = cur
            for dr, dc in neighbors:
                nxt = (r + dr, c + dc)

                if not in_bounds(grid, nxt):
                    continue
                if grid[nxt[0], nxt[1]] != 0:
                    continue

                tentative_g = gcur + 1
                if tentative_g < g_score.get(nxt, float("inf")):
                    g_score[nxt] = tentative_g
                    came_from[nxt] = cur
                    f = tentative_g + heuristic(nxt, goal)
                    heapq.heappush(open_heap, (f, tentative_g, nxt))

        return []

    def downsample_path(path_cells, stride=6):
        if len(path_cells) <= 2:
            return path_cells

        out = [path_cells[0]]
        for i in range(stride, len(path_cells) - 1, stride):
            out.append(path_cells[i])

        if out[-1] != path_cells[-1]:
            out.append(path_cells[-1])

        return out

    # ============================================================
    # follower
    # ============================================================
    def normalize_angle(a):
        return (a + np.pi) % (2.0 * np.pi) - np.pi

    def quat_wxyz_to_yaw(q):
        w, x, y, z = q
        return math.atan2(2.0 * (w * z + x * y), 1.0 - 2.0 * (y * y + z * z))

    def choose_waypoint(path_world_xy, pos_xy, current_idx, lookahead):
        idx = current_idx
        while idx < len(path_world_xy) - 1:
            if np.linalg.norm(path_world_xy[idx] - pos_xy) >= lookahead:
                break
            idx += 1
        return idx, path_world_xy[idx]

    def follower(pos_xy, yaw, wp_xy):
        dx = wp_xy[0] - pos_xy[0]
        dy = wp_xy[1] - pos_xy[1]

        target_yaw = math.atan2(dy, dx)
        yaw_err = normalize_angle(target_yaw - yaw)
        dist = math.hypot(dx, dy)

        v = np.clip(1.2 * dist, 0.0, 0.65)
        w = np.clip(2.5 * yaw_err, -1.4, 1.4)

        if abs(yaw_err) > 0.7:
            v *= 0.25

        return np.array([v, w], dtype=np.float32)

    # ============================================================
    # 맵 읽기
    # ============================================================
    map_info = load_ros_occupancy_map(MAP_YAML)
    plan_grid = map_info["plan_grid"]

    print("[INFO] yaml       :", map_info["yaml_path"])
    print("[INFO] image      :", map_info["image_path"])
    print("[INFO] resolution :", map_info["resolution"])
    print("[INFO] origin     :", (map_info["origin_x"], map_info["origin_y"], map_info["origin_yaw"]))
    print("[INFO] grid shape :", plan_grid.shape)

    if REQUESTED_START_WORLD is None:
        requested_start = (
            map_info["origin_x"] + 2.0,
            map_info["origin_y"] + 2.0,
        )
    else:
        requested_start = REQUESTED_START_WORLD

    start_cell_raw = world_to_cell(requested_start[0], requested_start[1], map_info)
    start_cell = nearest_free_cell(plan_grid, start_cell_raw)
    start_world = cell_to_world(start_cell[0], start_cell[1], map_info)

    print("[INFO] requested start world:", requested_start)
    print("[INFO] snapped   start cell :", start_cell)
    print("[INFO] snapped   start world:", start_world)

    # ============================================================
    # world / asset load
    # ============================================================
    world = World(stage_units_in_meters=1.0)

    assets_root = get_assets_root_path()
    if not assets_root:
        raise RuntimeError("Could not find Isaac Sim assets root.")

    env_usd = assets_root + "/Isaac/Environments/Simple_Warehouse/warehouse_with_forklifts.usd"
    robot_usd = assets_root + "/Isaac/Robots/NVIDIA/NovaCarter/nova_carter.usd"

    print("[INFO] env_usd   :", env_usd)
    print("[INFO] robot_usd :", robot_usd)

    add_reference_to_stage(env_usd, "/World/Warehouse")
    add_reference_to_stage(robot_usd, "/World/NovaCarter")

    robot = world.scene.add(
        WheeledRobot(
            prim_path="/World/NovaCarter",
            name="nova_carter",
            wheel_dof_names=["joint_wheel_left", "joint_wheel_right"],
        )
    )

    controller = DifferentialController(
        name="nova_diff_controller",
        wheel_radius=WHEEL_RADIUS,
        wheel_base=WHEEL_BASE,
        max_linear_speed=0.8,
        max_angular_speed=1.5,
        max_wheel_speed=30.0,
    )

    world.reset()
    robot.initialize()

    spawn_pos, _ = robot.get_world_pose()
    spawn_z = float(spawn_pos[2])

    robot.set_world_pose(
        position=np.array([float(start_world[0]), float(start_world[1]), spawn_z], dtype=np.float32),
        orientation=np.array([1.0, 0.0, 0.0, 0.0], dtype=np.float32),
    )
    robot.set_default_state(
        position=np.array([float(start_world[0]), float(start_world[1]), spawn_z], dtype=np.float32),
        orientation=np.array([1.0, 0.0, 0.0, 0.0], dtype=np.float32),
    )

    world.play()
    world.step(render=True)

    print("[INFO] robot num_dof       :", robot.num_dof)
    print("[INFO] robot wheel indices:", robot.wheel_dof_indices)

    state = {
        "goal_xy": None,
        "path_world_xy": None,
        "wp_idx": 0,
    }

    # ============================================================
    # path planning
    # ============================================================
    def plan_to_clicked_goal(goal_xy_world):
        pos, _ = robot.get_world_pose()
        robot_xy = np.array([pos[0], pos[1]], dtype=np.float32)

        start_raw2 = world_to_cell(float(robot_xy[0]), float(robot_xy[1]), map_info)
        goal_raw2 = world_to_cell(float(goal_xy_world[0]), float(goal_xy_world[1]), map_info)

        print("[DEBUG] robot_xy      =", robot_xy)
        print("[DEBUG] start_raw2    =", start_raw2)
        print("[DEBUG] goal_raw2     =", goal_raw2)

        start_cell2 = nearest_free_cell(plan_grid, start_raw2)
        goal_cell2 = nearest_free_cell(plan_grid, goal_raw2)

        print("[DEBUG] start_cell2   =", start_cell2)
        print("[DEBUG] goal_cell2    =", goal_cell2)

        path_cells = astar(plan_grid, start_cell2, goal_cell2)
        if not path_cells:
            print("[WARN] A* failed for clicked goal.")
            return

        path_cells = downsample_path(path_cells, stride=6)

        path_world_xy = np.array(
            [cell_to_world(r, c, map_info) for (r, c) in path_cells],
            dtype=np.float32,
        )

        snapped_goal_world = np.array(
            cell_to_world(goal_cell2[0], goal_cell2[1], map_info),
            dtype=np.float32,
        )

        state["goal_xy"] = snapped_goal_world
        state["path_world_xy"] = path_world_xy
        state["wp_idx"] = 0

        print("[INFO] clicked goal world :", goal_xy_world)
        print("[INFO] snapped goal world :", snapped_goal_world)
        print("[INFO] path waypoints     :", len(path_world_xy))

    # ============================================================
    # click -> ndc -> ray
    # ============================================================
    def on_screen_click(shape):
        try:
            payload = shape.gesture_payload
            mouse = getattr(payload, "mouse", None)

            if mouse is None:
                raise RuntimeError("gesture_payload.mouse is None")

            if not isinstance(mouse, (list, tuple)) or len(mouse) < 2:
                raise RuntimeError(f"unexpected mouse payload: {mouse}")

            ndc_x = float(mouse[0])
            ndc_y = float(mouse[1])

            print("[DEBUG] ndc =", (ndc_x, ndc_y))

            ray_origin, ray_dir = scene_view.get_ray_from_ndc(sc.Vector2(ndc_x, ndc_y))

            ray_origin = np.array([ray_origin.x, ray_origin.y, ray_origin.z], dtype=np.float32)
            ray_dir = np.array([ray_dir.x, ray_dir.y, ray_dir.z], dtype=np.float32)

            print("[DEBUG] ray_origin =", ray_origin)
            print("[DEBUG] ray_dir    =", ray_dir)

            if abs(ray_dir[2]) < 1e-6:
                print("[WARN] click ray parallel to ground.")
                return

            t = (GROUND_Z - ray_origin[2]) / ray_dir[2]
            if t <= 0.0:
                print("[WARN] click ray does not hit forward ground plane.")
                return

            hit = ray_origin + t * ray_dir
            goal_xy_world = np.array([hit[0], hit[1]], dtype=np.float32)

            print("[INFO] clicked hit world =", goal_xy_world)
            plan_to_clicked_goal(goal_xy_world)

        except Exception:
            print("[ERROR] click callback exception")
            traceback.print_exc()

    # ============================================================
    # viewport overlay
    # ============================================================
    viewport_window = get_active_viewport_window()
    if viewport_window is None:
        raise RuntimeError("No active viewport window found.")

    with viewport_window.get_frame("goal_picker_overlay"):
        scene_view = sc.SceneView(child_windows_input=False)
        viewport_window.viewport_api.add_scene_view(scene_view)

        with scene_view.scene:
            sc.Screen(
                gesture=[
                    sc.ClickGesture(
                        on_screen_click,
                        name="goal_click_gesture",
                    )
                ]
            )

    print("[INFO] click on warehouse floor to set a new goal.")

    # ============================================================
    # main loop
    # ============================================================
    step = 0
    while simulation_app.is_running():
        if state["goal_xy"] is not None and state["path_world_xy"] is not None:
            pos, orn = robot.get_world_pose()
            pos_xy = np.array([pos[0], pos[1]], dtype=np.float32)
            yaw = quat_wxyz_to_yaw(orn)

            if np.linalg.norm(state["goal_xy"] - pos_xy) < GOAL_TOL:
                action = controller.forward(np.array([0.0, 0.0], dtype=np.float32))
                robot.apply_wheel_actions(action)

                if step % 60 == 0:
                    print(f"[STEP {step}] goal reached or within tolerance")
            else:
                wp_idx, wp_xy = choose_waypoint(
                    state["path_world_xy"], pos_xy, state["wp_idx"], LOOKAHEAD
                )
                state["wp_idx"] = wp_idx

                cmd = follower(pos_xy, yaw, wp_xy)
                action = controller.forward(cmd)
                robot.apply_wheel_actions(action)

                if step % 60 == 0:
                    print(
                        f"[STEP {step}] "
                        f"pos=({pos_xy[0]:.2f}, {pos_xy[1]:.2f}) "
                        f"goal=({state['goal_xy'][0]:.2f}, {state['goal_xy'][1]:.2f}) "
                        f"wp_idx={wp_idx}/{len(state['path_world_xy']) - 1} "
                        f"wp=({wp_xy[0]:.2f}, {wp_xy[1]:.2f}) "
                        f"cmd=({cmd[0]:.2f}, {cmd[1]:.2f})"
                    )

        world.step(render=True)
        step += 1

except Exception:
    print("[ERROR] exception occurred")
    traceback.print_exc()

finally:
    try:
        if scene_view is not None and viewport_window is not None:
            scene_view.scene.clear()
            viewport_window.viewport_api.remove_scene_view(scene_view)
    except Exception:
        pass

    simulation_app.close()