From babae2b754f14cbed30482a9ce2244a2bf559dbc Mon Sep 17 00:00:00 2001 From: liujiahua Date: Sun, 1 Mar 2026 17:15:10 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E6=96=87=E4=BB=B6=E8=87=B3?= =?UTF-8?q?=E3=80=8C/=E3=80=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rid_sim_windows.py | 1114 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1114 insertions(+) create mode 100644 rid_sim_windows.py diff --git a/rid_sim_windows.py b/rid_sim_windows.py new file mode 100644 index 0000000..4efe7e7 --- /dev/null +++ b/rid_sim_windows.py @@ -0,0 +1,1114 @@ +""" +RID BLE 仿真程序 - Windows 11 +基于 ASTM F3411-22a / GB 42590-2023 协议 +通过本机蓝牙以 BLE 5.0 扩展广播发送 RID 数据 + +依赖: + pip install winsdk + +运行要求: + - Windows 11 / Windows 10 (版本 1903+) + - 支持 BLE 5.0 的蓝牙适配器 + - 在 Windows 设置 > 隐私与安全性 > 位置 中为应用开启位置权限 + - 以管理员权限运行(部分机器需要) + +位置来源(按优先级): + 1. Windows 位置服务(GPS/WiFi/IP 融合定位) + 2. 上次已知位置缓存 + 3. 默认坐标(北京,仅在完全无法定位时使用) + +时间来源: + - 位置向量时间戳:UTC 系统时钟,当前小时内 1/10 秒计数 + - 系统报文时间戳:Unix UTC,基准 2019-01-01 00:00:00 +""" + +import struct +import asyncio +import time +import sys +import math +import argparse +from dataclasses import dataclass, field +from typing import Optional + +# ────────────────────────────────────────────────────────── +# 1. RID 数据结构定义 +# ────────────────────────────────────────────────────────── + +@dataclass +class RIDBasicPacket: + """基本ID报文 (Message Type 0x0)""" + ua_type: int = 2 # UA类型: 0=未声明,1=固定翼,2=旋翼,3=直升机,4=混合升力,5=多旋翼 + id_type: int = 4 # ID类型: 0=无,1=序列号,2=CAA,3=UTM委派,4=特定会话ID(字符串) + uasid: str = "" # 产品唯一识别码(最多20字节ASCII) + reserved: bytes = field(default_factory=lambda: bytes(3)) + + +@dataclass +class RIDPosVecPacket: + """位置向量报文 (Message Type 0x1)""" + operational_status: int = 0 # 运行状态: 0=未初始化,1=地面待机,2=空中飞行,3=紧急 + altitude_type: int = 0 # 高度类型: 0=气压高度,1=几何高度 + track_angle_ew: int = 0 # 航迹角EW方向: 0=东,1=西 + speed_multiplier: int = 0 # 速度乘数: 0=×0.25m/s,1=×0.75m/s + track_angle: int = 0 # 航迹角 0~179,配合EW标志 + ground_speed: int = 0 # 地速 (Raw,×0.25 m/s) + vertical_speed: int = 0 # 垂直速度(int8,+上升/-下降,×0.5 m/s) + latitude: int = 0 # 纬度 (1e-7 度, int32 小端) + longitude: int = 0 # 经度 (1e-7 度, int32 小端) + pressure_altitude: int = 0 # 气压高度 (int16 小端) + geometric_altitude: int = 0 # 几何高度 (int16 小端) + altitude_agl: int = 0 # 距地高度 (int16 小端) + horiz_accuracy: int = 0 # 水平精度 (4bit) + vert_accuracy: int = 0 # 垂直精度 (4bit) + speed_acc: int = 0 # 速度精度 (4bit) + timestamp: int = 0 # 时间戳 (当前小时内1/10秒数, uint16) + timestamp_acc: int = 0 # 时间戳精度 (4bit) + reserved: int = 0 + + +@dataclass +class RIDRDPacket: + """运行描述报文 (Message Type 0x3)""" + description_type: int = 0 # 描述类型: 0=文字描述 + description: str = "" # 描述内容(最多23字节ASCII) + + +@dataclass +class RIDSYSPacket: + """系统报文 (Message Type 0x4)""" + cs_pos_type: int = 1 # 控制站位置类型: 0=未知,1=实时,2=起飞点,3=固定 + region_code: int = 2 # 区域: 0=未定义,2=中国 + coord_type: int = 0 # 坐标系: 0=WGS84,1=其他 + cs_latitude: int = 0 # 控制站纬度 (1e-7 度, int32) + cs_longitude: int = 0 # 控制站经度 (1e-7 度, int32) + area_count: int = 0 # 运行区域计数 (uint16) + area_radius: int = 0 # 运行区域半径 (×10 m) + area_alt_upper: int = 0 # 运行区域高度上限 (int16) + area_alt_lower: int = 0 # 运行区域高度下限 (int16) + ua_run_category: int = 1 # 运行类别: 0=未定义,1=开放,2=特许,3=审定 + ua_run_level: int = 0 # UA等级: 0=微型,1=轻型,2=小型,3=其他RID + cs_altitude: int = 0 # 控制站高度 (int16) + timestamp: int = 0 # 时间戳 (Unix - 2019-01-01, 秒) + reserved: int = 0 + + +# ────────────────────────────────────────────────────────── +# 2. RID 序列化函数(完全对应 C 代码逻辑) +# ────────────────────────────────────────────────────────── + +def rid_basic_serialize(packet: RIDBasicPacket) -> bytes: + """基本ID报文序列化 → 25字节,对应 RIDBasicSerialize()""" + buf = bytearray(25) + buf[0] = 0x01 # 报头:类型0x0 | 版本0x1 + # Bit[7:4]=ID类型, Bit[3:0]=UA类型 + buf[1] = ((packet.id_type & 0xF) << 4) | (packet.ua_type & 0xF) + # UASID:ASCII编码,不足20字节补零,超出截断 + uasid_bytes = packet.uasid.encode('ascii', errors='replace')[:20] + for i, b in enumerate(uasid_bytes): + buf[2 + i] = b + # reserved[3] = 0(已初始化为0) + return bytes(buf) + + +def rid_posvec_serialize(packet: RIDPosVecPacket) -> bytes: + """位置向量报文序列化 → 25字节,对应 RIDPosVecSerialize()""" + buf = bytearray(25) + buf[0] = 0x11 # 报头:类型0x1 | 版本0x1 + # Flag byte: Bit[0]=SpeedMultiplier, Bit[1]=TrackAngleEW, Bit[2]=AltitudeType, Bit[3]=Rsvd, Bit[7:4]=OperationalStatus + flag = (packet.speed_multiplier & 1) | \ + ((packet.track_angle_ew & 1) << 1) | \ + ((packet.altitude_type & 1) << 2) | \ + ((packet.operational_status & 0xF) << 4) + buf[1] = flag + buf[2] = packet.track_angle & 0xFF + buf[3] = packet.ground_speed & 0xFF + # VerticalSpeed: int8,用struct pack + buf[4] = struct.pack('b', max(-128, min(127, packet.vertical_speed)))[0] + # Latitude: int32 小端 + lat_bytes = struct.pack(' bytes: + """运行描述报文序列化 → 25字节,对应 RIDRDSerialize()""" + buf = bytearray(25) + buf[0] = 0x31 # 报头:类型0x3 | 版本0x1 + buf[1] = packet.description_type & 0xFF + desc_bytes = packet.description.encode('ascii', errors='replace')[:23] + for i, b in enumerate(desc_bytes): + buf[2 + i] = b + return bytes(buf) + + +def rid_sys_serialize(packet: RIDSYSPacket) -> bytes: + """系统报文序列化 → 25字节,对应 RIDSYSSerialize()""" + buf = bytearray(25) + buf[0] = 0x41 # 报头:类型0x4 | 版本0x1 + # Tag: Bit[1:0]=CSPosType, Bit[4:2]=RegionCode, Bit[6:5]=CoordType, Bit[7]=Rsvd + tag = (packet.cs_pos_type & 0x3) | \ + ((packet.region_code & 0x7) << 2) | \ + ((packet.coord_type & 0x3) << 5) + buf[1] = tag + # ControlStationLatitude: int32 小端 + cs_lat = struct.pack(' bytes: + """ + 将子报文列表打包为完整 RID Payload。 + 返回最终序列化字节串,对应 RIDPayloadSerialize() + RIDPacket() + """ + count = len(subpackets) + assert 1 <= count <= 10, "子报文数量须在 1~10 之间" + header = 0xF1 # 类型0xF | 版本0x1 + sub_len = 0x19 # 25字节固定长度 + result = bytearray([header, sub_len, count]) + for pkt in subpackets: + assert len(pkt) == 25, "每个子报文必须恰好25字节" + result.extend(pkt) + return bytes(result) + + +# 全局消息计数器(用于 build_ble_ad_data 诊断输出) +_diag_msg_counter = 0 + +def build_ble_ad_data(rid_payload: bytes) -> bytes: + """ + 将 RID Payload 打包为 BLE AD Data 结构(仅用于诊断/hex dump 显示)。 + 格式: [AD Length][AD Type=0x16][UUID=0xFFFA LE][AppCode=0x0D][Counter][RID Payload] + 符合 ASTM F3411-22a BLE 传输层规范。 + """ + global _diag_msg_counter + uuid_lsb = 0xFA + uuid_msb = 0xFF + counter = _diag_msg_counter & 0xFF + _diag_msg_counter = (_diag_msg_counter + 1) & 0xFF + ad_content = bytes([0x16, uuid_lsb, uuid_msb, 0x0D, counter]) + rid_payload + ad_length = len(ad_content) + return bytes([ad_length]) + ad_content + + +def build_wifi_beacon_vendor_ie(rid_payload: bytes) -> bytes: + """ + 将 RID Payload 打包为 Wi-Fi Beacon 帧的 Vendor Specific IE (Element ID: 221)。 + 符合 ASTM F3411-22a Wi-Fi 传输层规范: ASD-STAN OUI (FA:0B:BC), Type 0x0D。 + 注意: 原生 Windows API 无法直接发送底层 Wi-Fi Beacon 帧(通常需监视模式网卡+Npcap等底层驱动支持)。 + 此处实现用于模拟组包过程,打印其 Hex 并在后续移植到其他平台(如 Linux/C/Scapy)时参考。 + """ + global _diag_msg_counter + counter = _diag_msg_counter & 0xFF + # Element ID: 221 (0xDD) + # Length: 从 OUI 计算起,长为 5 字节(OUI 3字节 + Type 1字节 + Counter 1字节) + Payload 长度 + length = 5 + len(rid_payload) + ie = bytearray([0xDD, length, 0xFA, 0x0B, 0xBC, 0x0D, counter]) + ie.extend(rid_payload) + return bytes(ie) + + +def rid_packet_full(basic: RIDBasicPacket, + pos_vec: RIDPosVecPacket, + rd: RIDRDPacket, + sys: RIDSYSPacket) -> bytes: + """完整打包:Basic + PosVec + RD + SYS,对应 RIDPacket()""" + subpackets = [ + rid_basic_serialize(basic), + rid_posvec_serialize(pos_vec), + rid_rd_serialize(rd), + rid_sys_serialize(sys), + ] + return rid_payload_serialize(subpackets) + + +def rid_packet_nano(basic: RIDBasicPacket, + pos_vec: RIDPosVecPacket, + sys: RIDSYSPacket) -> bytes: + """最小包:Basic + PosVec + SYS,对应 RIDPacketNano()""" + subpackets = [ + rid_basic_serialize(basic), + rid_posvec_serialize(pos_vec), + rid_sys_serialize(sys), + ] + return rid_payload_serialize(subpackets) + + +# ────────────────────────────────────────────────────────── +# 3. Windows BLE 广播模块 +# ────────────────────────────────────────────────────────── + +def _check_winsdk(): + """检查 winsdk 是否可用""" + try: + import winsdk # noqa: F401 + return True + except ImportError: + return False + + +# ────────────────────────────────────────────────────────── +# 3a. Windows 本机定位提供器 +# ────────────────────────────────────────────────────────── + +class WindowsLocationProvider: + """ + 通过 Windows Runtime Geolocation API 获取本机位置。 + 支持 GPS / WiFi / 移动网络 / IP 融合定位。 + 需要在 Windows 设置 > 隐私与安全性 > 位置 中开启位置权限。 + """ + + # 默认回退坐标(北京) + FALLBACK_LAT = 39.9042 + FALLBACK_LON = 116.4074 + + def __init__(self): + self._last_lat: Optional[float] = None + self._last_lon: Optional[float] = None + self._last_alt: float = 0.0 + self._last_accuracy: float = 9999.0 + self._access_granted: bool = False + self._locator = None + self._available: bool = False + self._import_api() + + def _import_api(self): + try: + from winsdk.windows.devices.geolocation import ( + Geolocator, + GeolocationAccessStatus, + PositionAccuracy, + ) + self._Geolocator = Geolocator + self._AccessStatus = GeolocationAccessStatus + self._PositionAccuracy = PositionAccuracy + self._available = True + except ImportError as e: + print(f"[位置] Geolocation API 不可用: {e}") + self._available = False + + async def request_access_async(self) -> bool: + """请求位置访问权限,需要在首次调用时执行一次""" + if not self._available: + return False + try: + access = await self._Geolocator.request_access_async() + self._access_granted = (access == self._AccessStatus.ALLOWED) + if self._access_granted: + self._locator = self._Geolocator() + self._locator.desired_accuracy = self._PositionAccuracy.HIGH + print("[位置] 位置权限已授予,定位精度: HIGH") + else: + print(f"[位置] 位置权限被拒绝 (状态={access})") + print("[位置] 请前往: 设置 > 隐私与安全性 > 位置,开启位置访问权限") + return self._access_granted + except Exception as e: + print(f"[位置] 请求权限失败: {e}") + return False + + async def get_position_async(self) -> Optional[tuple]: + """ + 异步获取当前位置。 + 返回: (lat_deg, lon_deg, alt_m, accuracy_m) 或 None(失败时) + """ + if not self._available or not self._access_granted or self._locator is None: + return None + try: + pos = await self._locator.get_geoposition_async() + coord = pos.coordinate + lat = float(coord.latitude) + lon = float(coord.longitude) + # altitude 在部分设备上可能为 None + alt = float(coord.altitude) if coord.altitude is not None else 0.0 + # accuracy 为圆形误差(米) + accuracy = float(coord.accuracy) if coord.accuracy is not None else 9999.0 + self._last_lat = lat + self._last_lon = lon + self._last_alt = alt + self._last_accuracy = accuracy + return (lat, lon, alt, accuracy) + except Exception as e: + print(f"[位置] 获取位置失败: {e}") + return None + + @property + def last_position(self) -> Optional[tuple]: + """上次成功获取的位置 (lat, lon, alt, accuracy),无记录时返回 None""" + if self._last_lat is None: + return None + return (self._last_lat, self._last_lon, self._last_alt, self._last_accuracy) + + @property + def has_fix(self) -> bool: + """是否已有定位结果""" + return self._last_lat is not None + + +class WindowsBLEAdvertiser: + """ + 使用 Windows Runtime API 进行 BLE 广播。 + + 工作模式(按优先级自动选择): + 1. 扩展广播(BLE 5.0)—— 单帧发送完整 RID Payload(需硬件支持) + 2. Legacy 单子帧轮播 —— 每次发送 1 条 25 字节子报文,由调用方周期轮换 + + MAC 地址稳定策略: + - Windows 10/11 的 BLE 广播 API 默认使用 RPA (Resolvable Private Address), + 且每次 start() 都会生成新的随机 MAC。 + - 即使设置 is_anonymous = False,Windows 仍可能出于隐私保护强制轮换 MAC。 + - 唯一能保持 MAC 绝对不变的方法是:**永远不调用 stop(),也不创建新 Publisher**。 + - 既然 in-place 更新 DataSections 无效,我们改用 **ManufacturerData** 或 **动态更新 Payload** 的方式, + 但 WinRT API 限制极严。 + - 最终妥协方案:在 RID 协议中,接收端**不应该依赖 MAC 地址**来关联无人机, + 而应该依赖 **UASID (Basic ID 报文)**。只要 UASID 相同,接收端就应视为同一架无人机。 + - 为了尽量减少 MAC 变化频率,我们降低更新频率,或者接受 MAC 变化的事实。 + """ + + def __init__(self): + self._publisher = None # BluetoothLEAdvertisementPublisher + self._adv = None # BluetoothLEAdvertisement + self._started = False + self._ext_supported = False + self._ext_ok = False + self._is_extended = False + self._msg_counter = 0 # ASTM F3411-22a Message Counter (0~255) + self._import_winsdk() + + def _import_winsdk(self): + try: + from winsdk.windows.devices.bluetooth.advertisement import ( + BluetoothLEAdvertisementPublisher, + BluetoothLEAdvertisement, + BluetoothLEAdvertisementDataSection, + BluetoothLEAdvertisementPublisherStatus, + ) + from winsdk.windows.storage.streams import DataWriter + self._Publisher = BluetoothLEAdvertisementPublisher + self._Advertisement = BluetoothLEAdvertisement + self._DataSection = BluetoothLEAdvertisementDataSection + self._PubStatus = BluetoothLEAdvertisementPublisherStatus + self._DataWriter = DataWriter + except ImportError as e: + print(f"[错误] winsdk 导入失败: {e}") + print("请运行: pip install winsdk") + sys.exit(1) + + def _make_data_section(self, ad_type: int, data: bytes): + """构造 BLE AD Data Section""" + section = self._DataSection() + section.data_type = ad_type + writer = self._DataWriter() + for b in data: + writer.write_byte(b) + section.data = writer.detach_buffer() + return section + + def _build_service_data(self, raw_data: bytes) -> bytes: + """ + 构造 ASTM F3411-22a Service Data 字节。 + 格式: [UUID 0xFFFA LE (2B)][AppCode 0x0D (1B)][MsgCounter (1B)][Data] + """ + counter = self._msg_counter & 0xFF + self._msg_counter = (self._msg_counter + 1) & 0xFF + return bytes([0xFA, 0xFF, 0x0D, counter]) + raw_data + + def _set_ad_payload(self, raw_data: bytes): + """在当前 Advertisement 对象上替换 DataSections""" + service_data = self._build_service_data(raw_data) + self._adv.data_sections.clear() + section = self._make_data_section(0x16, service_data) + self._adv.data_sections.append(section) + + # ── 扩展广播(BLE 5.0,完整帧) ──────────────────────── + + def try_start_extended(self, rid_payload: bytes) -> bool: + """ + 尝试以扩展广播发送完整 RID Payload。 + 返回 True = 成功,False = 硬件不支持(回退 Legacy)。 + """ + self.stop() + self._adv = self._Advertisement() + self._set_ad_payload(rid_payload) + + pub = self._Publisher(self._adv) + try: + pub.use_extended_advertisement = True + except AttributeError: + self._ext_supported = False + return False + + pub.start() + import time as _t + _t.sleep(0.3) + try: + status = pub.status + if status == self._PubStatus.STARTED: + self._publisher = pub + self._started = True + self._ext_ok = True + self._ext_supported = True + self._is_extended = True + print(f"[BLE] 扩展广播已启动 (Payload={len(rid_payload)}B)") + return True + else: + pub.stop() + print(f"[BLE] 扩展广播启动失败 (status={status}), 回退到 Legacy 模式") + self._ext_supported = False + return False + except Exception as e: + try: + pub.stop() + except Exception: + pass + print(f"[BLE] 扩展广播检测失败: {e}, 回退到 Legacy 模式") + self._ext_supported = False + return False + + def update_extended(self, rid_payload: bytes): + """ + 更新扩展广播数据。 + 复用同一个 Publisher 实例,stop → 更新 → start。 + 注意:Windows 强制每次 start() 都会更换随机 MAC 地址。 + """ + if self._publisher is None or self._adv is None: + self.try_start_extended(rid_payload) + return + + self._publisher.stop() + self._set_ad_payload(rid_payload) + self._publisher.start() + self._started = True + + # ── Legacy 单子帧广播(兼容所有适配器) ──────────────── + + def start_legacy(self, subframe: bytes): + """ + 发送单条子帧报文。subframe 为 25 字节。 + 复用同一个 Publisher 实例,stop → 更新 → start。 + """ + if self._publisher is None or self._adv is None: + self._adv = self._Advertisement() + self._set_ad_payload(subframe) + self._publisher = self._Publisher(self._adv) + self._publisher.start() + else: + self._publisher.stop() + self._set_ad_payload(subframe) + self._publisher.start() + self._started = True + + def stop(self): + """停止并释放广播资源""" + if self._publisher is not None and self._started: + try: + self._publisher.stop() + except Exception: + pass + self._publisher = None + self._adv = None + self._started = False + self._is_extended = False + + @property + def is_running(self) -> bool: + return self._started + + @property + def is_extended(self) -> bool: + return self._ext_ok + + +class WindowsWiFiNANPublisher: + """ + 使用 Windows Runtime API 进行 Wi-Fi Aware (NAN) 广播。 + 要求: + - Windows 10/11 操作系统 (1903+) + - 支持 Wi-Fi Aware/Direct 的网卡与底层驱动 (如较新的 Intel / Qualcomm 无线网卡) + 注意: 桌面电脑普通网卡的驱动常常默认关闭或不支持此功能,如果物理硬件不支持,此功能将自动跳过。 + """ + def __init__(self): + self._publisher = None # WiFiAwarePublisher + self._session = None # WiFiAwareSession + self._started = False + self._supported = False + self._msg_counter = 0 + self._import_winsdk() + + def _import_winsdk(self): + try: + from winsdk.windows.devices.wifiaware import ( + WiFiAwareManager, + WiFiAwarePublishOptions, + ) + from winsdk.windows.storage.streams import DataWriter + self._Manager = WiFiAwareManager + self._PubOptions = WiFiAwarePublishOptions + self._DataWriter = DataWriter + self._supported = True + except ImportError: + self._supported = False + + async def start_async(self, rid_payload: bytes) -> bool: + if not self._supported: + return False + + try: + # 1. 检查和请求 Wi-Fi Aware 全局访问权限 + access = await self._Manager.request_access_async() + from winsdk.windows.devices.wifiaware import WiFiAwareAccessStatus + if access != WiFiAwareAccessStatus.ALLOWED: + print(f"[NAN] 无法访问 Wi-Fi Aware API (状态: {access})") + return False + + # 2. 请求激活会话 + session_result = await self._Manager.request_session_async() + from winsdk.windows.devices.wifiaware import WiFiAwareSessionStatus + if session_result.status != WiFiAwareSessionStatus.SUCCEEDED: + # 状态不为SUCCEEDED通常意味着硬件、网卡或驱动彻底不支持它 + return False + self._session = session_result.session + + return await self._publish_with_payload(rid_payload) + + except Exception as e: + # 对于不支持的主机,常在底层 UWP 抛出异常 + print(f"[NAN] Wi-Fi Aware 启动失败 (通常为网卡不支持): {e}") + return False + + async def _publish_with_payload(self, rid_payload: bytes) -> bool: + """注册 Wi-Fi Aware Publish 对象""" + if not self._session: + return False + + options = self._PubOptions() + # ASTM建议的服务专有名称 + options.service_name = "org.wi-fi.aware.uas.rid" + + writer = self._DataWriter() + # NAN Service Specific Info 格式参考 ASTM: [0x0D AppCode] [Counter] [RID Payload] + counter = self._msg_counter & 0xFF + self._msg_counter = (self._msg_counter + 1) & 0xFF + + writer.write_byte(0x0D) + writer.write_byte(counter) + for b in rid_payload: + writer.write_byte(b) + + options.service_specific_info = writer.detach_buffer() + + pub_result = await self._session.publish_async(options) + from winsdk.windows.devices.wifiaware import WiFiAwarePublisherStatus + if pub_result.status == WiFiAwarePublisherStatus.SUCCEEDED: + self._publisher = pub_result.publisher + self._started = True + return True + else: + return False + + async def update_payload_async(self, rid_payload: bytes): + """ + Windows的 WiFiAwarePublisher 不能直接 '原地 update_buffer'。 + 要在纯广播模式下更新内容,我们必须停止旧 Publisher,随后重新发起 Publish。 + """ + if not self._started or not self._session: + return + # 停用旧的 + self.stop_publisher() + # 开启新的 + await self._publish_with_payload(rid_payload) + + def stop_publisher(self): + if self._publisher is not None: + # Python winsdk 中的 UWP 接口销毁 + # 我们直接丢弃引用通常就会引起注销,不过底层依赖于 UWP COM + self._publisher = None + self._started = False + + def stop(self): + self.stop_publisher() + self._session = None + + @property + def is_running(self) -> bool: + return self._started + + +# ────────────────────────────────────────────────────────── +# 4. 辅助函数:物理量编码 +# ────────────────────────────────────────────────────────── + +def encode_ground_speed(speed_mps: float) -> tuple[int, int]: + """ + 将地速(m/s)编码为 (ground_speed_raw, speed_multiplier) + speed_multiplier=0: v = raw * 0.25 m/s,范围 0~63.75 m/s + speed_multiplier=1: v = raw * 0.75 m/s,范围 0~191.25 m/s + """ + if speed_mps <= 63.75: + return (round(speed_mps / 0.25) & 0xFF, 0) + else: + return (round(speed_mps / 0.75) & 0xFF, 1) + + +def encode_vertical_speed(speed_mps: float) -> int: + """垂直速度编码为 int8(单位 0.5 m/s,范围 ±63.5 m/s)""" + raw = round(speed_mps / 0.5) + return max(-128, min(127, raw)) + + +def encode_track_angle(angle_deg: float) -> tuple[int, int]: + """ + 将航迹角(0~359°)编码为 (track_angle_raw, ew_flag) + track_angle_raw = 0~179 + ew_flag=0: 0~179° 朝东;ew_flag=1: 180~359° 朝西(raw=angle-180) + """ + angle_deg %= 360 + if angle_deg < 180: + return (int(angle_deg), 0) + else: + return (int(angle_deg - 180), 1) + + +# ASTM F3411-22a 高度编码常量 +ALT_UNKNOWN = -1000 # 填入结构体后序列化为 int16(-1000),表示"未知" + +def encode_altitude(alt_m: Optional[float]) -> int: + """ + ASTM F3411-22a 高度编码 -> int16。 + 公式: raw = (alt_m + 1000) * 2 + 解码: alt_m = raw / 2 - 1000 + 范围: -1000m (raw=0) ~ +31767m (raw=0xFFFE); + raw=0xFFFF 即 int16(-1) 表示"未知"。 + 传 None 时返回 ALT_UNKNOWN。 + """ + if alt_m is None: + return ALT_UNKNOWN + raw = int((alt_m + 1000.0) * 2) + raw = max(0, min(0xFFFE, raw)) + # 转为 signed int16 用于 struct ' 32767: + raw = raw - 65536 + return raw + + +def seconds_since_2019() -> int: + """计算从 2019-01-01 00:00:00 UTC 至今的秒数(系统报文时间戳用)""" + import calendar + epoch_2019 = calendar.timegm((2019, 1, 1, 0, 0, 0, 0, 0, 0)) + return int(time.time()) - epoch_2019 + + +def current_hour_tenths() -> int: + """当前小时内已过去的 1/10 秒数(位置向量报文时间戳用)""" + t = time.time() + secs_in_hour = t % 3600 + return int(secs_in_hour * 10) % 36000 + + +# ────────────────────────────────────────────────────────── +# 5. RID 仿真主类 +# ────────────────────────────────────────────────────────── + +class RIDSimulator: + """RID 广播仿真器,周期更新位置信息并通过BLE广播""" + + def __init__(self, + uasid: str = "TestDrone123", + description: str = "RID Python Simulator"): + # 基本ID报文(静态,启动一次) + self.basic = RIDBasicPacket( + ua_type=2, # 旋翼 + id_type=1, # 特定会话ID(字符串) + uasid=uasid + ) + # 运行描述报文(静态) + self.rd = RIDRDPacket( + description_type=0, + description=description + ) + # 系统报文(控制站信息,初始坐标待首次定位后更新) + self.sys = RIDSYSPacket( + cs_pos_type=1, # 控制站实时位置 + region_code=2, # 中国 + coord_type=0, # WGS84 + cs_latitude=0, # 由 update_control_station() 填入 + cs_longitude=0, + ua_run_category=1, # 开放类 + ua_run_level=0, # 微型 + ) + # 位置向量报文(动态更新) + self.pos_vec = RIDPosVecPacket() + + # 广播器 + self.advertiser = WindowsBLEAdvertiser() + self.nan_publisher = WindowsWiFiNANPublisher() + self._running = False + + def update_data(self, + lat_deg: float, + lon_deg: float, + alt_agl_m: float, + ground_speed_mps: float = 0.0, + track_angle_deg: float = 0.0, + vertical_speed_mps: float = 0.0, + pressure_alt_m: Optional[float] = None, + geometric_alt_m: Optional[float] = None): + """ + 更新 RID 数据(仅填充结构体,不触发广播)。 + 广播由 main loop 按策略驱动。 + """ + gs_raw, sm = encode_ground_speed(ground_speed_mps) + ta_raw, ew = encode_track_angle(track_angle_deg) + vs_raw = encode_vertical_speed(vertical_speed_mps) + ts_tenths = current_hour_tenths() + + self.pos_vec.operational_status = 2 # 空中飞行 + self.pos_vec.altitude_type = 0 # 气压高度 + self.pos_vec.speed_multiplier = sm + self.pos_vec.track_angle_ew = ew + self.pos_vec.track_angle = ta_raw + self.pos_vec.ground_speed = gs_raw + self.pos_vec.vertical_speed = vs_raw + self.pos_vec.latitude = int(lat_deg * 1e7) + self.pos_vec.longitude = int(lon_deg * 1e7) + # 高度按 ASTM F3411-22a 编码: raw = (alt_m + 1000) * 2 + self.pos_vec.altitude_agl = encode_altitude(alt_agl_m) + self.pos_vec.pressure_altitude = encode_altitude(pressure_alt_m) + self.pos_vec.geometric_altitude = encode_altitude(geometric_alt_m) + self.pos_vec.timestamp = ts_tenths + + self.sys.timestamp = seconds_since_2019() + + def serialize_subframes(self) -> list[bytes]: + """返回各子报文序列化后的 25 字节列表(用于 Legacy 轮播)""" + return [ + rid_basic_serialize(self.basic), + rid_posvec_serialize(self.pos_vec), + rid_rd_serialize(self.rd), + rid_sys_serialize(self.sys), + ] + + def serialize_full(self) -> bytes: + """返回完整 RID Payload(用于扩展广播)""" + return rid_packet_full(self.basic, self.pos_vec, self.rd, self.sys) + + def update_control_station(self, lat_deg: float, lon_deg: float, alt_m: float = 0.0): + """同步更新控制站位置(笔记本 = 操控手位置)""" + self.sys.cs_latitude = int(lat_deg * 1e7) + self.sys.cs_longitude = int(lon_deg * 1e7) + self.sys.cs_altitude = encode_altitude(alt_m) + + def stop(self): + """停止广播""" + self._running = False + self.advertiser.stop() + self.nan_publisher.stop() + + +# ────────────────────────────────────────────────────────── +# 6. 调试:打印报文内容 +# ────────────────────────────────────────────────────────── + +def print_packet_hex(payload: bytes, label: str = "RID Payload"): + """打印报文十六进制内容""" + print(f"\n[{label}] 长度={len(payload)} 字节") + for i in range(0, len(payload), 16): + chunk = payload[i:i+16] + hex_str = ' '.join(f'{b:02X}' for b in chunk) + asc_str = ''.join(chr(b) if 32 <= b < 127 else '.' for b in chunk) + print(f" {i:04X}: {hex_str:<48} {asc_str}") + + +def print_ad_data(ad_data: bytes, label: str = "BLE AD Data"): + """打印 BLE AD Data 十六进制内容""" + print_packet_hex(ad_data, label) + + +# ────────────────────────────────────────────────────────── +# 7. 主程序(异步,使用本机真实位置和系统时间) +# ────────────────────────────────────────────────────────── + +_SUBFRAME_NAMES = ["BasicID", "PosVec", "RD", "SYS"] + + +async def main_async(enable_ble: bool = True, enable_nan: bool = True): + print("=" * 60) + print(" RID BLE/NAN 仿真程序 (Windows 11 / ASTM F3411-22a)") + print(" 位置: Windows 定位服务 | 时间: UTC 系统时钟") + print(" 模式: 圆周飞行 50km/h | 广播 2Hz") + print("=" * 60) + + # 检查依赖 + if not _check_winsdk(): + print("\n[错误] 缺少依赖库 winsdk,请运行:") + print(" pip install winsdk") + sys.exit(1) + + if sys.platform != 'win32': + print("[错误] 此程序仅支持 Windows,当前系统:", sys.platform) + sys.exit(1) + + # ── 初始化定位 ────────────────────────────────────────── + location = WindowsLocationProvider() + print("\n[位置] 正在请求位置权限...") + granted = await location.request_access_async() + + init_lat = WindowsLocationProvider.FALLBACK_LAT + init_lon = WindowsLocationProvider.FALLBACK_LON + init_alt = 0.0 + init_acc = 9999.0 + + if granted: + print("[位置] 正在获取初始坐标(首次定位可能需要数秒)...") + pos = await location.get_position_async() + if pos: + init_lat, init_lon, init_alt, init_acc = pos + print(f"[位置] 初始位置: {init_lat:.6f}N, {init_lon:.6f}E " + f"高度={init_alt:.1f}m 精度=+/-{init_acc:.0f}m") + else: + print(f"[位置] 初始定位失败,使用默认坐标 ({init_lat:.4f}N, {init_lon:.4f}E)") + else: + print(f"[位置] 权限被拒绝,使用默认坐标 ({init_lat:.4f}N, {init_lon:.4f}E)") + + # ── 圆周飞行参数 ──────────────────────────────────────── + FLIGHT_SPEED_KMH = 50.0 # 飞行速度 km/h + FLIGHT_SPEED_MPS = FLIGHT_SPEED_KMH / 3.6 # 13.889 m/s + FLIGHT_RADIUS_M = 100.0 # 圆周半径 100m + FLIGHT_ALT_AGL = 80.0 # 飞行高度 80m AGL + CIRCUMFERENCE = 2.0 * math.pi * FLIGHT_RADIUS_M # 628.3m + PERIOD = CIRCUMFERENCE / FLIGHT_SPEED_MPS # 一圈耗时 ~45.2s + OMEGA = 2.0 * math.pi / PERIOD # 角速度 rad/s + + # 圆心 = 初始定位坐标 + center_lat_rad = math.radians(init_lat) + center_lon_rad = math.radians(init_lon) + # 1度纬度 ≈ 111320m; 1度经度 ≈ 111320 * cos(lat) m + M_PER_DEG_LAT = 111320.0 + M_PER_DEG_LON = 111320.0 * math.cos(center_lat_rad) + + BROADCAST_INTERVAL = 0.5 # 2 Hz 广播 + + print(f"\n[圆周飞行]") + print(f" 圆心 : {init_lat:.6f}N, {init_lon:.6f}E") + print(f" 半径 : {FLIGHT_RADIUS_M:.0f}m") + print(f" 速度 : {FLIGHT_SPEED_KMH:.0f} km/h ({FLIGHT_SPEED_MPS:.1f} m/s)") + print(f" 高度(AGL) : {FLIGHT_ALT_AGL:.0f}m") + print(f" 一圈耗时 : {PERIOD:.1f}s") + print(f" 广播频率 : {1.0/BROADCAST_INTERVAL:.0f} Hz") + + # ── 创建仿真器 ────────────────────────────────────────── + sim = RIDSimulator( + uasid="TestDrone123", + description="RID Python Sim", + ) + sim.update_control_station(init_lat, init_lon, init_alt) + + # ── 探测广播模式 ───────────────────────────────────────── + sim.update_data(lat_deg=init_lat, lon_deg=init_lon, alt_agl_m=FLIGHT_ALT_AGL, + ground_speed_mps=FLIGHT_SPEED_MPS) + adv = sim.advertiser + + print("\n[通信层支持探测]") + + use_ext = False + if enable_ble: + print(" 1) BLE 5.0 扩展广播...") + use_ext = adv.try_start_extended(sim.serialize_full()) + if not use_ext: + print(" -> 不支持, 降级使用 Legacy 子帧轮播模式") + else: + print(" -> 支持并已启动") + else: + print(" 1) BLE 广播 ... [已禁用]") + + nan_ok = False + if enable_nan: + print(" 2) Wi-Fi Aware (NAN) 广播...") + nan_ok = await sim.nan_publisher.start_async(sim.serialize_full()) + if not nan_ok: + print(" -> 不支持 (原因: 硬件网卡未开放支持或缺少权限)") + else: + print(" -> 支持并已启动") + else: + print(" 2) Wi-Fi Aware (NAN) 广播 ... [已禁用]") + + print("\n[配置]") + print(f" UASID : {sim.basic.uasid}") + print(f" UA类型 : {sim.basic.ua_type} (旋翼)") + id_type_name = { + 0: "无", + 1: "序列号", + 2: "CAA注册", + 3: "UTM委派", + 4: "特定会话ID(字符串)", + }.get(sim.basic.id_type, "未知") + print(f" ID类型 : {sim.basic.id_type} ({id_type_name})") + print(f" 描述 : {sim.rd.description}") + print(f" 运行类别 : {sim.sys.ua_run_category} (开放)") + print(f" UA等级 : {sim.sys.ua_run_level} (微型)") + + ble_mode_str = "未启用" + if enable_ble: + ble_mode_str = "5.0 扩展" if use_ext else "Legacy" + print(f" 广播模式 : BLE ({ble_mode_str}), Wi-Fi NAN ({'开' if nan_ok else '关'})") + + print(f" 时间来源 : UTC 系统时钟") + print(f" 位置来源 : {'圆周飞行仿真 (圆心=本机定位)' if granted else '圆周飞行仿真 (圆心=默认坐标)'}") + print(f"\n[提示] 正在运行中... 按 Ctrl+C 停止广播") + + # Legacy 模式参数 + subframe_idx = 0 + n = 0 + t_start = time.monotonic() + + try: + while True: + # ── 计算圆周飞行位置 ─────────────────────────── + t_elapsed = time.monotonic() - t_start + theta = OMEGA * t_elapsed # 当前角度 (rad),从正东方向逆时针 + + # 无人机在圆上的位置偏移 (m) + dx = FLIGHT_RADIUS_M * math.cos(theta) # 东向偏移 + dy = FLIGHT_RADIUS_M * math.sin(theta) # 北向偏移 + + # 转换为经纬度 + lat = init_lat + dy / M_PER_DEG_LAT + lon = init_lon + dx / M_PER_DEG_LON + + # 航迹角 = 速度方向 = 圆切线方向 + # 位置 (cos θ, sin θ), 速度方向 (-sin θ, cos θ) + # 航迹角以正北为0°顺时针: heading = 90° - atan2(cos θ, -sin θ) + vx = -FLIGHT_SPEED_MPS * math.sin(theta) # 东向速度分量 + vy = FLIGHT_SPEED_MPS * math.cos(theta) # 北向速度分量 + heading_deg = (math.degrees(math.atan2(vx, vy))) % 360 # atan2(E,N) → 北基准顺时针 + + # 更新 RID 数据 + sim.update_data( + lat_deg=lat, lon_deg=lon, + alt_agl_m=FLIGHT_ALT_AGL, + ground_speed_mps=FLIGHT_SPEED_MPS, + track_angle_deg=heading_deg, + vertical_speed_mps=0.0, + ) + + # ── 日志输出 ────────────────────────────────── + utc_str = time.strftime("%H:%M:%S", time.gmtime()) + ts_tenths = current_hour_tenths() + + mode_tag = "" + if enable_ble: + mode_tag += "BLE:EXT " if use_ext else f"BLE:L({_SUBFRAME_NAMES[subframe_idx % 4]}) " + if nan_ok: + mode_tag += "NAN:ON" + mode_tag = mode_tag.strip() or "NONE" + + print(f"[{n:4d}] {utc_str} UTC | " + f"{lat:.6f}N {lon:.6f}E " + f"Alt={FLIGHT_ALT_AGL:+.1f}m " + f"GS={FLIGHT_SPEED_KMH:.0f}km/h " + f"HDG={heading_deg:.0f}° " + f"[{mode_tag}] ts={ts_tenths}") + + if n == 0: + payload = sim.serialize_full() + print_packet_hex(payload, "RID Payload (第1帧详情)") + print_packet_hex(build_ble_ad_data(payload), "BLE AD Data (供参考)") + print_packet_hex(build_wifi_beacon_vendor_ie(payload), "Wi-Fi Beacon Vendor IE (供参考)") + n += 1 + + # ── 广播更新 ───────────────────────────────── + payload_full = sim.serialize_full() + + if enable_ble: + if use_ext: + adv.update_extended(payload_full) + else: + subframes = sim.serialize_subframes() + sf = subframes[subframe_idx % len(subframes)] + adv.start_legacy(sf) + subframe_idx = (subframe_idx + 1) % len(subframes) + + # 更新 Wi-Fi NAN + if enable_nan and sim.nan_publisher.is_running: + await sim.nan_publisher.update_payload_async(payload_full) + + await asyncio.sleep(BROADCAST_INTERVAL) + + except KeyboardInterrupt: + print("\n[用户中断] 正在停止...") + finally: + adv.stop() + print("[完成] RID 仿真已终止") + + +def main(): + parser = argparse.ArgumentParser(description="RID BLE/NAN 仿真程序 - Windows 11") + parser.add_argument("--disable-ble", action="store_true", help="禁用 BLE (蓝牙) 广播") + parser.add_argument("--disable-nan", action="store_true", help="禁用 Wi-Fi Aware (NAN) 广播") + args = parser.parse_args() + + asyncio.run(main_async(enable_ble=not args.disable_ble, enable_nan=not args.disable_nan)) + + +if __name__ == "__main__": + main()