Source code for senseye_cameras.writer

import logging
from . loop_thread import LoopThread
from . output.output_factory import create_output

log = logging.getLogger(__name__)


class Writer(LoopThread):
    '''
    Writes data from a queue into an output file.

    Args:
        q: queue-like source of data. It must provide ``get_nowait()`` and
            ``remove_existing()``; a ``None`` result from ``get_nowait()``
            is treated by ``loop`` as "nothing to write".
        on_write: optional callable invoked as ``on_write(data=data)`` after
            every successful write.
        type: output type forwarded to ``create_output``.
        config: output configuration dict. Its ``'fps'`` entry (default 100)
            is used as the loop frequency when ``frequency`` is not given.
        path: output path forwarded to ``create_output``.
        frequency: loop frequency handed to ``LoopThread``.
        writing: whether ``loop`` should consume the queue.
        input_config: input configuration dict forwarded to ``create_output``.
    '''

    def __init__(self, q, on_write=None, type='ffmpeg', config=None, path=0, frequency=None, writing=False, input_config=None):
        self.output = None
        self.q = q
        self.on_write = on_write
        self.writing = writing
        self.type = type
        self.config = config or {}
        self.path = path
        self.input_config = input_config or {}

        # Defined up front so the counter exists even before
        # initialize_writer() has been called.
        self.frames_written = 0

        self.frequency = frequency
        if self.frequency is None:
            self.frequency = self.config.get('fps', 100)

        LoopThread.__init__(self, frequency=self.frequency)

    def initialize_writer(self):
        '''Create the output through ``create_output`` and reset the write counter.'''
        # NOTE(review): an output that is still open is replaced without being
        # closed here; callers are presumably expected to stop first - confirm.
        self.output = create_output(type=self.type, config=self.config, path=self.path, input_config=self.input_config)
        log.info(f'Started {str(self.output)} (config: {self.output.config})')
        self.frames_written = 0

    def _write(self, data):
        '''Write one item to the output, count it, and notify ``on_write``.'''
        self.output.write(data)
        self.frames_written += 1
        if self.on_write is not None:
            self.on_write(data=data)

    def loop(self):
        '''Write at most one queued item per iteration while writing is enabled.'''
        # Without an output, leave the queue untouched instead of popping an
        # item and then failing on the write.
        if not self.writing or self.output is None:
            return

        data = self.q.get_nowait()
        if data is not None:
            self._write(data)

    def set_path(self, path=None):
        '''
        Point the output at ``path``.

        When no output exists yet, the path is kept for the next
        ``initialize_writer`` call instead of raising ``AttributeError``.
        '''
        if self.output is None:
            self.path = path
        else:
            self.output.set_path(path)
        log.info(f'{str(self)} path set to {path}')

    def on_stop(self):
        '''Flush the queue into the output, close the output, and reset the counter.'''
        # flush out the queue and close output
        output = self.output
        if output:
            self.writing = False
            try:
                self.flush()
            finally:
                # Release the output even when draining the queue fails, so
                # the underlying resource is never left open.
                self.output = None
                output.close()

            unit = 'blocks' if self.type == 'audio_file' else 'frames'
            log.info(f'Stopped {str(output)} ({unit}: {self.frames_written})')

        self.frames_written = 0

    def flush(self):
        '''Write everything currently held by the queue to the output.'''
        # Nothing to write into: keep the queued data rather than draining
        # the queue and then failing on the first write.
        if self.output is None:
            return

        for data in self.q.remove_existing():
            self._write(data)