in output/pubsub.py [0:0]
def output(self):
if 'topic' not in self.output_config:
raise NotConfiguredException(
'No Pub/Sub topic defined in configuration.')
topic_template = self.jinja_environment.from_string(
self.output_config['topic'])
topic_template.name = 'topic'
topic_output = topic_template.render()
if 'content' not in self.output_config:
raise NotConfiguredException(
'No Pub/Sub message content defined in configuration.')
messages = {'single': 'message'}
if 'messages' in self.output_config:
messages_template = self.jinja_environment.from_string(
self.output_config['messages'])
messages_template.name = 'messages'
messages_output = messages_template.render()
messages = json.loads(messages_output)
publisher_options = pubsub_v1.types.PublisherOptions(
enable_message_ordering=True if 'ordering_key' in
self.output_config else False)
client_options = {}
if 'api_endpoint' in self.output_config:
client_options = {
"api_endpoint": self.output_config['api_endpoint']
}
publisher = pubsub_v1.PublisherClient(
publisher_options=publisher_options, client_options=client_options)
publish_futures = []
if isinstance(messages, list):
new_messages = {}
for message in messages:
new_messages[hashlib.md5(
json.dumps(message).encode()).hexdigest()] = message
messages = new_messages
for message_key, message_value in messages.items():
attributes = {}
if 'attributes' in self.output_config:
attributes_template = self.jinja_environment.from_string(
self.output_config['attributes'])
attributes_template.name = 'attributes'
attributes_output = attributes_template.render(
key=message_key, value=message_value)
attributes = json.loads(attributes_output)
ordering_key = None
if 'ordering_key' in self.output_config:
ordering_key_template = self.jinja_environment.from_string(
self.output_config['ordering_key'])
ordering_key_template.name = 'ordering_key'
ordering_key = ordering_key_template.render(key=message_key,
value=message_value)
content_template = self.jinja_environment.from_string(
self.output_config['content'])
content_template.name = 'content'
content = content_template.render(key=message_key,
value=message_value)
if ordering_key:
future = publisher.publish(topic_output,
data=content.encode('utf-8'),
ordering_key=ordering_key,
**attributes)
else:
future = publisher.publish(topic_output,
data=content.encode('utf-8'),
**attributes)
self.logger.info('Message published.',
extra={
'key': message_key,
'topic': topic_output,
'attributes': attributes,
'ordering_key': ordering_key
})
future.add_done_callback(self.callback)
publish_futures.append(future)
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
self.logger.info('Message sending finished!',
extra={
'count': len(publish_futures),
'topic': topic_output,
})