in AWSIoTPythonSDK/core/shadow/deviceShadow.py [0:0]
def generalCallback(self, client, userdata, message):
# In Py3.x, message.payload comes in as a bytes(string)
# json.loads needs a string input
with self._dataStructureLock:
currentTopic = message.topic
currentAction = self._parseTopicAction(currentTopic) # get/delete/update/delta
currentType = self._parseTopicType(currentTopic) # accepted/rejected/delta
payloadUTF8String = message.payload.decode('utf-8')
# get/delete/update: Need to deal with token, timer and unsubscribe
if currentAction in ["get", "delete", "update"]:
# Check for token
self._basicJSONParserHandler.setString(payloadUTF8String)
if self._basicJSONParserHandler.validateJSON(): # Filter out invalid JSON
currentToken = self._basicJSONParserHandler.getAttributeValue(u"clientToken")
if currentToken is not None:
self._logger.debug("shadow message clientToken: " + currentToken)
if currentToken is not None and currentToken in self._tokenPool.keys(): # Filter out JSON without the desired token
# Sync local version when it is an accepted response
self._logger.debug("Token is in the pool. Type: " + currentType)
if currentType == "accepted":
incomingVersion = self._basicJSONParserHandler.getAttributeValue(u"version")
# If it is get/update accepted response, we need to sync the local version
if incomingVersion is not None and incomingVersion > self._lastVersionInSync and currentAction != "delete":
self._lastVersionInSync = incomingVersion
# If it is a delete accepted, we need to reset the version
else:
self._lastVersionInSync = -1 # The version will always be synced for the next incoming delta/GU-accepted response
# Cancel the timer and clear the token
self._tokenPool[currentToken].cancel()
del self._tokenPool[currentToken]
# Need to unsubscribe?
self._shadowSubscribeStatusTable[currentAction] -= 1
if not self._isPersistentSubscribe and self._shadowSubscribeStatusTable.get(currentAction) <= 0:
self._shadowSubscribeStatusTable[currentAction] = 0
processNonPersistentUnsubscribe = Thread(target=self._doNonPersistentUnsubscribe, args=[currentAction])
processNonPersistentUnsubscribe.start()
# Custom callback
if self._shadowSubscribeCallbackTable.get(currentAction) is not None:
processCustomCallback = Thread(target=self._shadowSubscribeCallbackTable[currentAction], args=[payloadUTF8String, currentType, currentToken])
processCustomCallback.start()
# delta: Watch for version
else:
currentType += "/" + self._parseTopicShadowName(currentTopic)
# Sync local version
self._basicJSONParserHandler.setString(payloadUTF8String)
if self._basicJSONParserHandler.validateJSON(): # Filter out JSON without version
incomingVersion = self._basicJSONParserHandler.getAttributeValue(u"version")
if incomingVersion is not None and incomingVersion > self._lastVersionInSync:
self._lastVersionInSync = incomingVersion
# Custom callback
if self._shadowSubscribeCallbackTable.get(currentAction) is not None:
processCustomCallback = Thread(target=self._shadowSubscribeCallbackTable[currentAction], args=[payloadUTF8String, currentType, None])
processCustomCallback.start()