import logging
from . loop_thread import LoopThread
from . output.output_factory import create_output
log = logging.getLogger(__name__)
class Writer(LoopThread):
    """Writes data from a queue into an output file.

    Each loop iteration takes at most one item from the queue and hands it to
    the output object built by ``create_output``. When the thread stops, the
    items still queued are flushed to the output before it is closed.

    Args:
        q: Source queue. Must provide ``get_nowait()`` (``None`` results are
            skipped) and ``remove_existing()`` (an iterable of pending items).
        on_write: Optional callback invoked as ``on_write(data=item)`` after
            each item has been written.
        type: Output type forwarded to ``create_output``. The name shadows the
            builtin but is kept because callers may pass it by keyword.
        config: Output configuration forwarded to ``create_output``; its
            ``'fps'`` entry is the fallback for ``frequency``. Defaults to
            an empty dict.
        path: Output path forwarded to ``create_output``.
        frequency: Value passed to ``LoopThread.__init__``. When ``None``,
            ``config['fps']`` is used, or 100 if that key is absent.
        writing: Whether ``loop`` consumes the queue. While falsy, ``loop``
            does nothing.
        input_config: Input configuration forwarded to ``create_output``.
            Defaults to an empty dict.
    """

    def __init__(self, q, on_write=None, type='ffmpeg', config=None, path=0,
                 frequency=None, writing=False, input_config=None):
        # The output is created lazily by initialize_writer().
        self.output = None
        self.q = q
        self.on_write = on_write
        self.writing = writing
        self.type = type
        self.config = config or {}
        self.path = path
        self.input_config = input_config or {}
        # Defined here so the counter exists before initialize_writer() runs.
        self.frames_written = 0
        if frequency is None:
            frequency = self.config.get('fps', 100)
        self.frequency = frequency
        super().__init__(frequency=self.frequency)

    def initialize_writer(self):
        """Create the output object and reset the written-item counter."""
        self.output = create_output(
            type=self.type,
            config=self.config,
            path=self.path,
            input_config=self.input_config,
        )
        log.info(f'Started {self.output!s} (config: {self.output.config})')
        self.frames_written = 0

    def loop(self):
        """Write at most one queued item to the output.

        Does nothing while ``writing`` is falsy or before an output has been
        created, so no item is taken from the queue unless it can be written.
        """
        if not self.writing or self.output is None:
            return
        data = self.q.get_nowait()
        if data is not None:
            self._write_item(data)

    def set_path(self, path=None):
        """Forward a new path to the current output.

        Requires that ``initialize_writer`` has already created the output.
        """
        self.output.set_path(path)
        log.info(f'{self!s} path set to {path}')

    def on_stop(self):
        """Flush the remaining queue into the output, then close it.

        Does nothing when there is no (truthy) output. Otherwise writing is
        disabled, pending items are flushed, and the output is closed and
        dropped. The output is closed even if flushing raises.
        """
        if not self.output:
            return
        output = self.output
        self.writing = False
        try:
            self.flush()
        finally:
            # Release the output even when a write or callback failed.
            output.close()
            self.output = None
        unit = 'blocks' if self.type == 'audio_file' else 'frames'
        log.info(f'Stopped {output!s} ({unit}: {self.frames_written})')
        self.frames_written = 0

    def flush(self):
        """Write every item currently pending in the queue to the output.

        Without an output the queue is left untouched. ``None`` items are
        skipped, matching ``loop``.
        """
        if self.output is None:
            return
        for data in self.q.remove_existing():
            if data is not None:
                self._write_item(data)

    def _write_item(self, data):
        """Write one item, count it, and notify ``on_write`` if set."""
        self.output.write(data)
        self.frames_written += 1
        if self.on_write is not None:
            self.on_write(data=data)