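# NOTE: the lines below are residue from the repository web viewer, kept as
# comments so that this file parses as Python.
# You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
#
# 194 lines
# 6.5 KiB
#
# 2 weeks ago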
import av
import cv2
import re
from typing import Optional, Tuple, Dict, Any
import numpy as np
# Read H30T video files and their embedded subtitle (telemetry) information.
class H30T_Reader:
    """Reader for the telemetry subtitles embedded in H30T video files.

    The file is opened with PyAV (``av.open``). Each call to :meth:`read`
    walks the demuxed packets and returns the next subtitle entry that
    matches the expected metadata layout, parsed into a dictionary.

    NOTE: video packets are skipped, not decoded, so the frame slot of the
    tuple returned by :meth:`read` is always ``None``.

    The class can be used as a context manager; the container is closed on
    exit.
    """

    # Layout of one subtitle entry: a "FrameCnt: <n> <date> <time>" line
    # followed by a line of bracketed key/value groups. Compiled once here
    # instead of on every subtitle. The altitude groups accept a minus sign
    # so that entries with negative altitudes are not silently dropped.
    _METADATA_RE = re.compile(
        r'FrameCnt: (\d+) ([\d-]+ [\d:.]+)\n'
        r'\[focal_len: ([\d.]+)\] \[dzoom_ratio: ([\d.]+)\], '
        r'\[latitude: ([\d.-]+)\] \[longitude: ([\d.-]+)\] '
        r'\[rel_alt: ([\d.-]+) abs_alt: ([\d.-]+)\] '
        r'\[gb_yaw: ([\d.-]+) gb_pitch: ([\d.-]+) gb_roll: ([\d.-]+)\]'
    )

    def __init__(self):
        self.container = None
        self.video_streams = []
        self.subtitle_streams = []
        self.is_open = False

    def open(self, video_path: str) -> bool:
        """Open a video file and index its video and subtitle streams.

        Any container left open by a previous call is closed first.

        Args:
            video_path: Path of the video file to open.

        Returns:
            True when the file was opened, False otherwise. Failures are
            printed rather than raised.
        """
        # Avoid leaking a previously opened container on re-open.
        self.close()
        try:
            self.container = av.open(video_path)
            self.container.seek(0)
            streams = self.container.streams
            self.subtitle_streams = [s for s in streams if s.type == 'subtitle']
            self.video_streams = [s for s in streams if s.type == 'video']
        except Exception as e:
            print(f"打开视频文件失败: {e}")
            # Release a half-initialised container and reset all state.
            self.close()
            return False
        self.is_open = True
        return True

    @classmethod
    def _parse_subtitle(cls, dialogue_text: str) -> Optional[Dict[str, Any]]:
        """Parse one subtitle text into a metadata dict, or None if it does not fit."""
        match = cls._METADATA_RE.match(dialogue_text)
        if match is None:
            return None
        (frame_cnt, timestamp, focal_len, dzoom_ratio, lat, lon,
         rel_alt, abs_alt, yaw, pitch, roll) = match.groups()
        try:
            return {
                'frame_cnt': int(frame_cnt),
                'timestamp': timestamp,
                'focal_len': float(focal_len),
                'dzoom_ratio': float(dzoom_ratio),
                'latitude': float(lat),
                'longitude': float(lon),
                'rel_alt': float(rel_alt),
                'abs_alt': float(abs_alt),
                'gb_yaw': float(yaw),
                'gb_pitch': float(pitch),
                'gb_roll': float(roll),
                'raw_text': dialogue_text,
            }
        except ValueError:
            # The character classes admit strings such as "1.2.3" or "-";
            # treat such an entry as malformed and let the caller skip it.
            return None

    def read(self) -> Tuple[Optional[np.ndarray], Optional[Dict[str, Any]]]:
        """Read the next subtitle record.

        Returns:
            A ``(frame, subtitle)`` tuple. ``frame`` is always ``None``
            because video packets are not decoded. ``subtitle`` is a dict
            with the keys ``frame_cnt``, ``timestamp``, ``focal_len``,
            ``dzoom_ratio``, ``latitude``, ``longitude``, ``rel_alt``,
            ``abs_alt``, ``gb_yaw``, ``gb_pitch``, ``gb_roll`` and
            ``raw_text``. ``(None, None)`` is returned when the reader is
            not open, when no further matching subtitle exists, or when
            reading fails (the error is printed).
        """
        if not self.is_open or self.container is None:
            return None, None
        try:
            for packet in self.container.demux():
                if packet.stream.type != 'subtitle':
                    continue
                for subtitle in packet.decode():
                    dialogue_text = subtitle.dialogue.decode('utf-8', errors='ignore')
                    info = self._parse_subtitle(dialogue_text)
                    if info is not None:
                        # Always hand back the documented 2-tuple so that
                        # callers can unpack ``frame, subtitle``.
                        return None, info
        except Exception as e:
            print(f"读取数据失败: {e}")
            return None, None
        return None, None

    def close(self):
        """Close the container, if any, and reset the reader state."""
        if self.container is not None:
            self.container.close()
            self.container = None
        self.is_open = False
        self.video_streams = []
        self.subtitle_streams = []

    def __enter__(self):
        """Return the reader itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the reader when the ``with`` block exits."""
        self.close()

    def get_stream_info(self) -> Dict[str, Any]:
        """Describe the video and subtitle streams of the open file.

        Returns:
            An empty dict when no file is open. Otherwise a dict with two
            lists: ``video_streams`` (index, width, height, fps, duration)
            and ``subtitle_streams`` (index, language). ``fps`` and
            ``duration`` are ``None`` when the stream does not report them.
        """
        if not self.is_open:
            return {}
        return {
            'video_streams': [
                {
                    'index': stream.index,
                    'width': stream.width,
                    'height': stream.height,
                    'fps': float(stream.average_rate) if stream.average_rate else None,
                    # duration is in time_base units; the product gives seconds.
                    'duration': float(stream.duration * stream.time_base) if stream.duration else None,
                }
                for stream in self.video_streams
            ],
            'subtitle_streams': [
                {
                    'index': stream.index,
                    'language': getattr(stream, 'language', 'unknown'),
                }
                for stream in self.subtitle_streams
            ],
        }
# Usage example
if __name__ == "__main__":
    # Demo: open a video, print its stream layout, then print every
    # telemetry subtitle (and display any frame the reader yields).
    import sys

    # The input path may be given as the first command-line argument;
    # without one the sample clip name is used.
    video_path = sys.argv[1] if len(sys.argv) > 1 else "DJI_20250418150210_0006_S.MP4"

    reader = H30T_Reader()
    if reader.open(video_path):
        print("视频文件打开成功")
        frame_count = 0
        subtitle_count = 0
        try:
            stream_info = reader.get_stream_info()
            print("流信息:", stream_info)
            while True:
                frame, subtitle = reader.read()
                # (None, None) signals that nothing more can be read.
                if frame is None and subtitle is None:
                    break
                if frame is not None:
                    frame_count += 1
                    print(f"读取视频帧 {frame_count} - 尺寸: {frame.shape}")
                    # Show the frame; pressing "q" stops the loop.
                    cv2.imshow('Video Frame', frame)
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        break
                if subtitle is not None:
                    subtitle_count += 1
                    print(f"读取字幕 {subtitle_count}: {subtitle}")
        finally:
            # Release the container and any OpenCV window even if the loop
            # above raised.
            reader.close()
            cv2.destroyAllWindows()
        print(f"总共读取了 {frame_count} 帧视频和 {subtitle_count} 条字幕")