in tensorwatch/watcher_base.py [0:0]
def create_stream(self, name:str=None, devices:Sequence[str]=None, event_name:str='',
expr=None, throttle:float=None, vis_args:VisArgs=None)->Stream:
r"""Create stream with or without expression and attach to devices where
it will be written to.
"""
stream_index = self._stream_count
stream_name = name or 'Watcher{}-Stream{}'.format(self.index, stream_index)
self._stream_count += 1
# we allow few shortcuts, so modify expression if needed
expr = expr
if expr=='' or expr=='x':
expr = 'map(lambda x:x, l)'
elif expr and expr.strip().startswith('lambda '):
expr = 'map({}, l)'.format(expr)
# else no rewrites
# if no expression specified then we don't create evaler
evaler = Evaler(expr) if expr is not None else None
# get stream infos for this event
stream_infos = self._stream_infos.get(event_name, None)
# if first for this event, create dictionary
if stream_infos is None:
stream_infos = self._stream_infos[event_name] = {}
stream_info = stream_infos.get(stream_name, None)
if not stream_info:
utils.debug_log("Creating stream", stream_name)
stream = Stream(stream_name=stream_name)
devices = self.devices_or_default(devices)
if devices is not None:
# attached devices are opened in write-only mode
device_streams = self._stream_factory.get_streams(stream_types=devices,
for_write=True)
for device_stream in device_streams:
device_stream.subscribe(stream)
stream_req = StreamCreateRequest(stream_name=stream_name, devices=devices, event_name=event_name,
expr=expr, throttle=throttle, vis_args=vis_args)
stream_info = stream_infos[stream_name] = WatcherBase.StreamInfo(
stream_req, evaler, stream, stream_index)
else:
# TODO: throw error?
utils.debug_log("Stream already exist, not creating again", stream_name)
return stream_info.stream