def generate_event_lines()

in backend-apis/deployment_scripts/media_event_generation.py [0:0]


def generate_event_lines(today: datetime, catalog: list[dict]) -> list[str]:
    """Build 60 days of synthetic home-page and item-view events as JSON lines.

    For each of the 60 days preceding ``today``, ``10 * len(catalog)`` event
    pairs are spread evenly over 1440 minutes, starting at ``today``'s time of
    day on that date. Each pair is a home-page view followed 30 seconds later
    by an item view from the same randomly chosen user; the viewed document is
    picked at random among the catalog entries sharing the category selected
    for that user.

    Args:
        today: Reference moment. Timestamps are rendered with ``isoformat()``
            and a ``+00:00`` suffix is rewritten to ``Z``.
        catalog: Documents, each a dict providing ``"id"`` and a hashable
            ``"categories"`` value (it is used as a dictionary key).

    Returns:
        JSON-encoded events, two per pair (home-page view, then item view),
        with the most recent day first. An empty list when ``catalog`` is
        empty.
    """
    # Without documents there is nothing to view; returning early also avoids
    # dividing by zero when computing the per-event spacing below.
    if not catalog:
        return []

    documents_len = len(catalog)

    # Group documents by category so a document can be drawn per category.
    categories_catalog: dict = {}
    for product in catalog:
        categories_catalog.setdefault(product["categories"], []).append(product)

    # random.randint is inclusive on both ends, so user ids span
    # 0 .. (documents_len - 1) * 20.
    users_len = (documents_len - 1) * 20

    # Loop invariants: number of event pairs per day and the spacing (in
    # minutes) that spreads them across 1440 minutes.
    events_per_day = 10 * documents_len
    rate = 1440 / events_per_day

    json_lines = []
    for days_back in range(1, 61):  # 60 days
        day = today - timedelta(days=days_back)
        for j in range(events_per_day):
            offset_minutes = int(j * rate)
            home_page_time = (
                (day + timedelta(minutes=offset_minutes))
                .isoformat()
                .replace("+00:00", "Z")
            )
            view_item_time = (
                (day + timedelta(minutes=offset_minutes, seconds=30))
                .isoformat()
                .replace("+00:00", "Z")
            )

            user = random.randint(0, users_len)
            # NOTE(review): for users 0-19 the index is -1, which wraps around
            # to the last catalog entry. Kept as-is so the generated
            # user-to-category mapping is unchanged; confirm this is intended.
            category = catalog[user // 20 - 1]["categories"]
            document = random.choice(categories_catalog[category])

            user_pseudo_id = f"user-{user}"
            view_homepage_event = json.dumps(
                generate_view_homepage(
                    user_pseudo_id=user_pseudo_id, event_time=home_page_time
                )
            )
            view_item_event = json.dumps(
                generate_view_item(
                    user_pseudo_id=user_pseudo_id,
                    event_time=view_item_time,
                    document_id=str(document["id"]),
                )
            )
            json_lines.append(view_homepage_event)
            json_lines.append(view_item_event)
    return json_lines