""" RID BLE 仿真程序 - Windows 11 基于 ASTM F3411-22a / GB 42590-2023 协议 通过本机蓝牙以 BLE 5.0 扩展广播发送 RID 数据 依赖: pip install winsdk 运行要求: - Windows 11 / Windows 10 (版本 1903+) - 支持 BLE 5.0 的蓝牙适配器 - 在 Windows 设置 > 隐私与安全性 > 位置 中为应用开启位置权限 - 以管理员权限运行(部分机器需要) 位置来源(按优先级): 1. Windows 位置服务(GPS/WiFi/IP 融合定位) 2. 上次已知位置缓存 3. 默认坐标(北京,仅在完全无法定位时使用) 时间来源: - 位置向量时间戳:UTC 系统时钟,当前小时内 1/10 秒计数 - 系统报文时间戳:Unix UTC,基准 2019-01-01 00:00:00 """ """ 测试GIT """ import struct import asyncio import time import sys import math import argparse from dataclasses import dataclass, field from typing import Optional # ────────────────────────────────────────────────────────── # 1. RID 数据结构定义 # ────────────────────────────────────────────────────────── @dataclass class RIDBasicPacket: """基本ID报文 (Message Type 0x0)""" ua_type: int = 2 # UA类型: 0=未声明,1=固定翼,2=旋翼,3=直升机,4=混合升力,5=多旋翼 id_type: int = 4 # ID类型: 0=无,1=序列号,2=CAA,3=UTM委派,4=特定会话ID(字符串) uasid: str = "" # 产品唯一识别码(最多20字节ASCII) reserved: bytes = field(default_factory=lambda: bytes(3)) @dataclass class RIDPosVecPacket: """位置向量报文 (Message Type 0x1)""" operational_status: int = 0 # 运行状态: 0=未初始化,1=地面待机,2=空中飞行,3=紧急 altitude_type: int = 0 # 高度类型: 0=气压高度,1=几何高度 track_angle_ew: int = 0 # 航迹角EW方向: 0=东,1=西 speed_multiplier: int = 0 # 速度乘数: 0=×0.25m/s,1=×0.75m/s track_angle: int = 0 # 航迹角 0~179,配合EW标志 ground_speed: int = 0 # 地速 (Raw,×0.25 m/s) vertical_speed: int = 0 # 垂直速度(int8,+上升/-下降,×0.5 m/s) latitude: int = 0 # 纬度 (1e-7 度, int32 小端) longitude: int = 0 # 经度 (1e-7 度, int32 小端) pressure_altitude: int = 0 # 气压高度 (int16 小端) geometric_altitude: int = 0 # 几何高度 (int16 小端) altitude_agl: int = 0 # 距地高度 (int16 小端) horiz_accuracy: int = 0 # 水平精度 (4bit) vert_accuracy: int = 0 # 垂直精度 (4bit) speed_acc: int = 0 # 速度精度 (4bit) timestamp: int = 0 # 时间戳 (当前小时内1/10秒数, uint16) timestamp_acc: int = 0 # 时间戳精度 (4bit) reserved: int = 0 @dataclass class RIDRDPacket: """运行描述报文 (Message Type 0x3)""" description_type: int = 0 # 描述类型: 0=文字描述 description: str = "aa" # 描述内容(最多23字节ASCII) @dataclass class RIDSYSPacket: """系统报文 (Message Type 0x4)""" cs_pos_type: int = 1 # 控制站位置类型: 0=未知,1=实时,2=起飞点,3=固定 region_code: int = 2 # 区域: 0=未定义,2=中国 coord_type: int = 0 # 坐标系: 0=WGS84,1=其他 cs_latitude: int = 0 # 控制站纬度 (1e-7 度, int32) cs_longitude: int = 0 # 控制站经度 (1e-7 度, int32) area_count: int = 0 # 运行区域计数 (uint16) area_radius: int = 0 # 运行区域半径 (×10 m) area_alt_upper: int = 0 # 运行区域高度上限 (int16) area_alt_lower: int = 0 # 运行区域高度下限 (int16) ua_run_category: int = 1 # 运行类别: 0=未定义,1=开放,2=特许,3=审定 ua_run_level: int = 0 # UA等级: 0=微型,1=轻型,2=小型,3=其他RID cs_altitude: int = 0 # 控制站高度 (int16) timestamp: int = 0 # 时间戳 (Unix - 2019-01-01, 秒) reserved: int = 0 # ────────────────────────────────────────────────────────── # 2. RID 序列化函数(完全对应 C 代码逻辑) # ────────────────────────────────────────────────────────── def rid_basic_serialize(packet: RIDBasicPacket) -> bytes: """基本ID报文序列化 → 25字节,对应 RIDBasicSerialize()""" buf = bytearray(25) buf[0] = 0x01 # 报头:类型0x0 | 版本0x1 # Bit[7:4]=ID类型, Bit[3:0]=UA类型 buf[1] = ((packet.id_type & 0xF) << 4) | (packet.ua_type & 0xF) # UASID:ASCII编码,不足20字节补零,超出截断 uasid_bytes = packet.uasid.encode('ascii', errors='replace')[:20] for i, b in enumerate(uasid_bytes): buf[2 + i] = b # reserved[3] = 0(已初始化为0) return bytes(buf) def rid_posvec_serialize(packet: RIDPosVecPacket) -> bytes: """位置向量报文序列化 → 25字节,对应 RIDPosVecSerialize()""" buf = bytearray(25) buf[0] = 0x11 # 报头:类型0x1 | 版本0x1 # Flag byte: Bit[0]=SpeedMultiplier, Bit[1]=TrackAngleEW, Bit[2]=AltitudeType, Bit[3]=Rsvd, Bit[7:4]=OperationalStatus flag = (packet.speed_multiplier & 1) | \ ((packet.track_angle_ew & 1) << 1) | \ ((packet.altitude_type & 1) << 2) | \ ((packet.operational_status & 0xF) << 4) buf[1] = flag buf[2] = packet.track_angle & 0xFF buf[3] = packet.ground_speed & 0xFF # VerticalSpeed: int8,用struct pack buf[4] = struct.pack('b', max(-128, min(127, packet.vertical_speed)))[0] # Latitude: int32 小端 lat_bytes = struct.pack(' bytes: """运行描述报文序列化 → 25字节,对应 RIDRDSerialize()""" buf = bytearray(25) buf[0] = 0x31 # 报头:类型0x3 | 版本0x1 buf[1] = packet.description_type & 0xFF desc_bytes = packet.description.encode('ascii', errors='replace')[:23] for i, b in enumerate(desc_bytes): buf[2 + i] = b return bytes(buf) def rid_sys_serialize(packet: RIDSYSPacket) -> bytes: """系统报文序列化 → 25字节,对应 RIDSYSSerialize()""" buf = bytearray(25) buf[0] = 0x41 # 报头:类型0x4 | 版本0x1 # Tag: Bit[1:0]=CSPosType, Bit[4:2]=RegionCode, Bit[6:5]=CoordType, Bit[7]=Rsvd tag = (packet.cs_pos_type & 0x3) | \ ((packet.region_code & 0x7) << 2) | \ ((packet.coord_type & 0x3) << 5) buf[1] = tag # ControlStationLatitude: int32 小端 cs_lat = struct.pack(' bytes: """ 将子报文列表打包为完整 RID Payload。 返回最终序列化字节串,对应 RIDPayloadSerialize() + RIDPacket() """ count = len(subpackets) assert 1 <= count <= 10, "子报文数量须在 1~10 之间" header = 0xF1 # 类型0xF | 版本0x1 sub_len = 0x19 # 25字节固定长度 result = bytearray([header, sub_len, count]) for pkt in subpackets: assert len(pkt) == 25, "每个子报文必须恰好25字节" result.extend(pkt) return bytes(result) # 全局消息计数器(用于 build_ble_ad_data 诊断输出) _diag_msg_counter = 0 def build_ble_ad_data(rid_payload: bytes) -> bytes: """ 将 RID Payload 打包为 BLE AD Data 结构(仅用于诊断/hex dump 显示)。 格式: [AD Length][AD Type=0x16][UUID=0xFFFA LE][AppCode=0x0D][Counter][RID Payload] 符合 ASTM F3411-22a BLE 传输层规范。 """ global _diag_msg_counter uuid_lsb = 0xFA uuid_msb = 0xFF counter = _diag_msg_counter & 0xFF _diag_msg_counter = (_diag_msg_counter + 1) & 0xFF ad_content = bytes([0x16, uuid_lsb, uuid_msb, 0x0D, counter]) + rid_payload ad_length = len(ad_content) return bytes([ad_length]) + ad_content def build_wifi_beacon_vendor_ie(rid_payload: bytes) -> bytes: """ 将 RID Payload 打包为 Wi-Fi Beacon 帧的 Vendor Specific IE (Element ID: 221)。 符合 ASTM F3411-22a Wi-Fi 传输层规范: ASD-STAN OUI (FA:0B:BC), Type 0x0D。 注意: 原生 Windows API 无法直接发送底层 Wi-Fi Beacon 帧(通常需监视模式网卡+Npcap等底层驱动支持)。 此处实现用于模拟组包过程,打印其 Hex 并在后续移植到其他平台(如 Linux/C/Scapy)时参考。 """ global _diag_msg_counter counter = _diag_msg_counter & 0xFF # Element ID: 221 (0xDD) # Length: 从 OUI 计算起,长为 5 字节(OUI 3字节 + Type 1字节 + Counter 1字节) + Payload 长度 length = 5 + len(rid_payload) ie = bytearray([0xDD, length, 0xFA, 0x0B, 0xBC, 0x0D, counter]) ie.extend(rid_payload) return bytes(ie) def rid_packet_full(basic: RIDBasicPacket, pos_vec: RIDPosVecPacket, rd: RIDRDPacket, sys: RIDSYSPacket) -> bytes: """完整打包:Basic + PosVec + RD + SYS,对应 RIDPacket()""" subpackets = [ rid_basic_serialize(basic), rid_posvec_serialize(pos_vec), rid_rd_serialize(rd), rid_sys_serialize(sys), ] return rid_payload_serialize(subpackets) def rid_packet_nano(basic: RIDBasicPacket, pos_vec: RIDPosVecPacket, sys: RIDSYSPacket) -> bytes: """最小包:Basic + PosVec + SYS,对应 RIDPacketNano()""" subpackets = [ rid_basic_serialize(basic), rid_posvec_serialize(pos_vec), rid_sys_serialize(sys), ] return rid_payload_serialize(subpackets) # ────────────────────────────────────────────────────────── # 3. Windows BLE 广播模块 # ────────────────────────────────────────────────────────── def _check_winsdk(): """检查 winsdk 是否可用""" try: import winsdk # noqa: F401 return True except ImportError: return False # ────────────────────────────────────────────────────────── # 3a. Windows 本机定位提供器 # ────────────────────────────────────────────────────────── class WindowsLocationProvider: """ 通过 Windows Runtime Geolocation API 获取本机位置。 支持 GPS / WiFi / 移动网络 / IP 融合定位。 需要在 Windows 设置 > 隐私与安全性 > 位置 中开启位置权限。 """ # 默认回退坐标(北京) FALLBACK_LAT = 39.9042 FALLBACK_LON = 116.4074 def __init__(self): self._last_lat: Optional[float] = None self._last_lon: Optional[float] = None self._last_alt: float = 0.0 self._last_accuracy: float = 9999.0 self._access_granted: bool = False self._locator = None self._available: bool = False self._import_api() def _import_api(self): try: from winsdk.windows.devices.geolocation import ( Geolocator, GeolocationAccessStatus, PositionAccuracy, ) self._Geolocator = Geolocator self._AccessStatus = GeolocationAccessStatus self._PositionAccuracy = PositionAccuracy self._available = True except ImportError as e: print(f"[位置] Geolocation API 不可用: {e}") self._available = False async def request_access_async(self) -> bool: """请求位置访问权限,需要在首次调用时执行一次""" if not self._available: return False try: access = await self._Geolocator.request_access_async() self._access_granted = (access == self._AccessStatus.ALLOWED) if self._access_granted: self._locator = self._Geolocator() self._locator.desired_accuracy = self._PositionAccuracy.HIGH print("[位置] 位置权限已授予,定位精度: HIGH") else: print(f"[位置] 位置权限被拒绝 (状态={access})") print("[位置] 请前往: 设置 > 隐私与安全性 > 位置,开启位置访问权限") return self._access_granted except Exception as e: print(f"[位置] 请求权限失败: {e}") return False async def get_position_async(self) -> Optional[tuple]: """ 异步获取当前位置。 返回: (lat_deg, lon_deg, alt_m, accuracy_m) 或 None(失败时) """ if not self._available or not self._access_granted or self._locator is None: return None try: pos = await self._locator.get_geoposition_async() coord = pos.coordinate lat = float(coord.latitude) lon = float(coord.longitude) # altitude 在部分设备上可能为 None alt = float(coord.altitude) if coord.altitude is not None else 0.0 # accuracy 为圆形误差(米) accuracy = float(coord.accuracy) if coord.accuracy is not None else 9999.0 self._last_lat = lat self._last_lon = lon self._last_alt = alt self._last_accuracy = accuracy return (lat, lon, alt, accuracy) except Exception as e: print(f"[位置] 获取位置失败: {e}") return None @property def last_position(self) -> Optional[tuple]: """上次成功获取的位置 (lat, lon, alt, accuracy),无记录时返回 None""" if self._last_lat is None: return None return (self._last_lat, self._last_lon, self._last_alt, self._last_accuracy) @property def has_fix(self) -> bool: """是否已有定位结果""" return self._last_lat is not None class WindowsBLEAdvertiser: """ 使用 Windows Runtime API 进行 BLE 广播。 工作模式(按优先级自动选择): 1. 扩展广播(BLE 5.0)—— 单帧发送完整 RID Payload(需硬件支持) 2. Legacy 单子帧轮播 —— 每次发送 1 条 25 字节子报文,由调用方周期轮换 MAC 地址稳定策略: - Windows 10/11 的 BLE 广播 API 默认使用 RPA (Resolvable Private Address), 且每次 start() 都会生成新的随机 MAC。 - 即使设置 is_anonymous = False,Windows 仍可能出于隐私保护强制轮换 MAC。 - 唯一能保持 MAC 绝对不变的方法是:**永远不调用 stop(),也不创建新 Publisher**。 - 既然 in-place 更新 DataSections 无效,我们改用 **ManufacturerData** 或 **动态更新 Payload** 的方式, 但 WinRT API 限制极严。 - 最终妥协方案:在 RID 协议中,接收端**不应该依赖 MAC 地址**来关联无人机, 而应该依赖 **UASID (Basic ID 报文)**。只要 UASID 相同,接收端就应视为同一架无人机。 - 为了尽量减少 MAC 变化频率,我们降低更新频率,或者接受 MAC 变化的事实。 """ def __init__(self): self._publisher = None # BluetoothLEAdvertisementPublisher self._adv = None # BluetoothLEAdvertisement self._started = False self._ext_supported = False self._ext_ok = False self._is_extended = False self._msg_counter = 0 # ASTM F3411-22a Message Counter (0~255) self._import_winsdk() def _import_winsdk(self): try: from winsdk.windows.devices.bluetooth.advertisement import ( BluetoothLEAdvertisementPublisher, BluetoothLEAdvertisement, BluetoothLEAdvertisementDataSection, BluetoothLEAdvertisementPublisherStatus, ) from winsdk.windows.storage.streams import DataWriter self._Publisher = BluetoothLEAdvertisementPublisher self._Advertisement = BluetoothLEAdvertisement self._DataSection = BluetoothLEAdvertisementDataSection self._PubStatus = BluetoothLEAdvertisementPublisherStatus self._DataWriter = DataWriter except ImportError as e: print(f"[错误] winsdk 导入失败: {e}") print("请运行: pip install winsdk") sys.exit(1) def _make_data_section(self, ad_type: int, data: bytes): """构造 BLE AD Data Section""" section = self._DataSection() section.data_type = ad_type writer = self._DataWriter() for b in data: writer.write_byte(b) section.data = writer.detach_buffer() return section def _build_service_data(self, raw_data: bytes) -> bytes: """ 构造 ASTM F3411-22a Service Data 字节。 格式: [UUID 0xFFFA LE (2B)][AppCode 0x0D (1B)][MsgCounter (1B)][Data] """ counter = self._msg_counter & 0xFF self._msg_counter = (self._msg_counter + 1) & 0xFF return bytes([0xFA, 0xFF, 0x0D, counter]) + raw_data def _set_ad_payload(self, raw_data: bytes): """在当前 Advertisement 对象上替换 DataSections""" service_data = self._build_service_data(raw_data) self._adv.data_sections.clear() section = self._make_data_section(0x16, service_data) self._adv.data_sections.append(section) # ── 扩展广播(BLE 5.0,完整帧) ──────────────────────── def try_start_extended(self, rid_payload: bytes) -> bool: """ 尝试以扩展广播发送完整 RID Payload。 返回 True = 成功,False = 硬件不支持(回退 Legacy)。 """ self.stop() self._adv = self._Advertisement() self._set_ad_payload(rid_payload) pub = self._Publisher(self._adv) try: pub.use_extended_advertisement = True except AttributeError: self._ext_supported = False return False pub.start() import time as _t _t.sleep(0.3) try: status = pub.status if status == self._PubStatus.STARTED: self._publisher = pub self._started = True self._ext_ok = True self._ext_supported = True self._is_extended = True print(f"[BLE] 扩展广播已启动 (Payload={len(rid_payload)}B)") return True else: pub.stop() print(f"[BLE] 扩展广播启动失败 (status={status}), 回退到 Legacy 模式") self._ext_supported = False return False except Exception as e: try: pub.stop() except Exception: pass print(f"[BLE] 扩展广播检测失败: {e}, 回退到 Legacy 模式") self._ext_supported = False return False def update_extended(self, rid_payload: bytes): """ 更新扩展广播数据。 复用同一个 Publisher 实例,stop → 更新 → start。 注意:Windows 强制每次 start() 都会更换随机 MAC 地址。 """ if self._publisher is None or self._adv is None: self.try_start_extended(rid_payload) return self._publisher.stop() self._set_ad_payload(rid_payload) self._publisher.start() self._started = True # ── Legacy 单子帧广播(兼容所有适配器) ──────────────── def start_legacy(self, subframe: bytes): """ 发送单条子帧报文。subframe 为 25 字节。 复用同一个 Publisher 实例,stop → 更新 → start。 """ if self._publisher is None or self._adv is None: self._adv = self._Advertisement() self._set_ad_payload(subframe) self._publisher = self._Publisher(self._adv) self._publisher.start() else: self._publisher.stop() self._set_ad_payload(subframe) self._publisher.start() self._started = True def stop(self): """停止并释放广播资源""" if self._publisher is not None and self._started: try: self._publisher.stop() except Exception: pass self._publisher = None self._adv = None self._started = False self._is_extended = False @property def is_running(self) -> bool: return self._started @property def is_extended(self) -> bool: return self._ext_ok class WindowsWiFiNANPublisher: """ 使用 Windows Runtime API 进行 Wi-Fi Aware (NAN) 广播。 要求: - Windows 10/11 操作系统 (1903+) - 支持 Wi-Fi Aware/Direct 的网卡与底层驱动 (如较新的 Intel / Qualcomm 无线网卡) 注意: 桌面电脑普通网卡的驱动常常默认关闭或不支持此功能,如果物理硬件不支持,此功能将自动跳过。 """ def __init__(self): self._publisher = None # WiFiAwarePublisher self._session = None # WiFiAwareSession self._started = False self._supported = False self._msg_counter = 0 self._import_winsdk() def _import_winsdk(self): try: from winsdk.windows.devices.wifiaware import ( WiFiAwareManager, WiFiAwarePublishOptions, ) from winsdk.windows.storage.streams import DataWriter self._Manager = WiFiAwareManager self._PubOptions = WiFiAwarePublishOptions self._DataWriter = DataWriter self._supported = True except ImportError: self._supported = False async def start_async(self, rid_payload: bytes) -> bool: if not self._supported: return False try: # 1. 检查和请求 Wi-Fi Aware 全局访问权限 access = await self._Manager.request_access_async() from winsdk.windows.devices.wifiaware import WiFiAwareAccessStatus if access != WiFiAwareAccessStatus.ALLOWED: print(f"[NAN] 无法访问 Wi-Fi Aware API (状态: {access})") return False # 2. 请求激活会话 session_result = await self._Manager.request_session_async() from winsdk.windows.devices.wifiaware import WiFiAwareSessionStatus if session_result.status != WiFiAwareSessionStatus.SUCCEEDED: # 状态不为SUCCEEDED通常意味着硬件、网卡或驱动彻底不支持它 return False self._session = session_result.session return await self._publish_with_payload(rid_payload) except Exception as e: # 对于不支持的主机,常在底层 UWP 抛出异常 print(f"[NAN] Wi-Fi Aware 启动失败 (通常为网卡不支持): {e}") return False async def _publish_with_payload(self, rid_payload: bytes) -> bool: """注册 Wi-Fi Aware Publish 对象""" if not self._session: return False options = self._PubOptions() # ASTM建议的服务专有名称 options.service_name = "org.wi-fi.aware.uas.rid" writer = self._DataWriter() # NAN Service Specific Info 格式参考 ASTM: [0x0D AppCode] [Counter] [RID Payload] counter = self._msg_counter & 0xFF self._msg_counter = (self._msg_counter + 1) & 0xFF writer.write_byte(0x0D) writer.write_byte(counter) for b in rid_payload: writer.write_byte(b) options.service_specific_info = writer.detach_buffer() pub_result = await self._session.publish_async(options) from winsdk.windows.devices.wifiaware import WiFiAwarePublisherStatus if pub_result.status == WiFiAwarePublisherStatus.SUCCEEDED: self._publisher = pub_result.publisher self._started = True return True else: return False async def update_payload_async(self, rid_payload: bytes): """ Windows的 WiFiAwarePublisher 不能直接 '原地 update_buffer'。 要在纯广播模式下更新内容,我们必须停止旧 Publisher,随后重新发起 Publish。 """ if not self._started or not self._session: return # 停用旧的 self.stop_publisher() # 开启新的 await self._publish_with_payload(rid_payload) def stop_publisher(self): if self._publisher is not None: # Python winsdk 中的 UWP 接口销毁 # 我们直接丢弃引用通常就会引起注销,不过底层依赖于 UWP COM self._publisher = None self._started = False def stop(self): self.stop_publisher() self._session = None @property def is_running(self) -> bool: return self._started # ────────────────────────────────────────────────────────── # 4. 辅助函数:物理量编码 # ────────────────────────────────────────────────────────── def encode_ground_speed(speed_mps: float) -> tuple[int, int]: """ 将地速(m/s)编码为 (ground_speed_raw, speed_multiplier) speed_multiplier=0: v = raw * 0.25 m/s,范围 0~63.75 m/s speed_multiplier=1: v = raw * 0.75 m/s,范围 0~191.25 m/s """ if speed_mps <= 63.75: return (round(speed_mps / 0.25) & 0xFF, 0) else: return (round(speed_mps / 0.75) & 0xFF, 1) def encode_vertical_speed(speed_mps: float) -> int: """垂直速度编码为 int8(单位 0.5 m/s,范围 ±63.5 m/s)""" raw = round(speed_mps / 0.5) return max(-128, min(127, raw)) def encode_track_angle(angle_deg: float) -> tuple[int, int]: """ 将航迹角(0~359°)编码为 (track_angle_raw, ew_flag) track_angle_raw = 0~179 ew_flag=0: 0~179° 朝东;ew_flag=1: 180~359° 朝西(raw=angle-180) """ angle_deg %= 360 if angle_deg < 180: return (int(angle_deg), 0) else: return (int(angle_deg - 180), 1) # ASTM F3411-22a 高度编码常量 ALT_UNKNOWN = -1000 # 填入结构体后序列化为 int16(-1000),表示"未知" def encode_altitude(alt_m: Optional[float]) -> int: """ ASTM F3411-22a 高度编码 -> int16。 公式: raw = (alt_m + 1000) * 2 解码: alt_m = raw / 2 - 1000 范围: -1000m (raw=0) ~ +31767m (raw=0xFFFE); raw=0xFFFF 即 int16(-1) 表示"未知"。 传 None 时返回 ALT_UNKNOWN。 """ if alt_m is None: return ALT_UNKNOWN raw = int((alt_m + 1000.0) * 2) raw = max(0, min(0xFFFE, raw)) # 转为 signed int16 用于 struct ' 32767: raw = raw - 65536 return raw def seconds_since_2019() -> int: """计算从 2019-01-01 00:00:00 UTC 至今的秒数(系统报文时间戳用)""" import calendar epoch_2019 = calendar.timegm((2019, 1, 1, 0, 0, 0, 0, 0, 0)) return int(time.time()) - epoch_2019 def current_hour_tenths() -> int: """当前小时内已过去的 1/10 秒数(位置向量报文时间戳用)""" t = time.time() secs_in_hour = t % 3600 return int(secs_in_hour * 10) % 36000 # ────────────────────────────────────────────────────────── # 5. RID 仿真主类 # ────────────────────────────────────────────────────────── class RIDSimulator: """RID 广播仿真器,周期更新位置信息并通过BLE广播""" def __init__(self, uasid: str = "TestDrone123", description: str = "RID Python Simulator"): # 基本ID报文(静态,启动一次) self.basic = RIDBasicPacket( ua_type=2, # 旋翼 id_type=1, # 特定会话ID(字符串) uasid=uasid ) # 运行描述报文(静态) self.rd = RIDRDPacket( description_type=0, description=description ) # 系统报文(控制站信息,初始坐标待首次定位后更新) self.sys = RIDSYSPacket( cs_pos_type=1, # 控制站实时位置 region_code=2, # 中国 coord_type=0, # WGS84 cs_latitude=0, # 由 update_control_station() 填入 cs_longitude=0, ua_run_category=1, # 开放类 ua_run_level=0, # 微型 ) # 位置向量报文(动态更新) self.pos_vec = RIDPosVecPacket() # 广播器 self.advertiser = WindowsBLEAdvertiser() self.nan_publisher = WindowsWiFiNANPublisher() self._running = False def update_data(self, lat_deg: float, lon_deg: float, alt_agl_m: float, ground_speed_mps: float = 0.0, track_angle_deg: float = 0.0, vertical_speed_mps: float = 0.0, pressure_alt_m: Optional[float] = None, geometric_alt_m: Optional[float] = None): """ 更新 RID 数据(仅填充结构体,不触发广播)。 广播由 main loop 按策略驱动。 """ gs_raw, sm = encode_ground_speed(ground_speed_mps) ta_raw, ew = encode_track_angle(track_angle_deg) vs_raw = encode_vertical_speed(vertical_speed_mps) ts_tenths = current_hour_tenths() self.pos_vec.operational_status = 2 # 空中飞行 self.pos_vec.altitude_type = 0 # 气压高度 self.pos_vec.speed_multiplier = sm self.pos_vec.track_angle_ew = ew self.pos_vec.track_angle = ta_raw self.pos_vec.ground_speed = gs_raw self.pos_vec.vertical_speed = vs_raw self.pos_vec.latitude = int(lat_deg * 1e7) self.pos_vec.longitude = int(lon_deg * 1e7) # 高度按 ASTM F3411-22a 编码: raw = (alt_m + 1000) * 2 self.pos_vec.altitude_agl = encode_altitude(alt_agl_m) self.pos_vec.pressure_altitude = encode_altitude(pressure_alt_m) self.pos_vec.geometric_altitude = encode_altitude(geometric_alt_m) self.pos_vec.timestamp = ts_tenths self.sys.timestamp = seconds_since_2019() def serialize_subframes(self) -> list[bytes]: """返回各子报文序列化后的 25 字节列表(用于 Legacy 轮播)""" return [ rid_basic_serialize(self.basic), rid_posvec_serialize(self.pos_vec), rid_rd_serialize(self.rd), rid_sys_serialize(self.sys), ] def serialize_full(self) -> bytes: """返回完整 RID Payload(用于扩展广播)""" return rid_packet_full(self.basic, self.pos_vec, self.rd, self.sys) def update_control_station(self, lat_deg: float, lon_deg: float, alt_m: float = 0.0): """同步更新控制站位置(笔记本 = 操控手位置)""" self.sys.cs_latitude = int(lat_deg * 1e7) self.sys.cs_longitude = int(lon_deg * 1e7) self.sys.cs_altitude = encode_altitude(alt_m) def stop(self): """停止广播""" self._running = False self.advertiser.stop() self.nan_publisher.stop() # ────────────────────────────────────────────────────────── # 6. 调试:打印报文内容 # ────────────────────────────────────────────────────────── def print_packet_hex(payload: bytes, label: str = "RID Payload"): """打印报文十六进制内容""" print(f"\n[{label}] 长度={len(payload)} 字节") for i in range(0, len(payload), 16): chunk = payload[i:i+16] hex_str = ' '.join(f'{b:02X}' for b in chunk) asc_str = ''.join(chr(b) if 32 <= b < 127 else '.' for b in chunk) print(f" {i:04X}: {hex_str:<48} {asc_str}") def print_ad_data(ad_data: bytes, label: str = "BLE AD Data"): """打印 BLE AD Data 十六进制内容""" print_packet_hex(ad_data, label) # ────────────────────────────────────────────────────────── # 7. 主程序(异步,使用本机真实位置和系统时间) # ────────────────────────────────────────────────────────── _SUBFRAME_NAMES = ["BasicID", "PosVec", "RD", "SYS"] async def main_async(enable_ble: bool = True, enable_nan: bool = True): print("=" * 60) print(" RID BLE/NAN 仿真程序 (Windows 11 / ASTM F3411-22a)") print(" 位置: Windows 定位服务 | 时间: UTC 系统时钟") print(" 模式: 圆周飞行 50km/h | 广播 2Hz") print("=" * 60) # 检查依赖 if not _check_winsdk(): print("\n[错误] 缺少依赖库 winsdk,请运行:") print(" pip install winsdk") sys.exit(1) if sys.platform != 'win32': print("[错误] 此程序仅支持 Windows,当前系统:", sys.platform) sys.exit(1) # ── 初始化定位 ────────────────────────────────────────── location = WindowsLocationProvider() print("\n[位置] 正在请求位置权限...") granted = await location.request_access_async() init_lat = WindowsLocationProvider.FALLBACK_LAT init_lon = WindowsLocationProvider.FALLBACK_LON init_alt = 0.0 init_acc = 9999.0 if granted: print("[位置] 正在获取初始坐标(首次定位可能需要数秒)...") pos = await location.get_position_async() if pos: init_lat, init_lon, init_alt, init_acc = pos print(f"[位置] 初始位置: {init_lat:.6f}N, {init_lon:.6f}E " f"高度={init_alt:.1f}m 精度=+/-{init_acc:.0f}m") else: print(f"[位置] 初始定位失败,使用默认坐标 ({init_lat:.4f}N, {init_lon:.4f}E)") else: print(f"[位置] 权限被拒绝,使用默认坐标 ({init_lat:.4f}N, {init_lon:.4f}E)") # ── 圆周飞行参数 ──────────────────────────────────────── FLIGHT_SPEED_KMH = 50.0 # 飞行速度 km/h FLIGHT_SPEED_MPS = FLIGHT_SPEED_KMH / 3.6 # 13.889 m/s FLIGHT_RADIUS_M = 100.0 # 圆周半径 100m FLIGHT_ALT_AGL = 80.0 # 飞行高度 80m AGL CIRCUMFERENCE = 2.0 * math.pi * FLIGHT_RADIUS_M # 628.3m PERIOD = CIRCUMFERENCE / FLIGHT_SPEED_MPS # 一圈耗时 ~45.2s OMEGA = 2.0 * math.pi / PERIOD # 角速度 rad/s # 圆心 = 初始定位坐标 center_lat_rad = math.radians(init_lat) center_lon_rad = math.radians(init_lon) # 1度纬度 ≈ 111320m; 1度经度 ≈ 111320 * cos(lat) m M_PER_DEG_LAT = 111320.0 M_PER_DEG_LON = 111320.0 * math.cos(center_lat_rad) BROADCAST_INTERVAL = 0.5 # 2 Hz 广播 print(f"\n[圆周飞行]") print(f" 圆心 : {init_lat:.6f}N, {init_lon:.6f}E") print(f" 半径 : {FLIGHT_RADIUS_M:.0f}m") print(f" 速度 : {FLIGHT_SPEED_KMH:.0f} km/h ({FLIGHT_SPEED_MPS:.1f} m/s)") print(f" 高度(AGL) : {FLIGHT_ALT_AGL:.0f}m") print(f" 一圈耗时 : {PERIOD:.1f}s") print(f" 广播频率 : {1.0/BROADCAST_INTERVAL:.0f} Hz") # ── 创建仿真器 ────────────────────────────────────────── sim = RIDSimulator( uasid="TestDrone123", description="RID Python Sim", ) sim.update_control_station(init_lat, init_lon, init_alt) # ── 探测广播模式 ───────────────────────────────────────── sim.update_data(lat_deg=init_lat, lon_deg=init_lon, alt_agl_m=FLIGHT_ALT_AGL, ground_speed_mps=FLIGHT_SPEED_MPS) adv = sim.advertiser print("\n[通信层支持探测]") use_ext = False if enable_ble: print(" 1) BLE 5.0 扩展广播...") use_ext = adv.try_start_extended(sim.serialize_full()) if not use_ext: print(" -> 不支持, 降级使用 Legacy 子帧轮播模式") else: print(" -> 支持并已启动") else: print(" 1) BLE 广播 ... [已禁用]") nan_ok = False if enable_nan: print(" 2) Wi-Fi Aware (NAN) 广播...") nan_ok = await sim.nan_publisher.start_async(sim.serialize_full()) if not nan_ok: print(" -> 不支持 (原因: 硬件网卡未开放支持或缺少权限)") else: print(" -> 支持并已启动") else: print(" 2) Wi-Fi Aware (NAN) 广播 ... [已禁用]") print("\n[配置]") print(f" UASID : {sim.basic.uasid}") print(f" UA类型 : {sim.basic.ua_type} (旋翼)") id_type_name = { 0: "无", 1: "序列号", 2: "CAA注册", 3: "UTM委派", 4: "特定会话ID(字符串)", }.get(sim.basic.id_type, "未知") print(f" ID类型 : {sim.basic.id_type} ({id_type_name})") print(f" 描述 : {sim.rd.description}") print(f" 运行类别 : {sim.sys.ua_run_category} (开放)") print(f" UA等级 : {sim.sys.ua_run_level} (微型)") ble_mode_str = "未启用" if enable_ble: ble_mode_str = "5.0 扩展" if use_ext else "Legacy" print(f" 广播模式 : BLE ({ble_mode_str}), Wi-Fi NAN ({'开' if nan_ok else '关'})") print(f" 时间来源 : UTC 系统时钟") print(f" 位置来源 : {'圆周飞行仿真 (圆心=本机定位)' if granted else '圆周飞行仿真 (圆心=默认坐标)'}") print(f"\n[提示] 正在运行中... 按 Ctrl+C 停止广播") # Legacy 模式参数 subframe_idx = 0 n = 0 t_start = time.monotonic() try: while True: # ── 计算圆周飞行位置 ─────────────────────────── t_elapsed = time.monotonic() - t_start theta = OMEGA * t_elapsed # 当前角度 (rad),从正东方向逆时针 # 无人机在圆上的位置偏移 (m) dx = FLIGHT_RADIUS_M * math.cos(theta) # 东向偏移 dy = FLIGHT_RADIUS_M * math.sin(theta) # 北向偏移 # 转换为经纬度 lat = init_lat + dy / M_PER_DEG_LAT lon = init_lon + dx / M_PER_DEG_LON # 航迹角 = 速度方向 = 圆切线方向 # 位置 (cos θ, sin θ), 速度方向 (-sin θ, cos θ) # 航迹角以正北为0°顺时针: heading = 90° - atan2(cos θ, -sin θ) vx = -FLIGHT_SPEED_MPS * math.sin(theta) # 东向速度分量 vy = FLIGHT_SPEED_MPS * math.cos(theta) # 北向速度分量 heading_deg = (math.degrees(math.atan2(vx, vy))) % 360 # atan2(E,N) → 北基准顺时针 # 更新 RID 数据 sim.update_data( lat_deg=lat, lon_deg=lon, alt_agl_m=FLIGHT_ALT_AGL, ground_speed_mps=FLIGHT_SPEED_MPS, track_angle_deg=heading_deg, vertical_speed_mps=0.0, ) # ── 日志输出 ────────────────────────────────── utc_str = time.strftime("%H:%M:%S", time.gmtime()) ts_tenths = current_hour_tenths() mode_tag = "" if enable_ble: mode_tag += "BLE:EXT " if use_ext else f"BLE:L({_SUBFRAME_NAMES[subframe_idx % 4]}) " if nan_ok: mode_tag += "NAN:ON" mode_tag = mode_tag.strip() or "NONE" print(f"[{n:4d}] {utc_str} UTC | " f"{lat:.6f}N {lon:.6f}E " f"Alt={FLIGHT_ALT_AGL:+.1f}m " f"GS={FLIGHT_SPEED_KMH:.0f}km/h " f"HDG={heading_deg:.0f}° " f"[{mode_tag}] ts={ts_tenths}") if n == 0: payload = sim.serialize_full() print_packet_hex(payload, "RID Payload (第1帧详情)") print_packet_hex(build_ble_ad_data(payload), "BLE AD Data (供参考)") print_packet_hex(build_wifi_beacon_vendor_ie(payload), "Wi-Fi Beacon Vendor IE (供参考)") n += 1 # ── 广播更新 ───────────────────────────────── payload_full = sim.serialize_full() if enable_ble: if use_ext: adv.update_extended(payload_full) else: subframes = sim.serialize_subframes() sf = subframes[subframe_idx % len(subframes)] adv.start_legacy(sf) subframe_idx = (subframe_idx + 1) % len(subframes) # 更新 Wi-Fi NAN if enable_nan and sim.nan_publisher.is_running: await sim.nan_publisher.update_payload_async(payload_full) await asyncio.sleep(BROADCAST_INTERVAL) except KeyboardInterrupt: print("\n[用户中断] 正在停止...") finally: adv.stop() print("[完成] RID 仿真已终止") def main(): parser = argparse.ArgumentParser(description="RID BLE/NAN 仿真程序 - Windows 11") parser.add_argument("--disable-ble", action="store_true", help="禁用 BLE (蓝牙) 广播") parser.add_argument("--disable-nan", action="store_true", help="禁用 Wi-Fi Aware (NAN) 广播") args = parser.parse_args() asyncio.run(main_async(enable_ble=not args.disable_ble, enable_nan=not args.disable_nan)) if __name__ == "__main__": main()