def valueDecoder()

in courses/machine_learning/deepdive2/end_to_end_ml/labs/serving/application/lib/pyasn1/codec/ber/decoder.py [0:0]


    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode the payload of a constructed value into an ASN.1 object.

        The first ``length`` octets of ``substrate`` are treated as the
        payload (``head``); everything after them (``tail``) is handed back
        to the caller untouched.

        Parameters:
            substrate: sliceable serialized data, starting at the payload.
            asn1Spec: prototype object guiding the decoding, or ``None`` to
                let ``self._decodeComponents`` work the type out on its own.
            tagSet: tag set of the value being decoded; ``tagSet[0]`` must
                have the constructed tag format.
            length: number of payload octets at the start of ``substrate``.
            state: accepted for signature compatibility; not used here.
            decodeFun: callable ``(substrate, asn1Spec, **options)`` that
                returns a ``(component, remaining_substrate)`` pair.
            substrateFun: optional callable; when given, no decoding is done
                here and ``substrateFun(asn1Object, substrate, length)`` is
                returned instead.
            **options: forwarded to ``decodeFun``.  ``openTypes`` (a mapping)
                and ``decodeOpenTypes`` (a flag) are also read here to decide
                whether open type components get decoded.

        Returns:
            ``(asn1Object, tail)``, or whatever ``substrateFun`` returns.

        Raises:
            error.PyAsn1Error: if the tag format is not constructed, if more
                components are present than ``asn1Spec`` declares, or if a
                required component is missing after decoding.
            Whatever ``asn1Object.isInconsistent`` evaluates to, when that is
            truthy for a record type that declares no named components.
        """
        if tagSet[0].tagFormat != tag.tagFormatConstructed:
            raise error.PyAsn1Error('Constructed tag format expected')

        head, tail = substrate[:length], substrate[length:]

        if substrateFun is not None:
            # The caller wants the raw substrate: only pick a prototype
            # object and delegate.
            if asn1Spec is not None:
                asn1Object = asn1Spec.clone()

            elif self.protoComponent is not None:
                asn1Object = self.protoComponent.clone(tagSet=tagSet)

            else:
                # No single prototype is available; pass both candidates on
                # as a tuple.
                asn1Object = self.protoRecordComponent, self.protoSequenceComponent

            return substrateFun(asn1Object, substrate, length)

        if asn1Spec is None:
            # Schema-less decoding is handled entirely by the helper.
            asn1Object, trailing = self._decodeComponents(
                head, tagSet=tagSet, decodeFun=decodeFun, **options
            )

            # Leftover octets are reported through the debug log only; they
            # do not fail the decoding.
            if trailing and LOG:
                LOG('Unused trailing %d octets encountered: %s' % (
                    len(trailing), debug.hexdump(trailing)))

            return asn1Object, tail

        # Schema-guided decoding: fill in a fresh, emptied copy of the spec.
        # Both branches below populate this same object.
        asn1Object = asn1Spec.clone()
        asn1Object.clear()

        if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId):

            namedTypes = asn1Spec.componentType

            isSetType = asn1Spec.typeId == univ.Set.typeId
            # "Deterministic" means component N of the payload is always
            # named type N: not a SET and no OPTIONAL/DEFAULT components.
            isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault

            if LOG:
                LOG('decoding %sdeterministic %s type %r chosen by type ID' % (
                    not isDeterministic and 'non-' or '', isSetType and 'SET' or '',
                    asn1Spec))

            seenIndices = set()
            idx = 0
            while head:
                # Pick the spec (or tag map) to decode the next component
                # against.
                if not namedTypes:
                    componentType = None

                elif isSetType:
                    componentType = namedTypes.tagMapUnique

                else:
                    try:
                        if isDeterministic:
                            componentType = namedTypes[idx].asn1Object

                        elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
                            componentType = namedTypes.getTagMapNearPosition(idx)

                        else:
                            componentType = namedTypes[idx].asn1Object

                    except IndexError:
                        # Payload still has data but the spec has run out of
                        # named components.
                        raise error.PyAsn1Error(
                            'Excessive components decoded at %r' % (asn1Spec,)
                        )

                component, head = decodeFun(head, componentType, **options)

                # When position alone does not identify the component, find
                # its real position from the tag set that was decoded.
                if not isDeterministic and namedTypes:
                    if isSetType:
                        idx = namedTypes.getPositionByType(component.effectiveTagSet)

                    elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
                        idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx)

                asn1Object.setComponentByPosition(
                    idx, component,
                    verifyConstraints=False,
                    matchTags=False, matchConstraints=False
                )

                seenIndices.add(idx)
                idx += 1

            if LOG:
                LOG('seen component indices %s' % seenIndices)

            if namedTypes:
                if not namedTypes.requiredComponents.issubset(seenIndices):
                    raise error.PyAsn1Error(
                        'ASN.1 object %s has uninitialized '
                        'components' % asn1Object.__class__.__name__)

                if namedTypes.hasOpenTypes:

                    openTypes = options.get('openTypes', {})

                    if LOG:
                        LOG('using open types map: %r' % openTypes)

                    if openTypes or options.get('decodeOpenTypes', False):

                        # Second pass: components declared as open types
                        # were stored as raw values above; decode them now
                        # using the type selected by their governing field.
                        for idx, namedType in enumerate(namedTypes.namedTypes):
                            if not namedType.openType:
                                continue

                            if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue:
                                continue

                            governingValue = asn1Object.getComponentByName(
                                namedType.openType.name
                            )

                            # The caller-supplied map takes precedence over
                            # the map attached to the type definition.
                            try:
                                openType = openTypes[governingValue]

                            except KeyError:

                                try:
                                    openType = namedType.openType[governingValue]

                                except KeyError:
                                    if LOG:
                                        LOG('failed to resolve open type by governing '
                                            'value %r' % (governingValue,))
                                    # Unresolvable: leave the component as
                                    # it was decoded.
                                    continue

                            if LOG:
                                LOG('resolved open type %r by governing '
                                    'value %r' % (openType, governingValue))

                            containerValue = asn1Object.getComponentByPosition(idx)

                            if containerValue.typeId in (
                                    univ.SetOf.typeId, univ.SequenceOf.typeId):

                                # A collection of open type values: decode
                                # every element and replace it in place.
                                for pos, containerElement in enumerate(
                                        containerValue):

                                    # Octets left over after the open type
                                    # value are intentionally ignored.
                                    component, _ = decodeFun(
                                        containerElement.asOctets(),
                                        asn1Spec=openType, **options
                                    )

                                    containerValue[pos] = component

                            else:
                                component, _ = decodeFun(
                                    containerValue.asOctets(),
                                    asn1Spec=openType, **options
                                )

                                asn1Object.setComponentByPosition(idx, component)

            else:
                # No named components to check against; rely on the
                # object's own consistency report.
                inconsistency = asn1Object.isInconsistent
                if inconsistency:
                    raise inconsistency

        else:
            # Any other type ID: every payload item is decoded against the
            # single component type of the spec (presumably SEQUENCE OF /
            # SET OF -- confirm against the classes bound to this decoder).
            # `asn1Object` is already the fresh, cleared clone made above,
            # so it is not cloned a second time.
            componentType = asn1Spec.componentType

            if LOG:
                LOG('decoding type %r chosen by given `asn1Spec`' % componentType)

            idx = 0

            while head:
                component, head = decodeFun(head, componentType, **options)
                asn1Object.setComponentByPosition(
                    idx, component,
                    verifyConstraints=False,
                    matchTags=False, matchConstraints=False
                )

                idx += 1

        return asn1Object, tail