master
wangchongwu 2 months ago
parent 0c1a2f8eee
commit cbd8df9bdd

@ -0,0 +1,152 @@
import numpy as np
from abc import ABC, abstractmethod
class BaseFilter(ABC):
"""
卡尔曼滤波器基类
定义所有滤波器的统一接口
"""
def __init__(self, dt, process_noise_std, measurement_noise_std, model_type):
"""
初始化滤波器基类
参数:
dt: 时间间隔
process_noise_std: 过程噪声标准差
measurement_noise_std: 测量噪声标准差
model_type: 模型类型必须是 'CV', 'CA', 'CT' 中的一种
"""
# 验证模型类型
valid_models = ['CV', 'CA', 'CT']
if model_type not in valid_models:
raise ValueError(f"模型类型必须是 {valid_models} 中的一种,当前输入: {model_type}")
self.dt = dt
self.process_noise_std = process_noise_std
self.measurement_noise_std = measurement_noise_std
self.model_type = model_type
@abstractmethod
def predict(self, x=None, P=None):
"""
预测步骤
参数:
x: 输入状态可选
P: 输入协方差矩阵可选
"""
pass
@abstractmethod
def update(self, measurement):
"""
更新步骤
参数:
measurement: 观测值
返回:
x: 更新后的状态
P: 更新后的协方差矩阵
likelihood: 似然值
"""
pass
@abstractmethod
def step(self, measurement):
"""
执行一步完整的滤波过程预测+更新
参数:
measurement: 观测值
返回:
x: 更新后的状态
P: 更新后的协方差矩阵
likelihood: 似然值
"""
pass
@abstractmethod
def get_position(self):
"""
获取当前位置估计
返回:
(x, y): 位置坐标
"""
pass
@abstractmethod
def get_velocity(self):
"""
获取当前速度估计
返回:
(vx, vy): 速度分量
"""
pass
def get_speed(self):
"""
获取当前速度大小
返回:
speed: 速度大小 (标量)
"""
vx, vy = self.get_velocity()
return np.sqrt(vx**2 + vy**2)
def get_model_type(self):
"""
获取当前模型类型
返回:
model_type: 模型类型 ('CV', 'CA', 'CT')
"""
return self.model_type
@abstractmethod
def set_state(self, x, P=None):
"""
设置滤波器状态
参数:
x: 状态向量
P: 协方差矩阵可选
"""
pass
def _calculate_likelihood(self, innovation, innovation_cov):
"""
计算高斯似然值可选实现
参数:
innovation: 创新向量
innovation_cov: 创新协方差矩阵
返回:
likelihood: 似然值
"""
dim = innovation.shape[0]
det_S = np.linalg.det(innovation_cov)
# 避免数值问题
if det_S <= 0:
return 1e-10
inv_S = np.linalg.inv(innovation_cov)
# 多元高斯分布的概率密度函数
norm_factor = 1.0 / np.sqrt((2 * np.pi) ** dim * det_S)
exponent = -0.5 * (innovation.T @ inv_S @ innovation)
# 处理不同维度的返回值
if hasattr(exponent, 'shape') and exponent.shape == (1, 1):
exponent = exponent[0, 0]
elif hasattr(exponent, 'shape') and len(exponent.shape) > 0:
exponent = float(exponent.flatten()[0])
likelihood = norm_factor * np.exp(exponent)
return float(likelihood)

