private Schema getSchema()

in plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java [270:331]


    private Schema getSchema(Object value) {
        Objects.requireNonNull(value);

        if (value instanceof PlcValue) {
            value = ((PlcValue) value).getObject();
        }

        if (value instanceof List) {
            List list = (List) value;
            if (list.isEmpty()) {
                throw new ConnectException("Unsupported empty lists.");
            }
            // In PLC4X list elements all contain the same type.
            Object firstElement = list.get(0);
            Schema elementSchema = getSchema(firstElement);
            return SchemaBuilder.array(elementSchema).build();
        }
        if (value instanceof BigInteger) {
            // no support yet
        }
        if (value instanceof BigDecimal) {
            // no support yet
        }
        if (value instanceof Boolean) {
            return Schema.OPTIONAL_BOOLEAN_SCHEMA;
        }
        if (value instanceof byte[]) {
            return Schema.OPTIONAL_BYTES_SCHEMA;
        }
        if (value instanceof Byte) {
            return Schema.OPTIONAL_INT8_SCHEMA;
        }
        if (value instanceof Double) {
            return Schema.OPTIONAL_FLOAT64_SCHEMA;
        }
        if (value instanceof Float) {
            return Schema.OPTIONAL_FLOAT32_SCHEMA;
        }
        if (value instanceof Integer) {
            return Schema.OPTIONAL_INT32_SCHEMA;
        }
        if (value instanceof LocalDate) {
            return Date.builder().optional().build();
        }
        if (value instanceof LocalDateTime) {
            return Timestamp.builder().optional().build();
        }
        if (value instanceof LocalTime) {
            return Time.builder().optional().build();
        }
        if (value instanceof Long) {
            return Schema.OPTIONAL_INT64_SCHEMA;
        }
        if (value instanceof Short) {
            return Schema.OPTIONAL_INT16_SCHEMA;
        }
        if (value instanceof String) {
            return Schema.OPTIONAL_STRING_SCHEMA;
        }
        // TODO: add support for collective and complex types
        throw new ConnectException(String.format("Unsupported data type %s", value.getClass().getName()));
    }