marketing-analytics/predicting/future-customer-value-segments/fcvs_pipeline_bq.py [304:384]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        )

        # Key every merged elog record by its first field (customer_id) and
        # gather all records that share a customer.
        merged_elog_keyed_by_customer = (
            full_elog_merged | beam.Map(lambda record: (record[0], record)))
        merged_elog_grouped_by_customer = (
            merged_elog_keyed_by_customer
            | 'Group full merged elog by customer id' >> beam.GroupByKey())

        # Each (customer_id, records) group goes through c.create_fullcbs,
        # which also receives the options and the min/max dates as singleton
        # side inputs. Output rows:
        #   (customer_id, number_of_transactions, historical_aov,
        #    frequency, recency, total_time_observed)
        fullcbs_without_extra_dimension = (
            merged_elog_grouped_by_customer
            | beam.FlatMap(
                c.create_fullcbs,
                pvalue.AsSingleton(options),
                pvalue.AsSingleton(min_max_dates),
            )
        )

        # Pass every full elog record through c.discard_if_no_extra_dimension;
        # the helper gets the options as a singleton side input and, being a
        # FlatMap, may emit zero or more records per input.
        discard_without_extra_dimension = beam.FlatMap(
            c.discard_if_no_extra_dimension,
            pvalue.AsSingleton(options),
        )
        full_elog_if_extra_dimension = (
            full_elog
            | 'Discard records if no extra dimension'
            >> discard_without_extra_dimension
        )

        # Key the extra-dimension records by (field 0, field 4), i.e.
        # (customer_id, extra_dimension).
        elog_keyed_by_customer_and_dimension = (
            full_elog_if_extra_dimension
            | beam.Map(lambda record: ((record[0], record[4]), record)))

        # Group per key and let c.create_extra_dimensions_stats summarise each
        # group. Output rows:
        #   (customer_id, extra_dimension, dimension_count, tot_sales,
        #    max_dimension_date)
        extra_dimensions_stats = (
            elog_keyed_by_customer_and_dimension
            | 'Group full elog by customer id and extra dimension'
            >> beam.GroupByKey()
            | beam.Map(c.create_extra_dimensions_stats)
        )

        # Re-key the per-dimension stats by customer_id (field 0) so that all
        # dimensions of one customer end up in the same group.
        dimension_stats_keyed_by_customer = (
            extra_dimensions_stats
            | beam.Map(lambda stats: (stats[0], stats)))

        # c.extract_top_extra_dimension reduces each customer's group to one
        # row:
        #   (customer_id, extra_dimension, dimension_count, tot_sales,
        #    max_dimension_date)
        top_dimension_per_customer = (
            dimension_stats_keyed_by_customer
            | 'Group extra dimension stats by customer id'
            >> beam.GroupByKey()
            | beam.Map(c.extract_top_extra_dimension)
        )

        # Keep only the first two fields of each top-dimension row, giving
        # (customer_id, extra_dimension) pairs.
        customer_dimension_map = top_dimension_per_customer | beam.Map(
            lambda top_stats: (top_stats[0], top_stats[1]))

        # A one-element collection acts as the seed, so c.calculate_prediction
        # is applied to a single input element and reads all of its real data
        # from the side inputs below.
        prediction_seed = (
            pipeline | 'Create single elem Stream V' >> beam.Create([1]))
        prediction_side_inputs = (
            pvalue.AsSingleton(options),
            pvalue.AsIter(fullcbs_without_extra_dimension),
            pvalue.AsSingleton(num_customers),
            pvalue.AsSingleton(num_txns),
        )

        # Output elements:
        #   [customer_id, p_alive, predicted_purchases, future_aov,
        #    historical_aov, expected_value, frequency, recency,
        #    total_time_observed], prediction_params
        prediction = (
            prediction_seed
            | beam.FlatMap(c.calculate_prediction, *prediction_side_inputs)
        )

        # Flatten the first component of each prediction output into
        # individual per-customer prediction records.
        prediction_by_customer_no_segments_no_extra_dimension = (
            prediction | beam.FlatMap(lambda result: result[0]))

        # c.add_top_extra_dimension_to_fullcbs receives the options (singleton
        # side input) and the customer_id -> extra_dimension pairs as a dict
        # side input.
        add_top_extra_dimension = beam.FlatMap(
            c.add_top_extra_dimension_to_fullcbs,
            pvalue.AsSingleton(options),
            pvalue.AsDict(customer_dimension_map),
        )

        # Output rows:
        #   [customer_id, p_alive, predicted_purchases, future_aov,
        #    historical_aov, expected_value, frequency, recency,
        #    total_time_observed, extra_dimension?]
        prediction_by_customer_no_segments = (
            prediction_by_customer_no_segments_no_extra_dimension
            | add_top_extra_dimension
        )

        _ = (
            prediction
            | beam.Map(lambda x: x[1])  # Extract prediction params
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



marketing-analytics/predicting/future-customer-value-segments/fcvs_pipeline_csv.py [269:349]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        )

        # Key every merged elog record by its first field (customer_id) and
        # gather all records that share a customer.
        merged_elog_keyed_by_customer = (
            full_elog_merged | beam.Map(lambda record: (record[0], record)))
        merged_elog_grouped_by_customer = (
            merged_elog_keyed_by_customer
            | 'Group full merged elog by customer id' >> beam.GroupByKey())

        # Each (customer_id, records) group goes through c.create_fullcbs,
        # which also receives the options and the min/max dates as singleton
        # side inputs. Output rows:
        #   (customer_id, number_of_transactions, historical_aov,
        #    frequency, recency, total_time_observed)
        fullcbs_without_extra_dimension = (
            merged_elog_grouped_by_customer
            | beam.FlatMap(
                c.create_fullcbs,
                pvalue.AsSingleton(options),
                pvalue.AsSingleton(min_max_dates),
            )
        )

        # Pass every full elog record through c.discard_if_no_extra_dimension;
        # the helper gets the options as a singleton side input and, being a
        # FlatMap, may emit zero or more records per input.
        discard_without_extra_dimension = beam.FlatMap(
            c.discard_if_no_extra_dimension,
            pvalue.AsSingleton(options),
        )
        full_elog_if_extra_dimension = (
            full_elog
            | 'Discard records if no extra dimension'
            >> discard_without_extra_dimension
        )

        # Key the extra-dimension records by (field 0, field 4), i.e.
        # (customer_id, extra_dimension).
        elog_keyed_by_customer_and_dimension = (
            full_elog_if_extra_dimension
            | beam.Map(lambda record: ((record[0], record[4]), record)))

        # Group per key and let c.create_extra_dimensions_stats summarise each
        # group. Output rows:
        #   (customer_id, extra_dimension, dimension_count, tot_sales,
        #    max_dimension_date)
        extra_dimensions_stats = (
            elog_keyed_by_customer_and_dimension
            | 'Group full elog by customer id and extra dimension'
            >> beam.GroupByKey()
            | beam.Map(c.create_extra_dimensions_stats)
        )

        # Re-key the per-dimension stats by customer_id (field 0) so that all
        # dimensions of one customer end up in the same group.
        dimension_stats_keyed_by_customer = (
            extra_dimensions_stats
            | beam.Map(lambda stats: (stats[0], stats)))

        # c.extract_top_extra_dimension reduces each customer's group to one
        # row:
        #   (customer_id, extra_dimension, dimension_count, tot_sales,
        #    max_dimension_date)
        top_dimension_per_customer = (
            dimension_stats_keyed_by_customer
            | 'Group extra dimension stats by customer id'
            >> beam.GroupByKey()
            | beam.Map(c.extract_top_extra_dimension)
        )

        # Keep only the first two fields of each top-dimension row, giving
        # (customer_id, extra_dimension) pairs.
        customer_dimension_map = top_dimension_per_customer | beam.Map(
            lambda top_stats: (top_stats[0], top_stats[1]))

        # A one-element collection acts as the seed, so c.calculate_prediction
        # is applied to a single input element and reads all of its real data
        # from the side inputs below.
        prediction_seed = (
            pipeline | 'Create single elem Stream V' >> beam.Create([1]))
        prediction_side_inputs = (
            pvalue.AsSingleton(options),
            pvalue.AsIter(fullcbs_without_extra_dimension),
            pvalue.AsSingleton(num_customers),
            pvalue.AsSingleton(num_txns),
        )

        # Output elements:
        #   [customer_id, p_alive, predicted_purchases, future_aov,
        #    historical_aov, expected_value, frequency, recency,
        #    total_time_observed], prediction_params
        prediction = (
            prediction_seed
            | beam.FlatMap(c.calculate_prediction, *prediction_side_inputs)
        )

        # Flatten the first component of each prediction output into
        # individual per-customer prediction records.
        prediction_by_customer_no_segments_no_extra_dimension = (
            prediction | beam.FlatMap(lambda result: result[0]))

        # c.add_top_extra_dimension_to_fullcbs receives the options (singleton
        # side input) and the customer_id -> extra_dimension pairs as a dict
        # side input.
        add_top_extra_dimension = beam.FlatMap(
            c.add_top_extra_dimension_to_fullcbs,
            pvalue.AsSingleton(options),
            pvalue.AsDict(customer_dimension_map),
        )

        # Output rows:
        #   [customer_id, p_alive, predicted_purchases, future_aov,
        #    historical_aov, expected_value, frequency, recency,
        #    total_time_observed, extra_dimension?]
        prediction_by_customer_no_segments = (
            prediction_by_customer_no_segments_no_extra_dimension
            | add_top_extra_dimension
        )

        _ = (
            prediction
            | beam.Map(lambda x: x[1])  # Extract predictions params
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



