jobs/influxdb-to-bigquery/influxdb_to_bigquery/main.py (100 lines of code) (raw):

import logging
from datetime import datetime, timedelta

import click
import pandas as pd
from google.cloud import bigquery
from influxdb import InfluxDBClient


def collect_influxdb_data(
    influxdb_host,
    influxdb_port,
    influxdb_username,
    influxdb_password,
    influxdb_measurement,
    date,
    bq_project_id,
    bq_dataset_id,
    bq_table_id,
):
    """Export one UTC day of an InfluxDB measurement to BigQuery.

    Queries all ``environment = 'prod'`` points of ``influxdb_measurement``
    whose timestamp falls on ``date``, converts them to a DataFrame and hands
    the frame to :func:`load_bigquery_table`. Nothing is loaded when the query
    returns no points.

    Args:
        influxdb_host: InfluxDB host to connect to (over verified TLS).
        influxdb_port: InfluxDB port.
        influxdb_username: InfluxDB user name.
        influxdb_password: InfluxDB password.
        influxdb_measurement: Measurement to read. It is interpolated verbatim
            into the InfluxQL statement, so it must be a trusted value.
        date: Day to export, as a ``datetime.date`` or a ``YYYY-MM-DD`` string.
        bq_project_id: Destination BigQuery project.
        bq_dataset_id: Destination BigQuery dataset.
        bq_table_id: Destination BigQuery table.

    Raises:
        ValueError: If ``date`` does not render as ``YYYY-MM-DD``.
    """
    # Normalise to a date object so the end of the window can be computed.
    day = datetime.strptime(str(date), "%Y-%m-%d").date()
    next_day = day + timedelta(days=1)

    # Half-open window [day 00:00:00Z, next day 00:00:00Z): an upper bound of
    # 23:59:59Z would silently drop every point in the last second of the day.
    query = (
        "SELECT * FROM {influxdb_measurement} "
        "WHERE time >= '{start}T00:00:00Z' AND time < '{end}T00:00:00Z' "
        "AND \"environment\"='prod'"
    ).format(
        influxdb_measurement=influxdb_measurement,
        start=day.isoformat(),
        end=next_day.isoformat(),
    )

    client = InfluxDBClient(
        host=influxdb_host,
        port=influxdb_port,
        username=influxdb_username,
        password=influxdb_password,
        ssl=True,
        verify_ssl=True,
    )
    try:
        results = client.query(query)
        points = list(results.get_points())
    finally:
        # Release the underlying HTTP session even when the query fails.
        client.close()

    df = pd.DataFrame(points)
    if df.empty:
        logging.info("%s is empty", influxdb_measurement)
        return

    # BigQuery column names cannot contain dots. regex=False makes "." a
    # literal dot regardless of the installed pandas version's default.
    df.columns = df.columns.str.replace(".", "_", regex=False)
    df["time"] = pd.to_datetime(df["time"])
    # Partitioning column used by load_bigquery_table.
    df["submission_date"] = df["time"].dt.date

    load_bigquery_table(df, bq_project_id, bq_dataset_id, bq_table_id, date)


def load_bigquery_table(df, bq_project_id, bq_dataset_id, bq_table_id, date):
    """Overwrite the ``date`` partition of a BigQuery table with ``df``.

    Args:
        df: DataFrame to load; must contain a ``submission_date`` column,
            which is used as the time-partitioning field.
        bq_project_id: Destination BigQuery project.
        bq_dataset_id: Destination BigQuery dataset.
        bq_table_id: Destination BigQuery table.
        date: Day being loaded, as a ``datetime.date`` or ``YYYY-MM-DD``
            string; selects the ``$YYYYMMDD`` partition that is replaced.
    """
    bq_client = bigquery.Client(project=bq_project_id)

    job_config = bigquery.LoadJobConfig(
        # The API expects a list of options, not a single enum value.
        schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION],
        # Combined with the partition decorator below, WRITE_TRUNCATE replaces
        # only the targeted partition, which keeps re-runs idempotent.
        write_disposition=bigquery.job.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=bigquery.table.TimePartitioning(field="submission_date"),
    )

    # Build the fully-qualified ID explicitly rather than relying on the
    # deprecated Client.dataset() helper and on str(TableReference).
    partition_suffix = str(date).replace("-", "")
    partition = f"{bq_project_id}.{bq_dataset_id}.{bq_table_id}${partition_suffix}"

    job = bq_client.load_table_from_dataframe(df, partition, job_config=job_config)
    # Block until the load finishes so failures surface as exceptions.
    job.result()


@click.command()
@click.option("--bq_project_id", help="GCP BigQuery project id", required=True)
@click.option("--bq_dataset_id", help="GCP BigQuery dataset id", required=True)
@click.option(
    "--bq_table_id",
    help="GCP BigQuery table id",
    required=True,
)
@click.option("--influxdb_measurement", help="Influx measurement to fetch", required=True)
@click.option(
    "--influxdb_username",
    help="Influxdb username",
    required=True,
)
@click.option(
    "--influxdb_password",
    help="Influxdb password",
    required=True,
)
@click.option(
    "--influxdb_host",
    help="Influxdb host URL",
    required=True,
)
@click.option("--influxdb_port", default=8086, help="Influxdb port")
@click.option(
    "--date",
    type=lambda x: datetime.strptime(x, "%Y-%m-%d").date(),
    help="Day to export, formatted YYYY-MM-DD",
    required=True,
)
def main(
    bq_project_id,
    bq_dataset_id,
    bq_table_id,
    influxdb_measurement,
    influxdb_username,
    influxdb_password,
    influxdb_host,
    influxdb_port,
    date,
):
    """Command-line entry point: export one day from InfluxDB to BigQuery."""
    collect_influxdb_data(
        influxdb_host,
        influxdb_port,
        influxdb_username,
        influxdb_password,
        influxdb_measurement,
        date,
        bq_project_id,
        bq_dataset_id,
        bq_table_id,
    )


if __name__ == "__main__":
    main()