@ -0,0 +1,274 @@
import numpy as np
from BaseFilter import BaseFilter
class CTModelEKF(BaseFilter):
"""
协调转弯模型的扩展卡尔曼滤波器 (CT-EKF)
状态向量: [x, y, vx, vy, omega]
其中 omega 是角速度转弯率
"""
def __init__(self, dt, process_noise_std, measurement_noise_std, omega_noise_std=0.1, model_type='CT'):
"""
初始化CT-EKF滤波器
参数:
dt: 时间间隔
process_noise_std: 位置和速度的过程噪声标准差
measurement_noise_std: 测量噪声标准差
omega_noise_std: 角速度的过程噪声标准差
model_type: 模型类型默认为'CT'协调转弯模型
"""
super().__init__(dt, process_noise_std, measurement_noise_std, model_type)
self.omega_noise_std = omega_noise_std
# 观测矩阵 (只观测位置)
self.H = np.array([[1, 0, 0, 0, 0],
[0, 1, 0, 0, 0]])
# 过程噪声协方差矩阵 (5x5)
dt2 = self.dt**2
dt3 = self.dt**3
dt4 = self.dt**4
q = self.process_noise_std**2
q_omega = self.omega_noise_std**2
self.Q = np.array([[q*dt4/4, 0, q*dt3/2, 0, 0],
[0, q*dt4/4, 0, q*dt3/2, 0],
[q*dt3/2, 0, q*dt2, 0, 0],
[0, q*dt3/2, 0, q*dt2, 0],
[0, 0, 0, 0, q_omega*dt]])
# 测量噪声协方差矩阵
self.R = self.measurement_noise_std**2 * np.eye(2)
# 初始误差协方差矩阵 (5x5)
self.P = np.diag([1000, 1000, 100, 100, 10]) # 角速度的初始不确定性较小
# 初始状态 (5x1)
self.x = np.zeros((5, 1))
def state_transition(self, x):
"""
非线性状态转移函数 f(x)
参数:
x: 状态向量 [px, py, vx, vy, omega]
返回:
x_pred: 预测状态
"""
px, py, vx, vy, omega = x.flatten()
dt = self.dt
# 处理角速度接近零的情况(直线运动)
if abs(omega) < 1e-6:
# 使用恒速模型
px_new = px + vx * dt
py_new = py + vy * dt
vx_new = vx
vy_new = vy
omega_new = omega
else:
# 协调转弯模型
wt = omega * dt
sin_wt = np.sin(wt)
cos_wt = np.cos(wt)
px_new = px + (vx * sin_wt + vy * (cos_wt - 1)) / omega
py_new = py + (vy * sin_wt - vx * (cos_wt - 1)) / omega
vx_new = vx * cos_wt - vy * sin_wt
vy_new = vx * sin_wt + vy * cos_wt
omega_new = omega # 假设角速度恒定
return np.array([[px_new], [py_new], [vx_new], [vy_new], [omega_new]])
def compute_jacobian(self, x):
"""
计算状态转移函数的雅可比矩阵 F
参数:
x: 状态向量 [px, py, vx, vy, omega]
返回:
F: 雅可比矩阵 (5x5)
"""
px, py, vx, vy, omega = x.flatten()
dt = self.dt
if abs(omega) < 1e-6:
# 直线运动情况的雅可比矩阵
F = np.array([[1, 0, dt, 0, 0],
[0, 1, 0, dt, 0],
[0, 0, 1, 0, 0],
[0, 0, 0, 1, 0],
[0, 0, 0, 0, 1]])
else:
# 转弯运动情况的雅可比矩阵
wt = omega * dt
sin_wt = np.sin(wt)
cos_wt = np.cos(wt)
# 计算偏导数
# ∂px/∂omega
dpx_domega = (vx * (cos_wt * dt * omega - sin_wt) +
vy * (-sin_wt * dt * omega - cos_wt + 1)) / (omega**2)
# ∂py/∂omega
dpy_domega = (vy * (cos_wt * dt * omega - sin_wt) -
vx * (-sin_wt * dt * omega - cos_wt + 1)) / (omega**2)
# ∂vx/∂omega
dvx_domega = dt * (-vx * sin_wt - vy * cos_wt)
# ∂vy/∂omega
dvy_domega = dt * (vx * cos_wt - vy * sin_wt)
F = np.array([[1, 0, sin_wt/omega, (cos_wt-1)/omega, dpx_domega],
[0, 1, (1-cos_wt)/omega, sin_wt/omega, dpy_domega],
[0, 0, cos_wt, -sin_wt, dvx_domega],
[0, 0, sin_wt, cos_wt, dvy_domega],
[0, 0, 0, 0, 1]])
return F
def predict(self, x=None, P=None):
"""
EKF预测步骤
参数:
x: 输入状态可选
P: 输入协方差矩阵可选
"""
if x is not None:
self.x = x
if P is not None:
self.P = P
# 非线性状态预测
self.x = self.state_transition(self.x)
# 计算雅可比矩阵
F = self.compute_jacobian(self.x)
# 协方差预测线性化后的标准KF公式
self.P = F @ self.P @ F.T + self.Q
def update(self, measurement):
"""
EKF更新步骤
参数:
measurement: 观测值 [x, y]
返回:
x: 更新后的状态
P: 更新后的协方差矩阵
likelihood: 似然值
"""
# 观测预测(线性)
z_pred = self.H @ self.x
# 创新
y = measurement.reshape(-1, 1) - z_pred
# 创新协方差
S = self.H @ self.P @ self.H.T + self.R
# 计算似然值
likelihood = self._calculate_likelihood(y, S)
# 卡尔曼增益
K = self.P @ self.H.T @ np.linalg.inv(S)
# 更新状态
self.x = self.x + K @ y
# 更新协方差
I = np.eye(5)
self.P = (I - K @ self.H) @ self.P
return self.x, self.P, likelihood
def _calculate_likelihood(self, innovation, innovation_cov):
"""
计算高斯似然值
参数:
innovation: 创新向量
innovation_cov: 创新协方差矩阵
返回:
likelihood: 似然值
"""
dim = innovation.shape[0]
det_S = np.linalg.det(innovation_cov)
# 避免数值问题
if det_S <= 0:
return 1e-10
inv_S = np.linalg.inv(innovation_cov)
# 多元高斯分布的概率密度函数
norm_factor = 1.0 / np.sqrt((2 * np.pi) ** dim * det_S)
exponent = -0.5 * (innovation.T @ inv_S @ innovation)[0, 0]
likelihood = norm_factor * np.exp(exponent)
return float(likelihood)
def get_position(self):
"""
获取当前位置估计
返回:
(x, y): 位置坐标
"""
return self.x[0, 0], self.x[1, 0]
def get_velocity(self):
"""
获取当前速度估计
返回:
(vx, vy): 速度分量
"""
return self.x[2, 0], self.x[3, 0]
def get_angular_velocity(self):
"""
获取当前角速度估计
返回:
omega: 角速度
"""
return self.x[4, 0]
def step(self, measurement):
"""
执行一步完整的滤波过程预测+更新
参数:
measurement: 观测值 [x, y]
返回:
x: 更新后的状态
P: 更新后的协方差矩阵
likelihood: 似然值
"""
# 预测步骤
self.predict()
# 更新步骤
return self.update(measurement)
def set_state(self, x, P=None):
"""
设置滤波器状态
参数:
x: 状态向量 [px, py, vx, vy, omega]
P: 协方差矩阵可选
"""
self.x = x.reshape(-1, 1)
if P is not None:
self.P = P

