import time
import atexit
import logging
from . safe_queue import SafeQueue
from . reader import Reader
from . writer import Writer
log = logging.getLogger(__name__)
[docs]class Stream:
'''
Links an Input with an Output.
Args:
input_type/input_config/id: see create_input.
output_type/output_config/path: see create_output
reading/writing (bool): whether to read/write on start.
on_read (func): called on frame read. Function should take fn(data=None, timestamp=None) as args.
on_read/on_write (func): called on frame write. Function should take fn(data=None)
'''
def __init__(
self,
input_type='usb', input_config=None, id=0,
output_type='ffmpeg', output_config=None, path='.',
input_frequency=None, output_frequency=None,
reading=False, writing=False,
on_read=None, on_write=None,
queue_size=700
):
input_config = input_config or {}
output_config = output_config or {}
self.q = SafeQueue(queue_size)
self.input_type = input_type
self.input_config = input_config
self.id = id
self.output_type = output_type
self.output_config = output_config
self.path = path
self.input_frequency = input_frequency
self.output_frequency = output_frequency
self.on_read = on_read
self.on_write = on_write
self.writer = self.reader = None
atexit.register(self.stop)
self.reader = Reader(
self.q, on_read=self.on_read, type=self.input_type, config=self.input_config, frequency=self.input_frequency,
id=self.id,
)
self.reader.start()
self.writer = Writer(
self.q, on_write=self.on_write, type=self.output_type, config=self.output_config, frequency=self.reader.frequency,
input_config=self.reader.input.config, path=self.path,
)
self.writer.start()
if reading:
self.start_reading()
if writing:
self.start_writing()
[docs] def set_path(self, path=None):
'''Sets the writers' path.'''
self.path = path
if self.writer:
self.writer.set_path(path)
####################
# READER FUNCTIONS
####################
[docs] def start_reading(self):
'''Starts reading in frames.'''
self.reader.reading = True
log.info(f'{str(self)} reading started - {time.time()}')
[docs] def stop_reading(self):
'''Stops reading in frames.'''
if self.reader.reading:
self.reader.reading = False
log.info(f'{str(self)} reading stopped - {time.time()}')
####################
# WRITER FUNCTIONS
####################
[docs] def start_writing(self):
'''Starts writing frames.'''
self.writer.initialize_writer()
self.reader.writing = True
self.writer.writing = True
log.info(f'{str(self)} writing started - {time.time()}')
[docs] def stop_writing(self):
'''Stops writing frames.'''
if self.writer.writing:
self.reader.writing = False
self.writer.on_stop()
log.info(f'{str(self)} writing stopped - {time.time()}')
[docs] def stop(self):
self.stop_reading()
self.stop_writing()
self.reader.stop()
self.writer.stop()
def __str__(self):
return f'{self.__class__.__name__}({self.input_type}:{self.id}, {self.output_type})'