You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

194 lines
6.5 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import av
import cv2
import re
from typing import Optional, Tuple, Dict, Any
import numpy as np
# Read H30T video files and the telemetry carried in their subtitle stream
class H30T_Reader:
    """
    Video/subtitle reader.

    Opens a media container with PyAV, records its video and subtitle
    streams, and parses telemetry subtitles into dictionaries.

    The reader can be used as a context manager; leaving the ``with`` block
    closes the container.
    """

    # Layout of one telemetry subtitle. Altitudes accept a leading '-' like the
    # other signed fields, so records with a negative altitude are not dropped.
    _METADATA_PATTERN = re.compile(
        r'FrameCnt: (\d+) ([\d-]+ [\d:.]+)\n'
        r'\[focal_len: ([\d.]+)\] \[dzoom_ratio: ([\d.]+)\], '
        r'\[latitude: ([\d.-]+)\] \[longitude: ([\d.-]+)\] '
        r'\[rel_alt: ([\d.-]+) abs_alt: ([\d.-]+)\] '
        r'\[gb_yaw: ([\d.-]+) gb_pitch: ([\d.-]+) gb_roll: ([\d.-]+)\]'
    )

    # Output key and converter for each regex group, in group order.
    _METADATA_FIELDS = (
        ('frame_cnt', int),
        ('timestamp', str),
        ('focal_len', float),
        ('dzoom_ratio', float),
        ('latitude', float),
        ('longitude', float),
        ('rel_alt', float),
        ('abs_alt', float),
        ('gb_yaw', float),
        ('gb_pitch', float),
        ('gb_roll', float),
    )

    def __init__(self) -> None:
        self.container = None
        self.video_streams = []
        self.subtitle_streams = []
        self.is_open = False

    def open(self, video_path: str) -> bool:
        """
        Open a video file.

        Any container left over from an earlier ``open()`` is closed first.

        Args:
            video_path (str): Path of the video file.

        Returns:
            bool: True when the file was opened, False otherwise (the error
            is printed and the reader is left closed).
        """
        # Do not leak a previously opened container.
        self.close()
        try:
            self.container = av.open(video_path)
            self.container.seek(0)
            # Split the container's streams by type.
            streams = self.container.streams
            self.subtitle_streams = [s for s in streams if s.type == 'subtitle']
            self.video_streams = [s for s in streams if s.type == 'video']
        except Exception as e:
            print(f"打开视频文件失败: {e}")
            # Release a container that opened but failed during setup.
            self.close()
            return False
        self.is_open = True
        return True

    @classmethod
    def _parse_subtitle(cls, dialogue_text: str) -> Optional[Dict[str, Any]]:
        """Parse one subtitle text into a telemetry dict, or None if it does not fit."""
        match = cls._METADATA_PATTERN.match(dialogue_text)
        if match is None:
            return None
        try:
            subtitle_info = {
                key: convert(value)
                for (key, convert), value in zip(cls._METADATA_FIELDS, match.groups())
            }
        except ValueError:
            # The character classes admit strings such as "1.2.3"; treat such
            # a record as unparseable instead of aborting the whole read.
            return None
        subtitle_info['raw_text'] = dialogue_text
        return subtitle_info

    def read(self) -> Tuple[Optional[np.ndarray], Optional[Dict[str, Any]]]:
        """
        Read the next telemetry record.

        Packets are demuxed in container order. Only subtitle packets are
        decoded; video packets are skipped, so the first element of the
        returned tuple is always None in this implementation. Each decoded
        subtitle is expected to expose its text as bytes via ``.dialogue``.

        Returns:
            Tuple[Optional[np.ndarray], Optional[Dict[str, Any]]]:
                (video frame array, subtitle info dict).
                (None, None) when the reader is not open, no further
                parseable subtitle exists, or reading failed (the error is
                printed).
        """
        if not self.is_open or self.container is None:
            return None, None
        try:
            for packet in self.container.demux():
                if packet.stream.type != 'subtitle':
                    continue
                for frame in packet.decode():
                    dialogue_text = frame.dialogue.decode('utf-8', errors='ignore')
                    subtitle_info = self._parse_subtitle(dialogue_text)
                    if subtitle_info is not None:
                        # Always return a 2-tuple so callers can unpack
                        # (frame, subtitle) as documented.
                        return None, subtitle_info
        except Exception as e:
            print(f"读取数据失败: {e}")
            return None, None
        return None, None

    def close(self) -> None:
        """
        Close the video file and reset the reader state.

        Safe to call when nothing is open.
        """
        if self.container is not None:
            self.container.close()
            self.container = None
        self.is_open = False
        self.video_streams = []
        self.subtitle_streams = []

    def __enter__(self) -> "H30T_Reader":
        """Context-manager entry; returns the reader itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close the reader when the ``with`` block exits."""
        self.close()

    def get_stream_info(self) -> Dict[str, Any]:
        """
        Describe the streams of the opened file.

        Returns:
            Dict[str, Any]: ``{'video_streams': [...], 'subtitle_streams': [...]}``
            with one dict per stream, or an empty dict when nothing is open.
            ``fps`` and ``duration`` are None when the stream does not
            report them.
        """
        if not self.is_open:
            return {}
        video_info = [
            {
                'index': stream.index,
                'width': stream.width,
                'height': stream.height,
                'fps': float(stream.average_rate) if stream.average_rate else None,
                # duration is multiplied by time_base before conversion to float
                'duration': float(stream.duration * stream.time_base) if stream.duration else None,
            }
            for stream in self.video_streams
        ]
        subtitle_info = [
            {
                'index': stream.index,
                'language': getattr(stream, 'language', 'unknown'),
            }
            for stream in self.subtitle_streams
        ]
        return {
            'video_streams': video_info,
            'subtitle_streams': subtitle_info,
        }
# Usage example
if __name__ == "__main__":
    # Create the reader and open the sample recording.
    reader = H30T_Reader()
    if reader.open("DJI_20250418150210_0006_S.MP4"):
        print("视频文件打开成功")
        frame_count = 0
        subtitle_count = 0
        try:
            # Report the streams found in the container.
            stream_info = reader.get_stream_info()
            print("流信息:", stream_info)
            # Pull records until the reader signals exhaustion with (None, None).
            while True:
                frame, subtitle = reader.read()
                if frame is None and subtitle is None:
                    break
                if frame is not None:
                    frame_count += 1
                    print(f"读取视频帧 {frame_count} - 尺寸: {frame.shape}")
                    # Show the frame (optional); pressing 'q' stops the loop.
                    cv2.imshow('Video Frame', frame)
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        break
                if subtitle is not None:
                    subtitle_count += 1
                    print(f"读取字幕 {subtitle_count}: {subtitle}")
        finally:
            # Release the container and any OpenCV windows even when the
            # loop above raises.
            reader.close()
            cv2.destroyAllWindows()
        print(f"总共读取了 {frame_count} 帧视频和 {subtitle_count} 条字幕")