|
|
import av
|
|
|
import cv2
|
|
|
import re
|
|
|
from typing import Optional, Tuple, Dict, Any
|
|
|
import numpy as np
|
|
|
|
|
|
# 读取H30T视频及字幕信息
|
|
|
class H30T_Reader:
    """
    Reader for H30T video files and the telemetry carried in their subtitle track.

    The container is opened with PyAV. ``read()`` walks the demuxed packets and
    returns the parsed telemetry of each subtitle entry, one entry per call.
    Video packets are currently skipped, so the frame slot of the returned
    tuple is always ``None``.

    The reader can be used as a context manager; the container is closed on exit.
    """

    # Layout of one telemetry subtitle entry. Altitudes use the same signed
    # character class as latitude/longitude/gimbal angles so that negative
    # values (e.g. flying below the take-off point) are not rejected.
    _METADATA_PATTERN = re.compile(
        r'FrameCnt: (\d+) ([\d-]+ [\d:.]+)\n'
        r'\[focal_len: ([\d.]+)\] \[dzoom_ratio: ([\d.]+)\], '
        r'\[latitude: ([\d.-]+)\] \[longitude: ([\d.-]+)\] '
        r'\[rel_alt: ([\d.-]+) abs_alt: ([\d.-]+)\] '
        r'\[gb_yaw: ([\d.-]+) gb_pitch: ([\d.-]+) gb_roll: ([\d.-]+)\]'
    )

    def __init__(self):
        self.container = None
        self.video_streams = []
        self.subtitle_streams = []
        self.is_open = False
        # Lazily created generator over parsed subtitle entries; kept between
        # read() calls so that no entry of a multi-entry packet is dropped.
        self._subtitle_iter = None

    def open(self, video_path: str) -> bool:
        """
        Open a video file and collect its video and subtitle streams.

        Any container that is still open from a previous call is closed first.

        Args:
            video_path (str): Path of the video file.

        Returns:
            bool: True if the file was opened, False otherwise (the error is
            printed and the reader is left in the closed state).
        """
        # Do not leak a previously opened container when open() is called twice.
        self.close()

        try:
            self.container = av.open(video_path)
            self.container.seek(0)

            streams = self.container.streams
            self.subtitle_streams = [s for s in streams if s.type == 'subtitle']
            self.video_streams = [s for s in streams if s.type == 'video']
        except Exception as e:
            print(f"打开视频文件失败: {e}")
            # Release a half-opened container and reset all state.
            self.close()
            return False

        self.is_open = True
        return True

    @classmethod
    def _parse_metadata(cls, dialogue_text: str) -> Optional[Dict[str, Any]]:
        """Parse one subtitle text into a telemetry dict, or None if it does not fit."""
        match = cls._METADATA_PATTERN.match(dialogue_text)
        if match is None:
            return None

        (frame_cnt, timestamp, focal_len, dzoom_ratio, lat, lon,
         rel_alt, abs_alt, yaw, pitch, roll) = match.groups()

        try:
            return {
                'frame_cnt': int(frame_cnt),
                'timestamp': timestamp,
                'focal_len': float(focal_len),
                'dzoom_ratio': float(dzoom_ratio),
                'latitude': float(lat),
                'longitude': float(lon),
                'rel_alt': float(rel_alt),
                'abs_alt': float(abs_alt),
                'gb_yaw': float(yaw),
                'gb_pitch': float(pitch),
                'gb_roll': float(roll),
                'raw_text': dialogue_text,
            }
        except ValueError:
            # The character classes admit strings such as "1.2.3" that are not
            # valid numbers; treat those like any other unparseable entry.
            return None

    def _iter_subtitles(self):
        """Yield the parsed telemetry dict of every matching subtitle entry, in demux order."""
        for packet in self.container.demux():
            if packet.stream.type != 'subtitle':
                continue
            for frame in packet.decode():
                dialogue_text = frame.dialogue.decode('utf-8', errors='ignore')
                subtitle_info = self._parse_metadata(dialogue_text)
                if subtitle_info is not None:
                    yield subtitle_info

    def read(self) -> Tuple[Optional[np.ndarray], Optional[Dict[str, Any]]]:
        """
        Read the next telemetry subtitle entry.

        Returns:
            Tuple[Optional[np.ndarray], Optional[Dict[str, Any]]]:
                (video frame, subtitle info dict). Video packets are not
                decoded, so the first element is always None.
                Returns (None, None) when the reader is not open, when no
                more data is available, or when reading fails (the error is
                printed).
        """
        if not self.is_open or self.container is None:
            return None, None

        try:
            if self._subtitle_iter is None:
                self._subtitle_iter = self._iter_subtitles()
            subtitle_info = next(self._subtitle_iter, None)
        except Exception as e:
            print(f"读取数据失败: {e}")
            # A generator that raised is finished; drop it so that a later
            # call resumes demuxing from the container's current position.
            self._subtitle_iter = None
            return None, None

        # Always hand back a 2-tuple so callers can unpack the result.
        return None, subtitle_info

    def close(self):
        """
        Close the video file and reset the reader to its initial state.

        Safe to call when nothing is open.
        """
        try:
            if self.container is not None:
                self.container.close()
        finally:
            self.container = None
            self.is_open = False
            self.video_streams = []
            self.subtitle_streams = []
            self._subtitle_iter = None

    def __enter__(self):
        """Support use as a context manager."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the file automatically on exit."""
        self.close()

    def get_stream_info(self) -> Dict[str, Any]:
        """
        Describe the video and subtitle streams of the open file.

        Returns:
            Dict[str, Any]: ``{'video_streams': [...], 'subtitle_streams': [...]}``
            with one dict per stream, or an empty dict if no file is open.
        """
        if not self.is_open:
            return {}

        video_info = [
            {
                'index': stream.index,
                'width': stream.width,
                'height': stream.height,
                'fps': float(stream.average_rate) if stream.average_rate else None,
                # duration is expressed in time_base units; convert to seconds.
                'duration': float(stream.duration * stream.time_base) if stream.duration else None,
            }
            for stream in self.video_streams
        ]
        subtitle_info = [
            {
                'index': stream.index,
                'language': getattr(stream, 'language', 'unknown'),
            }
            for stream in self.subtitle_streams
        ]

        return {
            'video_streams': video_info,
            'subtitle_streams': subtitle_info,
        }
