군집드론 무력화 시스템 노드간 연결 구조.png

20240526_112808.png

  1. 활용언어
  1. 인터페이스 설계 도구
  1. 통신 프로토콜
  1. 화면 송출 라이브러리

소스 코드(py) <코드 최적화 필요>

import cv2
import rospy
import numpy as np
import torch
import threading
import time
from cv_bridge import CvBridge, CvBridgeError
from geometry_msgs.msg import PoseStamped
from mavros_msgs.msg import State
from mavros_msgs.srv import CommandBool, CommandBoolRequest, SetMode, SetModeRequest
from sensor_msgs.msg import Image
from std_msgs.msg import String
from pynput import keyboard
import tkinter as tk
from tkinter import ttk
from PIL import Image as PILImage
from PIL import ImageTk
import math

class PID:
    def __init__(self, Kp, Ki, Kd, setpoint=0):
        self.Kp = Kp
        self.Ki = Ki
        self.Kd = Kd
        self.setpoint = setpoint
        self.prev_error = 0
        self.integral = 0
        self.prev_time = time.time()

    def compute(self, current_value):
        error = self.setpoint - current_value
        current_time = time.time()
        dt = current_time - self.prev_time

        if dt == 0:
            dt = 1e-16

        self.integral += error * dt
        derivative = (error - self.prev_error) / dt

        output = self.Kp * error + self.Ki * self.integral + self.Kd * derivative

        self.prev_error = error
        self.prev_time = current_time

        return output

