transcoder/message/DatacastGroup.py (48 lines of code) (raw):
#
# Copyright 2022 Google LLC
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Any
from google.cloud import bigquery
from transcoder.message.DatacastField import DatacastField
class DatacastGroup(DatacastField):
"""Implementation class encapsulating grouped fields"""
def __init__(self, name):
self.name = name
self.fields: [] = []
def append_field(self, field):
"""Append a field to this instance's fields list"""
self.fields.append(field)
def cast_value_to_type(self, value, field_type, is_nullable: bool = True) -> Any:
return str(value)
def create_json_field(self, part: DatacastField = None):
field = self
if part is not None:
field = part
group = {'type': 'array', 'title': field.name, 'properties': {}}
for child_field in self.fields:
group['properties'][child_field.name] = child_field.create_json_field(child_field)
return group
def create_avro_field(self, part: DatacastField = None):
field = self
if part is not None:
field = part
children = []
for child_field in self.fields:
children.append(child_field.create_avro_field())
return {
'name': field.name,
'type': ['null', {
'type': 'array',
'items': {
'name': field.name,
'type': 'record',
'fields': children
}
}],
'default': None
}
def create_bigquery_field(self, part: DatacastField = None):
field = self
if part is not None:
field = part
children = []
for child_field in self.fields:
children.append(child_field.create_bigquery_field())
return bigquery.SchemaField(field.name, 'RECORD', mode="REPEATED", fields=children)
def __repr__(self):
return f'DatacastGroup(name: {self.name})'