|
|
|
|
|
|
|
|
|
# 使用示例
|
|
|
if __name__ == "__main__":
    import sys

    # Demo: open a clip, print its stream info, then print every item that
    # read() returns until the reader signals the end with (None, None).
    # The clip can be given as the first command-line argument; without one
    # the sample file name is used.
    video_path = sys.argv[1] if len(sys.argv) > 1 else "DJI_20250418150210_0006_S.MP4"

    # Create the reader instance
    reader = H30T_Reader()

    # Open the video file
    if reader.open(video_path):
        print("视频文件打开成功")

        # Fetch stream information
        stream_info = reader.get_stream_info()
        print("流信息:", stream_info)

        # Read data
        frame_count = 0
        subtitle_count = 0

        try:
            while True:
                frame, subtitle = reader.read()

                # Both slots empty means there is nothing more to read.
                if frame is None and subtitle is None:
                    break

                if frame is not None:
                    frame_count += 1
                    print(f"读取视频帧 {frame_count} - 尺寸: {frame.shape}")

                    # Show the video frame (optional); 'q' stops playback
                    cv2.imshow('Video Frame', frame)
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        break

                if subtitle is not None:
                    subtitle_count += 1
                    print(f"读取字幕 {subtitle_count}: {subtitle}")
        finally:
            # Release the file and any preview window even if the loop raised.
            reader.close()
            cv2.destroyAllWindows()

        print(f"总共读取了 {frame_count} 帧视频和 {subtitle_count} 条字幕")
|