@ -0,0 +1,157 @@
import numpy as np
import KalmanFilter2D as kf2d
class IMMFilter:
"""
交互式多模型(IMM)滤波器
"""
def __init__(self, filters, M, mu_init):
if not isinstance(filters, (list, tuple)):
raise ValueError("filters必须是滤波器列表")
if len(filters) < 2:
raise ValueError("IMM滤波器至少需要2个模型")
self.filters = filters
self.n_models = len(filters)
if M is None or mu_init is None:
raise ValueError("必须提供转移矩阵M和初始概率mu_init")
if M.shape != (self.n_models, self.n_models):
raise ValueError(f"转移矩阵M的形状应为({self.n_models}, {self.n_models})")
if len(mu_init) != self.n_models:
raise ValueError(f"初始概率mu_init的长度应为{self.n_models}")
self.M = M # 模型转移概率矩阵
self.mu = mu_init # 模型概率
# 找到最大状态向量维度
self.state_dims = [f.x.shape[0] for f in filters]
self.max_state_dim = max(self.state_dims)
# 使用最大维度初始化
self.x = np.zeros((self.max_state_dim, 1))
self.P = np.eye(self.max_state_dim)
self.x_mix = [np.zeros((self.max_state_dim, 1)) for _ in range(self.n_models)]
self.P_mix = [np.eye(self.max_state_dim) for _ in range(self.n_models)]
def predict(self):
# 1. 计算混合概率 (Interaction/Mixing)
c_bar = self.mu @ self.M
mixing_probs = (self.M * self.mu[:, np.newaxis]) / c_bar
# 2. 混合每个模型的状态
for j in range(self.n_models):
self.x_mix[j] = np.zeros((self.max_state_dim, 1))
self.P_mix[j] = np.zeros((self.max_state_dim, self.max_state_dim))
# 混合状态向量
for i in range(self.n_models):
# 扩展状态向量到最大维度
x_extended = np.zeros((self.max_state_dim, 1))
x_extended[:self.state_dims[i], :] = self.filters[i].x
self.x_mix[j] += x_extended * mixing_probs[i, j]
# 混合协方差矩阵
for i in range(self.n_models):
# 扩展状态向量和协方差矩阵
x_extended = np.zeros((self.max_state_dim, 1))
P_extended = np.eye(self.max_state_dim) * 1e-6 # 添加小的对角元素
x_extended[:self.state_dims[i], :] = self.filters[i].x
P_extended[:self.state_dims[i], :self.state_dims[i]] = self.filters[i].P
y = x_extended - self.x_mix[j]
self.P_mix[j] += mixing_probs[i, j] * (P_extended + y @ y.T)
# 3. 对每个模型进行预测
for j in range(self.n_models):
# 使用混合后的状态进行预测(截取到对应维度)
x_j = self.x_mix[j][:self.state_dims[j], :]
P_j = self.P_mix[j][:self.state_dims[j], :self.state_dims[j]]
self.filters[j].predict(x_j, P_j)
def update(self, z):
# 1. 对每个模型进行更新,并获取似然
likelihoods = np.zeros(self.n_models)
for j in range(self.n_models):
_, _, likelihoods[j] = self.filters[j].update(z)
# 2. 更新模型概率
c_bar = self.mu @ self.M
self.mu = likelihoods * c_bar
# 数值稳定性处理
mu_sum = np.sum(self.mu)
if mu_sum > 0 and not np.isnan(mu_sum):
self.mu /= mu_sum # 归一化
else:
# 如果出现数值问题,重置为均匀分布
self.mu = np.ones(self.n_models) / self.n_models
# 3. 状态融合
self.x.fill(0)
self.P.fill(0)
# 融合状态向量
for j in range(self.n_models):
x_extended = np.zeros((self.max_state_dim, 1))
x_extended[:self.state_dims[j], :] = self.filters[j].x
self.x += self.mu[j] * x_extended
# 融合协方差矩阵
for j in range(self.n_models):
x_extended = np.zeros((self.max_state_dim, 1))
P_extended = np.eye(self.max_state_dim) * 1e-6
x_extended[:self.state_dims[j], :] = self.filters[j].x
P_extended[:self.state_dims[j], :self.state_dims[j]] = self.filters[j].P
y = x_extended - self.x
self.P += self.mu[j] * (P_extended + y @ y.T)
return self.x, self.P
def step(self, z):
"""
执行完整的IMM滤波步骤预测 + 更新
参数:
z: 观测值 (2x1 numpy数组)
返回:
x: 融合后的状态估计 (状态维度x1 numpy数组)
P: 融合后的协方差矩阵 (状态维度x状态维度 numpy数组)
mu: 当前模型概率 (numpy数组)
velocity: 当前速度估计 [vx, vy] (numpy数组)
"""
# 执行预测步骤
self.predict()
# 执行更新步骤
x, P = self.update(z)
# 提取速度信息 - 考虑CT模型的角速度影响
# 找到概率最大的模型
dominant_model_idx = np.argmax(self.mu)
# 获取优势模型的类型
dominant_model = self.filters[dominant_model_idx]
dominant_model_type = dominant_model.get_model_type()
if dominant_model_type == 'CT' and len(x) >= 5: # CT模型且状态向量包含角速度
# CT模型考虑角速度的瞬时速度
vx, vy, omega = x[2, 0], x[3, 0], x[4, 0]
# 如果有显著的角速度,计算切向速度分量
if abs(omega) > 1e-4: # 降低阈值以更容易检测转弯
# 当前线速度大小
v_linear = np.sqrt(vx**2 + vy**2)
# 角速度对速度方向的影响(切向加速度)
# 这里简化处理,保持原有线速度但标记为转弯状态
velocity = np.array([vx, vy, omega]) # 包含角速度信息
else:
velocity = np.array([vx, vy, 0.0]) # 直线运动
else:
# CV或CA模型只返回线速度
velocity = np.array([x[2, 0], x[3, 0]])
return x, P, self.mu, velocity

