Source code for senseye_cameras.reader

import logging
from . loop_thread import LoopThread

from . input.input_factory import create_input

# Module-level logger, named after this module via __name__.
log = logging.getLogger(__name__)


class Reader(LoopThread):
    '''
    Loop thread that pulls data from an input and forwards it.

    Each loop iteration reads one (data, timestamp) pair from the input
    created by ``create_input``. Non-None data is handed to the optional
    ``on_read`` callback and, when ``writing`` is set, placed on ``q``.

    Args:
        q: Object exposing ``put_nowait``; receives data while ``writing``
            is truthy.
        on_read: Optional callable invoked as
            ``on_read(data=..., timestamp=...)`` for every non-None read.
        type: Forwarded to ``create_input`` as ``type``.
        config: Forwarded to ``create_input`` as ``config``; a falsy value
            is replaced by an empty dict.
        id: Forwarded to ``create_input`` as ``id``.
        frequency: Passed to ``LoopThread.__init__``. When None, the
            input's ``config['fps']`` is used, falling back to 100.
        reading: When falsy, ``loop`` does nothing.
        writing: When truthy, data that was read is put on ``q``.
    '''

    def __init__(self, q, on_read=None, type='usb', config=None, id=0, frequency=None, reading=False, writing=False):
        if not config:
            config = {}

        # The input is created and opened right away, before the base
        # class is initialised.
        self.input = create_input(type=type, config=config, id=id)
        self.input.open()
        input_name = str(self.input)
        log.info(f'Started {input_name} (config: {self.input.config})')

        self.q = q
        self.on_read = on_read
        self.type = type
        self.reading = reading
        self.writing = writing

        # An explicit frequency wins; otherwise defer to the input's own
        # configured fps, and to 100 when that is absent too.
        rate = frequency
        if rate is None:
            rate = self.input.config.get('fps', 100)
        self.frequency = rate

        LoopThread.__init__(self, frequency=self.frequency)

    def loop(self):
        '''Read once from the input and dispatch the result.'''
        # NOTE(review): nesting was reconstructed from a flattened source
        # line; the queue write is assumed to sit under the
        # "data is not None" check - confirm against the original file.
        if not self.reading:
            return

        data, timestamp = self.input.read()
        if data is None:
            return

        callback = self.on_read
        if callback is not None:
            callback(data=data, timestamp=timestamp)

        if self.writing:
            self.q.put_nowait(data)

    def on_stop(self):
        '''Clear the reading flag, close the input and log the stop.'''
        self.reading = False
        self.input.close()
        input_name = str(self.input)
        log.info(f'Stopped {input_name}')