in src/modules/timestream_telemetry/lib/TimestreamTelemetryUtils.py [0:0]
def import_csv(self, filepath, rebase_time_ms=None):
    # Assumes module-level imports of csv, sys, and datetime in the surrounding file.
    with open(filepath, 'r') as csv_file:
        # Create a CSV reader over the open file.
        csv_reader = csv.reader(csv_file)
        records = []
        counter = 0
        first_record_time = 0
        earliest_record_time = sys.maxsize
        latest_record_time = 0
        # Extract each data row one by one. Expected columns:
        # row[0]         row[1]              row[2]            row[3]      row[4]  row[5]
        # Time,          TelemetryAssetType, TelemetryAssetId, PropertyId, Value,  Type
        # 1633415395173, Alarm,              Mixer_7_...,      Status,     Normal, VARCHAR
        # 1633415395173, Mixer,              Mixer_7_...,      RPM,        100,    DOUBLE
        for row in csv_reader:
            dimensions = [
                {'Name': 'TelemetryAssetType', 'Value': row[1]},
                {'Name': 'TelemetryAssetId', 'Value': row[2]},
            ]
            if first_record_time == 0:
                first_record_time = int(row[0])
            if rebase_time_ms is not None:
                # Shift the CSV's relative timeline so the first record lands on rebase_time_ms.
                record_time = rebase_time_ms + int(row[0]) - first_record_time
            else:
                record_time = int(row[0])
            earliest_record_time = min(record_time, earliest_record_time)
            latest_record_time = max(record_time, latest_record_time)
            record = {
                'Dimensions': dimensions,
                'MeasureName': row[3],
                'MeasureValue': row[4],
                'MeasureValueType': row[5],
                'Time': str(record_time)
            }
            records.append(record)
            counter += 1
            # Timestream's WriteRecords API accepts at most 100 records per call,
            # so flush in batches of 100.
            if len(records) == 100:
                self._submit_batch(records, counter)
                records = []
        # Flush any remaining records.
        if len(records) != 0:
            self._submit_batch(records, counter)
        print(f" Ingested {counter} records from "
              f"{datetime.datetime.fromtimestamp(earliest_record_time/1000.0, datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S %Z')} - "
              f"{datetime.datetime.fromtimestamp(latest_record_time/1000.0, datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S %Z')}")