wikipedia/_tools/parse_clicks.py (67 lines of code) (raw):
import argparse
import csv
import gzip
import logging
import os
import pickle
import sys
from collections import Counter
import requests
# Set up the logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Define a handler to output log messages to the console
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# Define a formatter for the log messages
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
console_handler.setFormatter(formatter)
# Add the console handler to the logger
logger.addHandler(console_handler)
class ClickStreamDist:
def __init__(self, year, month, lang):
self.filename = f"clickstream-{lang}wiki-{year}-{month:02d}.tsv.gz"
self.url = f"https://dumps.wikimedia.org/other/clickstream/{year:d}-{month:02d}/{self.filename}"
self.clickstream_output_file = os.path.expanduser(f"~/.rally/benchmarks/data/wikipedia/{self.filename}")
def download(self):
if os.path.exists(self.clickstream_output_file):
logger.info("File already exists. Skipping download.")
return
logger.info("Downloading the clickstream file...")
response = requests.get(self.url)
if response.status_code == 200:
# Create the file if it is missing
os.makedirs(os.path.dirname(self.clickstream_output_file), exist_ok=True)
# Write the content to the file
with open(self.clickstream_output_file, "wb") as file:
file.write(response.content)
logger.info("File downloaded successfully.")
else:
logger.info("Failed to download the file.")
def analyze(self):
logger.info("Analyzing...")
word_freq = self.calculate_word_frequency()
word_prob = self.calculate_word_probability(word_freq)
self.dump_probability_distribution(word_prob)
logger.info("Analysis completed.")
def calculate_word_frequency(self):
logger.info("Calculating word frequency...")
word_freq = Counter()
with gzip.open(self.clickstream_output_file, "rt", encoding="utf-8") as file:
# Documentation for clickstream format: https://meta.wikimedia.org/wiki/Research:Wikipedia_clickstream
for row in csv.reader(file, delimiter="\t"):
prev, curr, count = row[0], row[1].replace("_", " ").strip().replace('"', ""), int(row[3])
if prev != "other-search" and curr != "Main Page":
word_freq[curr] += count
sorted_word_freq = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)
return sorted_word_freq[:10000]
def calculate_word_probability(self, word_freq):
logger.info("Calculating word probability...")
total_words = sum(count for _, count in word_freq)
return [(word, count / total_words) for word, count in word_freq]
def dump_probability_distribution(self, prob_dist):
logger.info("Dumping probability distribution...")
writer = csv.writer(sys.stdout)
writer.writerow(["query", "probability"])
writer.writerows(prob_dist)
parser = argparse.ArgumentParser()
parser.add_argument("--year", type=int, default=2023, help="Year")
parser.add_argument("--month", type=int, default=6, help="Month")
parser.add_argument("--lang", type=str, default="en", help="Language")
args = parser.parse_args()
click_stream = ClickStreamDist(year=args.year, month=args.month, lang=args.lang)
click_stream.download()
click_stream.analyze()