flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormat.java [366:391]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        private void addSchemaToConf(JobConf jobConf) {
            // set columns/types -- including partition cols
            List<String> typeStrs =
                    Arrays.stream(fieldTypes)
                            .map(t -> HiveTypeUtil.toHiveTypeInfo(t, true).toString())
                            .collect(Collectors.toList());
            jobConf.set(IOConstants.COLUMNS, String.join(",", fieldNames));
            jobConf.set(IOConstants.COLUMNS_TYPES, String.join(",", typeStrs));
            // set schema evolution -- excluding partition cols
            int numNonPartCol = fieldNames.length - partitionKeys.size();
            jobConf.set(
                    SCHEMA_EVOLUTION_COLUMNS,
                    String.join(",", Arrays.copyOfRange(fieldNames, 0, numNonPartCol)));
            jobConf.set(
                    SCHEMA_EVOLUTION_COLUMNS_TYPES,
                    String.join(",", typeStrs.subList(0, numNonPartCol)));

            // in older Hive versions, the parquet reader also expects the selected column
            // indices in the conf, excluding partition cols
            String readColIDs =
                    Arrays.stream(selectedFields)
                            .filter(i -> i < numNonPartCol)
                            .mapToObj(String::valueOf)
                            .collect(Collectors.joining(","));
            jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, readColIDs);
        }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java [168:193]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    private void addSchemaToConf(JobConf jobConf) {
        // set columns/types -- including partition cols
        List<String> typeStrs =
                Arrays.stream(fieldTypes)
                        .map(t -> HiveTypeUtil.toHiveTypeInfo(t, true).toString())
                        .collect(Collectors.toList());
        jobConf.set(IOConstants.COLUMNS, String.join(",", fieldNames));
        jobConf.set(IOConstants.COLUMNS_TYPES, String.join(",", typeStrs));
        // set schema evolution -- excluding partition cols
        int numNonPartCol = fieldNames.length - partitionKeys.size();
        jobConf.set(
                SCHEMA_EVOLUTION_COLUMNS,
                String.join(",", Arrays.copyOfRange(fieldNames, 0, numNonPartCol)));
        jobConf.set(
                SCHEMA_EVOLUTION_COLUMNS_TYPES,
                String.join(",", typeStrs.subList(0, numNonPartCol)));

        // in older Hive versions, the parquet reader also expects the selected column
        // indices in the conf, excluding partition cols
        String readColIDs =
                Arrays.stream(selectedFields)
                        .filter(i -> i < numNonPartCol)
                        .mapToObj(String::valueOf)
                        .collect(Collectors.joining(","));
        jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, readColIDs);
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
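The two excerpts above are identical: each publishes the table schema to the Hadoop JobConf so the underlying Hive readers can reconstruct it, and each assumes partition columns are laid out after the data columns in fieldNames. The standalone sketch below reproduces that projection logic for a hypothetical three-column table; the column names, types, and literal conf key strings are illustrative assumptions standing in for Hive's IOConstants / ColumnProjectionUtils constants, and a plain Map replaces the JobConf.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Standalone sketch of the addSchemaToConf projection logic. The table layout,
 * column names, and the literal conf keys are illustrative assumptions; the
 * real methods write Hive's IOConstants / ColumnProjectionUtils keys onto a
 * Hadoop JobConf.
 */
public class AddSchemaToConfSketch {

    public static void main(String[] args) {
        // Hypothetical table: two data columns followed by one partition column.
        // The connector lays out fieldNames with partition columns at the end.
        String[] fieldNames = {"id", "name", "dt"};
        String[] typeStrs = {"bigint", "string", "string"};
        List<String> partitionKeys = List.of("dt");
        // Projection pushed down by the planner: read "id" and the partition col "dt".
        int[] selectedFields = {0, 2};

        Map<String, String> conf = new HashMap<>();

        // Full schema -- including partition columns.
        conf.put("columns", String.join(",", fieldNames));
        conf.put("columns.types", String.join(",", typeStrs));

        // Schema-evolution columns -- excluding partition columns, which are not
        // stored in the data files.
        int numNonPartCol = fieldNames.length - partitionKeys.size();
        conf.put("schema.evolution.columns",
                String.join(",", Arrays.copyOfRange(fieldNames, 0, numNonPartCol)));
        conf.put("schema.evolution.columns.types",
                String.join(",", Arrays.copyOfRange(typeStrs, 0, numNonPartCol)));

        // Selected column indices for older parquet readers -- partition column
        // indices (>= numNonPartCol) are filtered out.
        String readColIDs = Arrays.stream(selectedFields)
                .filter(i -> i < numNonPartCol)
                .mapToObj(String::valueOf)
                .collect(Collectors.joining(","));
        conf.put("hive.io.file.readcolumn.ids", readColIDs);

        conf.forEach((k, v) -> System.out.println(k + " = " + v));
        // readcolumn.ids ends up as "0": index 2 (the partition column) is dropped.
    }
}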