@ -0,0 +1,122 @@
import numpy as np
from BaseFilter import BaseFilter
class KalmanFilter2D(BaseFilter):
def __init__(self, dt, process_noise_std, measurement_noise_std, model_type='CV'):
"""
2D卡尔曼滤波器
状态向量: [x, y, vx, vy]
参数:
dt: 时间间隔
process_noise_std: 过程噪声标准差
measurement_noise_std: 测量噪声标准差
model_type: 模型类型默认为'CV'恒速模型
"""
super().__init__(dt, process_noise_std, measurement_noise_std, model_type)
# 状态转移矩阵 (恒速模型)
self.A = np.array([[1, 0, dt, 0],
[0, 1, 0, dt],
[0, 0, 1, 0],
[0, 0, 0, 1]])
# 观测矩阵 (只观测位置)
self.H = np.array([[1, 0, 0, 0],
[0, 1, 0, 0]])
# 过程噪声协方差矩阵
self.Q = process_noise_std**2 * np.array([[dt**4/4, 0, dt**3/2, 0],
[0, dt**4/4, 0, dt**3/2],
[dt**3/2, 0, dt**2, 0],
[0, dt**3/2, 0, dt**2]])
# 测量噪声协方差矩阵
self.R = measurement_noise_std**2 * np.eye(2)
# 初始误差协方差矩阵
self.P = 100 * np.eye(4)
# 初始状态
self.x = np.zeros((4, 1))
def predict(self, x=None, P=None):
"""预测步骤"""
if x is not None:
self.x = x
if P is not None:
self.P = P
self.x = np.dot(self.A, self.x)
self.P = np.dot(np.dot(self.A, self.P), self.A.T) + self.Q
def update(self, measurement):
"""更新步骤"""
# 创新
y = measurement - np.dot(self.H, self.x)
# 创新协方差
S = np.dot(np.dot(self.H, self.P), self.H.T) + self.R
# 计算似然值
likelihood = self._calculate_likelihood(y, S)
# 卡尔曼增益
K = np.dot(np.dot(self.P, self.H.T), np.linalg.inv(S))
# 更新状态
self.x = self.x + np.dot(K, y)
# 更新协方差
I = np.eye(4)
self.P = np.dot((I - np.dot(K, self.H)), self.P)
return self.x, self.P, likelihood
def _calculate_likelihood(self, innovation, innovation_cov):
"""计算高斯似然值"""
dim = innovation.shape[0]
det_S = np.linalg.det(innovation_cov)
inv_S = np.linalg.inv(innovation_cov)
# 避免数值问题
if det_S <= 0:
return 1e-10
# 多元高斯分布的概率密度函数
norm_factor = 1.0 / np.sqrt((2 * np.pi) ** dim * det_S)
exponent = -0.5 * np.dot(np.dot(innovation.T, inv_S), innovation)
likelihood = norm_factor * np.exp(exponent)
return float(likelihood[0, 0])
def step(self, measurement):
"""
执行一步完整的滤波过程预测+更新
参数:
measurement: 观测值 [x, y]
返回:
x: 更新后的状态
P: 更新后的协方差矩阵
likelihood: 似然值
"""
# 预测步骤
self.predict()
# 更新步骤
return self.update(measurement)
def get_position(self):
"""获取当前位置估计"""
return self.x[0, 0], self.x[1, 0]
def get_velocity(self):
"""获取当前速度估计"""
return self.x[2, 0], self.x[3, 0]
def set_state(self, x, P=None):
"""
设置滤波器状态
参数:
x: 状态向量 [px, py, vx, vy]
P: 协方差矩阵可选
"""
self.x = x.reshape(-1, 1)
if P is not None:
self.P = P

