def extend_with_iploc_information()

in identity-resolution/notebooks/identity-graph/nepytune/cli/extend.py [0:0]


def extend_with_iploc_information(ip_loc_file_path):
    """
    Coroutine that merges an ip location entry into every fact of a record.

    On priming, the file at `ip_loc_file_path` is read once through
    `json_lines_file` and turned into a mapping from each item's
    ``"transient_id"`` to its ``"loc"`` value (a sequence of candidate
    location mappings).

    Afterwards each record passed in with ``send()`` is answered with a
    shallow copy whose ``"facts"`` list is rebuilt: facts are ordered by
    ``"ts"`` and each one is merged with a location chosen for the record's
    ``"uid"``. Sending ``None`` finishes the coroutine.
    """
    with open(ip_loc_file_path) as source:
        locations_by_id = {
            entry["transient_id"]: entry["loc"] for entry in json_lines_file(source)
        }

    def attach_locations(uid, facts, max_ts_difference=3600):
        """
        Yield the facts of `uid` in timestamp order, each merged with a location.

        A location is picked at random from the candidates known for `uid`
        and is reused for consecutive facts for as long as the gap between
        a fact and the one before it does not exceed `max_ts_difference`.
        The first fact, and any fact following a larger gap, triggers a new
        random pick. Keys of the location override equally named fact keys.
        """
        ordered = sorted(facts, key=lambda fact: fact["ts"])
        candidates = locations_by_id[uid]

        # This initial pick is always replaced when the first fact is
        # processed; it is kept so that the number of random draws (and the
        # failure on an empty candidate list) stays exactly as before.
        chosen = random.choice(candidates)

        previous = None
        for fact in ordered:
            if previous is None or fact["ts"] - previous["ts"] > max_ts_difference:
                chosen = random.choice(candidates)
            yield {**fact, **chosen}
            previous = fact

    outgoing = None
    while True:
        # The very first pass yields None, which primes the coroutine.
        incoming = yield outgoing
        if incoming is None:
            return
        outgoing = incoming.copy()
        outgoing["facts"] = [
            *attach_locations(uid=incoming["uid"], facts=incoming["facts"])
        ]