flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java [90:160]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    /**
     * Builds the {@link IndexGenerator} used at runtime for an index name that contains a dynamic
     * pattern.
     *
     * <p>The index string is cut around the dynamic pattern into a constant prefix and a constant
     * suffix; every generated index name is {@code prefix + resolvedPattern + suffix}. How the
     * pattern is resolved depends on what {@code indexHelper} reports about the index:
     *
     * <ol>
     *   <li>system-time pattern: the current date-time in {@code localTimeZoneId}, formatted with
     *       the generator's date-time formatter;
     *   <li>field pattern with a date format: the value of the referenced field of each row, run
     *       through the format function created for the field's type;
     *   <li>plain field pattern: the {@code toString()} of the referenced field of each row.
     * </ol>
     *
     * <p>In the two field-based cases a {@code null} field value is rendered as the literal string
     * {@code "null"}.
     *
     * @param index the configured index string containing the dynamic pattern
     * @param fieldNames field names handed to {@code indexHelper} to locate the referenced field
     * @param fieldTypes field types, indexed by the position {@code indexHelper} resolves
     * @param indexHelper helper used to parse and validate the index pattern
     * @param localTimeZoneId time zone used for the system-time pattern and passed on to the
     *     format function
     * @return a generator that produces the concrete index name for a row
     */
    private static IndexGenerator createRuntimeIndexGenerator(
            String index,
            String[] fieldNames,
            DataType[] fieldTypes,
            IndexHelper indexHelper,
            ZoneId localTimeZoneId) {
        // Split the index around its dynamic part: <prefix><pattern><suffix>.
        final String pattern = indexHelper.extractDynamicIndexPatternStr(index);
        final int patternStart = index.indexOf(pattern);
        final String prefix = index.substring(0, patternStart);
        final String suffix = index.substring(patternStart + pattern.length());

        // Case 1: the pattern refers to the system time rather than to a field.
        if (indexHelper.checkIsDynamicIndexWithSystemTimeFormat(index)) {
            final String systemTimeFormat =
                    indexHelper.extractDateFormat(
                            index, LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
            return new AbstractTimeIndexGenerator(index, systemTimeFormat) {
                @Override
                public String generate(RowData row) {
                    // dateTimeFormatter is not declared here; presumably provided by
                    // AbstractTimeIndexGenerator.
                    final String now =
                            LocalDateTime.now(localTimeZoneId).format(dateTimeFormatter);
                    return prefix.concat(now).concat(suffix);
                }
            };
        }

        // Cases 2 and 3: the pattern refers to a field; resolve and validate it first.
        final boolean hasDateFormat = indexHelper.checkIsDynamicIndexWithFormat(index);
        final int fieldPos = indexHelper.extractIndexFieldPos(index, fieldNames, hasDateFormat);
        final LogicalType fieldType = fieldTypes[fieldPos].getLogicalType();
        final LogicalTypeRoot fieldTypeRoot = fieldType.getTypeRoot();

        // validate index field type
        indexHelper.validateIndexFieldType(fieldTypeRoot);

        final RowData.FieldGetter getter = RowData.createFieldGetter(fieldType, fieldPos);

        // Case 3: general dynamic index pattern, no date format attached to the field.
        if (!hasDateFormat) {
            return new IndexGeneratorBase(index) {
                @Override
                public String generate(RowData row) {
                    final Object value = getter.getFieldOrNull(row);
                    if (value == null) {
                        return prefix.concat("null").concat(suffix);
                    }
                    return prefix.concat(value.toString()).concat(suffix);
                }
            };
        }

        // Case 2: time extract dynamic index pattern.
        final String fieldDateFormat = indexHelper.extractDateFormat(index, fieldTypeRoot);
        final DynamicFormatter formatter =
                createFormatFunction(fieldType, fieldTypeRoot, localTimeZoneId);

        return new AbstractTimeIndexGenerator(index, fieldDateFormat) {
            @Override
            public String generate(RowData row) {
                final Object value = getter.getFieldOrNull(row);
                // TODO we can possibly optimize it to use the nullability of the field
                final String rendered =
                        value == null ? "null" : formatter.format(value, dateTimeFormatter);
                return prefix.concat(rendered).concat(suffix);
            }
        };
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/IndexGeneratorFactory.java [93:163]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    /**
     * Builds the {@link IndexGenerator} used at runtime for an index name that contains a dynamic
     * pattern.
     *
     * <p>The index string is cut around the dynamic pattern into a constant prefix and a constant
     * suffix; every generated index name is {@code prefix + resolvedPattern + suffix}. How the
     * pattern is resolved depends on what {@code indexHelper} reports about the index:
     *
     * <ol>
     *   <li>system-time pattern: the current date-time in {@code localTimeZoneId}, formatted with
     *       the generator's date-time formatter;
     *   <li>field pattern with a date format: the value of the referenced field of each row, run
     *       through the format function created for the field's type;
     *   <li>plain field pattern: the {@code toString()} of the referenced field of each row.
     * </ol>
     *
     * <p>In the two field-based cases a {@code null} field value is rendered as the literal string
     * {@code "null"}.
     *
     * @param index the configured index string containing the dynamic pattern
     * @param fieldNames field names handed to {@code indexHelper} to locate the referenced field
     * @param fieldTypes field types, indexed by the position {@code indexHelper} resolves
     * @param indexHelper helper used to parse and validate the index pattern
     * @param localTimeZoneId time zone used for the system-time pattern and passed on to the
     *     format function
     * @return a generator that produces the concrete index name for a row
     */
    private static IndexGenerator createRuntimeIndexGenerator(
            String index,
            String[] fieldNames,
            DataType[] fieldTypes,
            IndexHelper indexHelper,
            ZoneId localTimeZoneId) {
        // Split the index around its dynamic part: <prefix><pattern><suffix>.
        final String pattern = indexHelper.extractDynamicIndexPatternStr(index);
        final int patternStart = index.indexOf(pattern);
        final String prefix = index.substring(0, patternStart);
        final String suffix = index.substring(patternStart + pattern.length());

        // Case 1: the pattern refers to the system time rather than to a field.
        if (indexHelper.checkIsDynamicIndexWithSystemTimeFormat(index)) {
            final String systemTimeFormat =
                    indexHelper.extractDateFormat(
                            index, LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
            return new AbstractTimeIndexGenerator(index, systemTimeFormat) {
                @Override
                public String generate(RowData row) {
                    // dateTimeFormatter is not declared here; presumably provided by
                    // AbstractTimeIndexGenerator.
                    final String now =
                            LocalDateTime.now(localTimeZoneId).format(dateTimeFormatter);
                    return prefix.concat(now).concat(suffix);
                }
            };
        }

        // Cases 2 and 3: the pattern refers to a field; resolve and validate it first.
        final boolean hasDateFormat = indexHelper.checkIsDynamicIndexWithFormat(index);
        final int fieldPos = indexHelper.extractIndexFieldPos(index, fieldNames, hasDateFormat);
        final LogicalType fieldType = fieldTypes[fieldPos].getLogicalType();
        final LogicalTypeRoot fieldTypeRoot = fieldType.getTypeRoot();

        // validate index field type
        indexHelper.validateIndexFieldType(fieldTypeRoot);

        final RowData.FieldGetter getter = RowData.createFieldGetter(fieldType, fieldPos);

        // Case 3: general dynamic index pattern, no date format attached to the field.
        if (!hasDateFormat) {
            return new IndexGeneratorBase(index) {
                @Override
                public String generate(RowData row) {
                    final Object value = getter.getFieldOrNull(row);
                    if (value == null) {
                        return prefix.concat("null").concat(suffix);
                    }
                    return prefix.concat(value.toString()).concat(suffix);
                }
            };
        }

        // Case 2: time extract dynamic index pattern.
        final String fieldDateFormat = indexHelper.extractDateFormat(index, fieldTypeRoot);
        final DynamicFormatter formatter =
                createFormatFunction(fieldType, fieldTypeRoot, localTimeZoneId);

        return new AbstractTimeIndexGenerator(index, fieldDateFormat) {
            @Override
            public String generate(RowData row) {
                final Object value = getter.getFieldOrNull(row);
                // TODO we can possibly optimize it to use the nullability of the field
                final String rendered =
                        value == null ? "null" : formatter.format(value, dateTimeFormatter);
                return prefix.concat(rendered).concat(suffix);
            }
        };
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



