# in sagemaker/source/preprocessing/preprocessing.py [0:0]
def pivot_data(config):
    """
    Combine fleet info and fleet sensor logs into a windowed dataset.

    Each example in the dataset is a window of ``config.window_length`` sensor readings of a
    single vehicle, flattened into columns named ``<sensor column>_<i>``, together with meta
    data columns (vehicle id, period, window start timestamp, the vehicle's fleet info row)
    and a target column taken from the first reading of the window.

    The sensor log is read in chunks of ``config.chunksize`` rows and every chunk is sorted
    and windowed on its own, so a window whose readings are split across two chunks is not
    emitted.

    This method writes data to an output location specified by the configuration
    (``config.fleet_dataset_fn``) and returns nothing.

    Args:
        config: Configuration object. Attributes read here: ``fleet_info_fn``,
            ``fleet_sensor_logs_fn``, ``fleet_dataset_fn``, ``chunksize``,
            ``processing_chunksize``, ``timestamp_column``, ``vehicle_id_column``,
            ``target_column``, ``period_column``, ``period_ms`` and ``window_length``.
    """
    fleet_info = pd.read_csv(config.fleet_info_fn)
    # Support potentially large sensor logs by streaming them chunk by chunk.
    fleet_sensor_logs = pd.read_csv(config.fleet_sensor_logs_fn, chunksize=config.chunksize)

    dataset_writer = DataFrameWriter(filename=config.fleet_dataset_fn, chunksize=config.processing_chunksize)

    # Time span covered by one window; identical for every row, so compute it once.
    window_span = pd.Timedelta(config.period_ms, unit='ms') * config.window_length

    for chunk_idx, sensor_df in enumerate(fleet_sensor_logs):
        print("Processing Sensor Data Chunk {}".format(chunk_idx + 1))

        # Convert timestamp column to have correct datatype.
        sensor_df[config.timestamp_column] = pd.to_datetime(sensor_df[config.timestamp_column])
        sensor_df.sort_values([config.vehicle_id_column, config.timestamp_column], inplace=True)

        # Everything that is neither an id, a target nor a timestamp is treated as a sensor reading.
        sensor_columns = sensor_df.columns.drop(labels=[config.vehicle_id_column,
                                                        config.target_column,
                                                        config.timestamp_column])

        # The chunk is not modified below, so these column views stay valid for the whole loop.
        vehicle_ids = sensor_df[config.vehicle_id_column]
        timestamps = sensor_df[config.timestamp_column]

        for _, row in sensor_df.iterrows():
            time_start = row[config.timestamp_column]
            time_end = time_start + window_span
            vehicle_id = row[config.vehicle_id_column]

            # Readings of the same vehicle inside the half-open interval [time_start, time_end).
            # Notes: This can be done more efficiently (the whole chunk is scanned for every row).
            interval_filter = (vehicle_ids == vehicle_id) & \
                              (timestamps < time_end) & \
                              (timestamps >= time_start)
            sample = sensor_df[interval_filter]

            # Only complete windows become examples; incomplete ones are skipped.
            if len(sample) != config.window_length:
                continue

            target = sample[config.target_column].iloc[0]

            # OrderedDict keeps the column order identical for every example.
            inst = OrderedDict()
            inst[config.vehicle_id_column] = vehicle_id
            inst[config.period_column] = config.period_ms
            inst[config.target_column] = target
            inst[config.timestamp_column] = time_start

            # NOTE(review): .iloc is positional, i.e. the vehicle id is used as the row number
            # in the fleet info file - this presumably relies on ids being 0-based integers in
            # file order; confirm against the data generator.
            # Series.iteritems() was removed in pandas 2.0; items() is the supported spelling.
            for k, v in fleet_info.iloc[vehicle_id].items():
                inst[k] = v

            # Flatten the window: one column per (sensor, position in window).
            for col in sensor_columns.values:
                for i in range(config.window_length):
                    inst[col + '_' + str(i)] = sample[col].iloc[i]

            dataset_writer.append(inst)

    # Write out whatever is still buffered after the last chunk.
    dataset_writer.flush_buffer()
    print('Wrote fleet dataset (unsampled) to file.')