in blogs/finspace_redshift-2021-09/finspace.py [0:0]
def ingest_from_s3(self,
                   s3_location: str,
                   dataset_id: str,
                   change_type: str,
                   wait_for_completion: bool = True,
                   format_type: str = "CSV",
                   format_params: dict = None):
    """
    Creates a changeset and ingests the data given in the S3 location into the changeset

    :param s3_location: the source location of the data for the changeset, will be copied into the changeset
    :type s3_location: str
    :param dataset_id: the identifier of the containing dataset for the changeset to be created for this data
    :type dataset_id: str
    :param change_type: What is the kind of changetype? "APPEND", "REPLACE" are the choices
    :type change_type: str
    :param wait_for_completion: Boolean, should the function wait for the operation to complete?
        When true, ``self.wait_for_ingestion`` is called before returning.
    :type wait_for_completion: bool
    :param format_type: format type, CSV, PARQUET, XML, JSON (upper-cased before it is sent)
    :type format_type: str
    :param format_params: dictionary of format parameters; when omitted (``None``),
        ``{'separator': ',', 'withHeader': 'true'}`` is used
    :type format_params: dict
    :return: the id of the changeset created
    """
    # Build the default per call: a dict literal in the signature would be a
    # single object shared by every call that relies on the default.
    if format_params is None:
        format_params = {'separator': ',', 'withHeader': 'true'}

    create_changeset_response = self.client.create_changeset(
        datasetId=dataset_id,
        changeType=change_type,
        sourceType='S3',
        sourceParams={'s3SourcePath': s3_location},
        formatType=format_type.upper(),
        formatParams=format_params
    )

    changeset_id = create_changeset_response['changeset']['id']

    if wait_for_completion:
        self.wait_for_ingestion(dataset_id, changeset_id)

    return changeset_id