Source code for senseye_cameras.safe_queue

import queue
import logging

log = logging.getLogger(__name__)

[docs]class SafeQueue(queue.Queue): """ Class that extends Python Queue to safely handle exceptions from Queue accessor methods. """ def __init__(self, maxsize=0, module='Generic'): super().__init__(maxsize=maxsize) # Information on where the queue is being used for debug information self.module = module
[docs] def put(self, item, block=True, timeout=None, force=False): """ Attempt to insert an item into the queue, handling the Full exception if it arises. Return True if item is successfully enqueued. If a force was required, return the dequeued item. """ ret = True try: if self.full() and force: # When force is enabled, do a blocking dequeue ret = self.get() super().put(item=item, block=block, timeout=timeout) except queue.Full: log.debug(f'Queue module {self.module} is full. {item} not inserted') return False return ret
[docs] def put_nowait(self, item, force=False): """ Non-blocking put with exception handling. """ return self.put(item, block=False, force=force)
[docs] def get(self, block=True, timeout=None): """ Attempt to retrieve an element from the queue, handling the Empty exception if it arises. Returns the item or None if dequeue is unsuccessful. Note: It is possible that a None object was enqueued. Checking for a None value on a get() call will be ambiguous in this case. It may mean the original item was None or that the queue is empty. """ try: return super().get(block=block, timeout=timeout) except queue.Empty: log.debug(f'Queue module {self.module} is empty. Capacity is {self.maxsize}') return None
[docs] def get_nowait(self): """ Non-blocking get with exception handling. """ return self.get(block=False)
[docs] def to_list(self): """ Converts queue to a list. """ ret_list = [] with self.mutex: ret_list = list(self.queue) return ret_list
[docs] def remove_existing(self, num_elements=None): """ Pops items from self queue and appends to a list. Mutates self queue by removing qsize elements. Note: If you force an item to be input into the queue during this operation, you may clobber data. For example: - SafeQueue size is 10 when remove_existing() is called - SafeQueue maxsize is 10 - SafeQueue.put_nowait(an_item, force=True) - The first item is forced off, and may not have been grabbed into the list. - Using a mutex here may cause a deadlock. """ outgoing = [] # Attempt to remove passed number of elements, with all elements # being the default. # # If number requested is more than number of elements, remove all # elements size = num_elements if not num_elements or size > self.qsize(): size = self.qsize() for i in range(0,size): outgoing.append(self.get_nowait()) return outgoing
def __str__(self): """ Outputs readable version of items in queue by using list representation. """ return self.to_list().__str__()