in transcoder/output/google_cloud/PubSubOutputManager.py [0:0]
def __init__(self, project_id: str, output_encoding: str, output_prefix: str = None,
lazy_create_resources: bool = False, create_schema_enforcing_topics: bool = True):
super().__init__(lazy_create_resources=lazy_create_resources)
self.project_id = project_id
self.is_binary_encoded = output_encoding.lower() == "binary"
self.use_fast_avro = True
self.output_prefix = output_prefix
self.create_schema_enforcing_topics = create_schema_enforcing_topics
self.project_path = f"projects/{project_id}"
self.publisher = pubsub_v1.PublisherClient()
self.topics = list(self.publisher.list_topics(request={"project": self.project_path}))
for topic in self.topics:
topic_id = os.path.basename(topic.name)
self.existing_schemas.update({topic_id: True})
self.schema_client = SchemaServiceClient()
self.schemas = list(self.schema_client.list_schemas(request={"parent": self.project_path}))
self.avro_schemas = {}
if self.lazy_create_resources is True:
for schema in self.schemas:
schema_id = os.path.basename(schema.name)
self.avro_schemas[schema_id] = json.loads(self._get_schema_avro(schema.name).definition)
self.publish_futures = []
self.publish_futures_data = {}