Source code for cvkit.video_readers.deffcode_reader

import json
import time
from datetime import timedelta
from queue import Queue, Empty
from threading import Thread

import numpy as np
from deffcode import FFdecoder

from cvkit.video_readers.video_reader_interface import BaseVideoReaderInterface


[docs]class DeffcodeVideoReader(BaseVideoReaderInterface): """ This implementation uses Deffcode as the underlying library to implement the BaseVideoReaderInterface. :param video_path: Path of the video. :type video_path: str :param fps: The FPS of the video. :type fps: float :param buffer_size: The size of the frame pre-fetch buffer. :type buffer_size: int """ FLAVOR = "deffcode"
[docs] def random_access_image(self, position): if 0 <= position < self.total_frames: ts = self.get_timestamp(position) stream = FFdecoder(self.video_path, **{'-ss': ts}).formulate() frame = next(stream.generateFrame(), None) stream.terminate() if frame is not None: return frame
def get_timestamp(self, frame_number): return str(timedelta(seconds=(frame_number / self.fps))) def __init__(self, video_path, fps, buffer_size=128): super().__init__(video_path, fps, buffer_size) self.state = 0 self.thread = None self.buffer = Queue(maxsize=buffer_size) self.stream = FFdecoder(self.video_path).formulate() self.total_frames = json.loads(self.stream.metadata)['approx_video_nframes'] self.stream.terminate() self.current_index = -1 def start(self): ts = self.get_timestamp(self.current_index + 1) self.stream = FFdecoder(self.video_path, **{'-ss': ts}).formulate() self.thread = Thread(target=self.fill_buffer) self.thread.daemon = True self.state = 1 self.thread.start() def fill_buffer(self): while True: if self.state <= 0: break if not self.buffer.full(): frame = next(self.stream.generateFrame(), None) self.buffer.put(frame) else: time.sleep(0.01) def stop(self): if self.thread: self.state = -1 self.thread.join() with self.buffer.mutex: self.buffer.queue.clear() self.stream.terminate() self.thread = None
[docs] def pause(self) -> None: self.state = 0
[docs] def release(self): self.stop()
[docs] def next_frame(self) -> np.ndarray: if self.state == -1: return None elif self.state != 1: self.start() try: self.current_frame = self.buffer.get() self.current_index += 1 return self.current_frame except Empty: self.stop() return None