# convert_csv_to_ddb()
# From: broadcast-monitoring/src/sharedlib/common/utils.py


def convert_csv_to_ddb(csv_file_path, table_name, delimiter=',', ddb_client=None):
    """Load a CSV file into a DynamoDB table, one item per row.

    Each entry in the CSV header row must be in the format "<ColumnName> (Type)",
    e.g. "Stream_ID (S)" or "Duration_Sec (N)".  The type token selects how each
    cell is coerced: 'N' -> float (converted to Decimal via
    ``convert_dict_float_to_dec`` before the put), 'BOOL' -> bool, anything
    else -> raw string.  Empty cells are omitted from the item.

    :param csv_file_path: path to the CSV file to import
    :param table_name: name of the target DynamoDB table
    :param delimiter: CSV field delimiter (default ',')
    :param ddb_client: optional boto3 DynamoDB resource; falls back to the
        module-level ``dynamodb`` resource when None
    :raises ValueError: if a header entry does not match "<ColumnName> (Type)"
    """
    resource = ddb_client if ddb_client is not None else dynamodb
    table = resource.Table(table_name)

    # Header entries look like "Stream_ID (S)": capture name and type token.
    pattern = re.compile(r'([a-zA-Z0-9_\s]+) \(([A-Z]+)\)')

    # Cell values (lowercased) that mean boolean False.  A bare bool() cast
    # would treat any non-empty string — including "False" and "0" — as True.
    falsy_strings = {'false', '0', 'no'}

    def _parse_column_schema(col_header):
        # Fail with a clear message on a malformed header instead of letting
        # a None match object raise AttributeError.
        m = pattern.search(col_header)
        if m is None:
            raise ValueError(
                f'CSV header {col_header!r} does not match "<ColumnName> (Type)"')
        return m.group(1), m.group(2)

    def _get_csv_data(file, csv_delimiter):
        dr = csv.DictReader(file, delimiter=csv_delimiter)
        # Parse the column schema once per file, not once per row per column.
        # fieldnames is None for an empty file; yield nothing in that case.
        fieldnames = dr.fieldnames or []
        schema = [(fn,) + _parse_column_schema(fn) for fn in fieldnames]
        for d in dr:
            row_data = {}
            for field_name, ddb_column_name, ddb_column_type in schema:
                value = d[field_name]
                if not value:
                    continue  # skip empty cells entirely
                if ddb_column_type == 'N':
                    row_data[ddb_column_name] = float(value)
                elif ddb_column_type == 'BOOL':
                    row_data[ddb_column_name] = value.strip().lower() not in falsy_strings
                else:
                    row_data[ddb_column_name] = value
            yield row_data

    with open(csv_file_path) as csv_file:
        for data in _get_csv_data(csv_file, delimiter):
            item = convert_dict_float_to_dec(data)
            # Lazy %-args: the message is only formatted if INFO is enabled.
            logger.info('going to put %s to DDB table: %s', item, table.name)
            table.put_item(Item=item)