tensorflow/inference/docker/build_artifacts/sagemaker/tfs_utils.py [121:255]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        "--port={} "
        "--rest_api_port={} "
        "--model_config_file={} "
        "--max_num_load_retries=0 {} {} {} {}".format(
            tfs_grpc_port,
            tfs_rest_port,
            tfs_config_path,
            get_tfs_batching_args(tfs_enable_batching, tfs_batching_config_file),
            get_tensorflow_intra_op_parallelism_args(tfs_intra_op_parallelism),
            get_tensorflow_inter_op_parallelism_args(tfs_inter_op_parallelism),
            get_tfs_gpu_mem_args(tfs_enable_gpu_memory_fraction, tfs_gpu_memory_fraction),
        )
    )
    return cmd


def find_models():
    """Return the model directories discovered under ``/opt/ml/model``.

    Every ``saved_model.pb`` found below the base path is inspected. A file
    counts when its path splits into at least six ``/``-separated components
    and its parent directory name consists only of digits (the version
    directory). The directory containing that version directory is recorded
    as the model path.

    Returns:
        A list of distinct model paths, in the order they were first seen.
    """
    base_path = "/opt/ml/model"
    version_dir_pattern = re.compile(r"^\d+$")
    discovered = []
    for saved_model_file in _find_saved_model_files(base_path):
        segments = saved_model_file.split("/")
        # Skip files that sit too shallow or whose parent is not a version dir.
        if len(segments) < 6 or not version_dir_pattern.match(segments[-2]):
            continue
        # Drop "<version>/saved_model.pb" to obtain the model directory.
        candidate = "/".join(segments[:-2])
        if candidate not in discovered:
            discovered.append(candidate)
    return discovered


def find_model_versions(model_path):
    """List the numeric entries of ``model_path`` with leading zeros removed.

    Only directory entries whose name passes ``str.isnumeric`` are considered.
    The last character of a name is always kept, so an all-zero name such as
    ``"000"`` becomes ``"0"`` rather than an empty string.

    Args:
        model_path: Directory whose entries are scanned for version names.

    Returns:
        A list of version strings, in ``os.listdir`` order.
    """
    versions = []
    for entry_name in os.listdir(model_path):
        if not entry_name.isnumeric():
            continue
        leading, final_char = entry_name[:-1], entry_name[-1]
        versions.append(leading.lstrip("0") + final_char)
    return versions


def _find_saved_model_files(path):
    """Recursively yield the path of each ``saved_model.pb`` beneath ``path``.

    Directories are descended into as soon as they are encountered, so
    results come out in depth-first ``os.scandir`` order.

    Args:
        path: Directory to search.

    Yields:
        ``path``-prefixed paths of files named exactly ``saved_model.pb``.
    """
    for entry in os.scandir(path):
        entry_path = os.path.join(path, entry.name)
        if entry.is_dir():
            yield from _find_saved_model_files(entry_path)
        elif entry.name == "saved_model.pb":
            yield entry_path


def get_tfs_batching_args(enable_batching, tfs_batching_config):
    """Build the batching command-line flags.

    Args:
        enable_batching: Truthy to emit the batching flags.
        tfs_batching_config: Value substituted into
            ``--batching_parameters_file``.

    Returns:
        The two batching flags separated by a space, or an empty string when
        ``enable_batching`` is falsy.
    """
    if not enable_batching:
        return ""
    flags = (
        "--enable_batching=true",
        "--batching_parameters_file={}".format(tfs_batching_config),
    )
    return " ".join(flags)


def get_tensorflow_intra_op_parallelism_args(tfs_intra_op_parallelism):
    """Build the ``--tensorflow_intra_op_parallelism`` flag.

    Args:
        tfs_intra_op_parallelism: Value for the flag; any falsy value
            (``None``, ``0``, ``""``) suppresses it.

    Returns:
        The formatted flag, or an empty string when the value is falsy.
    """
    if not tfs_intra_op_parallelism:
        return ""
    return "--tensorflow_intra_op_parallelism={}".format(tfs_intra_op_parallelism)


