in courses/machine_learning/deepdive2/end_to_end_ml/solutions/serving/application/lib/pyasn1/codec/ber/decoder.py [0:0]
def __call__(self, substrate, asn1Spec=None,
             tagSet=None, length=None, state=stDecodeTag,
             decodeFun=None, substrateFun=None,
             **options):
    """Decode a single tag-length-value item from the head of *substrate*.

    The method is a state machine driven by *state*. Each loop pass walks
    a chain of sequential ``if state is ...`` tests, so several transitions
    can happen within one pass:

    * ``stDecodeTag`` reads the tag octet(s) and builds/extends *tagSet*;
    * ``stDecodeLength`` reads the length octets (``-1`` stands for
      indefinite length);
    * ``stGetValueDecoder`` picks ``stGetValueDecoderByTag`` when
      *asn1Spec* is ``None``, ``stGetValueDecoderByAsn1Spec`` otherwise;
    * both of those select ``concreteDecoder`` and move to
      ``stDecodeValue``, or fall back to ``stTryAsExplicitTag``;
    * ``stTryAsExplicitTag`` either selects ``explicitTagDecoder`` or
      switches to ``self.defaultErrorState``;
    * ``stDumpRawValue`` selects ``self.defaultRawDecoder``;
    * ``stErrorCondition`` raises;
    * ``stDecodeValue`` calls the selected decoder and stops the loop.

    Args:
        substrate: Sliceable octet sequence to decode from.
        asn1Spec: ``None``, a ``tagmap.TagMap`` instance, or an object
            exposing ``tagSet``, ``tagMap`` and ``typeId`` that guides
            decoder selection.
        tagSet: Tag set accumulated so far, or ``None``. A newly decoded
            tag is combined with it via ``lastTag + tagSet``.
        length: Value length. It is overwritten whenever the
            ``stDecodeLength`` state runs; it is only used as passed in
            when *state* starts past that stage.
        state: Initial state of the state machine.
        decodeFun: Not referenced in this method body.
        substrateFun: Passed through to the concrete decoder. When the
            ``recursiveFlag`` option is false and no *substrateFun* is
            given, a default callable is substituted.
        **options: ``allowEoo`` is consumed here; ``recursiveFlag`` is
            read; ``fullSubstrate`` is set; everything left is forwarded
            to the concrete decoder.

    Returns:
        A ``(value, substrate)`` pair as produced by the concrete decoder,
        or ``(eoo.endOfOctets, substrate[2:])`` when the end-of-octets
        sentinel is allowed and found at the head of *substrate*. If the
        loop body never runs (``state is stStop`` on entry) the pair is
        ``(noValue, substrate)``.

    Raises:
        error.SubstrateUnderrunError: *substrate* is too short for the
            tag, the length octets or the announced value length.
        error.PyAsn1Error: indefinite length is met but
            ``self.supportIndefLength`` is false, or the state machine
            reaches ``stErrorCondition``.
    """
    if LOG:
        LOG('decoder called at scope %s with state %d, working with up to %d octets of substrate: %s' % (debug.scope, state, len(substrate), debug.hexdump(substrate)))

    # Remove 'allowEoo' so it is not forwarded to nested decoders
    # through **options.
    allowEoo = options.pop('allowEoo', False)

    # Look for end-of-octets sentinel
    if allowEoo and self.supportIndefLength:
        if substrate[:2] == self.__eooSentinel:
            if LOG:
                LOG('end-of-octets sentinel found')
            return eoo.endOfOctets, substrate[2:]

    value = noValue

    # Bind lookup tables to local names once, ahead of the loop.
    tagMap = self.__tagMap
    typeMap = self.__typeMap
    tagCache = self.__tagCache
    tagSetCache = self.__tagSetCache

    # Keep the substrate as it was handed to this call; it is passed to
    # the concrete decoder via options['fullSubstrate'] further down.
    fullSubstrate = substrate

    while state is not stStop:

        if state is stDecodeTag:
            if not substrate:
                raise error.SubstrateUnderrunError(
                    'Short octet stream on tag decoding'
                )

            # Decode tag
            isShortTag = True
            firstOctet = substrate[0]
            substrate = substrate[1:]

            try:
                # Only single-octet tags are ever stored in tagCache (see
                # below), so a cache hit implies isShortTag stays True.
                lastTag = tagCache[firstOctet]

            except KeyError:
                integerTag = oct2int(firstOctet)
                # Split the identifier octet: top two bits are the class,
                # bit 0x20 is the format, low five bits are the tag ID.
                tagClass = integerTag & 0xC0
                tagFormat = integerTag & 0x20
                tagId = integerTag & 0x1F

                if tagId == 0x1F:
                    # All five ID bits set: the tag ID continues in the
                    # following octets, 7 bits per octet, with bit 0x80
                    # flagging that another octet follows.
                    isShortTag = False
                    lengthOctetIdx = 0
                    tagId = 0

                    try:
                        while True:
                            integerTag = oct2int(substrate[lengthOctetIdx])
                            lengthOctetIdx += 1
                            tagId <<= 7
                            tagId |= (integerTag & 0x7F)
                            if not integerTag & 0x80:
                                break

                        substrate = substrate[lengthOctetIdx:]

                    except IndexError:
                        # Ran off the end of substrate before seeing an
                        # octet with the continuation bit cleared.
                        raise error.SubstrateUnderrunError(
                            'Short octet stream on long tag decoding'
                        )

                lastTag = tag.Tag(
                    tagClass=tagClass, tagFormat=tagFormat, tagId=tagId
                )

                if isShortTag:
                    # cache short tags
                    tagCache[firstOctet] = lastTag

            if tagSet is None:
                if isShortTag:
                    try:
                        tagSet = tagSetCache[firstOctet]

                    except KeyError:
                        # base tag not recovered
                        tagSet = tag.TagSet((), lastTag)
                        tagSetCache[firstOctet] = tagSet
                else:
                    # Multi-octet tags are not cached.
                    tagSet = tag.TagSet((), lastTag)

            else:
                # A tag set was already accumulated by the caller: combine
                # the freshly decoded tag with it.
                tagSet = lastTag + tagSet

            state = stDecodeLength

            if LOG:
                LOG('tag decoded into %s, decoding length' % tagSet)

        if state is stDecodeLength:
            # Decode length
            if not substrate:
                raise error.SubstrateUnderrunError(
                    'Short octet stream on length decoding'
                )

            firstOctet = oct2int(substrate[0])

            if firstOctet < 128:
                # Short form: the octet itself is the length.
                size = 1
                length = firstOctet

            elif firstOctet > 128:
                # Long form: low 7 bits give the number of length octets.
                # NOTE(review): 0xFF is handled as 127 length octets here;
                # X.690 appears to reserve that value -- confirm whether
                # it should be rejected explicitly.
                size = firstOctet & 0x7F
                # encoded in size bytes
                encodedLength = octs2ints(substrate[1:size + 1])
                # missing check on maximum size, which shouldn't be a
                # problem, we can handle more than is possible
                if len(encodedLength) != size:
                    # NOTE(review): the message renders as
                    # "<expected><<available>", although this branch is
                    # taken when fewer octets are available than expected.
                    raise error.SubstrateUnderrunError(
                        '%s<%s at %s' % (size, len(encodedLength), tagSet)
                    )

                # Fold the big-endian length octets into an integer.
                length = 0
                for lengthOctet in encodedLength:
                    length <<= 8
                    length |= lengthOctet
                # Account for the leading length-of-length octet so the
                # slice below skips the whole length field.
                size += 1

            else:
                # firstOctet == 128: indefinite length, marked as -1.
                size = 1
                length = -1

            substrate = substrate[size:]

            if length == -1:
                if not self.supportIndefLength:
                    raise error.PyAsn1Error('Indefinite length encoding not supported by this codec')

            else:
                if len(substrate) < length:
                    raise error.SubstrateUnderrunError('%d-octet short' % (length - len(substrate)))

            state = stGetValueDecoder

            if LOG:
                LOG('value length decoded into %d, payload substrate is: %s' % (length, debug.hexdump(length == -1 and substrate or substrate[:length])))

        if state is stGetValueDecoder:
            # Without a spec the decoder is chosen from the tag alone.
            if asn1Spec is None:
                state = stGetValueDecoderByTag

            else:
                state = stGetValueDecoderByAsn1Spec
        #
        # There are two ways of creating subtypes in ASN.1, and they
        # influence how the decoder operates:
        # 1) Either base types are used directly, or no IMPLICIT tagging
        #    has been applied on subtyping.
        # 2) The subtype syntax drops base type information (by means of
        #    IMPLICIT tagging).
        # The first case allows for complete tag recovery from the
        # substrate, while the second one requires the original ASN.1
        # type spec for decoding.
        #
        # In either case a set of tags (tagSet) comes from the substrate
        # incrementally, tag by tag (this is the case of EXPLICIT tagging,
        # which is the most basic). The outermost tag comes first from
        # the wire.
        #
        if state is stGetValueDecoderByTag:
            try:
                concreteDecoder = tagMap[tagSet]

            except KeyError:
                concreteDecoder = None

            if concreteDecoder:
                state = stDecodeValue

            else:
                # Retry the lookup with the tagSet[:1] slice (presumably
                # the base tag only -- confirm against tag.TagSet).
                try:
                    concreteDecoder = tagMap[tagSet[:1]]

                except KeyError:
                    concreteDecoder = None

                if concreteDecoder:
                    state = stDecodeValue
                else:
                    state = stTryAsExplicitTag

            if LOG:
                LOG('codec %s chosen by a built-in type, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as explicit tag'))
                # Balanced by debug.scope.pop() at the end of the method.
                debug.scope.push(concreteDecoder is None and '?' or concreteDecoder.protoComponent.__class__.__name__)

        if state is stGetValueDecoderByAsn1Spec:

            if asn1Spec.__class__ is tagmap.TagMap:
                # The spec is a map of candidate types keyed by tag set.
                try:
                    chosenSpec = asn1Spec[tagSet]

                except KeyError:
                    chosenSpec = None

                if LOG:
                    LOG('candidate ASN.1 spec is a map of:')

                    # NOTE: the loop variable reuses the name firstOctet;
                    # the octet value is not read again in this pass.
                    for firstOctet, v in asn1Spec.presentTypes.items():
                        LOG(' %s -> %s' % (firstOctet, v.__class__.__name__))

                    if asn1Spec.skipTypes:
                        LOG('but neither of: ')
                        for firstOctet, v in asn1Spec.skipTypes.items():
                            LOG(' %s -> %s' % (firstOctet, v.__class__.__name__))
                    LOG('new candidate ASN.1 spec is %s, chosen by %s' % (chosenSpec is None and '<none>' or chosenSpec.prettyPrintType(), tagSet))

            elif tagSet == asn1Spec.tagSet or tagSet in asn1Spec.tagMap:
                # The spec is a single type that matches the decoded tags.
                chosenSpec = asn1Spec
                if LOG:
                    LOG('candidate ASN.1 spec is %s' % asn1Spec.__class__.__name__)

            else:
                chosenSpec = None

            if chosenSpec is not None:
                try:
                    # ambiguous type or just faster codec lookup
                    concreteDecoder = typeMap[chosenSpec.typeId]

                    if LOG:
                        LOG('value decoder chosen for an ambiguous type by type ID %s' % (chosenSpec.typeId,))

                except KeyError:
                    # use base type for codec lookup to recover untagged types
                    baseTagSet = tag.TagSet(chosenSpec.tagSet.baseTag, chosenSpec.tagSet.baseTag)
                    try:
                        # base type or tagged subtype
                        concreteDecoder = tagMap[baseTagSet]

                        if LOG:
                            LOG('value decoder chosen by base %s' % (baseTagSet,))

                    except KeyError:
                        concreteDecoder = None

                if concreteDecoder:
                    # Narrow the spec to the chosen candidate before
                    # handing it to the concrete decoder.
                    asn1Spec = chosenSpec
                    state = stDecodeValue

                else:
                    state = stTryAsExplicitTag

            else:
                concreteDecoder = None
                state = stTryAsExplicitTag

            if LOG:
                LOG('codec %s chosen by ASN.1 spec, decoding %s' % (state is stDecodeValue and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as explicit tag'))
                # Balanced by debug.scope.pop() at the end of the method.
                debug.scope.push(chosenSpec is None and '?' or chosenSpec.__class__.__name__)

        if state is stDecodeValue:
            # Default substrateFun returns its first argument together with
            # the second argument truncated to the third (presumably
            # object, substrate, length -- confirm against the concrete
            # decoders).
            if not options.get('recursiveFlag', True) and not substrateFun:  # deprecate this
                substrateFun = lambda a, b, c: (a, b[:c])

            options.update(fullSubstrate=fullSubstrate)

            if length == -1:  # indef length
                value, substrate = concreteDecoder.indefLenValueDecoder(
                    substrate, asn1Spec,
                    tagSet, length, stGetValueDecoder,
                    self, substrateFun,
                    **options
                )

            else:
                value, substrate = concreteDecoder.valueDecoder(
                    substrate, asn1Spec,
                    tagSet, length, stGetValueDecoder,
                    self, substrateFun,
                    **options
                )

            if LOG:
                LOG('codec %s yields type %s, value:\n%s\n...remaining substrate is: %s' % (concreteDecoder.__class__.__name__, value.__class__.__name__, isinstance(value, base.Asn1Item) and value.prettyPrint() or value, substrate and debug.hexdump(substrate) or '<none>'))

            state = stStop
            break

        # The states below are tested after stDecodeValue, so when they
        # switch to stDecodeValue the value is decoded on the next loop
        # pass.
        if state is stTryAsExplicitTag:
            if (tagSet and
                    tagSet[0].tagFormat == tag.tagFormatConstructed and
                    tagSet[0].tagClass != tag.tagClassUniversal):
                # Assume explicit tagging
                concreteDecoder = explicitTagDecoder
                state = stDecodeValue

            else:
                concreteDecoder = None
                state = self.defaultErrorState

            if LOG:
                LOG('codec %s chosen, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as failure'))

        if state is stDumpRawValue:
            concreteDecoder = self.defaultRawDecoder

            if LOG:
                LOG('codec %s chosen, decoding value' % concreteDecoder.__class__.__name__)

            state = stDecodeValue

        if state is stErrorCondition:
            raise error.PyAsn1Error(
                '%s not in asn1Spec: %r' % (tagSet, asn1Spec)
            )

    if LOG:
        debug.scope.pop()
        LOG('decoder left scope %s, call completed' % debug.scope)

    return value, substrate