in generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java [572:684]
private void generateVariableLengthReader(Versions fieldFlexibleVersions,
String name,
FieldType type,
Versions possibleVersions,
Versions nullableVersions,
String assignmentPrefix,
String assignmentSuffix,
boolean isStructArrayWithKeys,
boolean zeroCopy) {
String lengthVar = type.isArray() ? "arrayLength" : "length";
buffer.printf("int %s;%n", lengthVar);
VersionConditional.forVersions(fieldFlexibleVersions, possibleVersions).
ifMember(__ -> {
buffer.printf("%s = _readable.readUnsignedVarint() - 1;%n", lengthVar);
}).
ifNotMember(__ -> {
if (type.isString()) {
buffer.printf("%s = _readable.readShort();%n", lengthVar);
} else if (type.isBytes() || type.isArray() || type.isRecords()) {
buffer.printf("%s = _readable.readInt();%n", lengthVar);
} else {
throw new RuntimeException("Can't handle variable length type " + type);
}
}).
generate(buffer);
buffer.printf("if (%s < 0) {%n", lengthVar);
buffer.incrementIndent();
VersionConditional.forVersions(nullableVersions, possibleVersions).
ifNotMember(__ -> {
buffer.printf("throw new RuntimeException(\"non-nullable field %s " +
"was serialized as null\");%n", name);
}).
ifMember(__ -> {
buffer.printf("%snull%s", assignmentPrefix, assignmentSuffix);
}).
generate(buffer);
buffer.decrementIndent();
if (type.isString()) {
buffer.printf("} else if (%s > 0x7fff) {%n", lengthVar);
buffer.incrementIndent();
buffer.printf("throw new RuntimeException(\"string field %s " +
"had invalid length \" + %s);%n", name, lengthVar);
buffer.decrementIndent();
}
buffer.printf("} else {%n");
buffer.incrementIndent();
if (type.isString()) {
buffer.printf("%s_readable.readString(%s)%s",
assignmentPrefix, lengthVar, assignmentSuffix);
} else if (type.isBytes()) {
if (zeroCopy) {
buffer.printf("%s_readable.readByteBuffer(%s)%s",
assignmentPrefix, lengthVar, assignmentSuffix);
} else {
buffer.printf("byte[] newBytes = new byte[%s];%n", lengthVar);
buffer.printf("_readable.readArray(newBytes);%n");
buffer.printf("%snewBytes%s", assignmentPrefix, assignmentSuffix);
}
} else if (type.isRecords()) {
headerGenerator.addImport(MessageGenerator.RECORDS_READABLE_CLASS);
buffer.printf("if (_readable instanceof RecordsReadable) {%n");
buffer.incrementIndent();
buffer.printf("%s((RecordsReadable) _readable).readRecords(%s)%s",
assignmentPrefix, lengthVar, assignmentSuffix);
buffer.decrementIndent();
buffer.printf("} else {%n");
buffer.incrementIndent();
buffer.printf("throw new RuntimeException(\"Cannot read records from " +
"reader of class: \" + _readable.getClass().getSimpleName());%n");
buffer.decrementIndent();
buffer.printf("}%n");
} else if (type.isArray()) {
FieldType.ArrayType arrayType = (FieldType.ArrayType) type;
if (isStructArrayWithKeys) {
headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_MULTI_COLLECTION_CLASS);
buffer.printf("%s newCollection = new %s(%s);%n",
FieldSpec.collectionType(arrayType.elementType().toString()),
FieldSpec.collectionType(arrayType.elementType().toString()), lengthVar);
} else {
headerGenerator.addImport(MessageGenerator.ARRAYLIST_CLASS);
String boxedArrayType =
arrayType.elementType().getBoxedJavaType(headerGenerator);
buffer.printf("ArrayList<%s> newCollection = new ArrayList<%s>(%s);%n",
boxedArrayType, boxedArrayType, lengthVar);
}
buffer.printf("for (int i = 0; i < %s; i++) {%n", lengthVar);
buffer.incrementIndent();
if (arrayType.elementType().isArray()) {
throw new RuntimeException("Nested arrays are not supported. " +
"Use an array of structures containing another array.");
} else if (arrayType.elementType().isBytes() || arrayType.elementType().isString()) {
generateVariableLengthReader(fieldFlexibleVersions,
name + " element",
arrayType.elementType(),
possibleVersions,
Versions.NONE,
"newCollection.add(",
String.format(");%n"),
false,
false);
} else {
buffer.printf("newCollection.add(%s);%n",
primitiveReadExpression(arrayType.elementType()));
}
buffer.decrementIndent();
buffer.printf("}%n");
buffer.printf("%snewCollection%s", assignmentPrefix, assignmentSuffix);
} else {
throw new RuntimeException("Can't handle variable length type " + type);
}
buffer.decrementIndent();
buffer.printf("}%n");
}