def get_tensorflow_inter_op_parallelism_args(tfs_inter_op_parallelism):
    """Build the ``--tensorflow_inter_op_parallelism`` flag.

    Args:
        tfs_inter_op_parallelism: Value for the flag; any falsy value
            (``None``, ``0``, ``""``) suppresses it.

    Returns:
        The formatted flag, or an empty string when the value is falsy.
    """
    if not tfs_inter_op_parallelism:
        return ""
    return "--tensorflow_inter_op_parallelism={}".format(tfs_inter_op_parallelism)


def get_tfs_gpu_mem_args(enable_gpu_memory_fraction, gpu_memory_fraction):
    """Build the ``--per_process_gpu_memory_fraction`` flag.

    Args:
        enable_gpu_memory_fraction: Truthy to allow the flag to be emitted.
        gpu_memory_fraction: Value for the flag; a falsy value suppresses it.

    Returns:
        The formatted flag when both arguments are truthy, otherwise an
        empty string.
    """
    if not enable_gpu_memory_fraction or not gpu_memory_fraction:
        return ""
    return "--per_process_gpu_memory_fraction={}".format(gpu_memory_fraction)


def create_batching_config(batching_config_file):
    """Write the batching parameters file.

    Each parameter takes its value from its ``SAGEMAKER_TFS_*`` environment
    variable when that variable is set, and otherwise keeps a built-in
    default. All defaults that were applied are reported in one warning; the
    resulting configuration is logged at info level and written to
    ``batching_config_file``.

    Args:
        batching_config_file: Path of the file to create or overwrite with
            one ``<key> { value: <value> }`` line per parameter.
    """

    class _BatchingParameter:
        """One setting: config key, overriding env var, value, default notice."""

        def __init__(self, key, env_var, value, defaulted_message):
            self.key = key
            self.env_var = env_var
            self.value = value
            # Template with two "{}" slots: the default value, then the env var.
            self.defaulted_message = defaulted_message

    cpu_count = multiprocessing.cpu_count()
    batching_parameters = [
        _BatchingParameter(
            "max_batch_size",
            "SAGEMAKER_TFS_MAX_BATCH_SIZE",
            8,
            "max_batch_size defaulted to {}. Set {} to override default. "
            "Tuning this parameter may yield better performance.",
        ),
        _BatchingParameter(
            "batch_timeout_micros",
            "SAGEMAKER_TFS_BATCH_TIMEOUT_MICROS",
            1000,
            "batch_timeout_micros defaulted to {}. Set {} to override "
            "default. Tuning this parameter may yield better performance.",
        ),
        _BatchingParameter(
            "num_batch_threads",
            "SAGEMAKER_TFS_NUM_BATCH_THREADS",
            cpu_count,
            "num_batch_threads defaulted to {}, the number of CPUs. "
            "Set {} to override default.",
        ),
        _BatchingParameter(
            "max_enqueued_batches",
            "SAGEMAKER_TFS_MAX_ENQUEUED_BATCHES",
            # Batch limits number of concurrent requests, which limits number
            # of enqueued batches, so this can be set high for Batch
            100000000 if "SAGEMAKER_BATCH" in os.environ else cpu_count,
            "max_enqueued_batches defaulted to {}. Set {} to override default. "
            "Tuning this parameter may be necessary if out-of-memory "
            "errors occur.",
        ),
    ]

    defaulted_notices = []
    for batching_parameter in batching_parameters:
        if batching_parameter.env_var in os.environ:
            # The override is used verbatim (as a string); it is not validated.
            batching_parameter.value = os.environ[batching_parameter.env_var]
        else:
            defaulted_notices.append(
                batching_parameter.defaulted_message.format(
                    batching_parameter.value, batching_parameter.env_var
                )
            )
    if defaulted_notices:
        # One newline-terminated notice per defaulted parameter.
        log.warning("".join(notice + "\n" for notice in defaulted_notices))

    config = "".join(
        "%s { value: %s }\n" % (batching_parameter.key, batching_parameter.value)
        for batching_parameter in batching_parameters
    )

    log.info("batching config: \n%s\n", config)
    with open(batching_config_file, "w", encoding="utf8") as f:
        f.write(config)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



