def output()

in output/pubsub.py [0:0]


    def output(self):
        if 'topic' not in self.output_config:
            raise NotConfiguredException(
                'No Pub/Sub topic defined in configuration.')
        topic_template = self.jinja_environment.from_string(
            self.output_config['topic'])
        topic_template.name = 'topic'
        topic_output = topic_template.render()

        if 'content' not in self.output_config:
            raise NotConfiguredException(
                'No Pub/Sub message content defined in configuration.')

        messages = {'single': 'message'}
        if 'messages' in self.output_config:
            messages_template = self.jinja_environment.from_string(
                self.output_config['messages'])
            messages_template.name = 'messages'
            messages_output = messages_template.render()
            messages = json.loads(messages_output)

        publisher_options = pubsub_v1.types.PublisherOptions(
            enable_message_ordering=True if 'ordering_key' in
            self.output_config else False)
        client_options = {}
        if 'api_endpoint' in self.output_config:
            client_options = {
                "api_endpoint": self.output_config['api_endpoint']
            }
        publisher = pubsub_v1.PublisherClient(
            publisher_options=publisher_options, client_options=client_options)

        publish_futures = []
        if isinstance(messages, list):
            new_messages = {}
            for message in messages:
                new_messages[hashlib.md5(
                    json.dumps(message).encode()).hexdigest()] = message
            messages = new_messages
        for message_key, message_value in messages.items():
            attributes = {}
            if 'attributes' in self.output_config:
                attributes_template = self.jinja_environment.from_string(
                    self.output_config['attributes'])
                attributes_template.name = 'attributes'
                attributes_output = attributes_template.render(
                    key=message_key, value=message_value)
                attributes = json.loads(attributes_output)

            ordering_key = None
            if 'ordering_key' in self.output_config:
                ordering_key_template = self.jinja_environment.from_string(
                    self.output_config['ordering_key'])
                ordering_key_template.name = 'ordering_key'
                ordering_key = ordering_key_template.render(key=message_key,
                                                            value=message_value)

            content_template = self.jinja_environment.from_string(
                self.output_config['content'])
            content_template.name = 'content'
            content = content_template.render(key=message_key,
                                              value=message_value)

            if ordering_key:
                future = publisher.publish(topic_output,
                                           data=content.encode('utf-8'),
                                           ordering_key=ordering_key,
                                           **attributes)
            else:
                future = publisher.publish(topic_output,
                                           data=content.encode('utf-8'),
                                           **attributes)
            self.logger.info('Message published.',
                             extra={
                                 'key': message_key,
                                 'topic': topic_output,
                                 'attributes': attributes,
                                 'ordering_key': ordering_key
                             })
            future.add_done_callback(self.callback)
            publish_futures.append(future)

        futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
        self.logger.info('Message sending finished!',
                         extra={
                             'count': len(publish_futures),
                             'topic': topic_output,
                         })