in scripts/extract-bus-data.py
import csv
import io
import os
import time

import boto3
import numpy as np
import pandas as pd

# Helper functions s3_bucket_keys, ts_data, impute_missing, and is_categorical
# are assumed to be defined elsewhere in this script.


def extract_bus_data(config):
    bucket = config["s3_bucket"]
    input_prefix = config["s3_input_prefix"]
    input_suffix = config["s3_input_suffix"]
    tmp_dir = config["tmp_dir"]
    vehicle_id = config["vehicle_id"]
    output_prefix = config["s3_output_prefix"]

    # Create a unique local working directory for the extracted CSV files
    dir_name = f"bus_{time.time()}"
    dir_path = os.path.join(tmp_dir, dir_name)
    os.makedirs(dir_path, mode=0o777, exist_ok=True)

    s3_client = boto3.client(service_name='s3')
    for key in s3_bucket_keys(s3_client, bucket=bucket, prefix=input_prefix, suffix=input_suffix):
        bus_signals = s3_client.get_object(Bucket=bucket, Key=key)
        print(f"Reading: {key}")
        pd_df = pd.read_json(io.BytesIO(bus_signals['Body'].read()))

        # The scene id is the third-from-last component of the S3 key path
        scene_id = key.split('/')[-3].replace('_', '')
        print(f"Processing bus, vehicle_id: {vehicle_id}, scene_id: {scene_id}")

        # Pivot the per-signal (timestamp, value) pairs into one row per timestamp,
        # initializing each new row with NaN for every column
        data = dict()
        index_name = dict()
        for col_index, col in enumerate(pd_df.columns):
            index_name[col_index] = col
            values = pd_df[col]['values']
            for value in values:
                ts = value[0]
                row = ts_data(data, ts)
                if row is None:
                    row = [np.nan] * len(pd_df.columns)
                    data[ts] = row
                row[col_index] = value[1]

        ts_keys = sorted(data.keys())
        nrows = len(ts_keys)
        print(f"Bus data rows: {nrows}")

        # Stack the per-timestamp rows into a single 2-D float32 array
        row_vectors = []
        for ts_key in ts_keys:
            row = data[ts_key]
            row_vectors.append(np.array(row, dtype=np.float32).reshape(1, len(row)))
        bus_data = np.concatenate(row_vectors, axis=0)

        print("Imputing missing data")
        impute_missing(bus_data, pd_df.columns)

        # Write the imputed data for this scene to a local CSV file
        file_path = os.path.join(dir_path, f"bus-{scene_id}.csv")
        print(f"Writing bus data to {file_path}")
        with open(file_path, 'w+', newline='') as csv_file:
            csv_writer = csv.writer(csv_file)
            header = ['vehicle_id', 'scene_id', 'data_ts'] + list(pd_df.columns)
            csv_writer.writerow(header)
            for index in range(nrows):
                col_values = bus_data[index, :].tolist()
                for coli, col in enumerate(pd_df.columns):
                    if is_categorical(col):
                        col_values[coli] = int(col_values[coli])
                    else:
                        col_values[coli] = round(col_values[coli], 6)
                data_row = [vehicle_id, scene_id, ts_keys[index]] + col_values
                csv_writer.writerow(data_row)

        # Upload the CSV to S3, then remove the local copy
        key = f"{output_prefix}/{os.path.basename(file_path)}"
        print(f"Uploading {file_path} to {key}")
        s3_client.upload_file(file_path, bucket, key)
        print(f"Uploaded {file_path} to {key}")
        print(f"Removing {file_path}")
        os.remove(file_path)
        print(f"Removed {file_path}")