1.
- cd ~/jeong/src
- ros2 pkg create --build-type ament_python yolo_rt_ros --dependencies rclpy sensor_msgs cv_bridge vision_msgs
2.
- cd ~/jeong/src/yolo_rt_ros/yolo_rt_ros
- vim run_yolo.py
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
import pyrealsense2 as rs
import numpy as np
from ultralytics import YOLO
import torch
class YoloRealSenseNode(Node):
def __init__(self):
super().__init__('yolo_realsense_node')
# ROS 2 Publisher (결과 이미지를 'yolo/result' 토픽으로 쏨)
self.pub_image = self.create_publisher(Image, 'yolo/result', 10)
self.br = CvBridge()
# RealSense 초기화
self.pipeline = rs.pipeline()
self.config = rs.config()
self.config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
try:
self.pipeline.start(self.config)
self.get_logger().info("RealSense Camera Started")
except Exception as e:
self.get_logger().error(f"Camera Error: {e}")
# YOLO 모델 로드
self.device = "cuda:0" if torch.cuda.is_available() else "cpu"
self.model = YOLO("yolov8n.pt")
self.get_logger().info(f"YOLOv8 Loaded on {self.device}")
# 0.03초마다(30FPS) 실행되는 타이머
self.timer = self.create_timer(0.033, self.timer_callback)
def timer_callback(self):
frames = self.pipeline.wait_for_frames()
color_frame = frames.get_color_frame()
if not color_frame:
return
# 이미지 변환 및 추론
frame = np.asanyarray(color_frame.get_data())
results = self.model.predict(frame, imgsz=640, device=self.device, verbose=False)
# 결과 그리기
annotated_frame = results[0].plot()
# ROS 메시지로 변환하여 전송
msg = self.br.cv2_to_imgmsg(annotated_frame, encoding="bgr8")
self.pub_image.publish(msg)
# (디버깅용) 로컬 화면에도 띄우기 - 필요 없으면 주석 처리
# cv2.imshow("YOLO Node View", annotated_frame)
# cv2.waitKey(1)
def main(args=None):
rclpy.init(args=args)
node = YoloRealSenseNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.pipeline.stop()
node.destroy_node()
rclpy.shutdown()
cv2.destroyAllWindows()
if __name__ == '__main__':
main()
3.
- cd ~/jeong/src/yolo_rt_ros
- vim setup.py
entry_points={
'console_scripts': [
'run_yolo = yolo_rt_ros.run_yolo:main',
],
},

4.
- colcon build --symlink-install --packages-select yolo_rt_ros
5.
- export LD_PRELOAD=/usr/lib/aarch64-linux-gnu/libgomp.so.1
- ros2 run yolo_rt_ros run_yolo
import torch
from ultralytics import YOLO
import cv2
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from std_msgs.msg import String # 좌표를 문자로 보내기 위해 추가
from cv_bridge import CvBridge
import pyrealsense2 as rs
import numpy as np
import json # 데이터를 예쁘게 포장하기 위해 사용
class YoloRealSenseNode(Node):
def __init__(self):
super().__init__('yolo_realsense_node')
# 1. 이미지 보내는 토픽 (사람이 보는 용도)
self.pub_image = self.create_publisher(Image, 'yolo/image_raw', 10)
# 2. ★ 좌표 데이터 보내는 토픽 (로봇이 쓰는 용도) ★
self.pub_box = self.create_publisher(String, 'yolo/detections', 10)
self.br = CvBridge()
# 리얼센스 설정
self.pipeline = rs.pipeline()
self.config = rs.config()
self.config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
try:
self.pipeline.start(self.config)
self.get_logger().info("RealSense Camera Started")
except Exception as e:
self.get_logger().error(f"Camera Error: {e}")
# YOLO 모델 로드
self.device = "cuda:0" if torch.cuda.is_available() else "cpu"
self.model = YOLO("yolov8n.pt")
self.get_logger().info(f"YOLOv8 Loaded on {self.device}")
# 타이머
self.timer = self.create_timer(0.033, self.timer_callback)
def timer_callback(self):
frames = self.pipeline.wait_for_frames()
color_frame = frames.get_color_frame()
if not color_frame:
return
frame = np.asanyarray(color_frame.get_data())
# 추론
results = self.model.predict(frame, imgsz=640, device=self.device, verbose=False)
# --- [추가된 부분] 좌표 데이터 추출 및 발행 ---
detection_list = []
# 감지된 물체가 있다면 반복문 돌리기
if len(results[0].boxes) > 0:
for box in results[0].boxes:
# 좌표 (x1, y1, x2, y2)
xyxy = box.xyxy[0].cpu().numpy().astype(int).tolist()
# 클래스 ID (0=사람, etc)
cls_id = int(box.cls[0])
# 정확도
conf = float(box.conf[0])
# 클래스 이름
name = results[0].names[cls_id]
# 데이터 포장
obj_data = {
"class": name,
"id": cls_id,
"confidence": round(conf, 2),
"box": xyxy # [x_min, y_min, x_max, y_max]
}
detection_list.append(obj_data)
# JSON 문자열로 변환하여 ROS 메시지 발행
if detection_list:
msg_str = String()
msg_str.data = json.dumps(detection_list)
self.pub_box.publish(msg_str)
# 로그로도 살짝 보여줌
# self.get_logger().info(f"Detected: {msg_str.data}")
# ----------------------------------------
# 시각화 이미지 발행 (기존 기능)
annotated_frame = results[0].plot()
img_msg = self.br.cv2_to_imgmsg(annotated_frame, encoding="bgr8")
self.pub_image.publish(img_msg)
def main(args=None):
rclpy.init(args=args)
node = YoloRealSenseNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.pipeline.stop()
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()'ROS 2' 카테고리의 다른 글
| isaac sim ros에서 workspace설정 (0) | 2026.01.06 |
|---|---|
| [ynet][ros] ros환경에서 작동 (0) | 2025.12.04 |
| ros2 realsense camera image저장 (0) | 2025.11.10 |
| Q-RRT* F-RRT* 비교대상으로 정리한것 (0) | 2025.08.14 |
| rplidar a3m1 연결 (0) | 2025.07.02 |