ROS 2

[yolo][ros] yolo와 ros연동 시키기

정지홍 2025. 11. 27. 16:49

1. 

  • cd ~/jeong/src
  • ros2 pkg create --build-type ament_python yolo_rt_ros --dependencies rclpy sensor_msgs cv_bridge vision_msgs

 

 

2.

  • cd ~/jeong/src/yolo_rt_ros/yolo_rt_ros
  • vim run_yolo.py
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
import pyrealsense2 as rs
import numpy as np
from ultralytics import YOLO
import torch

class YoloRealSenseNode(Node):
    def __init__(self):
        super().__init__('yolo_realsense_node')

        # ROS 2 Publisher (결과 이미지를 'yolo/result' 토픽으로 쏨)
        self.pub_image = self.create_publisher(Image, 'yolo/result', 10)
        self.br = CvBridge()

        # RealSense 초기화
        self.pipeline = rs.pipeline()
        self.config = rs.config()
        self.config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)

        try:
            self.pipeline.start(self.config)
            self.get_logger().info("RealSense Camera Started")
        except Exception as e:
            self.get_logger().error(f"Camera Error: {e}")

        # YOLO 모델 로드
        self.device = "cuda:0" if torch.cuda.is_available() else "cpu"
        self.model = YOLO("yolov8n.pt")
        self.get_logger().info(f"YOLOv8 Loaded on {self.device}")

        # 0.03초마다(30FPS) 실행되는 타이머
        self.timer = self.create_timer(0.033, self.timer_callback)

    def timer_callback(self):
        frames = self.pipeline.wait_for_frames()
        color_frame = frames.get_color_frame()
        if not color_frame:
            return

        # 이미지 변환 및 추론
        frame = np.asanyarray(color_frame.get_data())
        results = self.model.predict(frame, imgsz=640, device=self.device, verbose=False)

        # 결과 그리기
        annotated_frame = results[0].plot()

        # ROS 메시지로 변환하여 전송
        msg = self.br.cv2_to_imgmsg(annotated_frame, encoding="bgr8")
        self.pub_image.publish(msg)

        # (디버깅용) 로컬 화면에도 띄우기 - 필요 없으면 주석 처리
        # cv2.imshow("YOLO Node View", annotated_frame)
        # cv2.waitKey(1)

def main(args=None):
    rclpy.init(args=args)
    node = YoloRealSenseNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.pipeline.stop()
        node.destroy_node()
        rclpy.shutdown()
        cv2.destroyAllWindows()

if __name__ == '__main__':
    main()

 

 

3.

  • cd ~/jeong/src/yolo_rt_ros
  • vim setup.py
    entry_points={
        'console_scripts': [
            'run_yolo = yolo_rt_ros.run_yolo:main',
        ],
    },

 

 

 

 

4.

  • colcon build --symlink-install --packages-select yolo_rt_ros

 

 

5.

  • export LD_PRELOAD=/usr/lib/aarch64-linux-gnu/libgomp.so.1
  • ros2 run yolo_rt_ros run_yolo

 

 

 

 

 

 

 

 

 


import torch
from ultralytics import YOLO
import cv2

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from std_msgs.msg import String  # 좌표를 문자로 보내기 위해 추가
from cv_bridge import CvBridge
import pyrealsense2 as rs
import numpy as np
import json # 데이터를 예쁘게 포장하기 위해 사용

class YoloRealSenseNode(Node):
    def __init__(self):
        super().__init__('yolo_realsense_node')
        
        # 1. 이미지 보내는 토픽 (사람이 보는 용도)
        self.pub_image = self.create_publisher(Image, 'yolo/image_raw', 10)
        
        # 2. ★ 좌표 데이터 보내는 토픽 (로봇이 쓰는 용도) ★
        self.pub_box = self.create_publisher(String, 'yolo/detections', 10)
        
        self.br = CvBridge()

        # 리얼센스 설정
        self.pipeline = rs.pipeline()
        self.config = rs.config()
        self.config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
        
        try:
            self.pipeline.start(self.config)
            self.get_logger().info("RealSense Camera Started")
        except Exception as e:
            self.get_logger().error(f"Camera Error: {e}")

        # YOLO 모델 로드
        self.device = "cuda:0" if torch.cuda.is_available() else "cpu"
        self.model = YOLO("yolov8n.pt")
        self.get_logger().info(f"YOLOv8 Loaded on {self.device}")

        # 타이머
        self.timer = self.create_timer(0.033, self.timer_callback)

    def timer_callback(self):
        frames = self.pipeline.wait_for_frames()
        color_frame = frames.get_color_frame()
        if not color_frame:
            return

        frame = np.asanyarray(color_frame.get_data())
        
        # 추론
        results = self.model.predict(frame, imgsz=640, device=self.device, verbose=False)
        
        # --- [추가된 부분] 좌표 데이터 추출 및 발행 ---
        detection_list = []
        
        # 감지된 물체가 있다면 반복문 돌리기
        if len(results[0].boxes) > 0:
            for box in results[0].boxes:
                # 좌표 (x1, y1, x2, y2)
                xyxy = box.xyxy[0].cpu().numpy().astype(int).tolist()
                # 클래스 ID (0=사람, etc)
                cls_id = int(box.cls[0])
                # 정확도
                conf = float(box.conf[0])
                # 클래스 이름
                name = results[0].names[cls_id]

                # 데이터 포장
                obj_data = {
                    "class": name,
                    "id": cls_id,
                    "confidence": round(conf, 2),
                    "box": xyxy  # [x_min, y_min, x_max, y_max]
                }
                detection_list.append(obj_data)

        # JSON 문자열로 변환하여 ROS 메시지 발행
        if detection_list:
            msg_str = String()
            msg_str.data = json.dumps(detection_list)
            self.pub_box.publish(msg_str)
            # 로그로도 살짝 보여줌
            # self.get_logger().info(f"Detected: {msg_str.data}")

        # ----------------------------------------

        # 시각화 이미지 발행 (기존 기능)
        annotated_frame = results[0].plot()
        img_msg = self.br.cv2_to_imgmsg(annotated_frame, encoding="bgr8")
        self.pub_image.publish(img_msg)

def main(args=None):
    rclpy.init(args=args)
    node = YoloRealSenseNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.pipeline.stop()
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

'ROS 2' 카테고리의 다른 글

isaac sim ros에서 workspace설정  (0) 2026.01.06
[ynet][ros] ros환경에서 작동  (0) 2025.12.04
ros2 realsense camera image저장  (0) 2025.11.10
Q-RRT* F-RRT* 비교대상으로 정리한것  (0) 2025.08.14
rplidar a3m1 연결  (0) 2025.07.02