tools/unsupervised_dataset/sql_crawler/crawler_log.py (77 lines of code) (raw):
import csv
import datetime
import logging
import os
import pathlib
from sql_crawler import cloud_integration
class CrawlerLog(object):
""" Logs the status of the SQL crawler, including websites and queries.
The CrawlerLog keeps track of which websites were explored, how many
queries were found, and creates a CSV with all the queries. It also
    logs any errors encountered. The log is saved into the logs subdirectory
    with a name based on the start time. Queries are saved into the queries
    subdirectory.
"""
def __init__(self, stream):
""" Initializes crawler log to keep track of crawler progress and
instantiates instance variables.
"""
self.start_time = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M")
folder_path = str(pathlib.Path(__file__).parent)
log_folder_path = folder_path + "/logs"
query_folder_path = folder_path + "/queries"
        # Create directory for logs if it does not already exist
if not os.path.exists(log_folder_path):
os.mkdir(log_folder_path)
        log_name = "{0}/log-{1}.log".format(log_folder_path, self.start_time)
        logging.basicConfig(filename=log_name, filemode="a", level=logging.INFO)
logging.info("Beginning crawl at time %s.", self.start_time)
if not os.path.exists(query_folder_path):
os.mkdir(query_folder_path)
self.stream = stream
self.query_name = "{0}/queries_{1}.csv".format(query_folder_path, self.start_time)
if not self.stream:
            # newline="" avoids blank rows when writing CSV on Windows
            self.csv_file = open(self.query_name, "a", newline="")
self.queries = csv.writer(self.csv_file)
self.queries.writerow(["Query", "URL"])
self.save_to_gcs = False
self.save_to_bq = False
self.batch_data = []
self.error_log_count = 0
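    # Note (illustrative): with stream=False, a single run leaves behind two
    # artifacts next to this file: logs/log-<start_time>.log for the logfile
    # and queries/queries_<start_time>.csv for the collected queries. With
    # stream=True, no local CSV is created.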
def log_queries(self, queries, url):
""" Caches queries to be logged into CSV file or BigQuery. Periodically
flushes cache and writes queries once reaching maximum size.
Args:
queries: Queries to be logged
url: URL for page containing queries
"""
self.batch_data += [[query, url] for query in queries]
        while len(self.batch_data) > 1000:
self.flush_data(self.batch_data[:1000])
self.batch_data = self.batch_data[1000:]
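    # Illustrative example of the batching behavior above (not part of the
    # class): starting from an empty cache,
    #
    #   log.log_queries(["SELECT 1"] * 2500, "https://example.com")
    #
    # flushes two chunks of 1,000 rows and leaves 500 rows in batch_data until
    # the next call or close().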
def flush_data(self, data):
""" Flushes data directly to CSV file or BigQuery.
Args:
data: Rows to be flushed to CSV file or BigQuery table
"""
if self.save_to_bq:
err = cloud_integration.insert_rows(self.bq_project, self.bq_dataset, self.bq_table, data)
if err:
self.log_error(err)
if not self.stream:
self.queries.writerows(data)
def log_page(self, url, count):
""" Logs results of crawling one page using provided arguments.
Args:
url: URL of page being crawled
count: Number of queries found on the page
"""
logging.info("Crawled %s. Found %s queries.", url, str(count))
    def log_error(self, error_message):
        """ Logs a crawler error to the logfile.
        Args:
            error_message: Error message to be logged.
        """
        self.error_log_count += 1
        logging.error("ERROR: %s", error_message)
def parse_location_arg(self, location):
""" Validates and splits location argument for cloud upload
into two parts. Should be formatted as project_id.dataset.
Args:
location: String with name of project ID and dataset.
        Returns:
            A pair of strings (project ID and dataset or bucket name), or
            (None, None) if the argument is malformed.
        """
if location.count(".") != 1:
self.log_error("Argument not formatted correctly: {0}".format(location))
return None, None
return location.split(".")
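    # Illustrative example: parse_location_arg("my-project.my_dataset") returns
    # ["my-project", "my_dataset"], while malformed values such as "my-project"
    # or "a.b.c" log an error and return (None, None).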
def set_gcs(self, location):
""" Sets variables for uploading data to Google Cloud Storage.
Args:
location: String with name of project ID and bucket name,
separated by a period.
"""
self.gcs_project, self.gcs_bucket = self.parse_location_arg(location)
if self.gcs_project and self.gcs_bucket:
self.save_to_gcs = True
def set_bq(self, location):
""" Sets variables for uploading data to Google BigQuery.
Args:
location: String with name of project ID and dataset name,
separated by a period.
"""
self.bq_project, self.bq_dataset = self.parse_location_arg(location)
self.bq_table = "queries_{0}".format(self.start_time)
if self.bq_project and self.bq_dataset:
self.save_to_bq = cloud_integration.create_bigquery_table(self.bq_project, self.bq_dataset, self.bq_table)
if not self.save_to_bq:
self.log_error("Unable to create bigquery table.")
def close(self):
""" Flushes remaining querise and closes the crawler log. Uploads file
to Google Cloud. Prints message if there are handled errors logged
during crawling process.
"""
logging.info("Finished crawling.")
# Flush remaining queries and close file
self.flush_data(self.batch_data)
if not self.stream:
self.csv_file.close()
# Save file to GCS, if applicable
file_name = "queries_{0}".format(self.start_time)
if self.save_to_gcs:
status, message = cloud_integration.upload_gcs_file(self.gcs_project,
self.gcs_bucket, file_name, self.query_name)
if status:
logging.info(message)
else:
self.log_error(message)
if self.error_log_count > 0:
print("Logged {0} errors. See log for details.".format(self.error_log_count))