tensorflow/inference/docker/build_artifacts/sagemaker_neuron/tfs_utils.py [120:254]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        "--port={} "
        "--rest_api_port={} "
        "--model_config_file={} "
        "--max_num_load_retries=0 {} {} {} {}".format(
            tfs_grpc_port,
            tfs_rest_port,
            tfs_config_path,
            get_tfs_batching_args(tfs_enable_batching, tfs_batching_config_file),
            get_tensorflow_intra_op_parallelism_args(tfs_intra_op_parallelism),
            get_tensorflow_inter_op_parallelism_args(tfs_inter_op_parallelism),
            get_tfs_gpu_mem_args(tfs_enable_gpu_memory_fraction, tfs_gpu_memory_fraction),
        )
    )
    return cmd


def find_models():
    """Return the model directories discovered under ``/opt/ml/model``.

    Every ``saved_model.pb`` found below the base path is inspected. A file
    counts when its path splits into at least six ``/``-separated components
    and its parent directory name consists only of digits (the version
    directory). The directory containing that version directory is recorded
    as the model path.

    Returns:
        A list of distinct model paths, in the order they were first seen.
    """
    base_path = "/opt/ml/model"
    version_dir_pattern = re.compile(r"^\d+$")
    discovered = []
    for saved_model_file in _find_saved_model_files(base_path):
        segments = saved_model_file.split("/")
        # Skip files that sit too shallow or whose parent is not a version dir.
        if len(segments) < 6 or not version_dir_pattern.match(segments[-2]):
            continue
        # Drop "<version>/saved_model.pb" to obtain the model directory.
        candidate = "/".join(segments[:-2])
        if candidate not in discovered:
            discovered.append(candidate)
    return discovered


def find_model_versions(model_path):
    """List the numeric entries of ``model_path`` with leading zeros removed.

    Only directory entries whose name passes ``str.isnumeric`` are considered.
    The last character of a name is always kept, so an all-zero name such as
    ``"000"`` becomes ``"0"`` rather than an empty string.

    Args:
        model_path: Directory whose entries are scanned for version names.

    Returns:
        A list of version strings, in ``os.listdir`` order.
    """
    versions = []
    for entry_name in os.listdir(model_path):
        if not entry_name.isnumeric():
            continue
        leading, final_char = entry_name[:-1], entry_name[-1]
        versions.append(leading.lstrip("0") + final_char)
    return versions


def _find_saved_model_files(path):
    """Recursively yield the path of each ``saved_model.pb`` beneath ``path``.

    Directories are descended into as soon as they are encountered, so
    results come out in depth-first ``os.scandir`` order.

    Args:
        path: Directory to search.

    Yields:
        ``path``-prefixed paths of files named exactly ``saved_model.pb``.
    """
    for entry in os.scandir(path):
        entry_path = os.path.join(path, entry.name)
        if entry.is_dir():
            yield from _find_saved_model_files(entry_path)
        elif entry.name == "saved_model.pb":
            yield entry_path


def get_tfs_batching_args(enable_batching, tfs_batching_config):
    """Build the batching command-line flags.

    Args:
        enable_batching: Truthy to emit the batching flags.
        tfs_batching_config: Value substituted into
            ``--batching_parameters_file``.

    Returns:
        The two batching flags separated by a space, or an empty string when
        ``enable_batching`` is falsy.
    """
    if not enable_batching:
        return ""
    flags = (
        "--enable_batching=true",
        "--batching_parameters_file={}".format(tfs_batching_config),
    )
    return " ".join(flags)


def get_tensorflow_intra_op_parallelism_args(tfs_intra_op_parallelism):
    """Build the ``--tensorflow_intra_op_parallelism`` flag.

    Args:
        tfs_intra_op_parallelism: Value for the flag; any falsy value
            (``None``, ``0``, ``""``) suppresses it.

    Returns:
        The formatted flag, or an empty string when the value is falsy.
    """
    if not tfs_intra_op_parallelism:
        return ""
    return "--tensorflow_intra_op_parallelism={}".format(tfs_intra_op_parallelism)


