Source code for gbvision.utils.net.stream_broadcaster

import abc
import pickle
from typing import Optional

import cv2
import time

from gbvision.utils.releasable import Releasable
from gbvision.constants.math import EPSILON
from gbvision.constants.types import Frame, Coordinates


[docs]class StreamBroadcaster(Releasable, abc.ABC): """ An abstract broadcaster that sends stream to a stream receiver :param shape: Optional, the shape (x, y) of the sent frame, when set to something other then (0, 0) it overrides the fx and fy parameters, when set to (0, 0) it is not used :param fx: Ratio between width of the given frame to the width of the frame sent, default is 1 (same width) :param fy: ratio between height of the given frame to the height of the frame sent, default is 1 (same height) :param use_grayscale: boolean indicating if the frame should be converted to grayscale when sent, default is False :param max_fps: integer representing the maximum fps (frames per second) of the stream, when set to None there is no fps limitation, default is None :param max_bitrate: Integer that determines the max bitrate of video stream. The bitrate is messured with Kbps and default is None. """ def __init__(self, shape: Optional[Coordinates] = None, fx: Optional[float] = None, fy: Optional[float] = None, use_grayscale: bool = False, max_fps: int = None, im_encode='.jpg', max_bitrate: int = None): self.shape = shape self.fx = fx self.fy = fy self.use_grayscale = use_grayscale self.max_fps = max_fps self.prev_time = 0.0 self.im_encode = im_encode self.max_bitrate = max_bitrate
[docs] def send_frame(self, frame: Frame) -> None: """ safely sends the frame, doing all the pre-processing required :param frame: :return: """ if frame is not None: frame = self._prep_frame(frame) frame = cv2.imencode(self.im_encode, frame)[1] data = self._to_bytes(frame) if self._can_send_bytes(data): self._send_bytes(data) self._update_time()
@abc.abstractmethod def _send_bytes(self, data: bytes) -> None: """ Unsafely sends the given binary formatted frame to the stream receiver should not be used by the programmer, only by the API :param data: The frame to send in a binary format (like pickle) """ def _prep_frame(self, frame: Frame) -> Frame: """ Prepares an image to be sent Resize and convert the colors of the image by the parameters of the stream broadcaster :param frame: The frame to prepare :return: The frame after preparation """ if self.shape is not None: frame = cv2.resize(frame, self.shape) if self.fx is not None or self.fy is not None: frame = cv2.resize(frame, (0, 0), fx=self.fx or 1.0, fy=self.fy or 1.0) if self.use_grayscale and len(frame.shape) > 2: frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) return frame @staticmethod def _to_bytes(frame: object) -> bytes: """ Converts the given python object to a binary representation :param frame: The python object to parse :return: A binary representation of the python object """ return pickle.dumps(frame) def _legal_time(self) -> bool: """ Checks if at the fps will not pass the max fps limit if an image will be sent at the current moment :return: True if the image can be sent, False otherwise """ return self.max_fps is None or (time.time() - self.prev_time) * self.max_fps >= 1 def _update_time(self) -> None: """ Updates the previous time a frame was sent, used at the end of send_frame """ self.prev_time = time.time() def _legal_bitrate(self, data: bytes) -> None: """ Checks if sending the data will be a violation of the bitrate limit :param data: The data to send :return: True if there's no bitrate limit or frame bitrate is below max bitrate, False otherwise """ return self.max_bitrate is None or len(data) / ( (time.time() - self.prev_time + EPSILON) * 1000) <= self.max_bitrate def _can_send_bytes(self, data: bytes) -> bool: """ Checks if the data can be sent under the provided limitations (max FPS, max KBPS and etc) :param data: The data to send :return: True if sending the data will not violate the limitations, False otherwise """ if not self._legal_time(): return False if not self._legal_bitrate(data): return False return True