marketing-analytics/predicting/future-customer-value-segments/fcvs_pipeline_bq.py [112:273]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                pvalue.AsSingleton(options))  # (customer_id, date_str, date,
                                              #  sales, extra_dimension?)
        )

        # Merge the event log down to one record per (customer_id, date_str):
        # drop records whose sales (index 3) are not positive, key the rest by
        # (customer_id, date_str), group, and hand each group to
        # c.merge_full_elog_by_customer_and_date, which returns the merged
        # record for that key.
        full_elog_merged = (
            full_elog
            | beam.Filter(lambda x: x[3] > 0)  # sales > 0
            | beam.Map(lambda x: ((x[0], x[1]), x))  # key: (customer_id, date)
            | 'Group full elog by customer and date' >> beam.GroupByKey()
            | beam.Map(c.merge_full_elog_by_customer_and_date)  # (customer_id,
                                                                #  date_str, date,
                                                                #  sales)
        )

        # Reduce the dates (index 2) of all merged records to one global value
        # with c.MinMaxDatesFn, then reshape it with c.min_max_dates_dict
        # (presumably a dict holding the min and max dates -- confirm in the
        # helper).
        min_max_dates = (
            full_elog_merged
            | beam.Map(lambda x: x[2])  # date
            | beam.CombineGlobally(c.MinMaxDatesFn())
            | beam.Map(c.min_max_dates_dict)
        )

        # Derive the date boundaries from the min/max dates and the options
        # side input.  The steps in this section read limits_dates through
        # pvalue.AsSingleton, so c.limit_dates_boundaries is expected to emit
        # exactly one element.
        limits_dates = (
            min_max_dates
            | beam.FlatMap(c.limit_dates_boundaries, pvalue.AsSingleton(options))
        )

        # Values emitted by c.filter_customers_in_cohort for the merged records
        # (customer IDs, per the transform label), de-duplicated.  The helper
        # receives the date limits as a singleton side input.
        cohort = (
            full_elog_merged
            | beam.FlatMap(c.filter_customers_in_cohort,
                           pvalue.AsSingleton(limits_dates))
            | 'Distinct Customer IDs in Cohort' >> util.Distinct()
        )

        # Number of distinct cohort entries, as a single-element PCollection.
        cohort_count = (
            cohort
            | 'Count cohort entries' >> beam.combiners.Count.Globally()
        )

        # (cohort_entry, 1) pairs; the value is a placeholder so that the
        # cohort can be passed as a dict side input (pvalue.AsDict) for
        # membership lookups.
        cohort_set = (
            cohort
            | beam.Map(lambda x: (x, 1))
        )

        # Distinct customer IDs (index 0) across all merged records, regardless
        # of cohort membership.
        all_customer_ids = (
            full_elog_merged
            | beam.Map(lambda x: x[0])  # key: customer_id
            | 'Distinct all Customer IDs' >> util.Distinct()
        )

        # Number of distinct customers, as a single-element PCollection.
        all_customer_ids_count = (
            all_customer_ids
            | 'Count all customers' >> beam.combiners.Count.Globally()
        )

        # A one-element seed PCollection makes c.count_customers run exactly
        # once; the seed value itself is a dummy main input and the real data
        # (cohort count, total customer count, options) arrives as singleton
        # side inputs.
        num_customers = (
            pipeline
            | 'Create single elem Stream I' >> beam.Create([1])
            | beam.FlatMap(c.count_customers,
                           pvalue.AsSingleton(cohort_count),
                           pvalue.AsSingleton(all_customer_ids_count),
                           pvalue.AsSingleton(options))
        )

        # Merged records passed through c.filter_cohort_records_in_cal_hol,
        # which gets the cohort as a dict side input and the date limits as a
        # singleton.  Judging by the helper name this keeps cohort customers'
        # records inside the calibration + holdout window -- confirm in the
        # helper.
        cal_hol_elog = (
            full_elog_merged
            | beam.FlatMap(c.filter_cohort_records_in_cal_hol,
                           pvalue.AsDict(cohort_set),
                           pvalue.AsSingleton(limits_dates))
        )

        # Number of records in cal_hol_elog, as a single-element PCollection.
        cal_hol_elog_count = (
            cal_hol_elog
            | 'Count cal hol elog entries' >> beam.combiners.Count.Globally()
        )

        # Subset of cal_hol_elog emitted by c.filter_records_in_calibration
        # given the date limits (presumably the calibration-period records).
        calibration = (
            cal_hol_elog
            | beam.FlatMap(c.filter_records_in_calibration,
                           pvalue.AsSingleton(limits_dates))
        )

        # Count of merged records emitted by c.filter_records_in_cal_hol.
        # Unlike cal_hol_elog, this filter receives no cohort side input.
        num_txns_total = (
            full_elog_merged
            | beam.FlatMap(c.filter_records_in_cal_hol,
                           pvalue.AsSingleton(limits_dates))
            | 'Count num txns total' >> beam.combiners.Count.Globally()
        )

        # A one-element seed PCollection makes c.count_txns run exactly once,
        # with the cohort transaction count, the total transaction count and
        # the options supplied as singleton side inputs.
        num_txns = (
            pipeline
            | 'Create single elem Stream II' >> beam.Create([1])
            | beam.FlatMap(c.count_txns,
                           pvalue.AsSingleton(cal_hol_elog_count),
                           pvalue.AsSingleton(num_txns_total),
                           pvalue.AsSingleton(options))
        )

        # Per-customer summary of the calibration records: key each record by
        # customer_id (index 0), group, and let c.create_cal_cbs turn each
        # customer's records into the output described in the trailing comment.
        # The helper also receives the options and the date limits as
        # singleton side inputs.
        calcbs = (
            calibration
            | beam.Map(lambda x: (x[0], x))
            | 'Group calibration elog by customer id' >> beam.GroupByKey()
            | beam.FlatMap(
                c.create_cal_cbs,
                pvalue.AsSingleton(options),
                pvalue.AsSingleton(limits_dates)
            )  # (customer_id, number_of_transactions, average_order_value,
               #  frequency, recency, total_time_observed)
        )

        # Earliest transaction date per customer: (customer_id, first_date).
        # Only the date (item 2) is keyed and shuffled, and the minimum is
        # taken with a per-key combiner, so a customer's full record list is
        # never materialised just to read one field.  The explicit step label
        # is kept unchanged so step names stay stable.
        first_transaction_dates_by_customer = (
            cal_hol_elog
            | beam.Map(lambda x: (x[0], x[2]))  # (customer_id, date)
            | 'Group cal hol elog by customer id' >> beam.CombinePerKey(min)
        )

        # Repeat transactions per time unit: (time_unit_number, occurrences).
        # Records are first passed through
        # c.filter_first_transaction_date_records, which gets each customer's
        # first transaction date as a dict side input (presumably to drop the
        # first purchase so only repeat transactions remain -- confirm in the
        # helper).  Each surviving record is then assigned a time unit number,
        # and the records are counted per time unit with a per-key combiner
        # instead of GroupByKey + sum, so partial sums can be computed before
        # the shuffle.  The explicit step label is kept unchanged.
        cal_hol_elog_repeat = (
            cal_hol_elog
            | beam.FlatMap(c.filter_first_transaction_date_records,
                           pvalue.AsDict(first_transaction_dates_by_customer))
            | beam.FlatMap(
                c.calculate_time_unit_numbers,  # (customer_id, date,
                                                #  time_unit_number)
                pvalue.AsSingleton(options),
                pvalue.AsSingleton(limits_dates))
            | beam.Map(lambda x: (x[2], 1))  # key: time_unit_number
            | 'Group cal hol elog repeat by time unit number' >>
            beam.CombinePerKey(sum)  # (time_unit_number, occurrences)
        )

        # A one-element seed PCollection makes
        # c.calculate_cumulative_repeat_transactions run exactly once over the
        # complete set of per-time-unit counts, which it receives as an
        # iterable side input.
        repeat_tx = (
            pipeline
            | 'Create single elem Stream III' >> beam.Create([1])
            | beam.FlatMap(c.calculate_cumulative_repeat_transactions,
                           pvalue.AsIter(cal_hol_elog_repeat)
                           )  # (time_unit_number, repeat_transactions,
                              #  repeat_transactions_cumulative)
        )

        # A one-element seed PCollection makes c.calculate_model_fit_validation
        # run exactly once.  All real inputs arrive as side inputs: options,
        # date limits and the customer/transaction counts as singletons, the
        # per-customer calibration summaries and the repeat-transaction series
        # as iterables.
        model_validation = (
            pipeline
            | 'Create single elem Stream IV' >> beam.Create([1])
            | beam.FlatMap(c.calculate_model_fit_validation,
                           pvalue.AsSingleton(options),
                           pvalue.AsSingleton(limits_dates),
                           pvalue.AsIter(calcbs),
                           pvalue.AsIter(repeat_tx),
                           pvalue.AsSingleton(num_customers),
                           pvalue.AsSingleton(num_txns))
        )

        # Apply c.raise_error_if_invalid_mape to every validation result.  The
        # output is discarded, so the step exists only for the helper's side
        # effect (presumably raising when the MAPE is not acceptable --
        # confirm in the helper).
        _ = (
            model_validation
            | beam.Map(c.raise_error_if_invalid_mape)
        )

        _ = (
            model_validation
            | beam.Map(lambda x: x[0])
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



marketing-analytics/predicting/future-customer-value-segments/fcvs_pipeline_csv.py [105:266]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                pvalue.AsSingleton(options))  # (customer_id, date_str, date,
                                              #  sales, extra_dimension?)
        )

        # Merge the event log down to one record per (customer_id, date_str):
        # drop records whose sales (index 3) are not positive, key the rest by
        # (customer_id, date_str), group, and hand each group to
        # c.merge_full_elog_by_customer_and_date, which returns the merged
        # record for that key.
        full_elog_merged = (
            full_elog
            | beam.Filter(lambda x: x[3] > 0)  # sales > 0
            | beam.Map(lambda x: ((x[0], x[1]), x))  # key: (customer_id, date)
            | 'Group full elog by customer and date' >> beam.GroupByKey()
            | beam.Map(c.merge_full_elog_by_customer_and_date)  # (customer_id,
                                                                #  date_str, date,
                                                                #  sales)
        )

        # Reduce the dates (index 2) of all merged records to one global value
        # with c.MinMaxDatesFn, then reshape it with c.min_max_dates_dict
        # (presumably a dict holding the min and max dates -- confirm in the
        # helper).
        min_max_dates = (
            full_elog_merged
            | beam.Map(lambda x: x[2])  # date
            | beam.CombineGlobally(c.MinMaxDatesFn())
            | beam.Map(c.min_max_dates_dict)
        )

        # Derive the date boundaries from the min/max dates and the options
        # side input.  The steps in this section read limits_dates through
        # pvalue.AsSingleton, so c.limit_dates_boundaries is expected to emit
        # exactly one element.
        limits_dates = (
            min_max_dates
            | beam.FlatMap(c.limit_dates_boundaries, pvalue.AsSingleton(options))
        )

        # Values emitted by c.filter_customers_in_cohort for the merged records
        # (customer IDs, per the transform label), de-duplicated.  The helper
        # receives the date limits as a singleton side input.
        cohort = (
            full_elog_merged
            | beam.FlatMap(c.filter_customers_in_cohort,
                           pvalue.AsSingleton(limits_dates))
            | 'Distinct Customer IDs in Cohort' >> util.Distinct()
        )

        # Number of distinct cohort entries, as a single-element PCollection.
        cohort_count = (
            cohort
            | 'Count cohort entries' >> beam.combiners.Count.Globally()
        )

        # (cohort_entry, 1) pairs; the value is a placeholder so that the
        # cohort can be passed as a dict side input (pvalue.AsDict) for
        # membership lookups.
        cohort_set = (
            cohort
            | beam.Map(lambda x: (x, 1))
        )

        # Distinct customer IDs (index 0) across all merged records, regardless
        # of cohort membership.
        all_customer_ids = (
            full_elog_merged
            | beam.Map(lambda x: x[0])  # key: customer_id
            | 'Distinct all Customer IDs' >> util.Distinct()
        )

        # Number of distinct customers, as a single-element PCollection.
        all_customer_ids_count = (
            all_customer_ids
            | 'Count all customers' >> beam.combiners.Count.Globally()
        )

        # A one-element seed PCollection makes c.count_customers run exactly
        # once; the seed value itself is a dummy main input and the real data
        # (cohort count, total customer count, options) arrives as singleton
        # side inputs.
        num_customers = (
            pipeline
            | 'Create single elem Stream I' >> beam.Create([1])
            | beam.FlatMap(c.count_customers,
                           pvalue.AsSingleton(cohort_count),
                           pvalue.AsSingleton(all_customer_ids_count),
                           pvalue.AsSingleton(options))
        )

        # Merged records passed through c.filter_cohort_records_in_cal_hol,
        # which gets the cohort as a dict side input and the date limits as a
        # singleton.  Judging by the helper name this keeps cohort customers'
        # records inside the calibration + holdout window -- confirm in the
        # helper.
        cal_hol_elog = (
            full_elog_merged
            | beam.FlatMap(c.filter_cohort_records_in_cal_hol,
                           pvalue.AsDict(cohort_set),
                           pvalue.AsSingleton(limits_dates))
        )

        # Number of records in cal_hol_elog, as a single-element PCollection.
        cal_hol_elog_count = (
            cal_hol_elog
            | 'Count cal hol elog entries' >> beam.combiners.Count.Globally()
        )

        # Subset of cal_hol_elog emitted by c.filter_records_in_calibration
        # given the date limits (presumably the calibration-period records).
        calibration = (
            cal_hol_elog
            | beam.FlatMap(c.filter_records_in_calibration,
                           pvalue.AsSingleton(limits_dates))
        )

        # Count of merged records emitted by c.filter_records_in_cal_hol.
        # Unlike cal_hol_elog, this filter receives no cohort side input.
        num_txns_total = (
            full_elog_merged
            | beam.FlatMap(c.filter_records_in_cal_hol,
                           pvalue.AsSingleton(limits_dates))
            | 'Count num txns total' >> beam.combiners.Count.Globally()
        )

        # A one-element seed PCollection makes c.count_txns run exactly once,
        # with the cohort transaction count, the total transaction count and
        # the options supplied as singleton side inputs.
        num_txns = (
            pipeline
            | 'Create single elem Stream II' >> beam.Create([1])
            | beam.FlatMap(c.count_txns,
                           pvalue.AsSingleton(cal_hol_elog_count),
                           pvalue.AsSingleton(num_txns_total),
                           pvalue.AsSingleton(options))
        )

        # Per-customer summary of the calibration records: key each record by
        # customer_id (index 0), group, and let c.create_cal_cbs turn each
        # customer's records into the output described in the trailing comment.
        # The helper also receives the options and the date limits as
        # singleton side inputs.
        calcbs = (
            calibration
            | beam.Map(lambda x: (x[0], x))
            | 'Group calibration elog by customer id' >> beam.GroupByKey()
            | beam.FlatMap(
                c.create_cal_cbs,
                pvalue.AsSingleton(options),
                pvalue.AsSingleton(limits_dates)
            )  # (customer_id, number_of_transactions, average_order_value,
               #  frequency, recency, total_time_observed)
        )

        # Earliest transaction date per customer: (customer_id, first_date).
        # Only the date (item 2) is keyed and shuffled, and the minimum is
        # taken with a per-key combiner, so a customer's full record list is
        # never materialised just to read one field.  The explicit step label
        # is kept unchanged so step names stay stable.
        first_transaction_dates_by_customer = (
            cal_hol_elog
            | beam.Map(lambda x: (x[0], x[2]))  # (customer_id, date)
            | 'Group cal hol elog by customer id' >> beam.CombinePerKey(min)
        )

        # Repeat transactions per time unit: (time_unit_number, occurrences).
        # Records are first passed through
        # c.filter_first_transaction_date_records, which gets each customer's
        # first transaction date as a dict side input (presumably to drop the
        # first purchase so only repeat transactions remain -- confirm in the
        # helper).  Each surviving record is then assigned a time unit number,
        # and the records are counted per time unit with a per-key combiner
        # instead of GroupByKey + sum, so partial sums can be computed before
        # the shuffle.  The explicit step label is kept unchanged.
        cal_hol_elog_repeat = (
            cal_hol_elog
            | beam.FlatMap(c.filter_first_transaction_date_records,
                           pvalue.AsDict(first_transaction_dates_by_customer))
            | beam.FlatMap(
                c.calculate_time_unit_numbers,  # (customer_id, date,
                                                #  time_unit_number)
                pvalue.AsSingleton(options),
                pvalue.AsSingleton(limits_dates))
            | beam.Map(lambda x: (x[2], 1))  # key: time_unit_number
            | 'Group cal hol elog repeat by time unit number' >>
            beam.CombinePerKey(sum)  # (time_unit_number, occurrences)
        )

        # A one-element seed PCollection makes
        # c.calculate_cumulative_repeat_transactions run exactly once over the
        # complete set of per-time-unit counts, which it receives as an
        # iterable side input.
        repeat_tx = (
            pipeline
            | 'Create single elem Stream III' >> beam.Create([1])
            | beam.FlatMap(c.calculate_cumulative_repeat_transactions,
                           pvalue.AsIter(cal_hol_elog_repeat)
                           )  # (time_unit_number, repeat_transactions,
                              #  repeat_transactions_cumulative)
        )

        # A one-element seed PCollection makes c.calculate_model_fit_validation
        # run exactly once.  All real inputs arrive as side inputs: options,
        # date limits and the customer/transaction counts as singletons, the
        # per-customer calibration summaries and the repeat-transaction series
        # as iterables.
        model_validation = (
            pipeline
            | 'Create single elem Stream IV' >> beam.Create([1])
            | beam.FlatMap(c.calculate_model_fit_validation,
                           pvalue.AsSingleton(options),
                           pvalue.AsSingleton(limits_dates),
                           pvalue.AsIter(calcbs),
                           pvalue.AsIter(repeat_tx),
                           pvalue.AsSingleton(num_customers),
                           pvalue.AsSingleton(num_txns))
        )

        # Apply c.raise_error_if_invalid_mape to every validation result.  The
        # output is discarded, so the step exists only for the helper's side
        # effect (presumably raising when the MAPE is not acceptable --
        # confirm in the helper).
        _ = (
            model_validation
            | beam.Map(c.raise_error_if_invalid_mape)
        )

        _ = (
            model_validation
            | beam.Map(lambda x: x[0])
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



