#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from rclpy.qos import qos_profile_sensor_data
from mavros_msgs.msg import State # /mavros/state topic에서 올라오는 드론 상태
from sensor_msgs.msg import BatteryState # /mavros/battery 토픽에서 올라오는 배터리 상태
#-----------------
# 네트워크 및 시관 관련
#-----------------
import socket
from datetime import datetime, timedelta
# x값을 [ lo , hi ]구간으로 제한(clamp)하는 함수
def clamp(x, lo, hi):
return max(lo, min(hi, x))
# 현재 시간을 "KST(UTC+9)" 기준으로 YYYYMMDDHHMMSS 문자열로 반환.
def now_kst_yyyymmddhhmmss() -> str:
# 시스템 TZ에 의존하지 않고 UTC+9로 고정 (KST)
return (datetime.utcnow() + timedelta(hours=9)).strftime("%Y%m%d%H%M%S")
class StateBatterySampler(Node):
def __init__(self):
super().__init__('state_battery_sampler')
# -----------------------
# SQL Server connection info
# -----------------------
self.sql_server = "" # "192.192.1.1,8080" "ipAddr , Port "
self.sql_db = ""
self.sql_user = ""
self.sql_password = ""
self.sql_login_timeout_sec = 5
# ---- params ----
self.declare_parameter('cell_count', 3) # 3S 기준
self.declare_parameter('ema_alpha', 0.25)
# 파라미터 값 읽기
self.cell_count = int(self.get_parameter('cell_count').value)
self.ema_alpha = float(self.get_parameter('ema_alpha').value)
# (셀 전압 기준) 대략 OCV(SOC) 테이블
self.soc_curve = [
(4.20, 100),
(4.15, 95),
(4.11, 90),
(4.08, 85),
(4.02, 80),
(3.98, 75),
(3.95, 70),
(3.91, 65),
(4.87, 60),
(3.85, 55),
(3.84, 50),
(3.82, 45),
(3.80, 40),
(3.79, 35),
(3.77, 30),
(3.75, 25),
(3.73, 20),
(3.71, 15),
(3.69, 10),
(3.61, 5),
(3.20, 0),
]
self.ema_percent = None
# ---- per-tick cache (for DB insert) ----
# tick = 5초 주기로 한 사이클
# 하나의 tick 동안 받은 state/battery를 저장했다가, 둘 다 준비되면 db에 1회 insert
self.tick_connected = None # bool
self.tick_armed = None # bool
self.tick_voltage = None # float
self.tick_inserted = False
self.tick_percent = None
# ---- gate flags ----
# 이는 5초(tick)마다 gate를 열어서, 해당 5초동안 /mavros/state에서 1개, /mavros/battery에서 1개만 골라서 db에 한번 insert하는것
# 아래들은 각각 첫 메시지를 처리하면 False로 내려서 추가 메시지를 무시.
# state/batt 둘 다 모였을때만 try_insert_if_ready()가 insert진행
self.allow_state = True # state 메시지 첫 1개만 처리
self.allow_batt = True # battery 메시지 첫 1개만 처리
# ---- subscribers ----
self.sub_state = self.create_subscription(
State, '/mavros/state', self.cb_state, qos_profile_sensor_data
)
self.sub_batt = self.create_subscription(
BatteryState, '/mavros/battery', self.cb_batt, qos_profile_sensor_data
)
# ---- timer ----
# 5초마다 실행
self.timer = self.create_timer(5.0, self.on_timer)
# ---- SQL check ----
self.sql_ok = self.check_sql_connection()
self.get_logger().info(
f"Listening: /mavros/state, /mavros/battery (1 msg each per 5s) | "
f"cell_count={self.cell_count}, ema_alpha={self.ema_alpha}"
)
# -----------------------
# SQL helpers (pytds)
# -----------------------
def _parse_server(self):
if "," in self.sql_server:
host, port_s = self.sql_server.split(",", 1)
return host.strip(), int(port_s.strip())
return self.sql_server.strip(), 1433
def _connect_pytds(self):
try:
import pytds
except ImportError:
self.get_logger().error("[SQL] python-tds not installed. Run: pip3 install python-tds")
return None
host, port = self._parse_server()
try:
conn = pytds.connect(
dsn=host,
port=port,
database=self.sql_db,
user=self.sql_user,
password=self.sql_password,
login_timeout=self.sql_login_timeout_sec,
timeout=5,
autocommit=True,
)
return conn
except Exception as e:
self.get_logger().error(f"[SQL] pytds connect failed: {e}")
return None
def check_sql_connection(self) -> bool:
host, port = self._parse_server()
# TCP 포트 확인(빠른 원인 분리)
try:
with socket.create_connection((host, port), timeout=3):
self.get_logger().info(f"[SQL] TCP OK: {host}:{port}")
except Exception as e:
self.get_logger().error(f"[SQL] TCP FAILED: {host}:{port} ({e})")
return False
conn = self._connect_pytds()
if conn is None:
return False
try:
cur = conn.cursor()
cur.execute("SELECT 1")
cur.fetchone()
cur.close()
conn.close()
self.get_logger().info("[SQL] Connected to SQL Server OK. (pytds)")
return True
except Exception as e:
self.get_logger().error(f"[SQL] Connection FAILED (pytds): {e}")
try:
conn.close()
except Exception:
pass
return False
def insert_drone_status(self, update_time: str, armed: bool, connected: bool, voltage: float, percent: float) -> bool:
if not self.sql_ok:
return False
conn = self._connect_pytds()
if conn is None:
self.sql_ok = False
return False
# pytds는 %s 플레이스홀더를 사용해서 파라미터 바인딩 가능 :contentReference[oaicite:1]{index=1}
sql = """
INSERT INTO dbo.T_DRONE_STATUS (UPDATE_TIME, ARMED, CONNECTED, VOLTAGE, [PERCENT])
VALUES (%s, %s, %s, %s, %s);
"""
try:
with conn.cursor() as cur:
# bool을 그대로 넣어도 되지만, 테이블이 bit/int면 0/1이 더 안전
#cur.execute(sql, (update_time, int(armed), int(connected), float(voltage)))
# cur.execute(sql, (update_time, bool(armed), bool(connected), float(voltage)))
cur.execute(sql, (update_time, "True" if armed else "False",
"True" if connected else "False", float(voltage), float(percent)))
conn.close()
self.get_logger().info(
f"[SQL] INSERT OK: UPDATE_TIME={update_time} ARMED={bool(armed)} CONNECTED={bool(connected)} VOLTAGE={voltage:.3f}"
)
return True
except Exception as e:
self.get_logger().error(f"[SQL] INSERT FAILED: {e}")
try:
conn.close()
except Exception:
pass
# 한번 실패하면 이후도 계속 실패할 수 있으니 플래그 내려둠(원하면 주기적 재시도도 가능)
self.sql_ok = False
return False
# -----------------------
# ROS timers / callbacks
# -----------------------
def on_timer(self):
# 새 틱 시작: 게이트 열고, 틱 캐시 리셋
self.allow_state = True
self.allow_batt = True
self.tick_connected = None
self.tick_armed = None
self.tick_voltage = None
self.tick_inserted = False
self.get_logger().info("----- tick (opened gates for next 1 msg) -----")
def cb_state(self, msg: State):
if not self.allow_state:
return
self.allow_state = False
self.tick_connected = bool(msg.connected)
self.tick_armed = bool(msg.armed)
self.get_logger().info(f"[STATE] connected={msg.connected} armed={msg.armed}")
self.try_insert_if_ready()
def voltage_to_percent(self, v_cell: float) -> float:
curve = self.soc_curve
if v_cell >= curve[0][0]:
return float(curve[0][1])
if v_cell <= curve[-1][0]:
return float(curve[-1][1])
for (v_hi, p_hi), (v_lo, p_lo) in zip(curve[:-1], curve[1:]):
if v_hi >= v_cell >= v_lo:
t = (v_cell - v_lo) / (v_hi - v_lo)
return p_lo + t * (p_hi - p_lo)
return 0.0
def cb_batt(self, msg: BatteryState):
if not self.allow_batt:
return
self.allow_batt = False
v_pack = float(msg.voltage)
self.tick_voltage = v_pack
v_cell = v_pack / float(self.cell_count)
est = self.voltage_to_percent(v_cell)
self.tick_percent = float(est)
if self.ema_percent is None:
self.ema_percent = est
else:
a = clamp(self.ema_alpha, 0.0, 1.0)
self.ema_percent = (1.0 - a) * self.ema_percent + a * est
warn = ""
if v_cell < 3.0:
warn = " [WARN: <3.0V/cell (under-load sag or wrong voltage source/scale)]"
self.get_logger().info(
f"[BATTERY] V={v_pack:.3f}V ({self.cell_count}S, {v_cell:.3f}V/cell) "
f"est={est:5.1f}% ema={self.ema_percent:5.1f}%{warn}"
)
self.try_insert_if_ready()
def try_insert_if_ready(self):
if self.tick_inserted:
return
if self.tick_connected is None or self.tick_armed is None or self.tick_voltage is None or self.tick_percent is None:
return
update_time = now_kst_yyyymmddhhmmss()
ok = self.insert_drone_status(
update_time=update_time,
armed=self.tick_armed,
connected=self.tick_connected,
voltage=self.tick_voltage,
percent=self.tick_percent,
)
if ok:
self.tick_inserted = True
def main():
rclpy.init()
node = StateBatterySampler()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()