def extract_process_function()

in flink-python/pyflink/fn_execution/datastream/embedded/operations.py [0:0]


def extract_process_function(
        user_defined_function_proto, j_runtime_context, j_function_context, j_timer_context,
        j_side_output_context, job_parameters, j_keyed_state_backend, j_operator_state_backend):
    """
    Builds the operation wrapping the user-defined function carried by
    ``user_defined_function_proto``.

    The proto's ``payload`` is unpickled and, depending on the proto's ``function_type``, the
    callbacks of the resulting object are wrapped into generator functions which are handed to
    a :class:`OneInputOperation` or a :class:`TwoInputOperation`. Every result produced by a
    user callback is passed through a common post-processing generator which tolerates a
    ``None`` result and, when a side output context is given, routes ``(OutputTag, value)``
    tuples to it instead of yielding them.

    :param user_defined_function_proto: The message providing ``payload``, ``function_type``
                                        and (read in the keyed and window branches)
                                        ``key_type_info``.
    :param j_runtime_context: Passed to ``StreamingRuntimeContext.of``.
    :param j_function_context: Wrapped by the per-function-type internal function contexts.
    :param j_timer_context: Wrapped by the internal on-timer contexts; in the window branch
                            its ``timerService()`` is used as well.
    :param j_side_output_context: When truthy, wrapped into a ``SideOutputContext`` that
                                  receives the values tagged with an ``OutputTag``.
    :param job_parameters: Passed to ``StreamingRuntimeContext.of``.
    :param j_keyed_state_backend: Wrapped into a ``KeyedStateBackend`` in the keyed and window
                                  branches.
    :param j_operator_state_backend: Passed to the broadcast function contexts.
    :return: A ``OneInputOperation`` or ``TwoInputOperation`` depending on the function type.
    :raises Exception: If the function type is not one of the handled types.
    """
    from pyflink.fn_execution import flink_fn_execution_pb2
    UserDefinedDataStreamFunction = flink_fn_execution_pb2.UserDefinedDataStreamFunction

    # NOTE(review): pickle.loads executes arbitrary code while deserializing. This presumably
    # only ever sees payloads serialized by the job submission itself - confirm that the
    # payload can never originate from an untrusted source.
    user_defined_func = pickle.loads(user_defined_function_proto.payload)
    func_type = user_defined_function_proto.function_type

    runtime_context = StreamingRuntimeContext.of(j_runtime_context, job_parameters)

    # process_func post-processes whatever a user callback returned. Both variants accept None
    # (a callback which emits nothing) so that callers can always "yield from" the result.
    if j_side_output_context:
        side_output_context = SideOutputContext(j_side_output_context)

        def process_func(values):
            if values is None:
                return
            for value in values:
                if isinstance(value, tuple) and isinstance(value[0], OutputTag):
                    # (OutputTag, value) tuples go to the side output, not the main output
                    output_tag = value[0]  # type: OutputTag
                    side_output_context.collect(output_tag.tag_id, value[1])
                else:
                    yield value
    else:
        def process_func(values):
            if values is None:
                return
            yield from values

    def open_func():
        # open/close are optional on the unpickled object
        if hasattr(user_defined_func, "open"):
            user_defined_func.open(runtime_context)

    def close_func():
        if hasattr(user_defined_func, "close"):
            user_defined_func.close()

    if func_type == UserDefinedDataStreamFunction.PROCESS:
        function_context = InternalProcessFunctionContext(j_function_context)
        process_element = user_defined_func.process_element

        def process_element_func(value):
            yield from process_func(process_element(value, function_context))

        return OneInputOperation(open_func, close_func, process_element_func)

    elif func_type == UserDefinedDataStreamFunction.KEYED_PROCESS:
        function_context = InternalKeyedProcessFunctionContext(
            j_function_context, user_defined_function_proto.key_type_info)
        timer_context = InternalKeyedProcessFunctionOnTimerContext(
            j_timer_context, user_defined_function_proto.key_type_info)

        keyed_state_backend = KeyedStateBackend(
            function_context,
            j_keyed_state_backend)
        runtime_context.set_keyed_state_backend(keyed_state_backend)

        process_element = user_defined_func.process_element
        on_timer = user_defined_func.on_timer

        def process_element_func(value):
            # only value[1] is handed to the user function
            # (presumably value[0] is the key - TODO confirm)
            yield from process_func(process_element(value[1], function_context))

        def on_timer_func(timestamp):
            yield from process_func(on_timer(timestamp, timer_context))

        return OneInputOperation(open_func, close_func, process_element_func, on_timer_func)

    elif func_type == UserDefinedDataStreamFunction.CO_PROCESS:
        function_context = InternalProcessFunctionContext(j_function_context)

        process_element1 = user_defined_func.process_element1
        process_element2 = user_defined_func.process_element2

        def process_element_func1(value):
            yield from process_func(process_element1(value, function_context))

        def process_element_func2(value):
            yield from process_func(process_element2(value, function_context))

        return TwoInputOperation(
            open_func, close_func, process_element_func1, process_element_func2)

    elif func_type == UserDefinedDataStreamFunction.CO_BROADCAST_PROCESS:
        broadcast_ctx = InternalBroadcastProcessFunctionContext(
            j_function_context, j_operator_state_backend)
        read_only_broadcast_ctx = InternalBroadcastProcessFunctionReadOnlyContext(
            j_function_context, j_operator_state_backend)

        process_element = user_defined_func.process_element
        process_broadcast_element = user_defined_func.process_broadcast_element

        def process_element_func1(value):
            # the first input is processed with the read-only context
            yield from process_func(process_element(value, read_only_broadcast_ctx))

        def process_element_func2(value):
            # the second input is handed to process_broadcast_element
            yield from process_func(process_broadcast_element(value, broadcast_ctx))

        return TwoInputOperation(
            open_func, close_func, process_element_func1, process_element_func2)

    elif func_type == UserDefinedDataStreamFunction.KEYED_CO_PROCESS:
        function_context = InternalKeyedProcessFunctionContext(
            j_function_context, user_defined_function_proto.key_type_info)
        timer_context = InternalKeyedProcessFunctionOnTimerContext(
            j_timer_context, user_defined_function_proto.key_type_info)

        keyed_state_backend = KeyedStateBackend(
            function_context,
            j_keyed_state_backend)
        runtime_context.set_keyed_state_backend(keyed_state_backend)

        process_element1 = user_defined_func.process_element1
        process_element2 = user_defined_func.process_element2
        on_timer = user_defined_func.on_timer

        def process_element_func1(value):
            yield from process_func(process_element1(value[1], function_context))

        def process_element_func2(value):
            yield from process_func(process_element2(value[1], function_context))

        def on_timer_func(timestamp):
            yield from process_func(on_timer(timestamp, timer_context))

        return TwoInputOperation(
            open_func, close_func, process_element_func1, process_element_func2, on_timer_func)

    elif func_type == UserDefinedDataStreamFunction.KEYED_CO_BROADCAST_PROCESS:
        broadcast_ctx = InternalKeyedBroadcastProcessFunctionContext(
            j_function_context, j_operator_state_backend)
        read_only_broadcast_ctx = InternalKeyedBroadcastProcessFunctionReadOnlyContext(
            j_function_context, user_defined_function_proto.key_type_info, j_operator_state_backend)
        timer_context = InternalKeyedBroadcastProcessFunctionOnTimerContext(
            j_timer_context, user_defined_function_proto.key_type_info, j_operator_state_backend)

        keyed_state_backend = KeyedStateBackend(
            read_only_broadcast_ctx,
            j_keyed_state_backend)
        runtime_context.set_keyed_state_backend(keyed_state_backend)

        process_element = user_defined_func.process_element
        process_broadcast_element = user_defined_func.process_broadcast_element
        on_timer = user_defined_func.on_timer

        def process_element_func1(value):
            # the first input passes value[1] only; the second input passes the whole value
            yield from process_func(process_element(value[1], read_only_broadcast_ctx))

        def process_element_func2(value):
            yield from process_func(process_broadcast_element(value, broadcast_ctx))

        def on_timer_func(timestamp):
            # Go through process_func like every other callback: a bare "yield from" raises
            # TypeError when on_timer returns None and would emit OutputTag tuples to the
            # main output instead of the side output.
            yield from process_func(on_timer(timestamp, timer_context))

        return TwoInputOperation(
            open_func, close_func, process_element_func1, process_element_func2, on_timer_func)

    elif func_type == UserDefinedDataStreamFunction.WINDOW:

        # for this function type the unpickled payload is a descriptor, not a user function
        window_operation_descriptor = (
            user_defined_func
        )  # type: WindowOperationDescriptor

        def user_key_selector(normal_data):
            # identity: the data is returned unchanged
            return normal_data

        window_assigner = window_operation_descriptor.assigner
        window_trigger = window_operation_descriptor.trigger
        allowed_lateness = window_operation_descriptor.allowed_lateness
        late_data_output_tag = window_operation_descriptor.late_data_output_tag
        window_state_descriptor = window_operation_descriptor.window_state_descriptor
        internal_window_function = window_operation_descriptor.internal_window_function
        window_serializer = window_operation_descriptor.window_serializer
        window_coder = window_serializer._get_coder()

        # the converter is chosen from the coder type; anything which is neither a time nor a
        # count window coder falls back to the global window converter
        if isinstance(window_coder, TimeWindowCoder):
            window_converter = TimeWindowConverter()
        elif isinstance(window_coder, CountWindowCoder):
            window_converter = CountWindowConverter()
        else:
            window_converter = GlobalWindowConverter()

        internal_timer_service = InternalTimerServiceImpl(
            j_timer_context.timerService(), window_converter)

        function_context = InternalKeyedProcessFunctionContext(
            j_function_context,
            user_defined_function_proto.key_type_info)
        window_timer_context = InternalWindowTimerContext(
            j_timer_context,
            user_defined_function_proto.key_type_info,
            window_converter)

        keyed_state_backend = KeyedStateBackend(
            function_context,
            j_keyed_state_backend,
            j_function_context.getWindowSerializer(),
            window_converter)
        runtime_context.set_keyed_state_backend(keyed_state_backend)

        window_operator = WindowOperator(
            window_assigner,
            keyed_state_backend,
            user_key_selector,
            window_state_descriptor,
            internal_window_function,
            window_trigger,
            allowed_lateness,
            late_data_output_tag)

        # open/close are redefined here so that the window operator is driven instead of the
        # unpickled descriptor
        def open_func():
            window_operator.open(runtime_context, internal_timer_service)

        def close_func():
            window_operator.close()

        def process_element_func(value):
            yield from process_func(
                window_operator.process_element(value[1], function_context.timestamp()))

        # timers are dispatched to the event time or processing time handler of the window
        # operator depending on the assigner
        if window_assigner.is_event_time():
            def on_timer_func(timestamp):
                window = window_timer_context.window()
                key = window_timer_context.get_current_key()
                yield from process_func(window_operator.on_event_time(timestamp, key, window))
        else:
            def on_timer_func(timestamp):
                window = window_timer_context.window()
                key = window_timer_context.get_current_key()
                yield from process_func(window_operator.on_processing_time(timestamp, key, window))

        return OneInputOperation(open_func, close_func, process_element_func, on_timer_func)

    else:
        raise Exception("Unknown function type {0}.".format(func_type))