in flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java [90:160]
/**
 * Creates the {@link IndexGenerator} that resolves a dynamic index name for each row.
 *
 * <p>The dynamic pattern reported by {@code indexHelper} is cut out of {@code index}; the text
 * before and after it is kept as a fixed prefix and suffix. What is placed between them
 * depends on the kind of pattern the helper detects:
 *
 * <ul>
 *   <li>system-time pattern: the current date-time in {@code localTimeZoneId}, formatted with
 *       the generator's date-time formatter;
 *   <li>field pattern with a date format: the referenced field, rendered by the format function
 *       created for the field's type;
 *   <li>any other field pattern: the string form of the referenced field.
 * </ul>
 *
 * <p>In both field-based cases a {@code null} field value is rendered as the literal text
 * {@code "null"}.
 *
 * @param index the configured index string containing the dynamic pattern
 * @param fieldNames names of the table fields, used to locate the referenced field
 * @param fieldTypes data types of the table fields, positionally aligned with the lookup result
 * @param indexHelper helper used to parse and validate the index pattern
 * @param localTimeZoneId time zone used for system-time patterns and for the format function
 * @return a generator producing {@code prefix + dynamic part + suffix} for each row
 */
private static IndexGenerator createRuntimeIndexGenerator(
        String index,
        String[] fieldNames,
        DataType[] fieldTypes,
        IndexHelper indexHelper,
        ZoneId localTimeZoneId) {
    // Split the configured index around the dynamic pattern.
    final String dynamicPattern = indexHelper.extractDynamicIndexPatternStr(index);
    final int patternStart = index.indexOf(dynamicPattern);
    final String indexPrefix = index.substring(0, patternStart);
    final String indexSuffix = index.substring(patternStart + dynamicPattern.length());

    // Case 1: the dynamic part is the current system time.
    if (indexHelper.checkIsDynamicIndexWithSystemTimeFormat(index)) {
        final String systemTimeFormat =
                indexHelper.extractDateFormat(
                        index, LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
        return new AbstractTimeIndexGenerator(index, systemTimeFormat) {
            @Override
            public String generate(RowData row) {
                final String formattedNow =
                        LocalDateTime.now(localTimeZoneId).format(dateTimeFormatter);
                return indexPrefix.concat(formattedNow).concat(indexSuffix);
            }
        };
    }

    // Cases 2 and 3 both read a field from the row: locate it and validate its type.
    final boolean hasFieldFormat = indexHelper.checkIsDynamicIndexWithFormat(index);
    final int fieldPos = indexHelper.extractIndexFieldPos(index, fieldNames, hasFieldFormat);
    final LogicalType fieldType = fieldTypes[fieldPos].getLogicalType();
    final LogicalTypeRoot fieldTypeRoot = fieldType.getTypeRoot();
    indexHelper.validateIndexFieldType(fieldTypeRoot);

    final RowData.FieldGetter fieldGetter = RowData.createFieldGetter(fieldType, fieldPos);

    // Case 2: the field is rendered with a date-time format.
    if (hasFieldFormat) {
        final String fieldDateFormat = indexHelper.extractDateFormat(index, fieldTypeRoot);
        final DynamicFormatter formatFunction =
                createFormatFunction(fieldType, fieldTypeRoot, localTimeZoneId);
        return new AbstractTimeIndexGenerator(index, fieldDateFormat) {
            @Override
            public String generate(RowData row) {
                final Object value = fieldGetter.getFieldOrNull(row);
                // TODO we can possibly optimize it to use the nullability of the field
                final String rendered =
                        value == null
                                ? "null"
                                : formatFunction.format(value, dateTimeFormatter);
                return indexPrefix.concat(rendered).concat(indexSuffix);
            }
        };
    }

    // Case 3: the field is rendered through its plain string form.
    return new IndexGeneratorBase(index) {
        @Override
        public String generate(RowData row) {
            final Object value = fieldGetter.getFieldOrNull(row);
            // String.valueOf(Object) yields "null" for a null reference, toString() otherwise.
            final String rendered = String.valueOf(value);
            return indexPrefix.concat(rendered).concat(indexSuffix);
        }
    };
}