def _qnn_conv()

in python/tvm/relay/frontend/mxnet.py [0:0]


def _qnn_conv(inputs, attrs, subgraphs, params):
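    """Convert an MXNet MKLDNN fused convolution subgraph to Relay.

    The subgraph is either quantized (lowered to qnn.conv2d plus requantize) or a plain
    float convolution with an optional fused activation; see _parse below.
    """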
    def _has_fused_activation(_attrs, _supported_activations):
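        """Check whether the subgraph has a fused activation and that it is supported."""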
        has_fused_activation = False
        if attrs.get_bool("with_act", False) or attrs.get_bool("with_postsum_act", False):
            subgraph_activation_attrs = _get_subgraph_op(subgraphs, "Activation")["attrs"]
            act_type = subgraph_activation_attrs["act_type"]
            if act_type not in _supported_activations:
                raise ValueError(
                    "Fused activation {} is not supported at " "this time".format(act_type)
                )
            has_fused_activation = True
        return has_fused_activation

    def _get_data_scale_and_zp(_data, _inputs, _data_min_idx, _data_max_idx):
        """Finds the Qnn params for the data expr."""
        data_min = _inputs[_data_min_idx]
        data_max = _inputs[_data_max_idx]
        assert data_min <= data_max
        data_dtype = _infer_type(_data).checked_type.dtype
        assert data_dtype in {"int8", "uint8"}
        if data_min < 0.0:
            assert (
                data_dtype == "int8"
            ), "Expect int8 when data_min < 0.0, consider quantize model with int8."
        _data_scale = (
            get_mkldnn_uint8_scale(data_min, data_max)
            if data_dtype == "uint8"
            else get_mkldnn_int8_scale(data_min, data_max)
        )
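        # The MKLDNN scale helpers only return a scale; the zero point is always 0.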
        _data_zero_point = 0
        return _data_scale, _data_zero_point

    def _get_bn_alpha_coeff(_bn_gamma_idx, _bn_beta_idx, _bn_running_mean_idx, _bn_running_var_idx):
        """Extract the BN coeff. These will be use later for BN folding into convolution."""
        # Extract relevant attrs from bn.
        bn_attrs = _get_subgraph_op(subgraphs, "BatchNorm")["attrs"]
        bn_epsilon_param = float(bn_attrs["eps"])
        bn_scale_param = bn_attrs["fix_gamma"] == "False"
        bn_center_param = True

        # Extract the relevant relay expressions.
        bn_running_var = inputs[_bn_running_var_idx]
        bn_gamma = inputs[_bn_gamma_idx]
        bn_beta = inputs[_bn_beta_idx]
        bn_running_mean = inputs[_bn_running_mean_idx]

        # Coefficient to multiply into the weights: bn_scale = gamma / sqrt(running_var + eps)
        # (gamma is skipped when fix_gamma is set).
        bn_epsilon = relay.const(bn_epsilon_param, "float32")
        denominator = relay.sqrt(relay.add(bn_running_var, bn_epsilon))
        _bn_scale = relay.divide(relay.const(1.0, "float32"), denominator)
        if bn_scale_param:
            _bn_scale = relay.multiply(bn_gamma, _bn_scale)

        # Get the shift to fold into the bias: bn_shift = beta - running_mean * bn_scale.
        _bn_shift = relay.negative(relay.multiply(bn_running_mean, _bn_scale))
        if bn_center_param:
            _bn_shift = relay.add(bn_beta, _bn_shift)

        return _bn_scale, _bn_shift

    def _fold_bn(_bn_scale, _bn_shift, _has_bias, _has_bn):
        """Fold BN into kernel and bias. Get new kernel and bias."""
        _kernel = inputs[1]
        if _bn_scale:
            assert attrs.get_bool("with_bn", False)
            # Weights are in OIHW layout and _bn_scale is per output channel (O), so expand dims.
            exp_bn_scale = relay.expand_dims(_bn_scale, axis=1, num_newaxis=3)
            _kernel = relay.multiply(exp_bn_scale, _kernel)

        _bias = None
        if _has_bias:
            _bias = inputs[2]
            if _has_bn:
                assert _bn_shift is not None
                assert _bn_scale is not None
                _bias = relay.add(relay.multiply(_bn_scale, _bias), _bn_shift)
        elif _has_bn:
            assert _bn_shift is not None
            assert _bn_scale is not None
            _bias = _bn_shift
        return _kernel, _bias

    def _get_quantized_kernel(_kernel, _bias, _data_scale):
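        """Quantize the (possibly BN-folded) kernel per output channel with MKLDNN-style scales."""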
        # For quantizing, we need the per-channel min/max of the kernel, so we precompute this expr.
        np_kernel = _infer_value(_kernel, params).numpy()
        kernel_channel_min = np.amin(np_kernel, axis=(1, 2, 3))
        kernel_channel_max = np.amax(np_kernel, axis=(1, 2, 3))

        np_bias = None
        if _bias is not None:
            np_bias = _infer_value(_bias, params).numpy()
        return quantize_conv_weights_bias_channel_mkldnn_from_var(
            _kernel, np_bias, kernel_channel_min, kernel_channel_max, _data_scale
        )

    def _get_qnn_conv2d(
        _data,
        _kernel,
        _data_zero_point,
        _kernel_zero_point,
        _data_scale,
        _kernel_vector_scale,
        _conv2d_attrs,
    ):
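        """Build the qnn.conv2d call from the data/kernel QNN params and the subgraph conv2d attrs."""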
        return relay.qnn.op.conv2d(
            _data,
            _kernel,
            input_zero_point=relay.const(_data_zero_point, "int32"),
            kernel_zero_point=relay.const(_kernel_zero_point, "int32"),
            input_scale=relay.const(_data_scale, "float32"),
            kernel_scale=relay.const(_kernel_vector_scale),
            channels=_conv2d_attrs["channels"],
            groups=_conv2d_attrs["groups"],
            kernel_size=_conv2d_attrs["kernel_size"],
            strides=_conv2d_attrs["strides"],
            dilation=_conv2d_attrs["dilation"],
            padding=_conv2d_attrs["padding"],
            data_layout=_conv2d_attrs["data_layout"],
            kernel_layout=_conv2d_attrs["kernel_layout"],
        )

    def _get_requantized_op(_res, _input_scale, _output_scale, _out_dtype):
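        """Requantize the int32 convolution result to the given output scale and dtype."""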
        # Requantize to get the output back
        return relay.qnn.op.requantize(
            _res,
            input_scale=relay.const(_input_scale),
            input_zero_point=relay.const(0, "int32"),
            output_scale=relay.const(_output_scale, "float32"),
            output_zero_point=relay.const(0, "int32"),
            axis=1,
            out_dtype=_out_dtype,
        )

    def _get_sum(_res, _output_scale, out_dtype):
        """Handles sum of the second quantized tensor."""
        # This is done in following steps
        #   1) rhs is the add's second operand. First rhs will be requantized to output scale with
        #   dtype int32. The int32 dtype is to keep precision high before adding.
        #   2) Call normal add
        #   3) Depending on final out_dtype, clip and cast (basically requantize).

        _output_scale = relay.const(_output_scale, "float32")
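        # The fused-sum operand and its calibration min/max are passed as trailing inputs
        # (see the index handling in _parse).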
        data_sum = inputs[-5]
        data_sum_min = inputs[-2]
        data_sum_max = inputs[-1]

        data_sum_dtype = _infer_type(data_sum).checked_type.dtype
        data_sum_scale = (
            get_mkldnn_uint8_scale(data_sum_min, data_sum_max)
            if data_sum_dtype == "uint8"
            else get_mkldnn_int8_scale(data_sum_min, data_sum_max)
        )
        data_sum_scale = relay.const(data_sum_scale, "float32")
        zero_point = relay.const(0, "int32")

        # Save one requantize if the previous expr is already a requantize node. This also improves
        # accuracy a little.
        if isinstance(data_sum, _expr.Call) and data_sum.op.name == "qnn.requantize":
            prev_input, prev_scale, prev_zero_point = data_sum.args[0:3]
            prev_axis = data_sum.attrs.axis
            data_sum = relay.qnn.op.requantize(
                prev_input,
                input_scale=prev_scale,
                input_zero_point=prev_zero_point,
                output_scale=_output_scale,
                output_zero_point=zero_point,
                axis=prev_axis,
                out_dtype="int32",
            )
        else:
            data_sum = relay.qnn.op.requantize(
                data_sum,
                input_scale=data_sum_scale,
                input_zero_point=zero_point,
                output_scale=_output_scale,
                output_zero_point=zero_point,
                out_dtype="int32",
            )

        # 2) Add two int32 tensors.
        _res = relay.add(_res, data_sum)

        # 3) Clip/cast to change the out dtype.
        _res = relay.clip(
            _res,
            a_min=float(tvm.tir.op.min_value(out_dtype).value),
            a_max=float(tvm.tir.op.max_value(out_dtype).value),
        )
        _res = relay.cast(_res, out_dtype)
        return _res

    def _parse():
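        """Parse the fused convolution subgraph, handling both the quantized and float variants."""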
        assert len(subgraphs) == 1
        subgraph_conv_attrs = StrAttrsDict(_get_subgraph_op(subgraphs, "Convolution")["attrs"])

        is_quantized = attrs.get_bool("quantized", False)
        if is_quantized:
            # MKLDNN produces a quantized convolution subgraph. Several attributes are taken into
            # account to parse the subgraph:
            #   * no_bias
            #   * with_sum
            #   * with_bn
            #   * with_postsum_relu
            #   * with_act
            #
            # Note - ReLU/clip handling is not required because the output min/max take care of it.
            #
            # The parsing can be broken down into the following steps
            #   1) Get the input data scale and zero point.
            #   2) Extract the BN params.
            #   3) Fold the BN params into kernel and bias.
            #   4) Quantize the kernel.
            #   5) Call QNN conv2d op.
            #   6) Quantize the bias and call bias_add.
            #   7) Handle the sum of quantized tensors if needed, or just requantize.

            has_bias = not subgraph_conv_attrs.get_bool("no_bias", False)
            has_sum = attrs.get_bool("with_sum", False)
            has_bn = attrs.get_bool("with_bn", False)

            ###############################################
            #   1) Get the input data scale and zero point.
            ###############################################
            # The last two indexes hold the data min and max. If the conv has a fused sum, the last
            # two indexes belong to the second (sum) tensor, so the data min/max move to -4 and -3.
            data_min_idx = -2
            data_max_idx = -1
            if has_sum:
                data_min_idx = -4
                data_max_idx = -3

            data = inputs[0]
            data_scale, data_zero_point = _get_data_scale_and_zp(
                data, inputs, data_min_idx, data_max_idx
            )

            #############################
            #   2) Extract the BN params.
            #############################
            # Find the indexes to look at for BN.
            bn_scale = bn_shift = None
            if has_bn:
                if has_bias:
                    bn_start_idx = 3
                else:
                    bn_start_idx = 2

                bn_gamma_idx = bn_start_idx
                bn_beta_idx = bn_start_idx + 1
                bn_running_mean_idx = bn_start_idx + 2
                bn_running_var_idx = bn_start_idx + 3

                bn_scale, bn_shift = _get_bn_alpha_coeff(
                    bn_gamma_idx, bn_beta_idx, bn_running_mean_idx, bn_running_var_idx
                )

            ########################################
            #   3) Fold the BN into kernel and bias.
            ########################################
            kernel, bias = _fold_bn(bn_scale, bn_shift, has_bias, has_bn)

            #################################################################
            #   4) Quantize the kernel. Get quantized kernel and QNN params.
            #################################################################
            kernel, kernel_vector_scale, kernel_zero_point = _get_quantized_kernel(
                kernel, bias, data_scale
            )

            ##########################
            #   5) Call QNN conv2d op.
            ##########################
            conv2d_attrs = _get_mx_conv2d_attrs(subgraph_conv_attrs)
            res = _get_qnn_conv2d(
                data,
                kernel,
                data_zero_point,
                kernel_zero_point,
                data_scale,
                kernel_vector_scale,
                conv2d_attrs,
            )

            ###############################################
            #   6) Quantize the bias and call bias_add.
            ###############################################
            if has_bias or has_bn:
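                # The bias is quantized to int32 with scale = data_scale * per-channel kernel scale.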
                bias_scale = data_scale * kernel_vector_scale
                int32_bias = quantize_conv_bias_mkldnn_from_var(bias, bias_scale)
                res = _op.nn.bias_add(res, int32_bias, axis=1)

            #####################################################################
            #   7) Handle sum of quantized tensors if needed. Or just Requantize.
            #####################################################################
            min_output_range = attrs.get_float("min_calib_range")
            max_output_range = attrs.get_float("max_calib_range")
            output_scale, out_dtype = get_conv_mkldnn_requantized_scale_outDtype(
                min_output_range, max_output_range
            )

            # The QNN conv2d output scale is the product of data_scale and kernel_vector_scale.
            input_scale = data_scale * kernel_vector_scale
            if attrs.get_bool("with_sum", False):
                # There is a second tensor that has to be added to the QNN conv2d output. Therefore,
                # the QNN conv2d is first requantized to output scale with int32 precision. The
                # second tensor will also be requantized to output scale with int32 precision,
                # followed by an add operator.
                res = _get_requantized_op(res, input_scale, output_scale, "int32")
                res = _get_sum(res, output_scale, out_dtype)
            else:
                # Get the requantized conv output
                res = _get_requantized_op(res, input_scale, output_scale, out_dtype)

            return res, min_output_range, max_output_range
        else:
            res = _mx_conv(inputs, subgraph_conv_attrs)
            has_fused_relu = _has_fused_activation(attrs, ["relu"])
            if has_fused_relu:
                res = _op.nn.relu(res)
            return res

    return _parse()