def get_tensorflow_inter_op_parallelism_args(tfs_inter_op_parallelism):
    """Build the ``--tensorflow_inter_op_parallelism`` flag.

    Args:
        tfs_inter_op_parallelism: Value for the flag; any falsy value
            (``None``, ``0``, ``""``) suppresses it.

    Returns:
        The formatted flag, or an empty string when the value is falsy.
    """
    if not tfs_inter_op_parallelism:
        return ""
    return "--tensorflow_inter_op_parallelism={}".format(tfs_inter_op_parallelism)


def get_tfs_gpu_mem_args(enable_gpu_memory_fraction, gpu_memory_fraction):
    """Build the ``--per_process_gpu_memory_fraction`` flag.

    Args:
        enable_gpu_memory_fraction: Truthy to allow the flag to be emitted.
        gpu_memory_fraction: Value for the flag; a falsy value suppresses it.

    Returns:
        The formatted flag when both arguments are truthy, otherwise an
        empty string.
    """
    if not enable_gpu_memory_fraction or not gpu_memory_fraction:
        return ""
    return "--per_process_gpu_memory_fraction={}".format(gpu_memory_fraction)


def create_batching_config(batching_config_file):
    """Write the batching parameters file.

    Each parameter takes its value from its ``SAGEMAKER_TFS_*`` environment
    variable when that variable is set, and otherwise keeps a built-in
    default. All defaults that were applied are reported in one warning; the
    resulting configuration is logged at info level and written to
    ``batching_config_file``.

    Args:
        batching_config_file: Path of the file to create or overwrite with
            one ``<key> { value: <value> }`` line per parameter.
    """

    class _BatchingParameter:
        """One setting: config key, overriding env var, value, default notice."""

        def __init__(self, key, env_var, value, defaulted_message):
            self.key = key
            self.env_var = env_var
            self.value = value
            # Template with two "{}" slots: the default value, then the env var.
            self.defaulted_message = defaulted_message

    cpu_count = multiprocessing.cpu_count()
    batching_parameters = [
        _BatchingParameter(
            "max_batch_size",
            "SAGEMAKER_TFS_MAX_BATCH_SIZE",
            8,
            "max_batch_size defaulted to {}. Set {} to override default. "
            "Tuning this parameter may yield better performance.",
        ),
        _BatchingParameter(
            "batch_timeout_micros",
            "SAGEMAKER_TFS_BATCH_TIMEOUT_MICROS",
            1000,
            "batch_timeout_micros defaulted to {}. Set {} to override "
            "default. Tuning this parameter may yield better performance.",
        ),
        _BatchingParameter(
            "num_batch_threads",
            "SAGEMAKER_TFS_NUM_BATCH_THREADS",
            cpu_count,
            "num_batch_threads defaulted to {}, the number of CPUs. "
            "Set {} to override default.",
        ),
        _BatchingParameter(
            "max_enqueued_batches",
            "SAGEMAKER_TFS_MAX_ENQUEUED_BATCHES",
            # Batch limits number of concurrent requests, which limits number
            # of enqueued batches, so this can be set high for Batch
            100000000 if "SAGEMAKER_BATCH" in os.environ else cpu_count,
            "max_enqueued_batches defaulted to {}. Set {} to override default. "
            "Tuning this parameter may be necessary if out-of-memory "
            "errors occur.",
        ),
    ]

    defaulted_notices = []
    for batching_parameter in batching_parameters:
        if batching_parameter.env_var in os.environ:
            # The override is used verbatim (as a string); it is not validated.
            batching_parameter.value = os.environ[batching_parameter.env_var]
        else:
            defaulted_notices.append(
                batching_parameter.defaulted_message.format(
                    batching_parameter.value, batching_parameter.env_var
                )
            )
    if defaulted_notices:
        # One newline-terminated notice per defaulted parameter.
        log.warning("".join(notice + "\n" for notice in defaulted_notices))

    config = "".join(
        "%s { value: %s }\n" % (batching_parameter.key, batching_parameter.value)
        for batching_parameter in batching_parameters
    )

    log.info("batching config: \n%s\n", config)
    with open(batching_config_file, "w", encoding="utf8") as f:
        f.write(config)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



