import re
from typing import Any, Dict, Optional, Tuple

import av
import cv2
import numpy as np


# Read H30T video files together with their embedded subtitle metadata.
class H30T_Reader:
    """
    Reader for video files that carry per-frame metadata in a subtitle stream.

    The reader opens a container with PyAV, records its video and subtitle
    streams, and hands out parsed subtitle metadata one entry at a time
    through :meth:`read`. It can be used as a context manager, in which case
    the container is closed on exit.
    """

    # Layout of one subtitle entry. Compiled once here instead of on every
    # decoded subtitle. The altitude groups accept a leading '-' (like the
    # latitude/longitude/gimbal groups) so that an entry with a negative
    # altitude is parsed rather than silently skipped.
    _METADATA_PATTERN = re.compile(
        r'FrameCnt: (\d+) ([\d-]+ [\d:.]+)\n'
        r'\[focal_len: ([\d.]+)\] \[dzoom_ratio: ([\d.]+)\], '
        r'\[latitude: ([\d.-]+)\] \[longitude: ([\d.-]+)\] '
        r'\[rel_alt: ([\d.-]+) abs_alt: ([\d.-]+)\] '
        r'\[gb_yaw: ([\d.-]+) gb_pitch: ([\d.-]+) gb_roll: ([\d.-]+)\]'
    )

    def __init__(self):
        self.container = None
        self.video_streams = []
        self.subtitle_streams = []
        self.is_open = False

    def open(self, video_path: str) -> bool:
        """
        Open a video file.

        Any container that is already open on this reader is closed first.

        Args:
            video_path (str): Path of the video file.

        Returns:
            bool: True if the file was opened, False otherwise. On failure
            the error is printed and the reader is left closed.
        """
        # Avoid leaking a previously opened container when open() is reused.
        self.close()
        try:
            self.container = av.open(video_path)
            self.container.seek(0)

            # Split the container's streams by type.
            streams = self.container.streams
            self.subtitle_streams = [s for s in streams if s.type == 'subtitle']
            self.video_streams = [s for s in streams if s.type == 'video']

            self.is_open = True
            return True
        except Exception as e:
            print(f"打开视频文件失败: {e}")
            # Release a container that was opened before the failure occurred.
            self.close()
            return False

    @classmethod
    def _parse_subtitle(cls, dialogue_text: str) -> Optional[Dict[str, Any]]:
        """Parse one subtitle text into a metadata dict, or None if it does not match."""
        match = cls._METADATA_PATTERN.match(dialogue_text)
        if match is None:
            return None

        (frame_cnt, timestamp, focal_len, dzoom_ratio, lat, lon,
         rel_alt, abs_alt, yaw, pitch, roll) = match.groups()
        return {
            'frame_cnt': int(frame_cnt),
            'timestamp': timestamp,
            'focal_len': float(focal_len),
            'dzoom_ratio': float(dzoom_ratio),
            'latitude': float(lat),
            'longitude': float(lon),
            'rel_alt': float(rel_alt),
            'abs_alt': float(abs_alt),
            'gb_yaw': float(yaw),
            'gb_pitch': float(pitch),
            'gb_roll': float(roll),
            'raw_text': dialogue_text,
        }

    def read(self) -> Tuple[Optional[np.ndarray], Optional[Dict[str, Any]]]:
        """
        Read the next subtitle entry that matches the metadata layout.

        Packets are demuxed in file order. Packets from non-subtitle streams
        are skipped without being decoded, so the frame slot of the returned
        tuple is currently always None.

        Returns:
            Tuple[Optional[np.ndarray], Optional[Dict[str, Any]]]:
            ``(None, subtitle_info)`` for the next matching subtitle, or
            ``(None, None)`` when the reader is not open, no more data is
            available, or an error occurred (the error is printed).
        """
        if not self.is_open or self.container is None:
            return None, None

        try:
            # Walk the packets in the order they are stored in the file.
            for packet in self.container.demux():
                if packet.stream.type != 'subtitle':
                    continue

                for frame in packet.decode():
                    dialogue_text = frame.dialogue.decode('utf-8', errors='ignore')
                    subtitle_info = self._parse_subtitle(dialogue_text)
                    if subtitle_info is not None:
                        # Always return a 2-tuple so that callers can unpack
                        # ``frame, subtitle = reader.read()``.
                        return None, subtitle_info
        except Exception as e:
            print(f"读取数据失败: {e}")
            return None, None

        return None, None

    def close(self):
        """Close the video file and reset the reader state."""
        if self.container is not None:
            self.container.close()
            self.container = None
        self.is_open = False
        self.video_streams = []
        self.subtitle_streams = []

    def __enter__(self):
        """Support use as a context manager."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the reader when leaving the ``with`` block."""
        self.close()

    def get_stream_info(self) -> Dict[str, Any]:
        """
        Describe the streams of the opened file.

        Returns:
            Dict[str, Any]: A dict with the keys ``'video_streams'`` and
            ``'subtitle_streams'``, each holding a list with one dict per
            stream. An empty dict is returned when no file is open.
        """
        if not self.is_open:
            return {}

        info = {
            'video_streams': [],
            'subtitle_streams': []
        }

        for stream in self.video_streams:
            info['video_streams'].append({
                'index': stream.index,
                'width': stream.width,
                'height': stream.height,
                'fps': float(stream.average_rate) if stream.average_rate else None,
                # The stream duration is scaled by the stream's time base.
                'duration': float(stream.duration * stream.time_base) if stream.duration else None
            })

        for stream in self.subtitle_streams:
            info['subtitle_streams'].append({
                'index': stream.index,
                'language': getattr(stream, 'language', 'unknown')
            })

        return info


# Usage example
if __name__ == "__main__":
    # Create the reader instance.
    reader = H30T_Reader()

    # Open the video file.
    if reader.open("DJI_20250418150210_0006_S.MP4"):
        print("视频文件打开成功")

        frame_count = 0
        subtitle_count = 0

        try:
            # Show the stream information.
            stream_info = reader.get_stream_info()
            print("流信息:", stream_info)

            # Read until the reader reports that no data is left.
            while True:
                frame, subtitle = reader.read()

                if frame is None and subtitle is None:
                    break

                if frame is not None:
                    frame_count += 1
                    print(f"读取视频帧 {frame_count} - 尺寸: {frame.shape}")

                    # Display the video frame (optional); 'q' stops the loop.
                    cv2.imshow('Video Frame', frame)
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        break

                if subtitle is not None:
                    subtitle_count += 1
                    print(f"读取字幕 {subtitle_count}: {subtitle}")
        finally:
            # Release the file and any windows even if the loop fails.
            reader.close()
            cv2.destroyAllWindows()

        print(f"总共读取了 {frame_count} 帧视频和 {subtitle_count} 条字幕")