import sys
import time
import ffmpeg
import logging
import numpy as np
from . input import Input
log = logging.getLogger(__name__)
[docs]class CameraFfmpeg(Input):
'''
Opens a camera using ffmpeg.
Args:
id (int/str): id of the ffmpeg camera
config (dict): Configuration dictionary. Accepted keywords:
fps (int): framerate of the camera
res (tuple): resolution in the format (width, height, channels)
block_size (int): how many bytes to read from the camera if the camera outputs a stream of bytes.
camera_pixel_format (str): pixel format of the camera (eg: bgr24, uyvy422)
format (str): desired output pixel format of the camera (eg: rawvideo, h264)
'''
def __init__(self, id=0, config=None):
config = config or {}
defaults = {
'fps': 30,
'res': (1280, 720, 3),
'block_size': 16384,
'camera_pixel_format': 'uyvy422',
'format': 'rawvideo',
}
Input.__init__(self, id=id, config=config, defaults=defaults)
self.process = None
[docs] def open(self):
'''
Opens the ffmpeg subprocess and logs.
'''
self.process = (
ffmpeg
.input(
f'{self.id}',
format=self.get_format(),
pix_fmt=self.config.get('camera_pixel_format'),
framerate=self.config.get('fps'),
s=f'{self.config.get("res")[0]}x{self.config.get("res")[1]}',
)
.output('pipe:', format=self.config.get('format'))
# hide logging
.global_args('-loglevel', 'error', '-hide_banner')
# disable audio
.global_args('-an')
.run_async(pipe_stdout=True)
)
# give the process time to start up
time.sleep(0.2)
return_code = self.process.poll()
if return_code is not None:
raise Exception(f'Failed to open ffmpeg camera {self.id}. Ffmpeg process exited with return code: {return_code} ')
self.input = self.process.stdout
[docs] def read(self):
'''
Reads in raw frames.
'''
frame = None
timestamp = None
try:
if self.config.get('format') == 'rawvideo':
# convert rawvideo frames into a numpy array
frame_size = np.prod(np.array(self.config.get('res')))
frame_bytes = self.input.read(frame_size)
timestamp = time.time()
frame_str = np.frombuffer(frame_bytes, dtype='uint8')
# add shape metadata to the frame
# numpy expects (width, height, channels)
numpy_res = self.config.get('res')[1::-1] + self.config.get('res')[2:]
frame = frame_str.reshape(numpy_res)
else:
# directly read in bytes otherwise
frame = self.input.read(self.config.get('block_size'))
except Exception as e:
log.error(f"{str(self)} read error: {e}")
return frame, timestamp
[docs] def close(self):
if self.process:
self.process.kill()
self.process = None
self.input = None