flink-python/pyflink/fn_execution/embedded/converters.py (184 lines of code) (raw):

################################################################################ # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ import pickle from abc import ABC, abstractmethod from typing import TypeVar, List, Tuple from pemja import findClass from pyflink.common import Row, RowKind, TypeInformation from pyflink.common.typeinfo import (PickledBytesTypeInfo, PrimitiveArrayTypeInfo, BasicArrayTypeInfo, ObjectArrayTypeInfo, RowTypeInfo, TupleTypeInfo, MapTypeInfo, ListTypeInfo) from pyflink.datastream import TimeWindow, CountWindow, GlobalWindow IN = TypeVar('IN') OUT = TypeVar('OUT') # Java Window JTimeWindow = findClass('org.apache.flink.table.runtime.operators.window.TimeWindow') JCountWindow = findClass('org.apache.flink.table.runtime.operators.window.CountWindow') JGlobalWindow = findClass('org.apache.flink.table.runtime.operators.window.GlobalWindow') class DataConverter(ABC): @abstractmethod def to_internal(self, value) -> IN: pass @abstractmethod def to_external(self, value) -> OUT: pass def __eq__(self, other): return type(self) == type(other) class IdentityDataConverter(DataConverter): def to_internal(self, value) -> IN: return value def to_external(self, value) -> OUT: return value class PickleDataConverter(DataConverter): def to_internal(self, value) -> IN: if value is None: return None return pickle.loads(value) def to_external(self, value) -> OUT: if value is None: return None return pickle.dumps(value) class FlattenRowDataConverter(DataConverter): def __init__(self, field_data_converters: List[DataConverter]): self._field_data_converters = field_data_converters def to_internal(self, value) -> IN: if value is None: return None return tuple([self._field_data_converters[i].to_internal(item) for i, item in enumerate(value)]) def to_external(self, value) -> OUT: if value is None: return None return tuple([self._field_data_converters[i].to_external(item) for i, item in enumerate(value)]) class RowDataConverter(DataConverter): def __init__(self, field_data_converters: List[DataConverter], field_names: List[str]): self._field_data_converters = field_data_converters self._field_names = field_names def to_internal(self, value) -> IN: if value is None: return None row = Row() row._values = [self._field_data_converters[i].to_internal(item) for i, item in enumerate(value[1])] row.set_field_names(self._field_names) row.set_row_kind(RowKind(value[0])) return row def to_external(self, value: Row) -> OUT: if value is None: return None values = value._values fields = tuple([self._field_data_converters[i].to_external(values[i]) for i in range(len(values))]) return value.get_row_kind().value, fields class TupleDataConverter(DataConverter): def __init__(self, field_data_converters: List[DataConverter]): self._field_data_converters = field_data_converters def to_internal(self, value) -> IN: if value is None: return None return tuple([self._field_data_converters[i].to_internal(item) for i, item in enumerate(value)]) def to_external(self, value: Tuple) -> OUT: if value is None: return None return tuple([self._field_data_converters[i].to_external(item) for i, item in enumerate(value)]) class ListDataConverter(DataConverter): def __init__(self, field_converter: DataConverter): self._field_converter = field_converter def to_internal(self, value) -> IN: if value is None: return None return [self._field_converter.to_internal(item) for item in value] def to_external(self, value) -> OUT: if value is None: return None return [self._field_converter.to_external(item) for item in value] class ArrayDataConverter(ListDataConverter): def __init__(self, field_converter: DataConverter): super(ArrayDataConverter, self).__init__(field_converter) def to_internal(self, value) -> IN: return tuple(super(ArrayDataConverter, self).to_internal(value)) def to_external(self, value) -> OUT: return tuple(super(ArrayDataConverter, self).to_external(value)) class DictDataConverter(DataConverter): def __init__(self, key_converter: DataConverter, value_converter: DataConverter): self._key_converter = key_converter self._value_converter = value_converter def to_internal(self, value) -> IN: if value is None: return None return {self._key_converter.to_internal(k): self._value_converter.to_internal(v) for k, v in value.items()} def to_external(self, value) -> OUT: if value is None: return None return {self._key_converter.to_external(k): self._value_converter.to_external(v) for k, v in value.items()} class TimeWindowConverter(DataConverter): def to_internal(self, value) -> TimeWindow: return TimeWindow(value.getStart(), value.getEnd()) def to_external(self, value: TimeWindow) -> OUT: return JTimeWindow(value.start, value.end) class CountWindowConverter(DataConverter): def to_internal(self, value) -> CountWindow: return CountWindow(value.getId()) def to_external(self, value: CountWindow) -> OUT: return JCountWindow(value.id) class GlobalWindowConverter(DataConverter): def to_internal(self, value) -> IN: return GlobalWindow() def to_external(self, value) -> OUT: return JGlobalWindow.get() def from_type_info_proto(type_info): # for data stream type information. from pyflink.fn_execution import flink_fn_execution_pb2 type_info_name = flink_fn_execution_pb2.TypeInfo type_name = type_info.type_name if type_name == type_info_name.PICKLED_BYTES: return PickleDataConverter() elif type_name == type_info_name.ROW: return RowDataConverter( [from_type_info_proto(f.field_type) for f in type_info.row_type_info.fields], [f.field_name for f in type_info.row_type_info.fields]) elif type_name == type_info_name.TUPLE: return TupleDataConverter( [from_type_info_proto(field_type) for field_type in type_info.tuple_type_info.field_types]) elif type_name in (type_info_name.BASIC_ARRAY, type_info_name.OBJECT_ARRAY): return ArrayDataConverter(from_type_info_proto(type_info.collection_element_type)) elif type_info == type_info_name.LIST: return ListDataConverter(from_type_info_proto(type_info.collection_element_type)) elif type_name == type_info_name.MAP: return DictDataConverter(from_type_info_proto(type_info.map_type_info.key_type), from_type_info_proto(type_info.map_type_info.value_type)) return IdentityDataConverter() def from_schema_proto(schema, one_arg_optimized=False): field_converters = [from_field_type_proto(f.type) for f in schema.fields] if one_arg_optimized and len(field_converters) == 1: return field_converters[0] else: return FlattenRowDataConverter(field_converters) def from_field_type_proto(field_type): from pyflink.fn_execution import flink_fn_execution_pb2 schema_type_name = flink_fn_execution_pb2.Schema type_name = field_type.type_name if type_name == schema_type_name.ROW: return RowDataConverter( [from_field_type_proto(f.type) for f in field_type.row_schema.fields], [f.name for f in field_type.row_schema.fields]) elif type_name == schema_type_name.BASIC_ARRAY: return ArrayDataConverter(from_field_type_proto(field_type.collection_element_type)) elif type_name == schema_type_name.MAP: return DictDataConverter(from_field_type_proto(field_type.map_info.key_type), from_field_type_proto(field_type.map_info.value_type)) return IdentityDataConverter() def from_type_info(type_info: TypeInformation): if isinstance(type_info, (PickledBytesTypeInfo, RowTypeInfo, TupleTypeInfo)): return PickleDataConverter() elif isinstance(type_info, (PrimitiveArrayTypeInfo, BasicArrayTypeInfo, ObjectArrayTypeInfo)): return ArrayDataConverter(from_type_info(type_info._element_type)) elif isinstance(type_info, ListTypeInfo): return ListDataConverter(from_type_info(type_info.elem_type)) elif isinstance(type_info, MapTypeInfo): return DictDataConverter(from_type_info(type_info._key_type_info), from_type_info(type_info._value_type_info)) return IdentityDataConverter()