private ByteString handleValue()

in kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java [235:345]


  private ByteString handleValue(Schema schema, Object value, Map<String, String> attributes) {
    if (value == null) {
      return null;
    }
    if (schema == null) {
      String str = value.toString();
      return ByteString.copyFromUtf8(str);
    }
    Schema.Type t = schema.type();
    switch (t) {
      case INT8:
        byte b = (Byte) value;
        byte[] arr = {b};
        return ByteString.copyFrom(arr);
      case INT16:
        ByteBuffer shortBuf = ByteBuffer.allocate(2);
        shortBuf.putShort((Short) value);
        return ByteString.copyFrom(shortBuf);
      case INT32:
        ByteBuffer intBuf = ByteBuffer.allocate(4);
        intBuf.putInt((Integer) value);
        return ByteString.copyFrom(intBuf);
      case INT64:
        ByteBuffer longBuf = ByteBuffer.allocate(8);
        longBuf.putLong((Long) value);
        return ByteString.copyFrom(longBuf);
      case FLOAT32:
        ByteBuffer floatBuf = ByteBuffer.allocate(4);
        floatBuf.putFloat((Float) value);
        return ByteString.copyFrom(floatBuf);
      case FLOAT64:
        ByteBuffer doubleBuf = ByteBuffer.allocate(8);
        doubleBuf.putDouble((Double) value);
        return ByteString.copyFrom(doubleBuf);
      case BOOLEAN:
        byte bool = (byte) ((Boolean) value ? 1 : 0);
        byte[] boolArr = {bool};
        return ByteString.copyFrom(boolArr);
      case STRING:
        String str = (String) value;
        return ByteString.copyFromUtf8(str);
      case BYTES:
        if (value instanceof ByteString) {
          return (ByteString) value;
        } else if (value instanceof byte[]) {
          return ByteString.copyFrom((byte[]) value);
        } else if (value instanceof ByteBuffer) {
          return ByteString.copyFrom((ByteBuffer) value);
        } else {
          throw new DataException("Unexpected value class with BYTES schema type.");
        }
      case STRUCT:
        Struct struct = (Struct) value;
        ByteString msgBody = null;
        for (Field f : schema.fields()) {
          Schema.Type fieldType = f.schema().type();
          if (fieldType == Type.MAP || fieldType == Type.STRUCT) {
            throw new DataException("Struct type does not support nested Map or Struct types, " +
                "present in field " + f.name());
          }

          Object val = struct.get(f);
          if (val == null) {
            if (!f.schema().isOptional()) {
              throw new DataException("Struct message missing required field " + f.name());
            }  else {
              continue;
            }
          }
          if (f.name().equals(messageBodyName)) {
            Schema bodySchema = f.schema();
            msgBody = handleValue(bodySchema, val, null);
          } else {
            attributes.put(f.name(), val.toString());
          }
        }
        if (msgBody != null) {
          return msgBody;
        } else {
          return ByteString.EMPTY;
        }
      case MAP:
        Map<Object, Object> map = (Map<Object, Object>) value;
        Set<Object> keys = map.keySet();
        ByteString mapBody = null;
        for (Object key : keys) {
          if (key.equals(messageBodyName)) {
            mapBody = ByteString.copyFromUtf8(map.get(key).toString());
          } else {
            attributes.put(key.toString(), map.get(key).toString());
          }
        }
        if (mapBody != null) {
          return mapBody;
        } else {
          return ByteString.EMPTY;
        }
      case ARRAY:
        Schema.Type arrType = schema.valueSchema().type();
        if (arrType == Type.MAP || arrType == Type.STRUCT) {
          throw new DataException("Array type does not support Map or Struct types.");
        }
        ByteString out = ByteString.EMPTY;
        Object[] objArr = (Object[]) value;
        for (Object o : objArr) {
          out = out.concat(handleValue(schema.valueSchema(), o, null));
        }
        return out;
    }
    return ByteString.EMPTY;
  }