Server/python2/service.py (52 lines of code) (raw):

import io import struct class Service: WILDCARD_CHARACTERS = "*" ENABLE_PRIVILEGE = 1 ENABLE_AUDIO_PRIVILEGE = 2 ENABLE_VIDEO_PRIVILEGE = 4 ENABLE_SCREEN_PRIVILEGE = 8 def __init__(self, channel_id, user_id, privilege=None): self.channel_id = channel_id self.user_id = user_id self.privilege = privilege @classmethod def create_service_only_with_user_id(cls, user_id): return cls(cls.WILDCARD_CHARACTERS, user_id) @classmethod def create_service_only_with_channel_id(cls, channel_id): return cls(channel_id, cls.WILDCARD_CHARACTERS) def add_audio_publish_privilege(self): if self.privilege is None: self.privilege = 0 | self.ENABLE_PRIVILEGE self.privilege |= self.ENABLE_AUDIO_PRIVILEGE def add_video_publish_privilege(self): if self.privilege is None: self.privilege = 0 | self.ENABLE_PRIVILEGE self.privilege |= self.ENABLE_VIDEO_PRIVILEGE def add_screen_publish_privilege(self): if self.privilege is None: self.privilege = 0 | self.ENABLE_PRIVILEGE self.privilege |= self.ENABLE_SCREEN_PRIVILEGE def pack(self): buf = io.BytesIO() channel_id_bytes = self.channel_id.encode('utf-8') buf.write(struct.pack('>I', len(channel_id_bytes))) buf.write(channel_id_bytes) user_id_bytes = self.user_id.encode('utf-8') buf.write(struct.pack('>I', len(user_id_bytes))) buf.write(user_id_bytes) if self.privilege is not None: buf.write(struct.pack('>?I', True, self.privilege)) else: buf.write(struct.pack('>?', False)) return buf.getvalue() @staticmethod def unpack(buf): channel_id_length = struct.unpack('>I', buf.read(4))[0] channel_id = buf.read(channel_id_length).decode('utf-8') user_id_length = struct.unpack('>I', buf.read(4))[0] user_id = buf.read(user_id_length).decode('utf-8') has_privilege = struct.unpack('>?', buf.read(1))[0] privilege = struct.unpack('>I', buf.read(4))[0] if has_privilege else None return Service(channel_id, user_id, privilege)