in mwaairflow/assets/plugins/operators/salesforce_to_s3_operator.py [0:0]
def execute(self, context):
    """
    Execute the operator.

    Pulls records for the configured Salesforce object — either via a
    custom query (``self.query``) or a fetch optionally filtered by a
    ``SystemModStamp`` date window (``self.from_date`` / ``self.to_date``)
    — writes them to a temporary file, and uploads that file to S3 at
    ``s3://{self.s3_bucket}/{self.s3_key}``.

    :param context: Airflow task context (unused directly here).
    :raises Exception: re-raises any error from the Salesforce sign-in so
        the task fails loudly instead of continuing unauthenticated.
    """
    logging.info("Prepping to gather data from Salesforce")
    # Named temporary file holds the output until the S3 upload; the
    # context manager closes and deletes it on exit.
    with NamedTemporaryFile("w") as tmp:
        hook = SalesforceHook(conn_id=self.sf_conn_id)
        # If sign-in fails, log and re-raise: swallowing the error (the old
        # behavior) left an unauthenticated hook and confusing later failures.
        try:
            hook.sign_in()
        except Exception:
            logging.error("Unable to login to Salesforce.")
            raise
        # If fields were not defined, all available fields are pulled.
        if not self.fields:
            self.fields = hook.get_available_fields(self.object)
        logging.info(
            "Making request for %s fields from %s", len(self.fields), self.object
        )
        if self.query:
            query = self.special_query(
                self.query, hook, relationship_object=self.relationship_object
            )
        else:
            # Build an optional SystemModStamp date-window filter instead of
            # duplicating the fetch across from/to/both branches.
            conditions = []
            if self.from_date:
                logging.info(f"Gathering items from date: {self.from_date}")
                conditions.append(f"SystemModStamp >= {self.from_date}")
            if self.to_date:
                logging.info(f"Gathering items to date: {self.to_date}")
                conditions.append(f"SystemModStamp <= {self.to_date}")
            target = self.object
            if conditions:
                target = f"{self.object} WHERE {' AND '.join(conditions)}"
            query = hook.get_object_from_salesforce(target, self.fields)
        # Output the records from the query to the temp file; the list of
        # records is stored under the "records" key.
        logging.info("Writing query results to: %s", tmp.name)
        if not query["records"]:
            logging.info(f"No records found in the query: {query}")
        else:
            hook.write_object_to_file(
                query["records"],
                filename=tmp.name,
                fmt=self.fmt,
                coerce_to_timestamp=self.coerce_to_timestamp,
                record_time_added=self.record_time_added,
            )
        # Flush buffered writes so the upload sees the complete file, then
        # upload. The (possibly empty) file is uploaded even when no records
        # were found, matching the original behavior.
        tmp.flush()
        dest_s3 = S3Hook(self.s3_conn_id)
        dest_s3.load_file(
            filename=tmp.name,
            key=self.s3_key,
            bucket_name=self.s3_bucket,
            replace=True,
        )
        logging.info("Query finished!")