deepracer_offroad_ws/deepracer_offroad_navigation_pkg/deepracer_offroad_navigation_pkg/utils.py [20:85]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
class DoubleBuffer():
    """Thread-safe single-slot buffer that hands the most recently stored object to readers.

    Every access to the buffers is made while holding ``self.cv`` (a
    ``threading.Condition``); ``put`` uses the same condition to wake readers
    blocked in ``get``. ``None`` is used as the "empty" marker, so storing ``None``
    is indistinguishable from having no data.
    """

    class Empty(Exception):
        """Raised by ``get`` when no data is available and it will not (or no longer) wait."""

    def __init__(self, clear_data_on_get=True):
        """Create a DoubleBuffer object.

        Args:
            clear_data_on_get (bool, optional): Flag to clear the data from the buffer after
                                                it is read. Defaults to True.
        """
        self.read_buffer = None
        self.write_buffer = None
        self.clear_data_on_get = clear_data_on_get
        self.cv = threading.Condition()

    def clear(self):
        """Helper method to clear the buffer.
        """
        with self.cv:
            self.read_buffer = None
            self.write_buffer = None

    def put(self, data):
        """Helper method to safely store the data in the buffer.

        Args:
            data (Any): The object that is to be stored.
        """
        with self.cv:
            self.write_buffer = data
            # Swap so the new data becomes readable; write_buffer keeps the previous
            # read_buffer value.
            self.write_buffer, self.read_buffer = self.read_buffer, self.write_buffer
            # Wake every waiter: when clear_data_on_get is False all of them can return
            # this value; otherwise the ones that lose the race re-check and wait again.
            self.cv.notify_all()

    def get(self, block=True, timeout=None):
        """Helper method to safely read the data from the buffer.

        Args:
            block (bool, optional): Flag set to wait for the new data if read_buffer is empty.
                                    Defaults to True.
            timeout (float, optional): Maximum number of seconds to wait when block is True.
                                       None (the default) waits indefinitely. Ignored when
                                       block is False.

        Raises:
            DoubleBuffer.Empty: If the read buffer is empty and either block is False or
                                the timeout expired before new data was stored.

        Returns:
            Any: Data stored in the buffer.
        """
        with self.cv:
            if self.read_buffer is None:
                if not block:
                    raise DoubleBuffer.Empty
                # wait_for re-checks the predicate after every wake-up and returns False
                # only when the timeout elapsed with the buffer still empty.
                if not self.cv.wait_for(lambda: self.read_buffer is not None, timeout):
                    raise DoubleBuffer.Empty
            data = self.read_buffer
            if self.clear_data_on_get:
                self.read_buffer = None
            return data

    def get_nowait(self):
        """Wrapper method to return the data stored in buffer without waiting for new data.

        Raises:
            DoubleBuffer.Empty: If the read buffer is empty.

        Returns:
            Any: Data stored in the buffer.
        """
        return self.get(block=False)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



deepracer_offroad_ws/qr_detection_pkg/qr_detection_pkg/utils.py [20:85]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
class DoubleBuffer():
    """Thread-safe single-slot buffer that hands the most recently stored object to readers.

    Every access to the buffers is made while holding ``self.cv`` (a
    ``threading.Condition``); ``put`` uses the same condition to wake readers
    blocked in ``get``. ``None`` is used as the "empty" marker, so storing ``None``
    is indistinguishable from having no data.
    """

    class Empty(Exception):
        """Raised by ``get`` when no data is available and it will not (or no longer) wait."""

    def __init__(self, clear_data_on_get=True):
        """Create a DoubleBuffer object.

        Args:
            clear_data_on_get (bool, optional): Flag to clear the data from the buffer after
                                                it is read. Defaults to True.
        """
        self.read_buffer = None
        self.write_buffer = None
        self.clear_data_on_get = clear_data_on_get
        self.cv = threading.Condition()

    def clear(self):
        """Helper method to clear the buffer.
        """
        with self.cv:
            self.read_buffer = None
            self.write_buffer = None

    def put(self, data):
        """Helper method to safely store the data in the buffer.

        Args:
            data (Any): The object that is to be stored.
        """
        with self.cv:
            self.write_buffer = data
            # Swap so the new data becomes readable; write_buffer keeps the previous
            # read_buffer value.
            self.write_buffer, self.read_buffer = self.read_buffer, self.write_buffer
            # Wake every waiter: when clear_data_on_get is False all of them can return
            # this value; otherwise the ones that lose the race re-check and wait again.
            self.cv.notify_all()

    def get(self, block=True, timeout=None):
        """Helper method to safely read the data from the buffer.

        Args:
            block (bool, optional): Flag set to wait for the new data if read_buffer is empty.
                                    Defaults to True.
            timeout (float, optional): Maximum number of seconds to wait when block is True.
                                       None (the default) waits indefinitely. Ignored when
                                       block is False.

        Raises:
            DoubleBuffer.Empty: If the read buffer is empty and either block is False or
                                the timeout expired before new data was stored.

        Returns:
            Any: Data stored in the buffer.
        """
        with self.cv:
            if self.read_buffer is None:
                if not block:
                    raise DoubleBuffer.Empty
                # wait_for re-checks the predicate after every wake-up and returns False
                # only when the timeout elapsed with the buffer still empty.
                if not self.cv.wait_for(lambda: self.read_buffer is not None, timeout):
                    raise DoubleBuffer.Empty
            data = self.read_buffer
            if self.clear_data_on_get:
                self.read_buffer = None
            return data

    def get_nowait(self):
        """Wrapper method to return the data stored in buffer without waiting for new data.

        Raises:
            DoubleBuffer.Empty: If the read buffer is empty.

        Returns:
            Any: Data stored in the buffer.
        """
        return self.get(block=False)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



