You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

566 lines
23 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
"""
Downward scan simulator that visualises the projected camera footprint
on top of a 2D map background. Camera, flight and scan parameters are
interactive so engineers can inspect coverage overlap in real time.
"""
from __future__ import annotations
import argparse
import ctypes
import math
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, List, Optional, Tuple
import numpy as np
# -----------------------------------------------------------------------------
# Environment bootstrap (reuse logic from tests/python/1.py)
# -----------------------------------------------------------------------------
PROJECT_ROOT = Path(__file__).resolve().parents[2]
BIN_DIR = PROJECT_ROOT / "Bin"
if str(BIN_DIR) not in sys.path:
sys.path.insert(0, str(BIN_DIR))
LIB_GUIDE = BIN_DIR / "libGuideStitch.so"
if LIB_GUIDE.exists():
try:
ctypes.CDLL(str(LIB_GUIDE), mode=ctypes.RTLD_GLOBAL)
except OSError as exc:
print(f"[WARN] Failed to preload {LIB_GUIDE.name}: {exc}")
from UStitcher import API_UnderStitch, FrameInfo, UPanConfig
import cv2
# -----------------------------------------------------------------------------
# Utility dataclasses
# -----------------------------------------------------------------------------
@dataclass
class CameraProfile:
width: int = 1920
height: int = 1080
focus_mm: float = 40.0
pixel_size_um: float = 4
@dataclass
class FlightProfile:
start_lat: float = 39.0
start_lon: float = 120.0
altitude_m: float = 1000.0
speed_mps: float = 70.0
heading_deg: float = 90.0 # 0 -> east, 90 -> north
@dataclass
class ScannerProfile:
base_az_deg: float = 90.0
base_pitch_deg: float = -45.0
az_min_deg: float = 60.0
az_max_deg: float = 120.0
pitch_min_deg: float = -80.0
pitch_max_deg: float = -10.0
az_max_speed_degps: float = 25.0
az_max_acc_degps2: float = 60.0
pitch_max_speed_degps: float = 20.0
pitch_max_acc_degps2: float = 50.0
@dataclass
class MapView:
background: Optional[Path]
meters_per_pixel: float = 2.0
trail_length: int = 200
@dataclass
class SimulationOptions:
duration_s: float = 60.0
frame_rate: float = 25.0
overlay_alpha: float = 0.25
display_scale: float = 0.03 # 直接作用在全景尺度,控制画布大小
@dataclass
class AxisState:
angle: float
velocity: float
phase: float = 0.0
@dataclass
class AxisProfile:
low: float
high: float
mid: float
amplitude: float
omega: float
# -----------------------------------------------------------------------------
# Helper utilities
# -----------------------------------------------------------------------------
def clamp(value: float, low: float, high: float) -> float:
if low > high:
low, high = high, low
return max(low, min(high, value))
def make_aircraft_polygon(center: np.ndarray, heading_deg: float, size: float = 18.0) -> np.ndarray:
"""Construct a simple aircraft-shaped polygon oriented by heading."""
base = np.array(
[
[size, 0.0], # nose
[-0.4 * size, 0.35 * size],
[-0.1 * size, 0.0],
[-0.4 * size, -0.35 * size],
],
dtype=np.float32,
)
rad = math.radians(heading_deg)
rot = np.array(
[
[math.cos(rad), -math.sin(rad)],
[math.sin(rad), math.cos(rad)],
],
dtype=np.float32,
)
pts = base @ rot.T
pts[:, 1] *= -1 # screen Y axis points downward
pts += center.astype(np.float32)
return pts
# -----------------------------------------------------------------------------
# Map drawing helper
# -----------------------------------------------------------------------------
def _blank_canvas(height: int = 900, width: int = 900) -> np.ndarray:
canvas = np.full((height, width, 3), 75, dtype=np.uint8)
step = 100
for x in range(0, width, step):
cv2.line(canvas, (x, 0), (x, height - 1), (45, 45, 45), 1)
for y in range(0, height, step):
cv2.line(canvas, (0, y), (width - 1, y), (45, 45, 45), 1)
return canvas
class MapCanvas:
def __init__(self, view: MapView):
self.view = view
if view.background and view.background.exists():
image = cv2.imread(str(view.background), cv2.IMREAD_COLOR)
if image is None:
print(f"[WARN] Failed to load map '{view.background}', using blank canvas.")
image = _blank_canvas()
else:
image = _blank_canvas()
self.base = image
self.origin = np.array([image.shape[1] // 2, image.shape[0] // 2], dtype=np.float32)
self.history: List[np.ndarray] = []
def geo_to_px(self, pts_en: np.ndarray) -> np.ndarray:
"""Convert local east/north meters to pixel coordinates."""
scale = 1.0 / self.view.meters_per_pixel
px = np.empty_like(pts_en)
px[:, 0] = self.origin[0] + pts_en[:, 0] * scale
px[:, 1] = self.origin[1] - pts_en[:, 1] * scale
return px
def push_trail(self, polygon_px: np.ndarray) -> None:
self.history.append(polygon_px.astype(np.int32))
if len(self.history) > self.view.trail_length:
self.history = self.history[-self.view.trail_length :]
def draw(self, live_poly: np.ndarray, aircraft_px: Tuple[int, int], info: Iterable[str]) -> np.ndarray:
frame = self.base.copy()
overlay = frame.copy()
for idx, poly in enumerate(self.history):
alpha = max(0.05, 1.0 - (len(self.history) - idx) / len(self.history))
cv2.fillPoly(overlay, [poly], (30, 90, 180), lineType=cv2.LINE_AA)
cv2.addWeighted(overlay, alpha * 0.2, frame, 1 - alpha * 0.2, 0, frame)
cv2.polylines(frame, [live_poly.astype(np.int32)], True, (0, 255, 0), 2, cv2.LINE_AA)
cv2.circle(frame, aircraft_px, 6, (0, 0, 255), -1, cv2.LINE_AA)
y = 20
for line in info:
cv2.putText(frame, line, (10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1, cv2.LINE_AA)
y += 20
return frame
# -----------------------------------------------------------------------------
# Simulation core
# -----------------------------------------------------------------------------
def warpPointWithH(H: np.ndarray, pt: np.ndarray) -> np.ndarray:
"""使用H矩阵投影点坐标参考1.py的实现"""
wp = H @ np.array([pt[0], pt[1], 1.0])
return wp / wp[2]
class UnderStitchSimulator:
def __init__(
self,
camera: CameraProfile,
flight: FlightProfile,
scanner: ScannerProfile,
map_view: MapView,
options: SimulationOptions,
):
self.camera = camera
self.flight = flight
self.scanner = scanner
self.map = MapCanvas(map_view)
self.options = options
if options.display_scale <= 0:
raise ValueError("display_scale must be positive.")
self.time_step = 1.0 / options.frame_rate
self.total_frames = int(options.duration_s * options.frame_rate)
self.stitcher = API_UnderStitch.Create(str((PROJECT_ROOT / "cache").resolve()))
self.frame_info = FrameInfo()
self._init_frame_info()
# 初始化拼接器并获取全景图信息
self.pan_info = self.stitcher.Init(self.frame_info)
print(f"[INFO] 初始化成功,全景图尺寸: {self.pan_info.m_pan_width} x {self.pan_info.m_pan_height}")
# 根据 display_scale 缩放全景参数,减小绘制画布
scale = self.options.display_scale
self.pan_info.scale = self.pan_info.scale * 0.5
self.scaled_pan_width = max(1, int(self.pan_info.m_pan_width * scale))
self.scaled_pan_height = max(1, int(self.pan_info.m_pan_height * scale))
self.scaled_pan_scale = self.pan_info.scale * scale
self.scaled_shift_x = self.pan_info.map_shiftX * scale
self.scaled_shift_y = self.pan_info.map_shiftY * scale
# 构建从地理坐标到全景图的变换矩阵参考Arith_UnderStitch.cpp的getAffineFromGeo2Pan
self.H_geo2pan = np.array([
[self.scaled_pan_scale, 0, self.scaled_shift_x],
[0, -self.scaled_pan_scale, self.scaled_pan_height + self.scaled_shift_y],
[0, 0, 1]
], dtype=np.float64)
# 初始化伺服扫描状态(正弦扇扫:边界速度最小、中心最大)
self.az_profile = self._build_axis_profile(
self.scanner.az_min_deg,
self.scanner.az_max_deg,
self.scanner.az_max_speed_degps,
self.scanner.az_max_acc_degps2,
)
self.pitch_profile = self._build_axis_profile(
self.scanner.pitch_min_deg,
self.scanner.pitch_max_deg,
self.scanner.pitch_max_speed_degps,
self.scanner.pitch_max_acc_degps2,
)
self.az_state = self._init_axis_state(self.az_profile, self.scanner.base_az_deg)
self.pitch_state = self._init_axis_state(self.pitch_profile, self.scanner.base_pitch_deg)
self.frame_info.servoInfo.fServoAz = self.az_state.angle
self.frame_info.servoInfo.fServoPt = self.pitch_state.angle
def _build_axis_profile(
self,
min_deg: float,
max_deg: float,
max_speed: float,
max_acc: float,
) -> AxisProfile:
low, high = (min_deg, max_deg) if min_deg <= max_deg else (max_deg, min_deg)
mid = (low + high) * 0.5
amplitude = (high - low) * 0.5
if amplitude <= 1e-6:
return AxisProfile(low, high, mid, 0.0, 0.0)
omega_candidates = []
if max_speed > 0:
omega_candidates.append(max_speed / amplitude)
if max_acc > 0:
omega_candidates.append(math.sqrt(max_acc / amplitude))
omega = min(omega_candidates) if omega_candidates else 0.0
return AxisProfile(low, high, mid, amplitude, omega)
def _init_axis_state(self, profile: AxisProfile, base_angle: float) -> AxisState:
angle = clamp(base_angle, profile.low, profile.high)
if profile.amplitude <= 1e-6 or profile.omega <= 0.0:
return AxisState(angle=angle, velocity=0.0, phase=0.0)
rel = clamp((angle - profile.mid) / profile.amplitude, -1.0, 1.0)
phase = math.asin(rel)
velocity = profile.amplitude * profile.omega * math.cos(phase)
return AxisState(angle=angle, velocity=velocity, phase=phase)
def _init_frame_info(self) -> None:
fi = self.frame_info
fi.camInfo.nFocus = int(self.camera.focus_mm)
fi.camInfo.fPixelSize = float(self.camera.pixel_size_um)
fi.nWidth = self.camera.width
fi.nHeight = self.camera.height
fi.nEvHeight = int(self.flight.altitude_m)
fi.craft.stPos.B = self.flight.start_lat
fi.craft.stPos.L = self.flight.start_lon
fi.craft.stPos.H = self.flight.altitude_m
fi.servoInfo.fServoAz = self.scanner.base_az_deg
fi.servoInfo.fServoPt = self.scanner.base_pitch_deg
def _update_pose(self, step_idx: int) -> Tuple[float, float]:
t = step_idx * self.time_step
distance = self.flight.speed_mps * t
heading_rad = math.radians(self.flight.heading_deg)
east = distance * math.cos(heading_rad)
north = distance * math.sin(heading_rad)
lat = self.flight.start_lat + north / 111320.0
lon = self.flight.start_lon + east / (111320.0 * math.cos(math.radians(self.flight.start_lat)))
self.frame_info.craft.stPos.B = lat
self.frame_info.craft.stPos.L = lon
self.frame_info.nFrmID = step_idx
self.frame_info.craft.stAtt.fYaw = self.flight.heading_deg
self.frame_info.craft.stAtt.fPitch = 0.0
self.frame_info.craft.stAtt.fRoll = 0.0
return east, north
def _step_axis(self, state: AxisState, profile: AxisProfile, dt: float) -> None:
if profile.amplitude <= 1e-6 or profile.omega <= 0.0:
state.angle = profile.mid
state.velocity = 0.0
state.phase = 0.0
return
state.phase += profile.omega * dt
# Wrap phase to [-pi, pi] for numerical stability
state.phase = (state.phase + math.pi) % (2 * math.pi) - math.pi
state.angle = profile.mid + profile.amplitude * math.sin(state.phase)
state.velocity = profile.amplitude * profile.omega * math.cos(state.phase)
def _update_scanner(self, dt: float) -> None:
self._step_axis(self.az_state, self.az_profile, dt)
self._step_axis(self.pitch_state, self.pitch_profile, dt)
self.frame_info.servoInfo.fServoAz = self.az_state.angle
self.frame_info.servoInfo.fServoPt = self.pitch_state.angle
def _image_corners_to_geo(self, H_img2geo: np.ndarray) -> Optional[np.ndarray]:
if H_img2geo is None or H_img2geo.size != 9:
return None
img_corners = np.array(
[
[0.0, 0.0],
[self.camera.width, 0.0],
[self.camera.width, self.camera.height],
[0.0, self.camera.height],
],
dtype=np.float64,
)
geo_corners = [warpPointWithH(H_img2geo, pt)[:2] for pt in img_corners]
return np.array(geo_corners, dtype=np.float64)
def _image_point_to_geo(self, H_img2geo: np.ndarray, pt: np.ndarray) -> Optional[np.ndarray]:
if H_img2geo is None or H_img2geo.size != 9:
return None
return warpPointWithH(H_img2geo, pt)[:2]
def _geo_to_pan(self, geo_pts: np.ndarray) -> np.ndarray:
pan_pts = [warpPointWithH(self.H_geo2pan, pt)[:2] for pt in geo_pts]
return np.array(pan_pts, dtype=np.float64)
def _compute_aircraft_geo_downward(self) -> Optional[np.ndarray]:
"""
将俯仰角临时设置为 -90°用于计算机体正下方在大地坐标中的投影。
"""
original_pitch = self.frame_info.servoInfo.fServoPt
try:
self.frame_info.servoInfo.fServoPt = -90.0
H_down = self.stitcher.getHomography(self.frame_info)
finally:
self.frame_info.servoInfo.fServoPt = original_pitch
if H_down is None or H_down.size != 9:
return None
center_px = np.array([self.camera.width * 0.5, self.camera.height * 0.5], dtype=np.float64)
return self._image_point_to_geo(H_down, center_px)
def run(self) -> None:
print(f"[INFO] Running simulator for {self.total_frames} frames.")
t0 = time.time()
# 创建全景图画布
pan_canvas = np.zeros((self.scaled_pan_height, self.scaled_pan_width, 3), dtype=np.uint8)
try:
for idx in range(self.total_frames):
start = time.perf_counter()
sim_time = idx * self.time_step
east, north = self._update_pose(idx)
self._update_scanner(self.time_step)
H_img2geo = self.stitcher.getHomography(self.frame_info)
geo_footprint = self._image_corners_to_geo(H_img2geo)
if geo_footprint is None:
print("[WARN] Image->geo projection failed; skipping frame.")
continue
plane_pan = None
plane_geo_down = self._compute_aircraft_geo_downward()
if plane_geo_down is not None:
plane_pan = self._geo_to_pan(np.array([plane_geo_down]))[0]
# 将当前覆盖区域投影到全景图
footprint_pan = self._geo_to_pan(geo_footprint)
# 累积绘制到原始全景图上,实现自然叠加效果
cv2.fillPoly(pan_canvas, [footprint_pan.astype(np.int32)], (30, 90, 180), lineType=cv2.LINE_AA)
# 基于当前累积结果生成可渲染帧
frame = pan_canvas.copy()
# 绘制当前覆盖框(绿色边框)
cv2.polylines(frame, [footprint_pan.astype(np.int32)], True, (0, 255, 0), 2, cv2.LINE_AA)
# 飞机位置画红色三角形
if plane_pan is not None:
# 根据全景图比例计算三角形大小
triangle_size = max(15, int(50 / max(self.scaled_pan_scale, 0.01)))
heading_rad = math.radians(self.flight.heading_deg)
# 定义指向上方的三角形(航向方向)
# 顶点在航向方向,底边垂直于航向
top = np.array([0, -triangle_size], dtype=np.float32)
left = np.array([-triangle_size * 0.6, triangle_size * 0.4], dtype=np.float32)
right = np.array([triangle_size * 0.6, triangle_size * 0.4], dtype=np.float32)
# 旋转三角形以匹配航向
cos_h = math.cos(heading_rad)
sin_h = math.sin(heading_rad)
rot_mat = np.array([[cos_h, -sin_h], [sin_h, cos_h]], dtype=np.float32)
top_rot = top @ rot_mat.T
left_rot = left @ rot_mat.T
right_rot = right @ rot_mat.T
# 转换到像素坐标
center = plane_pan.astype(np.float32)
triangle_pts = np.array([
(center + top_rot).astype(np.int32),
(center + left_rot).astype(np.int32),
(center + right_rot).astype(np.int32)
])
cv2.fillPoly(frame, [triangle_pts], (0, 0, 255), lineType=cv2.LINE_AA)
cv2.polylines(frame, [triangle_pts], True, (255, 255, 255), 1, cv2.LINE_AA)
# 添加HUD信息
hud = [
f"Frame: {idx}/{self.total_frames}",
f"Pan Size (orig): {self.pan_info.m_pan_width} x {self.pan_info.m_pan_height}",
f"Pan Size (scaled): {self.scaled_pan_width} x {self.scaled_pan_height}",
f"Pan Scale (orig/scaled): {self.pan_info.scale:.4f}/{self.scaled_pan_scale:.4f}",
f"Lat/Lon: {self.frame_info.craft.stPos.B:.6f}, {self.frame_info.craft.stPos.L:.6f}",
f"Altitude: {self.flight.altitude_m:.1f} m",
f"Speed: {self.flight.speed_mps:.1f} m/s",
f"Az/Pt: {self.frame_info.servoInfo.fServoAz:.1f}/{self.frame_info.servoInfo.fServoPt:.1f}",
]
y = 20
for line in hud:
cv2.putText(frame, line, (10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1, cv2.LINE_AA)
y += 20
display_frame = frame
if not math.isclose(self.options.display_scale, 1.0):
display_frame = cv2.resize(
frame,
(0, 0),
fx=self.options.display_scale,
fy=self.options.display_scale,
interpolation=cv2.INTER_AREA,
)
cv2.imshow("UnderStitch Scan Simulator - Panorama View", display_frame)
key = cv2.waitKey(1)
if key == 27 or key == ord("q"):
break
end = time.perf_counter()
print(f"frame {idx} took {(end-start)*1000:.1f} ms")
# remaining = max(0.0, t0 + sim_time + self.time_step - time.time())
# time.sleep(remaining)
finally:
API_UnderStitch.Destroy(self.stitcher)
cv2.destroyAllWindows()
# -----------------------------------------------------------------------------
# CLI
# -----------------------------------------------------------------------------
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="UnderStitch downward scan footprint simulator.")
parser.add_argument("--duration", type=float, default=60.0, help="Simulation duration in seconds.")
parser.add_argument("--fps", type=float, default=50.0, help="Output/solver frame rate.")
parser.add_argument("--speed", type=float, default=50.0, help="Aircraft speed (m/s).")
parser.add_argument("--heading", type=float, default=36.0, help="Aircraft heading (deg, 0=E).")
parser.add_argument("--alt", type=float, default=1200.0, help="Altitude above ground (m).")
parser.add_argument("--map", type=Path, default=None, help="Optional background map image.")
parser.add_argument("--scale", type=float, default=2.0, help="Meters per pixel for map rendering.")
parser.add_argument("--trail", type=int, default=200, help="Stored footprint polygons.")
parser.add_argument("--base_az", type=float, default=90.0, help="Center azimuth (deg).")
parser.add_argument("--base_pitch", type=float, default=-90.0, help="Base pitch (deg).")
parser.add_argument("--az_min", type=float, default=90, help="Minimum azimuth limit (deg).")
parser.add_argument("--az_max", type=float, default=90, help="Maximum azimuth limit (deg).")
parser.add_argument("--az_speed", type=float, default=0, help="Azimuth max speed (deg/s).")
parser.add_argument("--az_acc", type=float, default=0, help="Azimuth max acceleration (deg/s^2).")
parser.add_argument("--pitch_min", type=float, default=-45.0, help="Minimum pitch limit (deg).")
parser.add_argument("--pitch_max", type=float, default=45.0, help="Maximum pitch limit (deg).")
parser.add_argument("--pitch_speed", type=float, default=45.0, help="Pitch max speed (deg/s).")
parser.add_argument("--pitch_acc", type=float, default=20.0, help="Pitch max acceleration (deg/s^2).")
parser.add_argument("--display_scale", type=float, default=0.3, help="Panorama display scale factor.")
return parser.parse_args()
def main() -> None:
args = parse_args()
camera = CameraProfile()
flight = FlightProfile(
altitude_m=args.alt,
speed_mps=args.speed,
heading_deg=args.heading,
)
scanner = ScannerProfile(
base_az_deg=args.base_az,
base_pitch_deg=args.base_pitch,
az_min_deg=args.az_min,
az_max_deg=args.az_max,
pitch_min_deg=args.pitch_min,
pitch_max_deg=args.pitch_max,
az_max_speed_degps=args.az_speed,
az_max_acc_degps2=args.az_acc,
pitch_max_speed_degps=args.pitch_speed,
pitch_max_acc_degps2=args.pitch_acc,
)
map_view = MapView(background=args.map, meters_per_pixel=args.scale, trail_length=args.trail)
options = SimulationOptions(
duration_s=args.duration,
frame_rate=args.fps,
display_scale=args.display_scale,
)
simulator = UnderStitchSimulator(camera, flight, scanner, map_view, options)
simulator.run()
if __name__ == "__main__":
main()