static Value encode()

in kafka-connector/src/main/java/com/google/pubsublite/kafka/sink/Schemas.java [72:116]


  /**
   * Converts a Kafka Connect value into a protobuf {@link Value}, recursing through container
   * types.
   *
   * <ul>
   *   <li>A {@code null} object is encoded as the protobuf null value.
   *   <li>INT8/INT16/INT32/INT64/FLOAT32/FLOAT64 are cast to {@link Number} and delegated to
   *       {@code toValue}.
   *   <li>BOOLEAN becomes a bool value; STRING becomes a string value via {@code toString()}.
   *   <li>BYTES is read through {@code extractBytes} and stored as a base64 string value.
   *   <li>ARRAY becomes a list value whose elements are encoded with the array's value schema.
   *   <li>MAP becomes a struct value; keys are rendered with {@code stringRep} using the key
   *       schema and values are encoded with the value schema.
   *   <li>STRUCT becomes a struct value with one entry per schema field, keyed by field name.
   * </ul>
   *
   * @param schema the Connect schema describing {@code object}; may be {@code null}, in which
   *     case the type is whatever {@code safeSchemaType} reports for a null schema
   * @param object the Connect value to encode; may be {@code null}
   * @return the protobuf representation of {@code object}
   * @throws DataException if the schema type is not one of the types handled above
   * @throws ClassCastException if {@code object} does not have the Java type implied by the
   *     schema type
   */
  static Value encode(@Nullable Schema schema, Object object) {
    if (object == null) {
      // Optional struct fields and nullable collection elements arrive here as null. Without this
      // guard the BOOLEAN branch fails on unboxing and the STRING branch on toString().
      return Value.newBuilder()
          .setNullValue(com.google.protobuf.NullValue.NULL_VALUE)
          .build();
    }
    // NOTE(review): the ARRAY, MAP and STRUCT branches dereference `schema`; presumably
    // safeSchemaType never reports those types for a null schema -- confirm against the helper.
    switch (safeSchemaType(schema)) {
      case INT8:
      case INT16:
      case INT32:
      case INT64:
      case FLOAT32:
      case FLOAT64:
        return toValue((Number) object);
      case BOOLEAN:
        return Value.newBuilder().setBoolValue((Boolean) object).build();
      case STRING:
        return Value.newBuilder().setStringValue(object.toString()).build();
      case BYTES:
        ByteString bytes = extractBytes(object);
        // Value has no bytes kind, so the payload is carried as base64 text.
        return Value.newBuilder()
            .setStringValue(Base64.getEncoder().encodeToString(bytes.toByteArray())).build();
      case ARRAY: {
        ListValue.Builder listBuilder = ListValue.newBuilder();
        // Wildcard cast: elements are only read as Object, so no unchecked conversion is needed.
        for (Object element : (List<?>) object) {
          listBuilder.addValues(encode(schema.valueSchema(), element));
        }
        return Value.newBuilder().setListValue(listBuilder).build();
      }
      case MAP: {
        Struct.Builder builder = Struct.newBuilder();
        // Iterate entries directly instead of looking every key up a second time.
        for (Map.Entry<?, ?> entry : ((Map<?, ?>) object).entrySet()) {
          builder.putFields(stringRep(schema.keySchema(), entry.getKey()),
              encode(schema.valueSchema(), entry.getValue()));
        }
        return Value.newBuilder().setStructValue(builder).build();
      }
      case STRUCT: {
        Struct.Builder builder = Struct.newBuilder();
        org.apache.kafka.connect.data.Struct struct = (org.apache.kafka.connect.data.Struct) object;
        for (Field f : schema.fields()) {
          builder.putFields(f.name(), encode(f.schema(), struct.get(f)));
        }
        return Value.newBuilder().setStructValue(builder).build();
      }
    }
    throw new DataException("Invalid schema type.");
  }