in src/smspark/spark_event_logs_publisher.py [0:0]
def run(self) -> None:
    """Publish Spark event logs, either directly to S3 or via periodic local copy.

    Behavior, in priority order:

    1. If ``self.spark_event_logs_s3_uri`` is set, configure Spark to write
       event logs straight to S3 through the s3a client and return.
    2. If ``self.local_spark_event_logs_dir`` is falsy, event logging is not
       enabled; return.
    3. Otherwise, enable local event logging and repeatedly copy the event-log
       files from ``EVENT_LOG_DIR`` into the local destination directory every
       ``self._copy_interval`` seconds until ``self._stop_publishing`` becomes
       true, then perform one final copy so the tail of the log is captured.
    """
    if self.spark_event_logs_s3_uri is not None:
        log.info("spark_event_logs_s3_uri is specified, publishing to s3 directly")
        self._config_event_log_with_s3_uri()
        return

    if not self.local_spark_event_logs_dir:
        log.info("Spark event log not enabled.")
        return

    log.info("Start to copy the spark event logs file.")
    self._config_event_log()

    dst_dir = self.local_spark_event_logs_dir
    # exist_ok=True avoids the check-then-create race of the previous
    # `if not os.path.exists(...)` guard: if another process creates the
    # directory between check and makedirs, the old code raised FileExistsError.
    os.makedirs(dst_dir, exist_ok=True)

    while not self._stop_publishing:
        self._copy_spark_event_logs(EVENT_LOG_DIR, dst_dir)
        time.sleep(self._copy_interval)
    # After stop is requested, still perform one last copy; otherwise any
    # events written during the final sleep interval would be lost.
    self._copy_spark_event_logs(EVENT_LOG_DIR, dst_dir)