in kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java [235:345]
private ByteString handleValue(Schema schema, Object value, Map<String, String> attributes) {
if (value == null) {
return null;
}
if (schema == null) {
String str = value.toString();
return ByteString.copyFromUtf8(str);
}
Schema.Type t = schema.type();
switch (t) {
case INT8:
byte b = (Byte) value;
byte[] arr = {b};
return ByteString.copyFrom(arr);
case INT16:
ByteBuffer shortBuf = ByteBuffer.allocate(2);
shortBuf.putShort((Short) value);
return ByteString.copyFrom(shortBuf);
case INT32:
ByteBuffer intBuf = ByteBuffer.allocate(4);
intBuf.putInt((Integer) value);
return ByteString.copyFrom(intBuf);
case INT64:
ByteBuffer longBuf = ByteBuffer.allocate(8);
longBuf.putLong((Long) value);
return ByteString.copyFrom(longBuf);
case FLOAT32:
ByteBuffer floatBuf = ByteBuffer.allocate(4);
floatBuf.putFloat((Float) value);
return ByteString.copyFrom(floatBuf);
case FLOAT64:
ByteBuffer doubleBuf = ByteBuffer.allocate(8);
doubleBuf.putDouble((Double) value);
return ByteString.copyFrom(doubleBuf);
case BOOLEAN:
byte bool = (byte) ((Boolean) value ? 1 : 0);
byte[] boolArr = {bool};
return ByteString.copyFrom(boolArr);
case STRING:
String str = (String) value;
return ByteString.copyFromUtf8(str);
case BYTES:
if (value instanceof ByteString) {
return (ByteString) value;
} else if (value instanceof byte[]) {
return ByteString.copyFrom((byte[]) value);
} else if (value instanceof ByteBuffer) {
return ByteString.copyFrom((ByteBuffer) value);
} else {
throw new DataException("Unexpected value class with BYTES schema type.");
}
case STRUCT:
Struct struct = (Struct) value;
ByteString msgBody = null;
for (Field f : schema.fields()) {
Schema.Type fieldType = f.schema().type();
if (fieldType == Type.MAP || fieldType == Type.STRUCT) {
throw new DataException("Struct type does not support nested Map or Struct types, " +
"present in field " + f.name());
}
Object val = struct.get(f);
if (val == null) {
if (!f.schema().isOptional()) {
throw new DataException("Struct message missing required field " + f.name());
} else {
continue;
}
}
if (f.name().equals(messageBodyName)) {
Schema bodySchema = f.schema();
msgBody = handleValue(bodySchema, val, null);
} else {
attributes.put(f.name(), val.toString());
}
}
if (msgBody != null) {
return msgBody;
} else {
return ByteString.EMPTY;
}
case MAP:
Map<Object, Object> map = (Map<Object, Object>) value;
Set<Object> keys = map.keySet();
ByteString mapBody = null;
for (Object key : keys) {
if (key.equals(messageBodyName)) {
mapBody = ByteString.copyFromUtf8(map.get(key).toString());
} else {
attributes.put(key.toString(), map.get(key).toString());
}
}
if (mapBody != null) {
return mapBody;
} else {
return ByteString.EMPTY;
}
case ARRAY:
Schema.Type arrType = schema.valueSchema().type();
if (arrType == Type.MAP || arrType == Type.STRUCT) {
throw new DataException("Array type does not support Map or Struct types.");
}
ByteString out = ByteString.EMPTY;
Object[] objArr = (Object[]) value;
for (Object o : objArr) {
out = out.concat(handleValue(schema.valueSchema(), o, null));
}
return out;
}
return ByteString.EMPTY;
}