in utils/ioutils/av_wrappers.py [0:0]
def av_laod_audio(container, audio_fps=None, start_time=0, duration=None):
    """Decode a clip from the first audio stream of ``container``.

    The container is seeked to ``start_time``, every frame overlapping the
    window ``[start_time, start_time + duration]`` is decoded (and, when
    ``audio_fps`` is given, resampled to mono ``s16p`` at that rate), and the
    result is trimmed / zero-padded to ``int(duration * audio_fps)`` samples.

    The (misspelled) function name is kept as-is for caller compatibility.

    Args:
        container: Opened PyAV container exposing ``streams.audio``,
            ``seek`` and ``decode``.
        audio_fps: Target sample rate. ``None`` keeps the stream's own rate
            and sample format (no resampling is performed).
        start_time: Clip start, in seconds.
        duration: Clip length, in seconds. ``None`` means "up to the end of
            the stream"; larger values are clamped to the stream end.

    Returns:
        Tuple ``(data, audio_fps)``. ``data`` is a 2-D array with samples
        along axis 1 (rows are whatever ``to_ndarray()`` yields, presumably
        channels/planes). Integer samples are divided by the dtype's maximum
        value; floating-point samples are returned unscaled.

    Raises:
        ValueError: If no audio frame could be decoded for the window.
    """
    audio_stream = container.streams.audio[0]
    # NOTE(review): ``audio_stream.duration`` is assumed to be set; if a
    # container reports None here the multiplication below fails - confirm.
    stream_start = (
        audio_stream.start_time * audio_stream.time_base
        if audio_stream.start_time is not None
        else 0.
    )
    stream_end = stream_start + audio_stream.duration * audio_stream.time_base

    resample = audio_fps is not None
    if resample:
        audio_resampler = av.audio.resampler.AudioResampler(
            format="s16p", layout="mono", rate=audio_fps
        )
    else:
        audio_fps = audio_stream.rate

    # Clamp the requested window to what the stream actually contains.
    remaining = stream_end - start_time
    duration = remaining if duration is None else min(duration, remaining)
    end_time = start_time + duration

    # Fast forward (seek offsets are expressed in av.time_base units).
    container.seek(int(start_time * av.time_base))

    # Decode snippet
    chunks = []
    first_pts = None  # Presentation time of the first frame that was kept.
    for frame in container.decode(audio=0):
        frame_pts = frame.pts * frame.time_base
        frame_end_pts = frame_pts + Fraction(frame.samples, frame.rate)
        if frame_end_pts < start_time:  # Skip until start time
            continue
        if frame_pts > end_time:  # Exit if clip has been extracted
            break
        try:
            # Drop the pts before resampling, as the original code did.
            frame.pts = None
            if resample:
                resampled = audio_resampler.resample(frame)
                if resampled is None:
                    # Same outcome as before: stop decoding.
                    break
                # Depending on the PyAV version, resample() yields either a
                # single frame or a list of frames; handle both.
                if not isinstance(resampled, (list, tuple)):
                    resampled = [resampled]
                arrays = [out.to_ndarray() for out in resampled]
            else:
                arrays = [frame.to_ndarray()]
        except AttributeError:
            # Best-effort behaviour kept from the original: stop decoding
            # and work with whatever has been gathered so far.
            break
        if first_pts is None:
            first_pts = frame_pts
        chunks.extend(arrays)

    if not chunks:
        raise ValueError(
            "no audio frames decoded for start_time={} duration={}".format(
                start_time, duration
            )
        )
    data = np.concatenate(chunks, 1)

    # Trim audio
    ss = int((start_time - first_pts) * audio_fps)
    t = int(duration * audio_fps)
    if ss < 0:
        # Decoding started after the requested start: left-pad with silence.
        data = np.pad(data, ((0, 0), (-ss, 0)), 'constant', constant_values=0)
        ss = 0
    # The slice below needs ss + t columns (not just t) to return t samples.
    shortfall = ss + t - data.shape[1]
    if shortfall > 0:
        data = np.pad(data, ((0, 0), (0, shortfall)), 'constant', constant_values=0)
    data = data[:, ss: ss + t]

    # np.iinfo only accepts integer dtypes; float sample formats are left
    # untouched instead of raising.
    if np.issubdtype(data.dtype, np.integer):
        data = data / np.iinfo(data.dtype).max
    return data, audio_fps