# utilities/feature_store_helper.py
# Module-level imports required by this helper.
import json
import os

import boto3


def _update_flow(self, s3_file_to_ingest, bucket, flow_location):
    """Create a minimal Data Wrangler flow for ingesting a single CSV file.

    The flow contains two nodes: an S3 source node pointing at
    ``s3_file_to_ingest``, and a transform node that infers and casts
    column types. The flow definition is uploaded to
    ``s3://{bucket}/{flow_location}`` and also returned as a dict.
    """
    flow_json = {
        'metadata': {'version': 1},
        'nodes': [
            # Source node: reads the CSV file from S3, assuming a header row.
            {'node_id': '7f6515d7-7ea4-48ba-98ce-5b32c73306e6',
             'type': 'SOURCE',
             'operator': 'sagemaker.s3_source_0.1',
             'parameters': {'dataset_definition': {'__typename': 'S3CreateDatasetDefinitionOutput',
                                                   'datasetSourceType': 'S3',
                                                   'name': s3_file_to_ingest.split('/')[-1],
                                                   'description': None,
                                                   's3ExecutionContext': {'__typename': 'S3ExecutionContext',
                                                                          's3Uri': s3_file_to_ingest,
                                                                          's3ContentType': 'csv',
                                                                          's3HasHeader': True}}},
             'inputs': [],
             'outputs': [{'name': 'default'}]
             },
            # Transform node: infers and casts column data types, consuming
            # the source node's 'default' output as its input.
            {'node_id': 'e6a71ea2-dd1e-477f-964a-03238f974a35',
             'type': 'TRANSFORM',
             'operator': 'sagemaker.spark.infer_and_cast_type_0.1',
             'parameters': {},
             'trained_parameters': {},
             'inputs': [{'name': 'default',
                         'node_id': '7f6515d7-7ea4-48ba-98ce-5b32c73306e6',
                         'output_name': 'default'}],
             'outputs': [{'name': 'default'}]
             }]
    }
    # Serialize the flow to a local temp file, upload it to S3, and clean up.
    with open('tmp.flow', 'w') as f:
        json.dump(flow_json, f)
    s3_client = boto3.client('s3')
    s3_client.upload_file('tmp.flow', bucket, flow_location)
    os.remove('tmp.flow')
    return flow_json
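
# A minimal usage sketch. The instance name `helper`, the bucket name, and
# the keys below are hypothetical, illustrative values, not part of this
# module:
#
#     flow = helper._update_flow(
#         s3_file_to_ingest='s3://my-bucket/data/customers.csv',
#         bucket='my-bucket',
#         flow_location='data_wrangler_flows/customers.flow',
#     )
#     # The returned dict mirrors the flow file uploaded to
#     # s3://my-bucket/data_wrangler_flows/customers.flow; the transform
#     # node's node_id identifies the output to reference when wiring the
#     # flow into a downstream processing job.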