import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
import numpy as np
import cv2
from cv_bridge import CvBridge, CvBridgeError
class DepthCollisionWarningNode(Node):
def __init__(self):
super().__init__('depth_collision_warning_node')
self.subscription = self.create_subscription(
Image,
'/oakd/rgb/preview/depth',
self.listener_callback,
qos_profile=rclpy.qos.QoSProfile(
reliability=rclpy.qos.ReliabilityPolicy.RELIABLE,
durability=rclpy.qos.DurabilityPolicy.VOLATILE,
depth=10
)
)
self.bridge = CvBridge()
self.collision_distance = 2.0 # 2미터 이하에서 경고
# ROI 비율 설정 (화면 중심부 영역)
self.region_top_ratio = 0.3
self.region_bottom_ratio = 0.6
self.region_left_ratio = 0.3
self.region_right_ratio = 0.7
def listener_callback(self, msg):
try:
depth_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='32FC1')
h, w = depth_image.shape
top = int(h * self.region_top_ratio)
bottom = int(h * self.region_bottom_ratio)
left = int(w * self.region_left_ratio)
right = int(w * self.region_right_ratio)
roi = depth_image[top:bottom, left:right]
valid_mask = np.isfinite(roi)
if np.any(valid_mask):
valid_depths = roi[valid_mask]
min_distance = np.min(valid_depths)
if min_distance < self.collision_distance:
# 가장 가까운 픽셀의 좌표 찾기
min_idx = np.argmin(roi + (~valid_mask)*np.inf)
min_y, min_x = np.unravel_index(min_idx, roi.shape)
# 원본 이미지 기준으로 좌표 조정
pixel_x = left + min_x
pixel_y = top + min_y
self.get_logger().warn(
f"⚠️ 충돌 위험! 가장 가까운 물체: {min_distance:.2f}m (픽셀 좌표: x={pixel_x}, y={pixel_y})"
)
except CvBridgeError as e:
self.get_logger().error(f"CvBridge Error: {e}")
def main(args=None):
rclpy.init(args=args)
depth_collision_warning_node = DepthCollisionWarningNode()
rclpy.spin(depth_collision_warning_node)
depth_collision_warning_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
cd ~/collision_ws
colcon build --symlink-install
source install/setup.bash
ros2 run depth_collision_warning collision_warning