in courses/machine_learning/deepdive2/structured/solutions/serving/application/lib/pyasn1/codec/ber/decoder.py [0:0]
def indefLenValueDecoder(self, substrate, asn1Spec,
                         tagSet=None, length=None, state=None,
                         decodeFun=None, substrateFun=None,
                         **options):
    """Decode a constructed value that is terminated by an end-of-octets marker.

    Components are pulled from `substrate` one at a time with `decodeFun`
    until it returns `eoo.endOfOctets`.

    Args:
        substrate: Encoded data to consume.
        asn1Spec: Prototype of the expected value, or None to let
            `self._decodeComponents` guess the container type.
        tagSet: Tag set of the value being decoded; its first tag must be
            of constructed format.
        length: Passed through to `substrateFun` only.
        state: Not used by this method.
        decodeFun: Callable used to decode each component; called as
            `decodeFun(substrate, asn1Spec, **options)` and expected to
            return `(component, remaining_substrate)`.
        substrateFun: Optional callable; when given, no decoding happens
            here and `substrateFun(prototype, substrate, length)` is
            returned as-is.
        **options: Extra decoder options forwarded to `decodeFun`.
            `openTypes` and `decodeOpenTypes` are also read here, and
            `allowEoo` is always forced to True for nested calls.

    Returns:
        `(asn1Object, substrate)` where `substrate` is whatever follows the
        end-of-octets marker, or the result of `substrateFun` /
        `self._decodeComponents` when one of those paths is taken.

    Raises:
        error.PyAsn1Error: The tag is not constructed, more components
            were found than the type allows, or required components are
            missing.
        error.SubstrateUnderrunError: `substrate` ran out before the
            end-of-octets marker was seen.
    """
    if tagSet[0].tagFormat != tag.tagFormatConstructed:
        raise error.PyAsn1Error('Constructed tag format expected')

    if substrateFun is not None:
        if asn1Spec is not None:
            asn1Object = asn1Spec.clone()
        elif self.protoComponent is not None:
            asn1Object = self.protoComponent.clone(tagSet=tagSet)
        else:
            asn1Object = self.protoRecordComponent, self.protoSequenceComponent
        return substrateFun(asn1Object, substrate, length)

    # Merge instead of writing `allowEoo=True, **options`: the latter raises
    # TypeError (duplicate keyword) if the caller's options already carry
    # an `allowEoo` entry.
    eooOptions = dict(options, allowEoo=True)

    if asn1Spec is None:
        return self._decodeComponents(
            substrate, tagSet=tagSet, decodeFun=decodeFun, **eooOptions
        )

    asn1Object = asn1Spec.clone()
    asn1Object.clear()

    if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId):
        namedTypes = asn1Object.componentType
        isSetType = asn1Object.typeId == univ.Set.typeId
        # Deterministic: the component at position N can only be field N.
        isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault

        if LOG:
            LOG('decoding %sdeterministic %s type %r chosen by type ID' % (
                '' if isDeterministic else 'non-',
                'SET' if isSetType else '',
                asn1Spec))

        seenIndices = set()
        idx = 0
        while substrate:
            # Pick the spec (or tag map) the next component is decoded with.
            if len(namedTypes) <= idx:
                componentSpec = None
            elif isSetType:
                componentSpec = namedTypes.tagMapUnique
            else:
                try:
                    expectedType = namedTypes[idx]
                    if not isDeterministic and (
                            expectedType.isOptional or expectedType.isDefaulted):
                        componentSpec = namedTypes.getTagMapNearPosition(idx)
                    else:
                        componentSpec = expectedType.asn1Object
                except IndexError:
                    raise error.PyAsn1Error(
                        'Excessive components decoded at %r' % (asn1Object,)
                    )

            component, substrate = decodeFun(
                substrate, componentSpec, **eooOptions
            )
            if component is eoo.endOfOctets:
                break

            # When the position is not implied by order, locate the field
            # from the tags of what was actually decoded.
            if not isDeterministic and namedTypes:
                if isSetType:
                    idx = namedTypes.getPositionByType(component.effectiveTagSet)
                elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
                    idx = namedTypes.getPositionNearType(
                        component.effectiveTagSet, idx
                    )

            asn1Object.setComponentByPosition(
                idx, component,
                verifyConstraints=False,
                matchTags=False, matchConstraints=False
            )
            seenIndices.add(idx)
            idx += 1
        else:
            # Loop ended because the substrate was exhausted, not by `break`.
            raise error.SubstrateUnderrunError(
                'No EOO seen before substrate ends'
            )

        if LOG:
            LOG('seen component indices %s' % seenIndices)

        if namedTypes:
            if not namedTypes.requiredComponents.issubset(seenIndices):
                raise error.PyAsn1Error(
                    'ASN.1 object %s has uninitialized components'
                    % asn1Object.__class__.__name__
                )

            if namedTypes.hasOpenTypes:
                openTypes = options.get('openTypes', {})

                if LOG:
                    LOG('using open types map: %r' % openTypes)

                if openTypes or options.get('decodeOpenTypes', False):
                    for idx, namedType in enumerate(namedTypes.namedTypes):
                        if not namedType.openType:
                            continue

                        if (namedType.isOptional
                                and not asn1Object.getComponentByPosition(idx).isValue):
                            continue

                        governingValue = asn1Object.getComponentByName(
                            namedType.openType.name
                        )

                        # The caller-supplied map takes precedence over the
                        # map attached to the type definition.
                        try:
                            openType = openTypes[governingValue]
                        except KeyError:
                            try:
                                openType = namedType.openType[governingValue]
                            except KeyError:
                                if LOG:
                                    LOG('failed to resolve open type by governing '
                                        'value %r' % (governingValue,))
                                continue

                        if LOG:
                            LOG('resolved open type %r by governing '
                                'value %r' % (openType, governingValue))

                        containerValue = asn1Object.getComponentByPosition(idx)

                        if containerValue.typeId in (
                                univ.SetOf.typeId, univ.SequenceOf.typeId):
                            # Re-decode every element of the collection in place.
                            for pos, containerElement in enumerate(containerValue):
                                component, _ = decodeFun(
                                    containerElement.asOctets(),
                                    asn1Spec=openType, **eooOptions
                                )
                                containerValue[pos] = component
                        else:
                            component, _ = decodeFun(
                                containerValue.asOctets(),
                                asn1Spec=openType, **eooOptions
                            )
                            if component is not eoo.endOfOctets:
                                asn1Object.setComponentByPosition(idx, component)
            else:
                # No open types to resolve: rely on the object's own
                # consistency check.
                inconsistency = asn1Object.isInconsistent
                if inconsistency:
                    raise inconsistency

    else:
        # Homogeneous container: every component shares one spec.
        # `asn1Object` is already a fresh, cleared clone of `asn1Spec`.
        componentType = asn1Spec.componentType

        if LOG:
            LOG('decoding type %r chosen by given `asn1Spec`' % componentType)

        idx = 0
        while substrate:
            component, substrate = decodeFun(
                substrate, componentType, **eooOptions
            )
            if component is eoo.endOfOctets:
                break

            asn1Object.setComponentByPosition(
                idx, component,
                verifyConstraints=False,
                matchTags=False, matchConstraints=False
            )
            idx += 1
        else:
            raise error.SubstrateUnderrunError(
                'No EOO seen before substrate ends'
            )

    return asn1Object, substrate