Method `execute()` — excerpt from
mwaairflow/assets/plugins/operators/salesforce_to_s3_operator.py [0:0]


    def execute(self, context):
        """
        Execute the operator.

        Pulls all records for the configured Salesforce object — optionally
        filtered by ``SystemModStamp`` date bounds or a custom query — writes
        them to a temporary file, and uploads that file to S3.

        :param context: Airflow task-instance context (unused directly here).
        :raises Exception: re-raises any Salesforce sign-in failure so the
            task fails fast instead of continuing with an unusable hook.
        """
        logging.info("Prepping to gather data from Salesforce")

        # The named temporary file holds the query output until the S3 upload;
        # the context manager guarantees cleanup even if an upload step fails.
        with NamedTemporaryFile("w") as tmp:

            # Load the SalesforceHook
            hook = SalesforceHook(conn_id=self.sf_conn_id)

            # Attempt to log in to Salesforce. Fail the task immediately on
            # error: the previous bare `except` swallowed the failure, which
            # meant later hook calls crashed with a misleading message.
            try:
                hook.sign_in()
            except Exception:
                logging.exception("Unable to login.")
                raise

            # If fields were not defined, all available fields are pulled.
            if not self.fields:
                self.fields = hook.get_available_fields(self.object)

            # Lazy %-style args avoid building the message unless emitted.
            logging.info(
                "Making request for %s fields from %s", len(self.fields), self.object
            )

            if self.query:
                query = self.special_query(
                    self.query, hook, relationship_object=self.relationship_object
                )
            else:
                # Build an optional SystemModStamp date filter from the
                # configured bounds; both, either, or neither may be set.
                conditions = []
                if self.from_date:
                    conditions.append(f"SystemModStamp >= {self.from_date}")
                if self.to_date:
                    conditions.append(f"SystemModStamp <= {self.to_date}")

                if conditions:
                    logging.info(
                        "Gathering items from date: %s to date: %s",
                        self.from_date,
                        self.to_date,
                    )
                    date_select = f"{self.object} WHERE {' AND '.join(conditions)}"
                    query = hook.get_object_from_salesforce(date_select, self.fields)
                else:
                    query = hook.get_object_from_salesforce(self.object, self.fields)

            # Output the records from the query to a file; the list of
            # records is stored under the "records" key.
            logging.info("Writing query results to: %s", tmp.name)

            if not query["records"]:
                logging.info("No records found in the query: %s", query)
            else:
                hook.write_object_to_file(
                    query["records"],
                    filename=tmp.name,
                    fmt=self.fmt,
                    coerce_to_timestamp=self.coerce_to_timestamp,
                    record_time_added=self.record_time_added,
                )

                # Flush buffered output so S3Hook reads the complete file.
                tmp.flush()

                dest_s3 = S3Hook(self.s3_conn_id)

                dest_s3.load_file(
                    filename=tmp.name,
                    key=self.s3_key,
                    bucket_name=self.s3_bucket,
                    replace=True,
                )
                # No explicit tmp.close() needed: the `with` block closes
                # (and deletes) the temporary file on exit.

        logging.info("Query finished!")