in wstl1/tools/notebook/extensions/wstl/magics/wstl.py [0:0]
def _get_incremental_transform(stub, shell, session_id, wstl_args, cell):
"""Invokes, throughs a gRPC request, an incremental Whistle transform.
Args:
stub: gRPC client stub library.
shell: an instance of the iPython shell that invoked the magic command.
session_id: the incremental transformation session id.
wstl_args: the arguments to the wstl magic command.
cell: the contents of the cell, containing whistle.
Returns:
TransformResponse or a grpc.RpcError
"""
req = wstlservice_pb2.IncrementalTransformRequest()
req.session_id = session_id
req.wstl = cell
if wstl_args.library_config:
library_configs = _location.parse_location(
shell,
wstl_args.library_config,
file_ext=_constants.WSTL_FILE_EXT,
load_contents=False)
if library_configs:
req.library_config.extend(library_configs)
if wstl_args.code_config:
code_configs = _location.parse_location(
shell,
wstl_args.code_config,
file_ext=_constants.JSON_FILE_EXT,
load_contents=False)
if code_configs:
req.code_config.extend(code_configs)
if wstl_args.unit_config:
unit_config = _location.parse_location(
shell,
wstl_args.unit_config,
file_ext=_constants.TEXTPROTO_FILE_EXT,
load_contents=False)
if unit_config:
req.unit_config = unit_config[0]
if wstl_args.input:
inputs = _location.parse_location(
shell,
wstl_args.input,
file_ext=_constants.JSON_FILE_EXT,
load_contents=True)
if inputs:
req.input.extend(inputs)
else:
return None, "no inputs matching arguement {}".format(wstl_args.input)
try:
resp = stub.GetIncrementalTransform(req, timeout=_GRPC_TIMEOUT)
except grpc.RpcError as rpc_error:
return None, rpc_error
else:
return resp, None