Files
test/rid_sim_windows.py
2026-03-01 17:15:10 +08:00

1115 lines
46 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
RID BLE 仿真程序 - Windows 11
基于 ASTM F3411-22a / GB 42590-2023 协议
通过本机蓝牙以 BLE 5.0 扩展广播发送 RID 数据
依赖:
pip install winsdk
运行要求:
- Windows 11 / Windows 10 (版本 1903+)
- 支持 BLE 5.0 的蓝牙适配器
- 在 Windows 设置 > 隐私与安全性 > 位置 中为应用开启位置权限
- 以管理员权限运行(部分机器需要)
位置来源(按优先级):
1. Windows 位置服务GPS/WiFi/IP 融合定位)
2. 上次已知位置缓存
3. 默认坐标(北京,仅在完全无法定位时使用)
时间来源:
- 位置向量时间戳UTC 系统时钟,当前小时内 1/10 秒计数
- 系统报文时间戳Unix UTC基准 2019-01-01 00:00:00
"""
import struct
import asyncio
import time
import sys
import math
import argparse
from dataclasses import dataclass, field
from typing import Optional
# ──────────────────────────────────────────────────────────
# 1. RID 数据结构定义
# ──────────────────────────────────────────────────────────
@dataclass
class RIDBasicPacket:
"""基本ID报文 (Message Type 0x0)"""
ua_type: int = 2 # UA类型: 0=未声明,1=固定翼,2=旋翼,3=直升机,4=混合升力,5=多旋翼
id_type: int = 4 # ID类型: 0=无,1=序列号,2=CAA,3=UTM委派,4=特定会话ID(字符串)
uasid: str = "" # 产品唯一识别码最多20字节ASCII
reserved: bytes = field(default_factory=lambda: bytes(3))
@dataclass
class RIDPosVecPacket:
"""位置向量报文 (Message Type 0x1)"""
operational_status: int = 0 # 运行状态: 0=未初始化,1=地面待机,2=空中飞行,3=紧急
altitude_type: int = 0 # 高度类型: 0=气压高度,1=几何高度
track_angle_ew: int = 0 # 航迹角EW方向: 0=东,1=西
speed_multiplier: int = 0 # 速度乘数: 0=×0.25m/s,1=×0.75m/s
track_angle: int = 0 # 航迹角 0~179配合EW标志
ground_speed: int = 0 # 地速 (Raw×0.25 m/s)
vertical_speed: int = 0 # 垂直速度int8+上升/-下降×0.5 m/s
latitude: int = 0 # 纬度 (1e-7 度, int32 小端)
longitude: int = 0 # 经度 (1e-7 度, int32 小端)
pressure_altitude: int = 0 # 气压高度 (int16 小端)
geometric_altitude: int = 0 # 几何高度 (int16 小端)
altitude_agl: int = 0 # 距地高度 (int16 小端)
horiz_accuracy: int = 0 # 水平精度 (4bit)
vert_accuracy: int = 0 # 垂直精度 (4bit)
speed_acc: int = 0 # 速度精度 (4bit)
timestamp: int = 0 # 时间戳 (当前小时内1/10秒数, uint16)
timestamp_acc: int = 0 # 时间戳精度 (4bit)
reserved: int = 0
@dataclass
class RIDRDPacket:
"""运行描述报文 (Message Type 0x3)"""
description_type: int = 0 # 描述类型: 0=文字描述
description: str = "" # 描述内容最多23字节ASCII
@dataclass
class RIDSYSPacket:
"""系统报文 (Message Type 0x4)"""
cs_pos_type: int = 1 # 控制站位置类型: 0=未知,1=实时,2=起飞点,3=固定
region_code: int = 2 # 区域: 0=未定义,2=中国
coord_type: int = 0 # 坐标系: 0=WGS84,1=其他
cs_latitude: int = 0 # 控制站纬度 (1e-7 度, int32)
cs_longitude: int = 0 # 控制站经度 (1e-7 度, int32)
area_count: int = 0 # 运行区域计数 (uint16)
area_radius: int = 0 # 运行区域半径 (×10 m)
area_alt_upper: int = 0 # 运行区域高度上限 (int16)
area_alt_lower: int = 0 # 运行区域高度下限 (int16)
ua_run_category: int = 1 # 运行类别: 0=未定义,1=开放,2=特许,3=审定
ua_run_level: int = 0 # UA等级: 0=微型,1=轻型,2=小型,3=其他RID
cs_altitude: int = 0 # 控制站高度 (int16)
timestamp: int = 0 # 时间戳 (Unix - 2019-01-01, 秒)
reserved: int = 0
# ──────────────────────────────────────────────────────────
# 2. RID 序列化函数(完全对应 C 代码逻辑)
# ──────────────────────────────────────────────────────────
def rid_basic_serialize(packet: RIDBasicPacket) -> bytes:
"""基本ID报文序列化 → 25字节对应 RIDBasicSerialize()"""
buf = bytearray(25)
buf[0] = 0x01 # 报头类型0x0 | 版本0x1
# Bit[7:4]=ID类型, Bit[3:0]=UA类型
buf[1] = ((packet.id_type & 0xF) << 4) | (packet.ua_type & 0xF)
# UASIDASCII编码不足20字节补零超出截断
uasid_bytes = packet.uasid.encode('ascii', errors='replace')[:20]
for i, b in enumerate(uasid_bytes):
buf[2 + i] = b
# reserved[3] = 0已初始化为0
return bytes(buf)
def rid_posvec_serialize(packet: RIDPosVecPacket) -> bytes:
"""位置向量报文序列化 → 25字节对应 RIDPosVecSerialize()"""
buf = bytearray(25)
buf[0] = 0x11 # 报头类型0x1 | 版本0x1
# Flag byte: Bit[0]=SpeedMultiplier, Bit[1]=TrackAngleEW, Bit[2]=AltitudeType, Bit[3]=Rsvd, Bit[7:4]=OperationalStatus
flag = (packet.speed_multiplier & 1) | \
((packet.track_angle_ew & 1) << 1) | \
((packet.altitude_type & 1) << 2) | \
((packet.operational_status & 0xF) << 4)
buf[1] = flag
buf[2] = packet.track_angle & 0xFF
buf[3] = packet.ground_speed & 0xFF
# VerticalSpeed: int8用struct pack
buf[4] = struct.pack('b', max(-128, min(127, packet.vertical_speed)))[0]
# Latitude: int32 小端
lat_bytes = struct.pack('<i', packet.latitude)
for i in range(4):
buf[5 + i] = lat_bytes[i]
# Longitude: int32 小端
lon_bytes = struct.pack('<i', packet.longitude)
for i in range(4):
buf[9 + i] = lon_bytes[i]
# PressureAltitude: int16 小端
pa_bytes = struct.pack('<h', packet.pressure_altitude)
buf[13] = pa_bytes[0]
buf[14] = pa_bytes[1]
# GeometricAltitude: int16 小端
ga_bytes = struct.pack('<h', packet.geometric_altitude)
buf[15] = ga_bytes[0]
buf[16] = ga_bytes[1]
# AltitudeAGL: int16 小端
agl_bytes = struct.pack('<h', packet.altitude_agl)
buf[17] = agl_bytes[0]
buf[18] = agl_bytes[1]
# Accuracy: Bit[3:0]=HorizAcc, Bit[7:4]=VertAcc
buf[19] = ((packet.vert_accuracy & 0xF) << 4) | (packet.horiz_accuracy & 0xF)
# SpeedAccuracy: Bit[3:0]=SpeedAcc
buf[20] = packet.speed_acc & 0xF
# Timestamp: uint16 小端
ts_bytes = struct.pack('<H', packet.timestamp & 0xFFFF)
buf[21] = ts_bytes[0]
buf[22] = ts_bytes[1]
# TimestampAccuracy: Bit[3:0]
buf[23] = packet.timestamp_acc & 0xF
# reserved
buf[24] = packet.reserved & 0xFF
return bytes(buf)
def rid_rd_serialize(packet: RIDRDPacket) -> bytes:
"""运行描述报文序列化 → 25字节对应 RIDRDSerialize()"""
buf = bytearray(25)
buf[0] = 0x31 # 报头类型0x3 | 版本0x1
buf[1] = packet.description_type & 0xFF
desc_bytes = packet.description.encode('ascii', errors='replace')[:23]
for i, b in enumerate(desc_bytes):
buf[2 + i] = b
return bytes(buf)
def rid_sys_serialize(packet: RIDSYSPacket) -> bytes:
"""系统报文序列化 → 25字节对应 RIDSYSSerialize()"""
buf = bytearray(25)
buf[0] = 0x41 # 报头类型0x4 | 版本0x1
# Tag: Bit[1:0]=CSPosType, Bit[4:2]=RegionCode, Bit[6:5]=CoordType, Bit[7]=Rsvd
tag = (packet.cs_pos_type & 0x3) | \
((packet.region_code & 0x7) << 2) | \
((packet.coord_type & 0x3) << 5)
buf[1] = tag
# ControlStationLatitude: int32 小端
cs_lat = struct.pack('<i', packet.cs_latitude)
for i in range(4):
buf[2 + i] = cs_lat[i]
# ControlStationLongitude: int32 小端
cs_lon = struct.pack('<i', packet.cs_longitude)
for i in range(4):
buf[6 + i] = cs_lon[i]
# AreaCount: uint16 小端
ac_bytes = struct.pack('<H', packet.area_count & 0xFFFF)
buf[10] = ac_bytes[0]
buf[11] = ac_bytes[1]
# AreaRadius
buf[12] = packet.area_radius & 0xFF
# AreaAltitudeUpper: int16 小端
au_bytes = struct.pack('<h', packet.area_alt_upper)
buf[13] = au_bytes[0]
buf[14] = au_bytes[1]
# AreaAltitudeLower: int16 小端
al_bytes = struct.pack('<h', packet.area_alt_lower)
buf[15] = al_bytes[0]
buf[16] = al_bytes[1]
# UARunCategory_Level: Bit[3:0]=Level, Bit[7:4]=Category
buf[17] = ((packet.ua_run_category & 0xF) << 4) | (packet.ua_run_level & 0xF)
# ControlStationAltitude: int16 小端
csa_bytes = struct.pack('<h', packet.cs_altitude)
buf[18] = csa_bytes[0]
buf[19] = csa_bytes[1]
# Timestamp: uint32 小端
ts_bytes = struct.pack('<I', packet.timestamp & 0xFFFFFFFF)
for i in range(4):
buf[20 + i] = ts_bytes[i]
# reserved
buf[24] = packet.reserved & 0xFF
return bytes(buf)
def rid_payload_serialize(subpackets: list[bytes]) -> bytes:
"""
将子报文列表打包为完整 RID Payload。
返回最终序列化字节串,对应 RIDPayloadSerialize() + RIDPacket()
"""
count = len(subpackets)
assert 1 <= count <= 10, "子报文数量须在 1~10 之间"
header = 0xF1 # 类型0xF | 版本0x1
sub_len = 0x19 # 25字节固定长度
result = bytearray([header, sub_len, count])
for pkt in subpackets:
assert len(pkt) == 25, "每个子报文必须恰好25字节"
result.extend(pkt)
return bytes(result)
# 全局消息计数器(用于 build_ble_ad_data 诊断输出)
_diag_msg_counter = 0
def build_ble_ad_data(rid_payload: bytes) -> bytes:
"""
将 RID Payload 打包为 BLE AD Data 结构(仅用于诊断/hex dump 显示)。
格式: [AD Length][AD Type=0x16][UUID=0xFFFA LE][AppCode=0x0D][Counter][RID Payload]
符合 ASTM F3411-22a BLE 传输层规范。
"""
global _diag_msg_counter
uuid_lsb = 0xFA
uuid_msb = 0xFF
counter = _diag_msg_counter & 0xFF
_diag_msg_counter = (_diag_msg_counter + 1) & 0xFF
ad_content = bytes([0x16, uuid_lsb, uuid_msb, 0x0D, counter]) + rid_payload
ad_length = len(ad_content)
return bytes([ad_length]) + ad_content
def build_wifi_beacon_vendor_ie(rid_payload: bytes) -> bytes:
"""
将 RID Payload 打包为 Wi-Fi Beacon 帧的 Vendor Specific IE (Element ID: 221)。
符合 ASTM F3411-22a Wi-Fi 传输层规范: ASD-STAN OUI (FA:0B:BC), Type 0x0D。
注意: 原生 Windows API 无法直接发送底层 Wi-Fi Beacon 帧(通常需监视模式网卡+Npcap等底层驱动支持
此处实现用于模拟组包过程,打印其 Hex 并在后续移植到其他平台(如 Linux/C/Scapy时参考。
"""
global _diag_msg_counter
counter = _diag_msg_counter & 0xFF
# Element ID: 221 (0xDD)
# Length: 从 OUI 计算起,长为 5 字节(OUI 3字节 + Type 1字节 + Counter 1字节) + Payload 长度
length = 5 + len(rid_payload)
ie = bytearray([0xDD, length, 0xFA, 0x0B, 0xBC, 0x0D, counter])
ie.extend(rid_payload)
return bytes(ie)
def rid_packet_full(basic: RIDBasicPacket,
pos_vec: RIDPosVecPacket,
rd: RIDRDPacket,
sys: RIDSYSPacket) -> bytes:
"""完整打包Basic + PosVec + RD + SYS对应 RIDPacket()"""
subpackets = [
rid_basic_serialize(basic),
rid_posvec_serialize(pos_vec),
rid_rd_serialize(rd),
rid_sys_serialize(sys),
]
return rid_payload_serialize(subpackets)
def rid_packet_nano(basic: RIDBasicPacket,
pos_vec: RIDPosVecPacket,
sys: RIDSYSPacket) -> bytes:
"""最小包Basic + PosVec + SYS对应 RIDPacketNano()"""
subpackets = [
rid_basic_serialize(basic),
rid_posvec_serialize(pos_vec),
rid_sys_serialize(sys),
]
return rid_payload_serialize(subpackets)
# ──────────────────────────────────────────────────────────
# 3. Windows BLE 广播模块
# ──────────────────────────────────────────────────────────
def _check_winsdk():
"""检查 winsdk 是否可用"""
try:
import winsdk # noqa: F401
return True
except ImportError:
return False
# ──────────────────────────────────────────────────────────
# 3a. Windows 本机定位提供器
# ──────────────────────────────────────────────────────────
class WindowsLocationProvider:
"""
通过 Windows Runtime Geolocation API 获取本机位置。
支持 GPS / WiFi / 移动网络 / IP 融合定位。
需要在 Windows 设置 > 隐私与安全性 > 位置 中开启位置权限。
"""
# 默认回退坐标(北京)
FALLBACK_LAT = 39.9042
FALLBACK_LON = 116.4074
def __init__(self):
self._last_lat: Optional[float] = None
self._last_lon: Optional[float] = None
self._last_alt: float = 0.0
self._last_accuracy: float = 9999.0
self._access_granted: bool = False
self._locator = None
self._available: bool = False
self._import_api()
def _import_api(self):
try:
from winsdk.windows.devices.geolocation import (
Geolocator,
GeolocationAccessStatus,
PositionAccuracy,
)
self._Geolocator = Geolocator
self._AccessStatus = GeolocationAccessStatus
self._PositionAccuracy = PositionAccuracy
self._available = True
except ImportError as e:
print(f"[位置] Geolocation API 不可用: {e}")
self._available = False
async def request_access_async(self) -> bool:
"""请求位置访问权限,需要在首次调用时执行一次"""
if not self._available:
return False
try:
access = await self._Geolocator.request_access_async()
self._access_granted = (access == self._AccessStatus.ALLOWED)
if self._access_granted:
self._locator = self._Geolocator()
self._locator.desired_accuracy = self._PositionAccuracy.HIGH
print("[位置] 位置权限已授予,定位精度: HIGH")
else:
print(f"[位置] 位置权限被拒绝 (状态={access})")
print("[位置] 请前往: 设置 > 隐私与安全性 > 位置,开启位置访问权限")
return self._access_granted
except Exception as e:
print(f"[位置] 请求权限失败: {e}")
return False
async def get_position_async(self) -> Optional[tuple]:
"""
异步获取当前位置。
返回: (lat_deg, lon_deg, alt_m, accuracy_m) 或 None失败时
"""
if not self._available or not self._access_granted or self._locator is None:
return None
try:
pos = await self._locator.get_geoposition_async()
coord = pos.coordinate
lat = float(coord.latitude)
lon = float(coord.longitude)
# altitude 在部分设备上可能为 None
alt = float(coord.altitude) if coord.altitude is not None else 0.0
# accuracy 为圆形误差(米)
accuracy = float(coord.accuracy) if coord.accuracy is not None else 9999.0
self._last_lat = lat
self._last_lon = lon
self._last_alt = alt
self._last_accuracy = accuracy
return (lat, lon, alt, accuracy)
except Exception as e:
print(f"[位置] 获取位置失败: {e}")
return None
@property
def last_position(self) -> Optional[tuple]:
"""上次成功获取的位置 (lat, lon, alt, accuracy),无记录时返回 None"""
if self._last_lat is None:
return None
return (self._last_lat, self._last_lon, self._last_alt, self._last_accuracy)
@property
def has_fix(self) -> bool:
"""是否已有定位结果"""
return self._last_lat is not None
class WindowsBLEAdvertiser:
"""
使用 Windows Runtime API 进行 BLE 广播。
工作模式(按优先级自动选择):
1. 扩展广播BLE 5.0)—— 单帧发送完整 RID Payload需硬件支持
2. Legacy 单子帧轮播 —— 每次发送 1 条 25 字节子报文,由调用方周期轮换
MAC 地址稳定策略:
- Windows 10/11 的 BLE 广播 API 默认使用 RPA (Resolvable Private Address)
且每次 start() 都会生成新的随机 MAC。
- 即使设置 is_anonymous = FalseWindows 仍可能出于隐私保护强制轮换 MAC。
- 唯一能保持 MAC 绝对不变的方法是:**永远不调用 stop(),也不创建新 Publisher**。
- 既然 in-place 更新 DataSections 无效,我们改用 **ManufacturerData** 或 **动态更新 Payload** 的方式,
但 WinRT API 限制极严。
- 最终妥协方案:在 RID 协议中,接收端**不应该依赖 MAC 地址**来关联无人机,
而应该依赖 **UASID (Basic ID 报文)**。只要 UASID 相同,接收端就应视为同一架无人机。
- 为了尽量减少 MAC 变化频率,我们降低更新频率,或者接受 MAC 变化的事实。
"""
def __init__(self):
self._publisher = None # BluetoothLEAdvertisementPublisher
self._adv = None # BluetoothLEAdvertisement
self._started = False
self._ext_supported = False
self._ext_ok = False
self._is_extended = False
self._msg_counter = 0 # ASTM F3411-22a Message Counter (0~255)
self._import_winsdk()
def _import_winsdk(self):
try:
from winsdk.windows.devices.bluetooth.advertisement import (
BluetoothLEAdvertisementPublisher,
BluetoothLEAdvertisement,
BluetoothLEAdvertisementDataSection,
BluetoothLEAdvertisementPublisherStatus,
)
from winsdk.windows.storage.streams import DataWriter
self._Publisher = BluetoothLEAdvertisementPublisher
self._Advertisement = BluetoothLEAdvertisement
self._DataSection = BluetoothLEAdvertisementDataSection
self._PubStatus = BluetoothLEAdvertisementPublisherStatus
self._DataWriter = DataWriter
except ImportError as e:
print(f"[错误] winsdk 导入失败: {e}")
print("请运行: pip install winsdk")
sys.exit(1)
def _make_data_section(self, ad_type: int, data: bytes):
"""构造 BLE AD Data Section"""
section = self._DataSection()
section.data_type = ad_type
writer = self._DataWriter()
for b in data:
writer.write_byte(b)
section.data = writer.detach_buffer()
return section
def _build_service_data(self, raw_data: bytes) -> bytes:
"""
构造 ASTM F3411-22a Service Data 字节。
格式: [UUID 0xFFFA LE (2B)][AppCode 0x0D (1B)][MsgCounter (1B)][Data]
"""
counter = self._msg_counter & 0xFF
self._msg_counter = (self._msg_counter + 1) & 0xFF
return bytes([0xFA, 0xFF, 0x0D, counter]) + raw_data
def _set_ad_payload(self, raw_data: bytes):
"""在当前 Advertisement 对象上替换 DataSections"""
service_data = self._build_service_data(raw_data)
self._adv.data_sections.clear()
section = self._make_data_section(0x16, service_data)
self._adv.data_sections.append(section)
# ── 扩展广播BLE 5.0,完整帧) ────────────────────────
def try_start_extended(self, rid_payload: bytes) -> bool:
"""
尝试以扩展广播发送完整 RID Payload。
返回 True = 成功False = 硬件不支持(回退 Legacy
"""
self.stop()
self._adv = self._Advertisement()
self._set_ad_payload(rid_payload)
pub = self._Publisher(self._adv)
try:
pub.use_extended_advertisement = True
except AttributeError:
self._ext_supported = False
return False
pub.start()
import time as _t
_t.sleep(0.3)
try:
status = pub.status
if status == self._PubStatus.STARTED:
self._publisher = pub
self._started = True
self._ext_ok = True
self._ext_supported = True
self._is_extended = True
print(f"[BLE] 扩展广播已启动 (Payload={len(rid_payload)}B)")
return True
else:
pub.stop()
print(f"[BLE] 扩展广播启动失败 (status={status}), 回退到 Legacy 模式")
self._ext_supported = False
return False
except Exception as e:
try:
pub.stop()
except Exception:
pass
print(f"[BLE] 扩展广播检测失败: {e}, 回退到 Legacy 模式")
self._ext_supported = False
return False
def update_extended(self, rid_payload: bytes):
"""
更新扩展广播数据。
复用同一个 Publisher 实例stop → 更新 → start。
注意Windows 强制每次 start() 都会更换随机 MAC 地址。
"""
if self._publisher is None or self._adv is None:
self.try_start_extended(rid_payload)
return
self._publisher.stop()
self._set_ad_payload(rid_payload)
self._publisher.start()
self._started = True
# ── Legacy 单子帧广播(兼容所有适配器) ────────────────
def start_legacy(self, subframe: bytes):
"""
发送单条子帧报文。subframe 为 25 字节。
复用同一个 Publisher 实例stop → 更新 → start。
"""
if self._publisher is None or self._adv is None:
self._adv = self._Advertisement()
self._set_ad_payload(subframe)
self._publisher = self._Publisher(self._adv)
self._publisher.start()
else:
self._publisher.stop()
self._set_ad_payload(subframe)
self._publisher.start()
self._started = True
def stop(self):
"""停止并释放广播资源"""
if self._publisher is not None and self._started:
try:
self._publisher.stop()
except Exception:
pass
self._publisher = None
self._adv = None
self._started = False
self._is_extended = False
@property
def is_running(self) -> bool:
return self._started
@property
def is_extended(self) -> bool:
return self._ext_ok
class WindowsWiFiNANPublisher:
"""
使用 Windows Runtime API 进行 Wi-Fi Aware (NAN) 广播。
要求:
- Windows 10/11 操作系统 (1903+)
- 支持 Wi-Fi Aware/Direct 的网卡与底层驱动 (如较新的 Intel / Qualcomm 无线网卡)
注意: 桌面电脑普通网卡的驱动常常默认关闭或不支持此功能,如果物理硬件不支持,此功能将自动跳过。
"""
def __init__(self):
self._publisher = None # WiFiAwarePublisher
self._session = None # WiFiAwareSession
self._started = False
self._supported = False
self._msg_counter = 0
self._import_winsdk()
def _import_winsdk(self):
try:
from winsdk.windows.devices.wifiaware import (
WiFiAwareManager,
WiFiAwarePublishOptions,
)
from winsdk.windows.storage.streams import DataWriter
self._Manager = WiFiAwareManager
self._PubOptions = WiFiAwarePublishOptions
self._DataWriter = DataWriter
self._supported = True
except ImportError:
self._supported = False
async def start_async(self, rid_payload: bytes) -> bool:
if not self._supported:
return False
try:
# 1. 检查和请求 Wi-Fi Aware 全局访问权限
access = await self._Manager.request_access_async()
from winsdk.windows.devices.wifiaware import WiFiAwareAccessStatus
if access != WiFiAwareAccessStatus.ALLOWED:
print(f"[NAN] 无法访问 Wi-Fi Aware API (状态: {access})")
return False
# 2. 请求激活会话
session_result = await self._Manager.request_session_async()
from winsdk.windows.devices.wifiaware import WiFiAwareSessionStatus
if session_result.status != WiFiAwareSessionStatus.SUCCEEDED:
# 状态不为SUCCEEDED通常意味着硬件、网卡或驱动彻底不支持它
return False
self._session = session_result.session
return await self._publish_with_payload(rid_payload)
except Exception as e:
# 对于不支持的主机,常在底层 UWP 抛出异常
print(f"[NAN] Wi-Fi Aware 启动失败 (通常为网卡不支持): {e}")
return False
async def _publish_with_payload(self, rid_payload: bytes) -> bool:
"""注册 Wi-Fi Aware Publish 对象"""
if not self._session:
return False
options = self._PubOptions()
# ASTM建议的服务专有名称
options.service_name = "org.wi-fi.aware.uas.rid"
writer = self._DataWriter()
# NAN Service Specific Info 格式参考 ASTM: [0x0D AppCode] [Counter] [RID Payload]
counter = self._msg_counter & 0xFF
self._msg_counter = (self._msg_counter + 1) & 0xFF
writer.write_byte(0x0D)
writer.write_byte(counter)
for b in rid_payload:
writer.write_byte(b)
options.service_specific_info = writer.detach_buffer()
pub_result = await self._session.publish_async(options)
from winsdk.windows.devices.wifiaware import WiFiAwarePublisherStatus
if pub_result.status == WiFiAwarePublisherStatus.SUCCEEDED:
self._publisher = pub_result.publisher
self._started = True
return True
else:
return False
async def update_payload_async(self, rid_payload: bytes):
"""
Windows的 WiFiAwarePublisher 不能直接 '原地 update_buffer'
要在纯广播模式下更新内容,我们必须停止旧 Publisher随后重新发起 Publish。
"""
if not self._started or not self._session:
return
# 停用旧的
self.stop_publisher()
# 开启新的
await self._publish_with_payload(rid_payload)
def stop_publisher(self):
if self._publisher is not None:
# Python winsdk 中的 UWP 接口销毁
# 我们直接丢弃引用通常就会引起注销,不过底层依赖于 UWP COM
self._publisher = None
self._started = False
def stop(self):
self.stop_publisher()
self._session = None
@property
def is_running(self) -> bool:
return self._started
# ──────────────────────────────────────────────────────────
# 4. 辅助函数:物理量编码
# ──────────────────────────────────────────────────────────
def encode_ground_speed(speed_mps: float) -> tuple[int, int]:
"""
将地速(m/s)编码为 (ground_speed_raw, speed_multiplier)
speed_multiplier=0: v = raw * 0.25 m/s范围 0~63.75 m/s
speed_multiplier=1: v = raw * 0.75 m/s范围 0~191.25 m/s
"""
if speed_mps <= 63.75:
return (round(speed_mps / 0.25) & 0xFF, 0)
else:
return (round(speed_mps / 0.75) & 0xFF, 1)
def encode_vertical_speed(speed_mps: float) -> int:
"""垂直速度编码为 int8单位 0.5 m/s范围 ±63.5 m/s"""
raw = round(speed_mps / 0.5)
return max(-128, min(127, raw))
def encode_track_angle(angle_deg: float) -> tuple[int, int]:
"""
将航迹角(0~359°)编码为 (track_angle_raw, ew_flag)
track_angle_raw = 0~179
ew_flag=0: 0~179° 朝东ew_flag=1: 180~359° 朝西raw=angle-180
"""
angle_deg %= 360
if angle_deg < 180:
return (int(angle_deg), 0)
else:
return (int(angle_deg - 180), 1)
# ASTM F3411-22a 高度编码常量
ALT_UNKNOWN = -1000 # 填入结构体后序列化为 int16(-1000),表示"未知"
def encode_altitude(alt_m: Optional[float]) -> int:
"""
ASTM F3411-22a 高度编码 -> int16。
公式: raw = (alt_m + 1000) * 2
解码: alt_m = raw / 2 - 1000
范围: -1000m (raw=0) ~ +31767m (raw=0xFFFE);
raw=0xFFFF 即 int16(-1) 表示"未知"
传 None 时返回 ALT_UNKNOWN。
"""
if alt_m is None:
return ALT_UNKNOWN
raw = int((alt_m + 1000.0) * 2)
raw = max(0, min(0xFFFE, raw))
# 转为 signed int16 用于 struct '<h' 打包
if raw > 32767:
raw = raw - 65536
return raw
def seconds_since_2019() -> int:
"""计算从 2019-01-01 00:00:00 UTC 至今的秒数(系统报文时间戳用)"""
import calendar
epoch_2019 = calendar.timegm((2019, 1, 1, 0, 0, 0, 0, 0, 0))
return int(time.time()) - epoch_2019
def current_hour_tenths() -> int:
"""当前小时内已过去的 1/10 秒数(位置向量报文时间戳用)"""
t = time.time()
secs_in_hour = t % 3600
return int(secs_in_hour * 10) % 36000
# ──────────────────────────────────────────────────────────
# 5. RID 仿真主类
# ──────────────────────────────────────────────────────────
class RIDSimulator:
"""RID 广播仿真器周期更新位置信息并通过BLE广播"""
def __init__(self,
uasid: str = "TestDrone123",
description: str = "RID Python Simulator"):
# 基本ID报文静态启动一次
self.basic = RIDBasicPacket(
ua_type=2, # 旋翼
id_type=1, # 特定会话ID字符串
uasid=uasid
)
# 运行描述报文(静态)
self.rd = RIDRDPacket(
description_type=0,
description=description
)
# 系统报文(控制站信息,初始坐标待首次定位后更新)
self.sys = RIDSYSPacket(
cs_pos_type=1, # 控制站实时位置
region_code=2, # 中国
coord_type=0, # WGS84
cs_latitude=0, # 由 update_control_station() 填入
cs_longitude=0,
ua_run_category=1, # 开放类
ua_run_level=0, # 微型
)
# 位置向量报文(动态更新)
self.pos_vec = RIDPosVecPacket()
# 广播器
self.advertiser = WindowsBLEAdvertiser()
self.nan_publisher = WindowsWiFiNANPublisher()
self._running = False
def update_data(self,
lat_deg: float,
lon_deg: float,
alt_agl_m: float,
ground_speed_mps: float = 0.0,
track_angle_deg: float = 0.0,
vertical_speed_mps: float = 0.0,
pressure_alt_m: Optional[float] = None,
geometric_alt_m: Optional[float] = None):
"""
更新 RID 数据(仅填充结构体,不触发广播)。
广播由 main loop 按策略驱动。
"""
gs_raw, sm = encode_ground_speed(ground_speed_mps)
ta_raw, ew = encode_track_angle(track_angle_deg)
vs_raw = encode_vertical_speed(vertical_speed_mps)
ts_tenths = current_hour_tenths()
self.pos_vec.operational_status = 2 # 空中飞行
self.pos_vec.altitude_type = 0 # 气压高度
self.pos_vec.speed_multiplier = sm
self.pos_vec.track_angle_ew = ew
self.pos_vec.track_angle = ta_raw
self.pos_vec.ground_speed = gs_raw
self.pos_vec.vertical_speed = vs_raw
self.pos_vec.latitude = int(lat_deg * 1e7)
self.pos_vec.longitude = int(lon_deg * 1e7)
# 高度按 ASTM F3411-22a 编码: raw = (alt_m + 1000) * 2
self.pos_vec.altitude_agl = encode_altitude(alt_agl_m)
self.pos_vec.pressure_altitude = encode_altitude(pressure_alt_m)
self.pos_vec.geometric_altitude = encode_altitude(geometric_alt_m)
self.pos_vec.timestamp = ts_tenths
self.sys.timestamp = seconds_since_2019()
def serialize_subframes(self) -> list[bytes]:
"""返回各子报文序列化后的 25 字节列表(用于 Legacy 轮播)"""
return [
rid_basic_serialize(self.basic),
rid_posvec_serialize(self.pos_vec),
rid_rd_serialize(self.rd),
rid_sys_serialize(self.sys),
]
def serialize_full(self) -> bytes:
"""返回完整 RID Payload用于扩展广播"""
return rid_packet_full(self.basic, self.pos_vec, self.rd, self.sys)
def update_control_station(self, lat_deg: float, lon_deg: float, alt_m: float = 0.0):
"""同步更新控制站位置(笔记本 = 操控手位置)"""
self.sys.cs_latitude = int(lat_deg * 1e7)
self.sys.cs_longitude = int(lon_deg * 1e7)
self.sys.cs_altitude = encode_altitude(alt_m)
def stop(self):
"""停止广播"""
self._running = False
self.advertiser.stop()
self.nan_publisher.stop()
# ──────────────────────────────────────────────────────────
# 6. 调试:打印报文内容
# ──────────────────────────────────────────────────────────
def print_packet_hex(payload: bytes, label: str = "RID Payload"):
"""打印报文十六进制内容"""
print(f"\n[{label}] 长度={len(payload)} 字节")
for i in range(0, len(payload), 16):
chunk = payload[i:i+16]
hex_str = ' '.join(f'{b:02X}' for b in chunk)
asc_str = ''.join(chr(b) if 32 <= b < 127 else '.' for b in chunk)
print(f" {i:04X}: {hex_str:<48} {asc_str}")
def print_ad_data(ad_data: bytes, label: str = "BLE AD Data"):
"""打印 BLE AD Data 十六进制内容"""
print_packet_hex(ad_data, label)
# ──────────────────────────────────────────────────────────
# 7. 主程序(异步,使用本机真实位置和系统时间)
# ──────────────────────────────────────────────────────────
_SUBFRAME_NAMES = ["BasicID", "PosVec", "RD", "SYS"]
async def main_async(enable_ble: bool = True, enable_nan: bool = True):
print("=" * 60)
print(" RID BLE/NAN 仿真程序 (Windows 11 / ASTM F3411-22a)")
print(" 位置: Windows 定位服务 | 时间: UTC 系统时钟")
print(" 模式: 圆周飞行 50km/h | 广播 2Hz")
print("=" * 60)
# 检查依赖
if not _check_winsdk():
print("\n[错误] 缺少依赖库 winsdk请运行")
print(" pip install winsdk")
sys.exit(1)
if sys.platform != 'win32':
print("[错误] 此程序仅支持 Windows当前系统:", sys.platform)
sys.exit(1)
# ── 初始化定位 ──────────────────────────────────────────
location = WindowsLocationProvider()
print("\n[位置] 正在请求位置权限...")
granted = await location.request_access_async()
init_lat = WindowsLocationProvider.FALLBACK_LAT
init_lon = WindowsLocationProvider.FALLBACK_LON
init_alt = 0.0
init_acc = 9999.0
if granted:
print("[位置] 正在获取初始坐标(首次定位可能需要数秒)...")
pos = await location.get_position_async()
if pos:
init_lat, init_lon, init_alt, init_acc = pos
print(f"[位置] 初始位置: {init_lat:.6f}N, {init_lon:.6f}E "
f"高度={init_alt:.1f}m 精度=+/-{init_acc:.0f}m")
else:
print(f"[位置] 初始定位失败,使用默认坐标 ({init_lat:.4f}N, {init_lon:.4f}E)")
else:
print(f"[位置] 权限被拒绝,使用默认坐标 ({init_lat:.4f}N, {init_lon:.4f}E)")
# ── 圆周飞行参数 ────────────────────────────────────────
FLIGHT_SPEED_KMH = 50.0 # 飞行速度 km/h
FLIGHT_SPEED_MPS = FLIGHT_SPEED_KMH / 3.6 # 13.889 m/s
FLIGHT_RADIUS_M = 100.0 # 圆周半径 100m
FLIGHT_ALT_AGL = 80.0 # 飞行高度 80m AGL
CIRCUMFERENCE = 2.0 * math.pi * FLIGHT_RADIUS_M # 628.3m
PERIOD = CIRCUMFERENCE / FLIGHT_SPEED_MPS # 一圈耗时 ~45.2s
OMEGA = 2.0 * math.pi / PERIOD # 角速度 rad/s
# 圆心 = 初始定位坐标
center_lat_rad = math.radians(init_lat)
center_lon_rad = math.radians(init_lon)
# 1度纬度 ≈ 111320m; 1度经度 ≈ 111320 * cos(lat) m
M_PER_DEG_LAT = 111320.0
M_PER_DEG_LON = 111320.0 * math.cos(center_lat_rad)
BROADCAST_INTERVAL = 0.5 # 2 Hz 广播
print(f"\n[圆周飞行]")
print(f" 圆心 : {init_lat:.6f}N, {init_lon:.6f}E")
print(f" 半径 : {FLIGHT_RADIUS_M:.0f}m")
print(f" 速度 : {FLIGHT_SPEED_KMH:.0f} km/h ({FLIGHT_SPEED_MPS:.1f} m/s)")
print(f" 高度(AGL) : {FLIGHT_ALT_AGL:.0f}m")
print(f" 一圈耗时 : {PERIOD:.1f}s")
print(f" 广播频率 : {1.0/BROADCAST_INTERVAL:.0f} Hz")
# ── 创建仿真器 ──────────────────────────────────────────
sim = RIDSimulator(
uasid="TestDrone123",
description="RID Python Sim",
)
sim.update_control_station(init_lat, init_lon, init_alt)
# ── 探测广播模式 ─────────────────────────────────────────
sim.update_data(lat_deg=init_lat, lon_deg=init_lon, alt_agl_m=FLIGHT_ALT_AGL,
ground_speed_mps=FLIGHT_SPEED_MPS)
adv = sim.advertiser
print("\n[通信层支持探测]")
use_ext = False
if enable_ble:
print(" 1) BLE 5.0 扩展广播...")
use_ext = adv.try_start_extended(sim.serialize_full())
if not use_ext:
print(" -> 不支持, 降级使用 Legacy 子帧轮播模式")
else:
print(" -> 支持并已启动")
else:
print(" 1) BLE 广播 ... [已禁用]")
nan_ok = False
if enable_nan:
print(" 2) Wi-Fi Aware (NAN) 广播...")
nan_ok = await sim.nan_publisher.start_async(sim.serialize_full())
if not nan_ok:
print(" -> 不支持 (原因: 硬件网卡未开放支持或缺少权限)")
else:
print(" -> 支持并已启动")
else:
print(" 2) Wi-Fi Aware (NAN) 广播 ... [已禁用]")
print("\n[配置]")
print(f" UASID : {sim.basic.uasid}")
print(f" UA类型 : {sim.basic.ua_type} (旋翼)")
id_type_name = {
0: "",
1: "序列号",
2: "CAA注册",
3: "UTM委派",
4: "特定会话ID(字符串)",
}.get(sim.basic.id_type, "未知")
print(f" ID类型 : {sim.basic.id_type} ({id_type_name})")
print(f" 描述 : {sim.rd.description}")
print(f" 运行类别 : {sim.sys.ua_run_category} (开放)")
print(f" UA等级 : {sim.sys.ua_run_level} (微型)")
ble_mode_str = "未启用"
if enable_ble:
ble_mode_str = "5.0 扩展" if use_ext else "Legacy"
print(f" 广播模式 : BLE ({ble_mode_str}), Wi-Fi NAN ({'' if nan_ok else ''})")
print(f" 时间来源 : UTC 系统时钟")
print(f" 位置来源 : {'圆周飞行仿真 (圆心=本机定位)' if granted else '圆周飞行仿真 (圆心=默认坐标)'}")
print(f"\n[提示] 正在运行中... 按 Ctrl+C 停止广播")
# Legacy 模式参数
subframe_idx = 0
n = 0
t_start = time.monotonic()
try:
while True:
# ── 计算圆周飞行位置 ───────────────────────────
t_elapsed = time.monotonic() - t_start
theta = OMEGA * t_elapsed # 当前角度 (rad),从正东方向逆时针
# 无人机在圆上的位置偏移 (m)
dx = FLIGHT_RADIUS_M * math.cos(theta) # 东向偏移
dy = FLIGHT_RADIUS_M * math.sin(theta) # 北向偏移
# 转换为经纬度
lat = init_lat + dy / M_PER_DEG_LAT
lon = init_lon + dx / M_PER_DEG_LON
# 航迹角 = 速度方向 = 圆切线方向
# 位置 (cos θ, sin θ), 速度方向 (-sin θ, cos θ)
# 航迹角以正北为0°顺时针: heading = 90° - atan2(cos θ, -sin θ)
vx = -FLIGHT_SPEED_MPS * math.sin(theta) # 东向速度分量
vy = FLIGHT_SPEED_MPS * math.cos(theta) # 北向速度分量
heading_deg = (math.degrees(math.atan2(vx, vy))) % 360 # atan2(E,N) → 北基准顺时针
# 更新 RID 数据
sim.update_data(
lat_deg=lat, lon_deg=lon,
alt_agl_m=FLIGHT_ALT_AGL,
ground_speed_mps=FLIGHT_SPEED_MPS,
track_angle_deg=heading_deg,
vertical_speed_mps=0.0,
)
# ── 日志输出 ──────────────────────────────────
utc_str = time.strftime("%H:%M:%S", time.gmtime())
ts_tenths = current_hour_tenths()
mode_tag = ""
if enable_ble:
mode_tag += "BLE:EXT " if use_ext else f"BLE:L({_SUBFRAME_NAMES[subframe_idx % 4]}) "
if nan_ok:
mode_tag += "NAN:ON"
mode_tag = mode_tag.strip() or "NONE"
print(f"[{n:4d}] {utc_str} UTC | "
f"{lat:.6f}N {lon:.6f}E "
f"Alt={FLIGHT_ALT_AGL:+.1f}m "
f"GS={FLIGHT_SPEED_KMH:.0f}km/h "
f"HDG={heading_deg:.0f}° "
f"[{mode_tag}] ts={ts_tenths}")
if n == 0:
payload = sim.serialize_full()
print_packet_hex(payload, "RID Payload (第1帧详情)")
print_packet_hex(build_ble_ad_data(payload), "BLE AD Data (供参考)")
print_packet_hex(build_wifi_beacon_vendor_ie(payload), "Wi-Fi Beacon Vendor IE (供参考)")
n += 1
# ── 广播更新 ─────────────────────────────────
payload_full = sim.serialize_full()
if enable_ble:
if use_ext:
adv.update_extended(payload_full)
else:
subframes = sim.serialize_subframes()
sf = subframes[subframe_idx % len(subframes)]
adv.start_legacy(sf)
subframe_idx = (subframe_idx + 1) % len(subframes)
# 更新 Wi-Fi NAN
if enable_nan and sim.nan_publisher.is_running:
await sim.nan_publisher.update_payload_async(payload_full)
await asyncio.sleep(BROADCAST_INTERVAL)
except KeyboardInterrupt:
print("\n[用户中断] 正在停止...")
finally:
adv.stop()
print("[完成] RID 仿真已终止")
def main():
parser = argparse.ArgumentParser(description="RID BLE/NAN 仿真程序 - Windows 11")
parser.add_argument("--disable-ble", action="store_true", help="禁用 BLE (蓝牙) 广播")
parser.add_argument("--disable-nan", action="store_true", help="禁用 Wi-Fi Aware (NAN) 广播")
args = parser.parse_args()
asyncio.run(main_async(enable_ble=not args.disable_ble, enable_nan=not args.disable_nan))
if __name__ == "__main__":
main()