in pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java [176:295]
/**
 * Extracts the Elasticsearch document id and the document body from a record.
 *
 * <p>When schema handling is enabled in the sink configuration:
 * <ul>
 *   <li>for a {@link KeyValueSchema} record, key and value (and their schemas) are taken from the
 *       {@link KeyValue} payload; otherwise the key is the record key and the value is obtained
 *       through {@code getGenericObjectFromRecord};</li>
 *   <li>the id is derived from the key unless {@code keyIgnore} is set or the key is absent;</li>
 *   <li>the document is the stringified value, optionally merged with the key fields when
 *       {@code copyKeyFields} is set and the key schema is AVRO or JSON;</li>
 *   <li>when {@code primaryFields} is configured and a document exists, the id is replaced by the
 *       one computed from those fields of the document;</li>
 *   <li>the id is finally hashed and base64-encoded when an id hashing algorithm other than
 *       {@code NONE} is configured.</li>
 * </ul>
 *
 * <p>When schema handling is disabled, the document is the raw message payload decoded as UTF-8
 * (or the stringified record value if the record carries no message) and the id is the record key,
 * unless {@code keyIgnore} is set.
 *
 * @param record the record to convert
 * @return a pair whose left element is the document id (may be {@code null}) and whose right
 *         element is the document after {@code sanitizeValue} has been applied
 * @throws JsonProcessingException if the document cannot be parsed as JSON while extracting the
 *         primary fields (helper methods called here may presumably raise it as well)
 * @throws IllegalArgumentException if schema handling is disabled and the record yields no data
 * @throws UnsupportedOperationException if the configured id hashing algorithm is not handled here
 */
public Pair<String, String> extractIdAndDocument(Record<GenericObject> record) throws JsonProcessingException {
    if (elasticSearchConfig.isSchemaEnable()) {
        Object key = null;
        GenericObject value = null;
        Schema<?> keySchema = null;
        Schema<?> valueSchema = null;
        // instanceof is already false for a null schema, so no separate null check is needed.
        if (record.getSchema() instanceof KeyValueSchema) {
            // The raw cast is deliberate: Schema<GenericObject> cannot be cast directly to a
            // parameterized KeyValueSchema.
            KeyValueSchema<GenericObject, GenericObject> keyValueSchema = (KeyValueSchema) record.getSchema();
            keySchema = keyValueSchema.getKeySchema();
            valueSchema = keyValueSchema.getValueSchema();
            KeyValue<GenericObject, GenericObject> keyValue =
                    (KeyValue<GenericObject, GenericObject>) record.getValue().getNativeObject();
            key = keyValue.getKey();
            value = keyValue.getValue();
        } else {
            // Not a key/value record: there is no key schema, keySchema stays null.
            key = record.getKey().orElse(null);
            valueSchema = record.getSchema();
            value = getGenericObjectFromRecord(record);
        }

        String id = null;
        if (!elasticSearchConfig.isKeyIgnore() && key != null) {
            if (keySchema != null) {
                id = stringifyKey(keySchema, key);
            } else {
                id = key.toString();
            }
        }

        String doc = null;
        if (value != null) {
            if (valueSchema != null) {
                // keySchema is null for records without a KeyValueSchema; without this guard,
                // enabling copyKeyFields made every such record fail with a NullPointerException.
                if (elasticSearchConfig.isCopyKeyFields()
                        && keySchema != null
                        && (keySchema.getSchemaInfo().getType().equals(SchemaType.AVRO)
                        || keySchema.getSchemaInfo().getType().equals(SchemaType.JSON))) {
                    JsonNode keyNode = extractJsonNode(keySchema, key);
                    JsonNode valueNode = extractJsonNode(valueSchema, value);
                    doc = stringify(JsonConverter.topLevelMerge(keyNode, valueNode));
                } else {
                    doc = stringifyValue(valueSchema, value);
                }
            } else {
                if (value.getNativeObject() instanceof byte[]) {
                    // for BWC with the ES-Sink
                    doc = new String((byte[]) value.getNativeObject(), StandardCharsets.UTF_8);
                } else {
                    doc = value.getNativeObject().toString();
                }
            }
        }

        if (doc != null && primaryFields != null) {
            try {
                // extract the PK from the JSON document; this overrides any id taken from the key
                JsonNode jsonNode = objectMapper.readTree(doc);
                id = stringifyKey(jsonNode, primaryFields);
            } catch (JsonProcessingException e) {
                log.error("Failed to read JSON", e);
                throw e;
            }
        }

        final ElasticSearchConfig.IdHashingAlgorithm idHashingAlgorithm =
                elasticSearchConfig.getIdHashingAlgorithm();
        if (id != null
                && idHashingAlgorithm != null
                && idHashingAlgorithm != ElasticSearchConfig.IdHashingAlgorithm.NONE) {
            final byte[] idBytes = id.getBytes(StandardCharsets.UTF_8);
            // With conditional hashing, ids of at most 512 bytes are kept verbatim.
            // NOTE(review): presumably 512 mirrors the Elasticsearch _id size limit - confirm.
            boolean performHashing = true;
            if (elasticSearchConfig.isConditionalIdHashing() && idBytes.length <= 512) {
                performHashing = false;
            }
            if (performHashing) {
                Hasher hasher;
                switch (idHashingAlgorithm) {
                    case SHA256:
                        hasher = Hashing.sha256().newHasher();
                        break;
                    case SHA512:
                        hasher = Hashing.sha512().newHasher();
                        break;
                    default:
                        throw new UnsupportedOperationException("Unsupported IdHashingAlgorithm: "
                                + idHashingAlgorithm);
                }
                hasher.putBytes(idBytes);
                id = base64Encoder.encodeToString(hasher.hash().asBytes());
            }
        }

        if (log.isDebugEnabled()) {
            SchemaType schemaType = null;
            if (record.getSchema() != null && record.getSchema().getSchemaInfo() != null) {
                schemaType = record.getSchema().getSchemaInfo().getType();
            }
            // Logged before sanitizeValue is applied, so this shows the unsanitized document.
            log.debug("recordType={} schemaType={} id={} doc={}",
                    record.getClass().getName(),
                    schemaType,
                    id,
                    doc);
        }
        doc = sanitizeValue(doc);
        return Pair.of(id, doc);
    } else {
        // Schema handling disabled: prefer the raw message bytes, otherwise stringify the value.
        Message<?> message = record.getMessage().orElse(null);
        final String rawData;
        if (message != null) {
            rawData = new String(message.getData(), StandardCharsets.UTF_8);
        } else {
            GenericObject recordObject = getGenericObjectFromRecord(record);
            rawData = stringifyValue(record.getSchema(), recordObject);
        }
        if (rawData == null || rawData.isEmpty()) {
            throw new IllegalArgumentException("Record does not carry message information.");
        }
        String key = elasticSearchConfig.isKeyIgnore() ? null : record.getKey().map(Object::toString).orElse(null);
        return Pair.of(key, sanitizeValue(rawData));
    }
}