# Source code for senseye_cameras.input.camera_ffmpeg

import sys
import time
import ffmpeg
import logging
import numpy as np

from . input import Input

# Module-level logger, named after this module.
log = logging.getLogger(__name__)


class CameraFfmpeg(Input):
    '''
    Opens a camera using ffmpeg.

    Args:
        id (int/str): id of the ffmpeg camera
        config (dict): Configuration dictionary. Accepted keywords:
            fps (int): framerate of the camera
            res (tuple): resolution in the format (width, height, channels)
            block_size (int): how many bytes to read from the camera if the
                camera outputs a stream of bytes.
            camera_pixel_format (str): pixel format of the camera
                (eg: bgr24, uyvy422)
            format (str): desired ffmpeg output format
                (eg: rawvideo, h264)
    '''

    # Upper bound, in seconds, on how long close() waits for the killed
    # ffmpeg process to exit.
    _CLOSE_TIMEOUT = 2.0

    def __init__(self, id=0, config=None):
        config = config or {}
        defaults = {
            'fps': 30,
            'res': (1280, 720, 3),
            'block_size': 16384,
            'camera_pixel_format': 'uyvy422',
            'format': 'rawvideo',
        }
        Input.__init__(self, id=id, config=config, defaults=defaults)
        # Handle of the ffmpeg subprocess; None while the camera is closed.
        self.process = None

    def get_format(self):
        '''Get os specific format.'''
        if 'linux' in sys.platform:
            return 'v4l2'
        if sys.platform == 'darwin':
            return 'avfoundation'
        return 'dshow'

    def open(self):
        '''
        Opens the ffmpeg subprocess and exposes its stdout as self.input.

        Raises:
            Exception: if the ffmpeg process has already exited shortly
                after being started. The dead process and its pipe are
                released before raising.
        '''
        res = self.config.get('res')
        self.process = (
            ffmpeg
            .input(
                f'{self.id}',
                format=self.get_format(),
                pix_fmt=self.config.get('camera_pixel_format'),
                framerate=self.config.get('fps'),
                s=f'{res[0]}x{res[1]}',
            )
            .output('pipe:', format=self.config.get('format'))
            # hide logging
            .global_args('-loglevel', 'error', '-hide_banner')
            # disable audio
            .global_args('-an')
            .run_async(pipe_stdout=True)
        )

        # give the process time to start up
        time.sleep(0.2)
        return_code = self.process.poll()
        if return_code is not None:
            # do not keep a handle (and an open pipe) to a dead process
            self.close()
            raise Exception(f'Failed to open ffmpeg camera {self.id}. Ffmpeg process exited with return code: {return_code} ')
        self.input = self.process.stdout

    def read(self):
        '''
        Reads in raw frames.

        Returns:
            (frame, timestamp) tuple.
            For the 'rawvideo' format, frame is a uint8 numpy array of shape
            (height, width, channels) and timestamp is the time.time() value
            taken right after the bytes were read.
            For any other format, frame is a chunk of up to 'block_size'
            bytes and timestamp is None.
            On failure (read error, short read, end of stream) both are None.
        '''
        frame = None
        timestamp = None
        try:
            if self.config.get('format') == 'rawvideo':
                # convert rawvideo frames into a numpy array
                res = self.config.get('res')
                # NOTE(review): the frame size is the plain product of 'res',
                # so the channels entry presumably has to match the bytes per
                # pixel ffmpeg actually emits - confirm against
                # 'camera_pixel_format'.
                frame_size = int(np.prod(res))
                frame_bytes = self.input.read(frame_size)
                read_time = time.time()

                if len(frame_bytes) != frame_size:
                    # short read: the stream ended or ffmpeg died mid-frame
                    log.error(f"{str(self)} read error: expected {frame_size} bytes, got {len(frame_bytes)}")
                else:
                    # add shape metadata to the frame
                    # numpy arrays are indexed (height, width, channels)
                    numpy_res = tuple(res[1::-1]) + tuple(res[2:])
                    frame = np.frombuffer(frame_bytes, dtype='uint8').reshape(numpy_res)
                    timestamp = read_time
            else:
                # directly read in bytes otherwise
                frame = self.input.read(self.config.get('block_size'))
        except Exception as e:
            log.error(f"{str(self)} read error: {e}")

        return frame, timestamp

    def close(self):
        '''
        Kills the ffmpeg subprocess and releases its resources.

        Safe to call when the camera is not open.
        '''
        # local import: only needed for the timeout exception type
        import subprocess

        process, self.process = self.process, None
        self.input = None
        if process is None:
            return

        process.kill()
        try:
            # reap the process so it does not linger as a zombie
            process.wait(timeout=self._CLOSE_TIMEOUT)
        except subprocess.TimeoutExpired:
            log.warning(f"{str(self)} ffmpeg process did not exit within {self._CLOSE_TIMEOUT}s of being killed")
        finally:
            # release our end of the stdout pipe
            if process.stdout is not None:
                process.stdout.close()