in AWSIoTPythonSDK/core/jobs/thingJobManager.py [0:0]
def getJobTopic(self, srcJobExecTopicType, srcJobExecTopicReplyType=jobExecutionTopicReplyType.JOB_REQUEST_TYPE, jobId=None):
    """Build the jobs topic string for this thing, or return None if the request is invalid.

    Arguments:
        srcJobExecTopicType: topic type entry; indexed with
            _JOB_ID_REQUIRED_INDEX and _JOB_OPERATION_INDEX.
        srcJobExecTopicReplyType: reply type entry; indexed with
            _JOB_SUFFIX_INDEX. Defaults to the plain request type.
        jobId: job identifier, passed through str() when the topic
            type requires one.

    Returns:
        The formatted topic string, or None when any of these hold:
          * no thing name is set on this instance,
          * a notify / notify-next topic is combined with a non-request
            reply type,
          * a job ID is given for a topic type that must not carry one,
          * a job ID is missing for a topic type that requires one,
          * the topic type's operation string is empty.
    """
    thingName = self._thingName
    if thingName is None:
        return None

    # Notify-style topics only exist in their request form.
    isNotifyTopic = (
        srcJobExecTopicType == jobExecutionTopicType.JOB_NOTIFY_TOPIC
        or srcJobExecTopicType == jobExecutionTopicType.JOB_NOTIFY_NEXT_TOPIC
    )
    if isNotifyTopic and srcJobExecTopicReplyType != jobExecutionTopicReplyType.JOB_REQUEST_TYPE:
        return None

    # A job ID must not be supplied for topic types that exclude one ...
    if jobId is not None and _isWithoutJobIdTopicType(srcJobExecTopicType):
        return None

    # ... and must be supplied for topic types that require one.
    if jobId is None and srcJobExecTopicType[_JOB_ID_REQUIRED_INDEX]:
        return None

    # An empty operation string marks a topic type that cannot be built.
    operation = srcJobExecTopicType[_JOB_OPERATION_INDEX]
    if operation == '':
        return None

    # Per-job topic: the job ID sits between "jobs" and the operation.
    if srcJobExecTopicType[_JOB_ID_REQUIRED_INDEX]:
        return '{0}{1}/jobs/{2}/{3}{4}'.format(
            _BASE_THINGS_TOPIC,
            thingName,
            str(jobId),
            operation,
            srcJobExecTopicReplyType[_JOB_SUFFIX_INDEX],
        )

    # Wildcard topic: everything under "jobs"; no operation or suffix is appended.
    if srcJobExecTopicType == jobExecutionTopicType.JOB_WILDCARD_TOPIC:
        return '{0}{1}/jobs/#'.format(_BASE_THINGS_TOPIC, thingName)

    # Remaining topic types: operation directly under "jobs", plus reply suffix.
    return '{0}{1}/jobs/{2}{3}'.format(
        _BASE_THINGS_TOPIC,
        thingName,
        operation,
        srcJobExecTopicReplyType[_JOB_SUFFIX_INDEX],
    )