in src/main/java/com/amazon/kinesis/kafka/DataUtility.java [30:112]
public static ByteBuffer parseValue(Schema schema, Object value) {
if (value == null) {
return null;
}
Schema.Type t = schema.type();
switch (t) {
case INT8:
ByteBuffer byteBuffer = ByteBuffer.allocate(1);
byteBuffer.put((Byte) value);
return byteBuffer;
case INT16:
ByteBuffer shortBuf = ByteBuffer.allocate(2);
shortBuf.putShort((Short) value);
return shortBuf;
case INT32:
ByteBuffer intBuf = ByteBuffer.allocate(4);
intBuf.putInt((Integer) value);
return intBuf;
case INT64:
ByteBuffer longBuf = ByteBuffer.allocate(8);
longBuf.putLong((Long) value);
return longBuf;
case FLOAT32:
ByteBuffer floatBuf = ByteBuffer.allocate(4);
floatBuf.putFloat((Float) value);
return floatBuf;
case FLOAT64:
ByteBuffer doubleBuf = ByteBuffer.allocate(8);
doubleBuf.putDouble((Double) value);
return doubleBuf;
case BOOLEAN:
ByteBuffer boolBuffer = ByteBuffer.allocate(1);
boolBuffer.put((byte) ((Boolean) value ? 1 : 0));
return boolBuffer;
case STRING:
try {
return ByteBuffer.wrap(((String) value).getBytes("UTF-8"));
} catch (UnsupportedEncodingException e) {
System.out.println("Message cannot be translated:" + e.getLocalizedMessage());
} catch (Exception e) {
System.out.println("Unepxected error:" + e.getLocalizedMessage());
throw e;
}
case ARRAY:
Schema sch = schema.valueSchema();
if (sch.type() == Type.MAP || sch.type() == Type.STRUCT) {
throw new DataException("Invalid schema type.");
}
Object[] objs = (Object[]) value;
ByteBuffer[] byteBuffers = new ByteBuffer[objs.length];
int noOfByteBuffer = 0;
for (Object obj : objs) {
byteBuffers[noOfByteBuffer++] = parseValue(sch, obj);
}
ByteBuffer result = ByteBuffer.allocate(Arrays.stream(byteBuffers).mapToInt(Buffer::remaining).sum());
Arrays.stream(byteBuffers).forEach(bb -> result.put(bb.duplicate()));
return result;
case BYTES:
if (value instanceof byte[])
return ByteBuffer.wrap((byte[]) value);
else if (value instanceof ByteBuffer)
return (ByteBuffer) value;
case MAP:
// TO BE IMPLEMENTED
return ByteBuffer.wrap(null);
case STRUCT:
List<ByteBuffer> fieldList = new LinkedList<ByteBuffer>();
// Parsing each field of structure
schema.fields().forEach(field -> fieldList.add(parseValue(field.schema(), ((Struct) value).get(field))));
// Initialize ByteBuffer
ByteBuffer processedValue = ByteBuffer.allocate(fieldList.stream().mapToInt(Buffer::remaining).sum());
// Combine bytebuffer of all fields
fieldList.forEach(buffer -> processedValue.put(buffer.duplicate()));
return processedValue;
}
return null;
}