대각선은 없고 상하좌우만 움직이는거
import os
import math
import heapq
import traceback
from pathlib import Path
import numpy as np
import yaml
from PIL import Image
os.environ["OMNI_KIT_ACCEPT_EULA"] = "YES"
from isaacsim import SimulationApp
simulation_app = SimulationApp({"headless": False})
scene_view = None
viewport_window = None
try:
import omni
from omni.kit.viewport.utility import get_active_viewport_window
from omni.ui import scene as sc
from isaacsim.core.api import World
from isaacsim.core.utils.stage import add_reference_to_stage
from isaacsim.storage.native import get_assets_root_path
from isaacsim.robot.wheeled_robots.robots import WheeledRobot
from isaacsim.robot.wheeled_robots.controllers.differential_controller import DifferentialController
# ============================================================
# 사용자 설정
# ============================================================
MAP_YAML = "/home/jihong/Downloads/maps/carter_warehouse_navigation.yaml"
LOOKAHEAD = 0.8
GOAL_TOL = 0.30
GROUND_Z = 0.0
WHEEL_RADIUS = 0.14
WHEEL_BASE = 0.54
# 시작 위치를 직접 지정하고 싶으면 None 대신 (x, y)로 넣기
# 예: REQUESTED_START_WORLD = (-6.0, -4.0)
REQUESTED_START_WORLD = None
# ============================================================
# 맵 로딩
# ============================================================
def load_ros_occupancy_map(yaml_path: str):
yaml_path = Path(yaml_path).expanduser().resolve()
with open(yaml_path, "r") as f:
meta = yaml.safe_load(f)
image_path = Path(meta["image"])
if not image_path.is_absolute():
image_path = (yaml_path.parent / image_path).resolve()
img = Image.open(image_path).convert("L")
img = np.array(img, dtype=np.float32)
resolution = float(meta["resolution"])
origin_x = float(meta["origin"][0])
origin_y = float(meta["origin"][1])
origin_yaw = float(meta["origin"][2])
negate = int(meta.get("negate", 0))
occupied_thresh = float(meta.get("occupied_thresh", 0.65))
free_thresh = float(meta.get("free_thresh", 0.196))
# ROS map_server 스타일
if negate == 0:
occ_prob = (255.0 - img) / 255.0
else:
occ_prob = img / 255.0
# 0 = free, 1 = occupied, -1 = unknown
grid_sem = np.full(img.shape, -1, dtype=np.int8)
grid_sem[occ_prob > occupied_thresh] = 1
grid_sem[occ_prob < free_thresh] = 0
# planning grid: 0 = free, 1 = blocked
plan_grid = np.ones_like(grid_sem, dtype=np.int8)
plan_grid[grid_sem == 0] = 0
plan_grid[grid_sem == 1] = 1
plan_grid[grid_sem == -1] = 1
return {
"yaml_path": str(yaml_path),
"image_path": str(image_path),
"resolution": resolution,
"origin_x": origin_x,
"origin_y": origin_y,
"origin_yaw": origin_yaw,
"grid_sem": grid_sem,
"plan_grid": plan_grid,
"height": plan_grid.shape[0],
"width": plan_grid.shape[1],
}
# ============================================================
# 좌표 변환
# ============================================================
def world_to_cell(x, y, map_info):
res = map_info["resolution"]
ox = map_info["origin_x"]
oy = map_info["origin_y"]
h = map_info["height"]
mx = int((x - ox) / res)
my = int((y - oy) / res)
col = mx
row = h - 1 - my
return row, col
def cell_to_world(row, col, map_info):
res = map_info["resolution"]
ox = map_info["origin_x"]
oy = map_info["origin_y"]
h = map_info["height"]
mx = col
my = h - 1 - row
x = ox + (mx + 0.5) * res
y = oy + (my + 0.5) * res
return x, y
def in_bounds(grid, rc):
r, c = rc
return 0 <= r < grid.shape[0] and 0 <= c < grid.shape[1]
def nearest_free_cell(grid, rc, max_radius=250):
if in_bounds(grid, rc) and grid[rc[0], rc[1]] == 0:
return rc
r0, c0 = rc
for rad in range(1, max_radius + 1):
rmin, rmax = r0 - rad, r0 + rad
cmin, cmax = c0 - rad, c0 + rad
for r in range(rmin, rmax + 1):
for c in (cmin, cmax):
if in_bounds(grid, (r, c)) and grid[r, c] == 0:
return (r, c)
for c in range(cmin, cmax + 1):
for r in (rmin, rmax):
if in_bounds(grid, (r, c)) and grid[r, c] == 0:
return (r, c)
raise RuntimeError(f"nearest free cell not found near {rc}")
# ============================================================
# A*
# ============================================================
def heuristic(a, b):
return abs(a[0] - b[0]) + abs(a[1] - b[1])
def reconstruct_path(came_from, current):
out = [current]
while current in came_from:
current = came_from[current]
out.append(current)
out.reverse()
return out
def astar(grid, start, goal):
if not in_bounds(grid, start) or not in_bounds(grid, goal):
raise ValueError(f"start/goal out of bounds: {start}, {goal}")
if grid[start[0], start[1]] != 0:
raise ValueError(f"start is blocked: {start}")
if grid[goal[0], goal[1]] != 0:
raise ValueError(f"goal is blocked: {goal}")
open_heap = []
heapq.heappush(open_heap, (heuristic(start, goal), 0, start))
came_from = {}
g_score = {start: 0}
visited = set()
neighbors = [(1, 0), (-1, 0), (0, 1), (0, -1)]
while open_heap:
_, gcur, cur = heapq.heappop(open_heap)
if cur in visited:
continue
visited.add(cur)
if cur == goal:
return reconstruct_path(came_from, cur)
r, c = cur
for dr, dc in neighbors:
nxt = (r + dr, c + dc)
if not in_bounds(grid, nxt):
continue
if grid[nxt[0], nxt[1]] != 0:
continue
tentative_g = gcur + 1
if tentative_g < g_score.get(nxt, float("inf")):
g_score[nxt] = tentative_g
came_from[nxt] = cur
f = tentative_g + heuristic(nxt, goal)
heapq.heappush(open_heap, (f, tentative_g, nxt))
return []
def downsample_path(path_cells, stride=6):
if len(path_cells) <= 2:
return path_cells
out = [path_cells[0]]
for i in range(stride, len(path_cells) - 1, stride):
out.append(path_cells[i])
if out[-1] != path_cells[-1]:
out.append(path_cells[-1])
return out
# ============================================================
# follower
# ============================================================
def normalize_angle(a):
return (a + np.pi) % (2.0 * np.pi) - np.pi
def quat_wxyz_to_yaw(q):
w, x, y, z = q
return math.atan2(2.0 * (w * z + x * y), 1.0 - 2.0 * (y * y + z * z))
def choose_waypoint(path_world_xy, pos_xy, current_idx, lookahead):
idx = current_idx
while idx < len(path_world_xy) - 1:
if np.linalg.norm(path_world_xy[idx] - pos_xy) >= lookahead:
break
idx += 1
return idx, path_world_xy[idx]
def follower(pos_xy, yaw, wp_xy):
dx = wp_xy[0] - pos_xy[0]
dy = wp_xy[1] - pos_xy[1]
target_yaw = math.atan2(dy, dx)
yaw_err = normalize_angle(target_yaw - yaw)
dist = math.hypot(dx, dy)
v = np.clip(1.2 * dist, 0.0, 0.65)
w = np.clip(2.5 * yaw_err, -1.4, 1.4)
if abs(yaw_err) > 0.7:
v *= 0.25
return np.array([v, w], dtype=np.float32)
# ============================================================
# 맵 읽기
# ============================================================
map_info = load_ros_occupancy_map(MAP_YAML)
plan_grid = map_info["plan_grid"]
print("[INFO] yaml :", map_info["yaml_path"])
print("[INFO] image :", map_info["image_path"])
print("[INFO] resolution :", map_info["resolution"])
print("[INFO] origin :", (map_info["origin_x"], map_info["origin_y"], map_info["origin_yaw"]))
print("[INFO] grid shape :", plan_grid.shape)
if REQUESTED_START_WORLD is None:
requested_start = (
map_info["origin_x"] + 2.0,
map_info["origin_y"] + 2.0,
)
else:
requested_start = REQUESTED_START_WORLD
start_cell_raw = world_to_cell(requested_start[0], requested_start[1], map_info)
start_cell = nearest_free_cell(plan_grid, start_cell_raw)
start_world = cell_to_world(start_cell[0], start_cell[1], map_info)
print("[INFO] requested start world:", requested_start)
print("[INFO] snapped start cell :", start_cell)
print("[INFO] snapped start world:", start_world)
# ============================================================
# world / asset load
# ============================================================
world = World(stage_units_in_meters=1.0)
assets_root = get_assets_root_path()
if not assets_root:
raise RuntimeError("Could not find Isaac Sim assets root.")
env_usd = assets_root + "/Isaac/Environments/Simple_Warehouse/warehouse_with_forklifts.usd"
robot_usd = assets_root + "/Isaac/Robots/NVIDIA/NovaCarter/nova_carter.usd"
print("[INFO] env_usd :", env_usd)
print("[INFO] robot_usd :", robot_usd)
add_reference_to_stage(env_usd, "/World/Warehouse")
add_reference_to_stage(robot_usd, "/World/NovaCarter")
robot = world.scene.add(
WheeledRobot(
prim_path="/World/NovaCarter",
name="nova_carter",
wheel_dof_names=["joint_wheel_left", "joint_wheel_right"],
)
)
controller = DifferentialController(
name="nova_diff_controller",
wheel_radius=WHEEL_RADIUS,
wheel_base=WHEEL_BASE,
max_linear_speed=0.8,
max_angular_speed=1.5,
max_wheel_speed=30.0,
)
world.reset()
robot.initialize()
spawn_pos, _ = robot.get_world_pose()
spawn_z = float(spawn_pos[2])
robot.set_world_pose(
position=np.array([float(start_world[0]), float(start_world[1]), spawn_z], dtype=np.float32),
orientation=np.array([1.0, 0.0, 0.0, 0.0], dtype=np.float32),
)
robot.set_default_state(
position=np.array([float(start_world[0]), float(start_world[1]), spawn_z], dtype=np.float32),
orientation=np.array([1.0, 0.0, 0.0, 0.0], dtype=np.float32),
)
world.play()
world.step(render=True)
print("[INFO] robot num_dof :", robot.num_dof)
print("[INFO] robot wheel indices:", robot.wheel_dof_indices)
state = {
"goal_xy": None,
"path_world_xy": None,
"wp_idx": 0,
}
# ============================================================
# path planning
# ============================================================
def plan_to_clicked_goal(goal_xy_world):
pos, _ = robot.get_world_pose()
robot_xy = np.array([pos[0], pos[1]], dtype=np.float32)
start_raw2 = world_to_cell(float(robot_xy[0]), float(robot_xy[1]), map_info)
goal_raw2 = world_to_cell(float(goal_xy_world[0]), float(goal_xy_world[1]), map_info)
print("[DEBUG] robot_xy =", robot_xy)
print("[DEBUG] start_raw2 =", start_raw2)
print("[DEBUG] goal_raw2 =", goal_raw2)
start_cell2 = nearest_free_cell(plan_grid, start_raw2)
goal_cell2 = nearest_free_cell(plan_grid, goal_raw2)
print("[DEBUG] start_cell2 =", start_cell2)
print("[DEBUG] goal_cell2 =", goal_cell2)
path_cells = astar(plan_grid, start_cell2, goal_cell2)
if not path_cells:
print("[WARN] A* failed for clicked goal.")
return
path_cells = downsample_path(path_cells, stride=6)
path_world_xy = np.array(
[cell_to_world(r, c, map_info) for (r, c) in path_cells],
dtype=np.float32,
)
snapped_goal_world = np.array(
cell_to_world(goal_cell2[0], goal_cell2[1], map_info),
dtype=np.float32,
)
state["goal_xy"] = snapped_goal_world
state["path_world_xy"] = path_world_xy
state["wp_idx"] = 0
print("[INFO] clicked goal world :", goal_xy_world)
print("[INFO] snapped goal world :", snapped_goal_world)
print("[INFO] path waypoints :", len(path_world_xy))
# ============================================================
# click -> ndc -> ray
# ============================================================
def on_screen_click(shape):
try:
payload = shape.gesture_payload
mouse = getattr(payload, "mouse", None)
if mouse is None:
raise RuntimeError("gesture_payload.mouse is None")
if not isinstance(mouse, (list, tuple)) or len(mouse) < 2:
raise RuntimeError(f"unexpected mouse payload: {mouse}")
ndc_x = float(mouse[0])
ndc_y = float(mouse[1])
print("[DEBUG] ndc =", (ndc_x, ndc_y))
ray_origin, ray_dir = scene_view.get_ray_from_ndc(sc.Vector2(ndc_x, ndc_y))
ray_origin = np.array([ray_origin.x, ray_origin.y, ray_origin.z], dtype=np.float32)
ray_dir = np.array([ray_dir.x, ray_dir.y, ray_dir.z], dtype=np.float32)
print("[DEBUG] ray_origin =", ray_origin)
print("[DEBUG] ray_dir =", ray_dir)
if abs(ray_dir[2]) < 1e-6:
print("[WARN] click ray parallel to ground.")
return
t = (GROUND_Z - ray_origin[2]) / ray_dir[2]
if t <= 0.0:
print("[WARN] click ray does not hit forward ground plane.")
return
hit = ray_origin + t * ray_dir
goal_xy_world = np.array([hit[0], hit[1]], dtype=np.float32)
print("[INFO] clicked hit world =", goal_xy_world)
plan_to_clicked_goal(goal_xy_world)
except Exception:
print("[ERROR] click callback exception")
traceback.print_exc()
# ============================================================
# viewport overlay
# ============================================================
viewport_window = get_active_viewport_window()
if viewport_window is None:
raise RuntimeError("No active viewport window found.")
with viewport_window.get_frame("goal_picker_overlay"):
scene_view = sc.SceneView(child_windows_input=False)
viewport_window.viewport_api.add_scene_view(scene_view)
with scene_view.scene:
sc.Screen(
gesture=[
sc.ClickGesture(
on_screen_click,
name="goal_click_gesture",
)
]
)
print("[INFO] click on warehouse floor to set a new goal.")
# ============================================================
# main loop
# ============================================================
step = 0
while simulation_app.is_running():
if state["goal_xy"] is not None and state["path_world_xy"] is not None:
pos, orn = robot.get_world_pose()
pos_xy = np.array([pos[0], pos[1]], dtype=np.float32)
yaw = quat_wxyz_to_yaw(orn)
if np.linalg.norm(state["goal_xy"] - pos_xy) < GOAL_TOL:
action = controller.forward(np.array([0.0, 0.0], dtype=np.float32))
robot.apply_wheel_actions(action)
if step % 60 == 0:
print(f"[STEP {step}] goal reached or within tolerance")
else:
wp_idx, wp_xy = choose_waypoint(
state["path_world_xy"], pos_xy, state["wp_idx"], LOOKAHEAD
)
state["wp_idx"] = wp_idx
cmd = follower(pos_xy, yaw, wp_xy)
action = controller.forward(cmd)
robot.apply_wheel_actions(action)
if step % 60 == 0:
print(
f"[STEP {step}] "
f"pos=({pos_xy[0]:.2f}, {pos_xy[1]:.2f}) "
f"goal=({state['goal_xy'][0]:.2f}, {state['goal_xy'][1]:.2f}) "
f"wp_idx={wp_idx}/{len(state['path_world_xy']) - 1} "
f"wp=({wp_xy[0]:.2f}, {wp_xy[1]:.2f}) "
f"cmd=({cmd[0]:.2f}, {cmd[1]:.2f})"
)
world.step(render=True)
step += 1
except Exception:
print("[ERROR] exception occurred")
traceback.print_exc()
finally:
try:
if scene_view is not None and viewport_window is not None:
scene_view.scene.clear()
viewport_window.viewport_api.remove_scene_view(scene_view)
except Exception:
pass
simulation_app.close()'코딩 및 기타' 카테고리의 다른 글
| [isaac-sim] 8방향 a*알고리즘 (0) | 2026.04.09 |
|---|---|
| [isaac sim] sh파일로 로봇을 맵에 띄워보기 및 A* 알고리즘 (0) | 2026.04.08 |
| [isaac sim] Motion Generation (0) | 2026.04.07 |
| [isaac sim] isaac sim과 ros의 파이썬 버전 관련 (0) | 2026.03.12 |
| [isaac sim] ROS 2 Navigation 2 (0) | 2026.03.11 |