@ -0,0 +1,427 @@
import numpy as np
import matplotlib.pyplot as plt
from IMM_Filter import IMMFilter
from KalmanFilter2D import KalmanFilter2D
from CTModelEKF import CTModelEKF
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
def generate_trajectory(n_points=100, trajectory_type='circular', noise_std=0.1,
outlier_ratio=0.0, outlier_magnitude=10.0):
"""
生成[x,y]轨迹数据
参数:
n_points: 轨迹点数
trajectory_type: 轨迹类型 ('linear', 'circular', 'figure8', 'maneuver')
noise_std: 噪声标准差
outlier_ratio: 野值比例 (0.0-1.0)
outlier_magnitude: 野值幅度倍数
返回:
true_positions: 真实轨迹 (n_points, 2)
noisy_measurements: 带噪声的观测 (n_points, 2)
outlier_indices: 野值点的索引
"""
t = np.linspace(0, 10, n_points)
if trajectory_type == 'linear':
# 直线运动
x_true = 2 * t + 10
y_true = 1.5 * t + 5
elif trajectory_type == 'circular':
# 圆形轨迹
radius = 20
x_true = radius * np.cos(0.5 * t) + 50
y_true = radius * np.sin(0.5 * t) + 50
elif trajectory_type == 'figure8':
# 8字形轨迹
x_true = 20 * np.sin(t) + 50
y_true = 20 * np.sin(2 * t) + 50
elif trajectory_type == 'maneuver':
# 机动轨迹(包含转弯)
x_true = np.zeros_like(t)
y_true = np.zeros_like(t)
for i, time in enumerate(t):
if time < 3:
# 直线运动
x_true[i] = 5 * time
y_true[i] = 2 * time
elif time < 6:
# 转弯
angle = np.pi * (time - 3) / 3
x_true[i] = 15 + 10 * np.cos(angle)
y_true[i] = 6 + 10 * np.sin(angle)
else:
# 再次直线运动
x_true[i] = 5 + 3 * (time - 6)
y_true[i] = 16 + 1 * (time - 6)
elif trajectory_type == 'my':
x_true = np.zeros_like(t)
y_true = np.zeros_like(t)
for i, time in enumerate(t):
if time < 3:
x_true[i] = 5 * time
y_true[i] = 2 * time
elif time < 6:
angle = np.pi * (time - 3) / 15
x_true[i] = 15 + 2 * np.cos(angle)
y_true[i] = 6 + 2 * np.sin(angle)
else:
raise ValueError(f"未知的轨迹类型: {trajectory_type}")
# 真实位置
true_positions = np.column_stack([x_true, y_true])
# 添加测量噪声
noise = np.random.normal(0, noise_std, (n_points, 2))
noisy_measurements = true_positions + noise
# 添加野值(跳点)
outlier_indices = []
if outlier_ratio > 0:
n_outliers = int(n_points * outlier_ratio)
outlier_indices = np.random.choice(n_points, n_outliers, replace=False)
for idx in outlier_indices:
# 生成随机方向的野值
outlier_direction = np.random.uniform(0, 2*np.pi)
outlier_distance = outlier_magnitude * noise_std
outlier_offset = outlier_distance * np.array([np.cos(outlier_direction),
np.sin(outlier_direction)])
noisy_measurements[idx] += outlier_offset
return true_positions, noisy_measurements, outlier_indices
def test_imm_filter(trajectory_type='maneuver', noise_std=0.5, n_points=300,
outlier_ratio=0.0, outlier_magnitude=10.0):
"""
测试IMM滤波器
参数:
trajectory_type: 轨迹类型
noise_std: 测量噪声标准差
n_points: 轨迹点数
outlier_ratio: 野值比例
outlier_magnitude: 野值幅度倍数
"""
# 时间步长
dt = 0.01
# 生成轨迹
true_positions, measurements, outlier_indices = generate_trajectory(
n_points, trajectory_type, noise_std, outlier_ratio, outlier_magnitude)
# 初始化两个卡尔曼滤波器
# 平滑模型(低过程噪声)- 适合匀速运动
kf_smooth = KalmanFilter2D(dt, process_noise_std=0.1, measurement_noise_std=noise_std)
# 机动模型(高过程噪声)- 适合机动运动
kf_maneuver = KalmanFilter2D(dt, process_noise_std=10, measurement_noise_std=noise_std)
# 协调转弯模型 - 适合转弯运动
# 增加过程噪声和角速度噪声以提高鲁棒性,防止发散
ct_model = CTModelEKF(dt, process_noise_std=1.0, measurement_noise_std=noise_std, omega_noise_std=10.0)
# 模型转移矩阵: 假设有95%概率保持原模型
M = np.array([[0.95, 0.05],
[0.05, 0.95]])
mu_init = np.array([0.5, 0.5]) # 初始模型概率
# 初始化每个滤波器的状态
# 使用前两个测量点估计初始速度
if len(measurements) > 1:
initial_vx = (measurements[1, 0] - measurements[0, 0]) / dt
initial_vy = (measurements[1, 1] - measurements[0, 1]) / dt
else:
initial_vx, initial_vy = 0, 0
initial_state_4d = np.array([[measurements[0, 0]], [measurements[0, 1]], [initial_vx], [initial_vy]])
initial_state_5d = np.array([[measurements[0, 0]], [measurements[0, 1]], [initial_vx], [initial_vy], [0.01]]) # 给CT模型一个初始角速度猜测
kf_smooth.x = initial_state_4d.copy()
kf_maneuver.x = initial_state_4d.copy()
ct_model.x = initial_state_5d.copy()
# 创建IMM滤波器 - 使用改进后的单滤波器兼容性
imm = IMMFilter([kf_smooth,kf_maneuver],M,mu_init)
# 存储结果
estimated_positions = []
estimated_velocities = []
model_probabilities = []
individual_model_positions = [[] for _ in range(len(imm.filters))] # 存储每个模型的独立估计
# 运行滤波器
for i in range(len(measurements)):
# 执行完整的滤波步骤(预测+更新)
z = measurements[i].reshape(-1, 1)
x, P, mu, velocity = imm.step(z)
# 保存IMM融合结果
estimated_positions.append([x[0, 0], x[1, 0]])
estimated_velocities.append([velocity[0], velocity[1]])
model_probabilities.append(mu.copy())
# 保存每个模型的独立估计结果
for j, filter_obj in enumerate(imm.filters):
individual_model_positions[j].append([filter_obj.x[0, 0], filter_obj.x[1, 0]])
estimated_positions = np.array(estimated_positions)
estimated_velocities = np.array(estimated_velocities)
model_probabilities = np.array(model_probabilities)
# 计算误差
position_errors = np.linalg.norm(true_positions - estimated_positions, axis=1)
rmse = np.sqrt(np.mean(position_errors**2))
print(f"轨迹类型: {trajectory_type}")
print(f"测量噪声标准差: {noise_std}")
if outlier_ratio > 0:
print(f"野值比例: {outlier_ratio:.1%}, 野值数量: {len(outlier_indices)}")
print(f"野值幅度: {outlier_magnitude}倍噪声标准差")
print(f"位置估计RMSE: {rmse:.3f}")
# 动态输出模型概率(根据实际滤波器类型确定名称)
n_models = model_probabilities.shape[1]
# 根据实际传入的滤波器类型确定模型名称
model_names = []
for filter_obj in imm.filters:
if isinstance(filter_obj, CTModelEKF):
model_names.append('CT模型')
elif isinstance(filter_obj, KalmanFilter2D):
# 根据过程噪声区分平滑模型和机动模型
if filter_obj.process_noise_std <= 0.1:
model_names.append('平滑模型')
else:
model_names.append('机动模型')
else:
model_names.append(f'{type(filter_obj).__name__}')
for i in range(n_models):
model_name = model_names[i] if i < len(model_names) else f'模型{i+1}'
print(f"平均模型概率 - {model_name}: {np.mean(model_probabilities[:, i]):.3f}")
if n_models == 1:
print("注意: 当前使用单个滤波器模式")
# 绘图
fig = plt.figure(figsize=(15, 10))
# 子图1: 轨迹对比(交互式)
ax1 = plt.subplot(2, 2, 1)
plt.plot(true_positions[:, 0], true_positions[:, 1], 'g-', linewidth=2, label='真实轨迹')
plt.scatter(measurements[:, 0], measurements[:, 1], c='r', s=10, alpha=0.5, label='噪声观测')
# 标出野值点
if len(outlier_indices) > 0:
plt.scatter(measurements[outlier_indices, 0], measurements[outlier_indices, 1],
c='orange', s=50, marker='x', linewidth=2, label='野值点')
plt.plot(estimated_positions[:, 0], estimated_positions[:, 1], 'b-', linewidth=2, label='IMM估计')
plt.xlabel('X位置')
plt.ylabel('Y位置')
title = f'{trajectory_type}轨迹跟踪结果'
if outlier_ratio > 0:
title += f' (野值{outlier_ratio:.1%})'
plt.title(title + '\n')
plt.legend()
plt.grid(True)
# 子图2: 位置误差
plt.subplot(2, 2, 2)
plt.plot(position_errors, 'r-', linewidth=1)
plt.xlabel('时间步')
plt.ylabel('位置误差')
plt.title(f'位置估计误差 (RMSE={rmse:.3f})')
plt.grid(True)
# 子图3: 模型概率(动态适应模型数量)
plt.subplot(2, 2, 3)
# 动态绘制模型概率曲线
n_models = model_probabilities.shape[1]
colors = ['b-', 'r-', 'g-', 'm-', 'c-', 'y-'] # 支持更多模型的颜色
# 使用与输出部分相同的模型名称逻辑
for i in range(n_models):
model_name = model_names[i] if i < len(model_names) else f'模型{i+1}'
color = colors[i] if i < len(colors) else f'C{i}-'
plt.plot(model_probabilities[:, i], color, linewidth=2, label=model_name)
plt.xlabel('时间步')
plt.ylabel('模型概率')
if n_models == 1:
plt.title('模型概率变化 (单模型)')
else:
plt.title('模型概率变化')
plt.legend()
plt.grid(True)
# 子图4: 滤波速度
plt.subplot(2, 2, 4)
# 计算真实速度(用于对比)
true_velocities = np.zeros((len(true_positions), 2))
for i in range(1, len(true_positions)):
true_velocities[i, 0] = (true_positions[i, 0] - true_positions[i-1, 0]) / dt
true_velocities[i, 1] = (true_positions[i, 1] - true_positions[i-1, 1]) / dt
# 绘制速度曲线
time_steps = np.arange(len(estimated_velocities))
plt.plot(time_steps, true_velocities[:, 0], 'g--', linewidth=1, alpha=0.7, label='真实Vx')
plt.plot(time_steps, true_velocities[:, 1], 'g:', linewidth=1, alpha=0.7, label='真实Vy')
plt.plot(time_steps, estimated_velocities[:, 0], 'b-', linewidth=2, label='估计Vx')
plt.plot(time_steps, estimated_velocities[:, 1], 'r-', linewidth=2, label='估计Vy')
plt.xlabel('时间步')
plt.ylabel('速度')
plt.title('X和Y方向滤波速度')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
# 转换个体模型位置为numpy数组
individual_model_positions = [np.array(pos) for pos in individual_model_positions]
# 创建新的独立图形:显示各个模型的独立滤波结果
fig2 = plt.figure(figsize=(12, 8))
plt.plot(true_positions[:, 0], true_positions[:, 1], 'g-', linewidth=3, label='真实轨迹', alpha=0.8)
plt.scatter(measurements[:, 0], measurements[:, 1], c='gray', s=8, alpha=0.3, label='噪声观测')
# 标出野值点
if len(outlier_indices) > 0:
plt.scatter(measurements[outlier_indices, 0], measurements[outlier_indices, 1],
c='orange', s=50, marker='x', linewidth=2, label='野值点')
# 绘制每个模型的独立估计轨迹
colors = ['blue', 'red', 'purple', 'brown', 'pink', 'cyan']
linestyles = ['-', '--', '-.', ':', '-', '--']
for i, (pos, filter_obj) in enumerate(zip(individual_model_positions, imm.filters)):
model_name = model_names[i] if i < len(model_names) else f'模型{i+1}'
color = colors[i] if i < len(colors) else f'C{i}'
linestyle = linestyles[i] if i < len(linestyles) else '-'
plt.plot(pos[:, 0], pos[:, 1], color=color, linestyle=linestyle,
linewidth=2, label=f'{model_name}独立估计', alpha=0.7)
# 绘制IMM融合结果
plt.plot(estimated_positions[:, 0], estimated_positions[:, 1], 'k-',
linewidth=2, label='IMM融合结果', alpha=0.9)
plt.xlabel('X位置')
plt.ylabel('Y位置')
title = f'{trajectory_type}轨迹 - 各模型独立滤波结果对比'
if outlier_ratio > 0:
title += f' (野值{outlier_ratio:.1%})'
plt.title(title)
plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
return rmse, model_probabilities, estimated_velocities
def test_one_filter(trajectory_type='maneuver', noise_std=0.5, n_points=300,
outlier_ratio=0.0, outlier_magnitude=10.0):
# 时间步长
dt = 0.01
# 生成轨迹
true_positions, measurements, outlier_indices = generate_trajectory(
n_points, trajectory_type, noise_std, outlier_ratio, outlier_magnitude)
# 初始化每个滤波器的状态
# 使用前两个测量点估计初始速度
if len(measurements) > 1:
initial_vx = (measurements[1, 0] - measurements[0, 0]) / dt
initial_vy = (measurements[1, 1] - measurements[0, 1]) / dt
else:
initial_vx, initial_vy = 0, 0
initial_state_4d = np.array([[measurements[0, 0]], [measurements[0, 1]], [initial_vx], [initial_vy]])
initial_state_5d = np.array([[measurements[0, 0]], [measurements[0, 1]], [initial_vx], [initial_vy], [0.0001]]) # 给CT模型一个初始角速度猜测
# 平滑模型(低过程噪声)- 适合匀速运动
filter = KalmanFilter2D(dt, process_noise_std=8, measurement_noise_std=noise_std)
#filter = CTModelEKF(dt, process_noise_std=1.0, measurement_noise_std=noise_std, omega_noise_std=10.0)
filter.x = initial_state_4d.copy()
# 存储结果
estimated_positions = []
estimated_velocities = []
# 运行滤波器
for i in range(len(measurements)):
# 执行完整的滤波步骤(预测+更新)
z = measurements[i].reshape(-1, 1)
x, P, mu = filter.step(z)
# 保存
estimated_positions.append([x[0, 0], x[1, 0]])
#estimated_velocities.append([velocity[0], velocity[1]])
estimated_positions = np.array(estimated_positions)
estimated_velocities = np.array(estimated_velocities)
# 计算误差
position_errors = np.linalg.norm(true_positions - estimated_positions, axis=1)
rmse = np.sqrt(np.mean(position_errors**2))
# 绘图
fig = plt.figure(figsize=(15, 10))
# 子图1: 轨迹对比(交互式)
ax1 = plt.subplot(2, 1, 1)
plt.plot(true_positions[:, 0], true_positions[:, 1], 'g-', linewidth=2, label='真实轨迹')
plt.scatter(measurements[:, 0], measurements[:, 1], c='r', s=10, alpha=0.5, label='噪声观测')
# 标出野值点
if len(outlier_indices) > 0:
plt.scatter(measurements[outlier_indices, 0], measurements[outlier_indices, 1],
c='orange', s=50, marker='x', linewidth=2, label='野值点')
plt.plot(estimated_positions[:, 0], estimated_positions[:, 1], 'b-', linewidth=2, label='Filter估计')
plt.xlabel('X位置')
plt.ylabel('Y位置')
title = f'{trajectory_type}轨迹跟踪结果'
if outlier_ratio > 0:
title += f' (野值{outlier_ratio:.1%})'
plt.title(title + '\n')
plt.legend()
plt.grid(True)
# 子图2: 位置误差
plt.subplot(2, 1, 2)
plt.plot(position_errors, 'r-', linewidth=1)
plt.xlabel('时间步')
plt.ylabel('位置误差')
plt.title(f'位置估计误差 (RMSE={rmse:.3f})')
plt.grid(True)
plt.show()
return rmse,1.0, estimated_velocities
if __name__ == '__main__':
trajectory_types = ['linear', 'circular', 'figure8', 'maneuver','my']
#rmse, model_probs, velocities = test_imm_filter('my', noise_std = 0.1, n_points=300,outlier_ratio=0.01,outlier_magnitude=20)
test_one_filter('my', noise_std = 0.1, n_points=300,outlier_ratio=0.01,outlier_magnitude=1)

@ -328,7 +328,7 @@ class VideoPlayer:
def main():
video_path = 'DJI_20250418150210_0006_S.MP4'
video_path = 'video/2.mp4'
# 检查文件是否存在
if not os.path.exists(video_path):

Loading…
Cancel
Save