public Pair extractIdAndDocument()

in pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java [176:295]


    /**
     * Derives the Elasticsearch document id and document body from a record.
     *
     * <p>When schema handling is enabled in the sink configuration:
     * <ul>
     *   <li>If the record schema is a {@link KeyValueSchema}, key and value (and their schemas) are
     *       taken from the {@link KeyValue} payload; otherwise the record key and the record's
     *       generic object are used and no key schema is available.</li>
     *   <li>The id is the stringified key, unless keys are ignored by configuration or the key is
     *       absent.</li>
     *   <li>The document is the stringified value; when {@code copyKeyFields} is enabled and the key
     *       schema is AVRO or JSON, the key fields are merged into the value at the top level.</li>
     *   <li>When {@code primaryFields} is set and a document was produced, the id is replaced by
     *       the one extracted from the document's primary fields.</li>
     *   <li>The id is optionally hashed (SHA-256 or SHA-512, then encoded with
     *       {@code base64Encoder}) according to the configured hashing algorithm.</li>
     * </ul>
     *
     * <p>When schema handling is disabled, the document is the raw message payload decoded as UTF-8
     * (or the stringified record value if the record carries no message), and the id is the record
     * key unless keys are ignored.
     *
     * @param record the incoming record
     * @return a pair of (id, document); in schema-aware mode the id may be {@code null}, and the
     *         document is whatever {@code sanitizeValue} returns for a possibly {@code null} input
     * @throws JsonProcessingException if the document cannot be parsed as JSON while extracting
     *         the primary fields
     * @throws IllegalArgumentException if schema handling is disabled and the record yields a
     *         {@code null} or empty payload
     * @throws UnsupportedOperationException if the configured id hashing algorithm is neither
     *         SHA256 nor SHA512 (and not NONE)
     */
    public Pair<String, String> extractIdAndDocument(Record<GenericObject> record) throws JsonProcessingException {
        if (elasticSearchConfig.isSchemaEnable()) {
            Object key = null;
            GenericObject value = null;
            Schema<?> keySchema = null;
            Schema<?> valueSchema = null;
            if (record.getSchema() != null && record.getSchema() instanceof KeyValueSchema) {
                KeyValueSchema<GenericObject, GenericObject> keyValueSchema = (KeyValueSchema) record.getSchema();
                keySchema = keyValueSchema.getKeySchema();
                valueSchema = keyValueSchema.getValueSchema();
                KeyValue<GenericObject, GenericObject> keyValue =
                        (KeyValue<GenericObject, GenericObject>) record.getValue().getNativeObject();
                key = keyValue.getKey();
                value = keyValue.getValue();
            } else {
                // Not a KeyValue record: there is no key schema, only the plain record key.
                key = record.getKey().orElse(null);
                valueSchema = record.getSchema();
                value = getGenericObjectFromRecord(record);
            }

            String id = null;
            if (!elasticSearchConfig.isKeyIgnore() && key != null) {
                if (keySchema != null) {
                    id = stringifyKey(keySchema, key);
                } else {
                    id = key.toString();
                }
            }

            String doc = null;
            if (value != null) {
                if (valueSchema != null) {
                    // Key fields can only be merged when a structured (AVRO/JSON) key schema exists.
                    // keySchema is null for non-KeyValue records, so it must be checked before being
                    // dereferenced; otherwise copyKeyFields=true would fail with a NullPointerException.
                    boolean mergeKeyFields = false;
                    if (elasticSearchConfig.isCopyKeyFields() && keySchema != null) {
                        SchemaType keySchemaType = keySchema.getSchemaInfo().getType();
                        mergeKeyFields = SchemaType.AVRO.equals(keySchemaType)
                                || SchemaType.JSON.equals(keySchemaType);
                    }
                    if (mergeKeyFields) {
                        JsonNode keyNode = extractJsonNode(keySchema, key);
                        JsonNode valueNode = extractJsonNode(valueSchema, value);
                        doc = stringify(JsonConverter.topLevelMerge(keyNode, valueNode));
                    } else {
                        doc = stringifyValue(valueSchema, value);
                    }
                } else {
                    if (value.getNativeObject() instanceof byte[]) {
                        // for BWC with the ES-Sink
                        doc = new String((byte[]) value.getNativeObject(), StandardCharsets.UTF_8);
                    } else {
                        doc = value.getNativeObject().toString();
                    }
                }
            }

            if (doc != null && primaryFields != null) {
                try {
                    // extract the PK from the JSON document; this overrides any key-derived id
                    JsonNode jsonNode = objectMapper.readTree(doc);
                    id = stringifyKey(jsonNode, primaryFields);
                } catch (JsonProcessingException e) {
                    log.error("Failed to read JSON", e);
                    throw e;
                }
            }

            final ElasticSearchConfig.IdHashingAlgorithm idHashingAlgorithm =
                    elasticSearchConfig.getIdHashingAlgorithm();
            if (id != null
                    && idHashingAlgorithm != null
                    && idHashingAlgorithm != ElasticSearchConfig.IdHashingAlgorithm.NONE) {
                final byte[] idBytes = id.getBytes(StandardCharsets.UTF_8);

                // With conditional hashing enabled, ids of at most 512 UTF-8 bytes are kept as-is.
                boolean performHashing = true;
                if (elasticSearchConfig.isConditionalIdHashing() && idBytes.length <= 512) {
                    performHashing = false;
                }
                if (performHashing) {
                    Hasher hasher;
                    switch (idHashingAlgorithm) {
                        case SHA256:
                            hasher = Hashing.sha256().newHasher();
                            break;
                        case SHA512:
                            hasher = Hashing.sha512().newHasher();
                            break;
                        default:
                            throw new UnsupportedOperationException("Unsupported IdHashingAlgorithm: "
                                    + idHashingAlgorithm);
                    }
                    hasher.putBytes(idBytes);
                    id = base64Encoder.encodeToString(hasher.hash().asBytes());
                }
            }

            if (log.isDebugEnabled()) {
                SchemaType schemaType = null;
                if (record.getSchema() != null && record.getSchema().getSchemaInfo() != null) {
                    schemaType = record.getSchema().getSchemaInfo().getType();
                }
                log.debug("recordType={} schemaType={} id={} doc={}",
                        record.getClass().getName(),
                        schemaType,
                        id,
                        doc);
            }
            doc = sanitizeValue(doc);
            return Pair.of(id, doc);
        } else {
            // Schema handling disabled: index the raw payload as-is.
            Message message = record.getMessage().orElse(null);
            final String rawData;
            if (message != null) {
                rawData = new String(message.getData(), StandardCharsets.UTF_8);
            } else {
                GenericObject recordObject = getGenericObjectFromRecord(record);
                rawData = stringifyValue(record.getSchema(), recordObject);
            }
            if (rawData == null || rawData.length() == 0) {
                throw new IllegalArgumentException("Record does not carry message information.");
            }
            String key = elasticSearchConfig.isKeyIgnore() ? null : record.getKey().map(Object::toString).orElse(null);
            return Pair.of(key, sanitizeValue(rawData));
        }
    }