in cupid/io/table/core.py [0:0]
def create_download_session(session, table_or_parts, split_size=None, split_count=None,
                            columns=None, with_split_meta=False):
    """
    Split tables / partitions through the Cupid task service and wrap the
    resulting splits in a download session.

    :param session: Cupid session; its ``lookup_name`` is sent with the
        split request and written to the logs.
    :param table_or_parts: a single ``Table`` / ``TablePartition`` or a
        list, tuple, set or generator of them.
    :param split_size: requested split size. It is integer-divided by
        ``1024 ** 2`` before being sent (presumably bytes -> MiB; confirm
        against the service definition). When falsy, ``1024 ** 2`` is used,
        i.e. the value sent is 1.
    :param split_count: requested number of splits. Defaults to 1 when neither
        ``split_size`` nor ``split_count`` is given, otherwise to 0 when falsy.
    :param columns: names of columns to read. When falsy, the names from the
        schema of the first table encountered are used.
    :param with_split_meta: when true, split metadata is requested and attached
        to every ``TableSplit``; a failure to fetch it is only logged.
    :return: a ``CupidTableDownloadSession`` holding the table handle and splits.
    :raises NotImplementedError: if an input is neither ``Table`` nor
        ``TablePartition``.
    :raises CupidError: if the ``SplitTables`` or ``GetSplits`` call fails.
    """
    channel = CupidTaskServiceRpcChannel(session)
    stub = task_service_pb.CupidTaskService_Stub(channel)

    # Normalize a single table / partition into a one-element list.
    if not isinstance(table_or_parts, (list, tuple, set, GeneratorType)):
        table_or_parts = [table_or_parts]

    if split_size is None and split_count is None:
        split_count = 1
    split_count = split_count or 0
    split_size = (split_size or 1024 ** 2) // 1024 ** 2

    table_pbs = []
    for t in table_or_parts:
        # NOTE(review): once `columns` has been resolved from the first input it
        # is reused for every following input, so mixed-schema inputs share the
        # first table's column list -- confirm this is intended.
        if isinstance(t, Table):
            if not columns:
                columns = t.table_schema.names
            table_kw = dict(
                projectName=t.project.name,
                tableName=t.name,
                columns=','.join(columns),
            )
        elif isinstance(t, TablePartition):
            if not columns:
                columns = t.table.table_schema.names
            table_kw = dict(
                projectName=t.table.project.name,
                tableName=t.table.name,
                columns=','.join(columns),
                # The partition spec is sent without single quotes.
                partSpec=str(t.partition_spec).replace("'", '').strip(),
            )
        else:
            raise NotImplementedError(
                'Unsupported input type for download session: %s' % type(t).__name__
            )
        table_pbs.append(task_service_pb.TableInputInfo(**table_kw))

    request = task_service_pb.SplitTablesRequest(
        lookupName=session.lookup_name,
        splitSize=split_size,
        splitCount=split_count,
        tableInputInfos=table_pbs,
        allowNoColumns=True,
        requireSplitMeta=with_split_meta,
    )
    controller = CupidRpcController()
    resp = stub.SplitTables(controller, request, None)
    if controller.Failed():
        raise CupidError(controller.ErrorText())
    logger.info(
        "[CupidTask] splitTables call, CurrentInstanceId: %s, "
        "request: %s, response: %s",
        session.lookup_name, request, resp,
    )
    handle = resp.inputTableHandle

    # The remaining calls go through the sandbox channel using the handle
    # returned by SplitTables.
    channel = SandboxRpcChannel()
    stub = subprocess_pb.CupidSubProcessService_Stub(channel)

    if not with_split_meta:
        # Endless stream of None so that zip() below is bounded by the splits.
        split_meta = itertools.repeat(None)
    else:
        req = subprocess_pb.GetSplitsMetaRequest(
            inputTableHandle=handle,
        )
        controller = CupidRpcController()
        resp = stub.GetSplitsMeta(controller, req, None)
        # Log the request that was actually sent (`req`), not the earlier
        # SplitTables request.
        logger.info(
            "[CupidTask] getSplitsMeta call, CurrentInstanceId: %s, "
            "request: %s, response: %s",
            session.lookup_name, req, resp,
        )
        if controller.Failed():
            # Best effort: fall back to splits without metadata.
            split_meta = itertools.repeat(None)
            logger.warning('Failed to get results of getSplitsMeta, '
                           'may running on an old service')
        else:
            split_meta = resp.inputSplitsMeta

    req = subprocess_pb.GetSplitsRequest(inputTableHandle=handle)
    controller = CupidRpcController()
    resp = stub.GetSplits(controller, req, None)
    if controller.Failed():
        raise CupidError(controller.ErrorText())

    input_splits = [
        TableSplit(split_proto=info, meta_proto=meta, handle=handle, columns=columns)
        for info, meta in zip(resp.inputSplits, split_meta)
    ]
    logger.info(
        "[SubProcess] getSplits call, CurrentInstanceId: %s, "
        "request: %s, response: %s",
        session.lookup_name, req, resp,
    )
    return CupidTableDownloadSession(session=session, handle=handle, splits=input_splits)