def create_stream()

in tensorwatch/watcher_base.py [0:0]


    def create_stream(self, name:str=None, devices:Sequence[str]=None, event_name:str='',
        expr=None, throttle:float=None, vis_args:VisArgs=None)->Stream:

        r"""Create stream with or without expression and attach to devices where 
        it will be written to.
        """
        stream_index = self._stream_count
        stream_name = name or 'Watcher{}-Stream{}'.format(self.index, stream_index)
        self._stream_count += 1

        # we allow few shortcuts, so modify expression if needed
        expr = expr
        if expr=='' or expr=='x':
            expr = 'map(lambda x:x, l)'
        elif expr and expr.strip().startswith('lambda '):
            expr = 'map({}, l)'.format(expr)
        # else no rewrites

        # if no expression specified then we don't create evaler
        evaler = Evaler(expr) if expr is not None else None

        # get stream infos for this event
        stream_infos = self._stream_infos.get(event_name, None)
        # if first for this event, create dictionary
        if stream_infos is None:
            stream_infos = self._stream_infos[event_name] = {}

        stream_info = stream_infos.get(stream_name, None)
        if not stream_info:
            utils.debug_log("Creating stream", stream_name)
            stream = Stream(stream_name=stream_name)
            devices = self.devices_or_default(devices)
            if devices is not None:
                # attached devices are opened in write-only mode
                device_streams = self._stream_factory.get_streams(stream_types=devices, 
                                                           for_write=True)
                for device_stream in device_streams:
                    device_stream.subscribe(stream)
            stream_req = StreamCreateRequest(stream_name=stream_name, devices=devices, event_name=event_name,
                     expr=expr, throttle=throttle, vis_args=vis_args)
            stream_info = stream_infos[stream_name] = WatcherBase.StreamInfo(
                stream_req, evaler, stream, stream_index)
        else:
            # TODO: throw error?
            utils.debug_log("Stream already exist, not creating again", stream_name)

        return stream_info.stream