def ingest_from_s3()

in webinars/snowflake_2021-09/finspace.py [0:0]


    def ingest_from_s3(self,
                       s3_location: str,
                       dataset_id: str,
                       change_type: str,
                       wait_for_completion: bool = True,
                       format_type: str = "CSV",
                       format_params: dict = {'separator': ',', 'withHeader': 'true'}):
        """
        Creates a changeset and ingests the data given in the S3 location into the changeset

        :param s3_location: the source location of the data for the changeset, will be copied into the changeset
        :stype: str

        :param dataset_id: the identifier of the containing dataset for the changeset to be created for this data
        :type: str

        :param change_type: What is the kind of changetype?  "APPEND", "REPLACE" are the choices
        :type: str

        :param wait_for_completion: Boolean, should the function wait for the operation to complete?
        :type: str

        :param format_type: format type, CSV, PARQUET, XML, JSON
        :type: str

        :param format_params: dictionary of format parameters
        :type: dict

        :return: the id of the changeset created
        """
        create_changeset_response = self.client.create_changeset(
            datasetId=dataset_id,
            changeType=change_type,
            sourceType='S3',
            sourceParams={'s3SourcePath': s3_location},
            formatType=format_type.upper(),
            formatParams=format_params
        )

        changeset_id = create_changeset_response['changeset']['id']

        if wait_for_completion:
            self.wait_for_ingestion(dataset_id, changeset_id)
        return changeset_id