in kafka-connector/src/main/java/com/google/pubsublite/kafka/sink/Schemas.java [72:116]
/**
 * Converts a Kafka Connect value into a protobuf {@link Value}, driven by the value's schema.
 *
 * <p>Mapping performed by this method:
 *
 * <ul>
 *   <li>{@code null} objects become a protobuf {@code NULL_VALUE}, regardless of schema.
 *   <li>INT8/INT16/INT32/INT64/FLOAT32/FLOAT64 are cast to {@link Number} and passed to
 *       {@code toValue}.
 *   <li>BOOLEAN becomes a bool value.
 *   <li>STRING becomes a string value using the object's {@code toString()}.
 *   <li>BYTES are obtained through {@code extractBytes} and emitted as a Base64 string value.
 *   <li>ARRAY becomes a list value whose elements are encoded recursively with the array's
 *       value schema.
 *   <li>MAP becomes a struct value; keys are rendered with {@code stringRep} using the key
 *       schema and values are encoded recursively with the value schema.
 *   <li>STRUCT becomes a struct value with one entry per schema field, keyed by field name.
 * </ul>
 *
 * @param schema the Connect schema describing {@code object}; may be null, in which case the
 *     type is whatever {@code safeSchemaType} reports for a null schema
 * @param object the Connect value to encode; may be null
 * @return the protobuf representation of {@code object}
 * @throws DataException if the schema type is not one of the types handled above
 * @throws ClassCastException if {@code object} is not of the Java type the schema implies
 */
static Value encode(@Nullable Schema schema, Object object) {
  // Optional fields, array elements, map values and tombstones may all be null in Connect;
  // without this guard every branch below would fail on a null dereference or unboxing.
  if (object == null) {
    return Value.newBuilder().setNullValue(com.google.protobuf.NullValue.NULL_VALUE).build();
  }
  switch (safeSchemaType(schema)) {
    case INT8:
    case INT16:
    case INT32:
    case INT64:
    case FLOAT32:
    case FLOAT64:
      return toValue((Number) object);
    case BOOLEAN:
      return Value.newBuilder().setBoolValue((Boolean) object).build();
    case STRING:
      return Value.newBuilder().setStringValue(object.toString()).build();
    case BYTES: {
      // protobuf Value has no bytes variant, so the payload is carried as Base64 text.
      ByteString bytes = extractBytes(object);
      return Value.newBuilder()
          .setStringValue(Base64.getEncoder().encodeToString(bytes.toByteArray()))
          .build();
    }
    case ARRAY: {
      Schema elementSchema = schema.valueSchema();
      ListValue.Builder listBuilder = ListValue.newBuilder();
      for (Object element : (List<?>) object) {
        listBuilder.addValues(encode(elementSchema, element));
      }
      return Value.newBuilder().setListValue(listBuilder).build();
    }
    case MAP: {
      Schema keySchema = schema.keySchema();
      Schema valueSchema = schema.valueSchema();
      Struct.Builder builder = Struct.newBuilder();
      // Iterate entries directly rather than looking each key up a second time.
      for (Map.Entry<?, ?> entry : ((Map<?, ?>) object).entrySet()) {
        builder.putFields(
            stringRep(keySchema, entry.getKey()), encode(valueSchema, entry.getValue()));
      }
      return Value.newBuilder().setStructValue(builder).build();
    }
    case STRUCT: {
      Struct.Builder builder = Struct.newBuilder();
      org.apache.kafka.connect.data.Struct struct =
          (org.apache.kafka.connect.data.Struct) object;
      for (Field f : schema.fields()) {
        builder.putFields(f.name(), encode(f.schema(), struct.get(f)));
      }
      return Value.newBuilder().setStructValue(builder).build();
    }
  }
  throw new DataException("Invalid schema type.");
}