in shims/qpid-proton-python/src/jms_hdrs_props_test/Sender.py [0:0]
def _get_jms_message_header_kwargs(self):
hdr_kwargs = {}
hdr_annotations = {}
for jms_header in list(self.test_headers_map.keys()):
value_map = self.test_headers_map[jms_header]
value_type = list(value_map.keys())[0] # There is only ever one value in map
value = value_map[value_type]
if jms_header == 'JMS_TYPE_HEADER':
if value_type == 'string':
hdr_kwargs['subject'] = value
else:
raise InteropTestError('JmsSenderShim._get_jms_message_header_kwargs(): ' +
'JMS_TYPE_HEADER requires value type "string", type "%s" found' %
value_type)
elif jms_header == 'JMS_CORRELATIONID_HEADER':
if value_type == 'string':
hdr_kwargs['correlation_id'] = value
elif value_type == 'bytes':
hdr_kwargs['correlation_id'] = base64.b64decode(value)
else:
raise InteropTestError('JmsSenderShim._get_jms_message_header_kwargs(): ' +
'JMS_CORRELATIONID_HEADER requires value type "string" or "bytes", ' +
'type "%s" found' % value_type)
hdr_annotations[proton.symbol(u'x-opt-app-correlation-id')] = True
elif jms_header == 'JMS_REPLYTO_HEADER':
if value_type == 'queue':
hdr_kwargs['reply_to'] = value
hdr_annotations[proton.symbol(u'x-opt-jms-reply-to')] = proton.byte(0)
elif value_type == 'topic':
hdr_kwargs['reply_to'] = value
hdr_annotations[proton.symbol(u'x-opt-jms-reply-to')] = proton.byte(1)
elif value_type in ('temp_queue', 'temp_topic'):
raise InteropTestError('JmsSenderShim._get_jms_message_header_kwargs(): ' +
'JMS_REPLYTO_HEADER type "temp_queue" or "temp_topic" not handled')
else:
raise InteropTestError('JmsSenderShim._get_jms_message_header_kwargs(): ' +
'JMS_REPLYTO_HEADER requires value type "queue" or "topic", ' +
'type "%s" found' % value_type)
else:
raise InteropTestError('JmsSenderShim._add_jms_message_headers(): Invalid JMS message header "%s"' %
jms_header)
return (hdr_kwargs, hdr_annotations)