class RealTimeObjectDetection(threading.Thread):
    def __init__(self):
        super(RealTimeObjectDetection, self).__init__()
        self.daemon = True
        self.average_bounding_box_size = 0
        self.bounding_box_size = 0
        self.initialized = False
        self.frame_count = 0
        self.start_time = time.time()
        self.bridge = CvBridge()
        weights = './drone_yolov5.pt'
        self.model = torch.hub.load('ultralytics/yolov5', 'custom', path=weights, force_reload=False)
        self.tracker = cv2.TrackerGOTURN_create()
        self.subscriber = rospy.Subscriber("iris1/flow_camera/image_raw", Image, self.callback)
        self.detection_pub = rospy.Publisher("detection_info", String, queue_size=10)
        self.cv_image = None
        self.cv_image_resized = None

    def callback(self, data):
        object_position = [0, 0]
        try:
            self.cv_image = self.bridge.imgmsg_to_cv2(data, "bgr8")
        except CvBridgeError as e:
            rospy.logerr(e)
            return

        self.frame_count += 1
        if not self.initialized or self.frame_count % 30 == 0:
            results = self.model(self.cv_image)
            if len(results.xyxy[0]) > 0:
                bounding_box_sizes = []
                for i in range(len(results.xyxy[0])):
                    x1, y1, x2, y2 = results.xyxy[0][i][:4]
                    width = x2 - x1
                    height = y2 - y1
                    self.bounding_box_size = width * height
                    bounding_box_sizes.append(self.bounding_box_size)
                
                if bounding_box_sizes:
                    self.average_bounding_box_size = sum(bounding_box_sizes) / len(bounding_box_sizes)
                else:
                    self.average_bounding_box_size = 0

                min_x = results.xyxy[0][:, 0].min().item()
                min_y = results.xyxy[0][:, 1].min().item()
                max_x = results.xyxy[0][:, 2].max().item()
                max_y = results.xyxy[0][:, 3].max().item()
                init_bb = (min_x, min_y, max_x - min_x, max_y - min_y)
                self.tracker.init(self.cv_image, tuple(map(int, init_bb)))
                self.initialized = True
            else:
                self.initialized = False
        else:
            success, box = self.tracker.update(self.cv_image)
            if success:
                x, y, w, h = map(int, box)
                cv2.rectangle(self.cv_image, (x, y), (x + w, y + h), (0, 255, 0), 2, 1)
                object_position = (x + w / 2, y + h / 2)
                detection_info = f"{object_position[0]} {object_position[1]} {self.average_bounding_box_size} {w}"
                self.detection_pub.publish(detection_info)
            else:
                self.initialized = False

        cv2.putText(self.cv_image, "x : {:.2f}".format(object_position[0]), (140, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (50, 170, 50), 1)
        cv2.putText(self.cv_image, "y : {:.2f}".format(object_position[1]), (140, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (50, 170, 50), 1)
        cv2.putText(self.cv_image, "avg size : {:.2f}".format(self.average_bounding_box_size), (120, 80), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (50, 170, 50), 1)
        self.cv_image_resized = cv2.resize(self.cv_image, None, fx=2.0, fy=2.0, interpolation=cv2.INTER_LINEAR)
    
    def get_frame(self):
            if self.cv_image_resized is not None:
                return cv2.cvtColor(self.cv_image_resized, cv2.COLOR_BGR2RGB)
            else:
                return np.zeros((480, 640, 3), dtype=np.uint8)  # 기본 검은색 배경

    def run(self):
        rospy.spin()

class DroneController:
    def __init__(self, uav_id):
        self.detection_info = ""
        self.uav_id = uav_id
        self.current_state = State()
        self.pose = PoseStamped()
        self.tracking_enabled = False

        rospy.Subscriber("detection_info", String, self.detection_cb)
        rospy.Subscriber(f"/{self.uav_id}/mavros/state", State, self.state_cb)
        self.local_pos_pub = rospy.Publisher(f"/{self.uav_id}/mavros/setpoint_position/local", PoseStamped, queue_size=10)
        self.arming_client = rospy.ServiceProxy(f"/{self.uav_id}/mavros/cmd/arming", CommandBool)
        self.set_mode_client = rospy.ServiceProxy(f"/{self.uav_id}/mavros/set_mode", SetMode)
        self.rate = rospy.Rate(20)

        keyboard_thread = threading.Thread(target=self.keyboard_listener)
        keyboard_thread.daemon = True
        keyboard_thread.start()

        # PID 컨트롤러 초기화
        self.yaw_pid = PID(0.005, 0.0001, 0.001)  # Kp, Ki, Kd 값은 튜닝 필요
        self.altitude_pid = PID(0.1, 0.01, 0.05)  # Kp, Ki, Kd 값은 튜닝 필요

    def state_cb(self, msg):
        self.current_state = msg

    def detection_cb(self, msg):
        self.detection_info = msg.data

    def on_press(self, key):
        try:
            if key.char == 't':
                self.tracking_enabled = not self.tracking_enabled
                rospy.loginfo(f"Tracking mode: {'Enabled' if self.tracking_enabled else 'Disabled'}")
        except AttributeError:
            pass

    def keyboard_listener(self):
        listener = keyboard.Listener(on_press=self.on_press)
        listener.start()
        listener.join()

    def connect_and_takeoff(self, altitude):
        while not rospy.is_shutdown() and not self.current_state.connected:
            rospy.logwarn(f"Waiting for {self.uav_id} to connect...")
            self.rate.sleep()

        rospy.loginfo(f"{self.uav_id} is connected, proceeding to takeoff.")

        self.pose.pose.position.z = altitude
        for _ in range(100):
            if rospy.is_shutdown():
                break
            self.local_pos_pub.publish(self.pose)
            self.rate.sleep()

        offb_set_mode = SetModeRequest()
        offb_set_mode.custom_mode = 'OFFBOARD'
        arm_cmd = CommandBoolRequest()
        arm_cmd.value = True
        last_req = rospy.Time.now()

        while not rospy.is_shutdown():
            if self.current_state.mode != "OFFBOARD" and (rospy.Time.now() - last_req) > rospy.Duration(5.0):
                if self.set_mode_client.call(offb_set_mode).mode_sent:
                    rospy.loginfo(f"{self.uav_id} OFFBOARD enabled")
                last_req = rospy.Time.now()
            elif not self.current_state.armed and (rospy.Time.now() - last_req) > rospy.Duration(5.0):
                if self.arming_client.call(arm_cmd).success:
                    rospy.loginfo(f"{self.uav_id} Vehicle armed")
                last_req = rospy.Time.now()

            rospy.logdebug(f"{self.uav_id} current mode: {self.current_state.mode}, armed: {self.current_state.armed}")

            self.local_pos_pub.publish(self.pose)
            self.rate.sleep()
            if self.tracking_enabled:
                self.tracking()

    def tracking(self):
        try:
            cx, cy, s, w = map(float, self.detection_info.split())

            # 드론의 yaw를 조정하여 바운딩 박스가 중앙에 위치하게 함
            frame_center_x = 320  # 이미지 중심 (가로 방향)
            error_x = cx - frame_center_x

            yaw_rate = self.yaw_pid.compute(error_x)  # PID 컨트롤러로 yaw 제어
            
            # 드론의 roll, pitch, yaw를 사용한 이동 명령
            self.pose.pose.orientation.z = math.sin(yaw_rate / 2)
            self.pose.pose.orientation.w = math.cos(yaw_rate / 2)

            # Z 축 고도 조정 (크기 기반)
            size_setpoint = 150  # 목표 바운딩 박스 크기
            altitude_error = size_setpoint - s
            z_pose = self.altitude_pid.compute(altitude_error)  # PID 컨트롤러로 고도 제어

            self.pose.pose.position.z += z_pose

            rospy.loginfo("추적 제어: yaw rate={}, 고도 (z좌표) 상승/하강={}".format(yaw_rate, z_pose))
            
            rospy.sleep(0.1)
        except ValueError:
            rospy.loginfo("Invalid detection info received")

    def move_forward(self, distance):
        self.pose.pose.position.x += distance
        rospy.loginfo(f"{self.uav_id} 이동: {distance}m 앞으로")

class FollowerController(DroneController):
    def __init__(self, uav_id, leader_id):
        super().__init__(uav_id)
        self.leader_id = leader_id
        self.tracking_enabled = False
        self.follow_enabled = True  # 리더를 계속 따라갈지 여부 결정

        rospy.Subscriber(f"/{self.leader_id}/mavros/local_position/pose", PoseStamped, self.follow_leader)

    def follow_leader(self, msg):
        if self.follow_enabled:
            # 로깅 추가
            rospy.loginfo(f"Following leader: {self.leader_id} | Leader position: x={msg.pose.position.x}, y={msg.pose.position.y}, z={msg.pose.position.z}")

            self.pose.pose.position.x = msg.pose.position.x
            self.pose.pose.position.y = msg.pose.position.y
            self.pose.pose.position.z = msg.pose.position.z - 10

    def update_pose(self):
        while not rospy.is_shutdown():
            self.local_pos_pub.publish(self.pose)
            self.rate.sleep()

    def disable_following(self):
        self.follow_enabled = False
        rospy.loginfo(f"{self.uav_id} 자폭 드론 출격, 더 이상 리더를 따라가지 않습니다.")

class DroneControlInterface:
    def __init__(self, root, rtod, uav0, uav1):
        self.root = root
        self.uav0 = uav0
        self.uav1 = uav1
        self.rtod = rtod

        self.root.title("Drone Control Interface")
        self.root.geometry("1200x800")

        self.video_frame = ttk.Label(self.root)
        self.video_frame.grid(row=0, column=0, columnspan=4, padx=10, pady=10)

        ttk.Button(self.root, text="추적 시작", command=self.start_tracking).grid(row=1, column=0, padx=10, pady=10)
        ttk.Button(self.root, text="추적 중지", command=self.stop_tracking).grid(row=1, column=1, padx=10, pady=10)
        ttk.Button(self.root, text="자폭 드론 출격", command=self.deploy_kamikaze_drone).grid(row=1, column=2, padx=10, pady=10)
        ttk.Button(self.root, text="재밍 실시", command=self.jamming).grid(row=1, column=3, padx=10, pady=10)
        ttk.Button(self.root, text="리더 드론 복귀", command=self.return_to_home).grid(row=1, column=4, padx=10, pady=10)

        self.update_video()

    def update_video(self):
        frame = self.rtod.get_frame()
        img = PILImage.fromarray(frame)
        imgtk = ImageTk.PhotoImage(image=img)
        self.video_frame.imgtk = imgtk
        self.video_frame.configure(image=imgtk)
        self.root.after(10, self.update_video)

    def start_tracking(self):
        self.uav0.tracking_enabled = True
        rospy.loginfo("추적 시작")

    def stop_tracking(self):
        self.uav0.tracking_enabled = False
        rospy.loginfo("추적 중지")

    def jamming(self):
        rospy.loginfo("재밍 실시")

    def return_to_home(self):
        self.uav0.pose.pose.position.x = 0
        self.uav0.pose.pose.position.y = 0
        self.uav0.pose.pose.position.z = 10
        rospy.loginfo("리더 드론 복귀")

    def deploy_kamikaze_drone(self):
        self.uav1.disable_following()
        self.uav1.move_forward(10)
        rospy.loginfo("자폭 드론 출격: 10m 앞으로 이동")

if __name__ == "__main__":
    rospy.init_node('multi_drone_control', anonymous=True)

    rtod = RealTimeObjectDetection()
    uav0 = DroneController("uav0")
    uav1 = FollowerController("uav1", "uav0")

    def run_drone(drone, altitude):
        drone.connect_and_takeoff(altitude)
        if drone.tracking_enabled:
            while not rospy.is_shutdown():
                drone.tracking()
        else:
            drone.update_pose()

    t0 = threading.Thread(target=run_drone, args=(uav0, 20))
    t1 = threading.Thread(target=run_drone, args=(uav1, 10))
    t0.start()
    t1.start()

    root = tk.Tk()
    interface = DroneControlInterface(root, rtod, uav0, uav1)

    rtod.start()
    root.mainloop()

    t0.join()
    t1.join()

    try:
        rospy.spin()
    except KeyboardInterrupt:
        cv2.destroyAllWindows()

코드 설계

1.png

3.png

423.png