in kafka-connector/src/main/java/com/google/pubsublite/kafka/sink/Schemas.java [50:69]
static ByteString encodeToBytes(@Nullable Schema schema, Object object) {
switch (safeSchemaType(schema)) {
case INT8:
case INT16:
case INT32:
case INT64:
case FLOAT32:
case FLOAT64:
case BOOLEAN:
case STRING:
return ByteString.copyFromUtf8(stringRep(schema, object));
case BYTES:
return extractBytes(object);
case ARRAY:
case MAP:
case STRUCT:
return encode(schema, object).toByteString();
}
throw new DataException("Invalid schema type.");
}