def _write_record()

in transcoder/output/google_cloud/PubSubOutputManager.py [0:0]


    def _write_record(self, record_type_name, record):
        topic_id = record_type_name
        topic_path = self.publisher.topic_path(self.project_id, topic_id)

        if self.is_binary_encoded is True:
            schema = self.avro_schemas[record_type_name]
            bout = io.BytesIO()

            if self.use_fast_avro is True:
                avro_schema = parse_schema(schema)
                if self.create_schema_enforcing_topics is True:
                    # Use schemaless writer
                    fastavro.schemaless_writer(bout, avro_schema, record)
                else:
                    # Use binary writer
                    fastavro.writer(bout, avro_schema, [record], validator=True)
            else:
                jsoned_avsc_schema = json.dumps(schema)
                avro_schema = avro.schema.parse(jsoned_avsc_schema)
                writer = DatumWriter(avro_schema)
                encoder = BinaryEncoder(bout)
                writer.write(record, encoder)

            data = bout.getvalue()
            publish_future = self.publisher.publish(topic_path, data)
            publish_future.add_done_callback(self.get_callback(publish_future, data))
            self.publish_futures.append(publish_future)
        else:
            data: str
            if self.use_fast_avro is True:
                schema = self.avro_schemas[record_type_name]
                sout = io.StringIO()
                avro_schema = parse_schema(schema)
                fastavro.json_writer(sout, avro_schema, [record], write_union_type=self.create_schema_enforcing_topics)
                data = sout.getvalue()
            else:
                if self.create_schema_enforcing_topics is True:
                    raise OutputNotAvailableError('--create_schema_enforcing_topics can not be used with avro.io '
                                                  'library')
                data = json.dumps(record)

            publish_future: Future = self.publisher.publish(topic_path, data.encode("utf-8"))
            publish_future.add_done_callback(self.get_callback(publish_future, data))
            self.publish_futures.append(publish_future)