def create_download_session()

in cupid/io/table/core.py [0:0]


def create_download_session(session, table_or_parts, split_size=None, split_count=None,
                            columns=None, with_split_meta=False):
    """Split tables/partitions through the Cupid services and wrap the result.

    The function issues a ``SplitTables`` call on the task service, then
    (optionally) ``GetSplitsMeta`` and finally ``GetSplits`` on the sandbox
    sub-process service, and builds one ``TableSplit`` per returned split.

    :param session: Cupid session; its ``lookup_name`` is sent with the split
        request and it is passed on to the returned download session.
    :param table_or_parts: a single ``Table`` / ``TablePartition`` or a
        list, tuple, set or generator of them.
    :param split_size: requested split size. It is integer-divided by
        ``1024 ** 2`` before being sent (presumably bytes -> MB; confirm
        against the service definition). A falsy value becomes ``1``.
    :param split_count: requested number of splits. Defaults to ``1`` when
        neither ``split_size`` nor ``split_count`` is given, otherwise a
        falsy value is sent as ``0``.
    :param columns: column names to read. When empty, the schema names of
        the first table/partition encountered are used.
    :param with_split_meta: when true, split metadata is requested and
        attached to each ``TableSplit``; if the metadata call fails, a
        warning is logged and ``None`` is attached instead.
    :return: a ``CupidTableDownloadSession`` holding the handle and splits.
    :raises CupidError: if the ``SplitTables`` or ``GetSplits`` call fails.
    :raises NotImplementedError: if an entry of ``table_or_parts`` is neither
        a ``Table`` nor a ``TablePartition``.
    """
    task_channel = CupidTaskServiceRpcChannel(session)
    task_stub = task_service_pb.CupidTaskService_Stub(task_channel)

    if not isinstance(table_or_parts, (list, tuple, set, GeneratorType)):
        table_or_parts = [table_or_parts]

    if split_size is None and split_count is None:
        split_count = 1
    split_count = split_count or 0
    split_size = (split_size or 1024 ** 2) // 1024 ** 2

    table_pbs = []
    for t in table_or_parts:
        # NOTE(review): when ``columns`` is not supplied it is filled from the
        # first entry and then reused for every following entry - confirm
        # this is intended when several different tables are passed.
        if isinstance(t, Table):
            if not columns:
                columns = t.table_schema.names
            table_kw = dict(
                projectName=t.project.name,
                tableName=t.name,
                columns=','.join(columns),
            )
        elif isinstance(t, TablePartition):
            if not columns:
                columns = t.table.table_schema.names
            table_kw = dict(
                projectName=t.table.project.name,
                tableName=t.table.name,
                columns=','.join(columns),
                # The partition spec is sent without quote characters.
                partSpec=str(t.partition_spec).replace("'", '').strip(),
            )
        else:
            raise NotImplementedError(
                'Unsupported table input type: %s' % type(t).__name__)
        table_pbs.append(task_service_pb.TableInputInfo(**table_kw))

    split_request = task_service_pb.SplitTablesRequest(
        lookupName=session.lookup_name,
        splitSize=split_size,
        splitCount=split_count,
        tableInputInfos=table_pbs,
        allowNoColumns=True,
        requireSplitMeta=with_split_meta,
    )

    split_controller = CupidRpcController()
    split_resp = task_stub.SplitTables(split_controller, split_request, None)
    if split_controller.Failed():
        raise CupidError(split_controller.ErrorText())
    logger.info(
        "[CupidTask] splitTables call, CurrentInstanceId: %s, "
        "request: %s, response: %s",
        session.lookup_name, split_request, split_resp,
    )
    handle = split_resp.inputTableHandle

    sandbox_channel = SandboxRpcChannel()
    sandbox_stub = subprocess_pb.CupidSubProcessService_Stub(sandbox_channel)

    if not with_split_meta:
        # Endless stream of None so the zip below is bounded by the splits.
        split_meta = itertools.repeat(None)
    else:
        meta_request = subprocess_pb.GetSplitsMetaRequest(
            inputTableHandle=handle,
        )
        meta_controller = CupidRpcController()
        meta_resp = sandbox_stub.GetSplitsMeta(meta_controller, meta_request, None)
        # Log the request that was actually sent for this call (the original
        # code logged the earlier SplitTables request here by mistake).
        logger.info(
            "[CupidTask] getSplitsMeta call, CurrentInstanceId: %s, "
            "request: %s, response: %s",
            session.lookup_name, meta_request, meta_resp,
        )
        if meta_controller.Failed():
            # Best effort: a failure here is tolerated and metadata is
            # simply omitted from the splits.
            split_meta = itertools.repeat(None)
            logger.warning('Failed to get results of getSplitsMeta, '
                           'may running on an old service')
        else:
            split_meta = meta_resp.inputSplitsMeta

    splits_request = subprocess_pb.GetSplitsRequest(inputTableHandle=handle)
    splits_controller = CupidRpcController()
    splits_resp = sandbox_stub.GetSplits(splits_controller, splits_request, None)
    if splits_controller.Failed():
        raise CupidError(splits_controller.ErrorText())

    input_splits = [
        TableSplit(split_proto=info, meta_proto=meta, handle=handle, columns=columns)
        for info, meta in zip(splits_resp.inputSplits, split_meta)
    ]
    logger.info(
        "[SubProcess] getSplits call, CurrentInstanceId: %s, "
        "request: %s, response: %s",
        session.lookup_name, splits_request, splits_resp,
    )
    return CupidTableDownloadSession(session=session, handle=handle, splits=input_splits)