in broadcast-monitoring/src/sharedlib/common/utils.py [0:0]
def convert_csv_to_ddb(csv_file_path, table_name, delimiter=',', ddb_client=None):
    """Load rows from a CSV file into the named DynamoDB table.

    Relies on module-level names defined elsewhere in utils.py: `dynamodb`
    (a boto3 DynamoDB service resource), `logger`, and
    `convert_dict_float_to_dec`, plus the stdlib `csv` and `re` modules.
    """
    if ddb_client is not None:
        table = ddb_client.Table(table_name)
    else:
        table = dynamodb.Table(table_name)

    # Each entry in the header row should be in the format "<ColumnName> (Type)",
    # e.g. "Stream_ID (S)" or "Duration_Sec (N)".
    pattern = re.compile(r'([a-zA-Z0-9_\s]+) \(([A-Z]+)\)')

    def _parse_column_schema(col_header, pattern):
        # Split a header cell into its column name and DynamoDB type code.
        m = re.search(pattern, col_header)
        return m.group(1), m.group(2)

    def _get_csv_data(file, csv_delimiter):
        dr = csv.DictReader(file, delimiter=csv_delimiter)
        for d in dr:
            row_data = {}
            for field_name in dr.fieldnames:
                ddb_column_name, ddb_column_type = _parse_column_schema(field_name, pattern)
                if d[field_name]:  # skip empty cells rather than writing empty attributes
                    if ddb_column_type == 'N':
                        row_data[ddb_column_name] = float(d[field_name])
                    elif ddb_column_type == 'BOOL':
                        # bool() on any non-empty string is True, so parse the text explicitly
                        row_data[ddb_column_name] = d[field_name].strip().lower() in ('true', '1', 'yes')
                    else:
                        row_data[ddb_column_name] = d[field_name]
            yield row_data

    with open(csv_file_path, newline='') as csv_file:
        for data in _get_csv_data(csv_file, delimiter):
            # DynamoDB does not accept Python floats; convert them to Decimal first.
            item = convert_dict_float_to_dec(data)
            logger.info(f'going to put {item} to DDB table: {table.name}')
            table.put_item(Item=item)
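
# A minimal usage sketch (not part of utils.py): the CSV content, file name, table
# name, and region below are hypothetical. Note that despite the parameter name,
# `ddb_client` must be a boto3 DynamoDB *resource*, since `.Table()` is a resource
# method.
#
#   Example CSV with typed headers:
#     Stream_ID (S),Duration_Sec (N),Is_Live (BOOL)
#     stream-1,30,true
#
#   import boto3
#
#   ddb = boto3.resource('dynamodb', region_name='us-east-1')
#   convert_csv_to_ddb('metadata.csv', 'expected-programs', ddb_client=ddb)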