in plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java [270:331]
private Schema getSchema(Object value) {
Objects.requireNonNull(value);
if (value instanceof PlcValue) {
value = ((PlcValue) value).getObject();
}
if (value instanceof List) {
List list = (List) value;
if (list.isEmpty()) {
throw new ConnectException("Unsupported empty lists.");
}
// In PLC4X list elements all contain the same type.
Object firstElement = list.get(0);
Schema elementSchema = getSchema(firstElement);
return SchemaBuilder.array(elementSchema).build();
}
if (value instanceof BigInteger) {
// no support yet
}
if (value instanceof BigDecimal) {
// no support yet
}
if (value instanceof Boolean) {
return Schema.OPTIONAL_BOOLEAN_SCHEMA;
}
if (value instanceof byte[]) {
return Schema.OPTIONAL_BYTES_SCHEMA;
}
if (value instanceof Byte) {
return Schema.OPTIONAL_INT8_SCHEMA;
}
if (value instanceof Double) {
return Schema.OPTIONAL_FLOAT64_SCHEMA;
}
if (value instanceof Float) {
return Schema.OPTIONAL_FLOAT32_SCHEMA;
}
if (value instanceof Integer) {
return Schema.OPTIONAL_INT32_SCHEMA;
}
if (value instanceof LocalDate) {
return Date.builder().optional().build();
}
if (value instanceof LocalDateTime) {
return Timestamp.builder().optional().build();
}
if (value instanceof LocalTime) {
return Time.builder().optional().build();
}
if (value instanceof Long) {
return Schema.OPTIONAL_INT64_SCHEMA;
}
if (value instanceof Short) {
return Schema.OPTIONAL_INT16_SCHEMA;
}
if (value instanceof String) {
return Schema.OPTIONAL_STRING_SCHEMA;
}
// TODO: add support for collective and complex types
throw new ConnectException(String.format("Unsupported data type %s", value.getClass().getName()));
}